commit 26d2222480d6b10ef57097c9f40e44112408d1c1
Author: Illia Volochii <illia.volochii(a)gmail.com>
Date: Sun May 17 20:10:22 2020 +0300
Implement setting docstrings of `stem.control.Controller` methods
---
stem/control.py | 80 +++++++++++++++++++++++++++++++++++++++++++++++++++++++--
1 file changed, 78 insertions(+), 2 deletions(-)
diff --git a/stem/control.py b/stem/control.py
index bdcdfb75..6e15c16c 100644
--- a/stem/control.py
+++ b/stem/control.py
@@ -3861,7 +3861,18 @@ class AsyncController(BaseController):
return (set_events, failed_events)
+def _set_doc_from_async_controller(func: Callable) -> Callable:
+ func.__doc__ = getattr(AsyncController, func.__name__).__doc__
+ return func
+
+
class Controller(_BaseControllerSocketMixin, stem.util.AsyncClassWrapper):
+ """
+ Connection with Tor's control socket. This wraps
+ :class:`~stem.control.AsyncController` to provide a synchronous
+ interface and for backwards compatibility.
+ """
+
@classmethod
def from_port(cls: Type, address: str = '127.0.0.1', port: Union[int, str] = 'default') -> 'stem.control.Controller':
"""
@@ -3915,198 +3926,263 @@ class Controller(_BaseControllerSocketMixin, stem.util.AsyncClassWrapper):
self._wrapped_instance: AsyncController = self._init_async_class(AsyncController, control_socket, is_authenticated)
self._socket = self._wrapped_instance._socket
+ @_set_doc_from_async_controller
def msg(self, message: str) -> stem.response.ControlMessage:
return self._execute_async_method('msg', message)
+ @_set_doc_from_async_controller
def is_authenticated(self) -> bool:
return self._wrapped_instance.is_authenticated()
+ @_set_doc_from_async_controller
def connect(self) -> None:
self._execute_async_method('connect')
+ @_set_doc_from_async_controller
def reconnect(self, *args: Any, **kwargs: Any) -> None:
self._execute_async_method('reconnect', *args, **kwargs)
+ @_set_doc_from_async_controller
def close(self) -> None:
self._execute_async_method('close')
+ @_set_doc_from_async_controller
def get_latest_heartbeat(self) -> float:
return self._wrapped_instance.get_latest_heartbeat()
+ @_set_doc_from_async_controller
def add_status_listener(self, callback: Callable[['stem.control.BaseController', 'stem.control.State', float], None], spawn: bool = True) -> None:
self._wrapped_instance.add_status_listener(callback, spawn)
+ @_set_doc_from_async_controller
def remove_status_listener(self, callback: Callable[['stem.control.Controller', 'stem.control.State', float], None]) -> bool:
self._wrapped_instance.remove_status_listener(callback)
+ @_set_doc_from_async_controller
def authenticate(self, *args: Any, **kwargs: Any) -> None:
self._execute_async_method('authenticate', *args, **kwargs)
+ @_set_doc_from_async_controller
def get_info(self, params: Union[str, Sequence[str]], default: Any = UNDEFINED, get_bytes: bool = False) -> Union[str, Dict[str, str]]:
return self._execute_async_method('get_info', params, default, get_bytes)
+ @_set_doc_from_async_controller
def get_version(self, default: Any = UNDEFINED) -> stem.version.Version:
return self._execute_async_method('get_version', default)
+ @_set_doc_from_async_controller
def get_exit_policy(self, default: Any = UNDEFINED) -> stem.exit_policy.ExitPolicy:
return self._execute_async_method('get_exit_policy', default)
+ @_set_doc_from_async_controller
def get_ports(self, listener_type: 'stem.control.Listener', default: Any = UNDEFINED) -> Sequence[int]:
return self._execute_async_method('get_ports', listener_type, default)
+ @_set_doc_from_async_controller
def get_listeners(self, listener_type: 'stem.control.Listener', default: Any = UNDEFINED) -> Sequence[Tuple[str, int]]:
return self._execute_async_method('get_listeners', listener_type, default)
+ @_set_doc_from_async_controller
def get_accounting_stats(self, default: Any = UNDEFINED) -> 'stem.control.AccountingStats':
return self._execute_async_method('get_accounting_stats', default)
+ @_set_doc_from_async_controller
def get_protocolinfo(self, default: Any = UNDEFINED) -> stem.response.protocolinfo.ProtocolInfoResponse:
return self._execute_async_method('get_protocolinfo', default)
+ @_set_doc_from_async_controller
def get_user(self, default: Any = UNDEFINED) -> str:
return self._execute_async_method('get_user', default)
+ @_set_doc_from_async_controller
def get_pid(self, default: Any = UNDEFINED) -> int:
return self._execute_async_method('get_pid', default)
+ @_set_doc_from_async_controller
def get_start_time(self, default: Any = UNDEFINED) -> float:
return self._execute_async_method('get_start_time', default)
+ @_set_doc_from_async_controller
def get_uptime(self, default: Any = UNDEFINED) -> float:
return self._execute_async_method('get_uptime', default)
+ @_set_doc_from_async_controller
def is_user_traffic_allowed(self) -> 'stem.control.UserTrafficAllowed':
return self._execute_async_method('is_user_traffic_allowed')
+ @_set_doc_from_async_controller
def get_microdescriptor(self, relay: Optional[str] = None, default: Any = UNDEFINED) -> stem.descriptor.microdescriptor.Microdescriptor:
return self._execute_async_method('get_microdescriptor', relay, default)
+ @_set_doc_from_async_controller
def get_microdescriptors(self, default: Any = UNDEFINED) -> Iterator[stem.descriptor.microdescriptor.Microdescriptor]:
return self._execute_async_generator_method('get_microdescriptors', default)
+ @_set_doc_from_async_controller
def get_server_descriptor(self, relay: Optional[str] = None, default: Any = UNDEFINED) -> stem.descriptor.server_descriptor.RelayDescriptor:
return self._execute_async_method('get_server_descriptor', relay, default)
+ @_set_doc_from_async_controller
def get_server_descriptors(self, default: Any = UNDEFINED) -> Iterator[stem.descriptor.server_descriptor.RelayDescriptor]:
return self._execute_async_generator_method('get_server_descriptors', default)
+ @_set_doc_from_async_controller
def get_network_status(self, relay: Optional[str] = None, default: Any = UNDEFINED) -> stem.descriptor.router_status_entry.RouterStatusEntryV3:
return self._execute_async_method('get_network_status', relay, default)
+ @_set_doc_from_async_controller
def get_network_statuses(self, default: Any = UNDEFINED) -> Iterator[stem.descriptor.router_status_entry.RouterStatusEntryV3]:
return self._execute_async_generator_method('get_network_statuses', default)
+ @_set_doc_from_async_controller
def get_hidden_service_descriptor(self, address: str, default: Any = UNDEFINED, servers: Optional[Sequence[str]] = None, await_result: bool = True, timeout: Optional[float] = None) -> stem.descriptor.hidden_service.HiddenServiceDescriptorV2:
return self._execute_async_method('get_hidden_service_descriptor', address, default, servers, await_result, timeout)
+ @_set_doc_from_async_controller
def get_conf(self, param: str, default: Any = UNDEFINED, multiple: bool = False) -> Union[str, Sequence[str]]:
return self._execute_async_method('get_conf', param, default, multiple)
+ @_set_doc_from_async_controller
def get_conf_map(self, params: Union[str, Sequence[str]], default: Any = UNDEFINED, multiple: bool = True) -> Dict[str, Union[str, Sequence[str]]]:
return self._execute_async_method('get_conf_map', params, default, multiple)
+ @_set_doc_from_async_controller
def is_set(self, param: str, default: Any = UNDEFINED) -> bool:
return self._execute_async_method('is_set', param, default)
+ @_set_doc_from_async_controller
def set_conf(self, param: str, value: Union[str, Sequence[str]]) -> None:
self._execute_async_method('set_conf', param, value)
+ @_set_doc_from_async_controller
def reset_conf(self, *params: str) -> None:
self._execute_async_method('reset_conf', *params)
+ @_set_doc_from_async_controller
def set_options(self, params: Union[Mapping[str, Union[str, Sequence[str]]], Sequence[Tuple[str, Union[str, Sequence[str]]]]], reset: bool = False) -> None:
self._execute_async_method('set_options', params, reset)
+ @_set_doc_from_async_controller
def get_hidden_service_conf(self, default: Any = UNDEFINED) -> Dict[str, Any]:
return self._execute_async_method('get_hidden_service_conf', default)
+ @_set_doc_from_async_controller
def set_hidden_service_conf(self, conf: Mapping[str, Any]) -> None:
self._execute_async_method('set_hidden_service_conf', conf)
+ @_set_doc_from_async_controller
def create_hidden_service(self, path: str, port: int, target_address: Optional[str] = None, target_port: Optional[int] = None, auth_type: Optional[str] = None, client_names: Optional[Sequence[str]] = None) -> 'stem.control.CreateHiddenServiceOutput':
return self._execute_async_method('create_hidden_service', path, port, target_address, target_port, auth_type, client_names)
+ @_set_doc_from_async_controller
def remove_hidden_service(self, path: str, port: Optional[int] = None) -> bool:
return self._execute_async_method('remove_hidden_service', path, port)
+ @_set_doc_from_async_controller
def list_ephemeral_hidden_services(self, default: Any = UNDEFINED, our_services: bool = True, detached: bool = False) -> Sequence[str]:
return self._execute_async_method('list_ephemeral_hidden_services', default, our_services, detached)
+ @_set_doc_from_async_controller
def create_ephemeral_hidden_service(self, ports: Union[int, Sequence[int], Mapping[int, str]], key_type: str = 'NEW', key_content: str = 'BEST', discard_key: bool = False, detached: bool = False, await_publication: bool = False, timeout: Optional[float] = None, basic_auth: Optional[Mapping[str, str]] = None, max_streams: Optional[int] = None) -> stem.response.add_onion.AddOnionResponse:
return self._execute_async_method('create_ephemeral_hidden_service', ports, key_type, key_content, discard_key, detached, await_publication, timeout, basic_auth, max_streams)
+ @_set_doc_from_async_controller
def remove_ephemeral_hidden_service(self, service_id: str) -> bool:
return self._execute_async_method('remove_ephemeral_hidden_service', service_id)
- def add_event_listener(self, listener: Callable[[stem.response.events.Event], None], *events: 'stem.control.EventType') -> None:
+ @_set_doc_from_async_controller
+ def add_event_listener(self, listener: Callable[[stem.response.events.Event], Union[None, Awaitable[None]]], *events: 'stem.control.EventType') -> None:
self._execute_async_method('add_event_listener', listener, *events)
- def remove_event_listener(self, listener: Callable[[stem.response.events.Event], None]) -> None:
+ @_set_doc_from_async_controller
+ def remove_event_listener(self, listener: Callable[[stem.response.events.Event], Union[None, Awaitable[None]]]) -> None:
self._execute_async_method('remove_event_listener', listener)
+ @_set_doc_from_async_controller
def is_caching_enabled(self) -> bool:
return self._wrapped_instance.is_caching_enabled()
+ @_set_doc_from_async_controller
def set_caching(self, enabled: bool) -> None:
self._wrapped_instance.set_caching(enabled)
+ @_set_doc_from_async_controller
def clear_cache(self) -> None:
self._wrapped_instance.clear_cache()
+ @_set_doc_from_async_controller
def load_conf(self, configtext: str) -> None:
self._execute_async_method('load_conf', configtext)
+ @_set_doc_from_async_controller
def save_conf(self, force: bool = False) -> None:
return self._execute_async_method('save_conf', force)
+ @_set_doc_from_async_controller
def is_feature_enabled(self, feature: str) -> bool:
return self._wrapped_instance.is_feature_enabled(feature)
+ @_set_doc_from_async_controller
def enable_feature(self, features: Union[str, Sequence[str]]) -> None:
self._wrapped_instance.enable_feature(features)
+ @_set_doc_from_async_controller
def get_circuit(self, circuit_id: int, default: Any = UNDEFINED) -> stem.response.events.CircuitEvent:
return self._execute_async_method('get_circuit', circuit_id, default)
+ @_set_doc_from_async_controller
def get_circuits(self, default: Any = UNDEFINED) -> List[stem.response.events.CircuitEvent]:
return self._execute_async_method('get_circuits', default)
+ @_set_doc_from_async_controller
def new_circuit(self, path: Union[None, str, Sequence[str]] = None, purpose: str = 'general', await_build: bool = False, timeout: Optional[float] = None) -> str:
return self._execute_async_method('new_circuit', path, purpose, await_build, timeout)
+ @_set_doc_from_async_controller
def extend_circuit(self, circuit_id: str = '0', path: Union[None, str, Sequence[str]] = None, purpose: str = 'general', await_build: bool = False, timeout: Optional[float] = None) -> str:
return self._execute_async_method('extend_circuit', circuit_id, path, purpose, await_build, timeout)
+ @_set_doc_from_async_controller
def repurpose_circuit(self, circuit_id: str, purpose: str) -> None:
self._execute_async_method('repurpose_circuit', circuit_id, purpose)
+ @_set_doc_from_async_controller
def close_circuit(self, circuit_id: str, flag: str = '') -> None:
self._execute_async_method('close_circuit', circuit_id, flag)
+ @_set_doc_from_async_controller
def get_streams(self, default: Any = UNDEFINED) -> List[stem.response.events.StreamEvent]:
return self._execute_async_method('get_streams', default)
+ @_set_doc_from_async_controller
def attach_stream(self, stream_id: str, circuit_id: str, exiting_hop: Optional[int] = None) -> None:
self._execute_async_method('attach_stream', stream_id, circuit_id, exiting_hop)
+ @_set_doc_from_async_controller
def close_stream(self, stream_id: str, reason: stem.RelayEndReason = stem.RelayEndReason.MISC, flag: str = '') -> None:
self._execute_async_method('close_stream', stream_id, reason, flag)
+ @_set_doc_from_async_controller
def signal(self, signal: stem.Signal) -> None:
self._execute_async_method('signal', signal)
+ @_set_doc_from_async_controller
def is_newnym_available(self) -> bool:
return self._wrapped_instance.is_newnym_available()
+ @_set_doc_from_async_controller
def get_newnym_wait(self) -> float:
return self._wrapped_instance.get_newnym_wait()
+ @_set_doc_from_async_controller
def get_effective_rate(self, default: Any = UNDEFINED, burst: bool = False) -> int:
return self._execute_async_method('get_effective_rate', default, burst)
+ @_set_doc_from_async_controller
def map_address(self, mapping: Mapping[str, str]) -> Dict[str, str]:
return self._execute_async_method('map_address', mapping)
+ @_set_doc_from_async_controller
def drop_guards(self) -> None:
self._execute_async_method('drop_guards')