[tor-commits] [snowflake/main] Implement snowflake client lib as PTv2.1 Go API

cohosh at torproject.org cohosh at torproject.org
Wed May 12 13:11:17 UTC 2021


commit e87b9175dd7559fccd665cd7eb4b6edecc231950
Author: Cecylia Bocovich <cohosh at torproject.org>
Date:   Sat Mar 20 12:36:33 2021 -0400

    Implement snowflake client lib as PTv2.1 Go API
    
    This implements a pluggable transports v2.1 compatible Go API in the
    Snowflake client library, and refactors how the main Snowflake program
    calls it. The Go API implements the two required client-side functions:
    a constructor that returns a Transport, and a Dial function for the
    Transport that returns a net.Conn. See the PT specification for more
    information:
    https://github.com/Pluggable-Transports/Pluggable-Transports-spec/blob/master/releases/PTSpecV2.1/Pluggable%20Transport%20Specification%20v2.1%20-%20Go%20Transport%20API.pdf
---
 client/client_test.go   |  59 ---------------
 client/lib/lib_test.go  |  55 ++++++++++----
 client/lib/snowflake.go | 198 +++++++++++++++++++++++++++++++++---------------
 client/snowflake.go     | 106 ++++++++------------------
 4 files changed, 211 insertions(+), 207 deletions(-)

diff --git a/client/client_test.go b/client/client_test.go
deleted file mode 100644
index 84e9cc1..0000000
--- a/client/client_test.go
+++ /dev/null
@@ -1,59 +0,0 @@
-package main
-
-import (
-	"testing"
-
-	. "github.com/smartystreets/goconvey/convey"
-)
-
-func TestICEServerParser(t *testing.T) {
-	Convey("Test parsing of ICE servers", t, func() {
-		for _, test := range []struct {
-			input  string
-			urls   [][]string
-			length int
-		}{
-			{
-				"",
-				nil,
-				0,
-			},
-			{
-				" ",
-				nil,
-				0,
-			},
-			{
-				"stun:stun.l.google.com:19302",
-				[][]string{[]string{"stun:stun.l.google.com:19302"}},
-				1,
-			},
-			{
-				"stun:stun.l.google.com:19302,stun.ekiga.net",
-				[][]string{[]string{"stun:stun.l.google.com:19302"}, []string{"stun.ekiga.net"}},
-				2,
-			},
-			{
-				"stun:stun.l.google.com:19302, stun.ekiga.net",
-				[][]string{[]string{"stun:stun.l.google.com:19302"}, []string{"stun.ekiga.net"}},
-				2,
-			},
-		} {
-			servers := parseIceServers(test.input)
-
-			if test.urls == nil {
-				So(servers, ShouldBeNil)
-			} else {
-				So(servers, ShouldNotBeNil)
-			}
-
-			So(len(servers), ShouldEqual, test.length)
-
-			for _, server := range servers {
-				So(test.urls, ShouldContain, server.URLs)
-			}
-
-		}
-
-	})
-}
diff --git a/client/lib/lib_test.go b/client/lib/lib_test.go
index 5537a52..6140e0b 100644
--- a/client/lib/lib_test.go
+++ b/client/lib/lib_test.go
@@ -156,19 +156,6 @@ func TestSnowflakeClient(t *testing.T) {
 
 	})
 
