commit 05eb9cdf56f6ae275ace65a0bdbdcf2c3b5e1c40
Author: Ana Custura <ana@netstat.org.uk>
Date:   Fri Jun 26 10:52:46 2020 +0100

    Update do_simple analysis param to new do_complete tgen semantics
---
 onionperf/analysis.py     | 42 +++++++++++++++++++++---------------------
 onionperf/onionperf       |  8 ++++----
 onionperf/reprocessing.py |  8 ++++----
 3 files changed, 29 insertions(+), 29 deletions(-)

diff --git a/onionperf/analysis.py b/onionperf/analysis.py index 20ca354..eaacbb9 100644 --- a/onionperf/analysis.py +++ b/onionperf/analysis.py @@ -58,7 +58,7 @@ class Analysis(object): except: return None
- def analyze(self, do_simple=True, date_filter=None): + def analyze(self, do_complete=False, date_filter=None): if self.did_analysis: return
@@ -70,7 +70,7 @@ class Analysis(object): if len(filepaths) > 0: for filepath in filepaths: logging.info("parsing log file at {0}".format(filepath)) - parser.parse(util.DataSource(filepath), do_simple=do_simple) + parser.parse(util.DataSource(filepath), do_complete=do_complete)
if self.nickname is None: parsed_name = parser.get_name() @@ -150,13 +150,13 @@ class Analysis(object): def subproc_analyze_func(analysis_args): signal(SIGINT, SIG_IGN) # ignore interrupts a = analysis_args[0] - do_simple = analysis_args[1] - a.analyze(do_simple=do_simple) + do_complete = analysis_args[1] + a.analyze(do_complete=do_complete) return a
class ParallelAnalysis(Analysis):
- def analyze(self, search_path, do_simple=True, nickname=None, tgen_search_expressions=["tgen.*.log"], + def analyze(self, search_path, do_complete=False, nickname=None, tgen_search_expressions=["tgen.*.log"], torctl_search_expressions=["torctl.*.log"], num_subprocs=cpu_count()):
pathpairs = util.find_file_paths_pairs(search_path, tgen_search_expressions, torctl_search_expressions) @@ -169,7 +169,7 @@ class ParallelAnalysis(Analysis): a.add_tgen_file(tgen_filepath) for torctl_filepath in torctl_filepaths: a.add_torctl_file(torctl_filepath) - analysis_args = [a, do_simple] + analysis_args = [a, do_complete] analysis_jobs.append(analysis_args)
analyses = None @@ -293,7 +293,7 @@ class Transfer(object):
class Parser(object, metaclass=ABCMeta): @abstractmethod - def parse(self, source, do_simple): + def parse(self, source, do_complete): pass @abstractmethod def get_data(self): @@ -321,7 +321,7 @@ class TGenParser(Parser): # both the filter and the unix timestamp should be in UTC at this point return util.do_dates_match(self.date_filter, date_to_check)
- def __parse_line(self, line, do_simple): + def __parse_line(self, line, do_complete): if self.name is None and re.search("Initializing traffic generator on host", line) is not None: self.name = line.strip().split()[11]
@@ -334,7 +334,7 @@ class TGenParser(Parser): if not self.__is_date_valid(line_date): return True
- if not do_simple and re.search("state\sRESPONSE\sto\sstate\sPAYLOAD", line) is not None: + if do_complete and re.search("state\sRESPONSE\sto\sstate\sPAYLOAD", line) is not None: # another run of tgen starts the id over counting up from 1 # if a prev transfer with the same id did not complete, we can be sure it never will parts = line.strip().split() @@ -343,7 +343,7 @@ class TGenParser(Parser): if transfer_id in self.state: self.state.pop(transfer_id)
- elif not do_simple and re.search("transfer-status", line) is not None: + elif do_complete and re.search("transfer-status", line) is not None: status = TransferStatusEvent(line) xfer = self.state.setdefault(status.transfer_id, Transfer(status.transfer_id)) xfer.add_event(status) @@ -351,7 +351,7 @@ class TGenParser(Parser): elif re.search("transfer-complete", line) is not None: complete = TransferSuccessEvent(line)
- if not do_simple: + if do_complete: xfer = self.state.setdefault(complete.transfer_id, Transfer(complete.transfer_id)) xfer.add_event(complete) self.transfers[xfer.id] = xfer.get_data() @@ -369,7 +369,7 @@ class TGenParser(Parser): elif re.search("transfer-error", line) is not None: error = TransferErrorEvent(line)
- if not do_simple: + if do_complete: xfer = self.state.setdefault(error.transfer_id, Transfer(error.transfer_id)) xfer.add_event(error) self.transfers[xfer.id] = xfer.get_data() @@ -382,12 +382,12 @@ class TGenParser(Parser):
return True
- def parse(self, source, do_simple=True): + def parse(self, source, do_complete=False): source.open() for line in source: # ignore line parsing errors try: - if not self.__parse_line(line, do_simple): + if not self.__parse_line(line, do_complete): break except: logging.warning("TGenParser: skipping line due to parsing error: {0}".format(line)) @@ -526,7 +526,7 @@ class TorCtlParser(Parser):
def __init__(self, date_filter=None): ''' date_filter should be given in UTC ''' - self.do_simple = True + self.do_complete = False self.bandwidth_summary = {'bytes_read':{}, 'bytes_written':{}} self.circuits_state = {} self.circuits = {} @@ -580,11 +580,11 @@ class TorCtlParser(Parser): self.circuits_summary['buildtimes'].append(built - started) if ended is not None and started is not None: self.circuits_summary['lifetimes'].append(ended - started) - if not self.do_simple: + if self.do_complete: self.circuits[cid] = data self.circuits_state.pop(cid)
- elif not self.do_simple and isinstance(event, CircMinorEvent): + elif self.do_complete and isinstance(event, CircMinorEvent): if event.purpose != event.old_purpose or event.event != CircEvent.PURPOSE_CHANGED: key = "{0}:{1}".format(event.event, event.purpose) circ.add_event(key, arrival_dt) @@ -620,7 +620,7 @@ class TorCtlParser(Parser):
data = strm.get_data() if data is not None: - if not self.do_simple: + if self.do_complete: self.streams[sid] = data self.streams_summary['lifetimes'].setdefault(stream_type, []).append(ended - started) self.streams_state.pop(sid) @@ -664,7 +664,7 @@ class TorCtlParser(Parser): elif re.search("BOOTSTRAP", line) is not None and re.search("PROGRESS=100", line) is not None: self.boot_succeeded = True
- if self.do_simple is False or (self.do_simple is True and re.search("650\sBW", line) is not None): + if self.do_complete or (self.do_complete is False and re.search("650\sBW", line) is not None): # parse with stem timestamps, sep, raw_event_str = line.partition(" 650 ") if sep == '': @@ -683,8 +683,8 @@ class TorCtlParser(Parser): self.__handle_event(event, unix_ts) return True
- def parse(self, source, do_simple=True): - self.do_simple = do_simple + def parse(self, source, do_complete=False): + self.do_complete = do_complete source.open(newline='\r\n') for line in source: # ignore line parsing errors diff --git a/onionperf/onionperf b/onionperf/onionperf index a7d32f6..ddbeaf1 100755 --- a/onionperf/onionperf +++ b/onionperf/onionperf @@ -281,8 +281,8 @@ files generated by this script will be written""",
analyze_parser.add_argument('-s', '--do-simple-parse', help="""parse and export only summary statistics rather than full transfer/circuit/stream data""", - action="store_true", dest="do_simple", - default=False) + action="store_false", dest="do_complete", + default=True)
# visualize visualize_parser = sub_parser.add_parser('visualize', description=DESC_VISUALIZE, help=HELP_VISUALIZE, @@ -387,7 +387,7 @@ def analyze(args): analysis.add_tgen_file(args.tgen_logpath) if args.torctl_logpath is not None: analysis.add_torctl_file(args.torctl_logpath) - analysis.analyze(args.do_simple, date_filter=args.date_filter) + analysis.analyze(args.do_complete, date_filter=args.date_filter) analysis.save(output_prefix=args.prefix, date_prefix=args.date_prefix)
elif args.tgen_logpath is not None and os.path.isdir(args.tgen_logpath) and args.torctl_logpath is not None and os.path.isdir(args.torctl_logpath): @@ -396,7 +396,7 @@ def analyze(args): torctl_logs = reprocessing.collect_logs(args.torctl_logpath, '*torctl.log*') log_pairs = reprocessing.match(tgen_logs, torctl_logs, args.date_filter) logging.info("Found {0} matching log pairs to be reprocessed".format(len(log_pairs))) - reprocessing.multiprocess_logs(log_pairs, args.prefix, args.nickname, args.do_simple) + reprocessing.multiprocess_logs(log_pairs, args.prefix, args.nickname, args.do_complete)
else: logging.error("Given paths were an unrecognized mix of file and directory paths, nothing will be analyzed") diff --git a/onionperf/reprocessing.py b/onionperf/reprocessing.py index f88f311..ad0308f 100644 --- a/onionperf/reprocessing.py +++ b/onionperf/reprocessing.py @@ -46,21 +46,21 @@ def match(tgen_logs, tor_logs, date_filter): return log_pairs
-def analyze_func(prefix, nick, do_simple, pair): +def analyze_func(prefix, nick, do_complete, pair): analysis = Analysis(nickname=nick) logging.info('Analysing pair for date {0}'.format(pair[2])) analysis.add_tgen_file(pair[0]) analysis.add_torctl_file(pair[1]) - analysis.analyze(do_simple=do_simple, date_filter=pair[2]) + analysis.analyze(do_complete=do_complete, date_filter=pair[2]) analysis.save(output_prefix=prefix) return 1
-def multiprocess_logs(log_pairs, prefix, nick=None, do_simple=False): +def multiprocess_logs(log_pairs, prefix, nick=None, do_complete=False): pool = Pool(cpu_count()) analyses = None try: - func = partial(analyze_func, prefix, nick, do_simple) + func = partial(analyze_func, prefix, nick, do_complete) mr = pool.map_async(func, log_pairs) pool.close() while not mr.ready():