[tor-commits] [ooni-probe/master] Continue implementing twisted version of OONI

art at torproject.org art at torproject.org
Thu May 31 03:01:42 UTC 2012


commit 113c237d387ac9c3984d99f908dc914d6f18d2a8
Author: Arturo Filastò <hellais at torproject.org>
Date:   Wed May 2 16:53:54 2012 +0200

    Continue implementing twisted version of OONI
---
 oonicli.py           |   57 +++++++++++++++++++
 ooniprobe.py         |    3 +-
 plugins/__init__.py  |    3 +
 plugins/dropin.cache |   48 ++++++++++++++++
 plugins/skel.py      |   22 ++++++++
 plugoo/interface.py  |    2 +
 plugoo/nodes.py      |   12 +++--
 plugoo/tests.py      |  135 +++++++++++++++-------------------------------
 plugoo/work.py       |  146 ++++++++++++++++++++++++++++++++++++++++++++++++++
 plugoo/workers.py    |   36 ------------
 skel.py              |   10 ++++
 unittest/tests.py    |   27 +++++++++
 12 files changed, 368 insertions(+), 133 deletions(-)

diff --git a/oonicli.py b/oonicli.py
new file mode 100755
index 0000000..d77ad0f
--- /dev/null
+++ b/oonicli.py
@@ -0,0 +1,57 @@
+#!/usr/bin/env python
+# -*- coding: UTF-8
+#
+#    oonicli
+#    *******
+#
+#    :copyright: (c) 2012 by Arturo Filastò
+#    :license: see LICENSE for more details.
+#
+
+from plugoo import tests
+
+from twisted.python import usage
+
+from twisted.plugin import getPlugins
+
+from zope.interface.exceptions import BrokenImplementation
+from zope.interface.exceptions import BrokenMethodImplementation
+from zope.interface.verify import verifyObject
+import plugins
+
+def retrieve_plugoo():
+    interface = tests.ITest
+    d = {}
+    error = False
+    for p in getPlugins(interface, plugins):
+        try:
+            verifyObject(interface, p)
+            d[p.shortName] = p
+        except BrokenImplementation, bi:
+            print "Plugin Broke"
+            print bi
+            error = True
+        except BrokenMethodImplementation, bmi:
+            print "Plugin Broke"
+            error = True
+    if error != False:
+        print "Plugin Loaded!"
+    return d
+
+plugoo = retrieve_plugoo()
+
+class Options(usage.Options):
+    tests = plugoo.keys()
+    subCommands = []
+    for test in tests:
+        subCommands.append([test, None, plugoo[test].arguments, "Run the %s test" % test])
+
+    optParameters = [
+        ['status', 's', 0, 'Show current state'],
+        ['restart', 'r', None, 'Restart OONI'],
+        ['node', 'n', 'localhost:31415', 'Select target node']
+    ]
+
+config = Options()
+config.parseOptions()
+
diff --git a/ooniprobe.py b/ooniprobe.py
index 9d9c0ac..3e8fbe7 100755
--- a/ooniprobe.py
+++ b/ooniprobe.py
@@ -1,7 +1,7 @@
 #!/usr/bin/env python
 # -*- coding: UTF-8
 """
-    ooni-probe
+    ooniprobe
     **********
 
     Open Observatory of Network Interference
@@ -152,7 +152,6 @@ class ooni(object):
             self.tests[test].module.run(self)
 
 if __name__ == "__main__":
-
     o = ooni()
 
     parser = argparse.ArgumentParser(formatter_class=argparse.RawDescriptionHelpFormatter,
diff --git a/plugins/__init__.py b/plugins/__init__.py
new file mode 100644
index 0000000..ddb8691
--- /dev/null
+++ b/plugins/__init__.py
@@ -0,0 +1,3 @@
+from twisted.plugin import pluginPackagePaths
+__path__.extend(pluginPackagePaths(__name__))
+__all__ = []
diff --git a/plugins/dropin.cache b/plugins/dropin.cache
new file mode 100755
index 0000000..7bc3990
--- /dev/null
+++ b/plugins/dropin.cache
@@ -0,0 +1,48 @@
+(dp1
+S'skel'
+p2
+ccopy_reg
+_reconstructor
+p3
+(ctwisted.plugin
+CachedDropin
+p4
+c__builtin__
+object
+p5
+NtRp6
+(dp7
+S'moduleName'
+p8
+S'plugins.skel'
+p9
+sS'description'
+p10
+NsS'plugins'
+p11
+(lp12
+g3
+(ctwisted.plugin
+CachedPlugin
+p13
+g5
+NtRp14
+(dp15
+S'provided'
+p16
+(lp17
+ctwisted.plugin
+IPlugin
+p18
+acplugoo.tests
+ITest
+p19
+asS'dropin'
+p20
+g6
+sS'name'
+p21
+S'skel'
+p22
+sg10
+Nsbasbs.
\ No newline at end of file
diff --git a/plugins/skel.py b/plugins/skel.py
new file mode 100644
index 0000000..d27932a
--- /dev/null
+++ b/plugins/skel.py
@@ -0,0 +1,22 @@
+from zope.interface import implements
+from twisted.python import usage
+from twisted.plugin import IPlugin
+from plugoo.tests import ITest
+
+class SkelArgs(usage.Options):
+    optParameters = [['assets', 'a', None, 'Asset file'],
+                     ['resume', 'r', None, 'Resume at this index']]
+
+class SkelTest(object):
+    implements(IPlugin, ITest)
+
+    shortName = "skeleton"
+    description = "Skeleton plugin"
+    requirements = None
+    arguments = SkelArgs
+
+    def startTest():
+        pass
+
+skel = SkelTest()
+
diff --git a/plugoo/interface.py b/plugoo/interface.py
new file mode 100644
index 0000000..72f58d7
--- /dev/null
+++ b/plugoo/interface.py
@@ -0,0 +1,2 @@
+from zope.interface import implements, Interface, Attribute
+
diff --git a/plugoo/nodes.py b/plugoo/nodes.py
index 5b16d22..6cdba65 100644
--- a/plugoo/nodes.py
+++ b/plugoo/nodes.py
@@ -33,6 +33,10 @@ class Node(object):
         self.address = address
         self.port = port
 
+class LocalNode(object):
+    def __init__(self):
+        pass
+
 """
 []: node = NetworkNode("192.168.0.112", 5555, "SOCKS5")
 []: node_socket = node.wrap_socket()
