[tor-commits] [tor/master] Isolate the "socketpair or a pipe" logic for alerting main thread

nickm at torproject.org nickm at torproject.org
Wed Jan 21 19:50:31 UTC 2015


commit 51bc0e7f3d612b099382500b434d31f179eaa8a8
Author: Nick Mathewson <nickm at torproject.org>
Date:   Tue Sep 24 20:43:48 2013 -0400

    Isolate the "socketpair or a pipe" logic for alerting main thread
    
    This way we can use the linux eventfd extension where available.
    Using EVFILT_USER on the BSDs will be a teeny bit trickier, and will
    require libevent hacking.
---
 configure.ac                |    4 ++
 src/common/compat_threads.c |  150 +++++++++++++++++++++++++++++++++++++++++++
 src/common/compat_threads.h |   11 ++++
 src/common/workqueue.c      |   68 ++++----------------
 4 files changed, 177 insertions(+), 56 deletions(-)

diff --git a/configure.ac b/configure.ac
index 65b3ff2..69a266a 100644
--- a/configure.ac
+++ b/configure.ac
@@ -407,6 +407,7 @@ AC_CHECK_FUNCS(
         backtrace \
         backtrace_symbols_fd \
         clock_gettime \
+	eventfd \
         flock \
         ftime \
         getaddrinfo \
@@ -421,6 +422,8 @@ AC_CHECK_FUNCS(
         localtime_r \
         lround \
         memmem \
+	pipe \
+	pipe2 \
         prctl \
         rint \
         sigaction \
@@ -962,6 +965,7 @@ AC_CHECK_HEADERS(
         netinet/in6.h \
         pwd.h \
         stdint.h \
+	sys/eventfd.h \
         sys/file.h \
         sys/ioctl.h \
         sys/limits.h \
diff --git a/src/common/compat_threads.c b/src/common/compat_threads.c
index e0cbf5c..98bdbbc 100644
--- a/src/common/compat_threads.c
+++ b/src/common/compat_threads.c
@@ -3,8 +3,24 @@
  * Copyright (c) 2007-2015, The Tor Project, Inc. */
 /* See LICENSE for licensing information */
 
+#include "orconfig.h"
+#define _GNU_SOURCE
+#include <stdlib.h>
 #include "compat.h"
+#include "compat_threads.h"
+
 #include "util.h"
+#include "torlog.h"
+
+#ifdef HAVE_SYS_EVENTFD_H
+#include <sys/eventfd.h>
+#endif
+#ifdef HAVE_UNISTD_H
+#include <unistd.h>
+#endif
+#ifdef HAVE_FCNTL_H
+#include <fcntl.h>
+#endif
 
 /** Return a newly allocated, ready-for-use mutex. */
 tor_mutex_t *
@@ -57,3 +73,137 @@ in_main_thread(void)
 {
   return main_thread_id == tor_get_thread_id();
 }
+
+#ifdef HAVE_EVENTFD
+static int
+eventfd_alert(int fd)
+{
+  uint64_t u = 1;
+  int r = write(fd, (void*)&u, sizeof(u));
+  if (r < 0 && errno != EAGAIN)
+    return -1;
+  return 0;
+}
+
+static int
+eventfd_drain(int fd)
+{
+  uint64_t u = 0;
+  int r = read(fd, (void*)&u, sizeof(u));
+  if (r < 0 && errno != EAGAIN)
+    return -1;
+  return 0;
+}
+#endif
+
+#ifdef HAVE_PIPE
+static int
+pipe_alert(int fd)
+{
+  ssize_t r = write(fd, "x", 1);
+  if (r < 0 && errno != EAGAIN)
+    return -1;
+  return 0;
+}
+
+static int
+pipe_drain(int fd)
+{
+  char buf[32];
+  ssize_t r;
+  while ((r = read(fd, buf, sizeof(buf))) >= 0)
+    ;
+  if (r == 0 || errno != EAGAIN)
+    return -1;
+  return 0;
+}
+#endif
+
+static int
+sock_alert(tor_socket_t fd)
+{
+  ssize_t r = send(fd, "x", 1, 0);
+  if (r < 0 && !ERRNO_IS_EAGAIN(tor_socket_errno(fd)))
+    return -1;
+  return 0;
+}
+
+static int
+sock_drain(tor_socket_t fd)
+{
+  char buf[32];
+  ssize_t r;
+  while ((r = recv(fd, buf, sizeof(buf), 0)) >= 0)
+    ;
+  if (r == 0 || !ERRNO_IS_EAGAIN(tor_socket_errno(fd)))
+    return -1;
+  return 0;
+}
+
+/** Allocate a new set of alert sockets. DOCDOC */
+int
+alert_sockets_create(alert_sockets_t *socks_out)
+{
+  tor_socket_t socks[2];
+
+#ifdef HAVE_EVENTFD
+#if defined(EFD_CLOEXEC) && defined(EFD_NONBLOCK)
+  socks[0] = eventfd(0, EFD_CLOEXEC|EFD_NONBLOCK);
+#else
+  socks[0] = -1;
+#endif
+  if (socks[0] < 0) {
+    socks[0] = eventfd(0,0);
+    if (socks[0] >= 0) {
+      if (fcntl(socks[0], F_SETFD, FD_CLOEXEC) < 0 ||
+          set_socket_nonblocking(socks[0]) < 0) {
+        close(socks[0]);
+        return -1;
+      }
+    }
+  }
+  if (socks[0] >= 0) {
+    socks_out->read_fd = socks_out->write_fd = socks[0];
+    socks_out->alert_fn = eventfd_alert;
+    socks_out->drain_fn = eventfd_drain;
+    return 0;
+  }
+#endif
+
+#ifdef HAVE_PIPE2
+  if (pipe2(socks, O_NONBLOCK|O_CLOEXEC) == 0) {
+    socks_out->read_fd = socks[0];
+    socks_out->write_fd = socks[1];
+    socks_out->alert_fn = pipe_alert;
+    socks_out->drain_fn = pipe_drain;
+    return 0;
+  }
+#endif
+
+#ifdef HAVE_PIPE
+  if (pipe(socks) == 0) {
+    if (fcntl(socks[0], F_SETFD, FD_CLOEXEC) < 0 ||
+        fcntl(socks[1], F_SETFD, FD_CLOEXEC) < 0 ||
+        set_socket_nonblocking(socks[0]) < 0 ||
+        set_socket_nonblocking(socks[1]) < 0) {
+      close(socks[0]);
+      close(socks[1]);
+      return -1;
+    }
+    socks_out->read_fd = socks[0];
+    socks_out->write_fd = socks[1];
+    socks_out->alert_fn = pipe_alert;
+    socks_out->drain_fn = pipe_drain;
+    return 0;
+  }
+#endif
+
+  if (tor_socketpair(AF_UNIX, SOCK_STREAM, 0, socks) == 0) {
+    set_socket_nonblocking(socks[0]);
+    set_socket_nonblocking(socks[1]);
+    socks_out->alert_fn = sock_alert;
+    socks_out->drain_fn = sock_drain;
+    return 0;
+  }
+  return -1;
+}
diff --git a/src/common/compat_threads.h b/src/common/compat_threads.h
index 581d8dd..b053136 100644
--- a/src/common/compat_threads.h
+++ b/src/common/compat_threads.h
@@ -82,4 +82,15 @@ int tor_cond_wait(tor_cond_t *cond, tor_mutex_t *mutex,
 void tor_cond_signal_one(tor_cond_t *cond);
 void tor_cond_signal_all(tor_cond_t *cond);
 
+/** DOCDOC */
+typedef struct alert_sockets_s {
+  /*XXX needs a better name */
+  tor_socket_t read_fd;
+  tor_socket_t write_fd;
+  int (*alert_fn)(tor_socket_t write_fd);
+  int (*drain_fn)(tor_socket_t read_fd);
+} alert_sockets_t;
+
+int alert_sockets_create(alert_sockets_t *socks_out);
+
 #endif
diff --git a/src/common/workqueue.c b/src/common/workqueue.c
index 80e061d..ed70d5e 100644
--- a/src/common/workqueue.c
+++ b/src/common/workqueue.c
@@ -9,11 +9,6 @@
 #include "tor_queue.h"
 #include "torlog.h"
 
-#ifdef HAVE_UNISTD_H
-// XXXX move wherever we move the write/send stuff
-#include <unistd.h>
-#endif
-
 /*
   design:
 
@@ -44,9 +39,7 @@ struct replyqueue_s {
   tor_mutex_t lock;
   TOR_TAILQ_HEAD(, workqueue_entry_s) answers;
 
-  void (*alert_fn)(struct replyqueue_s *); // lock not held on this, next 2. 
-  tor_socket_t write_sock;
-  tor_socket_t read_sock;
+  alert_sockets_t alert; // lock not held on this.
 };
 
 typedef struct workerthread_s {
@@ -169,22 +162,12 @@ queue_reply(replyqueue_t *queue, workqueue_entry_t *work)
   tor_mutex_release(&queue->lock);
 
   if (was_empty) {
-    queue->alert_fn(queue);
+    if (queue->alert.alert_fn(queue->alert.write_fd) < 0) {
+      /* XXXX complain! */
+    }
   }
 }
 
-
-static void
-alert_by_fd(replyqueue_t *queue)
-{
-  /* XXX extract this into new function */
-#ifndef _WIN32
-  (void) send(queue->write_sock, "x", 1, 0);
-#else
-  (void) write(queue->write_sock, "x", 1);
-#endif
-}
-
 static workerthread_t *
 workerthread_new(void *state, replyqueue_t *replyqueue)
 {
@@ -293,59 +276,32 @@ threadpool_get_replyqueue(threadpool_t *tp)
 replyqueue_t *
 replyqueue_new(void)
 {
-  tor_socket_t pair[2];
   replyqueue_t *rq;
-  int r;
-
-  /* XXX extract this into new function */
-#ifdef _WIN32
-  r = tor_socketpair(AF_UNIX, SOCK_STREAM, 0, pair);
-#else
-  r = pipe(pair);
-#endif
-  if (r < 0)
-    return NULL;
-
-  set_socket_nonblocking(pair[0]); /* the read-size should be nonblocking. */
-#if defined(FD_CLOEXEC)
-  fcntl(pair[0], F_SETFD, FD_CLOEXEC);
-  fcntl(pair[1], F_SETFD, FD_CLOEXEC);
-#endif
 
   rq = tor_malloc_zero(sizeof(replyqueue_t));
+  if (alert_sockets_create(&rq->alert) < 0) {
+    tor_free(rq);
+    return NULL;
+  }
 
   tor_mutex_init(&rq->lock);
   TOR_TAILQ_INIT(&rq->answers);
 
-  rq->read_sock = pair[0];
-  rq->write_sock = pair[1];
-  rq->alert_fn = alert_by_fd;
-
   return rq;
 }
 
 tor_socket_t
 replyqueue_get_socket(replyqueue_t *rq)
 {
-  return rq->read_sock;
+  return rq->alert.read_fd;
 }
 
 void
 replyqueue_process(replyqueue_t *queue)
 {
-  ssize_t r;
-
-  /* XXX extract this into new function */
-  do {
-    char buf[64];
-#ifdef _WIN32
-    r = recv(queue->read_sock, buf, sizeof(buf), 0);
-#else
-    r = read(queue->read_sock, buf, sizeof(buf));
-#endif
-  } while (r > 0);
-
-  /* XXXX freak out on r == 0, or r == "error, not retryable". */
+  if (queue->alert.drain_fn(queue->alert.read_fd) < 0) {
+    /* XXXX complain! */
+  }
 
   tor_mutex_acquire(&queue->lock);
   while (!TOR_TAILQ_EMPTY(&queue->answers)) {





More information about the tor-commits mailing list