commit eca91753ab98118190626f1dbb6b583b54ec18d5
Author: kudrom <kudrom(a)riseup.net>
Date: Sun Aug 17 22:04:24 2014 +0200
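Fixed dns_spoof test

postProcessor now takes a `measurements` argument instead of `report`, reads
the test and control answers from self.report['answered_packets'][0] and [1],
and returns self.report. Matching test and control answers are now recorded
as spoofing = False (previously True), which contradicts the unchanged
postProcessor docstring ("if the two responses match up then spoofing is
occuring"); one of the two should be reconciled. The test_a_lookup and
test_control_a_lookup changes are line-wrapping only.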
---
ooni/nettests/manipulation/dns_spoof.py | 48 +++++++++++++------------------
1 file changed, 20 insertions(+), 28 deletions(-)
diff --git a/ooni/nettests/manipulation/dns_spoof.py b/ooni/nettests/manipulation/dns_spoof.py
index aa464d4..a73ad6e 100644
--- a/ooni/nettests/manipulation/dns_spoof.py
+++ b/ooni/nettests/manipulation/dns_spoof.py
@@ -37,53 +37,45 @@ class DNSSpoof(scapyt.ScapyTest):
requiresTor = False
def setUp(self):
- self.resolverAddr, self.resolverPort = self.localOptions[
- 'resolver'].split(':')
+ self.resolverAddr, self.resolverPort = self.localOptions['resolver'].split(':')
self.resolverPort = int(self.resolverPort)
- self.controlResolverAddr, self.controlResolverPort = self.localOptions[
- 'backend'].split(':')
+ self.controlResolverAddr, self.controlResolverPort = self.localOptions['backend'].split(':')
self.controlResolverPort = int(self.controlResolverPort)
self.hostname = self.localOptions['hostname']
- def postProcessor(self, report):
+ def postProcessor(self, measurements):
"""
This is not tested, but the concept is that if the two responses
match up then spoofing is occuring.
"""
try:
- test_answer = report['test_a_lookup']['answered_packets'][0][1]
- control_answer = report['test_control_a_lookup'][
- 'answered_packets'][0][1]
+ test_answer = self.report['answered_packets'][0][UDP]
+ control_answer = self.report['answered_packets'][1][UDP]
except IndexError:
self.report['spoofing'] = 'no_answer'
- return
-
- if test_answer[UDP] == control_answer[UDP]:
- self.report['spoofing'] = True
else:
- self.report['spoofing'] = False
- return
+ if test_answer == control_answer:
+ self.report['spoofing'] = False
+ else:
+ self.report['spoofing'] = True
+ return self.report
@defer.inlineCallbacks
def test_a_lookup(self):
- question = IP(dst=self.resolverAddr)/UDP()
- question /= DNS(rd=1, qd=DNSQR(qtype="A",
- qclass="IN",
- qname=self.hostname))
- log.msg(
- "Performing query to %s with %s:%s" %
- (self.hostname, self.resolverAddr, self.resolverPort))
+ question = IP(dst=self.resolverAddr) / \
+ UDP() / \
+ DNS(rd=1, qd=DNSQR(qtype="A", qclass="IN", qname=self.hostname))
+ log.msg("Performing query to %s with %s:%s" %
+ (self.hostname, self.resolverAddr, self.resolverPort))
yield self.sr1(question)
@defer.inlineCallbacks
def test_control_a_lookup(self):
- question = IP(dst=self.controlResolverAddr)/UDP() / \
- DNS(rd=1, qd=DNSQR(qtype="A", qclass="IN", qname=self.hostname))
- log.msg(
- "Performing query to %s with %s:%s" %
- (self.hostname,
- self.controlResolverAddr,
- self.controlResolverPort))
+ question = IP(dst=self.controlResolverAddr) / \
+ UDP() / \
+ DNS(rd=1, qd=DNSQR(qtype="A", qclass="IN", qname=self.hostname))
+ log.msg("Performing query to %s with %s:%s" %
+ (self.hostname, self.controlResolverAddr, self.controlResolverPort))
yield self.sr1(question)