[tor-commits] [obfsproxy/master] Restructure event callbacks by state, not connection side

nickm at torproject.org nickm at torproject.org
Fri Sep 9 17:08:57 UTC 2011


commit 9ebdad11d6bb8ae42c839a918f3b0116db97e5cb
Author: Zack Weinberg <zackw at panix.com>
Date:   Mon Jul 25 12:59:06 2011 -0700

    Restructure event callbacks by state, not connection side
---
 src/network.c |  173 +++++++++++++++++++++++++++++++++------------------------
 1 files changed, 101 insertions(+), 72 deletions(-)

diff --git a/src/network.c b/src/network.c
index 31a7013..f3d09cf 100644
--- a/src/network.c
+++ b/src/network.c
@@ -18,6 +18,7 @@
 
 #include <event2/buffer.h>
 #include <event2/bufferevent.h>
+#include <event2/bufferevent_struct.h>
 #include <event2/listener.h>
 #include <event2/util.h>
 
@@ -56,9 +57,10 @@ static void upstream_read_cb(struct bufferevent *bev, void *arg);
 static void downstream_read_cb(struct bufferevent *bev, void *arg);
 static void socks_read_cb(struct bufferevent *bev, void *arg);
 
-static void input_event_cb(struct bufferevent *bev, short what, void *arg);
-static void output_event_cb(struct bufferevent *bev, short what, void *arg);
-static void socks_event_cb(struct bufferevent *bev, short what, void *arg);
+static void error_cb(struct bufferevent *bev, short what, void *arg);
+static void flush_error_cb(struct bufferevent *bev, short what, void *arg);
+static void pending_conn_cb(struct bufferevent *bev, short what, void *arg);
+static void pending_socks_cb(struct bufferevent *bev, short what, void *arg);
 
 /**
    Puts obfsproxy's networking subsystem on "closing time" mode. This
@@ -213,12 +215,12 @@ simple_client_listener_cb(struct evconnlistener *evcl,
   if (!conn->output)
     goto err;
 
-  bufferevent_setcb(conn->input, upstream_read_cb, NULL, input_event_cb, conn);
+  bufferevent_setcb(conn->input, upstream_read_cb, NULL, error_cb, conn);
   /* don't enable the input side for reading at this point; wait till we
      have a connection to the target */
 
   bufferevent_setcb(conn->output,
-                    downstream_read_cb, NULL, output_event_cb, conn);
+                    downstream_read_cb, NULL, pending_conn_cb, conn);
 
   /* Queue handshake, if any, before connecting. */
   if (proto_handshake(conn->proto, bufferevent_get_output(conn->output)) < 0)
@@ -282,7 +284,7 @@ socks_client_listener_cb(struct evconnlistener *evcl,
     goto err;
   fd = -1; /* prevent double-close */
 
-  bufferevent_setcb(conn->input, socks_read_cb, NULL, input_event_cb, conn);
+  bufferevent_setcb(conn->input, socks_read_cb, NULL, error_cb, conn);
   bufferevent_enable(conn->input, EV_READ|EV_WRITE);
 
   /* Do not create an output bufferevent at this time; the socks
@@ -335,7 +337,7 @@ simple_server_listener_cb(struct evconnlistener *evcl,
     goto err;
   fd = -1; /* prevent double-close */
 
-  bufferevent_setcb(conn->input, downstream_read_cb, NULL, input_event_cb, conn);
+  bufferevent_setcb(conn->input, downstream_read_cb, NULL, error_cb, conn);
 
   /* don't enable the input side for reading at this point; wait till we
      have a connection to the target */
@@ -346,7 +348,7 @@ simple_server_listener_cb(struct evconnlistener *evcl,
     goto err;
 
   bufferevent_setcb(conn->output, upstream_read_cb, NULL,
-                    output_event_cb, conn);
+                    pending_conn_cb, conn);
 
   /* Queue handshake, if any, before connecting. */
   if (proto_handshake(conn->proto,
@@ -457,7 +459,7 @@ socks_read_cb(struct bufferevent *bev, void *arg)
                                             BEV_OPT_CLOSE_ON_FREE);
 
       bufferevent_setcb(conn->output, downstream_read_cb, NULL,
-                        socks_event_cb, conn);
+                        pending_socks_cb, conn);
 
       /* Queue handshake, if any, before connecting. */
       if (proto_handshake(conn->proto,
@@ -497,7 +499,7 @@ socks_read_cb(struct bufferevent *bev, void *arg)
     socks5_send_reply(bufferevent_get_output(bev), conn->socks_state,
                       SOCKS5_FAILED_UNSUPPORTED);
     bufferevent_setcb(bev, NULL,
-                      close_conn_on_flush, output_event_cb, conn);
+                      close_conn_on_flush, flush_error_cb, conn);
     return;
   }
 }
