This is an automated email from the git hooks/post-receive script.
dcf pushed a commit to branch turbotunnel in repository pluggable-transports/meek.
commit 0b83ea61d699eaed82af0b6e1c1a8a70d92f44fe Author: David Fifield david@bamsoftware.com AuthorDate: Thu Oct 27 18:54:21 2022 -0600
Import server send loop from Champa.
Enforces maxPayloadLength (by stashing packets that exceed the limit) and should have lower latency because it does not wait for the maximum delay every time. --- meek-server/meek-server.go | 70 ++++++++++++++++++++++++++++++++++++---------- 1 file changed, 55 insertions(+), 15 deletions(-)
diff --git a/meek-server/meek-server.go b/meek-server/meek-server.go index f971ab8..f715107 100644 --- a/meek-server/meek-server.go +++ b/meek-server/meek-server.go @@ -55,7 +55,7 @@ const ( // chunk of data we'll send back in a response. maxPayloadLength = 0x10000 // How long we try to read from the OR port before closing a response. - turnaroundTimeout = 10 * time.Millisecond + turnaroundTimeout = 100 * time.Millisecond // Passed as ReadTimeout and WriteTimeout when constructing the // http.Server. readWriteTimeout = 20 * time.Second @@ -169,26 +169,66 @@ func (state *State) Post(w http.ResponseWriter, req *http.Request) { state.conn.QueueIncoming(p, clientID) }
- // Write outgoing packets, if any, up to turnaroundTimeout. w.Header().Set("Content-Type", "application/octet-stream") - outgoing := state.conn.OutgoingQueue(clientID) + // Write outgoing packets, if any. We wait up to turnaroundTimeout for + // the first available packet; after that we only include whatever + // packets are immediately available. + limit := maxPayloadLength timer := time.NewTimer(turnaroundTimeout) -loop: + defer timer.Stop() + first := true for { + var p []byte + unstash := state.conn.Unstash(clientID) + outgoing := state.conn.OutgoingQueue(clientID) + // Prioritize taking a packet first from the stash, then from + // the outgoing queue, then finally check for expiration of the + // timer. (We continue to bundle packets even after the timer + // expires, as long as the packets are immediately available.) select { - case <-timer.C: - break loop - case p := <-outgoing: - _, err := encapsulation.WriteData(w, p) - if err != nil { - break loop - } - // Flush after each chunk, this is important for - // latency. - if w, ok := w.(http.Flusher); ok { - w.Flush() + case p = <-unstash: + default: + select { + case p = <-unstash: + case p = <-outgoing: + default: + select { + case p = <-unstash: + case p = <-outgoing: + case <-timer.C: + } } } + // We wait for the first packet only. Later packets must be + // immediately available. + timer.Reset(0) + + if len(p) == 0 { + // Timer expired, we are done bundling packets into this + // response. + break + } + + limit -= len(p) + if !first && limit < 0 { + // This packet doesn't fit in the payload size limit. + // Stash it so that it will be first in line for the + // next response. + state.conn.Stash(p, clientID) + break + } + first = false + + // Write the packet to the HTTP response. + _, err := encapsulation.WriteData(w, p) + if err != nil { + log.Printf("encapsulation.WriteData: %v", err) + break + } + // Flush after each chunk, this is important for latency. + if rw, ok := w.(http.Flusher); ok { + rw.Flush() + } } }