[tor-commits] [collector/release] Reduce memory footprint and wall time.

karsten at torproject.org karsten at torproject.org
Mon Feb 26 15:26:12 UTC 2018


commit 8557bf6255e6e3745088033e8e7bad7801421686
Author: iwakeh <iwakeh at torproject.org>
Date:   Tue Feb 20 16:30:09 2018 +0000

    Reduce memory footprint and wall time.
    
    Adapt to the latest changes in metrics-lib (task-25329) and make use of the
    high redundancy of logs (e.g., a 3G file might contain only 350 distinct
    lines).  This avoids OOM and array-out-of-bounds exceptions for large files
    (>2G) and gives a speed-up of roughly 50%.  (The earlier 66 min are down to
    34 min for the meronense & weschniakowii files plus two larger files.)
    
    There is a BATCH constant, which could be tuned for processing speed. It is
    logged for each webstats module run.  Currently, it is set to 100k.  This
    was more or less arbitrarily chosen and used for all the tests.  A test run
    using 500k didn't show significant differences.
---
 .../persist/WebServerAccessLogPersistence.java     |  8 ---
 .../collector/webstats/SanitizeWeblogs.java        | 61 ++++++++++++++++++----
 2 files changed, 51 insertions(+), 18 deletions(-)

diff --git a/src/main/java/org/torproject/collector/persist/WebServerAccessLogPersistence.java b/src/main/java/org/torproject/collector/persist/WebServerAccessLogPersistence.java
index 792d3a9..dab4112 100644
--- a/src/main/java/org/torproject/collector/persist/WebServerAccessLogPersistence.java
+++ b/src/main/java/org/torproject/collector/persist/WebServerAccessLogPersistence.java
@@ -5,7 +5,6 @@ package org.torproject.collector.persist;
 
 import org.torproject.descriptor.WebServerAccessLog;
 import org.torproject.descriptor.internal.FileType;
-import org.torproject.descriptor.log.InternalLogDescriptor;
 import org.torproject.descriptor.log.InternalWebServerAccessLog;
 
 import org.slf4j.Logger;
@@ -30,13 +29,6 @@ public class WebServerAccessLogPersistence
   /** Prepare storing the given descriptor. */
   public WebServerAccessLogPersistence(WebServerAccessLog desc) {
     super(desc, new byte[0]);
-    byte[] compressedBytes = null;
-    try { // The descriptor bytes have to be stored compressed.
-      compressedBytes = COMPRESSION.compress(desc.getRawDescriptorBytes());
-      ((InternalLogDescriptor)desc).setRawDescriptorBytes(compressedBytes);
-    } catch (Exception ex) {
-      log.warn("Cannot compress ’{}’.  Storing uncompressed.", ex);
-    }
     calculatePaths();
   }
 
diff --git a/src/main/java/org/torproject/collector/webstats/SanitizeWeblogs.java b/src/main/java/org/torproject/collector/webstats/SanitizeWeblogs.java
index 7601898..1f2e922 100644
--- a/src/main/java/org/torproject/collector/webstats/SanitizeWeblogs.java
+++ b/src/main/java/org/torproject/collector/webstats/SanitizeWeblogs.java
@@ -3,6 +3,7 @@
 
 package org.torproject.collector.webstats;
 
+import static java.util.stream.Collectors.counting;
 import static java.util.stream.Collectors.groupingByConcurrent;
 import static java.util.stream.Collectors.toList;
 
@@ -17,6 +18,7 @@ import org.torproject.collector.persist.WebServerAccessLogPersistence;
 import org.torproject.descriptor.DescriptorParseException;
 import org.torproject.descriptor.Method;
 import org.torproject.descriptor.WebServerAccessLog;
+import org.torproject.descriptor.internal.FileType;
 import org.torproject.descriptor.log.InternalLogDescriptor;
 import org.torproject.descriptor.log.InternalWebServerAccessLog;
 import org.torproject.descriptor.log.WebServerAccessLogImpl;
@@ -26,8 +28,9 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.BufferedReader;
-import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
 import java.io.InputStreamReader;
+import java.io.OutputStream;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.time.LocalDate;
@@ -40,6 +43,7 @@ import java.util.SortedSet;
 import java.util.StringJoiner;
 import java.util.TreeMap;
 import java.util.TreeSet;
