[tor-commits] [obfsproxy/master] A conn_t now wraps only one socket. A new struct, circuit_t, associates the two sides of a proxy channel.

nickm at torproject.org nickm at torproject.org
Fri Sep 9 17:08:58 UTC 2011


commit 99dee61ff3aa70aab43b9543a8fd0c58858f0dcd
Author: Zack Weinberg <zackw at panix.com>
Date:   Wed Aug 3 11:20:17 2011 -0700

    A conn_t now wraps only one socket. A new struct, circuit_t, associates the two sides of a proxy channel.
---
 src/network.c |  538 ++++++++++++++++++++++++++++++++------------------------
 src/network.h |   32 +++-
 src/util.h    |    1 +
 3 files changed, 331 insertions(+), 240 deletions(-)

diff --git a/src/network.c b/src/network.c
index 4ae2c15..08ce696 100644
--- a/src/network.c
+++ b/src/network.c
@@ -20,25 +20,27 @@
 
 /* Terminology used in this file:
 
-   A "side" is a bidirectional communications channel, usually backed
-   by a network socket and represented at this layer by a
-   'struct bufferevent'.
-
-   A "connection" is a _pair_ of sides, referred to as the "upstream"
-   side and the "downstream" side.  A connection is represented by a
-   'conn_t'.  The upstream side of a connection communicates in
-   cleartext with the higher-level program that wishes to make use of
-   our obfuscation service.  The downstream side commmunicates in an
-   obfuscated fashion with the remote peer that the higher-level
-   client wishes to contact.
+   A "connection" is a bidirectional communications channel, usually
+   backed by a network socket, and represented in this layer by a
+   'conn_t', wrapping a 'struct bufferevent'.
+
+   A "circuit" is a _pair_ of connections, referred to as the
+   "upstream" and "downstream" connections.  A circuit is represented
+   by a 'circuit_t'.  The upstream connection of a circuit
+   communicates in cleartext with the higher-level program that wishes
+   to make use of our obfuscation service.  The downstream connection
+   commmunicates in an obfuscated fashion with the remote peer that
+   the higher-level client wishes to contact.
 
    A "listener" is a listening socket bound to a particular
-   obfuscation protocol, represented in this layer by a 'listener_t'.
-   Connecting to a listener creates one side of a connection, and
-   causes this program to initiate the other side of the connection.
-   A listener is said to be a "client" listener if connecting to it
-   creates the _upstream_ side of a connection, and a "server"
-   listener if connecting to it creates the _downstream_ side.
+   obfuscation protocol, represented in this layer by a 'listener_t'
+   and its 'config_t'.  Connecting to a listener creates one
+   connection of a circuit, and causes this program to initiate the
+   other connection (possibly after receiving in-band instructions
+   about where to connect to).  A listener is said to be a "client"
+   listener if connecting to it creates the _upstream_ connection, and
+   a "server" listener if connecting to it creates the _downstream_
+   connection.
 
    There are two kinds of client listeners: a "simple" client listener
    always connects to the same remote peer every time it needs to
@@ -72,19 +74,17 @@ static void listener_cb(struct evconnlistener *evcl, evutil_socket_t fd,
                         struct sockaddr *sourceaddr, int socklen,
                         void *closure);
 
-static void simple_client_listener_cb(conn_t *conn, struct bufferevent *buf);
-static void socks_client_listener_cb(conn_t *conn, struct bufferevent *buf);
-static void simple_server_listener_cb(conn_t *conn, struct bufferevent *buf);
+static void simple_client_listener_cb(conn_t *conn);
+static void socks_client_listener_cb(conn_t *conn);
+static void simple_server_listener_cb(conn_t *conn);
 
 static void conn_free(conn_t *conn);
-static void close_conn(conn_t *conn);
-static void close_all_connections(void);
+static void conn_free_all(void);
+static void conn_free_on_flush(struct bufferevent *bev, void *arg);
 
-static void close_conn_on_flush(struct bufferevent *bev, void *arg);
-
-static struct bufferevent *open_outbound_socket(conn_t *conn,
-                                                struct event_base *base,
-                                                bufferevent_data_cb readcb);
+static int circuit_create(conn_t *upstream, conn_t *downstream);
+static void circuit_free(circuit_t *circuit);
+static conn_t *open_outbound(conn_t *conn, bufferevent_data_cb readcb);
 
 static void upstream_read_cb(struct bufferevent *bev, void *arg);
 static void downstream_read_cb(struct bufferevent *bev, void *arg);
@@ -114,7 +114,7 @@ start_shutdown(int barbaric)
     shutting_down=1;
 
   if (barbaric)
-    close_all_connections();
+    conn_free_all();
 
   if (connections && smartlist_len(connections) == 0) {
     smartlist_free(connections);
@@ -129,7 +129,7 @@ start_shutdown(int barbaric)
    Closes all open connections.
 */
 static void
