commit 151cd121a2733506c69162330b210c3c45044dfa
Author: Nick Mathewson <nickm(a)torproject.org>
Date: Fri May 5 11:17:59 2017 -0400
consdiffmgr: compress incoming consensuses in the background
Also, compress them in several ways.
This breaks the unit tests; subsequent commits will make them pass
again.
---
src/or/consdiffmgr.c | 233 +++++++++++++++++++++++++++++++++++++--------------
src/or/consdiffmgr.h | 1 +
2 files changed, 173 insertions(+), 61 deletions(-)
diff --git a/src/or/consdiffmgr.c b/src/or/consdiffmgr.c
index b108c80..2f0a653 100644
--- a/src/or/consdiffmgr.c
+++ b/src/or/consdiffmgr.c
@@ -96,6 +96,24 @@ n_diff_compression_methods(void)
return ARRAY_LENGTH(compress_diffs_with);
}
+/** Which methods do we use for precompressing consensuses? */
+static const compress_method_t compress_consensus_with[] = {
+ ZLIB_METHOD,
+#ifdef HAVE_LZMA
+ LZMA_METHOD,
+#endif
+#ifdef HAVE_ZSTD
+ ZSTD_METHOD,
+#endif
+};
+
+/** How many different methods will we try to use for diff compression? */
+static unsigned
+n_consensus_compression_methods(void)
+{
+ return ARRAY_LENGTH(compress_consensus_with);
+}
+
/** Hashtable node used to remember the current status of the diff
* from a given sha3 digest to the current consensus. */
typedef struct cdm_diff_t {
@@ -135,13 +153,13 @@ static consdiff_cfg_t consdiff_cfg = {
};
static int consdiffmgr_ensure_space_for_files(int n);
+static int consensus_queue_compression_work(const char *consensus,
+ consensus_flavor_t flavor,
+ time_t valid_after);
static int consensus_diff_queue_diff_work(consensus_cache_entry_t *diff_from,
consensus_cache_entry_t *diff_to);
static void consdiffmgr_set_cache_flags(void);
-/* Just gzip consensuses for now. */
-#define COMPRESS_CONSENSUS_WITH GZIP_METHOD
-
/* =====
* Hashtable setup
* ===== */
@@ -410,11 +428,6 @@ cdm_cache_lookup_consensus(consensus_flavor_t flavor, time_t valid_after)
consensus_cache_filter_list(matches, LABEL_DOCTYPE, DOCTYPE_CONSENSUS);
consensus_cache_entry_t *result = NULL;
- if (smartlist_len(matches) > 1) {
- log_warn(LD_BUG, "How odd; there appear to be two matching consensuses "
- "with flavor %s published at %s.",
- flavname, formatted_time);
- }
if (smartlist_len(matches)) {
result = smartlist_get(matches, 0);
}
@@ -458,59 +471,7 @@ consdiffmgr_add_consensus(const char *consensus,
}
/* We don't have it. Add it to the cache. */
- consdiffmgr_ensure_space_for_files(1);
-
- {
- size_t bodylen = strlen(consensus);
- config_line_t *labels = NULL;
- char formatted_time[ISO_TIME_LEN+1];
- format_iso_time_nospace(formatted_time, valid_after);
- const char *flavname = networkstatus_get_flavor_name(flavor);
-
- cdm_labels_prepend_sha3(&labels, LABEL_SHA3_DIGEST_UNCOMPRESSED,
- (const uint8_t *)consensus, bodylen);
- {
- const char *start, *end;
- if (router_get_networkstatus_v3_signed_boundaries(consensus,
- &start, &end) < 0) {
- start = consensus;
- end = consensus+bodylen;
- }
- cdm_labels_prepend_sha3(&labels, LABEL_SHA3_DIGEST_AS_SIGNED,
- (const uint8_t *)start,
- end - start);
- }
-
- char *body_compressed = NULL;
- size_t size_compressed = 0;
- if (tor_compress(&body_compressed, &size_compressed,
- consensus, bodylen, COMPRESS_CONSENSUS_WITH) < 0) {
- config_free_lines(labels);
- return -1;
- }
- cdm_labels_prepend_sha3(&labels, LABEL_SHA3_DIGEST,
- (const uint8_t *)body_compressed, size_compressed);
- config_line_prepend(&labels, LABEL_COMPRESSION_TYPE,
- compression_method_get_name(COMPRESS_CONSENSUS_WITH));
- config_line_prepend(&labels, LABEL_FLAVOR, flavname);
- config_line_prepend(&labels, LABEL_VALID_AFTER, formatted_time);
- config_line_prepend(&labels, LABEL_DOCTYPE, DOCTYPE_CONSENSUS);
-
- entry = consensus_cache_add(cdm_cache_get(),
- labels,
- (const uint8_t *)body_compressed,
- size_compressed);
- tor_free(body_compressed);
- config_free_lines(labels);
- }
-
- if (entry) {
- consensus_cache_entry_mark_for_aggressive_release(entry);
- consensus_cache_entry_decref(entry);
- }
-
- cdm_cache_dirty = 1;
- return entry ? 0 : -1;
+ return consensus_queue_compression_work(consensus, flavor, valid_after);
}
/**
@@ -835,6 +796,10 @@ consdiffmgr_rescan_flavor_(consensus_flavor_t flavor)
if (strmap_get(have_diff_from, va) != NULL)
continue; /* we already have this one. */
smartlist_add(compute_diffs_from, ent);
+ /* Since we are not going to serve this as the most recent consensus
+ * any more, we should stop keeping it mmap'd when it's not in use.
+ */
+ consensus_cache_entry_mark_for_aggressive_release(ent);
} SMARTLIST_FOREACH_END(ent);
log_info(LD_DIRSERV,
@@ -1147,6 +1112,8 @@ store_multiple(consensus_cache_entry_handle_t **handles_out,
labels,
body_out,
bodylen_out);
+ if (BUG(ent == NULL))
+ continue;
status = CDM_DIFF_PRESENT;
handles_out[i] = consensus_cache_entry_handle_new(ent);
@@ -1464,3 +1431,147 @@ consensus_diff_queue_diff_work(consensus_cache_entry_t *diff_from,
return -1;
}
+/**
+ * Holds requests and replies for consensus_compress_workers.
+ */
+typedef struct consensus_compress_worker_job_t {
+ char *consensus;
+ size_t consensus_len;
+ consensus_flavor_t flavor;
+ time_t valid_after;
+ compressed_result_t out[ARRAY_LENGTH(compress_consensus_with)];
+} consensus_compress_worker_job_t;
+
+/**
+ * Free all resources held in <b>job</b>
+ */
+static void
+consensus_compress_worker_job_free(consensus_compress_worker_job_t *job)
+{
+ if (!job)
+ return;
+ tor_free(job->consensus);
+ unsigned u;
+ for (u = 0; u < n_consensus_compression_methods(); ++u) {
+ config_free_lines(job->out[u].labels);
+ tor_free(job->out[u].body);
+ }
+ tor_free(job);
+}
+/**
+ * Worker function. This function runs inside a worker thread and receives
+ * a consensus_compress_worker_job_t as its input.
+ */
+static workqueue_reply_t
+consensus_compress_worker_threadfn(void *state_, void *work_)
+{
+ (void)state_;
+ consensus_compress_worker_job_t *job = work_;
+ consensus_flavor_t flavor = job->flavor;
+ const char *consensus = job->consensus;
+ size_t bodylen = job->consensus_len;
+ time_t valid_after = job->valid_after;
+
+ config_line_t *labels = NULL;
+ char formatted_time[ISO_TIME_LEN+1];
+ format_iso_time_nospace(formatted_time, valid_after);
+
+ const char *flavname = networkstatus_get_flavor_name(flavor);
+
+ cdm_labels_prepend_sha3(&labels, LABEL_SHA3_DIGEST_UNCOMPRESSED,
+ (const uint8_t *)consensus, bodylen);
+ {
+ const char *start, *end;
+ if (router_get_networkstatus_v3_signed_boundaries(consensus,
+ &start, &end) < 0) {
+ start = consensus;
+ end = consensus+bodylen;
+ }
+ cdm_labels_prepend_sha3(&labels, LABEL_SHA3_DIGEST_AS_SIGNED,
+ (const uint8_t *)start,
+ end - start);
+ }
+ config_line_prepend(&labels, LABEL_FLAVOR, flavname);
+ config_line_prepend(&labels, LABEL_VALID_AFTER, formatted_time);
+ config_line_prepend(&labels, LABEL_DOCTYPE, DOCTYPE_CONSENSUS);
+
+ compress_multiple(job->out,
+ n_consensus_compression_methods(),
+ compress_consensus_with,
+ (const uint8_t*)consensus, bodylen, labels);
+ config_free_lines(labels);
+ return WQ_RPL_REPLY;
+}
+
+/**
+ * Worker function: This function runs in the main thread, and receives
+ * a consensus_diff_compress_job_t that the worker thread has already
+ * processed.
+ */
+static void
+consensus_compress_worker_replyfn(void *work_)
+{
+ consensus_compress_worker_job_t *job = work_;
+
+ consensus_cache_entry_handle_t *handles[
+ ARRAY_LENGTH(compress_consensus_with)];
+ memset(handles, 0, sizeof(handles));
+
+ store_multiple(handles,
+ n_consensus_compression_methods(),
+ compress_consensus_with,
+ job->out,
+ "consensus");
+ cdm_cache_dirty = 1;
+
+ consensus_compress_worker_job_free(job);
+}
+
+/**
+ * If true, we compress in worker threads.
+ */
+static int background_compression = 0;
+
+/**
+ * Queue a job to compress <b>consensus</b> and store its compressed
+ * text in the cache.
+ */
+static int
+consensus_queue_compression_work(const char *consensus,
+ consensus_flavor_t flavor,
+ time_t valid_after)
+{
+ consensus_compress_worker_job_t *job = tor_malloc_zero(sizeof(*job));
+ job->consensus = tor_strdup(consensus);
+ job->consensus_len = strlen(consensus);
+ job->flavor = flavor;
+ job->valid_after = valid_after;
+
+ if (background_compression) {
+ workqueue_entry_t *work;
+ work = cpuworker_queue_work(consensus_compress_worker_threadfn,
+ consensus_compress_worker_replyfn,
+ job);
+ if (!work) {
+ consensus_compress_worker_job_free(job);
+ return -1;
+ }
+
+ return 0;
+ } else {
+ consensus_compress_worker_threadfn(NULL, job);
+ consensus_compress_worker_replyfn(job);
+ return 0;
+ }
+}
+
+/**
+ * Tell the consdiffmgr backend to compress consensuses in worker threads.
+ */
+void
+consdiffmgr_enable_background_compression(void)
+{
+ // This isn't the default behavior because it would break unit tests.
+ background_compression = 1;
+}
+
diff --git a/src/or/consdiffmgr.h b/src/or/consdiffmgr.h
index 048dae4..cbff599 100644
--- a/src/or/consdiffmgr.h
+++ b/src/or/consdiffmgr.h
@@ -32,6 +32,7 @@ consdiff_status_t consdiffmgr_find_diff_from(
compress_method_t method);
void consdiffmgr_rescan(void);
int consdiffmgr_cleanup(void);
+void consdiffmgr_enable_background_compression(void);
void consdiffmgr_configure(const consdiff_cfg_t *cfg);
struct sandbox_cfg_elem;
int consdiffmgr_register_with_sandbox(struct sandbox_cfg_elem **cfg);