[tor-commits] [onionperf/master] Update Analysis and TGenParser classes to use TGenTools

karsten at torproject.org karsten at torproject.org
Mon Aug 10 09:30:41 UTC 2020


commit 0a64de95106fcc3fb389165a74d99200cf4e18ea
Author: Ana Custura <ana at netstat.org.uk>
Date:   Fri Jun 26 11:01:16 2020 +0100

    Update Analysis and TGenParser classes to use TGenTools
---
 onionperf/analysis.py | 283 ++------------------------------------------------
 1 file changed, 8 insertions(+), 275 deletions(-)

diff --git a/onionperf/analysis.py b/onionperf/analysis.py
index f845dd2..2466aad 100644
--- a/onionperf/analysis.py
+++ b/onionperf/analysis.py
@@ -16,48 +16,28 @@ from stem import CircEvent, CircStatus, CircPurpose, StreamStatus
 from stem.response.events import CircuitEvent, CircMinorEvent, StreamEvent, BandwidthEvent, BuildTimeoutSetEvent
 from stem.response import ControlMessage, convert
 
+# tgentools imports
+from tgentools.analysis import Analysis, TGenParser
+
 # onionperf imports
 from . import util
 
-class Analysis(object):
+class OPAnalysis(Analysis):
 
     def __init__(self, nickname=None, ip_address=None):
-        self.nickname = nickname
-        self.measurement_ip = ip_address
-        self.hostname = gethostname().split('.')[0]
+        super().__init__(nickname, ip_address)
         self.json_db = {'type':'onionperf', 'version':'2.0', 'data':{}}
-        self.tgen_filepaths = []
         self.torctl_filepaths = []
-        self.date_filter = None
-        self.did_analysis = False
-
-    def add_tgen_file(self, filepath):
-        self.tgen_filepaths.append(filepath)
 
     def add_torctl_file(self, filepath):
         self.torctl_filepaths.append(filepath)
 
-    def get_nodes(self):
-        return list(self.json_db['data'].keys())
-
     def get_tor_bandwidth_summary(self, node, direction):
         try:
             return self.json_db['data'][node]['tor']['bandwidth_summary'][direction]
         except:
             return None
 
-    def get_tgen_transfers(self, node):
-        try:
-            return self.json_db['data'][node]['tgen']['transfers']
-        except:
-            return None
-
-    def get_tgen_transfers_summary(self, node):
-        try:
-            return self.json_db['data'][node]['tgen']['transfers_summary']
-        except:
-            return None
-
     def analyze(self, do_complete=False, date_filter=None):
         if self.did_analysis:
             return
@@ -84,17 +64,11 @@ class Analysis(object):
                 if self.measurement_ip is None:
                     self.measurement_ip = "unknown"
 
-                self.json_db['data'].setdefault(self.nickname, {'measurement_ip': self.measurement_ip}).setdefault(json_db_key, parser.get_data())
-
+                self.json_db['data'].setdefault(self.nickname, {'measurement_ip' : self.measurement_ip}).setdefault(json_db_key, parser.get_data())
+                self.json_db['data'][self.nickname]["tgen"].pop("heartbeats")
+                self.json_db['data'][self.nickname]["tgen"].pop("init_ts")
         self.did_analysis = True
 
-    def merge(self, analysis):
-        for nickname in analysis.json_db['data']:
-            if nickname in self.json_db['data']:
-                raise Exception("Merge does not yet support multiple Analysis objects from the same node \
-                (add multiple files from the same node to the same Analysis object before calling analyze instead)")
-            else:
-                self.json_db['data'][nickname] = analysis.json_db['data'][nickname]
 
     def save(self, filename=None, output_prefix=os.getcwd(), do_compress=True, date_prefix=None):
         if filename is None:
@@ -147,150 +121,6 @@ class Analysis(object):
             analysis_instance.json_db = db
             return analysis_instance
 
