[tor-commits] [ooni-probe/master] Refactor the HTTP host test and improve the data format

art at torproject.org art at torproject.org
Fri Jan 10 13:48:29 UTC 2014


commit 951e244f92644bcf1b35c5ef6fc189137c9d7fa5
Author: Arturo Filastò <art at fuffa.org>
Date:   Fri Jan 10 14:37:31 2014 +0100

    Refactor the HTTP host test and improve the data format
---
 ooni/nettest.py                         |    3 +
 ooni/nettests/manipulation/http_host.py |  151 ++++++++++++++++---------------
 2 files changed, 82 insertions(+), 72 deletions(-)

diff --git a/ooni/nettest.py b/ooni/nettest.py
index 513c11f..526de5d 100644
--- a/ooni/nettest.py
+++ b/ooni/nettest.py
@@ -488,6 +488,9 @@ class NetTest(object):
         net_test_loader:
              an instance of :class:ooni.nettest.NetTestLoader containing
              the test to be run.
+
+        report:
+            an instance of :class:ooni.reporter.Reporter
         """
         self.report = report
         self.testCases = net_test_loader.testCases
diff --git a/ooni/nettests/manipulation/http_host.py b/ooni/nettests/manipulation/http_host.py
index 4adb1d3..5e77415 100644
--- a/ooni/nettests/manipulation/http_host.py
+++ b/ooni/nettests/manipulation/http_host.py
@@ -6,7 +6,9 @@
 # :authors: Arturo Filastò
 # :licence: see LICENSE
 
+import sys
 import json
+from twisted.internet import defer
 from twisted.python import usage
 
 from ooni.utils import randomStr, randomSTR
@@ -24,19 +26,18 @@ class UsageOptions(usage.Options):
 
 class HTTPHost(httpt.HTTPTest):
     """
-    This test is aimed at detecting the presence of a transparent HTTP proxy
-    and enumerating the sites that are being censored by it.
-
-    It places inside of the Host header field the hostname of the site that is
-    to be tested for censorship and then determines if the probe is behind a
-    transparent HTTP proxy (because the response from the backend server does
-    not match) and if the site is censorsed, by checking if the page that it
-    got back matches the input block page.
+    This test performs various manipulations of the HTTP Host header field and
+    attempts to detect which filter bypassing strategies will work against the
+    censor.
+
+    Usually this test should be run with a list of sites that are known to be
+    blocked inside of a particular network to assess which filter evasion strategies
+    will work.
     """
     name = "HTTP Host"
     description = "Tests a variety of different filter bypassing techniques based on the HTTP Host header field."
     author = "Arturo Filastò"
-    version = "0.2.3"
+    version = "0.2.4"
 
     randomizeUA = False
     usageOptions = UsageOptions
@@ -46,67 +47,11 @@ class HTTPHost(httpt.HTTPTest):
 
     requiredTestHelpers = {'backend': 'http-return-json-headers'}
     requiredOptions = ['backend']
+    
+    def setUp(self):
+        self.report['transparent_http_proxy'] = False
 
-    def test_filtering_prepend_newline_to_method(self):
-        headers = {}
-        headers["Host"] = [self.input]
-        return self.doRequest(self.localOptions['backend'], method="\nGET",
-                headers=headers)
-
-    def test_filtering_add_tab_to_host(self):
-        headers = {}
-        headers["Host"] = [self.input + '\t']
-        return self.doRequest(self.localOptions['backend'],
-                headers=headers)
-
-    def test_filtering_of_subdomain(self):
-        headers = {}
-        headers["Host"] = [randomStr(10) + '.' + self.input]
-        return self.doRequest(self.localOptions['backend'],
-                headers=headers)
-
-    def test_filtering_via_fuzzy_matching(self):
-        headers = {}
-        headers["Host"] = [randomStr(10) + self.input + randomStr(10)]
-        return self.doRequest(self.localOptions['backend'],
-                headers=headers)
-
-    def test_send_host_header(self):
-        """
-        Stuffs the HTTP Host header field with the site to be tested for
-        censorship and does an HTTP request of this kind to our backend.
-
-        We randomize the HTTP User Agent headers.
-        """
-        headers = {}
-        headers["Host"] = [self.input]
-        return self.doRequest(self.localOptions['backend'],
-                headers=headers)
-
-    def check_for_censorship(self, body):
-        """
-        If we have specified what a censorship page looks like here we will
-        check if the page we are looking at matches it.
-
-        XXX this is not tested, though it is basically what was used to detect
-        censorship in the palestine case.
-        """
-        if self.localOptions['content']:
-            self.report['censored'] = True
-            censorship_page = open(self.localOptions['content'])
-            response_page = iter(body.split("\n"))
-
-            for censorship_line in censorship_page.xreadlines():
-                response_line = response_page.next()
-                if response_line != censorship_line:
-                    self.report['censored'] = False
-                    break
-
-            censorship_page.close()
-        else:
-            self.report['censored'] = None
-
-    def processResponseBody(self, body):
+    def check_for_censorship(self, body, test_name):
         """
         XXX this is to be filled in with either a domclass based classified or
         with a rule that will allow to detect that the body of the result is
