[or-cvs] Move most of the queue and thread logic into a new common base class in TorCtl

Nick Mathewson nickm at seul.org
Sat Nov 19 19:42:33 UTC 2005


Update of /home/or/cvsroot/control/python
In directory moria:/tmp/cvs-serv4235

Modified Files:
	TorCtl.py TorCtl0.py TorCtl1.py 
Log Message:
Move most of the queue and thread logic into a new common base class in TorCtl; this means that v0 control connections are smart too.

Index: TorCtl.py
===================================================================
RCS file: /home/or/cvsroot/control/python/TorCtl.py,v
retrieving revision 1.12
retrieving revision 1.13
diff -u -d -r1.12 -r1.13
--- TorCtl.py	12 Nov 2005 21:42:27 -0000	1.12
+++ TorCtl.py	19 Nov 2005 19:42:31 -0000	1.13
@@ -11,6 +11,8 @@
 import re
 import struct
 import sys
+import threading
+import Queue
 
 class TorCtlError(Exception):
     "Generic error raised by TorControl code."
@@ -216,6 +218,165 @@
         """
         raise NotImplemented
 
+class _ConnectionBase:
+    def __init__(self):
+        self._s = None
+        self._handler = None
+        self._handleFn = None
+        self._sendLock = threading.RLock()
+        self._queue = Queue.Queue()
+        self._thread = None
+        self._closedEx = None
+        self._closed = 0
+        self._closeHandler = None
+        self._eventThread = None
+        self._eventQueue = Queue.Queue()
+
+    def set_event_handler(self, handler):
+        """Cause future events from the Tor process to be sent to 'handler'.
+        """
+        raise NotImplemented
+
+    def set_close_handler(self, handler):
+        """Call 'handler' when the Tor process has closed its connection or
+           given us an exception.  If we close normally, no arguments are
+           provided; otherwise, it will be called with an exception as its
+           argument.
+        """
+        self._closeHandler = handler
+
+    def close(self):
+        """Shut down this controller connection"""
+        self._sendLock.acquire()
+        try:
+            self._queue.put("CLOSE")
+            self._eventQueue.put("CLOSE")
+            self._s.close()
+            self._s = None
+            self._closed = 1
+        finally:
+            self._sendLock.release()
+
+    def _read_reply(self):
+        """DOCDOC"""
+        raise NotImplementd
+
+    def launch_thread(self, daemon=1):
+        """Launch a background thread to handle messages from the Tor process."""
+        assert self._thread is None
+        t = threading.Thread(target=self._loop)
+        if daemon:
+            t.setDaemon(daemon)
+        t.start()
+        self._thread = t
+        t = threading.Thread(target=self._eventLoop)
+        if daemon:
+            t.setDaemon(daemon)
+        t.start()
+        self._eventThread = t
+        return self._thread
+
+    def _loop(self):
+        """Main subthread loop: Read commands from Tor, and handle them either
+           as events or as responses to other commands.
+        """
+        while 1:
+            ex = None
+            try:
+                isEvent, reply = self._read_reply()
+            except:
+                self._err(sys.exc_info())
+                return
+
+            if isEvent:
+                if self._handler is not None:
+                    self._eventQueue.put(reply)
+            else:
+                cb = self._queue.get()
+                cb(reply)
+
+    def _err(self, (tp, ex, tb), fromEventLoop=0):
+        """DOCDOC"""
+        if self._s:
+            try:
+                self.close()
+            except:
+                pass
+        self._sendLock.acquire()
+        try:
+            self._closedEx = ex
+            self._closed = 1
+        finally:
+            self._sendLock.release()
+        while 1:
+            try:
+                cb = self._queue.get(timeout=0)
+                if cb != "CLOSE":
+                    cb("EXCEPTION")
+            except Queue.Empty:
+                break
+        if self._closeHandler is not None:
+            self._closeHandler(ex)
+        return
+
+    def _eventLoop(self):
+        """DOCDOC"""
+        while 1:
+            reply = self._eventQueue.get()
+            if reply == "CLOSE":
+                return
+            try:
+                self._handleFn(reply)
+            except:
+                self._err(sys.exc_info(), 1)
+                return
+
+    def _sendImpl(self, sendFn, msg):
+        """DOCDOC"""
+        if self._thread is None:
+            self.launch_thread(1)
+        # This condition will get notified when we've got a result...
+        condition = threading.Condition()
+        # Here's where the result goes...
+        result = []
+
+        if self._closedEx is not None:
+            raise self._closedEx
+        elif self._closed:
+            raise TorCtl.TorCtlClosed()
+
+        def cb(reply,condition=condition,result=result):
+            condition.acquire()
+            try:
+                result.append(reply)
+                condition.notify()
+            finally:
+                condition.release()
+
+        # Sends a message to Tor...
+        self._sendLock.acquire()
+        try:
+            self._queue.put(cb)
+            sendFn(msg)
+        finally:
+            self._sendLock.release()
+
+        # Now wait till the answer is in...
+        condition.acquire()
+        try:
+            while not result:
+                condition.wait()
+        finally:
+            condition.release()
+
+        # ...And handle the answer appropriately.
+        assert len(result) == 1
+        reply = result[0]
+        if reply == "EXCEPTION":
+            raise self._closedEx
+
+        return reply
+
 class DebugEventHandler(EventHandler):
     """Trivial event handler: dumps all events to stdout."""
     def __init__(self, out=None):

Index: TorCtl0.py
===================================================================
RCS file: /home/or/cvsroot/control/python/TorCtl0.py,v
retrieving revision 1.6
retrieving revision 1.7
diff -u -d -r1.6 -r1.7
--- TorCtl0.py	12 Nov 2005 21:42:27 -0000	1.6
+++ TorCtl0.py	19 Nov 2005 19:42:31 -0000	1.7
@@ -13,8 +13,7 @@
 import socket
 import struct
 import sys
-import threading
-import Queue
+import TorCtl
 
 __all__ = [
     "MSG_TYPE", "EVENT_TYPE", "CIRC_STATUS", "STREAM_STATUS",
@@ -270,54 +269,29 @@
     else:
         return s
 
-class Connection:
+class Connection(TorCtl._ConnectionBase):
     """A Connection represents a connection to the Tor process."""
     def __init__(self, sock):
         """Create a Connection to communicate with the Tor process over the
            socket 'sock'.
         """
+        TorCtl._ConnectionBase.__init__(self)
         self._s = sock
-        self._handler = None
-        self._sendLock = threading.RLock()
-        self._queue = Queue.Queue()
-        self._thread = None
 
     def set_event_handler(self, handler):
         """Cause future events from the Tor process to be sent to 'handler'.
         """
         self._handler = handler
+        self._handleFn = handler.handle0
 
-    def launch_thread(self, daemon=1):
-        """Launch a background thread to handle messages from the Tor process."""
-        assert self._thread is None
-        t = threading.Thread(target=self._loop)
-        if daemon:
-            t.setDaemon(daemon)
-        t.start()
-        self._thread = t
-        return t
-
-    def _send(self, type, body=""):
+    def _doSend(self, (type, body)):
         """Helper: Deliver a command of type 'type' and body 'body' to Tor.
         """
         self._s.sendall(pack_message(type, body))
 
-    def _loop(self):
-        """Main subthread loop: Read commands from Tor, and handle them either
-           as events or as responses to other commands.
-        """
-        while 1:
-            try:
-                length, tp, body = _receive_message(self._s)
-            except (OSError, socket.error), _:
-                if self._queue.get(timeout=0) != "CLOSE":
-                    raise
-            if tp == MSG_TYPE.EVENT:
-                if self._handler is not None:
-                    self._handler.handle0(body)
-            else:
-                cb = self._queue.get()
-                cb(tp, body)
+    def _read_reply(self):
+        length, tp, body = _receive_message(self._s)
+        return (tp == MSG_TYPE.EVENT), (tp, body)
 
     def _sendAndRecv(self, tp, msg="", expectedTypes=(MSG_TYPE.DONE,)):
         """Helper: Send a command of type 'tp' and body 'msg' to Tor,
