[or-cvs] clean up receiver buckets; prepare for payloads in relay_en...

Roger Dingledine arma at seul.org
Sat Sep 27 21:09:59 UTC 2003


Update of /home/or/cvsroot/src/or
In directory moria.mit.edu:/home2/arma/work/onion/cvs/src/or

Modified Files:
	circuit.c command.c connection.c connection_edge.c 
	connection_or.c cpuworker.c directory.c dns.c main.c or.h 
Log Message:
clean up receiver buckets; prepare for payloads in relay_end; note a few bugs


Index: circuit.c
===================================================================
RCS file: /home/or/cvsroot/src/or/circuit.c,v
retrieving revision 1.69
retrieving revision 1.70
diff -u -d -r1.69 -r1.70
--- circuit.c	26 Sep 2003 10:03:49 -0000	1.69
+++ circuit.c	27 Sep 2003 21:09:55 -0000	1.70
@@ -125,6 +125,7 @@
 
   high_bit = (aci_type == ACI_TYPE_HIGHER) ? 1<<15 : 0;
   conn = connection_exact_get_by_addr_port(addr,port);
+  /* XXX race condition: if conn is marked_for_close it won't be noticed */
   if (!conn)
     return (1|high_bit); /* No connection exists; conflict is impossible. */
 
@@ -910,7 +911,7 @@
     for(stream = circ->p_streams; stream; stream=stream->next_stream) {
       if(stream->cpath_layer == victim) {
         log_fn(LOG_INFO, "Marking stream %d for close.", *(int*)stream->stream_id);
-        stream->marked_for_close = 1;
+/*ENDCLOSE*/    stream->marked_for_close = 1;
       }
     }
 

Index: command.c
===================================================================
RCS file: /home/or/cvsroot/src/or/command.c,v
retrieving revision 1.38
retrieving revision 1.39
diff -u -d -r1.38 -r1.39
--- command.c	26 Sep 2003 10:03:49 -0000	1.38
+++ command.c	27 Sep 2003 21:09:55 -0000	1.39
@@ -88,7 +88,7 @@
   circ = circuit_get_by_aci_conn(cell->aci, conn);
 
   if(circ) {
-    log_fn(LOG_WARNING,"received CREATE cell for known circ. Dropping.");
+    log_fn(LOG_WARNING,"received CREATE cell (aci %d) for known circ. Dropping.", cell->aci);
     return;
   }
 
@@ -118,7 +118,7 @@
   circ = circuit_get_by_aci_conn(cell->aci, conn);
 
   if(!circ) {
-    log_fn(LOG_WARNING,"received CREATED cell for unknown circ. Dropping.");
+    log_fn(LOG_WARNING,"received CREATED cell (aci %d) for unknown circ. Dropping.", cell->aci);
     return;
   }
 

Index: connection.c
===================================================================
RCS file: /home/or/cvsroot/src/or/connection.c,v
retrieving revision 1.105
retrieving revision 1.106
diff -u -d -r1.105 -r1.106
--- connection.c	27 Sep 2003 07:28:44 -0000	1.105
+++ connection.c	27 Sep 2003 21:09:55 -0000	1.106
@@ -83,9 +83,6 @@
   conn->inbuf = buf_new();
   conn->outbuf = buf_new();
 
-  conn->receiver_bucket = 50000; /* should be enough to do the handshake */
-  conn->bandwidth = conn->receiver_bucket / 10; /* give it a default */
-
   conn->timestamp_created = now.tv_sec;
   conn->timestamp_lastread = now.tv_sec;
   conn->timestamp_lastwritten = now.tv_sec;
@@ -149,8 +146,6 @@
 
   conn = connection_new(type);
   conn->s = s;
