[or-cvs] Implemented link padding and receiver token buckets

Roger Dingledine arma at seul.org
Tue Jul 16 01:12:18 UTC 2002


Update of /home/or/cvsroot/src/or
In directory moria.seul.org:/home/arma/work/onion/cvs/src/or

Modified Files:
	buffers.c cell.c circuit.c command.c config.c connection.c 
	connection_ap.c connection_exit.c connection_op.c 
	connection_or.c main.c onion.c or.h routers.c 
Log Message:
Implemented link padding and receiver token buckets

Each socket reads at most 'bandwidth' bytes per second sustained, but
can handle bursts of up to 10*bandwidth bytes.

Cells are now sent out at evenly-spaced intervals, with padding cells
sent whenever no data cell is waiting. Set LinkPadding=0 in the rc file
to send cells as soon as they're available (and to never send padding cells).

Added license/copyrights statements at the top of most files.

The per-connection conn->min and conn->max have been merged into a single
conn->bandwidth value (for now it is taken from router->min). We should
make routerinfo_t reflect this too (want to do that, Mat?)

As the bandwidth increases, we have to wake up from poll() more and more
frequently to send a single cell, so CPU usage goes up. At 128kB/s we're
pretty much calling poll() with a timeout of 1ms or even 0ms. The current
code takes any timeout of 0-9ms and rounds it up to 10ms. prepare_for_poll()
handles everything that should have happened in the past, so as long as
our buffers don't get too full in that 10ms, we're ok.

Speaking of too full, if you run three servers at 100kB/s with -l debug,
it spends too much time printing debugging messages to be able to keep
up with the cells. The outbuf ultimately fills up and it kills that
connection. If you run with -l err, it works fine up through 500kB/s and
probably beyond. Down the road we'll want to teach it to recognize when
an outbuf is getting full, and back off.



Index: buffers.c
===================================================================
RCS file: /home/or/cvsroot/src/or/buffers.c,v
retrieving revision 1.3
retrieving revision 1.4
diff -u -d -r1.3 -r1.4
--- buffers.c	2 Jul 2002 06:02:15 -0000	1.3
+++ buffers.c	16 Jul 2002 01:12:15 -0000	1.4
@@ -1,3 +1,6 @@
+/* Copyright 2001,2002 Roger Dingledine, Matej Pfajfar. */
+/* See LICENSE for licensing information */
+/* $Id$ */
 
 /* buffers.c */
 
@@ -21,17 +24,29 @@
   free(buf);
 }
 
