commit 07ed416a6ad4735d1bd6ca0dcdd07e20be05b788 Author: Giovanni Pellerano <evilaliv3@evilretis.(none)> Date: Mon Nov 19 14:23:46 2012 +0100
twisted Headers() class edited to avoid header fix in reference to: https://trac.torproject.org/projects/tor/ticket/7432 --- ooni/lib/txagentwithsocks.py | 164 +++++++++++++++++++++++++++--------------- 1 files changed, 106 insertions(+), 58 deletions(-)
diff --git a/ooni/lib/txagentwithsocks.py b/ooni/lib/txagentwithsocks.py index fecd9fc..a1f7f48 100644 --- a/ooni/lib/txagentwithsocks.py +++ b/ooni/lib/txagentwithsocks.py @@ -6,10 +6,13 @@ import struct
from zope.interface import implements -from twisted.web import client +from twisted.web import client, _newclient, http_headers +from twisted.web._newclient import Request, RequestNotSent, RequestGenerationFailed, TransportProxyProducer, STATUS +from twisted.internet import protocol from twisted.internet.protocol import ClientFactory, Protocol from twisted.internet.endpoints import TCP4ClientEndpoint, SSL4ClientEndpoint, _WrappingProtocol, _WrappingFactory from twisted.internet import interfaces, defer +from twisted.internet.defer import Deferred, succeed, fail, maybeDeferred
class SOCKSError(Exception): def __init__(self, value): @@ -24,7 +27,6 @@ class SOCKSv5ClientProtocol(_WrappingProtocol): self._host = host self._port = port self.ready = False - self.buf = []
def socks_state_0(self, data): # error state @@ -55,20 +57,13 @@ class SOCKSv5ClientProtocol(_WrappingProtocol): self._wrappedProtocol.transport = self.transport self._wrappedProtocol.connectionMade()
- if self.buf != []: - self.transport.write(''.join(self.buf)) - self.buf = [] - self._connectedDeferred.callback(self._wrappedProtocol)
def connectionMade(self): # We implement only Anonymous access self.transport.write(struct.pack("!BB", 5, len("\x00")) + "\x00") - - self.state = self.state + 1
- def connectionLost(self, reason): - pass + self.state = self.state + 1
def write(self, data): if self.ready: @@ -79,7 +74,7 @@ class SOCKSv5ClientProtocol(_WrappingProtocol): def dataReceived(self, data): if self.state != 3: getattr(self, 'socks_state_%s' % (self.state), - self.socks_state_0)(data) + self.socks_state_0)(data) self.state = self.state + 1 else: self._wrappedProtocol.dataReceived(data) @@ -92,46 +87,20 @@ class SOCKSv5ClientFactory(_WrappingFactory): self._host, self._port = host, port
def buildProtocol(self, addr): - """ - Proxy C{buildProtocol} to our C{self._wrappedFactory} or errback - the C{self._onConnection} L{Deferred}. - - @return: An instance of L{_WrappingProtocol} or C{None} - """'' try: proto = self._wrappedFactory.buildProtocol(addr) except: self._onConnection.errback() else: - print self._host return self.protocol(self._onConnection, proto, self._host, self._port)
class SOCKS5ClientEndpoint(object): - """ - TCP client endpoint with an IPv4 configuration. - """ implements(interfaces.IStreamClientEndpoint)
def __init__(self, reactor, sockhost, sockport, - host, port, timeout=30,bindAddress=None): - """ - @param reactor: An L{IReactorTCP} provider - - @param host: A hostname, used when connecting - @type host: str - - @param port: The port number, used when connecting - @type port: int + host, port, timeout=30, bindAddress=None):
- @param timeout: The number of seconds to wait before assuming the - connection has failed. - @type timeout: int - - @param bindAddress: A (host, port) tuple of local address to bind to, - or None. - @type bindAddress: tuple - """ self._reactor = reactor self._sockhost = sockhost self._sockport = sockport @@ -141,9 +110,6 @@ class SOCKS5ClientEndpoint(object): self._bindAddress = bindAddress
def connect(self, protocolFactory): - """ - Implement L{IStreamClientEndpoint.connect} to connect via TCP. - """ try: wf = SOCKSv5ClientFactory(protocolFactory, self._host, self._port) self._reactor.connectTCP( @@ -153,13 +119,94 @@ class SOCKS5ClientEndpoint(object): except: return defer.fail()
+class Headers(http_headers.Headers): + def __init__(self, rawHeaders=None): + self._rawHeaders = dict() + if rawHeaders is not None: + for name, values in rawHeaders.iteritems(): + if type(values) is list: + self.setRawHeaders(name, values[:]) + elif type(values) is dict: + self._rawHeaders[name.lower()] = values + + def setRawHeaders(self, name, values): + if name.lower() not in self._rawHeaders: + self._rawHeaders[name.lower()] = dict() + self._rawHeaders[name.lower()]['name'] = name + self._rawHeaders[name.lower()]['values'] = values + + def getAllRawHeaders(self): + for k, v in self._rawHeaders.iteritems(): + yield v['name'], v['values'] + + def getRawHeaders(self, name, default=None): + if name.lower() in self._rawHeaders: + return self._rawHeaders[name.lower()]["values"] + return default +class HTTPClientParser(_newclient.HTTPClientParser): + def connectionMade(self): + self.headers = Headers() + self.connHeaders = Headers() + self.state = STATUS + self._partialHeader = None + + def headerReceived(self, name, value): + if self.isConnectionControlHeader(name): + headers = self.connHeaders + else: + headers = self.headers + headers.addRawHeader(name, value) + + +class HTTP11ClientProtocol(_newclient.HTTP11ClientProtocol): + def request(self, request): + if self._state != 'QUIESCENT': + return fail(RequestNotSent()) + + self._state = 'TRANSMITTING' + _requestDeferred = maybeDeferred(request.writeTo, self.transport) + self._finishedRequest = Deferred() + + self._currentRequest = request + + self._transportProxy = TransportProxyProducer(self.transport) + self._parser = HTTPClientParser(request, self._finishResponse) + self._parser.makeConnection(self._transportProxy) + self._responseDeferred = self._parser._responseDeferred + + def cbRequestWrotten(ignored): + if self._state == 'TRANSMITTING': + self._state = 'WAITING' + self._responseDeferred.chainDeferred(self._finishedRequest) + + def ebRequestWriting(err): + if self._state == 'TRANSMITTING': + self._state = 'GENERATION_FAILED' + self.transport.loseConnection() + self._finishedRequest.errback( + Failure(RequestGenerationFailed([err]))) + else: + log.err(err, 'Error writing request, but not in valid state ' + 'to finalize request: %s' % self._state) + + _requestDeferred.addCallbacks(cbRequestWrotten, ebRequestWriting) + + return self._finishedRequest + +class _HTTP11ClientFactory(client._HTTP11ClientFactory): + def buildProtocol(self, addr): + return HTTP11ClientProtocol(self._quiescentCallback) + +class HTTPConnectionPool(client.HTTPConnectionPool): + _factory = _HTTP11ClientFactory + class Agent(client.Agent): def __init__(self, reactor, contextFactory=client.WebClientContextFactory(), connectTimeout=None, bindAddress=None, pool=None, sockhost=None, sockport=None): if pool is None: - pool = client.HTTPConnectionPool(reactor, False) + pool = HTTPConnectionPool(reactor, False) self._reactor = reactor self._pool = pool self._contextFactory = contextFactory @@ -169,22 +216,6 @@ class Agent(client.Agent): self._sockport = sockport
def _getEndpoint(self, scheme, host, port): - """ - Get an endpoint for the given host and port, using a transport - selected based on scheme. - - @param scheme: A string like C{'http'} or C{'https'} (the only two - supported values) to use to determine how to establish the - connection. - - @param host: A C{str} giving the hostname which will be connected to in - order to issue a request. - - @param port: An C{int} giving the port number the connection will be - on. - - @return: An endpoint which can be used to connect to given address. - """ kwargs = {} if self._connectTimeout is not None: kwargs['timeout'] = self._connectTimeout @@ -201,3 +232,20 @@ class Agent(client.Agent): else: raise SchemeNotSupported("Unsupported scheme: %r" % (scheme,))
+ def _requestWithEndpoint(self, key, endpoint, method, parsedURI, + headers, bodyProducer, requestPath): + if headers is None: + headers = Headers() + if not headers.hasHeader('host'): + headers = headers.copy() + headers.addRawHeader( + 'host', self._computeHostValue(parsedURI.scheme, parsedURI.host, + parsedURI.port)) + + d = self._pool.getConnection(key, endpoint) + def cbConnected(proto): + return proto.request( + Request(method, requestPath, headers, bodyProducer, + persistent=self._pool.persistent)) + d.addCallback(cbConnected) + return d