[tor-commits] [tor/master] Pubsub: an OO layer on top of lib/dispatch

asn at torproject.org asn at torproject.org
Wed Mar 27 12:31:31 UTC 2019


commit 9e60482b8073f2d43187c36c9159fd4367d7140a
Author: Nick Mathewson <nickm at torproject.org>
Date:   Tue Jan 15 09:01:20 2019 -0500

    Pubsub: an OO layer on top of lib/dispatch
    
    This "publish/subscribe" layer sits on top of lib/dispatch, and
    tries to provide more type-safety and cross-checking for the
    lower-level layer.
    
    Even with this commit, we're still not done: more checking will come
    in the next commit, and a set of usability/typesafety macros will
    come after.
---
 src/lib/pubsub/.may_include        |  10 ++
 src/lib/pubsub/include.am          |  24 +++
 src/lib/pubsub/pub_binding_st.h    |  36 +++++
 src/lib/pubsub/pubsub.h            |  85 +++++++++++
 src/lib/pubsub/pubsub_build.c      | 286 ++++++++++++++++++++++++++++++++++
 src/lib/pubsub/pubsub_build.h      |  87 +++++++++++
 src/lib/pubsub/pubsub_builder_st.h | 161 ++++++++++++++++++++
 src/lib/pubsub/pubsub_connect.h    |  47 ++++++
 src/lib/pubsub/pubsub_flags.h      |  32 ++++
 src/lib/pubsub/pubsub_publish.c    |  70 +++++++++
 src/lib/pubsub/pubsub_publish.h    |  15 ++
 src/test/include.am                |   1 +
 src/test/test.c                    |   1 +
 src/test/test.h                    |   1 +
 src/test/test_pubsub_msg.c         | 305 +++++++++++++++++++++++++++++++++++++
 15 files changed, 1161 insertions(+)

