[tor-commits] [doctor/master] Sending DocTor notifications directly to authority operators

atagar at torproject.org atagar at torproject.org
Tue Nov 18 17:52:36 UTC 2014


commit 47534b29b51739a8ea67e1f8e2ff5bbb5b6e27c6
Author: Damian Johnson <atagar at torproject.org>
Date:   Tue Nov 18 09:48:24 2014 -0800

    Sending DocTor notifications directly to authority operators
    
    Notifying authority operators in addition to tor-consensus-health@. Folks with
    public contact information are notified via cc, and others via bcc.
    Contact information is specified in 'data/contact_information.cfg', which we're
    keeping out of git to hide private addresses.
    
    This necessitated reverting a change I made to sidestep OOM issues. If that
    starts biting us again I'll need to figure out another method for addressing
    it...
---
 .gitignore                  |    1 +
 consensus_health_checker.py |  152 ++++++++++++++++++++++++++++++-------------
 util.py                     |   20 ++++--
 3 files changed, 120 insertions(+), 53 deletions(-)

diff --git a/.gitignore b/.gitignore
index bcb8c0b..1a47934 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,6 +1,7 @@
 *.pyc
 *.swp
 logs/
+data/contact_information.cfg
 data/fingerprints
 data/last_notified.cfg
 stem
diff --git a/consensus_health_checker.py b/consensus_health_checker.py
index 13497bf..f79277b 100755
--- a/consensus_health_checker.py
+++ b/consensus_health_checker.py
@@ -6,7 +6,9 @@
 Performs a variety of checks against the present votes and consensus.
 """
 
+import collections
 import datetime
+import os
 import subprocess
 import time
 import traceback
@@ -25,7 +27,7 @@ from stem.util.lru_cache import lru_cache
 
 TEST_RUN = False
 
-Runlevel = stem.util.enum.UppercaseEnum("NOTICE", "WARNING", "ERROR")
+Runlevel = stem.util.enum.UppercaseEnum('NOTICE', 'WARNING', 'ERROR')
 
 DIRECTORY_AUTHORITIES = stem.descriptor.remote.get_authorities()
 EMAIL_SUBJECT = 'Consensus issues'
@@ -35,6 +37,8 @@ CONFIG = stem.util.conf.config_dict('consensus_health', {
   'suppression': {},
   'bandwidth_authorities': [],
   'known_params': [],
+  'contact_address': {},
+  'contact_via_bcc': [],
 })
 
 log = util.get_logger('consensus_health_checker')
@@ -46,6 +50,8 @@ downloader = stem.descriptor.remote.DescriptorDownloader(
   document_handler = stem.descriptor.DocumentHandler.DOCUMENT,
 )
 
+Destination = collections.namedtuple('Destination', ('address', 'bcc'))
+
 
 class Issue(object):
   """
@@ -57,6 +63,8 @@ class Issue(object):
     self._template = template
     self._attr = attr
 
+    self._authorities = attr.get('to', [])
+
   @lru_cache()
   def get_message(self):
     """
@@ -85,6 +93,28 @@ class Issue(object):
     return self._runlevel
 
   @lru_cache()
+  def get_destinations(self):
+    """
+    Provides a mapping of authorities with this issue to their Destination. The
+    destination is **None** if no contact information has been configured.
+
+    :returns: **dict** of authorities this concerns to their contact information
+    """
+
+    destinations = {}
+
+    for authority in self._authorities:
+      if authority in CONFIG['contact_address']:
+        address = CONFIG['contact_address'][authority]
+        is_via_bcc = authority in CONFIG['contact_via_bcc']
+
+        destinations[authority] = Destination(address, is_via_bcc)
+      else:
+        destinations[authority] = None
+
+    return destinations
+
+  @lru_cache()
   def get_suppression_key(self):
     """
     Provides the key used for issue suppression.
@@ -190,12 +220,16 @@ def main():
     # process. Hence we risk an OOM if this is done after loading gobs of
     # descriptor data into memory.
 
