commit f7bfd1198d5b13ad571838372f7316488a6fb3c4 Author: Arturo Filastò art@fuffa.org Date: Thu Apr 11 11:59:46 2013 +0200
Make reporting more robust
Get rid of the ridiculous logic that uses a deferred list; instead, keep track of the report task via a single deferred. --- ooni/director.py | 6 +- ooni/reporter.py | 166 +++++++++++++++++++++++++++++------------------------- 2 files changed, 93 insertions(+), 79 deletions(-)
diff --git a/ooni/director.py b/ooni/director.py index ec4e461..e963049 100644 --- a/ooni/director.py +++ b/ooni/director.py @@ -189,14 +189,16 @@ class Director(object):
net_test = NetTest(net_test_loader, report) net_test.director = self - net_test.report.open() + + yield net_test.report.open()
self.measurementManager.schedule(net_test.generateMeasurements())
self.activeNetTests.append(net_test) net_test.done.addBoth(self.netTestDone, net_test) net_test.done.addBoth(report.close) - return net_test.done + + yield net_test.done
def startSniffing(self): """ Start sniffing with Scapy. Exits if required privileges (root) are not diff --git a/ooni/reporter.py b/ooni/reporter.py index a3811b7..5338f3a 100644 --- a/ooni/reporter.py +++ b/ooni/reporter.py @@ -26,11 +26,10 @@ except ImportError: log.err("Scapy is not installed.")
-from ooni.errors import InvalidOONIBCollectorAddress -from ooni.errors import ReportNotCreated, ReportAlreadyClosed +from ooni import errors
from ooni import otime -from ooni.utils import pushFilenameStack +from ooni.utils import geodata, pushFilenameStack from ooni.utils.net import BodyReceiver, StringProducer, userAgents
from ooni import config @@ -119,7 +118,6 @@ def safe_dump(data, stream=None, **kw):
class OReporter(object): def __init__(self, test_details): - self.created = defer.Deferred() self.testDetails = test_details
def createReport(self): @@ -190,9 +188,9 @@ class YAMLReporter(OReporter):
def _write(self, format_string, *args): if not self._stream: - raise ReportNotCreated + raise errors.ReportNotCreated if self._stream.closed: - raise ReportAlreadyClosed + raise errors.ReportAlreadyClosed s = str(format_string) assert isinstance(s, type('')) if args: @@ -225,8 +223,6 @@ class YAMLReporter(OReporter): self._writeln("###########################################")
self.writeReportEntry(self.testDetails) - self.created.callback(self) - return defer.succeed(None)
def finish(self): self._stream.close() @@ -260,7 +256,7 @@ class OONIBReporter(OReporter): regexp = '^(http|httpo)://[a-zA-Z0-9-.]+(:\d+)?$' if not re.match(regexp, self.collectorAddress) or \ len(self.collectorAddress) < 30: - raise InvalidOONIBCollectorAddress + raise errors.InvalidOONIBCollectorAddress
@defer.inlineCallbacks def writeReportEntry(self, entry): @@ -306,7 +302,7 @@ class OONIBReporter(OReporter): self.agent = Agent(reactor, sockshost="127.0.0.1", socksport=int(config.tor.socks_port)) except Exception, e: - yield defer.fail(e) + log.exception(e)
url = self.collectorAddress + '/report'
@@ -339,11 +335,16 @@ class OONIBReporter(OReporter): bodyProducer=bodyProducer) except ConnectionRefusedError: log.err("Connection to reporting backend failed (ConnectionRefusedError)") - self.created.errback(defer.fail(OONIBReportCreationError)) + raise OONIBReportCreationError + + except errors.HostUnreachable: + log.err("Host is not reachable (HostUnreachable error") + raise OONIBReportCreationError
except Exception, e: + log.err("Failed to connect to reporter backend") log.exception(e) - yield defer.fail(OONIBReportCreationError) + raise OONIBReportCreationError
# This is a little trix to allow us to unspool the response. We create # a deferred and call yield on it. @@ -355,13 +356,13 @@ class OONIBReporter(OReporter): try: parsed_response = json.loads(backend_response) except Exception, e: + log.err("Failed to parse collector response") log.exception(e) - yield defer.fail(e) + raise OONIBReportCreationError
self.reportID = parsed_response['report_id'] self.backendVersion = parsed_response['backend_version'] log.debug("Created report with id %s" % parsed_response['report_id']) - self.created.callback(self)
class ReportClosed(Exception): pass @@ -386,42 +387,39 @@ class Report(object): self.done = defer.Deferred() self.reportEntryManager = reportEntryManager
+ self._reporters_openned = 0 + self._reporters_written = 0 + self._reporters_closed = 0 + def open(self): """ This will create all the reports that need to be created and fires the created callback of the reporter whose report got created. """ - l = [] + all_openned = defer.Deferred() + for reporter in self.reporters[:]: - d = reporter.createReport() - d.addErrback(self.failedOpeningReport, reporter) - reporter.created.addErrback(self.failedOpeningReport, reporter) - l.append(reporter.created) - log.debug("Reporters created: %s" % l) - # Should we consume errors silently? - dl = defer.DeferredList(l) - return dl
- def failedOpeningReport(self, failure, reporter): - """ - This errback get's called every time we fail to create a report. - By fail we mean that the number of retries has exceeded. - Once a report has failed to be created with a reporter we give up and - remove the reporter from the list of reporters to write to. - """ - log.err("Failed to open %s reporter, giving up..." % reporter) - log.err("Reporter %s failed, removing from report..." % reporter) - self.reporters.remove(reporter) - # Don't forward the exception unless there are no more reporters - if len(self.reporters) == 0: - log.err("Removed last reporter %s" % reporter) - raise NoMoreReporters + def report_created(result): + self._reporters_openned += 1 + if len(self.reporters) == self._reporters_openned: + all_openned.callback(self._reporters_openned) + + def report_failed(failure): + print "WE HAVE FAILED!" + try: + self.failedOpeningReport(failure, reporter) + except errors.NoMoreReporters, e: + all_openned.errback(defer.fail(e)) + + d = defer.maybeDeferred(reporter.createReport) + d.addErrback(report_failed) + d.addCallback(report_created) + + return all_openned
def write(self, measurement): """ - This is a lazy call that will write to all the reporters by waiting on - them to be created. - Will return a deferred that will fire once the report for the specified measurement have been written to all the reporters.
@@ -431,41 +429,45 @@ class Report(object): an instance of :class:ooni.tasks.Measurement
Returns: - a deferred list that will fire once all the report entries have - been written. + a deferred that will fire once all the report entries have + been written or errbacks when no more reporters """ - l = [] + all_written = defer.Deferred() + for reporter in self.reporters[:]: - report_write_task = ReportEntry(reporter, measurement) - def scheduleWriteReportEntry(result): - self.reportEntryManager.schedule(report_write_task) - - # delay scheduling the task until after the report is created - log.debug("Adding report entry task %s" % report_write_task) - reporter.created.addCallback(scheduleWriteReportEntry) - - # if the write task fails n times, kill the reporter - report_write_task.done.addErrback(self.failedOpeningReport, reporter) - l.append(report_write_task.done) - - # XXX: This seems a bit fragile. - # failedOpeningReport will forward the errback if the remaining - # reporter has failed. If we fireOnOneErrback, this means that - # the caller of report.write is responsible for attaching an - # errback to the returned deferred and handle this case. That - # probably means stopping the net test. - - # Here, fireOnOneErrback means to call the deferredlists errback - # as soon as any of the deferreds return a failure. consumeErrors - # is used to prevent any uncaught failures from raising an - # exception. 
Alternately we could attach a logger to the errback - # of each deferred and it would have the same effect - - # Probably the better thing to do here would be to add a callback - # to the deferredlist that checks to see if any reporters are left - # and raise an exception if there are no remaining reporters - dl = defer.DeferredList(l,fireOnOneErrback=True, consumeErrors=True) - return dl + def report_written(result): + self._reporters_written += 1 + if len(self.reporters) == self._reporters_written: + all_written.callback(self._reporters_written) + + def report_failed(failure): + log.err("Failed writing report entry") + log.exception(failure) + + report_entry_task = ReportEntry(reporter, measurement) + self.reportEntryManager.schedule(report_entry_task) + + report_entry_task.done.addCallback(report_written) + report_entry_task.done.addErrback(report_failed) + + return all_written + + def failedOpeningReport(self, failure, reporter): + """ + This errback get's called every time we fail to create a report. + By fail we mean that the number of retries has exceeded. + Once a report has failed to be created with a reporter we give up and + remove the reporter from the list of reporters to write to. + """ + log.err("Failed to open %s reporter, giving up..." % reporter) + log.err("Reporter %s failed, removing from report..." % reporter) + log.exception(failure) + self.reporters.remove(reporter) + # Don't forward the exception unless there are no more reporters + if len(self.reporters) == 0: + log.err("Removed last reporter %s" % reporter) + raise errors.NoMoreReporters + return
def close(self, _): """ @@ -476,10 +478,20 @@ class Report(object): all the reports have been closed.
""" - l = [] + all_closed = defer.Deferred() + for reporter in self.reporters[:]: + def report_closed(result): + self._reporters_closed += 1 + if len(self.reporters) == self._reporters_closed: + all_closed.callback(self._reporters_closed) + + def report_failed(failure): + log.err("Failed closing report") + log.exception(failure) + d = defer.maybeDeferred(reporter.finish) - l.append(d) - dl = defer.DeferredList(l) - return dl + d.addCallback(report_closed) + d.addErrback(report_failed)
+ return all_closed