This is an automated email from the git hooks/post-receive script.
ahf pushed a commit to branch main in repository tor.
commit 2bd1eca78c7771b9d16bb2a670420987c4a36c91
Author: Mike Perry <mikeperry-git@torproject.org>
AuthorDate: Wed Dec 14 21:03:52 2022 +0000
Prop#329 Algs: Conflux multiplexed cell receive handling
---
 src/core/or/conflux.c | 260 ++++++++++++++++++++++++++++++++++++++++++++++++++
 src/core/or/relay.c   |  80 ++++++++++++++--
 2 files changed, 331 insertions(+), 9 deletions(-)
diff --git a/src/core/or/conflux.c b/src/core/or/conflux.c
new file mode 100644
index 0000000000..6179fea279
--- /dev/null
+++ b/src/core/or/conflux.c
@@ -0,0 +1,260 @@
+/* Copyright (c) 2021, The Tor Project, Inc. */
+/* See LICENSE for licensing information */
+
+/**
+ * \file conflux.c
+ * \brief Conflux multipath core algorithms
+ */
+
+#define TOR_CONFLUX_PRIVATE
+
+#include "core/or/or.h"
+
+#include "core/or/circuit_st.h"
+#include "core/or/sendme.h"
+#include "core/or/congestion_control_common.h"
+#include "core/or/congestion_control_st.h"
+#include "core/or/origin_circuit_st.h"
+#include "core/or/circuitlist.h"
+#include "core/or/circuituse.h"
+#include "core/or/conflux.h"
+#include "core/or/conflux_params.h"
+#include "core/or/conflux_util.h"
+#include "core/or/conflux_st.h"
+#include "lib/time/compat_time.h"
+#include "app/config/config.h"
+
+#include "trunnel/extension.h"
+
+/** One million microseconds in a second */
+#define USEC_PER_SEC 1000000
+
+/**
+ * Determine if we should multiplex a specific relay command or not.
+ *
+ * TODO: Version of this that is the set of forbidden commands
+ * on linked circuits
+ */
+bool
+conflux_should_multiplex(int relay_command)
+{
+  switch (relay_command) {
+    /* These are all fine to multiplex, and must be
+     * so that ordering is preserved */
+    case RELAY_COMMAND_BEGIN:
+    case RELAY_COMMAND_DATA:
+    case RELAY_COMMAND_END:
+    case RELAY_COMMAND_CONNECTED:
+      return true;
+
+    /* We can't multiplex these because they are
+     * circuit-specific */
+    case RELAY_COMMAND_SENDME:
+    case RELAY_COMMAND_EXTEND:
+    case RELAY_COMMAND_EXTENDED:
+    case RELAY_COMMAND_TRUNCATE:
+    case RELAY_COMMAND_TRUNCATED:
+    case RELAY_COMMAND_DROP:
+      return false;
+
+    /* We must multiplex RESOLVEs because their ordering
+     * impacts begin/end. */
+    case RELAY_COMMAND_RESOLVE:
+    case RELAY_COMMAND_RESOLVED:
+      return true;
+
+    /* These are all circuit-specific */
+    case RELAY_COMMAND_BEGIN_DIR:
+    case RELAY_COMMAND_EXTEND2:
+    case RELAY_COMMAND_EXTENDED2:
+    case RELAY_COMMAND_ESTABLISH_INTRO:
+    case RELAY_COMMAND_ESTABLISH_RENDEZVOUS:
+    case RELAY_COMMAND_INTRODUCE1:
+    case RELAY_COMMAND_INTRODUCE2:
+    case RELAY_COMMAND_RENDEZVOUS1:
+    case RELAY_COMMAND_RENDEZVOUS2:
+    case RELAY_COMMAND_INTRO_ESTABLISHED:
+    case RELAY_COMMAND_RENDEZVOUS_ESTABLISHED:
+    case RELAY_COMMAND_INTRODUCE_ACK:
+    case RELAY_COMMAND_PADDING_NEGOTIATE:
+    case RELAY_COMMAND_PADDING_NEGOTIATED:
+      return false;
+
+    /* These must be multiplexed because their ordering
+     * relative to BEGIN/END must be preserved */
+    case RELAY_COMMAND_XOFF:
+    case RELAY_COMMAND_XON:
+      return true;
+
+    /* These conflux commands are not multiplexed, because they must
+     * be processed immediately to update sequence numbers
+     * before any other cells are processed on the circuit */
+    case RELAY_COMMAND_CONFLUX_SWITCH:
+    case RELAY_COMMAND_CONFLUX_LINK:
+    case RELAY_COMMAND_CONFLUX_LINKED:
+    case RELAY_COMMAND_CONFLUX_LINKED_ACK:
+      return false;
+
+    default:
+      log_warn(LD_BUG, "Conflux asked to multiplex unknown relay command %d",
+               relay_command);
+      return false;
+  }
+}
+
+/** Return the leg for a circuit in a conflux set. Return NULL if not found. */
+conflux_leg_t *
+conflux_get_leg(conflux_t *cfx, const circuit_t *circ)
+{
+  conflux_leg_t *leg_found = NULL;
+
+  // Find the leg that the cell is written on
+  CONFLUX_FOR_EACH_LEG_BEGIN(cfx, leg) {
+    if (leg->circ == circ) {
+      leg_found = leg;
+      break;
+    }
+  } CONFLUX_FOR_EACH_LEG_END(leg);
+
+  return leg_found;
+}
+
+/**
+ * Comparison function for ooo_q pqueue.
+ *
+ * Ensures that lower sequence numbers are at the head of the pqueue.
+ */
+static int
+conflux_queue_cmp(const void *a, const void *b)
+{
+  // Compare a and b as conflux_cell_t using the seq field, and return a
+  // comparison result such that the lowest seq is at the head of the pqueue.
+  const conflux_cell_t *cell_a = a;
+  const conflux_cell_t *cell_b = b;
+
+  tor_assert(cell_a);
+  tor_assert(cell_b);
+
+  if (cell_a->seq < cell_b->seq) {
+    return -1;
+  } else if (cell_a->seq > cell_b->seq) {
+    return 1;
+  } else {
+    return 0;
+  }
+}
+
+/**
+ * Get the congestion control object for a conflux circuit.
+ *
+ * Because conflux can only be negotiated with the last hop, we
+ * can use the last hop of the cpath to obtain the congestion
+ * control object for origin circuits. For non-origin circuits,
+ * we can use the circuit itself.
+ */
+const congestion_control_t *
+circuit_ccontrol(const circuit_t *circ)
+{
+  const congestion_control_t *ccontrol = NULL;
+  tor_assert(circ);
+
+  if (CIRCUIT_IS_ORIGIN(circ)) {
+    tor_assert(CONST_TO_ORIGIN_CIRCUIT(circ)->cpath);
+    tor_assert(CONST_TO_ORIGIN_CIRCUIT(circ)->cpath->prev);
+    ccontrol = CONST_TO_ORIGIN_CIRCUIT(circ)->cpath->prev->ccontrol;
+  } else {
+    ccontrol = circ->ccontrol;
+  }
+
+  /* Conflux circuits always have congestion control */
+  tor_assert(ccontrol);
+  return ccontrol;
+}
+
+/**
+ * Process an incoming relay cell for conflux. Called from
+ * connection_edge_process_relay_cell().
+ *
+ * Returns true if the conflux system now has well-ordered cells to deliver
+ * to streams, false otherwise.
+ */
+bool
+conflux_process_cell(conflux_t *cfx, circuit_t *in_circ,
+                     crypt_path_t *layer_hint, cell_t *cell)
+{
+  // TODO-329-TUNING: Temporarily validate legs here. We can remove
+  // this after tuning is complete.
+  conflux_validate_legs(cfx);
+
+  conflux_leg_t *leg = conflux_get_leg(cfx, in_circ);
+  if (!leg) {
+    log_warn(LD_BUG, "Got a conflux cell on a circuit without "
+             "conflux leg. Closing circuit.");
+    circuit_mark_for_close(in_circ, END_CIRC_REASON_INTERNAL);
+    return false;
+  }
+
+  /* We need to make sure this cell came from the expected hop, or
+   * else it could be a data corruption attack from a middle node. */
+  if (!conflux_validate_source_hop(in_circ, layer_hint)) {
+    circuit_mark_for_close(in_circ, END_CIRC_REASON_TORPROTOCOL);
+    return false;
+  }
+
+  /* Update the running absolute sequence number (peer-driven, untrusted) */
+  leg->last_seq_recv++;
+
+  /* If this cell is next, fast-path it by processing the cell in-place */
+  if (leg->last_seq_recv == cfx->last_seq_delivered + 1) {
+    /* The cell is now ready to be processed, and rest of the queue should
+     * now be checked for remaining elements */
+    cfx->last_seq_delivered++;
+    return true;
+  } else if (leg->last_seq_recv <= cfx->last_seq_delivered) {
+    log_fn(LOG_PROTOCOL_WARN, LD_PROTOCOL, "Got a conflux cell with a "
+           "sequence number less than the last delivered. Closing circuit.");
+    circuit_mark_for_close(in_circ, END_CIRC_REASON_TORPROTOCOL);
+    return false;
+  } else {
+    conflux_cell_t *c_cell = tor_malloc_zero(sizeof(conflux_cell_t));
+    c_cell->seq = leg->last_seq_recv;
+
+    memcpy(&c_cell->cell, cell, sizeof(cell_t));
+
+    smartlist_pqueue_add(cfx->ooo_q, conflux_queue_cmp,
+            offsetof(conflux_cell_t, heap_idx), c_cell);
+    total_ooo_q_bytes += sizeof(cell_t);
+
+    /* This cell should not be processed yet, and the queue is not ready
+     * to process because the next absolute seqnum has not yet arrived */
+    return false;
+  }
+}
+
+/**
+ * Dequeue the top cell from our queue.
+ *
+ * Returns the cell as a conflux_cell_t, or NULL if the queue is empty
+ * or has a hole.
+ */
+conflux_cell_t *
+conflux_dequeue_cell(conflux_t *cfx)
+{
+  conflux_cell_t *top = NULL;
+  if (smartlist_len(cfx->ooo_q) == 0)
+    return NULL;
+
+  top = smartlist_get(cfx->ooo_q, 0);
+
+  /* If the top cell is the next sequence number we need, then
+   * pop and return it. */
+  if (top->seq == cfx->last_seq_delivered+1) {
+    smartlist_pqueue_pop(cfx->ooo_q, conflux_queue_cmp,
+                         offsetof(conflux_cell_t, heap_idx));
+    total_ooo_q_bytes -= sizeof(cell_t);
+    cfx->last_seq_delivered++;
+    return top;
+  } else {
+    return NULL;
+  }
+}
diff --git a/src/core/or/relay.c b/src/core/or/relay.c
index 7929f57ee6..26e52b0d95 100644
--- a/src/core/or/relay.c
+++ b/src/core/or/relay.c
@@ -99,6 +99,7 @@
 #include "core/or/sendme.h"
 #include "core/or/congestion_control_common.h"
 #include "core/or/congestion_control_flow.h"