@@ -325,40 +299,8 @@
            in expectedTypes, return a (tp,body) tuple.  If it is an error,
            raise ErrorReply.  Otherwise, raise ProtocolError.
         """
-        if self._thread is None:
-            self.launch_thread(1)
-        # This condition will get notified when we've got a result...
-        condition = threading.Condition()
-        # Here's where the result goes...
-        result = []
-
-        def cb(tp,body,condition=condition,result=result):
-            condition.acquire()
-            try:
-                result.append((tp, body))
-                condition.notify()
-            finally:
-                condition.release()
 
-        # Sends a message to Tor...
-        self._sendLock.acquire()
-        try:
-            self._queue.put(cb)
-            self._send(tp, msg)
-        finally:
-            self._sendLock.release()
-
-        # Now wait till the answer is in...
-        condition.acquire()
-        try:
-            while not result:
-                condition.wait()
-        finally:
-            condition.release()
-
-        # ...And handle the answer appropriately.
-        assert len(result) == 1
-        tp, msg = result[0]
+        tp, msg = self.sendImpl(self._doSend, (tp, msg))
         if tp in expectedTypes:
             return tp, msg
         elif tp == MSG_TYPE.ERROR:
@@ -371,15 +313,6 @@
         else:
             raise ProtocolError("Unexpectd message type 0x%04x"%tp)
 
-    def close(self):
-        """Shut down this controller connection"""
-        self._sendLock.acquire()
-        try:
-            self._queue.put("CLOSE")
-            self._s.close()
-        finally:
-            self._sendLock.release()
-
     def authenticate(self, secret=""):
         """Send an authenticating secret to Tor.  You'll need to call
            this method before other commands.  You need to use a
