[tor-commits] [oonib/master] Feature/web connectivity (#63)

art at torproject.org art at torproject.org
Mon May 30 13:44:56 UTC 2016


commit 5178fc299d361c9ab57002e619e40b05061f5a7d
Author: Arturo Filastò <arturo at filasto.net>
Date:   Sat May 28 19:05:42 2016 +0200

    Feature/web connectivity (#63)
    
    * Add web connectivity test helper
    
    * Forward test helper addresses to the host machine
    
    * Align the request headers used by the web_connectivity test helper to those of the probe
    
    * Add monkey patch for bug in twisted RedirectAgent:
    
    https://twistedmatrix.com/trac/ticket/8265
    
    * Add monkey patch for HTTPClientParser.statusReceived
    
    * Use TrueHeaders in control request
    
    * Add support for specifying endpoints in web_connectivity test helper
    
    * Add endpoint for checking the status of the WebConnectivity test helper
    
    * Add support for parsing test-helpers-alternate
    
    * Fix key for web_connectivity test in example configuration file
    
    * Implement on-disk web_connectivity cache
    
    * Add support for Gzip content decoding
    
    * Also record CNAME resolution.
    
    * Rename ips to addrs
    
    * Add support for retries in the http_request
    
    * Add support for extracting title
    
    * Encode the responses as well when debug mode
    
    * Handle partial downloads
    
    * Ignore errors when encoding headers
    
    * Cast title to unicode and ignore errors
    
    * Improvements based on feedback and comments by @bassosimone
    
    * Move twisted related patches into txextra module
    
    * Add support for returning the responses based on a key sent by the
      client.
    
    * Inherit from OONIBHandler so we can get the error message format
    
    * Stylistic improvements
    
    * Set defaults in a way that oonib.conf can start from the example
    
    * Avoid doing join on nonetype
    
    * Address comments by @bassosimone
    
    * Properly set the body also when we get a partial body downloaded
    
    * Move more code into shared oonib.common module
    
    * Fully sync the common module with ooni-probe (pulling in also other
      shared functionality so the Agents match entirely)
    
    * Fix location of comment for the patched HTTPClient
    
    * Add unittests and integration tests for web_connectivity
---
 Vagrantfile                         |  25 +++
 oonib.conf.example                  |  18 +-
 oonib/bouncer/handlers.py           |  14 +-
 oonib/common/__init__.py            |  10 ++
 oonib/common/http_utils.py          |  54 ++++++
 oonib/common/tcp_utils.py           |  10 ++
 oonib/common/txextra.py             | 202 +++++++++++++++++++++
 oonib/oonibackend.py                |  29 ++-
 oonib/test/test_web_connectivity.py |  71 ++++++++
 oonib/testhelpers/http_helpers.py   | 344 +++++++++++++++++++++++++++++++++++-
 10 files changed, 758 insertions(+), 19 deletions(-)

diff --git a/Vagrantfile b/Vagrantfile
index b71e4e7..7aa5f71 100644
--- a/Vagrantfile
+++ b/Vagrantfile
@@ -4,6 +4,31 @@
 Vagrant.configure("2") do |config|
   config.vm.box = "precise32"
   config.vm.box_url = "http://files.vagrantup.com/precise32.box"
+
+  # Create a forwarded port mapping which allows access to a specific port
+  # within the machine from a port on the host machine. In the example below,
+  # accessing "localhost:8080" will access port 80 on the guest machine.
+  config.vm.network :forwarded_port, guest: 57001, host: 57001
+  config.vm.network :forwarded_port, guest: 57001, host: 57002
+  config.vm.network :forwarded_port, guest: 57001, host: 57003
+  config.vm.network :forwarded_port, guest: 57004, host: 57004
+  config.vm.network :forwarded_port, guest: 57005, host: 57005
+  config.vm.network :forwarded_port, guest: 57006, host: 57006
+  config.vm.network :forwarded_port, guest: 57007, host: 57007
+
+  # Create a private network, which allows host-only access to the machine
+  # using a specific IP.
+  # config.vm.network :private_network, ip: "192.168.33.10"
+
+  # Create a public network, which is generally matched to a bridged network.
+  # Bridged networks make the machine appear as another physical device on
+  # your network.
+  # config.vm.network :public_network
+
+  # Share an additional folder to the guest VM. The first argument is
+  # the path on the host to the actual folder. The second argument is
+  # the path on the guest to mount the folder. And the optional third
+  # argument is a set of non-required options.
   config.vm.synced_folder ".", "/data/oonib"
 end
 
diff --git a/oonib.conf.example b/oonib.conf.example
index 1925d81..6f4c7f3 100644
--- a/oonib.conf.example
+++ b/oonib.conf.example
@@ -30,12 +30,13 @@ main:
     tor_datadir: null
 
     bouncer_endpoints:
-    - {type: tls, port: 10443, cert: "private/ssl-key-and-cert.pem"}
+    #- {type: tls, port: 10443, cert: "private/ssl-key-and-cert.pem"}
     - {type: tcp, port: 10080}
-    - {type: onion, hsdir: bouncer}
+    - {type: onion, hsdir: /tmp/bouncer}
 
     collector_endpoints:
-    - {type: tls, port: 11443, cert: "private/ssl-key-and-cert.pem"}
+    #- {type: tls, port: 11443, cert: "private/ssl-key-and-cert.pem"}
+    - {type: onion, hsdir: /tmp/collector}
 
     report_file_template: '{iso8601_timestamp}-{test_name}-{report_id}-{probe_asn}-{probe_cc}-probe-0.2.0.{ext}'
 helpers:
@@ -62,12 +63,17 @@ helpers:
 
     dns_discovery:
         address: null
-        udp_port: 53
-        tcp_port: 53
+        udp_port: null
+        tcp_port: null
         resolver_address: null
 
     ssl:
         address: null
         private_key: 'private.key'
         certificate: 'certificate.crt'
-        port: 57006
+        #port: 57006
+        port: null
+
+    web_connectivity:
+        endpoints:
+        - {type: tcp, port: 57007}
diff --git a/oonib/bouncer/handlers.py b/oonib/bouncer/handlers.py
index 1c7627c..a9370c0 100644
--- a/oonib/bouncer/handlers.py
+++ b/oonib/bouncer/handlers.py
@@ -159,9 +159,12 @@ class Bouncer(object):
                 requested_nettest['input-hashes'],
                 requested_nettest['test-helpers'])
             test_helpers = {}
+            test_helpers_alternate = {}
             for test_helper in requested_nettest['test-helpers']:
+                collector_info = self.bouncerFile['collector'][collector]
                 try:
-                    test_helpers[test_helper] = self.bouncerFile['collector'][collector]['test-helper'][test_helper]
+                    test_helpers[test_helper] = \
+                        collector_info['test-helper'][test_helper]
                 except KeyError:
                     helpers = self.knownHelpers.get(test_helper)
                     if not helpers:
@@ -169,12 +172,19 @@ class Bouncer(object):
                     helper = random.choice(helpers)
                     test_helpers[test_helper] = helper['helper-address']
 
+                try:
+                    test_helpers_alternate[test_helper] = \
+                        collector_info['test-helper-alternate'][test_helper]
+                except KeyError:
+                    pass
+
             nettest = {
                 'name': requested_nettest['name'],
                 'version': requested_nettest['version'],
                 'input-hashes': requested_nettest['input-hashes'],
                 'test-helpers': test_helpers,
-                'collector': collector,
+                'test-helpers-alternate': test_helpers_alternate,
+                'collector': collector
             }
             nettests.append(nettest)
         return {'net-tests': nettests}
diff --git a/oonib/common/__init__.py b/oonib/common/__init__.py
new file mode 100644
index 0000000..7f6cf73
--- /dev/null
+++ b/oonib/common/__init__.py
@@ -0,0 +1,10 @@
+"""
+This modules contains functionality that is shared amongst ooni-probe and
+ooni-backend. If the code in here starts growing too much I think it would
+make sense to either:
+
+ * Make the code in here into it's own package that is imported by
+ ooni-probe and ooni-backend.
+
+ * Merge ooniprobe with oonibackend.
+"""
diff --git a/oonib/common/http_utils.py b/oonib/common/http_utils.py
new file mode 100644
index 0000000..6f9db2d
--- /dev/null
+++ b/oonib/common/http_utils.py
@@ -0,0 +1,54 @@
+import re
+import codecs
+from base64 import b64encode
+
+META_CHARSET_REGEXP = re.compile('<meta(?!\s*(?:name|value)\s*=)[^>]*?charset\s*=[\s"\']*([^\s"\'/>]*)')
+
+def representBody(body):
+    if not body:
+        return body
+    body = body.replace('\0', '')
+    decoded = False
+    charsets = ['ascii', 'utf-8']
+
+    # If we are able to detect the charset of body from the meta tag
+    # try to decode using that one first
+    charset = META_CHARSET_REGEXP.search(body, re.IGNORECASE)
+    if charset:
+        try:
+            encoding = charset.group(1).lower()
+            codecs.lookup(encoding)
+            charsets.insert(0, encoding)
+        except (LookupError, IndexError):
+            # Skip invalid codecs and partial regexp match
+            pass
+
+    for encoding in charsets:
+        try:
+            body = unicode(body, encoding)
+            decoded = True
+            break
+        except UnicodeDecodeError:
+            pass
+    if not decoded:
+        body = {
+            'data': b64encode(body),
+            'format': 'base64'
+        }
+    return body
+
+TITLE_REGEXP = re.compile("<title>(.*?)</title>", re.IGNORECASE | re.DOTALL)
+
+def extractTitle(body):
+    m = TITLE_REGEXP.search(body, re.IGNORECASE | re.DOTALL)
+    if m:
+        return unicode(m.group(1), errors='ignore')
+    return ''
+
+REQUEST_HEADERS = {
+    'User-Agent': ['Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, '
+                   'like Gecko) Chrome/47.0.2526.106 Safari/537.36'],
+    'Accept-Language': ['en-US;q=0.8,en;q=0.5'],
+    'Accept': ['text/html,application/xhtml+xml,application/xml;q=0.9,'
+               '*/*;q=0.8']
+}
diff --git a/oonib/common/tcp_utils.py b/oonib/common/tcp_utils.py
new file mode 100644
index 0000000..7b7a8a4
--- /dev/null
+++ b/oonib/common/tcp_utils.py
@@ -0,0 +1,10 @@
+from twisted.internet.protocol import Factory, Protocol
+
+class TCPConnectProtocol(Protocol):
+    def connectionMade(self):
+        self.transport.loseConnection()
+
+class TCPConnectFactory(Factory):
+    noisy = False
+    def buildProtocol(self, addr):
+        return TCPConnectProtocol()
diff --git a/oonib/common/txextra.py b/oonib/common/txextra.py
new file mode 100644
index 0000000..7a84592
--- /dev/null
+++ b/oonib/common/txextra.py
@@ -0,0 +1,202 @@
+import itertools
+from copy import copy
+
+from twisted.web.http_headers import Headers
+from twisted.web import error
+
+from twisted.web.client import BrowserLikeRedirectAgent
+from twisted.web._newclient import ResponseFailed
+from twisted.web._newclient import HTTPClientParser, ParseError
+from twisted.python.failure import Failure
+
+from twisted.web import client, _newclient
+
+from twisted.web._newclient import RequestNotSent, RequestGenerationFailed
+from twisted.web._newclient import TransportProxyProducer, STATUS
+
+from twisted.internet import reactor
+from twisted.internet.defer import Deferred, fail, maybeDeferred, failure
+
+from twisted.python import log
+
+class TrueHeaders(Headers):
+    def __init__(self, rawHeaders=None):
+        self._rawHeaders = dict()
+        if rawHeaders is not None:
+            for name, values in rawHeaders.iteritems():
+                if type(values) is list:
+                    self.setRawHeaders(name, values[:])
+                elif type(values) is str:
+                    self.setRawHeaders(name, values)
+
+    def setRawHeaders(self, name, values):
+        if name.lower() not in self._rawHeaders:
+            self._rawHeaders[name.lower()] = dict()
+        self._rawHeaders[name.lower()]['name'] = name
+        self._rawHeaders[name.lower()]['values'] = values
+
+    def getAllRawHeaders(self):
+        for _, v in self._rawHeaders.iteritems():
+            yield v['name'], v['values']
+
+    def getRawHeaders(self, name, default=None):
+        if name.lower() in self._rawHeaders:
+            return self._rawHeaders[name.lower()]['values']
+        return default
+
+
+    def getDiff(self, headers, ignore=[]):
+        """
+
+        Args:
+
+            headers: a TrueHeaders object
+
+            ignore: specify a list of header fields to ignore
+
+        Returns:
+
+            a set containing the header names that are not present in
+            header_dict or not present in self.
+        """
+        diff = set()
+        field_names = []
+
+        headers_a = copy(self)
+        headers_b = copy(headers)
+        for name in ignore:
+            try:
+                del headers_a._rawHeaders[name.lower()]
+            except KeyError:
+                pass
+            try:
+                del headers_b._rawHeaders[name.lower()]
+            except KeyError:
+                pass
+
+        for k, v in itertools.chain(headers_a.getAllRawHeaders(),
+                                    headers_b.getAllRawHeaders()):
+            field_names.append(k)
+
+        for name in field_names:
+            if self.getRawHeaders(name) and headers.getRawHeaders(name):
+                pass
+            else:
+                diff.add(name)
+        return list(diff)
+
+class HTTPClientParser(_newclient.HTTPClientParser):
+    def logPrefix(self):
+        return 'HTTPClientParser'
+
+    def connectionMade(self):
+        self.headers = TrueHeaders()
+        self.connHeaders = TrueHeaders()
+        self.state = STATUS
+        self._partialHeader = None
+
+    def headerReceived(self, name, value):
+        if self.isConnectionControlHeader(name.lower()):
+            headers = self.connHeaders
+        else:
+            headers = self.headers
+        headers.addRawHeader(name, value)
+
+    def statusReceived(self, status):
+        # This is a fix for invalid number of parts
+        try:
+            return _newclient.HTTPClientParser.statusReceived(self, status)
+        except ParseError as exc:
+            if exc.args[0] == 'wrong number of parts':
+                return _newclient.HTTPClientParser.statusReceived(self,
+                                                                  status + " XXX")
+            raise
+
+class HTTP11ClientProtocol(_newclient.HTTP11ClientProtocol):
+    def request(self, request):
+        if self._state != 'QUIESCENT':
+            return fail(RequestNotSent())
+
+        self._state = 'TRANSMITTING'
+        _requestDeferred = maybeDeferred(request.writeTo, self.transport)
+        self._finishedRequest = Deferred()
+
+        self._currentRequest = request
+
+        self._transportProxy = TransportProxyProducer(self.transport)
+        self._parser = HTTPClientParser(request, self._finishResponse)
+        self._parser.makeConnection(self._transportProxy)
+        self._responseDeferred = self._parser._responseDeferred
+
+        def cbRequestWrotten(ignored):
+            if self._state == 'TRANSMITTING':
+                self._state = 'WAITING'
+                self._responseDeferred.chainDeferred(self._finishedRequest)
+
+        def ebRequestWriting(err):
+            if self._state == 'TRANSMITTING':
+                self._state = 'GENERATION_FAILED'
+                self.transport.loseConnection()
+                self._finishedRequest.errback(
+                    failure.Failure(RequestGenerationFailed([err])))
+            else:
+                log.err(err, 'Error writing request, but not in valid state '
+                             'to finalize request: %s' % self._state)
+
+        _requestDeferred.addCallbacks(cbRequestWrotten, ebRequestWriting)
+
+        return self._finishedRequest
+
+
+class _HTTP11ClientFactory(client._HTTP11ClientFactory):
+    noisy = False
+
+    def buildProtocol(self, addr):
+        return HTTP11ClientProtocol(self._quiescentCallback)
+
+
+class HTTPConnectionPool(client.HTTPConnectionPool):
+    _factory = _HTTP11ClientFactory
+
+class TrueHeadersAgent(client.Agent):
+    def __init__(self, *args, **kw):
+        super(TrueHeadersAgent, self).__init__(*args, **kw)
+        self._pool = HTTPConnectionPool(reactor, False)
+
+class FixedRedirectAgent(BrowserLikeRedirectAgent):
+    """
+    This is a redirect agent with this patch manually applied:
+    https://twistedmatrix.com/trac/ticket/8265
+    """
+    def _handleRedirect(self, response, method, uri, headers, redirectCount):
+        """
+        Handle a redirect response, checking the number of redirects already
+        followed, and extracting the location header fields.
+
+        This is patched to fix a bug in infinite redirect loop.
+        """
+        if redirectCount >= self._redirectLimit:
+            err = error.InfiniteRedirection(
+                response.code,
+                b'Infinite redirection detected',
+                location=uri)
+            raise ResponseFailed([Failure(err)], response)
+        locationHeaders = response.headers.getRawHeaders(b'location', [])
+        if not locationHeaders:
+            err = error.RedirectWithNoLocation(
+                response.code, b'No location header field', uri)
+            raise ResponseFailed([Failure(err)], response)
+        location = self._resolveLocation(
+            # This is the fix to properly handle redirects
+            response.request.absoluteURI,
+            locationHeaders[0]
+        )
+        deferred = self._agent.request(method, location, headers)
+
+        def _chainResponse(newResponse):
+            newResponse.setPreviousResponse(response)
+            return newResponse
+
+        deferred.addCallback(_chainResponse)
+        return deferred.addCallback(
+            self._handleResponse, method, uri, headers, redirectCount + 1)
diff --git a/oonib/oonibackend.py b/oonib/oonibackend.py
index 601a972..ebf3c52 100644
--- a/oonib/oonibackend.py
+++ b/oonib/oonibackend.py
@@ -118,15 +118,22 @@ def getHSEndpoint(endpoint_config):
                                         data_dir=hsdir)
 
 def getTCPEndpoint(endpoint_config):
