commit 54976512ff47cc3b8a2c6b5ab2cecc98b1ba8088 Author: Karsten Loesing karsten.loesing@gmx.net Date: Mon Oct 29 19:41:12 2018 +0100
Extend ipv6servers to replace servers.csv.
This change extends the ipv6servers module to generate all relevant data about servers (relays and bridges) that the legacy module currently generates.
There are several reasons why this is useful:
- This change is a step towards implementing all statistics in Java, without needing to call psql or other external tools from within Ant. In fact, it's a step towards getting rid of Ant for executing modules.
- The ipv6servers module already supports statistics other than absolute server counts, namely advertised bandwidths. It's easy to extend statistics to consensus weights and guard/middle/exit probabilities. This prepares future graphs, which can be added quite easily.
- With these new statistics we can finally provide graphs on bridges by version or platform. Or we can extend current graphs to display both relays and bridges, if we want to avoid adding more graphs.
This commit does not yet remove any code from the legacy module. That will be the next logical step. It will even be fun.
Implements #28116.
---
 CHANGELOG.md                                       |   2 +
 src/main/R/rserver/graphs.R                        |  45 +--
 .../metrics/stats/ipv6servers/Configuration.java   |   2 +-
 .../metrics/stats/ipv6servers/Database.java        | 416 ++++++++++++++++++---
 .../stats/ipv6servers/Ipv6NetworkStatus.java       |  48 ++-
 .../stats/ipv6servers/Ipv6ServerDescriptor.java    |   8 +
 .../torproject/metrics/stats/ipv6servers/Main.java |  24 +-
 .../metrics/stats/ipv6servers/OutputLine.java      |  72 ----
 .../metrics/stats/ipv6servers/Parser.java          | 158 ++++++--
 .../metrics/stats/ipv6servers/Writer.java          |  14 +-
 src/main/sql/ipv6servers/init-ipv6servers.sql      | 410 ++++++++++++++++++--
 11 files changed, 972 insertions(+), 227 deletions(-)