-  conn->receiver_bucket = -1; /* non-cell connections don't do receiver buckets */
-  conn->bandwidth = -1;
 
   if(connection_add(conn) < 0) { /* no space, forget it */
     log_fn(LOG_WARNING,"connection_add failed. Giving up.");
@@ -197,11 +192,6 @@
   newconn = connection_new(new_type);
   newconn->s = news;
 
-  if(!connection_speaks_cells(newconn)) {
-    newconn->receiver_bucket = -1;
-    newconn->bandwidth = -1;
-  }
-
   newconn->address = strdup(inet_ntoa(remote.sin_addr)); /* remember the remote address */
   newconn->addr = ntohl(remote.sin_addr.s_addr);
   newconn->port = ntohs(remote.sin_port);
@@ -305,7 +295,7 @@
       }
       crypto_free_pk_env(pk);
     } else { /* it's an OP */
-      conn->bandwidth = DEFAULT_BANDWIDTH_OP;
+      conn->receiver_bucket = conn->bandwidth = DEFAULT_BANDWIDTH_OP;
     }
   } else { /* I'm a client */
     if(!tor_tls_peer_has_cert(conn->tls)) { /* it's a client too?! */
@@ -330,7 +320,7 @@
     }
     log_fn(LOG_DEBUG,"The router's pk matches the one we meant to connect to. Good.");
     crypto_free_pk_env(pk);
-    conn->bandwidth = DEFAULT_BANDWIDTH_OP;
+    conn->receiver_bucket = conn->bandwidth = DEFAULT_BANDWIDTH_OP;
     circuit_n_conn_open(conn); /* send the pending create */
   }
   return 0;
@@ -446,10 +436,6 @@
     //log_fn(LOG_DEBUG,"connection_process_inbuf returned %d.",retval);
     return -1;
   }
-  if(!connection_state_is_open(conn) && conn->receiver_bucket == 0) {
-    log_fn(LOG_WARNING,"receiver bucket reached 0 before handshake finished. Closing.");
-    return -1;
-  }
   return 0;
 }
 
@@ -458,9 +444,6 @@
   int result;
   int at_most;
 
-  assert((connection_speaks_cells(conn) && conn->receiver_bucket >= 0) ||
-         (!connection_speaks_cells(conn) && conn->receiver_bucket < 0));
-
   if(options.LinkPadding) {
     at_most = global_read_bucket;
   } else {
@@ -477,14 +460,13 @@
       at_most = global_read_bucket;
   }
 
