[or-cvs] r10512: Commit for 6th June. Coding complete todo: compile/bugfix/te (in libevent-urz/trunk: . doc)

Urz at seul.org Urz at seul.org
Wed Jun 6 11:31:04 UTC 2007


Author: Urz
Date: 2007-06-06 07:31:04 -0400 (Wed, 06 Jun 2007)
New Revision: 10512

Modified:
   libevent-urz/trunk/Makefile.am
   libevent-urz/trunk/buffer.c
   libevent-urz/trunk/doc/plan.txt
   libevent-urz/trunk/event-internal.h
   libevent-urz/trunk/event.h
   libevent-urz/trunk/sa_evbuffer.c
Log:
Commit for 6th June.
Coding complete
todo: compile/bugfix/testcode

Modified: libevent-urz/trunk/Makefile.am
===================================================================
--- libevent-urz/trunk/Makefile.am	2007-06-06 04:51:33 UTC (rev 10511)
+++ libevent-urz/trunk/Makefile.am	2007-06-06 11:31:04 UTC (rev 10512)
@@ -39,7 +39,7 @@
 endif
 
 libevent_la_SOURCES = event.c buffer.c evbuffer.c log.c event_tagging.c \
-	http.c evhttp.h http-internal.h evdns.c evdns.h strlcpy.c \
+	sa_evbuffer.c http.c evhttp.h http-internal.h evdns.c evdns.h strlcpy.c \
 	strlcpy-internal.h strlcpy-internal.h \
 	$(SYS_SRC)
 libevent_la_LIBADD = @LTLIBOBJS@ $(SYS_LIBS)

Modified: libevent-urz/trunk/buffer.c
===================================================================
--- libevent-urz/trunk/buffer.c	2007-06-06 04:51:33 UTC (rev 10511)
+++ libevent-urz/trunk/buffer.c	2007-06-06 11:31:04 UTC (rev 10512)
@@ -60,7 +60,7 @@
 #endif
 
 #include "event.h"
-#include "event_internal.h"
+#include "event-internal.h"
 
 void evbuffer_lock(struct evbuffer *buffer)
 {
@@ -328,16 +328,12 @@
 evbuffer_align(struct evbuffer *buf)
 {
     evbuffer_lock(buf);
-    evbuffer_align(buf);
-    evbuffer_unlock(buf);
-}
-
-static inline void
-evbuffer_align(struct evbuffer *buf)
-{
+    
 	memmove(buf->orig_buffer, buf->buffer, buf->off);
 	buf->buffer = buf->orig_buffer;
 	buf->misalign = 0;
+    
+    evbuffer_unlock(buf);
 }
 
 /* Expands the available space in the event buffer to at least datlen */
@@ -601,4 +597,4 @@
 	buffer->cbarg = cbarg;
     
     evbuffer_unlock(buffer);
-}
\ No newline at end of file
+}

Modified: libevent-urz/trunk/doc/plan.txt
===================================================================
--- libevent-urz/trunk/doc/plan.txt	2007-06-06 04:51:33 UTC (rev 10511)
+++ libevent-urz/trunk/doc/plan.txt	2007-06-06 11:31:04 UTC (rev 10512)
@@ -33,32 +33,22 @@
 to set the bufferevent flags to let the main thread know callbacks are ready, and to call
 a function to ensure the main event-loop is notified of events.
     - Mutex work is done.
-    - Notification function written, but not yet used.
-    - bufferevent pending callback flags not yet set.
+    - Notification function written, used as needed.
+    - bufferevent pending callback flags set as needed.
     
 Write a short read event handler for the event_loop half of the socketpair which
 simply reads all there is to be read (no blocking!) and calls the real read/write callbacks
 depending on what it reads.
     - Done
 
-    
-Functions yet to modify:
-int sa_bufferevent_base_set(struct event_base *base, struct bufferevent *bufev);
-int sa_bufferevent_priority_set(struct bufferevent *bufev, int pri);
-void sa_bufferevent_free(struct bufferevent *bufev);
-int sa_bufferevent_write(struct bufferevent *bufev, void *data, size_t size);
-int sa_bufferevent_write_buffer(struct bufferevent *bufev, struct evbuffer *buf);
-size_t sa_bufferevent_read(struct bufferevent *bufev, void *data, size_t size);
-int sa_bufferevent_enable(struct bufferevent *bufev, short event);
-int sa_bufferevent_disable(struct bufferevent *bufev, short event);
-void sa_bufferevent_settimeout(struct bufferevent *bufev,
-    int timeout_read, int timeout_write);
 
 TODOs for tomorrow:
