commit 8557bf6255e6e3745088033e8e7bad7801421686 Author: iwakeh iwakeh@torproject.org Date: Tue Feb 20 16:30:09 2018 +0000
Reduce memory footprint and wall time.
Adapt to latest changes of metrics-lib (task-25329) and make use of the high redundancy of logs (e.g. a 3G file might only contain 350 different lines). This avoids OOM and array out of bounds exceptions for large files (>2G) and gives a speed-up of roughly 50%. (The earlier 66min are down to 34min for meronense&weschniakowii files plus two larger files.)
There is a BATCH constant, which could be tuned for processing speed. It is logged for each webstats module run. Currently, it is set to 100k. This was more or less arbitrarily chosen and used for all the tests. A test run using 500k didn't show significant differences. --- .../persist/WebServerAccessLogPersistence.java | 8 --- .../collector/webstats/SanitizeWeblogs.java | 61 ++++++++++++++++++---- 2 files changed, 51 insertions(+), 18 deletions(-)
diff --git a/src/main/java/org/torproject/collector/persist/WebServerAccessLogPersistence.java b/src/main/java/org/torproject/collector/persist/WebServerAccessLogPersistence.java index 792d3a9..dab4112 100644 --- a/src/main/java/org/torproject/collector/persist/WebServerAccessLogPersistence.java +++ b/src/main/java/org/torproject/collector/persist/WebServerAccessLogPersistence.java @@ -5,7 +5,6 @@ package org.torproject.collector.persist;
import org.torproject.descriptor.WebServerAccessLog; import org.torproject.descriptor.internal.FileType; -import org.torproject.descriptor.log.InternalLogDescriptor; import org.torproject.descriptor.log.InternalWebServerAccessLog;
import org.slf4j.Logger; @@ -30,13 +29,6 @@ public class WebServerAccessLogPersistence /** Prepare storing the given descriptor. */ public WebServerAccessLogPersistence(WebServerAccessLog desc) { super(desc, new byte[0]); - byte[] compressedBytes = null; - try { // The descriptor bytes have to be stored compressed. - compressedBytes = COMPRESSION.compress(desc.getRawDescriptorBytes()); - ((InternalLogDescriptor)desc).setRawDescriptorBytes(compressedBytes); - } catch (Exception ex) { - log.warn("Cannot compress ’{}’. Storing uncompressed.", ex); - } calculatePaths(); }
diff --git a/src/main/java/org/torproject/collector/webstats/SanitizeWeblogs.java b/src/main/java/org/torproject/collector/webstats/SanitizeWeblogs.java index 7601898..1f2e922 100644 --- a/src/main/java/org/torproject/collector/webstats/SanitizeWeblogs.java +++ b/src/main/java/org/torproject/collector/webstats/SanitizeWeblogs.java @@ -3,6 +3,7 @@
package org.torproject.collector.webstats;
+import static java.util.stream.Collectors.counting; import static java.util.stream.Collectors.groupingByConcurrent; import static java.util.stream.Collectors.toList;
@@ -17,6 +18,7 @@ import org.torproject.collector.persist.WebServerAccessLogPersistence; import org.torproject.descriptor.DescriptorParseException; import org.torproject.descriptor.Method; import org.torproject.descriptor.WebServerAccessLog; +import org.torproject.descriptor.internal.FileType; import org.torproject.descriptor.log.InternalLogDescriptor; import org.torproject.descriptor.log.InternalWebServerAccessLog; import org.torproject.descriptor.log.WebServerAccessLogImpl; @@ -26,8 +28,9 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory;
import java.io.BufferedReader; -import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; import java.io.InputStreamReader; +import java.io.OutputStream; import java.nio.file.Files; import java.nio.file.Path; import java.time.LocalDate; @@ -40,6 +43,7 @@ import java.util.SortedSet; import java.util.StringJoiner; import java.util.TreeMap; import java.util.TreeSet; +import java.util.stream.Collectors; import java.util.stream.Stream;
/** @@ -87,6 +91,7 @@ public class SanitizeWeblogs extends CollecTorMain { Set<SourceType> sources = this.config.getSourceTypeSet( Key.WebstatsSources); if (sources.contains(SourceType.Local)) { + log.info("Processing logs using batch value {}.", BATCH); findCleanWrite(this.config.getPath(Key.WebstatsLocalOrigins)); PersistenceUtils.cleanDirectory(this.config.getPath(Key.RecentPath)); } @@ -126,24 +131,60 @@ public class SanitizeWeblogs extends CollecTorMain { String name = new StringJoiner(InternalLogDescriptor.SEP) .add(virtualHost).add(physicalHost) .add(InternalWebServerAccessLog.MARKER) - .add(date.format(DateTimeFormatter.BASIC_ISO_DATE)).toString(); + .add(date.format(DateTimeFormatter.BASIC_ISO_DATE)) + .toString() + "." + FileType.XZ.name().toLowerCase(); log.debug("Sanitizing {}.", name); - List<String> retainedLines = lines + Map<String, Long> retainedLines = new TreeMap<>(lines .stream().parallel().map((line) -> sanitize(line, date)) - .filter((line) -> line.isPresent()).map((line) -> line.get()) - .collect(toList()); - retainedLines.sort(null); + .filter((line) -> line.isPresent()) + .map((line) -> line.get()) + .collect(groupingByConcurrent(line -> line, counting()))); + lines.clear(); // not needed anymore try { WebServerAccessLogPersistence walp = new WebServerAccessLogPersistence( - new WebServerAccessLogImpl(retainedLines, name, false)); + new WebServerAccessLogImpl(toCompressedBytes(retainedLines), + name, false)); log.debug("Storing {}.", name); walp.storeOut(this.outputPathName); walp.storeRecent(this.recentPathName); } catch (DescriptorParseException dpe) { log.error("Cannot store log desriptor {}.", name, dpe); + } catch (Throwable th) { // catch all else + log.error("Serious problem. Cannot store log desriptor {}.", name, th); } - lines.clear(); + } + + private static final int BATCH = 100_000; + + static byte[] toCompressedBytes(Map<String, Long> lines) + throws DescriptorParseException { + try (ByteArrayOutputStream baos = new ByteArrayOutputStream(); + OutputStream os = FileType.XZ.outputStream(baos)) { + for (Map.Entry<String, Long> entry : lines.entrySet()) { + long count = entry.getValue(); + byte[] batch = bytesFor(entry.getKey(), BATCH); + while (count > 0) { + if (count > BATCH) { + os.write(batch); + count -= BATCH; + } else { + os.write(bytesFor(entry.getKey(), count)); + break; + } + } + } + os.flush(); + os.close(); + return baos.toByteArray(); + } catch (Exception ex) { + throw new DescriptorParseException(ex.getMessage()); + } + } + + private static byte[] bytesFor(String line, long times) { + return Stream.of(line).limit(times) + .collect(Collectors.joining("\n", "", "\n")).getBytes(); }
static Optional<String> sanitize(WebServerAccessLogLine logLine, @@ -186,8 +227,8 @@ public class SanitizeWeblogs extends CollecTorMain { private Stream<WebServerAccessLogLine> lineStream(LogMetadata metadata) { log.debug("Processing file {}.", metadata.path); try (BufferedReader br - = new BufferedReader(new InputStreamReader(new ByteArrayInputStream( - metadata.fileType.decompress(Files.readAllBytes(metadata.path)))))) { + = new BufferedReader(new InputStreamReader( + metadata.fileType.decompress(Files.newInputStream(metadata.path))))) { return br.lines() .map((String line) -> WebServerAccessLogLine.makeLine(line)) .collect(toList()).stream();