commit 9ec94f1d22a87fbf86f721c6106f78f85b8be33c
Author: Nick Mathewson <nickm@torproject.org>
Date:   Wed Aug 12 10:49:19 2015 -0400

    Use thread-local storage to block event_queue recursion.
---
 src/common/compat_pthreads.c   | 27 +++++++++++++++++++++++
 src/common/compat_threads.h    | 13 +++++++++++
 src/common/compat_winthreads.c | 27 +++++++++++++++++++++++
 src/or/control.c               | 47 +++++++++++++++++++++++++---------------
 src/test/test_threads.c        | 14 +++++++-----
 5 files changed, 104 insertions(+), 24 deletions(-)

diff --git a/src/common/compat_pthreads.c b/src/common/compat_pthreads.c index b78ab3d..4b32fc9 100644 --- a/src/common/compat_pthreads.c +++ b/src/common/compat_pthreads.c @@ -275,6 +275,33 @@ tor_cond_signal_all(tor_cond_t *cond) pthread_cond_broadcast(&cond->cond); }
+int +tor_threadlocal_init(tor_threadlocal_t *threadlocal) +{ + int err = pthread_key_create(&threadlocal->key, NULL); + return err ? -1 : 0; +} + +void +tor_threadlocal_destroy(tor_threadlocal_t *threadlocal) +{ + pthread_key_delete(threadlocal->key); + memset(threadlocal, 0, sizeof(tor_threadlocal_t)); +} + +void * +tor_threadlocal_get(tor_threadlocal_t *threadlocal) +{ + return pthread_getspecific(threadlocal->key); +} + +void +tor_threadlocal_set(tor_threadlocal_t *threadlocal, void *value) +{ + int err = pthread_setspecific(threadlocal->key, value); + tor_assert(err == 0); +} + /** Set up common structures for use by threading. */ void tor_threads_init(void) diff --git a/src/common/compat_threads.h b/src/common/compat_threads.h index acf3083..a1b5056 100644 --- a/src/common/compat_threads.h +++ b/src/common/compat_threads.h @@ -111,5 +111,18 @@ typedef struct alert_sockets_s { int alert_sockets_create(alert_sockets_t *socks_out, uint32_t flags); void alert_sockets_close(alert_sockets_t *socks);
+typedef struct tor_threadlocal_s { +#ifdef _WIN32 + DWORD index; +#else + pthread_key_t key; +#endif +} tor_threadlocal_t; + +int tor_threadlocal_init(tor_threadlocal_t *threadlocal); +void tor_threadlocal_destroy(tor_threadlocal_t *threadlocal); +void *tor_threadlocal_get(tor_threadlocal_t *threadlocal); +void tor_threadlocal_set(tor_threadlocal_t *threadlocal, void *value); + #endif
diff --git a/src/common/compat_winthreads.c b/src/common/compat_winthreads.c index 71b994c..3d9e236 100644 --- a/src/common/compat_winthreads.c +++ b/src/common/compat_winthreads.c @@ -123,6 +123,33 @@ tor_cond_signal_all(tor_cond_t *cond) }
int +tor_threadlocal_init(tor_threadlocal_t *threadlocal) +{ + threadlocal->index = TlsAlloc(); + return (threadlocal->index == TLS_OUT_OF_INDEXES) ? -1 : 0; +} + +void +tor_threadlocal_destroy(tor_threadlocal_t *threadlocal) +{ + TlsFree(threadlocal->index); + memset(threadlocal, 0, sizeof(tor_threadlocal_t)); +} + +void * +tor_threadlocal_get(tor_threadlocal_t *threadlocal) +{ + return TlsGetValue(threadlocal->index); +} + +void +tor_threadlocal_set(tor_threadlocal_t *threadlocal, void *value) +{ + BOOL ok = TlsSetValue(threadlocal->index, value); + tor_assert(ok); +} + +int tor_cond_wait(tor_cond_t *cond, tor_mutex_t *lock_, const struct timeval *tv) { CRITICAL_SECTION *lock = &lock_->mutex; diff --git a/src/or/control.c b/src/or/control.c index 2be99d5..c805465 100644 --- a/src/or/control.c +++ b/src/or/control.c @@ -594,9 +594,9 @@ typedef struct queued_event_s { char *msg; } queued_event_t;
-/** If this is greater than 0, we don't allow new events to be queued. - * XXXX This should be thread-local. */ -static int block_event_queue = 0; +/** Pointer to int. If this is greater than 0, we don't allow new events to be + * queued. */ +static tor_threadlocal_t block_event_queue;
/** Holds a smartlist of queued_event_t objects that may need to be sent * to one or more controllers */ @@ -631,9 +631,21 @@ control_initialize_event_queue(void)
if (queued_control_events_lock == NULL) { queued_control_events_lock = tor_mutex_new(); + tor_threadlocal_init(&block_event_queue); } }
+static int * +get_block_event_queue(void) +{ + int *val = tor_threadlocal_get(&block_event_queue); + if (PREDICT_UNLIKELY(val == NULL)) { + val = tor_malloc_zero(sizeof(int)); + tor_threadlocal_set(&block_event_queue, val); + } + return val; +} + /** Helper: inserts an event on the list of events queued to be sent to * one or more controllers, and schedules the events to be flushed if needed. * @@ -655,21 +667,20 @@ queue_control_event_string,(uint16_t event, char *msg)) return; }
- queued_event_t *ev = tor_malloc(sizeof(*ev)); - ev->event = event; - ev->msg = msg; - - tor_mutex_acquire(queued_control_events_lock); - if (block_event_queue) { /* XXXX This should be thread-specific. */ - tor_mutex_release(queued_control_events_lock); + int *block_event_queue = get_block_event_queue(); + if (*block_event_queue) { tor_free(msg); - tor_free(ev); return; }
+ queued_event_t *ev = tor_malloc(sizeof(*ev)); + ev->event = event; + ev->msg = msg; + /* No queueing an event while queueing an event */ - ++block_event_queue; + ++*block_event_queue;
+ tor_mutex_acquire(queued_control_events_lock); tor_assert(queued_control_events); smartlist_add(queued_control_events, ev);
@@ -679,10 +690,10 @@ queue_control_event_string,(uint16_t event, char *msg)) flush_queued_event_pending = 1; }
- --block_event_queue; - tor_mutex_release(queued_control_events_lock);
+ --*block_event_queue; + /* We just put an event on the queue; mark the queue to be * flushed. We only do this from the main thread for now; otherwise, * we'd need to incur locking overhead in Libevent or use a socket. @@ -718,9 +729,11 @@ queued_events_flush_all(int force) smartlist_t *controllers = smartlist_new(); smartlist_t *queued_events;
+ int *block_event_queue = get_block_event_queue(); + ++*block_event_queue; + tor_mutex_acquire(queued_control_events_lock); /* No queueing an event while flushing events. */ - ++block_event_queue; flush_queued_event_pending = 0; queued_events = queued_control_events; queued_control_events = smartlist_new(); @@ -760,9 +773,7 @@ queued_events_flush_all(int force) smartlist_free(queued_events); smartlist_free(controllers);
- tor_mutex_acquire(queued_control_events_lock); - --block_event_queue; - tor_mutex_release(queued_control_events_lock); + --*block_event_queue; }
/** Libevent callback: Flushes pending events to controllers that are diff --git a/src/test/test_threads.c b/src/test/test_threads.c index 2ac08d4..35f5dc8 100644 --- a/src/test/test_threads.c +++ b/src/test/test_threads.c @@ -28,7 +28,7 @@ static unsigned long thread_fn_tid1, thread_fn_tid2; static void thread_test_func_(void* _s) ATTR_NORETURN;
/** How many iterations have the threads in the unit test run? */ -static int t1_count = 0, t2_count = 0; +static tor_threadlocal_t count;
/** Helper function for threading unit tests: This function runs in a * subthread. It grabs its own mutex (start1 or start2) to make sure that it @@ -38,19 +38,19 @@ static void thread_test_func_(void* _s) { char *s = _s; - int i, *count; + int i; tor_mutex_t *m; char buf[64]; char **cp; + int *mycount = tor_malloc_zero(sizeof(int)); + tor_threadlocal_set(&count, mycount); if (!strcmp(s, "thread 1")) { m = thread_test_start1_; cp = &thread1_name_; - count = &t1_count; thread_fn_tid1 = tor_get_thread_id(); } else { m = thread_test_start2_; cp = &thread2_name_; - count = &t2_count; thread_fn_tid2 = tor_get_thread_id(); }
@@ -62,8 +62,10 @@ thread_test_func_(void* _s) for (i=0; i<10000; ++i) { tor_mutex_acquire(thread_test_mutex_); strmap_set(thread_test_strmap_, "last to run", *cp); - ++*count; tor_mutex_release(thread_test_mutex_); + int *tls_count = tor_threadlocal_get(&count); + tor_assert(tls_count == mycount); + ++*tls_count; } tor_mutex_acquire(thread_test_mutex_); strmap_set(thread_test_strmap_, s, *cp); @@ -89,6 +91,7 @@ test_threads_basic(void *arg) tv.tv_usec=100*1000; #endif (void) arg; + tt_int_op(tor_threadlocal_init(&count), OP_EQ, 0);
set_main_thread();
@@ -128,7 +131,6 @@ test_threads_basic(void *arg) tor_mutex_free(thread_test_mutex_);
if (timedout) { - printf("\nTimed out: %d %d", t1_count, t2_count); tt_assert(strmap_get(thread_test_strmap_, "thread 1")); tt_assert(strmap_get(thread_test_strmap_, "thread 2")); tt_assert(!timedout);