commit ac8669b38f8b2a363b5f46036ea8e5de5a69c121
Author: David Fifield <david(a)bamsoftware.com>
Date: Mon Jan 18 21:13:08 2016 -0800
Refactor signal receiving in server.
There's one FIFO reader goroutine instead of one per bindaddr.
makePeerConnectionFromOffer gives you a PeerConnection with an answer
and also sets up callbacks to pass a webRTCConn to datachannelHandler
when ready.
---
server/snowflake.go | 196 ++++++++++++++++++++++++++-------------------------
1 file changed, 99 insertions(+), 97 deletions(-)
diff --git a/server/snowflake.go b/server/snowflake.go
index d046998..88a07d8 100644
--- a/server/snowflake.go
+++ b/server/snowflake.go
@@ -81,6 +81,8 @@ func (c *webRTCConn) SetWriteDeadline(t time.Time) error {
}
func datachannelHandler(conn *webRTCConn) {
+ defer conn.Close()
+
handlerChan <- 1
defer func() {
handlerChan <- -1
@@ -91,121 +93,123 @@ func datachannelHandler(conn *webRTCConn) {
log.Printf("Failed to connect to ORPort: " + err.Error())
return
}
- //defer or.Close()
-
- pr, pw := io.Pipe()
- conn.pr = pr
-
- dc := conn.dc
- dc.OnOpen = func() {
- log.Println("OnOpen channel")
- }
- dc.OnClose = func() {
- log.Println("OnClose channel")
- pw.Close()
- }
- dc.OnMessage = func(msg []byte) {
- // log.Printf("OnMessage channel %d %+q", len(msg), msg)
- log.Printf("OnMessage <--- %d bytes", len(msg))
- n, err := pw.Write(msg)
- if err != nil {
- pw.CloseWithError(err)
- }
- if n != len(msg) {
- panic("short write")
- }
- }
+ defer or.Close()
- go copyLoop(conn, or)
+ copyLoop(conn, or)
}
-func makePeerConnection(config *webrtc.Configuration) (*webrtc.PeerConnection, error) {
- pc, err := webrtc.NewPeerConnection(config)
+// Create a PeerConnection from an SDP offer. Blocks until the gathering of ICE
+// candidates is complete and and answer is available in LocalDescription.
+// Installs an OnDataChannel callback that creates a webRTCConn and passes it to
+// datachannelHandler.
+func makePeerConnectionFromOffer(sdp *webrtc.SessionDescription, config *webrtc.Configuration) (*webrtc.PeerConnection, error) {
+ errChan := make(chan error)
+ answerChan := make(chan *webrtc.SessionDescription)
+ pc, err := webrtc.NewPeerConnection(config)
if err != nil {
- log.Printf("NewPeerConnection: %s", err)
- return nil, err
+ return nil, fmt.Errorf("accept: NewPeerConnection: %s", err)
}
pc.OnNegotiationNeeded = func() {
panic("OnNegotiationNeeded")
}
- pc.OnDataChannel = func(dc *data.Channel) {
- log.Println("OnDataChannel")
- datachannelHandler(&webRTCConn{pc: pc, dc: dc})
- }
pc.OnIceComplete = func() {
- log.Printf("----------------")
- fmt.Fprintln(logFile, pc.LocalDescription().Serialize())
- log.Printf("----------------")
+ answerChan <- pc.LocalDescription()
}
- return pc, nil
-}
+ pc.OnDataChannel = func(dc *data.Channel) {
+ log.Println("OnDataChannel")
-func readSignalingMessages(signalChan chan *webrtc.SessionDescription, f *os.File) {
- s := bufio.NewScanner(f)
- for s.Scan() {
- msg := s.Text()
- sdp := webrtc.DeserializeSessionDescription(msg)
- if sdp == nil {
- log.Printf("ignoring invalid signal message %+q", msg)
- continue
+ pr, pw := io.Pipe()
+
+ dc.OnOpen = func() {
+ log.Println("OnOpen channel")
}
- signalChan <- sdp
- continue
- }
- if err := s.Err(); err != nil {
- log.Printf("signal FIFO: %s", err)
+ dc.OnClose = func() {
+ log.Println("OnClose channel")
+ pw.Close()
+ }
+ dc.OnMessage = func(msg []byte) {
+ log.Printf("OnMessage <--- %d bytes", len(msg))
+ n, err := pw.Write(msg)
+ if err != nil {
+ pw.CloseWithError(err)
+ }
+ if n != len(msg) {
+ panic("short write")
+ }
+ }
+
+ conn := &webRTCConn{pc: pc, dc: dc, pr: pr}
+ go datachannelHandler(conn)
}
-}
-func generateAnswer(pc *webrtc.PeerConnection) {
- fmt.Println("Generating answer...")
- answer, err := pc.CreateAnswer() // blocking
+ err = pc.SetRemoteDescription(sdp)
if err != nil {
- fmt.Println(err)
- return
+ pc.Close()
+ return nil, fmt.Errorf("accept: SetRemoteDescription: %s", err)
}
- pc.SetLocalDescription(answer)
+ log.Println("sdp offer successfully received.")
+
+ go func() {
+ log.Println("Generating answer...")
+ answer, err := pc.CreateAnswer() // blocking
+ if err != nil {
+ errChan <- err
+ return
+ }
+ err = pc.SetLocalDescription(answer)
+ if err != nil {
+ errChan <- err
+ return
+ }
+ }()
+
+ // Wait until answer is ready.
+ select {
+ case err = <-errChan:
+ pc.Close()
+ return nil, err
+ case <-answerChan:
+ }
+
+ return pc, nil
}
-func listenWebRTC(config *webrtc.Configuration, signal string) (*os.File, error) {
- err := syscall.Mkfifo(signal, 0600)
+// Create a signaling named pipe and feed offers from it into
+// makePeerConnectionFromOffer.
+func receiveSignalsFIFO(filename string, config *webrtc.Configuration) error {
+ err := syscall.Mkfifo(filename, 0600)
if err != nil {
if err.(syscall.Errno) != syscall.EEXIST {
- return nil, err
+ return err
}
}
- signalFile, err := os.OpenFile(signal, os.O_RDONLY, 0600)
+ signalFile, err := os.OpenFile(filename, os.O_RDONLY, 0600)
if err != nil {
- return nil, err
+ return err
}
- //defer signalFile.Close()
+ defer signalFile.Close()
- var signalChan = make(chan *webrtc.SessionDescription)
-
- go func() {
- for {
- select {
- case sdp := <-signalChan:
- pc, err := makePeerConnection(config)
- if err != nil {
- log.Printf("makePeerConnection: %s", err)
- break
- }
- err = pc.SetRemoteDescription(sdp)
- if err != nil {
- fmt.Println("ERROR", err)
- break
- }
- fmt.Println("sdp offer successfully received.")
- go generateAnswer(pc)
- }
+ s := bufio.NewScanner(signalFile)
+ for s.Scan() {
+ msg := s.Text()
+ sdp := webrtc.DeserializeSessionDescription(msg)
+ if sdp == nil {
+ log.Printf("ignoring invalid signal message %+q", msg)
+ continue
}
- }()
- go readSignalingMessages(signalChan, signalFile)
- log.Printf("waiting for offer")
- return signalFile, nil
+ pc, err := makePeerConnectionFromOffer(sdp, config)
+ if err != nil {
+ log.Printf("makePeerConnectionFromOffer: %s", err)
+ continue
+ }
+ // Write offer to log for manual signaling.
+ log.Printf("----------------")
+ fmt.Fprintln(logFile, pc.LocalDescription().Serialize())
+ log.Printf("----------------")
+ }
+ return s.Err()
}
func main() {
@@ -228,18 +232,19 @@ func main() {
webRTCConfig := webrtc.NewConfiguration(webrtc.OptionIceServer("stun:stun.l.google.com:19302"))
- listeners := make([]*os.File, 0)
+ // Start FIFO-based signaling receiver.
+ go func() {
+ err := receiveSignalsFIFO("signal", webRTCConfig)
+ if err != nil {
+ log.Printf("receiveSignalsFIFO: %s", err)
+ }
+ }()
+
for _, bindaddr := range ptInfo.Bindaddrs {
switch bindaddr.MethodName {
case ptMethodName:
- ln, err := listenWebRTC(webRTCConfig, "signal") // meh
- if err != nil {
- pt.SmethodError(bindaddr.MethodName, err.Error())
- break
- }
bindaddr.Addr.Port = 12345 // lies!!!
pt.Smethod(bindaddr.MethodName, bindaddr.Addr)
- listeners = append(listeners, ln)
default:
pt.SmethodError(bindaddr.MethodName, "no such method")
}
@@ -260,9 +265,6 @@ func main() {
case sig = <-sigChan:
}
}
- for _, ln := range listeners {
- ln.Close()
- }
if sig == syscall.SIGTERM {
return