@@ -91,7 +95,7 @@ class CodeExecNode(Node):
     def get_status(self):
         pass
 
-class PlanetLab(Node, CodeExecNode):
+class PlanetLab(CodeExecNode):
     def __init__(self, address, auth_creds, ooni):
         self.auth_creds = auth_creds
 
@@ -147,7 +151,7 @@ class PlanetLab(Node, CodeExecNode):
             except paramiko.SSHException:
                 print 'Public key authentication to PlanetLab node %s failed.' % machinename,
 
-    def _get_command:
+    def _get_command():
         pass
 
     def ssh_and_run_(slicename, machinename, command):
@@ -169,8 +173,8 @@ class PlanetLab(Node, CodeExecNode):
         """Attempt to rsync a tree to the PL node."""
         pass
 
-    def add_unit:
+    def add_unit():
         pass
 
-    def get_status:
+    def get_status():
         pass
diff --git a/plugoo/tests.py b/plugoo/tests.py
index 9a9f1de..4c3e1a1 100644
--- a/plugoo/tests.py
+++ b/plugoo/tests.py
@@ -1,10 +1,13 @@
 import os
 from datetime import datetime
 import yaml
+from zope.interface import Interface, Attribute
 
 import logging
 import itertools
 import gevent
+from twisted.internet import reactor, defer
+from twisted.python import failure
 from plugoo.reports import Report
 
 class Test:
@@ -106,48 +109,11 @@ class Test:
                     #print "JOB VAL: %s" % job.value
                     self.logger.info("Writing report(s)")
                     self.report(job.value)
-                   job.kill()
+                    job.kill()
                 jobs = []
         else:
             self.logger.error("No Assets! Dying!")
 
-class WorkUnit(object):
-    """
-    This is an object responsible for completing WorkUnits it will
-    return its result in a deferred.
-
-    The execution of a unit of work should be Atomic.
-
-    Reporting to the OONI-net happens on completion of a Unit of Work.
-
-    @Node node: This represents the node associated with the Work Unit
-    @Asset asset: This is the asset associated with the Work Unit
-    @Test test: This represents the Test to be with the specified assets
-    @ivar arguments: These are the extra attributes to be passsed to the Test
-    """
-
-    node = None
-    asset = None
-    test = None
-    arguments = None
-
-    def __init__(self, node, asset, test, arguments):
-        self.assetGenerator = asset()
-        self.Test = test
-        self.node = node
-        self.arguments = arguments
-
-    def next():
-        """
-        Launches the Unit of Work with the specified assets on the node.
-        """
-        try:
-           asset = self.assetGenerator.next()
-           yield self.Test(asset, self.node, self.arguments)
-        except StopIteration:
-            raise StopIteration
-
-
 class ITest(Interface):
     """
     This interface represents an OONI test. It fires a deferred on completion.
@@ -156,10 +122,10 @@ class ITest(Interface):
     shortName = Attribute("""A short user facing description for this test""")
     description = Attribute("""A string containing a longer description for the test""")
 
-    requirements = Attribtue("""What is required to run this this test, for example raw socket access or UDP or TCP""")
+    requirements = Attribute("""What is required to run this this test, for example raw socket access or UDP or TCP""")
 
-    deferred = Attribute("""This will be fired on test completion""")
-    node = Attribute("""This represents the node that will run the test""")
+    #deferred = Attribute("""This will be fired on test completion""")
+    #node = Attribute("""This represents the node that will run the test""")
     arguments = Attribute("""These are the arguments to be passed to the test for it's execution""")
 
     def startTest():
@@ -167,53 +133,8 @@ class ITest(Interface):
         Launches the Test with the specified arguments on a node.
         """
 
-class WorkGenerator(object):
-    """
-    Factory responsible for creating units of work.
-
-    This shall be run on the machine running OONI-cli. The returned WorkUnits
-    can either be run locally or on a remote OONI Node or Network Node.
-    """
-    node = LocalNode
-    size = 10
-
-    def __init__(self, assets):
-        self.assets = assets()
-
-    def next(self):
-        # Plank asset
-        p_asset = []
-        for i in xrange(0, self.size):
-            p_asset.append(self.assets.next())
-        yield WorkUnit(p_asset)
-
-def spawnDeferredTests(workunit, n):
-    def callback(result):
-        pass
-
-    def errback(reason):
-        pass
-
-    # XXX find more elegant solution to having 2 generators
-    workgenA = WorkGenerator(assets)
-
-    for workunit in workgen:
-        deferredList = []
-        workunitB = workunit
-        for i in range(n):
-            try:
-                test = workunit.next()
-            except StopIteration:
-                pass
-
-            deferred = test.deferred
-            deferred.addCallback(callback).addErrback(errback)
-
-            deferredList.append(deferred)
-            test.startTest()
-
-
-class HTTPRequestTest(HTTPClient):
+#class HTTPRequestTest(HTTPClient):
+class HTTPRequestTest(object):
     """
     This is an example of how I would like to be able to write a test.
 
@@ -221,9 +142,9 @@ class HTTPRequestTest(HTTPClient):
     kind of API that I am attempting to achieve to simplify the writing of
     tests.
 