-	Convey("Snowflake", t, func() {
-
-		SkipConvey("Handler Grants correctly", func() {
-			socks := &FakeSocksConn{}
-			broker := &BrokerChannel{Host: "test"}
-			d := NewWebRTCDialer(broker, nil, 1)
-
-			So(socks.rejected, ShouldEqual, false)
-			Handler(socks, d)
-			So(socks.rejected, ShouldEqual, true)
-		})
-	})
-
 	Convey("Dialers", t, func() {
 		Convey("Can construct WebRTCDialer.", func() {
 			broker := &BrokerChannel{Host: "test"}
@@ -267,3 +254,45 @@ func TestSnowflakeClient(t *testing.T) {
 	})
 
 }
+
+func TestICEServerParser(t *testing.T) {
+	Convey("Test parsing of ICE servers", t, func() {
+		for _, test := range []struct {
+			input  []string
+			urls   [][]string
+			length int
+		}{
+			{
+				[]string{"stun:stun.l.google.com:19302"},
+				[][]string{[]string{"stun:stun.l.google.com:19302"}},
+				1,
+			},
+			{
+				[]string{"stun:stun.l.google.com:19302", "stun.ekiga.net"},
+				[][]string{[]string{"stun:stun.l.google.com:19302"}, []string{"stun.ekiga.net"}},
+				2,
+			},
+			{
+				[]string{"stun:stun.l.google.com:19302", "stun.ekiga.net"},
+				[][]string{[]string{"stun:stun.l.google.com:19302"}, []string{"stun.ekiga.net"}},
+				2,
+			},
+		} {
+			servers := parseIceServers(test.input)
+
+			if test.urls == nil {
+				So(servers, ShouldBeNil)
+			} else {
+				So(servers, ShouldNotBeNil)
+			}
+
+			So(len(servers), ShouldEqual, test.length)
+
+			for _, server := range servers {
+				So(test.urls, ShouldContain, server.URLs)
+			}
+
+		}
+
+	})
+}
diff --git a/client/lib/snowflake.go b/client/lib/snowflake.go
index 2ed51a1..6e87b81 100644
--- a/client/lib/snowflake.go
+++ b/client/lib/snowflake.go
@@ -3,12 +3,15 @@ package lib
 import (
 	"context"
 	"errors"
-	"io"
 	"log"
+	"math/rand"
 	"net"
+	"strings"
 	"time"
 
+	"git.torproject.org/pluggable-transports/snowflake.git/common/nat"
 	"git.torproject.org/pluggable-transports/snowflake.git/common/turbotunnel"
+	"github.com/pion/webrtc/v3"
 	"github.com/xtaci/kcp-go/v5"
 	"github.com/xtaci/smux"
 )
@@ -25,6 +28,138 @@ type dummyAddr struct{}
 func (addr dummyAddr) Network() string { return "dummy" }
 func (addr dummyAddr) String() string  { return "dummy" }
 
+// Transport is a structure with methods that conform to the Go PT v2.1 API
+// https://github.com/Pluggable-Transports/Pluggable-Transports-spec/blob/master/releases/PTSpecV2.1/Pluggable%20Transport%20Specification%20v2.1%20-%20Go%20Transport%20API.pdf
+type Transport struct {
+	dialer *WebRTCDialer
+}
+
+// Create a new Snowflake transport client that can spawn multiple Snowflake connections.
+// brokerURL and frontDomain are the urls for the broker host and domain fronting host
+// iceAddresses are the STUN/TURN urls needed for WebRTC negotiation
+// keepLocalAddresses is a flag to enable sending local network addresses (for testing purposes)
+// max is the maximum number of snowflakes the client should gather for each SOCKS connection
+func NewSnowflakeClient(brokerURL, frontDomain string, iceAddresses []string, keepLocalAddresses bool, max int) (*Transport, error) {
+
+	log.Println("\n\n\n --- Starting Snowflake Client ---")
+
+	iceServers := parseIceServers(iceAddresses)
+	// chooses a random subset of servers from inputs
+	rand.Seed(time.Now().UnixNano())
+	rand.Shuffle(len(iceServers), func(i, j int) {
+		iceServers[i], iceServers[j] = iceServers[j], iceServers[i]
+	})
+	if len(iceServers) > 2 {
+		iceServers = iceServers[:(len(iceServers)+1)/2]
+	}
+	log.Printf("Using ICE servers:")
+	for _, server := range iceServers {
+		log.Printf("url: %v", strings.Join(server.URLs, " "))
+	}
+
+	// Use potentially domain-fronting broker to rendezvous.
+	broker, err := NewBrokerChannel(
+		brokerURL, frontDomain, CreateBrokerTransport(),
+		keepLocalAddresses)
+	if err != nil {
+		return nil, err
+	}
+	go updateNATType(iceServers, broker)
+
+	transport := &Transport{dialer: NewWebRTCDialer(broker, iceServers, max)}
+
+	return transport, nil
+}
+
+// Create a new Snowflake connection. Starts the collection of snowflakes and returns a
+// smux Stream.
+func (t *Transport) Dial() (net.Conn, error) {
+	// Prepare to collect remote WebRTC peers.
+	snowflakes, err := NewPeers(t.dialer)
+	if err != nil {
+		return nil, err
+	}
+
+	// Use a real logger to periodically output how much traffic is happening.
+	snowflakes.BytesLogger = NewBytesSyncLogger()
+
+	log.Printf("---- SnowflakeConn: begin collecting snowflakes ---")
+	go connectLoop(snowflakes)
+
+	// Create a new smux session
+	log.Printf("---- SnowflakeConn: starting a new session ---")
+	pconn, sess, err := newSession(snowflakes)
+	if err != nil {
+		return nil, err
+	}
+
+	// On the smux session we overlay a stream.
+	stream, err := sess.OpenStream()
+	if err != nil {
+		return nil, err
+	}
+
+	// Begin exchanging data.
+	log.Printf("---- SnowflakeConn: begin stream %v ---", stream.ID())
+	return &SnowflakeConn{Stream: stream, sess: sess, pconn: pconn, snowflakes: snowflakes}, nil
+}
+
+type SnowflakeConn struct {
+	*smux.Stream
+	sess       *smux.Session
+	pconn      net.PacketConn
+	snowflakes *Peers
+}
+
+func (conn *SnowflakeConn) Close() error {
+	log.Printf("---- SnowflakeConn: closed stream %v ---", conn.ID())
+	conn.Stream.Close()
+	log.Printf("---- SnowflakeConn: end collecting snowflakes ---")
+	conn.snowflakes.End()
+	conn.pconn.Close()
+	log.Printf("---- SnowflakeConn: discarding finished session ---")
+	conn.sess.Close()
+	return nil //TODO: return errors if any of the above do
+}
+
+// loop through all provided STUN servers until we exhaust the list or find
+// one that is compatible with RFC 5780
+func updateNATType(servers []webrtc.ICEServer, broker *BrokerChannel) {
+
+	var restrictedNAT bool
+	var err error
+	for _, server := range servers {
+		addr := strings.TrimPrefix(server.URLs[0], "stun:")
+		restrictedNAT, err = nat.CheckIfRestrictedNAT(addr)
+		if err == nil {
+			if restrictedNAT {
+				broker.SetNATType(nat.NATRestricted)
+			} else {
+				broker.SetNATType(nat.NATUnrestricted)
+			}
+			break
+		}
+	}
+	if err != nil {
+		broker.SetNATType(nat.NATUnknown)
+	}
+}
+
+// Returns a slice of webrtc.ICEServer given a slice of addresses
+func parseIceServers(addresses []string) []webrtc.ICEServer {
+	var servers []webrtc.ICEServer
+	if len(addresses) == 0 {
+		return nil
+	}
+	for _, url := range addresses {
+		url = strings.TrimSpace(url)
+		servers = append(servers, webrtc.ICEServer{
+			URLs: []string{url},
+		})
+	}
+	return servers
+}
+
 // newSession returns a new smux.Session and the net.PacketConn it is running
 // over. The net.PacketConn successively connects through Snowflake proxies
 // pulled from snowflakes.
@@ -94,47 +229,6 @@ func newSession(snowflakes SnowflakeCollector) (net.PacketConn, *smux.Session, e
 	return pconn, sess, err
 }
 
-// Given an accepted SOCKS connection, establish a WebRTC connection to the
-// remote peer and exchange traffic.
-func Handler(socks net.Conn, tongue Tongue) error {
-	// Prepare to collect remote WebRTC peers.
-	snowflakes, err := NewPeers(tongue)
-	if err != nil {
-		return err
-	}
-
-	// Use a real logger to periodically output how much traffic is happening.
-	snowflakes.BytesLogger = NewBytesSyncLogger()
-
-	log.Printf("---- Handler: begin collecting snowflakes ---")
-	go connectLoop(snowflakes)
-
-	// Create a new smux session
-	log.Printf("---- Handler: starting a new session ---")
-	pconn, sess, err := newSession(snowflakes)
-	if err != nil {
-		return err
-	}
-
-	// On the smux session we overlay a stream.
-	stream, err := sess.OpenStream()
-	if err != nil {
-		return err
-	}
-	defer stream.Close()
-
-	// Begin exchanging data.
-	log.Printf("---- Handler: begin stream %v ---", stream.ID())
-	copyLoop(socks, stream)
-	log.Printf("---- Handler: closed stream %v ---", stream.ID())
-	snowflakes.End()
-	log.Printf("---- Handler: end collecting snowflakes ---")
-	pconn.Close()
-	sess.Close()
-	log.Printf("---- Handler: discarding finished session ---")
-	return nil
-}
-
 // Maintain |SnowflakeCapacity| number of available WebRTC connections, to
 // transfer to the Tor SOCKS handler when needed.
 func connectLoop(snowflakes SnowflakeCollector) {
@@ -153,23 +247,3 @@ func connectLoop(snowflakes SnowflakeCollector) {
 		}
 	}
 }
-
-// Exchanges bytes between two ReadWriters.
-// (In this case, between a SOCKS connection and smux stream.)
-func copyLoop(socks, stream io.ReadWriter) {
-	done := make(chan struct{}, 2)
-	go func() {
-		if _, err := io.Copy(socks, stream); err != nil {
-			log.Printf("copying WebRTC to SOCKS resulted in error: %v", err)
-		}
-		done <- struct{}{}
-	}()
-	go func() {
-		if _, err := io.Copy(stream, socks); err != nil {
-			log.Printf("copying SOCKS to stream resulted in error: %v", err)
-		}
-		done <- struct{}{}
-	}()
-	<-done
-	log.Println("copy loop ended")
-}
diff --git a/client/snowflake.go b/client/snowflake.go
index d79de97..f19afcf 100644
--- a/client/snowflake.go
+++ b/client/snowflake.go
@@ -6,7 +6,6 @@ import (
 	"io"
 	"io/ioutil"
 	"log"
-	"math/rand"
 	"net"
 	"os"
 	"os/signal"
@@ -14,21 +13,38 @@ import (
 	"strings"
 	"sync"
 	"syscall"
-	"time"
 
 	pt "git.torproject.org/pluggable-transports/goptlib.git"
 	sf "git.torproject.org/pluggable-transports/snowflake.git/client/lib"
-	"git.torproject.org/pluggable-transports/snowflake.git/common/nat"
 	"git.torproject.org/pluggable-transports/snowflake.git/common/safelog"
-	"github.com/pion/webrtc/v3"
 )
 
 const (
 	DefaultSnowflakeCapacity = 1
 )
 
-// Accept local SOCKS connections and pass them to the handler.
-func socksAcceptLoop(ln *pt.SocksListener, tongue sf.Tongue, shutdown chan struct{}, wg *sync.WaitGroup) {
+// Exchanges bytes between two ReadWriters.
+// (In this case, between a SOCKS connection and a snowflake transport conn)
+func copyLoop(socks, sfconn io.ReadWriter) {
+	done := make(chan struct{}, 2)
+	go func() {
+		if _, err := io.Copy(socks, sfconn); err != nil {
+			log.Printf("copying Snowflake to SOCKS resulted in error: %v", err)
+		}
+		done <- struct{}{}
+	}()
+	go func() {
+		if _, err := io.Copy(sfconn, socks); err != nil {
+			log.Printf("copying SOCKS to Snowflake resulted in error: %v", err)
+		}
+		done <- struct{}{}
+	}()
+	<-done
+	log.Println("copy loop ended")
+}
+
+// Accept local SOCKS connections and connect to a Snowflake connection
+func socksAcceptLoop(ln *pt.SocksListener, transport *sf.Transport, shutdown chan struct{}, wg *sync.WaitGroup) {
 	defer ln.Close()
 	for {
 		conn, err := ln.AcceptSocks()
@@ -53,10 +69,14 @@ func socksAcceptLoop(ln *pt.SocksListener, tongue sf.Tongue, shutdown chan struc
 
 			handler := make(chan struct{})
 			go func() {
-				err = sf.Handler(conn, tongue)
+				// Dial takes no address because the broker chooses the bridge
+				sconn, err := transport.Dial()
 				if err != nil {
-					log.Printf("handler error: %s", err)
+					log.Printf("dial error: %s", err)
 				}
+				// copy between the created Snowflake conn and the SOCKS conn
+				copyLoop(conn, sconn)
+				sconn.Close()
 				close(handler)
 				return
 
@@ -72,23 +92,6 @@ func socksAcceptLoop(ln *pt.SocksListener, tongue sf.Tongue, shutdown chan struc
 	}
 }
 
-// s is a comma-separated list of ICE server URLs.
-func parseIceServers(s string) []webrtc.ICEServer {
-	var servers []webrtc.ICEServer
-	s = strings.TrimSpace(s)
-	if len(s) == 0 {
-		return nil
-	}
-	urls := strings.Split(s, ",")
-	for _, url := range urls {
-		url = strings.TrimSpace(url)
-		servers = append(servers, webrtc.ICEServer{
-			URLs: []string{url},
-		})
-	}
-	return servers
-}
-
 func main() {
 	iceServersCommas := flag.String("ice", "", "comma-separated list of ICE servers")
 	brokerURL := flag.String("url", "", "URL of signaling broker")
@@ -137,33 +140,13 @@ func main() {
 		log.SetOutput(&safelog.LogScrubber{Output: logOutput})
 	}
 
-	log.Println("\n\n\n --- Starting Snowflake Client ---")
-
-	iceServers := parseIceServers(*iceServersCommas)
-	// chooses a random subset of servers from inputs
-	rand.Seed(time.Now().UnixNano())
-	rand.Shuffle(len(iceServers), func(i, j int) {
-		iceServers[i], iceServers[j] = iceServers[j], iceServers[i]
-	})
-	if len(iceServers) > 2 {
-		iceServers = iceServers[:(len(iceServers)+1)/2]
-	}
-	log.Printf("Using ICE servers:")
-	for _, server := range iceServers {
-		log.Printf("url: %v", strings.Join(server.URLs, " "))
-	}
+	iceAddresses := strings.Split(strings.TrimSpace(*iceServersCommas), ",")
 
-	// Use potentially domain-fronting broker to rendezvous.
-	broker, err := sf.NewBrokerChannel(
-		*brokerURL, *frontDomain, sf.CreateBrokerTransport(),
-		*keepLocalAddresses || *oldKeepLocalAddresses)
+	transport, err := sf.NewSnowflakeClient(*brokerURL, *frontDomain, iceAddresses,
+		*keepLocalAddresses || *oldKeepLocalAddresses, *max)
 	if err != nil {
-		log.Fatalf("parsing broker URL: %v", err)
+		log.Fatal("Failed to start snowflake transport: ", err)
 	}
-	go updateNATType(iceServers, broker)
-
-	// Create a new WebRTCDialer to use as the |Tongue| to catch snowflakes
-	dialer := sf.NewWebRTCDialer(broker, iceServers, *max)
 
 	// Begin goptlib client process.
 	ptInfo, err := pt.ClientSetup(nil)
@@ -187,7 +170,7 @@ func main() {
 				break
 			}
 			log.Printf("Started SOCKS listener at %v.", ln.Addr())
-			go socksAcceptLoop(ln, dialer, shutdown, &wg)
+			go socksAcceptLoop(ln, transport, shutdown, &wg)
 			pt.Cmethod(methodName, ln.Version(), ln.Addr())
 			listeners = append(listeners, ln)
 		default:
@@ -223,26 +206,3 @@ func main() {
 	wg.Wait()
 	log.Println("snowflake is done.")
 }
-
-// loop through all provided STUN servers until we exhaust the list or find
-// one that is compatable with RFC 5780
-func updateNATType(servers []webrtc.ICEServer, broker *sf.BrokerChannel) {
-
-	var restrictedNAT bool
-	var err error
-	for _, server := range servers {
-		addr := strings.TrimPrefix(server.URLs[0], "stun:")
-		restrictedNAT, err = nat.CheckIfRestrictedNAT(addr)
-		if err == nil {
-			if restrictedNAT {
-				broker.SetNATType(nat.NATRestricted)
-			} else {
-				broker.SetNATType(nat.NATUnrestricted)
-			}
-			break
-		}
-	}
-	if err != nil {
-		broker.SetNATType(nat.NATUnknown)
-	}
-}





More information about the tor-commits mailing list