[tor-commits] [ooni-probe/master] Improve OONI(B) design and fix Daphn3

art at torproject.org art at torproject.org
Tue Jul 24 19:26:19 UTC 2012


commit 64ed356e8cde9d6f866205f978cf7c895ed28885
Author: Arturo Filastò <arturo at filasto.net>
Date:   Tue Jul 24 21:23:52 2012 +0200

    Improve OONI(B) design and fix Daphn3
    * Create a new kind of test functionality called tool
    * Add support for terminating a test via .end()
    * Fix some crazy off by one bugs in Daphn3
    * Fix unit tests by starting reactor in the right place
    * Make Daphn3 into an OONI tool that generates a YAML file
---
 ooni/ooniprobe.py             |   13 +++++--
 ooni/plugins/daphn3.py        |   66 +++++++++++++++++++++++++++++++----
 ooni/plugoo/tests.py          |    7 ++++
 ooni/plugoo/work.py           |   21 +++++++++---
 ooni/protocols/daphn3.py      |   76 +++++++++++++++++++++++++---------------
 oonib/README.md               |    2 +-
 oonib/lib/ssl.py              |    3 +-
 oonib/oonibackend.conf.sample |    1 +
 oonib/testhelpers/daphn3.py   |   17 +++++++--
 9 files changed, 155 insertions(+), 51 deletions(-)

diff --git a/ooni/ooniprobe.py b/ooni/ooniprobe.py
index d6de6a6..b75deca 100755
--- a/ooni/ooniprobe.py
+++ b/ooni/ooniprobe.py
@@ -79,13 +79,21 @@ def runTest(test, options, global_options, reactor=reactor):
     if 'resume' in options:
         resume = options['resume']
 
