commit 8894f057967a779875c8cb5b9c00971408d08fc9 Author: Isis Lovecruft isis@torproject.org Date: Fri Nov 2 17:03:33 2012 +0000
* Moving tests which are not ported to the new API to the top level old-to-be-ported/ directory. --- ooni/example_plugins/examplescapy.py | 49 -------- ooni/example_plugins/skel.py | 29 ----- ooni/hack_this/TO_BE_PORTED | 14 -- ooni/hack_this/dnstamper.py | 200 ------------------------------- ooni/hack_this/tcpscan.py | 84 ------------- ooni/hack_this/traceroute.py | 108 ----------------- ooni/plugins/TESTS_ARE_MOVING.txt | 8 -- ooni/plugins/chinatrigger.py | 140 ---------------------- ooni/plugins/daphn3.py | 152 ------------------------ ooni/plugins/domclass.py | 216 ---------------------------------- ooni/plugins/httpt.py | 94 --------------- ooni/plugins/tcpconnect.py | 65 ---------- 12 files changed, 0 insertions(+), 1159 deletions(-)
diff --git a/ooni/example_plugins/examplescapy.py b/ooni/example_plugins/examplescapy.py deleted file mode 100644 index 21a919d..0000000 --- a/ooni/example_plugins/examplescapy.py +++ /dev/null @@ -1,49 +0,0 @@ -import random -from zope.interface import implements -from twisted.python import usage -from twisted.plugin import IPlugin -from twisted.internet import protocol, defer -from ooni.plugoo.tests import ITest, OONITest -from ooni.plugoo.assets import Asset -from ooni.utils import log -from ooni.protocols.scapyproto import ScapyTest - -from ooni.lib.txscapy import txsr, txsend - -class scapyArgs(usage.Options): - optParameters = [] - -class ExampleScapyTest(ScapyTest): - """ - An example of writing a scapy Test - """ - implements(IPlugin, ITest) - - shortName = "example_scapy" - description = "An example of a scapy test" - requirements = None - options = scapyArgs - blocking = False - - receive = True - pcapfile = 'example_scapy.pcap' - def initialize(self, reactor=None): - if not self.reactor: - from twisted.internet import reactor - self.reactor = reactor - - self.request = {} - self.response = {} - - def build_packets(self): - """ - Override this method to build scapy packets. - """ - from scapy.all import IP, TCP - return IP()/TCP() - - def load_assets(self): - return {} - -examplescapy = ExampleScapyTest(None, None, None) - diff --git a/ooni/example_plugins/skel.py b/ooni/example_plugins/skel.py deleted file mode 100644 index 5f46620..0000000 --- a/ooni/example_plugins/skel.py +++ /dev/null @@ -1,29 +0,0 @@ -from zope.interface import implements -from twisted.python import usage -from twisted.plugin import IPlugin -from plugoo.tests import ITest, TwistedTest -import log - -class SkelArgs(usage.Options): - optParameters = [['asset', 'a', None, 'Asset file'], - ['resume', 'r', 0, 'Resume at this index'], - ['other', 'o', None, 'Other arguments']] - -class SkelTest(OONITest): - implements(IPlugin, ITest) - - shortName = "skeleton" - description = "Skeleton plugin" - requirements = None - options = SkelArgs - blocking = False - - def load_assets(self): - if self.local_options: - return {'asset': open(self.local_options['asset'])} - else: - return {} - -# We need to instantiate it otherwise getPlugins does not detect it -# XXX Find a way to load plugins without instantiating them. -skel = SkelTest(None, None, None) diff --git a/ooni/hack_this/TO_BE_PORTED b/ooni/hack_this/TO_BE_PORTED deleted file mode 100644 index 49ce5e0..0000000 --- a/ooni/hack_this/TO_BE_PORTED +++ /dev/null @@ -1,14 +0,0 @@ - -The tests in this directory are very old, and have neither been ported to -Twisted, nor to the new twisted.trial API framework. Although, they are not -old in the sense of the *seriously old* OONI code which was written two years -ago. - -These tests should be updated at least to use Twisted. - -If you want to hack on something care free, feel free to mess with these files -because it would be difficult to not improve on them. - -<(A)3 -isis -0x2cdb8b35 diff --git a/ooni/hack_this/dnstamper.py b/ooni/hack_this/dnstamper.py deleted file mode 100644 index d6f87a6..0000000 --- a/ooni/hack_this/dnstamper.py +++ /dev/null @@ -1,200 +0,0 @@ -# -*- coding: utf-8 -*- -""" - dnstamper - ********* - - This test resolves DNS for a list of domain names, one per line, in the - file specified in the ooni-config under the setting "dns_experiment". If - the file is top-1m.txt, the test will be run using Amazon's list of top - one million domains. The experimental dns servers to query should - be specified one per line in assets/dns_servers.txt. - - The test reports censorship if the cardinality of the intersection of - the query result set from the control server and the query result set - from the experimental server is zero, which is to say, if the two sets - have no matching results whatsoever. - - NOTE: This test frequently results in false positives due to GeoIP-based - load balancing on major global sites such as google, facebook, and - youtube, etc. - - :copyright: (c) 2012 Arturo Filastò, Isis Lovecruft - :license: see LICENSE for more details - - TODO: - * Switch to using Twisted's DNS builtins instead of dnspython - * -""" - -import os - -from twisted.names import client -from twisted.internet import reactor -from twisted.internet.protocol import Factory, Protocol -from twisted.python import usage -from twisted.plugin import IPlugin -from zope.interface import implements - -from ooni.plugoo.assets import Asset -from ooni.plugoo.tests import ITest, OONITest -from ooni import log - -class Top1MAsset(Asset): - """ - Class for parsing the Alexa top-1m.txt as an asset. - """ - def __init__(self, file=None): - self = Asset.__init__(self, file) - - def parse_line(self, line): - self = Asset.parse_line(self, line) - return line.split(',')[1].replace('\n','') - -class DNSTamperAsset(Asset): - """ - Creates DNS testing specific Assets. - """ - def __init__(self, file=None): - self = Asset.__init__(self, file) - -class DNSTamperArgs(usage.Options): - optParameters = [['asset', 'a', None, 'Asset file of hostnames to resolve'], - ['controlserver', 'c', '8.8.8.8', 'Known good DNS server'], - ['testservers', 't', None, 'Asset file of the DNS servers to test'], - ['resume', 'r', 0, 'Resume at this index in the asset file']] -''' - def control(self, experiment_result, args): - print "Experiment Result:", experiment_result - print "Args", args - return experiment_result - - def experiment(self, args): -''' - -class DNSTamperTest(OONITest): - implements(IPlugin, ITest) - - shortName = "DNSTamper" - description = "DNS censorship detection test" - requirements = None - options = DNSTamperArgs - blocking = False - - def load_assets(self): - if self.local_options: - if self.local_options['asset']: - assetf = self.local_options['asset'] - if assetf == 'top-1m.txt': - return {'asset': Top1MAsset(assetf)} - else: - return {'asset': DNSTamperAsset(assetf)} - else: - return {} - - def lookup(self, hostname, nameserver): - """ - Resolves a hostname through a DNS nameserver to the corresponding - IP addresses. - """ - def got_result(result): - #self.logger.log(result) - print result - reactor.stop() - - def got_failure(failure): - failure.printTraceback() - reactor.stop() - - res = client.createResolver(servers=[(nameserver, 53)]) - d = res.getHostByName(hostname) - d.addCallbacks(got_result, got_failure) - - ## XXX MAY ALSO BE: - #answer = res.getAddress(servers=[('nameserver', 53)]) - - ret = [] - - for data in answer: - ret.append(data.address) - - return ret - - def reverse_lookup(self, ip, nameserver): - """ - Attempt to do a reverse DNS lookup to determine if the control and exp - sets from a positive result resolve to the same domain, in order to - remove false positives due to GeoIP load balancing. - """ - res = client.createResolver(servers=nameserver) - n = reversename.from_address(ip) - revn = res.query(n, "PTR").__iter__().next().to_text()[:-1] - - return revn - - def experiment(self, *a, **kw): - """ - Compares the lookup() sets of the control and experiment groups. - """ - # this is just a dirty hack - address = kw['data'][0] - ns = kw['data'][1] - - config = self.config - ctrl_ns = config.tests.dns_control_server - - print "ADDRESS: %s" % address - print "NAMESERVER: %s" % ns - - exp = self.lookup(address, ns) - control = self.lookup(address, ctrl_ns) - - result = [] - - if len(set(exp) & set(control)) > 0: - print "Address %s has not tampered with on DNS server %s\n" % (address, ns) - result = (address, ns, exp, control, False) - return result - else: - print "Address %s has possibly been tampered on %s:\nDNS resolution through %s yeilds:\n%s\nAlthough the control group DNS servers resolve to:\n%s" % (address, ns, ns, exp, control) - result = (address, ns, exp, control, True) - - if config.tests.dns_reverse_lookup: - - exprevn = [self.reverse_lookup(ip, ns) for ip in exp] - ctrlrevn = [self.reverse_lookup(ip, ctrl_ns) - for ip in control] - - if len(set(exprevn) & set(ctrlrevn)) > 0: - print "Further testing has eliminated this as a false positive." - else: - print "Reverse DNS on the results returned by %s returned:\n%s\nWhich does not match the expected domainname:\n%s\n" % (ns, exprevn, ctrlrevn) - return result - - else: - print "\n" - return result - -#def run(ooni): -# """ -# Run the test. -# """ -# config = ooni.config -# urls = [] -# -# if (config.tests.dns_experiment == "top-1m.txt"): -# dns_experiment = Top1MAsset(os.path.join(config.main.assetdir, -# config.tests.dns_experiment)) -# else: -# dns_experiment = DNSTAsset(os.path.join(config.main.assetdir, -# config.tests.dns_experiment)) -# dns_experiment_dns = DNSTAsset(os.path.join(config.main.assetdir, -# config.tests.dns_experiment_dns)) -# -# assets = [dns_experiment, dns_experiment_dns] -# -# dnstest = DNST(ooni) -# ooni.logger.info("Beginning dnstamper test...") -# dnstest.run(assets, {'index': 1}) -# ooni.logger.info("Dnstamper test completed!") - -dnstamper = DNSTamperTest(None, None, None) diff --git a/ooni/hack_this/tcpscan.py b/ooni/hack_this/tcpscan.py deleted file mode 100644 index b371c88..0000000 --- a/ooni/hack_this/tcpscan.py +++ /dev/null @@ -1,84 +0,0 @@ -""" - TCP Port Scanner - **************** - - Does a TCP connect scan on the IP:port pairs. - -""" -import os -from gevent import socket -from datetime import datetime -import socks - -from plugoo.assets import Asset -from plugoo.tests import Test - -__plugoo__ = "TCP Port Scanner" -__desc__ = "This a test template to be used to build your own tests" - -class TCPScanAsset(Asset): - """ - This is the asset that should be used by the Test. It will - contain all the code responsible for parsing the asset file - and should be passed on instantiation to the test. - """ - def __init__(self, file=None): - self = Asset.__init__(self, file) - - -class TCPScan(Test): - """ - The main Test class - """ - - def experiment(self, *a, **kw): - """ - Fill this up with the tasks that should be performed - on the "dirty" network and should be compared with the - control. - """ - addr = kw['data'] - s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - res = False - try: - self.logger.debug('Doing a connection to %s' % addr) - s.connect((addr.split(':')[0], int(addr.split(':')[1]))) - res = True - except socket.error, msg: - self.logger.debug('Connection failed to %s: %s' % (addr, msg)) - - finally: - s.close() - - return {'Time': datetime.now(), - 'Address': addr, - 'Status': res} - - def control(self): - """ - Fill this up with the control related code. - """ - return True - -def run(ooni, asset=None): - """ - This is the function that will be called by OONI - and it is responsible for instantiating and passing - the arguments to the Test class. - """ - config = ooni.config - - # This the assets array to be passed to the run function of - # the test - if asset: - assets = [TCPScanAsset(asset)] - else: - assets = [TCPScanAsset(os.path.join(config.main.assetdir, \ - "tcpscan.txt"))] - - # Instantiate the Test - thetest = TCPScan(ooni) - ooni.logger.info("starting TCP Scan...") - # Run the test with argument assets - thetest.run(assets) - ooni.logger.info("finished.") diff --git a/ooni/hack_this/traceroute.py b/ooni/hack_this/traceroute.py deleted file mode 100644 index e8252c1..0000000 --- a/ooni/hack_this/traceroute.py +++ /dev/null @@ -1,108 +0,0 @@ -try: - from dns import resolver -except: - print "Error: dnspython is not installed (http://www.dnspython.org/)" -import gevent -import os -import plugoo - -try: - import scapy -except: - print "Error: traceroute plugin requires scapy to be installed (http://www.secdev.org/projects/scapy)" - -from plugoo.assets import Asset -from plugoo.tests import Test - -import socket - -__plugoo__ = "Traceroute" -__desc__ = "Performs TTL walking tests" - -class TracerouteAsset(Asset): - def __init__(self, file=None): - self = Asset.__init__(self, file) - - -class Traceroute(Test): - """A *very* quick and dirty traceroute implementation, UDP and TCP - """ - def traceroute(self, dst, dst_port=3880, src_port=3000, proto="tcp", max_hops=30): - dest_addr = socket.gethostbyname(dst) - print "Doing traceroute on %s" % dst - - recv = socket.getprotobyname('icmp') - send = socket.getprotobyname(proto) - ttl = 1 - while True: - recv_sock = socket.socket(socket.AF_INET, socket.SOCK_RAW, recv) - if proto == "tcp": - send_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, send) - else: - send_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, send) - recv_sock.settimeout(10) - send_sock.settimeout(10) - - send_sock.setsockopt(socket.SOL_IP, socket.IP_TTL, ttl) - recv_sock.bind(("", src_port)) - if proto == "tcp": - try: - send_sock.settimeout(2) - send_sock.connect((dst, dst_port)) - except socket.timeout: - pass - - except Exception, e: - print "Error doing connect %s" % e - else: - send_sock.sendto("", (dst, dst_port)) - - curr_addr = None - try: - print "receiving data..." - _, curr_addr = recv_sock.recvfrom(512) - curr_addr = curr_addr[0] - - except socket.error, e: - print "SOCKET ERROR: %s" % e - - except Exception, e: - print "ERROR: %s" % e - - finally: - send_sock.close() - recv_sock.close() - - if curr_addr is not None: - curr_host = "%s" % curr_addr - else: - curr_host = "*" - - print "%d\t%s" % (ttl, curr_host) - - if curr_addr == dest_addr or ttl > max_hops: - break - - ttl += 1 - - - def experiment(self, *a, **kw): - # this is just a dirty hack - address = kw['data'][0] - - self.traceroute(address) - -def run(ooni): - """Run the test""" - config = ooni.config - urls = [] - - traceroute_experiment = TracerouteAsset(os.path.join(config.main.assetdir, \ - config.tests.traceroute)) - - assets = [traceroute_experiment] - - traceroute = Traceroute(ooni) - ooni.logger.info("starting traceroute test") - traceroute.run(assets) - ooni.logger.info("finished") diff --git a/ooni/plugins/TESTS_ARE_MOVING.txt b/ooni/plugins/TESTS_ARE_MOVING.txt deleted file mode 100644 index f4c0084..0000000 --- a/ooni/plugins/TESTS_ARE_MOVING.txt +++ /dev/null @@ -1,8 +0,0 @@ -7/10/2012 - -All new tests will be moved to the directory /nettests/. - -Tests that are in this directory are either here for historical reasons or have -not yet been properly tested and fully supporting the new API. - -A. diff --git a/ooni/plugins/chinatrigger.py b/ooni/plugins/chinatrigger.py deleted file mode 100644 index cf4bcb3..0000000 --- a/ooni/plugins/chinatrigger.py +++ /dev/null @@ -1,140 +0,0 @@ -import random -import string -import struct -import time - -from zope.interface import implements -from twisted.python import usage -from twisted.plugin import IPlugin -from twisted.internet import protocol, defer -from ooni.plugoo.tests import ITest, OONITest -from ooni.plugoo.assets import Asset -from ooni.utils import log -from ooni.protocols.scapyproto import ScapyTest - -from ooni.lib.txscapy import txsr, txsend - -class scapyArgs(usage.Options): - optParameters = [['dst', 'd', None, 'Specify the target address'], - ['port', 'p', None, 'Specify the target port'], - ['pcap', 'f', None, 'The pcap file to write with the sent and received packets'], - ] - -class ChinaTriggerTest(ScapyTest): - """ - This test is a OONI based implementation of the C tool written - by Philipp Winter to engage chinese probes in active scanning. - - Example of running it: - ./ooni/ooniprobe.py chinatrigger -d 127.0.0.1 -p 8080 -f bla.pcap - """ - implements(IPlugin, ITest) - - shortName = "chinatrigger" - description = "Triggers the chinese probes into scanning" - requirements = ['root'] - options = scapyArgs - blocking = False - - receive = True - pcapfile = 'example_scapy.pcap' - timeout = 5 - - def initialize(self, reactor=None): - if not self.reactor: - from twisted.internet import reactor - self.reactor = reactor - - @staticmethod - def set_random_servername(pkt): - ret = pkt[:121] - for i in range(16): - ret += random.choice(string.ascii_lowercase) - ret += pkt[121+16:] - return ret - - @staticmethod - def set_random_time(pkt): - ret = pkt[:11] - ret += struct.pack('!I', int(time.time())) - ret += pkt[11+4:] - return ret - - @staticmethod - def set_random_field(pkt): - ret = pkt[:15] - for i in range(28): - ret += chr(random.randint(0, 256)) - ret += pkt[15+28:] - return ret - - @staticmethod - def mutate(pkt, idx): - """ - Slightly changed mutate function. - """ - ret = pkt[:idx-1] - mutation = chr(random.randint(0, 256)) - while mutation == pkt[idx]: - mutation = chr(random.randint(0, 256)) - ret += mutation - ret += pkt[idx:] - return ret - - @staticmethod - def set_all_random_fields(pkt): - pkt = ChinaTriggerTest.set_random_servername(pkt) - pkt = ChinaTriggerTest.set_random_time(pkt) - pkt = ChinaTriggerTest.set_random_field(pkt) - return pkt - - def build_packets(self, *args, **kw): - """ - Override this method to build scapy packets. - """ - from scapy.all import IP, TCP - pkt = "\x16\x03\x01\x00\xcc\x01\x00\x00\xc8"\ - "\x03\x01\x4f\x12\xe5\x63\x3f\xef\x7d"\ - "\x20\xb9\x94\xaa\x04\xb0\xc1\xd4\x8c"\ - "\x50\xcd\xe2\xf9\x2f\xa9\xfb\x78\xca"\ - "\x02\xa8\x73\xe7\x0e\xa8\xf9\x00\x00"\ - "\x3a\xc0\x0a\xc0\x14\x00\x39\x00\x38"\ - "\xc0\x0f\xc0\x05\x00\x35\xc0\x07\xc0"\ - "\x09\xc0\x11\xc0\x13\x00\x33\x00\x32"\ - "\xc0\x0c\xc0\x0e\xc0\x02\xc0\x04\x00"\ - "\x04\x00\x05\x00\x2f\xc0\x08\xc0\x12"\ - "\x00\x16\x00\x13\xc0\x0d\xc0\x03\xfe"\ - "\xff\x00\x0a\x00\xff\x01\x00\x00\x65"\ - "\x00\x00\x00\x1d\x00\x1b\x00\x00\x18"\ - "\x77\x77\x77\x2e\x67\x6e\x6c\x69\x67"\ - "\x78\x7a\x70\x79\x76\x6f\x35\x66\x76"\ - "\x6b\x64\x2e\x63\x6f\x6d\x00\x0b\x00"\ - "\x04\x03\x00\x01\x02\x00\x0a\x00\x34"\ - "\x00\x32\x00\x01\x00\x02\x00\x03\x00"\ - "\x04\x00\x05\x00\x06\x00\x07\x00\x08"\ - "\x00\x09\x00\x0a\x00\x0b\x00\x0c\x00"\ - "\x0d\x00\x0e\x00\x0f\x00\x10\x00\x11"\ - "\x00\x12\x00\x13\x00\x14\x00\x15\x00"\ - "\x16\x00\x17\x00\x18\x00\x19\x00\x23"\ - "\x00\x00" - - pkt = ChinaTriggerTest.set_all_random_fields(pkt) - pkts = [IP(dst=self.dst)/TCP(dport=self.port)/pkt] - for x in range(len(pkt)): - mutation = IP(dst=self.dst)/TCP(dport=self.port)/ChinaTriggerTest.mutate(pkt, x) - pkts.append(mutation) - return pkts - - def load_assets(self): - if self.local_options: - self.dst = self.local_options['dst'] - self.port = int(self.local_options['port']) - if self.local_options['pcap']: - self.pcapfile = self.local_options['pcap'] - if not self.port or not self.dst: - pass - - return {} - -#chinatrigger = ChinaTriggerTest(None, None, None) - diff --git a/ooni/plugins/daphn3.py b/ooni/plugins/daphn3.py deleted file mode 100644 index bf4d60d..0000000 --- a/ooni/plugins/daphn3.py +++ /dev/null @@ -1,152 +0,0 @@ -""" -This is a self genrated test created by scaffolding.py. -you will need to fill it up with all your necessities. -Safe hacking :). -""" -from zope.interface import implements -from twisted.python import usage -from twisted.plugin import IPlugin -from twisted.internet import protocol, endpoints - -from ooni.plugoo import reports -from ooni.plugoo.tests import ITest, OONITest -from ooni.plugoo.assets import Asset -from ooni.protocols import daphn3 -from ooni.utils import log - -class Daphn3ClientProtocol(daphn3.Daphn3Protocol): - def connectionMade(self): - self.next_state() - -class Daphn3ClientFactory(protocol.ClientFactory): - protocol = Daphn3ClientProtocol - mutator = None - steps = None - test = None - - def buildProtocol(self, addr): - p = self.protocol() - p.factory = self - p.test = self.test - - if self.steps: - p.steps = self.steps - - if not self.mutator: - self.mutator = daphn3.Mutator(p.steps) - - else: - print "Moving on to next mutation" - self.mutator.next() - - p.mutator = self.mutator - p.current_state = self.mutator.state() - return p - - def clientConnectionFailed(self, reason): - print "We failed connecting the the OONIB" - print "Cannot perform test. Perhaps it got blocked?" - print "Please report this to tor-assistants@torproject.org" - self.test.result['error'] = ('Failed in connecting to OONIB', reason) - self.test.end(d) - - def clientConnectionLost(self, reason): - print "Connection Lost." - -class daphn3Args(usage.Options): - optParameters = [['pcap', 'f', None, - 'PCAP to read for generating the YAML output'], - - ['output', 'o', 'daphn3.yaml', - 'What file should be written'], - - ['yaml', 'y', None, - 'The input file to the test'], - - ['host', 'h', None, 'Target Hostname'], - ['port', 'p', None, 'Target port number'], - ['resume', 'r', 0, 'Resume at this index']] - -class daphn3Test(OONITest): - implements(IPlugin, ITest) - - shortName = "daphn3" - description = "daphn3" - requirements = None - options = daphn3Args - blocking = False - - local_options = None - - steps = None - - def initialize(self): - if not self.local_options: - self.end() - return - - self.factory = Daphn3ClientFactory() - self.factory.test = self - - if self.local_options['pcap']: - self.tool = True - - elif self.local_options['yaml']: - self.steps = daphn3.read_yaml(self.local_options['yaml']) - - else: - log.msg("Not enough inputs specified to the test") - self.end() - - def runTool(self): - import yaml - pcap = daphn3.read_pcap(self.local_options['pcap']) - f = open(self.local_options['output'], 'w') - f.write(yaml.dump(pcap)) - f.close() - - def control(self, exp_res, args): - try: - mutation = self.factory.mutator.get(0) - self.result['censored'] = False - except: - mutation = None - - return {'mutation_number': args['mutation'], - 'value': mutation} - - def _failure(self, *argc, **kw): - self.result['censored'] = True - self.result['error'] = ('Failed in connecting', (argc, kw)) - self.end() - - def experiment(self, args): - log.msg("Doing mutation %s" % args['mutation']) - self.factory.steps = self.steps - host = self.local_options['host'] - port = int(self.local_options['port']) - log.msg("Connecting to %s:%s" % (host, port)) - - if self.ended: - return - - endpoint = endpoints.TCP4ClientEndpoint(self.reactor, host, port) - d = endpoint.connect(self.factory) - d.addErrback(self._failure) - return d - - def load_assets(self): - if not self.local_options: - return {} - if not self.steps: - print "Error: No assets!" - self.end() - return {} - mutations = 0 - for x in self.steps: - mutations += len(x['data']) - return {'mutation': range(mutations)} - -# We need to instantiate it otherwise getPlugins does not detect it -# XXX Find a way to load plugins without instantiating them. -#daphn3test = daphn3Test(None, None, None) diff --git a/ooni/plugins/domclass.py b/ooni/plugins/domclass.py deleted file mode 100644 index 3080c40..0000000 --- a/ooni/plugins/domclass.py +++ /dev/null @@ -1,216 +0,0 @@ -#!/usr/bin/env python -#-*- encoding: utf-8 -*- -# -# domclass -# ******** -# -# :copyright: (c) 2012 by Arturo Filastò -# :license: see LICENSE for more details. -# -# how this works -# -------------- -# -# This classifier uses the DOM structure of a website to determine how similar -# the two sites are. -# The procedure we use is the following: -# * First we parse all the DOM tree of the web page and we build a list of -# TAG parent child relationships (ex. <html><a><b></b></a><c></c></html> => -# (html, a), (a, b), (html, c)). -# -# * We then use this information to build a matrix (M) where m[i][j] = P(of -# transitioning from tag[i] to tag[j]). If tag[i] does not exists P() = 0. -# Note: M is a square matrix that is number_of_tags wide. -# -# * We then calculate the eigenvectors (v_i) and eigenvalues (e) of M. -# -# * The corelation between page A and B is given via this formula: -# correlation = dot_product(e_A, e_B), where e_A and e_B are -# resepectively the eigenvalues for the probability matrix A and the -# probability matrix B. -# - -try: - import numpy -except: - print "Error numpy not installed!" - -import yaml -from zope.interface import implements -from twisted.python import usage -from twisted.plugin import IPlugin -from ooni.plugoo.tests import ITest, OONITest -from ooni.plugoo.assets import Asset -from ooni.utils import log -from ooni.protocols.http import HTTPTest - -class domclassArgs(usage.Options): - optParameters = [['output', 'o', None, 'Output to write'], - ['file', 'f', None, 'Corpus file'], - ['fileb', 'b', None, 'Corpus file'], - ['urls', 'u', None, 'URL List'], - ['resume', 'r', 0, 'Resume at this index']] - -# All HTML4 tags -# XXX add link to W3C page where these came from -alltags = ['A', 'ABBR', 'ACRONYM', 'ADDRESS', 'APPLET', 'AREA', 'B', 'BASE', - 'BASEFONT', 'BD', 'BIG', 'BLOCKQUOTE', 'BODY', 'BR', 'BUTTON', 'CAPTION', - 'CENTER', 'CITE', 'CODE', 'COL', 'COLGROUP', 'DD', 'DEL', 'DFN', 'DIR', 'DIV', - 'DL', 'DT', 'E M', 'FIELDSET', 'FONT', 'FORM', 'FRAME', 'FRAMESET', 'H1', 'H2', - 'H3', 'H4', 'H5', 'H6', 'HEAD', 'HR', 'HTML', 'I', 'IFRAME ', 'IMG', - 'INPUT', 'INS', 'ISINDEX', 'KBD', 'LABEL', 'LEGEND', 'LI', 'LINK', 'MAP', - 'MENU', 'META', 'NOFRAMES', 'NOSCRIPT', 'OBJECT', 'OL', 'OPTGROUP', 'OPTION', - 'P', 'PARAM', 'PRE', 'Q', 'S', 'SAMP', 'SCRIPT', 'SELECT', 'SMALL', 'SPAN', - 'STRIKE', 'STRONG', 'STYLE', 'SUB', 'SUP', 'TABLE', 'TBODY', 'TD', - 'TEXTAREA', 'TFOOT', 'TH', 'THEAD', 'TITLE', 'TR', 'TT', 'U', 'UL', 'VAR'] - -# Reduced subset of only the most common tags -commontags = ['A', 'B', 'BLOCKQUOTE', 'BODY', 'BR', 'BUTTON', 'CAPTION', - 'CENTER', 'CITE', 'CODE', 'COL', 'DD', 'DIV', - 'DL', 'DT', 'EM', 'FIELDSET', 'FONT', 'FORM', 'FRAME', 'FRAMESET', 'H1', 'H2', - 'H3', 'H4', 'H5', 'H6', 'HEAD', 'HR', 'HTML', 'IFRAME ', 'IMG', - 'INPUT', 'INS', 'LABEL', 'LEGEND', 'LI', 'LINK', 'MAP', - 'MENU', 'META', 'NOFRAMES', 'NOSCRIPT', 'OBJECT', 'OL', 'OPTION', - 'P', 'PRE', 'SCRIPT', 'SELECT', 'SMALL', 'SPAN', - 'STRIKE', 'STRONG', 'STYLE', 'SUB', 'SUP', 'TABLE', 'TBODY', 'TD', - 'TEXTAREA', 'TFOOT', 'TH', 'THEAD', 'TITLE', 'TR', 'TT', 'U', 'UL'] - -# The tags we are intested in using for our analysis -thetags = ['A', 'DIV', 'FRAME', 'H1', 'H2', - 'H3', 'H4', 'IFRAME ', 'INPUT', - 'LABEL','LI', 'P', 'SCRIPT', 'SPAN', - 'STYLE', 'TR'] - -def compute_probability_matrix(dataset): - """ - Compute the probability matrix based on the input dataset. - - :dataset: an array of pairs representing the parent child relationships. - """ - import itertools - ret = {} - matrix = numpy.zeros((len(thetags) + 1, len(thetags) + 1)) - - for data in dataset: - x = data[0].upper() - y = data[1].upper() - try: - x = thetags.index(x) - except: - x = len(thetags) - - try: - y = thetags.index(y) - except: - y = len(thetags) - - matrix[x,y] += 1 - - for x in xrange(len(thetags) + 1): - possibilities = 0 - for y in matrix[x]: - possibilities += y - - for i in xrange(len(matrix[x])): - if possibilities != 0: - matrix[x][i] = matrix[x][i]/possibilities - - return matrix - -def compute_eigenvalues(matrix): - """ - Returns the eigenvalues of the supplied square matrix. - - :matrix: must be a square matrix and diagonalizable. - """ - return numpy.linalg.eigvals(matrix) - -def readDOM(content=None, filename=None): - """ - Parses the DOM of the HTML page and returns an array of parent, child - pairs. - - :content: the content of the HTML page to be read. - - :filename: the filename to be read from for getting the content of the - page. - """ - from bs4 import BeautifulSoup - - if filename: - f = open(filename) - content = ''.join(f.readlines()) - f.close() - - dom = BeautifulSoup(content) - couples = [] - for x in dom.findAll(): - couples.append((str(x.parent.name), str(x.name))) - - return couples - -class domclassTest(HTTPTest): - implements(IPlugin, ITest) - - shortName = "domclass" - description = "domclass" - requirements = None - options = domclassArgs - blocking = False - - follow_redirects = True - #tool = True - - def runTool(self): - site_a = readDOM(filename=self.local_options['file']) - site_b = readDOM(filename=self.local_options['fileb']) - a = {} - a['matrix'] = compute_probability_matrix(site_a) - a['eigen'] = compute_eigenvalues(a['matrix']) - - self.result['eigenvalues'] = a['eigen'] - b = {} - b['matrix'] = compute_probability_matrix(site_b) - b['eigen'] = compute_eigenvalues(b['matrix']) - - #print "A: %s" % a - #print "B: %s" % b - correlation = numpy.vdot(a['eigen'],b['eigen']) - correlation /= numpy.linalg.norm(a['eigen'])*numpy.linalg.norm(b['eigen']) - correlation = (correlation + 1)/2 - print "Corelation: %s" % correlation - self.end() - return a - - def processResponseBody(self, data): - site_a = readDOM(data) - #site_b = readDOM(self.local_options['fileb']) - a = {} - a['matrix'] = compute_probability_matrix(site_a) - a['eigen'] = compute_eigenvalues(a['matrix']) - - - if len(data) == 0: - self.result['eigenvalues'] = None - self.result['matrix'] = None - else: - self.result['eigenvalues'] = a['eigen'] - #self.result['matrix'] = a['matrix'] - #self.result['content'] = data[:200] - #b = compute_matrix(site_b) - print "A: %s" % a - return a['eigen'] - - def load_assets(self): - if self.local_options: - if self.local_options['file']: - self.tool = True - return {} - elif self.local_options['urls']: - return {'url': Asset(self.local_options['urls'])} - else: - self.end() - return {} - else: - return {} - -#domclass = domclassTest(None, None, None) diff --git a/ooni/plugins/httpt.py b/ooni/plugins/httpt.py deleted file mode 100644 index 358f1ea..0000000 --- a/ooni/plugins/httpt.py +++ /dev/null @@ -1,94 +0,0 @@ -""" -This is a self genrated test created by scaffolding.py. -you will need to fill it up with all your necessities. -Safe hacking :). -""" -from zope.interface import implements -from twisted.python import usage -from twisted.plugin import IPlugin -from ooni.plugoo.tests import ITest, OONITest -from ooni.plugoo.assets import Asset -from ooni.protocols import http -from ooni.utils import log - -class httptArgs(usage.Options): - optParameters = [['urls', 'f', None, 'Urls file'], - ['url', 'u', 'http://torproject.org/', 'Test single site'], - ['resume', 'r', 0, 'Resume at this index'], - ['rules', 'y', None, 'Specify the redirect rules file']] - -class httptTest(http.HTTPTest): - implements(IPlugin, ITest) - - shortName = "httpt" - description = "httpt" - requirements = None - options = httptArgs - blocking = False - - - def testPattern(self, value, pattern, type): - if type == 'eq': - return value == pattern - elif type == 're': - import re - if re.match(pattern, value): - return True - else: - return False - else: - return None - - def testPatterns(self, patterns, location): - test_result = False - - if type(patterns) == list: - for pattern in patterns: - test_result |= self.testPattern(location, pattern['value'], pattern['type']) - else: - test_result |= self.testPattern(location, patterns['value'], patterns['type']) - - return test_result - - def testRules(self, rules, location): - result = {} - blocked = False - for rule, value in rules.items(): - current_rule = {} - current_rule['name'] = value['name'] - current_rule['patterns'] = value['patterns'] - current_rule['test'] = self.testPatterns(value['patterns'], location) - blocked |= current_rule['test'] - result[rule] = current_rule - result['blocked'] = blocked - return result - - def processRedirect(self, location): - self.result['redirect'] = None - try: - rules_file = self.local_options['rules'] - import yaml - rules = yaml.load(open(rules_file)) - log.msg("Testing rules %s" % rules) - redirect = self.testRules(rules, location) - self.result['redirect'] = redirect - except TypeError: - log.msg("No rules file. Got a redirect, but nothing to do.") - - - def control(self, experiment_result, args): - print self.response - print self.request - # What you return here ends up inside of the report. - log.msg("Running control") - return {} - - def load_assets(self): - if self.local_options and self.local_options['urls']: - return {'url': Asset(self.local_options['urls'])} - else: - return {} - -# We need to instantiate it otherwise getPlugins does not detect it -# XXX Find a way to load plugins without instantiating them. -#httpt = httptTest(None, None, None) diff --git a/ooni/plugins/tcpconnect.py b/ooni/plugins/tcpconnect.py deleted file mode 100644 index 7758a9e..0000000 --- a/ooni/plugins/tcpconnect.py +++ /dev/null @@ -1,65 +0,0 @@ -""" -This is a self genrated test created by scaffolding.py. -you will need to fill it up with all your necessities. -Safe hacking :). -""" -from zope.interface import implements -from twisted.python import usage -from twisted.plugin import IPlugin -from twisted.internet.protocol import Factory, Protocol -from twisted.internet.endpoints import TCP4ClientEndpoint - -from ooni.plugoo.interface import ITest -from ooni.plugoo.tests import OONITest -from ooni.plugoo.assets import Asset -from ooni.utils import log - -class tcpconnectArgs(usage.Options): - optParameters = [['asset', 'a', None, 'File containing IP:PORT combinations, one per line.'], - ['resume', 'r', 0, 'Resume at this index']] - -class tcpconnectTest(OONITest): - implements(IPlugin, ITest) - - shortName = "tcpconnect" - description = "tcpconnect" - requirements = None - options = tcpconnectArgs - blocking = False - - def experiment(self, args): - try: - host, port = args['asset'].split(':') - except: - raise Exception("Error in parsing asset. Wrong format?") - class DummyFactory(Factory): - def buildProtocol(self, addr): - return Protocol() - - def gotProtocol(p): - p.transport.loseConnection() - log.msg("Got a connection!") - log.msg(str(p)) - return {'result': True, 'target': [host, port]} - - def gotError(err): - log.msg("Had error :(") - log.msg(err) - return {'result': False, 'target': [host, port]} - - # What you return here gets handed as input to control - point = TCP4ClientEndpoint(self.reactor, host, int(port)) - d = point.connect(DummyFactory()) - d.addCallback(gotProtocol) - d.addErrback(gotError) - return d - - def load_assets(self): - if self.local_options: - return {'asset': Asset(self.local_options['asset'])} - else: - return {} - -# We need to instantiate it otherwise getPlugins does not detect it -# XXX Find a way to load plugins without instantiating them. -#tcpconnect = tcpconnectTest(None, None, None)
tor-commits@lists.torproject.org