-Modify functions above where needed.
 Write regression tests.
 Compile and Test.
 Write sample code
+Talk to nickm, re:
+    - Timeouts
+    - Automake
 
 Testing:
 Add test cases to libevent testing code... test/regress.c I believe

Modified: libevent-urz/trunk/event-internal.h
===================================================================
--- libevent-urz/trunk/event-internal.h	2007-06-06 04:51:33 UTC (rev 10511)
+++ libevent-urz/trunk/event-internal.h	2007-06-06 11:31:04 UTC (rev 10512)
@@ -68,7 +68,7 @@
 #define FD_CLOSEONEXEC(x)
 #endif
 
-LIST_HEAD(sa_evbuf_list_elem, struct sa_bufferevent) bufev_list_head;
+LIST_HEAD(sa_evbuf_list_elem, sa_bufferevent) bufev_list_head;
 
 #ifdef __cplusplus
 }

Modified: libevent-urz/trunk/event.h
===================================================================
--- libevent-urz/trunk/event.h	2007-06-06 04:51:33 UTC (rev 10511)
+++ libevent-urz/trunk/event.h	2007-06-06 11:31:04 UTC (rev 10512)
@@ -31,6 +31,7 @@
 extern "C" {
 #endif
 
+#include "compat/sys/queue.h"
 #include <stdarg.h>
 
 #ifdef WIN32
@@ -222,7 +223,7 @@
 	void *cbarg;
     
     #ifdef WIN32
-    HANDLE lock
+    HANDLE lock;
     #endif
   
 };
@@ -306,7 +307,7 @@
     u_char del_read_event_set;
     u_char del_write_event_set;
     
-    LIST_ENTRY(struct sa_bufferevent) list_elem;	
+    LIST_ENTRY(sa_bufferevent) list_elem;	
 };
 
 struct bufferevent *sa_bufferevent_new(evbuffercb readcb, 

Modified: libevent-urz/trunk/sa_evbuffer.c
===================================================================
--- libevent-urz/trunk/sa_evbuffer.c	2007-06-06 04:51:33 UTC (rev 10511)
+++ libevent-urz/trunk/sa_evbuffer.c	2007-06-06 11:31:04 UTC (rev 10512)
@@ -83,9 +83,11 @@
     for (np = bufev_list_head.lh_first; np != NULL; np = np->list_elem.le_next) {
         if(np->del_read_event_set) {
             (*np->readcb)(np, np->cbarg);
+            np->del_read_event_set = 0;
         }
         if(np->del_write_event_set) {
             (*np->writecb)(np, np->cbarg);
+            np->del_write_event_set = 0;
         }
     }
 }
@@ -146,147 +148,96 @@
 	return (event_add(ev, ptv));
 }
 
-/* 
- * This callback is executed when the size of the input buffer changes.
- * We use it to apply back pressure on the reading side.
+/*
+ * Used by a 'loader' thread to add data to a sa_bufferevent.
+ * The loader thread reads/makes up/produces data, and adds it
+ * to the sa_bufferevent with sa_bufferevent_load. The return
+ * value is the amount of data copied into the sa_bufferevent.
+ * If an error occurs, 0 will be returned to indicate that no
+ * data was copied.
  */
 
