commit c3ada1b54521927809b2ae67b45684dd412dc612 Author: Serene Han keroserene+git@gmail.com Date: Fri Feb 19 16:17:17 2016 -0800
Use a channel to safely synchronize datachannel writes (#12), and clean up the ICE candidate log message. Still need to debug the copy loop break. --- client/client_test.go | 11 +++++++++-- client/snowflake.go | 54 ++++++++++++++++++++++++++++++++++++--------------- 2 files changed, 47 insertions(+), 18 deletions(-)
diff --git a/client/client_test.go b/client/client_test.go index 6ee36d9..85c2144 100644 --- a/client/client_test.go +++ b/client/client_test.go @@ -9,10 +9,12 @@ import (
type MockDataChannel struct { destination bytes.Buffer + done chan bool }
func (m *MockDataChannel) Send(data []byte) { m.destination.Write(data) + m.done <- true }
func (*MockDataChannel) Close() error { @@ -24,6 +26,7 @@ func TestConnect(t *testing.T) {
Convey("WebRTC Connection", func() { c := new(webRTCConn) + c.BytesInfo = &BytesInfo{ inboundChan: make(chan int), outboundChan: make(chan int), inbound: 0, outbound: 0, inEvents: 0, outEvents: 0, @@ -31,15 +34,19 @@ func TestConnect(t *testing.T) { So(c.buffer.Bytes(), ShouldEqual, nil)
Convey("SendData buffers when datachannel is nil", func() { - c.sendData([]byte("test")) + c.SendData([]byte("test")) c.snowflake = nil So(c.buffer.Bytes(), ShouldResemble, []byte("test")) })
Convey("SendData sends to datachannel when not nil", func() { mock := new(MockDataChannel) + mock.done = make(chan bool) + go c.SendLoop() + c.writeChannel = make(chan []byte) c.snowflake = mock - c.sendData([]byte("test")) + c.SendData([]byte("test")) + <-mock.done So(c.buffer.Bytes(), ShouldEqual, nil) So(mock.destination.Bytes(), ShouldResemble, []byte("test")) }) diff --git a/client/snowflake.go b/client/snowflake.go index eba68eb..d01ec5e 100644 --- a/client/snowflake.go +++ b/client/snowflake.go @@ -36,12 +36,15 @@ const ( func copyLoop(a, b net.Conn) { var wg sync.WaitGroup wg.Add(2) + // TODO fix the copy loop. go func() { io.Copy(b, a) + log.Println("copy loop b-a break") wg.Done() }() go func() { io.Copy(a, b) + log.Println("copy loop a-b break") wg.Done() }() wg.Wait() @@ -63,6 +66,7 @@ type webRTCConn struct { offerChannel chan *webrtc.SessionDescription answerChannel chan *webrtc.SessionDescription errorChannel chan error + writeChannel chan []byte recvPipe *io.PipeReader writePipe *io.PipeWriter buffer bytes.Buffer @@ -77,7 +81,7 @@ func (c *webRTCConn) Read(b []byte) (int, error) { }
func (c *webRTCConn) Write(b []byte) (int, error) { - c.sendData(b) + c.SendData(b) return len(b), nil }
@@ -133,9 +137,9 @@ func (c *webRTCConn) PreparePeerConnection() { } }() } + // Allow candidates to accumulate until OnIceComplete. pc.OnIceCandidate = func(candidate webrtc.IceCandidate) { - log.Printf("WebRTC: OnIceCandidate %s", candidate.Serialize()) - // Allow candidates to accumulate until OnIceComplete. + log.Printf(candidate.Candidate) } // TODO: This may soon be deprecated, consider OnIceGatheringStateChange. pc.OnIceComplete = func() { @@ -169,10 +173,11 @@ func (c *webRTCConn) EstablishDataChannel() error { // } // Flush the buffer, then enable datachannel. // TODO: Make this more safe - dc.Send(c.buffer.Bytes()) - log.Println("Flushed", c.buffer.Len(), "bytes") - c.buffer.Reset() + // dc.Send(c.buffer.Bytes()) + // log.Println("Flushed", c.buffer.Len(), "bytes") + // c.buffer.Reset() c.snowflake = dc + c.SendData(nil) } dc.OnClose = func() { // Disable the DataChannel as a write destination. @@ -180,7 +185,7 @@ func (c *webRTCConn) EstablishDataChannel() error { log.Println("WebRTC: DataChannel.OnClose") if nil != c.snowflake { c.snowflake = nil - // Only reset if this OnClose triggered + // Only reset if this OnClose was triggered remotely. c.Reset() } } @@ -247,21 +252,32 @@ func (c *webRTCConn) ReceiveAnswer() { }() }
-func (c *webRTCConn) sendData(data []byte) { - c.BytesInfo.AddOutbound(len(data)) +func (c *webRTCConn) SendData(data []byte) { // Buffer the data in case datachannel isn't available yet. if nil == c.snowflake { log.Printf("Buffered %d bytes --> WebRTC", len(data)) c.buffer.Write(data) return } - // Otherwise, flush buffer if necessary. - for c.buffer.Len() > 0 { - c.snowflake.Send(c.buffer.Bytes()) - log.Println("Flushed", c.buffer.Len(), "bytes") - c.buffer.Reset() + go func() { + c.writeChannel <- data + }() +} + +// Expected in own goroutine. +func (c *webRTCConn) SendLoop() { + log.Println("send loop") + for data := range c.writeChannel { + // Flush buffer if necessary. + for c.buffer.Len() > 0 { + c.snowflake.Send(c.buffer.Bytes()) + log.Println("Flushed", c.buffer.Len(), "bytes") + c.buffer.Reset() + } + + c.BytesInfo.AddOutbound(len(data)) + c.snowflake.Send(data) } - c.snowflake.Send(data) }
// WebRTC re-establishment loop. Expected in own goroutine. @@ -296,6 +312,7 @@ func dialWebRTC(config *webrtc.Configuration, broker *BrokerChannel) ( connection.broker = broker connection.offerChannel = make(chan *webrtc.SessionDescription) connection.answerChannel = make(chan *webrtc.SessionDescription) + connection.writeChannel = make(chan []byte) connection.errorChannel = make(chan error) connection.reset = make(chan struct{}) connection.BytesInfo = &BytesInfo{ @@ -308,6 +325,7 @@ func dialWebRTC(config *webrtc.Configuration, broker *BrokerChannel) ( connection.recvPipe, connection.writePipe = io.Pipe()
go connection.ConnectLoop() + go connection.SendLoop() return connection, nil }
@@ -317,8 +335,10 @@ func endWebRTC() { return } if nil != webrtcRemote.snowflake { + s := webrtcRemote.snowflake + webrtcRemote.snowflake = nil log.Printf("WebRTC: closing DataChannel") - webrtcRemote.snowflake.Close() + s.Close() } if nil != webrtcRemote.pc { log.Printf("WebRTC: closing PeerConnection") @@ -333,6 +353,7 @@ func handler(conn *pt.SocksConn) error { handlerChan <- -1 }() defer conn.Close() + log.Println("handler", conn)
// TODO: [#3] Fetch ICE server information from Broker. // TODO: [#18] Consider TURN servers here too. @@ -357,6 +378,7 @@ func handler(conn *pt.SocksConn) error { }
copyLoop(conn, remote) + log.Println("----END---") return nil }