-def subproc_analyze_func(analysis_args):
-    signal(SIGINT, SIG_IGN)  # ignore interrupts
-    a = analysis_args[0]
-    do_complete = analysis_args[1]
-    a.analyze(do_complete=do_complete)
-    return a
-
-class ParallelAnalysis(Analysis):
-
-    def analyze(self, search_path, do_complete=False, nickname=None, tgen_search_expressions=["tgen.*\.log"],
-                torctl_search_expressions=["torctl.*\.log"], num_subprocs=cpu_count()):
-
-        pathpairs = util.find_file_paths_pairs(search_path, tgen_search_expressions, torctl_search_expressions)
-        logging.info("processing input from {0} nodes...".format(len(pathpairs)))
-
-        analysis_jobs = []
-        for (tgen_filepaths, torctl_filepaths) in pathpairs:
-            a = Analysis()
-            for tgen_filepath in tgen_filepaths:
-                a.add_tgen_file(tgen_filepath)
-            for torctl_filepath in torctl_filepaths:
-                a.add_torctl_file(torctl_filepath)
-            analysis_args = [a, do_complete]
-            analysis_jobs.append(analysis_args)
-
-        analyses = None
-        pool = Pool(num_subprocs if num_subprocs > 0 else cpu_count())
-        try:
-            mr = pool.map_async(subproc_analyze_func, analysis_jobs)
-            pool.close()
-            while not mr.ready(): mr.wait(1)
-            analyses = mr.get()
-        except KeyboardInterrupt:
-            logging.info("interrupted, terminating process pool")
-            pool.terminate()
-            pool.join()
-            sys.exit()
-
-        logging.info("merging {0} analysis results now...".format(len(analyses)))
-        while analyses is not None and len(analyses) > 0:
-            self.merge(analyses.pop())
-        logging.info("done merging results: {0} total nicknames present in json db".format(len(self.json_db['data'])))
-
-class TransferStatusEvent(object):
-
-    def __init__(self, line):
-        self.is_success = False
-        self.is_error = False
-        self.is_complete = False
-
-        parts = line.strip().split()
-        self.unix_ts_end = util.timestamp_to_seconds(parts[2])
-
-        transport_parts = parts[8].split(',')
-        self.endpoint_local = transport_parts[2]
-        self.endpoint_proxy = transport_parts[3]
-        self.endpoint_remote = transport_parts[4]
-
-        transfer_parts = parts[10].split(',')
-
-        # for id, combine the time with the transfer num; this is unique for each node,
-        # as long as the node was running tgen without restarting for 100 seconds or longer
-        # #self.transfer_id = "{0}-{1}".format(round(self.unix_ts_end, -2), transfer_num)
-        self.transfer_id = "{0}:{1}".format(transfer_parts[0], transfer_parts[1])  # id:count
-
-        self.hostname_local = transfer_parts[2]
-        self.method = transfer_parts[3]  # 'GET' or 'PUT'
-        self.filesize_bytes = int(transfer_parts[4])
-        self.hostname_remote = transfer_parts[5]
-        self.error_code = transfer_parts[8].split('=')[1]
-
-        self.total_bytes_read = int(parts[11].split('=')[1])
-        self.total_bytes_write = int(parts[12].split('=')[1])
-
-        # the commander is the side that sent the command,
-        # i.e., the side that is driving the download, i.e., the client side
-        progress_parts = parts[13].split('=')
-        self.is_commander = (self.method == 'GET' and 'read' in progress_parts[0]) or \
-                            (self.method == 'PUT' and 'write' in progress_parts[0])
-        self.payload_bytes_status = int(progress_parts[1].split('/')[0])
-
-        self.unconsumed_parts = None if len(parts) < 16 else parts[15:]
-        self.elapsed_seconds = {}
-
-class TransferCompleteEvent(TransferStatusEvent):
-    def __init__(self, line):
-        super(TransferCompleteEvent, self).__init__(line)
-        self.is_complete = True
-
-        i = 0
-        elapsed_seconds = 0.0
-        # match up self.unconsumed_parts[0:11] with the events in the transfer_steps enum
-        for k in ['socket_create', 'socket_connect', 'proxy_init', 'proxy_choice', 'proxy_request',
-                  'proxy_response', 'command', 'response', 'first_byte', 'last_byte', 'checksum']:
-            # parse out the elapsed time value
-            keyval = self.unconsumed_parts[i]
-            i += 1
-
-            val = float(int(keyval.split('=')[1]))
-            if val >= 0.0:
-                elapsed_seconds = val / 1000000.0  # usecs to secs
-                self.elapsed_seconds.setdefault(k, elapsed_seconds)
-
-        self.unix_ts_start = self.unix_ts_end - elapsed_seconds
-        del(self.unconsumed_parts)
-
-class TransferSuccessEvent(TransferCompleteEvent):
-    def __init__(self, line):
-        super(TransferSuccessEvent, self).__init__(line)
-        self.is_success = True
-
-class TransferErrorEvent(TransferCompleteEvent):
-    def __init__(self, line):
-        super(TransferErrorEvent, self).__init__(line)
-        self.is_error = True
-
-class Transfer(object):
-    def __init__(self, tid):
-        self.id = tid
-        self.last_event = None
-        self.payload_progress = {decile:None for decile in [0.0, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0]}
-        self.payload_bytes = {partial:None for partial in [10240, 20480, 51200, 102400, 204800, 512000, 1048576, 2097152, 5242880]}
-
-    def add_event(self, status_event):
-        progress_frac = float(status_event.payload_bytes_status) / float(status_event.filesize_bytes)
-        progress = float(status_event.payload_bytes_status)
-        for partial in sorted(self.payload_bytes.keys()):
-            if progress >= partial and self.payload_bytes[partial] is None:
-                self.payload_bytes[partial] = status_event.unix_ts_end
-        for decile in sorted(self.payload_progress.keys()):
-            if progress_frac >= decile and self.payload_progress[decile] is None:
-                self.payload_progress[decile] = status_event.unix_ts_end
-        self.last_event = status_event
-
-    def get_data(self):
-        e = self.last_event
-        if e is None or not e.is_complete:
-            return None
-        d = e.__dict__
-        if not e.is_error:
-            d['elapsed_seconds']['payload_progress'] = {decile: round(self.payload_progress[decile] - e.unix_ts_start, 6) for decile in self.payload_progress if self.payload_progress[decile] is not None}
-            d['elapsed_seconds']['payload_bytes'] = {partial: round(self.payload_bytes[partial] - e.unix_ts_start, 6) for partial in self.payload_bytes if self.payload_bytes[partial] is not None}
-        return d
-
 class Parser(object, metaclass=ABCMeta):
     @abstractmethod
     def parse(self, source, do_complete):
