[tor-commits] [snowflake/master] fix Peers.Count() using activePeers list, mark for delete on Close, and remove maxedChan

serene at torproject.org serene at torproject.org
Sun Jun 12 19:44:05 UTC 2016


commit 2caa47988dc2e7d78db083d2609b8656e4731079
Author: Serene Han <keroserene+git at gmail.com>
Date:   Sat Jun 11 23:18:38 2016 -0700

    fix Peers.Count() using activePeers list, mark for delete on Close, and remove
    maxedChan
---
 client/client_test.go | 77 +++++++++++++++++++++++++++++++--------------------
 client/peers.go       | 72 ++++++++++++++++++++++++++---------------------
 client/snowflake.go   |  8 ++++--
 client/webrtc.go      |  5 ++--
 4 files changed, 95 insertions(+), 67 deletions(-)

diff --git a/client/client_test.go b/client/client_test.go
index f58aeb0..de83768 100644
--- a/client/client_test.go
+++ b/client/client_test.go
@@ -75,7 +75,7 @@ func TestSnowflakeClient(t *testing.T) {
 			snowflakes.Tongue = FakeDialer{}
 
 			go ConnectLoop(snowflakes)
-			<-snowflakes.maxedChan
+			// <-snowflakes.maxedChan
 
 			So(snowflakes.Count(), ShouldEqual, 1)
 			r := <-snowflakes.snowflakeChan
@@ -88,7 +88,7 @@ func TestSnowflakeClient(t *testing.T) {
 			snowflakes.Tongue = FakeDialer{}
 
 			go ConnectLoop(snowflakes)
-			<-snowflakes.maxedChan
+			// <-snowflakes.maxedChan
 			So(snowflakes.Count(), ShouldEqual, 3)
 			<-snowflakes.snowflakeChan
 			<-snowflakes.snowflakeChan
@@ -101,13 +101,13 @@ func TestSnowflakeClient(t *testing.T) {
 			snowflakes.Tongue = FakeDialer{}
 
 			go ConnectLoop(snowflakes)
-			<-snowflakes.maxedChan
+			// <-snowflakes.maxedChan
 			So(snowflakes.Count(), ShouldEqual, 3)
 
 			r := <-snowflakes.snowflakeChan
 			So(snowflakes.Count(), ShouldEqual, 2)
 			r.Close()
-			<-snowflakes.maxedChan
+			// <-snowflakes.maxedChan
 			So(snowflakes.Count(), ShouldEqual, 3)
 
 			<-snowflakes.snowflakeChan
@@ -121,7 +121,6 @@ func TestSnowflakeClient(t *testing.T) {
 		Convey("Can construct", func() {
 			p := NewPeers(1)
 			So(p.capacity, ShouldEqual, 1)
-			So(p.current, ShouldEqual, nil)
 			So(p.snowflakeChan, ShouldNotBeNil)
 			So(cap(p.snowflakeChan), ShouldEqual, 1)
 		})
@@ -136,36 +135,54 @@ func TestSnowflakeClient(t *testing.T) {
 			err = p.Collect()
 			So(err, ShouldBeNil)
 			So(p.Count(), ShouldEqual, 1)
-      // S
+			// S
 			err = p.Collect()
 		})
 
 		Convey("Collection continues until capacity.", func() {
-      c := 5
+			c := 5
 			p := NewPeers(c)
-      p.Tongue = FakeDialer{}
-      // Fill up to capacity.
-      for i := 0 ; i < c ; i++ {
-	      fmt.Println("Adding snowflake ", i)
-			  err := p.Collect()
-			  So(err, ShouldBeNil)
-    		So(p.Count(), ShouldEqual, i + 1)
-      }
-      // But adding another gives an error.
-  		So(p.Count(), ShouldEqual, c)
-  		err := p.Collect()
-  		So(err, ShouldNotBeNil)
-  		So(p.Count(), ShouldEqual, c)
-
-      // But popping allows it to continue.
-      s := p.Pop()
-      So(s, ShouldNotBeNil)
-  		So(p.Count(), ShouldEqual, c)
-
-  		// err = p.Collect()
-  		// So(err, ShouldNotBeNil)
-  		// So(p.Count(), ShouldEqual, c)
-    })
+			p.Tongue = FakeDialer{}
+			// Fill up to capacity.
+			for i := 0; i < c; i++ {
+				fmt.Println("Adding snowflake ", i)
+				err := p.Collect()
+				So(err, ShouldBeNil)
+				So(p.Count(), ShouldEqual, i+1)
+			}
+			// But adding another gives an error.
+			So(p.Count(), ShouldEqual, c)
+			err := p.Collect()
+			So(err, ShouldNotBeNil)
+			So(p.Count(), ShouldEqual, c)
+
+			// But popping and closing allows it to continue.
+			s := p.Pop()
+			s.Close()
+			So(s, ShouldNotBeNil)
+			So(p.Count(), ShouldEqual, c-1)
+
+			err = p.Collect()
+			So(err, ShouldBeNil)
+			So(p.Count(), ShouldEqual, c)
+		})
+
+		Convey("Count correctly purges peers marked for deletion.", func() {
+			p := NewPeers(4)
+			p.Tongue = FakeDialer{}
+			p.Collect()
+			p.Collect()
+			p.Collect()
+			p.Collect()
+			So(p.Count(), ShouldEqual, 4)
+			s := p.Pop()
+			s.Close()
+			So(p.Count(), ShouldEqual, 3)
+			s = p.Pop()
+			s.Close()
+			So(p.Count(), ShouldEqual, 2)
+		})
+
 	})
 
 	Convey("Snowflake", t, func() {
diff --git a/client/peers.go b/client/peers.go
index 769174b..57570bf 100644
--- a/client/peers.go
+++ b/client/peers.go
@@ -1,6 +1,7 @@
 package main
 
 import (
+	"container/list"
 	"errors"
 	"fmt"
 	"log"
@@ -22,70 +23,77 @@ type Peers struct {
 	BytesLogger
 
 	snowflakeChan chan *webRTCConn
-	current       *webRTCConn
+	activePeers   *list.List
 	capacity      int
-	// TODO: Probably not necessary.
-	maxedChan chan struct{}
 }
 
 // Construct a fresh container of remote peers.
 func NewPeers(max int) *Peers {
-	p := &Peers{capacity: max, current: nil}
+	p := &Peers{capacity: max}
 	// Use buffered go channel to pass new snowflakes onwards to the SOCKS handler.
 	p.snowflakeChan = make(chan *webRTCConn, max)
-	p.maxedChan = make(chan struct{}, 1)
+	p.activePeers = list.New()
 	return p
 }
 
-// TODO: Needs fixing.
-func (p *Peers) Count() int {
-	count := 0
-	if p.current != nil {
-		count = 1
-	}
-	return count + len(p.snowflakeChan)
-}
-
 // As part of |SnowflakeCollector| interface.
 func (p *Peers) Collect() error {
-	if p.Count() >= p.capacity {
-		s := fmt.Sprintf("At capacity [%d/%d]", p.Count(), p.capacity)
-		p.maxedChan <- struct{}{}
+	cnt := p.Count()
+	if cnt >= p.capacity {
+		s := fmt.Sprintf("At capacity [%d/%d]", cnt, p.capacity)
 		return errors.New(s)
 	}
-  // Engage the Snowflake Catching interface, which must be available.
+	// Engage the Snowflake Catching interface, which must be available.
 	if nil == p.Tongue {
 		return errors.New("Missing Tongue to catch Snowflakes with.")
 	}
 	connection, err := p.Tongue.Catch()
-  if nil == connection || nil != err {
-    return err
-  }
-  // Use the same rate-limited traffic logger to keep consistency.
-	connection.BytesLogger = p.BytesLogger
+	if nil == connection || nil != err {
+		return err
+	}
+	// Track new valid Snowflake in internal collection and pass along.
+	p.activePeers.PushBack(connection)
 	p.snowflakeChan <- connection
 	return nil
 }
 
 // As part of |SnowflakeCollector| interface.
 func (p *Peers) Pop() *webRTCConn {
-  // Blocks until an available snowflake appears.
+	// Blocks until an available snowflake appears.
 	snowflake, ok := <-p.snowflakeChan
 	if !ok {
 		return nil
 	}
-	p.current = snowflake
+	// Set to use the same rate-limited traffic logger to keep consistency.
 	snowflake.BytesLogger = p.BytesLogger
 	return snowflake
 }
 
-// Close all remote peers.
-func (p *Peers) End() {
-	log.Printf("WebRTC: interruped")
-	if nil != p.current {
-		p.current.Close()
+// Returns total available Snowflakes (including the active one)
+// The count only reduces when connections themselves close, rather than when
+// they are popped.
+func (p *Peers) Count() int {
+	p.purgeClosedPeers()
+	return p.activePeers.Len()
+}
+
+func (p *Peers) purgeClosedPeers() {
+	for e := p.activePeers.Front(); e != nil; {
+		next := e.Next()
+		conn := e.Value.(*webRTCConn)
+		// Purge those marked for deletion.
+		if conn.closed {
+			p.activePeers.Remove(e)
+		}
+		e = next
 	}
-	for r := range p.snowflakeChan {
-		r.Close()
+}
+
+// Close all Peers contained here.
+func (p *Peers) End() {
+	log.Printf("WebRTC: Ending all peer connections.")
+	for e := p.activePeers.Front(); e != nil; e = e.Next() {
+		conn := e.Value.(*webRTCConn)
+		conn.Close()
 	}
 }
diff --git a/client/snowflake.go b/client/snowflake.go
index f8edc2a..aa0e470 100644
--- a/client/snowflake.go
+++ b/client/snowflake.go
@@ -61,6 +61,7 @@ func ConnectLoop(snowflakes SnowflakeCollector) {
 			continue
 		}
 		// Successful collection gets rate limited to once per second.
+		log.Println("ConnectLoop success.")
 		<-time.After(time.Second)
 	}
 }
@@ -68,10 +69,10 @@ func ConnectLoop(snowflakes SnowflakeCollector) {
 // Accept local SOCKS connections and pass them to the handler.
 func acceptLoop(ln *pt.SocksListener, snowflakes SnowflakeCollector) error {
 	defer ln.Close()
+	log.Println("Started SOCKS listener.")
 	for {
-		log.Println("SOCKS listening...", ln)
 		conn, err := ln.AcceptSocks()
-		log.Println("accepting", conn, err)
+		log.Println("SOCKS accepted ", conn.Req)
 		if err != nil {
 			if e, ok := err.(net.Error); ok && e.Temporary() {
 				continue
@@ -138,7 +139,8 @@ func readSignalingMessages(f *os.File) {
 
 func main() {
 	webrtc.SetLoggingVerbosity(1)
-	logFile, err := os.OpenFile("snowflake.log", os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0600)
+	logFile, err := os.OpenFile("snowflake.log",
+		os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0600)
 	if err != nil {
 		log.Fatal(err)
 	}
diff --git a/client/webrtc.go b/client/webrtc.go
index 2466a1d..87a1d19 100644
--- a/client/webrtc.go
+++ b/client/webrtc.go
@@ -18,9 +18,10 @@ type WebRTCDialer struct {
 }
 
 func NewWebRTCDialer(broker *BrokerChannel) *WebRTCDialer {
+	config := webrtc.NewConfiguration(iceServers...)
 	return &WebRTCDialer{
-		broker,
-		webrtc.NewConfiguration(iceServers...),
+		BrokerChannel: broker,
+		webrtcConfig:  config,
 	}
 }
 





More information about the tor-commits mailing list