@@ -134,12 +79,74 @@ class HTTPHost(httpt.HTTPTest):
                 'request_line' in content and \
                 'headers_dict' in content:
             log.msg("Found the keys I expected in %s" % content)
-            self.report['transparent_http_proxy'] = False
-            self.report['censored'] = False
+            self.report['transparent_http_proxy'] = self.report['transparent_http_proxy'] | False
+            self.report[test_name] = False
         else:
             log.msg("Did not find the keys I expected in %s" % content)
             self.report['transparent_http_proxy'] = True
-            self.check_for_censorship(body)
+            if self.localOptions['content']:
+                self.report[test_name] = True
+                censorship_page = open(self.localOptions['content'])
+                response_page = iter(body.split("\n"))
+
+                for censorship_line in censorship_page.xreadlines():
+                    response_line = response_page.next()
+                    if response_line != censorship_line:
+                        self.report[test_name] = False
+                        break
+
+                censorship_page.close()
+
+    @defer.inlineCallbacks
+    def test_filtering_prepend_newline_to_method(self):
+        test_name = sys._getframe().f_code.co_name.replace('test_', '')
+        headers = {}
+        headers["Host"] = [self.input]
+        response = yield self.doRequest(self.localOptions['backend'], method="\nGET",
+                                        headers=headers)
+        self.check_for_censorship(response.body, test_name)
+
+    @defer.inlineCallbacks
+    def test_filtering_add_tab_to_host(self):
+        test_name = sys._getframe().f_code.co_name.replace('test_', '')
+        headers = {}
+        headers["Host"] = [self.input + '\t']
+        response = yield self.doRequest(self.localOptions['backend'],
+                                        headers=headers)
+        self.check_for_censorship(response.body, test_name)
+
+    @defer.inlineCallbacks
+    def test_filtering_of_subdomain(self):
+        test_name = sys._getframe().f_code.co_name.replace('test_', '')
+        headers = {}
+        headers["Host"] = [randomStr(10) + '.' + self.input]
+        response = yield self.doRequest(self.localOptions['backend'],
+                                        headers=headers)
+        self.check_for_censorship(response.body, test_name)
+
+    @defer.inlineCallbacks
+    def test_filtering_via_fuzzy_matching(self):
+        test_name = sys._getframe().f_code.co_name.replace('test_', '')
+        headers = {}
+        headers["Host"] = [randomStr(10) + self.input + randomStr(10)]
+        response = yield self.doRequest(self.localOptions['backend'],
+                                        headers=headers)
+        self.check_for_censorship(response.body, test_name)
+
+    @defer.inlineCallbacks
+    def test_send_host_header(self):
+        """
+        Stuffs the HTTP Host header field with the site to be tested for
+        censorship and does an HTTP request of this kind to our backend.
+
+        We randomize the HTTP User Agent headers.
+        """
+        test_name = sys._getframe().f_code.co_name.replace('test_', '')
+        headers = {}
+        headers["Host"] = [self.input]
+        response = yield self.doRequest(self.localOptions['backend'],
+                                        headers=headers)
+        self.check_for_censorship(response.body, test_name)
 
     def inputProcessor(self, filename=None):
         """



More information about the tor-commits mailing list