[tor-commits] [ooni-probe/master] Fix race condition bug in reporter

isis at torproject.org isis at torproject.org
Sun Mar 10 01:57:02 UTC 2013


commit 90fe8296e78ff48f93c2aaf4e0e03949f5f00960
Author: aagbsn <aagbsn at extc.org>
Date:   Fri Feb 22 20:01:28 2013 +0100

    Fix race condition bug in reporter
    
    Fixed a race where the report.reporters list got modified during
    iteration, and cleaned up reporter failure handling.
---
 ooni/reporter.py |   65 +++++++++++++++++++++++++++++++++++++----------------
 1 files changed, 45 insertions(+), 20 deletions(-)

diff --git a/ooni/reporter.py b/ooni/reporter.py
index 638df41..f89d5bd 100644
--- a/ooni/reporter.py
+++ b/ooni/reporter.py
@@ -222,6 +222,7 @@ class YAMLReporter(OReporter):
         self._writeln("###########################################")
 
         self.writeReportEntry(self.testDetails)
+        self.created.callback(self)
 
     def finish(self):
         self._stream.close()
@@ -355,6 +356,7 @@ class OONIBReporter(OReporter):
         self.reportID = parsed_response['report_id']
         self.backendVersion = parsed_response['backend_version']
         log.debug("Created report with id %s" % parsed_response['report_id'])
+        self.created.callback(self)
 
 class ReportClosed(Exception):
     pass
@@ -377,8 +379,6 @@ class Report(object):
         self.reporters = reporters
 
         self.done = defer.Deferred()
-        #self.done.addCallback(self.close)
-
         self.reportEntryManager = reportEntryManager
 
     def open(self):
@@ -386,12 +386,15 @@ class Report(object):
         This will create all the reports that need to be created and fires the
         created callback of the reporter whose report got created.
         """
-        for reporter in self.reporters:
+        l = []
+        for reporter in self.reporters[:]:
+            reporter.createReport()
             reporter.created.addErrback(self.failedOpeningReport, reporter)
-            d = defer.maybeDeferred(reporter.createReport)
-            d.addCallback(reporter.created.callback)
-            d.addErrback(reporter.created.callback)
-
+            l.append(reporter.created)
+        log.debug("Reporters created: %s" % l)
+        # Should we consume errors silently?
+        dl = defer.DeferredList(l)
+        return dl
 
     def failedOpeningReport(self, failure, reporter):
         """
@@ -401,9 +404,13 @@ class Report(object):
         remove the reporter from the list of reporters to write to.
         """
         log.err("Failed to open %s reporter, giving up..." % reporter)
+        log.err("Reporter %s failed, removing from report..." % reporter)
         self.reporters.remove(reporter)
-        failure.reporter = reporter
-        return failure
+        # Don't forward the exception unless there are no more reporters
+        if len(self.reporters) == 0:
+            log.err("Removed last reporter %s" % reporter)
+            failure.reporter = reporter
+            return failure
 
     def write(self, measurement):
         """
@@ -423,18 +430,36 @@ class Report(object):
             been written.
         """
         l = []
-        for reporter in self.reporters:
-            def writeReportEntry(result):
-                report_write_task = ReportEntry(reporter, measurement)
+        for reporter in self.reporters[:]:
+            report_write_task = ReportEntry(reporter, measurement)
+            def scheduleWriteReportEntry(result):
                 self.reportEntryManager.schedule(report_write_task)
-                return report_write_task.done
-
-            d = reporter.created.addBoth(writeReportEntry)
-            # Give up on writing to the reporter if the task fails X times
-            d.addErrback(self.failedOpeningReport, reporter)
-            l.append(d)
 
-        dl = defer.DeferredList(l)
+            # delay scheduling the task until after the report is created
+            log.debug("Adding report entry task %s" % report_write_task)
+            reporter.created.addCallback(scheduleWriteReportEntry)
+
+            # if the write task fails n times, kill the reporter
+            report_write_task.done.addErrback(self.failedOpeningReport, reporter)
+            l.append(report_write_task.done)
+
+        # XXX: This seems a bit fragile.
+        # failedOpeningReport will forward the errback if the remaining
+        # reporter has failed. If we fireOnOneErrback, this means that
+        # the caller of report.write is responsible for attaching an
+        # errback to the returned deferred and handle this case. That
+        # probably means stopping the net test.
+
+        # Here, fireOnOneErrback means to call the DeferredList's errback
+        # as soon as any of the deferreds returns a failure. consumeErrors
+        # is used to prevent any uncaught failures from raising an
+        # exception. Alternatively, we could attach a logger to the errback
+        # of each deferred and it would have the same effect.
+
+        # Probably the better thing to do here would be to add a callback
+        # to the DeferredList that checks to see if any reporters are left
+        # and raises an exception if there are no remaining reporters.
+        dl = defer.DeferredList(l,fireOnOneErrback=True, consumeErrors=True)
         return dl
 
     def close(self, _):
@@ -447,7 +472,7 @@ class Report(object):
 
         """
         l = []
-        for reporter in self.reporters:
+        for reporter in self.reporters[:]:
             d = defer.maybeDeferred(reporter.finish)
             l.append(d)
         dl = defer.DeferredList(l)





More information about the tor-commits mailing list