
commit c4215b5614210ec81d728d834f42754efb6d5130 Author: Serene Han <keroserene+git@gmail.com> Date: Thu Feb 18 14:15:22 2016 -0800 log traffic bytes only once every few seconds, along with OnMessage & datachannel.Send counts, to prevent flooded logs --- client/client_test.go | 4 +++ client/snowflake.go | 17 +++++++++---- client/util.go | 67 +++++++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 83 insertions(+), 5 deletions(-) diff --git a/client/client_test.go b/client/client_test.go index ff15d6d..6ee36d9 100644 --- a/client/client_test.go +++ b/client/client_test.go @@ -24,6 +24,10 @@ func TestConnect(t *testing.T) { Convey("WebRTC Connection", func() { c := new(webRTCConn) + c.BytesInfo = &BytesInfo{ + inboundChan: make(chan int), outboundChan: make(chan int), + inbound: 0, outbound: 0, inEvents: 0, outEvents: 0, + } So(c.buffer.Bytes(), ShouldEqual, nil) Convey("SendData buffers when datachannel is nil", func() { diff --git a/client/snowflake.go b/client/snowflake.go index 081cb3b..eba68eb 100644 --- a/client/snowflake.go +++ b/client/snowflake.go @@ -67,6 +67,7 @@ type webRTCConn struct { writePipe *io.PipeWriter buffer bytes.Buffer reset chan struct{} + *BytesInfo } var webrtcRemote *webRTCConn @@ -164,12 +165,12 @@ func (c *webRTCConn) EstablishDataChannel() error { dc.OnOpen = func() { log.Println("WebRTC: DataChannel.OnOpen") // if nil != c.snowflake { - // panic("PeerConnection snowflake already exists.") + // panic("PeerConnection snowflake already exists.") // } // Flush the buffer, then enable datachannel. // TODO: Make this more safe dc.Send(c.buffer.Bytes()) - log.Println("Flushed ", c.buffer.Len(), " bytes") + log.Println("Flushed", c.buffer.Len(), "bytes") c.buffer.Reset() c.snowflake = dc } @@ -184,7 +185,7 @@ func (c *webRTCConn) EstablishDataChannel() error { } } dc.OnMessage = func(msg []byte) { - log.Printf("OnMessage <--- %d bytes", len(msg)) + c.BytesInfo.AddInbound(len(msg)) n, err := c.writePipe.Write(msg) if err != nil { // TODO: Maybe shouldn't actually close. @@ -247,6 +248,7 @@ func (c *webRTCConn) ReceiveAnswer() { } func (c *webRTCConn) sendData(data []byte) { + c.BytesInfo.AddOutbound(len(data)) // Buffer the data in case datachannel isn't available yet. if nil == c.snowflake { log.Printf("Buffered %d bytes --> WebRTC", len(data)) @@ -256,10 +258,9 @@ func (c *webRTCConn) sendData(data []byte) { // Otherwise, flush buffer if necessary. for c.buffer.Len() > 0 { c.snowflake.Send(c.buffer.Bytes()) - log.Println("Flushed ", c.buffer.Len(), " bytes") + log.Println("Flushed", c.buffer.Len(), "bytes") c.buffer.Reset() } - log.Printf("Write %d bytes --> WebRTC", len(data)) c.snowflake.Send(data) } @@ -297,6 +298,12 @@ func dialWebRTC(config *webrtc.Configuration, broker *BrokerChannel) ( connection.answerChannel = make(chan *webrtc.SessionDescription) connection.errorChannel = make(chan error) connection.reset = make(chan struct{}) + connection.BytesInfo = &BytesInfo{ + inboundChan: make(chan int), outboundChan: make(chan int), + inbound: 0, outbound: 0, inEvents: 0, outEvents: 0, + } + go connection.BytesInfo.Log() + // Pipes remain the same even when DataChannel gets switched. connection.recvPipe, connection.writePipe = io.Pipe() diff --git a/client/util.go b/client/util.go new file mode 100644 index 0000000..fa04220 --- /dev/null +++ b/client/util.go @@ -0,0 +1,67 @@ +package main + +import ( + "log" + "time" +) + +type BytesInfo struct { + outboundChan chan int + inboundChan chan int + outbound int + inbound int + outEvents int + inEvents int + isLogging bool +} + +func (b *BytesInfo) Log() { + b.isLogging = true + var amount int + output := func() { + log.Printf("Traffic Bytes (in|out): %d | %d -- (%d OnMessages, %d Sends)", + b.inbound, b.outbound, b.inEvents, b.outEvents) + b.outbound = 0 + b.outEvents = 0 + b.inbound = 0 + b.inEvents = 0 + } + last := time.Now() + for { + select { + case amount = <-b.outboundChan: + b.outbound += amount + b.outEvents++ + last := time.Now() + if time.Since(last) > time.Second*5 { + last = time.Now() + output() + } + case amount = <-b.inboundChan: + b.inbound += amount + b.inEvents++ + if time.Since(last) > time.Second*5 { + last = time.Now() + output() + } + case <-time.After(time.Second * 5): + if b.inEvents > 0 || b.outEvents > 0 { + output() + } + } + } +} + +func (b *BytesInfo) AddOutbound(amount int) { + if !b.isLogging { + return + } + b.outboundChan <- amount +} + +func (b *BytesInfo) AddInbound(amount int) { + if !b.isLogging { + return + } + b.inboundChan <- amount +}