-    return endpoints.TCP4ServerEndpoint(reactor, endpoint_config['port'])
+    return endpoints.TCP4ServerEndpoint(
+                reactor=reactor,
+                port=endpoint_config['port'],
+                interface=endpoint_config.get('address', '')
+    )
 
 def getTLSEndpoint(endpoint_config):
     with open(endpoint_config['cert'], 'r') as f:
         cert_data = f.read()
     certificate = ssl.PrivateCertificate.loadPEM(cert_data)
-    return endpoints.SSL4ServerEndpoint(reactor,
-                                        endpoint_config['port'],
-                                        certificate.options())
+    return endpoints.SSL4ServerEndpoint(
+        reactor=reactor,
+        port=endpoint_config['port'],
+        sslContextFactory=certificate.options(),
+        interface=endpoint_config.get('address', '')
+    )
 
 def getEndpoint(endpoint_config):
     if endpoint_config['type'] == 'onion':
@@ -143,6 +150,8 @@ def createService(endpoint, role, endpoint_config):
         factory = ooniBouncer
     elif role == 'collector':
         factory = ooniBackend
+    elif role == 'web_connectivity':
+        factory = http_helpers.WebConnectivityHelper
     else:
         raise Exception("unknown service type")
 
@@ -157,8 +166,11 @@ def createService(endpoint, role, endpoint_config):
 if config.main.tor_hidden_service and \
         config.main.bouncer_endpoints is None and \
         config.main.collector_endpoints is None:
