commit 0a64de95106fcc3fb389165a74d99200cf4e18ea
Author: Ana Custura <ana@netstat.org.uk>
Date:   Fri Jun 26 11:01:16 2020 +0100
    Update Analysis and TGenParser classes to use TGenTools
---
 onionperf/analysis.py | 283 ++------------------------------------------------
 1 file changed, 8 insertions(+), 275 deletions(-)
diff --git a/onionperf/analysis.py b/onionperf/analysis.py
index f845dd2..2466aad 100644
--- a/onionperf/analysis.py
+++ b/onionperf/analysis.py
@@ -16,48 +16,28 @@ from stem import CircEvent, CircStatus, CircPurpose, StreamStatus
 from stem.response.events import CircuitEvent, CircMinorEvent, StreamEvent, BandwidthEvent, BuildTimeoutSetEvent
 from stem.response import ControlMessage, convert
+# tgentools imports
+from tgentools.analysis import Analysis, TGenParser
+
 # onionperf imports
 from . import util
-class Analysis(object):
+class OPAnalysis(Analysis):
     def __init__(self, nickname=None, ip_address=None):
-        self.nickname = nickname
-        self.measurement_ip = ip_address
-        self.hostname = gethostname().split('.')[0]
+        super().__init__(nickname, ip_address)
         self.json_db = {'type':'onionperf', 'version':'2.0', 'data':{}}
-        self.tgen_filepaths = []
         self.torctl_filepaths = []
-        self.date_filter = None
-        self.did_analysis = False
-
-    def add_tgen_file(self, filepath):
-        self.tgen_filepaths.append(filepath)
     def add_torctl_file(self, filepath):
         self.torctl_filepaths.append(filepath)
-    def get_nodes(self):
-        return list(self.json_db['data'].keys())
-
     def get_tor_bandwidth_summary(self, node, direction):
         try:
             return self.json_db['data'][node]['tor']['bandwidth_summary'][direction]
         except:
             return None
-    def get_tgen_transfers(self, node):
-        try:
-            return self.json_db['data'][node]['tgen']['transfers']
-        except:
-            return None
-
-    def get_tgen_transfers_summary(self, node):
-        try:
-            return self.json_db['data'][node]['tgen']['transfers_summary']
-        except:
-            return None
-
     def analyze(self, do_complete=False, date_filter=None):
         if self.did_analysis:
             return
@@ -84,17 +64,11 @@ class Analysis(object):
                 if self.measurement_ip is None:
                     self.measurement_ip = "unknown"
-                self.json_db['data'].setdefault(self.nickname, {'measurement_ip': self.measurement_ip}).setdefault(json_db_key, parser.get_data())
-
+                self.json_db['data'].setdefault(self.nickname, {'measurement_ip' : self.measurement_ip}).setdefault(json_db_key, parser.get_data())
+                self.json_db['data'][self.nickname]["tgen"].pop("heartbeats")
+                self.json_db['data'][self.nickname]["tgen"].pop("init_ts")
         self.did_analysis = True
-    def merge(self, analysis):
-        for nickname in analysis.json_db['data']:
-            if nickname in self.json_db['data']:
-                raise Exception("Merge does not yet support multiple Analysis objects from the same node \
-                    (add multiple files from the same node to the same Analysis object before calling analyze instead)")
-            else:
-                self.json_db['data'][nickname] = analysis.json_db['data'][nickname]
     def save(self, filename=None, output_prefix=os.getcwd(), do_compress=True, date_prefix=None):
         if filename is None:
@@ -147,150 +121,6 @@ class Analysis(object):
         analysis_instance.json_db = db
         return analysis_instance
-def subproc_analyze_func(analysis_args):
-    signal(SIGINT, SIG_IGN)  # ignore interrupts
-    a = analysis_args[0]
-    do_complete = analysis_args[1]
-    a.analyze(do_complete=do_complete)
-    return a
-
-class ParallelAnalysis(Analysis):
-
-    def analyze(self, search_path, do_complete=False, nickname=None, tgen_search_expressions=["tgen.*.log"],
-                torctl_search_expressions=["torctl.*.log"], num_subprocs=cpu_count()):
-
-        pathpairs = util.find_file_paths_pairs(search_path, tgen_search_expressions, torctl_search_expressions)
-        logging.info("processing input from {0} nodes...".format(len(pathpairs)))
-
-        analysis_jobs = []
-        for (tgen_filepaths, torctl_filepaths) in pathpairs:
-            a = Analysis()
-            for tgen_filepath in tgen_filepaths:
-                a.add_tgen_file(tgen_filepath)
-            for torctl_filepath in torctl_filepaths:
-                a.add_torctl_file(torctl_filepath)
-            analysis_args = [a, do_complete]
-            analysis_jobs.append(analysis_args)
-
-        analyses = None
-        pool = Pool(num_subprocs if num_subprocs > 0 else cpu_count())
-        try:
-            mr = pool.map_async(subproc_analyze_func, analysis_jobs)
-            pool.close()
-            while not mr.ready(): mr.wait(1)
-            analyses = mr.get()
-        except KeyboardInterrupt:
-            logging.info("interrupted, terminating process pool")
-            pool.terminate()
-            pool.join()
-            sys.exit()
-
-        logging.info("merging {0} analysis results now...".format(len(analyses)))
-        while analyses is not None and len(analyses) > 0:
-            self.merge(analyses.pop())
-        logging.info("done merging results: {0} total nicknames present in json db".format(len(self.json_db['data'])))
-
-class TransferStatusEvent(object):
-
-    def __init__(self, line):
-        self.is_success = False
-        self.is_error = False
-        self.is_complete = False
-
-        parts = line.strip().split()
-        self.unix_ts_end = util.timestamp_to_seconds(parts[2])
-
-        transport_parts = parts[8].split(',')
-        self.endpoint_local = transport_parts[2]
-        self.endpoint_proxy = transport_parts[3]
-        self.endpoint_remote = transport_parts[4]
-
-        transfer_parts = parts[10].split(',')
-
-        # for id, combine the time with the transfer num; this is unique for each node,
-        # as long as the node was running tgen without restarting for 100 seconds or longer
-        #self.transfer_id = "{0}-{1}".format(round(self.unix_ts_end, -2), transfer_num)
-        self.transfer_id = "{0}:{1}".format(transfer_parts[0], transfer_parts[1]) # id:count
-
-        self.hostname_local = transfer_parts[2]
-        self.method = transfer_parts[3] # 'GET' or 'PUT'
-        self.filesize_bytes = int(transfer_parts[4])
-        self.hostname_remote = transfer_parts[5]
-        self.error_code = transfer_parts[8].split('=')[1]
-
-        self.total_bytes_read = int(parts[11].split('=')[1])
-        self.total_bytes_write = int(parts[12].split('=')[1])
-
-        # the commander is the side that sent the command,
-        # i.e., the side that is driving the download, i.e., the client side
-        progress_parts = parts[13].split('=')
-        self.is_commander = (self.method == 'GET' and 'read' in progress_parts[0]) or \
-                            (self.method == 'PUT' and 'write' in progress_parts[0])
-        self.payload_bytes_status = int(progress_parts[1].split('/')[0])
-
-        self.unconsumed_parts = None if len(parts) < 16 else parts[15:]
-        self.elapsed_seconds = {}
-
-class TransferCompleteEvent(TransferStatusEvent):
-    def __init__(self, line):
-        super(TransferCompleteEvent, self).__init__(line)
-        self.is_complete = True
-
-        i = 0
-        elapsed_seconds = 0.0
-        # match up self.unconsumed_parts[0:11] with the events in the transfer_steps enum
-        for k in ['socket_create', 'socket_connect', 'proxy_init', 'proxy_choice', 'proxy_request',
-                  'proxy_response', 'command', 'response', 'first_byte', 'last_byte', 'checksum']:
-            # parse out the elapsed time value
-            keyval = self.unconsumed_parts[i]
-            i += 1
-
-            val = float(int(keyval.split('=')[1]))
-            if val >= 0.0:
-                elapsed_seconds = val / 1000000.0 # usecs to secs
-            self.elapsed_seconds.setdefault(k, elapsed_seconds)
-
-        self.unix_ts_start = self.unix_ts_end - elapsed_seconds
-        del(self.unconsumed_parts)
-
-class TransferSuccessEvent(TransferCompleteEvent):
-    def __init__(self, line):
-        super(TransferSuccessEvent, self).__init__(line)
-        self.is_success = True
-
-class TransferErrorEvent(TransferCompleteEvent):
-    def __init__(self, line):
-        super(TransferErrorEvent, self).__init__(line)
-        self.is_error = True
-
-class Transfer(object):
-    def __init__(self, tid):
-        self.id = tid
-        self.last_event = None
-        self.payload_progress = {decile:None for decile in [0.0, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0]}
-        self.payload_bytes = {partial:None for partial in [10240, 20480, 51200, 102400, 204800, 512000, 1048576, 2097152, 5242880]}
-
-    def add_event(self, status_event):
-        progress_frac = float(status_event.payload_bytes_status) / float(status_event.filesize_bytes)
-        progress = float(status_event.payload_bytes_status)
-        for partial in sorted(self.payload_bytes.keys()):
-            if progress >= partial and self.payload_bytes[partial] is None:
-                self.payload_bytes[partial] = status_event.unix_ts_end
-        for decile in sorted(self.payload_progress.keys()):
-            if progress_frac >= decile and self.payload_progress[decile] is None:
-                self.payload_progress[decile] = status_event.unix_ts_end
-        self.last_event = status_event
-
-    def get_data(self):
-        e = self.last_event
-        if e is None or not e.is_complete:
-            return None
-        d = e.__dict__
-        if not e.is_error:
-            d['elapsed_seconds']['payload_progress'] = {decile: round(self.payload_progress[decile] - e.unix_ts_start, 6) for decile in self.payload_progress if self.payload_progress[decile] is not None}
-            d['elapsed_seconds']['payload_bytes'] = {partial: round(self.payload_bytes[partial] - e.unix_ts_start, 6) for partial in self.payload_bytes if self.payload_bytes[partial] is not None}
-        return d
-
 class Parser(object, metaclass=ABCMeta):
     @abstractmethod
     def parse(self, source, do_complete):
@@ -302,103 +132,6 @@ class Parser(object, metaclass=ABCMeta):
     def get_name(self):
         pass
-class TGenParser(Parser):
-
-    def __init__(self, date_filter=None):
-        ''' date_filter should be given in UTC '''
-        self.state = {}
-        self.transfers = {}
-        self.transfers_summary = {'time_to_first_byte':{}, 'time_to_last_byte':{}, 'errors':{}}
-        self.name = None
-        self.date_filter = date_filter
-
-    def __is_date_valid(self, date_to_check):
-        if self.date_filter is None:
-            # we are not asked to filter, so every date is valid
-            return True
-        else:
-            # we are asked to filter, so the line is only valid if the date matches the filter
-            # both the filter and the unix timestamp should be in UTC at this point
-            return util.do_dates_match(self.date_filter, date_to_check)
-
-    def __parse_line(self, line, do_complete):
-        if self.name is None and re.search("Initializing traffic generator on host", line) is not None:
-            self.name = line.strip().split()[11]
-
-        if self.date_filter is not None:
-            parts = line.split(' ', 3)
-            if len(parts) < 4: # the 3rd is the timestamp, the 4th is the rest of the line
-                return True
-            unix_ts = float(parts[2])
-            line_date = datetime.datetime.utcfromtimestamp(unix_ts).date()
-            if not self.__is_date_valid(line_date):
-                return True
-
-        if do_complete and re.search("state\sRESPONSE\sto\sstate\sPAYLOAD", line) is not None:
-            # another run of tgen starts the id over counting up from 1
-            # if a prev transfer with the same id did not complete, we can be sure it never will
-            parts = line.strip().split()
-            transfer_parts = parts[7].strip().split(',')
-            transfer_id = "{0}:{1}".format(transfer_parts[0], transfer_parts[1]) # id:count
-            if transfer_id in self.state:
-                self.state.pop(transfer_id)
-
-        elif do_complete and re.search("transfer-status", line) is not None:
-            status = TransferStatusEvent(line)
-            xfer = self.state.setdefault(status.transfer_id, Transfer(status.transfer_id))
-            xfer.add_event(status)
-
-        elif re.search("transfer-complete", line) is not None:
-            complete = TransferSuccessEvent(line)
-
-            if do_complete:
-                xfer = self.state.setdefault(complete.transfer_id, Transfer(complete.transfer_id))
-                xfer.add_event(complete)
-                self.transfers[xfer.id] = xfer.get_data()
-                self.state.pop(complete.transfer_id)
-
-            filesize, second = complete.filesize_bytes, int(complete.unix_ts_end)
-            fb_secs = complete.elapsed_seconds['first_byte'] - complete.elapsed_seconds['command']
-            lb_secs = complete.elapsed_seconds['last_byte'] - complete.elapsed_seconds['command']
-
-            fb_list = self.transfers_summary['time_to_first_byte'].setdefault(filesize, {}).setdefault(second, [])
-            fb_list.append(fb_secs)
-            lb_list = self.transfers_summary['time_to_last_byte'].setdefault(filesize, {}).setdefault(second, [])
-            lb_list.append(lb_secs)
-
-        elif re.search("transfer-error", line) is not None:
-            error = TransferErrorEvent(line)
-
-            if do_complete:
-                xfer = self.state.setdefault(error.transfer_id, Transfer(error.transfer_id))
-                xfer.add_event(error)
-                self.transfers[xfer.id] = xfer.get_data()
-                self.state.pop(error.transfer_id)
-
-            err_code, filesize, second = error.error_code, error.filesize_bytes, int(error.unix_ts_end)
-
-            err_list = self.transfers_summary['errors'].setdefault(err_code, {}).setdefault(second, [])
-            err_list.append(filesize)
-
-        return True
-
-    def parse(self, source, do_complete=False):
-        source.open()
-        for line in source:
-            # ignore line parsing errors
-            try:
-                if not self.__parse_line(line, do_complete):
-                    break
-            except:
-                logging.warning("TGenParser: skipping line due to parsing error: {0}".format(line))
-                continue
-        source.close()
-
-    def get_data(self):
-        return {'transfers':self.transfers, 'transfers_summary': self.transfers_summary}
-
-    def get_name(self):
-        return self.name
 class TorStream(object):
     def __init__(self, sid):
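
For reference, a minimal usage sketch (not part of the patch) of how the refactored class is meant to be driven after this change: OPAnalysis now inherits the TGen log handling from tgentools.analysis.Analysis and only keeps the torctl handling locally. The nickname, address, and log file names below are made up, and it assumes the tgentools parent class still provides the add_tgen_file() helper:

    # Illustrative sketch only; node details and file paths are hypothetical.
    from onionperf.analysis import OPAnalysis

    analysis = OPAnalysis(nickname="op-example", ip_address="192.0.2.1")
    analysis.add_tgen_file("tgen.client.log")      # parsed via tgentools
    analysis.add_torctl_file("torctl.client.log")  # still parsed by onionperf
    analysis.analyze(do_complete=True)
    analysis.save(output_prefix=".")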