commit dfdc4fb57b2633359d41a993d78bc3ae8f7b92fc
Author: Zack Weinberg <zackw(a)panix.com>
Date: Wed Jul 27 17:06:41 2011 -0700
Add a whole bunch more logging and test framework tracking. Properly distinguish BEV_EVENT_EOF from BEV_EVENT_ERROR in error_cb (flush_error_cb and pending_socks_cb are still sloppy).
---
src/main.c | 1 +
src/network.c | 91 +++++++++++++++++++------
src/network.h | 2 +
src/test/tester.py.in | 183 ++++++++++++++++++++++++++++--------------------
src/util.c | 41 +++++++++++
src/util.h | 4 +
6 files changed, 226 insertions(+), 96 deletions(-)
diff --git a/src/main.c b/src/main.c
index f6280c2..1459af3 100644
--- a/src/main.c
+++ b/src/main.c
@@ -91,6 +91,7 @@ handle_signal_cb(evutil_socket_t fd, short what, void *arg)
void
finish_shutdown(void)
{
+ log_debug("Finishing shutdown.");
event_base_loopexit(the_event_base, NULL);
}
diff --git a/src/network.c b/src/network.c
index f05c5b8..7831a4e 100644
--- a/src/network.c
+++ b/src/network.c
@@ -92,6 +92,8 @@ static void pending_socks_cb(struct bufferevent *bev, short what, void *arg);
void
start_shutdown(int barbaric)
{
+ log_debug("Beginning %s shutdown.", barbaric ? "barbaric" : "normal");
+
if (!shutting_down)
shutting_down=1;
@@ -115,6 +117,7 @@ close_all_connections(void)
{
if (!connections)
return;
+ log_debug("Closing all connections.");
SMARTLIST_FOREACH(connections, conn_t *, conn,
{ conn_free(conn); });
smartlist_free(connections);
@@ -143,6 +146,8 @@ create_listener(struct event_base *base, protocol_params_t *params)
default: obfs_abort();
}
+ lsn->address = printable_address(params->listen_addr->ai_addr,
+ params->listen_addr->ai_addrlen);
lsn->proto_params = params;
lsn->listener =
evconnlistener_new_bind(base, callback, lsn, flags, -1,
@@ -156,6 +161,9 @@ create_listener(struct event_base *base, protocol_params_t *params)
return 0;
}
+ log_debug("Now listening on %s in mode %d, protocol %s.",
+ lsn->address, params->mode, params->vtable->name);
+
/* If we don't have a listener list, create one now. */
if (!listeners)
listeners = smartlist_create();
@@ -170,6 +178,8 @@ create_listener(struct event_base *base, protocol_params_t *params)
static void
listener_free(listener_t *lsn)
{
+ if (lsn->address)
+ free(lsn->address);
if (lsn->listener)
evconnlistener_free(lsn->listener);
if (lsn->proto_params)
@@ -206,7 +216,9 @@ simple_client_listener_cb(struct evconnlistener *evcl,
struct event_base *base;
conn_t *conn = xzalloc(sizeof(conn_t));
- log_debug("%s: connection attempt.", __func__);
+ conn->peername = printable_address(sourceaddr, socklen);
+ log_debug("%s: connection to %s from %s", __func__,
+ lsn->address, conn->peername);
conn->mode = lsn->proto_params->mode;
obfs_assert(conn->mode == LSN_SIMPLE_CLIENT);
@@ -280,7 +292,9 @@ socks_client_listener_cb(struct evconnlistener *evcl,
struct event_base *base;
conn_t *conn = xzalloc(sizeof(conn_t));
- log_debug("%s: connection attempt.", __func__);
+ conn->peername = printable_address(sourceaddr, socklen);
+ log_debug("%s: connection to %s from %s", __func__,
+ lsn->address, conn->peername);
conn->mode = lsn->proto_params->mode;
obfs_assert(conn->mode == LSN_SOCKS_CLIENT);
@@ -336,7 +350,9 @@ simple_server_listener_cb(struct evconnlistener *evcl,
struct event_base *base;
conn_t *conn = xzalloc(sizeof(conn_t));
- log_debug("%s: connection attempt.", __func__);
+ conn->peername = printable_address(sourceaddr, socklen);
+ log_debug("%s: connection to %s from %s", __func__,
+ lsn->address, conn->peername);
conn->mode = lsn->proto_params->mode;
obfs_assert(conn->mode == LSN_SIMPLE_SERVER);
@@ -403,6 +419,8 @@ simple_server_listener_cb(struct evconnlistener *evcl,
static void
conn_free(conn_t *conn)
{
+ if (conn->peername)
+ free(conn->peername);
if (conn->proto)
proto_destroy(conn->proto);
if (conn->socks_state)
@@ -423,10 +441,11 @@ static void
close_conn(conn_t *conn)
{
obfs_assert(connections);
+ log_debug("Closing connection from %s; %d remaining",
+ conn->peername, smartlist_len(connections) - 1);
+
smartlist_remove(connections, conn);
conn_free(conn);
- log_debug("Connection destroyed. "
- "We currently have %d connections!", smartlist_len(connections));
/* If this was the last connection AND we are shutting down,
finish shutdown. */
@@ -447,6 +466,7 @@ static void
close_conn_on_flush(struct bufferevent *bev, void *arg)
{
conn_t *conn = arg;
+ log_debug("%s for %s", __func__, conn->peername);
if (evbuffer_get_length(bufferevent_get_output(bev)) == 0)
close_conn(conn);
@@ -460,6 +480,7 @@ socks_read_cb(struct bufferevent *bev, void *arg)
{
conn_t *conn = arg;
enum socks_ret socks_ret;
+ log_debug("%s for %s", __func__, conn->peername);
/* socks only makes sense on the upstream side */
obfs_assert(bev == conn->upstream);
@@ -534,9 +555,9 @@ static void
upstream_read_cb(struct bufferevent *bev, void *arg)
{
conn_t *conn = arg;
+ log_debug("%s for %s", __func__, conn->peername);
obfs_assert(bev == conn->upstream);
- log_debug("Got data on upstream side");
if (proto_send(conn->proto,
bufferevent_get_input(conn->upstream),
bufferevent_get_output(conn->downstream)) < 0)
@@ -553,9 +574,9 @@ downstream_read_cb(struct bufferevent *bev, void *arg)
{
conn_t *conn = arg;
enum recv_ret r;
+ log_debug("%s for %s", __func__, conn->peername);
obfs_assert(bev == conn->downstream);
- log_debug("Got data on downstream side");
r = proto_recv(conn->proto,
bufferevent_get_input(conn->downstream),
bufferevent_get_output(conn->upstream));
@@ -576,12 +597,12 @@ static void
error_or_eof(conn_t *conn, struct bufferevent *bev_err)
{
struct bufferevent *bev_flush;
+ log_debug("%s for %s", __func__, conn->peername);
if (bev_err == conn->upstream) bev_flush = conn->downstream;
else if (bev_err == conn->downstream) bev_flush = conn->upstream;
else obfs_abort();
- log_debug("error_or_eof");
if (conn->flushing || !conn->is_open ||
evbuffer_get_length(bufferevent_get_output(bev_flush)) == 0) {
close_conn(conn);
@@ -607,17 +628,35 @@ error_or_eof(conn_t *conn, struct bufferevent *bev_err)
static void
error_cb(struct bufferevent *bev, short what, void *arg)
{
+ conn_t *conn = arg;
+ int errcode = EVUTIL_SOCKET_ERROR();
+ log_debug("%s for %s: what=%x err=%d", __func__, conn->peername,
+ what, errcode);
+
/* It should be impossible to get here with BEV_EVENT_CONNECTED. */
obfs_assert(what & (BEV_EVENT_EOF|BEV_EVENT_ERROR|BEV_EVENT_TIMEOUT));
obfs_assert(!(what & BEV_EVENT_CONNECTED));
- /* If we get EAGAIN or EINPROGRESS here, something has gone horribly
- wrong. */
- obfs_assert(EVUTIL_SOCKET_ERROR() != EAGAIN &&
- EVUTIL_SOCKET_ERROR() != EINPROGRESS);
-
- log_warn("Got error: %s",
- evutil_socket_error_to_string(EVUTIL_SOCKET_ERROR()));
+ if (what & BEV_EVENT_ERROR) {
+ /* If we get EAGAIN, EINTR, or EINPROGRESS here, something has
+ gone horribly wrong. */
+ obfs_assert(errcode != EAGAIN && errcode != EINTR &&
+ errcode != EINPROGRESS);
+
+ log_warn("Error on %s side of connection from %s: %s",
+ bev == conn->upstream ? "upstream" : "downstream",
+ conn->peername,
+ evutil_socket_error_to_string(errcode));
+ } else if (what & BEV_EVENT_EOF) {
+ log_info("EOF on %s side of connection from %s",
+ bev == conn->upstream ? "upstream" : "downstream",
+ conn->peername);
+ } else {
+ obfs_assert(what & BEV_EVENT_TIMEOUT);
+ log_info("Timeout on %s side of connection from %s",
+ bev == conn->upstream ? "upstream" : "downstream",
+ conn->peername);
+ }
error_or_eof(arg, bev);
}
@@ -629,6 +668,9 @@ static void
flush_error_cb(struct bufferevent *bev, short what, void *arg)
{
conn_t *conn = arg;
+ int errcode = EVUTIL_SOCKET_ERROR();
+ log_debug("%s for %s: what=%x err=%d", __func__, conn->peername,
+ what, errcode);
/* It should be impossible to get here with BEV_EVENT_CONNECTED. */
obfs_assert(what & (BEV_EVENT_EOF|BEV_EVENT_ERROR|BEV_EVENT_TIMEOUT));
@@ -636,8 +678,10 @@ flush_error_cb(struct bufferevent *bev, short what, void *arg)
obfs_assert(conn->flushing);
- log_warn("Error during flush: %s",
- evutil_socket_error_to_string(EVUTIL_SOCKET_ERROR()));
+ log_warn("Error during flush of %s side of connection from %s: %s",
+ bev == conn->upstream ? "upstream" : "downstream",
+ conn->peername,
+ evutil_socket_error_to_string(errcode));
close_conn(conn);
return;
}
@@ -653,6 +697,8 @@ pending_conn_cb(struct bufferevent *bev, short what, void *arg)
{
conn_t *conn = arg;
struct bufferevent *other;
+ log_debug("%s for %s", __func__, conn->peername);
+
if (bev == conn->upstream) other = conn->downstream;
else if (bev == conn->downstream) other = conn->upstream;
else obfs_abort();
@@ -663,13 +709,15 @@ pending_conn_cb(struct bufferevent *bev, short what, void *arg)
obfs_assert(!conn->flushing);
conn->is_open = 1;
- log_debug("Connection successful");
- bufferevent_enable(other, EV_READ|EV_WRITE);
+ log_debug("Successful %s connection for %s",
+ bev == conn->upstream ? "upstream" : "downstream",
+ conn->peername);
/* XXX Dirty access to bufferevent guts. There appears to be no
official API to retrieve the callback functions and/or change
just one callback while leaving the others intact. */
bufferevent_setcb(bev, bev->readcb, bev->writecb, error_cb, conn);
+ bufferevent_enable(other, EV_READ|EV_WRITE);
return;
}
@@ -686,13 +734,16 @@ static void
pending_socks_cb(struct bufferevent *bev, short what, void *arg)
{
conn_t *conn = arg;
+ log_debug("%s for %s", __func__, conn->peername);
obfs_assert(bev == conn->downstream);
obfs_assert(conn->socks_state);
/* If we got an error while in the ST_HAVE_ADDR state, chances are
that we failed connecting to the host requested by the CONNECT
call. This means that we should send a negative SOCKS reply back
- to the client and terminate the connection. */
+ to the client and terminate the connection.
+ XXX properly distinguish BEV_EVENT_EOF from BEV_EVENT_ERROR;
+ errno isn't meaningful in that case... */
if ((what & (BEV_EVENT_EOF|BEV_EVENT_ERROR|BEV_EVENT_TIMEOUT))) {
int err = EVUTIL_SOCKET_ERROR();
log_warn("Connection error: %s",
diff --git a/src/network.h b/src/network.h
index c5ef69e..4ae96fc 100644
--- a/src/network.h
+++ b/src/network.h
@@ -14,11 +14,13 @@ void start_shutdown(int barbaric);
#ifdef NETWORK_PRIVATE
typedef struct listener_t {
+ char *address;
protocol_params_t *proto_params;
struct evconnlistener *listener;
} listener_t;
typedef struct conn_t {
+ char *peername;
protocol_t *proto;
socks_state_t *socks_state;
struct bufferevent *upstream;
diff --git a/src/test/tester.py.in b/src/test/tester.py.in
index 104026c..d9040fe 100644
--- a/src/test/tester.py.in
+++ b/src/test/tester.py.in
@@ -7,16 +7,86 @@
# You need to be able to make connections to arbitrary high-numbered
# TCP ports on the loopback interface.
+import difflib
import errno
import multiprocessing
import Queue
+import re
import signal
import socket
import struct
import subprocess
import time
+import traceback
import unittest
+# Helper: generate unified-format diffs between two named strings.
+# Pythonic escaped-string syntax is used for unprintable characters.
+
+def diff(label, expected, received):
+ # Compare two strings.  Returns "" when they are equal; otherwise
+ # returns LABEL on its own line, followed by a unified diff of the
+ # two strings (split on "\n", headed "expected" / "received"), with
+ # a trailing newline.
+ if expected == received:
+ return ""
+ else:
+ # Each diff line is passed through the "string_escape" codec so
+ # that unprintable characters show up as Python escapes.
+ return (label + "\n"
+ + "\n".join(s.encode("string_escape")
+ for s in
+ difflib.unified_diff(expected.split("\n"),
+ received.split("\n"),
+ "expected", "received",
+ lineterm=""))
+ + "\n")
+
+# Helper: Run obfsproxy instances and confirm that they have
+# completed without any errors.
+
+class Obfsproxy(subprocess.Popen):
+ # A subprocess.Popen that always launches "./obfsproxy" with
+ # "--log-min-severity=debug", followed by the caller's arguments.
+ # The arguments may be given either as a single list/tuple or as
+ # separate positional parameters.  Extra keyword arguments are
+ # forwarded to subprocess.Popen.
+ def __init__(self, *args, **kwargs):
+ argv = ["./obfsproxy", "--log-min-severity=debug"]
+ if len(args) == 1 and (isinstance(args[0], list) or
+ isinstance(args[0], tuple)):
+ argv.extend(args[0])
+ else:
+ argv.extend(args)
+
+ # stdin is read from /dev/null; stdout and stderr are captured
+ # through pipes so check_completion can inspect them.
+ # NOTE(review): the file object opened for /dev/null is never
+ # explicitly closed here.
+ subprocess.Popen.__init__(self, argv,
+ stdin=open("/dev/null", "r"),
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE,
+ **kwargs)
+
+ # Matches the severity tags "[warn]", "[err]" and "[error]".
+ severe_error_re = re.compile(r"\[(?:warn|err(?:or)?)\]")
+
+ # Send SIGINT to the process if it is still running, collect its
+ # output, and return a textual report of anything that looks wrong.
+ # Each section of the report is prefixed with LABEL.  The report is
+ # "" when the exit status is zero, stdout is empty, and stderr has
+ # no match for severe_error_re.
+ def check_completion(self, label):
+ if self.poll() is None:
+ self.send_signal(signal.SIGINT)
+
+ (out, err) = self.communicate()
+
+ report = ""
+ # Strip S and prefix every one of its lines with "| ".
+ def indent(s):
+ return "| " + "\n| ".join(s.strip().split("\n"))
+
+ # exit status should be zero
+ # (a negative returncode is reported as death by that signal)
+ if self.returncode > 0:
+ report += label + " exit code: %d\n" % self.returncode
+ elif self.returncode < 0:
+ report += label + " killed: signal %d\n" % -self.returncode
+
+ # there should be nothing on stdout
+ if out != "":
+ report += label + " stdout:\n%s\n" % indent(out)
+
+ # there will be debugging messages on stderr, but there should be
+ # no [warn], [err], or [error] messages.
+ # (when one is found, the whole of stderr goes into the report)
+ if self.severe_error_re.search(err):
+ report += label + " stderr:\n%s\n" % indent(err)
+
+ return report
+
+ # Terminate the process if it has not already exited.
+ # NOTE(review): this does not wait for the process afterward.
+ def stop(self):
+ if self.poll() is None:
+ self.terminate()
+
# Helper: Repeatedly try to connect to the specified server socket
# until either it succeeds or one full second has elapsed. (Surely
# there is a better way to do this?)
@@ -89,32 +159,14 @@ EXIT_PORT = 5001
class DirectTest(object):
def setUp(self):
self.output_reader = ReadWorker(("127.0.0.1", EXIT_PORT))
- self.obfs = subprocess.Popen(
- self.obfs_args,
- stdin=open("/dev/null", "r"),
- stdout=subprocess.PIPE,
- stderr=subprocess.PIPE)
-
+ self.obfs = Obfsproxy(self.obfs_args)
self.input_chan = connect_with_retry(("127.0.0.1", ENTRY_PORT))
self.input_chan.settimeout(1.0)
def tearDown(self):
- if self.obfs.returncode is None:
- self.obfs.terminate()
+ self.obfs.stop()
self.output_reader.stop()
-
- def checkSubprocesses(self):
- if self.obfs.poll() is None:
- self.obfs.send_signal(signal.SIGINT)
-
- (out, err) = self.obfs.communicate()
-
- if (out != "" or err != "" or self.obfs.returncode != 0):
- self.fail("obfsproxy process failure:\n"
- "\treturn code: %d\n"
- "\tstdout: %s\n"
- "\tstderr: %s\n"
- % (self.obfs.returncode, out, err))
+ self.input_chan.close()
def test_direct_transfer(self):
# Open a server and a simple client (in the same process) and
@@ -127,8 +179,11 @@ class DirectTest(object):
except Queue.Empty:
output = ""
- self.checkSubprocesses()
- self.assertEqual(output, TEST_FILE)
+ report = self.obfs.check_completion("obfsproxy")
+ report += diff("errors in transfer:", TEST_FILE, output)
+
+ if report != "":
+ self.fail("\n" + report)
# Same as above, but we use a socks client instead of a simple client,
# and the server's a separate process.
@@ -136,47 +191,14 @@ class DirectTest(object):
class SocksTest(object):
def setUp(self):
self.output_reader = ReadWorker(("127.0.0.1", EXIT_PORT))
- self.obfs_server = subprocess.Popen(
- self.server_args,
- stdin=open("/dev/null", "r"),
- stdout=subprocess.PIPE,
- stderr=subprocess.PIPE)
- self.obfs_client = subprocess.Popen(
- self.client_args,
- stdin=open("/dev/null", "r"),
- stdout=subprocess.PIPE,
- stderr=subprocess.PIPE)
+ self.obfs_server = Obfsproxy(self.server_args)
+ self.obfs_client = Obfsproxy(self.client_args)
def tearDown(self):
- if self.obfs_server.returncode is None:
- self.obfs_server.terminate()
- if self.obfs_client.returncode is None:
- self.obfs_client.terminate()
+ self.obfs_server.stop()
+ self.obfs_client.stop()
self.output_reader.stop()
- def checkSubprocesses(self):
- if self.obfs_server.poll() is None:
- self.obfs_server.send_signal(signal.SIGINT)
- if self.obfs_client.poll() is None:
- self.obfs_client.send_signal(signal.SIGINT)
-
- (sout, serr) = self.obfs_server.communicate()
- (cout, cerr) = self.obfs_client.communicate()
-
- if (sout != "" or serr != "" or cout != "" or cerr != ""
- or self.obfs_server.returncode != 0
- or self.obfs_client.returncode != 0):
- self.fail("obfsproxy process failures:\n"
- "\tclient return code: %d\n"
- "\tserver return code: %d\n"
- "\tclient stdout: %s\n"
- "\tclient stderr: %s\n"
- "\tserver stdout: %s\n"
- "\tserver stderr: %s\n"
- % (self.obfs_client.returncode,
- self.obfs_server.returncode,
- cout, cerr, sout, serr))
-
# 'sequence' is a sequence of SOCKS[45] protocol messages
# which we will send or receive. Sends alternate with
# receives. Each entry may be a string, which is sent or
@@ -187,7 +209,7 @@ class SocksTest(object):
# the SOCKS sequence without the server having dropped the
# connection, we transmit the test file and expect to get it
# back from the far end.
- def socksTest(self, sequence):
+ def socksTestInner(self, sequence):
sending = True
good = True
input_chan = connect_with_retry(("127.0.0.1", ENTRY_PORT))
@@ -230,10 +252,25 @@ class SocksTest(object):
output = ""
input_chan.close()
- self.checkSubprocesses()
+ if good: return output
+ else: return None
- if good:
- self.assertEqual(output, TEST_FILE)
+ def socksTest(self, sequence):
+ # Run socksTestInner on SEQUENCE, then check both obfsproxy
+ # processes, and fail the test with a combined report if anything
+ # went wrong.
+ try:
+ output = self.socksTestInner(sequence)
+ report = ""
+ except Exception:
+ # Record the traceback instead of propagating, so that the
+ # subprocess checks below still run and land in the report.
+ output = None
+ report = traceback.format_exc()
+
+ report += self.obfs_server.check_completion("obfsproxy server")
+ report += self.obfs_client.check_completion("obfsproxy client")
+
+ # The transferred data is compared against TEST_FILE only when
+ # socksTestInner returned something other than None.
+ if output is not None:
+ report += diff("errors in transfer:", TEST_FILE, output)
+
+ if report != "":
+ self.fail("\n" + report)
def test_illformed(self):
@@ -295,8 +332,7 @@ class SocksTest(object):
#
class DirectObfs2(DirectTest, unittest.TestCase):
- obfs_args = ("./obfsproxy", "--log-min-severity=warn",
- "obfs2",
+ obfs_args = ("obfs2",
"--dest=127.0.0.1:%d" % EXIT_PORT,
"server", "127.0.0.1:%d" % SERVER_PORT,
"+", "obfs2",
@@ -304,8 +340,7 @@ class DirectObfs2(DirectTest, unittest.TestCase):
"client", "127.0.0.1:%d" % ENTRY_PORT)
class DirectDummy(DirectTest, unittest.TestCase):
- obfs_args = ("./obfsproxy", "--log-min-severity=warn",
- "dummy", "server",
+ obfs_args = ("dummy", "server",
"127.0.0.1:%d" % SERVER_PORT,
"127.0.0.1:%d" % EXIT_PORT,
"+", "dummy", "client",
@@ -313,21 +348,17 @@ class DirectDummy(DirectTest, unittest.TestCase):
"127.0.0.1:%d" % SERVER_PORT)
class SocksObfs2(SocksTest, unittest.TestCase):
- server_args = ("./obfsproxy", "--log-min-severity=warn",
- "obfs2",
+ server_args = ("obfs2",
"--dest=127.0.0.1:%d" % EXIT_PORT,
"server", "127.0.0.1:%d" % SERVER_PORT)
- client_args = ("./obfsproxy", "--log-min-severity=warn",
- "obfs2",
+ client_args = ("obfs2",
"socks", "127.0.0.1:%d" % ENTRY_PORT)
class SocksDummy(SocksTest, unittest.TestCase):
- server_args = ("./obfsproxy", "--log-min-severity=warn",
- "dummy", "server",
+ server_args = ("dummy", "server",
"127.0.0.1:%d" % SERVER_PORT,
"127.0.0.1:%d" % EXIT_PORT)
- client_args = ("./obfsproxy", "--log-min-severity=warn",
- "dummy", "socks",
+ client_args = ("dummy", "socks",
"127.0.0.1:%d" % ENTRY_PORT)
TEST_FILE = """\
diff --git a/src/util.c b/src/util.c
index e44b131..46d4b42 100644
--- a/src/util.c
+++ b/src/util.c
@@ -10,6 +10,10 @@
#include <unistd.h>
#include <event2/dns.h>
+#include <arpa/inet.h>
+#ifdef AF_LOCAL
+#include <sys/un.h>
+#endif
/** Any size_t larger than this amount is likely to be an underflow. */
#define SIZE_T_CEILING (SIZE_MAX/2 - 16)
@@ -195,6 +199,43 @@ resolve_address_port(const char *address, int nodns, int passive,
return ai;
}
+/** Produce a printable rendering of the socket address ADDR:
+    "a.b.c.d:port" for AF_INET, "[v6-address]:port" for AF_INET6, and
+    the socket path for AF_LOCAL (where that family is defined).  For
+    any other family, or if inet_ntop fails, the result is
+    "<addr family N>".  The returned string is a fresh copy made with
+    xstrdup; callers in this patch release it with free(). */
+char *
+printable_address(struct sockaddr *addr, socklen_t addrlen)
+{
+  char abuf[INET6_ADDRSTRLEN];
+  char apbuf[INET6_ADDRSTRLEN + 8]; /* []:65535 is 8 characters */
+
+  switch (addr->sa_family) {
+  case AF_INET: {
+    struct sockaddr_in *sin = (struct sockaddr_in*)addr;
+    if (!inet_ntop(AF_INET, &sin->sin_addr, abuf, sizeof abuf))
+      break;
+    obfs_snprintf(apbuf, sizeof apbuf, "%s:%d", abuf, ntohs(sin->sin_port));
+    return xstrdup(apbuf);
+  }
+
+  case AF_INET6: {
+    struct sockaddr_in6 *sin6 = (struct sockaddr_in6*)addr;
+    /* The family passed to inet_ntop must be AF_INET6 here; with
+       AF_INET only the first four bytes of sin6_addr would be
+       formatted, as a dotted quad. */
+    if (!inet_ntop(AF_INET6, &sin6->sin6_addr, abuf, sizeof abuf))
+      break;
+    obfs_snprintf(apbuf, sizeof apbuf, "[%s]:%d", abuf,
+                  ntohs(sin6->sin6_port));
+    return xstrdup(apbuf);
+  }
+
+#ifdef AF_LOCAL
+  case AF_LOCAL:
+    /* NOTE(review): addrlen is not consulted, so this relies on
+       sun_path being NUL-terminated. */
+    return xstrdup(((struct sockaddr_un*)addr)->sun_path);
+#endif
+  default:
+    break;
+  }
+
+  /* Unknown family, or inet_ntop failed above. */
+  obfs_snprintf(apbuf, sizeof apbuf,
+                "<addr family %d>", addr->sa_family);
+  return xstrdup(apbuf);
+}
+
+
static struct evdns_base *the_evdns_base = NULL;
struct evdns_base *
diff --git a/src/util.h b/src/util.h
index 4fc1a33..5108b62 100644
--- a/src/util.h
+++ b/src/util.h
@@ -93,6 +93,10 @@ struct evutil_addrinfo *resolve_address_port(const char *address,
int nodns, int passive,
const char *default_port);
+/** Produce a printable name for this sockaddr. The result is in
+ malloced memory. */
+char *printable_address(struct sockaddr *addr, socklen_t addrlen);
+
struct evdns_base *get_evdns_base(void);
int init_evdns_base(struct event_base *base);