[ooni-probe/master] * Start implementing worker

commit 2e6edd21f3a7b272c2c487b8ce399fe2e0fde3a3
Author: Arturo Filastò <hellais@gmail.com>
Date:   Fri Mar 16 20:16:55 2012 -0700

    * Start implementing worker
    * Add hackish resume support
---
 plugoo/tests.py   |   25 ++++++++++++++++++++-----
 plugoo/workers.py |   36 ++++++++++++++++++++++++++++++++++++
 2 files changed, 56 insertions(+), 5 deletions(-)

diff --git a/plugoo/tests.py b/plugoo/tests.py
index d57c0a8..3d6b36b 100644
--- a/plugoo/tests.py
+++ b/plugoo/tests.py
@@ -32,7 +32,7 @@ class Test:
         """
         pass
 
-    def load_assets(self, assets):
+    def load_assets(self, assets, index=None):
         """
         Takes as input an array of Asset objects and
         outputs an iterator for the loaded assets.
@@ -57,21 +57,36 @@ class Test:
             smallassets = list(assets)
             smallassets.pop(bigidx)
 
+        i = 0
         for x in assets[bigidx]:
             if asset_count > 1:
                 # XXX this will only work in python 2.6, maybe refactor?
                 for comb in itertools.product(*smallassets):
-                    yield (x,) + comb
+                    if index and i < index:
+                        i += 1
+                    else:
+                        yield (x,) + comb
             else:
-                yield (x)
+                if index and i < index:
+                    i += 1
+                else:
+                    yield (x)
 
-    def run(self, assets=None, buffer=10, timeout=100000):
+    def run(self, assets=None, extradata=None, buffer=10, timeout=100000):
         self.logger.info("Starting %s", self.name)
         jobs = []
         if assets:
             self.logger.debug("Running through tests")
-            for i, data in enumerate(self.load_assets(assets)):
+
+            if extradata['index']:
+                index = extradata['index']
+            else:
+                index = None
+
+            for i, data in enumerate(self.load_assets(assets, index)):
                 args = {'data': data}
+                if extradata:
+                    args = dict(args.items()+extradata.items())
                 # Append to the job queue
                 jobs.append(gevent.spawn(self.experiment, **args))
                 # If the buffer is full run the jobs
diff --git a/plugoo/workers.py b/plugoo/workers.py
new file mode 100644
index 0000000..3c8397d
--- /dev/null
+++ b/plugoo/workers.py
@@ -0,0 +1,36 @@
+import gevent
+from gevent.pool import Pool
+
+class WorkFactory:
+    """
+    This class is responsible for producing
+    units of work.
+    """
+    def __init__(self, assets=None,
+                 nodes=None, rule=None):
+        pass
+
+    def _process_rule(self):
+        pass
+
+    def get_work_unit():
+        pass
+
+class UnitOfWork:
+    def __init__(self, tests, poolsize=20,
+                 unit_of_work=None):
+        pass
+
+    def _read_unit_of_work(self):
+        pass
+
+    def _build_pools(self):
+        for i, x in enumerate(self.tests):
+            if i % self.poolsize == 0:
+
+
+    def do(self):
+        with gevent.Timeout():
+            self.pool.join()
+
+
participants (1)
-
art@torproject.org