[tor-commits] [ooni-probe/master] Completely rewrite the txscapy.

art at torproject.org art at torproject.org
Fri Nov 9 21:38:18 UTC 2012


commit 12726ca463d9e68e93d49fcb418421648d054744
Author: Arturo Filastò <art at fuffa.org>
Date:   Fri Nov 9 22:19:44 2012 +0100

    Completely rewrite the txscapy.
    * It is now much cleaner and no longer starts the packet capture in a separate thread.
      It subclasses Twisted's abstract.FileDescriptor and returns a Deferred whose
      callback fires with the packets that were sent and the received packets it
      considers answers to them.
---
 nettests/bridge_reachability/echo.py |    2 +-
 nettests/core/chinatrigger.py        |    9 ++-
 ooni/templates/scapyt.py             |  105 +++------------------------
 ooni/utils/txscapy.py                |  133 ++++++++++++++++++++++++++++++----
 4 files changed, 137 insertions(+), 112 deletions(-)

diff --git a/nettests/bridge_reachability/echo.py b/nettests/bridge_reachability/echo.py
index 542e017..5060ffd 100644
--- a/nettests/bridge_reachability/echo.py
+++ b/nettests/bridge_reachability/echo.py
@@ -164,5 +164,5 @@ class EchoTest(BaseScapyTest):
             raise IfaceError("Could not find a working network interface.")
 
     def test_icmp(self):
-        self.sr(IP(dst=self.input)/ICMP())
+        return self.sr(IP(dst=self.input)/ICMP())
 
diff --git a/nettests/core/chinatrigger.py b/nettests/core/chinatrigger.py
index 53fadb9..de1f64d 100644
--- a/nettests/core/chinatrigger.py
+++ b/nettests/core/chinatrigger.py
@@ -23,6 +23,7 @@ class ChinaTriggerTest(BaseScapyTest):
     name = "chinatrigger"
     usageOptions = UsageOptions
     requiredOptions = ['dst', 'port']
+    timeout = 2
 
     def setUp(self):
         self.dst = self.localOptions['dst']
@@ -47,7 +48,7 @@ class ChinaTriggerTest(BaseScapyTest):
     def set_random_field(pkt):
         ret = pkt[:15]
         for i in range(28):
-            ret += chr(random.randint(0, 256))
+            ret += chr(random.randint(0, 255))
         ret += pkt[15+28:]
         return ret
 
@@ -57,9 +58,9 @@ class ChinaTriggerTest(BaseScapyTest):
         Slightly changed mutate function.
         """
         ret = pkt[:idx-1]
-        mutation = chr(random.randint(0, 256))
+        mutation = chr(random.randint(0, 255))
         while mutation == pkt[idx]:
-            mutation = chr(random.randint(0, 256))
+            mutation = chr(random.randint(0, 255))
         ret += mutation
         ret += pkt[idx:]
         return ret
@@ -103,5 +104,5 @@ class ChinaTriggerTest(BaseScapyTest):
         for x in range(len(pkt)):
             mutation = IP(dst=self.dst)/TCP(dport=self.port)/ChinaTriggerTest.mutate(pkt, x)
             pkts.append(mutation)
-        self.send(pkts)
+        return self.sr(pkts, timeout=2)
 
diff --git a/ooni/templates/scapyt.py b/ooni/templates/scapyt.py
index a1d2969..4c18f0a 100644
--- a/ooni/templates/scapyt.py
+++ b/ooni/templates/scapyt.py
@@ -14,7 +14,7 @@ from scapy.all import send, sr, IP, TCP
 from ooni.nettest import NetTestCase
 from ooni.utils import log
 
-from ooni.lib.txscapy import TXScapy
+from ooni.utils.txscapy import ScapyProtocol
 
 def createPacketReport(packet_list):
     """
@@ -42,106 +42,25 @@ class BaseScapyTest(NetTestCase):
 
     requiresRoot = True
 
-    sentPackets = []
-    answeredPackets = []
-
-    def sr(self, pkts, *arg, **kw):
+    def sr(self, packets, *arg, **kw):
         """
         Wrapper around scapy.sendrecv.sr for sending and receiving of packets
         at layer 3.
         """
-        answered_packets, unanswered = sr(pkts, *arg, **kw)
-        self.report['answered_packets'] = createPacketReport(answered_packets)
-        self.report['sent_packets'] = createPacketReport(pkts)
-        return (answered_packets, sent_packets)
+        def finished(result):
+            answered, unanswered = result
+            sent_packets, received_packets = answered
+            self.report['answered_packets'] = createPacketReport(received_packets)
+            self.report['sent_packets'] = createPacketReport(sent_packets)
+
+        scapyProtocol = ScapyProtocol(*arg, **kw)
+        d = scapyProtocol.startSending(packets)
+        return d
 
     def send(self, pkts, *arg, **kw):
         """
         Wrapper around scapy.sendrecv.send for sending of packets at layer 3
         """
-        sent_packets = send(pkts, *arg, **kw)
-        self.report['sent_packets'] = createPacketReport(pkts)
-        return sent_packets
-
-class TXScapyTest(BaseScapyTest):
-    """
-    A utility class for writing scapy driven OONI tests.
-
-    * pcapfile: specify where to store the logged pcapfile
-
-    * timeout: timeout in ms of when we should stop waiting to receive packets
-
-    * receive: if we should also receive packets and not just send
-
-    XXX This is currently not working
-    """
-    name = "TX Scapy Test"
-    version = 0.1
-
-    receive = True
-    timeout = 1
-    pcapfile = 'packet_capture.pcap'
-    packet = IP()/TCP()
-    reactor = None
-
-    answered = None
-    unanswered = None
-
-    def processInputs(self):
-        """
-        Place here the logic for validating and processing of inputs and
-        command line arguments.
-        """
-        pass
-
-    def tearDown(self):
-        log.debug("Tearing down reactor")
-
-    def finished(self, *arg):
-        log.debug("Calling final close")
-
-        self.questions = self.txscapy.questions
-        self.answers = self.txscapy.answers
+        raise Exception("Not implemented")
 
-        log.debug("These are the questions: %s" % self.questions)
-        log.debug("These are the answers: %s" % self.answers)
-
-        self.txscapy.finalClose()
-
-    def sendReceivePackets(self):
-        packets = self.buildPackets()
-
-        log.debug("Sending and receiving %s" % packets)
-
-        self.txscapy = TXScapy(packets, pcapfile=self.pcapfile,
-                          timeout=self.timeout, reactor=self.reactor)
-
-        self.txscapy.sr(packets, pcapfile=self.pcapfile,
-                 timeout=self.timeout, reactor=self.reactor)
-
-        d = self.txscapy.deferred
-        d.addCallback(self.finished)
-
-        return d
-
-    def sendPackets(self):
-        log.debug("Sending and receiving of packets %s" % packets)
-
-        packets = self.buildPackets()
-
-        self.txscapy = TXScapy(packets, pcapfile=self.pcapfile,
-                          timeout=self.timeout, reactor=self.reactor)
-
-        self.txscapy.send(packets, reactor=self.reactor).deferred
-
-        d = self.txscapy.deferred
-        d.addCallback(self.finished)
-
-        return d
-
-    def buildPackets(self):
-        """
-        Override this method to build scapy packets.
-        """
-        pass
 
diff --git a/ooni/utils/txscapy.py b/ooni/utils/txscapy.py
index a3a5610..2559d19 100644
--- a/ooni/utils/txscapy.py
+++ b/ooni/utils/txscapy.py
@@ -12,29 +12,134 @@ import os
 import sys
 import time
 
-from twisted.internet import protocol, base, fdesc, error, defer
-from twisted.internet import reactor, threads
+from twisted.internet import protocol, base, fdesc
+from twisted.internet import reactor, threads, error
+from twisted.internet import defer, abstract
 from zope.interface import implements
 
-from scapy.all import Gen
-from scapy.all import SetGen
-
-from ooni.utils import log
 
 from scapy.all import PcapWriter, MTU
 from scapy.all import BasePacketList, conf, PcapReader
 
+from scapy.all import conf, Gen, SetGen
+
+from ooni.utils import log
+
 class TXPcapWriter(PcapWriter):
     def __init__(self, *arg, **kw):
         PcapWriter.__init__(self, *arg, **kw)
         fdesc.setNonBlocking(self.f)
 
