
commit eb7eb04ac01b7a7d2466958b35403c6729014a1f Author: Serene Han <keroserene+git@gmail.com> Date: Wed Feb 17 18:38:40 2016 -0800 Buffer writes to DataChannel, remove blocking on openChannel (#12) --- client/client_test.go | 44 +++++++++++++++++++++++++ client/snowflake.go | 89 ++++++++++++++++++++++++++++----------------------- 2 files changed, 93 insertions(+), 40 deletions(-) diff --git a/client/client_test.go b/client/client_test.go new file mode 100644 index 0000000..7b8dad2 --- /dev/null +++ b/client/client_test.go @@ -0,0 +1,44 @@ +package main + +import ( + "bytes" + . "github.com/smartystreets/goconvey/convey" + "testing" +) + +type MockDataChannel struct { + destination bytes.Buffer +} + +func (m *MockDataChannel) Send(data []byte) { + m.destination.Write(data) +} + +func (*MockDataChannel) Close() error { + return nil +} + +func TestConnect(t *testing.T) { + Convey("Snowflake", t, func() { + + Convey("WebRTC Connection", func() { + c := new(webRTCConn) + So(c.buffer.Bytes(), ShouldEqual, nil) + + Convey("SendData buffers when datachannel is nil", func() { + c.sendData([]byte("test")) + c.snowflake = nil + So(c.buffer.Bytes(), ShouldResemble, []byte("test")) + }) + + Convey("SendData sends to datachannel when not nil", func() { + mock := new(MockDataChannel) + c.snowflake = mock + c.sendData([]byte("test")) + So(c.buffer.Bytes(), ShouldEqual, nil) + So(mock.destination.Bytes(), ShouldResemble, []byte("test")) + }) + }) + + }) +} diff --git a/client/snowflake.go b/client/snowflake.go index 7c47fbb..771e90b 100644 --- a/client/snowflake.go +++ b/client/snowflake.go @@ -3,6 +3,7 @@ package main import ( "bufio" + "bytes" "errors" "flag" "fmt" @@ -46,16 +47,22 @@ func copyLoop(a, b net.Conn) { wg.Wait() } +// Interface that matches both webrc.DataChannel and for testing. +type SnowflakeChannel interface { + Send([]byte) + Close() error +} + // Implements net.Conn interface type webRTCConn struct { pc *webrtc.PeerConnection - dc *webrtc.DataChannel + snowflake SnowflakeChannel // Interface holding the WebRTC DataChannel. broker *BrokerChannel - recvPipe *io.PipeReader - writePipe *io.PipeWriter offerChannel chan *webrtc.SessionDescription errorChannel chan error - openChannel chan struct{} + recvPipe *io.PipeReader + writePipe *io.PipeWriter + buffer bytes.Buffer } var webrtcRemote *webRTCConn @@ -66,9 +73,7 @@ func (c *webRTCConn) Read(b []byte) (int, error) { func (c *webRTCConn) Write(b []byte) (int, error) { // log.Printf("webrtc Write %d %+q", len(b), string(b)) - log.Printf("Write %d bytes --> WebRTC", len(b)) - // Buffer in case datachannel isn't available. - c.dc.Send(b) + c.sendData(b) return len(b), nil } @@ -98,21 +103,29 @@ func (c *webRTCConn) SetWriteDeadline(t time.Time) error { } // Create a WebRTC DataChannel locally. -// This triggers "OnNegotiationNeeded" which should prepare an SDP offer. func (c *webRTCConn) EstablishDataChannel() error { dc, err := c.pc.CreateDataChannel("snowflake", webrtc.Init{}) + // Triggers "OnNegotiationNeeded" on the PeerConnection, which will prepare + // an SDP offer while other goroutines operating on this struct handle the + // signaling. Eventually fires "OnOpen". if err != nil { log.Printf("CreateDataChannel: %s", err) return err } dc.OnOpen = func() { - log.Println("OnOpen channel") - c.openChannel <- struct{}{} + log.Println("WebRTC: DataChannel.OnOpen") + // Flush the buffer, then enable datachannel. + // TODO: Make this more safe + dc.Send(c.buffer.Bytes()) + log.Println("Flushed ", c.buffer.Len(), " bytes") + c.buffer.Reset() + c.snowflake = dc } dc.OnClose = func() { - log.Println("OnClose channel") - // writePipe.Close() - close(c.openChannel) + // Disable the DataChannel as a write destination. + // Future writes will go to the buffer until a new DataChannel is available. + log.Println("WebRTC: DataChannel.OnClose") + c.snowflake = nil // TODO: (Issue #12) Should attempt to renegotiate at this point. } dc.OnMessage = func(msg []byte) { @@ -126,7 +139,6 @@ func (c *webRTCConn) EstablishDataChannel() error { panic("short write") } } - c.dc = dc return nil } @@ -153,8 +165,8 @@ func (c *webRTCConn) sendOffer() error { } if nil == answer { log.Printf("BrokerChannel: No answer received.") + // TODO: Should try again here. return - // return errors.New("No answer received.") } answerChannel <- answer }() @@ -165,6 +177,18 @@ func (c *webRTCConn) sendOffer() error { return nil } +func (c *webRTCConn) sendData(data []byte) { + // Buffer the data in case datachannel isn't available yet. + if nil == c.snowflake { + log.Printf("Buffered %d bytes --> WebRTC", len(data)) + c.buffer.Write(data) + return + } + log.Printf("Write %d bytes --> WebRTC", len(data)) + c.snowflake.Send(data) +} + +// Initialize a WebRTC Connection. func dialWebRTC(config *webrtc.Configuration, broker *BrokerChannel) ( *webRTCConn, error) { pc, err := webrtc.NewPeerConnection(config) @@ -177,13 +201,14 @@ func dialWebRTC(config *webrtc.Configuration, broker *BrokerChannel) ( connection.pc = pc connection.offerChannel = make(chan *webrtc.SessionDescription) connection.errorChannel = make(chan error) - connection.openChannel = make(chan struct{}) + // Pipes remain the same even when DataChannel gets switched. + connection.recvPipe, connection.writePipe = io.Pipe() - // Triggered by CreateDataChannel. pc.OnNegotiationNeeded = func() { log.Println("OnNegotiationNeeded") go func() { offer, err := pc.CreateOffer() + // TODO: Potentially timeout and retry if ICE isn't working. if err != nil { connection.errorChannel <- err return @@ -208,18 +233,17 @@ func dialWebRTC(config *webrtc.Configuration, broker *BrokerChannel) ( // of the data channel, not the remote peer. pc.OnDataChannel = func(channel *webrtc.DataChannel) { log.Println("OnDataChannel") - panic("OnDataChannel") + panic("Unexpected OnDataChannel!") } - // Pipes remain the same even when DataChannel gets switched. - connection.recvPipe, connection.writePipe = io.Pipe() - connection.EstablishDataChannel() - connection.sendOffer() + // TODO: Make this part of a re-establishment loop. + connection.sendOffer() log.Printf("waiting for answer...") answer, ok := <-answerChannel if !ok { + // TODO: Don't just fail, try again! pc.Close() return nil, fmt.Errorf("no answer received") } @@ -230,15 +254,6 @@ func dialWebRTC(config *webrtc.Configuration, broker *BrokerChannel) ( return nil, err } - // Wait until data channel is open; otherwise for example sends may get - // lost. - // TODO: Buffering *should* work though. - _, ok = <-connection.openChannel - if !ok { - pc.Close() - return nil, fmt.Errorf("failed to open data channel") - } - return connection, nil } @@ -247,9 +262,9 @@ func endWebRTC() { if nil == webrtcRemote { return } - if nil != webrtcRemote.dc { + if nil != webrtcRemote.snowflake { log.Printf("WebRTC: closing DataChannel") - webrtcRemote.dc.Close() + webrtcRemote.snowflake.Close() } if nil != webrtcRemote.pc { log.Printf("WebRTC: closing PeerConnection") @@ -332,7 +347,7 @@ func readSignalingMessages(f *os.File) { func main() { var err error - + webrtc.SetLoggingVerbosity(1) flag.StringVar(&brokerURL, "url", "", "URL of signaling broker") flag.StringVar(&frontDomain, "front", "", "front domain") flag.Parse() @@ -368,8 +383,6 @@ func main() { go readSignalingMessages(signalFile) } - webrtc.SetLoggingVerbosity(1) - ptInfo, err = pt.ClientSetup(nil) if err != nil { log.Fatal(err) @@ -417,10 +430,6 @@ func main() { ln.Close() } - // if syscall.SIGTERM == sig || syscall.SIGINT == sig { - // return - // } - // wait for second signal or no more handlers sig = nil for sig == nil && numHandlers != 0 {
participants (1)
-
serene@torproject.org