commit 1caca7c1f4786ef31207b42ed8298998c989487b Author: Karsten Loesing karsten.loesing@gmx.net Date: Mon Aug 20 21:08:19 2018 +0200
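Refactor ArchiveReader. Separate construction from processing: the ArchiveReader constructor now only validates its arguments, registers the reader with the RelayDescriptorParser, and initializes fields, while the new readDescriptors() method does the actual work in three steps (readHistoryFile(), readDescriptorFiles(), writeHistoryFile()). The parsed/ignored file counters and the import history, previously locals of the constructor, become fields. ArchiveWriter now calls readDescriptors() explicitly after constructing the reader. The ArchiveReader constructor and haveParsedMicrodescConsensus() are reduced from public to package-private visibility. --- .../collector/relaydescs/ArchiveReader.java | 100 +++++++++++++-------- .../collector/relaydescs/ArchiveWriter.java | 3 +- 2 files changed, 66 insertions(+), 37 deletions(-)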
diff --git a/src/main/java/org/torproject/metrics/collector/relaydescs/ArchiveReader.java b/src/main/java/org/torproject/metrics/collector/relaydescs/ArchiveReader.java index 74700f7..7c59054 100644 --- a/src/main/java/org/torproject/metrics/collector/relaydescs/ArchiveReader.java +++ b/src/main/java/org/torproject/metrics/collector/relaydescs/ArchiveReader.java @@ -46,30 +46,52 @@ public class ArchiveReader { private Map<String, Set<String>> microdescriptorValidAfterTimes = new HashMap<>();
- /** Reads all descriptors from the given directory, possibly using a - * parse history file, and passes them to the given descriptor - * parser. */ - public ArchiveReader(RelayDescriptorParser rdp, File archivesDirectory, - File statsDirectory, boolean keepImportHistory) { + private RelayDescriptorParser rdp; + + private File archivesDirectory; + + private boolean keepImportHistory;
+ private int parsedFiles = 0; + + private int ignoredFiles = 0; + + private SortedSet<String> archivesImportHistory = new TreeSet<>(); + + private File archivesImportHistoryFile; + + /** Initializes an archive reader but without reading any descriptors yet. */ + ArchiveReader(RelayDescriptorParser rdp, File archivesDirectory, + File statsDirectory, boolean keepImportHistory) { if (rdp == null || archivesDirectory == null || statsDirectory == null) { throw new IllegalArgumentException(); } - - rdp.setArchiveReader(this); - int parsedFiles = 0; - int ignoredFiles = 0; - SortedSet<String> archivesImportHistory = new TreeSet<>(); - File archivesImportHistoryFile = new File(statsDirectory, + this.rdp = rdp; + this.rdp.setArchiveReader(this); + this.archivesDirectory = archivesDirectory; + this.keepImportHistory = keepImportHistory; + this.archivesImportHistoryFile = new File(statsDirectory, "archives-import-history"); - if (keepImportHistory && archivesImportHistoryFile.exists()) { + } + + /** Reads all descriptors from the given directory, possibly using a + * parse history file, and passes them to the given descriptor + * parser. */ + public void readDescriptors() { + this.readHistoryFile(); + this.readDescriptorFiles(); + this.writeHistoryFile(); + } + + private void readHistoryFile() { + if (this.keepImportHistory && this.archivesImportHistoryFile.exists()) { try { BufferedReader br = new BufferedReader(new FileReader( - archivesImportHistoryFile)); + this.archivesImportHistoryFile)); String line; while ((line = br.readLine()) != null) { - archivesImportHistory.add(line); + this.archivesImportHistory.add(line); } br.close(); } catch (IOException e) { @@ -77,11 +99,14 @@ public class ArchiveReader { + "history file. 
Skipping.", e); } } - if (archivesDirectory.exists()) { - logger.debug("Importing files in directory " + archivesDirectory + } + + private void readDescriptorFiles() { + if (this.archivesDirectory.exists()) { + logger.debug("Importing files in directory " + this.archivesDirectory + "/..."); Stack<File> filesInInputDir = new Stack<>(); - filesInInputDir.add(archivesDirectory); + filesInInputDir.add(this.archivesDirectory); List<File> problems = new ArrayList<>(); Set<File> filesToRetry = new HashSet<>(); while (!filesInInputDir.isEmpty()) { @@ -91,9 +116,9 @@ public class ArchiveReader { } else { try { BufferedInputStream bis; - if (keepImportHistory - && archivesImportHistory.contains(pop.getName())) { - ignoredFiles++; + if (this.keepImportHistory + && this.archivesImportHistory.contains(pop.getName())) { + this.ignoredFiles++; continue; } else if (pop.getName().endsWith(".tar.bz2")) { logger.warn("Cannot parse compressed tarball " @@ -116,15 +141,15 @@ public class ArchiveReader { } bis.close(); byte[] allData = baos.toByteArray(); - boolean stored = rdp.parse(allData); + boolean stored = this.rdp.parse(allData); if (!stored) { filesToRetry.add(pop); continue; } - if (keepImportHistory) { - archivesImportHistory.add(pop.getName()); + if (this.keepImportHistory) { + this.archivesImportHistory.add(pop.getName()); } - parsedFiles++; + this.parsedFiles++; } catch (IOException e) { problems.add(pop); if (problems.size() > 3) { @@ -219,10 +244,10 @@ public class ArchiveReader { } } } - if (keepImportHistory) { - archivesImportHistory.add(pop.getName()); + if (this.keepImportHistory) { + this.archivesImportHistory.add(pop.getName()); } - parsedFiles++; + this.parsedFiles++; } catch (IOException e) { problems.add(pop); if (problems.size() > 3) { @@ -232,10 +257,10 @@ public class ArchiveReader { } if (problems.isEmpty()) { logger.debug("Finished importing files in directory " - + archivesDirectory + "/."); + + this.archivesDirectory + "/."); } else { StringBuilder sb = 
new StringBuilder("Failed importing files in " - + "directory " + archivesDirectory + "/:"); + + "directory " + this.archivesDirectory + "/:"); int printed = 0; for (File f : problems) { sb.append("\n ").append(f.getAbsolutePath()); @@ -246,12 +271,15 @@ public class ArchiveReader { } } } - if (keepImportHistory) { + } + + private void writeHistoryFile() { + if (this.keepImportHistory) { try { - archivesImportHistoryFile.getParentFile().mkdirs(); + this.archivesImportHistoryFile.getParentFile().mkdirs(); BufferedWriter bw = new BufferedWriter(new FileWriter( - archivesImportHistoryFile)); - for (String line : archivesImportHistory) { + this.archivesImportHistoryFile)); + for (String line : this.archivesImportHistory) { bw.write(line + "\n"); } bw.close(); @@ -261,15 +289,15 @@ public class ArchiveReader { } } logger.info("Finished importing relay descriptors from local " - + "directory:\nParsed " + parsedFiles + ", ignored " - + ignoredFiles + " files."); + + "directory:\nParsed " + this.parsedFiles + ", ignored " + + this.ignoredFiles + " files."); }
/** Stores the valid-after time and microdescriptor digests of a given * microdesc consensus, so that microdescriptors (which don't contain a * publication time) can later be sorted into the correct month * folders. */ - public void haveParsedMicrodescConsensus(String validAfterTime, + void haveParsedMicrodescConsensus(String validAfterTime, SortedSet<String> microdescriptorDigests) { for (String microdescriptor : microdescriptorDigests) { if (!this.microdescriptorValidAfterTimes.containsKey( diff --git a/src/main/java/org/torproject/metrics/collector/relaydescs/ArchiveWriter.java b/src/main/java/org/torproject/metrics/collector/relaydescs/ArchiveWriter.java index ac3f5e3..8679439 100644 --- a/src/main/java/org/torproject/metrics/collector/relaydescs/ArchiveWriter.java +++ b/src/main/java/org/torproject/metrics/collector/relaydescs/ArchiveWriter.java @@ -169,7 +169,8 @@ public class ArchiveWriter extends CollecTorMain { new ArchiveReader(rdp, config.getPath(Key.RelayLocalOrigins).toFile(), statsDirectory, - config.getBool(Key.KeepDirectoryArchiveImportHistory)); + config.getBool(Key.KeepDirectoryArchiveImportHistory)) + .readDescriptors(); this.intermediateStats("importing relay descriptors from local " + "directory"); }