
commit f205a0be59aec40a04580cd46dad1bf6e9eba6c3 Author: Serene Han <keroserene+git@gmail.com> Date: Wed Feb 17 19:19:11 2016 -0800 Snowflake client now using a reconnect loop (#12) --- client/client_test.go | 4 ++ client/snowflake.go | 151 +++++++++++++++++++++++++++++--------------------- 2 files changed, 93 insertions(+), 62 deletions(-) diff --git a/client/client_test.go b/client/client_test.go index 7b8dad2..399549a 100644 --- a/client/client_test.go +++ b/client/client_test.go @@ -38,6 +38,10 @@ func TestConnect(t *testing.T) { So(c.buffer.Bytes(), ShouldEqual, nil) So(mock.destination.Bytes(), ShouldResemble, []byte("test")) }) + + Convey("Connect Loop", func() { + // TODO + }) }) }) diff --git a/client/snowflake.go b/client/snowflake.go index 771e90b..907c8ae 100644 --- a/client/snowflake.go +++ b/client/snowflake.go @@ -28,9 +28,12 @@ var frontDomain string // When a connection handler starts, +1 is written to this channel; when it // ends, -1 is written. var handlerChan = make(chan int) - var answerChannel = make(chan *webrtc.SessionDescription) +const ( + ReconnectTimeout = 5 +) + func copyLoop(a, b net.Conn) { var wg sync.WaitGroup wg.Add(2) @@ -55,14 +58,16 @@ type SnowflakeChannel interface { // Implements net.Conn interface type webRTCConn struct { + config *webrtc.Configuration pc *webrtc.PeerConnection - snowflake SnowflakeChannel // Interface holding the WebRTC DataChannel. + snowflake SnowflakeChannel // Interface holding the WebRTC DataChannel. broker *BrokerChannel offerChannel chan *webrtc.SessionDescription errorChannel chan error recvPipe *io.PipeReader writePipe *io.PipeWriter buffer bytes.Buffer + reset chan struct{} } var webrtcRemote *webRTCConn @@ -72,7 +77,6 @@ func (c *webRTCConn) Read(b []byte) (int, error) { } func (c *webRTCConn) Write(b []byte) (int, error) { - // log.Printf("webrtc Write %d %+q", len(b), string(b)) c.sendData(b) return len(b), nil } @@ -102,10 +106,56 @@ func (c *webRTCConn) SetWriteDeadline(t time.Time) error { return fmt.Errorf("SetWriteDeadline not implemented") } +func (c *webRTCConn) PreparePeerConnection() { + if nil != c.pc { + log.Printf("PeerConnection already exists.") + c.pc.Close() + c.pc = nil + } + pc, err := webrtc.NewPeerConnection(c.config) + if err != nil { + log.Printf("NewPeerConnection: %s", err) + c.errorChannel <- err + } + // Prepare PeerConnection callbacks. + pc.OnNegotiationNeeded = func() { + log.Println("WebRTC: OnNegotiationNeeded") + go func() { + offer, err := pc.CreateOffer() + // TODO: Potentially timeout and retry if ICE isn't working. + if err != nil { + c.errorChannel <- err + return + } + err = pc.SetLocalDescription(offer) + if err != nil { + c.errorChannel <- err + return + } + }() + } + pc.OnIceCandidate = func(candidate webrtc.IceCandidate) { + log.Printf("OnIceCandidate %s", candidate.Serialize()) + // Allow candidates to accumulate until OnIceComplete. + } + // TODO: This may soon be deprecated, consider OnIceGatheringStateChange. + pc.OnIceComplete = func() { + log.Printf("OnIceComplete") + c.offerChannel <- pc.LocalDescription() + } + // This callback is not expected, as the Client initiates the creation + // of the data channel, not the remote peer. + pc.OnDataChannel = func(channel *webrtc.DataChannel) { + log.Println("OnDataChannel") + panic("Unexpected OnDataChannel!") + } + c.pc = pc +} + // Create a WebRTC DataChannel locally. func (c *webRTCConn) EstablishDataChannel() error { dc, err := c.pc.CreateDataChannel("snowflake", webrtc.Init{}) - // Triggers "OnNegotiationNeeded" on the PeerConnection, which will prepare + // Triggers "OnNegotiationNeeded" on the PeerConnection, which will prepare // an SDP offer while other goroutines operating on this struct handle the // signaling. Eventually fires "OnOpen". if err != nil { @@ -126,7 +176,7 @@ func (c *webRTCConn) EstablishDataChannel() error { // Future writes will go to the buffer until a new DataChannel is available. log.Println("WebRTC: DataChannel.OnClose") c.snowflake = nil - // TODO: (Issue #12) Should attempt to renegotiate at this point. + c.reset <- struct{}{} // Attempt to negotiate a new datachannel.. } dc.OnMessage = func(msg []byte) { log.Printf("OnMessage <--- %d bytes", len(msg)) @@ -144,7 +194,7 @@ func (c *webRTCConn) EstablishDataChannel() error { // Block until an offer is available, then send it to either // the Broker or signal pipe. -func (c *webRTCConn) sendOffer() error { +func (c *webRTCConn) SendOffer() error { select { case offer := <-c.offerChannel: if "" == brokerURL { @@ -166,6 +216,7 @@ func (c *webRTCConn) sendOffer() error { if nil == answer { log.Printf("BrokerChannel: No answer received.") // TODO: Should try again here. + c.reset <- struct{}{} return } answerChannel <- answer @@ -177,6 +228,19 @@ func (c *webRTCConn) sendOffer() error { return nil } +func (c *webRTCConn) ReceiveAnswer() error { + log.Printf("waiting for answer...") + answer, ok := <-answerChannel + if !ok { + // TODO: Don't just fail, try again! + c.pc.Close() + // connection.errorChannel <- errors.New("Bad answer") + return errors.New("Bad answer") + } + log.Printf("Received Answer:\n\n%s\n", answer.Sdp) + return c.pc.SetRemoteDescription(answer) +} + func (c *webRTCConn) sendData(data []byte) { // Buffer the data in case datachannel isn't available yet. if nil == c.snowflake { @@ -188,72 +252,35 @@ func (c *webRTCConn) sendData(data []byte) { c.snowflake.Send(data) } +// WebRTC re-establishment loop. Expected in own goroutine. +func (c *webRTCConn) ConnectLoop() { + for { + log.Println("Establishing WebRTC connection...") + // TODO: When go-webrtc is more stable, it's possible that a new + // PeerConnection won't need to be recreated each time. + // called once. + c.PreparePeerConnection() + c.EstablishDataChannel() + c.SendOffer() + c.ReceiveAnswer() + <-c.reset + log.Println(" --- snowflake connection reset ---") + } +} + // Initialize a WebRTC Connection. func dialWebRTC(config *webrtc.Configuration, broker *BrokerChannel) ( *webRTCConn, error) { - pc, err := webrtc.NewPeerConnection(config) - if err != nil { - log.Printf("NewPeerConnection: %s", err) - return nil, err - } connection := new(webRTCConn) + connection.config = config connection.broker = broker - connection.pc = pc connection.offerChannel = make(chan *webrtc.SessionDescription) connection.errorChannel = make(chan error) + connection.reset = make(chan struct{}) // Pipes remain the same even when DataChannel gets switched. connection.recvPipe, connection.writePipe = io.Pipe() - pc.OnNegotiationNeeded = func() { - log.Println("OnNegotiationNeeded") - go func() { - offer, err := pc.CreateOffer() - // TODO: Potentially timeout and retry if ICE isn't working. - if err != nil { - connection.errorChannel <- err - return - } - err = pc.SetLocalDescription(offer) - if err != nil { - connection.errorChannel <- err - return - } - }() - } - pc.OnIceCandidate = func(candidate webrtc.IceCandidate) { - log.Printf("OnIceCandidate %s", candidate.Serialize()) - // Allow candidates to accumulate until OnIceComplete. - } - // TODO: This may soon be deprecated, consider OnIceGatheringStateChange. - pc.OnIceComplete = func() { - log.Printf("OnIceComplete") - connection.offerChannel <- pc.LocalDescription() - } - // This callback is not expected, as the Client initiates the creation - // of the data channel, not the remote peer. - pc.OnDataChannel = func(channel *webrtc.DataChannel) { - log.Println("OnDataChannel") - panic("Unexpected OnDataChannel!") - } - - connection.EstablishDataChannel() - - // TODO: Make this part of a re-establishment loop. - connection.sendOffer() - log.Printf("waiting for answer...") - answer, ok := <-answerChannel - if !ok { - // TODO: Don't just fail, try again! - pc.Close() - return nil, fmt.Errorf("no answer received") - } - log.Printf("Received Answer:\n\n%s\n", answer.Sdp) - err = pc.SetRemoteDescription(answer) - if err != nil { - pc.Close() - return nil, err - } - + go connection.ConnectLoop() return connection, nil }
participants (1)
-
serene@torproject.org