commit 1114acbcb4acb82174b293983ced8afcaf9e2a93 Author: Arlo Breault arlolra@gmail.com Date: Wed Mar 14 13:35:39 2018 -0400
Add synchronization around destroying DataChannels and PeerConnections
From https://trac.torproject.org/projects/tor/ticket/21312#comment:33 --- client/webrtc.go | 36 ++++++++++++++++++++++++++++-------- proxy-go/snowflake.go | 17 +++++++++++++---- server-webrtc/snowflake.go | 17 +++++++++++++---- 3 files changed, 54 insertions(+), 16 deletions(-)
diff --git a/client/webrtc.go b/client/webrtc.go index e35c47d..8c7cb4c 100644 --- a/client/webrtc.go +++ b/client/webrtc.go @@ -5,6 +5,7 @@ import ( "errors" "io" "log" + "sync" "time"
"github.com/dchest/uniuri" @@ -35,6 +36,9 @@ type WebRTCPeer struct {
closed bool
+ lock sync.Mutex // Synchronization for DataChannel destruction + once sync.Once // Synchronization for PeerConnection destruction + BytesLogger }
@@ -69,6 +73,8 @@ func (c *WebRTCPeer) Read(b []byte) (int, error) { // Writes bytes out to remote WebRTC. // As part of |io.ReadWriter| func (c *WebRTCPeer) Write(b []byte) (int, error) { + c.lock.Lock() + defer c.lock.Unlock() c.BytesLogger.AddOutbound(len(b)) // TODO: Buffering could be improved / separated out of WebRTCPeer. if nil == c.transport { @@ -82,14 +88,12 @@ func (c *WebRTCPeer) Write(b []byte) (int, error) {
// As part of |Snowflake| func (c *WebRTCPeer) Close() error { - if c.closed { // Skip if already closed. - return nil - } - // Mark for deletion. - c.closed = true - c.cleanup() - c.Reset() - log.Printf("WebRTC: Closing") + c.once.Do(func() { + c.closed = true + c.cleanup() + c.Reset() + log.Printf("WebRTC: Closing") + }) return nil }
@@ -194,6 +198,8 @@ func (c *WebRTCPeer) preparePeerConnection() error {
// Create a WebRTC DataChannel locally. func (c *WebRTCPeer) establishDataChannel() error { + c.lock.Lock() + defer c.lock.Unlock() if c.transport != nil { panic("Unexpected datachannel already exists!") } @@ -206,6 +212,8 @@ func (c *WebRTCPeer) establishDataChannel() error { return err } dc.OnOpen = func() { + c.lock.Lock() + defer c.lock.Unlock() log.Println("WebRTC: DataChannel.OnOpen") if nil != c.transport { panic("WebRTC: transport already exists.") @@ -220,10 +228,12 @@ func (c *WebRTCPeer) establishDataChannel() error { c.transport = dc } dc.OnClose = func() { + c.lock.Lock() // Future writes will go to the buffer until a new DataChannel is available. if nil == c.transport { // Closed locally, as part of a reset. log.Println("WebRTC: DataChannel.OnClose [locally]") + c.lock.Unlock() return } // Closed remotely, need to reset everything. @@ -231,6 +241,9 @@ func (c *WebRTCPeer) establishDataChannel() error { log.Println("WebRTC: DataChannel.OnClose [remotely]") c.transport = nil c.pc.DeleteDataChannel(dc) + // Unlock before Close'ing, since it calls cleanup and asks for the + // lock to check if the transport needs to be deleted. + c.lock.Unlock() c.Close() } dc.OnMessage = func(msg []byte) { @@ -321,16 +334,23 @@ func (c *WebRTCPeer) cleanup() { c.writePipe.Close() c.writePipe = nil } + c.lock.Lock() if nil != c.transport { log.Printf("WebRTC: closing DataChannel") dataChannel := c.transport // Setting transport to nil *before* dc Close indicates to OnClose that // this was locally triggered. c.transport = nil + // Release the lock before calling DeleteDataChannel (which in turn + // calls Close on the dataChannel), but after nil'ing out the transport, + // since otherwise we'll end up in the onClose handler in a deadlock. 
+ c.lock.Unlock() if c.pc == nil { panic("DataChannel w/o PeerConnection, not good.") } c.pc.DeleteDataChannel(dataChannel.(*webrtc.DataChannel)) + } else { + c.lock.Unlock() } if nil != c.pc { log.Printf("WebRTC: closing PeerConnection") diff --git a/proxy-go/snowflake.go b/proxy-go/snowflake.go index a12cfb2..5a5925e 100644 --- a/proxy-go/snowflake.go +++ b/proxy-go/snowflake.go @@ -62,6 +62,9 @@ type webRTCConn struct { dc *webrtc.DataChannel pc *webrtc.PeerConnection pr *io.PipeReader + + lock sync.Mutex // Synchronization for DataChannel destruction + once sync.Once // Synchronization for PeerConnection destruction }
func (c *webRTCConn) Read(b []byte) (int, error) { @@ -69,6 +72,8 @@ func (c *webRTCConn) Read(b []byte) (int, error) { }
func (c *webRTCConn) Write(b []byte) (int, error) { + c.lock.Lock() + defer c.lock.Unlock() // log.Printf("webrtc Write %d %+q", len(b), string(b)) log.Printf("Write %d bytes --> WebRTC", len(b)) if c.dc != nil { @@ -77,8 +82,11 @@ func (c *webRTCConn) Write(b []byte) (int, error) { return len(b), nil }
-func (c *webRTCConn) Close() error { - return c.pc.Destroy() +func (c *webRTCConn) Close() (err error) { + c.once.Do(func() { + err = c.pc.Destroy() + }) + return }
func (c *webRTCConn) LocalAddr() net.Addr { @@ -255,17 +263,18 @@ func makePeerConnectionFromOffer(sdp *webrtc.SessionDescription, config *webrtc. log.Println("OnDataChannel")
pr, pw := io.Pipe() - conn := &webRTCConn{pc: pc, dc: dc, pr: pr}
dc.OnOpen = func() { log.Println("OnOpen channel") } dc.OnClose = func() { + conn.lock.Lock() + defer conn.lock.Unlock() log.Println("OnClose channel") - pw.Close() conn.dc = nil pc.DeleteDataChannel(dc) + pw.Close() } dc.OnMessage = func(msg []byte) { log.Printf("OnMessage <--- %d bytes", len(msg)) diff --git a/server-webrtc/snowflake.go b/server-webrtc/snowflake.go index 82b6afe..0484d94 100644 --- a/server-webrtc/snowflake.go +++ b/server-webrtc/snowflake.go @@ -43,6 +43,9 @@ type webRTCConn struct { dc *webrtc.DataChannel pc *webrtc.PeerConnection pr *io.PipeReader + + lock sync.Mutex // Synchronization for DataChannel destruction + once sync.Once // Synchronization for PeerConnection destruction }
func (c *webRTCConn) Read(b []byte) (int, error) { @@ -50,6 +53,8 @@ func (c *webRTCConn) Read(b []byte) (int, error) { }
func (c *webRTCConn) Write(b []byte) (int, error) { + c.lock.Lock() + defer c.lock.Unlock() // log.Printf("webrtc Write %d %+q", len(b), string(b)) log.Printf("Write %d bytes --> WebRTC", len(b)) if c.dc != nil { @@ -58,8 +63,11 @@ func (c *webRTCConn) Write(b []byte) (int, error) { return len(b), nil }
-func (c *webRTCConn) Close() error { - return c.pc.Destroy() +func (c *webRTCConn) Close() (err error) { + c.once.Do(func() { + err = c.pc.Destroy() + }) + return }
func (c *webRTCConn) LocalAddr() net.Addr { @@ -122,17 +130,18 @@ func makePeerConnectionFromOffer(sdp *webrtc.SessionDescription, config *webrtc. log.Println("OnDataChannel")
pr, pw := io.Pipe() - conn := &webRTCConn{pc: pc, dc: dc, pr: pr}
dc.OnOpen = func() { log.Println("OnOpen channel") } dc.OnClose = func() { + conn.lock.Lock() + defer conn.lock.Unlock() log.Println("OnClose channel") - pw.Close() conn.dc = nil pc.DeleteDataChannel(dc) + pw.Close() } dc.OnMessage = func(msg []byte) { log.Printf("OnMessage <--- %d bytes", len(msg))