commit 556596cc5aa14eecb687bbfb44188d1e733b6855 Author: Serene Han keroserene+git@gmail.com Date: Tue May 24 15:18:54 2016 -0700
interfaces.go, SnowflakeCollector, better composition --- client/client_test.go | 124 +++++++++++++++++++++++++------------- client/interfaces.go | 30 ++++++++++ client/snowflake.go | 161 +++++++++++++++++++++++--------------------------- client/webrtc.go | 18 ++++++ 4 files changed, 205 insertions(+), 128 deletions(-)
diff --git a/client/client_test.go b/client/client_test.go index 1ccf206..0bd3844 100644 --- a/client/client_test.go +++ b/client/client_test.go @@ -3,12 +3,15 @@ package main import ( "bytes" "fmt" - "github.com/keroserene/go-webrtc" - . "github.com/smartystreets/goconvey/convey" "io/ioutil" + "net" "net/http" "strings" "testing" + + // "git.torproject.org/pluggable-transports/goptlib.git" + "github.com/keroserene/go-webrtc" + . "github.com/smartystreets/goconvey/convey" )
type MockDataChannel struct { @@ -56,56 +59,93 @@ func (w FakeDialer) Catch() (*webRTCConn, error) { return &webRTCConn{}, nil }
+type FakeSocksConn struct { + net.Conn + rejected bool +} + +func (f FakeSocksConn) Reject() error { + f.rejected = true + return nil +} +func (f FakeSocksConn) Grant(addr *net.TCPAddr) error { + return nil +} + +type FakeSnowflakeJar struct { + toRelease *webRTCConn +} + +func (f FakeSnowflakeJar) Release() *webRTCConn { + return nil +} + +func (f FakeSnowflakeJar) Collect() (*webRTCConn, error) { + return nil, nil +} + func TestSnowflakeClient(t *testing.T) { - Convey("Snowflake", t, func() {
- Convey("Peers", func() { + Convey("WebRTC ConnectLoop", t, func() {
- Convey("WebRTC ConnectLoop continues until capacity of 1.\n", func() { - peers := NewPeers(1) - peers.Tongue = FakeDialer{} + Convey("WebRTC ConnectLoop continues until capacity of 1.\n", func() { + snowflakes := NewSnowflakeJar(1) + snowflakes.Tongue = FakeDialer{}
- go ConnectLoop(peers) - <-peers.maxedChan + go ConnectLoop(snowflakes) + <-snowflakes.maxedChan
- So(peers.Count(), ShouldEqual, 1) - r := <-peers.snowflakeChan - So(r, ShouldNotBeNil) - So(peers.Count(), ShouldEqual, 0) - }) + So(snowflakes.Count(), ShouldEqual, 1) + r := <-snowflakes.snowflakeChan + So(r, ShouldNotBeNil) + So(snowflakes.Count(), ShouldEqual, 0) + })
- Convey("WebRTC ConnectLoop continues until capacity of 3.\n", func() { - peers := NewPeers(3) - peers.Tongue = FakeDialer{} - - go ConnectLoop(peers) - <-peers.maxedChan - So(peers.Count(), ShouldEqual, 3) - <-peers.snowflakeChan - <-peers.snowflakeChan - <-peers.snowflakeChan - So(peers.Count(), ShouldEqual, 0) - }) + Convey("WebRTC ConnectLoop continues until capacity of 3.\n", func() { + snowflakes := NewSnowflakeJar(3) + snowflakes.Tongue = FakeDialer{} + + go ConnectLoop(snowflakes) + <-snowflakes.maxedChan + So(snowflakes.Count(), ShouldEqual, 3) + <-snowflakes.snowflakeChan + <-snowflakes.snowflakeChan + <-snowflakes.snowflakeChan + So(snowflakes.Count(), ShouldEqual, 0) + })
- Convey("WebRTC ConnectLoop continues filling when Snowflakes disconnect.\n", func() { - peers := NewPeers(3) - peers.Tongue = FakeDialer{} + Convey("WebRTC ConnectLoop continues filling when Snowflakes disconnect.\n", func() { + snowflakes := NewSnowflakeJar(3) + snowflakes.Tongue = FakeDialer{}
- go ConnectLoop(peers) - <-peers.maxedChan - So(peers.Count(), ShouldEqual, 3) + go ConnectLoop(snowflakes) + <-snowflakes.maxedChan + So(snowflakes.Count(), ShouldEqual, 3)
- r := <-peers.snowflakeChan - So(peers.Count(), ShouldEqual, 2) - r.Close() - <-peers.maxedChan - So(peers.Count(), ShouldEqual, 3) + r := <-snowflakes.snowflakeChan + So(snowflakes.Count(), ShouldEqual, 2) + r.Close() + <-snowflakes.maxedChan + So(snowflakes.Count(), ShouldEqual, 3) + + <-snowflakes.snowflakeChan + <-snowflakes.snowflakeChan + <-snowflakes.snowflakeChan + So(snowflakes.Count(), ShouldEqual, 0) + }) + }) + + Convey("Snowflake", t, func() { + + SkipConvey("Handler Grants correctly", func() { + socks := &FakeSocksConn{} + snowflakes := &FakeSnowflakeJar{} + + So(socks.rejected, ShouldEqual, false) + snowflakes.toRelease = nil + handler(socks, snowflakes) + So(socks.rejected, ShouldEqual, true)
- <-peers.snowflakeChan - <-peers.snowflakeChan - <-peers.snowflakeChan - So(peers.Count(), ShouldEqual, 0) - }) })
Convey("WebRTC Connection", func() { diff --git a/client/interfaces.go b/client/interfaces.go new file mode 100644 index 0000000..80a2ba3 --- /dev/null +++ b/client/interfaces.go @@ -0,0 +1,30 @@ +package main + +import ( + "net" +) + +// Interface for collecting and releasing snowflakes. +type SnowflakeCollector interface { + Collect() (*webRTCConn, error) + Release() *webRTCConn +} + +// Interface for catching those wild Snowflakes. +type Tongue interface { + Catch() (*webRTCConn, error) +} + +// Interface which primarily adapts to goptlib's SocksConn struct. +type SocksConnector interface { + Grant(*net.TCPAddr) error + Reject() error + net.Conn +} + +// Interface for the Snowflake's transport. +// (Specifically, webrtc.DataChannel) +type SnowflakeChannel interface { + Send([]byte) + Close() error +} diff --git a/client/snowflake.go b/client/snowflake.go index 464420c..cd7f151 100644 --- a/client/snowflake.go +++ b/client/snowflake.go @@ -1,4 +1,5 @@ // Client transport plugin for the Snowflake pluggable transport. +// In the Client context, "Snowflake" refers to a remote browser proxy. package main
import ( @@ -49,22 +50,12 @@ func copyLoop(a, b net.Conn) { log.Println("copy loop ended") }
-// Interface for catching Snowflakes. -type Tongue interface { - Catch() (*webRTCConn, error) -} - -// Interface for the Snowflake transport. (usually a webrtc.DataChannel) -type SnowflakeChannel interface { - Send([]byte) - Close() error -} - -// Collect and track available remote WebRTC Peers, to switch between if the -// current one disconnects. -// Right now, it is only possible to use one remote in a circuit. This can be -// updated once multiplexed transport on a single circuit is available. -type Peers struct { +// Collect and track available snowflakes. (Implements SnowflakeCollector) +// Right now, it is only possible to use one active remote in a circuit. +// This can be updated once multiplexed transport on a single circuit is available. +// Keeping multiple WebRTC connections available allows for quicker recovery when +// the current snowflake disconnects. +type SnowflakeJar struct { Tongue BytesLogger
@@ -74,30 +65,42 @@ type Peers struct { maxedChan chan struct{} }
-func NewPeers(max int) *Peers { - p := &Peers{capacity: max} +func NewSnowflakeJar(max int) *SnowflakeJar { + p := &SnowflakeJar{capacity: max} p.snowflakeChan = make(chan *webRTCConn, max) p.maxedChan = make(chan struct{}, 1) return p }
-// Find, connect, and add a new peer to the internal collection. -func (p *Peers) FindSnowflake() (*webRTCConn, error) { - if p.Count() >= p.capacity { - s := fmt.Sprintf("At capacity [%d/%d]", p.Count(), p.capacity) - p.maxedChan <- struct{}{} +// Establish connection to some remote WebRTC peer, and keep it available for +// later. +func (jar *SnowflakeJar) Collect() (*webRTCConn, error) { + if jar.Count() >= jar.capacity { + s := fmt.Sprintf("At capacity [%d/%d]", jar.Count(), jar.capacity) + jar.maxedChan <- struct{}{} return nil, errors.New(s) } - connection, err := p.Catch() - connection.BytesLogger = p.BytesLogger + snowflake, err := jar.Catch() if err != nil { return nil, err } - return connection, nil + jar.snowflakeChan <- snowflake + return snowflake, nil +} + +// Prepare and present an available remote WebRTC peer for active use. +func (jar *SnowflakeJar) Release() *webRTCConn { + snowflake, ok := <-jar.snowflakeChan + if !ok { + return nil + } + jar.current = snowflake + snowflake.BytesLogger = jar.BytesLogger + return snowflake }
// TODO: Needs fixing. -func (p *Peers) Count() int { +func (p *SnowflakeJar) Count() int { count := 0 if p.current != nil { count = 1 @@ -106,7 +109,7 @@ func (p *Peers) Count() int { }
// Close all remote peers. -func (p *Peers) End() { +func (p *SnowflakeJar) End() { log.Printf("WebRTC: interruped") if nil != p.current { p.current.Close() @@ -118,87 +121,71 @@ func (p *Peers) End() {
// Maintain |SnowflakeCapacity| number of available WebRTC connections, to // transfer to the Tor SOCKS handler when needed. -func ConnectLoop(peers *Peers) { +func ConnectLoop(snowflakes SnowflakeCollector) { for { - s, err := peers.FindSnowflake() + s, err := snowflakes.Collect() if nil == s || nil != err { log.Println("WebRTC Error:", err, " Retrying in", ReconnectTimeout, "seconds...") + // Failed collections get a timeout. <-time.After(time.Second * ReconnectTimeout) continue } - peers.snowflakeChan <- s + // Successful collection gets rate limited to once per second. <-time.After(time.Second) } }
-// Implements |Tongue| -type WebRTCDialer struct { - *BrokerChannel -} - -// Initialize a WebRTC Connection by signaling through the broker. -func (w WebRTCDialer) Catch() (*webRTCConn, error) { - if nil == w.BrokerChannel { - return nil, errors.New("Cannot Dial WebRTC without a BrokerChannel.") +// Accept local SOCKS connections and pass them to the handler. +func acceptLoop(ln *pt.SocksListener, snowflakes SnowflakeCollector) error { + defer ln.Close() + for { + log.Println("SOCKS listening...", ln) + conn, err := ln.AcceptSocks() + log.Println("accepting", conn, err) + if err != nil { + if e, ok := err.(net.Error); ok && e.Temporary() { + continue + } + return err + } + err = handler(conn, snowflakes) + if err != nil { + log.Printf("handler error: %s", err) + } } - // TODO: [#3] Fetch ICE server information from Broker. - // TODO: [#18] Consider TURN servers here too. - config := webrtc.NewConfiguration(iceServers...) - connection := NewWebRTCConnection(config, w.BrokerChannel) - err := connection.Connect() - return connection, err }
-// Establish a WebRTC channel for SOCKS connections. -func handler(conn *pt.SocksConn, peers *Peers) error { +// Given an accepted SOCKS connection, establish a WebRTC connection to the +// remote peer and exchange traffic. +func handler(socks SocksConnector, snowflakes SnowflakeCollector) error { handlerChan <- 1 defer func() { handlerChan <- -1 }() // Wait for an available WebRTC remote... - remote, ok := <-peers.snowflakeChan - peers.current = remote - if remote == nil || !ok { - conn.Reject() + snowflake := snowflakes.Release() + if nil == snowflake { + socks.Reject() return errors.New("handler: Received invalid Snowflake") } - defer conn.Close() + defer socks.Close() log.Println("handler: Snowflake assigned.") - - err := conn.Grant(&net.TCPAddr{IP: net.IPv4zero, Port: 0}) + err := socks.Grant(&net.TCPAddr{IP: net.IPv4zero, Port: 0}) if err != nil { return err }
- go copyLoop(conn, remote) + // Begin exchanging data. + go copyLoop(socks, snowflake) + // When WebRTC resets, close the SOCKS connection, which induces new handler. - <-remote.reset + // TODO: Double check this / fix it. + <-snowflake.reset log.Println("---- Closed ---") return nil }
-func acceptLoop(ln *pt.SocksListener, peers *Peers) error { - defer ln.Close() - for { - log.Println("SOCKS listening...", ln) - conn, err := ln.AcceptSocks() - log.Println("accepting", conn, err) - if err != nil { - if e, ok := err.(net.Error); ok && e.Temporary() { - continue - } - return err - } - go func() { - err := handler(conn, peers) - if err != nil { - log.Printf("handler error: %s", err) - } - }() - } -} - // TODO: Fix since multiplexing changes access to remotes. func readSignalingMessages(f *os.File) { log.Printf("readSignalingMessages") @@ -258,19 +245,21 @@ func main() { go readSignalingMessages(signalFile) }
- // Prepare WebRTC Peers and the Broker, then accumulate connections. + // Prepare WebRTC SnowflakeCollector and the Broker, then accumulate connections. // TODO: Expose remote peer capacity as a flag? - remotes := NewPeers(SnowflakeCapacity) + snowflakes := NewSnowflakeJar(SnowflakeCapacity) + broker := NewBrokerChannel(brokerURL, frontDomain, CreateBrokerTransport()) + snowflakes.Tongue = WebRTCDialer{broker}
- remotes.BytesLogger = &BytesSyncLogger{ + // Use a real logger for traffic. + snowflakes.BytesLogger = &BytesSyncLogger{ inboundChan: make(chan int, 5), outboundChan: make(chan int, 5), inbound: 0, outbound: 0, inEvents: 0, outEvents: 0, } - go remotes.BytesLogger.Log()
- remotes.Tongue = WebRTCDialer{broker} - go ConnectLoop(remotes) + go ConnectLoop(snowflakes) + go snowflakes.BytesLogger.Log()
ptInfo, err = pt.ClientSetup(nil) if err != nil { @@ -292,7 +281,7 @@ func main() { pt.CmethodError(methodName, err.Error()) break } - go acceptLoop(ln, remotes) + go acceptLoop(ln, snowflakes) pt.Cmethod(methodName, ln.Version(), ln.Addr()) listeners = append(listeners, ln) default: @@ -319,7 +308,7 @@ func main() { ln.Close() }
- remotes.End() + snowflakes.End()
// wait for second signal or no more handlers sig = nil diff --git a/client/webrtc.go b/client/webrtc.go index a47ac19..41642be 100644 --- a/client/webrtc.go +++ b/client/webrtc.go @@ -11,6 +11,24 @@ import ( "time" )
+// Implements the |Tongue| interface to catch snowflakes, using a BrokerChannel. +type WebRTCDialer struct { + *BrokerChannel +} + +// Initialize a WebRTC Connection by signaling through the broker. +func (w WebRTCDialer) Catch() (*webRTCConn, error) { + if nil == w.BrokerChannel { + return nil, errors.New("Cannot Dial WebRTC without a BrokerChannel.") + } + // TODO: [#3] Fetch ICE server information from Broker. + // TODO: [#18] Consider TURN servers here too. + config := webrtc.NewConfiguration(iceServers...) + connection := NewWebRTCConnection(config, w.BrokerChannel) + err := connection.Connect() + return connection, err +} + // Remote WebRTC peer. Implements the |net.Conn| interface. type webRTCConn struct { config *webrtc.Configuration
tor-commits@lists.torproject.org