commit dccc15a6e9d620298f77fb7ae14692723b434306 Author: Cecylia Bocovich <cohosh@torproject.org> Date: Fri Nov 22 17:15:06 2019 -0500
Add synchronization to prevent race in broker
There's a race condition in the broker where both the proxy and the client processes try to pop/remove the same snowflake from the heap. This patch adds synchronization to prevent simultaneous accesses to snowflakes. --- broker/broker.go | 34 +++++++++++++++++++++++++++------- 1 file changed, 27 insertions(+), 7 deletions(-)
diff --git a/broker/broker.go b/broker/broker.go index c166f1a..a5b0edf 100644 --- a/broker/broker.go +++ b/broker/broker.go @@ -18,6 +18,7 @@ import ( "os" "os/signal" "strings" + "sync" "syscall" "time"
@@ -37,6 +38,8 @@ type BrokerContext struct { // Map keeping track of snowflakeIDs required to match SDP answers from // the second http POST. idToSnowflake map[string]*Snowflake + // Synchronization for the + snowflakeLock sync.Mutex proxyPolls chan *ProxyPoll metrics *Metrics } @@ -127,10 +130,13 @@ func (ctx *BrokerContext) Broker() { request.offerChannel <- offer case <-time.After(time.Second * ProxyTimeout): // This snowflake is no longer available to serve clients. - // TODO: Fix race using a delete channel - heap.Remove(ctx.snowflakes, snowflake.index) - delete(ctx.idToSnowflake, snowflake.id) - request.offerChannel <- nil + ctx.snowflakeLock.Lock() + defer ctx.snowflakeLock.Unlock() + if snowflake.index != -1 { + heap.Remove(ctx.snowflakes, snowflake.index) + delete(ctx.idToSnowflake, snowflake.id) + close(request.offerChannel) + } } }(request) } @@ -146,7 +152,9 @@ func (ctx *BrokerContext) AddSnowflake(id string, proxyType string) *Snowflake { snowflake.proxyType = proxyType snowflake.offerChannel = make(chan []byte) snowflake.answerChannel = make(chan []byte) + ctx.snowflakeLock.Lock() heap.Push(ctx.snowflakes, snowflake) + ctx.snowflakeLock.Unlock() ctx.idToSnowflake[id] = snowflake return snowflake } @@ -215,15 +223,19 @@ func clientOffers(ctx *BrokerContext, w http.ResponseWriter, r *http.Request) { return } // Immediately fail if there are no snowflakes available. - if ctx.snowflakes.Len() <= 0 { + ctx.snowflakeLock.Lock() + numSnowflakes := ctx.snowflakes.Len() + ctx.snowflakeLock.Unlock() + if numSnowflakes <= 0 { ctx.metrics.clientDeniedCount++ w.WriteHeader(http.StatusServiceUnavailable) return } // Otherwise, find the most available snowflake proxy, and pass the offer to it. // Delete must be deferred in order to correctly process answer request later. + ctx.snowflakeLock.Lock() snowflake := heap.Pop(ctx.snowflakes).(*Snowflake) - defer delete(ctx.idToSnowflake, snowflake.id) + ctx.snowflakeLock.Unlock() snowflake.offerChannel <- offer
// Wait for the answer to be returned on the channel or timeout. @@ -243,6 +255,10 @@ func clientOffers(ctx *BrokerContext, w http.ResponseWriter, r *http.Request) { log.Printf("unable to write timeout error, failed with error: %v", err) } } + + ctx.snowflakeLock.Lock() + delete(ctx.idToSnowflake, snowflake.id) + ctx.snowflakeLock.Unlock() }
/* @@ -266,7 +282,9 @@ func proxyAnswers(ctx *BrokerContext, w http.ResponseWriter, r *http.Request) { }
var success = true + ctx.snowflakeLock.Lock() snowflake, ok := ctx.idToSnowflake[id] + ctx.snowflakeLock.Unlock() if !ok || nil == snowflake { // The snowflake took too long to respond with an answer, so its client // disappeared / the snowflake is no longer recognized by the Broker. @@ -287,9 +305,10 @@ func proxyAnswers(ctx *BrokerContext, w http.ResponseWriter, r *http.Request) { }
func debugHandler(ctx *BrokerContext, w http.ResponseWriter, r *http.Request) { - s := fmt.Sprintf("current snowflakes available: %d\n", ctx.snowflakes.Len())
var webexts, browsers, standalones, unknowns int + ctx.snowflakeLock.Lock() + s := fmt.Sprintf("current snowflakes available: %d\n", len(ctx.idToSnowflake)) for _, snowflake := range ctx.idToSnowflake { if snowflake.proxyType == "badge" { browsers++ @@ -302,6 +321,7 @@ func debugHandler(ctx *BrokerContext, w http.ResponseWriter, r *http.Request) { }
} + ctx.snowflakeLock.Unlock() s += fmt.Sprintf("\tstandalone proxies: %d", standalones) s += fmt.Sprintf("\n\tbrowser proxies: %d", browsers) s += fmt.Sprintf("\n\twebext proxies: %d", webexts)