commit eeed21131f3b955176ad7d0c1502f6f4551cc72a
Author: Isis Lovecruft <isis(a)patternsinthevoid.net>
Date: Mon Jun 25 11:34:15 2012 -0700
Adding old dnstamper.py file so git doesn't wipe it. Don't include this commit.
---
ooni/oonitests/dnstamper.py | 200 +++++++++++++++++++++++++++++++++++++++++++
1 files changed, 200 insertions(+), 0 deletions(-)
diff --git a/ooni/oonitests/dnstamper.py b/ooni/oonitests/dnstamper.py
new file mode 100644
index 0000000..f73f494
--- /dev/null
+++ b/ooni/oonitests/dnstamper.py
@@ -0,0 +1,200 @@
+# -*- coding: utf-8 -*-
+"""
+ dnstamper
+ *********
+
+ This test resolves each of a list of domain names, one per line, in the
+ file specified in the ooni-config under the setting "dns_experiment". If
+ the file is top-1m.txt, the test will be run using Alexa's list of the top
+ one million domains. The experimental DNS servers to query should
+ be specified one per line in assets/dns_servers.txt.
+
+ The test reports censorship if the cardinality of the intersection of
+ the query result set from the control server and the query result set
+ from the experimental server is zero, which is to say, if the two sets
+ have no matching results whatsoever.
+
+ NOTE: This test frequently results in false positives due to GeoIP-based
+ load balancing on major global sites such as Google, Facebook, and
+ YouTube.
+
+ :copyright: (c) 2012 Arturo Filastò, Isis Lovecruft
+ :license: see LICENSE for more details
+
+ TODO:
+ * Switch to using Twisted's DNS builtins instead of dnspython
+ *
+"""
+
+import os
+
+from twisted.names import client
+from twisted.internet import reactor
+from twisted.internet.protocol import Factory, Protocol
+from twisted.python import usage
+from twisted.plugin import IPlugin
+from zope.interface import implements
+
+from ooni.plugoo.assets import Asset
+from ooni.plugoo.tests import ITest, OONITest
+from ooni import log
+
+class Top1MAsset(Asset):
+    """
+    Asset for the Alexa top-1m.txt list; yields the second comma-separated field.
+    """
+    def __init__(self, file=None):
+        Asset.__init__(self, file)  # no rebinding of self to the None result
+
+    def parse_line(self, line):
+        Asset.parse_line(self, line)  # base call kept; its result was never used
+        return line.split(',')[1].replace('\n','')
+
+class DNSTamperAsset(Asset):
+    """
+    Asset used by the DNS tamper test; adds no behaviour of its own to Asset.
+    """
+    def __init__(self, file=None):
+        Asset.__init__(self, file)  # no rebinding of self to the None result
+
+class DNSTamperArgs(usage.Options):
+    # Each entry is [long name, short name, default, description].
+    optParameters = [
+        ['asset', 'a', None, 'Asset file of hostnames to resolve'],
+        ['controlserver', 'c', '8.8.8.8', 'Known good DNS server'],
+        ['testservers', 't', None, 'Asset file of the DNS servers to test'],
+        ['resume', 'r', 0, 'Resume at this index in the asset file']]
+# Disabled draft hooks, kept for reference only:
+#     def control(self, experiment_result, args):
+#         print "Experiment Result:", experiment_result
+#         print "Args", args
+#         return experiment_result
+#     def experiment(self, args):   (body was never written)
+
+class DNSTamperTest(OONITest):
+    """Compare a test nameserver's answers with a control nameserver's."""
+    implements(IPlugin, ITest)
+
+    shortName = "DNSTamper"
+    description = "DNS censorship detection test"
+    requirements = None
+    options = DNSTamperArgs
+    blocking = False
+
+    def load_assets(self):
+        """Return {'asset': Asset} built from the 'asset' option, else {}."""
+        assetf = self.local_options and self.local_options['asset']
+        if not assetf:
+            return {}  # the unset-option case used to fall through to None
+        if assetf == 'top-1m.txt':
+            return {'asset': Top1MAsset(assetf)}
+        return {'asset': DNSTamperAsset(assetf)}
+
+    def lookup(self, hostname, nameserver):
+        """
+        Resolve hostname to its IPv4 addresses through nameserver.
+
+        Returns a Deferred firing with the dotted-quad strings of the A
+        records in the answer; a failure is printed and then passed on.
+        """
+        from twisted.names import dns
+
+        def got_result(result):
+            answers = result[0]  # result is (answers, authority, additional)
+            return [rr.payload.dottedQuad() for rr in answers
+                    if rr.type == dns.A]
+
+        def got_failure(failure):
+            failure.printTraceback()
+            return failure
+
+        # bare Resolver: createResolver() also chains a hosts-file and cache resolver
+        res = client.Resolver(servers=[(nameserver, 53)])
+        d = res.lookupAddress(hostname)
+        d.addCallbacks(got_result, got_failure)
+        return d
+
+    def reverse_lookup(self, ip, nameserver):
+        """
+        Do a reverse (PTR) lookup of the IPv4 address ip through nameserver,
+        to rule out false positives caused by GeoIP load balancing.
+
+        Returns a Deferred firing with the first PTR target name (str).
+        """
+        ptr_name = '.'.join(reversed(ip.split('.'))) + '.in-addr.arpa'
+        res = client.Resolver(servers=[(nameserver, 53)])
+        d = res.lookupPointer(ptr_name)
+        # an empty answer section raises IndexError, failing the Deferred
+        d.addCallback(lambda result: str(result[0][0].payload.name))
+        return d
+
+    def experiment(self, *a, **kw):
+        """
+        Compares the lookup() sets of the control and experiment groups.
+
+        kw['data'] holds (hostname, test nameserver). Returns a Deferred
+        firing with (address, ns, exp, control, tampered); tampered is False
+        when the address sets, or the reverse-DNS names, share an entry.
+        """
+        from twisted.internet import defer
+
+        # this is just a dirty hack
+        address, ns = kw['data'][0], kw['data'][1]
+        config = self.config
+        ctrl_ns = config.tests.dns_control_server
+        print("ADDRESS: %s" % address)
+        print("NAMESERVER: %s" % ns)
+
+        def reverse_all(ips, server):
+            # consumeErrors: a failed PTR query only drops that one name
+            dl = defer.DeferredList([self.reverse_lookup(ip, server)
+                                     for ip in ips], consumeErrors=True)
+            return dl.addCallback(lambda rs: [n for ok, n in rs if ok])
+
+        def check_reverse(names, exp, control):
+            exprevn, ctrlrevn = names
+            if set(exprevn) & set(ctrlrevn):
+                print("Further testing has eliminated this as a false positive.")
+                return (address, ns, exp, control, False)
+            print("Reverse DNS on the results returned by %s returned:\n%s\nWhich does not match the expected domainname:\n%s\n" % (ns, exprevn, ctrlrevn))
+            return (address, ns, exp, control, True)
+
+        def compare(lookups):
+            exp, control = lookups
+            if set(exp) & set(control):
+                print("Address %s has not tampered with on DNS server %s\n" % (address, ns))
+                return (address, ns, exp, control, False)
+            print("Address %s has possibly been tampered on %s:\nDNS resolution through %s yeilds:\n%s\nAlthough the control group DNS servers resolve to:\n%s" % (address, ns, ns, exp, control))
+            if not config.tests.dns_reverse_lookup:
+                print("\n")
+                return (address, ns, exp, control, True)
+            pending = [reverse_all(exp, ns), reverse_all(control, ctrl_ns)]
+            return defer.gatherResults(pending).addCallback(check_reverse, exp, control)
+
+        queries = [self.lookup(address, ns), self.lookup(address, ctrl_ns)]
+        return defer.gatherResults(queries, consumeErrors=True).addCallback(compare)
+
+#def run(ooni):
+# """
+# Run the test.
+# """
+# config = ooni.config
+# urls = []
+#
+# if (config.tests.dns_experiment == "top-1m.txt"):
+# dns_experiment = Top1MAsset(os.path.join(config.main.assetdir,
+# config.tests.dns_experiment))
+# else:
+# dns_experiment = DNSTAsset(os.path.join(config.main.assetdir,
+# config.tests.dns_experiment))
+# dns_experiment_dns = DNSTAsset(os.path.join(config.main.assetdir,
+# config.tests.dns_experiment_dns))
+#
+# assets = [dns_experiment, dns_experiment_dns]
+#
+# dnstest = DNST(ooni)
+# ooni.logger.info("Beginning dnstamper test...")
+# dnstest.run(assets, {'index': 1})
+# ooni.logger.info("Dnstamper test completed!")
+
+dnstamper = DNSTamperTest(None, None, None)  # module-level instance; presumably what plugin discovery picks up -- confirm