[or-cvs] [tor/master 29/38] Convert bufferevents to use rate-limiting.

nickm at torproject.org nickm at torproject.org
Mon Sep 27 20:51:00 UTC 2010


Author: Nick Mathewson <nickm at torproject.org>
Date: Mon, 22 Feb 2010 13:59:34 -0500
Subject: Convert bufferevents to use rate-limiting.
Commit: ffd5070b04b3db4409d8e3dc933ffc7d12b5219d

This requires the latest Git version of Libevent as of 24 March 2010.
In the future, we'll just say it requires Libevent 2.0.5-alpha or
later.

Since Libevent doesn't yet support hierarchical rate limit groups,
there isn't yet support for tracking relayed-bytes separately when
using the bufferevent system.  If a future version does add support
for hierarchical buckets, we can add that back in.
---
 src/common/compat_libevent.c |   24 +++++++++
 src/common/compat_libevent.h |    5 ++
 src/common/tortls.c          |    3 +-
 src/or/config.c              |   11 ++++
 src/or/connection.c          |  118 ++++++++++++++++++++++++++++++++++++++++++
 src/or/connection.h          |    3 +
 src/or/connection_or.c       |   19 +++++++
 src/or/main.c                |   25 ++++++++-
 src/or/or.h                  |    6 ++
 9 files changed, 210 insertions(+), 4 deletions(-)

diff --git a/src/common/compat_libevent.c b/src/common/compat_libevent.c
index 250fa2b..2ae280e 100644
--- a/src/common/compat_libevent.c
+++ b/src/common/compat_libevent.c
@@ -551,3 +551,27 @@ periodic_timer_free(periodic_timer_t *timer)
   tor_free(timer);
 }
 
