commit 40f5789e926d9af0479bd1f879ad9ab6053f36c9 Merge: b9bc4f0 c027888 Author: Arturo Filastò art@fuffa.org Date: Wed Sep 11 18:20:01 2013 +0200
Merge remote-tracking branch 'thetorproject/master' into feature/move_nettests
* thetorproject/master: Add encoding format Add a version and author to dnsspoof Write unittest for testing https://github.com/TheTorProject/ooni-backend/pull/16 URL must be utf-8
ooni/deck.py | 2 +- ooni/nettests/manipulation/dnsspoof.py | 9 +++++++-- ooni/tests/test_oonibclient.py | 9 +++++++++ 3 files changed, 17 insertions(+), 3 deletions(-)
diff --cc ooni/nettests/manipulation/dnsspoof.py index c9120a4,0000000..598c41f mode 100644,000000..100644 --- a/ooni/nettests/manipulation/dnsspoof.py +++ b/ooni/nettests/manipulation/dnsspoof.py @@@ -1,70 -1,0 +1,75 @@@ ++# -*- encoding: utf-8 -*- ++# ++# :authors: Arturo Filastò ++# :licence: see LICENSE ++ +from twisted.internet import defer +from twisted.python import usage + +from scapy.all import IP, UDP, DNS, DNSQR + +from ooni.templates import scapyt +from ooni.utils import log + +class UsageOptions(usage.Options): + optParameters = [['resolver', 'r', None, + 'Specify the resolver that should be used for DNS queries (ip:port)'], + ['hostname', 'h', None, + 'Specify the hostname of a censored site'], + ['backend', 'b', '8.8.8.8:53', + 'Specify the IP address of a good DNS resolver (ip:port)'] + ] + + +class DNSSpoof(scapyt.ScapyTest): + name = "DNS Spoof" ++ author = "Arturo Filastò" ++ version = "0.0.1" + timeout = 2 + + usageOptions = UsageOptions + + requiredTestHelpers = {'backend': 'dns'} + requiredOptions = ['hostname', 'resolver'] + + def setUp(self): + self.resolverAddr, self.resolverPort = self.localOptions['resolver'].split(':') + self.resolverPort = int(self.resolverPort) + + self.controlResolverAddr, self.controlResolverPort = self.localOptions['backend'].split(':') + self.controlResolverPort = int(self.controlResolverPort) + + self.hostname = self.localOptions['hostname'] + + def postProcessor(self, report): + """ + This is not tested, but the concept is that if the two responses + match up then spoofing is occuring. + """ + try: + test_answer = report['test_a_lookup']['answered_packets'][0][1] + control_answer = report['test_control_a_lookup']['answered_packets'][0][1] + except IndexError: + self.report['spoofing'] = 'no_answer' + return + + if test_answer[UDP] == control_answer[UDP]: + self.report['spoofing'] = True + else: + self.report['spoofing'] = False + return + + @defer.inlineCallbacks + def test_a_lookup(self): + question = IP(dst=self.resolverAddr)/UDP()/DNS(rd=1, + qd=DNSQR(qtype="A", qclass="IN", qname=self.hostname)) + log.msg("Performing query to %s with %s:%s" % (self.hostname, self.resolverAddr, self.resolverPort)) + yield self.sr1(question) + + @defer.inlineCallbacks + def test_control_a_lookup(self): + question = IP(dst=self.controlResolverAddr)/UDP()/DNS(rd=1, + qd=DNSQR(qtype="A", qclass="IN", qname=self.hostname)) + log.msg("Performing query to %s with %s:%s" % (self.hostname, + self.controlResolverAddr, self.controlResolverPort)) + yield self.sr1(question) - -
tor-commits@lists.torproject.org