commit 9d0c80561b84905e72b41758dfcb7b712f5f407f
Author: Karsten Loesing <karsten.loesing(a)gmx.net>
Date: Mon Aug 31 11:30:53 2020 +0200
Change filter mode to filter Tor circuits.
This new filter mode removes Tor circuits that don't match the
provided fingerprints and leaves TGen transfers/streams untouched. At
the same time the visualize mode only includes TGen transfers/streams
with an existing mapping between TGen transfers/streams and Tor
streams/circuits.
This patch changes the default behavior of the visualize mode. The
original behavior of also visualizing TGen transfers/streams *without*
an existing mapping to Tor streams/circuits can be selected with the
--outer-join switch, although that is rather an edge case.
Another minor change is that the filtered analysis file is no longer
written with sort_keys=True, which would have produced a newly sorted
file with keys in alphabetical order rather than the original
insertion order. The result is an actually useful diff.
---
onionperf/analysis.py | 13 +++++-------
onionperf/filtering.py | 49 ++++++++--------------------------------------
onionperf/onionperf | 24 +++++++++++++++++------
onionperf/visualization.py | 33 ++++++++++++++++++++-----------
4 files changed, 53 insertions(+), 66 deletions(-)
diff --git a/onionperf/analysis.py b/onionperf/analysis.py
index b2f483f..49c109c 100644
--- a/onionperf/analysis.py
+++ b/onionperf/analysis.py
@@ -62,13 +62,7 @@ class OPAnalysis(Analysis):
self.json_db['data'][self.nickname]["tgen"].pop("stream_summary")
self.did_analysis = True
- def set_tgen_transfers(self, node, tgen_transfers):
- self.json_db['data'][node]['tgen']['transfers'] = tgen_transfers
-
- def set_tgen_streams(self, node, tgen_streams):
- self.json_db['data'][node]['tgen']['streams'] = tgen_streams
-
- def save(self, filename=None, output_prefix=os.getcwd(), do_compress=True, date_prefix=None):
+ def save(self, filename=None, output_prefix=os.getcwd(), do_compress=True, date_prefix=None, sort_keys=True):
if filename is None:
base_filename = "onionperf.analysis.json.xz"
if date_prefix is not None:
@@ -85,7 +79,7 @@ class OPAnalysis(Analysis):
logging.info("saving analysis results to {0}".format(filepath))
outf = util.FileWritable(filepath, do_compress=do_compress)
- json.dump(self.json_db, outf, sort_keys=True, separators=(',', ': '), indent=2)
+ json.dump(self.json_db, outf, sort_keys=sort_keys, separators=(',', ': '), indent=2)
outf.close()
logging.info("done!")
@@ -109,6 +103,9 @@ class OPAnalysis(Analysis):
except:
return None
+ def set_tor_circuits(self, node, tor_circuits):
+ self.json_db['data'][node]['tor']['circuits'] = tor_circuits
+
def get_tor_streams(self, node):
try:
return self.json_db['data'][node]['tor']['streams']
diff --git a/onionperf/filtering.py b/onionperf/filtering.py
index 9e7b34f..1b614d6 100644
--- a/onionperf/filtering.py
+++ b/onionperf/filtering.py
@@ -38,41 +38,11 @@ class Filtering(object):
if self.fingerprints_to_include is None and self.fingerprints_to_exclude is None:
return
for source in self.analysis.get_nodes():
- tor_streams_by_source_port = {}
- tor_streams = self.analysis.get_tor_streams(source)
- for tor_stream in tor_streams.values():
- if "source" in tor_stream and ":" in tor_stream["source"]:
- source_port = tor_stream["source"].split(":")[1]
- tor_streams_by_source_port.setdefault(source_port, []).append(tor_stream)
tor_circuits = self.analysis.get_tor_circuits(source)
- tgen_streams = self.analysis.get_tgen_streams(source)
- tgen_transfers = self.analysis.get_tgen_transfers(source)
- retained_tgen_streams = {}
- retained_tgen_transfers = {}
- while tgen_streams or tgen_transfers:
- stream_id = None
- transfer_id = None
- source_port = None
- unix_ts_end = None
+ filtered_circuit_ids = []
+ for circuit_id, tor_circuit in tor_circuits.items():
keep = False
- if tgen_streams:
- stream_id, stream_data = tgen_streams.popitem()
- if "local" in stream_data["transport_info"] and len(stream_data["transport_info"]["local"].split(":")) > 2:
- source_port = stream_data["transport_info"]["local"].split(":")[2]
- if "unix_ts_end" in stream_data:
- unix_ts_end = stream_data["unix_ts_end"]
- elif tgen_transfers:
- transfer_id, transfer_data = tgen_transfers.popitem()
- if "endpoint_local" in transfer_data and len(transfer_data["endpoint_local"].split(":")) > 2:
- source_port = transfer_data["endpoint_local"].split(":")[2]
- if "unix_ts_end" in transfer_data:
- unix_ts_end = transfer_data["unix_ts_end"]
- if source_port and unix_ts_end:
- for tor_stream in tor_streams_by_source_port[source_port]:
- if abs(unix_ts_end - tor_stream["unix_ts_end"]) < 150.0:
- circuit_id = tor_stream["circuit_id"]
- if circuit_id and str(circuit_id) in tor_circuits:
- tor_circuit = tor_circuits[circuit_id]
+ if "path" in tor_circuit:
path = tor_circuit["path"]
keep = True
for long_name, _ in path:
@@ -85,12 +55,9 @@ class Filtering(object):
if self.fingerprints_to_exclude is not None and fingerprint in self.fingerprints_to_exclude:
keep = False
break
- if keep:
- if stream_id:
- retained_tgen_streams[stream_id] = stream_data
- if transfer_id:
- retained_tgen_transfers[transfer_id] = transfer_data
- self.analysis.set_tgen_streams(source, retained_tgen_streams)
- self.analysis.set_tgen_transfers(source, retained_tgen_transfers)
- self.analysis.save(filename=output_file, output_prefix=output_dir)
+ if not keep:
+ filtered_circuit_ids.append(circuit_id)
+ for circuit_id in filtered_circuit_ids:
+ del(tor_circuits[circuit_id])
+ self.analysis.save(filename=output_file, output_prefix=output_dir, sort_keys=False)
diff --git a/onionperf/onionperf b/onionperf/onionperf
index e3f49c8..1efa8cb 100755
--- a/onionperf/onionperf
+++ b/onionperf/onionperf
@@ -79,8 +79,13 @@ DESC_FILTER = """
Takes an OnionPerf analysis results file or directory as input, applies filters,
and produces new OnionPerf analysis results file(s) as output.
-This subcommand only filters measurements in `data/[source]/tgen/transfers`
-and `data/[source]/tgen/streams`, but leaves any summaries unchanged.
+The `filter` subcommand is typically used in combination with the `visualize`
+subcommand. The work flow is to filter out any TGen transfers/streams or Tor
+streams/circuits that are not supposed to be visualized and then visualize only
+those measurements with an existing mapping between TGen transfers/streams and
+Tor streams/circuits.
+
+This subcommand only filters individual objects and leaves summaries unchanged.
"""
HELP_FILTER = """
Filter OnionPerf analysis results
@@ -304,15 +309,15 @@ files generated by this script will be written""",
action="store", dest="input")
filter_parser.add_argument('--include-fingerprints',
- help="""include only measurements with known circuit path and with all
+ help="""include only Tor circuits with known circuit path and with all
relays being contained in the fingerprints file located at
PATH""",
metavar="PATH", action="store", dest="include_fingerprints",
default=None)
filter_parser.add_argument('--exclude-fingerprints',
- help="""exclude measurements without known circuit path or with any
- relays being contained in the fingerprints file located at
+ help="""exclude Tor circuits without known circuit path or with any
+ relay being contained in the fingerprints file located at
PATH""",
metavar="PATH", action="store", dest="exclude_fingerprints",
default=None)
@@ -337,6 +342,13 @@ files generated by this script will be written""",
required="True",
action=PathStringArgsAction, dest="datasets")
+ visualize_parser.add_argument('--outer-join',
+ help="""Include measurements without an existing mapping between TGen
+ transfers/streams and Tor streams/circuits, which is the
+ equivalent of an outer join in the database sense""",
+ action="store_true", dest="outer_join",
+ default=False)
+
visualize_parser.add_argument('-p', '--prefix',
help="a STRING filename prefix for graphs we generate",
metavar="STRING", type=str,
@@ -477,7 +489,7 @@ def visualize(args):
if analysis is not None:
analyses.append(analysis)
tgen_viz.add_dataset(analyses, label)
- tgen_viz.plot_all(args.prefix)
+ tgen_viz.plot_all(args.prefix, outer_join=args.outer_join)
def type_nonnegative_integer(value):
i = int(value)
diff --git a/onionperf/visualization.py b/onionperf/visualization.py
index 660f52e..0f69879 100644
--- a/onionperf/visualization.py
+++ b/onionperf/visualization.py
@@ -31,11 +31,11 @@ class Visualization(object, metaclass=ABCMeta):
class TGenVisualization(Visualization):
- def plot_all(self, output_prefix):
+ def plot_all(self, output_prefix, outer_join=False):
if len(self.datasets) > 0:
prefix = output_prefix + '.' if output_prefix is not None else ''
ts = time.strftime("%Y-%m-%d_%H:%M:%S")
- self.__extract_data_frame()
+ self.__extract_data_frame(outer_join)
self.data.to_csv("{0}onionperf.viz.{1}.csv".format(prefix, ts))
sns.set_context("paper")
self.page = PdfPages("{0}onionperf.viz.{1}.pdf".format(prefix, ts))
@@ -51,7 +51,7 @@ class TGenVisualization(Visualization):
self.__plot_errors_time()
self.page.close()
- def __extract_data_frame(self):
+ def __extract_data_frame(self, outer_join=False):
streams = []
for (analyses, label) in self.datasets:
for analysis in analyses:
@@ -62,6 +62,7 @@ class TGenVisualization(Visualization):
if "source" in tor_stream and ":" in tor_stream["source"]:
source_port = tor_stream["source"].split(":")[1]
tor_streams_by_source_port.setdefault(source_port, []).append(tor_stream)
+ tor_circuits = analysis.get_tor_circuits(client)
tgen_streams = analysis.get_tgen_streams(client)
tgen_transfers = analysis.get_tgen_transfers(client)
while tgen_streams or tgen_transfers:
@@ -122,20 +123,30 @@ class TGenVisualization(Visualization):
unix_ts_end = transfer_data["unix_ts_end"]
if "unix_ts_start" in transfer_data:
stream["start"] = datetime.datetime.utcfromtimestamp(transfer_data["unix_ts_start"])
+ tor_stream = None
+ tor_circuit = None
+ if source_port and unix_ts_end:
+ for s in tor_streams_by_source_port[source_port]:
+ if abs(unix_ts_end - s["unix_ts_end"]) < 150.0:
+ tor_stream = s
+ break
+ if tor_stream and "circuit_id" in tor_stream:
+ circuit_id = tor_stream["circuit_id"]
+ if str(circuit_id) in tor_circuits:
+ tor_circuit = tor_circuits[circuit_id]
if error_code:
if error_code == "PROXY":
error_code_parts = ["TOR"]
else:
error_code_parts = ["TGEN", error_code]
- if source_port and unix_ts_end:
- for tor_stream in tor_streams_by_source_port[source_port]:
- if abs(unix_ts_end - tor_stream["unix_ts_end"]) < 150.0:
- if "failure_reason_local" in tor_stream:
- error_code_parts.append(tor_stream["failure_reason_local"])
- if "failure_reason_remote" in tor_stream:
- error_code_parts.append(tor_stream["failure_reason_remote"])
+ if tor_stream:
+ if "failure_reason_local" in tor_stream:
+ error_code_parts.append(tor_stream["failure_reason_local"])
+ if "failure_reason_remote" in tor_stream:
+ error_code_parts.append(tor_stream["failure_reason_remote"])
stream["error_code"] = "/".join(error_code_parts)
- streams.append(stream)
+ if tor_circuit or outer_join:
+ streams.append(stream)
self.data = pd.DataFrame.from_records(streams, index="id")
def __plot_firstbyte_ecdf(self):