commit dac73329a421cb3b13978c56be88ad36abd1c159 Author: Karsten Loesing karsten.loesing@gmx.net Date: Tue Mar 1 16:40:55 2011 +0100
Import relay descriptors as part of metrics-web. --- build.xml | 1 + config.template | 14 + src/org/torproject/ernie/cron/Configuration.java | 27 + src/org/torproject/ernie/cron/Main.java | 19 +- .../cron/RelayDescriptorDatabaseImporter.java | 1213 ++++++++++++++++++++ .../ernie/cron/RelayDescriptorParser.java | 241 ++++- 6 files changed, 1509 insertions(+), 6 deletions(-)
diff --git a/build.xml b/build.xml index 86ce310..446572b 100644 --- a/build.xml +++ b/build.xml @@ -13,6 +13,7 @@ <path id="classpath"> <pathelement path="${classes}"/> <pathelement location="lib/commons-codec-1.4.jar"/> + <!-- PostgreSQL JDBC driver, needed for writing relay descriptors to the database --> + <pathelement location="lib/postgresql-8.4-702.jdbc3.jar"/> </path>
<target name="init"> diff --git a/config.template b/config.template index 479d78c..1d0701c 100644 --- a/config.template +++ b/config.template @@ -10,6 +10,20 @@ ## again, but it can be confusing to users who don't know about it. #KeepDirectoryArchiveImportHistory 0 # +## Write relay descriptors to the database +#WriteRelayDescriptorDatabase 0 +# +## JDBC connection string for the relay descriptor database +#RelayDescriptorDatabaseJDBC jdbc:postgresql://localhost/tordir?user=metrics&password=password +# +## Write relay descriptors to raw text files for importing them into the +## database using PostgreSQL's \copy command +#WriteRelayDescriptorsRawFiles 0 +# +## Relative path to the directory for raw text files; note that existing +## files will be overwritten! +#RelayDescriptorRawFilesDirectory pg-import/ +# ## Write statistics about the current consensus and votes to the ## website #WriteConsensusHealth 0 diff --git a/src/org/torproject/ernie/cron/Configuration.java b/src/org/torproject/ernie/cron/Configuration.java index 6b76dc7..66ad778 100644 --- a/src/org/torproject/ernie/cron/Configuration.java +++ b/src/org/torproject/ernie/cron/Configuration.java @@ -16,6 +16,11 @@ public class Configuration { private boolean importDirectoryArchives = false; private String directoryArchivesDirectory = "archives/"; private boolean keepDirectoryArchiveImportHistory = false; + /* Relay descriptor database import settings (see config.template). */ + private boolean writeRelayDescriptorDatabase = false; + private String relayDescriptorDatabaseJdbc = + "jdbc:postgresql://localhost/tordir?user=metrics&password=password"; + private boolean writeRelayDescriptorsRawFiles = false; + private String relayDescriptorRawFilesDirectory = "pg-import/"; private boolean writeConsensusHealth = false; public Configuration() {
@@ -42,6 +47,16 @@ public class Configuration { } else if (line.startsWith("KeepDirectoryArchiveImportHistory")) { this.keepDirectoryArchiveImportHistory = Integer.parseInt( line.split(" ")[1]) != 0; + } else if (line.startsWith("WriteRelayDescriptorDatabase")) { + this.writeRelayDescriptorDatabase = Integer.parseInt( + line.split(" ")[1]) != 0; + } else if (line.startsWith("RelayDescriptorDatabaseJDBC")) { + this.relayDescriptorDatabaseJdbc = line.split(" ")[1]; + } else if (line.startsWith("WriteRelayDescriptorsRawFiles")) { + this.writeRelayDescriptorsRawFiles = Integer.parseInt( + line.split(" ")[1]) != 0; + } else if (line.startsWith("RelayDescriptorRawFilesDirectory")) { + this.relayDescriptorRawFilesDirectory = line.split(" ")[1]; } else if (line.startsWith("WriteConsensusHealth")) { this.writeConsensusHealth = Integer.parseInt( line.split(" ")[1]) != 0; @@ -79,6 +94,18 @@ public class Configuration { public boolean getKeepDirectoryArchiveImportHistory() { return this.keepDirectoryArchiveImportHistory; } + /** Returns whether relay descriptors shall be imported into the database. */ + public boolean getWriteRelayDescriptorDatabase() { + return this.writeRelayDescriptorDatabase; + } + /** Returns the JDBC connection string of the relay descriptor database. */ + public String getRelayDescriptorDatabaseJDBC() { + return this.relayDescriptorDatabaseJdbc; + } + /** Returns whether relay descriptors shall be written to raw import files. */ + public boolean getWriteRelayDescriptorsRawFiles() { + return this.writeRelayDescriptorsRawFiles; + } + /** Returns the directory that raw import files are written to. */ + public String getRelayDescriptorRawFilesDirectory() { + return this.relayDescriptorRawFilesDirectory; + } public boolean getWriteConsensusHealth() { return this.writeConsensusHealth; } diff --git a/src/org/torproject/ernie/cron/Main.java b/src/org/torproject/ernie/cron/Main.java index b95a133..1a125bc 100644 --- a/src/org/torproject/ernie/cron/Main.java +++ b/src/org/torproject/ernie/cron/Main.java @@ -37,10 +37,20 @@ public class Main { ConsensusHealthChecker chc = config.getWriteConsensusHealth() ? new ConsensusHealthChecker() : null;
+ // Prepare writing relay descriptors to the database and/or to raw import files (only if at least one of the two is enabled) + RelayDescriptorDatabaseImporter rddi = + config.getWriteRelayDescriptorDatabase() || + config.getWriteRelayDescriptorsRawFiles() ? + new RelayDescriptorDatabaseImporter( + config.getWriteRelayDescriptorDatabase() ? + config.getRelayDescriptorDatabaseJDBC() : null, + config.getWriteRelayDescriptorsRawFiles() ? + config.getRelayDescriptorRawFilesDirectory() : null) : null; + // Prepare relay descriptor parser (only if we are writing the // consensus-health page to disk) - RelayDescriptorParser rdp = config.getWriteConsensusHealth() ? - new RelayDescriptorParser(chc) : null; + RelayDescriptorParser rdp = chc != null || rddi != null ? + new RelayDescriptorParser(chc, rddi) : null;
// Import relay descriptors if (rdp != null) { @@ -52,6 +62,11 @@ public class Main { } }
+ // Close database connection (if active) + if (rddi != null) { + rddi.closeConnection(); + } + // Write consensus health website if (chc != null) { chc.writeStatusWebsite(); diff --git a/src/org/torproject/ernie/cron/RelayDescriptorDatabaseImporter.java b/src/org/torproject/ernie/cron/RelayDescriptorDatabaseImporter.java new file mode 100644 index 0000000..f8f1e9a --- /dev/null +++ b/src/org/torproject/ernie/cron/RelayDescriptorDatabaseImporter.java @@ -0,0 +1,1213 @@ +/* Copyright 2011 The Tor Project + * See LICENSE for licensing information */ +package org.torproject.ernie.cron; + +import java.io.*; +import java.sql.*; +import java.text.*; +import java.util.*; +import java.util.logging.*; +import org.postgresql.util.*; + +/** + * Import relay descriptors (network status consensuses and their + * entries, votes, server descriptors, extra-info descriptors with their + * bandwidth histories, conn-bi-direct and dirreq statistics) into a + * PostgreSQL database via JDBC and/or into raw text files for + * PostgreSQL's COPY command. + */ + +public final class RelayDescriptorDatabaseImporter { + + /** + * How many records to commit with each database transaction. + */ + private final long autoCommitCount = 500; + + /** + * Number of records written so far, per record type; used to commit + * the transaction (or, for bandwidth histories, to execute the batch) + * after every <code>autoCommitCount</code> records. + */ + private int rdsCount = 0; + private int resCount = 0; + private int rhsCount = 0; + private int rrsCount = 0; + private int rcsCount = 0; + private int rvsCount = 0; + private int rbsCount = 0; + private int rqsCount = 0; + + /** + * Relay descriptor database connection. + */ + private Connection conn; + + /** + * Prepared statement to check whether any network status consensus + * entries matching a given valid-after time have been imported into the + * database before. + */ + private PreparedStatement psSs; + + /** + * Prepared statement to check whether a given network status consensus + * entry has been imported into the database before. + */ + private PreparedStatement psRs; + + /** + * Prepared statement to check whether a given extra-info descriptor has + * been imported into the database before.
+ */ + private PreparedStatement psEs; + + /** + * Prepared statement to check whether a given server descriptor has + * been imported into the database before. + */ + private PreparedStatement psDs; + + /** + * Prepared statement to check whether a given network status consensus + * has been imported into the database before. + */ + private PreparedStatement psCs; + + /** + * Prepared statement to check whether a given network status vote has + * been imported into the database before. + */ + private PreparedStatement psVs; + + /** + * Prepared statement to check whether a given conn-bi-direct stats + * string has been imported into the database before. + */ + private PreparedStatement psBs; + + /** + * Prepared statement to check whether a given dirreq stats string has + * been imported into the database before. + */ + private PreparedStatement psQs; + + /** + * Set of dates that have been inserted into the database for being + * included in the next refresh run. + */ + private Set<Long> scheduledUpdates; + + /** + * Prepared statement to insert a date into the database that shall be + * included in the next refresh run. + */ + private PreparedStatement psU; + + /** + * Prepared statement to insert a network status consensus entry into + * the database. + */ + private PreparedStatement psR; + + /** + * Prepared statement to insert a server descriptor into the database. + */ + private PreparedStatement psD; + + /** + * Prepared statement to insert an extra-info descriptor into the + * database. + */ + private PreparedStatement psE; + + /** + * Callable statement to insert the bandwidth history of an extra-info + * descriptor into the database. + */ + private CallableStatement csH; + + /** + * Prepared statement to insert a network status consensus into the + * database. + */ + private PreparedStatement psC; + + /** + * Prepared statement to insert a network status vote into the + * database.
+ */ + private PreparedStatement psV; + + /** + * Prepared statement to insert a conn-bi-direct stats string into the + * database. + */ + private PreparedStatement psB; + + /** + * Prepared statement to insert a given dirreq stats string into the + * database. + */ + private PreparedStatement psQ; + + /** + * Logger for this class. + */ + private Logger logger; + + /** + * Directory for writing raw import files. + */ + private String rawFilesDirectory; + + /** + * Raw import file containing status entries. + */ + private BufferedWriter statusentryOut; + + /** + * Raw import file containing server descriptors. + */ + private BufferedWriter descriptorOut; + + /** + * Raw import file containing extra-info descriptors. + */ + private BufferedWriter extrainfoOut; + + /** + * Raw import file containing bandwidth histories. + */ + private BufferedWriter bwhistOut; + + /** + * Raw import file containing consensuses. + */ + private BufferedWriter consensusOut; + + /** + * Raw import file containing votes. + */ + private BufferedWriter voteOut; + + /** + * Raw import file containing conn-bi-direct stats strings. + */ + private BufferedWriter connBiDirectOut; + + /** + * Raw import file containing dirreq stats. + */ + private BufferedWriter dirReqOut; + + /** + * Date format to parse timestamps. + */ + private SimpleDateFormat dateTimeFormat; + + /** + * The last valid-after time for which we checked whether there have been + * any network status entries in the database. + */ + private long lastCheckedStatusEntries; + + /** + * Set of fingerprints that we imported for the valid-after time in + * <code>lastCheckedStatusEntries</code>. + */ + private Set<String> insertedStatusEntries; + + /** + * Flag that tells us whether we need to check whether a network status + * entry is already contained in the database or not.
+ */ + private boolean separateStatusEntryCheckNecessary; + + private boolean importIntoDatabase; + private boolean writeRawImportFiles; + + /** + * Initialize database importer by connecting to the database and + * preparing statements. + */ + public RelayDescriptorDatabaseImporter(String connectionURL, + String rawFilesDirectory) { + + /* Initialize logger. */ + this.logger = Logger.getLogger( + RelayDescriptorDatabaseImporter.class.getName()); + + if (connectionURL != null) { + try { + /* Connect to database. */ + this.conn = DriverManager.getConnection(connectionURL); + + /* Turn autocommit off */ + this.conn.setAutoCommit(false); + + /* Prepare statements. */ + this.psSs = conn.prepareStatement("SELECT COUNT(*) " + + "FROM statusentry WHERE validafter = ?"); + this.psRs = conn.prepareStatement("SELECT COUNT(*) " + + "FROM statusentry WHERE validafter = ? AND descriptor = ?"); + this.psDs = conn.prepareStatement("SELECT COUNT(*) " + + "FROM descriptor WHERE descriptor = ?"); + this.psEs = conn.prepareStatement("SELECT COUNT(*) " + + "FROM extrainfo WHERE extrainfo = ?"); + this.psCs = conn.prepareStatement("SELECT COUNT(*) " + + "FROM consensus WHERE validafter = ?"); + this.psVs = conn.prepareStatement("SELECT COUNT(*) " + + "FROM vote WHERE validafter = ? AND dirsource = ?"); + this.psBs = conn.prepareStatement("SELECT COUNT(*) " + + "FROM connbidirect WHERE source = ? AND statsend = ?"); + this.psQs = conn.prepareStatement("SELECT COUNT(*) " + + "FROM dirreq_stats WHERE source = ? 
AND statsend = ?"); + this.psR = conn.prepareStatement("INSERT INTO statusentry " + + "(validafter, nickname, fingerprint, descriptor, " + + "published, address, orport, dirport, isauthority, " + + "isbadexit, isbaddirectory, isexit, isfast, isguard, " + + "ishsdir, isnamed, isstable, isrunning, isunnamed, " + + "isvalid, isv2dir, isv3dir, version, bandwidth, ports, " + + "rawdesc) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, " + + "?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"); + this.psD = conn.prepareStatement("INSERT INTO descriptor " + + "(descriptor, nickname, address, orport, dirport, " + + "fingerprint, bandwidthavg, bandwidthburst, " + + "bandwidthobserved, platform, published, uptime, " + + "extrainfo, rawdesc) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, " + + "?, ?, ?, ?)"); + this.psE = conn.prepareStatement("INSERT INTO extrainfo " + + "(extrainfo, nickname, fingerprint, published, rawdesc) " + + "VALUES (?, ?, ?, ?, ?)"); + this.csH = conn.prepareCall("{call insert_bwhist(?, ?, ?, ?, ?, " + + "?)}"); + this.psC = conn.prepareStatement("INSERT INTO consensus " + + "(validafter, rawdesc) VALUES (?, ?)"); + this.psV = conn.prepareStatement("INSERT INTO vote " + + "(validafter, dirsource, rawdesc) VALUES (?, ?, ?)"); + this.psB = conn.prepareStatement("INSERT INTO connbidirect " + + "(source, statsend, seconds, belownum, readnum, writenum, " + + "bothnum) VALUES (?, ?, ?, ?, ?, ?, ?)"); + this.psQ = conn.prepareStatement("INSERT INTO dirreq_stats " + + "(source, statsend, seconds, country, requests) VALUES " + + "(?, ?, ?, ?, ?)"); + this.psU = conn.prepareStatement("INSERT INTO scheduled_updates " + + "(date) VALUES (?)"); + this.scheduledUpdates = new HashSet<Long>(); + this.importIntoDatabase = true; + } catch (SQLException e) { + this.logger.log(Level.WARNING, "Could not connect to database or " + + "prepare statements.", e); + } + + /* Initialize set of fingerprints to remember which status entries + * we already imported. 
*/ + this.insertedStatusEntries = new HashSet<String>(); + } + + /* Remember where we want to write raw import files. */ + if (rawFilesDirectory != null) { + this.rawFilesDirectory = rawFilesDirectory; + this.writeRawImportFiles = true; + } + + /* Initialize date format, so that we can format timestamps. */ + this.dateTimeFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); + this.dateTimeFormat.setTimeZone(TimeZone.getTimeZone("UTC")); + } + + private void addDateToScheduledUpdates(long timestamp) + throws SQLException { + if (!this.importIntoDatabase) { + return; + } + long dateMillis = 0L; + try { + dateMillis = this.dateTimeFormat.parse( + this.dateTimeFormat.format(timestamp).substring(0, 10) + + " 00:00:00").getTime(); + } catch (ParseException e) { + this.logger.log(Level.WARNING, "Internal parsing error.", e); + return; + } + if (!this.scheduledUpdates.contains(dateMillis)) { + this.psU.setDate(1, new java.sql.Date(dateMillis)); + this.psU.execute(); + this.scheduledUpdates.add(dateMillis); + } + } + + /** + * Insert network status consensus entry into database. 
+   *
+   * <p>If database import is enabled, the entry is inserted unless a
+   * lookup finds an entry with the same valid-after time and
+   * <code>descriptor</code> value. The lookup is skipped if the database
+   * had no entries for this valid-after time and this fingerprint has
+   * not been seen yet for it. If raw import files are enabled, the entry
+   * is also appended to <code>statusentry.sql</code> in COPY text
+   * format.</p>
+   *
+   * @param validAfter valid-after time in milliseconds since the epoch
+   * @param published publication time in milliseconds since the epoch
+   * @param flags relay flags; each known flag becomes a boolean column
+   * @param version relay version, or <code>null</code>
+   * @param bandwidth relay bandwidth; negative values are written as NULL
+   *     to the raw import file
+   * @param ports ports summary, or <code>null</code>
+   * @param rawDescriptor raw bytes of the status entry
+   */
+  public void addStatusEntry(long validAfter, String nickname,
+      String fingerprint, String descriptor, long published,
+      String address, long orPort, long dirPort,
+      SortedSet<String> flags, String version, long bandwidth,
+      String ports, byte[] rawDescriptor) {
+    if (this.importIntoDatabase) {
+      try {
+        this.addDateToScheduledUpdates(validAfter);
+        Calendar cal = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
+        Timestamp validAfterTimestamp = new Timestamp(validAfter);
+        if (lastCheckedStatusEntries != validAfter) {
+          /* New valid-after time: per-entry lookups can be skipped only
+           * if the database has no entries for it yet. */
+          this.psSs.setTimestamp(1, validAfterTimestamp, cal);
+          ResultSet rs = psSs.executeQuery();
+          rs.next();
+          if (rs.getInt(1) == 0) {
+            separateStatusEntryCheckNecessary = false;
+            insertedStatusEntries.clear();
+          } else {
+            separateStatusEntryCheckNecessary = true;
+          }
+          rs.close();
+          lastCheckedStatusEntries = validAfter;
+        }
+        boolean alreadyContained = false;
+        if (separateStatusEntryCheckNecessary ||
+            insertedStatusEntries.contains(fingerprint)) {
+          this.psRs.setTimestamp(1, validAfterTimestamp, cal);
+          this.psRs.setString(2, descriptor);
+          ResultSet rs = psRs.executeQuery();
+          rs.next();
+          if (rs.getInt(1) > 0) {
+            alreadyContained = true;
+          }
+          rs.close();
+        } else {
+          insertedStatusEntries.add(fingerprint);
+        }
+        if (!alreadyContained) {
+          this.psR.clearParameters();
+          this.psR.setTimestamp(1, validAfterTimestamp, cal);
+          this.psR.setString(2, nickname);
+          this.psR.setString(3, fingerprint);
+          this.psR.setString(4, descriptor);
+          this.psR.setTimestamp(5, new Timestamp(published), cal);
+          this.psR.setString(6, address);
+          this.psR.setLong(7, orPort);
+          this.psR.setLong(8, dirPort);
+          this.psR.setBoolean(9, flags.contains("Authority"));
+          this.psR.setBoolean(10, flags.contains("BadExit"));
+          this.psR.setBoolean(11, flags.contains("BadDirectory"));
+          this.psR.setBoolean(12, flags.contains("Exit"));
+          this.psR.setBoolean(13, flags.contains("Fast"));
+          this.psR.setBoolean(14, flags.contains("Guard"));
+          this.psR.setBoolean(15, flags.contains("HSDir"));
+          this.psR.setBoolean(16, flags.contains("Named"));
+          this.psR.setBoolean(17, flags.contains("Stable"));
+          this.psR.setBoolean(18, flags.contains("Running"));
+          this.psR.setBoolean(19, flags.contains("Unnamed"));
+          this.psR.setBoolean(20, flags.contains("Valid"));
+          this.psR.setBoolean(21, flags.contains("V2Dir"));
+          this.psR.setBoolean(22, flags.contains("V3Dir"));
+          this.psR.setString(23, version);
+          this.psR.setLong(24, bandwidth);
+          this.psR.setString(25, ports);
+          this.psR.setBytes(26, rawDescriptor);
+          this.psR.executeUpdate();
+          rrsCount++;
+          if (rrsCount % autoCommitCount == 0) {
+            this.conn.commit();
+          }
+        }
+      } catch (SQLException e) {
+        this.logger.log(Level.WARNING, "Could not add network status " +
+            "consensus entry. We won't make any further SQL requests " +
+            "in this execution.", e);
+        this.importIntoDatabase = false;
+      }
+    }
+    if (this.writeRawImportFiles) {
+      try {
+        if (this.statusentryOut == null) {
+          new File(rawFilesDirectory).mkdirs();
+          this.statusentryOut = new BufferedWriter(new FileWriter(
+              rawFilesDirectory + "/statusentry.sql"));
+          this.statusentryOut.write(" COPY statusentry (validafter, " +
+              "nickname, fingerprint, descriptor, published, address, " +
+              "orport, dirport, isauthority, isbadExit, " +
+              "isbaddirectory, isexit, isfast, isguard, ishsdir, " +
+              "isnamed, isstable, isrunning, isunnamed, isvalid, " +
+              "isv2dir, isv3dir, version, bandwidth, ports, rawdesc) " +
+              "FROM stdin;\n");
+        }
+        /* "\\N" is the NULL marker of COPY's text format. */
+        this.statusentryOut.write(
+            this.dateTimeFormat.format(validAfter) + "\t" + nickname +
+            "\t" + fingerprint.toLowerCase() + "\t" +
+            descriptor.toLowerCase() + "\t" +
+            this.dateTimeFormat.format(published) + "\t" + address +
+            "\t" + orPort + "\t" + dirPort + "\t" +
+            (flags.contains("Authority") ? "t" : "f") + "\t" +
+            (flags.contains("BadExit") ? "t" : "f") + "\t" +
+            (flags.contains("BadDirectory") ? "t" : "f") + "\t" +
+            (flags.contains("Exit") ? "t" : "f") + "\t" +
+            (flags.contains("Fast") ? "t" : "f") + "\t" +
+            (flags.contains("Guard") ? "t" : "f") + "\t" +
+            (flags.contains("HSDir") ? "t" : "f") + "\t" +
+            (flags.contains("Named") ? "t" : "f") + "\t" +
+            (flags.contains("Stable") ? "t" : "f") + "\t" +
+            (flags.contains("Running") ? "t" : "f") + "\t" +
+            (flags.contains("Unnamed") ? "t" : "f") + "\t" +
+            (flags.contains("Valid") ? "t" : "f") + "\t" +
+            (flags.contains("V2Dir") ? "t" : "f") + "\t" +
+            (flags.contains("V3Dir") ? "t" : "f") + "\t" +
+            (version != null ? version : "\\N") + "\t" +
+            (bandwidth >= 0 ? bandwidth : "\\N") + "\t" +
+            (ports != null ? ports : "\\N") + "\t");
+        /* COPY's text format treats backslashes as escapes, so every
+         * backslash in the bytea-escaped string is doubled. A literal
+         * replace() avoids regex escaping pitfalls. */
+        this.statusentryOut.write(PGbytea.toPGString(rawDescriptor).
+            replace("\\", "\\\\") + "\n");
+      } catch (SQLException e) {
+        this.logger.log(Level.WARNING, "Could not write network status " +
+            "consensus entry to raw database import file. We won't " +
+            "make any further attempts to write raw import files in " +
+            "this execution.", e);
+        this.writeRawImportFiles = false;
+      } catch (IOException e) {
+        this.logger.log(Level.WARNING, "Could not write network status " +
+            "consensus entry to raw database import file. We won't " +
+            "make any further attempts to write raw import files in " +
+            "this execution.", e);
+        this.writeRawImportFiles = false;
+      }
+    }
+  }
+
+  /**
+   * Insert server descriptor into database.
+ */ + public void addServerDescriptor(String descriptor, String nickname, + String address, int orPort, int dirPort, String relayIdentifier, + long bandwidthAvg, long bandwidthBurst, long bandwidthObserved, + String platform, long published, long uptime, + String extraInfoDigest, byte[] rawDescriptor) { + if (this.importIntoDatabase) { + try { + this.addDateToScheduledUpdates(published); + this.addDateToScheduledUpdates( + published + 24L * 60L * 60L * 1000L); + Calendar cal = Calendar.getInstance(TimeZone.getTimeZone("UTC")); + this.psDs.setString(1, descriptor); + ResultSet rs = psDs.executeQuery(); + rs.next(); + if (rs.getInt(1) == 0) { + this.psD.clearParameters(); + this.psD.setString(1, descriptor); + this.psD.setString(2, nickname); + this.psD.setString(3, address); + this.psD.setInt(4, orPort); + this.psD.setInt(5, dirPort); + this.psD.setString(6, relayIdentifier); + this.psD.setLong(7, bandwidthAvg); + this.psD.setLong(8, bandwidthBurst); + this.psD.setLong(9, bandwidthObserved); + this.psD.setString(10, new String(platform.getBytes(), + "US-ASCII")); + this.psD.setTimestamp(11, new Timestamp(published), cal); + this.psD.setLong(12, uptime); + this.psD.setString(13, extraInfoDigest); + this.psD.setBytes(14, rawDescriptor); + this.psD.executeUpdate(); + rdsCount++; + if (rdsCount % autoCommitCount == 0) { + this.conn.commit(); + } + } + } catch (UnsupportedEncodingException e) { + // US-ASCII is supported for sure + } catch (SQLException e) { + this.logger.log(Level.WARNING, "Could not add server " + + "descriptor. 
We won't make any further SQL requests in " + + "this execution.", e); + this.importIntoDatabase = false; + } + } + if (this.writeRawImportFiles) { + try { + if (this.descriptorOut == null) { + new File(rawFilesDirectory).mkdirs(); + this.descriptorOut = new BufferedWriter(new FileWriter( + rawFilesDirectory + "/descriptor.sql")); + this.descriptorOut.write(" COPY descriptor (descriptor, " + + "nickname, address, orport, dirport, fingerprint, " + + "bandwidthavg, bandwidthburst, bandwidthobserved, " + + "platform, published, uptime, extrainfo, rawdesc) FROM " + + "stdin;\n"); + } + this.descriptorOut.write(descriptor.toLowerCase() + "\t" + + nickname + "\t" + address + "\t" + orPort + "\t" + dirPort + + "\t" + relayIdentifier + "\t" + bandwidthAvg + "\t" + + bandwidthBurst + "\t" + bandwidthObserved + "\t" + + (platform != null && platform.length() > 0 + ? new String(platform.getBytes(), "US-ASCII") : "\N") + + "\t" + this.dateTimeFormat.format(published) + "\t" + + (uptime >= 0 ? uptime : "\N") + "\t" + + (extraInfoDigest != null ? extraInfoDigest : "\N") + + "\t"); + this.descriptorOut.write(PGbytea.toPGString(rawDescriptor). + replaceAll("\\", "\\\\") + "\n"); + } catch (UnsupportedEncodingException e) { + // US-ASCII is supported for sure + } catch (SQLException e) { + this.logger.log(Level.WARNING, "Could not write server " + + "descriptor to raw database import file. We won't make " + + "any further attempts to write raw import files in this " + + "execution.", e); + this.writeRawImportFiles = false; + } catch (IOException e) { + this.logger.log(Level.WARNING, "Could not write server " + + "descriptor to raw database import file. We won't make " + + "any further attempts to write raw import files in this " + + "execution.", e); + this.writeRawImportFiles = false; + } + } + } + + /** + * Insert extra-info descriptor into database. 
+ */ + public void addExtraInfoDescriptor(String extraInfoDigest, + String nickname, String fingerprint, long published, + byte[] rawDescriptor, List<String> bandwidthHistoryLines) { + if (this.importIntoDatabase) { + try { + Calendar cal = Calendar.getInstance(TimeZone.getTimeZone("UTC")); + this.psEs.setString(1, extraInfoDigest); + ResultSet rs = psEs.executeQuery(); + rs.next(); + if (rs.getInt(1) == 0) { + this.psE.clearParameters(); + this.psE.setString(1, extraInfoDigest); + this.psE.setString(2, nickname); + this.psE.setString(3, fingerprint); + this.psE.setTimestamp(4, new Timestamp(published), cal); + this.psE.setBytes(5, rawDescriptor); + this.psE.executeUpdate(); + resCount++; + if (resCount % autoCommitCount == 0) { + this.conn.commit(); + } + } + } catch (SQLException e) { + this.logger.log(Level.WARNING, "Could not add extra-info " + + "descriptor. We won't make any further SQL requests in " + + "this execution.", e); + this.importIntoDatabase = false; + } + } + if (this.writeRawImportFiles) { + try { + if (this.extrainfoOut == null) { + new File(rawFilesDirectory).mkdirs(); + this.extrainfoOut = new BufferedWriter(new FileWriter( + rawFilesDirectory + "/extrainfo.sql")); + this.extrainfoOut.write(" COPY extrainfo (extrainfo, nickname, " + + "fingerprint, published, rawdesc) FROM stdin;\n"); + } + this.extrainfoOut.write(extraInfoDigest.toLowerCase() + "\t" + + nickname + "\t" + fingerprint.toLowerCase() + "\t" + + this.dateTimeFormat.format(published) + "\t"); + this.extrainfoOut.write(PGbytea.toPGString(rawDescriptor). + replaceAll("\\", "\\\\") + "\n"); + } catch (IOException e) { + this.logger.log(Level.WARNING, "Could not write extra-info " + + "descriptor to raw database import file. 
We won't make " + + "any further attempts to write raw import files in this " + + "execution.", e); + this.writeRawImportFiles = false; + } catch (SQLException e) { + this.logger.log(Level.WARNING, "Could not write extra-info " + + "descriptor to raw database import file. We won't make " + + "any further attempts to write raw import files in this " + + "execution.", e); + this.writeRawImportFiles = false; + } + } + if (!bandwidthHistoryLines.isEmpty()) { + this.addBandwidthHistory(fingerprint.toLowerCase(), published, + bandwidthHistoryLines); + } + } + + private static class BigIntArray implements java.sql.Array { + + private final String stringValue; + + public BigIntArray(long[] array, int offset) { + if (array == null) { + this.stringValue = "[-1:-1]={0}"; + } else { + StringBuilder sb = new StringBuilder("[" + offset + ":" + + (offset + array.length - 1) + "]={"); + for (int i = 0; i < array.length; i++) { + sb.append((i > 0 ? "," : "") + array[i]); + } + sb.append('}'); + this.stringValue = sb.toString(); + } + } + + public String toString() { + return stringValue; + } + + public String getBaseTypeName() { + return "int8"; + } + + /* The other methods are never called; no need to implement them. 
*/ + public void free() { + throw new UnsupportedOperationException(); + } + public Object getArray() { + throw new UnsupportedOperationException(); + } + public Object getArray(long index, int count) { + throw new UnsupportedOperationException(); + } + public Object getArray(long index, int count, + Map<String, Class<?>> map) { + throw new UnsupportedOperationException(); + } + public Object getArray(Map<String, Class<?>> map) { + throw new UnsupportedOperationException(); + } + public int getBaseType() { + throw new UnsupportedOperationException(); + } + public ResultSet getResultSet() { + throw new UnsupportedOperationException(); + } + public ResultSet getResultSet(long index, int count) { + throw new UnsupportedOperationException(); + } + public ResultSet getResultSet(long index, int count, + Map<String, Class<?>> map) { + throw new UnsupportedOperationException(); + } + public ResultSet getResultSet(Map<String, Class<?>> map) { + throw new UnsupportedOperationException(); + } + } + + public void addBandwidthHistory(String fingerprint, long published, + List<String> bandwidthHistoryStrings) { + + /* Split history lines by date and rewrite them so that the date + * comes first. */ + SortedSet<String> historyLinesByDate = new TreeSet<String>(); + for (String bandwidthHistoryString : bandwidthHistoryStrings) { + String[] parts = bandwidthHistoryString.split(" "); + if (parts.length != 6) { + this.logger.finer("Bandwidth history line does not have expected " + + "number of elements. Ignoring this line."); + continue; + } + long intervalLength = 0L; + try { + intervalLength = Long.parseLong(parts[3].substring(1)); + } catch (NumberFormatException e) { + this.logger.fine("Bandwidth history line does not have valid " + + "interval length '" + parts[3] + " " + parts[4] + "'. " + + "Ignoring this line."); + continue; + } + if (intervalLength != 900L) { + this.logger.fine("Bandwidth history line does not consist of " + + "15-minute intervals. 
Ignoring this line."); + continue; + } + String type = parts[0]; + String intervalEndTime = parts[1] + " " + parts[2]; + long intervalEnd, dateStart; + try { + intervalEnd = dateTimeFormat.parse(intervalEndTime).getTime(); + dateStart = dateTimeFormat.parse(parts[1] + " 00:00:00"). + getTime(); + } catch (ParseException e) { + this.logger.fine("Parse exception while parsing timestamp in " + + "bandwidth history line. Ignoring this line."); + continue; + } + if (Math.abs(published - intervalEnd) > + 7L * 24L * 60L * 60L * 1000L) { + this.logger.fine("Extra-info descriptor publication time " + + dateTimeFormat.format(published) + " and last interval " + + "time " + intervalEndTime + " in " + type + " line differ " + + "by more than 7 days! Not adding this line!"); + continue; + } + long currentIntervalEnd = intervalEnd; + StringBuilder sb = new StringBuilder(); + String[] values = parts[5].split(","); + SortedSet<String> newHistoryLines = new TreeSet<String>(); + try { + for (int i = values.length - 1; i >= -1; i--) { + if (i == -1 || currentIntervalEnd < dateStart) { + sb.insert(0, intervalEndTime + " " + type + " (" + + intervalLength + " s) "); + sb.setLength(sb.length() - 1); + String historyLine = sb.toString(); + newHistoryLines.add(historyLine); + sb = new StringBuilder(); + dateStart -= 24L * 60L * 60L * 1000L; + intervalEndTime = dateTimeFormat.format(currentIntervalEnd); + } + if (i == -1) { + break; + } + Long.parseLong(values[i]); + sb.insert(0, values[i] + ","); + currentIntervalEnd -= intervalLength * 1000L; + } + } catch (NumberFormatException e) { + this.logger.fine("Number format exception while parsing " + + "bandwidth history line. Ignoring this line."); + continue; + } + historyLinesByDate.addAll(newHistoryLines); + } + + /* Add split history lines to database. 
*/ + String lastDate = null; + historyLinesByDate.add("EOL"); + long[] readArray = null, writtenArray = null, dirreadArray = null, + dirwrittenArray = null; + int readOffset = 0, writtenOffset = 0, dirreadOffset = 0, + dirwrittenOffset = 0; + for (String historyLine : historyLinesByDate) { + String[] parts = historyLine.split(" "); + String currentDate = parts[0]; + if (lastDate != null && (historyLine.equals("EOL") || + !currentDate.equals(lastDate))) { + BigIntArray readIntArray = new BigIntArray(readArray, + readOffset); + BigIntArray writtenIntArray = new BigIntArray(writtenArray, + writtenOffset); + BigIntArray dirreadIntArray = new BigIntArray(dirreadArray, + dirreadOffset); + BigIntArray dirwrittenIntArray = new BigIntArray(dirwrittenArray, + dirwrittenOffset); + if (this.importIntoDatabase) { + try { + long dateMillis = dateTimeFormat.parse(lastDate + + " 00:00:00").getTime(); + this.addDateToScheduledUpdates(dateMillis); + this.csH.setString(1, fingerprint); + this.csH.setDate(2, new java.sql.Date(dateMillis)); + this.csH.setArray(3, readIntArray); + this.csH.setArray(4, writtenIntArray); + this.csH.setArray(5, dirreadIntArray); + this.csH.setArray(6, dirwrittenIntArray); + this.csH.addBatch(); + rhsCount++; + if (rhsCount % autoCommitCount == 0) { + this.csH.executeBatch(); + } + } catch (SQLException e) { + this.logger.log(Level.WARNING, "Could not insert bandwidth " + + "history line into database. We won't make any " + + "further SQL requests in this execution.", e); + this.importIntoDatabase = false; + } catch (ParseException e) { + this.logger.log(Level.WARNING, "Could not insert bandwidth " + + "history line into database. 
We won't make any " + + "further SQL requests in this execution.", e); + this.importIntoDatabase = false; + } + } + if (this.writeRawImportFiles) { + try { + if (this.bwhistOut == null) { + new File(rawFilesDirectory).mkdirs(); + this.bwhistOut = new BufferedWriter(new FileWriter( + rawFilesDirectory + "/bwhist.sql")); + } + this.bwhistOut.write("SELECT insert_bwhist('" + fingerprint + + "','" + lastDate + "','" + readIntArray.toString() + + "','" + writtenIntArray.toString() + "','" + + dirreadIntArray.toString() + "','" + + dirwrittenIntArray.toString() + "');\n"); + } catch (IOException e) { + this.logger.log(Level.WARNING, "Could not write bandwidth " + + "history to raw database import file. We won't make " + + "any further attempts to write raw import files in " + + "this execution.", e); + this.writeRawImportFiles = false; + } + } + readArray = writtenArray = dirreadArray = dirwrittenArray = null; + } + if (historyLine.equals("EOL")) { + break; + } + long lastIntervalTime; + try { + lastIntervalTime = dateTimeFormat.parse(parts[0] + " " + + parts[1]).getTime() - dateTimeFormat.parse(parts[0] + + " 00:00:00").getTime(); + } catch (ParseException e) { + continue; + } + String[] stringValues = parts[5].split(","); + long[] longValues = new long[stringValues.length]; + for (int i = 0; i < longValues.length; i++) { + longValues[i] = Long.parseLong(stringValues[i]); + } + + int offset = (int) (lastIntervalTime / (15L * 60L * 1000L)) + - longValues.length + 1; + String type = parts[2]; + if (type.equals("read-history")) { + readArray = longValues; + readOffset = offset; + } else if (type.equals("write-history")) { + writtenArray = longValues; + writtenOffset = offset; + } else if (type.equals("dirreq-read-history")) { + dirreadArray = longValues; + dirreadOffset = offset; + } else if (type.equals("dirreq-write-history")) { + dirwrittenArray = longValues; + dirwrittenOffset = offset; + } + lastDate = currentDate; + } + } + + /** + * Insert network status consensus 
into database. + */ + public void addConsensus(long validAfter, byte[] rawDescriptor) { + if (this.importIntoDatabase) { + try { + this.addDateToScheduledUpdates(validAfter); + Calendar cal = Calendar.getInstance(TimeZone.getTimeZone("UTC")); + Timestamp validAfterTimestamp = new Timestamp(validAfter); + this.psCs.setTimestamp(1, validAfterTimestamp, cal); + ResultSet rs = psCs.executeQuery(); + rs.next(); + if (rs.getInt(1) == 0) { + this.psC.clearParameters(); + this.psC.setTimestamp(1, validAfterTimestamp, cal); + this.psC.setBytes(2, rawDescriptor); + this.psC.executeUpdate(); + rcsCount++; + if (rcsCount % autoCommitCount == 0) { + this.conn.commit(); + } + } + } catch (SQLException e) { + this.logger.log(Level.WARNING, "Could not add network status " + + "consensus. We won't make any further SQL requests in " + + "this execution.", e); + this.importIntoDatabase = false; + } + } + if (this.writeRawImportFiles) { + try { + if (this.consensusOut == null) { + new File(rawFilesDirectory).mkdirs(); + this.consensusOut = new BufferedWriter(new FileWriter( + rawFilesDirectory + "/consensus.sql")); + this.consensusOut.write(" COPY consensus (validafter, rawdesc) " + + "FROM stdin;\n"); + } + String validAfterString = this.dateTimeFormat.format(validAfter); + this.consensusOut.write(validAfterString + "\t"); + this.consensusOut.write(PGbytea.toPGString(rawDescriptor). + replaceAll("\\", "\\\\") + "\n"); + } catch (SQLException e) { + this.logger.log(Level.WARNING, "Could not write network status " + + "consensus to raw database import file. We won't make " + + "any further attempts to write raw import files in this " + + "execution.", e); + this.writeRawImportFiles = false; + } catch (IOException e) { + this.logger.log(Level.WARNING, "Could not write network status " + + "consensus to raw database import file. 
We won't make " + + "any further attempts to write raw import files in this " + + "execution.", e); + this.writeRawImportFiles = false; + } + } + } + + /** + * Insert network status vote into database. + */ + public void addVote(long validAfter, String dirSource, + byte[] rawDescriptor) { + if (this.importIntoDatabase) { + try { + Calendar cal = Calendar.getInstance(TimeZone.getTimeZone("UTC")); + Timestamp validAfterTimestamp = new Timestamp(validAfter); + this.psVs.setTimestamp(1, validAfterTimestamp, cal); + this.psVs.setString(2, dirSource); + ResultSet rs = psVs.executeQuery(); + rs.next(); + if (rs.getInt(1) == 0) { + this.psV.clearParameters(); + this.psV.setTimestamp(1, validAfterTimestamp, cal); + this.psV.setString(2, dirSource); + this.psV.setBytes(3, rawDescriptor); + this.psV.executeUpdate(); + rvsCount++; + if (rvsCount % autoCommitCount == 0) { + this.conn.commit(); + } + } + } catch (SQLException e) { + this.logger.log(Level.WARNING, "Could not add network status " + + "vote. We won't make any further SQL requests in this " + + "execution.", e); + this.importIntoDatabase = false; + } + } + if (this.writeRawImportFiles) { + try { + if (this.voteOut == null) { + new File(rawFilesDirectory).mkdirs(); + this.voteOut = new BufferedWriter(new FileWriter( + rawFilesDirectory + "/vote.sql")); + this.voteOut.write(" COPY vote (validafter, dirsource, " + + "rawdesc) FROM stdin;\n"); + } + String validAfterString = this.dateTimeFormat.format(validAfter); + this.voteOut.write(validAfterString + "\t" + dirSource + "\t"); + this.voteOut.write(PGbytea.toPGString(rawDescriptor). + replaceAll("\\", "\\\\") + "\n"); + } catch (SQLException e) { + this.logger.log(Level.WARNING, "Could not write network status " + + "vote to raw database import file. 
We won't make any " + + "further attempts to write raw import files in this " + + "execution.", e); + this.writeRawImportFiles = false; + } catch (IOException e) { + this.logger.log(Level.WARNING, "Could not write network status " + + "vote to raw database import file. We won't make any " + + "further attempts to write raw import files in this " + + "execution.", e); + this.writeRawImportFiles = false; + } + } + } + + /** + * Insert a conn-bi-direct stats string into the database. + */ + public void addConnBiDirect(String source, String statsEnd, + long seconds, long below, long read, long write, long both) { + long statsEndTime = 0L; + try { + statsEndTime = this.dateTimeFormat.parse(statsEnd).getTime(); + } catch (ParseException e) { + this.logger.log(Level.WARNING, "Could not add conn-bi-direct " + + "stats string with interval ending '" + statsEnd + "'.", e); + return; + } + if (this.importIntoDatabase) { + try { + this.addDateToScheduledUpdates(statsEndTime); + Calendar cal = Calendar.getInstance(TimeZone.getTimeZone("UTC")); + Timestamp statsEndTimestamp = new Timestamp(statsEndTime); + this.psBs.setString(1, source); + this.psBs.setTimestamp(2, statsEndTimestamp, cal); + ResultSet rs = psBs.executeQuery(); + rs.next(); + if (rs.getInt(1) == 0) { + this.psB.clearParameters(); + this.psB.setString(1, source); + this.psB.setTimestamp(2, statsEndTimestamp, cal); + this.psB.setLong(3, seconds); + this.psB.setLong(4, below); + this.psB.setLong(5, read); + this.psB.setLong(6, write); + this.psB.setLong(7, both); + this.psB.executeUpdate(); + rbsCount++; + if (rbsCount % autoCommitCount == 0) { + this.conn.commit(); + } + } + } catch (SQLException e) { + this.logger.log(Level.WARNING, "Could not add conn-bi-direct " + + "stats string. 
We won't make any further SQL requests in " + + "this execution.", e); + this.importIntoDatabase = false; + } + } + if (this.writeRawImportFiles) { + try { + if (this.connBiDirectOut == null) { + new File(rawFilesDirectory).mkdirs(); + this.connBiDirectOut = new BufferedWriter(new FileWriter( + rawFilesDirectory + "/connbidirect.sql")); + this.connBiDirectOut.write(" COPY connbidirect (source, " + + "statsend, seconds, belownum, readnum, writenum, " + + "bothnum) FROM stdin;\n"); + } + this.connBiDirectOut.write(source + "\t" + statsEnd + "\t" + + seconds + "\t" + below + "\t" + read + "\t" + write + "\t" + + both + "\n"); + } catch (IOException e) { + this.logger.log(Level.WARNING, "Could not write conn-bi-direct " + + "stats string to raw database import file. We won't make " + + "any further attempts to write raw import files in this " + + "execution.", e); + this.writeRawImportFiles = false; + } + } + } + + /** + * Adds observations on the number of directory requests by country as + * seen on a directory at a given date to the database. 
+ */ + public void addDirReqStats(String source, String statsEnd, long seconds, + Map<String, String> dirReqsPerCountry) { + long statsEndTime = 0L; + try { + statsEndTime = this.dateTimeFormat.parse(statsEnd).getTime(); + } catch (ParseException e) { + this.logger.log(Level.WARNING, "Could not add dirreq stats with " + + "interval ending '" + statsEnd + "'.", e); + return; + } + if (this.importIntoDatabase) { + try { + this.addDateToScheduledUpdates(statsEndTime); + Calendar cal = Calendar.getInstance(TimeZone.getTimeZone("UTC")); + Timestamp statsEndTimestamp = new Timestamp(statsEndTime); + this.psQs.setString(1, source); + this.psQs.setTimestamp(2, statsEndTimestamp, cal); + ResultSet rs = psQs.executeQuery(); + rs.next(); + if (rs.getInt(1) == 0) { + for (Map.Entry<String, String> e : + dirReqsPerCountry.entrySet()) { + this.psQ.clearParameters(); + this.psQ.setString(1, source); + this.psQ.setTimestamp(2, statsEndTimestamp, cal); + this.psQ.setLong(3, seconds); + this.psQ.setString(4, e.getKey()); + this.psQ.setLong(5, Long.parseLong(e.getValue())); + this.psQ.executeUpdate(); + rqsCount++; + if (rqsCount % autoCommitCount == 0) { + this.conn.commit(); + } + } + } + } catch (SQLException e) { + this.logger.log(Level.WARNING, "Could not add dirreq stats. 
We " + + "won't make any further SQL requests in this execution.", + e); + this.importIntoDatabase = false; + } + } + if (this.writeRawImportFiles) { + try { + if (this.dirReqOut == null) { + new File(rawFilesDirectory).mkdirs(); + this.dirReqOut = new BufferedWriter(new FileWriter( + rawFilesDirectory + "/dirreq_stats.sql")); + this.dirReqOut.write(" COPY dirreq_stats (source, statsend, " + + "seconds, country, requests) FROM stdin;\n"); + } + for (Map.Entry<String, String> e : + dirReqsPerCountry.entrySet()) { + this.dirReqOut.write(source + "\t" + statsEnd + "\t" + seconds + + "\t" + e.getKey() + "\t" + e.getValue() + "\n"); + } + } catch (IOException e) { + this.logger.log(Level.WARNING, "Could not write dirreq stats to " + + "raw database import file. We won't make any further " + + "attempts to write raw import files in this execution.", e); + this.writeRawImportFiles = false; + } + } + } + + /** + * Close the relay descriptor database connection. + */ + public void closeConnection() { + + /* Log stats about imported descriptors. */ + this.logger.info(String.format("Finished importing relay " + + "descriptors: %d consensuses, %d network status entries, %d " + + "votes, %d server descriptors, %d extra-info descriptors, %d " + + "bandwidth history elements, %d dirreq stats elements, and %d " + + "conn-bi-direct stats lines", rcsCount, rrsCount, rvsCount, + rdsCount, resCount, rhsCount, rqsCount, rbsCount)); + + /* Insert scheduled updates a second time, just in case the refresh + * run has started since inserting them the first time in which case + * it will miss the data inserted afterwards. We cannot, however, + * insert them only now, because if a Java execution fails at a random + * point, we might have added data, but not the corresponding dates to + * update statistics. 
*/ + if (this.importIntoDatabase) { + try { + for (long dateMillis : this.scheduledUpdates) { + this.psU.setDate(1, new java.sql.Date(dateMillis)); + this.psU.execute(); + } + } catch (SQLException e) { + this.logger.log(Level.WARNING, "Could not add scheduled dates " + + "for the next refresh run.", e); + } + } + + /* Commit any stragglers before closing. */ + if (this.conn != null) { + try { + this.csH.executeBatch(); + + this.conn.commit(); + } catch (SQLException e) { + this.logger.log(Level.WARNING, "Could not commit final records to " + + "database", e); + } + try { + this.conn.close(); + } catch (SQLException e) { + this.logger.log(Level.WARNING, "Could not close database " + + "connection.", e); + } + } + + /* Close raw import files. */ + try { + if (this.statusentryOut != null) { + this.statusentryOut.write("\.\n"); + this.statusentryOut.close(); + } + if (this.descriptorOut != null) { + this.descriptorOut.write("\.\n"); + this.descriptorOut.close(); + } + if (this.extrainfoOut != null) { + this.extrainfoOut.write("\.\n"); + this.extrainfoOut.close(); + } + if (this.bwhistOut != null) { + this.bwhistOut.write("\.\n"); + this.bwhistOut.close(); + } + if (this.consensusOut != null) { + this.consensusOut.write("\.\n"); + this.consensusOut.close(); + } + if (this.voteOut != null) { + this.voteOut.write("\.\n"); + this.voteOut.close(); + } + if (this.connBiDirectOut != null) { + this.connBiDirectOut.write("\.\n"); + this.connBiDirectOut.close(); + } + } catch (IOException e) { + this.logger.log(Level.WARNING, "Could not close one or more raw " + + "database import files.", e); + } + } +} + diff --git a/src/org/torproject/ernie/cron/RelayDescriptorParser.java b/src/org/torproject/ernie/cron/RelayDescriptorParser.java index 24b512b..d02022a 100644 --- a/src/org/torproject/ernie/cron/RelayDescriptorParser.java +++ b/src/org/torproject/ernie/cron/RelayDescriptorParser.java @@ -17,6 +17,12 @@ import org.apache.commons.codec.binary.*; */ public class 
RelayDescriptorParser {
+ /** + * Relay descriptor database importer that stores relay descriptor + * contents for later evaluation. + */ + private RelayDescriptorDatabaseImporter rddi; + private ConsensusHealthChecker chc;
/** @@ -29,8 +35,10 @@ public class RelayDescriptorParser { /** * Initializes this class. */ - public RelayDescriptorParser(ConsensusHealthChecker chc) { + public RelayDescriptorParser(ConsensusHealthChecker chc, + RelayDescriptorDatabaseImporter rddi) { this.chc = chc; + this.rddi = rddi;
/* Initialize logger. */ this.logger = Logger.getLogger(RelayDescriptorParser.class.getName()); @@ -60,31 +68,256 @@ public class RelayDescriptorParser { // time to see when we switch from hourly to half-hourly // consensuses boolean isConsensus = true; - String validAfterTime = null; - String dirSource = null; + String validAfterTime = null, nickname = null, + relayIdentity = null, serverDesc = null, version = null, + ports = null; + String fingerprint = null, dirSource = null, address = null; + long validAfter = -1L, published = -1L, bandwidth = -1L, + orPort = 0L, dirPort = 0L; + SortedSet<String> relayFlags = null; + StringBuilder rawStatusEntry = null; while ((line = br.readLine()) != null) { if (line.equals("vote-status vote")) { isConsensus = false; } else if (line.startsWith("valid-after ")) { validAfterTime = line.substring("valid-after ".length()); + validAfter = parseFormat.parse(validAfterTime).getTime(); } else if (line.startsWith("dir-source ")) { dirSource = line.split(" ")[2]; - break; + } else if (line.startsWith("fingerprint ")) { + fingerprint = line.split(" ")[1]; + } else if (line.startsWith("r ")) { + if (isConsensus && relayIdentity != null && + this.rddi != null) { + byte[] rawDescriptor = rawStatusEntry.toString().getBytes(); + this.rddi.addStatusEntry(validAfter, nickname, + relayIdentity, serverDesc, published, address, orPort, + dirPort, relayFlags, version, bandwidth, ports, + rawDescriptor); + relayFlags = null; + version = null; + bandwidth = -1L; + ports = null; + } + rawStatusEntry = new StringBuilder(line + "\n"); + String[] parts = line.split(" "); + if (parts.length < 9) { + this.logger.log(Level.WARNING, "Could not parse r line '" + + line + "' in descriptor. Skipping."); + break; + } + String publishedTime = parts[4] + " " + parts[5]; + nickname = parts[1]; + relayIdentity = Hex.encodeHexString( + Base64.decodeBase64(parts[2] + "=")). 
+ toLowerCase(); + serverDesc = Hex.encodeHexString(Base64.decodeBase64( + parts[3] + "=")).toLowerCase(); + published = parseFormat.parse(parts[4] + " " + parts[5]). + getTime(); + address = parts[6]; + orPort = Long.parseLong(parts[7]); + dirPort = Long.parseLong(parts[8]); + } else if (line.startsWith("s ") || line.equals("s")) { + rawStatusEntry.append(line + "\n"); + relayFlags = new TreeSet<String>(); + if (line.length() > 2) { + for (String flag : line.substring(2).split(" ")) { + relayFlags.add(flag); + } + } + } else if (line.startsWith("v ")) { + rawStatusEntry.append(line + "\n"); + version = line.substring(2); + } else if (line.startsWith("w ")) { + rawStatusEntry.append(line + "\n"); + String[] parts = line.split(" "); + for (String part : parts) { + if (part.startsWith("Bandwidth=")) { + bandwidth = Long.parseLong(part.substring( + "Bandwidth=".length())); + } + } + } else if (line.startsWith("p ")) { + rawStatusEntry.append(line + "\n"); + ports = line.substring(2); } } if (isConsensus) { if (this.chc != null) { this.chc.processConsensus(validAfterTime, data); } + if (this.rddi != null) { + this.rddi.addConsensus(validAfter, data); + if (relayIdentity != null) { + byte[] rawDescriptor = rawStatusEntry.toString().getBytes(); + this.rddi.addStatusEntry(validAfter, nickname, + relayIdentity, serverDesc, published, address, orPort, + dirPort, relayFlags, version, bandwidth, ports, + rawDescriptor); + } + } } else { if (this.chc != null) { this.chc.processVote(validAfterTime, dirSource, data); } + if (this.rddi != null) { + this.rddi.addVote(validAfter, dirSource, data); + } + } + } else if (line.startsWith("router ")) { + String platformLine = null, publishedLine = null, + bandwidthLine = null, extraInfoDigest = null, + relayIdentifier = null; + String[] parts = line.split(" "); + String nickname = parts[1]; + String address = parts[2]; + int orPort = Integer.parseInt(parts[3]); + int dirPort = Integer.parseInt(parts[4]); + long published = -1L, uptime = 
-1L; + while ((line = br.readLine()) != null) { + if (line.startsWith("platform ")) { + platformLine = line; + } else if (line.startsWith("published ")) { + String publishedTime = line.substring("published ".length()); + published = parseFormat.parse(publishedTime).getTime(); + } else if (line.startsWith("opt fingerprint") || + line.startsWith("fingerprint")) { + relayIdentifier = line.substring(line.startsWith("opt ") ? + "opt fingerprint".length() : "fingerprint".length()). + replaceAll(" ", "").toLowerCase(); + } else if (line.startsWith("bandwidth ")) { + bandwidthLine = line; + } else if (line.startsWith("opt extra-info-digest ") || + line.startsWith("extra-info-digest ")) { + extraInfoDigest = line.startsWith("opt ") ? + line.split(" ")[2].toLowerCase() : + line.split(" ")[1].toLowerCase(); + } else if (line.startsWith("uptime ")) { + uptime = Long.parseLong(line.substring("uptime ".length())); + } + } + String ascii = new String(data, "US-ASCII"); + String startToken = "router "; + String sigToken = "\nrouter-signature\n"; + int start = ascii.indexOf(startToken); + int sig = ascii.indexOf(sigToken) + sigToken.length(); + String digest = null; + if (start >= 0 || sig >= 0 || sig > start) { + byte[] forDigest = new byte[sig - start]; + System.arraycopy(data, start, forDigest, 0, sig - start); + digest = DigestUtils.shaHex(forDigest); + } + if (this.rddi != null && digest != null) { + String[] bwParts = bandwidthLine.split(" "); + long bandwidthAvg = Long.parseLong(bwParts[1]); + long bandwidthBurst = Long.parseLong(bwParts[2]); + long bandwidthObserved = Long.parseLong(bwParts[3]); + String platform = platformLine.substring("platform ".length()); + this.rddi.addServerDescriptor(digest, nickname, address, orPort, + dirPort, relayIdentifier, bandwidthAvg, bandwidthBurst, + bandwidthObserved, platform, published, uptime, + extraInfoDigest, data); + } + } else if (line.startsWith("extra-info ")) { + String nickname = line.split(" ")[1]; + long published = -1L; + 
String dir = line.split(" ")[2]; + String statsEnd = null; + long seconds = -1L; + List<String> bandwidthHistory = new ArrayList<String>(); + boolean skip = false; + while ((line = br.readLine()) != null) { + if (line.startsWith("published ")) { + String publishedTime = line.substring("published ".length()); + published = parseFormat.parse(publishedTime).getTime(); + } else if (line.startsWith("read-history ") || + line.startsWith("write-history ") || + line.startsWith("dirreq-read-history ") || + line.startsWith("dirreq-write-history ")) { + bandwidthHistory.add(line); + } else if (line.startsWith("dirreq-stats-end ")) { + String[] parts = line.split(" "); + if (parts.length < 5) { + this.logger.warning("Could not parse dirreq-stats-end " + + "line '" + line + "' in descriptor. Skipping."); + break; + } + statsEnd = parts[1] + " " + parts[2]; + seconds = Long.parseLong(parts[3].substring(1)); + } else if (line.startsWith("dirreq-v3-reqs ") + && line.length() > "dirreq-v3-reqs ".length()) { + if (this.rddi != null) { + try { + int allUsers = 0; + Map<String, String> obs = new HashMap<String, String>(); + String[] parts = line.substring("dirreq-v3-reqs ". + length()).split(","); + for (String p : parts) { + String country = p.substring(0, 2); + int users = Integer.parseInt(p.substring(3)) - 4; + allUsers += users; + obs.put(country, "" + users); + } + obs.put("zy", "" + allUsers); + this.rddi.addDirReqStats(dir, statsEnd, seconds, obs); + } catch (NumberFormatException e) { + this.logger.log(Level.WARNING, "Could not parse " + + "dirreq-v3-reqs line '" + line + "' in descriptor. " + + "Skipping.", e); + break; + } + } + } else if (line.startsWith("conn-bi-direct ")) { + if (this.rddi != null) { + String[] parts = line.split(" "); + if (parts.length == 6 && + parts[5].split(",").length == 4) { + try { + String connBiDirectStatsEnd = parts[1] + " " + parts[2]; + long connBiDirectSeconds = Long.parseLong(parts[3]. 
+ substring(1)); + String[] parts2 = parts[5].split(","); + long below = Long.parseLong(parts2[0]); + long read = Long.parseLong(parts2[1]); + long write = Long.parseLong(parts2[2]); + long both = Long.parseLong(parts2[3]); + this.rddi.addConnBiDirect(dir, connBiDirectStatsEnd, + connBiDirectSeconds, below, read, write, both); + } catch (NumberFormatException e) { + this.logger.log(Level.WARNING, "Number format " + + "exception while parsing conn-bi-direct stats " + + "string '" + line + "'. Skipping.", e); + } + } else { + this.logger.warning("Skipping invalid conn-bi-direct " + + "stats string '" + line + "'."); + } + } + } + } + String ascii = new String(data, "US-ASCII"); + String startToken = "extra-info "; + String sigToken = "\nrouter-signature\n"; + String digest = null; + int start = ascii.indexOf(startToken); + int sig = ascii.indexOf(sigToken) + sigToken.length(); + if (start >= 0 || sig >= 0 || sig > start) { + byte[] forDigest = new byte[sig - start]; + System.arraycopy(data, start, forDigest, 0, sig - start); + digest = DigestUtils.shaHex(forDigest); + } + if (this.rddi != null && digest != null) { + this.rddi.addExtraInfoDescriptor(digest, nickname, + dir.toLowerCase(), published, data, bandwidthHistory); } } } catch (IOException e) { this.logger.log(Level.WARNING, "Could not parse descriptor. " + "Skipping.", e); + } catch (ParseException e) { + this.logger.log(Level.WARNING, "Could not parse descriptor. " + + "Skipping.", e); } } }