[tor-commits] [tor/master] Low-level dispatch module for publish-subscribe mechanism

asn at torproject.org asn at torproject.org
Wed Mar 27 12:31:31 UTC 2019


commit e4d3098d4d23686320013b80b6305fbd52863f76
Author: Nick Mathewson <nickm at torproject.org>
Date:   Fri Jan 11 20:17:04 2019 -0500

    Low-level dispatch module for publish-subscribe mechanism
    
    This module implements a way to send messages from one module to
    another, with associated data types.  It does not yet do anything to
    ensure that messages are correct, that types match, or that other
    forms of consistency are preserved.
---
 .gitignore                         |   2 +
 Makefile.am                        |   2 +
 src/include.am                     |   1 +
 src/lib/dispatch/.may_include      |   9 ++
 src/lib/dispatch/dispatch.h        | 114 ++++++++++++++++++
 src/lib/dispatch/dispatch_cfg.c    | 138 ++++++++++++++++++++++
 src/lib/dispatch/dispatch_cfg.h    |  39 +++++++
 src/lib/dispatch/dispatch_cfg_st.h |  25 ++++
 src/lib/dispatch/dispatch_core.c   | 234 +++++++++++++++++++++++++++++++++++++
 src/lib/dispatch/dispatch_new.c    | 170 +++++++++++++++++++++++++++
 src/lib/dispatch/dispatch_st.h     | 108 +++++++++++++++++
 src/lib/dispatch/include.am        |  23 ++++
 src/lib/dispatch/msgtypes.h        |  80 +++++++++++++
 src/test/include.am                |   1 +
 src/test/test.c                    |   1 +
 src/test/test.h                    |   1 +
 src/test/test_dispatch.c           | 181 ++++++++++++++++++++++++++++
 17 files changed, 1129 insertions(+)

diff --git a/.gitignore b/.gitignore
index 6a49285b8..f4f6dacbb 100644
--- a/.gitignore
+++ b/.gitignore
@@ -168,6 +168,8 @@ uptime-*.json
 /src/lib/libtor-crypt-ops-testing.a
 /src/lib/libtor-ctime.a
 /src/lib/libtor-ctime-testing.a
+/src/lib/libtor-dispatch.a
+/src/lib/libtor-dispatch-testing.a
 /src/lib/libtor-encoding.a
 /src/lib/libtor-encoding-testing.a
 /src/lib/libtor-evloop.a
diff --git a/Makefile.am b/Makefile.am
index a5086b303..36d9725f3 100644
--- a/Makefile.am
+++ b/Makefile.am
@@ -41,6 +41,7 @@ TOR_UTIL_LIBS = \
         src/lib/libtor-geoip.a \
 	src/lib/libtor-process.a \
         src/lib/libtor-buf.a \
+	src/lib/libtor-dispatch.a \
 	src/lib/libtor-time.a \
 	src/lib/libtor-fs.a \
 	src/lib/libtor-encoding.a \
@@ -72,6 +73,7 @@ TOR_UTIL_TESTING_LIBS = \
         src/lib/libtor-geoip-testing.a \
 	src/lib/libtor-process-testing.a \
         src/lib/libtor-buf-testing.a \
+	src/lib/libtor-dispatch-testing.a \
 	src/lib/libtor-time-testing.a \
 	src/lib/libtor-fs-testing.a \
 	src/lib/libtor-encoding-testing.a \
diff --git a/src/include.am b/src/include.am
index 9070a69a0..c6c351c80 100644
--- a/src/include.am
+++ b/src/include.am
@@ -8,6 +8,7 @@ include src/lib/compress/include.am
 include src/lib/container/include.am
 include src/lib/crypt_ops/include.am
 include src/lib/defs/include.am
+include src/lib/dispatch/include.am
 include src/lib/encoding/include.am
 include src/lib/evloop/include.am
 include src/lib/fdio/include.am
