[tor-commits] [collector/master] Avoid reprocessing webstats files.

karsten at torproject.org
Tue Jan 14 16:06:28 UTC 2020


commit d48163379c2604626a62da775aafe68b5be62186
Author: Karsten Loesing <karsten.loesing at gmx.net>
Date:   Wed Dec 11 12:22:40 2019 +0100

    Avoid reprocessing webstats files.
    
    Web servers typically provide us with the last 14 days of request
    logs. We shouldn't process all 14 days over and over. Instead, we
    should only process new log files and any other log files containing
    log lines from newly written dates.
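
    For example (file names hypothetical): if a previous run already
    parsed and recorded the log dates of access.log-20191209, the next
    run only needs to parse the newly rotated access.log-20191210, plus
    any previously read files containing lines for a date that is about
    to be written out as complete.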
    
    In some cases web servers stop serving a given virtual host or stop
    acting as a web server at all. In these cases, however, we're still
    left with 14 days of logs per virtual host. Ideally, these logs would
    get cleaned up, but until that happens, we should at least not
    reprocess them over and over.
    
    In order to avoid reprocessing webstats files, we need a new state
    file that records the log dates contained in each input file. We use
    that state file to determine which previously processed webstats
    files need to be reprocessed, so that we can still write complete
    daily logs.
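
    The state file is a plain-text list with one "<date>,<path>" line
    per log date contained in an input file, for example (paths purely
    illustrative):

      2019-12-09,/srv/webstats/in/host1/metrics.torproject.org-access.log-20191210
      2019-12-10,/srv/webstats/in/host1/metrics.torproject.org-access.log-20191210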
---
 CHANGELOG.md                                       |   1 +
 .../metrics/collector/webstats/LogMetadata.java    |  17 +++
 .../collector/webstats/SanitizeWeblogs.java        | 155 ++++++++++++++++++---
 .../collector/webstats/WebServerAccessLogImpl.java |  19 +++
 4 files changed, 170 insertions(+), 22 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 73abdea..fe7937c 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -3,6 +3,7 @@
  * Medium changes
    - Give up on periodically checking the configuration file for
      updates and reloading it in case of changes.
+   - Avoid reprocessing webstats files.
 
  * Minor changes
    - Remove dependency on metrics-lib's internal package.
diff --git a/src/main/java/org/torproject/metrics/collector/webstats/LogMetadata.java b/src/main/java/org/torproject/metrics/collector/webstats/LogMetadata.java
index 879e8d7..b30c13a 100644
--- a/src/main/java/org/torproject/metrics/collector/webstats/LogMetadata.java
+++ b/src/main/java/org/torproject/metrics/collector/webstats/LogMetadata.java
@@ -81,5 +81,22 @@ public class LogMetadata {
     }
     return Optional.ofNullable(metadata);
   }
+
+  @Override
+  public boolean equals(Object other) {
+    if (this == other) {
+      return true;
+    }
+    if (other == null || getClass() != other.getClass()) {
+      return false;
+    }
+    LogMetadata that = (LogMetadata) other;
+    return path.toString().equals(that.path.toString());
+  }
+
+  @Override
+  public int hashCode() {
+    return path.hashCode();
+  }
 }
 
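With equals() and hashCode() keyed on the input path, LogMetadata
instances can now serve as keys in the new state map. A minimal
illustration (path hypothetical, assuming it matches the naming scheme
that LogMetadata.create() expects):

  Map<LogMetadata, Set<LocalDate>> state = new HashMap<>();
  LogMetadata.create(
      Paths.get("/in/host1/metrics.torproject.org-access.log-20191210"))
      .ifPresent(m -> state.computeIfAbsent(m, k -> new HashSet<>())
          .add(LocalDate.parse("2019-12-10")));
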
diff --git a/src/main/java/org/torproject/metrics/collector/webstats/SanitizeWeblogs.java b/src/main/java/org/torproject/metrics/collector/webstats/SanitizeWeblogs.java
index fc7c64f..b3fee06 100644
--- a/src/main/java/org/torproject/metrics/collector/webstats/SanitizeWeblogs.java
+++ b/src/main/java/org/torproject/metrics/collector/webstats/SanitizeWeblogs.java
@@ -25,14 +25,18 @@ import org.slf4j.LoggerFactory;
 import java.io.BufferedReader;
 import java.io.ByteArrayOutputStream;
 import java.io.File;
+import java.io.IOException;
 import java.io.InputStreamReader;
 import java.io.OutputStream;
 import java.nio.file.Files;
 import java.nio.file.Path;
+import java.nio.file.Paths;
 import java.time.LocalDate;
 import java.time.format.DateTimeFormatter;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -56,8 +60,10 @@ public class SanitizeWeblogs extends CollecTorMain {
   private static final int LIMIT = 2;
 
   private static final String WEBSTATS = "webstats";
-  private String outputPathName;
-  private String recentPathName;
+  private Path outputDirectory;
+  private Path recentDirectory;
+  private Path processedWebstatsFile;
+
   private boolean limits;
 
   /**
@@ -84,14 +90,22 @@ public class SanitizeWeblogs extends CollecTorMain {
     try {
       Files.createDirectories(this.config.getPath(Key.OutputPath));
       Files.createDirectories(this.config.getPath(Key.RecentPath));
-      this.outputPathName = this.config.getPath(Key.OutputPath).toString();
-      this.recentPathName = this.config.getPath(Key.RecentPath).toString();
+      Files.createDirectories(this.config.getPath(Key.StatsPath));
+      this.outputDirectory = this.config.getPath(Key.OutputPath);
+      this.recentDirectory = this.config.getPath(Key.RecentPath);
+      this.processedWebstatsFile = this.config.getPath(Key.StatsPath)
+          .resolve("processed-webstats");
       this.limits = this.config.getBool(Key.WebstatsLimits);
       Set<SourceType> sources = this.config.getSourceTypeSet(
           Key.WebstatsSources);
       if (sources.contains(SourceType.Local)) {
         log.info("Processing logs using batch value {}.", BATCH);
-        findCleanWrite(this.config.getPath(Key.WebstatsLocalOrigins));
+        Map<LogMetadata, Set<LocalDate>> previouslyProcessedWebstats
+            = this.readProcessedWebstats();
+        Map<LogMetadata, Set<LocalDate>> newlyProcessedWebstats
+            = this.findCleanWrite(this.config.getPath(Key.WebstatsLocalOrigins),
+            previouslyProcessedWebstats);
+        this.writeProcessedWebstats(newlyProcessedWebstats);
         long cutOffMillis = System.currentTimeMillis()
             - 3L * 24L * 60L * 60L * 1000L;
         PersistenceUtils.cleanDirectory(this.config.getPath(Key.RecentPath),
@@ -103,7 +117,32 @@ public class SanitizeWeblogs extends CollecTorMain {
     }
   }
 
-  private void findCleanWrite(Path dir) {
+  private Map<LogMetadata, Set<LocalDate>> readProcessedWebstats() {
+    Map<LogMetadata, Set<LocalDate>> processedWebstats = new HashMap<>();
+    if (Files.exists(this.processedWebstatsFile)) {
+      try {
+        for (String line : Files.readAllLines(this.processedWebstatsFile)) {
+          String[] lineParts = line.split(",", 2);
+          Optional<LogMetadata> logMetadata
+              = LogMetadata.create(Paths.get(lineParts[1]));
+          if (logMetadata.isPresent()) {
+            processedWebstats.putIfAbsent(logMetadata.get(), new HashSet<>());
+            LocalDate containedLogDate = LocalDate.parse(lineParts[0]);
+            processedWebstats.get(logMetadata.get()).add(containedLogDate);
+          }
+        }
+      } catch (IOException e) {
+        log.error("Cannot read state file {}.", this.processedWebstatsFile, e);
+      }
+      log.debug("Read state file containing {} log files.",
+          processedWebstats.size());
+    }
+    return processedWebstats;
+  }
+
+  private Map<LogMetadata, Set<LocalDate>> findCleanWrite(Path dir,
+      Map<LogMetadata, Set<LocalDate>> previouslyProcessedWebstats) {
+    Map<LogMetadata, Set<LocalDate>> newlyProcessedWebstats = new HashMap<>();
     LogFileMap fileMapIn = new LogFileMap(dir);
     log.info("Found log files for {} virtual hosts.", fileMapIn.size());
     for (Map.Entry<String,TreeMap<String,TreeMap<LocalDate,LogMetadata>>>
@@ -113,24 +152,76 @@ public class SanitizeWeblogs extends CollecTorMain {
           : virtualEntry.getValue().entrySet()) {
         String physicalHost = physicalEntry.getKey();
         log.info("Processing logs for {} on {}.", virtualHost, physicalHost);
-        Map<LocalDate, Map<String, Long>> linesByDate
-            = physicalEntry.getValue().values().stream().parallel()
-            .flatMap(metadata -> sanitzedLineStream(metadata).entrySet()
-            .stream())
-            .collect(groupingBy(Map.Entry::getKey,
-              reducing(Collections.emptyMap(), Map.Entry::getValue,
-                (e1, e2) -> Stream.concat(e1.entrySet().stream(), e2.entrySet()
-                  .stream())
-                  .collect(groupingByConcurrent(Map.Entry::getKey,
-                  summingLong(Map.Entry::getValue))))));
-        LocalDate[] interval = determineInterval(linesByDate.keySet());
-        linesByDate.entrySet().stream()
-            .filter((entry) -> entry.getKey().isAfter(interval[0])
-              && entry.getKey().isBefore(interval[1])).parallel()
+        /* Go through the current input log files for the given virtual and
+         * physical host, and either look up contained log dates from the last
+         * execution, or parse files to memory now. */
+        Map<LocalDate, Map<String, Long>> sanitizedLinesByDate
+            = new HashMap<>();
+        Set<LogMetadata> previouslyReadFiles = new HashSet<>();
+        for (LogMetadata logMetadata : physicalEntry.getValue().values()) {
+          Set<LocalDate> containedLogDates;
+          if (previouslyProcessedWebstats.containsKey(logMetadata)) {
+            containedLogDates = previouslyProcessedWebstats.get(logMetadata);
+            for (LocalDate date : containedLogDates) {
+              sanitizedLinesByDate.putIfAbsent(date, new TreeMap<>());
+            }
+            previouslyReadFiles.add(logMetadata);
+          } else {
+            containedLogDates = sanitizeWebstatsLog(sanitizedLinesByDate,
+                logMetadata);
+          }
+          newlyProcessedWebstats.put(logMetadata, containedLogDates);
+        }
+        /* Determine log dates that are safe to be written to disk now and that
+         * we didn't write to disk before. */
+        Set<LocalDate> storeDates = new HashSet<>();
+        LocalDate[] interval = determineInterval(sanitizedLinesByDate.keySet());
+        for (LocalDate newDate : sanitizedLinesByDate.keySet()) {
+          if (newDate.isAfter(interval[0]) && newDate.isBefore(interval[1])) {
+            WebServerAccessLogPersistence walp
+                = new WebServerAccessLogPersistence(
+                new WebServerAccessLogImpl(virtualHost, physicalHost, newDate));
+            Path outputPath = this.outputDirectory
+                .resolve(walp.getStoragePath());
+            if (!Files.exists(outputPath)) {
+              storeDates.add(newDate);
+            }
+          }
+        }
+        /* Reprocess previously read files containing log dates that we're going
+         * to write to disk below. */
+        for (LogMetadata previouslyReadFile : previouslyReadFiles) {
+          if (!Collections.disjoint(storeDates,
+              newlyProcessedWebstats.get(previouslyReadFile))) {
+            sanitizeWebstatsLog(sanitizedLinesByDate, previouslyReadFile);
+          }
+        }
+        /* Write sanitized log files to disk. */
+        sanitizedLinesByDate.entrySet().stream()
+            .filter((entry) -> storeDates.contains(entry.getKey())).parallel()
             .forEach((entry) -> storeSortedAndForget(virtualHost, physicalHost,
               entry.getKey(), entry.getValue()));
       }
     }
+    return newlyProcessedWebstats;
+  }
+
+  private Set<LocalDate> sanitizeWebstatsLog(
+      Map<LocalDate, Map<String, Long>> sanitizedLinesByDate,
+      LogMetadata logFile) {
+    Map<LocalDate, Map<String, Long>> newlySanitizedLinesByDate
+        = sanitzedLineStream(logFile);
+    for (Map.Entry<LocalDate, Map<String, Long>> e
+        : newlySanitizedLinesByDate.entrySet()) {
+      sanitizedLinesByDate.putIfAbsent(e.getKey(), new TreeMap<>());
+      Map<String, Long> newlySanitizedLines
+          = sanitizedLinesByDate.get(e.getKey());
+      for (Map.Entry<String, Long> e1 : e.getValue().entrySet()) {
+        newlySanitizedLines.put(e1.getKey(),
+            newlySanitizedLines.getOrDefault(e1.getKey(), 0L) + e1.getValue());
+      }
+    }
+    return newlySanitizedLinesByDate.keySet();
   }
 
   private void storeSortedAndForget(String virtualHost, String physicalHost,
@@ -149,8 +240,8 @@ public class SanitizeWeblogs extends CollecTorMain {
           new WebServerAccessLogImpl(toCompressedBytes(retainedLines),
           new File(name), name));
       log.debug("Storing {}.", name);
-      walp.storeOut(this.outputPathName);
-      walp.storeRecent(this.recentPathName);
+      walp.storeOut(this.outputDirectory.toString());
+      walp.storeRecent(this.recentDirectory.toString());
     } catch (DescriptorParseException dpe) {
       log.error("Cannot store log desriptor {}.", name, dpe);
     } catch (Throwable th) { // catch all else
@@ -279,5 +370,25 @@ public class SanitizeWeblogs extends CollecTorMain {
     return Collections.emptyMap();
   }
 
+  private void writeProcessedWebstats(
+      Map<LogMetadata, Set<LocalDate>> newlyProcessedWebstats) {
+    try {
+      if (!Files.exists(this.processedWebstatsFile.getParent())) {
+        Files.createDirectories(this.processedWebstatsFile.getParent());
+      }
+      List<String> lines = new ArrayList<>();
+      for (Map.Entry<LogMetadata, Set<LocalDate>> e
+          : newlyProcessedWebstats.entrySet()) {
+        for (LocalDate logLineDate : e.getValue()) {
+          lines.add(String.format("%s,%s", logLineDate, e.getKey().path));
+        }
+      }
+      Files.write(this.processedWebstatsFile, lines);
+    } catch (IOException e) {
+      log.error("Cannot write state file {}.", this.processedWebstatsFile, e);
+    }
+    log.debug("Wrote state file containing {} log files.",
+        newlyProcessedWebstats.size());
+  }
 }
 
diff --git a/src/main/java/org/torproject/metrics/collector/webstats/WebServerAccessLogImpl.java b/src/main/java/org/torproject/metrics/collector/webstats/WebServerAccessLogImpl.java
index af77c94..f091aa1 100644
--- a/src/main/java/org/torproject/metrics/collector/webstats/WebServerAccessLogImpl.java
+++ b/src/main/java/org/torproject/metrics/collector/webstats/WebServerAccessLogImpl.java
@@ -104,6 +104,25 @@ public class WebServerAccessLogImpl implements WebServerAccessLog {
     }
   }
 
+  /**
+   * Creates an empty WebServerAccessLog from the given filename parts.
+   *
+   * <p>This instance is not intended to be written to disk, as it doesn't have
+   * any content. The main purpose of this instance is to compute storage
+   * paths.</p>
+   *
+   * @param virtualHost Virtual host name.
+   * @param physicalHost Physical host name.
+   * @param logDate Log date.
+   */
+  protected WebServerAccessLogImpl(String virtualHost, String physicalHost,
+      LocalDate logDate) {
+    this.descriptorFile = null;
+    this.virtualHost = virtualHost;
+    this.physicalHost = physicalHost;
+    this.logDate = logDate;
+  }
+
   @Override
   public InputStream decompressedByteStream() throws DescriptorParseException {
     try {

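For context, a minimal sketch of how the new path-only constructor
combines with WebServerAccessLogPersistence to test whether a sanitized
log for a given date has already been written, mirroring the check in
SanitizeWeblogs#findCleanWrite above. It would have to live in the same
org.torproject.metrics.collector.webstats package, since the constructor
is protected:

  import java.nio.file.Files;
  import java.nio.file.Path;
  import java.time.LocalDate;

  static boolean alreadyWritten(Path outputDirectory, String virtualHost,
      String physicalHost, LocalDate logDate) {
    /* The empty descriptor carries just enough metadata to compute the
     * storage path; it is never written to disk itself. */
    WebServerAccessLogPersistence walp = new WebServerAccessLogPersistence(
        new WebServerAccessLogImpl(virtualHost, physicalHost, logDate));
    return Files.exists(outputDirectory.resolve(walp.getStoragePath()));
  }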

