commit 1f960800bbbdd8dbb52de125375384bcaf45f000
Author: Zack Weinberg <zackw(a)cmu.edu>
Date: Mon Feb 27 16:34:13 2012 -0800
Remove ->circuit pointer from the generic conn_t.
Protocols are now expected to maintain a conn-to-circuit pointer
in the derived foo_conn_t. This allows it to have the correct
static type (i.e. foo_circuit_t) and thus eliminates a bunch of
casting back and forth.
---
src/connections.cc | 15 ++---
src/connections.h | 6 ++-
src/network.cc | 4 +-
src/protocol.h | 1 +
src/protocol/chop.cc | 171 +++++++++++++++++++++++++++----------------------
src/protocol/null.cc | 57 +++++++++++++----
src/util.cc | 9 ++-
7 files changed, 158 insertions(+), 105 deletions(-)
diff --git a/src/connections.cc b/src/connections.cc
index 6320746..37eaba3 100644
--- a/src/connections.cc
+++ b/src/connections.cc
@@ -115,10 +115,6 @@ conn_t::~conn_t()
log_debug(this, "closing connection; %lu remaining",
(unsigned long) connections.size());
- if (this->circuit) {
- circuit_drop_downstream(this->circuit, this);
- }
-
if (this->peername)
free((void *)this->peername);
if (this->buffer)
@@ -127,6 +123,13 @@ conn_t::~conn_t()
maybe_finish_shutdown();
}
+/** Potentially called during connection construction or destruction. */
+circuit_t *
+conn_t::circuit() const
+{
+ return 0;
+}
+
void
conn_close(conn_t *conn)
{
@@ -290,16 +293,12 @@ circuit_add_upstream(circuit_t *ckt, struct bufferevent *buf, const char *peer)
void
circuit_add_downstream(circuit_t *ckt, conn_t *down)
{
- log_assert(!down->circuit);
- down->circuit = ckt;
ckt->add_downstream(down);
}
void
circuit_drop_downstream(circuit_t *ckt, conn_t *down)
{
- log_assert(down->circuit == ckt);
- down->circuit = NULL;
ckt->drop_downstream(down);
}
diff --git a/src/connections.h b/src/connections.h
index 6c0c5f7..b0cac55 100644
--- a/src/connections.h
+++ b/src/connections.h
@@ -15,7 +15,6 @@
socket-level connections) as quickly as possible. */
struct conn_t {
config_t *cfg;
- circuit_t *circuit;
const char *peername;
struct bufferevent *buffer;
unsigned int serial;
@@ -25,6 +24,11 @@ struct conn_t {
conn_t() : connected(false), flushing(false) {}
virtual ~conn_t();
+ /** Return the upstream circuit for this connection, if there is one.
+ NOTE: this is *not* a pure virtual method because it can be called
+ legitimately after the subclass destructor has run. */
+ virtual circuit_t *circuit() const;
+
/** Create an upstream circuit for this connection, if it is
possible to do so without receiving data from the downstream
peer. If data must be received first, this method should do
diff --git a/src/network.cc b/src/network.cc
index a55e539..3ecb378 100644
--- a/src/network.cc
+++ b/src/network.cc
@@ -503,7 +503,7 @@ downstream_connect_cb(struct bufferevent *bev, short what, void *arg)
/* Upon successful connection, enable traffic on both sides of the
connection, and replace this callback with the regular event_cb */
if (what & BEV_EVENT_CONNECTED) {
- circuit_t *ckt = conn->circuit;
+ circuit_t *ckt = conn->circuit();
log_assert(ckt);
log_assert(ckt->up_peer);
log_assert(conn->buffer == bev);
@@ -539,7 +539,7 @@ static void
downstream_socks_connect_cb(struct bufferevent *bev, short what, void *arg)
{
conn_t *conn = (conn_t *)arg;
- circuit_t *ckt = conn->circuit;
+ circuit_t *ckt = conn->circuit();
socks_state_t *socks;
log_debug(conn, "what=%04hx", what);
diff --git a/src/protocol.h b/src/protocol.h
index c48238c..d28bf19 100644
--- a/src/protocol.h
+++ b/src/protocol.h
@@ -123,6 +123,7 @@ extern const proto_module *const supported_protos[];
#define CONN_DECLARE_METHODS(mod) \
mod##_conn_t(); \
virtual ~mod##_conn_t(); \
+ virtual circuit_t *circuit() const; \
virtual int maybe_open_upstream(); \
virtual int handshake(); \
virtual int recv(); \
diff --git a/src/protocol/chop.cc b/src/protocol/chop.cc
index 89a9ccc..536481c 100644
--- a/src/protocol/chop.cc
+++ b/src/protocol/chop.cc
@@ -71,11 +71,13 @@ static const uint8_t s2c_key[] =
/* Connections and circuits */
-typedef unordered_map<uint64_t, circuit_t *> chop_circuit_table;
-
namespace {
+ struct chop_circuit_t;
+ typedef unordered_map<uint64_t, chop_circuit_t *> chop_circuit_table;
+
struct chop_conn_t : conn_t
{
+ chop_circuit_t *upstream;
steg_t *steg;
struct evbuffer *recv_pending;
struct event *must_transmit_timer;
@@ -87,7 +89,7 @@ namespace {
struct chop_circuit_t : circuit_t
{
chop_reassembly_elt reassembly_queue;
- unordered_set<conn_t *> downstreams;
+ unordered_set<chop_conn_t *> downstreams;
encryptor *send_crypt;
decryptor *recv_crypt;
@@ -240,22 +242,22 @@ chop_decrypt_header(chop_circuit_t *ckt,
/* Transmit subroutines. */
-static conn_t *
+static chop_conn_t *
chop_pick_connection(chop_circuit_t *ckt, size_t desired, size_t *blocksize)
{
size_t maxbelow = 0;
size_t minabove = SIZE_MAX;
- conn_t *targbelow = NULL;
- conn_t *targabove = NULL;
+ chop_conn_t *targbelow = NULL;
+ chop_conn_t *targabove = NULL;
if (desired > CHOP_MAX_DATA)
desired = CHOP_MAX_DATA;
/* Find the best fit for the desired transmission from all the
outbound connections' transmit rooms. */
- for (unordered_set<conn_t *>::iterator i = ckt->downstreams.begin();
+ for (unordered_set<chop_conn_t *>::iterator i = ckt->downstreams.begin();
i != ckt->downstreams.end(); i++) {
- chop_conn_t *conn = static_cast<chop_conn_t *>(*i);
+ chop_conn_t *conn = *i;
/* We can only use candidates that have a steg target already. */
if (conn->steg) {
/* Find the connections whose transmit rooms are closest to the
@@ -304,14 +306,13 @@ chop_pick_connection(chop_circuit_t *ckt, size_t desired, size_t *blocksize)
}
static int
-chop_send_block(conn_t *d,
+chop_send_block(chop_conn_t *dest,
chop_circuit_t *ckt,
struct evbuffer *source,
struct evbuffer *block,
uint16_t length,
uint16_t flags)
{
- chop_conn_t *dest = static_cast<chop_conn_t *>(d);
chop_header hdr;
struct evbuffer_iovec v;
uint8_t *p;
@@ -381,18 +382,17 @@ chop_send_block(conn_t *d,
}
static int
-chop_send_blocks(circuit_t *c)
+chop_send_blocks(chop_circuit_t *ckt)
{
- chop_circuit_t *ckt = static_cast<chop_circuit_t *>(c);
- struct evbuffer *xmit_pending = bufferevent_get_input(c->up_buffer);
+ struct evbuffer *xmit_pending = bufferevent_get_input(ckt->up_buffer);
struct evbuffer *block;
- conn_t *target;
+ chop_conn_t *target;
size_t avail;
size_t blocksize;
uint16_t flags;
if (!(block = evbuffer_new())) {
- log_warn(c, "allocation failure");
+ log_warn(ckt, "allocation failure");
return -1;
}
@@ -400,14 +400,14 @@ chop_send_blocks(circuit_t *c)
avail = evbuffer_get_length(xmit_pending);
flags = ckt->sent_syn ? 0 : CHOP_F_SYN;
- log_debug(c, "%lu bytes to send", (unsigned long)avail);
+ log_debug(ckt, "%lu bytes to send", (unsigned long)avail);
if (avail == 0)
break;
target = chop_pick_connection(ckt, avail, &blocksize);
if (!target) {
- log_debug(c, "no target connection available");
+ log_debug(ckt, "no target connection available");
/* this is not an error; it can happen e.g. when the server has
something to send immediately and the client hasn't spoken yet */
break;
@@ -428,15 +428,14 @@ chop_send_blocks(circuit_t *c)
evbuffer_free(block);
avail = evbuffer_get_length(xmit_pending);
if (avail)
- log_debug(c, "%lu bytes still waiting to be sent", (unsigned long)avail);
+ log_debug(ckt, "%lu bytes still waiting to be sent", (unsigned long)avail);
return 0;
}
static int
-chop_send_targeted(circuit_t *c, conn_t *target, size_t blocksize)
+chop_send_targeted(chop_circuit_t *ckt, chop_conn_t *target, size_t blocksize)
{
- chop_circuit_t *ckt = static_cast<chop_circuit_t *>(c);
- struct evbuffer *xmit_pending = bufferevent_get_input(c->up_buffer);
+ struct evbuffer *xmit_pending = bufferevent_get_input(ckt->up_buffer);
size_t avail = evbuffer_get_length(xmit_pending);
struct evbuffer *block = evbuffer_new();
uint16_t flags = 0;
@@ -467,7 +466,8 @@ chop_send_targeted(circuit_t *c, conn_t *target, size_t blocksize)
evbuffer_free(block);
avail = evbuffer_get_length(xmit_pending);
if (avail)
- log_debug(c, "%lu bytes still waiting to be sent", (unsigned long)avail);
+ log_debug(ckt, "%lu bytes still waiting to be sent",
+ (unsigned long)avail);
return 0;
} else {
@@ -510,18 +510,17 @@ chop_send_targeted(circuit_t *c, conn_t *target, size_t blocksize)
}
static int
-chop_send_chaff(circuit_t *c)
+chop_send_chaff(chop_circuit_t *ckt)
{
- chop_circuit_t *ckt = static_cast<chop_circuit_t *>(c);
size_t room;
- conn_t *target = chop_pick_connection(ckt, 1, &room);
+ chop_conn_t *target = chop_pick_connection(ckt, 1, &room);
if (!target) {
/* If we have connections and we can't send, that means we're waiting
for the server to respond. Just wait. */
return 0;
}
- return chop_send_targeted(c, target, room);
+ return chop_send_targeted(ckt, target, room);
}
static void
@@ -530,7 +529,7 @@ must_transmit_timer_cb(evutil_socket_t, short, void *arg)
chop_conn_t *conn = static_cast<chop_conn_t*>(arg);
size_t room;
- if (!conn->circuit) {
+ if (!conn->upstream) {
log_debug(conn, "must transmit, but no circuit (stale connection)");
conn_do_flush(conn);
return;
@@ -547,7 +546,7 @@ must_transmit_timer_cb(evutil_socket_t, short, void *arg)
}
log_debug(conn, "must transmit");
- chop_send_targeted(conn->circuit, conn, room - CHOP_BLOCK_OVERHD);
+ chop_send_targeted(conn->upstream, conn, room - CHOP_BLOCK_OVERHD);
}
/* Receive subroutines. */
@@ -571,9 +570,9 @@ mod32_le(uint32_t s, uint32_t t)
/** Add BLOCK to the reassembly queue at the appropriate location
and merge adjacent blocks to the extent possible. */
static int
-chop_reassemble_block(circuit_t *c, struct evbuffer *block, chop_header *hdr)
+chop_reassemble_block(chop_circuit_t *ckt, struct evbuffer *block,
+ chop_header *hdr)
{
- chop_circuit_t *ckt = static_cast<chop_circuit_t *>(c);
chop_reassembly_elt *queue = &ckt->reassembly_queue;
chop_reassembly_elt *p, *q;
@@ -583,14 +582,14 @@ chop_reassemble_block(circuit_t *c, struct evbuffer *block, chop_header *hdr)
contents. Doing all chaff-handling here simplifies the caller
at the expense of slightly more buffer-management overhead. */
if (!(hdr->flags & (CHOP_F_SYN|CHOP_F_FIN))) {
- log_debug(c, "discarding chaff with no flags");
+ log_debug(ckt, "discarding chaff with no flags");
evbuffer_free(block);
return 0;
}
hdr->length = 0;
evbuffer_drain(block, evbuffer_get_length(block));
- log_debug(c, "chaff with flags, treating length as 0");
+ log_debug(ckt, "chaff with flags, treating length as 0");
}
/* SYN must occur at offset zero, may not be duplicated, and if we
@@ -601,7 +600,7 @@ chop_reassemble_block(circuit_t *c, struct evbuffer *block, chop_header *hdr)
(queue->next != queue &&
((queue->next->flags & CHOP_F_SYN) ||
!mod32_le(hdr->offset + hdr->length, queue->next->offset))))) {
- log_warn(c, "protocol error: inappropriate SYN block");
+ log_warn(ckt, "protocol error: inappropriate SYN block");
return -1;
}
@@ -610,7 +609,7 @@ chop_reassemble_block(circuit_t *c, struct evbuffer *block, chop_header *hdr)
if ((hdr->flags & CHOP_F_FIN) && queue->prev != queue &&
((queue->prev->flags & CHOP_F_FIN) ||
!mod32_le(queue->prev->offset + queue->prev->length, hdr->offset))) {
- log_warn(c, "protocol error: inappropriate FIN block");
+ log_warn(ckt, "protocol error: inappropriate FIN block");
return -1;
}
@@ -621,7 +620,7 @@ chop_reassemble_block(circuit_t *c, struct evbuffer *block, chop_header *hdr)
!mod32_le(queue->next->offset + queue->next->length, hdr->offset)) ||
((queue->prev->flags & CHOP_F_FIN) &&
!mod32_le(hdr->offset + hdr->length, queue->prev->offset)))) {
- log_warn(c, "protocol error: inappropriate normal block");
+ log_warn(ckt, "protocol error: inappropriate normal block");
return -1;
}
@@ -643,7 +642,7 @@ chop_reassemble_block(circuit_t *c, struct evbuffer *block, chop_header *hdr)
/* protocol error: this block goes before 'p' but does not fit
after 'p->prev' */
- log_warn(c, "protocol error: %u byte block does not fit at offset %u",
+ log_warn(ckt, "protocol error: %u byte block does not fit at offset %u",
hdr->length, hdr->offset);
return -1;
}
@@ -654,7 +653,7 @@ chop_reassemble_block(circuit_t *c, struct evbuffer *block, chop_header *hdr)
that this block goes after the last block in the list (aka p->prev). */
if (!p->data && p->prev->data &&
!mod32_lt(p->prev->offset + p->prev->length, hdr->offset)) {
- log_warn(c, "protocol error: %u byte block does not fit at offset %u "
+ log_warn(ckt, "protocol error: %u byte block does not fit at offset %u "
"(sentinel case)",
hdr->length, hdr->offset);
return -1;
@@ -674,7 +673,7 @@ chop_reassemble_block(circuit_t *c, struct evbuffer *block, chop_header *hdr)
grow_back:
if (evbuffer_add_buffer(p->data, block)) {
- log_warn(c, "failed to append to existing buffer");
+ log_warn(ckt, "failed to append to existing buffer");
return -1;
}
evbuffer_free(block);
@@ -685,7 +684,7 @@ chop_reassemble_block(circuit_t *c, struct evbuffer *block, chop_header *hdr)
while (p->next->data && p->offset + p->length == p->next->offset) {
q = p->next;
if (evbuffer_add_buffer(p->data, q->data)) {
- log_warn(c, "failed to merge buffers");
+ log_warn(ckt, "failed to merge buffers");
return -1;
}
p->length += q->length;
@@ -700,7 +699,7 @@ chop_reassemble_block(circuit_t *c, struct evbuffer *block, chop_header *hdr)
grow_front:
if (evbuffer_prepend_buffer(p->data, block)) {
- log_warn(c, "failed to prepend to existing buffer");
+ log_warn(ckt, "failed to prepend to existing buffer");
return -1;
}
evbuffer_free(block);
@@ -712,7 +711,7 @@ chop_reassemble_block(circuit_t *c, struct evbuffer *block, chop_header *hdr)
while (p->prev->data && p->offset == p->prev->offset + p->prev->length) {
q = p->prev;
if (evbuffer_prepend_buffer(p->data, q->data)) {
- log_warn(c, "failed to merge buffers");
+ log_warn(ckt, "failed to merge buffers");
return -1;
}
p->length += q->length;
@@ -730,31 +729,31 @@ chop_reassemble_block(circuit_t *c, struct evbuffer *block, chop_header *hdr)
/* Flush as much data toward upstream as we can. */
static int
-chop_push_to_upstream(circuit_t *c)
+chop_push_to_upstream(chop_circuit_t *ckt)
{
- chop_circuit_t *ckt = static_cast<chop_circuit_t *>(c);
/* Only the first reassembly queue entry, if any, can possibly be
ready to flush (because chop_reassemble_block ensures that there
are gaps between all queue elements). */
chop_reassembly_elt *ready = ckt->reassembly_queue.next;
if (!ready->data || ckt->recv_offset != ready->offset) {
- log_debug(c, "no data pushable to upstream yet");
+ log_debug(ckt, "no data pushable to upstream yet");
return 0;
}
if (!ckt->received_syn) {
if (!(ready->flags & CHOP_F_SYN)) {
- log_debug(c, "waiting for SYN");
+ log_debug(ckt, "waiting for SYN");
return 0;
}
- log_debug(c, "processed SYN");
+ log_debug(ckt, "processed SYN");
ckt->received_syn = true;
}
- log_debug(c, "can push %lu bytes to upstream",
+ log_debug(ckt, "can push %lu bytes to upstream",
(unsigned long)evbuffer_get_length(ready->data));
- if (evbuffer_add_buffer(bufferevent_get_output(c->up_buffer), ready->data)) {
- log_warn(c, "failure pushing data to upstream");
+ if (evbuffer_add_buffer(bufferevent_get_output(ckt->up_buffer),
+ ready->data)) {
+ log_warn(ckt, "failure pushing data to upstream");
return -1;
}
@@ -765,8 +764,8 @@ chop_push_to_upstream(circuit_t *c)
log_assert(!ckt->received_fin);
log_assert(ready->next == &ckt->reassembly_queue);
ckt->received_fin = true;
- log_debug(c, "processed FIN");
- circuit_recv_eof(c);
+ log_debug(ckt, "processed FIN");
+ circuit_recv_eof(ckt);
}
log_assert(ready->next == &ckt->reassembly_queue ||
@@ -782,12 +781,12 @@ chop_push_to_upstream(circuit_t *c)
/* Circuit handling */
static int
-chop_find_or_make_circuit(conn_t *conn, uint64_t circuit_id)
+chop_find_or_make_circuit(chop_conn_t *conn, uint64_t circuit_id)
{
chop_config_t *cfg = static_cast<chop_config_t *>(conn->cfg);
chop_circuit_table::value_type in(circuit_id, 0);
std::pair<chop_circuit_table::iterator, bool> out = cfg->circuits.insert(in);
- circuit_t *ck;
+ chop_circuit_t *ck;
if (!out.second) { // element already exists
if (!out.first->second) {
@@ -797,7 +796,7 @@ chop_find_or_make_circuit(conn_t *conn, uint64_t circuit_id)
ck = out.first->second;
log_debug(conn, "found circuit to %s", ck->up_peer);
} else {
- ck = circuit_create(cfg, 0);
+ ck = dynamic_cast<chop_circuit_t *>(circuit_create(cfg, 0));
if (!ck) {
log_warn(conn, "failed to create new circuit");
return -1;
@@ -808,7 +807,7 @@ chop_find_or_make_circuit(conn_t *conn, uint64_t circuit_id)
return -1;
}
log_debug(conn, "created new circuit to %s", ck->up_peer);
- static_cast<chop_circuit_t *>(ck)->circuit_id = circuit_id;
+ ck->circuit_id = circuit_id;
out.first->second = ck;
}
@@ -978,10 +977,10 @@ chop_circuit_t::~chop_circuit_t()
upstream_eof ? '+' : '-',
(unsigned long)downstreams.size());
- for (unordered_set<conn_t *>::iterator i = this->downstreams.begin();
+ for (unordered_set<chop_conn_t *>::iterator i = this->downstreams.begin();
i != this->downstreams.end(); i++) {
- conn_t *conn = *i;
- conn->circuit = NULL;
+ chop_conn_t *conn = *i;
+ conn->upstream = NULL;
if (evbuffer_get_length(conn_get_outbound(conn)) > 0)
conn_do_flush(conn);
else
@@ -1016,9 +1015,15 @@ chop_circuit_t::~chop_circuit_t()
}
void
-chop_circuit_t::add_downstream(conn_t *conn)
+chop_circuit_t::add_downstream(conn_t *cn)
{
+ chop_conn_t *conn = dynamic_cast<chop_conn_t *>(cn);
+ log_assert(conn);
+ log_assert(!conn->upstream);
+
+ conn->upstream = this;
this->downstreams.insert(conn);
+
log_debug(this, "added connection <%d.%d> to %s, now %lu",
this->serial, conn->serial, conn->peername,
(unsigned long)this->downstreams.size());
@@ -1027,9 +1032,15 @@ chop_circuit_t::add_downstream(conn_t *conn)
}
void
-chop_circuit_t::drop_downstream(conn_t *conn)
+chop_circuit_t::drop_downstream(conn_t *cn)
{
+ chop_conn_t *conn = dynamic_cast<chop_conn_t *>(cn);
+ log_assert(conn);
+ log_assert(conn->upstream == this);
+
+ conn->upstream = NULL;
this->downstreams.erase(conn);
+
log_debug(this, "dropped connection <%d.%d> to %s, now %lu",
this->serial, conn->serial, conn->peername,
(unsigned long)this->downstreams.size());
@@ -1078,6 +1089,8 @@ chop_conn_t::chop_conn_t()
chop_conn_t::~chop_conn_t()
{
+ if (this->upstream)
+ circuit_drop_downstream(this->upstream, this);
if (this->steg)
delete this->steg;
if (this->must_transmit_timer)
@@ -1085,6 +1098,12 @@ chop_conn_t::~chop_conn_t()
evbuffer_free(this->recv_pending);
}
+circuit_t *
+chop_conn_t::circuit() const
+{
+ return this->upstream;
+}
+
int
chop_conn_t::maybe_open_upstream()
{
@@ -1104,7 +1123,7 @@ chop_conn_t::handshake()
server can't even _connect to its upstream_ till it gets the
first packet from the client. */
if (this->cfg->mode != LSN_SIMPLE_SERVER)
- circuit_arm_flush_timer(this->circuit, 1);
+ circuit_arm_flush_timer(this->upstream, 1);
return 0;
}
@@ -1143,9 +1162,9 @@ chop_circuit_t::send()
as long as we haven't both sent and received a FIN, or we might
deadlock. */
if (this->sent_fin && this->received_fin) {
- for (unordered_set<conn_t *>::iterator i = this->downstreams.begin();
+ for (unordered_set<chop_conn_t *>::iterator i = this->downstreams.begin();
i != this->downstreams.end(); i++) {
- chop_conn_t *conn = static_cast<chop_conn_t*>(*i);
+ chop_conn_t *conn = *i;
if (conn->must_transmit_timer &&
evtimer_pending(conn->must_transmit_timer, NULL))
must_transmit_timer_cb(-1, 0, conn);
@@ -1168,7 +1187,6 @@ chop_circuit_t::send_eof()
int
chop_conn_t::recv()
{
- circuit_t *c;
chop_circuit_t *ckt;
chop_header hdr;
struct evbuffer *block;
@@ -1178,7 +1196,7 @@ chop_conn_t::recv()
if (this->steg->receive(this, this->recv_pending))
return -1;
- if (!this->circuit) {
+ if (!this->upstream) {
log_debug(this, "finding circuit");
if (chop_peek_circuit_id(this->recv_pending, &hdr)) {
log_debug(this, "not enough data to find circuit yet");
@@ -1193,7 +1211,7 @@ chop_conn_t::recv()
it's either chaff or a protocol error; either way we can just
discard it. Since we will never reply, call conn_do_flush so
the connection will be dropped as soon as we receive an EOF. */
- if (!this->circuit) {
+ if (!this->upstream) {
evbuffer_drain(this->recv_pending,
evbuffer_get_length(this->recv_pending));
conn_do_flush(this);
@@ -1201,9 +1219,8 @@ chop_conn_t::recv()
}
}
- c = this->circuit;
- ckt = static_cast<chop_circuit_t *>(c);
- log_debug(this, "circuit to %s", c->up_peer);
+ ckt = this->upstream;
+ log_debug(this, "circuit to %s", ckt->up_peer);
for (;;) {
avail = evbuffer_get_length(this->recv_pending);
@@ -1269,18 +1286,18 @@ chop_conn_t::recv()
return -1;
}
- if (chop_reassemble_block(c, block, &hdr)) {
+ if (chop_reassemble_block(ckt, block, &hdr)) {
evbuffer_free(block);
return -1;
}
}
- if (chop_push_to_upstream(c))
+ if (chop_push_to_upstream(ckt))
return -1;
/* It may have now become possible to send queued data. */
- if (evbuffer_get_length(bufferevent_get_input(c->up_buffer)))
- c->send();
+ if (evbuffer_get_length(bufferevent_get_input(ckt->up_buffer)))
+ ckt->send();
/* If we're at EOF, close all connections (sending first if
necessary). If we're the client we have to keep trying to talk
@@ -1288,9 +1305,9 @@ chop_conn_t::recv()
deadlock. */
else if (ckt->sent_fin && ckt->received_fin) {
circuit_disarm_flush_timer(ckt);
- for (unordered_set<conn_t *>::iterator i = ckt->downstreams.begin();
+ for (unordered_set<chop_conn_t *>::iterator i = ckt->downstreams.begin();
i != ckt->downstreams.end(); i++) {
- chop_conn_t *conn = static_cast<chop_conn_t*>(*i);
+ chop_conn_t *conn = *i;
if (conn->must_transmit_timer &&
evtimer_pending(conn->must_transmit_timer, NULL))
must_transmit_timer_cb(-1, 0, conn);
@@ -1312,8 +1329,8 @@ chop_conn_t::recv_eof()
We should only drop the connection from the circuit if we're no
longer sending in the opposite direction. Also, we should not
drop the connection if its must-transmit timer is still pending. */
- if (this->circuit) {
- chop_circuit_t *ckt = static_cast<chop_circuit_t *>(this->circuit);
+ if (this->upstream) {
+ chop_circuit_t *ckt = this->upstream;
if (evbuffer_get_length(conn_get_inbound(this)) > 0)
if (this->recv())
diff --git a/src/protocol/null.cc b/src/protocol/null.cc
index 27a9320..be89821 100644
--- a/src/protocol/null.cc
+++ b/src/protocol/null.cc
@@ -9,19 +9,26 @@
#include <event2/buffer.h>
namespace {
- struct null_config_t : config_t {
+ struct null_config_t : config_t
+ {
struct evutil_addrinfo *listen_addr;
struct evutil_addrinfo *target_addr;
CONFIG_DECLARE_METHODS(null);
};
- struct null_conn_t : conn_t {
+ struct null_circuit_t;
+
+ struct null_conn_t : conn_t
+ {
+ null_circuit_t *upstream;
+
CONN_DECLARE_METHODS(null);
};
- struct null_circuit_t : circuit_t {
- conn_t *downstream;
+ struct null_circuit_t : circuit_t
+ {
+ null_conn_t *downstream;
CIRCUIT_DECLARE_METHODS(null);
};
@@ -112,12 +119,13 @@ null_config_t::get_target_addrs(size_t n)
circuit_t *
null_config_t::circuit_create(size_t)
{
- circuit_t *ckt = new null_circuit_t;
+ null_circuit_t *ckt = new null_circuit_t;
ckt->cfg = this;
return ckt;
}
null_circuit_t::null_circuit_t()
+ : downstream(NULL)
{
}
@@ -126,17 +134,23 @@ null_circuit_t::~null_circuit_t()
if (downstream) {
/* break the circular reference before deallocating the
downstream connection */
- downstream->circuit = NULL;
+ downstream->upstream = NULL;
delete downstream;
}
}
/* Add a connection to this circuit. */
void
-null_circuit_t::add_downstream(conn_t *conn)
+null_circuit_t::add_downstream(conn_t *cn)
{
+ null_conn_t *conn = dynamic_cast<null_conn_t *>(cn);
+ log_assert(conn);
+ log_assert(!conn->upstream);
log_assert(!this->downstream);
+
this->downstream = conn;
+ conn->upstream = this;
+
log_debug(this, "added connection <%d.%d> to %s",
this->serial, conn->serial, conn->peername);
}
@@ -145,12 +159,18 @@ null_circuit_t::add_downstream(conn_t *conn)
protocol, it is because of a network error, and the whole circuit
should be closed. */
void
-null_circuit_t::drop_downstream(conn_t *conn)
+null_circuit_t::drop_downstream(conn_t *cn)
{
+ null_conn_t *conn = dynamic_cast<null_conn_t *>(cn);
+ log_assert(conn);
log_assert(this->downstream == conn);
+ log_assert(conn->upstream == this);
+
log_debug(this, "dropped connection <%d.%d> to %s",
this->serial, conn->serial, conn->peername);
this->downstream = NULL;
+ conn->upstream = NULL;
+
if (evbuffer_get_length(bufferevent_get_output(this->up_buffer)) > 0)
/* this may already have happened, but there's no harm in
doing it again */
@@ -190,18 +210,29 @@ null_config_t::conn_create(size_t)
}
null_conn_t::null_conn_t()
+ : upstream(NULL)
{
}
null_conn_t::~null_conn_t()
{
+ if (this->upstream)
+ circuit_drop_downstream(this->upstream, this);
+}
+
+/* Used by the connection callbacks (network.cc) and by logpfx (util.cc) */
+circuit_t *
+null_conn_t::circuit() const
+{
+ return upstream;
}
/** Null inbound-to-outbound connections are 1:1 */
int
null_conn_t::maybe_open_upstream()
{
- circuit_t *ckt = circuit_create(this->cfg, 0);
+ null_circuit_t *ckt = dynamic_cast<null_circuit_t *>
+ (circuit_create(this->cfg, 0));
if (!ckt)
return -1;
@@ -221,8 +252,8 @@ null_conn_t::handshake()
int
null_conn_t::recv()
{
- log_assert(this->circuit);
- return evbuffer_add_buffer(bufferevent_get_output(this->circuit->up_buffer),
+ log_assert(this->upstream);
+ return evbuffer_add_buffer(bufferevent_get_output(this->upstream->up_buffer),
conn_get_inbound(this));
}
@@ -230,12 +261,12 @@ null_conn_t::recv()
int
null_conn_t::recv_eof()
{
- if (this->circuit) {
+ if (this->upstream) {
if (evbuffer_get_length(conn_get_inbound(this)) > 0)
if (this->recv())
return -1;
- circuit_recv_eof(this->circuit);
+ circuit_recv_eof(this->upstream);
}
return 0;
}
diff --git a/src/util.cc b/src/util.cc
index 9cca1fd..7273671 100644
--- a/src/util.cc
+++ b/src/util.cc
@@ -595,10 +595,11 @@ logpfx(int severity, const char *fn, conn_t *conn)
fprintf(log_dest, "[%s] ", sev_to_string(severity));
if (log_min_sev == LOG_SEV_DEBUG && fn)
fprintf(log_dest, "%s: ", fn);
- if (conn)
- fprintf(log_dest, "<%u.%u> ",
- conn->circuit ? conn->circuit->serial : 0,
- conn->serial);
+ if (conn) {
+ circuit_t *ckt = conn->circuit();
+ unsigned int ckt_serial = ckt ? ckt->serial : 0;
+ fprintf(log_dest, "<%u.%u> ", ckt_serial, conn->serial);
+ }
}
/**** Public logging API. ****/