-close_all_connections(void)
+conn_free_all(void)
 {
   if (!connections)
     return;
@@ -251,10 +251,11 @@ listener_cb(struct evconnlistener *evcl, evutil_socket_t fd,
             smartlist_len(connections));
 
   conn->peername = peername;
+  conn->buffer = buf;
   switch (conn->mode) {
-  case LSN_SIMPLE_CLIENT: simple_client_listener_cb(conn, buf); break;
-  case LSN_SOCKS_CLIENT:  socks_client_listener_cb(conn, buf);  break;
-  case LSN_SIMPLE_SERVER: simple_server_listener_cb(conn, buf); break;
+  case LSN_SIMPLE_CLIENT: simple_client_listener_cb(conn); break;
+  case LSN_SOCKS_CLIENT:  socks_client_listener_cb(conn);  break;
+  case LSN_SIMPLE_SERVER: simple_server_listener_cb(conn); break;
   default:
     obfs_abort();
   }
@@ -265,23 +266,16 @@ listener_cb(struct evconnlistener *evcl, evutil_socket_t fd,
    simple client mode.
 */
 static void
-simple_client_listener_cb(conn_t *conn, struct bufferevent *buf)
+simple_client_listener_cb(conn_t *conn)
 {
-  struct event_base *base = bufferevent_get_base(buf);
-  obfs_assert(buf);
   obfs_assert(conn);
   obfs_assert(conn->mode == LSN_SIMPLE_CLIENT);
   log_debug("%s: simple client connection", conn->peername);
 
-  conn->upstream = buf;
-  bufferevent_setcb(conn->upstream, upstream_read_cb, NULL, error_cb, conn);
-
-  /* Don't enable the upstream side for reading at this point; wait
-     till the downstream side is established. */
+  bufferevent_setcb(conn->buffer, upstream_read_cb, NULL, error_cb, conn);
 
-  conn->downstream = open_outbound_socket(conn, base, downstream_read_cb);
-  if (!conn->downstream) {
-    close_conn(conn);
+  if (circuit_create(conn, open_outbound(conn, downstream_read_cb))) {
+    conn_free(conn);
     return;
   }
 
@@ -293,22 +287,19 @@ simple_client_listener_cb(conn_t *conn, struct bufferevent *buf)
    socks mode.
 */
 static void
-socks_client_listener_cb(conn_t *conn, struct bufferevent *buf)
+socks_client_listener_cb(conn_t *conn)
 {
-  obfs_assert(buf);
   obfs_assert(conn);
   obfs_assert(conn->mode == LSN_SOCKS_CLIENT);
   log_debug("%s: socks client connection", conn->peername);
 
-  conn->upstream = buf;
-  bufferevent_setcb(conn->upstream, socks_read_cb, NULL, error_cb, conn);
-  bufferevent_enable(conn->upstream, EV_READ|EV_WRITE);
-
-  /* Construct SOCKS state. */
   conn->socks_state = socks_state_new();
 
-  /* Do not create a downstream bufferevent at this time; the socks
-     handler will do it after it learns the downstream peer address. */
+  bufferevent_setcb(conn->buffer, socks_read_cb, NULL, error_cb, conn);
+  bufferevent_enable(conn->buffer, EV_READ|EV_WRITE);
+
+  /* Do not create a circuit at this time; the socks handler will do
+     it after it learns the remote peer address. */
 
   log_debug("%s: setup complete", conn->peername);
 }
@@ -318,25 +309,16 @@ socks_client_listener_cb(conn_t *conn, struct bufferevent *buf)
    server mode.
 */
 static void
-simple_server_listener_cb(conn_t *conn, struct bufferevent *buf)
+simple_server_listener_cb(conn_t *conn)
 {
-  struct event_base *base = bufferevent_get_base(buf);
-  obfs_assert(buf);
   obfs_assert(conn);
   obfs_assert(conn->mode == LSN_SIMPLE_SERVER);
   log_debug("%s: server connection", conn->peername);
 
-  conn->downstream = buf;
-  bufferevent_setcb(conn->downstream,
-                    downstream_read_cb, NULL, error_cb, conn);
-
-  /* Don't enable the downstream side for reading at this point; wait
-     till the upstream side is established. */
+  bufferevent_setcb(conn->buffer, downstream_read_cb, NULL, error_cb, conn);
 
-  /* New bufferevent to connect to the target address. */
-  conn->upstream = open_outbound_socket(conn, base, upstream_read_cb);
-  if (!conn->upstream) {
-    close_conn(conn);
+  if (circuit_create(open_outbound(conn, upstream_read_cb), conn)) {
+    conn_free(conn);
     return;
   }
 
@@ -349,36 +331,29 @@ simple_server_listener_cb(conn_t *conn, struct bufferevent *buf)
 static void
 conn_free(conn_t *conn)
 {
-  if (conn->peername)
-    free(conn->peername);
-  if (conn->socks_state)
-    socks_state_free(conn->socks_state);
-  if (conn->upstream)
-    bufferevent_free(conn->upstream);
-  if (conn->downstream)
-    bufferevent_free(conn->downstream);
-
-  proto_conn_free(conn);
-}
-
-/**
-   Closes a fully open connection.
-*/
-static void
-close_conn(conn_t *conn)
-{
-  obfs_assert(connections);
-  log_debug("Closing connection from %s; %d remaining",
-            conn->peername, smartlist_len(connections) - 1);
-
-  smartlist_remove(connections, conn);
-  conn_free(conn);
-
-  /* If this was the last connection AND we are shutting down,
-     finish shutdown. */
-  if (smartlist_len(connections) == 0 && shutting_down) {
-    smartlist_free(connections);
-    finish_shutdown();
+  if (conn->circuit)
+    circuit_free(conn->circuit); /* will recurse and take care of us */
+  else {
+    if (connections) {
+      smartlist_remove(connections, conn);
+      log_debug("Closing connection with %s; %d remaining",
+                conn->peername, smartlist_len(connections));
+    }
+    if (conn->peername)
+      free(conn->peername);
+    if (conn->socks_state)
+      socks_state_free(conn->socks_state);
+    if (conn->buffer)
+      bufferevent_free(conn->buffer);
+    proto_conn_free(conn);
+
+    /* If this was the last connection AND we are shutting down,
+       finish shutdown. */
+    if (shutting_down && (!connections || smartlist_len(connections) == 0)) {
+      if (connections)
+        smartlist_free(connections);
+      finish_shutdown();
+    }
   }
 }
 
@@ -387,25 +362,51 @@ close_conn(conn_t *conn)
    empty.
 */
 static void
-close_conn_on_flush(struct bufferevent *bev, void *arg)
+conn_free_on_flush(struct bufferevent *bev, void *arg)
 {
   conn_t *conn = arg;
   log_debug("%s for %s", __func__, conn->peername);
 
   if (evbuffer_get_length(bufferevent_get_output(bev)) == 0)
-    close_conn(conn);
+    conn_free(conn);
+}
+
+static int
+circuit_create(conn_t *up, conn_t *down)
+{
+  if (!up || !down)
+    return -1;
+
+  circuit_t *r = xzalloc(sizeof(circuit_t));
+  r->upstream = up;
+  r->downstream = down;
+  up->circuit = r;
+  down->circuit = r;
+  return 0;
+}
+
+static void
+circuit_free(circuit_t *circuit)
+{
+  /* break the circular references before deallocating each side */
+  circuit->upstream->circuit = NULL;
+  circuit->downstream->circuit = NULL;
+  conn_free(circuit->upstream);
+  conn_free(circuit->downstream);
+  free(circuit);
 }
 
 /**
-   Make the outbound socket for a connection.
+   Make the outbound connection for a circuit.
 */
-static struct bufferevent *
-open_outbound_socket(conn_t *conn, struct event_base *base,
-                     bufferevent_data_cb readcb)
+static conn_t *
+open_outbound(conn_t *conn, bufferevent_data_cb readcb)
 {
   struct evutil_addrinfo *addr = config_get_target_addr(conn->cfg);
+  struct event_base *base = bufferevent_get_base(conn->buffer);
   struct bufferevent *buf;
   char *peername;
+  conn_t *newconn;
 
   if (!addr) {
     log_warn("%s: no target addresses available", conn->peername);
@@ -418,18 +419,23 @@ open_outbound_socket(conn_t *conn, struct event_base *base,
     return NULL;
   }
 
-  bufferevent_setcb(buf, readcb, NULL, pending_conn_cb, conn);
+  newconn = proto_conn_create(conn->cfg);
+  if (!conn) {
+    log_warn("%s: failed to allocate state for outbound connection",
+             conn->peername);
+    bufferevent_free(buf);
+    return NULL;
+  }
+
+  newconn->buffer = buf;
+  bufferevent_setcb(buf, readcb, NULL, pending_conn_cb, newconn);
 
   do {
     peername = printable_address(addr->ai_addr, addr->ai_addrlen);
     log_info("%s (%s): trying to connect to %s",
              conn->peername, conn->cfg->vtable->name, peername);
-    if (bufferevent_socket_connect(buf, addr->ai_addr, addr->ai_addrlen) >= 0) {
-      /* success */
-      bufferevent_enable(buf, EV_READ|EV_WRITE);
-      free(peername);
-      return buf;
-    }
+    if (bufferevent_socket_connect(buf, addr->ai_addr, addr->ai_addrlen) >= 0)
+      goto success;
     log_info("%s: connection to %s failed: %s",
              conn->peername, peername,
              evutil_socket_error_to_string(EVUTIL_SOCKET_ERROR()));
@@ -440,8 +446,55 @@ open_outbound_socket(conn_t *conn, struct event_base *base,
   log_warn("%s: all outbound connection attempts failed",
            conn->peername);
 
-  bufferevent_free(buf);
+  conn_free(newconn);
   return NULL;
+
+ success:
+  bufferevent_enable(buf, EV_READ|EV_WRITE);
+  newconn->peername = peername;
+  obfs_assert(connections);
+  smartlist_add(connections, newconn);
+  return newconn;
+}
+
+/**
+   As open_outbound, but uses bufferevent_socket_connect_hostname
+   rather than bufferevent_socket_connect.
+*/
+static conn_t *
+open_outbound_hostname(conn_t *conn, int af, const char *addr, int port)
+{
+  struct event_base *base = bufferevent_get_base(conn->buffer);
+  struct bufferevent *buf;
+  conn_t *newconn;
+
+  buf = bufferevent_socket_new(base, -1, BEV_OPT_CLOSE_ON_FREE);
+  if (!buf) {
+    log_warn("%s: unable to create outbound socket buffer", conn->peername);
+    return NULL;
+  }
+  newconn = proto_conn_create(conn->cfg);
+  if (!conn) {
+    log_warn("%s: failed to allocate state for outbound connection",
+             conn->peername);
+    bufferevent_free(buf);
+    return NULL;
+  }
+  newconn->buffer = buf;
+  bufferevent_setcb(buf, downstream_read_cb, NULL, pending_socks_cb, newconn);
+  if (bufferevent_socket_connect_hostname(buf, get_evdns_base(),
+                                          af, addr, port) < 0) {
+    log_warn("%s: outbound connection to %s:%d failed: %s",
+             conn->peername, addr, port,
+             evutil_socket_error_to_string(EVUTIL_SOCKET_ERROR()));
+    conn_free(newconn);
+    return NULL;
+  }
+
+  bufferevent_enable(buf, EV_READ|EV_WRITE);
+  obfs_assert(connections);
+  smartlist_add(connections, newconn);
+  return newconn;
 }
 
 /**
@@ -452,9 +505,7 @@ socks_read_cb(struct bufferevent *bev, void *arg)
 {
   conn_t *conn = arg;
   enum socks_ret socks_ret;
-  log_debug("%s for %s", __func__, conn->peername);
-  /* socks only makes sense on the upstream side */
-  obfs_assert(bev == conn->upstream);
+  log_debug("%s: %s", conn->peername, __func__);
 
   do {
     enum socks_status_t status = socks_state_get_status(conn->socks_state);
@@ -466,27 +517,16 @@ socks_read_cb(struct bufferevent *bev, void *arg)
       const char *addr=NULL;
       r = socks_state_get_address(conn->socks_state, &af, &addr, &port);
       obfs_assert(r==0);
-      conn->downstream =
-        bufferevent_socket_new(bufferevent_get_base(conn->upstream),
-                               -1, BEV_OPT_CLOSE_ON_FREE);
-
-      bufferevent_setcb(conn->downstream,
-                        downstream_read_cb, NULL, pending_socks_cb, conn);
-
-      r = bufferevent_socket_connect_hostname(conn->downstream,
-                                              get_evdns_base(),
-                                              af, addr, port);
-      bufferevent_enable(conn->downstream, EV_READ|EV_WRITE);
-      log_debug("socket_connect_hostname said %d! (%s,%d)", r, addr, port);
-
-      if (r < 0) {
+      log_info("%s: socks: trying to connect to %s:%u",
+               conn->peername, addr, port);
+      if (circuit_create(conn, open_outbound_hostname(conn, af, addr, port))) {
         /* XXXX send socks reply */
-        close_conn(conn);
+        conn_free(conn);
         return;
       }
       /* further upstream data will be processed once the downstream
          side is established */
-      bufferevent_disable(conn->upstream, EV_READ|EV_WRITE);
+      bufferevent_disable(conn->buffer, EV_READ|EV_WRITE);
       return;
     }
 
@@ -498,14 +538,14 @@ socks_read_cb(struct bufferevent *bev, void *arg)
   if (socks_ret == SOCKS_INCOMPLETE)
     return; /* need to read more data. */
   else if (socks_ret == SOCKS_BROKEN)
-    close_conn(conn); /* XXXX send socks reply */
+    conn_free(conn); /* XXXX send socks reply */
   else if (socks_ret == SOCKS_CMD_NOT_CONNECT) {
     bufferevent_enable(bev, EV_WRITE);
     bufferevent_disable(bev, EV_READ);
     socks5_send_reply(bufferevent_get_output(bev), conn->socks_state,
                       SOCKS5_FAILED_UNSUPPORTED);
     bufferevent_setcb(bev, NULL,
-                      close_conn_on_flush, flush_error_cb, conn);
+                      conn_free_on_flush, flush_error_cb, conn);
     return;
   }
 }
@@ -518,17 +558,27 @@ socks_read_cb(struct bufferevent *bev, void *arg)
 static void
 upstream_read_cb(struct bufferevent *bev, void *arg)
 {
-  conn_t *conn = arg;
-  log_debug("%s: %s, %lu bytes available", conn->peername, __func__,
+  conn_t *up = arg;
+  conn_t *down;
+  log_debug("%s: %s, %lu bytes available", up->peername, __func__,
             (unsigned long)evbuffer_get_length(bufferevent_get_input(bev)));
-  obfs_assert(bev == conn->upstream);
 
-  if (proto_send(conn,
-                 bufferevent_get_input(conn->upstream),
-                 bufferevent_get_output(conn->downstream)) < 0) {
-    log_debug("%s: Error during transmit.", conn->peername);
-    close_conn(conn);
+  obfs_assert(up->buffer == bev);
+  obfs_assert(!up->flushing);
+  obfs_assert(up->is_open);
+  obfs_assert(up->circuit);
+  obfs_assert(up->circuit->downstream);
+
+  down = up->circuit->downstream;
+  if (proto_send(up,
+                 bufferevent_get_input(up->buffer),
+                 bufferevent_get_output(down->buffer))) {
+    log_debug("%s: error during transmit.", up->peername);
+    conn_free(up);
   }
+  log_debug("%s: transmitted %lu bytes", down->peername,
+            (unsigned long)
+            evbuffer_get_length(bufferevent_get_output(down->buffer)));
 }
 
 /**
@@ -539,28 +589,45 @@ upstream_read_cb(struct bufferevent *bev, void *arg)
 static void
 downstream_read_cb(struct bufferevent *bev, void *arg)
 {
-  conn_t *conn = arg;
+  conn_t *down = arg;
+  conn_t *up;
   enum recv_ret r;
-  log_debug("%s: %s, %lu bytes available", conn->peername, __func__,
+
+  log_debug("%s: %s, %lu bytes available", down->peername, __func__,
             (unsigned long)evbuffer_get_length(bufferevent_get_input(bev)));
-  obfs_assert(bev == conn->downstream);
 
-  r = proto_recv(conn,
-                 bufferevent_get_input(conn->downstream),
-                 bufferevent_get_output(conn->upstream));
+  obfs_assert(down->buffer == bev);
+  obfs_assert(!down->flushing);
+  obfs_assert(down->is_open);
+  obfs_assert(down->circuit);
+  obfs_assert(down->circuit->upstream);
+  up = down->circuit->upstream;
+
+
+  r = proto_recv(down,
+                 bufferevent_get_input(down->buffer),
+                 bufferevent_get_output(up->buffer));
 
   if (r == RECV_BAD) {
-    log_debug("%s: Error during receive.", conn->peername);
-    close_conn(conn);
-  } else if (r == RECV_SEND_PENDING) {
-    log_debug("%s: Reply of %lu bytes", conn->peername,
+    log_debug("%s: error during receive.", down->peername);
+    conn_free(down);
+  } else {
+    log_debug("%s: forwarded %lu bytes", down->peername,
               (unsigned long)
-              evbuffer_get_length(bufferevent_get_input(conn->upstream)));
-    if (proto_send(conn,
-                   bufferevent_get_input(conn->upstream),
-                   bufferevent_get_output(conn->downstream)) < 0) {
-      log_debug("%s: Error during reply.", conn->peername);
-      close_conn(conn);
+              evbuffer_get_length(bufferevent_get_output(up->buffer)));
+    if (r == RECV_SEND_PENDING) {
+      log_debug("%s: reply of %lu bytes", down->peername,
+                (unsigned long)
+                evbuffer_get_length(bufferevent_get_input(up->buffer)));
+      if (proto_send(up,
+                     bufferevent_get_input(up->buffer),
+                     bufferevent_get_output(down->buffer)) < 0) {
+        log_debug("%s: error during reply.", down->peername);
+        conn_free(down);
+      }
+      log_debug("%s: transmitted %lu bytes", down->peername,
+                (unsigned long)
+                evbuffer_get_length(bufferevent_get_output(down->buffer)));
     }
   }
 }
@@ -570,22 +637,29 @@ downstream_read_cb(struct bufferevent *bev, void *arg)
    We prepare the connection to be closed ASAP.
  */
 static void
-error_or_eof(conn_t *conn, struct bufferevent *bev_err)
+error_or_eof(conn_t *conn)
 {
+  circuit_t *circ = conn->circuit;
+  struct bufferevent *bev_err = conn->buffer;
   struct bufferevent *bev_flush;
-  log_debug("%s for %s", __func__, conn->peername);
 
-  if (bev_err == conn->upstream) bev_flush = conn->downstream;
-  else if (bev_err == conn->downstream) bev_flush = conn->upstream;
-  else obfs_abort();
+  log_debug("%s for %s", __func__, conn->peername);
+  if (!circ || conn->flushing || !conn->is_open) {
+    conn_free(conn);
+    return;
+  }
 
-  if (conn->flushing || !conn->is_open ||
-      evbuffer_get_length(bufferevent_get_output(bev_flush)) == 0) {
-    close_conn(conn);
+  bev_flush = (conn == circ->upstream) ? circ->downstream->buffer
+                                       : circ->upstream->buffer;
+  if (evbuffer_get_length(bufferevent_get_output(bev_flush)) == 0) {
+    conn_free(conn);
     return;
   }
 
-  conn->flushing = 1;
+  /* XXX move ->flushing and ->is_open to circuit_t */
+  circ->upstream->flushing = 1;
+  circ->downstream->flushing = 1;
+
   /* Stop reading and writing; wait for the other side to flush if it has
    * data. */
   bufferevent_disable(bev_err, EV_READ|EV_WRITE);
@@ -595,7 +669,7 @@ error_or_eof(conn_t *conn, struct bufferevent *bev_err)
      official API to retrieve the callback functions and/or change
      just one callback while leaving the others intact. */
   bufferevent_setcb(bev_flush, bev_flush->readcb,
-                    close_conn_on_flush, flush_error_cb, conn);
+                    conn_free_on_flush, flush_error_cb, conn);
 }
 
 /**
@@ -607,7 +681,7 @@ error_cb(struct bufferevent *bev, short what, void *arg)
 {
   conn_t *conn = arg;
   int errcode = EVUTIL_SOCKET_ERROR();
-  log_debug("%s for %s: what=%x err=%d", __func__, conn->peername,
+  log_debug("%s for %s: what=0x%04x errno=%d", __func__, conn->peername,
             what, errcode);
 
   /* It should be impossible to get here with BEV_EVENT_CONNECTED. */
@@ -615,21 +689,16 @@ error_cb(struct bufferevent *bev, short what, void *arg)
   obfs_assert(!(what & BEV_EVENT_CONNECTED));
 
   if (what & BEV_EVENT_ERROR) {
-    log_warn("Error on %s side of connection from %s: %s",
-             bev == conn->upstream ? "upstream" : "downstream",
+    log_warn("Error talking to %s: %s",
              conn->peername,
              evutil_socket_error_to_string(errcode));
   } else if (what & BEV_EVENT_EOF) {
-    log_info("EOF on %s side of connection from %s",
-             bev == conn->upstream ? "upstream" : "downstream",
-             conn->peername);
+    log_info("EOF from %s", conn->peername);
   } else {
     obfs_assert(what & BEV_EVENT_TIMEOUT);
-    log_info("Timeout on %s side of connection from %s",
-             bev == conn->upstream ? "upstream" : "downstream",
-             conn->peername);
+    log_info("Timeout talking to %s", conn->peername);
   }
-  error_or_eof(arg, bev);
+  error_or_eof(conn);
 }
 
 /**
@@ -641,7 +710,7 @@ flush_error_cb(struct bufferevent *bev, short what, void *arg)
 {
   conn_t *conn = arg;
   int errcode = EVUTIL_SOCKET_ERROR();
-  log_debug("%s for %s: what=%x err=%d", __func__, conn->peername,
+  log_debug("%s for %s: what=0x%04x errno=%d", __func__, conn->peername,
             what, errcode);
 
   /* It should be impossible to get here with BEV_EVENT_CONNECTED. */
@@ -650,11 +719,10 @@ flush_error_cb(struct bufferevent *bev, short what, void *arg)
 
   obfs_assert(conn->flushing);
 
-  log_warn("Error during flush of %s side of connection from %s: %s",
-           bev == conn->upstream ? "upstream" : "downstream",
+  log_warn("Error during flush of connection with %s: %s",
            conn->peername,
            evutil_socket_error_to_string(errcode));
-  close_conn(conn);
+  conn_free(conn);
   return;
 }
 
@@ -668,35 +736,36 @@ static void
 pending_conn_cb(struct bufferevent *bev, short what, void *arg)
 {
   conn_t *conn = arg;
-  struct bufferevent *other;
   log_debug("%s: %s", conn->peername, __func__);
 
-  if (bev == conn->upstream) other = conn->downstream;
-  else if (bev == conn->downstream) other = conn->upstream;
-  else obfs_abort();
-
-  /* Upon successful connection, enable traffic on the other side,
-     and replace this callback with the regular error_cb */
+  /* Upon successful connection, enable traffic on both sides of the
+     connection, and replace this callback with the regular error_cb */
   if (what & BEV_EVENT_CONNECTED) {
-    obfs_assert(!conn->flushing);
+    circuit_t *circ = conn->circuit;
+    obfs_assert(circ);
+    obfs_assert(!circ->upstream->flushing);
+    obfs_assert(!circ->downstream->flushing);
+
+    circ->upstream->is_open = 1;
+    circ->downstream->is_open = 1;
 
-    conn->is_open = 1;
-    log_debug("%s: Successful %s connection", conn->peername,
-              bev == conn->upstream ? "upstream" : "downstream");
+    log_debug("%s: Successful connection", conn->peername);
 
     /* Queue handshake, if any. */
-    if (proto_handshake(conn,
-                        bufferevent_get_output(conn->downstream))<0) {
+    if (proto_handshake(circ->downstream,
+                        bufferevent_get_output(circ->downstream->buffer))<0) {
       log_debug("%s: Error during handshake", conn->peername);
-      close_conn(conn);
+      conn_free(conn);
       return;
     }
 
-    /* XXX Dirty access to bufferevent guts.  There appears to be no
-       official API to retrieve the callback functions and/or change
-       just one callback while leaving the others intact. */
-    bufferevent_setcb(bev, bev->readcb, bev->writecb, error_cb, conn);
-    bufferevent_enable(other, EV_READ|EV_WRITE);
+    bufferevent_setcb(circ->upstream->buffer,
+                      upstream_read_cb, NULL, error_cb, circ->upstream);
+    bufferevent_setcb(circ->downstream->buffer,
+                      downstream_read_cb, NULL, error_cb, circ->downstream);
+
+    bufferevent_enable(circ->upstream->buffer, EV_READ|EV_WRITE);
+    bufferevent_enable(circ->downstream->buffer, EV_READ|EV_WRITE);
     return;
   }
 
@@ -712,10 +781,20 @@ pending_conn_cb(struct bufferevent *bev, short what, void *arg)
 static void
 pending_socks_cb(struct bufferevent *bev, short what, void *arg)
 {
-  conn_t *conn = arg;
-  log_debug("%s: %s", conn->peername, __func__);
-  obfs_assert(bev == conn->downstream);
-  obfs_assert(conn->socks_state);
+  conn_t *down = arg;
+  circuit_t *circ = down->circuit;
+  conn_t *up;
+  socks_state_t *socks;
+
+  log_debug("%s: %s", down->peername, __func__);
+
+  obfs_assert(circ);
+  obfs_assert(circ->upstream);
+  obfs_assert(circ->upstream->socks_state);
+  obfs_assert(circ->downstream == down);
+
+  up = circ->upstream;
+  socks = circ->upstream->socks_state;
 
   /* If we got an error while in the ST_HAVE_ADDR state, chances are
      that we failed connecting to the host requested by the CONNECT
@@ -725,14 +804,11 @@ pending_socks_cb(struct bufferevent *bev, short what, void *arg)
      errno isn't meaningful in that case...  */
   if ((what & (BEV_EVENT_EOF|BEV_EVENT_ERROR|BEV_EVENT_TIMEOUT))) {
     int err = EVUTIL_SOCKET_ERROR();
-    log_warn("Connection error: %s",
-             evutil_socket_error_to_string(err));
-    if (socks_state_get_status(conn->socks_state) == ST_HAVE_ADDR) {
-      socks_send_reply(conn->socks_state,
-                       bufferevent_get_output(conn->upstream),
-                       err);
+    log_warn("Connection error: %s", evutil_socket_error_to_string(err));
+    if (socks_state_get_status(socks) == ST_HAVE_ADDR) {
+      socks_send_reply(socks, bufferevent_get_output(up->buffer), err);
     }
-    error_or_eof(conn, bev);
+    error_or_eof(down);
     return;
   }
 
@@ -744,39 +820,39 @@ pending_socks_cb(struct bufferevent *bev, short what, void *arg)
     struct sockaddr *sa = (struct sockaddr*)&ss;
     socklen_t slen = sizeof(&ss);
 
-    obfs_assert(!conn->flushing);
+    obfs_assert(!up->flushing);
+    obfs_assert(!down->flushing);
 
+    /* Figure out where we actually connected to, and tell the socks client */
     if (getpeername(bufferevent_getfd(bev), sa, &slen) == 0) {
-      /* Figure out where we actually connected to so that we can tell the
-       * socks client */
-      socks_state_set_address(conn->socks_state, sa);
+      socks_state_set_address(socks, sa);
+      if (!down->peername)
+        down->peername = printable_address(sa, slen);
     }
-    socks_send_reply(conn->socks_state,
-                     bufferevent_get_output(conn->upstream), 0);
+    socks_send_reply(socks, bufferevent_get_output(up->buffer), 0);
 
     /* Switch to regular upstream behavior. */
-    socks_state_free(conn->socks_state);
-    conn->socks_state = NULL;
-    conn->is_open = 1;
-    log_debug("%s: Successful %s connection", conn->peername,
-              bev == conn->upstream ? "upstream" : "downstream");
-
-    bufferevent_setcb(conn->upstream,
-                      upstream_read_cb, NULL, error_cb, conn);
-    bufferevent_setcb(conn->downstream,
-                      downstream_read_cb, NULL, error_cb, conn);
-    bufferevent_enable(conn->upstream, EV_READ|EV_WRITE);
+    socks_state_free(socks);
+    up->socks_state = NULL;
+    up->is_open = 1;
+    down->is_open = 1;
+    log_debug("%s: Successful outbound connection to %s",
+              up->peername, down->peername);
+
+    bufferevent_setcb(up->buffer, upstream_read_cb, NULL, error_cb, up);
+    bufferevent_setcb(down->buffer, downstream_read_cb, NULL, error_cb, down);
+    bufferevent_enable(up->buffer, EV_READ|EV_WRITE);
+    bufferevent_enable(down->buffer, EV_READ|EV_WRITE);
 
     /* Queue handshake, if any. */
-    if (proto_handshake(conn,
-                        bufferevent_get_output(conn->downstream))<0) {
-      log_debug("%s: Error during handshake", conn->peername);
-      close_conn(conn);
+    if (proto_handshake(down, bufferevent_get_output(down->buffer))) {
+      log_debug("%s: Error during handshake", down->peername);
+      conn_free(down);
       return;
     }
 
-    if (evbuffer_get_length(bufferevent_get_input(conn->upstream)) != 0)
-      upstream_read_cb(conn->upstream, conn);
+    if (evbuffer_get_length(bufferevent_get_input(up->buffer)) > 0)
+      upstream_read_cb(up->buffer, up);
     return;
   }
 
diff --git a/src/network.h b/src/network.h
index 8fb41ef..72df32c 100644
--- a/src/network.h
+++ b/src/network.h
@@ -12,23 +12,37 @@ void close_all_listeners(void);
 void start_shutdown(int barbaric);
 
 /**
-   This struct defines the state of a connection between "upstream"
-   and "downstream" peers (it's really two connections at the socket
-   level).  Again, each protocol may extend this structure with
-   additional private data by embedding it as the first member of a
-   larger structure.  The protocol's conn_create() method is responsible
-   only for filling in the |vtable| field of this structure, plus any
-   private data of course.
+   This struct defines the state of one socket-level connection.  Each
+   protocol may extend this structure with additional private data by
+   embedding it as the first member of a larger structure.  The
+   protocol's conn_create() method is responsible only for filling in
+   the |cfg| and |mode| fields of this structure, plus any private
+   data of course.
+
+   An incoming connection is not associated with a circuit until the
+   destination for the other side of the circuit is known.  An outgoing
+   connection is associated with a circuit from its creation.
  */
 struct conn_t {
   config_t           *cfg;
   char               *peername;
   socks_state_t      *socks_state;
-  struct bufferevent *upstream;
-  struct bufferevent *downstream;
+  circuit_t          *circuit;
+  struct bufferevent *buffer;
   enum listen_mode    mode     : 30;
   unsigned int        flushing : 1;
   unsigned int        is_open  : 1;
 };
 
+/**
+   This struct defines a pair of established connections.  The "upstream"
+   connection is to the higher-level client or server that we are proxying
+   traffic for.  The "downstream" connection is to the remote peer.
+ */
+
+struct circuit_t {
+  conn_t             *upstream;
+  conn_t             *downstream;
+};
+
 #endif
diff --git a/src/util.h b/src/util.h
index 8228790..2b37c48 100644
--- a/src/util.h
+++ b/src/util.h
@@ -62,6 +62,7 @@ unsigned int ui64_log2(uint64_t u64);
 
 /***** Network types and functions. *****/
 
+typedef struct circuit_t circuit_t;
 typedef struct config_t config_t;
 typedef struct conn_t conn_t;
 typedef struct protocol_vtable protocol_vtable;





More information about the tor-commits mailing list