commit 113c237d387ac9c3984d99f908dc914d6f18d2a8
Author: Arturo Filastò <hellais(a)torproject.org>
Date: Wed May 2 16:53:54 2012 +0200
Continue implementing twisted version of OONI
---
oonicli.py | 57 +++++++++++++++++++
ooniprobe.py | 3 +-
plugins/__init__.py | 3 +
plugins/dropin.cache | 48 ++++++++++++++++
plugins/skel.py | 22 ++++++++
plugoo/interface.py | 2 +
plugoo/nodes.py | 12 +++--
plugoo/tests.py | 135 +++++++++++++++-------------------------------
plugoo/work.py | 146 ++++++++++++++++++++++++++++++++++++++++++++++++++
plugoo/workers.py | 36 ------------
skel.py | 10 ++++
unittest/tests.py | 27 +++++++++
12 files changed, 368 insertions(+), 133 deletions(-)
diff --git a/oonicli.py b/oonicli.py
new file mode 100755
index 0000000..d77ad0f
--- /dev/null
+++ b/oonicli.py
@@ -0,0 +1,57 @@
+#!/usr/bin/env python
+# -*- coding: UTF-8
+#
+# oonicli
+# *******
+#
+# :copyright: (c) 2012 by Arturo Filastò
+# :license: see LICENSE for more details.
+#
+
+from plugoo import tests
+
+from twisted.python import usage
+
+from twisted.plugin import getPlugins
+
+from zope.interface.exceptions import BrokenImplementation
+from zope.interface.exceptions import BrokenMethodImplementation
+from zope.interface.verify import verifyObject
+import plugins
+
+def retrieve_plugoo():
+    """Return {shortName: plugin} for every plugin that verifies against ITest.
+
+    Plugins failing zope verification are printed and left out of the dict."""
+    interface = tests.ITest
+    d = {}
+    error = False
+    for p in getPlugins(interface, plugins):
+        try:
+            verifyObject(interface, p)
+            d[p.shortName] = p
+        except (BrokenImplementation, BrokenMethodImplementation), err:
+            print "Plugin Broke"
+            print err
+            error = True
+    if not error:  # was `error != False`: success was printed only on failure
+        print "Plugin Loaded!"
+    return d
+
+plugoo = retrieve_plugoo()  # shortName -> plugin, built once at import time
+
+class Options(usage.Options):  # command-line options for oonicli
+ tests = plugoo.keys()
+ subCommands = []
+ for test in tests:  # one sub-command per discovered plugin, named by its shortName
+ subCommands.append([test, None, plugoo[test].arguments, "Run the %s test" % test])
+
+ optParameters = [  # each entry: [long name, short flag, default, description]
+ ['status', 's', 0, 'Show current state'],
+ ['restart', 'r', None, 'Restart OONI'],
+ ['node', 'n', 'localhost:31415', 'Select target node']
+ ]
+
+config = Options()
+config.parseOptions()  # module-level call: options are parsed whenever this module is imported
+
diff --git a/ooniprobe.py b/ooniprobe.py
index 9d9c0ac..3e8fbe7 100755
--- a/ooniprobe.py
+++ b/ooniprobe.py
@@ -1,7 +1,7 @@
#!/usr/bin/env python
# -*- coding: UTF-8
"""
- ooni-probe
+ ooniprobe
**********
Open Observatory of Network Interference
@@ -152,7 +152,6 @@ class ooni(object):
self.tests[test].module.run(self)
if __name__ == "__main__":
-
o = ooni()
parser = argparse.ArgumentParser(formatter_class=argparse.RawDescriptionHelpFormatter,
diff --git a/plugins/__init__.py b/plugins/__init__.py
new file mode 100644
index 0000000..ddb8691
--- /dev/null
+++ b/plugins/__init__.py
@@ -0,0 +1,3 @@
+from twisted.plugin import pluginPackagePaths
+__path__.extend(pluginPackagePaths(__name__))  # add the extra directories twisted reports for this plugin package
+__all__ = []  # nothing is exported by name from this package
diff --git a/plugins/dropin.cache b/plugins/dropin.cache
new file mode 100755
index 0000000..7bc3990
--- /dev/null
+++ b/plugins/dropin.cache
@@ -0,0 +1,48 @@
+(dp1
+S'skel'
+p2
+ccopy_reg
+_reconstructor
+p3
+(ctwisted.plugin
+CachedDropin
+p4
+c__builtin__
+object
+p5
+NtRp6
+(dp7
+S'moduleName'
+p8
+S'plugins.skel'
+p9
+sS'description'
+p10
+NsS'plugins'
+p11
+(lp12
+g3
+(ctwisted.plugin
+CachedPlugin
+p13
+g5
+NtRp14
+(dp15
+S'provided'
+p16
+(lp17
+ctwisted.plugin
+IPlugin
+p18
+acplugoo.tests
+ITest
+p19
+asS'dropin'
+p20
+g6
+sS'name'
+p21
+S'skel'
+p22
+sg10
+Nsbasbs.
\ No newline at end of file
diff --git a/plugins/skel.py b/plugins/skel.py
new file mode 100644
index 0000000..d27932a
--- /dev/null
+++ b/plugins/skel.py
@@ -0,0 +1,22 @@
+from zope.interface import implements
+from twisted.python import usage
+from twisted.plugin import IPlugin
+from plugoo.tests import ITest
+
+class SkelArgs(usage.Options):  # options accepted by the skeleton test
+ optParameters = [['assets', 'a', None, 'Asset file'],
+ ['resume', 'r', None, 'Resume at this index']]
+
+class SkelTest(object):
+    """Placeholder ITest plugin whose startTest() does nothing."""
+    implements(IPlugin, ITest)
+    shortName = "skeleton"
+    description = "Skeleton plugin"
+    requirements = None
+    arguments = SkelArgs
+
+    def startTest(self):  # `self` was missing, so skel.startTest() raised TypeError
+        pass
+
+skel = SkelTest()  # module-level plugin instance
+
diff --git a/plugoo/interface.py b/plugoo/interface.py
new file mode 100644
index 0000000..72f58d7
--- /dev/null
+++ b/plugoo/interface.py
@@ -0,0 +1,2 @@
+from zope.interface import implements, Interface, Attribute
+
diff --git a/plugoo/nodes.py b/plugoo/nodes.py
index 5b16d22..6cdba65 100644
--- a/plugoo/nodes.py
+++ b/plugoo/nodes.py
@@ -33,6 +33,10 @@ class Node(object):
self.address = address
self.port = port
+class LocalNode(object):  # placeholder node type; it holds no state yet
+ def __init__(self):
+ pass
+
"""
[]: node = NetworkNode("192.168.0.112", 5555, "SOCKS5")
[]: node_socket = node.wrap_socket()
@@ -91,7 +95,7 @@ class CodeExecNode(Node):
def get_status(self):
pass
-class PlanetLab(Node, CodeExecNode):
+class PlanetLab(CodeExecNode):
def __init__(self, address, auth_creds, ooni):
self.auth_creds = auth_creds
@@ -147,7 +151,7 @@ class PlanetLab(Node, CodeExecNode):
except paramiko.SSHException:
print 'Public key authentication to PlanetLab node %s failed.' % machinename,
- def _get_command:
+    def _get_command(self):  # stub; `self` added so it is callable on an instance
         pass
