[tor-commits] [snowflake/master] Wait for data channel OnOpen before returning from NewWebRTCPeer.

dcf at torproject.org dcf at torproject.org
Tue Apr 28 03:12:49 UTC 2020


commit 047d3214bfb46de07e5d9f223e4fb1ba24584c8a
Author: David Fifield <david at bamsoftware.com>
Date:   Fri Apr 24 13:30:13 2020 -0600

    Wait for data channel OnOpen before returning from NewWebRTCPeer.
    
    Now callers cannot call Write without there being a DataChannel to write
    to. This lets us remove the internal buffer and checks for transport ==
    nil.
    
    Don't set internal fields like writePipe, transport, and pc to nil when
    closing; just close them and let them return errors if further calls are
    made on them.
    
    There's now a constant DataChannelTimeout that's separate from
    SnowflakeTimeout (the latter is what checkForStaleness uses). Both are
    30 seconds for now, but DataChannelTimeout can later be set to a lower
    value, to quickly dispose of unconnectable proxies, while still keeping
    the threshold for detecting the failure of a once-working proxy at 30
    seconds.
    
    https://bugs.torproject.org/33897
---
 client/lib/snowflake.go |  2 ++
 client/lib/webrtc.go    | 96 +++++++++++++------------------------------------
 2 files changed, 26 insertions(+), 72 deletions(-)

