commit 6e786ee25aaf9bf2f9abb93b2db3ec984d20f89c
Author: Arturo Filastò <art@fuffa.org>
Date:   Thu Jun 6 21:46:16 2013 +0200
Move scripts/archive_reports.py to bin/archive_oonib_reports
This will make it so that the built rpm will also install the archiving script.
---
 .gitignore                 |   1 -
 bin/archive_oonib_reports  | 174 ++++++++++++++++++++++++++++++++++++++++++++
 scripts/archive_reports.py | 174 --------------------------------------------
 setup.py                   |   2 +-
 4 files changed, 175 insertions(+), 176 deletions(-)
diff --git a/.gitignore b/.gitignore index 7210250..628ccf5 100644 --- a/.gitignore +++ b/.gitignore @@ -17,7 +17,6 @@ dist build eggs parts -bin var sdist develop-eggs diff --git a/bin/archive_oonib_reports b/bin/archive_oonib_reports new file mode 100755 index 0000000..d8c1394 --- /dev/null +++ b/bin/archive_oonib_reports @@ -0,0 +1,174 @@ +#!/usr/bin/env python +import yaml +import sys +import glob +import fcntl +import os +import re +from ipaddr import IPAddress +from datetime import timedelta +from datetime import datetime +from oonib.otime import fromTimestamp, timestamp +from oonib.otime import InvalidTimestampFormat, utcDateNow +from oonib import log + +############################################################################### +# You can set some config options here # +############################################################################### +report_age = 1 # hours +report_archive_dir = '/var/spool/mlab_ooni/archived' +report_source_dir = '/var/spool/mlab_ooni' +valid_test_versions = ['0.1', '0.1.1', '0.4', '0.1.3'] +default_probe_cc = '??' 
+target_permission = 0444 +path_permission = 0755 +retry_attempts = 100 +############################################################################### + +now = utcDateNow() +delta = timedelta(hours=report_age) + +def filter_reports_by_age(report): + try: + ts,__,__ = os.path.basename(report).split('_') + if now - fromTimestamp(ts) > delta: + return True + except (InvalidTimestampFormat, ValueError): + return False + +class InvalidReportField(Exception): + pass + +def validate_fields(fields): + log.debug("Report fields are: %s" % fields) + + # check report version + if fields['test_version'] not in valid_test_versions: + raise InvalidReportField('test_version') + + # check report CC + #XXX: confirm what value we use for default CC and whether + # or not we should support > 2 character CC + if fields['probe_cc'] is None: + fields['probe_cc'] = default_probe_cc + if not re.match('[A-Z?]{2,4}', fields['probe_cc'].upper()): + raise InvalidReportField('probe_cc') + + # check report ASN + if fields['probe_asn'] is None: + fields['probe_asn'] = 'AS0' + if not re.match('^AS[0-9]{1,10}', fields['probe_asn'].upper()): + raise InvalidReportField('probe_asn') + + # check report timestamp + try: + datetime_ts = datetime.fromtimestamp(fields['start_time']) + datetime_str = timestamp(datetime_ts) + except InvalidTimestampFormat: + raise InvalidReportField('start_time') + + # check report IP + try: + IPAddress(fields['probe_ip']) + except ValueError: + raise InvalidReportField('probe_ip') + + # all looks good! 
+ +def get_report_header_fields(report_header): + required_fields = ['probe_asn', 'probe_cc', 'probe_ip', 'start_time', + 'test_name', 'test_version'] + try: + return dict([(k,report_header[k]) for k in required_fields ]) + except KeyError: + return None + +def get_test_name(fields): + test_name = fields['test_name'].lower().replace(' ', '_') + return test_name + +def get_target_or_fail(fields, report): + # set the target filename + reportFormatVersion = fields['test_version'] + CC = fields['probe_cc'] + # XXX: wouldn't hurt to check timestamp for sanity again? + dateInISO8601Format,__,__ = os.path.basename(report).split('_') + probeASNumber = fields['probe_asn'] + testName = get_test_name(fields) + + # make sure path reportFormatVersion/CC exists + path = os.path.abspath(report_archive_dir) + for component in [reportFormatVersion, CC]: + path = os.path.join(path, component) + if not os.path.isdir(path): + try: + os.mkdir(path, path_permission) + log.debug("mkdir path: %s" % path) + except OSError: + return None + + # if the target file already exists, try to find another filename + filename = "%s-%s-%s.yamloo" % (testName, dateInISO8601Format, probeASNumber) + target = os.path.join(path, filename) + + # try to get a unique filename. 
os.open as used below requires + # that the file not already exist + naming_attempts = 1 + while os.path.exists(target) and naming_attempts < retry_attempts: + filename = "%s-%s-%s.%d.yamloo" % (testName, dateInISO8601Format, + probeASNumber, naming_attempts) + target = os.path.join(path, filename) + naming_attempts = naming_attempts + 1 + + if naming_attempts >= retry_attempts: + log.err("Failed getting unique filename %d times; skipping" % i) + return None + return target + +# grab list of reports +reports = glob.glob(report_source_dir+'/*') +reports_to_archive = filter(filter_reports_by_age, reports) + +# iterate over the reports to archive +for report in reports_to_archive: + log.debug("Parsing report: %s" % report) + try: + #XXX: verify that os.fdopen works as expected + f = os.fdopen(os.open(report, os.O_RDONLY|os.O_EXCL|os.O_NONBLOCK)) + except IOError: + log.err("Unable to get exclusive lock on %s; skipping" % report) + continue + + # parse the header and validate it + yamloo = yaml.safe_load_all(f) + report_header = yamloo.next() + fields = get_report_header_fields(report_header) + try: + validate_fields(fields) + except InvalidReportField, field_name: + log.err("Report %s contains invalid field called %s" % (report, field_name)) + continue + except: + log.err("An unhandled error occurred while processing %s" % report) + continue + + # get a target filename or fail + target = get_target_or_fail(fields, report) + if not target: + continue + + log.debug("target: %s" % target) + + try: + #XXX: My system does not have os.O_EXLOCK. Verify this works as is. + g = os.fdopen(os.open(target, os.O_CREAT|os.O_EXCL|os.O_NONBLOCK)) + + os.rename(report, target) + os.chmod(target, target_permission) + f.close() + g.close() + + except IOError: + # unable to lock the file... still held open? + log.err("Failed to lock target file. 
Possible race condition!") + continue diff --git a/scripts/archive_reports.py b/scripts/archive_reports.py deleted file mode 100755 index d8c1394..0000000 --- a/scripts/archive_reports.py +++ /dev/null @@ -1,174 +0,0 @@ -#!/usr/bin/env python -import yaml -import sys -import glob -import fcntl -import os -import re -from ipaddr import IPAddress -from datetime import timedelta -from datetime import datetime -from oonib.otime import fromTimestamp, timestamp -from oonib.otime import InvalidTimestampFormat, utcDateNow -from oonib import log - -############################################################################### -# You can set some config options here # -############################################################################### -report_age = 1 # hours -report_archive_dir = '/var/spool/mlab_ooni/archived' -report_source_dir = '/var/spool/mlab_ooni' -valid_test_versions = ['0.1', '0.1.1', '0.4', '0.1.3'] -default_probe_cc = '??' -target_permission = 0444 -path_permission = 0755 -retry_attempts = 100 -############################################################################### - -now = utcDateNow() -delta = timedelta(hours=report_age) - -def filter_reports_by_age(report): - try: - ts,__,__ = os.path.basename(report).split('_') - if now - fromTimestamp(ts) > delta: - return True - except (InvalidTimestampFormat, ValueError): - return False - -class InvalidReportField(Exception): - pass - -def validate_fields(fields): - log.debug("Report fields are: %s" % fields) - - # check report version - if fields['test_version'] not in valid_test_versions: - raise InvalidReportField('test_version') - - # check report CC - #XXX: confirm what value we use for default CC and whether - # or not we should support > 2 character CC - if fields['probe_cc'] is None: - fields['probe_cc'] = default_probe_cc - if not re.match('[A-Z?]{2,4}', fields['probe_cc'].upper()): - raise InvalidReportField('probe_cc') - - # check report ASN - if fields['probe_asn'] is None: - 
fields['probe_asn'] = 'AS0' - if not re.match('^AS[0-9]{1,10}', fields['probe_asn'].upper()): - raise InvalidReportField('probe_asn') - - # check report timestamp - try: - datetime_ts = datetime.fromtimestamp(fields['start_time']) - datetime_str = timestamp(datetime_ts) - except InvalidTimestampFormat: - raise InvalidReportField('start_time') - - # check report IP - try: - IPAddress(fields['probe_ip']) - except ValueError: - raise InvalidReportField('probe_ip') - - # all looks good! - -def get_report_header_fields(report_header): - required_fields = ['probe_asn', 'probe_cc', 'probe_ip', 'start_time', - 'test_name', 'test_version'] - try: - return dict([(k,report_header[k]) for k in required_fields ]) - except KeyError: - return None - -def get_test_name(fields): - test_name = fields['test_name'].lower().replace(' ', '_') - return test_name - -def get_target_or_fail(fields, report): - # set the target filename - reportFormatVersion = fields['test_version'] - CC = fields['probe_cc'] - # XXX: wouldn't hurt to check timestamp for sanity again? - dateInISO8601Format,__,__ = os.path.basename(report).split('_') - probeASNumber = fields['probe_asn'] - testName = get_test_name(fields) - - # make sure path reportFormatVersion/CC exists - path = os.path.abspath(report_archive_dir) - for component in [reportFormatVersion, CC]: - path = os.path.join(path, component) - if not os.path.isdir(path): - try: - os.mkdir(path, path_permission) - log.debug("mkdir path: %s" % path) - except OSError: - return None - - # if the target file already exists, try to find another filename - filename = "%s-%s-%s.yamloo" % (testName, dateInISO8601Format, probeASNumber) - target = os.path.join(path, filename) - - # try to get a unique filename. 
os.open as used below requires - # that the file not already exist - naming_attempts = 1 - while os.path.exists(target) and naming_attempts < retry_attempts: - filename = "%s-%s-%s.%d.yamloo" % (testName, dateInISO8601Format, - probeASNumber, naming_attempts) - target = os.path.join(path, filename) - naming_attempts = naming_attempts + 1 - - if naming_attempts >= retry_attempts: - log.err("Failed getting unique filename %d times; skipping" % i) - return None - return target - -# grab list of reports -reports = glob.glob(report_source_dir+'/*') -reports_to_archive = filter(filter_reports_by_age, reports) - -# iterate over the reports to archive -for report in reports_to_archive: - log.debug("Parsing report: %s" % report) - try: - #XXX: verify that os.fdopen works as expected - f = os.fdopen(os.open(report, os.O_RDONLY|os.O_EXCL|os.O_NONBLOCK)) - except IOError: - log.err("Unable to get exclusive lock on %s; skipping" % report) - continue - - # parse the header and validate it - yamloo = yaml.safe_load_all(f) - report_header = yamloo.next() - fields = get_report_header_fields(report_header) - try: - validate_fields(fields) - except InvalidReportField, field_name: - log.err("Report %s contains invalid field called %s" % (report, field_name)) - continue - except: - log.err("An unhandled error occurred while processing %s" % report) - continue - - # get a target filename or fail - target = get_target_or_fail(fields, report) - if not target: - continue - - log.debug("target: %s" % target) - - try: - #XXX: My system does not have os.O_EXLOCK. Verify this works as is. - g = os.fdopen(os.open(target, os.O_CREAT|os.O_EXCL|os.O_NONBLOCK)) - - os.rename(report, target) - os.chmod(target, target_permission) - f.close() - g.close() - - except IOError: - # unable to lock the file... still held open? - log.err("Failed to lock target file. 
Possible race condition!") - continue diff --git a/setup.py b/setup.py index 45a7908..b3d119a 100644 --- a/setup.py +++ b/setup.py @@ -31,7 +31,7 @@ setup( url="https://ooni.torproject.org", license="LICENSE", description="OONI-Probe Backend", - scripts=["bin/oonib"], + scripts=["bin/oonib", "bin/archive_oonib_reports"], packages=find_packages(), install_requires=install_requires, dependency_links=dependency_links,
tor-commits@lists.torproject.org