[tor-commits] [tor/master] Add a way to tell all threads to do something.

nickm at torproject.org nickm at torproject.org
Wed Jan 21 19:50:31 UTC 2015


commit 4abbf13f99dac9e15856dc4e458a8c9525acab4d
Author: Nick Mathewson <nickm at torproject.org>
Date:   Tue Sep 24 20:55:09 2013 -0400

    Add a way to tell all threads to do something.
---
 src/common/workqueue.c |   57 +++++++++++++++++++++++++++++++++++++-----------
 src/common/workqueue.h |    6 ++++-
 2 files changed, 49 insertions(+), 14 deletions(-)

diff --git a/src/common/workqueue.c b/src/common/workqueue.c
index ed70d5e..c4b64de 100644
--- a/src/common/workqueue.c
+++ b/src/common/workqueue.c
@@ -186,13 +186,32 @@ workerthread_new(void *state, replyqueue_t *replyqueue)
   return thr;
 }
 
+static workqueue_entry_t *
+workerthread_queue_work(workerthread_t *worker,
+                        int (*fn)(void *, void *),
+                        void (*reply_fn)(void *),
+                        void *arg)
+{
+  workqueue_entry_t *ent = workqueue_entry_new(fn, reply_fn, arg);
+
+  tor_mutex_acquire(&worker->lock);
+  ent->on_thread = worker;
+  ent->pending = 1;
+  TOR_TAILQ_INSERT_TAIL(&worker->work, ent, next_work);
+
+  if (worker->waiting) /* XXXX inside or outside of lock?? */
+    tor_cond_signal_one(&worker->condition);
+
+  tor_mutex_release(&worker->lock);
+  return ent;
+}
+
 workqueue_entry_t *
 threadpool_queue_work(threadpool_t *pool,
                       int (*fn)(void *, void *),
                       void (*reply_fn)(void *),
                       void *arg)
 {
-  workqueue_entry_t *ent;
   workerthread_t *worker;
 
   tor_mutex_acquire(&pool->lock);
@@ -205,22 +224,34 @@ threadpool_queue_work(threadpool_t *pool,
     pool->next_for_work = 0;
   tor_mutex_release(&pool->lock);
 
-  ent = workqueue_entry_new(fn, reply_fn, arg);
-
-  tor_mutex_acquire(&worker->lock);
-  ent->on_thread = worker;
-  ent->pending = 1;
-  TOR_TAILQ_INSERT_TAIL(&worker->work, ent, next_work);
-
-  if (worker->waiting) /* XXXX inside or outside of lock?? */
-    tor_cond_signal_one(&worker->condition);
+  return workerthread_queue_work(worker, fn, reply_fn, arg);
+}
 
-  tor_mutex_release(&worker->lock);
+int
+threadpool_queue_for_all(threadpool_t *pool,
+                         void *(*dup_fn)(void *),
+                         int (*fn)(void *, void *),
+                         void (*reply_fn)(void *),
+                         void *arg)
+{
+  int i = 0;
+  workerthread_t *worker;
+  void *arg_copy;
+  while (1) {
+    tor_mutex_acquire(&pool->lock);
+    if (i >= pool->n_threads) {
+      tor_mutex_release(&pool->lock);
+      return 0;
+    }
+    worker = pool->threads[i++];
+    tor_mutex_release(&pool->lock);
 
-  return ent;
+    arg_copy = dup_fn ? dup_fn(arg) : arg;
+    /* CHECK*/ workerthread_queue_work(worker, fn, reply_fn, arg_copy);
+  }
 }
 
-int
+static int
 threadpool_start_threads(threadpool_t *pool, int n)
 {
   tor_mutex_acquire(&pool->lock);
diff --git a/src/common/workqueue.h b/src/common/workqueue.h
index 47753cf..684fb19 100644
--- a/src/common/workqueue.h
+++ b/src/common/workqueue.h
@@ -21,8 +21,12 @@ workqueue_entry_t *threadpool_queue_work(threadpool_t *pool,
                                          int (*fn)(void *, void *),
                                          void (*reply_fn)(void *),
                                          void *arg);
+int threadpool_queue_for_all(threadpool_t *pool,
+                             void *(*dup_fn)(void *),
+                             int (*fn)(void *, void *),
+                             void (*reply_fn)(void *),
+                             void *arg);
 int workqueue_entry_cancel(workqueue_entry_t *pending_work);
-int threadpool_start_threads(threadpool_t *pool, int n);
 threadpool_t *threadpool_new(int n_threads,
                              replyqueue_t *replyqueue,
                              void *(*new_thread_state_fn)(void*),





More information about the tor-commits mailing list