[tor-commits] [ooni-probe/master] Add test state execution tracking and resume support

art at torproject.org art at torproject.org
Sun Nov 25 22:56:26 UTC 2012


commit 4d693cf148242f724d477796d4daa73cc15aa684
Author: Arturo Filastò <art at fuffa.org>
Date:   Sun Nov 25 20:05:04 2012 +0100

    Add test state execution tracking and resume support
    * Clean up some debug messages
---
 ooni/config.py           |   24 ++++++-
 ooni/oonicli.py          |   54 +++++-----------
 ooni/reporter.py         |    3 +-
 ooni/runner.py           |  161 +++++++++++++++++++++++++++++++++++++++++++---
 ooni/templates/scapyt.py |    1 -
 ooniprobe.conf           |    2 +-
 6 files changed, 192 insertions(+), 53 deletions(-)

diff --git a/ooni/config.py b/ooni/config.py
index b017e87..e01900f 100644
--- a/ooni/config.py
+++ b/ooni/config.py
@@ -6,15 +6,28 @@
 import os
 import yaml
 
-from twisted.internet import reactor, threads
+from twisted.internet import reactor, threads, defer
 
 from ooni.utils import otime
 from ooni.utils import Storage
 
 reports = Storage()
+scapyFactory = None
+stateDict = None
+
+# XXX refactor this to use a database
+resume_lock = defer.DeferredLock()
+
 basic = None
 cmd_line_options = None
-scapyFactory = None
+resume_filename = None
+
+# XXX-Twisted this is used to check if we have started the reactor or not. It
+# is necessary because if the tests are already concluded because we have
+# resumed a test session then it will call reactor.run() even though there is
+# no condition that will ever stop it.
+# There should be a more twisted way of doing this.
+start_reactor = True
 
 def get_root_path():
     this_directory = os.path.dirname(__file__)
@@ -72,5 +85,12 @@ if not basic:
     # Here we make sure that we instance the config file attributes only once
     basic, privacy, advanced = loadConfigFile()
 
+if not resume_filename:
+    resume_filename = os.path.join(get_root_path(), 'ooniprobe.resume')
+    try:
+        with open(resume_filename) as f: pass
+    except IOError as e:
+        with open(resume_filename, 'w+') as f: pass
+
 # This is used to keep track of the state of the sniffer
 sniffer_running = None
diff --git a/ooni/oonicli.py b/ooni/oonicli.py
index 1a316b3..3a8b3df 100644
--- a/ooni/oonicli.py
+++ b/ooni/oonicli.py
@@ -36,7 +36,8 @@ class Options(usage.Options):
                 " network tests. These are loaded from modules, packages and"
                 " files listed on the command line")
 
