[tor-commits] [tor/master] Rework orconn tracking to use pubsub

dgoulet at torproject.org dgoulet at torproject.org
Tue Jun 11 16:06:55 UTC 2019


commit a8c0f4ddfe3f0a63bd499959c8d921346aa9766e
Author: Taylor Yu <catalyst at torproject.org>
Date:   Fri Mar 8 09:41:43 2019 -0600

    Rework orconn tracking to use pubsub
    
    Part of ticket 29976.
---
 src/app/main/main.c                 |  1 +
 src/core/or/connection_or.c         | 28 ++++++------
 src/core/or/orconn_event.c          | 90 ++++++++++++++++++++++---------------
 src/core/or/orconn_event.h          | 31 +++----------
 src/feature/control/btrack.c        |  8 ++++
 src/feature/control/btrack_orconn.c | 49 ++++++++++----------
 src/feature/control/btrack_orconn.h |  3 ++
 src/test/test_btrack.c              | 65 +++++++++++++++++----------
 src/test/test_controller_events.c   | 51 ++++++++++++---------
 src/test/test_extorport.c           |  2 +-
 src/test/test_helpers.c             | 54 ++++++++++++++++++++++
 src/test/test_helpers.h             |  6 +++
 12 files changed, 242 insertions(+), 146 deletions(-)

diff --git a/src/app/main/main.c b/src/app/main/main.c
index 6e325f0b1..54f2afaa0 100644
--- a/src/app/main/main.c
+++ b/src/app/main/main.c
@@ -1256,6 +1256,7 @@ pubsub_connect(void)
     /* XXXX For each pubsub channel, its delivery strategy should be set at
      * this XXXX point, using tor_mainloop_set_delivery_strategy().
      */
+    tor_mainloop_set_delivery_strategy("orconn", DELIV_IMMEDIATE);
   }
 }
 
diff --git a/src/core/or/connection_or.c b/src/core/or/connection_or.c
index 830e09fd5..4c93351e3 100644
--- a/src/core/or/connection_or.c
+++ b/src/core/or/connection_or.c
@@ -414,13 +414,12 @@ void
 connection_or_event_status(or_connection_t *conn, or_conn_status_event_t tp,
                            int reason)
 {
-  orconn_event_msg_t msg;
+  orconn_status_msg_t *msg = tor_malloc(sizeof(*msg));
 
-  msg.type = ORCONN_MSGTYPE_STATUS;
-  msg.u.status.gid = conn->base_.global_identifier;
-  msg.u.status.status = tp;
-  msg.u.status.reason = reason;
-  orconn_event_publish(&msg);
+  msg->gid = conn->base_.global_identifier;
+  msg->status = tp;
+  msg->reason = reason;
+  orconn_status_publish(msg);
   control_event_or_conn_status(conn, tp, reason);
 }
 
