commit 09cfdfdff4efc1aa1cc60f53f7f1353a6193e6ad Author: Karsten Loesing karsten.loesing@gmx.net Date: Mon Nov 12 19:50:46 2018 +0100
Remove advbw column from bandwidth.csv.
Instead use advbw data from ipv6servers module.
As a result, we can stop aggregating advertised bandwidths in the legacy module.
Required schema changes to live tordir databases:
DROP VIEW stats_bandwidth; CREATE VIEW stats_bandwidth [...] CREATE OR REPLACE FUNCTION refresh_all() [...] DROP FUNCTION refresh_bandwidth_flags(); DROP FUNCTION refresh_relay_statuses_per_day(); DROP TABLE relay_statuses_per_day; DROP TABLE bandwidth_flags; DROP TABLE consensus; DROP FUNCTION delete_old_descriptor(); DROP TABLE descriptor;
Part of #28116. --- src/main/R/rserver/graphs.R | 58 +++--- .../metrics/stats/ipv6servers/Database.java | 22 ++ .../torproject/metrics/stats/ipv6servers/Main.java | 2 + .../metrics/stats/servers/Configuration.java | 1 - .../servers/RelayDescriptorDatabaseImporter.java | 232 +-------------------- src/main/sql/ipv6servers/init-ipv6servers.sql | 11 + src/main/sql/legacy/tordir.sql | 135 +----------- 7 files changed, 73 insertions(+), 388 deletions(-)
diff --git a/src/main/R/rserver/graphs.R b/src/main/R/rserver/graphs.R index 9dc8c2d..df108e2 100644 --- a/src/main/R/rserver/graphs.R +++ b/src/main/R/rserver/graphs.R @@ -446,16 +446,19 @@ write_platforms <- function(start_p = NULL, end_p = NULL, path_p) { }
prepare_bandwidth <- function(start_p, end_p) { - read.csv(paste(stats_dir, "bandwidth.csv", sep = ""), + advbw <- read.csv(paste(stats_dir, "advbw.csv", sep = ""), + colClasses = c("date" = "Date")) %>% + transmute(date, variable = "advbw", value = advbw * 8 / 1e9) + bwhist <- read.csv(paste(stats_dir, "bandwidth.csv", sep = ""), colClasses = c("date" = "Date")) %>% + transmute(date, variable = "bwhist", value = (bwread + bwwrite) * 8 / 2e9) + rbind(advbw, bwhist) %>% filter(if (!is.null(start_p)) date >= as.Date(start_p) else TRUE) %>% filter(if (!is.null(end_p)) date <= as.Date(end_p) else TRUE) %>% - filter(isexit != "") %>% - filter(isguard != "") %>% - group_by(date) %>% - summarize(advbw = sum(advbw) * 8 / 1e9, - bwhist = sum(bwread + bwwrite) * 8 / 2e9) %>% - select(date, advbw, bwhist) + filter(!is.na(value)) %>% + group_by(date, variable) %>% + summarize(value = sum(value)) %>% + spread(variable, value) }
plot_bandwidth <- function(start_p, end_p, path_p) { @@ -810,33 +813,24 @@ write_connbidirect <- function(start_p = NULL, end_p = NULL, path_p) { }
prepare_bandwidth_flags <- function(start_p, end_p) { - b <- read.csv(paste(stats_dir, "bandwidth.csv", sep = ""), - colClasses = c("date" = "Date")) - b <- b %>% + advbw <- read.csv(paste(stats_dir, "advbw.csv", sep = ""), + colClasses = c("date" = "Date")) %>% + transmute(date, isguard, isexit, variable = "advbw", + value = advbw * 8 / 1e9) + bwhist <- read.csv(paste(stats_dir, "bandwidth.csv", sep = ""), + colClasses = c("date" = "Date")) %>% + transmute(date, isguard, isexit, variable = "bwhist", + value = (bwread + bwwrite) * 8 / 2e9) + rbind(advbw, bwhist) %>% filter(if (!is.null(start_p)) date >= as.Date(start_p) else TRUE) %>% filter(if (!is.null(end_p)) date <= as.Date(end_p) else TRUE) %>% - filter(isexit != "") %>% - filter(isguard != "") - b <- data.frame(date = b$date, - isexit = b$isexit == "t", isguard = b$isguard == "t", - advbw = b$advbw * 8 / 1e9, - bwhist = (b$bwread + b$bwwrite) * 8 / 2e9) - b <- rbind( - data.frame(b[b$isguard == TRUE, ], flag = "guard"), - data.frame(b[b$isexit == TRUE, ], flag = "exit")) - b <- data.frame(date = b$date, advbw = b$advbw, bwhist = b$bwhist, - flag = b$flag) - b <- aggregate(list(advbw = b$advbw, bwhist = b$bwhist), - by = list(date = b$date, flag = b$flag), FUN = sum, - na.rm = TRUE, na.action = NULL) - b <- gather(b, type, value, -c(date, flag)) - bandwidth <- b[b$value > 0, ] - bandwidth <- data.frame(date = bandwidth$date, - variable = as.factor(paste(bandwidth$flag, "_", bandwidth$type, - sep = "")), value = bandwidth$value) - bandwidth$variable <- factor(bandwidth$variable, - levels = levels(bandwidth$variable)[c(3, 4, 1, 2)]) - bandwidth + group_by(date, variable) %>% + summarize(exit = sum(value[isexit == "t"]), + guard = sum(value[isguard == "t"])) %>% + gather(flag, value, -date, -variable) %>% + unite(variable, flag, variable) %>% + mutate(variable = factor(variable, + levels = c("guard_advbw", "guard_bwhist", "exit_advbw", "exit_bwhist"))) }
plot_bandwidth_flags <- function(start_p, end_p, path_p) { diff --git a/src/main/java/org/torproject/metrics/stats/ipv6servers/Database.java b/src/main/java/org/torproject/metrics/stats/ipv6servers/Database.java index c3a1fec..b5efe3e 100644 --- a/src/main/java/org/torproject/metrics/stats/ipv6servers/Database.java +++ b/src/main/java/org/torproject/metrics/stats/ipv6servers/Database.java @@ -435,6 +435,28 @@ class Database implements AutoCloseable { return statistics; }
+ /** Query the bandwidth_advbw view. */ + List<String[]> queryAdvbw() throws SQLException { + List<String[]> statistics = new ArrayList<>(); + String columns = "date, isexit, isguard, advbw"; + statistics.add(columns.split(", ")); + Statement st = this.connection.createStatement(); + Calendar calendar = Calendar.getInstance(TimeZone.getTimeZone("UTC"), + Locale.US); + String queryString = "SELECT " + columns + " FROM bandwidth_advbw"; + try (ResultSet rs = st.executeQuery(queryString)) { + while (rs.next()) { + String[] outputLine = new String[4]; + outputLine[0] = rs.getDate("date", calendar).toLocalDate().toString(); + outputLine[1] = rs.getString("isexit"); + outputLine[2] = rs.getString("isguard"); + outputLine[3] = getLongFromResultSet(rs, "advbw"); + statistics.add(outputLine); + } + } + return statistics; + } + /** Query the servers_networksize view. */ List<String[]> queryNetworksize() throws SQLException { List<String[]> statistics = new ArrayList<>(); diff --git a/src/main/java/org/torproject/metrics/stats/ipv6servers/Main.java b/src/main/java/org/torproject/metrics/stats/ipv6servers/Main.java index a91a74f..d322a2e 100644 --- a/src/main/java/org/torproject/metrics/stats/ipv6servers/Main.java +++ b/src/main/java/org/torproject/metrics/stats/ipv6servers/Main.java @@ -88,6 +88,8 @@ public class Main { log.info("Querying aggregated statistics from the database."); new Writer().write(Paths.get(Configuration.output, "ipv6servers.csv"), database.queryServersIpv6()); + new Writer().write(Paths.get(Configuration.output, "advbw.csv"), + database.queryAdvbw()); new Writer().write(Paths.get(Configuration.output, "networksize.csv"), database.queryNetworksize()); new Writer().write(Paths.get(Configuration.output, "relayflags.csv"), diff --git a/src/main/java/org/torproject/metrics/stats/servers/Configuration.java b/src/main/java/org/torproject/metrics/stats/servers/Configuration.java index c4597bc..76788df 100644 --- a/src/main/java/org/torproject/metrics/stats/servers/Configuration.java +++ b/src/main/java/org/torproject/metrics/stats/servers/Configuration.java @@ -102,7 +102,6 @@ public class Configuration { if (this.directoryArchivesDirectories.isEmpty()) { String prefix = "../../shared/in/recent/relay-descriptors/"; return Arrays.asList(new File(prefix + "consensuses/"), - new File(prefix + "server-descriptors/"), new File(prefix + "extra-infos/")); } else { return this.directoryArchivesDirectories; diff --git a/src/main/java/org/torproject/metrics/stats/servers/RelayDescriptorDatabaseImporter.java b/src/main/java/org/torproject/metrics/stats/servers/RelayDescriptorDatabaseImporter.java index c9a6fa7..2d1ae47 100644 --- a/src/main/java/org/torproject/metrics/stats/servers/RelayDescriptorDatabaseImporter.java +++ b/src/main/java/org/torproject/metrics/stats/servers/RelayDescriptorDatabaseImporter.java @@ -9,7 +9,6 @@ import org.torproject.descriptor.DescriptorSourceFactory; import org.torproject.descriptor.ExtraInfoDescriptor; import org.torproject.descriptor.NetworkStatusEntry; import org.torproject.descriptor.RelayNetworkStatusConsensus; -import org.torproject.descriptor.ServerDescriptor;
import org.postgresql.util.PGbytea;
@@ -20,7 +19,6 @@ import java.io.BufferedWriter; import java.io.File; import java.io.FileWriter; import java.io.IOException; -import java.nio.charset.StandardCharsets; import java.sql.CallableStatement; import java.sql.Connection; import java.sql.DriverManager; @@ -28,7 +26,6 @@ import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Timestamp; -import java.sql.Types; import java.text.ParseException; import java.text.SimpleDateFormat; import java.util.ArrayList; @@ -44,10 +41,6 @@ import java.util.TreeSet; /** * Parse directory data. */ - -/* TODO Split up this class and move its parts to cron.network, - * cron.users, and status.relaysearch packages. Requires extensive - * changes to the database schema though. */ public final class RelayDescriptorDatabaseImporter {
/** @@ -58,20 +51,10 @@ public final class RelayDescriptorDatabaseImporter { /* Counters to keep track of the number of records committed before * each transaction. */
- private int rdsCount = 0; - - private int resCount = 0; - private int rhsCount = 0;
private int rrsCount = 0;
- private int rcsCount = 0; - - private int rvsCount = 0; - - private int rqsCount = 0; - /** * Relay descriptor database connection. */ @@ -85,18 +68,6 @@ public final class RelayDescriptorDatabaseImporter { private PreparedStatement psSs;
/** - * Prepared statement to check whether a given server descriptor has - * been imported into the database before. - */ - private PreparedStatement psDs; - - /** - * Prepared statement to check whether a given network status consensus - * has been imported into the database before. - */ - private PreparedStatement psCs; - - /** * Set of dates that have been inserted into the database for being * included in the next refresh run. */ @@ -115,22 +86,11 @@ public final class RelayDescriptorDatabaseImporter { private PreparedStatement psR;
/** - * Prepared statement to insert a server descriptor into the database. - */ - private PreparedStatement psD; - - /** * Callable statement to insert the bandwidth history of an extra-info * descriptor into the database. */ private CallableStatement csH;
- /** - * Prepared statement to insert a network status consensus into the - * database. - */ - private PreparedStatement psC; - private static Logger log = LoggerFactory.getLogger(RelayDescriptorDatabaseImporter.class);
@@ -145,21 +105,11 @@ public final class RelayDescriptorDatabaseImporter { private BufferedWriter statusentryOut;
/** - * Raw import file containing server descriptors. - */ - private BufferedWriter descriptorOut; - - /** * Raw import file containing bandwidth histories. */ private BufferedWriter bwhistOut;
/** - * Raw import file containing consensuses. - */ - private BufferedWriter consensusOut; - - /** * Date format to parse timestamps. */ private SimpleDateFormat dateTimeFormat; @@ -212,10 +162,6 @@ public final class RelayDescriptorDatabaseImporter { /* Prepare statements. */ this.psSs = conn.prepareStatement("SELECT fingerprint " + "FROM statusentry WHERE validafter = ?"); - this.psDs = conn.prepareStatement("SELECT COUNT(*) " - + "FROM descriptor WHERE descriptor = ?"); - this.psCs = conn.prepareStatement("SELECT COUNT(*) " - + "FROM consensus WHERE validafter = ?"); this.psR = conn.prepareStatement("INSERT INTO statusentry " + "(validafter, nickname, fingerprint, descriptor, " + "published, address, orport, dirport, isauthority, " @@ -224,16 +170,8 @@ public final class RelayDescriptorDatabaseImporter { + "isvalid, isv2dir, isv3dir, version, bandwidth, ports, " + "rawdesc) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, " + "?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"); - this.psD = conn.prepareStatement("INSERT INTO descriptor " - + "(descriptor, nickname, address, orport, dirport, " - + "fingerprint, bandwidthavg, bandwidthburst, " - + "bandwidthobserved, platform, published, uptime, " - + "extrainfo) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, " - + "?)"); this.csH = conn.prepareCall("{call insert_bwhist(?, ?, ?, ?, ?, " + "?)}"); - this.psC = conn.prepareStatement("INSERT INTO consensus " - + "(validafter) VALUES (?)"); this.psU = conn.prepareStatement("INSERT INTO scheduled_updates " + "(date) VALUES (?)"); this.scheduledUpdates = new HashSet<>(); @@ -390,95 +328,9 @@ public final class RelayDescriptorDatabaseImporter { }
/** - * Insert server descriptor into database. - */ - public void addServerDescriptorContents(String descriptor, - String nickname, String address, int orPort, int dirPort, - String relayIdentifier, long bandwidthAvg, long bandwidthBurst, - long bandwidthObserved, String platform, long published, - Long uptime, String extraInfoDigest) { - if (this.importIntoDatabase) { - try { - this.addDateToScheduledUpdates(published); - this.addDateToScheduledUpdates( - published + 24L * 60L * 60L * 1000L); - Calendar cal = Calendar.getInstance(TimeZone.getTimeZone("UTC")); - this.psDs.setString(1, descriptor); - ResultSet rs = psDs.executeQuery(); - rs.next(); - if (rs.getInt(1) == 0) { - this.psD.clearParameters(); - this.psD.setString(1, descriptor); - this.psD.setString(2, nickname); - this.psD.setString(3, address); - this.psD.setInt(4, orPort); - this.psD.setInt(5, dirPort); - this.psD.setString(6, relayIdentifier); - this.psD.setLong(7, bandwidthAvg); - this.psD.setLong(8, bandwidthBurst); - this.psD.setLong(9, bandwidthObserved); - /* Remove all non-ASCII characters from the platform string, or - * we'll make Postgres unhappy. Sun's JDK and OpenJDK behave - * differently when creating a new String with a given encoding. - * That's what the regexp below is for. */ - this.psD.setString(10, new String(platform.getBytes(), - StandardCharsets.US_ASCII).replaceAll("[^\p{ASCII}]","")); - this.psD.setTimestamp(11, new Timestamp(published), cal); - if (null != uptime) { - this.psD.setLong(12, uptime); - } else { - this.psD.setNull(12, Types.BIGINT); - } - this.psD.setString(13, extraInfoDigest); - this.psD.executeUpdate(); - rdsCount++; - if (rdsCount % autoCommitCount == 0) { - this.conn.commit(); - } - } - } catch (SQLException e) { - log.warn("Could not add server " - + "descriptor. We won't make any further SQL requests in " - + "this execution.", e); - this.importIntoDatabase = false; - } - } - if (this.writeRawImportFiles) { - try { - if (this.descriptorOut == null) { - new File(rawFilesDirectory).mkdirs(); - this.descriptorOut = new BufferedWriter(new FileWriter( - rawFilesDirectory + "/descriptor.sql")); - this.descriptorOut.write(" COPY descriptor (descriptor, " - + "nickname, address, orport, dirport, fingerprint, " - + "bandwidthavg, bandwidthburst, bandwidthobserved, " - + "platform, published, uptime, extrainfo) FROM stdin;\n"); - } - this.descriptorOut.write(descriptor.toLowerCase() + "\t" - + nickname + "\t" + address + "\t" + orPort + "\t" + dirPort - + "\t" + relayIdentifier + "\t" + bandwidthAvg + "\t" - + bandwidthBurst + "\t" + bandwidthObserved + "\t" - + (platform != null && platform.length() > 0 - ? new String(platform.getBytes(), StandardCharsets.US_ASCII) - : "\N") + "\t" + this.dateTimeFormat.format(published) + "\t" - + (uptime >= 0 ? uptime : "\N") + "\t" - + (extraInfoDigest != null ? extraInfoDigest : "\N") - + "\n"); - } catch (IOException e) { - log.warn("Could not write server " - + "descriptor to raw database import file. We won't make " - + "any further attempts to write raw import files in this " - + "execution.", e); - this.writeRawImportFiles = false; - } - } - } - - /** * Insert extra-info descriptor into database. */ - public void addExtraInfoDescriptorContents(String extraInfoDigest, - String nickname, String fingerprint, long published, + public void addExtraInfoDescriptorContents(String fingerprint, long published, List<String> bandwidthHistoryLines) { if (!bandwidthHistoryLines.isEmpty()) { this.addBandwidthHistory(fingerprint.toLowerCase(), published, @@ -766,55 +618,6 @@ public final class RelayDescriptorDatabaseImporter { } }
- /** - * Insert network status consensus into database. - */ - public void addConsensus(long validAfter) { - if (this.importIntoDatabase) { - try { - this.addDateToScheduledUpdates(validAfter); - Calendar cal = Calendar.getInstance(TimeZone.getTimeZone("UTC")); - Timestamp validAfterTimestamp = new Timestamp(validAfter); - this.psCs.setTimestamp(1, validAfterTimestamp, cal); - ResultSet rs = psCs.executeQuery(); - rs.next(); - if (rs.getInt(1) == 0) { - this.psC.clearParameters(); - this.psC.setTimestamp(1, validAfterTimestamp, cal); - this.psC.executeUpdate(); - rcsCount++; - if (rcsCount % autoCommitCount == 0) { - this.conn.commit(); - } - } - } catch (SQLException e) { - log.warn("Could not add network status " - + "consensus. We won't make any further SQL requests in " - + "this execution.", e); - this.importIntoDatabase = false; - } - } - if (this.writeRawImportFiles) { - try { - if (this.consensusOut == null) { - new File(rawFilesDirectory).mkdirs(); - this.consensusOut = new BufferedWriter(new FileWriter( - rawFilesDirectory + "/consensus.sql")); - this.consensusOut.write(" COPY consensus (validafter) " - + "FROM stdin;\n"); - } - String validAfterString = this.dateTimeFormat.format(validAfter); - this.consensusOut.write(validAfterString + "\n"); - } catch (IOException e) { - log.warn("Could not write network status " - + "consensus to raw database import file. We won't make " - + "any further attempts to write raw import files in this " - + "execution.", e); - this.writeRawImportFiles = false; - } - } - } - /** Imports relay descriptors into the database. */ public void importRelayDescriptors() { log.info("Importing files in directories " + archivesDirectories @@ -834,8 +637,6 @@ public final class RelayDescriptorDatabaseImporter { if (descriptor instanceof RelayNetworkStatusConsensus) { this.addRelayNetworkStatusConsensus( (RelayNetworkStatusConsensus) descriptor); - } else if (descriptor instanceof ServerDescriptor) { - this.addServerDescriptor((ServerDescriptor) descriptor); } else if (descriptor instanceof ExtraInfoDescriptor) { this.addExtraInfoDescriptor((ExtraInfoDescriptor) descriptor); } @@ -862,18 +663,6 @@ public final class RelayDescriptorDatabaseImporter { statusEntry.getBandwidth(), statusEntry.getPortList(), statusEntry.getStatusEntryBytes()); } - this.addConsensus(consensus.getValidAfterMillis()); - } - - private void addServerDescriptor(ServerDescriptor descriptor) { - this.addServerDescriptorContents( - descriptor.getDigestSha1Hex(), descriptor.getNickname(), - descriptor.getAddress(), descriptor.getOrPort(), - descriptor.getDirPort(), descriptor.getFingerprint(), - descriptor.getBandwidthRate(), descriptor.getBandwidthBurst(), - descriptor.getBandwidthObserved(), descriptor.getPlatform(), - descriptor.getPublishedMillis(), descriptor.getUptime(), - descriptor.getExtraInfoDigestSha1Hex()); }
private void addExtraInfoDescriptor(ExtraInfoDescriptor descriptor) { @@ -892,8 +681,7 @@ public final class RelayDescriptorDatabaseImporter { bandwidthHistoryLines.add( descriptor.getDirreqReadHistory().getLine()); } - this.addExtraInfoDescriptorContents(descriptor.getDigestSha1Hex(), - descriptor.getNickname(), + this.addExtraInfoDescriptorContents( descriptor.getFingerprint().toLowerCase(), descriptor.getPublishedMillis(), bandwidthHistoryLines); } @@ -904,12 +692,8 @@ public final class RelayDescriptorDatabaseImporter { public void closeConnection() {
/* Log stats about imported descriptors. */ - log.info("Finished importing relay " - + "descriptors: {} consensuses, {} network status entries, {} " - + "votes, {} server descriptors, {} extra-info descriptors, {} " - + "bandwidth history elements, and {} dirreq stats elements", - rcsCount, rrsCount, rvsCount, rdsCount, resCount, rhsCount, - rqsCount); + log.info("Finished importing relay descriptors: {} network status entries " + + "and {} bandwidth history elements", rrsCount, rhsCount);
/* Insert scheduled updates a second time, just in case the refresh * run has started since inserting them the first time in which case @@ -951,18 +735,10 @@ public final class RelayDescriptorDatabaseImporter { this.statusentryOut.write("\.\n"); this.statusentryOut.close(); } - if (this.descriptorOut != null) { - this.descriptorOut.write("\.\n"); - this.descriptorOut.close(); - } if (this.bwhistOut != null) { this.bwhistOut.write("\.\n"); this.bwhistOut.close(); } - if (this.consensusOut != null) { - this.consensusOut.write("\.\n"); - this.consensusOut.close(); - } } catch (IOException e) { log.warn("Could not close one or more raw database import files.", e); } diff --git a/src/main/sql/ipv6servers/init-ipv6servers.sql b/src/main/sql/ipv6servers/init-ipv6servers.sql index b478a49..c94a19d 100644 --- a/src/main/sql/ipv6servers/init-ipv6servers.sql +++ b/src/main/sql/ipv6servers/init-ipv6servers.sql @@ -312,6 +312,17 @@ GROUP BY DATE(valid_after), server, guard_relay, exit_relay, announced_ipv6, ORDER BY valid_after_date, server, guard_relay, exit_relay, announced_ipv6, exiting_ipv6_relay, reachable_ipv6_relay;
+-- View on advertised bandwidth by Exit/Guard flag combination. +CREATE OR REPLACE VIEW bandwidth_advbw AS +SELECT valid_after_date AS date, + exit_relay AS isexit, + guard_relay AS isguard, + FLOOR(SUM(advertised_bandwidth_bytes_sum_avg)) AS advbw +FROM ipv6servers +WHERE server = 'relay' +GROUP BY date, isexit, isguard +ORDER BY date, isexit, isguard; + -- View on the number of running servers by relay flag. CREATE OR REPLACE VIEW servers_flags_complete AS WITH included_statuses AS ( diff --git a/src/main/sql/legacy/tordir.sql b/src/main/sql/legacy/tordir.sql index f1d6767..dfe7b5d 100644 --- a/src/main/sql/legacy/tordir.sql +++ b/src/main/sql/legacy/tordir.sql @@ -3,33 +3,6 @@
CREATE LANGUAGE plpgsql;
--- TABLE descriptor --- Contains all of the descriptors published by routers. -CREATE TABLE descriptor ( - descriptor CHARACTER(40) NOT NULL, - nickname CHARACTER VARYING(19) NOT NULL, - address CHARACTER VARYING(15) NOT NULL, - orport INTEGER NOT NULL, - dirport INTEGER NOT NULL, - fingerprint CHARACTER(40) NOT NULL, - bandwidthavg BIGINT NOT NULL, - bandwidthburst BIGINT NOT NULL, - bandwidthobserved BIGINT NOT NULL, - platform CHARACTER VARYING(256), - published TIMESTAMP WITHOUT TIME ZONE NOT NULL, - uptime BIGINT, - extrainfo CHARACTER(40), - CONSTRAINT descriptor_pkey PRIMARY KEY (descriptor) -); - -CREATE OR REPLACE FUNCTION delete_old_descriptor() -RETURNS INTEGER AS $$ - BEGIN - DELETE FROM descriptor WHERE DATE(published) < current_date - 14; - RETURN 1; - END; -$$ LANGUAGE plpgsql; - -- Contains bandwidth histories reported by relays in extra-info -- descriptors. Each row contains the reported bandwidth in 15-minute -- intervals for each relay and date. @@ -97,22 +70,6 @@ RETURNS INTEGER AS $$ END; $$ LANGUAGE plpgsql;
--- TABLE consensus --- Contains all of the consensuses published by the directories. -CREATE TABLE consensus ( - validafter TIMESTAMP WITHOUT TIME ZONE NOT NULL, - CONSTRAINT consensus_pkey PRIMARY KEY (validafter) -); - --- TABLE bandwidth_flags -CREATE TABLE bandwidth_flags ( - date DATE NOT NULL, - isexit BOOLEAN NOT NULL, - isguard BOOLEAN NOT NULL, - bwadvertised BIGINT NOT NULL, - CONSTRAINT bandwidth_flags_pkey PRIMARY KEY(date, isexit, isguard) -); - -- TABLE bwhist_flags CREATE TABLE bwhist_flags ( date DATE NOT NULL, @@ -149,15 +106,6 @@ CREATE TABLE user_stats ( CONSTRAINT user_stats_pkey PRIMARY KEY(date, country) );
--- TABLE relay_statuses_per_day --- A helper table which is commonly used to update the tables above in the --- refresh_* functions. -CREATE TABLE relay_statuses_per_day ( - date DATE NOT NULL, - count INTEGER NOT NULL, - CONSTRAINT relay_statuses_per_day_pkey PRIMARY KEY(date) -); - -- Dates to be included in the next refresh run. CREATE TABLE scheduled_updates ( id SERIAL, @@ -174,24 +122,6 @@ CREATE TABLE updates ( date DATE );
--- FUNCTION refresh_relay_statuses_per_day() --- Updates helper table which is used to refresh the aggregate tables. -CREATE OR REPLACE FUNCTION refresh_relay_statuses_per_day() -RETURNS INTEGER AS $$ - BEGIN - DELETE FROM relay_statuses_per_day - WHERE date IN (SELECT date FROM updates); - INSERT INTO relay_statuses_per_day (date, count) - SELECT DATE(validafter) AS date, COUNT(*) AS count - FROM consensus - WHERE DATE(validafter) >= (SELECT MIN(date) FROM updates) - AND DATE(validafter) <= (SELECT MAX(date) FROM updates) - AND DATE(validafter) IN (SELECT date FROM updates) - GROUP BY DATE(validafter); - RETURN 1; - END; -$$ LANGUAGE plpgsql; - CREATE OR REPLACE FUNCTION array_sum (BIGINT[]) RETURNS BIGINT AS $$ SELECT SUM($1[i])::bigint FROM generate_series(array_lower($1, 1), array_upper($1, 1)) index(i); @@ -247,45 +177,11 @@ $$ LANGUAGE plpgsql;
-- refresh_* functions -- The following functions keep their corresponding aggregate tables --- up-to-date. They should be called every time ERNIE is run, or when new --- data is finished being added to the descriptor or statusentry tables. +-- up-to-date. They should be called every time this module is run, or when new +-- data is finished being added to the statusentry tables. -- They find what new data has been entered or updated based on the -- updates table.
-CREATE OR REPLACE FUNCTION refresh_bandwidth_flags() RETURNS INTEGER AS $$ - DECLARE - min_date TIMESTAMP WITHOUT TIME ZONE; - max_date TIMESTAMP WITHOUT TIME ZONE; - BEGIN - - min_date := (SELECT MIN(date) FROM updates); - max_date := (SELECT MAX(date) + 1 FROM updates); - - DELETE FROM bandwidth_flags WHERE date IN (SELECT date FROM updates); - EXECUTE ' - INSERT INTO bandwidth_flags (date, isexit, isguard, bwadvertised) - SELECT DATE(validafter) AS date, - BOOL_OR(isexit) AS isexit, - BOOL_OR(isguard) AS isguard, - (SUM(LEAST(bandwidthavg, bandwidthobserved)) - / relay_statuses_per_day.count)::BIGINT AS bwadvertised - FROM descriptor RIGHT JOIN statusentry - ON descriptor.descriptor = statusentry.descriptor - JOIN relay_statuses_per_day - ON DATE(validafter) = relay_statuses_per_day.date - WHERE isrunning = TRUE - AND validafter >= ''' || min_date || ''' - AND validafter < ''' || max_date || ''' - AND DATE(validafter) IN (SELECT date FROM updates) - AND relay_statuses_per_day.date >= ''' || min_date || ''' - AND relay_statuses_per_day.date < ''' || max_date || ''' - AND DATE(relay_statuses_per_day.date) IN - (SELECT date FROM updates) - GROUP BY DATE(validafter), isexit, isguard, relay_statuses_per_day.count'; - RETURN 1; - END; -$$ LANGUAGE plpgsql; - CREATE OR REPLACE FUNCTION refresh_bwhist_flags() RETURNS INTEGER AS $$ DECLARE min_date TIMESTAMP WITHOUT TIME ZONE; @@ -391,18 +287,12 @@ CREATE OR REPLACE FUNCTION refresh_all() RETURNS INTEGER AS $$ DELETE FROM updates; RAISE NOTICE '% Copying scheduled dates.', timeofday(); INSERT INTO updates SELECT * FROM scheduled_updates; - RAISE NOTICE '% Refreshing relay statuses per day.', timeofday(); - PERFORM refresh_relay_statuses_per_day(); - RAISE NOTICE '% Refreshing total relay bandwidth.', timeofday(); - PERFORM refresh_bandwidth_flags(); RAISE NOTICE '% Refreshing bandwidth history.', timeofday(); PERFORM refresh_bwhist_flags(); RAISE NOTICE '% Refreshing user statistics.', timeofday(); PERFORM refresh_user_stats(); RAISE NOTICE '% Deleting processed dates.', timeofday(); DELETE FROM scheduled_updates WHERE id IN (SELECT id FROM updates); - RAISE NOTICE '% Deleting old descriptors.', timeofday(); - PERFORM delete_old_descriptor(); RAISE NOTICE '% Deleting old bandwidth histories.', timeofday(); PERFORM delete_old_bwhist(); RAISE NOTICE '% Deleting old status entries.', timeofday(); @@ -414,23 +304,14 @@ $$ LANGUAGE plpgsql;
-- View for exporting bandwidth statistics. CREATE VIEW stats_bandwidth AS - (SELECT COALESCE(bandwidth_flags.date, bwhist_flags.date) AS date, - COALESCE(bandwidth_flags.isexit, bwhist_flags.isexit) AS isexit, - COALESCE(bandwidth_flags.isguard, bwhist_flags.isguard) AS isguard, - bandwidth_flags.bwadvertised AS advbw, - CASE WHEN bwhist_flags.read IS NOT NULL - THEN bwhist_flags.read / 86400 END AS bwread, - CASE WHEN bwhist_flags.written IS NOT NULL - THEN bwhist_flags.written / 86400 END AS bwwrite, + (SELECT date, isexit, isguard, + read / 86400 AS bwread, + written / 86400 AS bwwrite, NULL AS dirread, NULL AS dirwrite - FROM bandwidth_flags FULL OUTER JOIN bwhist_flags - ON bandwidth_flags.date = bwhist_flags.date - AND bandwidth_flags.isexit = bwhist_flags.isexit - AND bandwidth_flags.isguard = bwhist_flags.isguard - WHERE COALESCE(bandwidth_flags.date, bwhist_flags.date) < - current_date - 2) + FROM bwhist_flags + WHERE date < current_date - 2) UNION ALL - (SELECT date, NULL AS isexit, NULL AS isguard, NULL AS advbw, + (SELECT date, NULL AS isexit, NULL AS isguard, NULL AS bwread, NULL AS bwwrite, FLOOR(CAST(dr AS NUMERIC) / CAST(86400 AS NUMERIC)) AS dirread, FLOOR(CAST(dw AS NUMERIC) / CAST(86400 AS NUMERIC)) AS dirwrite
tor-commits@lists.torproject.org