[tor-commits] [snowflake/master] Snowflake client now using a reconnect loop (#12)

serene at torproject.org serene at torproject.org
Thu Feb 18 22:15:38 UTC 2016


commit f205a0be59aec40a04580cd46dad1bf6e9eba6c3
Author: Serene Han <keroserene+git at gmail.com>
Date:   Wed Feb 17 19:19:11 2016 -0800

    Snowflake client now using a reconnect loop (#12)
---
 client/client_test.go |   4 ++
 client/snowflake.go   | 151 +++++++++++++++++++++++++++++---------------------
 2 files changed, 93 insertions(+), 62 deletions(-)

diff --git a/client/client_test.go b/client/client_test.go
index 7b8dad2..399549a 100644
--- a/client/client_test.go
+++ b/client/client_test.go
@@ -38,6 +38,10 @@ func TestConnect(t *testing.T) {
 				So(c.buffer.Bytes(), ShouldEqual, nil)
 				So(mock.destination.Bytes(), ShouldResemble, []byte("test"))
 			})
+
+			Convey("Connect Loop", func() {
+				// TODO
+			})
 		})
 
 	})
diff --git a/client/snowflake.go b/client/snowflake.go
index 771e90b..907c8ae 100644
--- a/client/snowflake.go
+++ b/client/snowflake.go
@@ -28,9 +28,12 @@ var frontDomain string
 // When a connection handler starts, +1 is written to this channel; when it
 // ends, -1 is written.
 var handlerChan = make(chan int)
-
 var answerChannel = make(chan *webrtc.SessionDescription)
 
