[tor-commits] [snowflake/master] Add a new heap at the broker for restricted flakes

cohosh at torproject.org cohosh at torproject.org
Mon Jul 6 17:27:06 UTC 2020


commit 0052c0e10cfd8e270a57d85711064e8d9e064bf5
Author: Cecylia Bocovich <cohosh at torproject.org>
Date:   Tue Jun 16 17:49:39 2020 -0400

    Add a new heap at the broker for restricted flakes
    
    Now when proxies poll, they provide their NAT type to the broker. This
    introduces a new snowflake heap of just restricted snowflakes that the
    broker can pull from if the client has a known, unrestricted NAT. All
    other clients will pull from a heap of snowflakes with unrestricted or
    unknown NAT topologies.
---
 broker/broker.go                | 67 ++++++++++++++++++++++++++++++-----------
 broker/snowflake-broker_test.go | 14 ++++-----
 2 files changed, 57 insertions(+), 24 deletions(-)

diff --git a/broker/broker.go b/broker/broker.go
index 2d3cd4b..9297980 100644
--- a/broker/broker.go
+++ b/broker/broker.go
@@ -31,12 +31,18 @@ const (
 	ClientTimeout = 10
 	ProxyTimeout  = 10
 	readLimit     = 100000 //Maximum number of bytes to be read from an HTTP request
+
+	NATUnknown      = "unknown"
+	NATRestricted   = "restricted"
+	NATUnrestricted = "unrestricted"
 )
 
 type BrokerContext struct {
-	snowflakes *SnowflakeHeap
-	// Map keeping track of snowflakeIDs required to match SDP answers from
-	// the second http POST.
+	snowflakes           *SnowflakeHeap
+	restrictedSnowflakes *SnowflakeHeap
+	// Maps keeping track of snowflakeIDs required to match SDP answers from
+	// the second http POST. Restricted snowflakes can only be matched up with
+	// clients behind an unrestricted NAT.
 	idToSnowflake map[string]*Snowflake
 	// Synchronization for the snowflake map and heap
 	snowflakeLock sync.Mutex
@@ -47,6 +53,8 @@ type BrokerContext struct {
 func NewBrokerContext(metricsLogger *log.Logger) *BrokerContext {
 	snowflakes := new(SnowflakeHeap)
 	heap.Init(snowflakes)
+	rSnowflakes := new(SnowflakeHeap)
+	heap.Init(rSnowflakes)
 	metrics, err := NewMetrics(metricsLogger)
 
 	if err != nil {
@@ -58,10 +66,11 @@ func NewBrokerContext(metricsLogger *log.Logger) *BrokerContext {
 	}
 
 	return &BrokerContext{
-		snowflakes:    snowflakes,
-		idToSnowflake: make(map[string]*Snowflake),
-		proxyPolls:    make(chan *ProxyPoll),
-		metrics:       metrics,
+		snowflakes:           snowflakes,
+		restrictedSnowflakes: rSnowflakes,
+		idToSnowflake:        make(map[string]*Snowflake),
+		proxyPolls:           make(chan *ProxyPoll),
+		metrics:              metrics,
 	}
 }
 
@@ -79,7 +88,7 @@ type MetricsHandler struct {
 
 func (sh SnowflakeHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
 	w.Header().Set("Access-Control-Allow-Origin", "*")
-	w.Header().Set("Access-Control-Allow-Headers", "Origin, X-Session-ID")
+	w.Header().Set("Access-Control-Allow-Headers", "Origin, X-Session-ID, Snowflake-NAT-Type")
 	// Return early if it's CORS preflight.
 	if "OPTIONS" == r.Method {
 		return
@@ -101,15 +110,17 @@ func (mh MetricsHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
 type ProxyPoll struct {
 	id           string
 	proxyType    string
+	natType      string
 	offerChannel chan []byte
 }
 
 // Registers a Snowflake and waits for some Client to send an offer,
 // as part of the polling logic of the proxy handler.
-func (ctx *BrokerContext) RequestOffer(id string, proxyType string) []byte {
+func (ctx *BrokerContext) RequestOffer(id string, proxyType string, natType string) []byte {
 	request := new(ProxyPoll)
 	request.id = id
 	request.proxyType = proxyType
+	request.natType = natType
 	request.offerChannel = make(chan []byte)
 	ctx.proxyPolls <- request
 	// Block until an offer is available, or timeout which sends a nil offer.
@@ -122,7 +133,7 @@ func (ctx *BrokerContext) RequestOffer(id string, proxyType string) []byte {
 // client offer or nil on timeout / none are available.
 func (ctx *BrokerContext) Broker() {
 	for request := range ctx.proxyPolls {
-		snowflake := ctx.AddSnowflake(request.id, request.proxyType)
+		snowflake := ctx.AddSnowflake(request.id, request.proxyType, request.natType)
 		// Wait for a client to avail an offer to the snowflake.
 		go func(request *ProxyPoll) {
 			select {
@@ -133,7 +144,11 @@ func (ctx *BrokerContext) Broker() {
 				ctx.snowflakeLock.Lock()
 				defer ctx.snowflakeLock.Unlock()
 				if snowflake.index != -1 {
-					heap.Remove(ctx.snowflakes, snowflake.index)
+					if request.natType == NATRestricted {
+						heap.Remove(ctx.restrictedSnowflakes, snowflake.index)
+					} else {
+						heap.Remove(ctx.snowflakes, snowflake.index)
+					}
 					delete(ctx.idToSnowflake, snowflake.id)
 					close(request.offerChannel)
 				}
@@ -145,7 +160,7 @@ func (ctx *BrokerContext) Broker() {
 // Create and add a Snowflake to the heap.
 // Required to keep track of proxies between providing them
 // with an offer and awaiting their second POST with an answer.
-func (ctx *BrokerContext) AddSnowflake(id string, proxyType string) *Snowflake {
+func (ctx *BrokerContext) AddSnowflake(id string, proxyType string, natType string) *Snowflake {
 	snowflake := new(Snowflake)
 	snowflake.id = id
 	snowflake.clients = 0
@@ -153,7 +168,11 @@ func (ctx *BrokerContext) AddSnowflake(id string, proxyType string) *Snowflake {
 	snowflake.offerChannel = make(chan []byte)
 	snowflake.answerChannel = make(chan []byte)
 	ctx.snowflakeLock.Lock()
-	heap.Push(ctx.snowflakes, snowflake)
+	if natType == NATRestricted {
+		heap.Push(ctx.restrictedSnowflakes, snowflake)
+	} else {
+		heap.Push(ctx.snowflakes, snowflake)
+	}
 	ctx.snowflakeLock.Unlock()
 	ctx.idToSnowflake[id] = snowflake
 	return snowflake
@@ -170,7 +189,7 @@ func proxyPolls(ctx *BrokerContext, w http.ResponseWriter, r *http.Request) {
 		return
 	}
 
-	sid, proxyType, _, err := messages.DecodePollRequest(body)
+	sid, proxyType, natType, err := messages.DecodePollRequest(body)
 	if err != nil {
 		w.WriteHeader(http.StatusBadRequest)
 		return
@@ -187,7 +206,7 @@ func proxyPolls(ctx *BrokerContext, w http.ResponseWriter, r *http.Request) {
 	}
 
 	// Wait for a client to avail an offer to the snowflake, or timeout if nil.
-	offer := ctx.RequestOffer(sid, proxyType)
+	offer := ctx.RequestOffer(sid, proxyType, natType)
 	var b []byte
 	if nil == offer {
 		ctx.metrics.lock.Lock()
@@ -226,9 +245,23 @@ func clientOffers(ctx *BrokerContext, w http.ResponseWriter, r *http.Request) {
 		w.WriteHeader(http.StatusBadRequest)
 		return
 	}
+
+	natType := r.Header.Get("Snowflake-NAT-Type")
+	if natType == "" {
+		natType = NATUnknown
+	}
+
+	// Only hand out known restricted snowflakes to unrestricted clients
+	var snowflakeHeap *SnowflakeHeap
+	if natType == NATUnrestricted {
+		snowflakeHeap = ctx.restrictedSnowflakes
+	} else {
+		snowflakeHeap = ctx.snowflakes
+	}
+
 	// Immediately fail if there are no snowflakes available.
 	ctx.snowflakeLock.Lock()
-	numSnowflakes := ctx.snowflakes.Len()
+	numSnowflakes := snowflakeHeap.Len()
 	ctx.snowflakeLock.Unlock()
 	if numSnowflakes <= 0 {
 		ctx.metrics.lock.Lock()
@@ -240,7 +273,7 @@ func clientOffers(ctx *BrokerContext, w http.ResponseWriter, r *http.Request) {
 	// Otherwise, find the most available snowflake proxy, and pass the offer to it.
 	// Delete must be deferred in order to correctly process answer request later.
 	ctx.snowflakeLock.Lock()
-	snowflake := heap.Pop(ctx.snowflakes).(*Snowflake)
+	snowflake := heap.Pop(snowflakeHeap).(*Snowflake)
 	ctx.snowflakeLock.Unlock()
 	snowflake.offerChannel <- offer
 
diff --git a/broker/snowflake-broker_test.go b/broker/snowflake-broker_test.go
index 18b83dd..91383a1 100644
--- a/broker/snowflake-broker_test.go
+++ b/broker/snowflake-broker_test.go
@@ -29,7 +29,7 @@ func TestBroker(t *testing.T) {
 		Convey("Adds Snowflake", func() {
 			So(ctx.snowflakes.Len(), ShouldEqual, 0)
 			So(len(ctx.idToSnowflake), ShouldEqual, 0)
-			ctx.AddSnowflake("foo", "")
+			ctx.AddSnowflake("foo", "", NATUnknown)
 			So(ctx.snowflakes.Len(), ShouldEqual, 1)
 			So(len(ctx.idToSnowflake), ShouldEqual, 1)
 		})
@@ -55,7 +55,7 @@ func TestBroker(t *testing.T) {
 		Convey("Request an offer from the Snowflake Heap", func() {
 			done := make(chan []byte)
 			go func() {
-				offer := ctx.RequestOffer("test", "")
+				offer := ctx.RequestOffer("test", "", NATUnknown)
 				done <- offer
 			}()
 			request := <-ctx.proxyPolls
@@ -79,7 +79,7 @@ func TestBroker(t *testing.T) {
 			Convey("with a proxy answer if available.", func() {
 				done := make(chan bool)
 				// Prepare a fake proxy to respond with.
-				snowflake := ctx.AddSnowflake("fake", "")
+				snowflake := ctx.AddSnowflake("fake", "", NATUnknown)
 				go func() {
 					clientOffers(ctx, w, r)
 					done <- true
@@ -97,7 +97,7 @@ func TestBroker(t *testing.T) {
 					return
 				}
 				done := make(chan bool)
-				snowflake := ctx.AddSnowflake("fake", "")
+				snowflake := ctx.AddSnowflake("fake", "", NATUnknown)
 				go func() {
 					clientOffers(ctx, w, r)
 					// Takes a few seconds here...
@@ -147,7 +147,7 @@ func TestBroker(t *testing.T) {
 		})
 
 		Convey("Responds to proxy answers...", func() {
-			s := ctx.AddSnowflake("test", "")
+			s := ctx.AddSnowflake("test", "", NATUnknown)
 			w := httptest.NewRecorder()
 			data := bytes.NewReader([]byte(`{"Version":"1.0","Sid":"test","Answer":"test"}`))
 
@@ -260,7 +260,7 @@ func TestBroker(t *testing.T) {
 			// Manually do the Broker goroutine action here for full control.
 			p := <-ctx.proxyPolls
 			So(p.id, ShouldEqual, "ymbcCMto7KHNGYlp")
-			s := ctx.AddSnowflake(p.id, "")
+			s := ctx.AddSnowflake(p.id, "", NATUnknown)
 			go func() {
 				offer := <-s.offerChannel
 				p.offerChannel <- offer
@@ -537,7 +537,7 @@ func TestMetrics(t *testing.T) {
 			So(err, ShouldBeNil)
 
 			// Prepare a fake proxy to respond with.
-			snowflake := ctx.AddSnowflake("fake", "")
+			snowflake := ctx.AddSnowflake("fake", "", NATUnknown)
 			go func() {
 				clientOffers(ctx, w, r)
 				done <- true





More information about the tor-commits mailing list