-void
-sa_bufferevent_read_pressure_cb(struct evbuffer *buf, size_t old, size_t now,
-    void *arg) {
-	struct sa_bufferevent *bufev = arg;
-	/* 
-	 * If we are below the watermark then reschedule reading if it's
-	 * still enabled.
-	 */
-	if (bufev->wm_read.high == 0 || now < bufev->wm_read.high) {
-		evbuffer_setcb(buf, NULL, NULL);
-
-		if (bufev->enabled & EV_READ)
-			sa_bufferevent_add(&bufev->ev_read, bufev->timeout_read);
-	}
-}
-
-static void
-sa_bufferevent_readcb(int fd, short event, void *arg)
+size_t
+sa_bufferevent_load(struct sa_bufferevent *bufev, void *data, size_t size)
 {
-	struct sa_bufferevent *bufev = arg;
 	int res = 0;
 	short what = EVBUFFER_READ;
 	size_t len;
-	int howmuch = -1;
+    size_t toload;
 
-	if (event == EV_TIMEOUT) {
-		what |= EVBUFFER_TIMEOUT;
-		goto error;
-	}
-
 	/*
 	 * If we have a high watermark configured then we don't want to
 	 * read more data than would make us reach the watermark.
 	 */
-	if (bufev->wm_read.high != 0)
-		howmuch = bufev->wm_read.high;
+	if (bufev->wm_read.high != 0 && ((size_t) bufev->wm_read.high < size)) {
+		toload = (size_t) bufev->wm_read.high;
+    } else {
+        toload = size;
+    }
 
-	res = evbuffer_read(bufev->input, fd, howmuch);
+    res = evbuffer_add(bufev->input, data, toload);
 	if (res == -1) {
-		if (errno == EAGAIN || errno == EINTR)
-			goto reschedule;
-		/* error case */
-		what |= EVBUFFER_ERROR;
-	} else if (res == 0) {
-		/* eof case */
-		what |= EVBUFFER_EOF;
+		return 0;
 	}
 
-	if (res <= 0)
-		goto error;
+	/*
+     * Since we do not directly control the load/unloading
+     * we cannot request more/less data be read/written.
+     * (This comment will only make sense if you view this
+     * function side by side with bufferevent_readcb, from
+     * evbuffer.c which is where this function originated from)
+     */
 
-	sa_bufferevent_add(&bufev->ev_read, bufev->timeout_read);
-
-	/* See if this callbacks meets the water marks */
-	len = EVBUFFER_LENGTH(bufev->input);
-	if (bufev->wm_read.low != 0 && len < bufev->wm_read.low)
-		return;
-	if (bufev->wm_read.high != 0 && len > bufev->wm_read.high) {
-		struct evbuffer *buf = bufev->input;
-		event_del(&bufev->ev_read);
-
-		/* Now schedule a callback for us */
-		evbuffer_setcb(buf, sa_bufferevent_read_pressure_cb, bufev);
-		return;
-	}
-
 	/* Invoke the user callback - must always be called last */
-	if (bufev->readcb != NULL)
-		(*bufev->readcb)(bufev, bufev->cbarg);
-	return;
+	if (bufev->readcb != NULL) {
+		bufev->del_read_event_set = 1;
+        notify();
+    }
+	return toload;
 
- reschedule:
-	sa_bufferevent_add(&bufev->ev_read, bufev->timeout_read);
-	return;
-
- error:
-	(*bufev->errorcb)(bufev, what, bufev->cbarg);
 }
 
-static void
-sa_bufferevent_writecb(int fd, short event, void *arg)
+/*
+ * Used by an 'unloader' thread to pull data from a sa_bufferevent.
+ * The unloader thread writes/processes/forwards the data, and removes it
+ * from the sa_bufferevent with sa_bufferevent_unload. The return
+ * value is the amount of data copied into the provided buffer.
+ * If an error occurs, 0 will be returned to indicate that no
+ * data was copied.
+ * The amount of data copied will always be <= the size value supplied.
+ */
+
+size_t
+sa_bufferevent_unload(struct sa_bufferevent *bufev, void *data, size_t size)
 {
-	struct sa_bufferevent *bufev = arg;
 	int res = 0;
-	short what = EVBUFFER_WRITE;
+    size_t copysize;
+    
+    evbuffer_lock(bufev->output);
+    
+    if(EVBUFFER_LENGTH(bufev->output) < size) {
+        copysize = EVBUFFER_LENGTH(bufev->output);
+    } else {
+        copysize = size;
+    }
+    
+    if(size == 0)
+        return 0;
 
-	if (event == EV_TIMEOUT) {
-		what |= EVBUFFER_TIMEOUT;
-		goto error;
-	}
+    memcpy(data, bufev->output->buffer, copysize);
+    evbuffer_drain(bufev->output, copysize);
 
-	if (EVBUFFER_LENGTH(bufev->output)) {
-	    res = evbuffer_write(bufev->output, fd);
-	    if (res == -1) {
-#ifndef WIN32
-/*todo. evbuffer uses WriteFile when WIN32 is set. WIN32 system calls do not
- *set errno. thus this error checking is not portable*/
-		    if (errno == EAGAIN ||
-			errno == EINTR ||
-			errno == EINPROGRESS)
-			    goto reschedule;
-		    /* error case */
-		    what |= EVBUFFER_ERROR;
-
-#else
-				goto reschedule;
-#endif
-
-	    } else if (res == 0) {
-		    /* eof case */
-		    what |= EVBUFFER_EOF;
-	    }
-	    if (res <= 0)
-		    goto error;
-	}
-
-	if (EVBUFFER_LENGTH(bufev->output) != 0)
-		sa_bufferevent_add(&bufev->ev_write, bufev->timeout_write);
-
 	/*
 	 * Invoke the user callback if our buffer is drained or below the
 	 * low watermark.
 	 */
 	if (bufev->writecb != NULL &&
-	    EVBUFFER_LENGTH(bufev->output) <= bufev->wm_write.low)
-		(*bufev->writecb)(bufev, bufev->cbarg);
+	    EVBUFFER_LENGTH(bufev->output) <= bufev->wm_write.low) {
+        bufev->del_write_event_set = 1;
+        notify();
+    }
 
