[tor-commits] [snowflake/master] Refactor proxy to reuse signaling code

cohosh at torproject.org cohosh at torproject.org
Thu Oct 29 15:04:28 UTC 2020


commit 7a0428e3b11ba437f27d09b1a9ad0fa820e54d24
Author: Cecylia Bocovich <cohosh at torproject.org>
Date:   Tue Oct 13 11:06:14 2020 -0400

    Refactor proxy to reuse signaling code
    
    Simplify proxy interactions with the broker signaling server and prepare
    for the introduction of an additional signaling server.
---
 proxy/proxy-go_test.go |   5 ++-
 proxy/snowflake.go     | 114 +++++++++++++++++++++++++------------------------
 2 files changed, 61 insertions(+), 58 deletions(-)

diff --git a/proxy/proxy-go_test.go b/proxy/proxy-go_test.go
index 1218289..e2fb82e 100644
--- a/proxy/proxy-go_test.go
+++ b/proxy/proxy-go_test.go
@@ -337,7 +337,7 @@ func TestBrokerInteractions(t *testing.T) {
 	const sampleAnswer = `{"type":"answer","sdp":` + sampleSDP + `}`
 
 	Convey("Proxy connections to broker", t, func() {
-		broker := new(Broker)
+		broker := new(SignalingServer)
 		broker.url, _ = url.Parse("localhost")
 
 		//Mock peerConnection
@@ -417,7 +417,8 @@ func TestBrokerInteractions(t *testing.T) {
 			}
 			err = broker.sendAnswer("test", pc)
 			So(err, ShouldNotEqual, nil)
-			So(err.Error(), ShouldResemble, "broker returned 410")
+			So(err.Error(), ShouldResemble,
+				"error sending answer to broker: remote returned status code 410")
 
 			//Error if we can't parse broker message
 			broker.transport = &MockTransport{
diff --git a/proxy/snowflake.go b/proxy/snowflake.go
index 96851ae..276ebed 100644
--- a/proxy/snowflake.go
+++ b/proxy/snowflake.go
@@ -44,7 +44,7 @@ const dataChannelTimeout = 20 * time.Second
 
 const readLimit = 100000 //Maximum number of bytes to be read from an HTTP request
 
-var broker *Broker
+var broker *SignalingServer
 var relayURL string
 
 var currentNATType = NATUnknown
@@ -110,12 +110,6 @@ func remoteIPFromSDP(str string) net.IP {
 	return nil
 }
 
-type Broker struct {
-	url                *url.URL
-	transport          http.RoundTripper
-	keepLocalAddresses bool
-}
-
 type webRTCConn struct {
 	dc *webrtc.DataChannel
 	pc *webrtc.PeerConnection
@@ -200,8 +194,33 @@ func limitedRead(r io.Reader, limit int64) ([]byte, error) {
 	return p, err
 }
 
-func (b *Broker) pollOffer(sid string) *webrtc.SessionDescription {
-	brokerPath := b.url.ResolveReference(&url.URL{Path: "proxy"})
+type SignalingServer struct {
+	url                *url.URL
+	transport          http.RoundTripper
+	keepLocalAddresses bool
+}
+
+func (s *SignalingServer) Post(path string, payload io.Reader) ([]byte, error) {
+
+	req, err := http.NewRequest("POST", path, payload)
+	if err != nil {
+		return nil, err
+	}
+	resp, err := s.transport.RoundTrip(req)
+	if err != nil {
+		return nil, err
+	}
+
+	if resp.StatusCode != http.StatusOK {
+		return nil, fmt.Errorf("remote returned status code %d", resp.StatusCode)
+	}
+
+	defer resp.Body.Close()
+	return limitedRead(resp.Body, readLimit)
+}
+
+func (s *SignalingServer) pollOffer(sid string) *webrtc.SessionDescription {
+	brokerPath := s.url.ResolveReference(&url.URL{Path: "proxy"})
 	timeOfNextPoll := time.Now()
 	for {
 		// Sleep until we're scheduled to poll again.
@@ -220,45 +239,33 @@ func (b *Broker) pollOffer(sid string) *webrtc.SessionDescription {
 			log.Printf("Error encoding poll message: %s", err.Error())
 			return nil
 		}
-		req, _ := http.NewRequest("POST", brokerPath.String(), bytes.NewBuffer(body))
-		resp, err := b.transport.RoundTrip(req)
+		resp, err := s.Post(brokerPath.String(), bytes.NewBuffer(body))
 		if err != nil {
-			log.Printf("error polling broker: %s", err)
-		} else {
-			defer resp.Body.Close()
-			if resp.StatusCode != http.StatusOK {
-				log.Printf("broker returns: %d", resp.StatusCode)
-			} else {
-				body, err := limitedRead(resp.Body, readLimit)
-				if err != nil {
-					log.Printf("error reading broker response: %s", err)
-				} else {
-
-					offer, _, err := messages.DecodePollResponse(body)
-					if err != nil {
-						log.Printf("error reading broker response: %s", err.Error())
-						log.Printf("body: %s", body)
-						return nil
-					}
-					if offer != "" {
-						offer, err := util.DeserializeSessionDescription(offer)
-						if err != nil {
-							log.Printf("Error processing session description: %s", err.Error())
-							return nil
-						}
-						return offer
+			log.Printf("error polling broker: %s", err.Error())
+		}
 
-					}
-				}
+		offer, _, err := messages.DecodePollResponse(resp)
+		if err != nil {
+			log.Printf("Error reading broker response: %s", err.Error())
+			log.Printf("body: %s", resp)
+			return nil
+		}
+		if offer != "" {
+			offer, err := util.DeserializeSessionDescription(offer)
+			if err != nil {
+				log.Printf("Error processing session description: %s", err.Error())
+				return nil
 			}
+			return offer
+
 		}
 	}
 }
 
-func (b *Broker) sendAnswer(sid string, pc *webrtc.PeerConnection) error {
-	brokerPath := b.url.ResolveReference(&url.URL{Path: "answer"})
+func (s *SignalingServer) sendAnswer(sid string, pc *webrtc.PeerConnection) error {
+	brokerPath := s.url.ResolveReference(&url.URL{Path: "answer"})
 	ld := pc.LocalDescription()
-	if !b.keepLocalAddresses {
+	if !s.keepLocalAddresses {
 		ld = &webrtc.SessionDescription{
 			Type: ld.Type,
 			SDP:  util.StripLocalAddresses(ld.SDP),
@@ -272,20 +279,12 @@ func (b *Broker) sendAnswer(sid string, pc *webrtc.PeerConnection) error {
 	if err != nil {
 		return err
 	}
-	req, _ := http.NewRequest("POST", brokerPath.String(), bytes.NewBuffer(body))
-	resp, err := b.transport.RoundTrip(req)
+	resp, err := s.Post(brokerPath.String(), bytes.NewBuffer(body))
 	if err != nil {
-		return err
-	}
-	if resp.StatusCode != http.StatusOK {
-		return fmt.Errorf("broker returned %d", resp.StatusCode)
+		return fmt.Errorf("error sending answer to broker: %s", err.Error())
 	}
 
-	body, err = limitedRead(resp.Body, readLimit)
-	if err != nil {
-		return fmt.Errorf("error reading broker response: %s", err)
-	}
-	success, err := messages.DecodeAnswerResponse(body)
+	success, err := messages.DecodeAnswerResponse(resp)
 	if err != nil {
 		return err
 	}
@@ -327,7 +326,6 @@ func datachannelHandler(conn *webRTCConn, remoteAddr net.Addr) {
 		log.Fatalf("invalid relay url: %s", err)
 	}
 
-	// Retrieve client IP address
 	if remoteAddr != nil {
 		// Encode client IP address in relay URL
 		q := u.Query()
@@ -354,7 +352,11 @@ func datachannelHandler(conn *webRTCConn, remoteAddr net.Addr) {
 // candidates is complete and the answer is available in LocalDescription.
 // Installs an OnDataChannel callback that creates a webRTCConn and passes it to
 // datachannelHandler.
-func makePeerConnectionFromOffer(sdp *webrtc.SessionDescription, config webrtc.Configuration, dataChan chan struct{}) (*webrtc.PeerConnection, error) {
+func makePeerConnectionFromOffer(sdp *webrtc.SessionDescription,
+	config webrtc.Configuration,
+	dataChan chan struct{},
+	handler func(conn *webRTCConn, remoteAddr net.Addr)) (*webrtc.PeerConnection, error) {
+
 	pc, err := webrtc.NewPeerConnection(config)
 	if err != nil {
 		return nil, fmt.Errorf("accept: NewPeerConnection: %s", err)
@@ -390,7 +392,7 @@ func makePeerConnectionFromOffer(sdp *webrtc.SessionDescription, config webrtc.C
 			}
 		})
 
-		go datachannelHandler(conn, conn.RemoteAddr())
+		go handler(conn, conn.RemoteAddr())
 	})
 
 	err = pc.SetRemoteDescription(*sdp)
@@ -433,7 +435,7 @@ func runSession(sid string) {
 		return
 	}
 	dataChan := make(chan struct{})
-	pc, err := makePeerConnectionFromOffer(offer, config, dataChan)
+	pc, err := makePeerConnectionFromOffer(offer, config, dataChan, datachannelHandler)
 	if err != nil {
 		log.Printf("error making WebRTC connection: %s", err)
 		retToken()
@@ -500,7 +502,7 @@ func main() {
 	log.Println("starting")
 
 	var err error
-	broker = new(Broker)
+	broker = new(SignalingServer)
 	broker.keepLocalAddresses = keepLocalAddresses
 	broker.url, err = url.Parse(rawBrokerURL)
 	if err != nil {





More information about the tor-commits mailing list