commit 2efbab2aaf98d8f8f0df504efd4fdd0fac77d354 Author: Andrea Shepard andrea@torproject.org Date: Tue Oct 29 02:13:53 2013 -0700
Provide generic mechanism for scheduler to query writeable cells on a channel --- src/or/channel.c | 34 ++++++++++++++++++++++++++++++++++ src/or/channel.h | 7 ++++++- src/or/channeltls.c | 30 ++++++++++++++++++++++++++++++ src/or/connection_or.c | 8 -------- src/or/or.h | 12 ++++++++++++ src/or/scheduler.c | 16 +++++++++++++--- 6 files changed, 95 insertions(+), 12 deletions(-)
diff --git a/src/or/channel.c b/src/or/channel.c index da2493a..e9451f1 100644 --- a/src/or/channel.c +++ b/src/or/channel.c @@ -3826,6 +3826,40 @@ channel_mark_outgoing(channel_t *chan) chan->is_incoming = 0; }
+/************************ + * Flow control queries * + ***********************/ + +/* + * Estimate the number of writeable cells + * + * Ask the lower layer for an estimate of how many cells it can accept, and + * then subtract the length of our outgoing_queue, if any, to produce an + * estimate of the number of cells this channel can accept for writes. + */ + +int +channel_num_cells_writeable(channel_t *chan) +{ + int result; + + tor_assert(chan); + tor_assert(chan->num_cells_writeable); + + if (chan->state == CHANNEL_STATE_OPEN) { + /* Query lower layer */ + result = chan->num_cells_writeable(chan); + /* Subtract cell queue length, if any */ + result -= chan_cell_queue_len(&chan->outgoing_queue); + if (result < 0) result = 0; + } else { + /* No cells are writeable in any other state */ + result = 0; + } + + return result; +} + /********************* * Timestamp updates * ********************/ diff --git a/src/or/channel.h b/src/or/channel.h index 1481992..28b5ab0 100644 --- a/src/or/channel.h +++ b/src/or/channel.h @@ -110,7 +110,9 @@ struct channel_s { int (*matches_extend_info)(channel_t *, extend_info_t *); /** Check if this channel matches a target address when extending */ int (*matches_target)(channel_t *, const tor_addr_t *); - /** Write a cell to an open channel */ + /* Ask the lower layer how many cells can be written */ + int (*num_cells_writeable)(channel_t *); + /* Write a cell to an open channel */ int (*write_cell)(channel_t *, cell_t *); /** Write a packed cell to an open channel */ int (*write_packed_cell)(channel_t *, packed_cell_t *); @@ -465,6 +467,9 @@ void channel_listener_dump_statistics(channel_listener_t *chan_l, void channel_listener_dump_transport_statistics(channel_listener_t *chan_l, int severity);
+/* Flow control queries */ +int channel_num_cells_writeable(channel_t *chan); + /* Timestamp queries */ time_t channel_when_created(channel_t *chan); time_t channel_when_last_active(channel_t *chan); diff --git a/src/or/channeltls.c b/src/or/channeltls.c index af1aab3..b828b15 100644 --- a/src/or/channeltls.c +++ b/src/or/channeltls.c @@ -68,6 +68,7 @@ channel_tls_matches_extend_info_method(channel_t *chan, extend_info_t *extend_info); static int channel_tls_matches_target_method(channel_t *chan, const tor_addr_t *target); +static int channel_tls_num_cells_writeable_method(channel_t *chan); static int channel_tls_write_cell_method(channel_t *chan, cell_t *cell); static int channel_tls_write_packed_cell_method(channel_t *chan, @@ -124,6 +125,7 @@ channel_tls_common_init(channel_tls_t *tlschan) chan->is_canonical = channel_tls_is_canonical_method; chan->matches_extend_info = channel_tls_matches_extend_info_method; chan->matches_target = channel_tls_matches_target_method; + chan->num_cells_writeable = channel_tls_num_cells_writeable_method; chan->write_cell = channel_tls_write_cell_method; chan->write_packed_cell = channel_tls_write_packed_cell_method; chan->write_var_cell = channel_tls_write_var_cell_method; @@ -674,6 +676,34 @@ channel_tls_matches_target_method(channel_t *chan, }
/** + * Tell the upper layer how many cells we can accept to write + * + * This implements the num_cells_writeable method for channel_tls_t; it + * returns an estimate of the number of cells we can accept with + * channel_tls_write_*_cell(). + */ + +static int +channel_tls_num_cells_writeable_method(channel_t *chan) +{ + size_t outbuf_len; + int n; + channel_tls_t *tlschan = BASE_CHAN_TO_TLS(chan); + size_t cell_network_size; + + tor_assert(tlschan); + tor_assert(tlschan->conn); + + cell_network_size = get_cell_network_size(tlschan->conn->wide_circ_ids); + outbuf_len = connection_get_outbuf_len(TO_CONN(tlschan->conn)); + /* Get the number of cells */ + n = CEIL_DIV(OR_CONN_HIGHWATER - outbuf_len, cell_network_size); + if (n < 0) n = 0; + + return n; +} + +/** * Write a cell to a channel_tls_t * * This implements the write_cell method for channel_tls_t; given a diff --git a/src/or/connection_or.c b/src/or/connection_or.c index 9074c0a..a8f9d41 100644 --- a/src/or/connection_or.c +++ b/src/or/connection_or.c @@ -576,14 +576,6 @@ connection_or_process_inbuf(or_connection_t *conn) return ret; }
-/** When adding cells to an OR connection's outbuf, keep adding until the - * outbuf is at least this long, or we run out of cells. */ -#define OR_CONN_HIGHWATER (32*1024) - -/** Add cells to an OR connection's outbuf whenever the outbuf's data length - * drops below this size. */ -#define OR_CONN_LOWWATER (16*1024) - /** Called whenever we have flushed some data on an or_conn: add more data * from active circuits. */ int diff --git a/src/or/or.h b/src/or/or.h index b2b0d5f..bdcb29e 100644 --- a/src/or/or.h +++ b/src/or/or.h @@ -1426,6 +1426,18 @@ typedef struct or_handshake_state_t {
/** Length of Extended ORPort connection identifier. */ #define EXT_OR_CONN_ID_LEN DIGEST_LEN /* 20 */ +/* + * OR_CONN_HIGHWATER and OR_CONN_LOWWATER moved from connection_or.c so + * channeltls.c can see them too. + */ + +/** When adding cells to an OR connection's outbuf, keep adding until the + * outbuf is at least this long, or we run out of cells. */ +#define OR_CONN_HIGHWATER (32*1024) + +/** Add cells to an OR connection's outbuf whenever the outbuf's data length + * drops below this size. */ +#define OR_CONN_LOWWATER (16*1024)
/** Subtype of connection_t for an "OR connection" -- that is, one that speaks * cells over TLS. */ diff --git a/src/or/scheduler.c b/src/or/scheduler.c index e2dcdb5..7023eaa 100644 --- a/src/or/scheduler.c +++ b/src/or/scheduler.c @@ -324,6 +324,7 @@ void scheduler_run(void) { smartlist_t *tmp = NULL; + int n_cells;
log_debug(LD_SCHED, "We have a chance to run the scheduler");
@@ -337,9 +338,18 @@ scheduler_run(void) channels_pending = smartlist_new();
SMARTLIST_FOREACH_BEGIN(tmp, channel_t *, chan) { - log_debug(LD_SCHED, - "Scheduler saw pending channel " U64_FORMAT " at %p", - U64_PRINTF_ARG(chan->global_identifier), chan); + n_cells = channel_num_cells_writeable(chan); + if (n_cells > 0) { + log_debug(LD_SCHED, + "Scheduler saw pending channel " U64_FORMAT " at %p with " + "%d cells writeable", + U64_PRINTF_ARG(chan->global_identifier), chan, n_cells); + } else { + log_info(LD_SCHED, + "Scheduler saw pending channel " U64_FORMAT " at %p with " + "no cells writeable", + U64_PRINTF_ARG(chan->global_identifier), chan); + } } SMARTLIST_FOREACH_END(chan);
smartlist_free(tmp);