commit 7c9005bed3e353c4e108355abd1ed4b35099f2ea Author: Cecylia Bocovich cohosh@torproject.org Date: Wed May 12 09:32:07 2021 -0400
Ensure turbotunnel read and write loop terminate
Introduce a waitgroup and done channel to ensure that both the read and write gorouting for turbotunnel connections terminate when the connection is closed. --- common/turbotunnel/clientmap.go | 1 + server/lib/http.go | 40 ++++++++++++++++++++++++++-------------- 2 files changed, 27 insertions(+), 14 deletions(-)
diff --git a/common/turbotunnel/clientmap.go b/common/turbotunnel/clientmap.go index fa12915..53d0302 100644 --- a/common/turbotunnel/clientmap.go +++ b/common/turbotunnel/clientmap.go @@ -140,5 +140,6 @@ func (inner *clientMapInner) Pop() interface{} { inner.byAge = inner.byAge[:n-1] // Remove from byAddr map. delete(inner.byAddr, record.Addr) + close(record.SendQueue) return record } diff --git a/server/lib/http.go b/server/lib/http.go index b1c453c..c612422 100644 --- a/server/lib/http.go +++ b/server/lib/http.go @@ -8,6 +8,7 @@ import ( "log" "net" "net/http" + "sync" "time"
"git.torproject.org/pluggable-transports/snowflake.git/common/encapsulation" @@ -139,18 +140,21 @@ func turbotunnelMode(conn net.Conn, addr net.Addr, pconn *turbotunnel.QueuePacke // credited for the entire KCP session. clientIDAddrMap.Set(clientID, addr.String())
- errCh := make(chan error) + var wg sync.WaitGroup + wg.Add(2) + done := make(chan struct{})
// The remainder of the WebSocket stream consists of encapsulated // packets. We read them one by one and feed them into the // QueuePacketConn on which kcp.ServeConn was set up, which eventually // leads to KCP-level sessions in the acceptSessions function. go func() { + defer wg.Done() + defer close(done) // Signal the write loop to finish for { p, err := encapsulation.ReadData(conn) if err != nil { - errCh <- err - break + return } pconn.QueueIncoming(p, clientID) } @@ -159,24 +163,32 @@ func turbotunnelMode(conn net.Conn, addr net.Addr, pconn *turbotunnel.QueuePacke // At the same time, grab packets addressed to this ClientID and // encapsulate them into the downstream. go func() { + defer wg.Done() + defer conn.Close() // Signal the read loop to finish + // Buffer encapsulation.WriteData operations to keep length // prefixes in the same send as the data that follows. bw := bufio.NewWriter(conn) - for p := range pconn.OutgoingQueue(clientID) { - _, err := encapsulation.WriteData(bw, p) - if err == nil { - err = bw.Flush() - } - if err != nil { - errCh <- err - break + for { + select { + case <-done: + return + case p, ok := <-pconn.OutgoingQueue(clientID): + if !ok { + return + } + _, err := encapsulation.WriteData(bw, p) + if err == nil { + err = bw.Flush() + } + if err != nil { + return + } } } }()
- // Wait until one of the above loops terminates. The closing of the - // WebSocket connection will terminate the other one. - <-errCh + wg.Wait()
return nil }