diff --git a/src/lib/pubsub/.may_include b/src/lib/pubsub/.may_include
new file mode 100644
index 000000000..5623492f0
--- /dev/null
+++ b/src/lib/pubsub/.may_include
@@ -0,0 +1,10 @@
+orconfig.h
+
+lib/cc/*.h
+lib/container/*.h
+lib/dispatch/*.h
+lib/intmath/*.h
+lib/log/*.h
+lib/malloc/*.h
+lib/pubsub/*.h
+lib/string/*.h
diff --git a/src/lib/pubsub/include.am b/src/lib/pubsub/include.am
new file mode 100644
index 000000000..9856c94a5
--- /dev/null
+++ b/src/lib/pubsub/include.am
@@ -0,0 +1,24 @@
+
+noinst_LIBRARIES += src/lib/libtor-pubsub.a
+
+if UNITTESTS_ENABLED
+noinst_LIBRARIES += src/lib/libtor-pubsub-testing.a
+endif
+
+src_lib_libtor_pubsub_a_SOURCES =			\
+	src/lib/pubsub/pubsub_build.c			\
+	src/lib/pubsub/pubsub_publish.c
+
+src_lib_libtor_pubsub_testing_a_SOURCES = \
+	$(src_lib_libtor_pubsub_a_SOURCES)
+src_lib_libtor_pubsub_testing_a_CPPFLAGS = $(AM_CPPFLAGS) $(TEST_CPPFLAGS)
+src_lib_libtor_pubsub_testing_a_CFLAGS = $(AM_CFLAGS) $(TEST_CFLAGS)
+
+noinst_HEADERS +=					\
+	src/lib/pubsub/pub_binding_st.h			\
+	src/lib/pubsub/pubsub.h				\
+	src/lib/pubsub/pubsub_build.h			\
+	src/lib/pubsub/pubsub_builder_st.h		\
+	src/lib/pubsub/pubsub_connect.h			\
+	src/lib/pubsub/pubsub_flags.h			\
+	src/lib/pubsub/pubsub_publish.h
diff --git a/src/lib/pubsub/pub_binding_st.h b/src/lib/pubsub/pub_binding_st.h
new file mode 100644
index 000000000..e8c0d47ef
--- /dev/null
+++ b/src/lib/pubsub/pub_binding_st.h
@@ -0,0 +1,36 @@
+/* Copyright (c) 2001, Matej Pfajfar.
+ * Copyright (c) 2001-2004, Roger Dingledine.
+ * Copyright (c) 2004-2006, Roger Dingledine, Nick Mathewson.
+ * Copyright (c) 2007-2018, The Tor Project, Inc. */
+/* See LICENSE for licensing information */
+
+/**
+ * @file pubsub_build.h
+ * @brief Declaration of pub_binding_t.
+ */
+
+#ifndef TOR_PUB_BINDING_ST_H
+#define TOR_PUB_BINDING_ST_H
+
+#include "lib/dispatch/msgtypes.h"
+struct dispatch_t;
+
+/**
+ * A pub_binding_t is an opaque object that subsystems use to publish
+ * messages.  The DISPATCH_ADD_PUB*() macros set it up.
+ **/
+typedef struct pub_binding_t {
+  /**
+   * A pointer to a configured dispatch_t object.  This is filled in
+   * when the dispatch_t is finally constructed.
+   **/
+  struct dispatch_t *dispatch_ptr;
+  /**
+   * A template for the msg_t fields that are filled in for this message.
+   * This is copied into outgoing messages, ensuring that their fields are set
+   * corretly.
+   **/
+  msg_t msg_template;
+} pub_binding_t;
+
+#endif
diff --git a/src/lib/pubsub/pubsub.h b/src/lib/pubsub/pubsub.h
new file mode 100644
index 000000000..303b36ad5
--- /dev/null
+++ b/src/lib/pubsub/pubsub.h
@@ -0,0 +1,85 @@
+/* Copyright (c) 2001, Matej Pfajfar.
+ * Copyright (c) 2001-2004, Roger Dingledine.
+ * Copyright (c) 2004-2006, Roger Dingledine, Nick Mathewson.
+ * Copyright (c) 2007-2018, The Tor Project, Inc. */
+/* See LICENSE for licensing information */
+
+/**
+ * @file pubsub.h
+ * @brief Header for OO publish-subscribe functionality.
+ *
+ * This module provides a wrapper around the "dispatch" module,
+ * ensuring type-safety and allowing us to do static analysis on
+ * publication and subscriptions.
+ *
+ * With this module, we enforce:
+ *  <ul>
+ *   <li>that every message has (potential) publishers and subscribers;
+ *   <li>that every message is published and subscribed from the correct
+ *       channels, with the correct type ID, every time it is published.
+ *   <li>that type IDs correspond to a single C type, and that the C types are
+ *       used correctly.
+ *   <li>that when a message is published or subscribed, it is done with
+ *       a correct subsystem identifier
+ * </ul>
+ *
+ * We do this by making "publication requests" and "subscription requests"
+ * into objects, and doing some computation on them before we create
+ * a dispatch_t with them.
+ *
+ * Rather than using the dispatch module directly, a publishing module
+ * receives a "binding" object that it uses to send messages with the right
+ * settings.
+ */
+
+/*
+ *
+ * Overview: Messages are sent over channels.  Before sending a message on a
+ * channel, or receiving a message on a channel, a subsystem needs to register
+ * that it publishes, or subscribes, to that message, on that channel.
+ *
+ * Messages, channels, and subsystems are represented internally as short
+ * integers, though they are associated with human-readable strings for
+ * initialization and debugging.
+ *
+ * When registering for a message, a subsystem must say whether it is an
+ * exclusive publisher/subscriber to that message type, or whether other
+ * subsystems may also publish/subscribe to it.
+ *
+ * All messages and their publishers/subscribers must be registered early in
+ * the initialization process.
+ *
+ * By default, it is an error for a message type to have publishers and no
+ * subscribers on a channel, or subscribers and no publishers on a channel.
+ *
+ * A subsystem may register for a message with a note that delivery or
+ * production is disabled -- for example, because the subsystem is
+ * disabled at compile-time. It is not an error for a message type to
+ * have all of its publishers or subscribers disabled.
+ *
+ * After a message is sent, it is delivered to every recipient.  This
+ * delivery happens from the top level of the event loop; it may be
+ * interleaved with network events, timers, etc.
+ *
+ * Messages may have associated data.  This data is typed, and is owned
+ * by the message.  Strings, byte-arrays, and integers have built-in
+ * support.  Other types may be added.  If objects are to be sent,
+ * they should be identified by handle.  If an object requires cleanup,
+ * it should be declared with an associated free function.
+ *
+ * Semantically, if two subsystems communicate only by this kind of
+ * message passing, neither is considered to depend on the other, though
+ * both are considered to have a dependency on the message and on any
+ * types it contains.
+ *
+ * (Or generational index?)
+ */
+#ifndef TOR_PUBSUB_PUBSUB_H
+#define TOR_PUBSUB_PUBSUB_H
+
+#include "lib/pubsub/pub_binding_st.h"
+#include "lib/pubsub/pubsub_connect.h"
+#include "lib/pubsub/pubsub_flags.h"
+#include "lib/pubsub/pubsub_publish.h"
+
+#endif
diff --git a/src/lib/pubsub/pubsub_build.c b/src/lib/pubsub/pubsub_build.c
new file mode 100644
index 000000000..72f2eacea
--- /dev/null
+++ b/src/lib/pubsub/pubsub_build.c
@@ -0,0 +1,286 @@
+/* Copyright (c) 2001, Matej Pfajfar.
+ * Copyright (c) 2001-2004, Roger Dingledine.
+ * Copyright (c) 2004-2006, Roger Dingledine, Nick Mathewson.
+ * Copyright (c) 2007-2018, The Tor Project, Inc. */
+/* See LICENSE for licensing information */
+
+/**
+ * @file pubsub_build.c
+ * @brief Construct a dispatch_t in safer, more OO way.
+ **/
+
+#define PUBSUB_PRIVATE
+
+#include "lib/dispatch/dispatch.h"
+#include "lib/dispatch/dispatch_cfg.h"
+#include "lib/dispatch/dispatch_naming.h"
+#include "lib/dispatch/msgtypes.h"
+#include "lib/pubsub/pubsub_flags.h"
+#include "lib/pubsub/pub_binding_st.h"
+#include "lib/pubsub/pubsub_build.h"
+#include "lib/pubsub/pubsub_builder_st.h"
+#include "lib/pubsub/pubsub_connect.h"
+
+#include "lib/container/smartlist.h"
+#include "lib/log/util_bug.h"
+#include "lib/malloc/malloc.h"
+
+ #include <string.h>
+
+/** Construct and return a new empty pubsub_items_t. */
+static pubsub_items_t *
+pubsub_items_new(void)
+{
+  pubsub_items_t *cfg = tor_malloc_zero(sizeof(*cfg));
+  cfg->items = smartlist_new();
+  cfg->type_items = smartlist_new();
+  return cfg;
+}
+
+/** Release all storage held in a pubsub_items_t. */
+void
+pubsub_items_free_(pubsub_items_t *cfg)
+{
+  if (! cfg)
+    return;
+  SMARTLIST_FOREACH(cfg->items, pubsub_cfg_t *, item, tor_free(item));
+  SMARTLIST_FOREACH(cfg->type_items,
+                    pubsub_type_cfg_t *, item, tor_free(item));
+  smartlist_free(cfg->items);
+  smartlist_free(cfg->type_items);
+  tor_free(cfg);
+}
+
+/** Construct and return a new pubsub_builder_t. */
+pubsub_builder_t *
+pubsub_builder_new(void)
+{
+  dispatch_naming_init();
+
+  pubsub_builder_t *pb = tor_malloc_zero(sizeof(*pb));
+  pb->cfg = dcfg_new();
+  pb->items = pubsub_items_new();
+  return pb;
+}
+
+/**
+ * Release all storage held by a pubsub_builder_t.
+ *
+ * You'll (mostly) only want to call this function on an error case: if you're
+ * constructing a dispatch_t instead, you should call
+ * pubsub_builder_finalize() to consume the pubsub_builder_t.
+ */
+void
+pubsub_builder_free_(pubsub_builder_t *pb)
+{
+  if (pb == NULL)
+    return;
+  pubsub_items_free(pb->items);
+  dcfg_free(pb->cfg);
+  tor_free(pb);
+}
+
+/**
+ * Create and return a pubsub_connector_t for the subsystem with ID
+ * <b>subsys</b> to use in adding publications, subscriptions, and types to
+ * <b>builder</b>.
+ **/
+pubsub_connector_t *
+pubsub_connector_for_subsystem(pubsub_builder_t *builder,
+                               subsys_id_t subsys)
+{
+  tor_assert(builder);
+  ++builder->n_connectors;
+
+  pubsub_connector_t *con = tor_malloc_zero(sizeof(*con));
+
+  con->builder = builder;
+  con->subsys_id = subsys;
+
+  return con;
+}
+
+/**
+ * Release all storage held by a pubsub_connector_t.
+ **/
+void
+pubsub_connector_free_(pubsub_connector_t *con)
+{
+  if (!con)
+    return;
+
+  if (con->builder) {
+    --con->builder->n_connectors;
+    tor_assert(con->builder->n_connectors >= 0);
+  }
+  tor_free(con);
+}
+
+/**
+ * Use <b>con</b> to add a request for being able to publish messages of type
+ * <b>msg</b> with auxiliary data of <b>type</b> on <b>channel</b>.
+ **/
+int
+pubsub_add_pub_(pubsub_connector_t *con,
+                pub_binding_t *out,
+                channel_id_t channel,
+                message_id_t msg,
+                msg_type_id_t type,
+                unsigned flags,
+                const char *file,
+                unsigned line)
+{
+  pubsub_cfg_t *cfg = tor_malloc_zero(sizeof(*cfg));
+
+  memset(out, 0, sizeof(*out));
+  cfg->is_publish = true;
+
+  out->msg_template.sender = cfg->subsys = con->subsys_id;
+  out->msg_template.channel = cfg->channel = channel;
+  out->msg_template.msg = cfg->msg = msg;
+  out->msg_template.type = cfg->type = type;
+
+  cfg->flags = flags;
+  cfg->added_by_file = file;
+  cfg->added_by_line = line;
+
+  /* We're grabbing a pointer to the pub_binding_t so we can tell it about
+   * the dispatcher later on.
+   */
+  cfg->pub_binding = out;
+
+  smartlist_add(con->builder->items->items, cfg);
+
+  if (dcfg_msg_set_type(con->builder->cfg, msg, type) < 0)
+    goto err;
+  if (dcfg_msg_set_chan(con->builder->cfg, msg, channel) < 0)
+    goto err;
+
+  return 0;
+ err:
+  ++con->builder->n_errors;
+  return -1;
+}
+
+/**
+ * Use <b>con</b> to add a request for being able to publish messages of type
+ * <b>msg</b> with auxiliary data of <b>type</b> on <b>channel</b>,
+ * passing them to the callback in <b>recv_fn</b>.
+ **/
+int
+pubsub_add_sub_(pubsub_connector_t *con,
+                recv_fn_t recv_fn,
+                channel_id_t channel,
+                message_id_t msg,
+                msg_type_id_t type,
+                unsigned flags,
+                const char *file,
+                unsigned line)
+{
+  pubsub_cfg_t *cfg = tor_malloc_zero(sizeof(*cfg));
+
+  cfg->is_publish = false;
+  cfg->subsys = con->subsys_id;
+  cfg->channel = channel;
+  cfg->msg = msg;
+  cfg->type = type;
+  cfg->flags = flags;
+  cfg->added_by_file = file;
+  cfg->added_by_line = line;
+
+  cfg->recv_fn = recv_fn;
+
+  smartlist_add(con->builder->items->items, cfg);
+
+  if (dcfg_msg_set_type(con->builder->cfg, msg, type) < 0)
+    goto err;
+  if (dcfg_msg_set_chan(con->builder->cfg, msg, channel) < 0)
+    goto err;
+  if (! (flags & DISP_FLAG_STUB)) {
+    if (dcfg_add_recv(con->builder->cfg, msg, cfg->subsys, recv_fn) < 0)
+      goto err;
+  }
+
+  return 0;
+ err:
+  ++con->builder->n_errors;
+  return -1;
+}
+
+/**
+ * Use <b>con</b> to define a the functions to use for manipulating the type
+ * <b>type</b>.  Any function pointers left as NULL will be implemented as
+ * no-ops.
+ **/
+int
+pubsub_connector_define_type_(pubsub_connector_t *con,
+                              msg_type_id_t type,
+                              dispatch_typefns_t *fns,
+                              const char *file,
+                              unsigned line)
+{
+  pubsub_type_cfg_t *cfg = tor_malloc_zero(sizeof(*cfg));
+  cfg->type = type;
+  memcpy(&cfg->fns, fns, sizeof(*fns));
+  cfg->subsys = con->subsys_id;
+  cfg->added_by_file = file;
+  cfg->added_by_line = line;
+
+  smartlist_add(con->builder->items->type_items, cfg);
+
+  if (dcfg_type_set_fns(con->builder->cfg, type, fns) < 0)
+    goto err;
+
+  return 0;
+ err:
+  ++con->builder->n_errors;
+  return -1;
+}
+
+/**
+ * Initialize the dispatch_ptr field in every relevant publish binding
+ * for <b>d</b>.
+ */
+static void
+dispatch_fill_pub_binding_backptrs(pubsub_builder_t *builder,
+                                   dispatch_t *d)
+{
+  SMARTLIST_FOREACH_BEGIN(builder->items->items, pubsub_cfg_t *, cfg) {
+    if (cfg->pub_binding) {
+      // XXXX we could skip this for STUB publishers, and for any publishers
+      // XXXX where all subscribers are STUB.
+      cfg->pub_binding->dispatch_ptr = d;
+    }
+  } SMARTLIST_FOREACH_END(cfg);
+}
+
+/**
+ * Create a new dispatcher as configured in a pubsub_builder_t.
+ *
+ * Consumes and frees its input.
+ **/
+dispatch_t *
+pubsub_builder_finalize(pubsub_builder_t *builder)
+{
+  dispatch_t *dispatcher = NULL;
+  tor_assert_nonfatal(builder->n_connectors == 0);
+
+  if (builder->n_errors)
+    goto err;
+
+  /* Coming in the next commit.
+  if (pubsub_builder_check(builder) < 0)
+    goto err;
+  */
+
+  dispatcher = dispatch_new(builder->cfg);
+
+  if (!dispatcher)
+    goto err;
+
+  dispatch_fill_pub_binding_backptrs(builder, dispatcher);
+
+ err:
+  pubsub_builder_free(builder);
+  return dispatcher;
+}
diff --git a/src/lib/pubsub/pubsub_build.h b/src/lib/pubsub/pubsub_build.h
new file mode 100644
index 000000000..199eab219
--- /dev/null
+++ b/src/lib/pubsub/pubsub_build.h
@@ -0,0 +1,87 @@
+/* Copyright (c) 2001, Matej Pfajfar.
+ * Copyright (c) 2001-2004, Roger Dingledine.
+ * Copyright (c) 2004-2006, Roger Dingledine, Nick Mathewson.
+ * Copyright (c) 2007-2018, The Tor Project, Inc. */
+/* See LICENSE for licensing information */
+
+/**
+ * @file pubsub_build.h
+ * @brief Header used for constructing the OO publish-subscribe facility.
+ *
+ * (See pubsub.h for more general information on this API.)
+ **/
+
+#ifndef TOR_PUBSUB_BUILD_H
+#define TOR_PUBSUB_BUILD_H
+
+struct dispatch_t;
+
+/**
+ * A "dispatch builder" is an incomplete dispatcher, used when
+ * registering messages.  It does not have the same integrity guarantees
+ * as a dispatcher.  It cannot actually handle messages itself: once all
+ * subsystems have registered, it is converted into a dispatch_t.
+ **/
+typedef struct pubsub_builder_t pubsub_builder_t;
+
+/**
+ * A "dispatch connector" is a view of the dispatcher that a subsystem
+ * uses while initializing itself.  It is specific to the subsystem, and
+ * ensures that each subsystem doesn't need to identify itself
+ * repeatedly while registering its messages.
+ **/
+typedef struct pubsub_connector_t pubsub_connector_t;
+
+/**
+ * Create a new pubsub_builder. This should only happen in the
+ * main-init code.
+ */
+pubsub_builder_t *pubsub_builder_new(void);
+
+/** DOCDOC */
+int pubsub_builder_check(pubsub_builder_t *);
+
+/**
+ * Free a pubsub builder.  This should only happen on error paths, where
+ * we have decided not to construct a dispatcher for some reason.
+ */
+#define pubsub_builder_free(db) \
+  FREE_AND_NULL(pubsub_builder_t, pubsub_builder_free_, (db))
+
+/** Internal implementation of pubsub_builder_free(). */
+void pubsub_builder_free_(pubsub_builder_t *);
+
+/**
+ * Create a pubsub connector that a single subsystem will use to
+ * register its messages.  The main-init code does this during susbsystem
+ * initialization.
+ */
+pubsub_connector_t *pubsub_connector_for_subsystem(pubsub_builder_t *,
+                                                   subsys_id_t);
+
+/**
+ * The main-init code does this after subsystem initialization.
+ */
+#define pubsub_connector_free(c) \
+  FREE_AND_NULL(pubsub_connector_t, pubsub_connector_free_, (c))
+
+void pubsub_connector_free_(pubsub_connector_t *);
+
+/**
+ * Constructs a dispatcher from a dispatch_builder, after checking that the
+ * invariances on the messages, channels, and connections have been
+ * respected.
+ *
+ * This should happen after every subsystem has initialized, and before
+ * entering the mainloop.
+ */
+struct dispatch_t *pubsub_builder_finalize(pubsub_builder_t *);
+
+#ifdef PUBSUB_PRIVATE
+struct pubsub_items_t;
+#define pubsub_items_free(cfg) \
+  FREE_AND_NULL(pubsub_items_t, pubsub_items_free_, (cfg))
+void pubsub_items_free_(struct pubsub_items_t *cfg);
+#endif
+
+#endif
diff --git a/src/lib/pubsub/pubsub_builder_st.h b/src/lib/pubsub/pubsub_builder_st.h
new file mode 100644
index 000000000..a1cc6e718
--- /dev/null
+++ b/src/lib/pubsub/pubsub_builder_st.h
@@ -0,0 +1,161 @@
+/* Copyright (c) 2001, Matej Pfajfar.
+ * Copyright (c) 2001-2004, Roger Dingledine.
+ * Copyright (c) 2004-2006, Roger Dingledine, Nick Mathewson.
+ * Copyright (c) 2007-2018, The Tor Project, Inc. */
+/* See LICENSE for licensing information */
+
+/**
+ * @file pubsub_builder_st.h
+ *
+ * @brief private structures used for configuring dispatchers and messages.
+ */
+
+#ifndef TOR_PUBSUB_BUILDER_ST_H
+#define TOR_PUBSUB_BUILDER_ST_H
+
+#ifdef PUBSUB_PRIVATE
+
+#include <stdbool.h>
+#include <stddef.h>
+
+struct dispatch_cfg_t;
+struct smartlist_t;
+struct pub_binding_t;
+
+/**
+ * Configuration for a single publication or subscription request.
+ *
+ * These can be stored while the dispatcher is in use, but are only used for
+ * setup, teardown, and debugging.
+ *
+ * There are various fields in this request describing the message; all of
+ * them must match other descriptions of the message, or a bug has occurred.
+ **/
+typedef struct pubsub_cfg_t {
+  /** True if this is a publishing request; false for a subscribing request. */
+  bool is_publish;
+  /** The system making this request. */
+  subsys_id_t subsys;
+  /** The channel on which the message is to be sent. */
+  channel_id_t channel;
+  /** The message ID to be sent or received. */
+  message_id_t msg;
+  /** The C type associated with the message. */
+  msg_type_id_t type;
+  /** One or more DISP_FLAGS_* items, combined with bitwise OR. */
+  unsigned flags;
+
+  /**
+   * Publishing only: a pub_binding object that will receive the binding for
+   * this request.  We will finish filling this in when the dispatcher is
+   * constructed, so that the subsystem can publish then and not before.
+   */
+  struct pub_binding_t *pub_binding;
+
+  /**
+   * Subscribing only: a function to receive message objects for this request.
+   */
+  recv_fn_t recv_fn;
+
+  /** The file from which this message was configured */
+  const char *added_by_file;
+  /** The line at which this message was configured */
+  unsigned added_by_line;
+} pubsub_cfg_t;
+
+/**
+ * Configuration request for a single C type.
+ *
+ * These are stored while the dispatcher is in use, but are only used for
+ * setup, teardown, and debugging.
+ **/
+typedef struct pubsub_type_cfg_t {
+  /**
+   * The identifier for this type.
+   */
+  msg_type_id_t type;
+  /**
+   * Functions to use when manipulating the type.
+   */
+  dispatch_typefns_t fns;
+
+  /** The subsystem that configured this type. */
+  subsys_id_t subsys;
+  /** The file from which this type was configured */
+  const char *added_by_file;
+  /** The line at which this type was configured */
+  unsigned added_by_line;
+} pubsub_type_cfg_t;
+
+/**
+ * The set of configuration requests for a dispatcher, as made by various
+ * subsystems.
+ **/
+typedef struct pubsub_items_t {
+  /** List of pubsub_cfg_t. */
+  struct smartlist_t *items;
+  /** List of pubsub_type_cfg_t. */
+  struct smartlist_t *type_items;
+} pubsub_items_t;
+
+/**
+ * Type used to construct a dispatcher.  We use this type to build up the
+ * configuration for a dispatcher, and then pass ownership of that
+ * configuration to the newly constructed dispatcher.
+ **/
+struct pubsub_builder_t {
+  /** Number of outstanding pubsub_connector_t objects pointing to this
+   * pubsub_builder_t. */
+  int n_connectors;
+  /** Number of errors encountered while constructing this object so far. */
+  int n_errors;
+  /** In-progress configuration that we're constructing, as a list of the
+   * requests that have been made. */
+  pubsub_items_t *items;
+  /** In-progress configuration that we're constructing, in a form that can
+   * be converted to a dispatch_t. */
+  struct dispatch_cfg_t *cfg;
+};
+
+/**
+ * Type given to a subsystem when adding connections to a pubsub_builder_t.
+ * We use this type to force each subsystem to get blamed for the
+ * publications, subscriptions, and types that it adds.
+ **/
+struct pubsub_connector_t {
+  /** The pubsub_builder that this connector refers to. */
+  struct pubsub_builder_t *builder;
+  /** The subsystem that has been given this connector. */
+  subsys_id_t subsys_id;
+};
+
+/**
+ * Helper structure used when constructing a dispatcher that sorts the
+ * pubsub_cfg_t objects in various ways.
+ **/
+typedef struct pubsub_adjmap_t {
+  /* XXXX The next three fields are currently constructed but not yet
+   * XXXX used. I believe we'll want them in the future, though. -nickm
+   */
+  /** Number of subsystems; length of the *_by_subsys arrays. */
+  size_t n_subsystems;
+  /** Array of lists of publisher pubsub_cfg_t objects, indexed by
+   * subsystem. */
+  struct smartlist_t **pub_by_subsys;
+  /** Array of lists of subscriber pubsub_cfg_t objects, indexed by
+   * subsystem. */
+  struct smartlist_t **sub_by_subsys;
+
+  /** Number of message IDs; length of the *_by_msg arrays. */
+  size_t n_msgs;
+  /** Array of lists of publisher pubsub_cfg_t objects, indexed by
+   * message ID. */
+  struct smartlist_t **pub_by_msg;
+  /** Array of lists of subscriber pubsub_cfg_t objects, indexed by
+   * message ID. */
+  struct smartlist_t **sub_by_msg;
+} pubsub_adjmap_t;
+
+#endif
+
+#endif
diff --git a/src/lib/pubsub/pubsub_connect.h b/src/lib/pubsub/pubsub_connect.h
new file mode 100644
index 000000000..b63f9dc43
--- /dev/null
+++ b/src/lib/pubsub/pubsub_connect.h
@@ -0,0 +1,47 @@
+/* Copyright (c) 2001, Matej Pfajfar.
+ * Copyright (c) 2001-2004, Roger Dingledine.
+ * Copyright (c) 2004-2006, Roger Dingledine, Nick Mathewson.
+ * Copyright (c) 2007-2018, The Tor Project, Inc. */
+/* See LICENSE for licensing information */
+
+/**
+ * @file pubsub_connect.h
+ * @brief Header for functions that add relationships to a pubsub builder.
+ *
+ * These functions are used by modules that need to add publication and
+ * subscription requests.
+ **/
+
+#ifndef TOR_PUBSUB_CONNECT_H
+#define TOR_PUBSUB_CONNECT_H
+
+#include "lib/dispatch/msgtypes.h"
+
+struct pub_binding_t;
+struct pubsub_connector_t;
+
+int pubsub_add_pub_(struct pubsub_connector_t *con,
+                    struct pub_binding_t *out,
+                    channel_id_t channel,
+                    message_id_t msg,
+                    msg_type_id_t type,
+                    unsigned flags,
+                    const char *file,
+                    unsigned line);
+
+int pubsub_add_sub_(struct pubsub_connector_t *con,
+                    recv_fn_t recv_fn,
+                    channel_id_t channel,
+                    message_id_t msg,
+                    msg_type_id_t type,
+                    unsigned flags,
+                    const char *file,
+                    unsigned line);
+
+int pubsub_connector_define_type_(struct pubsub_connector_t *,
+                                  msg_type_id_t,
+                                  dispatch_typefns_t *,
+                                  const char *file,
+                                  unsigned line);
+
+#endif
diff --git a/src/lib/pubsub/pubsub_flags.h b/src/lib/pubsub/pubsub_flags.h
new file mode 100644
index 000000000..d07a06be7
--- /dev/null
+++ b/src/lib/pubsub/pubsub_flags.h
@@ -0,0 +1,32 @@
+/* Copyright (c) 2001, Matej Pfajfar.
+ * Copyright (c) 2001-2004, Roger Dingledine.
+ * Copyright (c) 2004-2006, Roger Dingledine, Nick Mathewson.
+ * Copyright (c) 2007-2018, The Tor Project, Inc. */
+/* See LICENSE for licensing information */
+
+/**
+ * @file pubsub_flags.h
+ * @brief Flags that can be set on publish/subscribe messages.
+ **/
+
+#ifndef TOR_PUBSUB_FLAGS_H
+#define TOR_PUBSUB_FLAGS_H
+
+/**
+ * Flag for registering a message: declare that no other module is allowed to
+ * publish this message if we are publishing it, or subscribe to it if we are
+ * subscribing to it.
+ */
+#define DISP_FLAG_EXCL (1u<<0)
+
+/**
+ * Flag for registering a message: declare that this message is a stub, and we
+ * will not actually publish/subscribe it, but that the dispatcher should
+ * treat us as if we did when typechecking.
+ *
+ * We use this so that messages aren't treated as "dangling" if they are
+ * potentially used by some other build of Tor.
+ */
+#define DISP_FLAG_STUB (1u<<1)
+
+#endif
diff --git a/src/lib/pubsub/pubsub_publish.c b/src/lib/pubsub/pubsub_publish.c
new file mode 100644
index 000000000..8c469e8ad
--- /dev/null
+++ b/src/lib/pubsub/pubsub_publish.c
@@ -0,0 +1,70 @@
+/* Copyright (c) 2001, Matej Pfajfar.
+ * Copyright (c) 2001-2004, Roger Dingledine.
+ * Copyright (c) 2004-2006, Roger Dingledine, Nick Mathewson.
+ * Copyright (c) 2007-2018, The Tor Project, Inc. */
+/* See LICENSE for licensing information */
+
+/**
+ * @file pubsub_publish.h
+ * @brief Header for functions to publish using a pub_binding_t.
+ **/
+
+#define PUBSUB_PRIVATE
+#define DISPATCH_PRIVATE
+#include "orconfig.h"
+
+#include "lib/dispatch/dispatch.h"
+#include "lib/dispatch/dispatch_st.h"
+
+#include "lib/pubsub/pub_binding_st.h"
+#include "lib/pubsub/pubsub_publish.h"
+
+#include "lib/malloc/malloc.h"
+#include "lib/log/util_bug.h"
+
+#include <string.h>
+
+/**
+ * Publish a message from the publication binding <b>pub</b> using the
+ * auxiliary data <b>auxdata</b>.
+ *
+ * Return 0 on success, -1 on failure.
+ **/
+int
+pubsub_pub_(const pub_binding_t *pub, msg_aux_data_t auxdata)
+{
+  dispatch_t *d = pub->dispatch_ptr;
+  if (BUG(! d)) {
+    /* Tried to publish a message before the dispatcher was configured. */
+    /* (Without a dispatcher, we don't know how to free auxdata.) */
+    return -1;
+  }
+
+  if (BUG(pub->msg_template.type >= d->n_types)) {
+    /* The type associated with this message is not known to the dispatcher. */
+    /* (Without a correct type, we don't know how to free auxdata.) */
+    return -1;
+  }
+
+  if (BUG(pub->msg_template.msg >= d->n_msgs) ||
+      BUG(pub->msg_template.channel >= d->n_queues)) {
+    /* The message ID or channel ID was out of bounds. */
+    d->typefns[pub->msg_template.type].free_fn(auxdata);
+    return -1;
+  }
+
+  if (! d->table[pub->msg_template.msg]) {
+    /* Fast path: nobody wants this data. */
+
+    // XXXX Faster path: we could store this in the pub_binding_t.
+    d->typefns[pub->msg_template.type].free_fn(auxdata);
+    return 0;
+  }
+
+  /* Construct the message object */
+  msg_t *m = tor_malloc(sizeof(msg_t));
+  memcpy(m, &pub->msg_template, sizeof(msg_t));
+  m->aux_data__ = auxdata;
+
+  return dispatch_send_msg_unchecked(d, m);
+}
diff --git a/src/lib/pubsub/pubsub_publish.h b/src/lib/pubsub/pubsub_publish.h
new file mode 100644
index 000000000..0250fd076
--- /dev/null
+++ b/src/lib/pubsub/pubsub_publish.h
@@ -0,0 +1,15 @@
+/* Copyright (c) 2001, Matej Pfajfar.
+ * Copyright (c) 2001-2004, Roger Dingledine.
+ * Copyright (c) 2004-2006, Roger Dingledine, Nick Mathewson.
+ * Copyright (c) 2007-2018, The Tor Project, Inc. */
+/* See LICENSE for licensing information */
+
+#ifndef TOR_PUBSUB_PUBLISH_H
+#define TOR_PUBSUB_PUBLISH_H
+
+#include "lib/dispatch/msgtypes.h"
+struct pub_binding_t;
+
+int pubsub_pub_(const struct pub_binding_t *pub, msg_aux_data_t auxdata);
+
+#endif
diff --git a/src/test/include.am b/src/test/include.am
index d4e8eb4e8..7734b73de 100644
--- a/src/test/include.am
+++ b/src/test/include.am
@@ -165,6 +165,7 @@ src_test_test_SOURCES += \
 	src/test/test_proto_misc.c \
 	src/test/test_protover.c \
 	src/test/test_pt.c \
