[tor-commits] [flashproxy/js] Do WebSocket linking of locals and remotes.

dcf at torproject.org dcf at torproject.org
Fri Mar 30 02:18:11 UTC 2012


commit aae557431a0b1bbdb2d60d92aeaf98727317a35c
Author: David Fifield <david at bamsoftware.com>
Date:   Thu Mar 29 18:40:13 2012 -0700

    Do WebSocket linking of locals and remotes.
---
 connector.py |  144 ++++++++++++++++++++++++++++++++++++---------------------
 1 files changed, 91 insertions(+), 53 deletions(-)

diff --git a/connector.py b/connector.py
index ec30401..07f0f3e 100755
--- a/connector.py
+++ b/connector.py
@@ -111,20 +111,6 @@ def format_addr(addr):
         return u"%s:%d" % (host, port)
 
 
-class BufferSocket(object):
-    """A socket containing a time of creation and a buffer of data received. The
-    buffer stores data to make the socket selectable again."""
-    def __init__(self, fd):
-        self.fd = fd
-        self.birthday = time.time()
-        self.buf = ""
-
-    def __getattr__(self, name):
-        return getattr(self.fd, name)
-
-    def is_expired(self, timeout):
-        return time.time() - self.birthday > timeout
-
 
 def apply_mask(payload, mask_key):
     result = []
@@ -545,21 +531,36 @@ def report_pending():
     log(u"locals  (%d): %s" % (len(locals), [format_peername(x) for x in locals]))
     log(u"remotes (%d): %s" % (len(remotes), [format_peername(x) for x in remotes]))
 
+def proxy_chunk_local_to_remote(local, remote):
+    try:
+        data = local.recv(65536)
+    except socket.error, e: # Can be "Connection reset by peer".
+        log(u"Socket error from local: %s" % repr(str(e)))
+        remote.close()
+        return False
+    if not data:
+        log(u"EOF from local %s." % format_peername(local))
+        local.close()
+        remote.close()
+        return False
+    else:
+        remote.send_chunk(data)
+        return True
 
-def proxy_chunk(fd_r, fd_w, label):
+def proxy_chunk_remote_to_local(remote, local):
     try:
-        data = fd_r.recv(65536)
+        data = remote.recv(65536)
     except socket.error, e: # Can be "Connection reset by peer".
-        log(u"Socket error from %s: %s" % (label, repr(str(e))))
-        fd_w.close()
+        log(u"Socket error from remote: %s" % repr(str(e)))
+        local.close()
         return False
     if not data:
-        log(u"EOF from %s %s." % (label, format_peername(fd_r)))
-        fd_r.close()
-        fd_w.close()
+        log(u"EOF from remote %s." % format_peername(remote))
+        remote.close()
+        local.close()
         return False
     else:
-        fd_w.sendall(data)
+        local.send_chunk(data)
         return True
 
 def receive_unlinked(fd, label):
@@ -587,28 +588,59 @@ def receive_unlinked(fd, label):
         return True
 
 def match_proxies():
-    while locals and remotes:
-        remote = remotes.pop(0)
-        local = locals.pop(0)
+    while unlinked_remotes and unlinked_locals:
+        remote = unlinked_remotes.pop(0)
+        local = unlinked_locals.pop(0)
         remote_addr, remote_port = remote.getpeername()
         local_addr, local_port = local.getpeername()
         log(u"Linking %s and %s." % (format_peername(local), format_peername(remote)))
+        remote.partner = local
+        local.partner = remote
         if local.buf:
-            remote.sendall(local.buf)
+            remote.send_chunk(local.buf)
         if remote.buf:
