This is an automated email from the git hooks/post-receive script.
ahf pushed a commit to branch main in repository tor.
commit 2f865b4bba1bedc96f17b696916d74c392c83e1b Author: Mike Perry mikeperry-git@torproject.org AuthorDate: Sun Apr 2 21:06:20 2023 +0000
Prop#329 streams: Handle stream usage with conflux
This adds utility functions to help stream block decisions, as well as cpath layer_hint checks for stream cell acceptance, and syncing stream lists for conflux circuits.
These functions are then called throughout the codebase to properly manage conflux streams. --- src/core/or/circuitlist.c | 1 + src/core/or/circuitpadding.c | 7 +- src/core/or/circuituse.c | 21 +- src/core/or/conflux_util.c | 393 ++++++++++++++++++++++++++++++++ src/core/or/conflux_util.h | 59 +++++ src/core/or/congestion_control_common.c | 3 +- src/core/or/congestion_control_flow.c | 47 +--- src/core/or/congestion_control_flow.h | 2 - src/core/or/connection_edge.c | 6 + src/core/or/edge_connection_st.h | 10 +- src/core/or/or_circuit_st.h | 12 +- src/core/or/origin_circuit_st.h | 11 +- src/core/or/relay.c | 127 ++++++----- src/feature/relay/dns.c | 4 + 14 files changed, 596 insertions(+), 107 deletions(-)
diff --git a/src/core/or/circuitlist.c b/src/core/or/circuitlist.c index 01baf7c795..8c5beebbf3 100644 --- a/src/core/or/circuitlist.c +++ b/src/core/or/circuitlist.c @@ -2368,6 +2368,7 @@ circuit_about_to_free(circuit_t *circ) if (! CIRCUIT_IS_ORIGIN(circ)) { or_circuit_t *or_circ = TO_OR_CIRCUIT(circ); edge_connection_t *conn; + for (conn=or_circ->n_streams; conn; conn=conn->next_stream) connection_edge_destroy(or_circ->p_circ_id, conn); or_circ->n_streams = NULL; diff --git a/src/core/or/circuitpadding.c b/src/core/or/circuitpadding.c index 99dc5f9d83..590a2d60c9 100644 --- a/src/core/or/circuitpadding.c +++ b/src/core/or/circuitpadding.c @@ -78,6 +78,8 @@ #include "core/crypto/relay_crypto.h" #include "feature/nodelist/nodelist.h"
+#include "src/core/or/conflux_util.h" + #include "app/config/config.h"
static inline circpad_circuit_state_t circpad_circuit_state( @@ -251,8 +253,11 @@ circpad_marked_circuit_for_padding(circuit_t *circ, int reason) * has shut down, but using the MaxCircuitDirtiness timer instead of * the idle circuit timer (again, we want this because we're not * supposed to look idle to Guard nodes that can see our lifespan). */ - if (!circ->timestamp_dirty) + if (!circ->timestamp_dirty) { circ->timestamp_dirty = approx_time(); + if (circ->conflux && CIRCUIT_IS_ORIGIN(circ)) + conflux_sync_circ_fields(circ->conflux, TO_ORIGIN_CIRCUIT(circ)); + }
/* Take ownership of the circuit */ circuit_change_purpose(circ, CIRCUIT_PURPOSE_C_CIRCUIT_PADDING); diff --git a/src/core/or/circuituse.c b/src/core/or/circuituse.c index 25401aea55..f5d5cb4397 100644 --- a/src/core/or/circuituse.c +++ b/src/core/or/circuituse.c @@ -64,6 +64,7 @@ #include "lib/time/tvdiff.h" #include "lib/trace/events.h" #include "src/core/mainloop/mainloop.h" +#include "core/or/conflux.h"
#include "core/or/cpath_build_state_st.h" #include "feature/dircommon/dir_connection_st.h" @@ -700,7 +701,6 @@ circuit_expire_building(void) } else { /* circuit not open, consider recording failure as timeout */ int first_hop_succeeded = TO_ORIGIN_CIRCUIT(victim)->cpath && TO_ORIGIN_CIRCUIT(victim)->cpath->state == CPATH_STATE_OPEN; - if (TO_ORIGIN_CIRCUIT(victim)->p_streams != NULL) { log_warn(LD_BUG, "Circuit %d (purpose %d, %s) has timed out, " "yet has attached streams!", @@ -1351,6 +1351,7 @@ circuit_detach_stream(circuit_t *circ, edge_connection_t *conn) int removed = 0; if (conn == origin_circ->p_streams) { origin_circ->p_streams = conn->next_stream; + conflux_update_p_streams(origin_circ, conn->next_stream); removed = 1; } else { for (prevconn = origin_circ->p_streams; @@ -1383,10 +1384,12 @@ circuit_detach_stream(circuit_t *circ, edge_connection_t *conn) or_circuit_t *or_circ = TO_OR_CIRCUIT(circ); if (conn == or_circ->n_streams) { or_circ->n_streams = conn->next_stream; + conflux_update_n_streams(or_circ, conn->next_stream); return; } if (conn == or_circ->resolving_streams) { or_circ->resolving_streams = conn->next_stream; + conflux_update_resolving_streams(or_circ, conn->next_stream); return; }
@@ -2556,7 +2559,13 @@ circuit_get_open_circ_or_launch(entry_connection_t *conn, }
/** Return true iff <b>crypt_path</b> is one of the crypt_paths for - * <b>circ</b>. */ + * <b>circ</b>. + * + * WARNING: This function only validates that the cpath is on the *current* + * circuit, for internal consistency checking. For codepaths involving streams, + * or cpaths or layer_hints that could be from a different circuit due to + * conflux, use edge_uses_cpath() or conflux_validate_source_hop() instead. + */ static int cpath_is_on_circuit(origin_circuit_t *circ, crypt_path_t *crypt_path) { @@ -2594,6 +2603,7 @@ link_apconn_to_circ(entry_connection_t *apconn, origin_circuit_t *circ, ENTRY_TO_EDGE_CONN(apconn)->on_circuit = TO_CIRCUIT(circ); /* assert_connection_ok(conn, time(NULL)); */ circ->p_streams = ENTRY_TO_EDGE_CONN(apconn); + conflux_update_p_streams(circ, ENTRY_TO_EDGE_CONN(apconn));
if (connection_edge_is_rendezvous_stream(ENTRY_TO_EDGE_CONN(apconn))) { /* We are attaching a stream to a rendezvous circuit. That means @@ -2733,6 +2743,9 @@ connection_ap_handshake_attach_chosen_circuit(entry_connection_t *conn, /* When stream isolation is in use and controlled by an application * we are willing to keep using the stream. */ circ->base_.timestamp_dirty = approx_time(); + if (TO_CIRCUIT(circ)->conflux) { + conflux_sync_circ_fields(TO_CIRCUIT(circ)->conflux, circ); + } }
pathbias_count_use_attempt(circ); @@ -3103,6 +3116,10 @@ mark_circuit_unusable_for_new_conns(origin_circuit_t *circ) circ->base_.timestamp_dirty -= options->MaxCircuitDirtiness;
circ->unusable_for_new_conns = 1; + + if (TO_CIRCUIT(circ)->conflux) { + conflux_sync_circ_fields(TO_CIRCUIT(circ)->conflux, circ); + } }
/** diff --git a/src/core/or/conflux_util.c b/src/core/or/conflux_util.c new file mode 100644 index 0000000000..855d477428 --- /dev/null +++ b/src/core/or/conflux_util.c @@ -0,0 +1,393 @@ +/* Copyright (c) 2021, The Tor Project, Inc. */ +/* See LICENSE for licensing information */ + +/** + * \file conflux_util.c + * \brief Conflux utility functions for stream blocking and management. + */ + +#define TOR_CONFLUX_PRIVATE + +#include "core/or/or.h" + +#include "core/or/circuit_st.h" +#include "core/or/sendme.h" +#include "core/or/congestion_control_common.h" +#include "core/or/congestion_control_st.h" +#include "core/or/circuitlist.h" +#include "core/or/origin_circuit_st.h" +#include "core/or/or_circuit_st.h" +#include "core/or/conflux.h" +#include "core/or/conflux_params.h" +#include "core/or/conflux_util.h" +#include "core/or/conflux_st.h" +#include "lib/time/compat_time.h" +#include "app/config/config.h" + +/** + * This is a utility function that returns the package window circuit, + * regardless of if it has a conflux pair or not. + */ +int +circuit_get_package_window(circuit_t *circ, + const crypt_path_t *cpath) +{ + if (circ->conflux) { + if (CIRCUIT_IS_ORIGIN(circ)) { + tor_assert_nonfatal(circ->purpose == + CIRCUIT_PURPOSE_CONFLUX_LINKED); + } + circ = conflux_decide_next_circ(circ->conflux); + + /* If conflux has no circuit to send on, the package window is 0. */ + if (!circ) { + return 0; + } + + /* If we are the origin, we need to get the last hop's cpath for + * congestion control information. */ + if (CIRCUIT_IS_ORIGIN(circ)) { + cpath = CONST_TO_ORIGIN_CIRCUIT(circ)->cpath->prev; + } else { + if (BUG(cpath != NULL)) { + log_warn(LD_BUG, "cpath is not NULL for non-origin circuit"); + } + } + } + + return congestion_control_get_package_window(circ, cpath); +} + +/** + * Returns true if conflux can send a data cell. + * + * Used to decide if we should block streams or not, for + * proccess_sendme_cell(), circuit_resume_edge_reading(), + * circuit_consider_stop_edge_reading(), circuit_resume_edge_reading_helper(), + * channel_flush_from_first_active_circuit() +*/ +bool +conflux_can_send(conflux_t *cfx) +{ + const circuit_t *send_circ = conflux_decide_next_circ(cfx); + + /* If we have a circuit, we can send */ + if (send_circ) { + return true; + } else { + return false; + } +} + +/** + * For a given conflux circuit, return the cpath of the destination. + * + * The cpath destination is the last hop of the circuit, or NULL if + * the circuit is a non-origin circuit. + */ +crypt_path_t * +conflux_get_destination_hop(circuit_t *circ) +{ + if (BUG(!circ)) { + log_warn(LD_BUG, "No circuit to send on for conflux"); + return NULL; + } else { + /* Conflux circuits always send multiplexed relay commands to + * to the last hop. (Non-multiplexed commands go on their + * original circuit and hop). */ + if (CIRCUIT_IS_ORIGIN(circ)) { + return TO_ORIGIN_CIRCUIT(circ)->cpath->prev; + } else { + return NULL; + } + } +} + +/** + * Validates that the source of a cell is from the last hop of the circuit + * for origin circuits, and that there are no further hops for non-origin + * circuits. + */ +bool +conflux_validate_source_hop(circuit_t *in_circ, + crypt_path_t *layer_hint) +{ + crypt_path_t *dest = conflux_get_destination_hop(in_circ); + + if (dest != layer_hint) { + log_warn(LD_CIRC, "Got conflux command from incorrect hop"); + return false; + } + + if (layer_hint == NULL) { + /* We should not have further hops attached to this circuit */ + if (in_circ->n_chan) { + log_warn(LD_BUG, "Got conflux command on circuit with further hops"); + return false; + } + } + return true; +} + +/** + * Returns true if the edge connection uses the given cpath. + * + * If there is a conflux object, we inspect all the last hops of the conflux + * circuits. + */ +bool +edge_uses_cpath(const edge_connection_t *conn, + const crypt_path_t *cpath) +{ + if (!conn->on_circuit) + return false; + + if (CIRCUIT_IS_ORIGIN(conn->on_circuit)) { + if (conn->on_circuit->conflux) { + tor_assert_nonfatal(conn->on_circuit->purpose == + CIRCUIT_PURPOSE_CONFLUX_LINKED); + + /* If the circuit is an origin circuit with a conflux object, the cpath + * is valid if it came from any of the conflux circuit's last hops. */ + CONFLUX_FOR_EACH_LEG_BEGIN(conn->on_circuit->conflux, leg) { + const origin_circuit_t *ocirc = CONST_TO_ORIGIN_CIRCUIT(leg->circ); + if (ocirc->cpath->prev == cpath) { + return true; + } + } CONFLUX_FOR_EACH_LEG_END(leg); + } else { + return cpath == conn->cpath_layer; + } + } else { + /* For non-origin circuits, cpath should be null */ + return cpath == NULL; + } + + return false; +} + +/** + * Returns the max RTT for the circuit that carries this stream, + * as observed by congestion control. For conflux circuits, + * we return the max RTT across all circuits. + */ +uint64_t +edge_get_max_rtt(const edge_connection_t *stream) +{ + if (!stream->on_circuit) + return 0; + + if (stream->on_circuit->conflux) { + tor_assert_nonfatal(stream->on_circuit->purpose == + CIRCUIT_PURPOSE_CONFLUX_LINKED); + + /* Find the max rtt from the ccontrol object of each circuit. */ + uint64_t max_rtt = 0; + CONFLUX_FOR_EACH_LEG_BEGIN(stream->on_circuit->conflux, leg) { + const congestion_control_t *cc = circuit_ccontrol(leg->circ); + if (cc->max_rtt_usec > max_rtt) { + max_rtt = cc->max_rtt_usec; + } + } CONFLUX_FOR_EACH_LEG_END(leg); + + return max_rtt; + } else { + if (stream->on_circuit && stream->on_circuit->ccontrol) + return stream->on_circuit->ccontrol->max_rtt_usec; + else if (stream->cpath_layer && stream->cpath_layer->ccontrol) + return stream->cpath_layer->ccontrol->max_rtt_usec; + } + + return 0; +} + +/** + * Return true iff our decryption layer_hint is from the last hop + * in a circuit. + */ +bool +relay_crypt_from_last_hop(const origin_circuit_t *circ, + const crypt_path_t *layer_hint) +{ + tor_assert(circ); + tor_assert(layer_hint); + tor_assert(circ->cpath); + + if (TO_CIRCUIT(circ)->conflux) { + tor_assert_nonfatal(TO_CIRCUIT(circ)->purpose == + CIRCUIT_PURPOSE_CONFLUX_LINKED); + + /* If we are a conflux circuit, we need to check if the layer_hint + * is from the last hop of any of the conflux circuits. */ + CONFLUX_FOR_EACH_LEG_BEGIN(TO_CIRCUIT(circ)->conflux, leg) { + const origin_circuit_t *ocirc = CONST_TO_ORIGIN_CIRCUIT(leg->circ); + if (layer_hint == ocirc->cpath->prev) { + return true; + } + } CONFLUX_FOR_EACH_LEG_END(leg); + + log_fn(LOG_PROTOCOL_WARN, LD_CIRC, + "Got unexpected relay data from intermediate hop"); + return false; + } else { + if (layer_hint != circ->cpath->prev) { + log_fn(LOG_PROTOCOL_WARN, LD_CIRC, + "Got unexpected relay data from intermediate hop"); + return false; + } + return true; + } +} + +/** + * Update the head of the n_streams list on all circuits in the conflux + * set. + */ +void +conflux_update_p_streams(origin_circuit_t *circ, edge_connection_t *stream) +{ + tor_assert(circ); + + if (TO_CIRCUIT(circ)->conflux) { + tor_assert_nonfatal(TO_CIRCUIT(circ)->purpose == + CIRCUIT_PURPOSE_CONFLUX_LINKED); + CONFLUX_FOR_EACH_LEG_BEGIN(TO_CIRCUIT(circ)->conflux, leg) { + TO_ORIGIN_CIRCUIT(leg->circ)->p_streams = stream; + } CONFLUX_FOR_EACH_LEG_END(leg); + } +} + +/** + * Sync the next_stream_id, timestamp_dirty, and circuit_idle_timeout + * fields of a conflux set to the values in a particular circuit. + * + * This is called upon link, and whenever one of these fields + * changes on ref_circ. The ref_circ values are copied to all + * other circuits in the conflux set. +*/ +void +conflux_sync_circ_fields(conflux_t *cfx, origin_circuit_t *ref_circ) +{ + tor_assert(cfx); + tor_assert(ref_circ); + + CONFLUX_FOR_EACH_LEG_BEGIN(cfx, leg) { + if (leg->circ == TO_CIRCUIT(ref_circ)) { + continue; + } + origin_circuit_t *ocirc = TO_ORIGIN_CIRCUIT(leg->circ); + ocirc->next_stream_id = ref_circ->next_stream_id; + leg->circ->timestamp_dirty = TO_CIRCUIT(ref_circ)->timestamp_dirty; + ocirc->circuit_idle_timeout = ref_circ->circuit_idle_timeout; + ocirc->unusable_for_new_conns = ref_circ->unusable_for_new_conns; + } CONFLUX_FOR_EACH_LEG_END(leg); +} + +/** + * Update the head of the n_streams list on all circuits in the conflux + * set. + */ +void +conflux_update_n_streams(or_circuit_t *circ, edge_connection_t *stream) +{ + tor_assert(circ); + + if (TO_CIRCUIT(circ)->conflux) { + CONFLUX_FOR_EACH_LEG_BEGIN(TO_CIRCUIT(circ)->conflux, leg) { + TO_OR_CIRCUIT(leg->circ)->n_streams = stream; + } CONFLUX_FOR_EACH_LEG_END(leg); + } +} + +/** + * Update the head of the resolving_streams list on all circuits in the conflux + * set. + */ +void +conflux_update_resolving_streams(or_circuit_t *circ, edge_connection_t *stream) +{ + tor_assert(circ); + + if (TO_CIRCUIT(circ)->conflux) { + CONFLUX_FOR_EACH_LEG_BEGIN(TO_CIRCUIT(circ)->conflux, leg) { + TO_OR_CIRCUIT(leg->circ)->resolving_streams = stream; + } CONFLUX_FOR_EACH_LEG_END(leg); + } +} + +/** + * Update the half_streams list on all circuits in the conflux + */ +void +conflux_update_half_streams(origin_circuit_t *circ, smartlist_t *half_streams) +{ + tor_assert(circ); + + if (TO_CIRCUIT(circ)->conflux) { + tor_assert_nonfatal(TO_CIRCUIT(circ)->purpose == + CIRCUIT_PURPOSE_CONFLUX_LINKED); + CONFLUX_FOR_EACH_LEG_BEGIN(TO_CIRCUIT(circ)->conflux, leg) { + TO_ORIGIN_CIRCUIT(leg->circ)->half_streams = half_streams; + } CONFLUX_FOR_EACH_LEG_END(leg); + } +} + +/** + * Helper function that emits non-fatal asserts if the stream lists + * or next_stream_id is out of sync between any of the conflux legs. +*/ +void +conflux_validate_stream_lists(const conflux_t *cfx) +{ + const conflux_leg_t *first_leg = smartlist_get(cfx->legs, 0); + tor_assert(first_leg); + + /* Compare the stream lists of the first leg to all other legs. */ + if (CIRCUIT_IS_ORIGIN(first_leg->circ)) { + const origin_circuit_t *f_circ = + CONST_TO_ORIGIN_CIRCUIT(first_leg->circ); + + CONFLUX_FOR_EACH_LEG_BEGIN(cfx, leg) { + const origin_circuit_t *l_circ = CONST_TO_ORIGIN_CIRCUIT(leg->circ); + tor_assert_nonfatal(l_circ->p_streams == f_circ->p_streams); + tor_assert_nonfatal(l_circ->half_streams == f_circ->half_streams); + tor_assert_nonfatal(l_circ->next_stream_id == f_circ->next_stream_id); + } CONFLUX_FOR_EACH_LEG_END(leg); + } else { + const or_circuit_t *f_circ = CONST_TO_OR_CIRCUIT(first_leg->circ); + CONFLUX_FOR_EACH_LEG_BEGIN(cfx, leg) { + const or_circuit_t *l_circ = CONST_TO_OR_CIRCUIT(leg->circ); + tor_assert_nonfatal(l_circ->n_streams == f_circ->n_streams); + tor_assert_nonfatal(l_circ->resolving_streams == + f_circ->resolving_streams); + } CONFLUX_FOR_EACH_LEG_END(leg); + } +} + +/** + * Validate the conflux set has two legs, and both circuits have + * no nonce, and for origin circuits, the purpose is CONFLUX_PURPOSE_LINKED. + */ +void +conflux_validate_legs(const conflux_t *cfx) +{ + tor_assert(cfx); + // TODO-329-UDP: Eventually we want to allow three legs for the + // exit case, to allow reconnection of legs to hit an RTT target. + // For now, this validation helps find bugs. + if (BUG(smartlist_len(cfx->legs) > conflux_params_get_num_legs_set())) { + log_warn(LD_BUG, "Number of legs is above maximum of %d allowed: %d\n", + conflux_params_get_num_legs_set(), smartlist_len(cfx->legs)); + } + + CONFLUX_FOR_EACH_LEG_BEGIN(cfx, leg) { + /* Ensure we have no pending nonce on the circ */ + tor_assert_nonfatal(leg->circ->conflux_pending_nonce == NULL); + tor_assert_nonfatal(leg->circ->conflux != NULL); + + if (CIRCUIT_IS_ORIGIN(leg->circ)) { + tor_assert_nonfatal(leg->circ->purpose == + CIRCUIT_PURPOSE_CONFLUX_LINKED); + } + } CONFLUX_FOR_EACH_LEG_END(leg); +} diff --git a/src/core/or/conflux_util.h b/src/core/or/conflux_util.h new file mode 100644 index 0000000000..c556ae1848 --- /dev/null +++ b/src/core/or/conflux_util.h @@ -0,0 +1,59 @@ +/* Copyright (c) 2023, The Tor Project, Inc. */ +/* See LICENSE for licensing information */ + +/** + * \file conflux_util.h + * \brief Header file for conflux_util.c. + **/ + +#ifndef TOR_CONFLUX_UTIL_H +#define TOR_CONFLUX_UTIL_H + +/* Forward decls */ +typedef struct edge_connection_t edge_connection_t; +typedef struct crypt_path_t crypt_path_t; +typedef struct origin_circuit_t origin_circuit_t; +typedef struct conflux_t conflux_t; + +/* True iff the given circuit_t circ is conflux related. */ +static inline bool +CIRCUIT_IS_CONFLUX(const circuit_t *circ) +{ + if (circ->conflux_pending_nonce) { + if (CIRCUIT_IS_ORIGIN(circ)) + tor_assert_nonfatal(circ->purpose == CIRCUIT_PURPOSE_CONFLUX_UNLINKED); + return true; + } else if (circ->conflux) { + if (CIRCUIT_IS_ORIGIN(circ)) + tor_assert_nonfatal(circ->purpose == CIRCUIT_PURPOSE_CONFLUX_LINKED); + return true; + } else { + tor_assert_nonfatal(circ->purpose != CIRCUIT_PURPOSE_CONFLUX_LINKED); + tor_assert_nonfatal(circ->purpose != CIRCUIT_PURPOSE_CONFLUX_UNLINKED); + return false; + } +} + +int circuit_get_package_window(circuit_t *circ, + const crypt_path_t *cpath); +bool conflux_can_send(conflux_t *cfx); + +bool edge_uses_cpath(const edge_connection_t *conn, + const crypt_path_t *cpath); +crypt_path_t *conflux_get_destination_hop(circuit_t *circ); +bool conflux_validate_source_hop(circuit_t *in_circ, + crypt_path_t *layer_hint); +uint64_t edge_get_max_rtt(const edge_connection_t *stream); +bool relay_crypt_from_last_hop(const origin_circuit_t *circ, + const crypt_path_t *layer_hint); + +void conflux_update_p_streams(origin_circuit_t *, edge_connection_t *); +void conflux_update_half_streams(origin_circuit_t *, smartlist_t *); +void conflux_update_n_streams(or_circuit_t *, edge_connection_t *); +void conflux_update_resolving_streams(or_circuit_t *, edge_connection_t *); +void conflux_sync_circ_fields(conflux_t *cfx, origin_circuit_t *ref_circ); +void conflux_validate_stream_lists(const conflux_t *cfx); +void conflux_validate_legs(const conflux_t *cfx); + +#endif /* TOR_CONFLUX_UTIL_H */ + diff --git a/src/core/or/congestion_control_common.c b/src/core/or/congestion_control_common.c index c7c950d0c8..1e0f504df1 100644 --- a/src/core/or/congestion_control_common.c +++ b/src/core/or/congestion_control_common.c @@ -24,6 +24,7 @@ #include "core/or/congestion_control_westwood.h" #include "core/or/congestion_control_st.h" #include "core/or/conflux.h" +#include "core/or/conflux_util.h" #include "core/or/trace_probes_cc.h" #include "lib/time/compat_time.h" #include "feature/nodelist/networkstatus.h" @@ -703,7 +704,7 @@ circuit_has_active_streams(const circuit_t *circ, if (conn->base_.marked_for_close) continue;
- if (!layer_hint || conn->cpath_layer == layer_hint) { + if (edge_uses_cpath(conn, layer_hint)) { if (connection_get_inbuf_len(TO_CONN(conn)) > 0) { log_info(LD_CIRC, "CC: More in edge inbuf..."); return 1; diff --git a/src/core/or/congestion_control_flow.c b/src/core/or/congestion_control_flow.c index 90b1927ef9..fa9455a8a1 100644 --- a/src/core/or/congestion_control_flow.c +++ b/src/core/or/congestion_control_flow.c @@ -28,6 +28,7 @@ #include "core/or/connection_st.h" #include "core/or/cell_st.h" #include "app/config/config.h" +#include "core/or/conflux_util.h"
/** Cache consensus parameters */ static uint32_t xoff_client; @@ -60,27 +61,6 @@ double cc_stats_flow_xon_outbuf_ma = 0; #define ONE_MEGABYTE (UINT64_C(1) << 20) #define TOTAL_XMIT_SCALE_AT (10 * ONE_MEGABYTE)
-/** - * Return the congestion control object of the given edge connection. - * - * Returns NULL if the edge connection doesn't have a cpath_layer or not - * attached to a circuit. But also if the cpath_layer or circuit doesn't have a - * congestion control object. - */ -static inline const congestion_control_t * -edge_get_ccontrol(const edge_connection_t *edge) -{ - congestion_control_t *ccontrol = NULL; - - if (edge->on_circuit && edge->on_circuit->ccontrol) { - ccontrol = edge->on_circuit->ccontrol; - } else if (edge->cpath_layer && edge->cpath_layer->ccontrol) { - ccontrol = edge->cpath_layer->ccontrol; - } - - return ccontrol; -} - /** * Update global congestion control related consensus parameter values, every * consensus update. @@ -265,13 +245,13 @@ circuit_process_stream_xoff(edge_connection_t *conn, }
/* Make sure this XOFF came from the right hop */ - if (layer_hint && layer_hint != conn->cpath_layer) { + if (!edge_uses_cpath(conn, layer_hint)) { log_fn(LOG_PROTOCOL_WARN, LD_EDGE, "Got XOFF from wrong hop."); return false; }
- if (edge_get_ccontrol(conn) == NULL) { + if (!edge_uses_flow_control(conn)) { log_fn(LOG_PROTOCOL_WARN, LD_EDGE, "Got XOFF for non-congestion control circuit"); return false; @@ -359,13 +339,13 @@ circuit_process_stream_xon(edge_connection_t *conn, }
/* Make sure this XON came from the right hop */ - if (layer_hint && layer_hint != conn->cpath_layer) { + if (!edge_uses_cpath(conn, layer_hint)) { log_fn(LOG_PROTOCOL_WARN, LD_EDGE, "Got XON from wrong hop."); return false; }
- if (edge_get_ccontrol(conn) == NULL) { + if (!edge_uses_flow_control(conn)) { log_fn(LOG_PROTOCOL_WARN, LD_EDGE, "Got XON for non-congestion control circuit"); return false; @@ -464,7 +444,7 @@ flow_control_decide_xoff(edge_connection_t *stream) size_t total_buffered = connection_get_outbuf_len(TO_CONN(stream)); uint32_t buffer_limit_xoff = 0;
- if (BUG(edge_get_ccontrol(stream) == NULL)) { + if (BUG(!edge_uses_flow_control(stream))) { log_err(LD_BUG, "Flow control called for non-congestion control circuit"); return -1; } @@ -717,21 +697,6 @@ edge_uses_flow_control(const edge_connection_t *stream) return ret; }
-/** - * Returns the max RTT for the circuit that carries this stream, - * as observed by congestion control. - */ -uint64_t -edge_get_max_rtt(const edge_connection_t *stream) -{ - if (stream->on_circuit && stream->on_circuit->ccontrol) - return stream->on_circuit->ccontrol->max_rtt_usec; - else if (stream->cpath_layer && stream->cpath_layer->ccontrol) - return stream->cpath_layer->ccontrol->max_rtt_usec; - - return 0; -} - /** Returns true if a connection is an edge conn that uses flow control */ bool conn_uses_flow_control(connection_t *conn) diff --git a/src/core/or/congestion_control_flow.h b/src/core/or/congestion_control_flow.h index 5c735cce23..05e25c44d0 100644 --- a/src/core/or/congestion_control_flow.h +++ b/src/core/or/congestion_control_flow.h @@ -31,8 +31,6 @@ bool edge_uses_flow_control(const edge_connection_t *stream);
bool conn_uses_flow_control(connection_t *stream);
-uint64_t edge_get_max_rtt(const edge_connection_t *); - /** Metricsport externs */ extern uint64_t cc_stats_flow_num_xoff_sent; extern uint64_t cc_stats_flow_num_xon_sent; diff --git a/src/core/or/connection_edge.c b/src/core/or/connection_edge.c index b0ccedc27f..ee6ab8596c 100644 --- a/src/core/or/connection_edge.c +++ b/src/core/or/connection_edge.c @@ -70,6 +70,7 @@ #include "core/or/circuitpadding.h" #include "core/or/connection_edge.h" #include "core/or/congestion_control_flow.h" +#include "core/or/conflux_util.h" #include "core/or/circuitstats.h" #include "core/or/connection_or.h" #include "core/or/extendinfo.h" @@ -628,6 +629,7 @@ connection_half_edge_add(const edge_connection_t *conn,
if (!circ->half_streams) { circ->half_streams = smartlist_new(); + conflux_update_half_streams(circ, circ->half_streams); }
half_conn->stream_id = conn->stream_id; @@ -3102,6 +3104,10 @@ get_unique_stream_id_by_circ(origin_circuit_t *circ) test_stream_id)) goto again;
+ if (TO_CIRCUIT(circ)->conflux) { + conflux_sync_circ_fields(TO_CIRCUIT(circ)->conflux, circ); + } + return test_stream_id; }
diff --git a/src/core/or/edge_connection_st.h b/src/core/or/edge_connection_st.h index 22f9040d15..e8a3039b33 100644 --- a/src/core/or/edge_connection_st.h +++ b/src/core/or/edge_connection_st.h @@ -28,11 +28,15 @@ struct edge_connection_t { * circuit? */ int deliver_window; /**< How many more relay cells can end at me? */
- struct circuit_t *on_circuit; /**< The circuit (if any) that this edge - * connection is using. */ + /** The circuit (if any) that this edge connection is using. + * Note that edges that use conflux should use the helpers + * in conflux_util.c instead of accessing this directly. */ + struct circuit_t *on_circuit;
/** A pointer to which node in the circ this conn exits at. Set for AP - * connections and for hidden service exit connections. */ + * connections and for hidden service exit connections. + * Note that edges that use conflux should use the helpers + * in conflux_util.c instead of accessing this directly. */ struct crypt_path_t *cpath_layer;
/* Hidden service connection identifier for edge connections. Used by the HS diff --git a/src/core/or/or_circuit_st.h b/src/core/or/or_circuit_st.h index 11695ec301..d5a7007928 100644 --- a/src/core/or/or_circuit_st.h +++ b/src/core/or/or_circuit_st.h @@ -35,10 +35,18 @@ struct or_circuit_t { cell_queue_t p_chan_cells; /** The channel that is previous in this circuit. */ channel_t *p_chan; - /** Linked list of Exit streams associated with this circuit. */ + /** Linked list of Exit streams associated with this circuit. + * + * Note that any updates to this pointer must be followed with + * conflux_update_n_streams() to keep the other legs n_streams + * in sync. */ edge_connection_t *n_streams; /** Linked list of Exit streams associated with this circuit that are - * still being resolved. */ + * still being resolved. + * + * Just like with n_streams, any updates to this pointer must + * be followed with conflux_update_resolving_streams(). + */ edge_connection_t *resolving_streams;
/** Cryptographic state used for encrypting and authenticating relay diff --git a/src/core/or/origin_circuit_st.h b/src/core/or/origin_circuit_st.h index 73b971f72d..c5c255bb49 100644 --- a/src/core/or/origin_circuit_st.h +++ b/src/core/or/origin_circuit_st.h @@ -80,11 +80,18 @@ struct origin_circuit_t { circuit_t base_;
/** Linked list of AP streams (or EXIT streams if hidden service) - * associated with this circuit. */ + * associated with this circuit. + * + * Any updates to this pointer must be followed with + * conflux_update_p_streams(). */ edge_connection_t *p_streams;
/** Smartlist of half-closed streams (half_edge_t*) that still - * have pending activity */ + * have pending activity. + * + * Any updates to this pointer must be followed with + * conflux_update_half_streams(). + */ smartlist_t *half_streams;
/** Bytes read on this circuit since last call to diff --git a/src/core/or/relay.c b/src/core/or/relay.c index 827f0c3e46..58e48df902 100644 --- a/src/core/or/relay.c +++ b/src/core/or/relay.c @@ -122,7 +122,8 @@ static int connection_edge_process_ordered_relay_cell(cell_t *cell, edge_connection_t *conn, crypt_path_t *layer_hint, relay_header_t *rh); -static void set_block_state_for_streams(edge_connection_t *stream_list, +static void set_block_state_for_streams(circuit_t *circ, + edge_connection_t *stream_list, int block, streamid_t stream_id);
/** Stats: how many relay cells have originated at this hop, or have @@ -455,7 +456,7 @@ relay_lookup_conn(circuit_t *circ, cell_t *cell, tmpconn=tmpconn->next_stream) { if (rh.stream_id == tmpconn->stream_id && !tmpconn->base_.marked_for_close && - tmpconn->cpath_layer == layer_hint) { + edge_uses_cpath(tmpconn, layer_hint)) { log_debug(LD_APP,"found conn for stream %d.", rh.stream_id); return tmpconn; } @@ -1549,25 +1550,6 @@ connection_edge_process_relay_cell_not_open( // return -1; }
-/** - * Return true iff our decryption layer_hint is from the last hop - * in a circuit. - */ -static bool -relay_crypt_from_last_hop(origin_circuit_t *circ, crypt_path_t *layer_hint) -{ - tor_assert(circ); - tor_assert(layer_hint); - tor_assert(circ->cpath); - - if (layer_hint != circ->cpath->prev) { - log_fn(LOG_PROTOCOL_WARN, LD_CIRC, - "Got unexpected relay data from intermediate hop"); - return false; - } - return true; -} - /** Process a SENDME cell that arrived on <b>circ</b>. If it is a stream level * cell, it is destined for the given <b>conn</b>. If it is a circuit level * cell, it is destined for the <b>layer_hint</b>. The <b>domain</b> is the @@ -2454,6 +2436,15 @@ circuit_resume_edge_reading(circuit_t *circ, crypt_path_t *layer_hint) log_debug(layer_hint?LD_APP:LD_EXIT,"Too big queue, no resuming"); return; } + + /* If we have a conflux negotiated, and it still can't send on + * any circuit, then do not resume sending. */ + if (circ->conflux && !conflux_can_send(circ->conflux)) { + log_debug(layer_hint?LD_APP:LD_EXIT, + "Conflux can't send, not resuming edges"); + return; + } + log_debug(layer_hint?LD_APP:LD_EXIT,"resuming");
if (CIRCUIT_IS_ORIGIN(circ)) @@ -2487,20 +2478,6 @@ circuit_resume_edge_reading_helper(edge_connection_t *first_conn, return 0; }
- /* How many cells do we have space for? It will be the minimum of - * the number needed to exhaust the package window, and the minimum - * needed to fill the cell queue. */ - - max_to_package = congestion_control_get_package_window(circ, layer_hint); - if (CIRCUIT_IS_ORIGIN(circ)) { - cells_on_queue = circ->n_chan_cells.n; - } else { - or_circuit_t *or_circ = TO_OR_CIRCUIT(circ); - cells_on_queue = or_circ->p_chan_cells.n; - } - if (cell_queue_highwatermark() - cells_on_queue < max_to_package) - max_to_package = cell_queue_highwatermark() - cells_on_queue; - /* Once we used to start listening on the streams in the order they * appeared in the linked list. That leads to starvation on the * streams that appeared later on the list, since the first streams @@ -2539,11 +2516,13 @@ circuit_resume_edge_reading_helper(edge_connection_t *first_conn, /* Activate reading starting from the chosen stream */ for (conn=chosen_stream; conn; conn = conn->next_stream) { /* Start reading for the streams starting from here */ - if (conn->base_.marked_for_close || conn->package_window <= 0 || - conn->xoff_received) + if (conn->base_.marked_for_close || conn->package_window <= 0) continue; - if (!layer_hint || conn->cpath_layer == layer_hint) { - connection_start_reading(TO_CONN(conn)); + + if (edge_uses_cpath(conn, layer_hint)) { + if (!conn->xoff_received) { + connection_start_reading(TO_CONN(conn)); + }
if (connection_get_inbuf_len(TO_CONN(conn)) > 0) ++n_packaging_streams; @@ -2551,11 +2530,13 @@ circuit_resume_edge_reading_helper(edge_connection_t *first_conn, } /* Go back and do the ones we skipped, circular-style */ for (conn = first_conn; conn != chosen_stream; conn = conn->next_stream) { - if (conn->base_.marked_for_close || conn->package_window <= 0 || - conn->xoff_received) + if (conn->base_.marked_for_close || conn->package_window <= 0) continue; - if (!layer_hint || conn->cpath_layer == layer_hint) { - connection_start_reading(TO_CONN(conn)); + + if (edge_uses_cpath(conn, layer_hint)) { + if (!conn->xoff_received) { + connection_start_reading(TO_CONN(conn)); + }
if (connection_get_inbuf_len(TO_CONN(conn)) > 0) ++n_packaging_streams; @@ -2567,6 +2548,32 @@ circuit_resume_edge_reading_helper(edge_connection_t *first_conn,
again:
+ /* If we're using conflux, the circuit we decide to send on may change + * after we're sending. Get it again, and re-check package windows + * for it */ + if (circ->conflux) { + if (circuit_consider_stop_edge_reading(circ, layer_hint)) + return -1; + + circ = conflux_decide_next_circ(circ->conflux); + + /* Get the destination layer hint for this circuit */ + layer_hint = conflux_get_destination_hop(circ); + } + + /* How many cells do we have space for? It will be the minimum of + * the number needed to exhaust the package window, and the minimum + * needed to fill the cell queue. */ + max_to_package = congestion_control_get_package_window(circ, layer_hint); + if (CIRCUIT_IS_ORIGIN(circ)) { + cells_on_queue = circ->n_chan_cells.n; + } else { + or_circuit_t *or_circ = TO_OR_CIRCUIT(circ); + cells_on_queue = or_circ->p_chan_cells.n; + } + if (cell_queue_highwatermark() - cells_on_queue < max_to_package) + max_to_package = cell_queue_highwatermark() - cells_on_queue; + cells_per_conn = CEIL_DIV(max_to_package, n_packaging_streams);
packaged_this_round = 0; @@ -2580,7 +2587,7 @@ circuit_resume_edge_reading_helper(edge_connection_t *first_conn, for (conn=first_conn; conn; conn=conn->next_stream) { if (conn->base_.marked_for_close || conn->package_window <= 0) continue; - if (!layer_hint || conn->cpath_layer == layer_hint) { + if (edge_uses_cpath(conn, layer_hint)) { int n = cells_per_conn, r; /* handle whatever might still be on the inbuf */ r = connection_edge_package_raw_inbuf(conn, 1, &n); @@ -2638,7 +2645,7 @@ circuit_consider_stop_edge_reading(circuit_t *circ, crypt_path_t *layer_hint) or_circuit_t *or_circ = TO_OR_CIRCUIT(circ); log_debug(domain,"considering circ->package_window %d", circ->package_window); - if (congestion_control_get_package_window(circ, layer_hint) <= 0) { + if (circuit_get_package_window(circ, layer_hint) <= 0) { log_debug(domain,"yes, not-at-origin. stopped."); for (conn = or_circ->n_streams; conn; conn=conn->next_stream) connection_stop_reading(TO_CONN(conn)); @@ -2649,11 +2656,11 @@ circuit_consider_stop_edge_reading(circuit_t *circ, crypt_path_t *layer_hint) /* else, layer hint is defined, use it */ log_debug(domain,"considering layer_hint->package_window %d", layer_hint->package_window); - if (congestion_control_get_package_window(circ, layer_hint) <= 0) { + if (circuit_get_package_window(circ, layer_hint) <= 0) { log_debug(domain,"yes, at-origin. stopped."); for (conn = TO_ORIGIN_CIRCUIT(circ)->p_streams; conn; conn=conn->next_stream) { - if (conn->cpath_layer == layer_hint) + if (edge_uses_cpath(conn, layer_hint)) connection_stop_reading(TO_CONN(conn)); } return 1; @@ -3029,7 +3036,7 @@ set_circuit_blocked_on_chan(circuit_t *circ, channel_t *chan, int block) edge = TO_OR_CIRCUIT(circ)->n_streams; }
- set_block_state_for_streams(edge, block, 0); + set_block_state_for_streams(circ, edge, block, 0); }
/** @@ -3039,15 +3046,29 @@ set_circuit_blocked_on_chan(circuit_t *circ, channel_t *chan, int block) * in the stream list. If it is non-zero, only apply to that specific stream. */ static void -set_block_state_for_streams(edge_connection_t *stream_list, int block, - streamid_t stream_id) +set_block_state_for_streams(circuit_t *circ, edge_connection_t *stream_list, + int block, streamid_t stream_id) { + /* If we have a conflux object, we need to examine its status before + * blocking and unblocking streams. */ + if (circ->conflux) { + bool can_send = conflux_can_send(circ->conflux); + + if (block && can_send) { + /* Don't actually block streams, since conflux can send*/ + return; + } else if (!block && !can_send) { + /* Don't actually unblock streams, since conflux still can't send */ + return; + } + } + for (edge_connection_t *edge = stream_list; edge; edge = edge->next_stream) { connection_t *conn = TO_CONN(edge); if (stream_id && edge->stream_id != stream_id) continue;
- if (!conn->read_event) { + if (!conn->read_event || edge->xoff_received) { /* This connection is a placeholder for something; probably a DNS * request. It can't actually stop or start reading.*/ continue; @@ -3412,8 +3433,8 @@ append_cell_to_circuit_queue(circuit_t *circ, channel_t *chan, if (circ_blocked && fromstream) { /* This edge connection is apparently not blocked; this can happen for * new streams on a blocked circuit, for their CONNECTED response. - * block it now. */ - set_block_state_for_streams(stream_list, 1, fromstream); + * block it now, unless we have conflux. */ + set_block_state_for_streams(circ, stream_list, 1, fromstream); }
update_circuit_on_cmux(circ, direction); diff --git a/src/feature/relay/dns.c b/src/feature/relay/dns.c index 7267ca06dd..f6a020d061 100644 --- a/src/feature/relay/dns.c +++ b/src/feature/relay/dns.c @@ -71,6 +71,7 @@
#include "core/or/edge_connection_st.h" #include "core/or/or_circuit_st.h" +#include "core/or/conflux_util.h"
#include "ht.h"
@@ -650,6 +651,7 @@ dns_resolve(edge_connection_t *exitconn) * connected cell. */ exitconn->next_stream = oncirc->n_streams; oncirc->n_streams = exitconn; + conflux_update_n_streams(oncirc, exitconn); } break; case 0: @@ -658,6 +660,7 @@ dns_resolve(edge_connection_t *exitconn) exitconn->base_.state = EXIT_CONN_STATE_RESOLVING; exitconn->next_stream = oncirc->resolving_streams; oncirc->resolving_streams = exitconn; + conflux_update_resolving_streams(oncirc, exitconn); break; case -2: case -1: @@ -1234,6 +1237,7 @@ inform_pending_connections(cached_resolve_t *resolve) pend->conn->next_stream = TO_OR_CIRCUIT(circ)->n_streams; pend->conn->on_circuit = circ; TO_OR_CIRCUIT(circ)->n_streams = pend->conn; + conflux_update_n_streams(TO_OR_CIRCUIT(circ), pend->conn);
connection_exit_connect(pend->conn); } else {