
commit a1b7e01c5423c69548afdf44c7455736605b5574 Author: Serene Han <keroserene+git@gmail.com> Date: Wed Feb 17 20:41:33 2016 -0800 Include answer channel as part of the webRTCConn struct (#12) --- client/client_test.go | 4 ++- client/snowflake.go | 78 ++++++++++++++++++++++++++++++++------------------- 2 files changed, 52 insertions(+), 30 deletions(-) diff --git a/client/client_test.go b/client/client_test.go index 546382f..ff15d6d 100644 --- a/client/client_test.go +++ b/client/client_test.go @@ -2,6 +2,7 @@ package main import ( "bytes" + "github.com/keroserene/go-webrtc" . "github.com/smartystreets/goconvey/convey" "testing" ) @@ -41,8 +42,9 @@ func TestConnect(t *testing.T) { Convey("Receive answer fails on nil answer", func() { c.reset = make(chan struct{}) + c.answerChannel = make(chan *webrtc.SessionDescription) c.ReceiveAnswer() - answerChannel <- nil + c.answerChannel <- nil <-c.reset }) diff --git a/client/snowflake.go b/client/snowflake.go index 6d612ce..081cb3b 100644 --- a/client/snowflake.go +++ b/client/snowflake.go @@ -28,7 +28,6 @@ var frontDomain string // When a connection handler starts, +1 is written to this channel; when it // ends, -1 is written. var handlerChan = make(chan int) -var answerChannel = make(chan *webrtc.SessionDescription) const ( ReconnectTimeout = 5 @@ -37,7 +36,6 @@ const ( func copyLoop(a, b net.Conn) { var wg sync.WaitGroup wg.Add(2) - go func() { io.Copy(b, a) wg.Done() @@ -46,8 +44,8 @@ func copyLoop(a, b net.Conn) { io.Copy(a, b) wg.Done() }() - wg.Wait() + log.Println("copy loop ended") } // Interface that matches both webrc.DataChannel and for testing. @@ -58,16 +56,17 @@ type SnowflakeChannel interface { // Implements net.Conn interface type webRTCConn struct { - config *webrtc.Configuration - pc *webrtc.PeerConnection - snowflake SnowflakeChannel // Interface holding the WebRTC DataChannel. - broker *BrokerChannel - offerChannel chan *webrtc.SessionDescription - errorChannel chan error - recvPipe *io.PipeReader - writePipe *io.PipeWriter - buffer bytes.Buffer - reset chan struct{} + config *webrtc.Configuration + pc *webrtc.PeerConnection + snowflake SnowflakeChannel // Interface holding the WebRTC DataChannel. + broker *BrokerChannel + offerChannel chan *webrtc.SessionDescription + answerChannel chan *webrtc.SessionDescription + errorChannel chan error + recvPipe *io.PipeReader + writePipe *io.PipeWriter + buffer bytes.Buffer + reset chan struct{} } var webrtcRemote *webRTCConn @@ -164,6 +163,9 @@ func (c *webRTCConn) EstablishDataChannel() error { } dc.OnOpen = func() { log.Println("WebRTC: DataChannel.OnOpen") + // if nil != c.snowflake { + // panic("PeerConnection snowflake already exists.") + // } // Flush the buffer, then enable datachannel. // TODO: Make this more safe dc.Send(c.buffer.Bytes()) @@ -175,14 +177,18 @@ func (c *webRTCConn) EstablishDataChannel() error { // Disable the DataChannel as a write destination. // Future writes will go to the buffer until a new DataChannel is available. log.Println("WebRTC: DataChannel.OnClose") - c.snowflake = nil - c.reset <- struct{}{} // Attempt to negotiate a new datachannel.. + if nil != c.snowflake { + c.snowflake = nil + // Only reset if this OnClose triggered + c.Reset() + } } dc.OnMessage = func(msg []byte) { log.Printf("OnMessage <--- %d bytes", len(msg)) n, err := c.writePipe.Write(msg) if err != nil { // TODO: Maybe shouldn't actually close. + log.Println("Error writing to SOCKS pipe") c.writePipe.CloseWithError(err) } if n != len(msg) { @@ -205,7 +211,7 @@ func (c *webRTCConn) SendOffer() error { log.Printf("----------------") return nil } - // Use Broker... + // Otherwise, use Broker. go func() { log.Println("Sending offer via BrokerChannel...\nTarget URL: ", brokerURL, "\nFront URL: ", frontDomain) @@ -214,7 +220,7 @@ func (c *webRTCConn) SendOffer() error { log.Printf("BrokerChannel error: %s", err) answer = nil } - answerChannel <- answer + c.answerChannel <- answer }() case err := <-c.errorChannel: c.pc.Close() @@ -225,11 +231,11 @@ func (c *webRTCConn) SendOffer() error { func (c *webRTCConn) ReceiveAnswer() { go func() { - answer, ok := <-answerChannel + answer, ok := <-c.answerChannel if !ok || nil == answer { log.Printf("Failed to retrieve answer. Retrying in %d seconds", ReconnectTimeout) <-time.After(time.Second * ReconnectTimeout) - c.reset <- struct{}{} + c.Reset() return } log.Printf("Received Answer:\n\n%s\n", answer.Sdp) @@ -247,6 +253,12 @@ func (c *webRTCConn) sendData(data []byte) { c.buffer.Write(data) return } + // Otherwise, flush buffer if necessary. + for c.buffer.Len() > 0 { + c.snowflake.Send(c.buffer.Bytes()) + log.Println("Flushed ", c.buffer.Len(), " bytes") + c.buffer.Reset() + } log.Printf("Write %d bytes --> WebRTC", len(data)) c.snowflake.Send(data) } @@ -256,18 +268,25 @@ func (c *webRTCConn) ConnectLoop() { for { log.Println("Establishing WebRTC connection...") // TODO: When go-webrtc is more stable, it's possible that a new - // PeerConnection won't need to be recreated each time. - // called once. + // PeerConnection won't need to be re-prepared each time. c.PreparePeerConnection() - c.EstablishDataChannel() - c.SendOffer() - c.ReceiveAnswer() - - <-c.reset - log.Println(" --- snowflake connection reset ---") + err := c.EstablishDataChannel() + if err == nil { + c.SendOffer() + c.ReceiveAnswer() + <-c.reset + log.Println(" --- snowflake connection reset ---") + } } } +func (c *webRTCConn) Reset() { + go func() { + c.reset <- struct{}{} // Attempt to negotiate a new datachannel.. + log.Println("WebRTC resetting...") + }() +} + // Initialize a WebRTC Connection. func dialWebRTC(config *webrtc.Configuration, broker *BrokerChannel) ( *webRTCConn, error) { @@ -275,6 +294,7 @@ func dialWebRTC(config *webrtc.Configuration, broker *BrokerChannel) ( connection.config = config connection.broker = broker connection.offerChannel = make(chan *webrtc.SessionDescription) + connection.answerChannel = make(chan *webrtc.SessionDescription) connection.errorChannel = make(chan error) connection.reset = make(chan struct{}) // Pipes remain the same even when DataChannel gets switched. @@ -363,10 +383,10 @@ func readSignalingMessages(f *os.File) { log.Printf("ignoring invalid signal message %+q", msg) continue } - answerChannel <- sdp + webrtcRemote.answerChannel <- sdp } log.Printf("close answerChannel") - close(answerChannel) + close(webrtcRemote.answerChannel) if err := s.Err(); err != nil { log.Printf("signal FIFO: %s", err) }
participants (1)
-
serene@torproject.org