commit c3aaf32b2b57401b56e1991b451f8252981df349 Author: Quinn Jarrell qjarrell@gosynapsify.com Date: Wed Jul 16 21:48:55 2014 -0400
Adds launching multiple chains. Now after finding the needed chains, all the pts needed by the chains are launched first. Then after each pt succeeds, if a chain has all its pts setup, it will launch. --- fog-client/fog-client | 113 +++++++++++++++++++++++++++++------------------ fog-client/fog/socks.py | 4 +- 2 files changed, 73 insertions(+), 44 deletions(-)
diff --git a/fog-client/fog-client b/fog-client/fog-client index 808504c..f651a83 100755 --- a/fog-client/fog-client +++ b/fog-client/fog-client @@ -17,6 +17,7 @@ from pyptlib.client import ClientTransportPlugin from subprocess import PIPE
from twisted.internet.defer import Deferred, DeferredList +from twisted.internet.error import CannotListenError from twisted.internet.stdio import StandardIO from twisted.internet.protocol import Factory, connectionDone from twisted.internet.endpoints import TCP4ClientEndpoint @@ -291,7 +292,7 @@ if sys.platform == "win32":
self.proto.makeConnection(self)
-def pt_launch_child(reactor, client, methodnames, pt_method_name, cmdline): +def pt_launch_child(reactor, client, methodnames, chain_names, cmdline): """Launch a child PT and ensure it has the right transport methods.""" cur_env = pt_child_env(ManagedTransportProtocolV1.protocol_version) environment = dict(cur_env + { @@ -305,17 +306,74 @@ def pt_launch_child(reactor, client, methodnames, pt_method_name, cmdline): # we ought to pass reactor=reactor in below, but this breaks Twisted 12 StandardIO(sub_protocol, stdin=sub_proc.stdout.fileno()) methoddefers = [sub_protocol.whenCMethodsDone().addCallback( - partial(pt_require_child, client, name, pt_method_name)) + partial(pt_require_child, client, name, chain_names)) for name in methodnames] return sub_proc, sub_protocol, methoddefers
-def pt_require_child(client, childmethod, pt_method_name, cmethods): +def pt_require_child(client, childmethod, chain_names, cmethods): """Callback for checking a child PT has the right transport methods.""" if childmethod not in cmethods: - client.reportMethodError(pt_method_name, "failed to start required child transport: %s" % childmethod) + for chain_name in chain_names: + client.reportMethodError(chain_name, "failed to start required child transport: %s" % childmethod) raise ValueError() return cmethods[childmethod]
+def pt_get_unique_transport_list(aliases, config): + """ + Returns all the pts needed by the chains without duplicates + :param aliases list: The list of alias names requested by tor and intersected with the transports fog can serve + :param Config config: The configuration object + """ + uniq_transports = set() + for alias in aliases: + for pt_name in config.alias_map[alias]: + uniq_transports.add(pt_name) + return list(uniq_transports) + +def pt_setup_transports(reactor, client, configuration, pt_names, chain_names): + """ + Sets up the pluggable transports needed by the chains + :param twisted.internet.interfaces.IReactor reactor: Reactor to install this PT to. + :param pyptlib.client.ClientTransportPlugin client: PT client API. + :param Config configuration: The configuration object. + :param list pt_names: The list of pt names to setup. + :param list chain_names: The list of chain names to launch. + """ + pt_defer_map = {} + for pt_name in pt_names: + if pt_name not in configuration.transport_map: + raise ValueError("Pluggable transport %s not found in transport_map. Check your configuration file." % pt_name) + pt_cmdline = configuration.transport_map[pt_name] + # TODO make this more than a one item list when fixing multiple transports launched by one ClientTransportPlugin line. + multi_pts = [pt_name] + # Find all the chains where this pt is used. + pt_chains = [chain_name for chain_name in chain_names if pt_name in configuration.alias_map[chain_name]] + _, _, defers = pt_launch_child(reactor, client, multi_pts, pt_chains, pt_cmdline) + for pt, defer in zip(multi_pts, defers): + pt_defer_map[pt] = defer + chains_finished_dlist = pt_setup_chains(reactor, client, configuration, chain_names, pt_defer_map) + chains_finished_dlist.addCallback(lambda x: client.reportMethodsEnd()) + +def pt_setup_chains(reactor, client, configuration, chain_names, pt_defer_map): + """ + Sets up each chain of pluggable transports + :param twisted.internet.interfaces.IReactor reactor: Reactor to install this PT to. + :param pyptlib.client.ClientTransportPlugin client: PT client API. + :param Config configuration: The configuration object. + :param list chain_names: The list of chain names to setup. + :param dict pt_defer_map: A map between each pt and the defer that will callback when the pt is successfully launched. + """ + all_chains_defer_list = [] + for chain_name in chain_names: + chain = configuration.alias_map[chain_name] + if len(chain) < 2: + raise ValueError("PT Chain %s does not contain enough transports." % chain) + chain_deferred_list = DeferredList([pt_defer_map[pt] for pt in set(chain)]) + partial_funct = partial(pt_launch_interceptor, reactor, client, configuration, chain_name) + chain_deferred_list.addCallback(partial_funct) + all_chains_defer_list.append(chain_deferred_list) + return DeferredList(all_chains_defer_list) + def pt_setup_socks_shim(pt_name, pt_chain, success_list, dest_address, dest_port, reactor, proxy_deferreds): """ Launches a socks proxy server to link two PTs together. @@ -395,7 +453,7 @@ def pt_launch_interceptor(reactor, client, configuration, pt_method_name, succes Ex: [(True, MethodSpec(name='dummy', protocol='socks4', addrport=('127.0.0.1', 58982), args=[], opts=[])), (True, MethodSpec(name='b64', protocol='socks4', addrport=('127.0.0.1', 58981), args=[], opts=[]))] """ - logger.debug("launching interceptor") + logger.debug("launching interceptor for %s" % pt_method_name) pt_chain = configuration.alias_map[pt_method_name] success = all(r[0] for r in success_list if r[1].name in pt_chain) # failure was already reported by pt_require_child, just return @@ -404,41 +462,13 @@ def pt_launch_interceptor(reactor, client, configuration, pt_method_name, succes lambda dest_address, dest_port, pt_method_name, chain_finished: pt_launch_chain(dest_address, dest_port, pt_chain, chain_finished, reactor, success_list)) # TODO switch to using endpoints instead of listenTCP - interceptor = reactor.listenTCP(interface='127.0.0.1', port=0, factory=socks_interceptor) - interceptor_port = interceptor.getHost().port + try: + interceptor = reactor.listenTCP(interface='127.0.0.1', port=0, factory=socks_interceptor) + interceptor_port = interceptor.getHost().port + except CannotListenError: + client.reportMethodError(pt_method_name, " failed to launch SOCKS interceptor. The interceptor listenTCP failed.") + return client.reportMethodSuccess(pt_method_name, "socks4", ("127.0.0.1", interceptor_port)) - client.reportMethodsEnd() - -def pt_setup_transports(reactor, client, configuration, pt_method_name): - """ - Launches the PT processes. - :param twisted.internet.interfaces.IReactor reactor: Reactor to install this PT to. - :param pyptlib.client.ClientTransportPlugin client: PT client API. - :param Config configuration: The configuration structure for this pair. - :param str pt_method_name: The name of the pt chain to launch. Ex: "obfs3_flashproxy" - """ - logger.debug("Setting up transports %s" % pt_method_name) - if pt_method_name in configuration.alias_map: - pt_chain = configuration.alias_map[pt_method_name] - else: - logger.error('Pluggable Transport Combination %s not found in configuration alias map.' % pt_method_name) - raise KeyError() - - defer_list = [] - - if len(pt_chain) < 2: - raise ValueError("PT Chain %s does not contain enough transports." % pt_chain) - - for pt in pt_chain: - if pt in configuration.transport_map: - pt_cmdline = configuration.transport_map[pt] - else: - raise ValueError("Pluggable transport %s not found in transport_map. Check your configuration file." % pt) - _, _, defer = pt_launch_child(reactor, client, [pt], pt_method_name, pt_cmdline) - defer_list.extend(defer) - whenAllDone = DeferredList(defer_list, consumeErrors=False) - whenAllDone.addCallback(lambda success_list: pt_launch_interceptor(reactor, client, configuration, pt_method_name, success_list)) -
class Config(): # Transport map links a pluggable transport name to the a commandline to launch it. @@ -527,14 +557,13 @@ def main(*args): if not client.getTransports(): logger.error("no transports to serve. pt_method_names may be invalid.") return 1 - + pt_method_names = pt_get_unique_transport_list(client.getTransports(), configuration) from twisted.internet import reactor auto_killall(1, cleanup=reactor.stop) #TODO Change from launching a single pair to launching multiple chains. - pt_setup_transports(reactor, client, configuration, pt_method_names[0]) + pt_setup_transports(reactor, client, configuration, pt_method_names, client.getTransports()) reactor.run(installSignalHandlers=0) return 0
if __name__ == "__main__": sys.exit(main(*sys.argv[1:])) - diff --git a/fog-client/fog/socks.py b/fog-client/fog/socks.py index b44135d..ec6d3d0 100644 --- a/fog-client/fog/socks.py +++ b/fog-client/fog/socks.py @@ -29,7 +29,7 @@ class SOCKSv4InterceptorProtocol(socks.SOCKSv4): self.makeReply(91) return def _chain_set_up(remote_address, remote_port): - logger.debug("chain finished, connecting %s:%s" % (remote_address, remote_port)) + logger.debug("chain finished, connecting to %s:%s" % (remote_address, remote_port)) # Connect to our remote address instead of the requested one d = self.connectClass(remote_address, remote_port, socks.SOCKSv4Outgoing, self) d.addErrback(lambda result, self = self: self.makeReply(91)) @@ -56,4 +56,4 @@ class SOCKSv4InterceptorFactory(Factory): self._new_conn_callback = new_conn_callback
def buildProtocol(self, addr): - return SOCKSv4InterceptorProtocol(self, self._pt_method_name) \ No newline at end of file + return SOCKSv4InterceptorProtocol(self, self._pt_method_name)
tor-commits@lists.torproject.org