-    bouncer_hsdir   = os.path.join(config.main.tor_datadir, 'bouncer')
-    collector_hsdir = os.path.join(config.main.tor_datadir, 'collector')
+    base_dir = '.'
+    if config.main.tor_datadir is not None:
+        base_dir = config.main.tor_datadir
+    bouncer_hsdir   = os.path.join(base_dir, 'bouncer')
+    collector_hsdir = os.path.join(base_dir, 'collector')
     config.main.bouncer_endpoints   = [ {'type': 'onion', 'hsdir':   bouncer_hsdir} ]
     config.main.collector_endpoints = [ {'type': 'onion', 'hsdir': collector_hsdir} ]
 
@@ -174,3 +186,8 @@ for endpoint_config in config.main.get('collector_endpoints', []):
     print "Starting collector with config %s" % endpoint_config
     endpoint = getEndpoint(endpoint_config)
     createService(endpoint, 'collector', endpoint_config)
+
+for endpoint_config in config.helpers.web_connectivity.get('endpoints', []):
+    print "Starting web_connectivity helper with config %s" % endpoint_config
+    endpoint = getEndpoint(endpoint_config)
+    createService(endpoint, 'web_connectivity', endpoint_config)
diff --git a/oonib/test/test_web_connectivity.py b/oonib/test/test_web_connectivity.py
new file mode 100644
index 0000000..24e2be0
--- /dev/null
+++ b/oonib/test/test_web_connectivity.py
@@ -0,0 +1,71 @@
+from hashlib import sha256
+from twisted.internet import defer
+from twisted.trial import unittest
+
+from oonib.testhelpers.http_helpers import WebConnectivityCache
+
+class WebConnectivityCacheTestCase(unittest.TestCase):
+    def setUp(self):
+        self.web_connectivity_cache = WebConnectivityCache()
+
+    def tearDown(self):
+        return self.web_connectivity_cache.expire_all()
+
+    @defer.inlineCallbacks
+    def test_http_request(self):
+        value = yield self.web_connectivity_cache.http_request(
+                    'https://www.google.com/humans.txt')
+        self.assertEqual(
+            value['body_length'], 286
+        )
+        self.assertEqual(
+            value['status_code'], 200
+        )
+        self.assertIsInstance(value['headers'],
+                              dict)
+
+    @defer.inlineCallbacks
+    def test_dns_consistency(self):
+        # The twisted.names resolver sets a reactor.callLater() on parsing
+        # the resolv.conf and this leads to the reactor being dirty. Look into
+        # a clean way to solve this and reactivate this integration test.
+        self.skipTest("Skipping to avoid dirty reactor")
+        value = yield self.web_connectivity_cache.dns_consistency(
+                    'www.torproject.org')
+        self.assertIsInstance(
+            value['addrs'],
+            list
+        )
+        self.assertIn(
+            'failure',
+            value.keys()
+        )
+
+    @defer.inlineCallbacks
+    def test_tcp_connect(self):
+        value = yield self.web_connectivity_cache.tcp_connect(
+                    '216.58.213.14:80')
+        self.assertIsInstance(
+            value['status'],
+            bool
+        )
+        self.assertIn(
+            'failure',
+            value.keys()
+        )
+
+    @defer.inlineCallbacks
+    def test_cache_lifecycle(self):
+        key = 'http://example.com'
+        key_hash = sha256(key).hexdigest()
+        value = {'spam': 'ham'}
+
+        miss = yield self.web_connectivity_cache.lookup('http_request', key)
+        self.assertEqual(miss, None)
+
+        yield self.web_connectivity_cache.cache_value('http_request', key,
+                                                      value)
+        hit = yield self.web_connectivity_cache.lookup('http_request', key)
+        self.assertEqual(hit, value)
+
+        yield self.web_connectivity_cache.expire('http_request', key_hash)
diff --git a/oonib/testhelpers/http_helpers.py b/oonib/testhelpers/http_helpers.py
index a28cbad..1cddf24 100644
--- a/oonib/testhelpers/http_helpers.py
+++ b/oonib/testhelpers/http_helpers.py
@@ -1,16 +1,37 @@
 import json
