commit cce7ee64a77b89b8af6760a0e0a0682d2c9331b7 Author: Arlo Breault arlolra@gmail.com Date: Tue Nov 20 22:17:24 2018 -0500
Start refactoring out a client and library --- client/{ => lib}/interfaces.go | 2 +- client/{client_test.go => lib/lib_test.go} | 4 +- client/{ => lib}/peers.go | 2 +- client/{ => lib}/rendezvous.go | 2 +- client/lib/snowflake.go | 69 ++++++++++++++++++++++ client/{ => lib}/util.go | 50 ++++++++-------- client/{ => lib}/webrtc.go | 2 +- client/snowflake.go | 94 +++++++----------------------- 8 files changed, 120 insertions(+), 105 deletions(-)
diff --git a/client/interfaces.go b/client/lib/interfaces.go similarity index 98% rename from client/interfaces.go rename to client/lib/interfaces.go index f18987a..f62d4f5 100644 --- a/client/interfaces.go +++ b/client/lib/interfaces.go @@ -1,4 +1,4 @@ -package main +package lib
import ( "io" diff --git a/client/client_test.go b/client/lib/lib_test.go similarity index 99% rename from client/client_test.go rename to client/lib/lib_test.go index cfc8cbf..5a9a2e5 100644 --- a/client/client_test.go +++ b/client/lib/lib_test.go @@ -1,4 +1,4 @@ -package main +package lib
import ( "bytes" @@ -179,7 +179,7 @@ func TestSnowflakeClient(t *testing.T) {
So(socks.rejected, ShouldEqual, false) snowflakes.toRelease = nil - handler(socks, snowflakes) + Handler(socks, snowflakes) So(socks.rejected, ShouldEqual, true) })
diff --git a/client/peers.go b/client/lib/peers.go similarity index 99% rename from client/peers.go rename to client/lib/peers.go index 3187f09..21411ed 100644 --- a/client/peers.go +++ b/client/lib/peers.go @@ -1,4 +1,4 @@ -package main +package lib
import ( "container/list" diff --git a/client/rendezvous.go b/client/lib/rendezvous.go similarity index 99% rename from client/rendezvous.go rename to client/lib/rendezvous.go index cab7f5a..7436f54 100644 --- a/client/rendezvous.go +++ b/client/lib/rendezvous.go @@ -9,7 +9,7 @@ // // - Manual copy-paste signaling. User must create a signaling pipe. // (The flags in torrc-manual allow this) -package main +package lib
import ( "bufio" diff --git a/client/lib/snowflake.go b/client/lib/snowflake.go new file mode 100644 index 0000000..900af88 --- /dev/null +++ b/client/lib/snowflake.go @@ -0,0 +1,69 @@ +package lib + +import ( + "errors" + "io" + "log" + "net" + "sync" +) + +const ( + ReconnectTimeout = 10 + SnowflakeTimeout = 30 +) + +// When a connection handler starts, +1 is written to this channel; when it +// ends, -1 is written. +var HandlerChan = make(chan int) + +// Given an accepted SOCKS connection, establish a WebRTC connection to the +// remote peer and exchange traffic. +func Handler(socks SocksConnector, snowflakes SnowflakeCollector) error { + HandlerChan <- 1 + defer func() { + HandlerChan <- -1 + }() + // Obtain an available WebRTC remote. May block. + snowflake := snowflakes.Pop() + if nil == snowflake { + socks.Reject() + return errors.New("handler: Received invalid Snowflake") + } + defer socks.Close() + defer snowflake.Close() + log.Println("---- Handler: snowflake assigned ----") + err := socks.Grant(&net.TCPAddr{IP: net.IPv4zero, Port: 0}) + if err != nil { + return err + } + + go func() { + // When WebRTC resets, close the SOCKS connection too. + snowflake.WaitForReset() + socks.Close() + }() + + // Begin exchanging data. Either WebRTC or localhost SOCKS will close first. + // In eithercase, this closes the handler and induces a new handler. + copyLoop(socks, snowflake) + log.Println("---- Handler: closed ---") + return nil +} + +// Exchanges bytes between two ReadWriters. +// (In this case, between a SOCKS and WebRTC connection.) +func copyLoop(a, b io.ReadWriter) { + var wg sync.WaitGroup + wg.Add(2) + go func() { + io.Copy(b, a) + wg.Done() + }() + go func() { + io.Copy(a, b) + wg.Done() + }() + wg.Wait() + log.Println("copy loop ended") +} diff --git a/client/util.go b/client/lib/util.go similarity index 69% rename from client/util.go rename to client/lib/util.go index 20817c3..028fb1c 100644 --- a/client/util.go +++ b/client/lib/util.go @@ -1,4 +1,4 @@ -package main +package lib
import ( "fmt" @@ -34,46 +34,46 @@ func (b BytesNullLogger) AddInbound(amount int) {} // BytesSyncLogger uses channels to safely log from multiple sources with output // occuring at reasonable intervals. type BytesSyncLogger struct { - outboundChan chan int - inboundChan chan int - outbound int - inbound int - outEvents int - inEvents int - isLogging bool + OutboundChan chan int + InboundChan chan int + Outbound int + Inbound int + OutEvents int + InEvents int + IsLogging bool }
func (b *BytesSyncLogger) Log() { - b.isLogging = true + b.IsLogging = true var amount int output := func() { log.Printf("Traffic Bytes (in|out): %d | %d -- (%d OnMessages, %d Sends)", - b.inbound, b.outbound, b.inEvents, b.outEvents) - b.outbound = 0 - b.outEvents = 0 - b.inbound = 0 - b.inEvents = 0 + b.Inbound, b.Outbound, b.InEvents, b.OutEvents) + b.Outbound = 0 + b.OutEvents = 0 + b.Inbound = 0 + b.InEvents = 0 } last := time.Now() for { select { - case amount = <-b.outboundChan: - b.outbound += amount - b.outEvents++ + case amount = <-b.OutboundChan: + b.Outbound += amount + b.OutEvents++ last := time.Now() if time.Since(last) > time.Second*LogTimeInterval { last = time.Now() output() } - case amount = <-b.inboundChan: - b.inbound += amount - b.inEvents++ + case amount = <-b.InboundChan: + b.Inbound += amount + b.InEvents++ if time.Since(last) > time.Second*LogTimeInterval { last = time.Now() output() } case <-time.After(time.Second * LogTimeInterval): - if b.inEvents > 0 || b.outEvents > 0 { + if b.InEvents > 0 || b.OutEvents > 0 { output() } } @@ -81,15 +81,15 @@ func (b *BytesSyncLogger) Log() { }
func (b *BytesSyncLogger) AddOutbound(amount int) { - if !b.isLogging { + if !b.IsLogging { return } - b.outboundChan <- amount + b.OutboundChan <- amount }
func (b *BytesSyncLogger) AddInbound(amount int) { - if !b.isLogging { + if !b.IsLogging { return } - b.inboundChan <- amount + b.InboundChan <- amount } diff --git a/client/webrtc.go b/client/lib/webrtc.go similarity index 99% rename from client/webrtc.go rename to client/lib/webrtc.go index 8c7cb4c..e71a407 100644 --- a/client/webrtc.go +++ b/client/lib/webrtc.go @@ -1,4 +1,4 @@ -package main +package lib
import ( "bytes" diff --git a/client/snowflake.go b/client/snowflake.go index a9841be..b2dea5c 100644 --- a/client/snowflake.go +++ b/client/snowflake.go @@ -2,7 +2,6 @@ package main
import ( - "errors" "flag" "io" "io/ioutil" @@ -12,36 +11,30 @@ import ( "os/signal" "path/filepath" "strings" - "sync" "syscall" "time"
"git.torproject.org/pluggable-transports/goptlib.git" + sf "git.torproject.org/pluggable-transports/snowflake.git/client/lib" "github.com/keroserene/go-webrtc" )
const ( - ReconnectTimeout = 10 DefaultSnowflakeCapacity = 1 - SnowflakeTimeout = 30 )
-// When a connection handler starts, +1 is written to this channel; when it -// ends, -1 is written. -var handlerChan = make(chan int) - // Maintain |SnowflakeCapacity| number of available WebRTC connections, to // transfer to the Tor SOCKS handler when needed. -func ConnectLoop(snowflakes SnowflakeCollector) { +func ConnectLoop(snowflakes sf.SnowflakeCollector) { for { // Check if ending is necessary. _, err := snowflakes.Collect() if nil != err { log.Println("WebRTC:", err, - " Retrying in", ReconnectTimeout, "seconds...") + " Retrying in", sf.ReconnectTimeout, "seconds...") } select { - case <-time.After(time.Second * ReconnectTimeout): + case <-time.After(time.Second * sf.ReconnectTimeout): continue case <-snowflakes.Melted(): log.Println("ConnectLoop: stopped.") @@ -51,7 +44,7 @@ func ConnectLoop(snowflakes SnowflakeCollector) { }
// Accept local SOCKS connections and pass them to the handler. -func socksAcceptLoop(ln *pt.SocksListener, snowflakes SnowflakeCollector) error { +func socksAcceptLoop(ln *pt.SocksListener, snowflakes sf.SnowflakeCollector) error { defer ln.Close() log.Println("Started SOCKS listener.") for { @@ -64,64 +57,13 @@ func socksAcceptLoop(ln *pt.SocksListener, snowflakes SnowflakeCollector) error return err } log.Println("SOCKS accepted: ", conn.Req) - err = handler(conn, snowflakes) + err = sf.Handler(conn, snowflakes) if err != nil { log.Printf("handler error: %s", err) } } }
-// Given an accepted SOCKS connection, establish a WebRTC connection to the -// remote peer and exchange traffic. -func handler(socks SocksConnector, snowflakes SnowflakeCollector) error { - handlerChan <- 1 - defer func() { - handlerChan <- -1 - }() - // Obtain an available WebRTC remote. May block. - snowflake := snowflakes.Pop() - if nil == snowflake { - socks.Reject() - return errors.New("handler: Received invalid Snowflake") - } - defer socks.Close() - defer snowflake.Close() - log.Println("---- Handler: snowflake assigned ----") - err := socks.Grant(&net.TCPAddr{IP: net.IPv4zero, Port: 0}) - if err != nil { - return err - } - - go func() { - // When WebRTC resets, close the SOCKS connection too. - snowflake.WaitForReset() - socks.Close() - }() - - // Begin exchanging data. Either WebRTC or localhost SOCKS will close first. - // In eithercase, this closes the handler and induces a new handler. - copyLoop(socks, snowflake) - log.Println("---- Handler: closed ---") - return nil -} - -// Exchanges bytes between two ReadWriters. -// (In this case, between a SOCKS and WebRTC connection.) -func copyLoop(a, b io.ReadWriter) { - var wg sync.WaitGroup - wg.Add(2) - go func() { - io.Copy(b, a) - wg.Done() - }() - go func() { - io.Copy(a, b) - wg.Done() - }() - wg.Wait() - log.Println("copy loop ended") -} - func main() { iceServersCommas := flag.String("ice", "", "comma-separated list of ICE servers") brokerURL := flag.String("url", "", "URL of signaling broker") @@ -156,30 +98,34 @@ func main() {
log.Println("\n\n\n --- Starting Snowflake Client ---")
- var iceServers IceServerList + var iceServers sf.IceServerList if len(strings.TrimSpace(*iceServersCommas)) > 0 { option := webrtc.OptionIceServer(*iceServersCommas) iceServers = append(iceServers, option) }
// Prepare to collect remote WebRTC peers. - snowflakes := NewPeers(*max) + snowflakes := sf.NewPeers(*max) if "" != *brokerURL { // Use potentially domain-fronting broker to rendezvous. - broker := NewBrokerChannel(*brokerURL, *frontDomain, CreateBrokerTransport()) - snowflakes.Tongue = NewWebRTCDialer(broker, iceServers) + broker := sf.NewBrokerChannel(*brokerURL, *frontDomain, sf.CreateBrokerTransport()) + snowflakes.Tongue = sf.NewWebRTCDialer(broker, iceServers) } else { // Otherwise, use manual copy and pasting of SDP messages. - snowflakes.Tongue = NewCopyPasteDialer(iceServers) + snowflakes.Tongue = sf.NewCopyPasteDialer(iceServers) } if nil == snowflakes.Tongue { log.Fatal("Unable to prepare rendezvous method.") return } // Use a real logger to periodically output how much traffic is happening. - snowflakes.BytesLogger = &BytesSyncLogger{ - inboundChan: make(chan int, 5), outboundChan: make(chan int, 5), - inbound: 0, outbound: 0, inEvents: 0, outEvents: 0, + snowflakes.BytesLogger = &sf.BytesSyncLogger{ + InboundChan: make(chan int, 5), + OutboundChan: make(chan int, 5), + Inbound: 0, + Outbound: 0, + InEvents: 0, + OutEvents: 0, } go snowflakes.BytesLogger.Log()
@@ -232,7 +178,7 @@ func main() { sig = nil for sig == nil { select { - case n := <-handlerChan: + case n := <-sf.HandlerChan: numHandlers += n case sig = <-sigChan: } @@ -244,7 +190,7 @@ func main() { } snowflakes.End() for numHandlers > 0 { - numHandlers += <-handlerChan + numHandlers += <-sf.HandlerChan } log.Println("snowflake is done.") }
tor-commits@lists.torproject.org