+#include "core/or/conflux.h"
static edge_connection_t *relay_lookup_conn(circuit_t *circ, cell_t *cell, cell_direction_t cell_direction, @@ -116,6 +117,11 @@ static void adjust_exit_policy_from_exitpolicy_failure(origin_circuit_t *circ, entry_connection_t *conn, node_t *node, const tor_addr_t *addr); +static int connection_edge_process_ordered_relay_cell(cell_t *cell, + circuit_t *circ, + edge_connection_t *conn, + crypt_path_t *layer_hint, + relay_header_t *rh);
/** Stats: how many relay cells have originated at this hop, or have * been relayed onward (not recognized at this hop)? @@ -610,7 +616,7 @@ pad_cell_payload(uint8_t *cell_payload, size_t data_len) * return 0. */ MOCK_IMPL(int, -relay_send_command_from_edge_,(streamid_t stream_id, circuit_t *circ, +relay_send_command_from_edge_,(streamid_t stream_id, circuit_t *orig_circ, uint8_t relay_command, const char *payload, size_t payload_len, crypt_path_t *cpath_layer, const char *filename, int lineno)) @@ -640,6 +646,7 @@ relay_send_command_from_edge_,(streamid_t stream_id, circuit_t *circ, rh.stream_id = stream_id; rh.length = payload_len; relay_header_pack(cell.payload, &rh); + if (payload_len) memcpy(cell.payload+RELAY_HEADER_SIZE, payload, payload_len);
@@ -2051,9 +2058,6 @@ connection_edge_process_relay_cell(cell_t *cell, circuit_t *circ, static int num_seen=0; relay_header_t rh; unsigned domain = layer_hint?LD_APP:LD_EXIT; - int optimistic_data = 0; /* Set to 1 if we receive data on a stream - * that's in the EXIT_CONN_STATE_RESOLVING - * or EXIT_CONN_STATE_CONNECTING states. */
tor_assert(cell); tor_assert(circ); @@ -2086,8 +2090,66 @@ connection_edge_process_relay_cell(cell_t *cell, circuit_t *circ, } }
+ /* Conflux handling: If conflux is disabled, or the relay command is not + * multiplexed across circuits, then process it immediately. + * + * Otherwise, we need to process the relay cell against our conflux + * queues, and if doing so results in ordered cells to deliver, we + * dequeue and process those in-order until there are no more. + */ + if (!circ->conflux || !conflux_should_multiplex(rh.command)) { + return connection_edge_process_ordered_relay_cell(cell, circ, conn, + layer_hint, &rh); + } else { + // If conflux says this cell is in-order, then begin processing + // cells from queue until there are none. Otherwise, we do nothing + // until further cells arrive. + if (conflux_process_cell(circ->conflux, circ, layer_hint, cell)) { + conflux_cell_t *c_cell = NULL; + int ret = 0; + + /* First, process this cell */ + if ((ret = connection_edge_process_ordered_relay_cell(cell, circ, conn, + layer_hint, &rh)) < 0) { + return ret; + } + + /* Now, check queue for more */ + while ((c_cell = conflux_dequeue_cell(circ->conflux))) { + relay_header_unpack(&rh, c_cell->cell.payload); + conn = relay_lookup_conn(circ, &c_cell->cell, CELL_DIRECTION_OUT, + layer_hint); + if ((ret = connection_edge_process_ordered_relay_cell(&c_cell->cell, + circ, conn, layer_hint, + &rh)) < 0) { + /* Negative return value is a fatal error. Return early and tear down + * circuit */ + tor_free(c_cell); + return ret; + } + tor_free(c_cell); + } + } + } + + return 0; +} + +/** + * Helper function to process a relay cell that is in the proper order + * for processing right now. */ +static int +connection_edge_process_ordered_relay_cell(cell_t *cell, circuit_t *circ, + edge_connection_t *conn, + crypt_path_t *layer_hint, + relay_header_t *rh) +{ + int optimistic_data = 0; /* Set to 1 if we receive data on a stream + * that's in the EXIT_CONN_STATE_RESOLVING + * or EXIT_CONN_STATE_CONNECTING states. 
*/ + /* Tell circpad that we've received a recognized cell */ - circpad_deliver_recognized_relay_cell_events(circ, rh.command, layer_hint); + circpad_deliver_recognized_relay_cell_events(circ, rh->command, layer_hint);
/* either conn is NULL, in which case we've got a control cell, or else * conn points to the recognized stream. */ @@ -2095,22 +2157,22 @@ connection_edge_process_relay_cell(cell_t *cell, circuit_t *circ, if (conn->base_.type == CONN_TYPE_EXIT && (conn->base_.state == EXIT_CONN_STATE_CONNECTING || conn->base_.state == EXIT_CONN_STATE_RESOLVING) && - rh.command == RELAY_COMMAND_DATA) { + rh->command == RELAY_COMMAND_DATA) { /* Allow DATA cells to be delivered to an exit node in state * EXIT_CONN_STATE_CONNECTING or EXIT_CONN_STATE_RESOLVING. * This speeds up HTTP, for example. */ optimistic_data = 1; - } else if (rh.stream_id == 0 && rh.command == RELAY_COMMAND_DATA) { + } else if (rh->stream_id == 0 && rh->command == RELAY_COMMAND_DATA) { log_warn(LD_BUG, "Somehow I had a connection that matched a " "data cell with stream ID 0."); } else { return connection_edge_process_relay_cell_not_open( - &rh, cell, circ, conn, layer_hint); + rh, cell, circ, conn, layer_hint); } }
return handle_relay_cell_command(cell, circ, conn, layer_hint, - &rh, optimistic_data); + rh, optimistic_data); }
/** How many relay_data cells have we built, ever? */