commit d547c8ecbadca64b046a041d94e80dddbeecc8c3
Author: Karsten Loesing <karsten.loesing@gmx.net>
Date:   Wed Mar 2 20:31:42 2011 +0100
    Prepare bridge statistics as part of metrics-web.
---
 build.xml                                          |    1 +
 config.template                                    |   15 +
 lib/junit-4.8.2.jar                                |  Bin 0 -> 237344 bytes
 .../ernie/cron/BridgeDescriptorParser.java         |  141 ++++
 .../ernie/cron/BridgeStatsFileHandler.java         |  519 ++++++++++++++++++++
 src/org/torproject/ernie/cron/Configuration.java   |   27 +
 .../ernie/cron/ConsensusStatsFileHandler.java      |  282 +++++++++++
 src/org/torproject/ernie/cron/Main.java            |   34 ++-
 .../ernie/cron/RelayDescriptorParser.java          |   17 +-
 .../ernie/cron/SanitizedBridgesReader.java         |  114 +++++
 .../ernie/test/SanitizedBridgesReaderTest.java     |   33 ++
 11 files changed, 1181 insertions(+), 2 deletions(-)
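The core of the new code is the estimate of daily bridge users: BridgeDescriptorParser (added below) takes the per-country unique-IP counts from a bridge's geoip-client-origins line, subtracts the offset of 4 (as the parser does, to correct for the rounding of these counters), and scales the remainder to a 24-hour period using the interval between geoip-start-time and published. A minimal, self-contained sketch of that normalization; the class, method, and variable names here are illustrative and not part of the commit:

    import java.util.HashMap;
    import java.util.Map;

    /** Illustrative sketch of the 24-hour normalization in BridgeDescriptorParser. */
    public class BridgeUserNormalizationSketch {

      /**
       * Scales raw per-country IP counts (e.g. "us=40,de=16") observed over
       * `seconds` seconds to estimated users per day: (count - 4) * 86400 / seconds.
       */
      static Map<String, Double> normalize(String geoipClientOrigins, long seconds) {
        Map<String, Double> usersPerDay = new HashMap<String, Double>();
        double allUsers = 0.0;
        for (String part : geoipClientOrigins.split(",")) {
          String country = part.substring(0, 2);
          long rawCount = Long.parseLong(part.substring(3));
          /* Subtract the reporting offset of 4 and extrapolate to a full day. */
          double users = (rawCount - 4L) * 86400.0 / seconds;
          usersPerDay.put(country, users);
          allUsers += users;
        }
        usersPerDay.put("zy", allUsers); /* "zy" holds the total over all countries. */
        return usersPerDay;
      }

      public static void main(String[] args) {
        /* A bridge that saw 40 US and 16 DE addresses in a 12-hour interval. */
        System.out.println(normalize("us=40,de=16", 12L * 60L * 60L));
      }
    }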
diff --git a/build.xml b/build.xml
index 446572b..cdae4e1 100644
--- a/build.xml
+++ b/build.xml
@@ -14,6 +14,7 @@
     <pathelement path="${classes}"/>
     <pathelement location="lib/commons-codec-1.4.jar"/>
     <pathelement location="lib/postgresql-8.4-702.jdbc3.jar"/>
+    <pathelement location="lib/junit-4.8.2.jar"/>
   </path>
 
   <target name="init">
diff --git a/config.template b/config.template
index 1d0701c..f2dc9cb 100644
--- a/config.template
+++ b/config.template
@@ -10,6 +10,18 @@
 ## again, but it can be confusing to users who don't know about it.
 #KeepDirectoryArchiveImportHistory 0
 #
+## Import sanitized bridges from disk, if available
+#ImportSanitizedBridges 0
+#
+## Relative path to directory to import sanitized bridges from
+#SanitizedBridgesDirectory bridges/
+#
+## Keep a history of imported sanitized bridge descriptors. This history
+## can be useful when importing from a changing data source to avoid
+## importing descriptors more than once, but it can be confusing to users
+## who don't know about it.
+#KeepSanitizedBridgesImportHistory 0
+#
 ## Write relay descriptors to the database
 #WriteRelayDescriptorDatabase 0
 #
@@ -27,4 +39,7 @@
 #
 ## Write statistics about the current consensus and votes to the
 ## website
 #WriteConsensusHealth 0
+#
+## Write bridge stats to disk
+#WriteBridgeStats 0
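The new options follow the existing convention of the config file: a key followed by 0 or 1, which Configuration.java (changed below) turns into a boolean via Integer.parseInt(...) != 0, while directory options keep their string value. A trimmed-down sketch of that convention; the class here is illustrative, not the one in the commit:

    import java.io.BufferedReader;
    import java.io.FileReader;
    import java.io.IOException;

    /** Illustrative sketch of how 0/1 config keys like WriteBridgeStats are read. */
    public class ConfigFlagSketch {
      private boolean writeBridgeStats = false;
      private String sanitizedBridgesDirectory = "bridges/";

      void readConfig(String path) throws IOException {
        BufferedReader br = new BufferedReader(new FileReader(path));
        String line;
        while ((line = br.readLine()) != null) {
          if (line.startsWith("#") || line.isEmpty()) {
            continue; /* skip comments and blank lines */
          } else if (line.startsWith("WriteBridgeStats")) {
            /* "WriteBridgeStats 1" enables writing stats/bridge-stats. */
            this.writeBridgeStats = Integer.parseInt(line.split(" ")[1]) != 0;
          } else if (line.startsWith("SanitizedBridgesDirectory")) {
            this.sanitizedBridgesDirectory = line.split(" ")[1];
          }
        }
        br.close();
      }
    }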
diff --git a/lib/junit-4.8.2.jar b/lib/junit-4.8.2.jar new file mode 100644 index 0000000..5b4bb84 Binary files /dev/null and b/lib/junit-4.8.2.jar differ diff --git a/src/org/torproject/ernie/cron/BridgeDescriptorParser.java b/src/org/torproject/ernie/cron/BridgeDescriptorParser.java new file mode 100644 index 0000000..4777f58 --- /dev/null +++ b/src/org/torproject/ernie/cron/BridgeDescriptorParser.java @@ -0,0 +1,141 @@ +/* Copyright 2011 The Tor Project + * See LICENSE for licensing information */ +package org.torproject.ernie.cron; + +import java.io.*; +import java.text.*; +import java.util.*; +import java.util.logging.*; +import org.apache.commons.codec.digest.*; + +public class BridgeDescriptorParser { + private ConsensusStatsFileHandler csfh; + private BridgeStatsFileHandler bsfh; + private Logger logger; + public BridgeDescriptorParser(ConsensusStatsFileHandler csfh, + BridgeStatsFileHandler bsfh) { + this.csfh = csfh; + this.bsfh = bsfh; + this.logger = + Logger.getLogger(BridgeDescriptorParser.class.getName()); + } + public void parse(byte[] allData, String dateTime, boolean sanitized) { + try { + BufferedReader br = new BufferedReader(new StringReader( + new String(allData, "US-ASCII"))); + SimpleDateFormat timeFormat = new SimpleDateFormat( + "yyyy-MM-dd HH:mm:ss"); + timeFormat.setTimeZone(TimeZone.getTimeZone("UTC")); + String hashedIdentity = null, platformLine = null, + publishedLine = null, geoipStartTimeLine = null, + bridgeStatsEndLine = null; + boolean skip = false; + String line = null; + while ((line = br.readLine()) != null) { + if (line.startsWith("r ")) { + int runningBridges = 0; + while ((line = br.readLine()) != null) { + if (line.startsWith("s ") && line.contains(" Running")) { + runningBridges++; + } + } + if (this.csfh != null) { + this.csfh.addBridgeConsensusResults(dateTime, runningBridges); + } + } else if (line.startsWith("router ")) { + } else if (line.startsWith("extra-info ")) { + hashedIdentity = sanitized ? line.split(" ")[2] + : DigestUtils.shaHex(line.split(" ")[2]).toUpperCase(); + if (this.bsfh != null) { + skip = this.bsfh.isKnownRelay(hashedIdentity); + } + } else if (!skip && line.startsWith("platform ")) { + platformLine = line; + } else if (!skip && line.startsWith("published ")) { + publishedLine = line; + } else if (line.startsWith("opt fingerprint") || + line.startsWith("fingerprint")) { + String identity = line.substring(line.startsWith("opt ") ? + "opt fingerprint".length() : "fingerprint".length()). + replaceAll(" ", "").toLowerCase(); + hashedIdentity = sanitized ? identity + : DigestUtils.shaHex(identity).toUpperCase(); + } else if (!skip && line.startsWith("geoip-start-time ")) { + geoipStartTimeLine = line; + } else if (!skip && line.startsWith("geoip-client-origins") + && line.split(" ").length > 1) { + if (publishedLine == null || + geoipStartTimeLine == null) { + this.logger.warning("Either published line or " + + "geoip-start-time line is not present in " + + (sanitized ? "sanitized" : "non-sanitized") + + " bridge descriptors from " + dateTime + "."); + break; + } + long published = timeFormat.parse(publishedLine. + substring("published ".length())).getTime(); + long started = timeFormat.parse(geoipStartTimeLine. 
+ substring("geoip-start-time ".length())).getTime(); + long seconds = (published - started) / 1000L; + double allUsers = 0.0D; + Map<String, String> obs = new HashMap<String, String>(); + String[] parts = line.split(" ")[1].split(","); + for (String p : parts) { + String country = p.substring(0, 2); + double users = ((double) Long.parseLong(p.substring(3)) - 4L) + * 86400.0D / ((double) seconds); + allUsers += users; + obs.put(country, String.format("%.2f", users)); + } + obs.put("zy", String.format("%.2f", allUsers)); + String date = publishedLine.split(" ")[1]; + String time = publishedLine.split(" ")[2]; + if (this.bsfh != null) { + this.bsfh.addObs(hashedIdentity, date, time, obs); + } + } else if (!skip && line.startsWith("bridge-stats-end ")) { + bridgeStatsEndLine = line; + } else if (!skip && line.startsWith("bridge-ips") + && line.split(" ").length > 1) { + if (bridgeStatsEndLine == null) { + this.logger.warning("bridge-ips line without preceding " + + "bridge-stats-end line in " + + (sanitized ? "sanitized" : "non-sanitized") + + " bridge descriptor."); + break; + } + double allUsers = 0.0D; + Map<String, String> obs = new HashMap<String, String>(); + String[] parts = line.split(" ")[1].split(","); + for (String p : parts) { + String country = p.substring(0, 2); + double users = (double) Long.parseLong(p.substring(3)) - 4L; + allUsers += users; + obs.put(country, String.format("%.2f", users)); + } + obs.put("zy", String.format("%.2f", allUsers)); + String date = bridgeStatsEndLine.split(" ")[1]; + String time = bridgeStatsEndLine.split(" ")[2]; + if (this.bsfh != null) { + this.bsfh.addObs(hashedIdentity, date, time, obs); + } + } + } + if (this.bsfh != null && platformLine != null && + platformLine.startsWith("platform Tor 0.2.2")) { + String date = publishedLine.split(" ")[1]; + String time = publishedLine.split(" ")[2]; + this.bsfh.addZeroTwoTwoDescriptor(hashedIdentity, date, time); + } + } catch (IOException e) { + this.logger.log(Level.WARNING, "Could not parse bridge descriptor.", + e); + return; + } catch (ParseException e) { + this.logger.log(Level.WARNING, "Could not parse bridge descriptor.", + e); + return; + } + } +} + diff --git a/src/org/torproject/ernie/cron/BridgeStatsFileHandler.java b/src/org/torproject/ernie/cron/BridgeStatsFileHandler.java new file mode 100644 index 0000000..58ee3a2 --- /dev/null +++ b/src/org/torproject/ernie/cron/BridgeStatsFileHandler.java @@ -0,0 +1,519 @@ +/* Copyright 2011 The Tor Project + * See LICENSE for licensing information */ +package org.torproject.ernie.cron; + +import java.io.*; +import java.sql.*; +import java.text.*; +import java.util.*; +import java.util.logging.*; + +/** + * Determines estimates of bridge users per country and day from the + * extra-info descriptors that bridges publish. In a first step, the + * number of unique IP addresses that bridges see are normalized to a + * 24-hour period. In the next step, all bridges are excluded that have + * been running as a relay. Finally, observations are summed up and + * written to <code>stats/bridge-stats</code>. + */ +public class BridgeStatsFileHandler { + + /** + * Two-letter country codes of known countries. + */ + private SortedSet<String> countries; + + /** + * Intermediate results file containing bridge user numbers by country + * as seen by single bridges, normalized to 24-hour periods. + */ + private File bridgeStatsRawFile; + + /** + * Bridge user numbers by country as seen by single bridges on a given + * day. 
Map keys are bridge and date written as "bridge,date", map + * values are lines as read from <code>stats/bridge-stats-raw</code>. + */ + private SortedMap<String, Map<String, String>> bridgeUsersRaw; + + /** + * Helper file containing the hashed relay identities of all known + * relays. These hashes are compared to the bridge identity hashes to + * exclude bridges that have been known as relays from the statistics. + */ + private File hashedRelayIdentitiesFile; + + /** + * Known hashed relay identities used to exclude bridges that have been + * running as relays. + */ + private SortedSet<String> hashedRelays; + + /** + * Helper file containing extra-info descriptors published by 0.2.2.x + * bridges. If these descriptors contain geoip-stats, they are not + * included in the results, because stats are very likely broken. + */ + private File zeroTwoTwoDescriptorsFile; + + /** + * Extra-info descriptors published by 0.2.2.x bridges. If these + * descriptors contain geoip-stats, they are not included in the + * results, because stats are very likely broken. + */ + private SortedSet<String> zeroTwoTwoDescriptors; + + /** + * Final results file containing the number of bridge users per country + * and day. This file is not read in during initialization, but + * overwritten at the end of the execution. + */ + private File bridgeStatsFile; + + /** + * Logger for this class. + */ + private Logger logger; + + /* Database connection string. */ + private String connectionURL = null; + + /** + * Initializes this class, including reading in intermediate results + * files <code>stats/bridge-stats-raw</code> and + * <code>stats/hashed-relay-identities</code>. + */ + public BridgeStatsFileHandler(String connectionURL) { + + /* Initialize set of known countries. */ + this.countries = new TreeSet<String>(); + this.countries.add("zy"); + + /* Initialize local data structures to hold results. */ + this.bridgeUsersRaw = new TreeMap<String, Map<String, String>>(); + this.hashedRelays = new TreeSet<String>(); + this.zeroTwoTwoDescriptors = new TreeSet<String>(); + + /* Initialize file names for intermediate and final results. */ + this.bridgeStatsRawFile = new File("stats/bridge-stats-raw"); + this.bridgeStatsFile = new File("stats/bridge-stats"); + this.hashedRelayIdentitiesFile = new File( + "stats/hashed-relay-identities"); + this.zeroTwoTwoDescriptorsFile = new File( + "stats/v022-bridge-descriptors"); + + /* Initialize database connection string. */ + this.connectionURL = connectionURL; + + /* Initialize logger. */ + this.logger = Logger.getLogger( + BridgeStatsFileHandler.class.getName()); + + /* Read in bridge user numbers by country as seen by single bridges, + * normalized to 24-hour periods. */ + if (this.bridgeStatsRawFile.exists()) { + try { + this.logger.fine("Reading file " + + this.bridgeStatsRawFile.getAbsolutePath() + "..."); + BufferedReader br = new BufferedReader(new FileReader( + this.bridgeStatsRawFile)); + String line = br.readLine(); + if (line != null) { + /* The first line should contain headers that we need to parse + * in order to learn what countries we were interested in when + * writing this file. */ + if (!line.startsWith("bridge,date,time,")) { + this.logger.warning("Incorrect first line '" + line + "' in " + + this.bridgeStatsRawFile.getAbsolutePath() + "! This line " + + "should contain headers! 
Aborting to read in this " + + "file!"); + } else { + String[] headers = line.split(","); + for (int i = 3; i < headers.length; i++) { + if (!headers[i].equals("all")) { + this.countries.add(headers[i]); + } + } + /* Read in the rest of the file. */ + while ((line = br.readLine()) != null) { + String[] parts = line.split(","); + if (parts.length != headers.length) { + this.logger.warning("Corrupt line '" + line + "' in file " + + this.bridgeStatsRawFile.getAbsolutePath() + + "! Aborting to read this file!"); + break; + } + String hashedBridgeIdentity = parts[0]; + String date = parts[1]; + String time = parts[2]; + SortedMap<String, String> obs = + new TreeMap<String, String>(); + for (int i = 3; i < parts.length; i++) { + if (parts[i].equals("NA")) { + continue; + } + if (headers[i].equals("all")) { + obs.put("zy", parts[i]); + } else { + obs.put(headers[i], parts[i]); + } + } + this.addObs(hashedBridgeIdentity, date, time, obs); + } + } + } + br.close(); + this.logger.fine("Finished reading file " + + this.bridgeStatsRawFile.getAbsolutePath() + "."); + } catch (IOException e) { + this.logger.log(Level.WARNING, "Failed to read file " + + this.bridgeStatsRawFile.getAbsolutePath() + "!", e); + } + } + + /* Read in known hashed relay identities used to exclude bridges that + * have been running as relays. */ + if (this.hashedRelayIdentitiesFile.exists()) { + try { + this.logger.fine("Reading file " + + this.hashedRelayIdentitiesFile.getAbsolutePath() + "..."); + BufferedReader br = new BufferedReader(new FileReader( + this.hashedRelayIdentitiesFile)); + String line = null; + /* Read in all lines from the file and memorize them. */ + while ((line = br.readLine()) != null) { + this.hashedRelays.add(line); + } + br.close(); + this.logger.fine("Finished reading file " + + this.hashedRelayIdentitiesFile.getAbsolutePath() + "."); + } catch (IOException e) { + this.logger.log(Level.WARNING, "Failed to read file " + + this.hashedRelayIdentitiesFile.getAbsolutePath() + "!", e); + } + } + + /* Read in known extra-info descriptors published by 0.2.2.x + * bridges. */ + if (this.zeroTwoTwoDescriptorsFile.exists()) { + try { + this.logger.fine("Reading file " + + this.zeroTwoTwoDescriptorsFile.getAbsolutePath() + "..."); + BufferedReader br = new BufferedReader(new FileReader( + this.zeroTwoTwoDescriptorsFile)); + String line = null; + /* Read in all lines from the file and memorize them. */ + while ((line = br.readLine()) != null) { + this.zeroTwoTwoDescriptors.add(line); + } + br.close(); + this.logger.fine("Finished reading file " + + this.zeroTwoTwoDescriptorsFile.getAbsolutePath() + "."); + } catch (IOException e) { + this.logger.log(Level.WARNING, "Failed to read file " + + this.zeroTwoTwoDescriptorsFile.getAbsolutePath() + "!", e); + } + } + } + + /** + * Adds a hashed relay identity string to the list of bridges that we + * are going to ignore in the future. If we counted user numbers from + * bridges that have been running as relays, our numbers would be far + * higher than what we think is correct. + */ + public void addHashedRelay(String hashedRelayIdentity) { + if (!this.hashedRelays.contains(hashedRelayIdentity)) { + this.logger.finer("Adding new hashed relay identity: " + + hashedRelayIdentity); + this.hashedRelays.add(hashedRelayIdentity); + } + } + + /** + * Adds an extra-info descriptor identifier published by an 0.2.2.x + * bridges. If this extra-info descriptor contains geoip-stats, they are + * not included in the results, because stats are very likely broken. 
+ */ + public void addZeroTwoTwoDescriptor(String hashedBridgeIdentity, + String date, String time) { + String value = hashedBridgeIdentity.toUpperCase() + "," + date + "," + + time; + if (!this.zeroTwoTwoDescriptors.contains(value)) { + this.logger.finer("Adding new bridge 0.2.2.x extra-info " + + "descriptor: " + value); + this.zeroTwoTwoDescriptors.add(value); + } + } + + /** + * Returns whether the given fingerprint is a known hashed relay + * identity. <code>BridgeDescriptorParser</code> uses this information + * to decide whether to continue parsing a bridge extra-descriptor + * descriptor or not. + */ + public boolean isKnownRelay(String hashedBridgeIdentity) { + return this.hashedRelays.contains(hashedBridgeIdentity); + } + + /** + * Adds bridge user numbers by country as seen by a single bridge on a + * given date and time. Bridges can publish statistics on unique IP + * addresses multiple times a day, but we only want to include one + * observation per day. If we already have an observation from the given + * bridge and day, we keep the one with the later publication time and + * discard the other one. + */ + public void addObs(String hashedIdentity, String date, String time, + Map<String, String> obs) { + for (String country : obs.keySet()) { + this.countries.add(country); + } + String shortKey = hashedIdentity + "," + date; + String longKey = shortKey + "," + time; + SortedMap<String, Map<String, String>> tailMap = + this.bridgeUsersRaw.tailMap(shortKey); + String nextKey = tailMap.isEmpty() ? null : tailMap.firstKey(); + if (nextKey == null || !nextKey.startsWith(shortKey)) { + this.logger.finer("Adding new bridge user numbers for key " + + longKey); + this.bridgeUsersRaw.put(longKey, obs); + } else if (longKey.compareTo(nextKey) > 0) { + this.logger.finer("Replacing existing bridge user numbers (" + + nextKey + " with new numbers: " + longKey); + this.bridgeUsersRaw.put(longKey, obs); + } else { + this.logger.finer("Not replacing existing bridge user numbers (" + + nextKey + " with new numbers (" + longKey + ")."); + } + } + + /** + * Writes the list of hashed relay identities and bridge user numbers as + * observed by single bridges to disk, aggregates per-day statistics for + * all bridges, and writes those to disk, too. + */ + public void writeFiles() { + + /* Write hashed relay identities to disk. */ + try { + this.logger.fine("Writing file " + + this.hashedRelayIdentitiesFile.getAbsolutePath() + "..."); + this.hashedRelayIdentitiesFile.getParentFile().mkdirs(); + BufferedWriter bw = new BufferedWriter(new FileWriter( + this.hashedRelayIdentitiesFile)); + for (String hashedRelay : this.hashedRelays) { + bw.append(hashedRelay + "\n"); + } + bw.close(); + this.logger.fine("Finished writing file " + + this.hashedRelayIdentitiesFile.getAbsolutePath() + "."); + } catch (IOException e) { + this.logger.log(Level.WARNING, "Failed to write " + + this.hashedRelayIdentitiesFile.getAbsolutePath() + "!", e); + } + + /* Write bridge extra-info descriptor identifiers to disk. 
*/ + try { + this.logger.fine("Writing file " + + this.zeroTwoTwoDescriptorsFile.getAbsolutePath() + "..."); + this.zeroTwoTwoDescriptorsFile.getParentFile().mkdirs(); + BufferedWriter bw = new BufferedWriter(new FileWriter( + this.zeroTwoTwoDescriptorsFile)); + for (String descriptorIdentifier : this.zeroTwoTwoDescriptors) { + bw.append(descriptorIdentifier + "\n"); + } + bw.close(); + this.logger.fine("Finished writing file " + + this.zeroTwoTwoDescriptorsFile.getAbsolutePath() + "."); + } catch (IOException e) { + this.logger.log(Level.WARNING, "Failed to write " + + this.zeroTwoTwoDescriptorsFile.getAbsolutePath() + "!", e); + } + + /* Write observations made by single bridges to disk. */ + try { + this.logger.fine("Writing file " + + this.bridgeStatsRawFile.getAbsolutePath() + "..."); + this.bridgeStatsRawFile.getParentFile().mkdirs(); + BufferedWriter bw = new BufferedWriter(new FileWriter( + this.bridgeStatsRawFile)); + bw.append("bridge,date,time"); + for (String c : this.countries) { + if (c.equals("zy")) { + bw.append(",all"); + } else { + bw.append("," + c); + } + } + bw.append("\n"); + for (Map.Entry<String, Map<String, String>> e : + this.bridgeUsersRaw.entrySet()) { + String longKey = e.getKey(); + String[] parts = longKey.split(","); + String hashedBridgeIdentity = parts[0]; + String date = parts[1]; + String time = parts[2]; + if (!this.hashedRelays.contains(hashedBridgeIdentity) && + !this.zeroTwoTwoDescriptors.contains(longKey)) { + Map<String, String> obs = e.getValue(); + StringBuilder sb = new StringBuilder(longKey); + for (String c : this.countries) { + sb.append("," + (obs.containsKey(c) && + !obs.get(c).startsWith("-") ? obs.get(c) : "NA")); + } + String line = sb.toString(); + bw.append(line + "\n"); + } + } + bw.close(); + this.logger.fine("Finished writing file " + + this.bridgeStatsRawFile.getAbsolutePath() + "."); + } catch (IOException e) { + this.logger.log(Level.WARNING, "Failed to write " + + this.bridgeStatsRawFile.getAbsolutePath() + "!", e); + } + + /* Aggregate per-day statistics. */ + SortedMap<String, double[]> bridgeUsersPerDay = + new TreeMap<String, double[]>(); + for (Map.Entry<String, Map<String, String>> e : + this.bridgeUsersRaw.entrySet()) { + String longKey = e.getKey(); + String[] parts = longKey.split(","); + String hashedBridgeIdentity = parts[0]; + String date = parts[1]; + String time = parts[2]; + if (!this.hashedRelays.contains(hashedBridgeIdentity) && + !this.zeroTwoTwoDescriptors.contains(longKey)) { + double[] users = bridgeUsersPerDay.get(date); + Map<String, String> obs = e.getValue(); + if (users == null) { + users = new double[this.countries.size()]; + bridgeUsersPerDay.put(date, users); + } + int i = 0; + for (String c : this.countries) { + if (obs.containsKey(c) && !obs.get(c).startsWith("-")) { + users[i] += Double.parseDouble(obs.get(c)); + } + i++; + } + } + } + + /* Write final results of bridge users per day and country to + * <code>stats/bridge-stats</code>. */ + try { + this.logger.fine("Writing file " + + this.bridgeStatsRawFile.getAbsolutePath() + "..."); + this.bridgeStatsFile.getParentFile().mkdirs(); + BufferedWriter bw = new BufferedWriter(new FileWriter( + this.bridgeStatsFile)); + bw.append("date"); + for (String c : this.countries) { + if (c.equals("zy")) { + bw.append(",all"); + } else { + bw.append("," + c); + } + } + bw.append("\n"); + + /* Write current observation. 
*/ + for (Map.Entry<String, double[]> e : bridgeUsersPerDay.entrySet()) { + String date = e.getKey(); + bw.append(date); + double[] users = e.getValue(); + for (int i = 0; i < users.length; i++) { + bw.append("," + String.format("%.2f", users[i])); + } + bw.append("\n"); + } + bw.close(); + this.logger.fine("Finished writing file " + + this.bridgeStatsFile.getAbsolutePath() + "."); + } catch (IOException e) { + this.logger.log(Level.WARNING, "Failed to write " + + this.bridgeStatsFile.getAbsolutePath() + "!", e); + } + + /* Add daily bridge users to database. */ + if (connectionURL != null) { + try { + List<String> countryList = new ArrayList<String>(); + for (String c : this.countries) { + countryList.add(c); + } + Map<String, Integer> insertRows = new HashMap<String, Integer>(), + updateRows = new HashMap<String, Integer>(); + for (Map.Entry<String, double[]> e : + bridgeUsersPerDay.entrySet()) { + String date = e.getKey(); + double[] users = e.getValue(); + for (int i = 0; i < users.length; i++) { + int usersInt = (int) users[i]; + if (usersInt < 1) { + continue; + } + String country = countryList.get(i); + String key = date + "," + country; + insertRows.put(key, usersInt); + } + } + Connection conn = DriverManager.getConnection(connectionURL); + conn.setAutoCommit(false); + Statement statement = conn.createStatement(); + ResultSet rs = statement.executeQuery( + "SELECT date, country, users FROM bridge_stats"); + while (rs.next()) { + String date = rs.getDate(1).toString(); + String country = rs.getString(2); + String key = date + "," + country; + if (insertRows.containsKey(key)) { + int insertRow = insertRows.remove(key); + int oldUsers = rs.getInt(3); + if (oldUsers != insertRow) { + updateRows.put(key, insertRow); + } + } + } + rs.close(); + PreparedStatement psU = conn.prepareStatement( + "UPDATE bridge_stats SET users = ? " + + "WHERE date = ? 
AND country = ?"); + for (Map.Entry<String, Integer> e : updateRows.entrySet()) { + String[] keyParts = e.getKey().split(","); + java.sql.Date date = java.sql.Date.valueOf(keyParts[0]); + String country = keyParts[1]; + int users = e.getValue(); + psU.clearParameters(); + psU.setInt(1, users); + psU.setDate(2, date); + psU.setString(3, country); + psU.executeUpdate(); + } + PreparedStatement psI = conn.prepareStatement( + "INSERT INTO bridge_stats (users, date, country) " + + "VALUES (?, ?, ?)"); + for (Map.Entry<String, Integer> e : insertRows.entrySet()) { + String[] keyParts = e.getKey().split(","); + java.sql.Date date = java.sql.Date.valueOf(keyParts[0]); + String country = keyParts[1]; + int users = e.getValue(); + psI.clearParameters(); + psI.setInt(1, users); + psI.setDate(2, date); + psI.setString(3, country); + psI.executeUpdate(); + } + conn.commit(); + conn.close(); + } catch (SQLException e) { + logger.log(Level.WARNING, "Failed to add daily bridge users to " + + "database.", e); + } + } + } +} + diff --git a/src/org/torproject/ernie/cron/Configuration.java b/src/org/torproject/ernie/cron/Configuration.java index 66ad778..818f9e9 100644 --- a/src/org/torproject/ernie/cron/Configuration.java +++ b/src/org/torproject/ernie/cron/Configuration.java @@ -16,12 +16,16 @@ public class Configuration { private boolean importDirectoryArchives = false; private String directoryArchivesDirectory = "archives/"; private boolean keepDirectoryArchiveImportHistory = false; + private boolean importSanitizedBridges = false; + private String sanitizedBridgesDirectory = "bridges/"; + private boolean keepSanitizedBridgesImportHistory = false; private boolean writeRelayDescriptorDatabase = false; private String relayDescriptorDatabaseJdbc = "jdbc:postgresql://localhost/tordir?user=metrics&password=password"; private boolean writeRelayDescriptorsRawFiles = false; private String relayDescriptorRawFilesDirectory = "pg-import/"; private boolean writeConsensusHealth = false; + private boolean writeBridgeStats = false; public Configuration() {
/* Initialize logger. */ @@ -47,6 +51,14 @@ public class Configuration { } else if (line.startsWith("KeepDirectoryArchiveImportHistory")) { this.keepDirectoryArchiveImportHistory = Integer.parseInt( line.split(" ")[1]) != 0; + } else if (line.startsWith("ImportSanitizedBridges")) { + this.importSanitizedBridges = Integer.parseInt( + line.split(" ")[1]) != 0; + } else if (line.startsWith("SanitizedBridgesDirectory")) { + this.sanitizedBridgesDirectory = line.split(" ")[1]; + } else if (line.startsWith("KeepSanitizedBridgesImportHistory")) { + this.keepSanitizedBridgesImportHistory = Integer.parseInt( + line.split(" ")[1]) != 0; } else if (line.startsWith("WriteRelayDescriptorDatabase")) { this.writeRelayDescriptorDatabase = Integer.parseInt( line.split(" ")[1]) != 0; @@ -60,6 +72,9 @@ public class Configuration { } else if (line.startsWith("WriteConsensusHealth")) { this.writeConsensusHealth = Integer.parseInt( line.split(" ")[1]) != 0; + } else if (line.startsWith("WriteBridgeStats")) { + this.writeBridgeStats = Integer.parseInt( + line.split(" ")[1]) != 0; } else { logger.severe("Configuration file contains unrecognized " + "configuration key in line '" + line + "'! Exiting!"); @@ -97,6 +112,15 @@ public class Configuration { public boolean getWriteRelayDescriptorDatabase() { return this.writeRelayDescriptorDatabase; } + public boolean getImportSanitizedBridges() { + return this.importSanitizedBridges; + } + public String getSanitizedBridgesDirectory() { + return this.sanitizedBridgesDirectory; + } + public boolean getKeepSanitizedBridgesImportHistory() { + return this.keepSanitizedBridgesImportHistory; + } public String getRelayDescriptorDatabaseJDBC() { return this.relayDescriptorDatabaseJdbc; } @@ -109,5 +133,8 @@ public class Configuration { public boolean getWriteConsensusHealth() { return this.writeConsensusHealth; } + public boolean getWriteBridgeStats() { + return this.writeBridgeStats; + } }
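BridgeStatsFileHandler.addObs() above keeps at most one observation per bridge and day: when a bridge publishes statistics several times a day, only the observation with the latest publication time is used. A simplified sketch of that policy; the map layout and names below are illustrative (the handler itself uses a SortedMap keyed by "bridge,date,time"):

    import java.util.Map;
    import java.util.TreeMap;

    /** Illustrative sketch of keeping the latest observation per bridge and day. */
    public class LatestObservationSketch {

      /* Keyed by "hashedBridgeIdentity,date"; values remember the publication
       * time so a later observation from the same day can replace an earlier one. */
      private Map<String, String[]> latestObs = new TreeMap<String, String[]>();

      void addObs(String hashedIdentity, String date, String time, String obsLine) {
        String key = hashedIdentity + "," + date;
        String[] existing = latestObs.get(key);
        if (existing == null || time.compareTo(existing[0]) > 0) {
          /* First observation for this bridge and day, or a later one: keep it. */
          latestObs.put(key, new String[] { time, obsLine });
        }
        /* Otherwise a later observation is already stored; drop this one. */
      }
    }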
diff --git a/src/org/torproject/ernie/cron/ConsensusStatsFileHandler.java b/src/org/torproject/ernie/cron/ConsensusStatsFileHandler.java new file mode 100644 index 0000000..4ad5300 --- /dev/null +++ b/src/org/torproject/ernie/cron/ConsensusStatsFileHandler.java @@ -0,0 +1,282 @@ +/* Copyright 2011 The Tor Project + * See LICENSE for licensing information */ +package org.torproject.ernie.cron; + +import java.io.*; +import java.sql.*; +import java.text.*; +import java.util.*; +import java.util.logging.*; + +/** + * Generates statistics on the average number of relays and bridges per + * day. Accepts parse results from <code>RelayDescriptorParser</code> and + * <code>BridgeDescriptorParser</code> and stores them in intermediate + * result files <code>stats/consensus-stats-raw</code> and + * <code>stats/bridge-consensus-stats-raw</code>. Writes final results to + * <code>stats/consensus-stats</code> for all days for which at least half + * of the expected consensuses or statuses are known. + */ +public class ConsensusStatsFileHandler { + + /** + * Intermediate results file holding the number of running bridges per + * bridge status. + */ + private File bridgeConsensusStatsRawFile; + + /** + * Number of running bridges in a given bridge status. Map keys are + * bridge status times formatted as "yyyy-MM-dd HH:mm:ss", map values + * are lines as read from <code>stats/bridge-consensus-stats-raw</code>. + */ + private SortedMap<String, String> bridgesRaw; + + /** + * Average number of running bridges per day. Map keys are dates + * formatted as "yyyy-MM-dd", map values are the last column as written + * to <code>stats/consensus-stats</code>. + */ + private SortedMap<String, String> bridgesPerDay; + + /** + * Logger for this class. + */ + private Logger logger; + + private int bridgeResultsAdded = 0; + + /* Database connection string. */ + private String connectionURL = null; + + /** + * Initializes this class, including reading in intermediate results + * files <code>stats/consensus-stats-raw</code> and + * <code>stats/bridge-consensus-stats-raw</code> and final results file + * <code>stats/consensus-stats</code>. + */ + public ConsensusStatsFileHandler(String connectionURL) { + + /* Initialize local data structures to hold intermediate and final + * results. */ + this.bridgesPerDay = new TreeMap<String, String>(); + this.bridgesRaw = new TreeMap<String, String>(); + + /* Initialize file names for intermediate and final results files. */ + this.bridgeConsensusStatsRawFile = new File( + "stats/bridge-consensus-stats-raw"); + + /* Initialize database connection string. */ + this.connectionURL = connectionURL; + + /* Initialize logger. */ + this.logger = Logger.getLogger( + ConsensusStatsFileHandler.class.getName()); + + /* Read in number of running bridges per bridge status. */ + if (this.bridgeConsensusStatsRawFile.exists()) { + try { + this.logger.fine("Reading file " + + this.bridgeConsensusStatsRawFile.getAbsolutePath() + "..."); + BufferedReader br = new BufferedReader(new FileReader( + this.bridgeConsensusStatsRawFile)); + String line = null; + while ((line = br.readLine()) != null) { + if (line.startsWith("date")) { + /* Skip headers. */ + continue; + } + String[] parts = line.split(","); + if (parts.length != 2) { + this.logger.warning("Corrupt line '" + line + "' in file " + + this.bridgeConsensusStatsRawFile.getAbsolutePath() + + "! 
Aborting to read this file!"); + break; + } + String dateTime = parts[0]; + this.bridgesRaw.put(dateTime, line); + } + br.close(); + this.logger.fine("Finished reading file " + + this.bridgeConsensusStatsRawFile.getAbsolutePath() + "."); + } catch (IOException e) { + this.logger.log(Level.WARNING, "Failed to read file " + + this.bridgeConsensusStatsRawFile.getAbsolutePath() + "!", + e); + } + } + } + + /** + * Adds the intermediate results of the number of running bridges in a + * given bridge status to the existing observations. + */ + public void addBridgeConsensusResults(String published, int running) { + String line = published + "," + running; + if (!this.bridgesRaw.containsKey(published)) { + this.logger.finer("Adding new bridge numbers: " + line); + this.bridgesRaw.put(published, line); + this.bridgeResultsAdded++; + } else if (!line.equals(this.bridgesRaw.get(published))) { + this.logger.warning("The numbers of running bridges we were just " + + "given (" + line + ") are different from what we learned " + + "before (" + this.bridgesRaw.get(published) + ")! " + + "Overwriting!"); + this.bridgesRaw.put(published, line); + } + } + + /** + * Aggregates the raw observations on relay and bridge numbers and + * writes both raw and aggregate observations to disk. + */ + public void writeFiles() { + + /* Did we learn anything new about average relay or bridge numbers in + * this run? */ + boolean writeConsensusStats = false; + + /* Go through raw observations of numbers of running bridges in bridge + * statuses, calculate averages per day, and add these averages to + * final results. */ + if (!this.bridgesRaw.isEmpty()) { + String tempDate = null; + int brunning = 0, statuses = 0; + Iterator<String> it = this.bridgesRaw.values().iterator(); + boolean haveWrittenFinalLine = false; + while (it.hasNext() || !haveWrittenFinalLine) { + String next = it.hasNext() ? it.next() : null; + /* Finished reading a day or even all lines? */ + if (tempDate != null && (next == null + || !next.substring(0, 10).equals(tempDate))) { + /* Only write results if we have seen at least half of all + * statuses. */ + if (statuses >= 24) { + String line = "," + (brunning / statuses); + /* Are our results new? */ + if (!this.bridgesPerDay.containsKey(tempDate)) { + this.logger.finer("Adding new average bridge numbers: " + + tempDate + line); + this.bridgesPerDay.put(tempDate, line); + writeConsensusStats = true; + } else if (!line.equals(this.bridgesPerDay.get(tempDate))) { + this.logger.finer("Replacing existing average bridge " + + "numbers (" + this.bridgesPerDay.get(tempDate) + + " with new numbers: " + line); + this.bridgesPerDay.put(tempDate, line); + writeConsensusStats = true; + } + } + brunning = statuses = 0; + haveWrittenFinalLine = (next == null); + } + /* Sum up number of running bridges. */ + if (next != null) { + tempDate = next.substring(0, 10); + statuses++; + brunning += Integer.parseInt(next.split(",")[1]); + } + } + } + + /* Write raw numbers of running bridges to disk. 
*/ + try { + this.logger.fine("Writing file " + + this.bridgeConsensusStatsRawFile.getAbsolutePath() + "..."); + this.bridgeConsensusStatsRawFile.getParentFile().mkdirs(); + BufferedWriter bw = new BufferedWriter( + new FileWriter(this.bridgeConsensusStatsRawFile)); + bw.append("datetime,brunning\n"); + for (String line : this.bridgesRaw.values()) { + bw.append(line + "\n"); + } + bw.close(); + this.logger.fine("Finished writing file " + + this.bridgeConsensusStatsRawFile.getAbsolutePath() + "."); + } catch (IOException e) { + this.logger.log(Level.WARNING, "Failed to write file " + + this.bridgeConsensusStatsRawFile.getAbsolutePath() + "!", + e); + } + + /* Add average number of bridges per day to the database. */ + if (connectionURL != null) { + try { + Map<String, String> insertRows = new HashMap<String, String>(), + updateRows = new HashMap<String, String>(); + insertRows.putAll(this.bridgesPerDay); + Connection conn = DriverManager.getConnection(connectionURL); + conn.setAutoCommit(false); + Statement statement = conn.createStatement(); + ResultSet rs = statement.executeQuery( + "SELECT date, avg_running FROM bridge_network_size"); + while (rs.next()) { + String date = rs.getDate(1).toString(); + if (insertRows.containsKey(date)) { + String insertRow = insertRows.remove(date); + long newAvgRunning = Long.parseLong(insertRow.substring(1)); + long oldAvgRunning = rs.getLong(2); + if (newAvgRunning != oldAvgRunning) { + updateRows.put(date, insertRow); + } + } + } + rs.close(); + PreparedStatement psU = conn.prepareStatement( + "UPDATE bridge_network_size SET avg_running = ? " + + "WHERE date = ?"); + for (Map.Entry<String, String> e : updateRows.entrySet()) { + java.sql.Date date = java.sql.Date.valueOf(e.getKey()); + long avgRunning = Long.parseLong(e.getValue().substring(1)); + psU.clearParameters(); + psU.setLong(1, avgRunning); + psU.setDate(2, date); + psU.executeUpdate(); + } + PreparedStatement psI = conn.prepareStatement( + "INSERT INTO bridge_network_size (avg_running, date) " + + "VALUES (?, ?)"); + for (Map.Entry<String, String> e : insertRows.entrySet()) { + java.sql.Date date = java.sql.Date.valueOf(e.getKey()); + long avgRunning = Long.parseLong(e.getValue().substring(1)); + psI.clearParameters(); + psI.setLong(1, avgRunning); + psI.setDate(2, date); + psI.executeUpdate(); + } + conn.commit(); + conn.close(); + } catch (SQLException e) { + logger.log(Level.WARNING, "Failed to add average bridge numbers " + + "to database.", e); + } + } + + /* Write stats. */ + StringBuilder dumpStats = new StringBuilder("Finished writing " + + "statistics on bridge network statuses to disk.\nAdded " + + this.bridgeResultsAdded + " bridge network status(es) in this " + + "execution."); + long now = System.currentTimeMillis(); + SimpleDateFormat dateTimeFormat = + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); + dateTimeFormat.setTimeZone(TimeZone.getTimeZone("UTC")); + if (this.bridgesRaw.isEmpty()) { + dumpStats.append("\nNo bridge status known yet."); + } else { + dumpStats.append("\nLast known bridge status was published " + + this.bridgesRaw.lastKey() + "."); + try { + if (now - 6L * 60L * 60L * 1000L > dateTimeFormat.parse( + this.bridgesRaw.lastKey()).getTime()) { + logger.warning("Last known bridge status is more than 6 hours " + + "old: " + this.bridgesRaw.lastKey()); + } + } catch (ParseException e) { + /* Can't parse the timestamp? Whatever. 
*/ + } + } + logger.info(dumpStats.toString()); + } +} + diff --git a/src/org/torproject/ernie/cron/Main.java b/src/org/torproject/ernie/cron/Main.java index 1a125bc..0551586 100644 --- a/src/org/torproject/ernie/cron/Main.java +++ b/src/org/torproject/ernie/cron/Main.java @@ -33,6 +33,11 @@ public class Main { // Define stats directory for temporary files File statsDirectory = new File("stats");
+ // Prepare bridge stats file handler + BridgeStatsFileHandler bsfh = config.getWriteBridgeStats() ? + new BridgeStatsFileHandler( + config.getRelayDescriptorDatabaseJDBC()) : null; + // Prepare consensus health checker ConsensusHealthChecker chc = config.getWriteConsensusHealth() ? new ConsensusHealthChecker() : null; @@ -50,7 +55,7 @@ public class Main { // Prepare relay descriptor parser (only if we are writing the // consensus-health page to disk) RelayDescriptorParser rdp = chc != null || rddi != null ? - new RelayDescriptorParser(chc, rddi) : null; + new RelayDescriptorParser(chc, rddi, bsfh) : null;
// Import relay descriptors if (rdp != null) { @@ -73,6 +78,33 @@ public class Main { chc = null; }
+ // Prepare consensus stats file handler (used for stats on running + // bridges only) + ConsensusStatsFileHandler csfh = config.getWriteBridgeStats() ? + new ConsensusStatsFileHandler( + config.getRelayDescriptorDatabaseJDBC()) : null; + + // Prepare bridge descriptor parser + BridgeDescriptorParser bdp = config.getWriteBridgeStats() ? + new BridgeDescriptorParser(csfh, bsfh) : null; + + // Import bridge descriptors + if (bdp != null && config.getImportSanitizedBridges()) { + new SanitizedBridgesReader(bdp, + new File(config.getSanitizedBridgesDirectory()), + statsDirectory, config.getKeepSanitizedBridgesImportHistory()); + } + + // Write updated stats files to disk + if (bsfh != null) { + bsfh.writeFiles(); + bsfh = null; + } + if (csfh != null) { + csfh.writeFiles(); + csfh = null; + } + // Remove lock file lf.releaseLock();
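Main.java above also wires in ConsensusStatsFileHandler, which averages the number of running bridges per day and only reports a day once at least 24 bridge statuses, half of the 48 expected per day, have been seen. A stripped-down sketch of that aggregation; method and variable names are illustrative:

    import java.util.Map;
    import java.util.SortedMap;
    import java.util.TreeMap;

    /** Illustrative sketch of averaging running bridges per day. */
    public class BridgeAverageSketch {

      /**
       * Input keys are status times "yyyy-MM-dd HH:mm:ss", values the number of
       * running bridges in that status; the result maps dates to daily averages.
       */
      static SortedMap<String, Integer> averagePerDay(
          SortedMap<String, Integer> runningPerStatus) {
        SortedMap<String, Integer> averages = new TreeMap<String, Integer>();
        SortedMap<String, int[]> sums = new TreeMap<String, int[]>(); /* date -> {sum, count} */
        for (Map.Entry<String, Integer> e : runningPerStatus.entrySet()) {
          String date = e.getKey().substring(0, 10);
          int[] sumAndCount = sums.get(date);
          if (sumAndCount == null) {
            sumAndCount = new int[2];
            sums.put(date, sumAndCount);
          }
          sumAndCount[0] += e.getValue();
          sumAndCount[1]++;
        }
        for (Map.Entry<String, int[]> e : sums.entrySet()) {
          /* Require at least half of the 48 expected statuses before averaging. */
          if (e.getValue()[1] >= 24) {
            averages.put(e.getKey(), e.getValue()[0] / e.getValue()[1]);
          }
        }
        return averages;
      }
    }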
diff --git a/src/org/torproject/ernie/cron/RelayDescriptorParser.java b/src/org/torproject/ernie/cron/RelayDescriptorParser.java index d02022a..a9a0d3d 100644 --- a/src/org/torproject/ernie/cron/RelayDescriptorParser.java +++ b/src/org/torproject/ernie/cron/RelayDescriptorParser.java @@ -18,6 +18,11 @@ import org.apache.commons.codec.binary.*; public class RelayDescriptorParser {
/** + * Stats file handler that accepts parse results for bridge statistics. + */ + private BridgeStatsFileHandler bsfh; + + /** * Relay descriptor database importer that stores relay descriptor * contents for later evaluation. */ @@ -36,9 +41,10 @@ public class RelayDescriptorParser { * Initializes this class. */ public RelayDescriptorParser(ConsensusHealthChecker chc, - RelayDescriptorDatabaseImporter rddi) { + RelayDescriptorDatabaseImporter rddi, BridgeStatsFileHandler bsfh) { this.chc = chc; this.rddi = rddi; + this.bsfh = bsfh;
/* Initialize logger. */ this.logger = Logger.getLogger(RelayDescriptorParser.class.getName()); @@ -76,6 +82,7 @@ public class RelayDescriptorParser { orPort = 0L, dirPort = 0L; SortedSet<String> relayFlags = null; StringBuilder rawStatusEntry = null; + SortedSet<String> hashedRelayIdentities = new TreeSet<String>(); while ((line = br.readLine()) != null) { if (line.equals("vote-status vote")) { isConsensus = false; @@ -111,6 +118,9 @@ public class RelayDescriptorParser { relayIdentity = Hex.encodeHexString( Base64.decodeBase64(parts[2] + "=")). toLowerCase(); + hashedRelayIdentities.add(DigestUtils.shaHex( + Base64.decodeBase64(parts[2] + "=")). + toUpperCase()); serverDesc = Hex.encodeHexString(Base64.decodeBase64( parts[3] + "=")).toLowerCase(); published = parseFormat.parse(parts[4] + " " + parts[5]). @@ -144,6 +154,11 @@ public class RelayDescriptorParser { } } if (isConsensus) { + if (this.bsfh != null) { + for (String hashedRelayIdentity : hashedRelayIdentities) { + this.bsfh.addHashedRelay(hashedRelayIdentity); + } + } if (this.chc != null) { this.chc.processConsensus(validAfterTime, data); } diff --git a/src/org/torproject/ernie/cron/SanitizedBridgesReader.java b/src/org/torproject/ernie/cron/SanitizedBridgesReader.java new file mode 100644 index 0000000..b7f1d44 --- /dev/null +++ b/src/org/torproject/ernie/cron/SanitizedBridgesReader.java @@ -0,0 +1,114 @@ +/* Copyright 2011 The Tor Project + * See LICENSE for licensing information */ +package org.torproject.ernie.cron; + +import java.io.*; +import java.util.*; +import java.util.logging.*; + +public class SanitizedBridgesReader { + public SanitizedBridgesReader(BridgeDescriptorParser bdp, + File bridgesDir, File statsDirectory, boolean keepImportHistory) { + + if (bdp == null || bridgesDir == null || statsDirectory == null) { + throw new IllegalArgumentException(); + } + + Logger logger = + Logger.getLogger(SanitizedBridgesReader.class.getName()); + SortedSet<String> bridgesImportHistory = new TreeSet<String>(); + File bridgesImportHistoryFile = + new File(statsDirectory, "bridges-import-history"); + if (keepImportHistory && bridgesImportHistoryFile.exists()) { + try { + BufferedReader br = new BufferedReader(new FileReader( + bridgesImportHistoryFile)); + String line = null; + while ((line = br.readLine()) != null) { + bridgesImportHistory.add(line); + } + br.close(); + } catch (IOException e) { + logger.log(Level.WARNING, "Could not read in bridge descriptor " + + "import history file. Skipping."); + } + } + if (bridgesDir.exists()) { + logger.fine("Importing files in directory " + bridgesDir + "/..."); + Stack<File> filesInInputDir = new Stack<File>(); + filesInInputDir.add(bridgesDir); + List<File> problems = new ArrayList<File>(); + while (!filesInInputDir.isEmpty()) { + File pop = filesInInputDir.pop(); + if (pop.isDirectory()) { + for (File f : pop.listFiles()) { + filesInInputDir.add(f); + } + continue; + } else if (keepImportHistory && bridgesImportHistory.contains( + pop.getName())) { + continue; + } else { + try { + BufferedInputStream bis = new BufferedInputStream( + new FileInputStream(pop)); + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + int len; + byte[] data = new byte[1024]; + while ((len = bis.read(data, 0, 1024)) >= 0) { + baos.write(data, 0, len); + } + bis.close(); + byte[] allData = baos.toByteArray(); + String fn = pop.getName(); + // TODO dateTime extraction doesn't work for sanitized network + // statuses! 
+ String dateTime = fn.substring(0, 4) + "-" + fn.substring(4, 6) + + "-" + fn.substring(6, 8) + " " + fn.substring(9, 11) + + ":" + fn.substring(11, 13) + ":" + fn.substring(13, 15); + bdp.parse(allData, dateTime, true); + if (keepImportHistory) { + bridgesImportHistory.add(pop.getName()); + } + } catch (IOException e) { + problems.add(pop); + if (problems.size() > 3) { + break; + } + } + } + } + if (problems.isEmpty()) { + logger.fine("Finished importing files in directory " + bridgesDir + + "/."); + } else { + StringBuilder sb = new StringBuilder("Failed importing files in " + + "directory " + bridgesDir + "/:"); + int printed = 0; + for (File f : problems) { + sb.append("\n " + f.getAbsolutePath()); + if (++printed >= 3) { + sb.append("\n ... more"); + break; + } + } + logger.warning(sb.toString()); + } + if (keepImportHistory) { + try { + bridgesImportHistoryFile.getParentFile().mkdirs(); + BufferedWriter bw = new BufferedWriter(new FileWriter( + bridgesImportHistoryFile)); + for (String line : bridgesImportHistory) { + bw.write(line + "\n"); + } + bw.close(); + } catch (IOException e) { + logger.log(Level.WARNING, "Could not write bridge descriptor " + + "import history file."); + } + } + } + } +} + diff --git a/src/org/torproject/ernie/test/SanitizedBridgesReaderTest.java b/src/org/torproject/ernie/test/SanitizedBridgesReaderTest.java new file mode 100644 index 0000000..6dd9132 --- /dev/null +++ b/src/org/torproject/ernie/test/SanitizedBridgesReaderTest.java @@ -0,0 +1,33 @@ +/* Copyright 2011 The Tor Project + * See LICENSE for licensing information */ +package org.torproject.ernie.test; + +import org.torproject.ernie.cron.*; + +import java.io.*; + +import org.junit.*; +import org.junit.rules.*; +import static org.junit.Assert.*; + +public class SanitizedBridgesReaderTest { + + private File tempSanitizedBridgesDirectory; + private File tempStatsDirectory; + + @Rule + public TemporaryFolder folder = new TemporaryFolder(); + + @Before + public void createTempDirectories() { + this.tempSanitizedBridgesDirectory = folder.newFolder("bridges"); + this.tempStatsDirectory = folder.newFolder("stats"); + } + + @Test(expected = IllegalArgumentException.class) + public void testBridgeDescriptorParserNull() { + new SanitizedBridgesReader(null, this.tempSanitizedBridgesDirectory, + this.tempStatsDirectory, false); + } +} +
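SanitizedBridgesReader above derives the descriptor timestamp from the import file's name, which is why the TODO notes that this does not yet work for sanitized network statuses. A small sketch of that extraction; the file name used here is hypothetical and only illustrates the yyyyMMdd-HHmmss-... pattern the reader assumes:

    /** Illustrative sketch of turning a sanitized bridge file name into a timestamp. */
    public class BridgeFileNameSketch {

      /** "20110302-123456-<fingerprint>" becomes "2011-03-02 12:34:56". */
      static String extractDateTime(String fileName) {
        return fileName.substring(0, 4) + "-" + fileName.substring(4, 6)
            + "-" + fileName.substring(6, 8) + " " + fileName.substring(9, 11)
            + ":" + fileName.substring(11, 13) + ":" + fileName.substring(13, 15);
      }

      public static void main(String[] args) {
        /* Hypothetical file name; real sanitized descriptor names may differ. */
        System.out.println(extractDateTime("20110302-123456-0123456789abcdef"));
      }
    }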