-    optFlags = [["help", "h"]]
+    optFlags = [["help", "h"],
+                ["resume", "r"]]
 
     optParameters = [["reportfile", "o", None, "report file name"],
                      ["testdeck", "i", None,
@@ -82,39 +83,9 @@ def testsEnded(*arg, **kw):
     You can place here all the post shutdown tasks.
     """
     log.debug("testsEnded: Finished running all tests")
-    reactor.stop()
-
-def runTest(cmd_line_options):
-    config.cmd_line_options = cmd_line_options
-    config.generateReportFilenames()
-
-    if cmd_line_options['reportfile']:
-        config.reports.yamloo = cmd_line_options['reportfile']
-        config.reports.pcap = config.reports.yamloo+".pcap"
-
-    if os.path.exists(config.reports.pcap):
-        print "Report PCAP already exists with filename %s" % config.reports.pcap
-        print "Renaming it to %s" % config.reports.pcap+'.old'
-        os.rename(config.reports.pcap, config.reports.pcap+'.old')
-
-    classes = runner.findTestClassesFromFile(cmd_line_options['test'])
-    test_cases, options = runner.loadTestsAndOptions(classes, cmd_line_options)
-    if config.privacy.includepcap:
-        from ooni.utils.txscapy import ScapyFactory, ScapySniffer
-        try:
-            checkForRoot()
-        except NotRootError:
-            print "[!] Includepcap options requires root priviledges to run"
-            print "    you should run ooniprobe as root or disable the options in ooniprobe.conf"
-            sys.exit(1)
-
-        print "Starting sniffer"
-        config.scapyFactory = ScapyFactory(config.advanced.interface)
-
-        sniffer = ScapySniffer(config.reports.pcap)
-        config.scapyFactory.registerProtocol(sniffer)
-
-    return runner.runTestCases(test_cases, options, cmd_line_options)
+    config.start_reactor = False
+    try: reactor.stop()
+    except: pass
 
 def run():
     """
@@ -129,6 +100,7 @@ def run():
         raise SystemExit, "%s: %s" % (sys.argv[0], ue)
 
     deck_dl = []
+    resume = cmd_line_options['resume']
 
     log.start(cmd_line_options['logfile'])
     if cmd_line_options['testdeck']:
@@ -136,15 +108,21 @@ def run():
         for test in test_deck:
             del cmd_line_options
             cmd_line_options = test['options']
-            d1 = runTest(cmd_line_options)
+            if resume:
+                cmd_line_options['resume'] = True
+            else:
+                cmd_line_options['resume'] = False
+            d1 = runner.runTest(cmd_line_options)
             deck_dl.append(d1)
     else:
         log.msg("No test deck detected")
         del cmd_line_options['testdeck']
-        d1 = runTest(cmd_line_options)
+        d1 = runner.runTest(cmd_line_options)
         deck_dl.append(d1)
 
     d2 = defer.DeferredList(deck_dl)
-    d2.addCallback(testsEnded)
+    d2.addBoth(testsEnded)
 
-    reactor.run()
+    if config.start_reactor:
+        log.debug("Starting reactor")
+        reactor.run()
diff --git a/ooni/reporter.py b/ooni/reporter.py
index 63f501e..9986334 100644
--- a/ooni/reporter.py
+++ b/ooni/reporter.py
@@ -191,7 +191,7 @@ class OReporter(object):
                 'test_started': test_started,
                 'test_runtime': test_runtime,
                 'report': test_report}
-        return self.writeReportEntry(report)
+        return defer.maybeDeferred(self.writeReportEntry, report)
 
 class YAMLReporter(OReporter):
     """
@@ -224,6 +224,7 @@ class YAMLReporter(OReporter):
         self._write('---\n')
         self._write(safe_dump(entry))
         self._write('...\n')
+        return
 
     @defer.inlineCallbacks
     def createReport(self, options):
diff --git a/ooni/runner.py b/ooni/runner.py
index d7856e6..77f9b03 100644
--- a/ooni/runner.py
+++ b/ooni/runner.py
@@ -14,6 +14,7 @@ import time
 import inspect
 import traceback
 import itertools
+import yaml
 
 from twisted.python import reflect, usage
 from twisted.internet import defer
@@ -155,7 +156,9 @@ def runTestCasesWithInput(test_cases, test_input, oreporter):
         return oreporter.testDone(test_instance, test_name)
 
     def test_error(failure, test_instance, test_name):
+        log.err("run Test Cases With Input problem")
         log.exception(failure)
+        return
 
     def tests_done(result, test_class):
         test_instance = test_class()
@@ -168,6 +171,7 @@ def runTestCasesWithInput(test_cases, test_input, oreporter):
             return oreporter.testDone(test_instance, 'summary')
         except NoPostProcessor:
             log.debug("No post processor configured")
+            return
 
     dl = []
     for test_case in test_cases:
@@ -191,7 +195,6 @@ def runTestCasesWithInput(test_cases, test_input, oreporter):
         d = defer.maybeDeferred(test)
         d.addCallback(test_done, test_instance, test_method)
         d.addErrback(test_error, test_instance, test_method)
-        log.debug("returning %s input" % test_method)
         dl.append(d)
 
     test_methods_d = defer.DeferredList(dl)
@@ -221,6 +224,104 @@ def runTestCasesWithInputUnit(test_cases, input_unit, oreporter):
         dl.append(d)
     return defer.DeferredList(dl)
 
+class InvalidResumeFile(Exception):
+    pass
+
+class noResumeSession(Exception):
+    pass
+
+def loadResumeFile():
+    """
+    Sets the singleton stateDict object to the content of the resume file.
+    If the file is empty then it will create an empty one.
+
+    Raises:
+
+        :class:ooni.runner.InvalidResumeFile if the resume file is not valid
+
+    """
+    if not config.stateDict:
+        try:
+            config.stateDict = yaml.safe_load(open(config.resume_filename))
+        except:
+            log.err("Error loading YAML file")
+            raise InvalidResumeFile
+
+        if not config.stateDict:
+            yaml.safe_dump(dict(), open(config.resume_filename, 'w+'))
+            config.stateDict = dict()
+
+        elif isinstance(config.stateDict, dict):
+            return
+        else:
+            log.err("The resume file is of the wrong format")
+            raise InvalidResumeFile
+
+def resumeTest(test_filename, input_unit_factory):
+    """
+    Returns the an input_unit_factory that is at the index of the previous run of the test 
+    for the specified test_filename.
+
+    Args:
+
+        test_filename (str): the filename of the test that is being run
+            including the .py extension.
+
+        input_unit_factory (:class:ooni.inputunit.InputUnitFactory): with the
+            same input of the past run.
+
+    Returns:
+
+        :class:ooni.inputunit.InputUnitFactory that is at the index of the
+            previous test run.
+
+    """
+    try:
+        idx = config.stateDict[test_filename]
+        for x in range(idx):
+            try:
+                input_unit_factory.next()
+            except StopIteration:
+                log.msg("Previous run was complete")
+                return input_unit_factory
+
+        return input_unit_factory
+
+    except KeyError:
+        log.debug("No resume key found for selected test name. It is therefore 0")
+        config.stateDict[test_filename] = 0
+        return input_unit_factory
+
+@defer.inlineCallbacks
+def updateResumeFile(test_filename):
+    """
+    update the resume file with the current stateDict state.
+    """
+    log.debug("Acquiring lock for %s" % test_filename)
+    yield config.resume_lock.acquire()
+
+    current_resume_state = yaml.safe_load(open(config.resume_filename))
+    current_resume_state = config.stateDict
+    yaml.safe_dump(current_resume_state, open(config.resume_filename, 'w+'))
+
+    log.debug("Releasing lock for %s" % test_filename)
+    config.resume_lock.release()
+    defer.returnValue(config.stateDict[test_filename])
+
+@defer.inlineCallbacks
+def increaseInputUnitIdx(test_filename):
+    """
+    Args:
+
+        test_filename (str): the filename of the test that is being run
+            including the .py extension.
+
+        input_unit_idx (int): the current input unit index for the test.
+
+    """
+    config.stateDict[test_filename] += 1
+    yield updateResumeFile(test_filename)
+
 @defer.inlineCallbacks
 def runTestCases(test_cases, options, cmd_line_options):
     log.debug("Running %s" % test_cases)
@@ -245,10 +346,10 @@ def runTestCases(test_cases, options, cmd_line_options):
             test_inputs = [None]
 
     if cmd_line_options['collector']:
-        log.debug("Using remote collector")
+        log.msg("Using remote collector, please be patient while we create the report.")
         oreporter = reporter.OONIBReporter(cmd_line_options)
     else:
-        log.debug("Reporting to file %s" % config.reports.yamloo)
+        log.msg("Reporting to file %s" % config.reports.yamloo)
         oreporter = reporter.YAMLReporter(cmd_line_options)
 
     try:
@@ -256,8 +357,6 @@ def runTestCases(test_cases, options, cmd_line_options):
     except Exception, e:
         log.exception(e)
 
-    log.debug("Creating report")
-
     try:
         yield oreporter.createReport(options)
     except reporter.OONIBReportCreationFailed:
@@ -266,17 +365,59 @@ def runTestCases(test_cases, options, cmd_line_options):
     except Exception, e:
         log.exception(e)
 
-    # This deferred list is a deferred list of deferred lists
-    # it is used to store all the deferreds of the tests that
-    # are run
-    input_unit_idx = 0
+    try:
+        loadResumeFile()
+    except InvalidResumeFile:
+        log.err("Error in loading resume file %s" % config.resume_filename)
+        log.err("Try deleting the resume file")
+        raise InvalidResumeFile
+
+    test_filename = os.path.basename(cmd_line_options['test'])
+
+    if cmd_line_options['resume']:
+        resumeTest(test_filename, input_unit_factory)
+    else:
+        config.stateDict[test_filename] = 0
+
     try:
         for input_unit in input_unit_factory:
             log.debug("Running this input unit %s" % input_unit)
+
             yield runTestCasesWithInputUnit(test_cases, input_unit,
                         oreporter)
-            input_unit_idx += 1
+            yield increaseInputUnitIdx(test_filename)
 
     except Exception:
         log.exception("Problem in running test")
 
+def runTest(cmd_line_options):
+    config.cmd_line_options = cmd_line_options
+    config.generateReportFilenames()
+
+    if cmd_line_options['reportfile']:
+        config.reports.yamloo = cmd_line_options['reportfile']
+        config.reports.pcap = config.reports.yamloo+".pcap"
+
+    if os.path.exists(config.reports.pcap):
+        print "Report PCAP already exists with filename %s" % config.reports.pcap
+        print "Renaming it to %s" % config.reports.pcap+'.old'
+        os.rename(config.reports.pcap, config.reports.pcap+'.old')
+
+    classes = findTestClassesFromFile(cmd_line_options['test'])
+    test_cases, options = loadTestsAndOptions(classes, cmd_line_options)
+    if config.privacy.includepcap:
+        from ooni.utils.txscapy import ScapyFactory, ScapySniffer
+        try:
+            checkForRoot()
+        except NotRootError:
+            print "[!] Includepcap options requires root priviledges to run"
+            print "    you should run ooniprobe as root or disable the options in ooniprobe.conf"
+            sys.exit(1)
+
+        print "Starting sniffer"
+        config.scapyFactory = ScapyFactory(config.advanced.interface)
+
+        sniffer = ScapySniffer(config.reports.pcap)
+        config.scapyFactory.registerProtocol(sniffer)
+
+    return runTestCases(test_cases, options, cmd_line_options)
diff --git a/ooni/templates/scapyt.py b/ooni/templates/scapyt.py
index a1eade4..a787115 100644
--- a/ooni/templates/scapyt.py
+++ b/ooni/templates/scapyt.py
@@ -140,7 +140,6 @@ class BaseScapyTest(NetTestCase):
         scapySender = ScapySender()
 
         config.scapyFactory.registerProtocol(scapySender)
-
         scapySender.sendPackets(packets)
 
         scapySender.stopSending()
diff --git a/ooniprobe.conf b/ooniprobe.conf
index e9f208f..66ab017 100644
--- a/ooniprobe.conf
+++ b/ooniprobe.conf
@@ -26,7 +26,7 @@ advanced:
     threadpool_size: 10
     tor_socksport: 9050
     # For auto detection
-    interface: auto 
+    interface: auto
     # Of specify a specific interface
     #interface: wlan0
 



More information about the tor-commits mailing list