[tor-commits] [snowflake/master] Ported snowflake client to work with pion/webrtc

cohosh at torproject.org cohosh at torproject.org
Tue Oct 8 13:55:29 UTC 2019


commit b5c50b69d080fe488620367a1203ec3c67bb93b1
Author: Cecylia Bocovich <cohosh at torproject.org>
Date:   Tue Jun 25 16:35:37 2019 -0400

    Ported snowflake client to work with pion/webrtc
    
    Modified the snowflake client to use pion/webrtc as the webrtc library.
    This involved a few small changes to match function signatures as well
    as several larger ones:
    - OnNegotiationNeeded is no longer supported, so CreateOffer and
    SetLocalDescription have been moved to a goroutine started after the
    other peer connection callbacks are set
    - We need our own deserialize/serialize functions
    - We need to use a SettingEngine in order to access the
    OnICEGatheringStateChange callback
---
 client/lib/interfaces.go |  2 +-
 client/lib/lib_test.go   | 14 +++----
 client/lib/rendezvous.go | 21 ++++++-----
 client/lib/util.go       | 59 +++++++++++++++++++++++++----
 client/lib/webrtc.go     | 98 ++++++++++++++++++++++++++----------------------
 client/snowflake.go      | 28 ++++++++++----
 6 files changed, 146 insertions(+), 76 deletions(-)

