commit eb200a8333b4cbb4b239b2fcfbaeeb3874c298ac
Author: Zack Weinberg <zackw@panix.com>
Date:   Sun Apr 1 11:34:27 2012 -0700

    Candidate fixes for bugs exposed by running the automated tests on HTTP steg. Not 100% working yet.
---
 src/network.cc       | 8 ++-
 src/protocol/chop.cc | 120 ++++++++++++++++++++++++++-----------------------
 src/steg/jsSteg.cc   | 4 +-
 src/steg/payloads.cc | 12 ++---
 src/test/test_tl.py  | 9 +++-
 5 files changed, 81 insertions(+), 72 deletions(-)

diff --git a/src/network.cc b/src/network.cc index 7eac64d..55944fe 100644 --- a/src/network.cc +++ b/src/network.cc @@ -444,12 +444,14 @@ downstream_flush_cb(struct bufferevent *bev, void *arg) { conn_t *conn = (conn_t *)arg; size_t remain = evbuffer_get_length(bufferevent_get_output(bev)); - log_debug(conn, "%lu bytes still to transmit%s%s", + log_debug(conn, "%lu bytes still to transmit%s%s%s", (unsigned long)remain, conn->connected ? "" : " (not connected)", - conn->flushing ? "" : " (not flushing)"); + conn->flushing ? "" : " (not flushing)", + conn->circuit() ? "" : " (no circuit)");
- if (remain == 0 && conn->flushing && conn->connected) { + if (remain == 0 && ((conn->flushing && conn->connected) + || !conn->circuit())) { bufferevent_disable(bev, EV_WRITE); if (bufferevent_get_enabled(bev)) { log_debug(conn, "sending EOF downstream"); diff --git a/src/protocol/chop.cc b/src/protocol/chop.cc index 27e03ab..a46d0ac 100644 --- a/src/protocol/chop.cc +++ b/src/protocol/chop.cc @@ -667,46 +667,49 @@ chop_circuit_t::send() { circuit_disarm_flush_timer(this);
- if (downstreams.empty()) { - // We have no connections, but we must send. If we're the client, - // reopen our outbound connections; the on-connection event will - // bring us back here. If we're the server, we have to just - // twiddle our thumbs and hope the client reconnects. - log_debug(this, "no downstream connections"); - if (config->mode != LSN_SIMPLE_SERVER) - circuit_reopen_downstreams(this); - else - circuit_arm_axe_timer(this, axe_interval()); - return 0; - } - struct evbuffer *xmit_pending = bufferevent_get_input(up_buffer); size_t avail = evbuffer_get_length(xmit_pending); size_t avail0 = avail;
- // Send at least one block, even if there is no real data to send. - do { - log_debug(this, "%lu bytes to send", (unsigned long)avail); - size_t blocksize; - chop_conn_t *target = pick_connection(avail, &blocksize); - if (!target) { - // this is not an error; it can happen e.g. when the server has - // something to send immediately and the client hasn't spoken yet - log_debug(this, "no target connection available"); - break; - } + if (downstreams.empty()) { + log_debug(this, "no downstream connections"); + } else { + // Send at least one block, even if there is no real data to send. + do { + log_debug(this, "%lu bytes to send", (unsigned long)avail); + size_t blocksize; + chop_conn_t *target = pick_connection(avail, &blocksize); + if (!target) { + // this is not an error; it can happen e.g. when the server has + // something to send immediately and the client hasn't spoken yet + log_debug(this, "no target connection available"); + break; + }
- if (send_targeted(target, blocksize)) - return -1; + if (send_targeted(target, blocksize)) + return -1;
- avail = evbuffer_get_length(xmit_pending); - } while (avail > 0); + avail = evbuffer_get_length(xmit_pending); + } while (avail > 0); + }
- if (avail0 > avail) // we transmitted some real data - dead_cycles = 0; - else { + if (avail0 == avail) { // no forward progress dead_cycles++; log_debug(this, "%u dead cycles", dead_cycles); + + // If there was real data and we didn't make any progress on it, + // or if there are no downstream connections at all, and we're the + // client, try opening new connections. If we're the server, we + // have to just twiddle our thumbs and hope the client does that. + // Note that due to the sliding window of receive blocks, there is + // a hard upper limit of 127 outstanding connections (that is, + // half the receive window). + if ((avail0 > 0 && downstreams.size() < 127) || downstreams.empty()) { + if (config->mode != LSN_SIMPLE_SERVER) + circuit_reopen_downstreams(this); + else + circuit_arm_axe_timer(this, axe_interval()); + } }
return check_for_eof(); @@ -843,6 +846,10 @@ chop_circuit_t::send_targeted(chop_conn_t *conn, size_t d, size_t p, opcode_t f, send_seq++; if (f == op_FIN) sent_fin = true; + if ((f == op_DAT || f == op_FIN) && d > 0) + // We are making forward progress if we are _either_ sending or + // receiving data. + dead_cycles = 0; return 0; }
@@ -868,34 +875,32 @@ chop_circuit_t::pick_connection(size_t desired, size_t *blocksize) for (unordered_set<chop_conn_t *>::iterator i = downstreams.begin(); i != downstreams.end(); i++) { chop_conn_t *conn = *i; - // We can only use candidates that have a steg target already. - if (conn->steg) { - // Find the connections whose transmit rooms are closest to the - // desired transmission length from both directions. - size_t room = conn->steg->transmit_room(); + if (!conn->steg) { + log_debug(conn, "offers 0 bytes (no steg)"); + continue; + }
- if (room <= MIN_BLOCK_SIZE) - room = 0; + size_t room = conn->steg->transmit_room();
- if (room > MAX_BLOCK_SIZE) - room = MAX_BLOCK_SIZE; + if (room <= MIN_BLOCK_SIZE) + room = 0;
- log_debug(conn, "offers %lu bytes (%s)", (unsigned long)room, - conn->steg->cfg()->name()); + if (room > MAX_BLOCK_SIZE) + room = MAX_BLOCK_SIZE;
- if (room >= desired) { - if (room < minabove) { - minabove = room; - targabove = conn; - } - } else { - if (room > maxbelow) { - maxbelow = room; - targbelow = conn; - } + log_debug(conn, "offers %lu bytes (%s)", (unsigned long)room, + conn->steg->cfg()->name()); + + if (room >= desired) { + if (room < minabove) { + minabove = room; + targabove = conn; } } else { - log_debug(conn, "offers 0 bytes (no steg)"); + if (room > maxbelow) { + maxbelow = room; + targbelow = conn; + } } }
@@ -943,6 +948,9 @@ chop_circuit_t::process_queue() log_info(this, "protocol error: data after FIN"); pending_error = true; } else { + // We are making forward progress if we are _either_ sending or + // receiving data. + dead_cycles = 0; if (evbuffer_add_buffer(bufferevent_get_output(up_buffer), blk.data)) { log_warn(this, "buffer transfer failure"); @@ -989,8 +997,6 @@ chop_circuit_t::process_queue() }
log_debug(this, "processed %u blocks", count); - if (count > 0) - dead_cycles = 0; if (sent_error) return -1;
@@ -1318,8 +1324,10 @@ chop_conn_t::send() // comes in for a stale circuit. if (upstream) { log_debug(this, "must send"); - if (upstream->send_targeted(this)) + if (upstream->send_targeted(this)) { + upstream->drop_downstream(this); conn_do_flush(this); + }
} else { log_debug(this, "must send (no upstream)"); diff --git a/src/steg/jsSteg.cc b/src/steg/jsSteg.cc index 60893ef..06d31e4 100644 --- a/src/steg/jsSteg.cc +++ b/src/steg/jsSteg.cc @@ -778,11 +778,9 @@ http_server_JS_transmit (payloads& pl, struct evbuffer *source, conn_t *conn,
free(iv);
- log_debug("SERVER encoded data in hex string (len %d):", datalen); + //log_debug("SERVER encoded data in hex string (len %d):", datalen); // buf_dump((unsigned char*)data, datalen, stderr);
- - if (get_payload(pl, content_type, datalen, &jsTemplate, &jsLen) == 1) { log_debug("SERVER found the applicable HTTP response template with size %d", jsLen); } else { diff --git a/src/steg/payloads.cc b/src/steg/payloads.cc index d9546d2..8a766c5 100644 --- a/src/steg/payloads.cc +++ b/src/steg/payloads.cc @@ -1158,10 +1158,7 @@ int init_JS_payload_pool(payloads& pl, int len, int type, int minCapacity) { } }
- pl.max_JS_capacity = maxPayloadCap; - - pl.initTypePayload[contentType] = 1; pl.typePayloadCount[contentType] = cnt; log_debug("init_payload_pool: typePayloadCount for contentType %d = %d", @@ -1243,10 +1240,8 @@ int init_HTML_payload_pool(payloads& pl, int len, int type, int minCapacity) { } }
- pl.max_HTML_capacity = maxPayloadCap; - - + pl.initTypePayload[contentType] = 1; pl.typePayloadCount[contentType] = cnt; log_debug("init_payload_pool: typePayloadCount for contentType %d = %d", contentType, pl.typePayloadCount[contentType]); @@ -1436,8 +1431,9 @@ int get_next_payload (payloads& pl, int contentType, char** buf, int get_payload (payloads& pl, int contentType, int cap, char** buf, int* size) { int r, i, cnt, found = 0, numCandidate = 0, first, best, current;
- log_debug("get_payload: contentType = %d, initTypePayload = %d, typePayloadCount = %d", - contentType, pl.initTypePayload[contentType], pl.typePayloadCount[contentType]); + log_debug("contentType = %d, initTypePayload = %d, typePayloadCount = %d", + contentType, pl.initTypePayload[contentType], + pl.typePayloadCount[contentType]);
if (contentType <= 0 || contentType >= MAX_CONTENT_TYPE || diff --git a/src/test/test_tl.py b/src/test/test_tl.py index 9987c7d..eb61f31 100644 --- a/src/test/test_tl.py +++ b/src/test/test_tl.py @@ -87,8 +87,13 @@ class TimelineTest(object): # "127.0.0.1:5010","embed", # ))
- # NOTE: 'http' steg presently cannot be tested using this system - # because the trace pools are process-global rather than per-listener. + def test_http(self): + self.doTest("chop", + ("chop", "server", "127.0.0.1:5001", + "127.0.0.1:5010","http","127.0.0.1:5011","http", + "chop", "client", "127.0.0.1:4999", + "127.0.0.1:5010","http","127.0.0.1:5011","http", + ))
# Synthesize TimelineTest+TestCase subclasses for every 'tl_*' file in # the test directory.
tor-commits@lists.torproject.org