-	return;
-
- reschedule:
-	if (EVBUFFER_LENGTH(bufev->output) != 0)
-		sa_bufferevent_add(&bufev->ev_write, bufev->timeout_write);
-	return;
-
- error:
-	(*bufev->errorcb)(bufev, what, bufev->cbarg);
+	return copysize;
 }
 
 /*
@@ -344,13 +295,18 @@
 	return (bufev);
 }
 
+/*
+ * Again, one socketpair / event, so the priority can only be changed once.
+ * If we need it, I could probably implement prioritization within the
+ * sa_bufferevent callback, but that will still behave the same as
+ * a normal bufferevent.
+ */
+
 int
-sa_bufferevent_priority_set(struct sa_bufferevent *bufev, int priority)
+sa_bufferevent_priority_set(int priority)
 {
-	if (event_priority_set(&bufev->ev_read, priority) == -1)
+	if (event_priority_set(&evbuffer_del_event, priority) == -1)
 		return (-1);
-	if (event_priority_set(&bufev->ev_write, priority) == -1)
-		return (-1);
 
 	return (0);
 }
@@ -394,8 +350,10 @@
 		return (res);
 
 	/* If everything is okay, we need to schedule a write */
-	if (size > 0 && (bufev->enabled & EV_WRITE))
-		sa_bufferevent_add(&bufev->ev_write, bufev->timeout_write);
+	if (size > 0 && (bufev->enabled & EV_WRITE)) {
+        bufev->del_write_event_set = 1;
+        notify();
+    }
 
 	return (res);
 }
@@ -405,9 +363,13 @@
 {
 	int res;
 
+    evbuffer_lock(buf);
+    
 	res = sa_bufferevent_write(bufev, buf->buffer, buf->off);
 	if (res != -1)
 		evbuffer_drain(buf, buf->off);
+    
+    evbuffer_unlock(buf);
 
 	return (res);
 }
@@ -416,6 +378,8 @@
 sa_bufferevent_read(struct sa_bufferevent *bufev, void *data, size_t size)
 {
 	struct evbuffer *buf = bufev->input;
+    
+    evbuffer_lock(buf);
 
 	if (buf->off < size)
 		size = buf->off;
@@ -425,6 +389,8 @@
 
 	if (size)
 		evbuffer_drain(buf, size);
+    
+    evbuffer_unlock(buf);
 
 	return (size);
 }
@@ -432,15 +398,6 @@
 int
 sa_bufferevent_enable(struct sa_bufferevent *bufev, short event)
 {
-	if (event & EV_READ) {
-		if (sa_bufferevent_add(&bufev->ev_read, bufev->timeout_read) == -1)
-			return (-1);
-	}
-	if (event & EV_WRITE) {
-		if (sa_bufferevent_add(&bufev->ev_write, bufev->timeout_write) == -1)
-			return (-1);
-	}
-
 	bufev->enabled |= event;
 	return (0);
 }
@@ -448,15 +405,6 @@
 int
 sa_bufferevent_disable(struct sa_bufferevent *bufev, short event)
 {
-	if (event & EV_READ) {
-		if (event_del(&bufev->ev_read) == -1)
-			return (-1);
-	}
-	if (event & EV_WRITE) {
-		if (event_del(&bufev->ev_write) == -1)
-			return (-1);
-	}
-
 	bufev->enabled &= ~event;
 	return (0);
 }
@@ -495,15 +443,18 @@
 	    0, EVBUFFER_LENGTH(bufev->input), bufev);
 }
 
+/*
+ * Because we are using only one socketpair, and hence one event, we can only
+ * set the event_base for *all* sa_bufferevents at once. I don't know enough
+ * about what event_base(s) do, or how people use bufferevent_base_set to know
+ * if this is important or not
+ */
+
 int
-sa_bufferevent_base_set(struct event_base *base, struct sa_bufferevent *bufev)
+sa_bufferevent_base_set(struct event_base *base)
 {
 	int res;
 
-	res = event_base_set(base, &bufev->ev_read);
-	if (res == -1)
-		return (res);
-
-	res = event_base_set(base, &bufev->ev_write);
+	res = event_base_set(base, &evbuffer_del_event);
 	return (res);
 }



More information about the tor-commits mailing list