commit dfdc4fb57b2633359d41a993d78bc3ae8f7b92fc Author: Zack Weinberg zackw@panix.com Date: Wed Jul 27 17:06:41 2011 -0700
Add a whole bunch more logging and test framework tracking. Properly distinguish BEV_EVENT_EOF from BEV_EVENT_ERROR in error_cb (flush_error_cb and pending_socks_cb are still sloppy). --- src/main.c | 1 + src/network.c | 91 +++++++++++++++++++------ src/network.h | 2 + src/test/tester.py.in | 183 ++++++++++++++++++++++++++++-------------------- src/util.c | 41 +++++++++++ src/util.h | 4 + 6 files changed, 226 insertions(+), 96 deletions(-)
diff --git a/src/main.c b/src/main.c index f6280c2..1459af3 100644 --- a/src/main.c +++ b/src/main.c @@ -91,6 +91,7 @@ handle_signal_cb(evutil_socket_t fd, short what, void *arg) void finish_shutdown(void) { + log_debug("Finishing shutdown."); event_base_loopexit(the_event_base, NULL); }
diff --git a/src/network.c b/src/network.c index f05c5b8..7831a4e 100644 --- a/src/network.c +++ b/src/network.c @@ -92,6 +92,8 @@ static void pending_socks_cb(struct bufferevent *bev, short what, void *arg); void start_shutdown(int barbaric) { + log_debug("Beginning %s shutdown.", barbaric ? "barbaric" : "normal"); + if (!shutting_down) shutting_down=1;
@@ -115,6 +117,7 @@ close_all_connections(void) { if (!connections) return; + log_debug("Closing all connections."); SMARTLIST_FOREACH(connections, conn_t *, conn, { conn_free(conn); }); smartlist_free(connections); @@ -143,6 +146,8 @@ create_listener(struct event_base *base, protocol_params_t *params) default: obfs_abort(); }
+ lsn->address = printable_address(params->listen_addr->ai_addr, + params->listen_addr->ai_addrlen); lsn->proto_params = params; lsn->listener = evconnlistener_new_bind(base, callback, lsn, flags, -1, @@ -156,6 +161,9 @@ create_listener(struct event_base *base, protocol_params_t *params) return 0; }
+ log_debug("Now listening on %s in mode %d, protocol %s.", + lsn->address, params->mode, params->vtable->name); + /* If we don't have a listener list, create one now. */ if (!listeners) listeners = smartlist_create(); @@ -170,6 +178,8 @@ create_listener(struct event_base *base, protocol_params_t *params) static void listener_free(listener_t *lsn) { + if (lsn->address) + free(lsn->address); if (lsn->listener) evconnlistener_free(lsn->listener); if (lsn->proto_params) @@ -206,7 +216,9 @@ simple_client_listener_cb(struct evconnlistener *evcl, struct event_base *base; conn_t *conn = xzalloc(sizeof(conn_t));
- log_debug("%s: connection attempt.", __func__); + conn->peername = printable_address(sourceaddr, socklen); + log_debug("%s: connection to %s from %s", __func__, + lsn->address, conn->peername);
conn->mode = lsn->proto_params->mode; obfs_assert(conn->mode == LSN_SIMPLE_CLIENT); @@ -280,7 +292,9 @@ socks_client_listener_cb(struct evconnlistener *evcl, struct event_base *base; conn_t *conn = xzalloc(sizeof(conn_t));
- log_debug("%s: connection attempt.", __func__); + conn->peername = printable_address(sourceaddr, socklen); + log_debug("%s: connection to %s from %s", __func__, + lsn->address, conn->peername);
conn->mode = lsn->proto_params->mode; obfs_assert(conn->mode == LSN_SOCKS_CLIENT); @@ -336,7 +350,9 @@ simple_server_listener_cb(struct evconnlistener *evcl, struct event_base *base; conn_t *conn = xzalloc(sizeof(conn_t));
- log_debug("%s: connection attempt.", __func__); + conn->peername = printable_address(sourceaddr, socklen); + log_debug("%s: connection to %s from %s", __func__, + lsn->address, conn->peername);
conn->mode = lsn->proto_params->mode; obfs_assert(conn->mode == LSN_SIMPLE_SERVER); @@ -403,6 +419,8 @@ simple_server_listener_cb(struct evconnlistener *evcl, static void conn_free(conn_t *conn) { + if (conn->peername) + free(conn->peername); if (conn->proto) proto_destroy(conn->proto); if (conn->socks_state) @@ -423,10 +441,11 @@ static void close_conn(conn_t *conn) { obfs_assert(connections); + log_debug("Closing connection from %s; %d remaining", + conn->peername, smartlist_len(connections) - 1); + smartlist_remove(connections, conn); conn_free(conn); - log_debug("Connection destroyed. " - "We currently have %d connections!", smartlist_len(connections));
/* If this was the last connection AND we are shutting down, finish shutdown. */ @@ -447,6 +466,7 @@ static void close_conn_on_flush(struct bufferevent *bev, void *arg) { conn_t *conn = arg; + log_debug("%s for %s", __func__, conn->peername);
if (evbuffer_get_length(bufferevent_get_output(bev)) == 0) close_conn(conn); @@ -460,6 +480,7 @@ socks_read_cb(struct bufferevent *bev, void *arg) { conn_t *conn = arg; enum socks_ret socks_ret; + log_debug("%s for %s", __func__, conn->peername); /* socks only makes sense on the upstream side */ obfs_assert(bev == conn->upstream);
@@ -534,9 +555,9 @@ static void upstream_read_cb(struct bufferevent *bev, void *arg) { conn_t *conn = arg; + log_debug("%s for %s", __func__, conn->peername); obfs_assert(bev == conn->upstream);
- log_debug("Got data on upstream side"); if (proto_send(conn->proto, bufferevent_get_input(conn->upstream), bufferevent_get_output(conn->downstream)) < 0) @@ -553,9 +574,9 @@ downstream_read_cb(struct bufferevent *bev, void *arg) { conn_t *conn = arg; enum recv_ret r; + log_debug("%s for %s", __func__, conn->peername); obfs_assert(bev == conn->downstream);
- log_debug("Got data on downstream side"); r = proto_recv(conn->proto, bufferevent_get_input(conn->downstream), bufferevent_get_output(conn->upstream)); @@ -576,12 +597,12 @@ static void error_or_eof(conn_t *conn, struct bufferevent *bev_err) { struct bufferevent *bev_flush; + log_debug("%s for %s", __func__, conn->peername);
if (bev_err == conn->upstream) bev_flush = conn->downstream; else if (bev_err == conn->downstream) bev_flush = conn->upstream; else obfs_abort();
- log_debug("error_or_eof"); if (conn->flushing || !conn->is_open || evbuffer_get_length(bufferevent_get_output(bev_flush)) == 0) { close_conn(conn); @@ -607,17 +628,35 @@ error_or_eof(conn_t *conn, struct bufferevent *bev_err) static void error_cb(struct bufferevent *bev, short what, void *arg) { + conn_t *conn = arg; + int errcode = EVUTIL_SOCKET_ERROR(); + log_debug("%s for %s: what=%x err=%d", __func__, conn->peername, + what, errcode); + /* It should be impossible to get here with BEV_EVENT_CONNECTED. */ obfs_assert(what & (BEV_EVENT_EOF|BEV_EVENT_ERROR|BEV_EVENT_TIMEOUT)); obfs_assert(!(what & BEV_EVENT_CONNECTED));
- /* If we get EAGAIN or EINPROGRESS here, something has gone horribly - wrong. */ - obfs_assert(EVUTIL_SOCKET_ERROR() != EAGAIN && - EVUTIL_SOCKET_ERROR() != EINPROGRESS); - - log_warn("Got error: %s", - evutil_socket_error_to_string(EVUTIL_SOCKET_ERROR())); + if (what & BEV_EVENT_ERROR) { + /* If we get EAGAIN, EINTR, or EINPROGRESS here, something has + gone horribly wrong. */ + obfs_assert(errcode != EAGAIN && errcode != EINTR && + errcode != EINPROGRESS); + + log_warn("Error on %s side of connection from %s: %s", + bev == conn->upstream ? "upstream" : "downstream", + conn->peername, + evutil_socket_error_to_string(errcode)); + } else if (what & BEV_EVENT_EOF) { + log_info("EOF on %s side of connection from %s", + bev == conn->upstream ? "upstream" : "downstream", + conn->peername); + } else { + obfs_assert(what & BEV_EVENT_TIMEOUT); + log_info("Timeout on %s side of connection from %s", + bev == conn->upstream ? "upstream" : "downstream", + conn->peername); + } error_or_eof(arg, bev); }
@@ -629,6 +668,9 @@ static void flush_error_cb(struct bufferevent *bev, short what, void *arg) { conn_t *conn = arg; + int errcode = EVUTIL_SOCKET_ERROR(); + log_debug("%s for %s: what=%x err=%d", __func__, conn->peername, + what, errcode);
/* It should be impossible to get here with BEV_EVENT_CONNECTED. */ obfs_assert(what & (BEV_EVENT_EOF|BEV_EVENT_ERROR|BEV_EVENT_TIMEOUT)); @@ -636,8 +678,10 @@ flush_error_cb(struct bufferevent *bev, short what, void *arg)
obfs_assert(conn->flushing);
- log_warn("Error during flush: %s", - evutil_socket_error_to_string(EVUTIL_SOCKET_ERROR())); + log_warn("Error during flush of %s side of connection from %s: %s", + bev == conn->upstream ? "upstream" : "downstream", + conn->peername, + evutil_socket_error_to_string(errcode)); close_conn(conn); return; } @@ -653,6 +697,8 @@ pending_conn_cb(struct bufferevent *bev, short what, void *arg) { conn_t *conn = arg; struct bufferevent *other; + log_debug("%s for %s", __func__, conn->peername); + if (bev == conn->upstream) other = conn->downstream; else if (bev == conn->downstream) other = conn->upstream; else obfs_abort(); @@ -663,13 +709,15 @@ pending_conn_cb(struct bufferevent *bev, short what, void *arg) obfs_assert(!conn->flushing);
conn->is_open = 1; - log_debug("Connection successful"); - bufferevent_enable(other, EV_READ|EV_WRITE); + log_debug("Successful %s connection for %s", + bev == conn->upstream ? "upstream" : "downstream", + conn->peername);
/* XXX Dirty access to bufferevent guts. There appears to be no official API to retrieve the callback functions and/or change just one callback while leaving the others intact. */ bufferevent_setcb(bev, bev->readcb, bev->writecb, error_cb, conn); + bufferevent_enable(other, EV_READ|EV_WRITE); return; }
@@ -686,13 +734,16 @@ static void pending_socks_cb(struct bufferevent *bev, short what, void *arg) { conn_t *conn = arg; + log_debug("%s for %s", __func__, conn->peername); obfs_assert(bev == conn->downstream); obfs_assert(conn->socks_state);
/* If we got an error while in the ST_HAVE_ADDR state, chances are that we failed connecting to the host requested by the CONNECT call. This means that we should send a negative SOCKS reply back - to the client and terminate the connection. */ + to the client and terminate the connection. + XXX properly distinguish BEV_EVENT_EOF from BEV_EVENT_ERROR; + errno isn't meaningful in that case... */ if ((what & (BEV_EVENT_EOF|BEV_EVENT_ERROR|BEV_EVENT_TIMEOUT))) { int err = EVUTIL_SOCKET_ERROR(); log_warn("Connection error: %s", diff --git a/src/network.h b/src/network.h index c5ef69e..4ae96fc 100644 --- a/src/network.h +++ b/src/network.h @@ -14,11 +14,13 @@ void start_shutdown(int barbaric); #ifdef NETWORK_PRIVATE
typedef struct listener_t { + char *address; protocol_params_t *proto_params; struct evconnlistener *listener; } listener_t;
typedef struct conn_t { + char *peername; protocol_t *proto; socks_state_t *socks_state; struct bufferevent *upstream; diff --git a/src/test/tester.py.in b/src/test/tester.py.in index 104026c..d9040fe 100644 --- a/src/test/tester.py.in +++ b/src/test/tester.py.in @@ -7,16 +7,86 @@ # You need to be able to make connections to arbitrary high-numbered # TCP ports on the loopback interface.
+import difflib import errno import multiprocessing import Queue +import re import signal import socket import struct import subprocess import time +import traceback import unittest
+# Helper: generate unified-format diffs between two named strings.
+# Pythonic escaped-string syntax is used for unprintable characters.
+
+def diff(label, expected, received):
+    if expected == received:
+        return ""
+    else:
+        return (label + "\n"
+                + "\n".join(s.encode("string_escape")
+                            for s in
+                            difflib.unified_diff(expected.split("\n"),
+                                                 received.split("\n"),
+                                                 "expected", "received",
+                                                 lineterm=""))
+                + "\n")
+
+# Helper: Run obfsproxy instances and confirm that they have
+# completed without any errors.
+
+class Obfsproxy(subprocess.Popen):
+    def __init__(self, *args, **kwargs):
+        argv = ["./obfsproxy", "--log-min-severity=debug"]
+        if len(args) == 1 and (isinstance(args[0], list) or
+                               isinstance(args[0], tuple)):
+            argv.extend(args[0])
+        else:
+            argv.extend(args)
+
+        subprocess.Popen.__init__(self, argv,
+                                  stdin=open("/dev/null", "r"),
+                                  stdout=subprocess.PIPE,
+                                  stderr=subprocess.PIPE,
+                                  **kwargs)
+    # Brackets are escaped: unescaped they would form one character class.
+    severe_error_re = re.compile(r"\[(?:warn|err(?:or)?)\]")
+
+    def check_completion(self, label):
+        if self.poll() is None:
+            self.send_signal(signal.SIGINT)
+
+        (out, err) = self.communicate()
+
+        report = ""
+        def indent(s):
+            return "| " + "\n| ".join(s.strip().split("\n"))
+
+        # exit status should be zero
+        if self.returncode > 0:
+            report += label + " exit code: %d\n" % self.returncode
+        elif self.returncode < 0:
+            report += label + " killed: signal %d\n" % -self.returncode
+
+        # there should be nothing on stdout
+        if out != "":
+            report += label + " stdout:\n%s\n" % indent(out)
+
+        # there will be debugging messages on stderr, but there should be
+        # no [warn], [err], or [error] messages.
+        if self.severe_error_re.search(err):
+            report += label + " stderr:\n%s\n" % indent(err)
+
+        return report
+
+    def stop(self):
+        if self.poll() is None:
+            self.terminate()
+
 # Helper: Repeatedly try to connect to the specified server socket
 # until either it succeeds or one full second has elapsed. (Surely
 # there is a better way to do this?)
@@ -89,32 +159,14 @@ EXIT_PORT = 5001 class DirectTest(object): def setUp(self): self.output_reader = ReadWorker(("127.0.0.1", EXIT_PORT)) - self.obfs = subprocess.Popen( - self.obfs_args, - stdin=open("/dev/null", "r"), - stdout=subprocess.PIPE, - stderr=subprocess.PIPE) - + self.obfs = Obfsproxy(self.obfs_args) self.input_chan = connect_with_retry(("127.0.0.1", ENTRY_PORT)) self.input_chan.settimeout(1.0)
def tearDown(self): - if self.obfs.returncode is None: - self.obfs.terminate() + self.obfs.stop() self.output_reader.stop() - - def checkSubprocesses(self): - if self.obfs.poll() is None: - self.obfs.send_signal(signal.SIGINT) - - (out, err) = self.obfs.communicate() - - if (out != "" or err != "" or self.obfs.returncode != 0): - self.fail("obfsproxy process failure:\n" - "\treturn code: %d\n" - "\tstdout: %s\n" - "\tstderr: %s\n" - % (self.obfs.returncode, out, err)) + self.input_chan.close()
def test_direct_transfer(self): # Open a server and a simple client (in the same process) and @@ -127,8 +179,11 @@ class DirectTest(object): except Queue.Empty: output = ""
- self.checkSubprocesses() - self.assertEqual(output, TEST_FILE) + report = self.obfs.check_completion("obfsproxy") + report += diff("errors in transfer:", TEST_FILE, output) + + if report != "": + self.fail("\n" + report)
# Same as above, but we use a socks client instead of a simple client, # and the server's a separate process. @@ -136,47 +191,14 @@ class DirectTest(object): class SocksTest(object): def setUp(self): self.output_reader = ReadWorker(("127.0.0.1", EXIT_PORT)) - self.obfs_server = subprocess.Popen( - self.server_args, - stdin=open("/dev/null", "r"), - stdout=subprocess.PIPE, - stderr=subprocess.PIPE) - self.obfs_client = subprocess.Popen( - self.client_args, - stdin=open("/dev/null", "r"), - stdout=subprocess.PIPE, - stderr=subprocess.PIPE) + self.obfs_server = Obfsproxy(self.server_args) + self.obfs_client = Obfsproxy(self.client_args)
def tearDown(self): - if self.obfs_server.returncode is None: - self.obfs_server.terminate() - if self.obfs_client.returncode is None: - self.obfs_client.terminate() + self.obfs_server.stop() + self.obfs_client.stop() self.output_reader.stop()
- def checkSubprocesses(self): - if self.obfs_server.poll() is None: - self.obfs_server.send_signal(signal.SIGINT) - if self.obfs_client.poll() is None: - self.obfs_client.send_signal(signal.SIGINT) - - (sout, serr) = self.obfs_server.communicate() - (cout, cerr) = self.obfs_client.communicate() - - if (sout != "" or serr != "" or cout != "" or cerr != "" - or self.obfs_server.returncode != 0 - or self.obfs_client.returncode != 0): - self.fail("obfsproxy process failures:\n" - "\tclient return code: %d\n" - "\tserver return code: %d\n" - "\tclient stdout: %s\n" - "\tclient stderr: %s\n" - "\tserver stdout: %s\n" - "\tserver stderr: %s\n" - % (self.obfs_client.returncode, - self.obfs_server.returncode, - cout, cerr, sout, serr)) - # 'sequence' is a sequence of SOCKS[45] protocol messages # which we will send or receive. Sends alternate with # receives. Each entry may be a string, which is sent or @@ -187,7 +209,7 @@ class SocksTest(object): # the SOCKS sequence without the server having dropped the # connection, we transmit the test file and expect to get it # back from the far end. - def socksTest(self, sequence): + def socksTestInner(self, sequence): sending = True good = True input_chan = connect_with_retry(("127.0.0.1", ENTRY_PORT)) @@ -230,10 +252,25 @@ class SocksTest(object): output = ""
input_chan.close() - self.checkSubprocesses() + if good: return output + else: return None
- if good: - self.assertEqual(output, TEST_FILE) + def socksTest(self, sequence): + try: + output = self.socksTestInner(sequence) + report = "" + except Exception: + output = None + report = traceback.format_exc() + + report += self.obfs_server.check_completion("obfsproxy server") + report += self.obfs_client.check_completion("obfsproxy client") + + if output is not None: + report += diff("errors in transfer:", TEST_FILE, output) + + if report != "": + self.fail("\n" + report)
def test_illformed(self): @@ -295,8 +332,7 @@ class SocksTest(object): #
class DirectObfs2(DirectTest, unittest.TestCase): - obfs_args = ("./obfsproxy", "--log-min-severity=warn", - "obfs2", + obfs_args = ("obfs2", "--dest=127.0.0.1:%d" % EXIT_PORT, "server", "127.0.0.1:%d" % SERVER_PORT, "+", "obfs2", @@ -304,8 +340,7 @@ class DirectObfs2(DirectTest, unittest.TestCase): "client", "127.0.0.1:%d" % ENTRY_PORT)
class DirectDummy(DirectTest, unittest.TestCase): - obfs_args = ("./obfsproxy", "--log-min-severity=warn", - "dummy", "server", + obfs_args = ("dummy", "server", "127.0.0.1:%d" % SERVER_PORT, "127.0.0.1:%d" % EXIT_PORT, "+", "dummy", "client", @@ -313,21 +348,17 @@ class DirectDummy(DirectTest, unittest.TestCase): "127.0.0.1:%d" % SERVER_PORT)
class SocksObfs2(SocksTest, unittest.TestCase): - server_args = ("./obfsproxy", "--log-min-severity=warn", - "obfs2", + server_args = ("obfs2", "--dest=127.0.0.1:%d" % EXIT_PORT, "server", "127.0.0.1:%d" % SERVER_PORT) - client_args = ("./obfsproxy", "--log-min-severity=warn", - "obfs2", + client_args = ("obfs2", "socks", "127.0.0.1:%d" % ENTRY_PORT)
class SocksDummy(SocksTest, unittest.TestCase): - server_args = ("./obfsproxy", "--log-min-severity=warn", - "dummy", "server", + server_args = ("dummy", "server", "127.0.0.1:%d" % SERVER_PORT, "127.0.0.1:%d" % EXIT_PORT) - client_args = ("./obfsproxy", "--log-min-severity=warn", - "dummy", "socks", + client_args = ("dummy", "socks", "127.0.0.1:%d" % ENTRY_PORT)
TEST_FILE = """\ diff --git a/src/util.c b/src/util.c index e44b131..46d4b42 100644 --- a/src/util.c +++ b/src/util.c @@ -10,6 +10,10 @@ #include <unistd.h>
#include <event2/dns.h> +#include <arpa/inet.h> +#ifdef AF_LOCAL +#include <sys/un.h> +#endif
/** Any size_t larger than this amount is likely to be an underflow. */ #define SIZE_T_CEILING (SIZE_MAX/2 - 16) @@ -195,6 +199,43 @@ resolve_address_port(const char *address, int nodns, int passive, return ai; }
+/* Printable form of ADDR, in malloced memory.  ADDRLEN is currently unused. */
+char *
+printable_address(struct sockaddr *addr, socklen_t addrlen)
+{
+  char abuf[INET6_ADDRSTRLEN];
+  char apbuf[INET6_ADDRSTRLEN + 8]; /* []:65535 is 8 characters */
+
+  switch (addr->sa_family) {
+  case AF_INET: {
+    struct sockaddr_in *sin = (struct sockaddr_in*)addr;
+    if (!inet_ntop(AF_INET, &sin->sin_addr, abuf, INET6_ADDRSTRLEN))
+      break;
+    obfs_snprintf(apbuf, sizeof apbuf, "%s:%d", abuf, ntohs(sin->sin_port));
+    return xstrdup(apbuf);
+  }
+  /* inet_ntop's family argument must match the in6_addr handed to it. */
+  case AF_INET6: {
+    struct sockaddr_in6 *sin6 = (struct sockaddr_in6*)addr;
+    if (!inet_ntop(AF_INET6, &sin6->sin6_addr, abuf, INET6_ADDRSTRLEN))
+      break;
+    obfs_snprintf(apbuf, sizeof apbuf, "[%s]:%d", abuf,
+                  ntohs(sin6->sin6_port));
+    return xstrdup(apbuf);
+  }
+
+#ifdef AF_LOCAL
+  case AF_LOCAL:
+    return xstrdup(((struct sockaddr_un*)addr)->sun_path);
+#endif
+  default:
+    break;
+  }
+
+  obfs_snprintf(apbuf, sizeof apbuf, "<addr family %d>", addr->sa_family);
+  return xstrdup(apbuf);
+}
+
 static struct evdns_base *the_evdns_base = NULL;
struct evdns_base * diff --git a/src/util.h b/src/util.h index 4fc1a33..5108b62 100644 --- a/src/util.h +++ b/src/util.h @@ -93,6 +93,10 @@ struct evutil_addrinfo *resolve_address_port(const char *address, int nodns, int passive, const char *default_port);
+/** Produce a printable name for this sockaddr. The result is in + malloced memory. */ +char *printable_address(struct sockaddr *addr, socklen_t addrlen); + struct evdns_base *get_evdns_base(void); int init_evdns_base(struct event_base *base);