diff --git a/client/lib/interfaces.go b/client/lib/interfaces.go
index f62d4f5..609e610 100644
--- a/client/lib/interfaces.go
+++ b/client/lib/interfaces.go
@@ -52,5 +52,5 @@ type SocksConnector interface {
 // Interface for the Snowflake's transport. (Typically just webrtc.DataChannel)
 type SnowflakeDataChannel interface {
 	io.Closer
-	Send([]byte)
+	Send([]byte) error
 }
diff --git a/client/lib/lib_test.go b/client/lib/lib_test.go
index 4f74cb3..4e9e2c7 100644
--- a/client/lib/lib_test.go
+++ b/client/lib/lib_test.go
@@ -8,7 +8,7 @@ import (
 	"net/http"
 	"testing"
 
-	"github.com/keroserene/go-webrtc"
+	"github.com/pion/webrtc"
 	. "github.com/smartystreets/goconvey/convey"
 )
 
@@ -17,9 +17,10 @@ type MockDataChannel struct {
 	done        chan bool
 }
 
-func (m *MockDataChannel) Send(data []byte) {
+func (m *MockDataChannel) Send(data []byte) error {
 	m.destination.Write(data)
 	m.done <- true
+	return nil
 }
 
 func (*MockDataChannel) Close() error { return nil }
@@ -217,11 +218,11 @@ func TestSnowflakeClient(t *testing.T) {
 				c.offerChannel = make(chan *webrtc.SessionDescription, 1)
 				c.answerChannel = make(chan *webrtc.SessionDescription, 1)
 
-				c.config = webrtc.NewConfiguration()
+				c.config = &webrtc.Configuration{}
 				c.preparePeerConnection()
 
 				c.offerChannel <- nil
-				answer := webrtc.DeserializeSessionDescription(
+				answer := deserializeSessionDescription(
 					`{"type":"answer","sdp":""}`)
 				c.answerChannel <- answer
 				c.exchangeSDP()
@@ -264,12 +265,11 @@ func TestSnowflakeClient(t *testing.T) {
 	})
 
 	Convey("Rendezvous", t, func() {
-		webrtc.SetLoggingVerbosity(0)
 		transport := &MockTransport{
 			http.StatusOK,
 			[]byte(`{"type":"answer","sdp":"fake"}`),
 		}
-		fakeOffer := webrtc.DeserializeSessionDescription("test")
+		fakeOffer := deserializeSessionDescription(`{"type":"offer","sdp":"test"}`)
 
 		Convey("Construct BrokerChannel with no front domain", func() {
 			b := NewBrokerChannel("test.broker", "", transport)
@@ -291,7 +291,7 @@ func TestSnowflakeClient(t *testing.T) {
 			answer, err := b.Negotiate(fakeOffer)
 			So(err, ShouldBeNil)
 			So(answer, ShouldNotBeNil)
-			So(answer.Sdp, ShouldResemble, "fake")
+			So(answer.SDP, ShouldResemble, "fake")
 		})
 
 		Convey("BrokerChannel.Negotiate fails with 503", func() {
diff --git a/client/lib/rendezvous.go b/client/lib/rendezvous.go
index 54ce459..8f994f4 100644
--- a/client/lib/rendezvous.go
+++ b/client/lib/rendezvous.go
@@ -17,7 +17,7 @@ import (
 	"net/http"
 	"net/url"
 
-	"github.com/keroserene/go-webrtc"
+	"github.com/pion/webrtc"
 )
 
 const (
@@ -84,7 +84,7 @@ func (bc *BrokerChannel) Negotiate(offer *webrtc.SessionDescription) (
 	*webrtc.SessionDescription, error) {
 	log.Println("Negotiating via BrokerChannel...\nTarget URL: ",
 		bc.Host, "\nFront URL:  ", bc.url.Host)
-	data := bytes.NewReader([]byte(offer.Serialize()))
+	data := bytes.NewReader([]byte(serializeSessionDescription(offer)))
 	// Suffix with broker's client registration handler.
 	clientURL := bc.url.ResolveReference(&url.URL{Path: "client"})
 	request, err := http.NewRequest("POST", clientURL.String(), data)
@@ -107,7 +107,7 @@ func (bc *BrokerChannel) Negotiate(offer *webrtc.SessionDescription) (
 		if nil != err {
 			return nil, err
 		}
-		answer := webrtc.DeserializeSessionDescription(string(body))
+		answer := deserializeSessionDescription(string(body))
 		return answer, nil
 
 	case http.StatusServiceUnavailable:
@@ -126,15 +126,18 @@ type WebRTCDialer struct {
 }
 
 func NewWebRTCDialer(
-	broker *BrokerChannel, iceServers IceServerList) *WebRTCDialer {
-	config := webrtc.NewConfiguration(iceServers...)
-	if nil == config {
-		log.Println("Unable to prepare WebRTC configuration.")
-		return nil
+	broker *BrokerChannel, iceServers []webrtc.ICEServer) *WebRTCDialer {
+	var config webrtc.Configuration
+	// Only attach ICE servers when the caller supplied some; otherwise the
+	// zero-value Configuration (no ICE servers) is used.
+	if len(iceServers) > 0 {
+		config = webrtc.Configuration{
+			ICEServers: iceServers,
+		}
 	}
 	return &WebRTCDialer{
 		BrokerChannel: broker,
-		webrtcConfig:  config,
+		webrtcConfig:  &config,
 	}
 }
 
diff --git a/client/lib/util.go b/client/lib/util.go
index 028fb1c..f385279 100644
--- a/client/lib/util.go
+++ b/client/lib/util.go
@@ -1,23 +1,17 @@
 package lib
 
 import (
-	"fmt"
+	"encoding/json"
 	"log"
 	"time"
 
-	"github.com/keroserene/go-webrtc"
+	"github.com/pion/webrtc"
 )
 
 const (
 	LogTimeInterval = 5
 )
 
-type IceServerList []webrtc.ConfigurationOption
-
-func (i *IceServerList) String() string {
-	return fmt.Sprint(*i)
-}
-
 type BytesLogger interface {
 	Log()
 	AddOutbound(int)
@@ -93,3 +87,52 @@ func (b *BytesSyncLogger) AddInbound(amount int) {
 	}
 	b.InboundChan <- amount
 }
+// deserializeSessionDescription parses a JSON-encoded session description
+// of the form {"type": ..., "sdp": ...}. It returns nil on malformed input.
+func deserializeSessionDescription(msg string) *webrtc.SessionDescription {
+	var parsed map[string]interface{}
+	err := json.Unmarshal([]byte(msg), &parsed)
+	if nil != err {
+		log.Println(err)
+		return nil
+	}
+	if _, ok := parsed["type"]; !ok {
+		log.Println("Cannot deserialize SessionDescription without type field.")
+		return nil
+	}
+	// Checked assertion: a missing or non-string sdp must not panic.
+	sdp, ok := parsed["sdp"].(string)
+	if !ok {
+		log.Println("Cannot deserialize SessionDescription without sdp field.")
+		return nil
+	}
+	var stype webrtc.SDPType
+	// A non-string type yields "" here and falls through to default.
+	typeStr, _ := parsed["type"].(string)
+	switch typeStr {
+	default:
+		log.Println("Unknown SDP type")
+		return nil
+	case "offer":
+		stype = webrtc.SDPTypeOffer
+	case "pranswer":
+		stype = webrtc.SDPTypePranswer
+	case "answer":
+		stype = webrtc.SDPTypeAnswer
+	case "rollback":
+		stype = webrtc.SDPTypeRollback
+	}
+	return &webrtc.SessionDescription{
+		Type: stype,
+		SDP:  sdp,
+	}
+}
+
+func serializeSessionDescription(desc *webrtc.SessionDescription) string {
+	bytes, err := json.Marshal(*desc)
+	if nil != err {
+		log.Println(err)
+		return ""
+	}
+	return string(bytes)
+}
diff --git a/client/lib/webrtc.go b/client/lib/webrtc.go
index 6406da5..dbc205e 100644
--- a/client/lib/webrtc.go
+++ b/client/lib/webrtc.go
@@ -9,7 +9,7 @@ import (
 	"time"
 
 	"github.com/dchest/uniuri"
-	"github.com/keroserene/go-webrtc"
+	"github.com/pion/webrtc"
 )
 
 // Remote WebRTC peer.
@@ -151,48 +151,54 @@ func (c *WebRTCPeer) Connect() error {
 // Create and prepare callbacks on a new WebRTC PeerConnection.
 func (c *WebRTCPeer) preparePeerConnection() error {
 	if nil != c.pc {
-		c.pc.Destroy()
+		c.pc.Close()
 		c.pc = nil
 	}
-	pc, err := webrtc.NewPeerConnection(c.config)
+	s := webrtc.SettingEngine{}
+	s.SetTrickle(true)
+	api := webrtc.NewAPI(webrtc.WithSettingEngine(s))
+	pc, err := api.NewPeerConnection(*c.config)
 	if err != nil {
 		log.Printf("NewPeerConnection ERROR: %s", err)
 		return err
 	}
 	// Prepare PeerConnection callbacks.
-	pc.OnNegotiationNeeded = func() {
-		log.Println("WebRTC: OnNegotiationNeeded")
-		go func() {
-			offer, err := pc.CreateOffer()
-			// TODO: Potentially timeout and retry if ICE isn't working.
-			if err != nil {
-				c.errorChannel <- err
-				return
-			}
-			err = pc.SetLocalDescription(offer)
-			if err != nil {
-				c.errorChannel <- err
-				return
-			}
-		}()
-	}
-	// Allow candidates to accumulate until IceGatheringStateComplete.
-	pc.OnIceCandidate = func(candidate webrtc.IceCandidate) {
-		log.Printf(candidate.Candidate)
-	}
-	pc.OnIceGatheringStateChange = func(state webrtc.IceGatheringState) {
-		if state == webrtc.IceGatheringStateComplete {
-			log.Printf("WebRTC: IceGatheringStateComplete")
+	// Allow candidates to accumulate until ICEGatheringStateComplete.
+	pc.OnICECandidate(func(candidate *webrtc.ICECandidate) {
+		if candidate == nil {
+			log.Printf("WebRTC: Done gathering candidates")
+		} else {
+			log.Printf("WebRTC: Got ICE candidate: %s", candidate.String())
+		}
+	})
+	pc.OnICEGatheringStateChange(func(state webrtc.ICEGathererState) {
+		if state == webrtc.ICEGathererStateComplete {
+			log.Println("WebRTC: ICEGatheringStateComplete")
 			c.offerChannel <- pc.LocalDescription()
 		}
-	}
+	})
 	// This callback is not expected, as the Client initiates the creation
 	// of the data channel, not the remote peer.
-	pc.OnDataChannel = func(channel *webrtc.DataChannel) {
+	pc.OnDataChannel(func(channel *webrtc.DataChannel) {
 		log.Println("OnDataChannel")
 		panic("Unexpected OnDataChannel!")
-	}
+	})
 	c.pc = pc
+	go func() {
+		offer, err := pc.CreateOffer(nil)
+		// TODO: Potentially timeout and retry if ICE isn't working.
+		if err != nil {
+			c.errorChannel <- err
+			return
+		}
+		log.Println("WebRTC: Created offer")
+		err = pc.SetLocalDescription(offer)
+		if err != nil {
+			c.errorChannel <- err
+			return
+		}
+		log.Println("WebRTC: Set local description")
+	}()
 	log.Println("WebRTC: PeerConnection created.")
 	return nil
 }
@@ -204,7 +210,11 @@ func (c *WebRTCPeer) establishDataChannel() error {
 	if c.transport != nil {
 		panic("Unexpected datachannel already exists!")
 	}
-	dc, err := c.pc.CreateDataChannel(c.id)
+	ordered := true
+	dataChannelOptions := &webrtc.DataChannelInit{
+		Ordered: &ordered,
+	}
+	dc, err := c.pc.CreateDataChannel(c.id, dataChannelOptions)
 	// Triggers "OnNegotiationNeeded" on the PeerConnection, which will prepare
 	// an SDP offer while other goroutines operating on this struct handle the
 	// signaling. Eventually fires "OnOpen".
@@ -212,7 +222,7 @@ func (c *WebRTCPeer) establishDataChannel() error {
 		log.Printf("CreateDataChannel ERROR: %s", err)
 		return err
 	}
-	dc.OnOpen = func() {
+	dc.OnOpen(func() {
 		c.lock.Lock()
 		defer c.lock.Unlock()
 		log.Println("WebRTC: DataChannel.OnOpen")
@@ -227,8 +237,8 @@ func (c *WebRTCPeer) establishDataChannel() error {
 		}
 		// Then enable the datachannel.
 		c.transport = dc
-	}
-	dc.OnClose = func() {
+	})
+	dc.OnClose(func() {
 		c.lock.Lock()
 		// Future writes will go to the buffer until a new DataChannel is available.
 		if nil == c.transport {
@@ -241,29 +251,29 @@ func (c *WebRTCPeer) establishDataChannel() error {
 		// Disable the DataChannel as a write destination.
 		log.Println("WebRTC: DataChannel.OnClose [remotely]")
 		c.transport = nil
-		c.pc.DeleteDataChannel(dc)
+		dc.Close()
 		// Unlock before Close'ing, since it calls cleanup and asks for the
 		// lock to check if the transport needs to be be deleted.
 		c.lock.Unlock()
 		c.Close()
-	}
-	dc.OnMessage = func(msg []byte) {
-		if len(msg) <= 0 {
+	})
+	dc.OnMessage(func(msg webrtc.DataChannelMessage) {
+		if len(msg.Data) <= 0 {
 			log.Println("0 length message---")
 		}
-		c.BytesLogger.AddInbound(len(msg))
-		n, err := c.writePipe.Write(msg)
+		c.BytesLogger.AddInbound(len(msg.Data))
+		n, err := c.writePipe.Write(msg.Data)
 		if err != nil {
 			// TODO: Maybe shouldn't actually close.
 			log.Println("Error writing to SOCKS pipe")
 			c.writePipe.CloseWithError(err)
 		}
-		if n != len(msg) {
+		if n != len(msg.Data) {
 			log.Println("Error: short write")
 			panic("short write")
 		}
 		c.lastReceive = time.Now()
-	}
+	})
 	log.Println("WebRTC: DataChannel created.")
 	return nil
 }
@@ -304,7 +314,7 @@ func (c *WebRTCPeer) exchangeSDP() error {
 		}
 	}
 	log.Printf("Received Answer.\n")
-	err := c.pc.SetRemoteDescription(answer)
+	err := c.pc.SetRemoteDescription(*answer)
 	if nil != err {
 		log.Println("WebRTC: Unable to SetRemoteDescription:", err)
 		return err
@@ -342,13 +352,13 @@ func (c *WebRTCPeer) cleanup() {
 		if c.pc == nil {
 			panic("DataChannel w/o PeerConnection, not good.")
 		}
-		c.pc.DeleteDataChannel(dataChannel.(*webrtc.DataChannel))
+		dataChannel.(*webrtc.DataChannel).Close()
 	} else {
 		c.lock.Unlock()
 	}
 	if nil != c.pc {
 		log.Printf("WebRTC: closing PeerConnection")
-		err := c.pc.Destroy()
+		err := c.pc.Close()
 		if nil != err {
 			log.Printf("Error closing peerconnection...")
 		}
diff --git a/client/snowflake.go b/client/snowflake.go
index 9098de7..01c89d8 100644
--- a/client/snowflake.go
+++ b/client/snowflake.go
@@ -17,7 +17,7 @@ import (
 	"git.torproject.org/pluggable-transports/goptlib.git"
 	sf "git.torproject.org/pluggable-transports/snowflake.git/client/lib"
 	"git.torproject.org/pluggable-transports/snowflake.git/common/safelog"
-	"github.com/keroserene/go-webrtc"
+	"github.com/pion/webrtc"
 )
 
 const (
@@ -65,6 +65,25 @@ func socksAcceptLoop(ln *pt.SocksListener, snowflakes sf.SnowflakeCollector) err
 	}
 }
 
+// parseIceServers parses s, a comma-separated list of ICE server URLs.
+func parseIceServers(s string) []webrtc.ICEServer {
+	var servers []webrtc.ICEServer
+	log.Println(s)
+	s = strings.TrimSpace(s)
+	if len(s) == 0 {
+		return nil
+	}
+	urls := strings.Split(s, ",")
+	log.Printf("Using ICE Servers:")
+	for _, url := range urls {
+		// Tolerate whitespace around the commas.
+		url = strings.TrimSpace(url)
+		log.Printf("url: %s", url)
+		servers = append(servers, webrtc.ICEServer{URLs: []string{url}})
+	}
+	return servers
+}
+
 func main() {
 	iceServersCommas := flag.String("ice", "", "comma-separated list of ICE servers")
 	brokerURL := flag.String("url", "", "URL of signaling broker")
@@ -75,7 +94,6 @@ func main() {
 		"capacity for number of multiplexed WebRTC peers")
 	flag.Parse()
 
-	webrtc.SetLoggingVerbosity(1)
 	log.SetFlags(log.LstdFlags | log.LUTC)
 
 	// Don't write to stderr; versions of tor earlier than about
@@ -105,11 +123,7 @@ func main() {
 
 	log.Println("\n\n\n --- Starting Snowflake Client ---")
 
-	var iceServers sf.IceServerList
-	if len(strings.TrimSpace(*iceServersCommas)) > 0 {
-		option := webrtc.OptionIceServer(*iceServersCommas)
-		iceServers = append(iceServers, option)
-	}
+	iceServers := parseIceServers(*iceServersCommas)
 
 	// Prepare to collect remote WebRTC peers.
 	snowflakes := sf.NewPeers(*max)





More information about the tor-commits mailing list