[or-cvs] [tor/master 18/38] Add support for linked connections with bufferevent_pair.

nickm at torproject.org nickm at torproject.org
Mon Sep 27 20:50:59 UTC 2010


Author: Nick Mathewson <nickm at torproject.org>
Date: Tue, 11 Aug 2009 15:16:16 -0400
Subject: Add support for linked connections with bufferevent_pair.
Commit: 4af6887d201d978a46072ead0036e0d16fa5908a

Also, set directory connections (linked and otherwise) to use bufferevents.

Also, stop using outbuf_flushlen anywhere except for OR connections.
---
 src/or/connection.c      |    2 +-
 src/or/connection.h      |    4 +-
 src/or/connection_edge.c |   22 ++++++++----
 src/or/connection_edge.h |    3 +-
 src/or/directory.c       |   11 ++++--
 src/or/main.c            |   90 +++++++++++++++++++++++++++++++++++++--------
 6 files changed, 102 insertions(+), 30 deletions(-)

diff --git a/src/or/connection.c b/src/or/connection.c
index 394aa3b..8b9d47d 100644
--- a/src/or/connection.c
+++ b/src/or/connection.c
@@ -191,8 +191,8 @@ connection_type_uses_bufferevent(connection_t *conn)
 {
   switch (conn->type) {
     case CONN_TYPE_AP:
-      return 1;
     case CONN_TYPE_EXIT:
+    case CONN_TYPE_DIR:
       return 1;
     default:
       return 0;
diff --git a/src/or/connection.h b/src/or/connection.h
index 4d269d6..adf79f1 100644
--- a/src/or/connection.h
+++ b/src/or/connection.h
@@ -100,7 +100,7 @@ connection_get_inbuf_len(connection_t *conn)
   IF_HAS_BUFFEREVENT(conn, {
     return evbuffer_get_length(bufferevent_get_input(conn->bufev));
   }) ELSE_IF_NO_BUFFEREVENT {
-    return buf_datalen(conn->inbuf);
+    return conn->inbuf ? buf_datalen(conn->inbuf) : 0;
   }
 }
 
@@ -110,7 +110,7 @@ connection_get_outbuf_len(connection_t *conn)
   IF_HAS_BUFFEREVENT(conn, {
     return evbuffer_get_length(bufferevent_get_output(conn->bufev));
   }) ELSE_IF_NO_BUFFEREVENT {
-    return buf_datalen(conn->outbuf);
+    return conn->outbuf ? buf_datalen(conn->outbuf) : 0;
   }
 }
 
diff --git a/src/or/connection_edge.c b/src/or/connection_edge.c
index f90c44f..39bc8e7 100644
--- a/src/or/connection_edge.c
+++ b/src/or/connection_edge.c
@@ -357,8 +357,9 @@ connection_edge_finished_connecting(edge_connection_t *edge_conn)
   rep_hist_note_exit_stream_opened(conn->port);
 
   conn->state = EXIT_CONN_STATE_OPEN;
-  connection_watch_events(conn, READ_EVENT); /* stop writing, keep reading */
-  if (connection_wants_to_flush(conn)) /* in case there are any queued relay
+  IF_HAS_NO_BUFFEREVENT(conn)
+    connection_watch_events(conn, READ_EVENT); /* stop writing, keep reading */
+  if (connection_get_outbuf_len(conn)) /* in case there are any queued relay
                                         * cells */
     connection_start_writing(conn);
   /* deliver a 'connected' relay cell back through the circuit. */
@@ -2109,8 +2110,10 @@ connection_ap_handshake_send_begin(edge_connection_t *ap_conn)
                ap_conn->socks_request->port);
   payload_len = (int)strlen(payload)+1;
 
-  log_debug(LD_APP,
-            "Sending relay cell to begin stream %d.", ap_conn->stream_id);
+  log_info(LD_APP,
+           "Sending relay cell %d to begin stream %d.",
+           (int)ap_conn->use_begindir,
+           ap_conn->stream_id);
 
   begin_type = ap_conn->use_begindir ?
                  RELAY_COMMAND_BEGIN_DIR : RELAY_COMMAND_BEGIN;
@@ -2218,9 +2221,11 @@ connection_ap_handshake_send_resolve(edge_connection_t *ap_conn)
  * and call connection_ap_handshake_attach_circuit(conn) on it.
  *
  * Return the other end of the linked connection pair, or -1 if error.
+ * DOCDOC partner.
  */
 edge_connection_t *
