commit bd948070e03ff71503fdba84cff6bc61c9fbe452 Author: iwakeh iwakeh@torproject.org Date: Wed Jan 31 13:31:26 2018 +0000
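Optimize parallel processing and use static imports for readability. Group the parsed log lines by date with a concurrent collector (groupingByConcurrent) instead of groupingBy, and clear each date's list of lines after it has been processed. --- .../torproject/collector/webstats/SanitizeWeblogs.java | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-)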
diff --git a/src/main/java/org/torproject/collector/webstats/SanitizeWeblogs.java b/src/main/java/org/torproject/collector/webstats/SanitizeWeblogs.java index 4496861..635457c 100644 --- a/src/main/java/org/torproject/collector/webstats/SanitizeWeblogs.java +++ b/src/main/java/org/torproject/collector/webstats/SanitizeWeblogs.java @@ -3,6 +3,9 @@
package org.torproject.collector.webstats;
+import static java.util.stream.Collectors.groupingByConcurrent; +import static java.util.stream.Collectors.toList; + import org.torproject.collector.conf.Configuration; import org.torproject.collector.conf.ConfigurationException; import org.torproject.collector.conf.Key; @@ -37,7 +40,6 @@ import java.util.SortedSet; import java.util.StringJoiner; import java.util.TreeMap; import java.util.TreeSet; -import java.util.stream.Collectors; import java.util.stream.Stream;
/** @@ -106,10 +108,9 @@ public class SanitizeWeblogs extends CollecTorMain { log.info("Processing logs for {} on {}.", virtualHost, physicalHost); Map<LocalDate, List<WebServerAccessLogLine>> linesByDate = physicalEntry.getValue().values().stream().parallel() - .flatMap((LogMetadata metadata) - -> lineStream(metadata).filter((line) -> line.isValid())) - .collect(Collectors.groupingBy(WebServerAccessLogLine::getDate, - Collectors.toList())); + .flatMap((LogMetadata metadata) -> lineStream(metadata) + .filter((line) -> line.isValid())).parallel() + .collect(groupingByConcurrent(WebServerAccessLogLine::getDate)); LocalDate[] interval = determineInterval(linesByDate.keySet()); linesByDate.entrySet().stream() .filter((entry) -> entry.getKey().isAfter(interval[0]) @@ -130,7 +131,7 @@ public class SanitizeWeblogs extends CollecTorMain { List<String> retainedLines = lines .stream().parallel().map((line) -> sanitize(line, date)) .filter((line) -> line.isPresent()).map((line) -> line.get()) - .collect(Collectors.toList()); + .collect(toList()); retainedLines.sort(null); try { WebServerAccessLogPersistence walp @@ -142,6 +143,7 @@ public class SanitizeWeblogs extends CollecTorMain { } catch (DescriptorParseException dpe) { log.error("Cannot store log desriptor {}.", name, dpe); } + lines.clear(); }
static Optional<String> sanitize(WebServerAccessLogLine logLine, @@ -188,7 +190,7 @@ public class SanitizeWeblogs extends CollecTorMain { metadata.fileType.decompress(Files.readAllBytes(metadata.path)))))) { return br.lines() .map((String line) -> WebServerAccessLogLine.makeLine(line)) - .collect(Collectors.toList()).stream(); + .collect(toList()).stream(); } catch (Exception ex) { log.debug("Skipping log-file {}.", metadata.path, ex); }