commit 8f92a27705e78ec6751b6974f6f31d772137b026 Author: Damian Johnson atagar@torproject.org Date: Wed Feb 8 20:24:10 2012 -0800
Putting all locks under a 'with' clause
At first I was dubious of the usefulness of 'with' keyword gsathya showed me. However, now that I've discovered that locking can be done under it I take that all back - it's a wonderful, wonderful thing and I don't know how I got by with manual locking/releasing before.
... and then they ate Sir Robin's minstrels and there was much rejoicing. --- stem/socket.py | 113 +++++++++++++++----------------------- stem/util/conf.py | 127 +++++++++++++++++++------------------------ test/runner.py | 155 ++++++++++++++++++++++++----------------------------- 3 files changed, 171 insertions(+), 224 deletions(-)
diff --git a/stem/socket.py b/stem/socket.py index 5ae4418..9379884 100644 --- a/stem/socket.py +++ b/stem/socket.py @@ -110,18 +110,15 @@ class ControlSocket: stem.socket.SocketClosed if the socket is known to be shut down """
- self._send_lock.acquire() - - try: - if not self.is_alive(): raise SocketClosed() - send_message(self._socket_file, message, raw) - except SocketClosed, exc: - # if send_message raises a SocketClosed then we should properly shut - # everything down - if self.is_alive(): self.close() - raise exc - finally: - self._send_lock.release() + with self._send_lock: + try: + if not self.is_alive(): raise SocketClosed() + send_message(self._socket_file, message, raw) + except SocketClosed, exc: + # if send_message raises a SocketClosed then we should properly shut + # everything down + if self.is_alive(): self.close() + raise exc
def recv(self): """ @@ -137,18 +134,15 @@ class ControlSocket: complete message """
- self._recv_lock.acquire() - - try: - if not self.is_alive(): raise SocketClosed() - return recv_message(self._socket_file) - except SocketClosed, exc: - # if recv_message raises a SocketClosed then we should properly shut - # everything down - if self.is_alive(): self.close() - raise exc - finally: - self._recv_lock.release() + with self._recv_lock: + try: + if not self.is_alive(): raise SocketClosed() + return recv_message(self._socket_file) + except SocketClosed, exc: + # if recv_message raises a SocketClosed then we should properly shut + # everything down + if self.is_alive(): self.close() + raise exc
def is_alive(self): """ @@ -178,54 +172,41 @@ class ControlSocket: stem.socket.SocketError if unable to make a socket """
- # we need both locks for this - self._send_lock.acquire() - self._recv_lock.acquire() - - # close the socket if we're currently attached to one - if self.is_alive(): self.close() - - try: + with self._send_lock, self._recv_lock: + # close the socket if we're currently attached to one + if self.is_alive(): self.close() + self._socket = self._make_socket() self._socket_file = self._socket.makefile() self._is_alive = True - finally: - self._send_lock.release() - self._recv_lock.release()
def close(self): """ Shuts down the socket. If it's already closed then this is a no-op. """
- # we need both locks for this - self._send_lock.acquire() - self._recv_lock.acquire() - - if self._socket: - # if we haven't yet established a connection then this raises an error - # socket.error: [Errno 107] Transport endpoint is not connected - try: self._socket.shutdown(socket.SHUT_RDWR) - except socket.error: pass + with self._send_lock, self._recv_lock: + if self._socket: + # if we haven't yet established a connection then this raises an error + # socket.error: [Errno 107] Transport endpoint is not connected + try: self._socket.shutdown(socket.SHUT_RDWR) + except socket.error: pass + + # Suppressing unexpected exceptions from close. For instance, if the + # socket's file has already been closed then with python 2.7 that raises + # with... + # error: [Errno 32] Broken pipe + + try: self._socket.close() + except: pass
- # Suppressing unexpected exceptions from close. For instance, if the - # socket's file has already been closed then with python 2.7 that raises - # with... - # error: [Errno 32] Broken pipe + if self._socket_file: + try: self._socket_file.close() + except: pass
- try: self._socket.close() - except: pass - - if self._socket_file: - try: self._socket_file.close() - except: pass - - self._socket = None - self._socket_file = None - self._is_alive = False - - self._send_lock.release() - self._recv_lock.release() + self._socket = None + self._socket_file = None + self._is_alive = False
def __enter__(self): return self @@ -545,13 +526,10 @@ class ControlLine(str): IndexError if we don't have any remaining content left to parse """
- try: - self._remainder_lock.acquire() + with self._remainder_lock: next_entry, remainder = _parse_entry(self._remainder, quoted, escaped) self._remainder = remainder return next_entry - finally: - self._remainder_lock.release()
def pop_mapping(self, quoted = False, escaped = False): """ @@ -570,8 +548,7 @@ class ControlLine(str): the value being quoted """
- try: - self._remainder_lock.acquire() + with self._remainder_lock: if self.is_empty(): raise IndexError("no remaining content to parse") key_match = KEY_ARG.match(self._remainder)
@@ -585,8 +562,6 @@ class ControlLine(str): next_entry, remainder = _parse_entry(remainder, quoted, escaped) self._remainder = remainder return (key, next_entry) - finally: - self._remainder_lock.release()
def _parse_entry(line, quoted, escaped): """ diff --git a/stem/util/conf.py b/stem/util/conf.py index 7d086f4..05e98f8 100644 --- a/stem/util/conf.py +++ b/stem/util/conf.py @@ -30,7 +30,7 @@ three things...
There are many ways of using the Config class but the most common ones are...
-- Call config_dict to get a dictionary that's always synced with with a Config. +- Call config_dict to get a dictionary that's always synced with a Config.
- Make a dictionary and call synchronize() to bring it into sync with the Config. This does not keep it in sync as the Config changes. See the Config @@ -216,43 +216,41 @@ class Config(): with open(self._path, "r") as config_file: read_contents = config_file.readlines()
- self._contents_lock.acquire() - self._raw_contents = read_contents - remainder = list(self._raw_contents) - - while remainder: - line = remainder.pop(0) - - # strips any commenting or excess whitespace - comment_start = line.find("#") - if comment_start != -1: line = line[:comment_start] - line = line.strip() + with self._contents_lock: + self._raw_contents = read_contents + remainder = list(self._raw_contents)
- # parse the key/value pair - if line: - try: - key, value = line.split(" ", 1) - value = value.strip() - except ValueError: - log.debug("Config entry '%s' is expected to be of the format 'Key Value', defaulting to '%s' -> ''" % (line, line)) - key, value = line, "" + while remainder: + line = remainder.pop(0) + + # strips any commenting or excess whitespace + comment_start = line.find("#") + if comment_start != -1: line = line[:comment_start] + line = line.strip()
- if not value: - # this might be a multi-line entry, try processing it as such - multiline_buffer = [] + # parse the key/value pair + if line: + try: + key, value = line.split(" ", 1) + value = value.strip() + except ValueError: + log.debug("Config entry '%s' is expected to be of the format 'Key Value', defaulting to '%s' -> ''" % (line, line)) + key, value = line, ""
- while remainder and remainder[0].lstrip().startswith("|"): - content = remainder.pop(0).lstrip()[1:] # removes '\s+|' prefix - content = content.rstrip("\n") # trailing newline - multiline_buffer.append(content) + if not value: + # this might be a multi-line entry, try processing it as such + multiline_buffer = [] + + while remainder and remainder[0].lstrip().startswith("|"): + content = remainder.pop(0).lstrip()[1:] # removes '\s+|' prefix + content = content.rstrip("\n") # trailing newline + multiline_buffer.append(content) + + if multiline_buffer: + self.set(key, "\n".join(multiline_buffer), False) + continue
- if multiline_buffer: - self.set(key, "\n".join(multiline_buffer), False) - continue - - self.set(key, value, False) - - self._contents_lock.release() + self.set(key, value, False)
def save(self, path = None): """ @@ -272,17 +270,13 @@ class Config(): elif not self._path: raise ValueError("Unable to save configuration: no path provided")
- self._contents_lock.acquire() - - with open(self._path, 'w') as output_file: + with self._contents_lock, open(self._path, 'w') as output_file: for entry_key in sorted(self.keys()): for entry_value in self.get_value(entry_key, multiple = True): # check for multi line entries if "\n" in entry_value: entry_value = "\n|" + entry_value.replace("\n", "\n|")
output_file.write('%s %s\n' % (entry_key, entry_value)) - - self._contents_lock.release()
def clear(self): """ @@ -290,11 +284,10 @@ class Config(): state. """
- self._contents_lock.acquire() - self._contents.clear() - self._raw_contents = [] - self._requested_keys = set() - self._contents_lock.release() + with self._contents_lock: + self._contents.clear() + self._raw_contents = [] + self._requested_keys = set()
def synchronize(self, conf_mappings, limits = None): """ @@ -344,14 +337,12 @@ class Config(): backfill (bool) - calls the function with our current values if true """
- self._contents_lock.acquire() - self._listeners.append(listener) - - if backfill: - for key in self.keys(): - listener(self, key) - - self._contents_lock.release() + with self._contents_lock: + self._listeners.append(listener) + + if backfill: + for key in self.keys(): + listener(self, key)
def clear_listeners(self): """ @@ -393,9 +384,7 @@ class Config(): the values are appended """
- try: - self._contents_lock.acquire() - + with self._contents_lock: if isinstance(value, str): if not overwrite and key in self._contents: self._contents[key].append(value) else: self._contents[key] = [value] @@ -409,8 +398,6 @@ class Config(): for listener in self._listeners: listener(self, key) else: raise ValueError("Config.set() only accepts str, list, or tuple. Provided value was a '%s'" % type(value)) - finally: - self._contents_lock.release()
def get(self, key, default = None): """ @@ -498,20 +485,18 @@ class Config(): key, providing the default if no such key exists """
- self._contents_lock.acquire() - - if key in self._contents: - val = self._contents[key] - if not multiple: val = val[-1] - self._requested_keys.add(key) - else: - message_id = "stem.util.conf.missing_config_key_%s" % key - log.log_once(message_id, log.TRACE, "config entry '%s' not found, defaulting to '%s'" % (key, default)) - val = default - - self._contents_lock.release() - - return val + with self._contents_lock: + if key in self._contents: + self._requested_keys.add(key) + + if multiple: + return self._contents[key] + else: + return self._contents[key][-1] + else: + message_id = "stem.util.conf.missing_config_key_%s" % key + log.log_once(message_id, log.TRACE, "config entry '%s' not found, defaulting to '%s'" % (key, default)) + return default
def get_str_csv(self, key, default = None, count = None, sub_key = None): """ diff --git a/test/runner.py b/test/runner.py index 07a54fd..cf596d8 100644 --- a/test/runner.py +++ b/test/runner.py @@ -160,79 +160,75 @@ class Runner: OSError if unable to run test preparations or start tor """
- self._runner_lock.acquire() - - # if we're holding on to a tor process (running or not) then clean up after - # it so we can start a fresh instance - if self._tor_process: self.stop() - - test.output.print_line("Setting up a test instance...", *STATUS_ATTR) - - # if 'test_directory' is unset then we make a new data directory in /tmp - # and clean it up when we're done - - config_test_dir = CONFIG["integ.test_directory"] - - if config_test_dir: - self._test_dir = stem.util.system.expand_path(config_test_dir, STEM_BASE) - else: - self._test_dir = tempfile.mktemp("-stem-integ") - - original_cwd, data_dir_path = os.getcwd(), self._test_dir - - if CONFIG["test.target.relative_data_dir"]: - tor_cwd = os.path.dirname(self._test_dir) - if not os.path.exists(tor_cwd): os.makedirs(tor_cwd) + with self._runner_lock: + # if we're holding on to a tor process (running or not) then clean up after + # it so we can start a fresh instance + if self._tor_process: self.stop()
- os.chdir(tor_cwd) - data_dir_path = "./%s" % os.path.basename(self._test_dir) - - self._tor_cmd = tor_cmd - self._custom_opts = extra_torrc_opts - self._torrc_contents = BASE_TORRC % data_dir_path - - if extra_torrc_opts: - self._torrc_contents += "\n".join(extra_torrc_opts) + "\n" - - try: - self._tor_cwd = os.getcwd() - self._run_setup() - self._start_tor(tor_cmd) + test.output.print_line("Setting up a test instance...", *STATUS_ATTR) + + # if 'test_directory' is unset then we make a new data directory in /tmp + # and clean it up when we're done + + config_test_dir = CONFIG["integ.test_directory"] + + if config_test_dir: + self._test_dir = stem.util.system.expand_path(config_test_dir, STEM_BASE) + else: + self._test_dir = tempfile.mktemp("-stem-integ") + + original_cwd, data_dir_path = os.getcwd(), self._test_dir
- # revert our cwd back to normal if CONFIG["test.target.relative_data_dir"]: - os.chdir(original_cwd) - except OSError, exc: - self.stop() - raise exc - finally: - self._runner_lock.release() + tor_cwd = os.path.dirname(self._test_dir) + if not os.path.exists(tor_cwd): os.makedirs(tor_cwd) + + os.chdir(tor_cwd) + data_dir_path = "./%s" % os.path.basename(self._test_dir) + + self._tor_cmd = tor_cmd + self._custom_opts = extra_torrc_opts + self._torrc_contents = BASE_TORRC % data_dir_path + + if extra_torrc_opts: + self._torrc_contents += "\n".join(extra_torrc_opts) + "\n" + + try: + self._tor_cwd = os.getcwd() + self._run_setup() + self._start_tor(tor_cmd) + + # revert our cwd back to normal + if CONFIG["test.target.relative_data_dir"]: + os.chdir(original_cwd) + except OSError, exc: + self.stop() + raise exc
def stop(self): """ Stops our tor test instance and cleans up any temporary resources. """
- self._runner_lock.acquire() - test.output.print_noline("Shutting down tor... ", *STATUS_ATTR) - - if self._tor_process: - self._tor_process.kill() - self._tor_process.communicate() # blocks until the process is done - - # if we've made a temporary data directory then clean it up - if self._test_dir and CONFIG["integ.test_directory"] == "": - shutil.rmtree(self._test_dir, ignore_errors = True) - - self._test_dir = "" - self._tor_cmd = None - self._tor_cwd = "" - self._torrc_contents = "" - self._custom_opts = None - self._tor_process = None - - test.output.print_line("done", *STATUS_ATTR) - self._runner_lock.release() + with self._runner_lock: + test.output.print_noline("Shutting down tor... ", *STATUS_ATTR) + + if self._tor_process: + self._tor_process.kill() + self._tor_process.communicate() # blocks until the process is done + + # if we've made a temporary data directory then clean it up + if self._test_dir and CONFIG["integ.test_directory"] == "": + shutil.rmtree(self._test_dir, ignore_errors = True) + + self._test_dir = "" + self._tor_cmd = None + self._tor_cwd = "" + self._torrc_contents = "" + self._custom_opts = None + self._tor_process = None + + test.output.print_line("done", *STATUS_ATTR)
def is_running(self): """ @@ -242,21 +238,16 @@ class Runner: True if we have a running tor test instance, False otherwise """
- # subprocess.Popen.poll() checks the return code, returning None if it's - # still going - - self._runner_lock.acquire() - is_running = self._tor_process and self._tor_process.poll() == None - - # If the tor process closed unexpectedly then this is probably the first - # place that we're realizing it. Clean up the temporary resources now since - # we might not end up calling stop() as normal. - - if not is_running: self.stop(True) - - self._runner_lock.release() - - return is_running + with self._runner_lock: + # Check for an unexpected shutdown by calling subprocess.Popen.poll(), + # which returns the exit code or None if we're still running. + + if self._tor_process and self._tor_process.poll() != None: + # clean up the temporary resources and note the unexpected shutdown + self.stop() + test.output.print_line("tor shut down unexpectedly", *ERROR_ATTR) + + return bool(self._tor_process)
def is_accessible(self): """ @@ -439,14 +430,10 @@ class Runner: RunnerStopped if we aren't running """
- try: - self._runner_lock.acquire() - + with self._runner_lock: if self.is_running(): return self.__dict__[attr] else: raise RunnerStopped() - finally: - self._runner_lock.release()
def _run_setup(self): """
tor-commits@lists.torproject.org