+#ifdef USE_BUFFEREVENTS
+static const struct timeval *one_tick = NULL;
+/**
+   DOCDOC
+*/
+const struct timeval *tor_libevent_get_one_tick_timeout(void)
+{
+
+  if (PREDICT_UNLIKELY(one_tick == NULL)) {
+    struct event_base *base = tor_libevent_get_base();
+    struct timeval tv;
+    if (TOR_LIBEVENT_TICKS_PER_SECOND == 1) {
+      tv.tv_sec = 1;
+      tv.tv_usec = 0;
+    } else {
+      tv.tv_sec = 0;
+      tv.tv_usec = 1000000 / TOR_LIBEVENT_TICKS_PER_SECOND;
+    }
+    one_tick = event_base_init_common_timeout(base, &tv);
+  }
+  return one_tick;
+}
+#endif
+
diff --git a/src/common/compat_libevent.h b/src/common/compat_libevent.h
index a4011e3..f483d6e 100644
--- a/src/common/compat_libevent.h
+++ b/src/common/compat_libevent.h
@@ -64,5 +64,10 @@ void tor_check_libevent_version(const char *m, int server,
 void tor_check_libevent_header_compatibility(void);
 const char *tor_libevent_get_version_str(void);
 
+#ifdef USE_BUFFEREVENTS
+#define TOR_LIBEVENT_TICKS_PER_SECOND 3
+const struct timeval *tor_libevent_get_one_tick_timeout(void);
+#endif
+
 #endif
 
diff --git a/src/common/tortls.c b/src/common/tortls.c
index 06533ca..3ae3ef8 100644
--- a/src/common/tortls.c
+++ b/src/common/tortls.c
@@ -1699,7 +1699,6 @@ tor_tls_init_bufferevent(tor_tls_t *tls, struct bufferevent *bufev_in,
                                        state,
                                        BEV_OPT_DEFER_CALLBACKS);
 #else
-  /* Disabled: just use filter for now. */
   if (bufev_in) {
     evutil_socket_t s = bufferevent_getfd(bufev_in);
     tor_assert(s == -1 || s == socket);
@@ -1715,7 +1714,7 @@ tor_tls_init_bufferevent(tor_tls_t *tls, struct bufferevent *bufev_in,
                                        tls->ssl,
                                        state,
                                        0);
-                                      //BEV_OPT_DEFER_CALLBACKS);
+                                       //BEV_OPT_DEFER_CALLBACKS);
 #endif
   return out;
 }
diff --git a/src/or/config.c b/src/or/config.c
index 8febe7a..fa2eb73 100644
--- a/src/or/config.c
+++ b/src/or/config.c
@@ -1231,6 +1231,17 @@ options_act(or_options_t *old_options)
   if (accounting_is_enabled(options))
     configure_accounting(time(NULL));
 
+#ifdef USE_BUFFEREVENTS
+  /* If we're using the bufferevents implementation and our rate limits
+   * changed, we need to tell the rate-limiting system about it. */
+  if (!old_options ||
+      old_options->BandwidthRate != options->BandwidthRate ||
+      old_options->BandwidthBurst != options->BandwidthBurst ||
+      old_options->RelayBandwidthRate != options->RelayBandwidthRate ||
+      old_options->RelayBandwidthBurst != options->RelayBandwidthBurst)
+    connection_bucket_init();
+#endif
+
   /* Change the cell EWMA settings */
   cell_ewma_set_scale_factor(options, networkstatus_get_latest_consensus());
 
diff --git a/src/or/connection.c b/src/or/connection.c
index 2944a0d..b6f0d5d 100644
--- a/src/or/connection.c
+++ b/src/or/connection.c
@@ -49,8 +49,10 @@ static void connection_init(time_t now, connection_t *conn, int type,
 static int connection_init_accepted_conn(connection_t *conn,
                                          uint8_t listener_type);
 static int connection_handle_listener_read(connection_t *conn, int new_type);
+#ifndef USE_BUFFEREVENTS
 static int connection_bucket_should_increase(int bucket,
                                              or_connection_t *conn);
+#endif
 static int connection_finished_flushing(connection_t *conn);
 static int connection_flushed_some(connection_t *conn);
 static int connection_finished_connecting(connection_t *conn);
@@ -199,6 +201,7 @@ connection_type_uses_bufferevent(connection_t *conn)
     case CONN_TYPE_DIR:
     case CONN_TYPE_CONTROL:
     case CONN_TYPE_OR:
+    case CONN_TYPE_CPUWORKER:
       return 1;
     default:
       return 0;
@@ -452,6 +455,8 @@ _connection_free(connection_t *conn)
   tor_free(conn->read_event); /* Probably already freed by connection_free. */
   tor_free(conn->write_event); /* Probably already freed by connection_free. */
   IF_HAS_BUFFEREVENT(conn, {
+      /* XXXX this is a workaround. */
+      bufferevent_setcb(conn->bufev, NULL, NULL, NULL, NULL);
       bufferevent_free(conn->bufev);
       conn->bufev = NULL;
   });
@@ -481,6 +486,11 @@ _connection_free(connection_t *conn)
     log_warn(LD_BUG, "called on OR conn with non-zeroed identity_digest");
     connection_or_remove_from_identity_map(TO_OR_CONN(conn));
   }
+#ifdef USE_BUFFEREVENTS
+  if (conn->type == CONN_TYPE_OR && TO_OR_CONN(conn)->bucket_cfg) {
+    ev_token_bucket_cfg_free(TO_OR_CONN(conn)->bucket_cfg);
+  }
+#endif
 
   memset(mem, 0xCC, memlen); /* poison memory */
   tor_free(mem);
@@ -1945,6 +1955,9 @@ connection_is_rate_limited(connection_t *conn)
     return 1;
 }
 
+#ifdef USE_BUFFEREVENTS
+static struct bufferevent_rate_limit_group *global_rate_limit = NULL;
+#else
 extern int global_read_bucket, global_write_bucket;
 extern int global_relayed_read_bucket, global_relayed_write_bucket;
 
@@ -1952,11 +1965,13 @@ extern int global_relayed_read_bucket, global_relayed_write_bucket;
  * we are likely to run dry again this second, so be stingy with the
  * tokens we just put in. */
 static int write_buckets_empty_last_second = 0;
+#endif
 
 /** How many seconds of no active local circuits will make the
  * connection revert to the "relayed" bandwidth class? */
 #define CLIENT_IDLE_TIME_FOR_PRIORITY 30
 
