commit 1f960800bbbdd8dbb52de125375384bcaf45f000 Author: Zack Weinberg zackw@cmu.edu Date: Mon Feb 27 16:34:13 2012 -0800
Remove ->circuit pointer from the generic conn_t.
Protocols are now expected to maintain a conn-to-circuit pointer in the derived foo_conn_t. This allows it to have the correct static type (i.e. foo_circuit_t) and thus eliminates a bunch of casting back and forth. --- src/connections.cc | 15 ++--- src/connections.h | 6 ++- src/network.cc | 4 +- src/protocol.h | 1 + src/protocol/chop.cc | 171 +++++++++++++++++++++++++++---------------------- src/protocol/null.cc | 57 +++++++++++++---- src/util.cc | 9 ++- 7 files changed, 158 insertions(+), 105 deletions(-)
diff --git a/src/connections.cc b/src/connections.cc index 6320746..37eaba3 100644 --- a/src/connections.cc +++ b/src/connections.cc @@ -115,10 +115,6 @@ conn_t::~conn_t() log_debug(this, "closing connection; %lu remaining", (unsigned long) connections.size());
- if (this->circuit) { - circuit_drop_downstream(this->circuit, this); - } - if (this->peername) free((void *)this->peername); if (this->buffer) @@ -127,6 +123,13 @@ conn_t::~conn_t() maybe_finish_shutdown(); }
+/** Potentially called during connection construction or destruction. */ +circuit_t * +conn_t::circuit() const +{ + return 0; +} + void conn_close(conn_t *conn) { @@ -290,16 +293,12 @@ circuit_add_upstream(circuit_t *ckt, struct bufferevent *buf, const char *peer) void circuit_add_downstream(circuit_t *ckt, conn_t *down) { - log_assert(!down->circuit); - down->circuit = ckt; ckt->add_downstream(down); }
void circuit_drop_downstream(circuit_t *ckt, conn_t *down) { - log_assert(down->circuit == ckt); - down->circuit = NULL; ckt->drop_downstream(down); }
diff --git a/src/connections.h b/src/connections.h index 6c0c5f7..b0cac55 100644 --- a/src/connections.h +++ b/src/connections.h @@ -15,7 +15,6 @@ socket-level connections) as quickly as possible. */ struct conn_t { config_t *cfg; - circuit_t *circuit; const char *peername; struct bufferevent *buffer; unsigned int serial; @@ -25,6 +24,11 @@ struct conn_t { conn_t() : connected(false), flushing(false) {} virtual ~conn_t();
+ /** Return the upstream circuit for this connection, if there is one. + NOTE: this is *not* a pure virtual method because it can be called + legitimately after the subclass destructor has run. */ + virtual circuit_t *circuit() const; + /** Create an upstream circuit for this connection, if it is possible to do so without receiving data from the downstream peer. If data must be received first, this method should do diff --git a/src/network.cc b/src/network.cc index a55e539..3ecb378 100644 --- a/src/network.cc +++ b/src/network.cc @@ -503,7 +503,7 @@ downstream_connect_cb(struct bufferevent *bev, short what, void *arg) /* Upon successful connection, enable traffic on both sides of the connection, and replace this callback with the regular event_cb */ if (what & BEV_EVENT_CONNECTED) { - circuit_t *ckt = conn->circuit; + circuit_t *ckt = conn->circuit(); log_assert(ckt); log_assert(ckt->up_peer); log_assert(conn->buffer == bev); @@ -539,7 +539,7 @@ static void downstream_socks_connect_cb(struct bufferevent *bev, short what, void *arg) { conn_t *conn = (conn_t *)arg; - circuit_t *ckt = conn->circuit; + circuit_t *ckt = conn->circuit(); socks_state_t *socks;
log_debug(conn, "what=%04hx", what); diff --git a/src/protocol.h b/src/protocol.h index c48238c..d28bf19 100644 --- a/src/protocol.h +++ b/src/protocol.h @@ -123,6 +123,7 @@ extern const proto_module *const supported_protos[]; #define CONN_DECLARE_METHODS(mod) \ mod##_conn_t(); \ virtual ~mod##_conn_t(); \ + virtual circuit_t *circuit() const; \ virtual int maybe_open_upstream(); \ virtual int handshake(); \ virtual int recv(); \ diff --git a/src/protocol/chop.cc b/src/protocol/chop.cc index 89a9ccc..536481c 100644 --- a/src/protocol/chop.cc +++ b/src/protocol/chop.cc @@ -71,11 +71,13 @@ static const uint8_t s2c_key[] =
/* Connections and circuits */
-typedef unordered_map<uint64_t, circuit_t *> chop_circuit_table; - namespace { + struct chop_circuit_t; + typedef unordered_map<uint64_t, chop_circuit_t *> chop_circuit_table; + struct chop_conn_t : conn_t { + chop_circuit_t *upstream; steg_t *steg; struct evbuffer *recv_pending; struct event *must_transmit_timer; @@ -87,7 +89,7 @@ namespace { struct chop_circuit_t : circuit_t { chop_reassembly_elt reassembly_queue; - unordered_set<conn_t *> downstreams; + unordered_set<chop_conn_t *> downstreams; encryptor *send_crypt; decryptor *recv_crypt;
@@ -240,22 +242,22 @@ chop_decrypt_header(chop_circuit_t *ckt,
/* Transmit subroutines. */
-static conn_t * +static chop_conn_t * chop_pick_connection(chop_circuit_t *ckt, size_t desired, size_t *blocksize) { size_t maxbelow = 0; size_t minabove = SIZE_MAX; - conn_t *targbelow = NULL; - conn_t *targabove = NULL; + chop_conn_t *targbelow = NULL; + chop_conn_t *targabove = NULL;
if (desired > CHOP_MAX_DATA) desired = CHOP_MAX_DATA;
/* Find the best fit for the desired transmission from all the outbound connections' transmit rooms. */ - for (unordered_set<conn_t *>::iterator i = ckt->downstreams.begin(); + for (unordered_set<chop_conn_t *>::iterator i = ckt->downstreams.begin(); i != ckt->downstreams.end(); i++) { - chop_conn_t *conn = static_cast<chop_conn_t *>(*i); + chop_conn_t *conn = *i; /* We can only use candidates that have a steg target already. */ if (conn->steg) { /* Find the connections whose transmit rooms are closest to the @@ -304,14 +306,13 @@ chop_pick_connection(chop_circuit_t *ckt, size_t desired, size_t *blocksize) }
static int -chop_send_block(conn_t *d, +chop_send_block(chop_conn_t *dest, chop_circuit_t *ckt, struct evbuffer *source, struct evbuffer *block, uint16_t length, uint16_t flags) { - chop_conn_t *dest = static_cast<chop_conn_t *>(d); chop_header hdr; struct evbuffer_iovec v; uint8_t *p; @@ -381,18 +382,17 @@ chop_send_block(conn_t *d, }
static int -chop_send_blocks(circuit_t *c) +chop_send_blocks(chop_circuit_t *ckt) { - chop_circuit_t *ckt = static_cast<chop_circuit_t *>(c); - struct evbuffer *xmit_pending = bufferevent_get_input(c->up_buffer); + struct evbuffer *xmit_pending = bufferevent_get_input(ckt->up_buffer); struct evbuffer *block; - conn_t *target; + chop_conn_t *target; size_t avail; size_t blocksize; uint16_t flags;
if (!(block = evbuffer_new())) { - log_warn(c, "allocation failure"); + log_warn(ckt, "allocation failure"); return -1; }
@@ -400,14 +400,14 @@ chop_send_blocks(circuit_t *c) avail = evbuffer_get_length(xmit_pending); flags = ckt->sent_syn ? 0 : CHOP_F_SYN;
- log_debug(c, "%lu bytes to send", (unsigned long)avail); + log_debug(ckt, "%lu bytes to send", (unsigned long)avail);
if (avail == 0) break;
target = chop_pick_connection(ckt, avail, &blocksize); if (!target) { - log_debug(c, "no target connection available"); + log_debug(ckt, "no target connection available"); /* this is not an error; it can happen e.g. when the server has something to send immediately and the client hasn't spoken yet */ break; @@ -428,15 +428,14 @@ chop_send_blocks(circuit_t *c) evbuffer_free(block); avail = evbuffer_get_length(xmit_pending); if (avail) - log_debug(c, "%lu bytes still waiting to be sent", (unsigned long)avail); + log_debug(ckt, "%lu bytes still waiting to be sent", (unsigned long)avail); return 0; }
static int -chop_send_targeted(circuit_t *c, conn_t *target, size_t blocksize) +chop_send_targeted(chop_circuit_t *ckt, chop_conn_t *target, size_t blocksize) { - chop_circuit_t *ckt = static_cast<chop_circuit_t *>(c); - struct evbuffer *xmit_pending = bufferevent_get_input(c->up_buffer); + struct evbuffer *xmit_pending = bufferevent_get_input(ckt->up_buffer); size_t avail = evbuffer_get_length(xmit_pending); struct evbuffer *block = evbuffer_new(); uint16_t flags = 0; @@ -467,7 +466,8 @@ chop_send_targeted(circuit_t *c, conn_t *target, size_t blocksize) evbuffer_free(block); avail = evbuffer_get_length(xmit_pending); if (avail) - log_debug(c, "%lu bytes still waiting to be sent", (unsigned long)avail); + log_debug(ckt, "%lu bytes still waiting to be sent", + (unsigned long)avail); return 0;
} else { @@ -510,18 +510,17 @@ chop_send_targeted(circuit_t *c, conn_t *target, size_t blocksize) }
static int -chop_send_chaff(circuit_t *c) +chop_send_chaff(chop_circuit_t *ckt) { - chop_circuit_t *ckt = static_cast<chop_circuit_t *>(c); size_t room;
- conn_t *target = chop_pick_connection(ckt, 1, &room); + chop_conn_t *target = chop_pick_connection(ckt, 1, &room); if (!target) { /* If we have connections and we can't send, that means we're waiting for the server to respond. Just wait. */ return 0; } - return chop_send_targeted(c, target, room); + return chop_send_targeted(ckt, target, room); }
static void @@ -530,7 +529,7 @@ must_transmit_timer_cb(evutil_socket_t, short, void *arg) chop_conn_t *conn = static_cast<chop_conn_t*>(arg); size_t room;
- if (!conn->circuit) { + if (!conn->upstream) { log_debug(conn, "must transmit, but no circuit (stale connection)"); conn_do_flush(conn); return; @@ -547,7 +546,7 @@ must_transmit_timer_cb(evutil_socket_t, short, void *arg) }
log_debug(conn, "must transmit"); - chop_send_targeted(conn->circuit, conn, room - CHOP_BLOCK_OVERHD); + chop_send_targeted(conn->upstream, conn, room - CHOP_BLOCK_OVERHD); }
/* Receive subroutines. */ @@ -571,9 +570,9 @@ mod32_le(uint32_t s, uint32_t t) /** Add BLOCK to the reassembly queue at the appropriate location and merge adjacent blocks to the extent possible. */ static int -chop_reassemble_block(circuit_t *c, struct evbuffer *block, chop_header *hdr) +chop_reassemble_block(chop_circuit_t *ckt, struct evbuffer *block, + chop_header *hdr) { - chop_circuit_t *ckt = static_cast<chop_circuit_t *>(c); chop_reassembly_elt *queue = &ckt->reassembly_queue; chop_reassembly_elt *p, *q;
@@ -583,14 +582,14 @@ chop_reassemble_block(circuit_t *c, struct evbuffer *block, chop_header *hdr) contents. Doing all chaff-handling here simplifies the caller at the expense of slightly more buffer-management overhead. */ if (!(hdr->flags & (CHOP_F_SYN|CHOP_F_FIN))) { - log_debug(c, "discarding chaff with no flags"); + log_debug(ckt, "discarding chaff with no flags"); evbuffer_free(block); return 0; }
hdr->length = 0; evbuffer_drain(block, evbuffer_get_length(block)); - log_debug(c, "chaff with flags, treating length as 0"); + log_debug(ckt, "chaff with flags, treating length as 0"); }
/* SYN must occur at offset zero, may not be duplicated, and if we @@ -601,7 +600,7 @@ chop_reassemble_block(circuit_t *c, struct evbuffer *block, chop_header *hdr) (queue->next != queue && ((queue->next->flags & CHOP_F_SYN) || !mod32_le(hdr->offset + hdr->length, queue->next->offset))))) { - log_warn(c, "protocol error: inappropriate SYN block"); + log_warn(ckt, "protocol error: inappropriate SYN block"); return -1; }
@@ -610,7 +609,7 @@ chop_reassemble_block(circuit_t *c, struct evbuffer *block, chop_header *hdr) if ((hdr->flags & CHOP_F_FIN) && queue->prev != queue && ((queue->prev->flags & CHOP_F_FIN) || !mod32_le(queue->prev->offset + queue->prev->length, hdr->offset))) { - log_warn(c, "protocol error: inappropriate FIN block"); + log_warn(ckt, "protocol error: inappropriate FIN block"); return -1; }
@@ -621,7 +620,7 @@ chop_reassemble_block(circuit_t *c, struct evbuffer *block, chop_header *hdr) !mod32_le(queue->next->offset + queue->next->length, hdr->offset)) || ((queue->prev->flags & CHOP_F_FIN) && !mod32_le(hdr->offset + hdr->length, queue->prev->offset)))) { - log_warn(c, "protocol error: inappropriate normal block"); + log_warn(ckt, "protocol error: inappropriate normal block"); return -1; }
@@ -643,7 +642,7 @@ chop_reassemble_block(circuit_t *c, struct evbuffer *block, chop_header *hdr)
/* protocol error: this block goes before 'p' but does not fit after 'p->prev' */ - log_warn(c, "protocol error: %u byte block does not fit at offset %u", + log_warn(ckt, "protocol error: %u byte block does not fit at offset %u", hdr->length, hdr->offset); return -1; } @@ -654,7 +653,7 @@ chop_reassemble_block(circuit_t *c, struct evbuffer *block, chop_header *hdr) that this block goes after the last block in the list (aka p->prev). */ if (!p->data && p->prev->data && !mod32_lt(p->prev->offset + p->prev->length, hdr->offset)) { - log_warn(c, "protocol error: %u byte block does not fit at offset %u " + log_warn(ckt, "protocol error: %u byte block does not fit at offset %u " "(sentinel case)", hdr->length, hdr->offset); return -1; @@ -674,7 +673,7 @@ chop_reassemble_block(circuit_t *c, struct evbuffer *block, chop_header *hdr)
grow_back: if (evbuffer_add_buffer(p->data, block)) { - log_warn(c, "failed to append to existing buffer"); + log_warn(ckt, "failed to append to existing buffer"); return -1; } evbuffer_free(block); @@ -685,7 +684,7 @@ chop_reassemble_block(circuit_t *c, struct evbuffer *block, chop_header *hdr) while (p->next->data && p->offset + p->length == p->next->offset) { q = p->next; if (evbuffer_add_buffer(p->data, q->data)) { - log_warn(c, "failed to merge buffers"); + log_warn(ckt, "failed to merge buffers"); return -1; } p->length += q->length; @@ -700,7 +699,7 @@ chop_reassemble_block(circuit_t *c, struct evbuffer *block, chop_header *hdr)
grow_front: if (evbuffer_prepend_buffer(p->data, block)) { - log_warn(c, "failed to prepend to existing buffer"); + log_warn(ckt, "failed to prepend to existing buffer"); return -1; } evbuffer_free(block); @@ -712,7 +711,7 @@ chop_reassemble_block(circuit_t *c, struct evbuffer *block, chop_header *hdr) while (p->prev->data && p->offset == p->prev->offset + p->prev->length) { q = p->prev; if (evbuffer_prepend_buffer(p->data, q->data)) { - log_warn(c, "failed to merge buffers"); + log_warn(ckt, "failed to merge buffers"); return -1; } p->length += q->length; @@ -730,31 +729,31 @@ chop_reassemble_block(circuit_t *c, struct evbuffer *block, chop_header *hdr)
/* Flush as much data toward upstream as we can. */ static int -chop_push_to_upstream(circuit_t *c) +chop_push_to_upstream(chop_circuit_t *ckt) { - chop_circuit_t *ckt = static_cast<chop_circuit_t *>(c); /* Only the first reassembly queue entry, if any, can possibly be ready to flush (because chop_reassemble_block ensures that there are gaps between all queue elements). */ chop_reassembly_elt *ready = ckt->reassembly_queue.next; if (!ready->data || ckt->recv_offset != ready->offset) { - log_debug(c, "no data pushable to upstream yet"); + log_debug(ckt, "no data pushable to upstream yet"); return 0; }
if (!ckt->received_syn) { if (!(ready->flags & CHOP_F_SYN)) { - log_debug(c, "waiting for SYN"); + log_debug(ckt, "waiting for SYN"); return 0; } - log_debug(c, "processed SYN"); + log_debug(ckt, "processed SYN"); ckt->received_syn = true; }
- log_debug(c, "can push %lu bytes to upstream", + log_debug(ckt, "can push %lu bytes to upstream", (unsigned long)evbuffer_get_length(ready->data)); - if (evbuffer_add_buffer(bufferevent_get_output(c->up_buffer), ready->data)) { - log_warn(c, "failure pushing data to upstream"); + if (evbuffer_add_buffer(bufferevent_get_output(ckt->up_buffer), + ready->data)) { + log_warn(ckt, "failure pushing data to upstream"); return -1; }
@@ -765,8 +764,8 @@ chop_push_to_upstream(circuit_t *c) log_assert(!ckt->received_fin); log_assert(ready->next == &ckt->reassembly_queue); ckt->received_fin = true; - log_debug(c, "processed FIN"); - circuit_recv_eof(c); + log_debug(ckt, "processed FIN"); + circuit_recv_eof(ckt); }
log_assert(ready->next == &ckt->reassembly_queue || @@ -782,12 +781,12 @@ chop_push_to_upstream(circuit_t *c) /* Circuit handling */
static int -chop_find_or_make_circuit(conn_t *conn, uint64_t circuit_id) +chop_find_or_make_circuit(chop_conn_t *conn, uint64_t circuit_id) { chop_config_t *cfg = static_cast<chop_config_t *>(conn->cfg); chop_circuit_table::value_type in(circuit_id, 0); std::pair<chop_circuit_table::iterator, bool> out = cfg->circuits.insert(in); - circuit_t *ck; + chop_circuit_t *ck;
if (!out.second) { // element already exists if (!out.first->second) { @@ -797,7 +796,7 @@ chop_find_or_make_circuit(conn_t *conn, uint64_t circuit_id) ck = out.first->second; log_debug(conn, "found circuit to %s", ck->up_peer); } else { - ck = circuit_create(cfg, 0); + ck = dynamic_cast<chop_circuit_t *>(circuit_create(cfg, 0)); if (!ck) { log_warn(conn, "failed to create new circuit"); return -1; @@ -808,7 +807,7 @@ chop_find_or_make_circuit(conn_t *conn, uint64_t circuit_id) return -1; } log_debug(conn, "created new circuit to %s", ck->up_peer); - static_cast<chop_circuit_t *>(ck)->circuit_id = circuit_id; + ck->circuit_id = circuit_id; out.first->second = ck; }
@@ -978,10 +977,10 @@ chop_circuit_t::~chop_circuit_t() upstream_eof ? '+' : '-', (unsigned long)downstreams.size());
- for (unordered_set<conn_t *>::iterator i = this->downstreams.begin(); + for (unordered_set<chop_conn_t *>::iterator i = this->downstreams.begin(); i != this->downstreams.end(); i++) { - conn_t *conn = *i; - conn->circuit = NULL; + chop_conn_t *conn = *i; + conn->upstream = NULL; if (evbuffer_get_length(conn_get_outbound(conn)) > 0) conn_do_flush(conn); else @@ -1016,9 +1015,15 @@ chop_circuit_t::~chop_circuit_t() }
void -chop_circuit_t::add_downstream(conn_t *conn) +chop_circuit_t::add_downstream(conn_t *cn) { + chop_conn_t *conn = dynamic_cast<chop_conn_t *>(cn); + log_assert(conn); + log_assert(!conn->upstream); + + conn->upstream = this; this->downstreams.insert(conn); + log_debug(this, "added connection <%d.%d> to %s, now %lu", this->serial, conn->serial, conn->peername, (unsigned long)this->downstreams.size()); @@ -1027,9 +1032,15 @@ chop_circuit_t::add_downstream(conn_t *conn) }
void -chop_circuit_t::drop_downstream(conn_t *conn) +chop_circuit_t::drop_downstream(conn_t *cn) { + chop_conn_t *conn = dynamic_cast<chop_conn_t *>(cn); + log_assert(conn); + log_assert(conn->upstream == this); + + conn->upstream = NULL; this->downstreams.erase(conn); + log_debug(this, "dropped connection <%d.%d> to %s, now %lu", this->serial, conn->serial, conn->peername, (unsigned long)this->downstreams.size()); @@ -1078,6 +1089,8 @@ chop_conn_t::chop_conn_t()
chop_conn_t::~chop_conn_t() { + if (this->upstream) + circuit_drop_downstream(this->upstream, this); if (this->steg) delete this->steg; if (this->must_transmit_timer) @@ -1085,6 +1098,12 @@ chop_conn_t::~chop_conn_t() evbuffer_free(this->recv_pending); }
+circuit_t * +chop_conn_t::circuit() const +{ + return this->upstream; +} + int chop_conn_t::maybe_open_upstream() { @@ -1104,7 +1123,7 @@ chop_conn_t::handshake() server can't even _connect to its upstream_ till it gets the first packet from the client. */ if (this->cfg->mode != LSN_SIMPLE_SERVER) - circuit_arm_flush_timer(this->circuit, 1); + circuit_arm_flush_timer(this->upstream, 1); return 0; }
@@ -1143,9 +1162,9 @@ chop_circuit_t::send() as long as we haven't both sent and received a FIN, or we might deadlock. */ if (this->sent_fin && this->received_fin) { - for (unordered_set<conn_t *>::iterator i = this->downstreams.begin(); + for (unordered_set<chop_conn_t *>::iterator i = this->downstreams.begin(); i != this->downstreams.end(); i++) { - chop_conn_t *conn = static_cast<chop_conn_t*>(*i); + chop_conn_t *conn = *i; if (conn->must_transmit_timer && evtimer_pending(conn->must_transmit_timer, NULL)) must_transmit_timer_cb(-1, 0, conn); @@ -1168,7 +1187,6 @@ chop_circuit_t::send_eof() int chop_conn_t::recv() { - circuit_t *c; chop_circuit_t *ckt; chop_header hdr; struct evbuffer *block; @@ -1178,7 +1196,7 @@ chop_conn_t::recv() if (this->steg->receive(this, this->recv_pending)) return -1;
- if (!this->circuit) { + if (!this->upstream) { log_debug(this, "finding circuit"); if (chop_peek_circuit_id(this->recv_pending, &hdr)) { log_debug(this, "not enough data to find circuit yet"); @@ -1193,7 +1211,7 @@ chop_conn_t::recv() it's either chaff or a protocol error; either way we can just discard it. Since we will never reply, call conn_do_flush so the connection will be dropped as soon as we receive an EOF. */ - if (!this->circuit) { + if (!this->upstream) { evbuffer_drain(this->recv_pending, evbuffer_get_length(this->recv_pending)); conn_do_flush(this); @@ -1201,9 +1219,8 @@ chop_conn_t::recv() } }
- c = this->circuit; - ckt = static_cast<chop_circuit_t *>(c); - log_debug(this, "circuit to %s", c->up_peer); + ckt = this->upstream; + log_debug(this, "circuit to %s", ckt->up_peer);
for (;;) { avail = evbuffer_get_length(this->recv_pending); @@ -1269,18 +1286,18 @@ chop_conn_t::recv() return -1; }
- if (chop_reassemble_block(c, block, &hdr)) { + if (chop_reassemble_block(ckt, block, &hdr)) { evbuffer_free(block); return -1; } }
- if (chop_push_to_upstream(c)) + if (chop_push_to_upstream(ckt)) return -1;
/* It may have now become possible to send queued data. */ - if (evbuffer_get_length(bufferevent_get_input(c->up_buffer))) - c->send(); + if (evbuffer_get_length(bufferevent_get_input(ckt->up_buffer))) + ckt->send();
/* If we're at EOF, close all connections (sending first if necessary). If we're the client we have to keep trying to talk @@ -1288,9 +1305,9 @@ chop_conn_t::recv() deadlock. */ else if (ckt->sent_fin && ckt->received_fin) { circuit_disarm_flush_timer(ckt); - for (unordered_set<conn_t *>::iterator i = ckt->downstreams.begin(); + for (unordered_set<chop_conn_t *>::iterator i = ckt->downstreams.begin(); i != ckt->downstreams.end(); i++) { - chop_conn_t *conn = static_cast<chop_conn_t*>(*i); + chop_conn_t *conn = *i; if (conn->must_transmit_timer && evtimer_pending(conn->must_transmit_timer, NULL)) must_transmit_timer_cb(-1, 0, conn); @@ -1312,8 +1329,8 @@ chop_conn_t::recv_eof() We should only drop the connection from the circuit if we're no longer sending in the opposite direction. Also, we should not drop the connection if its must-transmit timer is still pending. */ - if (this->circuit) { - chop_circuit_t *ckt = static_cast<chop_circuit_t *>(this->circuit); + if (this->upstream) { + chop_circuit_t *ckt = this->upstream;
if (evbuffer_get_length(conn_get_inbound(this)) > 0) if (this->recv()) diff --git a/src/protocol/null.cc b/src/protocol/null.cc index 27a9320..be89821 100644 --- a/src/protocol/null.cc +++ b/src/protocol/null.cc @@ -9,19 +9,26 @@ #include <event2/buffer.h>
namespace { - struct null_config_t : config_t { + struct null_config_t : config_t + { struct evutil_addrinfo *listen_addr; struct evutil_addrinfo *target_addr;
CONFIG_DECLARE_METHODS(null); };
- struct null_conn_t : conn_t { + struct null_circuit_t; + + struct null_conn_t : conn_t + { + null_circuit_t *upstream; + CONN_DECLARE_METHODS(null); };
- struct null_circuit_t : circuit_t { - conn_t *downstream; + struct null_circuit_t : circuit_t + { + null_conn_t *downstream;
CIRCUIT_DECLARE_METHODS(null); }; @@ -112,12 +119,13 @@ null_config_t::get_target_addrs(size_t n) circuit_t * null_config_t::circuit_create(size_t) { - circuit_t *ckt = new null_circuit_t; + null_circuit_t *ckt = new null_circuit_t; ckt->cfg = this; return ckt; }
null_circuit_t::null_circuit_t() + : downstream(NULL) { }
@@ -126,17 +134,23 @@ null_circuit_t::~null_circuit_t() if (downstream) { /* break the circular reference before deallocating the downstream connection */ - downstream->circuit = NULL; + downstream->upstream = NULL; delete downstream; } }
/* Add a connection to this circuit. */ void -null_circuit_t::add_downstream(conn_t *conn) +null_circuit_t::add_downstream(conn_t *cn) { + null_conn_t *conn = dynamic_cast<null_conn_t *>(cn); + log_assert(conn); + log_assert(!conn->upstream); log_assert(!this->downstream); + this->downstream = conn; + conn->upstream = this; + log_debug(this, "added connection <%d.%d> to %s", this->serial, conn->serial, conn->peername); } @@ -145,12 +159,18 @@ null_circuit_t::add_downstream(conn_t *conn) protocol, it is because of a network error, and the whole circuit should be closed. */ void -null_circuit_t::drop_downstream(conn_t *conn) +null_circuit_t::drop_downstream(conn_t *cn) { + null_conn_t *conn = dynamic_cast<null_conn_t *>(cn); + log_assert(conn); log_assert(this->downstream == conn); + log_assert(conn->upstream == this); + log_debug(this, "dropped connection <%d.%d> to %s", this->serial, conn->serial, conn->peername); this->downstream = NULL; + conn->upstream = NULL; + if (evbuffer_get_length(bufferevent_get_output(this->up_buffer)) > 0) /* this may already have happened, but there's no harm in doing it again */ @@ -190,18 +210,29 @@ null_config_t::conn_create(size_t) }
null_conn_t::null_conn_t() + : upstream(NULL) { }
null_conn_t::~null_conn_t() { + if (this->upstream) + circuit_drop_downstream(this->upstream, this); +} + +/* Only used by connection callbacks */ +circuit_t * +null_conn_t::circuit() const +{ + return upstream; }
/** Null inbound-to-outbound connections are 1:1 */ int null_conn_t::maybe_open_upstream() { - circuit_t *ckt = circuit_create(this->cfg, 0); + null_circuit_t *ckt = dynamic_cast<null_circuit_t *> + (circuit_create(this->cfg, 0)); if (!ckt) return -1;
@@ -221,8 +252,8 @@ null_conn_t::handshake() int null_conn_t::recv() { - log_assert(this->circuit); - return evbuffer_add_buffer(bufferevent_get_output(this->circuit->up_buffer), + log_assert(this->upstream); + return evbuffer_add_buffer(bufferevent_get_output(this->upstream->up_buffer), conn_get_inbound(this)); }
@@ -230,12 +261,12 @@ null_conn_t::recv() int null_conn_t::recv_eof() { - if (this->circuit) { + if (this->upstream) { if (evbuffer_get_length(conn_get_inbound(this)) > 0) if (this->recv()) return -1;
- circuit_recv_eof(this->circuit); + circuit_recv_eof(this->upstream); } return 0; } diff --git a/src/util.cc b/src/util.cc index 9cca1fd..7273671 100644 --- a/src/util.cc +++ b/src/util.cc @@ -595,10 +595,11 @@ logpfx(int severity, const char *fn, conn_t *conn) fprintf(log_dest, "[%s] ", sev_to_string(severity)); if (log_min_sev == LOG_SEV_DEBUG && fn) fprintf(log_dest, "%s: ", fn); - if (conn) - fprintf(log_dest, "<%u.%u> ", - conn->circuit ? conn->circuit->serial : 0, - conn->serial); + if (conn) { + circuit_t *ckt = conn->circuit(); + unsigned int ckt_serial = ckt ? ckt->serial : 0; + fprintf(log_dest, "<%u.%u> ", ckt_serial, conn->serial); + } }
/**** Public logging API. ****/