[tor-commits] [snowflake/master] prepare snowflake client for buffered datachannel writes, separate out dialWebRTC (#12)

serene at torproject.org serene at torproject.org
Thu Feb 18 22:15:38 UTC 2016


commit 760dee8a0f3efc1a71780bfde277ae7a5a7a6d9b
Author: Serene Han <keroserene+git at gmail.com>
Date:   Wed Feb 17 17:39:09 2016 -0800

    prepare snowflake client for buffered datachannel writes, separate out dialWebRTC (#12)
---
 client/snowflake.go | 196 ++++++++++++++++++++++++++++++----------------------
 1 file changed, 115 insertions(+), 81 deletions(-)

diff --git a/client/snowflake.go b/client/snowflake.go
index ba1561e..7c47fbb 100644
--- a/client/snowflake.go
+++ b/client/snowflake.go
@@ -28,7 +28,7 @@ var frontDomain string
 // ends, -1 is written.
 var handlerChan = make(chan int)
 
-var signalChan = make(chan *webrtc.SessionDescription)
+var answerChannel = make(chan *webrtc.SessionDescription)
 
 func copyLoop(a, b net.Conn) {
 	var wg sync.WaitGroup
@@ -46,10 +46,16 @@ func copyLoop(a, b net.Conn) {
 	wg.Wait()
 }
 
+// Implements net.Conn interface
 type webRTCConn struct {
-	pc       *webrtc.PeerConnection
-	dc       *webrtc.DataChannel
-	recvPipe *io.PipeReader
+	pc           *webrtc.PeerConnection
+	dc           *webrtc.DataChannel
+	broker       *BrokerChannel
+	recvPipe     *io.PipeReader
+	writePipe    *io.PipeWriter
+	offerChannel chan *webrtc.SessionDescription
+	errorChannel chan error
+	openChannel  chan struct{}
 }
 
 var webrtcRemote *webRTCConn
@@ -61,6 +67,7 @@ func (c *webRTCConn) Read(b []byte) (int, error) {
 func (c *webRTCConn) Write(b []byte) (int, error) {
 	// log.Printf("webrtc Write %d %+q", len(b), string(b))
 	log.Printf("Write %d bytes --> WebRTC", len(b))
+	// Buffer in case datachannel isn't available.
 	c.dc.Send(b)
 	return len(b), nil
 }
@@ -90,18 +97,87 @@ func (c *webRTCConn) SetWriteDeadline(t time.Time) error {
 	return fmt.Errorf("SetWriteDeadline not implemented")
 }
 
-func dialWebRTC(config *webrtc.Configuration, broker *BrokerChannel) (
-	*webRTCConn, error) {
+// Create a WebRTC DataChannel locally.
+// This triggers "OnNegotiationNeeded" which should prepare an SDP offer.
+func (c *webRTCConn) EstablishDataChannel() error {
+	dc, err := c.pc.CreateDataChannel("snowflake", webrtc.Init{})
+	if err != nil {
+		log.Printf("CreateDataChannel: %s", err)
+		return err
+	}
+	dc.OnOpen = func() {
+		log.Println("OnOpen channel")
+		c.openChannel <- struct{}{}
+	}
+	dc.OnClose = func() {
+		log.Println("OnClose channel")
+		// writePipe.Close()
+		close(c.openChannel)
+		// TODO: (Issue #12) Should attempt to renegotiate at this point.
+	}
+	dc.OnMessage = func(msg []byte) {
+		log.Printf("OnMessage <--- %d bytes", len(msg))
+		n, err := c.writePipe.Write(msg)
+		if err != nil {
+			// TODO: Maybe shouldn't actually close.
+			c.writePipe.CloseWithError(err)
+		}
+		if n != len(msg) {
+			panic("short write")
+		}
+	}
+	c.dc = dc
+	return nil
+}
 
-	offerChan := make(chan *webrtc.SessionDescription)
-	errChan := make(chan error)
-	openChan := make(chan struct{})
+// Block until an offer is available, then send it to either
+// the Broker or signal pipe.
+func (c *webRTCConn) sendOffer() error {
+	select {
+	case offer := <-c.offerChannel:
+		if "" == brokerURL {
+			log.Printf("Please Copy & Paste the following to the peer:")
+			log.Printf("----------------")
+			fmt.Fprintln(logFile, "\n"+offer.Serialize()+"\n")
+			log.Printf("----------------")
+			return nil
+		}
+		// Use Broker...
+		go func() {
+			log.Println("Sending offer via BrokerChannel...\nTarget URL: ", brokerURL,
+				"\nFront URL:  ", frontDomain)
+			answer, err := c.broker.Negotiate(c.pc.LocalDescription())
+			if nil != err {
+				log.Printf("BrokerChannel signaling error: %s", err)
+				return
+			}
+			if nil == answer {
+				log.Printf("BrokerChannel: No answer received.")
+				return
+				// return errors.New("No answer received.")
+			}
+			answerChannel <- answer
+		}()
+	case err := <-c.errorChannel:
+		c.pc.Close()
+		return err
+	}
+	return nil
+}
 
+func dialWebRTC(config *webrtc.Configuration, broker *BrokerChannel) (
+	*webRTCConn, error) {
 	pc, err := webrtc.NewPeerConnection(config)
 	if err != nil {
 		log.Printf("NewPeerConnection: %s", err)
 		return nil, err
 	}
+	connection := new(webRTCConn)
+	connection.broker = broker
+	connection.pc = pc
+	connection.offerChannel = make(chan *webrtc.SessionDescription)
+	connection.errorChannel = make(chan error)
+	connection.openChannel = make(chan struct{})
 
 	// Triggered by CreateDataChannel.
 	pc.OnNegotiationNeeded = func() {
@@ -109,12 +185,12 @@ func dialWebRTC(config *webrtc.Configuration, broker *BrokerChannel) (
 		go func() {
 			offer, err := pc.CreateOffer()
 			if err != nil {
-				errChan <- err
+				connection.errorChannel <- err
 				return
 			}
 			err = pc.SetLocalDescription(offer)
 			if err != nil {
-				errChan <- err
+				connection.errorChannel <- err
 				return
 			}
 		}()
@@ -126,7 +202,7 @@ func dialWebRTC(config *webrtc.Configuration, broker *BrokerChannel) (
 	// TODO: This may soon be deprecated, consider OnIceGatheringStateChange.
 	pc.OnIceComplete = func() {
 		log.Printf("OnIceComplete")
-		offerChan <- pc.LocalDescription()
+		connection.offerChannel <- pc.LocalDescription()
 	}
 	// This callback is not expected, as the Client initiates the creation
 	// of the data channel, not the remote peer.
@@ -135,62 +211,14 @@ func dialWebRTC(config *webrtc.Configuration, broker *BrokerChannel) (
 		panic("OnDataChannel")
 	}
 
-	pr, pw := io.Pipe()
+	// Pipes remain the same even when DataChannel gets switched.
+	connection.recvPipe, connection.writePipe = io.Pipe()
 
-	dc, err := pc.CreateDataChannel("test", webrtc.Init{})
-	if err != nil {
-		log.Printf("CreateDataChannel: %s", err)
-		return nil, err
-	}
-	dc.OnOpen = func() {
-		log.Println("OnOpen channel")
-		openChan <- struct{}{}
-	}
-	dc.OnClose = func() {
-		log.Println("OnClose channel")
-		pw.Close()
-		close(openChan)
-		// TODO: (Issue #12) Should attempt to renegotiate at this point.
-	}
-	dc.OnMessage = func(msg []byte) {
-		log.Printf("OnMessage <--- %d bytes", len(msg))
-		n, err := pw.Write(msg)
-		if err != nil {
-			pw.CloseWithError(err)
-		}
-		if n != len(msg) {
-			panic("short write")
-		}
-	}
-
-	select {
-	case err := <-errChan:
-		pc.Close()
-		return nil, err
-	case offer := <-offerChan:
-		log.Printf("----------------")
-		fmt.Fprintln(logFile, "\n"+offer.Serialize()+"\n")
-		log.Printf("----------------")
-		go func() {
-			if "" != brokerURL {
-				log.Println("Sending offer via BrokerChannel...\nTarget URL: ", brokerURL,
-					"\nFront URL:  ", frontDomain)
-				answer, err := broker.Negotiate(pc.LocalDescription())
-				if nil != err {
-					log.Printf("BrokerChannel signaling error: %s", err)
-				}
-				if nil == answer {
-					log.Printf("BrokerChannel: No answer received.")
-				} else {
-					signalChan <- answer
-				}
-			}
-		}()
-	}
-
-	log.Printf("waiting for answer")
-	answer, ok := <-signalChan
+	connection.EstablishDataChannel()
+	connection.sendOffer()
 
+	log.Printf("waiting for answer...")
+	answer, ok := <-answerChannel
 	if !ok {
 		pc.Close()
 		return nil, fmt.Errorf("no answer received")
@@ -205,13 +233,13 @@ func dialWebRTC(config *webrtc.Configuration, broker *BrokerChannel) (
 	// Wait until data channel is open; otherwise for example sends may get
 	// lost.
 	// TODO: Buffering *should* work though.
-	_, ok = <-openChan
+	_, ok = <-connection.openChannel
 	if !ok {
 		pc.Close()
 		return nil, fmt.Errorf("failed to open data channel")
 	}
 
-	return &webRTCConn{pc: pc, dc: dc, recvPipe: pr}, nil
+	return connection, nil
 }
 
 func endWebRTC() {
@@ -229,6 +257,7 @@ func endWebRTC() {
 	}
 }
 
+// Establish a WebRTC channel for SOCKS connections.
 func handler(conn *pt.SocksConn) error {
 	handlerChan <- 1
 	defer func() {
@@ -259,7 +288,6 @@ func handler(conn *pt.SocksConn) error {
 	}
 
 	copyLoop(conn, remote)
-
 	return nil
 }
 
@@ -293,10 +321,10 @@ func readSignalingMessages(f *os.File) {
 			log.Printf("ignoring invalid signal message %+q", msg)
 			continue
 		}
-		signalChan <- sdp
+		answerChannel <- sdp
 	}
-	log.Printf("close signalChan")
-	close(signalChan)
+	log.Printf("close answerChannel")
+	close(answerChannel)
 	if err := s.Err(); err != nil {
 		log.Printf("signal FIFO: %s", err)
 	}
@@ -308,19 +336,25 @@ func main() {
 	flag.StringVar(&brokerURL, "url", "", "URL of signaling broker")
 	flag.StringVar(&frontDomain, "front", "", "front domain")
 	flag.Parse()
-
 	logFile, err = os.OpenFile("snowflake.log", os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0600)
 	if err != nil {
 		log.Fatal(err)
 	}
 	defer logFile.Close()
 	log.SetOutput(logFile)
-	log.Println("starting")
-
-	if "" == brokerURL {
+	log.Println("\nStarting Snowflake Client...")
+
+	// Expect user to copy-paste if
+	// TODO: Maybe just get rid of copy-paste entirely.
+	if "" != brokerURL {
+		log.Println("Rendezvous using Broker at: ", brokerURL)
+		if "" != frontDomain {
+			log.Println("Domain fronting using:", frontDomain)
+		}
+	} else {
 		log.Println("No HTTP signaling detected. Waiting for a \"signal\" pipe...")
 		// This FIFO receives signaling messages.
-		err = syscall.Mkfifo("signal", 0600)
+		err := syscall.Mkfifo("signal", 0600)
 		if err != nil {
 			if err.(syscall.Errno) != syscall.EEXIST {
 				log.Fatal(err)
@@ -363,6 +397,7 @@ func main() {
 		}
 	}
 	pt.CmethodsDone()
+	defer endWebRTC()
 
 	var numHandlers int = 0
 	var sig os.Signal
@@ -382,10 +417,9 @@ func main() {
 		ln.Close()
 	}
 
-	if syscall.SIGTERM == sig || syscall.SIGINT == sig {
-		endWebRTC()
-		return
-	}
+	// if syscall.SIGTERM == sig || syscall.SIGINT == sig {
+	// return
+	// }
 
 	// wait for second signal or no more handlers
 	sig = nil





More information about the tor-commits mailing list