+#ifndef USE_BUFFEREVENTS
 /** Return 1 if <b>conn</b> should use tokens from the "relayed"
  * bandwidth rates, else 0. Currently, only OR conns with bandwidth
  * class 1, and directory conns that are serving data out, count.
@@ -2067,6 +2082,20 @@ connection_bucket_write_limit(connection_t *conn, time_t now)
   return connection_bucket_round_robin(base, priority,
                                        global_bucket, conn_bucket);
 }
+#else
+static ssize_t
+connection_bucket_read_limit(connection_t *conn, time_t now)
+{
+  (void) now;
+  return bufferevent_get_max_to_read(conn->bufev);
+}
+ssize_t
+connection_bucket_write_limit(connection_t *conn, time_t now)
+{
+  (void) now;
+  return bufferevent_get_max_to_write(conn->bufev);
+}
+#endif
 
 /** Return 1 if the global write buckets are low enough that we
  * shouldn't send <b>attempt</b> bytes of low-priority directory stuff
@@ -2091,8 +2120,12 @@ connection_bucket_write_limit(connection_t *conn, time_t now)
 int
 global_write_bucket_low(connection_t *conn, size_t attempt, int priority)
 {
+#ifdef USE_BUFFEREVENTS
+  ssize_t smaller_bucket = bufferevent_get_max_to_write(conn->bufev);
+#else
   int smaller_bucket = global_write_bucket < global_relayed_write_bucket ?
                        global_write_bucket : global_relayed_write_bucket;
+#endif
   if (authdir_mode(get_options()) && priority>1)
     return 0; /* there's always room to answer v2 if we're an auth dir */
 
@@ -2102,8 +2135,10 @@ global_write_bucket_low(connection_t *conn, size_t attempt, int priority)
   if (smaller_bucket < (int)attempt)
     return 1; /* not enough space no matter the priority */
 
+#ifndef USE_BUFFEREVENTS
   if (write_buckets_empty_last_second)
     return 1; /* we're already hitting our limits, no more please */
+#endif
 
   if (priority == 1) { /* old-style v1 query */
     /* Could we handle *two* of these requests within the next two seconds? */
@@ -2119,6 +2154,7 @@ global_write_bucket_low(connection_t *conn, size_t attempt, int priority)
   return 0;
 }
 
+#ifndef USE_BUFFEREVENTS
 /** We just read <b>num_read</b> and wrote <b>num_written</b> bytes
  * onto <b>conn</b>. Decrement buckets appropriately. */
 static void
@@ -2362,6 +2398,88 @@ connection_bucket_should_increase(int bucket, or_connection_t *conn)
 
   return 1;
 }
