commit f3fefed7644f311b94fcd28cd57dda3e7c1eff1c
Author: christian <christian@avtok.com>
Date: Tue Aug 30 14:11:46 2011 -0400
Removing deprecated HTMLTest
---
NetworkScanners/ExitAuthority/soat.py | 394 +--------------------------------
 1 file changed, 3 insertions(+), 391 deletions(-)
diff --git a/NetworkScanners/ExitAuthority/soat.py b/NetworkScanners/ExitAuthority/soat.py
index 4633dad..5bcf590 100755
--- a/NetworkScanners/ExitAuthority/soat.py
+++ b/NetworkScanners/ExitAuthority/soat.py
@@ -1464,354 +1464,6 @@ def is_script_mimetype(mime_type):
break
return is_script
-class BaseHTMLTest(BaseHTTPTest):
- def __init__(self):
- BaseHTTPTest.__init__(self)
- self.save_name = "HTMLTest"
- self.proto = "HTML" #CA .. ?
-
- def depickle_upgrade(self):
- if self._pickle_revision < 7:
- self.httpcode_fails_per_exit = {}
- Test.depickle_upgrade(self)
-
- def add_target(self, target):
-    """Avoid BaseHTTP.add_target, which keys entries"""
- Test.add_target(self, target)
-
- def run_test(self):
- # A single test should have a single cookie jar
- self.tor_cookie_jar = cookielib.MozillaCookieJar()
- self.cookie_jar = cookielib.MozillaCookieJar()
- self.headers = copy.copy(firefox_headers)
-
- use_referers = False
- first_referer = None
- if random.randint(1,100) < referer_chance_pct:
- use_referers = True
- # FIXME: Hrmm.. May want to do this a bit better..
- first_referer = random.choice(self.targets)
- plog("INFO", "Chose random referer "+first_referer)
-
- self.tests_run += 1
- address = random.choice(self.targets)
-
- # Keep a trail log for this test and check for loops
- fetched = set([])
-
- self.fetch_queue.append(("html", address, first_referer))
- n_success = n_fail = n_inconclusive = 0
- while self.fetch_queue:
- (test, url, referer) = self.fetch_queue.pop(0)
- if url in fetched:
- plog("INFO", "Already fetched "+url+", skipping")
- continue
- fetched.add(url)
- if use_referers and referer:
- self.headers.append(('Referer', referer))
- # Technically both html and js tests check and dispatch via mime types
- # but I want to know when link tags lie
- if test == "html" or test == "http":
- result = self.check_html(url)
- elif test == "js":
- result = self.check_js(url)
- elif test == "image":
- accept_hdr = filter(lambda h: h[0] == "Accept", self.headers)[0]
- orig_accept = accept_hdr[1]
- accept_hdr[1] = image_accept_hdr
- result = self.check_http(url)
- accept_hdr[1] = orig_accept
- else:
- plog("WARN", "Unknown test type: "+test+" for "+url)
- result = TEST_SUCCESS
- if result == TEST_INCONCLUSIVE:
- n_inconclusive += 1
- if result == TEST_FAILURE:
- n_fail += 1
- if result == TEST_SUCCESS:
- n_success += 1
-
- # Need to clear because the cookiejars use locks...
- self.tor_cookie_jar = None
- self.cookie_jar = None
-
- if n_fail:
- return TEST_FAILURE
- elif 2*n_inconclusive > n_success: # > 33% inconclusive -> redo
- return TEST_INCONCLUSIVE
- else:
- return TEST_SUCCESS
-
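The redo rule above is worth a gloss: 2*n_inconclusive > n_success is algebraically the same as "more than a third of the non-failing fetches were inconclusive", since 2i > s is equivalent to i/(s+i) > 1/3. A minimal standalone sketch of the verdict logic (names are illustrative, not from soat.py):

    def verdict(n_success, n_fail, n_inconclusive):
        # Any hard failure marks the whole run as failed.
        if n_fail:
            return "FAILURE"
        # 2*i > s  <=>  i/(s+i) > 1/3: redo when over a third were inconclusive.
        if 2 * n_inconclusive > n_success:
            return "INCONCLUSIVE"
        return "SUCCESS"

    assert verdict(2, 0, 1) == "SUCCESS"        # exactly one third: kept
    assert verdict(2, 0, 2) == "INCONCLUSIVE"   # one half inconclusive: redo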
- def _add_recursive_targets(self, soup, orig_addr):
- # Only pull at most one filetype from the list of 'a' links
- targets = []
- got_type = {}
- found_favicon = False
- # Hrmm, if we recursively strained only these tags, this might be faster
- for tag in tags_to_recurse:
- tags = soup.findAll(tag)
- for t in tags:
- #plog("DEBUG", "Got tag: "+str(t))
- for a in t.attrs:
- attr_name = a[0]
- attr_tgt = a[1]
- if attr_name in attrs_to_recurse:
- if t.name in recurse_html:
- targets.append(("html", urlparse.urljoin(orig_addr, attr_tgt)))
- elif t.name in recurse_script:
- if t.name == "link":
- for a in t.attrs:
- a = map(lambda x: x.lower(), a)
- # Special case CSS and favicons
- if (a[0] == "type" and a[1] == "text/css") or \
- ((a[0] == "rel" or a[0] == "rev") and a[1] == "stylesheet"):
- plog("INFO", "Adding CSS of: "+str(t))
- targets.append(("http", urlparse.urljoin(orig_addr, attr_tgt)))
- elif (a[0] == "rel" or a[0] == "rev") and \
- ("shortcut" in a[1] or "icon" in a[1]):
- plog("INFO", "Adding favicon of: "+str(t))
- found_favicon = True
- targets.append(("image", urlparse.urljoin(orig_addr, attr_tgt)))
- elif a[0] == "type" and is_script_mimetype(a[1]):
- plog("INFO", "Adding link script of: "+str(t))
- targets.append(("js", urlparse.urljoin(orig_addr, attr_tgt)))
- else:
- plog("INFO", "Adding script tag of: "+str(t))
- targets.append(("js", urlparse.urljoin(orig_addr, attr_tgt)))
- elif t.name in recurse_image:
- plog("INFO", "Adding image tag of: "+str(t))
- targets.append(("image", urlparse.urljoin(orig_addr, attr_tgt)))
- elif t.name == 'a':
- if attr_name == "href":
- for f in self.scan_filetypes:
- if f not in got_type and attr_tgt[-len(f):] == f:
- got_type[f] = 1
- targets.append(("http", urlparse.urljoin(orig_addr, attr_tgt)))
- else:
- targets.append(("http", urlparse.urljoin(orig_addr, attr_tgt)))
-
- if not found_favicon:
- targets.insert(0, ("image", urlparse.urljoin(orig_addr, "/favicon.ico")))
-
- loaded = set([])
-
- for i in targets:
- if i[1] in loaded:
- continue
- loaded.add(i[1])
- if self._is_useable_url(i[1], html_schemes):
- plog("NOTICE", "Adding "+i[0]+" target: "+i[1])
- self.fetch_queue.append((i[0], i[1], orig_addr))
- else:
- plog("NOTICE", "Skipping "+i[0]+" target: "+i[1])
-
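_add_recursive_targets resolves each discovered reference against the page that produced it, then deduplicates while preserving discovery order. A standalone sketch of that resolve-then-dedup pattern (URLs are illustrative; Python 3 moved urlparse.urljoin to urllib.parse):

    from urllib.parse import urljoin

    page = "http://example.com/a/index.html"
    raw = ["style.css", "/favicon.ico", "style.css", "http://other.example/x.js"]

    seen = set()
    resolved = []
    for target in raw:
        absolute = urljoin(page, target)   # relative refs resolve against the page
        if absolute in seen:               # keep only the first occurrence
            continue
        seen.add(absolute)
        resolved.append(absolute)

    # ['http://example.com/a/style.css', 'http://example.com/favicon.ico',
    #  'http://other.example/x.js']
    print(resolved)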
- def check_js(self, address):
- plog('INFO', 'Conducting a js test with destination ' + address)
-
- accept_hdr = filter(lambda h: h[0] == "Accept", self.headers)[0]
- orig_accept = accept_hdr[1]
- accept_hdr[1] = script_accept_hdr
- ret = self.check_http_nodynamic(address)
- accept_hdr[1] = orig_accept
-
- if type(ret) == int:
- return ret
- return self._check_js_worker(address, ret)
-
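check_js, like the image branch of run_test, swaps the Accept header's value in place around a fetch; self.headers holds mutable name/value pairs, so writing the saved value back is enough to undo the swap. A sketch of the pattern (header values are illustrative, and the try/finally is an addition the original code does not have):

    headers = [["User-Agent", "Mozilla/5.0"], ["Accept", "text/html,*/*"]]
    script_accept_hdr = "text/javascript,*/*"

    accept = next(h for h in headers if h[0] == "Accept")
    orig_accept = accept[1]
    accept[1] = script_accept_hdr    # the fetch would see this value
    try:
        pass                         # fetch happens here
    finally:
        accept[1] = orig_accept      # restore even if the fetch raises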
- def _check_js_worker(self, address, http_ret):
- (mime_type, tor_js, tsha, orig_js, osha, new_js, nsha, exit_node) = http_ret
-
-    if not is_script_mimetype(mime_type):
-      plog("WARN", "Non-script mime type "+mime_type+" fed to JS test for "+address)
-
-      if is_html_mimetype(mime_type):
-        return self._check_html_worker(address, http_ret)
-      else:
-        return self._check_http_worker(address, http_ret)
-
- address_file = DataHandler.safeFilename(address.replace('http://',''))
- content_prefix = http_content_dir+address_file
- failed_prefix = http_failed_dir+address_file
-
- if os.path.exists(content_prefix+".jsdiff"):
- plog("DEBUG", "Loading jsdiff for "+address)
- jsdiff = SnakePickler.load(content_prefix+".jsdiff")
- else:
- plog("DEBUG", "No jsdiff for "+address+". Creating+dumping")
- jsdiff = JSDiffer(orig_js)
-
- jsdiff.prune_differences(new_js)
- SnakePickler.dump(jsdiff, content_prefix+".jsdiff")
-
- has_js_changes = jsdiff.contains_differences(tor_js)
-
- if not has_js_changes:
- result = JsTestResult(self.node_map[exit_node[1:]],
- address, TEST_SUCCESS)
- self.register_success(result)
- return TEST_SUCCESS
- else:
- exit_content_file = open(DataHandler.uniqueFilename(failed_prefix+'.'+exit_node[1:]+'.dyn-content'), 'w')
- exit_content_file.write(tor_js)
- exit_content_file.close()
-
- result = JsTestResult(self.node_map[exit_node[1:]],
- address, TEST_FAILURE, FAILURE_DYNAMIC,
- content_prefix+".content", exit_content_file.name,
- content_prefix+'.content-old',
- content_prefix+".jsdiff")
- self.register_dynamic_failure(result)
- return TEST_FAILURE
-
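The jsdiff bookkeeping above amounts to a three-way comparison: whatever differs between the original fetch and a later non-Tor fetch is pruned as legitimate dynamism, and only changes in the Tor copy beyond that baseline count against the exit. A toy differ showing the idea (not the real JSDiffer, whose internals this patch does not show):

    class ToyDiffer:
        """Track which lines of a document are allowed to vary."""
        def __init__(self, original):
            self.baseline = original.splitlines()
            self.dynamic = set()            # line indexes known to change

        def prune_differences(self, fresh):
            for i, (a, b) in enumerate(zip(self.baseline, fresh.splitlines())):
                if a != b:
                    self.dynamic.add(i)     # changes on its own: not suspicious

        def contains_differences(self, tor_copy):
            for i, (a, b) in enumerate(zip(self.baseline, tor_copy.splitlines())):
                if a != b and i not in self.dynamic:
                    return True             # changed, and not known-dynamic
            return False

    d = ToyDiffer("a\nb\nc")
    d.prune_differences("a\nB\nc")                 # line 1 is legitimately dynamic
    assert not d.contains_differences("a\nBB\nc")  # only the dynamic line moved
    assert d.contains_differences("a\nB\nC")       # a static line changed too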
- def check_html(self, address):
- plog('INFO', 'Conducting an html test with destination ' + address)
- ret = self.check_http_nodynamic(address)
-
- if type(ret) == int:
- return ret
-
- return self._check_html_worker(address, ret)
-
- def _check_html_worker(self, address, http_ret):
- (mime_type,tor_html,tsha,orig_html,osha,new_html,nsha,exit_node)=http_ret
-
-    if not is_html_mimetype(mime_type):
-      # XXX: Keep an eye on this logline.
-      plog("WARN", "Non-html mime type "+mime_type+" fed to HTML test for "+address)
-      if is_script_mimetype(mime_type):
-        return self._check_js_worker(address, http_ret)
-      else:
-        return self._check_http_worker(address, http_ret)
-
- # an address representation acceptable for a filename
- address_file = DataHandler.safeFilename(address.replace('http://',''))
- content_prefix = http_content_dir+address_file
- failed_prefix = http_failed_dir+address_file
-
- orig_soup = FullyStrainedSoup(orig_html.decode('ascii', 'ignore'))
- tor_soup = FullyStrainedSoup(tor_html.decode('ascii', 'ignore'))
-
- # Also find recursive urls
- recurse_elements = SoupStrainer(lambda name, attrs:
- name in tags_to_recurse and
- len(set(map(lambda a: a[0], attrs)).intersection(set(attrs_to_recurse))) > 0)
- self._add_recursive_targets(TheChosenSoup(tor_html.decode('ascii',
- 'ignore'), recurse_elements), address)
-
- # compare the content
- # if content matches, everything is ok
- if str(orig_soup) == str(tor_soup):
- plog("INFO", "Successful soup comparison after SHA1 fail for "+address+" via "+exit_node)
- result = HtmlTestResult(self.node_map[exit_node[1:]],
- address, TEST_SUCCESS)
- self.register_success(result)
-
- return TEST_SUCCESS
-
- content_new = new_html.decode('ascii', 'ignore')
- if not content_new:
-      plog("WARN", "Failed to re-fetch "+address+" outside of Tor. Did our network fail?")
- result = HtmlTestResult(self.node_map[exit_node[1:]],
- address, TEST_INCONCLUSIVE,
- INCONCLUSIVE_NOLOCALCONTENT)
- if self.rescan_nodes:
- result.from_rescan = True
- self.results.append(result)
- datahandler.saveResult(result)
- return TEST_INCONCLUSIVE
-
- new_soup = FullyStrainedSoup(content_new)
-
- # compare the new and old content
- # if they match, means the node has been changing the content
- if str(orig_soup) == str(new_soup):
- exit_content_file = open(DataHandler.uniqueFilename(failed_prefix+'.'+exit_node[1:]+'.content'), 'w')
- exit_content_file.write(tor_html)
- exit_content_file.close()
-
- result = HtmlTestResult(self.node_map[exit_node[1:]],
- address, TEST_FAILURE, FAILURE_EXITONLY,
- content_prefix+".content", exit_content_file.name)
- self.register_exit_failure(result)
- return TEST_FAILURE
-
-    # Let's try getting just the tag differences
- # 1. Take difference between old and new tags both ways
- # 2. Make map of tags that change to their attributes
- # 3. Compare list of changed tags for tor vs new and
- # see if any extra tags changed or if new attributes
- # were added to additional tags
- if os.path.exists(content_prefix+".soupdiff"):
- plog("DEBUG", "Loading soupdiff for "+address)
- soupdiff = SnakePickler.load(content_prefix+".soupdiff")
- soupdiff.prune_differences(new_soup)
- else:
- plog("DEBUG", "No soupdiff for "+address+". Creating+dumping")
- soupdiff = SoupDiffer(orig_soup, new_soup)
-
- SnakePickler.dump(soupdiff, content_prefix+".soupdiff")
-
- more_tags = soupdiff.show_changed_tags(tor_soup)
- more_attrs = soupdiff.show_changed_attrs(tor_soup)
- more_content = soupdiff.show_changed_content(tor_soup)
-
- # Verify all of our changed tags are present here
- if more_tags or more_attrs or (more_content and not soupdiff.content_changed):
- false_positive = False
- plog("NOTICE", "SoupDiffer finds differences for "+address)
- plog("NOTICE", "New Tags:\n"+more_tags)
- plog("NOTICE", "New Attrs:\n"+more_attrs)
- if more_content and not soupdiff.content_changed:
- plog("NOTICE", "New Content:\n"+more_content)
- else:
- plog("INFO", "SoupDiffer predicts false_positive")
- false_positive = True
-
- if false_positive:
- if os.path.exists(content_prefix+".jsdiff"):
- plog("DEBUG", "Loading jsdiff for "+address)
- jsdiff = SnakePickler.load(content_prefix+".jsdiff")
- else:
- plog("DEBUG", "No jsdiff for "+address+". Creating+dumping")
- jsdiff = JSSoupDiffer(orig_soup)
-
- jsdiff.prune_differences(new_soup)
- SnakePickler.dump(jsdiff, content_prefix+".jsdiff")
-
- differences = jsdiff.show_differences(tor_soup)
- false_positive = not differences
- plog("INFO", "JSSoupDiffer predicts false_positive="+str(false_positive))
- if not false_positive:
- plog("NOTICE", "JSSoupDiffer finds differences: "+differences)
-
- if false_positive:
- plog("NOTICE", "False positive detected for dynamic change at "+address+" via "+exit_node)
- result = HtmlTestResult(self.node_map[exit_node[1:]],
- address, TEST_SUCCESS)
- self.register_success(result)
- return TEST_SUCCESS
-
- exit_content_file = open(DataHandler.uniqueFilename(failed_prefix+'.'+exit_node[1:]+'.dyn-content'),'w')
- exit_content_file.write(tor_html)
- exit_content_file.close()
-
- if os.path.exists(content_prefix+".jsdiff"):
- jsdiff_file = content_prefix+".jsdiff"
- else:
- jsdiff_file = None
- if os.path.exists(content_prefix+".soupdiff"):
- soupdiff_file = content_prefix+".soupdiff"
- else:
- soupdiff_file = None
-
- result = HtmlTestResult(self.node_map[exit_node[1:]],
- address, TEST_FAILURE, FAILURE_DYNAMIC,
- content_prefix+".content", exit_content_file.name,
- content_prefix+'.content-old',
- soupdiff_file, jsdiff_file)
- self.register_dynamic_failure(result)
- return TEST_FAILURE
-
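Taken together, _check_html_worker is a cascade of increasingly forgiving comparisons: exact soup equality, then an exit-only check (original and fresh copies agree but the Tor copy does not), then SoupDiffer and JSSoupDiffer to excuse observed site dynamism. A condensed sketch of that control flow (is_known_dynamic is a stand-in for the two differs):

    def classify(orig, new, tor, is_known_dynamic):
        if orig == tor:
            return "SUCCESS"            # Tor copy matches the first fetch
        if not new:
            return "INCONCLUSIVE"       # could not re-fetch outside Tor
        if orig == new:
            return "FAILURE_EXITONLY"   # site is static; only the Tor copy differs
        if is_known_dynamic(tor):
            return "SUCCESS"            # differences match the dynamic baseline
        return "FAILURE_DYNAMIC"        # differs beyond known dynamism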
class BaseSSLTest(Test):
def __init__(self):
Test.__init__(self, "SSL", 443)
@@ -2041,15 +1693,6 @@ class FixedTargetHTTPTest(FixedTargetTest, BaseHTTPTest):
def add_target(self, target):
self.targets.add(target[0],[target[1]])
-class FixedTargetHTMLTest(FixedTargetTest, BaseHTMLTest):
- def __init__(self, targets):
- BaseHTMLTest.__init__(self)
- utargets = [t for t in targets if self._is_useable_url(t, ['http'])]
- FixedTargetTest.__init__(self, utargets)
- def _add_recursive_targets(self, soup, orig_addr):
- # Don't recurse for FixedTarget tests
- pass
-
class FixedTargetSSLTest(FixedTargetTest, BaseSSLTest):
def __init__(self, targets):
BaseSSLTest.__init__(self)
@@ -2236,25 +1879,6 @@ class SearchBasedHTTPTest(SearchBasedTest, BaseHTTPTest):
HTTPTest = SearchBasedHTTPTest # For resuming from old HTTPTest.*.test files
-class SearchBasedHTMLTest(SearchBasedTest, BaseHTMLTest):
- def __init__(self, wordlist):
- BaseHTMLTest.__init__(self)
- SearchBasedTest.__init__(self, wordlist)
- self.result_filetypes = ["any"]
-
- def depickle_upgrade(self):
- if self._pickle_revision < 7:
- self.result_filetypes = ["any"]
- self.result_protocol = "http"
- self.results_per_type = self.fetch_targets
- BaseHTMLTest.depickle_upgrade(self)
-
- def rewind(self):
- SearchBasedTest.rewind(self)
- BaseHTMLTest.rewind(self)
-
-HTMLTest = SearchBasedHTMLTest # For resuming from old HTMLTest.*.test files
-
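The HTMLTest alias deleted here existed for pickle's sake: resume files record a class by module and attribute name, so loading an old HTMLTest.*.test file needs the module to keep exporting something called HTMLTest. A toy demonstration of the mechanism, with a hand-written protocol-0 pickle standing in for an old resume file:

    import pickle

    class SearchBasedHTMLTest(object):
        pass

    HTMLTest = SearchBasedHTMLTest   # alias kept so old pickles still resolve

    # "c<module>\n<name>\n" makes pickle look the name up at load time;
    # ")R." then calls it with no arguments, yielding an instance.
    old_blob = b"c__main__\nHTMLTest\n)R."
    obj = pickle.loads(old_blob)
    assert isinstance(obj, SearchBasedHTMLTest)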
class SearchBasedSSLTest(SearchBasedTest, BaseSSLTest):
def __init__(self, wordlist):
BaseSSLTest.__init__(self)
@@ -2991,7 +2615,6 @@ def main(argv):
print '--rescan=<n>'
print '--ssl'
print '--http'
- print '--html'
# print '--ssh (doesn\'t work yet)'
# print '--smtp (~works)'
# print '--pop (~works)'
@@ -3006,7 +2629,7 @@ def main(argv):
TorUtil.read_config(data_dir+"/torctl.cfg")
- opts = ['ssl','rescan', 'pernode=', 'resume=', 'html','http','ssh','smtp','pop','imap','dns','dnsrebind','policies','exit=','target=','loglevel=']
+ opts = ['ssl','rescan', 'pernode=', 'resume=','http','ssh','smtp','pop','imap','dns','dnsrebind','policies','exit=','target=','loglevel=']
flags, trailer = getopt.getopt(argv[1:], [], opts)
# get specific test types
@@ -3014,7 +2637,6 @@ def main(argv):
do_rescan = ('--rescan','') in flags
do_ssl = ('--ssl','') in flags
do_http = ('--http','') in flags
- do_html = ('--html','') in flags
#do_ssh = ('--ssh','') in flags
#do_smtp = ('--smtp','') in flags
#do_pop = ('--pop','') in flags
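The do_* flags work because getopt returns options as (name, value) pairs, so a valueless long option can be tested with plain tuple membership. A sketch (argument vector and option list are illustrative):

    import getopt

    opts = ['ssl', 'rescan', 'pernode=']
    flags, trailer = getopt.getopt(['--ssl', '--pernode=5', 'extra'], '', opts)

    # flags == [('--ssl', ''), ('--pernode', '5')]; trailer == ['extra']
    do_ssl = ('--ssl', '') in flags       # True: valueless flags pair with ''
    do_rescan = ('--rescan', '') in flags # False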
@@ -3074,7 +2696,7 @@ def main(argv):
scanhdlr.check_all_exits_port_consistency()
# maybe only the consistency test was required
- if not (do_ssl or do_html or do_http):
+ if not (do_ssl or do_http):
plog('INFO', 'Done.')
return
@@ -3095,7 +2717,7 @@ def main(argv):
ssl_data_dir = os.path.join(soat_dir, 'ssl')
tocheck += [ssl_certs_dir]
tocheck += [os.path.join(ssl_data_dir, r) for r in rsubdirs]
- if do_html or do_http:
+ if do_http:
tocheck += [http_content_dir]
tocheck += [os.path.join(http_data_dir, r) for r in rsubdirs]
if do_dns_rebind:
@@ -3130,10 +2752,6 @@ def main(argv):
tests["HTTP"] = datahandler.loadTest("HTTPTest", resume_run)
plog("NOTICE", "Resuming previous HTTP run "+os.path.split(tests["HTTP"].filename)[-1])
- if do_html:
- tests["HTML"] = datahandler.loadTest("HTMLTest", resume_run)
- plog("NOTICE", "Resuming previous HTML run "+os.path.split(tests["HTML"].filename)[-1])
-
elif fixed_targets:
if do_ssl:
tests["SSL"] = FixedTargetSSLTest(fixed_targets)
@@ -3141,9 +2759,6 @@ def main(argv):
if do_http:
tests["HTTP"] = FixedTargetHTTPTest(fixed_targets)
- if do_html:
- tests["HTML"] = FixedTargetHTMLTest(fixed_targets)
-
else:
if do_ssl:
tests["SSL"] = SearchBasedSSLTest(ssl_wordlist_file)
@@ -3151,9 +2766,6 @@ def main(argv):
if do_http:
tests["HTTP"] = SearchBasedHTTPTest(filetype_wordlist_file)
- if do_html:
- tests["HTML"] = SearchBasedHTMLTest(html_wordlist_file)
-
# maybe no tests could be initialized
if not tests:
plog('INFO', 'Done.')