[tor-commits] [tor/master] Test and fix workqueue_entry_cancel().

nickm at torproject.org nickm at torproject.org
Wed Jan 21 19:50:31 UTC 2015


commit e5f8c772f4c468a20da8b9176c2b276ac76bbe78
Author: Nick Mathewson <nickm at torproject.org>
Date:   Sat Sep 28 00:33:10 2013 -0400

    Test and fix workqueue_entry_cancel().
---
 src/common/workqueue.c    |   12 ++++---
 src/common/workqueue.h    |    2 +-
 src/test/test_workqueue.c |   77 ++++++++++++++++++++++++++++++++++++---------
 3 files changed, 71 insertions(+), 20 deletions(-)

diff --git a/src/common/workqueue.c b/src/common/workqueue.c
index 9293e1f..44cf98d 100644
--- a/src/common/workqueue.c
+++ b/src/common/workqueue.c
@@ -119,27 +119,29 @@ workqueue_entry_free(workqueue_entry_t *ent)
  * executed in the main thread; that will cause undefined behavior (probably,
  * a crash).
  *
- * If the work is cancelled, this function return 1. It is the caller's
- * responsibility to free any storage in the work function's arguments.
+ * If the work is cancelled, this function returns the argument passed to the
+ * work function. It is the caller's responsibility to free this storage.
  *
  * This function will have no effect if the worker thread has already executed
- * or begun to execute the work item.  In that case, it will return 0.
+ * or begun to execute the work item.  In that case, it will return NULL.
  */
-int
+void *
 workqueue_entry_cancel(workqueue_entry_t *ent)
 {
   int cancelled = 0;
+  void *result = NULL;
   tor_mutex_acquire(&ent->on_thread->lock);
   if (ent->pending) {
     TOR_TAILQ_REMOVE(&ent->on_thread->work, ent, next_work);
     cancelled = 1;
+    result = ent->arg;
   }
   tor_mutex_release(&ent->on_thread->lock);
 
   if (cancelled) {
     tor_free(ent);
   }
-  return cancelled;
+  return result;
 }
 
 /**
diff --git a/src/common/workqueue.h b/src/common/workqueue.h
index 5a6cd80..ec1f7c9 100644
--- a/src/common/workqueue.h
+++ b/src/common/workqueue.h
@@ -32,7 +32,7 @@ int threadpool_queue_for_all(threadpool_t *pool,
                              int (*fn)(void *, void *),
                              void (*reply_fn)(void *),
                              void *arg);
-int workqueue_entry_cancel(workqueue_entry_t *pending_work);
+void *workqueue_entry_cancel(workqueue_entry_t *pending_work);
 threadpool_t *threadpool_new(int n_threads,
                              replyqueue_t *replyqueue,
                              void *(*new_thread_state_fn)(void*),
diff --git a/src/test/test_workqueue.c b/src/test/test_workqueue.c
index cbf9d81..6de6f03 100644
--- a/src/test/test_workqueue.c
+++ b/src/test/test_workqueue.c
@@ -23,6 +23,7 @@ static int opt_n_threads = 8;
 static int opt_n_items = 10000;
 static int opt_n_inflight = 1000;
 static int opt_n_lowwater = 250;
+static int opt_n_cancel = 0;
 static int opt_ratio_rsa = 5;
 
 #ifdef TRACK_RESPONSES
@@ -172,27 +173,68 @@ handle_reply(void *arg)
   ++n_received;
 }
 
-static int
+static workqueue_entry_t *
 add_work(threadpool_t *tp)
 {
   int add_rsa =
     opt_ratio_rsa == 0 ||
     tor_weak_random_range(&weak_rng, opt_ratio_rsa) == 0;
+
   if (add_rsa) {
     rsa_work_t *w = tor_malloc_zero(sizeof(*w));
     w->serial = n_sent++;
     crypto_rand((char*)w->msg, 20);
     w->msglen = 20;
     ++rsa_sent;
-    return threadpool_queue_work(tp, workqueue_do_rsa, handle_reply, w) != NULL;
+    return threadpool_queue_work(tp, workqueue_do_rsa, handle_reply, w);
   } else {
     ecdh_work_t *w = tor_malloc_zero(sizeof(*w));
     w->serial = n_sent++;
     /* Not strictly right, but this is just for benchmarks. */
     crypto_rand((char*)w->u.pk.public_key, 32);
     ++ecdh_sent;
