[tor-commits] [snowflake/master] interfaces.go, SnowflakeCollector, better composition

serene at torproject.org serene at torproject.org
Sun Jun 12 19:44:05 UTC 2016


commit 556596cc5aa14eecb687bbfb44188d1e733b6855
Author: Serene Han <keroserene+git at gmail.com>
Date:   Tue May 24 15:18:54 2016 -0700

    interfaces.go, SnowflakeCollector, better composition
---
 client/client_test.go | 124 +++++++++++++++++++++++++-------------
 client/interfaces.go  |  30 ++++++++++
 client/snowflake.go   | 161 +++++++++++++++++++++++---------------------------
 client/webrtc.go      |  18 ++++++
 4 files changed, 205 insertions(+), 128 deletions(-)

diff --git a/client/client_test.go b/client/client_test.go
index 1ccf206..0bd3844 100644
--- a/client/client_test.go
+++ b/client/client_test.go
@@ -3,12 +3,15 @@ package main
 import (
 	"bytes"
 	"fmt"
-	"github.com/keroserene/go-webrtc"
-	. "github.com/smartystreets/goconvey/convey"
 	"io/ioutil"
+	"net"
 	"net/http"
 	"strings"
 	"testing"
+
+	// "git.torproject.org/pluggable-transports/goptlib.git"
+	"github.com/keroserene/go-webrtc"
+	. "github.com/smartystreets/goconvey/convey"
 )
 
 type MockDataChannel struct {
@@ -56,56 +59,93 @@ func (w FakeDialer) Catch() (*webRTCConn, error) {
 	return &webRTCConn{}, nil
 }
 
+type FakeSocksConn struct {
+	net.Conn
+	rejected bool // set to true once Reject has been called
+}
+// Reject records the rejection; the pointer receiver makes it visible to the caller.
+func (f *FakeSocksConn) Reject() error {
+	f.rejected = true
+	return nil
+}
+func (f FakeSocksConn) Grant(addr *net.TCPAddr) error {
+	return nil
+}
+
+type FakeSnowflakeJar struct {
+	toRelease *webRTCConn
+}
+
+func (f FakeSnowflakeJar) Release() *webRTCConn {
+	return nil
+}
+
+func (f FakeSnowflakeJar) Collect() (*webRTCConn, error) {
+	return nil, nil
+}
+
 func TestSnowflakeClient(t *testing.T) {
-	Convey("Snowflake", t, func() {
 
-		Convey("Peers", func() {
+	Convey("WebRTC ConnectLoop", t, func() {
 
-			Convey("WebRTC ConnectLoop continues until capacity of 1.\n", func() {
-				peers := NewPeers(1)
-				peers.Tongue = FakeDialer{}
+		Convey("WebRTC ConnectLoop continues until capacity of 1.\n", func() {
+			snowflakes := NewSnowflakeJar(1)
+			snowflakes.Tongue = FakeDialer{}
 
-				go ConnectLoop(peers)
-				<-peers.maxedChan
+			go ConnectLoop(snowflakes)
+			<-snowflakes.maxedChan
 
-				So(peers.Count(), ShouldEqual, 1)
-				r := <-peers.snowflakeChan
-				So(r, ShouldNotBeNil)
-				So(peers.Count(), ShouldEqual, 0)
-			})
+			So(snowflakes.Count(), ShouldEqual, 1)
+			r := <-snowflakes.snowflakeChan
+			So(r, ShouldNotBeNil)
+			So(snowflakes.Count(), ShouldEqual, 0)
+		})
 
-			Convey("WebRTC ConnectLoop continues until capacity of 3.\n", func() {
-				peers := NewPeers(3)
-				peers.Tongue = FakeDialer{}
-
-				go ConnectLoop(peers)
-				<-peers.maxedChan
-				So(peers.Count(), ShouldEqual, 3)
-				<-peers.snowflakeChan
-				<-peers.snowflakeChan
-				<-peers.snowflakeChan
-				So(peers.Count(), ShouldEqual, 0)
-			})
+		Convey("WebRTC ConnectLoop continues until capacity of 3.\n", func() {
+			snowflakes := NewSnowflakeJar(3)
+			snowflakes.Tongue = FakeDialer{}
+
+			go ConnectLoop(snowflakes)
+			<-snowflakes.maxedChan
+			So(snowflakes.Count(), ShouldEqual, 3)
+			<-snowflakes.snowflakeChan
+			<-snowflakes.snowflakeChan
+			<-snowflakes.snowflakeChan
+			So(snowflakes.Count(), ShouldEqual, 0)
+		})
 
-			Convey("WebRTC ConnectLoop continues filling when Snowflakes disconnect.\n", func() {
-				peers := NewPeers(3)
-				peers.Tongue = FakeDialer{}
+		Convey("WebRTC ConnectLoop continues filling when Snowflakes disconnect.\n", func() {
+			snowflakes := NewSnowflakeJar(3)
+			snowflakes.Tongue = FakeDialer{}
 
-				go ConnectLoop(peers)
-				<-peers.maxedChan
-				So(peers.Count(), ShouldEqual, 3)
+			go ConnectLoop(snowflakes)
+			<-snowflakes.maxedChan
+			So(snowflakes.Count(), ShouldEqual, 3)
 
-				r := <-peers.snowflakeChan
-				So(peers.Count(), ShouldEqual, 2)
-				r.Close()
-				<-peers.maxedChan
-				So(peers.Count(), ShouldEqual, 3)
+			r := <-snowflakes.snowflakeChan
+			So(snowflakes.Count(), ShouldEqual, 2)
+			r.Close()
+			<-snowflakes.maxedChan
+			So(snowflakes.Count(), ShouldEqual, 3)
+
+			<-snowflakes.snowflakeChan
+			<-snowflakes.snowflakeChan
+			<-snowflakes.snowflakeChan
+			So(snowflakes.Count(), ShouldEqual, 0)
+		})
+	})
+
+	Convey("Snowflake", t, func() {
+
+		SkipConvey("Handler Grants correctly", func() {
+			socks := &FakeSocksConn{}
+			snowflakes := &FakeSnowflakeJar{}
+
+			So(socks.rejected, ShouldEqual, false)
+			snowflakes.toRelease = nil
+			handler(socks, snowflakes)
+			So(socks.rejected, ShouldEqual, true)
 
-				<-peers.snowflakeChan
-				<-peers.snowflakeChan
-				<-peers.snowflakeChan
-				So(peers.Count(), ShouldEqual, 0)
-			})
 		})
 
 		Convey("WebRTC Connection", func() {
diff --git a/client/interfaces.go b/client/interfaces.go
new file mode 100644
index 0000000..80a2ba3
--- /dev/null
+++ b/client/interfaces.go
@@ -0,0 +1,30 @@
+package main
+
+import (
+	"net"
+)
+
+// SnowflakeCollector is the interface for collecting and releasing snowflakes.
+type SnowflakeCollector interface {
+	Collect() (*webRTCConn, error)
+	Release() *webRTCConn
+}
+
+// Tongue is the interface for catching those wild Snowflakes.
+type Tongue interface {
+	Catch() (*webRTCConn, error)
+}
+
+// SocksConnector is an interface which primarily adapts to goptlib's SocksConn struct.
+type SocksConnector interface {
+	Grant(*net.TCPAddr) error
+	Reject() error
+	net.Conn
+}
+
+// SnowflakeChannel is the interface for the Snowflake's transport.
+// (Specifically, webrtc.DataChannel)
+type SnowflakeChannel interface {
+	Send([]byte)
+	Close() error
+}
diff --git a/client/snowflake.go b/client/snowflake.go
index 464420c..cd7f151 100644
--- a/client/snowflake.go
+++ b/client/snowflake.go
@@ -1,4 +1,5 @@
 // Client transport plugin for the Snowflake pluggable transport.
+// In the Client context, "Snowflake" refers to a remote browser proxy.
 package main
 
 import (
@@ -49,22 +50,12 @@ func copyLoop(a, b net.Conn) {
 	log.Println("copy loop ended")
 }
 
-// Interface for catching Snowflakes.
-type Tongue interface {
-	Catch() (*webRTCConn, error)
-}
-
-// Interface for the Snowflake transport. (usually a webrtc.DataChannel)
-type SnowflakeChannel interface {
-	Send([]byte)
-	Close() error
-}
-
-// Collect and track available remote WebRTC Peers, to switch between if the
-// current one disconnects.
-// Right now, it is only possible to use one remote in a circuit. This can be
-// updated once multiplexed transport on a single circuit is available.
-type Peers struct {
+// Collect and track available snowflakes. (Implements SnowflakeCollector)
+// Right now, it is only possible to use one active remote in a circuit.
+// This can be updated once multiplexed transport on a single circuit is available.
+// Keeping multiple WebRTC connections available allows for quicker recovery when
+// the current snowflake disconnects.
+type SnowflakeJar struct {
 	Tongue
 	BytesLogger
 
@@ -74,30 +65,42 @@ type Peers struct {
 	maxedChan     chan struct{}
 }
 
-func NewPeers(max int) *Peers {
-	p := &Peers{capacity: max}
+func NewSnowflakeJar(max int) *SnowflakeJar {
+	p := &SnowflakeJar{capacity: max}
 	p.snowflakeChan = make(chan *webRTCConn, max)
 	p.maxedChan = make(chan struct{}, 1)
 	return p
 }
 
-// Find, connect, and add a new peer to the internal collection.
-func (p *Peers) FindSnowflake() (*webRTCConn, error) {
-	if p.Count() >= p.capacity {
-		s := fmt.Sprintf("At capacity [%d/%d]", p.Count(), p.capacity)
-		p.maxedChan <- struct{}{}
+// Establish connection to some remote WebRTC peer, and keep it available for
+// later.
+func (jar *SnowflakeJar) Collect() (*webRTCConn, error) {
+	if jar.Count() >= jar.capacity {
+		s := fmt.Sprintf("At capacity [%d/%d]", jar.Count(), jar.capacity)
+		jar.maxedChan <- struct{}{}
 		return nil, errors.New(s)
 	}
-	connection, err := p.Catch()
-	connection.BytesLogger = p.BytesLogger
+	snowflake, err := jar.Catch()
 	if err != nil {
 		return nil, err
 	}
-	return connection, nil
+	jar.snowflakeChan <- snowflake
+	return snowflake, nil
+}
+
+// Release waits for a collected snowflake, makes it the current one and returns it (nil if none is usable).
+func (jar *SnowflakeJar) Release() *webRTCConn {
+	snowflake, ok := <-jar.snowflakeChan
+	if !ok || nil == snowflake { // closed channel or nil entry: avoid a nil dereference below
+		return nil
+	}
+	jar.current = snowflake
+	snowflake.BytesLogger = jar.BytesLogger
+	return snowflake
+}
 
 // TODO: Needs fixing.
-func (p *Peers) Count() int {
+func (p *SnowflakeJar) Count() int {
 	count := 0
 	if p.current != nil {
 		count = 1
@@ -106,7 +109,7 @@ func (p *Peers) Count() int {
 }
 
 // Close all remote peers.
-func (p *Peers) End() {
+func (p *SnowflakeJar) End() {
 	log.Printf("WebRTC: interruped")
 	if nil != p.current {
 		p.current.Close()
@@ -118,87 +121,71 @@ func (p *Peers) End() {
 
 // Maintain |SnowflakeCapacity| number of available WebRTC connections, to
 // transfer to the Tor SOCKS handler when needed.
-func ConnectLoop(peers *Peers) {
+func ConnectLoop(snowflakes SnowflakeCollector) {
 	for {
-		s, err := peers.FindSnowflake()
+		s, err := snowflakes.Collect()
 		if nil == s || nil != err {
 			log.Println("WebRTC Error:", err,
 				" Retrying in", ReconnectTimeout, "seconds...")
+			// Failed collections get a timeout.
 			<-time.After(time.Second * ReconnectTimeout)
 			continue
 		}
-		peers.snowflakeChan <- s
+		// Successful collection gets rate limited to once per second.
 		<-time.After(time.Second)
 	}
 }
 
-// Implements |Tongue|
-type WebRTCDialer struct {
-	*BrokerChannel
-}
-
-// Initialize a WebRTC Connection by signaling through the broker.
-func (w WebRTCDialer) Catch() (*webRTCConn, error) {
-	if nil == w.BrokerChannel {
-		return nil, errors.New("Cannot Dial WebRTC without a BrokerChannel.")
+// Accept local SOCKS connections and pass them to the handler.
+func acceptLoop(ln *pt.SocksListener, snowflakes SnowflakeCollector) error {
+	defer ln.Close()
+	for {
+		log.Println("SOCKS listening...", ln)
+		conn, err := ln.AcceptSocks()
+		log.Println("accepting", conn, err)
+		if err != nil {
+			if e, ok := err.(net.Error); ok && e.Temporary() {
+				continue
+			}
+			return err
+		}
+		err = handler(conn, snowflakes)
+		if err != nil {
+			log.Printf("handler error: %s", err)
+		}
 	}
-	// TODO: [#3] Fetch ICE server information from Broker.
-	// TODO: [#18] Consider TURN servers here too.
-	config := webrtc.NewConfiguration(iceServers...)
-	connection := NewWebRTCConnection(config, w.BrokerChannel)
-	err := connection.Connect()
-	return connection, err
 }
 
-// Establish a WebRTC channel for SOCKS connections.
-func handler(conn *pt.SocksConn, peers *Peers) error {
+// Given an accepted SOCKS connection, establish a WebRTC connection to the
+// remote peer and exchange traffic.
+func handler(socks SocksConnector, snowflakes SnowflakeCollector) error {
 	handlerChan <- 1
 	defer func() {
 		handlerChan <- -1
 	}()
 	// Wait for an available WebRTC remote...
-	remote, ok := <-peers.snowflakeChan
-	peers.current = remote
-	if remote == nil || !ok {
-		conn.Reject()
+	snowflake := snowflakes.Release()
+	if nil == snowflake {
+		socks.Reject()
 		return errors.New("handler: Received invalid Snowflake")
 	}
-	defer conn.Close()
+	defer socks.Close()
 	log.Println("handler: Snowflake assigned.")
-
-	err := conn.Grant(&net.TCPAddr{IP: net.IPv4zero, Port: 0})
+	err := socks.Grant(&net.TCPAddr{IP: net.IPv4zero, Port: 0})
 	if err != nil {
 		return err
 	}
 
-	go copyLoop(conn, remote)
+	// Begin exchanging data.
+	go copyLoop(socks, snowflake)
+
 	// When WebRTC resets, close the SOCKS connection, which induces new handler.
-	<-remote.reset
+	// TODO: Double check this / fix it.
+	<-snowflake.reset
 	log.Println("---- Closed ---")
 	return nil
 }
 
-func acceptLoop(ln *pt.SocksListener, peers *Peers) error {
-	defer ln.Close()
-	for {
-		log.Println("SOCKS listening...", ln)
-		conn, err := ln.AcceptSocks()
-		log.Println("accepting", conn, err)
-		if err != nil {
-			if e, ok := err.(net.Error); ok && e.Temporary() {
-				continue
-			}
-			return err
-		}
-		go func() {
-			err := handler(conn, peers)
-			if err != nil {
-				log.Printf("handler error: %s", err)
-			}
-		}()
-	}
-}
-
 // TODO: Fix since multiplexing changes access to remotes.
 func readSignalingMessages(f *os.File) {
 	log.Printf("readSignalingMessages")
@@ -258,19 +245,21 @@ func main() {
 		go readSignalingMessages(signalFile)
 	}
 
-	// Prepare WebRTC Peers and the Broker, then accumulate connections.
+	// Prepare WebRTC SnowflakeCollector and the Broker, then accumulate connections.
 	// TODO: Expose remote peer capacity as a flag?
-	remotes := NewPeers(SnowflakeCapacity)
+	snowflakes := NewSnowflakeJar(SnowflakeCapacity)
+
 	broker := NewBrokerChannel(brokerURL, frontDomain, CreateBrokerTransport())
+	snowflakes.Tongue = WebRTCDialer{broker}
 
-	remotes.BytesLogger = &BytesSyncLogger{
+	// Use a real logger for traffic.
+	snowflakes.BytesLogger = &BytesSyncLogger{
 		inboundChan: make(chan int, 5), outboundChan: make(chan int, 5),
 		inbound: 0, outbound: 0, inEvents: 0, outEvents: 0,
 	}
-	go remotes.BytesLogger.Log()
 
-	remotes.Tongue = WebRTCDialer{broker}
-	go ConnectLoop(remotes)
+	go ConnectLoop(snowflakes)
+	go snowflakes.BytesLogger.Log()
 
 	ptInfo, err = pt.ClientSetup(nil)
 	if err != nil {
@@ -292,7 +281,7 @@ func main() {
 				pt.CmethodError(methodName, err.Error())
 				break
 			}
-			go acceptLoop(ln, remotes)
+			go acceptLoop(ln, snowflakes)
 			pt.Cmethod(methodName, ln.Version(), ln.Addr())
 			listeners = append(listeners, ln)
 		default:
@@ -319,7 +308,7 @@ func main() {
 		ln.Close()
 	}
 
-	remotes.End()
+	snowflakes.End()
 
 	// wait for second signal or no more handlers
 	sig = nil
diff --git a/client/webrtc.go b/client/webrtc.go
index a47ac19..41642be 100644
--- a/client/webrtc.go
+++ b/client/webrtc.go
@@ -11,6 +11,24 @@ import (
 	"time"
 )
 
+// WebRTCDialer implements the |Tongue| interface to catch snowflakes, using a BrokerChannel.
+type WebRTCDialer struct {
+	*BrokerChannel
+}
+
+// Catch initializes a WebRTC connection by signaling through the broker; it errors if no BrokerChannel is set.
+func (w WebRTCDialer) Catch() (*webRTCConn, error) {
+	if nil == w.BrokerChannel {
+		return nil, errors.New("Cannot Dial WebRTC without a BrokerChannel.")
+	}
+	// TODO: [#3] Fetch ICE server information from Broker.
+	// TODO: [#18] Consider TURN servers here too.
+	config := webrtc.NewConfiguration(iceServers...)
+	connection := NewWebRTCConnection(config, w.BrokerChannel)
+	err := connection.Connect()
+	return connection, err
+}
+
 // Remote WebRTC peer.  Implements the |net.Conn| interface.
 type webRTCConn struct {
 	config    *webrtc.Configuration





More information about the tor-commits mailing list