[tor-commits] [tor/master] Refactor our logic for sending events to controllers

nickm at torproject.org nickm at torproject.org
Tue Aug 18 13:10:55 UTC 2015


commit bab221f1131e3c0552b3564ea72800d1d4a8facf
Author: Nick Mathewson <nickm at torproject.org>
Date:   Thu Jul 30 10:43:37 2015 -0400

    Refactor our logic for sending events to controllers
    
    Previously we'd put these strings right on the controllers'
    outbufs. But this could cause some trouble, for these reasons:
    
      1) Calling the network stack directly here would make a huge portion
         of our networking code (from which so much of the rest of Tor is
         reachable) reachable from everything that potentially generated
         controller events.
    
      2) Since _some_ events (EVENT_ERR for instance) would cause us to
         call connection_flush(), every control_event_* function would
         appear to be able to reach even _more_ of the network stack in
         our callgraph.
    
      3) Every time we generated an event, we'd have to walk the whole
         connection list, which isn't exactly fast.
    
    This is an attempt to break down the "blob" described in
    http://archives.seul.org/tor/dev/Mar-2015/msg00197.html -- the set of
    functions from which nearly all the other functions in Tor are
    reachable.
    
    Closes ticket 16695.
---
 changes/decouple_control_events |    8 ++
 src/or/control.c                |  272 ++++++++++++++++++++++++++++++++-------
 src/or/control.h                |   10 ++
 src/or/main.c                   |    2 +-
 src/test/test_hs.c              |   12 +-
 src/test/test_pt.c              |   12 +-
 6 files changed, 252 insertions(+), 64 deletions(-)

diff --git a/changes/decouple_control_events b/changes/decouple_control_events
new file mode 100644
index 0000000..67c9c11
--- /dev/null
+++ b/changes/decouple_control_events
@@ -0,0 +1,8 @@
+  o Code simplification and refactoring:
+    - When generating an event to send to the controller, we no longer
+      put the event over the network immediately.  Instead, we queue
+      these events, and use a Libevent callback to deliver them.
+      This change simplifies Tor's callgraph by reducing the number
+      of functions from which all other Tor functions are reachable.
+      Closes ticket 16695.
+
diff --git a/src/or/control.c b/src/or/control.c
index 7a113f2..a17f697 100644
--- a/src/or/control.c
+++ b/src/or/control.c
@@ -20,6 +20,7 @@
 #include "circuitstats.h"
 #include "circuituse.h"
 #include "command.h"
+#include "compat_libevent.h"
 #include "config.h"
 #include "confparse.h"
 #include "connection.h"
@@ -50,6 +51,12 @@
 #include <sys/resource.h>
 #endif
 
+#ifdef HAVE_EVENT2_EVENT_H
+#include <event2/event.h>
+#else
+#include <event.h>
+#endif
+
 #include "crypto_s2k.h"
 #include "procmon.h"
 
@@ -181,6 +188,8 @@ static void orconn_target_get_name(char *buf, size_t len,
 static int get_cached_network_liveness(void);
 static void set_cached_network_liveness(int liveness);
 
+static void flush_queued_events_cb(evutil_socket_t fd, short what, void *arg);
+
 /** Given a control event code for a message event, return the corresponding
  * log severity. */
 static INLINE int
@@ -578,6 +587,156 @@ send_control_done(control_connection_t *conn)
   connection_write_str_to_buf("250 OK\r\n", conn);
 }
 
