[tor-commits] [ooni-probe/master] * Start implementing worker

art at torproject.org art at torproject.org
Wed Mar 21 03:33:15 UTC 2012


commit 2e6edd21f3a7b272c2c487b8ce399fe2e0fde3a3
Author: Arturo Filastò <hellais at gmail.com>
Date:   Fri Mar 16 20:16:55 2012 -0700

    * Start implementing worker
    * Add hackish resume support
---
 plugoo/tests.py   |   25 ++++++++++++++++++++-----
 plugoo/workers.py |   36 ++++++++++++++++++++++++++++++++++++
 2 files changed, 56 insertions(+), 5 deletions(-)

diff --git a/plugoo/tests.py b/plugoo/tests.py
index d57c0a8..3d6b36b 100644
--- a/plugoo/tests.py
+++ b/plugoo/tests.py
@@ -32,7 +32,7 @@ class Test:
         """
         pass
 
-    def load_assets(self, assets):
+    def load_assets(self, assets, index=None):
         """
         Takes as input an array of Asset objects and
         outputs an iterator for the loaded assets.
@@ -57,21 +57,36 @@ class Test:
             smallassets = list(assets)
             smallassets.pop(bigidx)
 
+        i = 0
         for x in assets[bigidx]:
             if asset_count > 1:
                 # XXX this will only work in python 2.6, maybe refactor?
                 for comb in itertools.product(*smallassets):
-                    yield (x,) + comb
+                    if index and i < index:
+                        i += 1
+                    else:
+                        yield (x,) + comb
             else:
-                yield (x)
+                if index and i < index:
+                    i += 1
+                else:
+                    yield (x)
 
-    def run(self, assets=None, buffer=10, timeout=100000):
+    def run(self, assets=None, extradata=None, buffer=10, timeout=100000):
         self.logger.info("Starting %s", self.name)
         jobs = []
         if assets:
             self.logger.debug("Running through tests")
-            for i, data in enumerate(self.load_assets(assets)):
+
+            if extradata['index']:
+                index = extradata['index']
+            else:
+                index = None
+
+            for i, data in enumerate(self.load_assets(assets, index)):
                 args = {'data': data}
+                if extradata:
+                    args = dict(args.items()+extradata.items())
                 # Append to the job queue
                 jobs.append(gevent.spawn(self.experiment, **args))
                 # If the buffer is full run the jobs
diff --git a/plugoo/workers.py b/plugoo/workers.py
new file mode 100644
index 0000000..3c8397d
--- /dev/null
+++ b/plugoo/workers.py
@@ -0,0 +1,36 @@
+import gevent
+from gevent.pool import Pool
+
+class WorkFactory:
+    """
+    This class is responsible for producing
+    units of work.
+    """
+    def __init__(self, assets=None,
+                 nodes=None, rule=None):
+        pass
+
+    def _process_rule(self):
+        pass
+
+    def get_work_unit():
+        pass
+
+class UnitOfWork:
+    def __init__(self, tests, poolsize=20,
+                 unit_of_work=None):
+        pass
+
+    def _read_unit_of_work(self):
+        pass
+
+    def _build_pools(self):
+        for i, x in enumerate(self.tests):
+            if i % self.poolsize == 0:
+
+
+    def do(self):
+        with gevent.Timeout():
+            self.pool.join()
+
+





More information about the tor-commits mailing list