commit d5b4089a20f9abc11beaf1a454b895389b8e9ed9
Author: Arturo Filastò <art(a)fuffa.org>
Date: Mon Apr 22 17:27:29 2013 +0200
Further debugging and code robustness
* Make startup logic more robust
* Use a deferred to keep track of director activity
* Fix some typos
---
ooni/director.py | 10 +++++++++-
ooni/managers.py | 8 ++++----
ooni/nettest.py | 5 ++++-
ooni/oonicli.py | 4 ++--
ooni/reporter.py | 17 ++++++-----------
ooni/tests/test_nettest.py | 26 --------------------------
ooniprobe.conf.sample | 4 ++--
7 files changed, 27 insertions(+), 47 deletions(-)
diff --git a/ooni/director.py b/ooni/director.py
index e963049..7ad99a9 100644
--- a/ooni/director.py
+++ b/ooni/director.py
@@ -83,6 +83,10 @@ class Director(object):
self.torControlProtocol = None
+ # This deferred is fired once all the measurements and their reporting
+ # tasks are completed.
+ self.allTestsDone = defer.Deferred()
+
@defer.inlineCallbacks
def start(self):
if config.privacy.includepcap:
@@ -173,6 +177,9 @@ class Director(object):
def netTestDone(self, result, net_test):
self.activeNetTests.remove(net_test)
+ if len(self.activeNetTests) == 0:
+ self.allTestsDone.callback(None)
+ self.allTestsDone = defer.Deferred()
@defer.inlineCallbacks
def startNetTest(self, _, net_test_loader, reporters):
@@ -195,8 +202,9 @@ class Director(object):
self.measurementManager.schedule(net_test.generateMeasurements())
self.activeNetTests.append(net_test)
- net_test.done.addBoth(self.netTestDone, net_test)
+
net_test.done.addBoth(report.close)
+ net_test.done.addBoth(self.netTestDone, net_test)
yield net_test.done
diff --git a/ooni/managers.py b/ooni/managers.py
index d54c169..9f8366f 100644
--- a/ooni/managers.py
+++ b/ooni/managers.py
@@ -17,7 +17,6 @@ def makeIterable(item):
class TaskManager(object):
retries = 2
-
concurrency = 10
def __init__(self):
@@ -26,22 +25,23 @@ class TaskManager(object):
self.failures = []
def _failed(self, failure, task):
- # XXX INFINITE RECURSION LOOP INSIDE OF THIS THING
"""
The task has failed to complete, so we append it to the end of the task chain
to be re-run once all the currently scheduled tasks have run.
"""
- log.err("Task %s has failed" % task)
+ log.err("Task %s has failed %s times" % (task, task.failures))
log.exception(failure)
self._active_tasks.remove(task)
self.failures.append((failure, task))
if task.failures <= self.retries:
+ log.debug("Rescheduling...")
self._tasks = itertools.chain(self._tasks,
makeIterable(task))
else:
# This fires the errback when the task is done but has failed.
+ log.err('Permanent failure for %s' % task)
task.done.errback(failure)
self._fillSlots()
@@ -141,7 +141,7 @@ class MeasurementManager(TaskManager):
NetTest on the contrary is aware of the typology of measurements that it is
dispatching as they are logically grouped by test file.
"""
- retries = config.advanced.measuement_retries
+ retries = config.advanced.measurement_retries
concurrency = config.advanced.measurement_concurrency
def succeeded(self, result, measurement):
diff --git a/ooni/nettest.py b/ooni/nettest.py
index f0cfa0d..50c0cf0 100644
--- a/ooni/nettest.py
+++ b/ooni/nettest.py
@@ -242,6 +242,8 @@ class NetTestState(object):
self.tasks += 1
def checkAllTasksDone(self):
+ log.debug("Checking all tasks for completion %s == %s" %
+ (self.doneTasks, self.tasks))
if self.completedScheduling and \
self.doneTasks == self.tasks:
self.allTasksDone.callback(self.doneTasks)
@@ -324,7 +326,8 @@ class NetTest(object):
if self.director:
measurement.done.addCallback(self.director.measurementSucceeded)
- measurement.done.addErrback(self.director.measurementFailed, measurement)
+ measurement.done.addErrback(self.director.measurementFailed,
+ measurement)
if self.report:
measurement.done.addBoth(self.report.write)
diff --git a/ooni/oonicli.py b/ooni/oonicli.py
index 0bc5364..c64710e 100644
--- a/ooni/oonicli.py
+++ b/ooni/oonicli.py
@@ -160,7 +160,7 @@ def runWithDirector():
reporters = [yaml_reporter]
if collector and collector.startswith('httpo') \
- and not config.tor_state:
+ and (not (config.tor_state or config.tor.socks_port)):
raise errors.TorNotRunning
elif collector:
log.msg("Reporting using collector: %s" % collector)
@@ -173,7 +173,7 @@ def runWithDirector():
log.debug("adding callback for startNetTest")
d.addCallback(director.startNetTest, net_test_loader, reporters)
- d.addBoth(shutdown)
+ director.allTestsDone.addBoth(shutdown)
def start():
d.addCallback(post_director_start)
diff --git a/ooni/reporter.py b/ooni/reporter.py
index 21a46ca..b04b46b 100644
--- a/ooni/reporter.py
+++ b/ooni/reporter.py
@@ -429,23 +429,18 @@ class Report(object):
been written or errbacks when no more reporters
"""
all_written = defer.Deferred()
- self._reporters_written = 0
+ self._report_write_completed = []
for reporter in self.reporters[:]:
- def report_written(result):
- self._reporters_written += 1
- if len(self.reporters) == self._reporters_written:
- all_written.callback(self._reporters_written)
-
- def report_failed(failure):
- log.err("Failed writing report entry")
- log.exception(failure)
+ def report_completed(result):
+ self._report_write_completed.append(result)
+ if len(self.reporters) == len(self._report_write_completed):
+ all_written.callback(self._report_write_completed)
report_entry_task = ReportEntry(reporter, measurement)
self.reportEntryManager.schedule(report_entry_task)
- report_entry_task.done.addCallback(report_written)
- report_entry_task.done.addErrback(report_failed)
+ report_entry_task.done.addBoth(report_completed)
return all_written
diff --git a/ooni/tests/test_nettest.py b/ooni/tests/test_nettest.py
index 43f03e0..3b59cc4 100644
--- a/ooni/tests/test_nettest.py
+++ b/ooni/tests/test_nettest.py
@@ -239,29 +239,3 @@ class TestNetTest(unittest.TestCase):
for test_class, method in ntl.testCases:
self.assertTrue(test_class.requiresRoot)
-
- #def test_require_root_failed(self):
- # #XXX: will fail if you run as root
- # try:
- # net_test = NetTestLoader(StringIO(net_test_root_required),
- # dummyArgs)
- # except errors.InsufficientPrivileges:
- # pass
-
- #def test_create_report_succeed(self):
- # pass
-
- #def test_create_report_failed(self):
- # pass
-
- #def test_run_all_test(self):
- # raise NotImplementedError
-
- #def test_resume_test(self):
- # pass
-
- #def test_progress(self):
- # pass
-
- #def test_time_out(self):
- # raise NotImplementedError
diff --git a/ooniprobe.conf.sample b/ooniprobe.conf.sample
index aeb96c2..ab01583 100644
--- a/ooniprobe.conf.sample
+++ b/ooniprobe.conf.sample
@@ -36,13 +36,13 @@ advanced:
# After how many seconds we should give up on a particular measurement
measurement_timeout: 30
# After how many retries we should give up on a measurement
- measurement_retry: 2
+ measurement_retries: 2
# How many measurements to perform concurrently
measurement_concurrency = 100
# After how many seconds we should give up reporting
reporting_timeout: 30
# After how many retries to give up on reporting
- reporting_retry: 3
+ reporting_retries: 3
# How many reports to perform concurrently
reporting_concurrency: 20
tor: