[tor-commits] [ooni-probe/master] Write phws GFW triggering test as an OONI plugin.

art at torproject.org art at torproject.org
Sat Jun 16 14:28:03 UTC 2012


commit b816984e3832d8e2d21a67b29d23a2b48cfae962
Author: Arturo Filastò <art at torproject.org>
Date:   Sat Jun 16 16:30:15 2012 +0200

    Write phws GFW triggering test as an OONI plugin.
    * Import txscapy
---
 ooni/lib/txscapy.py          |  363 ++++++++++++++++++++++++++++++++++++++++++
 ooni/plugins/chinatrigger.py |  137 ++++++++++++++++
 ooni/protocols/scapy.py      |    8 +-
 3 files changed, 507 insertions(+), 1 deletions(-)

diff --git a/ooni/lib/txscapy.py b/ooni/lib/txscapy.py
new file mode 100644
index 0000000..8996f75
--- /dev/null
+++ b/ooni/lib/txscapy.py
@@ -0,0 +1,363 @@
+# -*- coding:utf8 -*-
+"""
+    txscapy
+    ******
+    (c) 2012 Arturo Filastò
+    a twisted wrapper for scapys send and receive functions.
+
+    This software has been written to be part of OONI, the Open Observatory of
+    Network Interference. More information on that here: http://ooni.nu/
+
+"""
+
+import struct
+import socket
+import os
+import sys
+import time
+
+from twisted.internet import protocol, base, fdesc, error, defer
+from twisted.internet import reactor, threads
+from twisted.python import log
+from zope.interface import implements
+
+from scapy.all import Gen
+from scapy.all import SetGen
+
+LINUX=sys.platform.startswith("linux")
+OPENBSD=sys.platform.startswith("openbsd")
+FREEBSD=sys.platform.startswith("freebsd")
+NETBSD=sys.platform.startswith("netbsd")
+DARWIN=sys.platform.startswith("darwin")
+SOLARIS=sys.platform.startswith("sunos")
+WINDOWS=sys.platform.startswith("win32")
+
+from scapy.all import RawPcapWriter, MTU, BasePacketList, conf
+class PcapWriter(RawPcapWriter):  # pcap writer that accepts scapy packets
+    def __init__(self, filename, linktype=None, gz=False, endianness="",
+                 append=False, sync=False):
+        # Forward the caller's options instead of always passing the defaults.
+        RawPcapWriter.__init__(self, filename, linktype=linktype, gz=gz,
+                               endianness=endianness, append=append, sync=sync)
+        fdesc.setNonBlocking(self.f)
+
+    def _write_header(self, pkt):
+        if self.linktype is None:  # not given: derive it from the first packet
+            if type(pkt) is list or type(pkt) is tuple or isinstance(pkt, BasePacketList):
+                pkt = pkt[0]
+            try:
+                self.linktype = conf.l2types[pkt.__class__]
+            except KeyError:
+                self.linktype = 1  # class has no registered link type
+        RawPcapWriter._write_header(self, pkt)
+
+    def _write_packet(self, packet):
+        sec = int(packet.time)  # whole seconds of the capture timestamp
+        usec = int(round((packet.time-sec)*1000000))  # remainder in microseconds
+        s = str(packet)
+        RawPcapWriter._write_packet(self, s, sec, usec, len(s), len(s))
+
+class ScapySocket(object):
+    MTU = 1500
+    def __init__(self, filter=None, iface=None, nofilter=None):
+        from scapy.all import conf
+        self.ssocket = conf.L3socket(filter=filter, iface=iface, nofilter=nofilter)
+
+    def fileno(self):
+        return self.ssocket.ins.fileno()
+
+    def send(self, data):
+        return self.ssocket.send(data)
+
+    def recv(self):
+        if FREEBSD or DARWIN:
+            return self.ssocket.nonblock_recv()
+        else:
+            return self.ssocket.recv(self.MTU)
+
+class Scapy(object):
+    """
+    A twisted based wrapper for scapy send and receive functionality.
+
+    It sends packets inside of a threadpool and receives packets using the
+    libdnet receive non blocking file descriptor.
+    """
+    min = 2
+    max = 6
+    debug = True
+    write_only_answers = False
+    pcapwriter = None
+    recv = False
+    timeout = None
+
+    def __init__(self, pkts=None, maxPacketSize=8192, reactor=None, filter=None,
+            iface=None, nofilter=None, pcapfile=None, timeout=None, *args, **kw):
+
+        self.timeout = timeout
+        self.last_answer = None
+        if self.debug:
+            log.startLogging(sys.stdout)
+
+        self.maxPacketSize = maxPacketSize
+        if not reactor:
+            from twisted.internet import reactor
+
+        self._reactor = reactor
+
+        if pkts:
+            self._buildPacketQueues(pkts)
+            self._buildSocket()
+
+        self.cthreads = 0
+        self.mthreads = 80
+
+        self.running = False
+        self.done = False
+        self.finished = False
+
+        import thread
+        from twisted.python import threadpool
+        self.threadID = thread.get_ident
+        self.threadpool = threadpool.ThreadPool(self.min, self.max)
+        self.startID = self._reactor.callWhenRunning(self._start)
+
+        self.deferred = defer.Deferred()
+
+        if pcapfile:
+            self.pcapwriter = PcapWriter(pcapfile)
+
+    def _buildSocket(self, filter=None, iface=None, nofilter=None):
+        self.socket = ScapySocket(filter, iface, nofilter)
+        if self.recv:
+            self._reactor.addReader(self)
+
+    def _buildPacketQueues(self, pkts):
+        """
+        Converts the list of packets to a Scapy generator and sets up all the
+        necessary attributes for understanding if all the needed responses have
+        been received.
+        """
+        if not isinstance(pkts, Gen):
+            pkts = SetGen(pkts)
+        self.pkts = pkts
+
+        # Build the queue from the generator (not the raw argument) and keep
+        # self.pkts defined even when the caller already passed a Gen.
+        self.outqueue = list(self.pkts)
+        self.total_count = len(self.outqueue)
+        self.answer_count = 0
+        self.out_count = 0
+
+        # Group the sent packets by hashret() so that doRead can look up the
+        # candidates an incoming packet might be answering.
+        self.hsent = {}
+        for p in self.outqueue:
+            self.hsent.setdefault(p.hashret(), []).append(p)
+
+
+    def gotAnswer(self, answer, question):
+        """
+        Got a packet that has been identified as an answer to one of the sent
+        out packets.
+
+        If the answer count matches the sent count the finish callback is
+        fired, unless it has already been fired before.
+
+        @param answer: the packet received on the wire.
+
+        @param question: the sent packet that matches that response.
+
+        """
+
+        if self.pcapwriter and self.write_only_answers:
+            self.pcapwriter.write(question)
+            self.pcapwriter.write(answer)
+        self.answer_count += 1
+        self.last_answer = time.time()
+        # A Deferred may only be fired once; later answers must not re-fire it.
+        if self.answer_count >= self.total_count and not self.deferred.called:
+            print "Got all the answers I need"
+            self.deferred.callback(None)
+
+
+    def processAnswer(self, pkt, hlst):
+        """
+        Checks if the potential answer is in fact an answer to one of the
+        matched sent packets. Uses the scapy .answers() function to verify
+        this.
+
+        @param pkt: The packet to be tested if is the answer to a sent packet.
+
+        @param hlst: a list of packets that match the hash for an answer to
+                     pkt.
+        """
+        for i in range(len(hlst)):
+            if pkt.answers(hlst[i]):
+                self.gotAnswer(pkt, hlst[i])
+
+    def fileno(self):
+        """
+        Returns a fileno for use by twisteds Reader.
+        """
+        return self.socket.fileno()
+
+    def processPacket(self, pkt):
+        """
+        Override this method to process your packets.
+
+        @param pkt: the packet that has been received.
+        """
+        #pkt.show()
+
+
+    def doRead(self):
+        """
+        There is something to be read on the wire. Do all the processing on the
+        received packet. The deferred is fired at most once on timeout.
+        """
+        expired = self.timeout and (time.time() - self.last_answer) > self.timeout
+        if expired and not self.outqueue and not self.deferred.called:
+            print "Timing out.."
+            self.deferred.callback(None)
+
+        pkt = self.socket.recv()
+        if pkt is None:  # NOTE(review): assumes recv() yields None when empty
+            return
+        if self.pcapwriter and not self.write_only_answers:
+            self.pcapwriter.write(pkt)
+        self.processPacket(pkt)
+        h = pkt.hashret()
+        if h in self.hsent:
+            self.processAnswer(pkt, self.hsent[h])
+
+    def logPrefix(self):
+        """
+        The prefix to be prepended in logging.
+        """
+        return "txScapy"
+
+    def _start(self):
+        """
+        Start the twisted thread pool.
+        """
+        self.startID = None
+        return self.start()
+
+    def start(self):
+        """
+        Actually start the thread pool.
+        """
+        if not self.running:
+            self.threadpool.start()
+            self.shutdownID = self._reactor.addSystemEventTrigger(
+                    'during', 'shutdown', self.finalClose)
+            self.running = True
+
+    def sendPkt(self, pkt):
+        """
+        Send a packet to the wire.
+
+        @param pkt: The packet to be sent.
+        """
+        self.socket.send(pkt)
+
+    def sr(self, pkts, filter=None, iface=None, nofilter=0, timeout=None, *args, **kw):
+        """
+        Wraps the scapy sr function.
+
+        @param nofilter: put 1 to avoid use of bpf filters
+
+        @param retry:    if positive, how many times to resend unanswered packets
+                         if negative, how many times to retry when no more packets are
+                         answered (XXX to be implemented)
+
+        @param timeout:  how much time to wait after the last packet has
+                         been sent (XXX to be implemented)
+
+        @param multi:    whether to accept multiple answers for the same
+                         stimulus (XXX to be implemented)
+
+        @param filter:   provide a BPF filter
+        @param iface:    listen answers only on the given interface
+        """
+        self.timeout = timeout
+        self.recv = True
+        self._sendrcv(pkts, filter=filter, iface=iface, nofilter=nofilter)
+
+    def send(self, pkts, filter=None, iface=None, nofilter=0, *args, **kw):
+        """
+        Wraps the scapy send function. Its the same as send and receive, except
+        it does not receive. Who would have ever guessed? ;)
+
+        @param nofilter: put 1 to avoid use of bpf filters
+
+        @param retry:    if positive, how many times to resend unanswered packets
+                         if negative, how many times to retry when no more packets are
+                         answered (XXX to be implemented)
+
+        @param timeout:  how much time to wait after the last packet has
+                         been sent (XXX to be implemented)
+
+        @param multi:    whether to accept multiple answers for the same
+                         stimulus (XXX to be implemented)
+
+        @param filter:   provide a BPF filter
+        @param iface:    listen answers only on the given interface
+        """
+        self.recv = False
+        self._sendrcv(pkts, filter=filter, iface=iface, nofilter=nofilter)
+
+    def _sendrcv(self, pkts, filter=None, iface=None, nofilter=0):
+        self._buildSocket(filter, iface, nofilter)
+        self._buildPacketQueues(pkts)
+        if not self.last_answer:
+            self.last_answer = time.time()
+
+        def sent(cb):
+            if self.cthreads < self.mthreads and not self.done:
+                pkt = None
+                try:
+                    pkt = self.outqueue.pop()
+                except:
+                    self.done = True
+                    if not self.recv:
+                        self.deferred.callback(None)
+                    return
+                d = threads.deferToThreadPool(reactor, self.threadpool,
+                                    self.sendPkt, pkt)
+                d.addCallback(sent)
+                return d
+
+        for x in range(self.mthreads):
+            try:
+                pkt = self.outqueue.pop()
+            except:
+                self.done = True
+                return
+            if self.cthreads >= self.mthreads and self.done:
+                return
+            d = threads.deferToThreadPool(reactor, self.threadpool,
+                                self.sendPkt, pkt)
+            d.addCallback(sent)
+            return d
+
+    def connectionLost(self, why):
+        pass
+
+    def finalClose(self):
+        """
+        Clean all the thread related stuff up.
+        """
+        self.shutdownID = None
+        self.threadpool.stop()
+        self.running = False
+
+def txsr(*args, **kw):
+    tr = Scapy(*args, **kw)
+    tr.sr(*args, **kw)
+    return tr.deferred
+
+def txsend(*arg, **kw):
+    tr = Scapy(*arg, **kw)
+    tr.send(*arg, **kw)
+    return tr.deferred
diff --git a/ooni/plugins/chinatrigger.py b/ooni/plugins/chinatrigger.py
new file mode 100644
index 0000000..caa6a05
--- /dev/null
+++ b/ooni/plugins/chinatrigger.py
@@ -0,0 +1,137 @@
+import random
+import string
+import struct
+import time
+
+from zope.interface import implements
+from twisted.python import usage
+from twisted.plugin import IPlugin
+from twisted.internet import protocol, defer
+from ooni.plugoo.tests import ITest, OONITest
+from ooni.plugoo.assets import Asset
+from ooni import log
+from ooni.protocols.scapy import ScapyTest
+
+from ooni.lib.txscapy import txsr, txsend
+
+class scapyArgs(usage.Options):
+    optParameters = [['dst', 'd', None, 'Specify the target address'],
+                     ['port', 'p', None, 'Specify the target port'],
+                     ['pcap', 'f', None, 'The pcap file to write with the sent and received packets'],
+                    ]
+
+class ChinaTriggerTest(ScapyTest):
+    """
+    This test is a OONI based implementation of the C tool written
+    by Philipp Winter to engage chinese probes in active scanning.
+
+    Example of running it:
+    ./ooni/ooniprobe.py chinatrigger -d 127.0.0.1 -p 8080 -f bla.pcap
+    """
+    implements(IPlugin, ITest)
+
+    shortName = "chinatrigger"
+    description = "Triggers the chinese probes into scanning"
+    requirements = None
+    options = scapyArgs
+    blocking = False
+
+    receive = True
+    pcapfile = 'example_scapy.pcap'
+    timeout = 5
+
+    def initialize(self, reactor=None):
+        if not self.reactor:
+            from twisted.internet import reactor
+            self.reactor = reactor
+
+    @staticmethod
+    def set_random_servername(pkt):
+        # Swap the 16 bytes at offset 121 for 16 random lowercase ASCII
+        # letters and return the resulting string.
+        letters = [random.choice(string.ascii_lowercase) for i in range(16)]
+        name = ''.join(letters)
+        return pkt[:121] + name + pkt[121+16:]
+
+    @staticmethod
+    def set_random_time(pkt):
+        # Despite the name, the 4 bytes at offset 11 get the current time,
+        # packed as a big-endian unsigned 32-bit integer.
+        stamp = struct.pack('!I', int(time.time()))
+        return pkt[:11] + stamp + pkt[11+4:]
+
+    @staticmethod
+    def set_random_field(pkt):
+        # Overwrite the 28 bytes at offset 15 with random byte values.
+        # random.randint() includes both bounds and chr() only accepts 0..255,
+        # so the upper bound has to be 255 (256 raised ValueError).
+        rnd = ''.join([chr(random.randint(0, 255)) for i in range(28)])
+        return pkt[:15] + rnd + pkt[15+28:]
+
+    @staticmethod
+    def mutate(pkt, idx):
+        """
+        Return a copy of pkt whose byte at offset idx is replaced by a random one.
+        """
+        # Slice around idx itself: pkt[:idx-1] is pkt[:-1] when idx is 0, which
+        # glued a second copy of the payload on. chr() needs 0..255.
+        rnd = chr(random.randint(0, 255))
+        return pkt[:idx] + rnd + pkt[idx+1:]
+
+    @staticmethod
+    def set_all_random_fields(pkt):
+        pkt = ChinaTriggerTest.set_random_servername(pkt)
+        pkt = ChinaTriggerTest.set_random_time(pkt)
+        pkt = ChinaTriggerTest.set_random_field(pkt)
+        return pkt
+
+    def build_packets(self, *args, **kw):
+        """
+        Override this method to build scapy packets.
+        """
+        from scapy.all import IP, TCP
+        pkt = "\x16\x03\x01\x00\xcc\x01\x00\x00\xc8"\
+              "\x03\x01\x4f\x12\xe5\x63\x3f\xef\x7d"\
+              "\x20\xb9\x94\xaa\x04\xb0\xc1\xd4\x8c"\
+              "\x50\xcd\xe2\xf9\x2f\xa9\xfb\x78\xca"\
+              "\x02\xa8\x73\xe7\x0e\xa8\xf9\x00\x00"\
+              "\x3a\xc0\x0a\xc0\x14\x00\x39\x00\x38"\
+              "\xc0\x0f\xc0\x05\x00\x35\xc0\x07\xc0"\
+              "\x09\xc0\x11\xc0\x13\x00\x33\x00\x32"\
+              "\xc0\x0c\xc0\x0e\xc0\x02\xc0\x04\x00"\
+              "\x04\x00\x05\x00\x2f\xc0\x08\xc0\x12"\
+              "\x00\x16\x00\x13\xc0\x0d\xc0\x03\xfe"\
+              "\xff\x00\x0a\x00\xff\x01\x00\x00\x65"\
+              "\x00\x00\x00\x1d\x00\x1b\x00\x00\x18"\
+              "\x77\x77\x77\x2e\x67\x6e\x6c\x69\x67"\
+              "\x78\x7a\x70\x79\x76\x6f\x35\x66\x76"\
+              "\x6b\x64\x2e\x63\x6f\x6d\x00\x0b\x00"\
+              "\x04\x03\x00\x01\x02\x00\x0a\x00\x34"\
+              "\x00\x32\x00\x01\x00\x02\x00\x03\x00"\
+              "\x04\x00\x05\x00\x06\x00\x07\x00\x08"\
+              "\x00\x09\x00\x0a\x00\x0b\x00\x0c\x00"\
+              "\x0d\x00\x0e\x00\x0f\x00\x10\x00\x11"\
+              "\x00\x12\x00\x13\x00\x14\x00\x15\x00"\
+              "\x16\x00\x17\x00\x18\x00\x19\x00\x23"\
+              "\x00\x00"
+
+        pkt = ChinaTriggerTest.set_all_random_fields(pkt)
+        pkts = [IP(dst=self.dst)/TCP(dport=self.port)/pkt]
+        for x in range(len(pkt)):
+            mutation = IP(dst=self.dst)/TCP(dport=self.port)/ChinaTriggerTest.mutate(pkt, x)
+            pkts.append(mutation)
+        return pkts
+
+    def load_assets(self):
+        if self.local_options:
+            self.dst = self.local_options['dst']
+            self.port = int(self.local_options['port'])
+            if self.local_options['pcap']:
+                self.pcapfile = self.local_options['pcap']
+            if not self.port or not self.dst:
+                pass
+
+        return {}
+
+chinatrigger = ChinaTriggerTest(None, None, None)
+
diff --git a/ooni/protocols/scapy.py b/ooni/protocols/scapy.py
index ba2ae66..bacc163 100644
--- a/ooni/protocols/scapy.py
+++ b/ooni/protocols/scapy.py
@@ -15,6 +15,7 @@ class ScapyTest(OONITest):
     """
 
     receive = True
+    timeout = None
     pcapfile = 'scapytest.pcap'
     def initialize(self, reactor=None):
 
@@ -28,10 +29,15 @@ class ScapyTest(OONITest):
     def experiment(self, args):
         log.msg("Running experiment")
         if self.receive:
-            d = txsr(self.build_packets(), pcapfile=self.pcapfile)
+            log.msg("Sending and receiving packets.")
+            d = txsr(self.build_packets(), pcapfile=self.pcapfile,
+                    timeout=self.timeout)
         else:
+            log.msg("Sending packets.")
             d = txsend(self.build_packets())
+
         def finished(data):
+            log.msg("Finished sending")
             return data
 
         d.addCallback(finished)



More information about the tor-commits mailing list