[or-cvs] major overhaul: dns slave subsystem, topics

Roger Dingledine arma at seul.org
Sun Jan 26 09:03:17 UTC 2003


Update of /home/or/cvsroot/src/or
In directory moria.mit.edu:/home/arma/work/onion/cvs/src/or

Modified Files:
	Makefile.am buffers.c circuit.c command.c config.c 
	connection.c connection_ap.c connection_exit.c main.c onion.c 
	or.h 
Added Files:
	dns.c 
Log Message:
major overhaul: dns slave subsystem, topics

on startup, it forks off a master dns handler, which forks off dns
slaves (like the apache model). slaves as spawned as load increases,
and then reused. excess slaves are not ever killed, currently.

implemented topics. each topic has a receive window in each direction
at each edge of the circuit, and sends sendme's at the data level, as
per before. each circuit also has receive windows in each direction at
each hop; an edge sends a circuit-level sendme as soon as enough data
cells have arrived (regardless of whether the data cells were flushed
to the exit conns). removed the 'connected' cell type, since it's now
a topic command within data cells.

at the edge of the circuit, there can be multiple connections associated
with a single circuit. you find them via the linked list conn->next_topic.

currently each new ap connection starts its own circuit, so we ought
to see comparable performance to what we had before. but that's only
because i haven't written the code to reattach to old circuits. please
try to break it as-is, and then i'll make it reuse the same circuit and
we'll try to break that.



--- NEW FILE: dns.c ---
/* Copyright 2003 Roger Dingledine. */
/* See LICENSE for licensing information */
/* $Id: dns.c,v 1.1 2003/01/26 09:02:24 arma Exp $ */

#include "or.h"

#define MAX_DNSSLAVES 50
#define MIN_DNSSLAVES 3 /* 1 for the tor process, 3 slaves */

struct slave_data_t {
  int fd; /* socket to talk on */
  int num_processed; /* number of times we've used this slave */
  char busy; /* whether this slave currently has a task */
  char question[256]; /* the hostname that we're resolving */
  unsigned char question_len; /* how many bytes in question */
  char answer[256]; /* the answer to the question */
  unsigned char answer_len; /* how many bytes in answer */
};

struct slave_data_t slave_data[MAX_DNSSLAVES+1];
struct pollfd poll_data[MAX_DNSSLAVES+1];

static int dns_spawn_slave(void);
static int dns_read_block(int fd, char *string, unsigned char *len);
static int dns_write_block(int fd, char *string, unsigned char len);
static int dns_read_tor_question(int index);
static int dns_read_slave_response(int index);
static int dns_find_idle_slave(int max);
static int dns_assign_to_slave(int from, int to);
static int dns_master_to_tor(int from, int to);
static void dns_master_main(int fd);

int connection_dns_finished_flushing(connection_t *conn) {

  assert(conn && conn->type == CONN_TYPE_DNSMASTER);

  connection_stop_writing(conn);

  return 0;
}

int connection_dns_process_inbuf(connection_t *conn) {
  unsigned char length;
  char buf[256];
  char *question;
  connection_t *exitconn;

  assert(conn && conn->type == CONN_TYPE_DNSMASTER);
  assert(conn->state == DNSMASTER_STATE_OPEN);

  if(conn->inbuf_reached_eof) {
    log(LOG_ERR,"connection_dns_process_inbuf(): Read eof. No more dnsmaster!");
    return -1;
  }

  assert(conn->inbuf);

  /* peek into the inbuf, so we can check if it's all here */
  length = *conn->inbuf; /* warning: abstraction violation :( */
  assert(length < 240);

  if(conn->inbuf_datalen < 1+length) { /* entire answer available? */
    log(LOG_INFO,"connection_dns_process_inbuf(): %d available, waiting for %d.", conn->inbuf_datalen, length+1);
    return 0; /* not yet */
  }

  if(connection_fetch_from_buf(buf,1+length,conn) < 0) {
    log(LOG_ERR,"connection_dns_process_inbuf(): Broken inbuf. No more dnsmaster!");
    return -1;
  }
 
  question = buf+1;
  log(LOG_DEBUG,"connection_dns_process_inbuf(): length %d, question '%s', strlen question %d", length, question, strlen(question));
  assert(length == 4 + strlen(question) + 1);
  
  /* find the conn that question refers to. */
  exitconn = connection_get_pendingresolve_by_address(question);

  if(!exitconn) {
    log(LOG_DEBUG,"connection_dns_process_inbuf(): No conn -- question no longer relevant? Dropping.");
    return connection_process_inbuf(conn); /* process the remainder of the buffer */
  }
  memcpy((char *)&exitconn->addr, buf+1+length-4,4);
  exitconn->addr = ntohl(exitconn->addr); /* get it back to host order */

  if(connection_exit_connect(exitconn) < 0) {
    exitconn->marked_for_close = 1;
  }

  return connection_process_inbuf(conn); /* process the remainder of the buffer */
}


/* return -1 if error, else the fd that can talk to the dns master */
int dns_master_start(void) {
  connection_t *conn;
  pid_t pid;
  int fd[2];

  if(socketpair(AF_UNIX, SOCK_STREAM, 0, fd) < 0) {
    log(LOG_ERR,"dns_master_start(): socketpair failed.");
    return -1;
  }

  pid = fork();
  if(pid < 0) {
    log(LOG_ERR,"dns_master_start(): fork failed.");
    return -1;
  }
  if(pid == 0) { /* i'm the child */
    log(LOG_DEBUG,"dns_master_start(): child says fd0 %d, fd1 %d.", fd[0], fd[1]);
    close(fd[0]);
    dns_master_main(fd[1]);
    assert(0); /* never gets here */
  }

  /* i'm the parent */

  close(fd[1]);

  fcntl(fd[0], F_SETFL, O_NONBLOCK); /* set s to non-blocking */

  conn = connection_new(CONN_TYPE_DNSMASTER);
  if(!conn) {
    log(LOG_INFO,"dns_master_start(): connection_new failed. Giving up.");
    /* XXX tell the dnsmaster to die */
    return -1;
  }

  conn->s = fd[0];
  conn->address = strdup("localhost");
  conn->receiver_bucket = -1; /* edge connections don't do receiver buckets */
  conn->bandwidth = -1;

  if(connection_add(conn) < 0) { /* no space, forget it */
    log(LOG_INFO,"dns_master_start(): connection_add failed. Giving up.");
    connection_free(conn);
    /* XXX tell the dnsmaster to die */    
    return -1;
  }

  conn->state = DNSMASTER_STATE_OPEN;
  connection_start_reading(conn);
  log(LOG_INFO,"dns_master_start(): dns handler is spawned.");
  return fd[0];
}

static void dns_slave_main(int fd) {
  char question[256];
  unsigned char question_len;
  struct hostent *rent;

  for(;;) {
    if(dns_read_block(fd, question, &question_len) < 0) { /* the master wants us to die */
      log(LOG_INFO,"dns_slave_main(): eof on read from master. Exiting.");
      exit(0);
    }

    rent = gethostbyname(question);
    if (!rent) { 
      log(LOG_INFO,"dns_slave_main(): Could not resolve dest addr %s. Returning nulls.",question);
      if(dns_write_block(fd, "\0\0\0\0", 4) < 0) {
        log(LOG_INFO,"dns_slave_main(): writing to master failed. Exiting.");
        exit(0);
      }     
    } else {
      if(dns_write_block(fd, rent->h_addr, rent->h_length) < 0) {
        log(LOG_INFO,"dns_slave_main(): writing to master failed. Exiting.");
        exit(0);
      }
      log(LOG_INFO,"dns_slave_main(): Answered question '%s'.",question);
    }
  }
}

static int dns_spawn_slave(void) {
  pid_t pid;
  int fd[2];

  if(socketpair(AF_UNIX, SOCK_STREAM, 0, fd) < 0) {
    perror("socketpair");
    exit(1);
  }

  pid = fork();
  if(pid < 0) {
    perror("fork");
    exit(1);
  }
  if(pid == 0) { /* i'm the child */
    close(fd[0]);
    dns_slave_main(fd[1]);
    assert(0); /* never gets here */  
  }

  /* i'm the parent */
  log(LOG_INFO,"dns_spawn_slave(): just spawned a slave."); // XXX change to debug
  close(fd[1]);
  return fd[0];
}

/* read a first byte from fd, put it into *len. Then read *len
 * bytes from fd and put it into string.
 * Return -1 if eof or read error or bad len, else return 0.
 */
int dns_read_block(int fd, char *string, unsigned char *len) {
  int read_result;

  log(LOG_DEBUG,"dns_read_block(): Calling read to learn length (fd %d).", fd);
  read_result = read(fd, len, 1);
  log(LOG_DEBUG,"dns_read_block(): read finished, returned %d", read_result);
  if (read_result < 0) {
    log(LOG_INFO,"dns_read_block(): read len returned error");
    return -1;
  } else if (read_result == 0) {
    log(LOG_INFO,"dns_read_block(): Encountered eof reading len");
    return -1;
  } else if (*len <= 0) {
    log(LOG_INFO,"dns_read_block(): len not >0");
    return -1;
  }

  log(LOG_DEBUG,"dns_read_block(): Calling read to get string, length %u.", *len);
  read_result = read(fd, string, *len);
  if (read_result < 0) {
    log(LOG_INFO,"dns_read_block(): read string returned error");
    return -1;
  } else if (read_result == 0) {
    log(LOG_INFO,"dns_read_block(): Encountered eof reading string");
    return -1;
  }

  string[*len] = 0; /* null terminate it, just in case */
  log(LOG_INFO,"dns_read_block(): Read '%s', len %u.",string,*len); // XXX make silent
  return 0;
}

/* write ("%c%s", string, len) onto fd */
static int dns_write_block(int fd, char *string, unsigned char len) {
  int write_result;
  int written=0;
  char tmp[257];

  assert(len <= 250);
  tmp[0] = len;
  memcpy(tmp+1, string, len);
  log(LOG_DEBUG,"dns_write_block(): writing length %u, fd %d.", len, fd);

  while(written < len+1) {
    write_result = write(fd, tmp, len+1-written);
    if (write_result < 0) {
      return -1;
    }
    written += write_result;
  }

  return 0;
}

/* pull in question. block until we've read everything. 
 * return -1 if eof. */
static int dns_read_tor_question(int index) {

  log(LOG_DEBUG,"dns_read_tor_question(): Pulling question from tor");
  if(dns_read_block(slave_data[index].fd,
                    slave_data[index].question,
                    &slave_data[index].question_len) < 0)
    return -1;

  log(LOG_INFO,"dns_read_tor_question(): Read question '%s'",slave_data[index].question);
  return 0;
}

/* pull in answer. block until we've read it. return -1 if eof. */
static int dns_read_slave_response(int index) {

  if(dns_read_block(slave_data[index].fd,
                    slave_data[index].answer,
                    &slave_data[index].answer_len) < 0)
    return -1;

  return 0;
}

static int dns_find_idle_slave(int max) {
  int i;

  for(i=1;i<max;i++)
    if(slave_data[i].busy == 0) {
      log(LOG_DEBUG,"dns_find_idle_slave(): slave %d is chosen.",i);
      return i;
    }

  assert(0); /* should never get here */
}

static int dns_assign_to_slave(int from, int to) {

  slave_data[to].question_len = slave_data[from].question_len;
  memcpy(slave_data[to].question, slave_data[from].question, slave_data[from].question_len);

//  slave_data[from].question_len = 0;

  log(LOG_DEBUG,"dns_assign_to_slave(): from index %d to %d (writing fd %d)",from,to,slave_data[to].fd);
  if(dns_write_block(slave_data[to].fd,
                     slave_data[to].question,
                     slave_data[to].question_len) < 0) {
    log(LOG_INFO,"dns_assign_to_slave(): writing to slave failed.");
    return -1;
  }

  return 0;
}

static int dns_master_to_tor(int from, int to) {
  char tmp[256];
  unsigned char len;

  len = slave_data[from].question_len+1+slave_data[from].answer_len;
  memcpy(tmp, slave_data[from].question, slave_data[from].question_len);
  tmp[slave_data[from].question_len] = 0; /* null terminate it */
  memcpy(tmp+1+slave_data[from].question_len, slave_data[from].answer, slave_data[from].answer_len);

  log(LOG_DEBUG,"dns_master_to_tor(): question is '%s', length %d",slave_data[from].question,slave_data[from].question_len);
  log(LOG_DEBUG,"dns_master_to_tor(): answer is %d %d %d %d",
    slave_data[from].answer[0],
    slave_data[from].answer[1],
    slave_data[from].answer[2],
    slave_data[from].answer[3]);
  assert(slave_data[from].answer_len == 4);
  if(dns_write_block(slave_data[to].fd, tmp, len) < 0) {
    log(LOG_INFO,"dns_master_to_tor(): writing to tor failed.");
    return -1;
  }

  return 0;
}

int dns_tor_to_master(char *address) {
  connection_t *conn;
  unsigned char len;

  conn = connection_get_by_type(CONN_TYPE_DNSMASTER);
  if(!conn) {
    log(LOG_ERR,"dns_tor_to_master(): dns master nowhere to be found!");
    /* XXX should do gethostbyname right here */
    return -1;
  }

  len = strlen(address);
  if(connection_write_to_buf(&len, 1, conn) < 0) {
    log(LOG_DEBUG,"dns_tor_to_master(): Couldn't write length.");
    return -1;
  }

  if(connection_write_to_buf(address, len, conn) < 0) {
    log(LOG_DEBUG,"dns_tor_to_master(): Couldn't write address.");
    return -1;
  }

  log(LOG_DEBUG,"dns_tor_to_master(): submitted '%s'", address);
  return 0;
}