+import os
 import random
+import re
 import string
-
-from twisted.internet import protocol, defer
-
-from cyclone.web import RequestHandler, Application
-
+import tempfile
+from hashlib import sha256
+from urlparse import urlparse
+
+from cyclone.web import RequestHandler, Application, HTTPError
+from cyclone.web import asynchronous
+from twisted.internet import protocol, defer, reactor
+from twisted.internet.endpoints import TCP4ClientEndpoint
+from twisted.internet.error import ConnectionRefusedError
+from twisted.internet.error import DNSLookupError, TimeoutError
+from twisted.names import client as dns_client
+from twisted.names import dns
+from twisted.names.error import DNSNameError, DNSServerError
 from twisted.protocols import policies, basic
+from twisted.web.client import readBody
+from twisted.web.client import ContentDecoderAgent, GzipDecoder
+from twisted.web.client import PartialDownloadError
 from twisted.web.http import Request
 
 from oonib import log, randomStr
 
+from oonib.common.txextra import FixedRedirectAgent, TrueHeaders
+from oonib.common.txextra import TrueHeadersAgent
+from oonib.common.http_utils import representBody, extractTitle
+from oonib.common.http_utils import REQUEST_HEADERS
+from oonib.common.tcp_utils import TCPConnectFactory
+
+from oonib.handlers import OONIBHandler
+
 
 class SimpleHTTPChannel(basic.LineReceiver, policies.TimeoutMixin):
     """
@@ -168,7 +189,320 @@ class HTTPRandomPage(HTTPTrapAll):
             length = 100000
         self.write(self.genRandomPage(length, keyword))
 
+
+def encodeResponse(response):
+    body = None
+    body_length = 0
+    if (hasattr(response, 'body') and
+                response.body is not None):
+        body = response.body
+        body_length = len(response.body)
+    headers = {}
+    for k, v in response.headers.getAllRawHeaders():
+        headers[k.lower()] = unicode(v[0], errors='ignore')
+    return {
+        'headers': headers,
+        'code': response.code,
+        'body_length': body_length,
+        'body': representBody(body)
+    }
+
+def encodeResponses(response):
+    responses = []
+    responses += [encodeResponse(response)]
+    if response.previousResponse:
+        responses += encodeResponses(response.previousResponse)
+    return responses
+
+
+class WebConnectivityCache(object):
+    expiration_time = 200
+    enable_caching = True
+    http_retries = 2
+
+    def __init__(self):
+        self._response_types = (
+            'http_request',
+            'tcp_connect',
+            'dns_consistency'
+        )
+        self._cache_lifecycle = {}
+        self._cache_dir = tempfile.mkdtemp()
+        for response_type in self._response_types:
+            os.mkdir(os.path.join(self._cache_dir, response_type))
+            self._cache_lifecycle[response_type] = {}
+
+    @defer.inlineCallbacks
+    def expire_all(self):
+        for response_type in self._cache_lifecycle.keys():
+            for key_hash in self._cache_lifecycle[response_type].keys():
+                yield self.expire(response_type, key_hash)
+
+    @defer.inlineCallbacks
+    def cache_value(self, response_type, key, value):
+        if response_type not in self._response_types:
+            raise Exception("Invalid response type")
+        if self.enable_caching:
+            key_hash = sha256(key).hexdigest()
+            cache_file = os.path.join(self._cache_dir, response_type, key_hash)
+
+            if key_hash in self._cache_lifecycle[response_type]:
+                yield self.expire(response_type, key_hash)
+
+            self._cache_lifecycle[response_type][key_hash] = {
+                'expiration': reactor.callLater(self.expiration_time,
+                                                self.expire,
+                                                response_type, key_hash),
+                'lock': defer.DeferredLock()
+            }
+            lock = self._cache_lifecycle[response_type][key_hash]['lock']
+            yield lock.acquire()
+            with open(cache_file, 'w+') as fw:
+                json.dump(value, fw)
+            lock.release()
+
+    @defer.inlineCallbacks
+    def expire(self, response_type, key_hash):
+        if response_type not in self._response_types:
+            raise Exception("Invalid response type")
+        lifecycle = self._cache_lifecycle[response_type][key_hash]
+        if lifecycle['expiration'].active():
+            lifecycle['expiration'].cancel()
+
+        yield lifecycle['lock'].acquire()
+        try:
+            os.remove(os.path.join(self._cache_dir, response_type, key_hash))
+        except OSError:
+            pass
+        lifecycle['lock'].release()
+        del self._cache_lifecycle[response_type][key_hash]
+
+    @defer.inlineCallbacks
+    def lookup(self, response_type, key):
+        if not self.enable_caching:
+            defer.returnValue(None)
+
+        key_hash = sha256(key).hexdigest()
+        cache_file = os.path.join(self._cache_dir, response_type, key_hash)
+
+        if key_hash not in self._cache_lifecycle[response_type]:
+            defer.returnValue(None)
+
+        lock = self._cache_lifecycle[response_type][key_hash]['lock']
+        expiration = \
+            self._cache_lifecycle[response_type][key_hash]['expiration']
+
+        yield lock.acquire()
+
+        if not os.path.exists(cache_file):
+            lock.release()
+            defer.returnValue(None)
+
+        with open(cache_file, 'r') as fh:
+            value = json.load(fh)
+
+        expiration.reset(self.expiration_time)
+        lock.release()
+        defer.returnValue(value)
+
+    @defer.inlineCallbacks
+    def http_request(self, url, include_http_responses=False):
+        cached_value = yield self.lookup('http_request', url)
+        if cached_value is not None:
+            if include_http_responses is not True:
+                cached_value.pop('responses', None)
+            defer.returnValue(cached_value)
+
+        page_info = {
+            'body_length': -1,
+            'status_code': -1,
+            'headers': {},
+            'failure': None
+        }
+
+        agent = ContentDecoderAgent(
+            FixedRedirectAgent(TrueHeadersAgent(reactor)),
+                               [('gzip', GzipDecoder)]
+        )
+        try:
+            retries = 0
+            while True:
+                try:
+                    response = yield agent.request('GET', url,
+                                                   TrueHeaders(REQUEST_HEADERS))
+                    headers = {}
+                    for name, value in response.headers.getAllRawHeaders():
+                        headers[name] = unicode(value[0], errors='ignore')
+                    body_length = -1
+                    body = None
+                    try:
+                        body = yield readBody(response)
+                        body_length = len(body)
+                    except PartialDownloadError as pde:
+                        if pde.response:
+                            body_length = len(pde.response)
+                            body = pde.response
+                    page_info['body_length'] = body_length
+                    page_info['status_code'] = response.code
+                    page_info['headers'] = headers
+                    page_info['title'] = extractTitle(body)
+                    response.body = body
+                    page_info['responses'] = encodeResponses(response)
+                    break
+                except:
+                    if retries > self.http_retries:
+                        raise
+                    retries += 1
+        except DNSLookupError:
+            page_info['failure'] = 'dns_lookup_error'
+        except TimeoutError:
+            page_info['failure'] = 'generic_timeout_error'
+        except ConnectionRefusedError:
+            page_info['failure'] = 'connection_refused_error'
+        except:
+            # XXX map more failures
+            page_info['failure'] = 'unknown_error'
+
+        yield self.cache_value('http_request', url, page_info)
+        if include_http_responses is not True:
+            page_info.pop('responses', None)
+        defer.returnValue(page_info)
+
+    @defer.inlineCallbacks
+    def tcp_connect(self, socket):
+        cached_value = yield self.lookup('tcp_connect', socket)
+        if cached_value is not None:
+            defer.returnValue(cached_value)
+
+        socket_info = {
+            'status': None,
+            'failure': None
+        }
+
+        ip_address, port = socket.split(":")
+        try:
+            point = TCP4ClientEndpoint(reactor, ip_address, int(port))
+            yield point.connect(TCPConnectFactory())
+            socket_info['status'] = True
+        except TimeoutError:
+            socket_info['status'] = False
+            socket_info['failure'] = 'generic_timeout_error'
+        except ConnectionRefusedError:
+            socket_info['status'] = False
+            socket_info['failure'] = 'connection_refused_error'
+        except:
+            socket_info['status'] = False
+            socket_info['failure'] = 'unknown_error'
+        yield self.cache_value('tcp_connect', socket, socket_info)
+        defer.returnValue(socket_info)
+
+    @defer.inlineCallbacks
+    def dns_consistency(self, hostname):
+        cached_value = yield self.lookup('dns_consistency', hostname)
+        if cached_value is not None:
+            defer.returnValue(cached_value)
+
+        dns_info = {
+            'addrs': [],
+            'failure': None
+        }
+
+        try:
+            records = yield dns_client.lookupAddress(hostname)
+            answers = records[0]
+            for answer in answers:
+                if answer.type is dns.A:
+                    dns_info['addrs'].append(answer.payload.dottedQuad())
+                elif answer.type is dns.CNAME:
+                    dns_info['addrs'].append(answer.payload.name.name)
+        except DNSNameError:
+            dns_info['failure'] = 'dns_name_error'
+        except DNSServerError:
+            dns_info['failure'] = 'dns_server_failure'
+        except:
+            dns_info['failure'] = 'unknown_error'
+
+        yield self.cache_value('dns_consistency', hostname, dns_info)
+        defer.returnValue(dns_info)
+
+
# URL validator for the http_request key of incoming control requests.
# Taken from
# http://stackoverflow.com/questions/7160737/python-how-to-validate-a-url-in-python-malformed-or-not
HTTP_REQUEST_REGEXP = re.compile(
    r'^(?:http)s?://'  # http:// or https://
    r'(?:(?:[A-Z0-9](?:[A-Z0-9-]{0,61}[A-Z0-9])?\.)+(?:[A-Z]{2,6}\.?|[A-Z0-9-]{2,}\.?)|'  # domain...
    r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})'  # ...or ip
    r'(?::\d+)?'  # optional port
    r'(?:/?|[/?]\S+)$', re.IGNORECASE)

# Loose "dotted-quad:port" check for tcp_connect entries (IPv4 only;
# octet ranges are not enforced, just the digit-group shape).
SOCKET_REGEXP = re.compile(r'^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}:\d+$')

# Module-level singleton: all handler instances share one on-disk cache.
web_connectivity_cache = WebConnectivityCache()
+
class WebConnectivity(OONIBHandler):
    """Control vantage point for the OONI web_connectivity test.

    Accepts a POSTed JSON request describing a URL and a list of
    "ip:port" endpoints, and replies with the control view: the HTTP
    response, DNS answers and TCP connect results as seen from here.
    """

    @defer.inlineCallbacks
    def control_measurement(self, http_url, socket_list,
                            include_http_responses):
        """Run the HTTP, DNS and TCP measurements concurrently and
        finish the request with the combined JSON document.

        :param http_url: URL to fetch for the control HTTP measurement.
        :param socket_list: list of "ip:port" strings to TCP-connect to.
        :param include_http_responses: when True, full response bodies
            are kept in the http_request result.
        """
        # .hostname strips any explicit ":port" suffix (the URL regexp
        # allows one); .netloc would keep it and break the DNS lookup.
        hostname = urlparse(http_url).hostname
        dl = [
            web_connectivity_cache.http_request(http_url, include_http_responses),
            web_connectivity_cache.dns_consistency(hostname)
        ]
        for socket in socket_list:
            dl.append(web_connectivity_cache.tcp_connect(socket))
        responses = yield defer.DeferredList(dl)
        # DeferredList yields (success, result) pairs in input order:
        # index 0 is the HTTP fetch, 1 the DNS lookup, the rest TCP.
        http_request = responses[0][1]
        dns = responses[1][1]
        tcp_connect = {}
        for idx, response in enumerate(responses[2:]):
            tcp_connect[socket_list[idx]] = response[1]
        self.finish({
            'http_request': http_request,
            'tcp_connect': tcp_connect,
            'dns': dns
        })

    def validate_request(self, request):
        """Raise HTTPError(400) unless *request* has a well-formed
        http_request URL and tcp_connect list of "ip:port" strings."""
        required_keys = ['http_request', 'tcp_connect']
        for rk in required_keys:
            if rk not in request:
                raise HTTPError(400, "Missing %s" % rk)
        if not HTTP_REQUEST_REGEXP.match(request['http_request']):
            raise HTTPError(400, "Invalid http_request URL")
        if any(not SOCKET_REGEXP.match(socket)
               for socket in request['tcp_connect']):
            raise HTTPError(400, "Invalid tcp_connect URL")

    @asynchronous
    def post(self):
        """Parse, validate and launch a control measurement; the
        response is finished asynchronously by control_measurement."""
        try:
            request = json.loads(self.request.body)
            self.validate_request(request)
            include_http_responses = request.get("include_http_responses",
                                                 False)
            self.control_measurement(
                str(request['http_request']),
                request['tcp_connect'],
                include_http_responses
            )
        except HTTPError:
            # Validation errors propagate with their own status/message.
            raise
        except Exception as exc:
            log.msg("Got invalid request")
            log.exception(exc)
            raise HTTPError(400, 'invalid request')
+
class WebConnectivityStatus(RequestHandler):
    """Health-check endpoint for the web_connectivity helper."""

    def get(self):
        # Static JSON document so monitoring can probe the helper.
        status_document = {"status": "ok"}
        self.write(status_document)
+
+
# Application routing any two-segment path to the HTTPRandomPage handler.
HTTPRandomPageHelper = Application([
    # XXX add regexps here
    (r"/(.*)/(.*)", HTTPRandomPage)
])
+
# Routes for the web_connectivity test helper: /status answers the
# health check, / accepts control measurement POSTs.
WebConnectivityHelper = Application([
    (r"/status", WebConnectivityStatus),
    (r"/", WebConnectivity)
])





More information about the tor-commits mailing list