commit 0422eb26a70fc1450cc6b57902f189edc4eed10a
Author: Mike Perry <mikeperry-git@torproject.org>
Date:   Tue Aug 10 21:35:46 2021 +0000
    Prop#324: Hook up flow control
---
 src/app/main/main.c                  |  2 ++
 src/core/mainloop/connection.c       | 16 ++++++++++--
 src/core/mainloop/mainloop.c         |  7 ++++++
 src/core/or/or.h                     |  3 +++
 src/core/or/relay.c                  | 47 +++++++++++++++++++++++++++++++++---
 src/core/or/sendme.c                 | 28 +++++++++++++++++++--
 src/core/or/sendme.h                 |  2 +-
 src/feature/nodelist/networkstatus.c |  2 ++
 8 files changed, 99 insertions(+), 8 deletions(-)
diff --git a/src/app/main/main.c b/src/app/main/main.c index 89564490e6..0742abe70a 100644 --- a/src/app/main/main.c +++ b/src/app/main/main.c @@ -27,6 +27,7 @@ #include "core/or/channel.h" #include "core/or/channelpadding.h" #include "core/or/circuitpadding.h" +#include "core/or/congestion_control_flow.h" #include "core/or/circuitlist.h" #include "core/or/command.h" #include "core/or/connection_or.h" @@ -630,6 +631,7 @@ tor_init(int argc, char *argv[]) * until we get a consensus */ channelpadding_new_consensus_params(NULL); circpad_new_consensus_params(NULL); + flow_control_new_consensus_params(NULL);
/* Initialize circuit padding to defaults+torrc until we get a consensus */ circpad_machines_init(); diff --git a/src/core/mainloop/connection.c b/src/core/mainloop/connection.c index 48bea792ae..9271a70914 100644 --- a/src/core/mainloop/connection.c +++ b/src/core/mainloop/connection.c @@ -147,6 +147,8 @@ #include "feature/nodelist/routerinfo_st.h" #include "core/or/socks_request_st.h"
+#include "core/or/congestion_control_flow.h" + /** * On Windows and Linux we cannot reliably bind() a socket to an * address and port if: 1) There's already a socket bound to wildcard @@ -4594,9 +4596,9 @@ connection_handle_write_impl(connection_t *conn, int force) !dont_stop_writing) { /* it's done flushing */ if (connection_finished_flushing(conn) < 0) { /* already marked */ - return -1; + goto err; } - return 0; + goto done; }
/* Call even if result is 0, since the global write bucket may @@ -4606,7 +4608,17 @@ connection_handle_write_impl(connection_t *conn, int force) if (n_read > 0 && connection_is_reading(conn)) connection_consider_empty_read_buckets(conn);
+ done: + /* If this is an edge connection with congestion control, check to see + * if it is time to send an xon */ + if (conn_uses_flow_control(conn)) { + flow_control_decide_xon(TO_EDGE_CONN(conn), n_written); + } + return 0; + + err: + return -1; }
/* DOCDOC connection_handle_write */ diff --git a/src/core/mainloop/mainloop.c b/src/core/mainloop/mainloop.c index 37b53db92a..cd57dea3d4 100644 --- a/src/core/mainloop/mainloop.c +++ b/src/core/mainloop/mainloop.c @@ -641,6 +641,13 @@ connection_start_reading,(connection_t *conn)) if (connection_should_read_from_linked_conn(conn)) connection_start_reading_from_linked_conn(conn); } else { + if (CONN_IS_EDGE(conn) && TO_EDGE_CONN(conn)->xoff_received) { + /* We should not get called here if we're waiting for an XON, but + * belt-and-suspenders */ + log_notice(LD_NET, + "Request to start reading on an edgeconn blocked with XOFF"); + return; + } if (event_add(conn->read_event, NULL)) log_warn(LD_NET, "Error from libevent setting read event state for %d " "to watched: %s", diff --git a/src/core/or/or.h b/src/core/or/or.h index 99948f26e2..ad82130301 100644 --- a/src/core/or/or.h +++ b/src/core/or/or.h @@ -210,6 +210,9 @@ struct curve25519_public_key_t; #define RELAY_COMMAND_PADDING_NEGOTIATE 41 #define RELAY_COMMAND_PADDING_NEGOTIATED 42
+#define RELAY_COMMAND_XOFF 43 +#define RELAY_COMMAND_XON 44 + /* Reasons why an OR connection is closed. */ #define END_OR_CONN_REASON_DONE 1 #define END_OR_CONN_REASON_REFUSED 2 /* connection refused */ diff --git a/src/core/or/relay.c b/src/core/or/relay.c index e3d41d7bf0..0e889eb348 100644 --- a/src/core/or/relay.c +++ b/src/core/or/relay.c @@ -98,6 +98,7 @@ #include "core/or/socks_request_st.h" #include "core/or/sendme.h" #include "core/or/congestion_control_common.h" +#include "core/or/congestion_control_flow.h"
static edge_connection_t *relay_lookup_conn(circuit_t *circ, cell_t *cell, cell_direction_t cell_direction, @@ -1739,6 +1740,44 @@ handle_relay_cell_command(cell_t *cell, circuit_t *circ, sendme_connection_edge_consider_sending(conn); }
+ return 0; + case RELAY_COMMAND_XOFF: + if (!conn) { + if (CIRCUIT_IS_ORIGIN(circ)) { + origin_circuit_t *ocirc = TO_ORIGIN_CIRCUIT(circ); + if (relay_crypt_from_last_hop(ocirc, layer_hint) && + connection_half_edge_is_valid_data(ocirc->half_streams, + rh->stream_id)) { + circuit_read_valid_data(ocirc, rh->length); + } + } + return 0; + } + + if (circuit_process_stream_xoff(conn, layer_hint, cell)) { + if (CIRCUIT_IS_ORIGIN(circ)) { + circuit_read_valid_data(TO_ORIGIN_CIRCUIT(circ), rh->length); + } + } + return 0; + case RELAY_COMMAND_XON: + if (!conn) { + if (CIRCUIT_IS_ORIGIN(circ)) { + origin_circuit_t *ocirc = TO_ORIGIN_CIRCUIT(circ); + if (relay_crypt_from_last_hop(ocirc, layer_hint) && + connection_half_edge_is_valid_data(ocirc->half_streams, + rh->stream_id)) { + circuit_read_valid_data(ocirc, rh->length); + } + } + return 0; + } + + if (circuit_process_stream_xon(conn, layer_hint, cell)) { + if (CIRCUIT_IS_ORIGIN(circ)) { + circuit_read_valid_data(TO_ORIGIN_CIRCUIT(circ), rh->length); + } + } return 0; case RELAY_COMMAND_END: reason = rh->length > 0 ? @@ -2287,7 +2326,7 @@ connection_edge_package_raw_inbuf(edge_connection_t *conn, int package_partial, }
/* Handle the stream-level SENDME package window. */ - if (sendme_note_stream_data_packaged(conn) < 0) { + if (sendme_note_stream_data_packaged(conn, length) < 0) { connection_stop_reading(TO_CONN(conn)); log_debug(domain,"conn->package_window reached 0."); circuit_consider_stop_edge_reading(circ, cpath_layer); @@ -2402,7 +2441,8 @@ circuit_resume_edge_reading_helper(edge_connection_t *first_conn, /* Activate reading starting from the chosen stream */ for (conn=chosen_stream; conn; conn = conn->next_stream) { /* Start reading for the streams starting from here */ - if (conn->base_.marked_for_close || conn->package_window <= 0) + if (conn->base_.marked_for_close || conn->package_window <= 0 || + conn->xoff_received) continue; if (!layer_hint || conn->cpath_layer == layer_hint) { connection_start_reading(TO_CONN(conn)); @@ -2413,7 +2453,8 @@ circuit_resume_edge_reading_helper(edge_connection_t *first_conn, } /* Go back and do the ones we skipped, circular-style */ for (conn = first_conn; conn != chosen_stream; conn = conn->next_stream) { - if (conn->base_.marked_for_close || conn->package_window <= 0) + if (conn->base_.marked_for_close || conn->package_window <= 0 || + conn->xoff_received) continue; if (!layer_hint || conn->cpath_layer == layer_hint) { connection_start_reading(TO_CONN(conn)); diff --git a/src/core/or/sendme.c b/src/core/or/sendme.c index 900490a892..ee670f9d51 100644 --- a/src/core/or/sendme.c +++ b/src/core/or/sendme.c @@ -22,6 +22,7 @@ #include "core/or/relay.h" #include "core/or/sendme.h" #include "core/or/congestion_control_common.h" +#include "core/or/congestion_control_flow.h" #include "feature/nodelist/networkstatus.h" #include "lib/ctime/di_ops.h" #include "trunnel/sendme_cell.h" @@ -370,6 +371,10 @@ sendme_connection_edge_consider_sending(edge_connection_t *conn)
int log_domain = TO_CONN(conn)->type == CONN_TYPE_AP ? LD_APP : LD_EXIT;
+ /* If we use flow control, we do not send stream sendmes */ + if (edge_uses_flow_control(conn)) + goto end; + /* Don't send it if we still have data to deliver. */ if (connection_outbuf_too_full(TO_CONN(conn))) { goto end; @@ -546,6 +551,12 @@ sendme_process_stream_level(edge_connection_t *conn, circuit_t *circ, tor_assert(conn); tor_assert(circ);
+ if (edge_uses_flow_control(conn)) { + log_fn(LOG_PROTOCOL_WARN, LD_EDGE, + "Congestion control got stream sendme"); + return -END_CIRC_REASON_TORPROTOCOL; + } + /* Don't allow the other endpoint to request more than our maximum (i.e. * initial) stream SENDME window worth of data. Well-behaved stock clients * will not request more than this max (as per the check in the while loop @@ -603,7 +614,12 @@ int sendme_stream_data_received(edge_connection_t *conn) { tor_assert(conn); - return --conn->deliver_window; + + if (edge_uses_flow_control(conn)) { + return flow_control_decide_xoff(conn); + } else { + return --conn->deliver_window; + } }
/* Called when a relay DATA cell is packaged on the given circuit. If @@ -651,10 +667,18 @@ sendme_note_circuit_data_packaged(circuit_t *circ, crypt_path_t *layer_hint) /* Called when a relay DATA cell is packaged for the given edge connection * conn. Update the package window and return its new value. */ int -sendme_note_stream_data_packaged(edge_connection_t *conn) +sendme_note_stream_data_packaged(edge_connection_t *conn, size_t len) { tor_assert(conn);
+ if (edge_uses_flow_control(conn)) { + flow_control_note_sent_data(conn, len); + if (conn->xoff_received) + return -1; + else + return 1; + } + --conn->package_window; log_debug(LD_APP, "Stream package_window now %d.", conn->package_window); return conn->package_window; diff --git a/src/core/or/sendme.h b/src/core/or/sendme.h index c224d0a921..2abec91a91 100644 --- a/src/core/or/sendme.h +++ b/src/core/or/sendme.h @@ -33,7 +33,7 @@ int sendme_circuit_data_received(circuit_t *circ, crypt_path_t *layer_hint); /* Update package window functions. */ int sendme_note_circuit_data_packaged(circuit_t *circ, crypt_path_t *layer_hint); -int sendme_note_stream_data_packaged(edge_connection_t *conn); +int sendme_note_stream_data_packaged(edge_connection_t *conn, size_t len);
/* Record cell digest on circuit. */ void sendme_record_cell_digest_on_circ(circuit_t *circ, crypt_path_t *cpath); diff --git a/src/feature/nodelist/networkstatus.c b/src/feature/nodelist/networkstatus.c index 7a1e73ef60..0138dff033 100644 --- a/src/feature/nodelist/networkstatus.c +++ b/src/feature/nodelist/networkstatus.c @@ -45,6 +45,7 @@ #include "core/or/channel.h" #include "core/or/channelpadding.h" #include "core/or/circuitpadding.h" +#include "core/or/congestion_control_flow.h" #include "core/or/circuitmux.h" #include "core/or/circuitmux_ewma.h" #include "core/or/circuitstats.h" @@ -1699,6 +1700,7 @@ notify_after_networkstatus_changes(void) channelpadding_new_consensus_params(c); circpad_new_consensus_params(c); router_new_consensus_params(c); + flow_control_new_consensus_params(c);
/* Maintenance of our L2 guard list */ maintain_layer2_guards();