diff --git a/src/lib/dispatch/.may_include b/src/lib/dispatch/.may_include
new file mode 100644
index 000000000..9b5373907
--- /dev/null
+++ b/src/lib/dispatch/.may_include
@@ -0,0 +1,9 @@
+orconfig.h
+
+ext/tor_queue.h
+
+lib/container/*.h
+lib/dispatch/*.h
+lib/intmath/*.h
+lib/log/*.h
+lib/malloc/*.h
diff --git a/src/lib/dispatch/dispatch.h b/src/lib/dispatch/dispatch.h
new file mode 100644
index 000000000..8e62e8f16
--- /dev/null
+++ b/src/lib/dispatch/dispatch.h
@@ -0,0 +1,114 @@
+/* Copyright (c) 2001, Matej Pfajfar.
+ * Copyright (c) 2001-2004, Roger Dingledine.
+ * Copyright (c) 2004-2006, Roger Dingledine, Nick Mathewson.
+ * Copyright (c) 2007-2018, The Tor Project, Inc. */
+/* See LICENSE for licensing information */
+
+#ifndef TOR_DISPATCH_H
+#define TOR_DISPATCH_H
+
+#include "lib/dispatch/msgtypes.h"
+
+/**
+ * \file dispatch.h
+ * \brief Low-level APIs for message-passing system.
+ *
+ * This module implements message dispatch based on a set of short integer
+ * identifiers.  For a higher-level interface, see pubsub.h.
+ *
+ * Each message is represented as a generic msg_t object, and is discriminated
+ * by its message_id_t.  Messages are delivered by a dispatch_t object, which
+ * delivers each message to its recipients by a configured "channel".
+ *
+ * A "channel" is a means of delivering messages.  Every message_id_t must
+ * be associated with exactly one channel, identified by channel_id_t.
+ * When a channel receives messages, a callback is invoked to either process
+ * the messages immediately, or to cause them to be processed later.
+ *
+ * Every message_id_t has zero or more associated receiver functions set up in
+ * the dispatch_t object.  Once the dispatch_t object is created, receivers
+ * can be enabled or disabled [TODO], but not added or removed.
+ *
+ * Every message_id_t has an associated datatype, identified by a
+ * msg_type_id_t.  These datatypes can be associated with functions to
+ * (for example) free them, or format them for debugging.
+ *
+ * To set up a dispatch_t object, first create a dispatch_cfg_t object, and
+ * configure messages with their types, channels, and receivers.  Then, use
+ * dispatch_new() with that dispatch_cfg_t to create the dispatch_t object.
+ *
+ * (We use a two-phase construction procedure here to enable better static
+ * reasoning about publish/subscribe relationships.)
+ *
+ * Once you have a dispatch_t, you can queue messages on it with
+ * dispatch_send*(), and cause those messages to be delivered with
+ * dispatch_flush().
+ **/
+
+/**
+ * A "dispatcher" is the highest-level object; it handles making sure that
+ * messages are received and delivered properly.  Only the mainloop
+ * should handle this type directly.
+ */
+typedef struct dispatch_t dispatch_t;
+
+struct dispatch_cfg_t;
+
+dispatch_t *dispatch_new(const struct dispatch_cfg_t *cfg);
+
+/**
+ * Free a dispatcher.  Tor does this at exit.
+ */
+#define dispatch_free(d) \
+  FREE_AND_NULL(dispatch_t, dispatch_free_, (d))
+
+void dispatch_free_(dispatch_t *);
+
+int dispatch_send(dispatch_t *d,
+                  subsys_id_t sender,
+                  channel_id_t channel,
+                  message_id_t msg,
+                  msg_type_id_t type,
+                  msg_aux_data_t auxdata);
+
+int dispatch_send_msg(dispatch_t *d, msg_t *m);
+
+int dispatch_send_msg_unchecked(dispatch_t *d, msg_t *m);
+
+/** Flush up to <b>max_msgs</b> currently pending messages from channel
+ * <b>chan</b> of the dispatcher.  Messages that are not pending when this
+ * function is called are not flushed by this call.  Return 0 on success,
+ * -1 on unrecoverable error.
+ */
+int dispatch_flush(dispatch_t *, channel_id_t chan, int max_msgs);
+
+/**
+ * Function callback type used to alert some other module when a channel's
+ * queue changes from empty to nonempty.
+ *
+ * Ex 1: To cause messages to be processed immediately on-stack, this callback
+ * should invoke dispatch_flush() directly.
+ *
+ * Ex 2: To cause messages to be processed very soon, from the event queue,
+ * this callback should schedule an event callback to run dispatch_flush().
+ *
+ * Ex 3: To cause messages to be processed periodically, this function should
+ * do nothing, and a periodic event should invoke dispatch_flush().
+ **/
+typedef void (*dispatch_alertfn_t)(struct dispatch_t *,
+                                   channel_id_t, void *);
+
+int dispatch_set_alert_fn(dispatch_t *d, channel_id_t chan,
+                          dispatch_alertfn_t fn, void *userdata);
+
+#define dispatch_free_msg(d,msg)                                \
+  STMT_BEGIN {                                                  \
+    msg_t **msg_tmp_ptr__ = &(msg);                             \
+    dispatch_free_msg_((d), *msg_tmp_ptr__);                    \
+    *msg_tmp_ptr__= NULL;                                       \
+  } STMT_END
+void dispatch_free_msg_(const dispatch_t *d, msg_t *msg);
+
+char *dispatch_fmt_msg_data(const dispatch_t *d, const msg_t *msg);
+
+#endif
diff --git a/src/lib/dispatch/dispatch_cfg.c b/src/lib/dispatch/dispatch_cfg.c
new file mode 100644
index 000000000..26e37f469
--- /dev/null
+++ b/src/lib/dispatch/dispatch_cfg.c
@@ -0,0 +1,138 @@
+/* Copyright (c) 2001, Matej Pfajfar.
+ * Copyright (c) 2001-2004, Roger Dingledine.
+ * Copyright (c) 2004-2006, Roger Dingledine, Nick Mathewson.
+ * Copyright (c) 2007-2018, The Tor Project, Inc. */
+/* See LICENSE for licensing information */
+
+/**
+ * \file dispatch_cfg.c
+ * \brief Create and configure a dispatch_cfg_t.
+ *
+ * A dispatch_cfg_t object is used to configure a set of messages and
+ * associated information before creating a dispatch_t.
+ */
+
+#define DISPATCH_PRIVATE
+
+#include "orconfig.h"
+#include "lib/dispatch/dispatch_cfg.h"
+#include "lib/dispatch/dispatch_cfg_st.h"
+#include "lib/dispatch/dispatch.h"
+#include "lib/dispatch/dispatch_st.h"
+
+#include "lib/container/smartlist.h"
+#include "lib/malloc/malloc.h"
+
+/**
+ * Allocate a dispatch_cfg_t whose four lookup lists all start out empty.
+ **/
+dispatch_cfg_t *
+dcfg_new(void)
+{
+  dispatch_cfg_t *result = tor_malloc(sizeof(*result));
+  result->recv_by_msg = smartlist_new();
+  result->fns_by_type = smartlist_new();
+  result->chan_by_msg = smartlist_new();
+  result->type_by_msg = smartlist_new();
+  return result;
+}
+
+/** Encode the integer ID <b>id</b> as a void* for storage in a smartlist. */
+#define ID_TO_VOID(id) ((void*)(uintptr_t)(id))
+
+/**
+ * Record that message <b>msg</b> carries data of type <b>type</b>.  Return 0
+ * on success, or -1 if <b>msg</b> already has a different nonzero type.
+ **/
+int
+dcfg_msg_set_type(dispatch_cfg_t *cfg, message_id_t msg,
+                  msg_type_id_t type)
+{
+  void *const encoded = ID_TO_VOID(type);
+  smartlist_grow(cfg->type_by_msg, msg+1);
+  void *const prev = smartlist_get(cfg->type_by_msg, msg);
+  if (prev && prev != encoded)
+    return -1;
+  smartlist_set(cfg->type_by_msg, msg, encoded);
+  return 0;
+}
+
+/**
+ * Record that message <b>msg</b> is delivered on channel <b>chan</b>.  Return
+ * 0 on success, or -1 if <b>msg</b> already has a different nonzero channel.
+ **/
+int
+dcfg_msg_set_chan(dispatch_cfg_t *cfg, message_id_t msg,
+                  channel_id_t chan)
+{
+  void *const encoded = ID_TO_VOID(chan);
+  smartlist_grow(cfg->chan_by_msg, msg+1);
+  void *const prev = smartlist_get(cfg->chan_by_msg, msg);
+  if (prev && prev != encoded)
+    return -1;
+  smartlist_set(cfg->chan_by_msg, msg, encoded);
+  return 0;
+}
+
+/**
+ * Associate <b>fns</b> with a datatype; the pointer is stored, not copied.
+ * Return 0 on success, -1 if different functions were already set for it.
+ **/
+int
+dcfg_type_set_fns(dispatch_cfg_t *cfg, msg_type_id_t type,
+                  const dispatch_typefns_t *fns)
+{
+  smartlist_grow(cfg->fns_by_type, type+1);
+  dispatch_typefns_t *oldfns = smartlist_get(cfg->fns_by_type, type);
+  if (oldfns && (oldfns->free_fn != fns->free_fn ||
+                 oldfns->fmt_fn != fns->fmt_fn))
+    return -1;
+  smartlist_set(cfg->fns_by_type, type, (dispatch_typefns_t *) fns);
+  return 0;
+}
+
+/**
+ * Register <b>fn</b>, for subsystem <b>sys</b>, as a receiver for message
+ * ID <b>msg</b>.  Each call appends one receiver, initially enabled, so a
+ * single message ID may have several.
+ * Always returns 0.
+ **/
+int
+dcfg_add_recv(dispatch_cfg_t *cfg, message_id_t msg,
+              subsys_id_t sys, recv_fn_t fn)
+{
+  dispatch_rcv_t *entry = tor_malloc(sizeof(*entry));
+  entry->fn = fn;
+  entry->enabled = true;
+  entry->sys = sys;
+
+  smartlist_grow(cfg->recv_by_msg, msg+1);
+  smartlist_t *lst = smartlist_get(cfg->recv_by_msg, msg);
+  if (lst == NULL) {
+    lst = smartlist_new();
+    smartlist_set(cfg->recv_by_msg, msg, lst);
+  }
+  smartlist_add(lst, entry);
+  return 0;
+}
+
+/** Helper: free <b>cfg</b> and its receiver entries (not the typefns). */
+void
+dcfg_free_(dispatch_cfg_t *cfg)
+{
+  if (!cfg)
+    return;
+
+  smartlist_free(cfg->type_by_msg);
+  smartlist_free(cfg->chan_by_msg);
+  smartlist_free(cfg->fns_by_type);
+  SMARTLIST_FOREACH_BEGIN(cfg->recv_by_msg, smartlist_t *, receivers) {
+    if (!receivers)
+      continue;
+    SMARTLIST_FOREACH(receivers, dispatch_rcv_t *, rcv, tor_free(rcv));
+    smartlist_free(receivers);
+  } SMARTLIST_FOREACH_END(receivers);
+  smartlist_free(cfg->recv_by_msg);
+
+  tor_free(cfg);
+}
diff --git a/src/lib/dispatch/dispatch_cfg.h b/src/lib/dispatch/dispatch_cfg.h
new file mode 100644
index 000000000..2c755e39b
--- /dev/null
+++ b/src/lib/dispatch/dispatch_cfg.h
@@ -0,0 +1,39 @@
+/* Copyright (c) 2001, Matej Pfajfar.
+ * Copyright (c) 2001-2004, Roger Dingledine.
+ * Copyright (c) 2004-2006, Roger Dingledine, Nick Mathewson.
+ * Copyright (c) 2007-2018, The Tor Project, Inc. */
+/* See LICENSE for licensing information */
+
+#ifndef TOR_DISPATCH_CFG_H
+#define TOR_DISPATCH_CFG_H
+
+#include "lib/dispatch/msgtypes.h"
+
+/**
+ * A "dispatch_cfg" is the configuration used to set up a dispatcher.
+ * It is created and accessed with a set of dcfg_* functions, and then
+ * used with dispatch_new() to make the dispatcher.
+ */
+typedef struct dispatch_cfg_t dispatch_cfg_t;
+
+dispatch_cfg_t *dcfg_new(void);
+
+int dcfg_msg_set_type(dispatch_cfg_t *cfg, message_id_t msg,
+                      msg_type_id_t type);
+
+int dcfg_msg_set_chan(dispatch_cfg_t *cfg, message_id_t msg,
+                      channel_id_t chan);
+
+int dcfg_type_set_fns(dispatch_cfg_t *cfg, msg_type_id_t type,
+                      const dispatch_typefns_t *fns);
+
+int dcfg_add_recv(dispatch_cfg_t *cfg, message_id_t msg,
+                  subsys_id_t sys, recv_fn_t fn);
+
+/** Free a dispatch_cfg_t. */
+#define dcfg_free(cfg) \
+  FREE_AND_NULL(dispatch_cfg_t, dcfg_free_, (cfg))
+
+void dcfg_free_(dispatch_cfg_t *cfg);
+
+#endif
diff --git a/src/lib/dispatch/dispatch_cfg_st.h b/src/lib/dispatch/dispatch_cfg_st.h
new file mode 100644
index 000000000..d004fe593
--- /dev/null
+++ b/src/lib/dispatch/dispatch_cfg_st.h
@@ -0,0 +1,25 @@
+/* Copyright (c) 2001, Matej Pfajfar.
+ * Copyright (c) 2001-2004, Roger Dingledine.
+ * Copyright (c) 2004-2006, Roger Dingledine, Nick Mathewson.
+ * Copyright (c) 2007-2018, The Tor Project, Inc. */
+/* See LICENSE for licensing information */
+
+#ifndef TOR_DISPATCH_CFG_ST_H
+#define TOR_DISPATCH_CFG_ST_H
+
+struct smartlist_t;
+
+/** Information needed to create a dispatcher, but in a less efficient, more
+ * mutable format. */
+struct dispatch_cfg_t {
+  /** A list of msg_type_id_t (cast to void*), indexed by message_id_t. */
+  struct smartlist_t *type_by_msg;
+  /** A list of channel_id_t (cast to void*), indexed by message_id_t. */
+  struct smartlist_t *chan_by_msg;
+  /** A list of dispatch_typefns_t pointers, indexed by msg_type_id_t. */
+  struct smartlist_t *fns_by_type;
+  /** A list of smartlists of dispatch_rcv_t, indexed by message_id_t. */
+  struct smartlist_t *recv_by_msg;
+};
+
+#endif
diff --git a/src/lib/dispatch/dispatch_core.c b/src/lib/dispatch/dispatch_core.c
new file mode 100644
index 000000000..24dfc649a
--- /dev/null
+++ b/src/lib/dispatch/dispatch_core.c
@@ -0,0 +1,234 @@
+/* Copyright (c) 2001, Matej Pfajfar.
+ * Copyright (c) 2001-2004, Roger Dingledine.
+ * Copyright (c) 2004-2006, Roger Dingledine, Nick Mathewson.
+ * Copyright (c) 2007-2018, The Tor Project, Inc. */
+/* See LICENSE for licensing information */
+
+/**
+ * \file dispatch_core.c
+ * \brief Core module for sending and receiving messages.
+ */
+
+#define DISPATCH_PRIVATE
+#include "orconfig.h"
+
+#include "lib/dispatch/dispatch.h"
+#include "lib/dispatch/dispatch_st.h"
+
+#include "lib/malloc/malloc.h"
+#include "lib/log/util_bug.h"
+
+#include <string.h>
+
+/**
+ * Release <b>msg</b> and its auxiliary data.  A NULL <b>msg</b> is ignored.
+ *
+ * (<b>d</b> supplies the free function registered for the message's type.)
+ **/
+void
+dispatch_free_msg_(const dispatch_t *d, msg_t *msg)
+{
+  if (msg == NULL)
+    return;
+  const dispatch_typefns_t *fns = &d->typefns[msg->type];
+  fns->free_fn(msg->aux_data__);
+  tor_free(msg);
+}
+
+/**
+ * Return the type-specific formatting of <b>msg</b>'s data; NULL if no msg.
+ **/
+char *
+dispatch_fmt_msg_data(const dispatch_t *d, const msg_t *msg)
+{
+  if (msg == NULL)
+    return NULL;
+  const dispatch_typefns_t *fns = &d->typefns[msg->type];
+  return fns->fmt_fn(msg->aux_data__);
+}
+
+/**
+ * Release all storage held by <b>d</b>, including any still-queued messages.
+ **/
+void
+dispatch_free_(dispatch_t *d)
+{
+  if (d == NULL)
+    return;
+
+  size_t n_queues = d->n_queues;
+  for (size_t i = 0; i < n_queues; ++i) {
+    msg_t *m, *mtmp;
+    TOR_SIMPLEQ_FOREACH_SAFE(m, &d->queues[i].queue, next, mtmp) {
+      dispatch_free_msg(d, m);
+    }
+  }
+
+  size_t n_msgs = d->n_msgs;
+
+  for (size_t i = 0; i < n_msgs; ++i) {
+    tor_free(d->table[i]);
+  }
+  tor_free(d->table);
+  tor_free(d->typefns);
+  tor_free(d->queues);
+
+  // Nothing else to release: dispatch_t keeps no reference to the
+  // dispatch_cfg_t it was built from, so that object is not freed here.
+
+  tor_free(d);
+}
+
+/**
+ * Install <b>fn</b> as the alert callback for channel <b>chan</b>; it is
+ * invoked with <b>userdata</b> when that channel's queue goes from empty to
+ * nonempty.  Return 0 on success, -1 if <b>chan</b> is out of range.
+ **/
+int
+dispatch_set_alert_fn(dispatch_t *d, channel_id_t chan,
+                      dispatch_alertfn_t fn, void *userdata)
+{
+  if (BUG(chan >= d->n_queues))
+    return -1;
+
+  d->queues[chan].alert_fn_arg = userdata;
+  d->queues[chan].alert_fn = fn;
+  return 0;
+}
+
+/**
+ * Send a message on the appropriate channel, notifying that channel if
+ * necessary.  Return 0 on success, -1 if the message fails validation.
+ * This function takes ownership of the auxiliary data: it can't be static
+ * or stack-allocated, and the caller must not use it afterwards.  If we
+ * return -1 the data is leaked, since its type can't be trusted for freeing.
+ **/
+int
+dispatch_send(dispatch_t *d,
+              subsys_id_t sender,
+              channel_id_t channel,
+              message_id_t msg,
+              msg_type_id_t type,
+              msg_aux_data_t auxdata)
+{
+  /* Reject out-of-range IDs before using them to index table or typefns. */
+  if (BUG(msg >= d->n_msgs) || BUG(type >= d->n_types))
+    return -1;
+  if (!d->table[msg]) {
+    /* Fast path: nobody wants this data. */
+    d->typefns[type].free_fn(auxdata);
+    return 0;
+  }
+  msg_t *m = tor_malloc(sizeof(msg_t));
+  m->sender = sender;
+  m->channel = channel;
+  m->msg = msg;
+  m->type = type;
+  memcpy(&m->aux_data__, &auxdata, sizeof(msg_aux_data_t));
+  if (dispatch_send_msg(d, m) < 0) {
+    tor_free(m); /* Not queued: drop our wrapper, but not its data. */
+    return -1;
+  }
+  return 0;
+}
+
+/** Validate <b>m</b> against <b>d</b>'s tables, then queue it.  Return 0 on
+ * success; on failure return -1 WITHOUT freeing <b>m</b>, since its type
+ * could be wrong and freeing it is probably not safe. */
+int
+dispatch_send_msg(dispatch_t *d, msg_t *m)
+{
+  if (BUG(!d))
+    return -1;
+  if (BUG(!m))
+    return -1;
+  if (BUG(m->channel >= d->n_queues))
+    return -1;
+  if (BUG(m->msg >= d->n_msgs))
+    return -1;
+
+  const dtbl_entry_t *entry = d->table[m->msg];
+  if (entry != NULL) {
+    if (BUG(m->type != entry->type))
+      return -1;
+    if (BUG(m->channel != entry->channel))
+      return -1;
+  }
+
+  return dispatch_send_msg_unchecked(d, m);
+}
+
+/**
+ * Append <b>m</b> to the queue for its channel, and run that queue's alert
+ * function if the queue was empty beforehand.  Always returns 0.
+ *
+ * This function takes ownership of the message object and its auxiliary data;
+ * it can't be static or stack-allocated, and the caller isn't allowed to use
+ * it afterwards.
+ *
+ * This function does not check the various fields of the message object for
+ * consistency, and can crash if they are out of range.  Only functions that
+ * have already constructed the message in a safe way, or checked it for
+ * correctness themselves, should call this function.
+ **/
+int
+dispatch_send_msg_unchecked(dispatch_t *d, msg_t *m)
+{
+  /* Locate the queue for this message's channel. */
+  dqueue_t *const chan_q = &d->queues[m->channel];
+
+  /* Remember emptiness before appending: the alert fires only on the
+   * empty-to-nonempty transition. */
+  const bool alert_needed = TOR_SIMPLEQ_EMPTY(&chan_q->queue);
+  TOR_SIMPLEQ_INSERT_TAIL(&chan_q->queue, m, next);
+
+  /* The callback decides whether messages are flushed now or later. */
+  if (alert_needed)
+    chan_q->alert_fn(d, m->channel, chan_q->alert_fn_arg);
+
+  return 0;
+}
+
+/**
+ * Run all of the callbacks on <b>d</b> associated with <b>m</b>.  A message
+ * whose ID has no table entry (no receivers) is silently ignored.
+ **/
+static void
+dispatcher_run_msg_cbs(const dispatch_t *d, msg_t *m)
+{
+  tor_assert(m->msg < d->n_msgs); /* table has exactly n_msgs slots. */
+  const dtbl_entry_t *ent = d->table[m->msg];
+  if (!ent)
+    return; /* dispatch_send_msg() can queue messages with no receivers. */
+  for (int i = 0; i < ent->n_fns; ++i) {
+    if (ent->rcv[i].enabled)
+      ent->rcv[i].fn(m);
+  }
+}
+
+/**
+ * Deliver at most <b>max_msgs</b> queued messages from channel <b>ch</b> of
+ * <b>d</b> to their receivers, oldest first, freeing each one afterwards.
+ * Stops early once the queue is empty.  Always returns 0, even when
+ * <b>ch</b> is out of range.
+ **/
+int
+dispatch_flush(dispatch_t *d, channel_id_t ch, int max_msgs)
+{
+  if (BUG(ch >= d->n_queues))
+    return 0;
+
+  dqueue_t *const chan_q = &d->queues[ch];
+
+  for (int n_done = 0; n_done < max_msgs; ++n_done) {
+    msg_t *head = TOR_SIMPLEQ_FIRST(&chan_q->queue);
+    if (head == NULL)
+      break;
+    /* Unlink before running callbacks, then release the message. */
+    TOR_SIMPLEQ_REMOVE_HEAD(&chan_q->queue, next);
+    dispatcher_run_msg_cbs(d, head);
+    dispatch_free_msg(d, head);
+  }
+
+  return 0;
+}
diff --git a/src/lib/dispatch/dispatch_new.c b/src/lib/dispatch/dispatch_new.c
new file mode 100644
index 000000000..a2879016a
--- /dev/null
+++ b/src/lib/dispatch/dispatch_new.c
@@ -0,0 +1,170 @@
+/* Copyright (c) 2001, Matej Pfajfar.
+ * Copyright (c) 2001-2004, Roger Dingledine.
+ * Copyright (c) 2004-2006, Roger Dingledine, Nick Mathewson.
+ * Copyright (c) 2007-2018, The Tor Project, Inc. */
+/* See LICENSE for licensing information */
+
+/**
+ * \file dispatch_new.c
+ * \brief Code to construct a dispatch_t from a dispatch_cfg_t.
+ **/
+
+#define DISPATCH_PRIVATE
+#include "orconfig.h"
+
+#include "lib/dispatch/dispatch.h"
+#include "lib/dispatch/dispatch_st.h"
+#include "lib/dispatch/dispatch_cfg.h"
+#include "lib/dispatch/dispatch_cfg_st.h"
+
+#include "lib/intmath/cmp.h"
+#include "lib/malloc/malloc.h"
+#include "lib/log/util_bug.h"
+
+#include <string.h>
+
+/** Convert a void* in a smartlist to the corresponding integer. */
+#define VOID_TO_ID(p) ((intptr_t)(p))
+
+/** Interpret every element of <b>sl</b> as an intptr_t that was cast to
+ * void*, and return the largest such value.
+ *
+ * If <b>sl</b> has no elements, return <b>dflt</b> instead. */
+static intptr_t
+max_in_sl(const smartlist_t *sl, intptr_t dflt)
+{
+  if (smartlist_len(sl) == 0)
+    return dflt;
+  intptr_t best = VOID_TO_ID(smartlist_get(sl, 0));
+  SMARTLIST_FOREACH_BEGIN(sl, void *, elt) {
+    const intptr_t val = VOID_TO_ID(elt);
+    best = (val > best) ? val : best;
+  } SMARTLIST_FOREACH_END(elt);
+  return best;
+}
+
+/** Helper: Format an unformattable message auxiliary data item: just return a
+ * copy of the string "<>". */
+static char *
+type_fmt_nop(msg_aux_data_t arg)
+{
+  (void)arg;
+  return tor_strdup("<>");
+}
+
+/** Helper: free function for data items that need no freeing: do nothing. */
+static void
+type_free_nop(msg_aux_data_t arg)
+{
+  (void)arg;
+}
+
+/** Default type functions: never modified, only copied into dispatchers. */
+static const dispatch_typefns_t nop_typefns = {
+  .free_fn = type_free_nop,
+  .fmt_fn = type_fmt_nop
+};
+
+/**
+ * Default alert function for a queue: ignore every argument and do nothing.
+ **/
+static void
+alert_fn_nop(dispatch_t *d, channel_id_t ch, void *arg)
+{
+  (void)d;
+  (void)ch;
+  (void)arg;
+}
+
+/**
+ * Given a list of dispatch_rcv_t (or NULL), create and return a new
+ * dtbl_entry_t holding a copy of each receiver.  Return NULL for a NULL list.
+ **/
+static dtbl_entry_t *
+dtbl_entry_from_lst(smartlist_t *receivers)
+{
+  if (!receivers)
+    return NULL;
+
+  size_t n_recv = smartlist_len(receivers);
+  dtbl_entry_t *ent;
+  ent = tor_malloc_zero(offsetof(dtbl_entry_t, rcv) +
+                        sizeof(dispatch_rcv_t) * n_recv);
+
+  ent->n_fns = n_recv;
+
+  SMARTLIST_FOREACH_BEGIN(receivers, const dispatch_rcv_t *, rcv) {
+    memcpy(&ent->rcv[rcv_sl_idx], rcv, sizeof(*rcv));
+    if (rcv->enabled) {
+      ++ent->n_enabled;
+    }
+  } SMARTLIST_FOREACH_END(rcv);
+
+  return ent;
+}
+
+/** Create and return a new dispatcher from <b>cfg</b>, which is only read. */
+dispatch_t *
+dispatch_new(const dispatch_cfg_t *cfg)
+{
+  dispatch_t *d = tor_malloc_zero(sizeof(dispatch_t));
+
+  /* Any message with a type, channel, or receiver counts towards our
+   * messages: each of those lists is used to index d->table below. */
+  const size_t n_msgs = MAX(MAX(smartlist_len(cfg->type_by_msg),
+                                smartlist_len(cfg->recv_by_msg)),
+                            smartlist_len(cfg->chan_by_msg)) + 1;
+
+  /* Any channel that any message has counts towards the number of channels. */
+  const size_t n_chans = (size_t) MAX(1, max_in_sl(cfg->chan_by_msg,0)) + 1;
+
+  /* Any type that a message has, or that has functions, counts towards
+   * the number of types. */
+  const size_t n_types = (size_t) MAX(max_in_sl(cfg->type_by_msg,0),
+                                      smartlist_len(cfg->fns_by_type)) + 1;
+
+  d->n_msgs = n_msgs;
+  d->n_queues = n_chans;
+  d->n_types = n_types;
+
+  /* Initialize the array of type-functions, defaulting to no-ops. */
+  d->typefns = tor_calloc(n_types, sizeof(dispatch_typefns_t));
+  for (size_t i = 0; i < n_types; ++i) {
+    memcpy(&d->typefns[i], &nop_typefns, sizeof(dispatch_typefns_t));
+  }
+  SMARTLIST_FOREACH_BEGIN(cfg->fns_by_type, dispatch_typefns_t *, fns) {
+    /* Set the functions if they are provided. */
+    if (fns) {
+      if (fns->free_fn)
+        d->typefns[fns_sl_idx].free_fn = fns->free_fn;
+      if (fns->fmt_fn)
+        d->typefns[fns_sl_idx].fmt_fn = fns->fmt_fn;
+    }
+  } SMARTLIST_FOREACH_END(fns);
+
+  /* Initialize the message queues: one for each channel. */
+  d->queues = tor_calloc(d->n_queues, sizeof(dqueue_t));
+  for (size_t i = 0; i < d->n_queues; ++i) {
+    TOR_SIMPLEQ_INIT(&d->queues[i].queue);
+    d->queues[i].alert_fn = alert_fn_nop;
+  }
+
+  /* Build the dispatch tables mapping message IDs to receivers. */
+  d->table = tor_calloc(d->n_msgs, sizeof(dtbl_entry_t *));
+  SMARTLIST_FOREACH_BEGIN(cfg->recv_by_msg, smartlist_t *, rcv) {
+    d->table[rcv_sl_idx] = dtbl_entry_from_lst(rcv);
+  } SMARTLIST_FOREACH_END(rcv);
+
+  /* Fill in the empty entries in the dispatch tables: types and channels
+   * for each message.  (These list elements are IDs encoded as void*.) */
+  SMARTLIST_FOREACH_BEGIN(cfg->type_by_msg, void *, type) {
+    if (d->table[type_sl_idx])
+      d->table[type_sl_idx]->type = VOID_TO_ID(type);
+  } SMARTLIST_FOREACH_END(type);
+  SMARTLIST_FOREACH_BEGIN(cfg->chan_by_msg, void *, chan) {
+    if (d->table[chan_sl_idx])
+      d->table[chan_sl_idx]->channel = VOID_TO_ID(chan);
+  } SMARTLIST_FOREACH_END(chan);
+
+  return d;
+}
diff --git a/src/lib/dispatch/dispatch_st.h b/src/lib/dispatch/dispatch_st.h
new file mode 100644
index 000000000..568107b70
--- /dev/null
+++ b/src/lib/dispatch/dispatch_st.h
@@ -0,0 +1,108 @@
+/* Copyright (c) 2001, Matej Pfajfar.
+ * Copyright (c) 2001-2004, Roger Dingledine.
+ * Copyright (c) 2004-2006, Roger Dingledine, Nick Mathewson.
+ * Copyright (c) 2007-2018, The Tor Project, Inc. */
+/* See LICENSE for licensing information */
+
+/**
+ * \file dispatch_st.h
+ *
+ * \brief private structures used for the dispatcher module
+ */
+
+#ifndef TOR_DISPATCH_ST_H
+#define TOR_DISPATCH_ST_H
+
+#ifdef DISPATCH_PRIVATE
+
+#include "lib/container/smartlist.h"
+
+/**
+ * Information about the recipient of a message.
+ **/
+typedef struct dispatch_rcv_t {
+  /** The subsystem receiving a message. */
+  subsys_id_t sys;
+  /** True iff this recipient is enabled. */
+  bool enabled;
+  /** The function that will handle the message. */
+  recv_fn_t fn;
+} dispatch_rcv_t;
+
+/**
+ * Information used by a dispatcher to handle and dispatch a single message
+ * ID.  It maps that message ID to its type, channel, and list of receiver
+ * functions.
+ *
+ * This structure is used when the dispatcher is running.
+ **/
+typedef struct dtbl_entry_t {
+  /** The number of enabled non-stub subscribers for this message.
+   *
+   * Note that for now, this will be the same as <b>n_fns</b>, since there is
+   * no way to turn these subscribers on and off yet. */
+  uint16_t n_enabled;
+  /** The channel that handles this message. */
+  channel_id_t channel;
+  /** The associated C type for this message. */
+  msg_type_id_t type;
+  /**
+   * The number of function pointers for subscribers that receive this
+   * message, in rcv. */
+  uint16_t n_fns;
+  /**
+   * The recipients for this message.
+   */
+  dispatch_rcv_t rcv[FLEXIBLE_ARRAY_MEMBER];
+} dtbl_entry_t;
+
+/**
+ * A queue of messages for a given channel, used by a live dispatcher.
+ */
+typedef struct dqueue_t {
+  /** The queue of messages itself. */
+  TOR_SIMPLEQ_HEAD( , msg_t) queue;
+  /** A function to be called when the queue becomes nonempty. */
+  dispatch_alertfn_t alert_fn;
+  /** An argument for the alert_fn. */
+  void *alert_fn_arg;
+} dqueue_t ;
+
+/**
+ * A single dispatcher for cross-module messages.
+ */
+struct dispatch_t {
+  /**
+   * The length of <b>table</b>: the number of message IDs that this
+   * dispatcher can handle.
+   */
+  size_t n_msgs;
+  /**
+   * The length of <b>queues</b>: the number of channels that this dispatcher
+   * has configured.
+   */
+  size_t n_queues;
+  /**
+   * The length of <b>typefns</b>: the number of C type IDs that this
+   * dispatcher has configured.
+   */
+  size_t n_types;
+  /**
+   * An array of message queues, indexed by channel ID.
+   */
+  dqueue_t *queues;
+  /**
+   * An array of entries about how to handle particular message types, indexed
+   * by message ID.
+   */
+  dtbl_entry_t **table;
+  /**
+   * An array of function tables for manipulating types, indexed by message
+   * type ID.
+   **/
+  dispatch_typefns_t *typefns;
+};
+
+#endif
+
+#endif
diff --git a/src/lib/dispatch/include.am b/src/lib/dispatch/include.am
new file mode 100644
index 000000000..c4aa170db
--- /dev/null
+++ b/src/lib/dispatch/include.am
@@ -0,0 +1,23 @@
+
+noinst_LIBRARIES += src/lib/libtor-dispatch.a
+
+if UNITTESTS_ENABLED
+noinst_LIBRARIES += src/lib/libtor-dispatch-testing.a
+endif
+
+src_lib_libtor_dispatch_a_SOURCES =			\
+	src/lib/dispatch/dispatch_cfg.c			\
+	src/lib/dispatch/dispatch_core.c		\
+	src/lib/dispatch/dispatch_new.c
+
+src_lib_libtor_dispatch_testing_a_SOURCES = \
+	$(src_lib_libtor_dispatch_a_SOURCES)
+src_lib_libtor_dispatch_testing_a_CPPFLAGS = $(AM_CPPFLAGS) $(TEST_CPPFLAGS)
+src_lib_libtor_dispatch_testing_a_CFLAGS = $(AM_CFLAGS) $(TEST_CFLAGS)
+
+noinst_HEADERS +=					\
+	src/lib/dispatch/dispatch.h			\
+	src/lib/dispatch/dispatch_cfg.h 		\
+	src/lib/dispatch/dispatch_cfg_st.h 		\
+	src/lib/dispatch/dispatch_st.h			\
+	src/lib/dispatch/msgtypes.h
diff --git a/src/lib/dispatch/msgtypes.h b/src/lib/dispatch/msgtypes.h
new file mode 100644
index 000000000..4e79e592a
--- /dev/null
+++ b/src/lib/dispatch/msgtypes.h
@@ -0,0 +1,80 @@
+/* Copyright (c) 2001, Matej Pfajfar.
+ * Copyright (c) 2001-2004, Roger Dingledine.
+ * Copyright (c) 2004-2006, Roger Dingledine, Nick Mathewson.
+ * Copyright (c) 2007-2018, The Tor Project, Inc. */
+/* See LICENSE for licensing information */
+
+/**
+ * \file msgtypes.h
+ * \brief Types used for messages in the dispatcher code.
+ **/
+
+#ifndef TOR_DISPATCH_MSGTYPES_H
+#define TOR_DISPATCH_MSGTYPES_H
+
+#include <stdint.h>
+
+#include "ext/tor_queue.h"
+
+/**
+ * These types are aliases for subsystems, channels, and message IDs.
+ **/
+typedef uint16_t subsys_id_t;
+typedef uint16_t channel_id_t;
+typedef uint16_t message_id_t;
+
+/**
+ * This identifies a C type that can be sent along with a message.
+ **/
+typedef uint16_t msg_type_id_t;
+
+/**
+ * An ID value returned for *_type_t when none exists.
+ */
+#define ERROR_ID 65535
+
+/**
+ * Auxiliary (untyped) data sent along with a message.
+ *
+ * We define this as a union of a pointer and a u64, so that the integer
+ * types will have the same range across platforms.
+ **/
+typedef union {
+  void *ptr;
+  uint64_t u64;
+} msg_aux_data_t;
+
+/**
+ * Structure of a received message.
+ **/
+typedef struct msg_t {
+  TOR_SIMPLEQ_ENTRY(msg_t) next; /**< Link used by the per-channel queue. */
+  subsys_id_t sender; /**< The sender ID given to dispatch_send(). */
+  channel_id_t channel; /**< The channel this message is queued on. */
+  message_id_t msg; /**< The message ID; selects the receivers. */
+  /** We could omit this field, since it is implicit in the message ID, but
+   * we leave it in for safety. */
+  msg_type_id_t type;
+  /** Untyped auxiliary data. You shouldn't have to mess with this
+   * directly. */
+  msg_aux_data_t aux_data__;
+} msg_t;
+
+/**
+ * A function that a subscriber uses to receive a message.
+ **/
+typedef void (*recv_fn_t)(const msg_t *m);
+
+/**
+ * Table of functions to use for a given C type.  Any omitted (NULL) functions
+ * will be treated as no-ops.
+ **/
+typedef struct dispatch_typefns_t {
+  /** Release storage held for the auxiliary data of this type. */
+  void (*free_fn)(msg_aux_data_t);
+  /** Format and return a newly allocated string describing the contents
+   * of this data element. */
+  char *(*fmt_fn)(msg_aux_data_t);
+} dispatch_typefns_t;
+
+#endif
diff --git a/src/test/include.am b/src/test/include.am
index c3827d3eb..d4e8eb4e8 100644
--- a/src/test/include.am
+++ b/src/test/include.am
@@ -124,6 +124,7 @@ src_test_test_SOURCES += \
 	src/test/test_dir.c \
 	src/test/test_dir_common.c \
 	src/test/test_dir_handle_get.c \
