commit d4efe774d109b094b81699a608cd1024e096bb64
Author: Serene Han <keroserene+git(a)gmail.com>
Date: Tue Feb 23 17:34:51 2016 -0800
remove webRTCConn SendLoop and simplify Write without additional channel, as net.Conn is already safe
---
client/client_test.go | 14 ++++++--------
client/snowflake.go | 1 -
client/webrtc.go | 52 ++++++++++++++++++---------------------------------
3 files changed, 24 insertions(+), 43 deletions(-)
diff --git a/client/client_test.go b/client/client_test.go
index cad40de..9acf6fc 100644
--- a/client/client_test.go
+++ b/client/client_test.go
@@ -33,19 +33,17 @@ func TestConnect(t *testing.T) {
}
So(c.buffer.Bytes(), ShouldEqual, nil)
- Convey("sendData buffers when datachannel is nil", func() {
- c.sendData([]byte("test"))
+ Convey("Write buffers when datachannel is nil", func() {
+ c.Write([]byte("test"))
c.snowflake = nil
So(c.buffer.Bytes(), ShouldResemble, []byte("test"))
})
- Convey("sendData sends to datachannel when not nil", func() {
+ Convey("Write sends to datachannel when not nil", func() {
mock := new(MockDataChannel)
- mock.done = make(chan bool)
- go c.SendLoop()
- c.writeChannel = make(chan []byte)
c.snowflake = mock
- c.sendData([]byte("test"))
+ mock.done = make(chan bool, 1)
+ c.Write([]byte("test"))
<-mock.done
So(c.buffer.Bytes(), ShouldEqual, nil)
So(mock.destination.Bytes(), ShouldResemble, []byte("test"))
@@ -54,7 +52,7 @@ func TestConnect(t *testing.T) {
Convey("Receive answer sets remote description", func() {
c.answerChannel = make(chan *webrtc.SessionDescription)
c.config = webrtc.NewConfiguration()
- c.PreparePeerConnection()
+ c.preparePeerConnection()
c.receiveAnswer()
sdp := webrtc.DeserializeSessionDescription("test")
c.answerChannel <- sdp
diff --git a/client/snowflake.go b/client/snowflake.go
index 29d263a..615038c 100644
--- a/client/snowflake.go
+++ b/client/snowflake.go
@@ -70,7 +70,6 @@ func dialWebRTC() (*webRTCConn, error) {
connection := NewWebRTCConnection(config, broker)
go connection.ConnectLoop()
- go connection.SendLoop()
return connection, nil
}
diff --git a/client/webrtc.go b/client/webrtc.go
index 89d1592..0c865fa 100644
--- a/client/webrtc.go
+++ b/client/webrtc.go
@@ -19,7 +19,6 @@ type webRTCConn struct {
offerChannel chan *webrtc.SessionDescription
answerChannel chan *webrtc.SessionDescription
errorChannel chan error
- writeChannel chan []byte
recvPipe *io.PipeReader
writePipe *io.PipeWriter
buffer bytes.Buffer
@@ -33,8 +32,15 @@ func (c *webRTCConn) Read(b []byte) (int, error) {
return c.recvPipe.Read(b)
}
+// Write sends bytes to the snowflake proxy, buffering them while no data channel is available.
func (c *webRTCConn) Write(b []byte) (int, error) {
- c.sendData(b)
+ c.BytesInfo.AddOutbound(len(b))
+ if nil == c.snowflake {
+ log.Printf("Buffered %d bytes --> WebRTC", len(b))
+ c.buffer.Write(b)
+ } else {
+ c.snowflake.Send(b)
+ }
return len(b), nil
}
@@ -64,19 +70,18 @@ func (c *webRTCConn) SetWriteDeadline(t time.Time) error {
}
func NewWebRTCConnection(config *webrtc.Configuration,
- broker *BrokerChannel) *webRTCConn {
+ broker *BrokerChannel) *webRTCConn {
connection := new(webRTCConn)
connection.config = config
connection.broker = broker
- connection.offerChannel = make(chan *webrtc.SessionDescription)
- connection.answerChannel = make(chan *webrtc.SessionDescription)
- connection.writeChannel = make(chan []byte)
- connection.errorChannel = make(chan error)
- connection.reset = make(chan struct{})
+ connection.offerChannel = make(chan *webrtc.SessionDescription, 1)
+ connection.answerChannel = make(chan *webrtc.SessionDescription, 1)
+ connection.errorChannel = make(chan error, 1)
+ connection.reset = make(chan struct{}, 1)
// Log every few seconds.
connection.BytesInfo = &BytesInfo{
- inboundChan: make(chan int), outboundChan: make(chan int),
+ inboundChan: make(chan int, 5), outboundChan: make(chan int, 5),
inbound: 0, outbound: 0, inEvents: 0, outEvents: 0,
}
go connection.BytesInfo.Log()
@@ -106,20 +111,6 @@ func (c *webRTCConn) ConnectLoop() {
}
}
-// Expected in own goroutine.
-func (c *webRTCConn) SendLoop() {
- log.Println("send loop")
- for data := range c.writeChannel {
- // Flush buffer if necessary.
- for c.buffer.Len() > 0 {
- c.snowflake.Send(c.buffer.Bytes())
- log.Println("Flushed", c.buffer.Len(), "bytes")
- c.buffer.Reset()
- }
- c.snowflake.Send(data)
- }
-}
-
func (c *webRTCConn) preparePeerConnection() {
if nil != c.pc {
c.pc.Close()
@@ -185,6 +176,7 @@ func (c *webRTCConn) establishDataChannel() error {
dc.Send(c.buffer.Bytes())
log.Println("Flushed", c.buffer.Len(), "bytes")
c.buffer.Reset()
+
c.snowflake = dc
}
dc.OnClose = func() {
@@ -198,6 +190,9 @@ func (c *webRTCConn) establishDataChannel() error {
}
}
dc.OnMessage = func(msg []byte) {
+ if len(msg) <= 0 {
+ log.Println("0 length---")
+ }
c.BytesInfo.AddInbound(len(msg))
n, err := c.writePipe.Write(msg)
if err != nil {
@@ -261,17 +256,6 @@ func (c *webRTCConn) receiveAnswer() {
}()
}
-func (c *webRTCConn) sendData(data []byte) {
- c.BytesInfo.AddOutbound(len(data))
- // Buffer the data in case datachannel isn't available yet.
- if nil == c.snowflake {
- log.Printf("Buffered %d bytes --> WebRTC", len(data))
- c.buffer.Write(data)
- return
- }
- c.writeChannel <- data
-}
-
func (c *webRTCConn) Reset() {
go func() {
c.reset <- struct{}{} // Attempt to negotiate a new datachannel..