@@ -433,26 +432,25 @@ connection_or_event_status(or_connection_t *conn, or_conn_status_event_t tp,
 static void
 connection_or_state_publish(const or_connection_t *conn, uint8_t state)
 {
-  orconn_event_msg_t msg;
+  orconn_state_msg_t *msg = tor_malloc(sizeof(*msg));
 
-  msg.type = ORCONN_MSGTYPE_STATE;
-  msg.u.state.gid = conn->base_.global_identifier;
+  msg->gid = conn->base_.global_identifier;
   if (conn->is_pt) {
     /* Do extra decoding because conn->proxy_type indicates the proxy
      * protocol that tor uses to talk with the transport plugin,
      * instead of PROXY_PLUGGABLE. */
     tor_assert_nonfatal(conn->proxy_type != PROXY_NONE);
-    msg.u.state.proxy_type = PROXY_PLUGGABLE;
+    msg->proxy_type = PROXY_PLUGGABLE;
   } else {
-    msg.u.state.proxy_type = conn->proxy_type;
+    msg->proxy_type = conn->proxy_type;
   }
-  msg.u.state.state = state;
+  msg->state = state;
   if (conn->chan) {
-    msg.u.state.chan = TLS_CHAN_TO_BASE(conn->chan)->global_identifier;
+    msg->chan = TLS_CHAN_TO_BASE(conn->chan)->global_identifier;
   } else {
-    msg.u.state.chan = 0;
+    msg->chan = 0;
   }
-  orconn_event_publish(&msg);
+  orconn_state_publish(msg);
 }
 
 /** Call this to change or_connection_t states, so the owning channel_tls_t can
diff --git a/src/core/or/orconn_event.c b/src/core/or/orconn_event.c
index 9fb34bd1f..86f112fc0 100644
--- a/src/core/or/orconn_event.c
+++ b/src/core/or/orconn_event.c
@@ -17,65 +17,83 @@
  **/
 
 #include "core/or/or.h"
+#include "lib/pubsub/pubsub.h"
 #include "lib/subsys/subsys.h"
 
 #define ORCONN_EVENT_PRIVATE
 #include "core/or/orconn_event.h"
 #include "core/or/orconn_event_sys.h"
 
-/** List of subscribers */
-static smartlist_t *orconn_event_rcvrs;
+DECLARE_PUBLISH(orconn_state);
+DECLARE_PUBLISH(orconn_status);
 
-/** Initialize subscriber list */
-static int
-orconn_event_init(void)
+static void
+orconn_event_free(msg_aux_data_t u)
 {
-  orconn_event_rcvrs = smartlist_new();
-  return 0;
+  tor_free_(u.ptr);
 }
 
-/** Free subscriber list */
-static void
-orconn_event_fini(void)
+static char *
+orconn_state_fmt(msg_aux_data_t u)
 {
-  smartlist_free(orconn_event_rcvrs);
+  orconn_state_msg_t *msg = (orconn_state_msg_t *)u.ptr;
+  char *s = NULL;
+
+  tor_asprintf(&s, "<gid=%"PRIu64" chan=%"PRIu64" proxy_type=%d state=%d>",
+               msg->gid, msg->chan, msg->proxy_type, msg->state);
+  return s;
 }
 
-/**
- * Subscribe to messages about OR connection events
- *
- * Register a callback function to receive messages about ORCONNs.
- * The publisher calls this function synchronously.
- **/
-void
-orconn_event_subscribe(orconn_event_rcvr_t fn)
+static char *
+orconn_status_fmt(msg_aux_data_t u)
 {
-  tor_assert(fn);
-  /* Don't duplicate subscriptions. */
-  if (smartlist_contains(orconn_event_rcvrs, fn))
-    return;
+  orconn_status_msg_t *msg = (orconn_status_msg_t *)u.ptr;
+  char *s = NULL;
 
-  smartlist_add(orconn_event_rcvrs, fn);
+  tor_asprintf(&s, "<gid=%"PRIu64" status=%d reason=%d>",
+               msg->gid, msg->status, msg->reason);
+  return s;
+}
+
+static dispatch_typefns_t orconn_state_fns = {
+  .free_fn = orconn_event_free,
+  .fmt_fn = orconn_state_fmt,
+};
+
+static dispatch_typefns_t orconn_status_fns = {
+  .free_fn = orconn_event_free,
+  .fmt_fn = orconn_status_fmt,
+};
+
+static int
+orconn_add_pubsub(struct pubsub_connector_t *connector)
+{
+  if (DISPATCH_REGISTER_TYPE(connector, orconn_state, &orconn_state_fns))
+    return -1;
+  if (DISPATCH_REGISTER_TYPE(connector, orconn_status, &orconn_status_fns))
+    return -1;
+  if (DISPATCH_ADD_PUB(connector, orconn, orconn_state) != 0)
+    return -1;
+  if (DISPATCH_ADD_PUB(connector, orconn, orconn_status) != 0)
+    return -1;
+  return 0;
+}
+
+void
+orconn_state_publish(orconn_state_msg_t *msg)
+{
+  PUBLISH(orconn_state, msg);
 }
 
-/**
- * Publish a message about OR connection events
- *
- * This calls the subscriber receiver function synchronously.
- **/
 void
-orconn_event_publish(const orconn_event_msg_t *msg)
+orconn_status_publish(orconn_status_msg_t *msg)
 {
-  SMARTLIST_FOREACH_BEGIN(orconn_event_rcvrs, orconn_event_rcvr_t, fn) {
-    tor_assert(fn);
-    (*fn)(msg);
-  } SMARTLIST_FOREACH_END(fn);
+  PUBLISH(orconn_status, msg);
 }
 
 const subsys_fns_t sys_orconn_event = {
   .name = "orconn_event",
   .supported = true,
   .level = -33,
-  .initialize = orconn_event_init,
-  .shutdown = orconn_event_fini,
+  .add_pubsub = orconn_add_pubsub,
 };
diff --git a/src/core/or/orconn_event.h b/src/core/or/orconn_event.h
index 80289d53e..a3b37d2c3 100644
--- a/src/core/or/orconn_event.h
+++ b/src/core/or/orconn_event.h
@@ -16,6 +16,8 @@
 #ifndef TOR_ORCONN_EVENT_H
 #define TOR_ORCONN_EVENT_H
 
+#include "lib/pubsub/pubsub.h"
+
 /**
  * @name States of OR connections
  *
@@ -62,12 +64,6 @@ typedef enum or_conn_status_event_t {
   OR_CONN_EVENT_NEW          = 4,
 } or_conn_status_event_t;
 
-/** Discriminant values for orconn event message */
-typedef enum orconn_msgtype_t {
-  ORCONN_MSGTYPE_STATE,
-  ORCONN_MSGTYPE_STATUS,
-} orconn_msgtype_t;
-
 /**
  * Message for orconn state update
  *
@@ -83,6 +79,8 @@ typedef struct orconn_state_msg_t {
   uint8_t state;                /**< new connection state */
 } orconn_state_msg_t;
 