+import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 /**
@@ -87,6 +91,7 @@ public class SanitizeWeblogs extends CollecTorMain {
       Set<SourceType> sources = this.config.getSourceTypeSet(
           Key.WebstatsSources);
       if (sources.contains(SourceType.Local)) {
+        log.info("Processing logs using batch value {}.", BATCH);
         findCleanWrite(this.config.getPath(Key.WebstatsLocalOrigins));
         PersistenceUtils.cleanDirectory(this.config.getPath(Key.RecentPath));
       }
@@ -126,24 +131,60 @@ public class SanitizeWeblogs extends CollecTorMain {
     String name = new StringJoiner(InternalLogDescriptor.SEP)
         .add(virtualHost).add(physicalHost)
         .add(InternalWebServerAccessLog.MARKER)
-        .add(date.format(DateTimeFormatter.BASIC_ISO_DATE)).toString();
+        .add(date.format(DateTimeFormatter.BASIC_ISO_DATE))
+        .toString() + "." + FileType.XZ.name().toLowerCase();
     log.debug("Sanitizing {}.", name);
-    List<String> retainedLines = lines
+    Map<String, Long> retainedLines = new TreeMap<>(lines
         .stream().parallel().map((line) -> sanitize(line, date))
-        .filter((line) -> line.isPresent()).map((line) -> line.get())
-        .collect(toList());
-    retainedLines.sort(null);
+        .filter((line) -> line.isPresent())
+        .map((line) -> line.get())
+        .collect(groupingByConcurrent(line -> line, counting())));
+    lines.clear(); // not needed anymore
     try {
       WebServerAccessLogPersistence walp
           = new WebServerAccessLogPersistence(
-          new WebServerAccessLogImpl(retainedLines, name, false));
+          new WebServerAccessLogImpl(toCompressedBytes(retainedLines),
+          name, false));
       log.debug("Storing {}.", name);
       walp.storeOut(this.outputPathName);
       walp.storeRecent(this.recentPathName);
     } catch (DescriptorParseException dpe) {
       log.error("Cannot store log desriptor {}.", name, dpe);
+    } catch (Throwable th) { // catch all else
+      log.error("Serious problem.  Cannot store log desriptor {}.", name, th);
     }
-    lines.clear();
+  }
+
+  private static final int BATCH = 100_000;
+
+  static byte[] toCompressedBytes(Map<String, Long> lines)
+    throws DescriptorParseException {
+    try (ByteArrayOutputStream baos =  new ByteArrayOutputStream();
+         OutputStream os = FileType.XZ.outputStream(baos)) {
+      for (Map.Entry<String, Long> entry : lines.entrySet()) {
+        long count = entry.getValue();
+        byte[] batch = bytesFor(entry.getKey(), BATCH);
+        while (count > 0) {
+          if (count > BATCH) {
+            os.write(batch);
+            count -= BATCH;
+          } else {
+            os.write(bytesFor(entry.getKey(), count));
+            break;
+          }
+        }
+      }
+      os.flush();
+      os.close();
+      return baos.toByteArray();
+    } catch (Exception ex) {
+      throw new DescriptorParseException(ex.getMessage());
+    }
+  }
+
+  private static byte[] bytesFor(String line, long times) {
+    return Stream.of(line).limit(times)
+        .collect(Collectors.joining("\n", "", "\n")).getBytes();
   }
 
   static Optional<String> sanitize(WebServerAccessLogLine logLine,
@@ -186,8 +227,8 @@ public class SanitizeWeblogs extends CollecTorMain {
   private Stream<WebServerAccessLogLine> lineStream(LogMetadata metadata) {
     log.debug("Processing file {}.", metadata.path);
     try (BufferedReader br
-        = new BufferedReader(new InputStreamReader(new ByteArrayInputStream(
-         metadata.fileType.decompress(Files.readAllBytes(metadata.path)))))) {
+        = new BufferedReader(new InputStreamReader(
+         metadata.fileType.decompress(Files.newInputStream(metadata.path))))) {
       return br.lines()
           .map((String line) -> WebServerAccessLogLine.makeLine(line))
           .collect(toList()).stream();





More information about the tor-commits mailing list