[tor-commits] [ooni-probe/master] twisted Headers() class edited to avoid header fix in reference to: https://trac.torproject.org/projects/tor/ticket/7432

art at torproject.org art at torproject.org
Mon Nov 19 18:03:50 UTC 2012


commit 07ed416a6ad4735d1bd6ca0dcdd07e20be05b788
Author: Giovanni Pellerano <evilaliv3 at evilretis.(none)>
Date:   Mon Nov 19 14:23:46 2012 +0100

    twisted Headers() class edited to avoid header fix in reference to: https://trac.torproject.org/projects/tor/ticket/7432
---
 ooni/lib/txagentwithsocks.py |  164 +++++++++++++++++++++++++++---------------
 1 files changed, 106 insertions(+), 58 deletions(-)

diff --git a/ooni/lib/txagentwithsocks.py b/ooni/lib/txagentwithsocks.py
index fecd9fc..a1f7f48 100644
--- a/ooni/lib/txagentwithsocks.py
+++ b/ooni/lib/txagentwithsocks.py
@@ -6,10 +6,13 @@
 import struct
 
 from zope.interface import implements
-from twisted.web import client
+from twisted.web import client, _newclient, http_headers
+from twisted.web._newclient import Request, RequestNotSent, RequestGenerationFailed, TransportProxyProducer, STATUS
+from twisted.internet import protocol
 from twisted.internet.protocol import ClientFactory, Protocol
 from twisted.internet.endpoints import TCP4ClientEndpoint, SSL4ClientEndpoint, _WrappingProtocol, _WrappingFactory
 from twisted.internet import interfaces, defer
+from twisted.internet.defer import Deferred, succeed, fail, maybeDeferred
 
 class SOCKSError(Exception):
     def __init__(self, value):
@@ -24,7 +27,6 @@ class SOCKSv5ClientProtocol(_WrappingProtocol):
         self._host = host
         self._port = port
         self.ready = False
-        self.buf = []
 
     def socks_state_0(self, data):
         # error state
@@ -55,20 +57,13 @@ class SOCKSv5ClientProtocol(_WrappingProtocol):
         self._wrappedProtocol.transport = self.transport
         self._wrappedProtocol.connectionMade()
         
-        if self.buf != []:
-            self.transport.write(''.join(self.buf))
-            self.buf = []
-        
         self._connectedDeferred.callback(self._wrappedProtocol)
 
     def connectionMade(self):
         # We implement only Anonymous access
         self.transport.write(struct.pack("!BB", 5, len("\x00")) + "\x00")
-        
-        self.state = self.state + 1
 
-    def connectionLost(self, reason):
-        pass
+        self.state = self.state + 1
 
     def write(self, data):
         if self.ready:
@@ -79,7 +74,7 @@ class SOCKSv5ClientProtocol(_WrappingProtocol):
     def dataReceived(self, data):
         if self.state != 3:
             getattr(self, 'socks_state_%s' % (self.state),
-             self.socks_state_0)(data)
+                    self.socks_state_0)(data)
             self.state = self.state + 1
         else:
             self._wrappedProtocol.dataReceived(data)
@@ -92,46 +87,20 @@ class SOCKSv5ClientFactory(_WrappingFactory):
         self._host, self._port = host, port
 
     def buildProtocol(self, addr):
-        """
-        Proxy C{buildProtocol} to our C{self._wrappedFactory} or errback
-        the C{self._onConnection} L{Deferred}.
-
-        @return: An instance of L{_WrappingProtocol} or C{None}
-        """''
         try:
             proto = self._wrappedFactory.buildProtocol(addr)
         except:
             self._onConnection.errback()
         else:
-            print self._host
             return self.protocol(self._onConnection, proto,
                                  self._host, self._port)
 
 class SOCKS5ClientEndpoint(object):
-    """
-    TCP client endpoint with an IPv4 configuration.
-    """
     implements(interfaces.IStreamClientEndpoint)
 
     def __init__(self, reactor, sockhost, sockport,
-                 host, port, timeout=30,bindAddress=None):
-        """
-        @param reactor: An L{IReactorTCP} provider
-
-        @param host: A hostname, used when connecting
-        @type host: str
-
-        @param port: The port number, used when connecting
-        @type port: int
+                 host, port, timeout=30, bindAddress=None):
 
-        @param timeout: The number of seconds to wait before assuming the
-            connection has failed.
-        @type timeout: int
-
-        @param bindAddress: A (host, port) tuple of local address to bind to,
-            or None.
-        @type bindAddress: tuple
-        """
         self._reactor = reactor
         self._sockhost = sockhost
         self._sockport = sockport
@@ -141,9 +110,6 @@ class SOCKS5ClientEndpoint(object):
         self._bindAddress = bindAddress
 
     def connect(self, protocolFactory):
