commit 8f92a27705e78ec6751b6974f6f31d772137b026
Author: Damian Johnson <atagar(a)torproject.org>
Date: Wed Feb 8 20:24:10 2012 -0800
Putting all locks under a 'with' clause
At first I was dubious of the usefulness of 'with' keyword gsathya showed me.
However, now that I've discovered that locking can be done under it I take that
all back - it's a wonderful, wonderful thing and I don't know how I got by with
manual locking/releasing before.
... and then they ate Sir Robin's minstrels and there was much rejoicing.
---
stem/socket.py | 113 +++++++++++++++-----------------------
stem/util/conf.py | 127 +++++++++++++++++++------------------------
test/runner.py | 155 ++++++++++++++++++++++++-----------------------------
3 files changed, 171 insertions(+), 224 deletions(-)
diff --git a/stem/socket.py b/stem/socket.py
index 5ae4418..9379884 100644
--- a/stem/socket.py
+++ b/stem/socket.py
@@ -110,18 +110,15 @@ class ControlSocket:
stem.socket.SocketClosed if the socket is known to be shut down
"""
- self._send_lock.acquire()
-
- try:
- if not self.is_alive(): raise SocketClosed()
- send_message(self._socket_file, message, raw)
- except SocketClosed, exc:
- # if send_message raises a SocketClosed then we should properly shut
- # everything down
- if self.is_alive(): self.close()
- raise exc
- finally:
- self._send_lock.release()
+ with self._send_lock:
+ try:
+ if not self.is_alive(): raise SocketClosed()
+ send_message(self._socket_file, message, raw)
+ except SocketClosed, exc:
+ # if send_message raises a SocketClosed then we should properly shut
+ # everything down
+ if self.is_alive(): self.close()
+ raise exc
def recv(self):
"""
@@ -137,18 +134,15 @@ class ControlSocket:
complete message
"""
- self._recv_lock.acquire()
-
- try:
- if not self.is_alive(): raise SocketClosed()
- return recv_message(self._socket_file)
- except SocketClosed, exc:
- # if recv_message raises a SocketClosed then we should properly shut
- # everything down
- if self.is_alive(): self.close()
- raise exc
- finally:
- self._recv_lock.release()
+ with self._recv_lock:
+ try:
+ if not self.is_alive(): raise SocketClosed()
+ return recv_message(self._socket_file)
+ except SocketClosed, exc:
+ # if recv_message raises a SocketClosed then we should properly shut
+ # everything down
+ if self.is_alive(): self.close()
+ raise exc
def is_alive(self):
"""
@@ -178,54 +172,41 @@ class ControlSocket:
stem.socket.SocketError if unable to make a socket
"""
- # we need both locks for this
- self._send_lock.acquire()
- self._recv_lock.acquire()
-
- # close the socket if we're currently attached to one
- if self.is_alive(): self.close()
-
- try:
+ with self._send_lock, self._recv_lock:
+ # close the socket if we're currently attached to one
+ if self.is_alive(): self.close()
+
self._socket = self._make_socket()
self._socket_file = self._socket.makefile()
self._is_alive = True
- finally:
- self._send_lock.release()
- self._recv_lock.release()
def close(self):
"""
Shuts down the socket. If it's already closed then this is a no-op.
"""
- # we need both locks for this
- self._send_lock.acquire()
- self._recv_lock.acquire()
-
- if self._socket:
- # if we haven't yet established a connection then this raises an error
- # socket.error: [Errno 107] Transport endpoint is not connected
- try: self._socket.shutdown(socket.SHUT_RDWR)
- except socket.error: pass
+ with self._send_lock, self._recv_lock:
+ if self._socket:
+ # if we haven't yet established a connection then this raises an error
+ # socket.error: [Errno 107] Transport endpoint is not connected
+ try: self._socket.shutdown(socket.SHUT_RDWR)
+ except socket.error: pass
+
+ # Suppressing unexpected exceptions from close. For instance, if the
+ # socket's file has already been closed then with python 2.7 that raises
+ # with...
+ # error: [Errno 32] Broken pipe
+
+ try: self._socket.close()
+ except: pass
- # Suppressing unexpected exceptions from close. For instance, if the
- # socket's file has already been closed then with python 2.7 that raises
- # with...
- # error: [Errno 32] Broken pipe
+ if self._socket_file:
+ try: self._socket_file.close()
+ except: pass
- try: self._socket.close()
- except: pass
-
- if self._socket_file:
- try: self._socket_file.close()
- except: pass
-
- self._socket = None
- self._socket_file = None
- self._is_alive = False
-
- self._send_lock.release()
- self._recv_lock.release()
+ self._socket = None
+ self._socket_file = None
+ self._is_alive = False
def __enter__(self):
return self
@@ -545,13 +526,10 @@ class ControlLine(str):
IndexError if we don't have any remaining content left to parse
"""
- try:
- self._remainder_lock.acquire()
+ with self._remainder_lock:
next_entry, remainder = _parse_entry(self._remainder, quoted, escaped)
self._remainder = remainder
return next_entry
- finally:
- self._remainder_lock.release()
def pop_mapping(self, quoted = False, escaped = False):
"""
@@ -570,8 +548,7 @@ class ControlLine(str):
the value being quoted
"""
- try:
- self._remainder_lock.acquire()
+ with self._remainder_lock:
if self.is_empty(): raise IndexError("no remaining content to parse")
key_match = KEY_ARG.match(self._remainder)
@@ -585,8 +562,6 @@ class ControlLine(str):
next_entry, remainder = _parse_entry(remainder, quoted, escaped)
self._remainder = remainder
return (key, next_entry)
- finally:
- self._remainder_lock.release()
def _parse_entry(line, quoted, escaped):
"""
diff --git a/stem/util/conf.py b/stem/util/conf.py
index 7d086f4..05e98f8 100644
--- a/stem/util/conf.py
+++ b/stem/util/conf.py
@@ -30,7 +30,7 @@ three things...
There are many ways of using the Config class but the most common ones are...
-- Call config_dict to get a dictionary that's always synced with with a Config.
+- Call config_dict to get a dictionary that's always synced with a Config.
- Make a dictionary and call synchronize() to bring it into sync with the
Config. This does not keep it in sync as the Config changes. See the Config
@@ -216,43 +216,41 @@ class Config():
with open(self._path, "r") as config_file:
read_contents = config_file.readlines()
- self._contents_lock.acquire()
- self._raw_contents = read_contents
- remainder = list(self._raw_contents)
-
- while remainder:
- line = remainder.pop(0)
-
- # strips any commenting or excess whitespace
- comment_start = line.find("#")
- if comment_start != -1: line = line[:comment_start]
- line = line.strip()
+ with self._contents_lock:
+ self._raw_contents = read_contents
+ remainder = list(self._raw_contents)
- # parse the key/value pair
- if line:
- try:
- key, value = line.split(" ", 1)
- value = value.strip()
- except ValueError:
- log.debug("Config entry '%s' is expected to be of the format 'Key Value', defaulting to '%s' -> ''" % (line, line))
- key, value = line, ""
+ while remainder:
+ line = remainder.pop(0)
+
+ # strips any commenting or excess whitespace
+ comment_start = line.find("#")
+ if comment_start != -1: line = line[:comment_start]
+ line = line.strip()
- if not value:
- # this might be a multi-line entry, try processing it as such
- multiline_buffer = []
+ # parse the key/value pair
+ if line:
+ try:
+ key, value = line.split(" ", 1)
+ value = value.strip()
+ except ValueError:
+ log.debug("Config entry '%s' is expected to be of the format 'Key Value', defaulting to '%s' -> ''" % (line, line))
+ key, value = line, ""
- while remainder and remainder[0].lstrip().startswith("|"):
- content = remainder.pop(0).lstrip()[1:] # removes '\s+|' prefix
- content = content.rstrip("\n") # trailing newline
- multiline_buffer.append(content)
+ if not value:
+ # this might be a multi-line entry, try processing it as such
+ multiline_buffer = []
+
+ while remainder and remainder[0].lstrip().startswith("|"):
+ content = remainder.pop(0).lstrip()[1:] # removes '\s+|' prefix
+ content = content.rstrip("\n") # trailing newline
+ multiline_buffer.append(content)
+
+ if multiline_buffer:
+ self.set(key, "\n".join(multiline_buffer), False)
+ continue
- if multiline_buffer:
- self.set(key, "\n".join(multiline_buffer), False)
- continue
-
- self.set(key, value, False)
-
- self._contents_lock.release()
+ self.set(key, value, False)
def save(self, path = None):
"""
@@ -272,17 +270,13 @@ class Config():
elif not self._path:
raise ValueError("Unable to save configuration: no path provided")
- self._contents_lock.acquire()
-
- with open(self._path, 'w') as output_file:
+ with self._contents_lock, open(self._path, 'w') as output_file:
for entry_key in sorted(self.keys()):
for entry_value in self.get_value(entry_key, multiple = True):
# check for multi line entries
if "\n" in entry_value: entry_value = "\n|" + entry_value.replace("\n", "\n|")
output_file.write('%s %s\n' % (entry_key, entry_value))
-
- self._contents_lock.release()
def clear(self):
"""
@@ -290,11 +284,10 @@ class Config():
state.
"""
- self._contents_lock.acquire()
- self._contents.clear()
- self._raw_contents = []
- self._requested_keys = set()
- self._contents_lock.release()
+ with self._contents_lock:
+ self._contents.clear()
+ self._raw_contents = []
+ self._requested_keys = set()
def synchronize(self, conf_mappings, limits = None):
"""
@@ -344,14 +337,12 @@ class Config():
backfill (bool) - calls the function with our current values if true
"""
- self._contents_lock.acquire()
- self._listeners.append(listener)
-
- if backfill:
- for key in self.keys():
- listener(self, key)
-
- self._contents_lock.release()
+ with self._contents_lock:
+ self._listeners.append(listener)
+
+ if backfill:
+ for key in self.keys():
+ listener(self, key)
def clear_listeners(self):
"""
@@ -393,9 +384,7 @@ class Config():
the values are appended
"""
- try:
- self._contents_lock.acquire()
-
+ with self._contents_lock:
if isinstance(value, str):
if not overwrite and key in self._contents: self._contents[key].append(value)
else: self._contents[key] = [value]
@@ -409,8 +398,6 @@ class Config():
for listener in self._listeners: listener(self, key)
else:
raise ValueError("Config.set() only accepts str, list, or tuple. Provided value was a '%s'" % type(value))
- finally:
- self._contents_lock.release()
def get(self, key, default = None):
"""
@@ -498,20 +485,18 @@ class Config():
key, providing the default if no such key exists
"""
- self._contents_lock.acquire()
-
- if key in self._contents:
- val = self._contents[key]
- if not multiple: val = val[-1]
- self._requested_keys.add(key)
- else:
- message_id = "stem.util.conf.missing_config_key_%s" % key
- log.log_once(message_id, log.TRACE, "config entry '%s' not found, defaulting to '%s'" % (key, default))
- val = default
-
- self._contents_lock.release()
-
- return val
+ with self._contents_lock:
+ if key in self._contents:
+ self._requested_keys.add(key)
+
+ if multiple:
+ return self._contents[key]
+ else:
+ return self._contents[key][-1]
+ else:
+ message_id = "stem.util.conf.missing_config_key_%s" % key
+ log.log_once(message_id, log.TRACE, "config entry '%s' not found, defaulting to '%s'" % (key, default))
+ return default
def get_str_csv(self, key, default = None, count = None, sub_key = None):
"""
diff --git a/test/runner.py b/test/runner.py
index 07a54fd..cf596d8 100644
--- a/test/runner.py
+++ b/test/runner.py
@@ -160,79 +160,75 @@ class Runner:
OSError if unable to run test preparations or start tor
"""
- self._runner_lock.acquire()
-
- # if we're holding on to a tor process (running or not) then clean up after
- # it so we can start a fresh instance
- if self._tor_process: self.stop()
-
- test.output.print_line("Setting up a test instance...", *STATUS_ATTR)
-
- # if 'test_directory' is unset then we make a new data directory in /tmp
- # and clean it up when we're done
-
- config_test_dir = CONFIG["integ.test_directory"]
-
- if config_test_dir:
- self._test_dir = stem.util.system.expand_path(config_test_dir, STEM_BASE)
- else:
- self._test_dir = tempfile.mktemp("-stem-integ")
-
- original_cwd, data_dir_path = os.getcwd(), self._test_dir
-
- if CONFIG["test.target.relative_data_dir"]:
- tor_cwd = os.path.dirname(self._test_dir)
- if not os.path.exists(tor_cwd): os.makedirs(tor_cwd)
+ with self._runner_lock:
+ # if we're holding on to a tor process (running or not) then clean up after
+ # it so we can start a fresh instance
+ if self._tor_process: self.stop()
- os.chdir(tor_cwd)
- data_dir_path = "./%s" % os.path.basename(self._test_dir)
-
- self._tor_cmd = tor_cmd
- self._custom_opts = extra_torrc_opts
- self._torrc_contents = BASE_TORRC % data_dir_path
-
- if extra_torrc_opts:
- self._torrc_contents += "\n".join(extra_torrc_opts) + "\n"
-
- try:
- self._tor_cwd = os.getcwd()
- self._run_setup()
- self._start_tor(tor_cmd)
+ test.output.print_line("Setting up a test instance...", *STATUS_ATTR)
+
+ # if 'test_directory' is unset then we make a new data directory in /tmp
+ # and clean it up when we're done
+
+ config_test_dir = CONFIG["integ.test_directory"]
+
+ if config_test_dir:
+ self._test_dir = stem.util.system.expand_path(config_test_dir, STEM_BASE)
+ else:
+ self._test_dir = tempfile.mktemp("-stem-integ")
+
+ original_cwd, data_dir_path = os.getcwd(), self._test_dir
- # revert our cwd back to normal
if CONFIG["test.target.relative_data_dir"]:
- os.chdir(original_cwd)
- except OSError, exc:
- self.stop()
- raise exc
- finally:
- self._runner_lock.release()
+ tor_cwd = os.path.dirname(self._test_dir)
+ if not os.path.exists(tor_cwd): os.makedirs(tor_cwd)
+
+ os.chdir(tor_cwd)
+ data_dir_path = "./%s" % os.path.basename(self._test_dir)
+
+ self._tor_cmd = tor_cmd
+ self._custom_opts = extra_torrc_opts
+ self._torrc_contents = BASE_TORRC % data_dir_path
+
+ if extra_torrc_opts:
+ self._torrc_contents += "\n".join(extra_torrc_opts) + "\n"
+
+ try:
+ self._tor_cwd = os.getcwd()
+ self._run_setup()
+ self._start_tor(tor_cmd)
+
+ # revert our cwd back to normal
+ if CONFIG["test.target.relative_data_dir"]:
+ os.chdir(original_cwd)
+ except OSError, exc:
+ self.stop()
+ raise exc
def stop(self):
"""
Stops our tor test instance and cleans up any temporary resources.
"""
- self._runner_lock.acquire()
- test.output.print_noline("Shutting down tor... ", *STATUS_ATTR)
-
- if self._tor_process:
- self._tor_process.kill()
- self._tor_process.communicate() # blocks until the process is done
-
- # if we've made a temporary data directory then clean it up
- if self._test_dir and CONFIG["integ.test_directory"] == "":
- shutil.rmtree(self._test_dir, ignore_errors = True)
-
- self._test_dir = ""
- self._tor_cmd = None
- self._tor_cwd = ""
- self._torrc_contents = ""
- self._custom_opts = None
- self._tor_process = None
-
- test.output.print_line("done", *STATUS_ATTR)
- self._runner_lock.release()
+ with self._runner_lock:
+ test.output.print_noline("Shutting down tor... ", *STATUS_ATTR)
+
+ if self._tor_process:
+ self._tor_process.kill()
+ self._tor_process.communicate() # blocks until the process is done
+
+ # if we've made a temporary data directory then clean it up
+ if self._test_dir and CONFIG["integ.test_directory"] == "":
+ shutil.rmtree(self._test_dir, ignore_errors = True)
+
+ self._test_dir = ""
+ self._tor_cmd = None
+ self._tor_cwd = ""
+ self._torrc_contents = ""
+ self._custom_opts = None
+ self._tor_process = None
+
+ test.output.print_line("done", *STATUS_ATTR)
def is_running(self):
"""
@@ -242,21 +238,16 @@ class Runner:
True if we have a running tor test instance, False otherwise
"""
- # subprocess.Popen.poll() checks the return code, returning None if it's
- # still going
-
- self._runner_lock.acquire()
- is_running = self._tor_process and self._tor_process.poll() == None
-
- # If the tor process closed unexpectedly then this is probably the first
- # place that we're realizing it. Clean up the temporary resources now since
- # we might not end up calling stop() as normal.
-
- if not is_running: self.stop(True)
-
- self._runner_lock.release()
-
- return is_running
+ with self._runner_lock:
+ # Check for an unexpected shutdown by calling subprocess.Popen.poll(),
+ # which returns the exit code or None if we're still running.
+
+ if self._tor_process and self._tor_process.poll() != None:
+ # clean up the temporary resources and note the unexpected shutdown
+ self.stop()
+ test.output.print_line("tor shut down unexpectedly", *ERROR_ATTR)
+
+ return bool(self._tor_process)
def is_accessible(self):
"""
@@ -439,14 +430,10 @@ class Runner:
RunnerStopped if we aren't running
"""
- try:
- self._runner_lock.acquire()
-
+ with self._runner_lock:
if self.is_running():
return self.__dict__[attr]
else: raise RunnerStopped()
- finally:
- self._runner_lock.release()
def _run_setup(self):
"""