+#else
+
+static void
+connection_buckets_decrement(connection_t *conn, time_t now,
+                             size_t num_read, size_t num_written)
+{
+  (void) conn;
+  (void) now;
+  (void) num_read;
+  (void) num_written;
+  /* Libevent does this for us. */
+}
+void
+connection_bucket_refill(int seconds_elapsed, time_t now)
+{
+  (void) seconds_elapsed;
+  (void) now;
+  /* Libevent does this for us. */
+}
+void
+connection_bucket_init(void)
+{
+  or_options_t *options = get_options();
+  const struct timeval *tick = tor_libevent_get_one_tick_timeout();
+  struct ev_token_bucket_cfg *bucket_cfg;
+
+  uint64_t rate, burst;
+  if (options->RelayBandwidthRate) {
+    rate = options->RelayBandwidthRate;
+    burst = options->RelayBandwidthBurst;
+  } else {
+    rate = options->BandwidthRate;
+    burst = options->BandwidthBurst;
+  }
+
+  rate /= TOR_LIBEVENT_TICKS_PER_SECOND;
+  bucket_cfg = ev_token_bucket_cfg_new((uint32_t)rate, (uint32_t)burst,
+                                       (uint32_t)rate, (uint32_t)burst,
+                                       tick);
+
+  if (!global_rate_limit) {
+    global_rate_limit =
+      bufferevent_rate_limit_group_new(tor_libevent_get_base(), bucket_cfg);
+  } else {
+    bufferevent_rate_limit_group_set_cfg(global_rate_limit, bucket_cfg);
+  }
+  ev_token_bucket_cfg_free(bucket_cfg);
+}
+
+void
+connection_get_rate_limit_totals(uint64_t *read_out, uint64_t *written_out)
+{
+  if (global_rate_limit == NULL) {
+    *read_out = *written_out = 0;
+  } else {
+    bufferevent_rate_limit_group_get_totals(
+      global_rate_limit, read_out, written_out);
+  }
+}
+
+/** DOCDOC */
+void
+connection_enable_rate_limiting(connection_t *conn)
+{
+  if (conn->bufev) {
+    if (!global_rate_limit)
+      connection_bucket_init();
+    bufferevent_add_to_rate_limit_group(conn->bufev, global_rate_limit);
+  }
+}
+
+static void
+connection_consider_empty_write_buckets(connection_t *conn)
+{
+  (void) conn;
+}
+static void
+connection_consider_empty_read_buckets(connection_t *conn)
+{
+  (void) conn;
+}
+#endif
 
 /** Read bytes from conn-\>s and process them.
  *
diff --git a/src/or/connection.h b/src/or/connection.h
index 83aec0e..a40b1a5 100644
--- a/src/or/connection.h
+++ b/src/or/connection.h
@@ -144,6 +144,9 @@ void connection_handle_read_cb(struct bufferevent *bufev, void *arg);
 void connection_handle_write_cb(struct bufferevent *bufev, void *arg);
 void connection_handle_event_cb(struct bufferevent *bufev, short event,
                                  void *arg);
+void connection_get_rate_limit_totals(uint64_t *read_out,
+                                      uint64_t *written_out);
+void connection_enable_rate_limiting(connection_t *conn);
 #else
 #define connection_type_uses_bufferevent(c) (0)
 #endif
diff --git a/src/or/connection_or.c b/src/or/connection_or.c
index 2fbc230..044197d 100644
--- a/src/or/connection_or.c
+++ b/src/or/connection_or.c
@@ -388,6 +388,21 @@ connection_or_update_token_buckets_helper(or_connection_t *conn, int reset,
 
   conn->bandwidthrate = rate;
   conn->bandwidthburst = burst;
+#ifdef USE_BUFFEREVENTS
+  {
+    const struct timeval *tick = tor_libevent_get_one_tick_timeout();
+    struct ev_token_bucket_cfg *cfg, *old_cfg;
+    int rate_per_tick = rate / TOR_LIBEVENT_TICKS_PER_SECOND;
+    cfg = ev_token_bucket_cfg_new(rate_per_tick, burst, rate_per_tick,
+                                  burst, tick);
+    old_cfg = conn->bucket_cfg;
+    if (conn->_base.bufev)
+      bufferevent_set_rate_limit(conn->_base.bufev, cfg);
+    if (old_cfg)
+      ev_token_bucket_cfg_free(old_cfg);
+    conn->bucket_cfg = cfg;
+  }
+#else
   if (reset) { /* set up the token buckets to be full */
     conn->read_bucket = conn->write_bucket = burst;
     return;
@@ -398,6 +413,7 @@ connection_or_update_token_buckets_helper(or_connection_t *conn, int reset,
     conn->read_bucket = burst;
   if (conn->write_bucket > burst)
     conn->write_bucket = burst;
+#endif
 }
 
 /** Either our set of relays or our per-conn rate limits have changed.
@@ -879,6 +895,9 @@ connection_tls_start_handshake(or_connection_t *conn, int receiving)
       return -1;
     }
     conn->_base.bufev = b;
+    if (conn->bucket_cfg)
+      bufferevent_set_rate_limit(conn->_base.bufev, conn->bucket_cfg);
+    connection_enable_rate_limiting(TO_CONN(conn));
     bufferevent_setcb(b, connection_handle_read_cb,
                       connection_handle_write_cb,
                       connection_or_handle_event_cb,
diff --git a/src/or/main.c b/src/or/main.c
index f6f26b0..1979529 100644
--- a/src/or/main.c
+++ b/src/or/main.c
@@ -76,6 +76,7 @@ static int connection_should_read_from_linked_conn(connection_t *conn);
 
 /********* START VARIABLES **********/
 
+#ifndef USE_BUFFEREVENTS
 int global_read_bucket; /**< Max number of bytes I can read this second. */
 int global_write_bucket; /**< Max number of bytes I can write this second. */
 
@@ -83,13 +84,17 @@ int global_write_bucket; /**< Max number of bytes I can write this second. */
 int global_relayed_read_bucket;
 /** Max number of relayed (bandwidth class 1) bytes I can write this second. */
 int global_relayed_write_bucket;