-connection_ap_make_link(char *address, uint16_t port,
+connection_ap_make_link(connection_t *partner,
+                        char *address, uint16_t port,
                         const char *digest, int use_begindir, int want_onehop)
 {
   edge_connection_t *conn;
@@ -2255,6 +2260,8 @@ connection_ap_make_link(char *address, uint16_t port,
   tor_addr_make_unspec(&conn->_base.addr);
   conn->_base.port = 0;
 
+  connection_link_connections(partner, TO_CONN(conn));
+
   if (connection_add(TO_CONN(conn)) < 0) { /* no space, forget it */
     connection_free(TO_CONN(conn));
     return NULL;
@@ -2772,12 +2779,13 @@ connection_exit_connect(edge_connection_t *edge_conn)
   }
 
   conn->state = EXIT_CONN_STATE_OPEN;
-  if (connection_wants_to_flush(conn)) {
+  if (connection_get_outbuf_len(conn)) {
     /* in case there are any queued data cells */
     log_warn(LD_BUG,"newly connected conn had data waiting!");
 //    connection_start_writing(conn);
   }
-  connection_watch_events(conn, READ_EVENT);
+  IF_HAS_NO_BUFFEREVENT(conn)
+    connection_watch_events(conn, READ_EVENT);
 
   /* also, deliver a 'connected' cell back through the circuit. */
   if (connection_edge_is_rendezvous_stream(edge_conn)) {
diff --git a/src/or/connection_edge.h b/src/or/connection_edge.h
index 762af51..0f7bf07 100644
--- a/src/or/connection_edge.h
+++ b/src/or/connection_edge.h
@@ -29,7 +29,8 @@ int connection_edge_finished_connecting(edge_connection_t *conn);
 int connection_ap_handshake_send_begin(edge_connection_t *ap_conn);
 int connection_ap_handshake_send_resolve(edge_connection_t *ap_conn);
 
-edge_connection_t  *connection_ap_make_link(char *address, uint16_t port,
+edge_connection_t  *connection_ap_make_link(connection_t *partner,
+                                            char *address, uint16_t port,
                                             const char *digest,
                                             int use_begindir, int want_onehop);
 void connection_ap_handshake_socks_reply(edge_connection_t *conn, char *reply,
diff --git a/src/or/directory.c b/src/or/directory.c
index 284a1ad..ac6f205 100644
--- a/src/or/directory.c
+++ b/src/or/directory.c
@@ -892,14 +892,14 @@ directory_initiate_command_rend(const char *address, const tor_addr_t *_addr,
      * hook up both sides
      */
     linked_conn =
-      connection_ap_make_link(conn->_base.address, conn->_base.port,
+      connection_ap_make_link(TO_CONN(conn),
+                              conn->_base.address, conn->_base.port,
                               digest, use_begindir, conn->dirconn_direct);
     if (!linked_conn) {
       log_warn(LD_NET,"Making tunnel to dirserver failed.");
       connection_mark_for_close(TO_CONN(conn));
       return;
     }
-    connection_link_connections(TO_CONN(conn), TO_CONN(linked_conn));
 
     if (connection_add(TO_CONN(conn)) < 0) {
       log_warn(LD_NET,"Unable to add connection for link to dirserver.");
@@ -912,8 +912,12 @@ directory_initiate_command_rend(const char *address, const tor_addr_t *_addr,
                            payload, payload_len,
                            supports_conditional_consensus,
                            if_modified_since);
+
     connection_watch_events(TO_CONN(conn), READ_EVENT|WRITE_EVENT);
-    connection_start_reading(TO_CONN(linked_conn));
+    IF_HAS_BUFFEREVENT(TO_CONN(linked_conn), {
+      connection_watch_events(TO_CONN(linked_conn), READ_EVENT|WRITE_EVENT);
+    }) ELSE_IF_NO_BUFFEREVENT
+      connection_start_reading(TO_CONN(linked_conn));
   }
 }
 
@@ -3352,6 +3356,7 @@ connection_dir_finished_flushing(dir_connection_t *conn)
                               DIRREQ_DIRECT,
                               DIRREQ_FLUSHING_DIR_CONN_FINISHED);
   switch (conn->_base.state) {
+    case DIR_CONN_STATE_CONNECTING:
     case DIR_CONN_STATE_CLIENT_SENDING:
       log_debug(LD_DIR,"client finished sending command.");
       conn->_base.state = DIR_CONN_STATE_CLIENT_READING;
diff --git a/src/or/main.c b/src/or/main.c
index 976d805..cba98a8 100644
--- a/src/or/main.c
+++ b/src/or/main.c
@@ -155,6 +155,32 @@ int can_complete_circuit=0;
 *
 ****************************************************************************/
 
+#ifdef USE_BUFFEREVENTS
+/** Free <b>conn</b>'s old buffers and events; a bufferevent replaces them. */
+static void
+free_old_inbuf(connection_t *conn)
+{
+  if (! conn->inbuf)
+    return; /* Nothing to do: the old-style buffers are already gone. */
+  tor_assert(conn->outbuf);
+  tor_assert(buf_datalen(conn->inbuf) == 0); /* Nothing may be pending. */
+  tor_assert(buf_datalen(conn->outbuf) == 0);
+  buf_free(conn->inbuf);
+  buf_free(conn->outbuf);
+  conn->inbuf = conn->outbuf = NULL;
+  /* Remove each event from libevent before freeing that same event. */
+  if (conn->read_event) {
+    event_del(conn->read_event);
+    tor_event_free(conn->read_event);
+  }
+  if (conn->write_event) {
+    event_del(conn->write_event);
+    tor_event_free(conn->write_event);
+  }
+  conn->read_event = conn->write_event = NULL;
+}
+#endif
+
 /** Add <b>conn</b> to the array of connections that we can poll on.  The
  * connection's socket must be set; the connection starts out
  * non-reading and non-writing.
@@ -173,28 +199,47 @@ connection_add_impl(connection_t *conn, int is_connecting)
   smartlist_add(connection_array, conn);
 
 #ifdef USE_BUFFEREVENTS
-  if (connection_type_uses_bufferevent(conn) &&
-      conn->s >= 0 && !conn->linked) {
-    conn->bufev = bufferevent_socket_new(
+  if (connection_type_uses_bufferevent(conn)) {
+    if (conn->s >= 0 && !conn->linked) {
+      conn->bufev = bufferevent_socket_new(
                          tor_libevent_get_base(),
                          conn->s,
                          BEV_OPT_DEFER_CALLBACKS);
-   if (conn->inbuf) {
+      /* XXXX CHECK FOR NULL RETURN! */
+      if (is_connecting) {
+        /* Put the bufferevent into a "connecting" state so that we'll get
+         * a "connected" event callback on successful write. */
+        bufferevent_socket_connect(conn->bufev, NULL, 0);
+      }
+      connection_configure_bufferevent_callbacks(conn);
+    } else if (conn->linked && conn->linked_conn &&
+               connection_type_uses_bufferevent(conn->linked_conn)) {
+      tor_assert(conn->s < 0);
+      if (!conn->bufev) {
+        struct bufferevent *pair[2] = { NULL, NULL };
+        /* XXXX CHECK FOR ERROR RETURN! */
+        bufferevent_pair_new(tor_libevent_get_base(),
+                             BEV_OPT_DEFER_CALLBACKS,
+                             pair);
+        tor_assert(pair[0]);
+        conn->bufev = pair[0];
+        conn->linked_conn->bufev = pair[1];
+      } /* else the other side already was added, and got a bufferevent_pair */
+      connection_configure_bufferevent_callbacks(conn);
+    }
+
+    if (conn->bufev && conn->inbuf) {
       /* XXX Instead we should assert that there is no inbuf, once we
        * have linked connections using bufferevents. */
-      tor_assert(conn->outbuf);
-      tor_assert(buf_datalen(conn->inbuf) == 0);
-      tor_assert(buf_datalen(conn->outbuf) == 0);
-      buf_free(conn->inbuf);
-      buf_free(conn->outbuf);
-      conn->inbuf = conn->outbuf = NULL;
+      free_old_inbuf(conn);
     }
-    if (is_connecting) {
-      /* Put the bufferevent into a "connecting" state so that we'll get
-       * a "connected" event callback on successful write. */
-      bufferevent_socket_connect(conn->bufev, NULL, 0);
+
+    if (conn->linked_conn && conn->linked_conn->bufev &&
+        conn->linked_conn->inbuf) {
+      /* XXX Instead we should assert that there is no inbuf, once we
+       * have linked connections using bufferevents. */
+      free_old_inbuf(conn->linked_conn);
     }
-    connection_configure_bufferevent_callbacks(conn);
   }
 #else
   (void) is_connecting;
@@ -205,6 +250,7 @@ connection_add_impl(connection_t *conn, int is_connecting)
          conn->s, EV_READ|EV_PERSIST, conn_read_callback, conn);
     conn->write_event = tor_event_new(tor_libevent_get_base(),
          conn->s, EV_WRITE|EV_PERSIST, conn_write_callback, conn);
+    /* XXXX CHECK FOR NULL RETURN! */
   }
 
   log_debug(LD_NET,"new conn type %s, socket %d, address %s, n_conns %d.",
@@ -671,11 +717,19 @@ conn_close_if_marked(int i)
   /* assert_all_pending_dns_resolves_ok(); */
 
 #ifdef USE_BUFFEREVENTS
-  if (conn->bufev && conn->hold_open_until_flushed)
+  if (conn->bufev && conn->hold_open_until_flushed) {
+    if (conn->linked) {
+      /* We need to do this explicitly so that the linked connection
+       * notices that there was an EOF. */
+      bufferevent_flush(conn->bufev, EV_WRITE, BEV_FINISHED);
+      /* XXXX Now can we free it? */
+    }
     return 0;
+  }
 #endif
 
   log_debug(LD_NET,"Cleaning up connection (fd %d).",conn->s);
+  IF_HAS_BUFFEREVENT(conn, goto unlink);
   if ((conn->s >= 0 || conn->linked_conn) && connection_wants_to_flush(conn)) {
     /* s == -1 means it's an incomplete edge connection, or that the socket
      * has already been closed as unflushable. */
@@ -743,6 +797,10 @@ conn_close_if_marked(int i)
              conn->marked_for_close);
     }
   }
+
+#ifdef USE_BUFFEREVENTS
+ unlink:
+#endif
   connection_unlink(conn); /* unlink, remove, free */
   return 1;
 }
-- 
1.7.1




More information about the tor-commits mailing list