commit c01dfbdc4e5d817fb10fb45f79d8223853cdeac0 Author: iwakeh iwakeh@torproject.org Date: Fri Feb 16 09:05:46 2018 +0000
Enable handling of larger (> 2 GB) log files.
Because log files compress very efficiently, the raw bytes of a log descriptor now contain the compressed log content.
Added a method for accessing the decompressed log content as a stream, as well as stream-based compression and decompression methods in class FileType. Adapted all tests to the changes.
Implements task-25329. --- .../org/torproject/descriptor/LogDescriptor.java | 26 ++++++++++++-- .../torproject/descriptor/internal/FileType.java | 18 ++++++++++ .../descriptor/log/LogDescriptorImpl.java | 25 ++++++++------ .../descriptor/log/WebServerAccessLogImpl.java | 12 +++---- .../descriptor/log/LogDescriptorTest.java | 40 +++++++++++----------- .../descriptor/log/WebServerModuleTest.java | 4 ++- 6 files changed, 82 insertions(+), 43 deletions(-)
diff --git a/src/main/java/org/torproject/descriptor/LogDescriptor.java b/src/main/java/org/torproject/descriptor/LogDescriptor.java index 6a6bf84..826fcda 100644 --- a/src/main/java/org/torproject/descriptor/LogDescriptor.java +++ b/src/main/java/org/torproject/descriptor/LogDescriptor.java @@ -3,21 +3,34 @@
package org.torproject.descriptor;
+import java.io.InputStream; import java.util.List;
/** * Contains a log file. * * <p>Unlike other descriptors, logs can get very large and are typically stored - * on disk in compressed form. However, all access to log contents through this - * interface and its subinterfaces is made available in uncompressed form.</p> + * on disk in compressed form. Access to log contents through this + * interface and its subinterfaces is made available in compressed and + * decompressed form:</p> + * <ul> + * <li>The raw descriptor bytes are compressed, because logs often contain + * redundant information that can achieve high compression rates. + * For example, a 500kB compressed log file might decompress to 3GB.</li> + * <li>The decompressed log contents can be accessed as a stream of bytes.</li> + * <li>A list of log lines (decompressed) can be retrieved.</li> + * </ul> + * * * @since 2.2.0 */ public interface LogDescriptor extends Descriptor {
/** - * Returns the decompressed raw descriptor bytes of the log. + * Returns the raw compressed descriptor bytes of the log. + * + * <p>To access the log's decompressed bytes, + * use method {@link #decompressedByteStream()}.</p> * * @since 2.2.0 */ @@ -25,6 +38,13 @@ public interface LogDescriptor extends Descriptor { public byte[] getRawDescriptorBytes();
/** + * Returns a stream over the decompressed raw descriptor bytes of the log. + * + * @since 2.2.0 + */ + public InputStream decompressedByteStream() throws DescriptorParseException; + + /** * Returns annotations found in the log file, which may be an empty List if a * log format does not support adding annotations. * diff --git a/src/main/java/org/torproject/descriptor/internal/FileType.java b/src/main/java/org/torproject/descriptor/internal/FileType.java index 353f0bb..2c07df6 100644 --- a/src/main/java/org/torproject/descriptor/internal/FileType.java +++ b/src/main/java/org/torproject/descriptor/internal/FileType.java @@ -93,6 +93,24 @@ public enum FileType { }
/** + * Wraps the given OutputStream; bytes written to the result get compressed. + * + * @since 2.2.0 + */ + public OutputStream compress(OutputStream os) throws Exception { + return this.outputStream(os); + } + + /** + * Wraps the given InputStream; bytes read from the result get decompressed. + * + * @since 2.2.0 + */ + public InputStream decompress(InputStream is) throws Exception { + return this.inputStream(is); + } + + /** * Decompresses the given bytes in memory and returns the decompressed bytes. * * @since 2.2.0 diff --git a/src/main/java/org/torproject/descriptor/log/LogDescriptorImpl.java b/src/main/java/org/torproject/descriptor/log/LogDescriptorImpl.java index 97854e4..3583d26 100644 --- a/src/main/java/org/torproject/descriptor/log/LogDescriptorImpl.java +++ b/src/main/java/org/torproject/descriptor/log/LogDescriptorImpl.java @@ -14,10 +14,10 @@ import org.slf4j.LoggerFactory; import java.io.BufferedReader; import java.io.ByteArrayInputStream; import java.io.File; +import java.io.InputStream; import java.io.InputStreamReader; import java.util.ArrayList; import java.util.Arrays; -import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.regex.Matcher; @@ -76,8 +76,7 @@ public abstract class LogDescriptorImpl this.fileType = FileType.findType(mat.group(1).toUpperCase()); if (FileType.PLAIN == this.fileType) { this.fileType = defaultCompression; - } else { - this.logBytes = this.fileType.decompress(this.logBytes); + this.logBytes = this.fileType.compress(this.logBytes); } } catch (Exception ex) { throw new DescriptorParseException("Cannot parse file " @@ -86,10 +85,19 @@ public abstract class LogDescriptorImpl }
@Override + public InputStream decompressedByteStream() throws DescriptorParseException { + try { + return this.fileType.decompress(new ByteArrayInputStream(this.logBytes)); + } catch (Exception ex) { + throw new DescriptorParseException("Cannot provide decompressed stream of " + + this.descriptorFile + ".", ex); + } + } + + @Override public void validate() throws DescriptorParseException { - try (BufferedReader br - = new BufferedReader(new InputStreamReader(new ByteArrayInputStream( - this.logBytes)))) { + try (BufferedReader br = new BufferedReader( + new InputStreamReader(this.decompressedByteStream()))) { this.unrecognizedLines.addAll(br.lines().parallel().filter((line) -> null != line && !line.isEmpty() && !validator.validate(line)) .limit(unrecognizedLinesLimit).collect(Collectors.toList())); @@ -114,11 +122,6 @@ public abstract class LogDescriptorImpl } }
- public static byte[] collectionToBytes(Collection<String> lines) { - return lines.stream().collect(Collectors.joining("\n", "", "\n")) - .getBytes(); - } - @Override public void setValidator(Validator validator) { this.validator = validator; diff --git a/src/main/java/org/torproject/descriptor/log/WebServerAccessLogImpl.java b/src/main/java/org/torproject/descriptor/log/WebServerAccessLogImpl.java index 7b56528..e48a262 100644 --- a/src/main/java/org/torproject/descriptor/log/WebServerAccessLogImpl.java +++ b/src/main/java/org/torproject/descriptor/log/WebServerAccessLogImpl.java @@ -11,12 +11,10 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory;
import java.io.BufferedReader; -import java.io.ByteArrayInputStream; import java.io.File; import java.io.InputStreamReader; import java.time.LocalDate; import java.time.format.DateTimeFormatter; -import java.util.Collection; import java.util.List; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -72,10 +70,9 @@ public class WebServerAccessLogImpl extends LogDescriptorImpl }
/** For internal use only. */ - public WebServerAccessLogImpl(Collection<String> lines, String filename, + public WebServerAccessLogImpl(byte[] bytes, String filename, boolean validate) throws DescriptorParseException { - this(LogDescriptorImpl.collectionToBytes(lines), new File(filename), - FileType.XZ, validate); + this(bytes, new File(filename), FileType.XZ, validate); }
private WebServerAccessLogImpl(byte[] logBytes, File file, @@ -135,9 +132,8 @@ public class WebServerAccessLogImpl extends LogDescriptorImpl @Override public List<WebServerAccessLog.Line> logLines() throws DescriptorParseException { - try (BufferedReader br - = new BufferedReader(new InputStreamReader(new ByteArrayInputStream( - this.getRawDescriptorBytes())))) { + try (BufferedReader br = new BufferedReader(new InputStreamReader( + this.decompressedByteStream()))) { return br.lines().map(line -> (WebServerAccessLog.Line) WebServerAccessLogLine.makeLine(line)) .filter(line -> line.isValid()).collect(Collectors.toList()); diff --git a/src/test/java/org/torproject/descriptor/log/LogDescriptorTest.java b/src/test/java/org/torproject/descriptor/log/LogDescriptorTest.java index a871791..67ba638 100644 --- a/src/test/java/org/torproject/descriptor/log/LogDescriptorTest.java +++ b/src/test/java/org/torproject/descriptor/log/LogDescriptorTest.java @@ -21,6 +21,7 @@ import org.junit.Test; import org.junit.rules.TemporaryFolder; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameter; import org.junit.runners.Parameterized.Parameters;
import java.io.File; @@ -46,46 +47,45 @@ public class LogDescriptorTest { protected DescriptorReader reader = DescriptorSourceFactory.createDescriptorReader();
- protected int size; - protected String[] pan; - protected Class<LogDescriptor> type; - protected boolean isDecompressionTest; - protected int lineCount; + @Parameter(0) + public boolean isDecompressedLog; + + @Parameter(1) + public int size; + + @Parameter(2) + public String[] pan; + + @Parameter(3) + public Class<LogDescriptor> type; + + @Parameter(4) + public int lineCount;
/** All types of data that can be encountered during sync. */ @Parameters public static Collection<Object[]> pathAndName() { return Arrays.asList(new Object[][] { - {Boolean.TRUE, 1878, new String[]{"meronense.torproject.org", + {Boolean.FALSE, 1878, new String[]{"meronense.torproject.org", "metrics.torproject.org_meronense.torproject.org_access.log" + "_20170530.gz", "metrics.torproject.org", "20170530", "gz"}, WebServerAccessLog.class, 24}, - {Boolean.FALSE, 1878, new String[]{"meronense.torproject.org", + {Boolean.TRUE, 1878, new String[]{"meronense.torproject.org", "xy.host.org_meronense.torproject.org_access.log_20170530.log", "metrics.torproject.org", "20170530", "xz"}, WebServerAccessLog.class, 24}, - {Boolean.TRUE, 70730, new String[]{"archeotrichon.torproject.org", + {Boolean.FALSE, 70730, new String[]{"archeotrichon.torproject.org", "archive.torproject.org_archeotrichon.torproject.org_access.log_" + "20151007.xz", "archive.torproject.org", "20151007", "xz"}, WebServerAccessLog.class, 655}, - {Boolean.TRUE, 0, new String[]{"dummy.host.net", + {Boolean.FALSE, 0, new String[]{"dummy.host.net", "nix.server.org_dummy.host.net_access.log_20111111.bz2", "nix.server.org", "20111111", "bz2"}, WebServerAccessLog.class, 0}}); }
- /** This constructor receives the above defined data for each run. */ - public LogDescriptorTest(boolean decompression, int size, String[] pan, - Class<LogDescriptor> type, int lineCount) { - this.pan = pan; - this.size = size; - this.type = type; - this.isDecompressionTest = decompression; - this.lineCount = lineCount; - } - /** Prepares the temporary folder and writes files to it for this test. */ private void createTemporaryFolderAndContents() throws IOException { this.indir = this.temp.newFolder(); @@ -157,7 +157,7 @@ public class LogDescriptorTest {
@Test public void testCompressionInvalid() throws Exception { - if (!isDecompressionTest) { + if (isDecompressedLog) { return; } assertEquals(1, this.reader.getParsedFiles().size()); diff --git a/src/test/java/org/torproject/descriptor/log/WebServerModuleTest.java b/src/test/java/org/torproject/descriptor/log/WebServerModuleTest.java index a11bc30..66c8f54 100644 --- a/src/test/java/org/torproject/descriptor/log/WebServerModuleTest.java +++ b/src/test/java/org/torproject/descriptor/log/WebServerModuleTest.java @@ -6,6 +6,7 @@ package org.torproject.descriptor.log; import static org.junit.Assert.assertEquals;
import org.torproject.descriptor.DescriptorParseException; +import org.torproject.descriptor.internal.FileType;
import org.hamcrest.Matchers;
@@ -104,7 +105,8 @@ public class WebServerModuleTest { WebServerAccessLogImpl wsal = new WebServerAccessLogImpl(logText.getBytes(), new File("vhost_host7_access.log_20170530")); assertEquals(wsal.getAnnotations().size(), 0); - assertEquals(logText, new String(wsal.getRawDescriptorBytes())); + assertEquals(logText, + new String(FileType.XZ.decompress(wsal.getRawDescriptorBytes()))); assertEquals("host7", wsal.getPhysicalHost()); assertEquals("vhost", wsal.getVirtualHost()); }