-
 /** What was the read bucket before the last second_elapsed_callback() call?
  * (used to determine how many bytes we've read). */
 static int stats_prev_global_read_bucket;
 /** What was the write bucket before the last second_elapsed_callback() call?
  * (used to determine how many bytes we've written). */
 static int stats_prev_global_write_bucket;
+#else
+static uint64_t stats_prev_n_read = 0;
+static uint64_t stats_prev_n_written = 0;
+#endif
+
 /* XXX we might want to keep stats about global_relayed_*_bucket too. Or not.*/
 /** How many bytes have we read since we started the process? */
 static uint64_t stats_n_bytes_read = 0;
@@ -1395,6 +1400,9 @@ second_elapsed_callback(periodic_timer_t *timer, void *arg)
   size_t bytes_written;
   size_t bytes_read;
   int seconds_elapsed;
+#ifdef USE_BUFFEREVENTS
+  uint64_t cur_read,cur_written;
+#endif
   or_options_t *options = get_options();
   (void)timer;
   (void)arg;
@@ -1406,9 +1414,15 @@ second_elapsed_callback(periodic_timer_t *timer, void *arg)
   update_approx_time(now);
 
   /* the second has rolled over. check more stuff. */
+  seconds_elapsed = current_second ? (int)(now - current_second) : 0;
+#ifdef USE_BUFFEREVENTS
+  connection_get_rate_limit_totals(&cur_read, &cur_written);
+  bytes_written = ((size_t)cur_written) - stats_prev_n_written;
+  bytes_read = ((size_t)cur_read) - stats_prev_n_read;
+#else
   bytes_written = stats_prev_global_write_bucket - global_write_bucket;
   bytes_read = stats_prev_global_read_bucket - global_read_bucket;
-  seconds_elapsed = current_second ? (int)(now - current_second) : 0;
+#endif
   stats_n_bytes_read += bytes_read;
   stats_n_bytes_written += bytes_written;
   if (accounting_is_enabled(options) && seconds_elapsed >= 0)
@@ -1418,8 +1432,13 @@ second_elapsed_callback(periodic_timer_t *timer, void *arg)
 
   if (seconds_elapsed > 0)
     connection_bucket_refill(seconds_elapsed, now);
+#ifdef USE_BUFFEREVENTS
+  stats_prev_n_written = cur_written;
+  stats_prev_n_read = cur_read;
+#else
   stats_prev_global_read_bucket = global_read_bucket;
   stats_prev_global_write_bucket = global_write_bucket;
+#endif
 
   if (server_mode(options) &&
       !we_are_hibernating() &&
@@ -1620,8 +1639,10 @@ do_main_loop(void)
 
   /* Set up our buckets */
   connection_bucket_init();
+#ifndef USE_BUFFEREVENTS
   stats_prev_global_read_bucket = global_read_bucket;
   stats_prev_global_write_bucket = global_write_bucket;
+#endif
 
   /* initialize the bootstrap status events to know we're starting up */
   control_event_bootstrap(BOOTSTRAP_STATUS_STARTING, 0);
diff --git a/src/or/or.h b/src/or/or.h
index 8f87573..f4f511a 100644
--- a/src/or/or.h
+++ b/src/or/or.h
@@ -1078,10 +1078,16 @@ typedef struct or_connection_t {
   /* bandwidth* and *_bucket only used by ORs in OPEN state: */
   int bandwidthrate; /**< Bytes/s added to the bucket. (OPEN ORs only.) */
   int bandwidthburst; /**< Max bucket size for this conn. (OPEN ORs only.) */
+#ifndef USE_BUFFEREVENTS
   int read_bucket; /**< When this hits 0, stop receiving. Every second we
                     * add 'bandwidthrate' to this, capping it at
                     * bandwidthburst. (OPEN ORs only) */
   int write_bucket; /**< When this hits 0, stop writing. Like read_bucket. */
+#else
+  /** DOCDOC */
+  /* XXXX we could share this among all connections. */
+  struct ev_token_bucket_cfg *bucket_cfg;
+#endif
   int n_circuits; /**< How many circuits use this connection as p_conn or
                    * n_conn ? */
 
-- 
1.7.1




More information about the tor-commits mailing list