commit d05b4e4aee3bc15c3e4d5bac660dfcee5bc26279 Author: iwakeh iwakeh@torproject.org Date: Tue Feb 20 16:30:14 2018 +0000
Circumvent the integer size limit of Java collections.
Clean (sanitize) log lines as soon as they are read, and immediately exploit the high redundancy of sanitized logs, i.e., continue with maps of type Map<LocalDate, Map<String, Long>>.
Rename method(s) to reflect what they do. --- .../collector/webstats/SanitizeWeblogs.java | 89 ++++++++++++++++------ 1 file changed, 65 insertions(+), 24 deletions(-)
diff --git a/src/main/java/org/torproject/collector/webstats/SanitizeWeblogs.java b/src/main/java/org/torproject/collector/webstats/SanitizeWeblogs.java index 1f2e922..5a270dd 100644 --- a/src/main/java/org/torproject/collector/webstats/SanitizeWeblogs.java +++ b/src/main/java/org/torproject/collector/webstats/SanitizeWeblogs.java @@ -4,8 +4,10 @@ package org.torproject.collector.webstats;
import static java.util.stream.Collectors.counting; +import static java.util.stream.Collectors.groupingBy; import static java.util.stream.Collectors.groupingByConcurrent; -import static java.util.stream.Collectors.toList; +import static java.util.stream.Collectors.reducing; +import static java.util.stream.Collectors.summingLong;
import org.torproject.collector.conf.Configuration; import org.torproject.collector.conf.ConfigurationException; @@ -35,6 +37,8 @@ import java.nio.file.Files; import java.nio.file.Path; import java.time.LocalDate; import java.time.format.DateTimeFormatter; +import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Optional; @@ -111,35 +115,36 @@ public class SanitizeWeblogs extends CollecTorMain { : virtualEntry.getValue().entrySet()) { String physicalHost = physicalEntry.getKey(); log.info("Processing logs for {} on {}.", virtualHost, physicalHost); - Map<LocalDate, List<WebServerAccessLogLine>> linesByDate + Map<LocalDate, Map<String, Long>> linesByDate = physicalEntry.getValue().values().stream().parallel() - .flatMap((LogMetadata metadata) -> lineStream(metadata) - .filter((line) -> line.isValid())).parallel() - .collect(groupingByConcurrent(WebServerAccessLogLine::getDate)); + .flatMap(metadata -> sanitzedLineStream(metadata).entrySet() + .stream()) + .collect(groupingBy(Map.Entry::getKey, + reducing(Collections.emptyMap(), Map.Entry::getValue, + (e1, e2) -> Stream.concat(e1.entrySet().stream(), e2.entrySet() + .stream()) + .collect(groupingByConcurrent(Map.Entry::getKey, + summingLong(Map.Entry::getValue)))))); LocalDate[] interval = determineInterval(linesByDate.keySet()); linesByDate.entrySet().stream() .filter((entry) -> entry.getKey().isAfter(interval[0]) && entry.getKey().isBefore(interval[1])).parallel() - .forEach((entry) -> storeSanitized(virtualHost, physicalHost, + .forEach((entry) -> storeSortedAndForget(virtualHost, physicalHost, entry.getKey(), entry.getValue())); } } }
- private void storeSanitized(String virtualHost, String physicalHost, - LocalDate date, List<WebServerAccessLogLine> lines) { + private void storeSortedAndForget(String virtualHost, String physicalHost, + LocalDate date, Map<String, Long> lineCounts) { String name = new StringJoiner(InternalLogDescriptor.SEP) .add(virtualHost).add(physicalHost) .add(InternalWebServerAccessLog.MARKER) .add(date.format(DateTimeFormatter.BASIC_ISO_DATE)) .toString() + "." + FileType.XZ.name().toLowerCase(); - log.debug("Sanitizing {}.", name); - Map<String, Long> retainedLines = new TreeMap<>(lines - .stream().parallel().map((line) -> sanitize(line, date)) - .filter((line) -> line.isPresent()) - .map((line) -> line.get()) - .collect(groupingByConcurrent(line -> line, counting()))); - lines.clear(); // not needed anymore + log.debug("Storing {}.", name); + Map<String, Long> retainedLines = new TreeMap<>(lineCounts); + lineCounts.clear(); // not needed anymore try { WebServerAccessLogPersistence walp = new WebServerAccessLogPersistence( @@ -187,8 +192,8 @@ public class SanitizeWeblogs extends CollecTorMain { .collect(Collectors.joining("\n", "", "\n")).getBytes(); }
- static Optional<String> sanitize(WebServerAccessLogLine logLine, - LocalDate date) { + static Optional<WebServerAccessLogLine> + sanitize(WebServerAccessLogLine logLine) { if (!logLine.isValid() || !(Method.GET == logLine.getMethod() || Method.HEAD == logLine.getMethod()) @@ -203,10 +208,13 @@ public class SanitizeWeblogs extends CollecTorMain { if (queryStart > 0) { logLine.setRequest(logLine.getRequest().substring(0, queryStart)); } - return Optional.of(logLine.toLogString()); + return Optional.of(logLine); }
LocalDate[] determineInterval(Set<LocalDate> dates) { + if (dates.isEmpty()) { // return the empty interval + return new LocalDate[]{LocalDate.MAX, LocalDate.MIN}; + } SortedSet<LocalDate> sorted = new TreeSet<>(); sorted.addAll(dates); if (this.limits) { @@ -214,7 +222,7 @@ public class SanitizeWeblogs extends CollecTorMain { sorted.remove(sorted.last()); } } - if (sorted.isEmpty()) { + if (sorted.isEmpty()) { // return the empty interval return new LocalDate[]{LocalDate.MAX, LocalDate.MIN}; } if (!this.limits) { @@ -224,18 +232,51 @@ public class SanitizeWeblogs extends CollecTorMain { return new LocalDate[]{sorted.first(), sorted.last()}; }
- private Stream<WebServerAccessLogLine> lineStream(LogMetadata metadata) { + private static final int LISTLIMIT = Integer.MAX_VALUE / 2; + + private Map<LocalDate, Map<String, Long>> + sanitzedLineStream(LogMetadata metadata) { log.debug("Processing file {}.", metadata.path); try (BufferedReader br = new BufferedReader(new InputStreamReader( metadata.fileType.decompress(Files.newInputStream(metadata.path))))) { - return br.lines() - .map((String line) -> WebServerAccessLogLine.makeLine(line)) - .collect(toList()).stream(); + List<List<WebServerAccessLogLine>> lists = new ArrayList<>(); + List<WebServerAccessLogLine> currentList = new ArrayList<>(); + lists.add(currentList); + String lineStr = br.readLine(); + int count = 0; + while (null != lineStr) { + WebServerAccessLogLine wsal = WebServerAccessLogLine.makeLine(lineStr); + if (wsal.isValid()) { + currentList.add(wsal); + count++; + } + if (count >= LISTLIMIT) { + currentList = new ArrayList<>(); + lists.add(currentList); + count = 0; + } + lineStr = br.readLine(); + } + br.close(); + return lists.parallelStream() + .map(list -> list.stream() + .map(line -> sanitize(line)) + .filter(line -> line.isPresent()) + .map(line -> line.get()) + .collect(groupingBy(WebServerAccessLogLine::getDate, + groupingBy(WebServerAccessLogLine::toLogString, counting())))) + .flatMap(map -> map.entrySet().stream()).parallel() + .collect(groupingByConcurrent(Map.Entry::getKey, + reducing(Collections.emptyMap(), Map.Entry::getValue, + (e1, e2) -> Stream.concat(e1.entrySet().stream(), + e2.entrySet().stream()).parallel() + .collect(groupingByConcurrent(Map.Entry::getKey, + summingLong(Map.Entry::getValue)))))); } catch (Exception ex) { log.debug("Skipping log-file {}.", metadata.path, ex); } - return Stream.empty(); + return Collections.emptyMap(); }
}