commit af7f95dcc86328b558f4711a9b8e4b72688df27f
Author: Karsten Loesing <karsten.loesing@gmx.net>
Date:   Thu Dec 13 13:50:33 2012 +0100

    Split up ArchiveReader in two parts.
---
 src/org/torproject/ernie/cron/ArchiveReader.java   |  173 --------------------
 .../ernie/cron/BridgeStatsFileHandler.java         |   68 +++++++-
 src/org/torproject/ernie/cron/Main.java            |   15 ++-
 .../cron/RelayDescriptorDatabaseImporter.java      |  135 +++++++++++++++-
 4 files changed, 207 insertions(+), 184 deletions(-)

diff --git a/src/org/torproject/ernie/cron/ArchiveReader.java b/src/org/torproject/ernie/cron/ArchiveReader.java deleted file mode 100644 index 9126787..0000000 --- a/src/org/torproject/ernie/cron/ArchiveReader.java +++ /dev/null @@ -1,173 +0,0 @@ -/* Copyright 2011, 2012 The Tor Project - * See LICENSE for licensing information */ -package org.torproject.ernie.cron; - -import java.io.File; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.logging.Logger; - -import org.apache.commons.codec.DecoderException; -import org.apache.commons.codec.binary.Hex; -import org.apache.commons.codec.digest.DigestUtils; -import org.torproject.descriptor.Descriptor; -import org.torproject.descriptor.DescriptorFile; -import org.torproject.descriptor.DescriptorReader; -import org.torproject.descriptor.DescriptorSourceFactory; -import org.torproject.descriptor.ExtraInfoDescriptor; -import org.torproject.descriptor.NetworkStatusEntry; -import org.torproject.descriptor.RelayNetworkStatusConsensus; -import org.torproject.descriptor.ServerDescriptor; - -/** - * Read in all files in a given directory and pass buffered readers of - * them to the relay descriptor parser. - */ -public class ArchiveReader { - - /** - * Stats file handler that accepts parse results for bridge statistics. - */ - private BridgeStatsFileHandler bsfh; - - /** - * Relay descriptor database importer that stores relay descriptor - * contents for later evaluation. - */ - private RelayDescriptorDatabaseImporter rddi; - - /** - * Logger for this class. 
- */ - private Logger logger; - - public ArchiveReader(RelayDescriptorDatabaseImporter rddi, - BridgeStatsFileHandler bsfh, File archivesDirectory, - File statsDirectory, boolean keepImportHistory) { - - if (archivesDirectory == null || - statsDirectory == null) { - throw new IllegalArgumentException(); - } - - this.rddi = rddi; - this.bsfh = bsfh; - - this.logger = Logger.getLogger(ArchiveReader.class.getName()); - if (archivesDirectory.exists()) { - logger.fine("Importing files in directory " + archivesDirectory - + "/..."); - DescriptorReader reader = - DescriptorSourceFactory.createDescriptorReader(); - reader.addDirectory(archivesDirectory); - if (keepImportHistory) { - reader.setExcludeFiles(new File(statsDirectory, - "relay-descriptor-history")); - } - Iterator<DescriptorFile> descriptorFiles = reader.readDescriptors(); - while (descriptorFiles.hasNext()) { - DescriptorFile descriptorFile = descriptorFiles.next(); - if (descriptorFile.getDescriptors() != null) { - for (Descriptor descriptor : descriptorFile.getDescriptors()) { - if (descriptor instanceof RelayNetworkStatusConsensus) { - this.addRelayNetworkStatusConsensus( - (RelayNetworkStatusConsensus) descriptor); - } else if (descriptor instanceof ServerDescriptor) { - this.addServerDescriptor((ServerDescriptor) descriptor); - } else if (descriptor instanceof ExtraInfoDescriptor) { - this.addExtraInfoDescriptor( - (ExtraInfoDescriptor) descriptor); - } - } - } - } - } - - logger.info("Finished importing relay descriptors."); - } - - private void addRelayNetworkStatusConsensus( - RelayNetworkStatusConsensus consensus) { - for (NetworkStatusEntry statusEntry : - consensus.getStatusEntries().values()) { - this.rddi.addStatusEntry(consensus.getValidAfterMillis(), - statusEntry.getNickname(), - statusEntry.getFingerprint().toLowerCase(), - statusEntry.getDescriptor().toLowerCase(), - statusEntry.getPublishedMillis(), statusEntry.getAddress(), - statusEntry.getOrPort(), statusEntry.getDirPort(), - 
statusEntry.getFlags(), statusEntry.getVersion(), - statusEntry.getBandwidth(), statusEntry.getPortList(), - statusEntry.getStatusEntryBytes()); - try { - this.bsfh.addHashedRelay(DigestUtils.shaHex(Hex.decodeHex( - statusEntry.getFingerprint().toCharArray())).toUpperCase()); - } catch (DecoderException e) { - } - } - this.rddi.addConsensus(consensus.getValidAfterMillis(), - consensus.getRawDescriptorBytes()); - } - - private void addServerDescriptor(ServerDescriptor descriptor) { - this.rddi.addServerDescriptor(descriptor.getServerDescriptorDigest(), - descriptor.getNickname(), descriptor.getAddress(), - descriptor.getOrPort(), descriptor.getDirPort(), - descriptor.getFingerprint(), descriptor.getBandwidthRate(), - descriptor.getBandwidthBurst(), descriptor.getBandwidthObserved(), - descriptor.getPlatform(), descriptor.getPublishedMillis(), - descriptor.getUptime(), descriptor.getExtraInfoDigest(), - descriptor.getRawDescriptorBytes()); - } - - private void addExtraInfoDescriptor(ExtraInfoDescriptor descriptor) { - if (descriptor.getDirreqV3Reqs() != null) { - int allUsers = 0; - Map<String, String> obs = new HashMap<String, String>(); - for (Map.Entry<String, Integer> e : - descriptor.getDirreqV3Reqs().entrySet()) { - String country = e.getKey(); - int users = e.getValue() - 4; - allUsers += users; - obs.put(country, "" + users); - } - obs.put("zy", "" + allUsers); - this.rddi.addDirReqStats(descriptor.getFingerprint(), - descriptor.getDirreqStatsEndMillis(), - descriptor.getDirreqStatsIntervalLength(), obs); - } - if (descriptor.getConnBiDirectStatsEndMillis() >= 0L) { - this.rddi.addConnBiDirect(descriptor.getFingerprint(), - descriptor.getConnBiDirectStatsEndMillis(), - descriptor.getConnBiDirectStatsIntervalLength(), - descriptor.getConnBiDirectBelow(), - descriptor.getConnBiDirectRead(), - descriptor.getConnBiDirectWrite(), - descriptor.getConnBiDirectBoth()); - } - List<String> bandwidthHistoryLines = new ArrayList<String>(); - if 
(descriptor.getWriteHistory() != null) { - bandwidthHistoryLines.add(descriptor.getWriteHistory().getLine()); - } - if (descriptor.getReadHistory() != null) { - bandwidthHistoryLines.add(descriptor.getReadHistory().getLine()); - } - if (descriptor.getDirreqWriteHistory() != null) { - bandwidthHistoryLines.add( - descriptor.getDirreqWriteHistory().getLine()); - } - if (descriptor.getDirreqReadHistory() != null) { - bandwidthHistoryLines.add( - descriptor.getDirreqReadHistory().getLine()); - } - this.rddi.addExtraInfoDescriptor(descriptor.getExtraInfoDigest(), - descriptor.getNickname(), - descriptor.getFingerprint().toLowerCase(), - descriptor.getPublishedMillis(), - descriptor.getRawDescriptorBytes(), bandwidthHistoryLines); - } -} - diff --git a/src/org/torproject/ernie/cron/BridgeStatsFileHandler.java b/src/org/torproject/ernie/cron/BridgeStatsFileHandler.java index 4534d2f..085394c 100644 --- a/src/org/torproject/ernie/cron/BridgeStatsFileHandler.java +++ b/src/org/torproject/ernie/cron/BridgeStatsFileHandler.java @@ -29,11 +29,16 @@ import java.util.TreeSet; import java.util.logging.Level; import java.util.logging.Logger;
+import org.apache.commons.codec.DecoderException; +import org.apache.commons.codec.binary.Hex; +import org.apache.commons.codec.digest.DigestUtils; import org.torproject.descriptor.Descriptor; import org.torproject.descriptor.DescriptorFile; import org.torproject.descriptor.DescriptorReader; import org.torproject.descriptor.DescriptorSourceFactory; import org.torproject.descriptor.ExtraInfoDescriptor; +import org.torproject.descriptor.NetworkStatusEntry; +import org.torproject.descriptor.RelayNetworkStatusConsensus; import org.torproject.descriptor.ServerDescriptor;
/** @@ -117,7 +122,11 @@ public class BridgeStatsFileHandler {
private File statsDirectory;
- private boolean keepImportHistory; + private boolean keepBridgeDescriptorImportHistory; + + private File archivesDirectory; + + private boolean keepRelayDescriptorImportHistory;
/** * Initializes this class, including reading in intermediate results @@ -125,14 +134,21 @@ public class BridgeStatsFileHandler { * <code>stats/hashed-relay-identities</code>. */ public BridgeStatsFileHandler(String connectionURL, - File bridgesDir, File statsDirectory, boolean keepImportHistory) { + File bridgesDir, File statsDirectory, + boolean keepBridgeDescriptorImportHistory, File archivesDirectory, + boolean keepRelayDescriptorImportHistory) {
- if (bridgesDir == null || statsDirectory == null) { - throw new IllegalArgumentException(); + if (bridgesDir == null || statsDirectory == null || + archivesDirectory == null || statsDirectory == null) { + throw new IllegalArgumentException(); } this.bridgesDir = bridgesDir; this.statsDirectory = statsDirectory; - this.keepImportHistory = keepImportHistory; + this.keepBridgeDescriptorImportHistory = + keepBridgeDescriptorImportHistory; + this.archivesDirectory = archivesDirectory; + this.keepRelayDescriptorImportHistory = + keepRelayDescriptorImportHistory;
/* Initialize set of known countries. */ this.countries = new TreeSet<String>(); @@ -356,7 +372,7 @@ public class BridgeStatsFileHandler { DescriptorReader reader = DescriptorSourceFactory.createDescriptorReader(); reader.addDirectory(bridgesDir); - if (keepImportHistory) { + if (keepBridgeDescriptorImportHistory) { reader.setExcludeFiles(new File(statsDirectory, "bridge-descriptor-history")); } @@ -424,6 +440,46 @@ public class BridgeStatsFileHandler { } }
+ public void importRelayDescriptors() { + if (archivesDirectory.exists()) { + logger.fine("Importing files in directory " + archivesDirectory + + "/..."); + DescriptorReader reader = + DescriptorSourceFactory.createDescriptorReader(); + reader.addDirectory(archivesDirectory); + if (keepRelayDescriptorImportHistory) { + reader.setExcludeFiles(new File(statsDirectory, + "relay-descriptor-history")); + } + Iterator<DescriptorFile> descriptorFiles = reader.readDescriptors(); + while (descriptorFiles.hasNext()) { + DescriptorFile descriptorFile = descriptorFiles.next(); + if (descriptorFile.getDescriptors() != null) { + for (Descriptor descriptor : descriptorFile.getDescriptors()) { + if (descriptor instanceof RelayNetworkStatusConsensus) { + this.addRelayNetworkStatusConsensus( + (RelayNetworkStatusConsensus) descriptor); + } + } + } + } + } + + logger.info("Finished importing relay descriptors."); + } + + private void addRelayNetworkStatusConsensus( + RelayNetworkStatusConsensus consensus) { + for (NetworkStatusEntry statusEntry : + consensus.getStatusEntries().values()) { + try { + this.addHashedRelay(DigestUtils.shaHex(Hex.decodeHex( + statusEntry.getFingerprint().toCharArray())).toUpperCase()); + } catch (DecoderException e) { + } + } + } + /** * Writes the list of hashed relay identities and bridge user numbers as * observed by single bridges to disk, aggregates per-day statistics for diff --git a/src/org/torproject/ernie/cron/Main.java b/src/org/torproject/ernie/cron/Main.java index 457433f..a3f38e6 100644 --- a/src/org/torproject/ernie/cron/Main.java +++ b/src/org/torproject/ernie/cron/Main.java @@ -37,7 +37,9 @@ public class Main { new BridgeStatsFileHandler( config.getRelayDescriptorDatabaseJDBC(), new File(config.getSanitizedBridgesDirectory()), - statsDirectory, config.getKeepSanitizedBridgesImportHistory()) : + statsDirectory, config.getKeepSanitizedBridgesImportHistory(), + new File(config.getDirectoryArchivesDirectory()), + 
config.getKeepDirectoryArchiveImportHistory()) : null;
// Import relay descriptors @@ -49,11 +51,16 @@ public class Main { config.getWriteRelayDescriptorDatabase() ? config.getRelayDescriptorDatabaseJDBC() : null, config.getWriteRelayDescriptorsRawFiles() ? - config.getRelayDescriptorRawFilesDirectory() : null) : null; - new ArchiveReader(rddi, bsfh, + config.getRelayDescriptorRawFilesDirectory() : null, new File(config.getDirectoryArchivesDirectory()), statsDirectory, - config.getKeepDirectoryArchiveImportHistory()); + config.getKeepDirectoryArchiveImportHistory()) : null; + if (rddi != null) { + rddi.importRelayDescriptors(); + } + if (bsfh != null) { + bsfh.importRelayDescriptors(); + } rddi.closeConnection(); }
diff --git a/src/org/torproject/ernie/cron/RelayDescriptorDatabaseImporter.java b/src/org/torproject/ernie/cron/RelayDescriptorDatabaseImporter.java index 3e7e694..c9d9bc4 100644 --- a/src/org/torproject/ernie/cron/RelayDescriptorDatabaseImporter.java +++ b/src/org/torproject/ernie/cron/RelayDescriptorDatabaseImporter.java @@ -16,8 +16,11 @@ import java.sql.SQLException; import java.sql.Timestamp; import java.text.ParseException; import java.text.SimpleDateFormat; +import java.util.ArrayList; import java.util.Calendar; +import java.util.HashMap; import java.util.HashSet; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; @@ -28,6 +31,14 @@ import java.util.logging.Level; import java.util.logging.Logger;
import org.postgresql.util.PGbytea; +import org.torproject.descriptor.Descriptor; +import org.torproject.descriptor.DescriptorFile; +import org.torproject.descriptor.DescriptorReader; +import org.torproject.descriptor.DescriptorSourceFactory; +import org.torproject.descriptor.ExtraInfoDescriptor; +import org.torproject.descriptor.NetworkStatusEntry; +import org.torproject.descriptor.RelayNetworkStatusConsensus; +import org.torproject.descriptor.ServerDescriptor;
/** * Parse directory data. @@ -207,12 +218,25 @@ public final class RelayDescriptorDatabaseImporter { private boolean importIntoDatabase; private boolean writeRawImportFiles;
+ private File archivesDirectory; + private File statsDirectory; + private boolean keepImportHistory; + /** * Initialize database importer by connecting to the database and * preparing statements. */ public RelayDescriptorDatabaseImporter(String connectionURL, - String rawFilesDirectory) { + String rawFilesDirectory, File archivesDirectory, + File statsDirectory, boolean keepImportHistory) { + + if (archivesDirectory == null || + statsDirectory == null) { + throw new IllegalArgumentException(); + } + this.archivesDirectory = archivesDirectory; + this.statsDirectory = statsDirectory; + this.keepImportHistory = keepImportHistory;
/* Initialize logger. */ this.logger = Logger.getLogger( @@ -979,6 +1003,115 @@ public final class RelayDescriptorDatabaseImporter { } }
+ public void importRelayDescriptors() { + if (archivesDirectory.exists()) { + logger.fine("Importing files in directory " + archivesDirectory + + "/..."); + DescriptorReader reader = + DescriptorSourceFactory.createDescriptorReader(); + reader.addDirectory(archivesDirectory); + if (keepImportHistory) { + reader.setExcludeFiles(new File(statsDirectory, + "relay-descriptor-history")); + } + Iterator<DescriptorFile> descriptorFiles = reader.readDescriptors(); + while (descriptorFiles.hasNext()) { + DescriptorFile descriptorFile = descriptorFiles.next(); + if (descriptorFile.getDescriptors() != null) { + for (Descriptor descriptor : descriptorFile.getDescriptors()) { + if (descriptor instanceof RelayNetworkStatusConsensus) { + this.addRelayNetworkStatusConsensus( + (RelayNetworkStatusConsensus) descriptor); + } else if (descriptor instanceof ServerDescriptor) { + this.addServerDescriptor((ServerDescriptor) descriptor); + } else if (descriptor instanceof ExtraInfoDescriptor) { + this.addExtraInfoDescriptor( + (ExtraInfoDescriptor) descriptor); + } + } + } + } + } + + logger.info("Finished importing relay descriptors."); + } + + private void addRelayNetworkStatusConsensus( + RelayNetworkStatusConsensus consensus) { + for (NetworkStatusEntry statusEntry : + consensus.getStatusEntries().values()) { + this.addStatusEntry(consensus.getValidAfterMillis(), + statusEntry.getNickname(), + statusEntry.getFingerprint().toLowerCase(), + statusEntry.getDescriptor().toLowerCase(), + statusEntry.getPublishedMillis(), statusEntry.getAddress(), + statusEntry.getOrPort(), statusEntry.getDirPort(), + statusEntry.getFlags(), statusEntry.getVersion(), + statusEntry.getBandwidth(), statusEntry.getPortList(), + statusEntry.getStatusEntryBytes()); + } + this.addConsensus(consensus.getValidAfterMillis(), + consensus.getRawDescriptorBytes()); + } + + private void addServerDescriptor(ServerDescriptor descriptor) { + this.addServerDescriptor(descriptor.getServerDescriptorDigest(), + 
descriptor.getNickname(), descriptor.getAddress(), + descriptor.getOrPort(), descriptor.getDirPort(), + descriptor.getFingerprint(), descriptor.getBandwidthRate(), + descriptor.getBandwidthBurst(), descriptor.getBandwidthObserved(), + descriptor.getPlatform(), descriptor.getPublishedMillis(), + descriptor.getUptime(), descriptor.getExtraInfoDigest(), + descriptor.getRawDescriptorBytes()); + } + + private void addExtraInfoDescriptor(ExtraInfoDescriptor descriptor) { + if (descriptor.getDirreqV3Reqs() != null) { + int allUsers = 0; + Map<String, String> obs = new HashMap<String, String>(); + for (Map.Entry<String, Integer> e : + descriptor.getDirreqV3Reqs().entrySet()) { + String country = e.getKey(); + int users = e.getValue() - 4; + allUsers += users; + obs.put(country, "" + users); + } + obs.put("zy", "" + allUsers); + this.addDirReqStats(descriptor.getFingerprint(), + descriptor.getDirreqStatsEndMillis(), + descriptor.getDirreqStatsIntervalLength(), obs); + } + if (descriptor.getConnBiDirectStatsEndMillis() >= 0L) { + this.addConnBiDirect(descriptor.getFingerprint(), + descriptor.getConnBiDirectStatsEndMillis(), + descriptor.getConnBiDirectStatsIntervalLength(), + descriptor.getConnBiDirectBelow(), + descriptor.getConnBiDirectRead(), + descriptor.getConnBiDirectWrite(), + descriptor.getConnBiDirectBoth()); + } + List<String> bandwidthHistoryLines = new ArrayList<String>(); + if (descriptor.getWriteHistory() != null) { + bandwidthHistoryLines.add(descriptor.getWriteHistory().getLine()); + } + if (descriptor.getReadHistory() != null) { + bandwidthHistoryLines.add(descriptor.getReadHistory().getLine()); + } + if (descriptor.getDirreqWriteHistory() != null) { + bandwidthHistoryLines.add( + descriptor.getDirreqWriteHistory().getLine()); + } + if (descriptor.getDirreqReadHistory() != null) { + bandwidthHistoryLines.add( + descriptor.getDirreqReadHistory().getLine()); + } + this.addExtraInfoDescriptor(descriptor.getExtraInfoDigest(), + descriptor.getNickname(), + 
descriptor.getFingerprint().toLowerCase(), + descriptor.getPublishedMillis(), + descriptor.getRawDescriptorBytes(), bandwidthHistoryLines); + } + /** * Close the relay descriptor database connection. */
tor-commits@lists.torproject.org