+DECLARE_MESSAGE(orconn_state, orconn_state, orconn_state_msg_t *);
+
 /**
  * Message for orconn status event
  *
@@ -95,26 +93,11 @@ typedef struct orconn_status_msg_t {
   int reason;                   /**< reason */
 } orconn_status_msg_t;
 
-/** Discriminated union for the actual message */
-typedef struct orconn_event_msg_t {
-  int type;
-  union {
-    orconn_state_msg_t state;
-    orconn_status_msg_t status;
-  } u;
-} orconn_event_msg_t;
-
-/**
- * Receiver function pointer for OR subscribers
- *
- * This function gets called synchronously by the publisher.
- **/
-typedef void (*orconn_event_rcvr_t)(const orconn_event_msg_t *);
-
-void orconn_event_subscribe(orconn_event_rcvr_t);
+DECLARE_MESSAGE(orconn_status, orconn_status, orconn_status_msg_t *);
 
 #ifdef ORCONN_EVENT_PRIVATE
-void orconn_event_publish(const orconn_event_msg_t *);
+void orconn_state_publish(orconn_state_msg_t *);
+void orconn_status_publish(orconn_status_msg_t *);
 #endif
 
 #endif  /* defined(TOR_ORCONN_EVENT_H) */
diff --git a/src/feature/control/btrack.c b/src/feature/control/btrack.c
index d3d12cb2b..3a6ae0788 100644
--- a/src/feature/control/btrack.c
+++ b/src/feature/control/btrack.c
@@ -24,6 +24,7 @@
 #include "feature/control/btrack_circuit.h"
 #include "feature/control/btrack_orconn.h"
 #include "feature/control/btrack_sys.h"
+#include "lib/pubsub/pubsub.h"
 #include "lib/subsys/subsys.h"
 
 static int
@@ -44,10 +45,17 @@ btrack_fini(void)
   btrack_circ_fini();
 }
 
+static int
+btrack_add_pubsub(pubsub_connector_t *connector)
+{
+  return btrack_orconn_add_pubsub(connector);
+}
+
 const subsys_fns_t sys_btrack = {
   .name = "btrack",
   .supported = true,
   .level = -30,
   .initialize = btrack_init,
   .shutdown = btrack_fini,
+  .add_pubsub = btrack_add_pubsub,
 };