-            local.sendall(remote.buf)
-        remote_for[local.fd] = remote.fd
-        local_for[remote.fd] = local.fd
+            local.send_chunk(remote.buf)
+
+class RemoteSocket(object):
+    def __init__(self, fd, protocols):
+        self.fd = fd
+        self.buf = ""
+        self.partner = None
+        self.dec = WebSocketBinaryDecoder(protocols, use_mask = True)
+        self.enc = WebSocketBinaryEncoder(protocols, use_mask = False)
+
+    def send_chunk(self, data):
+        self.sendall(self.enc.encode(data))
+
+    def __getattr__(self, name):
+        return getattr(self.fd, name)
+
+class LocalSocket(object):
+    def __init__(self, fd):
+        self.fd = fd
+        self.buf = ""
+        self.partner = None
+
+    def send_chunk(self, data):
+        self.partner.dec.feed(data)
+        while True:
+            data = self.partner.dec.read()
+            if not data:
+                break
+            self.sendall(data)
+
+    def __getattr__(self, name):
+        return getattr(self.fd, name)
 
 def main():
     while True:
-        rset = [remote_s, local_s] + websocket_pending + socks_pending + remote_for.keys() + local_for.keys() + locals + remotes
+        rset = [remote_s, local_s] + websocket_pending + socks_pending + locals + remotes
         rset, _, _ = select.select(rset, [], [])
         for fd in rset:
             if fd == remote_s:
                 remote_c, addr = fd.accept()
                 log(u"Remote connection from %s." % format_addr(addr))
-                websocket_pending.append(BufferSocket(remote_c))
+                websocket_pending.append(remote_c)
             elif fd == local_s:
                 local_c, addr = fd.accept()
                 log(u"Local connection from %s." % format_addr(addr))
@@ -617,7 +649,9 @@ def main():
                 log(u"Data from WebSocket-pending %s." % format_addr(addr))
                 protocols = handle_websocket_request(fd)
                 if protocols is not None:
-                    remotes.append(fd)
+                    wrapped = RemoteSocket(fd, protocols)
+                    remotes.append(wrapped)
+                    unlinked_remotes.append(wrapped)
                 else:
                     fd.close()
                 websocket_pending.remove(fd)
@@ -625,29 +659,33 @@ def main():
             elif fd in socks_pending:
                 log(u"SOCKS request from %s." % format_addr(addr))
                 if handle_socks_request(fd):
-                    locals.append(BufferSocket(fd))
+                    wrapped = LocalSocket(fd)
+                    locals.append(wrapped)
+                    unlinked_locals.append(wrapped)
                 else:
                     fd.close()
                 socks_pending.remove(fd)
                 report_pending()
-            elif fd in local_for:
-                local = local_for[fd]
-                if not proxy_chunk(fd, local, "remote"):
-                    del local_for[fd]
-                    del remote_for[local]
-            elif fd in remote_for:
-                remote = remote_for[fd]
-                if not proxy_chunk(fd, remote, "local"):
-                    del remote_for[fd]
-                    del local_for[remote]
-            elif fd in locals:
-                if not receive_unlinked(fd, "local"):
-                    locals.remove(fd)
-                report_pending()
             elif fd in remotes:
-                if not receive_unlinked(fd, "remote"):
-                    remotes.remove(fd)
+                local = fd.partner
+                if local:
+                    if not proxy_chunk_remote_to_local(fd, local):
+                        remotes.remove(fd)
+                        locals.remove(local)
+                else:
+                    if not receive_unlinked(fd, "remote"):
+                        remotes.remove(fd)
                 report_pending()
+            elif fd in locals:
+                remote = fd.partner
+                if remote:
+                    if not proxy_chunk_local_to_remote(fd, remote):
+                        remotes.remove(remote)
+                        locals.remove(fd)
+                else:
+                    if not receive_unlinked(fd, "local"):
+                        locals.remove(fd)
+                    report_pending()
             match_proxies()
 
 if __name__ == "__main__":
@@ -692,14 +730,14 @@ if __name__ == "__main__":
     websocket_pending = []
     # Remote connection sockets.
     remotes = []
+    # Remotes not yet linked with a local. This is a subset of remotes.
+    unlinked_remotes = []
     # New local sockets waiting to finish their SOCKS negotiation.
     socks_pending = []
     # Local Tor sockets, after SOCKS negotiation.
     locals = []
-
-    # Bidirectional mapping between local sockets and remote sockets.
-    local_for = {}
-    remote_for = {}
+    # Locals not yet linked with a remote. This is a subset of locals.
+    unlinked_locals = []
 
     if options.daemonize:
         log(u"Daemonizing.")





More information about the tor-commits mailing list