static void dns_master_main(int fd) {
  int nfds=1; /* the 0th index is the tor process, the rest are slaves */
  int num_slaves_busy=0;
  int num_slaves_needed = MIN_DNSSLAVES;
  int poll_result, idle, i;

  poll_data[0].fd = slave_data[0].fd = fd;
  poll_data[0].events = POLLIN;

  for(;;) { /* loop forever */

    assert(num_slaves_needed < MAX_DNSSLAVES);
    while(nfds-1 < num_slaves_needed) {
      /* add another slave. */

      i = nfds;
      memset(&slave_data[i], 0, sizeof(struct slave_data_t));
      memset(&poll_data[i], 0, sizeof(struct pollfd));
      slave_data[i].fd = poll_data[i].fd = dns_spawn_slave();
      poll_data[i].events = POLLIN; /* listen always, to prevent accidental deadlock */      
      nfds++;
    }

    /* XXX later, decide on a timeout value, to catch wedged slaves */

    poll_result = poll(poll_data, nfds, -1);
    log(LOG_DEBUG,"dns_master_main(): Poll returned -- activity!");
    for(i=0;i<nfds;i++) {
      if(poll_data[i].revents & POLLIN) {
        if(i==0) { /* note that we read only one question per poll loop */
          if(dns_read_tor_question(i) >= 0) {
            while(1) {
              idle = dns_find_idle_slave(nfds);
              if(dns_assign_to_slave(i, idle) >= 0)
                break; /* successfully assigned to one */
              /* XXX slave must die, recalc num slaves and num busy */
            }
            num_slaves_busy++; 
          } else { /* error */
            log(LOG_INFO,"dns_master_main(): dns_read_tor_question failed. Master dying.");
            exit(1);
          }
        } else {
          if(dns_read_slave_response(i) >= 0) {
            if(dns_master_to_tor(i, 0) < 0) {
              log(LOG_INFO,"dns_master_main(): dns_master_to_tor failed. Master dying.");
              exit(1);
            }
            slave_data[i].busy = 0;
            num_slaves_busy--;
            poll_data[0].events = POLLIN; /* resume reading from tor if we'd stopped */
          } else { /* error */
            log(LOG_INFO,"dns_master_main(): dns_read_slave_response failed. Leaving slave stranded (FIXME)");
          }
        }
      }
    }
    log(LOG_DEBUG,"dns_master_main(): Finished looping over fd's.");

    if(num_slaves_busy >= num_slaves_needed) {
      if(num_slaves_needed == MAX_DNSSLAVES-1)
        poll_data[0].events = 0; /* stop reading from tor */
      else
        num_slaves_needed++;
    }

  }
  assert(0); /* should never get here */
}


Index: Makefile.am
===================================================================
RCS file: /home/or/cvsroot/src/or/Makefile.am,v
retrieving revision 1.10
retrieving revision 1.11
diff -u -d -r1.10 -r1.11
--- Makefile.am	31 Dec 2002 15:04:14 -0000	1.10
+++ Makefile.am	26 Jan 2003 09:02:23 -0000	1.11
@@ -9,7 +9,7 @@
 
 or_SOURCES = buffers.c circuit.c command.c connection.c \
              connection_exit.c connection_ap.c connection_op.c connection_or.c config.c \
-             main.c onion.c routers.c directory.c
+             main.c onion.c routers.c directory.c dns.c
 
 test_config_SOURCES = test_config.c
 

Index: buffers.c
===================================================================
RCS file: /home/or/cvsroot/src/or/buffers.c,v
retrieving revision 1.9
retrieving revision 1.10
diff -u -d -r1.9 -r1.10
--- buffers.c	28 Sep 2002 05:53:00 -0000	1.9
+++ buffers.c	26 Jan 2003 09:02:24 -0000	1.10
@@ -51,7 +51,7 @@
     /* if no linkpadding: do a rudimentary round-robin so one
      * connection can't hog an outgoing connection
      */
-    at_most = 10*sizeof(cell_t);
+    at_most = 10*sizeof(cell_t); /* FIXME should be 10* size of usable payload */
   }
 
 //  log(LOG_DEBUG,"read_to_buf(): reading at most %d bytes.",at_most);

Index: circuit.c
===================================================================
RCS file: /home/or/cvsroot/src/or/circuit.c,v
retrieving revision 1.20
retrieving revision 1.21
diff -u -d -r1.20 -r1.21
--- circuit.c	30 Dec 2002 08:51:41 -0000	1.20
+++ circuit.c	26 Jan 2003 09:02:24 -0000	1.21
@@ -64,8 +64,8 @@
   circ->p_aci = p_aci;
   /* circ->n_aci remains 0 because we haven't identified the next hop yet */
 
-  circ->n_receive_window = RECEIVE_WINDOW_START;
-  circ->p_receive_window = RECEIVE_WINDOW_START;
+  circ->n_receive_circwindow = CIRCWINDOW_START;
+  circ->p_receive_circwindow = CIRCWINDOW_START;
 
   circuit_add(circ);
 
@@ -73,6 +73,7 @@
 }
 
 void circuit_free(circuit_t *circ) {
+  struct data_queue_t *tmpd;
   
   if (circ->n_crypto)
     crypto_free_cipher_env(circ->n_crypto);
@@ -83,6 +84,12 @@
     free(circ->onion);
   if(circ->cpath)
     circuit_free_cpath(circ->cpath, circ->cpathlen);
+  while(circ->data_queue) {
+    tmpd = circ->data_queue;
+    circ->data_queue = tmpd->next;
+    free(tmpd->cell);
+    free(tmpd);
+  }
 
   free(circ);
 }
@@ -202,7 +209,7 @@
   else
     circ = circ->next;
 
