[or-cvs] [ernie/master] Extract stats about versions, platforms, and bandwidth from relay descriptors.

karsten at torproject.org karsten at torproject.org
Tue Mar 2 21:29:16 UTC 2010


Author: Karsten Loesing <karsten.loesing at gmx.net>
Date: Tue, 2 Mar 2010 22:27:50 +0100
Subject: Extract stats about versions, platforms, and bandwidth from relay descriptors.
Commit: 2b2c921db0597484af940ac219f33c3a9b64b1f0

---
 src/ArchiveReader.java                    |   14 +-
 src/CachedRelayDescriptorReader.java      |   11 +-
 src/Main.java                             |    8 +-
 src/RelayDescriptorDownloader.java        |    8 +-
 src/RelayDescriptorParser.java            |   52 +++-
 src/ServerDescriptorStatsFileHandler.java |  497 +++++++++++++++++++++++++++++
 6 files changed, 564 insertions(+), 26 deletions(-)
 create mode 100644 src/ServerDescriptorStatsFileHandler.java

diff --git a/src/ArchiveReader.java b/src/ArchiveReader.java
index c607f67..346d870 100644
--- a/src/ArchiveReader.java
+++ b/src/ArchiveReader.java
@@ -24,9 +24,17 @@ public class ArchiveReader {
         } else {
           if (rdp != null) {
             try {
-              BufferedReader br = new BufferedReader(new FileReader(pop));
-              rdp.parse(br);
-              br.close();
+              BufferedInputStream bis =
+                  new BufferedInputStream(new FileInputStream(pop));
+              ByteArrayOutputStream baos = new ByteArrayOutputStream();
+              int len;
+              byte[] data = new byte[1024];
+              while ((len = bis.read(data, 0, 1024)) >= 0) {
+                baos.write(data, 0, len);
+              }
+              bis.close();
+              byte[] allData = baos.toByteArray();
+              rdp.parse(allData);
             } catch (IOException e) {
               problems.add(pop);
               if (problems.size() > 3) {
diff --git a/src/CachedRelayDescriptorReader.java b/src/CachedRelayDescriptorReader.java
index 10e207d..4288abf 100644
--- a/src/CachedRelayDescriptorReader.java
+++ b/src/CachedRelayDescriptorReader.java
@@ -32,16 +32,12 @@ public class CachedRelayDescriptorReader {
           bis.close();
           byte[] allData = baos.toByteArray();
           if (f.getName().equals("cached-consensus")) {
-            BufferedReader br = new BufferedReader(new FileReader(f));
             if (aw != null) {
               aw.store(allData);
             }
-            br.close();
-            br = new BufferedReader(new FileReader(f));
             if (rdp != null) {
-              rdp.parse(br);
+              rdp.parse(allData);
             }
-            br.close();
           } else if (f.getName().startsWith("cached-descriptors") ||
               f.getName().startsWith("cached-extrainfo")) {
             String ascii = new String(allData, "US-ASCII");
@@ -76,10 +72,7 @@ public class CachedRelayDescriptorReader {
                 aw.store(descBytes);
               }
               if (rdp != null) {
-                BufferedReader storeBr = new BufferedReader(
-                    new StringReader(desc));
-                rdp.parse(storeBr);
-                storeBr.close();
+                rdp.parse(descBytes);
               }
             }
             logger.fine("Finished reading cacheddesc/ directory.");
diff --git a/src/Main.java b/src/Main.java
index 3c02697..12c91c4 100644
--- a/src/Main.java
+++ b/src/Main.java
@@ -35,12 +35,14 @@ public class Main {
         new BridgeStatsFileHandler(countries) : null;
     DirreqStatsFileHandler dsfh = config.getWriteDirreqStats() ?
         new DirreqStatsFileHandler(countries) : null;
+    ServerDescriptorStatsFileHandler sdsfh =
+        new ServerDescriptorStatsFileHandler();
 
     // Prepare relay descriptor parser (only if we are writing the
     // stats)
     RelayDescriptorParser rdp = config.getWriteConsensusStats() &&
         config.getWriteBridgeStats() && config.getWriteDirreqStats() ?
-        new RelayDescriptorParser(csfh, bsfh, dsfh, countries,
+        new RelayDescriptorParser(csfh, bsfh, dsfh, sdsfh, countries,
         directories) : null;
 
     // Prepare writing relay descriptor archive to disk
@@ -73,6 +75,10 @@ public class Main {
       dsfh.writeFile();
       dsfh = null;
     }
+    if (sdsfh != null) {
+      sdsfh.writeFiles();
+      sdsfh = null;
+    }
 
     // Prepare bridge descriptor parser
     BridgeDescriptorParser bdp = config.getWriteConsensusStats() &&
diff --git a/src/RelayDescriptorDownloader.java b/src/RelayDescriptorDownloader.java
index af6a33c..c9c850c 100644
--- a/src/RelayDescriptorDownloader.java
+++ b/src/RelayDescriptorDownloader.java
@@ -84,21 +84,15 @@ public class RelayDescriptorDownloader {
             }
             if (verified) {
               if (rdp != null) {
-                BufferedReader br = new BufferedReader(new StringReader(
-                    result));
-                rdp.parse(br);
-                br.close();
+                rdp.parse(allData);
               }
               if (aw != null) {
-                BufferedReader br = new BufferedReader(new StringReader(
-                    result));
                 try {
                   aw.store(allData);
                 } catch (Exception e) {
                   e.printStackTrace();
                   //TODO find better way to handle this
                 }
-                br.close();
               }
             }
           } else {
diff --git a/src/RelayDescriptorParser.java b/src/RelayDescriptorParser.java
index 0b1dccf..5be2f79 100644
--- a/src/RelayDescriptorParser.java
+++ b/src/RelayDescriptorParser.java
@@ -17,17 +17,20 @@ public class RelayDescriptorParser {
   private DirreqStatsFileHandler dsfh;
   private ConsensusStatsFileHandler csfh;
   private BridgeStatsFileHandler bsfh;
+  private ServerDescriptorStatsFileHandler sdsfh;
   private SortedSet<String> countries;
   private SortedSet<String> directories;
   private Logger logger;
   public RelayDescriptorParser(ConsensusStatsFileHandler csfh,
       BridgeStatsFileHandler bsfh, DirreqStatsFileHandler dsfh,
-      SortedSet<String> countries, SortedSet<String> directories) {
+      ServerDescriptorStatsFileHandler sdsfh, SortedSet<String> countries,
+      SortedSet<String> directories) {
     this.relayDescriptorParseHistoryFile = new File(
         "stats/relay-descriptor-parse-history");
     this.csfh = csfh;
     this.bsfh = bsfh;
     this.dsfh = dsfh;
+    this.sdsfh = sdsfh;
     this.countries = countries;
     this.directories = directories;
     this.logger = Logger.getLogger(RelayDescriptorParser.class.getName());
@@ -60,7 +63,9 @@ public class RelayDescriptorParser {
       }
     }
   }
-  public void parse(BufferedReader br) throws IOException {
+  public void parse(byte[] data) throws IOException {
+    BufferedReader br = new BufferedReader(new StringReader(new String(
+        data, "US-ASCII")));
     String line = br.readLine();
     if (line == null) {
       this.logger.warning("Parsing empty file?");
@@ -68,7 +73,8 @@ public class RelayDescriptorParser {
     }
     if (line.equals("network-status-version 3")) {
       int exit = 0, fast = 0, guard = 0, running = 0, stable = 0;
-      String validAfter = null;
+      String validAfter = null, rLine = null;
+      StringBuilder descriptorIdentities = new StringBuilder();
       while ((line = br.readLine()) != null) {
         if (line.startsWith("valid-after ")) {
           validAfter = line.substring("valid-after ".length());
@@ -83,6 +89,7 @@ public class RelayDescriptorParser {
           String hashedRelay = DigestUtils.shaHex(Base64.decodeBase64(
               line.split(" ")[2] + "=")).toUpperCase();
           this.bsfh.addHashedRelay(hashedRelay);
+          rLine = line;
         } else if (line.startsWith("s ")) {
           if (line.contains(" Running")) {
             exit += line.contains(" Exit") ? 1 : 0;
@@ -90,15 +97,48 @@ public class RelayDescriptorParser {
             guard += line.contains(" Guard") ? 1 : 0;
             stable += line.contains(" Stable") ? 1 : 0;
             running++;
+            descriptorIdentities.append("," + rLine.split(" ")[3]);
           }
         }
       }
       if (this.csfh != null) {
-        csfh.addConsensusResults(validAfter, exit, fast, guard, running,
-          stable);
+        this.csfh.addConsensusResults(validAfter, exit, fast, guard,
+          running, stable);
+      }
+      if (this.sdsfh != null) {
+        this.sdsfh.addConsensus(validAfter,
+            descriptorIdentities.toString().substring(1));
       }
     } else if (line.startsWith("router ")) {
-      // in case we want to parse server descriptors in the future
+      String platformLine = null, publishedLine = null,
+          bandwidthLine = null;
+      while ((line = br.readLine()) != null) {
+        if (line.startsWith("platform ")) {
+          platformLine = line;
+        } else if (line.startsWith("published ")) {
+          publishedLine = line;
+        } else if (line.startsWith("bandwidth ")) {
+          bandwidthLine = line;
+        }
+      }
+      String ascii = new String(data, "US-ASCII");
+      String startToken = "router ";
+      String sigToken = "\nrouter-signature\n";
+      int start = ascii.indexOf(startToken);
+      int sig = ascii.indexOf(sigToken) + sigToken.length();
+      if (start < 0 || sig < 0 || sig < start) {
+        this.logger.warning("Cannot determine descriptor digest! "
+            + "Skipping.");
+        return;
+      }
+      byte[] forDigest = new byte[sig - start];
+      System.arraycopy(data, start, forDigest, 0, sig - start);
+      String descriptorIdentity = Base64.encodeBase64String(
+          DigestUtils.sha(forDigest)).substring(0, 27);
+      if (this.sdsfh != null) {
+        this.sdsfh.addServerDescriptor(descriptorIdentity, platformLine,
+            publishedLine, bandwidthLine);
+      }
     } else if (line.startsWith("extra-info ") && this.dsfh != null &&
         directories.contains(line.split(" ")[2])) {
       String dir = line.split(" ")[2];
diff --git a/src/ServerDescriptorStatsFileHandler.java b/src/ServerDescriptorStatsFileHandler.java
new file mode 100644
index 0000000..37e71c3
--- /dev/null
+++ b/src/ServerDescriptorStatsFileHandler.java
@@ -0,0 +1,497 @@
+import java.io.*;
+import java.text.*;
+import java.util.*;
+import java.util.logging.*;
+
+  /**
+   * We work with two pieces of information: consensuses referencing N
+   * server descriptors together with relay flags (like Running), and
+   * server descriptors containing information about Tor versions,
+   * platforms, and advertised bandwidth. We want stats that combine
+   * information from consensuses and server descriptors. In database
+   * terms this is an n:m relation with n consensuses referencing m
+   * server descriptors. So, the straightforward way is to keep parse
+   * results in 2 tables and join them for extracting statistics.
+   * However, we don't want to use a database here. And even if we had
+   * a database, the table join would be too expensive to perform after
+   * adding new data every hour.
+   *
+   * The approach we take here is to de-normalize the data and write
+   * the join of consensuses and server descriptors into one file that
+   * is never kept in memory as a whole. This file has entries for
+   * every consensus line referencing a server descriptor and the
+   * information we want to use from the referenced server descriptor,
+   * if available. In addition to that, we need a smaller file containing
+   * unreferenced server descriptors that we were not yet able to write
+   * to the first file. By implementing the join operation manually,
+   * we can make use of the fact that descriptors are not referenced for
+   * longer than 24 hours.
+   *
+   * stats/version-stats:
+   * date,0.1.1,0.1.2,0.2.0,0.2.1,0.2.2,other
+   *
+   * stats/platform-stats:
+   * date,windows,sunos,openbsd,netbsd,linux,freebsd,dragonfly,darwin,other
+   *
+   * stats/bandwidth-stats:
+   * date,advbw
+   *
+   * read largefile and merge our data in; also generate stats
+   * datetime,descriptor,version,platform,advbw
+   * 320095,aZ7mNo3lkjf2li34hlkvjsdru2,0.2.1,Darwin,1024
+   *
+   * TODO future extension: remove lines from server-descriptor-stats-raw
+   * as soon as we have written a full day (all consensuses, all SDs).
+   */
+public class ServerDescriptorStatsFileHandler {
+
+  private File consensusesFile;
+  private File consensusesTempFile;
+  private File descriptorsFile;
+  private File descriptorsTempFile;
+  private File versionStatsFile;
+  private File platformStatsFile;
+  private File bandwidthStatsFile;
+
+  /**
+   * map key "valid-after", map value "valid-after,descid,descid,descid.."
+   */
+  private SortedMap<String, String> consensuses;
+
+  /**
+   * map key "published,descid"
+   * map value "published,descid,version,platform,bandwidth"
+   */
+  private SortedMap<String, String> descriptors;
+
+  /**
+   * map key "descid"
+   * map value "published,descid,version,platform,bandwidth"
+   */
+  private SortedMap<String, String> descById;
+
+  private Logger logger;
+
+  /**
+   * Initializes this class by setting up the stats and raw file paths
+   * and the empty in-memory maps. Note that we don't read in any file
+   * here, in particular not <code>stats/descriptors-raw</code>, because
+   * it can grow really big!
+   */
+  public ServerDescriptorStatsFileHandler() {
+
+    /* init files */
+    this.versionStatsFile = new File("stats/version-stats");
+    this.platformStatsFile = new File("stats/platform-stats");
+    this.bandwidthStatsFile = new File("stats/bandwidth-stats");
+    this.consensusesFile = new File("stats/consensuses-raw");
+    this.consensusesTempFile = new File("stats/consensuses-raw.temp");
+    this.descriptorsFile = new File("stats/descriptors-raw");
+    this.descriptorsTempFile = new File("stats/descriptors-raw.temp");
+
+    /* Initialize local data structures. */
+    this.consensuses = new TreeMap<String, String>();
+    this.descriptors = new TreeMap<String, String>();
+    this.descById = new TreeMap<String, String>();
+
+    /* Initialize logger. */
+    this.logger =
+        Logger.getLogger(ServerDescriptorStatsFileHandler.class.getName());
+    this.logger.fine("Initialized.");
+  }
+
+  /* Just add to data structure. We cannot check right now whether we
+   * already got it in an earlier run. The only thing we can check is
+   * whether we got this consensus before in this run. */
+  public void addConsensus(String validAfter,
+      String descriptorIdentities) {
+    // TODO should there be a modified flag, too?
+    if (!this.consensuses.containsKey(validAfter)) {
+      this.logger.finer("Adding");
+    } else {
+      this.logger.fine("We already learned about this consensus in this "
+          + "run. Overwriting.");
+    }
+    this.consensuses.put(validAfter, validAfter + ","
+        + descriptorIdentities);
+    
+    // force autosave if we have too much data; 240 cons ^= 10 days
+    if (this.consensuses.size() > 240) {
+      this.logger.fine("Autosave triggered by adding consensus: We have "
+          + this.consensuses.size() + " consensuses and " + this.descriptors.size()
+          + " descriptors in memory. Writing to disk now.");
+      this.writeFiles();
+    }
+  }
+
+  // version string is the 0.2.1.23 part of the platform string
+  // platform is platform string with all parts after { removed
+  // advbw is in kibibytes
+  public void addServerDescriptor(String descriptorIdentity,
+      String platformLine, String publishedLine, String bandwidthLine) {
+    // TODO should there be a modified flag, too?
+    String version = "", platform = "", published = "", advBw = "";
+    if (platformLine.contains(" Tor ")) {
+      version = platformLine.substring(platformLine.indexOf(" Tor ") + 5).
+        split(" ")[0];
+    }
+    if (platformLine.contains(" on ")) {
+      platform = platformLine.substring(platformLine.indexOf(" on ") + 4);
+      if (platform.contains("{")) {
+        platform = platform.substring(0, platform.indexOf("{")).trim();
+      }
+    }
+    published = publishedLine.substring("published ".length());
+    String[] bwParts = bandwidthLine.split(" ");
+    if (bwParts.length == 4) {
+      advBw = "" + (Math.min(Long.parseLong(bwParts[1]),
+          Long.parseLong(bwParts[3])) / 1024L);
+      // TODO can't trust input! verify
+    }
+    String key = published + "," + descriptorIdentity;
+    String line = key + "," + version + "," + platform + "," + advBw;
+    if (!this.descriptors.containsKey(key)) {
+      this.logger.finer("Adding");
+    } else {
+      this.logger.fine("We already learned about this server descriptor "
+          + "in this run. Overwriting.");
+    }
+    this.descriptors.put(key, line);
+    this.descById.put(descriptorIdentity, line);
+
+    // force autosave if we have too much data; 50K descs ^= 10 days in early 2010
+    if (this.descriptors.size() > 50000) {
+      this.logger.fine("Autosave triggered by adding descriptor: We have "
+          + this.consensuses.size() + " consensuses and " + this.descriptors.size()
+          + " descriptors in memory. Writing to disk now.");
+      this.writeFiles();
+    }
+  }
+
+  /**
+   * Writes the newly learned consensuses and server descriptors to disk
+   * and merges new findings about relay versions, platforms, and advertised
+   * bandwidth with existing stats files.
+   */
+  /* why is this so complex? because the data doesn't fit into memory and
+   * we want to avoid going through the file more than once (that is,
+   * once for reading and once for writing) if at all possible. */
+  public void writeFiles() {
+
+   // TODO use separate try blocks?
+   try {
+      /* Initialize readers and writers for the two files. We are going to
+       * write to temporary files, delete originals, and rename. */
+      BufferedReader consensusesReader = null;
+      if (this.consensusesFile.exists()) {
+        consensusesReader = new BufferedReader(new FileReader(
+            this.consensusesFile));
+      }
+      BufferedReader descriptorsReader = null;
+      if (this.descriptorsFile.exists()) {
+        descriptorsReader = new BufferedReader(new FileReader(
+          this.descriptorsFile));
+      }
+
+      this.consensusesTempFile.getParentFile().mkdirs();
+      BufferedWriter consensusesWriter = new BufferedWriter(new FileWriter(
+          this.consensusesTempFile));
+      BufferedWriter descriptorsWriter = new BufferedWriter(new FileWriter(
+          this.descriptorsTempFile));
+      BufferedWriter versionWriter = new BufferedWriter(new FileWriter(
+          this.versionStatsFile));
+      BufferedWriter platformWriter = new BufferedWriter(new FileWriter(
+          this.platformStatsFile));
+      BufferedWriter bandwidthWriter = new BufferedWriter(new FileWriter(
+          this.bandwidthStatsFile));
+
+      SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd");
+      dateFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
+      SimpleDateFormat dateTimeFormat =
+          new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+      dateTimeFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
+
+      String statsDate = null;
+      // TODO make these configurable
+      List<String> versionKeys = new ArrayList<String>(Arrays.asList(
+          "0.1.1,0.1.2,0.2.0,0.2.1,0.2.2".split(",")));
+      List<String> platformKeys = new ArrayList<String>(Arrays.asList(
+          "Windows,SunOS,OpenBSD,NetBSD,Linux,FreeBSD,DragonFly,Darwin".
+          split(",")));
+      versionWriter.write("date");
+      for (String v : versionKeys) {
+        versionWriter.write("," + v);
+      }
+      versionWriter.write(",other\n");
+      platformWriter.write("date");
+      for (String p : platformKeys) {
+        platformWriter.write("," + p.toLowerCase());
+      }
+      platformWriter.write(",other\n");
+      bandwidthWriter.write("date,advbw\n");
+
+      int[] versionStats = new int[versionKeys.size() + 1];
+      int[] platformStats = new int[platformKeys.size() + 1];
+      long bandwidthStats = 0L;
+      int consensusesAtThisDay = 0;
+
+      /* Always keep one line of the consensuses and descriptors file in
+       * memory. */
+      String consensusLine = consensusesReader != null ?
+          consensusesReader.readLine() : null;
+      String descriptorLine = descriptorsReader != null ?
+          descriptorsReader.readLine() : null;
+
+      /* Iterate over both the consensus file and the consensus strings
+       * that we have in memory at the same time. Whichever has an earlier
+       * valid-after time gets processed. */
+      while (consensusLine != null || !this.consensuses.isEmpty()) {
+
+        /* Find out which line we want to process now, memorize it for
+         * parsing below, advance the source from where we got the line,
+         * and write the line to disk. Afterwards, line contains
+         * the consensus line we want to parse in this iteration. */
+        String line = null; // TODO rename
+        if (consensusLine != null) {
+          if (!this.consensuses.isEmpty()) {
+            String fileVA = consensusLine.split(",")[0];
+            String memVA = this.consensuses.firstKey();
+            if (fileVA.equals(memVA)) {
+              this.logger.finer("We have a consensus line in memory that "
+                  + "we already knew before. Skipping.");
+              // TODO should we compare the two lines here?
+              consensusLine = consensusesReader.readLine();
+              continue; // TODO is this correct?
+            } else if (fileVA.compareTo(memVA) < 0) {
+              line = consensusLine; // TODO rename
+              consensusLine = consensusesReader.readLine();
+            } else {
+              line = this.consensuses.remove(memVA);
+            }
+          } else {
+            line = consensusLine;
+            consensusLine = consensusesReader.readLine();
+          }
+        } else {
+          line = this.consensuses.remove(this.consensuses.firstKey());
+        }
+        consensusesWriter.write(line + "\n");
+
+        /* Write all descriptors to disk that were published more than 24
+         * hours before this consensus. */
+        String minus24h = dateTimeFormat.format(new Date(
+            dateTimeFormat.parse(line.split(",")[0]).getTime() -
+            (24L * 60L * 60L * 1000L)));
+        while ((descriptorLine != null &&
+            descriptorLine.split(",")[0].compareTo(minus24h) < 0) ||
+            (!this.descriptors.isEmpty() &&
+            this.descriptors.firstKey().split(",")[0].
+              compareTo(minus24h) < 0)) {
+          if (descriptorLine != null) {
+            if (!this.descriptors.isEmpty()) {
+              String filePubl = descriptorLine.substring(0, 47);
+              // 47 chars: 19 for datetime, 1 for comma, 27 for descid
+              String memPubl = this.descriptors.firstKey();
+              if (filePubl.equals(memPubl)) {
+                this.logger.finer("same desc. skipping.");
+                descriptorLine = descriptorsReader.readLine();
+                continue; // TODO is this correct?
+              } else if (filePubl.compareTo(memPubl) < 0) {
+                descriptorsWriter.write(descriptorLine + "\n");
+                descriptorLine = descriptorsReader.readLine();
+              } else {
+                String removed = this.descriptors.remove(memPubl);
+                this.descById.remove(removed.split(",")[1]);
+                descriptorsWriter.write(removed + "\n");
+              }
+            } else {
+              descriptorsWriter.write(descriptorLine + "\n");
+              descriptorLine = descriptorsReader.readLine();
+            }
+          } else {
+            String removed = this.descriptors.remove(
+                this.descriptors.firstKey());
+            this.descById.remove(removed.split(",")[1]);
+            descriptorsWriter.write(removed + "\n");
+          }
+        }
+
+        /* Read in all descriptors that were published in the last 24
+         * hours before the consensus that we're just parsing. */
+        String validAfter = line.split(",")[0];
+        while (descriptorsReader != null && descriptorLine != null &&
+            descriptorLine.split(",")[0].compareTo(validAfter) < 0) {
+          this.descriptors.put(descriptorLine.substring(0, 47),
+              descriptorLine);
+          this.descById.put(descriptorLine.split(",")[1], descriptorLine);
+          descriptorLine = descriptorsReader.readLine();
+        }
+
+        /* Now we have a consensus line we want to parse and all possibly
+         * referenced descriptors in descById (rename). Let's write some
+         * stats. */
+        String consensusDate = line.substring(0, 10);
+        if (statsDate == null) {
+          statsDate = consensusDate;
+        }
+        if (!statsDate.equals(consensusDate)) {
+          /* If we have parsed at least half of the consensuses of a day,
+           * write stats to disk. */ // TODO document this somewhere
+          if (consensusesAtThisDay >= 12) {
+            versionWriter.write(statsDate);
+            for (int i = 0; i < versionStats.length; i++) {
+              versionWriter.write("," + (versionStats[i] /
+                  consensusesAtThisDay));
+            }
+            versionWriter.write("\n");
+            platformWriter.write(statsDate);
+            for (int i = 0; i < platformStats.length; i++) {
+              platformWriter.write("," + (platformStats[i] /
+                  consensusesAtThisDay));
+            }
+            platformWriter.write("\n");
+            bandwidthWriter.write(statsDate + ","
+                + (bandwidthStats / consensusesAtThisDay) + "\n");
+          } else {
+            this.logger.fine("Not enough consensuses to write to stats.");
+          }
+          versionStats = new int[versionKeys.size() + 1];
+          platformStats = new int[platformKeys.size() + 1];
+          bandwidthStats = 0L;
+          consensusesAtThisDay = 0;
+          // fill in NA's for missing dates
+          long writtenMillis = dateFormat.parse(statsDate).getTime();
+          long nextMillis = dateFormat.parse(consensusDate).getTime();
+          while (writtenMillis + (24L * 60L * 60L * 1000L) < nextMillis) {
+            writtenMillis += 24L * 60L * 60L * 1000L;
+            String date = dateFormat.format(new Date(writtenMillis));
+            versionWriter.write(date);
+            for (int i = 0; i < versionStats.length; i++) {
+              versionWriter.write(",NA");
+            }
+            versionWriter.write(",NA\n");
+            platformWriter.write(date);
+            for (int i = 0; i < platformStats.length; i++) {
+              platformWriter.write(",NA");
+            }
+            platformWriter.write(",NA\n");
+            bandwidthWriter.write(date + ",NA\n");
+          }
+          
+          statsDate = consensusDate;
+        }
+
+        /* Parse all descriptors that are referenced from this consensus.
+         * Only add values if we have 90+ % of all referenced descriptors!
+         * TODO document this somewhere! */
+        int[] versionStatsCons = new int[versionKeys.size() + 1];
+        int[] platformStatsCons = new int[platformKeys.size() + 1];
+        long bandwidthStatsCons = 0L;
+        String[] ids = line.split(",");
+        int seenDescs = 0;
+        for (int i = 1; i < ids.length; i++) {
+          if (this.descById.containsKey(ids[i])) {
+            seenDescs++;
+            String desc = this.descById.get(ids[i]);
+            String[] parts = desc.split(",");
+            String version = parts[2].substring(0,
+                parts[2].lastIndexOf("."));
+            if (versionKeys.contains(version)) {
+              versionStatsCons[versionKeys.indexOf(version)]++;
+            } else {
+              versionStatsCons[versionStatsCons.length - 1]++;
+            }
+            String platform = parts[3].toLowerCase();
+            boolean isOther = true;
+            // TODO document that order of platform strings in config
+            // matters! if there are two OS, "DragonFly" and "Dragon",
+            // put "DragonFly" first! capitalization doesn't matter, but
+            // is only relevant for stats file headers
+            for (String p : platformKeys) {
+              if (platform.contains(p.toLowerCase())) {
+                platformStatsCons[platformKeys.indexOf(p)]++;
+                isOther = false;
+                break;
+              }
+            }
+            if (isOther) {
+              platformStatsCons[platformStatsCons.length - 1]++;
+            }
+            bandwidthStatsCons += Long.parseLong(desc.substring(
+                desc.lastIndexOf(",") + 1));
+          }
+        }
+        if (10 * seenDescs / (ids.length - 1) >= 9) {
+          for (int i = 0; i < versionStatsCons.length; i++) {
+            versionStats[i] += versionStatsCons[i];
+          }
+          for (int i = 0; i < platformStatsCons.length; i++) {
+            platformStats[i] += platformStatsCons[i];
+          }
+          bandwidthStats += bandwidthStatsCons;
+          consensusesAtThisDay++;
+        } else {
+          this.logger.fine("not enough server descriptors for consensus, "
+              + "less than 90%. not including in stats.");
+        }
+
+        /* We're done reading one consensus. */
+      }
+
+      /* Write remaining server descriptors to disk. */
+      while (descriptorLine != null || !this.descriptors.isEmpty()) {
+        if (descriptorLine != null) {
+          if (!this.descriptors.isEmpty()) {
+            String filePubl = descriptorLine.substring(0, 47);
+            // 47 chars: 19 for datetime, 1 for comma, 27 for descid
+            String memPubl = this.descriptors.firstKey();
+            if (filePubl.equals(memPubl)) {
+              this.logger.finer("same desc. skipping.");
+              descriptorLine = descriptorsReader.readLine();
+              continue; // TODO is this correct?
+            } else if (filePubl.compareTo(memPubl) < 0) {
+              descriptorsWriter.write(descriptorLine + "\n");
+              descriptorLine = descriptorsReader.readLine();
+            } else {
+              descriptorsWriter.write(this.descriptors.remove(memPubl) + "\n");
+            }
+          } else {
+            descriptorsWriter.write(descriptorLine + "\n");
+            descriptorLine = descriptorsReader.readLine();
+          }
+        } else {
+          descriptorsWriter.write(this.descriptors.remove(this.descriptors.firstKey())
+              + "\n");
+        }
+      }
+      this.descById.clear();
+
+      /* Close the files that we read from and wrote to. */
+      if (consensusesReader != null) {
+        consensusesReader.close();
+      }
+      if (descriptorsReader != null) {
+        descriptorsReader.close();
+      }
+      consensusesWriter.close();
+      descriptorsWriter.close();
+      bandwidthWriter.close();
+      versionWriter.close();
+      platformWriter.close();
+      if (this.consensusesFile.exists()) {
+        this.consensusesFile.delete();
+      }
+      this.consensusesTempFile.renameTo(this.consensusesFile);
+      if (this.descriptorsFile.exists()) {
+        this.descriptorsFile.delete();
+      }
+      this.descriptorsTempFile.renameTo(this.descriptorsFile);
+
+      /* Done. Whee! */
+    } catch (Exception e) {
+      this.logger.log(Level.WARNING, "Exception while writing files.", e);
+    }
+    this.logger.fine("Finished writing.");
+  }
+}
+
-- 
1.6.5



More information about the tor-commits mailing list