commit bd948070e03ff71503fdba84cff6bc61c9fbe452
Author: iwakeh <iwakeh(a)torproject.org>
Date: Wed Jan 31 13:31:26 2018 +0000
Optimize parallel log processing via concurrent grouping by date, clear per-date line lists after use, and use static imports for readability.
---
.../torproject/collector/webstats/SanitizeWeblogs.java | 16 +++++++++-------
1 file changed, 9 insertions(+), 7 deletions(-)
diff --git a/src/main/java/org/torproject/collector/webstats/SanitizeWeblogs.java b/src/main/java/org/torproject/collector/webstats/SanitizeWeblogs.java
index 4496861..635457c 100644
--- a/src/main/java/org/torproject/collector/webstats/SanitizeWeblogs.java
+++ b/src/main/java/org/torproject/collector/webstats/SanitizeWeblogs.java
@@ -3,6 +3,9 @@
package org.torproject.collector.webstats;
+import static java.util.stream.Collectors.groupingByConcurrent;
+import static java.util.stream.Collectors.toList;
+
import org.torproject.collector.conf.Configuration;
import org.torproject.collector.conf.ConfigurationException;
import org.torproject.collector.conf.Key;
@@ -37,7 +40,6 @@ import java.util.SortedSet;
import java.util.StringJoiner;
import java.util.TreeMap;
import java.util.TreeSet;
-import java.util.stream.Collectors;
import java.util.stream.Stream;
/**
@@ -106,10 +108,9 @@ public class SanitizeWeblogs extends CollecTorMain {
log.info("Processing logs for {} on {}.", virtualHost, physicalHost);
Map<LocalDate, List<WebServerAccessLogLine>> linesByDate
= physicalEntry.getValue().values().stream().parallel()
- .flatMap((LogMetadata metadata)
- -> lineStream(metadata).filter((line) -> line.isValid()))
- .collect(Collectors.groupingBy(WebServerAccessLogLine::getDate,
- Collectors.toList()));
+ .flatMap((LogMetadata metadata) -> lineStream(metadata)
+ .filter((line) -> line.isValid())).parallel()
+ .collect(groupingByConcurrent(WebServerAccessLogLine::getDate));
LocalDate[] interval = determineInterval(linesByDate.keySet());
linesByDate.entrySet().stream()
.filter((entry) -> entry.getKey().isAfter(interval[0])
@@ -130,7 +131,7 @@ public class SanitizeWeblogs extends CollecTorMain {
List<String> retainedLines = lines
.stream().parallel().map((line) -> sanitize(line, date))
.filter((line) -> line.isPresent()).map((line) -> line.get())
- .collect(Collectors.toList());
+ .collect(toList());
retainedLines.sort(null);
try {
WebServerAccessLogPersistence walp
@@ -142,6 +143,7 @@ public class SanitizeWeblogs extends CollecTorMain {
} catch (DescriptorParseException dpe) {
log.error("Cannot store log desriptor {}.", name, dpe);
}
+ lines.clear();
}
static Optional<String> sanitize(WebServerAccessLogLine logLine,
@@ -188,7 +190,7 @@ public class SanitizeWeblogs extends CollecTorMain {
metadata.fileType.decompress(Files.readAllBytes(metadata.path)))))) {
return br.lines()
.map((String line) -> WebServerAccessLogLine.makeLine(line))
- .collect(Collectors.toList()).stream();
+ .collect(toList()).stream();
} catch (Exception ex) {
log.debug("Skipping log-file {}.", metadata.path, ex);
}