[obfsproxy/master] A conn_t now wraps only one socket. A new struct, circuit_t, associates the two sides of a proxy channel.

commit 99dee61ff3aa70aab43b9543a8fd0c58858f0dcd Author: Zack Weinberg <zackw@panix.com> Date: Wed Aug 3 11:20:17 2011 -0700 A conn_t now wraps only one socket. A new struct, circuit_t, associates the two sides of a proxy channel. --- src/network.c | 538 ++++++++++++++++++++++++++++++++------------------------ src/network.h | 32 +++- src/util.h | 1 + 3 files changed, 331 insertions(+), 240 deletions(-) diff --git a/src/network.c b/src/network.c index 4ae2c15..08ce696 100644 --- a/src/network.c +++ b/src/network.c @@ -20,25 +20,27 @@ /* Terminology used in this file: - A "side" is a bidirectional communications channel, usually backed - by a network socket and represented at this layer by a - 'struct bufferevent'. - - A "connection" is a _pair_ of sides, referred to as the "upstream" - side and the "downstream" side. A connection is represented by a - 'conn_t'. The upstream side of a connection communicates in - cleartext with the higher-level program that wishes to make use of - our obfuscation service. The downstream side commmunicates in an - obfuscated fashion with the remote peer that the higher-level - client wishes to contact. + A "connection" is a bidirectional communications channel, usually + backed by a network socket, and represented in this layer by a + 'conn_t', wrapping a 'struct bufferevent'. + + A "circuit" is a _pair_ of connections, referred to as the + "upstream" and "downstream" connections. A circuit is represented + by a 'circuit_t'. The upstream connection of a circuit + communicates in cleartext with the higher-level program that wishes + to make use of our obfuscation service. The downstream connection + commmunicates in an obfuscated fashion with the remote peer that + the higher-level client wishes to contact. A "listener" is a listening socket bound to a particular - obfuscation protocol, represented in this layer by a 'listener_t'. - Connecting to a listener creates one side of a connection, and - causes this program to initiate the other side of the connection. - A listener is said to be a "client" listener if connecting to it - creates the _upstream_ side of a connection, and a "server" - listener if connecting to it creates the _downstream_ side. + obfuscation protocol, represented in this layer by a 'listener_t' + and its 'config_t'. Connecting to a listener creates one + connection of a circuit, and causes this program to initiate the + other connection (possibly after receiving in-band instructions + about where to connect to). A listener is said to be a "client" + listener if connecting to it creates the _upstream_ connection, and + a "server" listener if connecting to it creates the _downstream_ + connection. There are two kinds of client listeners: a "simple" client listener always connects to the same remote peer every time it needs to @@ -72,19 +74,17 @@ static void listener_cb(struct evconnlistener *evcl, evutil_socket_t fd, struct sockaddr *sourceaddr, int socklen, void *closure); -static void simple_client_listener_cb(conn_t *conn, struct bufferevent *buf); -static void socks_client_listener_cb(conn_t *conn, struct bufferevent *buf); -static void simple_server_listener_cb(conn_t *conn, struct bufferevent *buf); +static void simple_client_listener_cb(conn_t *conn); +static void socks_client_listener_cb(conn_t *conn); +static void simple_server_listener_cb(conn_t *conn); static void conn_free(conn_t *conn); -static void close_conn(conn_t *conn); -static void close_all_connections(void); +static void conn_free_all(void); +static void conn_free_on_flush(struct bufferevent *bev, void *arg); -static void close_conn_on_flush(struct bufferevent *bev, void *arg); - -static struct bufferevent *open_outbound_socket(conn_t *conn, - struct event_base *base, - bufferevent_data_cb readcb); +static int circuit_create(conn_t *upstream, conn_t *downstream); +static void circuit_free(circuit_t *circuit); +static conn_t *open_outbound(conn_t *conn, bufferevent_data_cb readcb); static void upstream_read_cb(struct bufferevent *bev, void *arg); static void downstream_read_cb(struct bufferevent *bev, void *arg); @@ -114,7 +114,7 @@ start_shutdown(int barbaric) shutting_down=1; if (barbaric) - close_all_connections(); + conn_free_all(); if (connections && smartlist_len(connections) == 0) { smartlist_free(connections); @@ -129,7 +129,7 @@ start_shutdown(int barbaric) Closes all open connections. */ static void -close_all_connections(void) +conn_free_all(void) { if (!connections) return; @@ -251,10 +251,11 @@ listener_cb(struct evconnlistener *evcl, evutil_socket_t fd, smartlist_len(connections)); conn->peername = peername; + conn->buffer = buf; switch (conn->mode) { - case LSN_SIMPLE_CLIENT: simple_client_listener_cb(conn, buf); break; - case LSN_SOCKS_CLIENT: socks_client_listener_cb(conn, buf); break; - case LSN_SIMPLE_SERVER: simple_server_listener_cb(conn, buf); break; + case LSN_SIMPLE_CLIENT: simple_client_listener_cb(conn); break; + case LSN_SOCKS_CLIENT: socks_client_listener_cb(conn); break; + case LSN_SIMPLE_SERVER: simple_server_listener_cb(conn); break; default: obfs_abort(); } @@ -265,23 +266,16 @@ listener_cb(struct evconnlistener *evcl, evutil_socket_t fd, simple client mode. */ static void -simple_client_listener_cb(conn_t *conn, struct bufferevent *buf) +simple_client_listener_cb(conn_t *conn) { - struct event_base *base = bufferevent_get_base(buf); - obfs_assert(buf); obfs_assert(conn); obfs_assert(conn->mode == LSN_SIMPLE_CLIENT); log_debug("%s: simple client connection", conn->peername); - conn->upstream = buf; - bufferevent_setcb(conn->upstream, upstream_read_cb, NULL, error_cb, conn); - - /* Don't enable the upstream side for reading at this point; wait - till the downstream side is established. */ + bufferevent_setcb(conn->buffer, upstream_read_cb, NULL, error_cb, conn); - conn->downstream = open_outbound_socket(conn, base, downstream_read_cb); - if (!conn->downstream) { - close_conn(conn); + if (circuit_create(conn, open_outbound(conn, downstream_read_cb))) { + conn_free(conn); return; } @@ -293,22 +287,19 @@ simple_client_listener_cb(conn_t *conn, struct bufferevent *buf) socks mode. */ static void -socks_client_listener_cb(conn_t *conn, struct bufferevent *buf) +socks_client_listener_cb(conn_t *conn) { - obfs_assert(buf); obfs_assert(conn); obfs_assert(conn->mode == LSN_SOCKS_CLIENT); log_debug("%s: socks client connection", conn->peername); - conn->upstream = buf; - bufferevent_setcb(conn->upstream, socks_read_cb, NULL, error_cb, conn); - bufferevent_enable(conn->upstream, EV_READ|EV_WRITE); - - /* Construct SOCKS state. */ conn->socks_state = socks_state_new(); - /* Do not create a downstream bufferevent at this time; the socks - handler will do it after it learns the downstream peer address. */ + bufferevent_setcb(conn->buffer, socks_read_cb, NULL, error_cb, conn); + bufferevent_enable(conn->buffer, EV_READ|EV_WRITE); + + /* Do not create a circuit at this time; the socks handler will do + it after it learns the remote peer address. */ log_debug("%s: setup complete", conn->peername); } @@ -318,25 +309,16 @@ socks_client_listener_cb(conn_t *conn, struct bufferevent *buf) server mode. */ static void -simple_server_listener_cb(conn_t *conn, struct bufferevent *buf) +simple_server_listener_cb(conn_t *conn) { - struct event_base *base = bufferevent_get_base(buf); - obfs_assert(buf); obfs_assert(conn); obfs_assert(conn->mode == LSN_SIMPLE_SERVER); log_debug("%s: server connection", conn->peername); - conn->downstream = buf; - bufferevent_setcb(conn->downstream, - downstream_read_cb, NULL, error_cb, conn); - - /* Don't enable the downstream side for reading at this point; wait - till the upstream side is established. */ + bufferevent_setcb(conn->buffer, downstream_read_cb, NULL, error_cb, conn); - /* New bufferevent to connect to the target address. */ - conn->upstream = open_outbound_socket(conn, base, upstream_read_cb); - if (!conn->upstream) { - close_conn(conn); + if (circuit_create(open_outbound(conn, upstream_read_cb), conn)) { + conn_free(conn); return; } @@ -349,36 +331,29 @@ simple_server_listener_cb(conn_t *conn, struct bufferevent *buf) static void conn_free(conn_t *conn) { - if (conn->peername) - free(conn->peername); - if (conn->socks_state) - socks_state_free(conn->socks_state); - if (conn->upstream) - bufferevent_free(conn->upstream); - if (conn->downstream) - bufferevent_free(conn->downstream); - - proto_conn_free(conn); -} - -/** - Closes a fully open connection. -*/ -static void -close_conn(conn_t *conn) -{ - obfs_assert(connections); - log_debug("Closing connection from %s; %d remaining", - conn->peername, smartlist_len(connections) - 1); - - smartlist_remove(connections, conn); - conn_free(conn); - - /* If this was the last connection AND we are shutting down, - finish shutdown. */ - if (smartlist_len(connections) == 0 && shutting_down) { - smartlist_free(connections); - finish_shutdown(); + if (conn->circuit) + circuit_free(conn->circuit); /* will recurse and take care of us */ + else { + if (connections) { + smartlist_remove(connections, conn); + log_debug("Closing connection with %s; %d remaining", + conn->peername, smartlist_len(connections)); + } + if (conn->peername) + free(conn->peername); + if (conn->socks_state) + socks_state_free(conn->socks_state); + if (conn->buffer) + bufferevent_free(conn->buffer); + proto_conn_free(conn); + + /* If this was the last connection AND we are shutting down, + finish shutdown. */ + if (shutting_down && (!connections || smartlist_len(connections) == 0)) { + if (connections) + smartlist_free(connections); + finish_shutdown(); + } } } @@ -387,25 +362,51 @@ close_conn(conn_t *conn) empty. */ static void -close_conn_on_flush(struct bufferevent *bev, void *arg) +conn_free_on_flush(struct bufferevent *bev, void *arg) { conn_t *conn = arg; log_debug("%s for %s", __func__, conn->peername); if (evbuffer_get_length(bufferevent_get_output(bev)) == 0) - close_conn(conn); + conn_free(conn); +} + +static int +circuit_create(conn_t *up, conn_t *down) +{ + if (!up || !down) + return -1; + + circuit_t *r = xzalloc(sizeof(circuit_t)); + r->upstream = up; + r->downstream = down; + up->circuit = r; + down->circuit = r; + return 0; +} + +static void +circuit_free(circuit_t *circuit) +{ + /* break the circular references before deallocating each side */ + circuit->upstream->circuit = NULL; + circuit->downstream->circuit = NULL; + conn_free(circuit->upstream); + conn_free(circuit->downstream); + free(circuit); } /** - Make the outbound socket for a connection. + Make the outbound connection for a circuit. */ -static struct bufferevent * -open_outbound_socket(conn_t *conn, struct event_base *base, - bufferevent_data_cb readcb) +static conn_t * +open_outbound(conn_t *conn, bufferevent_data_cb readcb) { struct evutil_addrinfo *addr = config_get_target_addr(conn->cfg); + struct event_base *base = bufferevent_get_base(conn->buffer); struct bufferevent *buf; char *peername; + conn_t *newconn; if (!addr) { log_warn("%s: no target addresses available", conn->peername); @@ -418,18 +419,23 @@ open_outbound_socket(conn_t *conn, struct event_base *base, return NULL; } - bufferevent_setcb(buf, readcb, NULL, pending_conn_cb, conn); + newconn = proto_conn_create(conn->cfg); + if (!conn) { + log_warn("%s: failed to allocate state for outbound connection", + conn->peername); + bufferevent_free(buf); + return NULL; + } + + newconn->buffer = buf; + bufferevent_setcb(buf, readcb, NULL, pending_conn_cb, newconn); do { peername = printable_address(addr->ai_addr, addr->ai_addrlen); log_info("%s (%s): trying to connect to %s", conn->peername, conn->cfg->vtable->name, peername); - if (bufferevent_socket_connect(buf, addr->ai_addr, addr->ai_addrlen) >= 0) { - /* success */ - bufferevent_enable(buf, EV_READ|EV_WRITE); - free(peername); - return buf; - } + if (bufferevent_socket_connect(buf, addr->ai_addr, addr->ai_addrlen) >= 0) + goto success; log_info("%s: connection to %s failed: %s", conn->peername, peername, evutil_socket_error_to_string(EVUTIL_SOCKET_ERROR())); @@ -440,8 +446,55 @@ open_outbound_socket(conn_t *conn, struct event_base *base, log_warn("%s: all outbound connection attempts failed", conn->peername); - bufferevent_free(buf); + conn_free(newconn); return NULL; + + success: + bufferevent_enable(buf, EV_READ|EV_WRITE); + newconn->peername = peername; + obfs_assert(connections); + smartlist_add(connections, newconn); + return newconn; +} + +/** + As open_outbound, but uses bufferevent_socket_connect_hostname + rather than bufferevent_socket_connect. +*/ +static conn_t * +open_outbound_hostname(conn_t *conn, int af, const char *addr, int port) +{ + struct event_base *base = bufferevent_get_base(conn->buffer); + struct bufferevent *buf; + conn_t *newconn; + + buf = bufferevent_socket_new(base, -1, BEV_OPT_CLOSE_ON_FREE); + if (!buf) { + log_warn("%s: unable to create outbound socket buffer", conn->peername); + return NULL; + } + newconn = proto_conn_create(conn->cfg); + if (!conn) { + log_warn("%s: failed to allocate state for outbound connection", + conn->peername); + bufferevent_free(buf); + return NULL; + } + newconn->buffer = buf; + bufferevent_setcb(buf, downstream_read_cb, NULL, pending_socks_cb, newconn); + if (bufferevent_socket_connect_hostname(buf, get_evdns_base(), + af, addr, port) < 0) { + log_warn("%s: outbound connection to %s:%d failed: %s", + conn->peername, addr, port, + evutil_socket_error_to_string(EVUTIL_SOCKET_ERROR())); + conn_free(newconn); + return NULL; + } + + bufferevent_enable(buf, EV_READ|EV_WRITE); + obfs_assert(connections); + smartlist_add(connections, newconn); + return newconn; } /** @@ -452,9 +505,7 @@ socks_read_cb(struct bufferevent *bev, void *arg) { conn_t *conn = arg; enum socks_ret socks_ret; - log_debug("%s for %s", __func__, conn->peername); - /* socks only makes sense on the upstream side */ - obfs_assert(bev == conn->upstream); + log_debug("%s: %s", conn->peername, __func__); do { enum socks_status_t status = socks_state_get_status(conn->socks_state); @@ -466,27 +517,16 @@ socks_read_cb(struct bufferevent *bev, void *arg) const char *addr=NULL; r = socks_state_get_address(conn->socks_state, &af, &addr, &port); obfs_assert(r==0); - conn->downstream = - bufferevent_socket_new(bufferevent_get_base(conn->upstream), - -1, BEV_OPT_CLOSE_ON_FREE); - - bufferevent_setcb(conn->downstream, - downstream_read_cb, NULL, pending_socks_cb, conn); - - r = bufferevent_socket_connect_hostname(conn->downstream, - get_evdns_base(), - af, addr, port); - bufferevent_enable(conn->downstream, EV_READ|EV_WRITE); - log_debug("socket_connect_hostname said %d! (%s,%d)", r, addr, port); - - if (r < 0) { + log_info("%s: socks: trying to connect to %s:%u", + conn->peername, addr, port); + if (circuit_create(conn, open_outbound_hostname(conn, af, addr, port))) { /* XXXX send socks reply */ - close_conn(conn); + conn_free(conn); return; } /* further upstream data will be processed once the downstream side is established */ - bufferevent_disable(conn->upstream, EV_READ|EV_WRITE); + bufferevent_disable(conn->buffer, EV_READ|EV_WRITE); return; } @@ -498,14 +538,14 @@ socks_read_cb(struct bufferevent *bev, void *arg) if (socks_ret == SOCKS_INCOMPLETE) return; /* need to read more data. */ else if (socks_ret == SOCKS_BROKEN) - close_conn(conn); /* XXXX send socks reply */ + conn_free(conn); /* XXXX send socks reply */ else if (socks_ret == SOCKS_CMD_NOT_CONNECT) { bufferevent_enable(bev, EV_WRITE); bufferevent_disable(bev, EV_READ); socks5_send_reply(bufferevent_get_output(bev), conn->socks_state, SOCKS5_FAILED_UNSUPPORTED); bufferevent_setcb(bev, NULL, - close_conn_on_flush, flush_error_cb, conn); + conn_free_on_flush, flush_error_cb, conn); return; } } @@ -518,17 +558,27 @@ socks_read_cb(struct bufferevent *bev, void *arg) static void upstream_read_cb(struct bufferevent *bev, void *arg) { - conn_t *conn = arg; - log_debug("%s: %s, %lu bytes available", conn->peername, __func__, + conn_t *up = arg; + conn_t *down; + log_debug("%s: %s, %lu bytes available", up->peername, __func__, (unsigned long)evbuffer_get_length(bufferevent_get_input(bev))); - obfs_assert(bev == conn->upstream); - if (proto_send(conn, - bufferevent_get_input(conn->upstream), - bufferevent_get_output(conn->downstream)) < 0) { - log_debug("%s: Error during transmit.", conn->peername); - close_conn(conn); + obfs_assert(up->buffer == bev); + obfs_assert(!up->flushing); + obfs_assert(up->is_open); + obfs_assert(up->circuit); + obfs_assert(up->circuit->downstream); + + down = up->circuit->downstream; + if (proto_send(up, + bufferevent_get_input(up->buffer), + bufferevent_get_output(down->buffer))) { + log_debug("%s: error during transmit.", up->peername); + conn_free(up); } + log_debug("%s: transmitted %lu bytes", down->peername, + (unsigned long) + evbuffer_get_length(bufferevent_get_output(down->buffer))); } /** @@ -539,28 +589,45 @@ upstream_read_cb(struct bufferevent *bev, void *arg) static void downstream_read_cb(struct bufferevent *bev, void *arg) { - conn_t *conn = arg; + conn_t *down = arg; + conn_t *up; enum recv_ret r; - log_debug("%s: %s, %lu bytes available", conn->peername, __func__, + + log_debug("%s: %s, %lu bytes available", down->peername, __func__, (unsigned long)evbuffer_get_length(bufferevent_get_input(bev))); - obfs_assert(bev == conn->downstream); - r = proto_recv(conn, - bufferevent_get_input(conn->downstream), - bufferevent_get_output(conn->upstream)); + obfs_assert(down->buffer == bev); + obfs_assert(!down->flushing); + obfs_assert(down->is_open); + obfs_assert(down->circuit); + obfs_assert(down->circuit->upstream); + up = down->circuit->upstream; + + + r = proto_recv(down, + bufferevent_get_input(down->buffer), + bufferevent_get_output(up->buffer)); if (r == RECV_BAD) { - log_debug("%s: Error during receive.", conn->peername); - close_conn(conn); - } else if (r == RECV_SEND_PENDING) { - log_debug("%s: Reply of %lu bytes", conn->peername, + log_debug("%s: error during receive.", down->peername); + conn_free(down); + } else { + log_debug("%s: forwarded %lu bytes", down->peername, (unsigned long) - evbuffer_get_length(bufferevent_get_input(conn->upstream))); - if (proto_send(conn, - bufferevent_get_input(conn->upstream), - bufferevent_get_output(conn->downstream)) < 0) { - log_debug("%s: Error during reply.", conn->peername); - close_conn(conn); + evbuffer_get_length(bufferevent_get_output(up->buffer))); + if (r == RECV_SEND_PENDING) { + log_debug("%s: reply of %lu bytes", down->peername, + (unsigned long) + evbuffer_get_length(bufferevent_get_input(up->buffer))); + if (proto_send(up, + bufferevent_get_input(up->buffer), + bufferevent_get_output(down->buffer)) < 0) { + log_debug("%s: error during reply.", down->peername); + conn_free(down); + } + log_debug("%s: transmitted %lu bytes", down->peername, + (unsigned long) + evbuffer_get_length(bufferevent_get_output(down->buffer))); } } } @@ -570,22 +637,29 @@ downstream_read_cb(struct bufferevent *bev, void *arg) We prepare the connection to be closed ASAP. */ static void -error_or_eof(conn_t *conn, struct bufferevent *bev_err) +error_or_eof(conn_t *conn) { + circuit_t *circ = conn->circuit; + struct bufferevent *bev_err = conn->buffer; struct bufferevent *bev_flush; - log_debug("%s for %s", __func__, conn->peername); - if (bev_err == conn->upstream) bev_flush = conn->downstream; - else if (bev_err == conn->downstream) bev_flush = conn->upstream; - else obfs_abort(); + log_debug("%s for %s", __func__, conn->peername); + if (!circ || conn->flushing || !conn->is_open) { + conn_free(conn); + return; + } - if (conn->flushing || !conn->is_open || - evbuffer_get_length(bufferevent_get_output(bev_flush)) == 0) { - close_conn(conn); + bev_flush = (conn == circ->upstream) ? circ->downstream->buffer + : circ->upstream->buffer; + if (evbuffer_get_length(bufferevent_get_output(bev_flush)) == 0) { + conn_free(conn); return; } - conn->flushing = 1; + /* XXX move ->flushing and ->is_open to circuit_t */ + circ->upstream->flushing = 1; + circ->downstream->flushing = 1; + /* Stop reading and writing; wait for the other side to flush if it has * data. */ bufferevent_disable(bev_err, EV_READ|EV_WRITE); @@ -595,7 +669,7 @@ error_or_eof(conn_t *conn, struct bufferevent *bev_err) official API to retrieve the callback functions and/or change just one callback while leaving the others intact. */ bufferevent_setcb(bev_flush, bev_flush->readcb, - close_conn_on_flush, flush_error_cb, conn); + conn_free_on_flush, flush_error_cb, conn); } /** @@ -607,7 +681,7 @@ error_cb(struct bufferevent *bev, short what, void *arg) { conn_t *conn = arg; int errcode = EVUTIL_SOCKET_ERROR(); - log_debug("%s for %s: what=%x err=%d", __func__, conn->peername, + log_debug("%s for %s: what=0x%04x errno=%d", __func__, conn->peername, what, errcode); /* It should be impossible to get here with BEV_EVENT_CONNECTED. */ @@ -615,21 +689,16 @@ error_cb(struct bufferevent *bev, short what, void *arg) obfs_assert(!(what & BEV_EVENT_CONNECTED)); if (what & BEV_EVENT_ERROR) { - log_warn("Error on %s side of connection from %s: %s", - bev == conn->upstream ? "upstream" : "downstream", + log_warn("Error talking to %s: %s", conn->peername, evutil_socket_error_to_string(errcode)); } else if (what & BEV_EVENT_EOF) { - log_info("EOF on %s side of connection from %s", - bev == conn->upstream ? "upstream" : "downstream", - conn->peername); + log_info("EOF from %s", conn->peername); } else { obfs_assert(what & BEV_EVENT_TIMEOUT); - log_info("Timeout on %s side of connection from %s", - bev == conn->upstream ? "upstream" : "downstream", - conn->peername); + log_info("Timeout talking to %s", conn->peername); } - error_or_eof(arg, bev); + error_or_eof(conn); } /** @@ -641,7 +710,7 @@ flush_error_cb(struct bufferevent *bev, short what, void *arg) { conn_t *conn = arg; int errcode = EVUTIL_SOCKET_ERROR(); - log_debug("%s for %s: what=%x err=%d", __func__, conn->peername, + log_debug("%s for %s: what=0x%04x errno=%d", __func__, conn->peername, what, errcode); /* It should be impossible to get here with BEV_EVENT_CONNECTED. */ @@ -650,11 +719,10 @@ flush_error_cb(struct bufferevent *bev, short what, void *arg) obfs_assert(conn->flushing); - log_warn("Error during flush of %s side of connection from %s: %s", - bev == conn->upstream ? "upstream" : "downstream", + log_warn("Error during flush of connection with %s: %s", conn->peername, evutil_socket_error_to_string(errcode)); - close_conn(conn); + conn_free(conn); return; } @@ -668,35 +736,36 @@ static void pending_conn_cb(struct bufferevent *bev, short what, void *arg) { conn_t *conn = arg; - struct bufferevent *other; log_debug("%s: %s", conn->peername, __func__); - if (bev == conn->upstream) other = conn->downstream; - else if (bev == conn->downstream) other = conn->upstream; - else obfs_abort(); - - /* Upon successful connection, enable traffic on the other side, - and replace this callback with the regular error_cb */ + /* Upon successful connection, enable traffic on both sides of the + connection, and replace this callback with the regular error_cb */ if (what & BEV_EVENT_CONNECTED) { - obfs_assert(!conn->flushing); + circuit_t *circ = conn->circuit; + obfs_assert(circ); + obfs_assert(!circ->upstream->flushing); + obfs_assert(!circ->downstream->flushing); + + circ->upstream->is_open = 1; + circ->downstream->is_open = 1; - conn->is_open = 1; - log_debug("%s: Successful %s connection", conn->peername, - bev == conn->upstream ? "upstream" : "downstream"); + log_debug("%s: Successful connection", conn->peername); /* Queue handshake, if any. */ - if (proto_handshake(conn, - bufferevent_get_output(conn->downstream))<0) { + if (proto_handshake(circ->downstream, + bufferevent_get_output(circ->downstream->buffer))<0) { log_debug("%s: Error during handshake", conn->peername); - close_conn(conn); + conn_free(conn); return; } - /* XXX Dirty access to bufferevent guts. There appears to be no - official API to retrieve the callback functions and/or change - just one callback while leaving the others intact. */ - bufferevent_setcb(bev, bev->readcb, bev->writecb, error_cb, conn); - bufferevent_enable(other, EV_READ|EV_WRITE); + bufferevent_setcb(circ->upstream->buffer, + upstream_read_cb, NULL, error_cb, circ->upstream); + bufferevent_setcb(circ->downstream->buffer, + downstream_read_cb, NULL, error_cb, circ->downstream); + + bufferevent_enable(circ->upstream->buffer, EV_READ|EV_WRITE); + bufferevent_enable(circ->downstream->buffer, EV_READ|EV_WRITE); return; } @@ -712,10 +781,20 @@ pending_conn_cb(struct bufferevent *bev, short what, void *arg) static void pending_socks_cb(struct bufferevent *bev, short what, void *arg) { - conn_t *conn = arg; - log_debug("%s: %s", conn->peername, __func__); - obfs_assert(bev == conn->downstream); - obfs_assert(conn->socks_state); + conn_t *down = arg; + circuit_t *circ = down->circuit; + conn_t *up; + socks_state_t *socks; + + log_debug("%s: %s", down->peername, __func__); + + obfs_assert(circ); + obfs_assert(circ->upstream); + obfs_assert(circ->upstream->socks_state); + obfs_assert(circ->downstream == down); + + up = circ->upstream; + socks = circ->upstream->socks_state; /* If we got an error while in the ST_HAVE_ADDR state, chances are that we failed connecting to the host requested by the CONNECT @@ -725,14 +804,11 @@ pending_socks_cb(struct bufferevent *bev, short what, void *arg) errno isn't meaningful in that case... */ if ((what & (BEV_EVENT_EOF|BEV_EVENT_ERROR|BEV_EVENT_TIMEOUT))) { int err = EVUTIL_SOCKET_ERROR(); - log_warn("Connection error: %s", - evutil_socket_error_to_string(err)); - if (socks_state_get_status(conn->socks_state) == ST_HAVE_ADDR) { - socks_send_reply(conn->socks_state, - bufferevent_get_output(conn->upstream), - err); + log_warn("Connection error: %s", evutil_socket_error_to_string(err)); + if (socks_state_get_status(socks) == ST_HAVE_ADDR) { + socks_send_reply(socks, bufferevent_get_output(up->buffer), err); } - error_or_eof(conn, bev); + error_or_eof(down); return; } @@ -744,39 +820,39 @@ pending_socks_cb(struct bufferevent *bev, short what, void *arg) struct sockaddr *sa = (struct sockaddr*)&ss; socklen_t slen = sizeof(&ss); - obfs_assert(!conn->flushing); + obfs_assert(!up->flushing); + obfs_assert(!down->flushing); + /* Figure out where we actually connected to, and tell the socks client */ if (getpeername(bufferevent_getfd(bev), sa, &slen) == 0) { - /* Figure out where we actually connected to so that we can tell the - * socks client */ - socks_state_set_address(conn->socks_state, sa); + socks_state_set_address(socks, sa); + if (!down->peername) + down->peername = printable_address(sa, slen); } - socks_send_reply(conn->socks_state, - bufferevent_get_output(conn->upstream), 0); + socks_send_reply(socks, bufferevent_get_output(up->buffer), 0); /* Switch to regular upstream behavior. */ - socks_state_free(conn->socks_state); - conn->socks_state = NULL; - conn->is_open = 1; - log_debug("%s: Successful %s connection", conn->peername, - bev == conn->upstream ? "upstream" : "downstream"); - - bufferevent_setcb(conn->upstream, - upstream_read_cb, NULL, error_cb, conn); - bufferevent_setcb(conn->downstream, - downstream_read_cb, NULL, error_cb, conn); - bufferevent_enable(conn->upstream, EV_READ|EV_WRITE); + socks_state_free(socks); + up->socks_state = NULL; + up->is_open = 1; + down->is_open = 1; + log_debug("%s: Successful outbound connection to %s", + up->peername, down->peername); + + bufferevent_setcb(up->buffer, upstream_read_cb, NULL, error_cb, up); + bufferevent_setcb(down->buffer, downstream_read_cb, NULL, error_cb, down); + bufferevent_enable(up->buffer, EV_READ|EV_WRITE); + bufferevent_enable(down->buffer, EV_READ|EV_WRITE); /* Queue handshake, if any. */ - if (proto_handshake(conn, - bufferevent_get_output(conn->downstream))<0) { - log_debug("%s: Error during handshake", conn->peername); - close_conn(conn); + if (proto_handshake(down, bufferevent_get_output(down->buffer))) { + log_debug("%s: Error during handshake", down->peername); + conn_free(down); return; } - if (evbuffer_get_length(bufferevent_get_input(conn->upstream)) != 0) - upstream_read_cb(conn->upstream, conn); + if (evbuffer_get_length(bufferevent_get_input(up->buffer)) > 0) + upstream_read_cb(up->buffer, up); return; } diff --git a/src/network.h b/src/network.h index 8fb41ef..72df32c 100644 --- a/src/network.h +++ b/src/network.h @@ -12,23 +12,37 @@ void close_all_listeners(void); void start_shutdown(int barbaric); /** - This struct defines the state of a connection between "upstream" - and "downstream" peers (it's really two connections at the socket - level). Again, each protocol may extend this structure with - additional private data by embedding it as the first member of a - larger structure. The protocol's conn_create() method is responsible - only for filling in the |vtable| field of this structure, plus any - private data of course. + This struct defines the state of one socket-level connection. Each + protocol may extend this structure with additional private data by + embedding it as the first member of a larger structure. The + protocol's conn_create() method is responsible only for filling in + the |cfg| and |mode| fields of this structure, plus any private + data of course. + + An incoming connection is not associated with a circuit until the + destination for the other side of the circuit is known. An outgoing + connection is associated with a circuit from its creation. */ struct conn_t { config_t *cfg; char *peername; socks_state_t *socks_state; - struct bufferevent *upstream; - struct bufferevent *downstream; + circuit_t *circuit; + struct bufferevent *buffer; enum listen_mode mode : 30; unsigned int flushing : 1; unsigned int is_open : 1; }; +/** + This struct defines a pair of established connections. The "upstream" + connection is to the higher-level client or server that we are proxying + traffic for. The "downstream" connection is to the remote peer. + */ + +struct circuit_t { + conn_t *upstream; + conn_t *downstream; +}; + #endif diff --git a/src/util.h b/src/util.h index 8228790..2b37c48 100644 --- a/src/util.h +++ b/src/util.h @@ -62,6 +62,7 @@ unsigned int ui64_log2(uint64_t u64); /***** Network types and functions. *****/ +typedef struct circuit_t circuit_t; typedef struct config_t config_t; typedef struct conn_t conn_t; typedef struct protocol_vtable protocol_vtable;
participants (1)
-
nickm@torproject.org