commit 170bad3df93f2d30e1c96a48ff4f0f2f384cf062 Author: Arturo Filastò art@fuffa.org Date: Wed May 14 15:48:19 2014 +0200
Pylint and pyflakes improvements to all stable tests. --- ooni/nettests/blocking/http_requests.py | 35 ++++++------ ooni/nettests/blocking/tcp_connect.py | 25 +++++---- ooni/nettests/manipulation/dns_spoof.py | 49 ++++++++++------- .../manipulation/http_header_field_manipulation.py | 51 ++++++++++++------ ooni/nettests/manipulation/http_host.py | 27 ++++++---- .../manipulation/http_invalid_request_line.py | 15 ++++-- ooni/nettests/manipulation/traceroute.py | 57 +++++++++++--------- 7 files changed, 159 insertions(+), 100 deletions(-)
diff --git a/ooni/nettests/blocking/http_requests.py b/ooni/nettests/blocking/http_requests.py index c660400..e5524ee 100644 --- a/ooni/nettests/blocking/http_requests.py +++ b/ooni/nettests/blocking/http_requests.py @@ -4,21 +4,23 @@ # :licence: see LICENSE
import random -from twisted.internet import defer from twisted.python import usage, failure
from ooni.utils import log from ooni.utils.net import userAgents from ooni.templates import httpt -from ooni.errors import failureToString, handleAllFailures +from ooni.errors import failureToString +
class UsageOptions(usage.Options): optParameters = [ - ['url', 'u', None, 'Specify a single URL to test.'], - ['factor', 'f', 0.8, 'What factor should be used for triggering censorship (0.8 == 80%)'] - ] + ['url', 'u', None, 'Specify a single URL to test.'], + ['factor', 'f', 0.8, + 'What factor should be used for triggering censorship (0.8 == 80%)']] +
class HTTPRequestsTest(httpt.HTTPTest): + """ Performs a two GET requests to the set of sites to be tested for censorship, one over a known good control channel (Tor), the other over the @@ -28,14 +30,15 @@ class HTTPRequestsTest(httpt.HTTPTest): lengths match. """ name = "HTTP Requests" - description = "Performs a HTTP GET request over Tor and one over the local network and compares the two results." + description = "Performs a HTTP GET request over Tor and one over the " \ + "local network and compares the two results." author = "Arturo Filastò" version = "0.2.4"
usageOptions = UsageOptions
inputFile = ['file', 'f', None, - 'List of URLS to perform GET and POST requests to'] + 'List of URLS to perform GET and POST requests to'] requiresRoot = False requiresTor = False
@@ -58,7 +61,7 @@ class HTTPRequestsTest(httpt.HTTPTest): self.factor = self.localOptions['factor'] self.report['control_failure'] = None self.report['experiment_failure'] = None - self.report['body_length_match'] = None + self.report['body_length_match'] = None self.report['body_proportion'] = None self.report['factor'] = float(self.factor) self.report['headers_diff'] = None @@ -101,31 +104,33 @@ class HTTPRequestsTest(httpt.HTTPTest):
def test_get_experiment(self): log.msg("Performing GET request to %s" % self.url) - return self.doRequest(self.url, method="GET", - use_tor=False, headers=self.headers) + return self.doRequest(self.url, method="GET", + use_tor=False, headers=self.headers)
def test_get_control(self): log.msg("Performing GET request to %s over Tor" % self.url) return self.doRequest(self.url, method="GET", - use_tor=True, headers=self.headers) + use_tor=True, headers=self.headers)
def postProcessor(self, measurements): experiment = control = None for status, measurement in measurements: if 'experiment' in str(measurement.netTestMethod): if isinstance(measurement.result, failure.Failure): - self.report['experiment_failure'] = failureToString(measurement.result) + self.report['experiment_failure'] = failureToString( + measurement.result) else: experiment = measurement.result elif 'control' in str(measurement.netTestMethod): if isinstance(measurement.result, failure.Failure): - self.report['control_failure'] = failureToString(measurement.result) + self.report['control_failure'] = failureToString( + measurement.result) else: control = measurement.result
if experiment and control: self.compare_body_lengths(len(control.body), - len(experiment.body)) + len(experiment.body)) self.compare_headers(control.headers, - experiment.headers) + experiment.headers) return self.report diff --git a/ooni/nettests/blocking/tcp_connect.py b/ooni/nettests/blocking/tcp_connect.py index 84ceb2d..852be68 100644 --- a/ooni/nettests/blocking/tcp_connect.py +++ b/ooni/nettests/blocking/tcp_connect.py @@ -2,35 +2,42 @@ from twisted.internet.protocol import Factory, Protocol from twisted.internet.endpoints import TCP4ClientEndpoint
-from twisted.internet.error import ConnectionRefusedError -from twisted.internet.error import TCPTimedOutError, TimeoutError - from ooni import nettest from ooni.errors import handleAllFailures from ooni.utils import log
+ class TCPFactory(Factory): + def buildProtocol(self, addr): return Protocol()
+ class TCPConnectTest(nettest.NetTestCase): name = "TCP Connect" - description = "Performs a TCP connect scan of all the host port combinations given as input." + description = "Performs a TCP connect scan of all the " \ + "host port combinations given as input." author = "Arturo Filastò" version = "0.1" - inputFile = ['file', 'f', None, - 'File containing the IP:PORT combinations to be tested, one per line'] - + inputFile = [ + 'file', + 'f', + None, + 'File containing the IP:PORT combinations to be tested, one per line'] + requiresTor = False requiresRoot = False requiredOptions = ['file'] + def test_connect(self): """ - This test performs a TCP connection to the remote host on the specified port. - the report will contains the string 'success' if the test has + This test performs a TCP connection to the remote host on the + specified port. + The report will contains the string 'success' if the test has succeeded, or the reason for the failure if it has failed. """ host, port = self.input.split(":") + def connectionSuccess(protocol): protocol.transport.loseConnection() log.debug("Got a connection to %s" % self.input) diff --git a/ooni/nettests/manipulation/dns_spoof.py b/ooni/nettests/manipulation/dns_spoof.py index 15aad07..aa464d4 100644 --- a/ooni/nettests/manipulation/dns_spoof.py +++ b/ooni/nettests/manipulation/dns_spoof.py @@ -11,19 +11,20 @@ from scapy.all import IP, UDP, DNS, DNSQR from ooni.templates import scapyt from ooni.utils import log
+ class UsageOptions(usage.Options): - optParameters = [['resolver', 'r', None, - 'Specify the resolver that should be used for DNS queries (ip:port)'], - ['hostname', 'h', None, - 'Specify the hostname of a censored site'], - ['backend', 'b', None, - 'Specify the IP address of a good DNS resolver (ip:port)'] - ] + optParameters = [ + ['resolver', 'r', None, + 'Specify the resolver that should be used for DNS queries (ip:port)'], + ['hostname', 'h', None, 'Specify the hostname of a censored site'], + ['backend', 'b', None, + 'Specify the IP address of a good DNS resolver (ip:port)']]
class DNSSpoof(scapyt.ScapyTest): name = "DNS Spoof" - description = "Used to validate if the type of censorship happening is DNS spoofing or not." + description = "Used to validate if the type of censorship " \ + "happening is DNS spoofing or not." author = "Arturo Filastò" version = "0.0.1" timeout = 2 @@ -36,10 +37,12 @@ class DNSSpoof(scapyt.ScapyTest): requiresTor = False
def setUp(self): - self.resolverAddr, self.resolverPort = self.localOptions['resolver'].split(':') + self.resolverAddr, self.resolverPort = self.localOptions[ + 'resolver'].split(':') self.resolverPort = int(self.resolverPort)
- self.controlResolverAddr, self.controlResolverPort = self.localOptions['backend'].split(':') + self.controlResolverAddr, self.controlResolverPort = self.localOptions[ + 'backend'].split(':') self.controlResolverPort = int(self.controlResolverPort)
self.hostname = self.localOptions['hostname'] @@ -51,28 +54,36 @@ class DNSSpoof(scapyt.ScapyTest): """ try: test_answer = report['test_a_lookup']['answered_packets'][0][1] - control_answer = report['test_control_a_lookup']['answered_packets'][0][1] + control_answer = report['test_control_a_lookup'][ + 'answered_packets'][0][1] except IndexError: self.report['spoofing'] = 'no_answer' return
if test_answer[UDP] == control_answer[UDP]: - self.report['spoofing'] = True + self.report['spoofing'] = True else: self.report['spoofing'] = False return
@defer.inlineCallbacks def test_a_lookup(self): - question = IP(dst=self.resolverAddr)/UDP()/DNS(rd=1, - qd=DNSQR(qtype="A", qclass="IN", qname=self.hostname)) - log.msg("Performing query to %s with %s:%s" % (self.hostname, self.resolverAddr, self.resolverPort)) + question = IP(dst=self.resolverAddr)/UDP() + question /= DNS(rd=1, qd=DNSQR(qtype="A", + qclass="IN", + qname=self.hostname)) + log.msg( + "Performing query to %s with %s:%s" % + (self.hostname, self.resolverAddr, self.resolverPort)) yield self.sr1(question)
@defer.inlineCallbacks def test_control_a_lookup(self): - question = IP(dst=self.controlResolverAddr)/UDP()/DNS(rd=1, - qd=DNSQR(qtype="A", qclass="IN", qname=self.hostname)) - log.msg("Performing query to %s with %s:%s" % (self.hostname, - self.controlResolverAddr, self.controlResolverPort)) + question = IP(dst=self.controlResolverAddr)/UDP() / \ + DNS(rd=1, qd=DNSQR(qtype="A", qclass="IN", qname=self.hostname)) + log.msg( + "Performing query to %s with %s:%s" % + (self.hostname, + self.controlResolverAddr, + self.controlResolverPort)) yield self.sr1(question) diff --git a/ooni/nettests/manipulation/http_header_field_manipulation.py b/ooni/nettests/manipulation/http_header_field_manipulation.py index 3f1b4ed..8378924 100644 --- a/ooni/nettests/manipulation/http_header_field_manipulation.py +++ b/ooni/nettests/manipulation/http_header_field_manipulation.py @@ -13,6 +13,7 @@ from ooni.utils import log, net, randomStr from ooni.templates import httpt from ooni.utils.trueheaders import TrueHeaders
+ def random_capitalization(string): output = "" original_string = string @@ -27,22 +28,27 @@ def random_capitalization(string): else: return output
+ class UsageOptions(usage.Options): optParameters = [ - ['backend', 'b', None, - 'URL of the backend to use for sending the requests'], - ['headers', 'h', None, - 'Specify a yaml formatted file from which to read the request headers to send'] - ] + ['backend', 'b', None, + 'URL of the backend to use for sending the requests'], + ['headers', 'h', None, + 'Specify a yaml formatted file from which to read ' + 'the request headers to send'] + ] +
class HTTPHeaderFieldManipulation(httpt.HTTPTest): + """ It performes HTTP requests with request headers that vary capitalization towards a backend. If the headers reported by the server differ from the ones we sent, then we have detected tampering. """ name = "HTTP Header Field Manipulation" - description = "Checks if the HTTP request the server sees is the same as the one that the client has created." + description = "Checks if the HTTP request the server " \ + "sees is the same as the one that the client has created." author = "Arturo Filastò" version = "0.1.4"
@@ -66,15 +72,20 @@ class HTTPHeaderFieldManipulation(httpt.HTTPTest): headers = yaml.safe_load(content) return headers else: - # XXX generate these from a random choice taken from whatheaders.com + # XXX generate these from a random choice taken from + # whatheaders.com # http://s3.amazonaws.com/data.whatheaders.com/whatheaders-latest.xml.zip - headers = {"User-Agent": [random.choice(net.userAgents)], + headers = { + "User-Agent": [ + random.choice( + net.userAgents)], "Accept": ["text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8"], "Accept-Encoding": ["gzip,deflate,sdch"], "Accept-Language": ["en-US,en;q=0.8"], "Accept-Charset": ["ISO-8859-1,utf-8;q=0.7,*;q=0.3"], - "Host": [randomStr(15)+'.com'] - } + "Host": [ + randomStr(15) + + '.com']} return headers
def get_random_caps_headers(self): @@ -143,20 +154,28 @@ class HTTPHeaderFieldManipulation(httpt.HTTPTest):
request_headers = TrueHeaders(self.request_headers) diff = request_headers.getDiff(TrueHeaders(response_headers_dict), - ignore=['Connection']) + ignore=['Connection']) if diff: self.report['tampering']['header_field_name'] = True else: self.report['tampering']['header_field_name'] = False self.report['tampering']['header_name_diff'] = list(diff) log.msg(" total: %(total)s" % self.report['tampering']) - log.msg(" request_line_capitalization: %(request_line_capitalization)s" % self.report['tampering']) - log.msg(" header_name_capitalization: %(header_name_capitalization)s" % self.report['tampering']) - log.msg(" header_field_value: %(header_field_value)s" % self.report['tampering']) - log.msg(" header_field_number: %(header_field_number)s" % self.report['tampering']) + log.msg( + " request_line_capitalization: %(request_line_capitalization)s" % + self.report['tampering']) + log.msg( + " header_name_capitalization: %(header_name_capitalization)s" % + self.report['tampering']) + log.msg( + " header_field_value: %(header_field_value)s" % + self.report['tampering']) + log.msg( + " header_field_number: %(header_field_number)s" % + self.report['tampering'])
def test_get_random_capitalization(self): self.request_method = random_capitalization("GET") self.request_headers = self.get_random_caps_headers() return self.doRequest(self.url, self.request_method, - headers=self.request_headers) + headers=self.request_headers) diff --git a/ooni/nettests/manipulation/http_host.py b/ooni/nettests/manipulation/http_host.py index 7207c10..d0387cb 100644 --- a/ooni/nettests/manipulation/http_host.py +++ b/ooni/nettests/manipulation/http_host.py @@ -11,11 +11,12 @@ import json from twisted.internet import defer from twisted.python import usage
-from ooni.utils import randomStr, randomSTR +from ooni.utils import randomStr
from ooni.utils import log from ooni.templates import httpt
+ class UsageOptions(usage.Options): optParameters = [['backend', 'b', None, 'URL of the test backend to use. Should be \ @@ -24,18 +25,21 @@ class UsageOptions(usage.Options): ['content', 'c', None, 'The file to read \ from containing the content of a block page']]
+ class HTTPHost(httpt.HTTPTest): + """ This test performs various manipulations of the HTTP Host header field and attempts to detect which filter bypassing strategies will work against the censor.
Usually this test should be run with a list of sites that are known to be - blocked inside of a particular network to assess which filter evasion strategies - will work. + blocked inside of a particular network to assess which filter evasion + strategies will work. """ name = "HTTP Host" - description = "Tests a variety of different filter bypassing techniques based on the HTTP Host header field." + description = "Tests a variety of different filter bypassing techniques " \ + "based on the HTTP Host header field." author = "Arturo Filastò" version = "0.2.4"
@@ -43,13 +47,13 @@ class HTTPHost(httpt.HTTPTest): usageOptions = UsageOptions
inputFile = ['file', 'f', None, - 'List of hostnames to test for censorship'] + 'List of hostnames to test for censorship']
requiredTestHelpers = {'backend': 'http-return-json-headers'} requiredOptions = ['backend'] requiresTor = False requiresRoot = False - + def setUp(self): self.report['transparent_http_proxy'] = False
@@ -81,7 +85,8 @@ class HTTPHost(httpt.HTTPTest): 'request_line' in content and \ 'headers_dict' in content: log.msg("Found the keys I expected in %s" % content) - self.report['transparent_http_proxy'] = self.report['transparent_http_proxy'] | False + self.report['transparent_http_proxy'] = self.report[ + 'transparent_http_proxy'] | False self.report[test_name] = False else: log.msg("Did not find the keys I expected in %s" % content) @@ -91,7 +96,7 @@ class HTTPHost(httpt.HTTPTest): censorship_page = open(self.localOptions['content']) response_page = iter(body.split("\n"))
- for censorship_line in censorship_page.xreadlines(): + for censorship_line in censorship_page: response_line = response_page.next() if response_line != censorship_line: self.report[test_name] = False @@ -104,7 +109,8 @@ class HTTPHost(httpt.HTTPTest): test_name = sys._getframe().f_code.co_name.replace('test_', '') headers = {} headers["Host"] = [self.input] - response = yield self.doRequest(self.localOptions['backend'], method="\nGET", + response = yield self.doRequest(self.localOptions['backend'], + method="\nGET", headers=headers) self.check_for_censorship(response.body, test_name)
@@ -159,4 +165,5 @@ class HTTPHost(httpt.HTTPTest): for x in fp.readlines(): yield x.strip().split('//')[-1].split('/')[0] fp.close() - else: pass + else: + pass diff --git a/ooni/nettests/manipulation/http_invalid_request_line.py b/ooni/nettests/manipulation/http_invalid_request_line.py index 6769269..b6ce2da 100644 --- a/ooni/nettests/manipulation/http_invalid_request_line.py +++ b/ooni/nettests/manipulation/http_invalid_request_line.py @@ -5,12 +5,17 @@ from ooni.utils import log from ooni.utils import randomStr, randomSTR from ooni.templates import tcpt
+ class UsageOptions(usage.Options): - optParameters = [['backend', 'b', None, - 'The OONI backend that runs a TCP echo server'], - ['backendport', 'p', 80, 'Specify the port that the TCP echo server is running (should only be set for debugging)']] + optParameters = [ + ['backend', 'b', None, 'The OONI backend that runs a TCP echo server'], + ['backendport', 'p', 80, + 'Specify the port that the TCP echo server is running ' + '(should only be set for debugging)']] +
class HTTPInvalidRequestLine(tcpt.TCPTest): + """ The goal of this test is to do some very basic and not very noisy fuzzing on the HTTP request line. We generate a series of requests that are not @@ -20,7 +25,8 @@ class HTTPInvalidRequestLine(tcpt.TCPTest): ascii letters or numbers ('XxXx' will be 4). """ name = "HTTP Invalid Request Line" - description = "Performs out of spec HTTP requests in the attempt to trigger a proxy error message." + description = "Performs out of spec HTTP requests in the attempt to "\ + "trigger a proxy error message." version = "0.2" authors = "Arturo Filastò"
@@ -109,4 +115,3 @@ class HTTPInvalidRequestLine(tcpt.TCPTest): d = self.sendPayload(payload) d.addCallback(self.check_for_manipulation, payload) return d - diff --git a/ooni/nettests/manipulation/traceroute.py b/ooni/nettests/manipulation/traceroute.py index c6e167c..a561281 100644 --- a/ooni/nettests/manipulation/traceroute.py +++ b/ooni/nettests/manipulation/traceroute.py @@ -1,30 +1,32 @@ # -*- encoding: utf-8 -*-
from twisted.python import usage -from twisted.internet import defer, reactor
from ooni.templates import scapyt -from itertools import chain
-from scapy.all import * - -from ooni.utils import log from ooni.utils.txscapy import MPTraceroute from ooni.settings import config
+ class UsageOptions(usage.Options): optParameters = [ ['backend', 'b', None, 'Test backend to use'], ['timeout', 't', 5, 'The timeout for the traceroute test'], - ['maxttl', 'm', 30, 'The maximum value of ttl to set on packets'], - ['dstport', 'd', None, 'Specify a single destination port. May be repeated.'], - ['interval', 'i', None, 'Specify the inter-packet delay in seconds'], - ['numPackets', 'n', None, 'Specify the number of packets to send per hop'], - ] + ['maxttl', 'm', 30, + 'The maximum value of ttl to set on packets'], + ['dstport', 'd', None, + 'Specify a single destination port. May be repeated.'], + ['interval', 'i', None, + 'Specify the inter-packet delay in seconds'], + ['numPackets', 'n', None, + 'Specify the number of packets to send per hop'], + ] +
class Traceroute(scapyt.BaseScapyTest): name = "Traceroute" - description = "Performs a UDP, TCP, ICMP traceroute with destination port number set to 0, 22, 23, 53, 80, 123, 443, 8080 and 65535" + description = "Performs a UDP, TCP, ICMP traceroute with destination port number "\ + "set to 0, 22, 23, 53, 80, 123, 443, 8080 and 65535"
requiredTestHelpers = {'backend': 'traceroute'} requiresRoot = True @@ -45,8 +47,10 @@ class Traceroute(scapyt.BaseScapyTest):
config.scapyFactory.registerProtocol(self.st)
- self.report['test_tcp_traceroute'] = dict([('hops_%d' % d,[]) for d in self.dst_ports]) - self.report['test_udp_traceroute'] = dict([('hops_%d' % d,[]) for d in self.dst_ports]) + self.report['test_tcp_traceroute'] = dict( + [('hops_%d' % d, []) for d in self.dst_ports]) + self.report['test_udp_traceroute'] = dict( + [('hops_%d' % d, []) for d in self.dst_ports]) self.report['test_icmp_traceroute'] = {'hops': []}
def test_icmp_traceroute(self): @@ -70,25 +74,26 @@ class Traceroute(scapyt.BaseScapyTest): self.report['answered_packets'].extend(packet)
for ttl in xrange(self.st.ttl_min, self.st.ttl_max): - matchedPackets = filter(lambda x: x.ttl == ttl, self.st.matched_packets.keys()) + matchedPackets = filter( + lambda x: x.ttl == ttl, + self.st.matched_packets.keys()) for packet in matchedPackets: for response in self.st.matched_packets[packet]: self.addToReport(packet, response) return self.report
def addToReport(self, packet, response): - p = {6: 'tcp', 17: 'udp', 1: 'icmp'} if packet.proto == 1: - self.report['test_icmp_traceroute']['hops'].append({'ttl': packet.ttl, - 'rtt': response.time - packet.time, - 'address': response.src}) + self.report['test_icmp_traceroute']['hops'].append( + {'ttl': packet.ttl, 'rtt': response.time - packet.time, + 'address': response.src}) elif packet.proto == 6: - self.report['test_tcp_traceroute']['hops_%s' % packet.dport].append({'ttl': packet.ttl, - 'rtt': response.time - packet.time, - 'address': response.src, - 'sport': response.sport}) + self.report['test_tcp_traceroute'][ + 'hops_%s' % packet.dport].append( + {'ttl': packet.ttl, 'rtt': response.time - packet.time, + 'address': response.src, 'sport': response.sport}) else: - self.report['test_udp_traceroute']['hops_%s' % packet.dport].append({'ttl': packet.ttl, - 'rtt': response.time - packet.time, - 'address': response.src, - 'sport': response.sport}) + self.report['test_udp_traceroute'][ + 'hops_%s' % packet.dport].append( + {'ttl': packet.ttl, 'rtt': response.time - packet.time, + 'address': response.src, 'sport': response.sport})