-    wgen = work.WorkGenerator(test_class(options, global_options, report,
-                                         reactor=reactor),
+    test = test_class(options, global_options, report, reactor=reactor)
+    if test.tool:
+        test.runTool()
+        return
+
+    if test.end:
+        return
+
+    wgen = work.WorkGenerator(test,
                               dict(options),
                               start=resume)
     for x in wgen:
         worker.push(x)
 
+    reactor.run()
 
 class Options(usage.Options):
     tests = plugoo.keys()
@@ -132,5 +140,4 @@ if __name__ == "__main__":
         sys.exit(1)
 
     runTest(config.subCommand, config.subOptions, config)
-    reactor.run()
 
diff --git a/ooni/plugins/daphn3.py b/ooni/plugins/daphn3.py
index 82a3331..6c1186e 100644
--- a/ooni/plugins/daphn3.py
+++ b/ooni/plugins/daphn3.py
@@ -8,6 +8,7 @@ from twisted.python import usage
 from twisted.plugin import IPlugin
 from twisted.internet import protocol, endpoints
 
+from ooni.plugoo import reports
 from ooni.plugoo.tests import ITest, OONITest
 from ooni.plugoo.assets import Asset
 from ooni.protocols import daphn3
@@ -15,40 +16,53 @@ from ooni.utils import log
 
 class Daphn3ClientProtocol(daphn3.Daphn3Protocol):
     def connectionMade(self):
+        print "I have made a connection!"
         self.next_state()
 
     def connectionLost(self, reason):
-        print "LOST!"
+        pass
 
 class Daphn3ClientFactory(protocol.ClientFactory):
     protocol = Daphn3ClientProtocol
     mutator = None
     steps = None
+    test = None
+    report = reports.Report('daphn3', 'daphn3.yamlooni')
 
     def buildProtocol(self, addr):
         p = self.protocol()
+        p.report = self.report
         p.factory = self
         if self.steps:
             p.steps = self.steps
 
         if not self.mutator:
             self.mutator = daphn3.Mutator(p.steps)
-            p.mutator = self.mutator
         else:
             print "Moving on to next mutation"
-            self.mutator.next_mutation()
+            self.mutator.next()
+        p.mutator = self.mutator
         return p
 
     def clientConnectionFailed(self, reason):
         print "We failed connecting the the OONIB"
         print "Cannot perform test. Perhaps it got blocked?"
         print "Please report this to tor-assistants at torproject.org"
+        self.test.end(d)
 
     def clientConnectionLost(self, reason):
         print "Connection Lost."
 
 class daphn3Args(usage.Options):
-    optParameters = [['pcap', 'f', None, 'PCAP file to take as input'],
+    optParameters = [['pcap', 'f', None,
+                        'PCAP to read for generating the YAML output'],
+
+                     ['output', 'o', 'daphn3.yaml',
+                        'What file should be written'],
+
+                     ['yaml', 'y', None,
+                        'The input file to the test'],
+
                      ['host', 'h', None, 'Target Hostname'],
                      ['port', 'p', None, 'Target port number'],
                      ['resume', 'r', 0, 'Resume at this index']]
@@ -65,30 +79,66 @@ class daphn3Test(OONITest):
     local_options = None
 
     steps = None
+
     def initialize(self):
         if not self.local_options:
+            self.end()
             return
         #pass
         self.factory = Daphn3ClientFactory()
-        self.steps = daphn3.read_pcap(self.local_options['pcap'])
+        self.factory.test = self
+
+        if self.local_options['pcap']:
+            self.tool = True
+
+        elif self.local_options['yaml']:
+            self.steps = daphn3.read_yaml(self.local_options['yaml'])
+
+        else:
+            log.msg("Not enough inputs specified to the test")
+            self.end()
+
+    def runTool(self):
+        import yaml
+        pcap = daphn3.read_pcap(self.local_options['pcap'])
+        f = open(self.local_options['output'], 'w')
+        f.write(yaml.dump(pcap))
+        f.close()
 
     def control(self, exp_res, args):
-        mutation = self.factory.mutator.get_mutation(0)
+        try:
+            mutation = self.factory.mutator.get(0)
+        except:
+            mutation = None
         return {'mutation_number': args['mutation'], 'value': mutation}
 
+    def _failure(self, *argc, **kw):
+        print "We failed connecting the the OONIB"
+        print "Cannot perform test. Perhaps it got blocked?"
+        print "Please report this to tor-assistants at torproject.org"
+        print "Traceback: %s %s" % (argc, kw)
+        self.end()
+
     def experiment(self, args):
         log.msg("Doing mutation %s" % args['mutation'])
         self.factory.steps = self.steps
         host = self.local_options['host']
         port = int(self.local_options['port'])
         log.msg("Connecting to %s:%s" % (host, port))
+
+        if self.ended:
+            return
+
         endpoint = endpoints.TCP4ClientEndpoint(self.reactor, host, port)
-        return endpoint.connect(self.factory)
+        d = endpoint.connect(self.factory)
+        d.addErrback(self._failure)
+        return d
         #return endpoint.connect(Daphn3ClientFactory)
 
     def load_assets(self):
         if not self.steps:
-            print "No asset!"
+            print "Error: No assets!"
+            self.end()
             return {}
         mutations = 0
         for x in self.steps:
diff --git a/ooni/plugoo/tests.py b/ooni/plugoo/tests.py
index 9b6ea26..482b8bc 100644
--- a/ooni/plugoo/tests.py
+++ b/ooni/plugoo/tests.py
@@ -25,6 +25,7 @@ class OONITest(object):
     # By default we set this to False, meaning that we don't block
     blocking = False
     reactor = None
+    tool = False
 
     def __init__(self, local_options, global_options, report, ooninet=None,
             reactor=None):
@@ -58,6 +59,12 @@ class OONITest(object):
         return "<OONITest %s %s %s>" % (self.options, self.global_options,
                                            self.assets)
 
+    def end(self):
+        """
+        State that the current test should finish.
+        """
+        self.ended = True
+
     def finished(self, control):
         """
         The Test has finished running, we must now calculate the test runtime
diff --git a/ooni/plugoo/work.py b/ooni/plugoo/work.py
index 6106456..db88fbf 100644
--- a/ooni/plugoo/work.py
+++ b/ooni/plugoo/work.py
@@ -43,12 +43,22 @@ class Worker(object):
 
         @param r: the return value of a previous test.
         """
-        self._running -= 1
+        if self._running > 0:
+            self._running -= 1
+
         if self._running < self.maxconcurrent and self._queued:
             workunit, d = self._queued.pop(0)
             asset, test, idx = workunit
-            self._running += 1
-            actuald = test.startTest(asset).addBoth(self._run)
+            while test.ended and workunit:
+                try:
+                    workunit, d = self._queued.pop(0)
+                    asset, test, idx = workunit
+                except:
+                    workunit = None
+
+            if not test.ended:
+                self._running += 1
+                actuald = test.startTest(asset).addBoth(self._run)
 
         if isinstance(r, failure.Failure):
             # XXX probably we should be doing something to retry test running
@@ -71,8 +81,9 @@ class Worker(object):
         """
         if self._running < self.maxconcurrent:
             asset, test, idx = workunit
-            self._running += 1
-            return test.startTest(asset).addBoth(self._run)
+            if not test.ended:
+                self._running += 1
+                return test.startTest(asset).addBoth(self._run)
 
         d = defer.Deferred()
         self._queued.append((workunit, d))
diff --git a/ooni/protocols/daphn3.py b/ooni/protocols/daphn3.py
index 093929b..d7471ab 100644
--- a/ooni/protocols/daphn3.py
+++ b/ooni/protocols/daphn3.py
@@ -1,13 +1,14 @@
+import sys
+import yaml
+
 from twisted.internet import protocol, defer
 from twisted.internet.error import ConnectionDone
 
+from scapy.all import *
+
 from ooni.utils import log
 from ooni.plugoo import reports
 
-import sys
-from scapy.all import *
-import yaml
-
 def read_pcap(filename):
     """
     @param filename: Filesystem path to the pcap.
@@ -65,6 +66,12 @@ def read_pcap(filename):
 
     return messages
 
+def read_yaml(filename):
+    f = open(filename)
+    obj = yaml.load(f)
+    f.close()
+    return obj
+
 class Mutator:
     idx = 0
     step = 0
@@ -74,10 +81,10 @@ class Mutator:
 
     def __init__(self, steps):
         """
-        @param steps: array of dicts containing as keys data and wait. Data is
-                      the content of the ith packet to be sent and wait is how
-                      much we should wait before mutating the packet of the
-                      next step.
+        @param steps: array of dicts for the steps that must be gone over by
+                      the mutator. Looks like this:
+                      [{"sender": "client", "data": "\xde\xad\xbe\xef"},
+                       {"sender": "server", "data": "\xde\xad\xbe\xef"}]
         """
         self.steps = steps
 
@@ -105,7 +112,7 @@ class Mutator:
         current_state =  {'idx': self.idx, 'step': self.step}
         return current_state
 
-    def next_mutation(self):
+    def next(self):
         """
         Increases by one the mutation state.
 
@@ -121,9 +128,9 @@ class Mutator:
         returns True if another mutation is available.
         returns False if all the possible mutations have been done.
         """
-        if (self.step + 1) > len(self.steps):
+        if (self.step) == len(self.steps):
             # Hack to stop once we have gone through all the steps
-            print "[Mutator.next_mutation()] I believe I have gone over all steps"
+            print "[Mutator.next()] I believe I have gone over all steps"
             print "                          Stopping!"
             self.waiting = True
             return False
@@ -132,35 +139,40 @@ class Mutator:
         current_idx = self.idx
         current_step = self.step
         current_data = self.steps[current_step]['data']
-        try:
-            data_to_receive = len(self.steps[current_step +1 ]['data'])
-            print "[Mutator.next_mutation()] Managed to receive some data."
-        except:
-            print "[Mutator.next_mutation()] No more data to receive."
+
+        if 0:
+            print "current_step: %s" % current_step
+            print "current_idx: %s" % current_idx
+            print "current_data: %s" % current_data
+            print "steps: %s" % len(self.steps)
+            print "waiting_step: %s" % self.waiting_step
+
+        data_to_receive = len(self.steps[current_step]['data'])
 
         if self.waiting and self.waiting_step == data_to_receive:
-            print "[Mutator.next_mutation()] I am no longer waiting"
+            print "[Mutator.next()] I am no longer waiting"
             log.debug("I am no longer waiting.")
             self.waiting = False
             self.waiting_step = 0
             self.idx = 0
 
         elif self.waiting:
-            print "[Mutator.next_mutation()] Waiting some more."
+            print "[Mutator.next()] Waiting some more."
             log.debug("Waiting some more.")
             self.waiting_step += 1
 
         elif current_idx >= len(current_data):
-            print "[Mutator.next_mutation()] Entering waiting mode."
+            print "[Mutator.next()] Entering waiting mode."
             log.debug("Entering waiting mode.")
             self.step += 1
             self.idx = 0
             self.waiting = True
+
         log.debug("current index %s" % current_idx)
         log.debug("current data %s" % len(current_data))
         return True
 
-    def get_mutation(self, step):
+    def get(self, step):
         """
         Returns the current packet to be sent to the wire.
         If no mutation is necessary it will return the plain data.
@@ -172,11 +184,11 @@ class Mutator:
         returns the mutated packet for the specified step.
         """
         if step != self.step or self.waiting:
-            log.debug("[Mutator.get_mutation()] I am not going to do anything :)")
+            log.debug("[Mutator.get()] I am not going to do anything :)")
             return self.steps[step]['data']
 
         data = self.steps[step]['data']
-        print "Mutating %s with idx %s" % (data, self.idx)
+        #print "Mutating %s with idx %s" % (data, self.idx)
         return self._mutate(data, self.idx)
 
 class Daphn3Protocol(protocol.Protocol):
@@ -205,11 +217,13 @@ class Daphn3Protocol(protocol.Protocol):
             print "[Daphn3Protocol.next_state] No mutator. There is no point to stay on this earth."
             self.transport.loseConnection()
             return
+
         if self.role is self.steps[self.state]['sender']:
             print "[Daphn3Protocol.next_state] I am a sender"
-            data = self.mutator.get_mutation(self.state)
+            data = self.mutator.get(self.state)
             self.transport.write(data)
             self.to_receive_data = 0
+
         else:
             print "[Daphn3Protocol.next_state] I am a receiver"
             self.to_receive_data = len(self.steps[self.state]['data'])
@@ -228,11 +242,8 @@ class Daphn3Protocol(protocol.Protocol):
             print "I don't have a mutator. My life means nothing."
             self.transport.loseConnection()
             return
-        if len(self.steps) <= self.state:
-            print "I have reached the end of the state machine"
-            print "Censorship fingerprint bruteforced!"
-            report = {'mutator_state': self.mutator.state()}
-            self.report(report)
+
+        if len(self.steps) == self.state:
             self.transport.loseConnection()
             return
 
@@ -256,7 +267,7 @@ class Daphn3Protocol(protocol.Protocol):
         print "The connection was closed because of %s" % report['reason']
         print "State %s, Mutator %s" % (report['proto_state'],
                                         report['mutator_state'])
-        self.mutator.next_mutation()
+        self.mutator.next()
 
 
 
@@ -275,6 +286,13 @@ class Daphn3Protocol(protocol.Protocol):
             report['trigger'] = 'did not finish state walk'
             self.censorship_detected(report)
 
+        else:
+            print "I have reached the end of the state machine"
+            print "Censorship fingerprint bruteforced!"
+            report = {'mutator_state': self.mutator.state()}
+            self.report(report)
+            return
+
         if reason.check(ConnectionDone):
             print "Connection closed cleanly"
         else:
diff --git a/oonib/README.md b/oonib/README.md
index 2d6caba..8136b06 100644
--- a/oonib/README.md
+++ b/oonib/README.md
@@ -2,8 +2,8 @@
 
 The extra dependencies necessary to run OONIB are:
 
+* twisted-names
 * cyclone: https://github.com/fiorix/cyclone
-*
 
 # Generate self signed certs for OONIB
 
diff --git a/oonib/lib/ssl.py b/oonib/lib/ssl.py
index 5f19686..094bdc9 100644
--- a/oonib/lib/ssl.py
+++ b/oonib/lib/ssl.py
@@ -1,7 +1,8 @@
 from twisted.internet import ssl
+from oonib.lib import config
 
 class SSLContext(ssl.DefaultOpenSSLContextFactory):
-    def __init__(self, config):
+    def __init__(self, *args, **kw):
         ssl.DefaultOpenSSLContextFactory.__init__(self, config.main.ssl_private_key,
                                                   config.main.ssl_certificate)
 
diff --git a/oonib/oonibackend.conf.sample b/oonib/oonibackend.conf.sample
index a5cbbd3..021e0e2 100644
--- a/oonib/oonibackend.conf.sample
+++ b/oonib/oonibackend.conf.sample
@@ -8,3 +8,4 @@ ssl_private_key = /path/to/private.key
 ssl_certificate = /path/to/certificate.crt
 [daphn3]
 pcap_file = /path/to/server.pcap
+yaml_file = /path/to/server.yaml
diff --git a/oonib/testhelpers/daphn3.py b/oonib/testhelpers/daphn3.py
index 59bdeea..2dd2801 100644
--- a/oonib/testhelpers/daphn3.py
+++ b/oonib/testhelpers/daphn3.py
@@ -1,11 +1,11 @@
 from twisted.internet import protocol
 from twisted.internet.error import ConnectionDone
 
-from oonib.common import config
+from oonib.lib import config
 
 from ooni.plugoo import reports
 from ooni.protocols.daphn3 import Mutator, Daphn3Protocol
-from ooni.protocols.daphn3 import read_pcap
+from ooni.protocols.daphn3 import read_pcap, read_yaml
 
 class Daphn3Server(protocol.ServerFactory):
     """
@@ -21,13 +21,22 @@ class Daphn3Server(protocol.ServerFactory):
     def buildProtocol(self, addr):
         p = self.protocol()
         p.factory = self
-        p.factory.steps = read_pcap(config.daphn3.pcap_file)
+
+        if config.daphn3.yaml_file:
+            steps = read_yaml(config.daphn3.yaml_file)
+        elif config.daphn3.pcap_file:
+            steps = read_pcap(config.daphn3.pcap_file)
+        else:
+            print "Error! No PCAP, nor YAML file provided."
+            steps = None
+
+        p.factory.steps = steps
 
         if addr.host not in self.mutations:
             self.mutations[addr.host] = Mutator(p.steps)
         else:
             print "Moving on to next mutation"
-            if not self.mutations[addr.host].next_mutation():
+            if not self.mutations[addr.host].next():
                 self.mutations.pop(addr.host)
         try:
             p.mutator = self.mutations[addr.host]



More information about the tor-commits mailing list