commit ac8669b38f8b2a363b5f46036ea8e5de5a69c121 Author: David Fifield david@bamsoftware.com Date: Mon Jan 18 21:13:08 2016 -0800
Refactor signal receiving in server.
There's one FIFO reader goroutine instead of one per bindaddr. makePeerConnectionFromOffer gives you a PeerConnection with an answer and also sets up callbacks to pass a webRTCConn to datachannelHandler when ready. --- server/snowflake.go | 196 ++++++++++++++++++++++++++------------------------- 1 file changed, 99 insertions(+), 97 deletions(-)
diff --git a/server/snowflake.go b/server/snowflake.go index d046998..88a07d8 100644 --- a/server/snowflake.go +++ b/server/snowflake.go @@ -81,6 +81,8 @@ func (c *webRTCConn) SetWriteDeadline(t time.Time) error { }
func datachannelHandler(conn *webRTCConn) { + defer conn.Close() + handlerChan <- 1 defer func() { handlerChan <- -1 @@ -91,121 +93,123 @@ func datachannelHandler(conn *webRTCConn) { log.Printf("Failed to connect to ORPort: " + err.Error()) return } - //defer or.Close() - - pr, pw := io.Pipe() - conn.pr = pr - - dc := conn.dc - dc.OnOpen = func() { - log.Println("OnOpen channel") - } - dc.OnClose = func() { - log.Println("OnClose channel") - pw.Close() - } - dc.OnMessage = func(msg []byte) { - // log.Printf("OnMessage channel %d %+q", len(msg), msg) - log.Printf("OnMessage <--- %d bytes", len(msg)) - n, err := pw.Write(msg) - if err != nil { - pw.CloseWithError(err) - } - if n != len(msg) { - panic("short write") - } - } + defer or.Close()
- go copyLoop(conn, or) + copyLoop(conn, or) }
-func makePeerConnection(config *webrtc.Configuration) (*webrtc.PeerConnection, error) { - pc, err := webrtc.NewPeerConnection(config) +// Create a PeerConnection from an SDP offer. Blocks until the gathering of ICE +// candidates is complete and and answer is available in LocalDescription. +// Installs an OnDataChannel callback that creates a webRTCConn and passes it to +// datachannelHandler. +func makePeerConnectionFromOffer(sdp *webrtc.SessionDescription, config *webrtc.Configuration) (*webrtc.PeerConnection, error) { + errChan := make(chan error) + answerChan := make(chan *webrtc.SessionDescription)
+ pc, err := webrtc.NewPeerConnection(config) if err != nil { - log.Printf("NewPeerConnection: %s", err) - return nil, err + return nil, fmt.Errorf("accept: NewPeerConnection: %s", err) } pc.OnNegotiationNeeded = func() { panic("OnNegotiationNeeded") } - pc.OnDataChannel = func(dc *data.Channel) { - log.Println("OnDataChannel") - datachannelHandler(&webRTCConn{pc: pc, dc: dc}) - } pc.OnIceComplete = func() { - log.Printf("----------------") - fmt.Fprintln(logFile, pc.LocalDescription().Serialize()) - log.Printf("----------------") + answerChan <- pc.LocalDescription() } - return pc, nil -} + pc.OnDataChannel = func(dc *data.Channel) { + log.Println("OnDataChannel")
-func readSignalingMessages(signalChan chan *webrtc.SessionDescription, f *os.File) { - s := bufio.NewScanner(f) - for s.Scan() { - msg := s.Text() - sdp := webrtc.DeserializeSessionDescription(msg) - if sdp == nil { - log.Printf("ignoring invalid signal message %+q", msg) - continue + pr, pw := io.Pipe() + + dc.OnOpen = func() { + log.Println("OnOpen channel") } - signalChan <- sdp - continue - } - if err := s.Err(); err != nil { - log.Printf("signal FIFO: %s", err) + dc.OnClose = func() { + log.Println("OnClose channel") + pw.Close() + } + dc.OnMessage = func(msg []byte) { + log.Printf("OnMessage <--- %d bytes", len(msg)) + n, err := pw.Write(msg) + if err != nil { + pw.CloseWithError(err) + } + if n != len(msg) { + panic("short write") + } + } + + conn := &webRTCConn{pc: pc, dc: dc, pr: pr} + go datachannelHandler(conn) } -}
-func generateAnswer(pc *webrtc.PeerConnection) { - fmt.Println("Generating answer...") - answer, err := pc.CreateAnswer() // blocking + err = pc.SetRemoteDescription(sdp) if err != nil { - fmt.Println(err) - return + pc.Close() + return nil, fmt.Errorf("accept: SetRemoteDescription: %s", err) } - pc.SetLocalDescription(answer) + log.Println("sdp offer successfully received.") + + go func() { + log.Println("Generating answer...") + answer, err := pc.CreateAnswer() // blocking + if err != nil { + errChan <- err + return + } + err = pc.SetLocalDescription(answer) + if err != nil { + errChan <- err + return + } + }() + + // Wait until answer is ready. + select { + case err = <-errChan: + pc.Close() + return nil, err + case <-answerChan: + } + + return pc, nil }
-func listenWebRTC(config *webrtc.Configuration, signal string) (*os.File, error) { - err := syscall.Mkfifo(signal, 0600) +// Create a signaling named pipe and feed offers from it into +// makePeerConnectionFromOffer. +func receiveSignalsFIFO(filename string, config *webrtc.Configuration) error { + err := syscall.Mkfifo(filename, 0600) if err != nil { if err.(syscall.Errno) != syscall.EEXIST { - return nil, err + return err } } - signalFile, err := os.OpenFile(signal, os.O_RDONLY, 0600) + signalFile, err := os.OpenFile(filename, os.O_RDONLY, 0600) if err != nil { - return nil, err + return err } - //defer signalFile.Close() + defer signalFile.Close()
- var signalChan = make(chan *webrtc.SessionDescription) - - go func() { - for { - select { - case sdp := <-signalChan: - pc, err := makePeerConnection(config) - if err != nil { - log.Printf("makePeerConnection: %s", err) - break - } - err = pc.SetRemoteDescription(sdp) - if err != nil { - fmt.Println("ERROR", err) - break - } - fmt.Println("sdp offer successfully received.") - go generateAnswer(pc) - } + s := bufio.NewScanner(signalFile) + for s.Scan() { + msg := s.Text() + sdp := webrtc.DeserializeSessionDescription(msg) + if sdp == nil { + log.Printf("ignoring invalid signal message %+q", msg) + continue } - }()
- go readSignalingMessages(signalChan, signalFile) - log.Printf("waiting for offer") - return signalFile, nil + pc, err := makePeerConnectionFromOffer(sdp, config) + if err != nil { + log.Printf("makePeerConnectionFromOffer: %s", err) + continue + } + // Write offer to log for manual signaling. + log.Printf("----------------") + fmt.Fprintln(logFile, pc.LocalDescription().Serialize()) + log.Printf("----------------") + } + return s.Err() }
func main() { @@ -228,18 +232,19 @@ func main() {
webRTCConfig := webrtc.NewConfiguration(webrtc.OptionIceServer("stun:stun.l.google.com:19302"))
- listeners := make([]*os.File, 0) + // Start FIFO-based signaling receiver. + go func() { + err := receiveSignalsFIFO("signal", webRTCConfig) + if err != nil { + log.Printf("receiveSignalsFIFO: %s", err) + } + }() + for _, bindaddr := range ptInfo.Bindaddrs { switch bindaddr.MethodName { case ptMethodName: - ln, err := listenWebRTC(webRTCConfig, "signal") // meh - if err != nil { - pt.SmethodError(bindaddr.MethodName, err.Error()) - break - } bindaddr.Addr.Port = 12345 // lies!!! pt.Smethod(bindaddr.MethodName, bindaddr.Addr) - listeners = append(listeners, ln) default: pt.SmethodError(bindaddr.MethodName, "no such method") } @@ -260,9 +265,6 @@ func main() { case sig = <-sigChan: } } - for _, ln := range listeners { - ln.Close() - }
if sig == syscall.SIGTERM { return
tor-commits@lists.torproject.org