[tor-commits] [ooni-probe/master] Add ICMP traceroute

art at torproject.org art at torproject.org
Thu May 31 03:01:42 UTC 2012


commit e452b755b750781b14b372780966e67a7af2f0fc
Author: Arturo Filastò <hellais at torproject.org>
Date:   Thu May 3 00:29:59 2012 +0200

    Add ICMP traceroute
---
 lib/traceroute.py   |   32 +++++
 lib/txtraceroute.py |  314 +++++++++++++++++++++++++++++++++++++++++++++++++++
 oonicli.py          |   16 ++-
 3 files changed, 357 insertions(+), 5 deletions(-)

diff --git a/lib/traceroute.py b/lib/traceroute.py
new file mode 100644
index 0000000..98015ea
--- /dev/null
+++ b/lib/traceroute.py
@@ -0,0 +1,32 @@
+import sys
+import socket
+from twisted.internet import defer
+from twisted.internet import reactor
+from twisted.internet import threads
+
+from txtraceroute import traceroute
+
+def run(target, src_port, dst_port):
+    res = []
+    @defer.inlineCallbacks
+    def start_trace(target, **settings):
+        hops = yield traceroute(target, **settings)
+        for hop in hops:
+            res.append(hop.get())
+        reactor.stop()
+
+    settings = dict(hop_callback=None,
+                    timeout=2,
+                    max_tries=3,
+                    max_hops=30)
+    try:
+        target = socket.gethostbyname(target)
+    except Exception, e:
+        print("could not resolve '%s': %s" % (target, str(e)))
+        sys.exit(1)
+
+    reactor.callWhenRunning(start_trace, target, **settings)
+    reactor.run()
+    return res
+
+print run("google.com")
diff --git a/lib/txtraceroute.py b/lib/txtraceroute.py
new file mode 100644
index 0000000..f74dfb1
--- /dev/null
+++ b/lib/txtraceroute.py
@@ -0,0 +1,314 @@
+#!/usr/bin/env python
+# coding: utf-8
+#
+# Copyright (c) 2012 Alexandre Fiori
+#
+# Permission is hereby granted, free of charge, to any person obtaining
+# a copy of this software and associated documentation files (the
+# "Software"), to deal in the Software without restriction, including
+# without limitation the rights to use, copy, modify, merge, publish,
+# distribute, sublicense, and/or sell copies of the Software, and to
+# permit persons to whom the Software is furnished to do so, subject to
+# the following conditions:
+#
+# The above copyright notice and this permission notice shall be
+# included in all copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
+# LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
+# OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
+# WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
+
+import json
+import operator
+import os
+import socket
+import struct
+import sys
+import time
+
+from twisted.internet import defer
+from twisted.internet import reactor
+from twisted.internet import threads
+from twisted.python import usage
+from twisted.web.client import getPage
+
+
+class iphdr(object):
+    def __init__(self, proto=socket.IPPROTO_ICMP, src="0.0.0.0", dst=None):
+        self.version = 4
+        self.hlen = 5
+        self.tos = 0
+        self.length = 20
+        self.id = os.getpid()
+        self.frag = 0
+        self.ttl = 255
+        self.proto = proto
+        self.cksum = 0
+        self.src = src
+        self.saddr = socket.inet_aton(src)
+        self.dst = dst or "0.0.0.0"
+        self.daddr = socket.inet_aton(self.dst)
+        self.data = ""
+
+    def assemble(self):
+        header = struct.pack('BBHHHBB',
+                             (self.version & 0x0f) << 4 | (self.hlen & 0x0f),
+                             self.tos, self.length + len(self.data),
+                             socket.htons(self.id), self.frag,
+                             self.ttl, self.proto)
+        return header + "\000\000" + self.saddr + self.daddr + self.data
+
+    @classmethod
+    def disassemble(self, data):
+        ip = iphdr()
+        pkt = struct.unpack('!BBHHHBBH', data[:12])
+        ip.version = (pkt[0] >> 4 & 0x0f)
+        ip.hlen = (pkt[0] & 0x0f)
+        ip.tos, ip.length, ip.id, ip.frag, ip.ttl, ip.proto, ip.cksum = pkt[1:]
+        ip.saddr = data[12:16]
+        ip.daddr = data[16:20]
+        ip.src = socket.inet_ntoa(ip.saddr)
+        ip.dst = socket.inet_ntoa(ip.daddr)
+        return ip
+
+    def __repr__(self):
+        return "IP (tos %s, ttl %s, id %s, frag %s, proto %s, length %s) " \
+               "%s -> %s" % \
+               (self.tos, self.ttl, self.id, self.frag, self.proto,
+                self.length, self.src, self.dst)
+
+class icmphdr(object):
+    def __init__(self, data=""):
+        self.type = 8
+        self.code = 0
+        self.cksum = 0
+        self.id = os.getpid()
+        self.sequence = 0
+        self.data = data
+
+    def assemble(self):
+        part1 = struct.pack("BB", self.type, self.code)
+        part2 = struct.pack("!HH", self.id, self.sequence)
+        cksum = self.checksum(part1 + "\000\000" + part2 + self.data)
+        cksum = struct.pack("!H", cksum)
+        return part1 + cksum + part2 + self.data
+
+    @classmethod
+    def checksum(self, data):
+        if len(data) & 1:
+            data += "\0"
+        cksum = reduce(operator.add,
+                       struct.unpack('!%dH' % (len(data) >> 1), data))
+        cksum = (cksum >> 16) + (cksum & 0xffff)
+        cksum += (cksum >> 16)
+        cksum = (cksum & 0xffff) ^ 0xffff
+        return cksum
+
+    @classmethod
+    def disassemble(self, data):
+        icmp = icmphdr()
+        pkt = struct.unpack("!BBHHH", data)
+        icmp.type, icmp.code, icmp.cksum, icmp.id, icmp.sequence = pkt
+        return icmp
+
+    def __repr__(self):
+        return "ICMP (type %s, code %s, id %s, sequence %s)" % \
+               (self.type, self.code, self.id, self.sequence)
+
+
+@defer.inlineCallbacks
+def geoip_lookup(ip):
+    try:
+        r = yield getPage("http://freegeoip.net/json/%s" % ip)
+        d = json.loads(r)
+        items = [d["country_name"], d["region_name"], d["city"]]
+        text = ", ".join([s for s in items if s])
+        defer.returnValue(text.encode("utf-8"))
+    except Exception:
+        defer.returnValue("Unknown location")
+
+
+@defer.inlineCallbacks
+def reverse_lookup(ip):
+    try:
+        r = yield threads.deferToThread(socket.gethostbyaddr, ip)
+        defer.returnValue(r[0])
+    except Exception:
+        defer.returnValue(None)
+
+
+class Hop(object):
+    def __init__(self, target, ttl):
+        self.found = False
+        self.tries = 0
+        self.last_try = 0
+        self.remote_ip = None
+        self.remote_icmp = None
+        self.remote_host = None
+        self.location = ""
+
+        self.ttl = ttl
+        self.ip = iphdr(dst=target)
+        self.ip.ttl = ttl
+        self.ip.id += ttl
+
+        self.icmp = icmphdr("traceroute")
+        self.icmp.id = self.ip.id
+        self.ip.data = self.icmp.assemble()
+
+        self._pkt = self.ip.assemble()
+
+    @property
+    def pkt(self):
+        self.tries += 1
+        self.last_try = time.time()
+        return self._pkt
+
+    def get(self):
+        if self.found:
+            if self.remote_host:
+                ip = self.remote_host
+            else:
+                ip = self.remote_ip.src
+            ping = self.found - self.last_try
+        else:
+            ip = None
+            ping = None
+
+        location = self.location if self.location else None
+        return {'ttl': self.ttl, 'ping': ping, 'ip': ip, 'location': location}
+
+    def __repr__(self):
+        if self.found:
+            if self.remote_host:
+                ip = ":: %s" % self.remote_host
+            else:
+                ip = ":: %s" % self.remote_ip.src
+            ping = "%0.3fs" % (self.found - self.last_try)
+        else:
+            ip = "??"
+            ping = "-"
+
+        location = ":: %s" % self.location if self.location else ""
+        return "%02d. %s %s %s" % (self.ttl, ping, ip, location)
+
+
+class TracerouteProtocol(object):
+    def __init__(self, target, **settings):
+        self.target = target
+        self.settings = settings
+        self.fd = socket.socket(socket.AF_INET, socket.SOCK_RAW,
+                                socket.IPPROTO_ICMP)
+        self.fd.setsockopt(socket.IPPROTO_IP, socket.IP_HDRINCL, 1)
+
+        self.hops = []
+        self.out_queue = []
+        self.waiting = True
+        self.deferred = defer.Deferred()
+
+        reactor.addReader(self)
+        reactor.addWriter(self)
+
+        # send 1st probe packet
+        self.out_queue.append(Hop(self.target, 1))
+
+    def logPrefix(self):
+        return "TracerouteProtocol(%s)" % self.target
+
+    def fileno(self):
+        return self.fd.fileno()
+
+    @defer.inlineCallbacks
+    def hopFound(self, hop, ip, icmp):
+        hop.remote_ip = ip
+        hop.remote_icmp = icmp
+
+        if (ip and icmp):
+            hop.found = time.time()
+            if self.settings.get("geoip_lookup") is True:
+                hop.location = yield geoip_lookup(ip.src)
+
+            if self.settings.get("reverse_lookup") is True:
+                hop.remote_host = yield reverse_lookup(ip.src)
+
+        ttl = hop.ttl + 1
+        last = self.hops[-2:]
+        if len(last) == 2 and last[0].remote_ip == ip or \
+           (ttl > (self.settings.get("max_hops", 30) + 1)):
+            done = True
+        else:
+            done = False
+
+        if not done:
+            cb = self.settings.get("hop_callback")
+            if callable(cb):
+                yield defer.maybeDeferred(cb, hop)
+
+        if not self.waiting:
+            if self.deferred:
+                self.deferred.callback(self.hops)
+                self.deferred = None
+        else:
+            self.out_queue.append(Hop(self.target, ttl))
+
+    def doRead(self):
+        if not self.waiting or not self.hops:
+            return
+
+        pkt = self.fd.recv(4096)
+
+        # disassemble ip header
+        ip = iphdr.disassemble(pkt[:20])
+        if ip.proto != socket.IPPROTO_ICMP:
+            return
+
+        found = False
+
+        # disassemble icmp header
+        icmp = icmphdr.disassemble(pkt[20:28])
+        if icmp.type == 0 and icmp.id == self.hops[-1].icmp.id:
+            found = True
+        elif icmp.type == 11:
+            # disassemble referenced ip header
+            ref = iphdr.disassemble(pkt[28:48])
+            if ref.dst == self.target:
+                found = True
+
+        if ip.src == self.target:
+            self.waiting = False
+
+        if found:
+            self.hopFound(self.hops[-1], ip, icmp)
+
+    def hopTimeout(self, *ign):
+        hop = self.hops[-1]
+        if not hop.found:
+            if hop.tries < self.settings.get("max_tries", 3):
+                # retry
+                self.out_queue.append(hop)
+            else:
+                # give up and move forward
+                self.hopFound(hop, None, None)
+
+    def doWrite(self):
+        if self.waiting and self.out_queue:
+            hop = self.out_queue.pop(0)
+            pkt = hop.pkt
+            if not self.hops or (self.hops and hop.ttl != self.hops[-1].ttl):
+                self.hops.append(hop)
+            self.fd.sendto(pkt, (hop.ip.dst, 0))
+
+            timeout = self.settings.get("timeout", 1)
+            reactor.callLater(timeout, self.hopTimeout)
+
+    def connectionLost(self, why):
+        pass
+
+
+def traceroute(target, **settings):
+    tr = TracerouteProtocol(target, **settings)
+    return tr.deferred
diff --git a/oonicli.py b/oonicli.py
index c2d01d2..68384f8 100755
--- a/oonicli.py
+++ b/oonicli.py
@@ -63,7 +63,7 @@ class StupidAsset(object):
         return self.idx
 
 