@@ -511,4 +444,3 @@
     def post_descriptor(self, descriptor):
         """Tell Tor about a new descriptor in 'descriptor'."""
         self._sendAndRecv(MSG_TYPE.POSTDESCRIPTOR,descriptor)
-

Index: TorCtl1.py
===================================================================
RCS file: /home/or/cvsroot/control/python/TorCtl1.py,v
retrieving revision 1.21
retrieving revision 1.22
diff -u -d -r1.21 -r1.22
--- TorCtl1.py	18 Nov 2005 20:12:42 -0000	1.21
+++ TorCtl1.py	19 Nov 2005 19:42:31 -0000	1.22
@@ -8,9 +8,7 @@
 import re
 import socket
 import sys
-import threading
 import types
-import Queue
 import TorCtl
 
 def _quote(s):
@@ -109,23 +107,15 @@
                 more.append(line)
             lines.append((code, s, _unescape_dots("".join(more))))
 
-class Connection:
+class Connection(TorCtl._ConnectionBase):
     """A Connection represents a connection to the Tor process."""
     def __init__(self, sock):
         """Create a Connection to communicate with the Tor process over the
            socket 'sock'.
         """
+        TorCtl._ConnectionBase.__init__(self)
         self._s = _BufSock(sock)
         self._debugFile = None
-        self._handler = None
-        self._sendLock = threading.RLock()
-        self._queue = Queue.Queue()
-        self._thread = None
-        self._closedEx = None
-        self._closed = 0
-        self._closeHandler = None
-        self._eventThread = None
-        self._eventQueue = Queue.Queue()
 
     def debug(self, f):
         """DOCDOC"""
@@ -135,97 +125,17 @@
         """Cause future events from the Tor process to be sent to 'handler'.
         """
         self._handler = handler
+        self._handleFn = handler.handle1
 