-  if(conn->receiver_bucket >= 0 && at_most > conn->receiver_bucket)
-    at_most = conn->receiver_bucket;
-
   if(connection_speaks_cells(conn) && conn->state != OR_CONN_STATE_CONNECTING) {
     if(conn->state == OR_CONN_STATE_HANDSHAKING)
       return connection_tls_continue_handshake(conn);
 
     /* else open, or closing */
+    if(at_most > conn->receiver_bucket)
+      at_most = conn->receiver_bucket;
     result = read_to_buf_tls(conn->tls, at_most, conn->inbuf);
 
     switch(result) {
@@ -510,14 +492,21 @@
   }
 
   global_read_bucket -= result; assert(global_read_bucket >= 0);
-  if(connection_speaks_cells(conn))
-    conn->receiver_bucket -= result;
-  if(conn->receiver_bucket == 0 || global_read_bucket == 0) {
-    log_fn(LOG_DEBUG,"buckets (%d, %d) exhausted. Pausing.", global_read_bucket, conn->receiver_bucket);
+  if(global_read_bucket == 0) {
+    log_fn(LOG_DEBUG,"global bucket exhausted. Pausing.");
     conn->wants_to_read = 1;
     connection_stop_reading(conn);
     return 0;
   }
+  if(connection_speaks_cells(conn) && conn->state == OR_CONN_STATE_OPEN) {
+    conn->receiver_bucket -= result; assert(conn->receiver_bucket >= 0);
+    if(conn->receiver_bucket == 0) {
+      log_fn(LOG_DEBUG,"receiver bucket exhausted. Pausing.");
+      conn->wants_to_read = 1;
+      connection_stop_reading(conn);
+      return 0;
+    }
+  }
   if(connection_speaks_cells(conn) && conn->state != OR_CONN_STATE_CONNECTING)
     if(result == at_most)
       return connection_read_to_buf(conn);
@@ -627,7 +616,10 @@
 
   if(!connection_speaks_cells(conn))
     return 0; /* edge connections don't use receiver_buckets */
+  if(conn->state != OR_CONN_STATE_OPEN)
+    return 0; /* only open connections play the rate limiting game */  
 
+  assert(conn->bandwidth > 0);
   if(conn->receiver_bucket > 9*conn->bandwidth)
     return 0;
 
@@ -660,7 +652,7 @@
 
   if(!connection_speaks_cells(conn)) {
      log_fn(LOG_INFO,"Aci %d: At an edge. Marking connection for close.", aci);
-     conn->marked_for_close = 1;
+/*ENDCLOSE*/ conn->marked_for_close = 1;
      return 0;
   }
 
@@ -746,13 +738,13 @@
 #endif
 
   if (conn->type != CONN_TYPE_OR) {
-    assert(conn->bandwidth == -1);
-    assert(conn->receiver_bucket == -1);
     assert(!conn->tls);
   } else {
-    assert(conn->bandwidth);
-    assert(conn->receiver_bucket >= 0);
-    assert(conn->receiver_bucket <= 10*conn->bandwidth);
+    if(conn->state == OR_CONN_STATE_OPEN) {
+      assert(conn->bandwidth > 0);
+      assert(conn->receiver_bucket >= 0);
+      assert(conn->receiver_bucket <= 10*conn->bandwidth);
+    }
     assert(conn->addr && conn->port);
     assert(conn->address);
     if (conn->state != OR_CONN_STATE_CONNECTING)

Index: connection_edge.c
===================================================================
RCS file: /home/or/cvsroot/src/or/connection_edge.c,v
retrieving revision 1.27
retrieving revision 1.28
diff -u -d -r1.27 -r1.28
--- connection_edge.c	27 Sep 2003 07:28:44 -0000	1.27
+++ connection_edge.c	27 Sep 2003 21:09:56 -0000	1.28
@@ -29,7 +29,7 @@
     conn->done_receiving = 1;
     shutdown(conn->s, 0); /* XXX check return, refactor NM */
     if (conn->done_sending)
-      conn->marked_for_close = 1;
+/*ENDCLOSE*/  conn->marked_for_close = 1;
 
     /* XXX Factor out common logic here and in circuit_about_to_close NM */
     circ = circuit_get_by_conn(conn);
@@ -51,17 +51,17 @@
 #else 
     /* eof reached, kill it. */
     log_fn(LOG_INFO,"conn (fd %d) reached eof. Closing.", conn->s);
-    return -1;
+/*ENDCLOSE*/ return -1;
 #endif
   }
 
   switch(conn->state) {
     case AP_CONN_STATE_SOCKS_WAIT:
-      return connection_ap_handshake_process_socks(conn);
+/*ENDCLOSE*/  return connection_ap_handshake_process_socks(conn);
     case AP_CONN_STATE_OPEN:
     case EXIT_CONN_STATE_OPEN:
       if(connection_package_raw_inbuf(conn) < 0)
-        return -1;
+/*ENDCLOSE*/  return -1;
       return 0;
     case EXIT_CONN_STATE_CONNECTING:
       log_fn(LOG_INFO,"text from server while in 'connecting' state at exit. Leaving it on buffer.");
@@ -133,10 +133,11 @@
         log_fn(LOG_INFO,"...and informing resolver we don't want the answer anymore.");
         dns_cancel_pending_resolve(conn->address, conn);
       }
+      return 0;
     } else {
-      log_fn(LOG_WARNING,"Got an unexpected relay cell, not in 'open' state. Dropping.");
+      log_fn(LOG_WARNING,"Got an unexpected relay cell, not in 'open' state. Closing.");
+      return -1;
     }
-    return 0;
   }
 
   switch(relay_command) {
@@ -174,11 +175,11 @@
 //      printf("New text for buf (%d bytes): '%s'", cell->length - RELAY_HEADER_SIZE, cell->payload + RELAY_HEADER_SIZE);
       if(connection_write_to_buf(cell->payload + RELAY_HEADER_SIZE,
                                  cell->length - RELAY_HEADER_SIZE, conn) < 0) {
-        conn->marked_for_close = 1;
+/*ENDCLOSE*/    conn->marked_for_close = 1;
         return 0;
       }
       if(connection_consider_sending_sendme(conn, edge_type) < 0)
-        conn->marked_for_close = 1;
+/*ENDCLOSE*/    conn->marked_for_close = 1;
       return 0;
     case RELAY_COMMAND_END:
       if(!conn) {
@@ -191,9 +192,9 @@
       conn->done_sending = 1;
       shutdown(conn->s, 1); /* XXX check return; refactor NM */
       if (conn->done_receiving)
-        conn->marked_for_close = 1;
+/*ENDCLOSE*/  conn->marked_for_close = 1;
 #endif
-      conn->marked_for_close = 1;
+/*ENDCLOSE*/  conn->marked_for_close = 1;
       break;
     case RELAY_COMMAND_EXTEND:
       if(conn) {
@@ -240,7 +241,7 @@
       }
       log_fn(LOG_INFO,"Connected! Notifying application.");
       if(connection_ap_handshake_socks_reply(conn, SOCKS4_REQUEST_GRANTED) < 0) {
-        conn->marked_for_close = 1;
+/*ENDCLOSE*/    conn->marked_for_close = 1;
       }
       break;
     case RELAY_COMMAND_SENDME:
@@ -331,7 +332,7 @@
     return 0;
  
   if(conn->package_window <= 0) {
-    log_fn(LOG_WARNING,"called with package_window 0. Tell Roger.");
+    log_fn(LOG_WARNING,"called with package_window %d. Tell Roger.", conn->package_window);
     connection_stop_reading(conn);
     return 0;
   }
@@ -526,7 +527,7 @@
   return connection_flush_buf(conn); /* try to flush it, in case we're about to close the conn */
 }
 
-static int connection_exit_begin_conn(cell_t *cell, circuit_t *circ) {
+/*ENDCLOSE*/ static int connection_exit_begin_conn(cell_t *cell, circuit_t *circ) {
   connection_t *n_stream;
   char *colon;
 
@@ -553,8 +554,6 @@
   n_stream->address = strdup(cell->payload + RELAY_HEADER_SIZE + STREAM_ID_SIZE);
   n_stream->port = atoi(colon+1);
   n_stream->state = EXIT_CONN_STATE_RESOLVING;
-  n_stream->receiver_bucket = -1; /* edge connections don't do receiver buckets */
-  n_stream->bandwidth = -1;
   n_stream->s = -1; /* not yet valid */
   n_stream->package_window = STREAMWINDOW_START;
   n_stream->deliver_window = STREAMWINDOW_START;

Index: connection_or.c
===================================================================
RCS file: /home/or/cvsroot/src/or/connection_or.c,v
retrieving revision 1.56
retrieving revision 1.57
diff -u -d -r1.56 -r1.57
--- connection_or.c	27 Sep 2003 07:28:44 -0000	1.56
+++ connection_or.c	27 Sep 2003 21:09:56 -0000	1.57
@@ -77,7 +77,7 @@
 void connection_or_init_conn_from_router(connection_t *conn, routerinfo_t *router) {
   conn->addr = router->addr;
   conn->port = router->or_port;
-  conn->bandwidth = router->bandwidth;
+  conn->receiver_bucket = conn->bandwidth = router->bandwidth;
   conn->onion_pkey = crypto_pk_dup_key(router->onion_pkey);
   conn->link_pkey = crypto_pk_dup_key(router->link_pkey);
   conn->identity_pkey = crypto_pk_dup_key(router->identity_pkey);

Index: cpuworker.c
===================================================================
RCS file: /home/or/cvsroot/src/or/cpuworker.c,v
retrieving revision 1.8
retrieving revision 1.9
diff -u -d -r1.8 -r1.9
--- cpuworker.c	27 Sep 2003 07:28:44 -0000	1.8
+++ cpuworker.c	27 Sep 2003 21:09:56 -0000	1.9
@@ -183,8 +183,6 @@
   set_socket_nonblocking(fd[0]);
 
   /* set up conn so it's got all the data we need to remember */
-  conn->receiver_bucket = -1; /* non-cell connections don't do receiver buckets */
-  conn->bandwidth = -1;
   conn->s = fd[0];
   conn->address = strdup("localhost");
 

Index: directory.c
===================================================================
RCS file: /home/or/cvsroot/src/or/directory.c,v
retrieving revision 1.34
retrieving revision 1.35
diff -u -d -r1.34 -r1.35
--- directory.c	27 Sep 2003 07:28:44 -0000	1.34
+++ directory.c	27 Sep 2003 21:09:56 -0000	1.35
@@ -48,8 +48,6 @@
   conn->addr = router->addr;
   conn->port = router->dir_port;
   conn->address = strdup(router->address);
-  conn->receiver_bucket = -1; /* edge connections don't do receiver buckets */
-  conn->bandwidth = -1;
   if (router->identity_pkey)
     conn->identity_pkey = crypto_pk_dup_key(router->identity_pkey);
   else {

Index: dns.c
===================================================================
RCS file: /home/or/cvsroot/src/or/dns.c,v
retrieving revision 1.28
retrieving revision 1.29
diff -u -d -r1.28 -r1.29
--- dns.c	27 Sep 2003 07:28:44 -0000	1.28
+++ dns.c	27 Sep 2003 21:09:56 -0000	1.29
@@ -225,7 +225,7 @@
     /* mark all pending connections to fail */
     while(resolve->pending_connections) {
       pend = resolve->pending_connections;
-      pend->conn->marked_for_close = 1;
+/*ENDCLOSE*/  pend->conn->marked_for_close = 1;
       resolve->pending_connections = pend->next;
       free(pend);
     }
@@ -278,7 +278,7 @@
     pend = resolve->pending_connections;
     pend->conn->addr = resolve->answer;
     if(resolve->state == CACHE_STATE_FAILED || connection_exit_connect(pend->conn) < 0) {
-      pend->conn->marked_for_close = 1;
+/*ENDCLOSE*/  pend->conn->marked_for_close = 1;
     }
     resolve->pending_connections = pend->next;
     free(pend);
@@ -386,8 +386,6 @@
   set_socket_nonblocking(fd[0]);
 
   /* set up conn so it's got all the data we need to remember */
-  conn->receiver_bucket = -1; /* non-cell connections don't do receiver buckets */
-  conn->bandwidth = -1;
   conn->s = fd[0];
   conn->address = strdup("localhost");
 
@@ -420,6 +418,7 @@
 
     dnsconn->marked_for_close = 1;
     num_dnsworkers_busy--;
+    num_dnsworkers--;
   }
 
   if(num_dnsworkers_busy >= MIN_DNSWORKERS)

Index: main.c
===================================================================
RCS file: /home/or/cvsroot/src/or/main.c,v
retrieving revision 1.106
retrieving revision 1.107
diff -u -d -r1.106 -r1.107
--- main.c	27 Sep 2003 07:21:36 -0000	1.106
+++ main.c	27 Sep 2003 21:09:56 -0000	1.107
@@ -391,7 +391,8 @@
 
       if(conn->wants_to_read == 1 /* it's marked to turn reading back on now */
          && global_read_bucket > 0 /* and we're allowed to read */
-         && conn->receiver_bucket != 0) { /* and either an edge conn or non-empty bucket */
+         && (!connection_speaks_cells(conn) || conn->receiver_bucket > 0)) {
+         /* and either a non-cell conn or a cell conn with non-empty bucket */
         conn->wants_to_read = 0;
         connection_start_reading(conn);
 	if(conn->wants_to_write == 1) {

Index: or.h
===================================================================
RCS file: /home/or/cvsroot/src/or/or.h,v
retrieving revision 1.142
retrieving revision 1.143
diff -u -d -r1.142 -r1.143
--- or.h	27 Sep 2003 07:33:07 -0000	1.142
+++ or.h	27 Sep 2003 21:09:56 -0000	1.143
@@ -273,12 +273,6 @@
 
   long timestamp_created; /* when was this connection_t created? */
 
-  uint32_t bandwidth; /* connection bandwidth. Set to -1 for non-OR conns. */
-  int receiver_bucket; /* when this hits 0, stop receiving. Every second we
-                        * add 'bandwidth' to this, capping it at 10*bandwidth.
-			* Set to -1 for non-OR conns.
-                        */
-
   uint32_t addr; /* these two uniquely identify a router. Both in host order. */
   uint16_t port; /* if non-zero, they identify the guy on the other end
                   * of the connection. */
@@ -293,6 +287,12 @@
   tor_tls *tls;
   uint16_t next_aci; /* Which ACI do we try to use next on this connection? 
                       * This is always in the range 0..1<<15-1.*/
+
+  /* bandwidth and receiver_bucket only used by ORs in OPEN state: */
+  uint32_t bandwidth; /* connection bandwidth. */
+  int receiver_bucket; /* when this hits 0, stop receiving. Every second we
+                        * add 'bandwidth' to this, capping it at 10*bandwidth.
+                        */
 
 /* Used only by edge connections: */
   char stream_id[STREAM_ID_SIZE];



More information about the tor-commits mailing list