@@ -552,13 +554,17 @@ downstream_read_cb(struct bufferevent *bev, void *arg)
    We prepare the connection to be closed ASAP.
  */
 static void
-error_or_eof(conn_t *conn,
-             struct bufferevent *bev_err, struct bufferevent *bev_flush)
+error_or_eof(conn_t *conn, struct bufferevent *bev_err)
 {
-  log_debug("error_or_eof");
+  struct bufferevent *bev_flush;
+
+  if (bev_err == conn->input) bev_flush = conn->output;
+  else if (bev_err == conn->output) bev_flush = conn->input;
+  else obfs_abort();
 
-  if (conn->flushing || ! conn->is_open ||
-      0 == evbuffer_get_length(bufferevent_get_output(bev_flush))) {
+  log_debug("error_or_eof");
+  if (conn->flushing || !conn->is_open ||
+      evbuffer_get_length(bufferevent_get_output(bev_flush)) == 0) {
     close_conn(conn);
     return;
   }
@@ -567,93 +573,111 @@ error_or_eof(conn_t *conn,
   /* Stop reading and writing; wait for the other side to flush if it has
    * data. */
   bufferevent_disable(bev_err, EV_READ|EV_WRITE);
-  bufferevent_disable(bev_flush, EV_READ);
+  bufferevent_setcb(bev_err, NULL, NULL, flush_error_cb, conn);
 
+  bufferevent_disable(bev_flush, EV_READ);
   bufferevent_setcb(bev_flush, NULL,
-                    close_conn_on_flush, output_event_cb, conn);
+                    close_conn_on_flush, flush_error_cb, conn);
   bufferevent_enable(bev_flush, EV_WRITE);
 }
 
 /**
-   Called when an "event" happens on conn->input.
-   On the input side, all such events are error conditions.
- */
+   Called when an "event" happens on an already-connected socket.
+   This can only be an error or EOF.
+*/
 static void
-input_event_cb(struct bufferevent *bev, short what, void *arg)
+error_cb(struct bufferevent *bev, short what, void *arg)
 {
-  conn_t *conn = arg;
-  obfs_assert(bev == conn->input);
-
-  /* It should be impossible to get BEV_EVENT_CONNECTED on this side. */
+  /* It should be impossible to get here with BEV_EVENT_CONNECTED. */
   obfs_assert(what & (BEV_EVENT_EOF|BEV_EVENT_ERROR|BEV_EVENT_TIMEOUT));
   obfs_assert(!(what & BEV_EVENT_CONNECTED));
 
   log_warn("Got error: %s",
            evutil_socket_error_to_string(EVUTIL_SOCKET_ERROR()));
-  error_or_eof(conn, bev, conn->output);
+  error_or_eof(arg, bev);
 }
 
 /**
-   Called when an "event" happens on conn->output.
-   In addition to the error cases dealt with above, this side can see
-   BEV_EVENT_CONNECTED which indicates that the output connection is
-   now open.
- */
+   Called when an event happens on a socket that's in the process of
+   being flushed and closed.  As above, this can only be an error.
+*/
 static void
-output_event_cb(struct bufferevent *bev, short what, void *arg)
+flush_error_cb(struct bufferevent *bev, short what, void *arg)
 {
   conn_t *conn = arg;
-  obfs_assert(bev == conn->output);
 
-  /* If the connection is terminating *OR* if we got one of the error
-     events, close this connection soon. */
-  if (conn->flushing ||
-      (what & (BEV_EVENT_EOF|BEV_EVENT_ERROR|BEV_EVENT_TIMEOUT))) {
-    log_warn("Got error: %s",
-             evutil_socket_error_to_string(EVUTIL_SOCKET_ERROR()));
-    error_or_eof(conn, bev, conn->input);
-    return;
-  }
+  /* It should be impossible to get here with BEV_EVENT_CONNECTED. */
+  obfs_assert(what & (BEV_EVENT_EOF|BEV_EVENT_ERROR|BEV_EVENT_TIMEOUT));
+  obfs_assert(!(what & BEV_EVENT_CONNECTED));
+
+  obfs_assert(conn->flushing);
+
+  log_warn("Error during flush: %s",
+           evutil_socket_error_to_string(EVUTIL_SOCKET_ERROR()));
+  close_conn(conn);
+  return;
+}
 
-  /* Upon successful connection, go ahead and enable traffic on the
-     input side. */
+/**
+   Called when an "event" happens on a socket that's still waiting to
+   be connected.  We expect to get BEV_EVENT_CONNECTED, which
+   indicates that the connection is now open, but we might also get
+   errors as above.
+*/
+static void
+pending_conn_cb(struct bufferevent *bev, short what, void *arg)
+{
+  conn_t *conn = arg;
+  struct bufferevent *other;
+  if (bev == conn->input) other = conn->output;
+  else if (bev == conn->output) other = conn->input;
+  else obfs_abort();
+
+  /* Upon successful connection, enable traffic on the other side,
+     and replace this callback with the regular error_cb */
   if (what & BEV_EVENT_CONNECTED) {
+    obfs_assert(!conn->flushing);
+
     conn->is_open = 1;
-    log_debug("Connection done") ;
-    bufferevent_enable(conn->input, EV_READ|EV_WRITE);
+    log_debug("Connection successful") ;
+    bufferevent_enable(other, EV_READ|EV_WRITE);
+
+    /* XXX Dirty access to bufferevent guts.  There appears to be no
+       official API to retrieve the callback functions and/or change
+       just one callback while leaving the others intact. */
+    bufferevent_setcb(bev, bev->readcb, bev->writecb, error_cb, conn);
     return;
   }
 
-  /* unrecognized event */
-  obfs_abort();
+  /* Otherwise, must be an error */
+  error_cb(bev, what, arg);
 }
 
 /**
-   Called when an "event" happens on conn->output in socks mode.
-   Handles the same cases as output_event_cb but must also generate
-   appropriate socks messages back on the input side.
+   Called when an "event" happens on a socket in socks mode.
+   Both connections and errors are possible; must generate
+   appropriate socks messages on the input side.
  */
 static void
-socks_event_cb(struct bufferevent *bev, short what, void *arg)
+pending_socks_cb(struct bufferevent *bev, short what, void *arg)
 {
   conn_t *conn = arg;
   obfs_assert(bev == conn->output);
+  obfs_assert(conn->socks_state);
 
   /* If we got an error while in the ST_HAVE_ADDR state, chances are
      that we failed connecting to the host requested by the CONNECT
      call. This means that we should send a negative SOCKS reply back
      to the client and terminate the connection. */
-  if ((what & BEV_EVENT_ERROR) &&
-      socks_state_get_status(conn->socks_state) == ST_HAVE_ADDR) {
-    log_debug("Connection failed");
-    /* Enable EV_WRITE so that we can send the response.
-       Disable EV_READ so that we don't get more stuff from the client. */
-    bufferevent_enable(conn->input, EV_WRITE);
-    bufferevent_disable(conn->input, EV_READ);
-    socks_send_reply(conn->socks_state, bufferevent_get_output(conn->input),
-                     evutil_socket_geterror(bufferevent_getfd(bev)));
-    bufferevent_setcb(conn->input, NULL,
-                      close_conn_on_flush, output_event_cb, conn);
+  if ((what & (BEV_EVENT_EOF|BEV_EVENT_ERROR|BEV_EVENT_TIMEOUT))) {
+    int err = EVUTIL_SOCKET_ERROR();
+    log_warn("Connection error: %s",
+             evutil_socket_error_to_string(err));
+    if (socks_state_get_status(conn->socks_state) == ST_HAVE_ADDR) {
+      socks_send_reply(conn->socks_state, bufferevent_get_output(conn->input),
+                       err);
+    }
+    error_or_eof(conn, bev);
     return;
   }
 
@@ -664,7 +688,9 @@ socks_event_cb(struct bufferevent *bev, short what, void *arg)
     struct sockaddr_storage ss;
     struct sockaddr *sa = (struct sockaddr*)&ss;
     socklen_t slen = sizeof(&ss);
-    obfs_assert(conn->socks_state);
+
+    obfs_assert(!conn->flushing);
+
     if (getpeername(bufferevent_getfd(bev), sa, &slen) == 0) {
       /* Figure out where we actually connected to so that we can tell the
        * socks client */
@@ -672,18 +698,21 @@ socks_event_cb(struct bufferevent *bev, short what, void *arg)
     }
     socks_send_reply(conn->socks_state,
                      bufferevent_get_output(conn->input), 0);
-    /* we sent a socks reply.  We can finally move over to being a regular
-       input bufferevent. */
+
+    /* Switch to regular upstream behavior. */
     socks_state_free(conn->socks_state);
     conn->socks_state = NULL;
-    bufferevent_setcb(conn->input,
-                      upstream_read_cb, NULL, input_event_cb, conn);
-    bufferevent_setcb(conn->output,
-                      downstream_read_cb, NULL, output_event_cb, conn);
+    conn->is_open = 1;
+    log_debug("Connection successful");
+
+    bufferevent_setcb(conn->input, upstream_read_cb, NULL, error_cb, conn);
+    bufferevent_setcb(conn->output, downstream_read_cb, NULL, error_cb, conn);
+    bufferevent_enable(conn->input, EV_READ|EV_WRITE);
     if (evbuffer_get_length(bufferevent_get_input(conn->input)) != 0)
       downstream_read_cb(bev, conn->input);
+    return;
   }
 
-  /* also do everything that's done on a normal connection */
-  output_event_cb(bev, what, arg);
+  /* unknown event code */
+  obfs_abort();
 }





More information about the tor-commits mailing list