tor-commits
January 2015
- 20 participants
- 934 discussions

[tor/master] Refactor cpuworker to use workqueue/threadpool code.
by nickm@torproject.org 21 Jan '15
commit 1e896214e7eb5ede65663486291252b171e9daea
Author: Nick Mathewson <nickm(a)torproject.org>
Date: Wed Oct 2 12:32:09 2013 -0400
Refactor cpuworker to use workqueue/threadpool code.
---
changes/better_workqueues | 10 +
src/common/workqueue.c | 4 +-
src/common/workqueue.h | 2 +-
src/or/command.c | 2 +-
src/or/config.c | 2 +-
src/or/connection.c | 24 +-
src/or/cpuworker.c | 532 +++++++++++++++++----------------------------
src/or/cpuworker.h | 10 +-
src/or/main.c | 6 +-
src/or/onion.c | 19 +-
src/or/onion.h | 4 +-
src/or/or.h | 17 +-
12 files changed, 238 insertions(+), 394 deletions(-)
diff --git a/changes/better_workqueues b/changes/better_workqueues
new file mode 100644
index 0000000..32c984c
--- /dev/null
+++ b/changes/better_workqueues
@@ -0,0 +1,10 @@
+ o Major features:
+ - Refactor the CPU worker implementation for better performance by
+ avoiding the kernel and lengthening pipelines. The original
+ implementation used sockets to transfer data from the main thread
+ to the worker threads, and didn't allow any thread to be assigned
+ more than a single piece of work at once. The new implementation
+ avoids communications overhead by making requests in shared
+ memory, avoiding kernel IO where possible, and keeping more
+ requests in flight at once. Resolves issue #9682.
+
diff --git a/src/common/workqueue.c b/src/common/workqueue.c
index 44cf98d..f3ef678 100644
--- a/src/common/workqueue.c
+++ b/src/common/workqueue.c
@@ -108,6 +108,7 @@ workqueue_entry_free(workqueue_entry_t *ent)
{
if (!ent)
return;
+ memset(ent, 0xf0, sizeof(*ent));
tor_free(ent);
}
@@ -310,7 +311,7 @@ threadpool_queue_work(threadpool_t *pool,
*/
int
threadpool_queue_for_all(threadpool_t *pool,
- void *(*dup_fn)(const void *),
+ void *(*dup_fn)(void *),
int (*fn)(void *, void *),
void (*reply_fn)(void *),
void *arg)
@@ -444,6 +445,7 @@ replyqueue_process(replyqueue_t *queue)
workqueue_entry_t *work = TOR_TAILQ_FIRST(&queue->answers);
TOR_TAILQ_REMOVE(&queue->answers, work, next_work);
tor_mutex_release(&queue->lock);
+ work->on_thread = NULL;
work->reply_fn(work->arg);
workqueue_entry_free(work);
diff --git a/src/common/workqueue.h b/src/common/workqueue.h
index aa8620d..aa1bcc5 100644
--- a/src/common/workqueue.h
+++ b/src/common/workqueue.h
@@ -28,7 +28,7 @@ workqueue_entry_t *threadpool_queue_work(threadpool_t *pool,
void (*reply_fn)(void *),
void *arg);
int threadpool_queue_for_all(threadpool_t *pool,
- void *(*dup_fn)(const void *),
+ void *(*dup_fn)(void *),
int (*fn)(void *, void *),
void (*reply_fn)(void *),
void *arg);
diff --git a/src/or/command.c b/src/or/command.c
index 6dde2a9..c4a0f9b 100644
--- a/src/or/command.c
+++ b/src/or/command.c
@@ -310,7 +310,7 @@ command_process_create_cell(cell_t *cell, channel_t *chan)
/* hand it off to the cpuworkers, and then return. */
if (connection_or_digest_is_known_relay(chan->identity_digest))
rep_hist_note_circuit_handshake_requested(create_cell->handshake_type);
- if (assign_onionskin_to_cpuworker(NULL, circ, create_cell) < 0) {
+ if (assign_onionskin_to_cpuworker(circ, create_cell) < 0) {
log_debug(LD_GENERAL,"Failed to hand off onionskin. Closing.");
circuit_mark_for_close(TO_CIRCUIT(circ), END_CIRC_REASON_RESOURCELIMIT);
return;
diff --git a/src/or/config.c b/src/or/config.c
index 5db065f..5b8560b 100644
--- a/src/or/config.c
+++ b/src/or/config.c
@@ -1729,7 +1729,7 @@ options_act(const or_options_t *old_options)
if (have_completed_a_circuit() || !any_predicted_circuits(time(NULL)))
inform_testing_reachability();
}
- cpuworkers_rotate();
+ cpuworkers_rotate_keyinfo();
if (dns_reset())
return -1;
} else {
diff --git a/src/or/connection.c b/src/or/connection.c
index 11ff224..1b7426b 100644
--- a/src/or/connection.c
+++ b/src/or/connection.c
@@ -29,7 +29,6 @@
#include "connection_edge.h"
#include "connection_or.h"
#include "control.h"
-#include "cpuworker.h"
#include "directory.h"
#include "dirserv.h"
#include "dns.h"
@@ -130,7 +129,6 @@ conn_type_to_string(int type)
case CONN_TYPE_AP: return "Socks";
case CONN_TYPE_DIR_LISTENER: return "Directory listener";
case CONN_TYPE_DIR: return "Directory";
- case CONN_TYPE_CPUWORKER: return "CPU worker";
case CONN_TYPE_CONTROL_LISTENER: return "Control listener";
case CONN_TYPE_CONTROL: return "Control";
case CONN_TYPE_EXT_OR: return "Extended OR";
@@ -213,12 +211,6 @@ conn_state_to_string(int type, int state)
case DIR_CONN_STATE_SERVER_WRITING: return "writing";
}
break;
- case CONN_TYPE_CPUWORKER:
- switch (state) {
- case CPUWORKER_STATE_IDLE: return "idle";
- case CPUWORKER_STATE_BUSY_ONION: return "busy with onion";
- }
- break;
case CONN_TYPE_CONTROL:
switch (state) {
case CONTROL_CONN_STATE_OPEN: return "open (protocol v1)";
@@ -248,7 +240,6 @@ connection_type_uses_bufferevent(connection_t *conn)
case CONN_TYPE_CONTROL:
case CONN_TYPE_OR:
case CONN_TYPE_EXT_OR:
- case CONN_TYPE_CPUWORKER:
return 1;
default:
return 0;
@@ -2436,7 +2427,6 @@ connection_mark_all_noncontrol_connections(void)
if (conn->marked_for_close)
continue;
switch (conn->type) {
- case CONN_TYPE_CPUWORKER:
case CONN_TYPE_CONTROL_LISTENER:
case CONN_TYPE_CONTROL:
break;
@@ -4530,8 +4520,6 @@ connection_process_inbuf(connection_t *conn, int package_partial)
package_partial);
case CONN_TYPE_DIR:
return connection_dir_process_inbuf(TO_DIR_CONN(conn));
- case CONN_TYPE_CPUWORKER:
- return connection_cpu_process_inbuf(conn);
case CONN_TYPE_CONTROL:
return connection_control_process_inbuf(TO_CONTROL_CONN(conn));
default:
@@ -4591,8 +4579,6 @@ connection_finished_flushing(connection_t *conn)
return connection_edge_finished_flushing(TO_EDGE_CONN(conn));
case CONN_TYPE_DIR:
return connection_dir_finished_flushing(TO_DIR_CONN(conn));
- case CONN_TYPE_CPUWORKER:
- return connection_cpu_finished_flushing(conn);
case CONN_TYPE_CONTROL:
return connection_control_finished_flushing(TO_CONTROL_CONN(conn));
default:
@@ -4648,8 +4634,6 @@ connection_reached_eof(connection_t *conn)
return connection_edge_reached_eof(TO_EDGE_CONN(conn));
case CONN_TYPE_DIR:
return connection_dir_reached_eof(TO_DIR_CONN(conn));
- case CONN_TYPE_CPUWORKER:
- return connection_cpu_reached_eof(conn);
case CONN_TYPE_CONTROL:
return connection_control_reached_eof(TO_CONTROL_CONN(conn));
default:
@@ -4855,10 +4839,6 @@ assert_connection_ok(connection_t *conn, time_t now)
tor_assert(conn->purpose >= DIR_PURPOSE_MIN_);
tor_assert(conn->purpose <= DIR_PURPOSE_MAX_);
break;
- case CONN_TYPE_CPUWORKER:
- tor_assert(conn->state >= CPUWORKER_STATE_MIN_);
- tor_assert(conn->state <= CPUWORKER_STATE_MAX_);
- break;
case CONN_TYPE_CONTROL:
tor_assert(conn->state >= CONTROL_CONN_STATE_MIN_);
tor_assert(conn->state <= CONTROL_CONN_STATE_MAX_);
@@ -4959,9 +4939,7 @@ proxy_type_to_string(int proxy_type)
}
/** Call connection_free_() on every connection in our array, and release all
- * storage held by connection.c. This is used by cpuworkers and dnsworkers
- * when they fork, so they don't keep resources held open (especially
- * sockets).
+ * storage held by connection.c.
*
* Don't do the checks in connection_free(), because they will
* fail.
diff --git a/src/or/cpuworker.c b/src/or/cpuworker.c
index 340fbec..f3f275d 100644
--- a/src/or/cpuworker.c
+++ b/src/or/cpuworker.c
@@ -5,84 +5,98 @@
/**
* \file cpuworker.c
- * \brief Implements a farm of 'CPU worker' processes to perform
- * CPU-intensive tasks in another thread or process, to not
- * interrupt the main thread.
+ * \brief Uses the workqueue/threadpool code to farm CPU-intensive activities
+ * out to subprocesses.
*
* Right now, we only use this for processing onionskins.
**/
#include "or.h"
-#include "buffers.h"
#include "channel.h"
-#include "channeltls.h"
#include "circuitbuild.h"
#include "circuitlist.h"
-#include "config.h"
-#include "connection.h"
#include "connection_or.h"
+#include "config.h"
#include "cpuworker.h"
#include "main.h"
#include "onion.h"
#include "rephist.h"
#include "router.h"
+#include "workqueue.h"
-/** The maximum number of cpuworker processes we will keep around. */
-#define MAX_CPUWORKERS 16
-/** The minimum number of cpuworker processes we will keep around. */
-#define MIN_CPUWORKERS 1
-
-/** The tag specifies which circuit this onionskin was from. */
-#define TAG_LEN 12
+#ifdef HAVE_EVENT2_EVENT_H
+#include <event2/event.h>
+#else
+#include <event.h>
+#endif
-/** How many cpuworkers we have running right now. */
-static int num_cpuworkers=0;
-/** How many of the running cpuworkers have an assigned task right now. */
-static int num_cpuworkers_busy=0;
-/** We need to spawn new cpuworkers whenever we rotate the onion keys
- * on platforms where execution contexts==processes. This variable stores
- * the last time we got a key rotation event. */
-static time_t last_rotation_time=0;
+static void queue_pending_tasks(void);
-static void cpuworker_main(void *data) ATTR_NORETURN;
-static int spawn_cpuworker(void);
-static void spawn_enough_cpuworkers(void);
-static void process_pending_task(connection_t *cpuworker);
+typedef struct worker_state_s {
+ int generation;
+ server_onion_keys_t *onion_keys;
+} worker_state_t;
-/** Initialize the cpuworker subsystem.
- */
-void
-cpu_init(void)
+static void *
+worker_state_new(void *arg)
{
- cpuworkers_rotate();
+ worker_state_t *ws;
+ (void)arg;
+ ws = tor_malloc_zero(sizeof(worker_state_t));
+ ws->onion_keys = server_onion_keys_new();
+ return ws;
}
-
-/** Called when we're done sending a request to a cpuworker. */
-int
-connection_cpu_finished_flushing(connection_t *conn)
+static void
+worker_state_free(void *arg)
{
- tor_assert(conn);
- tor_assert(conn->type == CONN_TYPE_CPUWORKER);
- return 0;
+ worker_state_t *ws = arg;
+ server_onion_keys_free(ws->onion_keys);
+ tor_free(ws);
}
-/** Pack global_id and circ_id; set *tag to the result. (See note on
- * cpuworker_main for wire format.) */
+static replyqueue_t *replyqueue = NULL;
+static threadpool_t *threadpool = NULL;
+static struct event *reply_event = NULL;
+
+static tor_weak_rng_t request_sample_rng = TOR_WEAK_RNG_INIT;
+
+static int total_pending_tasks = 0;
+static int max_pending_tasks = 128;
+
static void
-tag_pack(uint8_t *tag, uint64_t chan_id, circid_t circ_id)
+replyqueue_process_cb(evutil_socket_t sock, short events, void *arg)
{
- /*XXXX RETHINK THIS WHOLE MESS !!!! !NM NM NM NM*/
- /*XXXX DOUBLEPLUSTHIS!!!! AS AS AS AS*/
- set_uint64(tag, chan_id);
- set_uint32(tag+8, circ_id);
+ replyqueue_t *rq = arg;
+ (void) sock;
+ (void) events;
+ replyqueue_process(rq);
}
-/** Unpack <b>tag</b> into addr, port, and circ_id.
+/** Initialize the cpuworker subsystem.
*/
-static void
-tag_unpack(const uint8_t *tag, uint64_t *chan_id, circid_t *circ_id)
+void
+cpu_init(void)
{
- *chan_id = get_uint64(tag);
- *circ_id = get_uint32(tag+8);
+ if (!replyqueue) {
+ replyqueue = replyqueue_new(0);
+ }
+ if (!reply_event) {
+ reply_event = tor_event_new(tor_libevent_get_base(),
+ replyqueue_get_socket(replyqueue),
+ EV_READ|EV_PERSIST,
+ replyqueue_process_cb,
+ replyqueue);
+ event_add(reply_event, NULL);
+ }
+ if (!threadpool) {
+ threadpool = threadpool_new(get_num_cpus(get_options()),
+ replyqueue,
+ worker_state_new,
+ worker_state_free,
+ NULL);
+ }
+ /* Total voodoo. Can we make this more sensible? */
+ max_pending_tasks = get_num_cpus(get_options()) * 64;
+ crypto_seed_weak_rng(&request_sample_rng);
}
/** Magic numbers to make sure our cpuworker_requests don't grow any
@@ -94,10 +108,6 @@ tag_unpack(const uint8_t *tag, uint64_t *chan_id, circid_t *circ_id)
typedef struct cpuworker_request_t {
/** Magic number; must be CPUWORKER_REQUEST_MAGIC. */
uint32_t magic;
- /** Opaque tag to identify the job */
- uint8_t tag[TAG_LEN];
- /** Task code. Must be one of CPUWORKER_TASK_* */
- uint8_t task;
/** Flag: Are we timing this request? */
unsigned timed : 1;
@@ -114,8 +124,7 @@ typedef struct cpuworker_request_t {
typedef struct cpuworker_reply_t {
/** Magic number; must be CPUWORKER_REPLY_MAGIC. */
uint32_t magic;
- /** Opaque tag to identify the job; matches the request's tag.*/
- uint8_t tag[TAG_LEN];
+
/** True iff we got a successful request. */
uint8_t success;
@@ -142,42 +151,46 @@ typedef struct cpuworker_reply_t {
uint8_t rend_auth_material[DIGEST_LEN];
} cpuworker_reply_t;
+typedef struct cpuworker_job_u {
+ uint64_t chan_id;
+ uint32_t circ_id;
+ union {
+ cpuworker_request_t request;
+ cpuworker_reply_t reply;
+ } u;
+} cpuworker_job_t;
+
+static int
+update_state_threadfn(void *state_, void *work_)
+{
+ worker_state_t *state = state_;
+ worker_state_t *update = work_;
+ server_onion_keys_free(state->onion_keys);
+ state->onion_keys = update->onion_keys;
+ update->onion_keys = NULL;
+ ++state->generation;
+ return WQ_RPL_REPLY;
+}
+static void
+update_state_replyfn(void *work_)
+{
+ tor_free(work_);
+}
+
/** Called when the onion key has changed and we need to spawn new
* cpuworkers. Close all currently idle cpuworkers, and mark the last
* rotation time as now.
*/
void
-cpuworkers_rotate(void)
+cpuworkers_rotate_keyinfo(void)
{
- connection_t *cpuworker;
- while ((cpuworker = connection_get_by_type_state(CONN_TYPE_CPUWORKER,
- CPUWORKER_STATE_IDLE))) {
- connection_mark_for_close(cpuworker);
- --num_cpuworkers;
+ if (threadpool_queue_for_all(threadpool,
+ worker_state_new,
+ update_state_threadfn,
+ update_state_replyfn,
+ NULL)) {
+ log_warn(LD_OR, "Failed to queue key update for worker threads.");
}
- last_rotation_time = time(NULL);
- if (server_mode(get_options()))
- spawn_enough_cpuworkers();
-}
-
-/** If the cpuworker closes the connection,
- * mark it as closed and spawn a new one as needed. */
-int
-connection_cpu_reached_eof(connection_t *conn)
-{
- log_warn(LD_GENERAL,"Read eof. CPU worker died unexpectedly.");
- if (conn->state != CPUWORKER_STATE_IDLE) {
- /* the circ associated with this cpuworker will have to wait until
- * it gets culled in run_connection_housekeeping(), since we have
- * no way to find out which circ it was. */
- log_warn(LD_GENERAL,"...and it left a circuit queued; abandoning circ.");
- num_cpuworkers_busy--;
- }
- num_cpuworkers--;
- spawn_enough_cpuworkers(); /* try to regrow. hope we don't end up
- spinning. */
- connection_mark_for_close(conn);
- return 0;
}
/** Indexed by handshake type: how many onionskins have we processed and
@@ -197,8 +210,6 @@ static uint64_t onionskins_usec_roundtrip[MAX_ONION_HANDSHAKE_TYPE+1];
* time. (microseconds) */
#define MAX_BELIEVABLE_ONIONSKIN_DELAY (2*1000*1000)
-static tor_weak_rng_t request_sample_rng = TOR_WEAK_RNG_INIT;
-
/** Return true iff we'd like to measure a handshake of type
* <b>onionskin_type</b>. Call only from the main thread. */
static int
@@ -286,31 +297,22 @@ cpuworker_log_onionskin_overhead(int severity, int onionskin_type,
onionskin_type_name, (unsigned)overhead, relative_overhead*100);
}
-/** Called when we get data from a cpuworker. If the answer is not complete,
- * wait for a complete answer. If the answer is complete,
- * process it as appropriate.
- */
-int
-connection_cpu_process_inbuf(connection_t *conn)
+/** */
+static void
+cpuworker_onion_handshake_replyfn(void *work_)
{
+ cpuworker_job_t *job = work_;
+ cpuworker_reply_t rpl;
uint64_t chan_id;
circid_t circ_id;
channel_t *p_chan = NULL;
- circuit_t *circ;
-
- tor_assert(conn);
- tor_assert(conn->type == CONN_TYPE_CPUWORKER);
+ circuit_t *circ = NULL;
- if (!connection_get_inbuf_len(conn))
- return 0;
-
- if (conn->state == CPUWORKER_STATE_BUSY_ONION) {
- cpuworker_reply_t rpl;
- if (connection_get_inbuf_len(conn) < sizeof(cpuworker_reply_t))
- return 0; /* not yet */
- tor_assert(connection_get_inbuf_len(conn) == sizeof(cpuworker_reply_t));
+ --total_pending_tasks;
- connection_fetch_from_buf((void*)&rpl,sizeof(cpuworker_reply_t),conn);
+ if (1) {
+ /* Could avoid this, but doesn't matter. */
+ memcpy(&rpl, &job->u.reply, sizeof(rpl));
tor_assert(rpl.magic == CPUWORKER_REPLY_MAGIC);
@@ -337,18 +339,21 @@ connection_cpu_process_inbuf(connection_t *conn)
}
}
}
- /* parse out the circ it was talking about */
- tag_unpack(rpl.tag, &chan_id, &circ_id);
- circ = NULL;
- log_debug(LD_OR,
- "Unpacking cpuworker reply, chan_id is " U64_FORMAT
- ", circ_id is %u",
- U64_PRINTF_ARG(chan_id), (unsigned)circ_id);
+ /* Find the circ it was talking about */
+ chan_id = job->chan_id;
+ circ_id = job->circ_id;
+
p_chan = channel_find_by_global_id(chan_id);
if (p_chan)
circ = circuit_get_by_circid_channel(circ_id, p_chan);
+ log_debug(LD_OR,
+ "Unpacking cpuworker reply %p, chan_id is " U64_FORMAT
+ ", circ_id is %u, p_chan=%p, circ=%p, success=%d",
+ job, U64_PRINTF_ARG(chan_id), (unsigned)circ_id,
+ p_chan, circ, rpl.success);
+
if (rpl.success == 0) {
log_debug(LD_OR,
"decoding onionskin failed. "
@@ -367,6 +372,7 @@ connection_cpu_process_inbuf(connection_t *conn)
goto done_processing;
}
tor_assert(! CIRCUIT_IS_ORIGIN(circ));
+ TO_OR_CIRCUIT(circ)->workqueue_entry = NULL;
if (onionskin_answer(TO_OR_CIRCUIT(circ),
&rpl.created_cell,
(const char*)rpl.keys,
@@ -376,58 +382,33 @@ connection_cpu_process_inbuf(connection_t *conn)
goto done_processing;
}
log_debug(LD_OR,"onionskin_answer succeeded. Yay.");
- } else {
- tor_assert(0); /* don't ask me to do handshakes yet */
}
done_processing:
- conn->state = CPUWORKER_STATE_IDLE;
- num_cpuworkers_busy--;
- if (conn->timestamp_created < last_rotation_time) {
- connection_mark_for_close(conn);
- num_cpuworkers--;
- spawn_enough_cpuworkers();
- } else {
- process_pending_task(conn);
- }
- return 0;
+ memwipe(&rpl, 0, sizeof(rpl));
+ memwipe(job, 0, sizeof(*job));
+ tor_free(job);
+ queue_pending_tasks();
}
-/** Implement a cpuworker. 'data' is an fdarray as returned by socketpair.
- * Read and writes from fdarray[1]. Reads requests, writes answers.
- *
- * Request format:
- * cpuworker_request_t.
- * Response format:
- * cpuworker_reply_t
- */
-static void
-cpuworker_main(void *data)
+/** Implementation function for onion handshake requests. */
+static int
+cpuworker_onion_handshake_threadfn(void *state_, void *work_)
{
- /* For talking to the parent thread/process */
- tor_socket_t *fdarray = data;
- tor_socket_t fd;
+ worker_state_t *state = state_;
+ cpuworker_job_t *job = work_;
/* variables for onion processing */
- server_onion_keys_t onion_keys;
+ server_onion_keys_t *onion_keys = state->onion_keys;
cpuworker_request_t req;
cpuworker_reply_t rpl;
- fd = fdarray[1]; /* this side is ours */
- tor_free(data);
-
- setup_server_onion_keys(&onion_keys);
-
- for (;;) {
- if (read_all(fd, (void *)&req, sizeof(req), 1) != sizeof(req)) {
- log_info(LD_OR, "read request failed. Exiting.");
- goto end;
- }
- tor_assert(req.magic == CPUWORKER_REQUEST_MAGIC);
+ memcpy(&req, &job->u.request, sizeof(req));
- memset(&rpl, 0, sizeof(rpl));
+ tor_assert(req.magic == CPUWORKER_REQUEST_MAGIC);
+ memset(&rpl, 0, sizeof(rpl));
- if (req.task == CPUWORKER_TASK_ONION) {
+ if (1) {
const create_cell_t *cc = &req.create_cell;
created_cell_t *cell_out = &rpl.created_cell;
struct timeval tv_start = {0,0}, tv_end;
@@ -439,7 +420,7 @@ cpuworker_main(void *data)
tor_gettimeofday(&tv_start);
n = onion_skin_server_handshake(cc->handshake_type,
cc->onionskin, cc->handshake_len,
- &onion_keys,
+ onion_keys,
cell_out->reply,
rpl.keys, CPATH_KEY_MATERIAL_LEN,
rpl.rend_auth_material);
@@ -447,12 +428,10 @@ cpuworker_main(void *data)
/* failure */
log_debug(LD_OR,"onion_skin_server_handshake failed.");
memset(&rpl, 0, sizeof(rpl));
- memcpy(rpl.tag, req.tag, TAG_LEN);
rpl.success = 0;
} else {
/* success */
log_debug(LD_OR,"onion_skin_server_handshake succeeded.");
- memcpy(rpl.tag, req.tag, TAG_LEN);
cell_out->handshake_len = n;
switch (cc->cell_type) {
case CELL_CREATE:
@@ -463,7 +442,7 @@ cpuworker_main(void *data)
cell_out->cell_type = CELL_CREATED_FAST; break;
default:
tor_assert(0);
- goto end;
+ return WQ_RPL_SHUTDOWN;
}
rpl.success = 1;
}
@@ -479,187 +458,55 @@ cpuworker_main(void *data)
else
rpl.n_usec = (uint32_t) usec;
}
- if (write_all(fd, (void*)&rpl, sizeof(rpl), 1) != sizeof(rpl)) {
- log_err(LD_BUG,"writing response buf failed. Exiting.");
- goto end;
- }
- log_debug(LD_OR,"finished writing response.");
- } else if (req.task == CPUWORKER_TASK_SHUTDOWN) {
- log_info(LD_OR,"Clean shutdown: exiting");
- goto end;
- }
- memwipe(&req, 0, sizeof(req));
- memwipe(&rpl, 0, sizeof(req));
- }
- end:
- memwipe(&req, 0, sizeof(req));
- memwipe(&rpl, 0, sizeof(req));
- release_server_onion_keys(&onion_keys);
- tor_close_socket(fd);
- crypto_thread_cleanup();
- spawn_exit();
-}
-
-/** Launch a new cpuworker. Return 0 if we're happy, -1 if we failed.
- */
-static int
-spawn_cpuworker(void)
-{
- tor_socket_t *fdarray;
- tor_socket_t fd;
- connection_t *conn;
- int err;
-
- fdarray = tor_calloc(2, sizeof(tor_socket_t));
- if ((err = tor_socketpair(AF_UNIX, SOCK_STREAM, 0, fdarray)) < 0) {
- log_warn(LD_NET, "Couldn't construct socketpair for cpuworker: %s",
- tor_socket_strerror(-err));
- tor_free(fdarray);
- return -1;
- }
-
- tor_assert(SOCKET_OK(fdarray[0]));
- tor_assert(SOCKET_OK(fdarray[1]));
-
- fd = fdarray[0];
- if (spawn_func(cpuworker_main, (void*)fdarray) < 0) {
- tor_close_socket(fdarray[0]);
- tor_close_socket(fdarray[1]);
- tor_free(fdarray);
- return -1;
}
- log_debug(LD_OR,"just spawned a cpu worker.");
-
- conn = connection_new(CONN_TYPE_CPUWORKER, AF_UNIX);
-
- /* set up conn so it's got all the data we need to remember */
- conn->s = fd;
- conn->address = tor_strdup("localhost");
- tor_addr_make_unspec(&conn->addr);
-
- if (set_socket_nonblocking(fd) == -1) {
- connection_free(conn); /* this closes fd */
- return -1;
- }
-
- if (connection_add(conn) < 0) { /* no space, forget it */
- log_warn(LD_NET,"connection_add for cpuworker failed. Giving up.");
- connection_free(conn); /* this closes fd */
- return -1;
- }
-
- conn->state = CPUWORKER_STATE_IDLE;
- connection_start_reading(conn);
- return 0; /* success */
-}
-
-/** If we have too few or too many active cpuworkers, try to spawn new ones
- * or kill idle ones.
- */
-static void
-spawn_enough_cpuworkers(void)
-{
- int num_cpuworkers_needed = get_num_cpus(get_options());
- int reseed = 0;
-
- if (num_cpuworkers_needed < MIN_CPUWORKERS)
- num_cpuworkers_needed = MIN_CPUWORKERS;
- if (num_cpuworkers_needed > MAX_CPUWORKERS)
- num_cpuworkers_needed = MAX_CPUWORKERS;
-
- while (num_cpuworkers < num_cpuworkers_needed) {
- if (spawn_cpuworker() < 0) {
- log_warn(LD_GENERAL,"Cpuworker spawn failed. Will try again later.");
- return;
- }
- num_cpuworkers++;
- reseed++;
- }
+ memcpy(&job->u.reply, &rpl, sizeof(rpl));
- if (reseed)
- crypto_seed_weak_rng(&request_sample_rng);
+ memwipe(&req, 0, sizeof(req));
+ memwipe(&rpl, 0, sizeof(req));
+ return WQ_RPL_REPLY;
}
-/** Take a pending task from the queue and assign it to 'cpuworker'. */
+/** Take pending tasks from the queue and assign them to cpuworkers. */
static void
-process_pending_task(connection_t *cpuworker)
+queue_pending_tasks(void)
{
or_circuit_t *circ;
create_cell_t *onionskin = NULL;
- tor_assert(cpuworker);
+ while (total_pending_tasks < max_pending_tasks) {
+ circ = onion_next_task(&onionskin);
- /* for now only process onion tasks */
-
- circ = onion_next_task(&onionskin);
- if (!circ)
- return;
- if (assign_onionskin_to_cpuworker(cpuworker, circ, onionskin))
- log_warn(LD_OR,"assign_to_cpuworker failed. Ignoring.");
-}
-
-/** How long should we let a cpuworker stay busy before we give
- * up on it and decide that we have a bug or infinite loop?
- * This value is high because some servers with low memory/cpu
- * sometimes spend an hour or more swapping, and Tor starves. */
-#define CPUWORKER_BUSY_TIMEOUT (60*60*12)
+ if (!circ)
+ return;
-/** We have a bug that I can't find. Sometimes, very rarely, cpuworkers get
- * stuck in the 'busy' state, even though the cpuworker process thinks of
- * itself as idle. I don't know why. But here's a workaround to kill any
- * cpuworker that's been busy for more than CPUWORKER_BUSY_TIMEOUT.
- */
-static void
-cull_wedged_cpuworkers(void)
-{
- time_t now = time(NULL);
- smartlist_t *conns = get_connection_array();
- SMARTLIST_FOREACH_BEGIN(conns, connection_t *, conn) {
- if (!conn->marked_for_close &&
- conn->type == CONN_TYPE_CPUWORKER &&
- conn->state == CPUWORKER_STATE_BUSY_ONION &&
- conn->timestamp_lastwritten + CPUWORKER_BUSY_TIMEOUT < now) {
- log_notice(LD_BUG,
- "closing wedged cpuworker. Can somebody find the bug?");
- num_cpuworkers_busy--;
- num_cpuworkers--;
- connection_mark_for_close(conn);
- }
- } SMARTLIST_FOREACH_END(conn);
+ if (assign_onionskin_to_cpuworker(circ, onionskin))
+ log_warn(LD_OR,"assign_to_cpuworker failed. Ignoring.");
+ }
}
/** Try to tell a cpuworker to perform the public key operations necessary to
* respond to <b>onionskin</b> for the circuit <b>circ</b>.
*
- * If <b>cpuworker</b> is defined, assert that he's idle, and use him. Else,
- * look for an idle cpuworker and use him. If none idle, queue task onto the
- * pending onion list and return. Return 0 if we successfully assign the
- * task, or -1 on failure.
+ * Return 0 if we successfully assign the task, or -1 on failure.
*/
int
-assign_onionskin_to_cpuworker(connection_t *cpuworker,
- or_circuit_t *circ,
+assign_onionskin_to_cpuworker(or_circuit_t *circ,
create_cell_t *onionskin)
{
+ workqueue_entry_t *queue_entry;
+ cpuworker_job_t *job;
cpuworker_request_t req;
- time_t now = approx_time();
- static time_t last_culled_cpuworkers = 0;
int should_time;
- /* Checking for wedged cpuworkers requires a linear search over all
- * connections, so let's do it only once a minute.
- */
-#define CULL_CPUWORKERS_INTERVAL 60
-
- if (last_culled_cpuworkers + CULL_CPUWORKERS_INTERVAL <= now) {
- cull_wedged_cpuworkers();
- spawn_enough_cpuworkers();
- last_culled_cpuworkers = now;
- }
-
if (1) {
- if (num_cpuworkers_busy == num_cpuworkers) {
+ if (!circ->p_chan) {
+ log_info(LD_OR,"circ->p_chan gone. Failing circ.");
+ tor_free(onionskin);
+ return -1;
+ }
+
+ if (total_pending_tasks >= max_pending_tasks) {
log_debug(LD_OR,"No idle cpuworkers. Queuing.");
if (onion_pending_add(circ, onionskin) < 0) {
tor_free(onionskin);
@@ -668,36 +515,14 @@ assign_onionskin_to_cpuworker(connection_t *cpuworker,
return 0;
}
- if (!cpuworker)
- cpuworker = connection_get_by_type_state(CONN_TYPE_CPUWORKER,
- CPUWORKER_STATE_IDLE);
-
- tor_assert(cpuworker);
-
- if (!circ->p_chan) {
- log_info(LD_OR,"circ->p_chan gone. Failing circ.");
- tor_free(onionskin);
- return -1;
- }
-
if (connection_or_digest_is_known_relay(circ->p_chan->identity_digest))
rep_hist_note_circuit_handshake_assigned(onionskin->handshake_type);
should_time = should_time_request(onionskin->handshake_type);
memset(&req, 0, sizeof(req));
req.magic = CPUWORKER_REQUEST_MAGIC;
- tag_pack(req.tag, circ->p_chan->global_identifier,
- circ->p_circ_id);
req.timed = should_time;
- cpuworker->state = CPUWORKER_STATE_BUSY_ONION;
- /* touch the lastwritten timestamp, since that's how we check to
- * see how long it's been since we asked the question, and sometimes
- * we check before the first call to connection_handle_write(). */
- cpuworker->timestamp_lastwritten = now;
- num_cpuworkers_busy++;
-
- req.task = CPUWORKER_TASK_ONION;
memcpy(&req.create_cell, onionskin, sizeof(create_cell_t));
tor_free(onionskin);
@@ -705,9 +530,46 @@ assign_onionskin_to_cpuworker(connection_t *cpuworker,
if (should_time)
tor_gettimeofday(&req.started_at);
- connection_write_to_buf((void*)&req, sizeof(req), cpuworker);
+ job = tor_malloc_zero(sizeof(cpuworker_job_t));
+ job->chan_id = circ->p_chan->global_identifier;
+ job->circ_id = circ->p_circ_id;
+ memcpy(&job->u.request, &req, sizeof(req));
memwipe(&req, 0, sizeof(req));
+
+ ++total_pending_tasks;
+ queue_entry = threadpool_queue_work(threadpool,
+ cpuworker_onion_handshake_threadfn,
+ cpuworker_onion_handshake_replyfn,
+ job);
+ if (!queue_entry) {
+ log_warn(LD_BUG, "Couldn't queue work on threadpool");
+ tor_free(job);
+ return -1;
+ }
+ log_debug(LD_OR, "Queued task %p (qe=%p, chanid="U64_FORMAT", circid=%u)",
+ job, queue_entry, U64_PRINTF_ARG(job->chan_id), job->circ_id);
+
+ circ->workqueue_entry = queue_entry;
}
return 0;
}
+/** If <b>circ</b> has a pending handshake that hasn't been processed yet,
+ * remove it from the worker queue. */
+void
+cpuworker_cancel_circ_handshake(or_circuit_t *circ)
+{
+ cpuworker_job_t *job;
+ if (circ->workqueue_entry == NULL)
+ return;
+
+ job = workqueue_entry_cancel(circ->workqueue_entry);
+ if (job) {
+ /* It successfully cancelled. */
+ memwipe(job, 0xe0, sizeof(*job));
+ tor_free(job);
+ }
+
+ circ->workqueue_entry = NULL;
+}
+
diff --git a/src/or/cpuworker.h b/src/or/cpuworker.h
index 2a2b37a..70a595e 100644
--- a/src/or/cpuworker.h
+++ b/src/or/cpuworker.h
@@ -13,19 +13,17 @@
#define TOR_CPUWORKER_H
void cpu_init(void);
-void cpuworkers_rotate(void);
-int connection_cpu_finished_flushing(connection_t *conn);
-int connection_cpu_reached_eof(connection_t *conn);
-int connection_cpu_process_inbuf(connection_t *conn);
+void cpuworkers_rotate_keyinfo(void);
+
struct create_cell_t;
-int assign_onionskin_to_cpuworker(connection_t *cpuworker,
- or_circuit_t *circ,
+int assign_onionskin_to_cpuworker(or_circuit_t *circ,
struct create_cell_t *onionskin);
uint64_t estimated_usec_for_onionskins(uint32_t n_requests,
uint16_t onionskin_type);
void cpuworker_log_onionskin_overhead(int severity, int onionskin_type,
const char *onionskin_type_name);
+void cpuworker_cancel_circ_handshake(or_circuit_t *circ);
#endif
diff --git a/src/or/main.c b/src/or/main.c
index abf3230..136043c 100644
--- a/src/or/main.c
+++ b/src/or/main.c
@@ -1271,7 +1271,7 @@ run_scheduled_events(time_t now)
get_onion_key_set_at()+MIN_ONION_KEY_LIFETIME < now) {
log_info(LD_GENERAL,"Rotating onion key.");
rotate_onion_key();
- cpuworkers_rotate();
+ cpuworkers_rotate_keyinfo();
if (router_rebuild_descriptor(1)<0) {
log_info(LD_CONFIG, "Couldn't rebuild router descriptor");
}
@@ -1960,9 +1960,9 @@ do_hup(void)
* force a retry there. */
if (server_mode(options)) {
- /* Restart cpuworker and dnsworker processes, so they get up-to-date
+ /* Update cpuworker and dnsworker processes, so they get up-to-date
* configuration options. */
- cpuworkers_rotate();
+ cpuworkers_rotate_keyinfo();
dns_reset();
}
return 0;
diff --git a/src/or/onion.c b/src/or/onion.c
index 3723a3e..43fb63c 100644
--- a/src/or/onion.c
+++ b/src/or/onion.c
@@ -295,6 +295,8 @@ onion_pending_remove(or_circuit_t *circ)
victim = circ->onionqueue_entry;
if (victim)
onion_queue_entry_remove(victim);
+
+ cpuworker_cancel_circ_handshake(circ);
}
/** Remove a queue entry <b>victim</b> from the queue, unlinking it from
@@ -339,25 +341,25 @@ clear_pending_onions(void)
/* ============================================================ */
-/** Fill in a server_onion_keys_t object at <b>keys</b> with all of the keys
+/** Return a new server_onion_keys_t object with all of the keys
* and other info we might need to do onion handshakes. (We make a copy of
* our keys for each cpuworker to avoid race conditions with the main thread,
* and to avoid locking) */
-void
-setup_server_onion_keys(server_onion_keys_t *keys)
+server_onion_keys_t *
+server_onion_keys_new(void)
{
- memset(keys, 0, sizeof(server_onion_keys_t));
+ server_onion_keys_t *keys = tor_malloc_zero(sizeof(server_onion_keys_t));
memcpy(keys->my_identity, router_get_my_id_digest(), DIGEST_LEN);
dup_onion_keys(&keys->onion_key, &keys->last_onion_key);
keys->curve25519_key_map = construct_ntor_key_map();
keys->junk_keypair = tor_malloc_zero(sizeof(curve25519_keypair_t));
curve25519_keypair_generate(keys->junk_keypair, 0);
+ return keys;
}
-/** Release all storage held in <b>keys</b>, but do not free <b>keys</b>
- * itself (as it's likely to be stack-allocated.) */
+/** Release all storage held in <b>keys</b>. */
void
-release_server_onion_keys(server_onion_keys_t *keys)
+server_onion_keys_free(server_onion_keys_t *keys)
{
if (! keys)
return;
@@ -366,7 +368,8 @@ release_server_onion_keys(server_onion_keys_t *keys)
crypto_pk_free(keys->last_onion_key);
ntor_key_map_free(keys->curve25519_key_map);
tor_free(keys->junk_keypair);
- memset(keys, 0, sizeof(server_onion_keys_t));
+ memwipe(keys, 0, sizeof(server_onion_keys_t));
+ tor_free(keys);
}
/** Release whatever storage is held in <b>state</b>, depending on its
diff --git a/src/or/onion.h b/src/or/onion.h
index 3561987..9605008 100644
--- a/src/or/onion.h
+++ b/src/or/onion.h
@@ -30,8 +30,8 @@ typedef struct server_onion_keys_t {
#define MAX_ONIONSKIN_CHALLENGE_LEN 255
#define MAX_ONIONSKIN_REPLY_LEN 255
-void setup_server_onion_keys(server_onion_keys_t *keys);
-void release_server_onion_keys(server_onion_keys_t *keys);
+server_onion_keys_t *server_onion_keys_new(void);
+void server_onion_keys_free(server_onion_keys_t *keys);
void onion_handshake_state_release(onion_handshake_state_t *state);
diff --git a/src/or/or.h b/src/or/or.h
index 8a15529..4ff3555 100644
--- a/src/or/or.h
+++ b/src/or/or.h
@@ -213,8 +213,7 @@ typedef enum {
#define CONN_TYPE_DIR_LISTENER 8
/** Type for HTTP connections to the directory server. */
#define CONN_TYPE_DIR 9
-/** Connection from the main process to a CPU worker process. */
-#define CONN_TYPE_CPUWORKER 10
+/* Type 10 is unused. */
/** Type for listening for connections from user interface process. */
#define CONN_TYPE_CONTROL_LISTENER 11
/** Type for connections from user interface process. */
@@ -276,17 +275,6 @@ typedef enum {
/** State for any listener connection. */
#define LISTENER_STATE_READY 0
-#define CPUWORKER_STATE_MIN_ 1
-/** State for a connection to a cpuworker process that's idle. */
-#define CPUWORKER_STATE_IDLE 1
-/** State for a connection to a cpuworker process that's processing a
- * handshake. */
-#define CPUWORKER_STATE_BUSY_ONION 2
-#define CPUWORKER_STATE_MAX_ 2
-
-#define CPUWORKER_TASK_ONION CPUWORKER_STATE_BUSY_ONION
-#define CPUWORKER_TASK_SHUTDOWN 255
-
#define OR_CONN_STATE_MIN_ 1
/** State for a connection to an OR: waiting for connect() to finish. */
#define OR_CONN_STATE_CONNECTING 1
@@ -3158,6 +3146,9 @@ typedef struct or_circuit_t {
/** Pointer to an entry on the onion queue, if this circuit is waiting for a
* chance to give an onionskin to a cpuworker. Used only in onion.c */
struct onion_queue_t *onionqueue_entry;
+ /** Pointer to a workqueue entry, if this circuit has given an onionskin to
+ * a cpuworker and is waiting for a response. Used only in cpuworker.c */
+ struct workqueue_entry_s *workqueue_entry;
/** The circuit_id used in the previous (backward) hop of this circuit. */
circid_t p_circ_id;
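The refactor above replaces the per-worker socketpair protocol with cpuworker_job_t structs handed to a shared thread pool and returned through a reply queue, with total_pending_tasks capped at max_pending_tasks. Below is a minimal standalone sketch of that shape using plain pthreads; it is not Tor's threadpool API, and every name in it is illustrative only.

/* demo_cpuworker_queue.c -- illustrative only; compile with: cc -pthread */
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>

#define N_WORKERS   4
#define MAX_PENDING 8     /* plays the role of max_pending_tasks */
#define N_JOBS      32

typedef struct job_t {
  int id;                 /* stands in for the chan_id/circ_id bookkeeping */
  int input;              /* the "request" half of the job */
  int output;             /* the "reply" half, filled in by a worker */
  struct job_t *next;
} job_t;

static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t nonempty = PTHREAD_COND_INITIALIZER; /* queue has work */
static pthread_cond_t done_one = PTHREAD_COND_INITIALIZER; /* a reply landed */
static job_t *head = NULL, *tail = NULL;
static int pending = 0;       /* jobs queued or in progress */
static int shutting_down = 0;
static int results[N_JOBS];   /* where the "main thread" sees replies */

static void *
worker_main(void *arg)
{
  (void)arg;
  pthread_mutex_lock(&lock);
  while (1) {
    while (!head && !shutting_down)
      pthread_cond_wait(&nonempty, &lock);      /* sleep until work arrives */
    if (!head)
      break;                                    /* shutting down, queue empty */
    job_t *job = head;                          /* pop one job off the queue */
    head = job->next;
    if (!head)
      tail = NULL;
    pthread_mutex_unlock(&lock);

    job->output = job->input * job->input;      /* the "CPU-intensive" step */

    pthread_mutex_lock(&lock);
    results[job->id] = job->output;             /* hand the reply back */
    free(job);
    --pending;
    pthread_cond_signal(&done_one);
  }
  pthread_mutex_unlock(&lock);
  return NULL;
}

int
main(void)
{
  pthread_t workers[N_WORKERS];
  int i;
  for (i = 0; i < N_WORKERS; i++)
    pthread_create(&workers[i], NULL, worker_main, NULL);

  for (i = 0; i < N_JOBS; i++) {
    pthread_mutex_lock(&lock);
    while (pending >= MAX_PENDING)              /* cap the work in flight */
      pthread_cond_wait(&done_one, &lock);
    job_t *job = calloc(1, sizeof(*job));
    job->id = i;
    job->input = i;
    if (tail) tail->next = job; else head = job;
    tail = job;
    ++pending;
    pthread_cond_signal(&nonempty);
    pthread_mutex_unlock(&lock);
  }

  pthread_mutex_lock(&lock);
  while (pending > 0)                           /* wait for every reply */
    pthread_cond_wait(&done_one, &lock);
  shutting_down = 1;
  pthread_cond_broadcast(&nonempty);
  pthread_mutex_unlock(&lock);

  for (i = 0; i < N_WORKERS; i++)
    pthread_join(workers[i], NULL);
  printf("job %d: %d squared is %d\n", N_JOBS-1, N_JOBS-1, results[N_JOBS-1]);
  return 0;
}

The MAX_PENDING cap plays the same role as max_pending_tasks above: once it is reached, new work has to wait, much as assign_onionskin_to_cpuworker() falls back to onion_pending_add() when the pool is saturated.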

[tor/master] Fix up some workqueue/threading issues spotted by dgoulet.
by nickm@torproject.org 21 Jan '15
commit 3c8dabf69aa950c2df49f48aebbe02aac5b519f3
Author: Nick Mathewson <nickm(a)torproject.org>
Date: Wed Jan 21 12:22:41 2015 -0500
Fix up some workqueue/threading issues spotted by dgoulet.
---
src/common/compat_pthreads.c | 3 ++-
src/common/workqueue.c | 24 +++++-------------------
2 files changed, 7 insertions(+), 20 deletions(-)
diff --git a/src/common/compat_pthreads.c b/src/common/compat_pthreads.c
index 188a91f..c217c51 100644
--- a/src/common/compat_pthreads.c
+++ b/src/common/compat_pthreads.c
@@ -212,7 +212,8 @@ tor_cond_wait(tor_cond_t *cond, tor_mutex_t *mutex, const struct timeval *tv)
struct timespec ts;
struct timeval tvnow, tvsum;
while (1) {
- gettimeofday(&tvnow, NULL);
+ if (gettimeofday(&tvnow, NULL) < 0)
+ return -1;
timeradd(tv, &tvnow, &tvsum);
ts.tv_sec = tvsum.tv_sec;
ts.tv_nsec = tvsum.tv_usec * 1000;
diff --git a/src/common/workqueue.c b/src/common/workqueue.c
index 5ba29e3..77a4fbc 100644
--- a/src/common/workqueue.c
+++ b/src/common/workqueue.c
@@ -1,4 +1,4 @@
-/* Copyright (c) 2013-2015, The Tor Project, Inc. */
+/* copyright (c) 2013-2015, The Tor Project, Inc. */
/* See LICENSE for licensing information */
#include "orconfig.h"
@@ -81,13 +81,6 @@ typedef struct workerthread_s {
int index;
/** The pool this thread is a part of. */
struct threadpool_s *in_pool;
- /** True iff this thread is currently in its loop. (Not currently used.) */
- unsigned is_running;
- /** True iff this thread has crashed or is shut down for some reason. (Not
- * currently used.) */
- unsigned is_shut_down;
- /** True if we're waiting for more elements to get added to the queue. */
- unsigned waiting;
/** User-supplied state field that we pass to the worker functions of each
* work item. */
void *state;
@@ -181,8 +174,6 @@ worker_thread_main(void *thread_)
workqueue_entry_t *work;
int result;
- thread->is_running = 1;
-
tor_mutex_acquire(&pool->lock);
while (1) {
/* lock must be held at this point. */
@@ -198,8 +189,6 @@ worker_thread_main(void *thread_)
int r = update_fn(thread->state, arg);
if (r < 0) {
- thread->is_running = 0;
- thread->is_shut_down = 1;
return;
}
@@ -220,8 +209,6 @@ worker_thread_main(void *thread_)
/* We may need to exit the thread. */
if (result >= WQ_RPL_ERROR) {
- thread->is_running = 0;
- thread->is_shut_down = 1;
return;
}
tor_mutex_acquire(&pool->lock);
@@ -232,12 +219,9 @@ worker_thread_main(void *thread_)
/* TODO: support an idle-function */
/* Okay. Now, wait till somebody has work for us. */
- /* XXXX we could just omit waiting and instead */
- thread->waiting = 1;
if (tor_cond_wait(&pool->condition, &pool->lock, NULL) < 0) {
- /* XXXX ERROR */
+ log_warn(LD_GENERAL, "Fail tor_cond_wait.");
}
- thread->waiting = 0;
}
}
@@ -482,7 +466,9 @@ void
replyqueue_process(replyqueue_t *queue)
{
if (queue->alert.drain_fn(queue->alert.read_fd) < 0) {
- /* XXXX complain! */
+ static ratelim_t warn_limit = RATELIM_INIT(7200);
+ log_fn_ratelim(&warn_limit, LOG_WARN, LD_GENERAL,
+ "Failure from drain_fd");
}
tor_mutex_acquire(&queue->lock);
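One of the fixes above checks gettimeofday()'s return value before feeding it into the pthread_cond_timedwait() deadline. Here is a standalone sketch of that relative-to-absolute conversion, with illustrative names rather than the tor_cond_t wrappers:

/* demo_timedwait.c -- illustrative only; compile with: cc -pthread */
#include <errno.h>
#include <pthread.h>
#include <stdio.h>
#include <sys/time.h>

/* Wait on cond/mutex for at most the *relative* interval tv.  Returns 0 if
 * signaled, 1 on timeout, -1 on error -- the same contract tor_cond_wait()
 * follows. */
static int
cond_wait_rel(pthread_cond_t *cond, pthread_mutex_t *mutex,
              const struct timeval *tv)
{
  struct timeval now, sum;
  struct timespec deadline;
  int r;
  if (gettimeofday(&now, NULL) < 0)    /* the check this patch adds */
    return -1;
  timeradd(tv, &now, &sum);            /* relative timeout -> absolute time */
  deadline.tv_sec = sum.tv_sec;
  deadline.tv_nsec = sum.tv_usec * 1000;
  r = pthread_cond_timedwait(cond, mutex, &deadline);
  if (r == 0)
    return 0;
  else if (r == ETIMEDOUT)
    return 1;
  else
    return -1;
}

int
main(void)
{
  pthread_mutex_t m = PTHREAD_MUTEX_INITIALIZER;
  pthread_cond_t c = PTHREAD_COND_INITIALIZER;
  struct timeval half_second = { 0, 500000 };
  int r;
  pthread_mutex_lock(&m);
  r = cond_wait_rel(&c, &m, &half_second);   /* nobody signals: times out */
  pthread_mutex_unlock(&m);
  printf("wait returned %d (1 means timed out)\n", r);
  return 0;
}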

[tor/master] Update workqueue implementation to use a single queue for the work
by nickm@torproject.org 21 Jan '15
commit a52e549124adb09ad0b49b7d2b5b3fb79bfe7aeb
Author: Nick Mathewson <nickm(a)torproject.org>
Date: Wed Jan 14 13:29:58 2015 -0500
Update workqueue implementation to use a single queue for the work
Previously I used one queue per worker; now I use one queue for
everyone. The "broadcast" code is gone, replaced with an idempotent
'update' operation.
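A minimal standalone sketch of the generation-counter idea described above, using plain pthreads with illustrative names (not the actual workqueue API): the pool keeps a generation number plus one update argument per thread, and each worker runs the update whenever its own generation lags the pool's, so a newer update simply supersedes an older one that never ran.

/* demo_update_generation.c -- illustrative only; compile with: cc -pthread */
#include <pthread.h>
#include <stdio.h>
#include <unistd.h>

#define N_THREADS 3

static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
static unsigned pool_generation = 0;   /* bumped by queue_update() */
static int update_args[N_THREADS];     /* one pending argument per thread */
static int stopping = 0;

typedef struct {
  int index;
  unsigned generation;                 /* last update this thread applied */
} worker_t;

static void *
worker_main(void *arg)
{
  worker_t *me = arg;
  pthread_mutex_lock(&lock);
  while (!stopping) {
    if (me->generation != pool_generation) {
      int update = update_args[me->index];   /* take this thread's copy */
      me->generation = pool_generation;
      pthread_mutex_unlock(&lock);
      printf("thread %d applied update %d\n", me->index, update);
      pthread_mutex_lock(&lock);
      continue;                              /* re-check before sleeping */
    }
    pthread_cond_wait(&cond, &lock);         /* nothing to do: sleep */
  }
  pthread_mutex_unlock(&lock);
  return NULL;
}

/* Rough analogue of threadpool_queue_update(): stash a fresh argument for
 * every thread, bump the generation, wake everyone.  A thread that never ran
 * the previous update just runs this one instead. */
static void
queue_update(int arg)
{
  int i;
  pthread_mutex_lock(&lock);
  for (i = 0; i < N_THREADS; i++)
    update_args[i] = arg;
  ++pool_generation;
  pthread_mutex_unlock(&lock);
  pthread_cond_broadcast(&cond);
}

int
main(void)
{
  pthread_t threads[N_THREADS];
  worker_t workers[N_THREADS];
  int i;
  for (i = 0; i < N_THREADS; i++) {
    workers[i].index = i;
    workers[i].generation = 0;
    pthread_create(&threads[i], NULL, worker_main, &workers[i]);
  }
  queue_update(1);   /* e.g. "switch to the new onion keys" */
  sleep(1);
  queue_update(2);   /* a later update supersedes the first */
  sleep(1);
  pthread_mutex_lock(&lock);
  stopping = 1;
  pthread_mutex_unlock(&lock);
  pthread_cond_broadcast(&cond);
  for (i = 0; i < N_THREADS; i++)
    pthread_join(threads[i], NULL);
  return 0;
}

Because a quick second call to queue_update() overwrites arguments that some threads never consumed, the update functions must be idempotent, which is exactly the constraint the commit below documents.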
---
src/common/compat_pthreads.c | 1 +
src/common/workqueue.c | 206 +++++++++++++++++++++++++-----------------
src/common/workqueue.h | 10 +-
src/or/cpuworker.c | 15 +--
src/test/test_workqueue.c | 23 +----
5 files changed, 141 insertions(+), 114 deletions(-)
diff --git a/src/common/compat_pthreads.c b/src/common/compat_pthreads.c
index 848bfe0..f434805 100644
--- a/src/common/compat_pthreads.c
+++ b/src/common/compat_pthreads.c
@@ -5,6 +5,7 @@
#include "orconfig.h"
#include <pthread.h>
+#include <signal.h>
#include "compat.h"
#include "torlog.h"
diff --git a/src/common/workqueue.c b/src/common/workqueue.c
index 7fa8967..5ba29e3 100644
--- a/src/common/workqueue.c
+++ b/src/common/workqueue.c
@@ -1,4 +1,4 @@
-/* Copyright (c) 2013, The Tor Project, Inc. */
+/* Copyright (c) 2013-2015, The Tor Project, Inc. */
/* See LICENSE for licensing information */
#include "orconfig.h"
@@ -13,8 +13,23 @@ struct threadpool_s {
/** An array of pointers to workerthread_t: one for each running worker
* thread. */
struct workerthread_s **threads;
- /** Index of the next thread that we'll give work to.*/
- int next_for_work;
+
+ /** Condition variable that we wait on when we have no work, and which
+ * gets signaled when our queue becomes nonempty. */
+ tor_cond_t condition;
+ /** Queue of pending work that we have to do. */
+ TOR_TAILQ_HEAD(, workqueue_entry_s) work;
+
+ /** The current 'update generation' of the threadpool. Any thread that is
+ * at an earlier generation needs to run the update function. */
+ unsigned generation;
+
+ /** Function that should be run for updates on each thread. */
+ int (*update_fn)(void *, void *);
+ /** Function to free update arguments if they can't be run. */
+ void (*free_update_arg_fn)(void *);
+ /** Array of n_threads update arguments. */
+ void **update_args;
/** Number of elements in threads. */
int n_threads;
@@ -34,10 +49,10 @@ struct workqueue_entry_s {
/** The next workqueue_entry_t that's pending on the same thread or
* reply queue. */
TOR_TAILQ_ENTRY(workqueue_entry_s) next_work;
- /** The thread to which this workqueue_entry_t was assigned. This field
+ /** The threadpool to which this workqueue_entry_t was assigned. This field
* is set when the workqueue_entry_t is created, and won't be cleared until
* after it's handled in the main thread. */
- struct workerthread_s *on_thread;
+ struct threadpool_s *on_pool;
/** True iff this entry is waiting for a worker to start processing it. */
uint8_t pending;
/** Function to run in the worker thread. */
@@ -62,13 +77,10 @@ struct replyqueue_s {
* contention, each gets its own queue. This breaks the guarantee that that
* queued work will get executed strictly in order. */
typedef struct workerthread_s {
- /** Lock to protect all fields of this thread and its queue. */
- tor_mutex_t lock;
- /** Condition variable that we wait on when we have no work, and which
- * gets signaled when our queue becomes nonempty. */
- tor_cond_t condition;
- /** Queue of pending work that we have to do. */
- TOR_TAILQ_HEAD(, workqueue_entry_s) work;
+ /** Which thread it this? In range 0..in_pool->n_threads-1 */
+ int index;
+ /** The pool this thread is a part of. */
+ struct threadpool_s *in_pool;
/** True iff this thread is currently in its loop. (Not currently used.) */
unsigned is_running;
/** True iff this thread has crashed or is shut down for some reason. (Not
@@ -81,6 +93,8 @@ typedef struct workerthread_s {
void *state;
/** Reply queue to which we pass our results. */
replyqueue_t *reply_queue;
+ /** The current update generation of this thread */
+ unsigned generation;
} workerthread_t;
static void queue_reply(replyqueue_t *queue, workqueue_entry_t *work);
@@ -132,13 +146,13 @@ workqueue_entry_cancel(workqueue_entry_t *ent)
{
int cancelled = 0;
void *result = NULL;
- tor_mutex_acquire(&ent->on_thread->lock);
+ tor_mutex_acquire(&ent->on_pool->lock);
if (ent->pending) {
- TOR_TAILQ_REMOVE(&ent->on_thread->work, ent, next_work);
+ TOR_TAILQ_REMOVE(&ent->on_pool->work, ent, next_work);
cancelled = 1;
result = ent->arg;
}
- tor_mutex_release(&ent->on_thread->lock);
+ tor_mutex_release(&ent->on_pool->lock);
if (cancelled) {
tor_free(ent);
@@ -146,6 +160,16 @@ workqueue_entry_cancel(workqueue_entry_t *ent)
return result;
}
+/**DOCDOC
+
+ must hold lock */
+static int
+worker_thread_has_work(workerthread_t *thread)
+{
+ return !TOR_TAILQ_EMPTY(&thread->in_pool->work) ||
+ thread->generation != thread->in_pool->generation;
+}
+
/**
* Main function for the worker thread.
*/
@@ -153,20 +177,39 @@ static void
worker_thread_main(void *thread_)
{
workerthread_t *thread = thread_;
+ threadpool_t *pool = thread->in_pool;
workqueue_entry_t *work;
int result;
- tor_mutex_acquire(&thread->lock);
thread->is_running = 1;
+
+ tor_mutex_acquire(&pool->lock);
while (1) {
/* lock must be held at this point. */
- while (!TOR_TAILQ_EMPTY(&thread->work)) {
+ while (worker_thread_has_work(thread)) {
/* lock must be held at this point. */
-
- work = TOR_TAILQ_FIRST(&thread->work);
- TOR_TAILQ_REMOVE(&thread->work, work, next_work);
+ if (thread->in_pool->generation != thread->generation) {
+ void *arg = thread->in_pool->update_args[thread->index];
+ thread->in_pool->update_args[thread->index] = NULL;
+ int (*update_fn)(void*,void*) = thread->in_pool->update_fn;
+ thread->generation = thread->in_pool->generation;
+ tor_mutex_release(&pool->lock);
+
+ int r = update_fn(thread->state, arg);
+
+ if (r < 0) {
+ thread->is_running = 0;
+ thread->is_shut_down = 1;
+ return;
+ }
+
+ tor_mutex_acquire(&pool->lock);
+ continue;
+ }
+ work = TOR_TAILQ_FIRST(&pool->work);
+ TOR_TAILQ_REMOVE(&pool->work, work, next_work);
work->pending = 0;
- tor_mutex_release(&thread->lock);
+ tor_mutex_release(&pool->lock);
/* We run the work function without holding the thread lock. This
* is the main thread's first opportunity to give us more work. */
@@ -175,25 +218,23 @@ worker_thread_main(void *thread_)
/* Queue the reply for the main thread. */
queue_reply(thread->reply_queue, work);
- tor_mutex_acquire(&thread->lock);
/* We may need to exit the thread. */
if (result >= WQ_RPL_ERROR) {
thread->is_running = 0;
thread->is_shut_down = 1;
- tor_mutex_release(&thread->lock);
return;
}
+ tor_mutex_acquire(&pool->lock);
}
/* At this point the lock is held, and there is no work in this thread's
* queue. */
- /* TODO: Try work-stealing. */
/* TODO: support an idle-function */
/* Okay. Now, wait till somebody has work for us. */
/* XXXX we could just omit waiting and instead */
thread->waiting = 1;
- if (tor_cond_wait(&thread->condition, &thread->lock, NULL) < 0) {
+ if (tor_cond_wait(&pool->condition, &pool->lock, NULL) < 0) {
/* XXXX ERROR */
}
thread->waiting = 0;
@@ -221,14 +262,12 @@ queue_reply(replyqueue_t *queue, workqueue_entry_t *work)
/** Allocate and start a new worker thread to use state object <b>state</b>,
* and send responses to <b>replyqueue</b>. */
static workerthread_t *
-workerthread_new(void *state, replyqueue_t *replyqueue)
+workerthread_new(void *state, threadpool_t *pool, replyqueue_t *replyqueue)
{
workerthread_t *thr = tor_malloc_zero(sizeof(workerthread_t));
- tor_mutex_init_for_cond(&thr->lock);
- tor_cond_init(&thr->condition);
- TOR_TAILQ_INIT(&thr->work);
thr->state = state;
thr->reply_queue = replyqueue;
+ thr->in_pool = pool;
if (spawn_func(worker_thread_main, thr) < 0) {
log_err(LD_GENERAL, "Can't launch worker thread.");
@@ -239,30 +278,6 @@ workerthread_new(void *state, replyqueue_t *replyqueue)
}
/**
- * Add an item of work to a single worker thread. See threadpool_queue_work(*)
- * for arguments.
- */
-static workqueue_entry_t *
-workerthread_queue_work(workerthread_t *worker,
- int (*fn)(void *, void *),
- void (*reply_fn)(void *),
- void *arg)
-{
- workqueue_entry_t *ent = workqueue_entry_new(fn, reply_fn, arg);
-
- tor_mutex_acquire(&worker->lock);
- ent->on_thread = worker;
- ent->pending = 1;
- TOR_TAILQ_INSERT_TAIL(&worker->work, ent, next_work);
-
- if (worker->waiting) /* XXXX inside or outside of lock?? */
- tor_cond_signal_one(&worker->condition);
-
- tor_mutex_release(&worker->lock);
- return ent;
-}
-
-/**
* Queue an item of work for a thread in a thread pool. The function
* <b>fn</b> will be run in a worker thread, and will receive as arguments the
* thread's state object, and the provided object <b>arg</b>. It must return
@@ -285,20 +300,19 @@ threadpool_queue_work(threadpool_t *pool,
void (*reply_fn)(void *),
void *arg)
{
- workerthread_t *worker;
+ workqueue_entry_t *ent = workqueue_entry_new(fn, reply_fn, arg);
+ ent->on_pool = pool;
+ ent->pending = 1;
tor_mutex_acquire(&pool->lock);
- /* Pick the next thread in random-access order. */
- worker = pool->threads[pool->next_for_work++];
- if (!worker) {
- tor_mutex_release(&pool->lock);
- return NULL;
- }
- if (pool->next_for_work >= pool->n_threads)
- pool->next_for_work = 0;
+
+ TOR_TAILQ_INSERT_TAIL(&pool->work, ent, next_work);
+
tor_mutex_release(&pool->lock);
- return workerthread_queue_work(worker, fn, reply_fn, arg);
+ tor_cond_signal_one(&pool->condition);
+
+ return ent;
}
/**
@@ -309,30 +323,56 @@ threadpool_queue_work(threadpool_t *pool,
* <b>arg</b> value is passed to <b>dup_fn</b> once per each thread to
* make a copy of it.
*
+ * UPDATE FUNCTIONS MUST BE IDEMPOTENT. We do not guarantee that every update
+ * will be run. If a new update is scheduled before the old update finishes
+ * running, then the new will replace the old in any threads that haven't run
+ * it yet.
+ *
* Return 0 on success, -1 on failure.
*/
int
-threadpool_queue_for_all(threadpool_t *pool,
+threadpool_queue_update(threadpool_t *pool,
void *(*dup_fn)(void *),
int (*fn)(void *, void *),
- void (*reply_fn)(void *),
+ void (*free_fn)(void *),
void *arg)
{
- int i = 0;
- workerthread_t *worker;
- void *arg_copy;
- while (1) {
- tor_mutex_acquire(&pool->lock);
- if (i >= pool->n_threads) {
- tor_mutex_release(&pool->lock);
- return 0;
- }
- worker = pool->threads[i++];
- tor_mutex_release(&pool->lock);
+ int i, n_threads;
+ void (*old_args_free_fn)(void *arg);
+ void **old_args;
+ void **new_args;
+
+ tor_mutex_acquire(&pool->lock);
+ n_threads = pool->n_threads;
+ old_args = pool->update_args;
+ old_args_free_fn = pool->free_update_arg_fn;
+
+ new_args = tor_calloc(n_threads, sizeof(void*));
+ for (i = 0; i < n_threads; ++i) {
+ if (dup_fn)
+ new_args[i] = dup_fn(arg);
+ else
+ new_args[i] = arg;
+ }
+
+ pool->update_args = new_args;
+ pool->free_update_arg_fn = free_fn;
+ pool->update_fn = fn;
+ ++pool->generation;
- arg_copy = dup_fn ? dup_fn(arg) : arg;
- /* CHECK*/ workerthread_queue_work(worker, fn, reply_fn, arg_copy);
+ tor_mutex_release(&pool->lock);
+
+ tor_cond_signal_all(&pool->condition);
+
+ if (old_args) {
+ for (i = 0; i < n_threads; ++i) {
+ if (old_args[i] && old_args_free_fn)
+ old_args_free_fn(old_args[i]);
+ }
+ tor_free(old_args);
}
+
+ return 0;
}
/** Launch threads until we have <b>n</b>. */
@@ -346,7 +386,8 @@ threadpool_start_threads(threadpool_t *pool, int n)
while (pool->n_threads < n) {
void *state = pool->new_thread_state_fn(pool->new_thread_state_arg);
- workerthread_t *thr = workerthread_new(state, pool->reply_queue);
+ workerthread_t *thr = workerthread_new(state, pool, pool->reply_queue);
+ thr->index = pool->n_threads;
if (!thr) {
tor_mutex_release(&pool->lock);
@@ -375,7 +416,10 @@ threadpool_new(int n_threads,
{
threadpool_t *pool;
pool = tor_malloc_zero(sizeof(threadpool_t));
- tor_mutex_init(&pool->lock);
+ tor_mutex_init_nonrecursive(&pool->lock);
+ tor_cond_init(&pool->condition);
+ TOR_TAILQ_INIT(&pool->work);
+
pool->new_thread_state_fn = new_thread_state_fn;
pool->new_thread_state_arg = arg;
pool->free_thread_state_fn = free_thread_state_fn;
@@ -447,7 +491,7 @@ replyqueue_process(replyqueue_t *queue)
workqueue_entry_t *work = TOR_TAILQ_FIRST(&queue->answers);
TOR_TAILQ_REMOVE(&queue->answers, work, next_work);
tor_mutex_release(&queue->lock);
- work->on_thread = NULL;
+ work->on_pool = NULL;
work->reply_fn(work->arg);
workqueue_entry_free(work);
diff --git a/src/common/workqueue.h b/src/common/workqueue.h
index aa1bcc5..92e82b8 100644
--- a/src/common/workqueue.h
+++ b/src/common/workqueue.h
@@ -27,11 +27,11 @@ workqueue_entry_t *threadpool_queue_work(threadpool_t *pool,
int (*fn)(void *, void *),
void (*reply_fn)(void *),
void *arg);
-int threadpool_queue_for_all(threadpool_t *pool,
- void *(*dup_fn)(void *),
- int (*fn)(void *, void *),
- void (*reply_fn)(void *),
- void *arg);
+int threadpool_queue_update(threadpool_t *pool,
+ void *(*dup_fn)(void *),
+ int (*fn)(void *, void *),
+ void (*free_fn)(void *),
+ void *arg);
void *workqueue_entry_cancel(workqueue_entry_t *pending_work);
threadpool_t *threadpool_new(int n_threads,
replyqueue_t *replyqueue,
diff --git a/src/or/cpuworker.c b/src/or/cpuworker.c
index 36ca505..3f129de 100644
--- a/src/or/cpuworker.c
+++ b/src/or/cpuworker.c
@@ -170,11 +170,6 @@ update_state_threadfn(void *state_, void *work_)
++state->generation;
return WQ_RPL_REPLY;
}
-static void
-update_state_replyfn(void *work_)
-{
- tor_free(work_);
-}
/** Called when the onion key has changed and we need to spawn new
* cpuworkers. Close all currently idle cpuworkers, and mark the last
@@ -183,11 +178,11 @@ update_state_replyfn(void *work_)
void
cpuworkers_rotate_keyinfo(void)
{
- if (threadpool_queue_for_all(threadpool,
- worker_state_new,
- update_state_threadfn,
- update_state_replyfn,
- NULL)) {
+ if (threadpool_queue_update(threadpool,
+ worker_state_new,
+ update_state_threadfn,
+ worker_state_free,
+ NULL)) {
log_warn(LD_OR, "Failed to queue key update for worker threads.");
}
}
diff --git a/src/test/test_workqueue.c b/src/test/test_workqueue.c
index 410f43c..8ce4405 100644
--- a/src/test/test_workqueue.c
+++ b/src/test/test_workqueue.c
@@ -132,7 +132,6 @@ new_state(void *arg)
/* Every thread gets its own keys. not a problem for benchmarking */
st->rsa = crypto_pk_new();
if (crypto_pk_generate_key_with_bits(st->rsa, 1024) < 0) {
- puts("keygen failed");
crypto_pk_free(st->rsa);
tor_free(st);
return NULL;
@@ -213,7 +212,6 @@ add_n_work_items(threadpool_t *tp, int n)
while (n_queued++ < n) {
ent = add_work(tp);
if (! ent) {
- puts("Couldn't add work.");
tor_event_base_loopexit(tor_libevent_get_base(), NULL);
return -1;
}
@@ -238,18 +236,6 @@ add_n_work_items(threadpool_t *tp, int n)
}
static int shutting_down = 0;
-static int n_shutdowns_done = 0;
-
-static void
-shutdown_reply(void *arg)
-{
- (void)arg;
- tor_assert(shutting_down);
- ++n_shutdowns_done;
- if (n_shutdowns_done == opt_n_threads) {
- tor_event_base_loopexit(tor_libevent_get_base(), NULL);
- }
-}
static void
replysock_readable_cb(tor_socket_t sock, short what, void *arg)
@@ -297,8 +283,8 @@ replysock_readable_cb(tor_socket_t sock, short what, void *arg)
n_received+n_successful_cancel == n_sent &&
n_sent >= opt_n_items) {
shutting_down = 1;
- threadpool_queue_for_all(tp, NULL,
- workqueue_do_shutdown, shutdown_reply, NULL);
+ threadpool_queue_update(tp, NULL,
+ workqueue_do_shutdown, NULL, NULL);
}
}
@@ -410,8 +396,9 @@ main(int argc, char **argv)
event_base_loop(tor_libevent_get_base(), 0);
- if (n_sent != opt_n_items || n_received+n_successful_cancel != n_sent ||
- n_shutdowns_done != opt_n_threads) {
+ if (n_sent != opt_n_items || n_received+n_successful_cancel != n_sent) {
+ printf("%d vs %d\n", n_sent, opt_n_items);
+ printf("%d+%d vs %d\n", n_received, n_successful_cancel, n_sent);
puts("FAIL");
return 1;
} else {

[tor/master] Incorporate some comments based on notes from dgoulet
by nickm@torproject.org 21 Jan '15
commit 051ad788e0ebcd0c99c1498e7e45faa71c4830c1
Author: Nick Mathewson <nickm(a)torproject.org>
Date: Mon Dec 16 10:20:40 2013 -0500
Incorporate some comments based on notes from dgoulet
---
src/common/workqueue.c | 6 ++++--
1 file changed, 4 insertions(+), 2 deletions(-)
diff --git a/src/common/workqueue.c b/src/common/workqueue.c
index f3ef678..7fa8967 100644
--- a/src/common/workqueue.c
+++ b/src/common/workqueue.c
@@ -69,9 +69,10 @@ typedef struct workerthread_s {
tor_cond_t condition;
/** Queue of pending work that we have to do. */
TOR_TAILQ_HEAD(, workqueue_entry_s) work;
- /** True iff this thread is currently in its loop. */
+ /** True iff this thread is currently in its loop. (Not currently used.) */
unsigned is_running;
- /** True iff this thread has crashed or is shut down for some reason. */
+ /** True iff this thread has crashed or is shut down for some reason. (Not
+ * currently used.) */
unsigned is_shut_down;
/** True if we're waiting for more elements to get added to the queue. */
unsigned waiting;
@@ -190,6 +191,7 @@ worker_thread_main(void *thread_)
/* TODO: support an idle-function */
/* Okay. Now, wait till somebody has work for us. */
+ /* XXXX we could just omit waiting and instead */
thread->waiting = 1;
if (tor_cond_wait(&thread->condition, &thread->lock, NULL) < 0) {
/* XXXX ERROR */
commit ac5b70c700b211008853b5f212100a867f508dfd
Author: Nick Mathewson <nickm(a)torproject.org>
Date: Wed Jan 21 12:18:11 2015 -0500
handle EINTR in compat_*threads.c
---
src/common/compat_pthreads.c | 40 ++++++++++++++++++++----------
src/common/compat_threads.c | 55 +++++++++++++++++++++++++++++++++++++++---
2 files changed, 78 insertions(+), 17 deletions(-)
diff --git a/src/common/compat_pthreads.c b/src/common/compat_pthreads.c
index f434805..188a91f 100644
--- a/src/common/compat_pthreads.c
+++ b/src/common/compat_pthreads.c
@@ -196,23 +196,37 @@ tor_cond_uninit(tor_cond_t *cond)
int
tor_cond_wait(tor_cond_t *cond, tor_mutex_t *mutex, const struct timeval *tv)
{
+ int r;
if (tv == NULL) {
- return pthread_cond_wait(&cond->cond, &mutex->mutex) ? -1 : 0;
+ while (1) {
+ r = pthread_cond_wait(&cond->cond, &mutex->mutex);
+ if (r == EINTR) {
+ /* EINTR should be impossible according to POSIX, but POSIX, like the
+ * Pirate's Code, is apparently treated "more like what you'd call
+ * guidelines than actual rules." */
+ continue;
+ }
+ return r ? -1 : 0;
+ }
} else {
struct timespec ts;
struct timeval tvnow, tvsum;
- int r;
- gettimeofday(&tvnow, NULL);
- timeradd(tv, &tvnow, &tvsum);
- ts.tv_sec = tvsum.tv_sec;
- ts.tv_nsec = tvsum.tv_usec * 1000;
- r = pthread_cond_timedwait(&cond->cond, &mutex->mutex, &ts);
- if (r == 0)
- return 0;
- else if (r == ETIMEDOUT)
- return 1;
- else
- return -1;
+ while (1) {
+ gettimeofday(&tvnow, NULL);
+ timeradd(tv, &tvnow, &tvsum);
+ ts.tv_sec = tvsum.tv_sec;
+ ts.tv_nsec = tvsum.tv_usec * 1000;
+
+ r = pthread_cond_timedwait(&cond->cond, &mutex->mutex, &ts);
+ if (r == 0)
+ return 0;
+ else if (r == ETIMEDOUT)
+ return 1;
+ else if (r == EINTR)
+ continue;
+ else
+ return -1;
+ }
}
}
/** Wake up one of the waiters on <b>cond</b>. */
diff --git a/src/common/compat_threads.c b/src/common/compat_threads.c
index 7944007..3b79292 100644
--- a/src/common/compat_threads.c
+++ b/src/common/compat_threads.c
@@ -88,12 +88,59 @@ in_main_thread(void)
return main_thread_id == tor_get_thread_id();
}
+#if defined(HAVE_EVENTFD) || defined(HAVE_PIPE)
+/* non-interruptable versions */
+static int
+write_ni(int fd, const void *buf, size_t n)
+{
+ int r;
+ again:
+ r = write(fd, buf, n);
+ if (r < 0 && errno == EINTR)
+ goto again;
+ return r;
+}
+static int
+read_ni(int fd, void *buf, size_t n)
+{
+ int r;
+ again:
+ r = read(fd, buf, n);
+ if (r < 0 && errno == EINTR)
+ goto again;
+ return r;
+}
+#endif
+
+/* non-interruptable versions */
+static int
+send_ni(int fd, const void *buf, size_t n, int flags)
+{
+ int r;
+ again:
+ r = send(fd, buf, n, flags);
+ if (r < 0 && errno == EINTR)
+ goto again;
+ return r;
+}
+
+static int
+recv_ni(int fd, void *buf, size_t n, int flags)
+{
+ int r;
+ again:
+ r = recv(fd, buf, n, flags);
+ if (r < 0 && errno == EINTR)
+ goto again;
+ return r;
+}
+
#ifdef HAVE_EVENTFD
static int
eventfd_alert(int fd)
{
uint64_t u = 1;
- int r = write(fd, (void*)&u, sizeof(u));
+ int r = write_ni(fd, (void*)&u, sizeof(u));
if (r < 0 && errno != EAGAIN)
return -1;
return 0;
@@ -103,7 +150,7 @@ static int
eventfd_drain(int fd)
{
uint64_t u = 0;
- int r = read(fd, (void*)&u, sizeof(u));
+ int r = read_ni(fd, (void*)&u, sizeof(u));
if (r < 0 && errno != EAGAIN)
return -1;
return 0;
@@ -136,7 +183,7 @@ pipe_drain(int fd)
static int
sock_alert(tor_socket_t fd)
{
- ssize_t r = send(fd, "x", 1, 0);
+ ssize_t r = send_ni(fd, "x", 1, 0);
if (r < 0 && !ERRNO_IS_EAGAIN(tor_socket_errno(fd)))
return -1;
return 0;
@@ -147,7 +194,7 @@ sock_drain(tor_socket_t fd)
{
char buf[32];
ssize_t r;
- while ((r = recv(fd, buf, sizeof(buf), 0)) >= 0)
+ while ((r = recv_ni(fd, buf, sizeof(buf), 0)) >= 0)
;
if (r == 0 || !ERRNO_IS_EAGAIN(tor_socket_errno(fd)))
return -1;
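The same retry idiom, pulled out of the patch above and shown standalone: reissue the call while it fails with EINTR, and let every other outcome (including EAGAIN) propagate to the caller unchanged. This is a generic sketch, not code from the tree.

/* Minimal EINTR-retry wrapper, mirroring write_ni()/read_ni() above. */
#include <errno.h>
#include <unistd.h>

static ssize_t
write_no_eintr(int fd, const void *buf, size_t n)
{
  ssize_t r;
  do {
    r = write(fd, buf, n);
  } while (r < 0 && errno == EINTR);  /* retry only on interruption */
  return r;                           /* EAGAIN etc. still reach the caller */
}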
commit c2f0d52b7fb937f3f66ef8b2b74b0fdf239b0e9b
Author: Nick Mathewson <nickm(a)torproject.org>
Date: Sun Sep 22 21:30:46 2013 -0400
Split threading-related code out of compat.c
Also, re-enable the #if'd out condition-variable code.
Work queues are going to make us hack on all of this stuff a bit more
closely, so it might not be a terrible idea to make it easier to hack.
---
configure.ac | 4 +
src/common/compat.c | 394 ----------------------------------------
src/common/compat.h | 57 +-----
src/common/compat_pthreads.c | 211 +++++++++++++++++++++
src/common/compat_threads.c | 42 +++++
src/common/compat_threads.h | 67 +++++++
src/common/compat_winthreads.c | 161 ++++++++++++++++
src/common/include.am | 13 +-
8 files changed, 499 insertions(+), 450 deletions(-)
diff --git a/configure.ac b/configure.ac
index c254725..65b3ff2 100644
--- a/configure.ac
+++ b/configure.ac
@@ -393,6 +393,10 @@ fi
AC_SEARCH_LIBS(pthread_create, [pthread])
AC_SEARCH_LIBS(pthread_detach, [pthread])
+AM_CONDITIONAL(THREADS_WIN32, test "$enable_threads" = "yes" && test "$bwin32" = "true")
+AM_CONDITIONAL(THREADS_PTHREADS, test "$enable_threads" = "yes" && test "$bwin32" = "false")
+AM_CONDITIONAL(THREADS_NONE, test "$enable_threads" != "yes")
+
dnl -------------------------------------------------------------------
dnl Check for functions before libevent, since libevent-1.2 apparently
dnl exports strlcpy without defining it in a header.
diff --git a/src/common/compat.c b/src/common/compat.c
index 6d36321..a22a61a 100644
--- a/src/common/compat.c
+++ b/src/common/compat.c
@@ -2544,109 +2544,6 @@ get_uname(void)
* Process control
*/
-#if defined(USE_PTHREADS)
-/** Wraps a void (*)(void*) function and its argument so we can
- * invoke them in a way pthreads would expect.
- */
-typedef struct tor_pthread_data_t {
- void (*func)(void *);
- void *data;
-} tor_pthread_data_t;
-/** Given a tor_pthread_data_t <b>_data</b>, call _data->func(d->data)
- * and free _data. Used to make sure we can call functions the way pthread
- * expects. */
-static void *
-tor_pthread_helper_fn(void *_data)
-{
- tor_pthread_data_t *data = _data;
- void (*func)(void*);
- void *arg;
- /* mask signals to worker threads to avoid SIGPIPE, etc */
- sigset_t sigs;
- /* We're in a subthread; don't handle any signals here. */
- sigfillset(&sigs);
- pthread_sigmask(SIG_SETMASK, &sigs, NULL);
-
- func = data->func;
- arg = data->data;
- tor_free(_data);
- func(arg);
- return NULL;
-}
-/**
- * A pthread attribute to make threads start detached.
- */
-static pthread_attr_t attr_detached;
-/** True iff we've called tor_threads_init() */
-static int threads_initialized = 0;
-#endif
-
-/** Minimalist interface to run a void function in the background. On
- * Unix calls fork, on win32 calls beginthread. Returns -1 on failure.
- * func should not return, but rather should call spawn_exit.
- *
- * NOTE: if <b>data</b> is used, it should not be allocated on the stack,
- * since in a multithreaded environment, there is no way to be sure that
- * the caller's stack will still be around when the called function is
- * running.
- */
-int
-spawn_func(void (*func)(void *), void *data)
-{
-#if defined(USE_WIN32_THREADS)
- int rv;
- rv = (int)_beginthread(func, 0, data);
- if (rv == (int)-1)
- return -1;
- return 0;
-#elif defined(USE_PTHREADS)
- pthread_t thread;
- tor_pthread_data_t *d;
- if (PREDICT_UNLIKELY(!threads_initialized))
- tor_threads_init();
- d = tor_malloc(sizeof(tor_pthread_data_t));
- d->data = data;
- d->func = func;
- if (pthread_create(&thread,&attr_detached,tor_pthread_helper_fn,d))
- return -1;
- return 0;
-#else
- pid_t pid;
- pid = fork();
- if (pid<0)
- return -1;
- if (pid==0) {
- /* Child */
- func(data);
- tor_assert(0); /* Should never reach here. */
- return 0; /* suppress "control-reaches-end-of-non-void" warning. */
- } else {
- /* Parent */
- return 0;
- }
-#endif
-}
-
-/** End the current thread/process.
- */
-void
-spawn_exit(void)
-{
-#if defined(USE_WIN32_THREADS)
- _endthread();
- //we should never get here. my compiler thinks that _endthread returns, this
- //is an attempt to fool it.
- tor_assert(0);
- _exit(0);
-#elif defined(USE_PTHREADS)
- pthread_exit(NULL);
-#else
- /* http://www.erlenstar.demon.co.uk/unix/faq_2.html says we should
- * call _exit, not exit, from child processes. */
- _exit(0);
-#endif
-}
-
/** Implementation logic for compute_num_cpus(). */
static int
compute_num_cpus_impl(void)
@@ -2935,280 +2832,6 @@ tor_gmtime_r(const time_t *timep, struct tm *result)
}
#endif
-#if defined(USE_WIN32_THREADS)
-void
-tor_mutex_init(tor_mutex_t *m)
-{
- InitializeCriticalSection(&m->mutex);
-}
-void
-tor_mutex_uninit(tor_mutex_t *m)
-{
- DeleteCriticalSection(&m->mutex);
-}
-void
-tor_mutex_acquire(tor_mutex_t *m)
-{
- tor_assert(m);
- EnterCriticalSection(&m->mutex);
-}
-void
-tor_mutex_release(tor_mutex_t *m)
-{
- LeaveCriticalSection(&m->mutex);
-}
-unsigned long
-tor_get_thread_id(void)
-{
- return (unsigned long)GetCurrentThreadId();
-}
-#elif defined(USE_PTHREADS)
-/** A mutex attribute that we're going to use to tell pthreads that we want
- * "reentrant" mutexes (i.e., once we can re-lock if we're already holding
- * them.) */
-static pthread_mutexattr_t attr_reentrant;
-/** Initialize <b>mutex</b> so it can be locked. Every mutex must be set
- * up with tor_mutex_init() or tor_mutex_new(); not both. */
-void
-tor_mutex_init(tor_mutex_t *mutex)
-{
- int err;
- if (PREDICT_UNLIKELY(!threads_initialized))
- tor_threads_init();
- err = pthread_mutex_init(&mutex->mutex, &attr_reentrant);
- if (PREDICT_UNLIKELY(err)) {
- log_err(LD_GENERAL, "Error %d creating a mutex.", err);
- tor_fragile_assert();
- }
-}
-/** Wait until <b>m</b> is free, then acquire it. */
-void
-tor_mutex_acquire(tor_mutex_t *m)
-{
- int err;
- tor_assert(m);
- err = pthread_mutex_lock(&m->mutex);
- if (PREDICT_UNLIKELY(err)) {
- log_err(LD_GENERAL, "Error %d locking a mutex.", err);
- tor_fragile_assert();
- }
-}
-/** Release the lock <b>m</b> so another thread can have it. */
-void
-tor_mutex_release(tor_mutex_t *m)
-{
- int err;
- tor_assert(m);
- err = pthread_mutex_unlock(&m->mutex);
- if (PREDICT_UNLIKELY(err)) {
- log_err(LD_GENERAL, "Error %d unlocking a mutex.", err);
- tor_fragile_assert();
- }
-}
-/** Clean up the mutex <b>m</b> so that it no longer uses any system
- * resources. Does not free <b>m</b>. This function must only be called on
- * mutexes from tor_mutex_init(). */
-void
-tor_mutex_uninit(tor_mutex_t *m)
-{
- int err;
- tor_assert(m);
- err = pthread_mutex_destroy(&m->mutex);
- if (PREDICT_UNLIKELY(err)) {
- log_err(LD_GENERAL, "Error %d destroying a mutex.", err);
- tor_fragile_assert();
- }
-}
-/** Return an integer representing this thread. */
-unsigned long
-tor_get_thread_id(void)
-{
- union {
- pthread_t thr;
- unsigned long id;
- } r;
- r.thr = pthread_self();
- return r.id;
-}
-#endif
-
-/** Return a newly allocated, ready-for-use mutex. */
-tor_mutex_t *
-tor_mutex_new(void)
-{
- tor_mutex_t *m = tor_malloc_zero(sizeof(tor_mutex_t));
- tor_mutex_init(m);
- return m;
-}
-/** Release all storage and system resources held by <b>m</b>. */
-void
-tor_mutex_free(tor_mutex_t *m)
-{
- if (!m)
- return;
- tor_mutex_uninit(m);
- tor_free(m);
-}
-
-/* Conditions. */
-#ifdef USE_PTHREADS
-#if 0
-/** Cross-platform condition implementation. */
-struct tor_cond_t {
- pthread_cond_t cond;
-};
-/** Return a newly allocated condition, with nobody waiting on it. */
-tor_cond_t *
-tor_cond_new(void)
-{
- tor_cond_t *cond = tor_malloc_zero(sizeof(tor_cond_t));
- if (pthread_cond_init(&cond->cond, NULL)) {
- tor_free(cond);
- return NULL;
- }
- return cond;
-}
-/** Release all resources held by <b>cond</b>. */
-void
-tor_cond_free(tor_cond_t *cond)
-{
- if (!cond)
- return;
- if (pthread_cond_destroy(&cond->cond)) {
- log_warn(LD_GENERAL,"Error freeing condition: %s", strerror(errno));
- return;
- }
- tor_free(cond);
-}
-/** Wait until one of the tor_cond_signal functions is called on <b>cond</b>.
- * All waiters on the condition must wait holding the same <b>mutex</b>.
- * Returns 0 on success, negative on failure. */
-int
-tor_cond_wait(tor_cond_t *cond, tor_mutex_t *mutex)
-{
- return pthread_cond_wait(&cond->cond, &mutex->mutex) ? -1 : 0;
-}
-/** Wake up one of the waiters on <b>cond</b>. */
-void
-tor_cond_signal_one(tor_cond_t *cond)
-{
- pthread_cond_signal(&cond->cond);
-}
-/** Wake up all of the waiters on <b>cond</b>. */
-void
-tor_cond_signal_all(tor_cond_t *cond)
-{
- pthread_cond_broadcast(&cond->cond);
-}
-#endif
-/** Set up common structures for use by threading. */
-void
-tor_threads_init(void)
-{
- if (!threads_initialized) {
- pthread_mutexattr_init(&attr_reentrant);
- pthread_mutexattr_settype(&attr_reentrant, PTHREAD_MUTEX_RECURSIVE);
- tor_assert(0==pthread_attr_init(&attr_detached));
- tor_assert(0==pthread_attr_setdetachstate(&attr_detached, 1));
- threads_initialized = 1;
- set_main_thread();
- }
-}
-#elif defined(USE_WIN32_THREADS)
-#if 0
-static DWORD cond_event_tls_index;
-struct tor_cond_t {
- CRITICAL_SECTION mutex;
- smartlist_t *events;
-};
-tor_cond_t *
-tor_cond_new(void)
-{
- tor_cond_t *cond = tor_malloc_zero(sizeof(tor_cond_t));
- InitializeCriticalSection(&cond->mutex);
- cond->events = smartlist_new();
- return cond;
-}
-void
-tor_cond_free(tor_cond_t *cond)
-{
- if (!cond)
- return;
- DeleteCriticalSection(&cond->mutex);
- /* XXXX notify? */
- smartlist_free(cond->events);
- tor_free(cond);
-}
-int
-tor_cond_wait(tor_cond_t *cond, tor_mutex_t *mutex)
-{
- HANDLE event;
- int r;
- tor_assert(cond);
- tor_assert(mutex);
- event = TlsGetValue(cond_event_tls_index);
- if (!event) {
- event = CreateEvent(0, FALSE, FALSE, NULL);
- TlsSetValue(cond_event_tls_index, event);
- }
- EnterCriticalSection(&cond->mutex);
-
- tor_assert(WaitForSingleObject(event, 0) == WAIT_TIMEOUT);
- tor_assert(!smartlist_contains(cond->events, event));
- smartlist_add(cond->events, event);
-
- LeaveCriticalSection(&cond->mutex);
-
- tor_mutex_release(mutex);
- r = WaitForSingleObject(event, INFINITE);
- tor_mutex_acquire(mutex);
-
- switch (r) {
- case WAIT_OBJECT_0: /* we got the mutex normally. */
- break;
- case WAIT_ABANDONED: /* holding thread exited. */
- case WAIT_TIMEOUT: /* Should never happen. */
- tor_assert(0);
- break;
- case WAIT_FAILED:
- log_warn(LD_GENERAL, "Failed to acquire mutex: %d",(int) GetLastError());
- }
- return 0;
-}
-void
-tor_cond_signal_one(tor_cond_t *cond)
-{
- HANDLE event;
- tor_assert(cond);
-
- EnterCriticalSection(&cond->mutex);
-
- if ((event = smartlist_pop_last(cond->events)))
- SetEvent(event);
-
- LeaveCriticalSection(&cond->mutex);
-}
-void
-tor_cond_signal_all(tor_cond_t *cond)
-{
- tor_assert(cond);
-
- EnterCriticalSection(&cond->mutex);
- SMARTLIST_FOREACH(cond->events, HANDLE, event, SetEvent(event));
- smartlist_clear(cond->events);
- LeaveCriticalSection(&cond->mutex);
-}
-#endif
-void
-tor_threads_init(void)
-{
-#if 0
- cond_event_tls_index = TlsAlloc();
-#endif
- set_main_thread();
-}
-#endif
-
#if defined(HAVE_MLOCKALL) && HAVE_DECL_MLOCKALL && defined(RLIMIT_MEMLOCK)
/** Attempt to raise the current and max rlimit to infinity for our process.
* This only needs to be done once and can probably only be done when we have
@@ -3292,23 +2915,6 @@ tor_mlockall(void)
#endif
}
-/** Identity of the "main" thread */
-static unsigned long main_thread_id = -1;
-
-/** Start considering the current thread to be the 'main thread'. This has
- * no effect on anything besides in_main_thread(). */
-void
-set_main_thread(void)
-{
- main_thread_id = tor_get_thread_id();
-}
-/** Return true iff called from the main thread. */
-int
-in_main_thread(void)
-{
- return main_thread_id == tor_get_thread_id();
-}
-
/**
* On Windows, WSAEWOULDBLOCK is not always correct: when you see it,
* you need to ask the socket for its actual errno. Also, you need to
diff --git a/src/common/compat.h b/src/common/compat.h
index 04e8cb2..23f8614 100644
--- a/src/common/compat.h
+++ b/src/common/compat.h
@@ -36,9 +36,6 @@
#ifdef HAVE_STRING_H
#include <string.h>
#endif
-#if defined(HAVE_PTHREAD_H) && !defined(_WIN32)
-#include <pthread.h>
-#endif
#include <stdarg.h>
#ifdef HAVE_SYS_RESOURCE_H
#include <sys/resource.h>
@@ -642,61 +639,10 @@ char **get_environment(void);
int get_total_system_memory(size_t *mem_out);
-int spawn_func(void (*func)(void *), void *data);
-void spawn_exit(void) ATTR_NORETURN;
-
-#if defined(_WIN32)
-#define USE_WIN32_THREADS
-#elif defined(HAVE_PTHREAD_H) && defined(HAVE_PTHREAD_CREATE)
-#define USE_PTHREADS
-#else
-#error "No threading system was found"
-#endif
-
int compute_num_cpus(void);
-/* Because we use threads instead of processes on most platforms (Windows,
- * Linux, etc), we need locking for them. On platforms with poor thread
- * support or broken gethostbyname_r, these functions are no-ops. */
-
-/** A generic lock structure for multithreaded builds. */
-typedef struct tor_mutex_t {
-#if defined(USE_WIN32_THREADS)
- /** Windows-only: on windows, we implement locks with CRITICAL_SECTIONS. */
- CRITICAL_SECTION mutex;
-#elif defined(USE_PTHREADS)
- /** Pthreads-only: with pthreads, we implement locks with
- * pthread_mutex_t. */
- pthread_mutex_t mutex;
-#else
- /** No-threads only: Dummy variable so that tor_mutex_t takes up space. */
- int _unused;
-#endif
-} tor_mutex_t;
-
int tor_mlockall(void);
-tor_mutex_t *tor_mutex_new(void);
-void tor_mutex_init(tor_mutex_t *m);
-void tor_mutex_acquire(tor_mutex_t *m);
-void tor_mutex_release(tor_mutex_t *m);
-void tor_mutex_free(tor_mutex_t *m);
-void tor_mutex_uninit(tor_mutex_t *m);
-unsigned long tor_get_thread_id(void);
-void tor_threads_init(void);
-
-void set_main_thread(void);
-int in_main_thread(void);
-
-#if 0
-typedef struct tor_cond_t tor_cond_t;
-tor_cond_t *tor_cond_new(void);
-void tor_cond_free(tor_cond_t *cond);
-int tor_cond_wait(tor_cond_t *cond, tor_mutex_t *mutex);
-void tor_cond_signal_one(tor_cond_t *cond);
-void tor_cond_signal_all(tor_cond_t *cond);
-#endif
-
/** Macros for MIN/MAX. Never use these when the arguments could have
* side-effects.
* {With GCC extensions we could probably define a safer MIN/MAX. But
@@ -742,5 +688,8 @@ STATIC int tor_ersatz_socketpair(int family, int type, int protocol,
#endif
#endif
+/* This needs some of the declarations above so we include it here. */
+#include "compat_threads.h"
+
#endif
diff --git a/src/common/compat_pthreads.c b/src/common/compat_pthreads.c
new file mode 100644
index 0000000..276b244
--- /dev/null
+++ b/src/common/compat_pthreads.c
@@ -0,0 +1,211 @@
+/* Copyright (c) 2003-2004, Roger Dingledine
+ * Copyright (c) 2004-2006, Roger Dingledine, Nick Mathewson.
+ * Copyright (c) 2007-2015, The Tor Project, Inc. */
+/* See LICENSE for licensing information */
+
+#include "orconfig.h"
+#include <pthread.h>
+
+#include "compat.h"
+#include "torlog.h"
+#include "util.h"
+
+/** Wraps a void (*)(void*) function and its argument so we can
+ * invoke them in a way pthreads would expect.
+ */
+typedef struct tor_pthread_data_t {
+ void (*func)(void *);
+ void *data;
+} tor_pthread_data_t;
+/** Given a tor_pthread_data_t <b>_data</b>, call _data->func(d->data)
+ * and free _data. Used to make sure we can call functions the way pthread
+ * expects. */
+static void *
+tor_pthread_helper_fn(void *_data)
+{
+ tor_pthread_data_t *data = _data;
+ void (*func)(void*);
+ void *arg;
+ /* mask signals to worker threads to avoid SIGPIPE, etc */
+ sigset_t sigs;
+ /* We're in a subthread; don't handle any signals here. */
+ sigfillset(&sigs);
+ pthread_sigmask(SIG_SETMASK, &sigs, NULL);
+
+ func = data->func;
+ arg = data->data;
+ tor_free(_data);
+ func(arg);
+ return NULL;
+}
+/**
+ * A pthread attribute to make threads start detached.
+ */
+static pthread_attr_t attr_detached;
+/** True iff we've called tor_threads_init() */
+static int threads_initialized = 0;
+
+
+/** Minimalist interface to run a void function in the background. On
+ * Unix calls fork, on win32 calls beginthread. Returns -1 on failure.
+ * func should not return, but rather should call spawn_exit.
+ *
+ * NOTE: if <b>data</b> is used, it should not be allocated on the stack,
+ * since in a multithreaded environment, there is no way to be sure that
+ * the caller's stack will still be around when the called function is
+ * running.
+ */
+int
+spawn_func(void (*func)(void *), void *data)
+{
+ pthread_t thread;
+ tor_pthread_data_t *d;
+ if (PREDICT_UNLIKELY(!threads_initialized))
+ tor_threads_init();
+ d = tor_malloc(sizeof(tor_pthread_data_t));
+ d->data = data;
+ d->func = func;
+ if (pthread_create(&thread,&attr_detached,tor_pthread_helper_fn,d))
+ return -1;
+ return 0;
+}
+
+/** End the current thread/process.
+ */
+void
+spawn_exit(void)
+{
+ pthread_exit(NULL);
+}
+
+/** A mutex attribute that we're going to use to tell pthreads that we want
+ * "reentrant" mutexes (i.e., once we can re-lock if we're already holding
+ * them.) */
+static pthread_mutexattr_t attr_reentrant;
+/** Initialize <b>mutex</b> so it can be locked. Every mutex must be set
+ * up with tor_mutex_init() or tor_mutex_new(); not both. */
+void
+tor_mutex_init(tor_mutex_t *mutex)
+{
+ int err;
+ if (PREDICT_UNLIKELY(!threads_initialized))
+ tor_threads_init();
+ err = pthread_mutex_init(&mutex->mutex, &attr_reentrant);
+ if (PREDICT_UNLIKELY(err)) {
+ log_err(LD_GENERAL, "Error %d creating a mutex.", err);
+ tor_fragile_assert();
+ }
+}
+/** Wait until <b>m</b> is free, then acquire it. */
+void
+tor_mutex_acquire(tor_mutex_t *m)
+{
+ int err;
+ tor_assert(m);
+ err = pthread_mutex_lock(&m->mutex);
+ if (PREDICT_UNLIKELY(err)) {
+ log_err(LD_GENERAL, "Error %d locking a mutex.", err);
+ tor_fragile_assert();
+ }
+}
+/** Release the lock <b>m</b> so another thread can have it. */
+void
+tor_mutex_release(tor_mutex_t *m)
+{
+ int err;
+ tor_assert(m);
+ err = pthread_mutex_unlock(&m->mutex);
+ if (PREDICT_UNLIKELY(err)) {
+ log_err(LD_GENERAL, "Error %d unlocking a mutex.", err);
+ tor_fragile_assert();
+ }
+}
+/** Clean up the mutex <b>m</b> so that it no longer uses any system
+ * resources. Does not free <b>m</b>. This function must only be called on
+ * mutexes from tor_mutex_init(). */
+void
+tor_mutex_uninit(tor_mutex_t *m)
+{
+ int err;
+ tor_assert(m);
+ err = pthread_mutex_destroy(&m->mutex);
+ if (PREDICT_UNLIKELY(err)) {
+ log_err(LD_GENERAL, "Error %d destroying a mutex.", err);
+ tor_fragile_assert();
+ }
+}
+/** Return an integer representing this thread. */
+unsigned long
+tor_get_thread_id(void)
+{
+ union {
+ pthread_t thr;
+ unsigned long id;
+ } r;
+ r.thr = pthread_self();
+ return r.id;
+}
+
+/* Conditions. */
+
+/** Cross-platform condition implementation. */
+struct tor_cond_t {
+ pthread_cond_t cond;
+};
+/** Return a newly allocated condition, with nobody waiting on it. */
+tor_cond_t *
+tor_cond_new(void)
+{
+ tor_cond_t *cond = tor_malloc_zero(sizeof(tor_cond_t));
+ if (pthread_cond_init(&cond->cond, NULL)) {
+ tor_free(cond);
+ return NULL;
+ }
+ return cond;
+}
+/** Release all resources held by <b>cond</b>. */
+void
+tor_cond_free(tor_cond_t *cond)
+{
+ if (!cond)
+ return;
+ if (pthread_cond_destroy(&cond->cond)) {
+ log_warn(LD_GENERAL,"Error freeing condition: %s", strerror(errno));
+ return;
+ }
+ tor_free(cond);
+}
+/** Wait until one of the tor_cond_signal functions is called on <b>cond</b>.
+ * All waiters on the condition must wait holding the same <b>mutex</b>.
+ * Returns 0 on success, negative on failure. */
+int
+tor_cond_wait(tor_cond_t *cond, tor_mutex_t *mutex)
+{
+ return pthread_cond_wait(&cond->cond, &mutex->mutex) ? -1 : 0;
+}
+/** Wake up one of the waiters on <b>cond</b>. */
+void
+tor_cond_signal_one(tor_cond_t *cond)
+{
+ pthread_cond_signal(&cond->cond);
+}
+/** Wake up all of the waiters on <b>cond</b>. */
+void
+tor_cond_signal_all(tor_cond_t *cond)
+{
+ pthread_cond_broadcast(&cond->cond);
+}
+
+/** Set up common structures for use by threading. */
+void
+tor_threads_init(void)
+{
+ if (!threads_initialized) {
+ pthread_mutexattr_init(&attr_reentrant);
+ pthread_mutexattr_settype(&attr_reentrant, PTHREAD_MUTEX_RECURSIVE);
+ tor_assert(0==pthread_attr_init(&attr_detached));
+ tor_assert(0==pthread_attr_setdetachstate(&attr_detached, 1));
+ threads_initialized = 1;
+ set_main_thread();
+ }
+}
diff --git a/src/common/compat_threads.c b/src/common/compat_threads.c
new file mode 100644
index 0000000..84a8a21
--- /dev/null
+++ b/src/common/compat_threads.c
@@ -0,0 +1,42 @@
+/* Copyright (c) 2003-2004, Roger Dingledine
+ * Copyright (c) 2004-2006, Roger Dingledine, Nick Mathewson.
+ * Copyright (c) 2007-2015, The Tor Project, Inc. */
+/* See LICENSE for licensing information */
+
+#include "compat.h"
+#include "util.h"
+
+/** Return a newly allocated, ready-for-use mutex. */
+tor_mutex_t *
+tor_mutex_new(void)
+{
+ tor_mutex_t *m = tor_malloc_zero(sizeof(tor_mutex_t));
+ tor_mutex_init(m);
+ return m;
+}
+/** Release all storage and system resources held by <b>m</b>. */
+void
+tor_mutex_free(tor_mutex_t *m)
+{
+ if (!m)
+ return;
+ tor_mutex_uninit(m);
+ tor_free(m);
+}
+
+/** Identity of the "main" thread */
+static unsigned long main_thread_id = -1;
+
+/** Start considering the current thread to be the 'main thread'. This has
+ * no effect on anything besides in_main_thread(). */
+void
+set_main_thread(void)
+{
+ main_thread_id = tor_get_thread_id();
+}
+/** Return true iff called from the main thread. */
+int
+in_main_thread(void)
+{
+ return main_thread_id == tor_get_thread_id();
+}
diff --git a/src/common/compat_threads.h b/src/common/compat_threads.h
new file mode 100644
index 0000000..f43e74a
--- /dev/null
+++ b/src/common/compat_threads.h
@@ -0,0 +1,67 @@
+/* Copyright (c) 2003-2004, Roger Dingledine
+ * Copyright (c) 2004-2006, Roger Dingledine, Nick Mathewson.
+ * Copyright (c) 2007-2015, The Tor Project, Inc. */
+/* See LICENSE for licensing information */
+
+#ifndef TOR_COMPAT_THREADS_H
+#define TOR_COMPAT_THREADS_H
+
+#include "orconfig.h"
+#include "torint.h"
+#include "testsupport.h"
+
+#if defined(HAVE_PTHREAD_H) && !defined(_WIN32)
+#include <pthread.h>
+#endif
+
+#if defined(_WIN32)
+#define USE_WIN32_THREADS
+#elif defined(HAVE_PTHREAD_H) && defined(HAVE_PTHREAD_CREATE)
+#define USE_PTHREADS
+#else
+#error "No threading system was found"
+#endif
+
+int spawn_func(void (*func)(void *), void *data);
+void spawn_exit(void) ATTR_NORETURN;
+
+/* Because we use threads instead of processes on most platforms (Windows,
+ * Linux, etc), we need locking for them. On platforms with poor thread
+ * support or broken gethostbyname_r, these functions are no-ops. */
+
+/** A generic lock structure for multithreaded builds. */
+typedef struct tor_mutex_t {
+#if defined(USE_WIN32_THREADS)
+ /** Windows-only: on windows, we implement locks with CRITICAL_SECTIONS. */
+ CRITICAL_SECTION mutex;
+#elif defined(USE_PTHREADS)
+ /** Pthreads-only: with pthreads, we implement locks with
+ * pthread_mutex_t. */
+ pthread_mutex_t mutex;
+#else
+ /** No-threads only: Dummy variable so that tor_mutex_t takes up space. */
+ int _unused;
+#endif
+} tor_mutex_t;
+
+
+tor_mutex_t *tor_mutex_new(void);
+void tor_mutex_init(tor_mutex_t *m);
+void tor_mutex_acquire(tor_mutex_t *m);
+void tor_mutex_release(tor_mutex_t *m);
+void tor_mutex_free(tor_mutex_t *m);
+void tor_mutex_uninit(tor_mutex_t *m);
+unsigned long tor_get_thread_id(void);
+void tor_threads_init(void);
+
+void set_main_thread(void);
+int in_main_thread(void);
+
+typedef struct tor_cond_t tor_cond_t;
+tor_cond_t *tor_cond_new(void);
+void tor_cond_free(tor_cond_t *cond);
+int tor_cond_wait(tor_cond_t *cond, tor_mutex_t *mutex);
+void tor_cond_signal_one(tor_cond_t *cond);
+void tor_cond_signal_all(tor_cond_t *cond);
+
+#endif
diff --git a/src/common/compat_winthreads.c b/src/common/compat_winthreads.c
new file mode 100644
index 0000000..01332fd
--- /dev/null
+++ b/src/common/compat_winthreads.c
@@ -0,0 +1,161 @@
+/* Copyright (c) 2003-2004, Roger Dingledine
+ * Copyright (c) 2004-2006, Roger Dingledine, Nick Mathewson.
+ * Copyright (c) 2007-2015, The Tor Project, Inc. */
+/* See LICENSE for licensing information */
+
+#include "compat.h"
+#include <windows.h>
+#include <process.h>
+#include "util.h"
+#include "container.h"
+#include "torlog.h"
+
+/** Minimalist interface to run a void function in the background. On
+ * Unix calls fork, on win32 calls beginthread. Returns -1 on failure.
+ * func should not return, but rather should call spawn_exit.
+ *
+ * NOTE: if <b>data</b> is used, it should not be allocated on the stack,
+ * since in a multithreaded environment, there is no way to be sure that
+ * the caller's stack will still be around when the called function is
+ * running.
+ */
+int
+spawn_func(void (*func)(void *), void *data)
+{
+ int rv;
+ rv = (int)_beginthread(func, 0, data);
+ if (rv == (int)-1)
+ return -1;
+ return 0;
+}
+
+
+/** End the current thread/process.
+ */
+void
+spawn_exit(void)
+{
+ _endthread();
+ //we should never get here. my compiler thinks that _endthread returns, this
+ //is an attempt to fool it.
+ tor_assert(0);
+ _exit(0);
+}
+
+
+void
+tor_mutex_init(tor_mutex_t *m)
+{
+ InitializeCriticalSection(&m->mutex);
+}
+void
+tor_mutex_uninit(tor_mutex_t *m)
+{
+ DeleteCriticalSection(&m->mutex);
+}
+void
+tor_mutex_acquire(tor_mutex_t *m)
+{
+ tor_assert(m);
+ EnterCriticalSection(&m->mutex);
+}
+void
+tor_mutex_release(tor_mutex_t *m)
+{
+ LeaveCriticalSection(&m->mutex);
+}
+unsigned long
+tor_get_thread_id(void)
+{
+ return (unsigned long)GetCurrentThreadId();
+}
+
+static DWORD cond_event_tls_index;
+struct tor_cond_t {
+ CRITICAL_SECTION mutex;
+ smartlist_t *events;
+};
+tor_cond_t *
+tor_cond_new(void)
+{
+ tor_cond_t *cond = tor_malloc_zero(sizeof(tor_cond_t));
+ InitializeCriticalSection(&cond->mutex);
+ cond->events = smartlist_new();
+ return cond;
+}
+void
+tor_cond_free(tor_cond_t *cond)
+{
+ if (!cond)
+ return;
+ DeleteCriticalSection(&cond->mutex);
+ /* XXXX notify? */
+ smartlist_free(cond->events);
+ tor_free(cond);
+}
+int
+tor_cond_wait(tor_cond_t *cond, tor_mutex_t *mutex)
+{
+ HANDLE event;
+ int r;
+ tor_assert(cond);
+ tor_assert(mutex);
+ event = TlsGetValue(cond_event_tls_index);
+ if (!event) {
+ event = CreateEvent(0, FALSE, FALSE, NULL);
+ TlsSetValue(cond_event_tls_index, event);
+ }
+ EnterCriticalSection(&cond->mutex);
+
+ tor_assert(WaitForSingleObject(event, 0) == WAIT_TIMEOUT);
+ tor_assert(!smartlist_contains(cond->events, event));
+ smartlist_add(cond->events, event);
+
+ LeaveCriticalSection(&cond->mutex);
+
+ tor_mutex_release(mutex);
+ r = WaitForSingleObject(event, INFINITE);
+ tor_mutex_acquire(mutex);
+
+ switch (r) {
+ case WAIT_OBJECT_0: /* we got the mutex normally. */
+ break;
+ case WAIT_ABANDONED: /* holding thread exited. */
+ case WAIT_TIMEOUT: /* Should never happen. */
+ tor_assert(0);
+ break;
+ case WAIT_FAILED:
+ log_warn(LD_GENERAL, "Failed to acquire mutex: %d",(int) GetLastError());
+ }
+ return 0;
+}
+void
+tor_cond_signal_one(tor_cond_t *cond)
+{
+ HANDLE event;
+ tor_assert(cond);
+
+ EnterCriticalSection(&cond->mutex);
+
+ if ((event = smartlist_pop_last(cond->events)))
+ SetEvent(event);
+
+ LeaveCriticalSection(&cond->mutex);
+}
+void
+tor_cond_signal_all(tor_cond_t *cond)
+{
+ tor_assert(cond);
+
+ EnterCriticalSection(&cond->mutex);
+ SMARTLIST_FOREACH(cond->events, HANDLE, event, SetEvent(event));
+ smartlist_clear(cond->events);
+ LeaveCriticalSection(&cond->mutex);
+}
+
+void
+tor_threads_init(void)
+{
+ cond_event_tls_index = TlsAlloc();
+ set_main_thread();
+}
diff --git a/src/common/include.am b/src/common/include.am
index 6441596..e4eeba6 100644
--- a/src/common/include.am
+++ b/src/common/include.am
@@ -54,10 +54,18 @@ endif
LIBDONNA += $(LIBED25519_REF10)
+if THREADS_PTHREADS
+threads_impl_source=src/common/compat_pthreads.c
+endif
+if THREADS_WIN32
+threads_impl_source=src/common/compat_winthreads.c
+endif
+
LIBOR_A_SOURCES = \
src/common/address.c \
src/common/backtrace.c \
src/common/compat.c \
+ src/common/compat_threads.c \
src/common/container.c \
src/common/di_ops.c \
src/common/log.c \
@@ -69,7 +77,8 @@ LIBOR_A_SOURCES = \
src/ext/csiphash.c \
src/ext/trunnel/trunnel.c \
$(libor_extra_source) \
- $(libor_mempool_source)
+ $(libor_mempool_source) \
+ $(threads_impl_source)
LIBOR_CRYPTO_A_SOURCES = \
src/common/aes.c \
@@ -102,7 +111,6 @@ src_common_libor_testing_a_CFLAGS = $(AM_CFLAGS) $(TEST_CFLAGS)
src_common_libor_crypto_testing_a_CFLAGS = $(AM_CFLAGS) $(TEST_CFLAGS)
src_common_libor_event_testing_a_CFLAGS = $(AM_CFLAGS) $(TEST_CFLAGS)
-
COMMONHEADERS = \
src/common/address.h \
src/common/backtrace.h \
@@ -110,6 +118,7 @@ COMMONHEADERS = \
src/common/ciphers.inc \
src/common/compat.h \
src/common/compat_libevent.h \
+ src/common/compat_threads.h \
src/common/container.h \
src/common/crypto.h \
src/common/crypto_curve25519.h \
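As a quick orientation for the new layout: compat_threads.h now carries the portable mutex/thread declarations, compat.h includes it, and the pthreads/win32 backends live in their own files selected by the new automake conditionals. A minimal, hedged sketch of using that relocated API follows; the counter and function names are illustrative, not part of the tree.

/* Hedged usage sketch for the split-out threading API. */
#include "compat.h"            /* pulls in compat_threads.h, per the diff */

static tor_mutex_t counter_lock;
static int counter = 0;

static void
bump_counter(void *arg)        /* runs in the thread started by spawn_func */
{
  (void)arg;
  tor_mutex_acquire(&counter_lock);
  ++counter;
  tor_mutex_release(&counter_lock);
  spawn_exit();                /* spawn_func() workers exit this way */
}

static int
demo(void)
{
  tor_threads_init();          /* set up mutex attributes, main-thread id */
  tor_mutex_init(&counter_lock);
  return spawn_func(bump_counter, NULL);  /* 0 on success, -1 on failure */
}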
[tor/master] Specialize handling for mutexes allocated for condition variables
by nickm@torproject.org 21 Jan '15
21 Jan '15
commit 6c9363310aaea9d39fae4d9dd50e78d42c3598b3
Author: Nick Mathewson <nickm(a)torproject.org>
Date: Tue Sep 24 15:03:51 2013 -0400
Specialize handling for mutexes allocated for condition variables
(These must not be reentrant mutexes with pthreads.)
---
src/common/compat_pthreads.c | 16 ++++++++++++++++
src/common/compat_threads.h | 1 +
src/common/compat_winthreads.c | 6 ++++++
3 files changed, 23 insertions(+)
diff --git a/src/common/compat_pthreads.c b/src/common/compat_pthreads.c
index e58b3f7..59b54a6 100644
--- a/src/common/compat_pthreads.c
+++ b/src/common/compat_pthreads.c
@@ -96,6 +96,22 @@ tor_mutex_init(tor_mutex_t *mutex)
tor_fragile_assert();
}
}
+
+/** As tor_mutex_init, but initialize a mutex suitable for use with a
+ * condition variable. */
+void
+tor_mutex_init_for_cond(tor_mutex_t *mutex)
+{
+ int err;
+ if (PREDICT_UNLIKELY(!threads_initialized))
+ tor_threads_init();
+ err = pthread_mutex_init(&mutex->mutex, NULL);
+ if (PREDICT_UNLIKELY(err)) {
+ log_err(LD_GENERAL, "Error %d creating a mutex.", err);
+ tor_fragile_assert();
+ }
+}
+
/** Wait until <b>m</b> is free, then acquire it. */
void
tor_mutex_acquire(tor_mutex_t *m)
diff --git a/src/common/compat_threads.h b/src/common/compat_threads.h
index 6d3ba3a..581d8dd 100644
--- a/src/common/compat_threads.h
+++ b/src/common/compat_threads.h
@@ -47,6 +47,7 @@ typedef struct tor_mutex_t {
tor_mutex_t *tor_mutex_new(void);
void tor_mutex_init(tor_mutex_t *m);
+void tor_mutex_init_for_cond(tor_mutex_t *m);
void tor_mutex_acquire(tor_mutex_t *m);
void tor_mutex_release(tor_mutex_t *m);
void tor_mutex_free(tor_mutex_t *m);
diff --git a/src/common/compat_winthreads.c b/src/common/compat_winthreads.c
index 11f91c6..2b1527a 100644
--- a/src/common/compat_winthreads.c
+++ b/src/common/compat_winthreads.c
@@ -49,6 +49,12 @@ tor_mutex_init(tor_mutex_t *m)
InitializeCriticalSection(&m->mutex);
}
void
+tor_mutex_init_for_cond(tor_mutex_t *m)
+{
+ InitializeCriticalSection(&m->mutex);
+}
+
+void
tor_mutex_uninit(tor_mutex_t *m)
{
DeleteCriticalSection(&m->mutex);
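The point of the patch, restated as a usage sketch: a mutex that will be passed to tor_cond_wait() has to come from tor_mutex_init_for_cond(), because plain tor_mutex_init() creates a recursive mutex under pthreads and condition waits don't support those. Everything below other than the tor_* calls is illustrative; tor_cond_wait() is shown with the timeout argument added elsewhere in this thread (NULL meaning wait forever).

/* Hedged sketch: pairing a condition variable with a non-reentrant mutex. */
#include "compat.h"

static tor_mutex_t queue_lock;            /* must NOT be the reentrant kind */
static tor_cond_t *queue_nonempty;
static int queue_len = 0;

static void
setup(void)
{
  tor_mutex_init_for_cond(&queue_lock);   /* not tor_mutex_init() */
  queue_nonempty = tor_cond_new();
}

static void
consumer_wait(void)
{
  tor_mutex_acquire(&queue_lock);
  while (queue_len == 0)                  /* recheck: wakeups may be spurious */
    tor_cond_wait(queue_nonempty, &queue_lock, NULL);
  --queue_len;
  tor_mutex_release(&queue_lock);
}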
[tor/master] Add a timeout to tor_cond_wait; add tor_cond impl from libevent
by nickm@torproject.org 21 Jan '15
21 Jan '15
commit e865248156a8512d756be003118de446d29611d1
Author: Nick Mathewson <nickm(a)torproject.org>
Date: Sun Sep 22 22:08:41 2013 -0400
Add a timeout to tor_cond_wait; add tor_cond impl from libevent
The windows code may need some tweaks for it to compile; I've not
tested it yet.
---
src/common/compat_pthreads.c | 24 +++++--
src/common/compat_threads.h | 19 ++++-
src/common/compat_winthreads.c | 152 ++++++++++++++++++++++++----------------
3 files changed, 126 insertions(+), 69 deletions(-)
diff --git a/src/common/compat_pthreads.c b/src/common/compat_pthreads.c
index 276b244..0e5d33a 100644
--- a/src/common/compat_pthreads.c
+++ b/src/common/compat_pthreads.c
@@ -148,10 +148,6 @@ tor_get_thread_id(void)
/* Conditions. */
-/** Cross-platform condition implementation. */
-struct tor_cond_t {
- pthread_cond_t cond;
-};
/** Return a newly allocated condition, with nobody waiting on it. */
tor_cond_t *
tor_cond_new(void)
@@ -177,11 +173,25 @@ tor_cond_free(tor_cond_t *cond)
}
/** Wait until one of the tor_cond_signal functions is called on <b>cond</b>.
* All waiters on the condition must wait holding the same <b>mutex</b>.
- * Returns 0 on success, negative on failure. */
+ * Returns 0 on success, -1 on failure, 1 on timeout. */
int
-tor_cond_wait(tor_cond_t *cond, tor_mutex_t *mutex)
+tor_cond_wait(tor_cond_t *cond, tor_mutex_t *mutex, const struct timeval *tv)
{
- return pthread_cond_wait(&cond->cond, &mutex->mutex) ? -1 : 0;
+ if (tv == NULL) {
+ return pthread_cond_wait(&cond->cond, &mutex->mutex) ? -1 : 0;
+ } else {
+ struct timespec ts;
+ int r;
+ ts.tv_sec = tv->tv_sec;
+ ts.tv_nsec = tv->tv_usec * 1000;
+ r = pthread_cond_timedwait(&cond->cond, &mutex->mutex, &ts);
+ if (r == 0)
+ return 0;
+ else if (r == ETIMEDOUT)
+ return 1;
+ else
+ return -1;
+ }
}
/** Wake up one of the waiters on <b>cond</b>. */
void
diff --git a/src/common/compat_threads.h b/src/common/compat_threads.h
index f43e74a..bbd782f 100644
--- a/src/common/compat_threads.h
+++ b/src/common/compat_threads.h
@@ -57,10 +57,25 @@ void tor_threads_init(void);
void set_main_thread(void);
int in_main_thread(void);
-typedef struct tor_cond_t tor_cond_t;
+typedef struct tor_cond_t {
+#ifdef USE_PTHREADS
+ pthread_cond_t cond;
+#elif defined(USE_WIN32_THREADS)
+ HANDLE event;
+
+ CRITICAL_SECTION lock;
+ int n_waiting;
+ int n_to_wake;
+ int generation;
+#else
+#error no known condition implementation.
+#endif
+} tor_cond_t;
+
tor_cond_t *tor_cond_new(void);
void tor_cond_free(tor_cond_t *cond);
-int tor_cond_wait(tor_cond_t *cond, tor_mutex_t *mutex);
+int tor_cond_wait(tor_cond_t *cond, tor_mutex_t *mutex,
+ const struct timeval *tv);
void tor_cond_signal_one(tor_cond_t *cond);
void tor_cond_signal_all(tor_cond_t *cond);
diff --git a/src/common/compat_winthreads.c b/src/common/compat_winthreads.c
index 01332fd..634dfbe 100644
--- a/src/common/compat_winthreads.c
+++ b/src/common/compat_winthreads.c
@@ -70,17 +70,20 @@ tor_get_thread_id(void)
return (unsigned long)GetCurrentThreadId();
}
-static DWORD cond_event_tls_index;
-struct tor_cond_t {
- CRITICAL_SECTION mutex;
- smartlist_t *events;
-};
tor_cond_t *
tor_cond_new(void)
{
- tor_cond_t *cond = tor_malloc_zero(sizeof(tor_cond_t));
- InitializeCriticalSection(&cond->mutex);
- cond->events = smartlist_new();
+ tor_cond_t *cond = tor_malloc(sizeof(tor_cond_t));
+ if (InitializeCriticalSectionAndSpinCount(&cond->lock, SPIN_COUNT)==0) {
+ tor_free(cond);
+ return NULL;
+ }
+ if ((cond->event = CreateEvent(NULL,TRUE,FALSE,NULL)) == NULL) {
+ DeleteCriticalSection(&cond->lock);
+ tor_free(cond);
+ return NULL;
+ }
+ cond->n_waiting = cond->n_to_wake = cond->generation = 0;
return cond;
}
void
@@ -88,74 +91,103 @@ tor_cond_free(tor_cond_t *cond)
{
if (!cond)
return;
- DeleteCriticalSection(&cond->mutex);
- /* XXXX notify? */
- smartlist_free(cond->events);
- tor_free(cond);
+ DeleteCriticalSection(&cond->lock);
+ CloseHandle(cond->event);
+ mm_free(cond);
}
-int
-tor_cond_wait(tor_cond_t *cond, tor_mutex_t *mutex)
+
+static void
+tor_cond_signal_impl(tor_cond_t *cond, int broadcast)
{
- HANDLE event;
- int r;
- tor_assert(cond);
- tor_assert(mutex);
- event = TlsGetValue(cond_event_tls_index);
- if (!event) {
- event = CreateEvent(0, FALSE, FALSE, NULL);
- TlsSetValue(cond_event_tls_index, event);
- }
- EnterCriticalSection(&cond->mutex);
-
- tor_assert(WaitForSingleObject(event, 0) == WAIT_TIMEOUT);
- tor_assert(!smartlist_contains(cond->events, event));
- smartlist_add(cond->events, event);
-
- LeaveCriticalSection(&cond->mutex);
-
- tor_mutex_release(mutex);
- r = WaitForSingleObject(event, INFINITE);
- tor_mutex_acquire(mutex);
-
- switch (r) {
- case WAIT_OBJECT_0: /* we got the mutex normally. */
- break;
- case WAIT_ABANDONED: /* holding thread exited. */
- case WAIT_TIMEOUT: /* Should never happen. */
- tor_assert(0);
- break;
- case WAIT_FAILED:
- log_warn(LD_GENERAL, "Failed to acquire mutex: %d",(int) GetLastError());
- }
+ EnterCriticalSection(&cond->lock);
+ if (broadcast)
+ cond->n_to_wake = cond->n_waiting;
+ else
+ ++cond->n_to_wake;
+ cond->generation++;
+ SetEvent(cond->event);
+ LeaveCriticalSection(&cond->lock);
return 0;
}
void
tor_cond_signal_one(tor_cond_t *cond)
{
- HANDLE event;
- tor_assert(cond);
-
- EnterCriticalSection(&cond->mutex);
-
- if ((event = smartlist_pop_last(cond->events)))
- SetEvent(event);
-
- LeaveCriticalSection(&cond->mutex);
+ tor_cond_signal_impl(cond, 0);
}
void
tor_cond_signal_all(tor_cond_t *cond)
{
- tor_assert(cond);
+ tor_cond_signal_impl(cond, 1);
+}
- EnterCriticalSection(&cond->mutex);
- SMARTLIST_FOREACH(cond->events, HANDLE, event, SetEvent(event));
- smartlist_clear(cond->events);
- LeaveCriticalSection(&cond->mutex);
+int
+tor_cond_wait(tor_cond_t *cond, tor_mutex_t *lock, const struct timeval *tv)
+{
+ CRITICAL_SECTION *lock = &lock->mutex;
+ int generation_at_start;
+ int waiting = 1;
+ int result = -1;
+ DWORD ms = INFINITE, ms_orig = INFINITE, startTime, endTime;
+ if (tv)
+ ms_orig = ms = evutil_tv_to_msec_(tv);
+
+ EnterCriticalSection(&cond->lock);
+ ++cond->n_waiting;
+ generation_at_start = cond->generation;
+ LeaveCriticalSection(&cond->lock);
+
+ LeaveCriticalSection(lock);
+
+ startTime = GetTickCount();
+ do {
+ DWORD res;
+ res = WaitForSingleObject(cond->event, ms);
+ EnterCriticalSection(&cond->lock);
+ if (cond->n_to_wake &&
+ cond->generation != generation_at_start) {
+ --cond->n_to_wake;
+ --cond->n_waiting;
+ result = 0;
+ waiting = 0;
+ goto out;
+ } else if (res != WAIT_OBJECT_0) {
+ result = (res==WAIT_TIMEOUT) ? 1 : -1;
+ --cond->n_waiting;
+ waiting = 0;
+ goto out;
+ } else if (ms != INFINITE) {
+ endTime = GetTickCount();
+ if (startTime + ms_orig <= endTime) {
+ result = 1; /* Timeout */
+ --cond->n_waiting;
+ waiting = 0;
+ goto out;
+ } else {
+ ms = startTime + ms_orig - endTime;
+ }
+ }
+ /* If we make it here, we are still waiting. */
+ if (cond->n_to_wake == 0) {
+ /* There is nobody else who should wake up; reset
+ * the event. */
+ ResetEvent(cond->event);
+ }
+ out:
+ LeaveCriticalSection(&cond->lock);
+ } while (waiting);
+
+ EnterCriticalSection(lock);
+
+ EnterCriticalSection(&cond->lock);
+ if (!cond->n_waiting)
+ ResetEvent(cond->event);
+ LeaveCriticalSection(&cond->lock);
+
+ return result;
}
void
tor_threads_init(void)
{
- cond_event_tls_index = TlsAlloc();
set_main_thread();
}
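To make the new return convention concrete (0 = signalled, 1 = timed out, -1 = error), here is a hedged sketch of a bounded wait. The predicate and function names are illustrative; the timeval is treated as a relative interval, which is how the EINTR follow-up patch earlier in this thread computes the deadline.

/* Hedged sketch: wait up to one second for a flag, using the timed wait. */
#include "compat.h"
#include <sys/time.h>

static int
wait_up_to_one_second(tor_cond_t *cond, tor_mutex_t *lock, const int *done)
{
  struct timeval timeout = { 1, 0 };  /* one second */
  int finished;

  tor_mutex_acquire(lock);
  while (!*done) {
    int r = tor_cond_wait(cond, lock, &timeout);
    if (r < 0) {                      /* error */
      tor_mutex_release(lock);
      return -1;
    } else if (r == 1) {              /* timed out; stop waiting */
      break;
    }
    /* r == 0: we were signalled; loop and recheck the predicate. */
  }
  finished = *done;
  tor_mutex_release(lock);
  return finished ? 0 : 1;            /* 0 = done, 1 = gave up */
}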
commit a82604b526a2a258e057d6d515ac17429eb6fb67
Author: Nick Mathewson <nickm(a)torproject.org>
Date: Mon Sep 23 01:19:16 2013 -0400
Initial workqueue implementation, with a simple test.
It seems to be working, but more tuning is needed.
---
src/common/include.am | 2 +
src/common/workqueue.c | 347 ++++++++++++++++++++++++++++++++++++++++++++
src/common/workqueue.h | 37 +++++
src/test/bench_workqueue.c | 298 +++++++++++++++++++++++++++++++++++++
src/test/include.am | 13 +-
5 files changed, 696 insertions(+), 1 deletion(-)
diff --git a/src/common/include.am b/src/common/include.am
index e4eeba6..14838ab 100644
--- a/src/common/include.am
+++ b/src/common/include.am
@@ -74,6 +74,7 @@ LIBOR_A_SOURCES = \
src/common/util_codedigest.c \
src/common/util_process.c \
src/common/sandbox.c \
+ src/common/workqueue.c \
src/ext/csiphash.c \
src/ext/trunnel/trunnel.c \
$(libor_extra_source) \
@@ -137,6 +138,7 @@ COMMONHEADERS = \
src/common/tortls.h \
src/common/util.h \
src/common/util_process.h \
+ src/common/workqueue.h \
$(libor_mempool_header)
noinst_HEADERS+= $(COMMONHEADERS)
diff --git a/src/common/workqueue.c b/src/common/workqueue.c
new file mode 100644
index 0000000..ea8dcb0
--- /dev/null
+++ b/src/common/workqueue.c
@@ -0,0 +1,347 @@
+/* Copyright (c) 2013, The Tor Project, Inc. */
+/* See LICENSE for licensing information */
+
+#include "orconfig.h"
+#include "compat.h"
+#include "compat_threads.h"
+#include "util.h"
+#include "workqueue.h"
+#include "tor_queue.h"
+#include "torlog.h"
+
+#ifdef HAVE_UNISTD_H
+// XXXX move wherever we move the write/send stuff
+#include <unistd.h>
+#endif
+
+/*
+ design:
+
+ each thread has its own queue, try to keep at least elements min..max cycles
+ worth of work on each queue.
+
+keep array of threads; round-robin between them.
+
+ When out of work, work-steal.
+
+ alert threads with condition variables.
+
+ alert main thread with fd, since it's libevent.
+
+
+ */
+
+typedef struct workqueue_entry_s {
+ TOR_SIMPLEQ_ENTRY(workqueue_entry_s) next_work;
+ int (*fn)(int status, void *state, void *arg);
+ void (*reply_fn)(void *arg);
+ void *arg;
+} workqueue_entry_t;
+
+struct replyqueue_s {
+ tor_mutex_t lock;
+ TOR_SIMPLEQ_HEAD(, workqueue_entry_s) answers;
+
+ void (*alert_fn)(struct replyqueue_s *); // lock not held on this, next 2.
+ tor_socket_t write_sock;
+ tor_socket_t read_sock;
+};
+
+typedef struct workerthread_s {
+ tor_mutex_t lock;
+ tor_cond_t condition;
+ TOR_SIMPLEQ_HEAD(, workqueue_entry_s) work;
+ unsigned is_running;
+ unsigned is_shut_down;
+ unsigned waiting;
+ void *state;
+ replyqueue_t *reply_queue;
+} workerthread_t;
+
+struct threadpool_s {
+ workerthread_t **threads;
+ int next_for_work;
+
+ tor_mutex_t lock;
+ int n_threads;
+
+ replyqueue_t *reply_queue;
+
+ void *(*new_thread_state_fn)(void*);
+ void (*free_thread_state_fn)(void*);
+ void *new_thread_state_arg;
+
+};
+
+static void queue_reply(replyqueue_t *queue, workqueue_entry_t *work);
+
+static workqueue_entry_t *
+workqueue_entry_new(int (*fn)(int, void*, void*),
+ void (*reply_fn)(void*),
+ void *arg)
+{
+ workqueue_entry_t *ent = tor_malloc_zero(sizeof(workqueue_entry_t));
+ ent->fn = fn;
+ ent->reply_fn = reply_fn;
+ ent->arg = arg;
+ return ent;
+}
+
+static void
+workqueue_entry_free(workqueue_entry_t *ent)
+{
+ if (!ent)
+ return;
+ tor_free(ent);
+}
+
+static void
+worker_thread_main(void *thread_)
+{
+ workerthread_t *thread = thread_;
+ workqueue_entry_t *work;
+ int result;
+
+ tor_mutex_acquire(&thread->lock);
+
+ thread->is_running = 1;
+ while (1) {
+ /* lock held. */
+ while (!TOR_SIMPLEQ_EMPTY(&thread->work)) {
+ /* lock held. */
+
+ work = TOR_SIMPLEQ_FIRST(&thread->work);
+ TOR_SIMPLEQ_REMOVE_HEAD(&thread->work, next_work);
+ tor_mutex_release(&thread->lock);
+
+ result = work->fn(WQ_CMD_RUN, thread->state, work->arg);
+
+ if (result == WQ_RPL_QUEUE) {
+ queue_reply(thread->reply_queue, work);
+ } else {
+ workqueue_entry_free(work);
+ }
+
+ tor_mutex_acquire(&thread->lock);
+ if (result >= WQ_RPL_ERROR) {
+ thread->is_running = 0;
+ thread->is_shut_down = 1;
+ tor_mutex_release(&thread->lock);
+ return;
+ }
+ }
+ /* Lock held; no work in this thread's queue. */
+
+ /* TODO: Try work-stealing. */
+
+ /* TODO: support an idle-function */
+
+ thread->waiting = 1;
+ if (tor_cond_wait(&thread->condition, &thread->lock, NULL) < 0)
+ /* ERR */
+ thread->waiting = 0;
+ }
+}
+
+static void
+queue_reply(replyqueue_t *queue, workqueue_entry_t *work)
+{
+ int was_empty;
+ tor_mutex_acquire(&queue->lock);
+ was_empty = TOR_SIMPLEQ_EMPTY(&queue->answers);
+ TOR_SIMPLEQ_INSERT_TAIL(&queue->answers, work, next_work);
+ tor_mutex_release(&queue->lock);
+
+ if (was_empty) {
+ queue->alert_fn(queue);
+ }
+}
+
+
+static void
+alert_by_fd(replyqueue_t *queue)
+{
+ /* XXX extract this into new function */
+#ifndef _WIN32
+ (void) send(queue->write_sock, "x", 1, 0);
+#else
+ (void) write(queue->write_sock, "x", 1);
+#endif
+}
+
+static workerthread_t *
+workerthread_new(void *state, replyqueue_t *replyqueue)
+{
+ workerthread_t *thr = tor_malloc_zero(sizeof(workerthread_t));
+ tor_mutex_init_for_cond(&thr->lock);
+ tor_cond_init(&thr->condition);
+ TOR_SIMPLEQ_INIT(&thr->work);
+ thr->state = state;
+ thr->reply_queue = replyqueue;
+
+ if (spawn_func(worker_thread_main, thr) < 0) {
+ log_err(LD_GENERAL, "Can't launch worker thread.");
+ return NULL;
+ }
+
+ return thr;
+}
+
+void *
+threadpool_queue_work(threadpool_t *pool,
+ int (*fn)(int, void *, void *),
+ void (*reply_fn)(void *),
+ void *arg)
+{
+ workqueue_entry_t *ent;
+ workerthread_t *worker;
+
+ tor_mutex_acquire(&pool->lock);
+ worker = pool->threads[pool->next_for_work++];
+ if (!worker) {
+ tor_mutex_release(&pool->lock);
+ return NULL;
+ }
+ if (pool->next_for_work >= pool->n_threads)
+ pool->next_for_work = 0;
+ tor_mutex_release(&pool->lock);
+
+
+ ent = workqueue_entry_new(fn, reply_fn, arg);
+
+ tor_mutex_acquire(&worker->lock);
+ TOR_SIMPLEQ_INSERT_TAIL(&worker->work, ent, next_work);
+
+ if (worker->waiting) /* XXXX inside or outside of lock?? */
+ tor_cond_signal_one(&worker->condition);
+
+ tor_mutex_release(&worker->lock);
+
+ return ent;
+}
+
+int
+threadpool_start_threads(threadpool_t *pool, int n)
+{
+ tor_mutex_acquire(&pool->lock);
+
+ if (pool->n_threads < n)
+ pool->threads = tor_realloc(pool->threads, sizeof(workerthread_t*)*n);
+
+ while (pool->n_threads < n) {
+ void *state = pool->new_thread_state_fn(pool->new_thread_state_arg);
+ workerthread_t *thr = workerthread_new(state, pool->reply_queue);
+
+ if (!thr) {
+ tor_mutex_release(&pool->lock);
+ return -1;
+ }
+ pool->threads[pool->n_threads++] = thr;
+ }
+ tor_mutex_release(&pool->lock);
+
+ return 0;
+}
+
+threadpool_t *
+threadpool_new(int n_threads,
+ replyqueue_t *replyqueue,
+ void *(*new_thread_state_fn)(void*),
+ void (*free_thread_state_fn)(void*),
+ void *arg)
+{
+ threadpool_t *pool;
+ pool = tor_malloc_zero(sizeof(threadpool_t));
+ tor_mutex_init(&pool->lock);
+ pool->new_thread_state_fn = new_thread_state_fn;
+ pool->new_thread_state_arg = arg;
+ pool->free_thread_state_fn = free_thread_state_fn;
+ pool->reply_queue = replyqueue;
+
+ if (threadpool_start_threads(pool, n_threads) < 0) {
+ tor_mutex_uninit(&pool->lock);
+ tor_free(pool);
+ return NULL;
+ }
+
+ return pool;
+}
+
+replyqueue_t *
+threadpool_get_replyqueue(threadpool_t *tp)
+{
+ return tp->reply_queue;
+}
+
+replyqueue_t *
+replyqueue_new(void)
+{
+ tor_socket_t pair[2];
+ replyqueue_t *rq;
+ int r;
+
+ /* XXX extract this into new function */
+#ifdef _WIN32
+ r = tor_socketpair(AF_UNIX, SOCK_STREAM, 0, pair);
+#else
+ r = pipe(pair);
+#endif
+ if (r < 0)
+ return NULL;
+
+ set_socket_nonblocking(pair[0]); /* the read-size should be nonblocking. */
+#if defined(FD_CLOEXEC)
+ fcntl(pair[0], F_SETFD, FD_CLOEXEC);
+ fcntl(pair[1], F_SETFD, FD_CLOEXEC);
+#endif
+
+ rq = tor_malloc_zero(sizeof(replyqueue_t));
+
+ tor_mutex_init(&rq->lock);
+ TOR_SIMPLEQ_INIT(&rq->answers);
+
+ rq->read_sock = pair[0];
+ rq->write_sock = pair[1];
+ rq->alert_fn = alert_by_fd;
+
+ return rq;
+}
+
+tor_socket_t
+replyqueue_get_socket(replyqueue_t *rq)
+{
+ return rq->read_sock;
+}
+
+void
+replyqueue_process(replyqueue_t *queue)
+{
+ ssize_t r;
+
+ /* XXX extract this into new function */
+ do {
+ char buf[64];
+#ifdef _WIN32
+ r = recv(queue->read_sock, buf, sizeof(buf), 0);
+#else
+ r = read(queue->read_sock, buf, sizeof(buf));
+#endif
+ } while (r > 0);
+
+ /* XXXX freak out on r == 0, or r == "error, not retryable". */
+
+ tor_mutex_acquire(&queue->lock);
+ while (!TOR_SIMPLEQ_EMPTY(&queue->answers)) {
+ /* lock held. */
+ workqueue_entry_t *work = TOR_SIMPLEQ_FIRST(&queue->answers);
+ TOR_SIMPLEQ_REMOVE_HEAD(&queue->answers, next_work);
+ tor_mutex_release(&queue->lock);
+
+ work->reply_fn(work->arg);
+ workqueue_entry_free(work);
+
+ tor_mutex_acquire(&queue->lock);
+ }
+
+ tor_mutex_release(&queue->lock);
+}
diff --git a/src/common/workqueue.h b/src/common/workqueue.h
new file mode 100644
index 0000000..e502734
--- /dev/null
+++ b/src/common/workqueue.h
@@ -0,0 +1,37 @@
+/* Copyright (c) 2013, The Tor Project, Inc. */
+/* See LICENSE for licensing information */
+
+#ifndef TOR_WORKQUEUE_H
+#define TOR_WORKQUEUE_H
+
+#include "compat.h"
+
+typedef struct replyqueue_s replyqueue_t;
+typedef struct threadpool_s threadpool_t;
+
+
+#define WQ_CMD_RUN 0
+#define WQ_CMD_CANCEL 1
+
+#define WQ_RPL_QUEUE 0
+#define WQ_RPL_NOQUEUE 1
+#define WQ_RPL_ERROR 2
+#define WQ_RPL_SHUTDOWN 3
+
+void *threadpool_queue_work(threadpool_t *pool,
+ int (*fn)(int, void *, void *),
+ void (*reply_fn)(void *),
+ void *arg);
+int threadpool_start_threads(threadpool_t *pool, int n);
+threadpool_t *threadpool_new(int n_threads,
+ replyqueue_t *replyqueue,
+ void *(*new_thread_state_fn)(void*),
+ void (*free_thread_state_fn)(void*),
+ void *arg);
+replyqueue_t *threadpool_get_replyqueue(threadpool_t *tp);
+
+replyqueue_t *replyqueue_new(void);
+tor_socket_t replyqueue_get_socket(replyqueue_t *rq);
+void replyqueue_process(replyqueue_t *queue);
+
+#endif
diff --git a/src/test/bench_workqueue.c b/src/test/bench_workqueue.c
new file mode 100644
index 0000000..1bdfbef
--- /dev/null
+++ b/src/test/bench_workqueue.c
@@ -0,0 +1,298 @@
+/* Copyright (c) 2001-2004, Roger Dingledine.
+ * Copyright (c) 2004-2006, Roger Dingledine, Nick Mathewson.
+ * Copyright (c) 2007-2013, The Tor Project, Inc. */
+/* See LICENSE for licensing information */
+
+#include "or.h"
+#include "compat_threads.h"
+#include "onion.h"
+#include "workqueue.h"
+#include "crypto.h"
+#include "crypto_curve25519.h"
+#include "compat_libevent.h"
+
+#include <stdio.h>
+#ifdef HAVE_EVENT2_EVENT_H
+#include <event2/event.h>
+#else
+#include <event.h>
+#endif
+
+#ifdef TRACK_RESPONSES
+tor_mutex_t bitmap_mutex;
+int handled_len;
+bitarray_t *handled;
+#endif
+
+#define N_ITEMS 10000
+#define N_INFLIGHT 1000
+#define RELAUNCH_AT 250
+
+typedef struct state_s {
+ int magic;
+ int n_handled;
+ crypto_pk_t *rsa;
+ curve25519_secret_key_t ecdh;
+} state_t;
+
+typedef struct rsa_work_s {
+ int serial;
+ uint8_t msg[128];
+ uint8_t msglen;
+} rsa_work_t;
+
+typedef struct ecdh_work_s {
+ int serial;
+ union {
+ curve25519_public_key_t pk;
+ uint8_t msg[32];
+ } u;
+} ecdh_work_t;
+
+static void
+mark_handled(int serial)
+{
+#ifdef TRACK_RESPONSES
+ tor_mutex_acquire(&bitmap_mutex);
+ tor_assert(serial < handled_len);
+ tor_assert(! bitarray_is_set(handled, serial));
+ bitarray_set(handled, serial);
+ tor_mutex_release(&bitmap_mutex);
+#else
+ (void)serial;
+#endif
+}
+
+static int
+workqueue_do_rsa(int cmd, void *state, void *work)
+{
+ rsa_work_t *rw = work;
+ state_t *st = state;
+ crypto_pk_t *rsa = st->rsa;
+ uint8_t sig[256];
+ int len;
+
+ tor_assert(st->magic == 13371337);
+
+ if (cmd == WQ_CMD_CANCEL) {
+ tor_free(work);
+ return WQ_RPL_NOQUEUE;
+ }
+
+ len = crypto_pk_private_sign(rsa, (char*)sig, 256,
+ (char*)rw->msg, rw->msglen);
+ if (len < 0) {
+ tor_free(work);
+ return WQ_RPL_NOQUEUE;
+ }
+
+ memset(rw->msg, 0, sizeof(rw->msg));
+ rw->msglen = len;
+ memcpy(rw->msg, sig, len);
+ ++st->n_handled;
+
+ mark_handled(rw->serial);
+
+ return WQ_RPL_QUEUE;
+}
+
+#if 0
+static int
+workqueue_do_shutdown(int cmd, void *state, void *work)
+{
+ (void)state;
+ (void)work;
+ (void)cmd;
+ crypto_pk_free(((state_t*)state)->rsa);
+ tor_free(state);
+ return WQ_RPL_SHUTDOWN;
+}
+#endif
+
+static int
+workqueue_do_ecdh(int cmd, void *state, void *work)
+{
+ ecdh_work_t *ew = work;
+ uint8_t output[CURVE25519_OUTPUT_LEN];
+ state_t *st = state;
+
+ tor_assert(st->magic == 13371337);
+
+ if (cmd == WQ_CMD_CANCEL) {
+ tor_free(work);
+ return WQ_RPL_NOQUEUE;
+ }
+
+ curve25519_handshake(output, &st->ecdh, &ew->u.pk);
+ memcpy(ew->u.msg, output, CURVE25519_OUTPUT_LEN);
+ ++st->n_handled;
+ mark_handled(ew->serial);
+ return WQ_RPL_QUEUE;
+}
+
+static void *
+new_state(void *arg)
+{
+ state_t *st;
+ (void)arg;
+
+ st = tor_malloc(sizeof(*st));
+ /* Every thread gets its own keys. not a problem for benchmarking */
+ st->rsa = crypto_pk_new();
+ if (crypto_pk_generate_key_with_bits(st->rsa, 1024) < 0) {
+ puts("keygen failed");
+ crypto_pk_free(st->rsa);
+ tor_free(st);
+ return NULL;
+ }
+ curve25519_secret_key_generate(&st->ecdh, 0);
+ st->magic = 13371337;
+ return st;
+}
+
+static void
+free_state(void *arg)
+{
+ state_t *st = arg;
+ crypto_pk_free(st->rsa);
+ tor_free(st);
+}
+
+static tor_weak_rng_t weak_rng;
+static int n_sent = 0;
+static int rsa_sent = 0;
+static int ecdh_sent = 0;
+static int n_received = 0;
+
+#ifdef TRACK_RESPONSES
+bitarray_t *received;
+#endif
+
+static void
+handle_reply(void *arg)
+{
+#ifdef TRACK_RESPONSES
+ rsa_work_t *rw = arg; /* Naughty cast, but only looking at serial. */
+ tor_assert(! bitarray_is_set(received, rw->serial));
+ bitarray_set(received,rw->serial);
+#endif
+
+ tor_free(arg);
+ ++n_received;
+}
+
+static int
+add_work(threadpool_t *tp)
+{
+ int add_rsa = tor_weak_random_range(&weak_rng, 5) == 0;
+ if (add_rsa) {
+ rsa_work_t *w = tor_malloc_zero(sizeof(*w));
+ w->serial = n_sent++;
+ crypto_rand((char*)w->msg, 20);
+ w->msglen = 20;
+ ++rsa_sent;
+ return threadpool_queue_work(tp, workqueue_do_rsa, handle_reply, w) != NULL;
+ } else {
+ ecdh_work_t *w = tor_malloc_zero(sizeof(*w));
+ w->serial = n_sent++;
+ /* Not strictly right, but this is just for benchmarks. */
+ crypto_rand((char*)w->u.pk.public_key, 32);
+ ++ecdh_sent;
+ return threadpool_queue_work(tp, workqueue_do_ecdh, handle_reply, w) != NULL;
+ }
+}
+
+static void
+replysock_readable_cb(tor_socket_t sock, short what, void *arg)
+{
+ threadpool_t *tp = arg;
+ replyqueue_t *rq = threadpool_get_replyqueue(tp);
+
+ int old_r = n_received;
+#ifdef TRACK_RESPONSES
+ int i; /* index into the response bitmaps dumped below */
+#endif
+ (void) sock;
+ (void) what;
+
+ replyqueue_process(rq);
+ if (old_r == n_received)
+ return;
+
+ printf("%d / %d\n", n_received, n_sent);
+#ifdef TRACK_RESPONSES
+ tor_mutex_acquire(&bitmap_mutex);
+ for (i = 0; i < N_ITEMS; ++i) {
+ if (bitarray_is_set(received, i))
+ putc('o', stdout);
+ else if (bitarray_is_set(handled, i))
+ putc('!', stdout);
+ else
+ putc('.', stdout);
+ }
+ puts("");
+ tor_mutex_release(&bitmap_mutex);
+#endif
+
+ if (n_sent - n_received < RELAUNCH_AT) {
+ while (n_sent < n_received + N_INFLIGHT && n_sent < N_ITEMS) {
+ if (! add_work(tp)) {
+ puts("Couldn't add work.");
+ tor_event_base_loopexit(tor_libevent_get_base(), NULL);
+ }
+ }
+ }
+
+ if (n_received == n_sent && n_sent >= N_ITEMS) {
+ tor_event_base_loopexit(tor_libevent_get_base(), NULL);
+ }
+}
+
+int
+main(int argc, char **argv)
+{
+ replyqueue_t *rq;
+ threadpool_t *tp;
+ int i;
+ tor_libevent_cfg evcfg;
+ struct event *ev;
+
+ (void)argc;
+ (void)argv;
+
+ init_logging(1);
+ crypto_global_init(1, NULL, NULL);
+ crypto_seed_rng(1);
+
+ rq = replyqueue_new();
+ tor_assert(rq);
+ tp = threadpool_new(16,
+ rq, new_state, free_state, NULL);
+ tor_assert(tp);
+
+ crypto_seed_weak_rng(&weak_rng);
+
+ memset(&evcfg, 0, sizeof(evcfg));
+ tor_libevent_initialize(&evcfg);
+
+ ev = tor_event_new(tor_libevent_get_base(),
+ replyqueue_get_socket(rq), EV_READ|EV_PERSIST,
+ replysock_readable_cb, tp);
+
+ event_add(ev, NULL);
+
+#ifdef TRACK_RESPONSES
+ handled = bitarray_init_zero(N_ITEMS);
+ received = bitarray_init_zero(N_ITEMS);
+ tor_mutex_init(&bitmap_mutex);
+ handled_len = N_ITEMS;
+#endif
+
+ for (i = 0; i < N_INFLIGHT; ++i) {
+ if (! add_work(tp)) {
+ puts("Couldn't add work.");
+ return 1;
+ }
+ }
+
+ event_base_loop(tor_libevent_get_base(), 0);
+
+ return 0;
+}
diff --git a/src/test/include.am b/src/test/include.am
index b9b381f..6ad1b55 100644
--- a/src/test/include.am
+++ b/src/test/include.am
@@ -1,6 +1,6 @@
TESTS += src/test/test
-noinst_PROGRAMS+= src/test/bench
+noinst_PROGRAMS+= src/test/bench src/test/bench_workqueue
if UNITTESTS_ENABLED
noinst_PROGRAMS+= src/test/test src/test/test-child
endif
@@ -62,6 +62,9 @@ src_test_test_CPPFLAGS= $(src_test_AM_CPPFLAGS)
src_test_bench_SOURCES = \
src/test/bench.c
+src_test_bench_workqueue_SOURCES = \
+ src/test/bench_workqueue.c
+
src_test_test_LDFLAGS = @TOR_LDFLAGS_zlib@ @TOR_LDFLAGS_openssl@ \
@TOR_LDFLAGS_libevent@
src_test_test_LDADD = src/or/libtor-testing.a src/common/libor-testing.a \
@@ -80,6 +83,14 @@ src_test_bench_LDADD = src/or/libtor.a src/common/libor.a \
@TOR_OPENSSL_LIBS@ @TOR_LIB_WS32@ @TOR_LIB_GDI@ @CURVE25519_LIBS@ \
@TOR_SYSTEMD_LIBS@
+src_test_bench_workqueue_LDFLAGS = @TOR_LDFLAGS_zlib@ @TOR_LDFLAGS_openssl@ \
+ @TOR_LDFLAGS_libevent@
+src_test_bench_workqueue_LDADD = src/or/libtor.a src/common/libor.a \
+ src/common/libor-crypto.a $(LIBDONNA) \
+ src/common/libor-event.a \
+ @TOR_ZLIB_LIBS@ @TOR_LIB_MATH@ @TOR_LIBEVENT_LIBS@ \
+ @TOR_OPENSSL_LIBS@ @TOR_LIB_WS32@ @TOR_LIB_GDI@ @CURVE25519_LIBS@
+
noinst_HEADERS+= \
src/test/fakechans.h \
src/test/test.h \
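With the build rules above in place, the benchmark can be compiled and run from a configured source tree. With Tor's non-recursive automake layout, something along the lines of "make src/test/bench_workqueue" should build the binary (the exact target name is an assumption about the build setup, not part of the patch); running it prints "received / sent" progress lines (the "%d / %d" output in replysock_readable_cb) until all N_ITEMS work items have been queued and answered.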
commit 65016304d23503e230e8b097b5cdc1e4897b9b57
Author: Nick Mathewson <nickm(a)torproject.org>
Date: Mon Sep 23 01:15:30 2013 -0400
Add tor_cond_init/uninit
---
src/common/compat_pthreads.c | 17 ++++++-----------
src/common/compat_threads.c | 17 +++++++++++++++++
src/common/compat_threads.h | 2 ++
src/common/compat_winthreads.c | 19 +++++++------------
4 files changed, 32 insertions(+), 23 deletions(-)
diff --git a/src/common/compat_pthreads.c b/src/common/compat_pthreads.c
index 0e5d33a..e58b3f7 100644
--- a/src/common/compat_pthreads.c
+++ b/src/common/compat_pthreads.c
@@ -148,28 +148,23 @@ tor_get_thread_id(void)
/* Conditions. */
-/** Return a newly allocated condition, with nobody waiting on it. */
-tor_cond_t *
-tor_cond_new(void)
+int
+tor_cond_init(tor_cond_t *cond)
{
- tor_cond_t *cond = tor_malloc_zero(sizeof(tor_cond_t));
+ memset(cond, 0, sizeof(tor_cond_t));
if (pthread_cond_init(&cond->cond, NULL)) {
- tor_free(cond);
- return NULL;
+ return -1;
}
- return cond;
+ return 0;
}
/** Release all resources held by <b>cond</b>. */
void
-tor_cond_free(tor_cond_t *cond)
+tor_cond_uninit(tor_cond_t *cond)
{
- if (!cond)
- return;
if (pthread_cond_destroy(&cond->cond)) {
log_warn(LD_GENERAL,"Error freeing condition: %s", strerror(errno));
return;
}
- tor_free(cond);
}
/** Wait until one of the tor_cond_signal functions is called on <b>cond</b>.
* All waiters on the condition must wait holding the same <b>mutex</b>.
diff --git a/src/common/compat_threads.c b/src/common/compat_threads.c
index 84a8a21..e0cbf5c 100644
--- a/src/common/compat_threads.c
+++ b/src/common/compat_threads.c
@@ -24,6 +24,23 @@ tor_mutex_free(tor_mutex_t *m)
tor_free(m);
}
+tor_cond_t *
+tor_cond_new(void)
+{
+ tor_cond_t *cond = tor_malloc(sizeof(tor_cond_t));
+ if (tor_cond_init(cond)<0)
+ tor_free(cond);
+ return cond;
+}
+void
+tor_cond_free(tor_cond_t *c)
+{
+ if (!c)
+ return;
+ tor_cond_uninit(c);
+ tor_free(c);
+}
+
/** Identity of the "main" thread */
static unsigned long main_thread_id = -1;
diff --git a/src/common/compat_threads.h b/src/common/compat_threads.h
index bbd782f..6d3ba3a 100644
--- a/src/common/compat_threads.h
+++ b/src/common/compat_threads.h
@@ -74,6 +74,8 @@ typedef struct tor_cond_t {
tor_cond_t *tor_cond_new(void);
void tor_cond_free(tor_cond_t *cond);
+int tor_cond_init(tor_cond_t *cond);
+void tor_cond_uninit(tor_cond_t *cond);
int tor_cond_wait(tor_cond_t *cond, tor_mutex_t *mutex,
const struct timeval *tv);
void tor_cond_signal_one(tor_cond_t *cond);
diff --git a/src/common/compat_winthreads.c b/src/common/compat_winthreads.c
index 634dfbe..11f91c6 100644
--- a/src/common/compat_winthreads.c
+++ b/src/common/compat_winthreads.c
@@ -70,30 +70,25 @@ tor_get_thread_id(void)
return (unsigned long)GetCurrentThreadId();
}
-tor_cond_t *
-tor_cond_new(void)
+int
+tor_cond_init(tor_cond_t *cond)
{
- tor_cond_t *cond = tor_malloc(sizeof(tor_cond_t));
+ memset(cond, 0, sizeof(tor_cond_t));
if (InitializeCriticalSectionAndSpinCount(&cond->lock, SPIN_COUNT)==0) {
- tor_free(cond);
- return NULL;
+ return -1;
}
if ((cond->event = CreateEvent(NULL,TRUE,FALSE,NULL)) == NULL) {
DeleteCriticalSection(&cond->lock);
- tor_free(cond);
- return NULL;
+ return -1;
}
cond->n_waiting = cond->n_to_wake = cond->generation = 0;
- return cond;
+ return 0;
}
void
-tor_cond_free(tor_cond_t *cond)
+tor_cond_uninit(tor_cond_t *cond)
{
- if (!cond)
- return;
DeleteCriticalSection(&cond->lock);
CloseHandle(cond->event);
- mm_free(cond);
}
static void
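The point of the new in-place API is that a condition variable can now live inside a larger structure instead of being heap-allocated through tor_cond_new(). As a minimal sketch (not part of the commit; the waiter_t type and its helpers are hypothetical names used only for illustration, and it assumes common/compat_threads.h is included):
  typedef struct waiter_s {
    tor_cond_t cond;             /* embedded: no separate heap allocation */
  } waiter_t;
  static int
  waiter_init(waiter_t *w)
  {
    /* tor_cond_init(), added by this commit, returns -1 on failure. */
    if (tor_cond_init(&w->cond) < 0)
      return -1;
    return 0;
  }
  static void
  waiter_cleanup(waiter_t *w)
  {
    /* tor_cond_uninit(), added by this commit, releases the condition's
     * resources without freeing the enclosing structure. */
    tor_cond_uninit(&w->cond);
  }
tor_cond_new() and tor_cond_free() remain available for heap-allocated conditions; as the compat_threads.c hunk shows, they are now thin wrappers around tor_cond_init() and tor_cond_uninit().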