commit 0052c0e10cfd8e270a57d85711064e8d9e064bf5 Author: Cecylia Bocovich cohosh@torproject.org Date: Tue Jun 16 17:49:39 2020 -0400
Add a new heap at the broker for restricted flakes
Now when proxies poll, they provide their NAT type to the broker. This introduces a new snowflake heap of just restricted snowflakes that the broker can pull from if the client has a known, unrestricted NAT. All other clients will pull from a heap of snowflakes with unrestricted or unknown NAT topologies. --- broker/broker.go | 67 ++++++++++++++++++++++++++++++----------- broker/snowflake-broker_test.go | 14 ++++----- 2 files changed, 57 insertions(+), 24 deletions(-)
diff --git a/broker/broker.go b/broker/broker.go index 2d3cd4b..9297980 100644 --- a/broker/broker.go +++ b/broker/broker.go @@ -31,12 +31,18 @@ const ( ClientTimeout = 10 ProxyTimeout = 10 readLimit = 100000 //Maximum number of bytes to be read from an HTTP request + + NATUnknown = "unknown" + NATRestricted = "restricted" + NATUnrestricted = "unrestricted" )
type BrokerContext struct { - snowflakes *SnowflakeHeap - // Map keeping track of snowflakeIDs required to match SDP answers from - // the second http POST. + snowflakes *SnowflakeHeap + restrictedSnowflakes *SnowflakeHeap + // Maps keeping track of snowflakeIDs required to match SDP answers from + // the second http POST. Restricted snowflakes can only be matched up with + // clients behind an unrestricted NAT. idToSnowflake map[string]*Snowflake // Synchronization for the snowflake map and heap snowflakeLock sync.Mutex @@ -47,6 +53,8 @@ type BrokerContext struct { func NewBrokerContext(metricsLogger *log.Logger) *BrokerContext { snowflakes := new(SnowflakeHeap) heap.Init(snowflakes) + rSnowflakes := new(SnowflakeHeap) + heap.Init(rSnowflakes) metrics, err := NewMetrics(metricsLogger)
if err != nil { @@ -58,10 +66,11 @@ func NewBrokerContext(metricsLogger *log.Logger) *BrokerContext { }
return &BrokerContext{ - snowflakes: snowflakes, - idToSnowflake: make(map[string]*Snowflake), - proxyPolls: make(chan *ProxyPoll), - metrics: metrics, + snowflakes: snowflakes, + restrictedSnowflakes: rSnowflakes, + idToSnowflake: make(map[string]*Snowflake), + proxyPolls: make(chan *ProxyPoll), + metrics: metrics, } }
@@ -79,7 +88,7 @@ type MetricsHandler struct {
func (sh SnowflakeHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { w.Header().Set("Access-Control-Allow-Origin", "*") - w.Header().Set("Access-Control-Allow-Headers", "Origin, X-Session-ID") + w.Header().Set("Access-Control-Allow-Headers", "Origin, X-Session-ID, Snowflake-NAT-Type") // Return early if it's CORS preflight. if "OPTIONS" == r.Method { return @@ -101,15 +110,17 @@ func (mh MetricsHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { type ProxyPoll struct { id string proxyType string + natType string offerChannel chan []byte }
// Registers a Snowflake and waits for some Client to send an offer, // as part of the polling logic of the proxy handler. -func (ctx *BrokerContext) RequestOffer(id string, proxyType string) []byte { +func (ctx *BrokerContext) RequestOffer(id string, proxyType string, natType string) []byte { request := new(ProxyPoll) request.id = id request.proxyType = proxyType + request.natType = natType request.offerChannel = make(chan []byte) ctx.proxyPolls <- request // Block until an offer is available, or timeout which sends a nil offer. @@ -122,7 +133,7 @@ func (ctx *BrokerContext) RequestOffer(id string, proxyType string) []byte { // client offer or nil on timeout / none are available. func (ctx *BrokerContext) Broker() { for request := range ctx.proxyPolls { - snowflake := ctx.AddSnowflake(request.id, request.proxyType) + snowflake := ctx.AddSnowflake(request.id, request.proxyType, request.natType) // Wait for a client to avail an offer to the snowflake. go func(request *ProxyPoll) { select { @@ -133,7 +144,11 @@ func (ctx *BrokerContext) Broker() { ctx.snowflakeLock.Lock() defer ctx.snowflakeLock.Unlock() if snowflake.index != -1 { - heap.Remove(ctx.snowflakes, snowflake.index) + if request.natType == NATRestricted { + heap.Remove(ctx.restrictedSnowflakes, snowflake.index) + } else { + heap.Remove(ctx.snowflakes, snowflake.index) + } delete(ctx.idToSnowflake, snowflake.id) close(request.offerChannel) } @@ -145,7 +160,7 @@ func (ctx *BrokerContext) Broker() { // Create and add a Snowflake to the heap. // Required to keep track of proxies between providing them // with an offer and awaiting their second POST with an answer. -func (ctx *BrokerContext) AddSnowflake(id string, proxyType string) *Snowflake { +func (ctx *BrokerContext) AddSnowflake(id string, proxyType string, natType string) *Snowflake { snowflake := new(Snowflake) snowflake.id = id snowflake.clients = 0 @@ -153,7 +168,11 @@ func (ctx *BrokerContext) AddSnowflake(id string, proxyType string) *Snowflake { snowflake.offerChannel = make(chan []byte) snowflake.answerChannel = make(chan []byte) ctx.snowflakeLock.Lock() - heap.Push(ctx.snowflakes, snowflake) + if natType == NATRestricted { + heap.Push(ctx.restrictedSnowflakes, snowflake) + } else { + heap.Push(ctx.snowflakes, snowflake) + } ctx.snowflakeLock.Unlock() ctx.idToSnowflake[id] = snowflake return snowflake @@ -170,7 +189,7 @@ func proxyPolls(ctx *BrokerContext, w http.ResponseWriter, r *http.Request) { return }
- sid, proxyType, _, err := messages.DecodePollRequest(body) + sid, proxyType, natType, err := messages.DecodePollRequest(body) if err != nil { w.WriteHeader(http.StatusBadRequest) return @@ -187,7 +206,7 @@ func proxyPolls(ctx *BrokerContext, w http.ResponseWriter, r *http.Request) { }
// Wait for a client to avail an offer to the snowflake, or timeout if nil. - offer := ctx.RequestOffer(sid, proxyType) + offer := ctx.RequestOffer(sid, proxyType, natType) var b []byte if nil == offer { ctx.metrics.lock.Lock() @@ -226,9 +245,23 @@ func clientOffers(ctx *BrokerContext, w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusBadRequest) return } + + natType := r.Header.Get("Snowflake-NAT-Type") + if natType == "" { + natType = NATUnknown + } + + // Only hand out known restricted snowflakes to unrestricted clients + var snowflakeHeap *SnowflakeHeap + if natType == NATUnrestricted { + snowflakeHeap = ctx.restrictedSnowflakes + } else { + snowflakeHeap = ctx.snowflakes + } + // Immediately fail if there are no snowflakes available. ctx.snowflakeLock.Lock() - numSnowflakes := ctx.snowflakes.Len() + numSnowflakes := snowflakeHeap.Len() ctx.snowflakeLock.Unlock() if numSnowflakes <= 0 { ctx.metrics.lock.Lock() @@ -240,7 +273,7 @@ func clientOffers(ctx *BrokerContext, w http.ResponseWriter, r *http.Request) { // Otherwise, find the most available snowflake proxy, and pass the offer to it. // Delete must be deferred in order to correctly process answer request later. ctx.snowflakeLock.Lock() - snowflake := heap.Pop(ctx.snowflakes).(*Snowflake) + snowflake := heap.Pop(snowflakeHeap).(*Snowflake) ctx.snowflakeLock.Unlock() snowflake.offerChannel <- offer
diff --git a/broker/snowflake-broker_test.go b/broker/snowflake-broker_test.go index 18b83dd..91383a1 100644 --- a/broker/snowflake-broker_test.go +++ b/broker/snowflake-broker_test.go @@ -29,7 +29,7 @@ func TestBroker(t *testing.T) { Convey("Adds Snowflake", func() { So(ctx.snowflakes.Len(), ShouldEqual, 0) So(len(ctx.idToSnowflake), ShouldEqual, 0) - ctx.AddSnowflake("foo", "") + ctx.AddSnowflake("foo", "", NATUnknown) So(ctx.snowflakes.Len(), ShouldEqual, 1) So(len(ctx.idToSnowflake), ShouldEqual, 1) }) @@ -55,7 +55,7 @@ func TestBroker(t *testing.T) { Convey("Request an offer from the Snowflake Heap", func() { done := make(chan []byte) go func() { - offer := ctx.RequestOffer("test", "") + offer := ctx.RequestOffer("test", "", NATUnknown) done <- offer }() request := <-ctx.proxyPolls @@ -79,7 +79,7 @@ func TestBroker(t *testing.T) { Convey("with a proxy answer if available.", func() { done := make(chan bool) // Prepare a fake proxy to respond with. - snowflake := ctx.AddSnowflake("fake", "") + snowflake := ctx.AddSnowflake("fake", "", NATUnknown) go func() { clientOffers(ctx, w, r) done <- true @@ -97,7 +97,7 @@ func TestBroker(t *testing.T) { return } done := make(chan bool) - snowflake := ctx.AddSnowflake("fake", "") + snowflake := ctx.AddSnowflake("fake", "", NATUnknown) go func() { clientOffers(ctx, w, r) // Takes a few seconds here... @@ -147,7 +147,7 @@ func TestBroker(t *testing.T) { })
Convey("Responds to proxy answers...", func() { - s := ctx.AddSnowflake("test", "") + s := ctx.AddSnowflake("test", "", NATUnknown) w := httptest.NewRecorder() data := bytes.NewReader([]byte(`{"Version":"1.0","Sid":"test","Answer":"test"}`))
@@ -260,7 +260,7 @@ func TestBroker(t *testing.T) { // Manually do the Broker goroutine action here for full control. p := <-ctx.proxyPolls So(p.id, ShouldEqual, "ymbcCMto7KHNGYlp") - s := ctx.AddSnowflake(p.id, "") + s := ctx.AddSnowflake(p.id, "", NATUnknown) go func() { offer := <-s.offerChannel p.offerChannel <- offer @@ -537,7 +537,7 @@ func TestMetrics(t *testing.T) { So(err, ShouldBeNil)
// Prepare a fake proxy to respond with. - snowflake := ctx.AddSnowflake("fake", "") + snowflake := ctx.AddSnowflake("fake", "", NATUnknown) go func() { clientOffers(ctx, w, r) done <- true