commit d5b4089a20f9abc11beaf1a454b895389b8e9ed9
Author: Arturo Filastò art@fuffa.org
Date:   Mon Apr 22 17:27:29 2013 +0200
Further debugging and code robustness
* Make startup logic more robust
* Use a deferred to keep track of director activity
* Fix some typos

---
 ooni/director.py           | 10 +++++++++-
 ooni/managers.py           |  8 ++++----
 ooni/nettest.py            |  5 ++++-
 ooni/oonicli.py            |  4 ++--
 ooni/reporter.py           | 17 ++++++-----------
 ooni/tests/test_nettest.py | 26 --------------------------
 ooniprobe.conf.sample      |  4 ++--
 7 files changed, 27 insertions(+), 47 deletions(-)
diff --git a/ooni/director.py b/ooni/director.py index e963049..7ad99a9 100644 --- a/ooni/director.py +++ b/ooni/director.py @@ -83,6 +83,10 @@ class Director(object):
self.torControlProtocol = None
+ # This deferred is fired once all the measurements and their reporting + # tasks are completed. + self.allTestsDone = defer.Deferred() + @defer.inlineCallbacks def start(self): if config.privacy.includepcap: @@ -173,6 +177,9 @@ class Director(object):
def netTestDone(self, result, net_test): self.activeNetTests.remove(net_test) + if len(self.activeNetTests) == 0: + self.allTestsDone.callback(None) + self.allTestsDone = defer.Deferred()
@defer.inlineCallbacks def startNetTest(self, _, net_test_loader, reporters): @@ -195,8 +202,9 @@ class Director(object): self.measurementManager.schedule(net_test.generateMeasurements())
self.activeNetTests.append(net_test) - net_test.done.addBoth(self.netTestDone, net_test) + net_test.done.addBoth(report.close) + net_test.done.addBoth(self.netTestDone, net_test)
yield net_test.done
diff --git a/ooni/managers.py b/ooni/managers.py index d54c169..9f8366f 100644 --- a/ooni/managers.py +++ b/ooni/managers.py @@ -17,7 +17,6 @@ def makeIterable(item):
class TaskManager(object): retries = 2 - concurrency = 10
def __init__(self): @@ -26,22 +25,23 @@ class TaskManager(object): self.failures = []
def _failed(self, failure, task): - # XXX INFINITE RECURSION LOOP INSIDE OF THIS THING """ The has failed to complete, we append it to the end of the task chain to be re-run once all the currently scheduled tasks have run. """ - log.err("Task %s has failed" % task) + log.err("Task %s has failed %s times" % (task, task.failures)) log.exception(failure)
self._active_tasks.remove(task) self.failures.append((failure, task))
if task.failures <= self.retries: + log.debug("Rescheduling...") self._tasks = itertools.chain(self._tasks, makeIterable(task)) else: # This fires the errback when the task is done but has failed. + log.err('Permanent failure for %s' % task) task.done.errback(failure)
self._fillSlots() @@ -141,7 +141,7 @@ class MeasurementManager(TaskManager): NetTest on the contrary is aware of the typology of measurements that it is dispatching as they are logically grouped by test file. """ - retries = config.advanced.measuement_retries + retries = config.advanced.measurement_retries concurrency = config.advanced.measurement_concurrency
def succeeded(self, result, measurement): diff --git a/ooni/nettest.py b/ooni/nettest.py index f0cfa0d..50c0cf0 100644 --- a/ooni/nettest.py +++ b/ooni/nettest.py @@ -242,6 +242,8 @@ class NetTestState(object): self.tasks += 1
def checkAllTasksDone(self): + log.debug("Checking all tasks for completion %s == %s" % + (self.doneTasks, self.tasks)) if self.completedScheduling and \ self.doneTasks == self.tasks: self.allTasksDone.callback(self.doneTasks) @@ -324,7 +326,8 @@ class NetTest(object):
if self.director: measurement.done.addCallback(self.director.measurementSucceeded) - measurement.done.addErrback(self.director.measurementFailed, measurement) + measurement.done.addErrback(self.director.measurementFailed, + measurement)
if self.report: measurement.done.addBoth(self.report.write) diff --git a/ooni/oonicli.py b/ooni/oonicli.py index 0bc5364..c64710e 100644 --- a/ooni/oonicli.py +++ b/ooni/oonicli.py @@ -160,7 +160,7 @@ def runWithDirector(): reporters = [yaml_reporter]
if collector and collector.startswith('httpo') \ - and not config.tor_state: + and (not (config.tor_state or config.tor.socks_port)): raise errors.TorNotRunning elif collector: log.msg("Reporting using collector: %s" % collector) @@ -173,7 +173,7 @@ def runWithDirector():
log.debug("adding callback for startNetTest") d.addCallback(director.startNetTest, net_test_loader, reporters) - d.addBoth(shutdown) + director.allTestsDone.addBoth(shutdown)
def start(): d.addCallback(post_director_start) diff --git a/ooni/reporter.py b/ooni/reporter.py index 21a46ca..b04b46b 100644 --- a/ooni/reporter.py +++ b/ooni/reporter.py @@ -429,23 +429,18 @@ class Report(object): been written or errbacks when no more reporters """ all_written = defer.Deferred() - self._reporters_written = 0 + self._report_write_completed = []
for reporter in self.reporters[:]: - def report_written(result): - self._reporters_written += 1 - if len(self.reporters) == self._reporters_written: - all_written.callback(self._reporters_written) - - def report_failed(failure): - log.err("Failed writing report entry") - log.exception(failure) + def report_completed(result): + self._report_write_completed.append(result) + if len(self.reporters) == len(self._report_write_completed): + all_written.callback(self._report_write_completed)
report_entry_task = ReportEntry(reporter, measurement) self.reportEntryManager.schedule(report_entry_task)
- report_entry_task.done.addCallback(report_written) - report_entry_task.done.addErrback(report_failed) + report_entry_task.done.addBoth(report_completed)
return all_written
diff --git a/ooni/tests/test_nettest.py b/ooni/tests/test_nettest.py index 43f03e0..3b59cc4 100644 --- a/ooni/tests/test_nettest.py +++ b/ooni/tests/test_nettest.py @@ -239,29 +239,3 @@ class TestNetTest(unittest.TestCase):
for test_class, method in ntl.testCases: self.assertTrue(test_class.requiresRoot) - - #def test_require_root_failed(self): - # #XXX: will fail if you run as root - # try: - # net_test = NetTestLoader(StringIO(net_test_root_required), - # dummyArgs) - # except errors.InsufficientPrivileges: - # pass - - #def test_create_report_succeed(self): - # pass - - #def test_create_report_failed(self): - # pass - - #def test_run_all_test(self): - # raise NotImplementedError - - #def test_resume_test(self): - # pass - - #def test_progress(self): - # pass - - #def test_time_out(self): - # raise NotImplementedError diff --git a/ooniprobe.conf.sample b/ooniprobe.conf.sample index aeb96c2..ab01583 100644 --- a/ooniprobe.conf.sample +++ b/ooniprobe.conf.sample @@ -36,13 +36,13 @@ advanced: # After how many seconds we should give up on a particular measurement measurement_timeout: 30 # After how many retries we should give up on a measurement - measurement_retry: 2 + measurement_retries: 2 # How many measurments to perform concurrently measurement_concurrency = 100 # After how may seconds we should give up reporting reporting_timeout: 30 # After how many retries to give up on reporting - reporting_retry: 3 + reporting_retries: 3 # How many reports to perform concurrently reporting_concurrency: 20 tor:
tor-commits@lists.torproject.org