-  for( ;circ;circ = circ->next) {
+  for( ; circ; circ = circ->next) {
     if(circ->n_addr == naddr && circ->n_port == nport)
        return circ;
   }
@@ -211,56 +218,108 @@
 
 circuit_t *circuit_get_by_aci_conn(aci_t aci, connection_t *conn) {
   circuit_t *circ;
+  connection_t *tmpconn;
 
   for(circ=global_circuitlist;circ;circ = circ->next) {
-    if(circ->p_conn == conn && circ->p_aci == aci)
-       return circ;
-    if(circ->n_conn == conn && circ->n_aci == aci)
-       return circ;
+    if(circ->p_aci == aci) {
+      for(tmpconn = circ->p_conn; tmpconn; tmpconn = tmpconn->next_topic) {
+        if(tmpconn == conn)
+          return circ;
+      }
+    }
+    if(circ->n_aci == aci) {
+      for(tmpconn = circ->n_conn; tmpconn; tmpconn = tmpconn->next_topic) {
+        if(tmpconn == conn)
+          return circ;
+      }
+    }
   }
   return NULL;
 }
 
 circuit_t *circuit_get_by_conn(connection_t *conn) {
   circuit_t *circ;
+  connection_t *tmpconn;
 
   for(circ=global_circuitlist;circ;circ = circ->next) {
-    if(circ->p_conn == conn)
-       return circ;
-    if(circ->n_conn == conn)
-       return circ;
+    for(tmpconn = circ->p_conn; tmpconn; tmpconn=tmpconn->next_topic)
+      if(tmpconn == conn)
+        return circ;
+    for(tmpconn = circ->n_conn; tmpconn; tmpconn=tmpconn->next_topic)
+      if(tmpconn == conn)
+        return circ;
   }
   return NULL;
 }
 
-int circuit_deliver_data_cell(cell_t *cell, circuit_t *circ, connection_t *conn, int crypt_type) {
+int circuit_deliver_data_cell_from_edge(cell_t *cell, circuit_t *circ, char edge_type) {
+  int cell_direction;
 
-  /* first decrypt cell->length */
-  if(circuit_crypt(circ, &(cell->length), 1, crypt_type) < 0) {
-    log(LOG_DEBUG,"circuit_deliver_data_cell(): length decryption failed. Dropping connection.");
+  log(LOG_DEBUG,"circuit_deliver_data_cell_from_edge(): called, edge_type %d.", edge_type);
+
+  if(edge_type == EDGE_AP) { /* i'm the AP */
+    cell_direction = CELL_DIRECTION_OUT;
+    if(circ->p_receive_circwindow <= 0) {
+      log(LOG_DEBUG,"circuit_deliver_data_cell_from_edge(): window 0, queueing for later.");
+      circ->data_queue = data_queue_add(circ->data_queue, cell);
+      return 0;
+    }
+    circ->p_receive_circwindow--;
+  } else { /* i'm the exit */
+    cell_direction = CELL_DIRECTION_IN;
+    if(circ->n_receive_circwindow <= 0) {
+      log(LOG_DEBUG,"circuit_deliver_data_cell_from_edge(): window 0, queueing for later.");
+      circ->data_queue = data_queue_add(circ->data_queue, cell);
+      return 0;
+    }
+    circ->n_receive_circwindow--;
+  }
+
+  if(circuit_deliver_data_cell(cell, circ, cell_direction) < 0) {
     return -1;
   }
+    
+  circuit_consider_stop_edge_reading(circ, edge_type); /* has window reached 0? */
+  return 0;
+}
 
-  /* then decrypt the payload */
-  if(circuit_crypt(circ, (char *)&(cell->payload), CELL_PAYLOAD_SIZE, crypt_type) < 0) {
-    log(LOG_DEBUG,"circuit_deliver_data_cell(): payload decryption failed. Dropping connection.");
+int circuit_deliver_data_cell(cell_t *cell, circuit_t *circ, int cell_direction) {
+  connection_t *conn;
+
+  assert(cell && circ);
+  assert(cell_direction == CELL_DIRECTION_OUT || cell_direction == CELL_DIRECTION_IN); 
+  if(cell_direction == CELL_DIRECTION_OUT)
+    conn = circ->n_conn;
+  else
+    conn = circ->p_conn;
+
+  /* first crypt cell->length */
+  if(circuit_crypt(circ, &(cell->length), 1, cell_direction) < 0) {
+    log(LOG_DEBUG,"circuit_deliver_data_cell(): length crypt failed. Dropping connection.");
     return -1;
   }
 
-  if(conn->type == CONN_TYPE_EXIT) { /* send payload directly */
-//    log(LOG_DEBUG,"circuit_deliver_data_cell(): Sending to exit.");
-    return connection_exit_process_data_cell(cell, conn);
+  /* then crypt the payload */
+  if(circuit_crypt(circ, (char *)&(cell->payload), CELL_PAYLOAD_SIZE, cell_direction) < 0) {
+    log(LOG_DEBUG,"circuit_deliver_data_cell(): payload crypt failed. Dropping connection.");
+    return -1;
   }
-  if(conn->type == CONN_TYPE_AP) { /* send payload directly */
-//    log(LOG_DEBUG,"circuit_deliver_data_cell(): Sending to AP.");
-    return connection_ap_process_data_cell(cell, conn);
+
+  if((!conn && cell_direction == CELL_DIRECTION_OUT) || (conn && conn->type == CONN_TYPE_EXIT)) {
+    log(LOG_DEBUG,"circuit_deliver_data_cell(): Sending to exit.");
+    return connection_exit_process_data_cell(cell, circ);
+  }
+  if((!conn && cell_direction == CELL_DIRECTION_IN) || (conn && conn->type == CONN_TYPE_AP)) {
+    log(LOG_DEBUG,"circuit_deliver_data_cell(): Sending to AP.");
+    return connection_ap_process_data_cell(cell, circ);
   }
   /* else send it as a cell */
-//  log(LOG_DEBUG,"circuit_deliver_data_cell(): Sending to connection.");
+  assert(conn);
+  //log(LOG_DEBUG,"circuit_deliver_data_cell(): Sending to connection.");
   return connection_write_cell_to_buf(cell, conn);
 }
 
-int circuit_crypt(circuit_t *circ, char *in, int inlen, char crypt_type) {
+int circuit_crypt(circuit_t *circ, char *in, int inlen, char cell_direction) {
   char *out;
   int i;
   crypt_path_t *thishop;
@@ -271,20 +330,17 @@
   if(!out)
     return -1;
 
-  if(crypt_type == 'e') {
-//    log(LOG_DEBUG,"circuit_crypt(): Encrypting %d bytes.",inlen);
+  if(cell_direction == CELL_DIRECTION_IN) { //crypt_type == 'e') {
     if(circ->cpath) { /* we're at the beginning of the circuit. We'll want to do layered crypts. */
-      /* 'e' means we're preparing to send it out. */
-      for (i=0; i < circ->cpathlen; i++) /* moving from last to first hop 
-                                          * Remember : cpath is in reverse order, i.e. last hop first
-                                          */
+      for (i=circ->cpathlen-1; i >= 0; i--) /* moving from first to last hop 
+                                       * Remember : cpath is in reverse order, i.e. last hop first
+                                       */
       { 
-//        log(LOG_DEBUG,"circuit_crypt() : Encrypting via cpath: Processing hop %u",circ->cpathlen-i);
         thishop = circ->cpath[i];
-    
-        /* encrypt */
-        if(crypto_cipher_encrypt(thishop->f_crypto, in, inlen, (unsigned char *)out)) {
-          log(LOG_ERR,"Error performing encryption:%s",crypto_perror());
+
+        /* decrypt */
+        if(crypto_cipher_decrypt(thishop->b_crypto, in, inlen, out)) {
+          log(LOG_ERR,"Error performing decryption:%s",crypto_perror());
           free(out);
           return -1;
         }
@@ -301,19 +357,17 @@
       }
       memcpy(in,out,inlen);
     }
-  } else if(crypt_type == 'd') {
-//    log(LOG_DEBUG,"circuit_crypt(): Decrypting %d bytes.",inlen);
+  } else if(cell_direction == CELL_DIRECTION_OUT) { //crypt_type == 'd') {
     if(circ->cpath) { /* we're at the beginning of the circuit. We'll want to do layered crypts. */
-      for (i=circ->cpathlen-1; i >= 0; i--) /* moving from first to last hop 
-                                       * Remember : cpath is in reverse order, i.e. last hop first
-                                       */
+      for (i=0; i < circ->cpathlen; i++) /* moving from last to first hop 
+                                          * Remember : cpath is in reverse order, i.e. last hop first
+                                          */
       { 
-//        log(LOG_DEBUG,"circuit_crypt() : Decrypting via cpath: Processing hop %u",circ->cpathlen-i);
         thishop = circ->cpath[i];
-
+    
         /* encrypt */
-        if(crypto_cipher_decrypt(thishop->b_crypto, in, inlen, out)) {
-          log(LOG_ERR,"Error performing decryption:%s",crypto_perror());
+        if(crypto_cipher_encrypt(thishop->f_crypto, in, inlen, (unsigned char *)out)) {
+          log(LOG_ERR,"Error performing encryption:%s",crypto_perror());
           free(out);
           return -1;
         }
@@ -330,19 +384,120 @@
       }
       memcpy(in,out,inlen);
     }
+  } else {
+    log(LOG_ERR,"circuit_crypt(): unknown cell direction %d.", cell_direction);
+    assert(0);
   }
 
   free(out);
+  return 0;
+}
+
+void circuit_resume_edge_reading(circuit_t *circ, int edge_type) {
+  connection_t *conn;
+  struct data_queue_t *tmpd;
+
+  assert(edge_type == EDGE_EXIT || edge_type == EDGE_AP);
+
+  if(edge_type == EDGE_EXIT)
+    conn = circ->n_conn;
+  else
+    conn = circ->p_conn;
+
+  /* first, send the queue waiting at circ onto the circuit */
+  while(circ->data_queue) {
+    assert(circ->data_queue->cell);
+    if(edge_type == EDGE_EXIT) {   
+      circ->p_receive_circwindow--;
+      assert(circ->p_receive_circwindow >= 0);
+      
+      if(circuit_deliver_data_cell(circ->data_queue->cell, circ, CELL_DIRECTION_IN) < 0) {
+        circuit_close(circ);
+        return;
+      }
+    } else { /* ap */
+      circ->p_receive_circwindow--;
+      assert(circ->p_receive_circwindow >= 0);
+      
+      if(circuit_deliver_data_cell(circ->data_queue->cell, circ, CELL_DIRECTION_IN) < 0) {
+        circuit_close(circ);
+        return;
+      }
+    }
+    tmpd = circ->data_queue;
+    circ->data_queue = tmpd->next;
+    free(tmpd->cell);
+    free(tmpd);
 
+    if(circuit_consider_stop_edge_reading(circ, edge_type))
+      return;
+  }
+
+  for( ; conn; conn=conn->next_topic) {
+    if((edge_type == EDGE_EXIT && conn->n_receive_topicwindow > 0) ||
+       (edge_type == EDGE_AP   && conn->p_receive_topicwindow > 0)) {
+      connection_start_reading(conn);
+      connection_package_raw_inbuf(conn); /* handle whatever might still be on the inbuf */
+    }
+  }
+  circuit_consider_stop_edge_reading(circ, edge_type);
+}
+
+/* returns 1 if the window is empty, else 0. If it's empty, tell edge conns to stop reading. */
+int circuit_consider_stop_edge_reading(circuit_t *circ, int edge_type) {
+  connection_t *conn = NULL;
+
+  assert(edge_type == EDGE_EXIT || edge_type == EDGE_AP);
+
+  if(edge_type == EDGE_EXIT && circ->p_receive_circwindow <= 0)
+    conn = circ->n_conn;      
+  else if(edge_type == EDGE_AP && circ->n_receive_circwindow <= 0)
+    conn = circ->p_conn;      
+  else
+    return 0;
+
+  for( ; conn; conn=conn->next_topic)
+    connection_stop_reading(conn);
+
+  return 1;
+}
+
+int circuit_consider_sending_sendme(circuit_t *circ, int edge_type) {
+  cell_t sendme;
+
+  assert(circ);
+
+  sendme.command = CELL_SENDME;
+  sendme.length = CIRCWINDOW_INCREMENT;
+
+  if(edge_type == EDGE_AP) { /* i'm the AP */
+    if(circ->n_receive_circwindow < CIRCWINDOW_START-CIRCWINDOW_INCREMENT) {
+      log(LOG_DEBUG,"circuit_consider_sending_sendme(): Queueing sendme forward.");
+      circ->n_receive_circwindow += CIRCWINDOW_INCREMENT;
+      sendme.aci = circ->n_aci;
+      return connection_write_cell_to_buf(&sendme, circ->n_conn); /* (clobbers sendme) */
+    }
+  } else if(edge_type == EDGE_EXIT) { /* i'm the exit */
+    if(circ->p_receive_circwindow < CIRCWINDOW_START-CIRCWINDOW_INCREMENT) {
+      log(LOG_DEBUG,"circuit_consider_sending_sendme(): Queueing sendme back.");
+      circ->p_receive_circwindow += CIRCWINDOW_INCREMENT;
+      sendme.aci = circ->p_aci;
+      return connection_write_cell_to_buf(&sendme, circ->p_conn); /* (clobbers sendme) */
+    }
+  }
   return 0;
 }
 
 void circuit_close(circuit_t *circ) {
+  connection_t *conn;
+
   circuit_remove(circ);
-  if(circ->n_conn)
+  for(conn=circ->n_conn; conn; conn=conn->next_topic) {
     connection_send_destroy(circ->n_aci, circ->n_conn); 
-  if(circ->p_conn)
+  }
+  for(conn=circ->p_conn; conn; conn=conn->next_topic) {
     connection_send_destroy(circ->p_aci, circ->p_conn); 
+  }
   circuit_free(circ);
 }
 
@@ -352,31 +507,98 @@
    * down the road, maybe we'll consider that eof doesn't mean can't-write
    */
   circuit_t *circ;
+  connection_t *prevconn, *tmpconn;
+  cell_t cell;
+  int edge_type;
+
+  if(!connection_speaks_cells(conn)) {
+    /* it's an edge conn. need to remove it from the linked list of
+     * conn's for this circuit. Send an 'end' data topic.
+     * But don't kill the circuit.
+     */
+
+    circ = circuit_get_by_conn(conn);
+    if(!circ)
+      return;
+
+    memset(&cell, 0, sizeof(cell_t));
+    cell.command = CELL_DATA;
+    cell.length = TOPIC_HEADER_SIZE;
+    *(uint32_t *)cell.payload = conn->topic_id;
+    *cell.payload = TOPIC_COMMAND_END;
+   
+    if(conn == circ->p_conn) {
+      circ->p_conn = conn->next_topic;
+      edge_type = EDGE_AP;
+      goto send_end;
+    }
+    if(conn == circ->n_conn) {
+      circ->n_conn = conn->next_topic;
+      edge_type = EDGE_EXIT;
+      goto send_end;
+    }
+    for(prevconn = circ->p_conn; prevconn->next_topic && prevconn->next_topic != conn; prevconn = prevconn->next_topic) ;
+    if(prevconn->next_topic) {
+      prevconn->next_topic = conn->next_topic;
+      edge_type = EDGE_AP;
+      goto send_end;
+    }
+    for(prevconn = circ->n_conn; prevconn->next_topic && prevconn->next_topic != conn; prevconn = prevconn->next_topic) ;
+    if(prevconn->next_topic) {
+      prevconn->next_topic = conn->next_topic;
+      edge_type = EDGE_EXIT;
+      goto send_end;
+    }
+    log(LOG_ERR,"circuit_about_to_close_connection(): edge conn not in circuit's list?");
+    assert(0); /* should never get here */
+send_end:
+    if(edge_type == EDGE_AP) { /* send to circ->n_conn */
+      log(LOG_INFO,"circuit_about_to_close_connection(): send data end forward (aci %d).",circ->n_aci);
+      cell.aci = circ->n_aci;
+    } else { /* send to circ->p_conn */
+      assert(edge_type == EDGE_EXIT);
+      log(LOG_INFO,"circuit_about_to_close_connection(): send data end backward (aci %d).",circ->p_aci);
+      cell.aci = circ->p_aci;
+    }
+
+    if(circuit_deliver_data_cell_from_edge(&cell, circ, edge_type) < 0) {
+      log(LOG_DEBUG,"circuit_about_to_close_connection(): circuit_deliver_data_cell_from_edge (%d) failed. Closing.", edge_type);
+      circuit_close(circ);
+    }
+    return;
+  }
 
   while((circ = circuit_get_by_conn(conn))) {
     circuit_remove(circ);
     if(circ->n_conn == conn) /* it's closing in front of us */
-      /* circ->p_conn should always be set */
-      assert(circ->p_conn);
-      connection_send_destroy(circ->p_aci, circ->p_conn);
+      for(tmpconn=circ->p_conn; tmpconn; tmpconn=tmpconn->next_topic) {
+        connection_send_destroy(circ->p_aci, tmpconn); 
+      }
     if(circ->p_conn == conn) /* it's closing behind us */
-      if(circ->n_conn)
-        connection_send_destroy(circ->n_aci, circ->n_conn);
+      for(tmpconn=circ->n_conn; tmpconn; tmpconn=tmpconn->next_topic) {
+        connection_send_destroy(circ->n_aci, tmpconn); 
+      }
     circuit_free(circ);
   }  
 }
 
+/* FIXME this now leaves some out */
 void circuit_dump_by_conn(connection_t *conn) {
   circuit_t *circ;
+  connection_t *tmpconn;
 
   for(circ=global_circuitlist;circ;circ = circ->next) {
-    if(circ->p_conn == conn) {
-      printf("Conn %d has App-ward circuit:  aci %d (other side %d), state %d (%s)\n",
-        conn->poll_index, circ->p_aci, circ->n_aci, circ->state, circuit_state_to_string[circ->state]);
+    for(tmpconn=circ->p_conn; tmpconn; tmpconn=tmpconn->next_topic) {
+      if(tmpconn == conn) {
+        printf("Conn %d has App-ward circuit:  aci %d (other side %d), state %d (%s)\n",
+          conn->poll_index, circ->p_aci, circ->n_aci, circ->state, circuit_state_to_string[circ->state]);
+      }
     }
-    if(circ->n_conn == conn) {
-      printf("Conn %d has Exit-ward circuit: aci %d (other side %d), state %d (%s)\n",
-        conn->poll_index, circ->n_aci, circ->p_aci, circ->state, circuit_state_to_string[circ->state]);
+    for(tmpconn=circ->n_conn; tmpconn; tmpconn=tmpconn->next_topic) {
+      if(tmpconn == conn) {
+        printf("Conn %d has Exit-ward circuit: aci %d (other side %d), state %d (%s)\n",
+          conn->poll_index, circ->n_aci, circ->p_aci, circ->state, circuit_state_to_string[circ->state]);
+      }
     }
   }
 }

Index: command.c
===================================================================
RCS file: /home/or/cvsroot/src/or/command.c,v
retrieving revision 1.16
retrieving revision 1.17
diff -u -d -r1.16 -r1.17
--- command.c	23 Dec 2002 06:48:14 -0000	1.16
+++ command.c	26 Jan 2003 09:02:24 -0000	1.17
@@ -38,8 +38,8 @@
 }
 
 void command_process_cell(cell_t *cell, connection_t *conn) {
-  static int num_create=0, num_data=0, num_destroy=0, num_sendme=0, num_connected=0;
-  static int create_time=0, data_time=0, destroy_time=0, sendme_time=0, connected_time=0;
+  static int num_create=0, num_data=0, num_destroy=0, num_sendme=0;
+  static int create_time=0, data_time=0, destroy_time=0, sendme_time=0;
   static long current_second = 0; /* from previous calls to gettimeofday */
   struct timeval now;
 
@@ -55,11 +55,10 @@
     log(LOG_INFO,"Data:      %d (%d ms)", num_data, data_time/1000);
     log(LOG_INFO,"Destroy:   %d (%d ms)", num_destroy, destroy_time/1000);
     log(LOG_INFO,"Sendme:    %d (%d ms)", num_sendme, sendme_time/1000);
-    log(LOG_INFO,"Connected: %d (%d ms)", num_connected, connected_time/1000);
 
     /* zero out stats */
-    num_create = num_data = num_destroy = num_sendme = num_connected = 0;
-    create_time = data_time = destroy_time = sendme_time = connected_time = 0;
+    num_create = num_data = num_destroy = num_sendme = 0;
+    create_time = data_time = destroy_time = sendme_time = 0;
 
     /* remember which second it is, for next time */
     current_second = now.tv_sec; 
@@ -85,10 +84,6 @@
       command_time_process_cell(cell, conn, &num_sendme, &sendme_time,
                                 command_process_sendme_cell);
       break;
-    case CELL_CONNECTED:
-      command_time_process_cell(cell, conn, &num_connected, &connected_time,
-                                command_process_connected_cell);
-      break;
     default:
       log(LOG_DEBUG,"Cell of unknown type (%d) received. Dropping.", cell->command);
       break;
@@ -115,7 +110,7 @@
       circuit_close(circ);
       return;
     }
-    circ->onion = (unsigned char *)malloc(circ->onionlen);
+    circ->onion = malloc(circ->onionlen);
     if(!circ->onion) { 
       log(LOG_DEBUG,"command_process_create_cell(): Out of memory. Closing.");
       circuit_close(circ);
@@ -158,16 +153,6 @@
   return;
 }
 
-#if 0
-  conn->onions_handled_this_second++;
-  log(LOG_DEBUG,"command_process_create_cell(): Processing onion %d for this second.",conn->onions_handled_this_second);
-  if(conn->onions_handled_this_second > options.OnionsPerSecond) {
-    log(LOG_INFO,"command_process_create_cell(): Received too many onions (now %d) this second. Closing.", conn->onions_handled_this_second);
-    circuit_close(circ);
-    return;
-  }
-#endif
-
 void command_process_sendme_cell(cell_t *cell, connection_t *conn) {
   circuit_t *circ;
 
@@ -189,17 +174,16 @@
 
   /* at this point both circ->n_conn and circ->p_conn are guaranteed to be set */
 
-  if(cell->length != RECEIVE_WINDOW_INCREMENT) {
+  if(cell->length != CIRCWINDOW_INCREMENT) {
     log(LOG_WARNING,"command_process_sendme_cell(): non-standard sendme value %d.",cell->length);
   }
-//  assert(cell->length == RECEIVE_WINDOW_INCREMENT);
 
   if(cell->aci == circ->p_aci) { /* it's an outgoing cell */
-    circ->n_receive_window += cell->length;
-    log(LOG_DEBUG,"connection_process_sendme_cell(): n_receive_window for aci %d is %d.",circ->n_aci,circ->n_receive_window);
-    if(circ->n_conn->type == CONN_TYPE_EXIT) {
-      connection_start_reading(circ->n_conn);
-      connection_package_raw_inbuf(circ->n_conn); /* handle whatever might still be on the inbuf */
+    circ->n_receive_circwindow += cell->length;
+    assert(circ->n_receive_circwindow <= CIRCWINDOW_START);
+    log(LOG_DEBUG,"connection_process_sendme_cell(): n_receive_circwindow for aci %d is %d.",circ->n_aci,circ->n_receive_circwindow);
+    if(!circ->n_conn || circ->n_conn->type == CONN_TYPE_EXIT) {
+      circuit_resume_edge_reading(circ, EDGE_EXIT);
     } else {
       cell->aci = circ->n_aci; /* switch it */
       if(connection_write_cell_to_buf(cell, circ->n_conn) < 0) { /* (clobbers cell) */
@@ -208,11 +192,11 @@
       }
     }
   } else { /* it's an ingoing cell */
-    circ->p_receive_window += cell->length;
-    log(LOG_DEBUG,"connection_process_sendme_cell(): p_receive_window for aci %d is %d.",circ->p_aci,circ->p_receive_window);
-    if(circ->p_conn->type == CONN_TYPE_AP) {
-      connection_start_reading(circ->p_conn);
-      connection_package_raw_inbuf(circ->p_conn); /* handle whatever might still be on the inbuf */
+    circ->p_receive_circwindow += cell->length;
+    log(LOG_DEBUG,"connection_process_sendme_cell(): p_receive_circwindow for aci %d is %d.",circ->p_aci,circ->p_receive_circwindow);
+    assert(circ->p_receive_circwindow <= CIRCWINDOW_START);
+    if(!circ->p_conn || circ->p_conn->type == CONN_TYPE_AP) {
+      circuit_resume_edge_reading(circ, EDGE_AP);
     } else {
       cell->aci = circ->p_aci; /* switch it */
       if(connection_write_cell_to_buf(cell, circ->p_conn) < 0) { /* (clobbers cell) */
@@ -246,47 +230,41 @@
     onion_pending_data_add(circ, cell);
     return;
   }
-  /* at this point both circ->n_conn and circ->p_conn are guaranteed to be set */
+
+  /* circ->p_conn and n_conn are only null if we're at an edge point with no connections yet */
 
   if(cell->aci == circ->p_aci) { /* it's an outgoing cell */
     cell->aci = circ->n_aci; /* switch it */
-    if(--circ->p_receive_window < 0) { /* is it less than 0 after decrement? */
-      log(LOG_DEBUG,"connection_process_data_cell(): Too many data cells on aci %d. Closing.", circ->p_aci);
+    if(--circ->p_receive_circwindow < 0) { /* is it less than 0 after decrement? */
+      log(LOG_INFO,"connection_process_data_cell(): Too many data cells for circuit (aci %d). Closing.", circ->p_aci);
       circuit_close(circ);
       return;
     }
-    log(LOG_DEBUG,"connection_process_data_cell(): p_receive_window for aci %d is %d.",circ->p_aci,circ->p_receive_window);
-    if(circuit_deliver_data_cell(cell, circ, circ->n_conn, 'd') < 0) {
-      log(LOG_DEBUG,"command_process_data_cell(): circuit_deliver_data_cell (forward) failed. Closing.");
+    log(LOG_DEBUG,"connection_process_data_cell(): p_receive_circwindow for aci %d is %d.",circ->p_aci,circ->p_receive_circwindow);
+    if(circuit_deliver_data_cell(cell, circ, CELL_DIRECTION_OUT) < 0) {
+      log(LOG_INFO,"command_process_data_cell(): circuit_deliver_data_cell (forward) failed. Closing.");
       circuit_close(circ);
       return;
     }
   } else { /* it's an ingoing cell */
     cell->aci = circ->p_aci; /* switch it */
-    if(--circ->n_receive_window < 0) { /* is it less than 0 after decrement? */
-      log(LOG_DEBUG,"connection_process_data_cell(): Too many data cells on aci %d. Closing.", circ->n_aci);
+    if(--circ->n_receive_circwindow < 0) { /* is it less than 0 after decrement? */
+      log(LOG_DEBUG,"connection_process_data_cell(): Too many data cells for circuit (aci %d). Closing.", circ->n_aci);
       circuit_close(circ);
       return;
     }
-    log(LOG_DEBUG,"connection_process_data_cell(): n_receive_window for aci %d is %d.",circ->n_aci,circ->n_receive_window);
-    if(circ->p_conn->type == CONN_TYPE_AP) { /* we want to decrypt, not encrypt */
-      if(circuit_deliver_data_cell(cell, circ, circ->p_conn, 'd') < 0) {
-        log(LOG_DEBUG,"command_process_data_cell(): circuit_deliver_data_cell (backward to AP) failed. Closing.");
-        circuit_close(circ);
-        return;
-      }
-    } else {
-      if(circuit_deliver_data_cell(cell, circ, circ->p_conn, 'e') < 0) {
-        log(LOG_DEBUG,"command_process_data_cell(): circuit_deliver_data_cell (backward) failed. Closing.");
-        circuit_close(circ);
-        return;
-      }
+    log(LOG_DEBUG,"connection_process_data_cell(): n_receive_circwindow for aci %d is %d.",circ->n_aci,circ->n_receive_circwindow);
+    if(circuit_deliver_data_cell(cell, circ, CELL_DIRECTION_IN) < 0) {
+      log(LOG_DEBUG,"command_process_data_cell(): circuit_deliver_data_cell (backward to AP) failed. Closing.");
+      circuit_close(circ);
+      return;
     }
   }
 }
 
 void command_process_destroy_cell(cell_t *cell, connection_t *conn) {
   circuit_t *circ;
+  connection_t *tmpconn;
 
   circ = circuit_get_by_aci_conn(cell->aci, conn);
 
@@ -299,34 +277,19 @@
   if(circ->state == CIRCUIT_STATE_ONION_PENDING) {
     onion_pending_remove(circ);
   }
+
   circuit_remove(circ);
+
   if(cell->aci == circ->p_aci) { /* the destroy came from behind */
-    if(circ->n_conn) /* might not be defined, eg if it was just on the pending queue */
-      connection_send_destroy(circ->n_aci, circ->n_conn);
+    for(tmpconn = circ->n_conn; tmpconn; tmpconn=tmpconn->next_topic) {
+      connection_send_destroy(circ->n_aci, tmpconn);
+    }
   }
   if(cell->aci == circ->n_aci) { /* the destroy came from ahead */
-    assert(circ->p_conn);
-    connection_send_destroy(circ->p_aci, circ->p_conn);
+    for(tmpconn = circ->p_conn; tmpconn; tmpconn=tmpconn->next_topic) {
+      connection_send_destroy(circ->p_aci, tmpconn);
+    }
   }
   circuit_free(circ);
-}
-
-void command_process_connected_cell(cell_t *cell, connection_t *conn) {
-  circuit_t *circ;
-
-  circ = circuit_get_by_aci_conn(cell->aci, conn);
-
-  if(!circ) {
-    log(LOG_DEBUG,"command_process_connected_cell(): unknown circuit %d. Dropping.", cell->aci);
-    return;
-  }
-
-  if(circ->n_conn != conn) {
-    log(LOG_WARNING,"command_process_connected_cell(): cell didn't come from n_conn! (aci %d)",cell->aci);
-    return;
-  }
-
-  log(LOG_DEBUG,"command_process_connected_cell(): Received for aci %d.",cell->aci);
-  connection_send_connected(circ->p_aci, circ->p_conn);
 }
 

Index: config.c
===================================================================
RCS file: /home/or/cvsroot/src/or/config.c,v
retrieving revision 1.24
retrieving revision 1.25
diff -u -d -r1.24 -r1.25
--- config.c	3 Dec 2002 22:18:23 -0000	1.24
+++ config.c	26 Jan 2003 09:02:24 -0000	1.25
@@ -211,8 +211,8 @@
   options->loglevel = LOG_DEBUG;
   options->CoinWeight = 0.8;
   options->LinkPadding = 0;
-  options->DirRebuildPeriod = 600;
-  options->DirFetchPeriod = 6000;
+  options->DirRebuildPeriod = 300;
+  options->DirFetchPeriod = 600;
   options->KeepalivePeriod = 300;
   options->MaxOnionsPending = 10;
 //  options->ReconnectPeriod = 6001;

Index: connection.c
===================================================================
RCS file: /home/or/cvsroot/src/or/connection.c,v
retrieving revision 1.32
retrieving revision 1.33
diff -u -d -r1.32 -r1.33
--- connection.c	13 Oct 2002 13:17:27 -0000	1.32
+++ connection.c	26 Jan 2003 09:02:24 -0000	1.33
@@ -19,6 +19,7 @@
   "App",         /* 7 */
   "Dir listener",/* 8 */
   "Dir",         /* 9 */
+  "DNS master",  /* 10 */
 };
 
 char *conn_state_to_string[][15] = {
@@ -52,6 +53,7 @@
     "reading",                         /* 2 */
     "awaiting command",                /* 3 */
     "writing" },                       /* 4 */
+  { "open" }, /* dns master, 0 */
 };
 
 /********* END VARIABLES ************/
@@ -354,6 +356,9 @@
   if(!len)
     return 0;
 
+  if(conn->marked_for_close)
+    return 0;
+
   conn->timestamp_lastwritten = now.tv_sec;
 
   if( (!connection_speaks_cells(conn)) ||
@@ -507,23 +512,6 @@
   return connection_write_cell_to_buf(&cell, conn);
 }
 
-int connection_send_connected(aci_t aci, connection_t *conn) {
-  cell_t cell;
-
-  assert(conn);
-
-  if(!connection_speaks_cells(conn)) {
-    log(LOG_INFO,"connection_send_connected(): Aci %d: At entry point. Notifying proxy.", aci);
-    connection_ap_send_connected(conn);
-    return 0;
-  }
-
-  cell.aci = aci;
-  cell.command = CELL_CONNECTED;
-  log(LOG_INFO,"connection_send_connected(): passing back cell (aci %d).",aci);
-  return connection_write_cell_to_buf(&cell, conn);
-}
-
 int connection_write_cell_to_buf(cell_t *cellp, connection_t *conn) {
   char networkcell[CELL_NETWORK_SIZE];
   char *n = networkcell;
@@ -588,6 +576,8 @@
       return connection_ap_process_inbuf(conn);
     case CONN_TYPE_DIR:
       return connection_dir_process_inbuf(conn);
+    case CONN_TYPE_DNSMASTER:
+      return connection_dns_process_inbuf(conn); 
     default:
       log(LOG_DEBUG,"connection_process_inbuf() got unexpected conn->type.");
       return -1;
@@ -601,70 +591,76 @@
 
   assert(conn);
   assert(!connection_speaks_cells(conn));
-  /* this function should never get called if the receiver_window is 0 */
+  /* this function should never get called if the receive_topicwindow is 0 */
+ 
+repeat_connection_package_raw_inbuf:
 
   amount_to_process = conn->inbuf_datalen;
 
   if(!amount_to_process)
     return 0;
 
-  if(amount_to_process > CELL_PAYLOAD_SIZE) {
-    cell.length = CELL_PAYLOAD_SIZE;
+  if(amount_to_process > CELL_PAYLOAD_SIZE - TOPIC_HEADER_SIZE) {
+    cell.length = CELL_PAYLOAD_SIZE - TOPIC_HEADER_SIZE;
   } else {
     cell.length = amount_to_process;
   }
 
-  if(connection_fetch_from_buf(cell.payload, cell.length, conn) < 0)
+  if(connection_fetch_from_buf(cell.payload+TOPIC_HEADER_SIZE, cell.length, conn) < 0)
     return -1;
 
   circ = circuit_get_by_conn(conn);
   if(!circ) {
-    log(LOG_DEBUG,"connection_raw_package_inbuf(): conn has no circuits!");
+    log(LOG_DEBUG,"connection_package_raw_inbuf(): conn has no circuits!");
     return -1;
   }
 
-  log(LOG_DEBUG,"connection_raw_package_inbuf(): Packaging %d bytes.",cell.length);
+  log(LOG_DEBUG,"connection_package_raw_inbuf(): Packaging %d bytes (%d waiting).",cell.length, amount_to_process);
+
+  *(uint32_t *)cell.payload = conn->topic_id;
+  *cell.payload = TOPIC_COMMAND_DATA;
+  cell.length += TOPIC_HEADER_SIZE;
+  cell.command = CELL_DATA;
+
   if(circ->n_conn == conn) { /* send it backward. we're an exit. */
     cell.aci = circ->p_aci;
-    cell.command = CELL_DATA;
-    if(circuit_deliver_data_cell(&cell, circ, circ->p_conn, 'e') < 0) {
-      log(LOG_DEBUG,"connection_raw_package_inbuf(): circuit_deliver_data_cell (backward) failed. Closing.");
+    if(circuit_deliver_data_cell_from_edge(&cell, circ, EDGE_EXIT) < 0) {
+      log(LOG_DEBUG,"connection_package_raw_inbuf(): circuit_deliver_data_cell_from_edge (backward) failed. Closing.");
       circuit_close(circ);
       return 0;
     }
-    assert(circ->n_receive_window > 0);
-    if(--circ->n_receive_window <= 0) { /* is it 0 after decrement? */
+    assert(conn->n_receive_topicwindow > 0);
+    if(--conn->n_receive_topicwindow <= 0) { /* is it 0 after decrement? */
       connection_stop_reading(circ->n_conn);
-      log(LOG_DEBUG,"connection_raw_package_inbuf(): receive_window at exit reached 0.");
+      log(LOG_DEBUG,"connection_package_raw_inbuf(): receive_topicwindow at exit reached 0.");
       return 0; /* don't process the inbuf any more */
     }
-    log(LOG_DEBUG,"connection_raw_package_inbuf(): receive_window at exit is %d",circ->n_receive_window);
+    log(LOG_DEBUG,"connection_package_raw_inbuf(): receive_topicwindow at exit is %d",conn->n_receive_topicwindow);
   } else { /* send it forward. we're an AP */
     cell.aci = circ->n_aci;
-    cell.command = CELL_DATA;
-    if(circuit_deliver_data_cell(&cell, circ, circ->n_conn, 'e') < 0) {
-      /* yes, we use 'e' here, because the AP connection must *encrypt* its input. */
-      log(LOG_DEBUG,"connection_raw_package_inbuf(): circuit_deliver_data_cell (forward) failed. Closing.");
+    if(circuit_deliver_data_cell_from_edge(&cell, circ, EDGE_AP) < 0) {
+      log(LOG_DEBUG,"connection_package_raw_inbuf(): circuit_deliver_data_cell_from_edge (forward) failed. Closing.");
       circuit_close(circ);
       return 0;
     }
-    assert(circ->p_receive_window > 0);
-    if(--circ->p_receive_window <= 0) { /* is it 0 after decrement? */
+    assert(conn->p_receive_topicwindow > 0);
+    if(--conn->p_receive_topicwindow <= 0) { /* is it 0 after decrement? */
       connection_stop_reading(circ->p_conn);
-      log(LOG_DEBUG,"connection_raw_package_inbuf(): receive_window at AP reached 0.");
+      log(LOG_DEBUG,"connection_package_raw_inbuf(): receive_topicwindow at AP reached 0.");
       return 0; /* don't process the inbuf any more */
     }
-    log(LOG_DEBUG,"connection_raw_package_inbuf(): receive_window at AP is %d",circ->p_receive_window);
+    log(LOG_DEBUG,"connection_package_raw_inbuf(): receive_topicwindow at AP is %d",conn->p_receive_topicwindow);
+  }
+  if(amount_to_process > CELL_PAYLOAD_SIZE - TOPIC_HEADER_SIZE) {
+    log(LOG_DEBUG,"connection_package_raw_inbuf(): recursing.");
+    goto repeat_connection_package_raw_inbuf;
   }
-  if(amount_to_process > CELL_PAYLOAD_SIZE)
-    log(LOG_DEBUG,"connection_raw_package_inbuf(): recursing.");
-    return connection_package_raw_inbuf(conn);
   return 0;
 }
 
-int connection_consider_sending_sendme(connection_t *conn) {
+int connection_consider_sending_sendme(connection_t *conn, int edge_type) {
   circuit_t *circ;
-  cell_t sendme;
+  cell_t cell;
 
   if(connection_outbuf_too_full(conn))
     return 0;
@@ -675,22 +671,34 @@
     log(LOG_DEBUG,"connection_consider_sending_sendme(): No circuit associated with conn. Skipping.");
     return 0;
   }
-  sendme.command = CELL_SENDME;
-  sendme.length = RECEIVE_WINDOW_INCREMENT;
 
-  if(circ->n_conn == conn) { /* we're at an exit */
-    if(circ->p_receive_window < RECEIVE_WINDOW_START-RECEIVE_WINDOW_INCREMENT) {
-      log(LOG_DEBUG,"connection_consider_sending_sendme(): Outbuf %d, Queueing sendme back.", conn->outbuf_flushlen);
-      circ->p_receive_window += RECEIVE_WINDOW_INCREMENT;
-      sendme.aci = circ->p_aci;
-      return connection_write_cell_to_buf(&sendme, circ->p_conn); /* (clobbers sendme) */
+  *(uint32_t *)cell.payload = conn->topic_id;
+  *cell.payload = TOPIC_COMMAND_SENDME;
+  cell.length += TOPIC_HEADER_SIZE;
+  cell.command = CELL_DATA;
+
+  if(edge_type == EDGE_EXIT) { /* we're at an exit */
+    if(conn->p_receive_topicwindow < TOPICWINDOW_START - TOPICWINDOW_INCREMENT) {
+      log(LOG_DEBUG,"connection_consider_sending_sendme(): Outbuf %d, Queueing topic sendme back.", conn->outbuf_flushlen);
+      conn->p_receive_topicwindow += TOPICWINDOW_INCREMENT;
+      cell.aci = circ->p_aci;
+      if(circuit_deliver_data_cell_from_edge(&cell, circ, edge_type) < 0) {
+        log(LOG_DEBUG,"connection_consider_sending_sendme(): circuit_deliver_data_cell_from_edge (backward) failed. Closing.");
+        circuit_close(circ);
+        return 0;
+      }
     }
   } else { /* we're at an AP */
-    if(circ->n_receive_window < RECEIVE_WINDOW_START-RECEIVE_WINDOW_INCREMENT) {
-      log(LOG_DEBUG,"connection_consider_sending_sendme(): Outbuf %d, Queueing sendme forward.", conn->outbuf_flushlen);
-      circ->n_receive_window += RECEIVE_WINDOW_INCREMENT;
-      sendme.aci = circ->n_aci;
-      return connection_write_cell_to_buf(&sendme, circ->n_conn); /* (clobbers sendme) */
+    assert(edge_type == EDGE_AP);
+    if(conn->n_receive_topicwindow < TOPICWINDOW_START-TOPICWINDOW_INCREMENT) {
+      log(LOG_DEBUG,"connection_consider_sending_sendme(): Outbuf %d, Queueing topic sendme forward.", conn->outbuf_flushlen);
+      conn->n_receive_topicwindow += TOPICWINDOW_INCREMENT;
+      cell.aci = circ->n_aci;
+      if(circuit_deliver_data_cell_from_edge(&cell, circ, edge_type) < 0) {
+        log(LOG_DEBUG,"connection_consider_sending_sendme(): circuit_deliver_data_cell_from_edge (forward) failed. Closing.");
+        circuit_close(circ);
+        return 0;
+      }
     }
   }
   return 0;
@@ -713,6 +721,8 @@
       return connection_exit_finished_flushing(conn);
     case CONN_TYPE_DIR:
       return connection_dir_finished_flushing(conn);
+    case CONN_TYPE_DNSMASTER:
+      return connection_dns_finished_flushing(conn);
     default:
       log(LOG_DEBUG,"connection_finished_flushing() got unexpected conn->type.");
       return -1;

Index: connection_ap.c
===================================================================
RCS file: /home/or/cvsroot/src/or/connection_ap.c,v
retrieving revision 1.21
retrieving revision 1.22
diff -u -d -r1.21 -r1.22
--- connection_ap.c	13 Oct 2002 13:17:27 -0000	1.21
+++ connection_ap.c	26 Jan 2003 09:02:24 -0000	1.22
@@ -22,7 +22,10 @@
     case AP_CONN_STATE_SOCKS_WAIT:
       return ap_handshake_process_socks(conn);
     case AP_CONN_STATE_OPEN:
-      return connection_package_raw_inbuf(conn);
+      if(connection_package_raw_inbuf(conn) < 0)
+        return -1;
+      circuit_consider_stop_edge_reading(circuit_get_by_conn(conn), EDGE_AP);
+      return 0;
     default:
       log(LOG_DEBUG,"connection_ap_process_inbuf() called in state where I'm waiting. Ignoring buf for now.");
   }
@@ -33,6 +36,7 @@
 int ap_handshake_process_socks(connection_t *conn) {
   char c;
   socks4_t socks4_info; 
+  circuit_t *circ;
 
   assert(conn);
 
@@ -118,8 +122,25 @@
     }
   }
 
-  /* now we're all ready to make an onion, etc */
-  return ap_handshake_create_onion(conn);
+  /* find the circuit that we should use, if there is one. */
+  circ = NULL; /* FIXME don't reuse circs, at least for me. */
+
+  /* now we're all ready to make an onion or send a begin */
+
+  if(circ) {
+    if(circ->state == CIRCUIT_STATE_OPEN) {
+      if(ap_handshake_send_begin(conn, circ) < 0) {
+        circuit_close(circ);
+        return -1;
+      }
+    }
+  } else {
+    if(ap_handshake_create_onion(conn) < 0) {
+      circuit_close(circ);
+      return -1;
+    }
+  }
+  return 0;
 }
 
 int ap_handshake_create_onion(connection_t *conn) {
@@ -240,13 +261,18 @@
     }
     connection_start_reading(p_conn); /* resume listening for reads */
     log(LOG_DEBUG,"ap_handshake_n_conn_open(): Found circ, sending onion.");
-    if(ap_handshake_send_onion(p_conn, or_conn, circ)<0) {
+    if(ap_handshake_send_onion(p_conn, or_conn, circ) < 0) {
       log(LOG_DEBUG,"ap_handshake_n_conn_open(): circuit marked for closing.");
-      p_conn->marked_for_close = 1;
-      return; /* XXX will want to try the rest too */
-    } else {
-      circ = circuit_enumerate_by_naddr_nport(circ, or_conn->addr, or_conn->port);
+      circuit_close(circ);
+      return; /* FIXME will want to try the other circuits too? */
     }
+    for(p_conn = p_conn->next_topic; p_conn; p_conn = p_conn->next_topic) { /* start up any other pending topics */
+      if(ap_handshake_send_begin(p_conn, circ) < 0) {
+        circuit_close(circ);
+        return;
+      }
+    }
+    circ = circuit_enumerate_by_naddr_nport(circ, or_conn->addr, or_conn->port);
   }
 }
 
@@ -289,50 +315,39 @@
   }
   free(tmpbuf);
 
-#if 0
-  /* deliver the ss in a data cell */
-  cell.command = CELL_DATA;
-  cell.aci = circ->n_aci;
-  cell.length = sizeof(ss_t);
-  memcpy(cell.payload, &ap_conn->ss, sizeof(ss_t));
-  log(LOG_DEBUG,"ap_handshake_send_onion(): Sending a data cell for ss...");
-  if(circuit_deliver_data_cell(&cell, circ, circ->n_conn, 'e') < 0) {
-    log(LOG_DEBUG,"ap_handshake_send_onion(): failed to deliver ss cell. Closing.");
-    circuit_close(circ);
+  if(ap_handshake_send_begin(ap_conn, circ) < 0) {
     return -1;
   }
-#endif
 
-  /* deliver the dest_addr in a data cell */
-  cell.command = CELL_DATA;
-  cell.aci = circ->n_aci;
-  strncpy(cell.payload, ap_conn->dest_addr, CELL_PAYLOAD_SIZE);
-  cell.length = strlen(cell.payload)+1;
-  log(LOG_DEBUG,"ap_handshake_send_onion(): Sending a data cell for addr...");
-  if(circuit_deliver_data_cell(&cell, circ, circ->n_conn, 'e') < 0) {
-    log(LOG_DEBUG,"ap_handshake_send_onion(): failed to deliver addr cell. Closing.");
-    circuit_close(circ);
-    return -1;
-  }
+  circ->state = CIRCUIT_STATE_OPEN;
+  /* FIXME should set circ->expire to something here */
 
-  /* deliver the dest_port in a data cell */
+  return 0;
+}
+
+int ap_handshake_send_begin(connection_t *ap_conn, circuit_t *circ) {
+  cell_t cell;
+
+  memset(&cell, 0, sizeof(cell_t));
+  /* deliver the dest_addr in a data cell */
   cell.command = CELL_DATA;
   cell.aci = circ->n_aci;
-  snprintf(cell.payload, CELL_PAYLOAD_SIZE, "%d", ap_conn->dest_port);
-  cell.length = strlen(cell.payload)+1;
-  log(LOG_DEBUG,"ap_handshake_send_onion(): Sending a data cell for port...");
-  if(circuit_deliver_data_cell(&cell, circ, circ->n_conn, 'e') < 0) {
-    log(LOG_DEBUG,"ap_handshake_send_onion(): failed to deliver port cell. Closing.");
-    circuit_close(circ);
+  crypto_pseudo_rand(3, cell.payload+1); /* byte 0 is blank, bytes 1-3 are random */
+  /* FIXME check for collisions */
+  cell.payload[0] = 0;
+  ap_conn->topic_id = *(uint32_t *)cell.payload;
+  cell.payload[0] = TOPIC_COMMAND_BEGIN;
+  snprintf(cell.payload+4, CELL_PAYLOAD_SIZE-4, "%s,%d", ap_conn->dest_addr, ap_conn->dest_port);
+  cell.length = strlen(cell.payload+TOPIC_HEADER_SIZE)+1+TOPIC_HEADER_SIZE;
+  log(LOG_DEBUG,"ap_handshake_send_begin(): Sending data cell to begin topic %d.", ap_conn->topic_id);
+  if(circuit_deliver_data_cell_from_edge(&cell, circ, EDGE_AP) < 0) {
+    log(LOG_DEBUG,"ap_handshake_send_begin(): failed to deliver begin cell. Closing.");
     return -1;
   }
-
-  circ->state = CIRCUIT_STATE_OPEN;
+  ap_conn->n_receive_topicwindow = TOPICWINDOW_START;
+  ap_conn->p_receive_topicwindow = TOPICWINDOW_START;
   ap_conn->state = AP_CONN_STATE_OPEN;
-  log(LOG_INFO,"ap_handshake_send_onion(): Address/port sent, ap socket %d, n_aci %d",ap_conn->s,circ->n_aci);
-
-  /* FIXME should set circ->expire to something here */
-
+  log(LOG_INFO,"ap_handshake_send_begin(): Address/port sent, ap socket %d, n_aci %d",ap_conn->s,circ->n_aci);
   return 0;
 }
 
@@ -351,30 +366,99 @@
   return connection_flush_buf(conn); /* try to flush it, in case we're about to close the conn */
 }
 
-int connection_ap_send_connected(connection_t *conn) {
-  assert(conn);
+int connection_ap_process_data_cell(cell_t *cell, circuit_t *circ) {
+  connection_t *conn;
+  int topic_command;
+  int topic_id;
 
-  return ap_handshake_socks_reply(conn, SOCKS4_REQUEST_GRANTED);
-}
+  /* an incoming data cell has arrived */
 
-int connection_ap_process_data_cell(cell_t *cell, connection_t *conn) {
+  assert(cell && circ);
 
-  /* an incoming data cell has arrived */
+  topic_command = *cell->payload;
+  *cell->payload = 0;
+  topic_id = *(uint32_t *)cell->payload;
+  log(LOG_DEBUG,"connection_ap_process_data_cell(): command %d topic %d", topic_command, topic_id);
 
-  assert(conn && conn->type == CONN_TYPE_AP);
+  circuit_consider_sending_sendme(circ, EDGE_AP);
 
-  if(conn->state != AP_CONN_STATE_OPEN) {
+  for(conn = circ->p_conn; conn && conn->topic_id != topic_id; conn = conn->next_topic) ;
+
+  /* now conn is either NULL, in which case we don't recognize the topic_id, or
+   * it is set, in which case cell is talking about this conn.
+   */
+
+  if(conn && conn->state != AP_CONN_STATE_OPEN) {
     /* we should not have gotten this cell */
-    log(LOG_DEBUG,"connection_ap_process_data_cell(): Got a data cell when not in 'open' state. Closing.");
-    return -1;
+    log(LOG_DEBUG,"connection_ap_process_data_cell(): Got a data cell when not in 'open' state. Dropping.");
+    return 0;
   }
 
-//  log(LOG_DEBUG,"connection_ap_process_data_cell(): In state 'open', writing to buf.");
+  switch(topic_command) {
+    case TOPIC_COMMAND_BEGIN:
+      log(LOG_INFO,"connection_ap_process_data_cell(): topic begin request unsupported. Dropping.");
+      break;
+    case TOPIC_COMMAND_DATA:
+      if(!conn) {
+        log(LOG_DEBUG,"connection_ap_process_data_cell(): data cell dropped, unknown topic %d.",topic_id);
+        return 0;
+      }
+      if(--conn->n_receive_topicwindow < 0) { /* is it below 0 after decrement? */
+        log(LOG_DEBUG,"connection_ap_process_data_cell(): receive_topicwindow at exit below 0. Killing.");
+        return -1; /* exit node breaking protocol. kill the whole circuit. */
+      }
+      log(LOG_DEBUG,"connection_ap_process_data_cell(): willing to receive %d more cells from circ",conn->n_receive_topicwindow);
 
-  if(connection_write_to_buf(cell->payload, cell->length, conn) < 0)
-    return -1;
-  return connection_consider_sending_sendme(conn);
-}     
+      if(connection_write_to_buf(cell->payload + TOPIC_HEADER_SIZE,
+                                 cell->length - TOPIC_HEADER_SIZE, conn) < 0) {
+        conn->marked_for_close = 1;
+        return 0;
+      }
+      if(connection_consider_sending_sendme(conn, EDGE_AP) < 0)
+        conn->marked_for_close = 1;
+      return 0;
+    case TOPIC_COMMAND_END:
+      if(!conn) {
+        log(LOG_DEBUG,"connection_ap_process_data_cell(): end cell dropped, unknown topic %d.",topic_id);
+        return 0;
+      }
+      log(LOG_DEBUG,"connection_ap_process_data_cell(): end cell for topic %d. Removing topic.",topic_id);
+
+      /* go through and identify who points to conn. remove conn from the list. */
+#if 0
+      if(conn == circ->p_conn) {
+        circ->p_conn = conn->next_topic;
+      }
+      for(prevconn = circ->p_conn; prevconn->next_topic != conn; prevconn = prevconn->next_topic) ;
+      prevconn->next_topic = conn->next_topic;
+#endif
+      conn->marked_for_close = 1;
+      break;
+    case TOPIC_COMMAND_CONNECTED:
+      if(!conn) {
+        log(LOG_DEBUG,"connection_ap_process_data_cell(): connected cell dropped, unknown topic %d.",topic_id);
+        break;
+      }
+      log(LOG_DEBUG,"connection_ap_process_data_cell(): Connected! Notifying application.");
+      if(ap_handshake_socks_reply(conn, SOCKS4_REQUEST_GRANTED) < 0) {
+        conn->marked_for_close = 1;
+      }
+      break;
+    case TOPIC_COMMAND_SENDME:
+      if(!conn) {
+        log(LOG_DEBUG,"connection_exit_process_data_cell(): sendme cell dropped, unknown topic %d.",topic_id);
+        return 0;
+      }
+      conn->p_receive_topicwindow += TOPICWINDOW_INCREMENT;
+      connection_start_reading(conn);
+      connection_package_raw_inbuf(conn); /* handle whatever might still be on the inbuf */
+      circuit_consider_stop_edge_reading(circ, EDGE_AP);
+      break;
+    default:
+      log(LOG_DEBUG,"connection_ap_process_data_cell(): unknown topic command %d.",topic_command);
+  }
+  return 0;
+}
 
 int connection_ap_finished_flushing(connection_t *conn) {
 
@@ -384,14 +468,13 @@
     case AP_CONN_STATE_OPEN:
       /* FIXME down the road, we'll clear out circuits that are pending to close */
       connection_stop_writing(conn);
-      return connection_consider_sending_sendme(conn);
+      return connection_consider_sending_sendme(conn, EDGE_AP);
     default:
       log(LOG_DEBUG,"Bug: connection_ap_finished_flushing() called in unexpected state.");
       return 0;
   }
 
   return 0;
-
 }
 
 int connection_ap_create_listener(struct sockaddr_in *bindaddr) {

Index: connection_exit.c
===================================================================
RCS file: /home/or/cvsroot/src/or/connection_exit.c,v
retrieving revision 1.15
retrieving revision 1.16
diff -u -d -r1.15 -r1.16
--- connection_exit.c	24 Nov 2002 08:33:15 -0000	1.15
+++ connection_exit.c	26 Jan 2003 09:02:24 -0000	1.16
@@ -21,10 +21,12 @@
       log(LOG_DEBUG,"connection_exit_process_inbuf(): text from server while in 'connecting' state. Leaving it on buffer.");
       return 0;
     case EXIT_CONN_STATE_OPEN:
-      return connection_package_raw_inbuf(conn);
+      if(connection_package_raw_inbuf(conn) < 0)
+        return -1;
+      circuit_consider_stop_edge_reading(circuit_get_by_conn(conn), EDGE_EXIT);
+      return 0;
   }
   return 0;
-
 }
 
 int connection_exit_finished_flushing(connection_t *conn) {
@@ -46,20 +48,21 @@
       }
       /* the connect has finished. */
 
-      log(LOG_DEBUG,"connection_exit_finished_flushing() : Connection to %s:%u established.",
+      log(LOG_DEBUG,"connection_exit_finished_flushing(): Connection to %s:%u established.",
           conn->address,conn->port);
       
       conn->state = EXIT_CONN_STATE_OPEN;
+      connection_watch_events(conn, POLLIN); /* stop writing, continue reading */
       if(connection_wants_to_flush(conn)) /* in case there are any queued data cells */
         connection_start_writing(conn);
-      connection_start_reading(conn);
       return
-        connection_exit_send_connected(conn) || /* deliver a 'connected' cell back through the circuit. */
+        connection_exit_send_connected(conn) || /* deliver a 'connected' data cell back through the circuit. */
         connection_process_inbuf(conn); /* in case the server has written anything */
     case EXIT_CONN_STATE_OPEN:
       /* FIXME down the road, we'll clear out circuits that are pending to close */
+      log(LOG_DEBUG,"connection_exit_finished_flushing(): finished flushing.");
       connection_stop_writing(conn);
-      connection_consider_sending_sendme(conn);
+      connection_consider_sending_sendme(conn, EDGE_EXIT);
       return 0;
     default:
       log(LOG_DEBUG,"Bug: connection_exit_finished_flushing() called in unexpected state.");
@@ -71,6 +74,7 @@
 
 int connection_exit_send_connected(connection_t *conn) {
   circuit_t *circ;
+  cell_t cell;
 
   assert(conn);
 
@@ -81,123 +85,252 @@
     return -1;
   }
 
-  return connection_send_connected(circ->p_aci, circ->p_conn);
+  memset(&cell, 0, sizeof(cell_t));
+  cell.aci = circ->p_aci;
+  cell.command = CELL_DATA;
+  *(uint32_t *)cell.payload = conn->topic_id;
+  *cell.payload = TOPIC_COMMAND_CONNECTED;
+  cell.length = TOPIC_HEADER_SIZE;
+  log(LOG_INFO,"connection_exit_send_connected(): passing back cell (aci %d).",circ->p_aci);
+
+  if(circuit_deliver_data_cell_from_edge(&cell, circ, EDGE_EXIT) < 0) {
+    log(LOG_DEBUG,"connection_exit_send_connected(): circuit_deliver_data_cell (backward) failed. Closing.");
+    circuit_close(circ);
+    return 0;
+  }
+  return 0;
 }
 
-int connection_exit_process_data_cell(cell_t *cell, connection_t *conn) {
-  struct hostent *rent;
-  struct sockaddr_in dest_addr;
-  int s; /* for the new socket, if we're on connecting_wait */
+int connection_exit_begin_conn(cell_t *cell, circuit_t *circ) {
+  connection_t *n_conn;
+  char *comma;
+
+  if(!memchr(cell->payload + TOPIC_HEADER_SIZE,0,cell->length - TOPIC_HEADER_SIZE)) {
+    log(LOG_WARNING,"connection_exit_begin_conn(): topic begin cell has no \\0. Dropping.");
+    return 0;
+  }
+  comma = strchr(cell->payload + TOPIC_HEADER_SIZE, ',');
+  if(!comma) {
+    log(LOG_WARNING,"connection_exit_begin_conn(): topic begin cell has no comma. Dropping.");
+    return 0;
+  }
+  *comma = 0;
+
+  if(!atoi(comma+1)) { /* bad port */
+    log(LOG_DEBUG,"connection_exit_begin_conn(): topic begin cell has invalid port. Dropping.");
+    return 0;
+  }
+
+  log(LOG_DEBUG,"connection_exit_begin_conn(): Creating new exit connection.");
+  n_conn = connection_new(CONN_TYPE_EXIT);
+  if(!n_conn) {
+    log(LOG_DEBUG,"connection_exit_begin_conn(): connection_new failed. Closing.");
+    return -1;
+  }
+
+  cell->payload[0] = 0;
+  n_conn->topic_id = *(uint32_t *)(cell->payload);
+
+  n_conn->address = strdup(cell->payload + TOPIC_HEADER_SIZE);
+  n_conn->port = atoi(comma+1);
+  n_conn->state = EXIT_CONN_STATE_RESOLVING;
+  n_conn->receiver_bucket = -1; /* edge connections don't do receiver buckets */
+  n_conn->bandwidth = -1;
+  n_conn->s = -1; /* not yet valid */
+  n_conn->n_receive_topicwindow = TOPICWINDOW_START;
+  n_conn->p_receive_topicwindow = TOPICWINDOW_START;
+  if(connection_add(n_conn) < 0) { /* no space, forget it */
+    log(LOG_DEBUG,"connection_exit_begin_conn(): connection_add failed. Closing.");
+    connection_free(n_conn);
+    return -1;
+  }
+
+  /* add it into the linked list of topics on this circuit */
+  n_conn->next_topic = circ->n_conn;
+  circ->n_conn = n_conn;
+
+  /* send it off to the gethostbyname farm */
+  if(dns_tor_to_master(n_conn->address) < 0) {
+    log(LOG_DEBUG,"connection_exit_begin_conn(): Couldn't queue resolve request.");
+    connection_remove(n_conn);
+    connection_free(n_conn);
+    return 0;
+  }
+
+  return 0;
+}
+
+int connection_exit_process_data_cell(cell_t *cell, circuit_t *circ) {
+  connection_t *conn;
+  int topic_command;
+  int topic_id;
 
   /* an outgoing data cell has arrived */
 
-  assert(conn && conn->type == CONN_TYPE_EXIT);
+  assert(cell && circ);
+
+  topic_command = *cell->payload;
+  *cell->payload = 0;
+  topic_id = *(uint32_t *)cell->payload;
+  log(LOG_DEBUG,"connection_exit_process_data_cell(): command %d topic %d", topic_command, topic_id);
+
+  circuit_consider_sending_sendme(circ, EDGE_EXIT);
+
+  for(conn = circ->n_conn; conn && conn->topic_id != topic_id; conn = conn->next_topic) ;
+
+  /* now conn is either NULL, in which case we don't recognize the topic_id, or
+   * it is set, in which case cell is talking about this conn.
+   */
+
+  if(conn && conn->state != EXIT_CONN_STATE_OPEN) {
+    if(topic_command == TOPIC_COMMAND_END) {
+      log(LOG_ERR,"connection_exit_process_data_cell(): Got an end before we're connected. Marking for close.");
+      conn->marked_for_close = 1;
+      return 0;
+    } else {
+      log(LOG_INFO,"connection_exit_process_data_cell(): Got a non-end data cell when not in 'open' state. Dropping.");
+      return 0;
+    }
+  }
+
+  switch(topic_command) {
+    case TOPIC_COMMAND_BEGIN:
+      if(conn) {
+        log(LOG_INFO,"connection_exit_process_data_cell(): begin cell for known topic. Dropping.");
+        return 0;
+      }
+      return connection_exit_begin_conn(cell, circ);
+    case TOPIC_COMMAND_DATA:
+      if(!conn) {
+        log(LOG_INFO,"connection_exit_process_data_cell(): data cell for unknown topic. Dropping.");
+        return 0;
+      }
+      if(--conn->p_receive_topicwindow < 0) { /* is it below 0 after decrement? */
+        log(LOG_DEBUG,"connection_exit_process_data_cell(): receive_topicwindow at exit below 0. Killing.");
+        return -1; /* AP breaking protocol. kill the whole circuit. */
+      }
+      log(LOG_DEBUG,"connection_exit_process_data_cell(): willing to receive %d more cells from circ",conn->p_receive_topicwindow);
+
+      if(conn->state != EXIT_CONN_STATE_OPEN) {
+        log(LOG_DEBUG,"connection_exit_process_data_cell(): data received while resolving/connecting. Queueing.");
+      }
+      log(LOG_DEBUG,"connection_exit_process_data_cell(): put %d bytes on outbuf.",cell->length - TOPIC_HEADER_SIZE);
+      if(connection_write_to_buf(cell->payload + TOPIC_HEADER_SIZE,
+                                 cell->length - TOPIC_HEADER_SIZE, conn) < 0) {
+        log(LOG_INFO,"connection_exit_process_data_cell(): write to buf failed. Marking for close.");
+        conn->marked_for_close = 1;
+        return 0;
+      }
+      if(connection_consider_sending_sendme(conn, EDGE_EXIT) < 0)
+        conn->marked_for_close = 1;
+      return 0;
+    case TOPIC_COMMAND_END:
+      if(!conn) {
+        log(LOG_DEBUG,"connection_exit_process_data_cell(): end cell dropped, unknown topic %d.",topic_id);
+        return 0;
+      }
+      log(LOG_DEBUG,"connection_exit_process_data_cell(): end cell for topic %d. Removing topic.",topic_id);
 
-  switch(conn->state) {
-    case EXIT_CONN_STATE_CONNECTING_WAIT:
-      log(LOG_DEBUG,"connection_exit_process_data_cell(): state is connecting_wait. cell length %d.", cell->length);
 #if 0
-      if(!conn->ss_received) { /* this cell contains the ss */
-        if(cell->length != sizeof(ss_t)) {
-          log(LOG_DEBUG,"connection_exit_process_data_cell(): Supposed to contain SS but wrong size. Closing.");
-          return -1;
-        }
-        memcpy(&conn->ss, cell->payload, cell->length);
-        if(conn->ss.addr_fmt != SS_ADDR_FMT_ASCII_HOST_PORT) { /* unrecognized address format */
-          log(LOG_DEBUG,"connection_exit_process_data_cell(): SS has unrecognized address format. Closing.");
-          return -1;
-        }
-        conn->ss_received = 1;
-        log(LOG_DEBUG,"connection_exit_process_data_cell(): SS received.");
-      } else 
+      /* go through and identify who points to conn. remove conn from the list. */
+      if(conn == circ->n_conn) {
+        circ->n_conn = conn->next_topic;
+      }
+      for(prevconn = circ->n_conn; prevconn->next_topic != conn; prevconn = prevconn->next_topic) ;
+      prevconn->next_topic = conn->next_topic;
 #endif
-      if (!conn->addr) { /* this cell contains the dest addr */
-        if(!memchr(cell->payload,0,cell->length)) {
-          log(LOG_DEBUG,"connection_exit_process_data_cell(): dest_addr cell has no \\0. Closing.");
-          return -1;
-        }
-        conn->address = strdup(cell->payload);
-        rent = gethostbyname(cell->payload);
-        if (!rent) { 
-          log(LOG_ERR,"connection_exit_process_data_cell(): Could not resolve dest addr %s.",cell->payload);
-          return -1;
-        }
-        memcpy(&conn->addr, rent->h_addr,rent->h_length); 
-        conn->addr = ntohl(conn->addr); /* get it back to host order */
-        log(LOG_DEBUG,"connection_exit_process_data_cell(): addr is %s.",cell->payload);
-      } else if (!conn->port) { /* this cell contains the dest port */
-        if(!memchr(cell->payload,'\0',cell->length)) {
-          log(LOG_DEBUG,"connection_exit_process_data_cell(): dest_port cell has no \\0. Closing.");
-          return -1;
-        }
-        conn->port = atoi(cell->payload);
-        if(!conn->port) { /* bad port */
-          log(LOG_DEBUG,"connection_exit_process_data_cell(): dest_port cell isn't a valid number. Closing.");
-          return -1;
-        }
-        /* all the necessary info is here. Start the connect() */
-        s=socket(PF_INET,SOCK_STREAM,IPPROTO_TCP);
-        if (s < 0)
-        {
-          log(LOG_ERR,"connection_exit_process_data_cell(): Error creating network socket.");
-          return -1;
-        }
-        fcntl(s, F_SETFL, O_NONBLOCK); /* set s to non-blocking */
-      
-        memset((void *)&dest_addr,0,sizeof(dest_addr));
-        dest_addr.sin_family = AF_INET;
-        dest_addr.sin_port = htons(conn->port);
-        dest_addr.sin_addr.s_addr = htonl(conn->addr);
-      
-        log(LOG_DEBUG,"connection_exit_process_data_cell(): Connecting to %s:%u.",conn->address,conn->port); 
+      conn->marked_for_close = 1;
+      break;
+    case TOPIC_COMMAND_CONNECTED:
+      log(LOG_INFO,"connection_exit_process_data_cell(): topic connected request unsupported. Dropping.");
+      break;
+    case TOPIC_COMMAND_SENDME:
+      if(!conn) {
+        log(LOG_DEBUG,"connection_exit_process_data_cell(): sendme cell dropped, unknown topic %d.",topic_id);
+        return 0;
+      }
+      conn->n_receive_topicwindow += TOPICWINDOW_INCREMENT;
+      connection_start_reading(conn);
+      connection_package_raw_inbuf(conn); /* handle whatever might still be on the inbuf */
+      circuit_consider_stop_edge_reading(circ, EDGE_EXIT);
+      break;
+    default:
+      log(LOG_DEBUG,"connection_exit_process_data_cell(): unknown topic command %d.",topic_command);
+  }
+  return 0;
+}
 
-        if(connect(s,(struct sockaddr *)&dest_addr,sizeof(dest_addr)) < 0){
-          if(errno != EINPROGRESS){
-            /* yuck. kill it. */
-            log(LOG_DEBUG,"connection_exit_process_data_cell(): Connect failed.");
-            return -1;
-          } else {
-            /* it's in progress. set state appropriately and return. */
-            conn->s = s;
-            connection_set_poll_socket(conn);
-            conn->state = EXIT_CONN_STATE_CONNECTING;
-      
-            log(LOG_DEBUG,"connection_exit_process_data_cell(): connect in progress, socket %d.",s);
-            connection_watch_events(conn, POLLOUT | POLLIN);
-            return 0;
-          }
-        }
+#if 0
+static uint32_t address_to_addr(char *address) {
+  struct hostent *rent;
+  uint32_t addr;
+  char *caddr;
 
-        /* it succeeded. we're connected. */
-        log(LOG_DEBUG,"connection_exit_process_data_cell(): Connection to %s:%u established.",conn->address,conn->port);
+  rent = gethostbyname(address);
+  if (!rent) { 
+    log(LOG_ERR,"address_to_addr(): Could not resolve dest addr %s.",address);
+    return 0;
+  }
+  memcpy(&addr, rent->h_addr,rent->h_length); 
+  addr = ntohl(addr); /* get it back to host order */
+  caddr = (char *)&addr;
+  log(LOG_DEBUG,"address_to_addr(): addr is %d %d %d %d",
+    caddr[0], caddr[1], caddr[2], caddr[3]);
+  return addr;
+}
+#endif
 
-        conn->s = s;
-        connection_set_poll_socket(conn);
-        conn->state = EXIT_CONN_STATE_OPEN;
-        if(connection_wants_to_flush(conn)) { /* in case there are any queued data cells */
-          log(LOG_NOTICE,"connection_exit_process_data_cell(): tell roger: newly connected conn had data waiting!");
-//          connection_start_writing(conn);
-        }
-//        connection_process_inbuf(conn);
-        connection_watch_events(conn, POLLIN);
+int connection_exit_connect(connection_t *conn) {
+  int s; /* for the new socket */
+  struct sockaddr_in dest_addr;
 
-        /* also, deliver a 'connected' cell back through the circuit. */
-        return connection_exit_send_connected(conn);
-      } else {
-        log(LOG_DEBUG,"connection_exit_process_data_cell(): in connecting_wait, but I've already received everything. Closing.");
-        return -1;
-      }
+  /* all the necessary info is here. Start the connect() */
+  s=socket(PF_INET,SOCK_STREAM,IPPROTO_TCP);
+  if (s < 0) {
+    log(LOG_ERR,"connection_exit_connect(): Error creating network socket.");
+    return -1;
+  }
+  fcntl(s, F_SETFL, O_NONBLOCK); /* set s to non-blocking */
+
+  memset((void *)&dest_addr,0,sizeof(dest_addr));
+  dest_addr.sin_family = AF_INET;
+  dest_addr.sin_port = htons(conn->port);
+  dest_addr.sin_addr.s_addr = htonl(conn->addr);
+
+  log(LOG_DEBUG,"connection_exit_connect(): Connecting to %s:%u.",conn->address,conn->port); 
+
+  if(connect(s,(struct sockaddr *)&dest_addr,sizeof(dest_addr)) < 0) {
+    if(errno != EINPROGRESS){
+      /* yuck. kill it. */
+      perror("connect");
+      log(LOG_DEBUG,"connection_exit_connect(): Connect failed.");
+      return -1;
+    } else {
+      /* it's in progress. set state appropriately and return. */
+      conn->s = s;
+      connection_set_poll_socket(conn);
+      conn->state = EXIT_CONN_STATE_CONNECTING;
+
+      log(LOG_DEBUG,"connection_exit_connect(): connect in progress, socket %d.",s);
+      connection_watch_events(conn, POLLOUT | POLLIN);
       return 0;
-    case EXIT_CONN_STATE_CONNECTING:
-      log(LOG_DEBUG,"connection_exit_process_data_cell(): Data receiving while connecting. Queueing.");
-      /* we stay listening for writable, so connect() can finish */
-      /* fall through to the next state -- write the cell and consider sending back a sendme */
-    case EXIT_CONN_STATE_OPEN:
-      if(connection_write_to_buf(cell->payload, cell->length, conn) < 0)
-        return -1;
-      return connection_consider_sending_sendme(conn);
+    }
   }
 
-  return 0;
+  /* it succeeded. we're connected. */
+  log(LOG_DEBUG,"connection_exit_connect(): Connection to %s:%u established.",conn->address,conn->port);
+
+  conn->s = s;
+  connection_set_poll_socket(conn);
+  conn->state = EXIT_CONN_STATE_OPEN;
+  if(connection_wants_to_flush(conn)) { /* in case there are any queued data cells */
+    log(LOG_ERR,"connection_exit_connect(): tell roger: newly connected conn had data waiting!");
+//    connection_start_writing(conn);
+  }
+//   connection_process_inbuf(conn);
+  connection_watch_events(conn, POLLIN);
+
+  /* also, deliver a 'connected' cell back through the circuit. */
+  return connection_exit_send_connected(conn);
 }
 

Index: main.c
===================================================================
RCS file: /home/or/cvsroot/src/or/main.c,v
retrieving revision 1.37
retrieving revision 1.38
diff -u -d -r1.37 -r1.38
--- main.c	31 Dec 2002 15:04:14 -0000	1.37
+++ main.c	26 Jan 2003 09:02:24 -0000	1.38
@@ -76,7 +76,10 @@
   assert(nfds>0);
 
   log(LOG_INFO,"connection_remove(): removing socket %d, nfds now %d",conn->s, nfds-1);
-  circuit_about_to_close_connection(conn); /* flush and send destroys for all circuits on this conn */
+  circuit_about_to_close_connection(conn); /* if it's an edge conn, remove it from the list
+                                            * of conn's on this circuit. If it's not on an edge,
+                                            * flush and send destroys for all circuits on this conn
+                                            */
 
   current_index = conn->poll_index;
   if(current_index == nfds-1) { /* this is the end */
@@ -126,7 +129,6 @@
       return conn;
     }
   }
-
   /* guess not */
   return NULL;
 
@@ -142,7 +144,6 @@
     if(conn->addr == addr && conn->port == port)
        return conn;
   }
-
   return NULL;
 }
 
@@ -155,7 +156,21 @@
     if(conn->type == type)
        return conn;
   }
+  return NULL;
+}
+
+connection_t *connection_get_pendingresolve_by_address(char *address) {
+  int i;
+  connection_t *conn;
 
+  for(i=0;i<nfds;i++) {
+    conn = connection_array[i];
+    if(conn->type == CONN_TYPE_EXIT &&
+       conn->state == EXIT_CONN_STATE_RESOLVING &&
+       !strcmp(conn->address, address)) {
+         return conn;
+    }
+  }
   return NULL;
 }
 
@@ -223,7 +238,7 @@
       retval = connection_read_to_buf(conn);
       if (retval < 0 && conn->type == CONN_TYPE_DIR && conn->state == DIR_CONN_STATE_CONNECTING) {
          /* it's a directory server and connecting failed: forget about this router */
-         router_forget_router(conn->addr,conn->port);
+         router_forget_router(conn->addr,conn->port); /* FIXME i don't think this function works. */
       }
       if (retval >= 0) { /* all still well */
         retval = connection_process_inbuf(conn);
@@ -319,8 +334,8 @@
       /* it's time to rebuild our directory */
       if(time_to_rebuild_directory == 0) { 
         /* we just started up. if we build a directory now it will be meaningless. */
-        log(LOG_DEBUG,"prepare_for_poll(): Delaying initial dir build for 15 seconds.");
-        time_to_rebuild_directory = now.tv_sec + 15; /* try in 15 seconds */
+        log(LOG_DEBUG,"prepare_for_poll(): Delaying initial dir build for 10 seconds.");
+        time_to_rebuild_directory = now.tv_sec + 10; /* try in 10 seconds */
       } else {
         directory_rebuild();
         time_to_rebuild_directory = now.tv_sec + options.DirRebuildPeriod;
@@ -641,19 +656,25 @@
 int main(int argc, char *argv[]) {
   int retval = 0;
 
-  signal (SIGINT, catch); /* catch kills so we can exit cleanly */
-//  signal (SIGABRT, catch);
-  signal (SIGTERM, catch);
-  signal (SIGUSR1, catch); /* to dump stats to stdout */
-  signal (SIGHUP, catch); /* to reload directory */
-
   if(getconfig(argc,argv,&options))
     exit(1);
   log(options.loglevel,NULL);         /* assign logging severity level from options */
   global_role = options.Role;   /* assign global_role from options. FIXME: remove from global namespace later. */
 
-  crypto_global_init();
+  if(options.Role & ROLE_OR_LISTEN) { /* only spawn dns handlers if we're a router */
+    if(dns_master_start() < 0) {
+      log(LOG_ERR,"main(): We're running without a dns handler. Bad news.");
+    }
+  }
+
   init_tracked_tree(); /* initialize the replay detection tree */
+
+  signal (SIGINT,  catch); /* catch kills so we can exit cleanly */
+  signal (SIGTERM, catch);
+  signal (SIGUSR1, catch); /* to dump stats to stdout */
+  signal (SIGHUP,  catch); /* to reload directory */
+
+  crypto_global_init();
   retval = do_main_loop();
   crypto_global_cleanup();
 

Index: onion.c
===================================================================
RCS file: /home/or/cvsroot/src/or/onion.c,v
retrieving revision 1.25
retrieving revision 1.26
diff -u -d -r1.25 -r1.26
--- onion.c	31 Dec 2002 15:04:14 -0000	1.25
+++ onion.c	26 Jan 2003 09:02:24 -0000	1.26
@@ -23,17 +23,6 @@
    return ACI_TYPE_LOWER; 
 }
 
-struct data_queue_t {
-  cell_t *cell;
-  struct data_queue_t *next;
-};
-
-struct onion_queue_t {
-  circuit_t *circ;
-  struct data_queue_t *data_cells;
-  struct onion_queue_t *next;
-};
-
 /* global (within this file) variables used by the next few functions */
 static struct onion_queue_t *ol_list=NULL;
 static struct onion_queue_t *ol_tail=NULL;
@@ -150,12 +139,7 @@
 
 }
 
-/* a data cell has arrived for a circuit which is still pending. Find
- * the right entry in ol_list, and add it to the end of the 'data_cells'
- * list.
- */
-void onion_pending_data_add(circuit_t *circ, cell_t *cell) {
-  struct onion_queue_t *tmpo;
+struct data_queue_t *data_queue_add(struct data_queue_t *list, cell_t *cell) {
   struct data_queue_t *tmpd, *newd;
 
   newd = malloc(sizeof(struct data_queue_t));
@@ -163,15 +147,25 @@
   newd->cell = malloc(sizeof(cell_t));
   memcpy(newd->cell, cell, sizeof(cell_t));
 
+  if(!list) {
+    return newd;
+  }
+  for(tmpd = list; tmpd->next; tmpd=tmpd->next) ;
+  /* now tmpd->next is null */
+  tmpd->next = newd;
+  return list;
+}
+
+/* a data cell has arrived for a circuit which is still pending. Find
+ * the right entry in ol_list, and add it to the end of the 'data_cells'
+ * list.
+ */
+void onion_pending_data_add(circuit_t *circ, cell_t *cell) {
+  struct onion_queue_t *tmpo;
+
   for(tmpo=ol_list; tmpo; tmpo=tmpo->next) {
     if(tmpo->circ == circ) {
-      if(!tmpo->data_cells) {
-        tmpo->data_cells = newd;
-        return;
-      }
-      for(tmpd = tmpo->data_cells; tmpd->next; tmpd=tmpd->next) ;
-      /* now tmpd->next is null */
-      tmpd->next = newd;
+      tmpo->data_cells = data_queue_add(tmpo->data_cells, cell);
       return;
     }
   }
@@ -286,6 +280,8 @@
       return -1;
     }
   } else { /* this is destined for an exit */
+    log(LOG_DEBUG,"command_process_create_cell(): create cell reached exit. Circuit established.");
+#if 0
     log(LOG_DEBUG,"command_process_create_cell(): Creating new exit connection.");
     n_conn = connection_new(CONN_TYPE_EXIT);
     if(!n_conn) {
@@ -302,6 +298,7 @@
       return -1;
     }
     circ->n_conn = n_conn;
+#endif
   }
   return 0;
 }
@@ -522,7 +519,7 @@
       /* set crypto functions */
       hop->backf = *(layer+1) >> 4;
       hop->forwf = *(layer+1) & 0x0f;
-	
+
       /* calculate keys */
       crypto_SHA_digest(layer+12,16,hop->digest3);
       log(LOG_DEBUG,"create_onion() : First SHA pass performed.");
@@ -533,14 +530,14 @@
       log(LOG_DEBUG,"create_onion() : Keys generated.");
       /* set IV to zero */
       memset((void *)iv,0,16);
-	
+
       /* initialize cipher engines */
       if (! (hop->f_crypto = create_onion_cipher(hop->forwf, hop->digest3, iv, 1))) { 
         /* cipher initialization failed */
         log(LOG_ERR,"Could not create a crypto environment.");
         goto error;
       }
-	
+
       if (! (hop->b_crypto = create_onion_cipher(hop->backf, hop->digest2, iv, 0))) { 
         /* cipher initialization failed */
         log(LOG_ERR,"Could not create a crypto environment.");
@@ -731,7 +728,7 @@
 
 
 /* red black tree using Niels' tree.h. I used
-http://www.openbsd.org/cgi-bin/cvsweb/src/regress/sys/sys/tree/rb/rb-test.c?rev=1.2&content-type=text/x-cvsweb-markup
+http://www.openbsd.org/cgi-bin/cvsweb/src/regress/sys/sys/tree/rb/
 as my guide */
 
 #include "tree.h"

Index: or.h
===================================================================
RCS file: /home/or/cvsroot/src/or/or.h,v
retrieving revision 1.41
retrieving revision 1.42
diff -u -d -r1.41 -r1.42
--- or.h	31 Dec 2002 15:04:14 -0000	1.41
+++ or.h	26 Jan 2003 09:02:24 -0000	1.42
@@ -43,7 +43,7 @@
 #include "../common/version.h"
 
 #define MAXCONNECTIONS 200 /* upper bound on max connections.
-			      can be overridden by config file */
+                              can be lowered by config file */
 
 #define MAX_BUF_SIZE (640*1024)
 #define DEFAULT_BANDWIDTH_OP 102400
@@ -71,9 +71,12 @@
 #define CONN_TYPE_AP 7
 #define CONN_TYPE_DIR_LISTENER 8
 #define CONN_TYPE_DIR 9
+#define CONN_TYPE_DNSMASTER 10
 
 #define LISTENER_STATE_READY 0
 
+#define DNSMASTER_STATE_OPEN 0
+
 #define OP_CONN_STATE_AWAITING_KEYS 0
 #define OP_CONN_STATE_OPEN 1
 #if 0
@@ -96,8 +99,8 @@
 #define OR_CONN_STATE_SERVER_NONCE_WAIT 8 /* waiting for confirmation of nonce */
 #define OR_CONN_STATE_OPEN 9 /* ready to send/receive cells. */
 
-#define EXIT_CONN_STATE_CONNECTING_WAIT 0 /* waiting for standard structure or dest info */
-#define EXIT_CONN_STATE_CONNECTING 1
+#define EXIT_CONN_STATE_RESOLVING 0 /* waiting for response from dnsmaster */
+#define EXIT_CONN_STATE_CONNECTING 1 /* waiting for connect() to finish */
 #define EXIT_CONN_STATE_OPEN 2
 #if 0
 #define EXIT_CONN_STATE_CLOSE 3 /* flushing the buffer, then will close */
@@ -123,6 +126,16 @@
 //                                       (or if just one was sent, waiting for that one */
 //#define CIRCUIT_STATE_CLOSE 4 /* both acks received, connection is dead */ /* NOT USED */
 
+#define TOPIC_COMMAND_BEGIN 1
+#define TOPIC_COMMAND_DATA 2
+#define TOPIC_COMMAND_END 3
+#define TOPIC_COMMAND_CONNECTED 4
+#define TOPIC_COMMAND_SENDME 5
+
+#define TOPIC_HEADER_SIZE 4
+
+#define TOPIC_STATE_RESOLVING
+
 /* available cipher functions */
 #define ONION_CIPHER_IDENTITY 0
 #define ONION_CIPHER_DES 1
@@ -131,8 +144,16 @@
 /* default cipher function */
 #define ONION_DEFAULT_CIPHER ONION_CIPHER_DES
 
-#define RECEIVE_WINDOW_START 1000
-#define RECEIVE_WINDOW_INCREMENT 100
+#define CELL_DIRECTION_IN 1
+#define CELL_DIRECTION_OUT 2
+#define EDGE_EXIT 3 /* make direction and edge values not overlap, to help catch bugs */
+#define EDGE_AP 4
+
+#define CIRCWINDOW_START 1000
+#define CIRCWINDOW_INCREMENT 100
+
+#define TOPICWINDOW_START 500
+#define TOPICWINDOW_INCREMENT 50
 
 /* cell commands */
 #define CELL_PADDING 0
@@ -142,7 +163,6 @@
 #define CELL_ACK 4
 #define CELL_NACK 5
 #define CELL_SENDME 6
-#define CELL_CONNECTED 7
 
 #define CELL_PAYLOAD_SIZE 120
 #define CELL_NETWORK_SIZE 128
@@ -191,7 +211,7 @@
    /* dest host follows, terminated by a NULL */
 } socks4_t;
 
-typedef struct { 
+struct connection_t { 
 
 /* Used by all types: */
 
@@ -223,8 +243,8 @@
 
   uint32_t bandwidth; /* connection bandwidth */
   int receiver_bucket; /* when this hits 0, stop receiving. Every second we
-		       	* add 'bandwidth' to this, capping it at 10*bandwidth.
-		       	*/
+                        * add 'bandwidth' to this, capping it at 10*bandwidth.
+                        */
   struct timeval send_timeval; /* for determining when to send the next cell */
 
   /* link encryption */
@@ -239,8 +259,10 @@
 
 /* used by exit and ap: */
 
-//  ss_t ss; /* standard structure */
-//  int ss_received; /* size of ss, received so far */
+  uint32_t topic_id;
+  struct connection_t *next_topic;
+  int n_receive_topicwindow;
+  int p_receive_topicwindow;
 
   char socks_version; 
   char read_username;
@@ -251,18 +273,14 @@
   char dest_tmp[512];
   int dest_tmplen;
   
-#if 0 /* obsolete, we now use conn->bandwidth */
-  /* link info */
-  uint32_t min;
-  uint32_t max;
-#endif
-
   char *address; /* strdup into this, because free_connection frees it */
   crypto_pk_env_t *pkey; /* public RSA key for the other side */
 
   char nonce[8];
  
-} connection_t;
+};
+
+typedef struct connection_t connection_t;
 
 /* config stuff we know about the other ORs in the network */
 typedef struct {
@@ -302,22 +320,25 @@
   
 } crypt_path_t;
 
+struct data_queue_t {
+  cell_t *cell;
+  struct data_queue_t *next;
+};
+
 /* per-anonymous-connection struct */
 typedef struct {
-#if 0
-  uint32_t p_addr; /* all in network order */
-  uint16_t p_port;
-#endif
   uint32_t n_addr;
   uint16_t n_port;
   connection_t *p_conn;
   connection_t *n_conn;
-  int n_receive_window;
-  int p_receive_window;
+  int n_receive_circwindow;
+  int p_receive_circwindow;
 
   aci_t p_aci; /* connection identifiers */
   aci_t n_aci;
 
+  struct data_queue_t *data_queue; /* for queueing cells at the edges */
+
   unsigned char p_f; /* crypto functions */
   unsigned char n_f;
 
@@ -338,6 +359,12 @@
   void *next;
 } circuit_t;
 
+struct onion_queue_t {
+  circuit_t *circ;
+  struct data_queue_t *data_cells;
+  struct onion_queue_t *next;
+};
+
 #if 0
 typedef struct
 { 
@@ -399,7 +426,7 @@
    */
 
 int fetch_from_buf(char *string, int string_len,
-		                 char **buf, int *buflen, int *buf_datalen);
+                   char **buf, int *buflen, int *buf_datalen);
   /* if there is string_len bytes in buf, write them onto string,
    * then memmove buf back (that is, remove them from buf)
    */
@@ -429,13 +456,20 @@
 circuit_t *circuit_get_by_conn(connection_t *conn);
 circuit_t *circuit_enumerate_by_naddr_nport(circuit_t *start, uint32_t naddr, uint16_t nport);
 
-int circuit_deliver_data_cell(cell_t *cell, circuit_t *circ, connection_t *conn, int crypt_type);
+int circuit_deliver_data_cell_from_edge(cell_t *cell, circuit_t *circ, char edge_type);
+int circuit_deliver_data_cell(cell_t *cell, circuit_t *circ, int crypt_type);
 int circuit_crypt(circuit_t *circ, char *in, int inlen, char crypt_type);
 
+void circuit_resume_edge_reading(circuit_t *circ, int edge_type);
+int circuit_consider_stop_edge_reading(circuit_t *circ, int edge_type);
+int circuit_consider_sending_sendme(circuit_t *circ, int edge_type);
+
 int circuit_init(circuit_t *circ, int aci_type);
 void circuit_free(circuit_t *circ);
 void circuit_free_cpath(crypt_path_t **cpath, int cpathlen);
 
+
+
 void circuit_close(circuit_t *circ);
 
 void circuit_about_to_close_connection(connection_t *conn);
@@ -525,7 +559,7 @@
 int connection_package_raw_inbuf(connection_t *conn);
 int connection_process_cell_from_inbuf(connection_t *conn);
 
-int connection_consider_sending_sendme(connection_t *conn);
+int connection_consider_sending_sendme(connection_t *conn, int edge_type);
 int connection_finished_flushing(connection_t *conn);
 
 /********************************* connection_ap.c ****************************/
@@ -537,15 +571,16 @@
 int ap_handshake_create_onion(connection_t *conn);
 
 int ap_handshake_establish_circuit(connection_t *conn, unsigned int *route, int routelen, char *onion,
-		                                   int onionlen, crypt_path_t **cpath);
+                                   int onionlen, crypt_path_t **cpath);
 
 void ap_handshake_n_conn_open(connection_t *or_conn);
 
 int ap_handshake_send_onion(connection_t *ap_conn, connection_t *or_conn, circuit_t *circ);
+int ap_handshake_send_begin(connection_t *ap_conn, circuit_t *circ);
 
 int ap_handshake_socks_reply(connection_t *conn, char result);
 int connection_ap_send_connected(connection_t *conn);
-int connection_ap_process_data_cell(cell_t *cell, connection_t *conn);
+int connection_ap_process_data_cell(cell_t *cell, circuit_t *circ);
 
 int connection_ap_finished_flushing(connection_t *conn);
 
@@ -558,10 +593,10 @@
 int connection_exit_process_inbuf(connection_t *conn);
 int connection_exit_package_inbuf(connection_t *conn);
 int connection_exit_send_connected(connection_t *conn);
-int connection_exit_process_data_cell(cell_t *cell, connection_t *conn);
+int connection_exit_process_data_cell(cell_t *cell, circuit_t *circ);
 
 int connection_exit_finished_flushing(connection_t *conn);
-
+int connection_exit_connect(connection_t *conn);
 
 /********************************* connection_op.c ***************************/
 
@@ -610,6 +645,13 @@
 int connection_dir_create_listener(struct sockaddr_in *bindaddr);
 int connection_dir_handle_listener_read(connection_t *conn);
 
+/********************************* dns.c ***************************/
+
+int connection_dns_finished_flushing(connection_t *conn);
+int connection_dns_process_inbuf(connection_t *conn);
+int dns_tor_to_master(char *address);
+int dns_master_start(void);
+
 /********************************* main.c ***************************/
 
 void setprivatekey(crypto_pk_env_t *k);
@@ -622,6 +664,7 @@
 connection_t *connection_exact_get_by_addr_port(uint32_t addr, uint16_t port);
 
 connection_t *connection_get_by_type(int type);
+connection_t *connection_get_pendingresolve_by_address(char *address);
 
 void connection_watch_events(connection_t *conn, short events);
 void connection_stop_reading(connection_t *conn);
@@ -651,6 +694,7 @@
 int onion_pending_check(void);
 void onion_pending_process_one(void);
 void onion_pending_remove(circuit_t *circ);
+struct data_queue_t *data_queue_add(struct data_queue_t *list, cell_t *cell);
 void onion_pending_data_add(circuit_t *circ, cell_t *cell);
 
 /* uses a weighted coin with weight cw to choose a route length */



More information about the tor-commits mailing list