-    return threadpool_queue_work(tp, workqueue_do_ecdh, handle_reply, w) != NULL;
+    return threadpool_queue_work(tp, workqueue_do_ecdh, handle_reply, w);
+  }
+}
+
+static int n_failed_cancel = 0;
+static int n_successful_cancel = 0;
+
+static int
+add_n_work_items(threadpool_t *tp, int n)
+{
+  int n_queued = 0;
+  int n_try_cancel = 0, i;
+  workqueue_entry_t **to_cancel;
+  workqueue_entry_t *ent;
+
+  to_cancel = tor_malloc(sizeof(workqueue_entry_t*) * opt_n_cancel);
+
+  while (n_queued++ < n) {
+    ent = add_work(tp);
+    if (! ent) {
+      puts("Couldn't add work.");
+      tor_event_base_loopexit(tor_libevent_get_base(), NULL);
+      return -1;
+    }
+    if (n_try_cancel < opt_n_cancel &&
+        tor_weak_random_range(&weak_rng, n) < opt_n_cancel) {
+      to_cancel[n_try_cancel++] = ent;
+    }
+  }
+
+  for (i = 0; i < n_try_cancel; ++i) {
+    void *work = workqueue_entry_cancel(to_cancel[i]);
+    if (! work) {
+      n_failed_cancel++;
+    } else {
+      n_successful_cancel++;
+      tor_free(work);
+    }
   }
+
+  tor_free(to_cancel);
+  return 0;
 }
 
 static int shutting_down = 0;
@@ -223,8 +265,13 @@ replysock_readable_cb(tor_socket_t sock, short what, void *arg)
   if (old_r == n_received)
     return;
 
-  if (opt_verbose)
-    printf("%d / %d\n", n_received, n_sent);
+  if (opt_verbose) {
+    printf("%d / %d", n_received, n_sent);
+    if (opt_n_cancel)
+      printf(" (%d cancelled, %d uncancellable)",
+             n_successful_cancel, n_failed_cancel);
+    puts("");
+  }
 #ifdef TRACK_RESPONSES
   tor_mutex_acquire(&bitmap_mutex);
   for (i = 0; i < opt_n_items; ++i) {
@@ -239,16 +286,14 @@ replysock_readable_cb(tor_socket_t sock, short what, void *arg)
   tor_mutex_release(&bitmap_mutex);
 #endif
 
-  if (n_sent - n_received < opt_n_lowwater) {
-    while (n_sent < n_received + opt_n_inflight && n_sent < opt_n_items) {
-      if (! add_work(tp)) {
-        puts("Couldn't add work.");
-        tor_event_base_loopexit(tor_libevent_get_base(), NULL);
-      }
-    }
+  if (n_sent - (n_received+n_successful_cancel) < opt_n_lowwater) {
+    int n_to_send = n_received + opt_n_inflight - n_sent;
+    if (n_to_send > opt_n_items - n_sent)
+      n_to_send = opt_n_items - n_sent;
+    add_n_work_items(tp, n_to_send);
   }
 
-  if (shutting_down == 0 && n_received == n_sent && n_sent >= opt_n_items) {
+  if (shutting_down == 0 && n_received+n_successful_cancel == n_sent && n_sent >= opt_n_items) {
     shutting_down = 1;
     threadpool_queue_for_all(tp, NULL, workqueue_do_shutdown, shutdown_reply, NULL);
   }
@@ -263,6 +308,7 @@ help(void)
      "    -T <threads>  Use this many threads\n"
      "    -I <inflight> Have no more than this many requests queued at once\n"
      "    -L <lowwater> Add items whenever fewer than this many are pending\n"
+     "    -C <cancel>   Try to cancel N items of every batch that we add\n"
      "    -R <ratio>    Make one out of this many items be a slow (RSA) one\n"
      "    --no-{eventfd2,eventfd,pipe2,pipe,socketpair}\n"
      "                  Disable one of the alert_socket backends.");
@@ -291,6 +337,8 @@ main(int argc, char **argv)
       opt_n_lowwater = atoi(argv[++i]);
     } else if (!strcmp(argv[i], "-R") && i+1<argc) {
       opt_ratio_rsa = atoi(argv[++i]);
+    } else if (!strcmp(argv[i], "-C") && i+1<argc) {
+      opt_n_cancel = atoi(argv[++i]);
     } else if (!strcmp(argv[i], "--no-eventfd2")) {
       as_flags |= ASOCKS_NOEVENTFD2;
     } else if (!strcmp(argv[i], "--no-eventfd")) {
@@ -311,6 +359,7 @@ main(int argc, char **argv)
   }
   if (opt_n_threads < 1 ||
       opt_n_items < 1 || opt_n_inflight < 1 || opt_n_lowwater < 0 ||
+      opt_n_cancel > opt_n_inflight ||
       opt_ratio_rsa < 0) {
     help();
     return 1;
@@ -358,7 +407,7 @@ main(int argc, char **argv)
 
   event_base_loop(tor_libevent_get_base(), 0);
 
-  if (n_sent != opt_n_items || n_received != n_sent ||
+  if (n_sent != opt_n_items || n_received+n_successful_cancel != n_sent ||
       n_shutdowns_done != opt_n_threads) {
     puts("FAIL");
     return 1;





More information about the tor-commits mailing list