[tor-commits] [snowflake/master] able to break out of ConnectLoop, try separate webrtcConfigs as well

serene at torproject.org serene at torproject.org
Mon Jun 13 22:13:11 UTC 2016


commit a71c98c0aef198e7cf7308dee414e3e17a14059b
Author: Serene Han <keroserene+git at gmail.com>
Date:   Mon Jun 13 15:11:55 2016 -0700

    able to break out of ConnectLoop, try separate webrtcConfigs as well
---
 client/client_test.go | 11 ++++++-----
 client/interfaces.go  |  3 +++
 client/peers.go       | 23 +++++++++++++++++++----
 client/rendezvous.go  | 11 +++++++----
 client/snowflake.go   | 16 ++++++++++------
 client/webrtc.go      | 49 +++++++++++++++++++++++--------------------------
 6 files changed, 68 insertions(+), 45 deletions(-)

diff --git a/client/client_test.go b/client/client_test.go
index 87fb2cb..41d4870 100644
--- a/client/client_test.go
+++ b/client/client_test.go
@@ -65,8 +65,9 @@ func (f FakeSocksConn) Grant(addr *net.TCPAddr) error { return nil }
 
 type FakePeers struct{ toRelease *webRTCConn }
 
-func (f FakePeers) Collect() error { return nil }
-func (f FakePeers) Pop() Snowflake { return nil }
+func (f FakePeers) Collect() error          { return nil }
+func (f FakePeers) Pop() Snowflake          { return nil }
+func (f FakePeers) Melted() <-chan struct{} { return nil }
 
 func TestSnowflakeClient(t *testing.T) {
 
@@ -144,6 +145,7 @@ func TestSnowflakeClient(t *testing.T) {
 			}
 			So(p.Count(), ShouldEqual, cnt)
 			p.End()
+			<-p.Melted()
 			So(p.Count(), ShouldEqual, 0)
 		})
 