-def txSniff(count=0, store=1, offline=None, 
-        prn = None, lfilter=None,
-        L2socket=None, timeout=None, 
-        opened_socket=None, stop_filter=None,
-        *arg, **karg):
-    """
-    XXX we probably want to rewrite the scapy sniff function to better suite our needs.
-    """
+class ScapyProtocol(abstract.FileDescriptor):
+    def __init__(self, super_socket=None, 
+            reactor=None, timeout=None, receive=True):
+        abstract.FileDescriptor.__init__(self, reactor)
+        # By default we use the conf.L3socket
+        if not super_socket:
+            super_socket = conf.L3socket()
+        self.super_socket = super_socket
+
+        self.timeout = timeout
+
+        # This dict is used to store the unique hashes that allow scapy to
+        # match up request with answer
+        self.hr_sent_packets = {}
+
+        # These are the packets we have received as answer to the ones we sent
+        self.answered_packets = []
+
+        # These are the packets we send
+        self.sent_packets = []
+
+        # This deferred will fire when we have finished sending and receiving packets.
+        self.d = defer.Deferred()
+        self.debug = False
+        self.multi = False
+        # XXX this needs to be implemented. It would involve keeping track of
+        # the state of the sending via the super socket file descriptor and
+        # firing the callback when we have concluded sending. Check out
+        # twisted.internet.udp to see how this is done.
+        self.receive = receive
+
+    def fileno(self):
+        return self.super_socket.ins.fileno()
+
+    def processPacket(self, packet):
+        """
+        Hook useful for processing packets as they come in.
+        """
+
+    def processAnswer(self, packet, answer_hr):
+        log.debug("Got an answer processing it")
+        for i in range(len(answer_hr)):
+            if packet.answers(answer_hr[i]):
+                self.answered_packets.append((answer_hr[i], packet))
+                if self.debug:
+                    print packet.src, packet.ttl
+                    #answer.show()
+
+                if not self.multi:
+                    del(answer_hr[i])
+                break
+        if len(self.answered_packets) == len(self.sent_packets):
+            # All of our questions have been answered.
+            self.stopSending()
+
+    def doRead(self):
+        timeout = time.time() - self._start_time
+        log.debug("Checking for timeout %s > %s" % (timeout, self.timeout))
+        if self.timeout and time.time() - self._start_time > self.timeout:
+            self.stopSending()
+        packet = self.super_socket.recv()
+        if packet:
+            self.processPacket(packet)
+            # A string that has the same value for the request as for the
+            # response.
+            hr = packet.hashret()
+            if hr in self.hr_sent_packets:
+                answer_hr = self.hr_sent_packets[hr]
+                self.processAnswer(packet, answer_hr)
+
+    def stopSending(self):
+        self.stopReading()
+        self.super_socket.close()
+        if hasattr(self, "d"):
+            result = (self.answered_packets, self.sent_packets)
+            self.d.callback(result)
+            del self.d
+
+    def write(self, packet):
+        """
+        Write a scapy packet to the wire
+        """
+        hashret = packet.hashret()
+        if hashret in self.hr_sent_packets:
+            self.hr_sent_packets[hashret].append(packet)
+        else:
+            self.hr_sent_packets[hashret] = [packet]
+        self.sent_packets.append(packet)
+        return self.super_socket.send(packet)
+
+    def sendPackets(self, packets):
+        if not isinstance(packets, Gen):
+            packets = SetGen(packets)
+        for packet in packets:
+            self.write(packet)
+
+    def startSending(self, packets):
+        self._start_time = time.time()
+        self.startReading()
+        self.sendPackets(packets)
+        return self.d
+
+def sr(x, filter=None, iface=None, nofilter=0, timeout=None):
+    super_socket = conf.L3socket(filter=filter, iface=iface, nofilter=nofilter)
+    sp = ScapyProtocol(super_socket=super_socket, timeout=timeout)
+    return sp.startSending(x)
+
+def send(x, filter=None, iface=None, nofilter=0, timeout=None):
+    super_socket = conf.L3socket(filter=filter, iface=iface, nofilter=nofilter)
+    sp = ScapyProtocol(super_socket=super_socket, timeout=timeout)
+    return sp.startSending(x)
+
 



More information about the tor-commits mailing list