[tor-commits] [snowflake/master] client interfaces compose better, remove some globals, test ConnectLoop

serene at torproject.org serene at torproject.org
Fri May 20 02:45:26 UTC 2016


commit 6b8568cc6cf7186d86e844a6f1460c9af802575a
Author: Serene Han <keroserene+git at gmail.com>
Date:   Thu May 19 18:06:34 2016 -0700

    client interfaces compose better, remove some globals, test ConnectLoop
---
 client/client_test.go |  85 +++++++++++++++++++++++++-------
 client/snowflake.go   | 133 +++++++++++++++++++++++++++++++++++---------------
 client/webrtc.go      |  43 +++++++++-------
 3 files changed, 186 insertions(+), 75 deletions(-)

diff --git a/client/client_test.go b/client/client_test.go
index 41dc5e6..93e0422 100644
--- a/client/client_test.go
+++ b/client/client_test.go
@@ -2,6 +2,7 @@ package main
 
 import (
 	"bytes"
+	"fmt"
 	"github.com/keroserene/go-webrtc"
 	. "github.com/smartystreets/goconvey/convey"
 	"io/ioutil"
@@ -48,9 +49,64 @@ func (m *MockTransport) RoundTrip(req *http.Request) (*http.Response, error) {
 	return r, nil
 }
 
-func TestConnect(t *testing.T) {
+type FakeDialer struct{}
+
+func (w FakeDialer) Catch() (*webRTCConn, error) {
+	fmt.Println("Caught a dummy snowflake.")
+	return &webRTCConn{}, nil
+}
+
+func TestSnowflakeClient(t *testing.T) {
 	Convey("Snowflake", t, func() {
-		webrtcRemotes = make(map[int]*webRTCConn)
+
+		Convey("Peers", func() {
+
+			Convey("WebRTC ConnectLoop continues until capacity of 1.\n", func() {
+				peers := NewPeers(1)
+				peers.Tongue = FakeDialer{}
+
+				go ConnectLoop(peers)
+				<-peers.maxedChan
+
+				So(peers.Count(), ShouldEqual, 1)
+				r := <-peers.snowflakeChan
+				So(r, ShouldNotBeNil)
+				So(peers.Count(), ShouldEqual, 0)
+			})
+
+			Convey("WebRTC ConnectLoop continues until capacity of 3.\n", func() {
+				peers := NewPeers(3)
+				peers.Tongue = FakeDialer{}
+
+				go ConnectLoop(peers)
+				<-peers.maxedChan
+				So(peers.Count(), ShouldEqual, 3)
+				<-peers.snowflakeChan
+				<-peers.snowflakeChan
+				<-peers.snowflakeChan
+				So(peers.Count(), ShouldEqual, 0)
+			})
+
+			Convey("WebRTC ConnectLoop continues filling when Snowflakes disconnect.\n", func() {
+				peers := NewPeers(3)
+				peers.Tongue = FakeDialer{}
+
+				go ConnectLoop(peers)
+				<-peers.maxedChan
+				So(peers.Count(), ShouldEqual, 3)
+
+				r := <-peers.snowflakeChan
+				So(peers.Count(), ShouldEqual, 2)
+				r.Close()
+				<-peers.maxedChan
+				So(peers.Count(), ShouldEqual, 3)
+
+				<-peers.snowflakeChan
+				<-peers.snowflakeChan
+				<-peers.snowflakeChan
+				So(peers.Count(), ShouldEqual, 0)
+			})
+		})
 
 		Convey("WebRTC Connection", func() {
 			c := new(webRTCConn)
@@ -60,17 +116,13 @@ func TestConnect(t *testing.T) {
 			}
 			So(c.buffer.Bytes(), ShouldEqual, nil)
 
-			Convey("Create and remove from WebRTCConn set", func() {
-				So(len(webrtcRemotes), ShouldEqual, 0)
-				So(remoteIndex, ShouldEqual, 0)
+			Convey("Can construct a WebRTCConn", func() {
 				s := NewWebRTCConnection(nil, nil)
 				So(s, ShouldNotBeNil)
 				So(s.index, ShouldEqual, 0)
-				So(len(webrtcRemotes), ShouldEqual, 1)
-				So(remoteIndex, ShouldEqual, 1)
+				So(s.offerChannel, ShouldNotBeNil)
+				So(s.answerChannel, ShouldNotBeNil)
 				s.Close()
-				So(len(webrtcRemotes), ShouldEqual, 0)
-				So(remoteIndex, ShouldEqual, 1)
 			})
 
 			Convey("Write buffers when datachannel is nil", func() {
@@ -113,9 +165,6 @@ func TestConnect(t *testing.T) {
 				<-c.reset
 			})
 
-			Convey("Connect Loop", func() {
-				// TODO
-			})
 		})
 	})
 
@@ -124,14 +173,14 @@ func TestConnect(t *testing.T) {
 		transport := &MockTransport{http.StatusOK}
 		fakeOffer := webrtc.DeserializeSessionDescription("test")
 
-		Convey("BrokerChannel with no front domain", func() {
+		Convey("Construct BrokerChannel with no front domain", func() {
 			b := NewBrokerChannel("test.broker", "", transport)
 			So(b.url, ShouldNotBeNil)
 			So(b.url.Path, ShouldResemble, "test.broker")
 			So(b.transport, ShouldNotBeNil)
 		})
 
-		Convey("BrokerChannel with front domain", func() {
+		Convey("Construct BrokerChannel *with* front domain", func() {
 			b := NewBrokerChannel("test.broker", "front", transport)
 			So(b.url, ShouldNotBeNil)
 			So(b.url.Path, ShouldResemble, "test.broker")
@@ -139,7 +188,7 @@ func TestConnect(t *testing.T) {
 			So(b.transport, ShouldNotBeNil)
 		})
 
-		Convey("BrokerChannel Negotiate responds with answer", func() {
+		Convey("BrokerChannel.Negotiate responds with answer", func() {
 			b := NewBrokerChannel("test.broker", "", transport)
 			answer, err := b.Negotiate(fakeOffer)
 			So(err, ShouldBeNil)
@@ -147,7 +196,7 @@ func TestConnect(t *testing.T) {
 			So(answer.Sdp, ShouldResemble, "fake")
 		})
 
-		Convey("BrokerChannel Negotiate fails with 503", func() {
+		Convey("BrokerChannel.Negotiate fails with 503", func() {
 			b := NewBrokerChannel("test.broker", "",
 				&MockTransport{http.StatusServiceUnavailable})
 			answer, err := b.Negotiate(fakeOffer)
@@ -156,7 +205,7 @@ func TestConnect(t *testing.T) {
 			So(err.Error(), ShouldResemble, BrokerError503)
 		})
 
-		Convey("BrokerChannel Negotiate fails with 400", func() {
+		Convey("BrokerChannel.Negotiate fails with 400", func() {
 			b := NewBrokerChannel("test.broker", "",
 				&MockTransport{http.StatusBadRequest})
 			answer, err := b.Negotiate(fakeOffer)
@@ -165,7 +214,7 @@ func TestConnect(t *testing.T) {
 			So(err.Error(), ShouldResemble, BrokerError400)
 		})
 
-		Convey("BrokerChannel Negotiate fails with unexpected", func() {
+		Convey("BrokerChannel.Negotiate fails with unexpected error", func() {
 			b := NewBrokerChannel("test.broker", "",
 				&MockTransport{123})
 			answer, err := b.Negotiate(fakeOffer)
diff --git a/client/snowflake.go b/client/snowflake.go
index f32ddc8..61864ca 100644
--- a/client/snowflake.go
+++ b/client/snowflake.go
@@ -5,6 +5,7 @@ import (
 	"bufio"
 	"errors"
 	"flag"
+	"fmt"
 	"io"
 	"log"
 	"net"
@@ -22,14 +23,12 @@ var ptInfo pt.ClientInfo
 
 const (
 	ReconnectTimeout  = 10
-	SnowflakeCapacity = 3
+	SnowflakeCapacity = 1
 )
 
 var brokerURL string
 var frontDomain string
 var iceServers IceServerList
-var snowflakeChan = make(chan *webRTCConn, 1)
-var broker *BrokerChannel
 
 // When a connection handler starts, +1 is written to this channel; when it
 // ends, -1 is written.
@@ -50,62 +49,110 @@ func copyLoop(a, b net.Conn) {
 	log.Println("copy loop ended")
 }
 
-// Interface that matches both webrtc.DataChannel and for testing.
+// Interface for catching Snowflakes.
+type Tongue interface {
+	Catch() (*webRTCConn, error)
+}
+
+// Interface for the Snowflake transport. (usually a webrtc.DataChannel)
 type SnowflakeChannel interface {
 	Send([]byte)
 	Close() error
 }
 
+// Collect and track available remote WebRTC Peers, to switch between if the
+// current one disconnects.
+// Right now, it is only possible to use one remote in a circuit. This can be
+// updated once multiplexed transport on a single circuit is available.
+type Peers struct {
+	Tongue
+
+	snowflakeChan chan *webRTCConn
+	current       *webRTCConn
+	capacity      int
+	maxedChan     chan struct{}
+}
+
+func NewPeers(max int) *Peers {
+	p := &Peers{capacity: max}
+	p.snowflakeChan = make(chan *webRTCConn, max)
+	p.maxedChan = make(chan struct{}, 1)
+	return p
+}
+
+// Find, connect, and add a new peer to the internal collection.
+func (p *Peers) FindSnowflake() (*webRTCConn, error) {
+	if p.Count() >= p.capacity {
+		s := fmt.Sprintf("At capacity [%d/%d]", p.Count(), p.capacity)
+		p.maxedChan <- struct{}{}
+		return nil, errors.New(s)
+	}
+	connection, err := p.Catch()
+	if err != nil {
+		return nil, err
+	}
+	return connection, nil
+}
+
+// TODO: Needs fixing.
+func (p *Peers) Count() int {
+	return len(p.snowflakeChan)
+}
+
+// Close all remote peers.
+func (p *Peers) End() {
+	log.Printf("WebRTC: interruped")
+	if nil != p.current {
+		p.current.Close()
+	}
+	for r := range p.snowflakeChan {
+		r.Close()
+	}
+}
+
 // Maintain |SnowflakeCapacity| number of available WebRTC connections, to
 // transfer to the Tor SOCKS handler when needed.
-func SnowflakeConnectLoop() {
-	transport := CreateBrokerTransport()
-	broker = NewBrokerChannel(brokerURL, frontDomain, transport)
+func ConnectLoop(peers *Peers) {
 	for {
-		numRemotes := len(webrtcRemotes)
-		if numRemotes >= SnowflakeCapacity {
-			log.Println("At Capacity: ", numRemotes, "snowflake. Re-checking in 10s")
-			<-time.After(time.Second * 10)
-			continue
-		}
-		s, err := dialWebRTC()
+		s, err := peers.FindSnowflake()
 		if nil == s || nil != err {
-			log.Println("WebRTC Error: ", err, " retrying...")
+			log.Println("WebRTC Error:", err,
+				" Retrying in", ReconnectTimeout, "seconds...")
 			<-time.After(time.Second * ReconnectTimeout)
 			continue
 		}
-		snowflakeChan <- s
+		peers.snowflakeChan <- s
+		<-time.After(time.Second)
 	}
 }
 
-// Initialize a WebRTC Connection.
-func dialWebRTC() (*webRTCConn, error) {
+// Implements |Tongue|
+type WebRTCDialer struct {
+	*BrokerChannel
+}
+
+// Initialize a WebRTC Connection by signaling through the broker.
+func (w WebRTCDialer) Catch() (*webRTCConn, error) {
+	if nil == w.BrokerChannel {
+		return nil, errors.New("Cannot Dial WebRTC without a BrokerChannel.")
+	}
 	// TODO: [#3] Fetch ICE server information from Broker.
 	// TODO: [#18] Consider TURN servers here too.
 	config := webrtc.NewConfiguration(iceServers...)
-	if nil == broker {
-		return nil, errors.New("Failed to prepare BrokerChannel")
-	}
-	connection := NewWebRTCConnection(config, broker)
+	connection := NewWebRTCConnection(config, w.BrokerChannel)
 	err := connection.Connect()
 	return connection, err
 }
 
-func endWebRTC() {
-	log.Printf("WebRTC: interruped")
-	for _, r := range webrtcRemotes {
-		r.Close()
-	}
-}
-
 // Establish a WebRTC channel for SOCKS connections.
-func handler(conn *pt.SocksConn) error {
+func handler(conn *pt.SocksConn, peers *Peers) error {
 	handlerChan <- 1
 	defer func() {
 		handlerChan <- -1
 	}()
 	// Wait for an available WebRTC remote...
-	remote, ok := <-snowflakeChan
+	remote, ok := <-peers.snowflakeChan
+	peers.current = remote
 	if remote == nil || !ok {
 		conn.Reject()
 		return errors.New("handler: Received invalid Snowflake")
@@ -125,7 +172,7 @@ func handler(conn *pt.SocksConn) error {
 	return nil
 }
 
-func acceptLoop(ln *pt.SocksListener) error {
+func acceptLoop(ln *pt.SocksListener, peers *Peers) error {
 	defer ln.Close()
 	for {
 		log.Println("SOCKS listening...", ln)
@@ -138,7 +185,7 @@ func acceptLoop(ln *pt.SocksListener) error {
 			return err
 		}
 		go func() {
-			err := handler(conn)
+			err := handler(conn, peers)
 			if err != nil {
 				log.Printf("handler error: %s", err)
 			}
@@ -146,6 +193,7 @@ func acceptLoop(ln *pt.SocksListener) error {
 	}
 }
 
+// TODO: Fix since multiplexing changes access to remotes.
 func readSignalingMessages(f *os.File) {
 	log.Printf("readSignalingMessages")
 	s := bufio.NewScanner(f)
@@ -157,10 +205,10 @@ func readSignalingMessages(f *os.File) {
 			log.Printf("ignoring invalid signal message %+q", msg)
 			continue
 		}
-		webrtcRemotes[0].answerChannel <- sdp
+		// webrtcRemotes[0].answerChannel <- sdp
 	}
 	log.Printf("close answerChannel")
-	close(webrtcRemotes[0].answerChannel)
+	// close(webrtcRemotes[0].answerChannel)
 	if err := s.Err(); err != nil {
 		log.Printf("signal FIFO: %s", err)
 	}
@@ -204,8 +252,13 @@ func main() {
 		go readSignalingMessages(signalFile)
 	}
 
-	webrtcRemotes = make(map[int]*webRTCConn)
-	go SnowflakeConnectLoop()
+	// Prepare WebRTC Peers and the Broker, then accumulate connections.
+	// TODO: Expose remote peer capacity as a flag?
+	remotes := NewPeers(SnowflakeCapacity)
+	broker := NewBrokerChannel(brokerURL, frontDomain, CreateBrokerTransport())
+
+	remotes.Tongue = WebRTCDialer{broker}
+	go ConnectLoop(remotes)
 
 	ptInfo, err = pt.ClientSetup(nil)
 	if err != nil {
@@ -221,12 +274,13 @@ func main() {
 	for _, methodName := range ptInfo.MethodNames {
 		switch methodName {
 		case "snowflake":
+			// TODO: Be able to recover when SOCKS dies.
 			ln, err := pt.ListenSocks("tcp", "127.0.0.1:0")
 			if err != nil {
 				pt.CmethodError(methodName, err.Error())
 				break
 			}
-			go acceptLoop(ln)
+			go acceptLoop(ln, remotes)
 			pt.Cmethod(methodName, ln.Version(), ln.Addr())
 			listeners = append(listeners, ln)
 		default:
@@ -234,7 +288,6 @@ func main() {
 		}
 	}
 	pt.CmethodsDone()
-	defer endWebRTC()
 
 	var numHandlers int = 0
 	var sig os.Signal
@@ -254,6 +307,8 @@ func main() {
 		ln.Close()
 	}
 
+	remotes.End()
+
 	// wait for second signal or no more handlers
 	sig = nil
 	for sig == nil && numHandlers != 0 {
diff --git a/client/webrtc.go b/client/webrtc.go
index 5b30e95..e01cbf7 100644
--- a/client/webrtc.go
+++ b/client/webrtc.go
@@ -11,26 +11,27 @@ import (
 	"time"
 )
 
-// Implements net.Conn interface
+// Remote WebRTC peer.  Implements the |net.Conn| interface.
 type webRTCConn struct {
-	config        *webrtc.Configuration
-	pc            *webrtc.PeerConnection
-	snowflake     SnowflakeChannel // Interface holding the WebRTC DataChannel.
-	broker        *BrokerChannel
+	config    *webrtc.Configuration
+	pc        *webrtc.PeerConnection
+	snowflake SnowflakeChannel // Holds the WebRTC DataChannel.
+	broker    *BrokerChannel
+
 	offerChannel  chan *webrtc.SessionDescription
 	answerChannel chan *webrtc.SessionDescription
 	errorChannel  chan error
+	endChannel    chan struct{}
 	recvPipe      *io.PipeReader
 	writePipe     *io.PipeWriter
 	buffer        bytes.Buffer
 	reset         chan struct{}
-	index         int
+
+	index  int
+	closed bool
 	*BytesInfo
 }
 
-var webrtcRemotes map[int]*webRTCConn
-var remoteIndex int = 0
-
 func (c *webRTCConn) Read(b []byte) (int, error) {
 	return c.recvPipe.Read(b)
 }
@@ -51,10 +52,17 @@ func (c *webRTCConn) Close() error {
 	var err error = nil
 	log.Printf("WebRTC: Closing")
 	c.cleanup()
-	close(c.offerChannel)
-	close(c.answerChannel)
-	close(c.errorChannel)
-	delete(webrtcRemotes, c.index)
+	if nil != c.offerChannel {
+		close(c.offerChannel)
+	}
+	if nil != c.answerChannel {
+		close(c.answerChannel)
+	}
+	if nil != c.errorChannel {
+		close(c.errorChannel)
+	}
+	// Mark for deletion.
+	c.closed = true
 	return err
 }
 
@@ -78,6 +86,7 @@ func (c *webRTCConn) SetWriteDeadline(t time.Time) error {
 	return fmt.Errorf("SetWriteDeadline not implemented")
 }
 
+// Construct a WebRTC PeerConnection.
 func NewWebRTCConnection(config *webrtc.Configuration,
 	broker *BrokerChannel) *webRTCConn {
 	connection := new(webRTCConn)
@@ -90,6 +99,7 @@ func NewWebRTCConnection(config *webrtc.Configuration,
 	connection.errorChannel = make(chan error, 1)
 	connection.reset = make(chan struct{}, 1)
 
+	// TODO: Separate out.
 	// Log every few seconds.
 	connection.BytesInfo = &BytesInfo{
 		inboundChan: make(chan int, 5), outboundChan: make(chan int, 5),
@@ -99,9 +109,6 @@ func NewWebRTCConnection(config *webrtc.Configuration,
 
 	// Pipes remain the same even when DataChannel gets switched.
 	connection.recvPipe, connection.writePipe = io.Pipe()
-	connection.index = remoteIndex
-	webrtcRemotes[connection.index] = connection
-	remoteIndex++
 	return connection
 }
 
@@ -296,12 +303,12 @@ func (c *webRTCConn) Reset() {
 
 func (c *webRTCConn) cleanup() {
 	if nil != c.snowflake {
-		s := c.snowflake
 		log.Printf("WebRTC: closing DataChannel")
+		dataChannel := c.snowflake
 		// Setting snowflake to nil *before* Close indicates to OnClose that it
 		// was locally triggered.
 		c.snowflake = nil
-		s.Close()
+		dataChannel.Close()
 	}
 	if nil != c.pc {
 		log.Printf("WebRTC: closing PeerConnection")



More information about the tor-commits mailing list