+/** Represents an event that's queued to be sent to one or more
+ * controllers. */
+typedef struct queued_event_s {
+  uint16_t event;
+  char *msg;
+} queued_event_t;
+
+/** If this is greater than 0, we don't allow new events to be queued. */
+static int block_event_queue = 0;
+
+/** Holds a smartlist of queued_event_t objects that may need to be sent
+ * to one or more controllers */
+static smartlist_t *queued_control_events = NULL;
+
+/** An event that should fire in order to flush the contents of
+ * queued_control_events. */
+static struct event *flush_queued_events_event = NULL;
+
+/** Helper: inserts an event on the list of events queued to be sent to
+ * one or more controllers, and schedules the events to be flushed if needed.
+ *
+ * This function takes ownership of <b>msg</b>, and may free it.
+ *
+ * We queue these events rather than send them immediately in order to break
+ * the dependency in our callgraph from code that generates events for the
+ * controller, and the network layer at large.  Otherwise, nearly every
+ * interesting part of Tor would potentially call every other interesting part
+ * of Tor.
+ */
+MOCK_IMPL(STATIC void,
+queue_control_event_string,(uint16_t event, char *msg))
+{
+  if (PREDICT_UNLIKELY(queued_control_events == NULL)) {
+    queued_control_events = smartlist_new();
+  }
+
+  /* This is redundant with checks done elsewhere, but it's a last-ditch
+   * attempt to avoid queueing something we shouldn't have to queue. */
+  if (PREDICT_UNLIKELY( ! EVENT_IS_INTERESTING(event) )) {
+    tor_free(msg);
+    return;
+  }
+  if (block_event_queue) {
+    tor_free(msg);
+    return;
+  }
+
+  /* No queueing an event while queueing an event */
+  ++block_event_queue;
+
+  queued_event_t *ev = tor_malloc(sizeof(*ev));
+  ev->event = event;
+  ev->msg = msg;
+
+  smartlist_add(queued_control_events, ev);
+
+  /* We just put the first event on the queue; mark the queue to be
+   * flushed.
+   */
+  if (smartlist_len(queued_control_events) == 1) {
+    if (PREDICT_UNLIKELY(flush_queued_events_event == NULL)) {
+      struct event_base *b = tor_libevent_get_base();
+      tor_assert(b);
+      flush_queued_events_event = tor_event_new(b,
+                                                -1, 0, flush_queued_events_cb,
+                                                NULL);
+      tor_assert(flush_queued_events_event);
+    }
+    event_active(flush_queued_events_event, EV_READ, 1);
+  }
+
+  --block_event_queue;
+}
+
+/** Release all storage held by <b>ev</b>. */
+static void
+queued_event_free(queued_event_t *ev)
+{
+  if (ev == NULL)
+    return;
+
+  tor_free(ev->msg);
+  tor_free(ev);
+}
+
+/** Send every queued event to every controller that's interested in it,
+ * and remove the events from the queue; a no-op if no queue exists yet.
+ * If <b>force</b> is true, also make all controllers flush their data
+ * out immediately, since we may be about to shut down. */
+static void
+queued_events_flush_all(int force)
+{
+  if (PREDICT_UNLIKELY(queued_control_events == NULL)) {
+    return;
+  }
+  /* Allocate only after the early return above, so we never leak it. */
+  smartlist_t *all_conns = get_connection_array();
+  smartlist_t *controllers = smartlist_new();
+
+  /* No queueing an event while flushing events. */
+  ++block_event_queue;
+
+  /* Gather all the controllers that will care... */
+  SMARTLIST_FOREACH_BEGIN(all_conns, connection_t *, conn) {
+    if (conn->type == CONN_TYPE_CONTROL &&
+        !conn->marked_for_close &&
+        conn->state == CONTROL_CONN_STATE_OPEN) {
+      control_connection_t *control_conn = TO_CONTROL_CONN(conn);
+
+      smartlist_add(controllers, control_conn);
+    }
+  } SMARTLIST_FOREACH_END(conn);
+
+  SMARTLIST_FOREACH_BEGIN(queued_control_events, queued_event_t *, ev) {
+    const event_mask_t bit = ((event_mask_t)1) << ev->event;
+    const size_t msg_len = strlen(ev->msg);
+    SMARTLIST_FOREACH_BEGIN(controllers, control_connection_t *,
+                            control_conn) {
+      if (control_conn->event_mask & bit) {
+        connection_write_to_buf(ev->msg, msg_len, TO_CONN(control_conn));
+      }
+    } SMARTLIST_FOREACH_END(control_conn);
+
+    queued_event_free(ev);
+  } SMARTLIST_FOREACH_END(ev);
+
+  if (force) {
+    SMARTLIST_FOREACH_BEGIN(controllers, control_connection_t *,
+                            control_conn) {
+      connection_flush(TO_CONN(control_conn));
+    } SMARTLIST_FOREACH_END(control_conn);
+  }
+
+  smartlist_clear(queued_control_events);
+  smartlist_free(controllers);
+
+  --block_event_queue;
+}
+
+/** Libevent callback: Flushes pending events to controllers that are
+ * interested in them */
+static void
+flush_queued_events_cb(evutil_socket_t fd, short what, void *arg)
+{
+  (void) fd;
+  (void) what;
+  (void) arg;
+  queued_events_flush_all(0);
+}
+
 /** Send an event to all v1 controllers that are listening for code
  * <b>event</b>.  The event's body is given by <b>msg</b>.
  *
@@ -592,32 +751,9 @@ MOCK_IMPL(STATIC void,
 send_control_event_string,(uint16_t event, event_format_t which,
                            const char *msg))
 {
-  smartlist_t *conns = get_connection_array();
-  (void)which;
+  (void) which;
   tor_assert(event >= EVENT_MIN_ && event <= EVENT_MAX_);
-
-  SMARTLIST_FOREACH_BEGIN(conns, connection_t *, conn) {
-    if (conn->type == CONN_TYPE_CONTROL &&
-        !conn->marked_for_close &&
-        conn->state == CONTROL_CONN_STATE_OPEN) {
-      control_connection_t *control_conn = TO_CONTROL_CONN(conn);
-
-      if (control_conn->event_mask & (((event_mask_t)1)<<event)) {
-        int is_err = 0;
-        connection_write_to_buf(msg, strlen(msg), TO_CONN(control_conn));
-        if (event == EVENT_ERR_MSG)
-          is_err = 1;
-        else if (event == EVENT_STATUS_GENERAL)
-          is_err = !strcmpstart(msg, "STATUS_GENERAL ERR ");
-        else if (event == EVENT_STATUS_CLIENT)
-          is_err = !strcmpstart(msg, "STATUS_CLIENT ERR ");
-        else if (event == EVENT_STATUS_SERVER)
-          is_err = !strcmpstart(msg, "STATUS_SERVER ERR ");
-        if (is_err)
-          connection_flush(TO_CONN(control_conn));
-      }
-    }
-  } SMARTLIST_FOREACH_END(conn);
+  queue_control_event_string(event, tor_strdup(msg));
 }
 
 /** Helper for send_control_event and control_event_status:
@@ -628,6 +764,8 @@ static void
 send_control_event_impl(uint16_t event, event_format_t which,
                          const char *format, va_list ap)
 {
+  (void) which;
+
   char *buf = NULL;
   int len;
 
@@ -637,9 +775,7 @@ send_control_event_impl(uint16_t event, event_format_t which,
     return;
   }
 
-  send_control_event_string(event, which|ALL_FORMATS, buf);
-
-  tor_free(buf);
+  queue_control_event_string(event, buf);
 }
 
 /** Send an event to all v1 controllers that are listening for code
@@ -5032,6 +5168,10 @@ control_event_logmsg(int severity, uint32_t domain, const char *msg)
     }
     ++disable_log_messages;
     send_control_event(event, ALL_FORMATS, "650 %s %s\r\n", s, b?b:msg);
+    if (severity == LOG_ERR) {
+      /* Force a flush, since we may be about to die horribly */
+      queued_events_flush_all(1);
+    }
     --disable_log_messages;
     tor_free(b);
   }
