commit dde358667d386d2c7b28866b029effa062ab9b6d Author: Matt Traudt sirmatt@ksu.edu Date: Tue Jul 11 12:47:37 2017 -0400
sched: Implement the KIST scheduler
Closes #12541
Signed-off-by: David Goulet dgoulet@torproject.org --- src/or/channel.c | 4 - src/or/config.c | 11 +- src/or/scheduler.c | 639 +++++++++++++++++---------------------------- src/or/scheduler.h | 185 ++++++++++--- src/or/scheduler_kist.c | 585 +++++++++++++++++++++++++++++++++++++++++ src/or/scheduler_vanilla.c | 186 +++++++++++++ src/test/test_scheduler.c | 625 ++++++++++++++++++++++++++++++-------------- 7 files changed, 1591 insertions(+), 644 deletions(-)
diff --git a/src/or/channel.c b/src/or/channel.c index d64a0347a..56c54c0c5 100644 --- a/src/or/channel.c +++ b/src/or/channel.c @@ -4826,8 +4826,6 @@ channel_update_xmit_queue_size(channel_t *chan) U64_FORMAT ", new size is " U64_FORMAT, U64_PRINTF_ARG(adj), U64_PRINTF_ARG(chan->global_identifier), U64_PRINTF_ARG(estimated_total_queue_size)); - /* Tell the scheduler we're increasing the queue size */ - scheduler_adjust_queue_size(chan, 1, adj); } } else if (queued < chan->bytes_queued_for_xmit) { adj = chan->bytes_queued_for_xmit - queued; @@ -4850,8 +4848,6 @@ channel_update_xmit_queue_size(channel_t *chan) U64_FORMAT ", new size is " U64_FORMAT, U64_PRINTF_ARG(adj), U64_PRINTF_ARG(chan->global_identifier), U64_PRINTF_ARG(estimated_total_queue_size)); - /* Tell the scheduler we're decreasing the queue size */ - scheduler_adjust_queue_size(chan, -1, adj); } } } diff --git a/src/or/config.c b/src/or/config.c index a7bc23af5..285d4952f 100644 --- a/src/or/config.c +++ b/src/or/config.c @@ -1813,14 +1813,9 @@ options_act(const or_options_t *old_options) return -1; }
- /* XXXFORTOR remove set_watermarks */ - /* Set up scheduler thresholds */ - scheduler_set_watermarks(100 * 1024*1024 /* 100 MB */, - 101 * 1024*1024 /* 101 MB */, - 100); - - /* XXXFORTOR enable notification to sched that the conf might have changed */ - //scheduler_conf_changed(); + /* Inform the scheduler subsystem that a configuration change happened. It + * might be a change of scheduler or parameter. */ + scheduler_conf_changed();
/* Set up accounting */ if (accounting_parse_options(options, 0)<0) { diff --git a/src/or/scheduler.c b/src/or/scheduler.c index eb31bc215..b04bdceb4 100644 --- a/src/or/scheduler.c +++ b/src/or/scheduler.c @@ -2,9 +2,7 @@ /* See LICENSE for licensing information */
#include "or.h" - -#define TOR_CHANNEL_INTERNAL_ /* For channel_flush_some_cells() */ -#include "channel.h" +#include "config.h"
#include "compat_libevent.h" #define SCHEDULER_PRIVATE_ @@ -12,35 +10,41 @@
#include <event2/event.h>
-/* - * Scheduler high/low watermarks - */ - -static uint32_t sched_q_low_water = 16384; -static uint32_t sched_q_high_water = 32768; - -/* - * Maximum cells to flush in a single call to channel_flush_some_cells(); - * setting this low means more calls, but too high and we could overshoot - * sched_q_high_water. - */ - -static uint32_t sched_max_flush_cells = 16; - /** * \file scheduler.c * \brief Channel scheduling system: decides which channels should send and * receive when. * - * This module implements a scheduler algorithm, to decide - * which channels should send/receive when. + * This module is the global/common parts of the scheduling system. This system + * is what decides what channels get to send cells on their circuits and when. + * + * Terms: + * - "Scheduling system": the collection of scheduler*.{h,c} files and their + * aggregate behavior. + * - "Scheduler implementation": a scheduler_t. The scheduling system has one + * active scheduling implementation at a time. + * + * In this file you will find state that any scheduler implementation can have + * access to as well as the functions the rest of Tor uses to interact with the + * scheduling system. * * The earliest versions of Tor approximated a kind of round-robin system - * among active connections, but only approximated it. + * among active connections, but only approximated it. It would only consider + * one connection (roughly equal to a channel in today's terms) at a time, and + * thus could only prioritize circuits against others on the same connection. + * + * Then in response to the KIST paper[0], Tor implemented a global + * circuit scheduler. It was supposed to prioritize circuits across many + * channels, but wasn't effective. It is preserved in scheduler_vanilla.c. + * + * [0]: http://www.robgjansen.com/publications/kist-sec2014.pdf * - * Now, write scheduling works by keeping track of which channels can - * accept cells, and have cells to write. 
From the scheduler's perspective, - * a channel can be in four possible states: + * Then we actually got around to implementing KIST for real. We decided to + * modularize the scheduler so new ones can be implemented. You can find KIST + * in scheduler_kist.c. + * + * Channels have one of four scheduling states based on whether or not they + * have cells to send and whether or not they are able to send. * * <ol> * <li> @@ -125,85 +129,108 @@ static uint32_t sched_max_flush_cells = 16; * </ol> * * Other event-driven parts of the code move channels between these scheduling - * states by calling scheduler functions; the scheduler only runs on open-for- - * writes/has-cells channels and is the only path for those to transition to - * other states. The scheduler_run() function gives us the opportunity to do - * scheduling work, and is called from other scheduler functions whenever a - * state transition occurs, and periodically from the main event loop. + * states by calling scheduler functions. The scheduling system builds up a + * list of channels in the SCHED_CHAN_PENDING state that the scheduler + * implementation should then use when it runs. Scheduling implementations need + * to properly update channel states during their scheduler_t->run() function + * as that is the only opportunity for channels to move from SCHED_CHAN_PENDING + * to any other state. + * + * The remainder of this file is a small amount of state that any scheduler + * implementation should have access to, and the functions the rest of Tor uses + * to interact with the scheduling system. */
-/* Scheduler global data structures */ +/***************************************************************************** + * Scheduling system state + * + * State that can be accessed from any scheduler implementation (but not + * outside the scheduling system) + *****************************************************************************/ + +STATIC scheduler_t *scheduler;
/* * We keep a list of channels that are pending - i.e, have cells to write - * and can accept them to send. The enum scheduler_state in channel_t + * and can accept them to send. The enum scheduler_state in channel_t * is reserved for our use. + * + * Priority queue of channels that can write and have cells (pending work) */ - -/* Pqueue of channels that can write and have cells (pending work) */ STATIC smartlist_t *channels_pending = NULL;
/* * This event runs the scheduler from its callback, and is manually * activated whenever a channel enters open for writes/cells to send. */ - STATIC struct event *run_sched_ev = NULL;
-/* - * Queue heuristic; this is not the queue size, but an 'effective queuesize' - * that ages out contributions from stalled channels. - */ - -STATIC uint64_t queue_heuristic = 0; +/***************************************************************************** + * Scheduling system static function definitions + * + * Functions that can only be accessed from this file. + *****************************************************************************/
/* - * Timestamp for last queue heuristic update + * Scheduler event callback; this should get triggered once per event loop + * if any scheduling work was created during the event loop. */ +static void +scheduler_evt_callback(evutil_socket_t fd, short events, void *arg) +{ + (void) fd; + (void) events; + (void) arg;
-STATIC time_t queue_heuristic_timestamp = 0; + log_debug(LD_SCHED, "Scheduler event callback called");
-/* Scheduler static function declarations */ + tor_assert(run_sched_ev);
-static void scheduler_evt_callback(evutil_socket_t fd, - short events, void *arg); -static int scheduler_more_work(void); -static void scheduler_retrigger(void); -#if 0 -static void scheduler_trigger(void); -#endif + /* Run the scheduler. This is a mandatory function. */ + tor_assert(scheduler->run); + scheduler->run();
-/* Scheduler function implementations */ + /* Schedule itself back in if it has more work. */ + tor_assert(scheduler->schedule); + scheduler->schedule(); +}
-/** Free everything and shut down the scheduling system */ +/***************************************************************************** + * Scheduling system private function definitions + * + * Functions that can only be accessed from scheduler*.c + *****************************************************************************/
-void -scheduler_free_all(void) +/* Return the pending channel list. */ +smartlist_t * +get_channels_pending(void) { - log_debug(LD_SCHED, "Shutting down scheduler"); - - if (run_sched_ev) { - if (event_del(run_sched_ev) < 0) { - log_warn(LD_BUG, "Problem deleting run_sched_ev"); - } - tor_event_free(run_sched_ev); - run_sched_ev = NULL; - } + return channels_pending; +}
- if (channels_pending) { - smartlist_free(channels_pending); - channels_pending = NULL; - } +/* Return our libevent scheduler event. */ +struct event * +get_run_sched_ev(void) +{ + return run_sched_ev; }
-/** - * Comparison function to use when sorting pending channels - */ +/* Return true iff the KIST run interval is > 0, meaning we will use KIST. */ +int +scheduler_should_use_kist(void) +{ + int64_t run_freq = kist_scheduler_run_interval(NULL); + log_info(LD_SCHED, "Determined sched_run_interval should be %" PRId64 ". " + "Will%s use KIST.", + run_freq, (run_freq > 0 ? "" : " not")); + return run_freq > 0; +}
-MOCK_IMPL(STATIC int, +/* Comparison function to use when sorting pending channels */ +MOCK_IMPL(int, scheduler_compare_channels, (const void *c1_v, const void *c2_v)) { - channel_t *c1 = NULL, *c2 = NULL; + const channel_t *c1 = NULL, *c2 = NULL; /* These are a workaround for -Wbad-function-cast throwing a fit */ const circuitmux_policy_t *p1, *p2; uintptr_t p1_i, p2_i; @@ -211,8 +238,8 @@ scheduler_compare_channels, (const void *c1_v, const void *c2_v)) tor_assert(c1_v); tor_assert(c2_v);
- c1 = (channel_t *)(c1_v); - c2 = (channel_t *)(c2_v); + c1 = (const channel_t *)(c1_v); + c2 = (const channel_t *)(c2_v);
tor_assert(c1); tor_assert(c2); @@ -242,26 +269,109 @@ scheduler_compare_channels, (const void *c1_v, const void *c2_v)) } }
+/***************************************************************************** + * Scheduling system global functions + * + * Functions that can be accessed from anywhere in Tor. + *****************************************************************************/ + /* - * Scheduler event callback; this should get triggered once per event loop - * if any scheduling work was created during the event loop. + * Little helper function called from a few different places. It changes the + * scheduler implementation, if necessary. And if it did, it then tells the + * old one to free its state and the new one to initialize. */ - static void -scheduler_evt_callback(evutil_socket_t fd, short events, void *arg) +set_scheduler(void) { - (void)fd; - (void)events; - (void)arg; - log_debug(LD_SCHED, "Scheduler event callback called"); + int have_kist = 0;
- tor_assert(run_sched_ev); + /* Switch, if needed */ + scheduler_t *old_scheduler = scheduler; + if (scheduler_should_use_kist()) { + scheduler = get_kist_scheduler(); + have_kist = 1; + } else { + scheduler = get_vanilla_scheduler(); + } + tor_assert(scheduler); + + if (old_scheduler != scheduler) { + /* Allow the old scheduler to clean up, if needed. */ + if (old_scheduler && old_scheduler->free_all) { + old_scheduler->free_all(); + } + /* We don't clean up the old one, we keep any type of scheduler we've + * allocated so we can do an easy switch back. */ + + /* Initialize the new scheduler. */ + if (scheduler->init) { + scheduler->init(); + } + log_notice(LD_CONFIG, "Using the %s scheduler.", + have_kist ? "KIST" : "vanilla"); + } +} + +/* + * This is how the scheduling system is notified of Tor's configuration + * changing. For example: a SIGHUP was issued. + */ +void +scheduler_conf_changed(void) +{ + /* Let the scheduler decide what it should do. */ + set_scheduler(); + + /* Then tell the (possibly new) scheduler that we have new options. */ + if (scheduler->on_new_options) { + scheduler->on_new_options(); + } +} + +/* + * Whenever we get a new consensus, this function is called. + */ +void +scheduler_notify_networkstatus_changed(const networkstatus_t *old_c, + const networkstatus_t *new_c) +{ + /* Then tell the (possibly new) scheduler that we have a new consensus */ + if (scheduler->on_new_consensus) { + scheduler->on_new_consensus(old_c, new_c); + } + /* Maybe the consensus param made us change the scheduler. */ + set_scheduler(); +} + +/* + * Free everything scheduling-related from main.c. Note this is only called + * when Tor is shutting down, while scheduler_t->free_all() is called both when + * Tor is shutting down and when we are switching schedulers. 
+ */ +void +scheduler_free_all(void) +{ + log_debug(LD_SCHED, "Shutting down scheduler"); + + if (run_sched_ev) { + if (event_del(run_sched_ev) < 0) { + log_warn(LD_BUG, "Problem deleting run_sched_ev"); + } + tor_event_free(run_sched_ev); + run_sched_ev = NULL; + }
- /* Run the scheduler */ - scheduler_run(); + if (channels_pending) { + /* We don't have ownership of the object in this list. */ + smartlist_free(channels_pending); + channels_pending = NULL; + }
- /* Do we have more work to do? */ - if (scheduler_more_work()) scheduler_retrigger(); + if (scheduler && scheduler->free_all) { + scheduler->free_all(); + } + tor_free(scheduler); + scheduler = NULL; }
/** Mark a channel as no longer ready to accept writes */ @@ -309,8 +419,6 @@ scheduler_channel_doesnt_want_writes,(channel_t *chan)) MOCK_IMPL(void, scheduler_channel_has_waiting_cells,(channel_t *chan)) { - int became_pending = 0; - tor_assert(chan); tor_assert(channels_pending);
@@ -330,7 +438,9 @@ scheduler_channel_has_waiting_cells,(channel_t *chan)) "Channel " U64_FORMAT " at %p went from waiting_for_cells " "to pending", U64_PRINTF_ARG(chan->global_identifier), chan); - became_pending = 1; + /* If we made a channel pending, we potentially have scheduling work to + * do. */ + scheduler->schedule(); } else { /* * It's not in waiting_for_cells, so it can't become pending; it's @@ -345,16 +455,13 @@ scheduler_channel_has_waiting_cells,(channel_t *chan)) U64_PRINTF_ARG(chan->global_identifier), chan); } } - - /* - * If we made a channel pending, we potentially have scheduling work - * to do. - */ - if (became_pending) scheduler_retrigger(); }
-/** Set up the scheduling system */ - +/* + * Initialize everything scheduling-related from config.c. Note this is only + * called when Tor is starting up, while scheduler_t->init() is called both + * when Tor is starting up and when we are switching schedulers. + */ void scheduler_init(void) { @@ -363,34 +470,17 @@ scheduler_init(void) tor_assert(!run_sched_ev); run_sched_ev = tor_event_new(tor_libevent_get_base(), -1, 0, scheduler_evt_callback, NULL); - channels_pending = smartlist_new(); - queue_heuristic = 0; - queue_heuristic_timestamp = approx_time(); -}
-/** Check if there's more scheduling work */ - -static int -scheduler_more_work(void) -{ - tor_assert(channels_pending); - - return ((scheduler_get_queue_heuristic() < sched_q_low_water) && - ((smartlist_len(channels_pending) > 0))) ? 1 : 0; + set_scheduler(); }
-/** Retrigger the scheduler in a way safe to use from the callback */ - -static void -scheduler_retrigger(void) -{ - tor_assert(run_sched_ev); - event_active(run_sched_ev, EV_TIMEOUT, 1); -} - -/** Notify the scheduler of a channel being closed */ - +/* + * If a channel is going away, this is how the scheduling system is informed + * so it can do any freeing necessary. This ultimately calls + * scheduler_t->on_channel_free() so the current scheduler can release any + * state specific to this channel. + */ MOCK_IMPL(void, scheduler_release_channel,(channel_t *chan)) { @@ -398,179 +488,29 @@ scheduler_release_channel,(channel_t *chan)) tor_assert(channels_pending);
if (chan->scheduler_state == SCHED_CHAN_PENDING) { - smartlist_pqueue_remove(channels_pending, - scheduler_compare_channels, - offsetof(channel_t, sched_heap_idx), - chan); - } - - chan->scheduler_state = SCHED_CHAN_IDLE; -} - -/** Run the scheduling algorithm if necessary */ - -MOCK_IMPL(void, -scheduler_run, (void)) -{ - int n_cells, n_chans_before, n_chans_after; - uint64_t q_len_before, q_heur_before, q_len_after, q_heur_after; - ssize_t flushed, flushed_this_time; - smartlist_t *to_readd = NULL; - channel_t *chan = NULL; - - log_debug(LD_SCHED, "We have a chance to run the scheduler"); - - if (scheduler_get_queue_heuristic() < sched_q_low_water) { - n_chans_before = smartlist_len(channels_pending); - q_len_before = channel_get_global_queue_estimate(); - q_heur_before = scheduler_get_queue_heuristic(); - - while (scheduler_get_queue_heuristic() <= sched_q_high_water && - smartlist_len(channels_pending) > 0) { - /* Pop off a channel */ - chan = smartlist_pqueue_pop(channels_pending, - scheduler_compare_channels, - offsetof(channel_t, sched_heap_idx)); - tor_assert(chan); - - /* Figure out how many cells we can write */ - n_cells = channel_num_cells_writeable(chan); - if (n_cells > 0) { - log_debug(LD_SCHED, - "Scheduler saw pending channel " U64_FORMAT " at %p with " - "%d cells writeable", - U64_PRINTF_ARG(chan->global_identifier), chan, n_cells); - - flushed = 0; - while (flushed < n_cells && - scheduler_get_queue_heuristic() <= sched_q_high_water) { - flushed_this_time = - channel_flush_some_cells(chan, - MIN(sched_max_flush_cells, - (size_t) n_cells - flushed)); - if (flushed_this_time <= 0) break; - flushed += flushed_this_time; - } - - if (flushed < n_cells) { - /* We ran out of cells to flush */ - chan->scheduler_state = SCHED_CHAN_WAITING_FOR_CELLS; - log_debug(LD_SCHED, - "Channel " U64_FORMAT " at %p " - "entered waiting_for_cells from pending", - U64_PRINTF_ARG(chan->global_identifier), - chan); - } else { - /* The channel may still have some cells */ 
- if (channel_more_to_flush(chan)) { - /* The channel goes to either pending or waiting_to_write */ - if (channel_num_cells_writeable(chan) > 0) { - /* Add it back to pending later */ - if (!to_readd) to_readd = smartlist_new(); - smartlist_add(to_readd, chan); - log_debug(LD_SCHED, - "Channel " U64_FORMAT " at %p " - "is still pending", - U64_PRINTF_ARG(chan->global_identifier), - chan); - } else { - /* It's waiting to be able to write more */ - chan->scheduler_state = SCHED_CHAN_WAITING_TO_WRITE; - log_debug(LD_SCHED, - "Channel " U64_FORMAT " at %p " - "entered waiting_to_write from pending", - U64_PRINTF_ARG(chan->global_identifier), - chan); - } - } else { - /* No cells left; it can go to idle or waiting_for_cells */ - if (channel_num_cells_writeable(chan) > 0) { - /* - * It can still accept writes, so it goes to - * waiting_for_cells - */ - chan->scheduler_state = SCHED_CHAN_WAITING_FOR_CELLS; - log_debug(LD_SCHED, - "Channel " U64_FORMAT " at %p " - "entered waiting_for_cells from pending", - U64_PRINTF_ARG(chan->global_identifier), - chan); - } else { - /* - * We exactly filled up the output queue with all available - * cells; go to idle. 
- */ - chan->scheduler_state = SCHED_CHAN_IDLE; - log_debug(LD_SCHED, - "Channel " U64_FORMAT " at %p " - "become idle from pending", - U64_PRINTF_ARG(chan->global_identifier), - chan); - } - } - } - - log_debug(LD_SCHED, - "Scheduler flushed %d cells onto pending channel " - U64_FORMAT " at %p", - (int)flushed, U64_PRINTF_ARG(chan->global_identifier), - chan); - } else { - log_info(LD_SCHED, - "Scheduler saw pending channel " U64_FORMAT " at %p with " - "no cells writeable", - U64_PRINTF_ARG(chan->global_identifier), chan); - /* Put it back to WAITING_TO_WRITE */ - chan->scheduler_state = SCHED_CHAN_WAITING_TO_WRITE; - } + if (smartlist_pos(channels_pending, chan) == -1) { + log_warn(LD_SCHED, "Scheduler asked to release channel %" PRIu64 " " + "but it wasn't in channels_pending", + chan->global_identifier); + } else { + smartlist_pqueue_remove(channels_pending, + scheduler_compare_channels, + offsetof(channel_t, sched_heap_idx), + chan); } - - /* Readd any channels we need to */ - if (to_readd) { - SMARTLIST_FOREACH_BEGIN(to_readd, channel_t *, readd_chan) { - readd_chan->scheduler_state = SCHED_CHAN_PENDING; - smartlist_pqueue_add(channels_pending, - scheduler_compare_channels, - offsetof(channel_t, sched_heap_idx), - readd_chan); - } SMARTLIST_FOREACH_END(readd_chan); - smartlist_free(to_readd); + if (scheduler->on_channel_free) { + scheduler->on_channel_free(chan); } - - n_chans_after = smartlist_len(channels_pending); - q_len_after = channel_get_global_queue_estimate(); - q_heur_after = scheduler_get_queue_heuristic(); - log_debug(LD_SCHED, - "Scheduler handled %d of %d pending channels, queue size from " - U64_FORMAT " to " U64_FORMAT ", queue heuristic from " - U64_FORMAT " to " U64_FORMAT, - n_chans_before - n_chans_after, n_chans_before, - U64_PRINTF_ARG(q_len_before), U64_PRINTF_ARG(q_len_after), - U64_PRINTF_ARG(q_heur_before), U64_PRINTF_ARG(q_heur_after)); } -} - -/** Trigger the scheduling event so we run the scheduler later */ - -#if 0 -static void 
-scheduler_trigger(void) -{ - log_debug(LD_SCHED, "Triggering scheduler event"); - - tor_assert(run_sched_ev);
- event_add(run_sched_ev, EV_TIMEOUT, 1); + chan->scheduler_state = SCHED_CHAN_IDLE; } -#endif
/** Mark a channel as ready to accept writes */
void scheduler_channel_wants_writes(channel_t *chan) { - int became_pending = 0; - tor_assert(chan); tor_assert(channels_pending);
@@ -579,6 +519,8 @@ scheduler_channel_wants_writes(channel_t *chan) /* * It can write now, so it goes to channels_pending. */ + log_debug(LD_SCHED, "chan=%" PRIu64 " became pending", + chan->global_identifier); smartlist_pqueue_add(channels_pending, scheduler_compare_channels, offsetof(channel_t, sched_heap_idx), @@ -588,7 +530,8 @@ scheduler_channel_wants_writes(channel_t *chan) "Channel " U64_FORMAT " at %p went from waiting_to_write " "to pending", U64_PRINTF_ARG(chan->global_identifier), chan); - became_pending = 1; + /* We just made a channel pending, we have scheduling work to do. */ + scheduler->schedule(); } else { /* * It's not in SCHED_CHAN_WAITING_TO_WRITE, so it can't become pending; @@ -602,19 +545,13 @@ scheduler_channel_wants_writes(channel_t *chan) U64_PRINTF_ARG(chan->global_identifier), chan); } } - - /* - * If we made a channel pending, we potentially have scheduling work - * to do. - */ - if (became_pending) scheduler_retrigger(); }
-/** - * Notify the scheduler that a channel's position in the pqueue may have - * changed - */ +#ifdef TOR_UNIT_TESTS
+/* + * Notify scheduler that a channel's queue position may have changed. + */ void scheduler_touch_channel(channel_t *chan) { @@ -634,115 +571,5 @@ scheduler_touch_channel(channel_t *chan) /* else no-op, since it isn't in the queue */ }
-/** - * Notify the scheduler of a queue size adjustment, to recalculate the - * queue heuristic. - */ - -void -scheduler_adjust_queue_size(channel_t *chan, int dir, uint64_t adj) -{ - time_t now = approx_time(); - - log_debug(LD_SCHED, - "Queue size adjustment by %s" U64_FORMAT " for channel " - U64_FORMAT, - (dir >= 0) ? "+" : "-", - U64_PRINTF_ARG(adj), - U64_PRINTF_ARG(chan->global_identifier)); - - /* Get the queue heuristic up to date */ - scheduler_update_queue_heuristic(now); - - /* Adjust as appropriate */ - if (dir >= 0) { - /* Increasing it */ - queue_heuristic += adj; - } else { - /* Decreasing it */ - if (queue_heuristic > adj) queue_heuristic -= adj; - else queue_heuristic = 0; - } - - log_debug(LD_SCHED, - "Queue heuristic is now " U64_FORMAT, - U64_PRINTF_ARG(queue_heuristic)); -} - -/** - * Query the current value of the queue heuristic - */ - -STATIC uint64_t -scheduler_get_queue_heuristic(void) -{ - time_t now = approx_time(); - - scheduler_update_queue_heuristic(now); - - return queue_heuristic; -} - -/** - * Adjust the queue heuristic value to the present time - */ - -STATIC void -scheduler_update_queue_heuristic(time_t now) -{ - time_t diff; - - if (queue_heuristic_timestamp == 0) { - /* - * Nothing we can sensibly do; must not have been initted properly. - * Oh well. - */ - queue_heuristic_timestamp = now; - } else if (queue_heuristic_timestamp < now) { - diff = now - queue_heuristic_timestamp; - /* - * This is a simple exponential age-out; the other proposed alternative - * was a linear age-out using the bandwidth history in rephist.c; I'm - * going with this out of concern that if an adversary can jam the - * scheduler long enough, it would cause the bandwidth to drop to - * zero and render the aging mechanism ineffective thereafter. 
- */ - if (0 <= diff && diff < 64) queue_heuristic >>= diff; - else queue_heuristic = 0; - - queue_heuristic_timestamp = now; - - log_debug(LD_SCHED, - "Queue heuristic is now " U64_FORMAT, - U64_PRINTF_ARG(queue_heuristic)); - } - /* else no update needed, or time went backward */ -} - -/** - * Set scheduler watermarks and flush size - */ - -void -scheduler_set_watermarks(uint32_t lo, uint32_t hi, uint32_t max_flush) -{ - /* Sanity assertions - caller should ensure these are true */ - tor_assert(lo > 0); - tor_assert(hi > lo); - tor_assert(max_flush > 0); - - sched_q_low_water = lo; - sched_q_high_water = hi; - sched_max_flush_cells = max_flush; -} - -/* XXXFORTOR Temp def of this func to get this commit to compile. Replace with - * real func */ -void -scheduler_notify_networkstatus_changed(const networkstatus_t *old_c, - const networkstatus_t *new_c) -{ - (void) old_c; - (void) new_c; -} +#endif /* TOR_UNIT_TESTS */
diff --git a/src/or/scheduler.h b/src/or/scheduler.h index 699ccde7a..3932e6049 100644 --- a/src/or/scheduler.h +++ b/src/or/scheduler.h @@ -1,9 +1,9 @@ -/* * Copyright (c) 2013-2017, The Tor Project, Inc. */ +/* * Copyright (c) 2017, The Tor Project, Inc. */ /* See LICENSE for licensing information */
/** * \file scheduler.h - * \brief Header file for scheduler.c + * \brief Header file for scheduler*.c **/
#ifndef TOR_SCHEDULER_H @@ -13,50 +13,163 @@ #include "channel.h" #include "testsupport.h"
-/* Global-visibility scheduler functions */ +/* + * A scheduler implementation is a collection of function pointers. If you + * would like to add a new scheduler called foo, create scheduler_foo.c, + * implement at least the mandatory ones, and implement get_foo_scheduler() + * that returns a complete scheduler_t for your foo scheduler. See + * scheduler_kist.c for an example. + * + * These function pointers SHOULD NOT be used anywhere outside of the + * scheduling source files. The rest of Tor should communicate with the + * scheduling system through the functions near the bottom of this file, and + * those functions will call into the current scheduler implementation as + * necessary. + * + * If your scheduler doesn't need to implement something (for example: it + * doesn't create any state for itself, thus it has nothing to free when Tor + * is shutting down), then set that function pointer to NULL. + */ +typedef struct scheduler_s { + /* (Optional) To be called when we want to prepare a scheduler for use. + * Perhaps Tor just started and we are the lucky chosen scheduler, or + * perhaps Tor is switching to this scheduler. No matter the case, this is + * where we would prepare any state and initialize parameters. You might + * think of this as the opposite of free_all(). */ + void (*init)(void); + + /* (Optional) To be called when we want to tell the scheduler to delete all + * of its state (if any). Perhaps Tor is shutting down or perhaps we are + * switching schedulers. */ + void (*free_all)(void); + + /* (Mandatory) Libevent controls the main event loop in Tor, and this is + * where we register with libevent the next execution of run_sched_ev [which + * ultimately calls run()]. */ + void (*schedule)(void); + + /* (Mandatory) This is the heart of a scheduler! This is where the + * excitement happens! 
Here libevent has given us the chance to execute, and + * we should do whatever we need to do in order to move some cells from + * their circuit queues to output buffers in an intelligent manner. We + * should do this quickly. When we are done, we'll try to schedule() ourself + * if more work needs to be done to set up the next scheduling run. */ + void (*run)(void); + + /* + * External event not related to the scheduler but that can influence it. + */ + + /* (Optional) To be called whenever Tor finds out about a new consensus. + * First the scheduling system as a whole will react to the new consensus + * and change the scheduler if needed. After that, whatever is the (possibly + * new) scheduler will call this so it has the chance to react to the new + * consensus too. If there's a consensus parameter that your scheduler wants + * to keep an eye on, this is where you should check for it. */ + void (*on_new_consensus)(const networkstatus_t *old_c, + const networkstatus_t *new_c); + + /* (Optional) To be called when a channel is being freed. Sometimes channels + * go away (for example: the relay on the other end is shutting down). If + * the scheduler keeps any channel-specific state and has memory to free + * when channels go away, implement this and free it here. */ + void (*on_channel_free)(const channel_t *); + + /* (Optional) To be called whenever Tor is reloading configuration options. + * For example: SIGHUP was issued and Tor is rereading its torrc. A + * scheduler should use this as an opportunity to parse and cache torrc + * options so that it doesn't have to call get_options() all the time. */ + void (*on_new_options)(void); +} scheduler_t; + +/***************************************************************************** + * Globally visible scheduler functions + * + * These functions are how the rest of Tor communicates with the scheduling + * system. + *****************************************************************************/
-/* Set up and shut down the scheduler from main.c */ -void scheduler_free_all(void); void scheduler_init(void); -MOCK_DECL(void, scheduler_run, (void)); - -/* Mark channels as having cells or wanting/not wanting writes */ -MOCK_DECL(void,scheduler_channel_doesnt_want_writes,(channel_t *chan)); -MOCK_DECL(void,scheduler_channel_has_waiting_cells,(channel_t *chan)); -void scheduler_channel_wants_writes(channel_t *chan); - -/* Notify the scheduler of a channel being closed */ -MOCK_DECL(void,scheduler_release_channel,(channel_t *chan)); - -/* Notify scheduler of queue size adjustments */ -void scheduler_adjust_queue_size(channel_t *chan, int dir, uint64_t adj); - -/* Notify scheduler that a channel's queue position may have changed */ -void scheduler_touch_channel(channel_t *chan); - -/* Adjust the watermarks from config file*/ -void scheduler_set_watermarks(uint32_t lo, uint32_t hi, uint32_t max_flush); - -/* XXXFORTOR Temp def of this func to get this commit to compile. Replace with - * real func */ +void scheduler_free_all(void); +void scheduler_conf_changed(void); void scheduler_notify_networkstatus_changed(const networkstatus_t *old_c, const networkstatus_t *new_c); +MOCK_DECL(void, scheduler_release_channel, (channel_t *chan));
-/* Things only scheduler.c and its test suite should see */ - +/* + * Ways for a channel to interact with the scheduling system. A channel only + * really knows (i) whether or not it has cells it wants to send, and + * (ii) whether or not it would like to write. + */ +void scheduler_channel_wants_writes(channel_t *chan); +MOCK_DECL(void, scheduler_channel_doesnt_want_writes, (channel_t *chan)); +MOCK_DECL(void, scheduler_channel_has_waiting_cells, (channel_t *chan)); + +/***************************************************************************** + * Private scheduler functions + * + * These functions are only visible to the scheduling system, the current + * scheduler implementation, and tests. + *****************************************************************************/ #ifdef SCHEDULER_PRIVATE_ -MOCK_DECL(STATIC int, scheduler_compare_channels, + +/********************************* + * Defined in scheduler.c + *********************************/ +int scheduler_should_use_kist(void); +smartlist_t *get_channels_pending(void); +struct event *get_run_sched_ev(void); +MOCK_DECL(int, scheduler_compare_channels, (const void *c1_v, const void *c2_v)); -STATIC uint64_t scheduler_get_queue_heuristic(void); -STATIC void scheduler_update_queue_heuristic(time_t now);
#ifdef TOR_UNIT_TESTS extern smartlist_t *channels_pending; extern struct event *run_sched_ev; -extern uint64_t queue_heuristic; -extern time_t queue_heuristic_timestamp; -#endif -#endif +extern scheduler_t *scheduler; +void scheduler_touch_channel(channel_t *chan); +#endif /* TOR_UNIT_TESTS */ + +/********************************* + * Defined in scheduler_kist.c + *********************************/ + +/* Socket table entry which holds information about a channel's socket and its + * kernel TCP information. Only used by KIST. */ +typedef struct socket_table_ent_s { + HT_ENTRY(socket_table_ent_s) node; + const channel_t *chan; + /* Bytes written during this scheduling run */ + uint64_t written; + /* Bytes that can be written during this scheduling run */ + uint64_t limit; + /* TCP info from the kernel */ + uint32_t cwnd; + uint32_t unacked; + uint32_t mss; + uint32_t notsent; +} socket_table_ent_t; + +typedef HT_HEAD(outbuf_table_s, outbuf_table_ent_s) outbuf_table_t; + +MOCK_DECL(int, channel_should_write_to_kernel, + (outbuf_table_t *table, channel_t *chan)); +MOCK_DECL(void, channel_write_to_kernel, (channel_t *chan)); +MOCK_DECL(void, update_socket_info_impl, (socket_table_ent_t *ent)); + +scheduler_t *get_kist_scheduler(void); +int32_t kist_scheduler_run_interval(const networkstatus_t *ns); + +#ifdef TOR_UNIT_TESTS +extern int32_t sched_run_interval; +#endif /* TOR_UNIT_TESTS */ + +/********************************* + * Defined in scheduler_vanilla.c + *********************************/ + +scheduler_t *get_vanilla_scheduler(void); + +#endif /* SCHEDULER_PRIVATE_ */
-#endif /* !defined(TOR_SCHEDULER_H) */ +#endif /* TOR_SCHEDULER_H */
diff --git a/src/or/scheduler_kist.c b/src/or/scheduler_kist.c index c14084953..97722cb25 100644 --- a/src/or/scheduler_kist.c +++ b/src/or/scheduler_kist.c @@ -1,5 +1,590 @@ /* Copyright (c) 2017, The Tor Project, Inc. */ /* See LICENSE for licensing information */
+#include <event2/event.h> +#include <netinet/tcp.h> + +#include "or.h" +#include "buffers.h" +#include "config.h" +#include "connection.h" +#include "networkstatus.h" +#define TOR_CHANNEL_INTERNAL_ +#include "channel.h" +#include "channeltls.h" +#define SCHEDULER_PRIVATE_ #include "scheduler.h"
+#define TLS_PER_CELL_OVERHEAD 29 + +/* Kernel interface needed for KIST. */ +#include <linux/sockios.h> + +/***************************************************************************** + * Data structures and supporting functions + *****************************************************************************/ + +/* Socket_table hash table stuff. The socket_table keeps track of per-socket + * limit information imposed by kist and used by kist. */ + +static uint32_t +socket_table_ent_hash(const socket_table_ent_t *ent) +{ + return (uint32_t)ent->chan->global_identifier; +} + +static unsigned +socket_table_ent_eq(const socket_table_ent_t *a, const socket_table_ent_t *b) +{ + return a->chan->global_identifier == b->chan->global_identifier; +} + +typedef HT_HEAD(socket_table_s, socket_table_ent_s) socket_table_t; + +static socket_table_t socket_table = HT_INITIALIZER(); + +HT_PROTOTYPE(socket_table_s, socket_table_ent_s, node, socket_table_ent_hash, + socket_table_ent_eq) +HT_GENERATE2(socket_table_s, socket_table_ent_s, node, socket_table_ent_hash, + socket_table_ent_eq, 0.6, tor_reallocarray, tor_free_) + +/* outbuf_table hash table stuff. The outbuf_table keeps track of which + * channels have data sitting in their outbuf so the kist scheduler can force + * a write from outbuf to kernel periodically during a run and at the end of a + * run. 
*/ + +typedef struct outbuf_table_ent_s { + HT_ENTRY(outbuf_table_ent_s) node; + channel_t *chan; +} outbuf_table_ent_t; + +static uint32_t +outbuf_table_ent_hash(const outbuf_table_ent_t *ent) +{ + return (uint32_t)ent->chan->global_identifier; +} + +static unsigned +outbuf_table_ent_eq(const outbuf_table_ent_t *a, const outbuf_table_ent_t *b) +{ + return a->chan->global_identifier == b->chan->global_identifier; +} + +static outbuf_table_t outbuf_table = HT_INITIALIZER(); + +HT_PROTOTYPE(outbuf_table_s, outbuf_table_ent_s, node, outbuf_table_ent_hash, + outbuf_table_ent_eq) +HT_GENERATE2(outbuf_table_s, outbuf_table_ent_s, node, outbuf_table_ent_hash, + outbuf_table_ent_eq, 0.6, tor_reallocarray, tor_free_) + +/***************************************************************************** + * Other internal data + *****************************************************************************/ + +static struct timeval scheduler_last_run = {0, 0}; +static double sock_buf_size_factor = 1.0; +STATIC int32_t sched_run_interval = 10; +static scheduler_t *kist_scheduler = NULL; + +/***************************************************************************** + * Internally called function implementations + *****************************************************************************/ + +/* Little helper function to get the length of a channel's output buffer */ +static inline size_t +channel_outbuf_length(channel_t *chan) +{ + return buf_datalen(TO_CONN(BASE_CHAN_TO_TLS(chan)->conn)->outbuf); +} + +/* Little helper function for HT_FOREACH_FN. */ +static int +each_channel_write_to_kernel(outbuf_table_ent_t *ent, void *data) +{ + (void) data; /* Make compiler happy. */ + channel_write_to_kernel(ent->chan); + return 0; /* Returning non-zero removes the element from the table. */ +} + +/* Free the given outbuf table entry ent. */ +static int +free_outbuf_info_by_ent(outbuf_table_ent_t *ent, void *data) +{ + (void) data; /* Make compiler happy. 
*/ + log_debug(LD_SCHED, "Freeing outbuf table entry from chan=%" PRIu64, + ent->chan->global_identifier); + tor_free(ent); + return 1; /* So HT_FOREACH_FN will remove the element */ +} + +/* Clean up outbuf_table. Probably because the KIST sched impl is going away */ +static void +free_all_outbuf_info(void) +{ + HT_FOREACH_FN(outbuf_table_s, &outbuf_table, free_outbuf_info_by_ent, NULL); +} + +/* Free the given socket table entry ent. */ +static int +free_socket_info_by_ent(socket_table_ent_t *ent, void *data) +{ + (void) data; /* Make compiler happy. */ + log_debug(LD_SCHED, "Freeing socket table entry from chan=%" PRIu64, + ent->chan->global_identifier); + tor_free(ent); + return 1; /* So HT_FOREACH_FN will remove the element */ +} + +/* Clean up socket_table. Probably because the KIST sched impl is going away */ +static void +free_all_socket_info(void) +{ + HT_FOREACH_FN(socket_table_s, &socket_table, free_socket_info_by_ent, NULL); +} + +static socket_table_ent_t * +socket_table_search(socket_table_t *table, const channel_t *chan) +{ + socket_table_ent_t search, *ent = NULL; + search.chan = chan; + ent = HT_FIND(socket_table_s, table, &search); + return ent; +} + +/* Free a socket entry in table for the given chan. 
*/ +static void +free_socket_info_by_chan(socket_table_t *table, const channel_t *chan) +{ + socket_table_ent_t *ent = NULL; + ent = socket_table_search(table, chan); + if (!ent) + return; + log_debug(LD_SCHED, "scheduler free socket info for chan=%" PRIu64, + chan->global_identifier); + HT_REMOVE(socket_table_s, table, ent); + free_socket_info_by_ent(ent, NULL); +} + +MOCK_IMPL(void, +update_socket_info_impl, (socket_table_ent_t *ent)) +{ + int64_t tcp_space, extra_space; + const tor_socket_t sock = + TO_CONN(BASE_CHAN_TO_TLS((channel_t *) ent->chan)->conn)->s; + struct tcp_info tcp; + socklen_t tcp_info_len = sizeof(tcp); + + /* Gather information. XXX getsockopt()/ioctl() results are unchecked. */ + getsockopt(sock, SOL_TCP, TCP_INFO, (void *)&(tcp), &tcp_info_len); + ioctl(sock, SIOCOUTQNSD, &(ent->notsent)); + ent->cwnd = tcp.tcpi_snd_cwnd; + ent->unacked = tcp.tcpi_unacked; + ent->mss = tcp.tcpi_snd_mss; + + tcp_space = ((int64_t)ent->cwnd - ent->unacked) * (int64_t)ent->mss; + if (tcp_space < 0) { /* unacked may exceed cwnd; needs signed math above */ + tcp_space = 0; + } + extra_space = + clamp_double_to_int64((double)ent->cwnd * ent->mss * sock_buf_size_factor) - + ent->notsent; + if (extra_space < 0) { + extra_space = 0; + } + ent->limit = tcp_space + extra_space; + return; +} + +/* Given a socket that isn't in the table, add it. + * Given a socket that is in the table, reinit values that need init-ing + * every scheduling run + */ +static void +init_socket_info(socket_table_t *table, const channel_t *chan) +{ + socket_table_ent_t *ent = NULL; + ent = socket_table_search(table, chan); + if (!ent) { + log_debug(LD_SCHED, "scheduler init socket info for chan=%" PRIu64, + chan->global_identifier); + ent = tor_malloc_zero(sizeof(*ent)); + ent->chan = chan; + HT_INSERT(socket_table_s, table, ent); + } + ent->written = 0; +} + +/* Add chan to the outbuf table if it isn't already in it.
If it is, then don't + * do anything */ +static void +outbuf_table_add(outbuf_table_t *table, channel_t *chan) +{ + outbuf_table_ent_t search, *ent; + search.chan = chan; + ent = HT_FIND(outbuf_table_s, table, &search); + if (!ent) { + log_debug(LD_SCHED, "scheduler init outbuf info for chan=%" PRIu64, + chan->global_identifier); + ent = tor_malloc_zero(sizeof(*ent)); + ent->chan = chan; + HT_INSERT(outbuf_table_s, table, ent); + } +} + +static void +outbuf_table_remove(outbuf_table_t *table, channel_t *chan) +{ + outbuf_table_ent_t search, *ent; + search.chan = chan; + ent = HT_FIND(outbuf_table_s, table, &search); + if (ent) { + HT_REMOVE(outbuf_table_s, table, ent); + free_outbuf_info_by_ent(ent, NULL); + } +} + +/* Set the scheduler running interval. */ +static void +set_scheduler_run_interval(const networkstatus_t *ns) +{ + int32_t old_sched_run_interval = sched_run_interval; + sched_run_interval = kist_scheduler_run_interval(ns); + if (old_sched_run_interval != sched_run_interval) { + log_info(LD_SCHED, "Scheduler KIST changing its running interval " + "from %" PRId32 " to %" PRId32, + old_sched_run_interval, sched_run_interval); + } +} + +/* Return true iff the channel's socket still has room under its KIST limit + * for at least one more cell (including the per-cell TLS overhead). */ +static int +socket_can_write(socket_table_t *table, const channel_t *chan) +{ + socket_table_ent_t *ent = NULL; + ent = socket_table_search(table, chan); + tor_assert(ent); + + int64_t kist_limit_space = + (int64_t) (ent->limit - ent->written) / + (CELL_MAX_NETWORK_SIZE + TLS_PER_CELL_OVERHEAD); + return kist_limit_space > 0; +} + +/* Update the channel's socket kernel information. */ +static void +update_socket_info(socket_table_t *table, const channel_t *chan) +{ + socket_table_ent_t *ent = NULL; + ent = socket_table_search(table, chan); + tor_assert(ent); + update_socket_info_impl(ent); +} + +/* Increment the channel's socket written value by the number of bytes.
*/ +static void +update_socket_written(socket_table_t *table, channel_t *chan, size_t bytes) +{ + socket_table_ent_t *ent = NULL; + ent = socket_table_search(table, chan); + tor_assert(ent); + + log_debug(LD_SCHED, "chan=%" PRIu64 " wrote %zu bytes, old was %" PRIu64, + chan->global_identifier, bytes, ent->written); + + ent->written += bytes; +} + +/* + * A naive KIST impl would write every single cell all the way to the kernel. + * That would take a lot of system calls. A less bad KIST impl would write a + * channel's outbuf to the kernel only when we are switching to a different + * channel. But if we have two channels with equal priority, we end up writing + * one cell for each and bouncing back and forth. This KIST impl avoids that + * by only writing a channel's outbuf to the kernel if it holds more than 8 + * cells' worth of data. + */ +MOCK_IMPL(int, channel_should_write_to_kernel, + (outbuf_table_t *table, channel_t *chan)) +{ + outbuf_table_add(table, chan); + /* CELL_MAX_NETWORK_SIZE * 8 because we only want to write the outbuf to the + * kernel if there's more than 8 cells' worth of data waiting */ + return channel_outbuf_length(chan) > (CELL_MAX_NETWORK_SIZE * 8); +} + +/* Little helper function to write a channel's outbuf all the way to the + * kernel */ +MOCK_IMPL(void, channel_write_to_kernel, (channel_t *chan)) +{ + log_debug(LD_SCHED, "Writing %zu bytes to kernel for chan %" PRIu64, + channel_outbuf_length(chan), chan->global_identifier); + connection_handle_write(TO_CONN(BASE_CHAN_TO_TLS(chan)->conn), 0); +} + +/* Return true iff the scheduler has work to perform.
*/ +static int +have_work(void) +{ + smartlist_t *cp = get_channels_pending(); + tor_assert(cp); + return smartlist_len(cp) > 0; +} + +/* Function of the scheduler interface: free_all() */ +static void +kist_free_all(void) +{ + free_all_outbuf_info(); + free_all_socket_info(); +} + +/* Function of the scheduler interface: on_channel_free() */ +static void +kist_on_channel_free(const channel_t *chan) +{ + free_socket_info_by_chan(&socket_table, chan); +} + +/* Function of the scheduler interface: on_new_consensus() */ +static void +kist_scheduler_on_new_consensus(const networkstatus_t *old_c, + const networkstatus_t *new_c) +{ + (void) old_c; + (void) new_c; + + set_scheduler_run_interval(new_c); +} + +/* Function of the scheduler interface: on_new_options() */ +static void +kist_scheduler_on_new_options(void) +{ + sock_buf_size_factor = get_options()->KISTSockBufSizeFactor; + + /* Calls kist_scheduler_run_interval which calls get_options(). */ + set_scheduler_run_interval(NULL); +} + +/* Function of the scheduler interface: init() */ +static void +kist_scheduler_init(void) +{ + kist_scheduler_on_new_options(); + tor_assert(sched_run_interval > 0); +} + +/* Function of the scheduler interface: schedule() */ +static void +kist_scheduler_schedule(void) +{ + struct timeval now, next_run; + int32_t diff; + struct event *ev = get_run_sched_ev(); + tor_assert(ev); + if (!have_work()) { + return; + } + tor_gettimeofday(&now); + diff = (int32_t) tv_mdiff(&scheduler_last_run, &now); + if (diff < sched_run_interval) { + next_run.tv_sec = 0; + /* 1000 for ms -> us */ + next_run.tv_usec = (sched_run_interval - diff) * 1000; + /* Readding an event reschedules it. It does not duplicate it.
*/ + event_add(ev, &next_run); + } else { + event_active(ev, EV_TIMEOUT, 1); + } +} + +/* Function of the scheduler interface: run() */ +static void +kist_scheduler_run(void) +{ + /* Define variables */ + channel_t *chan = NULL; // current working channel + /* The last distinct chan served in a sched loop. */ + channel_t *prev_chan = NULL; + int flush_result; // temporarily store results from flush calls + /* Channels to be readding to pending at the end */ + smartlist_t *to_readd = NULL; + smartlist_t *cp = get_channels_pending(); + + /* For each pending channel, collect new kernel information */ + SMARTLIST_FOREACH_BEGIN(cp, const channel_t *, pchan) { + init_socket_info(&socket_table, pchan); + update_socket_info(&socket_table, pchan); + } SMARTLIST_FOREACH_END(pchan); + + log_debug(LD_SCHED, "Running the scheduler. %d channels pending", + smartlist_len(cp)); + + /* The main scheduling loop. Loop until there are no more pending channels */ + while (smartlist_len(cp) > 0) { + /* get best channel */ + chan = smartlist_pqueue_pop(cp, scheduler_compare_channels, + offsetof(channel_t, sched_heap_idx)); + tor_assert(chan); + outbuf_table_add(&outbuf_table, chan); + + /* if we have switched to a new channel, consider writing the previous + * channel's outbuf to the kernel. */ + if (!prev_chan) prev_chan = chan; + if (prev_chan != chan) { + if (channel_should_write_to_kernel(&outbuf_table, prev_chan)) { + channel_write_to_kernel(prev_chan); + outbuf_table_remove(&outbuf_table, prev_chan); + } + prev_chan = chan; + } + + /* Only flush and write if the per-socket limit hasn't been hit */ + if (socket_can_write(&socket_table, chan)) { + /* flush to channel queue/outbuf */ + flush_result = (int)channel_flush_some_cells(chan, 1); // 1 for num cells + /* flush_result has the # cells flushed */ + if (flush_result > 0) { + update_socket_written(&socket_table, chan, flush_result * + (CELL_MAX_NETWORK_SIZE + TLS_PER_CELL_OVERHEAD)); + } + /* XXX What if we didn't flush? 
*/ + } + + /* Decide what to do with the channel now */ + + if (!channel_more_to_flush(chan) && + !socket_can_write(&socket_table, chan)) { + + /* Case 1: no more cells to send, and cannot write */ + + /* + * You might think we should put the channel in SCHED_CHAN_IDLE. And + * you're probably correct. While implementing KIST, we found that the + * scheduling system would sometimes lose track of channels when we did + * that. We suspect it has to do with the difference between "can't + * write because socket/outbuf is full" and KIST's "can't write because + * we've arbitrarily decided that that's enough for now." Sometimes + * channels run out of cells at the same time they hit their + * kist-imposed write limit and maybe the rest of Tor doesn't put the + * channel back in pending when it is supposed to. + * + * This should be investigated again. It is as simple as changing + * SCHED_CHAN_WAITING_FOR_CELLS to SCHED_CHAN_IDLE and seeing if Tor + * starts having serious throughput issues. Best done in shadow/chutney. 
+ */ + chan->scheduler_state = SCHED_CHAN_WAITING_FOR_CELLS; + log_debug(LD_SCHED, "chan=%" PRIu64 " now waiting_for_cells", + chan->global_identifier); + } else if (!channel_more_to_flush(chan)) { + + /* Case 2: no more cells to send, but still open for writes */ + + chan->scheduler_state = SCHED_CHAN_WAITING_FOR_CELLS; + log_debug(LD_SCHED, "chan=%" PRIu64 " now waiting_for_cells", + chan->global_identifier); + } else if (!socket_can_write(&socket_table, chan)) { + + /* Case 3: cells to send, but cannot write */ + + chan->scheduler_state = SCHED_CHAN_WAITING_TO_WRITE; + if (!to_readd) + to_readd = smartlist_new(); + smartlist_add(to_readd, chan); + log_debug(LD_SCHED, "chan=%" PRIu64 " now waiting_to_write", + chan->global_identifier); + } else { + + /* Case 4: cells to send, and still open for writes */ + + chan->scheduler_state = SCHED_CHAN_PENDING; + smartlist_pqueue_add(cp, scheduler_compare_channels, + offsetof(channel_t, sched_heap_idx), chan); + } + } /* End of main scheduling loop */ + + /* Write the outbuf of any channels that still have data */ + HT_FOREACH_FN(outbuf_table_s, &outbuf_table, each_channel_write_to_kernel, + NULL); + free_all_outbuf_info(); + HT_CLEAR(outbuf_table_s, &outbuf_table); + + log_debug(LD_SCHED, "len pending=%d, len to_readd=%d", + smartlist_len(cp), + (to_readd ? 
smartlist_len(to_readd) : -1)); + + /* Readd any channels we need to */ + if (to_readd) { + SMARTLIST_FOREACH_BEGIN(to_readd, channel_t *, readd_chan) { + readd_chan->scheduler_state = SCHED_CHAN_PENDING; + if (!smartlist_contains(cp, readd_chan)) { + smartlist_pqueue_add(cp, scheduler_compare_channels, + offsetof(channel_t, sched_heap_idx), readd_chan); + } + } SMARTLIST_FOREACH_END(readd_chan); + smartlist_free(to_readd); + } + + tor_gettimeofday(&scheduler_last_run); +} + +/***************************************************************************** + * Externally called function implementations not called through scheduler_t + *****************************************************************************/ + +/* Return the KIST scheduler object. If it doesn't exist yet, allocate one + * and return it; init() is not called here. */ +scheduler_t * +get_kist_scheduler(void) +{ + if (!kist_scheduler) { + log_debug(LD_SCHED, "Allocating kist scheduler struct"); + kist_scheduler = tor_malloc_zero(sizeof(*kist_scheduler)); + kist_scheduler->free_all = kist_free_all; + kist_scheduler->on_channel_free = kist_on_channel_free; + kist_scheduler->init = kist_scheduler_init; + kist_scheduler->on_new_consensus = kist_scheduler_on_new_consensus; + kist_scheduler->schedule = kist_scheduler_schedule; + kist_scheduler->run = kist_scheduler_run; + kist_scheduler->on_new_options = kist_scheduler_on_new_options; + } + return kist_scheduler; +} + +/* Default interval that KIST runs (in ms). */ +#define KIST_SCHED_RUN_INTERVAL_DEFAULT 10 +/* Minimum interval that KIST runs (in ms). This value disables KIST. */ +#define KIST_SCHED_RUN_INTERVAL_MIN 0 +/* Maximum interval that KIST runs (in ms). */ +#define KIST_SCHED_RUN_INTERVAL_MAX 100 + +/* Check the torrc for the configured KIST scheduler run frequency.
+ * - If torrc < 0, then return the negative torrc value (shouldn't even be + * using KIST) + * - If torrc > 0, then return the positive torrc value (should use KIST, and + * should use the set value) + * - If torrc == 0, then look in the consensus for what the value should be. + * - If == 0, then return -1 (don't use KIST) + * - If > 0, then return the positive consensus value + * - If consensus doesn't say anything, return 10 milliseconds + */ +int32_t +kist_scheduler_run_interval(const networkstatus_t *ns) +{ + int32_t run_interval = (int32_t)get_options()->KISTSchedRunInterval; + if (run_interval != 0) { + log_debug(LD_SCHED, "Found KISTSchedRunInterval in torrc. Using that."); + return run_interval; + } + + log_debug(LD_SCHED, "Turning to the consensus for KISTSchedRunInterval"); + run_interval = networkstatus_get_param(ns, "KISTSchedRunInterval", + KIST_SCHED_RUN_INTERVAL_DEFAULT, + KIST_SCHED_RUN_INTERVAL_MIN, + KIST_SCHED_RUN_INTERVAL_MAX); + if (run_interval <= 0) + return -1; + return run_interval; +} + diff --git a/src/or/scheduler_vanilla.c b/src/or/scheduler_vanilla.c index c14084953..a5e104e97 100644 --- a/src/or/scheduler_vanilla.c +++ b/src/or/scheduler_vanilla.c @@ -1,5 +1,191 @@ /* Copyright (c) 2017, The Tor Project, Inc. */ /* See LICENSE for licensing information */
+#include <event2/event.h> + +#include "or.h" +#include "config.h" +#define TOR_CHANNEL_INTERNAL_ +#include "channel.h" +#define SCHEDULER_PRIVATE_ #include "scheduler.h"
+/***************************************************************************** + * Other internal data + *****************************************************************************/ + +/* Maximum cells to flush in a single call to channel_flush_some_cells(); */ +#define MAX_FLUSH_CELLS 1000 + +static scheduler_t *vanilla_scheduler = NULL; + +/***************************************************************************** + * Externally called function implementations + *****************************************************************************/ + +/* Return true iff the scheduler has work to perform. */ +static int +have_work(void) +{ + smartlist_t *cp = get_channels_pending(); + tor_assert(cp); + return smartlist_len(cp) > 0; +} + +/** Retrigger the scheduler in a way safe to use from the callback */ + +static void +vanilla_scheduler_schedule(void) +{ + if (!have_work()) { + return; + } + struct event *ev = get_run_sched_ev(); + tor_assert(ev); + event_active(ev, EV_TIMEOUT, 1); +} + +static void +vanilla_scheduler_run(void) +{ + int n_cells, n_chans_before, n_chans_after; + ssize_t flushed, flushed_this_time; + smartlist_t *cp = get_channels_pending(); + smartlist_t *to_readd = NULL; + channel_t *chan = NULL; + + log_debug(LD_SCHED, "We have a chance to run the scheduler"); + + n_chans_before = smartlist_len(cp); + + while (smartlist_len(cp) > 0) { + /* Pop off a channel */ + chan = smartlist_pqueue_pop(cp, + scheduler_compare_channels, + offsetof(channel_t, sched_heap_idx)); + tor_assert(chan); + + /* Figure out how many cells we can write */ + n_cells = channel_num_cells_writeable(chan); + if (n_cells > 0) { + log_debug(LD_SCHED, + "Scheduler saw pending channel " U64_FORMAT " at %p with " + "%d cells writeable", + U64_PRINTF_ARG(chan->global_identifier), chan, n_cells); + + flushed = 0; + while (flushed < n_cells) { + flushed_this_time = + channel_flush_some_cells(chan, + MIN(MAX_FLUSH_CELLS, (size_t) n_cells - flushed)); + if (flushed_this_time <= 0) 
break; + flushed += flushed_this_time; + } + + if (flushed < n_cells) { + /* We ran out of cells to flush */ + chan->scheduler_state = SCHED_CHAN_WAITING_FOR_CELLS; + log_debug(LD_SCHED, + "Channel " U64_FORMAT " at %p " + "entered waiting_for_cells from pending", + U64_PRINTF_ARG(chan->global_identifier), + chan); + } else { + /* The channel may still have some cells */ + if (channel_more_to_flush(chan)) { + /* The channel goes to either pending or waiting_to_write */ + if (channel_num_cells_writeable(chan) > 0) { + /* Add it back to pending later */ + if (!to_readd) to_readd = smartlist_new(); + smartlist_add(to_readd, chan); + log_debug(LD_SCHED, + "Channel " U64_FORMAT " at %p " + "is still pending", + U64_PRINTF_ARG(chan->global_identifier), + chan); + } else { + /* It's waiting to be able to write more */ + chan->scheduler_state = SCHED_CHAN_WAITING_TO_WRITE; + log_debug(LD_SCHED, + "Channel " U64_FORMAT " at %p " + "entered waiting_to_write from pending", + U64_PRINTF_ARG(chan->global_identifier), + chan); + } + } else { + /* No cells left; it can go to idle or waiting_for_cells */ + if (channel_num_cells_writeable(chan) > 0) { + /* + * It can still accept writes, so it goes to + * waiting_for_cells + */ + chan->scheduler_state = SCHED_CHAN_WAITING_FOR_CELLS; + log_debug(LD_SCHED, + "Channel " U64_FORMAT " at %p " + "entered waiting_for_cells from pending", + U64_PRINTF_ARG(chan->global_identifier), + chan); + } else { + /* + * We exactly filled up the output queue with all available + * cells; go to idle. 
+ */ + chan->scheduler_state = SCHED_CHAN_IDLE; + log_debug(LD_SCHED, + "Channel " U64_FORMAT " at %p " + "become idle from pending", + U64_PRINTF_ARG(chan->global_identifier), + chan); + } + } + } + + log_debug(LD_SCHED, + "Scheduler flushed %d cells onto pending channel " + U64_FORMAT " at %p", + (int)flushed, U64_PRINTF_ARG(chan->global_identifier), + chan); + } else { + log_info(LD_SCHED, + "Scheduler saw pending channel " U64_FORMAT " at %p with " + "no cells writeable", + U64_PRINTF_ARG(chan->global_identifier), chan); + /* Put it back to WAITING_TO_WRITE */ + chan->scheduler_state = SCHED_CHAN_WAITING_TO_WRITE; + } + } + + /* Readd any channels we need to */ + if (to_readd) { + SMARTLIST_FOREACH_BEGIN(to_readd, channel_t *, readd_chan) { + readd_chan->scheduler_state = SCHED_CHAN_PENDING; + smartlist_pqueue_add(cp, + scheduler_compare_channels, + offsetof(channel_t, sched_heap_idx), + readd_chan); + } SMARTLIST_FOREACH_END(readd_chan); + smartlist_free(to_readd); + } + + n_chans_after = smartlist_len(cp); + log_debug(LD_SCHED, "Scheduler handled %d of %d pending channels", + n_chans_before - n_chans_after, n_chans_before); +} + +scheduler_t * +get_vanilla_scheduler(void) +{ + if (!vanilla_scheduler) { + log_debug(LD_SCHED, "Initializing vanilla scheduler struct"); + vanilla_scheduler = tor_malloc_zero(sizeof(*vanilla_scheduler)); + vanilla_scheduler->free_all = NULL; + vanilla_scheduler->on_channel_free = NULL; + vanilla_scheduler->init = NULL; + vanilla_scheduler->on_new_consensus = NULL; + vanilla_scheduler->schedule = vanilla_scheduler_schedule; + vanilla_scheduler->run = vanilla_scheduler_run; + vanilla_scheduler->on_new_options = NULL; + } + return vanilla_scheduler; +} + diff --git a/src/test/test_scheduler.c b/src/test/test_scheduler.c index b0490ccfc..3ab8ed2bf 100644 --- a/src/test/test_scheduler.c +++ b/src/test/test_scheduler.c @@ -9,8 +9,12 @@ #define TOR_CHANNEL_INTERNAL_ #define CHANNEL_PRIVATE_ #include "or.h" +#include "config.h" #include 
"compat_libevent.h" #include "channel.h" +#include "channeltls.h" +#include "connection.h" +#include "networkstatus.h" #define SCHEDULER_PRIVATE_ #include "scheduler.h"
@@ -18,53 +22,65 @@ #include "test.h" #include "fakechans.h"
-/* Event base for scheduelr tests */ -static struct event_base *mock_event_base = NULL; - -/* Statics controlling mocks */ -static circuitmux_t *mock_ccm_tgt_1 = NULL; -static circuitmux_t *mock_ccm_tgt_2 = NULL; +/* Shamelessly stolen from compat_libevent.c */ +#define V(major, minor, patch) \ + (((major) << 24) | ((minor) << 16) | ((patch) << 8))
-static circuitmux_t *mock_cgp_tgt_1 = NULL; -static circuitmux_policy_t *mock_cgp_val_1 = NULL; -static circuitmux_t *mock_cgp_tgt_2 = NULL; -static circuitmux_policy_t *mock_cgp_val_2 = NULL; +/****************************************************************************** + * Statistical info + *****************************************************************************/ static int scheduler_compare_channels_mock_ctr = 0; static int scheduler_run_mock_ctr = 0;
-static void channel_flush_some_cells_mock_free_all(void); -static void channel_flush_some_cells_mock_set(channel_t *chan, - ssize_t num_cells); +/****************************************************************************** + * Utility functions and things we need to mock + *****************************************************************************/ +static or_options_t mocked_options; +static const or_options_t * +mock_get_options(void) +{ + return &mocked_options; +}
-/* Setup for mock event stuff */ -static void mock_event_free_all(void); -static void mock_event_init(void); +static void +clear_options(void) +{ + memset(&mocked_options, 0, sizeof(mocked_options)); +}
-/* Mocks used by scheduler tests */ -static ssize_t channel_flush_some_cells_mock(channel_t *chan, - ssize_t num_cells); -static int circuitmux_compare_muxes_mock(circuitmux_t *cmux_1, - circuitmux_t *cmux_2); -static const circuitmux_policy_t * circuitmux_get_policy_mock( - circuitmux_t *cmux); -static int scheduler_compare_channels_mock(const void *c1_v, - const void *c2_v); -static void scheduler_run_noop_mock(void); -static struct event_base * tor_libevent_get_base_mock(void); - -/* Scheduler test cases */ -static void test_scheduler_channel_states(void *arg); -static void test_scheduler_compare_channels(void *arg); -static void test_scheduler_initfree(void *arg); -static void test_scheduler_loop(void *arg); -static void test_scheduler_queue_heuristic(void *arg); - -/* Mock event init/free */ +static int32_t +mock_vanilla_networkstatus_get_param( + const networkstatus_t *ns, const char *param_name, int32_t default_val, + int32_t min_val, int32_t max_val) +{ + (void)ns; + (void)default_val; + (void)min_val; + (void)max_val; + // only support KISTSchedRunInterval right now + tor_assert(strcmp(param_name, "KISTSchedRunInterval")==0); + return -1; +}
-/* Shamelessly stolen from compat_libevent.c */ -#define V(major, minor, patch) \ - (((major) << 24) | ((minor) << 16) | ((patch) << 8)) +static int32_t +mock_kist_networkstatus_get_param( + const networkstatus_t *ns, const char *param_name, int32_t default_val, + int32_t min_val, int32_t max_val) +{ + (void)ns; + (void)default_val; + (void)min_val; + (void)max_val; + // only support KISTSchedRunInterval right now + tor_assert(strcmp(param_name, "KISTSchedRunInterval")==0); + return 12; +}
+/* Event base for scheduler tests */ +static struct event_base *mock_event_base = NULL; +/* Setup for mock event stuff */ +static void mock_event_free_all(void); +static void mock_event_init(void); static void mock_event_free_all(void) { @@ -110,7 +126,84 @@ mock_event_init(void) return; }
-/* Mocks */ +static struct event_base * +tor_libevent_get_base_mock(void) +{ + return mock_event_base; +} + +static int +scheduler_compare_channels_mock(const void *c1_v, + const void *c2_v) +{ + uintptr_t p1, p2; + + p1 = (uintptr_t)(c1_v); + p2 = (uintptr_t)(c2_v); + + ++scheduler_compare_channels_mock_ctr; + + if (p1 == p2) return 0; + else if (p1 < p2) return 1; + else return -1; +} + +static void +scheduler_run_noop_mock(void) +{ + ++scheduler_run_mock_ctr; +} + +static circuitmux_t *mock_ccm_tgt_1 = NULL; +static circuitmux_t *mock_ccm_tgt_2 = NULL; +static circuitmux_t *mock_cgp_tgt_1 = NULL; +static circuitmux_policy_t *mock_cgp_val_1 = NULL; +static circuitmux_t *mock_cgp_tgt_2 = NULL; +static circuitmux_policy_t *mock_cgp_val_2 = NULL; + +static const circuitmux_policy_t * +circuitmux_get_policy_mock(circuitmux_t *cmux) +{ + const circuitmux_policy_t *result = NULL; + + tt_assert(cmux != NULL); + if (cmux) { + if (cmux == mock_cgp_tgt_1) result = mock_cgp_val_1; + else if (cmux == mock_cgp_tgt_2) result = mock_cgp_val_2; + else result = circuitmux_get_policy__real(cmux); + } + + done: + return result; +} + +static int +circuitmux_compare_muxes_mock(circuitmux_t *cmux_1, + circuitmux_t *cmux_2) +{ + int result = 0; + + tt_assert(cmux_1 != NULL); + tt_assert(cmux_2 != NULL); + + if (cmux_1 != cmux_2) { + if (cmux_1 == mock_ccm_tgt_1 && cmux_2 == mock_ccm_tgt_2) result = -1; + else if (cmux_1 == mock_ccm_tgt_2 && cmux_2 == mock_ccm_tgt_1) { + result = 1; + } else { + if (cmux_1 == mock_ccm_tgt_1 || cmux_1 == mock_ccm_tgt_2) result = -1; + else if (cmux_2 == mock_ccm_tgt_1 || cmux_2 == mock_ccm_tgt_2) { + result = 1; + } else { + result = circuitmux_compare_muxes__real(cmux_1, cmux_2); + } + } + } + /* else result = 0 always */ + + done: + return result; +}
typedef struct { const channel_t *chan; @@ -174,6 +267,67 @@ channel_flush_some_cells_mock_set(channel_t *chan, ssize_t num_cells) } }
+static int +channel_more_to_flush_mock(channel_t *chan) +{ + tor_assert(chan); + + flush_mock_channel_t *found_mock_ch = NULL; + + /* Check if we have any queued */ + if (! TOR_SIMPLEQ_EMPTY(&chan->incoming_queue)) + return 1; + + SMARTLIST_FOREACH_BEGIN(chans_for_flush_mock, + flush_mock_channel_t *, + flush_mock_ch) { + if (flush_mock_ch != NULL && flush_mock_ch->chan != NULL) { + if (flush_mock_ch->chan == chan) { + /* Found it */ + found_mock_ch = flush_mock_ch; + break; + } + } else { + /* That shouldn't be there... */ + SMARTLIST_DEL_CURRENT(chans_for_flush_mock, flush_mock_ch); + tor_free(flush_mock_ch); + } + } SMARTLIST_FOREACH_END(flush_mock_ch); + + tor_assert(found_mock_ch); + + /* Check if any circuits would like to queue some */ + /* special for the mock: return the number of cells (instead of 1), or zero + * if nothing to flush */ + return (found_mock_ch->cells > 0 ? (int)found_mock_ch->cells : 0 ); +} + +static void +channel_write_to_kernel_mock(channel_t *chan) +{ + (void)chan; + //log_debug(LD_SCHED, "chan=%d writing to kernel", + // (int)chan->global_identifier); +} + +static int +channel_should_write_to_kernel_mock(outbuf_table_t *ot, channel_t *chan) +{ + (void)ot; + (void)chan; + return 1; + /* We could make this more complicated if we wanted. But I don't think doing + * so tests much of anything */ + //static int called_counter = 0; + //if (++called_counter >= 3) { + // called_counter -= 3; + // log_debug(LD_SCHED, "chan=%d should write to kernel", + // (int)chan->global_identifier); + // return 1; + //} + //return 0; +} + static ssize_t channel_flush_some_cells_mock(channel_t *chan, ssize_t num_cells) { @@ -215,11 +369,6 @@ channel_flush_some_cells_mock(channel_t *chan, ssize_t num_cells)
flushed += max; found->cells -= max; - - if (found->cells <= 0) { - smartlist_remove(chans_for_flush_mock, found); - tor_free(found); - } } } } @@ -228,90 +377,25 @@ channel_flush_some_cells_mock(channel_t *chan, ssize_t num_cells) return flushed; }
-static int -circuitmux_compare_muxes_mock(circuitmux_t *cmux_1, - circuitmux_t *cmux_2) -{ - int result = 0; - - tt_ptr_op(cmux_1, OP_NE, NULL); - tt_ptr_op(cmux_2, OP_NE, NULL); - - if (cmux_1 != cmux_2) { - if (cmux_1 == mock_ccm_tgt_1 && cmux_2 == mock_ccm_tgt_2) result = -1; - else if (cmux_1 == mock_ccm_tgt_2 && cmux_2 == mock_ccm_tgt_1) { - result = 1; - } else { - if (cmux_1 == mock_ccm_tgt_1 || cmux_1 == mock_ccm_tgt_2) result = -1; - else if (cmux_2 == mock_ccm_tgt_1 || cmux_2 == mock_ccm_tgt_2) { - result = 1; - } else { - result = circuitmux_compare_muxes__real(cmux_1, cmux_2); - } - } - } - /* else result = 0 always */ - - done: - return result; -} - -static const circuitmux_policy_t * -circuitmux_get_policy_mock(circuitmux_t *cmux) -{ - const circuitmux_policy_t *result = NULL; - - tt_ptr_op(cmux, OP_NE, NULL); - if (cmux) { - if (cmux == mock_cgp_tgt_1) result = mock_cgp_val_1; - else if (cmux == mock_cgp_tgt_2) result = mock_cgp_val_2; - else result = circuitmux_get_policy__real(cmux); - } - - done: - return result; -} - -static int -scheduler_compare_channels_mock(const void *c1_v, - const void *c2_v) -{ - uintptr_t p1, p2; - - p1 = (uintptr_t)(c1_v); - p2 = (uintptr_t)(c2_v); - - ++scheduler_compare_channels_mock_ctr; - - if (p1 == p2) return 0; - else if (p1 < p2) return 1; - else return -1; -} - static void -scheduler_run_noop_mock(void) -{ - ++scheduler_run_mock_ctr; -} - -static struct event_base * -tor_libevent_get_base_mock(void) +update_socket_info_impl_mock(socket_table_ent_t *ent) { - return mock_event_base; + ent->cwnd = ent->unacked = ent->mss = ent->notsent = 0; + ent->limit = INT_MAX; }
-/* Test cases */ - static void -test_scheduler_channel_states(void *arg) +perform_channel_state_tests(int KISTSchedRunInterval) { channel_t *ch1 = NULL, *ch2 = NULL; int old_count;
- (void)arg; + /* setup options so we're sure about what sched we are running */ + MOCK(get_options, mock_get_options); + clear_options(); + mocked_options.KISTSchedRunInterval = KISTSchedRunInterval;
/* Set up libevent and scheduler */ - mock_event_init(); MOCK(tor_libevent_get_base, tor_libevent_get_base_mock); scheduler_init(); @@ -324,7 +408,7 @@ test_scheduler_channel_states(void *arg) * Disable scheduler_run so we can just check the state transitions * without having to make everything it might call work too. */ - MOCK(scheduler_run, scheduler_run_noop_mock); + scheduler->run = scheduler_run_noop_mock;
tt_int_op(smartlist_len(channels_pending), OP_EQ, 0);
@@ -351,7 +435,7 @@ test_scheduler_channel_states(void *arg) channel_register(ch2); tt_assert(ch2->registered);
- /* Send it to SCHED_CHAN_WAITING_TO_WRITE */ + /* Send ch1 to SCHED_CHAN_WAITING_TO_WRITE */ scheduler_channel_has_waiting_cells(ch1); tt_int_op(ch1->scheduler_state, OP_EQ, SCHED_CHAN_WAITING_TO_WRITE);
@@ -415,8 +499,8 @@ test_scheduler_channel_states(void *arg) tor_free(ch2);
UNMOCK(scheduler_compare_channels); - UNMOCK(scheduler_run); UNMOCK(tor_libevent_get_base); + UNMOCK(get_options);
return; } @@ -502,40 +586,21 @@ test_scheduler_compare_channels(void *arg) return; }
-static void -test_scheduler_initfree(void *arg) -{ - (void)arg; - - tt_ptr_op(channels_pending, OP_EQ, NULL); - tt_ptr_op(run_sched_ev, OP_EQ, NULL); - - mock_event_init(); - MOCK(tor_libevent_get_base, tor_libevent_get_base_mock); - - scheduler_init(); - - tt_ptr_op(channels_pending, OP_NE, NULL); - tt_ptr_op(run_sched_ev, OP_NE, NULL); - - scheduler_free_all(); - - UNMOCK(tor_libevent_get_base); - mock_event_free_all(); - - tt_ptr_op(channels_pending, OP_EQ, NULL); - tt_ptr_op(run_sched_ev, OP_EQ, NULL); - - done: - return; -} +/****************************************************************************** + * The actual tests! + *****************************************************************************/
static void -test_scheduler_loop(void *arg) +test_scheduler_loop_vanilla(void *arg) { + (void)arg; channel_t *ch1 = NULL, *ch2 = NULL; + void (*run_func_ptr)(void);
- (void)arg; + /* setup options so we're sure about what sched we are running */ + MOCK(get_options, mock_get_options); + clear_options(); + mocked_options.KISTSchedRunInterval = -1;
/* Set up libevent and scheduler */
@@ -551,12 +616,14 @@ test_scheduler_loop(void *arg) * Disable scheduler_run so we can just check the state transitions * without having to make everything it might call work too. */ - MOCK(scheduler_run, scheduler_run_noop_mock); + run_func_ptr = scheduler->run; + scheduler->run = scheduler_run_noop_mock;
tt_int_op(smartlist_len(channels_pending), OP_EQ, 0);
/* Set up a fake channel */ ch1 = new_fake_channel(); + ch1->magic = TLS_CHAN_MAGIC; tt_assert(ch1);
/* Start it off in OPENING */ @@ -574,6 +641,7 @@ test_scheduler_loop(void *arg)
/* Now get another one */ ch2 = new_fake_channel(); + ch2->magic = TLS_CHAN_MAGIC; tt_assert(ch2); ch2->state = CHANNEL_STATE_OPENING; ch2->cmux = circuitmux_alloc(); @@ -615,15 +683,9 @@ test_scheduler_loop(void *arg)
/* * Now we've got two pending channels and need to fire off - * scheduler_run(); first, unmock it. + * the scheduler run() that we kept. */ - - UNMOCK(scheduler_run); - - scheduler_run(); - - /* Now re-mock it */ - MOCK(scheduler_run, scheduler_run_noop_mock); + run_func_ptr();
/* * Assert that they're still in the states we left and aren't still @@ -661,15 +723,10 @@ test_scheduler_loop(void *arg) channel_flush_some_cells_mock_set(ch2, 48);
/* - * And re-run the scheduler_run() loop with non-zero returns from + * And re-run the scheduler run() loop with non-zero returns from * channel_flush_some_cells() this time. */ - UNMOCK(scheduler_run); - - scheduler_run(); - - /* Now re-mock it */ - MOCK(scheduler_run, scheduler_run_noop_mock); + run_func_ptr();
/* * ch1 should have gone to SCHED_CHAN_WAITING_FOR_CELLS, with 16 flushed @@ -707,52 +764,240 @@ test_scheduler_loop(void *arg)
UNMOCK(channel_flush_some_cells); UNMOCK(scheduler_compare_channels); - UNMOCK(scheduler_run); UNMOCK(tor_libevent_get_base); + UNMOCK(get_options); +} + +static void +test_scheduler_loop_kist(void *arg) +{ + (void) arg; + channel_t *ch1 = new_fake_channel(), *ch2 = new_fake_channel(); + + /* setup options so we're sure about what sched we are running */ + MOCK(get_options, mock_get_options); + MOCK(channel_flush_some_cells, channel_flush_some_cells_mock); + MOCK(channel_more_to_flush, channel_more_to_flush_mock); + MOCK(channel_write_to_kernel, channel_write_to_kernel_mock); + MOCK(channel_should_write_to_kernel, channel_should_write_to_kernel_mock); + MOCK(update_socket_info_impl, update_socket_info_impl_mock); + clear_options(); + mocked_options.KISTSchedRunInterval = 11; + scheduler_init(); + + tt_assert(ch1); + ch1->magic = TLS_CHAN_MAGIC; + ch1->state = CHANNEL_STATE_OPENING; + ch1->cmux = circuitmux_alloc(); + channel_register(ch1); + tt_assert(ch1->registered); + channel_change_state_open(ch1); + scheduler_channel_has_waiting_cells(ch1); + scheduler_channel_wants_writes(ch1); + channel_flush_some_cells_mock_set(ch1, 5); + + tt_assert(ch2); + ch2->magic = TLS_CHAN_MAGIC; + ch2->state = CHANNEL_STATE_OPENING; + ch2->cmux = circuitmux_alloc(); + channel_register(ch2); + tt_assert(ch2->registered); + channel_change_state_open(ch2); + scheduler_channel_has_waiting_cells(ch2); + scheduler_channel_wants_writes(ch2); + channel_flush_some_cells_mock_set(ch2, 5); + + scheduler->run(); + + scheduler_channel_has_waiting_cells(ch1); + channel_flush_some_cells_mock_set(ch1, 5); + + scheduler->run(); + + scheduler_channel_has_waiting_cells(ch1); + channel_flush_some_cells_mock_set(ch1, 5); + scheduler_channel_has_waiting_cells(ch2); + channel_flush_some_cells_mock_set(ch2, 5); + + scheduler->run(); + + channel_flush_some_cells_mock_free_all(); + tt_int_op(1,==,1); + + done: + /* Prep the channel so the free() function doesn't explode. 
*/ + ch1->state = ch2->state = CHANNEL_STATE_CLOSED; + ch1->registered = ch2->registered = 0; + channel_free(ch1); + channel_free(ch2); + UNMOCK(update_socket_info_impl); + UNMOCK(channel_should_write_to_kernel); + UNMOCK(channel_write_to_kernel); + UNMOCK(channel_more_to_flush); + UNMOCK(channel_flush_some_cells); + UNMOCK(get_options); + scheduler_free_all(); + return; }
static void -test_scheduler_queue_heuristic(void *arg) +test_scheduler_channel_states(void *arg) { - time_t now = approx_time(); - uint64_t qh; + (void)arg; + perform_channel_state_tests(-1); // vanilla + perform_channel_state_tests(11); // kist +}
+static void +test_scheduler_initfree(void *arg) +{ (void)arg;
- queue_heuristic = 0; - queue_heuristic_timestamp = 0; + tt_ptr_op(channels_pending, ==, NULL); + tt_ptr_op(run_sched_ev, ==, NULL); + + mock_event_init(); + MOCK(tor_libevent_get_base, tor_libevent_get_base_mock);
- /* Not yet inited case */ - scheduler_update_queue_heuristic(now - 180); - tt_u64_op(queue_heuristic, OP_EQ, 0); - tt_int_op(queue_heuristic_timestamp, OP_EQ, now - 180); + scheduler_init();
- queue_heuristic = 1000000000L; - queue_heuristic_timestamp = now - 120; + tt_ptr_op(channels_pending, !=, NULL); + tt_ptr_op(run_sched_ev, !=, NULL); + /* We have specified nothing in the torrc and there's no consensus so the + * KIST scheduler is what should be in use */ + tt_ptr_op(scheduler, ==, get_kist_scheduler()); + tt_int_op(sched_run_interval, ==, 10);
- scheduler_update_queue_heuristic(now - 119); - tt_u64_op(queue_heuristic, OP_EQ, 500000000L); - tt_int_op(queue_heuristic_timestamp, OP_EQ, now - 119); + scheduler_free_all();
- scheduler_update_queue_heuristic(now - 116); - tt_u64_op(queue_heuristic, OP_EQ, 62500000L); - tt_int_op(queue_heuristic_timestamp, OP_EQ, now - 116); + UNMOCK(tor_libevent_get_base); + mock_event_free_all();
- qh = scheduler_get_queue_heuristic(); - tt_u64_op(qh, OP_EQ, 0); + tt_ptr_op(channels_pending, ==, NULL); + tt_ptr_op(run_sched_ev, ==, NULL);
done: return; }
+static void +test_scheduler_should_use_kist(void *arg) +{ + (void)arg; + + int res_should, res_freq; + MOCK(get_options, mock_get_options); + + /* Test force disabling of KIST */ + clear_options(); + mocked_options.KISTSchedRunInterval = -1; + res_should = scheduler_should_use_kist(); + res_freq = kist_scheduler_run_interval(NULL); + tt_int_op(res_should, ==, 0); + tt_int_op(res_freq, ==, -1); + + /* Test force enabling of KIST */ + clear_options(); + mocked_options.KISTSchedRunInterval = 1234; + res_should = scheduler_should_use_kist(); + res_freq = kist_scheduler_run_interval(NULL); + tt_int_op(res_should, ==, 1); + tt_int_op(res_freq, ==, 1234); + + /* Test defer to consensus, but no consensus available */ + clear_options(); + mocked_options.KISTSchedRunInterval = 0; + res_should = scheduler_should_use_kist(); + res_freq = kist_scheduler_run_interval(NULL); + tt_int_op(res_should, ==, 1); + tt_int_op(res_freq, ==, 10); + + /* Test defer to consensus, and kist consensus available */ + MOCK(networkstatus_get_param, mock_kist_networkstatus_get_param); + clear_options(); + mocked_options.KISTSchedRunInterval = 0; + res_should = scheduler_should_use_kist(); + res_freq = kist_scheduler_run_interval(NULL); + tt_int_op(res_should, ==, 1); + tt_int_op(res_freq, ==, 12); + UNMOCK(networkstatus_get_param); + + /* Test defer to consensus, and vanilla consensus available */ + MOCK(networkstatus_get_param, mock_vanilla_networkstatus_get_param); + clear_options(); + mocked_options.KISTSchedRunInterval = 0; + res_should = scheduler_should_use_kist(); + res_freq = kist_scheduler_run_interval(NULL); + tt_int_op(res_should, ==, 0); + tt_int_op(res_freq, ==, -1); + UNMOCK(networkstatus_get_param); + + done: + UNMOCK(get_options); + return; +} + +static void +test_scheduler_ns_changed(void *arg) +{ + (void) arg; + + /* + * Currently no scheduler implementations use the old/new consensuses passed + * in scheduler_notify_networkstatus_changed, so it is okay to pass NULL. 
+ * + * "But then what does this test actually exercise???" It tests that + * scheduler_notify_networkstatus_changed fetches the correct value from the + * consensus, and then switches the scheduler if necessary. + */ + + MOCK(get_options, mock_get_options); + clear_options(); + + tt_ptr_op(scheduler, ==, NULL); + + /* Change from vanilla to kist via consensus */ + scheduler = get_vanilla_scheduler(); + MOCK(networkstatus_get_param, mock_kist_networkstatus_get_param); + scheduler_notify_networkstatus_changed(NULL, NULL); + UNMOCK(networkstatus_get_param); + tt_ptr_op(scheduler, ==, get_kist_scheduler()); + + /* Change from kist to vanilla via consensus */ + scheduler = get_kist_scheduler(); + MOCK(networkstatus_get_param, mock_vanilla_networkstatus_get_param); + scheduler_notify_networkstatus_changed(NULL, NULL); + UNMOCK(networkstatus_get_param); + tt_ptr_op(scheduler, ==, get_vanilla_scheduler()); + + /* Doesn't change when using KIST */ + scheduler = get_kist_scheduler(); + MOCK(networkstatus_get_param, mock_kist_networkstatus_get_param); + scheduler_notify_networkstatus_changed(NULL, NULL); + UNMOCK(networkstatus_get_param); + tt_ptr_op(scheduler, ==, get_kist_scheduler()); + + /* Doesn't change when using vanilla */ + scheduler = get_vanilla_scheduler(); + MOCK(networkstatus_get_param, mock_vanilla_networkstatus_get_param); + scheduler_notify_networkstatus_changed(NULL, NULL); + UNMOCK(networkstatus_get_param); + tt_ptr_op(scheduler, ==, get_vanilla_scheduler()); + + done: + UNMOCK(get_options); + return; +} + struct testcase_t scheduler_tests[] = { - { "channel_states", test_scheduler_channel_states, TT_FORK, NULL, NULL }, { "compare_channels", test_scheduler_compare_channels, TT_FORK, NULL, NULL }, + { "channel_states", test_scheduler_channel_states, TT_FORK, NULL, NULL }, { "initfree", test_scheduler_initfree, TT_FORK, NULL, NULL }, - { "loop", test_scheduler_loop, TT_FORK, NULL, NULL }, - { "queue_heuristic", test_scheduler_queue_heuristic, - TT_FORK, NULL, 
NULL }, + { "loop_vanilla", test_scheduler_loop_vanilla, TT_FORK, NULL, NULL }, + { "loop_kist", test_scheduler_loop_kist, TT_FORK, NULL, NULL }, + { "ns_changed", test_scheduler_ns_changed, TT_FORK, NULL, NULL}, + { "should_use_kist", test_scheduler_should_use_kist, TT_FORK, NULL, NULL }, END_OF_TESTCASES };