[tor-commits] [tor/master] Basic work on a publish/subscribe abstraction

nickm at torproject.org nickm at torproject.org
Wed May 11 17:26:34 UTC 2016


commit 80a6c8caa3910b18ce50ef870ef0af546c64faa2
Author: Nick Mathewson <nickm at torproject.org>
Date:   Thu Mar 24 11:15:19 2016 -0400

    Basic work on a publish/subscribe abstraction
    
    The goal here is to provide a way to decouple pieces of the code
    that want to learn "when something happens" from those that realize
    that it has happened.
    
    The implementation here consists of a generic backend, plus a set of
    macros to define and implement a set of type-safe frontends.
---
 src/common/include.am  |   2 +
 src/common/pubsub.c    | 129 +++++++++++++++++++++++++++++++++++
 src/common/pubsub.h    | 177 +++++++++++++++++++++++++++++++++++++++++++++++++
 src/test/include.am    |   1 +
 src/test/test.c        |   2 +
 src/test/test_pubsub.c |  85 ++++++++++++++++++++++++
 6 files changed, 396 insertions(+)

diff --git a/src/common/include.am b/src/common/include.am
index 5afb30d..05342e9 100644
--- a/src/common/include.am
+++ b/src/common/include.am
@@ -67,6 +67,7 @@ LIBOR_A_SOURCES = \
   src/common/di_ops.c					\
   src/common/log.c					\
   src/common/memarea.c					\
+  src/common/pubsub.c					\
   src/common/util.c					\
   src/common/util_format.c				\
   src/common/util_process.c				\
@@ -132,6 +133,7 @@ COMMONHEADERS = \
   src/common/memarea.h				\
   src/common/linux_syscalls.inc			\
   src/common/procmon.h				\
+  src/common/pubsub.h				\
   src/common/sandbox.h				\
   src/common/testsupport.h			\
   src/common/torgzip.h				\