+const (
+	ReconnectTimeout = 5
+)
+
 func copyLoop(a, b net.Conn) {
 	var wg sync.WaitGroup
 	wg.Add(2)
@@ -55,14 +58,16 @@ type SnowflakeChannel interface {
 
 // Implements net.Conn interface
 type webRTCConn struct {
+	config			 *webrtc.Configuration
 	pc           *webrtc.PeerConnection
-	snowflake    SnowflakeChannel  // Interface holding the WebRTC DataChannel.
+	snowflake    SnowflakeChannel // Interface holding the WebRTC DataChannel.
 	broker       *BrokerChannel
 	offerChannel chan *webrtc.SessionDescription
 	errorChannel chan error
 	recvPipe     *io.PipeReader
 	writePipe    *io.PipeWriter
 	buffer       bytes.Buffer
+	reset        chan struct{}
 }
 
 var webrtcRemote *webRTCConn
@@ -72,7 +77,6 @@ func (c *webRTCConn) Read(b []byte) (int, error) {
 }
 
 func (c *webRTCConn) Write(b []byte) (int, error) {
-	// log.Printf("webrtc Write %d %+q", len(b), string(b))
 	c.sendData(b)
 	return len(b), nil
 }
@@ -102,10 +106,56 @@ func (c *webRTCConn) SetWriteDeadline(t time.Time) error {
 	return fmt.Errorf("SetWriteDeadline not implemented")
 }
 
+func (c *webRTCConn) PreparePeerConnection() {
+	if nil != c.pc {
+		log.Printf("PeerConnection already exists.")
+		c.pc.Close()
+		c.pc = nil
+	}
+	pc, err := webrtc.NewPeerConnection(c.config)
+	if err != nil {
+		log.Printf("NewPeerConnection: %s", err)
+		c.errorChannel <- err
+	}
+	// Prepare PeerConnection callbacks.
+	pc.OnNegotiationNeeded = func() {
+		log.Println("WebRTC: OnNegotiationNeeded")
+		go func() {
+			offer, err := pc.CreateOffer()
+			// TODO: Potentially timeout and retry if ICE isn't working.
+			if err != nil {
+				c.errorChannel <- err
+				return
+			}
+			err = pc.SetLocalDescription(offer)
+			if err != nil {
+				c.errorChannel <- err
+				return
+			}
+		}()
+	}
+	pc.OnIceCandidate = func(candidate webrtc.IceCandidate) {
+		log.Printf("OnIceCandidate %s", candidate.Serialize())
+		// Allow candidates to accumulate until OnIceComplete.
+	}
+	// TODO: This may soon be deprecated, consider OnIceGatheringStateChange.
+	pc.OnIceComplete = func() {
+		log.Printf("OnIceComplete")
+		c.offerChannel <- pc.LocalDescription()
+	}
+	// This callback is not expected, as the Client initiates the creation
+	// of the data channel, not the remote peer.
+	pc.OnDataChannel = func(channel *webrtc.DataChannel) {
+		log.Println("OnDataChannel")
+		panic("Unexpected OnDataChannel!")
+	}
+	c.pc = pc
+}
+
 // Create a WebRTC DataChannel locally.
 func (c *webRTCConn) EstablishDataChannel() error {
 	dc, err := c.pc.CreateDataChannel("snowflake", webrtc.Init{})
-	// Triggers "OnNegotiationNeeded" on the PeerConnection, which will  prepare
+	// Triggers "OnNegotiationNeeded" on the PeerConnection, which will prepare
 	// an SDP offer while other goroutines operating on this struct handle the
 	// signaling. Eventually fires "OnOpen".
 	if err != nil {
@@ -126,7 +176,7 @@ func (c *webRTCConn) EstablishDataChannel() error {
 		// Future writes will go to the buffer until a new DataChannel is available.
 		log.Println("WebRTC: DataChannel.OnClose")
 		c.snowflake = nil
-		// TODO: (Issue #12) Should attempt to renegotiate at this point.
+		c.reset <- struct{}{} // Attempt to negotiate a new datachannel.
 	}
 	dc.OnMessage = func(msg []byte) {
 		log.Printf("OnMessage <--- %d bytes", len(msg))
@@ -144,7 +194,7 @@ func (c *webRTCConn) EstablishDataChannel() error {
 
 // Block until an offer is available, then send it to either
 // the Broker or signal pipe.
-func (c *webRTCConn) sendOffer() error {
+func (c *webRTCConn) SendOffer() error {
 	select {
 	case offer := <-c.offerChannel:
 		if "" == brokerURL {
@@ -166,6 +216,7 @@ func (c *webRTCConn) sendOffer() error {
 			if nil == answer {
 				log.Printf("BrokerChannel: No answer received.")
 				// TODO: Should try again here.
+				c.reset <- struct{}{}
 				return
 			}
 			answerChannel <- answer
@@ -177,6 +228,19 @@ func (c *webRTCConn) sendOffer() error {
 	return nil
 }
 
+func (c *webRTCConn) ReceiveAnswer() error {
+	log.Printf("waiting for answer...")
+	answer, ok := <-answerChannel
+	if !ok {
+		// TODO: Don't just fail, try again!
+		c.pc.Close()
+		// connection.errorChannel <- errors.New("Bad answer")
+		return errors.New("Bad answer")
+	}
+	log.Printf("Received Answer:\n\n%s\n", answer.Sdp)
+	return c.pc.SetRemoteDescription(answer)
+}
+
 func (c *webRTCConn) sendData(data []byte) {
 	// Buffer the data in case datachannel isn't available yet.
 	if nil == c.snowflake {
@@ -188,72 +252,35 @@ func (c *webRTCConn) sendData(data []byte) {
 	c.snowflake.Send(data)
 }
 
+// WebRTC re-establishment loop. Expected in own goroutine.
+func (c *webRTCConn) ConnectLoop() {
+	for {
+		log.Println("Establishing WebRTC connection...")
+		// TODO: When go-webrtc is more stable, it's possible that a new
+		// PeerConnection won't need to be recreated each time, so that
+		// PreparePeerConnection only needs to be called once.
+  	c.PreparePeerConnection()
+		c.EstablishDataChannel()
+		c.SendOffer()
+		c.ReceiveAnswer()
+		<-c.reset
+		log.Println(" --- snowflake connection reset ---")
+	}
+}
+
 // Initialize a WebRTC Connection.
 func dialWebRTC(config *webrtc.Configuration, broker *BrokerChannel) (
 	*webRTCConn, error) {
-	pc, err := webrtc.NewPeerConnection(config)
-	if err != nil {
-		log.Printf("NewPeerConnection: %s", err)
-		return nil, err
-	}
 	connection := new(webRTCConn)
+	connection.config = config
 	connection.broker = broker
-	connection.pc = pc
 	connection.offerChannel = make(chan *webrtc.SessionDescription)
 	connection.errorChannel = make(chan error)
+	connection.reset = make(chan struct{})
 	// Pipes remain the same even when DataChannel gets switched.
 	connection.recvPipe, connection.writePipe = io.Pipe()
 
-	pc.OnNegotiationNeeded = func() {
-		log.Println("OnNegotiationNeeded")
-		go func() {
-			offer, err := pc.CreateOffer()
-			// TODO: Potentially timeout and retry if ICE isn't working.
-			if err != nil {
-				connection.errorChannel <- err
-				return
-			}
-			err = pc.SetLocalDescription(offer)
-			if err != nil {
-				connection.errorChannel <- err
-				return
-			}
-		}()
-	}
-	pc.OnIceCandidate = func(candidate webrtc.IceCandidate) {
-		log.Printf("OnIceCandidate %s", candidate.Serialize())
-		// Allow candidates to accumulate until OnIceComplete.
-	}
-	// TODO: This may soon be deprecated, consider OnIceGatheringStateChange.
-	pc.OnIceComplete = func() {
-		log.Printf("OnIceComplete")
-		connection.offerChannel <- pc.LocalDescription()
-	}
-	// This callback is not expected, as the Client initiates the creation
-	// of the data channel, not the remote peer.
-	pc.OnDataChannel = func(channel *webrtc.DataChannel) {
-		log.Println("OnDataChannel")
-		panic("Unexpected OnDataChannel!")
-	}
-
-	connection.EstablishDataChannel()
-
-	// TODO: Make this part of a re-establishment loop.
-	connection.sendOffer()
-	log.Printf("waiting for answer...")
-	answer, ok := <-answerChannel
-	if !ok {
-		// TODO: Don't just fail, try again!
-		pc.Close()
-		return nil, fmt.Errorf("no answer received")
-	}
-	log.Printf("Received Answer:\n\n%s\n", answer.Sdp)
-	err = pc.SetRemoteDescription(answer)
-	if err != nil {
-		pc.Close()
-		return nil, err
-	}
-
+	go connection.ConnectLoop()
 	return connection, nil
 }
 





More information about the tor-commits mailing list