diff --git a/src/feature/control/btrack_orconn.c b/src/feature/control/btrack_orconn.c
index 93ebe8d9c..cbeb7b4ff 100644
--- a/src/feature/control/btrack_orconn.c
+++ b/src/feature/control/btrack_orconn.c
@@ -45,6 +45,10 @@
 #include "feature/control/btrack_orconn_cevent.h"
 #include "feature/control/btrack_orconn_maps.h"
 #include "lib/log/log.h"
+#include "lib/pubsub/pubsub.h"
+
+DECLARE_SUBSCRIBE(orconn_state, bto_state_rcvr);
+DECLARE_SUBSCRIBE(orconn_status, bto_status_rcvr);
 
 /** Pair of a best ORCONN GID and with its state */
 typedef struct bto_best_t {
@@ -110,16 +114,17 @@ bto_reset_bests(void)
  * message comes from code in connection_or.c.
  **/
 static void
-bto_state_rcvr(const orconn_state_msg_t *msg)
+bto_state_rcvr(const msg_t *msg, const orconn_state_msg_t *arg)
 {
   bt_orconn_t *bto;
 
-  bto = bto_find_or_new(msg->gid, msg->chan);
+  (void)msg;
+  bto = bto_find_or_new(arg->gid, arg->chan);
   log_debug(LD_BTRACK, "ORCONN gid=%"PRIu64" chan=%"PRIu64
             " proxy_type=%d state=%d",
-            msg->gid, msg->chan, msg->proxy_type, msg->state);
-  bto->proxy_type = msg->proxy_type;
-  bto->state = msg->state;
+            arg->gid, arg->chan, arg->proxy_type, arg->state);
+  bto->proxy_type = arg->proxy_type;
+  bto->state = arg->state;
   if (bto->is_orig)
     bto_update_bests(bto);
 }
@@ -130,33 +135,20 @@ bto_state_rcvr(const orconn_state_msg_t *msg)
  * control.c.
  **/
 static void
-bto_status_rcvr(const orconn_status_msg_t *msg)
+bto_status_rcvr(const msg_t *msg, const orconn_status_msg_t *arg)
 {
-  switch (msg->status) {
+  (void)msg;
+  switch (arg->status) {
   case OR_CONN_EVENT_FAILED:
   case OR_CONN_EVENT_CLOSED:
     log_info(LD_BTRACK, "ORCONN DELETE gid=%"PRIu64" status=%d reason=%d",
-             msg->gid, msg->status, msg->reason);
-    return bto_delete(msg->gid);
+             arg->gid, arg->status, arg->reason);
+    return bto_delete(arg->gid);
   default:
     break;
   }
 }
 
-/** Dispatch to individual ORCONN message handlers */
-static void
-bto_event_rcvr(const orconn_event_msg_t *msg)
-{
-  switch (msg->type) {
-  case ORCONN_MSGTYPE_STATE:
-    return bto_state_rcvr(&msg->u.state);
-  case ORCONN_MSGTYPE_STATUS:
-    return bto_status_rcvr(&msg->u.status);
-  default:
-    tor_assert(false);
-  }
-}
-
 /**
  * Create or update a cached ORCONN state for a newly launched
  * connection, including whether it's launched by an origin circuit
@@ -190,12 +182,21 @@ int
 btrack_orconn_init(void)
 {
   bto_init_maps();
-  orconn_event_subscribe(bto_event_rcvr);
   ocirc_event_subscribe(bto_chan_rcvr);
 
   return 0;
 }
 
+int
+btrack_orconn_add_pubsub(pubsub_connector_t *connector)
+{
+  if (DISPATCH_ADD_SUB(connector, orconn, orconn_state))
+    return -1;
+  if (DISPATCH_ADD_SUB(connector, orconn, orconn_status))
+    return -1;
+  return 0;
+}
+
 /** Clear the hash maps and reset the "best" states */
 void
 btrack_orconn_fini(void)
