commit 3c4459bcbf1d3a9da4a8b3a8bfed10d2e045af74 Author: Alexander Færøy ahf@torproject.org Date: Mon Apr 17 14:57:37 2017 +0200
Refactor the streaming compression code.
This patch refactors our streaming compression code to allow us to extend it with non-zlib/non-gzip based compression schemas.
See https://bugs.torproject.org/21663 --- src/common/torgzip.c | 62 ++++++++++++++++++++++++------------------------- src/common/torgzip.h | 30 +++++++++++++----------- src/or/buffers.c | 20 +++++++++------- src/or/buffers.h | 2 +- src/or/circuitlist.c | 4 ++-- src/or/connection.c | 2 +- src/or/directory.c | 14 +++++------ src/or/dirserv.c | 2 +- src/or/or.h | 4 ++-- src/test/test_buffers.c | 12 +++++----- src/test/test_util.c | 28 +++++++++++----------- 11 files changed, 93 insertions(+), 87 deletions(-)
diff --git a/src/common/torgzip.c b/src/common/torgzip.c index cbfcabd..af4bbc5 100644 --- a/src/common/torgzip.c +++ b/src/common/torgzip.c @@ -399,9 +399,9 @@ detect_compression_method(const char *in, size_t in_len) } }
-/** Internal state for an incremental zlib compression/decompression. The - * body of this struct is not exposed. */ -struct tor_zlib_state_t { +/** Internal state for an incremental compression/decompression. The body of + * this struct is not exposed. */ +struct tor_compress_state_t { struct z_stream_s stream; /**< The zlib stream */ int compress; /**< True if we are compressing; false if we are inflating */
@@ -414,14 +414,13 @@ struct tor_zlib_state_t { size_t allocation; };
-/** Construct and return a tor_zlib_state_t object using <b>method</b>. If - * <b>compress</b>, it's for compression; otherwise it's for - * decompression. */ -tor_zlib_state_t * -tor_zlib_new(int compress_, compress_method_t method, - compression_level_t compression_level) +/** Construct and return a tor_compress_state_t object using <b>method</b>. If + * <b>compress</b>, it's for compression; otherwise it's for decompression. */ +tor_compress_state_t * +tor_compress_new(int compress_, compress_method_t method, + compression_level_t compression_level) { - tor_zlib_state_t *out; + tor_compress_state_t *out; int bits, memlevel;
if (! compress_) { @@ -430,7 +429,7 @@ tor_zlib_new(int compress_, compress_method_t method, compression_level = HIGH_COMPRESSION; }
- out = tor_malloc_zero(sizeof(tor_zlib_state_t)); + out = tor_malloc_zero(sizeof(tor_compress_state_t)); out->stream.zalloc = Z_NULL; out->stream.zfree = Z_NULL; out->stream.opaque = NULL; @@ -462,16 +461,17 @@ tor_zlib_new(int compress_, compress_method_t method, * to *<b>out</b>, adjusting the values as we go. If <b>finish</b> is true, * we've reached the end of the input. * - * Return TOR_ZLIB_DONE if we've finished the entire compression/decompression. - * Return TOR_ZLIB_OK if we're processed everything from the input. - * Return TOR_ZLIB_BUF_FULL if we're out of space on <b>out</b>. - * Return TOR_ZLIB_ERR if the stream is corrupt. + * Return TOR_COMPRESS_DONE if we've finished the entire + * compression/decompression. + * Return TOR_COMPRESS_OK if we're processed everything from the input. + * Return TOR_COMPRESS_BUFFER_FULL if we're out of space on <b>out</b>. + * Return TOR_COMPRESS_ERROR if the stream is corrupt. */ -tor_zlib_output_t -tor_zlib_process(tor_zlib_state_t *state, - char **out, size_t *out_len, - const char **in, size_t *in_len, - int finish) +tor_compress_output_t +tor_compress_process(tor_compress_state_t *state, + char **out, size_t *out_len, + const char **in, size_t *in_len, + int finish) { int err; tor_assert(*in_len <= UINT_MAX); @@ -498,31 +498,31 @@ tor_zlib_process(tor_zlib_state_t *state, if (! state->compress && is_compression_bomb(state->input_so_far, state->output_so_far)) { log_warn(LD_DIR, "Possible zlib bomb; abandoning stream."); - return TOR_ZLIB_ERR; + return TOR_COMPRESS_ERROR; }
switch (err) { case Z_STREAM_END: - return TOR_ZLIB_DONE; + return TOR_COMPRESS_DONE; case Z_BUF_ERROR: if (state->stream.avail_in == 0 && !finish) - return TOR_ZLIB_OK; - return TOR_ZLIB_BUF_FULL; + return TOR_COMPRESS_OK; + return TOR_COMPRESS_BUFFER_FULL; case Z_OK: if (state->stream.avail_out == 0 || finish) - return TOR_ZLIB_BUF_FULL; - return TOR_ZLIB_OK; + return TOR_COMPRESS_BUFFER_FULL; + return TOR_COMPRESS_OK; default: log_warn(LD_GENERAL, "Gzip returned an error: %s", state->stream.msg ? state->stream.msg : "<no message>"); - return TOR_ZLIB_ERR; + return TOR_COMPRESS_ERROR; } }
/** Deallocate <b>state</b>. */ void -tor_zlib_free(tor_zlib_state_t *state) +tor_compress_free(tor_compress_state_t *state) { if (!state) return; @@ -553,7 +553,7 @@ tor_zlib_state_size_precalc(int inflate_, int windowbits, int memlevel) that is, 32K for windowBits=15 (default value) plus a few kilobytes for small objects." */ - return sizeof(tor_zlib_state_t) + sizeof(struct z_stream_s) + + return sizeof(tor_compress_state_t) + sizeof(struct z_stream_s) + (1 << 15) + A_FEW_KILOBYTES; } else { /* Also from zconf.h: @@ -562,7 +562,7 @@ tor_zlib_state_size_precalc(int inflate_, int windowbits, int memlevel) (1 << (windowBits+2)) + (1 << (memLevel+9)) ... plus a few kilobytes for small objects." */ - return sizeof(tor_zlib_state_t) + sizeof(struct z_stream_s) + + return sizeof(tor_compress_state_t) + sizeof(struct z_stream_s) + (1 << (windowbits + 2)) + (1 << (memlevel + 9)) + A_FEW_KILOBYTES; } #undef A_FEW_KILOBYTES @@ -570,7 +570,7 @@ tor_zlib_state_size_precalc(int inflate_, int windowbits, int memlevel)
/** Return the approximate number of bytes allocated for <b>state</b>. */ size_t -tor_zlib_state_size(const tor_zlib_state_t *state) +tor_compress_state_size(const tor_compress_state_t *state) { return state->allocation; } diff --git a/src/common/torgzip.h b/src/common/torgzip.h index ac9763e..2b0504e 100644 --- a/src/common/torgzip.h +++ b/src/common/torgzip.h @@ -46,23 +46,27 @@ tor_zlib_get_header_version_str(void);
compress_method_t detect_compression_method(const char *in, size_t in_len);
-/** Return values from tor_zlib_process; see that function's documentation for - * details. */ +/** Return values from tor_compress_process; see that function's documentation + * for details. */ typedef enum { - TOR_ZLIB_OK, TOR_ZLIB_DONE, TOR_ZLIB_BUF_FULL, TOR_ZLIB_ERR -} tor_zlib_output_t; + TOR_COMPRESS_OK, + TOR_COMPRESS_DONE, + TOR_COMPRESS_BUFFER_FULL, + TOR_COMPRESS_ERROR +} tor_compress_output_t; /** Internal state for an incremental zlib compression/decompression. */ -typedef struct tor_zlib_state_t tor_zlib_state_t; -tor_zlib_state_t *tor_zlib_new(int compress, compress_method_t method, - compression_level_t level); +typedef struct tor_compress_state_t tor_compress_state_t; +tor_compress_state_t *tor_compress_new(int compress, + compress_method_t method, + compression_level_t level);
-tor_zlib_output_t tor_zlib_process(tor_zlib_state_t *state, - char **out, size_t *out_len, - const char **in, size_t *in_len, - int finish); -void tor_zlib_free(tor_zlib_state_t *state); +tor_compress_output_t tor_compress_process(tor_compress_state_t *state, + char **out, size_t *out_len, + const char **in, size_t *in_len, + int finish); +void tor_compress_free(tor_compress_state_t *state);
-size_t tor_zlib_state_size(const tor_zlib_state_t *state); +size_t tor_compress_state_size(const tor_compress_state_t *state); size_t tor_zlib_get_total_allocation(void);
#endif diff --git a/src/or/buffers.c b/src/or/buffers.c index e559f80..3490ce4 100644 --- a/src/or/buffers.c +++ b/src/or/buffers.c @@ -2088,11 +2088,11 @@ fetch_from_buf_line(buf_t *buf, char *data_out, size_t *data_len) }
/** Compress on uncompress the <b>data_len</b> bytes in <b>data</b> using the - * zlib state <b>state</b>, appending the result to <b>buf</b>. If + * compression state <b>state</b>, appending the result to <b>buf</b>. If * <b>done</b> is true, flush the data in the state and finish the * compression/uncompression. Return -1 on failure, 0 on success. */ int -write_to_buf_zlib(buf_t *buf, tor_zlib_state_t *state, +write_to_buf_zlib(buf_t *buf, tor_compress_state_t *state, const char *data, size_t data_len, int done) { @@ -2108,20 +2108,22 @@ write_to_buf_zlib(buf_t *buf, tor_zlib_state_t *state, } next = CHUNK_WRITE_PTR(buf->tail); avail = old_avail = CHUNK_REMAINING_CAPACITY(buf->tail); - switch (tor_zlib_process(state, &next, &avail, &data, &data_len, done)) { - case TOR_ZLIB_DONE: + switch (tor_compress_process(state, &next, &avail, + &data, &data_len, done)) { + case TOR_COMPRESS_DONE: over = 1; break; - case TOR_ZLIB_ERR: + case TOR_COMPRESS_ERROR: return -1; - case TOR_ZLIB_OK: + case TOR_COMPRESS_OK: if (data_len == 0) over = 1; break; - case TOR_ZLIB_BUF_FULL: + case TOR_COMPRESS_BUFFER_FULL: if (avail) { - /* Zlib says we need more room (ZLIB_BUF_FULL). Start a new chunk - * automatically, whether were going to or not. */ + /* The compression module says we need more room + * (TOR_COMPRESS_BUFFER_FULL). Start a new chunk automatically, + * whether were going to or not. */ need_new_chunk = 1; } break; diff --git a/src/or/buffers.h b/src/or/buffers.h index c6a5ffa..49a2ae4 100644 --- a/src/or/buffers.h +++ b/src/or/buffers.h @@ -36,7 +36,7 @@ int flush_buf(tor_socket_t s, buf_t *buf, size_t sz, size_t *buf_flushlen); int flush_buf_tls(tor_tls_t *tls, buf_t *buf, size_t sz, size_t *buf_flushlen);
int write_to_buf(const char *string, size_t string_len, buf_t *buf); -int write_to_buf_zlib(buf_t *buf, tor_zlib_state_t *state, +int write_to_buf_zlib(buf_t *buf, tor_compress_state_t *state, const char *data, size_t data_len, int done); int move_buf_to_buf(buf_t *buf_out, buf_t *buf_in, size_t *buf_flushlen); int fetch_from_buf(char *string, size_t string_len, buf_t *buf); diff --git a/src/or/circuitlist.c b/src/or/circuitlist.c index 80bb7f6..4f11586 100644 --- a/src/or/circuitlist.c +++ b/src/or/circuitlist.c @@ -1991,8 +1991,8 @@ single_conn_free_bytes(connection_t *conn) if (conn->type == CONN_TYPE_DIR) { dir_connection_t *dir_conn = TO_DIR_CONN(conn); if (dir_conn->zlib_state) { - result += tor_zlib_state_size(dir_conn->zlib_state); - tor_zlib_free(dir_conn->zlib_state); + result += tor_compress_state_size(dir_conn->zlib_state); + tor_compress_free(dir_conn->zlib_state); dir_conn->zlib_state = NULL; } } diff --git a/src/or/connection.c b/src/or/connection.c index 09e316d..ad97c3b 100644 --- a/src/or/connection.c +++ b/src/or/connection.c @@ -628,7 +628,7 @@ connection_free_(connection_t *conn) dir_connection_t *dir_conn = TO_DIR_CONN(conn); tor_free(dir_conn->requested_resource);
- tor_zlib_free(dir_conn->zlib_state); + tor_compress_free(dir_conn->zlib_state); if (dir_conn->spool) { SMARTLIST_FOREACH(dir_conn->spool, spooled_resource_t *, spooled, spooled_resource_free(spooled)); diff --git a/src/or/directory.c b/src/or/directory.c index e7a71dd..5ae61e4 100644 --- a/src/or/directory.c +++ b/src/or/directory.c @@ -3178,7 +3178,7 @@ handle_get_current_consensus(dir_connection_t *conn, write_http_response_header(conn, -1, compressed, smartlist_len(conn->spool) == 1 ? lifetime : 0); if (! compressed) - conn->zlib_state = tor_zlib_new(0, ZLIB_METHOD, HIGH_COMPRESSION); + conn->zlib_state = tor_compress_new(0, ZLIB_METHOD, HIGH_COMPRESSION);
/* Prime the connection with some data. */ const int initial_flush_result = connection_dirserv_flushed_some(conn); @@ -3276,8 +3276,8 @@ handle_get_status_vote(dir_connection_t *conn, const get_handler_args_t *args)
if (smartlist_len(items)) { if (compressed) { - conn->zlib_state = tor_zlib_new(1, ZLIB_METHOD, - choose_compression_level(estimated_len)); + conn->zlib_state = tor_compress_new(1, ZLIB_METHOD, + choose_compression_level(estimated_len)); SMARTLIST_FOREACH(items, const char *, c, connection_write_to_buf_zlib(c, strlen(c), conn, 0)); connection_write_to_buf_zlib("", 0, conn, 1); @@ -3335,7 +3335,7 @@ handle_get_microdesc(dir_connection_t *conn, const get_handler_args_t *args) write_http_response_header(conn, -1, compressed, MICRODESC_CACHE_LIFETIME);
if (compressed) - conn->zlib_state = tor_zlib_new(1, ZLIB_METHOD, + conn->zlib_state = tor_compress_new(1, ZLIB_METHOD, choose_compression_level(size_guess));
const int initial_flush_result = connection_dirserv_flushed_some(conn); @@ -3428,7 +3428,7 @@ handle_get_descriptor(dir_connection_t *conn, const get_handler_args_t *args) } write_http_response_header(conn, -1, compressed, cache_lifetime); if (compressed) - conn->zlib_state = tor_zlib_new(1, ZLIB_METHOD, + conn->zlib_state = tor_compress_new(1, ZLIB_METHOD, choose_compression_level(size_guess)); clear_spool = 0; /* Prime the connection with some data. */ @@ -3519,8 +3519,8 @@ handle_get_keys(dir_connection_t *conn, const get_handler_args_t *args)
write_http_response_header(conn, compressed?-1:len, compressed, 60*60); if (compressed) { - conn->zlib_state = tor_zlib_new(1, ZLIB_METHOD, - choose_compression_level(len)); + conn->zlib_state = tor_compress_new(1, ZLIB_METHOD, + choose_compression_level(len)); SMARTLIST_FOREACH(certs, authority_cert_t *, c, connection_write_to_buf_zlib(c->cache_info.signed_descriptor_body, c->cache_info.signed_descriptor_len, diff --git a/src/or/dirserv.c b/src/or/dirserv.c index 87afd69..186478c 100644 --- a/src/or/dirserv.c +++ b/src/or/dirserv.c @@ -3792,7 +3792,7 @@ connection_dirserv_flushed_some(dir_connection_t *conn) /* Flush the zlib state: there could be more bytes pending in there, and * we don't want to omit bytes. */ connection_write_to_buf_zlib("", 0, conn, 1); - tor_zlib_free(conn->zlib_state); + tor_compress_free(conn->zlib_state); conn->zlib_state = NULL; } return 0; diff --git a/src/or/or.h b/src/or/or.h index a7b3a66..6aec588 100644 --- a/src/or/or.h +++ b/src/or/or.h @@ -1773,8 +1773,8 @@ typedef struct dir_connection_t { /** List of spooled_resource_t for objects that we're spooling. We use * it from back to front. */ smartlist_t *spool; - /** The zlib object doing on-the-fly compression for spooled data. */ - tor_zlib_state_t *zlib_state; + /** The compression object doing on-the-fly compression for spooled data. */ + tor_compress_state_t *zlib_state;
/** What rendezvous service are we querying for? */ rend_data_t *rend_data; diff --git a/src/test/test_buffers.c b/src/test/test_buffers.c index f0edd42..a6f0309 100644 --- a/src/test/test_buffers.c +++ b/src/test/test_buffers.c @@ -584,12 +584,12 @@ test_buffers_zlib_impl(int finalize_with_nil) char *contents = NULL; char *expanded = NULL; buf_t *buf = NULL; - tor_zlib_state_t *zlib_state = NULL; + tor_compress_state_t *zlib_state = NULL; size_t out_len, in_len; int done;
buf = buf_new_with_capacity(128); /* will round up */ - zlib_state = tor_zlib_new(1, ZLIB_METHOD, HIGH_COMPRESSION); + zlib_state = tor_compress_new(1, ZLIB_METHOD, HIGH_COMPRESSION);
msg = tor_malloc(512); crypto_rand(msg, 512); @@ -621,7 +621,7 @@ test_buffers_zlib_impl(int finalize_with_nil)
done: buf_free(buf); - tor_zlib_free(zlib_state); + tor_compress_free(zlib_state); tor_free(contents); tor_free(expanded); tor_free(msg); @@ -647,7 +647,7 @@ test_buffers_zlib_fin_at_chunk_end(void *arg) char *contents = NULL; char *expanded = NULL; buf_t *buf = NULL; - tor_zlib_state_t *zlib_state = NULL; + tor_compress_state_t *zlib_state = NULL; size_t out_len, in_len; size_t sz, headerjunk; (void) arg; @@ -666,7 +666,7 @@ test_buffers_zlib_fin_at_chunk_end(void *arg) tt_uint_op(buf->head->datalen, OP_EQ, headerjunk); tt_uint_op(buf_datalen(buf), OP_EQ, headerjunk); /* Write an empty string, with finalization on. */ - zlib_state = tor_zlib_new(1, ZLIB_METHOD, HIGH_COMPRESSION); + zlib_state = tor_compress_new(1, ZLIB_METHOD, HIGH_COMPRESSION); tt_int_op(write_to_buf_zlib(buf, zlib_state, "", 0, 1), OP_EQ, 0);
in_len = buf_datalen(buf); @@ -687,7 +687,7 @@ test_buffers_zlib_fin_at_chunk_end(void *arg)
done: buf_free(buf); - tor_zlib_free(zlib_state); + tor_compress_free(zlib_state); tor_free(contents); tor_free(expanded); tor_free(msg); diff --git a/src/test/test_util.c b/src/test/test_util.c index 7e24279..dacf56f 100644 --- a/src/test/test_util.c +++ b/src/test/test_util.c @@ -2249,7 +2249,7 @@ test_util_gzip(void *arg) char *buf1=NULL, *buf2=NULL, *buf3=NULL, *cp1, *cp2; const char *ccp2; size_t len1, len2; - tor_zlib_state_t *state = NULL; + tor_compress_state_t *state = NULL;
(void)arg; buf1 = tor_strdup("AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAZAAAAAAAAAAAAAAAAAAAZ"); @@ -2320,21 +2320,21 @@ test_util_gzip(void *arg) tor_free(buf1); tor_free(buf2); tor_free(buf3); - state = tor_zlib_new(1, ZLIB_METHOD, HIGH_COMPRESSION); + state = tor_compress_new(1, ZLIB_METHOD, HIGH_COMPRESSION); tt_assert(state); cp1 = buf1 = tor_malloc(1024); len1 = 1024; ccp2 = "ABCDEFGHIJABCDEFGHIJ"; len2 = 21; - tt_assert(tor_zlib_process(state, &cp1, &len1, &ccp2, &len2, 0) - == TOR_ZLIB_OK); + tt_int_op(tor_compress_process(state, &cp1, &len1, &ccp2, &len2, 0), + OP_EQ, TOR_COMPRESS_OK); tt_int_op(0, OP_EQ, len2); /* Make sure we compressed it all. */ tt_assert(cp1 > buf1);
len2 = 0; cp2 = cp1; - tt_assert(tor_zlib_process(state, &cp1, &len1, &ccp2, &len2, 1) - == TOR_ZLIB_DONE); + tt_int_op(tor_compress_process(state, &cp1, &len1, &ccp2, &len2, 1), + OP_EQ, TOR_COMPRESS_DONE); tt_int_op(0, OP_EQ, len2); tt_assert(cp1 > cp2); /* Make sure we really added something. */
@@ -2346,7 +2346,7 @@ test_util_gzip(void *arg)
done: if (state) - tor_zlib_free(state); + tor_compress_free(state); tor_free(buf2); tor_free(buf3); tor_free(buf1); @@ -2364,7 +2364,7 @@ test_util_gzip_compression_bomb(void *arg) char *one_mb = tor_malloc_zero(one_million); char *result = NULL; size_t result_len = 0; - tor_zlib_state_t *state = NULL; + tor_compress_state_t *state = NULL;
/* Make sure we can't produce a compression bomb */ setup_full_capture_of_logs(LOG_WARN); @@ -2386,22 +2386,22 @@ test_util_gzip_compression_bomb(void *arg) ZLIB_METHOD, 0, LOG_WARN));
/* Now try streaming that. */ - state = tor_zlib_new(0, ZLIB_METHOD, HIGH_COMPRESSION); - tor_zlib_output_t r; + state = tor_compress_new(0, ZLIB_METHOD, HIGH_COMPRESSION); + tor_compress_output_t r; const char *inp = compression_bomb; size_t inlen = 1039; do { char *outp = one_mb; size_t outleft = 4096; /* small on purpose */ - r = tor_zlib_process(state, &outp, &outleft, &inp, &inlen, 0); + r = tor_compress_process(state, &outp, &outleft, &inp, &inlen, 0); tt_int_op(inlen, OP_NE, 0); - } while (r == TOR_ZLIB_BUF_FULL); + } while (r == TOR_COMPRESS_BUFFER_FULL);
- tt_int_op(r, OP_EQ, TOR_ZLIB_ERR); + tt_int_op(r, OP_EQ, TOR_COMPRESS_ERROR);
done: tor_free(one_mb); - tor_zlib_free(state); + tor_compress_free(state); }
/** Run unit tests for mmap() wrapper functionality. */