commit 7d604af457a17fdf6cb433ce46a3f691ad566658 Author: Arturo Filastò hellais@torproject.org Date: Fri Jul 13 21:16:26 2012 +0200
Implement client side component of b0wser --- ooni/plugins/b0wser.py | 85 ++++++++++++++++++++++++++++++++++++++++++++++++ 1 files changed, 85 insertions(+), 0 deletions(-)
# Patch header carried over verbatim from the original commit listing:
#   diff --git a/ooni/plugins/b0wser.py b/ooni/plugins/b0wser.py
#   new file mode 100644
#   index 0000000..5d6687b
#   --- /dev/null
#   +++ b/ooni/plugins/b0wser.py
#   @@ -0,0 +1,85 @@
"""
Client side component of the b0wser test.

This is a self generated test created by scaffolding.py.
You will need to fill it up with all your necessities.
Safe hacking :).
"""
from zope.interface import implements
from twisted.python import usage
from twisted.plugin import IPlugin
from twisted.internet import protocol, endpoints

from ooni.plugoo.tests import ITest, OONITest
from ooni.plugoo.assets import Asset
from ooni.protocols import b0wser
from ooni.utils import log


class B0wserClientProtocol(b0wser.B0wserProtocol):
    """
    Client end of a b0wser connection.

    All protocol logic is inherited from b0wser.B0wserProtocol; this
    subclass only hooks the connection life cycle events.
    """

    def connectionMade(self):
        """Advance the inherited state machine once the transport is up."""
        self.next_state()

    def connectionLost(self, reason):
        """
        Announce on stdout that the connection is gone.

        @param reason: why the connection was lost; currently ignored.
        """
        # Single-argument print(...) behaves the same as the Python 2
        # print statement and also parses under Python 3.
        print("LOST!")


class B0wserClientFactory(protocol.ClientFactory):
    """
    Builds B0wserClientProtocol instances that all share one mutator.

    The first protocol built creates the mutator; every later one moves
    it on to the next mutation before the protocol is handed back.
    """
    protocol = B0wserClientProtocol
    # Shared b0wser.Mutator, created lazily by the first buildProtocol().
    mutator = None
    # Steps to replay; when set they override the protocol's own steps.
    steps = None

    def buildProtocol(self, addr):
        """
        Create a protocol wired to this factory and to the shared mutator.

        @param addr: address of the peer; not used.
        @return: a new instance of self.protocol.
        """
        p = self.protocol()
        p.factory = self
        if self.steps:
            p.steps = self.steps

        if not self.mutator:
            # First connection: the mutator is built from the steps this
            # protocol will actually use.
            self.mutator = b0wser.Mutator(p.steps)
        else:
            print("Moving on to next mutation")
            self.mutator.next_mutation()

        # Attach the shared mutator on every build. Previously this only
        # happened on the first one, so later protocols never saw the
        # mutator the factory had just advanced.
        p.mutator = self.mutator
        return p

    def clientConnectionFailed(self, reason):
        """
        Tell the user that the connection could not be established.

        @param reason: the connection failure; currently ignored.
        """
        print("We failed connecting to the OONIB")
        print("Cannot perform test. Perhaps it got blocked?")
        print("Please report this to tor-assistants@torproject.org")

    def clientConnectionLost(self, reason):
        """
        Tell the user that an established connection was dropped.

        @param reason: why the connection was lost; currently ignored.
        """
        print("Connection Lost.")
class b0wserArgs(usage.Options):
    """Command line parameters understood by the b0wser test."""

    # Every entry reads: [long name, short flag, default, description].
    optParameters = [
        ['pcap', 'f', None, 'PCAP file to take as input'],
        ['host', 'h', None, 'Target Hostname'],
        ['port', 'p', None, 'Target port number'],
        ['resume', 'r', 0, 'Resume at this index'],
    ]


class b0wserTest(OONITest):
    """
    OONI test that loads b0wser steps from a PCAP file and connects to
    the configured host and port using a B0wserClientFactory.
    """
    implements(IPlugin, ITest)

    shortName = "b0wser"
    description = "b0wser"
    requirements = None
    options = b0wserArgs
    blocking = False

    def initialize(self):
        """Create the client factory used by experiment()."""
        self.factory = B0wserClientFactory()

    def experiment(self, args):
        """
        Load the steps from the PCAP file and open the TCP connection.

        @param args: not used by this test.
        @return: whatever the endpoint's connect() call returns.
        """
        opts = self.local_options

        pcap_steps = b0wser.get_b0wser_dictionary_from_pcap(opts['pcap'])
        print(pcap_steps)
        self.factory.steps = pcap_steps

        host = opts['host']
        port = int(opts['port'])
        log.msg("Connecting to %s:%s" % (host, port))

        # NOTE(review): with an endpoint, connection failures are presumably
        # reported through the value returned by connect(); confirm whether
        # the factory's clientConnectionFailed hook still fires.
        client_endpoint = endpoints.TCP4ClientEndpoint(self.reactor, host,
                                                       port)
        return client_endpoint.connect(self.factory)

    def load_assets(self):
        """This test takes no assets, so an empty dict is returned."""
        return {}


# getPlugins only detects the plugin if an instance exists at module level.
# XXX Find a way to load plugins without instantiating them.
b0wsertest = b0wserTest(None, None, None)