diff --git a/src/feature/control/btrack_orconn.h b/src/feature/control/btrack_orconn.h
index 6ab4892a7..fed9a58eb 100644
--- a/src/feature/control/btrack_orconn.h
+++ b/src/feature/control/btrack_orconn.h
@@ -9,6 +9,8 @@
 #ifndef TOR_BTRACK_ORCONN_H
 #define TOR_BTRACK_ORCONN_H
 
+#include "lib/pubsub/pubsub.h"
+
 #ifdef BTRACK_ORCONN_PRIVATE
 
 #include "ht.h"
@@ -33,6 +35,7 @@ typedef struct bt_orconn_t {
 #endif  /* defined(BTRACK_ORCONN_PRIVATE) */
 
 int btrack_orconn_init(void);
+int btrack_orconn_add_pubsub(pubsub_connector_t *);
 void btrack_orconn_fini(void);
 
 #endif  /* defined(TOR_BTRACK_ORCONN_H) */
diff --git a/src/test/test_btrack.c b/src/test/test_btrack.c
index 48486fb5a..fef1da484 100644
--- a/src/test/test_btrack.c
+++ b/src/test/test_btrack.c
@@ -4,6 +4,7 @@
 #include "core/or/or.h"
 
 #include "test/test.h"
+#include "test_helpers.h"
 #include "test/log_test_helpers.h"
 
 #define OCIRC_EVENT_PRIVATE
@@ -12,20 +13,37 @@
 #include "core/or/orconn_event.h"
 
 static void
+send_state(const orconn_state_msg_t *msg_in)
+{
+  orconn_state_msg_t *msg = tor_malloc(sizeof(*msg));
+
+  *msg = *msg_in;
+  orconn_state_publish(msg);
+}
+
+static void
+send_status(const orconn_status_msg_t *msg_in)
+{
+  orconn_status_msg_t *msg = tor_malloc(sizeof(*msg));
+
+  *msg = *msg_in;
+  orconn_status_publish(msg);
+}
+
+static void
 test_btrack_launch(void *arg)
 {
-  orconn_event_msg_t conn;
+  orconn_state_msg_t conn;
   ocirc_event_msg_t circ;
 
   (void)arg;
-  conn.type = ORCONN_MSGTYPE_STATE;
-  conn.u.state.gid = 1;
-  conn.u.state.chan = 1;
-  conn.u.state.proxy_type = PROXY_NONE;
-  conn.u.state.state = OR_CONN_STATE_CONNECTING;
+  conn.gid = 1;
+  conn.chan = 1;
+  conn.proxy_type = PROXY_NONE;
+  conn.state = OR_CONN_STATE_CONNECTING;
 
   setup_full_capture_of_logs(LOG_DEBUG);
-  orconn_event_publish(&conn);
+  send_state(&conn);
   expect_log_msg_containing("ORCONN gid=1 chan=1 proxy_type=0 state=1");
   expect_no_log_msg_containing("ORCONN BEST_");
   teardown_capture_of_logs();
@@ -40,11 +58,11 @@ test_btrack_launch(void *arg)
   expect_log_msg_containing("ORCONN BEST_ANY state -1->1 gid=1");
   teardown_capture_of_logs();
 
-  conn.u.state.gid = 2;
-  conn.u.state.chan = 2;
+  conn.gid = 2;
+  conn.chan = 2;
 
   setup_full_capture_of_logs(LOG_DEBUG);
-  orconn_event_publish(&conn);
+  send_state(&conn);
   expect_log_msg_containing("ORCONN gid=2 chan=2 proxy_type=0 state=1");
   expect_no_log_msg_containing("ORCONN BEST_");
   teardown_capture_of_logs();
@@ -65,27 +83,26 @@ test_btrack_launch(void *arg)
 static void
 test_btrack_delete(void *arg)
 {
-  orconn_event_msg_t conn;
+  orconn_state_msg_t state;
+  orconn_status_msg_t status;
 
   (void)arg;
-  conn.type = ORCONN_MSGTYPE_STATE;
-  conn.u.state.gid = 1;
-  conn.u.state.chan = 1;
-  conn.u.state.proxy_type = PROXY_NONE;
-  conn.u.state.state = OR_CONN_STATE_CONNECTING;
+  state.gid = 1;
+  state.chan = 1;
+  state.proxy_type = PROXY_NONE;
+  state.state = OR_CONN_STATE_CONNECTING;
 
   setup_full_capture_of_logs(LOG_DEBUG);
-  orconn_event_publish(&conn);
+  send_state(&state);
   expect_log_msg_containing("ORCONN gid=1 chan=1 proxy_type=0");
   teardown_capture_of_logs();
 
-  conn.type = ORCONN_MSGTYPE_STATUS;
-  conn.u.status.gid = 1;
-  conn.u.status.status = OR_CONN_EVENT_CLOSED;
-  conn.u.status.reason = 0;
+  status.gid = 1;
+  status.status = OR_CONN_EVENT_CLOSED;
+  status.reason = 0;
 
   setup_full_capture_of_logs(LOG_DEBUG);
-  orconn_event_publish(&conn);
+  send_status(&status);
   expect_log_msg_containing("ORCONN DELETE gid=1 status=3 reason=0");
   teardown_capture_of_logs();
 
@@ -94,7 +111,7 @@ test_btrack_delete(void *arg)
 }
 
 struct testcase_t btrack_tests[] = {
-  { "launch", test_btrack_launch, TT_FORK, 0, NULL },
-  { "delete", test_btrack_delete, TT_FORK, 0, NULL },
+  { "launch", test_btrack_launch, TT_FORK, &helper_pubsub_setup, NULL },
+  { "delete", test_btrack_delete, TT_FORK, &helper_pubsub_setup, NULL },
   END_OF_TESTCASES
 };
diff --git a/src/test/test_controller_events.c b/src/test/test_controller_events.c
index 910aacace..14fe4fd66 100644
--- a/src/test/test_controller_events.c
+++ b/src/test/test_controller_events.c
@@ -7,6 +7,7 @@
 #define CONTROL_EVENTS_PRIVATE
 #define OCIRC_EVENT_PRIVATE
 #define ORCONN_EVENT_PRIVATE
+#include "app/main/subsysmgr.h"
 #include "core/or/or.h"
 #include "core/or/channel.h"
 #include "core/or/channeltls.h"
@@ -16,6 +17,7 @@
 #include "core/mainloop/connection.h"
 #include "feature/control/control_events.h"
 #include "test/test.h"
+#include "test/test_helpers.h"
 
 #include "core/or/or_circuit_st.h"
 #include "core/or/origin_circuit_st.h"
@@ -394,20 +396,22 @@ test_cntev_dirboot_defer_orconn(void *arg)
 }
 
 static void
-setup_orconn_state(orconn_event_msg_t *msg, uint64_t gid, uint64_t chan,
+setup_orconn_state(orconn_state_msg_t *msg, uint64_t gid, uint64_t chan,
                    int proxy_type)
 {
-  msg->type = ORCONN_MSGTYPE_STATE;
-  msg->u.state.gid = gid;
-  msg->u.state.chan = chan;
-  msg->u.state.proxy_type = proxy_type;
+  msg->gid = gid;
+  msg->chan = chan;
+  msg->proxy_type = proxy_type;
 }
 
 static void
-send_orconn_state(orconn_event_msg_t *msg, uint8_t state)
+send_orconn_state(const orconn_state_msg_t *msg_in, uint8_t state)
 {
-  msg->u.state.state = state;
-  orconn_event_publish(msg);
+  orconn_state_msg_t *msg = tor_malloc(sizeof(*msg));
+
+  *msg = *msg_in;
+  msg->state = state;
+  orconn_state_publish(msg);
 }
 
 static void
@@ -425,7 +429,7 @@ send_ocirc_chan(uint32_t gid, uint64_t chan, bool onehop)
 static void
 test_cntev_orconn_state(void *arg)
 {
-  orconn_event_msg_t conn;
+  orconn_state_msg_t conn;
 
   (void)arg;
   MOCK(queue_control_event_string, mock_queue_control_event_string);
@@ -442,8 +446,8 @@ test_cntev_orconn_state(void *arg)
   send_orconn_state(&conn, OR_CONN_STATE_OPEN);
   assert_bootmsg("15 TAG=handshake_done");
 
-  conn.u.state.gid = 2;
-  conn.u.state.chan = 2;
+  conn.gid = 2;
+  conn.chan = 2;
   send_orconn_state(&conn, OR_CONN_STATE_CONNECTING);
   /* It doesn't know it's an origin circuit yet */
   assert_bootmsg("15 TAG=handshake_done");
@@ -464,7 +468,7 @@ test_cntev_orconn_state(void *arg)
 static void
 test_cntev_orconn_state_pt(void *arg)
 {
-  orconn_event_msg_t conn;
+  orconn_state_msg_t conn;
 
   (void)arg;
   MOCK(queue_control_event_string, mock_queue_control_event_string);
@@ -484,8 +488,8 @@ test_cntev_orconn_state_pt(void *arg)
   assert_bootmsg("15 TAG=handshake_done");
 
   send_ocirc_chan(2, 2, false);
-  conn.u.state.gid = 2;
-  conn.u.state.chan = 2;
+  conn.gid = 2;
+  conn.chan = 2;
   send_orconn_state(&conn, OR_CONN_STATE_CONNECTING);
   assert_bootmsg("76 TAG=ap_conn_pt");
   send_orconn_state(&conn, OR_CONN_STATE_PROXY_HANDSHAKING);
@@ -499,7 +503,7 @@ test_cntev_orconn_state_pt(void *arg)
 static void
 test_cntev_orconn_state_proxy(void *arg)
 {
-  orconn_event_msg_t conn;
+  orconn_state_msg_t conn;
 
   (void)arg;
   MOCK(queue_control_event_string, mock_queue_control_event_string);
@@ -519,8 +523,8 @@ test_cntev_orconn_state_proxy(void *arg)
   assert_bootmsg("15 TAG=handshake_done");
 
   send_ocirc_chan(2, 2, false);
-  conn.u.state.gid = 2;
-  conn.u.state.chan = 2;
+  conn.gid = 2;
+  conn.chan = 2;
   send_orconn_state(&conn, OR_CONN_STATE_CONNECTING);
   assert_bootmsg("78 TAG=ap_conn_proxy");
   send_orconn_state(&conn, OR_CONN_STATE_PROXY_HANDSHAKING);
@@ -534,15 +538,18 @@ test_cntev_orconn_state_proxy(void *arg)
 #define TEST(name, flags)                               \
   { #name, test_cntev_ ## name, flags, 0, NULL }
 
+#define T_PUBSUB(name, setup)                                           \
+  { #name, test_cntev_ ## name, TT_FORK, &helper_pubsub_setup, NULL }
+
 struct testcase_t controller_event_tests[] = {
   TEST(sum_up_cell_stats, TT_FORK),
   TEST(append_cell_stats, TT_FORK),
   TEST(format_cell_stats, TT_FORK),
   TEST(event_mask, TT_FORK),
-  TEST(dirboot_defer_desc, TT_FORK),
-  TEST(dirboot_defer_orconn, TT_FORK),
-  TEST(orconn_state, TT_FORK),
-  TEST(orconn_state_pt, TT_FORK),
-  TEST(orconn_state_proxy, TT_FORK),
+  T_PUBSUB(dirboot_defer_desc, TT_FORK),
+  T_PUBSUB(dirboot_defer_orconn, TT_FORK),
+  T_PUBSUB(orconn_state, TT_FORK),
+  T_PUBSUB(orconn_state_pt, TT_FORK),
+  T_PUBSUB(orconn_state_proxy, TT_FORK),
   END_OF_TESTCASES
 };
diff --git a/src/test/test_extorport.c b/src/test/test_extorport.c
index 38aca9026..cb53a4e66 100644
--- a/src/test/test_extorport.c
+++ b/src/test/test_extorport.c
@@ -587,6 +587,6 @@ struct testcase_t extorport_tests[] = {
   { "cookie_auth", test_ext_or_cookie_auth, TT_FORK, NULL, NULL },
   { "cookie_auth_testvec", test_ext_or_cookie_auth_testvec, TT_FORK,
     NULL, NULL },
-  { "handshake", test_ext_or_handshake, TT_FORK, NULL, NULL },
+  { "handshake", test_ext_or_handshake, TT_FORK, &helper_pubsub_setup, NULL },
   END_OF_TESTCASES
 };
diff --git a/src/test/test_helpers.c b/src/test/test_helpers.c
index 489c25776..e856dc6cc 100644
--- a/src/test/test_helpers.c
+++ b/src/test/test_helpers.c
@@ -17,12 +17,17 @@
 #include "lib/buf/buffers.h"
 #include "app/config/config.h"
 #include "app/config/confparse.h"
+#include "app/main/subsysmgr.h"
 #include "core/mainloop/connection.h"
 #include "lib/crypt_ops/crypto_rand.h"
 #include "core/mainloop/mainloop.h"
 #include "feature/nodelist/nodelist.h"
 #include "core/or/relay.h"
 #include "feature/nodelist/routerlist.h"
+#include "lib/dispatch/dispatch.h"
+#include "lib/dispatch/dispatch_naming.h"
+#include "lib/pubsub/pubsub_build.h"
+#include "lib/pubsub/pubsub_connect.h"
 #include "lib/encoding/confline.h"
 #include "lib/net/resolve.h"
 
@@ -303,3 +308,52 @@ helper_parse_options(const char *conf)
   }
   return opt;
 }
+
+/**
+ * Dispatch alertfn callback: flush all messages right now. Implements
+ * DELIV_IMMEDIATE.
+ **/
+static void
+alertfn_immediate(dispatch_t *d, channel_id_t chan, void *arg)
+{
+  (void) arg;
+  dispatch_flush(d, chan, INT_MAX);
+}
+
+/**
+ * Setup helper for tests that need pubsub active
+ *
+ * Does not hook up mainloop events.  Does set immediate delivery for
+ * the "orconn" channel.
+ */
+void *
+helper_setup_pubsub(const struct testcase_t *testcase)
+{
+  dispatch_t *dispatcher = NULL;
+  pubsub_builder_t *builder = pubsub_builder_new();
+  channel_id_t chan = get_channel_id("orconn");
+
+  (void)testcase;
+  (void)subsystems_add_pubsub(builder);
+  dispatcher = pubsub_builder_finalize(builder, NULL);
+  tor_assert(dispatcher);
+  dispatch_set_alert_fn(dispatcher, chan, alertfn_immediate, NULL);
+  return dispatcher;
+}
+
+/**
+ * Cleanup helper for tests that need pubsub active
+ */
+int
+helper_cleanup_pubsub(const struct testcase_t *testcase, void *dispatcher_)
+{
+  dispatch_t *dispatcher = dispatcher_;
+
+  (void)testcase;
+  dispatch_free(dispatcher);
+  return 1;
+}
+
+const struct testcase_setup_t helper_pubsub_setup = {
+  helper_setup_pubsub, helper_cleanup_pubsub
+};
diff --git a/src/test/test_helpers.h b/src/test/test_helpers.h
index 9e376a563..d82072bb3 100644
--- a/src/test/test_helpers.h
+++ b/src/test/test_helpers.h
@@ -7,6 +7,7 @@
 #define BUFFERS_PRIVATE
 
 #include "core/or/or.h"
+#include "tinytest.h"
 
 const char *get_yesterday_date_str(void);
 
@@ -31,5 +32,10 @@ or_options_t *helper_parse_options(const char *conf);
 
 extern const char TEST_DESCRIPTORS[];
 
+void *helper_setup_pubsub(const struct testcase_t *);
+int helper_cleanup_pubsub(const struct testcase_t *, void *);
+
+extern const struct testcase_setup_t helper_pubsub_setup;
+
 #endif /* !defined(TOR_TEST_HELPERS_H) */
 





More information about the tor-commits mailing list