[tor-commits] [tor/master] consdiffmgr: compress incoming consensuses in the background

nickm at torproject.org nickm at torproject.org
Mon May 15 21:26:28 UTC 2017


commit 151cd121a2733506c69162330b210c3c45044dfa
Author: Nick Mathewson <nickm at torproject.org>
Date:   Fri May 5 11:17:59 2017 -0400

    consdiffmgr: compress incoming consensuses in the background
    
    Also, compress them in several ways.
    
    This breaks the unit tests; subsequent commits will make them pass
    again.
---
 src/or/consdiffmgr.c | 233 +++++++++++++++++++++++++++++++++++++--------------
 src/or/consdiffmgr.h |   1 +
 2 files changed, 173 insertions(+), 61 deletions(-)

diff --git a/src/or/consdiffmgr.c b/src/or/consdiffmgr.c
index b108c80..2f0a653 100644
--- a/src/or/consdiffmgr.c
+++ b/src/or/consdiffmgr.c
@@ -96,6 +96,24 @@ n_diff_compression_methods(void)
   return ARRAY_LENGTH(compress_diffs_with);
 }
 
+/** Which methods do we use for precompressing consensuses? */
+static const compress_method_t compress_consensus_with[] = {
+  ZLIB_METHOD,
+#ifdef HAVE_LZMA
+  LZMA_METHOD,
+#endif
+#ifdef HAVE_ZSTD
+  ZSTD_METHOD,
+#endif
+};
+
+/** How many different methods will we try to use for diff compression? */
+static unsigned
+n_consensus_compression_methods(void)
+{
+  return ARRAY_LENGTH(compress_consensus_with);
+}
+
 /** Hashtable node used to remember the current status of the diff
  * from a given sha3 digest to the current consensus.  */
 typedef struct cdm_diff_t {
@@ -135,13 +153,13 @@ static consdiff_cfg_t consdiff_cfg = {
 };
 
 static int consdiffmgr_ensure_space_for_files(int n);
+static int consensus_queue_compression_work(const char *consensus,
+                                            consensus_flavor_t flavor,
+                                            time_t valid_after);
 static int consensus_diff_queue_diff_work(consensus_cache_entry_t *diff_from,
                                           consensus_cache_entry_t *diff_to);
 static void consdiffmgr_set_cache_flags(void);
 
-/* Just gzip consensuses for now. */
-#define COMPRESS_CONSENSUS_WITH GZIP_METHOD
-
 /* =====
  * Hashtable setup
  * ===== */
@@ -410,11 +428,6 @@ cdm_cache_lookup_consensus(consensus_flavor_t flavor, time_t valid_after)
   consensus_cache_filter_list(matches, LABEL_DOCTYPE, DOCTYPE_CONSENSUS);
 
   consensus_cache_entry_t *result = NULL;
-  if (smartlist_len(matches) > 1) {
-    log_warn(LD_BUG, "How odd; there appear to be two matching consensuses "
-             "with flavor %s published at %s.",
-             flavname, formatted_time);
-  }
   if (smartlist_len(matches)) {
     result = smartlist_get(matches, 0);
   }
@@ -458,59 +471,7 @@ consdiffmgr_add_consensus(const char *consensus,
   }
 
   /* We don't have it. Add it to the cache. */
-  consdiffmgr_ensure_space_for_files(1);
-
-  {
-    size_t bodylen = strlen(consensus);
-    config_line_t *labels = NULL;
-    char formatted_time[ISO_TIME_LEN+1];
-    format_iso_time_nospace(formatted_time, valid_after);
-    const char *flavname = networkstatus_get_flavor_name(flavor);
-
-    cdm_labels_prepend_sha3(&labels, LABEL_SHA3_DIGEST_UNCOMPRESSED,
-                            (const uint8_t *)consensus, bodylen);
-    {
-      const char *start, *end;
-      if (router_get_networkstatus_v3_signed_boundaries(consensus,
-                                                        &start, &end) < 0) {
-        start = consensus;
-        end = consensus+bodylen;
-      }
-      cdm_labels_prepend_sha3(&labels, LABEL_SHA3_DIGEST_AS_SIGNED,
-                              (const uint8_t *)start,
-                              end - start);
-    }
-
-    char *body_compressed = NULL;
-    size_t size_compressed = 0;
-    if (tor_compress(&body_compressed, &size_compressed,
-                     consensus, bodylen, COMPRESS_CONSENSUS_WITH) < 0) {
-      config_free_lines(labels);
-      return -1;
-    }
-    cdm_labels_prepend_sha3(&labels, LABEL_SHA3_DIGEST,
-                            (const uint8_t *)body_compressed, size_compressed);
-    config_line_prepend(&labels, LABEL_COMPRESSION_TYPE,
-                        compression_method_get_name(COMPRESS_CONSENSUS_WITH));
-    config_line_prepend(&labels, LABEL_FLAVOR, flavname);
-    config_line_prepend(&labels, LABEL_VALID_AFTER, formatted_time);
-    config_line_prepend(&labels, LABEL_DOCTYPE, DOCTYPE_CONSENSUS);
-
-    entry = consensus_cache_add(cdm_cache_get(),
-                                labels,
-                                (const uint8_t *)body_compressed,
-                                size_compressed);
-    tor_free(body_compressed);
-    config_free_lines(labels);
-  }
-
-  if (entry) {
-    consensus_cache_entry_mark_for_aggressive_release(entry);
-    consensus_cache_entry_decref(entry);
-  }
-
-  cdm_cache_dirty = 1;
-  return entry ? 0 : -1;
+  return consensus_queue_compression_work(consensus, flavor, valid_after);
 }
 
 /**
@@ -835,6 +796,10 @@ consdiffmgr_rescan_flavor_(consensus_flavor_t flavor)
     if (strmap_get(have_diff_from, va) != NULL)
       continue; /* we already have this one. */
     smartlist_add(compute_diffs_from, ent);
