commit 6a7ebd9f12f89aa359d2df32cb47a44707a61008
Author: Damian Johnson <atagar@torproject.org>
Date:   Sun Nov 1 15:15:47 2020 -0800
Cache parsed descriptors within Query
Our Query class cached the bytes we download rather than parsed descriptors. This could be advantageous if a user downloads descriptors without caring about the results (unlikely), but otherwise it's all downside...
* Slower: The Query class downloads asynchronously so we can parallelize. By parsing when the results are requested we serialize that part of the runtime.
* Memory: Caching bytes reduces the upfront memory usage, but multiplies it upon retrieving the results because we create fresh Descriptor objects on each invocation.
* Duplication: Each invocation of our run method re-parsed the descriptors. For larger documents like the consensus this duplicates a lot of work.
* Complexity: Caching bytes needlessly complicated the run method.
---
 stem/descriptor/remote.py | 67 +++++++++++++++++++----------------------------
 1 file changed, 27 insertions(+), 40 deletions(-)
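The tradeoff described in the bullets above is easy to demonstrate in miniature. The following sketch is illustrative only (the parse() helper and class names are hypothetical, not stem's API): caching raw bytes forces a re-parse on every retrieval, while caching parsed objects pays the parsing cost exactly once.

    import io
    from typing import List

    def parse(stream: io.BytesIO) -> List[str]:
      # stand-in for a real parser such as stem.descriptor.parse_file()
      return [line.decode().strip() for line in stream]

    class BytesCache:
      """Caches the raw download; every retrieval re-parses."""

      def __init__(self, raw: bytes) -> None:
        self.content = raw

      def results(self) -> List[str]:
        return parse(io.BytesIO(self.content))  # fresh objects each call

    class ParsedCache:
      """Parses once up front; retrievals return the same objects."""

      def __init__(self, raw: bytes) -> None:
        self.downloaded = parse(io.BytesIO(raw))

      def results(self) -> List[str]:
        return self.downloaded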
diff --git a/stem/descriptor/remote.py b/stem/descriptor/remote.py
index e8367e8b..136b9d15 100644
--- a/stem/descriptor/remote.py
+++ b/stem/descriptor/remote.py
@@ -338,9 +338,8 @@ class Query(Synchronous):
   :var bool fall_back_to_authority: when retrying request issues the last
     request to a directory authority if **True**
 
-  :var str content: downloaded descriptor content
+  :var list downloaded: downloaded descriptors, **None** if not yet retrieved
   :var Exception error: exception if a problem occured
-  :var bool is_done: flag that indicates if our request has finished
 
   :var float start_time: unix timestamp when we first started running
   :var dict reply_headers: headers provided in the response,
@@ -413,9 +412,8 @@ class Query(Synchronous):
     self.retries = retries
     self.fall_back_to_authority = fall_back_to_authority
 
-    self.content = None  # type: Optional[bytes]
+    self.downloaded = None  # type: Optional[List[stem.descriptor.Descriptor]]
     self.error = None  # type: Optional[BaseException]
-    self.is_done = False
     self.download_url = None  # type: Optional[str]
 
     self.start_time = None  # type: Optional[float]
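With this change, parsed descriptors are cached on the new `downloaded` attribute rather than raw bytes on `content`. From a caller's perspective the interface is unchanged, but repeat retrievals no longer re-parse. For example (illustrative usage of stem's public API; assumes a reachable directory mirror):

    import stem.descriptor.remote

    query = stem.descriptor.remote.get_server_descriptors()

    # The first run() awaits the download and caches parsed descriptors.
    for desc in query.run():
      print(desc.fingerprint)

    # A second run() replays the cached Descriptor objects rather than
    # re-parsing the downloaded bytes.
    for desc in query.run():
      print(desc.nickname)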
@@ -470,8 +468,14 @@ class Query(Synchronous):
   async def _run(self, suppress: bool) -> AsyncIterator[stem.descriptor.Descriptor]:
     with self._downloader_lock:
-      self.start()
-      await self._downloader_task
+      if not self.downloaded and not self.error:
+        if not self._downloader_task:
+          self.start()
+
+        try:
+          self.downloaded = await self._downloader_task
+        except Exception as exc:
+          self.error = exc
 
     if self.error:
       if suppress:
         return
 
@@ -479,30 +483,8 @@ class Query(Synchronous):
       raise self.error
     else:
-      if self.content is None:
-        if suppress:
-          return
-
-        raise ValueError('BUG: _download_descriptors() finished without either results or an error')
-
-      try:
-        results = stem.descriptor.parse_file(
-          io.BytesIO(self.content),
-          self.descriptor_type,
-          validate = self.validate,
-          document_handler = self.document_handler,
-          **self.kwargs
-        )
-
-        for desc in results:
-          yield desc
-      except ValueError as exc:
-        self.error = exc  # encountered a parsing error
-
-        if suppress:
-          return
-
-        raise self.error
+      for desc in self.downloaded:
+        yield desc
 
   async def __aiter__(self) -> AsyncIterator[stem.descriptor.Descriptor]:
     async for desc in self._run(True):
       yield desc
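The rewritten _run() is a run-once guard: under the lock it starts the download task if needed, awaits it a single time, and caches either the results or the exception so that later iterations replay them. A minimal standalone sketch of the same pattern (hypothetical OnceTask class, not stem code):

    import asyncio
    from typing import Awaitable, Callable, List, Optional

    class OnceTask:
      """Runs an async producer a single time, caching results or the error."""

      def __init__(self, producer: Callable[[], Awaitable[List[int]]]) -> None:
        self._producer = producer
        self._lock = asyncio.Lock()
        self.results: Optional[List[int]] = None
        self.error: Optional[BaseException] = None

      async def run(self) -> List[int]:
        async with self._lock:
          if self.results is None and self.error is None:
            try:
              self.results = await self._producer()
            except Exception as exc:
              self.error = exc

        if self.error:
          raise self.error  # replay the cached failure

        return self.results  # replay the cached results

    async def demo() -> None:
      async def produce() -> List[int]:
        await asyncio.sleep(0.1)
        return [1, 2, 3]

      task = OnceTask(produce)
      print(await task.run())  # runs the producer
      print(await task.run())  # served from cache

    asyncio.run(demo())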
@@ -526,7 +508,7 @@ class Query(Synchronous):
     else:
       return random.choice(self.endpoints)
 
-  async def _download_descriptors(self, retries: int, timeout: Optional[float]) -> None:
+  async def _download_descriptors(self, retries: int, timeout: Optional[float]) -> List['stem.descriptor.Descriptor']:
     self.start_time = time.time()
 
     retries = self.retries
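Returning the parsed list and raising on failure (rather than setting self.error and self.is_done) works because asyncio delivers a task's result or exception to whoever awaits it. A quick self-contained demonstration of that propagation:

    import asyncio

    async def download() -> bytes:
      raise IOError('connection refused')

    async def main() -> None:
      task = asyncio.create_task(download())

      try:
        await task  # the task's exception re-raises at the await site
      except IOError as exc:
        print('caught: %s' % exc)

    asyncio.run(main())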
@@ -545,17 +527,24 @@ class Query(Synchronous):
       try:
         response = await asyncio.wait_for(self._download_from(endpoint), time_remaining)
-        self.content, self.reply_headers = _http_body_and_headers(response)
+        content, self.reply_headers = _http_body_and_headers(response)
 
-        self.is_done = True
         self.runtime = time.time() - self.start_time
 
         log.trace('Descriptors retrieved from %s in %0.2fs' % (downloaded_from, self.runtime))
-        return
+
+        try:
+          return list(stem.descriptor.parse_file(
+            io.BytesIO(content),
+            self.descriptor_type,
+            validate = self.validate,
+            document_handler = self.document_handler,
+            **self.kwargs
+          ))
+        except ValueError:
+          raise  # parsing failed
       except asyncio.TimeoutError as exc:
-        self.is_done = True
-        self.error = stem.DownloadTimeout(downloaded_from, exc, sys.exc_info()[2], self.timeout)
-        return
+        raise stem.DownloadTimeout(downloaded_from, exc, sys.exc_info()[2], self.timeout)
       except:
         exception = sys.exc_info()[1]
         retries -= 1
 
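One subtlety in the hunk above: asyncio.wait_for() raises a bare asyncio.TimeoutError, which the code converts into stem.DownloadTimeout so callers receive a meaningful error. A sketch of that conversion pattern with hypothetical names (not stem's actual DownloadTimeout):

    import asyncio

    class DownloadTimeout(Exception):
      def __init__(self, url: str, timeout: float) -> None:
        super().__init__('Failed to download from %s within %0.1fs' % (url, timeout))

    async def slow_request(url: str) -> bytes:
      await asyncio.sleep(5)  # stand-in for a network round trip
      return b'descriptor content'

    async def fetch(url: str, timeout: float) -> bytes:
      try:
        return await asyncio.wait_for(slow_request(url), timeout)
      except asyncio.TimeoutError as exc:
        # re-raise as a domain-specific error, preserving the cause
        raise DownloadTimeout(url, timeout) from exc

    async def demo() -> None:
      try:
        await fetch('http://127.0.0.1/tor/server/all', 0.1)
      except DownloadTimeout as exc:
        print(exc)

    asyncio.run(demo())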
@@ -568,9 +557,7 @@ class Query(Synchronous):
       else:
         log.debug("Failed to download descriptors from '%s': %s" % (self.download_url, exception))
 
-        self.is_done = True
-        self.error = exception
-        return
+        raise
 
   async def _download_from(self, endpoint: stem.Endpoint) -> bytes:
     http_request = '\r\n'.join((