def ssh_and_run_(slicename, machinename, command):
@@ -169,8 +173,8 @@ class PlanetLab(Node, CodeExecNode):
"""Attempt to rsync a tree to the PL node."""
pass
- def add_unit:
+    def add_unit(self):  # stub; `self` added so it is callable on an instance
         pass
- def get_status:
+    def get_status(self):  # stub; `self` added to match CodeExecNode.get_status(self)
         pass
diff --git a/plugoo/tests.py b/plugoo/tests.py
index 9a9f1de..4c3e1a1 100644
--- a/plugoo/tests.py
+++ b/plugoo/tests.py
@@ -1,10 +1,13 @@
import os
from datetime import datetime
import yaml
+from zope.interface import Interface, Attribute
import logging
import itertools
import gevent
+from twisted.internet import reactor, defer
+from twisted.python import failure
from plugoo.reports import Report
class Test:
@@ -106,48 +109,11 @@ class Test:
#print "JOB VAL: %s" % job.value
self.logger.info("Writing report(s)")
self.report(job.value)
- job.kill()
+ job.kill()
jobs = []
else:
self.logger.error("No Assets! Dying!")
-class WorkUnit(object):
- """
- This is an object responsible for completing WorkUnits it will
- return its result in a deferred.
-
- The execution of a unit of work should be Atomic.
-
- Reporting to the OONI-net happens on completion of a Unit of Work.
-
- @Node node: This represents the node associated with the Work Unit
- @Asset asset: This is the asset associated with the Work Unit
- @Test test: This represents the Test to be with the specified assets
- @ivar arguments: These are the extra attributes to be passsed to the Test
- """
-
- node = None
- asset = None
- test = None
- arguments = None
-
- def __init__(self, node, asset, test, arguments):
- self.assetGenerator = asset()
- self.Test = test
- self.node = node
- self.arguments = arguments
-
- def next():
- """
- Launches the Unit of Work with the specified assets on the node.
- """
- try:
- asset = self.assetGenerator.next()
- yield self.Test(asset, self.node, self.arguments)
- except StopIteration:
- raise StopIteration
-
-
class ITest(Interface):
"""
This interface represents an OONI test. It fires a deferred on completion.
@@ -156,10 +122,10 @@ class ITest(Interface):
shortName = Attribute("""A short user facing description for this test""")
description = Attribute("""A string containing a longer description for the test""")
- requirements = Attribtue("""What is required to run this this test, for example raw socket access or UDP or TCP""")
+ requirements = Attribute("""What is required to run this this test, for example raw socket access or UDP or TCP""")
- deferred = Attribute("""This will be fired on test completion""")
- node = Attribute("""This represents the node that will run the test""")
+ #deferred = Attribute("""This will be fired on test completion""")
+ #node = Attribute("""This represents the node that will run the test""")
arguments = Attribute("""These are the arguments to be passed to the test for it's execution""")
def startTest():
@@ -167,53 +133,8 @@ class ITest(Interface):
Launches the Test with the specified arguments on a node.
"""
-class WorkGenerator(object):
- """
- Factory responsible for creating units of work.
-
- This shall be run on the machine running OONI-cli. The returned WorkUnits
- can either be run locally or on a remote OONI Node or Network Node.
- """
- node = LocalNode
- size = 10
-
- def __init__(self, assets):
- self.assets = assets()
-
- def next(self):
- # Plank asset
- p_asset = []
- for i in xrange(0, self.size):
- p_asset.append(self.assets.next())
- yield WorkUnit(p_asset)
-
-def spawnDeferredTests(workunit, n):
- def callback(result):
- pass
-
- def errback(reason):
- pass
-
- # XXX find more elegant solution to having 2 generators
- workgenA = WorkGenerator(assets)
-
- for workunit in workgen:
- deferredList = []
- workunitB = workunit
- for i in range(n):
- try:
- test = workunit.next()
- except StopIteration:
- pass
-
- deferred = test.deferred
- deferred.addCallback(callback).addErrback(errback)
-
- deferredList.append(deferred)
- test.startTest()
-
-
-class HTTPRequestTest(HTTPClient):
+#class HTTPRequestTest(HTTPClient):
+class HTTPRequestTest(object):
"""
This is an example of how I would like to be able to write a test.
@@ -221,9 +142,9 @@ class HTTPRequestTest(HTTPClient):
kind of API that I am attempting to achieve to simplify the writing of
tests.
- """
implements(ITest)
+ """
def startTest():
# The response object should also contain the request
"""
@@ -231,7 +152,6 @@ class HTTPRequestTest(HTTPClient):
'runtime': ..., 'timestamp': ...},
'request': {'headers': ..., 'content', 'timestamp', ...}
}
- """
response = self.http_request(address, headers)
if response.headers['content'].matches("Some string"):
self.censorship = True
@@ -240,9 +160,42 @@ class HTTPRequestTest(HTTPClient):
self.censorship = False
return response
+ """
+ pass
+
+class TwistedTest(object):
+    """Timed dummy test: its deferred fires 2 seconds after startTest()."""
+    def __init__(self, asset, node, arguments, ooninet=None):
+        # ooninet is accepted but not stored: reporting is not wired up yet
+        self.start_time = datetime.now()
+        self.asset, self.node, self.arguments = asset, node, arguments
+
+    def __repr__(self):
+        fields = (self.arguments, self.asset, self.node)
+        return "<TwistedTest %s %s %s>" % fields
+
+    def finished(self, result):
+        """Stamp timing data into `result` and fire the test's deferred."""
+        stop = self.end_time = datetime.now()
+        result['start_time'] = self.start_time
+        result['end_time'] = stop
+        result['run_time'] = stop - self.start_time
+        return self.d.callback(result)
+
+    def startTest(self):
+        """Schedule finished() 2 seconds from now; return the deferred."""
+        pending = self.d = defer.Deferred()
+        reactor.callLater(2.0, self.finished, {})
+        return pending
+
+
+class StupidTest(TwistedTest):  # TwistedTest that only changes the name shown by repr()
+ def __repr__(self):
+ return "<StupidTest %s %s %s>" % (self.arguments, self.asset, self.node)
+
class TwistedTestFactory(object):
- test = TwistedTest
+ test = StupidTest
def __init__(self, assets, node,
idx=0):
diff --git a/plugoo/work.py b/plugoo/work.py
new file mode 100644
index 0000000..53cb18c
--- /dev/null
+++ b/plugoo/work.py
@@ -0,0 +1,146 @@
+# -*- coding: UTF-8
+"""
+ work.py
+ **********
+
+ This contains all code related to generating
+ Units of Work and processing it.
+
+ :copyright: (c) 2012 by Arturo Filastò.
+ :license: see LICENSE for more details.
+
+"""
+from datetime import datetime
+import yaml
+
+from zope.interface import Interface, Attribute
+
+from twisted.python import failure
+from twisted.internet import reactor, defer
+
+from plugoo.nodes import LocalNode
+
+class Worker(object):
+ """
+ This is the core of OONI. It takes as input Work Units and
+ runs them concurrently.
+ """
+ def __init__(self, maxconcurrent=10):
+ self.maxconcurrent = maxconcurrent  # limit compared against _running before a unit is started
+ self._running = 0  # tests started and not yet finished
+ self._queued = []  # (workunit, deferred) pairs waiting to be started
+
+ def _run(self, r):  # attached with addBoth() to the deferred of every started test
+ self._running -= 1
+ if self._running < self.maxconcurrent and self._queued:
+ workunit, d = self._queued.pop(0)  # NOTE(review): d is never fired in the code visible here
+ for work in workunit:  # NOTE(review): a whole unit is started after one check, so _running can pass maxconcurrent
+ self._running += 1
+ actuald = work.startTest().addBoth(self._run)  # actuald is assigned but not used
+ if isinstance(r, failure.Failure):
+ r.trap()  # NOTE(review): trap() with no error types looks like it always re-raises - confirm intended
+
+ print "Callback fired!"
+ print r['start_time']  # presumably r is the result dict filled in by the test; verify against the test class
+ print r['end_time']
+ print r['run_time']
+ print repr(r)
+ return r
+
+ def push(self, workunit):
+ if self._running < self.maxconcurrent:
+ for work in workunit:
+ self._running += 1
+ work.startTest().addBoth(self._run)
+ return  # started immediately: the caller gets None, not a deferred
+ d = defer.Deferred()
+ self._queued.append((workunit, d))
+ return d  # queued: the caller gets a deferred instead
+
+class WorkUnit(object):
+    """
+    A batch of assets bound to one Test class, one node and one argument set.
+
+    Iterating over a WorkUnit produces one Test instance per asset, built as
+    Test(asset, node, arguments). Nothing here starts the tests it builds.
+
+    The execution of a unit of work should be atomic; reporting to the
+    OONI-net is meant to happen once the unit has completed.
+
+    @Node node: the node associated with the Work Unit
+    @Asset asset: the iterable of assets associated with the Work Unit
+    @Test test: the Test class to instantiate for every asset
+    @ivar arguments: extra arguments handed to every Test
+    @ivar idx: index given to this unit by its creator
+    """
+
+    node = asset = test = arguments = None
+
+    def __init__(self, node, asset, test, idx, arguments=None):
+        self.idx = idx
+        self.node = node
+        self.Test = test
+        self.arguments = arguments
+        self.asset = asset
+        self.assetGenerator = iter(asset)
+
+    def __iter__(self):
+        return self
+
+    def __repr__(self):
+        shown = (self.arguments, self.Test, self.idx)
+        return "<WorkUnit %s %s %s>" % shown
+
+    def serialize(self):
+        """
+        Dump this unit of work as YAML, for RPC activity.
+        """
+        return yaml.dump(self)
+
+    def next(self):
+        """
+        Build the Test for the next asset of this unit.
+
+        StopIteration from the exhausted asset iterator ends the iteration.
+        """
+        # No try/except needed: StopIteration simply propagates to the caller.
+        current = self.assetGenerator.next()
+        return self.Test(current, self.node, self.arguments)
+
+
+class WorkGenerator(object):
+    """Iterate over an asset stream as WorkUnits of up to `size` assets each.
+
+    `asset` is called once to get the stream; `start` skips `start - 1` units."""
+    node = LocalNode
+    size = 10
+
+    def __init__(self, asset, test, arguments=None, start=None):
+        self.assetGenerator = asset()
+        self.Test = test
+        self.arguments = arguments
+        self.idx = 0  # number of units produced or skipped so far
+        if start:
+            self.skip(start)
+
+    def __iter__(self):
+        return self
+
+    def skip(self, start):
+        for j in xrange(0, start-1):
+            for i in xrange(0, self.size):
+                self.assetGenerator.next()
+            self.idx += 1
+
+    def next(self):
+        p_asset = []  # the "plank": the assets that make up one unit
+        try:
+            while len(p_asset) < self.size:
+                p_asset.append(self.assetGenerator.next())
+        except StopIteration:
+            if not p_asset:  # stream was already empty: iteration is over
+                raise
+        self.idx += 1  # a short final unit is still handed out, not dropped
+        return WorkUnit(self.node, p_asset, self.Test, self.idx, self.arguments)
+
+
diff --git a/plugoo/workers.py b/plugoo/workers.py
deleted file mode 100644
index 3c8397d..0000000
--- a/plugoo/workers.py
+++ /dev/null
@@ -1,36 +0,0 @@
-import gevent
-from gevent.pool import Pool
-
-class WorkFactory:
- """
- This class is responsible for producing
- units of work.
- """
- def __init__(self, assets=None,
- nodes=None, rule=None):
- pass
-
- def _process_rule(self):
- pass
-
- def get_work_unit():
- pass
-
-class UnitOfWork:
- def __init__(self, tests, poolsize=20,
- unit_of_work=None):
- pass
-
- def _read_unit_of_work(self):
- pass
-
- def _build_pools(self):
- for i, x in enumerate(self.tests):
- if i % self.poolsize == 0:
-
-
- def do(self):
- with gevent.Timeout():
- self.pool.join()
-
-
diff --git a/skel.py b/skel.py
new file mode 100644
index 0000000..8982106
--- /dev/null
+++ b/skel.py
@@ -0,0 +1,10 @@
+#!/usr/bin/env python
+# -*- coding: UTF-8
+"""
+ XXX
+ ***
+
+ :copyright: (c) 2012 by Arturo Filastò
+ :license: see LICENSE for more details.
+"""
+
diff --git a/unittest/tests.py b/unittest/tests.py
new file mode 100644
index 0000000..da7c7d0
--- /dev/null
+++ b/unittest/tests.py
@@ -0,0 +1,27 @@
+from twisted.internet import reactor
+from plugoo import *
+
+class StupidAsset(object):  # toy asset stream: yields 1..31, then stops
+    def __init__(self):
+        self.idx = 0  # last value handed out
+
+    def __iter__(self):
+        return self
+
+    def next(self):
+        if self.idx <= 30:
+            self.idx += 1
+            return self.idx
+        raise StopIteration
+
+wgen = work.WorkGenerator(StupidAsset, tests.StupidTest, {'bla': 'aaa'}, start=0)  # start=0 is falsy, so no unit is skipped
+worker = work.Worker()
+for x in wgen:  # x is one WorkUnit produced by the generator
+ print "------"
+ print "Work unit"
+ print "------"
+ worker.push(x)
+ print "------"
+
+reactor.run()  # NOTE(review): nothing visible in this script stops the reactor
+