commit 7a0428e3b11ba437f27d09b1a9ad0fa820e54d24 Author: Cecylia Bocovich cohosh@torproject.org Date: Tue Oct 13 11:06:14 2020 -0400
Refactor proxy to reuse signaling code
Simplify proxy interactions with the broker signaling server and prepare for the introduction of an additional signaling server. --- proxy/proxy-go_test.go | 5 ++- proxy/snowflake.go | 114 +++++++++++++++++++++++++------------------------ 2 files changed, 61 insertions(+), 58 deletions(-)
diff --git a/proxy/proxy-go_test.go b/proxy/proxy-go_test.go index 1218289..e2fb82e 100644 --- a/proxy/proxy-go_test.go +++ b/proxy/proxy-go_test.go @@ -337,7 +337,7 @@ func TestBrokerInteractions(t *testing.T) { const sampleAnswer = `{"type":"answer","sdp":` + sampleSDP + `}`
Convey("Proxy connections to broker", t, func() { - broker := new(Broker) + broker := new(SignalingServer) broker.url, _ = url.Parse("localhost")
//Mock peerConnection @@ -417,7 +417,8 @@ func TestBrokerInteractions(t *testing.T) { } err = broker.sendAnswer("test", pc) So(err, ShouldNotEqual, nil) - So(err.Error(), ShouldResemble, "broker returned 410") + So(err.Error(), ShouldResemble, + "error sending answer to broker: remote returned status code 410")
//Error if we can't parse broker message broker.transport = &MockTransport{ diff --git a/proxy/snowflake.go b/proxy/snowflake.go index 96851ae..276ebed 100644 --- a/proxy/snowflake.go +++ b/proxy/snowflake.go @@ -44,7 +44,7 @@ const dataChannelTimeout = 20 * time.Second
const readLimit = 100000 //Maximum number of bytes to be read from an HTTP request
-var broker *Broker +var broker *SignalingServer var relayURL string
var currentNATType = NATUnknown @@ -110,12 +110,6 @@ func remoteIPFromSDP(str string) net.IP { return nil }
-type Broker struct { - url *url.URL - transport http.RoundTripper - keepLocalAddresses bool -} - type webRTCConn struct { dc *webrtc.DataChannel pc *webrtc.PeerConnection @@ -200,8 +194,33 @@ func limitedRead(r io.Reader, limit int64) ([]byte, error) { return p, err }
-func (b *Broker) pollOffer(sid string) *webrtc.SessionDescription { - brokerPath := b.url.ResolveReference(&url.URL{Path: "proxy"}) +type SignalingServer struct { + url *url.URL + transport http.RoundTripper + keepLocalAddresses bool +} + +func (s *SignalingServer) Post(path string, payload io.Reader) ([]byte, error) { + + req, err := http.NewRequest("POST", path, payload) + if err != nil { + return nil, err + } + resp, err := s.transport.RoundTrip(req) + if err != nil { + return nil, err + } + + if resp.StatusCode != http.StatusOK { + return nil, fmt.Errorf("remote returned status code %d", resp.StatusCode) + } + + defer resp.Body.Close() + return limitedRead(resp.Body, readLimit) +} + +func (s *SignalingServer) pollOffer(sid string) *webrtc.SessionDescription { + brokerPath := s.url.ResolveReference(&url.URL{Path: "proxy"}) timeOfNextPoll := time.Now() for { // Sleep until we're scheduled to poll again. @@ -220,45 +239,33 @@ func (b *Broker) pollOffer(sid string) *webrtc.SessionDescription { log.Printf("Error encoding poll message: %s", err.Error()) return nil } - req, _ := http.NewRequest("POST", brokerPath.String(), bytes.NewBuffer(body)) - resp, err := b.transport.RoundTrip(req) + resp, err := s.Post(brokerPath.String(), bytes.NewBuffer(body)) if err != nil { - log.Printf("error polling broker: %s", err) - } else { - defer resp.Body.Close() - if resp.StatusCode != http.StatusOK { - log.Printf("broker returns: %d", resp.StatusCode) - } else { - body, err := limitedRead(resp.Body, readLimit) - if err != nil { - log.Printf("error reading broker response: %s", err) - } else { - - offer, _, err := messages.DecodePollResponse(body) - if err != nil { - log.Printf("error reading broker response: %s", err.Error()) - log.Printf("body: %s", body) - return nil - } - if offer != "" { - offer, err := util.DeserializeSessionDescription(offer) - if err != nil { - log.Printf("Error processing session description: %s", err.Error()) - return nil - } - return offer + log.Printf("error polling broker: %s", err.Error()) + }
- } - } + offer, _, err := messages.DecodePollResponse(resp) + if err != nil { + log.Printf("Error reading broker response: %s", err.Error()) + log.Printf("body: %s", resp) + return nil + } + if offer != "" { + offer, err := util.DeserializeSessionDescription(offer) + if err != nil { + log.Printf("Error processing session description: %s", err.Error()) + return nil } + return offer + } } }
-func (b *Broker) sendAnswer(sid string, pc *webrtc.PeerConnection) error { - brokerPath := b.url.ResolveReference(&url.URL{Path: "answer"}) +func (s *SignalingServer) sendAnswer(sid string, pc *webrtc.PeerConnection) error { + brokerPath := s.url.ResolveReference(&url.URL{Path: "answer"}) ld := pc.LocalDescription() - if !b.keepLocalAddresses { + if !s.keepLocalAddresses { ld = &webrtc.SessionDescription{ Type: ld.Type, SDP: util.StripLocalAddresses(ld.SDP), @@ -272,20 +279,12 @@ func (b *Broker) sendAnswer(sid string, pc *webrtc.PeerConnection) error { if err != nil { return err } - req, _ := http.NewRequest("POST", brokerPath.String(), bytes.NewBuffer(body)) - resp, err := b.transport.RoundTrip(req) + resp, err := s.Post(brokerPath.String(), bytes.NewBuffer(body)) if err != nil { - return err - } - if resp.StatusCode != http.StatusOK { - return fmt.Errorf("broker returned %d", resp.StatusCode) + return fmt.Errorf("error sending answer to broker: %s", err.Error()) }
- body, err = limitedRead(resp.Body, readLimit) - if err != nil { - return fmt.Errorf("error reading broker response: %s", err) - } - success, err := messages.DecodeAnswerResponse(body) + success, err := messages.DecodeAnswerResponse(resp) if err != nil { return err } @@ -327,7 +326,6 @@ func datachannelHandler(conn *webRTCConn, remoteAddr net.Addr) { log.Fatalf("invalid relay url: %s", err) }
- // Retrieve client IP address if remoteAddr != nil { // Encode client IP address in relay URL q := u.Query() @@ -354,7 +352,11 @@ func datachannelHandler(conn *webRTCConn, remoteAddr net.Addr) { // candidates is complete and the answer is available in LocalDescription. // Installs an OnDataChannel callback that creates a webRTCConn and passes it to // datachannelHandler. -func makePeerConnectionFromOffer(sdp *webrtc.SessionDescription, config webrtc.Configuration, dataChan chan struct{}) (*webrtc.PeerConnection, error) { +func makePeerConnectionFromOffer(sdp *webrtc.SessionDescription, + config webrtc.Configuration, + dataChan chan struct{}, + handler func(conn *webRTCConn, remoteAddr net.Addr)) (*webrtc.PeerConnection, error) { + pc, err := webrtc.NewPeerConnection(config) if err != nil { return nil, fmt.Errorf("accept: NewPeerConnection: %s", err) @@ -390,7 +392,7 @@ func makePeerConnectionFromOffer(sdp *webrtc.SessionDescription, config webrtc.C } })
- go datachannelHandler(conn, conn.RemoteAddr()) + go handler(conn, conn.RemoteAddr()) })
err = pc.SetRemoteDescription(*sdp) @@ -433,7 +435,7 @@ func runSession(sid string) { return } dataChan := make(chan struct{}) - pc, err := makePeerConnectionFromOffer(offer, config, dataChan) + pc, err := makePeerConnectionFromOffer(offer, config, dataChan, datachannelHandler) if err != nil { log.Printf("error making WebRTC connection: %s", err) retToken() @@ -500,7 +502,7 @@ func main() { log.Println("starting")
var err error - broker = new(Broker) + broker = new(SignalingServer) broker.keepLocalAddresses = keepLocalAddresses broker.url, err = url.Parse(rawBrokerURL) if err != nil {