commit 0428797ea09814afb36796014e96d911c2ecc554 Author: Cecylia Bocovich cohosh@torproject.org Date: Fri Jun 14 18:46:07 2019 -0400
Modified proxy-go to use pion/webrtc
The API is very similar; the differences were mostly due to:
- closing peer connections and datachannels (no destroy/delete methods)
- different way to set datachannel/peer connection callbacks
- differences in whether functions take pointers or values
- no serialize/deserialize functions in the API
---
 proxy-go/snowflake.go | 119 +++++++++++++++++++++++++++++++++++---------------
 1 file changed, 84 insertions(+), 35 deletions(-)
diff --git a/proxy-go/snowflake.go b/proxy-go/snowflake.go index b3c863a..e0c9def 100644 --- a/proxy-go/snowflake.go +++ b/proxy-go/snowflake.go @@ -4,6 +4,7 @@ import ( "bytes" "crypto/rand" "encoding/base64" + "encoding/json" "flag" "fmt" "io" @@ -19,7 +20,7 @@ import ( "time"
"git.torproject.org/pluggable-transports/snowflake.git/common/safelog" - "github.com/keroserene/go-webrtc" + "github.com/pion/webrtc" "golang.org/x/net/websocket" )
@@ -43,7 +44,7 @@ const (
var ( tokens chan bool - config *webrtc.Configuration + config webrtc.Configuration client http.Client )
@@ -92,7 +93,7 @@ func (c *webRTCConn) Write(b []byte) (int, error) {
func (c *webRTCConn) Close() (err error) { c.once.Do(func() { - err = c.pc.Destroy() + err = c.pc.Close() }) return } @@ -103,7 +104,7 @@ func (c *webRTCConn) LocalAddr() net.Addr {
func (c *webRTCConn) RemoteAddr() net.Addr { //Parse Remote SDP offer and extract client IP - clientIP := remoteIPFromSDP(c.pc.RemoteDescription().Sdp) + clientIP := remoteIPFromSDP(c.pc.RemoteDescription().SDP) if clientIP == nil { return nil } @@ -178,7 +179,7 @@ func pollOffer(sid string) *webrtc.SessionDescription { if err != nil { log.Printf("error reading broker response: %s", err) } else { - return webrtc.DeserializeSessionDescription(string(body)) + return deserializeSessionDescription(string(body)) } } } @@ -187,7 +188,7 @@ func pollOffer(sid string) *webrtc.SessionDescription {
func sendAnswer(sid string, pc *webrtc.PeerConnection) error { broker := brokerURL.ResolveReference(&url.URL{Path: "answer"}) - body := bytes.NewBuffer([]byte(pc.LocalDescription().Serialize())) + body := bytes.NewBuffer([]byte(serializeSessionDescription(pc.LocalDescription()))) req, _ := http.NewRequest("POST", broker.String(), body) req.Header.Set("X-Session-ID", sid) resp, err := client.Do(req) @@ -275,71 +276,63 @@ func datachannelHandler(conn *webRTCConn, remoteAddr net.Addr) { // candidates is complete and the answer is available in LocalDescription. // Installs an OnDataChannel callback that creates a webRTCConn and passes it to // datachannelHandler. -func makePeerConnectionFromOffer(sdp *webrtc.SessionDescription, config *webrtc.Configuration, dataChan chan struct{}) (*webrtc.PeerConnection, error) { +func makePeerConnectionFromOffer(sdp *webrtc.SessionDescription, config webrtc.Configuration, dataChan chan struct{}) (*webrtc.PeerConnection, error) { pc, err := webrtc.NewPeerConnection(config) if err != nil { return nil, fmt.Errorf("accept: NewPeerConnection: %s", err) } - pc.OnNegotiationNeeded = func() { - panic("OnNegotiationNeeded") - } - pc.OnDataChannel = func(dc *webrtc.DataChannel) { + pc.OnDataChannel(func(dc *webrtc.DataChannel) { log.Println("OnDataChannel") close(dataChan)
pr, pw := io.Pipe() conn := &webRTCConn{pc: pc, dc: dc, pr: pr}
- dc.OnOpen = func() { + dc.OnOpen(func() { log.Println("OnOpen channel") - } - dc.OnClose = func() { + }) + dc.OnClose(func() { conn.lock.Lock() defer conn.lock.Unlock() log.Println("OnClose channel") conn.dc = nil - pc.DeleteDataChannel(dc) + dc.Close() pw.Close() - } - dc.OnMessage = func(msg []byte) { - log.Printf("OnMessage <--- %d bytes", len(msg)) - n, err := pw.Write(msg) + }) + dc.OnMessage(func(msg webrtc.DataChannelMessage) { + log.Printf("OnMessage <--- %d bytes", len(msg.Data)) + n, err := pw.Write(msg.Data) if err != nil { pw.CloseWithError(err) } - if n != len(msg) { + if n != len(msg.Data) { panic("short write") } - } + })
go datachannelHandler(conn, conn.RemoteAddr()) - } + })
- err = pc.SetRemoteDescription(sdp) + err = pc.SetRemoteDescription(*sdp) if err != nil { - pc.Destroy() + pc.Close() return nil, fmt.Errorf("accept: SetRemoteDescription: %s", err) } log.Println("sdp offer successfully received.")
log.Println("Generating answer...") - answer, err := pc.CreateAnswer() + answer, err := pc.CreateAnswer(nil) // blocks on ICE gathering. we need to add a timeout if needed // not putting this in a separate go routine, because we need // SetLocalDescription(answer) to be called before sendAnswer if err != nil { - pc.Destroy() + pc.Close() return nil, err }
- if answer == nil { - pc.Destroy() - return nil, fmt.Errorf("Failed gathering ICE candidates.") - } - err = pc.SetLocalDescription(answer) if err != nil { - pc.Destroy() + pc.Close() return nil, err }
@@ -363,7 +356,7 @@ func runSession(sid string) { err = sendAnswer(sid, pc) if err != nil { log.Printf("error sending answer to client through broker: %s", err) - pc.Destroy() + pc.Close() retToken() return } @@ -375,7 +368,7 @@ func runSession(sid string) { log.Println("Connection successful.") case <-time.After(dataChannelTimeout): log.Println("Timed out waiting for client to open data channel.") - pc.Destroy() + pc.Close() retToken() } } @@ -422,7 +415,13 @@ func main() { log.Fatalf("invalid relay url: %s", err) }
-	config = webrtc.NewConfiguration(webrtc.OptionIceServer(stunURL))
+	config = webrtc.Configuration{
+		ICEServers: []webrtc.ICEServer{
+			{
+				URLs: []string{stunURL},
+			},
+		},
+	}
 	tokens = make(chan bool, capacity)
 	for i := uint(0); i < capacity; i++ {
 		tokens <- true
@@ -434,3 +433,53 @@ func main() {
 		runSession(sessionID)
 	}
 }
+
+// deserializeSessionDescription parses a JSON session description. It logs and
+// returns nil on invalid JSON, a missing or non-string field, or an unknown type.
+func deserializeSessionDescription(msg string) *webrtc.SessionDescription {
+	var parsed map[string]interface{}
+	if err := json.Unmarshal([]byte(msg), &parsed); err != nil {
+		log.Println(err)
+		return nil
+	}
+	// Checked assertions: an unchecked .(string) panics on non-string JSON.
+	typeName, ok := parsed["type"].(string)
+	if !ok {
+		log.Println("Cannot deserialize SessionDescription without type field.")
+		return nil
+	}
+	sdp, ok := parsed["sdp"].(string)
+	if !ok {
+		log.Println("Cannot deserialize SessionDescription without sdp field.")
+		return nil
+	}
+	var stype webrtc.SDPType
+	switch typeName {
+	case "offer":
+		stype = webrtc.SDPTypeOffer
+	case "pranswer":
+		stype = webrtc.SDPTypePranswer
+	case "answer":
+		stype = webrtc.SDPTypeAnswer
+	case "rollback":
+		stype = webrtc.SDPTypeRollback
+	default:
+		log.Println("Unknown SDP type")
+		return nil
+	}
+	return &webrtc.SessionDescription{Type: stype, SDP: sdp}
+}
+
+// serializeSessionDescription returns desc as JSON, or "" (logged) on nil or error.
+func serializeSessionDescription(desc *webrtc.SessionDescription) string {
+	if desc == nil {
+		log.Println("Cannot serialize nil SessionDescription.")
+		return ""
+	}
+	encoded, err := json.Marshal(*desc)
+	if err != nil {
+		log.Println(err)
+		return ""
+	}
+	return string(encoded)
+}
tor-commits@lists.torproject.org