commit 5178fc299d361c9ab57002e619e40b05061f5a7d Author: Arturo Filastò arturo@filasto.net Date: Sat May 28 19:05:42 2016 +0200
Feature/web connectivity (#63)
* Add web connectivity test helper
* Forward test helper addresses to the host machine
* Align the request headers used by the web_connectivity test helper to those of the probe
* Add monkey patch for bug in twisted RedirectAgent:
https://twistedmatrix.com/trac/ticket/8265
* Add monkey patch for HTTPClientParser.statusReceived
* Use TrueHeaders in control request
* Add support for specifying endpoints in web_connectivity test helper
* Add endpoint for checking the status of the WebConnectivity test helper
* Add support for parsing test-helpers-alternate
* Fix key for web_connectivity test in example configuration file
* Implement on-disk web_connectivity cache
* Add support for Gzip content decoding
* Also record CNAME resolution.
* Rename ips to addrs
* Add support for retries in the http_request
* Add support for extracting title
* Encode the responses as well when debug mode
* Handle partial downloads
* Ignore errors when encoding headers
* Cast title to unicode and ignore errors
* Improvements based on feedback and comments by @bassosimone
* Move twisted related patches into txextra module
* Add support for returning the responses based on a key sent by the client.
* Inherit from OONIBHandler so we can get the error message format
* Stylistic improvements
* Set defaults in a way that oonib.conf can start from the example
* Avoid doing join on nonetype
* Address comments by @bassosimone
* Properly set the body also when we get a partial body downloaded
* Move more code into shared oonib.common module
* Fully sync the common module with ooni-probe (pulling in also other shared functionality so the Agents match entirely)
* Fix location of comment for the patched HTTPClient
* Add unittests and integration tests for web_connectivity --- Vagrantfile | 25 +++ oonib.conf.example | 18 +- oonib/bouncer/handlers.py | 14 +- oonib/common/__init__.py | 10 ++ oonib/common/http_utils.py | 54 ++++++ oonib/common/tcp_utils.py | 10 ++ oonib/common/txextra.py | 202 +++++++++++++++++++++ oonib/oonibackend.py | 29 ++- oonib/test/test_web_connectivity.py | 71 ++++++++ oonib/testhelpers/http_helpers.py | 344 +++++++++++++++++++++++++++++++++++- 10 files changed, 758 insertions(+), 19 deletions(-)
diff --git a/Vagrantfile b/Vagrantfile index b71e4e7..7aa5f71 100644 --- a/Vagrantfile +++ b/Vagrantfile @@ -4,6 +4,31 @@ Vagrant.configure("2") do |config| config.vm.box = "precise32" config.vm.box_url = "http://files.vagrantup.com/precise32.box" + + # Create a forwarded port mapping which allows access to a specific port + # within the machine from a port on the host machine. In the example below, + # accessing "localhost:8080" will access port 80 on the guest machine. + config.vm.network :forwarded_port, guest: 57001, host: 57001 + config.vm.network :forwarded_port, guest: 57001, host: 57002 + config.vm.network :forwarded_port, guest: 57001, host: 57003 + config.vm.network :forwarded_port, guest: 57004, host: 57004 + config.vm.network :forwarded_port, guest: 57005, host: 57005 + config.vm.network :forwarded_port, guest: 57006, host: 57006 + config.vm.network :forwarded_port, guest: 57007, host: 57007 + + # Create a private network, which allows host-only access to the machine + # using a specific IP. + # config.vm.network :private_network, ip: "192.168.33.10" + + # Create a public network, which generally matched to bridged network. + # Bridged networks make the machine appear as another physical device on + # your network. + # config.vm.network :public_network + + # Share an additional folder to the guest VM. The first argument is + # the path on the host to the actual folder. The second argument is + # the path on the guest to mount the folder. And the optional third + # argument is a set of non-required options. config.vm.synced_folder ".", "/data/oonib" end
diff --git a/oonib.conf.example b/oonib.conf.example index 1925d81..6f4c7f3 100644 --- a/oonib.conf.example +++ b/oonib.conf.example @@ -30,12 +30,13 @@ main: tor_datadir: null
bouncer_endpoints: - - {type: tls, port: 10443, cert: "private/ssl-key-and-cert.pem"} + #- {type: tls, port: 10443, cert: "private/ssl-key-and-cert.pem"} - {type: tcp, port: 10080} - - {type: onion, hsdir: bouncer} + - {type: onion, hsdir: /tmp/bouncer}
collector_endpoints: - - {type: tls, port: 11443, cert: "private/ssl-key-and-cert.pem"} + #- {type: tls, port: 11443, cert: "private/ssl-key-and-cert.pem"} + - {type: onion, hsdir: /tmp/collector}
report_file_template: '{iso8601_timestamp}-{test_name}-{report_id}-{probe_asn}-{probe_cc}-probe-0.2.0.{ext}' helpers: @@ -62,12 +63,17 @@ helpers:
dns_discovery: address: null - udp_port: 53 - tcp_port: 53 + udp_port: null + tcp_port: null resolver_address: null
ssl: address: null private_key: 'private.key' certificate: 'certificate.crt' - port: 57006 + #port: 57006 + port: null + + web_connectivity: + endpoints: + - {type: tcp, port: 57007} diff --git a/oonib/bouncer/handlers.py b/oonib/bouncer/handlers.py index 1c7627c..a9370c0 100644 --- a/oonib/bouncer/handlers.py +++ b/oonib/bouncer/handlers.py @@ -159,9 +159,12 @@ class Bouncer(object): requested_nettest['input-hashes'], requested_nettest['test-helpers']) test_helpers = {} + test_helpers_alternate = {} for test_helper in requested_nettest['test-helpers']: + collector_info = self.bouncerFile['collector'][collector] try: - test_helpers[test_helper] = self.bouncerFile['collector'][collector]['test-helper'][test_helper] + test_helpers[test_helper] = \ + collector_info['test-helper'][test_helper] except KeyError: helpers = self.knownHelpers.get(test_helper) if not helpers: @@ -169,12 +172,19 @@ class Bouncer(object): helper = random.choice(helpers) test_helpers[test_helper] = helper['helper-address']
+ try: + test_helpers_alternate[test_helper] = \ + collector_info['test-helper-alternate'][test_helper] + except KeyError: + pass + nettest = { 'name': requested_nettest['name'], 'version': requested_nettest['version'], 'input-hashes': requested_nettest['input-hashes'], 'test-helpers': test_helpers, - 'collector': collector, + 'test-helpers-alternate': test_helpers_alternate, + 'collector': collector } nettests.append(nettest) return {'net-tests': nettests} diff --git a/oonib/common/__init__.py b/oonib/common/__init__.py new file mode 100644 index 0000000..7f6cf73 --- /dev/null +++ b/oonib/common/__init__.py @@ -0,0 +1,10 @@ +""" +This modules contains functionality that is shared amongst ooni-probe and +ooni-backend. If the code in here starts growing too much I think it would +make sense to either: + + * Make the code in here into it's own package that is imported by + ooni-probe and ooni-backend. + + * Merge ooniprobe with oonibackend. +""" diff --git a/oonib/common/http_utils.py b/oonib/common/http_utils.py new file mode 100644 index 0000000..6f9db2d --- /dev/null +++ b/oonib/common/http_utils.py @@ -0,0 +1,54 @@ +import re +import codecs +from base64 import b64encode + +META_CHARSET_REGEXP = re.compile('<meta(?!\s*(?:name|value)\s*=)[^>]*?charset\s*=[\s"']*([^\s"'/>]*)') + +def representBody(body): + if not body: + return body + body = body.replace('\0', '') + decoded = False + charsets = ['ascii', 'utf-8'] + + # If we are able to detect the charset of body from the meta tag + # try to decode using that one first + charset = META_CHARSET_REGEXP.search(body, re.IGNORECASE) + if charset: + try: + encoding = charset.group(1).lower() + codecs.lookup(encoding) + charsets.insert(0, encoding) + except (LookupError, IndexError): + # Skip invalid codecs and partial regexp match + pass + + for encoding in charsets: + try: + body = unicode(body, encoding) + decoded = True + break + except UnicodeDecodeError: + pass + if not decoded: + body = { + 'data': b64encode(body), + 'format': 'base64' + } + return body + +TITLE_REGEXP = re.compile("<title>(.*?)</title>", re.IGNORECASE | re.DOTALL) + +def extractTitle(body): + m = TITLE_REGEXP.search(body, re.IGNORECASE | re.DOTALL) + if m: + return unicode(m.group(1), errors='ignore') + return '' + +REQUEST_HEADERS = { + 'User-Agent': ['Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, ' + 'like Gecko) Chrome/47.0.2526.106 Safari/537.36'], + 'Accept-Language': ['en-US;q=0.8,en;q=0.5'], + 'Accept': ['text/html,application/xhtml+xml,application/xml;q=0.9,' + '*/*;q=0.8'] +} diff --git a/oonib/common/tcp_utils.py b/oonib/common/tcp_utils.py new file mode 100644 index 0000000..7b7a8a4 --- /dev/null +++ b/oonib/common/tcp_utils.py @@ -0,0 +1,10 @@ +from twisted.internet.protocol import Factory, Protocol + +class TCPConnectProtocol(Protocol): + def connectionMade(self): + self.transport.loseConnection() + +class TCPConnectFactory(Factory): + noisy = False + def buildProtocol(self, addr): + return TCPConnectProtocol() diff --git a/oonib/common/txextra.py b/oonib/common/txextra.py new file mode 100644 index 0000000..7a84592 --- /dev/null +++ b/oonib/common/txextra.py @@ -0,0 +1,202 @@ +import itertools +from copy import copy + +from twisted.web.http_headers import Headers +from twisted.web import error + +from twisted.web.client import BrowserLikeRedirectAgent +from twisted.web._newclient import ResponseFailed +from twisted.web._newclient import HTTPClientParser, ParseError +from twisted.python.failure import Failure + +from twisted.web import client, _newclient + +from twisted.web._newclient import RequestNotSent, RequestGenerationFailed +from twisted.web._newclient import TransportProxyProducer, STATUS + +from twisted.internet import reactor +from twisted.internet.defer import Deferred, fail, maybeDeferred, failure + +from twisted.python import log + +class TrueHeaders(Headers): + def __init__(self, rawHeaders=None): + self._rawHeaders = dict() + if rawHeaders is not None: + for name, values in rawHeaders.iteritems(): + if type(values) is list: + self.setRawHeaders(name, values[:]) + elif type(values) is str: + self.setRawHeaders(name, values) + + def setRawHeaders(self, name, values): + if name.lower() not in self._rawHeaders: + self._rawHeaders[name.lower()] = dict() + self._rawHeaders[name.lower()]['name'] = name + self._rawHeaders[name.lower()]['values'] = values + + def getAllRawHeaders(self): + for _, v in self._rawHeaders.iteritems(): + yield v['name'], v['values'] + + def getRawHeaders(self, name, default=None): + if name.lower() in self._rawHeaders: + return self._rawHeaders[name.lower()]['values'] + return default + + + def getDiff(self, headers, ignore=[]): + """ + + Args: + + headers: a TrueHeaders object + + ignore: specify a list of header fields to ignore + + Returns: + + a set containing the header names that are not present in + header_dict or not present in self. + """ + diff = set() + field_names = [] + + headers_a = copy(self) + headers_b = copy(headers) + for name in ignore: + try: + del headers_a._rawHeaders[name.lower()] + except KeyError: + pass + try: + del headers_b._rawHeaders[name.lower()] + except KeyError: + pass + + for k, v in itertools.chain(headers_a.getAllRawHeaders(), + headers_b.getAllRawHeaders()): + field_names.append(k) + + for name in field_names: + if self.getRawHeaders(name) and headers.getRawHeaders(name): + pass + else: + diff.add(name) + return list(diff) + +class HTTPClientParser(_newclient.HTTPClientParser): + def logPrefix(self): + return 'HTTPClientParser' + + def connectionMade(self): + self.headers = TrueHeaders() + self.connHeaders = TrueHeaders() + self.state = STATUS + self._partialHeader = None + + def headerReceived(self, name, value): + if self.isConnectionControlHeader(name.lower()): + headers = self.connHeaders + else: + headers = self.headers + headers.addRawHeader(name, value) + + def statusReceived(self, status): + # This is a fix for invalid number of parts + try: + return _newclient.HTTPClientParser.statusReceived(self, status) + except ParseError as exc: + if exc.args[0] == 'wrong number of parts': + return _newclient.HTTPClientParser.statusReceived(self, + status + " XXX") + raise + +class HTTP11ClientProtocol(_newclient.HTTP11ClientProtocol): + def request(self, request): + if self._state != 'QUIESCENT': + return fail(RequestNotSent()) + + self._state = 'TRANSMITTING' + _requestDeferred = maybeDeferred(request.writeTo, self.transport) + self._finishedRequest = Deferred() + + self._currentRequest = request + + self._transportProxy = TransportProxyProducer(self.transport) + self._parser = HTTPClientParser(request, self._finishResponse) + self._parser.makeConnection(self._transportProxy) + self._responseDeferred = self._parser._responseDeferred + + def cbRequestWrotten(ignored): + if self._state == 'TRANSMITTING': + self._state = 'WAITING' + self._responseDeferred.chainDeferred(self._finishedRequest) + + def ebRequestWriting(err): + if self._state == 'TRANSMITTING': + self._state = 'GENERATION_FAILED' + self.transport.loseConnection() + self._finishedRequest.errback( + failure.Failure(RequestGenerationFailed([err]))) + else: + log.err(err, 'Error writing request, but not in valid state ' + 'to finalize request: %s' % self._state) + + _requestDeferred.addCallbacks(cbRequestWrotten, ebRequestWriting) + + return self._finishedRequest + + +class _HTTP11ClientFactory(client._HTTP11ClientFactory): + noisy = False + + def buildProtocol(self, addr): + return HTTP11ClientProtocol(self._quiescentCallback) + + +class HTTPConnectionPool(client.HTTPConnectionPool): + _factory = _HTTP11ClientFactory + +class TrueHeadersAgent(client.Agent): + def __init__(self, *args, **kw): + super(TrueHeadersAgent, self).__init__(*args, **kw) + self._pool = HTTPConnectionPool(reactor, False) + +class FixedRedirectAgent(BrowserLikeRedirectAgent): + """ + This is a redirect agent with this patch manually applied: + https://twistedmatrix.com/trac/ticket/8265 + """ + def _handleRedirect(self, response, method, uri, headers, redirectCount): + """ + Handle a redirect response, checking the number of redirects already + followed, and extracting the location header fields. + + This is patched to fix a bug in infinite redirect loop. + """ + if redirectCount >= self._redirectLimit: + err = error.InfiniteRedirection( + response.code, + b'Infinite redirection detected', + location=uri) + raise ResponseFailed([Failure(err)], response) + locationHeaders = response.headers.getRawHeaders(b'location', []) + if not locationHeaders: + err = error.RedirectWithNoLocation( + response.code, b'No location header field', uri) + raise ResponseFailed([Failure(err)], response) + location = self._resolveLocation( + # This is the fix to properly handle redirects + response.request.absoluteURI, + locationHeaders[0] + ) + deferred = self._agent.request(method, location, headers) + + def _chainResponse(newResponse): + newResponse.setPreviousResponse(response) + return newResponse + + deferred.addCallback(_chainResponse) + return deferred.addCallback( + self._handleResponse, method, uri, headers, redirectCount + 1) diff --git a/oonib/oonibackend.py b/oonib/oonibackend.py index 601a972..ebf3c52 100644 --- a/oonib/oonibackend.py +++ b/oonib/oonibackend.py @@ -118,15 +118,22 @@ def getHSEndpoint(endpoint_config): data_dir=hsdir)
def getTCPEndpoint(endpoint_config): - return endpoints.TCP4ServerEndpoint(reactor, endpoint_config['port']) + return endpoints.TCP4ServerEndpoint( + reactor=reactor, + port=endpoint_config['port'], + interface=endpoint_config.get('address', '') + )
def getTLSEndpoint(endpoint_config): with open(endpoint_config['cert'], 'r') as f: cert_data = f.read() certificate = ssl.PrivateCertificate.loadPEM(cert_data) - return endpoints.SSL4ServerEndpoint(reactor, - endpoint_config['port'], - certificate.options()) + return endpoints.SSL4ServerEndpoint( + reactor=reactor, + port=endpoint_config['port'], + sslContextFactory=certificate.options(), + interface=endpoint_config.get('address', '') + )
def getEndpoint(endpoint_config): if endpoint_config['type'] == 'onion': @@ -143,6 +150,8 @@ def createService(endpoint, role, endpoint_config): factory = ooniBouncer elif role == 'collector': factory = ooniBackend + elif role == 'web_connectivity': + factory = http_helpers.WebConnectivityHelper else: raise Exception("unknown service type")
@@ -157,8 +166,11 @@ def createService(endpoint, role, endpoint_config): if config.main.tor_hidden_service and \ config.main.bouncer_endpoints is None and \ config.main.collector_endpoints is None: - bouncer_hsdir = os.path.join(config.main.tor_datadir, 'bouncer') - collector_hsdir = os.path.join(config.main.tor_datadir, 'collector') + base_dir = '.' + if config.main.tor_datadir is not None: + base_dir = config.main.tor_datadir + bouncer_hsdir = os.path.join(base_dir, 'bouncer') + collector_hsdir = os.path.join(base_dir, 'collector') config.main.bouncer_endpoints = [ {'type': 'onion', 'hsdir': bouncer_hsdir} ] config.main.collector_endpoints = [ {'type': 'onion', 'hsdir': collector_hsdir} ]
@@ -174,3 +186,8 @@ for endpoint_config in config.main.get('collector_endpoints', []): print "Starting collector with config %s" % endpoint_config endpoint = getEndpoint(endpoint_config) createService(endpoint, 'collector', endpoint_config) + +for endpoint_config in config.helpers.web_connectivity.get('endpoints', []): + print "Starting web_connectivity helper with config %s" % endpoint_config + endpoint = getEndpoint(endpoint_config) + createService(endpoint, 'web_connectivity', endpoint_config) diff --git a/oonib/test/test_web_connectivity.py b/oonib/test/test_web_connectivity.py new file mode 100644 index 0000000..24e2be0 --- /dev/null +++ b/oonib/test/test_web_connectivity.py @@ -0,0 +1,71 @@ +from hashlib import sha256 +from twisted.internet import defer +from twisted.trial import unittest + +from oonib.testhelpers.http_helpers import WebConnectivityCache + +class WebConnectivityCacheTestCase(unittest.TestCase): + def setUp(self): + self.web_connectivity_cache = WebConnectivityCache() + + def tearDown(self): + return self.web_connectivity_cache.expire_all() + + @defer.inlineCallbacks + def test_http_request(self): + value = yield self.web_connectivity_cache.http_request( + 'https://www.google.com/humans.txt') + self.assertEqual( + value['body_length'], 286 + ) + self.assertEqual( + value['status_code'], 200 + ) + self.assertIsInstance(value['headers'], + dict) + + @defer.inlineCallbacks + def test_dns_consistency(self): + # The twisted.names resolve set a reactor.callLater() on parsing the + # resolv.conf and this leads to the reactor being dirty. Look into + # a clean way to solve this and reactive this integration test. + self.skipTest("Skipping to avoid dirty reactor") + value = yield self.web_connectivity_cache.dns_consistency( + 'www.torproject.org') + self.assertIsInstance( + value['addrs'], + list + ) + self.assertIn( + 'failure', + value.keys() + ) + + @defer.inlineCallbacks + def test_tcp_connect(self): + value = yield self.web_connectivity_cache.tcp_connect( + '216.58.213.14:80') + self.assertIsInstance( + value['status'], + bool + ) + self.assertIn( + 'failure', + value.keys() + ) + + @defer.inlineCallbacks + def test_cache_lifecycle(self): + key = 'http://example.com' + key_hash = sha256(key).hexdigest() + value = {'spam': 'ham'} + + miss = yield self.web_connectivity_cache.lookup('http_request', key) + self.assertEqual(miss, None) + + yield self.web_connectivity_cache.cache_value('http_request', key, + value) + hit = yield self.web_connectivity_cache.lookup('http_request', key) + self.assertEqual(hit, value) + + yield self.web_connectivity_cache.expire('http_request', key_hash) diff --git a/oonib/testhelpers/http_helpers.py b/oonib/testhelpers/http_helpers.py index a28cbad..1cddf24 100644 --- a/oonib/testhelpers/http_helpers.py +++ b/oonib/testhelpers/http_helpers.py @@ -1,16 +1,37 @@ import json +import os import random +import re import string - -from twisted.internet import protocol, defer - -from cyclone.web import RequestHandler, Application - +import tempfile +from hashlib import sha256 +from urlparse import urlparse + +from cyclone.web import RequestHandler, Application, HTTPError +from cyclone.web import asynchronous +from twisted.internet import protocol, defer, reactor +from twisted.internet.endpoints import TCP4ClientEndpoint +from twisted.internet.error import ConnectionRefusedError +from twisted.internet.error import DNSLookupError, TimeoutError +from twisted.names import client as dns_client +from twisted.names import dns +from twisted.names.error import DNSNameError, DNSServerError from twisted.protocols import policies, basic +from twisted.web.client import readBody +from twisted.web.client import ContentDecoderAgent, GzipDecoder +from twisted.web.client import PartialDownloadError from twisted.web.http import Request
from oonib import log, randomStr
+from oonib.common.txextra import FixedRedirectAgent, TrueHeaders +from oonib.common.txextra import TrueHeadersAgent +from oonib.common.http_utils import representBody, extractTitle +from oonib.common.http_utils import REQUEST_HEADERS +from oonib.common.tcp_utils import TCPConnectFactory + +from oonib.handlers import OONIBHandler +
class SimpleHTTPChannel(basic.LineReceiver, policies.TimeoutMixin): """ @@ -168,7 +189,320 @@ class HTTPRandomPage(HTTPTrapAll): length = 100000 self.write(self.genRandomPage(length, keyword))
+ +def encodeResponse(response): + body = None + body_length = 0 + if (hasattr(response, 'body') and + response.body is not None): + body = response.body + body_length = len(response.body) + headers = {} + for k, v in response.headers.getAllRawHeaders(): + headers[k.lower()] = unicode(v[0], errors='ignore') + return { + 'headers': headers, + 'code': response.code, + 'body_length': body_length, + 'body': representBody(body) + } + +def encodeResponses(response): + responses = [] + responses += [encodeResponse(response)] + if response.previousResponse: + responses += encodeResponses(response.previousResponse) + return responses + + +class WebConnectivityCache(object): + expiration_time = 200 + enable_caching = True + http_retries = 2 + + def __init__(self): + self._response_types = ( + 'http_request', + 'tcp_connect', + 'dns_consistency' + ) + self._cache_lifecycle = {} + self._cache_dir = tempfile.mkdtemp() + for response_type in self._response_types: + os.mkdir(os.path.join(self._cache_dir, response_type)) + self._cache_lifecycle[response_type] = {} + + @defer.inlineCallbacks + def expire_all(self): + for response_type in self._cache_lifecycle.keys(): + for key_hash in self._cache_lifecycle[response_type].keys(): + yield self.expire(response_type, key_hash) + + @defer.inlineCallbacks + def cache_value(self, response_type, key, value): + if response_type not in self._response_types: + raise Exception("Invalid response type") + if self.enable_caching: + key_hash = sha256(key).hexdigest() + cache_file = os.path.join(self._cache_dir, response_type, key_hash) + + if key_hash in self._cache_lifecycle[response_type]: + yield self.expire(response_type, key_hash) + + self._cache_lifecycle[response_type][key_hash] = { + 'expiration': reactor.callLater(self.expiration_time, + self.expire, + response_type, key_hash), + 'lock': defer.DeferredLock() + } + lock = self._cache_lifecycle[response_type][key_hash]['lock'] + yield lock.acquire() + with open(cache_file, 'w+') as fw: + json.dump(value, fw) + lock.release() + + @defer.inlineCallbacks + def expire(self, response_type, key_hash): + if response_type not in self._response_types: + raise Exception("Invalid response type") + lifecycle = self._cache_lifecycle[response_type][key_hash] + if lifecycle['expiration'].active(): + lifecycle['expiration'].cancel() + + yield lifecycle['lock'].acquire() + try: + os.remove(os.path.join(self._cache_dir, response_type, key_hash)) + except OSError: + pass + lifecycle['lock'].release() + del self._cache_lifecycle[response_type][key_hash] + + @defer.inlineCallbacks + def lookup(self, response_type, key): + if not self.enable_caching: + defer.returnValue(None) + + key_hash = sha256(key).hexdigest() + cache_file = os.path.join(self._cache_dir, response_type, key_hash) + + if key_hash not in self._cache_lifecycle[response_type]: + defer.returnValue(None) + + lock = self._cache_lifecycle[response_type][key_hash]['lock'] + expiration = \ + self._cache_lifecycle[response_type][key_hash]['expiration'] + + yield lock.acquire() + + if not os.path.exists(cache_file): + lock.release() + defer.returnValue(None) + + with open(cache_file, 'r') as fh: + value = json.load(fh) + + expiration.reset(self.expiration_time) + lock.release() + defer.returnValue(value) + + @defer.inlineCallbacks + def http_request(self, url, include_http_responses=False): + cached_value = yield self.lookup('http_request', url) + if cached_value is not None: + if include_http_responses is not True: + cached_value.pop('responses', None) + defer.returnValue(cached_value) + + page_info = { + 'body_length': -1, + 'status_code': -1, + 'headers': {}, + 'failure': None + } + + agent = ContentDecoderAgent( + FixedRedirectAgent(TrueHeadersAgent(reactor)), + [('gzip', GzipDecoder)] + ) + try: + retries = 0 + while True: + try: + response = yield agent.request('GET', url, + TrueHeaders(REQUEST_HEADERS)) + headers = {} + for name, value in response.headers.getAllRawHeaders(): + headers[name] = unicode(value[0], errors='ignore') + body_length = -1 + body = None + try: + body = yield readBody(response) + body_length = len(body) + except PartialDownloadError as pde: + if pde.response: + body_length = len(pde.response) + body = pde.response + page_info['body_length'] = body_length + page_info['status_code'] = response.code + page_info['headers'] = headers + page_info['title'] = extractTitle(body) + response.body = body + page_info['responses'] = encodeResponses(response) + break + except: + if retries > self.http_retries: + raise + retries += 1 + except DNSLookupError: + page_info['failure'] = 'dns_lookup_error' + except TimeoutError: + page_info['failure'] = 'generic_timeout_error' + except ConnectionRefusedError: + page_info['failure'] = 'connection_refused_error' + except: + # XXX map more failures + page_info['failure'] = 'unknown_error' + + yield self.cache_value('http_request', url, page_info) + if include_http_responses is not True: + page_info.pop('responses', None) + defer.returnValue(page_info) + + @defer.inlineCallbacks + def tcp_connect(self, socket): + cached_value = yield self.lookup('tcp_connect', socket) + if cached_value is not None: + defer.returnValue(cached_value) + + socket_info = { + 'status': None, + 'failure': None + } + + ip_address, port = socket.split(":") + try: + point = TCP4ClientEndpoint(reactor, ip_address, int(port)) + yield point.connect(TCPConnectFactory()) + socket_info['status'] = True + except TimeoutError: + socket_info['status'] = False + socket_info['failure'] = 'generic_timeout_error' + except ConnectionRefusedError: + socket_info['status'] = False + socket_info['failure'] = 'connection_refused_error' + except: + socket_info['status'] = False + socket_info['failure'] = 'unknown_error' + yield self.cache_value('tcp_connect', socket, socket_info) + defer.returnValue(socket_info) + + @defer.inlineCallbacks + def dns_consistency(self, hostname): + cached_value = yield self.lookup('dns_consistency', hostname) + if cached_value is not None: + defer.returnValue(cached_value) + + dns_info = { + 'addrs': [], + 'failure': None + } + + try: + records = yield dns_client.lookupAddress(hostname) + answers = records[0] + for answer in answers: + if answer.type is dns.A: + dns_info['addrs'].append(answer.payload.dottedQuad()) + elif answer.type is dns.CNAME: + dns_info['addrs'].append(answer.payload.name.name) + except DNSNameError: + dns_info['failure'] = 'dns_name_error' + except DNSServerError: + dns_info['failure'] = 'dns_server_failure' + except: + dns_info['failure'] = 'unknown_error' + + yield self.cache_value('dns_consistency', hostname, dns_info) + defer.returnValue(dns_info) + + +# Taken from +# http://stackoverflow.com/questions/7160737/python-how-to-validate-a-url-in-p... +HTTP_REQUEST_REGEXP = re.compile( + r'^(?:http)s?://' # http:// or https:// + r'(?:(?:[A-Z0-9](?:[A-Z0-9-]{0,61}[A-Z0-9])?.)+(?:[A-Z]{2,6}.?|[A-Z0-9-]{2,}.?)|' # domain... + r'\d{1,3}.\d{1,3}.\d{1,3}.\d{1,3})' # ...or ip + r'(?::\d+)?' # optional port + r'(?:/?|[/?]\S+)$', re.IGNORECASE) + +SOCKET_REGEXP = re.compile(r'^\d{1,3}.\d{1,3}.\d{1,3}.\d{1,3}:\d+$') + +web_connectivity_cache = WebConnectivityCache() + +class WebConnectivity(OONIBHandler): + @defer.inlineCallbacks + def control_measurement(self, http_url, socket_list, + include_http_responses): + hostname = urlparse(http_url).netloc + dl = [ + web_connectivity_cache.http_request(http_url, include_http_responses), + web_connectivity_cache.dns_consistency(hostname) + ] + for socket in socket_list: + dl.append(web_connectivity_cache.tcp_connect(socket)) + responses = yield defer.DeferredList(dl) + http_request = responses[0][1] + dns = responses[1][1] + tcp_connect = {} + for idx, response in enumerate(responses[2:]): + tcp_connect[socket_list[idx]] = response[1] + self.finish({ + 'http_request': http_request, + 'tcp_connect': tcp_connect, + 'dns': dns + }) + + def validate_request(self, request): + required_keys = ['http_request', 'tcp_connect'] + for rk in required_keys: + if rk not in request.keys(): + raise HTTPError(400, "Missing %s" % rk) + if not HTTP_REQUEST_REGEXP.match(request['http_request']): + raise HTTPError(400, "Invalid http_request URL") + if any([not SOCKET_REGEXP.match(socket) + for socket in request['tcp_connect']]): + raise HTTPError(400, "Invalid tcp_connect URL") + + @asynchronous + def post(self): + try: + request = json.loads(self.request.body) + self.validate_request(request) + include_http_responses = request.get("include_http_responses", + False) + self.control_measurement( + str(request['http_request']), + request['tcp_connect'], + include_http_responses + ) + except HTTPError: + raise + except Exception as exc: + log.msg("Got invalid request") + log.exception(exc) + raise HTTPError(400, 'invalid request') + +class WebConnectivityStatus(RequestHandler): + def get(self): + self.write({"status": "ok"}) + + HTTPRandomPageHelper = Application([ # XXX add regexps here (r"/(.*)/(.*)", HTTPRandomPage) ]) + +WebConnectivityHelper = Application([ + (r"/status", WebConnectivityStatus), + (r"/", WebConnectivity) +])
tor-commits@lists.torproject.org