[tor-commits] [ooni-probe/master] Make reporting more robust

art at torproject.org art at torproject.org
Tue Apr 30 13:01:44 UTC 2013


commit f7bfd1198d5b13ad571838372f7316488a6fb3c4
Author: Arturo Filastò <art at fuffa.org>
Date:   Thu Apr 11 11:59:46 2013 +0200

    Make reporting more robust
    
    Get rid of the ridiculous logic that uses a deferred list; instead, keep
    track of the report task via a single deferred.
---
 ooni/director.py |    6 +-
 ooni/reporter.py |  166 +++++++++++++++++++++++++++++-------------------------
 2 files changed, 93 insertions(+), 79 deletions(-)

diff --git a/ooni/director.py b/ooni/director.py
index ec4e461..e963049 100644
--- a/ooni/director.py
+++ b/ooni/director.py
@@ -189,14 +189,16 @@ class Director(object):
 
         net_test = NetTest(net_test_loader, report)
         net_test.director = self
-        net_test.report.open()
+
+        yield net_test.report.open()
 
         self.measurementManager.schedule(net_test.generateMeasurements())
 
         self.activeNetTests.append(net_test)
         net_test.done.addBoth(self.netTestDone, net_test)
         net_test.done.addBoth(report.close)
-        return net_test.done
+
+        yield net_test.done
 
     def startSniffing(self):
         """ Start sniffing with Scapy. Exits if required privileges (root) are not
diff --git a/ooni/reporter.py b/ooni/reporter.py
index a3811b7..5338f3a 100644
--- a/ooni/reporter.py
+++ b/ooni/reporter.py
@@ -26,11 +26,10 @@ except ImportError:
     log.err("Scapy is not installed.")
 
 
-from ooni.errors import InvalidOONIBCollectorAddress
-from ooni.errors import ReportNotCreated, ReportAlreadyClosed
+from ooni import errors
 
 from ooni import otime
-from ooni.utils import pushFilenameStack
+from ooni.utils import geodata, pushFilenameStack
 from ooni.utils.net import BodyReceiver, StringProducer, userAgents
 
 from ooni import config
@@ -119,7 +118,6 @@ def safe_dump(data, stream=None, **kw):
 
 class OReporter(object):
     def __init__(self, test_details):
-        self.created = defer.Deferred()
         self.testDetails = test_details
 
     def createReport(self):
@@ -190,9 +188,9 @@ class YAMLReporter(OReporter):
 
     def _write(self, format_string, *args):
         if not self._stream:
-            raise ReportNotCreated
+            raise errors.ReportNotCreated
         if self._stream.closed:
-            raise ReportAlreadyClosed
+            raise errors.ReportAlreadyClosed
         s = str(format_string)
         assert isinstance(s, type(''))
         if args:
@@ -225,8 +223,6 @@ class YAMLReporter(OReporter):
         self._writeln("###########################################")
 
         self.writeReportEntry(self.testDetails)
-        self.created.callback(self)
-        return defer.succeed(None)
 
     def finish(self):
         self._stream.close()
@@ -260,7 +256,7 @@ class OONIBReporter(OReporter):
         regexp = '^(http|httpo):\/\/[a-zA-Z0-9\-\.]+(:\d+)?$'
         if not re.match(regexp, self.collectorAddress) or \
             len(self.collectorAddress) < 30:
-            raise InvalidOONIBCollectorAddress
+            raise errors.InvalidOONIBCollectorAddress
 
     @defer.inlineCallbacks
     def writeReportEntry(self, entry):
@@ -306,7 +302,7 @@ class OONIBReporter(OReporter):
             self.agent = Agent(reactor, sockshost="127.0.0.1",
                 socksport=int(config.tor.socks_port))
         except Exception, e:
-            yield defer.fail(e)
+            log.exception(e)
 
         url = self.collectorAddress + '/report'
 
@@ -339,11 +335,16 @@ class OONIBReporter(OReporter):
                                 bodyProducer=bodyProducer)
         except ConnectionRefusedError:
             log.err("Connection to reporting backend failed (ConnectionRefusedError)")
-            self.created.errback(defer.fail(OONIBReportCreationError))
+            raise OONIBReportCreationError
+
+        except errors.HostUnreachable:
+            log.err("Host is not reachable (HostUnreachable error")
+            raise OONIBReportCreationError
 
         except Exception, e:
+            log.err("Failed to connect to reporter backend")
             log.exception(e)
-            yield defer.fail(OONIBReportCreationError)
+            raise OONIBReportCreationError
 
         # This is a little trix to allow us to unspool the response. We create
         # a deferred and call yield on it.
@@ -355,13 +356,13 @@ class OONIBReporter(OReporter):
         try:
             parsed_response = json.loads(backend_response)
         except Exception, e:
+            log.err("Failed to parse collector response")
             log.exception(e)
-            yield defer.fail(e)
+            raise OONIBReportCreationError
 
         self.reportID = parsed_response['report_id']
         self.backendVersion = parsed_response['backend_version']
         log.debug("Created report with id %s" % parsed_response['report_id'])
-        self.created.callback(self)
 
 class ReportClosed(Exception):
     pass
@@ -386,42 +387,39 @@ class Report(object):
         self.done = defer.Deferred()
         self.reportEntryManager = reportEntryManager
 
+        self._reporters_openned = 0
+        self._reporters_written = 0
+        self._reporters_closed = 0
+
     def open(self):
         """
         This will create all the reports that need to be created and fires the
         created callback of the reporter whose report got created.
         """
-        l = []
+        all_openned = defer.Deferred()
+
         for reporter in self.reporters[:]:
-            d = reporter.createReport()
-            d.addErrback(self.failedOpeningReport, reporter)
-            reporter.created.addErrback(self.failedOpeningReport, reporter)
-            l.append(reporter.created)
-        log.debug("Reporters created: %s" % l)
-        # Should we consume errors silently?
-        dl = defer.DeferredList(l)
-        return dl
 
-    def failedOpeningReport(self, failure, reporter):
-        """
-        This errback get's called every time we fail to create a report.
-        By fail we mean that the number of retries has exceeded.
-        Once a report has failed to be created with a reporter we give up and
-        remove the reporter from the list of reporters to write to.
-        """
-        log.err("Failed to open %s reporter, giving up..." % reporter)
-        log.err("Reporter %s failed, removing from report..." % reporter)
-        self.reporters.remove(reporter)
-        # Don't forward the exception unless there are no more reporters
-        if len(self.reporters) == 0:
-            log.err("Removed last reporter %s" % reporter)
-            raise NoMoreReporters
+            def report_created(result):
+                self._reporters_openned += 1
+                if len(self.reporters) == self._reporters_openned:
+                    all_openned.callback(self._reporters_openned)
+
+            def report_failed(failure):
+                print "WE HAVE FAILED!"
+                try:
+                    self.failedOpeningReport(failure, reporter)
+                except errors.NoMoreReporters, e:
+                    all_openned.errback(defer.fail(e))
+
+            d = defer.maybeDeferred(reporter.createReport)
+            d.addErrback(report_failed)
+            d.addCallback(report_created)
+
+        return all_openned
 
     def write(self, measurement):
         """
