[tor-commits] [ooni-probe/master] Implement logging to PCAP file support

art at torproject.org art at torproject.org
Fri Nov 9 12:22:50 UTC 2012


commit 6a3ee55b574adaa8740ccafe2e4a01719dc0e86e
Author: Arturo Filastò <art at fuffa.org>
Date:   Fri Nov 9 13:20:38 2012 +0100

    Implement logging to PCAP file support
---
 docs/source/index.rst  |    2 +-
 ooni/config.py         |   17 ++-
 ooni/lib/txscapy.py    |  381 ------------------------------------------------
 ooni/nettest.py        |    2 +-
 ooni/oonicli.py        |   43 +++++-
 ooni/runner.py         |   24 +--
 ooni/utils/__init__.py |    4 +
 ooni/utils/net.py      |   38 ++++--
 ooniprobe.conf         |    5 +-
 9 files changed, 94 insertions(+), 422 deletions(-)

diff --git a/docs/source/index.rst b/docs/source/index.rst
index 2497a09..132381f 100644
--- a/docs/source/index.rst
+++ b/docs/source/index.rst
@@ -28,7 +28,7 @@ On debian based systems these can be installed with:
 The python dependencies required for running ooniprobe are:
 
     * Twisted
-    * Scapy
+    * Scapy >= 2.2.0
     * txtorcon
 
 They can be installed from the requirements.txt with:
diff --git a/ooni/config.py b/ooni/config.py
index de5f45b..f3d1a80 100644
--- a/ooni/config.py
+++ b/ooni/config.py
@@ -6,8 +6,9 @@
 import os
 import yaml
 
-from twisted.internet import reactor
+from twisted.internet import reactor, threads
 
+from ooni.utils import date
 from ooni.utils import Storage
 
 def get_root_path():
@@ -16,6 +17,18 @@ def get_root_path():
     root = os.path.abspath(root)
     return root
 
+def oreport_filenames():
+    """
+    returns the filenames for the pcap file and the yamloo report
+
+    returns
+    yamloo_filename, pcap_filename
+    """
+    base_filename = "%s_"+date.timestamp()+".%s"
+    yamloo_filename = base_filename % ("report", "yamloo")
+    pcap_filename = base_filename % ("packets", "pcap")
+    return yamloo_filename, pcap_filename
+
 config_file = os.path.join(get_root_path(), 'ooniprobe.conf')
 try:
     f = open(config_file)
@@ -41,7 +54,5 @@ advanced = Storage()
 for k, v in configuration['advanced'].items():
     advanced[k] = v
 
-threadpool = ThreadPool(0, advanced.threadpool_size)
-threadpool.start()
 # This is used to keep track of the state of the sniffer
 sniffer_running = None