+	src/test/test_pubsub_msg.c \
 	src/test/test_relay.c \
 	src/test/test_relaycell.c \
 	src/test/test_relaycrypt.c \
diff --git a/src/test/test.c b/src/test/test.c
index 4c6d9775b..da0be4133 100644
--- a/src/test/test.c
+++ b/src/test/test.c
@@ -910,6 +910,7 @@ struct testgroup_t testgroups[] = {
   { "proto/misc/", proto_misc_tests },
   { "protover/", protover_tests },
   { "pt/", pt_tests },
+  { "pubsub/msg/", pubsub_msg_tests },
   { "relay/" , relay_tests },
   { "relaycell/", relaycell_tests },
   { "relaycrypt/", relaycrypt_tests },
diff --git a/src/test/test.h b/src/test/test.h
index cd0ce4f6d..fb417124c 100644
--- a/src/test/test.h
+++ b/src/test/test.h
@@ -253,6 +253,7 @@ extern struct testcase_t proto_http_tests[];
 extern struct testcase_t proto_misc_tests[];
 extern struct testcase_t protover_tests[];
 extern struct testcase_t pt_tests[];
+extern struct testcase_t pubsub_msg_tests[];
 extern struct testcase_t relay_tests[];
 extern struct testcase_t relaycell_tests[];
 extern struct testcase_t relaycrypt_tests[];
diff --git a/src/test/test_pubsub_msg.c b/src/test/test_pubsub_msg.c
new file mode 100644
index 000000000..5b771d45a
--- /dev/null
+++ b/src/test/test_pubsub_msg.c
@@ -0,0 +1,305 @@
+/* Copyright (c) 2018, The Tor Project, Inc. */
+/* See LICENSE for licensing information */
+
+#define DISPATCH_PRIVATE
+
+#include "test/test.h"
+
+#include "lib/dispatch/dispatch.h"
+#include "lib/dispatch/dispatch_naming.h"
+#include "lib/dispatch/dispatch_st.h"
+#include "lib/dispatch/msgtypes.h"
+#include "lib/pubsub/pubsub_flags.h"
+#include "lib/pubsub/pub_binding_st.h"
+#include "lib/pubsub/pubsub_build.h"
+#include "lib/pubsub/pubsub_builder_st.h"
+#include "lib/pubsub/pubsub_connect.h"
+#include "lib/pubsub/pubsub_publish.h"
+
+#include "lib/log/escape.h"
+#include "lib/malloc/malloc.h"
+#include "lib/string/printf.h"
+
+#include <stdio.h>
+#include <string.h>
+
+static char *
+ex_str_fmt(msg_aux_data_t aux)
+{
+  return esc_for_log(aux.ptr);
+}
+static void
+ex_str_free(msg_aux_data_t aux)
+{
+  tor_free_(aux.ptr);
+}
+static dispatch_typefns_t stringfns = {
+  .free_fn = ex_str_free,
+  .fmt_fn = ex_str_fmt
+};
+
+// We're using the lowest-level publish/subscribe logic here, to avoid the
+// pubsub_macros.h macros and just test the dispatch core.  We'll use a string
+// type for everything.
+
+#define DECLARE_MESSAGE(suffix)                         \
+  static pub_binding_t pub_binding_##suffix;            \
+  static int msg_received_##suffix = 0;                 \
+  static void recv_msg_##suffix(const msg_t *m) {       \
+    (void)m;                                            \
+    ++msg_received_##suffix;                            \
+  }                                                     \
+  EAT_SEMICOLON
+
+#define ADD_PUBLISH(binding_suffix, subsys, channel, msg, flags)        \
+  STMT_BEGIN {                                                          \
+    con = pubsub_connector_for_subsystem(builder,                       \
+                                           get_subsys_id(#subsys));     \
+    pubsub_add_pub_(con, &pub_binding_##binding_suffix,                 \
+                      get_channel_id(#channel),                         \
+                      get_message_id(#msg), get_msg_type_id("string"),  \
+                      (flags), __FILE__, __LINE__);                     \
+    pubsub_connector_free(con);                                         \
+  } STMT_END
+
+#define ADD_SUBSCRIBE(hook_suffix, subsys, channel, msg, flags)         \
+  STMT_BEGIN {                                                          \
+    con = pubsub_connector_for_subsystem(builder,                       \
+                                           get_subsys_id(#subsys));     \
+    pubsub_add_sub_(con, recv_msg_##hook_suffix,                        \
+                      get_channel_id(#channel),                         \
+                      get_message_id(#msg), get_msg_type_id("string"),  \
+                      (flags), __FILE__, __LINE__);                     \
+    pubsub_connector_free(con);                                         \
+  } STMT_END
+
+#define SEND(binding_suffix, val)                          \
+  STMT_BEGIN {                                             \
+    msg_aux_data_t data_;                                  \
+    data_.ptr = tor_strdup(val);                           \
+    pubsub_pub_(&pub_binding_##binding_suffix, data_);     \
+  } STMT_END
+
+DECLARE_MESSAGE(msg1);
+DECLARE_MESSAGE(msg2);
+DECLARE_MESSAGE(msg3);
+DECLARE_MESSAGE(msg4);
+DECLARE_MESSAGE(msg5);
+
+static smartlist_t *strings_received = NULL;
+static void
+recv_msg_copy_string(const msg_t *m)
+{
+  const char *s = m->aux_data__.ptr;
+  smartlist_add(strings_received, tor_strdup(s));
+}
+
+static void *
+setup_dispatcher(const struct testcase_t *testcase)
+{
+  (void)testcase;
+  pubsub_builder_t *builder = pubsub_builder_new();
+  pubsub_connector_t *con;
+
+  {
+    con = pubsub_connector_for_subsystem(builder, get_subsys_id("types"));
+    pubsub_connector_define_type_(con,
+                                    get_msg_type_id("string"),
+                                    &stringfns,
+                                    "nowhere.c", 99);
+    pubsub_connector_free(con);
+  }
+  // message1 has one publisher and one subscriber.
+  ADD_PUBLISH(msg1, sys1, main, message1, 0);
+  ADD_SUBSCRIBE(msg1, sys2, main, message1, 0);
+
+  // message2 has a publisher and a stub subscriber.
+  ADD_PUBLISH(msg2, sys1, main, message2, 0);
+  ADD_SUBSCRIBE(msg2, sys2, main, message2, DISP_FLAG_STUB);
+
+  // message3 has a publisher and three subscribers.
+  ADD_PUBLISH(msg3, sys1, main, message3, 0);
+  ADD_SUBSCRIBE(msg3, sys2, main, message3, 0);
+  ADD_SUBSCRIBE(msg3, sys3, main, message3, 0);
+  ADD_SUBSCRIBE(msg3, sys4, main, message3, 0);
+
+  // message4 has one publisher and two subscribers, but it's on another
+  // channel.
+  ADD_PUBLISH(msg4, sys2, other, message4, 0);
+  ADD_SUBSCRIBE(msg4, sys1, other, message4, 0);
+  ADD_SUBSCRIBE(msg4, sys3, other, message4, 0);
+
+  // message5 has a huge number of recipients.
+  ADD_PUBLISH(msg5, sys3, main, message5, 0);
+  ADD_SUBSCRIBE(msg5, sys4, main, message5, 0);
+  ADD_SUBSCRIBE(msg5, sys5, main, message5, 0);
+  ADD_SUBSCRIBE(msg5, sys6, main, message5, 0);
+  ADD_SUBSCRIBE(msg5, sys7, main, message5, 0);
+  ADD_SUBSCRIBE(msg5, sys8, main, message5, 0);
+  for (int i = 0; i < 1000-5; ++i) {
+    char *sys;
+    tor_asprintf(&sys, "xsys-%d", i);
+    con = pubsub_connector_for_subsystem(builder, get_subsys_id(sys));
+    pubsub_add_sub_(con, recv_msg_copy_string,
+                    get_channel_id("main"),
+                    get_message_id("message5"),
+                    get_msg_type_id("string"), 0, "here", 100);
+    pubsub_connector_free(con);
+    tor_free(sys);
+  }
+
+  return pubsub_builder_finalize(builder);
+}
+
+static int
+cleanup_dispatcher(const struct testcase_t *testcase, void *dispatcher_)
+{
+  (void)testcase;
+  dispatch_t *dispatcher = dispatcher_;
+  dispatch_free(dispatcher);
+  return 1;
+}
+
+static const struct testcase_setup_t dispatcher_setup = {
+  setup_dispatcher, cleanup_dispatcher
+};
+
+static void
+test_pubsub_msg_minimal(void *arg)
+{
+  dispatch_t *d = arg;
+
+  tt_int_op(0, OP_EQ, msg_received_msg1);
+  SEND(msg1, "hello world");
+  tt_int_op(0, OP_EQ, msg_received_msg1); // hasn't actually arrived yet.
+
+  tt_int_op(0, OP_EQ, dispatch_flush(d, get_channel_id("main"), 1000));
+  tt_int_op(1, OP_EQ, msg_received_msg1); // we got the message!
+
+ done:
+  ;
+}
+
+static void
+test_pubsub_msg_send_to_stub(void *arg)
+{
+  dispatch_t *d = arg;
+
+  tt_int_op(0, OP_EQ, msg_received_msg2);
+  SEND(msg2, "hello silence");
+  tt_int_op(0, OP_EQ, msg_received_msg2); // hasn't actually arrived yet.
+
+  tt_int_op(0, OP_EQ, dispatch_flush(d, get_channel_id("main"), 1000));
+  tt_int_op(0, OP_EQ, msg_received_msg2); // doesn't arrive -- stub hook.
+
+ done:
+  ;
+}
+
+static void
+test_pubsub_msg_cancel_msgs(void *arg)
+{
+  dispatch_t *d = arg;
+
+  tt_int_op(0, OP_EQ, msg_received_msg1);
+  for (int i = 0; i < 100; ++i) {
+    SEND(msg1, "hello world");
+  }
+  tt_int_op(0, OP_EQ, msg_received_msg1); // hasn't actually arrived yet.
+
+  tt_int_op(0, OP_EQ, dispatch_flush(d, get_channel_id("main"), 10));
+  tt_int_op(10, OP_EQ, msg_received_msg1); // we got the message 10 times.
+
+  // At this point, the dispatcher will be freed with queued, undelivered
+  // messages.
+ done:
+  ;
+}
+
+struct alertfn_target {
+  dispatch_t *d;
+  channel_id_t ch;
+  int count;
+};
+static void
+alertfn_generic(dispatch_t *d, channel_id_t ch, void *arg)
+{
+  struct alertfn_target *t = arg;
+  tt_ptr_op(d, OP_EQ, t->d);
+  tt_int_op(ch, OP_EQ, t->ch);
+  ++t->count;
+ done:
+  ;
+}
+
+static void
+test_pubsub_msg_alertfns(void *arg)
+{
+  dispatch_t *d = arg;
+  struct alertfn_target ch1_a = { d, get_channel_id("main"), 0 };
+  struct alertfn_target ch2_a = { d, get_channel_id("other"), 0 };
+
+  tt_int_op(0, OP_EQ,
+            dispatch_set_alert_fn(d, get_channel_id("main"),
+                                  alertfn_generic, &ch1_a));
+  tt_int_op(0, OP_EQ,
+            dispatch_set_alert_fn(d, get_channel_id("other"),
+                                  alertfn_generic, &ch2_a));
+
+  SEND(msg3, "hello");
+  tt_int_op(ch1_a.count, OP_EQ, 1);
+  SEND(msg3, "world");
+  tt_int_op(ch1_a.count, OP_EQ, 1); // only the first message sends an alert
+  tt_int_op(ch2_a.count, OP_EQ, 0); // no alert for 'other'
+
+  SEND(msg4, "worse things happen in C");
+  tt_int_op(ch2_a.count, OP_EQ, 1);
+
+  // flush the first (main) channel...
+  tt_int_op(0, OP_EQ, dispatch_flush(d, get_channel_id("main"), 1000));
+  tt_int_op(6, OP_EQ, msg_received_msg3); // 3 subscribers, 2 instances.
+
+  // now that the main channel is flushed, sending another message on it
+  // starts another alert.
+  tt_int_op(ch1_a.count, OP_EQ, 1);
+  SEND(msg1, "plover");
+  tt_int_op(ch1_a.count, OP_EQ, 2);
+  tt_int_op(ch2_a.count, OP_EQ, 1);
+
+ done:
+  ;
+}
+
+/* try more than N_FAST_FNS hooks on msg5 */
+static void
+test_pubsub_msg_many_hooks(void *arg)
+{
+  dispatch_t *d = arg;
+  strings_received = smartlist_new();
+
+  tt_int_op(0, OP_EQ, msg_received_msg5);
+  SEND(msg5, "hello world");
+  tt_int_op(0, OP_EQ, msg_received_msg5);
+  tt_int_op(0, OP_EQ, smartlist_len(strings_received));
+
+  tt_int_op(0, OP_EQ, dispatch_flush(d, get_channel_id("main"), 100000));
+  tt_int_op(5, OP_EQ, msg_received_msg5);
+  tt_int_op(995, OP_EQ, smartlist_len(strings_received));
+
+ done:
+  SMARTLIST_FOREACH(strings_received, char *, s, tor_free(s));
+  smartlist_free(strings_received);
+}
+
+#define T(name)                                                 \
+  { #name, test_pubsub_msg_ ## name , TT_FORK,                \
+      &dispatcher_setup, NULL }
+
+struct testcase_t pubsub_msg_tests[] = {
+  T(minimal),
+  T(send_to_stub),
+  T(cancel_msgs),
+  T(alertfns),
+  T(many_hooks),
+  END_OF_TESTCASES
+};





More information about the tor-commits mailing list