@@ -5401,19 +5541,35 @@ control_event_status(int type, int severity, const char *format, va_list args)
   return 0;
 }
 
+#define CONTROL_EVENT_STATUS_BODY(event, sev)                   \
+  int r;                                                        \
+  do {                                                          \
+    va_list ap;                                                 \
+    if (!EVENT_IS_INTERESTING(event))                           \
+      return 0;                                                 \
+                                                                \
+    va_start(ap, format);                                       \
+    r = control_event_status((event), (sev), format, ap);       \
+    va_end(ap);                                                 \
+  } while (0)
+
 /** Format and send an EVENT_STATUS_GENERAL event whose main text is obtained
  * by formatting the arguments using the printf-style <b>format</b>. */
 int
 control_event_general_status(int severity, const char *format, ...)
 {
-  va_list ap;
-  int r;
-  if (!EVENT_IS_INTERESTING(EVENT_STATUS_GENERAL))
-    return 0;
+  CONTROL_EVENT_STATUS_BODY(EVENT_STATUS_GENERAL, severity);
+  return r;
+}
 
-  va_start(ap, format);
-  r = control_event_status(EVENT_STATUS_GENERAL, severity, format, ap);
-  va_end(ap);
+/** Format and send an EVENT_STATUS_GENERAL LOG_ERR event, and flush it to the
+ * controller(s) immediately. */
+int
+control_event_general_error(const char *format, ...)
+{
+  CONTROL_EVENT_STATUS_BODY(EVENT_STATUS_GENERAL, LOG_ERR);
+  /* Force a flush, since we may be about to die horribly */
+  queued_events_flush_all(1);
   return r;
 }
 