diff --git a/CHANGELOG.md b/CHANGELOG.md index f604ff9..028df01 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,6 +3,8 @@ * Medium changes - Start downloading and processing votes. - Add Apache Commons Math 3.6.1 as dependency. + - Extend ipv6servers module to generate servers part of legacy + module.
# Changes in version 1.2.0 - 2018-08-25 diff --git a/src/main/R/rserver/graphs.R b/src/main/R/rserver/graphs.R index b021791..9dc8c2d 100644 --- a/src/main/R/rserver/graphs.R +++ b/src/main/R/rserver/graphs.R @@ -349,16 +349,10 @@ robust_call <- function(wrappee, filename) { }
prepare_networksize <- function(start_p, end_p) { - read.csv(paste(stats_dir, "servers.csv", sep = ""), + read.csv(paste(stats_dir, "networksize.csv", sep = ""), colClasses = c("date" = "Date")) %>% filter(if (!is.null(start_p)) date >= as.Date(start_p) else TRUE) %>% - filter(if (!is.null(end_p)) date <= as.Date(end_p) else TRUE) %>% - filter(flag == "") %>% - filter(country == "") %>% - filter(version == "") %>% - filter(platform == "") %>% - filter(ec2bridge == "") %>% - select(date, relays, bridges) + filter(if (!is.null(end_p)) date <= as.Date(end_p) else TRUE) }
plot_networksize <- function(start_p, end_p, path_p) { @@ -384,16 +378,10 @@ write_networksize <- function(start_p = NULL, end_p = NULL, path_p) { }
prepare_versions <- function(start_p, end_p) { - read.csv(paste(stats_dir, "servers.csv", sep = ""), + read.csv(paste(stats_dir, "versions.csv", sep = ""), colClasses = c("date" = "Date")) %>% filter(if (!is.null(start_p)) date >= as.Date(start_p) else TRUE) %>% - filter(if (!is.null(end_p)) date <= as.Date(end_p) else TRUE) %>% - filter(flag == "") %>% - filter(country == "") %>% - filter(version != "") %>% - filter(platform == "") %>% - filter(ec2bridge == "") %>% - select(date, version, relays) + filter(if (!is.null(end_p)) date <= as.Date(end_p) else TRUE) }
plot_versions <- function(start_p, end_p, path_p) { @@ -428,18 +416,10 @@ write_versions <- function(start_p = NULL, end_p = NULL, path_p) { }
prepare_platforms <- function(start_p, end_p) { - read.csv(paste(stats_dir, "servers.csv", sep = ""), + read.csv(paste(stats_dir, "platforms.csv", sep = ""), colClasses = c("date" = "Date")) %>% filter(if (!is.null(start_p)) date >= as.Date(start_p) else TRUE) %>% - filter(if (!is.null(end_p)) date <= as.Date(end_p) else TRUE) %>% - filter(flag == "") %>% - filter(country == "") %>% - filter(version == "") %>% - filter(platform != "") %>% - filter(ec2bridge == "") %>% - select(date, platform, relays) %>% - mutate(platform = ifelse(platform == "Darwin", "macOS", - as.character(platform))) + filter(if (!is.null(end_p)) date <= as.Date(end_p) else TRUE) }
plot_platforms <- function(start_p, end_p, path_p) { @@ -451,7 +431,8 @@ plot_platforms <- function(start_p, end_p, path_p) { scale_y_continuous(name = "", labels = formatter, limits = c(0, NA)) + scale_colour_manual(name = "Platform", breaks = c("Linux", "macOS", "BSD", "Windows", "Other"), - values = c("#E69F00", "#56B4E9", "#009E73", "#0072B2", "#333333")) + + values = c("Linux" = "#56B4E9", "macOS" = "#333333", "BSD" = "#E69F00", + "Windows" = "#0072B2", "Other" = "#009E73")) + ggtitle("Relay platforms") + labs(caption = copyright_notice) ggsave(filename = path_p, width = 8, height = 5, dpi = 150) @@ -576,17 +557,11 @@ write_dirbytes <- function(start_p = NULL, end_p = NULL, path_p) { }
prepare_relayflags <- function(start_p, end_p, flag_p) { - read.csv(paste(stats_dir, "servers.csv", sep = ""), + read.csv(paste(stats_dir, "relayflags.csv", sep = ""), colClasses = c("date" = "Date")) %>% filter(if (!is.null(start_p)) date >= as.Date(start_p) else TRUE) %>% filter(if (!is.null(end_p)) date <= as.Date(end_p) else TRUE) %>% - filter(country == "") %>% - filter(version == "") %>% - filter(platform == "") %>% - filter(ec2bridge == "") %>% - mutate(flag = ifelse(flag == "", "Running", as.character(flag))) %>% - filter(if (!is.null(flag_p)) flag %in% flag_p else TRUE) %>% - select(date, flag, relays) + filter(if (!is.null(flag_p)) flag %in% flag_p else TRUE) }
plot_relayflags <- function(start_p, end_p, flag_p, path_p) { diff --git a/src/main/java/org/torproject/metrics/stats/ipv6servers/Configuration.java b/src/main/java/org/torproject/metrics/stats/ipv6servers/Configuration.java index c940882..d849cb6 100644 --- a/src/main/java/org/torproject/metrics/stats/ipv6servers/Configuration.java +++ b/src/main/java/org/torproject/metrics/stats/ipv6servers/Configuration.java @@ -13,6 +13,6 @@ class Configuration { static String history = System.getProperty("ipv6servers.history", "status/read-descriptors"); static String output = System.getProperty("ipv6servers.output", - "stats/ipv6servers.csv"); + "stats/"); }
diff --git a/src/main/java/org/torproject/metrics/stats/ipv6servers/Database.java b/src/main/java/org/torproject/metrics/stats/ipv6servers/Database.java index 67afa92..c3a1fec 100644 --- a/src/main/java/org/torproject/metrics/stats/ipv6servers/Database.java +++ b/src/main/java/org/torproject/metrics/stats/ipv6servers/Database.java @@ -10,12 +10,18 @@ import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; import java.sql.Timestamp; +import java.sql.Types; import java.time.ZoneId; import java.time.ZonedDateTime; import java.util.ArrayList; import java.util.Calendar; +import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Locale; +import java.util.Map; +import java.util.Set; +import java.util.SortedSet; import java.util.TimeZone;
/** Database wrapper to connect to the database, insert data, run the stored @@ -28,6 +34,37 @@ class Database implements AutoCloseable { /** Connection object for all interactions with the database. */ private Connection connection;
+ /** Cache for the mapping of platform strings to identifiers in the database. + * Initialized at startup and kept in sync with the database. */ + private Map<String, Integer> platformsCache = new HashMap<>(); + + /** Cache for the mapping of version strings to identifiers in the database. + * Initialized at startup and kept in sync with the database. */ + private Map<String, Integer> versionsCache = new HashMap<>(); + + /** Cache of version strings that have been recommended as server version at + * least once in a consensus. Initialized at startup and kept in sync with the + * database. */ + private Set<String> recommendedVersions = new HashSet<>(); + + /** Cache for the mapping of relay flags to identifiers in the database. + * Initialized at startup and kept in sync with the database. */ + private Map<String, Integer> flagsCache = new HashMap<>(); + + /** Prepared statement for inserting a platform string into the platforms + * table. */ + private PreparedStatement psPlatformsInsert; + + /** Prepared statement for inserting a version string into the versions + * table. */ + private PreparedStatement psVersionsInsert; + + /** Prepared statement for updating a version in the versions table. */ + private PreparedStatement psVersionsUpdate; + + /** Prepared statement for inserting a flag into the flags table. */ + private PreparedStatement psFlagsInsert; + /** Prepared statement for finding out whether a given server descriptor is * already contained in the server_descriptors table. */ private PreparedStatement psServerDescriptorsSelect; @@ -54,6 +91,7 @@ class Database implements AutoCloseable { this.jdbcString = jdbcString; this.connect(); this.prepareStatements(); + this.initializeCaches(); }
private void connect() throws SQLException { @@ -62,24 +100,69 @@ class Database implements AutoCloseable { }
private void prepareStatements() throws SQLException { + this.psPlatformsInsert = this.connection.prepareStatement( + "INSERT INTO platforms (platform_string) VALUES (?)", + Statement.RETURN_GENERATED_KEYS); + this.psVersionsInsert = this.connection.prepareStatement( + "INSERT INTO versions (version_string, recommended) VALUES (?, ?)", + Statement.RETURN_GENERATED_KEYS); + this.psVersionsUpdate = this.connection.prepareStatement( + "UPDATE versions SET recommended = TRUE WHERE version_id = ?"); + this.psFlagsInsert = this.connection.prepareStatement( + "INSERT INTO flags (flag_id, flag_string) VALUES (?, ?)"); this.psServerDescriptorsSelect = this.connection.prepareStatement( "SELECT EXISTS (SELECT 1 FROM server_descriptors " - + "WHERE descriptor_digest_sha1 = decode(?, 'hex'))"); + + "WHERE descriptor_digest_sha1 = decode(?, 'hex'))"); this.psServerDescriptorsInsert = this.connection.prepareStatement( - "INSERT INTO server_descriptors (descriptor_digest_sha1, " - + "advertised_bandwidth_bytes, announced_ipv6, exiting_ipv6_relay) " - + "VALUES (decode(?, 'hex'), ?, ?, ?)"); + "INSERT INTO server_descriptors (descriptor_digest_sha1, platform_id, " + + "version_id, advertised_bandwidth_bytes, announced_ipv6, " + + "exiting_ipv6_relay) VALUES (decode(?, 'hex'), ?, ?, ?, ?, ?)"); this.psStatusesSelect = this.connection.prepareStatement( "SELECT EXISTS (SELECT 1 FROM statuses " - + "WHERE server = CAST(? AS server_enum) AND valid_after = ?)"); + + "WHERE server = CAST(? AS server_enum) AND valid_after = ?)"); this.psStatusesInsert = this.connection.prepareStatement( - "INSERT INTO statuses (server, valid_after, running_count) " - + "VALUES (CAST(? AS server_enum), ?, ?)", + "INSERT INTO statuses (server, valid_after, running_count, " + + "consensus_weight_sum, guard_weight_sum, middle_weight_sum, " + + "exit_weight_sum) VALUES (CAST(? 
AS server_enum), ?, ?, ?, ?, ?, ?)", Statement.RETURN_GENERATED_KEYS); this.psStatusEntriesInsert = this.connection.prepareStatement( "INSERT INTO status_entries (status_id, descriptor_digest_sha1, " - + "guard_relay, exit_relay, reachable_ipv6_relay) " - + "VALUES (?, decode(?, 'hex'), ?, ?, ?)"); + + "flags, reachable_ipv6_relay, consensus_weight, guard_weight, " + + "middle_weight, exit_weight) " + + "VALUES (?, decode(?, 'hex'), ?, ?, ?, ?, ?, ?)"); + } + + private void initializeCaches() throws SQLException { + Statement st = this.connection.createStatement(); + String queryString = "SELECT platform_id, platform_string FROM platforms"; + try (ResultSet rs = st.executeQuery(queryString)) { + while (rs.next()) { + this.platformsCache.put( + rs.getString("platform_string"), rs.getInt("platform_id")); + } + } + st = this.connection.createStatement(); + queryString = "SELECT version_id, version_string, recommended " + + "FROM versions"; + try (ResultSet rs = st.executeQuery(queryString)) { + while (rs.next()) { + String version = rs.getString("version_string"); + int versionId = rs.getInt("version_id"); + boolean recommended = rs.getBoolean("recommended"); + this.versionsCache.put(version, versionId); + if (recommended) { + this.recommendedVersions.add(version); + } + } + } + st = this.connection.createStatement(); + queryString = "SELECT flag_id, flag_string FROM flags"; + try (ResultSet rs = st.executeQuery(queryString)) { + while (rs.next()) { + this.flagsCache.put( + rs.getString("flag_string"), rs.getInt("flag_id")); + } + } }
/** Insert a server descriptor into the server_descriptors table. */ @@ -98,17 +181,87 @@ class Database implements AutoCloseable { } this.psServerDescriptorsInsert.clearParameters(); this.psServerDescriptorsInsert.setString(1, serverDescriptor.digest); - this.psServerDescriptorsInsert.setInt(2, + if (null != serverDescriptor.platform) { + this.psServerDescriptorsInsert.setInt(2, + this.selectOrInsertPlatform(serverDescriptor.platform)); + } else { + this.psServerDescriptorsInsert.setNull(2, Types.INTEGER); + } + if (null != serverDescriptor.version) { + this.psServerDescriptorsInsert.setInt(3, + this.selectOrInsertVersion(serverDescriptor.version, false)); + } else { + this.psServerDescriptorsInsert.setNull(3, Types.INTEGER); + } + this.psServerDescriptorsInsert.setInt(4, serverDescriptor.advertisedBandwidth); - this.psServerDescriptorsInsert.setBoolean(3, serverDescriptor.announced); - this.psServerDescriptorsInsert.setBoolean(4, serverDescriptor.exiting); + this.psServerDescriptorsInsert.setBoolean(5, serverDescriptor.announced); + this.psServerDescriptorsInsert.setBoolean(6, serverDescriptor.exiting); this.psServerDescriptorsInsert.execute(); }
+ /** Return the platform identifier for a given platform string, either from + * our local cache, from a database query, or after inserting it into the + * platforms table. */ + int selectOrInsertPlatform(String platform) throws SQLException { + if (!this.platformsCache.containsKey(platform)) { + int platformId = -1; + this.psPlatformsInsert.clearParameters(); + this.psPlatformsInsert.setString(1, platform); + this.psPlatformsInsert.execute(); + try (ResultSet rs = this.psPlatformsInsert.getGeneratedKeys()) { + if (rs.next()) { + platformId = rs.getInt(1); + } + } + if (platformId < 0) { + throw new SQLException("Could not retrieve auto-generated key for " + + "new platforms entry."); + } + this.platformsCache.put(platform, platformId); + } + return this.platformsCache.get(platform); + } + + /** Return the version identifier for a given version string, either from our + * local cache, from a database query, or after inserting it into the versions + * table. */ + int selectOrInsertVersion(String version, boolean recommended) + throws SQLException { + if (!this.versionsCache.containsKey(version)) { + int versionId = -1; + this.psVersionsInsert.clearParameters(); + this.psVersionsInsert.setString(1, version); + this.psVersionsInsert.setBoolean(2, recommended); + this.psVersionsInsert.execute(); + try (ResultSet rs = this.psVersionsInsert.getGeneratedKeys()) { + if (rs.next()) { + versionId = rs.getInt(1); + } + } + if (versionId < 0) { + throw new SQLException("Could not retrieve auto-generated key for " + + "new versions entry."); + } + this.versionsCache.put(version, versionId); + if (recommended) { + this.recommendedVersions.add(version); + } + } + if (recommended && !this.recommendedVersions.contains(version)) { + int versionId = this.versionsCache.get(version); + this.psVersionsUpdate.clearParameters(); + this.psVersionsUpdate.setInt(1, versionId); + this.psVersionsUpdate.execute(); + this.recommendedVersions.add(version); + } + return 
this.versionsCache.get(version); + } + /** Insert a status and all contained entries into the statuses and * status_entries table. */ - void insertStatus(Ipv6NetworkStatus networkStatus) - throws SQLException { + void insertStatus(Ipv6NetworkStatus networkStatus) throws SQLException { + this.insertRecommendedVersions(networkStatus.recommendedVersions); this.psStatusesSelect.clearParameters(); this.psStatusesSelect.setString(1, networkStatus.isRelay ? "relay" : "bridge"); @@ -133,6 +286,26 @@ class Database implements AutoCloseable { Timestamp.from(ZonedDateTime.of(networkStatus.timestamp, ZoneId.of("UTC")).toInstant()), calendar); this.psStatusesInsert.setInt(3, networkStatus.running); + if (null != networkStatus.totalConsensusWeight) { + this.psStatusesInsert.setFloat(4, networkStatus.totalConsensusWeight); + } else { + this.psStatusesInsert.setNull(4, Types.FLOAT); + } + if (null != networkStatus.totalGuardWeight) { + this.psStatusesInsert.setFloat(5, networkStatus.totalGuardWeight); + } else { + this.psStatusesInsert.setNull(5, Types.FLOAT); + } + if (null != networkStatus.totalMiddleWeight) { + this.psStatusesInsert.setFloat(6, networkStatus.totalMiddleWeight); + } else { + this.psStatusesInsert.setNull(6, Types.FLOAT); + } + if (null != networkStatus.totalExitWeight) { + this.psStatusesInsert.setFloat(7, networkStatus.totalExitWeight); + } else { + this.psStatusesInsert.setNull(7, Types.FLOAT); + } this.psStatusesInsert.execute(); try (ResultSet rs = this.psStatusesInsert.getGeneratedKeys()) { if (rs.next()) { @@ -144,22 +317,81 @@ class Database implements AutoCloseable { + "statuses entry."); } for (Ipv6NetworkStatus.Entry entry : networkStatus.entries) { - this.psStatusEntriesInsert.clearParameters(); - this.psStatusEntriesInsert.setInt(1, statusId); - this.psStatusEntriesInsert.setString(2, entry.digest); - this.psStatusEntriesInsert.setBoolean(3, entry.guard); - this.psStatusEntriesInsert.setBoolean(4, entry.exit); - 
this.psStatusEntriesInsert.setBoolean(5, entry.reachable); - this.psStatusEntriesInsert.addBatch(); + this.insertStatusEntry(statusId, entry); } this.psStatusEntriesInsert.executeBatch(); }
+ + void insertRecommendedVersions(List<String> versions) + throws SQLException { + if (null != versions + && !this.versionsCache.keySet().containsAll(versions)) { + for (String version : versions) { + this.selectOrInsertVersion(version, true); + } + } + } + + /** Insert a status entry into the status_entries table. */ + void insertStatusEntry(int statusId, Ipv6NetworkStatus.Entry entry) + throws SQLException { + this.insertFlags(entry.flags); + int flags = 0; + if (null != entry.flags) { + for (String flag : entry.flags) { + int flagId = this.flagsCache.get(flag); + flags |= 1 << flagId; + } + } + this.psStatusEntriesInsert.clearParameters(); + this.psStatusEntriesInsert.setInt(1, statusId); + this.psStatusEntriesInsert.setString(2, entry.digest); + this.psStatusEntriesInsert.setInt(3, flags); + this.psStatusEntriesInsert.setBoolean(4, entry.reachable); + if (null != entry.consensusWeight) { + this.psStatusEntriesInsert.setFloat(5, entry.consensusWeight); + } else { + this.psStatusEntriesInsert.setNull(5, Types.FLOAT); + } + if (null != entry.guardWeight) { + this.psStatusEntriesInsert.setFloat(6, entry.guardWeight); + } else { + this.psStatusEntriesInsert.setNull(6, Types.FLOAT); + } + if (null != entry.middleWeight) { + this.psStatusEntriesInsert.setFloat(7, entry.middleWeight); + } else { + this.psStatusEntriesInsert.setNull(7, Types.FLOAT); + } + if (null != entry.exitWeight) { + this.psStatusEntriesInsert.setFloat(8, entry.exitWeight); + } else { + this.psStatusEntriesInsert.setNull(8, Types.FLOAT); + } + this.psStatusEntriesInsert.addBatch(); + } + + void insertFlags(SortedSet<String> flags) throws SQLException { + if (null != flags && !this.flagsCache.keySet().containsAll(flags)) { + for (String flag : flags) { + if (!this.flagsCache.containsKey(flag)) { + int flagId = this.flagsCache.size(); + this.psFlagsInsert.clearParameters(); + this.psFlagsInsert.setInt(1, flagId); + this.psFlagsInsert.setString(2, flag); + this.psFlagsInsert.execute(); + 
this.flagsCache.put(flag, flagId); + } + } + } + } + /** Call the aggregate() function to aggregate rows from the status_entries - * and server_descriptors tables into the aggregated table. */ + * and server_descriptors tables into the aggregated_* tables. */ void aggregate() throws SQLException { Statement st = this.connection.createStatement(); - st.executeQuery("SELECT aggregate_ipv6()"); + st.executeQuery("SELECT aggregate()"); }
/** Roll back any changes made in this execution. */ @@ -172,41 +404,131 @@ class Database implements AutoCloseable { this.connection.commit(); }
- /** Query the servers_ipv6 view to obtain aggregated statistics. */ - Iterable<OutputLine> queryServersIpv6() throws SQLException { - List<OutputLine> statistics = new ArrayList<>(); + /** Query the servers_ipv6 view. */ + List<String[]> queryServersIpv6() throws SQLException { + List<String[]> statistics = new ArrayList<>(); + String columns = "valid_after_date, server, guard_relay, exit_relay, " + + "announced_ipv6, exiting_ipv6_relay, reachable_ipv6_relay, " + + "server_count_sum_avg, advertised_bandwidth_bytes_sum_avg"; + statistics.add(columns.split(", ")); Statement st = this.connection.createStatement(); Calendar calendar = Calendar.getInstance(TimeZone.getTimeZone("UTC"), Locale.US); - String queryString = "SELECT " + OutputLine.columnHeadersDelimitedBy(", ") - + " FROM ipv6servers"; + String queryString = "SELECT " + columns + " FROM ipv6servers"; try (ResultSet rs = st.executeQuery(queryString)) { while (rs.next()) { - OutputLine outputLine = new OutputLine(); - outputLine.date = rs.getDate(OutputLine.Column.VALID_AFTER_DATE.name(), - calendar).toLocalDate(); - outputLine.server = rs.getString(OutputLine.Column.SERVER.name()); - outputLine.guard = rs.getString(OutputLine.Column.GUARD_RELAY.name()); - outputLine.exit = rs.getString(OutputLine.Column.EXIT_RELAY.name()); - outputLine.announced = rs.getString( - OutputLine.Column.ANNOUNCED_IPV6.name()); - outputLine.exiting = rs.getString( - OutputLine.Column.EXITING_IPV6_RELAY.name()); - outputLine.reachable = rs.getString( - OutputLine.Column.REACHABLE_IPV6_RELAY.name()); - outputLine.count = rs.getLong( - OutputLine.Column.SERVER_COUNT_SUM_AVG.name()); - outputLine.advertisedBandwidth = rs.getLong( - OutputLine.Column.ADVERTISED_BANDWIDTH_BYTES_SUM_AVG.name()); - if (rs.wasNull()) { - outputLine.advertisedBandwidth = null; - } + String[] outputLine = new String[9]; + outputLine[0] = rs.getDate("valid_after_date", calendar) + .toLocalDate().toString(); + outputLine[1] = rs.getString("server"); + 
outputLine[2] = rs.getString("guard_relay"); + outputLine[3] = rs.getString("exit_relay"); + outputLine[4] = rs.getString("announced_ipv6"); + outputLine[5] = rs.getString("exiting_ipv6_relay"); + outputLine[6] = rs.getString("reachable_ipv6_relay"); + outputLine[7] = getLongFromResultSet(rs, "server_count_sum_avg"); + outputLine[8] = getLongFromResultSet(rs, + "advertised_bandwidth_bytes_sum_avg"); + statistics.add(outputLine); + } + } + return statistics; + } + + /** Query the servers_networksize view. */ + List<String[]> queryNetworksize() throws SQLException { + List<String[]> statistics = new ArrayList<>(); + String columns = "date, relays, bridges"; + statistics.add(columns.split(", ")); + Statement st = this.connection.createStatement(); + Calendar calendar = Calendar.getInstance(TimeZone.getTimeZone("UTC"), + Locale.US); + String queryString = "SELECT " + columns + " FROM servers_networksize"; + try (ResultSet rs = st.executeQuery(queryString)) { + while (rs.next()) { + String[] outputLine = new String[3]; + outputLine[0] = rs.getDate("date", calendar).toLocalDate().toString(); + outputLine[1] = getLongFromResultSet(rs, "relays"); + outputLine[2] = getLongFromResultSet(rs, "bridges"); statistics.add(outputLine); } } return statistics; }
+ /** Query the servers_relayflags view. */ + List<String[]> queryRelayflags() throws SQLException { + List<String[]> statistics = new ArrayList<>(); + String columns = "date, flag, relays"; + statistics.add(columns.split(", ")); + Statement st = this.connection.createStatement(); + Calendar calendar = Calendar.getInstance(TimeZone.getTimeZone("UTC"), + Locale.US); + String queryString = "SELECT " + columns + " FROM servers_relayflags"; + try (ResultSet rs = st.executeQuery(queryString)) { + while (rs.next()) { + String[] outputLine = new String[3]; + outputLine[0] = rs.getDate("date", calendar).toLocalDate().toString(); + outputLine[1] = rs.getString("flag"); + outputLine[2] = getLongFromResultSet(rs, "relays"); + statistics.add(outputLine); + } + } + return statistics; + } + + /** Query the servers_versions view. */ + List<String[]> queryVersions() throws SQLException { + List<String[]> statistics = new ArrayList<>(); + String columns = "date, version, relays"; + statistics.add(columns.split(", ")); + Statement st = this.connection.createStatement(); + Calendar calendar = Calendar.getInstance(TimeZone.getTimeZone("UTC"), + Locale.US); + String queryString = "SELECT " + columns + " FROM servers_versions"; + try (ResultSet rs = st.executeQuery(queryString)) { + while (rs.next()) { + String[] outputLine = new String[3]; + outputLine[0] = rs.getDate("date", calendar).toLocalDate().toString(); + outputLine[1] = rs.getString("version"); + outputLine[2] = getLongFromResultSet(rs, "relays"); + statistics.add(outputLine); + } + } + return statistics; + } + + /** Query the servers_platforms view. 
*/ + List<String[]> queryPlatforms() throws SQLException { + List<String[]> statistics = new ArrayList<>(); + String columns = "date, platform, relays"; + statistics.add(columns.split(", ")); + Statement st = this.connection.createStatement(); + Calendar calendar = Calendar.getInstance(TimeZone.getTimeZone("UTC"), + Locale.US); + String queryString = "SELECT " + columns + " FROM servers_platforms"; + try (ResultSet rs = st.executeQuery(queryString)) { + while (rs.next()) { + String[] outputLine = new String[3]; + outputLine[0] = rs.getDate("date", calendar).toLocalDate().toString(); + outputLine[1] = rs.getString("platform"); + outputLine[2] = getLongFromResultSet(rs, "relays"); + statistics.add(outputLine); + } + } + return statistics; + } + + /** Retrieve the <code>long</code> value of the designated column in the + * current row of the given <code>ResultSet</code> object and format it as a + * <code>String</code> object, or return <code>null</code> if the retrieved + * value was <code>NULL</code>. */ + private static String getLongFromResultSet(ResultSet rs, String columnLabel) + throws SQLException { + long result = rs.getLong(columnLabel); + return rs.wasNull() ? null : String.valueOf(result); + } + /** Release database connection. */ public void close() throws SQLException { this.connection.close(); diff --git a/src/main/java/org/torproject/metrics/stats/ipv6servers/Ipv6NetworkStatus.java b/src/main/java/org/torproject/metrics/stats/ipv6servers/Ipv6NetworkStatus.java index 611ca4e..526adc7 100644 --- a/src/main/java/org/torproject/metrics/stats/ipv6servers/Ipv6NetworkStatus.java +++ b/src/main/java/org/torproject/metrics/stats/ipv6servers/Ipv6NetworkStatus.java @@ -6,6 +6,7 @@ package org.torproject.metrics.stats.ipv6servers; import java.time.LocalDateTime; import java.util.ArrayList; import java.util.List; +import java.util.SortedSet;
/** Data object holding all relevant parts parsed from a (relay or bridge) * network status. */ @@ -19,9 +20,32 @@ class Ipv6NetworkStatus { * case of bridge network status. */ LocalDateTime timestamp;
+ /** List of recommended (server) versions, or null in case of bridge network + * status. */ + List<String> recommendedVersions; + /** Number of relays or bridges with the Running flag. */ int running = 0;
+ /** Total consensus weight of all status entries, or null if the status does + * not contain entries with consensus weights. */ + Float totalConsensusWeight; + + /** Total guard-weighted consensus weight of all status entries, or null if + * the status either does not contain entries with consensus weight or no Wxx + * values. */ + Float totalGuardWeight; + + /** Total middle-weighted consensus weight of all status entries, or null if + * the status either does not contain entries with consensus weight or no Wxx + * values. */ + Float totalMiddleWeight; + + /** Total exit-weighted consensus weight of all status entries, or null if + * the status either does not contain entries with consensus weight or no Wxx + * values. */ + Float totalExitWeight; + /** Contained status entries. */ List<Entry> entries = new ArrayList<>();
@@ -31,17 +55,29 @@ class Ipv6NetworkStatus { /** Hex-encoded SHA-1 server descriptor digest. */ String digest;
- /** Whether this relay has the Guard flag; false for bridges. */ - boolean guard; - - /** Whether this relay has the Exit flag (and not the BadExit flag at the - * same time); false for bridges. */ - boolean exit; + /** Relay flags assigned to this relay or bridge. */ + SortedSet<String> flags;
/** Whether the directory authorities include an IPv6 address in this * entry's "a" line, confirming the relay's reachability via IPv6; false for * bridges. */ boolean reachable; + + /** Consensus weight of this entry, or null if the entry does not have a "w" + * line. */ + Float consensusWeight; + + /** Guard-weighted consensus weight of this entry, or null if either the + * entry does not have a "w" line or the consensus has no Wxx values. */ + Float guardWeight; + + /** Middle-weighted consensus weight of this entry, or null if either the + * entry does not have a "w" line or the consensus has no Wxx values. */ + Float middleWeight; + + /** Exit-weighted consensus weight of this entry, or null if either the + * entry does not have a "w" line or the consensus has no Wxx values. */ + Float exitWeight; } }
diff --git a/src/main/java/org/torproject/metrics/stats/ipv6servers/Ipv6ServerDescriptor.java b/src/main/java/org/torproject/metrics/stats/ipv6servers/Ipv6ServerDescriptor.java index cb725b0..387024b 100644 --- a/src/main/java/org/torproject/metrics/stats/ipv6servers/Ipv6ServerDescriptor.java +++ b/src/main/java/org/torproject/metrics/stats/ipv6servers/Ipv6ServerDescriptor.java @@ -10,6 +10,14 @@ class Ipv6ServerDescriptor { /** Hex-encoded SHA-1 server descriptor digest. */ String digest;
+ /** Platform obtained from the platform line without the Tor software + * version. */ + String platform; + + /** Tor software version obtained from the platform line without the + * platform. */ + String version; + /** Advertised bandwidth bytes of this relay as the minimum of bandwidth rate, * bandwidth burst, and observed bandwidth (if reported); 0 for bridges. */ int advertisedBandwidth; diff --git a/src/main/java/org/torproject/metrics/stats/ipv6servers/Main.java b/src/main/java/org/torproject/metrics/stats/ipv6servers/Main.java index 5a20b58..a91a74f 100644 --- a/src/main/java/org/torproject/metrics/stats/ipv6servers/Main.java +++ b/src/main/java/org/torproject/metrics/stats/ipv6servers/Main.java @@ -20,7 +20,7 @@ import java.util.Arrays;
/** Main class of the ipv6servers module that imports relevant parts from server * descriptors and network statuses into a database, and exports aggregate - * statistics on IPv6 support to a CSV file. */ + * statistics to CSV files. */ public class Main {
private static Logger log = LoggerFactory.getLogger(Main.class); @@ -61,8 +61,13 @@ public class Main { } else if (descriptor instanceof BridgeNetworkStatus) { database.insertStatus(parser.parseBridgeNetworkStatus( (BridgeNetworkStatus) descriptor)); + } else if (null != descriptor.getRawDescriptorBytes()) { + log.debug("Skipping unknown descriptor of type {} starting with " + + "'{}'.", descriptor.getClass(), + new String(descriptor.getRawDescriptorBytes(), 0, + Math.min(descriptor.getRawDescriptorLength(), 100))); } else { - log.debug("Skipping unknown descriptor of type {}.", + log.debug("Skipping unknown, empty descriptor of type {}.", descriptor.getClass()); } } @@ -81,11 +86,16 @@ public class Main { reader.saveHistoryFile(historyFile);
log.info("Querying aggregated statistics from the database."); - Iterable<OutputLine> output = database.queryServersIpv6(); - log.info("Writing aggregated statistics to {}.", Configuration.output); - if (null != output) { - new Writer().write(Paths.get(Configuration.output), output); - } + new Writer().write(Paths.get(Configuration.output, "ipv6servers.csv"), + database.queryServersIpv6()); + new Writer().write(Paths.get(Configuration.output, "networksize.csv"), + database.queryNetworksize()); + new Writer().write(Paths.get(Configuration.output, "relayflags.csv"), + database.queryRelayflags()); + new Writer().write(Paths.get(Configuration.output, "versions.csv"), + database.queryVersions()); + new Writer().write(Paths.get(Configuration.output, "platforms.csv"), + database.queryPlatforms());
log.info("Terminating ipv6servers module."); } catch (SQLException sqle) { diff --git a/src/main/java/org/torproject/metrics/stats/ipv6servers/OutputLine.java b/src/main/java/org/torproject/metrics/stats/ipv6servers/OutputLine.java deleted file mode 100644 index a5acfa0..0000000 --- a/src/main/java/org/torproject/metrics/stats/ipv6servers/OutputLine.java +++ /dev/null @@ -1,72 +0,0 @@ -/* Copyright 2017--2018 The Tor Project - * See LICENSE for licensing information */ - -package org.torproject.metrics.stats.ipv6servers; - -import java.time.LocalDate; -import java.util.Arrays; -import java.util.stream.Collectors; - -/** Data object holding all parts of an output line. */ -class OutputLine { - - /** Column names used in the database and in the first line of the output - * file. */ - enum Column { - VALID_AFTER_DATE, SERVER, GUARD_RELAY, EXIT_RELAY, ANNOUNCED_IPV6, - EXITING_IPV6_RELAY, REACHABLE_IPV6_RELAY, SERVER_COUNT_SUM_AVG, - ADVERTISED_BANDWIDTH_BYTES_SUM_AVG - } - - /** Column headers joined together with the given delimiter. */ - static String columnHeadersDelimitedBy(String delimiter) { - return Arrays.stream(Column.values()).map(c -> c.toString().toLowerCase()) - .collect(Collectors.joining(delimiter)); - } - - /** Date. */ - LocalDate date; - - /** Server type, which can be "relay" or "bridge". */ - String server; - - /** Whether relays had the Guard flag ("t") or not ("f"). */ - String guard; - - /** Whether relays had the Exit flag ("t") or not ("f"). */ - String exit; - - /** Whether relays or bridges have announced an IPv6 address in their server - * descriptor ("t") or not ("f"). */ - String announced; - - /** Whether relays have announced a non-reject-all IPv6 exit policy in their - * server descriptor ("t") or not ("f"). */ - String exiting; - - /** Whether the directory authorities have confirmed IPv6 OR reachability by - * including an "a" line for a relay containing an IPv6 address. 
*/ - String reachable; - - /** Number of relays or bridges matching the previous criteria. */ - long count; - - /** Total advertised bandwidth of all relays matching the previous - * criteria. */ - Long advertisedBandwidth; - - /** Format all fields in a single output line for inclusion in a CSV - * file. */ - @Override - public String toString() { - return String.format("%s,%s,%s,%s,%s,%s,%s,%s,%s", - date, server, emptyNull(guard), emptyNull(exit), emptyNull(announced), - emptyNull(exiting), emptyNull(reachable), emptyNull(count), - emptyNull(advertisedBandwidth)); - } - - private static String emptyNull(Object text) { - return null == text ? "" : text.toString(); - } -} - diff --git a/src/main/java/org/torproject/metrics/stats/ipv6servers/Parser.java b/src/main/java/org/torproject/metrics/stats/ipv6servers/Parser.java index 95c495e..9d8b71a 100644 --- a/src/main/java/org/torproject/metrics/stats/ipv6servers/Parser.java +++ b/src/main/java/org/torproject/metrics/stats/ipv6servers/Parser.java @@ -13,17 +13,31 @@ import org.apache.commons.lang3.StringUtils;
import java.time.Instant; import java.time.ZoneId; +import java.util.Arrays; +import java.util.Map; +import java.util.regex.Matcher; +import java.util.regex.Pattern;
/** Parser that extracts all relevant parts from (relay and bridge) server * descriptors and (relay and bridge) statuses and creates data objects for * them. */ class Parser {
+ private Pattern platformPattern = Pattern.compile("^Tor (.+) on (.+)$"); + /** Parse a (relay or bridge) server descriptor. */ Ipv6ServerDescriptor parseServerDescriptor( ServerDescriptor serverDescriptor) { Ipv6ServerDescriptor parsedDescriptor = new Ipv6ServerDescriptor(); parsedDescriptor.digest = serverDescriptor.getDigestSha1Hex(); + if (null != serverDescriptor.getPlatform()) { + Matcher platformMatcher = platformPattern.matcher( + serverDescriptor.getPlatform()); + if (platformMatcher.matches() && platformMatcher.groupCount() == 2) { + parsedDescriptor.version = platformMatcher.group(1); + parsedDescriptor.platform = platformMatcher.group(2); + } + } for (String orAddress : serverDescriptor.getOrAddresses()) { /* Check whether the additional OR address is an IPv6 address containing * at least two colons as opposed to an IPv4 address and TCP port @@ -52,48 +66,132 @@ class Parser {
Ipv6NetworkStatus parseRelayNetworkStatusConsensus( RelayNetworkStatusConsensus consensus) { - return this.parseStatus(true, consensus.getValidAfterMillis(), - consensus.getStatusEntries().values()); - } - - Ipv6NetworkStatus parseBridgeNetworkStatus(BridgeNetworkStatus status) { - return this.parseStatus(false, status.getPublishedMillis(), - status.getStatusEntries().values()); - } - - private Ipv6NetworkStatus parseStatus(boolean isRelay, long timestampMillis, - Iterable<NetworkStatusEntry> entries) { Ipv6NetworkStatus parsedStatus = new Ipv6NetworkStatus(); - parsedStatus.isRelay = isRelay; - parsedStatus.timestamp = Instant.ofEpochMilli(timestampMillis) + parsedStatus.isRelay = true; + parsedStatus.timestamp = Instant.ofEpochMilli( + consensus.getValidAfterMillis()) .atZone(ZoneId.of("UTC")).toLocalDateTime(); - for (NetworkStatusEntry entry : entries) { - if (!entry.getFlags().contains("Running")) { - continue; + parsedStatus.recommendedVersions = consensus.getRecommendedServerVersions(); + boolean consensusContainsBandwidthWeights = + null != consensus.getBandwidthWeights() + && consensus.getBandwidthWeights().keySet().containsAll(Arrays.asList( + "Wgg", "Wgd", "Wmg", "Wmm", "Wme", "Wmd", "Wee", "Wed")); + float wgg = 0.0f; + float wgd = 0.0f; + float wmg = 0.0f; + float wmm = 0.0f; + float wme = 0.0f; + float wmd = 0.0f; + float wee = 0.0f; + float wed = 0.0f; + if (consensusContainsBandwidthWeights) { + for (Map.Entry<String, Integer> e + : consensus.getBandwidthWeights().entrySet()) { + float weight = e.getValue().floatValue() / 10000.0f; + switch (e.getKey()) { + case "Wgg": + wgg = weight; + break; + case "Wgd": + wgd = weight; + break; + case "Wmg": + wmg = weight; + break; + case "Wmm": + wmm = weight; + break; + case "Wme": + wme = weight; + break; + case "Wmd": + wmd = weight; + break; + case "Wee": + wee = weight; + break; + case "Wed": + wed = weight; + break; + default: + /* Ignore other weights. 
*/ + } } - parsedStatus.running++; } - for (NetworkStatusEntry entry : entries) { + for (NetworkStatusEntry entry : consensus.getStatusEntries().values()) { if (!entry.getFlags().contains("Running")) { continue; } Ipv6NetworkStatus.Entry parsedEntry = new Ipv6NetworkStatus.Entry(); parsedEntry.digest = entry.getDescriptor().toLowerCase(); - if (isRelay) { - parsedEntry.guard = entry.getFlags().contains("Guard"); - parsedEntry.exit = entry.getFlags().contains("Exit") - && !entry.getFlags().contains("BadExit"); - parsedEntry.reachable = false; - for (String orAddress : entry.getOrAddresses()) { - /* Check whether the additional OR address is an IPv6 address - * containing at least two colons as opposed to an IPv4 address and - * TCP port containing only one colon as separator. */ - if (StringUtils.countMatches(orAddress, ":") >= 2) { - parsedEntry.reachable = true; - break; + parsedEntry.flags = entry.getFlags(); + parsedEntry.reachable = false; + for (String orAddress : entry.getOrAddresses()) { + /* Check whether the additional OR address is an IPv6 address + * containing at least two colons as opposed to an IPv4 address and + * TCP port containing only one colon as separator. 
*/ + if (StringUtils.countMatches(orAddress, ":") >= 2) { + parsedEntry.reachable = true; + break; + } + } + parsedStatus.running++; + boolean isExit = entry.getFlags().contains("Exit") + && !entry.getFlags().contains("BadExit"); + boolean isGuard = entry.getFlags().contains("Guard"); + long consensusWeight = entry.getBandwidth(); + if (consensusWeight >= 0L) { + parsedEntry.consensusWeight = (float) consensusWeight; + if (consensusContainsBandwidthWeights) { + if (isGuard && isExit) { + parsedEntry.guardWeight = parsedEntry.consensusWeight * wgd; + parsedEntry.middleWeight = parsedEntry.consensusWeight * wmd; + parsedEntry.exitWeight = parsedEntry.consensusWeight * wed; + } else if (isGuard) { + parsedEntry.guardWeight = parsedEntry.consensusWeight * wgg; + parsedEntry.middleWeight = parsedEntry.consensusWeight * wmg; + parsedEntry.exitWeight = 0.0f; + } else if (isExit) { + parsedEntry.guardWeight = 0.0f; + parsedEntry.middleWeight = parsedEntry.consensusWeight * wme; + parsedEntry.exitWeight = parsedEntry.consensusWeight * wee; + } else { + parsedEntry.guardWeight = 0.0f; + parsedEntry.middleWeight = parsedEntry.consensusWeight * wmm; + parsedEntry.exitWeight = 0.0f; + } + if (null == parsedStatus.totalGuardWeight) { + parsedStatus.totalGuardWeight = 0.0f; + parsedStatus.totalMiddleWeight = 0.0f; + parsedStatus.totalExitWeight = 0.0f; } + parsedStatus.totalGuardWeight += parsedEntry.guardWeight; + parsedStatus.totalMiddleWeight += parsedEntry.middleWeight; + parsedStatus.totalExitWeight += parsedEntry.exitWeight; } + if (null == parsedStatus.totalConsensusWeight) { + parsedStatus.totalConsensusWeight = 0.0f; + } + parsedStatus.totalConsensusWeight += parsedEntry.consensusWeight; + } + parsedStatus.entries.add(parsedEntry); + } + return parsedStatus; + } + + Ipv6NetworkStatus parseBridgeNetworkStatus(BridgeNetworkStatus status) { + Ipv6NetworkStatus parsedStatus = new Ipv6NetworkStatus(); + parsedStatus.isRelay = false; + parsedStatus.timestamp = 
Instant.ofEpochMilli(status.getPublishedMillis()) + .atZone(ZoneId.of("UTC")).toLocalDateTime(); + for (NetworkStatusEntry entry : status.getStatusEntries().values()) { + if (!entry.getFlags().contains("Running")) { + continue; } + parsedStatus.running++; + Ipv6NetworkStatus.Entry parsedEntry = new Ipv6NetworkStatus.Entry(); + parsedEntry.digest = entry.getDescriptor().toLowerCase(); + parsedEntry.flags = entry.getFlags(); parsedStatus.entries.add(parsedEntry); } return parsedStatus; diff --git a/src/main/java/org/torproject/metrics/stats/ipv6servers/Writer.java b/src/main/java/org/torproject/metrics/stats/ipv6servers/Writer.java index 9edb347..4e2893c 100644 --- a/src/main/java/org/torproject/metrics/stats/ipv6servers/Writer.java +++ b/src/main/java/org/torproject/metrics/stats/ipv6servers/Writer.java @@ -16,7 +16,7 @@ import java.util.List; class Writer {
/** Write output lines to the given file. */ - void write(Path filePath, Iterable<OutputLine> outputLines) + void write(Path filePath, Iterable<String[]> outputLines) throws IOException { File parentFile = filePath.toFile().getParentFile(); if (null != parentFile && !parentFile.exists()) { @@ -26,9 +26,15 @@ class Writer { } } List<String> formattedOutputLines = new ArrayList<>(); - formattedOutputLines.add(OutputLine.columnHeadersDelimitedBy(",")); - for (OutputLine line : outputLines) { - formattedOutputLines.add(line.toString()); + for (String[] outputLine : outputLines) { + StringBuilder formattedOutputLine = new StringBuilder(); + for (String outputLinePart : outputLine) { + formattedOutputLine.append(','); + if (null != outputLinePart) { + formattedOutputLine.append(outputLinePart); + } + } + formattedOutputLines.add(formattedOutputLine.substring(1)); } Files.write(filePath, formattedOutputLines, StandardCharsets.UTF_8); } diff --git a/src/main/sql/ipv6servers/init-ipv6servers.sql b/src/main/sql/ipv6servers/init-ipv6servers.sql index 8ed9372..b478a49 100644 --- a/src/main/sql/ipv6servers/init-ipv6servers.sql +++ b/src/main/sql/ipv6servers/init-ipv6servers.sql @@ -1,12 +1,54 @@ -- Copyright 2017--2018 The Tor Project -- See LICENSE for licensing information
+-- Table of all known flags, to match flag strings to bit positions in the flags +-- column in status_entries. If the number of known flags ever grows larger than +-- 31, we'll have to extend flag_id to accept values between 0 and 62 and alter +-- the flags column in status_entries from INTEGER to BIGINT. And if it grows +-- even larger, we'll have to do something even smarter. +CREATE TABLE flags ( + flag_id SMALLINT PRIMARY KEY CHECK (flag_id BETWEEN 0 AND 30), + flag_string TEXT UNIQUE NOT NULL +); + +-- Hard-wire the Guard and (Bad)Exit flags, so that we can implement +-- aggregate() below without having to look up their IDs in the flags +-- table. +INSERT INTO flags (flag_id, flag_string) VALUES (0, 'Guard'); +INSERT INTO flags (flag_id, flag_string) VALUES (1, 'Exit'); +INSERT INTO flags (flag_id, flag_string) VALUES (2, 'BadExit'); + +-- Table of all known versions, either from platform lines in server descriptors +-- or recommended-server-versions lines in consensuses. Versions found in the +-- latter are marked as recommended, which enables us to only include major +-- versions in results that have been recommended at least once in a consensus. +CREATE TABLE versions ( + version_id SERIAL PRIMARY KEY, + version_string TEXT UNIQUE NOT NULL, + recommended BOOLEAN NOT NULL +); + +-- Hard-wire a 0.1.0 version and a 0.1.1 version, because these major versions +-- were never recommended in a consensus, yet they are supposed to go into the +-- versions statistics. +INSERT INTO versions (version_string, recommended) VALUES ('0.1.0.17', TRUE); +INSERT INTO versions (version_string, recommended) VALUES ('0.1.1.26', TRUE); + +-- Table of all known platforms from platform lines in server descriptors, that +-- is, without Tor software version information. +CREATE TABLE platforms ( + platform_id SERIAL PRIMARY KEY, + platform_string TEXT UNIQUE NOT NULL +); + -- Table of all relevant parts contained in relay or bridge server descriptors. 
-- We're not deleting from this table, because we can never be sure that we -- won't import a previously missing status that we'll want to match against -- existing server descriptors. CREATE TABLE server_descriptors ( descriptor_digest_sha1 BYTEA PRIMARY KEY, + platform_id INTEGER REFERENCES platforms (platform_id), + version_id INTEGER REFERENCES versions (version_id), advertised_bandwidth_bytes INTEGER NOT NULL, announced_ipv6 BOOLEAN NOT NULL, exiting_ipv6_relay BOOLEAN NOT NULL @@ -21,6 +63,10 @@ CREATE TABLE statuses ( server server_enum NOT NULL, valid_after TIMESTAMP WITHOUT TIME ZONE NOT NULL, running_count INTEGER NOT NULL, + consensus_weight_sum REAL, + guard_weight_sum REAL, + middle_weight_sum REAL, + exit_weight_sum REAL, UNIQUE (server, valid_after) );
@@ -30,9 +76,12 @@ CREATE TABLE statuses ( CREATE TABLE status_entries ( status_id INTEGER REFERENCES statuses (status_id) NOT NULL, descriptor_digest_sha1 BYTEA NOT NULL, - guard_relay BOOLEAN NOT NULL, - exit_relay BOOLEAN NOT NULL, + flags INTEGER NOT NULL, reachable_ipv6_relay BOOLEAN NOT NULL, + consensus_weight REAL, + guard_weight REAL, + middle_weight REAL, + exit_weight REAL, UNIQUE (status_id, descriptor_digest_sha1) );
@@ -47,33 +96,158 @@ CREATE TABLE aggregated_ipv6 ( exiting_ipv6_relay BOOLEAN NOT NULL, reachable_ipv6_relay BOOLEAN NOT NULL, server_count_sum INTEGER NOT NULL, + consensus_weight REAL, + guard_weight REAL, + middle_weight REAL, + exit_weight REAL, advertised_bandwidth_bytes_sum BIGINT NOT NULL, CONSTRAINT aggregated_ipv6_unique UNIQUE (status_id, guard_relay, exit_relay, announced_ipv6, exiting_ipv6_relay, reachable_ipv6_relay) );
+-- Table of joined and aggregated server_descriptors and status_entries rows by +-- relay flag. +CREATE TABLE aggregated_flags ( + status_id INTEGER REFERENCES statuses (status_id) NOT NULL, + flag_id INTEGER REFERENCES flags (flag_id) NOT NULL, + server_count_sum INTEGER NOT NULL, + consensus_weight REAL, + guard_weight REAL, + middle_weight REAL, + exit_weight REAL, + advertised_bandwidth_bytes_sum BIGINT NOT NULL, + CONSTRAINT aggregated_flags_unique UNIQUE (status_id, flag_id) +); + +-- Table of joined and aggregated server_descriptors and status_entries rows by +-- version. +CREATE TABLE aggregated_versions ( + status_id INTEGER REFERENCES statuses (status_id) NOT NULL, + version_id INTEGER REFERENCES versions (version_id), + server_count_sum INTEGER NOT NULL, + consensus_weight REAL, + guard_weight REAL, + middle_weight REAL, + exit_weight REAL, + advertised_bandwidth_bytes_sum BIGINT NOT NULL, + CONSTRAINT aggregated_versions_unique UNIQUE (status_id, version_id) +); + +-- Table of joined and aggregated server_descriptors and status_entries rows by +-- platform. +CREATE TABLE aggregated_platforms ( + status_id INTEGER REFERENCES statuses (status_id) NOT NULL, + platform_id INTEGER REFERENCES platforms (platform_id), + server_count_sum INTEGER NOT NULL, + consensus_weight REAL, + guard_weight REAL, + middle_weight REAL, + exit_weight REAL, + advertised_bandwidth_bytes_sum BIGINT NOT NULL, + CONSTRAINT aggregated_platforms_unique UNIQUE (status_id, platform_id) +); + -- Function to aggregate server_descriptors and status_entries rows into the --- aggregated table and delete rows from status_entries that are then contained --- in the aggregated table. This function is supposed to be called once after --- inserting new rows into server_descriptors and/or status_entries. Subsequent --- calls won't have any effect. 
-CREATE OR REPLACE FUNCTION aggregate_ipv6() RETURNS VOID AS $$ +-- aggregated_* tables and delete rows from status_entries that are then +-- contained in the aggregated_* tables. This function is supposed to be called +-- once after inserting new rows into server_descriptors and/or status_entries. +-- Subsequent calls won't have any effect. +CREATE OR REPLACE FUNCTION aggregate() RETURNS VOID AS $$ +-- Aggregate by IPv6 support. INSERT INTO aggregated_ipv6 -SELECT status_id, guard_relay, exit_relay, announced_ipv6, exiting_ipv6_relay, +SELECT status_id, flags & 1 > 0 AS guard_relay, + flags & 2 > 0 AND flags & 4 = 0 AS exit_relay, + announced_ipv6, exiting_ipv6_relay, reachable_ipv6_relay, COUNT(*) AS server_count_sum, - SUM(advertised_bandwidth_bytes) AS advertised_bandwidth_bytes + SUM(consensus_weight) AS consensus_weight, + SUM(guard_weight) AS guard_weight, + SUM(middle_weight) AS middle_weight, + SUM(exit_weight) AS exit_weight, + SUM(advertised_bandwidth_bytes) AS advertised_bandwidth_bytes_sum FROM status_entries NATURAL JOIN server_descriptors -NATURAL JOIN statuses GROUP BY status_id, guard_relay, exit_relay, announced_ipv6, exiting_ipv6_relay, reachable_ipv6_relay ON CONFLICT ON CONSTRAINT aggregated_ipv6_unique DO UPDATE SET server_count_sum = aggregated_ipv6.server_count_sum - + EXCLUDED.server_count_sum, + + EXCLUDED.server_count_sum, + consensus_weight = aggregated_ipv6.consensus_weight + + EXCLUDED.consensus_weight, + guard_weight = aggregated_ipv6.guard_weight + EXCLUDED.guard_weight, + middle_weight = aggregated_ipv6.middle_weight + EXCLUDED.middle_weight, + exit_weight = aggregated_ipv6.exit_weight + EXCLUDED.exit_weight, + advertised_bandwidth_bytes_sum + = aggregated_ipv6.advertised_bandwidth_bytes_sum + + EXCLUDED.advertised_bandwidth_bytes_sum; +-- Aggregate by assigned relay flags. 
+INSERT INTO aggregated_flags +SELECT status_id, flag_id, COUNT(*) AS server_count_sum, + SUM(consensus_weight) AS consensus_weight, + SUM(guard_weight) AS guard_weight, + SUM(middle_weight) AS middle_weight, + SUM(exit_weight) AS exit_weight, + SUM(advertised_bandwidth_bytes) AS advertised_bandwidth_bytes_sum +FROM status_entries +NATURAL JOIN server_descriptors +JOIN flags ON flags & (1 << flag_id) > 0 +GROUP BY status_id, flag_id +ON CONFLICT ON CONSTRAINT aggregated_flags_unique +DO UPDATE SET server_count_sum = aggregated_flags.server_count_sum + + EXCLUDED.server_count_sum, + consensus_weight = aggregated_flags.consensus_weight + + EXCLUDED.consensus_weight, + guard_weight = aggregated_flags.guard_weight + EXCLUDED.guard_weight, + middle_weight = aggregated_flags.middle_weight + EXCLUDED.middle_weight, + exit_weight = aggregated_flags.exit_weight + EXCLUDED.exit_weight, advertised_bandwidth_bytes_sum - = aggregated_ipv6.advertised_bandwidth_bytes_sum - + EXCLUDED.advertised_bandwidth_bytes_sum; + = aggregated_flags.advertised_bandwidth_bytes_sum + + EXCLUDED.advertised_bandwidth_bytes_sum; +-- Aggregate by version. 
+INSERT INTO aggregated_versions +SELECT status_id, version_id, COUNT(*) AS server_count_sum, + SUM(consensus_weight) AS consensus_weight, + SUM(guard_weight) AS guard_weight, + SUM(middle_weight) AS middle_weight, + SUM(exit_weight) AS exit_weight, + SUM(advertised_bandwidth_bytes) AS advertised_bandwidth_bytes_sum +FROM status_entries +NATURAL JOIN server_descriptors +GROUP BY status_id, version_id +ON CONFLICT ON CONSTRAINT aggregated_versions_unique +DO UPDATE SET server_count_sum = aggregated_versions.server_count_sum + + EXCLUDED.server_count_sum, + consensus_weight = aggregated_versions.consensus_weight + + EXCLUDED.consensus_weight, + guard_weight = aggregated_versions.guard_weight + EXCLUDED.guard_weight, + middle_weight = aggregated_versions.middle_weight + EXCLUDED.middle_weight, + exit_weight = aggregated_versions.exit_weight + EXCLUDED.exit_weight, + advertised_bandwidth_bytes_sum + = aggregated_versions.advertised_bandwidth_bytes_sum + + EXCLUDED.advertised_bandwidth_bytes_sum; +-- Aggregate by platform. 
+INSERT INTO aggregated_platforms +SELECT status_id, platform_id, COUNT(*) AS server_count_sum, + SUM(consensus_weight) AS consensus_weight, + SUM(guard_weight) AS guard_weight, + SUM(middle_weight) AS middle_weight, + SUM(exit_weight) AS exit_weight, + SUM(advertised_bandwidth_bytes) AS advertised_bandwidth_bytes_sum +FROM status_entries +NATURAL JOIN server_descriptors +GROUP BY status_id, platform_id +ON CONFLICT ON CONSTRAINT aggregated_platforms_unique +DO UPDATE SET server_count_sum = aggregated_platforms.server_count_sum + + EXCLUDED.server_count_sum, + consensus_weight = aggregated_platforms.consensus_weight + + EXCLUDED.consensus_weight, + guard_weight = aggregated_platforms.guard_weight + EXCLUDED.guard_weight, + middle_weight = aggregated_platforms.middle_weight + EXCLUDED.middle_weight, + exit_weight = aggregated_platforms.exit_weight + EXCLUDED.exit_weight, + advertised_bandwidth_bytes_sum + = aggregated_platforms.advertised_bandwidth_bytes_sum + + EXCLUDED.advertised_bandwidth_bytes_sum; +-- Delete obsolete rows from the status_entries table. 
DELETE FROM status_entries WHERE EXISTS ( SELECT 1 FROM server_descriptors WHERE descriptor_digest_sha1 = status_entries.descriptor_digest_sha1); @@ -100,24 +274,210 @@ WITH included_statuses AS ( HAVING COUNT(status_id) >= 12 AND DATE(valid_after) < (SELECT MAX(DATE(valid_after)) FROM included_statuses) +), grouped_by_status AS ( + SELECT valid_after, server, + CASE WHEN server = 'relay' THEN guard_relay ELSE NULL END + AS guard_relay_or_null, + CASE WHEN server = 'relay' THEN exit_relay ELSE NULL END + AS exit_relay_or_null, + announced_ipv6, + CASE WHEN server = 'relay' THEN exiting_ipv6_relay ELSE NULL END + AS exiting_ipv6_relay_or_null, + CASE WHEN server = 'relay' THEN reachable_ipv6_relay ELSE NULL END + AS reachable_ipv6_relay_or_null, + SUM(server_count_sum) AS server_count_sum, + CASE WHEN server = 'relay' THEN SUM(advertised_bandwidth_bytes_sum) + ELSE NULL END AS advertised_bandwidth_bytes_sum + FROM statuses NATURAL JOIN aggregated_ipv6 + WHERE status_id IN (SELECT status_id FROM included_statuses) + AND DATE(valid_after) IN ( + SELECT valid_after_date FROM included_dates + WHERE included_dates.server = statuses.server) + GROUP BY status_id, valid_after, server, guard_relay_or_null, + exit_relay_or_null, announced_ipv6, exiting_ipv6_relay_or_null, + reachable_ipv6_relay_or_null ) SELECT DATE(valid_after) AS valid_after_date, server, - CASE WHEN server = 'relay' THEN guard_relay ELSE NULL END AS guard_relay, - CASE WHEN server = 'relay' THEN exit_relay ELSE NULL END AS exit_relay, + guard_relay_or_null AS guard_relay, + exit_relay_or_null AS exit_relay, announced_ipv6, - CASE WHEN server = 'relay' THEN exiting_ipv6_relay ELSE NULL END - AS exiting_ipv6_relay, - CASE WHEN server = 'relay' THEN reachable_ipv6_relay ELSE NULL END - AS reachable_ipv6_relay, + exiting_ipv6_relay_or_null AS exiting_ipv6_relay, + reachable_ipv6_relay_or_null AS reachable_ipv6_relay, FLOOR(AVG(server_count_sum)) AS server_count_sum_avg, - CASE WHEN server = 'relay' THEN 
FLOOR(AVG(advertised_bandwidth_bytes_sum)) - ELSE NULL END AS advertised_bandwidth_bytes_sum_avg -FROM statuses NATURAL JOIN aggregated_ipv6 -WHERE status_id IN (SELECT status_id FROM included_statuses) -AND DATE(valid_after) IN ( - SELECT valid_after_date FROM included_dates WHERE server = statuses.server) + FLOOR(AVG(advertised_bandwidth_bytes_sum)) + AS advertised_bandwidth_bytes_sum_avg +FROM grouped_by_status GROUP BY DATE(valid_after), server, guard_relay, exit_relay, announced_ipv6, exiting_ipv6_relay, reachable_ipv6_relay ORDER BY valid_after_date, server, guard_relay, exit_relay, announced_ipv6, exiting_ipv6_relay, reachable_ipv6_relay;
+-- View on the number of running servers by relay flag.
+--
+-- included_statuses: statuses with a positive running_count for which more
+-- than 99.9% of running servers have been aggregated. Only the 'Running' row
+-- is summed here, because aggregated_flags holds one row per status and flag,
+-- so summing over all flags would count a server once per assigned flag.
+--
+-- included_dates: dates and server types with at least 12 included statuses,
+-- excluding the most recent date, which may still be incomplete. This counts
+-- statuses, not status/flag combinations.
+CREATE OR REPLACE VIEW servers_flags_complete AS
+WITH included_statuses AS (
+  SELECT status_id, server, valid_after
+  FROM statuses NATURAL JOIN aggregated_flags NATURAL JOIN flags
+  WHERE flag_string = 'Running'
+  GROUP BY status_id, server, valid_after
+  HAVING running_count > 0
+  AND 1000 * SUM(server_count_sum) > 999 * running_count
+), included_dates AS (
+  SELECT DATE(valid_after) AS valid_after_date, server
+  FROM included_statuses
+  GROUP BY DATE(valid_after), server
+  HAVING COUNT(status_id) >= 12
+  AND DATE(valid_after)
+    < (SELECT MAX(DATE(valid_after)) FROM included_statuses)
+)
+-- Average the per-status counts and weight fractions over each included date.
+SELECT DATE(valid_after) AS valid_after_date, server, flag_string AS flag,
+  FLOOR(AVG(server_count_sum)) AS server_count_sum_avg,
+  AVG(consensus_weight / consensus_weight_sum) AS consensus_weight_fraction,
+  AVG(guard_weight / guard_weight_sum) AS guard_weight_fraction,
+  AVG(middle_weight / middle_weight_sum) AS middle_weight_fraction,
+  AVG(exit_weight / exit_weight_sum) AS exit_weight_fraction
+FROM statuses NATURAL JOIN aggregated_flags NATURAL JOIN flags
+WHERE status_id IN (SELECT status_id FROM included_statuses)
+AND DATE(valid_after) IN (
+  SELECT valid_after_date FROM included_dates
+  WHERE included_dates.server = statuses.server)
+GROUP BY DATE(valid_after), server, flag
+ORDER BY valid_after_date, server, flag;
+
+-- View on the number of running relays and bridges, derived from the
+-- 'Running' rows of servers_flags_complete. The CASE expressions pivot the
+-- server column into one relays and one bridges column per date.
+CREATE OR REPLACE VIEW servers_networksize AS
+SELECT valid_after_date AS date,
+  FLOOR(AVG(CASE WHEN server = 'relay' THEN server_count_sum_avg
+    ELSE NULL END)) AS relays,
+  FLOOR(AVG(CASE WHEN server = 'bridge' THEN server_count_sum_avg
+    ELSE NULL END)) AS bridges
+FROM servers_flags_complete
+WHERE flag = 'Running'
+GROUP BY date
+ORDER BY date;
+
+-- View on the number of running relays by relay flag.
+CREATE OR REPLACE VIEW servers_relayflags AS
+SELECT valid_after_date AS date, flag, server_count_sum_avg AS relays
+FROM servers_flags_complete
+WHERE server = 'relay'
+AND flag IN ('Running', 'Exit', 'Fast', 'Guard', 'Stable', 'HSDir')
+ORDER BY date, flag;
+
+-- View on the number of running servers by version.
+--
+-- included_statuses: statuses with a positive running_count for which more
+-- than 99.9% of running servers are covered by aggregated_versions rows.
+-- included_dates: dates and server types with at least 12 included statuses,
+-- excluding the most recent date.
+-- included_versions: major versions (first three dot-separated components)
+-- of all versions marked as recommended.
+-- grouped_by_version: per-status sums by major version; versions that are
+-- missing or whose major version is not in included_versions become 'Other'.
+CREATE OR REPLACE VIEW servers_versions_complete AS
+WITH included_statuses AS (
+  SELECT status_id, server, valid_after
+  FROM statuses NATURAL JOIN aggregated_versions
+  GROUP BY status_id, server, valid_after
+  HAVING running_count > 0
+  AND 1000 * SUM(server_count_sum) > 999 * running_count
+), included_dates AS (
+  SELECT DATE(valid_after) AS valid_after_date, server
+  FROM included_statuses
+  GROUP BY DATE(valid_after), server
+  HAVING COUNT(status_id) >= 12
+  AND DATE(valid_after)
+    < (SELECT MAX(DATE(valid_after)) FROM included_statuses)
+), included_versions AS (
+  -- Use [.] to match literal dots only; a bare . would match any character.
+  SELECT SUBSTRING(version_string FROM '^([^.]+[.][^.]+[.][^.]+)') AS version
+  FROM versions
+  WHERE recommended IS TRUE
+  GROUP BY version
+), grouped_by_version AS (
+  SELECT server, valid_after,
+    CASE WHEN SUBSTRING(version_string FROM '^([^.]+[.][^.]+[.][^.]+)')
+      IN (SELECT version FROM included_versions)
+    THEN SUBSTRING(version_string FROM '^([^.]+[.][^.]+[.][^.]+)')
+    ELSE 'Other' END AS version,
+    SUM(server_count_sum) AS server_count_sum,
+    SUM(consensus_weight) AS consensus_weight,
+    SUM(guard_weight) AS guard_weight,
+    SUM(middle_weight) AS middle_weight,
+    SUM(exit_weight) AS exit_weight,
+    consensus_weight_sum,
+    guard_weight_sum,
+    middle_weight_sum,
+    exit_weight_sum
+  FROM statuses NATURAL JOIN aggregated_versions LEFT JOIN versions
+    ON aggregated_versions.version_id = versions.version_id
+  WHERE status_id IN (SELECT status_id FROM included_statuses)
+  AND DATE(valid_after) IN (
+    SELECT valid_after_date FROM included_dates
+    WHERE included_dates.server = statuses.server)
+  GROUP BY status_id, server, valid_after, version
+)
+-- Average the per-status counts and weight fractions over each included date.
+SELECT DATE(valid_after) AS valid_after_date, server, version,
+  FLOOR(AVG(server_count_sum)) AS server_count_sum_avg,
+  AVG(consensus_weight / consensus_weight_sum) AS consensus_weight_fraction,
+  AVG(guard_weight / guard_weight_sum) AS guard_weight_fraction,
+  AVG(middle_weight / middle_weight_sum) AS middle_weight_fraction,
+  AVG(exit_weight / exit_weight_sum) AS exit_weight_fraction
+FROM grouped_by_version
+GROUP BY DATE(valid_after), server, version
+ORDER BY valid_after_date, server, version;
+
+-- View on the number of running relays by version.
+CREATE OR REPLACE VIEW servers_versions AS
+SELECT valid_after_date AS date, version, server_count_sum_avg AS relays
+FROM servers_versions_complete
+WHERE server = 'relay'
+ORDER BY date, version;
+
+-- View on the number of running servers by platform.
+--
+-- included_statuses and included_dates apply the same completeness rules as
+-- in servers_versions_complete, based on aggregated_platforms rows.
+-- grouped_by_platform: per-status sums by platform family; platform strings
+-- that are missing or match none of the patterns become 'Other'.
+CREATE OR REPLACE VIEW servers_platforms_complete AS
+WITH included_statuses AS (
+  SELECT status_id, server, valid_after
+  FROM statuses NATURAL JOIN aggregated_platforms
+  GROUP BY status_id, server, valid_after
+  HAVING running_count > 0
+  AND 1000 * SUM(server_count_sum) > 999 * running_count
+), included_dates AS (
+  SELECT DATE(valid_after) AS valid_after_date, server
+  FROM included_statuses
+  GROUP BY DATE(valid_after), server
+  HAVING COUNT(status_id) >= 12
+  AND DATE(valid_after)
+    < (SELECT MAX(DATE(valid_after)) FROM included_statuses)
+), grouped_by_platform AS (
+  SELECT server, valid_after,
+    CASE WHEN platform_string LIKE 'Linux%' THEN 'Linux'
+      WHEN platform_string LIKE 'Windows%' THEN 'Windows'
+      WHEN platform_string LIKE 'Darwin%' THEN 'macOS'
+      WHEN platform_string LIKE '%BSD%'
+        -- Uncomment, if we ever want to count DragonFly as BSD
+        -- OR platform_string LIKE 'DragonFly%'
+      THEN 'BSD'
+      ELSE 'Other' END AS platform,
+    SUM(server_count_sum) AS server_count_sum,
+    SUM(consensus_weight) AS consensus_weight,
+    SUM(guard_weight) AS guard_weight,
+    SUM(middle_weight) AS middle_weight,
+    SUM(exit_weight) AS exit_weight,
+    consensus_weight_sum,
+    guard_weight_sum,
+    middle_weight_sum,
+    exit_weight_sum
+  FROM statuses NATURAL JOIN aggregated_platforms LEFT JOIN platforms
+    ON aggregated_platforms.platform_id = platforms.platform_id
+  WHERE status_id IN (SELECT status_id FROM included_statuses)
+  AND DATE(valid_after) IN (
+    SELECT valid_after_date FROM included_dates
+    WHERE included_dates.server = statuses.server)
+  GROUP BY status_id, server, valid_after, platform
+)
+-- Average the per-status counts and weight fractions over each included date.
+SELECT DATE(valid_after) AS valid_after_date, server, platform,
+  FLOOR(AVG(server_count_sum)) AS server_count_sum_avg,
+  AVG(consensus_weight / consensus_weight_sum) AS consensus_weight_fraction,
+  AVG(guard_weight / guard_weight_sum) AS guard_weight_fraction,
+  AVG(middle_weight / middle_weight_sum) AS middle_weight_fraction,
+  AVG(exit_weight / exit_weight_sum) AS exit_weight_fraction
+FROM grouped_by_platform
+GROUP BY DATE(valid_after), server, platform
+ORDER BY valid_after_date, server, platform;
+
+-- View on the number of running relays by platform.
+CREATE OR REPLACE VIEW servers_platforms AS
+SELECT valid_after_date AS date, platform, server_count_sum_avg AS relays
+FROM servers_platforms_complete
+WHERE server = 'relay'
+ORDER BY date, platform;
+
tor-commits@lists.torproject.org