commit 2caa47988dc2e7d78db083d2609b8656e4731079
Author: Serene Han <keroserene+git(a)gmail.com>
Date: Sat Jun 11 23:18:38 2016 -0700
Fix Peers.Count() by tracking connections in an activePeers list, mark peers
for deletion on Close, and remove maxedChan
---
client/client_test.go | 77 +++++++++++++++++++++++++++++++--------------------
client/peers.go | 72 ++++++++++++++++++++++++++---------------------
client/snowflake.go | 8 ++++--
client/webrtc.go | 5 ++--
4 files changed, 95 insertions(+), 67 deletions(-)
diff --git a/client/client_test.go b/client/client_test.go
index f58aeb0..de83768 100644
--- a/client/client_test.go
+++ b/client/client_test.go
@@ -75,7 +75,7 @@ func TestSnowflakeClient(t *testing.T) {
snowflakes.Tongue = FakeDialer{}
go ConnectLoop(snowflakes)
- <-snowflakes.maxedChan
+ // <-snowflakes.maxedChan
So(snowflakes.Count(), ShouldEqual, 1)
r := <-snowflakes.snowflakeChan
@@ -88,7 +88,7 @@ func TestSnowflakeClient(t *testing.T) {
snowflakes.Tongue = FakeDialer{}
go ConnectLoop(snowflakes)
- <-snowflakes.maxedChan
+ // <-snowflakes.maxedChan
So(snowflakes.Count(), ShouldEqual, 3)
<-snowflakes.snowflakeChan
<-snowflakes.snowflakeChan
@@ -101,13 +101,13 @@ func TestSnowflakeClient(t *testing.T) {
snowflakes.Tongue = FakeDialer{}
go ConnectLoop(snowflakes)
- <-snowflakes.maxedChan
+ // <-snowflakes.maxedChan
So(snowflakes.Count(), ShouldEqual, 3)
r := <-snowflakes.snowflakeChan
So(snowflakes.Count(), ShouldEqual, 2)
r.Close()
- <-snowflakes.maxedChan
+ // <-snowflakes.maxedChan
So(snowflakes.Count(), ShouldEqual, 3)
<-snowflakes.snowflakeChan
@@ -121,7 +121,6 @@ func TestSnowflakeClient(t *testing.T) {
Convey("Can construct", func() {
p := NewPeers(1)
So(p.capacity, ShouldEqual, 1)
- So(p.current, ShouldEqual, nil)
So(p.snowflakeChan, ShouldNotBeNil)
So(cap(p.snowflakeChan), ShouldEqual, 1)
})
@@ -136,36 +135,54 @@ func TestSnowflakeClient(t *testing.T) {
err = p.Collect()
So(err, ShouldBeNil)
So(p.Count(), ShouldEqual, 1)
- // S
+ // S
err = p.Collect()
})
Convey("Collection continues until capacity.", func() {
- c := 5
+ c := 5
p := NewPeers(c)
- p.Tongue = FakeDialer{}
- // Fill up to capacity.
- for i := 0 ; i < c ; i++ {
- fmt.Println("Adding snowflake ", i)
- err := p.Collect()
- So(err, ShouldBeNil)
- So(p.Count(), ShouldEqual, i + 1)
- }
- // But adding another gives an error.
- So(p.Count(), ShouldEqual, c)
- err := p.Collect()
- So(err, ShouldNotBeNil)
- So(p.Count(), ShouldEqual, c)
-
- // But popping allows it to continue.
- s := p.Pop()
- So(s, ShouldNotBeNil)
- So(p.Count(), ShouldEqual, c)
-
- // err = p.Collect()
- // So(err, ShouldNotBeNil)
- // So(p.Count(), ShouldEqual, c)
- })
+ p.Tongue = FakeDialer{}
+ // Fill up to capacity.
+ for i := 0; i < c; i++ {
+ fmt.Println("Adding snowflake ", i)
+ err := p.Collect()
+ So(err, ShouldBeNil)
+ So(p.Count(), ShouldEqual, i+1)
+ }
+ // But adding another gives an error.
+ So(p.Count(), ShouldEqual, c)
+ err := p.Collect()
+ So(err, ShouldNotBeNil)
+ So(p.Count(), ShouldEqual, c)
+
+ // But popping and closing allows it to continue.
+ s := p.Pop()
+ s.Close()
+ So(s, ShouldNotBeNil)
+ So(p.Count(), ShouldEqual, c-1)
+
+ err = p.Collect()
+ So(err, ShouldBeNil)
+ So(p.Count(), ShouldEqual, c)
+ })
+
+ Convey("Count correctly purges peers marked for deletion.", func() {
+ p := NewPeers(4)
+ p.Tongue = FakeDialer{}
+ p.Collect()
+ p.Collect()
+ p.Collect()
+ p.Collect()
+ So(p.Count(), ShouldEqual, 4)
+ s := p.Pop()
+ s.Close()
+ So(p.Count(), ShouldEqual, 3)
+ s = p.Pop()
+ s.Close()
+ So(p.Count(), ShouldEqual, 2)
+ })
+
})
Convey("Snowflake", t, func() {
diff --git a/client/peers.go b/client/peers.go
index 769174b..57570bf 100644
--- a/client/peers.go
+++ b/client/peers.go
@@ -1,6 +1,7 @@
package main
import (
+ "container/list"
"errors"
"fmt"
"log"
@@ -22,70 +23,77 @@ type Peers struct {
BytesLogger
snowflakeChan chan *webRTCConn
- current *webRTCConn
+ activePeers *list.List
capacity int
- // TODO: Probably not necessary.
- maxedChan chan struct{}
}
// Construct a fresh container of remote peers.
func NewPeers(max int) *Peers {
- p := &Peers{capacity: max, current: nil}
+ p := &Peers{capacity: max}
// Use buffered go channel to pass new snowflakes onwards to the SOCKS handler.
p.snowflakeChan = make(chan *webRTCConn, max)
- p.maxedChan = make(chan struct{}, 1)
+ p.activePeers = list.New()
return p
}
-// TODO: Needs fixing.
-func (p *Peers) Count() int {
- count := 0
- if p.current != nil {
- count = 1
- }
- return count + len(p.snowflakeChan)
-}
-
// As part of |SnowflakeCollector| interface.
func (p *Peers) Collect() error {
- if p.Count() >= p.capacity {
- s := fmt.Sprintf("At capacity [%d/%d]", p.Count(), p.capacity)
- p.maxedChan <- struct{}{}
+ cnt := p.Count()
+ if cnt >= p.capacity {
+ s := fmt.Sprintf("At capacity [%d/%d]", cnt, p.capacity)
return errors.New(s)
}
- // Engage the Snowflake Catching interface, which must be available.
+ // Engage the Snowflake Catching interface, which must be available.
if nil == p.Tongue {
return errors.New("Missing Tongue to catch Snowflakes with.")
}
connection, err := p.Tongue.Catch()
- if nil == connection || nil != err {
- return err
- }
- // Use the same rate-limited traffic logger to keep consistency.
- connection.BytesLogger = p.BytesLogger
+ if nil == connection || nil != err {
+ return err
+ }
+ // Track new valid Snowflake in internal collection and pass along.
+ p.activePeers.PushBack(connection)
p.snowflakeChan <- connection
return nil
}
// As part of |SnowflakeCollector| interface.
func (p *Peers) Pop() *webRTCConn {
- // Blocks until an available snowflake appears.
+ // Blocks until an available snowflake appears.
snowflake, ok := <-p.snowflakeChan
if !ok {
return nil
}
- p.current = snowflake
+ // Set to use the same rate-limited traffic logger to keep consistency.
snowflake.BytesLogger = p.BytesLogger
return snowflake
}
-// Close all remote peers.
-func (p *Peers) End() {
- log.Printf("WebRTC: interruped")
- if nil != p.current {
- p.current.Close()
+// Returns total available Snowflakes (including the active one)
+// The count only reduces when connections themselves close, rather than when
+// they are popped.
+func (p *Peers) Count() int {
+ p.purgeClosedPeers()
+ return p.activePeers.Len()
+}
+
+func (p *Peers) purgeClosedPeers() {
+ for e := p.activePeers.Front(); e != nil; {
+ next := e.Next()
+ conn := e.Value.(*webRTCConn)
+ // Purge those marked for deletion.
+ if conn.closed {
+ p.activePeers.Remove(e)
+ }
+ e = next
}
- for r := range p.snowflakeChan {
- r.Close()
+}
+
+// Close all Peers contained here.
+func (p *Peers) End() {
+ log.Printf("WebRTC: Ending all peer connections.")
+ for e := p.activePeers.Front(); e != nil; e = e.Next() {
+ conn := e.Value.(*webRTCConn)
+ conn.Close()
}
}
diff --git a/client/snowflake.go b/client/snowflake.go
index f8edc2a..aa0e470 100644
--- a/client/snowflake.go
+++ b/client/snowflake.go
@@ -61,6 +61,7 @@ func ConnectLoop(snowflakes SnowflakeCollector) {
continue
}
// Successful collection gets rate limited to once per second.
+ log.Println("ConnectLoop success.")
<-time.After(time.Second)
}
}
@@ -68,10 +69,10 @@ func ConnectLoop(snowflakes SnowflakeCollector) {
// Accept local SOCKS connections and pass them to the handler.
func acceptLoop(ln *pt.SocksListener, snowflakes SnowflakeCollector) error {
defer ln.Close()
+ log.Println("Started SOCKS listener.")
for {
- log.Println("SOCKS listening...", ln)
conn, err := ln.AcceptSocks()
- log.Println("accepting", conn, err)
+ log.Println("SOCKS accepted ", conn.Req)
if err != nil {
if e, ok := err.(net.Error); ok && e.Temporary() {
continue
@@ -138,7 +139,8 @@ func readSignalingMessages(f *os.File) {
func main() {
webrtc.SetLoggingVerbosity(1)
- logFile, err := os.OpenFile("snowflake.log", os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0600)
+ logFile, err := os.OpenFile("snowflake.log",
+ os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0600)
if err != nil {
log.Fatal(err)
}
diff --git a/client/webrtc.go b/client/webrtc.go
index 2466a1d..87a1d19 100644
--- a/client/webrtc.go
+++ b/client/webrtc.go
@@ -18,9 +18,10 @@ type WebRTCDialer struct {
}
func NewWebRTCDialer(broker *BrokerChannel) *WebRTCDialer {
+ config := webrtc.NewConfiguration(iceServers...)
return &WebRTCDialer{
- broker,
- webrtc.NewConfiguration(iceServers...),
+ BrokerChannel: broker,
+ webrtcConfig: config,
}
}