[tor-commits] [fog/master] Added a modified socks protocol from obfsproxy. Stripped out references to pluggable transports..

infinity0 at torproject.org infinity0 at torproject.org
Fri Aug 1 16:50:36 UTC 2014


commit 49e865f46dec9534313f67dc57b3f0849a90f77d
Author: Quinn Jarrell <qjarrell at gosynapsify.com>
Date:   Wed Jun 11 17:06:30 2014 -0400

    Added a modified socks protocol from obfsproxy. Stripped out references to pluggable transports..
---
 fog/socks.py      |   59 +++++++++++++++
 obfs-flash-client |  205 +++++++++++++++++++++++++++++++++++------------------
 torrc             |    6 +-
 3 files changed, 198 insertions(+), 72 deletions(-)

diff --git a/fog/__init__.py b/fog/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/fog/socks.py b/fog/socks.py
new file mode 100644
index 0000000..e3b483b
--- /dev/null
+++ b/fog/socks.py
@@ -0,0 +1,59 @@
+from twisted.protocols import socks
+from twisted.internet.protocol import Factory
+import logging
+
+logger = logging.getLogger('obfs-flash-logger')
+
+class SOCKSv4InterceptorProtocol(socks.SOCKSv4):
+    """
+    A modified SOCKS protocol which extracts the requested ip and port
+    and redirects connections to the first pluggable transport in the chain.
+    """
+
+    def __init__(self, factory, pt_method_name):
+        """
+        :param twisted.internet.protocol.factory factory: The factory that launched this protocol
+        :param pt_method_name: The name of the chain to be launched when a new connection is received
+        """
+        self.factory = factory
+        self._pt_method_name = pt_method_name
+        socks.SOCKSv4.__init__(self)
+
+    def _dataReceived2(self, server, user, version, code, port):
+        """
+        Extracts the requested ip and port and redirects to a different address
+        """
+        if code == 1: # CONNECT
+            assert version == 4, "Bad version code: %s" % version
+            if not self.authorize(code, server, port, user):
+                self.makeReply(91)
+                return
+            def _chain_set_up(remote_address, remote_port):
+                logger.debug("chain finished, connecting %s:%s" % (remote_address, remote_port))
+                # Connect to our remote address instead of the requested one
+                d = self.connectClass(remote_address, remote_port, socks.SOCKSv4Outgoing, self)
+                d.addErrback(lambda result, self = self: self.makeReply(91))
+            self.factory._new_conn_callback(server, port, self._pt_method_name, _chain_set_up)
+            assert self.buf == "", "hmm, still stuff in buffer... %s" % repr(self.buf)
+        else:
+            super(SOCKSv4InterceptorProtocol, self)._dataReceived2(server, user, version, code, port)
+
+class SOCKSv4InterceptorFactory(Factory):
+
+    def __init__(self, pt_method_name, new_conn_callback):
+        """
+        :param str pt_method_name: The name of the pt_method that this factory is launching.
+        :param function new_conn_callback: The function to be called when a connection is made.
+            def new_conn_callback
+                :param str server: The ip address requested by the SOCKS client.
+                :param int port: The port requested by the SOCKS client.
+                :param str pt_method_name: The name of the pt_method this factory is a part of.
+                :param function chain_set_up: The function to be called when the chain has finished setting up.
+                            :param str remote_address: The address to relay the SOCKS request to.
+                            :param int remote_port: The port to to send the SOCKS request to.
+        """
+        self._pt_method_name = pt_method_name
+        self._new_conn_callback = new_conn_callback
+
+    def buildProtocol(self, addr):
+        return SOCKSv4InterceptorProtocol(self, self._pt_method_name)
\ No newline at end of file
diff --git a/obfs-flash-client b/obfs-flash-client
index 75622e6..486e714 100755
--- a/obfs-flash-client
+++ b/obfs-flash-client
@@ -24,6 +24,7 @@ from twisted.protocols.basic import LineReceiver
 from twisted.protocols.portforward import ProxyServer as _ProxyServer
 from twisted.python import log
 from txsocksx.client import SOCKS4ClientEndpoint
+from fog.socks import SOCKSv4InterceptorFactory
 
 import shlex
 
@@ -156,7 +157,7 @@ class ProxyServer(_ProxyServer):
     def connectProxyClient(self, client):
         raise NotImplementedError()
 
