[tor-commits] [snowflake/main] Intermediary refactor teasing apart http / ipc

arlo at torproject.org arlo at torproject.org
Thu Jul 8 19:44:16 UTC 2021


commit 015958fbe66bd91a003c6fc92a11bd5d13b887c3
Author: Arlo Breault <arlolra at gmail.com>
Date:   Thu May 20 07:49:27 2021 -0400

    Intermediary refactor teasing apart http / ipc
    
    Introduces an IPC struct and moves the logic out of the http handlers
    and into methods on that.
---
 broker/broker.go                | 338 +++++++++-------------------------------
 broker/ipc.go                   | 293 ++++++++++++++++++++++++++++++++++
 broker/snowflake-broker_test.go | 111 ++++++-------
 common/messages/ipc.go          |  18 +++
 4 files changed, 445 insertions(+), 315 deletions(-)

diff --git a/broker/broker.go b/broker/broker.go
index fc4727d..58f3955 100644
--- a/broker/broker.go
+++ b/broker/broker.go
@@ -6,15 +6,13 @@ SessionDescriptions in order to negotiate a WebRTC connection.
 package main
 
 import (
-	"bytes"
 	"container/heap"
 	"crypto/tls"
+	"errors"
 	"flag"
-	"fmt"
 	"io"
 	"io/ioutil"
 	"log"
-	"net"
 	"net/http"
 	"os"
 	"os/signal"
@@ -31,23 +29,7 @@ import (
 )
 
 const (
-	ClientTimeout = 10
-	ProxyTimeout  = 10
-	readLimit     = 100000 //Maximum number of bytes to be read from an HTTP request
-
-	NATUnknown      = "unknown"
-	NATRestricted   = "restricted"
-	NATUnrestricted = "unrestricted"
-)
-
-// We support two client message formats. The legacy format is for backwards
-// combatability and relies heavily on HTTP headers and status codes to convey
-// information.
-type clientVersion int
-
-const (
-	v0 clientVersion = iota //legacy version
-	v1
+	readLimit = 100000 // Maximum number of bytes to be read from an HTTP request
 )
 
 type BrokerContext struct {
@@ -89,8 +71,8 @@ func NewBrokerContext(metricsLogger *log.Logger) *BrokerContext {
 
 // Implements the http.Handler interface
 type SnowflakeHandler struct {
-	*BrokerContext
-	handle func(*BrokerContext, http.ResponseWriter, *http.Request)
+	*IPC
+	handle func(*IPC, http.ResponseWriter, *http.Request)
 }
 
 // Implements the http.Handler interface
@@ -106,7 +88,7 @@ func (sh SnowflakeHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
 	if "OPTIONS" == r.Method {
 		return
 	}
-	sh.handle(sh.BrokerContext, w, r)
+	sh.handle(sh.IPC, w, r)
 }
 
 func (mh MetricsHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
@@ -199,7 +181,7 @@ func (ctx *BrokerContext) AddSnowflake(id string, proxyType string, natType stri
 /*
 For snowflake proxies to request a client from the Broker.
 */
-func proxyPolls(ctx *BrokerContext, w http.ResponseWriter, r *http.Request) {
+func proxyPolls(i *IPC, w http.ResponseWriter, r *http.Request) {
 	body, err := ioutil.ReadAll(http.MaxBytesReader(w, r.Body, readLimit))
 	if err != nil {
 		log.Println("Invalid data.")
@@ -207,47 +189,28 @@ func proxyPolls(ctx *BrokerContext, w http.ResponseWriter, r *http.Request) {
 		return
 	}
 
-	sid, proxyType, natType, clients, err := messages.DecodePollRequest(body)
-	if err != nil {
-		w.WriteHeader(http.StatusBadRequest)
-		return
-	}
-
-	// Log geoip stats
-	remoteIP, _, err := net.SplitHostPort(r.RemoteAddr)
-	if err != nil {
-		log.Println("Error processing proxy IP: ", err.Error())
-	} else {
-		ctx.metrics.lock.Lock()
-		ctx.metrics.UpdateCountryStats(remoteIP, proxyType, natType)
-		ctx.metrics.lock.Unlock()
+	arg := messages.Arg{
+		Body:       body,
+		RemoteAddr: r.RemoteAddr,
+		NatType:    "",
 	}
 
-	// Wait for a client to avail an offer to the snowflake, or timeout if nil.
-	offer := ctx.RequestOffer(sid, proxyType, natType, clients)
-	var b []byte
-	if nil == offer {
-		ctx.metrics.lock.Lock()
-		ctx.metrics.proxyIdleCount++
-		ctx.metrics.promMetrics.ProxyPollTotal.With(prometheus.Labels{"nat": natType, "status": "idle"}).Inc()
-		ctx.metrics.lock.Unlock()
-
-		b, err = messages.EncodePollResponse("", false, "")
-		if err != nil {
-			w.WriteHeader(http.StatusInternalServerError)
-			return
-		}
-
-		w.Write(b)
+	var response []byte
+	err = i.ProxyPolls(arg, &response)
+	switch {
+	case err == nil:
+	case errors.Is(err, messages.ErrBadRequest):
+		w.WriteHeader(http.StatusBadRequest)
 		return
-	}
-	ctx.metrics.promMetrics.ProxyPollTotal.With(prometheus.Labels{"nat": natType, "status": "matched"}).Inc()
-	b, err = messages.EncodePollResponse(string(offer.sdp), true, offer.natType)
-	if err != nil {
+	case errors.Is(err, messages.ErrInternal):
+		fallthrough
+	default:
+		log.Println(err)
 		w.WriteHeader(http.StatusInternalServerError)
 		return
 	}
-	if _, err := w.Write(b); err != nil {
+
+	if _, err := w.Write(response); err != nil {
 		log.Printf("proxyPolls unable to write offer with error: %v", err)
 	}
 }
@@ -258,162 +221,44 @@ type ClientOffer struct {
 	sdp     []byte
 }
 
-// Sends an encoded response to the client and an
-// HTTP server error if the response encoding fails
-func sendClientResponse(resp *messages.ClientPollResponse, w http.ResponseWriter) {
-	data, err := resp.EncodePollResponse()
-	if err != nil {
-		log.Printf("error encoding answer")
-		w.WriteHeader(http.StatusInternalServerError)
-	} else {
-		if _, err := w.Write([]byte(data)); err != nil {
-			log.Printf("unable to write answer with error: %v", err)
-		}
-	}
-}
-
 /*
 Expects a WebRTC SDP offer in the Request to give to an assigned
 snowflake proxy, which responds with the SDP answer to be sent in
 the HTTP response back to the client.
 */
-func clientOffers(ctx *BrokerContext, w http.ResponseWriter, r *http.Request) {
-	var err error
-	var version clientVersion
-
-	startTime := time.Now()
+func clientOffers(i *IPC, w http.ResponseWriter, r *http.Request) {
 	body, err := ioutil.ReadAll(http.MaxBytesReader(w, r.Body, readLimit))
 	if err != nil {
 		log.Printf("Error reading client request: %s", err.Error())
-		w.WriteHeader(http.StatusInternalServerError)
+		w.WriteHeader(http.StatusBadRequest)
 		return
 	}
-	if len(body) > 0 && body[0] == '{' {
-		version = v0
-	} else {
-		parts := bytes.SplitN(body, []byte("\n"), 2)
-		if len(parts) < 2 {
-			// no version number found
-			err := fmt.Errorf("unsupported message version")
-			sendClientResponse(&messages.ClientPollResponse{Error: err.Error()}, w)
-			return
-		}
-		body = parts[1]
-		if string(parts[0]) == "1.0" {
-			version = v1
 
-		} else {
-			err := fmt.Errorf("unsupported message version")
-			sendClientResponse(&messages.ClientPollResponse{Error: err.Error()}, w)
-			return
-		}
+	arg := messages.Arg{
+		Body:       body,
+		RemoteAddr: "",
+		NatType:    r.Header.Get("Snowflake-NAT-Type"),
 	}
 
-	var offer *ClientOffer
-	switch version {
-	case v0:
-		offer = &ClientOffer{
-			natType: r.Header.Get("Snowflake-NAT-Type"),
-			sdp:     body,
-		}
-	case v1:
-		req, err := messages.DecodeClientPollRequest(body)
-		if err != nil {
-			sendClientResponse(&messages.ClientPollResponse{Error: err.Error()}, w)
-			return
-		}
-		offer = &ClientOffer{
-			natType: req.NAT,
-			sdp:     []byte(req.Offer),
-		}
+	var response []byte
+	err = i.ClientOffers(arg, &response)
+	switch {
+	case err == nil:
+	case errors.Is(err, messages.ErrUnavailable):
+		w.WriteHeader(http.StatusServiceUnavailable)
+		return
+	case errors.Is(err, messages.ErrTimeout):
+		w.WriteHeader(http.StatusGatewayTimeout)
+		return
 	default:
-		panic("unknown version")
-	}
-
-	// Only hand out known restricted snowflakes to unrestricted clients
-	var snowflakeHeap *SnowflakeHeap
-	if offer.natType == NATUnrestricted {
-		snowflakeHeap = ctx.restrictedSnowflakes
-	} else {
-		snowflakeHeap = ctx.snowflakes
-	}
-
-	// Immediately fail if there are no snowflakes available.
-	ctx.snowflakeLock.Lock()
-	numSnowflakes := snowflakeHeap.Len()
-	ctx.snowflakeLock.Unlock()
-	if numSnowflakes <= 0 {
-		ctx.metrics.lock.Lock()
-		ctx.metrics.clientDeniedCount++
-		ctx.metrics.promMetrics.ClientPollTotal.With(prometheus.Labels{"nat": offer.natType, "status": "denied"}).Inc()
-		if offer.natType == NATUnrestricted {
-			ctx.metrics.clientUnrestrictedDeniedCount++
-		} else {
-			ctx.metrics.clientRestrictedDeniedCount++
-		}
-		ctx.metrics.lock.Unlock()
-		switch version {
-		case v0:
-			w.WriteHeader(http.StatusServiceUnavailable)
-		case v1:
-			resp := &messages.ClientPollResponse{Error: "no snowflake proxies currently available"}
-			sendClientResponse(resp, w)
-		default:
-			panic("unknown version")
-		}
+		log.Println(err)
+		w.WriteHeader(http.StatusInternalServerError)
 		return
 	}
-	// Otherwise, find the most available snowflake proxy, and pass the offer to it.
-	// Delete must be deferred in order to correctly process answer request later.
-	ctx.snowflakeLock.Lock()
-	snowflake := heap.Pop(snowflakeHeap).(*Snowflake)
-	ctx.snowflakeLock.Unlock()
-	snowflake.offerChannel <- offer
-
-	// Wait for the answer to be returned on the channel or timeout.
-	select {
-	case answer := <-snowflake.answerChannel:
-		ctx.metrics.lock.Lock()
-		ctx.metrics.clientProxyMatchCount++
-		ctx.metrics.promMetrics.ClientPollTotal.With(prometheus.Labels{"nat": offer.natType, "status": "matched"}).Inc()
-		ctx.metrics.lock.Unlock()
-		switch version {
-		case v0:
-			if _, err := w.Write([]byte(answer)); err != nil {
-				log.Printf("unable to write answer with error: %v", err)
-			}
-		case v1:
-			resp := &messages.ClientPollResponse{Answer: answer}
-			sendClientResponse(resp, w)
-		default:
-			panic("unknown version")
-		}
-		// Initial tracking of elapsed time.
-		ctx.metrics.clientRoundtripEstimate = time.Since(startTime) /
-			time.Millisecond
-	case <-time.After(time.Second * ClientTimeout):
-		log.Println("Client: Timed out.")
-		switch version {
-		case v0:
-			w.WriteHeader(http.StatusGatewayTimeout)
-			if _, err := w.Write(
-				[]byte("timed out waiting for answer!")); err != nil {
-				log.Printf("unable to write timeout error, failed with error: %v",
-					err)
-			}
-		case v1:
-			resp := &messages.ClientPollResponse{
-				Error: "timed out waiting for answer!"}
-			sendClientResponse(resp, w)
-		default:
-			panic("unknown version")
-		}
-	}
 
-	ctx.snowflakeLock.Lock()
-	ctx.metrics.promMetrics.AvailableProxies.With(prometheus.Labels{"nat": snowflake.natType, "type": snowflake.proxyType}).Dec()
-	delete(ctx.idToSnowflake, snowflake.id)
-	ctx.snowflakeLock.Unlock()
+	if _, err := w.Write(response); err != nil {
+		log.Printf("clientOffers unable to write answer with error: %v", err)
+	}
 }
 
 /*
@@ -421,82 +266,51 @@ Expects snowflake proxes which have previously successfully received
 an offer from proxyHandler to respond with an answer in an HTTP POST,
 which the broker will pass back to the original client.
 */
-func proxyAnswers(ctx *BrokerContext, w http.ResponseWriter, r *http.Request) {
-
+func proxyAnswers(i *IPC, w http.ResponseWriter, r *http.Request) {
 	body, err := ioutil.ReadAll(http.MaxBytesReader(w, r.Body, readLimit))
-	if nil != err || nil == body || len(body) <= 0 {
+	if err != nil {
 		log.Println("Invalid data.")
 		w.WriteHeader(http.StatusBadRequest)
 		return
 	}
 
-	answer, id, err := messages.DecodeAnswerRequest(body)
-	if err != nil || answer == "" {
-		w.WriteHeader(http.StatusBadRequest)
-		return
+	arg := messages.Arg{
+		Body:       body,
+		RemoteAddr: "",
+		NatType:    "",
 	}
 
-	var success = true
-	ctx.snowflakeLock.Lock()
-	snowflake, ok := ctx.idToSnowflake[id]
-	ctx.snowflakeLock.Unlock()
-	if !ok || nil == snowflake {
-		// The snowflake took too long to respond with an answer, so its client
-		// disappeared / the snowflake is no longer recognized by the Broker.
-		success = false
-	}
-	b, err := messages.EncodeAnswerResponse(success)
-	if err != nil {
-		log.Printf("Error encoding answer: %s", err.Error())
+	var response []byte
+	err = i.ProxyAnswers(arg, &response)
+	switch {
+	case err == nil:
+	case errors.Is(err, messages.ErrBadRequest):
+		w.WriteHeader(http.StatusBadRequest)
+		return
+	case errors.Is(err, messages.ErrInternal):
+		fallthrough
+	default:
+		log.Println(err)
 		w.WriteHeader(http.StatusInternalServerError)
 		return
 	}
-	w.Write(b)
 
-	if success {
-		snowflake.answerChannel <- answer
+	if _, err := w.Write(response); err != nil {
+		log.Printf("proxyAnswers unable to write answer response with error: %v", err)
 	}
-
 }
 
-func debugHandler(ctx *BrokerContext, w http.ResponseWriter, r *http.Request) {
-
-	var webexts, browsers, standalones, unknowns int
-	var natRestricted, natUnrestricted, natUnknown int
-	ctx.snowflakeLock.Lock()
-	s := fmt.Sprintf("current snowflakes available: %d\n", len(ctx.idToSnowflake))
-	for _, snowflake := range ctx.idToSnowflake {
-		if snowflake.proxyType == "badge" {
-			browsers++
-		} else if snowflake.proxyType == "webext" {
-			webexts++
-		} else if snowflake.proxyType == "standalone" {
-			standalones++
-		} else {
-			unknowns++
-		}
-
-		switch snowflake.natType {
-		case NATRestricted:
-			natRestricted++
-		case NATUnrestricted:
-			natUnrestricted++
-		default:
-			natUnknown++
-		}
+func debugHandler(i *IPC, w http.ResponseWriter, r *http.Request) {
+	var response string
 
+	err := i.Debug(new(interface{}), &response)
+	if err != nil {
+		log.Println(err)
+		w.WriteHeader(http.StatusInternalServerError)
+		return
 	}
-	ctx.snowflakeLock.Unlock()
-	s += fmt.Sprintf("\tstandalone proxies: %d", standalones)
-	s += fmt.Sprintf("\n\tbrowser proxies: %d", browsers)
-	s += fmt.Sprintf("\n\twebext proxies: %d", webexts)
-	s += fmt.Sprintf("\n\tunknown proxies: %d", unknowns)
-
-	s += fmt.Sprintf("\nNAT Types available:")
-	s += fmt.Sprintf("\n\trestricted: %d", natRestricted)
-	s += fmt.Sprintf("\n\tunrestricted: %d", natUnrestricted)
-	s += fmt.Sprintf("\n\tunknown: %d", natUnknown)
-	if _, err := w.Write([]byte(s)); err != nil {
+
+	if _, err := w.Write([]byte(response)); err != nil {
 		log.Printf("writing proxy information returned error: %v ", err)
 	}
 }
@@ -589,12 +403,14 @@ func main() {
 
 	go ctx.Broker()
 
+	i := &IPC{ctx}
+
 	http.HandleFunc("/robots.txt", robotsTxtHandler)
 
-	http.Handle("/proxy", SnowflakeHandler{ctx, proxyPolls})
-	http.Handle("/client", SnowflakeHandler{ctx, clientOffers})
-	http.Handle("/answer", SnowflakeHandler{ctx, proxyAnswers})
-	http.Handle("/debug", SnowflakeHandler{ctx, debugHandler})
+	http.Handle("/proxy", SnowflakeHandler{i, proxyPolls})
+	http.Handle("/client", SnowflakeHandler{i, clientOffers})
+	http.Handle("/answer", SnowflakeHandler{i, proxyAnswers})
+	http.Handle("/debug", SnowflakeHandler{i, debugHandler})
 	http.Handle("/metrics", MetricsHandler{metricsFilename, metricsHandler})
 	http.Handle("/prometheus", promhttp.HandlerFor(ctx.metrics.promMetrics.registry, promhttp.HandlerOpts{}))
 
diff --git a/broker/ipc.go b/broker/ipc.go
new file mode 100644
index 0000000..79ccf0f
--- /dev/null
+++ b/broker/ipc.go
@@ -0,0 +1,293 @@
+package main
+
+import (
+	"bytes"
+	"container/heap"
+	"fmt"
+	"log"
+	"net"
+	"time"
+
+	"git.torproject.org/pluggable-transports/snowflake.git/common/messages"
+	"github.com/prometheus/client_golang/prometheus"
+)
+
+const (
+	ClientTimeout = 10
+	ProxyTimeout  = 10
+
+	NATUnknown      = "unknown"
+	NATRestricted   = "restricted"
+	NATUnrestricted = "unrestricted"
+)
+
+// We support two client message formats. The legacy format is for backwards
+// combatability and relies heavily on HTTP headers and status codes to convey
+// information.
+type clientVersion int
+
+const (
+	v0 clientVersion = iota //legacy version
+	v1
+)
+
+type IPC struct {
+	ctx *BrokerContext
+}
+
+func (i *IPC) Debug(_ interface{}, response *string) error {
+	var webexts, browsers, standalones, unknowns int
+	var natRestricted, natUnrestricted, natUnknown int
+
+	i.ctx.snowflakeLock.Lock()
+	s := fmt.Sprintf("current snowflakes available: %d\n", len(i.ctx.idToSnowflake))
+	for _, snowflake := range i.ctx.idToSnowflake {
+		if snowflake.proxyType == "badge" {
+			browsers++
+		} else if snowflake.proxyType == "webext" {
+			webexts++
+		} else if snowflake.proxyType == "standalone" {
+			standalones++
+		} else {
+			unknowns++
+		}
+
+		switch snowflake.natType {
+		case NATRestricted:
+			natRestricted++
+		case NATUnrestricted:
+			natUnrestricted++
+		default:
+			natUnknown++
+		}
+
+	}
+	i.ctx.snowflakeLock.Unlock()
+
+	s += fmt.Sprintf("\tstandalone proxies: %d", standalones)
+	s += fmt.Sprintf("\n\tbrowser proxies: %d", browsers)
+	s += fmt.Sprintf("\n\twebext proxies: %d", webexts)
+	s += fmt.Sprintf("\n\tunknown proxies: %d", unknowns)
+
+	s += fmt.Sprintf("\nNAT Types available:")
+	s += fmt.Sprintf("\n\trestricted: %d", natRestricted)
+	s += fmt.Sprintf("\n\tunrestricted: %d", natUnrestricted)
+	s += fmt.Sprintf("\n\tunknown: %d", natUnknown)
+
+	*response = s
+	return nil
+}
+
+func (i *IPC) ProxyPolls(arg messages.Arg, response *[]byte) error {
+	sid, proxyType, natType, clients, err := messages.DecodePollRequest(arg.Body)
+	if err != nil {
+		return messages.ErrBadRequest
+	}
+
+	// Log geoip stats
+	remoteIP, _, err := net.SplitHostPort(arg.RemoteAddr)
+	if err != nil {
+		log.Println("Error processing proxy IP: ", err.Error())
+	} else {
+		i.ctx.metrics.lock.Lock()
+		i.ctx.metrics.UpdateCountryStats(remoteIP, proxyType, natType)
+		i.ctx.metrics.lock.Unlock()
+	}
+
+	var b []byte
+
+	// Wait for a client to avail an offer to the snowflake, or timeout if nil.
+	offer := i.ctx.RequestOffer(sid, proxyType, natType, clients)
+
+	if offer == nil {
+		i.ctx.metrics.lock.Lock()
+		i.ctx.metrics.proxyIdleCount++
+		i.ctx.metrics.promMetrics.ProxyPollTotal.With(prometheus.Labels{"nat": natType, "status": "idle"}).Inc()
+		i.ctx.metrics.lock.Unlock()
+
+		b, err = messages.EncodePollResponse("", false, "")
+		if err != nil {
+			return messages.ErrInternal
+		}
+
+		*response = b
+		return nil
+	}
+
+	i.ctx.metrics.promMetrics.ProxyPollTotal.With(prometheus.Labels{"nat": natType, "status": "matched"}).Inc()
+	b, err = messages.EncodePollResponse(string(offer.sdp), true, offer.natType)
+	if err != nil {
+		return messages.ErrInternal
+	}
+	*response = b
+
+	return nil
+}
+
+func sendClientResponse(resp *messages.ClientPollResponse, response *[]byte) error {
+	data, err := resp.EncodePollResponse()
+	if err != nil {
+		log.Printf("error encoding answer")
+		return messages.ErrInternal
+	} else {
+		*response = []byte(data)
+		return nil
+	}
+}
+
+func (i *IPC) ClientOffers(arg messages.Arg, response *[]byte) error {
+	var version clientVersion
+
+	startTime := time.Now()
+	body := arg.Body
+
+	if len(body) > 0 && body[0] == '{' {
+		version = v0
+	} else {
+		parts := bytes.SplitN(body, []byte("\n"), 2)
+		if len(parts) < 2 {
+			// no version number found
+			err := fmt.Errorf("unsupported message version")
+			return sendClientResponse(&messages.ClientPollResponse{Error: err.Error()}, response)
+		}
+		body = parts[1]
+		if string(parts[0]) == "1.0" {
+			version = v1
+
+		} else {
+			err := fmt.Errorf("unsupported message version")
+			return sendClientResponse(&messages.ClientPollResponse{Error: err.Error()}, response)
+		}
+	}
+
+	var offer *ClientOffer
+	switch version {
+	case v0:
+		offer = &ClientOffer{
+			natType: arg.NatType,
+			sdp:     body,
+		}
+	case v1:
+		req, err := messages.DecodeClientPollRequest(body)
+		if err != nil {
+			return sendClientResponse(&messages.ClientPollResponse{Error: err.Error()}, response)
+		}
+		offer = &ClientOffer{
+			natType: req.NAT,
+			sdp:     []byte(req.Offer),
+		}
+	default:
+		panic("unknown version")
+	}
+
+	// Only hand out known restricted snowflakes to unrestricted clients
+	var snowflakeHeap *SnowflakeHeap
+	if offer.natType == NATUnrestricted {
+		snowflakeHeap = i.ctx.restrictedSnowflakes
+	} else {
+		snowflakeHeap = i.ctx.snowflakes
+	}
+
+	// Immediately fail if there are no snowflakes available.
+	i.ctx.snowflakeLock.Lock()
+	numSnowflakes := snowflakeHeap.Len()
+	i.ctx.snowflakeLock.Unlock()
+	if numSnowflakes <= 0 {
+		i.ctx.metrics.lock.Lock()
+		i.ctx.metrics.clientDeniedCount++
+		i.ctx.metrics.promMetrics.ClientPollTotal.With(prometheus.Labels{"nat": offer.natType, "status": "denied"}).Inc()
+		if offer.natType == NATUnrestricted {
+			i.ctx.metrics.clientUnrestrictedDeniedCount++
+		} else {
+			i.ctx.metrics.clientRestrictedDeniedCount++
+		}
+		i.ctx.metrics.lock.Unlock()
+		switch version {
+		case v0:
+			return messages.ErrUnavailable
+		case v1:
+			resp := &messages.ClientPollResponse{Error: "no snowflake proxies currently available"}
+			return sendClientResponse(resp, response)
+		default:
+			panic("unknown version")
+		}
+	}
+
+	// Otherwise, find the most available snowflake proxy, and pass the offer to it.
+	// Delete must be deferred in order to correctly process answer request later.
+	i.ctx.snowflakeLock.Lock()
+	snowflake := heap.Pop(snowflakeHeap).(*Snowflake)
+	i.ctx.snowflakeLock.Unlock()
+	snowflake.offerChannel <- offer
+
+	var err error
+
+	// Wait for the answer to be returned on the channel or timeout.
+	select {
+	case answer := <-snowflake.answerChannel:
+		i.ctx.metrics.lock.Lock()
+		i.ctx.metrics.clientProxyMatchCount++
+		i.ctx.metrics.promMetrics.ClientPollTotal.With(prometheus.Labels{"nat": offer.natType, "status": "matched"}).Inc()
+		i.ctx.metrics.lock.Unlock()
+		switch version {
+		case v0:
+			*response = []byte(answer)
+		case v1:
+			resp := &messages.ClientPollResponse{Answer: answer}
+			err = sendClientResponse(resp, response)
+		default:
+			panic("unknown version")
+		}
+		// Initial tracking of elapsed time.
+		i.ctx.metrics.clientRoundtripEstimate = time.Since(startTime) / time.Millisecond
+	case <-time.After(time.Second * ClientTimeout):
+		log.Println("Client: Timed out.")
+		switch version {
+		case v0:
+			err = messages.ErrTimeout
+		case v1:
+			resp := &messages.ClientPollResponse{
+				Error: "timed out waiting for answer!"}
+			err = sendClientResponse(resp, response)
+		default:
+			panic("unknown version")
+		}
+	}
+
+	i.ctx.snowflakeLock.Lock()
+	i.ctx.metrics.promMetrics.AvailableProxies.With(prometheus.Labels{"nat": snowflake.natType, "type": snowflake.proxyType}).Dec()
+	delete(i.ctx.idToSnowflake, snowflake.id)
+	i.ctx.snowflakeLock.Unlock()
+
+	return err
+}
+
+func (i *IPC) ProxyAnswers(arg messages.Arg, response *[]byte) error {
+	answer, id, err := messages.DecodeAnswerRequest(arg.Body)
+	if err != nil || answer == "" {
+		return messages.ErrBadRequest
+	}
+
+	var success = true
+	i.ctx.snowflakeLock.Lock()
+	snowflake, ok := i.ctx.idToSnowflake[id]
+	i.ctx.snowflakeLock.Unlock()
+	if !ok || snowflake == nil {
+		// The snowflake took too long to respond with an answer, so its client
+		// disappeared / the snowflake is no longer recognized by the Broker.
+		success = false
+	}
+
+	b, err := messages.EncodeAnswerResponse(success)
+	if err != nil {
+		log.Printf("Error encoding answer: %s", err.Error())
+		return messages.ErrInternal
+	}
+	*response = b
+
+	if success {
+		snowflake.answerChannel <- answer
+	}
+
+	return nil
+}
diff --git a/broker/snowflake-broker_test.go b/broker/snowflake-broker_test.go
index 825bc6f..77e62cf 100644
--- a/broker/snowflake-broker_test.go
+++ b/broker/snowflake-broker_test.go
@@ -28,6 +28,7 @@ func TestBroker(t *testing.T) {
 
 	Convey("Context", t, func() {
 		ctx := NewBrokerContext(NullLogger())
+		i := &IPC{ctx}
 
 		Convey("Adds Snowflake", func() {
 			So(ctx.snowflakes.Len(), ShouldEqual, 0)
@@ -76,7 +77,7 @@ func TestBroker(t *testing.T) {
 			So(err, ShouldBeNil)
 
 			Convey("with error when no snowflakes are available.", func() {
-				clientOffers(ctx, w, r)
+				clientOffers(i, w, r)
 				So(w.Code, ShouldEqual, http.StatusOK)
 				So(w.Body.String(), ShouldEqual, `{"error":"no snowflake proxies currently available"}`)
 			})
@@ -86,7 +87,7 @@ func TestBroker(t *testing.T) {
 				// Prepare a fake proxy to respond with.
 				snowflake := ctx.AddSnowflake("fake", "", NATUnrestricted, 0)
 				go func() {
-					clientOffers(ctx, w, r)
+					clientOffers(i, w, r)
 					done <- true
 				}()
 				offer := <-snowflake.offerChannel
@@ -104,7 +105,7 @@ func TestBroker(t *testing.T) {
 				done := make(chan bool)
 				snowflake := ctx.AddSnowflake("fake", "", NATUnrestricted, 0)
 				go func() {
-					clientOffers(ctx, w, r)
+					clientOffers(i, w, r)
 					// Takes a few seconds here...
 					done <- true
 				}()
@@ -124,7 +125,7 @@ func TestBroker(t *testing.T) {
 			r.Header.Set("Snowflake-NAT-TYPE", "restricted")
 
 			Convey("with 503 when no snowflakes are available.", func() {
-				clientOffers(ctx, w, r)
+				clientOffers(i, w, r)
 				So(w.Code, ShouldEqual, http.StatusServiceUnavailable)
 				So(w.Body.String(), ShouldEqual, "")
 			})
@@ -134,7 +135,7 @@ func TestBroker(t *testing.T) {
 				// Prepare a fake proxy to respond with.
 				snowflake := ctx.AddSnowflake("fake", "", NATUnrestricted, 0)
 				go func() {
-					clientOffers(ctx, w, r)
+					clientOffers(i, w, r)
 					done <- true
 				}()
 				offer := <-snowflake.offerChannel
@@ -152,7 +153,7 @@ func TestBroker(t *testing.T) {
 				done := make(chan bool)
 				snowflake := ctx.AddSnowflake("fake", "", NATUnrestricted, 0)
 				go func() {
-					clientOffers(ctx, w, r)
+					clientOffers(i, w, r)
 					// Takes a few seconds here...
 					done <- true
 				}()
@@ -173,7 +174,7 @@ func TestBroker(t *testing.T) {
 
 			Convey("with a client offer if available.", func() {
 				go func(ctx *BrokerContext) {
-					proxyPolls(ctx, w, r)
+					proxyPolls(i, w, r)
 					done <- true
 				}(ctx)
 				// Pass a fake client offer to this proxy
@@ -187,7 +188,7 @@ func TestBroker(t *testing.T) {
 
 			Convey("return empty 200 OK when no client offer is available.", func() {
 				go func(ctx *BrokerContext) {
-					proxyPolls(ctx, w, r)
+					proxyPolls(i, w, r)
 					done <- true
 				}(ctx)
 				p := <-ctx.proxyPolls
@@ -209,7 +210,7 @@ func TestBroker(t *testing.T) {
 				r, err := http.NewRequest("POST", "snowflake.broker/answer", data)
 				So(err, ShouldBeNil)
 				go func(ctx *BrokerContext) {
-					proxyAnswers(ctx, w, r)
+					proxyAnswers(i, w, r)
 				}(ctx)
 				answer := <-s.answerChannel
 				So(w.Code, ShouldEqual, http.StatusOK)
@@ -220,7 +221,7 @@ func TestBroker(t *testing.T) {
 				data = bytes.NewReader([]byte(`{"Version":"1.0","Sid":"invalid","Answer":"test"}`))
 				r, err := http.NewRequest("POST", "snowflake.broker/answer", data)
 				So(err, ShouldBeNil)
-				proxyAnswers(ctx, w, r)
+				proxyAnswers(i, w, r)
 				So(w.Code, ShouldEqual, http.StatusOK)
 				b, err := ioutil.ReadAll(w.Body)
 				So(err, ShouldBeNil)
@@ -232,7 +233,7 @@ func TestBroker(t *testing.T) {
 				data := bytes.NewReader(nil)
 				r, err := http.NewRequest("POST", "snowflake.broker/answer", data)
 				So(err, ShouldBeNil)
-				proxyAnswers(ctx, w, r)
+				proxyAnswers(i, w, r)
 				So(w.Code, ShouldEqual, http.StatusBadRequest)
 			})
 
@@ -240,7 +241,7 @@ func TestBroker(t *testing.T) {
 				data := bytes.NewReader(make([]byte, 100001))
 				r, err := http.NewRequest("POST", "snowflake.broker/answer", data)
 				So(err, ShouldBeNil)
-				proxyAnswers(ctx, w, r)
+				proxyAnswers(i, w, r)
 				So(w.Code, ShouldEqual, http.StatusBadRequest)
 			})
 
@@ -250,6 +251,7 @@ func TestBroker(t *testing.T) {
 
 	Convey("End-To-End", t, func() {
 		ctx := NewBrokerContext(NullLogger())
+		i := &IPC{ctx}
 
 		Convey("Check for client/proxy data race", func() {
 			proxy_done := make(chan bool)
@@ -264,7 +266,7 @@ func TestBroker(t *testing.T) {
 			So(err, ShouldBeNil)
 
 			go func(ctx *BrokerContext) {
-				proxyPolls(ctx, wp, rp)
+				proxyPolls(i, wp, rp)
 				proxy_done <- true
 			}(ctx)
 
@@ -275,7 +277,7 @@ func TestBroker(t *testing.T) {
 			So(err, ShouldBeNil)
 
 			go func() {
-				clientOffers(ctx, wc, rc)
+				clientOffers(i, wc, rc)
 				client_done <- true
 			}()
 
@@ -288,7 +290,7 @@ func TestBroker(t *testing.T) {
 			rp, err = http.NewRequest("POST", "snowflake.broker/answer", datap)
 			So(err, ShouldBeNil)
 			go func(ctx *BrokerContext) {
-				proxyAnswers(ctx, wp, rp)
+				proxyAnswers(i, wp, rp)
 				proxy_done <- true
 			}(ctx)
 
@@ -307,7 +309,7 @@ func TestBroker(t *testing.T) {
 			rP, err := http.NewRequest("POST", "snowflake.broker/proxy", dataP)
 			So(err, ShouldBeNil)
 			go func() {
-				proxyPolls(ctx, wP, rP)
+				proxyPolls(i, wP, rP)
 				polled <- true
 			}()
 
@@ -328,7 +330,7 @@ func TestBroker(t *testing.T) {
 			rC, err := http.NewRequest("POST", "snowflake.broker/client", dataC)
 			So(err, ShouldBeNil)
 			go func() {
-				clientOffers(ctx, wC, rC)
+				clientOffers(i, wC, rC)
 				done <- true
 			}()
 
@@ -341,7 +343,7 @@ func TestBroker(t *testing.T) {
 			dataA := bytes.NewReader([]byte(`{"Version":"1.0","Sid":"ymbcCMto7KHNGYlp","Answer":"test"}`))
 			rA, err := http.NewRequest("POST", "snowflake.broker/answer", dataA)
 			So(err, ShouldBeNil)
-			proxyAnswers(ctx, wA, rA)
+			proxyAnswers(i, wA, rA)
 			So(wA.Code, ShouldEqual, http.StatusOK)
 
 			<-done
@@ -503,6 +505,7 @@ func TestMetrics(t *testing.T) {
 		done := make(chan bool)
 		buf := new(bytes.Buffer)
 		ctx := NewBrokerContext(log.New(buf, "", 0))
+		i := &IPC{ctx}
 
 		err := ctx.metrics.LoadGeoipDatabases("test_geoip", "test_geoip6")
 		So(err, ShouldEqual, nil)
@@ -514,10 +517,10 @@ func TestMetrics(t *testing.T) {
 			r, err := http.NewRequest("POST", "snowflake.broker/proxy", data)
 			r.RemoteAddr = "129.97.208.23:8888" //CA geoip
 			So(err, ShouldBeNil)
-			go func(ctx *BrokerContext) {
-				proxyPolls(ctx, w, r)
+			go func(i *IPC) {
+				proxyPolls(i, w, r)
 				done <- true
-			}(ctx)
+			}(i)
 			p := <-ctx.proxyPolls //manually unblock poll
 			p.offerChannel <- nil
 			<-done
@@ -527,10 +530,10 @@ func TestMetrics(t *testing.T) {
 			r, err = http.NewRequest("POST", "snowflake.broker/proxy", data)
 			r.RemoteAddr = "129.97.208.23:8888" //CA geoip
 			So(err, ShouldBeNil)
-			go func(ctx *BrokerContext) {
-				proxyPolls(ctx, w, r)
+			go func(i *IPC) {
+				proxyPolls(i, w, r)
 				done <- true
-			}(ctx)
+			}(i)
 			p = <-ctx.proxyPolls //manually unblock poll
 			p.offerChannel <- nil
 			<-done
@@ -540,10 +543,10 @@ func TestMetrics(t *testing.T) {
 			r, err = http.NewRequest("POST", "snowflake.broker/proxy", data)
 			r.RemoteAddr = "129.97.208.23:8888" //CA geoip
 			So(err, ShouldBeNil)
-			go func(ctx *BrokerContext) {
-				proxyPolls(ctx, w, r)
+			go func(i *IPC) {
+				proxyPolls(i, w, r)
 				done <- true
-			}(ctx)
+			}(i)
 			p = <-ctx.proxyPolls //manually unblock poll
 			p.offerChannel <- nil
 			<-done
@@ -553,10 +556,10 @@ func TestMetrics(t *testing.T) {
 			r, err = http.NewRequest("POST", "snowflake.broker/proxy", data)
 			r.RemoteAddr = "129.97.208.23:8888" //CA geoip
 			So(err, ShouldBeNil)
-			go func(ctx *BrokerContext) {
-				proxyPolls(ctx, w, r)
+			go func(i *IPC) {
+				proxyPolls(i, w, r)
 				done <- true
-			}(ctx)
+			}(i)
 			p = <-ctx.proxyPolls //manually unblock poll
 			p.offerChannel <- nil
 			<-done
@@ -573,7 +576,7 @@ func TestMetrics(t *testing.T) {
 			r, err := http.NewRequest("POST", "snowflake.broker/client", data)
 			So(err, ShouldBeNil)
 
-			clientOffers(ctx, w, r)
+			clientOffers(i, w, r)
 
 			ctx.metrics.printMetrics()
 			So(buf.String(), ShouldContainSubstring, "client-denied-count 8\nclient-restricted-denied-count 8\nclient-unrestricted-denied-count 0\nclient-snowflake-match-count 0")
@@ -595,7 +598,7 @@ func TestMetrics(t *testing.T) {
 			// Prepare a fake proxy to respond with.
 			snowflake := ctx.AddSnowflake("fake", "", NATUnrestricted, 0)
 			go func() {
-				clientOffers(ctx, w, r)
+				clientOffers(i, w, r)
 				done <- true
 			}()
 			offer := <-snowflake.offerChannel
@@ -614,49 +617,49 @@ func TestMetrics(t *testing.T) {
 			r, err := http.NewRequest("POST", "snowflake.broker/client", data)
 			So(err, ShouldBeNil)
 
-			clientOffers(ctx, w, r)
+			clientOffers(i, w, r)
 			w = httptest.NewRecorder()
 			data = bytes.NewReader(
 				[]byte("1.0\n{\"offer\": \"fake\", \"nat\": \"restricted\"}"))
 			r, err = http.NewRequest("POST", "snowflake.broker/client", data)
 			So(err, ShouldBeNil)
-			clientOffers(ctx, w, r)
+			clientOffers(i, w, r)
 			w = httptest.NewRecorder()
 			data = bytes.NewReader(
 				[]byte("1.0\n{\"offer\": \"fake\", \"nat\": \"restricted\"}"))
 			r, err = http.NewRequest("POST", "snowflake.broker/client", data)
 			So(err, ShouldBeNil)
-			clientOffers(ctx, w, r)
+			clientOffers(i, w, r)
 			w = httptest.NewRecorder()
 			data = bytes.NewReader(
 				[]byte("1.0\n{\"offer\": \"fake\", \"nat\": \"restricted\"}"))
 			r, err = http.NewRequest("POST", "snowflake.broker/client", data)
 			So(err, ShouldBeNil)
-			clientOffers(ctx, w, r)
+			clientOffers(i, w, r)
 			w = httptest.NewRecorder()
 			data = bytes.NewReader(
 				[]byte("1.0\n{\"offer\": \"fake\", \"nat\": \"restricted\"}"))
 			r, err = http.NewRequest("POST", "snowflake.broker/client", data)
 			So(err, ShouldBeNil)
-			clientOffers(ctx, w, r)
+			clientOffers(i, w, r)
 			w = httptest.NewRecorder()
 			data = bytes.NewReader(
 				[]byte("1.0\n{\"offer\": \"fake\", \"nat\": \"restricted\"}"))
 			r, err = http.NewRequest("POST", "snowflake.broker/client", data)
 			So(err, ShouldBeNil)
-			clientOffers(ctx, w, r)
+			clientOffers(i, w, r)
 			w = httptest.NewRecorder()
 			data = bytes.NewReader(
 				[]byte("1.0\n{\"offer\": \"fake\", \"nat\": \"restricted\"}"))
 			r, err = http.NewRequest("POST", "snowflake.broker/client", data)
 			So(err, ShouldBeNil)
-			clientOffers(ctx, w, r)
+			clientOffers(i, w, r)
 			w = httptest.NewRecorder()
 			data = bytes.NewReader(
 				[]byte("1.0\n{\"offer\": \"fake\", \"nat\": \"restricted\"}"))
 			r, err = http.NewRequest("POST", "snowflake.broker/client", data)
 			So(err, ShouldBeNil)
-			clientOffers(ctx, w, r)
+			clientOffers(i, w, r)
 
 			ctx.metrics.printMetrics()
 			So(buf.String(), ShouldContainSubstring, "client-denied-count 8\nclient-restricted-denied-count 8\nclient-unrestricted-denied-count 0\n")
@@ -666,7 +669,7 @@ func TestMetrics(t *testing.T) {
 				[]byte("1.0\n{\"offer\": \"fake\", \"nat\": \"restricted\"}"))
 			r, err = http.NewRequest("POST", "snowflake.broker/client", data)
 			So(err, ShouldBeNil)
-			clientOffers(ctx, w, r)
+			clientOffers(i, w, r)
 			buf.Reset()
 			ctx.metrics.printMetrics()
 			So(buf.String(), ShouldContainSubstring, "client-denied-count 16\nclient-restricted-denied-count 16\nclient-unrestricted-denied-count 0\n")
@@ -680,7 +683,7 @@ func TestMetrics(t *testing.T) {
 			r.RemoteAddr = "129.97.208.23:8888" //CA geoip
 			So(err, ShouldBeNil)
 			go func(ctx *BrokerContext) {
-				proxyPolls(ctx, w, r)
+				proxyPolls(i, w, r)
 				done <- true
 			}(ctx)
 			p := <-ctx.proxyPolls //manually unblock poll
@@ -693,10 +696,10 @@ func TestMetrics(t *testing.T) {
 				log.Printf("unable to get NewRequest with error: %v", err)
 			}
 			r.RemoteAddr = "129.97.208.23:8888" //CA geoip
-			go func(ctx *BrokerContext) {
-				proxyPolls(ctx, w, r)
+			go func(i *IPC) {
+				proxyPolls(i, w, r)
 				done <- true
-			}(ctx)
+			}(i)
 			p = <-ctx.proxyPolls //manually unblock poll
 			p.offerChannel <- nil
 			<-done
@@ -711,10 +714,10 @@ func TestMetrics(t *testing.T) {
 			r, err := http.NewRequest("POST", "snowflake.broker/proxy", data)
 			r.RemoteAddr = "129.97.208.23:8888" //CA geoip
 			So(err, ShouldBeNil)
-			go func(ctx *BrokerContext) {
-				proxyPolls(ctx, w, r)
+			go func(i *IPC) {
+				proxyPolls(i, w, r)
 				done <- true
-			}(ctx)
+			}(i)
 			p := <-ctx.proxyPolls //manually unblock poll
 			p.offerChannel <- nil
 			<-done
@@ -728,10 +731,10 @@ func TestMetrics(t *testing.T) {
 				log.Printf("unable to get NewRequest with error: %v", err)
 			}
 			r.RemoteAddr = "129.97.208.24:8888" //CA geoip
-			go func(ctx *BrokerContext) {
-				proxyPolls(ctx, w, r)
+			go func(i *IPC) {
+				proxyPolls(i, w, r)
 				done <- true
-			}(ctx)
+			}(i)
 			p = <-ctx.proxyPolls //manually unblock poll
 			p.offerChannel <- nil
 			<-done
@@ -747,7 +750,7 @@ func TestMetrics(t *testing.T) {
 			r, err := http.NewRequest("POST", "snowflake.broker/client", data)
 			So(err, ShouldBeNil)
 
-			clientOffers(ctx, w, r)
+			clientOffers(i, w, r)
 
 			ctx.metrics.printMetrics()
 			So(buf.String(), ShouldContainSubstring, "client-denied-count 8\nclient-restricted-denied-count 8\nclient-unrestricted-denied-count 0\nclient-snowflake-match-count 0")
@@ -760,7 +763,7 @@ func TestMetrics(t *testing.T) {
 			r, err = http.NewRequest("POST", "snowflake.broker/client", data)
 			So(err, ShouldBeNil)
 
-			clientOffers(ctx, w, r)
+			clientOffers(i, w, r)
 
 			ctx.metrics.printMetrics()
 			So(buf.String(), ShouldContainSubstring, "client-denied-count 8\nclient-restricted-denied-count 0\nclient-unrestricted-denied-count 8\nclient-snowflake-match-count 0")
@@ -773,7 +776,7 @@ func TestMetrics(t *testing.T) {
 			r, err = http.NewRequest("POST", "snowflake.broker/client", data)
 			So(err, ShouldBeNil)
 
-			clientOffers(ctx, w, r)
+			clientOffers(i, w, r)
 
 			ctx.metrics.printMetrics()
 			So(buf.String(), ShouldContainSubstring, "client-denied-count 8\nclient-restricted-denied-count 8\nclient-unrestricted-denied-count 0\nclient-snowflake-match-count 0")
diff --git a/common/messages/ipc.go b/common/messages/ipc.go
new file mode 100644
index 0000000..3f89200
--- /dev/null
+++ b/common/messages/ipc.go
@@ -0,0 +1,18 @@
+package messages
+
+import (
+	"errors"
+)
+
+type Arg struct {
+	Body       []byte
+	RemoteAddr string
+	NatType    string
+}
+
+var (
+	ErrBadRequest  = errors.New("bad request")
+	ErrInternal    = errors.New("internal error")
+	ErrUnavailable = errors.New("service unavailable")
+	ErrTimeout     = errors.New("timeout")
+)





More information about the tor-commits mailing list