[tor-commits] [snowflake/master] Include answer channel as part of the webRTCConn struct (#12)

serene at torproject.org serene at torproject.org
Thu Feb 18 22:15:38 UTC 2016


commit a1b7e01c5423c69548afdf44c7455736605b5574
Author: Serene Han <keroserene+git at gmail.com>
Date:   Wed Feb 17 20:41:33 2016 -0800

    Include answer channel as part of the webRTCConn struct (#12)
---
 client/client_test.go |  4 ++-
 client/snowflake.go   | 78 ++++++++++++++++++++++++++++++++-------------------
 2 files changed, 52 insertions(+), 30 deletions(-)

diff --git a/client/client_test.go b/client/client_test.go
index 546382f..ff15d6d 100644
--- a/client/client_test.go
+++ b/client/client_test.go
@@ -2,6 +2,7 @@ package main
 
 import (
 	"bytes"
+	"github.com/keroserene/go-webrtc"
 	. "github.com/smartystreets/goconvey/convey"
 	"testing"
 )
@@ -41,8 +42,9 @@ func TestConnect(t *testing.T) {
 
 			Convey("Receive answer fails on nil answer", func() {
 				c.reset = make(chan struct{})
+				c.answerChannel = make(chan *webrtc.SessionDescription)
 				c.ReceiveAnswer()
-				answerChannel <- nil
+				c.answerChannel <- nil
 				<-c.reset
 			})
 
diff --git a/client/snowflake.go b/client/snowflake.go
index 6d612ce..081cb3b 100644
--- a/client/snowflake.go
+++ b/client/snowflake.go
@@ -28,7 +28,6 @@ var frontDomain string
 // When a connection handler starts, +1 is written to this channel; when it
 // ends, -1 is written.
 var handlerChan = make(chan int)
-var answerChannel = make(chan *webrtc.SessionDescription)
 
 const (
 	ReconnectTimeout = 5
@@ -37,7 +36,6 @@ const (
 func copyLoop(a, b net.Conn) {
 	var wg sync.WaitGroup
 	wg.Add(2)
-
 	go func() {
 		io.Copy(b, a)
 		wg.Done()
@@ -46,8 +44,8 @@ func copyLoop(a, b net.Conn) {
 		io.Copy(a, b)
 		wg.Done()
 	}()
-
 	wg.Wait()
+	log.Println("copy loop ended")
 }
 
 // Interface that matches both webrc.DataChannel and for testing.
@@ -58,16 +56,17 @@ type SnowflakeChannel interface {
 
 // Implements net.Conn interface
 type webRTCConn struct {
-	config       *webrtc.Configuration
-	pc           *webrtc.PeerConnection
-	snowflake    SnowflakeChannel // Interface holding the WebRTC DataChannel.
-	broker       *BrokerChannel
-	offerChannel chan *webrtc.SessionDescription
-	errorChannel chan error
-	recvPipe     *io.PipeReader
-	writePipe    *io.PipeWriter
-	buffer       bytes.Buffer
-	reset        chan struct{}
+	config        *webrtc.Configuration
+	pc            *webrtc.PeerConnection
+	snowflake     SnowflakeChannel // Interface holding the WebRTC DataChannel.
+	broker        *BrokerChannel
+	offerChannel  chan *webrtc.SessionDescription
+	answerChannel chan *webrtc.SessionDescription
+	errorChannel  chan error
+	recvPipe      *io.PipeReader
+	writePipe     *io.PipeWriter
+	buffer        bytes.Buffer
+	reset         chan struct{}
 }
 
 var webrtcRemote *webRTCConn
@@ -164,6 +163,9 @@ func (c *webRTCConn) EstablishDataChannel() error {
 	}
 	dc.OnOpen = func() {
 		log.Println("WebRTC: DataChannel.OnOpen")
+		// if nil != c.snowflake {
+			// panic("PeerConnection snowflake already exists.")
+		// }
 		// Flush the buffer, then enable datachannel.
 		// TODO: Make this more safe
 		dc.Send(c.buffer.Bytes())
@@ -175,14 +177,18 @@ func (c *webRTCConn) EstablishDataChannel() error {
 		// Disable the DataChannel as a write destination.
 		// Future writes will go to the buffer until a new DataChannel is available.
 		log.Println("WebRTC: DataChannel.OnClose")
-		c.snowflake = nil
-		c.reset <- struct{}{} // Attempt to negotiate a new datachannel..
+		if nil != c.snowflake {
+			c.snowflake = nil
+			// Only reset if a DataChannel was still attached when OnClose fired.
+			c.Reset()
+		}
 	}
 	dc.OnMessage = func(msg []byte) {
 		log.Printf("OnMessage <--- %d bytes", len(msg))
 		n, err := c.writePipe.Write(msg)
 		if err != nil {
 			// TODO: Maybe shouldn't actually close.
+			log.Println("Error writing to SOCKS pipe")
 			c.writePipe.CloseWithError(err)
 		}
 		if n != len(msg) {
@@ -205,7 +211,7 @@ func (c *webRTCConn) SendOffer() error {
 			log.Printf("----------------")
 			return nil
 		}
-		// Use Broker...
+		// Otherwise, use Broker.
 		go func() {
 			log.Println("Sending offer via BrokerChannel...\nTarget URL: ", brokerURL,
 				"\nFront URL:  ", frontDomain)
@@ -214,7 +220,7 @@ func (c *webRTCConn) SendOffer() error {
 				log.Printf("BrokerChannel error: %s", err)
 				answer = nil
 			}
-			answerChannel <- answer
+			c.answerChannel <- answer
 		}()
 	case err := <-c.errorChannel:
 		c.pc.Close()
@@ -225,11 +231,11 @@ func (c *webRTCConn) SendOffer() error {
 
 func (c *webRTCConn) ReceiveAnswer() {
 	go func() {
-		answer, ok := <-answerChannel
+		answer, ok := <-c.answerChannel
 		if !ok || nil == answer {
 			log.Printf("Failed to retrieve answer. Retrying in %d seconds", ReconnectTimeout)
 			<-time.After(time.Second * ReconnectTimeout)
-			c.reset <- struct{}{}
+			c.Reset()
 			return
 		}
 		log.Printf("Received Answer:\n\n%s\n", answer.Sdp)
@@ -247,6 +253,12 @@ func (c *webRTCConn) sendData(data []byte) {
 		c.buffer.Write(data)
 		return
 	}
+	// Otherwise, flush buffer if necessary.
+	for c.buffer.Len() > 0 {
+		c.snowflake.Send(c.buffer.Bytes())
+		log.Println("Flushed ", c.buffer.Len(), " bytes")
+		c.buffer.Reset()
+	}
 	log.Printf("Write %d bytes --> WebRTC", len(data))
 	c.snowflake.Send(data)
 }
@@ -256,18 +268,25 @@ func (c *webRTCConn) ConnectLoop() {
 	for {
 		log.Println("Establishing WebRTC connection...")
 		// TODO: When go-webrtc is more stable, it's possible that a new
-		// PeerConnection won't need to be recreated each time.
-		// called once.
+		// PeerConnection won't need to be re-prepared each time.
 		c.PreparePeerConnection()
-		c.EstablishDataChannel()
-		c.SendOffer()
-		c.ReceiveAnswer()
-
-		<-c.reset
-		log.Println(" --- snowflake connection reset ---")
+		err := c.EstablishDataChannel()
+		if err == nil {
+			c.SendOffer()
+			c.ReceiveAnswer()
+			<-c.reset
+			log.Println(" --- snowflake connection reset ---")
+		}
 	}
 }
 
+func (c *webRTCConn) Reset() {
+	go func() {
+		c.reset <- struct{}{} // Attempt to negotiate a new datachannel.
+		log.Println("WebRTC resetting...")
+	}()
+}
+
 // Initialize a WebRTC Connection.
 func dialWebRTC(config *webrtc.Configuration, broker *BrokerChannel) (
 	*webRTCConn, error) {
@@ -275,6 +294,7 @@ func dialWebRTC(config *webrtc.Configuration, broker *BrokerChannel) (
 	connection.config = config
 	connection.broker = broker
 	connection.offerChannel = make(chan *webrtc.SessionDescription)
+	connection.answerChannel = make(chan *webrtc.SessionDescription)
 	connection.errorChannel = make(chan error)
 	connection.reset = make(chan struct{})
 	// Pipes remain the same even when DataChannel gets switched.
@@ -363,10 +383,10 @@ func readSignalingMessages(f *os.File) {
 			log.Printf("ignoring invalid signal message %+q", msg)
 			continue
 		}
-		answerChannel <- sdp
+		webrtcRemote.answerChannel <- sdp
 	}
 	log.Printf("close answerChannel")
-	close(answerChannel)
+	close(webrtcRemote.answerChannel)
 	if err := s.Err(); err != nil {
 		log.Printf("signal FIFO: %s", err)
 	}





More information about the tor-commits mailing list