diff --git a/src/common/pubsub.c b/src/common/pubsub.c
new file mode 100644
index 0000000..98ec3f8
--- /dev/null
+++ b/src/common/pubsub.c
@@ -0,0 +1,129 @@
+/* Copyright (c) 2016, The Tor Project, Inc. */
+/* See LICENSE for licensing information */
+
+/**
+ * \file pubsub.c
+ *
+ * \brief DOCDOC
+ */
+
+#include "orconfig.h"
+#include "pubsub.h"
+#include "container.h"
+
+/** Helper: insert <b>s</b> into <b>topic's</b> list of subscribers, keeping
+ * them sorted in priority order. */
+static void
+subscriber_insert(pubsub_topic_t *topic, pubsub_subscriber_t *s)
+{
+  int i;
+  smartlist_t *sl = topic->subscribers;
+  for (i = 0; i < smartlist_len(sl); ++i) {
+    pubsub_subscriber_t *other = smartlist_get(sl, i);
+    if (s->priority < other->priority) {
+      break;
+    }
+  }
+  smartlist_insert(sl, i, s);
+}
+
+/**
+ * Add a new subscriber to <b>topic</b>, where (when an event is triggered),
+ * we'll notify the function <b>fn</b> by passing it <b>subscriber_data</b>.
+ * Return a handle to the subscribe which can later be passed to
+ * pubsub_unsubscribe_().
+ *
+ * Functions are called in priority order, from lowest to highest.
+ *
+ * See pubsub.h for <b>subscribe_flags</b>.
+ */
+const pubsub_subscriber_t *
+pubsub_subscribe_(pubsub_topic_t *topic,
+                  pubsub_subscriber_fn_t fn,
+                  void *subscriber_data,
+                  unsigned subscribe_flags,
+                  unsigned priority)
+{
+  tor_assert(! topic->locked);
+  if (subscribe_flags & SUBSCRIBE_ATSTART) {
+    tor_assert(topic->n_events_fired == 0);
+  }
+  pubsub_subscriber_t *r = tor_malloc_zero(sizeof(r));
+  r->priority = priority;
+  r->subscriber_flags = subscribe_flags;
+  r->fn = fn;
+  r->subscriber_data = subscriber_data;
+  if (topic->subscribers == NULL) {
+    topic->subscribers = smartlist_new();
+  }
+  subscriber_insert(topic, r);
+  return r;
+}
+
+/**
+ * Remove the subscriber <b>s</b> from <b>topic</b>.  After calling this
+ * function, <b>s</b> may no longer be used.
+ */
+int
+pubsub_unsubscribe_(pubsub_topic_t *topic,
+                    const pubsub_subscriber_t *s)
+{
+  tor_assert(! topic->locked);
+  smartlist_t *sl = topic->subscribers;
+  if (sl == NULL)
+    return -1;
+  int i = smartlist_pos(sl, s);
+  if (i == -1)
+    return -1;
+  pubsub_subscriber_t *tmp = smartlist_get(sl, i);
+  tor_assert(tmp == s);
+  smartlist_del_keeporder(sl, i);
+  tor_free(tmp);
+  return 0;
+}
+
+/**
+ * For every subscriber s in <b>topic</b>, invoke notify_fn on s and
+ * event_data.  Return 0 if there were no nonzero return values, and -1 if
+ * there were any.
+ */
+int
+pubsub_notify_(pubsub_topic_t *topic, pubsub_notify_fn_t notify_fn,
+               void *event_data, unsigned notify_flags)
+{
+  tor_assert(! topic->locked);
+  (void) notify_flags;
+  smartlist_t *sl = topic->subscribers;
+  int n_bad = 0;
+  ++topic->n_events_fired;
+  if (sl == NULL)
+    return -1;
+  topic->locked = 1;
+  SMARTLIST_FOREACH_BEGIN(sl, pubsub_subscriber_t *, s) {
+    int r = notify_fn(s, event_data);
+    if (r != 0)
+      ++n_bad;
+  } SMARTLIST_FOREACH_END(s);
+  topic->locked = 0;
+  return (n_bad == 0) ? 0 : -1;
+}
+
+/**
+ * Release all storage held by <b>topic</b>.
+ */
+void
+pubsub_clear_(pubsub_topic_t *topic)
+{
+  tor_assert(! topic->locked);
+
+  smartlist_t *sl = topic->subscribers;
+  if (sl == NULL)
+    return;
+  SMARTLIST_FOREACH_BEGIN(sl, pubsub_subscriber_t *, s) {
+    tor_free(s);
+  } SMARTLIST_FOREACH_END(s);
+  smartlist_free(sl);
+  topic->subscribers = NULL;
+  topic->n_events_fired = 0;
+}
+
diff --git a/src/common/pubsub.h b/src/common/pubsub.h
new file mode 100644
index 0000000..09e492e
--- /dev/null
+++ b/src/common/pubsub.h
@@ -0,0 +1,177 @@
+/* Copyright (c) 2016, The Tor Project, Inc. */
+/* See LICENSE for licensing information */
+
+/**
+ * \file pubsub.h
+ * \brief Macros to implement publish/subscribe abstractions.
+ *
+ * To use these macros, call DECLARE_PUBSUB_TOPIC() with an identifier to use
+ * as your topic.  Below, I'm going to assume you say DECLARE_PUBSUB_TOPIC(T).
+ *
+ * Doing this will declare the following types:
+ *   typedef struct T_event_data_t T_event_data_t; // you define this struct
+ *   typedef struct T_subscriber_data_t T_subscriber_data_t; // this one too.
+ *   typedef struct T_subscriber_t T_subscriber_t; // opaque
+ *   typedef int (*T_subscriber_fn_t)(T_event_data_t*, T_subscriber_data_t*);
+ *
+ * and it will declare the following functions:
+ *     const T_subscriber_t *T_subscribe(T_subscriber_fn_t,
+ *                                       T_subscriber_data_t *,
+ *                                       unsigned flags,
+ *                                       unsigned priority);
+ *     int T_unsubscribe(const T_subscriber_t *)
+ *
+ * Elsewhere you can say DECLARE_NOTIFY_PUBSUB_TOPIC(static, T), which declares:
+ *    static int T_notify(T_event_data_t *, unsigned notify_flags);
+ *    static void T_clear(void);
+ *
+ * And in some C file, you would define these functions with:
+ *    IMPLEMENT_PUBSUB_TOPIC(static, T).
+ *
+ * The implementations will be small typesafe wrappers over generic versions
+ * of the above functions.
+ *
+ * To use the typesafe functions, you add any number of subscribers with
+ * T_subscribe().  Each has an associated function pointer, data pointer,
+ * and priority. Later, you can invoke T_notify() to declare that the
+ * event has occurred. Each of the subscribers will be invoked once.
+ **/
+
+#ifndef TOR_PUBSUB_H
+#define TOR_PUBSUB_H
+
+#include "torint.h"
+
+/**
+ * Flag for T_subscribe: die with an assertion failure if the event
+ * have ever been published before.  Used when a subscriber must absolutely
+ * never have missed an event.
+ */
+#define SUBSCRIBE_ATSTART (1u<<0)
+
+#define DECLARE_PUBSUB_STRUCT_TYPES(name)                               \
+  /* You define this type. */                                           \
+  typedef struct name ## _event_data_t name ## _event_data_t;           \
+  /* You define this type. */                                           \
+  typedef struct name ## _subscriber_data_t name ## _subscriber_data_t;
+
+#define DECLARE_PUBSUB_TOPIC(name)                                      \
+  /* This type is opaque. */                                            \
+  typedef struct name ## _subscriber_t name ## _subscriber_t;           \
+  /* You declare functions matching this type. */                       \
+  typedef int (*name ## _subscriber_fn_t)(                              \
+                                    name ## _event_data_t *data,        \
+                                    name ## _subscriber_data_t *extra); \
+  /* Call this function to subscribe to a topic. */                     \
+  const name ## _subscriber_t *name ## _subscribe(                      \
+                         name##_subscriber_fn_t subscriber,             \
+                         name##_subscriber_data_t *extra_data,          \
+                         unsigned flags,                                \
+                         unsigned priority);                            \
+  /* Call this function to unsubscribe from a topic. */                 \
+  int name ## _unsubscribe(const name##_subscriber_t *s);
+
+#define DECLARE_NOTIFY_PUBSUB_TOPIC(linkage, name)                          \
+  /* Call this function to notify all subscribers. Flags not yet used. */   \
+  linkage int name ## _notify(name ## _event_data_t *data, unsigned flags); \
+  /* Call this function to release storage held by the topic. */            \
+  linkage void name ## _clear(void);
+
+/**
+ * Type used to hold a generic function for a subscriber.
+ *
+ * [Yes, it is safe to cast to this, so long as we cast back to the original
+ * type before calling.  From C99: "A pointer to a function of one type may be
+ * converted to a pointer to a function of another type and back again; the
+ * result shall compare equal to the original pointer."]
+*/
+typedef int (*pubsub_subscriber_fn_t)(void *, void *);
+
+/**
+ * Helper type to implement pubsub abstraction. Don't use this directly.
+ * It represents a subscriber.
+ */
+typedef struct pubsub_subscriber_t {
+  /** Function to invoke when the event triggers. */
+  pubsub_subscriber_fn_t fn;
+  /** Data associated with this subscriber. */
+  void *subscriber_data;
+  /** Priority for this subscriber. Low priorities happen first. */
+  unsigned priority;
+  /** Flags set on this subscriber. Not yet used.*/
+  unsigned subscriber_flags;
+} pubsub_subscriber_t;
+
+/**
+ * Helper type to implement pubsub abstraction. Don't use this directly.
+ * It represents a topic, and keeps a record of subscribers.
+ */
+typedef struct pubsub_topic_t {
+  /** List of subscribers to this topic. May be NULL. */
+  struct smartlist_t *subscribers;
+  /** Total number of times that pubsub_notify_() has ever been called on this
+   * topic. */
+  uint64_t n_events_fired;
+  /** True iff we're running 'notify' on this topic, and shouldn't allow
+   * any concurrent modifications or events. */
+  unsigned locked;
+} pubsub_topic_t;
+
+const pubsub_subscriber_t *pubsub_subscribe_(pubsub_topic_t *topic,
+                                             pubsub_subscriber_fn_t fn,
+                                             void *subscriber_data,
+                                             unsigned subscribe_flags,
+                                             unsigned priority);
+int pubsub_unsubscribe_(pubsub_topic_t *topic, const pubsub_subscriber_t *sub);
+void pubsub_clear_(pubsub_topic_t *topic);
+typedef int (*pubsub_notify_fn_t)(pubsub_subscriber_t *subscriber,
+                                  void *notify_data);
+int pubsub_notify_(pubsub_topic_t *topic, pubsub_notify_fn_t notify_fn,
+                   void *notify_data, unsigned notify_flags);
+
+#define IMPLEMENT_PUBSUB_TOPIC(notify_linkage, name)                    \
+  static pubsub_topic_t name ## _topic_ = { NULL, 0, 0 };               \
+  const name ## _subscriber_t *                                         \
+  name ## _subscribe(name##_subscriber_fn_t subscriber,                 \
+                     name##_subscriber_data_t *extra_data,              \
+                     unsigned flags,                                    \
+                     unsigned priority)                                 \
+  {                                                                     \
+    const pubsub_subscriber_t *s;                                       \
+    s = pubsub_subscribe_(&name##_topic_,                               \
+                          (pubsub_subscriber_fn_t)subscriber,           \
+                          extra_data,                                   \
+                          flags,                                        \
+                          priority);                                    \
+    return (const name##_subscriber_t *)s;                              \
+  }                                                                     \
+  int                                                                   \
+  name ## _unsubscribe(const name##_subscriber_t *subscriber)           \
+  {                                                                     \
+    return pubsub_unsubscribe_(&name##_topic_,                          \
+                               (const pubsub_subscriber_t *)subscriber); \
+  }                                                                     \
+  static int                                                            \
+  name##_call_the_notify_fn_(pubsub_subscriber_t *subscriber,           \
+                             void *notify_data)                         \
+  {                                                                     \
+    name ## _subscriber_fn_t fn;                                        \
+    fn = (name ## _subscriber_fn_t) subscriber->fn;                     \
+    return fn(notify_data, subscriber->subscriber_data);                \
+  }                                                                     \
+  notify_linkage int                                                    \
+  name ## _notify(name ## _event_data_t *event_data, unsigned flags)    \
+  {                                                                     \
+    return pubsub_notify_(&name##_topic_,                               \
+                          name##_call_the_notify_fn_,                   \
+                          event_data,                                   \
+                          flags);                                       \
+  }                                                                     \
+  notify_linkage void                                                   \
+  name ## _clear(void)                                                  \
+  {                                                                     \
+    pubsub_clear_(&name##_topic_);                                      \
+  }
+
+#endif /* TOR_PUBSUB_H */
+
diff --git a/src/test/include.am b/src/test/include.am
index 7d80fdf..f1f9047 100644
--- a/src/test/include.am
+++ b/src/test/include.am
@@ -97,6 +97,7 @@ src_test_test_SOURCES = \
 	src/test/test_policy.c \
 	src/test/test_procmon.c \
 	src/test/test_pt.c \
+	src/test/test_pubsub.c \
 	src/test/test_relay.c \
 	src/test/test_relaycell.c \
 	src/test/test_rendcache.c \
diff --git a/src/test/test.c b/src/test/test.c
index ed167a3..dcbaf5b 100644
--- a/src/test/test.c
+++ b/src/test/test.c
@@ -1159,6 +1159,7 @@ extern struct testcase_t oom_tests[];
 extern struct testcase_t options_tests[];
 extern struct testcase_t policy_tests[];
 extern struct testcase_t procmon_tests[];
+extern struct testcase_t pubsub_tests[];
 extern struct testcase_t pt_tests[];
 extern struct testcase_t relay_tests[];
 extern struct testcase_t relaycell_tests[];
@@ -1230,6 +1231,7 @@ struct testgroup_t testgroups[] = {
   { "util/format/", util_format_tests },
   { "util/logging/", logging_tests },
   { "util/process/", util_process_tests },
+  { "util/pubsub/", pubsub_tests },
   { "util/thread/", thread_tests },
   { "dns/", dns_tests },
   END_OF_GROUPS
diff --git a/src/test/test_pubsub.c b/src/test/test_pubsub.c
new file mode 100644
index 0000000..547d6c6
--- /dev/null
+++ b/src/test/test_pubsub.c
@@ -0,0 +1,85 @@
+/* Copyright (c) 2016, The Tor Project, Inc. */
+/* See LICENSE for licensing information */
+
+/**
+ * \file test_pubsub.c
+ * \brief Unit tests for publish-subscribe abstraction.
+ **/
+
+#include "or.h"
+#include "test.h"
+#include "pubsub.h"
+
+DECLARE_PUBSUB_STRUCT_TYPES(foobar)
+DECLARE_PUBSUB_TOPIC(foobar)
+DECLARE_NOTIFY_PUBSUB_TOPIC(static, foobar)
+IMPLEMENT_PUBSUB_TOPIC(static, foobar)
+
+struct foobar_event_data_t {
+  unsigned u;
+  const char *s;
+};
+
+struct foobar_subscriber_data_t {
+  const char *name;
+  long l;
+};
+
+static int
+foobar_sub1(foobar_event_data_t *ev, foobar_subscriber_data_t *mine)
+{
+  ev->u += 10;
+  mine->l += 100;
+  return 0;
+}
+
+static int
+foobar_sub2(foobar_event_data_t *ev, foobar_subscriber_data_t *mine)
+{
+  ev->u += 5;
+  mine->l += 50;
+  return 0;
+}
+
+static void
+test_pubsub_basic(void *arg)
+{
+  (void)arg;
+  foobar_subscriber_data_t subdata1 = { "hi", 0 };
+  foobar_subscriber_data_t subdata2 = { "wow", 0 };
+  const foobar_subscriber_t *sub1;
+  const foobar_subscriber_t *sub2;
+  foobar_event_data_t ed = { 0, "x" };
+  foobar_event_data_t ed2 = { 0, "y" };
+  sub1 = foobar_subscribe(foobar_sub1, &subdata1, SUBSCRIBE_ATSTART, 100);
+  tt_assert(sub1);
+
+  foobar_notify(&ed, 0);
+  tt_int_op(subdata1.l, OP_EQ, 100);
+  tt_int_op(subdata2.l, OP_EQ, 0);
+  tt_int_op(ed.u, OP_EQ, 10);
+
+  sub2 = foobar_subscribe(foobar_sub2, &subdata2, 0, 5);
+  tt_assert(sub2);
+
+  foobar_notify(&ed2, 0);
+  tt_int_op(subdata1.l, OP_EQ, 200);
+  tt_int_op(subdata2.l, OP_EQ, 50);
+  tt_int_op(ed2.u, OP_EQ, 15);
+
+  foobar_unsubscribe(sub1);
+
+  foobar_notify(&ed, 0);
+  tt_int_op(subdata1.l, OP_EQ, 200);
+  tt_int_op(subdata2.l, OP_EQ, 100);
+  tt_int_op(ed.u, OP_EQ, 15);
+
+ done:
+  foobar_clear();
+}
+
+struct testcase_t pubsub_tests[] = {
+  { "pubsub_basic", test_pubsub_basic, TT_FORK, NULL, NULL },
+  END_OF_TESTCASES
+};
+





More information about the tor-commits mailing list