commit b816984e3832d8e2d21a67b29d23a2b48cfae962 Author: Arturo Filastò art@torproject.org Date: Sat Jun 16 16:30:15 2012 +0200
Write phw's (Philipp Winter's) GFW triggering test as an OONI plugin. * Import txscapy --- ooni/lib/txscapy.py | 363 ++++++++++++++++++++++++++++++++++++++++++ ooni/plugins/chinatrigger.py | 137 ++++++++++++++++ ooni/protocols/scapy.py | 8 +- 3 files changed, 507 insertions(+), 1 deletions(-)
# ---------------------------------------------------------------------------
# File: ooni/lib/txscapy.py (new file in this commit)
# ---------------------------------------------------------------------------
# -*- coding:utf8 -*-
"""
    txscapy
    ******
    (c) 2012 Arturo Filastò
    a twisted wrapper for scapys send and receive functions.

    This software has been written to be part of OONI, the Open Observatory of
    Network Interference. More information on that here: http://ooni.nu/

"""

import struct
import socket
import os
import sys
import time

from twisted.internet import protocol, base, fdesc, error, defer
from twisted.internet import reactor, threads
from twisted.python import log
from zope.interface import implements

from scapy.all import Gen
from scapy.all import SetGen

LINUX = sys.platform.startswith("linux")
OPENBSD = sys.platform.startswith("openbsd")
FREEBSD = sys.platform.startswith("freebsd")
NETBSD = sys.platform.startswith("netbsd")
DARWIN = sys.platform.startswith("darwin")
SOLARIS = sys.platform.startswith("sunos")
WINDOWS = sys.platform.startswith("win32")

from scapy.all import RawPcapWriter, MTU, BasePacketList, conf


class PcapWriter(RawPcapWriter):
    """
    A RawPcapWriter whose underlying file is switched to non-blocking mode
    and which accepts scapy packet objects instead of raw strings.
    """

    def __init__(self, filename, linktype=None, gz=False, endianness="",
                 append=False, sync=False):
        """
        @param filename: passed through to RawPcapWriter.

        @param linktype: pcap link type; when None it is worked out from the
            first packet written (see _write_header).

        @param gz, endianness, append, sync: passed through to RawPcapWriter.
        """
        # Forward what the caller asked for. The previous version passed
        # literal defaults here, so every option was silently ignored.
        RawPcapWriter.__init__(self, filename, linktype=linktype, gz=gz,
                               endianness=endianness, append=append,
                               sync=sync)
        fdesc.setNonBlocking(self.f)

    def _write_header(self, pkt):
        """
        Pick the link type from the first packet (when not set explicitly)
        and let RawPcapWriter write the pcap file header.

        @param pkt: a packet, or a list/tuple/BasePacketList of packets.
        """
        if self.linktype is None:
            if isinstance(pkt, (list, tuple, BasePacketList)):
                pkt = pkt[0]
            try:
                self.linktype = conf.l2types[pkt.__class__]
            except KeyError:
                # Unknown layer class: fall back to link type 1.
                self.linktype = 1
        RawPcapWriter._write_header(self, pkt)

    def _write_packet(self, packet):
        """
        Serialise a packet and write it with its own timestamp.

        @param packet: a packet object exposing a float .time attribute.
        """
        sec = int(packet.time)
        usec = int(round((packet.time - sec) * 1000000))
        s = str(packet)
        caplen = len(s)
        RawPcapWriter._write_packet(self, s, sec, usec, caplen, caplen)


class ScapySocket(object):
    """
    Thin wrapper around scapy's configured layer 3 socket (conf.L3socket).
    """
    MTU = 1500

    def __init__(self, filter=None, iface=None, nofilter=None):
        self.ssocket = conf.L3socket(filter=filter, iface=iface,
                                     nofilter=nofilter)

    def fileno(self):
        """
        File descriptor of the receiving side of the scapy socket.
        """
        return self.ssocket.ins.fileno()

    def send(self, data):
        return self.ssocket.send(data)

    def recv(self):
        """
        Read one packet; FreeBSD and Darwin use the socket's nonblock_recv().
        """
        if FREEBSD or DARWIN:
            return self.ssocket.nonblock_recv()
        else:
            return self.ssocket.recv(self.MTU)


