tor-commits
Threads by month
- ----- 2025 -----
- December
- November
- October
- September
- August
- July
- June
- May
- April
- March
- February
- January
- ----- 2024 -----
- December
- November
- October
- September
- August
- July
- June
- May
- April
- March
- February
- January
- ----- 2023 -----
- December
- November
- October
- September
- August
- July
- June
- May
- April
- March
- February
- January
- ----- 2022 -----
- December
- November
- October
- September
- August
- July
- June
- May
- April
- March
- February
- January
- ----- 2021 -----
- December
- November
- October
- September
- August
- July
- June
- May
- April
- March
- February
- January
- ----- 2020 -----
- December
- November
- October
- September
- August
- July
- June
- May
- April
- March
- February
- January
- ----- 2019 -----
- December
- November
- October
- September
- August
- July
- June
- May
- April
- March
- February
- January
- ----- 2018 -----
- December
- November
- October
- September
- August
- July
- June
- May
- April
- March
- February
- January
- ----- 2017 -----
- December
- November
- October
- September
- August
- July
- June
- May
- April
- March
- February
- January
- ----- 2016 -----
- December
- November
- October
- September
- August
- July
- June
- May
- April
- March
- February
- January
- ----- 2015 -----
- December
- November
- October
- September
- August
- July
- June
- May
- April
- March
- February
- January
- ----- 2014 -----
- December
- November
- October
- September
- August
- July
- June
- May
- April
- March
- February
- January
- ----- 2013 -----
- December
- November
- October
- September
- August
- July
- June
- May
- April
- March
- February
- January
- ----- 2012 -----
- December
- November
- October
- September
- August
- July
- June
- May
- April
- March
- February
- January
- ----- 2011 -----
- December
- November
- October
- September
- August
- July
- June
- May
- April
- March
- February
July 2020
- 17 participants
- 2101 discussions
[stem/master] Make requesting for descriptor content asynchronous
by atagar@torproject.org 15 Jul '20
by atagar@torproject.org 15 Jul '20
15 Jul '20
commit 841e2105147177f5959987c8bec1179dc94a59b3
Author: Illia Volochii <illia.volochii(a)gmail.com>
Date: Thu May 14 00:08:59 2020 +0300
Make requesting for descriptor content asynchronous
---
stem/client/__init__.py | 91 +++++++++++++-------------
stem/descriptor/remote.py | 138 ++++++++++++++++++++++++++++------------
stem/util/test_tools.py | 4 +-
test/integ/client/connection.py | 38 +++++++----
test/unit/descriptor/remote.py | 24 +++++--
5 files changed, 187 insertions(+), 108 deletions(-)
diff --git a/stem/client/__init__.py b/stem/client/__init__.py
index 639f118f..941f0ee7 100644
--- a/stem/client/__init__.py
+++ b/stem/client/__init__.py
@@ -26,7 +26,6 @@ a wrapper for :class:`~stem.socket.RelaySocket`, much the same way as
"""
import hashlib
-import threading
import stem
import stem.client.cell
@@ -71,11 +70,10 @@ class Relay(object):
self.link_protocol = LinkProtocol(link_protocol)
self._orport = orport
self._orport_buffer = b'' # unread bytes
- self._orport_lock = threading.RLock()
- self._circuits = {} # type: Dict[int, stem.client.Circuit]
+ self._circuits = {}
@staticmethod
- def connect(address: str, port: int, link_protocols: Sequence['stem.client.datatype.LinkProtocol'] = DEFAULT_LINK_PROTOCOLS) -> 'stem.client.Relay': # type: ignore
+ async def connect(address: str, port: int, link_protocols: Sequence['stem.client.datatype.LinkProtocol'] = DEFAULT_LINK_PROTOCOLS) -> 'stem.client.Relay': # type: ignore
"""
Establishes a connection with the given ORPort.
@@ -97,8 +95,9 @@ class Relay(object):
try:
conn = stem.socket.RelaySocket(address, port)
+ await conn.connect()
except stem.SocketError as exc:
- if 'Connection refused' in str(exc):
+ if 'Connect call failed' in str(exc):
raise stem.SocketError("Failed to connect to %s:%i. Maybe it isn't an ORPort?" % (address, port))
# If not an ORPort (for instance, mistakenly connecting to a ControlPort
@@ -122,21 +121,21 @@ class Relay(object):
# first VERSIONS cell, always have CIRCID_LEN == 2 for backward
# compatibility.
- conn.send(stem.client.cell.VersionsCell(link_protocols).pack(2)) # type: ignore
- response = conn.recv()
+ await conn.send(stem.client.cell.VersionsCell(link_protocols).pack(2)) # type: ignore
+ response = await conn.recv()
# Link negotiation ends right away if we lack a common protocol
# version. (#25139)
if not response:
- conn.close()
+ await conn.close()
raise stem.SocketError('Unable to establish a common link protocol with %s:%i' % (address, port))
versions_reply = stem.client.cell.Cell.pop(response, 2)[0] # type: stem.client.cell.VersionsCell # type: ignore
common_protocols = set(link_protocols).intersection(versions_reply.versions)
if not common_protocols:
- conn.close()
+ await conn.close()
raise stem.SocketError('Unable to find a common link protocol. We support %s but %s:%i supports %s.' % (', '.join(map(str, link_protocols)), address, port, ', '.join(map(str, versions_reply.versions))))
# Establishing connections requires sending a NETINFO, but including our
@@ -144,14 +143,14 @@ class Relay(object):
# where it would help.
link_protocol = max(common_protocols)
- conn.send(stem.client.cell.NetinfoCell(relay_addr, []).pack(link_protocol))
+ await conn.send(stem.client.cell.NetinfoCell(relay_addr, []).pack(link_protocol))
return Relay(conn, link_protocol)
- def _recv_bytes(self) -> bytes:
- return self._recv(True) # type: ignore
+ async def _recv_bytes(self) -> bytes:
+ return await self._recv(True) # type: ignore
- def _recv(self, raw: bool = False) -> 'stem.client.cell.Cell':
+ async def _recv(self, raw: bool = False) -> 'stem.client.cell.Cell':
"""
Reads the next cell from our ORPort. If none is present this blocks
until one is available.
@@ -161,13 +160,13 @@ class Relay(object):
:returns: next :class:`~stem.client.cell.Cell`
"""
- with self._orport_lock:
+ async with self._orport_lock:
# cells begin with [circ_id][cell_type][...]
circ_id_size = self.link_protocol.circ_id_size.size
while len(self._orport_buffer) < (circ_id_size + CELL_TYPE_SIZE.size):
- self._orport_buffer += self._orport.recv() # read until we know the cell type
+ self._orport_buffer += await self._orport.recv() # read until we know the cell type
cell_type = Cell.by_value(CELL_TYPE_SIZE.pop(self._orport_buffer[circ_id_size:])[0])
@@ -177,13 +176,13 @@ class Relay(object):
# variable length, our next field is the payload size
while len(self._orport_buffer) < (circ_id_size + CELL_TYPE_SIZE.size + FIXED_PAYLOAD_LEN):
- self._orport_buffer += self._orport.recv() # read until we know the cell size
+ self._orport_buffer += await self._orport.recv() # read until we know the cell size
payload_len = PAYLOAD_LEN_SIZE.pop(self._orport_buffer[circ_id_size + CELL_TYPE_SIZE.size:])[0]
cell_size = circ_id_size + CELL_TYPE_SIZE.size + payload_len
while len(self._orport_buffer) < cell_size:
- self._orport_buffer += self._orport.recv() # read until we have the full cell
+ self._orport_buffer += await self._orport.recv() # read until we have the full cell
if raw:
content, self._orport_buffer = split(self._orport_buffer, cell_size)
@@ -192,7 +191,7 @@ class Relay(object):
cell, self._orport_buffer = Cell.pop(self._orport_buffer, self.link_protocol)
return cell
- def _msg(self, cell: 'stem.client.cell.Cell') -> Iterator['stem.client.cell.Cell']:
+ async def _msg(self, cell: 'stem.client.cell.Cell') -> Iterator['stem.client.cell.Cell']:
"""
Sends a cell on the ORPort and provides the response we receive in reply.
@@ -219,9 +218,9 @@ class Relay(object):
# TODO: why is this an iterator?
- self._orport.recv(timeout = 0) # discard unread data
- self._orport.send(cell.pack(self.link_protocol))
- response = self._orport.recv(timeout = 1)
+ await self._orport.recv(timeout = 0) # discard unread data
+ await self._orport.send(cell.pack(self.link_protocol))
+ response = await self._orport.recv(timeout = 1)
yield stem.client.cell.Cell.pop(response, self.link_protocol)[0]
def is_alive(self) -> bool:
@@ -246,27 +245,27 @@ class Relay(object):
return self._orport.connection_time()
- def close(self) -> None:
+ async def close(self) -> None:
"""
Closes our socket connection. This is a pass-through for our socket's
:func:`~stem.socket.BaseSocket.close` method.
"""
- with self._orport_lock:
- return self._orport.close()
+ async with self._orport_lock:
+ return await self._orport.close()
- def create_circuit(self) -> 'stem.client.Circuit':
+ async def create_circuit(self) -> 'stem.client.Circuit':
"""
Establishes a new circuit.
"""
- with self._orport_lock:
+ async with self._orport_lock:
circ_id = max(self._circuits) + 1 if self._circuits else self.link_protocol.first_circ_id
create_fast_cell = stem.client.cell.CreateFastCell(circ_id)
created_fast_cell = None
- for cell in self._msg(create_fast_cell):
+ async for cell in self._msg(create_fast_cell):
if isinstance(cell, stem.client.cell.CreatedFastCell):
created_fast_cell = cell
break
@@ -284,16 +283,16 @@ class Relay(object):
return circ
- def __iter__(self) -> Iterator['stem.client.Circuit']:
- with self._orport_lock:
+ async def __aiter__(self) -> Iterator['stem.client.Circuit']:
+ async with self._orport_lock:
for circ in self._circuits.values():
yield circ
- def __enter__(self) -> 'stem.client.Relay':
+ async def __aenter__(self) -> 'stem.client.Relay':
return self
- def __exit__(self, exit_type: Optional[Type[BaseException]], value: Optional[BaseException], traceback: Optional[TracebackType]) -> None:
- self.close()
+ async def __aexit__(self, exit_type: Optional[Type[BaseException]], value: Optional[BaseException], traceback: Optional[TracebackType]) -> None:
+ await self.close()
class Circuit(object):
@@ -327,7 +326,7 @@ class Circuit(object):
self.forward_key = Cipher(algorithms.AES(kdf.forward_key), ctr, default_backend()).encryptor()
self.backward_key = Cipher(algorithms.AES(kdf.backward_key), ctr, default_backend()).decryptor()
- def directory(self, request: str, stream_id: int = 0) -> bytes:
+ async def directory(self, request: str, stream_id: int = 0) -> bytes:
"""
Request descriptors from the relay.
@@ -337,9 +336,9 @@ class Circuit(object):
:returns: **str** with the requested descriptor data
"""
- with self.relay._orport_lock:
- self._send(RelayCommand.BEGIN_DIR, stream_id = stream_id)
- self._send(RelayCommand.DATA, request, stream_id = stream_id)
+ async with self.relay._orport_lock:
+ await self._send(RelayCommand.BEGIN_DIR, stream_id = stream_id)
+ await self._send(RelayCommand.DATA, request, stream_id = stream_id)
response = [] # type: List[stem.client.cell.RelayCell]
@@ -347,7 +346,7 @@ class Circuit(object):
# Decrypt relay cells received in response. Our digest/key only
# updates when handled successfully.
- encrypted_cell = self.relay._recv_bytes()
+ encrypted_cell = await self.relay._recv_bytes()
decrypted_cell, backward_key, backward_digest = stem.client.cell.RelayCell.decrypt(self.relay.link_protocol, encrypted_cell, self.backward_key, self.backward_digest)
@@ -362,7 +361,7 @@ class Circuit(object):
else:
response.append(decrypted_cell)
- def _send(self, command: 'stem.client.datatype.RelayCommand', data: Union[bytes, str] = b'', stream_id: int = 0) -> None:
+ async def _send(self, command: 'stem.client.datatype.RelayCommand', data: Union[bytes, str] = b'', stream_id: int = 0) -> None:
"""
Sends a message over the circuit.
@@ -371,24 +370,24 @@ class Circuit(object):
:param stream_id: specific stream this concerns
"""
- with self.relay._orport_lock:
+ async with self.relay._orport_lock:
# Encrypt and send the cell. Our digest/key only updates if the cell is
# successfully sent.
cell = stem.client.cell.RelayCell(self.id, command, data, stream_id = stream_id)
payload, forward_key, forward_digest = cell.encrypt(self.relay.link_protocol, self.forward_key, self.forward_digest)
- self.relay._orport.send(payload)
+ await self.relay._orport.send(payload)
self.forward_digest = forward_digest
self.forward_key = forward_key
- def close(self) -> None:
- with self.relay._orport_lock:
- self.relay._orport.send(stem.client.cell.DestroyCell(self.id).pack(self.relay.link_protocol))
+ async def close(self) -> None:
+ async with self.relay._orport_lock:
+ await self.relay._orport.send(stem.client.cell.DestroyCell(self.id).pack(self.relay.link_protocol))
del self.relay._circuits[self.id]
- def __enter__(self) -> 'stem.client.Circuit':
+ async def __aenter__(self) -> 'stem.client.Circuit':
return self
- def __exit__(self, exit_type: Optional[Type[BaseException]], value: Optional[BaseException], traceback: Optional[TracebackType]) -> None:
- self.close()
+ async def __aexit__(self, exit_type: Optional[Type[BaseException]], value: Optional[BaseException], traceback: Optional[TracebackType]) -> None:
+ await self.close()
diff --git a/stem/descriptor/remote.py b/stem/descriptor/remote.py
index e90c4442..eca846ee 100644
--- a/stem/descriptor/remote.py
+++ b/stem/descriptor/remote.py
@@ -83,6 +83,8 @@ content. For example...
hashes.
"""
+import asyncio
+import functools
import io
import random
import socket
@@ -93,6 +95,7 @@ import urllib.request
import stem
import stem.client
+import stem.control
import stem.descriptor
import stem.descriptor.networkstatus
import stem.directory
@@ -227,7 +230,7 @@ def get_detached_signatures(**query_args: Any) -> 'stem.descriptor.remote.Query'
return get_instance().get_detached_signatures(**query_args)
-class Query(object):
+class AsyncQuery(object):
"""
Asynchronous request for descriptor content from a directory authority or
mirror. These can either be made through the
@@ -427,32 +430,27 @@ class Query(object):
self.reply_headers = None # type: Optional[Dict[str, str]]
self.kwargs = kwargs
- self._downloader_thread = None # type: Optional[threading.Thread]
- self._downloader_thread_lock = threading.RLock()
+ self._downloader_task = None
+ self._downloader_lock = threading.RLock()
+
+ self._asyncio_loop = asyncio.get_event_loop()
if start:
self.start()
if block:
- self.run(True)
+ self._asyncio_loop.create_task(self.run(True))
def start(self) -> None:
"""
Starts downloading the scriptors if we haven't started already.
"""
- with self._downloader_thread_lock:
- if self._downloader_thread is None:
- self._downloader_thread = threading.Thread(
- name = 'Descriptor query',
- target = self._download_descriptors,
- args = (self.retries, self.timeout)
- )
-
- self._downloader_thread.setDaemon(True)
- self._downloader_thread.start()
+ with self._downloader_lock:
+ if self._downloader_task is None:
+ self._downloader_task = self._asyncio_loop.create_task(self._download_descriptors(self.retries, self.timeout))
- def run(self, suppress: bool = False) -> List['stem.descriptor.Descriptor']:
+ async def run(self, suppress: bool = False) -> List['stem.descriptor.Descriptor']:
"""
Blocks until our request is complete then provides the descriptors. If we
haven't yet started our request then this does so.
@@ -470,12 +468,12 @@ class Query(object):
* :class:`~stem.DownloadFailed` if our request fails
"""
- return list(self._run(suppress))
+ return [desc async for desc in self._run(suppress)]
- def _run(self, suppress: bool) -> Iterator[stem.descriptor.Descriptor]:
- with self._downloader_thread_lock:
+ async def _run(self, suppress: bool) -> Iterator[stem.descriptor.Descriptor]:
+ with self._downloader_lock:
self.start()
- self._downloader_thread.join()
+ await self._downloader_task
if self.error:
if suppress:
@@ -508,8 +506,8 @@ class Query(object):
raise self.error
- def __iter__(self) -> Iterator[stem.descriptor.Descriptor]:
- for desc in self._run(True):
+ async def __aiter__(self) -> Iterator[stem.descriptor.Descriptor]:
+ async for desc in self._run(True):
yield desc
def _pick_endpoint(self, use_authority: bool = False) -> stem.Endpoint:
@@ -530,18 +528,18 @@ class Query(object):
else:
return random.choice(self.endpoints)
- def _download_descriptors(self, retries: int, timeout: Optional[float]) -> None:
+ async def _download_descriptors(self, retries: int, timeout: Optional[float]) -> None:
try:
self.start_time = time.time()
endpoint = self._pick_endpoint(use_authority = retries == 0 and self.fall_back_to_authority)
if isinstance(endpoint, stem.ORPort):
downloaded_from = 'ORPort %s:%s (resource %s)' % (endpoint.address, endpoint.port, self.resource)
- self.content, self.reply_headers = _download_from_orport(endpoint, self.compression, self.resource)
+ self.content, self.reply_headers = await _download_from_orport(endpoint, self.compression, self.resource)
elif isinstance(endpoint, stem.DirPort):
self.download_url = 'http://%s:%i/%s' % (endpoint.address, endpoint.port, self.resource.lstrip('/'))
downloaded_from = self.download_url
- self.content, self.reply_headers = _download_from_dirport(self.download_url, self.compression, timeout)
+ self.content, self.reply_headers = await _download_from_dirport(self.download_url, self.compression, timeout)
else:
raise ValueError("BUG: endpoints can only be ORPorts or DirPorts, '%s' was a %s" % (endpoint, type(endpoint).__name__))
@@ -555,7 +553,7 @@ class Query(object):
if retries > 0 and (timeout is None or timeout > 0):
log.debug("Unable to download descriptors from '%s' (%i retries remaining): %s" % (self.download_url, retries, exc))
- return self._download_descriptors(retries - 1, timeout)
+ return await self._download_descriptors(retries - 1, timeout)
else:
log.debug("Unable to download descriptors from '%s': %s" % (self.download_url, exc))
self.error = exc
@@ -563,6 +561,64 @@ class Query(object):
self.is_done = True
+class Query(stem.util.AsyncClassWrapper):
+ def __init__(self, resource, descriptor_type = None, endpoints = None, compression = (Compression.GZIP,), retries = 2, fall_back_to_authority = False, timeout = None, start = True, block = False, validate = False, document_handler = stem.descriptor.DocumentHandler.ENTRIES, **kwargs):
+ self._thread_for_wrapped_class = stem.util.ThreadForWrappedAsyncClass()
+ self._thread_for_wrapped_class.start()
+ self._wrapped_instance = self._init_async_class(
+ AsyncQuery,
+ resource,
+ descriptor_type,
+ endpoints,
+ compression,
+ retries,
+ fall_back_to_authority,
+ timeout,
+ start,
+ block,
+ validate,
+ document_handler,
+ **kwargs,
+ )
+
+ def start(self):
+ return self._call_async_method_soon('start')
+
+ def run(self, suppress = False):
+ return self._execute_async_method('run', suppress)
+
+ def __iter__(self):
+ for desc in self._execute_async_generator_method('__aiter__'):
+ yield desc
+
+ # Add public attributes of `AsyncQuery` as properties.
+ for attr in (
+ 'descriptor_type',
+ 'endpoints',
+ 'resource',
+ 'compression',
+ 'retries',
+ 'fall_back_to_authority',
+ 'content',
+ 'error',
+ 'is_done',
+ 'download_url',
+ 'start_time',
+ 'timeout',
+ 'runtime',
+ 'validate',
+ 'document_handler',
+ 'reply_headers',
+ 'kwargs',
+ ):
+ locals()[attr] = property(
+ functools.partial(
+ lambda self, attr_name: getattr(self._wrapped_instance, attr_name),
+ attr_name=attr,
+ ),
+ )
+
+
class DescriptorDownloader(object):
"""
Configurable class that issues :class:`~stem.descriptor.remote.Query`
@@ -925,7 +981,7 @@ class DescriptorDownloader(object):
return Query(resource, **args)
-def _download_from_orport(endpoint: stem.ORPort, compression: Sequence[stem.descriptor._Compression], resource: str) -> Tuple[bytes, Dict[str, str]]:
+async def _download_from_orport(endpoint: stem.ORPort, compression: Sequence[stem.descriptor._Compression], resource: str) -> Tuple[bytes, Dict[str, str]]:
"""
Downloads descriptors from the given orport. Payload is just like an http
response (headers and all)...
@@ -956,15 +1012,15 @@ def _download_from_orport(endpoint: stem.ORPort, compression: Sequence[stem.desc
link_protocols = endpoint.link_protocols if endpoint.link_protocols else [3]
- with stem.client.Relay.connect(endpoint.address, endpoint.port, link_protocols) as relay:
- with relay.create_circuit() as circ:
+ async with await stem.client.Relay.connect(endpoint.address, endpoint.port, link_protocols) as relay:
+ async with await relay.create_circuit() as circ:
request = '\r\n'.join((
'GET %s HTTP/1.0' % resource,
'Accept-Encoding: %s' % ', '.join(map(lambda c: c.encoding, compression)),
'User-Agent: %s' % stem.USER_AGENT,
)) + '\r\n\r\n'
- response = circ.directory(request, stream_id = 1)
+ response = await circ.directory(request, stream_id = 1)
first_line, data = response.split(b'\r\n', 1)
header_data, body_data = data.split(b'\r\n\r\n', 1)
@@ -983,7 +1039,7 @@ def _download_from_orport(endpoint: stem.ORPort, compression: Sequence[stem.desc
return _decompress(body_data, headers.get('Content-Encoding')), headers
-def _download_from_dirport(url: str, compression: Sequence[stem.descriptor._Compression], timeout: Optional[float]) -> Tuple[bytes, Dict[str, str]]:
+async def _download_from_dirport(url: str, compression: Sequence[stem.descriptor._Compression], timeout: Optional[float]) -> Tuple[bytes, Dict[str, str]]:
"""
Downloads descriptors from the given url.
@@ -998,17 +1054,19 @@ def _download_from_dirport(url: str, compression: Sequence[stem.descriptor._Comp
* :class:`~stem.DownloadFailed` if our request fails
"""
+ # TODO: use an asyncronous solution for the HTTP request.
+ request = urllib.request.Request(
+ url,
+ headers = {
+ 'Accept-Encoding': ', '.join(map(lambda c: c.encoding, compression)),
+ 'User-Agent': stem.USER_AGENT,
+ }
+ )
+ get_response = functools.partial(urllib.request.urlopen, request, timeout = timeout)
+
+ loop = asyncio.get_event_loop()
try:
- response = urllib.request.urlopen(
- urllib.request.Request(
- url,
- headers = {
- 'Accept-Encoding': ', '.join(map(lambda c: c.encoding, compression)),
- 'User-Agent': stem.USER_AGENT,
- }
- ),
- timeout = timeout,
- )
+ response = await loop.run_in_executor(None, get_response)
except socket.timeout as exc:
raise stem.DownloadTimeout(url, exc, sys.exc_info()[2], timeout)
except:
diff --git a/stem/util/test_tools.py b/stem/util/test_tools.py
index f3c736a1..455f3da3 100644
--- a/stem/util/test_tools.py
+++ b/stem/util/test_tools.py
@@ -251,7 +251,7 @@ class TimedTestRunner(unittest.TextTestRunner):
TEST_RUNTIMES[self.id()] = time.time() - start_time
return result
- def assertRaisesWith(self, exc_type: Type[Exception], exc_msg: str, func: Callable, *args: Any, **kwargs: Any) -> None:
+ def assertRaisesWith(self, exc_type: Type[Exception], exc_msg: str, *args: Any, **kwargs: Any) -> None:
"""
Asserts the given invokation raises the expected excepiton. This is
similar to unittest's assertRaises and assertRaisesRegexp, but checks
@@ -262,7 +262,7 @@ class TimedTestRunner(unittest.TextTestRunner):
vended API then please let us know.
"""
- return self.assertRaisesRegexp(exc_type, '^%s$' % re.escape(exc_msg), func, *args, **kwargs)
+ return self.assertRaisesRegexp(exc_type, '^%s$' % re.escape(exc_msg), *args, **kwargs)
def id(self) -> str:
return '%s.%s.%s' % (original_type.__module__, original_type.__name__, self._testMethodName)
diff --git a/test/integ/client/connection.py b/test/integ/client/connection.py
index 2294a07d..316d54ba 100644
--- a/test/integ/client/connection.py
+++ b/test/integ/client/connection.py
@@ -9,46 +9,57 @@ import stem
import test.runner
from stem.client import Relay
+from stem.util.test_tools import async_test
class TestConnection(unittest.TestCase):
- def test_invalid_arguments(self):
+ @async_test
+ async def test_invalid_arguments(self):
"""
Provide invalid arguments to Relay.connect().
"""
- self.assertRaisesWith(ValueError, "'nope' isn't an IPv4 or IPv6 address", Relay.connect, 'nope', 80)
- self.assertRaisesWith(ValueError, "'-54' isn't a valid port", Relay.connect, '127.0.0.1', -54)
- self.assertRaisesWith(ValueError, "Connection can't be established without a link protocol.", Relay.connect, '127.0.0.1', 54, [])
+ with self.assertRaisesWith(ValueError, "'nope' isn't an IPv4 or IPv6 address"):
+ await Relay.connect('nope', 80)
+ with self.assertRaisesWith(ValueError, "'-54' isn't a valid port"):
+ await Relay.connect('127.0.0.1', -54)
+ with self.assertRaisesWith(ValueError, "Connection can't be established without a link protocol."):
+ await Relay.connect('127.0.0.1', 54, [])
- def test_not_orport(self):
+ @async_test
+ async def test_not_orport(self):
"""
Attempt to connect to an ORPort that doesn't exist.
"""
- self.assertRaisesWith(stem.SocketError, "Failed to connect to 127.0.0.1:1587. Maybe it isn't an ORPort?", Relay.connect, '127.0.0.1', 1587)
+ with self.assertRaisesWith(stem.SocketError, "Failed to connect to 127.0.0.1:1587. Maybe it isn't an ORPort?"):
+ await Relay.connect('127.0.0.1', 1587)
# connect to our ControlPort like it's an ORPort
if test.runner.Torrc.PORT in test.runner.get_runner().get_options():
- self.assertRaisesWith(stem.SocketError, "Failed to SSL authenticate to 127.0.0.1:1111. Maybe it isn't an ORPort?", Relay.connect, '127.0.0.1', test.runner.CONTROL_PORT)
+ with self.assertRaisesWith(stem.SocketError, "Failed to SSL authenticate to 127.0.0.1:1111. Maybe it isn't an ORPort?"):
+ await Relay.connect('127.0.0.1', test.runner.CONTROL_PORT)
- def test_no_common_link_protocol(self):
+ @async_test
+ async def test_no_common_link_protocol(self):
"""
Connection without a commonly accepted link protocol version.
"""
for link_protocol in (1, 2, 6, 20):
- self.assertRaisesWith(stem.SocketError, 'Unable to establish a common link protocol with 127.0.0.1:1113', Relay.connect, '127.0.0.1', test.runner.ORPORT, [link_protocol])
+ with self.assertRaisesWith(stem.SocketError, 'Unable to establish a common link protocol with 127.0.0.1:1113'):
+ await Relay.connect('127.0.0.1', test.runner.ORPORT, [link_protocol])
- def test_connection_time(self):
+ @async_test
+ async def test_connection_time(self):
"""
Checks duration we've been connected.
"""
before = time.time()
- with Relay.connect('127.0.0.1', test.runner.ORPORT) as conn:
+ async with await Relay.connect('127.0.0.1', test.runner.ORPORT) as conn:
connection_time = conn.connection_time()
self.assertTrue(time.time() >= connection_time >= before)
time.sleep(0.02)
@@ -57,10 +68,11 @@ class TestConnection(unittest.TestCase):
self.assertFalse(conn.is_alive())
self.assertTrue(conn.connection_time() >= connection_time + 0.02)
- def test_established(self):
+ @async_test
+ async def test_established(self):
"""
Successfully establish ORPort connection.
"""
- conn = Relay.connect('127.0.0.1', test.runner.ORPORT)
+ conn = await Relay.connect('127.0.0.1', test.runner.ORPORT)
self.assertTrue(int(conn.link_protocol) in (4, 5))
diff --git a/test/unit/descriptor/remote.py b/test/unit/descriptor/remote.py
index e57da92b..33ee57fb 100644
--- a/test/unit/descriptor/remote.py
+++ b/test/unit/descriptor/remote.py
@@ -13,9 +13,10 @@ import stem.descriptor.remote
import stem.util.str_tools
import test.require
-from unittest.mock import patch, Mock, MagicMock
+from unittest.mock import patch, Mock
from stem.descriptor.remote import Compression
+from stem.util.test_tools import coro_func_returning_value
from test.unit.descriptor import read_resource
TEST_RESOURCE = '/tor/server/fp/9695DFC35FFEB861329B9F1AB04C46397020CE31'
@@ -78,11 +79,20 @@ def _orport_mock(data, encoding = 'identity', response_code_header = None):
cell.data = hunk
cells.append(cell)
- connect_mock = MagicMock()
- relay_mock = connect_mock().__enter__()
- circ_mock = relay_mock.create_circuit().__enter__()
- circ_mock.directory.return_value = data
- return connect_mock
+ class AsyncMock(Mock):
+ async def __aenter__(self):
+ return self
+
+ async def __aexit__(self, exc_type, exc_val, exc_tb):
+ return
+
+ circ_mock = AsyncMock()
+ circ_mock.directory.side_effect = coro_func_returning_value(data)
+
+ relay_mock = AsyncMock()
+ relay_mock.create_circuit.side_effect = coro_func_returning_value(circ_mock)
+
+ return coro_func_returning_value(relay_mock)
def _dirport_mock(data, encoding = 'identity'):
@@ -294,7 +304,7 @@ class TestDescriptorDownloader(unittest.TestCase):
skip_crypto_validation = not test.require.CRYPTOGRAPHY_AVAILABLE,
)
- self.assertEqual(stem.DirPort('128.31.0.39', 9131), query._pick_endpoint())
+ self.assertEqual(stem.DirPort('128.31.0.39', 9131), query._wrapped_instance._pick_endpoint())
descriptors = list(query)
self.assertEqual(1, len(descriptors))
1
0
commit 152497ffaff55edc956f7423ab03e90697a252d3
Author: Illia Volochii <illia.volochii(a)gmail.com>
Date: Sun May 17 18:29:57 2020 +0300
Update docstrings in stem/socket.py
---
stem/socket.py | 54 ++++++++++++++++++++++++++++--------------------------
1 file changed, 28 insertions(+), 26 deletions(-)
diff --git a/stem/socket.py b/stem/socket.py
index e8ae492b..0feae831 100644
--- a/stem/socket.py
+++ b/stem/socket.py
@@ -15,31 +15,42 @@ Tor...
::
- import stem
+ import asyncio
+ import sys
+
import stem.connection
import stem.socket
- if __name__ == '__main__':
+ async def print_version() -> None:
try:
control_socket = stem.socket.ControlPort(port = 9051)
- stem.connection.authenticate(control_socket)
+ await control_socket.connect()
+ await stem.connection.authenticate(control_socket)
except stem.SocketError as exc:
- print 'Unable to connect to tor on port 9051: %s' % exc
+ print(f'Unable to connect to tor on port 9051: {exc}')
sys.exit(1)
except stem.connection.AuthenticationFailure as exc:
- print 'Unable to authenticate: %s' % exc
+ print(f'Unable to authenticate: {exc}')
sys.exit(1)
- print "Issuing 'GETINFO version' query...\\n"
- control_socket.send('GETINFO version')
- print control_socket.recv()
+ print("Issuing 'GETINFO version' query...\\n")
+ await control_socket.send('GETINFO version')
+ print(await control_socket.recv())
+
+
+ if __name__ == '__main__':
+ loop = asyncio.get_event_loop()
+ try:
+ loop.run_until_complete(print_version())
+ finally:
+ loop.close()
::
% python example.py
Issuing 'GETINFO version' query...
- version=0.2.4.10-alpha-dev (git-8be6058d8f31e578)
+ version=0.4.3.5
OK
**Module Overview:**
@@ -66,6 +77,7 @@ Tor...
send_message - Writes a message to a control socket.
recv_message - Reads a ControlMessage from a control socket.
+ recv_message_from_bytes_io - Reads a ControlMessage from an I/O stream.
send_formatting - Performs the formatting expected from sent messages.
"""
@@ -210,11 +222,7 @@ class BaseSocket(object):
async def _send(self, message: Union[bytes, str], handler: Callable[[Union[socket.socket, ssl.SSLSocket], BinaryIO, Union[bytes, str]], None]) -> None:
"""
- Send message in a thread safe manner. Handler is expected to be of the form...
-
- ::
-
- my_handler(socket, socket_file, message)
+ Send message in a thread safe manner.
"""
with self._send_lock:
@@ -242,11 +250,7 @@ class BaseSocket(object):
async def _recv(self, handler):
"""
- Receives a message in a thread safe manner. Handler is expected to be of the form...
-
- ::
-
- my_handler(socket, socket_file)
+ Receives a message in a thread safe manner.
"""
with self._recv_lock:
@@ -405,7 +409,7 @@ class ControlSocket(BaseSocket):
receiving complete messages.
Callers should not instantiate this class directly, but rather use subclasses
- which are expected to implement the **_make_socket()** method.
+ which are expected to implement the **_open_connection()** method.
"""
def __init__(self) -> None:
@@ -483,7 +487,7 @@ class ControlSocketFile(ControlSocket):
"""
ControlSocketFile constructor.
- :param socket_path: path where the control socket is located
+ :param path: path where the control socket is located
"""
super(ControlSocketFile, self).__init__()
@@ -571,8 +575,7 @@ async def recv_message(reader: asyncio.StreamReader, arrived_at: Optional[float]
Pulls from a control socket until we either have a complete message or
encounter a problem.
- :param control_file: file derived from the control socket (see the
- socket's makefile() method for more information)
+ :param reader: reader object
:returns: :class:`~stem.response.ControlMessage` read from the socket
@@ -692,11 +695,10 @@ async def recv_message(reader: asyncio.StreamReader, arrived_at: Optional[float]
def recv_message_from_bytes_io(reader: asyncio.StreamReader, arrived_at: Optional[float] = None) -> 'stem.response.ControlMessage':
"""
- Pulls from a control socket until we either have a complete message or
+ Pulls from an I/O stream until we either have a complete message or
encounter a problem.
- :param file control_file: file derived from the control socket (see the
- socket's makefile() method for more information)
+ :param file reader: I/O stream
:returns: :class:`~stem.response.ControlMessage` read from the socket
1
0
15 Jul '20
commit e1dff48079d60674c7530871875666f054973c6a
Author: Illia Volochii <illia.volochii(a)gmail.com>
Date: Sun May 17 16:59:09 2020 +0300
Fix `AsyncController._event_listeners_lock`
---
stem/control.py | 61 ++++++++++++++++++++++++++++-----------------------------
1 file changed, 30 insertions(+), 31 deletions(-)
diff --git a/stem/control.py b/stem/control.py
index 2ffcf173..bdcdfb75 100644
--- a/stem/control.py
+++ b/stem/control.py
@@ -1040,8 +1040,8 @@ class AsyncController(BaseController):
# mapping of event types to their listeners
- self._event_listeners = {} # type: Dict[stem.control.EventType, List[Callable[[stem.response.events.Event], None]]]
- self._event_listeners_lock = threading.RLock()
+ self._event_listeners = {} # type: Dict[stem.control.EventType, List[Callable[[stem.response.events.Event], Union[None, Awaitable[None]]]]]
+ self._event_listeners_lock = stem.util.CombinedReentrantAndAsyncioLock()
self._enabled_features = [] # type: List[str]
self._last_address_exc = None # type: Optional[BaseException]
@@ -3062,7 +3062,7 @@ class AsyncController(BaseController):
# first checking that tor supports these event types
- with self._event_listeners_lock:
+ async with self._event_listeners_lock:
if self.is_authenticated():
for event_type in events:
event_type = stem.response.events.EVENT_TYPE_TO_CLASS.get(event_type)
@@ -3091,7 +3091,7 @@ class AsyncController(BaseController):
:raises: :class:`stem.ProtocolError` if unable to set the events
"""
- with self._event_listeners_lock:
+ async with self._event_listeners_lock:
event_types_changed = False
for event_type, event_listeners in list(self._event_listeners.items()):
@@ -3760,7 +3760,7 @@ class AsyncController(BaseController):
# try to re-attach event listeners to the new instance
- with self._event_listeners_lock:
+ async with self._event_listeners_lock:
try:
failed_events = (await self._attach_listeners())[1]
@@ -3807,7 +3807,7 @@ class AsyncController(BaseController):
log.error('Tor sent a malformed event (%s): %s' % (exc, event_message))
event_type = MALFORMED_EVENTS
- with self._event_listeners_lock:
+ async with self._event_listeners_lock:
for listener_type, event_listeners in list(self._event_listeners.items()):
if listener_type == event_type:
for listener in event_listeners:
@@ -3830,34 +3830,33 @@ class AsyncController(BaseController):
set_events, failed_events = [], []
- with self._event_listeners_lock:
- if self.is_authenticated():
- # try to set them all
- response = await self.msg('SETEVENTS %s' % ' '.join(self._event_listeners.keys()))
+ if self.is_authenticated():
+ # try to set them all
+ response = await self.msg('SETEVENTS %s' % ' '.join(self._event_listeners.keys()))
- if response.is_ok():
- set_events = list(self._event_listeners.keys())
- else:
- # One of the following likely happened...
- #
- # * Our user attached listeners before having an authenticated
- # connection, so we couldn't check if we met the version
- # requirement.
- #
- # * User attached listeners to one tor instance, then connected us to
- # an older tor instancce.
- #
- # * Some other controller hiccup (far less likely).
- #
- # See if we can set some subset of our events.
+ if response.is_ok():
+ set_events = list(self._event_listeners.keys())
+ else:
+ # One of the following likely happened...
+ #
+ # * Our user attached listeners before having an authenticated
+ # connection, so we couldn't check if we met the version
+ # requirement.
+ #
+ # * User attached listeners to one tor instance, then connected us to
+ # an older tor instance.
+ #
+ # * Some other controller hiccup (far less likely).
+ #
+ # See if we can set some subset of our events.
- for event in list(self._event_listeners.keys()):
- response = await self.msg('SETEVENTS %s' % ' '.join(set_events + [event]))
+ for event in list(self._event_listeners.keys()):
+ response = await self.msg('SETEVENTS %s' % ' '.join(set_events + [event]))
- if response.is_ok():
- set_events.append(event)
- else:
- failed_events.append(event)
+ if response.is_ok():
+ set_events.append(event)
+ else:
+ failed_events.append(event)
return (set_events, failed_events)
1
0
[stem/master] Fix mypy errors related to `stem.descriptor.remote.Query` attributes
by atagar@torproject.org 15 Jul '20
by atagar@torproject.org 15 Jul '20
15 Jul '20
commit 626d540d2d5fc6c9039ff738546795e7818324e9
Author: Illia Volochii <illia.volochii(a)gmail.com>
Date: Sun May 17 17:57:18 2020 +0300
Fix mypy errors related to `stem.descriptor.remote.Query` attributes
---
stem/descriptor/remote.py | 93 ++++++++++++++++++++++++++++++++++-------------
1 file changed, 67 insertions(+), 26 deletions(-)
diff --git a/stem/descriptor/remote.py b/stem/descriptor/remote.py
index 429954cb..4e1ad034 100644
--- a/stem/descriptor/remote.py
+++ b/stem/descriptor/remote.py
@@ -591,32 +591,73 @@ class Query(stem.util.AsyncClassWrapper):
for desc in self._execute_async_generator_method('__aiter__'):
yield desc
- # Add public attributes of `AsyncQuery` as properties.
- for attr in (
- 'descriptor_type',
- 'endpoints',
- 'resource',
- 'compression',
- 'retries',
- 'fall_back_to_authority',
- 'content',
- 'error',
- 'is_done',
- 'download_url',
- 'start_time',
- 'timeout',
- 'runtime',
- 'validate',
- 'document_handler',
- 'reply_headers',
- 'kwargs',
- ):
- locals()[attr] = property(
- functools.partial(
- lambda self, attr_name: getattr(self._wrapped_instance, attr_name),
- attr_name=attr,
- ),
- )
+ @property
+ def descriptor_type(self) -> str:
+ return self._wrapped_instance.descriptor_type
+
+ @property
+ def endpoints(self) -> List[Union[stem.ORPort, stem.DirPort]]:
+ return self._wrapped_instance.endpoints
+
+ @property
+ def resource(self) -> str:
+ return self._wrapped_instance.resource
+
+ @property
+ def compression(self) -> List[stem.descriptor._Compression]:
+ return self._wrapped_instance.compression
+
+ @property
+ def retries(self) -> int:
+ return self._wrapped_instance.retries
+
+ @property
+ def fall_back_to_authority(self) -> bool:
+ return self._wrapped_instance.fall_back_to_authority
+
+ @property
+ def content(self) -> Optional[bytes]:
+ return self._wrapped_instance.content
+
+ @property
+ def error(self) -> Optional[BaseException]:
+ return self._wrapped_instance.error
+
+ @property
+ def is_done(self) -> bool:
+ return self._wrapped_instance.is_done
+
+ @property
+ def download_url(self) -> Optional[str]:
+ return self._wrapped_instance.download_url
+
+ @property
+ def start_time(self) -> Optional[float]:
+ return self._wrapped_instance.start_time
+
+ @property
+ def timeout(self) -> Optional[float]:
+ return self._wrapped_instance.timeout
+
+ @property
+ def runtime(self) -> Optional[float]:
+ return self._wrapped_instance.runtime
+
+ @property
+ def validate(self) -> bool:
+ return self._wrapped_instance.validate
+
+ @property
+ def document_handler(self) -> stem.descriptor.DocumentHandler:
+ return self._wrapped_instance.document_handler
+
+ @property
+ def reply_headers(self) -> Optional[Dict[str, str]]:
+ return self._wrapped_instance.reply_headers
+
+ @property
+ def kwargs(self) -> Dict[str, Any]:
+ return self._wrapped_instance.kwargs
class DescriptorDownloader(object):
1
0
15 Jul '20
commit 9310094a8844531aeadc53caa2df5ca503fb3d34
Author: Illia Volochii <illia.volochii(a)gmail.com>
Date: Sun May 17 19:30:17 2020 +0300
Update docstrings in stem/descriptor/remote.py
---
stem/descriptor/remote.py | 213 ++++++++++++++++++++++++++++++++++++----------
1 file changed, 167 insertions(+), 46 deletions(-)
diff --git a/stem/descriptor/remote.py b/stem/descriptor/remote.py
index 4e1ad034..d7a833a4 100644
--- a/stem/descriptor/remote.py
+++ b/stem/descriptor/remote.py
@@ -238,23 +238,18 @@ class AsyncQuery(object):
advanced usage.
To block on the response and get results either call
- :func:`~stem.descriptor.remote.Query.run` or iterate over the Query. The
- :func:`~stem.descriptor.remote.Query.run` method pass along any errors that
- arise...
+ :func:`~stem.descriptor.remote.AsyncQuery.run` or iterate over the Query. The
+ :func:`~stem.descriptor.remote.AsyncQuery.run` method passes along any errors
+ that arise...
::
- from stem.descriptor.remote import Query
-
- query = Query(
- '/tor/server/all',
- timeout = 30,
- )
+ from stem.descriptor.remote import AsyncQuery
print('Current relays:')
try:
- for desc in Query('/tor/server/all', 'server-descriptor 1.0').run():
+ for desc in await AsyncQuery('/tor/server/all', 'server-descriptor 1.0').run():
print(desc.fingerprint)
except Exception as exc:
print('Unable to retrieve the server descriptors: %s' % exc)
@@ -265,7 +260,7 @@ class AsyncQuery(object):
print('Current relays:')
- for desc in Query('/tor/server/all', 'server-descriptor 1.0'):
+ async for desc in AsyncQuery('/tor/server/all', 'server-descriptor 1.0'):
print(desc.fingerprint)
In either case exceptions are available via our 'error' attribute.
@@ -298,39 +293,6 @@ class AsyncQuery(object):
For legacy reasons if our resource has a '.z' suffix then our **compression**
argument is overwritten with Compression.GZIP.
- .. versionchanged:: 1.7.0
- Added support for downloading from ORPorts.
-
- .. versionchanged:: 1.7.0
- Added the compression argument.
-
- .. versionchanged:: 1.7.0
- Added the reply_headers attribute.
-
- The class this provides changed between Python versions. In python2
- this was called httplib.HTTPMessage, whereas in python3 the class was
- renamed to http.client.HTTPMessage.
-
- .. versionchanged:: 1.7.0
- Avoid downloading from tor26. This directory authority throttles its
- DirPort to such an extent that requests either time out or take on the
- order of minutes.
-
- .. versionchanged:: 1.7.0
- Avoid downloading from Bifroest. This is the bridge authority so it
- doesn't vote in the consensus, and apparently times out frequently.
-
- .. versionchanged:: 1.8.0
- Serge has replaced Bifroest as our bridge authority. Avoiding descriptor
- downloads from it instead.
-
- .. versionchanged:: 1.8.0
- Defaulting to gzip compression rather than plaintext downloads.
-
- .. versionchanged:: 1.8.0
- Using :class:`~stem.descriptor.__init__.Compression` for our compression
- argument.
-
:var str resource: resource being fetched, such as '/tor/server/all'
:var str descriptor_type: type of descriptors being fetched (for options see
:func:`~stem.descriptor.__init__.parse_file`), this is guessed from the
@@ -562,7 +524,145 @@ class AsyncQuery(object):
class Query(stem.util.AsyncClassWrapper):
- def __init__(self, resource, descriptor_type = None, endpoints = None, compression = (Compression.GZIP,), retries = 2, fall_back_to_authority = False, timeout = None, start = True, block = False, validate = False, document_handler = stem.descriptor.DocumentHandler.ENTRIES, **kwargs):
+ """
+ Asynchronous request for descriptor content from a directory authority or
+ mirror. These can either be made through the
+ :class:`~stem.descriptor.remote.DescriptorDownloader` or directly for more
+ advanced usage.
+
+ To block on the response and get results either call
+ :func:`~stem.descriptor.remote.Query.run` or iterate over the Query. The
+ :func:`~stem.descriptor.remote.Query.run` method passes along any errors that
+ arise...
+
+ ::
+
+ from stem.descriptor.remote import Query
+
+ print('Current relays:')
+
+ try:
+ for desc in Query('/tor/server/all', 'server-descriptor 1.0').run():
+ print(desc.fingerprint)
+ except Exception as exc:
+ print('Unable to retrieve the server descriptors: %s' % exc)
+
+ ... while iterating fails silently...
+
+ ::
+
+ print('Current relays:')
+
+ for desc in Query('/tor/server/all', 'server-descriptor 1.0'):
+ print(desc.fingerprint)
+
+ In either case exceptions are available via our 'error' attribute.
+
+ Tor provides quite a few different descriptor resources via its directory
+ protocol (see section 4.2 and later of the `dir-spec
+ <https://gitweb.torproject.org/torspec.git/tree/dir-spec.txt>`_).
+ Commonly useful ones include...
+
+ =============================================== ===========
+ Resource Description
+ =============================================== ===========
+ /tor/server/all all present server descriptors
+ /tor/server/fp/<fp1>+<fp2>+<fp3> server descriptors with the given fingerprints
+ /tor/extra/all all present extrainfo descriptors
+ /tor/extra/fp/<fp1>+<fp2>+<fp3> extrainfo descriptors with the given fingerprints
+ /tor/micro/d/<hash1>-<hash2> microdescriptors with the given hashes
+ /tor/status-vote/current/consensus present consensus
+ /tor/status-vote/current/consensus-microdesc present microdescriptor consensus
+ /tor/status-vote/next/bandwidth bandwidth authority heuristics for the next consensus
+ /tor/status-vote/next/consensus-signatures detached signature, used for making the next consensus
+ /tor/keys/all key certificates for the authorities
+ /tor/keys/fp/<v3ident1>+<v3ident2> key certificates for specific authorities
+ =============================================== ===========
+
+ **ZSTD** compression requires `zstandard
+ <https://pypi.org/project/zstandard/>`_, and **LZMA** requires the `lzma
+ module <https://docs.python.org/3/library/lzma.html>`_.
+
+ For legacy reasons if our resource has a '.z' suffix then our **compression**
+ argument is overwritten with Compression.GZIP.
+
+ .. versionchanged:: 1.7.0
+ Added support for downloading from ORPorts.
+
+ .. versionchanged:: 1.7.0
+ Added the compression argument.
+
+ .. versionchanged:: 1.7.0
+ Added the reply_headers attribute.
+
+ The class this provides changed between Python versions. In python2
+ this was called httplib.HTTPMessage, whereas in python3 the class was
+ renamed to http.client.HTTPMessage.
+
+ .. versionchanged:: 1.7.0
+ Avoid downloading from tor26. This directory authority throttles its
+ DirPort to such an extent that requests either time out or take on the
+ order of minutes.
+
+ .. versionchanged:: 1.7.0
+ Avoid downloading from Bifroest. This is the bridge authority so it
+ doesn't vote in the consensus, and apparently times out frequently.
+
+ .. versionchanged:: 1.8.0
+ Serge has replaced Bifroest as our bridge authority. Avoiding descriptor
+ downloads from it instead.
+
+ .. versionchanged:: 1.8.0
+ Defaulting to gzip compression rather than plaintext downloads.
+
+ .. versionchanged:: 1.8.0
+ Using :class:`~stem.descriptor.__init__.Compression` for our compression
+ argument.
+
+ :var str resource: resource being fetched, such as '/tor/server/all'
+ :var str descriptor_type: type of descriptors being fetched (for options see
+ :func:`~stem.descriptor.__init__.parse_file`), this is guessed from the
+ resource if **None**
+
+ :var list endpoints: :class:`~stem.DirPort` or :class:`~stem.ORPort` of the
+ authority or mirror we're querying, this uses authorities if undefined
+ :var list compression: list of :data:`stem.descriptor.Compression`
+ we're willing to accept, when none are mutually supported downloads fall
+ back to Compression.PLAINTEXT
+ :var int retries: number of times to attempt the request if downloading it
+ fails
+ :var bool fall_back_to_authority: when retrying request issues the last
+ request to a directory authority if **True**
+
+ :var str content: downloaded descriptor content
+ :var Exception error: exception if a problem occurred
+ :var bool is_done: flag that indicates if our request has finished
+
+ :var float start_time: unix timestamp when we first started running
+ :var http.client.HTTPMessage reply_headers: headers provided in the response,
+ **None** if we haven't yet made our request
+ :var float runtime: time our query took, this is **None** if it's not yet
+ finished
+
+ :var bool validate: checks the validity of the descriptor's content if
+ **True**, skips these checks otherwise
+ :var stem.descriptor.__init__.DocumentHandler document_handler: method in
+ which to parse a :class:`~stem.descriptor.networkstatus.NetworkStatusDocument`
+ :var dict kwargs: additional arguments for the descriptor constructor
+
+ Following are only applicable when downloading from a
+ :class:`~stem.DirPort`...
+
+ :var float timeout: duration before we'll time out our request
+ :var str download_url: last url used to download the descriptor, this is
+ unset until we've actually made a download attempt
+
+ :param start: start making the request when constructed (default is **True**)
+ :param block: only return after the request has been completed, this is
+ the same as running **query.run(True)** (default is **False**)
+ """
+
+ def __init__(self, resource: str, descriptor_type: Optional[str] = None, endpoints: Optional[Sequence[stem.Endpoint]] = None, compression: Union[stem.descriptor._Compression, Sequence[stem.descriptor._Compression]] = (Compression.GZIP,), retries: int = 2, fall_back_to_authority: bool = False, timeout: Optional[float] = None, start: bool = True, block: bool = False, validate: bool = False, document_handler: stem.descriptor.DocumentHandler = stem.descriptor.DocumentHandler.ENTRIES, **kwargs: Any) -> None:
self._thread_for_wrapped_class = stem.util.ThreadForWrappedAsyncClass()
self._thread_for_wrapped_class.start()
self._wrapped_instance: AsyncQuery = self._init_async_class(
@@ -582,9 +682,30 @@ class Query(stem.util.AsyncClassWrapper):
)
def start(self) -> None:
+ """
+ Starts downloading the descriptors if we haven't started already.
+ """
+
self._call_async_method_soon('start')
- def run(self, suppress = False):
+ def run(self, suppress: bool = False) -> List['stem.descriptor.Descriptor']:
+ """
+ Blocks until our request is complete then provides the descriptors. If we
+ haven't yet started our request then this does so.
+
+ :param suppress: avoids raising exceptions if **True**
+
+ :returns: list for the requested :class:`~stem.descriptor.__init__.Descriptor` instances
+
+ :raises:
+ Using the iterator can fail with the following if **suppress** is
+ **False**...
+
+ * **ValueError** if the descriptor contents is malformed
+ * :class:`~stem.DownloadTimeout` if our request timed out
+ * :class:`~stem.DownloadFailed` if our request fails
+ """
+
return self._execute_async_method('run', suppress)
def __iter__(self):
1
0
commit fb803c96db3ac357850b93730a33154adc719a89
Author: Illia Volochii <illia.volochii(a)gmail.com>
Date: Sun May 17 17:40:29 2020 +0300
Add type hints to new functions
---
stem/descriptor/remote.py | 4 ++--
stem/util/__init__.py | 31 +++++++++++++++++--------------
stem/util/test_tools.py | 8 ++++----
3 files changed, 23 insertions(+), 20 deletions(-)
diff --git a/stem/descriptor/remote.py b/stem/descriptor/remote.py
index daa3b42c..429954cb 100644
--- a/stem/descriptor/remote.py
+++ b/stem/descriptor/remote.py
@@ -581,8 +581,8 @@ class Query(stem.util.AsyncClassWrapper):
**kwargs,
)
- def start(self):
- return self._call_async_method_soon('start')
+ def start(self) -> None:
+ self._call_async_method_soon('start')
def run(self, suppress = False):
return self._execute_async_method('run', suppress)
diff --git a/stem/util/__init__.py b/stem/util/__init__.py
index a230cfbd..7c53730c 100644
--- a/stem/util/__init__.py
+++ b/stem/util/__init__.py
@@ -8,8 +8,10 @@ Utility functions used by the stem library.
import asyncio
import datetime
import threading
+from concurrent.futures import Future
-from typing import Any, Union
+from types import TracebackType
+from typing import Any, AsyncIterator, Iterator, Optional, Type, Union
__all__ = [
'conf',
@@ -150,36 +152,37 @@ class CombinedReentrantAndAsyncioLock:
__slots__ = ('_r_lock', '_async_lock')
- def __init__(self):
+ def __init__(self) -> None:
self._r_lock = threading.RLock()
self._async_lock = asyncio.Lock()
- async def acquire(self):
+ async def acquire(self) -> bool:
await self._async_lock.acquire()
self._r_lock.acquire()
+ return True
- def release(self):
+ def release(self) -> None:
self._r_lock.release()
self._async_lock.release()
- async def __aenter__(self):
+ async def __aenter__(self) -> 'CombinedReentrantAndAsyncioLock':
await self.acquire()
return self
- async def __aexit__(self, exc_type, exc_val, exc_tb):
+ async def __aexit__(self, exit_type: Optional[Type[BaseException]], value: Optional[BaseException], traceback: Optional[TracebackType]) -> None:
self.release()
class ThreadForWrappedAsyncClass(threading.Thread):
- def __init__(self, *args, **kwargs):
+ def __init__(self, *args: Any, **kwargs: Any) -> None:
super().__init__(*args, *kwargs)
self.loop = asyncio.new_event_loop()
self.setDaemon(True)
- def run(self):
+ def run(self) -> None:
self.loop.run_forever()
- def join(self, timeout=None):
+ def join(self, timeout: Optional[float] = None) -> None:
self.loop.call_soon_threadsafe(self.loop.stop)
super().join(timeout)
self.loop.close()
@@ -189,7 +192,7 @@ class AsyncClassWrapper:
_thread_for_wrapped_class: ThreadForWrappedAsyncClass
_wrapped_instance: type
- def _init_async_class(self, async_class, *args, **kwargs):
+ def _init_async_class(self, async_class: Type, *args: Any, **kwargs: Any) -> Any:
thread = self._thread_for_wrapped_class
# The asynchronous class should be initialized in the thread where
# its methods will be executed.
@@ -201,17 +204,17 @@ class AsyncClassWrapper:
return async_class(*args, **kwargs)
- def _call_async_method_soon(self, method_name, *args, **kwargs):
+ def _call_async_method_soon(self, method_name: str, *args: Any, **kwargs: Any) -> Future:
return asyncio.run_coroutine_threadsafe(
getattr(self._wrapped_instance, method_name)(*args, **kwargs),
self._thread_for_wrapped_class.loop,
)
- def _execute_async_method(self, method_name, *args, **kwargs):
+ def _execute_async_method(self, method_name: str, *args: Any, **kwargs: Any) -> Any:
return self._call_async_method_soon(method_name, *args, **kwargs).result()
- def _execute_async_generator_method(self, method_name, *args, **kwargs):
- async def convert_async_generator(generator):
+ def _execute_async_generator_method(self, method_name: str, *args: Any, **kwargs: Any) -> Iterator:
+ async def convert_async_generator(generator: AsyncIterator) -> Iterator:
return iter([d async for d in generator])
return asyncio.run_coroutine_threadsafe(
diff --git a/stem/util/test_tools.py b/stem/util/test_tools.py
index 455f3da3..f5ae7525 100644
--- a/stem/util/test_tools.py
+++ b/stem/util/test_tools.py
@@ -46,7 +46,7 @@ import stem.util.conf
import stem.util.enum
import stem.util.system
-from typing import Any, Callable, Dict, Iterator, List, Mapping, Optional, Sequence, Tuple, Type, Union
+from typing import Any, Awaitable, Callable, Dict, Iterator, List, Mapping, Optional, Sequence, Tuple, Type, Union
CONFIG = stem.util.conf.config_dict('test', {
'pycodestyle.ignore': [],
@@ -686,7 +686,7 @@ def _is_ignored(config: Mapping[str, Sequence[str]], path: str, issue: str) -> b
def async_test(func: Callable) -> Callable:
@functools.wraps(func)
- def wrapper(*args, **kwargs):
+ def wrapper(*args: Any, **kwargs: Any) -> Any:
loop = asyncio.new_event_loop()
try:
result = loop.run_until_complete(func(*args, **kwargs))
@@ -696,13 +696,13 @@ def async_test(func: Callable) -> Callable:
return wrapper
-def coro_func_returning_value(return_value):
+def coro_func_returning_value(return_value: Any) -> Callable[..., Awaitable]:
async def coroutine_func(*args, **kwargs):
return return_value
return coroutine_func
-def coro_func_raising_exc(exc):
+def coro_func_raising_exc(exc: Exception) -> Callable[..., Awaitable]:
async def coroutine_func(*args, **kwargs):
raise exc
return coroutine_func
1
0
[stem/master] Implement setting docstrings of `stem.control.Controller` methods
by atagar@torproject.org 15 Jul '20
by atagar@torproject.org 15 Jul '20
15 Jul '20
commit 26d2222480d6b10ef57097c9f40e44112408d1c1
Author: Illia Volochii <illia.volochii(a)gmail.com>
Date: Sun May 17 20:10:22 2020 +0300
Implement setting docstrings of `stem.control.Controller` methods
---
stem/control.py | 80 +++++++++++++++++++++++++++++++++++++++++++++++++++++++--
1 file changed, 78 insertions(+), 2 deletions(-)
diff --git a/stem/control.py b/stem/control.py
index bdcdfb75..6e15c16c 100644
--- a/stem/control.py
+++ b/stem/control.py
@@ -3861,7 +3861,18 @@ class AsyncController(BaseController):
return (set_events, failed_events)
+def _set_doc_from_async_controller(func: Callable) -> Callable:
+ func.__doc__ = getattr(AsyncController, func.__name__).__doc__
+ return func
+
+
class Controller(_BaseControllerSocketMixin, stem.util.AsyncClassWrapper):
+ """
+ Connection with Tor's control socket. This wraps
+ :class:`~stem.control.AsyncController` to provide a synchronous
+ interface and for backwards compatibility.
+ """
+
@classmethod
def from_port(cls: Type, address: str = '127.0.0.1', port: Union[int, str] = 'default') -> 'stem.control.Controller':
"""
@@ -3915,198 +3926,263 @@ class Controller(_BaseControllerSocketMixin, stem.util.AsyncClassWrapper):
self._wrapped_instance: AsyncController = self._init_async_class(AsyncController, control_socket, is_authenticated)
self._socket = self._wrapped_instance._socket
+ @_set_doc_from_async_controller
def msg(self, message: str) -> stem.response.ControlMessage:
return self._execute_async_method('msg', message)
+ @_set_doc_from_async_controller
def is_authenticated(self) -> bool:
return self._wrapped_instance.is_authenticated()
+ @_set_doc_from_async_controller
def connect(self) -> None:
self._execute_async_method('connect')
+ @_set_doc_from_async_controller
def reconnect(self, *args: Any, **kwargs: Any) -> None:
self._execute_async_method('reconnect', *args, **kwargs)
+ @_set_doc_from_async_controller
def close(self) -> None:
self._execute_async_method('close')
+ @_set_doc_from_async_controller
def get_latest_heartbeat(self) -> float:
return self._wrapped_instance.get_latest_heartbeat()
+ @_set_doc_from_async_controller
def add_status_listener(self, callback: Callable[['stem.control.BaseController', 'stem.control.State', float], None], spawn: bool = True) -> None:
self._wrapped_instance.add_status_listener(callback, spawn)
+ @_set_doc_from_async_controller
def remove_status_listener(self, callback: Callable[['stem.control.Controller', 'stem.control.State', float], None]) -> bool:
self._wrapped_instance.remove_status_listener(callback)
+ @_set_doc_from_async_controller
def authenticate(self, *args: Any, **kwargs: Any) -> None:
self._execute_async_method('authenticate', *args, **kwargs)
+ @_set_doc_from_async_controller
def get_info(self, params: Union[str, Sequence[str]], default: Any = UNDEFINED, get_bytes: bool = False) -> Union[str, Dict[str, str]]:
return self._execute_async_method('get_info', params, default, get_bytes)
+ @_set_doc_from_async_controller
def get_version(self, default: Any = UNDEFINED) -> stem.version.Version:
return self._execute_async_method('get_version', default)
+ @_set_doc_from_async_controller
def get_exit_policy(self, default: Any = UNDEFINED) -> stem.exit_policy.ExitPolicy:
return self._execute_async_method('get_exit_policy', default)
+ @_set_doc_from_async_controller
def get_ports(self, listener_type: 'stem.control.Listener', default: Any = UNDEFINED) -> Sequence[int]:
return self._execute_async_method('get_ports', listener_type, default)
+ @_set_doc_from_async_controller
def get_listeners(self, listener_type: 'stem.control.Listener', default: Any = UNDEFINED) -> Sequence[Tuple[str, int]]:
return self._execute_async_method('get_listeners', listener_type, default)
+ @_set_doc_from_async_controller
def get_accounting_stats(self, default: Any = UNDEFINED) -> 'stem.control.AccountingStats':
return self._execute_async_method('get_accounting_stats', default)
+ @_set_doc_from_async_controller
def get_protocolinfo(self, default: Any = UNDEFINED) -> stem.response.protocolinfo.ProtocolInfoResponse:
return self._execute_async_method('get_protocolinfo', default)
+ @_set_doc_from_async_controller
def get_user(self, default: Any = UNDEFINED) -> str:
return self._execute_async_method('get_user', default)
+ @_set_doc_from_async_controller
def get_pid(self, default: Any = UNDEFINED) -> int:
return self._execute_async_method('get_pid', default)
+ @_set_doc_from_async_controller
def get_start_time(self, default: Any = UNDEFINED) -> float:
return self._execute_async_method('get_start_time', default)
+ @_set_doc_from_async_controller
def get_uptime(self, default: Any = UNDEFINED) -> float:
return self._execute_async_method('get_uptime', default)
+ @_set_doc_from_async_controller
def is_user_traffic_allowed(self) -> 'stem.control.UserTrafficAllowed':
return self._execute_async_method('is_user_traffic_allowed')
+ @_set_doc_from_async_controller
def get_microdescriptor(self, relay: Optional[str] = None, default: Any = UNDEFINED) -> stem.descriptor.microdescriptor.Microdescriptor:
return self._execute_async_method('get_microdescriptor', relay, default)
+ @_set_doc_from_async_controller
def get_microdescriptors(self, default: Any = UNDEFINED) -> Iterator[stem.descriptor.microdescriptor.Microdescriptor]:
return self._execute_async_generator_method('get_microdescriptors', default)
+ @_set_doc_from_async_controller
def get_server_descriptor(self, relay: Optional[str] = None, default: Any = UNDEFINED) -> stem.descriptor.server_descriptor.RelayDescriptor:
return self._execute_async_method('get_server_descriptor', relay, default)
+ @_set_doc_from_async_controller
def get_server_descriptors(self, default: Any = UNDEFINED) -> Iterator[stem.descriptor.server_descriptor.RelayDescriptor]:
return self._execute_async_generator_method('get_server_descriptors', default)
+ @_set_doc_from_async_controller
def get_network_status(self, relay: Optional[str] = None, default: Any = UNDEFINED) -> stem.descriptor.router_status_entry.RouterStatusEntryV3:
return self._execute_async_method('get_network_status', relay, default)
+ @_set_doc_from_async_controller
def get_network_statuses(self, default: Any = UNDEFINED) -> Iterator[stem.descriptor.router_status_entry.RouterStatusEntryV3]:
return self._execute_async_generator_method('get_network_statuses', default)
+ @_set_doc_from_async_controller
def get_hidden_service_descriptor(self, address: str, default: Any = UNDEFINED, servers: Optional[Sequence[str]] = None, await_result: bool = True, timeout: Optional[float] = None) -> stem.descriptor.hidden_service.HiddenServiceDescriptorV2:
return self._execute_async_method('get_hidden_service_descriptor', address, default, servers, await_result, timeout)
+ @_set_doc_from_async_controller
def get_conf(self, param: str, default: Any = UNDEFINED, multiple: bool = False) -> Union[str, Sequence[str]]:
return self._execute_async_method('get_conf', param, default, multiple)
+ @_set_doc_from_async_controller
def get_conf_map(self, params: Union[str, Sequence[str]], default: Any = UNDEFINED, multiple: bool = True) -> Dict[str, Union[str, Sequence[str]]]:
return self._execute_async_method('get_conf_map', params, default, multiple)
+ @_set_doc_from_async_controller
def is_set(self, param: str, default: Any = UNDEFINED) -> bool:
return self._execute_async_method('is_set', param, default)
+ @_set_doc_from_async_controller
def set_conf(self, param: str, value: Union[str, Sequence[str]]) -> None:
self._execute_async_method('set_conf', param, value)
+ @_set_doc_from_async_controller
def reset_conf(self, *params: str) -> None:
self._execute_async_method('reset_conf', *params)
+ @_set_doc_from_async_controller
def set_options(self, params: Union[Mapping[str, Union[str, Sequence[str]]], Sequence[Tuple[str, Union[str, Sequence[str]]]]], reset: bool = False) -> None:
self._execute_async_method('set_options', params, reset)
+ @_set_doc_from_async_controller
def get_hidden_service_conf(self, default: Any = UNDEFINED) -> Dict[str, Any]:
return self._execute_async_method('get_hidden_service_conf', default)
+ @_set_doc_from_async_controller
def set_hidden_service_conf(self, conf: Mapping[str, Any]) -> None:
self._execute_async_method('set_hidden_service_conf', conf)
+ @_set_doc_from_async_controller
def create_hidden_service(self, path: str, port: int, target_address: Optional[str] = None, target_port: Optional[int] = None, auth_type: Optional[str] = None, client_names: Optional[Sequence[str]] = None) -> 'stem.control.CreateHiddenServiceOutput':
return self._execute_async_method('create_hidden_service', path, port, target_address, target_port, auth_type, client_names)
+ @_set_doc_from_async_controller
def remove_hidden_service(self, path: str, port: Optional[int] = None) -> bool:
return self._execute_async_method('remove_hidden_service', path, port)
+ @_set_doc_from_async_controller
def list_ephemeral_hidden_services(self, default: Any = UNDEFINED, our_services: bool = True, detached: bool = False) -> Sequence[str]:
return self._execute_async_method('list_ephemeral_hidden_services', default, our_services, detached)
+ @_set_doc_from_async_controller
def create_ephemeral_hidden_service(self, ports: Union[int, Sequence[int], Mapping[int, str]], key_type: str = 'NEW', key_content: str = 'BEST', discard_key: bool = False, detached: bool = False, await_publication: bool = False, timeout: Optional[float] = None, basic_auth: Optional[Mapping[str, str]] = None, max_streams: Optional[int] = None) -> stem.response.add_onion.AddOnionResponse:
return self._execute_async_method('create_ephemeral_hidden_service', ports, key_type, key_content, discard_key, detached, await_publication, timeout, basic_auth, max_streams)
+ @_set_doc_from_async_controller
def remove_ephemeral_hidden_service(self, service_id: str) -> bool:
return self._execute_async_method('remove_ephemeral_hidden_service', service_id)
- def add_event_listener(self, listener: Callable[[stem.response.events.Event], None], *events: 'stem.control.EventType') -> None:
+ @_set_doc_from_async_controller
+ def add_event_listener(self, listener: Callable[[stem.response.events.Event], Union[None, Awaitable[None]]], *events: 'stem.control.EventType') -> None:
self._execute_async_method('add_event_listener', listener, *events)
- def remove_event_listener(self, listener: Callable[[stem.response.events.Event], None]) -> None:
+ @_set_doc_from_async_controller
+ def remove_event_listener(self, listener: Callable[[stem.response.events.Event], Union[None, Awaitable[None]]]) -> None:
self._execute_async_method('remove_event_listener', listener)
+ @_set_doc_from_async_controller
def is_caching_enabled(self) -> bool:
return self._wrapped_instance.is_caching_enabled()
+ @_set_doc_from_async_controller
def set_caching(self, enabled: bool) -> None:
self._wrapped_instance.set_caching(enabled)
+ @_set_doc_from_async_controller
def clear_cache(self) -> None:
self._wrapped_instance.clear_cache()
+ @_set_doc_from_async_controller
def load_conf(self, configtext: str) -> None:
self._execute_async_method('load_conf', configtext)
+ @_set_doc_from_async_controller
def save_conf(self, force: bool = False) -> None:
return self._execute_async_method('save_conf', force)
+ @_set_doc_from_async_controller
def is_feature_enabled(self, feature: str) -> bool:
return self._wrapped_instance.is_feature_enabled(feature)
+ @_set_doc_from_async_controller
def enable_feature(self, features: Union[str, Sequence[str]]) -> None:
self._wrapped_instance.enable_feature(features)
+ @_set_doc_from_async_controller
def get_circuit(self, circuit_id: int, default: Any = UNDEFINED) -> stem.response.events.CircuitEvent:
return self._execute_async_method('get_circuit', circuit_id, default)
+ @_set_doc_from_async_controller
def get_circuits(self, default: Any = UNDEFINED) -> List[stem.response.events.CircuitEvent]:
return self._execute_async_method('get_circuits', default)
+ @_set_doc_from_async_controller
def new_circuit(self, path: Union[None, str, Sequence[str]] = None, purpose: str = 'general', await_build: bool = False, timeout: Optional[float] = None) -> str:
return self._execute_async_method('new_circuit', path, purpose, await_build, timeout)
+ @_set_doc_from_async_controller
def extend_circuit(self, circuit_id: str = '0', path: Union[None, str, Sequence[str]] = None, purpose: str = 'general', await_build: bool = False, timeout: Optional[float] = None) -> str:
return self._execute_async_method('extend_circuit', circuit_id, path, purpose, await_build, timeout)
+ @_set_doc_from_async_controller
def repurpose_circuit(self, circuit_id: str, purpose: str) -> None:
self._execute_async_method('repurpose_circuit', circuit_id, purpose)
+ @_set_doc_from_async_controller
def close_circuit(self, circuit_id: str, flag: str = '') -> None:
self._execute_async_method('close_circuit', circuit_id, flag)
+ @_set_doc_from_async_controller
def get_streams(self, default: Any = UNDEFINED) -> List[stem.response.events.StreamEvent]:
return self._execute_async_method('get_streams', default)
+ @_set_doc_from_async_controller
def attach_stream(self, stream_id: str, circuit_id: str, exiting_hop: Optional[int] = None) -> None:
self._execute_async_method('attach_stream', stream_id, circuit_id, exiting_hop)
+ @_set_doc_from_async_controller
def close_stream(self, stream_id: str, reason: stem.RelayEndReason = stem.RelayEndReason.MISC, flag: str = '') -> None:
self._execute_async_method('close_stream', stream_id, reason, flag)
+ @_set_doc_from_async_controller
def signal(self, signal: stem.Signal) -> None:
self._execute_async_method('signal', signal)
+ @_set_doc_from_async_controller
def is_newnym_available(self) -> bool:
return self._wrapped_instance.is_newnym_available()
+ @_set_doc_from_async_controller
def get_newnym_wait(self) -> float:
return self._wrapped_instance.get_newnym_wait()
+ @_set_doc_from_async_controller
def get_effective_rate(self, default: Any = UNDEFINED, burst: bool = False) -> int:
return self._execute_async_method('get_effective_rate', default, burst)
+ @_set_doc_from_async_controller
def map_address(self, mapping: Mapping[str, str]) -> Dict[str, str]:
return self._execute_async_method('map_address', mapping)
+ @_set_doc_from_async_controller
def drop_guards(self) -> None:
self._execute_async_method('drop_guards')
1
0
15 Jul '20
commit cfd1f3d74f0dbb118c948d8520437498b5a30465
Author: Illia Volochii <illia.volochii(a)gmail.com>
Date: Sun May 17 19:06:25 2020 +0300
Update docstrings in stem/connection.py
---
stem/connection.py | 115 ++++++++++++++++++++++++++++++++++++++---------------
1 file changed, 82 insertions(+), 33 deletions(-)
diff --git a/stem/connection.py b/stem/connection.py
index 8f57f3b3..de76a345 100644
--- a/stem/connection.py
+++ b/stem/connection.py
@@ -22,60 +22,74 @@ exceptions, etc).
if not controller:
sys.exit(1) # unable to get a connection
- print 'Tor is running version %s' % controller.get_version()
+ print(f'Tor is running version {controller.get_version()}')
controller.close()
::
% python example.py
- Tor is running version 0.2.4.10-alpha-dev (git-8be6058d8f31e578)
+ Tor is running version 0.4.3.5
... or if Tor isn't running...
::
% python example.py
- [Errno 111] Connection refused
+ [Errno 111] Connect call failed
The :func:`~stem.connection.authenticate` function, however, gives easy but
fine-grained control over the authentication process. For instance...
::
- import sys
+ import asyncio
import getpass
+ import sys
+
import stem.connection
import stem.socket
- try:
- control_socket = stem.socket.ControlPort(port = 9051)
- except stem.SocketError as exc:
- print 'Unable to connect to port 9051 (%s)' % exc
- sys.exit(1)
- try:
- stem.connection.authenticate(control_socket)
- except stem.connection.IncorrectSocketType:
- print 'Please check in your torrc that 9051 is the ControlPort.'
- print 'Maybe you configured it to be the ORPort or SocksPort instead?'
- sys.exit(1)
- except stem.connection.MissingPassword:
- controller_password = getpass.getpass('Controller password: ')
+ async def authenticate() -> None:
+ try:
+ control_socket = stem.socket.ControlPort(port=9051)
+ await control_socket.connect()
+ except stem.SocketError as exc:
+ print(f'Unable to connect to port 9051 ({exc})')
+ sys.exit(1)
try:
- stem.connection.authenticate_password(control_socket, controller_password)
- except stem.connection.PasswordAuthFailed:
- print 'Unable to authenticate, password is incorrect'
+ await stem.connection.authenticate(control_socket)
+ except stem.connection.IncorrectSocketType:
+ print('Please check in your torrc that 9051 is the ControlPort.')
+ print('Maybe you configured it to be the ORPort or SocksPort instead?')
+ sys.exit(1)
+ except stem.connection.MissingPassword:
+ controller_password = getpass.getpass('Controller password: ')
+
+ try:
+ await stem.connection.authenticate_password(control_socket, controller_password)
+ except stem.connection.PasswordAuthFailed:
+ print('Unable to authenticate, password is incorrect')
+ sys.exit(1)
+ except stem.connection.AuthenticationFailure as exc:
+ print(f'Unable to authenticate: {exc}')
sys.exit(1)
- except stem.connection.AuthenticationFailure as exc:
- print 'Unable to authenticate: %s' % exc
- sys.exit(1)
+
+
+ if __name__ == '__main__':
+ loop = asyncio.get_event_loop()
+ try:
+ loop.run_until_complete(authenticate())
+ finally:
+ loop.close()
**Module Overview:**
::
- connect - Simple method for getting authenticated control connection
+ connect - Simple method for getting authenticated control connection for synchronous usage.
+ async_connect - Simple method for getting authenticated control connection for asynchronous usage.
authenticate - Main method for authenticating to a control socket
authenticate_none - Authenticates to an open control socket
@@ -215,10 +229,10 @@ COMMON_TOR_COMMANDS = (
def connect(control_port: Tuple[str, Union[str, int]] = ('127.0.0.1', 'default'), control_socket: str = '/var/run/tor/control', password: Optional[str] = None, password_prompt: bool = False, chroot_path: Optional[str] = None, controller: Type = stem.control.Controller) -> Any:
"""
- Convenience function for quickly getting a control connection. This is very
- handy for debugging or CLI setup, handling setup and prompting for a password
- if necessary (and none is provided). If any issues arise this prints a
- description of the problem and returns **None**.
+ Convenience function for quickly getting a control connection for synchronous
+ usage. This is very handy for debugging or CLI setup, handling setup and
+ prompting for a password if necessary (and none is provided). If any issues
+ arise this prints a description of the problem and returns **None**.
If both a **control_port** and **control_socket** are provided then the
**control_socket** is tried first, and this provides a generic error message
@@ -243,8 +257,8 @@ def connect(control_port: Tuple[str, Union[str, int]] = ('127.0.0.1', 'default')
:param password_prompt: prompt for the controller password if it wasn't
supplied
:param chroot_path: path prefix if in a chroot environment
- :param controller: :class:`~stem.control.BaseController` subclass to be
- returned, this provides a :class:`~stem.socket.ControlSocket` if **None**
+ :param controller: :class:`~stem.control.Controller` subclass to be
+ returned
:returns: authenticated control connection, the type based on the controller argument
@@ -272,7 +286,41 @@ def connect(control_port: Tuple[str, Union[str, int]] = ('127.0.0.1', 'default')
raise
-async def connect_async(control_port = ('127.0.0.1', 'default'), control_socket = '/var/run/tor/control', password = None, password_prompt = False, chroot_path = None, controller = stem.control.AsyncController):
+async def connect_async(control_port: Tuple[str, Union[str, int]] = ('127.0.0.1', 'default'), control_socket: str = '/var/run/tor/control', password: Optional[str] = None, password_prompt: bool = False, chroot_path: Optional[str] = None, controller: Type[stem.control.BaseController] = stem.control.AsyncController) -> Any:
+ """
+ Convenience function for quickly getting a control connection for
+ asynchronous usage. This is very handy for debugging or CLI setup, handling
+ setup and prompting for a password if necessary (and none is provided). If
+ any issues arise this prints a description of the problem and returns
+ **None**.
+
+ If both a **control_port** and **control_socket** are provided then the
+ **control_socket** is tried first, and this provides a generic error message
+ if they're both unavailable.
+
+ In much the same vein as git porcelain commands, users should not rely on
+ details of how this works. Messages and details of this function's behavior
+ could change in the future.
+
+ If the **port** is **'default'** then this checks on both 9051 (default for
+ relays) and 9151 (default for the Tor Browser). This default may change in
+ the future.
+
+ :param control_port: address and port tuple, for instance **('127.0.0.1', 9051)**
+ :param control_socket: path where the control socket is located
+ :param password: passphrase to authenticate to the socket
+ :param password_prompt: prompt for the controller password if it wasn't
+ supplied
+ :param chroot_path: path prefix if in a chroot environment
+ :param controller: :class:`~stem.control.BaseController` subclass to be
+ returned
+
+ :returns: authenticated control connection, the type based on the controller argument
+
+ :raises: **ValueError** if given an invalid control_port, or both
+ **control_port** and **control_socket** are **None**
+ """
+
if controller and not issubclass(controller, stem.control.BaseController):
raise ValueError('The provided controller should be a stem.control.BaseController subclass.')
return await _connect_async(control_port, control_socket, password, password_prompt, chroot_path, controller)
@@ -339,8 +387,9 @@ async def _connect_auth(control_socket: stem.socket.ControlSocket, password: str
:param password_prompt: prompt for the controller password if it wasn't
supplied
:param chroot_path: path prefix if in a chroot environment
- :param controller: :class:`~stem.control.BaseController` subclass to be
- returned, this provides a :class:`~stem.socket.ControlSocket` if **None**
+ :param controller: :class:`~stem.control.BaseController` or
+ :class:`~stem.control.Controller` subclass to be returned, this provides a
+ :class:`~stem.socket.ControlSocket` if **None**
:returns: authenticated control connection, the type based on the controller argument
"""
1
0
commit 341dc930bf89aed8567cad255b333b36576a5f06
Author: Damian Johnson <atagar(a)torproject.org>
Date: Sat Mar 21 17:12:29 2020 -0700
Prototype Controller asyncio usage
Still very much an experiment. Several things are broken, but this adjusts our
Controller to successfully wrap an asyncio socket.
from stem.control import Controller
controller = Controller.from_port()
controller.connect()
controller.authenticate()
print("Tor is running version %s" % controller.get_version())
controller.close()
----------------------------------------
% python3.7 demo.py
Tor is running version 0.4.3.0-alpha-dev (git-5e70c27e8560ac18)
Essentially this replaces our reader daemon with an asyncio loop. Synchronous
code can invoke this loop in the following ways...
* Non-blocking call of a coroutine
self._asyncio_loop.create_task(my_coroutine())
* Blocking call of a coroutine
result = asyncio.run_coroutine_threadsafe(my_coroutine(), self._asyncio_loop).result(timeout)
* Non-blocking call of a function
self._asyncio_loop.call_soon_threadsafe(my_function)
I'm not quite sold yet on whether we'll go in this direction. An asynchronous socket
has two potential advantages...
* Simplify multi-threading
* Offer asynchronous variants of our methods
Either or both of these might be dead ends. I'll need to experiment more, but
now that we've seen this work in this limited way we have something to build
off of.
---
stem/control.py | 56 ++++++++++++++++++++++++++++++++------------------------
1 file changed, 32 insertions(+), 24 deletions(-)
diff --git a/stem/control.py b/stem/control.py
index ec88aa38..2d33bd54 100644
--- a/stem/control.py
+++ b/stem/control.py
@@ -240,13 +240,13 @@ If you're fine with allowing your script to raise exceptions then this can be mo
=============== ===========
"""
+import asyncio
import calendar
import collections
import functools
import inspect
import io
import os
-import queue
import threading
import time
@@ -544,14 +544,21 @@ class BaseController(object):
def __init__(self, control_socket: stem.socket.ControlSocket, is_authenticated: bool = False) -> None:
self._socket = control_socket
+
+ self._asyncio_loop = asyncio.get_event_loop()
+
+ self._asyncio_thread = threading.Thread(target = self._asyncio_loop.run_forever, name = 'asyncio')
+ self._asyncio_thread.setDaemon(True)
+ self._asyncio_thread.start()
+
self._msg_lock = threading.RLock()
self._status_listeners = [] # type: List[Tuple[Callable[[stem.control.BaseController, stem.control.State, float], None], bool]] # tuples of the form (callback, spawn_thread)
self._status_listeners_lock = threading.RLock()
# queues where incoming messages are directed
- self._reply_queue = queue.Queue() # type: queue.Queue[Union[stem.response.ControlMessage, stem.ControllerError]]
- self._event_queue = queue.Queue() # type: queue.Queue[stem.response.ControlMessage]
+ self._reply_queue = asyncio.Queue() # type: asyncio.Queue[Union[stem.response.ControlMessage, stem.ControllerError]]
+ self._event_queue = asyncio.Queue() # type: asyncio.Queue[stem.response.ControlMessage]
# thread to continually pull from the control socket
self._reader_thread = None # type: Optional[threading.Thread]
@@ -637,15 +644,16 @@ class BaseController(object):
log.info('Socket experienced a problem (%s)' % response)
elif isinstance(response, stem.response.ControlMessage):
log.info('Failed to deliver a response: %s' % response)
- except queue.Empty:
+ except asyncio.QueueEmpty:
# the empty() method is documented to not be fully reliable so this
# isn't entirely surprising
break
try:
- self._socket.send(message)
- response = self._reply_queue.get()
+ self._asyncio_loop.create_task(self._socket.send(message))
+
+ response = asyncio.run_coroutine_threadsafe(self._reply_queue.get(), self._asyncio_loop).result()
# If the message we received back had an exception then re-raise it to the
# caller. Otherwise return the response.
@@ -716,7 +724,7 @@ class BaseController(object):
:raises: :class:`stem.SocketError` if unable to make a socket
"""
- self._socket.connect()
+ asyncio.run_coroutine_threadsafe(self._socket.connect(), self._asyncio_loop).result()
def close(self) -> None:
"""
@@ -724,7 +732,7 @@ class BaseController(object):
:func:`~stem.socket.BaseSocket.close` method.
"""
- self._socket.close()
+ asyncio.run_coroutine_threadsafe(self._socket.close(), self._asyncio_loop).result()
# Join on any outstanding state change listeners. Closing is a state change
# of its own, so if we have any listeners it's quite likely there's some
@@ -737,6 +745,9 @@ class BaseController(object):
if t.is_alive() and threading.current_thread() != t:
t.join()
+ self._asyncio_loop.call_soon_threadsafe(self._asyncio_loop.stop)
+ self._asyncio_thread.join()
+
def get_socket(self) -> stem.socket.ControlSocket:
"""
Provides the socket used to speak with the tor process. Communicating with
@@ -825,13 +836,13 @@ class BaseController(object):
pass
- def _connect(self) -> None:
+ async def _connect(self) -> None:
self._launch_threads()
self._notify_status_listeners(State.INIT)
- self._socket_connect()
+ await self._socket_connect()
self._is_authenticated = False
- def _close(self) -> None:
+ async def _close(self) -> None:
# Our is_alive() state is now false. Our reader thread should already be
# awake from recv() raising a closure exception. Wake up the event thread
# too so it can end.
@@ -847,7 +858,7 @@ class BaseController(object):
self._notify_status_listeners(State.CLOSED)
- self._socket_close()
+ await self._socket_close()
def _post_authentication(self) -> None:
# actions to be taken after we have a newly authenticated connection
@@ -904,21 +915,18 @@ class BaseController(object):
them if we're restarted.
"""
+ self._asyncio_loop.create_task(self._reader_loop())
+
# In theory concurrent calls could result in multiple start() calls on a
# single thread, which would cause an unexpected exception. Best be safe.
with self._socket._get_send_lock():
- if not self._reader_thread or not self._reader_thread.is_alive():
- self._reader_thread = threading.Thread(target = self._reader_loop, name = 'Tor listener')
- self._reader_thread.setDaemon(True)
- self._reader_thread.start()
-
if not self._event_thread or not self._event_thread.is_alive():
self._event_thread = threading.Thread(target = self._event_loop, name = 'Event notifier')
self._event_thread.setDaemon(True)
self._event_thread.start()
- def _reader_loop(self) -> None:
+ async def _reader_loop(self) -> None:
"""
Continually pulls from the control socket, directing the messages into
queues based on their type. Controller messages come in two varieties...
@@ -929,23 +937,23 @@ class BaseController(object):
while self.is_alive():
try:
- control_message = self._socket.recv()
+ control_message = await self._socket.recv()
self._last_heartbeat = time.time()
if control_message.content()[-1][0] == '650':
# asynchronous message, adds to the event queue and wakes up its handler
- self._event_queue.put(control_message)
+ await self._event_queue.put(control_message)
self._event_notice.set()
else:
# response to a msg() call
- self._reply_queue.put(control_message)
+ await self._reply_queue.put(control_message)
except stem.ControllerError as exc:
# Assume that all exceptions belong to the reader. This isn't always
# true, but the msg() call can do a better job of sorting it out.
#
# Be aware that the msg() method relies on this to unblock callers.
- self._reply_queue.put(exc)
+ await self._reply_queue.put(exc)
def _event_loop(self) -> None:
"""
@@ -970,7 +978,7 @@ class BaseController(object):
socket_closed_at = time.time()
elif time.time() - socket_closed_at > EVENTS_LISTENING_TIMEOUT:
break
- except queue.Empty:
+ except asyncio.queues.QueueEmpty:
if not self.is_alive():
break
@@ -3972,7 +3980,7 @@ def _get_with_timeout(event_queue: queue.Queue, timeout: float, start_time: floa
try:
return event_queue.get(True, time_left)
- except queue.Empty:
+ except asyncio.queues.QueueEmpty:
raise stem.Timeout('Reached our %0.1f second timeout' % timeout)
else:
return event_queue.get()
1
0
[stem/master] Revert "Add temporary scripts for testing async communication with sockets"
by atagar@torproject.org 15 Jul '20
by atagar@torproject.org 15 Jul '20
15 Jul '20
commit e6974dd5bbb804a6b98d172c2eb55206f6a47a56
Author: Illia Volochii <illia.volochii(a)gmail.com>
Date: Sat Mar 14 22:05:15 2020 +0200
Revert "Add temporary scripts for testing async communication with sockets"
This reverts commit 072b5e682faa6d190fa8cd4cc52651fdc10b3e01.
---
test_control_port.py | 30 ------------------------------
test_control_socket_file.py | 30 ------------------------------
test_relay_socket.py | 28 ----------------------------
3 files changed, 88 deletions(-)
diff --git a/test_control_port.py b/test_control_port.py
deleted file mode 100644
index 85fe26d4..00000000
--- a/test_control_port.py
+++ /dev/null
@@ -1,30 +0,0 @@
-import asyncio
-
-from stem.async_socket import ControlPort
-
-
-async def run_command(i, command):
- async with ControlPort() as cp:
- print(f'{i} Connecting')
- await cp.connect()
- print(f'{i} Authenticating')
- await cp.send('AUTHENTICATE "password"')
- print(f'{i} Receiving auth message')
- await cp.recv()
- print(f'{i} Sending the command')
- await cp.send(command)
- print(f'{i} Receiving result of the command')
- result = await cp.recv()
- print(f'{i} {result.content()}')
-
-
-if __name__ == '__main__':
- loop = asyncio.get_event_loop()
- tasks = asyncio.gather(
- run_command(1, 'PROTOCOLINFO 1'),
- run_command(2, 'GETINFO traffic/read'),
- )
- try:
- loop.run_until_complete(tasks)
- finally:
- loop.close()
diff --git a/test_control_socket_file.py b/test_control_socket_file.py
deleted file mode 100644
index 4c6816f4..00000000
--- a/test_control_socket_file.py
+++ /dev/null
@@ -1,30 +0,0 @@
-import asyncio
-
-from stem.async_socket import ControlSocketFile
-
-
-async def run_command(i, command):
- async with ControlSocketFile() as cp:
- print(f'{i} Connecting')
- await cp.connect()
- print(f'{i} Authenticating')
- await cp.send('AUTHENTICATE "password"')
- print(f'{i} Receiving auth message')
- await cp.recv()
- print(f'{i} Sending the command')
- await cp.send(command)
- print(f'{i} Receiving result of the command')
- result = await cp.recv()
- print(f'{i} {result.content()}')
-
-
-if __name__ == '__main__':
- loop = asyncio.get_event_loop()
- tasks = asyncio.gather(
- run_command(1, 'PROTOCOLINFO 1'),
- run_command(2, 'GETINFO traffic/read'),
- )
- try:
- loop.run_until_complete(tasks)
- finally:
- loop.close()
diff --git a/test_relay_socket.py b/test_relay_socket.py
deleted file mode 100644
index 76289d46..00000000
--- a/test_relay_socket.py
+++ /dev/null
@@ -1,28 +0,0 @@
-import asyncio
-
-from stem.client import DEFAULT_LINK_PROTOCOLS
-from stem.client.cell import VersionsCell
-from stem.async_socket import RelaySocket
-
-
-async def run_command(i, command):
- async with RelaySocket(address='127.0.0.1', port=443) as cp:
- print(f'{i} Connecting')
- await cp.connect()
- print(f'{i} Sending the command')
- await cp.send(command)
- print(f'{i} Receiving result of the command')
- result = await cp.recv(2)
- print(result)
-
-
-if __name__ == '__main__':
- loop = asyncio.get_event_loop()
- tasks = asyncio.gather(
- run_command(1, VersionsCell(DEFAULT_LINK_PROTOCOLS).pack(2)),
- run_command(2, VersionsCell(DEFAULT_LINK_PROTOCOLS).pack(2)),
- )
- try:
- loop.run_until_complete(tasks)
- finally:
- loop.close()
1
0