
commit 93ad89e9d219d6cea764652a05c236210c7de3fa Author: Nick Mathewson <nickm@torproject.org> Date: Wed Sep 25 11:36:02 2013 -0400 Rename bench_workqueue -> test_workqueue and make it a unit test. --- .gitignore | 2 + src/test/bench_workqueue.c | 288 ------------------------------------- src/test/include.am | 19 +-- src/test/test_workqueue.c | 342 ++++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 355 insertions(+), 296 deletions(-) diff --git a/.gitignore b/.gitignore index 9ddd0c5..e63576c 100644 --- a/.gitignore +++ b/.gitignore @@ -163,10 +163,12 @@ cscope.* /src/test/test-bt-cl /src/test/test-child /src/test/test-ntor-cl +/src/test/test_workqueue /src/test/test.exe /src/test/test-bt-cl.exe /src/test/test-child.exe /src/test/test-ntor-cl.exe +/src/test/test_workqueue.exe # /src/tools/ /src/tools/tor-checkkey diff --git a/src/test/bench_workqueue.c b/src/test/bench_workqueue.c deleted file mode 100644 index f190c61..0000000 --- a/src/test/bench_workqueue.c +++ /dev/null @@ -1,288 +0,0 @@ -/* Copyright (c) 2001-2004, Roger Dingledine. - * Copyright (c) 2004-2006, Roger Dingledine, Nick Mathewson. - * Copyright (c) 2007-2013, The Tor Project, Inc. */ -/* See LICENSE for licensing information */ - -#include "or.h" -#include "compat_threads.h" -#include "onion.h" -#include "workqueue.h" -#include "crypto.h" -#include "crypto_curve25519.h" -#include "compat_libevent.h" - -#include <stdio.h> -#ifdef HAVE_EVENT2_EVENT_H -#include <event2/event.h> -#else -#include <event.h> -#endif - -#ifdef TRACK_RESPONSES -tor_mutex_t bitmap_mutex; -int handled_len; -bitarray_t *handled; -#endif - -#define N_ITEMS 10000 -#define N_INFLIGHT 1000 -#define RELAUNCH_AT 250 - -typedef struct state_s { - int magic; - int n_handled; - crypto_pk_t *rsa; - curve25519_secret_key_t ecdh; -} state_t; - -typedef struct rsa_work_s { - int serial; - uint8_t msg[128]; - uint8_t msglen; -} rsa_work_t; - -typedef struct ecdh_work_s { - int serial; - union { - curve25519_public_key_t pk; - uint8_t msg[32]; - } u; -} ecdh_work_t; - -static void -mark_handled(int serial) -{ -#ifdef TRACK_RESPONSES - tor_mutex_acquire(&bitmap_mutex); - tor_assert(serial < handled_len); - tor_assert(! bitarray_is_set(handled, serial)); - bitarray_set(handled, serial); - tor_mutex_release(&bitmap_mutex); -#else - (void)serial; -#endif -} - -static int -workqueue_do_rsa(void *state, void *work) -{ - rsa_work_t *rw = work; - state_t *st = state; - crypto_pk_t *rsa = st->rsa; - uint8_t sig[256]; - int len; - - tor_assert(st->magic == 13371337); - - len = crypto_pk_private_sign(rsa, (char*)sig, 256, - (char*)rw->msg, rw->msglen); - if (len < 0) { - rw->msglen = 0; - return WQ_RPL_ERROR; - } - - memset(rw->msg, 0, sizeof(rw->msg)); - rw->msglen = len; - memcpy(rw->msg, sig, len); - ++st->n_handled; - - mark_handled(rw->serial); - - return WQ_RPL_REPLY; -} - -#if 0 -static int -workqueue_do_shutdown(void *state, void *work) -{ - (void)state; - (void)work; - (void)cmd; - crypto_pk_free(((state_t*)state)->rsa); - tor_free(state); - return WQ_RPL_SHUTDOWN; -} -#endif - -static int -workqueue_do_ecdh(void *state, void *work) -{ - ecdh_work_t *ew = work; - uint8_t output[CURVE25519_OUTPUT_LEN]; - state_t *st = state; - - tor_assert(st->magic == 13371337); - - curve25519_handshake(output, &st->ecdh, &ew->u.pk); - memcpy(ew->u.msg, output, CURVE25519_OUTPUT_LEN); - ++st->n_handled; - mark_handled(ew->serial); - return WQ_RPL_REPLY; -} - -static void * -new_state(void *arg) -{ - state_t *st; - (void)arg; - - st = tor_malloc(sizeof(*st)); - /* Every thread gets its own keys. not a problem for benchmarking */ - st->rsa = crypto_pk_new(); - if (crypto_pk_generate_key_with_bits(st->rsa, 1024) < 0) { - puts("keygen failed"); - crypto_pk_free(st->rsa); - tor_free(st); - return NULL; - } - curve25519_secret_key_generate(&st->ecdh, 0); - st->magic = 13371337; - return st; -} - -static void -free_state(void *arg) -{ - state_t *st = arg; - crypto_pk_free(st->rsa); - tor_free(st); -} - -static tor_weak_rng_t weak_rng; -static int n_sent = 0; -static int rsa_sent = 0; -static int ecdh_sent = 0; -static int n_received = 0; - -#ifdef TRACK_RESPONSES -bitarray_t *received; -#endif - -static void -handle_reply(void *arg) -{ -#ifdef TRACK_RESPONSES - rsa_work_t *rw = arg; /* Naughty cast, but only looking at serial. */ - tor_assert(! bitarray_is_set(received, rw->serial)); - bitarray_set(received,rw->serial); -#endif - - tor_free(arg); - ++n_received; -} - -static int -add_work(threadpool_t *tp) -{ - int add_rsa = tor_weak_random_range(&weak_rng, 5) == 0; - if (add_rsa) { - rsa_work_t *w = tor_malloc_zero(sizeof(*w)); - w->serial = n_sent++; - crypto_rand((char*)w->msg, 20); - w->msglen = 20; - ++rsa_sent; - return threadpool_queue_work(tp, workqueue_do_rsa, handle_reply, w) != NULL; - } else { - ecdh_work_t *w = tor_malloc_zero(sizeof(*w)); - w->serial = n_sent++; - /* Not strictly right, but this is just for benchmarks. */ - crypto_rand((char*)w->u.pk.public_key, 32); - ++ecdh_sent; - return threadpool_queue_work(tp, workqueue_do_ecdh, handle_reply, w) != NULL; - } -} - -static void -replysock_readable_cb(tor_socket_t sock, short what, void *arg) -{ - threadpool_t *tp = arg; - replyqueue_t *rq = threadpool_get_replyqueue(tp); - - int old_r = n_received; - (void) sock; - (void) what; - - replyqueue_process(rq); - if (old_r == n_received) - return; - - printf("%d / %d\n", n_received, n_sent); -#ifdef TRACK_RESPONSES - tor_mutex_acquire(&bitmap_mutex); - for (i = 0; i < N_ITEMS; ++i) { - if (bitarray_is_set(received, i)) - putc('o', stdout); - else if (bitarray_is_set(handled, i)) - putc('!', stdout); - else - putc('.', stdout); - } - puts(""); - tor_mutex_release(&bitmap_mutex); -#endif - - if (n_sent - n_received < RELAUNCH_AT) { - while (n_sent < n_received + N_INFLIGHT && n_sent < N_ITEMS) { - if (! add_work(tp)) { - puts("Couldn't add work."); - tor_event_base_loopexit(tor_libevent_get_base(), NULL); - } - } - } - - if (n_received == n_sent && n_sent >= N_ITEMS) { - tor_event_base_loopexit(tor_libevent_get_base(), NULL); - } -} - -int -main(int argc, char **argv) -{ - replyqueue_t *rq; - threadpool_t *tp; - int i; - tor_libevent_cfg evcfg; - struct event *ev; - - (void)argc; - (void)argv; - - init_logging(1); - crypto_global_init(1, NULL, NULL); - crypto_seed_rng(1); - - rq = replyqueue_new(); - tor_assert(rq); - tp = threadpool_new(16, - rq, new_state, free_state, NULL); - tor_assert(tp); - - crypto_seed_weak_rng(&weak_rng); - - memset(&evcfg, 0, sizeof(evcfg)); - tor_libevent_initialize(&evcfg); - - ev = tor_event_new(tor_libevent_get_base(), - replyqueue_get_socket(rq), EV_READ|EV_PERSIST, - replysock_readable_cb, tp); - - event_add(ev, NULL); - -#ifdef TRACK_RESPONSES - handled = bitarray_init_zero(N_ITEMS); - received = bitarray_init_zero(N_ITEMS); - tor_mutex_init(&bitmap_mutex); - handled_len = N_ITEMS; -#endif - - for (i = 0; i < N_INFLIGHT; ++i) { - if (! add_work(tp)) { - puts("Couldn't add work."); - return 1; - } - } - - event_base_loop(tor_libevent_get_base(), 0); - - return 0; -} diff --git a/src/test/include.am b/src/test/include.am index 6ad1b55..2badc47 100644 --- a/src/test/include.am +++ b/src/test/include.am @@ -1,8 +1,8 @@ TESTS += src/test/test -noinst_PROGRAMS+= src/test/bench src/test/bench_workqueue +noinst_PROGRAMS+= src/test/bench if UNITTESTS_ENABLED -noinst_PROGRAMS+= src/test/test src/test/test-child +noinst_PROGRAMS+= src/test/test src/test/test-child src/test/test_workqueue endif src_test_AM_CPPFLAGS = -DSHARE_DATADIR="\"$(datadir)\"" \ @@ -62,8 +62,10 @@ src_test_test_CPPFLAGS= $(src_test_AM_CPPFLAGS) src_test_bench_SOURCES = \ src/test/bench.c -src_test_bench_workqueue_SOURCES = \ - src/test/bench_workqueue.c +src_test_test_workqueue_SOURCES = \ + src/test/test_workqueue.c +src_test_test_workqueue_CPPFLAGS= $(src_test_AM_CPPFLAGS) +src_test_test_workqueue_CFLAGS = $(AM_CFLAGS) $(TEST_CFLAGS) src_test_test_LDFLAGS = @TOR_LDFLAGS_zlib@ @TOR_LDFLAGS_openssl@ \ @TOR_LDFLAGS_libevent@ @@ -83,11 +85,12 @@ src_test_bench_LDADD = src/or/libtor.a src/common/libor.a \ @TOR_OPENSSL_LIBS@ @TOR_LIB_WS32@ @TOR_LIB_GDI@ @CURVE25519_LIBS@ \ @TOR_SYSTEMD_LIBS@ -src_test_bench_workqueue_LDFLAGS = @TOR_LDFLAGS_zlib@ @TOR_LDFLAGS_openssl@ \ +src_test_test_workqueue_LDFLAGS = @TOR_LDFLAGS_zlib@ @TOR_LDFLAGS_openssl@ \ @TOR_LDFLAGS_libevent@ -src_test_bench_workqueue_LDADD = src/or/libtor.a src/common/libor.a \ - src/common/libor-crypto.a $(LIBDONNA) \ - src/common/libor-event.a \ +src_test_test_workqueue_LDADD = src/or/libtor-testing.a \ + src/common/libor-testing.a \ + src/common/libor-crypto-testing.a $(LIBDONNA) \ + src/common/libor-event-testing.a \ @TOR_ZLIB_LIBS@ @TOR_LIB_MATH@ @TOR_LIBEVENT_LIBS@ \ @TOR_OPENSSL_LIBS@ @TOR_LIB_WS32@ @TOR_LIB_GDI@ @CURVE25519_LIBS@ diff --git a/src/test/test_workqueue.c b/src/test/test_workqueue.c new file mode 100644 index 0000000..4077fb2 --- /dev/null +++ b/src/test/test_workqueue.c @@ -0,0 +1,342 @@ +/* Copyright (c) 2001-2004, Roger Dingledine. + * Copyright (c) 2004-2006, Roger Dingledine, Nick Mathewson. + * Copyright (c) 2007-2013, The Tor Project, Inc. */ +/* See LICENSE for licensing information */ + +#include "or.h" +#include "compat_threads.h" +#include "onion.h" +#include "workqueue.h" +#include "crypto.h" +#include "crypto_curve25519.h" +#include "compat_libevent.h" + +#include <stdio.h> +#ifdef HAVE_EVENT2_EVENT_H +#include <event2/event.h> +#else +#include <event.h> +#endif + +static int opt_verbose = 0; +static int opt_n_threads = 8; +static int opt_n_items = 10000; +static int opt_n_inflight = 1000; +static int opt_n_lowwater = 250; +static int opt_ratio_rsa = 5; + +#ifdef TRACK_RESPONSES +tor_mutex_t bitmap_mutex; +int handled_len; +bitarray_t *handled; +#endif + +typedef struct state_s { + int magic; + int n_handled; + crypto_pk_t *rsa; + curve25519_secret_key_t ecdh; +} state_t; + +typedef struct rsa_work_s { + int serial; + uint8_t msg[128]; + uint8_t msglen; +} rsa_work_t; + +typedef struct ecdh_work_s { + int serial; + union { + curve25519_public_key_t pk; + uint8_t msg[32]; + } u; +} ecdh_work_t; + +static void +mark_handled(int serial) +{ +#ifdef TRACK_RESPONSES + tor_mutex_acquire(&bitmap_mutex); + tor_assert(serial < handled_len); + tor_assert(! bitarray_is_set(handled, serial)); + bitarray_set(handled, serial); + tor_mutex_release(&bitmap_mutex); +#else + (void)serial; +#endif +} + +static int +workqueue_do_rsa(void *state, void *work) +{ + rsa_work_t *rw = work; + state_t *st = state; + crypto_pk_t *rsa = st->rsa; + uint8_t sig[256]; + int len; + + tor_assert(st->magic == 13371337); + + len = crypto_pk_private_sign(rsa, (char*)sig, 256, + (char*)rw->msg, rw->msglen); + if (len < 0) { + rw->msglen = 0; + return WQ_RPL_ERROR; + } + + memset(rw->msg, 0, sizeof(rw->msg)); + rw->msglen = len; + memcpy(rw->msg, sig, len); + ++st->n_handled; + + mark_handled(rw->serial); + + return WQ_RPL_REPLY; +} + +#if 0 +static int +workqueue_do_shutdown(void *state, void *work) +{ + (void)state; + (void)work; + (void)cmd; + crypto_pk_free(((state_t*)state)->rsa); + tor_free(state); + return WQ_RPL_SHUTDOWN; +} +#endif + +static int +workqueue_do_ecdh(void *state, void *work) +{ + ecdh_work_t *ew = work; + uint8_t output[CURVE25519_OUTPUT_LEN]; + state_t *st = state; + + tor_assert(st->magic == 13371337); + + curve25519_handshake(output, &st->ecdh, &ew->u.pk); + memcpy(ew->u.msg, output, CURVE25519_OUTPUT_LEN); + ++st->n_handled; + mark_handled(ew->serial); + return WQ_RPL_REPLY; +} + +static void * +new_state(void *arg) +{ + state_t *st; + (void)arg; + + st = tor_malloc(sizeof(*st)); + /* Every thread gets its own keys. not a problem for benchmarking */ + st->rsa = crypto_pk_new(); + if (crypto_pk_generate_key_with_bits(st->rsa, 1024) < 0) { + puts("keygen failed"); + crypto_pk_free(st->rsa); + tor_free(st); + return NULL; + } + curve25519_secret_key_generate(&st->ecdh, 0); + st->magic = 13371337; + return st; +} + +static void +free_state(void *arg) +{ + state_t *st = arg; + crypto_pk_free(st->rsa); + tor_free(st); +} + +static tor_weak_rng_t weak_rng; +static int n_sent = 0; +static int rsa_sent = 0; +static int ecdh_sent = 0; +static int n_received = 0; + +#ifdef TRACK_RESPONSES +bitarray_t *received; +#endif + +static void +handle_reply(void *arg) +{ +#ifdef TRACK_RESPONSES + rsa_work_t *rw = arg; /* Naughty cast, but only looking at serial. */ + tor_assert(! bitarray_is_set(received, rw->serial)); + bitarray_set(received,rw->serial); +#endif + + tor_free(arg); + ++n_received; +} + +static int +add_work(threadpool_t *tp) +{ + int add_rsa = + opt_ratio_rsa == 0 || + tor_weak_random_range(&weak_rng, opt_ratio_rsa) == 0; + if (add_rsa) { + rsa_work_t *w = tor_malloc_zero(sizeof(*w)); + w->serial = n_sent++; + crypto_rand((char*)w->msg, 20); + w->msglen = 20; + ++rsa_sent; + return threadpool_queue_work(tp, workqueue_do_rsa, handle_reply, w) != NULL; + } else { + ecdh_work_t *w = tor_malloc_zero(sizeof(*w)); + w->serial = n_sent++; + /* Not strictly right, but this is just for benchmarks. */ + crypto_rand((char*)w->u.pk.public_key, 32); + ++ecdh_sent; + return threadpool_queue_work(tp, workqueue_do_ecdh, handle_reply, w) != NULL; + } +} + +static void +replysock_readable_cb(tor_socket_t sock, short what, void *arg) +{ + threadpool_t *tp = arg; + replyqueue_t *rq = threadpool_get_replyqueue(tp); + + int old_r = n_received; + (void) sock; + (void) what; + + replyqueue_process(rq); + if (old_r == n_received) + return; + + if (opt_verbose) + printf("%d / %d\n", n_received, n_sent); +#ifdef TRACK_RESPONSES + tor_mutex_acquire(&bitmap_mutex); + for (i = 0; i < opt_n_items; ++i) { + if (bitarray_is_set(received, i)) + putc('o', stdout); + else if (bitarray_is_set(handled, i)) + putc('!', stdout); + else + putc('.', stdout); + } + puts(""); + tor_mutex_release(&bitmap_mutex); +#endif + + if (n_sent - n_received < opt_n_lowwater) { + while (n_sent < n_received + opt_n_inflight && n_sent < opt_n_items) { + if (! add_work(tp)) { + puts("Couldn't add work."); + tor_event_base_loopexit(tor_libevent_get_base(), NULL); + } + } + } + + if (n_received == n_sent && n_sent >= opt_n_items) { + tor_event_base_loopexit(tor_libevent_get_base(), NULL); + } +} + +static void +help(void) +{ + puts( + "Options:\n" + " -N <items> Run this many items of work\n" + " -T <threads> Use this many threads\n" + " -I <inflight> Have no more than this many requests queued at once\n" + " -L <lowwater> Add items whenever fewer than this many are pending.\n" + " -R <ratio> Make one out of this many items be a slow (RSA) one"); +} + +int +main(int argc, char **argv) +{ + replyqueue_t *rq; + threadpool_t *tp; + int i; + tor_libevent_cfg evcfg; + struct event *ev; + + for (i = 1; i < argc; ++i) { + if (!strcmp(argv[i], "-v")) { + opt_verbose = 1; + } else if (!strcmp(argv[i], "-T") && i+1<argc) { + opt_n_threads = atoi(argv[++i]); + } else if (!strcmp(argv[i], "-N") && i+1<argc) { + opt_n_items = atoi(argv[++i]); + } else if (!strcmp(argv[i], "-I") && i+1<argc) { + opt_n_inflight = atoi(argv[++i]); + } else if (!strcmp(argv[i], "-L") && i+1<argc) { + opt_n_lowwater = atoi(argv[++i]); + } else if (!strcmp(argv[i], "-R") && i+1<argc) { + opt_ratio_rsa = atoi(argv[++i]); + } else if (!strcmp(argv[i], "-h")) { + help(); + return 0; + } else { + help(); + return 1; + } + } + if (opt_n_threads < 1 || + opt_n_items < 1 || opt_n_inflight < 1 || opt_n_lowwater < 0 || + opt_ratio_rsa < 0) { + help(); + return 1; + } + + init_logging(1); + crypto_global_init(1, NULL, NULL); + crypto_seed_rng(1); + + rq = replyqueue_new(); + tor_assert(rq); + tp = threadpool_new(opt_n_threads, + rq, new_state, free_state, NULL); + tor_assert(tp); + + crypto_seed_weak_rng(&weak_rng); + + memset(&evcfg, 0, sizeof(evcfg)); + tor_libevent_initialize(&evcfg); + + ev = tor_event_new(tor_libevent_get_base(), + replyqueue_get_socket(rq), EV_READ|EV_PERSIST, + replysock_readable_cb, tp); + + event_add(ev, NULL); + +#ifdef TRACK_RESPONSES + handled = bitarray_init_zero(opt_n_items); + received = bitarray_init_zero(opt_n_items); + tor_mutex_init(&bitmap_mutex); + handled_len = opt_n_items; +#endif + + for (i = 0; i < opt_n_inflight; ++i) { + if (! add_work(tp)) { + puts("Couldn't add work."); + return 1; + } + } + + { + struct timeval limit = { 30, 0 }; + tor_event_base_loopexit(tor_libevent_get_base(), &limit); + } + + event_base_loop(tor_libevent_get_base(), 0); + + if (n_sent != opt_n_items || n_received != n_sent) { + puts("FAIL"); + return 1; + } else { + puts("OK"); + return 0; + } +}