commit 6a3ee55b574adaa8740ccafe2e4a01719dc0e86e Author: Arturo Filastò art@fuffa.org Date: Fri Nov 9 13:20:38 2012 +0100
Implement support for logging to a PCAP file --- docs/source/index.rst | 2 +- ooni/config.py | 17 ++- ooni/lib/txscapy.py | 381 ------------------------------------------------ ooni/nettest.py | 2 +- ooni/oonicli.py | 43 +++++- ooni/runner.py | 24 +-- ooni/utils/__init__.py | 4 + ooni/utils/net.py | 38 ++++-- ooniprobe.conf | 5 +- 9 files changed, 94 insertions(+), 422 deletions(-)
diff --git a/docs/source/index.rst b/docs/source/index.rst index 2497a09..132381f 100644 --- a/docs/source/index.rst +++ b/docs/source/index.rst @@ -28,7 +28,7 @@ On debian based systems these can be installed with: The python dependencies required for running ooniprobe are:
* Twisted - * Scapy + * Scapy >= 2.2.0 * txtorcon
They can be installed from the requirements.txt with: diff --git a/ooni/config.py b/ooni/config.py index de5f45b..f3d1a80 100644 --- a/ooni/config.py +++ b/ooni/config.py @@ -6,8 +6,9 @@ import os import yaml
-from twisted.internet import reactor +from twisted.internet import reactor, threads
+from ooni.utils import date from ooni.utils import Storage
def get_root_path(): @@ -16,6 +17,18 @@ def get_root_path(): root = os.path.abspath(root) return root
+def oreport_filenames(): + """ + returns the filenames for the pcap file and the yamloo report + + returns + yamloo_filename, pcap_filename + """ + base_filename = "%s_"+date.timestamp()+".%s" + yamloo_filename = base_filename % ("report", "yamloo") + pcap_filename = base_filename % ("packets", "pcap") + return yamloo_filename, pcap_filename + config_file = os.path.join(get_root_path(), 'ooniprobe.conf') try: f = open(config_file) @@ -41,7 +54,5 @@ advanced = Storage() for k, v in configuration['advanced'].items(): advanced[k] = v
-threadpool = ThreadPool(0, advanced.threadpool_size) -threadpool.start() # This is used to keep track of the state of the sniffer sniffer_running = None diff --git a/ooni/lib/txscapy.py b/ooni/lib/txscapy.py deleted file mode 100644 index 00224d6..0000000 --- a/ooni/lib/txscapy.py +++ /dev/null @@ -1,381 +0,0 @@ -# -*- coding:utf8 -*- -""" -txscapy -******* -(c) 2012 Arturo Filastò -a twisted wrapper for scapys send and receive functions. - -This software has been written to be part of OONI, the Open Observatory of -Network Interference. More information on that here: http://ooni.nu/ - -""" - -import struct -import socket -import os -import sys -import time - -from twisted.internet import protocol, base, fdesc, error, defer -from twisted.internet import reactor, threads -from zope.interface import implements - -from scapy.all import Gen -from scapy.all import SetGen - -from ooni.utils import log - -LINUX=sys.platform.startswith("linux") -OPENBSD=sys.platform.startswith("openbsd") -FREEBSD=sys.platform.startswith("freebsd") -NETBSD=sys.platform.startswith("netbsd") -DARWIN=sys.platform.startswith("darwin") -SOLARIS=sys.platform.startswith("sunos") -WINDOWS=sys.platform.startswith("win32") - -from scapy.all import RawPcapWriter, MTU, BasePacketList, conf -class PcapWriter(RawPcapWriter): - def __init__(self, filename, linktype=None, gz=False, endianness="", - append=False, sync=False): - RawPcapWriter.__init__(self, filename, linktype=linktype, gz=gz, - endianness=endianness, append=append, sync=sync) - fdesc.setNonBlocking(self.f) - - def _write_header(self, pkt): - if self.linktype == None: - if type(pkt) is list or type(pkt) is tuple or isinstance(pkt, BasePacketList): - pkt = pkt[0] - try: - self.linktype = conf.l2types[pkt.__class__] - except KeyError: - self.linktype = 1 - RawPcapWriter._write_header(self, pkt) - - def _write_packet(self, packet): - sec = int(packet.time) - usec = int(round((packet.time-sec)*1000000)) - s = str(packet) - caplen = len(s) - 
RawPcapWriter._write_packet(self, s, sec, usec, caplen, caplen) - -class ScapySocket(object): - MTU = 1500 - def __init__(self, filter=None, iface=None, nofilter=None): - from scapy.all import conf - self.ssocket = conf.L3socket(filter=filter, iface=iface, nofilter=nofilter) - - def fileno(self): - return self.ssocket.ins.fileno() - - def send(self, data): - return self.ssocket.send(data) - - def recv(self): - if FREEBSD or DARWIN: - return self.ssocket.nonblock_recv() - else: - return self.ssocket.recv(self.MTU) - -class TXScapy(object): - """ - A twisted based wrapper for scapy send and receive functionality. - - It sends packets inside of a threadpool and receives packets using the - libdnet receive non blocking file descriptor. - """ - min = 2 - max = 6 - debug = False - write_only_answers = False - pcapwriter = None - recv = False - timeout_call = None - answers = [] - questions = [] - - def __init__(self, pkts=None, maxPacketSize=8192, reactor=None, filter=None, - iface=None, nofilter=None, pcapfile=None, timeout=None, *arg, **kw): - self.maxPacketSize = maxPacketSize - if not reactor: - from twisted.internet import reactor - - self._reactor = reactor - - if pkts: - self._buildPacketQueues(pkts) - try: - self._buildSocket() - except Exception, e: - log.err("Unable to build socket. 
Are you root?") - sys.exit() - - self.cthreads = 0 - self.mthreads = 80 - - self.running = False - self.done = False - self.finished = False - - import thread - from twisted.python import threadpool - self.threadID = thread.get_ident - self.threadpool = threadpool.ThreadPool(self.min, self.max) - self.startID = self._reactor.callWhenRunning(self._start) - - self.deferred = defer.Deferred() - - if pcapfile: - self.pcapwriter = PcapWriter(pcapfile) - - if timeout and self.recv: - pass - - def _buildSocket(self, filter=None, iface=None, nofilter=None): - self.socket = ScapySocket(filter, iface, nofilter) - if self.recv: - self._reactor.addReader(self) - - def _buildPacketQueues(self, pkts): - """ - Converts the list of packets to a Scapy generator and sets up all the - necessary attributes for understanding if all the needed responses have - been received. - """ - if not isinstance(pkts, Gen): - self.pkts = SetGen(pkts) - - self.outqueue = [p for p in pkts] - - self.total_count = len(self.outqueue) - self.answer_count = 0 - self.out_count = 0 - - self.hsent = {} - for p in self.outqueue: - h = p.hashret() - if h in self.hsent: - self.hsent[h].append(p) - else: - self.hsent[h] = [p] - - - def gotAnswer(self, answer, question): - """ - Got a packet that has been identified as an answer to one of the sent - out packets. - - If the answer count matches the sent count the finish callback is - fired. - - @param answer: the packet received on the wire. - - @param question: the sent packet that matches that response. 
- - """ - if self.pcapwriter and self.write_only_answers: - self.pcapwriter.write(question) - self.pcapwriter.write(answer) - self.answers.append(answers) - self.questions.append(question) - self.answer_count += 1 - if self.answer_count >= self.total_count and self.running: - log.debug("Got all the answers I need") - self.finalClose() - self.deferred.callback(None) - - def processAnswer(self, pkt, hlst): - """ - Checks if the potential answer is in fact an answer to one of the - matched sent packets. Uses the scapy .answers() function to verify - this. - - @param pkt: The packet to be tested if is the answer to a sent packet. - - @param hlst: a list of packets that match the hash for an answer to - pkt. - """ - for i in range(len(hlst)): - if pkt.answers(hlst[i]): - self.gotAnswer(pkt, hlst[i]) - - def fileno(self): - """ - Returns a fileno for use by twisteds Reader. - """ - return self.socket.fileno() - - def processPacket(self, pkt): - """ - Override this method to process your packets. - - @param pkt: the packet that has been received. - """ - pkt.show() - - def doRead(self): - """ - There is something to be read on the wire. Do all the processing on the - received packet. - """ - pkt = self.socket.recv() - if not pkt: - return - if self.pcapwriter and not self.write_only_answers: - self.pcapwriter.write(pkt) - - self.processPacket(pkt) - - h = pkt.hashret() - if h in self.hsent: - hlst = self.hsent[h] - self.processAnswer(pkt, hlst) - - def logPrefix(self): - """ - The prefix to be prepended in logging. - """ - return "txScapy" - - def _start(self): - """ - Start the twisted thread pool. - """ - self.startID = None - return self.start() - - def start(self): - """ - Actually start the thread pool. - """ - if not self.running: - self.threadpool.start() - self.shutdownID = self._reactor.addSystemEventTrigger( - 'during', 'shutdown', self.finalClose) - self.running = True - - def sendPkt(self, pkt): - """ - Send a packet to the wire. 
- - @param pkt: The packet to be sent. - """ - self.socket.send(pkt) - - def timeout(self, *arg, **kw): - if not self.done: - log.debug("I have not finished. Setting to call in %s" % - self.timeoutSeconds) - self.timeout_call = self._reactor.callLater(self.timeoutSeconds, self.timeout, None) - elif self.running: - log.debug("Cancelling timeout call") - self.finalClose() - self.deferred.callback(None) - - def sr(self, pkts, filter=None, iface=None, nofilter=0, timeout=None, *args, **kw): - """ - Wraps the scapy sr function. - - @param nofilter: put 1 to avoid use of bpf filters - - @param retry: if positive, how many times to resend unanswered packets - if negative, how many times to retry when no more packets are - answered (XXX to be implemented) - - @param timeout: how much time to wait after the last packet has - been sent (XXX to be implemented) - - @param multi: whether to accept multiple answers for the same - stimulus (XXX to be implemented) - - @param filter: provide a BPF filter - @param iface: listen answers only on the given interface - """ - log.debug("TXScapy sending and receiving packets") - self.recv = True - if timeout: - self.timeoutSeconds = timeout - self.timeout_call = self._reactor.callLater(timeout, self.timeout, None) - self._sendrcv(pkts, filter=filter, iface=iface, nofilter=nofilter) - - def send(self, pkts, filter=None, iface=None, nofilter=0, *args, **kw): - """ - Wraps the scapy send function. Its the same as send and receive, except - it does not receive. Who would have ever guessed? 
;) - - @param nofilter: put 1 to avoid use of bpf filters - - @param retry: if positive, how many times to resend unanswered packets - if negative, how many times to retry when no more packets are - answered (XXX to be implemented) - - @param timeout: how much time to wait after the last packet has - been sent (XXX to be implemented) - - @param multi: whether to accept multiple answers for the same - stimulus (XXX to be implemented) - - @param filter: provide a BPF filter - @param iface: listen answers only on the given interface - """ - self.recv = False - self._sendrcv(pkts, filter=filter, iface=iface, nofilter=nofilter) - - def _sendrcv(self, pkts, filter=None, iface=None, nofilter=0): - self._buildSocket(filter, iface, nofilter) - self._buildPacketQueues(pkts) - def sent(cb): - if self.cthreads < self.mthreads and not self.done: - pkt = None - try: - pkt = self.outqueue.pop() - except: - self.done = True - if not self.recv and self.running: - log.debug("I am not in receiving state running callback") - self.deferred.callback(None) - return - d = threads.deferToThreadPool(reactor, self.threadpool, - self.sendPkt, pkt) - d.addCallback(sent) - return d - - for x in range(self.mthreads): - try: - pkt = self.outqueue.pop() - except: - self.done = True - return - if self.cthreads >= self.mthreads and self.done: - return - d = threads.deferToThreadPool(reactor, self.threadpool, - self.sendPkt, pkt) - d.addCallback(sent) - return d - - def connectionLost(self, why): - pass - - def finalClose(self): - """ - Clean all the shutdown related functions. 
- """ - self.shutdownID = None - self.threadpool.stop() - if self.timeout_call: - self.timeout_call.cancel() - self.timeout_call = None - self.running = False - -@defer.inlineCallbacks -def txsr(*args, **kw): - tr = TXScapy(*args, **kw) - tr.sr(*args, **kw) - yield tr.deferred - tr.finalClose() - -@defer.inlineCallbacks -def txsend(*arg, **kw): - tr = TXScapy(*arg, **kw) - tr.send(*arg, **kw) - yield tr.deferred - tr.finalClose() diff --git a/ooni/nettest.py b/ooni/nettest.py index 289cd23..6221a3f 100644 --- a/ooni/nettest.py +++ b/ooni/nettest.py @@ -136,7 +136,7 @@ class NetTestCase(object): for x in fp.xreadlines(): yield x.strip() fp.close() - + def _checkRequiredOptions(self): for required_option in self.requiredOptions: log.debug("Checking if %s is present" % required_option) diff --git a/ooni/oonicli.py b/ooni/oonicli.py index 2a6f0cd..4b63a61 100644 --- a/ooni/oonicli.py +++ b/ooni/oonicli.py @@ -27,7 +27,7 @@ from ooni import nettest, runner, reporter, config
from ooni.inputunit import InputUnitFactory
-from ooni.utils import net +from ooni.utils import net, checkForRoot from ooni.utils import log
@@ -44,7 +44,8 @@ class Options(usage.Options, app.ReactorSelectionMixin): 'Report deferred creation and callback stack traces'],]
optParameters = [["reportfile", "o", None, "report file name"], - ["logfile", "l", None, "log file name"],] + ["logfile", "l", None, "log file name"], + ["pcapfile", "p", None, "pcap file name"]]
compData = usage.Completions( extraActions=[usage.CompleteFiles( @@ -75,6 +76,12 @@ class Options(usage.Options, app.ReactorSelectionMixin): except: raise usage.UsageError("No test filename specified!")
+def testsEnded(*arg, **kw): + """ + You can place here all the post shutdown tasks. + """ + log.debug("Finished running all tests") + def run(): """ Call me to begin testing from a file. @@ -90,11 +97,37 @@ def run(): if cmd_line_options['debug-stacktraces']: defer.setDebugging(True)
+ yamloo_filename, pcap_filename = config.oreport_filenames() + + if cmd_line_options['reportfile']: + yamloo_filename = cmd_line_options['reportfile'] + pcap_filename = yamloo_filename+".pcap" + + if os.path.exists(yamloo_filename): + log.msg("Report already exists with filename %s" % yamloo_filename) + log.msg("Renaming it to %s" % yamloo_filename+'.old') + os.rename(yamloo_filename, yamloo_filename+'.old') + if os.path.exists(pcap_filename): + log.msg("Report already exists with filename %s" % pcap_filename) + log.msg("Renaming it to %s" % pcap_filename+'.old') + os.rename(pcap_filename, pcap_filename+'.old') + log.start(cmd_line_options['logfile']) classes = runner.findTestClassesFromConfig(cmd_line_options) test_cases, options = runner.loadTestsAndOptions(classes, cmd_line_options) - d = - runner.runTestCases(test_cases, options, cmd_line_options) - reactor.run() + if config.privacy.includepcap: + try: + checkForRoot() + except: + log.err("includepcap options requires root priviledges to run") + log.err("disable it in your ooniprobe.conf file") + sys.exit(1) + log.debug("Starting sniffer") + sniffer_d = net.capturePackets(pcap_filename)
+ tests_d = runner.runTestCases(test_cases, options, + cmd_line_options, yamloo_filename) + tests_d.addBoth(testsEnded) + + reactor.run()
diff --git a/ooni/runner.py b/ooni/runner.py index 083de35..d8b7df8 100644 --- a/ooni/runner.py +++ b/ooni/runner.py @@ -24,7 +24,8 @@ from ooni.inputunit import InputUnitFactory from ooni.nettest import NetTestCase
from ooni import reporter -from ooni.utils import log, date + +from ooni.utils import log, date, checkForRoot
def processTest(obj, cmd_line_options): """ @@ -41,8 +42,7 @@ def processTest(obj, cmd_line_options):
input_file = obj.inputFile if obj.requiresRoot: - if os.getuid() != 0: - raise Exception("This test requires root to run") + checkForRoot("test")
if obj.optParameters or input_file \ or obj.usageOptions or obj.optFlags: @@ -184,7 +184,8 @@ def runTestWithInputUnit(test_class, return defer.DeferredList(dl)
@defer.inlineCallbacks -def runTestCases(test_cases, options, cmd_line_options): +def runTestCases(test_cases, options, + cmd_line_options, yamloo_filename): try: assert len(options) != 0, "Length of options is zero!" except AssertionError, ae: @@ -203,17 +204,7 @@ def runTestCases(test_cases, options, cmd_line_options): log.msg("options[0] = %s" % first) test_inputs = [None]
- if cmd_line_options['reportfile']: - report_filename = cmd_line_options['reportfile'] - else: - report_filename = 'report_'+date.timestamp()+'.yamloo' - - if os.path.exists(report_filename): - print "Report already exists with filename %s" % report_filename - print "Renaming it to %s" % report_filename+'.old' - os.rename(report_filename, report_filename+'.old') - - reportFile = open(report_filename, 'w+') + reportFile = open(yamloo_filename, 'w+') oreporter = reporter.OReporter(reportFile) input_unit_factory = InputUnitFactory(test_inputs)
@@ -226,6 +217,7 @@ def runTestCases(test_cases, options, cmd_line_options): test_class = test_case[0] test_method = test_case[1] yield runTestWithInputUnit(test_class, - test_method, input_unit, oreporter) + test_method, input_unit, + oreporter) oreporter.allDone()
diff --git a/ooni/utils/__init__.py b/ooni/utils/__init__.py index cd82ab4..9961e03 100644 --- a/ooni/utils/__init__.py +++ b/ooni/utils/__init__.py @@ -3,6 +3,7 @@ """
import imp +import os import logging import string import random @@ -55,6 +56,9 @@ class Storage(dict): for (k, v) in value.items(): self[k] = v
+def checkForRoot(what): + if os.getuid() != 0: + raise Exception("This %s requires root to run" % what)
def get_logger(config): loglevel = getattr(logging, config.loglevel.upper()) diff --git a/ooni/utils/net.py b/ooni/utils/net.py index a5a512d..3fd4b41 100644 --- a/ooni/utils/net.py +++ b/ooni/utils/net.py @@ -4,25 +4,41 @@ # -------- # OONI utilities for networking related operations
+import sys +from twisted.internet import threads, reactor + from scapy.all import utils -from twisted.internet import defer -from ooni.utils import log -from ooni.config import threadpool + +from ooni.utils import log, txscapy
def getClientAddress(): address = {'asn': 'REPLACE_ME', 'ip': 'REPLACE_ME'} return address
-def writePacketToPcap(pkt): - from scapy.all import utils - log.debug("Writing to pcap file %s" % pkt) - utils.wrpcap('/tmp/foo.pcap', pkt) - -def capturePackets(): +def capturePackets(pcap_filename): from scapy.all import sniff - return defer.deferToThread(sniff, writePacketToPcap, - lfilter=writePacketToPcap) + global stop_packet_capture + stop_packet_capture = False + + def stopCapture(): + # XXX this is a bit of a hack to stop capturing packets when we close + # the reactor. Ideally we would want to be able to do this + # programmatically, but this requires some work on implementing + # properly the sniff function with deferreds. + global stop_packet_capture + stop_packet_capture = True + + def writePacketToPcap(pkt): + from scapy.all import utils + pcapwriter = txscapy.TXPcapWriter(pcap_filename, append=True) + pcapwriter.write(pkt) + if stop_packet_capture: + sys.exit(1) + + d = threads.deferToThread(sniff, lfilter=writePacketToPcap) + reactor.addSystemEventTrigger('before', 'shutdown', stopCapture) + return d
class PermissionsError(SystemExit): def __init__(self, *args, **kwargs): diff --git a/ooniprobe.conf b/ooniprobe.conf index b7ea1f3..1e76ad7 100644 --- a/ooniprobe.conf +++ b/ooniprobe.conf @@ -15,15 +15,12 @@ privacy: # Should we include the ASN of the probe in the report? includecity: false # Should we collect a full packet capture on the client? - includepcap: true + includepcap: false advanced: # XXX change this to point to the directory where you have stored the GeoIP # database file. This should be the directory in which OONI is installed # /path/to/ooni-probe/data/ geoip_data_dir: /home/x/code/networking/ooni-probe/data/ debug: true -<<<<<<< HEAD threadpool_size: 10 -======= ->>>>>>> master