commit 2bf0e5457e88f54344f895f4754cf98c05c1d7b0
Author: Serene Han <keroserene+git(a)gmail.com>
Date: Tue Jun 14 17:07:21 2016 -0700
pull copyLoop out of goroutine, better pop and reset
---
client/client_test.go | 38 +++++++++++++++++++++++++++++---------
client/interfaces.go | 7 +++----
client/peers.go | 30 +++++++++++++++++++-----------
client/snowflake.go | 21 ++++++++++++---------
client/webrtc.go | 9 +++++++--
5 files changed, 70 insertions(+), 35 deletions(-)
diff --git a/client/client_test.go b/client/client_test.go
index 41d4870..b5236a0 100644
--- a/client/client_test.go
+++ b/client/client_test.go
@@ -65,9 +65,9 @@ func (f FakeSocksConn) Grant(addr *net.TCPAddr) error { return nil }
type FakePeers struct{ toRelease *webRTCConn }
-func (f FakePeers) Collect() error { return nil }
-func (f FakePeers) Pop() Snowflake { return nil }
-func (f FakePeers) Melted() <-chan struct{} { return nil }
+func (f FakePeers) Collect() (Snowflake, error) { return &webRTCConn{}, nil }
+func (f FakePeers) Pop() Snowflake { return nil }
+func (f FakePeers) Melted() <-chan struct{} { return nil }
func TestSnowflakeClient(t *testing.T) {
@@ -81,16 +81,16 @@ func TestSnowflakeClient(t *testing.T) {
Convey("Collecting a Snowflake requires a Tongue.", func() {
p := NewPeers(1)
- err := p.Collect()
+ _, err := p.Collect()
So(err, ShouldNotBeNil)
So(p.Count(), ShouldEqual, 0)
// Set the dialer so that collection is possible.
p.Tongue = FakeDialer{}
- err = p.Collect()
+ _, err = p.Collect()
So(err, ShouldBeNil)
So(p.Count(), ShouldEqual, 1)
// S
- err = p.Collect()
+ _, err = p.Collect()
})
Convey("Collection continues until capacity.", func() {
@@ -100,13 +100,13 @@ func TestSnowflakeClient(t *testing.T) {
// Fill up to capacity.
for i := 0; i < c; i++ {
fmt.Println("Adding snowflake ", i)
- err := p.Collect()
+ _, err := p.Collect()
So(err, ShouldBeNil)
So(p.Count(), ShouldEqual, i+1)
}
// But adding another gives an error.
So(p.Count(), ShouldEqual, c)
- err := p.Collect()
+ _, err := p.Collect()
So(err, ShouldNotBeNil)
So(p.Count(), ShouldEqual, c)
@@ -116,7 +116,7 @@ func TestSnowflakeClient(t *testing.T) {
So(s, ShouldNotBeNil)
So(p.Count(), ShouldEqual, c-1)
- err = p.Collect()
+ _, err = p.Collect()
So(err, ShouldBeNil)
So(p.Count(), ShouldEqual, c)
})
@@ -149,6 +149,26 @@ func TestSnowflakeClient(t *testing.T) {
So(p.Count(), ShouldEqual, 0)
})
+ Convey("Pop skips over closed peers.", func() {
+ p := NewPeers(4)
+ p.Tongue = FakeDialer{}
+ wc1, _ := p.Collect()
+ wc2, _ := p.Collect()
+ wc3, _ := p.Collect()
+ So(wc1, ShouldNotBeNil)
+ So(wc2, ShouldNotBeNil)
+ So(wc3, ShouldNotBeNil)
+ wc1.Close()
+ r := p.Pop()
+ So(p.Count(), ShouldEqual, 2)
+ So(r, ShouldEqual, wc2)
+ wc4, _ := p.Collect()
+ wc2.Close()
+ wc3.Close()
+ r = p.Pop()
+ So(r, ShouldEqual, wc4)
+ })
+
})
Convey("Snowflake", t, func() {
diff --git a/client/interfaces.go b/client/interfaces.go
index 502eefb..f18987a 100644
--- a/client/interfaces.go
+++ b/client/interfaces.go
@@ -17,10 +17,9 @@ type Resetter interface {
// Interface for a single remote WebRTC peer.
// In the Client context, "Snowflake" refers to the remote browser proxy.
type Snowflake interface {
- io.ReadWriter
+ io.ReadWriteCloser
Resetter
Connector
- Close() error
}
// Interface for catching Snowflakes. (aka the remote dialer)
@@ -34,7 +33,7 @@ type SnowflakeCollector interface {
// Add a Snowflake to the collection.
// Implementation should decide how to connect and maintain the webRTCConn.
- Collect() error
+ Collect() (Snowflake, error)
// Remove and return the most available Snowflake from the collection.
Pop() Snowflake
@@ -52,6 +51,6 @@ type SocksConnector interface {
// Interface for the Snowflake's transport. (Typically just webrtc.DataChannel)
type SnowflakeDataChannel interface {
+ io.Closer
Send([]byte)
- Close() error
}
diff --git a/client/peers.go b/client/peers.go
index 74b804f..098dd81 100644
--- a/client/peers.go
+++ b/client/peers.go
@@ -40,34 +40,43 @@ func NewPeers(max int) *Peers {
}
// As part of |SnowflakeCollector| interface.
-func (p *Peers) Collect() error {
+func (p *Peers) Collect() (Snowflake, error) {
cnt := p.Count()
s := fmt.Sprintf("Currently at [%d/%d]", cnt, p.capacity)
if cnt >= p.capacity {
s := fmt.Sprintf("At capacity [%d/%d]", cnt, p.capacity)
- return errors.New(s)
+ return nil, errors.New(s)
}
log.Println("WebRTC: Collecting a new Snowflake.", s)
// Engage the Snowflake Catching interface, which must be available.
if nil == p.Tongue {
- return errors.New("Missing Tongue to catch Snowflakes with.")
+ return nil, errors.New("Missing Tongue to catch Snowflakes with.")
}
+ // BUG: some broker conflict here.
connection, err := p.Tongue.Catch()
- if nil == connection || nil != err {
- return err
+ if nil != err {
+ return nil, err
}
// Track new valid Snowflake in internal collection and pass along.
p.activePeers.PushBack(connection)
p.snowflakeChan <- connection
- return nil
+ return connection, nil
}
// As part of |SnowflakeCollector| interface.
func (p *Peers) Pop() Snowflake {
- // Blocks until an available snowflake appears.
- snowflake, ok := <-p.snowflakeChan
- if !ok {
- return nil
+ // Blocks until an available, valid snowflake appears.
+ var snowflake Snowflake
+ var ok bool
+ for nil == snowflake {
+ snowflake, ok = <-p.snowflakeChan
+ conn := snowflake.(*webRTCConn)
+ if !ok {
+ return nil
+ }
+ if conn.closed {
+ snowflake = nil
+ }
}
// Set to use the same rate-limited traffic logger to keep consistency.
snowflake.(*webRTCConn).BytesLogger = p.BytesLogger
@@ -105,7 +114,6 @@ func (p *Peers) End() {
p.melt <- struct{}{}
cnt := p.Count()
for e := p.activePeers.Front(); e != nil; {
- log.Println(e, e.Value)
next := e.Next()
conn := e.Value.(*webRTCConn)
conn.Close()
diff --git a/client/snowflake.go b/client/snowflake.go
index d122494..f172611 100644
--- a/client/snowflake.go
+++ b/client/snowflake.go
@@ -31,7 +31,7 @@ var handlerChan = make(chan int)
func ConnectLoop(snowflakes SnowflakeCollector) {
for {
// Check if ending is necessary.
- err := snowflakes.Collect()
+ _, err := snowflakes.Collect()
if nil != err {
log.Println("WebRTC:", err,
" Retrying in", ReconnectTimeout, "seconds...")
@@ -51,6 +51,7 @@ func socksAcceptLoop(ln *pt.SocksListener, snowflakes SnowflakeCollector) error
defer ln.Close()
log.Println("Started SOCKS listener.")
for {
+ log.Println("SOCKS listening...")
conn, err := ln.AcceptSocks()
log.Println("SOCKS accepted: ", conn.Req)
if err != nil {
@@ -81,20 +82,22 @@ func handler(socks SocksConnector, snowflakes SnowflakeCollector) error {
return errors.New("handler: Received invalid Snowflake")
}
defer socks.Close()
- log.Println("---- Snowflake assigned ----")
+ log.Println("---- Handler: snowflake assigned ----")
err := socks.Grant(&net.TCPAddr{IP: net.IPv4zero, Port: 0})
if err != nil {
return err
}
- // Begin exchanging data.
- // BUG(serene): There's a leak here when multiplexed.
- go copyLoop(socks, snowflake)
+ go func() {
+ // When WebRTC resets, close the SOCKS connection, which ends
+ // the copyLoop below and induces new handler.
+ snowflake.WaitForReset()
+ socks.Close()
+ }()
- // When WebRTC resets, close the SOCKS connection, which induces new handler.
- // TODO: Double check this / fix it.
- snowflake.WaitForReset()
- log.Println("---- Closed ---")
+ // Begin exchanging data.
+ copyLoop(socks, snowflake)
+ log.Println("---- Handler: closed ---")
return nil
}
diff --git a/client/webrtc.go b/client/webrtc.go
index 6cd5da6..4c7a3c8 100644
--- a/client/webrtc.go
+++ b/client/webrtc.go
@@ -31,7 +31,7 @@ type webRTCConn struct {
BytesLogger
}
-// Read bytes from remote WebRTC.
+// Read bytes from local SOCKS.
// As part of |io.ReadWriter|
func (c *webRTCConn) Read(b []byte) (int, error) {
return c.recvPipe.Read(b)
@@ -62,11 +62,11 @@ func (c *webRTCConn) Close() error {
// As part of |Resetter|
func (c *webRTCConn) Reset() {
+ c.Close()
go func() {
c.reset <- struct{}{}
log.Println("WebRTC resetting...")
}()
- c.Close()
}
// As part of |Resetter|
@@ -282,6 +282,11 @@ func (c *webRTCConn) cleanup() {
if nil != c.errorChannel {
close(c.errorChannel)
}
+ // Close this side of the SOCKS pipe.
+ if nil != c.writePipe {
+ c.writePipe.Close()
+ c.writePipe = nil
+ }
if nil != c.transport {
log.Printf("WebRTC: closing DataChannel")
dataChannel := c.transport