+    /* Since we are not going to serve this as the most recent consensus
+     * any more, we should stop keeping it mmap'd when it's not in use.
+     */
+    consensus_cache_entry_mark_for_aggressive_release(ent);
   } SMARTLIST_FOREACH_END(ent);
 
   log_info(LD_DIRSERV,
@@ -1147,6 +1112,8 @@ store_multiple(consensus_cache_entry_handle_t **handles_out,
                             labels,
                             body_out,
                             bodylen_out);
+      if (BUG(ent == NULL))
+        continue;
 
       status = CDM_DIFF_PRESENT;
       handles_out[i] = consensus_cache_entry_handle_new(ent);
@@ -1464,3 +1431,147 @@ consensus_diff_queue_diff_work(consensus_cache_entry_t *diff_from,
   return -1;
 }
 
+/**
+ * Holds requests and replies for consensus_compress_workers.
+ */
+typedef struct consensus_compress_worker_job_t {
+  char *consensus;
+  size_t consensus_len;
+  consensus_flavor_t flavor;
+  time_t valid_after;
+  compressed_result_t out[ARRAY_LENGTH(compress_consensus_with)];
+} consensus_compress_worker_job_t;
+
+/**
+ * Free all resources held in <b>job</b>
+ */
+static void
+consensus_compress_worker_job_free(consensus_compress_worker_job_t *job)
+{
+  if (!job)
+    return;
+  tor_free(job->consensus);
+  unsigned u;
+  for (u = 0; u < n_consensus_compression_methods(); ++u) {
+    config_free_lines(job->out[u].labels);
+    tor_free(job->out[u].body);
+  }
+  tor_free(job);
+}
+/**
+ * Worker function. This function runs inside a worker thread and receives
+ * a consensus_compress_worker_job_t as its input.
+ */
+static workqueue_reply_t
+consensus_compress_worker_threadfn(void *state_, void *work_)
+{
+  (void)state_;
+  consensus_compress_worker_job_t *job = work_;
+  consensus_flavor_t flavor = job->flavor;
+  const char *consensus = job->consensus;
+  size_t bodylen = job->consensus_len;
+  time_t valid_after = job->valid_after;
+
+  config_line_t *labels = NULL;
+  char formatted_time[ISO_TIME_LEN+1];
+  format_iso_time_nospace(formatted_time, valid_after);
+
+  const char *flavname = networkstatus_get_flavor_name(flavor);
+
+  cdm_labels_prepend_sha3(&labels, LABEL_SHA3_DIGEST_UNCOMPRESSED,
+                          (const uint8_t *)consensus, bodylen);
+  {
+    const char *start, *end;
+    if (router_get_networkstatus_v3_signed_boundaries(consensus,
+                                                        &start, &end) < 0) {
+      start = consensus;
+      end = consensus+bodylen;
+    }
+    cdm_labels_prepend_sha3(&labels, LABEL_SHA3_DIGEST_AS_SIGNED,
+                            (const uint8_t *)start,
+                            end - start);
+  }
+  config_line_prepend(&labels, LABEL_FLAVOR, flavname);
+  config_line_prepend(&labels, LABEL_VALID_AFTER, formatted_time);
+  config_line_prepend(&labels, LABEL_DOCTYPE, DOCTYPE_CONSENSUS);
+
+  compress_multiple(job->out,
+                    n_consensus_compression_methods(),
+                    compress_consensus_with,
+                    (const uint8_t*)consensus, bodylen, labels);
+  config_free_lines(labels);
+  return WQ_RPL_REPLY;
+}
+
+/**
+ * Worker function: This function runs in the main thread, and receives
+ * a consensus_diff_compress_job_t that the worker thread has already
+ * processed.
+ */
+static void
+consensus_compress_worker_replyfn(void *work_)
+{
+  consensus_compress_worker_job_t *job = work_;
+
+  consensus_cache_entry_handle_t *handles[
+                               ARRAY_LENGTH(compress_consensus_with)];
+  memset(handles, 0, sizeof(handles));
+
+  store_multiple(handles,
+                 n_consensus_compression_methods(),
+                 compress_consensus_with,
+                 job->out,
+                 "consensus");
+  cdm_cache_dirty = 1;
+
+  consensus_compress_worker_job_free(job);
+}
+
+/**
+ * If true, we compress in worker threads.
+ */
+static int background_compression = 0;
+
+/**
+ * Queue a job to compress <b>consensus</b> and store its compressed
+ * text in the cache.
+ */
+static int
+consensus_queue_compression_work(const char *consensus,
+                                 consensus_flavor_t flavor,
+                                 time_t valid_after)
+{
+  consensus_compress_worker_job_t *job = tor_malloc_zero(sizeof(*job));
+  job->consensus = tor_strdup(consensus);
+  job->consensus_len = strlen(consensus);
+  job->flavor = flavor;
+  job->valid_after = valid_after;
+
+  if (background_compression) {
+    workqueue_entry_t *work;
+    work = cpuworker_queue_work(consensus_compress_worker_threadfn,
+                                consensus_compress_worker_replyfn,
+                                job);
+    if (!work) {
+      consensus_compress_worker_job_free(job);
+      return -1;
+    }
+
+    return 0;
+  } else {
+    consensus_compress_worker_threadfn(NULL, job);
+    consensus_compress_worker_replyfn(job);
+    return 0;
+  }
+}
+
+/**
+ * Tell the consdiffmgr backend to compress consensuses in worker threads.
+ */
+void
+consdiffmgr_enable_background_compression(void)
+{
+  // This isn't the default behavior because it would break unit tests.
+  background_compression = 1;
+}
+
diff --git a/src/or/consdiffmgr.h b/src/or/consdiffmgr.h
index 048dae4..cbff599 100644
--- a/src/or/consdiffmgr.h
+++ b/src/or/consdiffmgr.h
@@ -32,6 +32,7 @@ consdiff_status_t consdiffmgr_find_diff_from(
                            compress_method_t method);
 void consdiffmgr_rescan(void);
 int consdiffmgr_cleanup(void);
+void consdiffmgr_enable_background_compression(void);
 void consdiffmgr_configure(const consdiff_cfg_t *cfg);
 struct sandbox_cfg_elem;
 int consdiffmgr_register_with_sandbox(struct sandbox_cfg_elem **cfg);





More information about the tor-commits mailing list