commit d4efe774d109b094b81699a608cd1024e096bb64 Author: Serene Han <keroserene+git@gmail.com> Date: Tue Feb 23 17:34:51 2016 -0800
Remove webRTCConn SendLoop and simplify Write so that it no longer uses an additional channel, as net.Conn is already safe --- client/client_test.go | 14 ++++++-------- client/snowflake.go | 1 - client/webrtc.go | 52 ++++++++++++++++++--------------------------------- 3 files changed, 24 insertions(+), 43 deletions(-)
diff --git a/client/client_test.go b/client/client_test.go index cad40de..9acf6fc 100644 --- a/client/client_test.go +++ b/client/client_test.go @@ -33,19 +33,17 @@ func TestConnect(t *testing.T) { } So(c.buffer.Bytes(), ShouldEqual, nil)
- Convey("sendData buffers when datachannel is nil", func() { - c.sendData([]byte("test")) + Convey("Write buffers when datachannel is nil", func() { + c.Write([]byte("test")) c.snowflake = nil So(c.buffer.Bytes(), ShouldResemble, []byte("test")) })
- Convey("sendData sends to datachannel when not nil", func() { + Convey("Write sends to datachannel when not nil", func() { mock := new(MockDataChannel) - mock.done = make(chan bool) - go c.SendLoop() - c.writeChannel = make(chan []byte) c.snowflake = mock - c.sendData([]byte("test")) + mock.done = make(chan bool, 1) + c.Write([]byte("test")) <-mock.done So(c.buffer.Bytes(), ShouldEqual, nil) So(mock.destination.Bytes(), ShouldResemble, []byte("test")) @@ -54,7 +52,7 @@ func TestConnect(t *testing.T) { Convey("Receive answer sets remote description", func() { c.answerChannel = make(chan *webrtc.SessionDescription) c.config = webrtc.NewConfiguration() - c.PreparePeerConnection() + c.preparePeerConnection() c.receiveAnswer() sdp := webrtc.DeserializeSessionDescription("test") c.answerChannel <- sdp diff --git a/client/snowflake.go b/client/snowflake.go index 29d263a..615038c 100644 --- a/client/snowflake.go +++ b/client/snowflake.go @@ -70,7 +70,6 @@ func dialWebRTC() (*webRTCConn, error) {
connection := NewWebRTCConnection(config, broker) go connection.ConnectLoop() - go connection.SendLoop()
return connection, nil } diff --git a/client/webrtc.go b/client/webrtc.go index 89d1592..0c865fa 100644 --- a/client/webrtc.go +++ b/client/webrtc.go @@ -19,7 +19,6 @@ type webRTCConn struct { offerChannel chan *webrtc.SessionDescription answerChannel chan *webrtc.SessionDescription errorChannel chan error - writeChannel chan []byte recvPipe *io.PipeReader writePipe *io.PipeWriter buffer bytes.Buffer @@ -33,8 +32,15 @@ func (c *webRTCConn) Read(b []byte) (int, error) { return c.recvPipe.Read(b) }
+// Writes bytes out to the snowflake proxy. func (c *webRTCConn) Write(b []byte) (int, error) { - c.sendData(b) + c.BytesInfo.AddOutbound(len(b)) + if nil == c.snowflake { + log.Printf("Buffered %d bytes --> WebRTC", len(b)) + c.buffer.Write(b) + } else { + c.snowflake.Send(b) + } return len(b), nil }
@@ -64,19 +70,18 @@ func (c *webRTCConn) SetWriteDeadline(t time.Time) error { }
func NewWebRTCConnection(config *webrtc.Configuration, - broker *BrokerChannel) *webRTCConn { + broker *BrokerChannel) *webRTCConn { connection := new(webRTCConn) connection.config = config connection.broker = broker - connection.offerChannel = make(chan *webrtc.SessionDescription) - connection.answerChannel = make(chan *webrtc.SessionDescription) - connection.writeChannel = make(chan []byte) - connection.errorChannel = make(chan error) - connection.reset = make(chan struct{}) + connection.offerChannel = make(chan *webrtc.SessionDescription, 1) + connection.answerChannel = make(chan *webrtc.SessionDescription, 1) + connection.errorChannel = make(chan error, 1) + connection.reset = make(chan struct{}, 1)
// Log every few seconds. connection.BytesInfo = &BytesInfo{ - inboundChan: make(chan int), outboundChan: make(chan int), + inboundChan: make(chan int, 5), outboundChan: make(chan int, 5), inbound: 0, outbound: 0, inEvents: 0, outEvents: 0, } go connection.BytesInfo.Log() @@ -106,20 +111,6 @@ func (c *webRTCConn) ConnectLoop() { } }
-// Expected in own goroutine. -func (c *webRTCConn) SendLoop() { - log.Println("send loop") - for data := range c.writeChannel { - // Flush buffer if necessary. - for c.buffer.Len() > 0 { - c.snowflake.Send(c.buffer.Bytes()) - log.Println("Flushed", c.buffer.Len(), "bytes") - c.buffer.Reset() - } - c.snowflake.Send(data) - } -} - func (c *webRTCConn) preparePeerConnection() { if nil != c.pc { c.pc.Close() @@ -185,6 +176,7 @@ func (c *webRTCConn) establishDataChannel() error { dc.Send(c.buffer.Bytes()) log.Println("Flushed", c.buffer.Len(), "bytes") c.buffer.Reset() + c.snowflake = dc } dc.OnClose = func() { @@ -198,6 +190,9 @@ func (c *webRTCConn) establishDataChannel() error { } } dc.OnMessage = func(msg []byte) { + if len(msg) <= 0 { + log.Println("0 length---") + } c.BytesInfo.AddInbound(len(msg)) n, err := c.writePipe.Write(msg) if err != nil { @@ -261,17 +256,6 @@ func (c *webRTCConn) receiveAnswer() { }() }
-func (c *webRTCConn) sendData(data []byte) { - c.BytesInfo.AddOutbound(len(data)) - // Buffer the data in case datachannel isn't available yet. - if nil == c.snowflake { - log.Printf("Buffered %d bytes --> WebRTC", len(data)) - c.buffer.Write(data) - return - } - c.writeChannel <- data -} - func (c *webRTCConn) Reset() { go func() { c.reset <- struct{}{} // Attempt to negotiate a new datachannel..
tor-commits@lists.torproject.org