commit aae557431a0b1bbdb2d60d92aeaf98727317a35c Author: David Fifield david@bamsoftware.com Date: Thu Mar 29 18:40:13 2012 -0700
Do WebSocket linking of locals and remotes. --- connector.py | 144 ++++++++++++++++++++++++++++++++++++--------------------- 1 files changed, 91 insertions(+), 53 deletions(-)
diff --git a/connector.py b/connector.py index ec30401..07f0f3e 100755 --- a/connector.py +++ b/connector.py @@ -111,20 +111,6 @@ def format_addr(addr): return u"%s:%d" % (host, port)
-class BufferSocket(object): - """A socket containing a time of creation and a buffer of data received. The - buffer stores data to make the socket selectable again.""" - def __init__(self, fd): - self.fd = fd - self.birthday = time.time() - self.buf = "" - - def __getattr__(self, name): - return getattr(self.fd, name) - - def is_expired(self, timeout): - return time.time() - self.birthday > timeout -
def apply_mask(payload, mask_key): result = [] @@ -545,21 +531,36 @@ def report_pending(): log(u"locals (%d): %s" % (len(locals), [format_peername(x) for x in locals])) log(u"remotes (%d): %s" % (len(remotes), [format_peername(x) for x in remotes]))
+def proxy_chunk_local_to_remote(local, remote): + try: + data = local.recv(65536) + except socket.error, e: # Can be "Connection reset by peer". + log(u"Socket error from local: %s" % repr(str(e))) + remote.close() + return False + if not data: + log(u"EOF from local %s." % format_peername(local)) + local.close() + remote.close() + return False + else: + remote.send_chunk(data) + return True
-def proxy_chunk(fd_r, fd_w, label): +def proxy_chunk_remote_to_local(remote, local): try: - data = fd_r.recv(65536) + data = remote.recv(65536) except socket.error, e: # Can be "Connection reset by peer". - log(u"Socket error from %s: %s" % (label, repr(str(e)))) - fd_w.close() + log(u"Socket error from remote: %s" % repr(str(e))) + local.close() return False if not data: - log(u"EOF from %s %s." % (label, format_peername(fd_r))) - fd_r.close() - fd_w.close() + log(u"EOF from remote %s." % format_peername(remote)) + remote.close() + local.close() return False else: - fd_w.sendall(data) + local.send_chunk(data) return True
def receive_unlinked(fd, label): @@ -587,28 +588,59 @@ def receive_unlinked(fd, label): return True
def match_proxies(): - while locals and remotes: - remote = remotes.pop(0) - local = locals.pop(0) + while unlinked_remotes and unlinked_locals: + remote = unlinked_remotes.pop(0) + local = unlinked_locals.pop(0) remote_addr, remote_port = remote.getpeername() local_addr, local_port = local.getpeername() log(u"Linking %s and %s." % (format_peername(local), format_peername(remote))) + remote.partner = local + local.partner = remote if local.buf: - remote.sendall(local.buf) + remote.send_chunk(local.buf) if remote.buf: - local.sendall(remote.buf) - remote_for[local.fd] = remote.fd - local_for[remote.fd] = local.fd + local.send_chunk(remote.buf) + +class RemoteSocket(object): + def __init__(self, fd, protocols): + self.fd = fd + self.buf = "" + self.partner = None + self.dec = WebSocketBinaryDecoder(protocols, use_mask = True) + self.enc = WebSocketBinaryEncoder(protocols, use_mask = False) + + def send_chunk(self, data): + self.sendall(self.enc.encode(data)) + + def __getattr__(self, name): + return getattr(self.fd, name) + +class LocalSocket(object): + def __init__(self, fd): + self.fd = fd + self.buf = "" + self.partner = None + + def send_chunk(self, data): + self.partner.dec.feed(data) + while True: + data = self.partner.dec.read() + if not data: + break + self.sendall(data) + + def __getattr__(self, name): + return getattr(self.fd, name)
def main(): while True: - rset = [remote_s, local_s] + websocket_pending + socks_pending + remote_for.keys() + local_for.keys() + locals + remotes + rset = [remote_s, local_s] + websocket_pending + socks_pending + locals + remotes rset, _, _ = select.select(rset, [], []) for fd in rset: if fd == remote_s: remote_c, addr = fd.accept() log(u"Remote connection from %s." % format_addr(addr)) - websocket_pending.append(BufferSocket(remote_c)) + websocket_pending.append(remote_c) elif fd == local_s: local_c, addr = fd.accept() log(u"Local connection from %s." % format_addr(addr)) @@ -617,7 +649,9 @@ def main(): log(u"Data from WebSocket-pending %s." % format_addr(addr)) protocols = handle_websocket_request(fd) if protocols is not None: - remotes.append(fd) + wrapped = RemoteSocket(fd, protocols) + remotes.append(wrapped) + unlinked_remotes.append(wrapped) else: fd.close() websocket_pending.remove(fd) @@ -625,29 +659,33 @@ def main(): elif fd in socks_pending: log(u"SOCKS request from %s." % format_addr(addr)) if handle_socks_request(fd): - locals.append(BufferSocket(fd)) + wrapped = LocalSocket(fd) + locals.append(wrapped) + unlinked_locals.append(wrapped) else: fd.close() socks_pending.remove(fd) report_pending() - elif fd in local_for: - local = local_for[fd] - if not proxy_chunk(fd, local, "remote"): - del local_for[fd] - del remote_for[local] - elif fd in remote_for: - remote = remote_for[fd] - if not proxy_chunk(fd, remote, "local"): - del remote_for[fd] - del local_for[remote] - elif fd in locals: - if not receive_unlinked(fd, "local"): - locals.remove(fd) - report_pending() elif fd in remotes: - if not receive_unlinked(fd, "remote"): - remotes.remove(fd) + local = fd.partner + if local: + if not proxy_chunk_remote_to_local(fd, local): + remotes.remove(fd) + locals.remove(local) + else: + if not receive_unlinked(fd, "remote"): + remotes.remove(fd) report_pending() + elif fd in locals: + remote = fd.partner + if remote: + if not proxy_chunk_local_to_remote(fd, 
remote): + remotes.remove(remote) + locals.remove(fd) + else: + if not receive_unlinked(fd, "local"): + locals.remove(fd) + report_pending() match_proxies()
if __name__ == "__main__": @@ -692,14 +730,14 @@ if __name__ == "__main__": websocket_pending = [] # Remote connection sockets. remotes = [] + # Remotes not yet linked with a local. This is a subset of remotes. + unlinked_remotes = [] # New local sockets waiting to finish their SOCKS negotiation. socks_pending = [] # Local Tor sockets, after SOCKS negotiation. locals = [] - - # Bidirectional mapping between local sockets and remote sockets. - local_for = {} - remote_for = {} + # Locals not yet linked with a remote. This is a subset of locals. + unlinked_locals = []
if options.daemonize: log(u"Daemonizing.")