
commit 791f6925ec749a28ad95c76325f802bc4de2d75c Author: Serene Han <keroserene+git@gmail.com> Date: Tue Feb 16 20:50:00 2016 -0800 Simplify proxy poll handler, and broker match test --- broker/broker.go | 89 ++++++++++++++++++++++------------------- broker/snowflake-broker_test.go | 25 +++++++++--- broker/snowflake-heap.go | 4 ++ 3 files changed, 70 insertions(+), 48 deletions(-) diff --git a/broker/broker.go b/broker/broker.go index 69b8369..9e5ee30 100644 --- a/broker/broker.go +++ b/broker/broker.go @@ -28,17 +28,17 @@ type BrokerContext struct { snowflakes *SnowflakeHeap // Map keeping track of snowflakeIDs required to match SDP answers from // the second http POST. - snowflakeMap map[string]*Snowflake - createChannel chan *ProxyRequest + snowflakeMap map[string]*Snowflake + proxyPolls chan *ProxyPoll } func NewBrokerContext() *BrokerContext { snowflakes := new(SnowflakeHeap) heap.Init(snowflakes) return &BrokerContext{ - snowflakes: snowflakes, - snowflakeMap: make(map[string]*Snowflake), - createChannel: make(chan *ProxyRequest), + snowflakes: snowflakes, + snowflakeMap: make(map[string]*Snowflake), + proxyPolls: make(chan *ProxyPoll), } } @@ -51,46 +51,58 @@ func (sh SnowflakeHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { sh.h(sh.BrokerContext, w, r) } -type ProxyRequest struct { - id string - offerChan chan []byte +// Proxies may poll for client offers concurrently. +type ProxyPoll struct { + id string + offerChannel chan []byte } -// Create and add a Snowflake to the heap. -func (sc *BrokerContext) AddSnowflake(id string) *Snowflake { - snowflake := new(Snowflake) - snowflake.id = id - snowflake.clients = 0 - snowflake.offerChannel = make(chan []byte) - snowflake.answerChannel = make(chan []byte) - heap.Push(sc.snowflakes, snowflake) - sc.snowflakeMap[id] = snowflake - return snowflake +// Registers a Snowflake and waits for some Client to send an offer, +// as part of the polling logic of the proxy handler. +func (ctx *BrokerContext) RequestOffer(id string) []byte { + request := new(ProxyPoll) + request.id = id + request.offerChannel = make(chan []byte) + ctx.proxyPolls <- request + // Block until an offer is available... + offer := <-request.offerChannel + return offer } -// Match proxies to clients. -// func (ctx *BrokerContext) Broker(proxies <-chan *ProxyRequest) { +// goroutine which match proxies to clients. +// Safely processes proxy requests, responding to them with either an available +// client offer or nil on timeout / none are available. func (ctx *BrokerContext) Broker() { - // for p := range proxies { - for p := range ctx.createChannel { - snowflake := ctx.AddSnowflake(p.id) - // Wait for a client to avail an offer to the snowflake, or timeout - // and ask the snowflake to poll later. - go func(p *ProxyRequest) { + for request := range ctx.proxyPolls { + snowflake := ctx.AddSnowflake(request.id) + // Wait for a client to avail an offer to the snowflake. + go func(request *ProxyPoll) { select { case offer := <-snowflake.offerChannel: - log.Println("Passing client offer to snowflake.") - p.offerChan <- offer + log.Println("Passing client offer to snowflake proxy.") + request.offerChannel <- offer case <-time.After(time.Second * ProxyTimeout): // This snowflake is no longer available to serve clients. heap.Remove(ctx.snowflakes, snowflake.index) delete(ctx.snowflakeMap, snowflake.id) - p.offerChan <- nil + request.offerChannel <- nil } - }(p) + }(request) } } +// Create and add a Snowflake to the heap. +func (ctx *BrokerContext) AddSnowflake(id string) *Snowflake { + snowflake := new(Snowflake) + snowflake.id = id + snowflake.clients = 0 + snowflake.offerChannel = make(chan []byte) + snowflake.answerChannel = make(chan []byte) + heap.Push(ctx.snowflakes, snowflake) + ctx.snowflakeMap[id] = snowflake + return snowflake +} + func robotsTxtHandler(w http.ResponseWriter, r *http.Request) { w.Header().Set("Content-Type", "text/plain; charset=utf-8") w.Write([]byte("User-agent: *\nDisallow:\n")) @@ -145,14 +157,15 @@ func clientHandler(ctx *BrokerContext, w http.ResponseWriter, r *http.Request) { case answer := <-snowflake.answerChannel: log.Println("Client: Retrieving answer") w.Write(answer) - // Only remove from the snowflake map once the answer is set. - delete(ctx.snowflakeMap, snowflake.id) case <-time.After(time.Second * ClientTimeout): log.Println("Client: Timed out.") w.WriteHeader(http.StatusGatewayTimeout) w.Write([]byte("timed out waiting for answer!")) } + // Remove from the snowflake map whether answer was sent or not, because + // this client request is now over. + delete(ctx.snowflakeMap, snowflake.id) } /* @@ -172,17 +185,9 @@ func proxyHandler(ctx *BrokerContext, w http.ResponseWriter, r *http.Request) { if string(body) != id { // Mismatched IDs! w.WriteHeader(http.StatusBadRequest) } - // Maybe confirm that X-Session-ID is the same. log.Println("Received snowflake: ", id) - - p := new(ProxyRequest) - p.id = id - p.offerChan = make(chan []byte) - ctx.createChannel <- p - - // Wait for a client to avail an offer to the snowflake, or timeout - // and ask the snowflake to poll later. - offer := <-p.offerChan + // Wait for a client to avail an offer to the snowflake, or timeout if nil. + offer := ctx.RequestOffer(id) if nil == offer { log.Println("Proxy " + id + " did not receive a Client offer.") w.WriteHeader(http.StatusGatewayTimeout) diff --git a/broker/snowflake-broker_test.go b/broker/snowflake-broker_test.go index ee984b0..b9432d8 100644 --- a/broker/snowflake-broker_test.go +++ b/broker/snowflake-broker_test.go @@ -22,6 +22,19 @@ func TestBroker(t *testing.T) { So(len(ctx.snowflakeMap), ShouldEqual, 1) }) + Convey("Broker goroutine matches clients with proxies", func() { + p := new(ProxyPoll) + p.id = "test" + p.offerChannel = make(chan []byte) + go func() { + ctx.proxyPolls <- p + close(ctx.proxyPolls) + }() + ctx.Broker() + So(ctx.snowflakes.Len(), ShouldEqual, 1) + So(ctx.snowflakes.Len(), ShouldEqual, 1) + }) + Convey("Responds to client offers...", func() { w := httptest.NewRecorder() data := bytes.NewReader([]byte("test")) @@ -83,9 +96,9 @@ func TestBroker(t *testing.T) { done <- true }(ctx) // Pass a fake client offer to this proxy - p := <-ctx.createChannel + p := <-ctx.proxyPolls So(p.id, ShouldEqual, "test") - p.offerChan <- []byte("fake offer") + p.offerChannel <- []byte("fake offer") <-done So(w.Code, ShouldEqual, http.StatusOK) So(w.Body.String(), ShouldEqual, "fake offer") @@ -96,10 +109,10 @@ func TestBroker(t *testing.T) { proxyHandler(ctx, w, r) done <- true }(ctx) - p := <-ctx.createChannel + p := <-ctx.proxyPolls So(p.id, ShouldEqual, "test") // nil means timeout - p.offerChan <- nil + p.offerChannel <- nil <-done So(w.Body.String(), ShouldEqual, "") So(w.Code, ShouldEqual, http.StatusGatewayTimeout) @@ -159,12 +172,12 @@ func TestBroker(t *testing.T) { }() // Manually do the Broker goroutine action here for full control. - p := <-ctx.createChannel + p := <-ctx.proxyPolls So(p.id, ShouldEqual, "test") s := ctx.AddSnowflake(p.id) go func() { offer := <-s.offerChannel - p.offerChan <- offer + p.offerChannel <- offer }() So(ctx.snowflakeMap["test"], ShouldNotBeNil) diff --git a/broker/snowflake-heap.go b/broker/snowflake-heap.go index d37228f..cf249fe 100644 --- a/broker/snowflake-heap.go +++ b/broker/snowflake-heap.go @@ -4,6 +4,10 @@ Keeping track of pending available snowflake proxies. package snowflake_broker +/* +The Snowflake struct contains a single interaction +over the offer and answer channels. +*/ type Snowflake struct { id string offerChannel chan []byte
participants (1)
-
serene@torproject.org