[tor-commits] [tor/master] Schedule according to a queue size heuristic

nickm at torproject.org
Fri Nov 28 03:58:31 UTC 2014


commit 1275002a46dfb131f6db5c0fe28bc1828db327e2
Author: Andrea Shepard <andrea at torproject.org>
Date:   Thu Nov 14 04:45:47 2013 -0800

    Schedule according to a queue size heuristic
---
 src/or/channel.c   |    4 ++
 src/or/scheduler.c |  192 +++++++++++++++++++++++++++++++++++++++++++++-------
 src/or/scheduler.h |    3 +
 3 files changed, 173 insertions(+), 26 deletions(-)

diff --git a/src/or/channel.c b/src/or/channel.c
index dddd7ab..7ed3894 100644
--- a/src/or/channel.c
+++ b/src/or/channel.c
@@ -4563,6 +4563,8 @@ channel_update_xmit_queue_size(channel_t *chan)
                 U64_FORMAT ", new size is " U64_FORMAT,
                 U64_PRINTF_ARG(adj), U64_PRINTF_ARG(chan->global_identifier),
                 U64_PRINTF_ARG(estimated_total_queue_size));
+      /* Tell the scheduler we're increasing the queue size */
+      scheduler_adjust_queue_size(chan, 1, adj);
     }
   } else if (queued < chan->bytes_queued_for_xmit) {
     adj = chan->bytes_queued_for_xmit - queued;
@@ -4585,6 +4587,8 @@ channel_update_xmit_queue_size(channel_t *chan)
                 U64_FORMAT ", new size is " U64_FORMAT,
                 U64_PRINTF_ARG(adj), U64_PRINTF_ARG(chan->global_identifier),
                 U64_PRINTF_ARG(estimated_total_queue_size));
+      /* Tell the scheduler we're decreasing the queue size */
+      scheduler_adjust_queue_size(chan, -1, adj);
     }
   }
 }
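
The channel.c hunks above hook the existing queue-size bookkeeping into the scheduler: whenever channel_update_xmit_queue_size() sees a channel's outbound queue grow or shrink, it now reports the delta via scheduler_adjust_queue_size(). A minimal standalone sketch of that reporting pattern, using hypothetical toy_* stand-ins rather than tor's real channel_t and scheduler code, might look like this:

    #include <stdint.h>
    #include <stdio.h>

    /* Hypothetical stand-ins for tor's channel_t and the scheduler call. */
    typedef struct {
      uint64_t global_identifier;
      uint64_t bytes_queued_for_xmit;
    } toy_channel_t;

    static uint64_t toy_queue_heuristic = 0;

    /* Mirrors the shape of scheduler_adjust_queue_size(chan, dir, adj):
     * dir >= 0 means the queue grew, dir < 0 means it shrank. */
    static void
    toy_adjust_queue_size(toy_channel_t *chan, char dir, uint64_t adj)
    {
      if (dir >= 0)
        toy_queue_heuristic += adj;
      else
        toy_queue_heuristic =
          (toy_queue_heuristic > adj) ? toy_queue_heuristic - adj : 0;
      printf("chan %llu: %c%llu, heuristic now %llu\n",
             (unsigned long long)chan->global_identifier,
             (dir >= 0) ? '+' : '-',
             (unsigned long long)adj,
             (unsigned long long)toy_queue_heuristic);
    }

    /* Mirrors the delta computation in channel_update_xmit_queue_size(). */
    static void
    toy_update_xmit_queue_size(toy_channel_t *chan, uint64_t queued)
    {
      if (queued > chan->bytes_queued_for_xmit)
        toy_adjust_queue_size(chan, 1, queued - chan->bytes_queued_for_xmit);
      else if (queued < chan->bytes_queued_for_xmit)
        toy_adjust_queue_size(chan, -1, chan->bytes_queued_for_xmit - queued);
      chan->bytes_queued_for_xmit = queued;
    }

    int
    main(void)
    {
      toy_channel_t chan = { 42, 0 };
      toy_update_xmit_queue_size(&chan, 4096);  /* grows: dir = +1 */
      toy_update_xmit_queue_size(&chan, 1024);  /* shrinks: dir = -1 */
      return 0;
    }

The sign argument (+1/-1) follows the patch's convention, and the decrease path clamps at zero, as the scheduler does below.
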
diff --git a/src/or/scheduler.c b/src/or/scheduler.c
index 1c6a2fd..1baf375 100644
--- a/src/or/scheduler.c
+++ b/src/or/scheduler.c
@@ -7,7 +7,10 @@
  **/
 
 #include "or.h"
+
+#define TOR_CHANNEL_INTERNAL_ /* For channel_flush_some_cells() */
 #include "channel.h"
+
 #include "compat_libevent.h"
 #include "scheduler.h"
 
@@ -17,6 +20,9 @@
 #include <event.h>
 #endif
 
+#define SCHED_Q_LOW_WATER 16384
+#define SCHED_Q_HIGH_WATER (2 * SCHED_Q_LOW_WATER)
+
 /*
  * Write scheduling works by keeping track of lists of channels that can
  * accept cells, and have cells to write.  From the scheduler's perspective,
@@ -118,6 +124,19 @@ static smartlist_t *channels_pending = NULL;
 
 static struct event *run_sched_ev = NULL;
 
+/*
+ * Queue heuristic; this is not the queue size, but an 'effective queue size'
+ * that ages out contributions from stalled channels.
+ */
+
+static uint64_t queue_heuristic = 0;
+
+/*
+ * Timestamp for last queue heuristic update
+ */
+
+static time_t queue_heuristic_timestamp = 0;
+
 /* Scheduler static function declarations */
 
 static void scheduler_evt_callback(evutil_socket_t fd,
@@ -127,6 +146,8 @@ static void scheduler_retrigger(void);
 #if 0
 static void scheduler_trigger(void);
 #endif
+static uint64_t scheduler_get_queue_heuristic(void);
+static void scheduler_update_queue_heuristic(time_t now);
 
 /* Scheduler function implementations */
 
@@ -281,6 +302,8 @@ scheduler_init(void)
   channels_waiting_for_cells = smartlist_new();
   channels_waiting_to_write = smartlist_new();
   channels_pending = smartlist_new();
+  queue_heuristic = 0;
+  queue_heuristic_timestamp = approx_time();
 }
 
 /** Check if there's more scheduling work */
@@ -290,7 +313,8 @@ scheduler_more_work(void)
 {
   tor_assert(channels_pending);
 
-  return (smartlist_len(channels_pending) > 0) ? 1 : 0;
+  return ((scheduler_get_queue_heuristic() < SCHED_Q_LOW_WATER) &&
+          ((smartlist_len(channels_pending) > 0))) ? 1 : 0;
 }
 
 /** Retrigger the scheduler in a way safe to use from the callback */
@@ -324,39 +348,70 @@ void
 scheduler_run(void)
 {
   smartlist_t *tmp = NULL;
-  int n_cells;
+  int n_cells, n_chans_before, n_chans_after;
+  uint64_t q_len_before, q_heur_before, q_len_after, q_heur_after;
   ssize_t flushed, flushed_this_time;
 
   log_debug(LD_SCHED, "We have a chance to run the scheduler");
 
-  tmp = channels_pending;
-  channels_pending = smartlist_new();
+  if (scheduler_get_queue_heuristic() < SCHED_Q_LOW_WATER) {
+    n_chans_before = smartlist_len(channels_pending);
+    q_len_before = channel_get_global_queue_estimate();
+    q_heur_before = scheduler_get_queue_heuristic();
+    tmp = channels_pending;
+    channels_pending = smartlist_new();
 
-  /* For now, just run the old scheduler on all the chans in the list */
+    /*
+     * For now, just run the old scheduler on all the chans in the list, until
+     * we hit the high-water mark.  TODO real channel priority API
+     */
 
-  SMARTLIST_FOREACH_BEGIN(tmp, channel_t *, chan) {
-    n_cells = channel_num_cells_writeable(chan);
-    if (n_cells > 0) {
-      log_debug(LD_SCHED,
-                "Scheduler saw pending channel " U64_FORMAT " at %p with "
-                "%d cells writeable",
-                U64_PRINTF_ARG(chan->global_identifier), chan, n_cells);
-
-      flushed = 0;
-      while (flushed < n_cells) {
-        flushed_this_time = channel_flush_some_cells(chan, n_cells - flushed);
-        if (flushed_this_time <= 0) break;
-        flushed += flushed_this_time;
+    SMARTLIST_FOREACH_BEGIN(tmp, channel_t *, chan) {
+      if (scheduler_get_queue_heuristic() <= SCHED_Q_HIGH_WATER) {
+        n_cells = channel_num_cells_writeable(chan);
+        if (n_cells > 0) {
+          log_debug(LD_SCHED,
+                    "Scheduler saw pending channel " U64_FORMAT " at %p with "
+                    "%d cells writeable",
+                    U64_PRINTF_ARG(chan->global_identifier), chan, n_cells);
+
+          flushed = 0;
+          while (flushed < n_cells) {
+            flushed_this_time =
+              channel_flush_some_cells(chan, n_cells - flushed);
+            if (flushed_this_time <= 0) break;
+            flushed += flushed_this_time;
+          }
+
+          log_debug(LD_SCHED,
+                    "Scheduler flushed %d cells onto pending channel "
+                    U64_FORMAT " at %p",
+                    flushed, U64_PRINTF_ARG(chan->global_identifier), chan);
+        } else {
+          log_info(LD_SCHED,
+                   "Scheduler saw pending channel " U64_FORMAT " at %p with "
+                   "no cells writeable",
+                   U64_PRINTF_ARG(chan->global_identifier), chan);
+        }
+      } else {
+        /* Not getting it this round; put it back on the list */
+        smartlist_add(channels_pending, chan);
       }
-    } else {
-      log_info(LD_SCHED,
-               "Scheduler saw pending channel " U64_FORMAT " at %p with "
-               "no cells writeable",
-               U64_PRINTF_ARG(chan->global_identifier), chan);
-    }
-  } SMARTLIST_FOREACH_END(chan);
+    } SMARTLIST_FOREACH_END(chan);
 
-  smartlist_free(tmp);
+    smartlist_free(tmp);
+
+    n_chans_after = smartlist_len(channels_pending);
+    q_len_after = channel_get_global_queue_estimate();
+    q_heur_after = scheduler_get_queue_heuristic();
+    log_debug(LD_SCHED,
+              "Scheduler handled %d of %d pending channels, queue size from "
+              U64_FORMAT " to " U64_FORMAT ", queue heuristic from "
+              U64_FORMAT " to " U64_FORMAT,
+              n_chans_before - n_chans_after, n_chans_before,
+              U64_PRINTF_ARG(q_len_before), U64_PRINTF_ARG(q_len_after),
+              U64_PRINTF_ARG(q_heur_before), U64_PRINTF_ARG(q_heur_after));
+  }
 }
 
 /** Trigger the scheduling event so we run the scheduler later */
@@ -420,3 +475,88 @@ scheduler_channel_wants_writes(channel_t *chan)
   if (became_pending) scheduler_retrigger();
 }
 
+/**
+ * Notify the scheduler of a queue size adjustment, to recalculate the
+ * queue heuristic.
+ */
+
+void
+scheduler_adjust_queue_size(channel_t *chan, char dir, uint64_t adj)
+{
+  time_t now = approx_time();
+
+  log_debug(LD_SCHED,
+            "Queue size adjustment by %s" U64_FORMAT " for channel "
+            U64_FORMAT,
+            (dir >= 0) ? "+" : "-",
+            U64_PRINTF_ARG(adj),
+            U64_PRINTF_ARG(chan->global_identifier));
+
+  /* Get the queue heuristic up to date */
+  scheduler_update_queue_heuristic(now);
+
+  /* Adjust as appropriate */
+  if (dir >= 0) {
+    /* Increasing it */
+    queue_heuristic += adj;
+  } else {
+    /* Decreasing it */
+    if (queue_heuristic > adj) queue_heuristic -= adj;
+    else queue_heuristic = 0;
+  }
+
+  log_debug(LD_SCHED,
+            "Queue heuristic is now " U64_FORMAT,
+            U64_PRINTF_ARG(queue_heuristic));
+}
+
+/**
+ * Query the current value of the queue heuristic
+ */
+
+static uint64_t
+scheduler_get_queue_heuristic(void)
+{
+  time_t now = approx_time();
+
+  scheduler_update_queue_heuristic(now);
+
+  return queue_heuristic;
+}
+
+/**
+ * Adjust the queue heuristic value to the present time
+ */
+
+static void
+scheduler_update_queue_heuristic(time_t now)
+{
+  time_t diff;
+
+  if (queue_heuristic_timestamp == 0) {
+    /*
+     * Nothing we can sensibly do; must not have been initted properly.
+     * Oh well.
+     */
+    queue_heuristic_timestamp = now;
+  } else if (queue_heuristic_timestamp < now) {
+    diff = now - queue_heuristic_timestamp;
+    /*
+     * This is a simple exponential age-out; the other proposed alternative
+     * was a linear age-out using the bandwidth history in rephist.c; I'm
+     * going with this out of concern that if an adversary can jam the
+     * scheduler long enough, it would cause the bandwidth to drop to
+     * zero and render the aging mechanism ineffective thereafter.
+     */
+    if (0 <= diff && diff < 64) queue_heuristic >>= diff;
+    else queue_heuristic = 0;
+
+    queue_heuristic_timestamp = now;
+
+    log_debug(LD_SCHED,
+              "Queue heuristic is now " U64_FORMAT,
+              U64_PRINTF_ARG(queue_heuristic));
+  }
+  /* else no update needed, or time went backward */
+}
+
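
The comment in scheduler_update_queue_heuristic() above records the design choice: a simple exponential age-out was preferred over a linear age-out based on the rephist.c bandwidth history, so that an adversary who jams the scheduler cannot drive the decay rate to zero. Concretely, each elapsed second halves the heuristic (a right shift by one bit), and after 64 or more seconds of inactivity the value is simply zeroed. A self-contained sketch of just that decay step, with hypothetical toy_* names and an assumed 40000-byte starting estimate in place of the real statics:

    #include <stdint.h>
    #include <stdio.h>
    #include <time.h>

    static uint64_t toy_heuristic = 40000;  /* assumed initial estimate */
    static time_t toy_timestamp = 0;

    /* Same shape as scheduler_update_queue_heuristic(): halve once per
     * elapsed second, clamp to zero after 64 seconds. */
    static void
    toy_update_heuristic(time_t now)
    {
      if (toy_timestamp == 0) {
        toy_timestamp = now;
      } else if (toy_timestamp < now) {
        time_t diff = now - toy_timestamp;
        if (diff < 64)
          toy_heuristic >>= diff;
        else
          toy_heuristic = 0;
        toy_timestamp = now;
      }
      /* else: no time has passed, or time went backward; leave it alone */
    }

    int
    main(void)
    {
      time_t start = time(NULL);
      toy_update_heuristic(start);
      for (int s = 1; s <= 5; s++) {
        toy_update_heuristic(start + s);
        printf("after %d s: heuristic = %llu\n",
               s, (unsigned long long)toy_heuristic);
      }
      return 0;
    }

Run as written, this prints 20000, 10000, 5000, 2500, 1250 for the five seconds after the initial 40000-byte estimate, assuming approx_time()-style one-second granularity.
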
diff --git a/src/or/scheduler.h b/src/or/scheduler.h
index b25e36e..8fe59cb 100644
--- a/src/or/scheduler.h
+++ b/src/or/scheduler.h
@@ -27,5 +27,8 @@ void scheduler_channel_wants_writes(channel_t *chan);
 /* Notify the scheduler of a channel being closed */
 void scheduler_release_channel(channel_t *chan);
 
+/* Notify scheduler of queue size adjustments */
+void scheduler_adjust_queue_size(channel_t *chan, char dir, uint64_t adj);
+
 #endif /* !defined(TOR_SCHEDULER_H) */
 
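
Taken as a whole, the patch gives the scheduler a simple hysteresis around the new heuristic: scheduler_more_work() and scheduler_run() only start work while the heuristic is below SCHED_Q_LOW_WATER (16384 bytes), and once a pass pushes it past SCHED_Q_HIGH_WATER (twice that) the remaining channels are put back on channels_pending for a later pass, after the per-second decay has brought the value back down. A rough, self-contained sketch of that control flow, with hypothetical toy_* stand-ins for the flush and heuristic calls and an assumed 9000-byte cost per flushed channel:

    #include <stdint.h>
    #include <stdio.h>

    #define TOY_Q_LOW_WATER  16384
    #define TOY_Q_HIGH_WATER (2 * TOY_Q_LOW_WATER)

    static uint64_t toy_heuristic = 0;

    static uint64_t
    toy_get_heuristic(void)
    {
      return toy_heuristic;
    }

    /* Pretend each flushed channel adds ~9000 bytes to the queue estimate. */
    static void
    toy_flush_channel(int chan_id)
    {
      toy_heuristic += 9000;
      printf("flushed channel %d, heuristic now %llu\n",
             chan_id, (unsigned long long)toy_heuristic);
    }

    int
    main(void)
    {
      /* Only start a scheduling pass below the low-water mark... */
      if (toy_get_heuristic() < TOY_Q_LOW_WATER) {
        int pending[] = { 1, 2, 3, 4, 5 };
        int deferred = 0;
        for (int i = 0; i < 5; i++) {
          /* ...and stop handing out cells past the high-water mark. */
          if (toy_get_heuristic() <= TOY_Q_HIGH_WATER)
            toy_flush_channel(pending[i]);
          else
            deferred++;  /* would go back on channels_pending in tor */
        }
        printf("%d channel(s) deferred to the next pass\n", deferred);
      }
      return 0;
    }

With the assumed per-channel cost, the sketch flushes four of the five channels before crossing the 32768-byte high-water mark and defers the fifth, which is the behaviour scheduler_run() implements with its "put it back on the list" branch.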