@@ -5422,14 +5578,18 @@ control_event_general_status(int severity, const char *format, ...)
 int
 control_event_client_status(int severity, const char *format, ...)
 {
-  va_list ap;
-  int r;
-  if (!EVENT_IS_INTERESTING(EVENT_STATUS_CLIENT))
-    return 0;
+  CONTROL_EVENT_STATUS_BODY(EVENT_STATUS_CLIENT, severity);
+  return r;
+}
 
-  va_start(ap, format);
-  r = control_event_status(EVENT_STATUS_CLIENT, severity, format, ap);
-  va_end(ap);
+/** Format and send an EVENT_STATUS_CLIENT LOG_ERR event, and flush it to the
+ * controller(s) immediately. */
+int
+control_event_client_error(const char *format, ...)
+{
+  CONTROL_EVENT_STATUS_BODY(EVENT_STATUS_CLIENT, LOG_ERR);
+  /* Force a flush, since we may be about to die horribly */
+  queued_events_flush_all(1);
   return r;
 }
 
@@ -5438,14 +5598,18 @@ control_event_client_status(int severity, const char *format, ...)
 int
 control_event_server_status(int severity, const char *format, ...)
 {
-  va_list ap;
-  int r;
-  if (!EVENT_IS_INTERESTING(EVENT_STATUS_SERVER))
-    return 0;
+  CONTROL_EVENT_STATUS_BODY(EVENT_STATUS_SERVER, severity);
+  return r;
+}
 
-  va_start(ap, format);
-  r = control_event_status(EVENT_STATUS_SERVER, severity, format, ap);
-  va_end(ap);
+/** Format and send an EVENT_STATUS_SERVER LOG_ERR event, and flush it to the
+ * controller(s) immediately. */
+int
+control_event_server_error(const char *format, ...)
+{
+  CONTROL_EVENT_STATUS_BODY(EVENT_STATUS_SERVER, LOG_ERR);
+  /* Force a flush, since we may be about to die horribly */
+  queued_events_flush_all(1);
   return r;
 }
 
@@ -6262,6 +6426,16 @@ control_free_all(void)
     SMARTLIST_FOREACH(detached_onion_services, char *, cp, tor_free(cp));
     smartlist_free(detached_onion_services);
   }
+  if (queued_control_events) {
+    SMARTLIST_FOREACH(queued_control_events, queued_event_t *, ev,
+                      queued_event_free(ev));
+    smartlist_free(queued_control_events);
+    queued_control_events = NULL;
+  }
+  if (flush_queued_events_event) {
+    tor_event_free(flush_queued_events_event);
+    flush_queued_events_event = NULL;
+  }
 }
 
 #ifdef TOR_UNIT_TESTS
diff --git a/src/or/control.h b/src/or/control.h
index 2d02443..0840305 100644
--- a/src/or/control.h
+++ b/src/or/control.h
@@ -78,6 +78,14 @@ int control_event_client_status(int severity, const char *format, ...)
   CHECK_PRINTF(2,3);
 int control_event_server_status(int severity, const char *format, ...)
   CHECK_PRINTF(2,3);
+
+int control_event_general_error(const char *format, ...)
+  CHECK_PRINTF(1,2);
+int control_event_client_error(const char *format, ...)
+  CHECK_PRINTF(1,2);
+int control_event_server_error(const char *format, ...)
+  CHECK_PRINTF(1,2);
+
 int control_event_guard(const char *nickname, const char *digest,
                         const char *status);
 int control_event_conf_changed(const smartlist_t *elements);
@@ -215,6 +223,8 @@ typedef int event_format_t;
 MOCK_DECL(STATIC void,
 send_control_event_string,(uint16_t event, event_format_t which,
                            const char *msg));
