commit 6b8568cc6cf7186d86e844a6f1460c9af802575a
Author: Serene Han <keroserene+git@gmail.com>
Date: Thu May 19 18:06:34 2016 -0700
client interfaces compose better, remove some globals, test ConnectLoop
---
 client/client_test.go | 85 +++++++++++++++++++++++++-------
 client/snowflake.go | 133 +++++++++++++++++++++++++++++++++++---------------
 client/webrtc.go | 43 +++++++++-------
 3 files changed, 186 insertions(+), 75 deletions(-)
diff --git a/client/client_test.go b/client/client_test.go index 41dc5e6..93e0422 100644 --- a/client/client_test.go +++ b/client/client_test.go @@ -2,6 +2,7 @@ package main
import ( "bytes" + "fmt" "github.com/keroserene/go-webrtc" . "github.com/smartystreets/goconvey/convey" "io/ioutil" @@ -48,9 +49,64 @@ func (m *MockTransport) RoundTrip(req *http.Request) (*http.Response, error) { return r, nil }
-func TestConnect(t *testing.T) { +type FakeDialer struct{} + +func (w FakeDialer) Catch() (*webRTCConn, error) { + fmt.Println("Caught a dummy snowflake.") + return &webRTCConn{}, nil +} + +func TestSnowflakeClient(t *testing.T) { Convey("Snowflake", t, func() { - webrtcRemotes = make(map[int]*webRTCConn) + + Convey("Peers", func() { + + Convey("WebRTC ConnectLoop continues until capacity of 1.\n", func() { + peers := NewPeers(1) + peers.Tongue = FakeDialer{} + + go ConnectLoop(peers) + <-peers.maxedChan + + So(peers.Count(), ShouldEqual, 1) + r := <-peers.snowflakeChan + So(r, ShouldNotBeNil) + So(peers.Count(), ShouldEqual, 0) + }) + + Convey("WebRTC ConnectLoop continues until capacity of 3.\n", func() { + peers := NewPeers(3) + peers.Tongue = FakeDialer{} + + go ConnectLoop(peers) + <-peers.maxedChan + So(peers.Count(), ShouldEqual, 3) + <-peers.snowflakeChan + <-peers.snowflakeChan + <-peers.snowflakeChan + So(peers.Count(), ShouldEqual, 0) + }) + + Convey("WebRTC ConnectLoop continues filling when Snowflakes disconnect.\n", func() { + peers := NewPeers(3) + peers.Tongue = FakeDialer{} + + go ConnectLoop(peers) + <-peers.maxedChan + So(peers.Count(), ShouldEqual, 3) + + r := <-peers.snowflakeChan + So(peers.Count(), ShouldEqual, 2) + r.Close() + <-peers.maxedChan + So(peers.Count(), ShouldEqual, 3) + + <-peers.snowflakeChan + <-peers.snowflakeChan + <-peers.snowflakeChan + So(peers.Count(), ShouldEqual, 0) + }) + })
Convey("WebRTC Connection", func() { c := new(webRTCConn) @@ -60,17 +116,13 @@ func TestConnect(t *testing.T) { } So(c.buffer.Bytes(), ShouldEqual, nil)
- Convey("Create and remove from WebRTCConn set", func() { - So(len(webrtcRemotes), ShouldEqual, 0) - So(remoteIndex, ShouldEqual, 0) + Convey("Can construct a WebRTCConn", func() { s := NewWebRTCConnection(nil, nil) So(s, ShouldNotBeNil) So(s.index, ShouldEqual, 0) - So(len(webrtcRemotes), ShouldEqual, 1) - So(remoteIndex, ShouldEqual, 1) + So(s.offerChannel, ShouldNotBeNil) + So(s.answerChannel, ShouldNotBeNil) s.Close() - So(len(webrtcRemotes), ShouldEqual, 0) - So(remoteIndex, ShouldEqual, 1) })
Convey("Write buffers when datachannel is nil", func() { @@ -113,9 +165,6 @@ func TestConnect(t *testing.T) { <-c.reset })
- Convey("Connect Loop", func() { - // TODO - }) }) })
@@ -124,14 +173,14 @@ func TestConnect(t *testing.T) { transport := &MockTransport{http.StatusOK} fakeOffer := webrtc.DeserializeSessionDescription("test")
- Convey("BrokerChannel with no front domain", func() { + Convey("Construct BrokerChannel with no front domain", func() { b := NewBrokerChannel("test.broker", "", transport) So(b.url, ShouldNotBeNil) So(b.url.Path, ShouldResemble, "test.broker") So(b.transport, ShouldNotBeNil) })
- Convey("BrokerChannel with front domain", func() { + Convey("Construct BrokerChannel *with* front domain", func() { b := NewBrokerChannel("test.broker", "front", transport) So(b.url, ShouldNotBeNil) So(b.url.Path, ShouldResemble, "test.broker") @@ -139,7 +188,7 @@ func TestConnect(t *testing.T) { So(b.transport, ShouldNotBeNil) })
- Convey("BrokerChannel Negotiate responds with answer", func() { + Convey("BrokerChannel.Negotiate responds with answer", func() { b := NewBrokerChannel("test.broker", "", transport) answer, err := b.Negotiate(fakeOffer) So(err, ShouldBeNil) @@ -147,7 +196,7 @@ func TestConnect(t *testing.T) { So(answer.Sdp, ShouldResemble, "fake") })
- Convey("BrokerChannel Negotiate fails with 503", func() { + Convey("BrokerChannel.Negotiate fails with 503", func() { b := NewBrokerChannel("test.broker", "", &MockTransport{http.StatusServiceUnavailable}) answer, err := b.Negotiate(fakeOffer) @@ -156,7 +205,7 @@ func TestConnect(t *testing.T) { So(err.Error(), ShouldResemble, BrokerError503) })
- Convey("BrokerChannel Negotiate fails with 400", func() { + Convey("BrokerChannel.Negotiate fails with 400", func() { b := NewBrokerChannel("test.broker", "", &MockTransport{http.StatusBadRequest}) answer, err := b.Negotiate(fakeOffer) @@ -165,7 +214,7 @@ func TestConnect(t *testing.T) { So(err.Error(), ShouldResemble, BrokerError400) })
- Convey("BrokerChannel Negotiate fails with unexpected", func() { + Convey("BrokerChannel.Negotiate fails with unexpected error", func() { b := NewBrokerChannel("test.broker", "", &MockTransport{123}) answer, err := b.Negotiate(fakeOffer) diff --git a/client/snowflake.go b/client/snowflake.go index f32ddc8..61864ca 100644 --- a/client/snowflake.go +++ b/client/snowflake.go @@ -5,6 +5,7 @@ import ( "bufio" "errors" "flag" + "fmt" "io" "log" "net" @@ -22,14 +23,12 @@ var ptInfo pt.ClientInfo
const ( ReconnectTimeout = 10 - SnowflakeCapacity = 3 + SnowflakeCapacity = 1 )
var brokerURL string var frontDomain string var iceServers IceServerList -var snowflakeChan = make(chan *webRTCConn, 1) -var broker *BrokerChannel
// When a connection handler starts, +1 is written to this channel; when it // ends, -1 is written. @@ -50,62 +49,110 @@ func copyLoop(a, b net.Conn) { log.Println("copy loop ended") }
-// Interface that matches both webrtc.DataChannel and for testing. +// Interface for catching Snowflakes. +type Tongue interface { + Catch() (*webRTCConn, error) +} + +// Interface for the Snowflake transport. (usually a webrtc.DataChannel) type SnowflakeChannel interface { Send([]byte) Close() error }
+// Collect and track available remote WebRTC Peers, to switch between if the +// current one disconnects. +// Right now, it is only possible to use one remote in a circuit. This can be +// updated once multiplexed transport on a single circuit is available. +type Peers struct { + Tongue + + snowflakeChan chan *webRTCConn + current *webRTCConn + capacity int + maxedChan chan struct{} +} + +func NewPeers(max int) *Peers { + p := &Peers{capacity: max} + p.snowflakeChan = make(chan *webRTCConn, max) + p.maxedChan = make(chan struct{}, 1) + return p +} + +// Find, connect, and add a new peer to the internal collection. +func (p *Peers) FindSnowflake() (*webRTCConn, error) { + if p.Count() >= p.capacity { + s := fmt.Sprintf("At capacity [%d/%d]", p.Count(), p.capacity) + p.maxedChan <- struct{}{} + return nil, errors.New(s) + } + connection, err := p.Catch() + if err != nil { + return nil, err + } + return connection, nil +} + +// TODO: Needs fixing. +func (p *Peers) Count() int { + return len(p.snowflakeChan) +} + +// Close all remote peers. +func (p *Peers) End() { + log.Printf("WebRTC: interruped") + if nil != p.current { + p.current.Close() + } + for r := range p.snowflakeChan { + r.Close() + } +} + // Maintain |SnowflakeCapacity| number of available WebRTC connections, to // transfer to the Tor SOCKS handler when needed. -func SnowflakeConnectLoop() { - transport := CreateBrokerTransport() - broker = NewBrokerChannel(brokerURL, frontDomain, transport) +func ConnectLoop(peers *Peers) { for { - numRemotes := len(webrtcRemotes) - if numRemotes >= SnowflakeCapacity { - log.Println("At Capacity: ", numRemotes, "snowflake. 
Re-checking in 10s") - <-time.After(time.Second * 10) - continue - } - s, err := dialWebRTC() + s, err := peers.FindSnowflake() if nil == s || nil != err { - log.Println("WebRTC Error: ", err, " retrying...") + log.Println("WebRTC Error:", err, + " Retrying in", ReconnectTimeout, "seconds...") <-time.After(time.Second * ReconnectTimeout) continue } - snowflakeChan <- s + peers.snowflakeChan <- s + <-time.After(time.Second) } }
-// Initialize a WebRTC Connection. -func dialWebRTC() (*webRTCConn, error) { +// Implements |Tongue| +type WebRTCDialer struct { + *BrokerChannel +} + +// Initialize a WebRTC Connection by signaling through the broker. +func (w WebRTCDialer) Catch() (*webRTCConn, error) { + if nil == w.BrokerChannel { + return nil, errors.New("Cannot Dial WebRTC without a BrokerChannel.") + } // TODO: [#3] Fetch ICE server information from Broker. // TODO: [#18] Consider TURN servers here too. config := webrtc.NewConfiguration(iceServers...) - if nil == broker { - return nil, errors.New("Failed to prepare BrokerChannel") - } - connection := NewWebRTCConnection(config, broker) + connection := NewWebRTCConnection(config, w.BrokerChannel) err := connection.Connect() return connection, err }
-func endWebRTC() { - log.Printf("WebRTC: interruped") - for _, r := range webrtcRemotes { - r.Close() - } -} - // Establish a WebRTC channel for SOCKS connections. -func handler(conn *pt.SocksConn) error { +func handler(conn *pt.SocksConn, peers *Peers) error { handlerChan <- 1 defer func() { handlerChan <- -1 }() // Wait for an available WebRTC remote... - remote, ok := <-snowflakeChan + remote, ok := <-peers.snowflakeChan + peers.current = remote if remote == nil || !ok { conn.Reject() return errors.New("handler: Received invalid Snowflake") @@ -125,7 +172,7 @@ func handler(conn *pt.SocksConn) error { return nil }
-func acceptLoop(ln *pt.SocksListener) error { +func acceptLoop(ln *pt.SocksListener, peers *Peers) error { defer ln.Close() for { log.Println("SOCKS listening...", ln) @@ -138,7 +185,7 @@ func acceptLoop(ln *pt.SocksListener) error { return err } go func() { - err := handler(conn) + err := handler(conn, peers) if err != nil { log.Printf("handler error: %s", err) } @@ -146,6 +193,7 @@ func acceptLoop(ln *pt.SocksListener) error { } }
+// TODO: Fix since multiplexing changes access to remotes. func readSignalingMessages(f *os.File) { log.Printf("readSignalingMessages") s := bufio.NewScanner(f) @@ -157,10 +205,10 @@ func readSignalingMessages(f *os.File) { log.Printf("ignoring invalid signal message %+q", msg) continue } - webrtcRemotes[0].answerChannel <- sdp + // webrtcRemotes[0].answerChannel <- sdp } log.Printf("close answerChannel") - close(webrtcRemotes[0].answerChannel) + // close(webrtcRemotes[0].answerChannel) if err := s.Err(); err != nil { log.Printf("signal FIFO: %s", err) } @@ -204,8 +252,13 @@ func main() { go readSignalingMessages(signalFile) }
- webrtcRemotes = make(map[int]*webRTCConn) - go SnowflakeConnectLoop() + // Prepare WebRTC Peers and the Broker, then accumulate connections. + // TODO: Expose remote peer capacity as a flag? + remotes := NewPeers(SnowflakeCapacity) + broker := NewBrokerChannel(brokerURL, frontDomain, CreateBrokerTransport()) + + remotes.Tongue = WebRTCDialer{broker} + go ConnectLoop(remotes)
ptInfo, err = pt.ClientSetup(nil) if err != nil { @@ -221,12 +274,13 @@ func main() { for _, methodName := range ptInfo.MethodNames { switch methodName { case "snowflake": + // TODO: Be able to recover when SOCKS dies. ln, err := pt.ListenSocks("tcp", "127.0.0.1:0") if err != nil { pt.CmethodError(methodName, err.Error()) break } - go acceptLoop(ln) + go acceptLoop(ln, remotes) pt.Cmethod(methodName, ln.Version(), ln.Addr()) listeners = append(listeners, ln) default: @@ -234,7 +288,6 @@ func main() { } } pt.CmethodsDone() - defer endWebRTC()
var numHandlers int = 0 var sig os.Signal @@ -254,6 +307,8 @@ func main() { ln.Close() }
+ remotes.End() + // wait for second signal or no more handlers sig = nil for sig == nil && numHandlers != 0 { diff --git a/client/webrtc.go b/client/webrtc.go index 5b30e95..e01cbf7 100644 --- a/client/webrtc.go +++ b/client/webrtc.go @@ -11,26 +11,27 @@ import ( "time" )
-// Implements net.Conn interface +// Remote WebRTC peer. Implements the |net.Conn| interface. type webRTCConn struct { - config *webrtc.Configuration - pc *webrtc.PeerConnection - snowflake SnowflakeChannel // Interface holding the WebRTC DataChannel. - broker *BrokerChannel + config *webrtc.Configuration + pc *webrtc.PeerConnection + snowflake SnowflakeChannel // Holds the WebRTC DataChannel. + broker *BrokerChannel + offerChannel chan *webrtc.SessionDescription answerChannel chan *webrtc.SessionDescription errorChannel chan error + endChannel chan struct{} recvPipe *io.PipeReader writePipe *io.PipeWriter buffer bytes.Buffer reset chan struct{} - index int + + index int + closed bool *BytesInfo }
-var webrtcRemotes map[int]*webRTCConn -var remoteIndex int = 0 - func (c *webRTCConn) Read(b []byte) (int, error) { return c.recvPipe.Read(b) } @@ -51,10 +52,17 @@ func (c *webRTCConn) Close() error { var err error = nil log.Printf("WebRTC: Closing") c.cleanup() - close(c.offerChannel) - close(c.answerChannel) - close(c.errorChannel) - delete(webrtcRemotes, c.index) + if nil != c.offerChannel { + close(c.offerChannel) + } + if nil != c.answerChannel { + close(c.answerChannel) + } + if nil != c.errorChannel { + close(c.errorChannel) + } + // Mark for deletion. + c.closed = true return err }
@@ -78,6 +86,7 @@ func (c *webRTCConn) SetWriteDeadline(t time.Time) error { return fmt.Errorf("SetWriteDeadline not implemented") }
+// Construct a WebRTC PeerConnection. func NewWebRTCConnection(config *webrtc.Configuration, broker *BrokerChannel) *webRTCConn { connection := new(webRTCConn) @@ -90,6 +99,7 @@ func NewWebRTCConnection(config *webrtc.Configuration, connection.errorChannel = make(chan error, 1) connection.reset = make(chan struct{}, 1)
+ // TODO: Separate out. // Log every few seconds. connection.BytesInfo = &BytesInfo{ inboundChan: make(chan int, 5), outboundChan: make(chan int, 5), @@ -99,9 +109,6 @@ func NewWebRTCConnection(config *webrtc.Configuration,
// Pipes remain the same even when DataChannel gets switched. connection.recvPipe, connection.writePipe = io.Pipe() - connection.index = remoteIndex - webrtcRemotes[connection.index] = connection - remoteIndex++ return connection }
@@ -296,12 +303,12 @@ func (c *webRTCConn) Reset() {
func (c *webRTCConn) cleanup() { if nil != c.snowflake { - s := c.snowflake log.Printf("WebRTC: closing DataChannel") + dataChannel := c.snowflake // Setting snowflake to nil *before* Close indicates to OnClose that it // was locally triggered. c.snowflake = nil - s.Close() + dataChannel.Close() } if nil != c.pc { log.Printf("WebRTC: closing PeerConnection")
tor-commits@lists.torproject.org