[tor-commits] [snowflake/master] Buffer writes to DataChannel, remove blocking on openChannel (#12)

serene at torproject.org serene at torproject.org
Thu Feb 18 22:15:38 UTC 2016


commit eb7eb04ac01b7a7d2466958b35403c6729014a1f
Author: Serene Han <keroserene+git at gmail.com>
Date:   Wed Feb 17 18:38:40 2016 -0800

    Buffer writes to DataChannel, remove blocking on openChannel (#12)
---
 client/client_test.go | 44 +++++++++++++++++++++++++
 client/snowflake.go   | 89 ++++++++++++++++++++++++++++-----------------------
 2 files changed, 93 insertions(+), 40 deletions(-)

diff --git a/client/client_test.go b/client/client_test.go
new file mode 100644
index 0000000..7b8dad2
--- /dev/null
+++ b/client/client_test.go
@@ -0,0 +1,44 @@
+package main
+
+import (
+	"bytes"
+	. "github.com/smartystreets/goconvey/convey"
+	"testing"
+)
+
+type MockDataChannel struct {
+	destination bytes.Buffer
+}
+
+func (m *MockDataChannel) Send(data []byte) {
+	m.destination.Write(data)
+}
+
+func (*MockDataChannel) Close() error {
+	return nil
+}
+
+func TestConnect(t *testing.T) {
+	Convey("Snowflake", t, func() {
+
+		Convey("WebRTC Connection", func() {
+			c := new(webRTCConn)
+			So(c.buffer.Bytes(), ShouldEqual, nil)
+
+			Convey("SendData buffers when datachannel is nil", func() {
+				c.sendData([]byte("test"))
+				c.snowflake = nil
+				So(c.buffer.Bytes(), ShouldResemble, []byte("test"))
+			})
+
+			Convey("SendData sends to datachannel when not nil", func() {
+				mock := new(MockDataChannel)
+				c.snowflake = mock
+				c.sendData([]byte("test"))
+				So(c.buffer.Bytes(), ShouldEqual, nil)
+				So(mock.destination.Bytes(), ShouldResemble, []byte("test"))
+			})
+		})
+
+	})
+}
diff --git a/client/snowflake.go b/client/snowflake.go
index 7c47fbb..771e90b 100644
--- a/client/snowflake.go
+++ b/client/snowflake.go
@@ -3,6 +3,7 @@ package main
 
 import (
 	"bufio"
+	"bytes"
 	"errors"
 	"flag"
 	"fmt"
@@ -46,16 +47,22 @@ func copyLoop(a, b net.Conn) {
 	wg.Wait()
 }
 
+// Interface that matches both webrc.DataChannel and for testing.
+type SnowflakeChannel interface {
+	Send([]byte)
+	Close() error
+}
+
 // Implements net.Conn interface
 type webRTCConn struct {
 	pc           *webrtc.PeerConnection
-	dc           *webrtc.DataChannel
+	snowflake    SnowflakeChannel  // Interface holding the WebRTC DataChannel.
 	broker       *BrokerChannel
-	recvPipe     *io.PipeReader
-	writePipe    *io.PipeWriter
 	offerChannel chan *webrtc.SessionDescription
 	errorChannel chan error
-	openChannel  chan struct{}
+	recvPipe     *io.PipeReader
+	writePipe    *io.PipeWriter
+	buffer       bytes.Buffer
 }
 
 var webrtcRemote *webRTCConn
@@ -66,9 +73,7 @@ func (c *webRTCConn) Read(b []byte) (int, error) {
 
 func (c *webRTCConn) Write(b []byte) (int, error) {
 	// log.Printf("webrtc Write %d %+q", len(b), string(b))
-	log.Printf("Write %d bytes --> WebRTC", len(b))
-	// Buffer in case datachannel isn't available.
-	c.dc.Send(b)
+	c.sendData(b)
 	return len(b), nil
 }
 
@@ -98,21 +103,29 @@ func (c *webRTCConn) SetWriteDeadline(t time.Time) error {
 }
 
 // Create a WebRTC DataChannel locally.
-// This triggers "OnNegotiationNeeded" which should prepare an SDP offer.
 func (c *webRTCConn) EstablishDataChannel() error {
 	dc, err := c.pc.CreateDataChannel("snowflake", webrtc.Init{})
+	// Triggers "OnNegotiationNeeded" on the PeerConnection, which will  prepare
+	// an SDP offer while other goroutines operating on this struct handle the
+	// signaling. Eventually fires "OnOpen".
 	if err != nil {
 		log.Printf("CreateDataChannel: %s", err)
 		return err
 	}
 	dc.OnOpen = func() {
-		log.Println("OnOpen channel")
-		c.openChannel <- struct{}{}
+		log.Println("WebRTC: DataChannel.OnOpen")
+		// Flush the buffer, then enable datachannel.
+		// TODO: Make this more safe
+		dc.Send(c.buffer.Bytes())
+		log.Println("Flushed ", c.buffer.Len(), " bytes")
+		c.buffer.Reset()
+		c.snowflake = dc
 	}
 	dc.OnClose = func() {
-		log.Println("OnClose channel")
-		// writePipe.Close()
-		close(c.openChannel)
+		// Disable the DataChannel as a write destination.
+		// Future writes will go to the buffer until a new DataChannel is available.
+		log.Println("WebRTC: DataChannel.OnClose")
+		c.snowflake = nil
 		// TODO: (Issue #12) Should attempt to renegotiate at this point.
 	}
 	dc.OnMessage = func(msg []byte) {
@@ -126,7 +139,6 @@ func (c *webRTCConn) EstablishDataChannel() error {
 			panic("short write")
 		}
 	}
-	c.dc = dc
 	return nil
 }
 
@@ -153,8 +165,8 @@ func (c *webRTCConn) sendOffer() error {
 			}
 			if nil == answer {
 				log.Printf("BrokerChannel: No answer received.")
+				// TODO: Should try again here.
 				return
-				// return errors.New("No answer received.")
 			}
 			answerChannel <- answer
 		}()
@@ -165,6 +177,18 @@ func (c *webRTCConn) sendOffer() error {
 	return nil
 }
 
+func (c *webRTCConn) sendData(data []byte) {
+	// Buffer the data in case datachannel isn't available yet.
+	if nil == c.snowflake {
+		log.Printf("Buffered %d bytes --> WebRTC", len(data))
+		c.buffer.Write(data)
+		return
+	}
+	log.Printf("Write %d bytes --> WebRTC", len(data))
+	c.snowflake.Send(data)
+}
+
+// Initialize a WebRTC Connection.
 func dialWebRTC(config *webrtc.Configuration, broker *BrokerChannel) (
 	*webRTCConn, error) {
 	pc, err := webrtc.NewPeerConnection(config)
@@ -177,13 +201,14 @@ func dialWebRTC(config *webrtc.Configuration, broker *BrokerChannel) (
 	connection.pc = pc
 	connection.offerChannel = make(chan *webrtc.SessionDescription)
 	connection.errorChannel = make(chan error)
-	connection.openChannel = make(chan struct{})
+	// Pipes remain the same even when DataChannel gets switched.
+	connection.recvPipe, connection.writePipe = io.Pipe()
 
-	// Triggered by CreateDataChannel.
 	pc.OnNegotiationNeeded = func() {
 		log.Println("OnNegotiationNeeded")
 		go func() {
 			offer, err := pc.CreateOffer()
+			// TODO: Potentially timeout and retry if ICE isn't working.
 			if err != nil {
 				connection.errorChannel <- err
 				return
@@ -208,18 +233,17 @@ func dialWebRTC(config *webrtc.Configuration, broker *BrokerChannel) (
 	// of the data channel, not the remote peer.
 	pc.OnDataChannel = func(channel *webrtc.DataChannel) {
 		log.Println("OnDataChannel")
-		panic("OnDataChannel")
+		panic("Unexpected OnDataChannel!")
 	}
 
-	// Pipes remain the same even when DataChannel gets switched.
-	connection.recvPipe, connection.writePipe = io.Pipe()
-
 	connection.EstablishDataChannel()
-	connection.sendOffer()
 
+	// TODO: Make this part of a re-establishment loop.
+	connection.sendOffer()
 	log.Printf("waiting for answer...")
 	answer, ok := <-answerChannel
 	if !ok {
+		// TODO: Don't just fail, try again!
 		pc.Close()
 		return nil, fmt.Errorf("no answer received")
 	}
@@ -230,15 +254,6 @@ func dialWebRTC(config *webrtc.Configuration, broker *BrokerChannel) (
 		return nil, err
 	}
 
-	// Wait until data channel is open; otherwise for example sends may get
-	// lost.
-	// TODO: Buffering *should* work though.
-	_, ok = <-connection.openChannel
-	if !ok {
-		pc.Close()
-		return nil, fmt.Errorf("failed to open data channel")
-	}
-
 	return connection, nil
 }
 
@@ -247,9 +262,9 @@ func endWebRTC() {
 	if nil == webrtcRemote {
 		return
 	}
-	if nil != webrtcRemote.dc {
+	if nil != webrtcRemote.snowflake {
 		log.Printf("WebRTC: closing DataChannel")
-		webrtcRemote.dc.Close()
+		webrtcRemote.snowflake.Close()
 	}
 	if nil != webrtcRemote.pc {
 		log.Printf("WebRTC: closing PeerConnection")
@@ -332,7 +347,7 @@ func readSignalingMessages(f *os.File) {
 
 func main() {
 	var err error
-
+	webrtc.SetLoggingVerbosity(1)
 	flag.StringVar(&brokerURL, "url", "", "URL of signaling broker")
 	flag.StringVar(&frontDomain, "front", "", "front domain")
 	flag.Parse()
@@ -368,8 +383,6 @@ func main() {
 		go readSignalingMessages(signalFile)
 	}
 
-	webrtc.SetLoggingVerbosity(1)
-
 	ptInfo, err = pt.ClientSetup(nil)
 	if err != nil {
 		log.Fatal(err)
@@ -417,10 +430,6 @@ func main() {
 		ln.Close()
 	}
 
-	// if syscall.SIGTERM == sig || syscall.SIGINT == sig {
-	// return
-	// }
-
 	// wait for second signal or no more handlers
 	sig = nil
 	for sig == nil && numHandlers != 0 {





More information about the tor-commits mailing list