commit eca91753ab98118190626f1dbb6b583b54ec18d5 Author: kudrom kudrom@riseup.net Date: Sun Aug 17 22:04:24 2014 +0200
Fixed dns_spoof test --- ooni/nettests/manipulation/dns_spoof.py | 48 +++++++++++++------------------ 1 file changed, 20 insertions(+), 28 deletions(-)
diff --git a/ooni/nettests/manipulation/dns_spoof.py b/ooni/nettests/manipulation/dns_spoof.py index aa464d4..a73ad6e 100644 --- a/ooni/nettests/manipulation/dns_spoof.py +++ b/ooni/nettests/manipulation/dns_spoof.py @@ -37,53 +37,45 @@ class DNSSpoof(scapyt.ScapyTest): requiresTor = False
def setUp(self): - self.resolverAddr, self.resolverPort = self.localOptions[ - 'resolver'].split(':') + self.resolverAddr, self.resolverPort = self.localOptions['resolver'].split(':') self.resolverPort = int(self.resolverPort)
- self.controlResolverAddr, self.controlResolverPort = self.localOptions[ - 'backend'].split(':') + self.controlResolverAddr, self.controlResolverPort = self.localOptions['backend'].split(':') self.controlResolverPort = int(self.controlResolverPort)
self.hostname = self.localOptions['hostname']
- def postProcessor(self, report): + def postProcessor(self, measurements): """ This is not tested, but the concept is that if the two responses match up then spoofing is occurring. """ try: - test_answer = report['test_a_lookup']['answered_packets'][0][1] - control_answer = report['test_control_a_lookup'][ - 'answered_packets'][0][1] + test_answer = self.report['answered_packets'][0][UDP] + control_answer = self.report['answered_packets'][1][UDP] except IndexError: self.report['spoofing'] = 'no_answer' - return - - if test_answer[UDP] == control_answer[UDP]: - self.report['spoofing'] = True else: - self.report['spoofing'] = False - return + if test_answer == control_answer: + self.report['spoofing'] = False + else: + self.report['spoofing'] = True + return self.report
@defer.inlineCallbacks def test_a_lookup(self): - question = IP(dst=self.resolverAddr)/UDP() - question /= DNS(rd=1, qd=DNSQR(qtype="A", - qclass="IN", - qname=self.hostname)) - log.msg( - "Performing query to %s with %s:%s" % - (self.hostname, self.resolverAddr, self.resolverPort)) + question = IP(dst=self.resolverAddr) / \ + UDP() / \ + DNS(rd=1, qd=DNSQR(qtype="A", qclass="IN", qname=self.hostname)) + log.msg("Performing query to %s with %s:%s" % + (self.hostname, self.resolverAddr, self.resolverPort)) yield self.sr1(question)
@defer.inlineCallbacks def test_control_a_lookup(self): - question = IP(dst=self.controlResolverAddr)/UDP() / \ - DNS(rd=1, qd=DNSQR(qtype="A", qclass="IN", qname=self.hostname)) - log.msg( - "Performing query to %s with %s:%s" % - (self.hostname, - self.controlResolverAddr, - self.controlResolverPort)) + question = IP(dst=self.controlResolverAddr) / \ + UDP() / \ + DNS(rd=1, qd=DNSQR(qtype="A", qclass="IN", qname=self.hostname)) + log.msg("Performing query to %s with %s:%s" % + (self.hostname, self.controlResolverAddr, self.controlResolverPort)) yield self.sr1(question)