@@ -168,7 +170,6 @@ func TestSnowflakeClient(t *testing.T) {
 			Convey("Can construct a WebRTCConn", func() {
 				s := NewWebRTCConnection(nil, nil)
 				So(s, ShouldNotBeNil)
-				So(s.index, ShouldEqual, 0)
 				So(s.offerChannel, ShouldNotBeNil)
 				So(s.answerChannel, ShouldNotBeNil)
 				s.Close()
@@ -176,13 +177,13 @@ func TestSnowflakeClient(t *testing.T) {
 
 			Convey("Write buffers when datachannel is nil", func() {
 				c.Write([]byte("test"))
-				c.snowflake = nil
+				c.transport = nil
 				So(c.buffer.Bytes(), ShouldResemble, []byte("test"))
 			})
 
 			Convey("Write sends to datachannel when not nil", func() {
 				mock := new(MockDataChannel)
-				c.snowflake = mock
+				c.transport = mock
 				mock.done = make(chan bool, 1)
 				c.Write([]byte("test"))
 				<-mock.done
diff --git a/client/interfaces.go b/client/interfaces.go
index 4fb0dcf..502eefb 100644
--- a/client/interfaces.go
+++ b/client/interfaces.go
@@ -38,6 +38,9 @@ type SnowflakeCollector interface {
 
 	// Remove and return the most available Snowflake from the collection.
 	Pop() Snowflake
+
+	// Signal when the collector has stopped collecting.
+	Melted() <-chan struct{}
 }
 
 // Interface to adapt to goptlib's SocksConn struct.
diff --git a/client/peers.go b/client/peers.go
index eb435fd..74b804f 100644
--- a/client/peers.go
+++ b/client/peers.go
@@ -25,6 +25,8 @@ type Peers struct {
 	snowflakeChan chan Snowflake
 	activePeers   *list.List
 	capacity      int
+
+	melt chan struct{}
 }
 
 // Construct a fresh container of remote peers.
@@ -33,17 +35,19 @@ func NewPeers(max int) *Peers {
 	// Use buffered go channel to pass snowflakes onwards to the SOCKS handler.
 	p.snowflakeChan = make(chan Snowflake, max)
 	p.activePeers = list.New()
+	p.melt = make(chan struct{}, 1)
 	return p
 }
 
 // As part of |SnowflakeCollector| interface.
 func (p *Peers) Collect() error {
-
 	cnt := p.Count()
+	s := fmt.Sprintf("Currently at [%d/%d]", cnt, p.capacity)
 	if cnt >= p.capacity {
 		s := fmt.Sprintf("At capacity [%d/%d]", cnt, p.capacity)
 		return errors.New(s)
 	}
+	log.Println("WebRTC: Collecting a new Snowflake.", s)
 	// Engage the Snowflake Catching interface, which must be available.
 	if nil == p.Tongue {
 		return errors.New("Missing Tongue to catch Snowflakes with.")
@@ -60,7 +64,6 @@ func (p *Peers) Collect() error {
 
 // As part of |SnowflakeCollector| interface.
 func (p *Peers) Pop() Snowflake {
-
 	// Blocks until an available snowflake appears.
 	snowflake, ok := <-p.snowflakeChan
 	if !ok {
@@ -71,6 +74,11 @@ func (p *Peers) Pop() Snowflake {
 	return snowflake
 }
 
+// As part of |SnowflakeCollector| interface.
+func (p *Peers) Melted() <-chan struct{} {
+	return p.melt
+}
+
 // Returns total available Snowflakes (including the active one)
 // The count only reduces when connections themselves close, rather than when
 // they are popped.
@@ -93,9 +101,16 @@ func (p *Peers) purgeClosedPeers() {
 
 // Close all Peers contained here.
 func (p *Peers) End() {
-	log.Printf("WebRTC: Ending all peer connections.")
-	for e := p.activePeers.Front(); e != nil; e = e.Next() {
+	close(p.snowflakeChan)
+	p.melt <- struct{}{}
+	cnt := p.Count()
+	for e := p.activePeers.Front(); e != nil; {
+		log.Println(e, e.Value)
+		next := e.Next()
 		conn := e.Value.(*webRTCConn)
 		conn.Close()
+		p.activePeers.Remove(e)
+		e = next
 	}
+	log.Println("WebRTC: melted all", cnt, "snowflakes.")
 }
diff --git a/client/rendezvous.go b/client/rendezvous.go
index 6c7aa44..2bcce17 100644
--- a/client/rendezvous.go
+++ b/client/rendezvous.go
@@ -115,15 +115,17 @@ func (bc *BrokerChannel) Negotiate(offer *webrtc.SessionDescription) (
 // Implements the |Tongue| interface to catch snowflakes, using BrokerChannel.
 type WebRTCDialer struct {
 	*BrokerChannel
-	webrtcConfig *webrtc.Configuration
+	// webrtcConfig *webrtc.Configuration
+	iceServers IceServerList
 }
 
 func NewWebRTCDialer(
 	broker *BrokerChannel, iceServers IceServerList) *WebRTCDialer {
-	config := webrtc.NewConfiguration(iceServers...)
+
 	return &WebRTCDialer{
 		BrokerChannel: broker,
-		webrtcConfig:  config,
+		iceServers:    iceServers,
+		// webrtcConfig:  config,
 	}
 }
 
@@ -134,7 +136,8 @@ func (w WebRTCDialer) Catch() (Snowflake, error) {
 	}
 	// TODO: [#3] Fetch ICE server information from Broker.
 	// TODO: [#18] Consider TURN servers here too.
-	connection := NewWebRTCConnection(w.webrtcConfig, w.BrokerChannel)
+	config := webrtc.NewConfiguration(w.iceServers...)
+	connection := NewWebRTCConnection(config, w.BrokerChannel)
 	err := connection.Connect()
 	return connection, err
 }
diff --git a/client/snowflake.go b/client/snowflake.go
index 9f5cdc7..d122494 100644
--- a/client/snowflake.go
+++ b/client/snowflake.go
@@ -30,17 +30,19 @@ var handlerChan = make(chan int)
 // transfer to the Tor SOCKS handler when needed.
 func ConnectLoop(snowflakes SnowflakeCollector) {
 	for {
+		// Check if ending is necessary.
 		err := snowflakes.Collect()
 		if nil != err {
 			log.Println("WebRTC:", err,
 				" Retrying in", ReconnectTimeout, "seconds...")
-			// Failed collections get a timeout.
-			<-time.After(time.Second * ReconnectTimeout)
+		}
+		select {
+		case <-time.After(time.Second * ReconnectTimeout):
 			continue
+		case <-snowflakes.Melted():
+			log.Println("ConnectLoop: stopped.")
+			return
 		}
-		// Successful collection gets rate limited to once per second.
-		log.Println("WebRTC: Connected to new Snowflake.")
-		<-time.After(time.Second)
 	}
 }
 
@@ -50,7 +52,7 @@ func socksAcceptLoop(ln *pt.SocksListener, snowflakes SnowflakeCollector) error
 	log.Println("Started SOCKS listener.")
 	for {
 		conn, err := ln.AcceptSocks()
-		log.Println("SOCKS accepted ", conn.Req)
+		log.Println("SOCKS accepted: ", conn.Req)
 		if err != nil {
 			if e, ok := err.(net.Error); ok && e.Temporary() {
 				continue
@@ -72,6 +74,7 @@ func handler(socks SocksConnector, snowflakes SnowflakeCollector) error {
 		handlerChan <- -1
 	}()
 	// Obtain an available WebRTC remote. May block.
+	log.Println("handler: awaiting Snowflake...")
 	snowflake := snowflakes.Pop()
 	if nil == snowflake {
 		socks.Reject()
@@ -85,6 +88,7 @@ func handler(socks SocksConnector, snowflakes SnowflakeCollector) error {
 	}
 
 	// Begin exchanging data.
+	// BUG(serene): There's a leak here when multiplexed.
 	go copyLoop(socks, snowflake)
 
 	// When WebRTC resets, close the SOCKS connection, which induces new handler.
diff --git a/client/webrtc.go b/client/webrtc.go
index c7a46ea..6cd5da6 100644
--- a/client/webrtc.go
+++ b/client/webrtc.go
@@ -15,19 +15,17 @@ import (
 type webRTCConn struct {
 	config    *webrtc.Configuration
 	pc        *webrtc.PeerConnection
-	snowflake SnowflakeDataChannel // Holds the WebRTC DataChannel.
+	transport SnowflakeDataChannel // Holds the WebRTC DataChannel.
 	broker    *BrokerChannel
 
 	offerChannel  chan *webrtc.SessionDescription
 	answerChannel chan *webrtc.SessionDescription
 	errorChannel  chan error
-	endChannel    chan struct{}
 	recvPipe      *io.PipeReader
 	writePipe     *io.PipeWriter
 	buffer        bytes.Buffer
 	reset         chan struct{}
 
-	index  int
 	closed bool
 
 	BytesLogger
@@ -43,11 +41,11 @@ func (c *webRTCConn) Read(b []byte) (int, error) {
 // As part of |io.ReadWriter|
 func (c *webRTCConn) Write(b []byte) (int, error) {
 	c.BytesLogger.AddOutbound(len(b))
-	if nil == c.snowflake {
+	if nil == c.transport {
 		log.Printf("Buffered %d bytes --> WebRTC", len(b))
 		c.buffer.Write(b)
 	} else {
-		c.snowflake.Send(b)
+		c.transport.Send(b)
 	}
 	return len(b), nil
 }
@@ -57,15 +55,6 @@ func (c *webRTCConn) Close() error {
 	var err error = nil
 	log.Printf("WebRTC: Closing")
 	c.cleanup()
-	if nil != c.offerChannel {
-		close(c.offerChannel)
-	}
-	if nil != c.answerChannel {
-		close(c.answerChannel)
-	}
-	if nil != c.errorChannel {
-		close(c.errorChannel)
-	}
 	// Mark for deletion.
 	c.closed = true
 	return err
@@ -106,7 +95,6 @@ func NewWebRTCConnection(config *webrtc.Configuration,
 
 // As part of |Connector| interface.
 func (c *webRTCConn) Connect() error {
-	log.Printf("Establishing WebRTC connection #%d...", c.index)
 	// TODO: When go-webrtc is more stable, it's possible that a new
 	// PeerConnection won't need to be re-prepared each time.
 	err := c.preparePeerConnection()
@@ -174,7 +162,7 @@ func (c *webRTCConn) preparePeerConnection() error {
 
 // Create a WebRTC DataChannel locally.
 func (c *webRTCConn) establishDataChannel() error {
-	if c.snowflake != nil {
+	if c.transport != nil {
 		panic("Unexpected datachannel already exists!")
 	}
 	dc, err := c.pc.CreateDataChannel("snowflake", webrtc.Init{})
@@ -187,9 +175,8 @@ func (c *webRTCConn) establishDataChannel() error {
 	}
 	dc.OnOpen = func() {
 		log.Println("WebRTC: DataChannel.OnOpen")
-		if nil != c.snowflake {
-			log.Println("PeerConnection snowflake already exists.")
-			panic("PeerConnection snowflake already exists.")
+		if nil != c.transport {
+			panic("WebRTC: transport already exists.")
 		}
 		// Flush buffered outgoing SOCKS data if necessary.
 		if c.buffer.Len() > 0 {
@@ -198,11 +185,11 @@ func (c *webRTCConn) establishDataChannel() error {
 			c.buffer.Reset()
 		}
 		// Then enable the datachannel.
-		c.snowflake = dc
+		c.transport = dc
 	}
 	dc.OnClose = func() {
 		// Future writes will go to the buffer until a new DataChannel is available.
-		if nil == c.snowflake {
+		if nil == c.transport {
 			// Closed locally, as part of a reset.
 			log.Println("WebRTC: DataChannel.OnClose [locally]")
 			return
@@ -210,7 +197,7 @@ func (c *webRTCConn) establishDataChannel() error {
 		// Closed remotely, need to reset everything.
 		// Disable the DataChannel as a write destination.
 		log.Println("WebRTC: DataChannel.OnClose [remotely]")
-		c.snowflake = nil
+		c.transport = nil
 		c.Reset()
 	}
 	dc.OnMessage = func(msg []byte) {
@@ -284,13 +271,23 @@ func (c *webRTCConn) exchangeSDP() error {
 	return nil
 }
 
+// Close all channels and transports
 func (c *webRTCConn) cleanup() {
-	if nil != c.snowflake {
+	if nil != c.offerChannel {
+		close(c.offerChannel)
+	}
+	if nil != c.answerChannel {
+		close(c.answerChannel)
+	}
+	if nil != c.errorChannel {
+		close(c.errorChannel)
+	}
+	if nil != c.transport {
 		log.Printf("WebRTC: closing DataChannel")
-		dataChannel := c.snowflake
-		// Setting snowflake to nil *before* Close indicates to OnClose that it
+		dataChannel := c.transport
+		// Setting dc to nil *before* Close indicates to OnClose that it
 		// was locally triggered.
-		c.snowflake = nil
+		c.transport = nil
 		dataChannel.Close()
 	}
 	if nil != c.pc {



More information about the tor-commits mailing list