commit 9ebdad11d6bb8ae42c839a918f3b0116db97e5cb Author: Zack Weinberg zackw@panix.com Date: Mon Jul 25 12:59:06 2011 -0700
Restructure event callbacks by state, not connection side --- src/network.c | 173 +++++++++++++++++++++++++++++++++------------------------ 1 files changed, 101 insertions(+), 72 deletions(-)
diff --git a/src/network.c b/src/network.c index 31a7013..f3d09cf 100644 --- a/src/network.c +++ b/src/network.c @@ -18,6 +18,7 @@
#include <event2/buffer.h> #include <event2/bufferevent.h> +#include <event2/bufferevent_struct.h> #include <event2/listener.h> #include <event2/util.h>
@@ -56,9 +57,10 @@ static void upstream_read_cb(struct bufferevent *bev, void *arg); static void downstream_read_cb(struct bufferevent *bev, void *arg); static void socks_read_cb(struct bufferevent *bev, void *arg);
-static void input_event_cb(struct bufferevent *bev, short what, void *arg); -static void output_event_cb(struct bufferevent *bev, short what, void *arg); -static void socks_event_cb(struct bufferevent *bev, short what, void *arg); +static void error_cb(struct bufferevent *bev, short what, void *arg); +static void flush_error_cb(struct bufferevent *bev, short what, void *arg); +static void pending_conn_cb(struct bufferevent *bev, short what, void *arg); +static void pending_socks_cb(struct bufferevent *bev, short what, void *arg);
/** Puts obfsproxy's networking subsystem on "closing time" mode. This @@ -213,12 +215,12 @@ simple_client_listener_cb(struct evconnlistener *evcl, if (!conn->output) goto err;
- bufferevent_setcb(conn->input, upstream_read_cb, NULL, input_event_cb, conn); + bufferevent_setcb(conn->input, upstream_read_cb, NULL, error_cb, conn); /* don't enable the input side for reading at this point; wait till we have a connection to the target */
bufferevent_setcb(conn->output, - downstream_read_cb, NULL, output_event_cb, conn); + downstream_read_cb, NULL, pending_conn_cb, conn);
/* Queue handshake, if any, before connecting. */ if (proto_handshake(conn->proto, bufferevent_get_output(conn->output)) < 0) @@ -282,7 +284,7 @@ socks_client_listener_cb(struct evconnlistener *evcl, goto err; fd = -1; /* prevent double-close */
- bufferevent_setcb(conn->input, socks_read_cb, NULL, input_event_cb, conn); + bufferevent_setcb(conn->input, socks_read_cb, NULL, error_cb, conn); bufferevent_enable(conn->input, EV_READ|EV_WRITE);
/* Do not create an output bufferevent at this time; the socks @@ -335,7 +337,7 @@ simple_server_listener_cb(struct evconnlistener *evcl, goto err; fd = -1; /* prevent double-close */
- bufferevent_setcb(conn->input, downstream_read_cb, NULL, input_event_cb, conn); + bufferevent_setcb(conn->input, downstream_read_cb, NULL, error_cb, conn);
/* don't enable the input side for reading at this point; wait till we have a connection to the target */ @@ -346,7 +348,7 @@ simple_server_listener_cb(struct evconnlistener *evcl, goto err;
bufferevent_setcb(conn->output, upstream_read_cb, NULL, - output_event_cb, conn); + pending_conn_cb, conn);
/* Queue handshake, if any, before connecting. */ if (proto_handshake(conn->proto, @@ -457,7 +459,7 @@ socks_read_cb(struct bufferevent *bev, void *arg) BEV_OPT_CLOSE_ON_FREE);
bufferevent_setcb(conn->output, downstream_read_cb, NULL, - socks_event_cb, conn); + pending_socks_cb, conn);
/* Queue handshake, if any, before connecting. */ if (proto_handshake(conn->proto, @@ -497,7 +499,7 @@ socks_read_cb(struct bufferevent *bev, void *arg) socks5_send_reply(bufferevent_get_output(bev), conn->socks_state, SOCKS5_FAILED_UNSUPPORTED); bufferevent_setcb(bev, NULL, - close_conn_on_flush, output_event_cb, conn); + close_conn_on_flush, flush_error_cb, conn); return; } } @@ -552,13 +554,17 @@ downstream_read_cb(struct bufferevent *bev, void *arg) We prepare the connection to be closed ASAP. */ static void -error_or_eof(conn_t *conn, - struct bufferevent *bev_err, struct bufferevent *bev_flush) +error_or_eof(conn_t *conn, struct bufferevent *bev_err) { - log_debug("error_or_eof"); + struct bufferevent *bev_flush; + + if (bev_err == conn->input) bev_flush = conn->output; + else if (bev_err == conn->output) bev_flush = conn->input; + else obfs_abort();
- if (conn->flushing || ! conn->is_open || - 0 == evbuffer_get_length(bufferevent_get_output(bev_flush))) { + log_debug("error_or_eof"); + if (conn->flushing || !conn->is_open || + evbuffer_get_length(bufferevent_get_output(bev_flush)) == 0) { close_conn(conn); return; } @@ -567,93 +573,111 @@ error_or_eof(conn_t *conn, /* Stop reading and writing; wait for the other side to flush if it has * data. */ bufferevent_disable(bev_err, EV_READ|EV_WRITE); - bufferevent_disable(bev_flush, EV_READ); + bufferevent_setcb(bev_err, NULL, NULL, flush_error_cb, conn);
+ bufferevent_disable(bev_flush, EV_READ); bufferevent_setcb(bev_flush, NULL, - close_conn_on_flush, output_event_cb, conn); + close_conn_on_flush, flush_error_cb, conn); bufferevent_enable(bev_flush, EV_WRITE); }
/** - Called when an "event" happens on conn->input. - On the input side, all such events are error conditions. - */ + Called when an "event" happens on an already-connected socket. + This can only be an error or EOF. +*/ static void -input_event_cb(struct bufferevent *bev, short what, void *arg) +error_cb(struct bufferevent *bev, short what, void *arg) { - conn_t *conn = arg; - obfs_assert(bev == conn->input); - - /* It should be impossible to get BEV_EVENT_CONNECTED on this side. */ + /* It should be impossible to get here with BEV_EVENT_CONNECTED. */ obfs_assert(what & (BEV_EVENT_EOF|BEV_EVENT_ERROR|BEV_EVENT_TIMEOUT)); obfs_assert(!(what & BEV_EVENT_CONNECTED));
log_warn("Got error: %s", evutil_socket_error_to_string(EVUTIL_SOCKET_ERROR())); - error_or_eof(conn, bev, conn->output); + error_or_eof(arg, bev); }
/** - Called when an "event" happens on conn->output. - In addition to the error cases dealt with above, this side can see - BEV_EVENT_CONNECTED which indicates that the output connection is - now open. - */ + Called when an event happens on a socket that's in the process of + being flushed and closed. As above, this can only be an error. +*/ static void -output_event_cb(struct bufferevent *bev, short what, void *arg) +flush_error_cb(struct bufferevent *bev, short what, void *arg) { conn_t *conn = arg; - obfs_assert(bev == conn->output);
- /* If the connection is terminating *OR* if we got one of the error - events, close this connection soon. */ - if (conn->flushing || - (what & (BEV_EVENT_EOF|BEV_EVENT_ERROR|BEV_EVENT_TIMEOUT))) { - log_warn("Got error: %s", - evutil_socket_error_to_string(EVUTIL_SOCKET_ERROR())); - error_or_eof(conn, bev, conn->input); - return; - } + /* It should be impossible to get here with BEV_EVENT_CONNECTED. */ + obfs_assert(what & (BEV_EVENT_EOF|BEV_EVENT_ERROR|BEV_EVENT_TIMEOUT)); + obfs_assert(!(what & BEV_EVENT_CONNECTED)); + + obfs_assert(conn->flushing); + + log_warn("Error during flush: %s", + evutil_socket_error_to_string(EVUTIL_SOCKET_ERROR())); + close_conn(conn); + return; +}
- /* Upon successful connection, go ahead and enable traffic on the - input side. */ +/** + Called when an "event" happens on a socket that's still waiting to + be connected. We expect to get BEV_EVENT_CONNECTED, which + indicates that the connection is now open, but we might also get + errors as above. +*/ +static void +pending_conn_cb(struct bufferevent *bev, short what, void *arg) +{ + conn_t *conn = arg; + struct bufferevent *other; + if (bev == conn->input) other = conn->output; + else if (bev == conn->output) other = conn->input; + else obfs_abort(); + + /* Upon successful connection, enable traffic on the other side, + and replace this callback with the regular error_cb */ if (what & BEV_EVENT_CONNECTED) { + obfs_assert(!conn->flushing); + conn->is_open = 1; - log_debug("Connection done") ; - bufferevent_enable(conn->input, EV_READ|EV_WRITE); + log_debug("Connection successful") ; + bufferevent_enable(other, EV_READ|EV_WRITE); + + /* XXX Dirty access to bufferevent guts. There appears to be no + official API to retrieve the callback functions and/or change + just one callback while leaving the others intact. */ + bufferevent_setcb(bev, bev->readcb, bev->writecb, error_cb, conn); return; }
- /* unrecognized event */ - obfs_abort(); + /* Otherwise, must be an error */ + error_cb(bev, what, arg); }
/** - Called when an "event" happens on conn->output in socks mode. - Handles the same cases as output_event_cb but must also generate - appropriate socks messages back on the input side. + Called when an "event" happens on a socket in socks mode. + Both connections and errors are possible; must generate + appropriate socks messages on the input side. */ static void -socks_event_cb(struct bufferevent *bev, short what, void *arg) +pending_socks_cb(struct bufferevent *bev, short what, void *arg) { conn_t *conn = arg; obfs_assert(bev == conn->output); + obfs_assert(conn->socks_state);
/* If we got an error while in the ST_HAVE_ADDR state, chances are that we failed connecting to the host requested by the CONNECT call. This means that we should send a negative SOCKS reply back to the client and terminate the connection. */ - if ((what & BEV_EVENT_ERROR) && - socks_state_get_status(conn->socks_state) == ST_HAVE_ADDR) { - log_debug("Connection failed"); - /* Enable EV_WRITE so that we can send the response. - Disable EV_READ so that we don't get more stuff from the client. */ - bufferevent_enable(conn->input, EV_WRITE); - bufferevent_disable(conn->input, EV_READ); - socks_send_reply(conn->socks_state, bufferevent_get_output(conn->input), - evutil_socket_geterror(bufferevent_getfd(bev))); - bufferevent_setcb(conn->input, NULL, - close_conn_on_flush, output_event_cb, conn); + if ((what & (BEV_EVENT_EOF|BEV_EVENT_ERROR|BEV_EVENT_TIMEOUT))) { + int err = EVUTIL_SOCKET_ERROR(); + log_warn("Connection error: %s", + evutil_socket_error_to_string(err)); + if (socks_state_get_status(conn->socks_state) == ST_HAVE_ADDR) { + socks_send_reply(conn->socks_state, bufferevent_get_output(conn->input), + err); + } + error_or_eof(conn, bev); return; }
@@ -664,7 +688,9 @@ socks_event_cb(struct bufferevent *bev, short what, void *arg) struct sockaddr_storage ss; struct sockaddr *sa = (struct sockaddr*)&ss; socklen_t slen = sizeof(&ss); - obfs_assert(conn->socks_state); + + obfs_assert(!conn->flushing); + if (getpeername(bufferevent_getfd(bev), sa, &slen) == 0) { /* Figure out where we actually connected to so that we can tell the * socks client */ @@ -672,18 +698,21 @@ socks_event_cb(struct bufferevent *bev, short what, void *arg) } socks_send_reply(conn->socks_state, bufferevent_get_output(conn->input), 0); - /* we sent a socks reply. We can finally move over to being a regular - input bufferevent. */ + + /* Switch to regular upstream behavior. */ socks_state_free(conn->socks_state); conn->socks_state = NULL; - bufferevent_setcb(conn->input, - upstream_read_cb, NULL, input_event_cb, conn); - bufferevent_setcb(conn->output, - downstream_read_cb, NULL, output_event_cb, conn); + conn->is_open = 1; + log_debug("Connection successful"); + + bufferevent_setcb(conn->input, upstream_read_cb, NULL, error_cb, conn); + bufferevent_setcb(conn->output, downstream_read_cb, NULL, error_cb, conn); + bufferevent_enable(conn->input, EV_READ|EV_WRITE); if (evbuffer_get_length(bufferevent_get_input(conn->input)) != 0) downstream_read_cb(bev, conn->input); + return; }
- /* also do everything that's done on a normal connection */ - output_event_cb(bev, what, arg); + /* unknown event code */ + obfs_abort(); }
tor-commits@lists.torproject.org