[tor-commits] [ooni-probe/master] Add test state execution tracking and resume support

art at torproject.org art at torproject.org
Sun Nov 25 23:05:01 UTC 2012


commit 659dda6b388ecc658f8060b7bef2aa7b4015dce2
Author: Arturo Filastò <art at fuffa.org>
Date:   Sun Nov 25 20:05:04 2012 +0100

    Add test state execution tracking and resume support
    * Clean up some debug messages
    * Handle DNS lookup problems
    * Handle TCP Timeouts properly
    * Better error handling in HTTP tests
    * Make sure that responses are written even if the response is not received
---
 ooni/config.py           |   31 ++++++++--
 ooni/oonicli.py          |   54 +++++-----------
 ooni/reporter.py         |    5 +-
 ooni/runner.py           |  161 +++++++++++++++++++++++++++++++++++++++++++---
 ooni/templates/httpt.py  |   60 +++++++++++++----
 ooni/templates/scapyt.py |    1 -
 ooniprobe.conf           |    2 +-
 7 files changed, 243 insertions(+), 71 deletions(-)

diff --git a/ooni/config.py b/ooni/config.py
index b017e87..d86f4d7 100644
--- a/ooni/config.py
+++ b/ooni/config.py
@@ -6,15 +6,28 @@
 import os
 import yaml
 
-from twisted.internet import reactor, threads
+from twisted.internet import reactor, threads, defer
 
 from ooni.utils import otime
 from ooni.utils import Storage
 
 reports = Storage()
+scapyFactory = None
+stateDict = None
+
+# XXX refactor this to use a database
+resume_lock = defer.DeferredLock()
+
 basic = None
 cmd_line_options = None
-scapyFactory = None
+resume_filename = None
+
+# XXX-Twisted this is used to check if we have started the reactor or not. It
+# is necessary because if the tests are already concluded because we have
+# resumed a test session then it will call reactor.run() even though there is
+# no condition that will ever stop it.
+# There should be a more twisted way of doing this.
+start_reactor = True
 
 def get_root_path():
     this_directory = os.path.dirname(__file__)
@@ -58,19 +71,27 @@ class TestFilenameNotSet(Exception):
 
 def generateReportFilenames():
     try:
-        test_file_name = os.path.basename(cmd_line_options['test'])
+        test_filename = os.path.basename(cmd_line_options['test'])
     except IndexError:
         raise TestFilenameNotSet
 
-    test_name = '.'.join(test_file_name.split(".")[:-1])
+    test_name = '.'.join(test_filename.split(".")[:-1])
     base_filename = "%s_%s_"+otime.timestamp()+".%s"
-    print "Setting yamloo to %s" % base_filename
     reports.yamloo = base_filename % (test_name, "report", "yamloo")
+    print "Setting yamloo to %s" % reports.yamloo
     reports.pcap = base_filename % (test_name, "packets", "pcap")
+    print "Setting pcap to %s" % reports.pcap
 
 if not basic:
     # Here we make sure that we instance the config file attributes only once
     basic, privacy, advanced = loadConfigFile()
 
+if not resume_filename:
+    resume_filename = os.path.join(get_root_path(), 'ooniprobe.resume')
+    try:
+        with open(resume_filename) as f: pass
+    except IOError as e:
+        with open(resume_filename, 'w+') as f: pass
+
 # This is used to keep track of the state of the sniffer
 sniffer_running = None
diff --git a/ooni/oonicli.py b/ooni/oonicli.py
index 1a316b3..3a8b3df 100644
--- a/ooni/oonicli.py
+++ b/ooni/oonicli.py
@@ -36,7 +36,8 @@ class Options(usage.Options):
                 " network tests. These are loaded from modules, packages and"
                 " files listed on the command line")
 