diff --git a/ooni/lib/txscapy.py b/ooni/lib/txscapy.py
deleted file mode 100644
index 00224d6..0000000
--- a/ooni/lib/txscapy.py
+++ /dev/null
@@ -1,381 +0,0 @@
-# -*- coding:utf8 -*-
-"""
-txscapy
-*******
-(c) 2012 Arturo Filastò
-a twisted wrapper for scapys send and receive functions.
-
-This software has been written to be part of OONI, the Open Observatory of
-Network Interference. More information on that here: http://ooni.nu/
-
-"""
-
-import struct
-import socket
-import os
-import sys
-import time
-
-from twisted.internet import protocol, base, fdesc, error, defer
-from twisted.internet import reactor, threads
-from zope.interface import implements
-
-from scapy.all import Gen
-from scapy.all import SetGen
-
-from ooni.utils import log
-
-LINUX=sys.platform.startswith("linux")
-OPENBSD=sys.platform.startswith("openbsd")
-FREEBSD=sys.platform.startswith("freebsd")
-NETBSD=sys.platform.startswith("netbsd")
-DARWIN=sys.platform.startswith("darwin")
-SOLARIS=sys.platform.startswith("sunos")
-WINDOWS=sys.platform.startswith("win32")
-
-from scapy.all import RawPcapWriter, MTU, BasePacketList, conf
-class PcapWriter(RawPcapWriter):
-    def __init__(self, filename, linktype=None, gz=False, endianness="",
-                 append=False, sync=False):
-        RawPcapWriter.__init__(self, filename, linktype=linktype, gz=gz,
-                               endianness=endianness, append=append, sync=sync)
-        fdesc.setNonBlocking(self.f)
-
-    def _write_header(self, pkt):
-        if self.linktype == None:
-            if type(pkt) is list or type(pkt) is tuple or isinstance(pkt, BasePacketList):
-                pkt = pkt[0]
-            try:
-                self.linktype = conf.l2types[pkt.__class__]
-            except KeyError:
-                self.linktype = 1
-        RawPcapWriter._write_header(self, pkt)
-
-    def _write_packet(self, packet):
-        sec = int(packet.time)
-        usec = int(round((packet.time-sec)*1000000))
-        s = str(packet)
-        caplen = len(s)
-        RawPcapWriter._write_packet(self, s, sec, usec, caplen, caplen)
-
-class ScapySocket(object):
-    MTU = 1500
-    def __init__(self, filter=None, iface=None, nofilter=None):
-        from scapy.all import conf
-        self.ssocket = conf.L3socket(filter=filter, iface=iface, nofilter=nofilter)
-
-    def fileno(self):
-        return self.ssocket.ins.fileno()
-
-    def send(self, data):
-        return self.ssocket.send(data)
-
-    def recv(self):
-        if FREEBSD or DARWIN:
-            return self.ssocket.nonblock_recv()
-        else:
-            return self.ssocket.recv(self.MTU)
-
-class TXScapy(object):
-    """
-    A twisted based wrapper for scapy send and receive functionality.
-
-    It sends packets inside of a threadpool and receives packets using the
-    libdnet receive non blocking file descriptor.
-    """
-    min = 2
-    max = 6
-    debug = False
-    write_only_answers = False
-    pcapwriter = None
-    recv = False
-    timeout_call = None
-    answers = []
-    questions = []
-
-    def __init__(self, pkts=None, maxPacketSize=8192, reactor=None, filter=None,
-            iface=None, nofilter=None, pcapfile=None, timeout=None, *arg, **kw):
-        self.maxPacketSize = maxPacketSize
-        if not reactor:
-            from twisted.internet import reactor
-
-        self._reactor = reactor
-
-        if pkts:
-            self._buildPacketQueues(pkts)
-            try:
-                self._buildSocket()
-            except Exception, e:
-                log.err("Unable to build socket. Are you root?")
-                sys.exit()
-
-        self.cthreads = 0
-        self.mthreads = 80
-
-        self.running = False
-        self.done = False
-        self.finished = False
-
-        import thread
-        from twisted.python import threadpool
-        self.threadID = thread.get_ident
-        self.threadpool = threadpool.ThreadPool(self.min, self.max)
-        self.startID = self._reactor.callWhenRunning(self._start)
-
-        self.deferred = defer.Deferred()
-
-        if pcapfile:
-            self.pcapwriter = PcapWriter(pcapfile)
-
-        if timeout and self.recv:
-            pass
-
-    def _buildSocket(self, filter=None, iface=None, nofilter=None):
-        self.socket = ScapySocket(filter, iface, nofilter)
-        if self.recv:
-            self._reactor.addReader(self)
-
-    def _buildPacketQueues(self, pkts):
-        """
-        Converts the list of packets to a Scapy generator and sets up all the
-        necessary attributes for understanding if all the needed responses have
-        been received.
-        """
-        if not isinstance(pkts, Gen):
-            self.pkts = SetGen(pkts)
-
-        self.outqueue = [p for p in pkts]
-
-        self.total_count = len(self.outqueue)
-        self.answer_count = 0
-        self.out_count = 0
-
-        self.hsent = {}
-        for p in self.outqueue:
-            h = p.hashret()
-            if h in self.hsent:
-                self.hsent[h].append(p)
-            else:
-                self.hsent[h] = [p]
-
-
-    def gotAnswer(self, answer, question):
-        """
-        Got a packet that has been identified as an answer to one of the sent
-        out packets.
-
-        If the answer count matches the sent count the finish callback is
-        fired.
-
-        @param answer: the packet received on the wire.
-
-        @param question: the sent packet that matches that response.
-
-        """
-        if self.pcapwriter and self.write_only_answers:
-            self.pcapwriter.write(question)
-            self.pcapwriter.write(answer)
-            self.answers.append(answers)
-            self.questions.append(question)
-        self.answer_count += 1
-        if self.answer_count >= self.total_count and self.running:
-            log.debug("Got all the answers I need")
-            self.finalClose()
-            self.deferred.callback(None)
-
-    def processAnswer(self, pkt, hlst):
-        """
-        Checks if the potential answer is in fact an answer to one of the
-        matched sent packets. Uses the scapy .answers() function to verify
-        this.
-
-        @param pkt: The packet to be tested if is the answer to a sent packet.
-
-        @param hlst: a list of packets that match the hash for an answer to
-                     pkt.
-        """
-        for i in range(len(hlst)):
-            if pkt.answers(hlst[i]):
-                self.gotAnswer(pkt, hlst[i])
-
-    def fileno(self):
-        """
-        Returns a fileno for use by twisteds Reader.
-        """
-        return self.socket.fileno()
-
-    def processPacket(self, pkt):
-        """
-        Override this method to process your packets.
-
-        @param pkt: the packet that has been received.
-        """
-        pkt.show()
-
-    def doRead(self):
-        """
-        There is something to be read on the wire. Do all the processing on the
-        received packet.
-        """
-        pkt = self.socket.recv()
-        if not pkt:
-            return
-        if self.pcapwriter and not self.write_only_answers:
-            self.pcapwriter.write(pkt)
-
-        self.processPacket(pkt)
-
-        h = pkt.hashret()
-        if h in self.hsent:
-            hlst = self.hsent[h]
-            self.processAnswer(pkt, hlst)
-
-    def logPrefix(self):
-        """
-        The prefix to be prepended in logging.
-        """
-        return "txScapy"
-
-    def _start(self):
-        """
-        Start the twisted thread pool.
-        """
-        self.startID = None
-        return self.start()
-
-    def start(self):
-        """
-        Actually start the thread pool.
-        """
-        if not self.running:
-            self.threadpool.start()
-            self.shutdownID = self._reactor.addSystemEventTrigger(
-                    'during', 'shutdown', self.finalClose)
-            self.running = True
-
-    def sendPkt(self, pkt):
-        """
-        Send a packet to the wire.
-
-        @param pkt: The packet to be sent.
-        """
-        self.socket.send(pkt)
-
-    def timeout(self, *arg, **kw):
-        if not self.done:
-            log.debug("I have not finished. Setting to call in %s" %
-                    self.timeoutSeconds)
-            self.timeout_call = self._reactor.callLater(self.timeoutSeconds, self.timeout, None)
-        elif self.running:
-            log.debug("Cancelling timeout call")
-            self.finalClose()
-            self.deferred.callback(None)
-
-    def sr(self, pkts, filter=None, iface=None, nofilter=0, timeout=None, *args, **kw):
-        """
-        Wraps the scapy sr function.
-
-        @param nofilter: put 1 to avoid use of bpf filters
-
-        @param retry:    if positive, how many times to resend unanswered packets
-                         if negative, how many times to retry when no more packets are
-                         answered (XXX to be implemented)
-
-        @param timeout:  how much time to wait after the last packet has
-                         been sent (XXX to be implemented)
-
-        @param multi:    whether to accept multiple answers for the same
-                         stimulus (XXX to be implemented)
-
-        @param filter:   provide a BPF filter
-        @param iface:    listen answers only on the given interface
-        """
-        log.debug("TXScapy sending and receiving packets")
-        self.recv = True
-        if timeout:
-            self.timeoutSeconds = timeout
-            self.timeout_call = self._reactor.callLater(timeout, self.timeout, None)
-        self._sendrcv(pkts, filter=filter, iface=iface, nofilter=nofilter)
-
-    def send(self, pkts, filter=None, iface=None, nofilter=0, *args, **kw):
-        """
-        Wraps the scapy send function. Its the same as send and receive, except
-        it does not receive. Who would have ever guessed? ;)
-
-        @param nofilter: put 1 to avoid use of bpf filters
-
-        @param retry:    if positive, how many times to resend unanswered packets
-                         if negative, how many times to retry when no more packets are
-                         answered (XXX to be implemented)
-
-        @param timeout:  how much time to wait after the last packet has
-                         been sent (XXX to be implemented)
-
-        @param multi:    whether to accept multiple answers for the same
-                         stimulus (XXX to be implemented)
-
-        @param filter:   provide a BPF filter
-        @param iface:    listen answers only on the given interface
-        """
-        self.recv = False
-        self._sendrcv(pkts, filter=filter, iface=iface, nofilter=nofilter)
-
-    def _sendrcv(self, pkts, filter=None, iface=None, nofilter=0):
-        self._buildSocket(filter, iface, nofilter)
-        self._buildPacketQueues(pkts)
-        def sent(cb):
-            if self.cthreads < self.mthreads and not self.done:
-                pkt = None
-                try:
-                    pkt = self.outqueue.pop()
-                except:
-                    self.done = True
-                    if not self.recv and self.running:
-                        log.debug("I am not in receiving state running callback")
-                        self.deferred.callback(None)
-                    return
-                d = threads.deferToThreadPool(reactor, self.threadpool,
-                                    self.sendPkt, pkt)
-                d.addCallback(sent)
-                return d
-
-        for x in range(self.mthreads):
-            try:
-                pkt = self.outqueue.pop()
-            except:
-                self.done = True
-                return
-            if self.cthreads >= self.mthreads and self.done:
-                return
-            d = threads.deferToThreadPool(reactor, self.threadpool,
-                                self.sendPkt, pkt)
-            d.addCallback(sent)
-            return d
-
-    def connectionLost(self, why):
-        pass
-
-    def finalClose(self):
-        """
-        Clean all the shutdown related functions.
-        """
-        self.shutdownID = None
-        self.threadpool.stop()
-        if self.timeout_call:
-            self.timeout_call.cancel()
-            self.timeout_call = None
-        self.running = False
-
-@defer.inlineCallbacks
-def txsr(*args, **kw):
-    tr = TXScapy(*args, **kw)
-    tr.sr(*args, **kw)
-    yield tr.deferred
-    tr.finalClose()
-
-@defer.inlineCallbacks
-def txsend(*arg, **kw):
-    tr = TXScapy(*arg, **kw)
-    tr.send(*arg, **kw)
-    yield tr.deferred
-    tr.finalClose()
diff --git a/ooni/nettest.py b/ooni/nettest.py
index 289cd23..6221a3f 100644
--- a/ooni/nettest.py
+++ b/ooni/nettest.py
@@ -136,7 +136,7 @@ class NetTestCase(object):
         for x in fp.xreadlines():
             yield x.strip()
         fp.close()
