[tor-commits] [snowflake/master] Use a global RedialPacketConn and smux.Session.

dcf at torproject.org dcf at torproject.org
Thu Apr 23 22:43:24 UTC 2020


commit 2022496d3b6fc76b7725135758c37d7d49546d3d
Author: David Fifield <david at bamsoftware.com>
Date:   Wed Mar 18 18:00:44 2020 -0600

    Use a global RedialPacketConn and smux.Session.
    
    This allows multiple SOCKS connections to share the available proxies,
    and in particular prevents a SOCKS connection from being starved of a
    proxy when the maximum proxy capacity is less then the number of the
    number of SOCKS connections.
    
    This is option 4 from https://bugs.torproject.org/33519.
---
 client/lib/snowflake.go | 78 ++++++++++++++++++++++++++++++++++++++++++++-----
 1 file changed, 71 insertions(+), 7 deletions(-)

diff --git a/client/lib/snowflake.go b/client/lib/snowflake.go
index 4b7dd4d..27991b2 100644
--- a/client/lib/snowflake.go
+++ b/client/lib/snowflake.go
@@ -6,6 +6,7 @@ import (
 	"io"
 	"log"
 	"net"
+	"sync"
 	"time"
 
 	"git.torproject.org/pluggable-transports/snowflake.git/common/turbotunnel"
@@ -23,9 +24,10 @@ type dummyAddr struct{}
 func (addr dummyAddr) Network() string { return "dummy" }
 func (addr dummyAddr) String() string  { return "dummy" }
 
-// Given an accepted SOCKS connection, establish a WebRTC connection to the
-// remote peer and exchange traffic.
-func Handler(socks net.Conn, snowflakes SnowflakeCollector) error {
+// newSession returns a new smux.Session and the net.PacketConn it is running
+// over. The net.PacketConn successively connects through Snowflake proxies
+// pulled from snowflakes.
+func newSession(snowflakes SnowflakeCollector) (net.PacketConn, *smux.Session, error) {
 	clientID := turbotunnel.NewClientID()
 
 	// We build a persistent KCP session on a sequence of ephemeral WebRTC
@@ -54,7 +56,6 @@ func Handler(socks net.Conn, snowflakes SnowflakeCollector) error {
 		return NewEncapsulationPacketConn(dummyAddr{}, dummyAddr{}, conn), nil
 	}
 	pconn := turbotunnel.NewRedialPacketConn(dummyAddr{}, dummyAddr{}, dialContext)
-	defer pconn.Close()
 
 	// conn is built on the underlying RedialPacketConn—when one WebRTC
 	// connection dies, another one will be found to take its place. The
@@ -62,9 +63,9 @@ func Handler(socks net.Conn, snowflakes SnowflakeCollector) error {
 	// engine.
 	conn, err := kcp.NewConn2(dummyAddr{}, nil, 0, 0, pconn)
 	if err != nil {
-		return err
+		pconn.Close()
+		return nil, nil, err
 	}
-	defer conn.Close()
 	// Permit coalescing the payloads of consecutive sends.
 	conn.SetStreamMode(true)
 	// Disable the dynamic congestion window (limit only by the
@@ -81,9 +82,72 @@ func Handler(socks net.Conn, snowflakes SnowflakeCollector) error {
 	smuxConfig.KeepAliveTimeout = 10 * time.Minute
 	sess, err := smux.Client(conn, smuxConfig)
 	if err != nil {
+		conn.Close()
+		pconn.Close()
+		return nil, nil, err
+	}
+
+	return pconn, sess, err
+}
+
+// sessionManager_ maintains a single global smux.Session that is shared among
+// incoming SOCKS connections.
+type sessionManager_ struct {
+	mutex sync.Mutex
+	sess  *smux.Session
+}
+
+// Get creates and returns a new global smux.Session if none exists yet. If one
+// already exists, it returns the existing one. It monitors the returned session
+// and if it ever fails, sets things up so the next call to Get will create a
+// new session.
+func (manager *sessionManager_) Get(snowflakes SnowflakeCollector) (*smux.Session, error) {
+	manager.mutex.Lock()
+	defer manager.mutex.Unlock()
+
+	if manager.sess == nil {
+		log.Printf("starting a new session")
+		pconn, sess, err := newSession(snowflakes)
+		if err != nil {
+			return nil, err
+		}
+		manager.sess = sess
+		go func() {
+			// If the session dies, set it to be recreated.
+			for {
+				<-time.After(5 * time.Second)
+				if sess.IsClosed() {
+					break
+				}
+			}
+			log.Printf("discarding finished session")
+			// Close the underlying to force any ongoing WebRTC
+			// connection to close as well, and relinquish the
+			// SnowflakeCollector.
+			pconn.Close()
+			manager.mutex.Lock()
+			manager.sess = nil
+			manager.mutex.Unlock()
+		}()
+	} else {
+		log.Printf("reusing the existing session")
+	}
+
+	return manager.sess, nil
+}
+
+var sessionManager = sessionManager_{}
+
+// Given an accepted SOCKS connection, establish a WebRTC connection to the
+// remote peer and exchange traffic.
+func Handler(socks net.Conn, snowflakes SnowflakeCollector) error {
+	// Return the global smux.Session.
+	sess, err := sessionManager.Get(snowflakes)
+	if err != nil {
 		return err
 	}
-	defer sess.Close()
+
+	// On the smux session we overlay a stream.
 	stream, err := sess.OpenStream()
 	if err != nil {
 		return err



More information about the tor-commits mailing list