-def runTest(test, options):
+def runTest(test, options, global_options):
     asset = None
     if options['asset']:
         print options['asset']
@@ -73,7 +73,10 @@ def runTest(test, options):
     wgen = work.WorkGenerator(asset, plugoo[test].__class__,
             dict(options), start=options['resume'])
 
-    worker = work.Worker()
+    if global_options['parallelism']:
+        wgen.size = int(global_options['parallelism'])
+        worker = work.Worker(wgen.size)
+
     for x in wgen:
         worker.push(x)
 
@@ -92,7 +95,8 @@ class Options(usage.Options):
     ]
 
     optParameters = [
-        ['node', 'n', 'localhost:31415', 'Select target node'],
+        ['parallelism', 'n', 10, "Specify the number of parallel tests to run"],
+        ['target-node', 't', 'localhost:31415', 'Select target node'],
         ['ooninet', 'o', 'localhost:4242', "Select OONI-net address for reporting"],
         ['password', 'p', 'opennetwork', "Specify the password for authentication"],
     ]
@@ -129,8 +133,10 @@ if not config.subCommand:
     sys.exit(1)
 
 if config['local']:
-    runTest(config.subCommand, config.subOptions)
+    runTest(config.subCommand, config.subOptions, config)
 
 else:
-    print "The test will be run on the node %s" % config['node']
+    print "This feature is currently not supported. :("
+    print "Use -l to run the test locally."
+    sys.exit(0)
 





More information about the tor-commits mailing list