commit e87b9175dd7559fccd665cd7eb4b6edecc231950 Author: Cecylia Bocovich cohosh@torproject.org Date: Sat Mar 20 12:36:33 2021 -0400
Implement snowflake client lib as PTv2.1 Go API
This implements a Pluggable Transports v2.1-compatible Go API in the Snowflake client library and refactors how the main Snowflake program calls it. The Go API implements the two required client-side functions: a constructor that returns a Transport, and a Dial function for the Transport that returns a net.Conn. See the PT specification for more information: https://github.com/Pluggable-Transports/Pluggable-Transports-spec/blob/maste... --- client/client_test.go | 59 --------------- client/lib/lib_test.go | 55 ++++++++++---- client/lib/snowflake.go | 198 +++++++++++++++++++++++++++++++++--------------- client/snowflake.go | 106 ++++++++------------------ 4 files changed, 211 insertions(+), 207 deletions(-)
diff --git a/client/client_test.go b/client/client_test.go deleted file mode 100644 index 84e9cc1..0000000 --- a/client/client_test.go +++ /dev/null @@ -1,59 +0,0 @@ -package main - -import ( - "testing" - - . "github.com/smartystreets/goconvey/convey" -) - -func TestICEServerParser(t *testing.T) { - Convey("Test parsing of ICE servers", t, func() { - for _, test := range []struct { - input string - urls [][]string - length int - }{ - { - "", - nil, - 0, - }, - { - " ", - nil, - 0, - }, - { - "stun:stun.l.google.com:19302", - [][]string{[]string{"stun:stun.l.google.com:19302"}}, - 1, - }, - { - "stun:stun.l.google.com:19302,stun.ekiga.net", - [][]string{[]string{"stun:stun.l.google.com:19302"}, []string{"stun.ekiga.net"}}, - 2, - }, - { - "stun:stun.l.google.com:19302, stun.ekiga.net", - [][]string{[]string{"stun:stun.l.google.com:19302"}, []string{"stun.ekiga.net"}}, - 2, - }, - } { - servers := parseIceServers(test.input) - - if test.urls == nil { - So(servers, ShouldBeNil) - } else { - So(servers, ShouldNotBeNil) - } - - So(len(servers), ShouldEqual, test.length) - - for _, server := range servers { - So(test.urls, ShouldContain, server.URLs) - } - - } - - }) -} diff --git a/client/lib/lib_test.go b/client/lib/lib_test.go index 5537a52..6140e0b 100644 --- a/client/lib/lib_test.go +++ b/client/lib/lib_test.go @@ -156,19 +156,6 @@ func TestSnowflakeClient(t *testing.T) {
})
- Convey("Snowflake", t, func() { - - SkipConvey("Handler Grants correctly", func() { - socks := &FakeSocksConn{} - broker := &BrokerChannel{Host: "test"} - d := NewWebRTCDialer(broker, nil, 1) - - So(socks.rejected, ShouldEqual, false) - Handler(socks, d) - So(socks.rejected, ShouldEqual, true) - }) - }) - Convey("Dialers", t, func() { Convey("Can construct WebRTCDialer.", func() { broker := &BrokerChannel{Host: "test"} @@ -267,3 +254,45 @@ func TestSnowflakeClient(t *testing.T) { })
} + +func TestICEServerParser(t *testing.T) { + Convey("Test parsing of ICE servers", t, func() { + for _, test := range []struct { + input []string + urls [][]string + length int + }{ + { + []string{"stun:stun.l.google.com:19302"}, + [][]string{[]string{"stun:stun.l.google.com:19302"}}, + 1, + }, + { + []string{"stun:stun.l.google.com:19302", "stun.ekiga.net"}, + [][]string{[]string{"stun:stun.l.google.com:19302"}, []string{"stun.ekiga.net"}}, + 2, + }, + { + []string{"stun:stun.l.google.com:19302", "stun.ekiga.net"}, + [][]string{[]string{"stun:stun.l.google.com:19302"}, []string{"stun.ekiga.net"}}, + 2, + }, + } { + servers := parseIceServers(test.input) + + if test.urls == nil { + So(servers, ShouldBeNil) + } else { + So(servers, ShouldNotBeNil) + } + + So(len(servers), ShouldEqual, test.length) + + for _, server := range servers { + So(test.urls, ShouldContain, server.URLs) + } + + } + + }) +} diff --git a/client/lib/snowflake.go b/client/lib/snowflake.go index 2ed51a1..6e87b81 100644 --- a/client/lib/snowflake.go +++ b/client/lib/snowflake.go @@ -3,12 +3,15 @@ package lib import ( "context" "errors" - "io" "log" + "math/rand" "net" + "strings" "time"
+ "git.torproject.org/pluggable-transports/snowflake.git/common/nat" "git.torproject.org/pluggable-transports/snowflake.git/common/turbotunnel" + "github.com/pion/webrtc/v3" "github.com/xtaci/kcp-go/v5" "github.com/xtaci/smux" ) @@ -25,6 +28,138 @@ type dummyAddr struct{} func (addr dummyAddr) Network() string { return "dummy" } func (addr dummyAddr) String() string { return "dummy" }
+// Transport is a structure with methods that conform to the Go PT v2.1 API +// https://github.com/Pluggable-Transports/Pluggable-Transports-spec/blob/maste... +type Transport struct { + dialer *WebRTCDialer +} + +// Create a new Snowflake transport client that can spawn multiple Snowflake connections. +// brokerURL and frontDomain are the urls for the broker host and domain fronting host +// iceAddresses are the STUN/TURN urls needed for WebRTC negotiation +// keepLocalAddresses is a flag to enable sending local network addresses (for testing purposes) +// max is the maximum number of snowflakes the client should gather for each SOCKS connection +func NewSnowflakeClient(brokerURL, frontDomain string, iceAddresses []string, keepLocalAddresses bool, max int) (*Transport, error) { + + log.Println("\n\n\n --- Starting Snowflake Client ---") + + iceServers := parseIceServers(iceAddresses) + // chooses a random subset of servers from inputs + rand.Seed(time.Now().UnixNano()) + rand.Shuffle(len(iceServers), func(i, j int) { + iceServers[i], iceServers[j] = iceServers[j], iceServers[i] + }) + if len(iceServers) > 2 { + iceServers = iceServers[:(len(iceServers)+1)/2] + } + log.Printf("Using ICE servers:") + for _, server := range iceServers { + log.Printf("url: %v", strings.Join(server.URLs, " ")) + } + + // Use potentially domain-fronting broker to rendezvous. + broker, err := NewBrokerChannel( + brokerURL, frontDomain, CreateBrokerTransport(), + keepLocalAddresses) + if err != nil { + return nil, err + } + go updateNATType(iceServers, broker) + + transport := &Transport{dialer: NewWebRTCDialer(broker, iceServers, max)} + + return transport, nil +} + +// Create a new Snowflake connection. Starts the collection of snowflakes and returns a +// smux Stream. +func (t *Transport) Dial() (net.Conn, error) { + // Prepare to collect remote WebRTC peers. 
+ snowflakes, err := NewPeers(t.dialer) + if err != nil { + return nil, err + } + + // Use a real logger to periodically output how much traffic is happening. + snowflakes.BytesLogger = NewBytesSyncLogger() + + log.Printf("---- SnowflakeConn: begin collecting snowflakes ---") + go connectLoop(snowflakes) + + // Create a new smux session + log.Printf("---- SnowflakeConn: starting a new session ---") + pconn, sess, err := newSession(snowflakes) + if err != nil { + return nil, err + } + + // On the smux session we overlay a stream. + stream, err := sess.OpenStream() + if err != nil { + return nil, err + } + + // Begin exchanging data. + log.Printf("---- SnowflakeConn: begin stream %v ---", stream.ID()) + return &SnowflakeConn{Stream: stream, sess: sess, pconn: pconn, snowflakes: snowflakes}, nil +} + +type SnowflakeConn struct { + *smux.Stream + sess *smux.Session + pconn net.PacketConn + snowflakes *Peers +} + +func (conn *SnowflakeConn) Close() error { + log.Printf("---- SnowflakeConn: closed stream %v ---", conn.ID()) + conn.Stream.Close() + log.Printf("---- SnowflakeConn: end collecting snowflakes ---") + conn.snowflakes.End() + conn.pconn.Close() + log.Printf("---- SnowflakeConn: discarding finished session ---") + conn.sess.Close() + return nil //TODO: return errors if any of the above do +} + +// loop through all provided STUN servers until we exhaust the list or find +// one that is compatable with RFC 5780 +func updateNATType(servers []webrtc.ICEServer, broker *BrokerChannel) { + + var restrictedNAT bool + var err error + for _, server := range servers { + addr := strings.TrimPrefix(server.URLs[0], "stun:") + restrictedNAT, err = nat.CheckIfRestrictedNAT(addr) + if err == nil { + if restrictedNAT { + broker.SetNATType(nat.NATRestricted) + } else { + broker.SetNATType(nat.NATUnrestricted) + } + break + } + } + if err != nil { + broker.SetNATType(nat.NATUnknown) + } +} + +// Returns a slice of webrtc.ICEServer given a slice of addresses +func 
parseIceServers(addresses []string) []webrtc.ICEServer { + var servers []webrtc.ICEServer + if len(addresses) == 0 { + return nil + } + for _, url := range addresses { + url = strings.TrimSpace(url) + servers = append(servers, webrtc.ICEServer{ + URLs: []string{url}, + }) + } + return servers +} + // newSession returns a new smux.Session and the net.PacketConn it is running // over. The net.PacketConn successively connects through Snowflake proxies // pulled from snowflakes. @@ -94,47 +229,6 @@ func newSession(snowflakes SnowflakeCollector) (net.PacketConn, *smux.Session, e return pconn, sess, err }
-// Given an accepted SOCKS connection, establish a WebRTC connection to the -// remote peer and exchange traffic. -func Handler(socks net.Conn, tongue Tongue) error { - // Prepare to collect remote WebRTC peers. - snowflakes, err := NewPeers(tongue) - if err != nil { - return err - } - - // Use a real logger to periodically output how much traffic is happening. - snowflakes.BytesLogger = NewBytesSyncLogger() - - log.Printf("---- Handler: begin collecting snowflakes ---") - go connectLoop(snowflakes) - - // Create a new smux session - log.Printf("---- Handler: starting a new session ---") - pconn, sess, err := newSession(snowflakes) - if err != nil { - return err - } - - // On the smux session we overlay a stream. - stream, err := sess.OpenStream() - if err != nil { - return err - } - defer stream.Close() - - // Begin exchanging data. - log.Printf("---- Handler: begin stream %v ---", stream.ID()) - copyLoop(socks, stream) - log.Printf("---- Handler: closed stream %v ---", stream.ID()) - snowflakes.End() - log.Printf("---- Handler: end collecting snowflakes ---") - pconn.Close() - sess.Close() - log.Printf("---- Handler: discarding finished session ---") - return nil -} - // Maintain |SnowflakeCapacity| number of available WebRTC connections, to // transfer to the Tor SOCKS handler when needed. func connectLoop(snowflakes SnowflakeCollector) { @@ -153,23 +247,3 @@ func connectLoop(snowflakes SnowflakeCollector) { } } } - -// Exchanges bytes between two ReadWriters. -// (In this case, between a SOCKS connection and smux stream.) 
-func copyLoop(socks, stream io.ReadWriter) { - done := make(chan struct{}, 2) - go func() { - if _, err := io.Copy(socks, stream); err != nil { - log.Printf("copying WebRTC to SOCKS resulted in error: %v", err) - } - done <- struct{}{} - }() - go func() { - if _, err := io.Copy(stream, socks); err != nil { - log.Printf("copying SOCKS to stream resulted in error: %v", err) - } - done <- struct{}{} - }() - <-done - log.Println("copy loop ended") -} diff --git a/client/snowflake.go b/client/snowflake.go index d79de97..f19afcf 100644 --- a/client/snowflake.go +++ b/client/snowflake.go @@ -6,7 +6,6 @@ import ( "io" "io/ioutil" "log" - "math/rand" "net" "os" "os/signal" @@ -14,21 +13,38 @@ import ( "strings" "sync" "syscall" - "time"
pt "git.torproject.org/pluggable-transports/goptlib.git" sf "git.torproject.org/pluggable-transports/snowflake.git/client/lib" - "git.torproject.org/pluggable-transports/snowflake.git/common/nat" "git.torproject.org/pluggable-transports/snowflake.git/common/safelog" - "github.com/pion/webrtc/v3" )
const ( DefaultSnowflakeCapacity = 1 )
-// Accept local SOCKS connections and pass them to the handler. -func socksAcceptLoop(ln *pt.SocksListener, tongue sf.Tongue, shutdown chan struct{}, wg *sync.WaitGroup) { +// Exchanges bytes between two ReadWriters. +// (In this case, between a SOCKS connection and a snowflake transport conn) +func copyLoop(socks, sfconn io.ReadWriter) { + done := make(chan struct{}, 2) + go func() { + if _, err := io.Copy(socks, sfconn); err != nil { + log.Printf("copying Snowflake to SOCKS resulted in error: %v", err) + } + done <- struct{}{} + }() + go func() { + if _, err := io.Copy(sfconn, socks); err != nil { + log.Printf("copying SOCKS to Snowflake resulted in error: %v", err) + } + done <- struct{}{} + }() + <-done + log.Println("copy loop ended") +} + +// Accept local SOCKS connections and connect to a Snowflake connection +func socksAcceptLoop(ln *pt.SocksListener, transport *sf.Transport, shutdown chan struct{}, wg *sync.WaitGroup) { defer ln.Close() for { conn, err := ln.AcceptSocks() @@ -53,10 +69,14 @@ func socksAcceptLoop(ln *pt.SocksListener, tongue sf.Tongue, shutdown chan struc
handler := make(chan struct{}) go func() { - err = sf.Handler(conn, tongue) + // pass an empty address because the broker chooses the bridge + sconn, err := transport.Dial() if err != nil { - log.Printf("handler error: %s", err) + log.Printf("dial error: %s", err) } + // copy between the created Snowflake conn and the SOCKS conn + copyLoop(conn, sconn) + sconn.Close() close(handler) return
@@ -72,23 +92,6 @@ func socksAcceptLoop(ln *pt.SocksListener, tongue sf.Tongue, shutdown chan struc } }
-// s is a comma-separated list of ICE server URLs. -func parseIceServers(s string) []webrtc.ICEServer { - var servers []webrtc.ICEServer - s = strings.TrimSpace(s) - if len(s) == 0 { - return nil - } - urls := strings.Split(s, ",") - for _, url := range urls { - url = strings.TrimSpace(url) - servers = append(servers, webrtc.ICEServer{ - URLs: []string{url}, - }) - } - return servers -} - func main() { iceServersCommas := flag.String("ice", "", "comma-separated list of ICE servers") brokerURL := flag.String("url", "", "URL of signaling broker") @@ -137,33 +140,13 @@ func main() { log.SetOutput(&safelog.LogScrubber{Output: logOutput}) }
- log.Println("\n\n\n --- Starting Snowflake Client ---") - - iceServers := parseIceServers(*iceServersCommas) - // chooses a random subset of servers from inputs - rand.Seed(time.Now().UnixNano()) - rand.Shuffle(len(iceServers), func(i, j int) { - iceServers[i], iceServers[j] = iceServers[j], iceServers[i] - }) - if len(iceServers) > 2 { - iceServers = iceServers[:(len(iceServers)+1)/2] - } - log.Printf("Using ICE servers:") - for _, server := range iceServers { - log.Printf("url: %v", strings.Join(server.URLs, " ")) - } + iceAddresses := strings.Split(strings.TrimSpace(*iceServersCommas), ",")
- // Use potentially domain-fronting broker to rendezvous. - broker, err := sf.NewBrokerChannel( - *brokerURL, *frontDomain, sf.CreateBrokerTransport(), - *keepLocalAddresses || *oldKeepLocalAddresses) + transport, err := sf.NewSnowflakeClient(*brokerURL, *frontDomain, iceAddresses, + *keepLocalAddresses || *oldKeepLocalAddresses, *max) if err != nil { - log.Fatalf("parsing broker URL: %v", err) + log.Fatal("Failed to start snowflake transport: ", err) } - go updateNATType(iceServers, broker) - - // Create a new WebRTCDialer to use as the |Tongue| to catch snowflakes - dialer := sf.NewWebRTCDialer(broker, iceServers, *max)
// Begin goptlib client process. ptInfo, err := pt.ClientSetup(nil) @@ -187,7 +170,7 @@ func main() { break } log.Printf("Started SOCKS listener at %v.", ln.Addr()) - go socksAcceptLoop(ln, dialer, shutdown, &wg) + go socksAcceptLoop(ln, transport, shutdown, &wg) pt.Cmethod(methodName, ln.Version(), ln.Addr()) listeners = append(listeners, ln) default: @@ -223,26 +206,3 @@ func main() { wg.Wait() log.Println("snowflake is done.") } - -// loop through all provided STUN servers until we exhaust the list or find -// one that is compatable with RFC 5780 -func updateNATType(servers []webrtc.ICEServer, broker *sf.BrokerChannel) { - - var restrictedNAT bool - var err error - for _, server := range servers { - addr := strings.TrimPrefix(server.URLs[0], "stun:") - restrictedNAT, err = nat.CheckIfRestrictedNAT(addr) - if err == nil { - if restrictedNAT { - broker.SetNATType(nat.NATRestricted) - } else { - broker.SetNATType(nat.NATUnrestricted) - } - break - } - } - if err != nil { - broker.SetNATType(nat.NATUnknown) - } -}