commit 7277bb37cd8a96afd8516870cc286b3845fa48bb Author: Cecylia Bocovich cohosh@torproject.org Date: Wed Nov 20 12:41:53 2019 -0500
Update broker--proxy protocol with proxy type
Proxies now include information about what type they are when they poll for client offers. The broker saves this information along with snowflake ids and outputs it on the /debug page. --- broker/broker.go | 26 +++++++++++++++++-------- broker/snowflake-broker_test.go | 14 +++++++------- broker/snowflake-heap.go | 1 + common/messages/proxy.go | 42 ++++++++++++++++++++++++----------------- common/messages/proxy_test.go | 28 +++++++++++++++++++++------ proxy-go/snowflake.go | 2 +- 6 files changed, 74 insertions(+), 39 deletions(-)
diff --git a/broker/broker.go b/broker/broker.go index 13d2575..3edfe84 100644 --- a/broker/broker.go +++ b/broker/broker.go @@ -97,14 +97,16 @@ func (mh MetricsHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { // Proxies may poll for client offers concurrently. type ProxyPoll struct { id string + ptype string offerChannel chan []byte }
// Registers a Snowflake and waits for some Client to send an offer, // as part of the polling logic of the proxy handler. -func (ctx *BrokerContext) RequestOffer(id string) []byte { +func (ctx *BrokerContext) RequestOffer(id string, ptype string) []byte { request := new(ProxyPoll) request.id = id + request.ptype = ptype request.offerChannel = make(chan []byte) ctx.proxyPolls <- request // Block until an offer is available, or timeout which sends a nil offer. @@ -117,7 +119,7 @@ func (ctx *BrokerContext) RequestOffer(id string) []byte { // client offer or nil on timeout / none are available. func (ctx *BrokerContext) Broker() { for request := range ctx.proxyPolls { - snowflake := ctx.AddSnowflake(request.id) + snowflake := ctx.AddSnowflake(request.id, request.ptype) // Wait for a client to avail an offer to the snowflake. go func(request *ProxyPoll) { select { @@ -137,10 +139,11 @@ func (ctx *BrokerContext) Broker() { // Create and add a Snowflake to the heap. // Required to keep track of proxies between providing them // with an offer and awaiting their second POST with an answer. -func (ctx *BrokerContext) AddSnowflake(id string) *Snowflake { +func (ctx *BrokerContext) AddSnowflake(id string, ptype string) *Snowflake { snowflake := new(Snowflake) snowflake.id = id snowflake.clients = 0 + snowflake.ptype = ptype snowflake.offerChannel = make(chan []byte) snowflake.answerChannel = make(chan []byte) heap.Push(ctx.snowflakes, snowflake) @@ -159,7 +162,7 @@ func proxyPolls(ctx *BrokerContext, w http.ResponseWriter, r *http.Request) { return }
- sid, err := messages.DecodePollRequest(body) + sid, ptype, err := messages.DecodePollRequest(body) if err != nil { w.WriteHeader(http.StatusBadRequest) return @@ -174,7 +177,7 @@ func proxyPolls(ctx *BrokerContext, w http.ResponseWriter, r *http.Request) { }
// Wait for a client to avail an offer to the snowflake, or timeout if nil. - offer := ctx.RequestOffer(sid) + offer := ctx.RequestOffer(sid, ptype) var b []byte if nil == offer { ctx.metrics.proxyIdleCount++ @@ -286,16 +289,23 @@ func proxyAnswers(ctx *BrokerContext, w http.ResponseWriter, r *http.Request) { func debugHandler(ctx *BrokerContext, w http.ResponseWriter, r *http.Request) { s := fmt.Sprintf("current snowflakes available: %d\n", ctx.snowflakes.Len())
- var browsers, standalones int + var webexts, browsers, standalones, unknowns int for _, snowflake := range ctx.idToSnowflake { - if len(snowflake.id) < 16 { + if snowflake.ptype == "badge" { browsers++ - } else { + } else if snowflake.ptype == "webext" { + webexts++ + } else if snowflake.ptype == "standalone" { standalones++ + } else { + unknowns++ } + } s += fmt.Sprintf("\tstandalone proxies: %d", standalones) s += fmt.Sprintf("\n\tbrowser proxies: %d", browsers) + s += fmt.Sprintf("\n\twebext proxies: %d", webexts) + s += fmt.Sprintf("\n\tunknown proxies: %d", unknowns) if _, err := w.Write([]byte(s)); err != nil { log.Printf("writing proxy information returned error: %v ", err) } diff --git a/broker/snowflake-broker_test.go b/broker/snowflake-broker_test.go index c35c1d6..cb5f34f 100644 --- a/broker/snowflake-broker_test.go +++ b/broker/snowflake-broker_test.go @@ -29,7 +29,7 @@ func TestBroker(t *testing.T) { Convey("Adds Snowflake", func() { So(ctx.snowflakes.Len(), ShouldEqual, 0) So(len(ctx.idToSnowflake), ShouldEqual, 0) - ctx.AddSnowflake("foo") + ctx.AddSnowflake("foo", "") So(ctx.snowflakes.Len(), ShouldEqual, 1) So(len(ctx.idToSnowflake), ShouldEqual, 1) }) @@ -55,7 +55,7 @@ func TestBroker(t *testing.T) { Convey("Request an offer from the Snowflake Heap", func() { done := make(chan []byte) go func() { - offer := ctx.RequestOffer("test") + offer := ctx.RequestOffer("test", "") done <- offer }() request := <-ctx.proxyPolls @@ -79,7 +79,7 @@ func TestBroker(t *testing.T) { Convey("with a proxy answer if available.", func() { done := make(chan bool) // Prepare a fake proxy to respond with. - snowflake := ctx.AddSnowflake("fake") + snowflake := ctx.AddSnowflake("fake", "") go func() { clientOffers(ctx, w, r) done <- true @@ -97,7 +97,7 @@ func TestBroker(t *testing.T) { return } done := make(chan bool) - snowflake := ctx.AddSnowflake("fake") + snowflake := ctx.AddSnowflake("fake", "") go func() { clientOffers(ctx, w, r) // Takes a few seconds here... @@ -147,7 +147,7 @@ func TestBroker(t *testing.T) { })
Convey("Responds to proxy answers...", func() { - s := ctx.AddSnowflake("test") + s := ctx.AddSnowflake("test", "") w := httptest.NewRecorder() data := bytes.NewReader([]byte(`{"Version":"1.0","Sid":"test","Answer":"test"}`))
@@ -211,7 +211,7 @@ func TestBroker(t *testing.T) { // Manually do the Broker goroutine action here for full control. p := <-ctx.proxyPolls So(p.id, ShouldEqual, "ymbcCMto7KHNGYlp") - s := ctx.AddSnowflake(p.id) + s := ctx.AddSnowflake(p.id, "") go func() { offer := <-s.offerChannel p.offerChannel <- offer @@ -449,7 +449,7 @@ func TestMetrics(t *testing.T) { So(err, ShouldBeNil)
// Prepare a fake proxy to respond with. - snowflake := ctx.AddSnowflake("fake") + snowflake := ctx.AddSnowflake("fake", "") go func() { clientOffers(ctx, w, r) done <- true diff --git a/broker/snowflake-heap.go b/broker/snowflake-heap.go index 419956f..cf209ec 100644 --- a/broker/snowflake-heap.go +++ b/broker/snowflake-heap.go @@ -10,6 +10,7 @@ over the offer and answer channels. */ type Snowflake struct { id string + ptype string offerChannel chan []byte answerChannel chan []byte clients int diff --git a/common/messages/proxy.go b/common/messages/proxy.go index 042caf9..7ebab1d 100644 --- a/common/messages/proxy.go +++ b/common/messages/proxy.go @@ -6,16 +6,18 @@ package messages import ( "encoding/json" "fmt" + "strings" )
-const version = "1.0" +const version = "1.1"
-/* Version 1.0 specification: +/* Version 1.1 specification:
== ProxyPollRequest == { - Sid: [generated session id of proxy] - Version: 1.0 + Sid: [generated session id of proxy], + Version: 1.1, + Type: [badge|webext|standalone] }
== ProxyPollResponse == @@ -41,11 +43,11 @@ HTTP 400 BadRequest
== ProxyAnswerRequest == { - Sid: [generated session id of proxy] - Version: 1.0 + Sid: [generated session id of proxy], + Version: 1.1, Answer: { - type: answer + type: answer, sdp: [WebRTC SDP] } } @@ -73,34 +75,38 @@ HTTP 400 BadRequest type ProxyPollRequest struct { Sid string Version string + Type string }
-func EncodePollRequest(sid string) ([]byte, error) { +func EncodePollRequest(sid string, ptype string) ([]byte, error) { return json.Marshal(ProxyPollRequest{ Sid: sid, Version: version, + Type: ptype, }) }
// Decodes a poll message from a snowflake proxy and returns the // sid of the proxy on success and an error if it failed -func DecodePollRequest(data []byte) (string, error) { +func DecodePollRequest(data []byte) (string, string, error) { var message ProxyPollRequest
err := json.Unmarshal(data, &message) if err != nil { - return "", err + return "", "", err } - if message.Version != "1.0" { - return "", fmt.Errorf("using unknown version") + + majorVersion := strings.Split(message.Version, ".")[0] + if majorVersion != "1" { + return "", "", fmt.Errorf("using unknown version") }
- // Version 1.0 requires an Sid + // Version 1.x requires an Sid if message.Sid == "" { - return "", fmt.Errorf("no supplied session id") + return "", "", fmt.Errorf("no supplied session id") }
- return message.Sid, nil + return message.Sid, message.Type, nil }
type ProxyPollResponse struct { @@ -153,7 +159,7 @@ type ProxyAnswerRequest struct {
func EncodeAnswerRequest(answer string, sid string) ([]byte, error) { return json.Marshal(ProxyAnswerRequest{ - Version: "1.0", + Version: "1.1", Sid: sid, Answer: answer, }) @@ -167,7 +173,9 @@ func DecodeAnswerRequest(data []byte) (string, string, error) { if err != nil { return "", "", err } - if message.Version != "1.0" { + + majorVersion := strings.Split(message.Version, ".")[0] + if majorVersion != "1" { return "", "", fmt.Errorf("using unknown version") }
diff --git a/common/messages/proxy_test.go b/common/messages/proxy_test.go index f2f006e..83553a0 100644 --- a/common/messages/proxy_test.go +++ b/common/messages/proxy_test.go @@ -11,45 +11,60 @@ import ( func TestDecodeProxyPollRequest(t *testing.T) { Convey("Context", t, func() { for _, test := range []struct { - sid string - data string - err error + sid string + ptype string + data string + err error }{ { //Version 1.0 proxy message "ymbcCMto7KHNGYlp", + "", `{"Sid":"ymbcCMto7KHNGYlp","Version":"1.0"}`, nil, }, { + //Version 1.1 proxy message + "ymbcCMto7KHNGYlp", + "standalone", + `{"Sid":"ymbcCMto7KHNGYlp","Version":"1.1","Type":"standalone"}`, + nil, + }, + { //Version 0.X proxy message: "", + "", "ymbcCMto7KHNGYlp", &json.SyntaxError{}, }, { "", + "", `{"Sid":"ymbcCMto7KHNGYlp"}`, fmt.Errorf(""), }, { "", + "", "{}", fmt.Errorf(""), }, { "", + "", `{"Version":"1.0"}`, fmt.Errorf(""), }, { "", + "", `{"Version":"2.0"}`, fmt.Errorf(""), }, } { - sid, err := DecodePollRequest([]byte(test.data)) + sid, ptype, err := DecodePollRequest([]byte(test.data)) So(sid, ShouldResemble, test.sid) + So(ptype, ShouldResemble, test.ptype) So(err, ShouldHaveSameTypeAs, test.err) }
@@ -58,10 +73,11 @@ func TestDecodeProxyPollRequest(t *testing.T) {
func TestEncodeProxyPollRequests(t *testing.T) { Convey("Context", t, func() { - b, err := EncodePollRequest("ymbcCMto7KHNGYlp") + b, err := EncodePollRequest("ymbcCMto7KHNGYlp", "standalone") So(err, ShouldEqual, nil) - sid, err := DecodePollRequest(b) + sid, ptype, err := DecodePollRequest(b) So(sid, ShouldEqual, "ymbcCMto7KHNGYlp") + So(ptype, ShouldEqual, "standalone") So(err, ShouldEqual, nil) }) } diff --git a/proxy-go/snowflake.go b/proxy-go/snowflake.go index c10093a..dce7b70 100644 --- a/proxy-go/snowflake.go +++ b/proxy-go/snowflake.go @@ -175,7 +175,7 @@ func (b *Broker) pollOffer(sid string) *webrtc.SessionDescription { timeOfNextPoll = now }
- body, err := messages.EncodePollRequest(sid) + body, err := messages.EncodePollRequest(sid, "standalone") if err != nil { log.Printf("Error encoding poll message: %s", err.Error()) return nil