-    def set_close_handler(self, handler):
-        """Call 'handler' when the Tor process has closed its connection or
-           given us an exception.  If we close normally, no arguments are
-           provided; otherwise, it will be called with an exception as its
-           argument.
-        """
-        self._closeHandler = handler
-
-    def launch_thread(self, daemon=1):
-        """Launch a background thread to handle messages from the Tor process."""
-        assert self._thread is None
-        t = threading.Thread(target=self._loop)
-        if daemon:
-            t.setDaemon(daemon)
-        t.start()
-        self._thread = t
-        t = threading.Thread(target=self._eventLoop)
-        if daemon:
-            t.setDaemon(daemon)
-        t.start()
-        self._eventThread = t
-        return self._thread
-
-    def close(self):
-        """Shut down this controller connection"""
-        self._sendLock.acquire()
-        try:
-            self._queue.put("CLOSE")
-            self._eventQueue.put("CLOSE")
-            self._s.close()
-            self._s = None
-            self._closed = 1
-        finally:
-            self._sendLock.release()
-
-    def _loop(self):
-        """Main subthread loop: Read commands from Tor, and handle them either
-           as events or as responses to other commands.
-        """
-        while 1:
-            ex = None
-            try:
-                lines = _read_reply(self._s,self._debugFile)
-            except:
-                self._err(sys.exc_info())
-                return
-
-            assert lines
-            if lines[0][0][0] == "6":
-                if self._handler is not None:
-                    self._eventQueue.put(lines)
-            else:
-                cb = self._queue.get()
-                cb(lines)
-
-
-    def _err(self, (tp, ex, tb), fromEventLoop=0):
-        """DOCDOC"""
-        if self._s:
-            try:
-                self.close()
-            except:
-                pass
-        self._sendLock.acquire()
-        try:
-            self._closedEx = ex
-            self._closed = 1
-        finally:
-            self._sendLock.release()
-        while 1:
-            try:
-                cb = self._queue.get(timeout=0)
-                if cb != "CLOSE":
-                    cb("EXCEPTION")
-            except Queue.Empty:
-                break
-        if self._closeHandler is not None:
-            self._closeHandler(ex)
-        return
+    def _read_reply(self):
+        lines = _read_reply(self._s, self._debugFile)
+        isEvent = (lines and lines[0][0][0] == '6')
+        return isEvent, lines
 
-    def _eventLoop(self):
-        while 1:
-            lines = self._eventQueue.get()
-            if lines == "CLOSE":
-                return
-            try:
-                self._handler.handle1(lines)
-            except:
-                self._err(sys.exc_info(), 1)
-                return
+    def _doSend(self, msg):
+        if self._debugFile:
+            self._debugFile.write(">>> %s" % msg)
+        self._s.write(msg)
 
     def _sendAndRecv(self, msg="", expectedTypes=("250", "251")):
         """Helper: Send a command 'msg' to Tor, and wait for a command
@@ -233,54 +143,12 @@
            return a list of (tp,body,extra) tuples.  If it is an
            error, raise ErrorReply.  Otherwise, raise TorCtl.ProtocolError.
         """
-        if self._thread is None:
-            self.launch_thread(1)
-        # This condition will get notified when we've got a result...
-        condition = threading.Condition()
-        # Here's where the result goes...
-        result = []
-
-        if self._closedEx is not None:
-            raise self._closedEx
-        elif self._closed:
-            raise TorCtl.TorCtlClosed()
-
-        def cb(lines,condition=condition,result=result):
-            condition.acquire()
-            try:
-                result.append((lines))
-                condition.notify()
-            finally:
-                condition.release()
-
         if type(msg) == types.ListType:
             msg = "".join(msg)
-
         assert msg.endswith("\r\n")
 
-        # Sends a message to Tor...
-        self._sendLock.acquire()
-        try:
-            self._queue.put(cb)
-            if self._debugFile:
-                self._debugFile.write(">>> %s" % msg)
-            self._s.write(msg)
-        finally:
-            self._sendLock.release()
-
-        # Now wait till the answer is in...
-        condition.acquire()
-        try:
-            while not result:
-                condition.wait()
-        finally:
-            condition.release()
-
-        # ...And handle the answer appropriately.
-        assert len(result) == 1
-        lines = result[0]
-        if lines == "EXCEPTION":
-            raise self._closedEx
+        lines = self._sendImpl(self._doSend, msg)
+        print lines
         for tp, msg, _ in lines:
             if tp[0] in '45':
                 raise TorCtl.ErrorReply("%s %s"%(tp, msg))



More information about the tor-commits mailing list