class Scapy(object):
    """
    A twisted based wrapper for scapy send and receive functionality.

    It sends packets inside of a threadpool and receives packets using the
    libdnet receive non blocking file descriptor.

    self.deferred is fired (with None) when all answers have been received,
    when the timeout is hit, or, for send-only operation, when the out queue
    has been drained.
    """
    min = 2
    max = 6
    debug = True
    write_only_answers = False
    pcapwriter = None
    recv = False
    timeout = None

    def __init__(self, pkts=None, maxPacketSize=8192, reactor=None,
                 filter=None, iface=None, nofilter=None, pcapfile=None,
                 timeout=None, *args, **kw):
        """
        @param pkts: optional packets to queue right away.

        @param maxPacketSize: stored on the instance (not used in this file).

        @param reactor: the reactor to use; defaults to the global one.

        @param filter, iface, nofilter: options for the scapy socket.

        @param pcapfile: when set, packets are written to this pcap file.

        @param timeout: seconds without answers after which doRead() gives up.
        """
        self.timeout = timeout
        self.last_answer = None
        if self.debug:
            log.startLogging(sys.stdout)

        self.maxPacketSize = maxPacketSize
        if not reactor:
            from twisted.internet import reactor

        self._reactor = reactor

        # Created first so that nothing below can reference it before it
        # exists.
        self.deferred = defer.Deferred()

        # Safe defaults, so the instance is usable even without pkts.
        self.pkts = None
        self.outqueue = []
        self.hsent = {}
        self.total_count = 0
        self.answer_count = 0
        self.out_count = 0

        if pkts:
            self._buildPacketQueues(pkts)
            # Hand over the socket options; they used to be dropped here.
            self._buildSocket(filter, iface, nofilter)

        self.cthreads = 0
        self.mthreads = 80

        self.running = False
        self.done = False
        self.finished = False

        import thread
        from twisted.python import threadpool
        self.threadID = thread.get_ident
        self.threadpool = threadpool.ThreadPool(self.min, self.max)
        self.startID = self._reactor.callWhenRunning(self._start)

        if pcapfile:
            self.pcapwriter = PcapWriter(pcapfile)

    def _finish(self, result=None):
        """
        Fire self.deferred, but only once.

        Calling callback() on an already fired Deferred raises
        AlreadyCalledError, which used to happen when a late answer arrived
        or when the timeout check ran again.
        """
        if not self.deferred.called:
            self.deferred.callback(result)

    def _buildSocket(self, filter=None, iface=None, nofilter=None):
        """
        Create the scapy socket and, in receive mode, register this object
        as a reader with the reactor.
        """
        self.socket = ScapySocket(filter, iface, nofilter)
        if self.recv:
            self._reactor.addReader(self)

    def _buildPacketQueues(self, pkts):
        """
        Converts the list of packets to a Scapy generator and sets up all the
        necessary attributes for understanding if all the needed responses have
        been received.

        @param pkts: a scapy Gen, or anything SetGen() accepts.
        """
        if not isinstance(pkts, Gen):
            pkts = SetGen(pkts)
        # Always keep the generator and iterate *it*; before, the wrapped
        # generator was stored but the raw argument was iterated.
        self.pkts = pkts

        self.outqueue = [p for p in self.pkts]

        self.total_count = len(self.outqueue)
        self.answer_count = 0
        self.out_count = 0

        # Index the sent packets by hashret() so that doRead() can find the
        # candidate questions for a received packet.
        self.hsent = {}
        for p in self.outqueue:
            self.hsent.setdefault(p.hashret(), []).append(p)

    def gotAnswer(self, answer, question):
        """
        Got a packet that has been identified as an answer to one of the sent
        out packets.

        If the answer count matches the sent count the finish callback is
        fired.

        @param answer: the packet received on the wire.

        @param question: the sent packet that matches that response.

        """
        if self.pcapwriter and self.write_only_answers:
            self.pcapwriter.write(question)
            self.pcapwriter.write(answer)
        self.answer_count += 1
        self.last_answer = time.time()

        if self.answer_count >= self.total_count and not self.deferred.called:
            print("Got all the answers I need")
            self._finish()

    def processAnswer(self, pkt, hlst):
        """
        Checks if the potential answer is in fact an answer to one of the
        matched sent packets. Uses the scapy .answers() function to verify
        this.

        @param pkt: The packet to be tested if is the answer to a sent packet.

        @param hlst: a list of packets that match the hash for an answer to
                     pkt.
        """
        for question in hlst:
            if pkt.answers(question):
                self.gotAnswer(pkt, question)

    def fileno(self):
        """
        Returns a fileno for use by twisteds Reader.
        """
        return self.socket.fileno()

    def processPacket(self, pkt):
        """
        Override this method to process your packets.

        @param pkt: the packet that has been received.
        """
        #pkt.show()

    def doRead(self):
        """
        There is something to be read on the wire. Do all the processing on the
        received packet.

        The timeout is only evaluated here, i.e. when something arrives on
        the socket.
        """
        timed_out = (self.timeout
                     and self.last_answer is not None
                     and not self.outqueue
                     and (time.time() - self.last_answer) > self.timeout)
        if timed_out and not self.deferred.called:
            print("Timing out..")
            self._finish()

        pkt = self.socket.recv()
        if pkt is None:
            # The socket had nothing usable for us; nothing to process.
            return

        if self.pcapwriter and not self.write_only_answers:
            self.pcapwriter.write(pkt)
        self.processPacket(pkt)

        h = pkt.hashret()
        if h in self.hsent:
            self.processAnswer(pkt, self.hsent[h])

    def logPrefix(self):
        """
        The prefix to be prepended in logging.
        """
        return "txScapy"

    def _start(self):
        """
        Start the twisted thread pool.
        """
        self.startID = None
        return self.start()

    def start(self):
        """
        Actually start the thread pool.
        """
        if not self.running:
            self.threadpool.start()
            self.shutdownID = self._reactor.addSystemEventTrigger(
                    'during', 'shutdown', self.finalClose)
            self.running = True

    def sendPkt(self, pkt):
        """
        Send a packet to the wire.

        @param pkt: The packet to be sent.
        """
        self.socket.send(pkt)

    def sr(self, pkts, filter=None, iface=None, nofilter=0, timeout=None,
           *args, **kw):
        """
        Wraps the scapy sr function.

        @param nofilter: put 1 to avoid use of bpf filters

        @param retry:    if positive, how many times to resend unanswered packets
                         if negative, how many times to retry when no more packets are
                         answered (XXX to be implemented)

        @param timeout:  how much time to wait after the last packet has
                         been sent (XXX to be implemented)

        @param multi:    whether to accept multiple answers for the same
                         stimulus (XXX to be implemented)

        @param filter:   provide a BPF filter
        @param iface:    listen answers only on the given interface
        """
        self.timeout = timeout
        self.recv = True
        self._sendrcv(pkts, filter=filter, iface=iface, nofilter=nofilter)

    def send(self, pkts, filter=None, iface=None, nofilter=0, *args, **kw):
        """
        Wraps the scapy send function. Its the same as send and receive, except
        it does not receive. Who would have ever guessed? ;)

        @param nofilter: put 1 to avoid use of bpf filters

        @param retry:    if positive, how many times to resend unanswered packets
                         if negative, how many times to retry when no more packets are
                         answered (XXX to be implemented)

        @param timeout:  how much time to wait after the last packet has
                         been sent (XXX to be implemented)

        @param multi:    whether to accept multiple answers for the same
                         stimulus (XXX to be implemented)

        @param filter:   provide a BPF filter
        @param iface:    listen answers only on the given interface
        """
        self.recv = False
        self._sendrcv(pkts, filter=filter, iface=iface, nofilter=nofilter)

    def _sendrcv(self, pkts, filter=None, iface=None, nofilter=0):
        """
        Build the socket and the queues, then start sending.

        Packets are popped from the end of the out queue and sent one after
        the other in the thread pool; each finished send schedules the next.

        NOTE(review): the source this was restored from had lost its
        indentation. It is read here as starting a single send chain (the
        original loop returning on its first iteration) -- confirm against
        the upstream file.

        @return: the Deferred of the first send, or None if there was
            nothing to send.
        """
        self._buildSocket(filter, iface, nofilter)
        self._buildPacketQueues(pkts)
        if not self.last_answer:
            self.last_answer = time.time()
        return self._sendNext()

    def _sendNext(self, _result=None):
        """
        Send the next queued packet, or mark sending as done.

        @param _result: result of the previous send (ignored); present so
            that this method can be used as a Deferred callback.
        """
        if self.done:
            return None
        try:
            pkt = self.outqueue.pop()
        except IndexError:
            # Queue drained. In send-only mode this is the end of the job;
            # in receive mode gotAnswer()/doRead() fire the deferred.
            self.done = True
            if not self.recv:
                self._finish()
            return None
        d = threads.deferToThreadPool(self._reactor, self.threadpool,
                                      self.sendPkt, pkt)
        d.addCallback(self._sendNext)
        return d

    def connectionLost(self, why):
        pass

    def finalClose(self):
        """
        Clean all the thread related stuff up.
        """
        self.shutdownID = None
        self.threadpool.stop()
        self.running = False


