commit bdeaf7d4b2929609c4d3f2ce9adfd973361ef578 Author: Nick Mathewson nickm@torproject.org Date: Tue Jan 15 10:27:39 2019 -0500
Code to manage publish/subscribe setup via subsystem interface.
This commit has the necessary logic to run the publish/subscribe system from the mainloop, and to initialize it on startup and tear it down later. --- src/app/main/subsysmgr.c | 52 ++++++++++++- src/app/main/subsysmgr.h | 5 ++ src/core/include.am | 2 + src/core/mainloop/mainloop_pubsub.c | 149 ++++++++++++++++++++++++++++++++++++ src/core/mainloop/mainloop_pubsub.h | 23 ++++++ src/lib/subsys/subsys.h | 4 +- 6 files changed, 232 insertions(+), 3 deletions(-)
diff --git a/src/app/main/subsysmgr.c b/src/app/main/subsysmgr.c index abd2edd10..91a567ce0 100644 --- a/src/app/main/subsysmgr.c +++ b/src/app/main/subsysmgr.c @@ -5,9 +5,14 @@
#include "orconfig.h" #include "app/main/subsysmgr.h" -#include "lib/err/torerr.h"
+#include "lib/dispatch/dispatch_naming.h" +#include "lib/dispatch/msgtypes.h" +#include "lib/err/torerr.h" #include "lib/log/log.h" +#include "lib/malloc/malloc.h" +#include "lib/pubsub/pubsub_build.h" +#include "lib/pubsub/pubsub_connect.h"
#include <stdio.h> #include <stdlib.h> @@ -106,6 +111,51 @@ subsystems_init_upto(int target_level) }
/** + * Add publish/subscribe relationships to <b>builder</b> for all + * initialized subsystems of level no more than <b>target_level</b>. + **/ +int +subsystems_add_pubsub_upto(pubsub_builder_t *builder, + int target_level) +{ + for (unsigned i = 0; i < n_tor_subsystems; ++i) { + const subsys_fns_t *sys = tor_subsystems[i]; + if (!sys->supported) + continue; + if (sys->level > target_level) + break; + if (! sys_initialized[i]) + continue; + int r = 0; + if (sys->add_pubsub) { + subsys_id_t sysid = get_subsys_id(sys->name); + raw_assert(sysid != ERROR_ID); + pubsub_connector_t *connector; + connector = pubsub_connector_for_subsystem(builder, sysid); + r = sys->add_pubsub(connector); + pubsub_connector_free(connector); + } + if (r < 0) { + fprintf(stderr, "BUG: subsystem %s (at %u) could not connect to " + "publish/subscribe system.", sys->name, sys->level); + raw_assert_unreached_msg("A subsystem couldn't be connected."); + } + } + + return 0; +} + +/** + * Add publish/subscribe relationships to <b>builder</b> for all + * initialized subsystems. + **/ +int +subsystems_add_pubsub(pubsub_builder_t *builder) +{ + return subsystems_add_pubsub_upto(builder, MAX_SUBSYS_LEVEL); +} + +/** * Shut down all the subsystems. **/ void diff --git a/src/app/main/subsysmgr.h b/src/app/main/subsysmgr.h index 4b3cad62a..4878cf8c3 100644 --- a/src/app/main/subsysmgr.h +++ b/src/app/main/subsysmgr.h @@ -14,6 +14,11 @@ extern const unsigned n_tor_subsystems; int subsystems_init(void); int subsystems_init_upto(int level);
+struct pubsub_builder_t; +int subsystems_add_pubsub_upto(struct pubsub_builder_t *builder, + int target_level); +int subsystems_add_pubsub(struct pubsub_builder_t *builder); + void subsystems_shutdown(void); void subsystems_shutdown_downto(int level);
diff --git a/src/core/include.am b/src/core/include.am index ae47c75e0..3a0e907ed 100644 --- a/src/core/include.am +++ b/src/core/include.am @@ -22,6 +22,7 @@ LIBTOR_APP_A_SOURCES = \ src/core/mainloop/connection.c \ src/core/mainloop/cpuworker.c \ src/core/mainloop/mainloop.c \ + src/core/mainloop/mainloop_pubsub.c \ src/core/mainloop/netstatus.c \ src/core/mainloop/periodic.c \ src/core/or/address_set.c \ @@ -213,6 +214,7 @@ noinst_HEADERS += \ src/core/mainloop/connection.h \ src/core/mainloop/cpuworker.h \ src/core/mainloop/mainloop.h \ + src/core/mainloop/mainloop_pubsub.h \ src/core/mainloop/netstatus.h \ src/core/mainloop/periodic.h \ src/core/or/addr_policy_st.h \ diff --git a/src/core/mainloop/mainloop_pubsub.c b/src/core/mainloop/mainloop_pubsub.c new file mode 100644 index 000000000..ab3614ae0 --- /dev/null +++ b/src/core/mainloop/mainloop_pubsub.c @@ -0,0 +1,149 @@ +/* Copyright (c) 2001, Matej Pfajfar. + * Copyright (c) 2001-2004, Roger Dingledine. + * Copyright (c) 2004-2006, Roger Dingledine, Nick Mathewson. + * Copyright (c) 2007-2018, The Tor Project, Inc. */ +/* See LICENSE for licensing information */ + +#include "orconfig.h" + +#include "src/core/or/or.h" +#include "src/core/mainloop/mainloop.h" +#include "src/core/mainloop/mainloop_pubsub.h" + +#include "lib/container/smartlist.h" +#include "lib/dispatch/dispatch.h" +#include "lib/dispatch/dispatch_naming.h" +#include "lib/evloop/compat_libevent.h" +#include "lib/pubsub/pubsub.h" +#include "lib/pubsub/pubsub_build.h" + +/** + * Dispatcher to use for delivering messages. + **/ +static dispatch_t *the_dispatcher = NULL; +static pubsub_items_t *the_pubsub_items = NULL; +/** + * A list of mainloop_event_t, indexed by channel ID, to flush the messages + * on a channel. + **/ +static smartlist_t *alert_events = NULL; + +/** + * Mainloop event callback: flush all the messages in a channel. + * + * The channel is encoded as a pointer, and passed via arg. + **/ +static void +flush_channel_event(mainloop_event_t *ev, void *arg) +{ + (void)ev; + if (!the_dispatcher) + return; + + channel_id_t chan = (channel_id_t)(uintptr_t)(arg); + dispatch_flush(the_dispatcher, chan, INT_MAX); +} + +int +tor_mainloop_connect_pubsub(struct pubsub_builder_t *builder) +{ + int rv = -1; + tor_mainloop_disconnect_pubsub(); + + the_dispatcher = pubsub_builder_finalize(builder, &the_pubsub_items); + if (! the_dispatcher) + goto err; + + const size_t num_channels = get_num_channel_ids(); + alert_events = smartlist_new(); + for (size_t i = 0; i < num_channels; ++i) { + smartlist_add(alert_events, + mainloop_event_postloop_new(flush_channel_event, + (void*)(uintptr_t)(i))); + } + + rv = 0; + err: + tor_mainloop_disconnect_pubsub(); + return rv; +} + +/** + * Dispatch alertfn callback: do nothing. Implements DELIV_NEVER. + **/ +static void +alertfn_never(dispatch_t *d, channel_id_t chan, void *arg) +{ + (void)d; + (void)chan; + (void)arg; +} + +/** + * Dispatch alertfn callback: activate a mainloop event. Implements + * DELIV_PROMPT. + **/ +static void +alertfn_prompt(dispatch_t *d, channel_id_t chan, void *arg) +{ + (void)d; + (void)chan; + mainloop_event_t *event = arg; + mainloop_event_activate(event); +} + +/** + * Dispatch alertfn callback: flush all messages right now. Implements + * DELIV_IMMEDIATE. + **/ +static void +alertfn_immediate(dispatch_t *d, channel_id_t chan, void *arg) +{ + (void) arg; + dispatch_flush(d, chan, INT_MAX); +} + +/** + * Set the strategy to be used for delivering messages on the named channel. + **/ +int +tor_mainloop_set_delivery_strategy(const char *msg_channel_name, + deliv_strategy_t strategy) +{ + channel_id_t chan = get_channel_id(msg_channel_name); + if (BUG(chan == ERROR_ID) || + BUG(chan >= smartlist_len(alert_events))) + return -1; + + switch (strategy) { + case DELIV_NEVER: + dispatch_set_alert_fn(the_dispatcher, chan, alertfn_never, NULL); + break; + case DELIV_PROMPT: + dispatch_set_alert_fn(the_dispatcher, chan, alertfn_prompt, + smartlist_get(alert_events, chan)); + break; + case DELIV_IMMEDIATE: + dispatch_set_alert_fn(the_dispatcher, chan, alertfn_immediate, NULL); + break; + } + return 0; +} + +/** + * Remove all pubsub dispatchers and events from the mainloop. + **/ +void +tor_mainloop_disconnect_pubsub(void) +{ + if (the_pubsub_items) { + pubsub_items_clear_bindings(the_pubsub_items); + pubsub_items_free(the_pubsub_items); + } + if (alert_events) { + SMARTLIST_FOREACH(alert_events, mainloop_event_t *, ev, + mainloop_event_free(ev)); + smartlist_free(alert_events); + } + dispatch_free(the_dispatcher); +} diff --git a/src/core/mainloop/mainloop_pubsub.h b/src/core/mainloop/mainloop_pubsub.h new file mode 100644 index 000000000..6eff77842 --- /dev/null +++ b/src/core/mainloop/mainloop_pubsub.h @@ -0,0 +1,23 @@ +/* Copyright (c) 2001, Matej Pfajfar. + * Copyright (c) 2001-2004, Roger Dingledine. + * Copyright (c) 2004-2006, Roger Dingledine, Nick Mathewson. + * Copyright (c) 2007-2018, The Tor Project, Inc. */ +/* See LICENSE for licensing information */ + +#ifndef TOR_MAINLOOP_PUBSUB_H +#define TOR_MAINLOOP_PUBSUB_H + +struct pubsub_builder_t; + +typedef enum { + DELIV_NEVER=0, + DELIV_PROMPT, + DELIV_IMMEDIATE, +} deliv_strategy_t; + +int tor_mainloop_connect_pubsub(struct pubsub_builder_t *builder); +int tor_mainloop_set_delivery_strategy(const char *msg_channel_name, + deliv_strategy_t strategy); +void tor_mainloop_disconnect_pubsub(void); + +#endif diff --git a/src/lib/subsys/subsys.h b/src/lib/subsys/subsys.h index 2452ec6e2..6f1710c71 100644 --- a/src/lib/subsys/subsys.h +++ b/src/lib/subsys/subsys.h @@ -8,7 +8,7 @@
#include <stdbool.h>
-struct dispatch_connector_t; +struct pubsub_connector_t;
/** * A subsystem is a part of Tor that is initialized, shut down, configured, @@ -58,7 +58,7 @@ typedef struct subsys_fns_t { /** * Connect a subsystem to the message dispatch system. **/ - int (*add_pubsub)(struct dispatch_connector_t *); + int (*add_pubsub)(struct pubsub_connector_t *);
/** * Perform any necessary pre-fork cleanup. This function may not fail.