[or-cvs] r10771: Added initial code for writer and completed local establishm (libevent-urz/trunk/loaders)

Urz at seul.org Urz at seul.org
Mon Jul 9 10:55:58 UTC 2007


Author: Urz
Date: 2007-07-09 06:55:57 -0400 (Mon, 09 Jul 2007)
New Revision: 10771

Modified:
   libevent-urz/trunk/loaders/IOCPloader.c
   libevent-urz/trunk/loaders/IOCPloader.h
Log:
Added initial code for writer and completed local establishment code.

Modified: libevent-urz/trunk/loaders/IOCPloader.c
===================================================================
--- libevent-urz/trunk/loaders/IOCPloader.c	2007-07-09 08:44:17 UTC (rev 10770)
+++ libevent-urz/trunk/loaders/IOCPloader.c	2007-07-09 10:55:57 UTC (rev 10771)
@@ -3,11 +3,12 @@
 
 #include "../event.h"
 #include "IOCPloader.h"
+#include "IOCPloader.h"
 
 #define NO_WORKERS 2
 
 HANDLE IOCP;
-HANDLE Threads[NO_WORKERS];
+HANDLE Threads[NO_WORKERS+1];
 WSAEVENT *eventList = NULL;
 connection *connList = NULL;
 DWORD listSize = 0;
@@ -36,7 +37,7 @@
  * initializing is partially done.
  */
 
-DWORD new_connection_obj(SOCKET *s, sa_bufferevent * bufevent) {
+DWORD IOCPloader_bind(SOCKET *s, sa_bufferevent * bufevent) {
     DWORD myListElem;
     
     // gain global list lock
@@ -54,17 +55,33 @@
     // get connection lock here
     connList[myListElem].localbuf = bufevent;
     
-    connList[myListElem].netbuf = malloc(sizeof(WSABUF));
-    connList[myListElem].netbuf->len = SUGGESTED_BUF_SIZE;
-    connList[myListElem].netbuf->buf = malloc(SUGGESTED_BUF_SIZE);
+    connList[myListElem].recvbuf = malloc(sizeof(WSABUF));
+    connList[myListElem].recvbuf->len = SUGGESTED_BUF_SIZE;
+    connList[myListElem].recvbuf->buf = malloc(SUGGESTED_BUF_SIZE);
     
+    connList[myListElem].sendbuf = malloc(sizeof(WSABUF));
+    connList[myListElem].sendbuf->len = SUGGESTED_BUF_SIZE;
+    connList[myListElem].sendbuf->buf = malloc(SUGGESTED_BUF_SIZE);
+    
     connList[myListElem].sock = s;
     
-    connList[myListElem].ol = calloc(sizeof(WSAOVERLAPPED));
-    connList[myListElem].ol->hEvent = (HANDLE) eventList[myListElem];
+    connList[myListElem].recvol = calloc(sizeof(OLOPERATION));
+    connList[myListElem].recvol.op = OP_RECV;
+    connList[myListElem].recvol.connIndex = myListElem;
+    connList[myListElem].recvol.ol = calloc(sizeof(WSAOVERLAPPED));
+    connList[myListElem].recvol.ol->hEvent = (HANDLE) eventList[myListElem];
+    
+    
+    connList[myListElem].sendol = calloc(sizeof(OLOPERATION));
+    connList[myListElem].sendol.op = OP_SEND;
+    connList[myListElem].sendol.connIndex = myListElem;
+    connList[myListElem].sendol.ol = calloc(sizeof(WSAOVERLAPPED));
     // free conneciton lock here
     
     // relase global list lock
+    
+    // Despite the name, this call associates the socket with the I/O Completion port
+    IOCP = CreateIoCompletionPort(connList[myListElem].sock, IOCP, (ULONG_PTR) myListElem, 0);
 }
 
 DWORD WINAPI IOCPLoaderMain(LPVOID nodata) {
@@ -89,85 +106,136 @@
     }
     // This concludes the code modified from the Startup function()
 
+    Threads[NO_WORKERS] = CreateThread(NULL, 0, &iocp_writer_thread, NULL, 0, NULL);
 }
 