-class SOCKS4Wrapper(ProxyServer):
+class OneUseSOCKS4Wrapper(ProxyServer):
 
     def connectProxyClient(self, client):
         TCPPoint = TCP4ClientEndpoint(
@@ -167,19 +168,60 @@ class SOCKS4Wrapper(ProxyServer):
             self.factory.remote_host,
             self.factory.remote_port,
             TCPPoint)
+        # Store port for debugging messages before stopListening is called. listen_port will not have a port after stopListening is called.
+        stored_port = self.factory.listen_port.getHost().port
+        d_port_closed = self.factory.listen_port.stopListening()
+        d_port_closed.addCallback(
+            lambda x: logging.debug("Closed factory listener %s on port %s" % (self.factory, stored_port)))
+        d_port_closed.addErrback(
+            lambda x: logging.warn("Failed to close factory listener %s listening on port %s" % (self.factory, stored_port)))
         d = SOCKSPoint.connect(client)
+        d.chainDeferred(self.factory.d_connected)
         @d.addErrback
         def _gotError(error):
             log.err(error, "error connecting to SOCKS server")
 
-class SOCKS4WrapperFactory(Factory):
-    protocol = SOCKS4Wrapper
+class OneUseSOCKS4Factory(Factory):
+    protocol = OneUseSOCKS4Wrapper
 
     def __init__(self, local_host, local_port, remote_host, remote_port):
+        self._connected_once = False
         self.local_host = local_host
         self.local_port = local_port
         self.remote_host = remote_host
         self.remote_port = remote_port
+        self.d_connected = Deferred()
+        self.listen_port = None
+
+    def __str__(self):
+        return "OneUseSOCKS4Factory connecting %s:%s to %s:%s" % (self.local_host, self.local_port, self.remote_host, self.remote_port)
+
+    def __repr__(self):
+        return "OneUseSOCKS4Factory(%s, %s, %s, %s)" % (self.local_host, self.local_port, self.remote_host, self.remote_port)
+
+    def setListenPort(self, listen_port):
+        """
+        Sets the listen_port object.
+        :param function listen_port: The function returned from a ListenTCP call. Used to shutdown the port when a connection is made.
+        """
+        self.listen_port = listen_port
+
+    def whenConnected(self):
+        """
+        Returns a new Deferred that triggers when a connection is successfully made.
+        """
+        return branch(self.d_connected)
+
+    def buildProtocol(self, addr):
+        """
+        Only allows one protocol to be created. After that it always returns None
+        :param twisted.internet.interfaces.IAddress addr: an object implementing L{twisted.internet.interfaces.IAddress}
+        """
+        if self._connected_once:
+            return None
+        else:
+            self._connected_once = True
+            return Factory.buildProtocol(self, addr)
 
 if sys.platform == "win32":
     # TODO(infinity0): push this upstream to Twisted
@@ -246,8 +288,9 @@ def pt_require_child(client, childmethod, pt_method_name, cmethods):
         raise ValueError()
     return cmethods[childmethod]
 
-def pt_setup_socks_proxy(pt_name, pt_chain, success_list, dest_address, dest_port, reactor, listen_port=0):
-    """Launches a socks proxy server to link two PTs together.
+def pt_setup_socks_shim(pt_name, pt_chain, success_list, dest_address, dest_port, reactor, proxy_deferreds):
+    """
+    Launches a socks proxy server to link two PTs together.
     :param str pt_name: The name of the pt to send traffic to.
     :param list pt_chain: The list of PTs in this chain.
     :param list success_list: A list of tuples containing a launch status boolean, MethodSpec pairs.
