[tor-commits] [snowflake/master] log traffic bytes only once every few seconds, along with OnMessage & datachannel.Send counts, to prevent flooded logs

serene at torproject.org serene at torproject.org
Thu Feb 18 22:15:38 UTC 2016


commit c4215b5614210ec81d728d834f42754efb6d5130
Author: Serene Han <keroserene+git at gmail.com>
Date:   Thu Feb 18 14:15:22 2016 -0800

    log traffic bytes only once every few seconds, along with OnMessage & datachannel.Send counts, to prevent flooded logs
---
 client/client_test.go |  4 +++
 client/snowflake.go   | 17 +++++++++----
 client/util.go        | 67 +++++++++++++++++++++++++++++++++++++++++++++++++++
 3 files changed, 83 insertions(+), 5 deletions(-)

diff --git a/client/client_test.go b/client/client_test.go
index ff15d6d..6ee36d9 100644
--- a/client/client_test.go
+++ b/client/client_test.go
@@ -24,6 +24,10 @@ func TestConnect(t *testing.T) {
 
 		Convey("WebRTC Connection", func() {
 			c := new(webRTCConn)
+			c.BytesInfo = &BytesInfo{
+				inboundChan: make(chan int), outboundChan: make(chan int),
+				inbound: 0, outbound: 0, inEvents: 0, outEvents: 0,
+			}
 			So(c.buffer.Bytes(), ShouldEqual, nil)
 
 			Convey("SendData buffers when datachannel is nil", func() {
diff --git a/client/snowflake.go b/client/snowflake.go
index 081cb3b..eba68eb 100644
--- a/client/snowflake.go
+++ b/client/snowflake.go
@@ -67,6 +67,7 @@ type webRTCConn struct {
 	writePipe     *io.PipeWriter
 	buffer        bytes.Buffer
 	reset         chan struct{}
+	*BytesInfo
 }
 
 var webrtcRemote *webRTCConn
@@ -164,12 +165,12 @@ func (c *webRTCConn) EstablishDataChannel() error {
 	dc.OnOpen = func() {
 		log.Println("WebRTC: DataChannel.OnOpen")
 		// if nil != c.snowflake {
-			// panic("PeerConnection snowflake already exists.")
+		// panic("PeerConnection snowflake already exists.")
 		// }
 		// Flush the buffer, then enable datachannel.
 		// TODO: Make this more safe
 		dc.Send(c.buffer.Bytes())
-		log.Println("Flushed ", c.buffer.Len(), " bytes")
+		log.Println("Flushed", c.buffer.Len(), "bytes")
 		c.buffer.Reset()
 		c.snowflake = dc
 	}
@@ -184,7 +185,7 @@ func (c *webRTCConn) EstablishDataChannel() error {
 		}
 	}
 	dc.OnMessage = func(msg []byte) {
-		log.Printf("OnMessage <--- %d bytes", len(msg))
+		c.BytesInfo.AddInbound(len(msg))
 		n, err := c.writePipe.Write(msg)
 		if err != nil {
 			// TODO: Maybe shouldn't actually close.
@@ -247,6 +248,7 @@ func (c *webRTCConn) ReceiveAnswer() {
 }
 
 func (c *webRTCConn) sendData(data []byte) {
+	c.BytesInfo.AddOutbound(len(data))
 	// Buffer the data in case datachannel isn't available yet.
 	if nil == c.snowflake {
 		log.Printf("Buffered %d bytes --> WebRTC", len(data))
@@ -256,10 +258,9 @@ func (c *webRTCConn) sendData(data []byte) {
 	// Otherwise, flush buffer if necessary.
 	for c.buffer.Len() > 0 {
 		c.snowflake.Send(c.buffer.Bytes())
-		log.Println("Flushed ", c.buffer.Len(), " bytes")
+		log.Println("Flushed", c.buffer.Len(), "bytes")
 		c.buffer.Reset()
 	}
-	log.Printf("Write %d bytes --> WebRTC", len(data))
 	c.snowflake.Send(data)
 }
 
@@ -297,6 +298,12 @@ func dialWebRTC(config *webrtc.Configuration, broker *BrokerChannel) (
 	connection.answerChannel = make(chan *webrtc.SessionDescription)
 	connection.errorChannel = make(chan error)
 	connection.reset = make(chan struct{})
+	connection.BytesInfo = &BytesInfo{
+		inboundChan: make(chan int), outboundChan: make(chan int),
+		inbound: 0, outbound: 0, inEvents: 0, outEvents: 0,
+	}
+	go connection.BytesInfo.Log()
+
 	// Pipes remain the same even when DataChannel gets switched.
 	connection.recvPipe, connection.writePipe = io.Pipe()
 
diff --git a/client/util.go b/client/util.go
new file mode 100644
index 0000000..fa04220
--- /dev/null
+++ b/client/util.go
@@ -0,0 +1,67 @@
+package main
+
+import (
+	"log"
+	"time"
+)
+
+type BytesInfo struct {
+	outboundChan chan int
+	inboundChan  chan int
+	outbound     int
+	inbound      int
+	outEvents    int
+	inEvents     int
+	isLogging    bool
+}
+
+func (b *BytesInfo) Log() {
+	b.isLogging = true
+	var amount int
+	output := func() {
+		log.Printf("Traffic Bytes (in|out): %d | %d -- (%d OnMessages, %d Sends)",
+			b.inbound, b.outbound, b.inEvents, b.outEvents)
+		b.outbound = 0
+		b.outEvents = 0
+		b.inbound = 0
+		b.inEvents = 0
+	}
+	last := time.Now()
+	for {
+		select {
+		case amount = <-b.outboundChan:
+			b.outbound += amount
+			b.outEvents++
+			last := time.Now()
+			if time.Since(last) > time.Second*5 {
+				last = time.Now()
+				output()
+			}
+		case amount = <-b.inboundChan:
+			b.inbound += amount
+			b.inEvents++
+			if time.Since(last) > time.Second*5 {
+				last = time.Now()
+				output()
+			}
+		case <-time.After(time.Second * 5):
+			if b.inEvents > 0 || b.outEvents > 0 {
+				output()
+			}
+		}
+	}
+}
+
+func (b *BytesInfo) AddOutbound(amount int) {
+	if !b.isLogging {
+		return
+	}
+	b.outboundChan <- amount
+}
+
+func (b *BytesInfo) AddInbound(amount int) {
+	if !b.isLogging {
+		return
+	}
+	b.inboundChan <- amount
+}



More information about the tor-commits mailing list