-    mail_process = subprocess.Popen(
-      ['mail', '-E', '-s', EMAIL_SUBJECT, util.TO_ADDRESS],
-      stdin = subprocess.PIPE,
-      stdout = subprocess.PIPE,
-      stderr = subprocess.PIPE,
-    )
+    # TODO: The following doesn't work now that we're dynamically picking
+    # destinations based on the issues. We'll need to come up with another
+    # idea if this continues to bite us...
+
+    #mail_process = subprocess.Popen(
+    #  ['mail', '-E', '-s', EMAIL_SUBJECT, util.TO_ADDRESS],
+    #  stdin = subprocess.PIPE,
+    #  stdout = subprocess.PIPE,
+    #  stderr = subprocess.PIPE,
+    #)
 
     bot_notice_process = subprocess.Popen(
       ['mail', '-E', '-s', 'Announce or', 'tor-misc at commit.noreply.org'],
@@ -209,8 +243,18 @@ def main():
   config = stem.util.conf.get_config("consensus_health")
   config.load(util.get_path('data', 'consensus_health.cfg'))
 
+  contact_path = util.get_path('data', 'contact_information.cfg')
+
+  if os.path.exists(contact_path):
+    config.load(contact_path)
+
   config = stem.util.conf.get_config('last_notified')
-  config.load(util.get_path('data', 'last_notified.cfg'))
+  last_notified_path = util.get_path('data', 'last_notified.cfg')
+
+  if os.path.exists(last_notified_path):
+    config.load(last_notified_path)
+  else:
+    config._path = last_notified_path
 
   consensuses, consensus_fetching_issues = get_consensuses()
   votes, vote_fetching_issues = get_votes()
@@ -229,27 +273,45 @@ def main():
       break
 
   if not is_all_suppressed:
-    log.debug("Sending notification for issues")
+    destinations = {}
 
     for issue in issues:
       rate_limit_notice(issue)
+      destinations.update(issue.get_destinations())
+
+    destination_labels = []
+
+    for authority, destination in destinations.items():
+      if not destination:
+        destination_labels.append('%s has no contact information' % authority)
+      elif not destination.bcc:
+        destination_labels.append('%s at %s' % (authority, destination.address))
+      else:
+        destination_labels.append('%s at %s via bcc' % (authority, destination.address))
+
+    log.debug('Sending notification for issues (%s)' % ', '.join(destination_labels))
 
     if TEST_RUN:
       print '\n'.join(map(str, issues))
     else:
-      stdout, stderr = mail_process.communicate('\n'.join(map(str, issues)))
-      exit_code = mail_process.poll()
+      #stdout, stderr = mail_process.communicate('\n'.join(map(str, issues)))
+      #exit_code = mail_process.poll()
 
-      if exit_code != 0:
-        raise ValueError("Unable to send email: %s" % stderr.strip())
+      #if exit_code != 0:
+      #  raise ValueError("Unable to send email: %s" % stderr.strip())
+
+      cc_addresses = [d.address for d in destinations.values() if d and not d.bcc]
+      bcc_addresses = [d.address for d in destinations.values() if d and d.bcc]
+
+      util.send(EMAIL_SUBJECT, body_text = '\n'.join(map(str, issues)), cc_destionations = cc_addresses, bcc_destionations = bcc_addresses)
 
       # notification for #tor-bots
 
       stdout, stderr = bot_notice_process.communicate('\n'.join(['[consensus-health] %s' % issue for issue in issues]))
-      exit_code = mail_process.poll()
+      exit_code = bot_notice_process.poll()
 
       if exit_code != 0:
-        raise ValueError("Unable to send email: %s" % stderr.strip())
+        raise ValueError("Unable to send notice to #tor-bots: %s" % stderr.strip())
   else:
     if issues:
       log.info("All %i issues were suppressed. Not sending a notification." % len(issues))
@@ -323,7 +385,7 @@ def missing_latest_consensus(latest_consensus, consensuses, votes):
 
   if stale_authorities:
     runlevel = Runlevel.ERROR if len(stale_authorities) > 3 else Runlevel.WARNING
-    return Issue(runlevel, 'MISSING_LATEST_CONSENSUS', authorities = ', '.join(stale_authorities))
+    return Issue(runlevel, 'MISSING_LATEST_CONSENSUS', authorities = ', '.join(stale_authorities), to = stale_authorities)
 
 
 def consensus_method_unsupported(latest_consensus, consensuses, votes):
@@ -336,35 +398,33 @@ def consensus_method_unsupported(latest_consensus, consensuses, votes):
       incompatible_authorities.append(authority)
 
   if incompatible_authorities:
-    return Issue(Runlevel.WARNING, 'CONSENSUS_METHOD_UNSUPPORTED', authorities = ', '.join(incompatible_authorities))
+    return Issue(Runlevel.WARNING, 'CONSENSUS_METHOD_UNSUPPORTED', authorities = ', '.join(incompatible_authorities), to = incompatible_authorities)
 
 
 def different_recommended_client_version(latest_consensus, consensuses, votes):
   "Checks that the recommended tor versions for clients match the present consensus."
 
-  differences = []
+  differences = {}
 
   for authority, vote in votes.items():
     if vote.client_versions and latest_consensus.client_versions != vote.client_versions:
-      msg = _version_difference_str(authority, latest_consensus.client_versions, vote.client_versions)
-      differences.append(msg)
+      differences[authority] = _version_difference_str(authority, latest_consensus.client_versions, vote.client_versions)
 
   if differences:
-    return Issue(Runlevel.NOTICE, 'DIFFERENT_RECOMMENDED_VERSION', type = 'client', differences = ', '.join(differences))
+    return Issue(Runlevel.NOTICE, 'DIFFERENT_RECOMMENDED_VERSION', type = 'client', differences = ', '.join(differences.values()), to = differences.keys())
 
 
 def different_recommended_server_version(latest_consensus, consensuses, votes):
   "Checks that the recommended tor versions for servers match the present consensus."
 
-  differences = []
+  differences = {}
 
   for authority, vote in votes.items():
     if vote.server_versions and latest_consensus.server_versions != vote.server_versions:
-      msg = _version_difference_str(authority, latest_consensus.server_versions, vote.server_versions)
-      differences.append(msg)
+      differences[authority] = _version_difference_str(authority, latest_consensus.server_versions, vote.server_versions)
 
   if differences:
-    return Issue(Runlevel.NOTICE, 'DIFFERENT_RECOMMENDED_VERSION', type = 'server', differences = ', '.join(differences))
+    return Issue(Runlevel.NOTICE, 'DIFFERENT_RECOMMENDED_VERSION', type = 'server', differences = ', '.join(differences.values()), to = differences.keys())
 
 
 def _version_difference_str(authority, consensus_versions, vote_versions):
@@ -392,7 +452,7 @@ def _version_difference_str(authority, consensus_versions, vote_versions):
 def unknown_consensus_parameters(latest_consensus, consensuses, votes):
   "Checks that votes don't contain any parameters that we don't recognize."
 
-  unknown_entries = []
+  unknown_entries = {}
 
   for authority, vote in votes.items():
     unknown_params = []
@@ -402,16 +462,16 @@ def unknown_consensus_parameters(latest_consensus, consensuses, votes):
         unknown_params.append('%s=%s' % (param_key, param_value))
 
     if unknown_params:
-      unknown_entries.append('%s %s' % (authority, ' '.join(unknown_params)))
+      unknown_entries[authority] = '%s %s' % (authority, ' '.join(unknown_params))
 
   if unknown_entries:
-    return Issue(Runlevel.NOTICE, 'UNKNOWN_CONSENSUS_PARAMETERS', parameters = ', '.join(unknown_entries))
+    return Issue(Runlevel.NOTICE, 'UNKNOWN_CONSENSUS_PARAMETERS', parameters = ', '.join(unknown_entries.values()), to = unknown_entries.keys())
 
 
 def vote_parameters_mismatch_consensus(latest_consensus, consensuses, votes):
   "Check that all vote parameters appear in the consensus."
 
-  mismatching_entries = []
+  mismatching_entries = {}
 
   for authority, vote in votes.items():
     mismatching_params = []
@@ -421,10 +481,10 @@ def vote_parameters_mismatch_consensus(latest_consensus, consensuses, votes):
         mismatching_params.append('%s=%s' % (param_key, param_value))
 
     if mismatching_params:
-      mismatching_entries.append('%s %s' % (authority, ' '.join(mismatching_params)))
+      mismatching_entries[authority] = '%s %s' % (authority, ' '.join(mismatching_params))
 
   if mismatching_entries:
-    return Issue(Runlevel.NOTICE, 'MISMATCH_CONSENSUS_PARAMETERS', parameters = ', '.join(mismatching_entries))
+    return Issue(Runlevel.NOTICE, 'MISMATCH_CONSENSUS_PARAMETERS', parameters = ', '.join(mismatching_entries.values()), to = mismatching_entries.keys())
 
 
 def certificate_expiration(latest_consensus, consensuses, votes):
@@ -440,11 +500,11 @@ def certificate_expiration(latest_consensus, consensuses, votes):
     expiration_label = '%s (%s)' % (authority, cert_expiration.strftime('%Y-%m-%d %H-%M-%S'))
 
     if (cert_expiration - current_time) <= datetime.timedelta(days = 7):
-      issues.append(Issue(Runlevel.WARNING, 'CERTIFICATE_ABOUT_TO_EXPIRE', duration = 'week', authority = expiration_label))
+      issues.append(Issue(Runlevel.WARNING, 'CERTIFICATE_ABOUT_TO_EXPIRE', duration = 'week', authority = expiration_label, to = [authority]))
     elif (cert_expiration - current_time) <= datetime.timedelta(days = 14):
-      issues.append(Issue(Runlevel.WARNING, 'CERTIFICATE_ABOUT_TO_EXPIRE', duration = 'two weeks', authority = expiration_label))
+      issues.append(Issue(Runlevel.WARNING, 'CERTIFICATE_ABOUT_TO_EXPIRE', duration = 'two weeks', authority = expiration_label, to = [authority]))
     elif (cert_expiration - current_time) <= datetime.timedelta(days = 21):
-      issues.append(Issue(Runlevel.NOTICE, 'CERTIFICATE_ABOUT_TO_EXPIRE', duration = 'three weeks', authority = expiration_label))
+      issues.append(Issue(Runlevel.NOTICE, 'CERTIFICATE_ABOUT_TO_EXPIRE', duration = 'three weeks', authority = expiration_label, to = [authority]))
 
   return issues
 
@@ -467,7 +527,7 @@ def consensuses_have_same_votes(latest_consensus, consensuses, votes):
       authorities_missing_votes.append(authority)
 
   if authorities_missing_votes:
-    return Issue(Runlevel.NOTICE, 'MISSING_VOTES', authorities = ', '.join(authorities_missing_votes))
+    return Issue(Runlevel.NOTICE, 'MISSING_VOTES', authorities = ', '.join(authorities_missing_votes), to = authorities_missing_votes)
 
 
 def has_all_signatures(latest_consensus, consensuses, votes):
@@ -494,7 +554,7 @@ def has_all_signatures(latest_consensus, consensuses, votes):
       missing_authorities.add(missing_authority)
 
     if missing_authorities:
-      issues.append(Issue(Runlevel.NOTICE, 'MISSING_SIGNATURE', consensus_of = consensus_of, authorities = ', '.join(missing_authorities)))
+      issues.append(Issue(Runlevel.NOTICE, 'MISSING_SIGNATURE', consensus_of = consensus_of, authorities = ', '.join(missing_authorities), to = missing_authorities))
 
   return issues
 
@@ -521,10 +581,10 @@ def voting_bandwidth_scanners(latest_consensus, consensuses, votes):
 
   if missing_authorities:
     runlevel = Runlevel.ERROR if len(missing_authorities) > 1 else Runlevel.WARNING
-    issues.append(Issue(runlevel, 'MISSING_BANDWIDTH_SCANNERS', authorities = ', '.join(missing_authorities)))
+    issues.append(Issue(runlevel, 'MISSING_BANDWIDTH_SCANNERS', authorities = ', '.join(missing_authorities), to = missing_authorities))
 
   if extra_authorities:
-    issues.append(Issue(Runlevel.NOTICE, 'EXTRA_BANDWIDTH_SCANNERS', authorities = ', '.join(extra_authorities)))
+    issues.append(Issue(Runlevel.NOTICE, 'EXTRA_BANDWIDTH_SCANNERS', authorities = ', '.join(extra_authorities), to = extra_authorities))
 
   return issues
 
@@ -550,7 +610,7 @@ def unmeasured_relays(latest_consensus, consensuses, votes):
       percentage = 100 * unmeasured / total
 
       if percentage >= 5:
-        issues.append(Issue(Runlevel.NOTICE, 'TOO_MANY_UNMEASURED_RELAYS', authority = authority, unmeasured = unmeasured, total = total, percentage = percentage))
+        issues.append(Issue(Runlevel.NOTICE, 'TOO_MANY_UNMEASURED_RELAYS', authority = authority, unmeasured = unmeasured, total = total, percentage = percentage, to = [authority]))
 
   return issues
 
@@ -571,10 +631,10 @@ def has_authority_flag(latest_consensus, consensuses, votes):
   issues = []
 
   if missing_authorities:
-    issues.append(Issue(Runlevel.WARNING, 'MISSING_AUTHORITIES', authorities = ', '.join(missing_authorities)))
+    issues.append(Issue(Runlevel.WARNING, 'MISSING_AUTHORITIES', authorities = ', '.join(missing_authorities), to = missing_authorities))
 
   if extra_authorities:
-    issues.append(Issue(Runlevel.NOTICE, 'EXTRA_AUTHORITIES', authorities = ', '.join(extra_authorities)))
+    issues.append(Issue(Runlevel.NOTICE, 'EXTRA_AUTHORITIES', authorities = ', '.join(extra_authorities), to = extra_authorities))
 
   return issues
 
@@ -588,7 +648,7 @@ def has_expected_fingerprints(latest_consensus, consensuses, votes):
       expected_fingerprint = DIRECTORY_AUTHORITIES[desc.nickname].fingerprint
 
       if desc.fingerprint != expected_fingerprint:
-        issues.append(Issue(Runlevel.ERROR, 'FINGERPRINT_MISMATCH', authority = desc.nickname, expected = desc.fingerprint, actual = expected_fingerprint))
+        issues.append(Issue(Runlevel.ERROR, 'FINGERPRINT_MISMATCH', authority = desc.nickname, expected = desc.fingerprint, actual = expected_fingerprint, to = [desc.nickname]))
 
   return issues
 
@@ -607,7 +667,7 @@ def is_recommended_versions(latest_consensus, consensuses, votes):
 
   if outdated_authorities:
     entries = ['%s (%s)' % (k, v) for k, v in outdated_authorities.items()]
-    return Issue(Runlevel.WARNING, 'TOR_OUT_OF_DATE', authorities = ', '.join(entries))
+    return Issue(Runlevel.WARNING, 'TOR_OUT_OF_DATE', authorities = ', '.join(entries), to = outdated_authorities.keys())
 
 
 def bad_exits_in_sync(latest_consensus, consensuses, votes):
@@ -634,7 +694,7 @@ def bad_exits_in_sync(latest_consensus, consensuses, votes):
     with_flag = set([authority for authority, flagged in bad_exits.items() if fingerprint in flagged])
     without_flag = voting_authorities.difference(with_flag)
 
-    issues.append(Issue(Runlevel.NOTICE, 'BADEXIT_OUT_OF_SYNC', fingerprint = fingerprint, with_flag = ', '.join(with_flag), without_flag = ', '.join(without_flag)))
+    issues.append(Issue(Runlevel.NOTICE, 'BADEXIT_OUT_OF_SYNC', fingerprint = fingerprint, with_flag = ', '.join(with_flag), without_flag = ', '.join(without_flag), to = bad_exits.keys()))
 
   return issues
 
@@ -662,7 +722,7 @@ def bandwidth_authorities_in_sync(latest_consensus, consensuses, votes):
   for authority, count in measurement_counts.items():
     if count > (1.2 * average) or count < (0.8 * average):
       entries = ['%s (%s)' % (authority, count) for authority, count in measurement_counts.items()]
-      return Issue(Runlevel.NOTICE, 'BANDWIDTH_AUTHORITIES_OUT_OF_SYNC', authorities = ', '.join(entries))
+      return Issue(Runlevel.NOTICE, 'BANDWIDTH_AUTHORITIES_OUT_OF_SYNC', authorities = ', '.join(entries), to = measurement_counts.keys())
 
 
 def get_consensuses():
@@ -718,7 +778,7 @@ def _get_documents(label, resource):
           documents[authority] = list(query)[0]
           continue
 
-      issues.append(Issue(Runlevel.ERROR, 'AUTHORITY_UNAVAILABLE', fetch_type = label, authority = authority, url = query.download_url, error = exc))
+      issues.append(Issue(Runlevel.ERROR, 'AUTHORITY_UNAVAILABLE', fetch_type = label, authority = authority, url = query.download_url, error = exc, to = [authority]))
 
   return documents, issues
 
diff --git a/util.py b/util.py
index 5347906..10f10b7 100644
--- a/util.py
+++ b/util.py
@@ -79,24 +79,30 @@ def log_stem_debugging(name):
   log.addHandler(handler)
 
 
-def send(subject, body_text = None, destination = TO_ADDRESS):
+def send(subject, body_text = None, destination = TO_ADDRESS, cc_destinations = None, bcc_destinations = None):
   """
   Sends an email notification via the local mail application.
 
   :param str subject: subject of the email
   :param str body_text: plaintext body of the email
   :param str destination: location to send the email to
+  :param list cc_destinations: addresses for the cc field
+  :param list bcc_destinations: addresses for the bcc field
 
   :raises: **Exception** if the email fails to be sent
   """
 
-  process = subprocess.Popen(
-    ['mail', '-E', '-s', subject, destination],
-    stdin = subprocess.PIPE,
-    stdout = subprocess.PIPE,
-    stderr = subprocess.PIPE,
-  )
+  args = ['mail', '-E', '-s', subject]
 
+  if cc_destinations:
+    args += ['-c', ','.join(cc_destinations)]
+
+  if bcc_destinations:
+    args += ['-b', ','.join(bcc_destinations)]
+
+  args.append(destination)
+
+  process = subprocess.Popen(args, stdin = subprocess.PIPE, stdout = subprocess.PIPE, stderr = subprocess.PIPE)
   stdout, stderr = process.communicate(body_text)
   exit_code = process.poll()
 



More information about the tor-commits mailing list