commit 2022496d3b6fc76b7725135758c37d7d49546d3d Author: David Fifield david@bamsoftware.com Date: Wed Mar 18 18:00:44 2020 -0600
Use a global RedialPacketConn and smux.Session.
This allows multiple SOCKS connections to share the available proxies, and in particular prevents a SOCKS connection from being starved of a proxy when the maximum proxy capacity is less than the number of SOCKS connections.
This is option 4 from https://bugs.torproject.org/33519. --- client/lib/snowflake.go | 78 ++++++++++++++++++++++++++++++++++++++++++++----- 1 file changed, 71 insertions(+), 7 deletions(-)
diff --git a/client/lib/snowflake.go b/client/lib/snowflake.go index 4b7dd4d..27991b2 100644 --- a/client/lib/snowflake.go +++ b/client/lib/snowflake.go @@ -6,6 +6,7 @@ import ( "io" "log" "net" + "sync" "time"
"git.torproject.org/pluggable-transports/snowflake.git/common/turbotunnel" @@ -23,9 +24,10 @@ type dummyAddr struct{} func (addr dummyAddr) Network() string { return "dummy" } func (addr dummyAddr) String() string { return "dummy" }
-// Given an accepted SOCKS connection, establish a WebRTC connection to the -// remote peer and exchange traffic. -func Handler(socks net.Conn, snowflakes SnowflakeCollector) error { +// newSession returns a new smux.Session and the net.PacketConn it is running +// over. The net.PacketConn successively connects through Snowflake proxies +// pulled from snowflakes. +func newSession(snowflakes SnowflakeCollector) (net.PacketConn, *smux.Session, error) { clientID := turbotunnel.NewClientID()
// We build a persistent KCP session on a sequence of ephemeral WebRTC @@ -54,7 +56,6 @@ func Handler(socks net.Conn, snowflakes SnowflakeCollector) error { return NewEncapsulationPacketConn(dummyAddr{}, dummyAddr{}, conn), nil } pconn := turbotunnel.NewRedialPacketConn(dummyAddr{}, dummyAddr{}, dialContext) - defer pconn.Close()
// conn is built on the underlying RedialPacketConn—when one WebRTC // connection dies, another one will be found to take its place. The @@ -62,9 +63,9 @@ func Handler(socks net.Conn, snowflakes SnowflakeCollector) error { // engine. conn, err := kcp.NewConn2(dummyAddr{}, nil, 0, 0, pconn) if err != nil { - return err + pconn.Close() + return nil, nil, err } - defer conn.Close() // Permit coalescing the payloads of consecutive sends. conn.SetStreamMode(true) // Disable the dynamic congestion window (limit only by the @@ -81,9 +82,72 @@ func Handler(socks net.Conn, snowflakes SnowflakeCollector) error { smuxConfig.KeepAliveTimeout = 10 * time.Minute sess, err := smux.Client(conn, smuxConfig) if err != nil { + conn.Close() + pconn.Close() + return nil, nil, err + } + + return pconn, sess, err +} + +// sessionManager_ maintains a single global smux.Session that is shared among +// incoming SOCKS connections. +type sessionManager_ struct { + mutex sync.Mutex + sess *smux.Session +} + +// Get creates and returns a new global smux.Session if none exists yet. If one +// already exists, it returns the existing one. It monitors the returned session +// and if it ever fails, sets things up so the next call to Get will create a +// new session. +func (manager *sessionManager_) Get(snowflakes SnowflakeCollector) (*smux.Session, error) { + manager.mutex.Lock() + defer manager.mutex.Unlock() + + if manager.sess == nil { + log.Printf("starting a new session") + pconn, sess, err := newSession(snowflakes) + if err != nil { + return nil, err + } + manager.sess = sess + go func() { + // If the session dies, set it to be recreated. + for { + <-time.After(5 * time.Second) + if sess.IsClosed() { + break + } + } + log.Printf("discarding finished session") + // Close the underlying to force any ongoing WebRTC + // connection to close as well, and relinquish the + // SnowflakeCollector. 
+ pconn.Close() + manager.mutex.Lock() + manager.sess = nil + manager.mutex.Unlock() + }() + } else { + log.Printf("reusing the existing session") + } + + return manager.sess, nil +} + +var sessionManager = sessionManager_{} + +// Given an accepted SOCKS connection, establish a WebRTC connection to the +// remote peer and exchange traffic. +func Handler(socks net.Conn, snowflakes SnowflakeCollector) error { + // Return the global smux.Session. + sess, err := sessionManager.Get(snowflakes) + if err != nil { return err } - defer sess.Close() + + // On the smux session we overlay a stream. stream, err := sess.OpenStream() if err != nil { return err