+UINT iocp_writer_thread(LPVOID pParam) {
+    DWORD listpos;
+    size_t unloaded;
+    DWORD WSASendFags = 0;
+    while(1) {
+        for(listpos = 0; listpos < listSize; listpos++) {
+            // lock connList[listpos]
+            if(connList[listpos].canSend) {
+                // grab data that needs to be sent
+                unloaded = sa_bufferevent_unload(connList[listpos].localbuf,
+                        connList[listpos].sendbuf->buf, SUGGESTED_BUF_SIZE);
+                if(unloaded == 0) {
+                    // no data ready for sending
+                    // unlock connList[listpos]
+                    continue;
+                }
+                // TODO: How does WSASend know the amount of available data? Does it
+                // assume the WSABUF is full?
+                // remind us that sending is in progress and we can't overwrite the buffer
+                connList[listpos].canSend = 0;
+                WSASend(
+                    connList[listpos].sock,
+                    // Socket to send on
+                    connList[listpos].sendbuf,
+                    // 'array' of WSA buffers to use
+                    1,
+                    // array of size 1
+                    NULL,
+                    // this socket is overlapped, so can't retrieve sent size immeditately
+                    &WSASendFlags,
+                    // any send flags
+                    &connList[listpos].sendol,
+                    // overlapped struct for the operation.
+                    NULL
+                    // no completion routine
+                );
+            }
+            // unlock connList[listpos]
+        }
+    }
+}
+
 UINT iocp_worker_thread(LPVOID pParam) {
-    // From IOWorkerThreadProc
-    LPOVERLAPPED CompletedOverlapped;
+    OLOPERATION CompletedOverlapped;
     // See http://msdn2.microsoft.com/en-us/library/ms684342.aspx
-    DWORD WaitRet;
-    ULONG *CompletionKey;
+    DWORD CompleteSize;
+    DWORD CompletionKey;
     BOOL GQCSRet
     connection *Conn;
     UINT WSARecvRet;
     ULONG WSARecvFlags = MSG_WAITALL;
 
     while(1) {
-    
-        WaitRet = WaitForMultipleObjects( 
-            listSize,   
-            // number of socket events in array
-            eventList,  
-            // array of socket read event handles
-            FALSE,          
-            // wait until any one event happens
+        Conn = NULL;
+        
+        GQCSRet = GetQueuedCompletionStatus(
+            IOCP, // The IOCP to get statuses from
+            &CompleteSize, // "... receives the number of bytes transferred 
+                            //during an I/O operation that has completed."
+            (ULONG_PTR) &CompletionKey,
+            // A pointer to a variable that receives the completion key value 
+            // associated with the file handle whose I/O operation has completed.
+            // In IOCPS.cpp : (LPDWORD) &lpClientContext,
+            &CompletedOverlapped
+            // A pointer to a variable that receives the address of the OVERLAPPED
+            // structure that was specified when the completed I/O operation was started.
             INFINITE
-            // wait forever
+            // The number of milliseconds that the caller is willing to wait for a
+            // completion packet to appear at the completion port.
             );
-        
-        if(WaitRet > listSize) {
-            // An error happened.
+        if(!GQCSRet) {
+            printf("GetQueuedCompletionStatus error %d\n", GetLastError());
+            continue;
         }
         
+        
         // This is no error, a completed read event has occured.
-        // Get the connection ths applies to. See semantics of
-        // return value for WaitForMultipleObjects
-        // (http://msdn2.microsoft.com/en-us/library/ms687025.aspx)
-        // to understand why this works.
-        Conn = connList + WaitRet;
+        // Get the connection ths applies to.
+        Conn = &connList[CompletionKey];
         
-        // Now we have a completed read event, perform the operations
-        // required on the read data.
-        ReadComplete(Conn);
-        
-        
-        // Now the data has been removed from the WSABUF, we can reset the
-        // read to continue reading.
-        // http://msdn2.microsoft.com/en-us/library/ms741688.aspx
-        WSARecvRet = WSARecv(
-            Conn->sock,
-            // The socket to recieve from
-            Conn->netbuf,
-            // Pointer to an 'array' of WSABUFs.
-            1,
-            // The 'array' is of size 1
-            NULL, 
-            // This parameter would recieve the size in bytes of the
-            // read. However, because we are using overlapped, it doesn't.
-            &WSARecvFlags,
-            // Flags which control the operation of WSARecv. I belive MSG_WAITALL
-            // is the one we want.
-            Conn->ol, 
-            // The overlapped structure for the event.
-			NULL
-            // The callback - we are using events for this, so that's not important
-            );
-
+        // lock Conn
+        if(CompletedOverlapped.op == OP_SEND) {
+            // the connection has sent all data and is ready for more
+            Conn->canSend = 1;
+        } else if(CompletedOverlapped.op == OP_RECV) {
+            // Now we have a completed read event, perform the operations
+            // required on the read data.
+            ReadComplete(Conn, CompleteSize);
+            
+            // Now the data has been removed from the WSABUF, we can reset the
+            // read to continue reading.
+            // http://msdn2.microsoft.com/en-us/library/ms741688.aspx
+            WSARecvRet = WSARecv(
+                Conn->sock,
+                // The socket to recieve from
+                Conn->recvbuf,
+                // Pointer to an 'array' of WSABUFs.
+                1,
+                // The 'array' is of size 1
+                NULL, 
+                // This parameter would recieve the size in bytes of the
+                // read. However, because we are using overlapped, it doesn't.
+                &WSARecvFlags,
+                // Flags which control the operation of WSARecv. I belive MSG_WAITALL
+                // is the one we want.
+                &Conn->recvol, 
+                // The overlapped structure for the event.
+    			NULL
+                // The callback - we are using events for this, so that's not important
+                );
+        }
+        // unlock Conn
     }
 }
 
-void ReadComplete(connection *Conn) {
+void ReadComplete(connection *Conn, DWORD size) {
     BOOL GORRet;
-    DWORD TransferSize;
+    //DWORD TransferSize;
     DWORD Flags;
     size_t loadRet, toload, loaded;
     char *upto;
     
     
-    GORRet = WSAGetOverlappedResult(
+/*     GORRet = WSAGetOverlappedResult(
         Conn->sock,
         // The socket to get the read size from
-        Conn->ol,
+        &Conn->recvol,
         // The overlapped data structure relating to the read event
         &TransferSize,
         // Gets the size of the transfer
@@ -176,10 +244,10 @@
         // is completed (we waited for it) we don't really care.
         &Flags
         // Gets the flags for the operation
-        );
+        ); */
         
-    upto = Conn->netbuf.buf;
-    toload = (size_t) TransferSize;
+    upto = Conn->recvbuf.buf;
+    toload = (size_t) size // TransferSize;
     
     while(toload > 0) {
         loaded = sa_bufferevent_load(Conn, upto, toload);

Modified: libevent-urz/trunk/loaders/IOCPloader.h
===================================================================
--- libevent-urz/trunk/loaders/IOCPloader.h	2007-07-09 08:44:17 UTC (rev 10770)
+++ libevent-urz/trunk/loaders/IOCPloader.h	2007-07-09 10:55:57 UTC (rev 10771)
@@ -4,10 +4,26 @@
 
 #define SUGGESTED_BUF_SIZE 4096
 
+#define OP_SEND 1
+#define OP_RECV 2
+
 typedef struct {
+    /* 
+    This struct must maintain compatiblity with the OVERLAPPED struct.
+    As such, the first element must be an OVERLAPPED struct
+    */
+    OVERLAPPED *ol;
+    char op;
+    DWORD connIndex;
+} OLOPERATION;
+
+typedef struct {
     sa_bufferevent *localbuf;
-    WSABUF *netbuf;
+    WSABUF *recvbuf;
+    WSABUF *sendbuf;
     SOCKET sock;
-    OVERLAPPED *ol;
+    OLOPERATION recvol;
+    OLOPERATION sendol;
     // mutex lock
+    int canSend;
 } connection;
\ No newline at end of file



More information about the tor-commits mailing list