commit 7c9954a02abd16e5c74c2a5dea9ed0f65af230be Author: Nick Mathewson nickm@torproject.org Date: Fri Oct 12 17:58:01 2012 -0400
Use SIMPLEQ, not smartlist_t, for channel cell queues.
This lets us use fewer memory allocations, and avoid O(n^2) iterations --- src/or/channel.c | 158 ++++++++++++++++++++---------------------------------- src/or/channel.h | 9 +++- 2 files changed, 66 insertions(+), 101 deletions(-)
diff --git a/src/or/channel.c b/src/or/channel.c index 6527288..b594d05 100644 --- a/src/or/channel.c +++ b/src/or/channel.c @@ -31,6 +31,7 @@
typedef struct cell_queue_entry_s cell_queue_entry_t; struct cell_queue_entry_s { + SIMPLEQ_ENTRY(cell_queue_entry_s) next; enum { CELL_QUEUE_FIXED, CELL_QUEUE_VAR, @@ -768,6 +769,10 @@ channel_init(channel_t *chan) /* Init next_circ_id */ chan->next_circ_id = crypto_rand_int(1 << 15);
+ /* Initialize queues. */ + SIMPLEQ_INIT(&chan->incoming_queue); + SIMPLEQ_INIT(&chan->outgoing_queue); + /* Timestamp it */ channel_timestamp_created(chan); } @@ -861,6 +866,7 @@ channel_listener_free(channel_listener_t *chan_l) static void channel_force_free(channel_t *chan) { + cell_queue_entry_t *cell, *cell_tmp; tor_assert(chan);
/* Call a free method if there is one */ @@ -869,26 +875,16 @@ channel_force_free(channel_t *chan) channel_clear_remote_end(chan);
/* We might still have a cell queue; kill it */ - if (chan->incoming_queue) { - SMARTLIST_FOREACH_BEGIN(chan->incoming_queue, - cell_queue_entry_t *, q) { - cell_queue_entry_free(q, 0); - } SMARTLIST_FOREACH_END(q); - - smartlist_free(chan->incoming_queue); - chan->incoming_queue = NULL; + SIMPLEQ_FOREACH_SAFE(cell, &chan->incoming_queue, next, cell_tmp) { + cell_queue_entry_free(cell, 0); } + SIMPLEQ_INIT(&chan->incoming_queue);
/* Outgoing cell queue is similar, but we can have to free packed cells */ - if (chan->outgoing_queue) { - SMARTLIST_FOREACH_BEGIN(chan->outgoing_queue, - cell_queue_entry_t *, q) { - cell_queue_entry_free(q, 0); - } SMARTLIST_FOREACH_END(q); - - smartlist_free(chan->outgoing_queue); - chan->outgoing_queue = NULL; + SIMPLEQ_FOREACH_SAFE(cell, &chan->outgoing_queue, next, cell_tmp) { + cell_queue_entry_free(cell, 0); } + SIMPLEQ_INIT(&chan->outgoing_queue);
tor_free(chan); } @@ -1046,8 +1042,7 @@ channel_set_cell_handlers(channel_t *chan, chan->var_cell_handler = var_cell_handler;
/* Re-run the queue if we have one and there's any reason to */ - if (chan->incoming_queue && - (smartlist_len(chan->incoming_queue) > 0) && + if (! SIMPLEQ_EMPTY(&chan->incoming_queue) && try_again && (chan->cell_handler || chan->var_cell_handler)) channel_process_cells(chan); @@ -1669,9 +1664,8 @@ channel_write_cell_queue_entry(channel_t *chan, cell_queue_entry_t *q) }
/* Can we send it right out? If so, try */ - if (!(chan->outgoing_queue && - (smartlist_len(chan->outgoing_queue) > 0)) && - chan->state == CHANNEL_STATE_OPEN) { + if (SIMPLEQ_EMPTY(&chan->outgoing_queue) && + chan->state == CHANNEL_STATE_OPEN) { /* Pick the right write function for this cell type and save the result */ switch (q->type) { case CELL_QUEUE_FIXED: @@ -1707,14 +1701,12 @@ channel_write_cell_queue_entry(channel_t *chan, cell_queue_entry_t *q)
if (!sent) { /* Not sent, queue it */ - if (!(chan->outgoing_queue)) - chan->outgoing_queue = smartlist_new(); /* * We have to copy the queue entry passed in, since the caller probably * used the stack. */ tmp = cell_queue_entry_dup(q); - smartlist_add(chan->outgoing_queue, tmp); + SIMPLEQ_INSERT_TAIL(&chan->outgoing_queue, tmp, next); /* Try to process the queue? */ if (chan->state == CHANNEL_STATE_OPEN) channel_flush_cells(chan); } @@ -1900,19 +1892,15 @@ channel_change_state(channel_t *chan, channel_state_t to_state) channel_do_open_actions(chan);
/* Check for queued cells to process */ - if (chan->incoming_queue && - smartlist_len(chan->incoming_queue) > 0) + if (! SIMPLEQ_EMPTY(&chan->incoming_queue)) channel_process_cells(chan); - if (chan->outgoing_queue && - smartlist_len(chan->outgoing_queue) > 0) + if (! SIMPLEQ_EMPTY(&chan->outgoing_queue)) channel_flush_cells(chan); } else if (to_state == CHANNEL_STATE_CLOSED || to_state == CHANNEL_STATE_ERROR) { /* Assert that all queues are empty */ - tor_assert(!(chan->incoming_queue) || - smartlist_len(chan->incoming_queue) == 0); - tor_assert(!(chan->outgoing_queue) || - smartlist_len(chan->outgoing_queue) == 0); + tor_assert(SIMPLEQ_EMPTY(&chan->incoming_queue)); + tor_assert(SIMPLEQ_EMPTY(&chan->outgoing_queue)); } }
@@ -2086,16 +2074,9 @@ channel_flush_some_cells_from_outgoing_queue(channel_t *chan, /* If we aren't in CHANNEL_STATE_OPEN, nothing goes through */ if (chan->state == CHANNEL_STATE_OPEN) { while ((unlimited || num_cells > flushed) && - (chan->outgoing_queue && - (smartlist_len(chan->outgoing_queue) > 0))) { - /* - * Ewww, smartlist_del_keeporder() is O(n) in list length; maybe a - * a linked list would make more sense for the queue. - */ - - /* Get the head of the queue */ - q = smartlist_get(chan->outgoing_queue, 0); - if (q) { + NULL != (q = SIMPLEQ_FIRST(&chan->outgoing_queue))) { + + if (1) { /* * Okay, we have a good queue entry, try to give it to the lower * layer. @@ -2180,26 +2161,17 @@ channel_flush_some_cells_from_outgoing_queue(channel_t *chan, cell_queue_entry_free(q, 0); q = NULL; } - } else { - /* This shouldn't happen; log and throw it away */ - log_info(LD_CHANNEL, - "Saw a NULL entry in the outgoing cell queue on channel %p " - "(global ID " U64_FORMAT "); this is definitely a bug.", - chan, U64_PRINTF_ARG(chan->global_identifier)); - /* q is already NULL, so we know to delete that queue entry */ - }
- /* if q got NULLed out, we used it and should remove the queue entry */ - if (!q) smartlist_del_keeporder(chan->outgoing_queue, 0); - /* No cell removed from list, so we can't go on any further */ - else break; + /* if q got NULLed out, we used it and should remove the queue entry */ + if (!q) SIMPLEQ_REMOVE_HEAD(&chan->outgoing_queue, next); + /* No cell removed from list, so we can't go on any further */ + else break; + } } }
/* Did we drain the queue? */ - if (!(chan->outgoing_queue) || - smartlist_len(chan->outgoing_queue) == 0) { - /* Timestamp it */ + if (SIMPLEQ_EMPTY(&chan->outgoing_queue)) { channel_timestamp_drained(chan); }
@@ -2233,8 +2205,8 @@ channel_more_to_flush(channel_t *chan) tor_assert(chan);
/* Check if we have any queued */ - if (chan->incoming_queue && - smartlist_len(chan->incoming_queue) > 0) return 1; + if (! SIMPLEQ_EMPTY(&chan->incoming_queue)) + return 1;
/* Check if any circuits would like to queue some */ if (circuitmux_num_cells(chan->cmux) > 0) return 1; @@ -2427,8 +2399,7 @@ channel_listener_queue_incoming(channel_listener_t *listener, void channel_process_cells(channel_t *chan) { - smartlist_t *queue; - int putback = 0; + cell_queue_entry_t *q; tor_assert(chan); tor_assert(chan->state == CHANNEL_STATE_CLOSING || chan->state == CHANNEL_STATE_MAINT || @@ -2442,24 +2413,21 @@ channel_process_cells(channel_t *chan) if (!(chan->cell_handler || chan->var_cell_handler)) return; /* Nothing we can do if we have no cells */ - if (!(chan->incoming_queue)) return; + if (SIMPLEQ_EMPTY(&chan->incoming_queue)) return;
- queue = chan->incoming_queue; - chan->incoming_queue = NULL; /* * Process cells until we're done or find one we have no current handler * for. */ - SMARTLIST_FOREACH_BEGIN(queue, cell_queue_entry_t *, q) { + while (NULL != (q = SIMPLEQ_FIRST(&chan->incoming_queue))) { tor_assert(q); tor_assert(q->type == CELL_QUEUE_FIXED || q->type == CELL_QUEUE_VAR);
- if (putback) { - smartlist_add(chan->incoming_queue, q); - } else if (q->type == CELL_QUEUE_FIXED && + if (q->type == CELL_QUEUE_FIXED && chan->cell_handler) { /* Handle a fixed-length cell */ + SIMPLEQ_REMOVE_HEAD(&chan->incoming_queue, next); tor_assert(q->u.fixed.cell); log_debug(LD_CHANNEL, "Processing incoming cell_t %p for channel %p (global ID " @@ -2471,6 +2439,7 @@ channel_process_cells(channel_t *chan) } else if (q->type == CELL_QUEUE_VAR && chan->var_cell_handler) { /* Handle a variable-length cell */ + SIMPLEQ_REMOVE_HEAD(&chan->incoming_queue, next); tor_assert(q->u.var.var_cell); log_debug(LD_CHANNEL, "Processing incoming var_cell_t %p for channel %p (global ID " @@ -2481,15 +2450,10 @@ channel_process_cells(channel_t *chan) tor_free(q); } else { /* Can't handle this one */ - if (!chan->incoming_queue) - chan->incoming_queue = smartlist_new(); - smartlist_add(chan->incoming_queue, q); - putback = 1; + break; } - } SMARTLIST_FOREACH_END(q); + }
- /* If the list is empty, free it */ - smartlist_free(queue); }
/** @@ -2511,15 +2475,9 @@ channel_queue_cell(channel_t *chan, cell_t *cell)
/* Do we need to queue it, or can we just call the handler right away? */ if (!(chan->cell_handler)) need_to_queue = 1; - if (chan->incoming_queue && - (smartlist_len(chan->incoming_queue) > 0)) + if (! SIMPLEQ_EMPTY(&chan->incoming_queue)) need_to_queue = 1;
- /* If we need to queue and have no queue, create one */ - if (need_to_queue && !(chan->incoming_queue)) { - chan->incoming_queue = smartlist_new(); - } - /* Timestamp for receiving */ channel_timestamp_recv(chan);
@@ -2537,14 +2495,13 @@ channel_queue_cell(channel_t *chan, cell_t *cell) chan->cell_handler(chan, cell); } else { /* Otherwise queue it and then process the queue if possible. */ - tor_assert(chan->incoming_queue); q = cell_queue_entry_new_fixed(cell); log_debug(LD_CHANNEL, "Queueing incoming cell_t %p for channel %p " "(global ID " U64_FORMAT ")", cell, chan, U64_PRINTF_ARG(chan->global_identifier)); - smartlist_add(chan->incoming_queue, q); + SIMPLEQ_INSERT_TAIL(&chan->incoming_queue, q, next); if (chan->cell_handler || chan->var_cell_handler) { channel_process_cells(chan); @@ -2571,15 +2528,9 @@ channel_queue_var_cell(channel_t *chan, var_cell_t *var_cell)
/* Do we need to queue it, or can we just call the handler right away? */ if (!(chan->var_cell_handler)) need_to_queue = 1; - if (chan->incoming_queue && - (smartlist_len(chan->incoming_queue) > 0)) + if (! SIMPLEQ_EMPTY(&chan->incoming_queue)) need_to_queue = 1;
- /* If we need to queue and have no queue, create one */ - if (need_to_queue && !(chan->incoming_queue)) { - chan->incoming_queue = smartlist_new(); - } - /* Timestamp for receiving */ channel_timestamp_recv(chan);
@@ -2597,14 +2548,13 @@ channel_queue_var_cell(channel_t *chan, var_cell_t *var_cell) chan->var_cell_handler(chan, var_cell); } else { /* Otherwise queue it and then process the queue if possible. */ - tor_assert(chan->incoming_queue); q = cell_queue_entry_new_var(var_cell); log_debug(LD_CHANNEL, "Queueing incoming var_cell_t %p for channel %p " "(global ID " U64_FORMAT ")", var_cell, chan, U64_PRINTF_ARG(chan->global_identifier)); - smartlist_add(chan->incoming_queue, q); + SIMPLEQ_INSERT_TAIL(&chan->incoming_queue, q, next); if (chan->cell_handler || chan->var_cell_handler) { channel_process_cells(chan); @@ -3134,6 +3084,19 @@ channel_listener_describe_transport(channel_listener_t *chan_l) }
/** + * Return the number of entries in <b>queue</b> + */ +static int +chan_cell_queue_len(const chan_cell_queue_t *queue) +{ + int r = 0; + cell_queue_entry_t *cell; + SIMPLEQ_FOREACH(cell, queue, next) + ++r; + return r; +} + +/** * Dump channel statistics * * Dump statistics for one channel to the log @@ -3246,10 +3209,8 @@ channel_dump_statistics(channel_t *chan, int severity) " * Channel " U64_FORMAT " has %d queued incoming cells" " and %d queued outgoing cells", U64_PRINTF_ARG(chan->global_identifier), - (chan->incoming_queue != NULL) ? - smartlist_len(chan->incoming_queue) : 0, - (chan->outgoing_queue != NULL) ? - smartlist_len(chan->outgoing_queue) : 0); + chan_cell_queue_len(&chan->incoming_queue), + chan_cell_queue_len(&chan->outgoing_queue));
/* Describe circuits */ log(severity, LD_GENERAL, @@ -3498,8 +3459,7 @@ channel_has_queued_writes(channel_t *chan) tor_assert(chan); tor_assert(chan->has_queued_writes);
- if (chan->outgoing_queue && - smartlist_len(chan->outgoing_queue) > 0) { + if (! SIMPLEQ_EMPTY(&chan->outgoing_queue)) { has_writes = 1; } else { /* Check with the lower layer */ diff --git a/src/or/channel.h b/src/or/channel.h index cb9835a..ccb8fe8 100644 --- a/src/or/channel.h +++ b/src/or/channel.h @@ -10,6 +10,7 @@ #define _TOR_CHANNEL_H
#include "or.h" +#include "tor_queue.h" #include "circuitmux.h"
/* Channel handler function pointer typedefs */ @@ -17,6 +18,10 @@ typedef void (*channel_listener_fn_ptr)(channel_listener_t *, channel_t *); typedef void (*channel_cell_handler_fn_ptr)(channel_t *, cell_t *); typedef void (*channel_var_cell_handler_fn_ptr)(channel_t *, var_cell_t *);
+struct cell_queue_entry_s; +SIMPLEQ_HEAD(chan_cell_queue, cell_queue_entry_s) incoming_queue; +typedef struct chan_cell_queue chan_cell_queue_t; + /* * Channel struct; see the channel_t typedef in or.h. A channel is an * abstract interface for the OR-to-OR connection, similar to connection_or_t, @@ -120,10 +125,10 @@ struct channel_s { channel_t *next_with_same_id, *prev_with_same_id;
/* List of incoming cells to handle */ - smartlist_t *incoming_queue; + chan_cell_queue_t incoming_queue;
/* List of queued outgoing cells */ - smartlist_t *outgoing_queue; + chan_cell_queue_t outgoing_queue;
/* Circuit mux for circuits sending on this channel */ circuitmux_t *cmux;