commit 113c237d387ac9c3984d99f908dc914d6f18d2a8 Author: Arturo Filastò hellais@torproject.org Date: Wed May 2 16:53:54 2012 +0200
Continue implementing twisted version of OONI --- oonicli.py | 57 +++++++++++++++++++ ooniprobe.py | 3 +- plugins/__init__.py | 3 + plugins/dropin.cache | 48 ++++++++++++++++ plugins/skel.py | 22 ++++++++ plugoo/interface.py | 2 + plugoo/nodes.py | 12 +++-- plugoo/tests.py | 135 +++++++++++++++------------------------------- plugoo/work.py | 146 ++++++++++++++++++++++++++++++++++++++++++++++++++ plugoo/workers.py | 36 ------------ skel.py | 10 ++++ unittest/tests.py | 27 +++++++++ 12 files changed, 368 insertions(+), 133 deletions(-)
diff --git a/oonicli.py b/oonicli.py new file mode 100755 index 0000000..d77ad0f --- /dev/null +++ b/oonicli.py @@ -0,0 +1,57 @@ +#!/usr/bin/env python +# -*- coding: UTF-8 +# +# oonicli +# ******* +# +# :copyright: (c) 2012 by Arturo Filastò +# :license: see LICENSE for more details. +# + +from plugoo import tests + +from twisted.python import usage + +from twisted.plugin import getPlugins + +from zope.interface.exceptions import BrokenImplementation +from zope.interface.exceptions import BrokenMethodImplementation +from zope.interface.verify import verifyObject +import plugins + +def retrieve_plugoo(): + interface = tests.ITest + d = {} + error = False + for p in getPlugins(interface, plugins): + try: + verifyObject(interface, p) + d[p.shortName] = p + except BrokenImplementation, bi: + print "Plugin Broke" + print bi + error = True + except BrokenMethodImplementation, bmi: + print "Plugin Broke" + error = True + if error != False: + print "Plugin Loaded!" + return d + +plugoo = retrieve_plugoo() + +class Options(usage.Options): + tests = plugoo.keys() + subCommands = [] + for test in tests: + subCommands.append([test, None, plugoo[test].arguments, "Run the %s test" % test]) + + optParameters = [ + ['status', 's', 0, 'Show current state'], + ['restart', 'r', None, 'Restart OONI'], + ['node', 'n', 'localhost:31415', 'Select target node'] + ] + +config = Options() +config.parseOptions() + diff --git a/ooniprobe.py b/ooniprobe.py index 9d9c0ac..3e8fbe7 100755 --- a/ooniprobe.py +++ b/ooniprobe.py @@ -1,7 +1,7 @@ #!/usr/bin/env python # -*- coding: UTF-8 """ - ooni-probe + ooniprobe **********
Open Observatory of Network Interference @@ -152,7 +152,6 @@ class ooni(object): self.tests[test].module.run(self)
if __name__ == "__main__": - o = ooni()
parser = argparse.ArgumentParser(formatter_class=argparse.RawDescriptionHelpFormatter, diff --git a/plugins/__init__.py b/plugins/__init__.py new file mode 100644 index 0000000..ddb8691 --- /dev/null +++ b/plugins/__init__.py @@ -0,0 +1,3 @@ +from twisted.plugin import pluginPackagePaths +__path__.extend(pluginPackagePaths(__name__)) +__all__ = [] diff --git a/plugins/dropin.cache b/plugins/dropin.cache new file mode 100755 index 0000000..7bc3990 --- /dev/null +++ b/plugins/dropin.cache @@ -0,0 +1,48 @@ +(dp1 +S'skel' +p2 +ccopy_reg +_reconstructor +p3 +(ctwisted.plugin +CachedDropin +p4 +c__builtin__ +object +p5 +NtRp6 +(dp7 +S'moduleName' +p8 +S'plugins.skel' +p9 +sS'description' +p10 +NsS'plugins' +p11 +(lp12 +g3 +(ctwisted.plugin +CachedPlugin +p13 +g5 +NtRp14 +(dp15 +S'provided' +p16 +(lp17 +ctwisted.plugin +IPlugin +p18 +acplugoo.tests +ITest +p19 +asS'dropin' +p20 +g6 +sS'name' +p21 +S'skel' +p22 +sg10 +Nsbasbs. \ No newline at end of file diff --git a/plugins/skel.py b/plugins/skel.py new file mode 100644 index 0000000..d27932a --- /dev/null +++ b/plugins/skel.py @@ -0,0 +1,22 @@ +from zope.interface import implements +from twisted.python import usage +from twisted.plugin import IPlugin +from plugoo.tests import ITest + +class SkelArgs(usage.Options): + optParameters = [['assets', 'a', None, 'Asset file'], + ['resume', 'r', None, 'Resume at this index']] + +class SkelTest(object): + implements(IPlugin, ITest) + + shortName = "skeleton" + description = "Skeleton plugin" + requirements = None + arguments = SkelArgs + + def startTest(): + pass + +skel = SkelTest() + diff --git a/plugoo/interface.py b/plugoo/interface.py new file mode 100644 index 0000000..72f58d7 --- /dev/null +++ b/plugoo/interface.py @@ -0,0 +1,2 @@ +from zope.interface import implements, Interface, Attribute + diff --git a/plugoo/nodes.py b/plugoo/nodes.py index 5b16d22..6cdba65 100644 --- a/plugoo/nodes.py +++ b/plugoo/nodes.py @@ -33,6 +33,10 @@ class Node(object): self.address = address self.port = port
+class LocalNode(object): + def __init__(self): + pass + """ []: node = NetworkNode("192.168.0.112", 5555, "SOCKS5") []: node_socket = node.wrap_socket() @@ -91,7 +95,7 @@ class CodeExecNode(Node): def get_status(self): pass
-class PlanetLab(Node, CodeExecNode): +class PlanetLab(CodeExecNode): def __init__(self, address, auth_creds, ooni): self.auth_creds = auth_creds
@@ -147,7 +151,7 @@ class PlanetLab(Node, CodeExecNode): except paramiko.SSHException: print 'Public key authentication to PlanetLab node %s failed.' % machinename,
- def _get_command: + def _get_command(): pass
def ssh_and_run_(slicename, machinename, command): @@ -169,8 +173,8 @@ class PlanetLab(Node, CodeExecNode): """Attempt to rsync a tree to the PL node.""" pass
- def add_unit: + def add_unit(): pass
- def get_status: + def get_status(): pass diff --git a/plugoo/tests.py b/plugoo/tests.py index 9a9f1de..4c3e1a1 100644 --- a/plugoo/tests.py +++ b/plugoo/tests.py @@ -1,10 +1,13 @@ import os from datetime import datetime import yaml +from zope.interface import Interface, Attribute
import logging import itertools import gevent +from twisted.internet import reactor, defer +from twisted.python import failure from plugoo.reports import Report
class Test: @@ -106,48 +109,11 @@ class Test: #print "JOB VAL: %s" % job.value self.logger.info("Writing report(s)") self.report(job.value) - job.kill() + job.kill() jobs = [] else: self.logger.error("No Assets! Dying!")
-class WorkUnit(object): - """ - This is an object responsible for completing WorkUnits it will - return its result in a deferred. - - The execution of a unit of work should be Atomic. - - Reporting to the OONI-net happens on completion of a Unit of Work. - - @Node node: This represents the node associated with the Work Unit - @Asset asset: This is the asset associated with the Work Unit - @Test test: This represents the Test to be with the specified assets - @ivar arguments: These are the extra attributes to be passsed to the Test - """ - - node = None - asset = None - test = None - arguments = None - - def __init__(self, node, asset, test, arguments): - self.assetGenerator = asset() - self.Test = test - self.node = node - self.arguments = arguments - - def next(): - """ - Launches the Unit of Work with the specified assets on the node. - """ - try: - asset = self.assetGenerator.next() - yield self.Test(asset, self.node, self.arguments) - except StopIteration: - raise StopIteration - - class ITest(Interface): """ This interface represents an OONI test. It fires a deferred on completion. @@ -156,10 +122,10 @@ class ITest(Interface): shortName = Attribute("""A short user facing description for this test""") description = Attribute("""A string containing a longer description for the test""")
- requirements = Attribtue("""What is required to run this this test, for example raw socket access or UDP or TCP""") + requirements = Attribute("""What is required to run this this test, for example raw socket access or UDP or TCP""")
- deferred = Attribute("""This will be fired on test completion""") - node = Attribute("""This represents the node that will run the test""") + #deferred = Attribute("""This will be fired on test completion""") + #node = Attribute("""This represents the node that will run the test""") arguments = Attribute("""These are the arguments to be passed to the test for it's execution""")
def startTest(): @@ -167,53 +133,8 @@ class ITest(Interface): Launches the Test with the specified arguments on a node. """
-class WorkGenerator(object): - """ - Factory responsible for creating units of work. - - This shall be run on the machine running OONI-cli. The returned WorkUnits - can either be run locally or on a remote OONI Node or Network Node. - """ - node = LocalNode - size = 10 - - def __init__(self, assets): - self.assets = assets() - - def next(self): - # Plank asset - p_asset = [] - for i in xrange(0, self.size): - p_asset.append(self.assets.next()) - yield WorkUnit(p_asset) - -def spawnDeferredTests(workunit, n): - def callback(result): - pass - - def errback(reason): - pass - - # XXX find more elegant solution to having 2 generators - workgenA = WorkGenerator(assets) - - for workunit in workgen: - deferredList = [] - workunitB = workunit - for i in range(n): - try: - test = workunit.next() - except StopIteration: - pass - - deferred = test.deferred - deferred.addCallback(callback).addErrback(errback) - - deferredList.append(deferred) - test.startTest() - - -class HTTPRequestTest(HTTPClient): +#class HTTPRequestTest(HTTPClient): +class HTTPRequestTest(object): """ This is an example of how I would like to be able to write a test.
@@ -221,9 +142,9 @@ class HTTPRequestTest(HTTPClient): kind of API that I am attempting to achieve to simplify the writing of tests.
- """ implements(ITest)
+ """ def startTest(): # The response object should also contain the request """ @@ -231,7 +152,6 @@ class HTTPRequestTest(HTTPClient): 'runtime': ..., 'timestamp': ...}, 'request': {'headers': ..., 'content', 'timestamp', ...} } - """ response = self.http_request(address, headers) if response.headers['content'].matches("Some string"): self.censorship = True @@ -240,9 +160,42 @@ class HTTPRequestTest(HTTPClient): self.censorship = False return response
+ """ + pass + +class TwistedTest(object): + def __init__(self, asset, node, arguments, ooninet=None): + self.asset = asset + self.node = node + self.arguments = arguments + self.start_time = datetime.now() + #self.ooninet = ooninet + + def __repr__(self): + return "<TwistedTest %s %s %s>" % (self.arguments, self.asset, self.node) + + def finished(self, result): + #self.ooninet.report(result) + self.end_time = datetime.now() + result['start_time'] = self.start_time + result['end_time'] = self.end_time + result['run_time'] = self.end_time - self.start_time + return self.d.callback(result) + + def startTest(self): + self.d = defer.Deferred() + result = {} + reactor.callLater(2.0, self.finished, result) + return self.d + + +class StupidTest(TwistedTest): + def __repr__(self): + return "<StupidTest %s %s %s>" % (self.arguments, self.asset, self.node) + class TwistedTestFactory(object):
- test = TwistedTest + test = StupidTest
def __init__(self, assets, node, idx=0): diff --git a/plugoo/work.py b/plugoo/work.py new file mode 100644 index 0000000..53cb18c --- /dev/null +++ b/plugoo/work.py @@ -0,0 +1,146 @@ +# -*- coding: UTF-8 +""" + work.py + ********** + + This contains all code related to generating + Units of Work and processing it. + + :copyright: (c) 2012 by Arturo Filastò. + :license: see LICENSE for more details. + +""" +from datetime import datetime +import yaml + +from zope.interface import Interface, Attribute + +from twisted.python import failure +from twisted.internet import reactor, defer + +from plugoo.nodes import LocalNode + +class Worker(object): + """ + This is the core of OONI. It takes as input Work Units and + runs them concurrently. + """ + def __init__(self, maxconcurrent=10): + self.maxconcurrent = maxconcurrent + self._running = 0 + self._queued = [] + + def _run(self, r): + self._running -= 1 + if self._running < self.maxconcurrent and self._queued: + workunit, d = self._queued.pop(0) + for work in workunit: + self._running += 1 + actuald = work.startTest().addBoth(self._run) + if isinstance(r, failure.Failure): + r.trap() + + print "Callback fired!" + print r['start_time'] + print r['end_time'] + print r['run_time'] + print repr(r) + return r + + def push(self, workunit): + if self._running < self.maxconcurrent: + for work in workunit: + self._running += 1 + work.startTest().addBoth(self._run) + return + d = defer.Deferred() + self._queued.append((workunit, d)) + return d + +class WorkUnit(object): + """ + This is an object responsible for completing WorkUnits it will + return its result in a deferred. + + The execution of a unit of work should be Atomic. + + Reporting to the OONI-net happens on completion of a Unit of Work. + + @Node node: This represents the node associated with the Work Unit + @Asset asset: This is the asset associated with the Work Unit + @Test test: This represents the Test to be with the specified assets + @ivar arguments: These are the extra attributes to be passsed to the Test + """ + + node = None + asset = None + test = None + arguments = None + + def __init__(self, node, asset, test, idx, arguments=None): + self.asset = asset + self.assetGenerator = iter(asset) + self.Test = test + self.node = node + self.arguments = arguments + self.idx = idx + + def __iter__(self): + return self + + def __repr__(self): + return "<WorkUnit %s %s %s>" % (self.arguments, self.Test, self.idx) + + def serialize(self): + """ + Serialize this unit of work for RPC activity. + """ + return yaml.dump(self) + + def next(self): + """ + Launches the Unit of Work with the specified assets on the node. + """ + try: + asset = self.assetGenerator.next() + return self.Test(asset, self.node, self.arguments) + except StopIteration: + raise StopIteration + + +class WorkGenerator(object): + """ + Factory responsible for creating units of work. + + This shall be run on the machine running OONI-cli. The returned WorkUnits + can either be run locally or on a remote OONI Node or Network Node. + """ + node = LocalNode + size = 10 + + def __init__(self, asset, test, arguments=None, start=None): + self.assetGenerator = asset() + self.Test = test + self.arguments = arguments + self.idx = 0 + if start: + self.skip(start) + + def __iter__(self): + return self + + def skip(self, start): + for j in xrange(0, start-1): + for i in xrange(0, self.size): + self.assetGenerator.next() + self.idx += 1 + + def next(self): + # Plank asset + p_asset = [] + for i in xrange(0, self.size): + p_asset.append(self.assetGenerator.next()) + self.idx += 1 + return WorkUnit(self.node, p_asset, self.Test, self.idx, self.arguments) + + diff --git a/plugoo/workers.py b/plugoo/workers.py deleted file mode 100644 index 3c8397d..0000000 --- a/plugoo/workers.py +++ /dev/null @@ -1,36 +0,0 @@ -import gevent -from gevent.pool import Pool - -class WorkFactory: - """ - This class is responsible for producing - units of work. - """ - def __init__(self, assets=None, - nodes=None, rule=None): - pass - - def _process_rule(self): - pass - - def get_work_unit(): - pass - -class UnitOfWork: - def __init__(self, tests, poolsize=20, - unit_of_work=None): - pass - - def _read_unit_of_work(self): - pass - - def _build_pools(self): - for i, x in enumerate(self.tests): - if i % self.poolsize == 0: - - - def do(self): - with gevent.Timeout(): - self.pool.join() - - diff --git a/skel.py b/skel.py new file mode 100644 index 0000000..8982106 --- /dev/null +++ b/skel.py @@ -0,0 +1,10 @@ +#!/usr/bin/env python +# -*- coding: UTF-8 +""" + XXX + *** + + :copyright: (c) 2012 by Arturo Filastò + :license: see LICENSE for more details. +""" + diff --git a/unittest/tests.py b/unittest/tests.py new file mode 100644 index 0000000..da7c7d0 --- /dev/null +++ b/unittest/tests.py @@ -0,0 +1,27 @@ +from twisted.internet import reactor +from plugoo import * + +class StupidAsset(object): + def __init__(self): + self.idx = 0 + + def __iter__(self): + return self + + def next(self): + if self.idx > 30: + raise StopIteration + self.idx += 1 + return self.idx + +wgen = work.WorkGenerator(StupidAsset, tests.StupidTest, {'bla': 'aaa'}, start=0) +worker = work.Worker() +for x in wgen: + print "------" + print "Work unit" + print "------" + worker.push(x) + print "------" + +reactor.run() +