commit 7ef8c64833d41337d5c9cc5baaee2808092c9aad Author: Philipp Winter phw@nymity.ch Date: Fri Jun 26 10:00:29 2020 -0700
Make models more configurable.
This patch removes the --oneshot subcommand and replaces it with several new subcommands for OnionPerf's "measure" command:
--tgen-start-pause (Initial pause before file transfers.) --tgen-num-transfers (Number of file transfers.) --tgen-intertransfer-pause (Pause in between file transfers.) --tgen-transfer-size (Size of each file transfer.)
By default, OnionPerf continues to run in "continuous" mode. One can simulate oneshot mode by running onionperf with the following flags:
onionperf measure --tgen-num-transfers=1
In addition to the above subcommands, this patch improves the code base by 1) adding a TGenConf class to hold TGen's configuration and by 2) adding a TGenModelConf class to hold TGen's traffic model.
This fixes tpo/metrics/onionperf#33432. --- onionperf/measurement.py | 102 +++++++++++++++++++--------------- onionperf/model.py | 108 +++++++++++++++++++++++------------- onionperf/onionperf | 60 +++++++++++++++----- onionperf/tests/test_measurement.py | 12 ++-- 4 files changed, 175 insertions(+), 107 deletions(-)
diff --git a/onionperf/measurement.py b/onionperf/measurement.py index af1fa0d..e2d8d1c 100644 --- a/onionperf/measurement.py +++ b/onionperf/measurement.py @@ -15,6 +15,16 @@ from stem.control import Controller from stem.version import Version, Requirement, get_system_tor_version from stem import __version__ as stem_version
+class TGenConf(object): + """Represents a TGen configuration, for both client and server.""" + def __init__(self, listen_port=None, connect_ip=None, connect_port=None, tor_ctl_port=None, tor_socks_port=None): + self.listen_port = str(listen_port) + self.tor_ctl_port = tor_ctl_port + self.tor_socks_port = tor_socks_port + # TGen clients use connect_ip and connect_port. + self.connect_ip = connect_ip + self.connect_port = connect_port + # onionperf imports from . import analysis, monitor, model, util
@@ -173,12 +183,11 @@ def logrotate_thread_task(writables, tgen_writable, torctl_writable, docroot, ni
class Measurement(object):
- def __init__(self, tor_bin_path, tgen_bin_path, datadir_path, privatedir_path, nickname, oneshot, additional_client_conf=None, torclient_conf_file=None, torserver_conf_file=None, single_onion=False): + def __init__(self, tor_bin_path, tgen_bin_path, datadir_path, privatedir_path, nickname, additional_client_conf=None, torclient_conf_file=None, torserver_conf_file=None, single_onion=False): self.tor_bin_path = tor_bin_path self.tgen_bin_path = tgen_bin_path self.datadir_path = datadir_path self.privatedir_path = privatedir_path - self.oneshot = oneshot self.nickname = nickname self.threads = None self.done_event = None @@ -190,20 +199,30 @@ class Measurement(object): self.torserver_conf_file = torserver_conf_file self.single_onion = single_onion
- def run(self, do_onion=True, do_inet=True, client_tgen_listen_port=58888, client_tgen_connect_ip='0.0.0.0', client_tgen_connect_port=8080, client_tor_ctl_port=59050, client_tor_socks_port=59000, - server_tgen_listen_port=8080, server_tor_ctl_port=59051, server_tor_socks_port=59001): + def run(self, do_onion=True, do_inet=True, tgen_model=None, tgen_client_conf=None, tgen_server_conf=None): ''' - only `server_tgen_listen_port` are "public" and need to be opened on the firewall. - if `client_tgen_connect_port` != `server_tgen_listen_port`, then you should have installed a forwarding rule in the firewall. + only `tgen_server_conf.listen_port` are "public" and need to be opened on the firewall. + if `tgen_client_conf.connect_port` != `tgen_server_conf.listen_port`, then you should have installed a forwarding rule in the firewall. all ports need to be unique though, and unique among multiple onionperf instances.
here are some sane defaults: - client_tgen_listen_port=58888, client_tgen_connect_port=8080, client_tor_ctl_port=59050, client_tor_socks_port=59000, - server_tgen_listen_port=8080, server_tor_ctl_port=59051, server_tor_socks_port=59001 + tgen_client_conf.listen_port=58888, tgen_client_conf.connect_port=8080, tgen_client_conf.tor_ctl_port=59050, tgen_client_conf.tor_socks_port=59000, + tgen_server_conf.listen_port=8080, tgen_server_conf.tor_ctl_port=59051, tgen_server_conf.tor_socks_port=59001 ''' self.threads = [] self.done_event = threading.Event()
+ if tgen_client_conf is None: + tgen_client_conf = TGenConf(listen_port=58888, + connect_ip='0.0.0.0', + connect_port=8080, + tor_ctl_port=59050, + tor_socks_port=59000) + if tgen_server_conf is None: + tgen_server_conf = TGenConf(listen_port=8080, + tor_ctl_port=59051, + tor_socks_port=59001) + # if ctrl-c is pressed, shutdown child processes properly try: # make sure stem and Tor supports ephemeral HS (version >= 0.2.7.1-alpha) @@ -225,52 +244,53 @@ class Measurement(object): tgen_client_writable, torctl_client_writable = None, None
if do_onion or do_inet: - general_writables.append(self.__start_tgen_server(server_tgen_listen_port)) + tgen_model.port = tgen_server_conf.listen_port + general_writables.append(self.__start_tgen_server(tgen_model))
if do_onion: logging.info("Onion Service private keys will be placed in {0}".format(self.privatedir_path)) # one must not have an open socks port when running a single # onion service. see tor's man page for more information. if self.single_onion: - server_tor_socks_port = 0 - tor_writable, torctl_writable = self.__start_tor_server(server_tor_ctl_port, - server_tor_socks_port, - {client_tgen_connect_port:server_tgen_listen_port}) + tgen_server_conf.tor_socks_port = 0 + tor_writable, torctl_writable = self.__start_tor_server(tgen_server_conf.tor_ctl_port, + tgen_server_conf.tor_socks_port, + {tgen_client_conf.connect_port:tgen_server_conf.listen_port}) general_writables.append(tor_writable) general_writables.append(torctl_writable)
if do_onion or do_inet: - tor_writable, torctl_client_writable = self.__start_tor_client(client_tor_ctl_port, client_tor_socks_port) + tor_writable, torctl_client_writable = self.__start_tor_client(tgen_client_conf.tor_ctl_port, tgen_client_conf.tor_socks_port) general_writables.append(tor_writable)
server_urls = [] if do_onion and self.hs_v3_service_id is not None: - server_urls.append("{0}.onion:{1}".format(self.hs_v3_service_id, client_tgen_connect_port)) + server_urls.append("{0}.onion:{1}".format(self.hs_v3_service_id, tgen_client_conf.connect_port)) if do_inet: - connect_ip = client_tgen_connect_ip if client_tgen_connect_ip != '0.0.0.0' else util.get_ip_address() - server_urls.append("{0}:{1}".format(connect_ip, client_tgen_connect_port)) + connect_ip = tgen_client_conf.connect_ip if tgen_client_conf.connect_ip != '0.0.0.0' else util.get_ip_address() + server_urls.append("{0}:{1}".format(connect_ip, tgen_client_conf.connect_port)) + tgen_model.servers = server_urls
if do_onion or do_inet: assert len(server_urls) > 0
- tgen_client_writable = self.__start_tgen_client(server_urls, client_tgen_listen_port, client_tor_socks_port) + tgen_model.port = tgen_client_conf.listen_port + tgen_model.socks_port = tgen_client_conf.tor_socks_port + tgen_client_writable = self.__start_tgen_client(tgen_model)
self.__start_log_processors(general_writables, tgen_client_writable, torctl_client_writable)
logging.info("Bootstrapping finished, entering heartbeat loop") time.sleep(1) - if self.oneshot: - logging.info("Onionperf is running in Oneshot mode. It will download a 5M file and shut down gracefully...") while True: - # TODO add status update of some kind? maybe the number of files in the www directory? - # logging.info("Heartbeat: {0} downloads have completed successfully".format(self.__get_download_count(tgen_client_writable.filename))) - if self.oneshot: + if tgen_model.num_transfers: downloads = 0 while True: downloads = self.__get_download_count(tgen_client_writable.filename) - if downloads >= 1: - logging.info("Onionperf has downloaded a 5M file in oneshot mode, and will now shut down.") - break + time.sleep(1) + if downloads >= tgen_model.num_transfers: + logging.info("Onionperf has downloaded %d files and will now shut down." % tgen_model.num_transfers) + break else: continue break @@ -320,35 +340,25 @@ class Measurement(object): logrotate.start() self.threads.append(logrotate)
- def __start_tgen_client(self, server_urls, tgen_port, socks_port): - return self.__start_tgen("client", tgen_port, socks_port, server_urls) + def __start_tgen_client(self, tgen_model_conf): + return self.__start_tgen("client", tgen_model_conf)
- def __start_tgen_server(self, tgen_port): - return self.__start_tgen("server", tgen_port) + def __start_tgen_server(self, tgen_model_conf): + return self.__start_tgen("server", tgen_model_conf)
- def __start_tgen(self, name, tgen_port, socks_port=None, server_urls=None): - logging.info("Starting TGen {0} process on port {1}...".format(name, tgen_port)) + def __start_tgen(self, name, tgen_model_conf): + logging.info("Starting TGen {0} process on port {1}...".format(name, tgen_model_conf.port)) tgen_datadir = "{0}/tgen-{1}".format(self.datadir_path, name) if not os.path.exists(tgen_datadir): os.makedirs(tgen_datadir)
tgen_confpath = "{0}/tgen.graphml.xml".format(tgen_datadir) if os.path.exists(tgen_confpath): os.remove(tgen_confpath)
- if socks_port is None: - model.ListenModel(tgen_port="{0}".format(tgen_port)).dump_to_file(tgen_confpath) - logging.info("TGen server running at 0.0.0.0:{0}".format(tgen_port)) + if tgen_model_conf.socks_port is None: + model.ListenModel(tgen_port="{0}".format(tgen_model_conf.port)).dump_to_file(tgen_confpath) + logging.info("TGen server running at 0.0.0.0:{0}".format(tgen_model_conf.port)) else: - - tgen_model_args = { - "tgen_port": "{0}".format(tgen_port), - "tgen_servers": server_urls, - "socksproxy": "127.0.0.1:{0}".format(socks_port) - } - if self.oneshot: - tgen_model = model.OneshotModel(**tgen_model_args) - else: - tgen_model = model.TorperfModel(**tgen_model_args) - + tgen_model = model.TorperfModel(tgen_model_conf) tgen_model.dump_to_file(tgen_confpath)
tgen_logpath = "{0}/onionperf.tgen.log".format(tgen_datadir) diff --git a/onionperf/model.py b/onionperf/model.py index cb45f51..a4af2fc 100644 --- a/onionperf/model.py +++ b/onionperf/model.py @@ -41,6 +41,21 @@ class TGenLoadableModel(TGenModel): model_instance = cls(graph) return model_instance
+class TGenModelConf(object): + """Represents a TGen traffic model configuration.""" + def __init__(self, initial_pause=0, num_transfers=1, transfer_size="5 MiB", + continuous_transfers=False, inter_transfer_pause=5, port=None, servers=[], + socks_port=None): + self.initial_pause = initial_pause + self.num_transfers = num_transfers + self.transfer_size = transfer_size + self.continuous_transfers = continuous_transfers + self.inter_transfer_pause = inter_transfer_pause + self.port = port + self.servers = servers + self.socks_port = socks_port + + class GeneratableTGenModel(TGenModel, metaclass=ABCMeta):
@abstractmethod @@ -58,61 +73,74 @@ class ListenModel(GeneratableTGenModel): g.add_node("start", serverport=self.tgen_port, loglevel="info", heartbeat="1 minute") return g
+ class TorperfModel(GeneratableTGenModel):
- def __init__(self, tgen_port="8889", tgen_servers=["127.0.0.1:8888"], socksproxy=None): - self.tgen_port = tgen_port - self.tgen_servers = tgen_servers - self.socksproxy = socksproxy + def __init__(self, config): + self.config = config self.graph = self.generate()
def generate(self): - server_str = ','.join(self.tgen_servers) + server_str = ','.join(self.config.servers) g = DiGraph()
- if self.socksproxy is not None: - g.add_node("start", serverport=self.tgen_port, peers=server_str, loglevel="info", heartbeat="1 minute", socksproxy=self.socksproxy) + if self.config.socks_port is not None: + g.add_node("start", + serverport=self.config.port, + peers=server_str, + loglevel="info", + heartbeat="1 minute", + socksproxy="127.0.0.1:{0}".format(self.config.socks_port)) else: - g.add_node("start", serverport=self.tgen_port, peers=server_str, loglevel="info", heartbeat="1 minute") - g.add_node("pause", time="5 minutes") - g.add_node("stream5m", sendsize="0", recvsize="5 mib", timeout="270 seconds", stallout="0 seconds") + g.add_node("start", + serverport=self.config.port, + peers=server_str, + loglevel="info", + heartbeat="1 minute")
+ g.add_node("pause", time="%d seconds" % self.config.initial_pause) g.add_edge("start", "pause")
- # after the pause, we start another pause timer while *at the same time* choosing one of - # the file sizes and downloading it from one of the servers in the server pool - g.add_edge("pause", "pause") - - # these are chosen with weighted probability, change edge 'weight' attributes to adjust probability - g.add_edge("pause", "stream5m") - - return g - -class OneshotModel(GeneratableTGenModel): - - def __init__(self, tgen_port="8889", tgen_servers=["127.0.0.1:8888"], socksproxy=None): - self.tgen_port = tgen_port - self.tgen_servers = tgen_servers - self.socksproxy = socksproxy - self.graph = self.generate() - - def generate(self): - server_str = ','.join(self.tgen_servers) - g = DiGraph() - - if self.socksproxy is not None: - g.add_node("start", serverport=self.tgen_port, peers=server_str, loglevel="info", heartbeat="1 minute", socksproxy=self.socksproxy) - else: - g.add_node("start", serverport=self.tgen_port, peers=server_str, loglevel="info", heartbeat="1 minute") - g.add_node("stream5m", sendsize="0", recvsize="5 mib", timeout="270 seconds", stallout="0 seconds") - - g.add_edge("start", "stream5m") - g.add_edge("stream5m", "start") + # "One-shot mode," i.e., onionperf will stop after the given number of + # iterations. The idea is: + # start -> pause -> stream-1 -> pause-1 -> ... -> stream-n -> pause-n -> end + if self.config.num_transfers > 0: + for i in range(self.config.num_transfers): + g.add_node("stream-%d" % i, + sendsize="0", + recvsize=self.config.transfer_size, + timeout="15 seconds", + stallout="10 seconds") + g.add_node("pause-%d" % i, + time="%d seconds" % self.config.inter_transfer_pause) + + g.add_edge("stream-%d" % i, "pause-%d" % i) + if i > 0: + g.add_edge("pause-%d" % (i-1), "stream-%d" % i) + + g.add_node("end") + g.add_edge("pause", "stream-0") + g.add_edge("pause-%d" % (self.config.num_transfers - 1), "end") + + # Continuous mode, i.e., onionperf will not stop. The idea is: + # start -> pause -> stream -> pause + # ^ | + # +-------+ + elif self.config.continuous_transfers: + g.add_node("stream", + sendsize="0", + recvsize=self.config.transfer_size, + timeout="15 seconds", + stallout="10 seconds") + g.add_node("pause", + time="%d seconds" % self.config.inter_transfer_pause) + g.add_edge("pause", "stream") + g.add_edge("stream", "pause") + g.add_edge("pause", "stream")
return g
- def dump_example_tgen_torperf_model(domain_name, onion_name): # the server listens on 8888, the client uses Tor to come back directly, and using a hidden serv server = ListenModel(tgen_port="8888") diff --git a/onionperf/onionperf b/onionperf/onionperf index e8024ce..d95e691 100755 --- a/onionperf/onionperf +++ b/onionperf/onionperf @@ -154,11 +154,6 @@ def main(): action="store", dest="tgenpath", default=util.which("tgen"))
- measure_parser.add_argument('--oneshot', - help="""Enables oneshot mode, onionperf closes on successfully downloading a file""", - action="store_true", dest="oneshot", - default=False) - measure_parser.add_argument('--additional-client-conf', help="""Additional configuration lines for the Tor client, for example bridge lines""", metavar="CONFIG", type=str, @@ -195,6 +190,30 @@ def main(): action="store", dest="tgenconnectport", default=8080)
+ measure_parser.add_argument('--tgen-start-pause', + help="""the number of seconds TGen should wait before walking through its action graph""", + metavar="N", type=int, + action="store", dest="tgenstartpause", + default=5) + + measure_parser.add_argument('--tgen-intertransfer-pause', + help="""the number of seconds TGen should wait in between two transfers""", + metavar="N", type=int, + action="store", dest="tgenintertransferpause", + default=300) + + measure_parser.add_argument('--tgen-transfer-size', + help="""the size of the file transfer that TGen will perform (e.g., '5 MiB' or '10 KiB')""", + metavar="STRING", type=str, + action="store", dest="tgentransfersize", + default="5 MiB") + + measure_parser.add_argument('--tgen-num-transfers', + help="""the number of file transfers that TGen will perform""", + metavar="N", type=int, + action="store", dest="tgennumtransfers", + default=0) + onion_or_inet_only_group = measure_parser.add_mutually_exclusive_group()
onion_or_inet_only_group.add_argument('-o', '--onion-only', @@ -327,7 +346,8 @@ def monitor(args): writer.close()
def measure(args): - from onionperf.measurement import Measurement + from onionperf.measurement import Measurement, TGenConf + from onionperf.model import TGenModelConf
# check paths args.torpath = util.find_path(args.torpath, "tor") @@ -347,12 +367,27 @@ def measure(args): server_tor_ctl_port = util.get_random_free_port() server_tor_socks_port = util.get_random_free_port()
+ tgen_client_conf = TGenConf(listen_port=client_tgen_port, + connect_ip=client_connect_ip, + connect_port=client_connect_port, + tor_ctl_port=client_tor_ctl_port, + tor_socks_port=client_tor_socks_port) + + tgen_server_conf = TGenConf(listen_port=server_tgen_port, + tor_ctl_port=server_tor_ctl_port, + tor_socks_port=server_tor_socks_port) + + tgen_model = TGenModelConf(initial_pause=args.tgenstartpause, + transfer_size=args.tgentransfersize, + num_transfers=args.tgennumtransfers, + continuous_transfers=args.tgennumtransfers == 0, + inter_transfer_pause=args.tgenintertransferpause) + meas = Measurement(args.torpath, args.tgenpath, args.prefix, args.private_prefix, args.nickname, - args.oneshot, args.additional_client_conf, args.torclient_conf_file, args.torserver_conf_file, @@ -360,14 +395,9 @@ def measure(args):
meas.run(do_onion=not args.inet_only, do_inet=not args.onion_only, - client_tgen_listen_port=client_tgen_port, - client_tgen_connect_ip=client_connect_ip, - client_tgen_connect_port=client_connect_port, - client_tor_ctl_port=client_tor_ctl_port, - client_tor_socks_port=client_tor_socks_port, - server_tgen_listen_port=server_tgen_port, - server_tor_ctl_port=server_tor_ctl_port, - server_tor_socks_port=server_tor_socks_port) + tgen_model=tgen_model, + tgen_client_conf=tgen_client_conf, + tgen_server_conf=tgen_server_conf) else: logging.info("Please fix path errors to continue")
diff --git a/onionperf/tests/test_measurement.py b/onionperf/tests/test_measurement.py index e5010fa..6bca8ab 100644 --- a/onionperf/tests/test_measurement.py +++ b/onionperf/tests/test_measurement.py @@ -57,8 +57,8 @@ WarnUnsafeSocks 0\nSafeLogging 0\nMaxCircuitDirtiness 60 seconds\nDataDirectory known_config_server = "RunAsDaemon 0\nORPort 0\nDirPort 0\nControlPort 9001\nSocksPort 9050\nSocksListenAddress 127.0.0.1\nClientOnly 1\n\ WarnUnsafeSocks 0\nSafeLogging 0\nMaxCircuitDirtiness 60 seconds\nDataDirectory /tmp/\nDataDirectoryGroupReadable 1\nLog INFO stdout\nUseEntryGuards 0\n"
- meas = measurement.Measurement(None, None, None, None, None, None, - "UseBridges 1\n", None, None) + meas = measurement.Measurement(None, None, None, None, None, + "UseBridges 1\n", None, None, False) config_client = meas.create_tor_config(9001, 9050, "/tmp/", "client") config_server = meas.create_tor_config(9001, 9050, "/tmp/", "server") assert_equals(config_client, known_config) @@ -80,8 +80,8 @@ WarnUnsafeSocks 0\nSafeLogging 0\nMaxCircuitDirtiness 60 seconds\nDataDirectory known_config = "RunAsDaemon 0\nORPort 0\nDirPort 0\nControlPort 9001\nSocksPort 9050\nSocksListenAddress 127.0.0.1\nClientOnly 1\n\ WarnUnsafeSocks 0\nSafeLogging 0\nMaxCircuitDirtiness 60 seconds\nDataDirectory /tmp/\nDataDirectoryGroupReadable 1\nLog INFO stdout\nUseBridges 1\n"
- meas = measurement.Measurement(None, None, None, None, None, None, None, - absolute_data_path("config"), None) + meas = measurement.Measurement(None, None, None, None, None, None, + absolute_data_path("config"), None, False) config_client = meas.create_tor_config(9001, 9050, "/tmp/", "client") config_server = meas.create_tor_config(9001, 9050, "/tmp/", "server") assert_equals(config_client, known_config) @@ -103,8 +103,8 @@ WarnUnsafeSocks 0\nSafeLogging 0\nMaxCircuitDirtiness 60 seconds\nDataDirectory known_config = "RunAsDaemon 0\nORPort 0\nDirPort 0\nControlPort 9001\nSocksPort 9050\nSocksListenAddress 127.0.0.1\nClientOnly 1\n\ WarnUnsafeSocks 0\nSafeLogging 0\nMaxCircuitDirtiness 60 seconds\nDataDirectory /tmp/\nDataDirectoryGroupReadable 1\nLog INFO stdout\nUseEntryGuards 0\n"
- meas = measurement.Measurement(None, None, None, None, None, None, None, None, - absolute_data_path("config")) + meas = measurement.Measurement(None, None, None, None, None, None, None, + absolute_data_path("config"), False) config_client = meas.create_tor_config(9001, 9050, "/tmp/", "client") config_server = meas.create_tor_config(9001, 9050, "/tmp/", "server") assert_equals(config_client, known_config)