commit 12726ca463d9e68e93d49fcb418421648d054744 Author: Arturo Filastò art@fuffa.org Date: Fri Nov 9 22:19:44 2012 +0100
Completely rewrite the txscapy. * It is now much cleaner and does not start the packet capture in a separate thread. It subclasses the twisted filedescriptor protocol and returns a deferred that will fire the callback with the packets it received and the ones it considers answers to the request. --- nettests/bridge_reachability/echo.py | 2 +- nettests/core/chinatrigger.py | 9 ++- ooni/templates/scapyt.py | 105 +++------------------------ ooni/utils/txscapy.py | 133 ++++++++++++++++++++++++++++++---- 4 files changed, 137 insertions(+), 112 deletions(-)
diff --git a/nettests/bridge_reachability/echo.py b/nettests/bridge_reachability/echo.py index 542e017..5060ffd 100644 --- a/nettests/bridge_reachability/echo.py +++ b/nettests/bridge_reachability/echo.py @@ -164,5 +164,5 @@ class EchoTest(BaseScapyTest): raise IfaceError("Could not find a working network interface.")
def test_icmp(self): - self.sr(IP(dst=self.input)/ICMP()) + return self.sr(IP(dst=self.input)/ICMP())
diff --git a/nettests/core/chinatrigger.py b/nettests/core/chinatrigger.py index 53fadb9..de1f64d 100644 --- a/nettests/core/chinatrigger.py +++ b/nettests/core/chinatrigger.py @@ -23,6 +23,7 @@ class ChinaTriggerTest(BaseScapyTest): name = "chinatrigger" usageOptions = UsageOptions requiredOptions = ['dst', 'port'] + timeout = 2
def setUp(self): self.dst = self.localOptions['dst'] @@ -47,7 +48,7 @@ class ChinaTriggerTest(BaseScapyTest): def set_random_field(pkt): ret = pkt[:15] for i in range(28): - ret += chr(random.randint(0, 256)) + ret += chr(random.randint(0, 255)) ret += pkt[15+28:] return ret
@@ -57,9 +58,9 @@ class ChinaTriggerTest(BaseScapyTest): Slightly changed mutate function. """ ret = pkt[:idx-1] - mutation = chr(random.randint(0, 256)) + mutation = chr(random.randint(0, 255)) while mutation == pkt[idx]: - mutation = chr(random.randint(0, 256)) + mutation = chr(random.randint(0, 255)) ret += mutation ret += pkt[idx:] return ret @@ -103,5 +104,5 @@ class ChinaTriggerTest(BaseScapyTest): for x in range(len(pkt)): mutation = IP(dst=self.dst)/TCP(dport=self.port)/ChinaTriggerTest.mutate(pkt, x) pkts.append(mutation) - self.send(pkts) + return self.sr(pkts, timeout=2)
diff --git a/ooni/templates/scapyt.py b/ooni/templates/scapyt.py index a1d2969..4c18f0a 100644 --- a/ooni/templates/scapyt.py +++ b/ooni/templates/scapyt.py @@ -14,7 +14,7 @@ from scapy.all import send, sr, IP, TCP from ooni.nettest import NetTestCase from ooni.utils import log
-from ooni.lib.txscapy import TXScapy +from ooni.utils.txscapy import ScapyProtocol
def createPacketReport(packet_list): """ @@ -42,106 +42,25 @@ class BaseScapyTest(NetTestCase):
requiresRoot = True
- sentPackets = [] - answeredPackets = [] - - def sr(self, pkts, *arg, **kw): + def sr(self, packets, *arg, **kw): """ Wrapper around scapy.sendrecv.sr for sending and receiving of packets at layer 3. """ - answered_packets, unanswered = sr(pkts, *arg, **kw) - self.report['answered_packets'] = createPacketReport(answered_packets) - self.report['sent_packets'] = createPacketReport(pkts) - return (answered_packets, sent_packets) + def finished(result): + answered, unanswered = result + sent_packets, received_packets = answered + self.report['answered_packets'] = createPacketReport(received_packets) + self.report['sent_packets'] = createPacketReport(sent_packets) + + scapyProtocol = ScapyProtocol(*arg, **kw) + d = scapyProtocol.startSending(packets) + return d
def send(self, pkts, *arg, **kw): """ Wrapper around scapy.sendrecv.send for sending of packets at layer 3 """ - sent_packets = send(pkts, *arg, **kw) - self.report['sent_packets'] = createPacketReport(pkts) - return sent_packets - -class TXScapyTest(BaseScapyTest): - """ - A utility class for writing scapy driven OONI tests. - - * pcapfile: specify where to store the logged pcapfile - - * timeout: timeout in ms of when we should stop waiting to receive packets - - * receive: if we should also receive packets and not just send - - XXX This is currently not working - """ - name = "TX Scapy Test" - version = 0.1 - - receive = True - timeout = 1 - pcapfile = 'packet_capture.pcap' - packet = IP()/TCP() - reactor = None - - answered = None - unanswered = None - - def processInputs(self): - """ - Place here the logic for validating and processing of inputs and - command line arguments. - """ - pass - - def tearDown(self): - log.debug("Tearing down reactor") - - def finished(self, *arg): - log.debug("Calling final close") - - self.questions = self.txscapy.questions - self.answers = self.txscapy.answers + raise Exception("Not implemented")
- log.debug("These are the questions: %s" % self.questions) - log.debug("These are the answers: %s" % self.answers) - - self.txscapy.finalClose() - - def sendReceivePackets(self): - packets = self.buildPackets() - - log.debug("Sending and receiving %s" % packets) - - self.txscapy = TXScapy(packets, pcapfile=self.pcapfile, - timeout=self.timeout, reactor=self.reactor) - - self.txscapy.sr(packets, pcapfile=self.pcapfile, - timeout=self.timeout, reactor=self.reactor) - - d = self.txscapy.deferred - d.addCallback(self.finished) - - return d - - def sendPackets(self): - log.debug("Sending and receiving of packets %s" % packets) - - packets = self.buildPackets() - - self.txscapy = TXScapy(packets, pcapfile=self.pcapfile, - timeout=self.timeout, reactor=self.reactor) - - self.txscapy.send(packets, reactor=self.reactor).deferred - - d = self.txscapy.deferred - d.addCallback(self.finished) - - return d - - def buildPackets(self): - """ - Override this method to build scapy packets. - """ - pass
diff --git a/ooni/utils/txscapy.py b/ooni/utils/txscapy.py index a3a5610..2559d19 100644 --- a/ooni/utils/txscapy.py +++ b/ooni/utils/txscapy.py @@ -12,29 +12,134 @@ import os import sys import time
-from twisted.internet import protocol, base, fdesc, error, defer -from twisted.internet import reactor, threads +from twisted.internet import protocol, base, fdesc +from twisted.internet import reactor, threads, error +from twisted.internet import defer, abstract from zope.interface import implements
-from scapy.all import Gen -from scapy.all import SetGen - -from ooni.utils import log
from scapy.all import PcapWriter, MTU from scapy.all import BasePacketList, conf, PcapReader
+from scapy.all import conf, Gen, SetGen + +from ooni.utils import log + class TXPcapWriter(PcapWriter): def __init__(self, *arg, **kw): PcapWriter.__init__(self, *arg, **kw) fdesc.setNonBlocking(self.f)
-def txSniff(count=0, store=1, offline=None, - prn = None, lfilter=None, - L2socket=None, timeout=None, - opened_socket=None, stop_filter=None, - *arg, **karg): - """ - XXX we probably want to rewrite the scapy sniff function to better suite our needs. - """ +class ScapyProtocol(abstract.FileDescriptor): + def __init__(self, super_socket=None, + reactor=None, timeout=None, receive=True): + abstract.FileDescriptor.__init__(self, reactor) + # By default we use the conf.L3socket + if not super_socket: + super_socket = conf.L3socket() + self.super_socket = super_socket + + self.timeout = timeout + + # This dict is used to store the unique hashes that allow scapy to + # match up request with answer + self.hr_sent_packets = {} + + # These are the packets we have received as answer to the ones we sent + self.answered_packets = [] + + # These are the packets we send + self.sent_packets = [] + + # This deferred will fire when we have finished sending a receiving packets. + self.d = defer.Deferred() + self.debug = False + self.multi = False + # XXX this needs to be implemented. It would involve keeping track of + # the state of the sending via the super socket file descriptor and + # firing the callback when we have concluded sending. Check out + # twisted.internet.udp to see how this is done. + self.receive = receive + + def fileno(self): + return self.super_socket.ins.fileno() + + def processPacket(self, packet): + """ + Hook useful for processing packets as they come in. + """ + + def processAnswer(self, packet, answer_hr): + log.debug("Got an answer processing it") + for i in range(len(answer_hr)): + if packet.answers(answer_hr[i]): + self.answered_packets.append((answer_hr[i], packet)) + if self.debug: + print packet.src, packet.ttl + #answer.show() + + if not self.multi: + del(answer_hr[i]) + break + if len(self.answered_packets) == len(self.sent_packets): + # All of our questions have been answered. + self.stopSending() + + def doRead(self): + timeout = time.time() - self._start_time + log.debug("Checking for timeout %s > %s" % (timeout, self.timeout)) + if self.timeout and time.time() - self._start_time > self.timeout: + self.stopSending() + packet = self.super_socket.recv() + if packet: + self.processPacket(packet) + # A string that has the same value for the request than for the + # response. + hr = packet.hashret() + if hr in self.hr_sent_packets: + answer_hr = self.hr_sent_packets[hr] + self.processAnswer(packet, answer_hr) + + def stopSending(self): + self.stopReading() + self.super_socket.close() + if hasattr(self, "d"): + result = (self.answered_packets, self.sent_packets) + self.d.callback(result) + del self.d + + def write(self, packet): + """ + Write a scapy packet to the wire + """ + hashret = packet.hashret() + if hashret in self.hr_sent_packets: + self.hr_sent_packets[hashret].append(packet) + else: + self.hr_sent_packets[hashret] = [packet] + self.sent_packets.append(packet) + return self.super_socket.send(packet) + + def sendPackets(self, packets): + if not isinstance(packets, Gen): + packets = SetGen(packets) + for packet in packets: + self.write(packet) + + def startSending(self, packets): + self._start_time = time.time() + self.startReading() + self.sendPackets(packets) + return self.d + +def sr(x, filter=None, iface=None, nofilter=0, timeout=None): + super_socket = conf.L3socket(filter=filter, iface=iface, nofilter=nofilter) + sp = ScapyProtocol(super_socket=super_socket, timeout=timeout) + return sp.startSending(x) + +def send(x, filter=None, iface=None, nofilter=0, timeout=None): + super_socket = conf.L3socket(filter=filter, iface=iface, nofilter=nofilter) + sp = ScapyProtocol(super_socket=super_socket, timeout=timeout) + return sp.startSending(x) +