[tor-commits] [tor/master] Add a new token-bucket backend abstraction, with tests

nickm at torproject.org nickm at torproject.org
Fri Apr 13 14:47:29 UTC 2018


commit c376200f6a77b2509928bc08d2aa1245028cec30
Author: Nick Mathewson <nickm at torproject.org>
Date:   Tue Apr 10 11:23:14 2018 -0400

    Add a new token-bucket backend abstraction, with tests
    
    This differs from our previous token bucket abstraction in a few
    ways:
    
      1) It is an abstraction, and not a collection of fields.
      2) It is meant to be used with monotonic timestamps, which should
         produce better results than calling gettimeofday over and over.
---
 src/common/include.am     |   2 +
 src/common/token_bucket.c | 180 +++++++++++++++++++++++++++++++++++++++++
 src/common/token_bucket.h |  72 +++++++++++++++++
 src/test/include.am       |   1 +
 src/test/test.c           |   1 +
 src/test/test.h           |   1 +
 src/test/test_bwmgt.c     | 199 ++++++++++++++++++++++++++++++++++++++++++++++
 7 files changed, 456 insertions(+)

diff --git a/src/common/include.am b/src/common/include.am
index 73c51ff0b..87ab9d79e 100644
--- a/src/common/include.am
+++ b/src/common/include.am
@@ -97,6 +97,7 @@ LIBOR_A_SRC = \
   src/common/util_process.c				\
   src/common/sandbox.c					\
   src/common/storagedir.c				\
+  src/common/token_bucket.c				\
   src/common/workqueue.c				\
   $(libor_extra_source)					\
   $(threads_impl_source)				\
@@ -184,6 +185,7 @@ COMMONHEADERS = \
   src/common/storagedir.h			\
   src/common/testsupport.h			\
   src/common/timers.h				\
+  src/common/token_bucket.h			\
   src/common/torint.h				\
   src/common/torlog.h				\
   src/common/tortls.h				\
diff --git a/src/common/token_bucket.c b/src/common/token_bucket.c
new file mode 100644
index 000000000..f4d2cccff
--- /dev/null
+++ b/src/common/token_bucket.c
@@ -0,0 +1,180 @@
+/* Copyright (c) 2018, The Tor Project, Inc. */
+/* See LICENSE for licensing information */
+
+/**
+ * \file token_bucket.c
+ * \brief Functions to use and manipulate token buckets, used for
+ *    rate-limiting on connections and globally.
+ *
+ * Tor uses these token buckets to keep track of bandwidth usage, and
+ * sometimes other things too.
+ *
+ * The time units we use internally are based on "timestamp" units -- see
+ * monotime_coarse_to_stamp() for a rationale.
+ *
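+ * Token buckets may become negative: a caller may spend more tokens than
+ * are available, and the resulting deficit is repaid by later refills.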
+ **/
+
+#define TOKEN_BUCKET_PRIVATE
+
+#include "token_bucket.h"
+#include "util_bug.h"
+
+/**
+ * Convert a rate in bytes per second to a rate in bytes per step.
+ *
+ * The multiplication is done in 64 bits so that large rates do not wrap
+ * around in uint32_t arithmetic.  The result is never zero:
+ * refill_single_bucket() divides by the per-step rate, and a very small
+ * configured rate must still refill the bucket eventually.
+ */
+static uint32_t
+rate_per_sec_to_rate_per_step(uint32_t rate)
+{
+  /*
+    The precise calculation we'd want to do is
+
+    (rate / 1000) * to_approximate_msec(TICKS_PER_STEP).  But to minimize
+    rounding error, we do it this way instead, and divide last.
+  */
+  const uint64_t units = ((uint64_t)rate) * TICKS_PER_STEP;
+  const uint64_t per_step =
+    monotime_coarse_stamp_units_to_approx_msec(units) / 1000;
+
+  if (per_step == 0)
+    return 1; /* Never hand back a zero rate; callers divide by it. */
+  if (per_step > UINT32_MAX)
+    return UINT32_MAX; /* Defensive: depends on the tick length. */
+  return (uint32_t) per_step;
+}
+
+/**
+ * Set up *<b>bucket</b> to refill at <b>rate</b> bytes per second while
+ * holding at most <b>burst</b> bytes.  Both the read and the write bucket
+ * begin full, and <b>now_ts</b> is recorded as the time of the last refill.
+ */
+void
+token_bucket_init(token_bucket_t *bucket,
+                  uint32_t rate,
+                  uint32_t burst,
+                  uint32_t now_ts)
+{
+  /* Zero everything first: token_bucket_adjust() reads the current bucket
+   * contents when clamping them to the new burst. */
+  memset(bucket, 0, sizeof(*bucket));
+  token_bucket_adjust(bucket, rate, burst);
+  token_bucket_reset(bucket, now_ts);
+}
+
+/**
+ * Reconfigure *<b>bucket</b> to refill at <b>rate</b> bytes per second with
+ * a maximum of <b>burst</b> bytes (capped at TOKEN_BUCKET_MAX_BURST).
+ *
+ * A read or write bucket holding more than the new burst is trimmed down to
+ * it; one holding less is left as-is.  The last-refill timestamp is not
+ * touched.
+ */
+void
+token_bucket_adjust(token_bucket_t *bucket,
+                    uint32_t rate,
+                    uint32_t burst)
+{
+  tor_assert_nonfatal(rate > 0);
+  tor_assert_nonfatal(burst > 0);
+
+  const uint32_t capped_burst =
+    (burst > TOKEN_BUCKET_MAX_BURST) ? TOKEN_BUCKET_MAX_BURST : burst;
+  const int32_t limit = (int32_t)capped_burst;
+
+  bucket->rate = rate_per_sec_to_rate_per_step(rate);
+  bucket->burst = capped_burst;
+  if (bucket->read_bucket > limit)
+    bucket->read_bucket = limit;
+  if (bucket->write_bucket > limit)
+    bucket->write_bucket = limit;
+}
+
+/**
+ * Fill both the read and write halves of <b>bucket</b> up to its configured
+ * burst, and record <b>now_ts</b> as the time of that refill.
+ */
+void
+token_bucket_reset(token_bucket_t *bucket,
+                   uint32_t now_ts)
+{
+  const int32_t full = bucket->burst;
+
+  bucket->last_refilled_at_ts = now_ts;
+  bucket->write_bucket = full;
+  bucket->read_bucket = full;
+}
+
+/* Helper: see token_bucket_refill.
+ *
+ * Add <b>rate</b> tokens for each of <b>elapsed_steps</b> steps to the
+ * bucket at *<b>bucketptr</b>, never letting it exceed <b>burst</b>.
+ * Return true iff the bucket was empty (<= 0) beforehand and is positive
+ * afterwards.  A zero <b>rate</b> adds nothing. */
+static int
+refill_single_bucket(int32_t *bucketptr,
+                     const uint32_t rate,
+                     const int32_t burst,
+                     const uint32_t elapsed_steps)
+{
+  const int was_empty = (*bucketptr <= 0);
+
+  if (rate == 0) {
+    /* Nothing to add -- and dividing or comparing against a zero rate
+     * below would be meaningless (division by zero is undefined). */
+    return 0;
+  }
+
+  /* Work in 64 bits: the bucket may be negative, so the gap up to "burst"
+   * can exceed INT32_MAX, and rate * elapsed_steps can exceed UINT32_MAX.
+   * Neither value can overflow a 64-bit type. */
+  const int64_t gap = ((int64_t)burst) - ((int64_t)*bucketptr);
+  const uint64_t added = ((uint64_t)rate) * elapsed_steps;
+
+  if (gap <= 0 || added >= (uint64_t)gap) {
+    *bucketptr = burst;
+  } else {
+    /* added < gap, so the sum stays strictly below burst and fits. */
+    *bucketptr = (int32_t)(((int64_t)*bucketptr) + (int64_t)added);
+  }
+
+  return was_empty && *bucketptr > 0;
+}
+
+/**
+ * Bring <b>bucket</b> up to date as of timestamp <b>now_ts</b>, adding the
+ * tokens earned since the last refill to both the read and write buckets.
+ *
+ * Time is counted in whole steps of TICKS_PER_STEP ticks.  When at least one
+ * step has elapsed, last_refilled_at_ts becomes <b>now_ts</b>, so any ticks
+ * beyond the last whole step are not carried over.
+ *
+ * Returns a bitmask: TB_READ is set iff the read bucket went from empty to
+ * nonempty, and TB_WRITE is set iff the write bucket did.
+ */
+int
+token_bucket_refill(token_bucket_t *bucket,
+                    uint32_t now_ts)
+{
+  /* Unsigned subtraction, so timestamp rollover yields the right delta. */
+  const uint32_t steps =
+    (uint32_t)(now_ts - bucket->last_refilled_at_ts) / TICKS_PER_STEP;
+
+  if (steps == 0) {
+    /* Less than one whole step has passed.  Leave last_refilled_at_ts
+     * alone so the partial step keeps accumulating and is eventually
+     * credited. */
+    return 0;
+  }
+
+  const int read_woke = refill_single_bucket(&bucket->read_bucket,
+                                             bucket->rate, bucket->burst,
+                                             steps);
+  const int write_woke = refill_single_bucket(&bucket->write_bucket,
+                                              bucket->rate, bucket->burst,
+                                              steps);
+  bucket->last_refilled_at_ts = now_ts;
+
+  return (read_woke ? TB_READ : 0) | (write_woke ? TB_WRITE : 0);
+}
+
+/* Helper: remove <b>n</b> tokens from the bucket at *<b>bucketptr</b>.
+ *
+ * The bucket is allowed to go negative, but it saturates at INT32_MIN
+ * rather than wrapping around to a large positive value (which would hand
+ * out a huge, unearned budget).  Return true iff the bucket was positive
+ * beforehand and is empty (<= 0) afterwards.  A negative <b>n</b> is a bug:
+ * the bucket is left unchanged and 0 is returned. */
+static int
+decrement_single_bucket(int32_t *bucketptr,
+                        ssize_t n)
+{
+  if (BUG(n < 0))
+    return 0;
+  const int becomes_empty = *bucketptr > 0 && n >= *bucketptr;
+
+  /* How far the bucket can drop before leaving the int32_t range; this is
+   * always in [0, 2^32 - 1]. */
+  const uint64_t headroom = (uint64_t)(((int64_t)*bucketptr) - INT32_MIN);
+  if ((uint64_t)n > headroom) {
+    *bucketptr = INT32_MIN;
+  } else {
+    *bucketptr = (int32_t)(((int64_t)*bucketptr) - (int64_t)n);
+  }
+  return becomes_empty;
+}
+
+/**
+ * Take <b>n</b> bytes out of the read half of <b>bucket</b>.  The bucket may
+ * become negative.
+ *
+ * Returns true if the read bucket held tokens before this call and is zero
+ * or negative afterwards; returns false otherwise.
+ */
+int
+token_bucket_dec_read(token_bucket_t *bucket,
+                      ssize_t n)
+{
+  int32_t *const read_side = &bucket->read_bucket;
+  return decrement_single_bucket(read_side, n);
+}
+
+/**
+ * Take <b>n</b> bytes out of the write half of <b>bucket</b>.  The bucket
+ * may become negative.
+ *
+ * Returns true if the write bucket held tokens before this call and is zero
+ * or negative afterwards; returns false otherwise.
+ */
+int
+token_bucket_dec_write(token_bucket_t *bucket,
+                       ssize_t n)
+{
+  int32_t *const write_side = &bucket->write_bucket;
+  return decrement_single_bucket(write_side, n);
+}
+
diff --git a/src/common/token_bucket.h b/src/common/token_bucket.h
new file mode 100644
index 000000000..ef0735219
--- /dev/null
+++ b/src/common/token_bucket.h
@@ -0,0 +1,72 @@
+/* Copyright (c) 2018, The Tor Project, Inc. */
+/* See LICENSE for licensing information */
+
+/**
+ * \file token_bucket.h
+ * \brief Headers for token_bucket.c
+ **/
+
+#ifndef TOR_TOKEN_BUCKET_H
+#define TOR_TOKEN_BUCKET_H
+
+#include "torint.h"
+
+/** A pair of token buckets -- one for reading, one for writing -- that
+ * share a single rate, burst, and last-refill time. */
+typedef struct token_bucket_t {
+  /** Refill rate in bytes per step; token_bucket_adjust() converts it from
+   * bytes per second. */
+  uint32_t rate;
+  /** Maximum number of bytes either bucket may hold. */
+  int32_t burst;
+  /** Bytes currently available for reading.  May be negative when more
+   * has been spent than was available. */
+  int32_t read_bucket;
+  /** Bytes currently available for writing.  May be negative when more
+   * has been spent than was available. */
+  int32_t write_bucket;
+  /** Time of the last refill, in the "timestamp" units used by
+   * token_bucket.c. */
+  uint32_t last_refilled_at_ts;
+} token_bucket_t;
+
+/** Largest burst a token bucket can hold; token_bucket_adjust() clamps
+ * larger values down to this. */
+#define TOKEN_BUCKET_MAX_BURST INT32_MAX
+
+void token_bucket_init(token_bucket_t *bucket,
+                       uint32_t rate,
+                       uint32_t burst,
+                       uint32_t now_ts);
+
+void token_bucket_adjust(token_bucket_t *bucket,
+                         uint32_t rate, uint32_t burst);
+
+void token_bucket_reset(token_bucket_t *bucket,
+                        uint32_t now_ts);
+
+/* Bits in the value returned by token_bucket_refill(): TB_READ means the
+ * read bucket went from empty to nonempty; TB_WRITE means the same for the
+ * write bucket. */
+#define TB_READ 1
+#define TB_WRITE 2
+
+int token_bucket_refill(token_bucket_t *bucket,
+                        uint32_t now_ts);
+
+/* Both return true iff the bucket went from nonempty to empty. */
+int token_bucket_dec_read(token_bucket_t *bucket,
+                          ssize_t n);
+int token_bucket_dec_write(token_bucket_t *bucket,
+                           ssize_t n);
+
+/** Return the number of bytes currently available in the read bucket of
+ * <b>bucket</b>.  An overdrawn (negative) bucket reports zero. */
+static inline size_t token_bucket_get_read(const token_bucket_t *bucket);
+static inline size_t
+token_bucket_get_read(const token_bucket_t *bucket)
+{
+  if (bucket->read_bucket <= 0)
+    return 0;
+  return (size_t)bucket->read_bucket;
+}
+
+/** Return the number of bytes currently available in the write bucket of
+ * <b>bucket</b>.  An overdrawn (negative) bucket reports zero. */
+static inline size_t token_bucket_get_write(const token_bucket_t *bucket);
+static inline size_t
+token_bucket_get_write(const token_bucket_t *bucket)
+{
+  if (bucket->write_bucket <= 0)
+    return 0;
+  return (size_t)bucket->write_bucket;
+}
+
+#ifdef TOKEN_BUCKET_PRIVATE
+
+/* To avoid making the rates too small, we consider units of "steps",
+ * where a "step" is defined as this many timestamp ticks.  Keep this
+ * a power of two if you can.  token_bucket_refill() only credits whole
+ * steps. */
+#define TICKS_PER_STEP 16
+
+#endif /* defined(TOKEN_BUCKET_PRIVATE) */
+
+#endif /* TOR_TOKEN_BUCKET_H */
+
diff --git a/src/test/include.am b/src/test/include.am
index e98b056a4..2da50de01 100644
--- a/src/test/include.am
+++ b/src/test/include.am
@@ -89,6 +89,7 @@ src_test_test_SOURCES = \
 	src/test/test_address.c \
 	src/test/test_address_set.c \
 	src/test/test_buffers.c \
+	src/test/test_bwmgt.c \
 	src/test/test_cell_formats.c \
 	src/test/test_cell_queue.c \
 	src/test/test_channel.c \
diff --git a/src/test/test.c b/src/test/test.c
index 4f2fbc693..7df385bc3 100644
--- a/src/test/test.c
+++ b/src/test/test.c
@@ -812,6 +812,7 @@ struct testgroup_t testgroups[] = {
   { "address/", address_tests },
   { "address_set/", address_set_tests },
   { "buffer/", buffer_tests },
+  { "bwmgt/", bwmgt_tests },
   { "cellfmt/", cell_format_tests },
   { "cellqueue/", cell_queue_tests },
   { "channel/", channel_tests },
diff --git a/src/test/test.h b/src/test/test.h
index 02ec9bda8..95715da7a 100644
--- a/src/test/test.h
+++ b/src/test/test.h
@@ -178,6 +178,7 @@ extern struct testcase_t accounting_tests[];
 extern struct testcase_t addr_tests[];
 extern struct testcase_t address_tests[];
 extern struct testcase_t address_set_tests[];
+extern struct testcase_t bwmgt_tests[];
 extern struct testcase_t buffer_tests[];
 extern struct testcase_t cell_format_tests[];
 extern struct testcase_t cell_queue_tests[];
diff --git a/src/test/test_bwmgt.c b/src/test/test_bwmgt.c
new file mode 100644
index 000000000..7bcfcf7fe
--- /dev/null
+++ b/src/test/test_bwmgt.c
@@ -0,0 +1,199 @@
+/* Copyright (c) 2018, The Tor Project, Inc. */
+/* See LICENSE for licensing information */
+
+/**
+ * \file test_bwmgt.c
+ * \brief tests for bandwidth management / token bucket functions
+ */
+
+#define TOKEN_BUCKET_PRIVATE
+
+#include "or.h"
+#include "test.h"
+
+#include "token_bucket.h"
+
+// An imaginary starting time, in timestamp units.  It sits just below
+// UINT32_MAX, so adding even a few ticks to it wraps around; the tests
+// below therefore also exercise timestamp rollover.
+static const uint32_t START_TS = UINT32_MAX-10;
+// One kilobyte; rates and bursts below are written as multiples of it.
+static const int32_t KB = 1024;
+
+/* Check that token_bucket_init() records the burst, converts the rate to
+ * within about 1%, and leaves both the read and write buckets full. */
+static void
+test_bwmgt_token_buf_init(void *arg)
+{
+  (void)arg;
+  token_bucket_t b;
+
+  token_bucket_init(&b, 16*KB, 64*KB, START_TS);
+  // Burst is correct
+  tt_uint_op(b.burst, OP_EQ, 64*KB);
+  // Rate is correct, within 1 percent.
+  {
+    uint32_t ticks_per_sec =
+      (uint32_t) monotime_msec_to_approx_coarse_stamp_units(1000);
+    uint32_t rate_per_sec = (b.rate * ticks_per_sec / TICKS_PER_STEP);
+
+    tt_uint_op(rate_per_sec, OP_GT, 16*KB-160);
+    tt_uint_op(rate_per_sec, OP_LT, 16*KB+160);
+  }
+  // Both buckets start out full:
+  tt_uint_op(b.last_refilled_at_ts, OP_EQ, START_TS);
+  tt_int_op(b.read_bucket, OP_EQ, 64*KB);
+  tt_int_op(b.write_bucket, OP_EQ, 64*KB);
+
+ done:
+  ;
+}
+
+/* Check that token_bucket_adjust() updates the burst and rate, and trims
+ * the bucket only when the new burst is smaller than its contents. */
+static void
+test_bwmgt_token_buf_adjust(void *arg)
+{
+  token_bucket_t tb;
+  (void)arg;
+
+  token_bucket_init(&tb, 16*KB, 64*KB, START_TS);
+  uint32_t base_rate = tb.rate;
+
+  /* Raising the burst leaves the rate and the bucket contents alone. */
+  token_bucket_adjust(&tb, 16*KB, 128*KB);
+  tt_uint_op(tb.rate, OP_EQ, base_rate);
+  tt_uint_op(tb.read_bucket, OP_EQ, 64*KB);
+  tt_uint_op(tb.burst, OP_EQ, 128*KB);
+
+  /* Lowering the burst, but not below the bucket's contents. */
+  token_bucket_adjust(&tb, 16*KB, 96*KB);
+  tt_uint_op(tb.rate, OP_EQ, base_rate);
+  tt_uint_op(tb.read_bucket, OP_EQ, 64*KB);
+  tt_uint_op(tb.burst, OP_EQ, 96*KB);
+
+  /* Lowering the burst below the contents trims the bucket to fit. */
+  token_bucket_adjust(&tb, 16*KB, 48*KB);
+  tt_uint_op(tb.rate, OP_EQ, base_rate);
+  tt_uint_op(tb.read_bucket, OP_EQ, 48*KB);
+  tt_uint_op(tb.burst, OP_EQ, 48*KB);
+
+  /* Doubling the configured rate roughly doubles the per-step rate. */
+  token_bucket_adjust(&tb, 32*KB, 48*KB);
+  tt_uint_op(tb.rate, OP_GE, base_rate*2 - 10);
+  tt_uint_op(tb.rate, OP_LE, base_rate*2 + 10);
+  tt_uint_op(tb.read_bucket, OP_EQ, 48*KB);
+  tt_uint_op(tb.burst, OP_EQ, 48*KB);
+
+ done:
+  ;
+}
+
+/* Check the return value of token_bucket_dec_read(): it reports only the
+ * transition from nonempty to empty, and the bucket may go negative. */
+static void
+test_bwmgt_token_buf_dec(void *arg)
+{
+  token_bucket_t tb;
+  (void)arg;
+
+  token_bucket_init(&tb, 16*KB, 64*KB, START_TS);
+
+  /* Full -> not full: no transition reported. */
+  tt_int_op(0, OP_EQ, token_bucket_dec_read(&tb, KB));
+  tt_int_op(tb.read_bucket, OP_EQ, 63*KB);
+
+  /* Down to a single remaining byte: still not empty. */
+  tt_int_op(0, OP_EQ, token_bucket_dec_read(&tb, 63*KB - 1));
+  tt_int_op(tb.read_bucket, OP_EQ, 1);
+
+  /* Spending that last byte empties the bucket. */
+  tt_int_op(1, OP_EQ, token_bucket_dec_read(&tb, 1));
+  tt_int_op(tb.read_bucket, OP_EQ, 0);
+
+  /* From a fresh bucket, spend everything at once. */
+  token_bucket_init(&tb, 16*KB, 64*KB, START_TS);
+  tt_int_op(1, OP_EQ, token_bucket_dec_read(&tb, 64*KB));
+  tt_int_op(tb.read_bucket, OP_EQ, 0);
+
+  /* From a fresh bucket, overspend by one byte. */
+  token_bucket_init(&tb, 16*KB, 64*KB, START_TS);
+  tt_int_op(1, OP_EQ, token_bucket_dec_read(&tb, 64*KB + 1));
+  tt_int_op(tb.read_bucket, OP_EQ, -1);
+
+  /* Overspending an already-empty bucket is not a new transition. */
+  tt_int_op(0, OP_EQ, token_bucket_dec_read(&tb, 1000));
+  tt_int_op(tb.read_bucket, OP_EQ, -1001);
+
+ done:
+  ;
+}
+
+/* Exercise token_bucket_refill(): partial refills, no-op refills, filling
+ * up to the burst, and reporting when an empty or overdrawn bucket becomes
+ * nonempty again.  Timestamps start at START_TS, so they also roll over. */
+static void
+test_bwmgt_token_buf_refill(void *arg)
+{
+  (void)arg;
+  token_bucket_t b;
+  const uint32_t SEC =
+    (uint32_t)monotime_msec_to_approx_coarse_stamp_units(1000);
+  token_bucket_init(&b, 16*KB, 64*KB, START_TS);
+
+  /* Make the buffer much emptier, then let one second elapse. */
+  token_bucket_dec_read(&b, 48*KB);
+  tt_int_op(b.read_bucket, OP_EQ, 16*KB);
+  tt_int_op(0, OP_EQ, token_bucket_refill(&b, START_TS + SEC));
+  tt_int_op(b.read_bucket, OP_GT, 32*KB - 300);
+  tt_int_op(b.read_bucket, OP_LT, 32*KB + 300);
+
+  /* Another half second. */
+  tt_int_op(0, OP_EQ, token_bucket_refill(&b, START_TS + SEC*3/2));
+  tt_int_op(b.read_bucket, OP_GT, 40*KB - 400);
+  tt_int_op(b.read_bucket, OP_LT, 40*KB + 400);
+  tt_uint_op(b.last_refilled_at_ts, OP_EQ, START_TS + SEC*3/2);
+
+  /* No time: nothing happens. */
+  {
+    /* read_bucket is signed, so the saved copy is signed too. */
+    const int32_t bucket_orig = b.read_bucket;
+    tt_int_op(0, OP_EQ, token_bucket_refill(&b, START_TS + SEC*3/2));
+    tt_int_op(b.read_bucket, OP_EQ, bucket_orig);
+  }
+
+  /* Another 30 seconds: fill the bucket. */
+  tt_int_op(0, OP_EQ, token_bucket_refill(&b, START_TS + SEC*3/2 + SEC*30));
+  tt_int_op(b.read_bucket, OP_EQ, b.burst);
+  tt_uint_op(b.last_refilled_at_ts, OP_EQ, START_TS + SEC*3/2 + SEC*30);
+
+  /* Another 30 seconds: nothing happens. */
+  tt_int_op(0, OP_EQ, token_bucket_refill(&b, START_TS + SEC*3/2 + SEC*60));
+  tt_int_op(b.read_bucket, OP_EQ, b.burst);
+  tt_uint_op(b.last_refilled_at_ts, OP_EQ, START_TS + SEC*3/2 + SEC*60);
+
+  /* Empty the bucket, let two seconds pass, and make sure that a refill is
+   * noticed. */
+  tt_int_op(1, OP_EQ, token_bucket_dec_read(&b, b.burst));
+  tt_int_op(0, OP_EQ, b.read_bucket);
+  tt_int_op(1, OP_EQ, token_bucket_refill(&b, START_TS + SEC*3/2 + SEC*61));
+  tt_int_op(0, OP_EQ, token_bucket_refill(&b, START_TS + SEC*3/2 + SEC*62));
+  tt_int_op(b.read_bucket, OP_GT, 32*KB-300);
+  tt_int_op(b.read_bucket, OP_LT, 32*KB+300);
+
+  /* Underflow the bucket, make sure we detect when it has tokens again. */
+  tt_int_op(1, OP_EQ, token_bucket_dec_read(&b, b.read_bucket+16*KB));
+  tt_int_op(-16*KB, OP_EQ, b.read_bucket);
+  // half a second passes...
+  tt_int_op(0, OP_EQ, token_bucket_refill(&b, START_TS + SEC*64));
+  tt_int_op(b.read_bucket, OP_GT, -8*KB-200);
+  tt_int_op(b.read_bucket, OP_LT, -8*KB+200);
+  // a second passes
+  tt_int_op(1, OP_EQ, token_bucket_refill(&b, START_TS + SEC*65));
+  tt_int_op(b.read_bucket, OP_GT, 8*KB-200);
+  tt_int_op(b.read_bucket, OP_LT, 8*KB+200);
+
+  // a ridiculous amount of time passes.  The last refill was at
+  // START_TS + SEC*65, which has wrapped around to a small value, so
+  // INT32_MAX lies far in its future.
+  tt_int_op(0, OP_EQ, token_bucket_refill(&b, INT32_MAX));
+  tt_int_op(b.read_bucket, OP_EQ, b.burst);
+
+ done:
+  ;
+}
+
+/* Build a testcase_t entry named <name> that runs test_bwmgt_<name>, with
+ * the remaining fields left as 0/NULL. */
+#define BWMGT(name)                                          \
+  { #name, test_bwmgt_ ## name , 0, NULL, NULL }
+
+/* The bandwidth-management test group ("bwmgt/" in test.c). */
+struct testcase_t bwmgt_tests[] = {
+  BWMGT(token_buf_init),
+  BWMGT(token_buf_adjust),
+  BWMGT(token_buf_dec),
+  BWMGT(token_buf_refill),
+  END_OF_TESTCASES
+};
+





More information about the tor-commits mailing list