
commit 760dee8a0f3efc1a71780bfde277ae7a5a7a6d9b Author: Serene Han <keroserene+git@gmail.com> Date: Wed Feb 17 17:39:09 2016 -0800 prepare snowflake client for buffered datachannel writes, separate out dialWebRTC (#12) --- client/snowflake.go | 196 ++++++++++++++++++++++++++++++---------------------- 1 file changed, 115 insertions(+), 81 deletions(-) diff --git a/client/snowflake.go b/client/snowflake.go index ba1561e..7c47fbb 100644 --- a/client/snowflake.go +++ b/client/snowflake.go @@ -28,7 +28,7 @@ var frontDomain string // ends, -1 is written. var handlerChan = make(chan int) -var signalChan = make(chan *webrtc.SessionDescription) +var answerChannel = make(chan *webrtc.SessionDescription) func copyLoop(a, b net.Conn) { var wg sync.WaitGroup @@ -46,10 +46,16 @@ func copyLoop(a, b net.Conn) { wg.Wait() } +// Implements net.Conn interface type webRTCConn struct { - pc *webrtc.PeerConnection - dc *webrtc.DataChannel - recvPipe *io.PipeReader + pc *webrtc.PeerConnection + dc *webrtc.DataChannel + broker *BrokerChannel + recvPipe *io.PipeReader + writePipe *io.PipeWriter + offerChannel chan *webrtc.SessionDescription + errorChannel chan error + openChannel chan struct{} } var webrtcRemote *webRTCConn @@ -61,6 +67,7 @@ func (c *webRTCConn) Read(b []byte) (int, error) { func (c *webRTCConn) Write(b []byte) (int, error) { // log.Printf("webrtc Write %d %+q", len(b), string(b)) log.Printf("Write %d bytes --> WebRTC", len(b)) + // Buffer in case datachannel isn't available. c.dc.Send(b) return len(b), nil } @@ -90,18 +97,87 @@ func (c *webRTCConn) SetWriteDeadline(t time.Time) error { return fmt.Errorf("SetWriteDeadline not implemented") } -func dialWebRTC(config *webrtc.Configuration, broker *BrokerChannel) ( - *webRTCConn, error) { +// Create a WebRTC DataChannel locally. +// This triggers "OnNegotiationNeeded" which should prepare an SDP offer. +func (c *webRTCConn) EstablishDataChannel() error { + dc, err := c.pc.CreateDataChannel("snowflake", webrtc.Init{}) + if err != nil { + log.Printf("CreateDataChannel: %s", err) + return err + } + dc.OnOpen = func() { + log.Println("OnOpen channel") + c.openChannel <- struct{}{} + } + dc.OnClose = func() { + log.Println("OnClose channel") + // writePipe.Close() + close(c.openChannel) + // TODO: (Issue #12) Should attempt to renegotiate at this point. + } + dc.OnMessage = func(msg []byte) { + log.Printf("OnMessage <--- %d bytes", len(msg)) + n, err := c.writePipe.Write(msg) + if err != nil { + // TODO: Maybe shouldn't actually close. + c.writePipe.CloseWithError(err) + } + if n != len(msg) { + panic("short write") + } + } + c.dc = dc + return nil +} - offerChan := make(chan *webrtc.SessionDescription) - errChan := make(chan error) - openChan := make(chan struct{}) +// Block until an offer is available, then send it to either +// the Broker or signal pipe. +func (c *webRTCConn) sendOffer() error { + select { + case offer := <-c.offerChannel: + if "" == brokerURL { + log.Printf("Please Copy & Paste the following to the peer:") + log.Printf("----------------") + fmt.Fprintln(logFile, "\n"+offer.Serialize()+"\n") + log.Printf("----------------") + return nil + } + // Use Broker... + go func() { + log.Println("Sending offer via BrokerChannel...\nTarget URL: ", brokerURL, + "\nFront URL: ", frontDomain) + answer, err := c.broker.Negotiate(c.pc.LocalDescription()) + if nil != err { + log.Printf("BrokerChannel signaling error: %s", err) + return + } + if nil == answer { + log.Printf("BrokerChannel: No answer received.") + return + // return errors.New("No answer received.") + } + answerChannel <- answer + }() + case err := <-c.errorChannel: + c.pc.Close() + return err + } + return nil +} +func dialWebRTC(config *webrtc.Configuration, broker *BrokerChannel) ( + *webRTCConn, error) { pc, err := webrtc.NewPeerConnection(config) if err != nil { log.Printf("NewPeerConnection: %s", err) return nil, err } + connection := new(webRTCConn) + connection.broker = broker + connection.pc = pc + connection.offerChannel = make(chan *webrtc.SessionDescription) + connection.errorChannel = make(chan error) + connection.openChannel = make(chan struct{}) // Triggered by CreateDataChannel. pc.OnNegotiationNeeded = func() { @@ -109,12 +185,12 @@ func dialWebRTC(config *webrtc.Configuration, broker *BrokerChannel) ( go func() { offer, err := pc.CreateOffer() if err != nil { - errChan <- err + connection.errorChannel <- err return } err = pc.SetLocalDescription(offer) if err != nil { - errChan <- err + connection.errorChannel <- err return } }() @@ -126,7 +202,7 @@ func dialWebRTC(config *webrtc.Configuration, broker *BrokerChannel) ( // TODO: This may soon be deprecated, consider OnIceGatheringStateChange. pc.OnIceComplete = func() { log.Printf("OnIceComplete") - offerChan <- pc.LocalDescription() + connection.offerChannel <- pc.LocalDescription() } // This callback is not expected, as the Client initiates the creation // of the data channel, not the remote peer. @@ -135,62 +211,14 @@ func dialWebRTC(config *webrtc.Configuration, broker *BrokerChannel) ( panic("OnDataChannel") } - pr, pw := io.Pipe() + // Pipes remain the same even when DataChannel gets switched. + connection.recvPipe, connection.writePipe = io.Pipe() - dc, err := pc.CreateDataChannel("test", webrtc.Init{}) - if err != nil { - log.Printf("CreateDataChannel: %s", err) - return nil, err - } - dc.OnOpen = func() { - log.Println("OnOpen channel") - openChan <- struct{}{} - } - dc.OnClose = func() { - log.Println("OnClose channel") - pw.Close() - close(openChan) - // TODO: (Issue #12) Should attempt to renegotiate at this point. - } - dc.OnMessage = func(msg []byte) { - log.Printf("OnMessage <--- %d bytes", len(msg)) - n, err := pw.Write(msg) - if err != nil { - pw.CloseWithError(err) - } - if n != len(msg) { - panic("short write") - } - } - - select { - case err := <-errChan: - pc.Close() - return nil, err - case offer := <-offerChan: - log.Printf("----------------") - fmt.Fprintln(logFile, "\n"+offer.Serialize()+"\n") - log.Printf("----------------") - go func() { - if "" != brokerURL { - log.Println("Sending offer via BrokerChannel...\nTarget URL: ", brokerURL, - "\nFront URL: ", frontDomain) - answer, err := broker.Negotiate(pc.LocalDescription()) - if nil != err { - log.Printf("BrokerChannel signaling error: %s", err) - } - if nil == answer { - log.Printf("BrokerChannel: No answer received.") - } else { - signalChan <- answer - } - } - }() - } - - log.Printf("waiting for answer") - answer, ok := <-signalChan + connection.EstablishDataChannel() + connection.sendOffer() + log.Printf("waiting for answer...") + answer, ok := <-answerChannel if !ok { pc.Close() return nil, fmt.Errorf("no answer received") @@ -205,13 +233,13 @@ func dialWebRTC(config *webrtc.Configuration, broker *BrokerChannel) ( // Wait until data channel is open; otherwise for example sends may get // lost. // TODO: Buffering *should* work though. - _, ok = <-openChan + _, ok = <-connection.openChannel if !ok { pc.Close() return nil, fmt.Errorf("failed to open data channel") } - return &webRTCConn{pc: pc, dc: dc, recvPipe: pr}, nil + return connection, nil } func endWebRTC() { @@ -229,6 +257,7 @@ func endWebRTC() { } } +// Establish a WebRTC channel for SOCKS connections. func handler(conn *pt.SocksConn) error { handlerChan <- 1 defer func() { @@ -259,7 +288,6 @@ func handler(conn *pt.SocksConn) error { } copyLoop(conn, remote) - return nil } @@ -293,10 +321,10 @@ func readSignalingMessages(f *os.File) { log.Printf("ignoring invalid signal message %+q", msg) continue } - signalChan <- sdp + answerChannel <- sdp } - log.Printf("close signalChan") - close(signalChan) + log.Printf("close answerChannel") + close(answerChannel) if err := s.Err(); err != nil { log.Printf("signal FIFO: %s", err) } @@ -308,19 +336,25 @@ func main() { flag.StringVar(&brokerURL, "url", "", "URL of signaling broker") flag.StringVar(&frontDomain, "front", "", "front domain") flag.Parse() - logFile, err = os.OpenFile("snowflake.log", os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0600) if err != nil { log.Fatal(err) } defer logFile.Close() log.SetOutput(logFile) - log.Println("starting") - - if "" == brokerURL { + log.Println("\nStarting Snowflake Client...") + + // Expect user to copy-paste if + // TODO: Maybe just get rid of copy-paste entirely. + if "" != brokerURL { + log.Println("Rendezvous using Broker at: ", brokerURL) + if "" != frontDomain { + log.Println("Domain fronting using:", frontDomain) + } + } else { log.Println("No HTTP signaling detected. Waiting for a \"signal\" pipe...") // This FIFO receives signaling messages. - err = syscall.Mkfifo("signal", 0600) + err := syscall.Mkfifo("signal", 0600) if err != nil { if err.(syscall.Errno) != syscall.EEXIST { log.Fatal(err) @@ -363,6 +397,7 @@ func main() { } } pt.CmethodsDone() + defer endWebRTC() var numHandlers int = 0 var sig os.Signal @@ -382,10 +417,9 @@ func main() { ln.Close() } - if syscall.SIGTERM == sig || syscall.SIGINT == sig { - endWebRTC() - return - } + // if syscall.SIGTERM == sig || syscall.SIGINT == sig { + // return + // } // wait for second signal or no more handlers sig = nil
participants (1)
-
serene@torproject.org