diff --git a/client/lib/snowflake.go b/client/lib/snowflake.go
index 27991b2..0076b79 100644
--- a/client/lib/snowflake.go
+++ b/client/lib/snowflake.go
@@ -17,6 +17,8 @@ import (
 const (
 	ReconnectTimeout = 10 * time.Second
 	SnowflakeTimeout = 30 * time.Second
+	// How long to wait for the OnOpen callback on a DataChannel.
+	DataChannelTimeout = 30 * time.Second
 )
 
 type dummyAddr struct{}
diff --git a/client/lib/webrtc.go b/client/lib/webrtc.go
index b4c0aad..edc8ab4 100644
--- a/client/lib/webrtc.go
+++ b/client/lib/webrtc.go
@@ -1,7 +1,6 @@
 package lib
 
 import (
-	"bytes"
 	"crypto/rand"
 	"encoding/hex"
 	"errors"
@@ -25,12 +24,10 @@ type WebRTCPeer struct {
 	recvPipe    *io.PipeReader
 	writePipe   *io.PipeWriter
 	lastReceive time.Time
-	buffer      bytes.Buffer
 
 	closed bool
 
-	lock sync.Mutex // Synchronization for DataChannel destruction
-	once sync.Once  // Synchronization for PeerConnection destruction
+	once sync.Once // Synchronization for PeerConnection destruction
 
 	BytesLogger BytesLogger
 }
@@ -70,16 +67,11 @@ func (c *WebRTCPeer) Read(b []byte) (int, error) {
 // Writes bytes out to remote WebRTC.
 // As part of |io.ReadWriter|
 func (c *WebRTCPeer) Write(b []byte) (int, error) {
-	c.lock.Lock()
-	defer c.lock.Unlock()
-	c.BytesLogger.AddOutbound(len(b))
-	// TODO: Buffering could be improved / separated out of WebRTCPeer.
-	if nil == c.transport {
-		log.Printf("Buffered %d bytes --> WebRTC", len(b))
-		c.buffer.Write(b)
-	} else {
-		c.transport.Send(b)
+	err := c.transport.Send(b)
+	if err != nil {
+		return 0, err
 	}
+	c.BytesLogger.AddOutbound(len(b))
 	return len(b), nil
 }
 
@@ -127,8 +119,9 @@ func (c *WebRTCPeer) connect(config *webrtc.Configuration, broker *BrokerChannel
 		log.Println("WebRTC: Unable to SetRemoteDescription:", err)
 		return err
 	}
-	err = c.establishDataChannel()
+	c.transport, err = c.establishDataChannel()
 	if err != nil {
+		log.Printf("establishDataChannel: %v", err)
 		// nolint: golint
 		return errors.New("WebRTC: Could not establish DataChannel")
 	}
@@ -177,13 +170,9 @@ func preparePeerConnection(config *webrtc.Configuration) (*webrtc.PeerConnection
 	return pc, nil
 }
 
-// Create a WebRTC DataChannel locally.
-func (c *WebRTCPeer) establishDataChannel() error {
-	c.lock.Lock()
-	defer c.lock.Unlock()
-	if c.transport != nil {
-		panic("Unexpected datachannel already exists!")
-	}
+// Create a WebRTC DataChannel locally. Blocks until the data channel is open,
+// or a timeout or error occurs.
+func (c *WebRTCPeer) establishDataChannel() (*webrtc.DataChannel, error) {
 	ordered := true
 	dataChannelOptions := &webrtc.DataChannelInit{
 		Ordered: &ordered,
@@ -191,41 +180,15 @@ func (c *WebRTCPeer) establishDataChannel() error {
 	dc, err := c.pc.CreateDataChannel(c.id, dataChannelOptions)
 	if err != nil {
 		log.Printf("CreateDataChannel ERROR: %s", err)
-		return err
+		return nil, err
 	}
+	openChannel := make(chan struct{})
 	dc.OnOpen(func() {
-		c.lock.Lock()
-		defer c.lock.Unlock()
 		log.Println("WebRTC: DataChannel.OnOpen")
-		if nil != c.transport {
-			panic("WebRTC: transport already exists.")
-		}
-		// Flush buffered outgoing SOCKS data if necessary.
-		if c.buffer.Len() > 0 {
-			dc.Send(c.buffer.Bytes())
-			log.Println("Flushed", c.buffer.Len(), "bytes.")
-			c.buffer.Reset()
-		}
-		// Then enable the datachannel.
-		c.transport = dc
+		close(openChannel)
 	})
 	dc.OnClose(func() {
-		c.lock.Lock()
-		// Future writes will go to the buffer until a new DataChannel is available.
-		if nil == c.transport {
-			// Closed locally, as part of a reset.
-			log.Println("WebRTC: DataChannel.OnClose [locally]")
-			c.lock.Unlock()
-			return
-		}
-		// Closed remotely, need to reset everything.
-		// Disable the DataChannel as a write destination.
-		log.Println("WebRTC: DataChannel.OnClose [remotely]")
-		c.transport = nil
-		dc.Close()
-		// Unlock before Close'ing, since it calls cleanup and asks for the
-		// lock to check if the transport needs to be be deleted.
-		c.lock.Unlock()
+		log.Println("WebRTC: DataChannel.OnClose")
 		c.Close()
 	})
 	dc.OnMessage(func(msg webrtc.DataChannelMessage) {
@@ -244,7 +207,14 @@ func (c *WebRTCPeer) establishDataChannel() error {
 		c.lastReceive = time.Now()
 	})
 	log.Println("WebRTC: DataChannel created.")
-	return nil
+
+	select {
+	case <-openChannel:
+		return dc, nil
+	case <-time.After(DataChannelTimeout):
+		dc.Close()
+		return nil, errors.New("timeout waiting for DataChannel.OnOpen")
+	}
 }
 
 // exchangeSDP sends the local SDP offer to the Broker, awaits the SDP answer,
@@ -266,27 +236,10 @@ func exchangeSDP(broker *BrokerChannel, offer *webrtc.SessionDescription) *webrt
 // Close all channels and transports
 func (c *WebRTCPeer) cleanup() {
 	// Close this side of the SOCKS pipe.
-	if nil != c.writePipe {
-		c.writePipe.Close()
-		c.writePipe = nil
-	}
-	c.lock.Lock()
+	c.writePipe.Close()
 	if nil != c.transport {
 		log.Printf("WebRTC: closing DataChannel")
-		dataChannel := c.transport
-		// Setting transport to nil *before* dc Close indicates to OnClose that
-		// this was locally triggered.
-		c.transport = nil
-		// Release the lock before calling DeleteDataChannel (which in turn
-		// calls Close on the dataChannel), but after nil'ing out the transport,
-		// since otherwise we'll end up in the onClose handler in a deadlock.
-		c.lock.Unlock()
-		if c.pc == nil {
-			panic("DataChannel w/o PeerConnection, not good.")
-		}
-		dataChannel.Close()
-	} else {
-		c.lock.Unlock()
+		c.transport.Close()
 	}
 	if nil != c.pc {
 		log.Printf("WebRTC: closing PeerConnection")
@@ -294,6 +247,5 @@ func (c *WebRTCPeer) cleanup() {
 		if nil != err {
 			log.Printf("Error closing peerconnection...")
 		}
-		c.pc = nil
 	}
 }



More information about the tor-commits mailing list