-int read_to_buf(int s, char **buf, size_t *buflen, size_t *buf_datalen, int *reached_eof) {
+int read_to_buf(int s, int at_most, char **buf, size_t *buflen, size_t *buf_datalen, int *reached_eof) {
 
-  /* grab from s, put onto buf, return how many bytes read */
+  /* read from socket s, writing onto buf+buf_datalen. Read at most
+   * 'at_most' bytes, and also don't read more than will fit based on buflen.
+   * If read() returns 0, set *reached_eof to 1 and return 0. If you want to tear
+   * down the connection return -1, else return the number of bytes read.
+   */
 
   int read_result;
 
-  assert(buf && *buf && buflen && buf_datalen && reached_eof && (s>=0));
+  assert(buf && *buf && buflen && buf_datalen && reached_eof && (s>=0) && (at_most >= 0));
 
   /* this is the point where you would grow the buffer, if you want to */
 
-  read_result = read(s, *buf+*buf_datalen, *buflen - *buf_datalen);
+  if(*buflen - *buf_datalen < at_most)
+    at_most = *buflen - *buf_datalen; /* take the min of the two */
+    /* (note that this only modifies at_most inside this function) */
+
+  if(at_most == 0)
+    return 0; /* we shouldn't read anything */
+
+  log(LOG_DEBUG,"read_to_buf(): reading at most %d bytes.",at_most);
+  read_result = read(s, *buf+*buf_datalen, at_most);
   if (read_result < 0) {
     if(errno!=EAGAIN) { /* it's a real error */
       return -1;
@@ -49,22 +64,24 @@
 
 }
 
-int flush_buf(int s, char **buf, size_t *buflen, size_t *buf_datalen) {
+int flush_buf(int s, char **buf, size_t *buflen, size_t *buf_flushlen, size_t *buf_datalen) {
 
   /* push from buf onto s
    * then memmove to front of buf
-   * return -1 or how many bytes remain on the buf */
+   * return -1 or how many bytes remain to be flushed */
 
   int write_result;
 
-  assert(buf && *buf && buflen && buf_datalen && (s>=0));
+  assert(buf && *buf && buflen && buf_flushlen && buf_datalen && (s>=0) && (*buf_flushlen <= *buf_datalen));
 
-  if(*buf_datalen == 0) /* nothing to flush */
+  if(*buf_flushlen == 0) /* nothing to flush */
     return 0;
 
   /* this is the point where you would grow the buffer, if you want to */
 
-  write_result = write(s, *buf, *buf_datalen);
+  write_result = write(s, *buf, *buf_flushlen > 10240 ? 10240 : *buf_flushlen);
+    /* try to flush at most 10240 bytes at a time. otherwise write() can hang for
+     * quite a while trying to get it all out. that's bad. */
   if (write_result < 0) {
     if(errno!=EAGAIN) { /* it's a real error */
       return -1;
@@ -73,11 +90,12 @@
     return 0;
   } else {
     *buf_datalen -= write_result;
+    *buf_flushlen -= write_result;
     memmove(*buf, *buf+write_result, *buf_datalen);
-    log(LOG_DEBUG,"flush_buf(): flushed %d bytes, %d remain.",write_result,*buf_datalen);
-    return *buf_datalen;
+    log(LOG_DEBUG,"flush_buf(): flushed %d bytes, %d ready to flush, %d remain.",
+        write_result,*buf_flushlen,*buf_datalen);
+    return *buf_flushlen;
   }
-
 }
 
 int write_to_buf(char *string, size_t string_len,

Index: cell.c
===================================================================
RCS file: /home/or/cvsroot/src/or/cell.c,v
retrieving revision 1.2
retrieving revision 1.3
diff -u -d -r1.2 -r1.3
--- cell.c	5 Jul 2002 06:27:23 -0000	1.2
+++ cell.c	16 Jul 2002 01:12:15 -0000	1.3
@@ -1,3 +1,6 @@
+/* Copyright 2001,2002 Roger Dingledine, Matej Pfajfar. */
+/* See LICENSE for licensing information */
+/* $Id$ */
 
 #include "or.h"
 
@@ -5,10 +8,12 @@
 
   assert(cell);
 
+#if 0 /* actually, the aci is 0 for padding cells */
   if(cell->aci == 0) {
     log(LOG_DEBUG,"check_sane_cell(): Cell has aci=0. Dropping.");
     return -1;
   }
+#endif
 
 #if 0 /* actually, the length is sometimes encrypted. so it's ok. */
   if(cell->length > 120) {

Index: circuit.c
===================================================================
RCS file: /home/or/cvsroot/src/or/circuit.c,v
retrieving revision 1.5
retrieving revision 1.6
diff -u -d -r1.5 -r1.6
--- circuit.c	8 Jul 2002 08:59:15 -0000	1.5
+++ circuit.c	16 Jul 2002 01:12:15 -0000	1.6
@@ -1,3 +1,6 @@
+/* Copyright 2001,2002 Roger Dingledine, Matej Pfajfar. */
+/* See LICENSE for licensing information */
+/* $Id$ */
 
 #include "or.h"
 

Index: command.c
===================================================================
RCS file: /home/or/cvsroot/src/or/command.c,v
retrieving revision 1.5
retrieving revision 1.6
diff -u -d -r1.5 -r1.6
--- command.c	10 Jul 2002 19:05:13 -0000	1.5
+++ command.c	16 Jul 2002 01:12:15 -0000	1.6
@@ -1,3 +1,6 @@
+/* Copyright 2001,2002 Roger Dingledine, Matej Pfajfar. */
+/* See LICENSE for licensing information */
+/* $Id$ */
 
 #include "or.h"
 
@@ -40,8 +43,7 @@
   if(!circ) { /* if it's not there, create it */
     circ = circuit_new(cell->aci, conn);
     circ->state = CIRCUIT_STATE_OPEN_WAIT;
-    memcpy((void *)&circ->onionlen,(void *)cell->payload, 4);
-    circ->onionlen = ntohl(circ->onionlen);
+    circ->onionlen = ntohl(*(int*)cell->payload);
     log(LOG_DEBUG,"command_process_create_cell():  Onion length is %u.",circ->onionlen);
     if(circ->onionlen > 50000 || circ->onionlen < 1) { /* too big or too small */
       log(LOG_DEBUG,"That's ludicrous. Closing.");

Index: config.c
===================================================================
RCS file: /home/or/cvsroot/src/or/config.c,v
retrieving revision 1.10
retrieving revision 1.11
diff -u -d -r1.10 -r1.11
--- config.c	15 Jul 2002 16:42:27 -0000	1.10
+++ config.c	16 Jul 2002 01:12:15 -0000	1.11
@@ -1,3 +1,7 @@
+/* Copyright 2001,2002 Roger Dingledine, Matej Pfajfar. */
+/* See LICENSE for licensing information */
+/* $Id$ */
+
 /**
  * config.c 
  * Routines for loading the configuration file.
@@ -5,50 +9,6 @@
  * Matej Pfajfar <mp292 at cam.ac.uk>
  */
 
-/*
- * Changes :
- * $Log$
- * Revision 1.10  2002/07/15 16:42:27  montrose
- * corrected some string literals
- *
- * Revision 1.9  2002/07/11 19:03:44  montrose
- * finishing touches. think its ready for integration now.
- *
- * Revision 1.8  2002/07/11 18:38:15  montrose
- * added new option GlobalRole to getoptions()
- *
- * Revision 1.7  2002/07/11 14:50:26  montrose
- * cleaned up some, added validation to getoptions()
- *
- * Revision 1.6  2002/07/10 12:37:49  montrose
- * Added usage display on error.
- *
- * Revision 1.5  2002/07/09 19:51:41  montrose
- * Miscellaneous bug fixes / activated "make check" for src/or
- *
- * Revision 1.4  2002/07/03 19:58:18  montrose
- * minor bug fix in error checking
- *
- * Revision 1.3  2002/07/03 16:53:34  montrose
- * added error checking into getoptions()
- *
- * Revision 1.2  2002/07/03 16:31:22  montrose
- * Added getoptions() and made minor adjustment to poptReadDefaultOptions()
- *
- * Revision 1.1.1.1  2002/06/26 22:45:50  arma
- * initial commit: current code
- *
- * Revision 1.3  2002/04/02 14:28:24  badbytes
- * Final finishes.
- *
- * Revision 1.2  2002/01/27 00:42:50  mp292
- * Reviewed according to Secure-Programs-HOWTO.
- *
- * Revision 1.1  2002/01/03 10:24:05  badbytes
- * COde based on that in op. Needs to be modified.
- *
- */
-
 #include "or.h"
 #include <libgen.h>
 
@@ -119,7 +79,9 @@
          0, "local port on which the onion proxy is running",  "<file>" },
       { "TrafficShaping",  't',  POPT_ARG_INT,     &options->TrafficShaping,
          0, "which traffic shaping policy to use",             "<policy>" },
-      { "GlobalRole",      'g',  POPT_ARG_INT,     &options->GlobalRole,
+      { "LinkPadding",     'P',  POPT_ARG_INT,     &options->LinkPadding,
+	 0, "whether to use link padding",                     "<padding>" },
+      { "Role",            'g',  POPT_ARG_INT,     &options->Role,
          0, "4-bit global role id",                            "<role>" },
       { "Verbose",         'v',  POPT_ARG_NONE,    &Verbose,
          0, "display options selected before execution",       NULL },
@@ -137,7 +99,8 @@
    options->LogLevel = "debug";
    options->loglevel = LOG_DEBUG;
    options->CoinWeight = 0.8;
-   options->GlobalRole = ROLE_OR_LISTEN | ROLE_OR_CONNECT_ALL | ROLE_OP_LISTEN | ROLE_AP_LISTEN;
+   options->LinkPadding = 1;
+   options->Role = ROLE_OR_LISTEN | ROLE_OR_CONNECT_ALL | ROLE_OP_LISTEN | ROLE_AP_LISTEN;
 
    code = poptGetNextOpt(optCon);         /* first we handle command-line args */
    if ( code == -1 )
@@ -170,19 +133,20 @@
 
    if ( Verbose )                      
    {
-      printf("LogLevel=%s, GlobalRole=%d\n",
+      printf("LogLevel=%s, Role=%d\n",
              options->LogLevel,
-             options->GlobalRole);
+             options->Role);
       printf("RouterFile=%s, PrivateKeyFile=%s\n",
              options->RouterFile,
              options->PrivateKeyFile);
       printf("ORPort=%d, OPPort=%d, APPort=%d\n",
              options->ORPort,options->OPPort,
              options->APPort);
-      printf("CoinWeight=%6.4f, MaxConn=%d, TrafficShaping=%d\n",
+      printf("CoinWeight=%6.4f, MaxConn=%d, TrafficShaping=%d, LinkPadding=%d\n",
              options->CoinWeight,
              options->MaxConn,
-             options->TrafficShaping);
+             options->TrafficShaping,
+             options->LinkPadding);
    }
 
    /* Validate options */
@@ -260,9 +224,15 @@
       code = -1;
    }
 
-   if ( options->GlobalRole < 0 || options->GlobalRole > 15 )
+   if ( options->LinkPadding != 0 && options->LinkPadding != 1 )
    {
-      log(LOG_ERR,"GlobalRole option must be an integer between 0 and 15 (inclusive).");
+      log(LOG_ERR,"LinkPadding option must be either 0 or 1.");
+      code = -1;
+   }
+
+   if ( options->Role < 0 || options->Role > 15 )
+   {
+      log(LOG_ERR,"Role option must be an integer between 0 and 15 (inclusive).");
       code = -1;
    }
 

Index: connection.c
===================================================================
RCS file: /home/or/cvsroot/src/or/connection.c,v
retrieving revision 1.5
retrieving revision 1.6
diff -u -d -r1.5 -r1.6
--- connection.c	8 Jul 2002 08:59:15 -0000	1.5
+++ connection.c	16 Jul 2002 01:12:15 -0000	1.6
@@ -1,8 +1,13 @@
+/* Copyright 2001,2002 Roger Dingledine, Matej Pfajfar. */
+/* See LICENSE for licensing information */
+/* $Id$ */
 
 #include "or.h"
 
 /********* START VARIABLES **********/
 
+extern or_options_t options; /* command-line and config-file options */
+
 #if 0
 /* these are now out of date :( -RD */
 char *conn_type_to_string[] = {
@@ -38,6 +43,34 @@
 
 /********* END VARIABLES ************/
 
+/**************************************************************/
+
+int tv_cmp(struct timeval *a, struct timeval *b) {
+        /* Three-way comparison of two timevals.
+         * Returns 1 if a is later than b, -1 if a is earlier, 0 if equal.
+         * Seconds decide first and microseconds only break ties, so the
+         * ordering is only meaningful for normalized values. */
+        if (a->tv_sec != b->tv_sec)
+                return (a->tv_sec > b->tv_sec) ? 1 : -1;
+        if (a->tv_usec != b->tv_usec)
+                return (a->tv_usec > b->tv_usec) ? 1 : -1;
+        return 0;
+}
+
+void tv_add(struct timeval *a, struct timeval *b) { /* a += b, carrying whole seconds out of tv_usec */
+        long usec_sum = a->tv_usec + b->tv_usec;
+        a->tv_sec += b->tv_sec + usec_sum / 1000000;
+        a->tv_usec = usec_sum % 1000000;
+}
+
+void tv_addms(struct timeval *a, long ms) { /* a += ms milliseconds, carrying overflow of tv_usec into tv_sec */
+        a->tv_usec += (ms % 1000) * 1000;
+        a->tv_sec += ms / 1000 + a->tv_usec / 1000000;
+        a->tv_usec %= 1000000;
+}
+
+/**************************************************************/
+
 connection_t *connection_new(int type) {
   connection_t *conn;
 
@@ -51,6 +84,8 @@
      buf_new(&conn->outbuf, &conn->outbuflen, &conn->outbuf_datalen) < 0)
     return NULL;
 
+  conn->receiver_bucket = 10240; /* should be enough to do the handshake */
+  conn->bandwidth = conn->receiver_bucket / 10; /* give it a default */
   return conn;
 }
 
@@ -245,7 +280,26 @@
 }
 
 int connection_read_to_buf(connection_t *conn) {
-  return read_to_buf(conn->s, &conn->inbuf, &conn->inbuflen, &conn->inbuf_datalen, &conn->inbuf_reached_eof);
+  int read_result; /* bytes read (0 on EOF or nothing allowed), or -1 if the conn should be closed */
+  /* Read into conn->inbuf, taking at most receiver_bucket bytes (the token bucket). */
+  read_result = read_to_buf(conn->s, conn->receiver_bucket, &conn->inbuf, &conn->inbuflen,
+                            &conn->inbuf_datalen, &conn->inbuf_reached_eof);
+  log(LOG_DEBUG,"connection_read_to_buf(): read_to_buf returned %d.",read_result);
+  if(read_result >= 0) { /* on -1 leave the bucket alone; the caller removes the conn */
+    conn->receiver_bucket -= read_result; /* spend one token per byte read */
+    if(conn->receiver_bucket <= 0) {
+      /* bucket exhausted: stop polling for input until it is refilled */
+      connection_stop_reading(conn);
+
+      /* If we're not in 'open' state here, the handshake can never finish:
+       * the bucket refill only restarts reading for 'open' conns. But we can't
+       * check for that here, because the buf we just read might have enough on
+       * it to finish the handshake. So we check for that in check_conn_read().
+       */
+    }
+  }
+
+  return read_result;
 }
 
 int connection_fetch_from_buf(char *string, int len, connection_t *conn) {
@@ -253,16 +307,114 @@
 }
 
 int connection_flush_buf(connection_t *conn) {
-  return flush_buf(conn->s, &conn->outbuf, &conn->outbuflen, &conn->outbuf_datalen);
+  return flush_buf(conn->s, &conn->outbuf, &conn->outbuflen, &conn->outbuf_flushlen, &conn->outbuf_datalen); /* only the first outbuf_flushlen bytes may be written */
 }

 int connection_write_to_buf(char *string, int len, connection_t *conn) {
   if(!len)
     return 0;
-  connection_watch_events(conn, POLLOUT | POLLIN);
+  /* Open OR/OP conns doing link padding only queue data here: connection_send_cell() releases it one cell at a time. */
+  if( (conn->type != CONN_TYPE_OR && conn->type != CONN_TYPE_OP) ||
+      (!connection_state_is_open(conn)) ||
+      (options.LinkPadding == 0) ) {
+    /* connection types other than or and op, or or/op not in 'open' state, should flush immediately */
+    /* also flush immediately if we're not doing LinkPadding, since otherwise it will never flush */
+    connection_watch_events(conn, POLLOUT | POLLIN);
+    conn->outbuf_flushlen += len;
+  }
+
   return write_to_buf(string, len, &conn->outbuf, &conn->outbuflen, &conn->outbuf_datalen);
 }
 
+int connection_receiver_bucket_should_increase(connection_t *conn) {
+  /* Return 1 if conn's receiver bucket may still be topped up, or 0 once
+   * it already holds more than 10*bandwidth bytes. This ceiling is what
+   * bounds how large a burst of incoming data can be. */
+  assert(conn);
+
+  return conn->receiver_bucket <= 10*conn->bandwidth;
+}
+
+void connection_increment_receiver_bucket (connection_t *conn) {
+  assert(conn);
+
+  if(!connection_receiver_bucket_should_increase(conn))
+    return; /* already above the 10*bandwidth ceiling; add nothing */
+  /* Top up by 1.1*bandwidth. This can push the bucket past the ceiling,
+   * but by no more than one refill's worth. */
+  conn->receiver_bucket += conn->bandwidth*1.1;
+  /* only conns in 'open' state are told to resume reading */
+  if(connection_state_is_open(conn))
+    connection_start_reading(conn);
+}
+
+int connection_state_is_open(connection_t *conn) {
+  /* Return 1 if conn is in the OPEN state for its type, else 0. */
+  assert(conn);
+
+  if(conn->type == CONN_TYPE_OR)   return conn->state == OR_CONN_STATE_OPEN;
+  if(conn->type == CONN_TYPE_OP)   return conn->state == OP_CONN_STATE_OPEN;
+  if(conn->type == CONN_TYPE_AP)   return conn->state == AP_CONN_STATE_OPEN;
+  if(conn->type == CONN_TYPE_EXIT) return conn->state == EXIT_CONN_STATE_OPEN;
+
+  return 0; /* any other type (e.g. a listener) is never 'open' */
+}
+
+void connection_send_cell(connection_t *conn) {
+  cell_t cell; /* scratch space for a padding cell */
+  /* prepare_for_poll() calls this once conn's send_timeval has passed: release one cell's worth of outbuf, queueing padding if needed. */
+  assert(conn);
+
+  if(conn->type != CONN_TYPE_OR && conn->type != CONN_TYPE_OP) {
+    /* this conn doesn't speak cells. do nothing. */
+    return;
+  }
+
+  if(!connection_state_is_open(conn)) {
+    /* it's not in 'open' state, all data should already be waiting to be flushed */
+    assert(conn->outbuf_datalen == conn->outbuf_flushlen);
+    return;
+  }
+  /* Below, outbuf_datalen - outbuf_flushlen is the number of queued bytes not yet released for flushing. */
+#if 0 /* use to send evenly spaced cells, but not padding */
+  if(conn->outbuf_datalen - conn->outbuf_flushlen >= sizeof(cell_t)) {
+    conn->outbuf_flushlen += sizeof(cell_t); /* instruct it to send a cell */
+    connection_watch_events(conn, POLLOUT | POLLIN);
+  }
+#endif
+
+#if 1 /* experimental code, that sends padding cells too. 'probably' works :) */
+  if(conn->outbuf_datalen - conn->outbuf_flushlen < sizeof(cell_t)) {
+    /* we need to queue a padding cell first */
+    memset(&cell,0,sizeof(cell_t));
+    cell.command = CELL_PADDING;
+    connection_write_cell_to_buf(&cell, conn); /* NOTE(review): return value ignored; if this write fails, the += below can push outbuf_flushlen past outbuf_datalen */
+  }
+
+  conn->outbuf_flushlen += sizeof(cell_t); /* instruct it to send a cell */
+  connection_watch_events(conn, POLLOUT | POLLIN);
+#endif
+  /* Always advance the clock on this path: prepare_for_poll() keeps calling us while send_timeval <= now. */
+  connection_increment_send_timeval(conn); /* update when we'll send the next cell */
+}
+
+void connection_increment_send_timeval(connection_t *conn) {
+  /* add "1000000 * sizeof(cell_t) / conn->bandwidth" microseconds to conn->send_timeval */
+  /* FIXME should perhaps use ceil() of this. For now I simply add 1. */
+  /* bandwidth is lowered to whatever the peer sends in the handshake and can be 0; treat 0 as 1 to avoid dividing by zero */
+  tv_addms(&conn->send_timeval, 1+1000 * sizeof(cell_t) / (conn->bandwidth != 0 ? conn->bandwidth : 1));
+}
+
+void connection_init_timeval(connection_t *conn) {
+  /* Start conn's cell clock: set send_timeval to the current time plus
+   * one cell interval, so the first cell goes out after that interval.
+   * If gettimeofday() fails, the interval is not added. */
+  assert(conn);
+
+  if(gettimeofday(&conn->send_timeval,NULL) >= 0)
+    connection_increment_send_timeval(conn);
+}
+
 int connection_send_destroy(aci_t aci, connection_t *conn) {
   cell_t cell;
 
@@ -276,6 +428,8 @@
      return 0;
   }
 
+  assert(conn->type == CONN_TYPE_OR);
+
   cell.aci = aci;
   cell.command = CELL_DESTROY;
   log(LOG_DEBUG,"connection_send_destroy(): Sending destroy (aci %d).",aci);
@@ -291,7 +445,6 @@
   }
 
   return connection_write_to_buf((char *)cellp, sizeof(cell_t), conn);
-
 }
 
 int connection_encrypt_cell_header(cell_t *cellp, connection_t *conn) {
@@ -300,22 +453,26 @@
   int x;
   char *px;
 
+#if 0
   printf("Sending: Cell header plaintext: ");
   px = (char *)cellp;
   for(x=0;x<8;x++) {
     printf("%u ",px[x]);
   } 
   printf("\n");
+#endif
 
   if(!EVP_EncryptUpdate(&conn->f_ctx, newheader, &newsize, (char *)cellp, 8)) {
     log(LOG_ERR,"Could not encrypt data for connection %s:%u.",conn->address,ntohs(conn->port));
     return -1;
   }
+#if 0
   printf("Sending: Cell header crypttext: ");
   for(x=0;x<8;x++) {
     printf("%u ",newheader[x]);
   }
   printf("\n");
+#endif
 
   memcpy(cellp,newheader,8);
   return 0;
@@ -430,22 +587,26 @@
     return -1;
   }
 
+#if 0
   printf("Cell header crypttext: ");
   for(x=0;x<8;x++) {
     printf("%u ",crypted[x]);
   }
   printf("\n");
+#endif
   /* decrypt */
   if(!EVP_DecryptUpdate(&conn->b_ctx,(unsigned char *)outbuf,&outlen,crypted,8)) {
     log(LOG_ERR,"connection_process_cell_from_inbuf(): Decryption failed, dropping.");
     return connection_process_inbuf(conn); /* process the remainder of the buffer */
   }
   log(LOG_DEBUG,"connection_process_cell_from_inbuf(): Cell decrypted (%d bytes).",outlen);
+#if 0
   printf("Cell header plaintext: ");
   for(x=0;x<8;x++) {
     printf("%u ",outbuf[x]);
   }
   printf("\n");
+#endif
 
   /* copy the rest of the cell */
   memcpy((char *)outbuf+8, (char *)crypted+8, sizeof(cell_t)-8);

Index: connection_ap.c
===================================================================
RCS file: /home/or/cvsroot/src/or/connection_ap.c,v
retrieving revision 1.3
retrieving revision 1.4
diff -u -d -r1.3 -r1.4
--- connection_ap.c	8 Jul 2002 08:59:15 -0000	1.3
+++ connection_ap.c	16 Jul 2002 01:12:15 -0000	1.4
@@ -1,3 +1,6 @@
+/* Copyright 2001,2002 Roger Dingledine, Matej Pfajfar. */
+/* See LICENSE for licensing information */
+/* $Id$ */
 
 #include "or.h"
 

Index: connection_exit.c
===================================================================
RCS file: /home/or/cvsroot/src/or/connection_exit.c,v
retrieving revision 1.6
retrieving revision 1.7
diff -u -d -r1.6 -r1.7
--- connection_exit.c	10 Jul 2002 20:17:27 -0000	1.6
+++ connection_exit.c	16 Jul 2002 01:12:15 -0000	1.7
@@ -1,3 +1,6 @@
+/* Copyright 2001,2002 Roger Dingledine, Matej Pfajfar. */
+/* See LICENSE for licensing information */
+/* $Id$ */
 
 #include "or.h"
 

Index: connection_op.c
===================================================================
RCS file: /home/or/cvsroot/src/or/connection_op.c,v
retrieving revision 1.3
retrieving revision 1.4
diff -u -d -r1.3 -r1.4
--- connection_op.c	5 Jul 2002 06:27:23 -0000	1.3
+++ connection_op.c	16 Jul 2002 01:12:15 -0000	1.4
@@ -1,3 +1,6 @@
+/* Copyright 2001,2002 Roger Dingledine, Matej Pfajfar. */
+/* See LICENSE for licensing information */
+/* $Id$ */
 
 #include "or.h"
 
@@ -82,6 +85,7 @@
   EVP_DecryptInit(&conn->f_ctx, EVP_des_ofb(), conn->f_session_key, conn->f_session_iv);
 
   conn->state = OP_CONN_STATE_OPEN;
+  connection_init_timeval(conn);
   connection_watch_events(conn, POLLIN);
 
   return 0;

Index: connection_or.c
===================================================================
RCS file: /home/or/cvsroot/src/or/connection_or.c,v
retrieving revision 1.5
retrieving revision 1.6
diff -u -d -r1.5 -r1.6
--- connection_or.c	8 Jul 2002 08:59:15 -0000	1.5
+++ connection_or.c	16 Jul 2002 01:12:15 -0000	1.6
@@ -1,3 +1,6 @@
+/* Copyright 2001,2002 Roger Dingledine, Matej Pfajfar. */
+/* See LICENSE for licensing information */
+/* $Id$ */
 
 #include "or.h"
 
@@ -17,7 +20,7 @@
     return -1;
   }
 
-  log(LOG_DEBUG,"connection_or_process_inbuf(): state %d.",conn->state);
+//  log(LOG_DEBUG,"connection_or_process_inbuf(): state %d.",conn->state);
 
   switch(conn->state) {
     case OR_CONN_STATE_CLIENT_AUTH_WAIT:
@@ -84,6 +87,7 @@
       log(LOG_DEBUG,"connection_or_finished_flushing(): client finished sending nonce.");
       conn_or_init_crypto(conn);
       conn->state = OR_CONN_STATE_OPEN;
+      connection_init_timeval(conn);
       connection_watch_events(conn, POLLIN);
       return 0;
     case OR_CONN_STATE_SERVER_SENDING_AUTH:
@@ -148,7 +152,7 @@
   /* set up conn so it's got all the data we need to remember */
   conn->addr = router->addr, conn->port = router->or_port; /* NOTE we store or_port here always */
   conn->prkey = prkey;
-  conn->min = router->min, conn->max = router->max;
+  conn->bandwidth = router->min; /* kludge, should make a router->bandwidth and use that */
   conn->pkey = router->pkey;
   conn->address = strdup(router->address);
   memcpy(&conn->local,local,sizeof(struct sockaddr_in));
@@ -316,6 +320,7 @@
   conn_or_init_crypto(conn);
 
   conn->state = OR_CONN_STATE_OPEN;
+  connection_init_timeval(conn);
   connection_watch_events(conn, POLLIN); /* give it a default, tho the ap_handshake call may change it */
   ap_handshake_n_conn_open(conn); /* send the pending onion */
   return 0;
@@ -367,8 +372,7 @@
   char buf[44];
   char cipher[128];
 
-  if (!conn)
-    return -1;
+  assert(conn);
 
   /* generate random keys */
   if(!RAND_bytes(conn->f_session_key,8) ||
@@ -385,11 +389,9 @@
   memcpy(buf+10, (void *)&conn->port, 2); /* remote port */
   memcpy(buf+12,conn->f_session_key,8); /* keys */
   memcpy(buf+20,conn->b_session_key,8);
-  *((uint32_t *)(buf+28)) = htonl(conn->min); /* min link utilisation */
-  *((uint32_t *)(buf+32)) = htonl(conn->max); /* maximum link utilisation */
+  *((uint32_t *)(buf+28)) = htonl(conn->bandwidth); /* max link utilisation */
   log(LOG_DEBUG,"or_handshake_client_send_auth() : Generated first authentication message.");
 
-
   /* encrypt message */
   retval = RSA_public_encrypt(36,buf,cipher,conn->pkey,RSA_PKCS1_PADDING);
   if (retval == -1) /* error */
@@ -429,7 +431,7 @@
 int or_handshake_client_process_auth(connection_t *conn) {
   char buf[128]; /* only 44 of this is expected to be used */
   char cipher[128];
-  uint32_t min,max;
+  uint32_t bandwidth;
   int retval;
 
   assert(conn);
@@ -474,15 +476,10 @@
   log(LOG_DEBUG,"or_handshake_client_process_auth() : Response valid.");
 
   /* update link info */
-  min = *(uint32_t *)(buf+28);
-  max = *(uint32_t *)(buf+32);
-  min = ntohl(min);
-  max = ntohl(max);
+  bandwidth = ntohl(*(uint32_t *)(buf+28));
 
-  if (conn->min > min)
-    conn->min = min;
-  if (conn->max > max)
-    conn->max = max;
+  if (conn->bandwidth > bandwidth)
+    conn->bandwidth = bandwidth;
 
   /* reply is just local addr/port, remote addr/port, nonce */
   memcpy(buf+12, buf+36, 8);
@@ -519,6 +516,7 @@
   log(LOG_DEBUG,"or_handshake_client_process_auth(): Finished sending nonce.");
   conn_or_init_crypto(conn);
   conn->state = OR_CONN_STATE_OPEN;
+  connection_init_timeval(conn);
   connection_watch_events(conn, POLLIN);
   return connection_process_inbuf(conn); /* process the rest of the inbuf */
 
@@ -539,7 +537,7 @@
   uint32_t addr;
   uint16_t port;
 
-  uint32_t min,max;
+  uint32_t bandwidth;
   routerinfo_t *router;
 
   assert(conn);
@@ -593,18 +591,12 @@
   memcpy(conn->f_session_key,buf+20,8);
 
   /* update link info */
-  min = *(uint32_t *)(buf+28);
-  max = *(uint32_t *)(buf+32);
-  min = ntohl(min);
-  max = ntohl(max);
+  bandwidth = ntohl(*(uint32_t *)(buf+28));
 
-  conn->min = router->min;
-  conn->max = router->max;
+  conn->bandwidth = router->min; /* FIXME, should make a router->bandwidth and use that */
 
-  if (conn->min > min)
-    conn->min = min;
-  if (conn->max > max)
-    conn->max = max;
+  if (conn->bandwidth > bandwidth)
+    conn->bandwidth = bandwidth;
 
   /* copy all relevant info to conn */
   conn->addr = router->addr, conn->port = router->or_port;
@@ -622,8 +614,7 @@
 
   /* generate message */
   memcpy(buf+36,conn->nonce,8); /* append the nonce to the end of the message */
-  *(uint32_t *)(buf+28) = htonl(conn->min); /* send min link utilisation */
-  *(uint32_t *)(buf+32) = htonl(conn->max); /* send max link utilisation */
+  *(uint32_t *)(buf+28) = htonl(conn->bandwidth); /* send max link utilisation */
 
   /* encrypt message */
   retval = RSA_public_encrypt(44,buf,cipher,conn->pkey,RSA_PKCS1_PADDING);
@@ -709,6 +700,7 @@
 
   conn_or_init_crypto(conn);
   conn->state = OR_CONN_STATE_OPEN;
+  connection_init_timeval(conn);
   connection_watch_events(conn, POLLIN);
   return connection_process_inbuf(conn); /* process the rest of the inbuf */
 

Index: main.c
===================================================================
RCS file: /home/or/cvsroot/src/or/main.c,v
retrieving revision 1.9
retrieving revision 1.10
diff -u -d -r1.9 -r1.10
--- main.c	12 Jul 2002 18:14:17 -0000	1.9
+++ main.c	16 Jul 2002 01:12:15 -0000	1.10
@@ -1,9 +1,12 @@
+/* Copyright 2001,2002 Roger Dingledine, Matej Pfajfar. */
+/* See LICENSE for licensing information */
+/* $Id$ */
 
 #include "or.h"
 
 /********* START VARIABLES **********/
 
-static or_options_t options; /* command-line and config-file options */
+or_options_t options; /* command-line and config-file options */
 int global_role;
 
 static connection_t *connection_array[MAXCONNECTIONS] =
@@ -89,13 +92,19 @@
 }
 
 connection_t *connection_twin_get_by_addr_port(uint32_t addr, uint16_t port) {
+  /* Find a connection to the router described by addr and port,
+   *   or alternately any router which knows its key.
+   * This connection *must* be in 'open' state.
+   * If not, return NULL.
+   */
   int i;
   connection_t *conn;
 
   /* first check if it's there exactly */
   conn = connection_exact_get_by_addr_port(addr,port);
-  if(conn)
+  if(conn && connection_state_is_open(conn)) {
     return conn;
+  }
 
   /* now check if any of the other open connections are a twin for this one */
 
@@ -185,6 +194,21 @@
   poll_array[conn->poll_index].events = events;
 }
 
+void connection_stop_reading(connection_t *conn) {
+  /* Stop polling conn's socket for input: clear POLLIN, keep the other event bits. */
+  assert(conn && conn->poll_index < nfds);
+
+  /* clearing the bit is harmless when POLLIN was already off */
+  poll_array[conn->poll_index].events &= ~POLLIN;
+}
+
+void connection_start_reading(connection_t *conn) {
+  /* (Re)start polling conn's socket for input; the other event bits are kept. */
+  assert(conn && conn->poll_index < nfds);
+
+  poll_array[conn->poll_index].events = poll_array[conn->poll_index].events | POLLIN;
+}
+
 void check_conn_read(int i) {
   int retval;
   connection_t *conn;
@@ -193,7 +217,7 @@
 
     conn = connection_array[i];
     assert(conn);
-    log(LOG_DEBUG,"check_conn_read(): socket %d has something to read.",conn->s);
+//    log(LOG_DEBUG,"check_conn_read(): socket %d has something to read.",conn->s);
 
     if (conn->type == CONN_TYPE_OP_LISTENER) {
       retval = connection_op_handle_listener_read(conn);
@@ -206,10 +230,14 @@
       retval = connection_read_to_buf(conn);
       if (retval >= 0) { /* all still well */
         retval = connection_process_inbuf(conn);
-	log(LOG_DEBUG,"check_conn_read(): connection_process_inbuf returned %d.",retval);
+//	log(LOG_DEBUG,"check_conn_read(): connection_process_inbuf returned %d.",retval);
+        if(retval >= 0 && !connection_state_is_open(conn) && conn->receiver_bucket == 0) {
+          log(LOG_DEBUG,"check_conn_read(): receiver bucket reached 0 before handshake finished. Closing.");
+          retval = -1;
+        }
       }
     }
-  
+
     if(retval < 0) { /* this connection is broken. remove it */
       log(LOG_DEBUG,"check_conn_read(): Connection broken, removing."); 
       connection_remove(conn);
@@ -275,15 +303,93 @@
   }
 }
 
+/* Handle everything that came due before we block in poll(), and compute
+ * poll()'s *timeout in ms (-1 = wait forever). Returns 0, or -1 if
+ * gettimeofday() fails. */
+int prepare_for_poll(int *timeout) {
+  int i;
+  int need_to_refill_buckets = 0;
+  connection_t *conn = NULL; /* conn whose send_timeval is soonest so far */
+  connection_t *tmpconn;
+  struct timeval now, soonest;
+  static time_t current_second = 0; /* tv_sec of the last refill; time_t so it never truncates */
+  int ms_until_conn;
+  int ms_late; /* how far behind schedule a backlogged cell is, in ms */
+
+  *timeout = -1; /* set it to never timeout, possibly overridden below */
+
+  /* first check if we need to refill buckets */
+  for(i=0;i<nfds;i++) {
+    if(connection_receiver_bucket_should_increase(connection_array[i])) {
+      need_to_refill_buckets = 1;
+      break;
+    }
+  }
+
+  if(gettimeofday(&now,NULL) < 0)
+    return -1;
+
+  if(need_to_refill_buckets) {
+    if(now.tv_sec > current_second) { /* the second has already rolled over! */
+      log(LOG_DEBUG,"prepare_for_poll(): The second has rolled over, immediately refilling.");
+      increment_receiver_buckets();
+      current_second = now.tv_sec; /* remember which second it is, for next time */
+    }
+    *timeout = 1000 - (now.tv_usec / 1000); /* how many milliseconds til the next second? */
+  }
+
+  if(options.LinkPadding) {
+    /* now check which conn wants to speak soonest */
+    for(i=0;i<nfds;i++) {
+      tmpconn = connection_array[i];
+      if(tmpconn->type != CONN_TYPE_OR && tmpconn->type != CONN_TYPE_OP)
+        continue; /* this conn type doesn't send cells */
+      if(!connection_state_is_open(tmpconn))
+        continue; /* only conns in state 'open' have a valid send_timeval */
+      while(tv_cmp(&tmpconn->send_timeval,&now) <= 0) { /* send_timeval has already passed, let it send a cell */
+        /* tv_sec/tv_usec are time_t/long: narrow explicitly before handing to %d */
+        ms_late = (int)((now.tv_sec - tmpconn->send_timeval.tv_sec)*1000 +
+                        (now.tv_usec - tmpconn->send_timeval.tv_usec)/1000);
+        log(LOG_DEBUG,"prepare_for_poll(): doing backlogged connection_send_cell on socket %d (%d ms old)",tmpconn->s,ms_late);
+        connection_send_cell(tmpconn); /* NOTE(review): assumed to advance send_timeval, else this never exits */
+      }
+      if(!conn || tv_cmp(&tmpconn->send_timeval, &soonest) < 0) { /* this is the best choice so far */
+        conn = tmpconn;
+        soonest.tv_sec = conn->send_timeval.tv_sec;
+        soonest.tv_usec = conn->send_timeval.tv_usec;
+      }
+    }
+
+    if(conn) { /* we might want to set *timeout sooner */
+      ms_until_conn = (int)((soonest.tv_sec - now.tv_sec)*1000 +
+                            (soonest.tv_usec - now.tv_usec)/1000);
+      if(*timeout == -1 || ms_until_conn < *timeout) { /* use the new one */
+        *timeout = ms_until_conn;
+      }
+    }
+  }
+
+  return 0;
+}
+
+/* Apply connection_increment_receiver_bucket() to each of the nfds conns. */
+void increment_receiver_buckets(void) {
+  int idx = 0;
+  while(idx < nfds)
+    connection_increment_receiver_bucket(connection_array[idx++]);
+}
+
 int do_main_loop(void) {
   int i;
+  int timeout;
+  int poll_result;
 
   /* load the routers file */
   router_array = getrouters(options.RouterFile,&rarray_len, options.ORPort);
   if (!router_array)
   {
     log(LOG_ERR,"Error loading router list.");
-    exit(1);
+    return -1;
   }
 
   /* load the private key */
@@ -291,29 +397,55 @@
   if (!prkey)
   {
     log(LOG_ERR,"Error loading private key.");
-    exit(1);
+    return -1;
   }
   log(LOG_DEBUG,"core : Loaded private key of size %u bytes.",RSA_size(prkey));
 
   /* start-up the necessary connections based on global_role. This is where we
    * try to connect to all the other ORs, and start the listeners */
-  retry_all_connections(options.GlobalRole, router_array, rarray_len, prkey, 
+  retry_all_connections(options.Role, router_array, rarray_len, prkey, 
 		        options.ORPort, options.OPPort, options.APPort);
 
   for(;;) {
-    poll(poll_array, nfds, -1); /* poll until we have an event */
+    if(prepare_for_poll(&timeout) < 0) {
+      log(LOG_DEBUG,"do_main_loop(): prepare_for_poll failed, exiting.");
+      return -1;
+    }
+    /* now timeout is the value we'll hand to poll. It's either -1, meaning
+     * don't timeout, else it indicates the soonest event (either the
+     * one-second rollover for refilling receiver buckets, or the soonest
+     * conn that needs to send a cell)
+     */
 
-    /* do all the reads first, so we can detect closed sockets */
-    for(i=0;i<nfds;i++)
-      check_conn_read(i); /* this also blows away broken connections */
+    /* if the timeout is less than 10, set it to 10 */
+    if(timeout >= 0 && timeout < 10)
+      timeout = 10;
 
-    /* then do the writes */
-    for(i=0;i<nfds;i++)
-      check_conn_write(i);
+    /* poll until we have an event, or it's time to do something */
+    poll_result = poll(poll_array, nfds, timeout);
 
-    /* any of the conns need to be closed now? */
-    for(i=0;i<nfds;i++)
-      check_conn_marked(i); 
+    if(poll_result < 0) {
+      log(LOG_ERR,"do_main_loop(): poll failed.");
+      if(errno != EINTR) /* let the program survive things like ^z */
+        return -1;
+    }
+
+    if(poll_result > 0) { /* we have at least one connection to deal with */
+      /* do all the reads first, so we can detect closed sockets */
+      for(i=0;i<nfds;i++)
+        check_conn_read(i); /* this also blows away broken connections */
+
+      /* then do the writes */
+      for(i=0;i<nfds;i++)
+        check_conn_write(i);
+
+      /* any of the conns need to be closed now? */
+      for(i=0;i<nfds;i++)
+        check_conn_marked(i); 
+    }
+    /* refilling buckets and sending cells happens at the beginning of the
+     * next iteration of the loop, inside prepare_for_poll()
+     */
   }
 }
 
@@ -332,7 +464,7 @@
 
   if ( getoptions(argc,argv,&options) ) exit(1);
   log(options.loglevel,NULL);         /* assign logging severity level from options */
-  global_role = options.GlobalRole;   /* assign global_role from options. FIX: remove from global namespace later. */
+  global_role = options.Role;   /* assign global_role from options. FIX: remove from global namespace later. */
 
   ERR_load_crypto_strings();
   retval = do_main_loop();

Index: onion.c
===================================================================
RCS file: /home/or/cvsroot/src/or/onion.c,v
retrieving revision 1.2
retrieving revision 1.3
diff -u -d -r1.2 -r1.3
--- onion.c	2 Jul 2002 09:36:58 -0000	1.2
+++ onion.c	16 Jul 2002 01:12:15 -0000	1.3
@@ -1,3 +1,6 @@
+/* Copyright 2001,2002 Roger Dingledine, Matej Pfajfar. */
+/* See LICENSE for licensing information */
+/* $Id$ */
 
 #include "or.h"
 

Index: or.h
===================================================================
RCS file: /home/or/cvsroot/src/or/or.h,v
retrieving revision 1.9
retrieving revision 1.10
diff -u -d -r1.9 -r1.10
--- or.h	11 Jul 2002 18:38:16 -0000	1.9
+++ or.h	16 Jul 2002 01:12:15 -0000	1.10
@@ -1,4 +1,5 @@
-/* Copyright (c) 2002 Roger Dingledine.  See LICENSE for licensing information */
+/* Copyright 2001,2002 Roger Dingledine, Matej Pfajfar. */
+/* See LICENSE for licensing information */
 /* $Id$ */
 
 #ifndef __OR_H
@@ -38,8 +39,8 @@
 #define MAXCONNECTIONS 200 /* upper bound on max connections.
 			      can be overridden by config file */
 
-#define MAX_BUF_SIZE (64*1024)
-#define DEFAULT_BANDWIDTH_OP 1
+#define MAX_BUF_SIZE (640*1024)
+#define DEFAULT_BANDWIDTH_OP 1024
 
 #define ACI_TYPE_LOWER 0
 #define ACI_TYPE_HIGHER 1
@@ -130,16 +131,19 @@
   int inbuf_reached_eof;
 
   char *outbuf;
-  size_t outbuflen;
-  size_t outbuf_datalen;
+  size_t outbuflen; /* how many bytes are allocated for the outbuf? */
+  size_t outbuf_flushlen; /* how much data should we try to flush from the outbuf? */
+  size_t outbuf_datalen; /* how much data is there total on the outbuf? */
 
 //  uint16_t aci; /* anonymous connection identifier */
 
 /* used by OR and OP: */
 
   uint32_t bandwidth; /* connection bandwidth */
-  int window_sent; /* how many cells can i still send? */
-  int window_received; /* how many cells do i still expect to receive? */
+  int receiver_bucket; /* when this hits 0, stop receiving. Every second we
+		       	* add 'bandwidth' to this, capping it at 10*bandwidth.
+		       	*/
+  struct timeval send_timeval; /* for determining when to send the next cell */
 
   /* link encryption */
   unsigned char f_session_key[8];
@@ -169,9 +173,11 @@
   RSA *prkey;
   struct sockaddr_in local;
 
-   /* link info */
+#if 0 /* obsolete, we now use conn->bandwidth */
+  /* link info */
   uint32_t min;
   uint32_t max;
+#endif
 
   char *address; /* strdup into this, because free_connection frees it */
   RSA *pkey; /* public RSA key for the other side */
@@ -295,7 +301,8 @@
    int APPort;
    int MaxConn;
    int TrafficShaping;
-   int GlobalRole;
+   int LinkPadding;
+   int Role;
    int loglevel;
 } or_options_t;
 
@@ -303,24 +310,16 @@
     /* all the function prototypes go here */
 
 
-/********************************* args.c ***************************/
-
-/* print help*/
-void print_usage();
-
-/* get command-line arguments */
-int getargs(int argc,char *argv[], char *args,char **conf_filename, int *loglevel);
-
 /********************************* buffers.c ***************************/
 
 int buf_new(char **buf, size_t *buflen, size_t *buf_datalen);
 
 void buf_free(char *buf);
 
-int read_to_buf(int s, char **buf, size_t *buflen, size_t *buf_datalen, int *reached_eof);
+int read_to_buf(int s, int at_most, char **buf, size_t *buflen, size_t *buf_datalen, int *reached_eof);
   /* grab from s, put onto buf, return how many bytes read */
 
-int flush_buf(int s, char **buf, size_t *buflen, size_t *buf_datalen);
+int flush_buf(int s, char **buf, size_t *buflen, size_t *buf_flushlen, size_t *buf_datalen);
   /* push from buf onto s
    * then memmove to front of buf
    * return -1 or how many bytes remain on the buf */
@@ -384,6 +383,8 @@
 
 /********************************* connection.c ***************************/
 
+int tv_cmp(struct timeval *a, struct timeval *b);
+
 connection_t *connection_new(int type);
 
 void connection_free(connection_t *conn);
@@ -404,6 +405,16 @@
 int connection_flush_buf(connection_t *conn);
 
 int connection_write_to_buf(char *string, int len, connection_t *conn);
+void connection_send_cell(connection_t *conn);
+
+int connection_receiver_bucket_should_increase(connection_t *conn);
+void connection_increment_receiver_bucket (connection_t *conn);
+
+void connection_increment_send_timeval(connection_t *conn);
+void connection_init_timeval(connection_t *conn);
+
+int connection_state_is_open(connection_t *conn);
+
 int connection_send_destroy(aci_t aci, connection_t *conn);
 int connection_encrypt_cell_header(cell_t *cellp, connection_t *conn);
 int connection_write_cell_to_buf(cell_t *cellp, connection_t *conn);
@@ -500,12 +511,15 @@
 connection_t *connect_to_router_as_op(routerinfo_t *router);
 
 void connection_watch_events(connection_t *conn, short events);
+void connection_stop_reading(connection_t *conn);
+void connection_start_reading(connection_t *conn);
 
 void check_conn_read(int i);
 void check_conn_marked(int i);
 void check_conn_write(int i);
 
-void check_conn_hup(int i);
+int prepare_for_poll(int *timeout);
+void increment_receiver_buckets(void);
 
 int do_main_loop(void);
 

Index: routers.c
===================================================================
RCS file: /home/or/cvsroot/src/or/routers.c,v
retrieving revision 1.3
retrieving revision 1.4
diff -u -d -r1.3 -r1.4
--- routers.c	10 Jul 2002 12:35:59 -0000	1.3
+++ routers.c	16 Jul 2002 01:12:15 -0000	1.4
@@ -1,3 +1,7 @@
+/* Copyright 2001,2002 Roger Dingledine, Matej Pfajfar. */
+/* See LICENSE for licensing information */
+/* $Id$ */
+
 /**
  * routers.c 
  * Routines for loading the list of routers and their public RSA keys.



More information about the tor-commits mailing list