commit 95a2f8b482ee882b4ec898a7bbc76018a6d98659 Author: Karsten Loesing karsten.loesing@gmx.net Date: Tue Jul 14 10:14:22 2020 +0200
Take out Tor summaries and the do_complete switch.
Implements tpo/metrics/onionperf#40005. --- onionperf/analysis.py | 70 ++++++++++++------------------------ onionperf/measurement.py | 2 +- onionperf/onionperf | 9 ++--- onionperf/reprocessing.py | 8 ++--- onionperf/tests/test_reprocessing.py | 2 +- 5 files changed, 30 insertions(+), 61 deletions(-)
diff --git a/onionperf/analysis.py b/onionperf/analysis.py index 19c0192..26b00b2 100644 --- a/onionperf/analysis.py +++ b/onionperf/analysis.py @@ -29,13 +29,7 @@ class OPAnalysis(Analysis): def add_torctl_file(self, filepath): self.torctl_filepaths.append(filepath)
- def get_tor_bandwidth_summary(self, node, direction): - try: - return self.json_db['data'][node]['tor']['bandwidth_summary'][direction] - except: - return None - - def analyze(self, do_complete=False, date_filter=None): + def analyze(self, date_filter=None): if self.did_analysis: return
@@ -47,7 +41,7 @@ class OPAnalysis(Analysis): if len(filepaths) > 0: for filepath in filepaths: logging.info("parsing log file at {0}".format(filepath)) - parser.parse(util.DataSource(filepath), do_complete=do_complete) + parser.parse(util.DataSource(filepath), do_complete=True)
if self.nickname is None: parsed_name = parser.get_name() @@ -134,7 +128,7 @@ class OPAnalysis(Analysis):
class Parser(object, metaclass=ABCMeta): @abstractmethod - def parse(self, source, do_complete): + def parse(self, source, do_complete=True): pass @abstractmethod def get_data(self): @@ -270,14 +264,10 @@ class TorCtlParser(Parser):
def __init__(self, date_filter=None): ''' date_filter should be given in UTC ''' - self.do_complete = False - self.bandwidth_summary = {'bytes_read':{}, 'bytes_written':{}} self.circuits_state = {} self.circuits = {} - self.circuits_summary = {'buildtimes':[], 'lifetimes':[]} self.streams_state = {} self.streams = {} - self.streams_summary = {'lifetimes':{}} self.name = None self.boot_succeeded = False self.build_timeout_last = None @@ -320,15 +310,10 @@ class TorCtlParser(Parser):
data = circ.get_data() if data is not None: - if built is not None and started is not None and len(data['path']) == 3: - self.circuits_summary['buildtimes'].append(built - started) - if ended is not None and started is not None: - self.circuits_summary['lifetimes'].append(ended - started) - if self.do_complete: - self.circuits[cid] = data + self.circuits[cid] = data self.circuits_state.pop(cid)
- elif self.do_complete and isinstance(event, CircMinorEvent): + elif isinstance(event, CircMinorEvent): if event.purpose != event.old_purpose or event.event != CircEvent.PURPOSE_CHANGED: key = "{0}:{1}".format(event.event, event.purpose) circ.add_event(key, arrival_dt) @@ -364,15 +349,9 @@ class TorCtlParser(Parser):
data = strm.get_data() if data is not None: - if self.do_complete: - self.streams[sid] = data - self.streams_summary['lifetimes'].setdefault(stream_type, []).append(ended - started) + self.streams[sid] = data self.streams_state.pop(sid)
- def __handle_bw(self, event, arrival_dt): - self.bandwidth_summary['bytes_read'][int(arrival_dt)] = event.read - self.bandwidth_summary['bytes_written'][int(arrival_dt)] = event.written - def __handle_buildtimeout(self, event, arrival_dt): self.build_timeout_last = event.timeout self.build_quantile_last = event.quantile @@ -382,8 +361,6 @@ class TorCtlParser(Parser): self.__handle_circuit(event, arrival_dt) elif isinstance(event, StreamEvent): self.__handle_stream(event, arrival_dt) - elif isinstance(event, BandwidthEvent): - self.__handle_bw(event, arrival_dt) elif isinstance(event, BuildTimeoutSetEvent): self.__handle_buildtimeout(event, arrival_dt)
@@ -408,27 +385,26 @@ class TorCtlParser(Parser): elif re.search("BOOTSTRAP", line) is not None and re.search("PROGRESS=100", line) is not None: self.boot_succeeded = True
- if self.do_complete or (self.do_complete is False and re.search("650\sBW", line) is not None): - # parse with stem - timestamps, sep, raw_event_str = line.partition(" 650 ") - if sep == '': - return True + # parse with stem + timestamps, sep, raw_event_str = line.partition(" 650 ") + if sep == '': + return True + + # event.arrived_at is also available but at worse granularity + unix_ts = float(timestamps.strip().split()[2])
- # event.arrived_at is also available but at worse granularity - unix_ts = float(timestamps.strip().split()[2]) + # check if we should ignore the line + line_date = datetime.datetime.utcfromtimestamp(unix_ts).date() + if not self.__is_date_valid(line_date): + return True
- # check if we should ignore the line - line_date = datetime.datetime.utcfromtimestamp(unix_ts).date() - if not self.__is_date_valid(line_date): - return True + event = ControlMessage.from_str("{0} {1}".format(sep.strip(), raw_event_str)) + convert('EVENT', event) + self.__handle_event(event, unix_ts)
- event = ControlMessage.from_str("{0} {1}".format(sep.strip(), raw_event_str)) - convert('EVENT', event) - self.__handle_event(event, unix_ts) return True
- def parse(self, source, do_complete=False): - self.do_complete = do_complete + def parse(self, source, do_complete=True): source.open(newline='\r\n') for line in source: # ignore line parsing errors @@ -442,9 +418,7 @@ class TorCtlParser(Parser): source.close()
def get_data(self): - return {'circuits': self.circuits, 'circuits_summary': self.circuits_summary, - 'streams':self.streams, 'streams_summary': self.streams_summary, - 'bandwidth_summary': self.bandwidth_summary} + return {'circuits': self.circuits, 'streams': self.streams}
def get_name(self): return self.name diff --git a/onionperf/measurement.py b/onionperf/measurement.py index 9d84d4e..823f37a 100644 --- a/onionperf/measurement.py +++ b/onionperf/measurement.py @@ -157,7 +157,7 @@ def logrotate_thread_task(writables, tgen_writable, torctl_writable, docroot, ni anal.add_torctl_file(torctl_writable.rotate_file(filename_datetime=next_midnight))
# run the analysis, i.e. parse the files - anal.analyze(do_simple=False) + anal.analyze()
# save the results in onionperf json format in the www docroot anal.save(output_prefix=docroot, do_compress=True, date_prefix=next_midnight.date()) diff --git a/onionperf/onionperf b/onionperf/onionperf index 5e2c2fb..36520ad 100755 --- a/onionperf/onionperf +++ b/onionperf/onionperf @@ -279,11 +279,6 @@ files generated by this script will be written""", action="store", dest="date_prefix", default=None)
- analyze_parser.add_argument('-s', '--do-simple-parse', - help="""parse and export only summary statistics rather than full transfer/circuit/stream data""", - action="store_false", dest="do_complete", - default=True) - # visualize visualize_parser = sub_parser.add_parser('visualize', description=DESC_VISUALIZE, help=HELP_VISUALIZE, formatter_class=my_formatter_class) @@ -387,7 +382,7 @@ def analyze(args): analysis.add_tgen_file(args.tgen_logpath) if args.torctl_logpath is not None: analysis.add_torctl_file(args.torctl_logpath) - analysis.analyze(args.do_complete, date_filter=args.date_filter) + analysis.analyze(date_filter=args.date_filter) analysis.save(output_prefix=args.prefix, date_prefix=args.date_prefix)
elif args.tgen_logpath is not None and os.path.isdir(args.tgen_logpath) and args.torctl_logpath is not None and os.path.isdir(args.torctl_logpath): @@ -396,7 +391,7 @@ def analyze(args): torctl_logs = reprocessing.collect_logs(args.torctl_logpath, '*torctl.log*') log_pairs = reprocessing.match(tgen_logs, torctl_logs, args.date_filter) logging.info("Found {0} matching log pairs to be reprocessed".format(len(log_pairs))) - reprocessing.multiprocess_logs(log_pairs, args.prefix, args.nickname, args.do_complete) + reprocessing.multiprocess_logs(log_pairs, args.prefix, args.nickname)
else: logging.error("Given paths were an unrecognized mix of file and directory paths, nothing will be analyzed") diff --git a/onionperf/reprocessing.py b/onionperf/reprocessing.py index 8d8dcd4..d949dca 100644 --- a/onionperf/reprocessing.py +++ b/onionperf/reprocessing.py @@ -46,21 +46,21 @@ def match(tgen_logs, tor_logs, date_filter): return log_pairs
-def analyze_func(prefix, nick, do_complete, pair): +def analyze_func(prefix, nick, pair): analysis = OPAnalysis(nickname=nick) logging.info('Analysing pair for date {0}'.format(pair[2])) analysis.add_tgen_file(pair[0]) analysis.add_torctl_file(pair[1]) - analysis.analyze(do_complete=do_complete, date_filter=pair[2]) + analysis.analyze(date_filter=pair[2]) analysis.save(output_prefix=prefix) return 1
-def multiprocess_logs(log_pairs, prefix, nick=None, do_complete=False): +def multiprocess_logs(log_pairs, prefix, nick=None): pool = Pool(cpu_count()) analyses = None try: - func = partial(analyze_func, prefix, nick, do_complete) + func = partial(analyze_func, prefix, nick) mr = pool.map_async(func, log_pairs) pool.close() while not mr.ready(): diff --git a/onionperf/tests/test_reprocessing.py b/onionperf/tests/test_reprocessing.py index 5e758d2..b51d154 100644 --- a/onionperf/tests/test_reprocessing.py +++ b/onionperf/tests/test_reprocessing.py @@ -61,7 +61,7 @@ def test_log_match_with_wrong_filter_date(): def test_analyze_func_json(): pair = (DATA_DIR + 'logs/onionperf_2019-01-10_23:59:59.tgen.log', DATA_DIR + 'logs/onionperf_2019-01-10_23:59:59.torctl.log', datetime.datetime(2019, 1, 10, 0, 0)) work_dir = tempfile.mkdtemp() - reprocessing.analyze_func(work_dir, None, False, pair) + reprocessing.analyze_func(work_dir, None, pair) json_file = os.path.join(work_dir, "2019-01-10.onionperf.analysis.json.xz") assert(os.path.exists(json_file)) shutil.rmtree(work_dir)