@@ -302,103 +132,6 @@ class Parser(object, metaclass=ABCMeta):
     def get_name(self):
         pass
 
-class TGenParser(Parser):
-
-    def __init__(self, date_filter=None):
-        ''' date_filter should be given in UTC '''
-        self.state = {}
-        self.transfers = {}
-        self.transfers_summary = {'time_to_first_byte':{}, 'time_to_last_byte':{}, 'errors':{}}
-        self.name = None
-        self.date_filter = date_filter
-
-    def __is_date_valid(self, date_to_check):
-        if self.date_filter is None:
-            # we are not asked to filter, so every date is valid
-            return True
-        else:
-            # we are asked to filter, so the line is only valid if the date matches the filter
-            # both the filter and the unix timestamp should be in UTC at this point
-            return util.do_dates_match(self.date_filter, date_to_check)
-
-    def __parse_line(self, line, do_complete):
-        if self.name is None and re.search("Initializing traffic generator on host", line) is not None:
-            self.name = line.strip().split()[11]
-
-        if self.date_filter is not None:
-            parts = line.split(' ', 3)
-            if len(parts) < 4: # the 3rd is the timestamp, the 4th is the rest of the line
-                return True
-            unix_ts = float(parts[2])
-            line_date = datetime.datetime.utcfromtimestamp(unix_ts).date()
-            if not self.__is_date_valid(line_date):
-                return True
-
-        if do_complete and re.search("state\sRESPONSE\sto\sstate\sPAYLOAD", line) is not None:
-            # another run of tgen starts the id over counting up from 1
-            # if a prev transfer with the same id did not complete, we can be sure it never will
-            parts = line.strip().split()
-            transfer_parts = parts[7].strip().split(',')
-            transfer_id = "{0}:{1}".format(transfer_parts[0], transfer_parts[1])  # id:count
-            if transfer_id in self.state:
-                self.state.pop(transfer_id)
-
-        elif do_complete and re.search("transfer-status", line) is not None:
-            status = TransferStatusEvent(line)
-            xfer = self.state.setdefault(status.transfer_id, Transfer(status.transfer_id))
-            xfer.add_event(status)
-
-        elif re.search("transfer-complete", line) is not None:
-            complete = TransferSuccessEvent(line)
-
-            if do_complete:
-                xfer = self.state.setdefault(complete.transfer_id, Transfer(complete.transfer_id))
-                xfer.add_event(complete)
-                self.transfers[xfer.id] = xfer.get_data()
-                self.state.pop(complete.transfer_id)
-
-            filesize, second = complete.filesize_bytes, int(complete.unix_ts_end)
-            fb_secs = complete.elapsed_seconds['first_byte'] - complete.elapsed_seconds['command']
-            lb_secs = complete.elapsed_seconds['last_byte'] - complete.elapsed_seconds['command']
-
-            fb_list = self.transfers_summary['time_to_first_byte'].setdefault(filesize, {}).setdefault(second, [])
-            fb_list.append(fb_secs)
-            lb_list = self.transfers_summary['time_to_last_byte'].setdefault(filesize, {}).setdefault(second, [])
-            lb_list.append(lb_secs)
-
-        elif re.search("transfer-error", line) is not None:
-            error = TransferErrorEvent(line)
-
-            if do_complete:
-                xfer = self.state.setdefault(error.transfer_id, Transfer(error.transfer_id))
-                xfer.add_event(error)
-                self.transfers[xfer.id] = xfer.get_data()
-                self.state.pop(error.transfer_id)
-
-            err_code, filesize, second = error.error_code, error.filesize_bytes, int(error.unix_ts_end)
-
-            err_list = self.transfers_summary['errors'].setdefault(err_code, {}).setdefault(second, [])
-            err_list.append(filesize)
-
-        return True
-
-    def parse(self, source, do_complete=False):
-        source.open()
-        for line in source:
-            # ignore line parsing errors
-            try:
-                if not self.__parse_line(line, do_complete):
-                    break
-            except:
-                logging.warning("TGenParser: skipping line due to parsing error: {0}".format(line))
-                continue
-        source.close()
-
-    def get_data(self):
-        return {'transfers':self.transfers, 'transfers_summary': self.transfers_summary}
-
-    def get_name(self):
-        return self.name
 
 class TorStream(object):
     def __init__(self, sid):





More information about the tor-commits mailing list