[tor-commits] [snowflake/master] Refactor signal receiving in server.

dcf at torproject.org dcf at torproject.org
Tue Jan 19 06:36:43 UTC 2016


commit ac8669b38f8b2a363b5f46036ea8e5de5a69c121
Author: David Fifield <david at bamsoftware.com>
Date:   Mon Jan 18 21:13:08 2016 -0800

    Refactor signal receiving in server.
    
    There's one FIFO reader goroutine instead of one per bindaddr.
    makePeerConnectionFromOffer gives you a PeerConnection with an answer
    and also sets up callbacks to pass a webRTCConn to datachannelHandler
    when ready.
---
 server/snowflake.go |  196 ++++++++++++++++++++++++++-------------------------
 1 file changed, 99 insertions(+), 97 deletions(-)

diff --git a/server/snowflake.go b/server/snowflake.go
index d046998..88a07d8 100644
--- a/server/snowflake.go
+++ b/server/snowflake.go
@@ -81,6 +81,8 @@ func (c *webRTCConn) SetWriteDeadline(t time.Time) error {
 }
 
 func datachannelHandler(conn *webRTCConn) {
+	defer conn.Close()
+
 	handlerChan <- 1
 	defer func() {
 		handlerChan <- -1
@@ -91,121 +93,123 @@ func datachannelHandler(conn *webRTCConn) {
 		log.Printf("Failed to connect to ORPort: " + err.Error())
 		return
 	}
-	//defer or.Close()
-
-	pr, pw := io.Pipe()
-	conn.pr = pr
-
-	dc := conn.dc
-	dc.OnOpen = func() {
-		log.Println("OnOpen channel")
-	}
-	dc.OnClose = func() {
-		log.Println("OnClose channel")
-		pw.Close()
-	}
-	dc.OnMessage = func(msg []byte) {
-		// log.Printf("OnMessage channel %d %+q", len(msg), msg)
-		log.Printf("OnMessage <--- %d bytes", len(msg))
-		n, err := pw.Write(msg)
-		if err != nil {
-			pw.CloseWithError(err)
-		}
-		if n != len(msg) {
-			panic("short write")
-		}
-	}
+	defer or.Close()
 
-	go copyLoop(conn, or)
+	copyLoop(conn, or)
 }
 
-func makePeerConnection(config *webrtc.Configuration) (*webrtc.PeerConnection, error) {
-	pc, err := webrtc.NewPeerConnection(config)
+// Create a PeerConnection from an SDP offer. Blocks until the gathering of ICE
+// candidates is complete and and answer is available in LocalDescription.
+// Installs an OnDataChannel callback that creates a webRTCConn and passes it to
+// datachannelHandler.
+func makePeerConnectionFromOffer(sdp *webrtc.SessionDescription, config *webrtc.Configuration) (*webrtc.PeerConnection, error) {
+	errChan := make(chan error)
+	answerChan := make(chan *webrtc.SessionDescription)
 
+	pc, err := webrtc.NewPeerConnection(config)
 	if err != nil {
-		log.Printf("NewPeerConnection: %s", err)
-		return nil, err
+		return nil, fmt.Errorf("accept: NewPeerConnection: %s", err)
 	}
 	pc.OnNegotiationNeeded = func() {
 		panic("OnNegotiationNeeded")
 	}
-	pc.OnDataChannel = func(dc *data.Channel) {
-		log.Println("OnDataChannel")
-		datachannelHandler(&webRTCConn{pc: pc, dc: dc})
-	}
 	pc.OnIceComplete = func() {
-		log.Printf("----------------")
-		fmt.Fprintln(logFile, pc.LocalDescription().Serialize())
-		log.Printf("----------------")
+		answerChan <- pc.LocalDescription()
 	}
-	return pc, nil
-}
+	pc.OnDataChannel = func(dc *data.Channel) {
+		log.Println("OnDataChannel")
 
-func readSignalingMessages(signalChan chan *webrtc.SessionDescription, f *os.File) {
-	s := bufio.NewScanner(f)
-	for s.Scan() {
-		msg := s.Text()
-		sdp := webrtc.DeserializeSessionDescription(msg)
-		if sdp == nil {
-			log.Printf("ignoring invalid signal message %+q", msg)
-			continue
+		pr, pw := io.Pipe()
+
+		dc.OnOpen = func() {
+			log.Println("OnOpen channel")
 		}
-		signalChan <- sdp
-		continue
-	}
-	if err := s.Err(); err != nil {
-		log.Printf("signal FIFO: %s", err)
+		dc.OnClose = func() {
+			log.Println("OnClose channel")
+			pw.Close()
+		}
+		dc.OnMessage = func(msg []byte) {
+			log.Printf("OnMessage <--- %d bytes", len(msg))
+			n, err := pw.Write(msg)
+			if err != nil {
+				pw.CloseWithError(err)
+			}
+			if n != len(msg) {
+				panic("short write")
+			}
+		}
+
+		conn := &webRTCConn{pc: pc, dc: dc, pr: pr}
+		go datachannelHandler(conn)
 	}
-}
 
-func generateAnswer(pc *webrtc.PeerConnection) {
-	fmt.Println("Generating answer...")
-	answer, err := pc.CreateAnswer() // blocking
+	err = pc.SetRemoteDescription(sdp)
 	if err != nil {
-		fmt.Println(err)
-		return
+		pc.Close()
+		return nil, fmt.Errorf("accept: SetRemoteDescription: %s", err)
 	}
-	pc.SetLocalDescription(answer)
+	log.Println("sdp offer successfully received.")
+
+	go func() {
+		log.Println("Generating answer...")
+		answer, err := pc.CreateAnswer() // blocking
+		if err != nil {
+			errChan <- err
+			return
+		}
+		err = pc.SetLocalDescription(answer)
+		if err != nil {
+			errChan <- err
+			return
+		}
+	}()
+
+	// Wait until answer is ready.
+	select {
+	case err = <-errChan:
+		pc.Close()
+		return nil, err
+	case <-answerChan:
+	}
+
+	return pc, nil
 }
 
-func listenWebRTC(config *webrtc.Configuration, signal string) (*os.File, error) {
-	err := syscall.Mkfifo(signal, 0600)
+// Create a signaling named pipe and feed offers from it into
+// makePeerConnectionFromOffer.
+func receiveSignalsFIFO(filename string, config *webrtc.Configuration) error {
+	err := syscall.Mkfifo(filename, 0600)
 	if err != nil {
 		if err.(syscall.Errno) != syscall.EEXIST {
-			return nil, err
+			return err
 		}
 	}
-	signalFile, err := os.OpenFile(signal, os.O_RDONLY, 0600)
+	signalFile, err := os.OpenFile(filename, os.O_RDONLY, 0600)
 	if err != nil {
-		return nil, err
+		return err
 	}
-	//defer signalFile.Close()
+	defer signalFile.Close()
 
-	var signalChan = make(chan *webrtc.SessionDescription)
-
-	go func() {
-		for {
-			select {
-			case sdp := <-signalChan:
-				pc, err := makePeerConnection(config)
-				if err != nil {
-					log.Printf("makePeerConnection: %s", err)
-					break
-				}
-				err = pc.SetRemoteDescription(sdp)
-				if err != nil {
-					fmt.Println("ERROR", err)
-					break
-				}
-				fmt.Println("sdp offer successfully received.")
-				go generateAnswer(pc)
-			}
+	s := bufio.NewScanner(signalFile)
+	for s.Scan() {
+		msg := s.Text()
+		sdp := webrtc.DeserializeSessionDescription(msg)
+		if sdp == nil {
+			log.Printf("ignoring invalid signal message %+q", msg)
+			continue
 		}
-	}()
 
-	go readSignalingMessages(signalChan, signalFile)
-	log.Printf("waiting for offer")
-	return signalFile, nil
+		pc, err := makePeerConnectionFromOffer(sdp, config)
+		if err != nil {
+			log.Printf("makePeerConnectionFromOffer: %s", err)
+			continue
+		}
+		// Write offer to log for manual signaling.
+		log.Printf("----------------")
+		fmt.Fprintln(logFile, pc.LocalDescription().Serialize())
+		log.Printf("----------------")
+	}
+	return s.Err()
 }
 
 func main() {
@@ -228,18 +232,19 @@ func main() {
 
 	webRTCConfig := webrtc.NewConfiguration(webrtc.OptionIceServer("stun:stun.l.google.com:19302"))
 
-	listeners := make([]*os.File, 0)
+	// Start FIFO-based signaling receiver.
+	go func() {
+		err := receiveSignalsFIFO("signal", webRTCConfig)
+		if err != nil {
+			log.Printf("receiveSignalsFIFO: %s", err)
+		}
+	}()
+
 	for _, bindaddr := range ptInfo.Bindaddrs {
 		switch bindaddr.MethodName {
 		case ptMethodName:
-			ln, err := listenWebRTC(webRTCConfig, "signal") // meh
-			if err != nil {
-				pt.SmethodError(bindaddr.MethodName, err.Error())
-				break
-			}
 			bindaddr.Addr.Port = 12345 // lies!!!
 			pt.Smethod(bindaddr.MethodName, bindaddr.Addr)
-			listeners = append(listeners, ln)
 		default:
 			pt.SmethodError(bindaddr.MethodName, "no such method")
 		}
@@ -260,9 +265,6 @@ func main() {
 		case sig = <-sigChan:
 		}
 	}
-	for _, ln := range listeners {
-		ln.Close()
-	}
 
 	if sig == syscall.SIGTERM {
 		return





More information about the tor-commits mailing list