@@ -256,27 +299,97 @@ def pt_setup_socks_proxy(pt_name, pt_chain, success_list, dest_address, dest_por
     :param str dest_address: The address for the next PT to send its results to.
     :param int dest_port: The port for the next PT to send to.
     :param twisted.internet.interfaces.IReactor reactor: Reactor to attack the TCP server to.
-    :param int listen_port. The port to for the TCP server to listen on. Default is chosen by the OS."""
 
+    :param list proxy_deferreds: This list has each factorys' deferred appended to it.
+
+    :returns twisted.internet.interfaces.IListeningPort: An IListeningPort used for shutting down a factory after a connection is made.
+    """
     methodspec = [r[1] for r in success_list if r[1].name == pt_name][0] # Returns the resulting methodspec.
-    proxy_server = reactor.listenTCP(interface='127.0.0.1', port=listen_port, factory=
-                                    SOCKS4WrapperFactory(methodspec.addrport[0], methodspec.addrport[1], dest_address, dest_port))
-    return proxy_server.getHost().port
+    factory = OneUseSOCKS4Factory(methodspec.addrport[0], methodspec.addrport[1], dest_address, dest_port)
+    # TODO switch to using endpoints instead of listenTCP
+    proxy_server = reactor.listenTCP(interface='127.0.0.1', port=0, factory=factory)
+    factory.setListenPort(proxy_server)
+    proxy_deferreds.append(factory.whenConnected())
+    logging.debug("launched %s on port %s with dest %s:%s" % (pt_name, proxy_server.getHost().port, dest_address, dest_port))
+    return proxy_server
+
+def pt_launch_chain(dest_address, dest_port, pt_chain, _chain_set_up, reactor, success_list):
+    """
+    Launches a chain of pluggable transports by connecting each pt with SOCKS proxies.
+    :param str dest_address: The bridge address to connect to.
+    :param int dest_port: The bridge port to connect to.
+    :param list pt_chain: The list of pt names to launch.
+    :param function _chain_set_up: The function to call when the shims have been set up.
+    :param twisted.internet.interfaces.IReactor reactor: Reactor to install this PT to.
+    :param list success_list: A list of tuples containing a launch status boolean, MethodSpec pairs.
+        Ex: [(True, MethodSpec(name='dummy', protocol='socks4', addrport=('127.0.0.1', 58982), args=[], opts=[])),
+            (True, MethodSpec(name='b64', protocol='socks4', addrport=('127.0.0.1', 58981), args=[], opts=[]))]
+    """
+    proxy_deferreds = []
+    last_pt_name = pt_chain[-1]
+    logging.debug("launching chain %s" % pt_chain)
+    # Initialize prev_server to the port picked by the last proxy server as that's the only one we know yet.
+    last_server = pt_setup_socks_shim(last_pt_name, pt_chain, success_list, dest_address, dest_port,
+                                    reactor, proxy_deferreds)
+    prev_server = last_server
+    for pt_name in reversed(pt_chain[:-1]):
+        # Loops through the pts linking them together through SOCKS proxies, skipping the last pt.
+        prev_server = pt_setup_socks_shim(pt_name, pt_chain, success_list, '127.0.0.1', prev_server.getHost().port,
+                                        reactor, proxy_deferreds)
+    def check_chain_all_connected(protocol_list):
+        """
+        Checks all the shims launched to see if they successfully connected.
+        :param list protocol_list: A list of tuples containing status boolean, twisted.protocols.portforward.ProxyClient pairs.
+            Ex: [(True, <twisted.protocols.portforward.ProxyClient instance at 0x10b825518>),
+                 (True, <twisted.protocols.portforward.ProxyClient instance at 0x10b829518>)]
+        """
+        if all([result[0] for result in protocol_list]):
+            logging.debug("All PT shims connected correctly")
+        else:
+            # At this point the SOCKS protocol is in communication mode so no need to call makeReply(91)
+            # This assumes that the child pluggable transport will shut down the connection cleanly.
+            failed_protocols = [x[1] for x in protocol_list if x[0] == False]
+            logging.error("Shims %s failed to connect." % failed_protocols)
+            raise ValueError()
+
+    finished = DeferredList(proxy_deferreds)
+    finished.addCallback(check_chain_all_connected)
+    _chain_set_up(prev_server.getHost().host, prev_server.getHost().port)
 
-def pt_launch_chain(reactor, client, callback_port, configuration, pt_method_name):
+def pt_launch_interceptor(reactor, client, configuration, pt_method_name, success_list):
     """
-    Launches a chain of pluggable transports.
+    Launches a SOCKS interceptor.
     :param twisted.internet.interfaces.IReactor reactor: Reactor to install this PT to.
     :param pyptlib.client.ClientTransportPlugin client: PT client API.
-    :param int callback_port: Local listen port for the first PT to connect to.
     :param Config configuration: The configuration structure for this pair.
     :param str pt_method_name: The name of the pt chain to launch. Ex: "obfs3_flashproxy"
+    :param list success_list: A list of tuples containing a launch status boolean, MethodSpec pairs.
+        Ex: [(True, MethodSpec(name='dummy', protocol='socks4', addrport=('127.0.0.1', 58982), args=[], opts=[])),
+            (True, MethodSpec(name='b64', protocol='socks4', addrport=('127.0.0.1', 58981), args=[], opts=[]))]
     """
-
-    # Temporary hardcoded ip/port
-    dest_address = '127.0.0.1'
-    dest_port = 2400
-
+    logging.debug("launching interceptor")
+    pt_chain = configuration.alias_map[pt_method_name]
+    success = all(r[0] for r in success_list if r[1].name in pt_chain)
+    # failure was already reported by pt_require_child, just return
+    if not success: return
+    socks_interceptor = SOCKSv4InterceptorFactory(pt_method_name,
+                        lambda dest_address, dest_port, pt_method_name, chain_finished:
+                            pt_launch_chain(dest_address, dest_port, pt_chain, chain_finished, reactor, success_list))
+    # TODO switch to using endpoints instead of listenTCP
+    interceptor = reactor.listenTCP(interface='127.0.0.1', port=0, factory=socks_interceptor)
+    interceptor_port = interceptor.getHost().port
+    client.reportMethodSuccess(pt_method_name, "socks4", ("127.0.0.1", interceptor_port))
+    client.reportMethodsEnd()
+
+def pt_setup_transports(reactor, client, configuration, pt_method_name):
+    """
+    Launches the PT processes.
+    :param twisted.internet.interfaces.IReactor reactor: Reactor to install this PT to.
+    :param pyptlib.client.ClientTransportPlugin client: PT client API.
+    :param Config configuration: The configuration structure for this pair.
+    :param str pt_method_name: The name of the pt chain to launch. Ex: "obfs3_flashproxy"
+    """
+    logging.debug("Setting up transports %s" % pt_method_name)
     if pt_method_name in configuration.alias_map:
         pt_chain = configuration.alias_map[pt_method_name]
     else:
@@ -296,40 +409,7 @@ def pt_launch_chain(reactor, client, callback_port, configuration, pt_method_nam
         _, _, defer = pt_launch_child(reactor, client, [pt], pt_method_name, pt_cmdline)
         defer_list.extend(defer)
     whenAllDone = DeferredList(defer_list, consumeErrors=False)
-
-    def allDone(success_list):
-        """
-        Connects the pluggable transports together through SOCKS proxies.
-        :param list success_list: A list of tuples containing a launch status boolean, MethodSpec pairs.
-        Ex: [(True, MethodSpec(name='dummy', protocol='socks4', addrport=('127.0.0.1', 58982), args=[], opts=[])),
-         (True, MethodSpec(name='b64', protocol='socks4', addrport=('127.0.0.1', 58981), args=[], opts=[]))]
-        """
-
-        success = all(r[0] for r in success_list if r[1].name in pt_chain)
-        # failure was already reported by pt_require_child, just return
-        if not success: return
-
-        #This TCP server forwards the data to the last pt, which then sends the data to the actual bridge address
-        last_pt_name = pt_chain[-1]
-        # Initialize prev_port to the first port picked by the last proxy as that's the only one we know yet.
-        listen_port = 0 if len(pt_chain) > 2 else callback_port
-        prev_port = pt_setup_socks_proxy(last_pt_name, pt_chain, success_list, dest_address, dest_port, reactor, listen_port)
-
-        if len(pt_chain) > 2:
-            for pt_name in reversed(pt_chain[2:-1]):
-                #Loops through the middle pts linking them together through SOCKS proxies, skipping the first and last pts.
-                prev_port = pt_setup_socks_proxy(pt_name, pt_chain, success_list, '127.0.0.1', prev_port, reactor)
-
-            # Links the second server to listen on the given callback_port
-            pt_name = pt_chain[1]
-            prev_port = pt_setup_socks_proxy(pt_name, pt_chain, success_list, '127.0.0.1', prev_port, reactor, listen_port=callback_port)
-
-        # now report success of the overall composed PT
-        first_pt_name = pt_chain[0]
-        first_methodspec = [r[1] for r in success_list if r[1].name == first_pt_name and r[1].name in pt_chain][0]
-        client.reportMethodSuccess(pt_method_name, first_methodspec.protocol, first_methodspec.addrport)
-        client.reportMethodsEnd()
-    whenAllDone.addCallback(allDone)
+    whenAllDone.addCallback(lambda success_list: pt_launch_interceptor(reactor, client, configuration, pt_method_name, success_list))
 
 class Config():
     # Transport map links a pluggable transport name to the a commandline to launch it.
@@ -399,17 +479,13 @@ class Config():
                 raise KeyError('Transport map is missing pluggable transport %s needed for chain %s. Check your configuration file for a ClientTransportPlugin line can launch %s' % (pt_name, alias_name, pt_name))
         alias_map[alias_name] = alias_path
 
-def obfs3_flashproxy(reactor, client, callback_port, fp_remote, fp_args=[], fp_local=0):
+def obfs3_flashproxy(fp_remote, fp_args=[], fp_local=0):
     """
     Set up the obfs3_flashproxy combined PT.
-
-    :param twisted.internet.interfaces.IReactor reactor: Reactor to install this PT to.
-    :param pyptlib.client.ClientTransportPlugin client: PT client API.
-    :param int callback_port: Local listen port for obfsproxy to connect to.
     :param str fp_remote: Listen address for remote flashproxy connections.
+    :param str fp_args: The arguments to pass to the flashproxy connections.
     :param int fp_local: Local listen port for local flashproxy connections.
     """
-
     ob_client = os.getenv("OBFSPROXY", "obfsproxy")
     fp_client = os.getenv("FLASHPROXY_CLIENT", "flashproxy-client")
 
@@ -420,14 +496,10 @@ def obfs3_flashproxy(reactor, client, callback_port, fp_remote, fp_args=[], fp_l
     alias_map = {'obfs3_flashproxy': ['obfs3', 'flashproxy']}
 
     configuration = Config(transport_map, alias_map)
-    pt_launch_chain(reactor, client, callback_port, configuration, "obfs3_flashproxy") # Launch
+    return configuration
 
 def main(*args):
     parser = argparse.ArgumentParser()
-    parser.add_argument("callback_port", help="local listen port for obfsproxy to "
-        "connect to. This must match the appropriate bridge line in your " # TODO print bridge line
-        "torrc.",
-        metavar='PORT', type=int)
     parser.add_argument("fp_remote", help="remote connections listen address "
         "for flashproxy, default %(default)s",
         metavar='REMOTE:PORT', nargs='?', default=":9000")
@@ -451,7 +523,7 @@ def main(*args):
         pt_method_names = configuration.alias_map.keys()
     else:
         pt_method_names = ["obfs3_flashproxy"]
-
+        configuration = obfs3_flashproxy(opts.fp_remote, opts.fp_arg or [])
     client = ClientTransportPlugin()
     client.init(pt_method_names) # Initialize our possible methods to all the chains listed by the fog file and stored in alias map.
     if not client.getTransports():
@@ -461,12 +533,7 @@ def main(*args):
     from twisted.internet import reactor
     auto_killall(1, cleanup=reactor.stop)
     #TODO Change from launching a single pair to launching multiple chains.
-    if configuration:
-        pt_launch_chain(reactor, client, opts.callback_port, configuration, pt_method_names[0])
-    else:
-        logging.warn("No configuration file specified. Defaulting to launching obfs3|flashproxy pair.")
-        obfs3_flashproxy(reactor, client, opts.callback_port, opts.fp_remote, opts.fp_arg or [])
-
+    pt_setup_transports(reactor, client, configuration, pt_method_names[0])
     reactor.run(installSignalHandlers=0)
     return 0
 
diff --git a/torrc b/torrc
index 7dc78f4..80da3f3 100644
--- a/torrc
+++ b/torrc
@@ -1,7 +1,7 @@
 UseBridges 1
-Bridge obfs3_flashproxy 127.0.0.1:2334
+Bridge obfs3_flashproxy 127.0.0.1:0
 LearnCircuitBuildTimeout 0
 CircuitBuildTimeout 300
-ClientTransportPlugin obfs3_flashproxy exec ./obfs-flash-client 2334 --fp-arg=--register
+ClientTransportPlugin obfs3_flashproxy exec ./obfs-flash-client --fp-arg=--register
 # use a different facilitator
-#ClientTransportPlugin obfs3_flashproxy exec ./obfs-flash-client --fp-arg=-f --fp-arg=http://siteb.fp-facilitator.org/fac/ --fp-arg=--register-methods=http 2334
+#ClientTransportPlugin obfs3_flashproxy exec ./obfs-flash-client --fp-arg=-f --fp-arg=http://siteb.fp-facilitator.org/fac/ --fp-arg=--register-methods=http





More information about the tor-commits mailing list