commit 047d3214bfb46de07e5d9f223e4fb1ba24584c8a Author: David Fifield david@bamsoftware.com Date: Fri Apr 24 13:30:13 2020 -0600
Wait for data channel OnOpen before returning from NewWebRTCPeer.
Now callers cannot call Write without there being a DataChannel to write to. This lets us remove the internal buffer and checks for transport == nil.
Don't set internal fields like writePipe, transport, and pc to nil when closing; just close them and let them return errors if further calls are made on them.
There's now a constant DataChannelTimeout that's separate from SnowflakeTimeout (the latter is what checkForStaleness uses). Now we can set DataChannel timeout to a lower value, to quickly dispose of unconnectable proxies, while still keeping the threshold for detecting the failure of a once-working proxy at 30 seconds.
https://bugs.torproject.org/33897 --- client/lib/snowflake.go | 2 ++ client/lib/webrtc.go | 96 +++++++++++++------------------------------------ 2 files changed, 26 insertions(+), 72 deletions(-)
diff --git a/client/lib/snowflake.go b/client/lib/snowflake.go index 27991b2..0076b79 100644 --- a/client/lib/snowflake.go +++ b/client/lib/snowflake.go @@ -17,6 +17,8 @@ import ( const ( ReconnectTimeout = 10 * time.Second SnowflakeTimeout = 30 * time.Second + // How long to wait for the OnOpen callback on a DataChannel. + DataChannelTimeout = 30 * time.Second )
type dummyAddr struct{} diff --git a/client/lib/webrtc.go b/client/lib/webrtc.go index b4c0aad..edc8ab4 100644 --- a/client/lib/webrtc.go +++ b/client/lib/webrtc.go @@ -1,7 +1,6 @@ package lib
import ( - "bytes" "crypto/rand" "encoding/hex" "errors" @@ -25,12 +24,10 @@ type WebRTCPeer struct { recvPipe *io.PipeReader writePipe *io.PipeWriter lastReceive time.Time - buffer bytes.Buffer
closed bool
- lock sync.Mutex // Synchronization for DataChannel destruction - once sync.Once // Synchronization for PeerConnection destruction + once sync.Once // Synchronization for PeerConnection destruction
BytesLogger BytesLogger } @@ -70,16 +67,11 @@ func (c *WebRTCPeer) Read(b []byte) (int, error) { // Writes bytes out to remote WebRTC. // As part of |io.ReadWriter| func (c *WebRTCPeer) Write(b []byte) (int, error) { - c.lock.Lock() - defer c.lock.Unlock() - c.BytesLogger.AddOutbound(len(b)) - // TODO: Buffering could be improved / separated out of WebRTCPeer. - if nil == c.transport { - log.Printf("Buffered %d bytes --> WebRTC", len(b)) - c.buffer.Write(b) - } else { - c.transport.Send(b) + err := c.transport.Send(b) + if err != nil { + return 0, err } + c.BytesLogger.AddOutbound(len(b)) return len(b), nil }
@@ -127,8 +119,9 @@ func (c *WebRTCPeer) connect(config *webrtc.Configuration, broker *BrokerChannel log.Println("WebRTC: Unable to SetRemoteDescription:", err) return err } - err = c.establishDataChannel() + c.transport, err = c.establishDataChannel() if err != nil { + log.Printf("establishDataChannel: %v", err) // nolint: golint return errors.New("WebRTC: Could not establish DataChannel") } @@ -177,13 +170,9 @@ func preparePeerConnection(config *webrtc.Configuration) (*webrtc.PeerConnection return pc, nil }
-// Create a WebRTC DataChannel locally. -func (c *WebRTCPeer) establishDataChannel() error { - c.lock.Lock() - defer c.lock.Unlock() - if c.transport != nil { - panic("Unexpected datachannel already exists!") - } +// Create a WebRTC DataChannel locally. Blocks until the data channel is open, +// or a timeout or error occurs. +func (c *WebRTCPeer) establishDataChannel() (*webrtc.DataChannel, error) { ordered := true dataChannelOptions := &webrtc.DataChannelInit{ Ordered: &ordered, @@ -191,41 +180,15 @@ func (c *WebRTCPeer) establishDataChannel() error { dc, err := c.pc.CreateDataChannel(c.id, dataChannelOptions) if err != nil { log.Printf("CreateDataChannel ERROR: %s", err) - return err + return nil, err } + openChannel := make(chan struct{}) dc.OnOpen(func() { - c.lock.Lock() - defer c.lock.Unlock() log.Println("WebRTC: DataChannel.OnOpen") - if nil != c.transport { - panic("WebRTC: transport already exists.") - } - // Flush buffered outgoing SOCKS data if necessary. - if c.buffer.Len() > 0 { - dc.Send(c.buffer.Bytes()) - log.Println("Flushed", c.buffer.Len(), "bytes.") - c.buffer.Reset() - } - // Then enable the datachannel. - c.transport = dc + close(openChannel) }) dc.OnClose(func() { - c.lock.Lock() - // Future writes will go to the buffer until a new DataChannel is available. - if nil == c.transport { - // Closed locally, as part of a reset. - log.Println("WebRTC: DataChannel.OnClose [locally]") - c.lock.Unlock() - return - } - // Closed remotely, need to reset everything. - // Disable the DataChannel as a write destination. - log.Println("WebRTC: DataChannel.OnClose [remotely]") - c.transport = nil - dc.Close() - // Unlock before Close'ing, since it calls cleanup and asks for the - // lock to check if the transport needs to be be deleted. - c.lock.Unlock() + log.Println("WebRTC: DataChannel.OnClose") c.Close() }) dc.OnMessage(func(msg webrtc.DataChannelMessage) { @@ -244,7 +207,14 @@ func (c *WebRTCPeer) establishDataChannel() error { c.lastReceive = time.Now() }) log.Println("WebRTC: DataChannel created.") - return nil + + select { + case <-openChannel: + return dc, nil + case <-time.After(DataChannelTimeout): + dc.Close() + return nil, errors.New("timeout waiting for DataChannel.OnOpen") + } }
// exchangeSDP sends the local SDP offer to the Broker, awaits the SDP answer, @@ -266,27 +236,10 @@ func exchangeSDP(broker *BrokerChannel, offer *webrtc.SessionDescription) *webrt // Close all channels and transports func (c *WebRTCPeer) cleanup() { // Close this side of the SOCKS pipe. - if nil != c.writePipe { - c.writePipe.Close() - c.writePipe = nil - } - c.lock.Lock() + c.writePipe.Close() if nil != c.transport { log.Printf("WebRTC: closing DataChannel") - dataChannel := c.transport - // Setting transport to nil *before* dc Close indicates to OnClose that - // this was locally triggered. - c.transport = nil - // Release the lock before calling DeleteDataChannel (which in turn - // calls Close on the dataChannel), but after nil'ing out the transport, - // since otherwise we'll end up in the onClose handler in a deadlock. - c.lock.Unlock() - if c.pc == nil { - panic("DataChannel w/o PeerConnection, not good.") - } - dataChannel.Close() - } else { - c.lock.Unlock() + c.transport.Close() } if nil != c.pc { log.Printf("WebRTC: closing PeerConnection") @@ -294,6 +247,5 @@ func (c *WebRTCPeer) cleanup() { if nil != err { log.Printf("Error closing peerconnection...") } - c.pc = nil } }
tor-commits@lists.torproject.org