commit a71c98c0aef198e7cf7308dee414e3e17a14059b Author: Serene Han keroserene+git@gmail.com Date: Mon Jun 13 15:11:55 2016 -0700
able to break out of ConnectLoop, try separate webrtcConfigs as well --- client/client_test.go | 11 ++++++----- client/interfaces.go | 3 +++ client/peers.go | 23 +++++++++++++++++++---- client/rendezvous.go | 11 +++++++---- client/snowflake.go | 16 ++++++++++------ client/webrtc.go | 49 +++++++++++++++++++++++-------------------------- 6 files changed, 68 insertions(+), 45 deletions(-)
diff --git a/client/client_test.go b/client/client_test.go index 87fb2cb..41d4870 100644 --- a/client/client_test.go +++ b/client/client_test.go @@ -65,8 +65,9 @@ func (f FakeSocksConn) Grant(addr *net.TCPAddr) error { return nil }
type FakePeers struct{ toRelease *webRTCConn }
-func (f FakePeers) Collect() error { return nil } -func (f FakePeers) Pop() Snowflake { return nil } +func (f FakePeers) Collect() error { return nil } +func (f FakePeers) Pop() Snowflake { return nil } +func (f FakePeers) Melted() <-chan struct{} { return nil }
func TestSnowflakeClient(t *testing.T) {
@@ -144,6 +145,7 @@ func TestSnowflakeClient(t *testing.T) { } So(p.Count(), ShouldEqual, cnt) p.End() + <-p.Melted() So(p.Count(), ShouldEqual, 0) })
@@ -168,7 +170,6 @@ func TestSnowflakeClient(t *testing.T) { Convey("Can construct a WebRTCConn", func() { s := NewWebRTCConnection(nil, nil) So(s, ShouldNotBeNil) - So(s.index, ShouldEqual, 0) So(s.offerChannel, ShouldNotBeNil) So(s.answerChannel, ShouldNotBeNil) s.Close() @@ -176,13 +177,13 @@ func TestSnowflakeClient(t *testing.T) {
Convey("Write buffers when datachannel is nil", func() { c.Write([]byte("test")) - c.snowflake = nil + c.transport = nil So(c.buffer.Bytes(), ShouldResemble, []byte("test")) })
Convey("Write sends to datachannel when not nil", func() { mock := new(MockDataChannel) - c.snowflake = mock + c.transport = mock mock.done = make(chan bool, 1) c.Write([]byte("test")) <-mock.done diff --git a/client/interfaces.go b/client/interfaces.go index 4fb0dcf..502eefb 100644 --- a/client/interfaces.go +++ b/client/interfaces.go @@ -38,6 +38,9 @@ type SnowflakeCollector interface {
// Remove and return the most available Snowflake from the collection. Pop() Snowflake + + // Signal when the collector has stopped collecting. + Melted() <-chan struct{} }
// Interface to adapt to goptlib's SocksConn struct. diff --git a/client/peers.go b/client/peers.go index eb435fd..74b804f 100644 --- a/client/peers.go +++ b/client/peers.go @@ -25,6 +25,8 @@ type Peers struct { snowflakeChan chan Snowflake activePeers *list.List capacity int + + melt chan struct{} }
// Construct a fresh container of remote peers. @@ -33,17 +35,19 @@ func NewPeers(max int) *Peers { // Use buffered go channel to pass snowflakes onwards to the SOCKS handler. p.snowflakeChan = make(chan Snowflake, max) p.activePeers = list.New() + p.melt = make(chan struct{}, 1) return p }
// As part of |SnowflakeCollector| interface. func (p *Peers) Collect() error { - cnt := p.Count() + s := fmt.Sprintf("Currently at [%d/%d]", cnt, p.capacity) if cnt >= p.capacity { s := fmt.Sprintf("At capacity [%d/%d]", cnt, p.capacity) return errors.New(s) } + log.Println("WebRTC: Collecting a new Snowflake.", s) // Engage the Snowflake Catching interface, which must be available. if nil == p.Tongue { return errors.New("Missing Tongue to catch Snowflakes with.") @@ -60,7 +64,6 @@ func (p *Peers) Collect() error {
// As part of |SnowflakeCollector| interface. func (p *Peers) Pop() Snowflake { - // Blocks until an available snowflake appears. snowflake, ok := <-p.snowflakeChan if !ok { @@ -71,6 +74,11 @@ func (p *Peers) Pop() Snowflake { return snowflake }
+// As part of |SnowflakeCollector| interface. +func (p *Peers) Melted() <-chan struct{} { + return p.melt +} + // Returns total available Snowflakes (including the active one) // The count only reduces when connections themselves close, rather than when // they are popped. @@ -93,9 +101,16 @@ func (p *Peers) purgeClosedPeers() {
// Close all Peers contained here. func (p *Peers) End() { - log.Printf("WebRTC: Ending all peer connections.") - for e := p.activePeers.Front(); e != nil; e = e.Next() { + close(p.snowflakeChan) + p.melt <- struct{}{} + cnt := p.Count() + for e := p.activePeers.Front(); e != nil; { + log.Println(e, e.Value) + next := e.Next() conn := e.Value.(*webRTCConn) conn.Close() + p.activePeers.Remove(e) + e = next } + log.Println("WebRTC: melted all", cnt, "snowflakes.") } diff --git a/client/rendezvous.go b/client/rendezvous.go index 6c7aa44..2bcce17 100644 --- a/client/rendezvous.go +++ b/client/rendezvous.go @@ -115,15 +115,17 @@ func (bc *BrokerChannel) Negotiate(offer *webrtc.SessionDescription) ( // Implements the |Tongue| interface to catch snowflakes, using BrokerChannel. type WebRTCDialer struct { *BrokerChannel - webrtcConfig *webrtc.Configuration + // webrtcConfig *webrtc.Configuration + iceServers IceServerList }
func NewWebRTCDialer( broker *BrokerChannel, iceServers IceServerList) *WebRTCDialer { - config := webrtc.NewConfiguration(iceServers...) + return &WebRTCDialer{ BrokerChannel: broker, - webrtcConfig: config, + iceServers: iceServers, + // webrtcConfig: config, } }
@@ -134,7 +136,8 @@ func (w WebRTCDialer) Catch() (Snowflake, error) { } // TODO: [#3] Fetch ICE server information from Broker. // TODO: [#18] Consider TURN servers here too. - connection := NewWebRTCConnection(w.webrtcConfig, w.BrokerChannel) + config := webrtc.NewConfiguration(w.iceServers...) + connection := NewWebRTCConnection(config, w.BrokerChannel) err := connection.Connect() return connection, err } diff --git a/client/snowflake.go b/client/snowflake.go index 9f5cdc7..d122494 100644 --- a/client/snowflake.go +++ b/client/snowflake.go @@ -30,17 +30,19 @@ var handlerChan = make(chan int) // transfer to the Tor SOCKS handler when needed. func ConnectLoop(snowflakes SnowflakeCollector) { for { + // Check if ending is necessary. err := snowflakes.Collect() if nil != err { log.Println("WebRTC:", err, " Retrying in", ReconnectTimeout, "seconds...") - // Failed collections get a timeout. - <-time.After(time.Second * ReconnectTimeout) + } + select { + case <-time.After(time.Second * ReconnectTimeout): continue + case <-snowflakes.Melted(): + log.Println("ConnectLoop: stopped.") + return } - // Successful collection gets rate limited to once per second. - log.Println("WebRTC: Connected to new Snowflake.") - <-time.After(time.Second) } }
@@ -50,7 +52,7 @@ func socksAcceptLoop(ln *pt.SocksListener, snowflakes SnowflakeCollector) error log.Println("Started SOCKS listener.") for { conn, err := ln.AcceptSocks() - log.Println("SOCKS accepted ", conn.Req) + log.Println("SOCKS accepted: ", conn.Req) if err != nil { if e, ok := err.(net.Error); ok && e.Temporary() { continue @@ -72,6 +74,7 @@ func handler(socks SocksConnector, snowflakes SnowflakeCollector) error { handlerChan <- -1 }() // Obtain an available WebRTC remote. May block. + log.Println("handler: awaiting Snowflake...") snowflake := snowflakes.Pop() if nil == snowflake { socks.Reject() @@ -85,6 +88,7 @@ func handler(socks SocksConnector, snowflakes SnowflakeCollector) error { }
// Begin exchanging data. + // BUG(serene): There's a leak here when multiplexed. go copyLoop(socks, snowflake)
// When WebRTC resets, close the SOCKS connection, which induces new handler. diff --git a/client/webrtc.go b/client/webrtc.go index c7a46ea..6cd5da6 100644 --- a/client/webrtc.go +++ b/client/webrtc.go @@ -15,19 +15,17 @@ import ( type webRTCConn struct { config *webrtc.Configuration pc *webrtc.PeerConnection - snowflake SnowflakeDataChannel // Holds the WebRTC DataChannel. + transport SnowflakeDataChannel // Holds the WebRTC DataChannel. broker *BrokerChannel
offerChannel chan *webrtc.SessionDescription answerChannel chan *webrtc.SessionDescription errorChannel chan error - endChannel chan struct{} recvPipe *io.PipeReader writePipe *io.PipeWriter buffer bytes.Buffer reset chan struct{}
- index int closed bool
BytesLogger @@ -43,11 +41,11 @@ func (c *webRTCConn) Read(b []byte) (int, error) { // As part of |io.ReadWriter| func (c *webRTCConn) Write(b []byte) (int, error) { c.BytesLogger.AddOutbound(len(b)) - if nil == c.snowflake { + if nil == c.transport { log.Printf("Buffered %d bytes --> WebRTC", len(b)) c.buffer.Write(b) } else { - c.snowflake.Send(b) + c.transport.Send(b) } return len(b), nil } @@ -57,15 +55,6 @@ func (c *webRTCConn) Close() error { var err error = nil log.Printf("WebRTC: Closing") c.cleanup() - if nil != c.offerChannel { - close(c.offerChannel) - } - if nil != c.answerChannel { - close(c.answerChannel) - } - if nil != c.errorChannel { - close(c.errorChannel) - } // Mark for deletion. c.closed = true return err @@ -106,7 +95,6 @@ func NewWebRTCConnection(config *webrtc.Configuration,
// As part of |Connector| interface. func (c *webRTCConn) Connect() error { - log.Printf("Establishing WebRTC connection #%d...", c.index) // TODO: When go-webrtc is more stable, it's possible that a new // PeerConnection won't need to be re-prepared each time. err := c.preparePeerConnection() @@ -174,7 +162,7 @@ func (c *webRTCConn) preparePeerConnection() error {
// Create a WebRTC DataChannel locally. func (c *webRTCConn) establishDataChannel() error { - if c.snowflake != nil { + if c.transport != nil { panic("Unexpected datachannel already exists!") } dc, err := c.pc.CreateDataChannel("snowflake", webrtc.Init{}) @@ -187,9 +175,8 @@ func (c *webRTCConn) establishDataChannel() error { } dc.OnOpen = func() { log.Println("WebRTC: DataChannel.OnOpen") - if nil != c.snowflake { - log.Println("PeerConnection snowflake already exists.") - panic("PeerConnection snowflake already exists.") + if nil != c.transport { + panic("WebRTC: transport already exists.") } // Flush buffered outgoing SOCKS data if necessary. if c.buffer.Len() > 0 { @@ -198,11 +185,11 @@ func (c *webRTCConn) establishDataChannel() error { c.buffer.Reset() } // Then enable the datachannel. - c.snowflake = dc + c.transport = dc } dc.OnClose = func() { // Future writes will go to the buffer until a new DataChannel is available. - if nil == c.snowflake { + if nil == c.transport { // Closed locally, as part of a reset. log.Println("WebRTC: DataChannel.OnClose [locally]") return @@ -210,7 +197,7 @@ func (c *webRTCConn) establishDataChannel() error { // Closed remotely, need to reset everything. // Disable the DataChannel as a write destination. log.Println("WebRTC: DataChannel.OnClose [remotely]") - c.snowflake = nil + c.transport = nil c.Reset() } dc.OnMessage = func(msg []byte) { @@ -284,13 +271,23 @@ func (c *webRTCConn) exchangeSDP() error { return nil }
+// Close all channels and transports func (c *webRTCConn) cleanup() { - if nil != c.snowflake { + if nil != c.offerChannel { + close(c.offerChannel) + } + if nil != c.answerChannel { + close(c.answerChannel) + } + if nil != c.errorChannel { + close(c.errorChannel) + } + if nil != c.transport { log.Printf("WebRTC: closing DataChannel") - dataChannel := c.snowflake - // Setting snowflake to nil *before* Close indicates to OnClose that it + dataChannel := c.transport + // Setting dc to nil *before* Close indicates to OnClose that it // was locally triggered. - c.snowflake = nil + c.transport = nil dataChannel.Close() } if nil != c.pc {
tor-commits@lists.torproject.org