+	src/test/test_dispatch.c \
 	src/test/test_dos.c \
 	src/test/test_entryconn.c \
 	src/test/test_entrynodes.c \
diff --git a/src/test/test.c b/src/test/test.c
index 1230b632a..4c6d9775b 100644
--- a/src/test/test.c
+++ b/src/test/test.c
@@ -872,6 +872,7 @@ struct testgroup_t testgroups[] = {
   { "dir/voting/flags/", voting_flags_tests },
   { "dir/voting/schedule/", voting_schedule_tests },
   { "dir_handle_get/", dir_handle_get_tests },
+  { "dispatch/", dispatch_tests, },
   { "dns/", dns_tests },
   { "dos/", dos_tests },
   { "entryconn/", entryconn_tests },
diff --git a/src/test/test.h b/src/test/test.h
index 7a3a4d8fd..cd0ce4f6d 100644
--- a/src/test/test.h
+++ b/src/test/test.h
@@ -209,6 +209,7 @@ extern struct testcase_t crypto_openssl_tests[];
 extern struct testcase_t crypto_tests[];
 extern struct testcase_t dir_handle_get_tests[];
 extern struct testcase_t dir_tests[];
+extern struct testcase_t dispatch_tests[];
 extern struct testcase_t dns_tests[];
 extern struct testcase_t dos_tests[];
 extern struct testcase_t entryconn_tests[];
diff --git a/src/test/test_dispatch.c b/src/test/test_dispatch.c
new file mode 100644
index 000000000..ec704c042
--- /dev/null
+++ b/src/test/test_dispatch.c
@@ -0,0 +1,181 @@
+/* Copyright (c) 2018, The Tor Project, Inc. */
+/* See LICENSE for licensing information */
+
+#define DISPATCH_PRIVATE
+
+#include "test/test.h"
+
+#include "lib/dispatch/dispatch.h"
+#include "lib/dispatch/dispatch_cfg.h"
+#include "lib/dispatch/dispatch_st.h"
+#include "lib/dispatch/msgtypes.h"
+
+#include "lib/log/escape.h"
+#include "lib/malloc/malloc.h"
+#include "lib/string/printf.h"
+
+#include <stdio.h>
+#include <string.h>
+
+/* Build a dispatch_t from a freshly created, unconfigured dispatch_cfg_t. */
+static void
+test_dispatch_empty(void *arg)
+{
+  dispatch_cfg_t *config = dcfg_new();
+  dispatch_t *dispatcher = dispatch_new(config);
+
+  (void)arg;
+
+  /* A configuration with nothing registered must still yield an object. */
+  tt_assert(dispatcher);
+
+ done:
+  /* Release the dispatcher first, then the configuration it was built from. */
+  dispatch_free(dispatcher);
+  dcfg_free(config);
+}
+
+static int total_recv1_simple = 0; /* Running sum kept by simple_recv1(). */
+static int total_recv2_simple = 0; /* Running sum kept by simple_recv2(). */
+
+static void
+simple_recv1(const msg_t *m)
+{
+  total_recv1_simple += m->aux_data__.u64; /* add the u64 payload as-is */
+}
+
+static void
+simple_recv2(const msg_t *m)
+{
+  total_recv2_simple += m->aux_data__.u64*10; /* add ten times the payload */
+}
+
+/* Construct a dispatch_t with two messages, make sure that they both get
+ * delivered.  Message 1 has two receivers, so it is counted twice. */
+static void
+test_dispatch_simple(void *arg)
+{
+  (void)arg;
+
+  dispatch_t *d=NULL;
+  dispatch_cfg_t *cfg=NULL;
+  int r;
+
+  cfg = dcfg_new();
+  r = dcfg_msg_set_type(cfg,0,0);
+  r += dcfg_msg_set_chan(cfg,0,0);
+  r += dcfg_add_recv(cfg,0,1,simple_recv1);
+  r += dcfg_msg_set_type(cfg,1,0);
+  r += dcfg_msg_set_chan(cfg,1,0);
+  r += dcfg_add_recv(cfg,1,1,simple_recv2);
+  r += dcfg_add_recv(cfg,1,1,simple_recv2); /* second copy */
+  tt_int_op(r, OP_EQ, 0);
+
+  d = dispatch_new(cfg);
+  tt_assert(d);
+
+  msg_aux_data_t data = {.u64 = 7};
+  r = dispatch_send(d, 99, 0, 0, 0, data); /* sender, channel, msg, type */
+  tt_int_op(r, OP_EQ, 0);
+  tt_int_op(total_recv1_simple, OP_EQ, 0); /* nothing runs before the flush */
+
+  r = dispatch_flush(d, 0, INT_MAX);
+  tt_int_op(r, OP_EQ, 0);
+  tt_int_op(total_recv1_simple, OP_EQ, 7);
+  tt_int_op(total_recv2_simple, OP_EQ, 0);
+
+  total_recv1_simple = 0;
+  r = dispatch_send(d, 99, 0, 1, 0, data);
+  tt_int_op(r, OP_EQ, 0);
+  tt_int_op(dispatch_flush(d, 0, INT_MAX), OP_EQ, 0); /* must succeed too */
+  tt_int_op(total_recv1_simple, OP_EQ, 0);
+  tt_int_op(total_recv2_simple, OP_EQ, 140); /* 7*10 from each receiver */
+
+ done:
+  dispatch_free(d);
+  dcfg_free(cfg);
+}
+
+struct coord { int x; int y; }; /* Sample payload type for typed messages. */
+static void
+free_coord(msg_aux_data_t d)
+{
+  tor_free(d.ptr); /* release the struct coord that the payload points to */
+}
+static char *
+fmt_coord(msg_aux_data_t d)
+{
+  const struct coord *point = d.ptr; /* the payload is a struct coord */
+  char *text = NULL;
+  tor_asprintf(&text, "[%d, %d]", point->x, point->y);
+  return text; /* string produced by tor_asprintf() */
+}
+static dispatch_typefns_t coord_fns = { /* type functions for struct coord */
+  .free_fn = free_coord,
+  .fmt_fn = fmt_coord,
+};
+static void
+alert_run_immediate(dispatch_t *d, channel_id_t ch, void *arg)
+{
+  (void)arg; /* unused */
+  dispatch_flush(d, ch, INT_MAX); /* flush ch right away; result ignored */
+}
+
+static dispatch_t *dispatcher_in_use=NULL; /* dispatcher for recv_typed_data */
+static char *received_data=NULL; /* last string stored by recv_typed_data */
+
+static void
+recv_typed_data(const msg_t *m)
+{
+  tor_free(received_data); /* discard any string kept from an earlier call */
+  received_data = dispatch_fmt_msg_data(dispatcher_in_use, m);
+}
+
+/* Send a message whose payload is a heap-allocated struct coord, and check
+ * that the receiver recorded it formatted as "[13, 37]". */
+static void
+test_dispatch_with_types(void *arg)
+{
+  dispatch_cfg_t *config = dcfg_new();
+  dispatch_t *dispatcher = NULL;
+  int status = 0;
+
+  (void)arg;
+  status += dcfg_msg_set_type(config,5,3);
+  status += dcfg_msg_set_chan(config,5,2);
+  status += dcfg_add_recv(config,5,0,recv_typed_data);
+  status += dcfg_type_set_fns(config,3,&coord_fns);
+  tt_int_op(status, OP_EQ, 0);
+
+  dispatcher = dispatch_new(config);
+  tt_assert(dispatcher);
+  dispatcher_in_use = dispatcher;
+
+  /* Flush channel 2 from its alert callback, so the message runs at once. */
+  status = dispatch_set_alert_fn(dispatcher, 2, alert_run_immediate, NULL);
+  tt_int_op(status, OP_EQ, 0);
+
+  struct coord *point = tor_malloc(sizeof(*point));
+  *point = (struct coord){ .x = 13, .y = 37 };
+  msg_aux_data_t payload = {.ptr = point};
+  status = dispatch_send(dispatcher, 99/*sender*/, 2/*channel*/, 5/*msg*/,
+                         3/*type*/, payload);
+  tt_int_op(status, OP_EQ, 0);
+  tt_str_op(received_data, OP_EQ, "[13, 37]");
+
+ done:
+  dispatch_free(dispatcher);
+  dcfg_free(config);
+  tor_free(received_data);
+  dispatcher_in_use = NULL;
+}
+
+#define T(name)                                                 \
+  { #name, test_dispatch_ ## name, TT_FORK, NULL, NULL } /* one table entry */
+
+struct testcase_t dispatch_tests[] = { /* listed as "dispatch/" in test.c */
+  T(empty),
+  T(simple),
+  T(with_types),
+  END_OF_TESTCASES
+};





More information about the tor-commits mailing list