-    optFlags = [["help", "h"]]
+    optFlags = [["help", "h"],
+                ["resume", "r"]]
 
     optParameters = [["reportfile", "o", None, "report file name"],
                      ["testdeck", "i", None,
@@ -82,39 +83,9 @@ def testsEnded(*arg, **kw):
     You can place here all the post shutdown tasks.
     """
     log.debug("testsEnded: Finished running all tests")
-    reactor.stop()
-
-def runTest(cmd_line_options):
-    config.cmd_line_options = cmd_line_options
-    config.generateReportFilenames()
-
-    if cmd_line_options['reportfile']:
-        config.reports.yamloo = cmd_line_options['reportfile']
-        config.reports.pcap = config.reports.yamloo+".pcap"
-
-    if os.path.exists(config.reports.pcap):
-        print "Report PCAP already exists with filename %s" % config.reports.pcap
-        print "Renaming it to %s" % config.reports.pcap+'.old'
-        os.rename(config.reports.pcap, config.reports.pcap+'.old')
-
-    classes = runner.findTestClassesFromFile(cmd_line_options['test'])
-    test_cases, options = runner.loadTestsAndOptions(classes, cmd_line_options)
-    if config.privacy.includepcap:
-        from ooni.utils.txscapy import ScapyFactory, ScapySniffer
-        try:
-            checkForRoot()
-        except NotRootError:
-            print "[!] Includepcap options requires root priviledges to run"
-            print "    you should run ooniprobe as root or disable the options in ooniprobe.conf"
-            sys.exit(1)
-
-        print "Starting sniffer"
-        config.scapyFactory = ScapyFactory(config.advanced.interface)
-
-        sniffer = ScapySniffer(config.reports.pcap)
-        config.scapyFactory.registerProtocol(sniffer)
-
-    return runner.runTestCases(test_cases, options, cmd_line_options)
+    config.start_reactor = False
+    try: reactor.stop()
+    except: pass
 
 def run():
     """
@@ -129,6 +100,7 @@ def run():
         raise SystemExit, "%s: %s" % (sys.argv[0], ue)
 
     deck_dl = []
+    resume = cmd_line_options['resume']
 
     log.start(cmd_line_options['logfile'])
     if cmd_line_options['testdeck']:
@@ -136,15 +108,21 @@ def run():
         for test in test_deck:
             del cmd_line_options
             cmd_line_options = test['options']
-            d1 = runTest(cmd_line_options)
+            if resume:
+                cmd_line_options['resume'] = True
+            else:
+                cmd_line_options['resume'] = False
+            d1 = runner.runTest(cmd_line_options)
             deck_dl.append(d1)
     else:
         log.msg("No test deck detected")
         del cmd_line_options['testdeck']
-        d1 = runTest(cmd_line_options)
+        d1 = runner.runTest(cmd_line_options)
         deck_dl.append(d1)
 
     d2 = defer.DeferredList(deck_dl)
-    d2.addCallback(testsEnded)
+    d2.addBoth(testsEnded)
 
-    reactor.run()
+    if config.start_reactor:
+        log.debug("Starting reactor")
+        reactor.run()
diff --git a/ooni/reporter.py b/ooni/reporter.py
index 63f501e..3cc77d7 100644
--- a/ooni/reporter.py
+++ b/ooni/reporter.py
@@ -152,7 +152,7 @@ def getTestDetails(options):
                     'test_version': options['version'],
                     'software_name': 'ooniprobe',
                     'software_version': software_version
-                    }
+    }
     defer.returnValue(test_details)
 
 class OReporter(object):
@@ -191,7 +191,7 @@ class OReporter(object):
                 'test_started': test_started,
                 'test_runtime': test_runtime,
                 'report': test_report}
