commit 7a0428e3b11ba437f27d09b1a9ad0fa820e54d24
Author: Cecylia Bocovich <cohosh(a)torproject.org>
Date: Tue Oct 13 11:06:14 2020 -0400
Refactor proxy to reuse signaling code
Simplify proxy interactions with the broker signaling server and prepare
for the introduction of an additional signaling server.
---
proxy/proxy-go_test.go | 5 ++-
proxy/snowflake.go | 114 +++++++++++++++++++++++++------------------------
2 files changed, 61 insertions(+), 58 deletions(-)
diff --git a/proxy/proxy-go_test.go b/proxy/proxy-go_test.go
index 1218289..e2fb82e 100644
--- a/proxy/proxy-go_test.go
+++ b/proxy/proxy-go_test.go
@@ -337,7 +337,7 @@ func TestBrokerInteractions(t *testing.T) {
const sampleAnswer = `{"type":"answer","sdp":` + sampleSDP + `}`
Convey("Proxy connections to broker", t, func() {
- broker := new(Broker)
+ broker := new(SignalingServer)
broker.url, _ = url.Parse("localhost")
//Mock peerConnection
@@ -417,7 +417,8 @@ func TestBrokerInteractions(t *testing.T) {
}
err = broker.sendAnswer("test", pc)
So(err, ShouldNotEqual, nil)
- So(err.Error(), ShouldResemble, "broker returned 410")
+ So(err.Error(), ShouldResemble,
+ "error sending answer to broker: remote returned status code 410")
//Error if we can't parse broker message
broker.transport = &MockTransport{
diff --git a/proxy/snowflake.go b/proxy/snowflake.go
index 96851ae..276ebed 100644
--- a/proxy/snowflake.go
+++ b/proxy/snowflake.go
@@ -44,7 +44,7 @@ const dataChannelTimeout = 20 * time.Second
const readLimit = 100000 //Maximum number of bytes to be read from an HTTP request
-var broker *Broker
+var broker *SignalingServer
var relayURL string
var currentNATType = NATUnknown
@@ -110,12 +110,6 @@ func remoteIPFromSDP(str string) net.IP {
return nil
}
-type Broker struct {
- url *url.URL
- transport http.RoundTripper
- keepLocalAddresses bool
-}
-
type webRTCConn struct {
dc *webrtc.DataChannel
pc *webrtc.PeerConnection
@@ -200,8 +194,33 @@ func limitedRead(r io.Reader, limit int64) ([]byte, error) {
return p, err
}
-func (b *Broker) pollOffer(sid string) *webrtc.SessionDescription {
- brokerPath := b.url.ResolveReference(&url.URL{Path: "proxy"})
+type SignalingServer struct {
+ url *url.URL
+ transport http.RoundTripper
+ keepLocalAddresses bool
+}
+
+func (s *SignalingServer) Post(path string, payload io.Reader) ([]byte, error) {
+
+ req, err := http.NewRequest("POST", path, payload)
+ if err != nil {
+ return nil, err
+ }
+ resp, err := s.transport.RoundTrip(req)
+ if err != nil {
+ return nil, err
+ }
+
+ if resp.StatusCode != http.StatusOK {
+ return nil, fmt.Errorf("remote returned status code %d", resp.StatusCode)
+ }
+
+ defer resp.Body.Close()
+ return limitedRead(resp.Body, readLimit)
+}
+
+func (s *SignalingServer) pollOffer(sid string) *webrtc.SessionDescription {
+ brokerPath := s.url.ResolveReference(&url.URL{Path: "proxy"})
timeOfNextPoll := time.Now()
for {
// Sleep until we're scheduled to poll again.
@@ -220,45 +239,33 @@ func (b *Broker) pollOffer(sid string) *webrtc.SessionDescription {
log.Printf("Error encoding poll message: %s", err.Error())
return nil
}
- req, _ := http.NewRequest("POST", brokerPath.String(), bytes.NewBuffer(body))
- resp, err := b.transport.RoundTrip(req)
+ resp, err := s.Post(brokerPath.String(), bytes.NewBuffer(body))
if err != nil {
- log.Printf("error polling broker: %s", err)
- } else {
- defer resp.Body.Close()
- if resp.StatusCode != http.StatusOK {
- log.Printf("broker returns: %d", resp.StatusCode)
- } else {
- body, err := limitedRead(resp.Body, readLimit)
- if err != nil {
- log.Printf("error reading broker response: %s", err)
- } else {
-
- offer, _, err := messages.DecodePollResponse(body)
- if err != nil {
- log.Printf("error reading broker response: %s", err.Error())
- log.Printf("body: %s", body)
- return nil
- }
- if offer != "" {
- offer, err := util.DeserializeSessionDescription(offer)
- if err != nil {
- log.Printf("Error processing session description: %s", err.Error())
- return nil
- }
- return offer
+ log.Printf("error polling broker: %s", err.Error())
+ }
- }
- }
+ offer, _, err := messages.DecodePollResponse(resp)
+ if err != nil {
+ log.Printf("Error reading broker response: %s", err.Error())
+ log.Printf("body: %s", resp)
+ return nil
+ }
+ if offer != "" {
+ offer, err := util.DeserializeSessionDescription(offer)
+ if err != nil {
+ log.Printf("Error processing session description: %s", err.Error())
+ return nil
}
+ return offer
+
}
}
}
-func (b *Broker) sendAnswer(sid string, pc *webrtc.PeerConnection) error {
- brokerPath := b.url.ResolveReference(&url.URL{Path: "answer"})
+func (s *SignalingServer) sendAnswer(sid string, pc *webrtc.PeerConnection) error {
+ brokerPath := s.url.ResolveReference(&url.URL{Path: "answer"})
ld := pc.LocalDescription()
- if !b.keepLocalAddresses {
+ if !s.keepLocalAddresses {
ld = &webrtc.SessionDescription{
Type: ld.Type,
SDP: util.StripLocalAddresses(ld.SDP),
@@ -272,20 +279,12 @@ func (b *Broker) sendAnswer(sid string, pc *webrtc.PeerConnection) error {
if err != nil {
return err
}
- req, _ := http.NewRequest("POST", brokerPath.String(), bytes.NewBuffer(body))
- resp, err := b.transport.RoundTrip(req)
+ resp, err := s.Post(brokerPath.String(), bytes.NewBuffer(body))
if err != nil {
- return err
- }
- if resp.StatusCode != http.StatusOK {
- return fmt.Errorf("broker returned %d", resp.StatusCode)
+ return fmt.Errorf("error sending answer to broker: %s", err.Error())
}
- body, err = limitedRead(resp.Body, readLimit)
- if err != nil {
- return fmt.Errorf("error reading broker response: %s", err)
- }
- success, err := messages.DecodeAnswerResponse(body)
+ success, err := messages.DecodeAnswerResponse(resp)
if err != nil {
return err
}
@@ -327,7 +326,6 @@ func datachannelHandler(conn *webRTCConn, remoteAddr net.Addr) {
log.Fatalf("invalid relay url: %s", err)
}
- // Retrieve client IP address
if remoteAddr != nil {
// Encode client IP address in relay URL
q := u.Query()
@@ -354,7 +352,11 @@ func datachannelHandler(conn *webRTCConn, remoteAddr net.Addr) {
// candidates is complete and the answer is available in LocalDescription.
// Installs an OnDataChannel callback that creates a webRTCConn and passes it to
// datachannelHandler.
-func makePeerConnectionFromOffer(sdp *webrtc.SessionDescription, config webrtc.Configuration, dataChan chan struct{}) (*webrtc.PeerConnection, error) {
+func makePeerConnectionFromOffer(sdp *webrtc.SessionDescription,
+ config webrtc.Configuration,
+ dataChan chan struct{},
+ handler func(conn *webRTCConn, remoteAddr net.Addr)) (*webrtc.PeerConnection, error) {
+
pc, err := webrtc.NewPeerConnection(config)
if err != nil {
return nil, fmt.Errorf("accept: NewPeerConnection: %s", err)
@@ -390,7 +392,7 @@ func makePeerConnectionFromOffer(sdp *webrtc.SessionDescription, config webrtc.C
}
})
- go datachannelHandler(conn, conn.RemoteAddr())
+ go handler(conn, conn.RemoteAddr())
})
err = pc.SetRemoteDescription(*sdp)
@@ -433,7 +435,7 @@ func runSession(sid string) {
return
}
dataChan := make(chan struct{})
- pc, err := makePeerConnectionFromOffer(offer, config, dataChan)
+ pc, err := makePeerConnectionFromOffer(offer, config, dataChan, datachannelHandler)
if err != nil {
log.Printf("error making WebRTC connection: %s", err)
retToken()
@@ -500,7 +502,7 @@ func main() {
log.Println("starting")
var err error
- broker = new(Broker)
+ broker = new(SignalingServer)
broker.keepLocalAddresses = keepLocalAddresses
broker.url, err = url.Parse(rawBrokerURL)
if err != nil {