commit b5c50b69d080fe488620367a1203ec3c67bb93b1
Author: Cecylia Bocovich cohosh@torproject.org
Date:   Tue Jun 25 16:35:37 2019 -0400
Ported snowflake client to work with pion/webrtc
Modified the snowflake client to use pion/webrtc as the webrtc library.
This involved a few small changes to match function signatures as well
as several larger ones:
- OnNegotiationNeeded is no longer supported, so CreateOffer and
  SetLocalDescription have been moved to a goroutine called after the
  other peer connection callbacks are set
- We need our own deserialize/serialize functions
- We need to use a SettingEngine in order to access the
  OnICEGatheringStateChange callback
---
 client/lib/interfaces.go |  2 +-
 client/lib/lib_test.go   | 14 +++----
 client/lib/rendezvous.go | 21 ++++++-----
 client/lib/util.go       | 59 +++++++++++++++++++++++++----
 client/lib/webrtc.go     | 98 ++++++++++++++++++++++++++----------------------
 client/snowflake.go      | 28 ++++++++++----
 6 files changed, 146 insertions(+), 76 deletions(-)
diff --git a/client/lib/interfaces.go b/client/lib/interfaces.go index f62d4f5..609e610 100644 --- a/client/lib/interfaces.go +++ b/client/lib/interfaces.go @@ -52,5 +52,5 @@ type SocksConnector interface { // Interface for the Snowflake's transport. (Typically just webrtc.DataChannel) type SnowflakeDataChannel interface { io.Closer - Send([]byte) + Send([]byte) error } diff --git a/client/lib/lib_test.go b/client/lib/lib_test.go index 4f74cb3..4e9e2c7 100644 --- a/client/lib/lib_test.go +++ b/client/lib/lib_test.go @@ -8,7 +8,7 @@ import ( "net/http" "testing"
- "github.com/keroserene/go-webrtc" + "github.com/pion/webrtc" . "github.com/smartystreets/goconvey/convey" )
@@ -17,9 +17,10 @@ type MockDataChannel struct { done chan bool }
-func (m *MockDataChannel) Send(data []byte) { +func (m *MockDataChannel) Send(data []byte) error { m.destination.Write(data) m.done <- true + return nil }
func (*MockDataChannel) Close() error { return nil } @@ -217,11 +218,11 @@ func TestSnowflakeClient(t *testing.T) { c.offerChannel = make(chan *webrtc.SessionDescription, 1) c.answerChannel = make(chan *webrtc.SessionDescription, 1)
- c.config = webrtc.NewConfiguration() + c.config = &webrtc.Configuration{} c.preparePeerConnection()
c.offerChannel <- nil - answer := webrtc.DeserializeSessionDescription( + answer := deserializeSessionDescription( `{"type":"answer","sdp":""}`) c.answerChannel <- answer c.exchangeSDP() @@ -264,12 +265,11 @@ func TestSnowflakeClient(t *testing.T) { })
Convey("Rendezvous", t, func() { - webrtc.SetLoggingVerbosity(0) transport := &MockTransport{ http.StatusOK, []byte(`{"type":"answer","sdp":"fake"}`), } - fakeOffer := webrtc.DeserializeSessionDescription("test") + fakeOffer := deserializeSessionDescription(`{"type":"offer","sdp":"test"}`)
Convey("Construct BrokerChannel with no front domain", func() { b := NewBrokerChannel("test.broker", "", transport) @@ -291,7 +291,7 @@ func TestSnowflakeClient(t *testing.T) { answer, err := b.Negotiate(fakeOffer) So(err, ShouldBeNil) So(answer, ShouldNotBeNil) - So(answer.Sdp, ShouldResemble, "fake") + So(answer.SDP, ShouldResemble, "fake") })
Convey("BrokerChannel.Negotiate fails with 503", func() { diff --git a/client/lib/rendezvous.go b/client/lib/rendezvous.go index 54ce459..8f994f4 100644 --- a/client/lib/rendezvous.go +++ b/client/lib/rendezvous.go @@ -17,7 +17,7 @@ import ( "net/http" "net/url"
- "github.com/keroserene/go-webrtc" + "github.com/pion/webrtc" )
const ( @@ -84,7 +84,7 @@ func (bc *BrokerChannel) Negotiate(offer *webrtc.SessionDescription) ( *webrtc.SessionDescription, error) { log.Println("Negotiating via BrokerChannel...\nTarget URL: ", bc.Host, "\nFront URL: ", bc.url.Host) - data := bytes.NewReader([]byte(offer.Serialize())) + data := bytes.NewReader([]byte(serializeSessionDescription(offer))) // Suffix with broker's client registration handler. clientURL := bc.url.ResolveReference(&url.URL{Path: "client"}) request, err := http.NewRequest("POST", clientURL.String(), data) @@ -107,7 +107,7 @@ func (bc *BrokerChannel) Negotiate(offer *webrtc.SessionDescription) ( if nil != err { return nil, err } - answer := webrtc.DeserializeSessionDescription(string(body)) + answer := deserializeSessionDescription(string(body)) return answer, nil
case http.StatusServiceUnavailable: @@ -126,15 +126,18 @@ type WebRTCDialer struct { }
func NewWebRTCDialer( - broker *BrokerChannel, iceServers IceServerList) *WebRTCDialer { - config := webrtc.NewConfiguration(iceServers...) - if nil == config { - log.Println("Unable to prepare WebRTC configuration.") - return nil + broker *BrokerChannel, iceServers []webrtc.ICEServer) *WebRTCDialer { + var config webrtc.Configuration + if iceServers == nil { + config = webrtc.Configuration{ + ICEServers: iceServers, + } + } else { + config = webrtc.Configuration{} } return &WebRTCDialer{ BrokerChannel: broker, - webrtcConfig: config, + webrtcConfig: &config, } }
diff --git a/client/lib/util.go b/client/lib/util.go index 028fb1c..f385279 100644 --- a/client/lib/util.go +++ b/client/lib/util.go @@ -1,23 +1,17 @@ package lib
import ( - "fmt" + "encoding/json" "log" "time"
- "github.com/keroserene/go-webrtc" + "github.com/pion/webrtc" )
const ( LogTimeInterval = 5 )
-type IceServerList []webrtc.ConfigurationOption - -func (i *IceServerList) String() string { - return fmt.Sprint(*i) -} - type BytesLogger interface { Log() AddOutbound(int) @@ -93,3 +87,52 @@ func (b *BytesSyncLogger) AddInbound(amount int) { } b.InboundChan <- amount } +func deserializeSessionDescription(msg string) *webrtc.SessionDescription { + var parsed map[string]interface{} + err := json.Unmarshal([]byte(msg), &parsed) + if nil != err { + log.Println(err) + return nil + } + if _, ok := parsed["type"]; !ok { + log.Println("Cannot deserialize SessionDescription without type field.") + return nil + } + if _, ok := parsed["sdp"]; !ok { + log.Println("Cannot deserialize SessionDescription without sdp field.") + return nil + } + + var stype webrtc.SDPType + switch parsed["type"].(string) { + default: + log.Println("Unknown SDP type") + return nil + case "offer": + stype = webrtc.SDPTypeOffer + case "pranswer": + stype = webrtc.SDPTypePranswer + case "answer": + stype = webrtc.SDPTypeAnswer + case "rollback": + stype = webrtc.SDPTypeRollback + } + + if err != nil { + log.Println(err) + return nil + } + return &webrtc.SessionDescription{ + Type: stype, + SDP: parsed["sdp"].(string), + } +} + +func serializeSessionDescription(desc *webrtc.SessionDescription) string { + bytes, err := json.Marshal(*desc) + if nil != err { + log.Println(err) + return "" + } + return string(bytes) +} diff --git a/client/lib/webrtc.go b/client/lib/webrtc.go index 6406da5..dbc205e 100644 --- a/client/lib/webrtc.go +++ b/client/lib/webrtc.go @@ -9,7 +9,7 @@ import ( "time"
"github.com/dchest/uniuri" - "github.com/keroserene/go-webrtc" + "github.com/pion/webrtc" )
// Remote WebRTC peer. @@ -151,48 +151,54 @@ func (c *WebRTCPeer) Connect() error { // Create and prepare callbacks on a new WebRTC PeerConnection. func (c *WebRTCPeer) preparePeerConnection() error { if nil != c.pc { - c.pc.Destroy() + c.pc.Close() c.pc = nil } - pc, err := webrtc.NewPeerConnection(c.config) + s := webrtc.SettingEngine{} + s.SetTrickle(true) + api := webrtc.NewAPI(webrtc.WithSettingEngine(s)) + pc, err := api.NewPeerConnection(*c.config) if err != nil { log.Printf("NewPeerConnection ERROR: %s", err) return err } // Prepare PeerConnection callbacks. - pc.OnNegotiationNeeded = func() { - log.Println("WebRTC: OnNegotiationNeeded") - go func() { - offer, err := pc.CreateOffer() - // TODO: Potentially timeout and retry if ICE isn't working. - if err != nil { - c.errorChannel <- err - return - } - err = pc.SetLocalDescription(offer) - if err != nil { - c.errorChannel <- err - return - } - }() - } - // Allow candidates to accumulate until IceGatheringStateComplete. - pc.OnIceCandidate = func(candidate webrtc.IceCandidate) { - log.Printf(candidate.Candidate) - } - pc.OnIceGatheringStateChange = func(state webrtc.IceGatheringState) { - if state == webrtc.IceGatheringStateComplete { - log.Printf("WebRTC: IceGatheringStateComplete") + // Allow candidates to accumulate until ICEGatheringStateComplete. + pc.OnICECandidate(func(candidate *webrtc.ICECandidate) { + if candidate == nil { + log.Printf("WebRTC: Done gathering candidates") + } else { + log.Printf("WebRTC: Got ICE candidate: %s", candidate.String()) + } + }) + pc.OnICEGatheringStateChange(func(state webrtc.ICEGathererState) { + if state == webrtc.ICEGathererStateComplete { + log.Println("WebRTC: ICEGatheringStateComplete") c.offerChannel <- pc.LocalDescription() } - } + }) // This callback is not expected, as the Client initiates the creation // of the data channel, not the remote peer. 
- pc.OnDataChannel = func(channel *webrtc.DataChannel) { + pc.OnDataChannel(func(channel *webrtc.DataChannel) { log.Println("OnDataChannel") panic("Unexpected OnDataChannel!") - } + }) c.pc = pc + go func() { + offer, err := pc.CreateOffer(nil) + // TODO: Potentially timeout and retry if ICE isn't working. + if err != nil { + c.errorChannel <- err + return + } + log.Println("WebRTC: Created offer") + err = pc.SetLocalDescription(offer) + if err != nil { + c.errorChannel <- err + return + } + log.Println("WebRTC: Set local description") + }() log.Println("WebRTC: PeerConnection created.") return nil } @@ -204,7 +210,11 @@ func (c *WebRTCPeer) establishDataChannel() error { if c.transport != nil { panic("Unexpected datachannel already exists!") } - dc, err := c.pc.CreateDataChannel(c.id) + ordered := true + dataChannelOptions := &webrtc.DataChannelInit{ + Ordered: &ordered, + } + dc, err := c.pc.CreateDataChannel(c.id, dataChannelOptions) // Triggers "OnNegotiationNeeded" on the PeerConnection, which will prepare // an SDP offer while other goroutines operating on this struct handle the // signaling. Eventually fires "OnOpen". @@ -212,7 +222,7 @@ func (c *WebRTCPeer) establishDataChannel() error { log.Printf("CreateDataChannel ERROR: %s", err) return err } - dc.OnOpen = func() { + dc.OnOpen(func() { c.lock.Lock() defer c.lock.Unlock() log.Println("WebRTC: DataChannel.OnOpen") @@ -227,8 +237,8 @@ func (c *WebRTCPeer) establishDataChannel() error { } // Then enable the datachannel. c.transport = dc - } - dc.OnClose = func() { + }) + dc.OnClose(func() { c.lock.Lock() // Future writes will go to the buffer until a new DataChannel is available. if nil == c.transport { @@ -241,29 +251,29 @@ func (c *WebRTCPeer) establishDataChannel() error { // Disable the DataChannel as a write destination. 
log.Println("WebRTC: DataChannel.OnClose [remotely]") c.transport = nil - c.pc.DeleteDataChannel(dc) + dc.Close() // Unlock before Close'ing, since it calls cleanup and asks for the // lock to check if the transport needs to be be deleted. c.lock.Unlock() c.Close() - } - dc.OnMessage = func(msg []byte) { - if len(msg) <= 0 { + }) + dc.OnMessage(func(msg webrtc.DataChannelMessage) { + if len(msg.Data) <= 0 { log.Println("0 length message---") } - c.BytesLogger.AddInbound(len(msg)) - n, err := c.writePipe.Write(msg) + c.BytesLogger.AddInbound(len(msg.Data)) + n, err := c.writePipe.Write(msg.Data) if err != nil { // TODO: Maybe shouldn't actually close. log.Println("Error writing to SOCKS pipe") c.writePipe.CloseWithError(err) } - if n != len(msg) { + if n != len(msg.Data) { log.Println("Error: short write") panic("short write") } c.lastReceive = time.Now() - } + }) log.Println("WebRTC: DataChannel created.") return nil } @@ -304,7 +314,7 @@ func (c *WebRTCPeer) exchangeSDP() error { } } log.Printf("Received Answer.\n") - err := c.pc.SetRemoteDescription(answer) + err := c.pc.SetRemoteDescription(*answer) if nil != err { log.Println("WebRTC: Unable to SetRemoteDescription:", err) return err @@ -342,13 +352,13 @@ func (c *WebRTCPeer) cleanup() { if c.pc == nil { panic("DataChannel w/o PeerConnection, not good.") } - c.pc.DeleteDataChannel(dataChannel.(*webrtc.DataChannel)) + dataChannel.(*webrtc.DataChannel).Close() } else { c.lock.Unlock() } if nil != c.pc { log.Printf("WebRTC: closing PeerConnection") - err := c.pc.Destroy() + err := c.pc.Close() if nil != err { log.Printf("Error closing peerconnection...") } diff --git a/client/snowflake.go b/client/snowflake.go index 9098de7..01c89d8 100644 --- a/client/snowflake.go +++ b/client/snowflake.go @@ -17,7 +17,7 @@ import ( "git.torproject.org/pluggable-transports/goptlib.git" sf "git.torproject.org/pluggable-transports/snowflake.git/client/lib" "git.torproject.org/pluggable-transports/snowflake.git/common/safelog" - 
"github.com/keroserene/go-webrtc" + "github.com/pion/webrtc" )
const ( @@ -65,6 +65,25 @@ func socksAcceptLoop(ln *pt.SocksListener, snowflakes sf.SnowflakeCollector) err } }
+//s is a comma-separated list of ICE server URLs +func parseIceServers(s string) []webrtc.ICEServer { + var servers []webrtc.ICEServer + log.Println(s) + s = strings.TrimSpace(s) + if len(s) == 0 { + return nil + } + urls := strings.Split(s, ",") + log.Printf("Using ICE Servers:") + for _, url := range urls { + log.Printf("url: %s", url) + servers = append(servers, webrtc.ICEServer{ + URLs: []string{url}, + }) + } + return servers +} + func main() { iceServersCommas := flag.String("ice", "", "comma-separated list of ICE servers") brokerURL := flag.String("url", "", "URL of signaling broker") @@ -75,7 +94,6 @@ func main() { "capacity for number of multiplexed WebRTC peers") flag.Parse()
- webrtc.SetLoggingVerbosity(1) log.SetFlags(log.LstdFlags | log.LUTC)
// Don't write to stderr; versions of tor earlier than about @@ -105,11 +123,7 @@ func main() {
log.Println("\n\n\n --- Starting Snowflake Client ---")
- var iceServers sf.IceServerList - if len(strings.TrimSpace(*iceServersCommas)) > 0 { - option := webrtc.OptionIceServer(*iceServersCommas) - iceServers = append(iceServers, option) - } + iceServers := parseIceServers(*iceServersCommas)
// Prepare to collect remote WebRTC peers. snowflakes := sf.NewPeers(*max)
tor-commits@lists.torproject.org