def txsr(*args, **kw):
    """
    Send packets and collect the answers.

    The same arguments are handed to both Scapy() and Scapy.sr().

    @return: a Deferred fired when receiving is over.
    """
    tr = Scapy(*args, **kw)
    tr.sr(*args, **kw)
    return tr.deferred


def txsend(*arg, **kw):
    """
    Send packets without listening for answers.

    The same arguments are handed to both Scapy() and Scapy.send().

    @return: a Deferred fired when the out queue has been drained.
    """
    tr = Scapy(*arg, **kw)
    tr.send(*arg, **kw)
    return tr.deferred


# ---------------------------------------------------------------------------
# File: ooni/plugins/chinatrigger.py (new file in this commit)
# ---------------------------------------------------------------------------
import random
import string
import struct
import time

from zope.interface import implements
from twisted.python import usage
from twisted.plugin import IPlugin
from twisted.internet import protocol, defer
from ooni.plugoo.tests import ITest, OONITest
from ooni.plugoo.assets import Asset
from ooni import log
from ooni.protocols.scapy import ScapyTest

from ooni.lib.txscapy import txsr, txsend


class scapyArgs(usage.Options):
    optParameters = [['dst', 'd', None, 'Specify the target address'],
                     ['port', 'p', None, 'Specify the target port'],
                     ['pcap', 'f', None, 'The pcap file to write with the sent and received packets'],
                    ]


