commit 38ce31132324da0a2e1c6529597bb245632ae8d3 Author: Karsten Loesing karsten.loesing@gmx.net Date: Fri Dec 30 17:01:07 2011 +0100
Implement the new sanitizing code. --- src/org/torproject/webstats/Main.java | 448 +++++++++++++++++++++++---------- 1 files changed, 310 insertions(+), 138 deletions(-)
diff --git a/src/org/torproject/webstats/Main.java b/src/org/torproject/webstats/Main.java index 628e4a3..1812567 100644 --- a/src/org/torproject/webstats/Main.java +++ b/src/org/torproject/webstats/Main.java @@ -12,8 +12,6 @@ import org.apache.commons.compress.compressors.gzip.*; * * TODO Document what exactly is sanitized and how that is done. * - * TODO Implement the following description. - * * The main operation is to parse Apache web log files from the in/ * directory and write sanitized web log files to the out/ directory. * Files in the in/ directory are assumed to never change and will be @@ -29,8 +27,9 @@ import org.apache.commons.compress.compressors.gzip.*; * files in the in/ directory. * - state/in-history.new is the file written in the current execution * that will replace state/in-history during the execution. - * - state/execution/ contains new or updated output files parsed in the - * current execution. + * - state/temp/ contains new or updated output files parsed in the + * current execution that are moved to out/, state/full/, or state/diff/ + * during the execution. * - state/out-history contains file names of previously written and * possibly deleted files in the out/ directory. * - state/out-history.new is the file written in the current execution @@ -44,31 +43,32 @@ import org.apache.commons.compress.compressors.gzip.*; * 1. Check that state/lock does not exists, or exit immediately. Add a * new state/lock file. * 2. Read the contents from state/in-history and state/out-history and - * the directory listings of out/, state/diff/, and state/update/ to - * memory. + * the directory listings of in/ to memory. * 3. For each file in in/: * a. Append the file name to state/in-history.new. * b. Check that the file name is not contained in state/in-history. * If it is, print out a warning and skip the file. - * c. Parse the file in chunks of 250,000 lines to reduce writes. + * c. Parse and sanitize the file in chunks of 250,000 lines to reduce + * writes. * d. When writing sanitized chunks to output files, for each output * file, check in the following order if there is already such a * file in - * i. state/execution/, + * i. state/temp/, * ii. state/full/, * iii. out/, or * iv. state/diff/. - * If there's such a file, merge the newly sanitized lines with - * that file and write the sorted result state/execution/. + * If there's such a file, merge the newly sanitized lines into + * that file and write the sorted result to state/temp/. * 4. Rename state/in-history to state/in-history.old and rename * state/in-history.new to state/in-history. Delete * state/in-history.old. * 5. Delete files in in/ that have been parsed in this execution. - * 6. For each file in state/execution/: + * 6. For each file in state/temp/: * a. Check if there's a corresponding line in state/out-history. If * so, check whether there is a file in state/full/ or out/. If * so, move the file to state/full/. Otherwise move the file to - * state/diff/, overwriting the file there if one exists. + * state/diff/. Print out a warning that there is a more recent + * file available. * b. If a. does not apply and the sanitized log is less than four (4) * days old, move the file to state/full/. * c. If b. does not apply, append a line to out-history.new and move @@ -82,26 +82,26 @@ import org.apache.commons.compress.compressors.gzip.*; * requires an operator to fix the state/ directory and make it work * again. IMPORTANT: DO NOT CHANGE ANYTHING IN THE state/ DIRECTORY * UNLESS YOU'RE CERTAIN WHAT YOU'RE DOING! The following situations can - * happen. It may make sense to try a solution in a non-productive - * setting first: + * happen. It may make sense to try a solution in a test environment + * first: * A. The file state/in-history.new does not exist and there are no files - * in state/execution/. The process died before step 3. Delete - * state/lock and re-run the program. + * in state/temp/. The process died before step 3. Delete state/lock + * and re-run the program. * B. The file state/in-history.new exists and there are files in - * state/execution/. The process died during steps 3 or 4. Delete - * all files in state/execution/. If state/in-history does not exist, - * but state/in-history.old does exist, rename the latter to the - * former. Delete state/lock and re-run the program. + * state/temp/. The process died during steps 3 or 4. Delete all + * files in state/temp/. If state/in-history does not exist but + * state/in-history.old does exist, rename the latter to the former. + * Delete state/lock and re-run the program. * C. The file state/in-history.new does not exist, but there are files - * in state/execution/. The process died after step 4. Run the steps - * 5 to 8 manually. Then re-run the program. + * in state/temp/. The process died after step 4. Run the steps 5 to + * 8 manually. Then re-run the program. * * Whenever logs are parsed that are 4 days old or older, there may * already be output files in out/ that cannot be modified anymore. The * operator may decide to manually overwrite files in out/ with the files * in state/full/ or state/diff/. IMPORTANT: ONLY OVERWRITE FILES IN out/ - * IF YOU'RE CERTAIN HOW TO FIX THE PROGRAM THAT PARSES ITS FILES. There - * are two possible situations: + * IF YOU'RE CERTAIN HOW TO FIX THE PROGRAM THAT PARSES THESE FILES. + * There are two possible situations: * A. There is a file in state/full/. This file is newer than the file * with the same name in out/ and contains everything from that file, * too. It's okay to overwrite the file in out/ with the file in @@ -109,98 +109,184 @@ import org.apache.commons.compress.compressors.gzip.*; * B. There is a file in state/diff/. The file in out/ didn't exist * anymore when parsing more log lines for it. The file that was in * out/ should be located and merged with the file in state/diff/. - * Afterwards, the file in state/diff/ should be deleted. + * Afterwards, the file in state/diff/ must be deleted. */ public class Main { - private static File historyFile = new File("hist"); - private static File inputDirectory = new File("in"); - private static File outputDirectory = new File("out"); - private static File tempDirectory = new File("temp"); + + /* Run the steps described above. */ public static void main(String[] args) { - readParseHistory(); - readInputFileList(); - parseInputFiles(); + checkAndCreateLockFile(); /* Step 1 */ + readHistoryFiles(); /* Step 2 */ + readInDirectoryListing(); + for (File inFile : inFiles) { /* Step 3 */ + appendToInHistory(inFile); + if (!checkFileName(inFile) || checkParsedBefore(inFile)) { + continue; + } + sanitizeInFile(inFile); + } + overwriteInHistoryFile(); /* Step 4 */ + deleteParsedInFiles(); /* Step 5 */ + for (String outputFileName : updatedOutputFiles) { /* Step 6 */ + moveOutputFile(outputFileName); + } + overwriteOutHistoryFile(); /* Step 7 */ + deleteLockFile(); /* Step 8 */ + } + + /* Define file and directory names. */ + private static File inDirectory = new File("in"); + private static File outDirectory = new File("out"); + private static File tempDirectory = new File("temp"); + private static File stateLockFile = new File("state/lock"); + private static File stateInHistoryFile = new File("state/in-history"); + private static File stateInHistoryNewFile = + new File("state/in-history.new"); + private static File stateInHistoryOldFile = + new File("state/in-history.old"); + private static File stateOutHistoryFile = new File("state/out-history"); + private static File stateOutHistoryNewFile = + new File("state/out-history.new"); + private static File stateOutHistoryOldFile = + new File("state/out-history.old"); + private static File stateDiffDirectory = new File("state/diff"); + private static File stateFullDirectory = new File("state/full"); + private static File stateTempDirectory = new File("state/temp"); + + /* Define data structures and helper classes. */ + private static Set<String> inHistoryFiles; + private static Set<String> inHistoryNewFiles; + private static Set<String> outHistoryFiles; + private static Set<File> inFiles; + private static Map<String, List<String>> cachedLinesPerOutputFile = + new HashMap<String, List<String>>(); + private static int cachedLines = 0; + private static Set<String> updatedOutputFiles = new HashSet<String>(); + private static SimpleDateFormat outputFileFormat = + new SimpleDateFormat("yyyy/MM/dd/"); + private static SimpleDateFormat logDateFormat = + new SimpleDateFormat("dd/MMM/yyyy"); + static { + outputFileFormat.setTimeZone(TimeZone.getTimeZone("UTC")); + logDateFormat.setTimeZone(TimeZone.getTimeZone("UTC")); } - private static Set<File> parsedFiles; - private static void readParseHistory() { - parsedFiles = new HashSet<File>(); - if (historyFile.exists()) { - try { + private static Pattern logLinePattern = Pattern.compile( + "(0.0.0.[01]) \S+ \S+ \[([\w/]{11})[\d:]+\s[+\-]\d{4}\] " + + ""GET (\S+?) (HTTP/[\d\.]+)" (\d{3}) ([\d-]+) "[^"]+" " + + ""[^"]+""); + private static long now = System.currentTimeMillis(); + + /* Implement the substeps. */ + private static void checkAndCreateLockFile() { + if (stateLockFile.exists()) { + System.err.println("Lock file '" + stateLockFile.getAbsolutePath() + + "' exists. This means that a previous run did not exit " + + "cleanly. Exiting."); + System.exit(1); + } + try { + stateLockFile.getParentFile().mkdirs(); + BufferedWriter bw = new BufferedWriter(new FileWriter( + stateLockFile)); + bw.close(); + } catch (IOException e) { + e.printStackTrace(); + System.err.println("Could not create lock file '" + + stateLockFile.getAbsolutePath() + "'. Exiting."); + System.exit(1); + } + } + private static void readHistoryFiles() { + inHistoryFiles = readAndCopyHistoryFile(stateInHistoryFile, + stateInHistoryNewFile); + inHistoryNewFiles = new HashSet<String>(inHistoryFiles); + outHistoryFiles = readAndCopyHistoryFile(stateOutHistoryFile, + stateOutHistoryNewFile); + } + private static Set<String> readAndCopyHistoryFile(File historyFile, + File historyNewFile) { + Set<String> result = new HashSet<String>(); + try { + BufferedWriter bw = new BufferedWriter(new FileWriter( + historyNewFile)); + if (historyFile.exists()) { BufferedReader br = new BufferedReader(new FileReader( historyFile)); String line; while ((line = br.readLine()) != null) { - parsedFiles.add(new File(line)); + result.add(line); + bw.write(line + "\n"); } br.close(); - } catch (IOException e) { - System.err.println("Could not read parse history file '" - + historyFile.getAbsolutePath() + "'. Exiting."); - System.exit(1); } + bw.close(); + } catch (IOException e) { + e.printStackTrace(); + System.err.println("Could not read parse history file '" + + historyFile.getAbsolutePath() + "'. Exiting."); + System.exit(1); } + return result; } - private static List<File> inputFiles = new ArrayList<File>(); - private static void readInputFileList() { - Stack<File> files = new Stack<File>(); - files.add(inputDirectory); - while (!files.isEmpty()) { - File file = files.pop(); - if (file.isDirectory()) { - files.addAll(Arrays.asList(file.listFiles())); - } else if (file.getName().contains("access.log") && - file.getName().endsWith(".gz")) { - inputFiles.add(file); - } - } + private static void readInDirectoryListing() { + inFiles = readDirectoryListing(inDirectory); } - private static void parseInputFiles() { - for (File inputFile : inputFiles) { - if (parsedFiles.contains(inputFile)) { - System.err.println("Parsed log file '" - + inputFile.getAbsolutePath() + "' before, either completely " - + "or partially. Not parsing it again, because that might " - + "mean adding duplicate log lines. Skipping file."); - continue; + private static Set<File> readDirectoryListing(File directory) { + Set<File> result = new HashSet<File>(); + if (directory.exists()) { + Stack<File> files = new Stack<File>(); + files.add(directory); + while (!files.isEmpty()) { + File file = files.pop(); + if (file.isDirectory()) { + files.addAll(Arrays.asList(file.listFiles())); + } else { + result.add(file); + } } - addToParseHistory(inputFile); - parseFile(inputFile); - deleteFile(inputFile); } + return result; + } + private static void appendToInHistory(File inFile) { + inHistoryNewFiles.add(inFile.getAbsolutePath()); + String line = inFile.getAbsolutePath(); + appendToHistoryFile(stateInHistoryNewFile, line); } - private static void addToParseHistory(File inputFile) { - parsedFiles.add(inputFile); - String line = inputFile.getAbsolutePath() + "\n"; + private static void appendToHistoryFile(File historyFile, String line) { try { - BufferedWriter bw = new BufferedWriter(new FileWriter( - historyFile, true)); + BufferedWriter bw = new BufferedWriter(new FileWriter(historyFile, + true)); bw.write(line + "\n"); bw.close(); } catch (IOException e) { + e.printStackTrace(); System.err.println("Could not append line '" + line + "' to parse " + "history file '" + historyFile.getAbsolutePath() + "'. " + "Exiting."); System.exit(1); } } - private static Map<String, List<String>> cachedLinesPerOutputFile = - new HashMap<String, List<String>>(); - private static int cachedLines = 0; - private static SimpleDateFormat outputFileFormat = - new SimpleDateFormat("yyyy/MM/dd/"); - private static SimpleDateFormat dateFormat = - new SimpleDateFormat("dd/MMM/yyyy"); - static { - outputFileFormat.setTimeZone(TimeZone.getTimeZone("UTC")); - dateFormat.setTimeZone(TimeZone.getTimeZone("UTC")); + private static boolean checkFileName(File inFile) { + return inFile.getName().contains("access.log") && + inFile.getName().endsWith(".gz"); } - private static void parseFile(File inputFile) { + private static boolean checkParsedBefore(File inFile) { + if (inHistoryFiles.contains(inFile.getAbsolutePath())) { + System.err.println("Parsed and subsequently deleted input file '" + + inFile.getAbsolutePath() + "' before. It shouldn't be " + + "there again. Skipping it now and not deleting it later."); + return true; + } else { + return false; + } + } + private static void sanitizeInFile(File inFile) { try { BufferedReader br = new BufferedReader(new InputStreamReader( - new GzipCompressorInputStream(new FileInputStream(inputFile)))); + new GzipCompressorInputStream(new FileInputStream(inFile)))); String line = null; - String outputFilenamePart = inputFile.getName().substring(0, - inputFile.getName().indexOf("access.log")) + "access.log"; + String outputFilenamePart = inFile.getName().substring(0, + inFile.getName().indexOf("access.log")) + "access.log"; while (true) { line = br.readLine(); if (line == null || cachedLines > 250000) { @@ -215,8 +301,10 @@ public class Main { } br.close(); } catch (IOException e) { + e.printStackTrace(); System.out.println("Error while parsing log file '" - + inputFile.getAbsolutePath() + "'. Skipping."); + + inFile.getAbsolutePath() + "'. Exiting."); + System.exit(1); } } private static void writeCachedLines() { @@ -225,67 +313,86 @@ public class Main { String outputFilename = e.getKey(); List<String> cachedLinesList = e.getValue(); Collections.sort(cachedLinesList); - writeOutputFile(outputFilename, cachedLinesList); + storeOutputFile(outputFilename, cachedLinesList); } cachedLinesPerOutputFile.clear(); cachedLines = 0; } - private static void writeOutputFile(String outputFilename, + private static void storeOutputFile(String outputFileName, List<String> cachedLinesList) { - File outputFile = new File(outputDirectory, outputFilename); - if (!outputFile.exists()) { - try { - outputFile.getParentFile().mkdirs(); - BufferedWriter bw = new BufferedWriter( - new FileWriter(outputFile)); - for (String cachedLine : cachedLinesList) { - bw.write(cachedLine + "\n"); - } - bw.close(); - } catch (IOException e) { - System.err.println("Could not write output file '" - + outputFile.getAbsolutePath() + "'. Exiting."); - System.exit(1); - } + updatedOutputFiles.add(outputFileName); + File stateTempFile = new File(stateTempDirectory, outputFileName); + File stateFullFile = new File(stateFullDirectory, outputFileName); + File outFile = new File(outDirectory, outputFileName); + File stateDiffFile = new File(stateDiffDirectory, outputFileName); + if (stateTempFile.exists()) { + File stateTempOldFile = new File(stateTempDirectory, + outputFileName + ".old"); + stateTempFile.renameTo(stateTempOldFile); + mergeOutputFile(stateTempOldFile, cachedLinesList, stateTempFile); + stateTempOldFile.delete(); + } else if (stateFullFile.exists()) { + mergeOutputFile(stateFullFile, cachedLinesList, stateTempFile); + } else if (outFile.exists()) { + mergeOutputFile(outFile, cachedLinesList, stateTempFile); + } else if (stateDiffFile.exists()) { + mergeOutputFile(stateDiffFile, cachedLinesList, stateTempFile); } else { - try { - BufferedReader br = new BufferedReader( - new FileReader(outputFile)); - String line; - File tempFile = new File(tempDirectory, outputFilename); - tempFile.getParentFile().mkdirs(); - BufferedWriter bw = new BufferedWriter( - new FileWriter(tempFile)); - int cachedLinesListPosition = 0, - totalCachedLines = cachedLinesList.size(); - while ((line = br.readLine()) != null) { - while (cachedLinesListPosition < totalCachedLines && - cachedLinesList.get(cachedLinesListPosition). - compareTo(line) <= 0) { - bw.write(cachedLinesList.get( - cachedLinesListPosition) + "\n"); - cachedLinesListPosition++; - } - bw.write(line + "\n"); + writeNewOutputFile(cachedLinesList, stateTempFile); + } + } + private static void mergeOutputFile(File oldOutputFile, + List<String> cachedLinesList, File newOutputFile) { + try { + BufferedReader br = new BufferedReader( + new FileReader(oldOutputFile)); + String line; + newOutputFile.getParentFile().mkdirs(); + BufferedWriter bw = new BufferedWriter( + new FileWriter(newOutputFile)); + int cachedLinesListPosition = 0, + totalCachedLines = cachedLinesList.size(); + while ((line = br.readLine()) != null) { + while (cachedLinesListPosition < totalCachedLines && + cachedLinesList.get(cachedLinesListPosition). + compareTo(line) <= 0) { + bw.write(cachedLinesList.get( + cachedLinesListPosition) + "\n"); + cachedLinesListPosition++; } - br.close(); - bw.close(); - outputFile.delete(); - tempFile.renameTo(outputFile); - } catch (IOException e) { - System.err.println("Could not updated output file '" - + outputFile.getAbsolutePath() + "'. Exiting."); - System.exit(1); + bw.write(line + "\n"); } + br.close(); + bw.close(); + } catch (IOException e) { + e.printStackTrace(); + System.err.println("Could not merge old output file '" + + oldOutputFile.getAbsolutePath() + "' with new log lines and " + + "write it to '" + newOutputFile.getAbsolutePath() + + "'. Exiting."); + System.exit(1); + } + } + private static void writeNewOutputFile(List<String> cachedLinesList, + File outputFile) { + try { + outputFile.getParentFile().mkdirs(); + BufferedWriter bw = new BufferedWriter( + new FileWriter(outputFile)); + for (String cachedLine : cachedLinesList) { + bw.write(cachedLine + "\n"); + } + bw.close(); + } catch (IOException e) { + e.printStackTrace(); + System.err.println("Could not write output file '" + + outputFile.getAbsolutePath() + "'. Exiting."); + System.exit(1); } } - private static Pattern pattern = Pattern.compile( - "(0.0.0.[01]) \S+ \S+ \[([\w/]{11})[\d:]+\s[+\-]\d{4}\] " - + ""GET (\S+?) (HTTP/[\d\.]+)" (\d{3}) ([\d-]+) "[^"]+" " - + ""[^"]+""); private static boolean parseLine(String line, String outputFilenamePart) { - Matcher matcher = pattern.matcher(line); + Matcher matcher = logLinePattern.matcher(line); if (!matcher.matches()) { return true; } @@ -306,7 +413,7 @@ public class Main { + "" " + statusCode + " " + returnedBytes + " "-" "-""; String outputFilename = null; try { - outputFilename = outputFileFormat.format(dateFormat.parse( + outputFilename = outputFileFormat.format(logDateFormat.parse( date).getTime()) + outputFilenamePart; } catch (ParseException e) { System.out.println("Error parsing date. Aborting to parse " @@ -322,8 +429,73 @@ public class Main { cachedLines++; return true; } - private static void deleteFile(File inputFile) { - inputFile.delete(); + private static void overwriteInHistoryFile() { + stateInHistoryFile.renameTo(stateInHistoryOldFile); + stateInHistoryNewFile.renameTo(stateInHistoryFile); + stateInHistoryOldFile.delete(); + } + private static void deleteParsedInFiles() { + Set<String> filesToDelete = new HashSet<String>(); + filesToDelete.addAll(inHistoryNewFiles); + filesToDelete.removeAll(inHistoryFiles); + for (String file : filesToDelete) { + new File(file).delete(); + } + } + private static void moveOutputFile(String outputFileName) { + File outFile = new File(outDirectory, outputFileName); + File stateTempFile = new File(stateTempDirectory, outputFileName); + File stateFullFile = new File(stateFullDirectory, outputFileName); + File stateDiffFile = new File(stateDiffDirectory, outputFileName); + long ageInDays = -1L; + try { + ageInDays = (now + - outputFileFormat.parse(outputFileName.substring(0, + outputFileName.lastIndexOf("/") + 1)).getTime()) + / (24L * 60L * 60L * 1000L); + } catch (ParseException e) { + e.printStackTrace(); + System.err.println("Could not parse timestamp from '" + + outputFileName + "'. Exiting."); + System.exit(1); + } + if (outHistoryFiles.contains(outFile.getAbsolutePath())) { + if (outFile.exists() || stateFullFile.exists()) { + System.out.println("Could not write to output file '" + + outFile.getAbsolutePath() + "', because that file was " + + "written " + (outFile.exists() ? "" : "and deleted ") + + "before and we're not supposed to change it anymore. The " + + "updated file that could replace the output file is '" + + stateFullFile.getAbsolutePath() + "'."); + stateFullFile.getParentFile().mkdirs(); + stateTempFile.renameTo(stateFullFile); + } else { + System.out.println("Could not write to output file '" + + outFile.getAbsolutePath() + "', because that file was " + + "written and deleted before and we're not supposed to " + + "change it anymore (even if we could). The file " + + "containing the new lines only is '" + + stateDiffFile.getAbsolutePath() + "'."); + stateDiffFile.getParentFile().mkdirs(); + stateTempFile.renameTo(stateDiffFile); + } + } else if (ageInDays < 4L) { + stateFullFile.getParentFile().mkdirs(); + stateTempFile.renameTo(stateFullFile); + } else { + outFile.getParentFile().mkdirs(); + String line = outFile.getAbsolutePath(); + appendToHistoryFile(stateOutHistoryNewFile, line); + stateTempFile.renameTo(outFile); + } + } + private static void overwriteOutHistoryFile() { + stateOutHistoryFile.renameTo(stateOutHistoryOldFile); + stateOutHistoryNewFile.renameTo(stateOutHistoryFile); + stateOutHistoryOldFile.delete(); + } + private static void deleteLockFile() { + stateLockFile.delete(); } }