commit 859476ecaec2164e0d84bbba4377da11c90034b2 Author: Karsten Loesing karsten.loesing@gmx.net Date: Sat Nov 23 17:13:22 2019 +0100
Remove dependency on metrics-lib's log package (1/4).
- Copy types from metrics-lib to this code base. - Update package and import statements. - Copy remaining parts of metrics-lib's FileType. --- .../persist/WebServerAccessLogPersistence.java | 2 +- .../metrics/collector/webstats/FileType.java | 44 +++++ .../collector/webstats/InternalLogDescriptor.java | 63 +++++++ .../webstats/InternalWebServerAccessLog.java | 17 ++ .../collector/webstats/LogDescriptorImpl.java | 166 +++++++++++++++++++ .../metrics/collector/webstats/LogMetadata.java | 2 +- .../collector/webstats/SanitizeWeblogs.java | 4 - .../collector/webstats/WebServerAccessLogImpl.java | 173 ++++++++++++++++++++ .../collector/webstats/WebServerAccessLogLine.java | 182 +++++++++++++++++++++ 9 files changed, 647 insertions(+), 6 deletions(-)
diff --git a/src/main/java/org/torproject/metrics/collector/persist/WebServerAccessLogPersistence.java b/src/main/java/org/torproject/metrics/collector/persist/WebServerAccessLogPersistence.java index 848fa2e..0f862b4 100644 --- a/src/main/java/org/torproject/metrics/collector/persist/WebServerAccessLogPersistence.java +++ b/src/main/java/org/torproject/metrics/collector/persist/WebServerAccessLogPersistence.java @@ -4,8 +4,8 @@ package org.torproject.metrics.collector.persist;
import org.torproject.descriptor.WebServerAccessLog; -import org.torproject.descriptor.log.InternalWebServerAccessLog; import org.torproject.metrics.collector.webstats.FileType; +import org.torproject.metrics.collector.webstats.InternalWebServerAccessLog;
import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/src/main/java/org/torproject/metrics/collector/webstats/FileType.java b/src/main/java/org/torproject/metrics/collector/webstats/FileType.java index 79dcf21..15b1e00 100644 --- a/src/main/java/org/torproject/metrics/collector/webstats/FileType.java +++ b/src/main/java/org/torproject/metrics/collector/webstats/FileType.java @@ -12,6 +12,8 @@ import org.apache.commons.compress.compressors.xz.XZCompressorOutputStream;
import java.io.BufferedInputStream; import java.io.BufferedOutputStream; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; import java.io.InputStream; import java.io.OutputStream;
@@ -69,10 +71,52 @@ public enum FileType { }
/** + * Compresses the given bytes in memory and returns the compressed bytes. + */ + public byte[] compress(byte[] bytes) throws Exception { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + try (OutputStream os = this.outputStream(baos)) { + os.write(bytes); + os.flush(); + } + return baos.toByteArray(); + } + + /** + * Compresses the given InputStream and returns an OutputStream. + */ + public OutputStream compress(OutputStream os) throws Exception { + return this.outputStream(os); + } + + /** * Decompresses the given InputStream and returns an OutputStream. */ public InputStream decompress(InputStream is) throws Exception { return this.inputStream(is); } + + /** + * Decompresses the given bytes in memory and returns the decompressed bytes. + * + * @since 2.2.0 + */ + public byte[] decompress(byte[] bytes) throws Exception { + if (0 == bytes.length) { + return bytes; + } + try (InputStream is + = this.inputStream(new ByteArrayInputStream(bytes)); + ByteArrayOutputStream baos = new ByteArrayOutputStream()) { + int readByte = is.read(); + while (readByte > 0) { + baos.write(readByte); + readByte = is.read(); + } + baos.flush(); + return baos.toByteArray(); + } + } + }
diff --git a/src/main/java/org/torproject/metrics/collector/webstats/InternalLogDescriptor.java b/src/main/java/org/torproject/metrics/collector/webstats/InternalLogDescriptor.java
new file mode 100644
index 0000000..3a8a1f0
--- /dev/null
+++ b/src/main/java/org/torproject/metrics/collector/webstats/InternalLogDescriptor.java
@@ -0,0 +1,63 @@
+/* Copyright 2017--2018 The Tor Project
+ * See LICENSE for licensing information */
+
+package org.torproject.metrics.collector.webstats;
+
+import org.torproject.descriptor.DescriptorParseException;
+import org.torproject.descriptor.LogDescriptor;
+
+/**
+ * This interface provides methods for internal use only.
+ *
+ * @since 2.2.0
+ */
+public interface InternalLogDescriptor extends LogDescriptor {
+
+  /** Logfile name parts separator. */
+  String SEP = "_";
+
+  /**
+   * Validate log lines.
+   *
+   * @since 2.2.0
+   */
+  void validate() throws DescriptorParseException;
+
+  /**
+   * Set the {@code Validator} that will perform the validation on log
+   * lines.
+   *
+   * <p>Usually set by the implementing class.</p>
+   *
+   * @since 2.2.0
+   */
+  void setValidator(Validator validator);
+
+  /**
+   * Set the descriptor's bytes.
+   *
+   * @since 2.2.0
+   */
+  void setRawDescriptorBytes(byte[] bytes);
+
+  /** Return the descriptor's preferred compression. */
+  String getCompressionType();
+
+  /**
+   * Provides a single function for validating a single log line.
+   *
+   * @since 2.2.0
+   */
+  interface Validator {
+
+    /**
+     * Verifies a log line.
+     *
+     * @since 2.2.0
+     */
+    boolean validate(String line);
+
+  }
+
+}
+
diff --git a/src/main/java/org/torproject/metrics/collector/webstats/InternalWebServerAccessLog.java b/src/main/java/org/torproject/metrics/collector/webstats/InternalWebServerAccessLog.java
new file mode 100644
index 0000000..817b8d5
--- /dev/null
+++ b/src/main/java/org/torproject/metrics/collector/webstats/InternalWebServerAccessLog.java
@@ -0,0 +1,17 @@
+/* Copyright 2018 The Tor Project
+ * See LICENSE for licensing information */
+
+package org.torproject.metrics.collector.webstats;
+
+/**
+ * This interface provides methods for internal use only.
+ *
+ * @since 2.2.0
+ */
+public interface InternalWebServerAccessLog extends InternalLogDescriptor {
+
+  /** The log's name should include this string. */
+  String MARKER = "access.log";
+
+}
+
diff --git a/src/main/java/org/torproject/metrics/collector/webstats/LogDescriptorImpl.java b/src/main/java/org/torproject/metrics/collector/webstats/LogDescriptorImpl.java
new file mode 100644
index 0000000..d13c85a
--- /dev/null
+++ b/src/main/java/org/torproject/metrics/collector/webstats/LogDescriptorImpl.java
@@ -0,0 +1,175 @@
+/* Copyright 2017--2018 The Tor Project
+ * See LICENSE for licensing information */
+
+package org.torproject.metrics.collector.webstats;
+
+import org.torproject.descriptor.Descriptor;
+import org.torproject.descriptor.DescriptorParseException;
+import org.torproject.descriptor.LogDescriptor;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.ByteArrayInputStream;
+import java.io.File;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Locale;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+/**
+ * Base class for log descriptors.
+ *
+ * @since 2.2.0
+ */
+public abstract class LogDescriptorImpl
+    implements LogDescriptor, InternalLogDescriptor {
+
+  /** The log's file name should contain this string. */
+  public static final String MARKER = ".log";
+
+  private static final int unrecognizedLinesLimit = 3;
+
+  private static final Logger log
+      = LoggerFactory.getLogger(LogDescriptorImpl.class);
+
+  private static final Pattern filenamePattern = Pattern.compile(
+      "(?:\\S*)" + MARKER + SEP + "(?:[0-9a-zA-Z]*)(?:\\.?)([a-zA-Z2]*)");
+
+  private final File descriptorFile;
+
+  /** Log data in compressed form, see {@code fileType}. */
+  private byte[] logBytes;
+
+  private FileType fileType;
+
+  private List<String> unrecognizedLines = new ArrayList<>();
+
+  private Validator validator = (String line) -> true;
+
+  /**
+   * This constructor performs basic operations on the given bytes.
+   *
+   * <p>An unknown compression type (see {@link #getCompressionType})
+   * is interpreted as missing compression.  In this case the bytes
+   * will be compressed to the given compression type.</p>
+   *
+   * @since 2.2.0
+   */
+  protected LogDescriptorImpl(byte[] logBytes, File descriptorFile,
+      String logName, FileType defaultCompression)
+      throws DescriptorParseException {
+    this.logBytes = logBytes;
+    this.descriptorFile = descriptorFile;
+    try {
+      Matcher mat = filenamePattern.matcher(logName);
+      if (!mat.find()) {
+        throw new DescriptorParseException(
+            "Log file name doesn't comply to standard: " + logName);
+      }
+      this.fileType = FileType.findType(mat.group(1).toUpperCase(Locale.ROOT));
+      if (FileType.PLAIN == this.fileType) {
+        // Uncompressed input is stored compressed with the default type.
+        this.fileType = defaultCompression;
+        this.logBytes = this.fileType.compress(this.logBytes);
+      }
+    } catch (Exception ex) {
+      // descriptorFile may be null, so it must not be dereferenced here.
+      throw new DescriptorParseException("Cannot parse file "
+          + logName + " from file " + nameOf(descriptorFile), ex);
+    }
+  }
+
+  @Override
+  public InputStream decompressedByteStream() throws DescriptorParseException {
+    try {
+      return this.fileType.decompress(new ByteArrayInputStream(this.logBytes));
+    } catch (Exception ex) {
+      throw new DescriptorParseException("Cannot provide deflated stream of "
+          + this.descriptorFile + ".", ex);
+    }
+  }
+
+  @Override
+  public void validate() throws DescriptorParseException {
+    try (BufferedReader br = new BufferedReader(new InputStreamReader(
+        decompressedByteStream(), StandardCharsets.UTF_8))) {
+      this.unrecognizedLines.addAll(br.lines().parallel().filter((line)
+          -> null != line && !line.isEmpty() && !validator.validate(line))
+          .limit(unrecognizedLinesLimit).collect(Collectors.toList()));
+    } catch (Exception ex) {
+      throw new DescriptorParseException("Cannot validate log lines.", ex);
+    }
+  }
+
+  /**
+   * Assemble a LogDescriptor.
+   *
+   * @since 2.2.0
+   */
+  public static List<Descriptor> parse(byte[] logBytes,
+      File descriptorFile, String logName) throws DescriptorParseException {
+    if (logName.contains(InternalWebServerAccessLog.MARKER)) {
+      return Arrays.asList(new Descriptor[]{
+          new WebServerAccessLogImpl(logBytes, descriptorFile, logName)});
+    } else {
+      throw new DescriptorParseException("Cannot parse file " + logName
+          + " from file " + nameOf(descriptorFile));
+    }
+  }
+
+  /** Returns the name of the given file, or {@code null} if there is none. */
+  private static String nameOf(File file) {
+    return null == file ? null : file.getName();
+  }
+
+  @Override
+  public void setValidator(Validator validator) {
+    this.validator = validator;
+  }
+
+  @Override
+  public String getCompressionType() {
+    return this.fileType.name().toLowerCase(Locale.ROOT);
+  }
+
+  @Override
+  public byte[] getRawDescriptorBytes() {
+    return this.logBytes;
+  }
+
+  @Override
+  public void setRawDescriptorBytes(byte[] bytes) {
+    this.logBytes = bytes;
+  }
+
+  @Override
+  public int getRawDescriptorLength() {
+    return this.logBytes.length;
+  }
+
+  @Override
+  public List<String> getAnnotations() {
+    return Collections.emptyList();
+  }
+
+  @Override
+  public List<String> getUnrecognizedLines() {
+    return this.unrecognizedLines;
+  }
+
+  @Override
+  public File getDescriptorFile() {
+    return descriptorFile;
+  }
+
+}
+
diff --git a/src/main/java/org/torproject/metrics/collector/webstats/LogMetadata.java
b/src/main/java/org/torproject/metrics/collector/webstats/LogMetadata.java index b04fcb5..879e8d7 100644 --- a/src/main/java/org/torproject/metrics/collector/webstats/LogMetadata.java +++ b/src/main/java/org/torproject/metrics/collector/webstats/LogMetadata.java @@ -3,7 +3,7 @@
package org.torproject.metrics.collector.webstats;
-import static org.torproject.descriptor.log.WebServerAccessLogImpl.MARKER; +import static org.torproject.metrics.collector.webstats.WebServerAccessLogImpl.MARKER;
import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/src/main/java/org/torproject/metrics/collector/webstats/SanitizeWeblogs.java b/src/main/java/org/torproject/metrics/collector/webstats/SanitizeWeblogs.java index 5c001b6..74bd741 100644 --- a/src/main/java/org/torproject/metrics/collector/webstats/SanitizeWeblogs.java +++ b/src/main/java/org/torproject/metrics/collector/webstats/SanitizeWeblogs.java @@ -12,10 +12,6 @@ import static java.util.stream.Collectors.summingLong; import org.torproject.descriptor.DescriptorParseException; import org.torproject.descriptor.Method; import org.torproject.descriptor.WebServerAccessLog; -import org.torproject.descriptor.log.InternalLogDescriptor; -import org.torproject.descriptor.log.InternalWebServerAccessLog; -import org.torproject.descriptor.log.WebServerAccessLogImpl; -import org.torproject.descriptor.log.WebServerAccessLogLine; import org.torproject.metrics.collector.conf.Configuration; import org.torproject.metrics.collector.conf.Key; import org.torproject.metrics.collector.conf.SourceType; diff --git a/src/main/java/org/torproject/metrics/collector/webstats/WebServerAccessLogImpl.java b/src/main/java/org/torproject/metrics/collector/webstats/WebServerAccessLogImpl.java new file mode 100644 index 0000000..2ca608a --- /dev/null +++ b/src/main/java/org/torproject/metrics/collector/webstats/WebServerAccessLogImpl.java @@ -0,0 +1,173 @@ +/* Copyright 2017--2018 The Tor Project + * See LICENSE for licensing information */ + +package org.torproject.metrics.collector.webstats; + +import org.torproject.descriptor.DescriptorParseException; +import org.torproject.descriptor.WebServerAccessLog; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.BufferedReader; +import java.io.File; +import java.io.InputStreamReader; +import java.nio.file.Paths; +import java.time.LocalDate; +import java.time.format.DateTimeFormatter; +import java.util.ArrayList; +import java.util.Collection; +import 
java.util.List; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import java.util.stream.Stream; + +/** + * Implementation of web server access log descriptors. + * + * <p>Defines sanitization and validation for web server access logs.</p> + * + * @since 2.2.0 + */ +public class WebServerAccessLogImpl extends LogDescriptorImpl + implements InternalWebServerAccessLog, WebServerAccessLog { + + private static final Logger log + = LoggerFactory.getLogger(WebServerAccessLogImpl.class); + + /** The log's name should include this string. */ + public static final String MARKER = InternalWebServerAccessLog.MARKER; + + /** The mandatory web server log descriptor file name pattern. */ + public static final Pattern filenamePattern + = Pattern.compile("(\S*)" + SEP + "(\S*)" + SEP + "" + MARKER + + SEP + "(\d*)(?:\.?)([a-zA-Z]*)"); + + private final String physicalHost; + + private final String virtualHost; + + private final LocalDate logDate; + + private boolean validate = true; + + /** + * Creates a WebServerAccessLog from the given bytes and filename. + * + * <p>The given bytes are read, whereas the file is not read.</p> + * + * <p>The path of the given file has to be compliant to the following + * naming pattern + * {@code + * <virtualHost>-<physicalHost>-access.log-<yyyymmdd>.<compression>}, + * where an unknown compression type (see {@link #getCompressionType}) + * is interpreted as missing compression. In this case the bytes + * will be compressed to the default compression type. + * The immediate parent name is taken to be the physical host collecting the + * logs.</p> + */ + protected WebServerAccessLogImpl(byte[] logBytes, File file, String logName) + throws DescriptorParseException { + this(logBytes, file, logName, FileType.XZ); + } + + /** For internal use only. 
*/ + public WebServerAccessLogImpl(byte[] bytes, String filename, + boolean validate) throws DescriptorParseException { + this(bytes, null, filename, FileType.XZ, validate); + } + + /** For internal use only. */ + public WebServerAccessLogImpl(byte[] bytes, File sourceFile, String filename, + boolean validate) throws DescriptorParseException { + this(bytes, sourceFile, filename, FileType.XZ, validate); + } + + private WebServerAccessLogImpl(byte[] logBytes, File file, String logName, + FileType defaultCompression) throws DescriptorParseException { + this(logBytes, file, logName, defaultCompression, true); + } + + private WebServerAccessLogImpl(byte[] logBytes, File file, String logName, + FileType defaultCompression, boolean validate) + throws DescriptorParseException { + super(logBytes, file, logName, defaultCompression); + try { + String fn = Paths.get(logName).getFileName().toString(); + Matcher mat = filenamePattern.matcher(fn); + if (!mat.find()) { + throw new DescriptorParseException( + "WebServerAccessLog file name doesn't comply to standard: " + fn); + } + this.virtualHost = mat.group(1); + this.physicalHost = mat.group(2); + if (null == this.virtualHost || null == this.physicalHost + || this.virtualHost.isEmpty() || this.physicalHost.isEmpty()) { + throw new DescriptorParseException( + "WebServerAccessLog file name doesn't comply to standard: " + fn); + } + String ymd = mat.group(3); + this.logDate = LocalDate.parse(ymd, DateTimeFormatter.BASIC_ISO_DATE); + this.setValidator((line) + -> WebServerAccessLogLine.makeLine(line).isValid()); + if (validate) { + this.validate(); + } + } catch (DescriptorParseException dpe) { + throw dpe; // escalate + } catch (Exception pe) { + throw new DescriptorParseException( + "Cannot parse WebServerAccessLog file: " + logName, pe); + } + } + + @Override + public String getPhysicalHost() { + return this.physicalHost; + } + + @Override + public String getVirtualHost() { + return this.virtualHost; + } + + @Override + public 
LocalDate getLogDate() { + return this.logDate; + } + + private static final int LISTLIMIT = Integer.MAX_VALUE / 2; + + /** Returns a stream of all valid log lines. */ + @Override + public Stream<WebServerAccessLog.Line> logLines() + throws DescriptorParseException { + try (BufferedReader br = new BufferedReader(new InputStreamReader( + this.decompressedByteStream()))) { + List<List<WebServerAccessLogLine>> lists = new ArrayList<>(); + List<WebServerAccessLogLine> currentList = new ArrayList<>(); + lists.add(currentList); + String lineStr = br.readLine(); + int count = 0; + while (null != lineStr) { + WebServerAccessLogLine wsal = WebServerAccessLogLine.makeLine(lineStr); + if (wsal.isValid()) { + currentList.add(wsal); + count++; + } + if (count >= LISTLIMIT) { + currentList = new ArrayList<>(); + lists.add(currentList); + count = 0; + } + lineStr = br.readLine(); + } + br.close(); + return lists.stream().flatMap(Collection::stream); + } catch (Exception ex) { + throw new DescriptorParseException("Cannot retrieve log lines.", ex); + } + } + +} + diff --git a/src/main/java/org/torproject/metrics/collector/webstats/WebServerAccessLogLine.java b/src/main/java/org/torproject/metrics/collector/webstats/WebServerAccessLogLine.java new file mode 100644 index 0000000..bc03c0a --- /dev/null +++ b/src/main/java/org/torproject/metrics/collector/webstats/WebServerAccessLogLine.java @@ -0,0 +1,182 @@ +/* Copyright 2018 The Tor Project + * See LICENSE for licensing information */ + +package org.torproject.metrics.collector.webstats; + +import org.torproject.descriptor.Method; +import org.torproject.descriptor.WebServerAccessLog; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.time.LocalDate; +import java.time.ZoneOffset; +import java.time.ZonedDateTime; +import java.time.format.DateTimeFormatter; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import 
java.util.regex.Matcher; +import java.util.regex.Pattern; + +public class WebServerAccessLogLine implements WebServerAccessLog.Line { + + private static final Logger log = LoggerFactory + .getLogger(WebServerAccessLogLine.class); + + private static final String DATE_PATTERN = "dd/MMM/yyyy"; + private static final String DASH = "-"; + + private static final DateTimeFormatter dateTimeFormatter + = DateTimeFormatter.ofPattern(DATE_PATTERN + ":HH:mm:ss xxxx"); + + private static Pattern logLinePattern = Pattern.compile( + "^((?:\d{1,3}\.){3}\d{1,3}) (\S+) (\S+) " + + "\[([\w/]+)([\w:]+)(\s[+\-]\d{4})\] " + + ""([A-Z]+) ([^"]+) ([A-Z]+/\d\.\d)" " + + "(\d{3}) (\d+|-)(.*)"); + + private static Map<String, String> ipMap + = Collections.synchronizedMap(new HashMap<>()); + private static Map<LocalDate, LocalDate> dateMap + = Collections.synchronizedMap(new HashMap<>(500)); + private static Map<String, String> protocolMap + = Collections.synchronizedMap(new HashMap<>()); + private static Map<String, String> requestMap + = Collections.synchronizedMap(new HashMap<>(50_000)); + + private String ip; + private int response; + private String request; + private Method method; + private LocalDate date; + private int size = -1; + private boolean valid = false; + private String protocol; + + /** Returns a log line string. Possibly empty. */ + @Override + public String toLogString() { + if (!this.valid) { + return ""; + } + return toString(); + } + + @Override + public String toString() { + return String.format("%s - - [%s:00:00:00 +0000] "%s %s %s" %d %s", + this.ip, this.getDateString(), this.method.name(), this.request, + this.protocol, this.response, this.size < 0 ? DASH : this.size); + } + + /** Only used internally during sanitization. + * Returns the string of the date using 'dd/MMM/yyyy' format. 
*/ + public String getDateString() { + return this.date.format(DateTimeFormatter.ofPattern(DATE_PATTERN)); + } + + @Override + public String getIp() { + return this.ip; + } + + /** Only used internally during sanitization. */ + public void setIp(String ip) { + this.ip = fromMap(ip, ipMap); + } + + @Override + public Method getMethod() { + return this.method; + } + + @Override + public String getProtocol() { + return this.protocol; + } + + @Override + public String getRequest() { + return this.request; + } + + @Override + public Optional<Integer> getSize() { + return this.size < 0 ? Optional.empty() : Optional.of(this.size); + } + + @Override + public int getResponse() { + return this.response; + } + + /** Only used internally during sanitization. */ + public void setRequest(String request) { + this.request = fromMap(request, requestMap); + } + + @Override + public LocalDate getDate() { + return this.date; + } + + @Override + public boolean isValid() { + return this.valid; + } + + /** Creates a Line from a string. 
*/ + public static WebServerAccessLogLine makeLine(String line) { + WebServerAccessLogLine res = new WebServerAccessLogLine(); + try { + Matcher mat = logLinePattern.matcher(line); + if (mat.find()) { + res.response = Integer.valueOf(mat.group(10)); + res.method = Method.valueOf(mat.group(7)); + String dateTimeString = mat.group(4) + mat.group(5) + mat.group(6); + res.date = fromMap(ZonedDateTime.parse(dateTimeString, + dateTimeFormatter).withZoneSameInstant(ZoneOffset.UTC) + .toLocalDate(), dateMap); + res.ip = fromMap(mat.group(1), ipMap); + res.request = fromMap(mat.group(8), requestMap); + res.protocol = fromMap(mat.group(9), protocolMap); + if (DASH.equals(mat.group(11))) { + res.size = -1; + } else { + res.size = Integer.valueOf(mat.group(11)); + } + res.valid = true; + } + } catch (Throwable th) { + log.debug("Unmatchable line: '{}'.", line, th); + return new WebServerAccessLogLine(); + } + return res; + } + + private static <T> T fromMap(T val, Map<T, T> map) { + synchronized (map) { + map.putIfAbsent(Objects.requireNonNull(val), val); + return map.get(val); + } + } + + @Override + public boolean equals(Object other) { + if (other instanceof WebServerAccessLogLine) { + return this.toLogString() + .equals(((WebServerAccessLogLine)other).toLogString()); + } + return false; + } + + @Override + public int hashCode() { + return this.toLogString().hashCode(); + } + +} +