commit 2caa47988dc2e7d78db083d2609b8656e4731079 Author: Serene Han <keroserene+git@gmail.com> Date: Sat Jun 11 23:18:38 2016 -0700
fix Peers.Count() using activePeers list, mark for delete on Close, and remove maxedChan --- client/client_test.go | 77 +++++++++++++++++++++++++++++++-------------------- client/peers.go | 72 ++++++++++++++++++++++++++--------------------- client/snowflake.go | 8 ++++-- client/webrtc.go | 5 ++-- 4 files changed, 95 insertions(+), 67 deletions(-)
diff --git a/client/client_test.go b/client/client_test.go index f58aeb0..de83768 100644 --- a/client/client_test.go +++ b/client/client_test.go @@ -75,7 +75,7 @@ func TestSnowflakeClient(t *testing.T) { snowflakes.Tongue = FakeDialer{}
go ConnectLoop(snowflakes) - <-snowflakes.maxedChan + // <-snowflakes.maxedChan
So(snowflakes.Count(), ShouldEqual, 1) r := <-snowflakes.snowflakeChan @@ -88,7 +88,7 @@ func TestSnowflakeClient(t *testing.T) { snowflakes.Tongue = FakeDialer{}
go ConnectLoop(snowflakes) - <-snowflakes.maxedChan + // <-snowflakes.maxedChan So(snowflakes.Count(), ShouldEqual, 3) <-snowflakes.snowflakeChan <-snowflakes.snowflakeChan @@ -101,13 +101,13 @@ func TestSnowflakeClient(t *testing.T) { snowflakes.Tongue = FakeDialer{}
go ConnectLoop(snowflakes) - <-snowflakes.maxedChan + // <-snowflakes.maxedChan So(snowflakes.Count(), ShouldEqual, 3)
r := <-snowflakes.snowflakeChan So(snowflakes.Count(), ShouldEqual, 2) r.Close() - <-snowflakes.maxedChan + // <-snowflakes.maxedChan So(snowflakes.Count(), ShouldEqual, 3)
<-snowflakes.snowflakeChan @@ -121,7 +121,6 @@ func TestSnowflakeClient(t *testing.T) { Convey("Can construct", func() { p := NewPeers(1) So(p.capacity, ShouldEqual, 1) - So(p.current, ShouldEqual, nil) So(p.snowflakeChan, ShouldNotBeNil) So(cap(p.snowflakeChan), ShouldEqual, 1) }) @@ -136,36 +135,54 @@ func TestSnowflakeClient(t *testing.T) { err = p.Collect() So(err, ShouldBeNil) So(p.Count(), ShouldEqual, 1) - // S + // S err = p.Collect() })
Convey("Collection continues until capacity.", func() { - c := 5 + c := 5 p := NewPeers(c) - p.Tongue = FakeDialer{} - // Fill up to capacity. - for i := 0 ; i < c ; i++ { - fmt.Println("Adding snowflake ", i) - err := p.Collect() - So(err, ShouldBeNil) - So(p.Count(), ShouldEqual, i + 1) - } - // But adding another gives an error. - So(p.Count(), ShouldEqual, c) - err := p.Collect() - So(err, ShouldNotBeNil) - So(p.Count(), ShouldEqual, c) - - // But popping allows it to continue. - s := p.Pop() - So(s, ShouldNotBeNil) - So(p.Count(), ShouldEqual, c) - - // err = p.Collect() - // So(err, ShouldNotBeNil) - // So(p.Count(), ShouldEqual, c) - }) + p.Tongue = FakeDialer{} + // Fill up to capacity. + for i := 0; i < c; i++ { + fmt.Println("Adding snowflake ", i) + err := p.Collect() + So(err, ShouldBeNil) + So(p.Count(), ShouldEqual, i+1) + } + // But adding another gives an error. + So(p.Count(), ShouldEqual, c) + err := p.Collect() + So(err, ShouldNotBeNil) + So(p.Count(), ShouldEqual, c) + + // But popping and closing allows it to continue. + s := p.Pop() + s.Close() + So(s, ShouldNotBeNil) + So(p.Count(), ShouldEqual, c-1) + + err = p.Collect() + So(err, ShouldBeNil) + So(p.Count(), ShouldEqual, c) + }) + + Convey("Count correctly purges peers marked for deletion.", func() { + p := NewPeers(4) + p.Tongue = FakeDialer{} + p.Collect() + p.Collect() + p.Collect() + p.Collect() + So(p.Count(), ShouldEqual, 4) + s := p.Pop() + s.Close() + So(p.Count(), ShouldEqual, 3) + s = p.Pop() + s.Close() + So(p.Count(), ShouldEqual, 2) + }) + })
Convey("Snowflake", t, func() { diff --git a/client/peers.go b/client/peers.go index 769174b..57570bf 100644 --- a/client/peers.go +++ b/client/peers.go @@ -1,6 +1,7 @@ package main
import ( + "container/list" "errors" "fmt" "log" @@ -22,70 +23,77 @@ type Peers struct { BytesLogger
snowflakeChan chan *webRTCConn - current *webRTCConn + activePeers *list.List capacity int - // TODO: Probably not necessary. - maxedChan chan struct{} }
// Construct a fresh container of remote peers. func NewPeers(max int) *Peers { - p := &Peers{capacity: max, current: nil} + p := &Peers{capacity: max} // Use buffered go channel to pass new snowflakes onwards to the SOCKS handler. p.snowflakeChan = make(chan *webRTCConn, max) - p.maxedChan = make(chan struct{}, 1) + p.activePeers = list.New() return p }
-// TODO: Needs fixing. -func (p *Peers) Count() int { - count := 0 - if p.current != nil { - count = 1 - } - return count + len(p.snowflakeChan) -} - // As part of |SnowflakeCollector| interface. func (p *Peers) Collect() error { - if p.Count() >= p.capacity { - s := fmt.Sprintf("At capacity [%d/%d]", p.Count(), p.capacity) - p.maxedChan <- struct{}{} + cnt := p.Count() + if cnt >= p.capacity { + s := fmt.Sprintf("At capacity [%d/%d]", cnt, p.capacity) return errors.New(s) } - // Engage the Snowflake Catching interface, which must be available. + // Engage the Snowflake Catching interface, which must be available. if nil == p.Tongue { return errors.New("Missing Tongue to catch Snowflakes with.") } connection, err := p.Tongue.Catch() - if nil == connection || nil != err { - return err - } - // Use the same rate-limited traffic logger to keep consistency. - connection.BytesLogger = p.BytesLogger + if nil == connection || nil != err { + return err + } + // Track new valid Snowflake in internal collection and pass along. + p.activePeers.PushBack(connection) p.snowflakeChan <- connection return nil }
// As part of |SnowflakeCollector| interface. func (p *Peers) Pop() *webRTCConn { - // Blocks until an available snowflake appears. + // Blocks until an available snowflake appears. snowflake, ok := <-p.snowflakeChan if !ok { return nil } - p.current = snowflake + // Set to use the same rate-limited traffic logger to keep consistency. snowflake.BytesLogger = p.BytesLogger return snowflake }
-// Close all remote peers. -func (p *Peers) End() { - log.Printf("WebRTC: interruped") - if nil != p.current { - p.current.Close() +// Returns total available Snowflakes (including the active one) +// The count only reduces when connections themselves close, rather than when +// they are popped. +func (p *Peers) Count() int { + p.purgeClosedPeers() + return p.activePeers.Len() +} + +func (p *Peers) purgeClosedPeers() { + for e := p.activePeers.Front(); e != nil; { + next := e.Next() + conn := e.Value.(*webRTCConn) + // Purge those marked for deletion. + if conn.closed { + p.activePeers.Remove(e) + } + e = next } - for r := range p.snowflakeChan { - r.Close() +} + +// Close all Peers contained here. +func (p *Peers) End() { + log.Printf("WebRTC: Ending all peer connections.") + for e := p.activePeers.Front(); e != nil; e = e.Next() { + conn := e.Value.(*webRTCConn) + conn.Close() } } diff --git a/client/snowflake.go b/client/snowflake.go index f8edc2a..aa0e470 100644 --- a/client/snowflake.go +++ b/client/snowflake.go @@ -61,6 +61,7 @@ func ConnectLoop(snowflakes SnowflakeCollector) { continue } // Successful collection gets rate limited to once per second. + log.Println("ConnectLoop success.") <-time.After(time.Second) } } @@ -68,10 +69,10 @@ func ConnectLoop(snowflakes SnowflakeCollector) { // Accept local SOCKS connections and pass them to the handler. func acceptLoop(ln *pt.SocksListener, snowflakes SnowflakeCollector) error { defer ln.Close() + log.Println("Started SOCKS listener.") for { - log.Println("SOCKS listening...", ln) conn, err := ln.AcceptSocks() - log.Println("accepting", conn, err) + log.Println("SOCKS accepted ", conn.Req) if err != nil { if e, ok := err.(net.Error); ok && e.Temporary() { continue @@ -138,7 +139,8 @@ func readSignalingMessages(f *os.File) {
func main() { webrtc.SetLoggingVerbosity(1) - logFile, err := os.OpenFile("snowflake.log", os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0600) + logFile, err := os.OpenFile("snowflake.log", + os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0600) if err != nil { log.Fatal(err) } diff --git a/client/webrtc.go b/client/webrtc.go index 2466a1d..87a1d19 100644 --- a/client/webrtc.go +++ b/client/webrtc.go @@ -18,9 +18,10 @@ type WebRTCDialer struct { }
func NewWebRTCDialer(broker *BrokerChannel) *WebRTCDialer { + config := webrtc.NewConfiguration(iceServers...) return &WebRTCDialer{ - broker, - webrtc.NewConfiguration(iceServers...), + BrokerChannel: broker, + webrtcConfig: config, } }