commit 47534b29b51739a8ea67e1f8e2ff5bbb5b6e27c6 Author: Damian Johnson atagar@torproject.org Date: Tue Nov 18 09:48:24 2014 -0800
Sending DocTor notifications directly to authority operators
Notifying authority operators in addition to tor-consensus-health@. Folks with public contact information are notified via cc, while others are notified via bcc. Contact information is specified in 'data/contact_information.cfg', which we're keeping out of git to hide private addresses.
This necessitated reverting a change I made to sidestep OOM issues. If that starts biting us again I'll need to figure out another method for addressing it... --- .gitignore | 1 + consensus_health_checker.py | 152 ++++++++++++++++++++++++++++++------------- util.py | 20 ++++-- 3 files changed, 120 insertions(+), 53 deletions(-)
diff --git a/.gitignore b/.gitignore index bcb8c0b..1a47934 100644 --- a/.gitignore +++ b/.gitignore @@ -1,6 +1,7 @@ *.pyc *.swp logs/ +data/contact_information.cfg data/fingerprints data/last_notified.cfg stem diff --git a/consensus_health_checker.py b/consensus_health_checker.py index 13497bf..f79277b 100755 --- a/consensus_health_checker.py +++ b/consensus_health_checker.py @@ -6,7 +6,9 @@ Performs a variety of checks against the present votes and consensus. """
+import collections import datetime +import os import subprocess import time import traceback @@ -25,7 +27,7 @@ from stem.util.lru_cache import lru_cache
TEST_RUN = False
-Runlevel = stem.util.enum.UppercaseEnum("NOTICE", "WARNING", "ERROR") +Runlevel = stem.util.enum.UppercaseEnum('NOTICE', 'WARNING', 'ERROR')
DIRECTORY_AUTHORITIES = stem.descriptor.remote.get_authorities() EMAIL_SUBJECT = 'Consensus issues' @@ -35,6 +37,8 @@ CONFIG = stem.util.conf.config_dict('consensus_health', { 'suppression': {}, 'bandwidth_authorities': [], 'known_params': [], + 'contact_address': {}, + 'contact_via_bcc': [], })
log = util.get_logger('consensus_health_checker') @@ -46,6 +50,8 @@ downloader = stem.descriptor.remote.DescriptorDownloader( document_handler = stem.descriptor.DocumentHandler.DOCUMENT, )
+Destination = collections.namedtuple('Destination', ('address', 'bcc')) +
class Issue(object): """ @@ -57,6 +63,8 @@ class Issue(object): self._template = template self._attr = attr
+ self._authorities = attr.get('to', []) + @lru_cache() def get_message(self): """ @@ -85,6 +93,28 @@ class Issue(object): return self._runlevel
@lru_cache() + def get_destinations(self): + """ + Provides a mapping of authorities with this issue to their Destination. The + destination is **None** if no contact information has been configured. + + :returns: **dict** of authorities this concerns to their contact information + """ + + destinations = {} + + for authority in self._authorities: + if authority in CONFIG['contact_address']: + address = CONFIG['contact_address'][authority] + is_via_bcc = authority in CONFIG['contact_via_bcc'] + + destinations[authority] = Destination(address, is_via_bcc) + else: + destinations[authority] = None + + return destinations + + @lru_cache() def get_suppression_key(self): """ Provides the key used for issue suppression. @@ -190,12 +220,16 @@ def main(): # process. Hence we risk an OOM if this is done after loading gobs of # descriptor data into memory.
- mail_process = subprocess.Popen( - ['mail', '-E', '-s', EMAIL_SUBJECT, util.TO_ADDRESS], - stdin = subprocess.PIPE, - stdout = subprocess.PIPE, - stderr = subprocess.PIPE, - ) + # TODO: The following doesn't work now that we're dynamically picking + # destionations based on the issues. We'll need to come up with another + # idea if this continues to bite us... + + #mail_process = subprocess.Popen( + # ['mail', '-E', '-s', EMAIL_SUBJECT, util.TO_ADDRESS], + # stdin = subprocess.PIPE, + # stdout = subprocess.PIPE, + # stderr = subprocess.PIPE, + #)
bot_notice_process = subprocess.Popen( ['mail', '-E', '-s', 'Announce or', 'tor-misc@commit.noreply.org'], @@ -209,8 +243,18 @@ def main(): config = stem.util.conf.get_config("consensus_health") config.load(util.get_path('data', 'consensus_health.cfg'))
+ contact_path = util.get_path('data', 'contact_information.cfg') + + if os.path.exists(contact_path): + config.load(contact_path) + config = stem.util.conf.get_config('last_notified') - config.load(util.get_path('data', 'last_notified.cfg')) + last_notified_path = util.get_path('data', 'last_notified.cfg') + + if os.path.exists(last_notified_path): + config.load(last_notified_path) + else: + config._path = last_notified_path
consensuses, consensus_fetching_issues = get_consensuses() votes, vote_fetching_issues = get_votes() @@ -229,27 +273,45 @@ def main(): break
if not is_all_suppressed: - log.debug("Sending notification for issues") + destinations = {}
for issue in issues: rate_limit_notice(issue) + destinations.update(issue.get_destinations()) + + destination_labels = [] + + for authority, destination in destinations.items(): + if not destination: + destination_labels.append('%s has no contact information' % authority) + elif not destination.bcc: + destination_labels.append('%s at %s' % (authority, destination.address)) + else: + destination_labels.append('%s at %s via bcc' % (authority, destination.address)) + + log.debug('Sending notification for issues (%s)' % ', '.join(destination_labels))
if TEST_RUN: print '\n'.join(map(str, issues)) else: - stdout, stderr = mail_process.communicate('\n'.join(map(str, issues))) - exit_code = mail_process.poll() + #stdout, stderr = mail_process.communicate('\n'.join(map(str, issues))) + #exit_code = mail_process.poll()
- if exit_code != 0: - raise ValueError("Unable to send email: %s" % stderr.strip()) + #if exit_code != 0: + # raise ValueError("Unable to send email: %s" % stderr.strip()) + + cc_addresses = [d.address for d in destinations.values() if d and not d.bcc] + bcc_addresses = [d.address for d in destinations.values() if d and d.bcc] + + util.send(EMAIL_SUBJECT, body_text = '\n'.join(map(str, issues)), cc_destionations = cc_addresses, bcc_destionations = bcc_addresses)
# notification for #tor-bots
stdout, stderr = bot_notice_process.communicate('\n'.join(['[consensus-health] %s' % issue for issue in issues])) - exit_code = mail_process.poll() + exit_code = bot_notice_process.poll()
if exit_code != 0: - raise ValueError("Unable to send email: %s" % stderr.strip()) + raise ValueError("Unable to send notice to #tor-bots: %s" % stderr.strip()) else: if issues: log.info("All %i issues were suppressed. Not sending a notification." % len(issues)) @@ -323,7 +385,7 @@ def missing_latest_consensus(latest_consensus, consensuses, votes):
if stale_authorities: runlevel = Runlevel.ERROR if len(stale_authorities) > 3 else Runlevel.WARNING - return Issue(runlevel, 'MISSING_LATEST_CONSENSUS', authorities = ', '.join(stale_authorities)) + return Issue(runlevel, 'MISSING_LATEST_CONSENSUS', authorities = ', '.join(stale_authorities), to = stale_authorities)
def consensus_method_unsupported(latest_consensus, consensuses, votes): @@ -336,35 +398,33 @@ def consensus_method_unsupported(latest_consensus, consensuses, votes): incompatible_authorities.append(authority)
if incompatible_authorities: - return Issue(Runlevel.WARNING, 'CONSENSUS_METHOD_UNSUPPORTED', authorities = ', '.join(incompatible_authorities)) + return Issue(Runlevel.WARNING, 'CONSENSUS_METHOD_UNSUPPORTED', authorities = ', '.join(incompatible_authorities), to = incompatible_authorities)
def different_recommended_client_version(latest_consensus, consensuses, votes): "Checks that the recommended tor versions for clients match the present consensus."
- differences = [] + differences = {}
for authority, vote in votes.items(): if vote.client_versions and latest_consensus.client_versions != vote.client_versions: - msg = _version_difference_str(authority, latest_consensus.client_versions, vote.client_versions) - differences.append(msg) + differences[authority] = _version_difference_str(authority, latest_consensus.client_versions, vote.client_versions)
if differences: - return Issue(Runlevel.NOTICE, 'DIFFERENT_RECOMMENDED_VERSION', type = 'client', differences = ', '.join(differences)) + return Issue(Runlevel.NOTICE, 'DIFFERENT_RECOMMENDED_VERSION', type = 'client', differences = ', '.join(differences.values()), to = differences.keys())
def different_recommended_server_version(latest_consensus, consensuses, votes): "Checks that the recommended tor versions for servers match the present consensus."
- differences = [] + differences = {}
for authority, vote in votes.items(): if vote.server_versions and latest_consensus.server_versions != vote.server_versions: - msg = _version_difference_str(authority, latest_consensus.server_versions, vote.server_versions) - differences.append(msg) + differences[authority] = _version_difference_str(authority, latest_consensus.server_versions, vote.server_versions)
if differences: - return Issue(Runlevel.NOTICE, 'DIFFERENT_RECOMMENDED_VERSION', type = 'server', differences = ', '.join(differences)) + return Issue(Runlevel.NOTICE, 'DIFFERENT_RECOMMENDED_VERSION', type = 'server', differences = ', '.join(differences.values()), to = differences.keys())
def _version_difference_str(authority, consensus_versions, vote_versions): @@ -392,7 +452,7 @@ def _version_difference_str(authority, consensus_versions, vote_versions): def unknown_consensus_parameters(latest_consensus, consensuses, votes): "Checks that votes don't contain any parameters that we don't recognize."
- unknown_entries = [] + unknown_entries = {}
for authority, vote in votes.items(): unknown_params = [] @@ -402,16 +462,16 @@ def unknown_consensus_parameters(latest_consensus, consensuses, votes): unknown_params.append('%s=%s' % (param_key, param_value))
if unknown_params: - unknown_entries.append('%s %s' % (authority, ' '.join(unknown_params))) + unknown_entries[authority] = '%s %s' % (authority, ' '.join(unknown_params))
if unknown_entries: - return Issue(Runlevel.NOTICE, 'UNKNOWN_CONSENSUS_PARAMETERS', parameters = ', '.join(unknown_entries)) + return Issue(Runlevel.NOTICE, 'UNKNOWN_CONSENSUS_PARAMETERS', parameters = ', '.join(unknown_entries.values()), to = unknown_entries.keys())
def vote_parameters_mismatch_consensus(latest_consensus, consensuses, votes): "Check that all vote parameters appear in the consensus."
- mismatching_entries = [] + mismatching_entries = {}
for authority, vote in votes.items(): mismatching_params = [] @@ -421,10 +481,10 @@ def vote_parameters_mismatch_consensus(latest_consensus, consensuses, votes): mismatching_params.append('%s=%s' % (param_key, param_value))
if mismatching_params: - mismatching_entries.append('%s %s' % (authority, ' '.join(mismatching_params))) + mismatching_entries[authority] = '%s %s' % (authority, ' '.join(mismatching_params))
if mismatching_entries: - return Issue(Runlevel.NOTICE, 'MISMATCH_CONSENSUS_PARAMETERS', parameters = ', '.join(mismatching_entries)) + return Issue(Runlevel.NOTICE, 'MISMATCH_CONSENSUS_PARAMETERS', parameters = ', '.join(mismatching_entries.values()), to = mismatching_entries.keys())
def certificate_expiration(latest_consensus, consensuses, votes): @@ -440,11 +500,11 @@ def certificate_expiration(latest_consensus, consensuses, votes): expiration_label = '%s (%s)' % (authority, cert_expiration.strftime('%Y-%m-%d %H-%M-%S'))
if (cert_expiration - current_time) <= datetime.timedelta(days = 7): - issues.append(Issue(Runlevel.WARNING, 'CERTIFICATE_ABOUT_TO_EXPIRE', duration = 'week', authority = expiration_label)) + issues.append(Issue(Runlevel.WARNING, 'CERTIFICATE_ABOUT_TO_EXPIRE', duration = 'week', authority = expiration_label, to = [authority])) elif (cert_expiration - current_time) <= datetime.timedelta(days = 14): - issues.append(Issue(Runlevel.WARNING, 'CERTIFICATE_ABOUT_TO_EXPIRE', duration = 'two weeks', authority = expiration_label)) + issues.append(Issue(Runlevel.WARNING, 'CERTIFICATE_ABOUT_TO_EXPIRE', duration = 'two weeks', authority = expiration_label, to = [authority])) elif (cert_expiration - current_time) <= datetime.timedelta(days = 21): - issues.append(Issue(Runlevel.NOTICE, 'CERTIFICATE_ABOUT_TO_EXPIRE', duration = 'three weeks', authority = expiration_label)) + issues.append(Issue(Runlevel.NOTICE, 'CERTIFICATE_ABOUT_TO_EXPIRE', duration = 'three weeks', authority = expiration_label, to = [authority]))
return issues
@@ -467,7 +527,7 @@ def consensuses_have_same_votes(latest_consensus, consensuses, votes): authorities_missing_votes.append(authority)
if authorities_missing_votes: - return Issue(Runlevel.NOTICE, 'MISSING_VOTES', authorities = ', '.join(authorities_missing_votes)) + return Issue(Runlevel.NOTICE, 'MISSING_VOTES', authorities = ', '.join(authorities_missing_votes), to = authorities_missing_votes)
def has_all_signatures(latest_consensus, consensuses, votes): @@ -494,7 +554,7 @@ def has_all_signatures(latest_consensus, consensuses, votes): missing_authorities.add(missing_authority)
if missing_authorities: - issues.append(Issue(Runlevel.NOTICE, 'MISSING_SIGNATURE', consensus_of = consensus_of, authorities = ', '.join(missing_authorities))) + issues.append(Issue(Runlevel.NOTICE, 'MISSING_SIGNATURE', consensus_of = consensus_of, authorities = ', '.join(missing_authorities), to = missing_authorities))
return issues
@@ -521,10 +581,10 @@ def voting_bandwidth_scanners(latest_consensus, consensuses, votes):
if missing_authorities: runlevel = Runlevel.ERROR if len(missing_authorities) > 1 else Runlevel.WARNING - issues.append(Issue(runlevel, 'MISSING_BANDWIDTH_SCANNERS', authorities = ', '.join(missing_authorities))) + issues.append(Issue(runlevel, 'MISSING_BANDWIDTH_SCANNERS', authorities = ', '.join(missing_authorities), to = missing_authorities))
if extra_authorities: - issues.append(Issue(Runlevel.NOTICE, 'EXTRA_BANDWIDTH_SCANNERS', authorities = ', '.join(extra_authorities))) + issues.append(Issue(Runlevel.NOTICE, 'EXTRA_BANDWIDTH_SCANNERS', authorities = ', '.join(extra_authorities), to = extra_authorities))
return issues
@@ -550,7 +610,7 @@ def unmeasured_relays(latest_consensus, consensuses, votes): percentage = 100 * unmeasured / total
if percentage >= 5: - issues.append(Issue(Runlevel.NOTICE, 'TOO_MANY_UNMEASURED_RELAYS', authority = authority, unmeasured = unmeasured, total = total, percentage = percentage)) + issues.append(Issue(Runlevel.NOTICE, 'TOO_MANY_UNMEASURED_RELAYS', authority = authority, unmeasured = unmeasured, total = total, percentage = percentage, to = [authority]))
return issues
@@ -571,10 +631,10 @@ def has_authority_flag(latest_consensus, consensuses, votes): issues = []
if missing_authorities: - issues.append(Issue(Runlevel.WARNING, 'MISSING_AUTHORITIES', authorities = ', '.join(missing_authorities))) + issues.append(Issue(Runlevel.WARNING, 'MISSING_AUTHORITIES', authorities = ', '.join(missing_authorities), to = missing_authorities))
if extra_authorities: - issues.append(Issue(Runlevel.NOTICE, 'EXTRA_AUTHORITIES', authorities = ', '.join(extra_authorities))) + issues.append(Issue(Runlevel.NOTICE, 'EXTRA_AUTHORITIES', authorities = ', '.join(extra_authorities), to = extra_authorities))
return issues
@@ -588,7 +648,7 @@ def has_expected_fingerprints(latest_consensus, consensuses, votes): expected_fingerprint = DIRECTORY_AUTHORITIES[desc.nickname].fingerprint
if desc.fingerprint != expected_fingerprint: - issues.append(Issue(Runlevel.ERROR, 'FINGERPRINT_MISMATCH', authority = desc.nickname, expected = desc.fingerprint, actual = expected_fingerprint)) + issues.append(Issue(Runlevel.ERROR, 'FINGERPRINT_MISMATCH', authority = desc.nickname, expected = desc.fingerprint, actual = expected_fingerprint, to = [desc.nickname]))
return issues
@@ -607,7 +667,7 @@ def is_recommended_versions(latest_consensus, consensuses, votes):
if outdated_authorities: entries = ['%s (%s)' % (k, v) for k, v in outdated_authorities.items()] - return Issue(Runlevel.WARNING, 'TOR_OUT_OF_DATE', authorities = ', '.join(entries)) + return Issue(Runlevel.WARNING, 'TOR_OUT_OF_DATE', authorities = ', '.join(entries), to = outdated_authorities.keys())
def bad_exits_in_sync(latest_consensus, consensuses, votes): @@ -634,7 +694,7 @@ def bad_exits_in_sync(latest_consensus, consensuses, votes): with_flag = set([authority for authority, flagged in bad_exits.items() if fingerprint in flagged]) without_flag = voting_authorities.difference(with_flag)
- issues.append(Issue(Runlevel.NOTICE, 'BADEXIT_OUT_OF_SYNC', fingerprint = fingerprint, with_flag = ', '.join(with_flag), without_flag = ', '.join(without_flag))) + issues.append(Issue(Runlevel.NOTICE, 'BADEXIT_OUT_OF_SYNC', fingerprint = fingerprint, with_flag = ', '.join(with_flag), without_flag = ', '.join(without_flag), to = bad_exits.keys()))
return issues
@@ -662,7 +722,7 @@ def bandwidth_authorities_in_sync(latest_consensus, consensuses, votes): for authority, count in measurement_counts.items(): if count > (1.2 * average) or count < (0.8 * average): entries = ['%s (%s)' % (authority, count) for authority, count in measurement_counts.items()] - return Issue(Runlevel.NOTICE, 'BANDWIDTH_AUTHORITIES_OUT_OF_SYNC', authorities = ', '.join(entries)) + return Issue(Runlevel.NOTICE, 'BANDWIDTH_AUTHORITIES_OUT_OF_SYNC', authorities = ', '.join(entries), to = measurement_counts.keys())
def get_consensuses(): @@ -718,7 +778,7 @@ def _get_documents(label, resource): documents[authority] = list(query)[0] continue
- issues.append(Issue(Runlevel.ERROR, 'AUTHORITY_UNAVAILABLE', fetch_type = label, authority = authority, url = query.download_url, error = exc)) + issues.append(Issue(Runlevel.ERROR, 'AUTHORITY_UNAVAILABLE', fetch_type = label, authority = authority, url = query.download_url, error = exc, to = [authority]))
return documents, issues
diff --git a/util.py b/util.py index 5347906..10f10b7 100644 --- a/util.py +++ b/util.py @@ -79,24 +79,30 @@ def log_stem_debugging(name): log.addHandler(handler)
-def send(subject, body_text = None, destination = TO_ADDRESS): +def send(subject, body_text = None, destination = TO_ADDRESS, cc_destinations = None, bcc_destinations = None): """ Sends an email notification via the local mail application.
:param str subject: subject of the email :param str body_text: plaintext body of the email :param str destination: location to send the email to + :param list cc_destinations: addresses for the cc field + :param list bcc_destinations: addresses for the bcc field
:raises: **Exception** if the email fails to be sent """
- process = subprocess.Popen( - ['mail', '-E', '-s', subject, destination], - stdin = subprocess.PIPE, - stdout = subprocess.PIPE, - stderr = subprocess.PIPE, - ) + args = ['mail', '-E', '-s', subject]
+ if cc_destinations: + args += ['-c', ','.join(cc_destinations)] + + if bcc_destinations: + args += ['-b', ','.join(bcc_destinations)] + + args.append(destination) + + process = subprocess.Popen(args, stdin = subprocess.PIPE, stdout = subprocess.PIPE, stderr = subprocess.PIPE) stdout, stderr = process.communicate(body_text) exit_code = process.poll()