commit 0362ad88fbe2945f5311b3db5b67031a0e8b218d Author: Arturo Filastò arturo@filasto.net Date: Thu Jul 28 17:32:12 2016 +0200
The report log is now used only for measurements that are written to the ~/.ooni/measurements directory.
* Move Tor-related functions into ooni/utils/onion.py. --- ooni/deck/deck.py | 4 +- ooni/director.py | 55 +------- ooni/reporter.py | 296 +++++++++++++++++++++++--------------------- ooni/scripts/oonireport.py | 128 ++++++++++++++----- ooni/tests/test_reporter.py | 87 ++++++------- ooni/utils/log.py | 4 +- ooni/utils/onion.py | 49 ++++++++ 7 files changed, 352 insertions(+), 271 deletions(-)
diff --git a/ooni/deck/deck.py b/ooni/deck/deck.py index b11e174..c9b3fc5 100644 --- a/ooni/deck/deck.py +++ b/ooni/deck/deck.py @@ -218,6 +218,7 @@ class NGDeck(object): net_test_loader = task.ooni["net_test_loader"] test_details = task.ooni["test_details"]
+ measurement_id = None report_filename = task.output_path if not task.output_path: measurement_id = task.id @@ -235,7 +236,8 @@ class NGDeck(object): net_test_loader, report_filename, collector_client=net_test_loader.collector, - test_details=test_details + test_details=test_details, + measurement_id=measurement_id ) d.addCallback(self._measurement_completed, task) d.addErrback(self._measurement_failed, task) diff --git a/ooni/director.py b/ooni/director.py index e3df907..464a203 100644 --- a/ooni/director.py +++ b/ooni/director.py @@ -1,4 +1,3 @@ -import pwd import os
from twisted.internet import defer @@ -7,7 +6,6 @@ from twisted.python.failure import Failure from ooni.managers import ReportEntryManager, MeasurementManager from ooni.reporter import Report from ooni.utils import log, generate_filename -from ooni.utils.net import randomFreePort from ooni.nettest import NetTest, getNetTestInformation from ooni.settings import config from ooni.nettest import normalizeTestName @@ -15,7 +13,7 @@ from ooni.deck.store import InputStore from ooni.geoip import probe_ip
from ooni.agent.scheduler import run_system_tasks -from ooni.utils.onion import start_tor, connect_to_control_port +from ooni.utils.onion import start_tor, connect_to_control_port, get_tor_config
class DirectorEvent(object): def __init__(self, type="update", message=""): @@ -299,7 +297,7 @@ class Director(object): @defer.inlineCallbacks def start_net_test_loader(self, net_test_loader, report_filename, collector_client=None, no_yamloo=False, - test_details=None): + test_details=None, measurement_id=None): """ Create the Report for the NetTest and start the report NetTest.
@@ -319,7 +317,8 @@ class Director(object): report = Report(test_details, report_filename, self.reportEntryManager, collector_client, - no_yamloo) + no_yamloo, + measurement_id)
yield report.open() net_test = NetTest(test_cases, test_details, report) @@ -392,50 +391,8 @@ class Director(object): raise exc
if config.advanced.start_tor and config.tor_state is None: - tor_config = TorConfig() - if config.tor.control_port is None: - config.tor.control_port = int(randomFreePort()) - if config.tor.socks_port is None: - config.tor.socks_port = int(randomFreePort()) - - tor_config.ControlPort = config.tor.control_port - tor_config.SocksPort = config.tor.socks_port - - if config.tor.data_dir: - data_dir = os.path.expanduser(config.tor.data_dir) - - if not os.path.exists(data_dir): - log.debug("%s does not exist. Creating it." % data_dir) - os.makedirs(data_dir) - tor_config.DataDirectory = data_dir - - if config.tor.bridges: - tor_config.UseBridges = 1 - if config.advanced.obfsproxy_binary: - tor_config.ClientTransportPlugin = ( - 'obfs2,obfs3 exec %s managed' % - config.advanced.obfsproxy_binary - ) - bridges = [] - with open(config.tor.bridges) as f: - for bridge in f: - if 'obfs' in bridge: - if config.advanced.obfsproxy_binary: - bridges.append(bridge.strip()) - else: - bridges.append(bridge.strip()) - tor_config.Bridge = bridges - - if config.tor.torrc: - for i in config.tor.torrc.keys(): - setattr(tor_config, i, config.tor.torrc[i]) - - if os.geteuid() == 0: - tor_config.User = pwd.getpwuid(os.geteuid()).pw_name - - tor_config.save() - log.debug("Setting control port as %s" % tor_config.ControlPort) - log.debug("Setting SOCKS port as %s" % tor_config.SocksPort) + tor_config = get_tor_config() + try: yield start_tor(tor_config) self._tor_starting.callback(self._tor_state) diff --git a/ooni/reporter.py b/ooni/reporter.py index 20a13f5..cf5341d 100644 --- a/ooni/reporter.py +++ b/ooni/reporter.py @@ -13,6 +13,7 @@ from yaml.emitter import Emitter from yaml.serializer import Serializer from yaml.resolver import Resolver
+from twisted.python.filepath import FilePath from twisted.python.util import untilConcludes from twisted.internet import defer from twisted.internet.error import ConnectionRefusedError @@ -362,154 +363,161 @@ class OONIBReporter(OReporter): log.debug("Closing report with id %s" % self.reportId) return self.collector_client.closeReport(self.reportId)
+class NoReportLog(Exception): + pass + class OONIBReportLog(object):
""" Used to keep track of report creation on a collector backend. """ + _date_format = "%Y%m%dT%H:%M:%SZ"
- def __init__(self, file_name=None): - if file_name is None: - file_name = config.report_log_file - self.file_name = file_name - self.create_report_log() - - def get_report_log(self): - with open(self.file_name) as f: - report_log = yaml.safe_load(f) - if not report_log: - report_log = {} # consumers expect dictionary structure - return report_log - - @property - def reports_incomplete(self): - reports = [] - report_log = self.get_report_log() - for report_file, value in report_log.items(): - if value['status'] in ('created'): - try: - os.kill(value['pid'], 0) - except: - reports.append((report_file, value)) - elif value['status'] in ('incomplete'): - reports.append((report_file, value)) - return reports - - @property - def reports_in_progress(self): - reports = [] - report_log = self.get_report_log() - for report_file, value in report_log.items(): - if value['status'] in ('created'): - try: - os.kill(value['pid'], 0) - reports.append((report_file, value)) - except: - pass - return reports - - @property - def reports_to_upload(self): - reports = [] - report_log = self.get_report_log() - for report_file, value in report_log.items(): - if value['status'] in ('creation-failed', 'not-created'): - reports.append((report_file, value)) - return reports - - def run(self, f, *arg, **kw): - lock = defer.DeferredFilesystemLock(self.file_name + '.lock') - d = lock.deferUntilLocked() - - def unlockAndReturn(r): + def __init__(self): + self.measurement_dir = FilePath(config.measurements_directory) + + def _parse_log_entry(self, in_file, measurement_id): + entry = json.load(in_file) + entry['last_update'] = datetime.strptime(entry['last_update'], + self._date_format) + entry['measurements_path'] = self.measurement_dir.child( + measurement_id).child('measurements.njson').path + entry['measurement_id'] = measurement_id + return entry + + def _lock_for_report_log(self, measurement_id): + lock_file = self.measurement_dir.child(measurement_id).child("report_log.lock") + return 
defer.DeferredFilesystemLock(lock_file.path) + + def _get_report_log_file(self, measurement_id): + report_log_file = self.measurement_dir.child(measurement_id).child("report_log.json") + return report_log_file + + @defer.inlineCallbacks + def get_report_log(self, measurement_id): + lock = self._lock_for_report_log(measurement_id) + yield lock.deferUntilLocked() + + report_log_file = self._get_report_log_file(measurement_id) + if not report_log_file.exists(): lock.unlock() - return r + raise NoReportLog
- def execute(_): - d = defer.maybeDeferred(f, *arg, **kw) - d.addBoth(unlockAndReturn) - return d + with report_log_file.open('r') as in_file: + entry = self._parse_log_entry(in_file, measurement_id)
- d.addCallback(execute) - return d + lock.unlock() + + defer.returnValue(entry)
- def create_report_log(self): - if not os.path.exists(self.file_name): - with open(self.file_name, 'w+') as f: - f.write(yaml.safe_dump({})) - - @contextmanager - def edit_log(self): - with open(self.file_name) as rfp: - report = yaml.safe_load(rfp) - # This should never happen. - if report is None: - report = {} - with open(self.file_name, 'w+') as wfp: + @defer.inlineCallbacks + def get_report_log_entries(self): + entries = [] + for measurement_id in self.measurement_dir.listdir(): try: - yield report - finally: - wfp.write(yaml.safe_dump(report)) - - def _not_created(self, report_file): - with self.edit_log() as report: - report[report_file] = { - 'pid': os.getpid(), - 'created_at': datetime.now(), - 'status': 'not-created', - 'collector': None - } + entry = yield self.get_report_log(measurement_id) + entries.append(entry) + except NoReportLog: + continue + defer.returnValue(entries)
- def not_created(self, report_file): - return self.run(self._not_created, report_file) - - def _created(self, report_file, collector_settings, report_id): - with self.edit_log() as report: - assert report_file is not None - report[report_file] = { - 'pid': os.getpid(), - 'created_at': datetime.now(), - 'status': 'created', - 'collector': collector_settings, - 'report_id': report_id - } - return report_id - - def created(self, report_file, collector_settings, report_id): - return self.run(self._created, report_file, - collector_settings, report_id) - - def _creation_failed(self, report_file, collector_settings): - with self.edit_log() as report: - report[report_file] = { - 'pid': os.getpid(), - 'created_at': datetime.now(), - 'status': 'creation-failed', - 'collector': collector_settings - } + @defer.inlineCallbacks + def update_log(self, measurement_id, value): + lock = self._lock_for_report_log(measurement_id) + yield lock.deferUntilLocked()
- def creation_failed(self, report_file, collector_settings): - return self.run(self._creation_failed, report_file, - collector_settings) + report_log_file = self._get_report_log_file(measurement_id) + with report_log_file.open('w+') as out_file: + entry = value + entry['last_update'] = datetime.utcnow().strftime(self._date_format) + json.dump(entry, out_file)
- def _incomplete(self, report_file): - with self.edit_log() as report: - if report[report_file]['status'] != "created": - raise errors.ReportNotCreated() - report[report_file]['status'] = 'incomplete' + lock.unlock()
- def incomplete(self, report_file): - return self.run(self._incomplete, report_file) + @defer.inlineCallbacks + def remove_log(self, measurement_id): + lock = self._lock_for_report_log(measurement_id) + yield lock.deferUntilLocked()
- def _closed(self, report_file): - with self.edit_log() as report: - rs = report[report_file]['status'] - if rs != "created" and rs != "incomplete": - raise errors.ReportNotCreated() - del report[report_file] + report_log_file = self._get_report_log_file(measurement_id) + try: + log.debug("Deleting log file") + report_log_file.remove() + except Exception as exc: + log.exception(exc) + + lock.unlock() + + @defer.inlineCallbacks + def get_incomplete(self): + incomplete_reports = [] + all_entries = yield self.get_report_log_entries() + for entry in all_entries[:]: + if entry['status'] in ('created',): + try: + os.kill(entry['pid'], 0) + except OSError: + incomplete_reports.append( + (entry['measurements_path'], entry) + ) + elif entry['status'] in ('incomplete',): + incomplete_reports.append( + (entry['measurements_path'], entry) + ) + defer.returnValue(incomplete_reports)
- def closed(self, report_file): - return self.run(self._closed, report_file) + @defer.inlineCallbacks + def get_in_progress(self): + in_progress_reports = [] + all_entries = yield self.get_report_log_entries() + for entry in all_entries[:]: + if entry['status'] in ('created',): + try: + os.kill(entry['pid'], 0) + in_progress_reports.append( + (entry['measurements_path'], entry) + ) + except OSError: + pass + defer.returnValue(in_progress_reports) + + @defer.inlineCallbacks + def get_to_upload(self): + to_upload_reports = [] + all_entries = yield self.get_report_log_entries() + for entry in all_entries[:]: + if entry['status'] in ('creation-failed', 'not-created'): + to_upload_reports.append( + (entry['measurements_path'], entry) + ) + defer.returnValue(to_upload_reports) + + def _update_status(self, measurement_id, status, collector_settings={}): + value = { + 'pid': os.getpid(), + 'status': status, + 'collector': collector_settings + } + return self.update_log(measurement_id, value) + + def not_created(self, measurement_id): + return self._update_status(measurement_id, 'not-created') + + def created(self, measurement_id, collector_settings): + return self._update_status(measurement_id, 'created', + collector_settings) + + + def creation_failed(self, measurement_id, collector_settings): + return self._update_status(measurement_id, 'creation-failed', + collector_settings) + + def incomplete(self, measurement_id, collector_settings): + return self._update_status(measurement_id, 'incomplete', + collector_settings) + + def closed(self, measurement_id): + return self.remove_log(measurement_id)
class Report(object): @@ -517,7 +525,7 @@ class Report(object):
def __init__(self, test_details, report_filename, reportEntryManager, collector_client=None, - no_njson=False): + no_njson=False, measurement_id=None): """ This is an abstraction layer on top of all the configured reporters.
@@ -542,10 +550,12 @@ class Report(object): """ self.test_details = test_details self.collector_client = collector_client + if report_filename is None: report_filename = self.generateReportFilename() self.report_filename = report_filename
+ self.measurement_id = measurement_id self.report_log = OONIBReportLog()
self.njson_reporter = None @@ -565,16 +575,17 @@ class Report(object): def open_oonib_reporter(self): def creation_failed(failure): self.oonib_reporter = None - return self.report_log.creation_failed(self.report_filename, - self.collector_client.settings) + if self.measurement_id: + return self.report_log.creation_failed(self.measurement_id, + self.collector_client.settings)
def created(report_id): if not self.oonib_reporter: return self.test_details['report_id'] = report_id - return self.report_log.created(self.report_filename, - self.collector_client.settings, - report_id) + if self.measurement_id: + return self.report_log.created(self.measurement_id, + self.collector_client.settings)
d = self.oonib_reporter.createReport() d.addErrback(creation_failed) @@ -595,8 +606,8 @@ class Report(object): if not self.no_njson: self.njson_reporter = NJSONReporter(self.test_details, self.report_filename) - if not self.oonib_reporter: - yield self.report_log.not_created(self.report_filename) + if not self.oonib_reporter and self.measurement_id: + yield self.report_log.not_created(self.measurement_id) yield defer.maybeDeferred(self.njson_reporter.createReport)
defer.returnValue(self.reportId) @@ -623,7 +634,9 @@ class Report(object): d.errback(failure)
def oonib_report_failed(failure): - return self.report_log.incomplete(self.report_filename) + if self.measurement_id: + return self.report_log.incomplete(self.measurement_id, + self.collector_client.settings)
def all_reports_written(_): if not d.called: @@ -662,7 +675,8 @@ class Report(object): d.errback(failure)
def oonib_report_closed(result): - return self.report_log.closed(self.report_filename) + if self.measurement_id: + return self.report_log.closed(self.measurement_id)
def oonib_report_failed(result): log.exception(result) diff --git a/ooni/scripts/oonireport.py b/ooni/scripts/oonireport.py index 8bcb1af..f59a843 100644 --- a/ooni/scripts/oonireport.py +++ b/ooni/scripts/oonireport.py @@ -2,6 +2,7 @@ from __future__ import print_function
import os import sys +import json import yaml
from twisted.python import usage @@ -21,7 +22,7 @@ def lookup_collector_client(report_header, bouncer): oonib_client = BouncerClient(bouncer) net_tests = [{ 'test-helpers': [], - 'input-hashes': report_header['input_hashes'], + 'input-hashes': [], 'name': report_header['test_name'], 'version': report_header['test_version'], }] @@ -33,36 +34,57 @@ def lookup_collector_client(report_header, bouncer): ) defer.returnValue(collector_client)
+class NoIDFound(Exception): + pass + +def report_path_to_id(report_file): + measurement_dir = os.path.dirname(report_file) + measurement_id = os.path.basename(measurement_dir) + if os.path.dirname(measurement_dir) != config.measurements_directory: + raise NoIDFound + return measurement_id + @defer.inlineCallbacks -def upload(report_file, collector=None, bouncer=None): +def upload(report_file, collector=None, bouncer=None, measurement_id=None): oonib_report_log = OONIBReportLog() collector_client = None if collector: collector_client = CollectorClient(address=collector)
+ try: + # Try to guess the measurement_id from the file path + measurement_id = report_path_to_id(report_file) + except NoIDFound: + pass + log.msg("Attempting to upload %s" % report_file)
- with open(config.report_log_file) as f: - report_log = yaml.safe_load(f) + if report_file.endswith(".njson"): + report = NJSONReportLoader(report_file) + else: + log.warn("Uploading of YAML formatted reports will be dropped in " + "future versions") + report = YAMLReportLoader(report_file)
- report = ReportLoader(report_file) if bouncer and collector_client is None: collector_client = yield lookup_collector_client(report.header, bouncer)
if collector_client is None: - try: - collector_settings = report_log[report_file]['collector'] - if collector_settings is None: - log.msg("Skipping uploading of %s since this measurement " - "was run by specifying no collector." % - report_file) + if measurement_id: + report_log = yield oonib_report_log.get_report_log(measurement_id) + collector_settings = report_log['collector'] + print(collector_settings) + if collector_settings is None or len(collector_settings) == 0: + log.warn("Skipping uploading of %s since this measurement " + "was run by specifying no collector." % + report_file) defer.returnValue(None) elif isinstance(collector_settings, dict): collector_client = CollectorClient(settings=collector_settings) elif isinstance(collector_settings, str): collector_client = CollectorClient(address=collector_settings) - except KeyError: + else: log.msg("Could not find %s in reporting.yaml. Looking up " "collector with canonical bouncer." % report_file) collector_client = yield lookup_collector_client(report.header, @@ -73,51 +95,59 @@ def upload(report_file, collector=None, bouncer=None): collector_client.settings)) report_id = yield oonib_reporter.createReport() report.header['report_id'] = report_id - yield oonib_report_log.created(report_file, - collector_client.settings, - report_id) + if measurement_id: + log.debug("Marking it as created") + yield oonib_report_log.created(measurement_id, + collector_client.settings) log.msg("Writing report entries") for entry in report: yield oonib_reporter.writeReportEntry(entry) - sys.stdout.write('.') - sys.stdout.flush() + log.msg("Written entry") log.msg("Closing report") yield oonib_reporter.finish() - yield oonib_report_log.closed(report_file) + if measurement_id: + log.debug("Closing log") + yield oonib_report_log.closed(measurement_id)
@defer.inlineCallbacks -def upload_all(collector=None, bouncer=None): +def upload_all(collector=None, bouncer=None, upload_incomplete=False): oonib_report_log = OONIBReportLog()
- for report_file, value in oonib_report_log.reports_to_upload: + reports_to_upload = yield oonib_report_log.get_to_upload() + for report_file, value in reports_to_upload: try: - yield upload(report_file, collector, bouncer) + yield upload(report_file, collector, bouncer, + value['measurement_id']) except Exception as exc: log.exception(exc)
def print_report(report_file, value): print("* %s" % report_file) - print(" %s" % value['created_at']) + print(" %s" % value['last_update'])
+@defer.inlineCallbacks def status(): oonib_report_log = OONIBReportLog()
+ reports_to_upload = yield oonib_report_log.get_to_upload() print("Reports to be uploaded") print("----------------------") - for report_file, value in oonib_report_log.reports_to_upload: + for report_file, value in reports_to_upload: print_report(report_file, value)
+ reports_in_progress = yield oonib_report_log.get_in_progress() print("Reports in progress") print("-------------------") - for report_file, value in oonib_report_log.reports_in_progress: + for report_file, value in reports_in_progress: print_report(report_file, value)
+ reports_incomplete = yield oonib_report_log.get_incomplete() print("Incomplete reports") print("------------------") - for report_file, value in oonib_report_log.reports_incomplete: + for report_file, value in reports_incomplete: print_report(report_file, value)
class ReportLoader(object): @@ -125,24 +155,34 @@ class ReportLoader(object): 'probe_asn', 'probe_cc', 'probe_ip', - 'start_time', + 'probe_city', + 'test_start_time', 'test_name', 'test_version', 'options', 'input_hashes', 'software_name', - 'software_version' + 'software_version', + 'data_format_version', + 'report_id', + 'test_helpers', + 'annotations', + 'id' )
+ def __iter__(self): + return self + + def close(self): + self._fp.close() + +class YAMLReportLoader(ReportLoader): def __init__(self, report_filename): self._fp = open(report_filename) self._yfp = yaml.safe_load_all(self._fp)
self.header = self._yfp.next()
- def __iter__(self): - return self - def next(self): try: return self._yfp.next() @@ -150,8 +190,30 @@ class ReportLoader(object): self.close() raise StopIteration
- def close(self): - self._fp.close() +class NJSONReportLoader(ReportLoader): + def __init__(self, report_filename): + self._fp = open(report_filename) + self.header = self._peek_header() + + def _peek_header(self): + header = {} + first_entry = json.loads(next(self._fp)) + for key in self._header_keys: + header[key] = first_entry.get(key, None) + self._fp.seek(0) + return header + + def next(self): + try: + entry = json.loads(next(self._fp)) + for key in self._header_keys: + entry.pop(key, None) + test_keys = entry.pop('test_keys') + entry.update(test_keys) + return entry + except StopIteration: + self.close() + raise StopIteration
class Options(usage.Options):
@@ -218,11 +280,13 @@ def oonireport(_reactor=reactor, _args=sys.argv[1:]): options['bouncer'] = CANONICAL_BOUNCER_ONION
if options['command'] == "upload" and options['report_file']: + log.start() tor_check() return upload(options['report_file'], options['collector'], options['bouncer']) elif options['command'] == "upload": + log.start() tor_check() return upload_all(options['collector'], options['bouncer']) diff --git a/ooni/tests/test_reporter.py b/ooni/tests/test_reporter.py index 8f32733..cbfdaeb 100644 --- a/ooni/tests/test_reporter.py +++ b/ooni/tests/test_reporter.py @@ -2,11 +2,12 @@ import os import yaml import json import time -from mock import MagicMock +import shutil
from twisted.internet import defer from twisted.trial import unittest
+from ooni.tests.bases import ConfigTestCase from ooni import errors as e from ooni.tests.mocks import MockCollectorClient from ooni.reporter import YAMLReporter, OONIBReporter, OONIBReportLog @@ -114,65 +115,57 @@ class TestOONIBReporter(unittest.TestCase): req = {'content': 'something'} yield self.oonib_reporter.writeReportEntry(req)
-class TestOONIBReportLog(unittest.TestCase): +class TestOONIBReportLog(ConfigTestCase):
def setUp(self): - self.report_log = OONIBReportLog('report_log') - self.report_log.create_report_log() + super(TestOONIBReportLog, self).setUp() + self.report_log = OONIBReportLog() + self.measurement_id = '20160727T182604Z-ZZ-AS0-dummy' + self.measurement_dir = os.path.join( + self.config.measurements_directory, + self.measurement_id + ) + self.report_log_path = os.path.join(self.measurement_dir, + 'report_log.json') + os.mkdir(self.measurement_dir)
def tearDown(self): - os.remove(self.report_log.file_name) + shutil.rmtree(self.measurement_dir) + super(TestOONIBReportLog, self).tearDown()
@defer.inlineCallbacks def test_report_created(self): - yield self.report_log.created("path_to_my_report.yaml", - 'httpo://foo.onion', - 'someid') - with open(self.report_log.file_name) as f: - report = yaml.safe_load(f) - assert "path_to_my_report.yaml" in report - - @defer.inlineCallbacks - def test_concurrent_edit(self): - d1 = self.report_log.created("path_to_my_report1.yaml", - 'httpo://foo.onion', - 'someid1') - d2 = self.report_log.created("path_to_my_report2.yaml", - 'httpo://foo.onion', - 'someid2') - yield defer.DeferredList([d1, d2]) - with open(self.report_log.file_name) as f: - report = yaml.safe_load(f) - assert "path_to_my_report1.yaml" in report - assert "path_to_my_report2.yaml" in report + yield self.report_log.created(self.measurement_id, {}) + with open(self.report_log_path) as f: + report = json.load(f) + self.assertEqual(report['status'], 'created')
@defer.inlineCallbacks def test_report_closed(self): - yield self.report_log.created("path_to_my_report.yaml", - 'httpo://foo.onion', - 'someid') - yield self.report_log.closed("path_to_my_report.yaml") + yield self.report_log.created(self.measurement_id, {}) + yield self.report_log.closed(self.measurement_id)
- with open(self.report_log.file_name) as f: - report = yaml.safe_load(f) - assert "path_to_my_report.yaml" not in report + self.assertFalse(os.path.exists(self.report_log_path))
@defer.inlineCallbacks def test_report_creation_failed(self): - yield self.report_log.creation_failed("path_to_my_report.yaml", - 'httpo://foo.onion') - with open(self.report_log.file_name) as f: - report = yaml.safe_load(f) - assert "path_to_my_report.yaml" in report - assert report["path_to_my_report.yaml"]["status"] == "creation-failed" + yield self.report_log.creation_failed(self.measurement_id, {}) + with open(self.report_log_path) as f: + report = json.load(f) + self.assertEqual(report["status"], "creation-failed") + + @defer.inlineCallbacks + def test_list_reports_in_progress(self): + yield self.report_log.created(self.measurement_id, {}) + in_progress = yield self.report_log.get_in_progress() + incomplete = yield self.report_log.get_incomplete() + self.assertEqual(len(incomplete), 0) + self.assertEqual(len(in_progress), 1)
@defer.inlineCallbacks - def test_list_reports(self): - yield self.report_log.creation_failed("failed_report.yaml", - 'httpo://foo.onion') - yield self.report_log.created("created_report.yaml", - 'httpo://foo.onion', 'XXXX') - - assert len(self.report_log.reports_in_progress) == 1 - assert len(self.report_log.reports_incomplete) == 0 - assert len(self.report_log.reports_to_upload) == 1 + def test_list_reports_to_upload(self): + yield self.report_log.creation_failed(self.measurement_id, {}) + incomplete = yield self.report_log.get_incomplete() + to_upload = yield self.report_log.get_to_upload() + self.assertEqual(len(incomplete), 0) + self.assertEqual(len(to_upload), 1) diff --git a/ooni/utils/log.py b/ooni/utils/log.py index c8a7360..982a353 100644 --- a/ooni/utils/log.py +++ b/ooni/utils/log.py @@ -114,7 +114,7 @@ class OONILogger(object): else: tw_log.err(msg, source="ooni")
- def warn(self, *arg, **kw): + def warn(self, msg, *arg, **kw): text = log_encode(msg) tw_log.msg(text, log_level=levels['WARNING'], source="ooni")
@@ -165,3 +165,5 @@ stop = oonilogger.stop msg = oonilogger.msg debug = oonilogger.debug err = oonilogger.err +warn = oonilogger.warn +exception = oonilogger.exception diff --git a/ooni/utils/onion.py b/ooni/utils/onion.py index df9dfec..6e0d906 100644 --- a/ooni/utils/onion.py +++ b/ooni/utils/onion.py @@ -1,5 +1,6 @@ import os import re +import pwd import string import StringIO import subprocess @@ -12,6 +13,7 @@ from twisted.internet.endpoints import TCP4ClientEndpoint from txtorcon import TorConfig, TorState, launch_tor, build_tor_connection from txtorcon.util import find_tor_binary as tx_find_tor_binary
+from ooni.utils.net import randomFreePort from ooni import constants from ooni import errors from ooni.utils import log @@ -213,6 +215,53 @@ def get_client_transport(transport): raise UninstalledTransport
+def get_tor_config(): + tor_config = TorConfig() + if config.tor.control_port is None: + config.tor.control_port = int(randomFreePort()) + if config.tor.socks_port is None: + config.tor.socks_port = int(randomFreePort()) + + tor_config.ControlPort = config.tor.control_port + tor_config.SocksPort = config.tor.socks_port + + if config.tor.data_dir: + data_dir = os.path.expanduser(config.tor.data_dir) + + if not os.path.exists(data_dir): + log.debug("%s does not exist. Creating it." % data_dir) + os.makedirs(data_dir) + tor_config.DataDirectory = data_dir + + if config.tor.bridges: + tor_config.UseBridges = 1 + if config.advanced.obfsproxy_binary: + tor_config.ClientTransportPlugin = ( + 'obfs2,obfs3 exec %s managed' % + config.advanced.obfsproxy_binary + ) + bridges = [] + with open(config.tor.bridges) as f: + for bridge in f: + if 'obfs' in bridge: + if config.advanced.obfsproxy_binary: + bridges.append(bridge.strip()) + else: + bridges.append(bridge.strip()) + tor_config.Bridge = bridges + + if config.tor.torrc: + for i in config.tor.torrc.keys(): + setattr(tor_config, i, config.tor.torrc[i]) + + if os.geteuid() == 0: + tor_config.User = pwd.getpwuid(os.geteuid()).pw_name + + tor_config.save() + log.debug("Setting control port as %s" % tor_config.ControlPort) + log.debug("Setting SOCKS port as %s" % tor_config.SocksPort) + return tor_config + class TorLauncherWithRetries(object): def __init__(self, tor_config, timeout=config.tor.timeout): self.retry_with = ["obfs4", "meek"]
tor-commits@lists.torproject.org