commit 09cfdfdff4efc1aa1cc60f53f7f1353a6193e6ad
Author: Karsten Loesing <karsten.loesing(a)gmx.net>
Date: Mon Nov 12 19:50:46 2018 +0100
Remove advbw column from bandwidth.csv.
Instead use advbw data from ipv6servers module.
As a result, we can stop aggregating advertised bandwidths in the
legacy module.
Required schema changes to live tordir databases:
DROP VIEW stats_bandwidth;
CREATE VIEW stats_bandwidth [...]
CREATE OR REPLACE FUNCTION refresh_all() [...]
DROP FUNCTION refresh_bandwidth_flags();
DROP FUNCTION refresh_relay_statuses_per_day();
DROP TABLE relay_statuses_per_day;
DROP TABLE bandwidth_flags;
DROP TABLE consensus;
DROP FUNCTION delete_old_descriptor();
DROP TABLE descriptor;
Part of #28116.
---
src/main/R/rserver/graphs.R | 58 +++---
.../metrics/stats/ipv6servers/Database.java | 22 ++
.../torproject/metrics/stats/ipv6servers/Main.java | 2 +
.../metrics/stats/servers/Configuration.java | 1 -
.../servers/RelayDescriptorDatabaseImporter.java | 232 +--------------------
src/main/sql/ipv6servers/init-ipv6servers.sql | 11 +
src/main/sql/legacy/tordir.sql | 135 +-----------
7 files changed, 73 insertions(+), 388 deletions(-)
diff --git a/src/main/R/rserver/graphs.R b/src/main/R/rserver/graphs.R
index 9dc8c2d..df108e2 100644
--- a/src/main/R/rserver/graphs.R
+++ b/src/main/R/rserver/graphs.R
@@ -446,16 +446,19 @@ write_platforms <- function(start_p = NULL, end_p = NULL, path_p) {
}
prepare_bandwidth <- function(start_p, end_p) {
- read.csv(paste(stats_dir, "bandwidth.csv", sep = ""),
+ advbw <- read.csv(paste(stats_dir, "advbw.csv", sep = ""),  # advertised bandwidth input
+ colClasses = c("date" = "Date")) %>%
+ transmute(date, variable = "advbw", value = advbw * 8 / 1e9)  # long format; presumably bytes/s -> Gbit/s, confirm units
+ bwhist <- read.csv(paste(stats_dir, "bandwidth.csv", sep = ""),  # bandwidth history input
colClasses = c("date" = "Date")) %>%
+ transmute(date, variable = "bwhist", value = (bwread + bwwrite) * 8 / 2e9)  # mean of read and written, same scaling as advbw
+ rbind(advbw, bwhist) %>%  # stack both inputs, then apply the optional date range
filter(if (!is.null(start_p)) date >= as.Date(start_p) else TRUE) %>%
filter(if (!is.null(end_p)) date <= as.Date(end_p) else TRUE) %>%
- filter(isexit != "") %>%
- filter(isguard != "") %>%
- group_by(date) %>%
- summarize(advbw = sum(advbw) * 8 / 1e9,
- bwhist = sum(bwread + bwwrite) * 8 / 2e9) %>%
- select(date, advbw, bwhist)
+ filter(!is.na(value)) %>%  # drop rows that carry no value
+ group_by(date, variable) %>%
+ summarize(value = sum(value)) %>%  # total per date and variable
+ spread(variable, value)  # one row per date with columns advbw and bwhist
}
plot_bandwidth <- function(start_p, end_p, path_p) {
@@ -810,33 +813,24 @@ write_connbidirect <- function(start_p = NULL, end_p = NULL, path_p) {
}
prepare_bandwidth_flags <- function(start_p, end_p) {
- b <- read.csv(paste(stats_dir, "bandwidth.csv", sep = ""),
- colClasses = c("date" = "Date"))
- b <- b %>%
+ advbw <- read.csv(paste(stats_dir, "advbw.csv", sep = ""),  # advertised bandwidth input
+ colClasses = c("date" = "Date")) %>%
+ transmute(date, isguard, isexit, variable = "advbw",  # long format, keeping the flag columns
+ value = advbw * 8 / 1e9)  # presumably bytes/s -> Gbit/s; confirm units
+ bwhist <- read.csv(paste(stats_dir, "bandwidth.csv", sep = ""),  # bandwidth history input
+ colClasses = c("date" = "Date")) %>%
+ transmute(date, isguard, isexit, variable = "bwhist",
+ value = (bwread + bwwrite) * 8 / 2e9)  # mean of read and written, same scaling as advbw
+ rbind(advbw, bwhist) %>%  # stack both inputs, then apply the optional date range
filter(if (!is.null(start_p)) date >= as.Date(start_p) else TRUE) %>%
filter(if (!is.null(end_p)) date <= as.Date(end_p) else TRUE) %>%
- filter(isexit != "") %>%
- filter(isguard != "")
- b <- data.frame(date = b$date,
- isexit = b$isexit == "t", isguard = b$isguard == "t",
- advbw = b$advbw * 8 / 1e9,
- bwhist = (b$bwread + b$bwwrite) * 8 / 2e9)
- b <- rbind(
- data.frame(b[b$isguard == TRUE, ], flag = "guard"),
- data.frame(b[b$isexit == TRUE, ], flag = "exit"))
- b <- data.frame(date = b$date, advbw = b$advbw, bwhist = b$bwhist,
- flag = b$flag)
- b <- aggregate(list(advbw = b$advbw, bwhist = b$bwhist),
- by = list(date = b$date, flag = b$flag), FUN = sum,
- na.rm = TRUE, na.action = NULL)
- b <- gather(b, type, value, -c(date, flag))
- bandwidth <- b[b$value > 0, ]
- bandwidth <- data.frame(date = bandwidth$date,
- variable = as.factor(paste(bandwidth$flag, "_", bandwidth$type,
- sep = "")), value = bandwidth$value)
- bandwidth$variable <- factor(bandwidth$variable,
- levels = levels(bandwidth$variable)[c(3, 4, 1, 2)])
- bandwidth
+ group_by(date, variable) %>%
+ summarize(exit = sum(value[isexit == "t"]),  # rows with both flags set count towards exit and guard
+ guard = sum(value[isguard == "t"])) %>%
+ gather(flag, value, -date, -variable) %>%  # back to long format: one row per date, variable and flag
+ unite(variable, flag, variable) %>%  # e.g. "guard" and "advbw" become "guard_advbw"
+ mutate(variable = factor(variable,  # fix the order of the factor levels
+ levels = c("guard_advbw", "guard_bwhist", "exit_advbw", "exit_bwhist")))
}
plot_bandwidth_flags <- function(start_p, end_p, path_p) {
diff --git a/src/main/java/org/torproject/metrics/stats/ipv6servers/Database.java b/src/main/java/org/torproject/metrics/stats/ipv6servers/Database.java
index c3a1fec..b5efe3e 100644
--- a/src/main/java/org/torproject/metrics/stats/ipv6servers/Database.java
+++ b/src/main/java/org/torproject/metrics/stats/ipv6servers/Database.java
@@ -435,6 +435,28 @@ class Database implements AutoCloseable {
return statistics;
}
+ /**
+  * Query the bandwidth_advbw view.
+  *
+  * @return List of string arrays, starting with the header (date, isexit,
+  *     isguard, advbw) followed by one four-element array per result row.
+  * @throws SQLException if creating the statement, running the query, or
+  *     reading a result row fails.
+  */
+ List<String[]> queryAdvbw() throws SQLException {
+   List<String[]> statistics = new ArrayList<>();
+   String columns = "date, isexit, isguard, advbw";
+   statistics.add(columns.split(", "));
+   Calendar calendar = Calendar.getInstance(TimeZone.getTimeZone("UTC"),
+       Locale.US);
+   String queryString = "SELECT " + columns + " FROM bandwidth_advbw";
+   /* Manage the statement together with its result set, so that both are
+    * closed even if the query or one of the getters throws. */
+   try (Statement st = this.connection.createStatement();
+       ResultSet rs = st.executeQuery(queryString)) {
+     while (rs.next()) {
+       String[] outputLine = new String[4];
+       outputLine[0] = rs.getDate("date", calendar).toLocalDate().toString();
+       outputLine[1] = rs.getString("isexit");
+       outputLine[2] = rs.getString("isguard");
+       outputLine[3] = getLongFromResultSet(rs, "advbw");
+       statistics.add(outputLine);
+     }
+   }
+   return statistics;
+ }
+
/** Query the servers_networksize view. */
List<String[]> queryNetworksize() throws SQLException {
List<String[]> statistics = new ArrayList<>();
diff --git a/src/main/java/org/torproject/metrics/stats/ipv6servers/Main.java b/src/main/java/org/torproject/metrics/stats/ipv6servers/Main.java
index a91a74f..d322a2e 100644
--- a/src/main/java/org/torproject/metrics/stats/ipv6servers/Main.java
+++ b/src/main/java/org/torproject/metrics/stats/ipv6servers/Main.java
@@ -88,6 +88,8 @@ public class Main {
log.info("Querying aggregated statistics from the database.");
new Writer().write(Paths.get(Configuration.output, "ipv6servers.csv"),
database.queryServersIpv6());
+ new Writer().write(Paths.get(Configuration.output, "advbw.csv"),
+ database.queryAdvbw());
new Writer().write(Paths.get(Configuration.output, "networksize.csv"),
database.queryNetworksize());
new Writer().write(Paths.get(Configuration.output, "relayflags.csv"),
diff --git a/src/main/java/org/torproject/metrics/stats/servers/Configuration.java b/src/main/java/org/torproject/metrics/stats/servers/Configuration.java
index c4597bc..76788df 100644
--- a/src/main/java/org/torproject/metrics/stats/servers/Configuration.java
+++ b/src/main/java/org/torproject/metrics/stats/servers/Configuration.java
@@ -102,7 +102,6 @@ public class Configuration {
if (this.directoryArchivesDirectories.isEmpty()) {
String prefix = "../../shared/in/recent/relay-descriptors/";
return Arrays.asList(new File(prefix + "consensuses/"),
- new File(prefix + "server-descriptors/"),
new File(prefix + "extra-infos/"));
} else {
return this.directoryArchivesDirectories;
diff --git a/src/main/java/org/torproject/metrics/stats/servers/RelayDescriptorDatabaseImporter.java b/src/main/java/org/torproject/metrics/stats/servers/RelayDescriptorDatabaseImporter.java
index c9a6fa7..2d1ae47 100644
--- a/src/main/java/org/torproject/metrics/stats/servers/RelayDescriptorDatabaseImporter.java
+++ b/src/main/java/org/torproject/metrics/stats/servers/RelayDescriptorDatabaseImporter.java
@@ -9,7 +9,6 @@ import org.torproject.descriptor.DescriptorSourceFactory;
import org.torproject.descriptor.ExtraInfoDescriptor;
import org.torproject.descriptor.NetworkStatusEntry;
import org.torproject.descriptor.RelayNetworkStatusConsensus;
-import org.torproject.descriptor.ServerDescriptor;
import org.postgresql.util.PGbytea;
@@ -20,7 +19,6 @@ import java.io.BufferedWriter;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
-import java.nio.charset.StandardCharsets;
import java.sql.CallableStatement;
import java.sql.Connection;
import java.sql.DriverManager;
@@ -28,7 +26,6 @@ import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Timestamp;
-import java.sql.Types;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
@@ -44,10 +41,6 @@ import java.util.TreeSet;
/**
* Parse directory data.
*/
-
-/* TODO Split up this class and move its parts to cron.network,
- * cron.users, and status.relaysearch packages. Requires extensive
- * changes to the database schema though. */
public final class RelayDescriptorDatabaseImporter {
/**
@@ -58,20 +51,10 @@ public final class RelayDescriptorDatabaseImporter {
/* Counters to keep track of the number of records committed before
* each transaction. */
- private int rdsCount = 0;
-
- private int resCount = 0;
-
private int rhsCount = 0;
private int rrsCount = 0;
- private int rcsCount = 0;
-
- private int rvsCount = 0;
-
- private int rqsCount = 0;
-
/**
* Relay descriptor database connection.
*/
@@ -85,18 +68,6 @@ public final class RelayDescriptorDatabaseImporter {
private PreparedStatement psSs;
/**
- * Prepared statement to check whether a given server descriptor has
- * been imported into the database before.
- */
- private PreparedStatement psDs;
-
- /**
- * Prepared statement to check whether a given network status consensus
- * has been imported into the database before.
- */
- private PreparedStatement psCs;
-
- /**
* Set of dates that have been inserted into the database for being
* included in the next refresh run.
*/
@@ -115,22 +86,11 @@ public final class RelayDescriptorDatabaseImporter {
private PreparedStatement psR;
/**
- * Prepared statement to insert a server descriptor into the database.
- */
- private PreparedStatement psD;
-
- /**
* Callable statement to insert the bandwidth history of an extra-info
* descriptor into the database.
*/
private CallableStatement csH;
- /**
- * Prepared statement to insert a network status consensus into the
- * database.
- */
- private PreparedStatement psC;
-
private static Logger log
= LoggerFactory.getLogger(RelayDescriptorDatabaseImporter.class);
@@ -145,21 +105,11 @@ public final class RelayDescriptorDatabaseImporter {
private BufferedWriter statusentryOut;
/**
- * Raw import file containing server descriptors.
- */
- private BufferedWriter descriptorOut;
-
- /**
* Raw import file containing bandwidth histories.
*/
private BufferedWriter bwhistOut;
/**
- * Raw import file containing consensuses.
- */
- private BufferedWriter consensusOut;
-
- /**
* Date format to parse timestamps.
*/
private SimpleDateFormat dateTimeFormat;
@@ -212,10 +162,6 @@ public final class RelayDescriptorDatabaseImporter {
/* Prepare statements. */
this.psSs = conn.prepareStatement("SELECT fingerprint "
+ "FROM statusentry WHERE validafter = ?");
- this.psDs = conn.prepareStatement("SELECT COUNT(*) "
- + "FROM descriptor WHERE descriptor = ?");
- this.psCs = conn.prepareStatement("SELECT COUNT(*) "
- + "FROM consensus WHERE validafter = ?");
this.psR = conn.prepareStatement("INSERT INTO statusentry "
+ "(validafter, nickname, fingerprint, descriptor, "
+ "published, address, orport, dirport, isauthority, "
@@ -224,16 +170,8 @@ public final class RelayDescriptorDatabaseImporter {
+ "isvalid, isv2dir, isv3dir, version, bandwidth, ports, "
+ "rawdesc) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, "
+ "?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)");
- this.psD = conn.prepareStatement("INSERT INTO descriptor "
- + "(descriptor, nickname, address, orport, dirport, "
- + "fingerprint, bandwidthavg, bandwidthburst, "
- + "bandwidthobserved, platform, published, uptime, "
- + "extrainfo) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, "
- + "?)");
this.csH = conn.prepareCall("{call insert_bwhist(?, ?, ?, ?, ?, "
+ "?)}");
- this.psC = conn.prepareStatement("INSERT INTO consensus "
- + "(validafter) VALUES (?)");
this.psU = conn.prepareStatement("INSERT INTO scheduled_updates "
+ "(date) VALUES (?)");
this.scheduledUpdates = new HashSet<>();
@@ -390,95 +328,9 @@ public final class RelayDescriptorDatabaseImporter {
}
/**
- * Insert server descriptor into database.
- */
- public void addServerDescriptorContents(String descriptor,
- String nickname, String address, int orPort, int dirPort,
- String relayIdentifier, long bandwidthAvg, long bandwidthBurst,
- long bandwidthObserved, String platform, long published,
- Long uptime, String extraInfoDigest) {
- if (this.importIntoDatabase) {
- try {
- this.addDateToScheduledUpdates(published);
- this.addDateToScheduledUpdates(
- published + 24L * 60L * 60L * 1000L);
- Calendar cal = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
- this.psDs.setString(1, descriptor);
- ResultSet rs = psDs.executeQuery();
- rs.next();
- if (rs.getInt(1) == 0) {
- this.psD.clearParameters();
- this.psD.setString(1, descriptor);
- this.psD.setString(2, nickname);
- this.psD.setString(3, address);
- this.psD.setInt(4, orPort);
- this.psD.setInt(5, dirPort);
- this.psD.setString(6, relayIdentifier);
- this.psD.setLong(7, bandwidthAvg);
- this.psD.setLong(8, bandwidthBurst);
- this.psD.setLong(9, bandwidthObserved);
- /* Remove all non-ASCII characters from the platform string, or
- * we'll make Postgres unhappy. Sun's JDK and OpenJDK behave
- * differently when creating a new String with a given encoding.
- * That's what the regexp below is for. */
- this.psD.setString(10, new String(platform.getBytes(),
- StandardCharsets.US_ASCII).replaceAll("[^\\p{ASCII}]",""));
- this.psD.setTimestamp(11, new Timestamp(published), cal);
- if (null != uptime) {
- this.psD.setLong(12, uptime);
- } else {
- this.psD.setNull(12, Types.BIGINT);
- }
- this.psD.setString(13, extraInfoDigest);
- this.psD.executeUpdate();
- rdsCount++;
- if (rdsCount % autoCommitCount == 0) {
- this.conn.commit();
- }
- }
- } catch (SQLException e) {
- log.warn("Could not add server "
- + "descriptor. We won't make any further SQL requests in "
- + "this execution.", e);
- this.importIntoDatabase = false;
- }
- }
- if (this.writeRawImportFiles) {
- try {
- if (this.descriptorOut == null) {
- new File(rawFilesDirectory).mkdirs();
- this.descriptorOut = new BufferedWriter(new FileWriter(
- rawFilesDirectory + "/descriptor.sql"));
- this.descriptorOut.write(" COPY descriptor (descriptor, "
- + "nickname, address, orport, dirport, fingerprint, "
- + "bandwidthavg, bandwidthburst, bandwidthobserved, "
- + "platform, published, uptime, extrainfo) FROM stdin;\n");
- }
- this.descriptorOut.write(descriptor.toLowerCase() + "\t"
- + nickname + "\t" + address + "\t" + orPort + "\t" + dirPort
- + "\t" + relayIdentifier + "\t" + bandwidthAvg + "\t"
- + bandwidthBurst + "\t" + bandwidthObserved + "\t"
- + (platform != null && platform.length() > 0
- ? new String(platform.getBytes(), StandardCharsets.US_ASCII)
- : "\\N") + "\t" + this.dateTimeFormat.format(published) + "\t"
- + (uptime >= 0 ? uptime : "\\N") + "\t"
- + (extraInfoDigest != null ? extraInfoDigest : "\\N")
- + "\n");
- } catch (IOException e) {
- log.warn("Could not write server "
- + "descriptor to raw database import file. We won't make "
- + "any further attempts to write raw import files in this "
- + "execution.", e);
- this.writeRawImportFiles = false;
- }
- }
- }
-
- /**
* Insert extra-info descriptor into database.
*/
- public void addExtraInfoDescriptorContents(String extraInfoDigest,
- String nickname, String fingerprint, long published,
+ public void addExtraInfoDescriptorContents(String fingerprint, long published,
List<String> bandwidthHistoryLines) {
if (!bandwidthHistoryLines.isEmpty()) {
this.addBandwidthHistory(fingerprint.toLowerCase(), published,
@@ -766,55 +618,6 @@ public final class RelayDescriptorDatabaseImporter {
}
}
- /**
- * Insert network status consensus into database.
- */
- public void addConsensus(long validAfter) {
- if (this.importIntoDatabase) {
- try {
- this.addDateToScheduledUpdates(validAfter);
- Calendar cal = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
- Timestamp validAfterTimestamp = new Timestamp(validAfter);
- this.psCs.setTimestamp(1, validAfterTimestamp, cal);
- ResultSet rs = psCs.executeQuery();
- rs.next();
- if (rs.getInt(1) == 0) {
- this.psC.clearParameters();
- this.psC.setTimestamp(1, validAfterTimestamp, cal);
- this.psC.executeUpdate();
- rcsCount++;
- if (rcsCount % autoCommitCount == 0) {
- this.conn.commit();
- }
- }
- } catch (SQLException e) {
- log.warn("Could not add network status "
- + "consensus. We won't make any further SQL requests in "
- + "this execution.", e);
- this.importIntoDatabase = false;
- }
- }
- if (this.writeRawImportFiles) {
- try {
- if (this.consensusOut == null) {
- new File(rawFilesDirectory).mkdirs();
- this.consensusOut = new BufferedWriter(new FileWriter(
- rawFilesDirectory + "/consensus.sql"));
- this.consensusOut.write(" COPY consensus (validafter) "
- + "FROM stdin;\n");
- }
- String validAfterString = this.dateTimeFormat.format(validAfter);
- this.consensusOut.write(validAfterString + "\n");
- } catch (IOException e) {
- log.warn("Could not write network status "
- + "consensus to raw database import file. We won't make "
- + "any further attempts to write raw import files in this "
- + "execution.", e);
- this.writeRawImportFiles = false;
- }
- }
- }
-
/** Imports relay descriptors into the database. */
public void importRelayDescriptors() {
log.info("Importing files in directories " + archivesDirectories
@@ -834,8 +637,6 @@ public final class RelayDescriptorDatabaseImporter {
if (descriptor instanceof RelayNetworkStatusConsensus) {
this.addRelayNetworkStatusConsensus(
(RelayNetworkStatusConsensus) descriptor);
- } else if (descriptor instanceof ServerDescriptor) {
- this.addServerDescriptor((ServerDescriptor) descriptor);
} else if (descriptor instanceof ExtraInfoDescriptor) {
this.addExtraInfoDescriptor((ExtraInfoDescriptor) descriptor);
}
@@ -862,18 +663,6 @@ public final class RelayDescriptorDatabaseImporter {
statusEntry.getBandwidth(), statusEntry.getPortList(),
statusEntry.getStatusEntryBytes());
}
- this.addConsensus(consensus.getValidAfterMillis());
- }
-
- private void addServerDescriptor(ServerDescriptor descriptor) {
- this.addServerDescriptorContents(
- descriptor.getDigestSha1Hex(), descriptor.getNickname(),
- descriptor.getAddress(), descriptor.getOrPort(),
- descriptor.getDirPort(), descriptor.getFingerprint(),
- descriptor.getBandwidthRate(), descriptor.getBandwidthBurst(),
- descriptor.getBandwidthObserved(), descriptor.getPlatform(),
- descriptor.getPublishedMillis(), descriptor.getUptime(),
- descriptor.getExtraInfoDigestSha1Hex());
}
private void addExtraInfoDescriptor(ExtraInfoDescriptor descriptor) {
@@ -892,8 +681,7 @@ public final class RelayDescriptorDatabaseImporter {
bandwidthHistoryLines.add(
descriptor.getDirreqReadHistory().getLine());
}
- this.addExtraInfoDescriptorContents(descriptor.getDigestSha1Hex(),
- descriptor.getNickname(),
+ this.addExtraInfoDescriptorContents(
descriptor.getFingerprint().toLowerCase(),
descriptor.getPublishedMillis(), bandwidthHistoryLines);
}
@@ -904,12 +692,8 @@ public final class RelayDescriptorDatabaseImporter {
public void closeConnection() {
/* Log stats about imported descriptors. */
- log.info("Finished importing relay "
- + "descriptors: {} consensuses, {} network status entries, {} "
- + "votes, {} server descriptors, {} extra-info descriptors, {} "
- + "bandwidth history elements, and {} dirreq stats elements",
- rcsCount, rrsCount, rvsCount, rdsCount, resCount, rhsCount,
- rqsCount);
+ log.info("Finished importing relay descriptors: {} network status entries "
+ + "and {} bandwidth history elements", rrsCount, rhsCount);
/* Insert scheduled updates a second time, just in case the refresh
* run has started since inserting them the first time in which case
@@ -951,18 +735,10 @@ public final class RelayDescriptorDatabaseImporter {
this.statusentryOut.write("\\.\n");
this.statusentryOut.close();
}
- if (this.descriptorOut != null) {
- this.descriptorOut.write("\\.\n");
- this.descriptorOut.close();
- }
if (this.bwhistOut != null) {
this.bwhistOut.write("\\.\n");
this.bwhistOut.close();
}
- if (this.consensusOut != null) {
- this.consensusOut.write("\\.\n");
- this.consensusOut.close();
- }
} catch (IOException e) {
log.warn("Could not close one or more raw database import files.", e);
}
diff --git a/src/main/sql/ipv6servers/init-ipv6servers.sql b/src/main/sql/ipv6servers/init-ipv6servers.sql
index b478a49..c94a19d 100644
--- a/src/main/sql/ipv6servers/init-ipv6servers.sql
+++ b/src/main/sql/ipv6servers/init-ipv6servers.sql
@@ -312,6 +312,17 @@ GROUP BY DATE(valid_after), server, guard_relay, exit_relay, announced_ipv6,
ORDER BY valid_after_date, server, guard_relay, exit_relay, announced_ipv6,
exiting_ipv6_relay, reachable_ipv6_relay;
+-- Advertised bandwidth per day, with one row for each combination of the
+-- Exit and Guard flags; only rows with server = 'relay' are included and the
+-- summed value is rounded down.
+CREATE OR REPLACE VIEW bandwidth_advbw AS
+SELECT
+  valid_after_date AS date,
+  exit_relay AS isexit,
+  guard_relay AS isguard,
+  FLOOR(SUM(advertised_bandwidth_bytes_sum_avg)) AS advbw
+FROM ipv6servers
+WHERE server = 'relay'
+GROUP BY valid_after_date, exit_relay, guard_relay
+ORDER BY valid_after_date, exit_relay, guard_relay;
+
-- View on the number of running servers by relay flag.
CREATE OR REPLACE VIEW servers_flags_complete AS
WITH included_statuses AS (
diff --git a/src/main/sql/legacy/tordir.sql b/src/main/sql/legacy/tordir.sql
index f1d6767..dfe7b5d 100644
--- a/src/main/sql/legacy/tordir.sql
+++ b/src/main/sql/legacy/tordir.sql
@@ -3,33 +3,6 @@
CREATE LANGUAGE plpgsql;
--- TABLE descriptor
--- Contains all of the descriptors published by routers.
-CREATE TABLE descriptor (
- descriptor CHARACTER(40) NOT NULL,
- nickname CHARACTER VARYING(19) NOT NULL,
- address CHARACTER VARYING(15) NOT NULL,
- orport INTEGER NOT NULL,
- dirport INTEGER NOT NULL,
- fingerprint CHARACTER(40) NOT NULL,
- bandwidthavg BIGINT NOT NULL,
- bandwidthburst BIGINT NOT NULL,
- bandwidthobserved BIGINT NOT NULL,
- platform CHARACTER VARYING(256),
- published TIMESTAMP WITHOUT TIME ZONE NOT NULL,
- uptime BIGINT,
- extrainfo CHARACTER(40),
- CONSTRAINT descriptor_pkey PRIMARY KEY (descriptor)
-);
-
-CREATE OR REPLACE FUNCTION delete_old_descriptor()
-RETURNS INTEGER AS $$
- BEGIN
- DELETE FROM descriptor WHERE DATE(published) < current_date - 14;
- RETURN 1;
- END;
-$$ LANGUAGE plpgsql;
-
-- Contains bandwidth histories reported by relays in extra-info
-- descriptors. Each row contains the reported bandwidth in 15-minute
-- intervals for each relay and date.
@@ -97,22 +70,6 @@ RETURNS INTEGER AS $$
END;
$$ LANGUAGE plpgsql;
--- TABLE consensus
--- Contains all of the consensuses published by the directories.
-CREATE TABLE consensus (
- validafter TIMESTAMP WITHOUT TIME ZONE NOT NULL,
- CONSTRAINT consensus_pkey PRIMARY KEY (validafter)
-);
-
--- TABLE bandwidth_flags
-CREATE TABLE bandwidth_flags (
- date DATE NOT NULL,
- isexit BOOLEAN NOT NULL,
- isguard BOOLEAN NOT NULL,
- bwadvertised BIGINT NOT NULL,
- CONSTRAINT bandwidth_flags_pkey PRIMARY KEY(date, isexit, isguard)
-);
-
-- TABLE bwhist_flags
CREATE TABLE bwhist_flags (
date DATE NOT NULL,
@@ -149,15 +106,6 @@ CREATE TABLE user_stats (
CONSTRAINT user_stats_pkey PRIMARY KEY(date, country)
);
--- TABLE relay_statuses_per_day
--- A helper table which is commonly used to update the tables above in the
--- refresh_* functions.
-CREATE TABLE relay_statuses_per_day (
- date DATE NOT NULL,
- count INTEGER NOT NULL,
- CONSTRAINT relay_statuses_per_day_pkey PRIMARY KEY(date)
-);
-
-- Dates to be included in the next refresh run.
CREATE TABLE scheduled_updates (
id SERIAL,
@@ -174,24 +122,6 @@ CREATE TABLE updates (
date DATE
);
--- FUNCTION refresh_relay_statuses_per_day()
--- Updates helper table which is used to refresh the aggregate tables.
-CREATE OR REPLACE FUNCTION refresh_relay_statuses_per_day()
-RETURNS INTEGER AS $$
- BEGIN
- DELETE FROM relay_statuses_per_day
- WHERE date IN (SELECT date FROM updates);
- INSERT INTO relay_statuses_per_day (date, count)
- SELECT DATE(validafter) AS date, COUNT(*) AS count
- FROM consensus
- WHERE DATE(validafter) >= (SELECT MIN(date) FROM updates)
- AND DATE(validafter) <= (SELECT MAX(date) FROM updates)
- AND DATE(validafter) IN (SELECT date FROM updates)
- GROUP BY DATE(validafter);
- RETURN 1;
- END;
-$$ LANGUAGE plpgsql;
-
CREATE OR REPLACE FUNCTION array_sum (BIGINT[]) RETURNS BIGINT AS $$
SELECT SUM($1[i])::bigint
FROM generate_series(array_lower($1, 1), array_upper($1, 1)) index(i);
@@ -247,45 +177,11 @@ $$ LANGUAGE plpgsql;
-- refresh_* functions
-- The following functions keep their corresponding aggregate tables
--- up-to-date. They should be called every time ERNIE is run, or when new
--- data is finished being added to the descriptor or statusentry tables.
+-- up-to-date. They should be called every time this module is run, or when new
+-- data has finished being added to the statusentry table.
-- They find what new data has been entered or updated based on the
-- updates table.
-CREATE OR REPLACE FUNCTION refresh_bandwidth_flags() RETURNS INTEGER AS $$
- DECLARE
- min_date TIMESTAMP WITHOUT TIME ZONE;
- max_date TIMESTAMP WITHOUT TIME ZONE;
- BEGIN
-
- min_date := (SELECT MIN(date) FROM updates);
- max_date := (SELECT MAX(date) + 1 FROM updates);
-
- DELETE FROM bandwidth_flags WHERE date IN (SELECT date FROM updates);
- EXECUTE '
- INSERT INTO bandwidth_flags (date, isexit, isguard, bwadvertised)
- SELECT DATE(validafter) AS date,
- BOOL_OR(isexit) AS isexit,
- BOOL_OR(isguard) AS isguard,
- (SUM(LEAST(bandwidthavg, bandwidthobserved))
- / relay_statuses_per_day.count)::BIGINT AS bwadvertised
- FROM descriptor RIGHT JOIN statusentry
- ON descriptor.descriptor = statusentry.descriptor
- JOIN relay_statuses_per_day
- ON DATE(validafter) = relay_statuses_per_day.date
- WHERE isrunning = TRUE
- AND validafter >= ''' || min_date || '''
- AND validafter < ''' || max_date || '''
- AND DATE(validafter) IN (SELECT date FROM updates)
- AND relay_statuses_per_day.date >= ''' || min_date || '''
- AND relay_statuses_per_day.date < ''' || max_date || '''
- AND DATE(relay_statuses_per_day.date) IN
- (SELECT date FROM updates)
- GROUP BY DATE(validafter), isexit, isguard, relay_statuses_per_day.count';
- RETURN 1;
- END;
-$$ LANGUAGE plpgsql;
-
CREATE OR REPLACE FUNCTION refresh_bwhist_flags() RETURNS INTEGER AS $$
DECLARE
min_date TIMESTAMP WITHOUT TIME ZONE;
@@ -391,18 +287,12 @@ CREATE OR REPLACE FUNCTION refresh_all() RETURNS INTEGER AS $$
DELETE FROM updates;
RAISE NOTICE '% Copying scheduled dates.', timeofday();
INSERT INTO updates SELECT * FROM scheduled_updates;
- RAISE NOTICE '% Refreshing relay statuses per day.', timeofday();
- PERFORM refresh_relay_statuses_per_day();
- RAISE NOTICE '% Refreshing total relay bandwidth.', timeofday();
- PERFORM refresh_bandwidth_flags();
RAISE NOTICE '% Refreshing bandwidth history.', timeofday();
PERFORM refresh_bwhist_flags();
RAISE NOTICE '% Refreshing user statistics.', timeofday();
PERFORM refresh_user_stats();
RAISE NOTICE '% Deleting processed dates.', timeofday();
DELETE FROM scheduled_updates WHERE id IN (SELECT id FROM updates);
- RAISE NOTICE '% Deleting old descriptors.', timeofday();
- PERFORM delete_old_descriptor();
RAISE NOTICE '% Deleting old bandwidth histories.', timeofday();
PERFORM delete_old_bwhist();
RAISE NOTICE '% Deleting old status entries.', timeofday();
@@ -414,23 +304,14 @@ $$ LANGUAGE plpgsql;
-- View for exporting bandwidth statistics.
CREATE VIEW stats_bandwidth AS
- (SELECT COALESCE(bandwidth_flags.date, bwhist_flags.date) AS date,
- COALESCE(bandwidth_flags.isexit, bwhist_flags.isexit) AS isexit,
- COALESCE(bandwidth_flags.isguard, bwhist_flags.isguard) AS isguard,
- bandwidth_flags.bwadvertised AS advbw,
- CASE WHEN bwhist_flags.read IS NOT NULL
- THEN bwhist_flags.read / 86400 END AS bwread,
- CASE WHEN bwhist_flags.written IS NOT NULL
- THEN bwhist_flags.written / 86400 END AS bwwrite,
+ (SELECT date, isexit, isguard,
+ read / 86400 AS bwread,
+ written / 86400 AS bwwrite,
NULL AS dirread, NULL AS dirwrite
- FROM bandwidth_flags FULL OUTER JOIN bwhist_flags
- ON bandwidth_flags.date = bwhist_flags.date
- AND bandwidth_flags.isexit = bwhist_flags.isexit
- AND bandwidth_flags.isguard = bwhist_flags.isguard
- WHERE COALESCE(bandwidth_flags.date, bwhist_flags.date) <
- current_date - 2)
+ FROM bwhist_flags
+ WHERE date < current_date - 2)
UNION ALL
- (SELECT date, NULL AS isexit, NULL AS isguard, NULL AS advbw,
+ (SELECT date, NULL AS isexit, NULL AS isguard,
NULL AS bwread, NULL AS bwwrite,
FLOOR(CAST(dr AS NUMERIC) / CAST(86400 AS NUMERIC)) AS dirread,
FLOOR(CAST(dw AS NUMERIC) / CAST(86400 AS NUMERIC)) AS dirwrite