commit 5544ee71c001d64aecc5a729724af53c8ec2f246
Author: Arturo Filastò <art@fuffa.org>
Date:   Sun Nov 11 18:15:54 2012 +0100
    Implement basic collector for ooniprobe reports

    * Reports can be submitted over the network via http to a remote collector
    * Implement the backend component of the collector that writes submitted
      reports to flat files, following the report_id naming convention.
    * XXX add support for connecting to the collector via Tor Hidden Services
---
 nettests/core/http_host.py |   2 +-
 ooni/oonicli.py            |   2 +
 ooni/reporter.py           | 176 ++++++++++++++++++++------------------------
 ooni/runner.py             |  19 +++--
 ooni/utils/net.py          |   2 +-
 oonib/config.py            |   6 +-
 oonib/report/api.py        |  44 ++++++++++-
 7 files changed, 138 insertions(+), 113 deletions(-)
diff --git a/nettests/core/http_host.py b/nettests/core/http_host.py index 0e73f82..662cc40 100644 --- a/nettests/core/http_host.py +++ b/nettests/core/http_host.py @@ -16,7 +16,7 @@ from ooni.utils import log from ooni.templates import httpt
class UsageOptions(usage.Options): - optParameters = [['backend', 'b', 'http://127.0.0.1:1234', + optParameters = [['backend', 'b', 'http://127.0.0.1:57001', 'URL of the test backend to use'], ['content', 'c', None, 'The file to read from containing the content of a block page']] diff --git a/ooni/oonicli.py b/ooni/oonicli.py index 9287174..9473ad8 100644 --- a/ooni/oonicli.py +++ b/ooni/oonicli.py @@ -40,6 +40,8 @@ class Options(usage.Options, app.ReactorSelectionMixin): 'Report deferred creation and callback stack traces'],]
optParameters = [["reportfile", "o", None, "report file name"], + ["collector", "c", None, + "Address of the collector of test results. (example: http://127.0.0.1:8888)"], ["logfile", "l", None, "log file name"], ["pcapfile", "p", None, "pcap file name"]]
diff --git a/ooni/reporter.py b/ooni/reporter.py index 7147b35..05ea94a 100644 --- a/ooni/reporter.py +++ b/ooni/reporter.py @@ -106,6 +106,8 @@ class OReporter(object): pass
def testDone(self, test, test_name): + log.debug("Finished running %s" % test_name) + log.debug("Writing report") test_report = dict(test.report)
if isinstance(test.input, packet.Packet): @@ -120,14 +122,15 @@ class OReporter(object): 'test_name': test_name, 'test_started': test_started, 'report': test_report} - self.writeReportEntry(report) + return self.writeReportEntry(report)
def allDone(self): log.debug("allDone: Finished running all tests") - self.finish() try: + log.debug("Stopping the reactor") reactor.stop() except: + log.debug("Unable to stop the reactor") pass return None
@@ -151,6 +154,7 @@ class YAMLReporter(OReporter): untilConcludes(self._stream.flush)
def writeReportEntry(self, entry): + log.debug("Writing report with YAML reporter") self._write('---\n') self._write(safe_dump(entry)) self._write('...\n') @@ -169,88 +173,60 @@ class YAMLReporter(OReporter): def finish(self): self._stream.close()
-class OONIBReporter(object): + +class OONIBReportUpdateFailed(Exception): + pass + +class OONIBReportCreationFailed(Exception): + pass + +class OONIBTestDetailsLookupFailed(Exception): + pass + +class OONIBReporter(OReporter): def __init__(self, backend_url): from twisted.web.client import Agent from twisted.internet import reactor self.agent = Agent(reactor) self.backend_url = backend_url
- def _newReportCreated(self, data): - log.debug("newReportCreated %s" % data) - return data - - def _processResponseBody(self, response, body_cb): - log.debug("processResponseBody %s" % response) - done = defer.Deferred() - response.deliverBody(BodyReceiver(done)) - done.addCallback(body_cb) - return done - - def createReport(self, test_name, - test_version, report_header): - url = self.backend_url + '/new' - software_version = '0.0.1' - - request = {'software_name': 'ooni-probe', - 'software_version': software_version, - 'test_name': test_name, - 'test_version': test_version, - 'progress': 0, - 'content': report_header - } - def gotDetails(test_details): - log.debug("Creating report via url %s" % url) - - bodyProducer = StringProducer(json.dumps(request)) - d = self.agent.request("POST", url, - bodyProducer=bodyProducer) - d.addCallback(self._processResponseBody, - self._newReportCreated) - return d - - d = getTestDetails(options) - d.addCallback(gotDetails) - return d - - def writeReportEntry(self, entry, test_id=None): - if not test_id: - log.err("Write report entry on OONIB requires test id") - raise NoTestIDSpecified + @defer.inlineCallbacks + def writeReportEntry(self, entry): + log.debug("Writing report with OONIB reporter") + content = '---\n' + content += safe_dump(entry) + content += '...\n'
- report = '---\n' - report += safe_dump(entry) - report += '...\n' + url = self.backend_url + '/report/new'
- url = self.backend_url + '/new' + request = {'report_id': self.report_id, + 'content': content}
- request = {'test_id': test_id, - 'content': report} + log.debug("Updating report with id %s" % self.report_id) + request_json = json.dumps(request) + log.debug("Sending %s" % request_json)
bodyProducer = StringProducer(json.dumps(request)) - d = self.agent.request("PUT", url, - bodyProducer=bodyProducer) - - d.addCallback(self._processResponseBody, - self._newReportCreated) - return d - + log.debug("Creating report via url %s" % url)
+ try: + response = yield self.agent.request("PUT", url, + bodyProducer=bodyProducer) + except: + # XXX we must trap this in the runner and make sure to report the data later. + raise OONIBReportUpdateFailed
-class OONIBReporter(OReporter): - def __init__(self, backend_url): - from twisted.web.client import Agent - from twisted.internet import reactor - self.agent = Agent(reactor) - self.backend_url = backend_url + #parsed_response = json.loads(backend_response) + #self.report_id = parsed_response['report_id'] + #self.backend_version = parsed_response['backend_version'] + #log.debug("Created report with id %s" % parsed_response['report_id'])
- def _processResponseBody(self, *arg, **kw): - #done = defer.Deferred() - #response.deliverBody(BodyReceiver(done)) - #done.addCallback(self._newReportCreated) - #return done
+ @defer.inlineCallbacks def createReport(self, options): + """ + Creates a report on the oonib collector. + """ test_name = options['name'] test_version = options['version']
@@ -258,33 +234,41 @@ class OONIBReporter(OReporter): url = self.backend_url + '/report/new' software_version = '0.0.1'
- def gotDetails(test_details): - content = '---\n' - content += safe_dump(test_details) - content += '...\n' + test_details = yield getTestDetails(options) + + content = '---\n' + content += safe_dump(test_details) + content += '...\n'
- request = {'software_name': 'ooniprobe', - 'software_version': software_version, - 'test_name': test_name, - 'test_version': test_version, - 'progress': 0, - 'content': content - } - log.debug("Creating report via url %s" % url) - request_json = json.dumps(request) - log.debug("Sending %s" % request_json) - - def bothCalls(*arg, **kw): - print arg, kw - - body_producer = StringProducer(request_json) - d = self.agent.request("POST", url, None, - body_producer) - d.addBoth(self._processResponseBody) - return d - - d = getTestDetails(options) - d.addCallback(gotDetails) - # XXX handle errors - return d + request = {'software_name': 'ooniprobe', + 'software_version': software_version, + 'test_name': test_name, + 'test_version': test_version, + 'progress': 0, + 'content': content + } + log.debug("Creating report via url %s" % url) + request_json = json.dumps(request) + log.debug("Sending %s" % request_json) + + bodyProducer = StringProducer(json.dumps(request)) + log.debug("Creating report via url %s" % url) + + try: + response = yield self.agent.request("POST", url, + bodyProducer=bodyProducer) + except: + raise OONIBReportCreationFailed + + # This is a little trix to allow us to unspool the response. We create + # a deferred and call yield on it. + response_body = defer.Deferred() + response.deliverBody(BodyReceiver(response_body)) + + backend_response = yield response_body + + parsed_response = json.loads(backend_response) + self.report_id = parsed_response['report_id'] + self.backend_version = parsed_response['backend_version'] + log.debug("Created report with id %s" % parsed_response['report_id'])
diff --git a/ooni/runner.py b/ooni/runner.py index b92ad7e..1aac145 100644 --- a/ooni/runner.py +++ b/ooni/runner.py @@ -153,8 +153,10 @@ def loadTestsAndOptions(classes, cmd_line_options):
def runTestWithInput(test_class, test_method, test_input, oreporter): log.debug("Running %s with %s" % (test_method, test_input)) + def test_done(result, test_instance, test_name): - oreporter.testDone(test_instance, test_name) + log.debug("runTestWithInput: concluded %s" % test_name) + return oreporter.testDone(test_instance, test_name)
def test_error(error, test_instance, test_name): log.err("%s\n" % error) @@ -221,22 +223,21 @@ def runTestCases(test_cases, options, log.msg("Could not find inputs!") log.msg("options[0] = %s" % first) test_inputs = [None] - + reportFile = open(yamloo_filename, 'w+')
- #oreporter = reporter.YAMLReporter(reportFile) - oreporter = reporter.OONIBReporter('http://127.0.0.1:8888') + + if cmd_line_options['collector']: + oreporter = reporter.OONIBReporter(cmd_line_options['collector']) + else: + oreporter = reporter.YAMLReporter(reportFile)
input_unit_factory = InputUnitFactory(test_inputs)
log.debug("Creating report") - yield oreporter.createReport(options) - - oreporter = reporter.YAMLReporter(reportFile) - - input_unit_factory = InputUnitFactory(test_inputs)
yield oreporter.createReport(options) + # This deferred list is a deferred list of deferred lists # it is used to store all the deferreds of the tests that # are run diff --git a/ooni/utils/net.py b/ooni/utils/net.py index d43261a..3ddba61 100644 --- a/ooni/utils/net.py +++ b/ooni/utils/net.py @@ -7,7 +7,7 @@ import sys from zope.interface import implements
-from twisted.internet import protocol +from twisted.internet import protocol, defer from twisted.internet import threads, reactor from twisted.web.iweb import IBodyProducer
diff --git a/oonib/config.py b/oonib/config.py index dc2be2f..b34a8ae 100644 --- a/oonib/config.py +++ b/oonib/config.py @@ -18,11 +18,13 @@ main.db_threadpool_size = 10 helpers = Storage()
helpers.http_return_request = Storage() -helpers.http_return_request.port = 57001 +helpers.http_return_request.port = 57001 +# XXX this actually needs to be the advertised Server HTTP header of our web +# server helpers.http_return_request.server_version = "Apache"
helpers.tcp_echo = Storage() -helpers.tcp_echo.port = 57002 +helpers.tcp_echo.port = 57002
helpers.daphn3 = Storage() helpers.daphn3.yaml_file = "/path/to/data/oonib/daphn3.yaml" diff --git a/oonib/report/api.py b/oonib/report/api.py index 489cc25..bd409b0 100644 --- a/oonib/report/api.py +++ b/oonib/report/api.py @@ -59,6 +59,28 @@ def parseNewReportRequest(request): raise InvalidRequestField(k) return parsed_request
+def parseUpdateReportRequest(request): + # XXX this and the function above can probably be refactored into something + # more compact. There is quite a bit of code duplication going on here. + + report_id_regexp = re.compile("[a-zA-Z0-9]+$") + + # XXX here we are actually parsing a json object that could be quite big. + # If we want this to scale properly we only want to look at the test_id + # field. + # We are also keeping in memory multiple copies of the same object. A lot + # of optimization can be done. + parsed_request = json.loads(request) + try: + report_id = parsed_request['report_id'] + except KeyError: + raise MissingField('report_id') + + if not re.match(report_id_regexp, report_id): + raise InvalidRequestField('report_id') + + return parsed_request + class NewReportHandlerFile(web.RequestHandler): """ Responsible for creating and updating reports by writing to flat file. @@ -129,7 +151,20 @@ class NewReportHandlerFile(web.RequestHandler): 'content': 'XXX' } """ - pass + parsed_request = parseUpdateReportRequest(self.request.body) + report_id = parsed_request['report_id'] + print "Got this request %s" % parsed_request + + report_filename = report_id + report_filename += '.yamloo' + try: + with open(report_filename, 'a+') as f: + # XXX this could be quite big. We should probably use the + # twisted.internet.fdesc module + print parsed_request['content'] + f.write(parsed_request['content']) + except IOError as e: + web.HTTPError(404, "Report not found")
class NewReportHandlerDB(web.RequestHandler): """ @@ -188,7 +223,8 @@ class PCAPReportHandler(web.RequestHandler): def post(self): pass
-spec = [(r"/report/new", NewReportHandlerFile), - (r"/report/pcap", PCAPReportHandler)] +reportingBackendAPI = [(r"/report/new", NewReportHandlerFile), + (r"/report/pcap", PCAPReportHandler) +]
-reportingBackend = web.Application(spec, debug=True) +reportingBackend = web.Application(reportingBackendAPI, debug=True)
tor-commits@lists.torproject.org