+MOCK_DECL(STATIC void,
+          queue_control_event_string,(uint16_t event, char *msg));
 
 void control_testing_set_global_event_mask(uint64_t mask);
 #endif
diff --git a/src/or/main.c b/src/or/main.c
index 5bff82b..ee56e10 100644
--- a/src/or/main.c
+++ b/src/or/main.c
@@ -1007,7 +1007,7 @@ directory_all_unreachable(time_t now)
     connection_mark_unattached_ap(entry_conn,
                                   END_STREAM_REASON_NET_UNREACHABLE);
   }
-  control_event_general_status(LOG_ERR, "DIR_ALL_UNREACHABLE");
+  control_event_general_error("DIR_ALL_UNREACHABLE");
 }
 
 /** This function is called whenever we successfully pull down some new
diff --git a/src/test/test_hs.c b/src/test/test_hs.c
index 6d01798..126e211 100644
--- a/src/test/test_hs.c
+++ b/src/test/test_hs.c
@@ -102,13 +102,11 @@ static char *received_msg = NULL;
 /** Mock function for send_control_event_string
  */
 static void
-send_control_event_string_replacement(uint16_t event, event_format_t which,
-                                      const char *msg)
+queue_control_event_string_replacement(uint16_t event, char *msg)
 {
   (void) event;
-  (void) which;
   tor_free(received_msg);
-  received_msg = tor_strdup(msg);
+  received_msg = msg;
 }
 
 /** Mock function for node_describe_longname_by_id, it returns either
@@ -141,8 +139,8 @@ test_hs_desc_event(void *arg)
   char desc_id_base32[REND_DESC_ID_V2_LEN_BASE32 + 1];
 
   (void) arg;
-  MOCK(send_control_event_string,
-       send_control_event_string_replacement);
+  MOCK(queue_control_event_string,
+       queue_control_event_string_replacement);
   MOCK(node_describe_longname_by_id,
        node_describe_longname_by_id_replacement);
 
@@ -225,7 +223,7 @@ test_hs_desc_event(void *arg)
   smartlist_free(rend_query.hsdirs_fp);
 
  done:
-  UNMOCK(send_control_event_string);
+  UNMOCK(queue_control_event_string);
   UNMOCK(node_describe_longname_by_id);
   tor_free(received_msg);
 }
diff --git a/src/test/test_pt.c b/src/test/test_pt.c
index 996ef86..6c9aefc 100644
--- a/src/test/test_pt.c
+++ b/src/test/test_pt.c
@@ -333,15 +333,13 @@ static uint16_t controlevent_event = 0;
 static smartlist_t *controlevent_msgs = NULL;
 
 static void
-send_control_event_string_replacement(uint16_t event, event_format_t which,
-                                      const char *msg)
+queue_control_event_string_replacement(uint16_t event, char *msg)
 {
-  (void) which;
   ++controlevent_n;
   controlevent_event = event;
   if (!controlevent_msgs)
     controlevent_msgs = smartlist_new();
-  smartlist_add(controlevent_msgs, tor_strdup(msg));
+  smartlist_add(controlevent_msgs, msg);
 }
 
 /* Test the configure_proxy() function. */
@@ -360,8 +358,8 @@ test_pt_configure_proxy(void *arg)
        tor_process_handle_destroy_replacement);
   MOCK(get_or_state,
        get_or_state_replacement);
-  MOCK(send_control_event_string,
-       send_control_event_string_replacement);
+  MOCK(queue_control_event_string,
+       queue_control_event_string_replacement);
 
   control_testing_set_global_event_mask(EVENT_TRANSPORT_LAUNCHED);
 
@@ -435,7 +433,7 @@ test_pt_configure_proxy(void *arg)
   UNMOCK(tor_get_lines_from_handle);
   UNMOCK(tor_process_handle_destroy);
   UNMOCK(get_or_state);
-  UNMOCK(send_control_event_string);
+  UNMOCK(queue_control_event_string);
   if (controlevent_msgs) {
     SMARTLIST_FOREACH(controlevent_msgs, char *, cp, tor_free(cp));
     smartlist_free(controlevent_msgs);





More information about the tor-commits mailing list