commit ac9d49b8727b953c12a76e3645fe71a9ec3aab75 Author: Serene H git@keroserene.net Date: Mon Aug 1 12:17:28 2016 -0700
ensure closing stale remotes from the client side --- client/snowflake.go | 3 +- client/webrtc.go | 81 ++++++++++++++++++++++++++++++++++------------------- 2 files changed, 54 insertions(+), 30 deletions(-)
diff --git a/client/snowflake.go b/client/snowflake.go index 8dfa390..75e999f 100644 --- a/client/snowflake.go +++ b/client/snowflake.go @@ -20,6 +20,7 @@ import ( const ( ReconnectTimeout = 10 DefaultSnowflakeCapacity = 1 + SnowflakeTimeout = 30 )
// When a connection handler starts, +1 is written to this channel; when it @@ -81,7 +82,7 @@ func handler(socks SocksConnector, snowflakes SnowflakeCollector) error { return errors.New("handler: Received invalid Snowflake") } defer socks.Close() - defer snowflake.Reset() + defer snowflake.Close() log.Println("---- Handler: snowflake assigned ----") err := socks.Grant(&net.TCPAddr{IP: net.IPv4zero, Port: 0}) if err != nil { diff --git a/client/webrtc.go b/client/webrtc.go index 1f7ac00..0492466 100644 --- a/client/webrtc.go +++ b/client/webrtc.go @@ -29,6 +29,7 @@ type WebRTCPeer struct { errorChannel chan error recvPipe *io.PipeReader writePipe *io.PipeWriter + lastReceive time.Time buffer bytes.Buffer reset chan struct{}
@@ -37,6 +38,28 @@ type WebRTCPeer struct { BytesLogger }
+// Construct a WebRTC PeerConnection. +func NewWebRTCPeer(config *webrtc.Configuration, + broker *BrokerChannel) *WebRTCPeer { + connection := new(WebRTCPeer) + connection.id = "snowflake-" + uniuri.New() + connection.config = config + connection.broker = broker + connection.offerChannel = make(chan *webrtc.SessionDescription, 1) + connection.answerChannel = make(chan *webrtc.SessionDescription, 1) + // Error channel is mostly for reporting during the initial SDP offer + // creation & local description setting, which happens asynchronously. + connection.errorChannel = make(chan error, 1) + connection.reset = make(chan struct{}, 1) + + // Override with something that's not NullLogger to have real logging. + connection.BytesLogger = &BytesNullLogger{} + + // Pipes remain the same even when DataChannel gets switched. + connection.recvPipe, connection.writePipe = io.Pipe() + return connection +} + // Read bytes from local SOCKS. // As part of |io.ReadWriter| func (c *WebRTCPeer) Read(b []byte) (int, error) { @@ -47,6 +70,7 @@ func (c *WebRTCPeer) Read(b []byte) (int, error) { // As part of |io.ReadWriter| func (c *WebRTCPeer) Write(b []byte) (int, error) { c.BytesLogger.AddOutbound(len(b)) + // TODO: Buffering could be improved / separated out of WebRTCPeer. if nil == c.transport { log.Printf("Buffered %d bytes --> WebRTC", len(b)) c.buffer.Write(b) @@ -61,45 +85,42 @@ func (c *WebRTCPeer) Close() error { if c.closed { // Skip if already closed. return nil } - log.Printf("WebRTC: Closing") - c.cleanup() // Mark for deletion. c.closed = true + c.cleanup() + c.Reset() + log.Printf("WebRTC: Closing") return nil }
// As part of |Resetter| func (c *WebRTCPeer) Reset() { - c.Close() - go func() { - c.reset <- struct{}{} - log.Println("WebRTC resetting...") - }() + if nil == c.reset { + return + } + c.reset <- struct{}{} }
// As part of |Resetter| func (c *WebRTCPeer) WaitForReset() { <-c.reset }
-// Construct a WebRTC PeerConnection. -func NewWebRTCPeer(config *webrtc.Configuration, - broker *BrokerChannel) *WebRTCPeer { - connection := new(WebRTCPeer) - connection.id = "snowflake-" + uniuri.New() - connection.config = config - connection.broker = broker - connection.offerChannel = make(chan *webrtc.SessionDescription, 1) - connection.answerChannel = make(chan *webrtc.SessionDescription, 1) - // Error channel is mostly for reporting during the initial SDP offer - // creation & local description setting, which happens asynchronously. - connection.errorChannel = make(chan error, 1) - connection.reset = make(chan struct{}, 1) - - // Override with something that's not NullLogger to have real logging. - connection.BytesLogger = &BytesNullLogger{} - - // Pipes remain the same even when DataChannel gets switched. - connection.recvPipe, connection.writePipe = io.Pipe() - return connection +// Prevent long-lived broken remotes. +// Should also update the DataChannel in underlying go-webrtc's to make Closes +// more immediate / responsive. +func (c *WebRTCPeer) checkForStaleness() { + c.lastReceive = time.Now() + for { + if c.closed { + return + } + if time.Since(c.lastReceive).Seconds() > SnowflakeTimeout { + log.Println("WebRTC: No messages received for", SnowflakeTimeout, + "seconds -- closing stale connection.") + c.Close() + return + } + <-time.After(time.Second) + } }
// As part of |Connector| interface. @@ -119,6 +140,7 @@ func (c *WebRTCPeer) Connect() error { if err != nil { return err } + go c.checkForStaleness() return nil }
@@ -208,7 +230,7 @@ func (c *WebRTCPeer) establishDataChannel() error { // Disable the DataChannel as a write destination. log.Println("WebRTC: DataChannel.OnClose [remotely]") c.transport = nil - c.Reset() + c.Close() } dc.OnMessage = func(msg []byte) { if len(msg) <= 0 { @@ -225,6 +247,7 @@ func (c *WebRTCPeer) establishDataChannel() error { log.Println("Error: short write") panic("short write") } + c.lastReceive = time.Now() } log.Println("WebRTC: DataChannel created.") return nil @@ -257,7 +280,7 @@ func (c *WebRTCPeer) exchangeSDP() error { } case err := <-c.errorChannel: log.Println("Failed to prepare offer", err) - c.Reset() + c.Close() return err } // Keep trying the same offer until a valid answer arrives.
tor-commits@lists.torproject.org