commit 7187f1009ef7aaae6aa557fe1f724aa1df718b24
Author: Cecylia Bocovich <cohosh(a)torproject.org>
Date: Mon Jan 25 13:01:37 2021 -0500
Log a throughput summary for each connection
This will increase transparency for people running standalone proxies
and help us debug any potential issues with proxies behaving unreliably.
---
proxy/snowflake.go | 6 ++++
proxy/util.go | 84 ++++++++++++++++++++++++++++++++++++++++++++++++++++++
2 files changed, 90 insertions(+)
diff --git a/proxy/snowflake.go b/proxy/snowflake.go
index 1bc21ab..86ae0b2 100644
--- a/proxy/snowflake.go
+++ b/proxy/snowflake.go
@@ -118,6 +118,8 @@ type webRTCConn struct {
lock sync.Mutex // Synchronization for DataChannel destruction
once sync.Once // Synchronization for PeerConnection destruction
+
+ bytesLogger BytesLogger
}
func (c *webRTCConn) Read(b []byte) (int, error) {
@@ -125,6 +127,7 @@ func (c *webRTCConn) Read(b []byte) (int, error) {
}
func (c *webRTCConn) Write(b []byte) (int, error) {
+ c.bytesLogger.AddInbound(len(b))
c.lock.Lock()
defer c.lock.Unlock()
if c.dc != nil {
@@ -368,6 +371,7 @@ func makePeerConnectionFromOffer(sdp *webrtc.SessionDescription,
pr, pw := io.Pipe()
conn := &webRTCConn{pc: pc, dc: dc, pr: pr}
+ conn.bytesLogger = NewBytesSyncLogger()
dc.OnOpen(func() {
log.Println("OnOpen channel")
@@ -376,6 +380,7 @@ func makePeerConnectionFromOffer(sdp *webrtc.SessionDescription,
conn.lock.Lock()
defer conn.lock.Unlock()
log.Println("OnClose channel")
+ log.Println(conn.bytesLogger.ThroughputSummary())
conn.dc = nil
dc.Close()
pw.Close()
@@ -388,6 +393,7 @@ func makePeerConnectionFromOffer(sdp *webrtc.SessionDescription,
log.Printf("close with error generated an error: %v", inerr)
}
}
+ conn.bytesLogger.AddOutbound(n)
if n != len(msg.Data) {
panic("short write")
}
diff --git a/proxy/util.go b/proxy/util.go
new file mode 100644
index 0000000..d737056
--- /dev/null
+++ b/proxy/util.go
@@ -0,0 +1,84 @@
+package main
+
+import (
+ "fmt"
+ "time"
+)
+
+type BytesLogger interface { // BytesLogger counts traffic per direction and renders a summary; implemented in this file by BytesNullLogger and BytesSyncLogger.
+ AddOutbound(int) // record an amount for the outbound direction
+ AddInbound(int) // record an amount for the inbound direction
+ ThroughputSummary() string // one-line description of what has been recorded
+}
+
+// BytesNullLogger is a BytesLogger whose methods do nothing.
+type BytesNullLogger struct{}
+
+func (b BytesNullLogger) AddOutbound(amount int) {} // no-op: amount is discarded
+func (b BytesNullLogger) AddInbound(amount int) {} // no-op: amount is discarded
+func (b BytesNullLogger) ThroughputSummary() string { return "" } // always the empty string
+
+// BytesSyncLogger tallies amounts delivered over two channels; the log goroutine
+// does the additions and ThroughputSummary formats the totals on request.
+type BytesSyncLogger struct {
+ outboundChan, inboundChan chan int // AddOutbound/AddInbound send on these; log receives from them
+ outbound, inbound, outEvents, inEvents int // running totals and per-direction event counts, written only by log
+ start time.Time // set in NewBytesSyncLogger; ThroughputSummary measures elapsed time from it
+}
+
+// NewBytesSyncLogger returns a new BytesSyncLogger and starts its log goroutine.
+func NewBytesSyncLogger() *BytesSyncLogger {
+ b := &BytesSyncLogger{
+ outboundChan: make(chan int, 5), // buffered: up to 5 sends complete before log has to receive
+ inboundChan: make(chan int, 5), // same capacity for the inbound direction
+ }
+ go b.log() // NOTE(review): log has no return path, so this goroutine is never stopped
+ b.start = time.Now() // assigned after the goroutine starts; log itself never reads start
+ return b
+}
+
+func (b *BytesSyncLogger) log() { // log adds every amount received on either channel to the matching counters; it loops forever.
+ for {
+ select { // blocks until one of the two channels has a value
+ case amount := <-b.outboundChan:
+ b.outbound += amount
+ b.outEvents++ // one event per value received, i.e. per AddOutbound call
+ case amount := <-b.inboundChan:
+ b.inbound += amount
+ b.inEvents++ // one event per value received, i.e. per AddInbound call
+ }
+ }
+}
+
+func (b *BytesSyncLogger) AddOutbound(amount int) { // AddOutbound hands amount to the log goroutine via outboundChan.
+ b.outboundChan <- amount // blocks while the channel buffer (capacity 5) is full
+}
+
+func (b *BytesSyncLogger) AddInbound(amount int) { // AddInbound hands amount to the log goroutine via inboundChan.
+ b.inboundChan <- amount // blocks while the channel buffer (capacity 5) is full
+}
+
+func (b *BytesSyncLogger) ThroughputSummary() string { // ThroughputSummary formats the current totals, event counts and elapsed seconds as one line.
+ var inUnit, outUnit string
+ units := []string{"B", "KB", "MB", "GB"} // ascending; each step below divides by 1000
+
+ inbound := b.inbound // NOTE(review): read without synchronization while the log goroutine may be writing these fields
+ outbound := b.outbound // NOTE(review): amounts still queued in the channel buffers are not included
+
+ for i, u := range units { // pick the unit for inbound
+ inUnit = u
+ if (inbound < 1000) || (i == len(units)-1) { // stop once the value fits, or at GB regardless of size
+ break
+ }
+ inbound = inbound / 1000 // integer division: the remainder is dropped
+ }
+ for i, u := range units { // same scaling, applied to outbound
+ outUnit = u
+ if (outbound < 1000) || (i == len(units)-1) {
+ break
+ }
+ outbound = outbound / 1000
+ }
+ t := time.Now() // below: inbound fills the "up" slot, outbound "down"; outEvents is "OnMessages", inEvents is "Sends"; seconds are truncated
+ return fmt.Sprintf("Traffic throughput (up|down): %d %s|%d %s -- (%d OnMessages, %d Sends, over %d seconds)", inbound, inUnit, outbound, outUnit, b.outEvents, b.inEvents, int(t.Sub(b.start).Seconds()))
+}