[ooni-probe/master] Make the sniffer not run in a separate thread, but use a non blocking fdesc

commit 615ce75c47aec249b6b5a4c0b58fdf7a93f09582 Author: Arturo Filastò <art@fuffa.org> Date: Sun Nov 25 10:08:47 2012 +0100 Make the sniffer not run in a separate thread, but use a non blocking fdesc * Do some refactoring of scapy testing, following Factory creational pattern and a pub-sub pattern for the readers and writers (inspired by muxTCP). * Other misc refactoring --- ooni/oonicli.py | 6 ++- ooni/templates/scapyt.py | 50 ++++++++--------- ooni/utils/net.py | 23 -------- ooni/utils/txscapy.py | 138 ++++++++++++++++++++++++++++++++-------------- 4 files changed, 126 insertions(+), 91 deletions(-) diff --git a/ooni/oonicli.py b/ooni/oonicli.py index 5c582b2..1a316b3 100644 --- a/ooni/oonicli.py +++ b/ooni/oonicli.py @@ -100,6 +100,7 @@ def runTest(cmd_line_options): classes = runner.findTestClassesFromFile(cmd_line_options['test']) test_cases, options = runner.loadTestsAndOptions(classes, cmd_line_options) if config.privacy.includepcap: + from ooni.utils.txscapy import ScapyFactory, ScapySniffer try: checkForRoot() except NotRootError: @@ -108,7 +109,10 @@ def runTest(cmd_line_options): sys.exit(1) print "Starting sniffer" - net.capturePackets(config.reports.pcap) + config.scapyFactory = ScapyFactory(config.advanced.interface) + + sniffer = ScapySniffer(config.reports.pcap) + config.scapyFactory.registerProtocol(sniffer) return runner.runTestCases(test_cases, options, cmd_line_options) diff --git a/ooni/templates/scapyt.py b/ooni/templates/scapyt.py index 11b4381..cb02300 100644 --- a/ooni/templates/scapyt.py +++ b/ooni/templates/scapyt.py @@ -16,7 +16,7 @@ from ooni.nettest import NetTestCase from ooni.utils import log from ooni import config -from ooni.utils.txscapy import ScapyProtocol, getDefaultIface +from ooni.utils.txscapy import ScapySender, getDefaultIface class BaseScapyTest(NetTestCase): """ @@ -66,20 +66,8 @@ class BaseScapyTest(NetTestCase): else: config.check_TCPerror_seqack = 0 - if config.advanced.interface == 'auto': - self.interface = getDefaultIface() - else: - self.interface = config.advanced.interface - - def reportSentPacket(self, packet): - if 'sent_packets' not in self.report: - self.report['sent_packets'] = [] - self.report['sent_packets'].append(packet) - - def reportReceivedPacket(self, packet): - if 'answered_packets' not in self.report: - self.report['answered_packets'] = [] - self.report['answered_packets'].append(packet) + self.report['sent_packets'] = [] + self.report['answered_packets'] = [] def finishedSendReceive(self, packets): """ @@ -98,8 +86,8 @@ class BaseScapyTest(NetTestCase): sent_packet.src = '127.0.0.1' received_packet.dst = '127.0.0.1' - self.reportSentPacket(sent_packet) - self.reportReceivedPacket(received_packet) + self.report['sent_packets'].append(sent_packet) + self.report['answered_packets'].append(received_packet) return packets def sr(self, packets, *arg, **kw): @@ -107,8 +95,11 @@ class BaseScapyTest(NetTestCase): Wrapper around scapy.sendrecv.sr for sending and receiving of packets at layer 3. """ - scapyProtocol = ScapyProtocol(interface=self.interface, *arg, **kw) - d = scapyProtocol.startSending(packets) + scapySender = ScapySender() + + config.scapyFactory.registerProtocol(scapySender) + + d = scapySender.startSending(packets) d.addCallback(self.finishedSendReceive) return d @@ -123,12 +114,15 @@ class BaseScapyTest(NetTestCase): return packets[0][0][1] except IndexError: log.err("Got no response...") - return None + return packets + + scapySender = ScapySender() + scapySender.expected_answers = 1 + + config.scapyFactory.registerProtocol(scapySender) - scapyProtocol = ScapyProtocol(interface=self.interface, *arg, **kw) - scapyProtocol.expected_answers = 1 log.debug("Running sr1") - d = scapyProtocol.startSending(packets) + d = scapySender.startSending(packets) log.debug("Started to send") d.addCallback(self.finishedSendReceive) d.addCallback(done) @@ -138,9 +132,13 @@ class BaseScapyTest(NetTestCase): """ Wrapper around scapy.sendrecv.send for sending of packets at layer 3 """ - scapyProtocol = ScapyProtocol(interface=self.interface, *arg, **kw) - scapyProtocol.sendPackets(packets) - scapyProtocol.stopSending() + scapySender = ScapySender() + + config.scapyFactory.registerProtocol(scapySender) + + scapySender.sendPackets(packets) + + scapySender.stopSending() for packet in packets: self.reportSentPacket(packet) diff --git a/ooni/utils/net.py b/ooni/utils/net.py index 649dc64..df98412 100644 --- a/ooni/utils/net.py +++ b/ooni/utils/net.py @@ -82,29 +82,6 @@ class BodyReceiver(protocol.Protocol): def connectionLost(self, reason): self.finished.callback(self.data) -def capturePackets(pcap_filename): - from scapy.all import sniff - global stop_packet_capture - stop_packet_capture = False - - def stopCapture(): - # XXX this is a bit of a hack to stop capturing packets when we close - # the reactor. Ideally we would want to be able to do this - # programmatically, but this requires some work on implementing - # properly the sniff function with deferreds. - global stop_packet_capture - stop_packet_capture = True - - def writePacketToPcap(pkt): - from scapy.all import utils - pcapwriter = txscapy.TXPcapWriter(pcap_filename, append=True) - pcapwriter.write(pkt) - if stop_packet_capture: - sys.exit(1) - d = threads.deferToThread(sniff, lfilter=writePacketToPcap) - reactor.addSystemEventTrigger('before', 'shutdown', stopCapture) - return d - def getSystemResolver(): """ XXX implement a function that returns the resolver that is currently diff --git a/ooni/utils/txscapy.py b/ooni/utils/txscapy.py index 679eb42..8e479f8 100644 --- a/ooni/utils/txscapy.py +++ b/ooni/utils/txscapy.py @@ -32,7 +32,9 @@ try: config.pcap_dnet = True except ImportError, e: - log.err("pypcap or dnet not installed. Certain tests may not work.") + log.err("pypcap or dnet not installed. " + "Certain tests may not work.") + config.pcap_dnet = False conf.use_pcap = False conf.use_dnet = False @@ -59,6 +61,9 @@ def getNetworksFromRoutes(): return networks +class IfaceError(Exception): + pass + def getDefaultIface(): networks = getNetworksFromRoutes() for net in networks: @@ -66,45 +71,100 @@ def getDefaultIface(): return net.iface raise IfaceError -class TXPcapWriter(PcapWriter): - def __init__(self, *arg, **kw): - PcapWriter.__init__(self, *arg, **kw) - fdesc.setNonBlocking(self.f) +class ProtocolNotRegistered(Exception): + pass -class ScapyProtocol(abstract.FileDescriptor): +class ProtocolAlreadyRegistered(Exception): + pass + +class ScapyFactory(abstract.FileDescriptor): + """ + Inspired by muxTCP scapyLink: + https://github.com/enki/muXTCP/blob/master/scapyLink.py + """ def __init__(self, interface, super_socket=None, timeout=5): abstract.FileDescriptor.__init__(self, reactor) + if interface == 'auto': + interface = getDefaultIface() if not super_socket: - super_socket = conf.L3socket(iface=interface, promisc=True, filter='') + super_socket = conf.L3socket(iface=interface, + promisc=True, filter='') #super_socket = conf.L2socket(iface=interface) + self.protocols = [] fdesc._setCloseOnExec(super_socket.ins.fileno()) self.super_socket = super_socket - self.interface = interface - self.timeout = timeout + def writeSomeData(self, data): + """ + XXX we actually want to use this, but this requires overriding doWrite + or writeSequence. + """ + pass - # This dict is used to store the unique hashes that allow scapy to - # match up request with answer - self.hr_sent_packets = {} + def send(self, packet): + """ + Write a scapy packet to the wire. + """ + return self.super_socket.send(packet) - # These are the packets we have received as answer to the ones we sent - self.answered_packets = [] + def fileno(self): + return self.super_socket.ins.fileno() - # These are the packets we send - self.sent_packets = [] + def doRead(self): + packet = self.super_socket.recv(MTU) + for protocol in self.protocols: + protocol.packetReceived(packet) + + def registerProtocol(self, protocol): + if not self.connected: + self.startReading() + + if protocol not in self.protocols: + protocol.factory = self + self.protocols.append(protocol) + else: + raise ProtocolAlreadyRegistered + + def unRegisterProtocol(self, protocol): + if protocol in self.protocols: + self.protocols.remove(protocol) + if len(self.protocols) == 0: + self.loseConnection() + else: + raise ProtocolNotRegistered + +class ScapyProtocol(object): + factory = None + + def packetReceived(self, packet): + """ + When you register a protocol, this method will be called with argument + the packet it received. - # This deferred will fire when we have finished sending a receiving packets. - self.d = defer.Deferred() - # Should we look for multiple answers for the same sent packet? - self.multi = False + Every protocol that is registered will have this method called. + """ + raise NotImplementedError - # When 0 we stop when all the packets we have sent have received an - # answer - self.expected_answers = 0 +class ScapySender(ScapyProtocol): + timeout = 5 + # This dict is used to store the unique hashes that allow scapy to + # match up request with answer + hr_sent_packets = {} - def fileno(self): - return self.super_socket.ins.fileno() + # These are the packets we have received as answer to the ones we sent + answered_packets = [] + + # These are the packets we send + sent_packets = [] + + # This deferred will fire when we have finished sending a receiving packets. + # Should we look for multiple answers for the same sent packet? + multi = False + + # When 0 we stop when all the packets we have sent have received an + # answer + expected_answers = 0 def processPacket(self, packet): """ @@ -131,11 +191,10 @@ class ScapyProtocol(abstract.FileDescriptor): log.debug("Got the number of expected answers") self.stopSending() - def doRead(self): + def packetReceived(self, packet): timeout = time.time() - self._start_time if self.timeout and time.time() - self._start_time > self.timeout: self.stopSending() - packet = self.super_socket.recv(MTU) if packet: self.processPacket(packet) # A string that has the same value for the request than for the @@ -146,18 +205,9 @@ class ScapyProtocol(abstract.FileDescriptor): self.processAnswer(packet, answer_hr) def stopSending(self): - self.stopReading() - self.super_socket.close() - if hasattr(self, "d"): - result = (self.answered_packets, self.sent_packets) - self.d.callback(result) - del self.d - - def write(self, packet): - """ - Write a scapy packet to the wire. - """ - return self.super_socket.send(packet) + result = (self.answered_packets, self.sent_packets) + self.d.callback(result) + self.factory.unRegisterProtocol(self) def sendPackets(self, packets): if not isinstance(packets, Gen): @@ -169,12 +219,18 @@ class ScapyProtocol(abstract.FileDescriptor): else: self.hr_sent_packets[hashret] = [packet] self.sent_packets.append(packet) - self.write(packet) + self.factory.send(packet) def startSending(self, packets): self._start_time = time.time() - self.startReading() + self.d = defer.Deferred() self.sendPackets(packets) return self.d +class ScapySniffer(ScapyProtocol): + def __init__(self, pcap_filename, *arg, **kw): + self.pcapwriter = PcapWriter(pcap_filename, *arg, **kw) + + def packetReceived(self, packet): + self.pcapwriter.write(packet)
participants (1)
-
art@torproject.org