-        This is a lazy call that will write to all the reporters by waiting on
-        them to be created.
-
         Will return a deferred that will fire once the report for the specified
         measurement have been written to all the reporters.
 
@@ -431,41 +429,45 @@ class Report(object):
                 an instance of :class:ooni.tasks.Measurement
 
         Returns:
-            a deferred list that will fire once all the report entries have
-            been written.
+            a deferred that will fire once all the report entries have
+            been written or errbacks when no more reporters
         """
-        l = []
+        all_written = defer.Deferred()
+
         for reporter in self.reporters[:]:
-            report_write_task = ReportEntry(reporter, measurement)
-            def scheduleWriteReportEntry(result):
-                self.reportEntryManager.schedule(report_write_task)
-
-            # delay scheduling the task until after the report is created
-            log.debug("Adding report entry task %s" % report_write_task)
-            reporter.created.addCallback(scheduleWriteReportEntry)
-
-            # if the write task fails n times, kill the reporter
-            report_write_task.done.addErrback(self.failedOpeningReport, reporter)
-            l.append(report_write_task.done)
-
-        # XXX: This seems a bit fragile.
-        # failedOpeningReport will forward the errback if the remaining
-        # reporter has failed. If we fireOnOneErrback, this means that
-        # the caller of report.write is responsible for attaching an
-        # errback to the returned deferred and handle this case. That
-        # probably means stopping the net test.
-
-        # Here, fireOnOneErrback means to call the deferredlists errback
-        # as soon as any of the deferreds return a failure. consumeErrors
-        # is used to prevent any uncaught failures from raising an
-        # exception. Alternately we could attach a logger to the errback
-        # of each deferred and it would have the same effect
-
-        # Probably the better thing to do here would be to add a callback
-        # to the deferredlist that checks to see if any reporters are left
-        # and raise an exception if there are no remaining reporters
-        dl = defer.DeferredList(l,fireOnOneErrback=True, consumeErrors=True)
-        return dl
+            def report_written(result):
+                self._reporters_written += 1
+                if len(self.reporters) == self._reporters_written:
+                    all_written.callback(self._reporters_written)
+
+            def report_failed(failure):
+                log.err("Failed writing report entry")
+                log.exception(failure)
+
+            report_entry_task = ReportEntry(reporter, measurement)
+            self.reportEntryManager.schedule(report_entry_task)
+
+            report_entry_task.done.addCallback(report_written)
+            report_entry_task.done.addErrback(report_failed)
+
+        return all_written
+
+    def failedOpeningReport(self, failure, reporter):
+        """
+        This errback get's called every time we fail to create a report.
+        By fail we mean that the number of retries has exceeded.
+        Once a report has failed to be created with a reporter we give up and
+        remove the reporter from the list of reporters to write to.
+        """
+        log.err("Failed to open %s reporter, giving up..." % reporter)
+        log.err("Reporter %s failed, removing from report..." % reporter)
+        log.exception(failure)
+        self.reporters.remove(reporter)
+        # Don't forward the exception unless there are no more reporters
+        if len(self.reporters) == 0:
+            log.err("Removed last reporter %s" % reporter)
+            raise errors.NoMoreReporters
+        return
 
     def close(self, _):
         """
@@ -476,10 +478,20 @@ class Report(object):
             all the reports have been closed.
 
         """
-        l = []
+        all_closed = defer.Deferred()
+
         for reporter in self.reporters[:]:
+            def report_closed(result):
+                self._reporters_closed += 1
+                if len(self.reporters) == self._reporters_closed:
+                    all_closed.callback(self._reporters_closed)
+
+            def report_failed(failure):
+                log.err("Failed closing report")
+                log.exception(failure)
+
             d = defer.maybeDeferred(reporter.finish)
-            l.append(d)
-        dl = defer.DeferredList(l)
-        return dl
+            d.addCallback(report_closed)
+            d.addErrback(report_failed)
 
+        return all_closed





More information about the tor-commits mailing list