[tor-commits] [ooni-probe/master] Prove how easy it is to port old test to the new model

art at torproject.org art at torproject.org
Thu May 31 03:01:43 UTC 2012


commit 5028911580a1c003f4dbf1dff0bd8043c528ef72
Author: Arturo Filastò <hellais at torproject.org>
Date:   Thu May 31 03:32:04 2012 +0200

    Prove how easy it is to port old test to the new model
---
 ooni/plugins/httphost.py |  121 ++++++++++++++++++++++++++++++++++++++++++++++
 1 files changed, 121 insertions(+), 0 deletions(-)

diff --git a/ooni/plugins/httphost.py b/ooni/plugins/httphost.py
new file mode 100644
index 0000000..17e2e11
--- /dev/null
+++ b/ooni/plugins/httphost.py
@@ -0,0 +1,121 @@
+"""
+    HTTP Host based filtering
+    *************************
+
+    This test detect HTTP Host field
+    based filtering.
+"""
+import os
+from datetime import datetime
+
+import urllib2
+import httplib
+
+from BeautifulSoup import BeautifulSoup
+
+# XXX reduce boilerplating.
+from zope.interface import implements
+from twisted.python import usage
+from twisted.plugin import IPlugin
+
+from ooni.plugoo.assets import Asset
+from ooni.plugoo.tests import ITest, TwistedTest
+
+class HTTPHostArgs(usage.Options):
+    optParameters = [['asset', 'a', None, 'Asset file'],
+                     ['controlserver', 'c', None, 'Specify the control server'],
+                     ['resume', 'r', 0, 'Resume at this index'],
+                     ['other', 'o', None, 'Other arguments']]
+    def control(self, experiment_result, args):
+        print "Experiment Result:", experiment_result
+        print "Args", args
+        return experiment_result
+
+    def experiment(self, args):
+        import urllib
+        req = urllib.urlopen(args['asset'])
+        return {'page': req.readlines()}
+
+class HTTPHostAsset(Asset):
+    """
+    This is the asset that should be used by the Test. It will
+    contain all the code responsible for parsing the asset file
+    and should be passed on instantiation to the test.
+    """
+    def __init__(self, file=None):
+        self = Asset.__init__(self, file)
+
+    def parse_line(self, line):
+        return line.split(',')[1].replace('\n','')
+
+
+class HTTPHostTest(TwistedTest):
+    implements(IPlugin, ITest)
+
+    shortName = "httphost"
+    description = "HTTP Host plugin"
+    requirements = None
+    options = HTTPHostArgs
+    # Tells this to be blocking.
+    blocking = True
+
+    def check_response(self, response):
+        soup = BeautifulSoup(response)
+        if soup.head.title.string == "WikiLeaks":
+            # Response indicates censorship
+            return True
+        else:
+            # Response does not indicate censorship
+            return False
+
+    def is_censored(self, response):
+        if response:
+            soup = BeautifulSoup(response)
+            censored = self.check_response(response)
+        else:
+            censored = "unreachable"
+        return censored
+
+    def urllib2_test(self, control_server, host):
+        req = urllib2.Request(control_server)
+        req.add_header('Host', host)
+        try:
+            r = urllib2.urlopen(req)
+            response = r.read()
+            censored = self.is_censored(response)
+        except Exception, e:
+            censored = "Error! %s" % e
+
+        return censored
+
+    def httplib_test(self, control_server, host):
+        try:
+            conn = httplib.HTTPConnection(control_server)
+            conn.putrequest("GET", "", skip_host=True, skip_accept_encoding=True)
+            conn.putheader("Host", host)
+            conn.putheader("User-Agent", "Mozilla/5.0 (Windows; U; Windows NT 5.1; en-GB; rv:1.8.1.6")
+            conn.endheaders()
+            r = conn.getresponse()
+            response = r.read()
+            censored = self.is_censored(response)
+        except Exception, e:
+            censored = "Error! %s" % e
+            respopnse = None
+
+        return (censored, response)
+
+    def experiment(self, args):
+        control_server = self.local_options['controlserver']
+        print args
+        censored = self.httplib_test(control_server, args['asset'])
+        return {'control': control_server,
+                'host': args['asset'],
+                'censored': censored}
+
+    def load_assets(self):
+        if self.local_options:
+            return {'asset': Asset(self.local_options['asset'])}
+        else:
+            return {}
+
+httphost = HTTPHostTest(None, None, None)





More information about the tor-commits mailing list