[tor-commits] [fog/master] Adds launching multiple chains. Now after finding the needed chains, all the pts needed by the chains are launched first. Then after each pt succeeds, if a chain has all its pts set up, it will launch.

infinity0 at torproject.org infinity0 at torproject.org
Fri Aug 1 16:50:37 UTC 2014


commit c3aaf32b2b57401b56e1991b451f8252981df349
Author: Quinn Jarrell <qjarrell at gosynapsify.com>
Date:   Wed Jul 16 21:48:55 2014 -0400

    Adds launching multiple chains. Now after finding the needed chains, all the pts needed by the chains are launched first. Then after each pt succeeds, if a chain has all its pts set up, it will launch.
---
 fog-client/fog-client   |  113 +++++++++++++++++++++++++++++------------------
 fog-client/fog/socks.py |    4 +-
 2 files changed, 73 insertions(+), 44 deletions(-)

diff --git a/fog-client/fog-client b/fog-client/fog-client
index 808504c..f651a83 100755
--- a/fog-client/fog-client
+++ b/fog-client/fog-client
@@ -17,6 +17,7 @@ from pyptlib.client import ClientTransportPlugin
 from subprocess import PIPE
 
 from twisted.internet.defer import Deferred, DeferredList
+from twisted.internet.error import CannotListenError
 from twisted.internet.stdio import StandardIO
 from twisted.internet.protocol import Factory, connectionDone
 from twisted.internet.endpoints import TCP4ClientEndpoint
@@ -291,7 +292,7 @@ if sys.platform == "win32":
 
             self.proto.makeConnection(self)
 