-        return self.writeReportEntry(report)
+        return defer.maybeDeferred(self.writeReportEntry, report)
 
 class YAMLReporter(OReporter):
     """
@@ -224,6 +224,7 @@ class YAMLReporter(OReporter):
         self._write('---\n')
         self._write(safe_dump(entry))
         self._write('...\n')
+        return
 
     @defer.inlineCallbacks
     def createReport(self, options):
diff --git a/ooni/runner.py b/ooni/runner.py
index d7856e6..e7a40fd 100644
--- a/ooni/runner.py
+++ b/ooni/runner.py
@@ -14,6 +14,7 @@ import time
 import inspect
 import traceback
 import itertools
+import yaml
 
 from twisted.python import reflect, usage
 from twisted.internet import defer
@@ -155,7 +156,9 @@ def runTestCasesWithInput(test_cases, test_input, oreporter):
         return oreporter.testDone(test_instance, test_name)
 
     def test_error(failure, test_instance, test_name):
+        log.err("run Test Cases With Input problem")
         log.exception(failure)
+        return
 
     def tests_done(result, test_class):
         test_instance = test_class()
@@ -168,6 +171,7 @@ def runTestCasesWithInput(test_cases, test_input, oreporter):
             return oreporter.testDone(test_instance, 'summary')
         except NoPostProcessor:
             log.debug("No post processor configured")
+            return
 
     dl = []
     for test_case in test_cases:
@@ -191,7 +195,6 @@ def runTestCasesWithInput(test_cases, test_input, oreporter):
         d = defer.maybeDeferred(test)
         d.addCallback(test_done, test_instance, test_method)
         d.addErrback(test_error, test_instance, test_method)
-        log.debug("returning %s input" % test_method)
         dl.append(d)
 
     test_methods_d = defer.DeferredList(dl)
@@ -221,6 +224,104 @@ def runTestCasesWithInputUnit(test_cases, input_unit, oreporter):
         dl.append(d)
     return defer.DeferredList(dl)
 
+class InvalidResumeFile(Exception):
+    pass
+
+class noResumeSession(Exception):
+    pass
+
+def loadResumeFile():
+    """
+    Sets the singleton stateDict object to the content of the resume file.
+    If the file is empty then it will create an empty one.
+
+    Raises:
+
+        :class:ooni.runner.InvalidResumeFile if the resume file is not valid
+
+    """
+    if not config.stateDict:
+        try:
+            config.stateDict = yaml.safe_load(open(config.resume_filename))
+        except:
+            log.err("Error loading YAML file")
+            raise InvalidResumeFile
+
+        if not config.stateDict:
+            yaml.safe_dump(dict(), open(config.resume_filename, 'w+'))
+            config.stateDict = dict()
+
+        elif isinstance(config.stateDict, dict):
+            return
+        else:
+            log.err("The resume file is of the wrong format")
+            raise InvalidResumeFile
+
+def resumeTest(test_filename, input_unit_factory):
+    """
+    Returns the an input_unit_factory that is at the index of the previous run of the test 
+    for the specified test_filename.
+
+    Args:
+
+        test_filename (str): the filename of the test that is being run
+            including the .py extension.
+
+        input_unit_factory (:class:ooni.inputunit.InputUnitFactory): with the
+            same input of the past run.
+
+    Returns:
+
+        :class:ooni.inputunit.InputUnitFactory that is at the index of the
+            previous test run.
+
+    """
+    try:
+        idx = config.stateDict[test_filename]
+        for x in range(idx):
+            try:
+                input_unit_factory.next()
+            except StopIteration:
+                log.msg("Previous run was complete")
+                return input_unit_factory
+
+        return input_unit_factory
+
+    except KeyError:
+        log.debug("No resume key found for selected test name. It is therefore 0")
+        config.stateDict[test_filename] = 0
+        return input_unit_factory
+
+@defer.inlineCallbacks
+def updateResumeFile(test_filename):
+    """
+    update the resume file with the current stateDict state.
+    """
+    log.debug("Acquiring lock for %s" % test_filename)
+    yield config.resume_lock.acquire()
+
+    current_resume_state = yaml.safe_load(open(config.resume_filename))
+    current_resume_state = config.stateDict
+    yaml.safe_dump(current_resume_state, open(config.resume_filename, 'w+'))
+
+    log.debug("Releasing lock for %s" % test_filename)
+    config.resume_lock.release()
+    defer.returnValue(config.stateDict[test_filename])
+
+@defer.inlineCallbacks
+def increaseInputUnitIdx(test_filename):
+    """
+    Args:
+
+        test_filename (str): the filename of the test that is being run
+            including the .py extension.
+
+        input_unit_idx (int): the current input unit index for the test.
+
+    """
+    config.stateDict[test_filename] += 1
+    yield updateResumeFile(test_filename)
+
 @defer.inlineCallbacks
 def runTestCases(test_cases, options, cmd_line_options):
     log.debug("Running %s" % test_cases)
@@ -245,10 +346,10 @@ def runTestCases(test_cases, options, cmd_line_options):
             test_inputs = [None]
 
     if cmd_line_options['collector']:
-        log.debug("Using remote collector")
+        log.msg("Using remote collector, please be patient while we create the report.")
         oreporter = reporter.OONIBReporter(cmd_line_options)
     else:
-        log.debug("Reporting to file %s" % config.reports.yamloo)
+        log.msg("Reporting to file %s" % config.reports.yamloo)
         oreporter = reporter.YAMLReporter(cmd_line_options)
 
     try:
@@ -256,8 +357,6 @@ def runTestCases(test_cases, options, cmd_line_options):
     except Exception, e:
         log.exception(e)
 
-    log.debug("Creating report")
-
     try:
         yield oreporter.createReport(options)
     except reporter.OONIBReportCreationFailed:
@@ -266,17 +365,59 @@ def runTestCases(test_cases, options, cmd_line_options):
     except Exception, e:
         log.exception(e)
 
-    # This deferred list is a deferred list of deferred lists
-    # it is used to store all the deferreds of the tests that
-    # are run
-    input_unit_idx = 0
+    try:
+        loadResumeFile()
+    except InvalidResumeFile:
+        log.err("Error in loading resume file %s" % config.resume_filename)
+        log.err("Try deleting the resume file")
+        raise InvalidResumeFile
+
+    test_filename = os.path.basename(cmd_line_options['test'])
+
+    if cmd_line_options['resume']:
+        resumeTest(test_filename, input_unit_factory)
+    else:
+        config.stateDict[test_filename] = 0
+
     try:
         for input_unit in input_unit_factory:
             log.debug("Running this input unit %s" % input_unit)
+
             yield runTestCasesWithInputUnit(test_cases, input_unit,
                         oreporter)
-            input_unit_idx += 1
+            yield increaseInputUnitIdx(test_filename)
 
     except Exception:
         log.exception("Problem in running test")
 
+def runTest(cmd_line_options):
+    config.cmd_line_options = cmd_line_options
+    config.generateReportFilenames()
+
+    if cmd_line_options['reportfile']:
+        config.reports.yamloo = cmd_line_options['reportfile']
+        config.reports.pcap = config.reports.yamloo+".pcap"
+
+    if os.path.exists(config.reports.pcap):
+        print "Report PCAP already exists with filename %s" % config.reports.pcap
+        print "Renaming it to %s" % config.reports.pcap+".old"
+        os.rename(config.reports.pcap, config.reports.pcap+".old")
+
+    classes = findTestClassesFromFile(cmd_line_options['test'])
+    test_cases, options = loadTestsAndOptions(classes, cmd_line_options)
+    if config.privacy.includepcap:
+        from ooni.utils.txscapy import ScapyFactory, ScapySniffer
+        try:
+            checkForRoot()
+        except NotRootError:
+            print "[!] Includepcap options requires root priviledges to run"
+            print "    you should run ooniprobe as root or disable the options in ooniprobe.conf"
+            sys.exit(1)
+
+        print "Starting sniffer"
+        config.scapyFactory = ScapyFactory(config.advanced.interface)
+
+        sniffer = ScapySniffer(config.reports.pcap)
+        config.scapyFactory.registerProtocol(sniffer)
+
+    return runTestCases(test_cases, options, cmd_line_options)
diff --git a/ooni/templates/httpt.py b/ooni/templates/httpt.py
index e36c049..0d53ebe 100644
--- a/ooni/templates/httpt.py
+++ b/ooni/templates/httpt.py
@@ -12,9 +12,9 @@ from twisted.internet import protocol, defer
 from twisted.internet.ssl import ClientContextFactory
 
 from twisted.internet import reactor
-from twisted.internet.error import ConnectionRefusedError
+from twisted.internet.error import ConnectionRefusedError, DNSLookupError, TCPTimedOutError
 
-from twisted.web._newclient import Request
+from twisted.web._newclient import Request, Response
 
 from ooni.nettest import NetTestCase
 from ooni.utils import log
@@ -53,6 +53,9 @@ class HTTPTest(NetTestCase):
     request = {}
     response = {}
 
+    requests = []
+    responses = []
+
     def _setUp(self):
         try:
             import OpenSSL
@@ -97,21 +100,37 @@ class HTTPTest(NetTestCase):
     def processInputs(self):
         pass
 
-    def _processResponseBody(self, response_body, request, response, body_processor):
-        log.debug("Processing response body")
-        self.report['requests'].append({
+    def addToReport(self, request, response=None):
+        """
+        Adds to the report the specified request and response.
+
+        Args:
+            request (dict): A dict describing the request that was made
+
+            response (instance): An instance of
+                :class:twisted.web.client.Response.
+                Note: headers is our modified True Headers version.
+        """
+        log.debug("Adding %s to report" % request)
+        request_response = {
             'request': {
                 'headers': request['headers'],
                 'body': request['body'],
                 'url': request['url'],
                 'method': request['method']
-            },
-            'response': {
+            }
+        }
+        if response:
+            request_response['response'] = {
                 'headers': list(response.headers.getAllRawHeaders()),
                 'body': response_body,
                 'code': response.code
             }
-        })
+        self.report['requests'].append(request_response)
+
+    def _processResponseBody(self, response_body, request, response, body_processor):
+        log.debug("Processing response body")
+        self.addToReport(request, response)
         if body_processor:
             body_processor(response_body)
         else:
@@ -184,6 +203,7 @@ class HTTPTest(NetTestCase):
             return
         else:
             log.debug("Got response %s" % response)
+            return
 
         if str(response.code).startswith('3'):
             self.processRedirect(response.headers.getRawHeaders('Location')[0])
@@ -272,20 +292,32 @@ class HTTPTest(NetTestCase):
 
         headers = TrueHeaders(request['headers'])
 
-        def errback(failure):
-            failure.trap(ConnectionRefusedError, SOCKSError)
+        def errback(failure, request):
+            failure.trap(ConnectionRefusedError, SOCKSError, DNSLookupError, TCPTimedOutError)
+            log.err("Error performing %s" % request)
+            self.addToReport(request)
             if isinstance(failure.value, ConnectionRefusedError):
                 log.err("Connection refused. The backend may be down")
                 self.report['failure'] = 'connection_refused_error'
 
             elif isinstance(failure.value, SOCKSError):
-                log.err("Sock error. The SOCK proxy may be down")
-                self.report['failure'] = 'sockserror'
+                log.err("Sock error. The SOCKS proxy may be down")
+                self.report['failure'] = 'socks_error'
+
+            elif isinstance(failure.value, DNSLookupError):
+                log.err("DNS lookup failure")
+                self.report['failure'] = 'dns_lookup_error'
+
+            elif isinstance(failure.value, TCPTimedOutError):
+                log.err("DNS lookup failure")
+                self.report['failure'] = 'tcp_timed_out_error'
+            return
 
         d = agent.request(request['method'], request['url'], headers,
                 body_producer)
 
-        d.addCallback(self._cbResponse, request, headers_processor, body_processor)
-        d.addErrback(errback)
+        d.addCallback(self._cbResponse, request, headers_processor,
+                body_processor)
+        d.addErrback(errback, request)
         return d
 
diff --git a/ooni/templates/scapyt.py b/ooni/templates/scapyt.py
index a1eade4..a787115 100644
--- a/ooni/templates/scapyt.py
+++ b/ooni/templates/scapyt.py
@@ -140,7 +140,6 @@ class BaseScapyTest(NetTestCase):
         scapySender = ScapySender()
 
         config.scapyFactory.registerProtocol(scapySender)
-
         scapySender.sendPackets(packets)
 
         scapySender.stopSending()
diff --git a/ooniprobe.conf b/ooniprobe.conf
index e9f208f..66ab017 100644
--- a/ooniprobe.conf
+++ b/ooniprobe.conf
@@ -26,7 +26,7 @@ advanced:
     threadpool_size: 10
     tor_socksport: 9050
     # For auto detection
-    interface: auto 
+    interface: auto
     # Of specify a specific interface
     #interface: wlan0
 





More information about the tor-commits mailing list