tor-commits
Threads by month
- ----- 2025 -----
- July
- June
- May
- April
- March
- February
- January
- ----- 2024 -----
- December
- November
- October
- September
- August
- July
- June
- May
- April
- March
- February
- January
- ----- 2023 -----
- December
- November
- October
- September
- August
- July
- June
- May
- April
- March
- February
- January
- ----- 2022 -----
- December
- November
- October
- September
- August
- July
- June
- May
- April
- March
- February
- January
- ----- 2021 -----
- December
- November
- October
- September
- August
- July
- June
- May
- April
- March
- February
- January
- ----- 2020 -----
- December
- November
- October
- September
- August
- July
- June
- May
- April
- March
- February
- January
- ----- 2019 -----
- December
- November
- October
- September
- August
- July
- June
- May
- April
- March
- February
- January
- ----- 2018 -----
- December
- November
- October
- September
- August
- July
- June
- May
- April
- March
- February
- January
- ----- 2017 -----
- December
- November
- October
- September
- August
- July
- June
- May
- April
- March
- February
- January
- ----- 2016 -----
- December
- November
- October
- September
- August
- July
- June
- May
- April
- March
- February
- January
- ----- 2015 -----
- December
- November
- October
- September
- August
- July
- June
- May
- April
- March
- February
- January
- ----- 2014 -----
- December
- November
- October
- September
- August
- July
- June
- May
- April
- March
- February
- January
- ----- 2013 -----
- December
- November
- October
- September
- August
- July
- June
- May
- April
- March
- February
- January
- ----- 2012 -----
- December
- November
- October
- September
- August
- July
- June
- May
- April
- March
- February
- January
- ----- 2011 -----
- December
- November
- October
- September
- August
- July
- June
- May
- April
- March
- February
March 2019
- 20 participants
- 3265 discussions