-def pt_launch_child(reactor, client, methodnames, pt_method_name, cmdline):
+def pt_launch_child(reactor, client, methodnames, chain_names, cmdline):
     """Launch a child PT and ensure it has the right transport methods."""
     cur_env = pt_child_env(ManagedTransportProtocolV1.protocol_version)
     environment = dict(cur_env + {
@@ -305,17 +306,74 @@ def pt_launch_child(reactor, client, methodnames, pt_method_name, cmdline):
     # we ought to pass reactor=reactor in below, but this breaks Twisted 12
     StandardIO(sub_protocol, stdin=sub_proc.stdout.fileno())
     methoddefers = [sub_protocol.whenCMethodsDone().addCallback(
-                        partial(pt_require_child, client, name, pt_method_name))
+                        partial(pt_require_child, client, name, chain_names))
                     for name in methodnames]
     return sub_proc, sub_protocol, methoddefers
 
-def pt_require_child(client, childmethod, pt_method_name, cmethods):
+def pt_require_child(client, childmethod, chain_names, cmethods):
     """Callback for checking a child PT has the right transport methods."""
     if childmethod not in cmethods:
-        client.reportMethodError(pt_method_name, "failed to start required child transport: %s" % childmethod)
+        for chain_name in chain_names:
+            client.reportMethodError(chain_name, "failed to start required child transport: %s" % childmethod)
         raise ValueError()
     return cmethods[childmethod]
 
+def pt_get_unique_transport_list(aliases, config):
+    """
+    Returns all the pts needed by the chains without duplicates
+    :param aliases list: The list of alias names requested by tor and intersected with the transports fog can serve
+    :param Config config: The configuration object
+    """
+    uniq_transports = set()
+    for alias in aliases:
+        for pt_name in config.alias_map[alias]:
+            uniq_transports.add(pt_name)
+    return list(uniq_transports)
+
+def pt_setup_transports(reactor, client, configuration, pt_names, chain_names):
+    """
+    Sets up the pluggable transports needed by the chains
+    :param twisted.internet.interfaces.IReactor reactor: Reactor to install this PT to.
+    :param pyptlib.client.ClientTransportPlugin client: PT client API.
+    :param Config configuration: The configuration object.
+    :param list pt_names: The list of pt names to setup.
+    :param list chain_names: The list of chain names to launch.
+    """
+    pt_defer_map = {}
+    for pt_name in pt_names:
+        if pt_name not in configuration.transport_map:
+            raise ValueError("Pluggable transport %s not found in transport_map. Check your configuration file." % pt_name)
+        pt_cmdline = configuration.transport_map[pt_name]
+        # TODO make this more than a one item list when fixing multiple transports launched by one ClientTransportPlugin line.
+        multi_pts = [pt_name]
+        # Find all the chains where this pt is used.
+        pt_chains = [chain_name for chain_name in chain_names if pt_name in configuration.alias_map[chain_name]]
+        _, _, defers = pt_launch_child(reactor, client, multi_pts, pt_chains, pt_cmdline)
+        for pt, defer in zip(multi_pts, defers):
+            pt_defer_map[pt] = defer
+    chains_finished_dlist = pt_setup_chains(reactor, client, configuration, chain_names, pt_defer_map)
+    chains_finished_dlist.addCallback(lambda x: client.reportMethodsEnd())
+
+def pt_setup_chains(reactor, client, configuration, chain_names, pt_defer_map):
+    """
+    Sets up each chain of pluggable transports
+    :param twisted.internet.interfaces.IReactor reactor: Reactor to install this PT to.
+    :param pyptlib.client.ClientTransportPlugin client: PT client API.
+    :param Config configuration: The configuration object.
+    :param list chain_names: The list of chain names to setup.
+    :param dict pt_defer_map: A map between each pt and the defer that will callback when the pt is successfully launched.
+    """
+    all_chains_defer_list = []
+    for chain_name in chain_names:
+        chain = configuration.alias_map[chain_name]
+        if len(chain) < 2:
+            raise ValueError("PT Chain %s does not contain enough transports." % chain)
+        chain_deferred_list = DeferredList([pt_defer_map[pt] for pt in set(chain)])
+        partial_funct = partial(pt_launch_interceptor, reactor, client, configuration, chain_name)
+        chain_deferred_list.addCallback(partial_funct)
+        all_chains_defer_list.append(chain_deferred_list)
+    return DeferredList(all_chains_defer_list)
+
 def pt_setup_socks_shim(pt_name, pt_chain, success_list, dest_address, dest_port, reactor, proxy_deferreds):
     """
     Launches a socks proxy server to link two PTs together.
@@ -395,7 +453,7 @@ def pt_launch_interceptor(reactor, client, configuration, pt_method_name, succes
         Ex: [(True, MethodSpec(name='dummy', protocol='socks4', addrport=('127.0.0.1', 58982), args=[], opts=[])),
             (True, MethodSpec(name='b64', protocol='socks4', addrport=('127.0.0.1', 58981), args=[], opts=[]))]
     """
-    logger.debug("launching interceptor")
+    logger.debug("launching interceptor for %s" % pt_method_name)
     pt_chain = configuration.alias_map[pt_method_name]
     success = all(r[0] for r in success_list if r[1].name in pt_chain)
     # failure was already reported by pt_require_child, just return
@@ -404,41 +462,13 @@ def pt_launch_interceptor(reactor, client, configuration, pt_method_name, succes
                         lambda dest_address, dest_port, pt_method_name, chain_finished:
                             pt_launch_chain(dest_address, dest_port, pt_chain, chain_finished, reactor, success_list))
     # TODO switch to using endpoints instead of listenTCP
-    interceptor = reactor.listenTCP(interface='127.0.0.1', port=0, factory=socks_interceptor)
-    interceptor_port = interceptor.getHost().port
+    try:
+        interceptor = reactor.listenTCP(interface='127.0.0.1', port=0, factory=socks_interceptor)
+        interceptor_port = interceptor.getHost().port
+    except CannotListenError:
+        client.reportMethodError(pt_method_name, " failed to launch SOCKS interceptor. The interceptor listenTCP failed.")
+        return
     client.reportMethodSuccess(pt_method_name, "socks4", ("127.0.0.1", interceptor_port))
-    client.reportMethodsEnd()
-
-def pt_setup_transports(reactor, client, configuration, pt_method_name):
-    """
-    Launches the PT processes.
-    :param twisted.internet.interfaces.IReactor reactor: Reactor to install this PT to.
-    :param pyptlib.client.ClientTransportPlugin client: PT client API.
-    :param Config configuration: The configuration structure for this pair.
-    :param str pt_method_name: The name of the pt chain to launch. Ex: "obfs3_flashproxy"
-    """
-    logger.debug("Setting up transports %s" % pt_method_name)
-    if pt_method_name in configuration.alias_map:
-        pt_chain = configuration.alias_map[pt_method_name]
-    else:
-        logger.error('Pluggable Transport Combination %s not found in configuration alias map.' % pt_method_name)
-        raise KeyError()
-
-    defer_list = []
-
-    if len(pt_chain) < 2:
-        raise ValueError("PT Chain %s does not contain enough transports." % pt_chain)
-
-    for pt in pt_chain:
-        if pt in configuration.transport_map:
-            pt_cmdline = configuration.transport_map[pt]
-        else:
-            raise ValueError("Pluggable transport %s not found in transport_map. Check your configuration file." % pt)
-        _, _, defer = pt_launch_child(reactor, client, [pt], pt_method_name, pt_cmdline)
-        defer_list.extend(defer)
-    whenAllDone = DeferredList(defer_list, consumeErrors=False)
-    whenAllDone.addCallback(lambda success_list: pt_launch_interceptor(reactor, client, configuration, pt_method_name, success_list))
-
 
 class Config():
     # Transport map links a pluggable transport name to the a commandline to launch it.
@@ -527,14 +557,13 @@ def main(*args):
     if not client.getTransports():
         logger.error("no transports to serve. pt_method_names may be invalid.")
         return 1
-
+    pt_method_names = pt_get_unique_transport_list(client.getTransports(), configuration)
     from twisted.internet import reactor
     auto_killall(1, cleanup=reactor.stop)
     #TODO Change from launching a single pair to launching multiple chains.
-    pt_setup_transports(reactor, client, configuration, pt_method_names[0])
+    pt_setup_transports(reactor, client, configuration, pt_method_names, client.getTransports())
     reactor.run(installSignalHandlers=0)
     return 0
 
 if __name__ == "__main__":
     sys.exit(main(*sys.argv[1:]))
-
diff --git a/fog-client/fog/socks.py b/fog-client/fog/socks.py
index b44135d..ec6d3d0 100644
--- a/fog-client/fog/socks.py
+++ b/fog-client/fog/socks.py
@@ -29,7 +29,7 @@ class SOCKSv4InterceptorProtocol(socks.SOCKSv4):
                 self.makeReply(91)
                 return
             def _chain_set_up(remote_address, remote_port):
-                logger.debug("chain finished, connecting %s:%s" % (remote_address, remote_port))
+                logger.debug("chain finished, connecting to %s:%s" % (remote_address, remote_port))
                 # Connect to our remote address instead of the requested one
                 d = self.connectClass(remote_address, remote_port, socks.SOCKSv4Outgoing, self)
                 d.addErrback(lambda result, self = self: self.makeReply(91))
@@ -56,4 +56,4 @@ class SOCKSv4InterceptorFactory(Factory):
         self._new_conn_callback = new_conn_callback
 
     def buildProtocol(self, addr):
-        return SOCKSv4InterceptorProtocol(self, self._pt_method_name)
\ No newline at end of file
+        return SOCKSv4InterceptorProtocol(self, self._pt_method_name)





More information about the tor-commits mailing list