class ChinaTriggerTest(ScapyTest):
    """
    This test is a OONI based implementation of the C tool written
    by Philipp Winter to engage chinese probes in active scanning.

    Example of running it:
    ./ooni/ooniprobe.py chinatrigger -d 127.0.0.1 -p 8080 -f bla.pcap
    """
    implements(IPlugin, ITest)

    shortName = "chinatrigger"
    description = "Triggers the chinese probes into scanning"
    requirements = None
    options = scapyArgs
    blocking = False

    receive = True
    pcapfile = 'example_scapy.pcap'
    timeout = 5

    def initialize(self, reactor=None):
        if not self.reactor:
            from twisted.internet import reactor
            self.reactor = reactor

    @staticmethod
    def set_random_servername(pkt):
        """
        Replace the 16 bytes starting at offset 121 with random lowercase
        letters.
        """
        name = "".join(random.choice(string.ascii_lowercase)
                       for _ in range(16))
        return pkt[:121] + name + pkt[121+16:]

    @staticmethod
    def set_random_time(pkt):
        """
        Replace the 4 bytes starting at offset 11 with the current time as a
        big-endian unsigned 32 bit integer.
        """
        return pkt[:11] + struct.pack('!I', int(time.time())) + pkt[11+4:]

    @staticmethod
    def set_random_field(pkt):
        """
        Replace the 28 bytes starting at offset 15 with random bytes.
        """
        # randint() includes both ends; 256 would make chr() raise.
        noise = "".join(chr(random.randint(0, 255)) for _ in range(28))
        return pkt[:15] + noise + pkt[15+28:]

    @staticmethod
    def mutate(pkt, idx):
        """
        Slightly changed mutate function.

        Return a copy of pkt with the byte at position idx replaced by a
        random one. The length of the packet does not change.
        """
        # The old slices (pkt[:idx-1] ... pkt[idx:]) doubled the packet for
        # idx == 0 and never touched the last byte.
        return pkt[:idx] + chr(random.randint(0, 255)) + pkt[idx+1:]

    @staticmethod
    def set_all_random_fields(pkt):
        pkt = ChinaTriggerTest.set_random_servername(pkt)
        pkt = ChinaTriggerTest.set_random_time(pkt)
        pkt = ChinaTriggerTest.set_random_field(pkt)
        return pkt

    def build_packets(self, *args, **kw):
        """
        Override this method to build scapy packets.

        Builds one packet carrying the payload below (with its time, random
        and server name fields refreshed) plus one packet per payload byte
        in which that byte has been mutated.
        """
        from scapy.all import IP, TCP
        # NOTE(review): presumably a TLS ClientHello record -- the helpers
        # above patch what look like its time, random and server name fields.
        pkt = ("\x16\x03\x01\x00\xcc\x01\x00\x00\xc8"
               "\x03\x01\x4f\x12\xe5\x63\x3f\xef\x7d"
               "\x20\xb9\x94\xaa\x04\xb0\xc1\xd4\x8c"
               "\x50\xcd\xe2\xf9\x2f\xa9\xfb\x78\xca"
               "\x02\xa8\x73\xe7\x0e\xa8\xf9\x00\x00"
               "\x3a\xc0\x0a\xc0\x14\x00\x39\x00\x38"
               "\xc0\x0f\xc0\x05\x00\x35\xc0\x07\xc0"
               "\x09\xc0\x11\xc0\x13\x00\x33\x00\x32"
               "\xc0\x0c\xc0\x0e\xc0\x02\xc0\x04\x00"
               "\x04\x00\x05\x00\x2f\xc0\x08\xc0\x12"
               "\x00\x16\x00\x13\xc0\x0d\xc0\x03\xfe"
               "\xff\x00\x0a\x00\xff\x01\x00\x00\x65"
               "\x00\x00\x00\x1d\x00\x1b\x00\x00\x18"
               "\x77\x77\x77\x2e\x67\x6e\x6c\x69\x67"
               "\x78\x7a\x70\x79\x76\x6f\x35\x66\x76"
               "\x6b\x64\x2e\x63\x6f\x6d\x00\x0b\x00"
               "\x04\x03\x00\x01\x02\x00\x0a\x00\x34"
               "\x00\x32\x00\x01\x00\x02\x00\x03\x00"
               "\x04\x00\x05\x00\x06\x00\x07\x00\x08"
               "\x00\x09\x00\x0a\x00\x0b\x00\x0c\x00"
               "\x0d\x00\x0e\x00\x0f\x00\x10\x00\x11"
               "\x00\x12\x00\x13\x00\x14\x00\x15\x00"
               "\x16\x00\x17\x00\x18\x00\x19\x00\x23"
               "\x00\x00")

        pkt = ChinaTriggerTest.set_all_random_fields(pkt)
        pkts = [IP(dst=self.dst)/TCP(dport=self.port)/pkt]
        for x in range(len(pkt)):
            mutation = IP(dst=self.dst)/TCP(dport=self.port)/ChinaTriggerTest.mutate(pkt, x)
            pkts.append(mutation)
        return pkts

    def load_assets(self):
        """
        Read the target address, port and pcap file name from the command
        line options, when there are any.

        @return: an empty dict, this test has no assets.
        """
        if self.local_options:
            self.dst = self.local_options['dst']
            # int() raises if no port was given on the command line.
            self.port = int(self.local_options['port'])
            if self.local_options['pcap']:
                self.pcapfile = self.local_options['pcap']

        return {}

chinatrigger = ChinaTriggerTest(None, None, None)

# ---------------------------------------------------------------------------
# File: ooni/protocols/scapy.py (modified in this commit,
# index ba2ae66..bacc163 100644). Its hunks follow, the first one
# (@@ -15,6 +15,7 @@ class ScapyTest(OONITest)) starting right after the
# class docstring.
# ---------------------------------------------------------------------------
receive = True + timeout = None pcapfile = 'scapytest.pcap' def initialize(self, reactor=None):
@@ -28,10 +29,15 @@ class ScapyTest(OONITest): def experiment(self, args): log.msg("Running experiment") if self.receive: - d = txsr(self.build_packets(), pcapfile=self.pcapfile) + log.msg("Sending and receiving packets.") + d = txsr(self.build_packets(), pcapfile=self.pcapfile, + timeout=self.timeout) else: + log.msg("Sending packets.") d = txsend(self.build_packets()) + def finished(data): + log.msg("Finished sending") return data
d.addCallback(finished)