-    
+
     def _checkRequiredOptions(self):
         for required_option in self.requiredOptions:
             log.debug("Checking if %s is present" % required_option)
diff --git a/ooni/oonicli.py b/ooni/oonicli.py
index 2a6f0cd..4b63a61 100644
--- a/ooni/oonicli.py
+++ b/ooni/oonicli.py
@@ -27,7 +27,7 @@ from ooni import nettest, runner, reporter, config
 
 from ooni.inputunit import InputUnitFactory
 
-from ooni.utils import net
+from ooni.utils import net, checkForRoot
 from ooni.utils import log
 
 
@@ -44,7 +44,8 @@ class Options(usage.Options, app.ReactorSelectionMixin):
                     'Report deferred creation and callback stack traces'],]
 
     optParameters = [["reportfile", "o", None, "report file name"],
-                     ["logfile", "l", None, "log file name"],]
+                     ["logfile", "l", None, "log file name"],
+                     ["pcapfile", "p", None, "pcap file name"]]
 
     compData = usage.Completions(
         extraActions=[usage.CompleteFiles(
@@ -75,6 +76,12 @@ class Options(usage.Options, app.ReactorSelectionMixin):
         except:
             raise usage.UsageError("No test filename specified!")
 
+def testsEnded(*arg, **kw):
+    """
+    You can place here all the post shutdown tasks.
+    """
+    log.debug("Finished running all tests")
+
 def run():
     """
     Call me to begin testing from a file.
@@ -90,11 +97,37 @@ def run():
     if cmd_line_options['debug-stacktraces']:
         defer.setDebugging(True)
 
+    yamloo_filename, pcap_filename = config.oreport_filenames()
+
+    if cmd_line_options['reportfile']:
+        yamloo_filename = cmd_line_options['reportfile']
+        pcap_filename = yamloo_filename+".pcap"
+
+    if os.path.exists(yamloo_filename):
+        log.msg("Report already exists with filename %s" % yamloo_filename)
+        log.msg("Renaming it to %s" % yamloo_filename+'.old')
+        os.rename(yamloo_filename, yamloo_filename+'.old')
+    if os.path.exists(pcap_filename):
+        log.msg("Report already exists with filename %s" % pcap_filename)
+        log.msg("Renaming it to %s" % pcap_filename+'.old')
+        os.rename(pcap_filename, pcap_filename+'.old')
+
     log.start(cmd_line_options['logfile'])
     classes = runner.findTestClassesFromConfig(cmd_line_options)
     test_cases, options = runner.loadTestsAndOptions(classes, cmd_line_options)
-    d = 
-    runner.runTestCases(test_cases, options, cmd_line_options)
-    reactor.run()
+    if config.privacy.includepcap:
+        try:
+            checkForRoot()
+        except:
+            log.err("includepcap options requires root priviledges to run")
+            log.err("disable it in your ooniprobe.conf file")
+            sys.exit(1)
+        log.debug("Starting sniffer")
+        sniffer_d = net.capturePackets(pcap_filename)
 
+    tests_d = runner.runTestCases(test_cases, options,
+            cmd_line_options, yamloo_filename)
+    tests_d.addBoth(testsEnded)
+
+    reactor.run()
 
diff --git a/ooni/runner.py b/ooni/runner.py
index 083de35..d8b7df8 100644
--- a/ooni/runner.py
+++ b/ooni/runner.py
@@ -24,7 +24,8 @@ from ooni.inputunit import InputUnitFactory
 from ooni.nettest import NetTestCase
 
 from ooni import reporter
-from ooni.utils import log, date
+
+from ooni.utils import log, date, checkForRoot
 
 def processTest(obj, cmd_line_options):
     """
@@ -41,8 +42,7 @@ def processTest(obj, cmd_line_options):
 
     input_file = obj.inputFile
     if obj.requiresRoot:
-        if os.getuid() != 0:
-            raise Exception("This test requires root to run")
+        checkForRoot("test")
 
     if obj.optParameters or input_file \
             or obj.usageOptions or obj.optFlags:
@@ -184,7 +184,8 @@ def runTestWithInputUnit(test_class,
     return defer.DeferredList(dl)
 
 @defer.inlineCallbacks
-def runTestCases(test_cases, options, cmd_line_options):
+def runTestCases(test_cases, options, 
+        cmd_line_options, yamloo_filename):
     try:
         assert len(options) != 0, "Length of options is zero!"
     except AssertionError, ae:
@@ -203,17 +204,7 @@ def runTestCases(test_cases, options, cmd_line_options):
             log.msg("options[0] = %s" % first)
             test_inputs = [None]
 
-    if cmd_line_options['reportfile']:
-        report_filename = cmd_line_options['reportfile']
-    else:
-        report_filename = 'report_'+date.timestamp()+'.yamloo'
-
-    if os.path.exists(report_filename):
-        print "Report already exists with filename %s" % report_filename
-        print "Renaming it to %s" % report_filename+'.old'
-        os.rename(report_filename, report_filename+'.old')
-
-    reportFile = open(report_filename, 'w+')
+    reportFile = open(yamloo_filename, 'w+')
     oreporter = reporter.OReporter(reportFile)
     input_unit_factory = InputUnitFactory(test_inputs)
 
@@ -226,6 +217,7 @@ def runTestCases(test_cases, options, cmd_line_options):
             test_class = test_case[0]
             test_method = test_case[1]
             yield runTestWithInputUnit(test_class,
-                        test_method, input_unit, oreporter)
+                        test_method, input_unit, 
+                        oreporter)
     oreporter.allDone()
 
diff --git a/ooni/utils/__init__.py b/ooni/utils/__init__.py
index cd82ab4..9961e03 100644
--- a/ooni/utils/__init__.py
+++ b/ooni/utils/__init__.py
@@ -3,6 +3,7 @@
 """
 
 import imp
+import os
 import logging
 import string
 import random
@@ -55,6 +56,9 @@ class Storage(dict):
         for (k, v) in value.items():
             self[k] = v
 
+def checkForRoot(what):
+    if os.getuid() != 0:
+        raise Exception("This %s requires root to run" % what)
 
 def get_logger(config):
     loglevel = getattr(logging, config.loglevel.upper())
diff --git a/ooni/utils/net.py b/ooni/utils/net.py
index a5a512d..3fd4b41 100644
--- a/ooni/utils/net.py
+++ b/ooni/utils/net.py
@@ -4,25 +4,41 @@
 # --------
 # OONI utilities for networking related operations
 
+import sys
+from twisted.internet import threads, reactor
+
 from scapy.all import utils
-from twisted.internet import defer
-from ooni.utils import log
-from ooni.config import threadpool
+
+from ooni.utils import log, txscapy
 
 def getClientAddress():
     address = {'asn': 'REPLACE_ME',
                'ip': 'REPLACE_ME'}
     return address
 
-def writePacketToPcap(pkt):
-    from scapy.all import utils
-    log.debug("Writing to pcap file %s" % pkt)
-    utils.wrpcap('/tmp/foo.pcap', pkt)
-
-def capturePackets():
+def capturePackets(pcap_filename):
     from scapy.all import sniff
-    return defer.deferToThread(sniff, writePacketToPcap, 
-            lfilter=writePacketToPcap)
+    global stop_packet_capture
+    stop_packet_capture = False
+
+    def stopCapture():
+        # XXX this is a bit of a hack to stop capturing packets when we close
+        # the reactor. Ideally we would want to be able to do this
+        # programmatically, but this requires some work on implementing
+        # properly the sniff function with deferreds.
+        global stop_packet_capture
+        stop_packet_capture = True
+
+    def writePacketToPcap(pkt):
+        from scapy.all import utils
+        pcapwriter = txscapy.TXPcapWriter(pcap_filename, append=True)
+        pcapwriter.write(pkt)
+        if stop_packet_capture:
+            sys.exit(1)
+
+    d = threads.deferToThread(sniff, lfilter=writePacketToPcap)
+    reactor.addSystemEventTrigger('before', 'shutdown', stopCapture)
+    return d
 
 class PermissionsError(SystemExit):
     def __init__(self, *args, **kwargs):
diff --git a/ooniprobe.conf b/ooniprobe.conf
index b7ea1f3..1e76ad7 100644
--- a/ooniprobe.conf
+++ b/ooniprobe.conf
@@ -15,15 +15,12 @@ privacy:
     # Should we include the ASN of the probe in the report?
     includecity: false
     # Should we collect a full packet capture on the client?
-    includepcap: true
+    includepcap: false
 advanced:
     # XXX change this to point to the directory where you have stored the GeoIP
     # database file. This should be the directory in which OONI is installed
     # /path/to/ooni-probe/data/
     geoip_data_dir: /home/x/code/networking/ooni-probe/data/
     debug: true
-<<<<<<< HEAD
     threadpool_size: 10
-=======
->>>>>>> master
 



More information about the tor-commits mailing list