[tor-commits] [onionperf/develop] Let TGen client finish by itself in one-shot mode.

karsten at torproject.org karsten at torproject.org
Thu Aug 27 09:07:15 UTC 2020


commit 959cf3689106189001a83c7e58dc40e10497a081
Author: Philipp Winter <phw at nymity.ch>
Date:   Fri Aug 7 14:48:58 2020 -0700

    Let TGen client finish by itself in one-shot mode.
    
    We tell TGen client to finish on its own by passing the count option to
    the end node:
    https://github.com/shadow/tgen/blob/master/doc/TGen-Options.md#end-options
    
    This patch adds another argument to the function watchdog_thread_task(),
    no_relaunch, which instructs the function to not re-launch its process
    if it fails.
---
 onionperf/measurement.py | 45 +++++++++++++++++++++++----------------------
 onionperf/model.py       |  3 ++-
 2 files changed, 25 insertions(+), 23 deletions(-)

diff --git a/onionperf/measurement.py b/onionperf/measurement.py
index e2d8d1c..d699292 100644
--- a/onionperf/measurement.py
+++ b/onionperf/measurement.py
@@ -50,10 +50,11 @@ def readline_thread_task(instream, q):
     # wait for lines from stdout until the EOF
     for line in iter(instream.readline, b''): q.put(line)
 
-def watchdog_thread_task(cmd, cwd, writable, done_ev, send_stdin, ready_search_str, ready_ev):
+def watchdog_thread_task(cmd, cwd, writable, done_ev, send_stdin, ready_search_str, ready_ev, no_relaunch):
 
-    # launch or re-launch our sub process until we are told to stop
-    # if we fail too many times in too short of time, give up and exit
+    # launch or re-launch (or don't re-launch, if no_relaunch is set) our sub
+    # process until we are told to stop if we fail too many times in too short
+    # of time, give up and exit
     failure_times = []
     pause_time_seconds = 0
     while done_ev.is_set() is False:
@@ -105,6 +106,10 @@ def watchdog_thread_task(cmd, cwd, writable, done_ev, send_stdin, ready_search_s
             subp.wait()
         elif done_ev.is_set():
             logging.info("command '{}' finished as expected".format(cmd))
+        elif no_relaunch:
+            logging.info("command '{}' finished on its own".format(cmd))
+            # our command finished on its own. time to terminate.
+            done_ev.set()
         else:
             logging.warning("command '{}' finished before expected".format(cmd))
             now = time.time()
@@ -284,15 +289,9 @@ class Measurement(object):
                 time.sleep(1)
                 while True:
                     if tgen_model.num_transfers:
-                        downloads = 0
-                        while True:
-                            downloads = self.__get_download_count(tgen_client_writable.filename)
-                            time.sleep(1)
-                            if downloads >= tgen_model.num_transfers:
-                                logging.info("Onionperf has downloaded %d files and will now shut down." % tgen_model.num_transfers)
-                                break
-                        else:
-                            continue
+                        # This function blocks until our TGen client process
+                        # terminated on its own.
+                        self.__wait_for_tgen_client()
                         break
 
                     if self.__is_alive():
@@ -366,7 +365,10 @@ class Measurement(object):
         logging.info("Logging TGen {1} process output to {0}".format(tgen_logpath, name))
 
         tgen_cmd = "{0} {1}".format(self.tgen_bin_path, tgen_confpath)
-        tgen_args = (tgen_cmd, tgen_datadir, tgen_writable, self.done_event, None, None, None)
+        # If we're running in "one-shot mode", TGen client will terminate on
+        # its own and we don't need our watchdog to restart the process.
+        no_relaunch = (name == "client" and tgen_model_conf.num_transfers)
+        tgen_args = (tgen_cmd, tgen_datadir, tgen_writable, self.done_event, None, None, None, no_relaunch)
         tgen_watchdog = threading.Thread(target=watchdog_thread_task, name="tgen_{0}_watchdog".format(name), args=tgen_args)
         tgen_watchdog.start()
         self.threads.append(tgen_watchdog)
@@ -464,7 +466,7 @@ WarnUnsafeSocks 0\nSafeLogging 0\nMaxCircuitDirtiness 60 seconds\nDataDirectory
         tor_stdin_bytes = str_tools._to_bytes(tor_config)
         tor_ready_str = "Bootstrapped 100"
         tor_ready_ev = threading.Event()
-        tor_args = (tor_cmd, tor_datadir, tor_writable, self.done_event, tor_stdin_bytes, tor_ready_str, tor_ready_ev)
+        tor_args = (tor_cmd, tor_datadir, tor_writable, self.done_event, tor_stdin_bytes, tor_ready_str, tor_ready_ev, False)
         tor_watchdog = threading.Thread(target=watchdog_thread_task, name="tor_{0}_watchdog".format(name), args=tor_args)
         tor_watchdog.start()
         self.threads.append(tor_watchdog)
@@ -491,14 +493,13 @@ WarnUnsafeSocks 0\nSafeLogging 0\nMaxCircuitDirtiness 60 seconds\nDataDirectory
 
         return tor_writable, torctl_writable
 
-    def __get_download_count(self, tgen_logpath):
-        count = 0
-        if tgen_logpath is not None and os.path.exists(tgen_logpath):
-            with open(tgen_logpath, 'r') as fin:
-                for line in fin:
-                    if re.search("transfer-complete", line) is not None:
-                        count += 1
-        return count
+    def __wait_for_tgen_client(self):
+        logging.info("Waiting for TGen client to finish.")
+        for t in self.threads:
+            if t.getName() == "tgen_client_watchdog":
+                while t.is_alive():
+                    time.sleep(1)
+                logging.info("TGen client finished.")
 
     def __is_alive(self):
         all_alive = True
diff --git a/onionperf/model.py b/onionperf/model.py
index a4af2fc..bdd5a53 100644
--- a/onionperf/model.py
+++ b/onionperf/model.py
@@ -118,7 +118,8 @@ class TorperfModel(GeneratableTGenModel):
                 if i > 0:
                     g.add_edge("pause-%d" % (i-1), "stream-%d" % i)
 
-            g.add_node("end")
+            g.add_node("end",
+                       count=str(self.config.num_transfers))
             g.add_edge("pause", "stream-0")
             g.add_edge("pause-%d" % (self.config.num_transfers - 1), "end")
 





More information about the tor-commits mailing list