-    """
     implements(ITest)
 
+    """
     def startTest():
         # The response object should also contain the request
         """
@@ -231,7 +152,6 @@ class HTTPRequestTest(HTTPClient):
         'runtime': ..., 'timestamp': ...},
         'request': {'headers': ..., 'content', 'timestamp', ...}
         }
-        """
         response = self.http_request(address, headers)
         if response.headers['content'].matches("Some string"):
             self.censorship = True
@@ -240,9 +160,42 @@ class HTTPRequestTest(HTTPClient):
             self.censorship = False
             return response
 
+        """
+        pass
+
+class TwistedTest(object):
+    def __init__(self, asset, node, arguments, ooninet=None):
+        self.asset = asset
+        self.node = node
+        self.arguments = arguments
+        self.start_time = datetime.now()
+        #self.ooninet = ooninet
+
+    def __repr__(self):
+        return "<TwistedTest %s %s %s>" % (self.arguments, self.asset, self.node)
+
+    def finished(self, result):
+        #self.ooninet.report(result)
+        self.end_time = datetime.now()
+        result['start_time'] = self.start_time
+        result['end_time'] = self.end_time
+        result['run_time'] = self.end_time - self.start_time
+        return self.d.callback(result)
+
+    def startTest(self):
+        self.d = defer.Deferred()
+        result = {}
+        reactor.callLater(2.0, self.finished, result)
+        return self.d
+
+
+class StupidTest(TwistedTest):
+    def __repr__(self):
+        return "<StupidTest %s %s %s>" % (self.arguments, self.asset, self.node)
+
 class TwistedTestFactory(object):
 
-    test = TwistedTest
+    test = StupidTest
 
     def __init__(self, assets, node,
                  idx=0):
diff --git a/plugoo/work.py b/plugoo/work.py
new file mode 100644
index 0000000..53cb18c
--- /dev/null
+++ b/plugoo/work.py
@@ -0,0 +1,146 @@
+# -*- coding: UTF-8
+"""
+    work.py
+    **********
+
+    This contains all code related to generating
+    Units of Work and processing them.
+
+    :copyright: (c) 2012 by Arturo Filastò.
+    :license: see LICENSE for more details.
+
+"""
+from datetime import datetime
+import yaml
+
+from zope.interface import Interface, Attribute
+
+from twisted.python import failure
+from twisted.internet import reactor, defer
+
+from plugoo.nodes import LocalNode
+
+class Worker(object):
+    """
+    This is the core of OONI. It takes as input Work Units and
+    runs them concurrently.
+    """
+    def __init__(self, maxconcurrent=10):
+        self.maxconcurrent = maxconcurrent
+        self._running = 0
+        self._queued = []
+
+    def _run(self, r):
+        self._running -= 1
+        if self._running < self.maxconcurrent and self._queued:
+            workunit, d = self._queued.pop(0)
+            for work in workunit:
+                self._running += 1
+                actuald = work.startTest().addBoth(self._run)
+        if isinstance(r, failure.Failure):
+            r.trap()
+
+        print "Callback fired!"
+        print r['start_time']
+        print r['end_time']
+        print r['run_time']
+        print repr(r)
+        return r
+
+    def push(self, workunit):
+        if self._running < self.maxconcurrent:
+            for work in workunit:
+                self._running += 1
+                work.startTest().addBoth(self._run)
+            return
+        d = defer.Deferred()
+        self._queued.append((workunit, d))
+        return d
+
+class WorkUnit(object):
+    """
+    This is an object responsible for completing WorkUnits; it will
+    return its result in a deferred.
+
+    The execution of a unit of work should be Atomic.
+
+    Reporting to the OONI-net happens on completion of a Unit of Work.
+
+    @Node node: This represents the node associated with the Work Unit
+    @Asset asset: This is the asset associated with the Work Unit
+    @Test test: This represents the Test to be run with the specified assets
+    @ivar arguments: These are the extra attributes to be passed to the Test
+    """
+
+    node = None
+    asset = None
+    test = None
+    arguments = None
+
+    def __init__(self, node, asset, test, idx, arguments=None):
+        self.asset = asset
+        self.assetGenerator = iter(asset)
+        self.Test = test
+        self.node = node
+        self.arguments = arguments
+        self.idx = idx
+
+    def __iter__(self):
+        return self
+
+    def __repr__(self):
+        return "<WorkUnit %s %s %s>" % (self.arguments, self.Test, self.idx)
+
+    def serialize(self):
+        """
+        Serialize this unit of work for RPC activity.
+        """
+        return yaml.dump(self)
+
+    def next(self):
+        """
+        Launches the Unit of Work with the specified assets on the node.
+        """
+        try:
+           asset = self.assetGenerator.next()
+           return self.Test(asset, self.node, self.arguments)
+        except StopIteration:
+            raise StopIteration
+
+
+class WorkGenerator(object):
+    """
+    Factory responsible for creating units of work.
+
+    This shall be run on the machine running OONI-cli. The returned WorkUnits
+    can either be run locally or on a remote OONI Node or Network Node.
+    """
+    node = LocalNode
+    size = 10
+
+    def __init__(self, asset, test, arguments=None, start=None):
+        self.assetGenerator = asset()
+        self.Test = test
+        self.arguments = arguments
+        self.idx = 0
+        if start:
+            self.skip(start)
+
+    def __iter__(self):
+        return self
+
+    def skip(self, start):
+        for j in xrange(0, start-1):
+            for i in xrange(0, self.size):
+                self.assetGenerator.next()
+            self.idx += 1
+
+    def next(self):
+        # Plank asset
+        p_asset = []
+        for i in xrange(0, self.size):
+            p_asset.append(self.assetGenerator.next())
+        self.idx += 1
+        return WorkUnit(self.node, p_asset, self.Test, self.idx, self.arguments)
+
+
diff --git a/plugoo/workers.py b/plugoo/workers.py
deleted file mode 100644
index 3c8397d..0000000
--- a/plugoo/workers.py
+++ /dev/null
@@ -1,36 +0,0 @@
-import gevent
-from gevent.pool import Pool
-
-class WorkFactory:
-    """
-    This class is responsible for producing
-    units of work.
-    """
-    def __init__(self, assets=None,
-                 nodes=None, rule=None):
-        pass
-
-    def _process_rule(self):
-        pass
-
-    def get_work_unit():
-        pass
-
-class UnitOfWork:
-    def __init__(self, tests, poolsize=20,
-                 unit_of_work=None):
-        pass
-
-    def _read_unit_of_work(self):
-        pass
-
-    def _build_pools(self):
-        for i, x in enumerate(self.tests):
-            if i % self.poolsize == 0:
-
-
-    def do(self):
-        with gevent.Timeout():
-            self.pool.join()
-
-
diff --git a/skel.py b/skel.py
new file mode 100644
index 0000000..8982106
--- /dev/null
+++ b/skel.py
@@ -0,0 +1,10 @@
+#!/usr/bin/env python
+# -*- coding: UTF-8
+"""
+    XXX
+    ***
+
+    :copyright: (c) 2012 by Arturo Filastò
+    :license: see LICENSE for more details.
+"""
+
diff --git a/unittest/tests.py b/unittest/tests.py
new file mode 100644
index 0000000..da7c7d0
--- /dev/null
+++ b/unittest/tests.py
@@ -0,0 +1,27 @@
+from twisted.internet import reactor
+from plugoo import *
+
+class StupidAsset(object):
+    def __init__(self):
+        self.idx = 0
+
+    def __iter__(self):
+        return self
+
+    def next(self):
+        if self.idx > 30:
+            raise StopIteration
+        self.idx += 1
+        return self.idx
+
+wgen = work.WorkGenerator(StupidAsset, tests.StupidTest, {'bla': 'aaa'}, start=0)
+worker = work.Worker()
+for x in wgen:
+    print "------"
+    print "Work unit"
+    print "------"
+    worker.push(x)
+    print "------"
+
+reactor.run()
+





More information about the tor-commits mailing list