-        """
-        Implement L{IStreamClientEndpoint.connect} to connect via TCP.
-        """
         try:
             wf = SOCKSv5ClientFactory(protocolFactory, self._host, self._port)
             self._reactor.connectTCP(
@@ -153,13 +119,94 @@ class SOCKS5ClientEndpoint(object):
         except:
             return defer.fail()
 
+class Headers(http_headers.Headers):
+    def __init__(self, rawHeaders=None):
+        self._rawHeaders = dict()
+        if rawHeaders is not None:
+            for name, values in rawHeaders.iteritems():
+                if type(values) is list:
+                  self.setRawHeaders(name, values[:])
+                elif type(values) is dict:
+                  self._rawHeaders[name.lower()] = values
+
+    def setRawHeaders(self, name, values):
+        if name.lower() not in self._rawHeaders:
+          self._rawHeaders[name.lower()] = dict()
+        self._rawHeaders[name.lower()]['name'] = name
+        self._rawHeaders[name.lower()]['values'] = values
+
+    def getAllRawHeaders(self):
+        for k, v in self._rawHeaders.iteritems():
+            yield v['name'], v['values']
+
+    def getRawHeaders(self, name, default=None):
+        if name.lower() in self._rawHeaders:
+            return self._rawHeaders[name.lower()]["values"]
+        return default
+class HTTPClientParser(_newclient.HTTPClientParser):
+    def connectionMade(self):
+        self.headers = Headers()
+        self.connHeaders = Headers()
+        self.state = STATUS
+        self._partialHeader = None
+
+    def headerReceived(self, name, value):
+        if self.isConnectionControlHeader(name):
+            headers = self.connHeaders
+        else:
+            headers = self.headers
+        headers.addRawHeader(name, value)
+
+
+class HTTP11ClientProtocol(_newclient.HTTP11ClientProtocol):
+    def request(self, request):
+        if self._state != 'QUIESCENT':
+            return fail(RequestNotSent())
+
+        self._state = 'TRANSMITTING'
+        _requestDeferred = maybeDeferred(request.writeTo, self.transport)
+        self._finishedRequest = Deferred()
+
+        self._currentRequest = request
+
+        self._transportProxy = TransportProxyProducer(self.transport)
+        self._parser = HTTPClientParser(request, self._finishResponse)
+        self._parser.makeConnection(self._transportProxy)
+        self._responseDeferred = self._parser._responseDeferred
+
+        def cbRequestWrotten(ignored):
+            if self._state == 'TRANSMITTING':
+                self._state = 'WAITING'
+                self._responseDeferred.chainDeferred(self._finishedRequest)
+
+        def ebRequestWriting(err):
+            if self._state == 'TRANSMITTING':
+                self._state = 'GENERATION_FAILED'
+                self.transport.loseConnection()
+                self._finishedRequest.errback(
+                    Failure(RequestGenerationFailed([err])))
+            else:
+                log.err(err, 'Error writing request, but not in valid state '
+                             'to finalize request: %s' % self._state)
+
+        _requestDeferred.addCallbacks(cbRequestWrotten, ebRequestWriting)
+
+        return self._finishedRequest
+
+class _HTTP11ClientFactory(client._HTTP11ClientFactory):
+    def buildProtocol(self, addr):
+        return HTTP11ClientProtocol(self._quiescentCallback)
+
+class HTTPConnectionPool(client.HTTPConnectionPool):
+    _factory = _HTTP11ClientFactory
+
 class Agent(client.Agent):
     def __init__(self, reactor,
                  contextFactory=client.WebClientContextFactory(),
                  connectTimeout=None, bindAddress=None,
                  pool=None, sockhost=None, sockport=None):
         if pool is None:
-            pool = client.HTTPConnectionPool(reactor, False)
+            pool = HTTPConnectionPool(reactor, False)
         self._reactor = reactor
         self._pool = pool
         self._contextFactory = contextFactory
@@ -169,22 +216,6 @@ class Agent(client.Agent):
         self._sockport = sockport
 
     def _getEndpoint(self, scheme, host, port):
-        """
-        Get an endpoint for the given host and port, using a transport
-        selected based on scheme.
-
-        @param scheme: A string like C{'http'} or C{'https'} (the only two
-            supported values) to use to determine how to establish the
-            connection.
-
-        @param host: A C{str} giving the hostname which will be connected to in
-            order to issue a request.
-
-        @param port: An C{int} giving the port number the connection will be
-            on.
-
-        @return: An endpoint which can be used to connect to given address.
-        """
         kwargs = {}
         if self._connectTimeout is not None:
             kwargs['timeout'] = self._connectTimeout
@@ -201,3 +232,20 @@ class Agent(client.Agent):
         else:
             raise SchemeNotSupported("Unsupported scheme: %r" % (scheme,))
 
+    def _requestWithEndpoint(self, key, endpoint, method, parsedURI,
+                             headers, bodyProducer, requestPath):
+        if headers is None:
+            headers = Headers()
+        if not headers.hasHeader('host'):
+            headers = headers.copy()
+            headers.addRawHeader(
+                'host', self._computeHostValue(parsedURI.scheme, parsedURI.host,
+                                               parsedURI.port))
+
+        d = self._pool.getConnection(key, endpoint)
+        def cbConnected(proto):
+            return proto.request(
+                Request(method, requestPath, headers, bodyProducer,
+                        persistent=self._pool.persistent))
+        d.addCallback(cbConnected)
+        return d





More information about the tor-commits mailing list