[tor/master] Add a new inline function to check whether debug logging is on
by asn@torproject.org 27 Mar '19
by asn@torproject.org 27 Mar '19
27 Mar '19
commit a62ac1719887f0756ceb516ce3b12cd2aee18191
Author: Nick Mathewson <nickm(a)torproject.org>
Date: Sun Jan 13 15:45:06 2019 -0500
Add a new inline function to check whether debug logging is on
We already do this in our log_debug() macro, but there are times
when we'd like to avoid allocating or precomputing something that we
are only going to log if debugging is on.
---
src/lib/log/log.h | 17 +++++++++++++----
1 file changed, 13 insertions(+), 4 deletions(-)
diff --git a/src/lib/log/log.h b/src/lib/log/log.h
index 45e01f339..fda984e78 100644
--- a/src/lib/log/log.h
+++ b/src/lib/log/log.h
@@ -193,6 +193,15 @@ void tor_log_get_logfile_names(struct smartlist_t *out);
extern int log_global_min_severity_;
+static inline bool debug_logging_enabled(void);
+/**
+ * Return true iff debug logging is enabled for at least one domain.
+ */
+static inline bool debug_logging_enabled(void)
+{
+ return PREDICT_UNLIKELY(log_global_min_severity_ == LOG_DEBUG);
+}
+
void log_fn_(int severity, log_domain_mask_t domain,
const char *funcname, const char *format, ...)
CHECK_PRINTF(4,5);
@@ -222,8 +231,8 @@ void tor_log_string(int severity, log_domain_mask_t domain,
log_fn_ratelim_(ratelim, severity, domain, __FUNCTION__, args)
#define log_debug(domain, args...) \
STMT_BEGIN \
- if (PREDICT_UNLIKELY(log_global_min_severity_ == LOG_DEBUG)) \
- log_fn_(LOG_DEBUG, domain, __FUNCTION__, args); \
+ if (debug_logging_enabled()) \
+ log_fn_(LOG_DEBUG, domain, __FUNCTION__, args); \
STMT_END
#define log_info(domain, args...) \
log_fn_(LOG_INFO, domain, __FUNCTION__, args)
@@ -240,8 +249,8 @@ void tor_log_string(int severity, log_domain_mask_t domain,
#define log_debug(domain, args, ...) \
STMT_BEGIN \
- if (PREDICT_UNLIKELY(log_global_min_severity_ == LOG_DEBUG)) \
- log_fn_(LOG_DEBUG, domain, __FUNCTION__, args, ##__VA_ARGS__); \
+ if (debug_logging_enabled()) \
+ log_fn_(LOG_DEBUG, domain, __FUNCTION__, args, ##__VA_ARGS__); \
STMT_END
#define log_info(domain, args,...) \
log_fn_(LOG_INFO, domain, __FUNCTION__, args, ##__VA_ARGS__)
1
0
commit 9e60482b8073f2d43187c36c9159fd4367d7140a
Author: Nick Mathewson <nickm(a)torproject.org>
Date: Tue Jan 15 09:01:20 2019 -0500
Pubsub: an OO layer on top of lib/dispatch
This "publish/subscribe" layer sits on top of lib/dispatch, and
tries to provide more type-safety and cross-checking for the
lower-level layer.
Even with this commit, we're still not done: more checking will come
in the next commit, and a set of usability/typesafety macros will
come after.
---
src/lib/pubsub/.may_include | 10 ++
src/lib/pubsub/include.am | 24 +++
src/lib/pubsub/pub_binding_st.h | 36 +++++
src/lib/pubsub/pubsub.h | 85 +++++++++++
src/lib/pubsub/pubsub_build.c | 286 ++++++++++++++++++++++++++++++++++
src/lib/pubsub/pubsub_build.h | 87 +++++++++++
src/lib/pubsub/pubsub_builder_st.h | 161 ++++++++++++++++++++
src/lib/pubsub/pubsub_connect.h | 47 ++++++
src/lib/pubsub/pubsub_flags.h | 32 ++++
src/lib/pubsub/pubsub_publish.c | 70 +++++++++
src/lib/pubsub/pubsub_publish.h | 15 ++
src/test/include.am | 1 +
src/test/test.c | 1 +
src/test/test.h | 1 +
src/test/test_pubsub_msg.c | 305 +++++++++++++++++++++++++++++++++++++
15 files changed, 1161 insertions(+)
diff --git a/src/lib/pubsub/.may_include b/src/lib/pubsub/.may_include
new file mode 100644
index 000000000..5623492f0
--- /dev/null
+++ b/src/lib/pubsub/.may_include
@@ -0,0 +1,10 @@
+orconfig.h
+
+lib/cc/*.h
+lib/container/*.h
+lib/dispatch/*.h
+lib/intmath/*.h
+lib/log/*.h
+lib/malloc/*.h
+lib/pubsub/*.h
+lib/string/*.h
diff --git a/src/lib/pubsub/include.am b/src/lib/pubsub/include.am
new file mode 100644
index 000000000..9856c94a5
--- /dev/null
+++ b/src/lib/pubsub/include.am
@@ -0,0 +1,24 @@
+
+noinst_LIBRARIES += src/lib/libtor-pubsub.a
+
+if UNITTESTS_ENABLED
+noinst_LIBRARIES += src/lib/libtor-pubsub-testing.a
+endif
+
+src_lib_libtor_pubsub_a_SOURCES = \
+ src/lib/pubsub/pubsub_build.c \
+ src/lib/pubsub/pubsub_publish.c
+
+src_lib_libtor_pubsub_testing_a_SOURCES = \
+ $(src_lib_libtor_pubsub_a_SOURCES)
+src_lib_libtor_pubsub_testing_a_CPPFLAGS = $(AM_CPPFLAGS) $(TEST_CPPFLAGS)
+src_lib_libtor_pubsub_testing_a_CFLAGS = $(AM_CFLAGS) $(TEST_CFLAGS)
+
+noinst_HEADERS += \
+ src/lib/pubsub/pub_binding_st.h \
+ src/lib/pubsub/pubsub.h \
+ src/lib/pubsub/pubsub_build.h \
+ src/lib/pubsub/pubsub_builder_st.h \
+ src/lib/pubsub/pubsub_connect.h \
+ src/lib/pubsub/pubsub_flags.h \
+ src/lib/pubsub/pubsub_publish.h
diff --git a/src/lib/pubsub/pub_binding_st.h b/src/lib/pubsub/pub_binding_st.h
new file mode 100644
index 000000000..e8c0d47ef
--- /dev/null
+++ b/src/lib/pubsub/pub_binding_st.h
@@ -0,0 +1,36 @@
+/* Copyright (c) 2001, Matej Pfajfar.
+ * Copyright (c) 2001-2004, Roger Dingledine.
+ * Copyright (c) 2004-2006, Roger Dingledine, Nick Mathewson.
+ * Copyright (c) 2007-2018, The Tor Project, Inc. */
+/* See LICENSE for licensing information */
+
+/**
+ * @file pubsub_build.h
+ * @brief Declaration of pub_binding_t.
+ */
+
+#ifndef TOR_PUB_BINDING_ST_H
+#define TOR_PUB_BINDING_ST_H
+
+#include "lib/dispatch/msgtypes.h"
+struct dispatch_t;
+
+/**
+ * A pub_binding_t is an opaque object that subsystems use to publish
+ * messages. The DISPATCH_ADD_PUB*() macros set it up.
+ **/
+typedef struct pub_binding_t {
+ /**
+ * A pointer to a configured dispatch_t object. This is filled in
+ * when the dispatch_t is finally constructed.
+ **/
+ struct dispatch_t *dispatch_ptr;
+ /**
+ * A template for the msg_t fields that are filled in for this message.
+ * This is copied into outgoing messages, ensuring that their fields are set
+ * corretly.
+ **/
+ msg_t msg_template;
+} pub_binding_t;
+
+#endif
diff --git a/src/lib/pubsub/pubsub.h b/src/lib/pubsub/pubsub.h
new file mode 100644
index 000000000..303b36ad5
--- /dev/null
+++ b/src/lib/pubsub/pubsub.h
@@ -0,0 +1,85 @@
+/* Copyright (c) 2001, Matej Pfajfar.
+ * Copyright (c) 2001-2004, Roger Dingledine.
+ * Copyright (c) 2004-2006, Roger Dingledine, Nick Mathewson.
+ * Copyright (c) 2007-2018, The Tor Project, Inc. */
+/* See LICENSE for licensing information */
+
+/**
+ * @file pubsub.h
+ * @brief Header for OO publish-subscribe functionality.
+ *
+ * This module provides a wrapper around the "dispatch" module,
+ * ensuring type-safety and allowing us to do static analysis on
+ * publication and subscriptions.
+ *
+ * With this module, we enforce:
+ * <ul>
+ * <li>that every message has (potential) publishers and subscribers;
+ * <li>that every message is published and subscribed from the correct
+ * channels, with the correct type ID, every time it is published.
+ * <li>that type IDs correspond to a single C type, and that the C types are
+ * used correctly.
+ * <li>that when a message is published or subscribed, it is done with
+ * a correct subsystem identifier
+ * </ul>
+ *
+ * We do this by making "publication requests" and "subscription requests"
+ * into objects, and doing some computation on them before we create
+ * a dispatch_t with them.
+ *
+ * Rather than using the dispatch module directly, a publishing module
+ * receives a "binding" object that it uses to send messages with the right
+ * settings.
+ */
+
+/*
+ *
+ * Overview: Messages are sent over channels. Before sending a message on a
+ * channel, or receiving a message on a channel, a subsystem needs to register
+ * that it publishes, or subscribes, to that message, on that channel.
+ *
+ * Messages, channels, and subsystems are represented internally as short
+ * integers, though they are associated with human-readable strings for
+ * initialization and debugging.
+ *
+ * When registering for a message, a subsystem must say whether it is an
+ * exclusive publisher/subscriber to that message type, or whether other
+ * subsystems may also publish/subscribe to it.
+ *
+ * All messages and their publishers/subscribers must be registered early in
+ * the initialization process.
+ *
+ * By default, it is an error for a message type to have publishers and no
+ * subscribers on a channel, or subscribers and no publishers on a channel.
+ *
+ * A subsystem may register for a message with a note that delivery or
+ * production is disabled -- for example, because the subsystem is
+ * disabled at compile-time. It is not an error for a message type to
+ * have all of its publishers or subscribers disabled.
+ *
+ * After a message is sent, it is delivered to every recipient. This
+ * delivery happens from the top level of the event loop; it may be
+ * interleaved with network events, timers, etc.
+ *
+ * Messages may have associated data. This data is typed, and is owned
+ * by the message. Strings, byte-arrays, and integers have built-in
+ * support. Other types may be added. If objects are to be sent,
+ * they should be identified by handle. If an object requires cleanup,
+ * it should be declared with an associated free function.
+ *
+ * Semantically, if two subsystems communicate only by this kind of
+ * message passing, neither is considered to depend on the other, though
+ * both are considered to have a dependency on the message and on any
+ * types it contains.
+ *
+ * (Or generational index?)
+ */
+#ifndef TOR_PUBSUB_PUBSUB_H
+#define TOR_PUBSUB_PUBSUB_H
+
+#include "lib/pubsub/pub_binding_st.h"
+#include "lib/pubsub/pubsub_connect.h"
+#include "lib/pubsub/pubsub_flags.h"
+#include "lib/pubsub/pubsub_publish.h"
+
+#endif
diff --git a/src/lib/pubsub/pubsub_build.c b/src/lib/pubsub/pubsub_build.c
new file mode 100644
index 000000000..72f2eacea
--- /dev/null
+++ b/src/lib/pubsub/pubsub_build.c
@@ -0,0 +1,286 @@
+/* Copyright (c) 2001, Matej Pfajfar.
+ * Copyright (c) 2001-2004, Roger Dingledine.
+ * Copyright (c) 2004-2006, Roger Dingledine, Nick Mathewson.
+ * Copyright (c) 2007-2018, The Tor Project, Inc. */
+/* See LICENSE for licensing information */
+
+/**
+ * @file pubsub_build.c
+ * @brief Construct a dispatch_t in safer, more OO way.
+ **/
+
+#define PUBSUB_PRIVATE
+
+#include "lib/dispatch/dispatch.h"
+#include "lib/dispatch/dispatch_cfg.h"
+#include "lib/dispatch/dispatch_naming.h"
+#include "lib/dispatch/msgtypes.h"
+#include "lib/pubsub/pubsub_flags.h"
+#include "lib/pubsub/pub_binding_st.h"
+#include "lib/pubsub/pubsub_build.h"
+#include "lib/pubsub/pubsub_builder_st.h"
+#include "lib/pubsub/pubsub_connect.h"
+
+#include "lib/container/smartlist.h"
+#include "lib/log/util_bug.h"
+#include "lib/malloc/malloc.h"
+
+ #include <string.h>
+
+/** Construct and return a new empty pubsub_items_t. */
+static pubsub_items_t *
+pubsub_items_new(void)
+{
+ pubsub_items_t *cfg = tor_malloc_zero(sizeof(*cfg));
+ cfg->items = smartlist_new();
+ cfg->type_items = smartlist_new();
+ return cfg;
+}
+
+/** Release all storage held in a pubsub_items_t. */
+void
+pubsub_items_free_(pubsub_items_t *cfg)
+{
+ if (! cfg)
+ return;
+ SMARTLIST_FOREACH(cfg->items, pubsub_cfg_t *, item, tor_free(item));
+ SMARTLIST_FOREACH(cfg->type_items,
+ pubsub_type_cfg_t *, item, tor_free(item));
+ smartlist_free(cfg->items);
+ smartlist_free(cfg->type_items);
+ tor_free(cfg);
+}
+
+/** Construct and return a new pubsub_builder_t. */
+pubsub_builder_t *
+pubsub_builder_new(void)
+{
+ dispatch_naming_init();
+
+ pubsub_builder_t *pb = tor_malloc_zero(sizeof(*pb));
+ pb->cfg = dcfg_new();
+ pb->items = pubsub_items_new();
+ return pb;
+}
+
+/**
+ * Release all storage held by a pubsub_builder_t.
+ *
+ * You'll (mostly) only want to call this function on an error case: if you're
+ * constructing a dispatch_t instead, you should call
+ * pubsub_builder_finalize() to consume the pubsub_builder_t.
+ */
+void
+pubsub_builder_free_(pubsub_builder_t *pb)
+{
+ if (pb == NULL)
+ return;
+ pubsub_items_free(pb->items);
+ dcfg_free(pb->cfg);
+ tor_free(pb);
+}
+
+/**
+ * Create and return a pubsub_connector_t for the subsystem with ID
+ * <b>subsys</b> to use in adding publications, subscriptions, and types to
+ * <b>builder</b>.
+ **/
+pubsub_connector_t *
+pubsub_connector_for_subsystem(pubsub_builder_t *builder,
+ subsys_id_t subsys)
+{
+ tor_assert(builder);
+ ++builder->n_connectors;
+
+ pubsub_connector_t *con = tor_malloc_zero(sizeof(*con));
+
+ con->builder = builder;
+ con->subsys_id = subsys;
+
+ return con;
+}
+
+/**
+ * Release all storage held by a pubsub_connector_t.
+ **/
+void
+pubsub_connector_free_(pubsub_connector_t *con)
+{
+ if (!con)
+ return;
+
+ if (con->builder) {
+ --con->builder->n_connectors;
+ tor_assert(con->builder->n_connectors >= 0);
+ }
+ tor_free(con);
+}
+
+/**
+ * Use <b>con</b> to add a request for being able to publish messages of type
+ * <b>msg</b> with auxiliary data of <b>type</b> on <b>channel</b>.
+ **/
+int
+pubsub_add_pub_(pubsub_connector_t *con,
+ pub_binding_t *out,
+ channel_id_t channel,
+ message_id_t msg,
+ msg_type_id_t type,
+ unsigned flags,
+ const char *file,
+ unsigned line)
+{
+ pubsub_cfg_t *cfg = tor_malloc_zero(sizeof(*cfg));
+
+ memset(out, 0, sizeof(*out));
+ cfg->is_publish = true;
+
+ out->msg_template.sender = cfg->subsys = con->subsys_id;
+ out->msg_template.channel = cfg->channel = channel;
+ out->msg_template.msg = cfg->msg = msg;
+ out->msg_template.type = cfg->type = type;
+
+ cfg->flags = flags;
+ cfg->added_by_file = file;
+ cfg->added_by_line = line;
+
+ /* We're grabbing a pointer to the pub_binding_t so we can tell it about
+ * the dispatcher later on.
+ */
+ cfg->pub_binding = out;
+
+ smartlist_add(con->builder->items->items, cfg);
+
+ if (dcfg_msg_set_type(con->builder->cfg, msg, type) < 0)
+ goto err;
+ if (dcfg_msg_set_chan(con->builder->cfg, msg, channel) < 0)
+ goto err;
+
+ return 0;
+ err:
+ ++con->builder->n_errors;
+ return -1;
+}
+
+/**
+ * Use <b>con</b> to add a request for being able to publish messages of type
+ * <b>msg</b> with auxiliary data of <b>type</b> on <b>channel</b>,
+ * passing them to the callback in <b>recv_fn</b>.
+ **/
+int
+pubsub_add_sub_(pubsub_connector_t *con,
+ recv_fn_t recv_fn,
+ channel_id_t channel,
+ message_id_t msg,
+ msg_type_id_t type,
+ unsigned flags,
+ const char *file,
+ unsigned line)
+{
+ pubsub_cfg_t *cfg = tor_malloc_zero(sizeof(*cfg));
+
+ cfg->is_publish = false;
+ cfg->subsys = con->subsys_id;
+ cfg->channel = channel;
+ cfg->msg = msg;
+ cfg->type = type;
+ cfg->flags = flags;
+ cfg->added_by_file = file;
+ cfg->added_by_line = line;
+
+ cfg->recv_fn = recv_fn;
+
+ smartlist_add(con->builder->items->items, cfg);
+
+ if (dcfg_msg_set_type(con->builder->cfg, msg, type) < 0)
+ goto err;
+ if (dcfg_msg_set_chan(con->builder->cfg, msg, channel) < 0)
+ goto err;
+ if (! (flags & DISP_FLAG_STUB)) {
+ if (dcfg_add_recv(con->builder->cfg, msg, cfg->subsys, recv_fn) < 0)
+ goto err;
+ }
+
+ return 0;
+ err:
+ ++con->builder->n_errors;
+ return -1;
+}
+
+/**
+ * Use <b>con</b> to define a the functions to use for manipulating the type
+ * <b>type</b>. Any function pointers left as NULL will be implemented as
+ * no-ops.
+ **/
+int
+pubsub_connector_define_type_(pubsub_connector_t *con,
+ msg_type_id_t type,
+ dispatch_typefns_t *fns,
+ const char *file,
+ unsigned line)
+{
+ pubsub_type_cfg_t *cfg = tor_malloc_zero(sizeof(*cfg));
+ cfg->type = type;
+ memcpy(&cfg->fns, fns, sizeof(*fns));
+ cfg->subsys = con->subsys_id;
+ cfg->added_by_file = file;
+ cfg->added_by_line = line;
+
+ smartlist_add(con->builder->items->type_items, cfg);
+
+ if (dcfg_type_set_fns(con->builder->cfg, type, fns) < 0)
+ goto err;
+
+ return 0;
+ err:
+ ++con->builder->n_errors;
+ return -1;
+}
+
+/**
+ * Initialize the dispatch_ptr field in every relevant publish binding
+ * for <b>d</b>.
+ */
+static void
+dispatch_fill_pub_binding_backptrs(pubsub_builder_t *builder,
+ dispatch_t *d)
+{
+ SMARTLIST_FOREACH_BEGIN(builder->items->items, pubsub_cfg_t *, cfg) {
+ if (cfg->pub_binding) {
+ // XXXX we could skip this for STUB publishers, and for any publishers
+ // XXXX where all subscribers are STUB.
+ cfg->pub_binding->dispatch_ptr = d;
+ }
+ } SMARTLIST_FOREACH_END(cfg);
+}
+
+/**
+ * Create a new dispatcher as configured in a pubsub_builder_t.
+ *
+ * Consumes and frees its input.
+ **/
+dispatch_t *
+pubsub_builder_finalize(pubsub_builder_t *builder)
+{
+ dispatch_t *dispatcher = NULL;
+ tor_assert_nonfatal(builder->n_connectors == 0);
+
+ if (builder->n_errors)
+ goto err;
+
+ /* Coming in the next commit.
+ if (pubsub_builder_check(builder) < 0)
+ goto err;
+ */
+
+ dispatcher = dispatch_new(builder->cfg);
+
+ if (!dispatcher)
+ goto err;
+
+ dispatch_fill_pub_binding_backptrs(builder, dispatcher);
+
+ err:
+ pubsub_builder_free(builder);
+ return dispatcher;
+}
diff --git a/src/lib/pubsub/pubsub_build.h b/src/lib/pubsub/pubsub_build.h
new file mode 100644
index 000000000..199eab219
--- /dev/null
+++ b/src/lib/pubsub/pubsub_build.h
@@ -0,0 +1,87 @@
+/* Copyright (c) 2001, Matej Pfajfar.
+ * Copyright (c) 2001-2004, Roger Dingledine.
+ * Copyright (c) 2004-2006, Roger Dingledine, Nick Mathewson.
+ * Copyright (c) 2007-2018, The Tor Project, Inc. */
+/* See LICENSE for licensing information */
+
+/**
+ * @file pubsub_build.h
+ * @brief Header used for constructing the OO publish-subscribe facility.
+ *
+ * (See pubsub.h for more general information on this API.)
+ **/
+
+#ifndef TOR_PUBSUB_BUILD_H
+#define TOR_PUBSUB_BUILD_H
+
+struct dispatch_t;
+
+/**
+ * A "dispatch builder" is an incomplete dispatcher, used when
+ * registering messages. It does not have the same integrity guarantees
+ * as a dispatcher. It cannot actually handle messages itself: once all
+ * subsystems have registered, it is converted into a dispatch_t.
+ **/
+typedef struct pubsub_builder_t pubsub_builder_t;
+
+/**
+ * A "dispatch connector" is a view of the dispatcher that a subsystem
+ * uses while initializing itself. It is specific to the subsystem, and
+ * ensures that each subsystem doesn't need to identify itself
+ * repeatedly while registering its messages.
+ **/
+typedef struct pubsub_connector_t pubsub_connector_t;
+
+/**
+ * Create a new pubsub_builder. This should only happen in the
+ * main-init code.
+ */
+pubsub_builder_t *pubsub_builder_new(void);
+
+/** DOCDOC */
+int pubsub_builder_check(pubsub_builder_t *);
+
+/**
+ * Free a pubsub builder. This should only happen on error paths, where
+ * we have decided not to construct a dispatcher for some reason.
+ */
+#define pubsub_builder_free(db) \
+ FREE_AND_NULL(pubsub_builder_t, pubsub_builder_free_, (db))
+
+/** Internal implementation of pubsub_builder_free(). */
+void pubsub_builder_free_(pubsub_builder_t *);
+
+/**
+ * Create a pubsub connector that a single subsystem will use to
+ * register its messages. The main-init code does this during susbsystem
+ * initialization.
+ */
+pubsub_connector_t *pubsub_connector_for_subsystem(pubsub_builder_t *,
+ subsys_id_t);
+
+/**
+ * The main-init code does this after subsystem initialization.
+ */
+#define pubsub_connector_free(c) \
+ FREE_AND_NULL(pubsub_connector_t, pubsub_connector_free_, (c))
+
+void pubsub_connector_free_(pubsub_connector_t *);
+
+/**
+ * Constructs a dispatcher from a dispatch_builder, after checking that the
+ * invariances on the messages, channels, and connections have been
+ * respected.
+ *
+ * This should happen after every subsystem has initialized, and before
+ * entering the mainloop.
+ */
+struct dispatch_t *pubsub_builder_finalize(pubsub_builder_t *);
+
+#ifdef PUBSUB_PRIVATE
+struct pubsub_items_t;
+#define pubsub_items_free(cfg) \
+ FREE_AND_NULL(pubsub_items_t, pubsub_items_free_, (cfg))
+void pubsub_items_free_(struct pubsub_items_t *cfg);
+#endif
+
+#endif
diff --git a/src/lib/pubsub/pubsub_builder_st.h b/src/lib/pubsub/pubsub_builder_st.h
new file mode 100644
index 000000000..a1cc6e718
--- /dev/null
+++ b/src/lib/pubsub/pubsub_builder_st.h
@@ -0,0 +1,161 @@
+/* Copyright (c) 2001, Matej Pfajfar.
+ * Copyright (c) 2001-2004, Roger Dingledine.
+ * Copyright (c) 2004-2006, Roger Dingledine, Nick Mathewson.
+ * Copyright (c) 2007-2018, The Tor Project, Inc. */
+/* See LICENSE for licensing information */
+
+/**
+ * @file pubsub_builder_st.h
+ *
+ * @brief private structures used for configuring dispatchers and messages.
+ */
+
+#ifndef TOR_PUBSUB_BUILDER_ST_H
+#define TOR_PUBSUB_BUILDER_ST_H
+
+#ifdef PUBSUB_PRIVATE
+
+#include <stdbool.h>
+#include <stddef.h>
+
+struct dispatch_cfg_t;
+struct smartlist_t;
+struct pub_binding_t;
+
+/**
+ * Configuration for a single publication or subscription request.
+ *
+ * These can be stored while the dispatcher is in use, but are only used for
+ * setup, teardown, and debugging.
+ *
+ * There are various fields in this request describing the message; all of
+ * them must match other descriptions of the message, or a bug has occurred.
+ **/
+typedef struct pubsub_cfg_t {
+ /** True if this is a publishing request; false for a subscribing request. */
+ bool is_publish;
+ /** The system making this request. */
+ subsys_id_t subsys;
+ /** The channel on which the message is to be sent. */
+ channel_id_t channel;
+ /** The message ID to be sent or received. */
+ message_id_t msg;
+ /** The C type associated with the message. */
+ msg_type_id_t type;
+ /** One or more DISP_FLAGS_* items, combined with bitwise OR. */
+ unsigned flags;
+
+ /**
+ * Publishing only: a pub_binding object that will receive the binding for
+ * this request. We will finish filling this in when the dispatcher is
+ * constructed, so that the subsystem can publish then and not before.
+ */
+ struct pub_binding_t *pub_binding;
+
+ /**
+ * Subscribing only: a function to receive message objects for this request.
+ */
+ recv_fn_t recv_fn;
+
+ /** The file from which this message was configured */
+ const char *added_by_file;
+ /** The line at which this message was configured */
+ unsigned added_by_line;
+} pubsub_cfg_t;
+
+/**
+ * Configuration request for a single C type.
+ *
+ * These are stored while the dispatcher is in use, but are only used for
+ * setup, teardown, and debugging.
+ **/
+typedef struct pubsub_type_cfg_t {
+ /**
+ * The identifier for this type.
+ */
+ msg_type_id_t type;
+ /**
+ * Functions to use when manipulating the type.
+ */
+ dispatch_typefns_t fns;
+
+ /** The subsystem that configured this type. */
+ subsys_id_t subsys;
+ /** The file from which this type was configured */
+ const char *added_by_file;
+ /** The line at which this type was configured */
+ unsigned added_by_line;
+} pubsub_type_cfg_t;
+
+/**
+ * The set of configuration requests for a dispatcher, as made by various
+ * subsystems.
+ **/
+typedef struct pubsub_items_t {
+ /** List of pubsub_cfg_t. */
+ struct smartlist_t *items;
+ /** List of pubsub_type_cfg_t. */
+ struct smartlist_t *type_items;
+} pubsub_items_t;
+
+/**
+ * Type used to construct a dispatcher. We use this type to build up the
+ * configuration for a dispatcher, and then pass ownership of that
+ * configuration to the newly constructed dispatcher.
+ **/
+struct pubsub_builder_t {
+ /** Number of outstanding pubsub_connector_t objects pointing to this
+ * pubsub_builder_t. */
+ int n_connectors;
+ /** Number of errors encountered while constructing this object so far. */
+ int n_errors;
+ /** In-progress configuration that we're constructing, as a list of the
+ * requests that have been made. */
+ pubsub_items_t *items;
+ /** In-progress configuration that we're constructing, in a form that can
+ * be converted to a dispatch_t. */
+ struct dispatch_cfg_t *cfg;
+};
+
+/**
+ * Type given to a subsystem when adding connections to a pubsub_builder_t.
+ * We use this type to force each subsystem to get blamed for the
+ * publications, subscriptions, and types that it adds.
+ **/
+struct pubsub_connector_t {
+ /** The pubsub_builder that this connector refers to. */
+ struct pubsub_builder_t *builder;
+ /** The subsystem that has been given this connector. */
+ subsys_id_t subsys_id;
+};
+
+/**
+ * Helper structure used when constructing a dispatcher that sorts the
+ * pubsub_cfg_t objects in various ways.
+ **/
+typedef struct pubsub_adjmap_t {
+ /* XXXX The next three fields are currently constructed but not yet
+ * XXXX used. I believe we'll want them in the future, though. -nickm
+ */
+ /** Number of subsystems; length of the *_by_subsys arrays. */
+ size_t n_subsystems;
+ /** Array of lists of publisher pubsub_cfg_t objects, indexed by
+ * subsystem. */
+ struct smartlist_t **pub_by_subsys;
+ /** Array of lists of subscriber pubsub_cfg_t objects, indexed by
+ * subsystem. */
+ struct smartlist_t **sub_by_subsys;
+
+ /** Number of message IDs; length of the *_by_msg arrays. */
+ size_t n_msgs;
+ /** Array of lists of publisher pubsub_cfg_t objects, indexed by
+ * message ID. */
+ struct smartlist_t **pub_by_msg;
+ /** Array of lists of subscriber pubsub_cfg_t objects, indexed by
+ * message ID. */
+ struct smartlist_t **sub_by_msg;
+} pubsub_adjmap_t;
+
+#endif
+
+#endif
diff --git a/src/lib/pubsub/pubsub_connect.h b/src/lib/pubsub/pubsub_connect.h
new file mode 100644
index 000000000..b63f9dc43
--- /dev/null
+++ b/src/lib/pubsub/pubsub_connect.h
@@ -0,0 +1,47 @@
+/* Copyright (c) 2001, Matej Pfajfar.
+ * Copyright (c) 2001-2004, Roger Dingledine.
+ * Copyright (c) 2004-2006, Roger Dingledine, Nick Mathewson.
+ * Copyright (c) 2007-2018, The Tor Project, Inc. */
+/* See LICENSE for licensing information */
+
+/**
+ * @file pubsub_connect.h
+ * @brief Header for functions that add relationships to a pubsub builder.
+ *
+ * These functions are used by modules that need to add publication and
+ * subscription requests.
+ **/
+
+#ifndef TOR_PUBSUB_CONNECT_H
+#define TOR_PUBSUB_CONNECT_H
+
+#include "lib/dispatch/msgtypes.h"
+
+struct pub_binding_t;
+struct pubsub_connector_t;
+
+int pubsub_add_pub_(struct pubsub_connector_t *con,
+ struct pub_binding_t *out,
+ channel_id_t channel,
+ message_id_t msg,
+ msg_type_id_t type,
+ unsigned flags,
+ const char *file,
+ unsigned line);
+
+int pubsub_add_sub_(struct pubsub_connector_t *con,
+ recv_fn_t recv_fn,
+ channel_id_t channel,
+ message_id_t msg,
+ msg_type_id_t type,
+ unsigned flags,
+ const char *file,
+ unsigned line);
+
+int pubsub_connector_define_type_(struct pubsub_connector_t *,
+ msg_type_id_t,
+ dispatch_typefns_t *,
+ const char *file,
+ unsigned line);
+
+#endif
diff --git a/src/lib/pubsub/pubsub_flags.h b/src/lib/pubsub/pubsub_flags.h
new file mode 100644
index 000000000..d07a06be7
--- /dev/null
+++ b/src/lib/pubsub/pubsub_flags.h
@@ -0,0 +1,32 @@
+/* Copyright (c) 2001, Matej Pfajfar.
+ * Copyright (c) 2001-2004, Roger Dingledine.
+ * Copyright (c) 2004-2006, Roger Dingledine, Nick Mathewson.
+ * Copyright (c) 2007-2018, The Tor Project, Inc. */
+/* See LICENSE for licensing information */
+
+/**
+ * @file pubsub_flags.h
+ * @brief Flags that can be set on publish/subscribe messages.
+ **/
+
+#ifndef TOR_PUBSUB_FLAGS_H
+#define TOR_PUBSUB_FLAGS_H
+
+/**
+ * Flag for registering a message: declare that no other module is allowed to
+ * publish this message if we are publishing it, or subscribe to it if we are
+ * subscribing to it.
+ */
+#define DISP_FLAG_EXCL (1u<<0)
+
+/**
+ * Flag for registering a message: declare that this message is a stub, and we
+ * will not actually publish/subscribe it, but that the dispatcher should
+ * treat us as if we did when typechecking.
+ *
+ * We use this so that messages aren't treated as "dangling" if they are
+ * potentially used by some other build of Tor.
+ */
+#define DISP_FLAG_STUB (1u<<1)
+
+#endif
diff --git a/src/lib/pubsub/pubsub_publish.c b/src/lib/pubsub/pubsub_publish.c
new file mode 100644
index 000000000..8c469e8ad
--- /dev/null
+++ b/src/lib/pubsub/pubsub_publish.c
@@ -0,0 +1,70 @@
+/* Copyright (c) 2001, Matej Pfajfar.
+ * Copyright (c) 2001-2004, Roger Dingledine.
+ * Copyright (c) 2004-2006, Roger Dingledine, Nick Mathewson.
+ * Copyright (c) 2007-2018, The Tor Project, Inc. */
+/* See LICENSE for licensing information */
+
+/**
+ * @file pubsub_publish.h
+ * @brief Header for functions to publish using a pub_binding_t.
+ **/
+
+#define PUBSUB_PRIVATE
+#define DISPATCH_PRIVATE
+#include "orconfig.h"
+
+#include "lib/dispatch/dispatch.h"
+#include "lib/dispatch/dispatch_st.h"
+
+#include "lib/pubsub/pub_binding_st.h"
+#include "lib/pubsub/pubsub_publish.h"
+
+#include "lib/malloc/malloc.h"
+#include "lib/log/util_bug.h"
+
+#include <string.h>
+
+/**
+ * Publish a message from the publication binding <b>pub</b> using the
+ * auxiliary data <b>auxdata</b>.
+ *
+ * Return 0 on success, -1 on failure.
+ **/
+int
+pubsub_pub_(const pub_binding_t *pub, msg_aux_data_t auxdata)
+{
+ dispatch_t *d = pub->dispatch_ptr;
+ if (BUG(! d)) {
+ /* Tried to publish a message before the dispatcher was configured. */
+ /* (Without a dispatcher, we don't know how to free auxdata.) */
+ return -1;
+ }
+
+ if (BUG(pub->msg_template.type >= d->n_types)) {
+ /* The type associated with this message is not known to the dispatcher. */
+ /* (Without a correct type, we don't know how to free auxdata.) */
+ return -1;
+ }
+
+ if (BUG(pub->msg_template.msg >= d->n_msgs) ||
+ BUG(pub->msg_template.channel >= d->n_queues)) {
+ /* The message ID or channel ID was out of bounds. */
+ d->typefns[pub->msg_template.type].free_fn(auxdata);
+ return -1;
+ }
+
+ if (! d->table[pub->msg_template.msg]) {
+ /* Fast path: nobody wants this data. */
+
+ // XXXX Faster path: we could store this in the pub_binding_t.
+ d->typefns[pub->msg_template.type].free_fn(auxdata);
+ return 0;
+ }
+
+ /* Construct the message object */
+ msg_t *m = tor_malloc(sizeof(msg_t));
+ memcpy(m, &pub->msg_template, sizeof(msg_t));
+ m->aux_data__ = auxdata;
+
+ return dispatch_send_msg_unchecked(d, m);
+}
diff --git a/src/lib/pubsub/pubsub_publish.h b/src/lib/pubsub/pubsub_publish.h
new file mode 100644
index 000000000..0250fd076
--- /dev/null
+++ b/src/lib/pubsub/pubsub_publish.h
@@ -0,0 +1,15 @@
+/* Copyright (c) 2001, Matej Pfajfar.
+ * Copyright (c) 2001-2004, Roger Dingledine.
+ * Copyright (c) 2004-2006, Roger Dingledine, Nick Mathewson.
+ * Copyright (c) 2007-2018, The Tor Project, Inc. */
+/* See LICENSE for licensing information */
+
+#ifndef TOR_PUBSUB_PUBLISH_H
+#define TOR_PUBSUB_PUBLISH_H
+
+#include "lib/dispatch/msgtypes.h"
+struct pub_binding_t;
+
+int pubsub_pub_(const struct pub_binding_t *pub, msg_aux_data_t auxdata);
+
+#endif
diff --git a/src/test/include.am b/src/test/include.am
index d4e8eb4e8..7734b73de 100644
--- a/src/test/include.am
+++ b/src/test/include.am
@@ -165,6 +165,7 @@ src_test_test_SOURCES += \
src/test/test_proto_misc.c \
src/test/test_protover.c \
src/test/test_pt.c \
+ src/test/test_pubsub_msg.c \
src/test/test_relay.c \
src/test/test_relaycell.c \
src/test/test_relaycrypt.c \
diff --git a/src/test/test.c b/src/test/test.c
index 4c6d9775b..da0be4133 100644
--- a/src/test/test.c
+++ b/src/test/test.c
@@ -910,6 +910,7 @@ struct testgroup_t testgroups[] = {
{ "proto/misc/", proto_misc_tests },
{ "protover/", protover_tests },
{ "pt/", pt_tests },
+ { "pubsub/msg/", pubsub_msg_tests },
{ "relay/" , relay_tests },
{ "relaycell/", relaycell_tests },
{ "relaycrypt/", relaycrypt_tests },
diff --git a/src/test/test.h b/src/test/test.h
index cd0ce4f6d..fb417124c 100644
--- a/src/test/test.h
+++ b/src/test/test.h
@@ -253,6 +253,7 @@ extern struct testcase_t proto_http_tests[];
extern struct testcase_t proto_misc_tests[];
extern struct testcase_t protover_tests[];
extern struct testcase_t pt_tests[];
+extern struct testcase_t pubsub_msg_tests[];
extern struct testcase_t relay_tests[];
extern struct testcase_t relaycell_tests[];
extern struct testcase_t relaycrypt_tests[];
diff --git a/src/test/test_pubsub_msg.c b/src/test/test_pubsub_msg.c
new file mode 100644
index 000000000..5b771d45a
--- /dev/null
+++ b/src/test/test_pubsub_msg.c
@@ -0,0 +1,305 @@
+/* Copyright (c) 2018, The Tor Project, Inc. */
+/* See LICENSE for licensing information */
+
+#define DISPATCH_PRIVATE
+
+#include "test/test.h"
+
+#include "lib/dispatch/dispatch.h"
+#include "lib/dispatch/dispatch_naming.h"
+#include "lib/dispatch/dispatch_st.h"
+#include "lib/dispatch/msgtypes.h"
+#include "lib/pubsub/pubsub_flags.h"
+#include "lib/pubsub/pub_binding_st.h"
+#include "lib/pubsub/pubsub_build.h"
+#include "lib/pubsub/pubsub_builder_st.h"
+#include "lib/pubsub/pubsub_connect.h"
+#include "lib/pubsub/pubsub_publish.h"
+
+#include "lib/log/escape.h"
+#include "lib/malloc/malloc.h"
+#include "lib/string/printf.h"
+
+#include <stdio.h>
+#include <string.h>
+
+static char *
+ex_str_fmt(msg_aux_data_t aux)
+{
+ return esc_for_log(aux.ptr);
+}
+static void
+ex_str_free(msg_aux_data_t aux)
+{
+ tor_free_(aux.ptr);
+}
+static dispatch_typefns_t stringfns = {
+ .free_fn = ex_str_free,
+ .fmt_fn = ex_str_fmt
+};
+
+// We're using the lowest-level publish/subscribe logic here, to avoid the
+// pubsub_macros.h macros and just test the dispatch core. We'll use a string
+// type for everything.
+
+#define DECLARE_MESSAGE(suffix) \
+ static pub_binding_t pub_binding_##suffix; \
+ static int msg_received_##suffix = 0; \
+ static void recv_msg_##suffix(const msg_t *m) { \
+ (void)m; \
+ ++msg_received_##suffix; \
+ } \
+ EAT_SEMICOLON
+
+#define ADD_PUBLISH(binding_suffix, subsys, channel, msg, flags) \
+ STMT_BEGIN { \
+ con = pubsub_connector_for_subsystem(builder, \
+ get_subsys_id(#subsys)); \
+ pubsub_add_pub_(con, &pub_binding_##binding_suffix, \
+ get_channel_id(#channel), \
+ get_message_id(#msg), get_msg_type_id("string"), \
+ (flags), __FILE__, __LINE__); \
+ pubsub_connector_free(con); \
+ } STMT_END
+
+#define ADD_SUBSCRIBE(hook_suffix, subsys, channel, msg, flags) \
+ STMT_BEGIN { \
+ con = pubsub_connector_for_subsystem(builder, \
+ get_subsys_id(#subsys)); \
+ pubsub_add_sub_(con, recv_msg_##hook_suffix, \
+ get_channel_id(#channel), \
+ get_message_id(#msg), get_msg_type_id("string"), \
+ (flags), __FILE__, __LINE__); \
+ pubsub_connector_free(con); \
+ } STMT_END
+
+#define SEND(binding_suffix, val) \
+ STMT_BEGIN { \
+ msg_aux_data_t data_; \
+ data_.ptr = tor_strdup(val); \
+ pubsub_pub_(&pub_binding_##binding_suffix, data_); \
+ } STMT_END
+
+DECLARE_MESSAGE(msg1);
+DECLARE_MESSAGE(msg2);
+DECLARE_MESSAGE(msg3);
+DECLARE_MESSAGE(msg4);
+DECLARE_MESSAGE(msg5);
+
+static smartlist_t *strings_received = NULL;
+static void
+recv_msg_copy_string(const msg_t *m)
+{
+ const char *s = m->aux_data__.ptr;
+ smartlist_add(strings_received, tor_strdup(s));
+}
+
+static void *
+setup_dispatcher(const struct testcase_t *testcase)
+{
+ (void)testcase;
+ pubsub_builder_t *builder = pubsub_builder_new();
+ pubsub_connector_t *con;
+
+ {
+ con = pubsub_connector_for_subsystem(builder, get_subsys_id("types"));
+ pubsub_connector_define_type_(con,
+ get_msg_type_id("string"),
+ &stringfns,
+ "nowhere.c", 99);
+ pubsub_connector_free(con);
+ }
+ // message1 has one publisher and one subscriber.
+ ADD_PUBLISH(msg1, sys1, main, message1, 0);
+ ADD_SUBSCRIBE(msg1, sys2, main, message1, 0);
+
+ // message2 has a publisher and a stub subscriber.
+ ADD_PUBLISH(msg2, sys1, main, message2, 0);
+ ADD_SUBSCRIBE(msg2, sys2, main, message2, DISP_FLAG_STUB);
+
+ // message3 has a publisher and three subscribers.
+ ADD_PUBLISH(msg3, sys1, main, message3, 0);
+ ADD_SUBSCRIBE(msg3, sys2, main, message3, 0);
+ ADD_SUBSCRIBE(msg3, sys3, main, message3, 0);
+ ADD_SUBSCRIBE(msg3, sys4, main, message3, 0);
+
+ // message4 has one publisher and two subscribers, but it's on another
+ // channel.
+ ADD_PUBLISH(msg4, sys2, other, message4, 0);
+ ADD_SUBSCRIBE(msg4, sys1, other, message4, 0);
+ ADD_SUBSCRIBE(msg4, sys3, other, message4, 0);
+
+ // message5 has a huge number of recipients.
+ ADD_PUBLISH(msg5, sys3, main, message5, 0);
+ ADD_SUBSCRIBE(msg5, sys4, main, message5, 0);
+ ADD_SUBSCRIBE(msg5, sys5, main, message5, 0);
+ ADD_SUBSCRIBE(msg5, sys6, main, message5, 0);
+ ADD_SUBSCRIBE(msg5, sys7, main, message5, 0);
+ ADD_SUBSCRIBE(msg5, sys8, main, message5, 0);
+ for (int i = 0; i < 1000-5; ++i) {
+ char *sys;
+ tor_asprintf(&sys, "xsys-%d", i);
+ con = pubsub_connector_for_subsystem(builder, get_subsys_id(sys));
+ pubsub_add_sub_(con, recv_msg_copy_string,
+ get_channel_id("main"),
+ get_message_id("message5"),
+ get_msg_type_id("string"), 0, "here", 100);
+ pubsub_connector_free(con);
+ tor_free(sys);
+ }
+
+ return pubsub_builder_finalize(builder);
+}
+
+static int
+cleanup_dispatcher(const struct testcase_t *testcase, void *dispatcher_)
+{
+ (void)testcase;
+ dispatch_t *dispatcher = dispatcher_;
+ dispatch_free(dispatcher);
+ return 1;
+}
+
+static const struct testcase_setup_t dispatcher_setup = {
+ setup_dispatcher, cleanup_dispatcher
+};
+
+static void
+test_pubsub_msg_minimal(void *arg)
+{
+ dispatch_t *d = arg;
+
+ tt_int_op(0, OP_EQ, msg_received_msg1);
+ SEND(msg1, "hello world");
+ tt_int_op(0, OP_EQ, msg_received_msg1); // hasn't actually arrived yet.
+
+ tt_int_op(0, OP_EQ, dispatch_flush(d, get_channel_id("main"), 1000));
+ tt_int_op(1, OP_EQ, msg_received_msg1); // we got the message!
+
+ done:
+ ;
+}
+
+static void
+test_pubsub_msg_send_to_stub(void *arg)
+{
+ dispatch_t *d = arg;
+
+ tt_int_op(0, OP_EQ, msg_received_msg2);
+ SEND(msg2, "hello silence");
+ tt_int_op(0, OP_EQ, msg_received_msg2); // hasn't actually arrived yet.
+
+ tt_int_op(0, OP_EQ, dispatch_flush(d, get_channel_id("main"), 1000));
+ tt_int_op(0, OP_EQ, msg_received_msg2); // doesn't arrive -- stub hook.
+
+ done:
+ ;
+}
+
+static void
+test_pubsub_msg_cancel_msgs(void *arg)
+{
+ dispatch_t *d = arg;
+
+ tt_int_op(0, OP_EQ, msg_received_msg1);
+ for (int i = 0; i < 100; ++i) {
+ SEND(msg1, "hello world");
+ }
+ tt_int_op(0, OP_EQ, msg_received_msg1); // hasn't actually arrived yet.
+
+ tt_int_op(0, OP_EQ, dispatch_flush(d, get_channel_id("main"), 10));
+ tt_int_op(10, OP_EQ, msg_received_msg1); // we got the message 10 times.
+
+ // At this point, the dispatcher will be freed with queued, undelivered
+ // messages.
+ done:
+ ;
+}
+
+struct alertfn_target {
+ dispatch_t *d;
+ channel_id_t ch;
+ int count;
+};
+static void
+alertfn_generic(dispatch_t *d, channel_id_t ch, void *arg)
+{
+ struct alertfn_target *t = arg;
+ tt_ptr_op(d, OP_EQ, t->d);
+ tt_int_op(ch, OP_EQ, t->ch);
+ ++t->count;
+ done:
+ ;
+}
+
+static void
+test_pubsub_msg_alertfns(void *arg)
+{
+ dispatch_t *d = arg;
+ struct alertfn_target ch1_a = { d, get_channel_id("main"), 0 };
+ struct alertfn_target ch2_a = { d, get_channel_id("other"), 0 };
+
+ tt_int_op(0, OP_EQ,
+ dispatch_set_alert_fn(d, get_channel_id("main"),
+ alertfn_generic, &ch1_a));
+ tt_int_op(0, OP_EQ,
+ dispatch_set_alert_fn(d, get_channel_id("other"),
+ alertfn_generic, &ch2_a));
+
+ SEND(msg3, "hello");
+ tt_int_op(ch1_a.count, OP_EQ, 1);
+ SEND(msg3, "world");
+ tt_int_op(ch1_a.count, OP_EQ, 1); // only the first message sends an alert
+ tt_int_op(ch2_a.count, OP_EQ, 0); // no alert for 'other'
+
+ SEND(msg4, "worse things happen in C");
+ tt_int_op(ch2_a.count, OP_EQ, 1);
+
+ // flush the first (main) channel...
+ tt_int_op(0, OP_EQ, dispatch_flush(d, get_channel_id("main"), 1000));
+ tt_int_op(6, OP_EQ, msg_received_msg3); // 3 subscribers, 2 instances.
+
+ // now that the main channel is flushed, sending another message on it
+ // starts another alert.
+ tt_int_op(ch1_a.count, OP_EQ, 1);
+ SEND(msg1, "plover");
+ tt_int_op(ch1_a.count, OP_EQ, 2);
+ tt_int_op(ch2_a.count, OP_EQ, 1);
+
+ done:
+ ;
+}
+
+/* try more than N_FAST_FNS hooks on msg5 */
+static void
+test_pubsub_msg_many_hooks(void *arg)
+{
+ dispatch_t *d = arg;
+ strings_received = smartlist_new();
+
+ tt_int_op(0, OP_EQ, msg_received_msg5);
+ SEND(msg5, "hello world");
+ tt_int_op(0, OP_EQ, msg_received_msg5);
+ tt_int_op(0, OP_EQ, smartlist_len(strings_received));
+
+ tt_int_op(0, OP_EQ, dispatch_flush(d, get_channel_id("main"), 100000));
+ tt_int_op(5, OP_EQ, msg_received_msg5);
+ tt_int_op(995, OP_EQ, smartlist_len(strings_received));
+
+ done:
+ SMARTLIST_FOREACH(strings_received, char *, s, tor_free(s));
+ smartlist_free(strings_received);
+}
+
+#define T(name) \
+ { #name, test_pubsub_msg_ ## name , TT_FORK, \
+ &dispatcher_setup, NULL }
+
+struct testcase_t pubsub_msg_tests[] = {
+ T(minimal),
+ T(send_to_stub),
+ T(cancel_msgs),
+ T(alertfns),
+ T(many_hooks),
+ END_OF_TESTCASES
+};
1
0

27 Mar '19
commit 24b945f713a713bba0ec4f0d8297b49cbc45c5a1
Author: Nick Mathewson <nickm(a)torproject.org>
Date: Sun Jan 13 16:01:44 2019 -0500
Debug logs to record all messages sent and delivered.
---
src/lib/dispatch/dispatch_core.c | 28 +++++++++++++++++++++++++++-
1 file changed, 27 insertions(+), 1 deletion(-)
diff --git a/src/lib/dispatch/dispatch_core.c b/src/lib/dispatch/dispatch_core.c
index 24dfc649a..da54f9b43 100644
--- a/src/lib/dispatch/dispatch_core.c
+++ b/src/lib/dispatch/dispatch_core.c
@@ -14,6 +14,7 @@
#include "lib/dispatch/dispatch.h"
#include "lib/dispatch/dispatch_st.h"
+#include "lib/dispatch/dispatch_naming.h"
#include "lib/malloc/malloc.h"
#include "lib/log/util_bug.h"
@@ -180,6 +181,17 @@ dispatch_send_msg_unchecked(dispatch_t *d, msg_t *m)
/* Append the message. */
TOR_SIMPLEQ_INSERT_TAIL(&q->queue, m, next);
+ if (debug_logging_enabled()) {
+ char *arg = dispatch_fmt_msg_data(d, m);
+ log_debug(LD_MESG,
+ "Queued: %s (%s) from %s, on %s.",
+ get_message_id_name(m->msg),
+ arg,
+ get_subsys_id_name(m->sender),
+ get_channel_id_name(m->channel));
+ tor_free(arg);
+ }
+
/* If we just made the queue nonempty for the first time, call the alert
* function. */
if (was_empty) {
@@ -199,10 +211,24 @@ dispatcher_run_msg_cbs(const dispatch_t *d, msg_t *m)
dtbl_entry_t *ent = d->table[m->msg];
int n_fns = ent->n_fns;
+ if (debug_logging_enabled()) {
+ char *arg = dispatch_fmt_msg_data(d, m);
+ log_debug(LD_MESG,
+ "Delivering: %s (%s) from %s, on %s:",
+ get_message_id_name(m->msg),
+ arg,
+ get_subsys_id_name(m->sender),
+ get_channel_id_name(m->channel));
+ tor_free(arg);
+ }
+
int i;
for (i=0; i < n_fns; ++i) {
- if (ent->rcv[i].enabled)
+ if (ent->rcv[i].enabled) {
+ log_debug(LD_MESG, " Delivering to %s.",
+ get_subsys_id_name(ent->rcv[i].sys));
ent->rcv[i].fn(m);
+ }
}
}
1
0

[tor/master] Code to manage publish/subscribe setup via subsystem interface.
by asn@torproject.org 27 Mar '19
by asn@torproject.org 27 Mar '19
27 Mar '19
commit bdeaf7d4b2929609c4d3f2ce9adfd973361ef578
Author: Nick Mathewson <nickm(a)torproject.org>
Date: Tue Jan 15 10:27:39 2019 -0500
Code to manage publish/subscribe setup via subsystem interface.
This commit has the necessary logic to run the publish/subscribe
system from the mainloop, and to initialize it on startup and tear
it down later.
---
src/app/main/subsysmgr.c | 52 ++++++++++++-
src/app/main/subsysmgr.h | 5 ++
src/core/include.am | 2 +
src/core/mainloop/mainloop_pubsub.c | 149 ++++++++++++++++++++++++++++++++++++
src/core/mainloop/mainloop_pubsub.h | 23 ++++++
src/lib/subsys/subsys.h | 4 +-
6 files changed, 232 insertions(+), 3 deletions(-)
diff --git a/src/app/main/subsysmgr.c b/src/app/main/subsysmgr.c
index abd2edd10..91a567ce0 100644
--- a/src/app/main/subsysmgr.c
+++ b/src/app/main/subsysmgr.c
@@ -5,9 +5,14 @@
#include "orconfig.h"
#include "app/main/subsysmgr.h"
-#include "lib/err/torerr.h"
+#include "lib/dispatch/dispatch_naming.h"
+#include "lib/dispatch/msgtypes.h"
+#include "lib/err/torerr.h"
#include "lib/log/log.h"
+#include "lib/malloc/malloc.h"
+#include "lib/pubsub/pubsub_build.h"
+#include "lib/pubsub/pubsub_connect.h"
#include <stdio.h>
#include <stdlib.h>
@@ -106,6 +111,51 @@ subsystems_init_upto(int target_level)
}
/**
+ * Add publish/subscribe relationships to <b>builder</b> for all
+ * initialized subsystems of level no more than <b>target_level</b>.
+ **/
+int
+subsystems_add_pubsub_upto(pubsub_builder_t *builder,
+ int target_level)
+{
+ for (unsigned i = 0; i < n_tor_subsystems; ++i) {
+ const subsys_fns_t *sys = tor_subsystems[i];
+ if (!sys->supported)
+ continue;
+ if (sys->level > target_level)
+ break;
+ if (! sys_initialized[i])
+ continue;
+ int r = 0;
+ if (sys->add_pubsub) {
+ subsys_id_t sysid = get_subsys_id(sys->name);
+ raw_assert(sysid != ERROR_ID);
+ pubsub_connector_t *connector;
+ connector = pubsub_connector_for_subsystem(builder, sysid);
+ r = sys->add_pubsub(connector);
+ pubsub_connector_free(connector);
+ }
+ if (r < 0) {
+ fprintf(stderr, "BUG: subsystem %s (at %u) could not connect to "
+ "publish/subscribe system.", sys->name, sys->level);
+ raw_assert_unreached_msg("A subsystem couldn't be connected.");
+ }
+ }
+
+ return 0;
+}
+
+/**
+ * Add publish/subscribe relationships to <b>builder</b> for all
+ * initialized subsystems.
+ **/
+int
+subsystems_add_pubsub(pubsub_builder_t *builder)
+{
+ return subsystems_add_pubsub_upto(builder, MAX_SUBSYS_LEVEL);
+}
+
+/**
* Shut down all the subsystems.
**/
void
diff --git a/src/app/main/subsysmgr.h b/src/app/main/subsysmgr.h
index 4b3cad62a..4878cf8c3 100644
--- a/src/app/main/subsysmgr.h
+++ b/src/app/main/subsysmgr.h
@@ -14,6 +14,11 @@ extern const unsigned n_tor_subsystems;
int subsystems_init(void);
int subsystems_init_upto(int level);
+struct pubsub_builder_t;
+int subsystems_add_pubsub_upto(struct pubsub_builder_t *builder,
+ int target_level);
+int subsystems_add_pubsub(struct pubsub_builder_t *builder);
+
void subsystems_shutdown(void);
void subsystems_shutdown_downto(int level);
diff --git a/src/core/include.am b/src/core/include.am
index ae47c75e0..3a0e907ed 100644
--- a/src/core/include.am
+++ b/src/core/include.am
@@ -22,6 +22,7 @@ LIBTOR_APP_A_SOURCES = \
src/core/mainloop/connection.c \
src/core/mainloop/cpuworker.c \
src/core/mainloop/mainloop.c \
+ src/core/mainloop/mainloop_pubsub.c \
src/core/mainloop/netstatus.c \
src/core/mainloop/periodic.c \
src/core/or/address_set.c \
@@ -213,6 +214,7 @@ noinst_HEADERS += \
src/core/mainloop/connection.h \
src/core/mainloop/cpuworker.h \
src/core/mainloop/mainloop.h \
+ src/core/mainloop/mainloop_pubsub.h \
src/core/mainloop/netstatus.h \
src/core/mainloop/periodic.h \
src/core/or/addr_policy_st.h \
diff --git a/src/core/mainloop/mainloop_pubsub.c b/src/core/mainloop/mainloop_pubsub.c
new file mode 100644
index 000000000..ab3614ae0
--- /dev/null
+++ b/src/core/mainloop/mainloop_pubsub.c
@@ -0,0 +1,149 @@
+/* Copyright (c) 2001, Matej Pfajfar.
+ * Copyright (c) 2001-2004, Roger Dingledine.
+ * Copyright (c) 2004-2006, Roger Dingledine, Nick Mathewson.
+ * Copyright (c) 2007-2018, The Tor Project, Inc. */
+/* See LICENSE for licensing information */
+
+#include "orconfig.h"
+
+#include "src/core/or/or.h"
+#include "src/core/mainloop/mainloop.h"
+#include "src/core/mainloop/mainloop_pubsub.h"
+
+#include "lib/container/smartlist.h"
+#include "lib/dispatch/dispatch.h"
+#include "lib/dispatch/dispatch_naming.h"
+#include "lib/evloop/compat_libevent.h"
+#include "lib/pubsub/pubsub.h"
+#include "lib/pubsub/pubsub_build.h"
+
+/**
+ * Dispatcher to use for delivering messages.
+ **/
+static dispatch_t *the_dispatcher = NULL;
+static pubsub_items_t *the_pubsub_items = NULL;
+/**
+ * A list of mainloop_event_t, indexed by channel ID, to flush the messages
+ * on a channel.
+ **/
+static smartlist_t *alert_events = NULL;
+
+/**
+ * Mainloop event callback: flush all the messages in a channel.
+ *
+ * The channel is encoded as a pointer, and passed via arg.
+ **/
+static void
+flush_channel_event(mainloop_event_t *ev, void *arg)
+{
+ (void)ev;
+ if (!the_dispatcher)
+ return;
+
+ channel_id_t chan = (channel_id_t)(uintptr_t)(arg);
+ dispatch_flush(the_dispatcher, chan, INT_MAX);
+}
+
+int
+tor_mainloop_connect_pubsub(struct pubsub_builder_t *builder)
+{
+ int rv = -1;
+ tor_mainloop_disconnect_pubsub();
+
+ the_dispatcher = pubsub_builder_finalize(builder, &the_pubsub_items);
+ if (! the_dispatcher)
+ goto err;
+
+ const size_t num_channels = get_num_channel_ids();
+ alert_events = smartlist_new();
+ for (size_t i = 0; i < num_channels; ++i) {
+ smartlist_add(alert_events,
+ mainloop_event_postloop_new(flush_channel_event,
+ (void*)(uintptr_t)(i)));
+ }
+
+ rv = 0;
+ err:
+ tor_mainloop_disconnect_pubsub();
+ return rv;
+}
+
+/**
+ * Dispatch alertfn callback: do nothing. Implements DELIV_NEVER.
+ **/
+static void
+alertfn_never(dispatch_t *d, channel_id_t chan, void *arg)
+{
+ (void)d;
+ (void)chan;
+ (void)arg;
+}
+
+/**
+ * Dispatch alertfn callback: activate a mainloop event. Implements
+ * DELIV_PROMPT.
+ **/
+static void
+alertfn_prompt(dispatch_t *d, channel_id_t chan, void *arg)
+{
+ (void)d;
+ (void)chan;
+ mainloop_event_t *event = arg;
+ mainloop_event_activate(event);
+}
+
+/**
+ * Dispatch alertfn callback: flush all messages right now. Implements
+ * DELIV_IMMEDIATE.
+ **/
+static void
+alertfn_immediate(dispatch_t *d, channel_id_t chan, void *arg)
+{
+ (void) arg;
+ dispatch_flush(d, chan, INT_MAX);
+}
+
+/**
+ * Set the strategy to be used for delivering messages on the named channel.
+ **/
+int
+tor_mainloop_set_delivery_strategy(const char *msg_channel_name,
+ deliv_strategy_t strategy)
+{
+ channel_id_t chan = get_channel_id(msg_channel_name);
+ if (BUG(chan == ERROR_ID) ||
+ BUG(chan >= smartlist_len(alert_events)))
+ return -1;
+
+ switch (strategy) {
+ case DELIV_NEVER:
+ dispatch_set_alert_fn(the_dispatcher, chan, alertfn_never, NULL);
+ break;
+ case DELIV_PROMPT:
+ dispatch_set_alert_fn(the_dispatcher, chan, alertfn_prompt,
+ smartlist_get(alert_events, chan));
+ break;
+ case DELIV_IMMEDIATE:
+ dispatch_set_alert_fn(the_dispatcher, chan, alertfn_immediate, NULL);
+ break;
+ }
+ return 0;
+}
+
+/**
+ * Remove all pubsub dispatchers and events from the mainloop.
+ **/
+void
+tor_mainloop_disconnect_pubsub(void)
+{
+ if (the_pubsub_items) {
+ pubsub_items_clear_bindings(the_pubsub_items);
+ pubsub_items_free(the_pubsub_items);
+ }
+ if (alert_events) {
+ SMARTLIST_FOREACH(alert_events, mainloop_event_t *, ev,
+ mainloop_event_free(ev));
+ smartlist_free(alert_events);
+ }
+ dispatch_free(the_dispatcher);
+}
diff --git a/src/core/mainloop/mainloop_pubsub.h b/src/core/mainloop/mainloop_pubsub.h
new file mode 100644
index 000000000..6eff77842
--- /dev/null
+++ b/src/core/mainloop/mainloop_pubsub.h
@@ -0,0 +1,23 @@
+/* Copyright (c) 2001, Matej Pfajfar.
+ * Copyright (c) 2001-2004, Roger Dingledine.
+ * Copyright (c) 2004-2006, Roger Dingledine, Nick Mathewson.
+ * Copyright (c) 2007-2018, The Tor Project, Inc. */
+/* See LICENSE for licensing information */
+
+#ifndef TOR_MAINLOOP_PUBSUB_H
+#define TOR_MAINLOOP_PUBSUB_H
+
+struct pubsub_builder_t;
+
+typedef enum {
+ DELIV_NEVER=0,
+ DELIV_PROMPT,
+ DELIV_IMMEDIATE,
+} deliv_strategy_t;
+
+int tor_mainloop_connect_pubsub(struct pubsub_builder_t *builder);
+int tor_mainloop_set_delivery_strategy(const char *msg_channel_name,
+ deliv_strategy_t strategy);
+void tor_mainloop_disconnect_pubsub(void);
+
+#endif
diff --git a/src/lib/subsys/subsys.h b/src/lib/subsys/subsys.h
index 2452ec6e2..6f1710c71 100644
--- a/src/lib/subsys/subsys.h
+++ b/src/lib/subsys/subsys.h
@@ -8,7 +8,7 @@
#include <stdbool.h>
-struct dispatch_connector_t;
+struct pubsub_connector_t;
/**
* A subsystem is a part of Tor that is initialized, shut down, configured,
@@ -58,7 +58,7 @@ typedef struct subsys_fns_t {
/**
* Connect a subsystem to the message dispatch system.
**/
- int (*add_pubsub)(struct dispatch_connector_t *);
+ int (*add_pubsub)(struct pubsub_connector_t *);
/**
* Perform any necessary pre-fork cleanup. This function may not fail.
1
0

27 Mar '19
commit 24df14eb096e73438d6045ff3b2840499a9af9b5
Author: Nick Mathewson <nickm(a)torproject.org>
Date: Mon Jan 14 11:29:21 2019 -0500
Pubsub: macros for ease-of-use and typesafety.
---
src/lib/pubsub/include.am | 1 +
src/lib/pubsub/pubsub.h | 1 +
src/lib/pubsub/pubsub_macros.h | 350 ++++++++++++++++++++++++
src/test/include.am | 1 +
src/test/test.c | 1 +
src/test/test.h | 1 +
src/test/test_pubsub_build.c | 596 +++++++++++++++++++++++++++++++++++++++++
7 files changed, 951 insertions(+)
diff --git a/src/lib/pubsub/include.am b/src/lib/pubsub/include.am
index 0ab2fd7b3..c0ec13d03 100644
--- a/src/lib/pubsub/include.am
+++ b/src/lib/pubsub/include.am
@@ -22,4 +22,5 @@ noinst_HEADERS += \
src/lib/pubsub/pubsub_builder_st.h \
src/lib/pubsub/pubsub_connect.h \
src/lib/pubsub/pubsub_flags.h \
+ src/lib/pubsub/pubsub_macros.h \
src/lib/pubsub/pubsub_publish.h
diff --git a/src/lib/pubsub/pubsub.h b/src/lib/pubsub/pubsub.h
index 303b36ad5..1c51f7a78 100644
--- a/src/lib/pubsub/pubsub.h
+++ b/src/lib/pubsub/pubsub.h
@@ -80,6 +80,7 @@
#include "lib/pubsub/pub_binding_st.h"
#include "lib/pubsub/pubsub_connect.h"
#include "lib/pubsub/pubsub_flags.h"
+#include "lib/pubsub/pubsub_macros.h"
#include "lib/pubsub/pubsub_publish.h"
#endif
diff --git a/src/lib/pubsub/pubsub_macros.h b/src/lib/pubsub/pubsub_macros.h
new file mode 100644
index 000000000..aed9c5128
--- /dev/null
+++ b/src/lib/pubsub/pubsub_macros.h
@@ -0,0 +1,350 @@
+/* Copyright (c) 2001, Matej Pfajfar.
+ * Copyright (c) 2001-2004, Roger Dingledine.
+ * Copyright (c) 2004-2006, Roger Dingledine, Nick Mathewson.
+ * Copyright (c) 2007-2018, The Tor Project, Inc. */
+/* See LICENSE for licensing information */
+
+/**
+ * \file msg.h
+ * \brief Macros to help with the publish/subscribe dispatch API.
+ *
+ * The dispatch API allows different subsystems of Tor to communicate with
+ * another asynchronously via a shared "message" system. Some subsystems
+ * declare that they publish a given message, and others declare that they
+ * subscribe to it. Both subsystems depend on the message, but not upon one
+ * another.
+ *
+ * To declare a message, use DECLARE_MESSAGE() (for messages that take their
+ * data as a pointer) or DECLARE_MESSAGE_INT() (for messages that take their
+ * data as an integer. For example, you might say
+ *
+ * DECLARE_MESSAGE(new_circuit, circ, circuit_handle_t *);
+ * or
+ * DECLARE_MESSAGE_INT(shutdown_requested, boolean, bool);
+ *
+ * Every message has a unique name, a "type name" that the dispatch system
+ * uses to manage associated data, and a C type name. You'll want to put
+ * these declarations in a header, to be included by all publishers and all
+ * subscribers.
+ *
+ * When a subsystem wants to publish a message, it uses DECLARE_PUBLISH() at
+ * file scope to create necessary static functions. Then, in its subsystem
+ * initialization (in the "bind to dispatcher" callback) (TODO: name this
+ * properly!), it calls DISPATCH_ADD_PUB() to tell the dispatcher about its
+ * intent to publish. When it actually wants to publish, it uses the
+ * PUBLISH() macro. For example:
+ *
+ * // At file scope
+ * DECLARE_PUBLISH(shutdown_requested);
+ *
+ * static void bind_to_dispatcher(pubsub_connector_t *con)
+ * {
+ * DISPATCH_ADD_PUB(con, mainchannel, shutdown_requested);
+ * }
+ *
+ * // somewhere in a function
+ * {
+ * PUBLISH(shutdown_requested, true);
+ * }
+ *
+ * When a subsystem wants to subscribe to a message, it uses
+ * DECLARE_SUBSCRIBE() at file scope to declare static functions. It must
+ * declare a hook function that receives the message type. Then, in its "bind
+ * to dispatcher" function, it calls DISPATCHER_ADD_SUB() to tell the
+ * dispatcher about its intent to subscribe. When another module publishes
+ * the message, the dispatcher will call the provided hook function.
+ *
+ * // At file scope. The first argument is the message that you're
+ * // subscribing to; the second argument is the hook function to declare.
+ * DECLARE_SUBSCRIBE(shutdown_requested, on_shutdown_req_cb);
+ *
+ * // You need to declare this function.
+ * static void on_shutdown_req_cb(const msg_t *msg,
+ * bool value)
+ * {
+ * // (do something here.)
+ * }
+ *
+ * static void bind_to_dispatcher(pubsub_connector_t *con)
+ * {
+ * DISPATCH_ADD_SUB(con, mainchannel, shutdown_requested);
+ * }
+ *
+ * Where did these types come from? Somewhere in the code, you need to call
+ * DISPATCH_DEFINE_TYPE() to make sure that the dispatcher can manage the
+ * message auxiliary data. It associates a vtbl-like structure with the
+ * type name, so that the dispatcher knows how to manipulate the type you're
+ * giving it.
+ *
+ * For example, the "boolean" type we're using above could be defined as:
+ *
+ * static char *boolean_fmt(msg_aux_data_t d)
+ * {
+ * // This is used for debugging and dumping messages.
+ * if (d.u64)
+ * return tor_strdup("true");
+ * else
+ * return tor_strdup("false");
+ * }
+ *
+ * static void boolean_free(msg_aux_data_t d)
+ * {
+ * // We don't actually need to do anything to free a boolean.
+ * // We could use "NULL" instead of this function, but I'm including
+ * // it as an example.
+ * }
+ *
+ * static void bind_to_dispatcher(pubsub_connector_t *con)
+ * {
+ * dispatch_typefns_t boolean_fns = {
+ * .fmt_fn = boolean_fmt,
+ * .free_fn = boolean_free,
+ * };
+ * DISPATCH_DEFINE_TYPE(con, boolean, &boolean_fns);
+ * }
+ *
+ *
+ *
+ * So, how does this all work? (You can stop reading here, unless you're
+ * debugging something.)
+ *
+ * When you declare a message in a header with DECLARE_MESSAGE() or
+ * DECLARE_MESSAGE_INT(), it creates five things:
+ *
+ * * two typedefs for the message argument (constant and non-constant
+ * variants).
+ * * a constant string to hold the declared message type name
+ * * two inline functions, to coerce the message argument type to and from
+ * a "msg_aux_data_t" union.
+ *
+ * All of these declarations have names based on the message name.
+ *
+ * Later, when you say DECLARE_PUBLISH() or DECLARE_SUBSCRIBE(), we use the
+ * elements defined by DECLARE_MESSAGE() to make sure that the publish
+ * function takes the correct argument type, and that the subscription hook is
+ * declared with the right argument type.
+ **/
+
+#ifndef TOR_DISPATCH_MSG_H
+#define TOR_DISPATCH_MSG_H
+
+#include "lib/cc/compat_compiler.h"
+#include "lib/dispatch/dispatch_naming.h"
+#include "lib/pubsub/pub_binding_st.h"
+#include "lib/pubsub/pubsub_connect.h"
+#include "lib/pubsub/pubsub_flags.h"
+#include "lib/pubsub/pubsub_publish.h"
+
+/* Implemenation notes:
+ *
+ * For a messagename "foo", the DECLARE_MESSAGE*() macros must declare:
+ *
+ * msg_arg_type__foo -- a typedef for the argument type of the foo message.
+ * msg_arg_consttype__foo -- a typedef for the const argument type of the
+ * foo message.
+ * msg_arg_name__foo[] -- a static string constant holding the unique
+ * identifier for the type of the foo message.
+ * msg_arg_get__foo() -- an inline function taking a msg_aux_data_t and
+ * returning the C data type.
+ * msg_arg_set__foo() -- an inline function taking a msg_aux_data_t and
+ * the C type, setting the msg_aux_data_t to hold the C type.
+ *
+ * For a messagename "foo", the DECLARE_PUBLISH() macro must declare:
+ *
+ * pub_binding__foo -- A static pub_binding_t object used to send messages
+ * from this module.
+ * publish_fn__foo -- A function taking an argument of the appropriate
+ * C type, to be invoked by PUBLISH().
+ *
+ * For a messagename "foo", the DECLARE_SUBSCRIBE() macro must declare:
+ *
+ * hookfn -- A user-provided function name, with the correct signature.
+ * recv_fn__foo -- A wrapper callback that takes a msg_t *, and calls
+ * hookfn with the appropriate arguments.
+ */
+
+/* Macro to declare common elements shared by DECLARE_MESSAGE and
+ * DECLARE_MESSAGE_INT. Don't call this directly.
+ */
+#define DECLARE_MESSAGE_COMMON__(messagename, typename, c_type) \
+ typedef c_type msg_arg_type__ ##messagename; \
+ typedef const c_type msg_arg_consttype__ ##messagename; \
+ ATTR_UNUSED static const char msg_arg_name__ ##messagename[] = # typename;
+
+/**
+ * Use this macro in a header to declare the existence of a given message,
+ * taking a pointer as auxiliary data.
+ *
+ * "messagename" is a unique identifier for the message.
+ *
+ * "typename" is a unique identifier for the type of the auxiliary data.
+ *
+ * "c_type" is a C pointer type (like "char *" or "struct foo *").
+ */
+#define DECLARE_MESSAGE(messagename, typename, c_type) \
+ DECLARE_MESSAGE_COMMON__(messagename, typename, c_type) \
+ ATTR_UNUSED static inline c_type \
+ msg_arg_get__ ##messagename(msg_aux_data_t m) \
+ { \
+ return m.ptr; \
+ } \
+ ATTR_UNUSED static inline void \
+ msg_arg_set__ ##messagename(msg_aux_data_t *m, c_type v) \
+ { \
+ m->ptr = v; \
+ } \
+ EAT_SEMICOLON
+
+/**
+ * Use this macro in a header to declare the existence of a given message,
+ * taking an integer as auxiliary data.
+ *
+ * "messagename" is a unique identifier for the message.
+ *
+ * "typename" is a unique identifier for the type of the auxiliary data.
+ *
+ * "c_type" is a C integer type, like "int" or "bool". It needs to fit inside
+ * a uint64_t.
+ */
+#define DECLARE_MESSAGE_INT(messagename, typename, c_type) \
+ DECLARE_MESSAGE_COMMON__(messagename, typename, c_type) \
+ ATTR_UNUSED static inline c_type \
+ msg_arg_get__ ##messagename(msg_aux_data_t m) \
+ { \
+ return (c_type)m.u64; \
+ } \
+ ATTR_UNUSED static inline void \
+ msg_arg_set__ ##messagename(msg_aux_data_t *m, c_type v) \
+ { \
+ m->u64 = (uint64_t)v; \
+ } \
+ EAT_SEMICOLON
+
+/**
+ * Use this macro inside a C module declare that we'll be publishing a given
+ * message type from within this module.
+ *
+ * It creates necessary functions and wrappers to publish a message whose
+ * unique identifier is "messagename".
+ *
+ * Before you use this, you need to include the header where DECLARE_MESSAGE*()
+ * was used for this message.
+ */
+#define DECLARE_PUBLISH(messagename) \
+ static pub_binding_t pub_binding__ ##messagename; \
+ static void \
+ publish_fn__ ##messagename(msg_arg_type__ ##messagename arg) \
+ { \
+ msg_aux_data_t data; \
+ msg_arg_set__ ##messagename(&data, arg); \
+ pubsub_pub_(&pub_binding__ ##messagename, data); \
+ } \
+ EAT_SEMICOLON
+
+/**
+ * Use this macro inside a C file to declare that we're subscribing to a
+ * given message and associating it with a given "hook function". It
+ * declares the hook function static, and helps with strong typing.
+ *
+ * Before you use this, you need to include the header where
+ * DECLARE_MESSAGE*() was used for the message whose unique identifier is
+ * "messagename".
+ *
+ * You will need to define a function with the name that you provide for
+ * "hookfn". The type of this function will be:
+ * static void hookfn(const msg_t *, const c_type)
+ * where c_type is the c type that you declared in the header.
+ */
+#define DECLARE_SUBSCRIBE(messagename, hookfn) \
+ static void hookfn(const msg_t *, \
+ const msg_arg_consttype__ ##messagename); \
+ static void recv_fn__ ## messagename(const msg_t *m) \
+ { \
+ msg_arg_type__ ## messagename arg; \
+ arg = msg_arg_get__ ##messagename(m->aux_data__); \
+ hookfn(m, arg); \
+ } \
+ EAT_SEMICOLON
+
+/*
+ * This macro is for internal use. It backs DISPATCH_ADD_PUB*()
+ */
+#define DISPATCH_ADD_PUB_(connector, channel, messagename, flags) \
+ ( \
+ ((void)publish_fn__ ##messagename), \
+ pubsub_add_pub_((connector), \
+ &pub_binding__ ##messagename, \
+ get_channel_id(# channel), \
+ get_message_id(# messagename), \
+ get_msg_type_id(msg_arg_name__ ## messagename), \
+ (flags), \
+ __FILE__, \
+ __LINE__) \
+ )
+
+/**
+ * Use a given connector and channel name to declare that this subsystem will
+ * publish a given message type.
+ *
+ * Call this macro from within the add_subscriptions() function of a module.
+ */
+#define DISPATCH_ADD_PUB(connector, channel, messagename) \
+ DISPATCH_ADD_PUB_(connector, channel, messagename, 0)
+
+/**
+ * Use a given connector and channel name to declare that this subsystem will
+ * publish a given message type, and that no other subsystem is allowed to.
+ *
+ * Call this macro from within the add_subscriptions() function of a module.
+ */
+#define DISPATCH_ADD_PUB_EXCL(connector, channel, messagename) \
+ DISPATCH_ADD_PUB_(connector, channel, messagename, DISP_FLAG_EXCL)
+
+/*
+ * This macro is for internal use. It backs DISPATCH_ADD_SUB*()
+ */
+#define DISPATCH_ADD_SUB_(connector, channel, messagename, flags) \
+ pubsub_add_sub_((connector), \
+ recv_fn__ ##messagename, \
+ get_channel_id(#channel), \
+ get_message_id(# messagename), \
+ get_msg_type_id(msg_arg_name__ ##messagename), \
+ (flags), \
+ __FILE__, \
+ __LINE__)
+/*
+ * Use a given connector and channel name to declare that this subsystem will
+ * receive a given message type.
+ *
+ * Call this macro from within the add_subscriptions() function of a module.
+ */
+#define DISPATCH_ADD_SUB(connector, channel, messagename) \
+ DISPATCH_ADD_SUB_(connector, channel, messagename, 0)
+/**
+ * Use a given connector and channel name to declare that this subsystem will
+ * receive a given message type, and that no other subsystem is allowed to do
+ * so.
+ *
+ * Call this macro from within the add_subscriptions() function of a module.
+ */
+#define DISPATCH_ADD_SUB_EXCL(connector, channel, messagename) \
+ DISPATCH_ADD_SUB_(connector, channel, messagename, DISP_FLAG_EXCL)
+
+/**
+ * Publish a given message with a given argument.
+ */
+#define PUBLISH(messagename, arg) \
+ publish_fn__ ##messagename(arg)
+
+/**
+ * Use a given connector to declare that the functions to be used to manipuate
+ * a certain C type.
+ **/
+#define DISPATCH_DEFINE_TYPE(con, type, fns) \
+ pubsub_connector_define_type_((con), \
+ get_msg_type_id(#type), \
+ (fns), \
+ __FILE__, \
+ __LINE__)
+
+#endif
diff --git a/src/test/include.am b/src/test/include.am
index 7734b73de..2f3bcd59c 100644
--- a/src/test/include.am
+++ b/src/test/include.am
@@ -165,6 +165,7 @@ src_test_test_SOURCES += \
src/test/test_proto_misc.c \
src/test/test_protover.c \
src/test/test_pt.c \
+ src/test/test_pubsub_build.c \
src/test/test_pubsub_msg.c \
src/test/test_relay.c \
src/test/test_relaycell.c \
diff --git a/src/test/test.c b/src/test/test.c
index da0be4133..2309da208 100644
--- a/src/test/test.c
+++ b/src/test/test.c
@@ -910,6 +910,7 @@ struct testgroup_t testgroups[] = {
{ "proto/misc/", proto_misc_tests },
{ "protover/", protover_tests },
{ "pt/", pt_tests },
+ { "pubsub/build/", pubsub_build_tests },
{ "pubsub/msg/", pubsub_msg_tests },
{ "relay/" , relay_tests },
{ "relaycell/", relaycell_tests },
diff --git a/src/test/test.h b/src/test/test.h
index fb417124c..96b22a9a0 100644
--- a/src/test/test.h
+++ b/src/test/test.h
@@ -253,6 +253,7 @@ extern struct testcase_t proto_http_tests[];
extern struct testcase_t proto_misc_tests[];
extern struct testcase_t protover_tests[];
extern struct testcase_t pt_tests[];
+extern struct testcase_t pubsub_build_tests[];
extern struct testcase_t pubsub_msg_tests[];
extern struct testcase_t relay_tests[];
extern struct testcase_t relaycell_tests[];
diff --git a/src/test/test_pubsub_build.c b/src/test/test_pubsub_build.c
new file mode 100644
index 000000000..86b5f763a
--- /dev/null
+++ b/src/test/test_pubsub_build.c
@@ -0,0 +1,596 @@
+/* Copyright (c) 2018, The Tor Project, Inc. */
+/* See LICENSE for licensing information */
+
+#define DISPATCH_PRIVATE
+
+#include "test/test.h"
+
+#include "lib/cc/torint.h"
+#include "lib/dispatch/dispatch.h"
+#include "lib/dispatch/dispatch_naming.h"
+#include "lib/dispatch/dispatch_st.h"
+#include "lib/dispatch/msgtypes.h"
+#include "lib/pubsub/pubsub_macros.h"
+#include "lib/pubsub/pubsub_build.h"
+#include "lib/pubsub/pubsub_builder_st.h"
+
+#include "lib/log/escape.h"
+#include "lib/malloc/malloc.h"
+#include "lib/string/printf.h"
+
+#include "test/log_test_helpers.h"
+
+#include <stdio.h>
+#include <string.h>
+
+static char *
+ex_int_fmt(msg_aux_data_t aux)
+{
+ int val = (int) aux.u64;
+ char *r=NULL;
+ tor_asprintf(&r, "%d", val);
+ return r;
+}
+
+static char *
+ex_str_fmt(msg_aux_data_t aux)
+{
+ return esc_for_log(aux.ptr);
+}
+
+static void
+ex_str_free(msg_aux_data_t aux)
+{
+ tor_free_(aux.ptr);
+}
+
+static dispatch_typefns_t intfns = {
+ .fmt_fn = ex_int_fmt
+};
+
+static dispatch_typefns_t stringfns = {
+ .free_fn = ex_str_free,
+ .fmt_fn = ex_str_fmt
+};
+
+DECLARE_MESSAGE_INT(bunch_of_coconuts, int, int);
+DECLARE_PUBLISH(bunch_of_coconuts);
+DECLARE_SUBSCRIBE(bunch_of_coconuts, coconut_recipient_cb);
+
+DECLARE_MESSAGE(yes_we_have_no, string, char *);
+DECLARE_PUBLISH(yes_we_have_no);
+DECLARE_SUBSCRIBE(yes_we_have_no, absent_item_cb);
+
+static void
+coconut_recipient_cb(const msg_t *m, int n_coconuts)
+{
+ (void)m;
+ (void)n_coconuts;
+}
+
+static void
+absent_item_cb(const msg_t *m, const char *fruitname)
+{
+ (void)m;
+ (void)fruitname;
+}
+
+#define FLAG_SKIP 99999
+
+static void
+seed_dispatch_builder(pubsub_builder_t *b,
+ unsigned fl1, unsigned fl2, unsigned fl3, unsigned fl4)
+{
+ pubsub_connector_t *c = NULL;
+
+ {
+ c = pubsub_connector_for_subsystem(b, get_subsys_id("sys1"));
+ DISPATCH_DEFINE_TYPE(c, int, &intfns);
+ if (fl1 != FLAG_SKIP)
+ DISPATCH_ADD_PUB_(c, main, bunch_of_coconuts, fl1);
+ if (fl2 != FLAG_SKIP)
+ DISPATCH_ADD_SUB_(c, main, yes_we_have_no, fl2);
+ pubsub_connector_free(c);
+ }
+
+ {
+ c = pubsub_connector_for_subsystem(b, get_subsys_id("sys2"));
+ DISPATCH_DEFINE_TYPE(c, string, &stringfns);
+ if (fl3 != FLAG_SKIP)
+ DISPATCH_ADD_PUB_(c, main, yes_we_have_no, fl3);
+ if (fl4 != FLAG_SKIP)
+ DISPATCH_ADD_SUB_(c, main, bunch_of_coconuts, fl4);
+ pubsub_connector_free(c);
+ }
+}
+
+static void
+seed_pubsub_builder_basic(pubsub_builder_t *b)
+{
+ seed_dispatch_builder(b, 0, 0, 0, 0);
+}
+
+/* Regular builder with valid types and messages.
+ */
+static void
+test_pubsub_build_types_ok(void *arg)
+{
+ (void)arg;
+ pubsub_builder_t *b = NULL;
+ dispatch_t *dispatcher = NULL;
+ pubsub_connector_t *c = NULL;
+
+ b = pubsub_builder_new();
+ seed_pubsub_builder_basic(b);
+
+ dispatcher = pubsub_builder_finalize(b, NULL);
+ b = NULL;
+ tt_assert(dispatcher);
+
+ tt_int_op(dispatcher->n_types, OP_GE, 2);
+ tt_assert(dispatcher->typefns);
+
+ tt_assert(dispatcher->typefns[get_msg_type_id("int")].fmt_fn == ex_int_fmt);
+ tt_assert(dispatcher->typefns[get_msg_type_id("string")].fmt_fn ==
+ ex_str_fmt);
+
+ done:
+ pubsub_connector_free(c);
+ pubsub_builder_free(b);
+ dispatch_free(dispatcher);
+}
+
+/* We fail if the same type is defined in two places with different functions.
+ */
+static void
+test_pubsub_build_types_decls_conflict(void *arg)
+{
+ (void)arg;
+ pubsub_builder_t *b = NULL;
+ dispatch_t *dispatcher = NULL;
+ pubsub_connector_t *c = NULL;
+
+ b = pubsub_builder_new();
+ seed_pubsub_builder_basic(b);
+ {
+ c = pubsub_connector_for_subsystem(b, get_subsys_id("sys3"));
+ // Extra declaration of int: we don't allow this.
+ DISPATCH_DEFINE_TYPE(c, int, &stringfns);
+ pubsub_connector_free(c);
+ }
+
+ setup_full_capture_of_logs(LOG_WARN);
+ dispatcher = pubsub_builder_finalize(b, NULL);
+ b = NULL;
+ tt_assert(dispatcher == NULL);
+ // expect_log_msg_containing("(int) declared twice"); // XXXX
+
+ done:
+ pubsub_connector_free(c);
+ pubsub_builder_free(b);
+ dispatch_free(dispatcher);
+ teardown_capture_of_logs();
+}
+
+/* If a message ID exists but nobody is publishing or subscribing to it,
+ * that's okay. */
+static void
+test_pubsub_build_unused_message(void *arg)
+{
+ (void)arg;
+ pubsub_builder_t *b = NULL;
+ dispatch_t *dispatcher = NULL;
+
+ b = pubsub_builder_new();
+ seed_pubsub_builder_basic(b);
+
+ // This message isn't actually generated by anyone, but that will be fine:
+ // we just log it at info.
+ get_message_id("unused");
+ setup_capture_of_logs(LOG_INFO);
+
+ dispatcher = pubsub_builder_finalize(b, NULL);
+ b = NULL;
+ tt_assert(dispatcher);
+ expect_log_msg_containing(
+ "Nobody is publishing or subscribing to message");
+
+ done:
+ pubsub_builder_free(b);
+ dispatch_free(dispatcher);
+ teardown_capture_of_logs();
+}
+
+/* Publishing or subscribing to a message with no subscribers / publishers
+ * should fail and warn. */
+static void
+test_pubsub_build_missing_pubsub(void *arg)
+{
+ (void)arg;
+ pubsub_builder_t *b = NULL;
+ dispatch_t *dispatcher = NULL;
+
+ b = pubsub_builder_new();
+ seed_dispatch_builder(b, 0, 0, FLAG_SKIP, FLAG_SKIP);
+
+ setup_full_capture_of_logs(LOG_WARN);
+ dispatcher = pubsub_builder_finalize(b, NULL);
+ b = NULL;
+ tt_assert(dispatcher == NULL);
+
+ expect_log_msg_containing(
+ "Message 0 (bunch_of_coconuts) has publishers, but no subscribers.");
+ expect_log_msg_containing(
+ "Message 1 (yes_we_have_no) has subscribers, but no publishers.");
+
+ done:
+ pubsub_builder_free(b);
+ dispatch_free(dispatcher);
+ teardown_capture_of_logs();
+}
+
+/* Make sure that a stub publisher or subscriber prevents an error from
+ * happening even if there are no other publishers/subscribers for a message
+ */
+static void
+test_pubsub_build_stub_pubsub(void *arg)
+{
+ (void)arg;
+ pubsub_builder_t *b = NULL;
+ dispatch_t *dispatcher = NULL;
+
+ b = pubsub_builder_new();
+ seed_dispatch_builder(b, 0, 0, DISP_FLAG_STUB, DISP_FLAG_STUB);
+
+ dispatcher = pubsub_builder_finalize(b, NULL);
+ b = NULL;
+ tt_assert(dispatcher);
+
+ // 1 subscriber.
+ tt_int_op(1, OP_EQ,
+ dispatcher->table[get_message_id("yes_we_have_no")]->n_enabled);
+ // no subscribers
+ tt_ptr_op(NULL, OP_EQ,
+ dispatcher->table[get_message_id("bunch_of_coconuts")]);
+
+ done:
+ pubsub_builder_free(b);
+ dispatch_free(dispatcher);
+}
+
+/* Only one channel per msg id. */
+static void
+test_pubsub_build_channels_conflict(void *arg)
+{
+ (void)arg;
+ pubsub_builder_t *b = NULL;
+ dispatch_t *dispatcher = NULL;
+ pubsub_connector_t *c = NULL;
+
+ b = pubsub_builder_new();
+ seed_pubsub_builder_basic(b);
+ pub_binding_t btmp;
+
+ {
+ c = pubsub_connector_for_subsystem(b, get_subsys_id("problems"));
+ /* Usually the DISPATCH_ADD_PUB macro would keep us from using
+ * the wrong channel */
+ pubsub_add_pub_(c, &btmp, get_channel_id("hithere"),
+ get_message_id("bunch_of_coconuts"),
+ get_msg_type_id("int"),
+ 0 /* flags */,
+ "somewhere.c", 22);
+ pubsub_connector_free(c);
+ };
+
+ setup_full_capture_of_logs(LOG_WARN);
+ dispatcher = pubsub_builder_finalize(b, NULL);
+ b = NULL;
+ tt_assert(dispatcher == NULL);
+
+ expect_log_msg_containing("Message 0 (bunch_of_coconuts) is associated "
+ "with multiple inconsistent channels.");
+
+ done:
+ pubsub_builder_free(b);
+ dispatch_free(dispatcher);
+ teardown_capture_of_logs();
+}
+
+/* Only one type per msg id. */
+static void
+test_pubsub_build_types_conflict(void *arg)
+{
+ (void)arg;
+ pubsub_builder_t *b = NULL;
+ dispatch_t *dispatcher = NULL;
+ pubsub_connector_t *c = NULL;
+
+ b = pubsub_builder_new();
+ seed_pubsub_builder_basic(b);
+ pub_binding_t btmp;
+
+ {
+ c = pubsub_connector_for_subsystem(b, get_subsys_id("problems"));
+ /* Usually the DISPATCH_ADD_PUB macro would keep us from using
+ * the wrong channel */
+ pubsub_add_pub_(c, &btmp, get_channel_id("hithere"),
+ get_message_id("bunch_of_coconuts"),
+ get_msg_type_id("string"),
+ 0 /* flags */,
+ "somewhere.c", 22);
+ pubsub_connector_free(c);
+ };
+
+ setup_full_capture_of_logs(LOG_WARN);
+ dispatcher = pubsub_builder_finalize(b, NULL);
+ b = NULL;
+ tt_assert(dispatcher == NULL);
+
+ expect_log_msg_containing("Message 0 (bunch_of_coconuts) is associated "
+ "with multiple inconsistent message types.");
+
+ done:
+ pubsub_builder_free(b);
+ dispatch_free(dispatcher);
+ teardown_capture_of_logs();
+}
+
+/* The same module can't publish and subscribe the same message */
+static void
+test_pubsub_build_pubsub_same(void *arg)
+{
+ (void)arg;
+ pubsub_builder_t *b = NULL;
+ dispatch_t *dispatcher = NULL;
+ pubsub_connector_t *c = NULL;
+
+ b = pubsub_builder_new();
+ seed_pubsub_builder_basic(b);
+
+ {
+ c = pubsub_connector_for_subsystem(b, get_subsys_id("sys1"));
+ // already publishing this.
+ DISPATCH_ADD_SUB(c, main, bunch_of_coconuts);
+ pubsub_connector_free(c);
+ };
+
+ setup_full_capture_of_logs(LOG_WARN);
+ dispatcher = pubsub_builder_finalize(b, NULL);
+ b = NULL;
+ tt_assert(dispatcher == NULL);
+
+ expect_log_msg_containing("Message 0 (bunch_of_coconuts) is published "
+ "and subscribed by the same subsystem 0 (sys1)");
+
+ done:
+ pubsub_builder_free(b);
+ dispatch_free(dispatcher);
+ teardown_capture_of_logs();
+}
+
+/* More than one subsystem may publish or subscribe, and that's okay. */
+static void
+test_pubsub_build_pubsub_multi(void *arg)
+{
+ (void)arg;
+ pubsub_builder_t *b = NULL;
+ dispatch_t *dispatcher = NULL;
+ pubsub_connector_t *c = NULL;
+
+ b = pubsub_builder_new();
+ seed_pubsub_builder_basic(b);
+ pub_binding_t btmp;
+
+ {
+ c = pubsub_connector_for_subsystem(b, get_subsys_id("sys3"));
+ DISPATCH_ADD_SUB(c, main, bunch_of_coconuts);
+ pubsub_add_pub_(c, &btmp, get_channel_id("main"),
+ get_message_id("yes_we_have_no"),
+ get_msg_type_id("string"),
+ 0 /* flags */,
+ "somewhere.c", 22);
+ pubsub_connector_free(c);
+ };
+
+ dispatcher = pubsub_builder_finalize(b, NULL);
+ b = NULL;
+ tt_assert(dispatcher);
+
+ // 1 subscribers
+ tt_int_op(1, OP_EQ,
+ dispatcher->table[get_message_id("yes_we_have_no")]->n_enabled);
+ // 2 subscribers.
+ dtbl_entry_t *ent =
+ dispatcher->table[get_message_id("bunch_of_coconuts")];
+ tt_int_op(2, OP_EQ, ent->n_enabled);
+ tt_int_op(2, OP_EQ, ent->n_fns);
+ tt_ptr_op(ent->rcv[0].fn, OP_EQ, recv_fn__bunch_of_coconuts);
+ tt_ptr_op(ent->rcv[1].fn, OP_EQ, recv_fn__bunch_of_coconuts);
+
+ done:
+ pubsub_builder_free(b);
+ dispatch_free(dispatcher);
+}
+
+static void
+some_other_coconut_hook(const msg_t *m)
+{
+ (void)m;
+}
+
+/* Subscribe hooks should be build correctly when there are a bunch of
+ * them. */
+static void
+test_pubsub_build_sub_many(void *arg)
+{
+ (void)arg;
+ pubsub_builder_t *b = NULL;
+ dispatch_t *dispatcher = NULL;
+ pubsub_connector_t *c = NULL;
+ char *sysname = NULL;
+ b = pubsub_builder_new();
+ seed_pubsub_builder_basic(b);
+
+ int i;
+ for (i = 1; i < 100; ++i) {
+ tor_asprintf(&sysname, "system%d",i);
+ c = pubsub_connector_for_subsystem(b, get_subsys_id(sysname));
+ if (i % 7) {
+ DISPATCH_ADD_SUB(c, main, bunch_of_coconuts);
+ } else {
+ pubsub_add_sub_(c, some_other_coconut_hook,
+ get_channel_id("main"),
+ get_message_id("bunch_of_coconuts"),
+ get_msg_type_id("int"),
+ 0 /* flags */,
+ "somewhere.c", 22);
+ }
+ pubsub_connector_free(c);
+ tor_free(sysname);
+ };
+
+ dispatcher = pubsub_builder_finalize(b, NULL);
+ b = NULL;
+ tt_assert(dispatcher);
+
+ dtbl_entry_t *ent =
+ dispatcher->table[get_message_id("bunch_of_coconuts")];
+ tt_int_op(100, OP_EQ, ent->n_enabled);
+ tt_int_op(100, OP_EQ, ent->n_fns);
+ tt_ptr_op(ent->rcv[0].fn, OP_EQ, recv_fn__bunch_of_coconuts);
+ tt_ptr_op(ent->rcv[1].fn, OP_EQ, recv_fn__bunch_of_coconuts);
+ tt_ptr_op(ent->rcv[76].fn, OP_EQ, recv_fn__bunch_of_coconuts);
+ tt_ptr_op(ent->rcv[77].fn, OP_EQ, some_other_coconut_hook);
+ tt_ptr_op(ent->rcv[78].fn, OP_EQ, recv_fn__bunch_of_coconuts);
+
+ done:
+ pubsub_builder_free(b);
+ dispatch_free(dispatcher);
+ tor_free(sysname);
+}
+
+/* The same subsystem can only declare one publish or subscribe. */
+static void
+test_pubsub_build_pubsub_redundant(void *arg)
+{
+ (void)arg;
+ pubsub_builder_t *b = NULL;
+ dispatch_t *dispatcher = NULL;
+ pubsub_connector_t *c = NULL;
+
+ b = pubsub_builder_new();
+ seed_pubsub_builder_basic(b);
+ pub_binding_t btmp;
+
+ {
+ c = pubsub_connector_for_subsystem(b, get_subsys_id("sys2"));
+ DISPATCH_ADD_SUB(c, main, bunch_of_coconuts);
+ pubsub_add_pub_(c, &btmp, get_channel_id("main"),
+ get_message_id("yes_we_have_no"),
+ get_msg_type_id("string"),
+ 0 /* flags */,
+ "somewhere.c", 22);
+ pubsub_connector_free(c);
+ };
+
+ setup_full_capture_of_logs(LOG_WARN);
+ dispatcher = pubsub_builder_finalize(b, NULL);
+ b = NULL;
+ tt_assert(dispatcher == NULL);
+
+ expect_log_msg_containing(
+ "is configured to be published by subsystem 1 (sys2) more than once");
+ expect_log_msg_containing(
+ "is configured to be subscribed by subsystem 1 (sys2) more than once");
+
+ done:
+ pubsub_builder_free(b);
+ dispatch_free(dispatcher);
+ teardown_capture_of_logs();
+}
+
+/* It's fine to declare the excl flag. */
+static void
+test_pubsub_build_excl_ok(void *arg)
+{
+ (void)arg;
+ pubsub_builder_t *b = NULL;
+ dispatch_t *dispatcher = NULL;
+
+ b = pubsub_builder_new();
+ // Try one excl/excl pair and one excl/non pair.
+ seed_dispatch_builder(b, DISP_FLAG_EXCL, 0,
+ DISP_FLAG_EXCL, DISP_FLAG_EXCL);
+
+ dispatcher = pubsub_builder_finalize(b, NULL);
+ b = NULL;
+ tt_assert(dispatcher);
+
+ // 1 subscribers
+ tt_int_op(1, OP_EQ,
+ dispatcher->table[get_message_id("yes_we_have_no")]->n_enabled);
+ // 1 subscriber.
+ tt_int_op(1, OP_EQ,
+ dispatcher->table[get_message_id("bunch_of_coconuts")]->n_enabled);
+
+ done:
+ pubsub_builder_free(b);
+ dispatch_free(dispatcher);
+}
+
+/* but if you declare the excl flag, you need to mean it. */
+static void
+test_pubsub_build_excl_bad(void *arg)
+{
+ (void)arg;
+ pubsub_builder_t *b = NULL;
+ dispatch_t *dispatcher = NULL;
+ pubsub_connector_t *c = NULL;
+
+ b = pubsub_builder_new();
+ seed_dispatch_builder(b, DISP_FLAG_EXCL, DISP_FLAG_EXCL,
+ 0, 0);
+
+ {
+ c = pubsub_connector_for_subsystem(b, get_subsys_id("sys3"));
+ DISPATCH_ADD_PUB_(c, main, bunch_of_coconuts, 0);
+ DISPATCH_ADD_SUB_(c, main, yes_we_have_no, 0);
+ pubsub_connector_free(c);
+ };
+
+ setup_full_capture_of_logs(LOG_WARN);
+ dispatcher = pubsub_builder_finalize(b, NULL);
+ b = NULL;
+ tt_assert(dispatcher == NULL);
+
+ expect_log_msg_containing("has multiple publishers, but at least one is "
+ "marked as exclusive.");
+ expect_log_msg_containing("has multiple subscribers, but at least one is "
+ "marked as exclusive.");
+
+ done:
+ pubsub_builder_free(b);
+ dispatch_free(dispatcher);
+ teardown_capture_of_logs();
+}
+
+#define T(name, flags) \
+ { #name, test_pubsub_build_ ## name , (flags), NULL, NULL }
+
+struct testcase_t pubsub_build_tests[] = {
+ T(types_ok, TT_FORK),
+ T(types_decls_conflict, TT_FORK),
+ T(unused_message, TT_FORK),
+ T(missing_pubsub, TT_FORK),
+ T(stub_pubsub, TT_FORK),
+ T(channels_conflict, TT_FORK),
+ T(types_conflict, TT_FORK),
+ T(pubsub_same, TT_FORK),
+ T(pubsub_multi, TT_FORK),
+ T(sub_many, TT_FORK),
+ T(pubsub_redundant, TT_FORK),
+ T(excl_ok, TT_FORK),
+ T(excl_bad, TT_FORK),
+ END_OF_TESTCASES
+};
1
0

[tor/master] Low-level dispatch module for publish-subscribe mechanism
by asn@torproject.org 27 Mar '19
by asn@torproject.org 27 Mar '19
27 Mar '19
commit e4d3098d4d23686320013b80b6305fbd52863f76
Author: Nick Mathewson <nickm(a)torproject.org>
Date: Fri Jan 11 20:17:04 2019 -0500
Low-level dispatch module for publish-subscribe mechanism
This module implements a way to send messages from one module to
another, with associated data types. It does not yet do anything to
ensure that messages are correct, that types match, or that other
forms of consistency are preserved.
---
.gitignore | 2 +
Makefile.am | 2 +
src/include.am | 1 +
src/lib/dispatch/.may_include | 9 ++
src/lib/dispatch/dispatch.h | 114 ++++++++++++++++++
src/lib/dispatch/dispatch_cfg.c | 138 ++++++++++++++++++++++
src/lib/dispatch/dispatch_cfg.h | 39 +++++++
src/lib/dispatch/dispatch_cfg_st.h | 25 ++++
src/lib/dispatch/dispatch_core.c | 234 +++++++++++++++++++++++++++++++++++++
src/lib/dispatch/dispatch_new.c | 170 +++++++++++++++++++++++++++
src/lib/dispatch/dispatch_st.h | 108 +++++++++++++++++
src/lib/dispatch/include.am | 23 ++++
src/lib/dispatch/msgtypes.h | 80 +++++++++++++
src/test/include.am | 1 +
src/test/test.c | 1 +
src/test/test.h | 1 +
src/test/test_dispatch.c | 181 ++++++++++++++++++++++++++++
17 files changed, 1129 insertions(+)
diff --git a/.gitignore b/.gitignore
index 6a49285b8..f4f6dacbb 100644
--- a/.gitignore
+++ b/.gitignore
@@ -168,6 +168,8 @@ uptime-*.json
/src/lib/libtor-crypt-ops-testing.a
/src/lib/libtor-ctime.a
/src/lib/libtor-ctime-testing.a
+/src/lib/libtor-dispatch.a
+/src/lib/libtor-dispatch-testing.a
/src/lib/libtor-encoding.a
/src/lib/libtor-encoding-testing.a
/src/lib/libtor-evloop.a
diff --git a/Makefile.am b/Makefile.am
index a5086b303..36d9725f3 100644
--- a/Makefile.am
+++ b/Makefile.am
@@ -41,6 +41,7 @@ TOR_UTIL_LIBS = \
src/lib/libtor-geoip.a \
src/lib/libtor-process.a \
src/lib/libtor-buf.a \
+ src/lib/libtor-dispatch.a \
src/lib/libtor-time.a \
src/lib/libtor-fs.a \
src/lib/libtor-encoding.a \
@@ -72,6 +73,7 @@ TOR_UTIL_TESTING_LIBS = \
src/lib/libtor-geoip-testing.a \
src/lib/libtor-process-testing.a \
src/lib/libtor-buf-testing.a \
+ src/lib/libtor-dispatch-testing.a \
src/lib/libtor-time-testing.a \
src/lib/libtor-fs-testing.a \
src/lib/libtor-encoding-testing.a \
diff --git a/src/include.am b/src/include.am
index 9070a69a0..c6c351c80 100644
--- a/src/include.am
+++ b/src/include.am
@@ -8,6 +8,7 @@ include src/lib/compress/include.am
include src/lib/container/include.am
include src/lib/crypt_ops/include.am
include src/lib/defs/include.am
+include src/lib/dispatch/include.am
include src/lib/encoding/include.am
include src/lib/evloop/include.am
include src/lib/fdio/include.am
diff --git a/src/lib/dispatch/.may_include b/src/lib/dispatch/.may_include
new file mode 100644
index 000000000..9b5373907
--- /dev/null
+++ b/src/lib/dispatch/.may_include
@@ -0,0 +1,9 @@
+orconfig.h
+
+ext/tor_queue.h
+
+lib/container/*.h
+lib/dispatch/*.h
+lib/intmath/*.h
+lib/log/*.h
+lib/malloc/*.h
diff --git a/src/lib/dispatch/dispatch.h b/src/lib/dispatch/dispatch.h
new file mode 100644
index 000000000..8e62e8f16
--- /dev/null
+++ b/src/lib/dispatch/dispatch.h
@@ -0,0 +1,114 @@
+/* Copyright (c) 2001, Matej Pfajfar.
+ * Copyright (c) 2001-2004, Roger Dingledine.
+ * Copyright (c) 2004-2006, Roger Dingledine, Nick Mathewson.
+ * Copyright (c) 2007-2018, The Tor Project, Inc. */
+/* See LICENSE for licensing information */
+
+#ifndef TOR_DISPATCH_H
+#define TOR_DISPATCH_H
+
+#include "lib/dispatch/msgtypes.h"
+
+/**
+ * \file dispatch.h
+ * \brief Low-level APIs for message-passing system.
+ *
+ * This module implements message dispatch based on a set of short integer
+ * identifiers. For a higher-level interface, see pubsub.h.
+ *
+ * Each message is represented as a generic msg_t object, and is discriminated
+ * by its message_id_t. Messages are delivered by a dispatch_t object, which
+ * delivers each message to its recipients by a configured "channel".
+ *
+ * A "channel" is a means of delivering messages. Every message_id_t must
+ * be associated with exactly one channel, identified by channel_id_t.
+ * When a channel receives messages, a callback is invoked to either process
+ * the messages immediately, or to cause them to be processed later.
+ *
+ * Every message_id_t has zero or more associated receiver functions set up in
+ * the dispatch_t object. Once the dispatch_t object is created, receivers
+ * can be enabled or disabled [TODO], but not added or removed.
+ *
+ * Every message_id_t has an associated datatype, identified by a
+ * msg_type_id_t. These datatypes can be associated with functions to
+ * (for example) free them, or format them for debugging.
+ *
+ * To setup a dispatch_t object, first create a dispatch_cfg_t object, and
+ * configure messages with their types, channels, and receivers. Then, use
+ * dispatch_new() with that dispatch_cfg_t to create the dispatch_t object.
+ *
+ * (We use a two-phase contruction procedure here to enable better static
+ * reasoning about publish/subscribe relationships.)
+ *
+ * Once you have a dispatch_t, you can queue messages on it with
+ * dispatch_send*(), and cause those messages to be delivered with
+ * dispatch_flush().
+ **/
+
+/**
+ * A "dispatcher" is the highest-level object; it handles making sure that
+ * messages are received and delivered properly. Only the mainloop
+ * should handle this type directly.
+ */
+typedef struct dispatch_t dispatch_t;
+
+struct dispatch_cfg_t;
+
+dispatch_t *dispatch_new(const struct dispatch_cfg_t *cfg);
+
+/**
+ * Free a dispatcher. Tor does this at exit.
+ */
+#define dispatch_free(d) \
+ FREE_AND_NULL(dispatch_t, dispatch_free_, (d))
+
+void dispatch_free_(dispatch_t *);
+
+int dispatch_send(dispatch_t *d,
+ subsys_id_t sender,
+ channel_id_t channel,
+ message_id_t msg,
+ msg_type_id_t type,
+ msg_aux_data_t auxdata);
+
+int dispatch_send_msg(dispatch_t *d, msg_t *m);
+
+int dispatch_send_msg_unchecked(dispatch_t *d, msg_t *m);
+
+/* Flush up to <b>max_msgs</b> currently pending messages from the
+ * dispatcher. Messages that are not pending when this function are
+ * called, are not flushed by this call. Return 0 on success, -1 on
+ * unrecoverable error.
+ */
+int dispatch_flush(dispatch_t *, channel_id_t chan, int max_msgs);
+
+/**
+ * Function callback type used to alert some other module when a channel's
+ * queue changes from empty to nonempty.
+ *
+ * Ex 1: To cause messages to be processed immediately on-stack, this callback
+ * should invoke dispatch_flush() directly.
+ *
+ * Ex 2: To cause messages to be processed very soon, from the event queue,
+ * this callback should schedule an event callback to run dispatch_flush().
+ *
+ * Ex 3: To cause messages to be processed periodically, this function should
+ * do nothing, and a periodic event should invoke dispatch_flush().
+ **/
+typedef void (*dispatch_alertfn_t)(struct dispatch_t *,
+ channel_id_t, void *);
+
+int dispatch_set_alert_fn(dispatch_t *d, channel_id_t chan,
+ dispatch_alertfn_t fn, void *userdata);
+
+#define dispatch_free_msg(d,msg) \
+ STMT_BEGIN { \
+ msg_t **msg_tmp_ptr__ = &(msg); \
+ dispatch_free_msg_((d), *msg_tmp_ptr__); \
+ *msg_tmp_ptr__= NULL; \
+ } STMT_END
+void dispatch_free_msg_(const dispatch_t *d, msg_t *msg);
+
+char *dispatch_fmt_msg_data(const dispatch_t *d, const msg_t *msg);
+
+#endif
diff --git a/src/lib/dispatch/dispatch_cfg.c b/src/lib/dispatch/dispatch_cfg.c
new file mode 100644
index 000000000..26e37f469
--- /dev/null
+++ b/src/lib/dispatch/dispatch_cfg.c
@@ -0,0 +1,138 @@
+/* Copyright (c) 2001, Matej Pfajfar.
+ * Copyright (c) 2001-2004, Roger Dingledine.
+ * Copyright (c) 2004-2006, Roger Dingledine, Nick Mathewson.
+ * Copyright (c) 2007-2018, The Tor Project, Inc. */
+/* See LICENSE for licensing information */
+
+/**
+ * \file dispatch_cfg.c
+ * \brief Create and configure a dispatch_cfg_t.
+ *
+ * A dispatch_cfg_t object is used to configure a set of messages and
+ * associated information before creating a dispatch_t.
+ */
+
+#define DISPATCH_PRIVATE
+
+#include "orconfig.h"
+#include "lib/dispatch/dispatch_cfg.h"
+#include "lib/dispatch/dispatch_cfg_st.h"
+#include "lib/dispatch/dispatch.h"
+#include "lib/dispatch/dispatch_st.h"
+
+#include "lib/container/smartlist.h"
+#include "lib/malloc/malloc.h"
+
+/**
+ * Create and return a new dispatch_cfg_t.
+ **/
+dispatch_cfg_t *
+dcfg_new(void)
+{
+ dispatch_cfg_t *cfg = tor_malloc(sizeof(dispatch_cfg_t));
+ cfg->type_by_msg = smartlist_new();
+ cfg->chan_by_msg = smartlist_new();
+ cfg->fns_by_type = smartlist_new();
+ cfg->recv_by_msg = smartlist_new();
+ return cfg;
+}
+
+/** DOCDOC */
+#define ID_TO_VOID(id) ((void*)(uintptr_t)(id))
+
+/**
+ * Associate a message with a datatype. Return 0 on success, -1 if a
+ * different type was previously associated with the message ID.
+ **/
+int
+dcfg_msg_set_type(dispatch_cfg_t *cfg, message_id_t msg,
+ msg_type_id_t type)
+{
+ smartlist_grow(cfg->type_by_msg, msg+1);
+ void *oldval = smartlist_get(cfg->type_by_msg, msg);
+ if (oldval != NULL && oldval != ID_TO_VOID(type)) {
+ return -1;
+ }
+ smartlist_set(cfg->type_by_msg, msg, ID_TO_VOID(type));
+ return 0;
+}
+
+/**
+ * Associate a message with a channel. Return 0 on success, -1 if a
+ * different channel was previously associated with the message ID.
+ **/
+int
+dcfg_msg_set_chan(dispatch_cfg_t *cfg, message_id_t msg,
+ channel_id_t chan)
+{
+ smartlist_grow(cfg->chan_by_msg, msg+1);
+ void *oldval = smartlist_get(cfg->chan_by_msg, msg);
+ if (oldval != NULL && oldval != ID_TO_VOID(chan)) {
+ return -1;
+ }
+ smartlist_set(cfg->chan_by_msg, msg, ID_TO_VOID(chan));
+ return 0;
+}
+
+/**
+ * Associate a set of functions with a datatype. Return 0 on success, -1 if
+ * different functions were previously associated with the type.
+ **/
+int
+dcfg_type_set_fns(dispatch_cfg_t *cfg, msg_type_id_t type,
+ const dispatch_typefns_t *fns)
+{
+ smartlist_grow(cfg->fns_by_type, type+1);
+ dispatch_typefns_t *oldfns = smartlist_get(cfg->fns_by_type, type);
+ if (oldfns && (oldfns->free_fn != fns->free_fn ||
+ oldfns->fmt_fn != fns->fmt_fn))
+ return -1;
+ smartlist_set(cfg->fns_by_type, type, (dispatch_typefns_t *) fns);
+ return 0;
+}
+
+/**
+ * Associate a receiver with a message ID. Multiple receivers may be
+ * associated with a single messasge ID.
+ *
+ * Return 0 on success, on failure.
+ **/
+int
+dcfg_add_recv(dispatch_cfg_t *cfg, message_id_t msg,
+ subsys_id_t sys, recv_fn_t fn)
+{
+ smartlist_grow(cfg->recv_by_msg, msg+1);
+ smartlist_t *receivers = smartlist_get(cfg->recv_by_msg, msg);
+ if (!receivers) {
+ receivers = smartlist_new();
+ smartlist_set(cfg->recv_by_msg, msg, receivers);
+ }
+
+ dispatch_rcv_t *rcv = tor_malloc(sizeof(dispatch_rcv_t));
+ rcv->sys = sys;
+ rcv->enabled = true;
+ rcv->fn = fn;
+ smartlist_add(receivers, (void*)rcv);
+ return 0;
+}
+
+/** Helper: release all storage held by <b>cfg</b>. */
+void
+dcfg_free_(dispatch_cfg_t *cfg)
+{
+ if (!cfg)
+ return;
+
+ smartlist_free(cfg->type_by_msg);
+ smartlist_free(cfg->chan_by_msg);
+ smartlist_free(cfg->fns_by_type);
+ SMARTLIST_FOREACH_BEGIN(cfg->recv_by_msg, smartlist_t *, receivers) {
+ if (!receivers)
+ continue;
+ SMARTLIST_FOREACH(receivers, dispatch_rcv_t *, rcv, tor_free(rcv));
+ smartlist_free(receivers);
+ } SMARTLIST_FOREACH_END(receivers);
+ smartlist_free(cfg->recv_by_msg);
+
+ tor_free(cfg);
+}
diff --git a/src/lib/dispatch/dispatch_cfg.h b/src/lib/dispatch/dispatch_cfg.h
new file mode 100644
index 000000000..2c755e39b
--- /dev/null
+++ b/src/lib/dispatch/dispatch_cfg.h
@@ -0,0 +1,39 @@
+/* Copyright (c) 2001, Matej Pfajfar.
+ * Copyright (c) 2001-2004, Roger Dingledine.
+ * Copyright (c) 2004-2006, Roger Dingledine, Nick Mathewson.
+ * Copyright (c) 2007-2018, The Tor Project, Inc. */
+/* See LICENSE for licensing information */
+
+#ifndef TOR_DISPATCH_CFG_H
+#define TOR_DISPATCH_CFG_H
+
+#include "lib/dispatch/msgtypes.h"
+
+/**
+ * A "dispatch_cfg" is the configuration used to set up a dispatcher.
+ * It is created and accessed with a set of dcfg_* functions, and then
+ * used with dispatcher_new() to make the dispatcher.
+ */
+typedef struct dispatch_cfg_t dispatch_cfg_t;
+
+dispatch_cfg_t *dcfg_new(void);
+
+int dcfg_msg_set_type(dispatch_cfg_t *cfg, message_id_t msg,
+ msg_type_id_t type);
+
+int dcfg_msg_set_chan(dispatch_cfg_t *cfg, message_id_t msg,
+ channel_id_t chan);
+
+int dcfg_type_set_fns(dispatch_cfg_t *cfg, msg_type_id_t type,
+ const dispatch_typefns_t *fns);
+
+int dcfg_add_recv(dispatch_cfg_t *cfg, message_id_t msg,
+ subsys_id_t sys, recv_fn_t fn);
+
+/** Free a dispatch_cfg_t. */
+#define dcfg_free(cfg) \
+ FREE_AND_NULL(dispatch_cfg_t, dcfg_free_, (cfg))
+
+void dcfg_free_(dispatch_cfg_t *cfg);
+
+#endif
diff --git a/src/lib/dispatch/dispatch_cfg_st.h b/src/lib/dispatch/dispatch_cfg_st.h
new file mode 100644
index 000000000..d004fe593
--- /dev/null
+++ b/src/lib/dispatch/dispatch_cfg_st.h
@@ -0,0 +1,25 @@
+/* Copyright (c) 2001, Matej Pfajfar.
+ * Copyright (c) 2001-2004, Roger Dingledine.
+ * Copyright (c) 2004-2006, Roger Dingledine, Nick Mathewson.
+ * Copyright (c) 2007-2018, The Tor Project, Inc. */
+/* See LICENSE for licensing information */
+
+#ifndef TOR_DISPATCH_CFG_ST_H
+#define TOR_DISPATCH_CFG_ST_H
+
+struct smartlist_t;
+
+/* Information needed to create a dispatcher, but in a less efficient, more
+ * mutable format. */
+struct dispatch_cfg_t {
+ /** A list of msg_type_id_t (cast to void*), indexed by msg_t. */
+ struct smartlist_t *type_by_msg;
+ /** A list of channel_id_t (cast to void*), indexed by msg_t. */
+ struct smartlist_t *chan_by_msg;
+ /** A list of dispatch_rcv_t, indexed by msg_type_id_t. */
+ struct smartlist_t *fns_by_type;
+ /** A list of dispatch_typefns_t, indexed by msg_t. */
+ struct smartlist_t *recv_by_msg;
+};
+
+#endif
diff --git a/src/lib/dispatch/dispatch_core.c b/src/lib/dispatch/dispatch_core.c
new file mode 100644
index 000000000..24dfc649a
--- /dev/null
+++ b/src/lib/dispatch/dispatch_core.c
@@ -0,0 +1,234 @@
+/* Copyright (c) 2001, Matej Pfajfar.
+ * Copyright (c) 2001-2004, Roger Dingledine.
+ * Copyright (c) 2004-2006, Roger Dingledine, Nick Mathewson.
+ * Copyright (c) 2007-2018, The Tor Project, Inc. */
+/* See LICENSE for licensing information */
+
+/**
+ * \file dispatch_core.c
+ * \brief Core module for sending and receiving messages.
+ */
+
+#define DISPATCH_PRIVATE
+#include "orconfig.h"
+
+#include "lib/dispatch/dispatch.h"
+#include "lib/dispatch/dispatch_st.h"
+
+#include "lib/malloc/malloc.h"
+#include "lib/log/util_bug.h"
+
+#include <string.h>
+
+/**
+ * Use <b>d</b> to drop all storage held for <b>msg</b>.
+ *
+ * (We need the dispatcher so we know how to free the auxiliary data.)
+ **/
+void
+dispatch_free_msg_(const dispatch_t *d, msg_t *msg)
+{
+ if (!msg)
+ return;
+
+ d->typefns[msg->type].free_fn(msg->aux_data__);
+ tor_free(msg);
+}
+
+/**
+ * Format the auxiliary data held by msg.
+ **/
+char *
+dispatch_fmt_msg_data(const dispatch_t *d, const msg_t *msg)
+{
+ if (!msg)
+ return NULL;
+
+ return d->typefns[msg->type].fmt_fn(msg->aux_data__);
+}
+
+/**
+ * Release all storage held by <b>d</b>.
+ **/
+void
+dispatch_free_(dispatch_t *d)
+{
+ if (d == NULL)
+ return;
+
+ size_t n_queues = d->n_queues;
+ for (size_t i = 0; i < n_queues; ++i) {
+ msg_t *m, *mtmp;
+ TOR_SIMPLEQ_FOREACH_SAFE(m, &d->queues[i].queue, next, mtmp) {
+ dispatch_free_msg(d, m);
+ }
+ }
+
+ size_t n_msgs = d->n_msgs;
+
+ for (size_t i = 0; i < n_msgs; ++i) {
+ tor_free(d->table[i]);
+ }
+ tor_free(d->table);
+ tor_free(d->typefns);
+ tor_free(d->queues);
+
+ // This is the only time we will treat d->cfg as non-const.
+ //dispatch_cfg_free_((dispatch_items_t *) d->cfg);
+
+ tor_free(d);
+}
+
+/**
+ * Tell the dispatcher to call <b>fn</b> with <b>userdata</b> whenever
+ * <b>chan</b> becomes nonempty. Return 0 on success, -1 on error.
+ **/
+int
+dispatch_set_alert_fn(dispatch_t *d, channel_id_t chan,
+ dispatch_alertfn_t fn, void *userdata)
+{
+ if (BUG(chan >= d->n_queues))
+ return -1;
+
+ dqueue_t *q = &d->queues[chan];
+ q->alert_fn = fn;
+ q->alert_fn_arg = userdata;
+ return 0;
+}
+
+/**
+ * Send a message on the appropriate channel notifying that channel if
+ * necessary.
+ *
+ * This function takes ownership of the auxiliary data; it can't be static or
+ * stack-allocated, and the caller is not allowed to use it afterwards.
+ *
+ * This function does not check the various vields of the message object for
+ * consistency.
+ **/
+int
+dispatch_send(dispatch_t *d,
+ subsys_id_t sender,
+ channel_id_t channel,
+ message_id_t msg,
+ msg_type_id_t type,
+ msg_aux_data_t auxdata)
+{
+ if (!d->table[msg]) {
+ /* Fast path: nobody wants this data. */
+
+ d->typefns[type].free_fn(auxdata);
+ return 0;
+ }
+
+ msg_t *m = tor_malloc(sizeof(msg_t));
+
+ m->sender = sender;
+ m->channel = channel;
+ m->msg = msg;
+ m->type = type;
+ memcpy(&m->aux_data__, &auxdata, sizeof(msg_aux_data_t));
+
+ return dispatch_send_msg(d, m);
+}
+
+int
+dispatch_send_msg(dispatch_t *d, msg_t *m)
+{
+ if (BUG(!d))
+ goto err;
+ if (BUG(!m))
+ goto err;
+ if (BUG(m->channel >= d->n_queues))
+ goto err;
+ if (BUG(m->msg >= d->n_msgs))
+ goto err;
+
+ dtbl_entry_t *ent = d->table[m->msg];
+ if (ent) {
+ if (BUG(m->type != ent->type))
+ goto err;
+ if (BUG(m->channel != ent->channel))
+ goto err;
+ }
+
+ return dispatch_send_msg_unchecked(d, m);
+ err:
+ /* Probably it isn't safe to free m, since type could be wrong. */
+ return -1;
+}
+
+/**
+ * Send a message on the appropriate queue, notifying that queue if necessary.
+ *
+ * This function takes ownership of the message object and its auxiliary data;
+ * it can't be static or stack-allocated, and the caller isn't allowed to use
+ * it afterwards.
+ *
+ * This function does not check the various fields of the message object for
+ * consistency, and can crash if they are out of range. Only functions that
+ * have already constructed the message in a safe way, or checked it for
+ * correctness themselves, should call this function.
+ **/
+int
+dispatch_send_msg_unchecked(dispatch_t *d, msg_t *m)
+{
+ /* Find the right queue. */
+ dqueue_t *q = &d->queues[m->channel];
+ bool was_empty = TOR_SIMPLEQ_EMPTY(&q->queue);
+
+ /* Append the message. */
+ TOR_SIMPLEQ_INSERT_TAIL(&q->queue, m, next);
+
+ /* If we just made the queue nonempty for the first time, call the alert
+ * function. */
+ if (was_empty) {
+ q->alert_fn(d, m->channel, q->alert_fn_arg);
+ }
+
+ return 0;
+}
+
+/**
+ * Run all of the callbacks on <b>d</b> associated with <b>m</b>.
+ **/
+static void
+dispatcher_run_msg_cbs(const dispatch_t *d, msg_t *m)
+{
+ tor_assert(m->msg <= d->n_msgs);
+ dtbl_entry_t *ent = d->table[m->msg];
+ int n_fns = ent->n_fns;
+
+ int i;
+ for (i=0; i < n_fns; ++i) {
+ if (ent->rcv[i].enabled)
+ ent->rcv[i].fn(m);
+ }
+}
+
+/**
+ * Run up to <b>max_msgs</b> callbacks for messages on the channel <b>ch</b>
+ * on the given dispatcher. Return 0 on success or recoverable failure,
+ * -1 on unrecoverable error.
+ **/
+int
+dispatch_flush(dispatch_t *d, channel_id_t ch, int max_msgs)
+{
+ if (BUG(ch >= d->n_queues))
+ return 0;
+
+ int n_flushed = 0;
+ dqueue_t *q = &d->queues[ch];
+
+ while (n_flushed < max_msgs) {
+ msg_t *m = TOR_SIMPLEQ_FIRST(&q->queue);
+ if (!m)
+ break;
+ TOR_SIMPLEQ_REMOVE_HEAD(&q->queue, next);
+ dispatcher_run_msg_cbs(d, m);
+ dispatch_free_msg(d, m);
+ ++n_flushed;
+ }
+
+ return 0;
+}
diff --git a/src/lib/dispatch/dispatch_new.c b/src/lib/dispatch/dispatch_new.c
new file mode 100644
index 000000000..a2879016a
--- /dev/null
+++ b/src/lib/dispatch/dispatch_new.c
@@ -0,0 +1,170 @@
+/* Copyright (c) 2001, Matej Pfajfar.
+ * Copyright (c) 2001-2004, Roger Dingledine.
+ * Copyright (c) 2004-2006, Roger Dingledine, Nick Mathewson.
+ * Copyright (c) 2007-2018, The Tor Project, Inc. */
+/* See LICENSE for licensing information */
+
+/**
+ * \file dispatch_new.c
+ * \brief Code to construct a dispatch_t from a dispatch_cfg_t.
+ **/
+
+#define DISPATCH_PRIVATE
+#include "orconfig.h"
+
+#include "lib/dispatch/dispatch.h"
+#include "lib/dispatch/dispatch_st.h"
+#include "lib/dispatch/dispatch_cfg.h"
+#include "lib/dispatch/dispatch_cfg_st.h"
+
+#include "lib/intmath/cmp.h"
+#include "lib/malloc/malloc.h"
+#include "lib/log/util_bug.h"
+
+#include <string.h>
+
+/** Convert a void* in a smartlist to the corresponding integer. */
+#define VOID_TO_ID(p) ((intptr_t)(p))
+
+/** Given a smartlist full of void* fields encoding intptr_t values,
+ * return the largest intptr_t, or dflt if the list is empty. */
+static intptr_t
+max_in_sl(const smartlist_t *sl, intptr_t dflt)
+{
+ if (!smartlist_len(sl))
+ return dflt;
+ void *as_ptr = smartlist_get(sl, 0);
+ intptr_t max = VOID_TO_ID(as_ptr);
+ SMARTLIST_FOREACH_BEGIN(sl, void *, p) {
+ intptr_t i = VOID_TO_ID(p);
+ if (i > max)
+ max = i;
+ } SMARTLIST_FOREACH_END(p);
+ return max;
+}
+
+/** Helper: Format an unformattable message auxiliary data item: just return a
+* copy of the string <>. */
+static char *
+type_fmt_nop(msg_aux_data_t arg)
+{
+ (void)arg;
+ return tor_strdup("<>");
+}
+
+/** Helper: Free an unfreeable message auxiliary data item: do nothing. */
+static void
+type_free_nop(msg_aux_data_t arg)
+{
+ (void)arg;
+}
+
+/** Type functions to use when no type functions are provided. */
+static dispatch_typefns_t nop_typefns = {
+ .free_fn = type_free_nop,
+ .fmt_fn = type_fmt_nop
+};
+
+/**
+ * Alert function to use when none is configured: do nothing.
+ **/
+static void
+alert_fn_nop(dispatch_t *d, channel_id_t ch, void *arg)
+{
+ (void)d;
+ (void)ch;
+ (void)arg;
+}
+
+/**
+ * Given a list of recvfn_t, create and return a new dtbl_entry_t mapping
+ * to each of those functions.
+ **/
+static dtbl_entry_t *
+dtbl_entry_from_lst(smartlist_t *receivers)
+{
+ if (!receivers)
+ return NULL;
+
+ size_t n_recv = smartlist_len(receivers);
+ dtbl_entry_t *ent;
+ ent = tor_malloc_zero(offsetof(dtbl_entry_t, rcv) +
+ sizeof(dispatch_rcv_t) * n_recv);
+
+ ent->n_fns = n_recv;
+
+ SMARTLIST_FOREACH_BEGIN(receivers, const dispatch_rcv_t *, rcv) {
+ memcpy(&ent->rcv[rcv_sl_idx], rcv, sizeof(*rcv));
+ if (rcv->enabled) {
+ ++ent->n_enabled;
+ }
+ } SMARTLIST_FOREACH_END(rcv);
+
+ return ent;
+}
+
+/** Create and return a new dispatcher from a given dispatch_cfg_t. */
+dispatch_t *
+dispatch_new(const dispatch_cfg_t *cfg)
+{
+ dispatch_t *d = tor_malloc_zero(sizeof(dispatch_t));
+
+ /* Any message that has a type or a receiver counts towards our messages */
+ const size_t n_msgs = MAX(smartlist_len(cfg->type_by_msg),
+ smartlist_len(cfg->recv_by_msg)) + 1;
+
+ /* Any channel that any message has counts towards the number of channels. */
+ const size_t n_chans = (size_t) MAX(1, max_in_sl(cfg->chan_by_msg,0)) + 1;
+
+ /* Any type that a message has, or that has functions, counts towards
+ * the number of types. */
+ const size_t n_types = (size_t) MAX(max_in_sl(cfg->type_by_msg,0),
+ smartlist_len(cfg->fns_by_type)) + 1;
+
+ d->n_msgs = n_msgs;
+ d->n_queues = n_chans;
+ d->n_types = n_types;
+
+ /* Initialize the array of type-functions. */
+ d->typefns = tor_calloc(n_types, sizeof(dispatch_typefns_t));
+ for (size_t i = 0; i < n_types; ++i) {
+ /* Default to no-op for everything... */
+ memcpy(&d->typefns[i], &nop_typefns, sizeof(dispatch_typefns_t));
+ }
+ SMARTLIST_FOREACH_BEGIN(cfg->fns_by_type, dispatch_typefns_t *, fns) {
+ /* Set the functions if they are provided. */
+ if (fns) {
+ if (fns->free_fn)
+ d->typefns[fns_sl_idx].free_fn = fns->free_fn;
+ if (fns->fmt_fn)
+ d->typefns[fns_sl_idx].fmt_fn = fns->fmt_fn;
+ }
+ } SMARTLIST_FOREACH_END(fns);
+
+ /* Initialize the message queues: one for each channel. */
+ d->queues = tor_calloc(d->n_queues, sizeof(dqueue_t));
+ for (size_t i = 0; i < d->n_queues; ++i) {
+ TOR_SIMPLEQ_INIT(&d->queues[i].queue);
+ d->queues[i].alert_fn = alert_fn_nop;
+ }
+
+ /* Build the dispatch tables mapping message IDs to receivers. */
+ d->table = tor_calloc(d->n_msgs, sizeof(dtbl_entry_t *));
+ SMARTLIST_FOREACH_BEGIN(cfg->recv_by_msg, smartlist_t *, rcv) {
+ d->table[rcv_sl_idx] = dtbl_entry_from_lst(rcv);
+ } SMARTLIST_FOREACH_END(rcv);
+
+ /* Fill in the empty entries in the dispatch tables:
+ * types and channels for each message. */
+ SMARTLIST_FOREACH_BEGIN(cfg->type_by_msg, smartlist_t *, type) {
+ if (d->table[type_sl_idx])
+ d->table[type_sl_idx]->type = VOID_TO_ID(type);
+ } SMARTLIST_FOREACH_END(type);
+
+ SMARTLIST_FOREACH_BEGIN(cfg->chan_by_msg, smartlist_t *, chan) {
+ if (d->table[chan_sl_idx])
+ d->table[chan_sl_idx]->channel = VOID_TO_ID(chan);
+ } SMARTLIST_FOREACH_END(chan);
+
+ return d;
+}
diff --git a/src/lib/dispatch/dispatch_st.h b/src/lib/dispatch/dispatch_st.h
new file mode 100644
index 000000000..568107b70
--- /dev/null
+++ b/src/lib/dispatch/dispatch_st.h
@@ -0,0 +1,108 @@
+/* Copyright (c) 2001, Matej Pfajfar.
+ * Copyright (c) 2001-2004, Roger Dingledine.
+ * Copyright (c) 2004-2006, Roger Dingledine, Nick Mathewson.
+ * Copyright (c) 2007-2018, The Tor Project, Inc. */
+/* See LICENSE for licensing information */
+
+/**
+ * \file dispatch_st.h
+ *
+ * \brief private structures used for the dispatcher module
+ */
+
+#ifndef TOR_DISPATCH_ST_H
+#define TOR_DISPATCH_ST_H
+
+#ifdef DISPATCH_PRIVATE
+
+#include "lib/container/smartlist.h"
+
+/**
+ * Information about the recipient of a message.
+ **/
+typedef struct dispatch_rcv_t {
+ /** The subsystem receiving a message. */
+ subsys_id_t sys;
+ /** True iff this recipient is enabled. */
+ bool enabled;
+ /** The function that will handle the message. */
+ recv_fn_t fn;
+} dispatch_rcv_t;
+
+/**
+ * Information used by a dispatcher to handle and dispatch a single message
+ * ID. It maps that message ID to its type, channel, and list of receiver
+ * functions.
+ *
+ * This structure is used when the dispatcher is running.
+ **/
+typedef struct dtbl_entry_t {
+ /** The number of enabled non-stub subscribers for this message.
+ *
+ * Note that for now, this will be the same as <b>n_fns</b>, since there is
+ * no way to turn these subscribers on an off yet. */
+ uint16_t n_enabled;
+ /** The channel that handles this message. */
+ channel_id_t channel;
+ /** The associated C type for this message. */
+ msg_type_id_t type;
+ /**
+ * The number of functions pointers for subscribers that receive this
+ * message, in rcv. */
+ uint16_t n_fns;
+ /**
+ * The recipients for this message.
+ */
+ dispatch_rcv_t rcv[FLEXIBLE_ARRAY_MEMBER];
+} dtbl_entry_t;
+
+/**
+ * A queue of messages for a given channel, used by a live dispatcher.
+ */
+typedef struct dqueue_t {
+ /** The queue of messages itself. */
+ TOR_SIMPLEQ_HEAD( , msg_t) queue;
+ /** A function to be called when the queue becomes nonempty. */
+ dispatch_alertfn_t alert_fn;
+ /** An argument for the alert_fn. */
+ void *alert_fn_arg;
+} dqueue_t ;
+
+/**
+ * A single dispatcher for cross-module messages.
+ */
+struct dispatch_t {
+ /**
+ * The length of <b>table</b>: the number of message IDs that this
+ * dispatcher can handle.
+ */
+ size_t n_msgs;
+ /**
+ * The length of <b>queues</b>: the number of channels that this dispatcher
+ * has configured.
+ */
+ size_t n_queues;
+ /**
+ * The length of <b>typefns</b>: the number of C type IDs that this
+ * dispatcher has configured.
+ */
+ size_t n_types;
+ /**
+ * An array of message queues, indexed by channel ID.
+ */
+ dqueue_t *queues;
+ /**
+ * An array of entries about how to handle particular message types, indexed
+ * by message ID.
+ */
+ dtbl_entry_t **table;
+ /**
+ * An array of function tables for manipulating types, index by message
+ * type ID.
+ **/
+ dispatch_typefns_t *typefns;
+};
+
+#endif
+
+#endif
diff --git a/src/lib/dispatch/include.am b/src/lib/dispatch/include.am
new file mode 100644
index 000000000..c4aa170db
--- /dev/null
+++ b/src/lib/dispatch/include.am
@@ -0,0 +1,23 @@
+
+noinst_LIBRARIES += src/lib/libtor-dispatch.a
+
+if UNITTESTS_ENABLED
+noinst_LIBRARIES += src/lib/libtor-dispatch-testing.a
+endif
+
+src_lib_libtor_dispatch_a_SOURCES = \
+ src/lib/dispatch/dispatch_cfg.c \
+ src/lib/dispatch/dispatch_core.c \
+ src/lib/dispatch/dispatch_new.c
+
+src_lib_libtor_dispatch_testing_a_SOURCES = \
+ $(src_lib_libtor_dispatch_a_SOURCES)
+src_lib_libtor_dispatch_testing_a_CPPFLAGS = $(AM_CPPFLAGS) $(TEST_CPPFLAGS)
+src_lib_libtor_dispatch_testing_a_CFLAGS = $(AM_CFLAGS) $(TEST_CFLAGS)
+
+noinst_HEADERS += \
+ src/lib/dispatch/dispatch.h \
+ src/lib/dispatch/dispatch_cfg.h \
+ src/lib/dispatch/dispatch_cfg_st.h \
+ src/lib/dispatch/dispatch_st.h \
+ src/lib/dispatch/msgtypes.h
diff --git a/src/lib/dispatch/msgtypes.h b/src/lib/dispatch/msgtypes.h
new file mode 100644
index 000000000..4e79e592a
--- /dev/null
+++ b/src/lib/dispatch/msgtypes.h
@@ -0,0 +1,80 @@
+/* Copyright (c) 2001, Matej Pfajfar.
+ * Copyright (c) 2001-2004, Roger Dingledine.
+ * Copyright (c) 2004-2006, Roger Dingledine, Nick Mathewson.
+ * Copyright (c) 2007-2018, The Tor Project, Inc. */
+/* See LICENSE for licensing information */
+
+/**
+ * \file msgtypes.h
+ * \brief Types used for messages in the dispatcher code.
+ **/
+
+#ifndef TOR_DISPATCH_MSGTYPES_H
+#define TOR_DISPATCH_MSGTYPES_H
+
+#include <stdint.h>
+
+#include "ext/tor_queue.h"
+
+/**
+ * These types are aliases for subsystems, channels, and message IDs.
+ **/
+typedef uint16_t subsys_id_t;
+typedef uint16_t channel_id_t;
+typedef uint16_t message_id_t;
+
+/**
+ * This identifies a C type that can be sent along with a message.
+ **/
+typedef uint16_t msg_type_id_t;
+
+/**
+ * An ID value returned for *_type_t when none exists.
+ */
+#define ERROR_ID 65535
+
+/**
+ * Auxiliary (untyped) data sent along with a message.
+ *
+ * We define this as a union of a pointer and a u64, so that the integer
+ * types will have the same range across platforms.
+ **/
+typedef union {
+ void *ptr;
+ uint64_t u64;
+} msg_aux_data_t;
+
+/**
+ * Structure of a received message.
+ **/
+typedef struct msg_t {
+ TOR_SIMPLEQ_ENTRY(msg_t) next;
+ subsys_id_t sender;
+ channel_id_t channel;
+ message_id_t msg;
+ /** We could omit this field, since it is implicit in the message type, but
+ * IMO let's leave it in for safety. */
+ msg_type_id_t type;
+ /** Untyped auxiliary data. You shouldn't have to mess with this
+ * directly. */
+ msg_aux_data_t aux_data__;
+} msg_t;
+
+/**
+ * A function that a subscriber uses to receive a message.
+ **/
+typedef void (*recv_fn_t)(const msg_t *m);
+
+/**
+ * Table of functions to use for a given C type. Any omitted (NULL) functions
+ * will be treated as no-ops.
+ **/
+typedef struct dispatch_typefns_t {
+ /** Release storage held for the auxiliary data of this type. */
+ void (*free_fn)(msg_aux_data_t);
+ /** Format and return a newly allocated string describing the contents
+ * of this data element. */
+ char *(*fmt_fn)(msg_aux_data_t);
+} dispatch_typefns_t;
+
+#endif
diff --git a/src/test/include.am b/src/test/include.am
index c3827d3eb..d4e8eb4e8 100644
--- a/src/test/include.am
+++ b/src/test/include.am
@@ -124,6 +124,7 @@ src_test_test_SOURCES += \
src/test/test_dir.c \
src/test/test_dir_common.c \
src/test/test_dir_handle_get.c \
+ src/test/test_dispatch.c \
src/test/test_dos.c \
src/test/test_entryconn.c \
src/test/test_entrynodes.c \
diff --git a/src/test/test.c b/src/test/test.c
index 1230b632a..4c6d9775b 100644
--- a/src/test/test.c
+++ b/src/test/test.c
@@ -872,6 +872,7 @@ struct testgroup_t testgroups[] = {
{ "dir/voting/flags/", voting_flags_tests },
{ "dir/voting/schedule/", voting_schedule_tests },
{ "dir_handle_get/", dir_handle_get_tests },
+ { "dispatch/", dispatch_tests, },
{ "dns/", dns_tests },
{ "dos/", dos_tests },
{ "entryconn/", entryconn_tests },
diff --git a/src/test/test.h b/src/test/test.h
index 7a3a4d8fd..cd0ce4f6d 100644
--- a/src/test/test.h
+++ b/src/test/test.h
@@ -209,6 +209,7 @@ extern struct testcase_t crypto_openssl_tests[];
extern struct testcase_t crypto_tests[];
extern struct testcase_t dir_handle_get_tests[];
extern struct testcase_t dir_tests[];
+extern struct testcase_t dispatch_tests[];
extern struct testcase_t dns_tests[];
extern struct testcase_t dos_tests[];
extern struct testcase_t entryconn_tests[];
diff --git a/src/test/test_dispatch.c b/src/test/test_dispatch.c
new file mode 100644
index 000000000..ec704c042
--- /dev/null
+++ b/src/test/test_dispatch.c
@@ -0,0 +1,181 @@
+/* Copyright (c) 2018, The Tor Project, Inc. */
+/* See LICENSE for licensing information */
+
+#define DISPATCH_PRIVATE
+
+#include "test/test.h"
+
+#include "lib/dispatch/dispatch.h"
+#include "lib/dispatch/dispatch_cfg.h"
+#include "lib/dispatch/dispatch_st.h"
+#include "lib/dispatch/msgtypes.h"
+
+#include "lib/log/escape.h"
+#include "lib/malloc/malloc.h"
+#include "lib/string/printf.h"
+
+#include <stdio.h>
+#include <string.h>
+
+/* Construct an empty dispatch_t. */
+static void
+test_dispatch_empty(void *arg)
+{
+ (void)arg;
+
+ dispatch_t *d=NULL;
+ dispatch_cfg_t *cfg=NULL;
+
+ cfg = dcfg_new();
+ d = dispatch_new(cfg);
+ tt_assert(d);
+
+ done:
+ dispatch_free(d);
+ dcfg_free(cfg);
+}
+
+static int total_recv1_simple = 0;
+static int total_recv2_simple = 0;
+
+static void
+simple_recv1(const msg_t *m)
+{
+ total_recv1_simple += m->aux_data__.u64;
+}
+
+static void
+simple_recv2(const msg_t *m)
+{
+ total_recv2_simple += m->aux_data__.u64*10;
+}
+
+/* Construct a dispatch_t with two messages, make sure that they both get
+ * delivered. */
+static void
+test_dispatch_simple(void *arg)
+{
+ (void)arg;
+
+ dispatch_t *d=NULL;
+ dispatch_cfg_t *cfg=NULL;
+ int r;
+
+ cfg = dcfg_new();
+ r = dcfg_msg_set_type(cfg,0,0);
+ r += dcfg_msg_set_chan(cfg,0,0);
+ r += dcfg_add_recv(cfg,0,1,simple_recv1);
+ r += dcfg_msg_set_type(cfg,1,0);
+ r += dcfg_msg_set_chan(cfg,1,0);
+ r += dcfg_add_recv(cfg,1,1,simple_recv2);
+ r += dcfg_add_recv(cfg,1,1,simple_recv2); /* second copy */
+ tt_int_op(r, OP_EQ, 0);
+
+ d = dispatch_new(cfg);
+ tt_assert(d);
+
+ msg_aux_data_t data = {.u64 = 7};
+ r = dispatch_send(d, 99, 0, 0, 0, data);
+ tt_int_op(r, OP_EQ, 0);
+ tt_int_op(total_recv1_simple, OP_EQ, 0);
+
+ r = dispatch_flush(d, 0, INT_MAX);
+ tt_int_op(r, OP_EQ, 0);
+ tt_int_op(total_recv1_simple, OP_EQ, 7);
+ tt_int_op(total_recv2_simple, OP_EQ, 0);
+
+ total_recv1_simple = 0;
+ r = dispatch_send(d, 99, 0, 1, 0, data);
+ tt_int_op(r, OP_EQ, 0);
+ r = dispatch_flush(d, 0, INT_MAX);
+ tt_int_op(total_recv1_simple, OP_EQ, 0);
+ tt_int_op(total_recv2_simple, OP_EQ, 140);
+
+ done:
+ dispatch_free(d);
+ dcfg_free(cfg);
+}
+
+struct coord { int x; int y; };
+static void
+free_coord(msg_aux_data_t d)
+{
+ tor_free(d.ptr);
+}
+static char *
+fmt_coord(msg_aux_data_t d)
+{
+ char *v;
+ struct coord *c = d.ptr;
+ tor_asprintf(&v, "[%d, %d]", c->x, c->y);
+ return v;
+}
+static dispatch_typefns_t coord_fns = {
+ .fmt_fn = fmt_coord,
+ .free_fn = free_coord,
+};
+static void
+alert_run_immediate(dispatch_t *d, channel_id_t ch, void *arg)
+{
+ (void)arg;
+ dispatch_flush(d, ch, INT_MAX);
+}
+
+static dispatch_t *dispatcher_in_use=NULL;
+static char *received_data=NULL;
+
+static void
+recv_typed_data(const msg_t *m)
+{
+ tor_free(received_data);
+ received_data = dispatch_fmt_msg_data(dispatcher_in_use, m);
+}
+
+static void
+test_dispatch_with_types(void *arg)
+{
+ (void)arg;
+
+ dispatch_t *d=NULL;
+ dispatch_cfg_t *cfg=NULL;
+ int r;
+
+ cfg = dcfg_new();
+ r = dcfg_msg_set_type(cfg,5,3);
+ r += dcfg_msg_set_chan(cfg,5,2);
+ r += dcfg_add_recv(cfg,5,0,recv_typed_data);
+ r += dcfg_type_set_fns(cfg,3,&coord_fns);
+ tt_int_op(r, OP_EQ, 0);
+
+ d = dispatch_new(cfg);
+ tt_assert(d);
+ dispatcher_in_use = d;
+
+ /* Make this message get run immediately. */
+ r = dispatch_set_alert_fn(d, 2, alert_run_immediate, NULL);
+ tt_int_op(r, OP_EQ, 0);
+
+ struct coord *xy = tor_malloc(sizeof(*xy));
+ xy->x = 13;
+ xy->y = 37;
+ msg_aux_data_t data = {.ptr = xy};
+ r = dispatch_send(d, 99/*sender*/, 2/*channel*/, 5/*msg*/, 3/*type*/, data);
+ tt_int_op(r, OP_EQ, 0);
+ tt_str_op(received_data, OP_EQ, "[13, 37]");
+
+ done:
+ dispatch_free(d);
+ dcfg_free(cfg);
+ tor_free(received_data);
+ dispatcher_in_use = NULL;
+}
+
+#define T(name) \
+ { #name, test_dispatch_ ## name, TT_FORK, NULL, NULL }
+
+struct testcase_t dispatch_tests[] = {
+ T(empty),
+ T(simple),
+ T(with_types),
+ END_OF_TESTCASES
+};
1
0

[tor/master] Connect the mainloop pubsub dispatcher on startup; free it on shutdown.
by asn@torproject.org 27 Mar '19
by asn@torproject.org 27 Mar '19
27 Mar '19
commit 6d1abd37e27761bc3c92c398dbc03711fcf9e5c8
Author: Nick Mathewson <nickm(a)torproject.org>
Date: Mon Mar 11 16:02:38 2019 -0400
Connect the mainloop pubsub dispatcher on startup; free it on shutdown.
---
src/app/main/main.c | 12 ++++++++++++
1 file changed, 12 insertions(+)
diff --git a/src/app/main/main.c b/src/app/main/main.c
index ba2dfebd7..7bf9d3fe2 100644
--- a/src/app/main/main.c
+++ b/src/app/main/main.c
@@ -19,6 +19,7 @@
#include "core/mainloop/connection.h"
#include "core/mainloop/cpuworker.h"
#include "core/mainloop/mainloop.h"
+#include "core/mainloop/mainloop_pubsub.h"
#include "core/mainloop/netstatus.h"
#include "core/or/channel.h"
#include "core/or/channelpadding.h"
@@ -75,6 +76,7 @@
#include "lib/net/resolve.h"
#include "lib/process/waitpid.h"
+#include "lib/pubsub/pubsub_build.h"
#include "lib/meminfo/meminfo.h"
#include "lib/osinfo/uname.h"
@@ -807,6 +809,7 @@ tor_free_all(int postfork)
}
/* stuff in main.c */
+ tor_mainloop_disconnect_pubsub();
tor_mainloop_free_all();
if (!postfork) {
@@ -1407,6 +1410,15 @@ tor_run_main(const tor_main_configuration_t *tor_cfg)
}
}
#endif /* defined(NT_SERVICE) */
+
+ {
+ pubsub_builder_t *builder = pubsub_builder_new();
+ int r = subsystems_add_pubsub(builder);
+ tor_assert(r == 0);
+ r = tor_mainloop_connect_pubsub(builder); // consumes builder
+ tor_assert(r == 0);
+ }
+
{
int init_rv = tor_init(argc, argv);
if (init_rv) {
1
0
commit a7681525ab670c2b7a783f9e1285bf9a8e97d1ea
Author: Nick Mathewson <nickm(a)torproject.org>
Date: Tue Jan 15 10:46:01 2019 -0500
Add function to clear publish bindings.
When we clean up, we'd like to clear all the bindings that refer to
a dispatch_t, so that they don't have dangling pointers to it.
---
src/lib/pubsub/pubsub_build.c | 30 +++++++++++++++++++++++++-----
src/lib/pubsub/pubsub_build.h | 20 +++++++++++++++-----
src/lib/pubsub/pubsub_builder_st.h | 6 +++---
src/test/test_pubsub_msg.c | 2 +-
4 files changed, 44 insertions(+), 14 deletions(-)
diff --git a/src/lib/pubsub/pubsub_build.c b/src/lib/pubsub/pubsub_build.c
index 7e4ab5ba8..64cdcc1d5 100644
--- a/src/lib/pubsub/pubsub_build.c
+++ b/src/lib/pubsub/pubsub_build.c
@@ -242,10 +242,10 @@ pubsub_connector_define_type_(pubsub_connector_t *con,
* for <b>d</b>.
*/
static void
-dispatch_fill_pub_binding_backptrs(pubsub_builder_t *builder,
- dispatch_t *d)
+pubsub_items_install_bindings(pubsub_items_t *items,
+ dispatch_t *d)
{
- SMARTLIST_FOREACH_BEGIN(builder->items->items, pubsub_cfg_t *, cfg) {
+ SMARTLIST_FOREACH_BEGIN(items->items, pubsub_cfg_t *, cfg) {
if (cfg->pub_binding) {
// XXXX we could skip this for STUB publishers, and for any publishers
// XXXX where all subscribers are STUB.
@@ -255,12 +255,28 @@ dispatch_fill_pub_binding_backptrs(pubsub_builder_t *builder,
}
/**
+ * Remove the dispatch_ptr fields for all the relevant publish bindings
+ * in <b>items</b>. The prevents subsequent dispatch_pub_() calls from
+ * sending messages to a dispatcher that has been freed.
+ **/
+void
+pubsub_items_clear_bindings(pubsub_items_t *items)
+{
+ SMARTLIST_FOREACH_BEGIN(items->items, pubsub_cfg_t *, cfg) {
+ if (cfg->pub_binding) {
+ cfg->pub_binding->dispatch_ptr = NULL;
+ }
+ } SMARTLIST_FOREACH_END(cfg);
+}
+
+/**
* Create a new dispatcher as configured in a pubsub_builder_t.
*
* Consumes and frees its input.
**/
dispatch_t *
-pubsub_builder_finalize(pubsub_builder_t *builder)
+pubsub_builder_finalize(pubsub_builder_t *builder,
+ pubsub_items_t **items_out)
{
dispatch_t *dispatcher = NULL;
tor_assert_nonfatal(builder->n_connectors == 0);
@@ -276,7 +292,11 @@ pubsub_builder_finalize(pubsub_builder_t *builder)
if (!dispatcher)
goto err;
- dispatch_fill_pub_binding_backptrs(builder, dispatcher);
+ pubsub_items_install_bindings(builder->items, dispatcher);
+ if (items_out) {
+ *items_out = builder->items;
+ builder->items = NULL; /* Prevent free */
+ }
err:
pubsub_builder_free(builder);
diff --git a/src/lib/pubsub/pubsub_build.h b/src/lib/pubsub/pubsub_build.h
index 199eab219..d2920e021 100644
--- a/src/lib/pubsub/pubsub_build.h
+++ b/src/lib/pubsub/pubsub_build.h
@@ -33,6 +33,13 @@ typedef struct pubsub_builder_t pubsub_builder_t;
typedef struct pubsub_connector_t pubsub_connector_t;
/**
+ * A "pubsub items" holds the configuration items used to configure a
+ * pubsub_builder. After the builder is finalized, this field is extracted,
+ * and used later to tear down pointers that enable publishing.
+ **/
+typedef struct pubsub_items_t pubsub_items_t;
+
+/**
* Create a new pubsub_builder. This should only happen in the
* main-init code.
*/
@@ -75,13 +82,16 @@ void pubsub_connector_free_(pubsub_connector_t *);
* This should happen after every subsystem has initialized, and before
* entering the mainloop.
*/
-struct dispatch_t *pubsub_builder_finalize(pubsub_builder_t *);
+struct dispatch_t *pubsub_builder_finalize(pubsub_builder_t *,
+ pubsub_items_t **items_out);
+
+/**
+ * Clear all pub_binding_t backpointers in <b>items</b>.
+ **/
+void pubsub_items_clear_bindings(pubsub_items_t *items);
-#ifdef PUBSUB_PRIVATE
-struct pubsub_items_t;
#define pubsub_items_free(cfg) \
FREE_AND_NULL(pubsub_items_t, pubsub_items_free_, (cfg))
-void pubsub_items_free_(struct pubsub_items_t *cfg);
-#endif
+void pubsub_items_free_(pubsub_items_t *cfg);
#endif
diff --git a/src/lib/pubsub/pubsub_builder_st.h b/src/lib/pubsub/pubsub_builder_st.h
index a1cc6e718..cedeb02b1 100644
--- a/src/lib/pubsub/pubsub_builder_st.h
+++ b/src/lib/pubsub/pubsub_builder_st.h
@@ -91,12 +91,12 @@ typedef struct pubsub_type_cfg_t {
* The set of configuration requests for a dispatcher, as made by various
* subsystems.
**/
-typedef struct pubsub_items_t {
+struct pubsub_items_t {
/** List of pubsub_cfg_t. */
struct smartlist_t *items;
/** List of pubsub_type_cfg_t. */
struct smartlist_t *type_items;
-} pubsub_items_t;
+};
/**
* Type used to construct a dispatcher. We use this type to build up the
@@ -111,7 +111,7 @@ struct pubsub_builder_t {
int n_errors;
/** In-progress configuration that we're constructing, as a list of the
* requests that have been made. */
- pubsub_items_t *items;
+ struct pubsub_items_t *items;
/** In-progress configuration that we're constructing, in a form that can
* be converted to a dispatch_t. */
struct dispatch_cfg_t *cfg;
diff --git a/src/test/test_pubsub_msg.c b/src/test/test_pubsub_msg.c
index 5b771d45a..41a8a25ad 100644
--- a/src/test/test_pubsub_msg.c
+++ b/src/test/test_pubsub_msg.c
@@ -148,7 +148,7 @@ setup_dispatcher(const struct testcase_t *testcase)
tor_free(sys);
}
- return pubsub_builder_finalize(builder);
+ return pubsub_builder_finalize(builder, NULL);
}
static int
1
0
commit f5683d90be693ecf0561fe90803f5a54c7ed264d
Author: Nick Mathewson <nickm(a)torproject.org>
Date: Sun Jan 13 14:43:13 2019 -0500
Add a naming system for IDs in dispatch.
---
src/lib/dispatch/.may_include | 1 +
src/lib/dispatch/dispatch_naming.c | 63 ++++++++++++++++++++++++++++++++++++++
src/lib/dispatch/dispatch_naming.h | 46 ++++++++++++++++++++++++++++
src/lib/dispatch/include.am | 2 ++
4 files changed, 112 insertions(+)
diff --git a/src/lib/dispatch/.may_include b/src/lib/dispatch/.may_include
index 9b5373907..7f2df5859 100644
--- a/src/lib/dispatch/.may_include
+++ b/src/lib/dispatch/.may_include
@@ -2,6 +2,7 @@ orconfig.h
ext/tor_queue.h
+lib/cc/*.h
lib/container/*.h
lib/dispatch/*.h
lib/intmath/*.h
diff --git a/src/lib/dispatch/dispatch_naming.c b/src/lib/dispatch/dispatch_naming.c
new file mode 100644
index 000000000..83d9a2d60
--- /dev/null
+++ b/src/lib/dispatch/dispatch_naming.c
@@ -0,0 +1,63 @@
+/* Copyright (c) 2001, Matej Pfajfar.
+ * Copyright (c) 2001-2004, Roger Dingledine.
+ * Copyright (c) 2004-2006, Roger Dingledine, Nick Mathewson.
+ * Copyright (c) 2007-2018, The Tor Project, Inc. */
+/* See LICENSE for licensing information */
+
+#include "orconfig.h"
+
+#include "lib/cc/compat_compiler.h"
+
+#include "lib/dispatch/dispatch_naming.h"
+#include "lib/dispatch/msgtypes.h"
+
+#include "lib/container/namemap.h"
+#include "lib/container/namemap_st.h"
+
+#include "lib/log/util_bug.h"
+#include "lib/log/log.h"
+
+#include <stdlib.h>
+
+/** Global namemap for message IDs. */
+static namemap_t message_id_map = NAMEMAP_INIT();
+/** Global namemap for subsystem IDs. */
+static namemap_t subsys_id_map = NAMEMAP_INIT();
+/** Global namemap for channel IDs. */
+static namemap_t channel_id_map = NAMEMAP_INIT();
+/** Global namemap for message type IDs. */
+static namemap_t msg_type_id_map = NAMEMAP_INIT();
+
+void
+dispatch_naming_init(void)
+{
+}
+
+/* Helper macro: declare functions to map IDs to and from names for a given
+ * type in a namemap_t.
+ */
+#define DECLARE_ID_MAP_FNS(type) \
+ type##_id_t \
+ get_##type##_id(const char *name) \
+ { \
+ unsigned u = namemap_get_or_create_id(&type##_id_map, name); \
+ tor_assert(u != NAMEMAP_ERR); \
+ tor_assert(u != ERROR_ID); \
+ return (type##_id_t) u; \
+ } \
+ const char * \
+ get_##type##_id_name(type##_id_t id) \
+ { \
+ return namemap_fmt_name(&type##_id_map, id); \
+ } \
+ size_t \
+ get_num_##type##_ids(void) \
+ { \
+ return namemap_get_size(&type##_id_map); \
+ } \
+ EAT_SEMICOLON
+
+DECLARE_ID_MAP_FNS(message);
+DECLARE_ID_MAP_FNS(channel);
+DECLARE_ID_MAP_FNS(subsys);
+DECLARE_ID_MAP_FNS(msg_type);
diff --git a/src/lib/dispatch/dispatch_naming.h b/src/lib/dispatch/dispatch_naming.h
new file mode 100644
index 000000000..c116d2184
--- /dev/null
+++ b/src/lib/dispatch/dispatch_naming.h
@@ -0,0 +1,46 @@
+/* Copyright (c) 2001, Matej Pfajfar.
+ * Copyright (c) 2001-2004, Roger Dingledine.
+ * Copyright (c) 2004-2006, Roger Dingledine, Nick Mathewson.
+ * Copyright (c) 2007-2018, The Tor Project, Inc. */
+/* See LICENSE for licensing information */
+
+#ifndef TOR_DISPATCH_NAMING_H
+#define TOR_DISPATCH_NAMING_H
+
+#include "lib/dispatch/msgtypes.h"
+#include <stddef.h>
+
+/**
+ * Return an existing channel ID by name, allocating the channel ID if
+ * if necessary. Returns ERROR_ID if we have run out of
+ * channels
+ */
+channel_id_t get_channel_id(const char *);
+/**
+ * Return the name corresponding to a given channel ID.
+ **/
+const char *get_channel_id_name(channel_id_t);
+/**
+ * Return the total number of _named_ channel IDs.
+ **/
+size_t get_num_channel_ids(void);
+
+/* As above, but for messages. */
+message_id_t get_message_id(const char *);
+const char *get_message_id_name(message_id_t);
+size_t get_num_message_ids(void);
+
+/* As above, but for subsystems */
+subsys_id_t get_subsys_id(const char *);
+const char *get_subsys_id_name(subsys_id_t);
+size_t get_num_subsys_ids(void);
+
+/* As above, but for types. Note that types additionally must be
+ * "defined", if any message is to use them. */
+msg_type_id_t get_msg_type_id(const char *);
+const char *get_msg_type_id_name(msg_type_id_t);
+size_t get_num_msg_type_ids(void);
+
+void dispatch_naming_init(void);
+
+#endif
diff --git a/src/lib/dispatch/include.am b/src/lib/dispatch/include.am
index c4aa170db..4ec5b75cd 100644
--- a/src/lib/dispatch/include.am
+++ b/src/lib/dispatch/include.am
@@ -8,6 +8,7 @@ endif
src_lib_libtor_dispatch_a_SOURCES = \
src/lib/dispatch/dispatch_cfg.c \
src/lib/dispatch/dispatch_core.c \
+ src/lib/dispatch/dispatch_naming.c \
src/lib/dispatch/dispatch_new.c
src_lib_libtor_dispatch_testing_a_SOURCES = \
@@ -19,5 +20,6 @@ noinst_HEADERS += \
src/lib/dispatch/dispatch.h \
src/lib/dispatch/dispatch_cfg.h \
src/lib/dispatch/dispatch_cfg_st.h \
+ src/lib/dispatch/dispatch_naming.h \
src/lib/dispatch/dispatch_st.h \
src/lib/dispatch/msgtypes.h
1
0
commit 02e0a39d396ad4f86d27770a5cf6f46553a01b0c
Author: Nick Mathewson <nickm(a)torproject.org>
Date: Mon Mar 11 15:56:13 2019 -0400
Add msgtypes.h include to pubsub_build.h
(The header won't compile without it.)
---
src/lib/pubsub/pubsub_build.h | 2 ++
1 file changed, 2 insertions(+)
diff --git a/src/lib/pubsub/pubsub_build.h b/src/lib/pubsub/pubsub_build.h
index d2920e021..3fad25453 100644
--- a/src/lib/pubsub/pubsub_build.h
+++ b/src/lib/pubsub/pubsub_build.h
@@ -14,6 +14,8 @@
#ifndef TOR_PUBSUB_BUILD_H
#define TOR_PUBSUB_BUILD_H
+#include "lib/dispatch/msgtypes.h"
+
struct dispatch_t;
/**
1
0