[snowflake/master] broker tracking a heap of snowflakes

commit 0cd6852ad0060fad40aaf31d5ba1f6951127fc35 Author: Serene Han <keroserene+git@gmail.com> Date: Thu Jan 21 10:44:14 2016 -0800 broker tracking a heap of snowflakes --- broker/snowflake-broker.go | 130 +++++++++++++++++++++++++++++++++++---- broker/snowflake-broker_test.go | 59 ++++++++++++++++++ client/meek-webrtc.go | 3 +- proxy/broker.coffee | 7 +-- proxy/snowflake.coffee | 2 +- 5 files changed, 182 insertions(+), 19 deletions(-) diff --git a/broker/snowflake-broker.go b/broker/snowflake-broker.go index 4c0e62f..279b2b6 100644 --- a/broker/snowflake-broker.go +++ b/broker/snowflake-broker.go @@ -1,13 +1,13 @@ package snowflake_broker import ( - // "io" + "container/heap" + "fmt" "io/ioutil" "log" "net" "net/http" - "path" - + "time" // "appengine" // "appengine/urlfetch" ) @@ -15,7 +15,64 @@ import ( // This is an intermediate step - a basic hardcoded appengine rendezvous // to a single browser snowflake. -var snowflakeProxy = "" +// This is minimum viable client-proxy registration. +// TODO: better, more secure registration corresponding to what's in +// the python flashproxy facilitator. + +// Slice of available snowflake proxies. +// var snowflakes []chan []byte + +type Snowflake struct { + id string + sigChannel chan []byte + clients int + index int +} + +// Implements heap.Interface, and holds Snowflakes. +type SnowflakeHeap []*Snowflake + +func (sh SnowflakeHeap) Len() int { return len(sh) } + +func (sh SnowflakeHeap) Less(i, j int) bool { + // Snowflakes serving less clients should sort earlier. + return sh[i].clients < sh[j].clients +} + +func (sh SnowflakeHeap) Swap(i, j int) { + sh[i], sh[j] = sh[j], sh[i] + sh[i].index = i + sh[j].index = j +} + +func (sh *SnowflakeHeap) Push(s interface{}) { + n := len(*sh) + snowflake := s.(*Snowflake) + snowflake.index = n + *sh = append(*sh, snowflake) +} + +// Only valid when Len() > 0. +func (sh *SnowflakeHeap) Pop() interface{} { + flakes := *sh + n := len(flakes) + snowflake := flakes[n-1] + snowflake.index = -1 + *sh = flakes[0 : n-1] + return snowflake +} + +var snowflakes *SnowflakeHeap + +// Create and add a Snowflake to the heap. +func AddSnowflake(id string) *Snowflake { + snowflake := new(Snowflake) + snowflake.id = id + snowflake.clients = 0 + snowflake.sigChannel = make(chan []byte) + heap.Push(snowflakes, snowflake) + return snowflake +} func robotsTxtHandler(w http.ResponseWriter, r *http.Request) { w.Header().Set("Content-Type", "text/plain; charset=utf-8") @@ -36,29 +93,76 @@ Expects a WebRTC SDP offer in the Request to give to an assigned snowflake proxy, which responds with the SDP answer to be sent in the HTTP response back to the client. */ -func regHandler(w http.ResponseWriter, r *http.Request) { - // TODO: Maybe don't pass anything on path, since it will always be bidirectional - dir, _ := path.Split(path.Clean(r.URL.Path)) - if dir != "/reg/" { - http.NotFound(w, r) +func clientHandler(w http.ResponseWriter, r *http.Request) { + offer, err := ioutil.ReadAll(r.Body) + if nil != err { + log.Println("Invalid data.") + return + } + w.Header().Set("Access-Control-Allow-Origin", "*") + // Pop the most available snowflake proxy, and pass the offer to it. + // TODO: Make this much better. + snowflake := heap.Pop(snowflakes).(*Snowflake) + if nil == snowflake { + w.Write([]byte("no snowflake proxies available")) + // w.WriteHeader(http.StatusServiceUnavailable) return } + // snowflakes = snowflakes[1:] + snowflake.sigChannel <- offer + w.Write([]byte("sent offer to proxy!")) + // TODO: Get browser snowflake to talkto this appengine instance + // so it can reply with an answer, and not just the offer again :) + // TODO: Real broker which matches clients and snowflake proxies. + w.Write(offer) +} + +/* +A snowflake browser proxy requests a client from the Broker. +*/ +func proxyHandler(w http.ResponseWriter, r *http.Request) { body, err := ioutil.ReadAll(r.Body) if nil != err { + log.Println("Invalid data.") return + } + w.Header().Set("Access-Control-Allow-Origin", "*") + snowflakeSession := body + log.Println("Received snowflake: ", snowflakeSession) + snowflake := AddSnowflake(string(snowflakeSession)) + select { + case offer := <-snowflake.sigChannel: + log.Println("Passing client offer to snowflake.") + w.Write(offer) + case <-time.After(time.Second * 10): + s := fmt.Sprintf("%d snowflakes left.", snowflakes.Len()) + w.Write([]byte("timed out. " + s)) + heap.Remove(snowflakes, snowflake.index) + // w.WriteHeader(http.StatusRequestTimeout) + } +} + +func reflectHandler(w http.ResponseWriter, r *http.Request) { + body, err := ioutil.ReadAll(r.Body) + if nil != err { log.Println("Invalid data.") + return } w.Header().Set("Access-Control-Allow-Origin", "*") - // TODO: Get browser snowflake to talkto this appengine instance - // so it can reply with an answer, and not just the offer again :) - // TODO: Real broker which matches clients and snowflake proxies. w.Write(body) } func init() { + // snowflakes = make([]chan []byte, 0) + snowflakes = new(SnowflakeHeap) + heap.Init(snowflakes) + http.HandleFunc("/robots.txt", robotsTxtHandler) http.HandleFunc("/ip", ipHandler) - http.HandleFunc("/reg/", regHandler) + + http.HandleFunc("/client", clientHandler) + http.HandleFunc("/proxy", proxyHandler) + http.HandleFunc("/reflect", reflectHandler) // if SNOWFLAKE_BROKER == "" { // panic("SNOWFLAKE_BROKER empty; did you forget to edit config.go?") // } diff --git a/broker/snowflake-broker_test.go b/broker/snowflake-broker_test.go new file mode 100644 index 0000000..03f876b --- /dev/null +++ b/broker/snowflake-broker_test.go @@ -0,0 +1,59 @@ +package snowflake_broker + +import ( + "container/heap" + "testing" +) + +func TestSnowflakeHeap(t *testing.T) { + h := new(SnowflakeHeap) + heap.Init(h) + if 0 != h.Len() { + t.Error("Unexpected length.") + } + s1 := new(Snowflake) + s2 := new(Snowflake) + s3 := new(Snowflake) + s4 := new(Snowflake) + + s1.clients = 4 + s2.clients = 5 + s3.clients = 3 + s4.clients = 1 + + heap.Push(h, s1) + heap.Push(h, s2) + heap.Push(h, s3) + heap.Push(h, s4) + + if 4 != h.Len() { + t.Error("Unexpected length.") + } + + heap.Remove(h, 0) + if 3 != h.Len() { + t.Error("Unexpected length.") + } + + r := heap.Pop(h).(*Snowflake) + if r.clients != 3 { + t.Error("Unexpected clients: ", r.clients) + } + if r.index != -1 { + t.Error("Unexpected index: ", r.index) + } + + r = heap.Pop(h).(*Snowflake) + if r.clients != 4 { + t.Error("Unexpected clients: ", r.clients) + } + + r = heap.Pop(h).(*Snowflake) + if r.clients != 5 { + t.Error("Unexpected clients: ", r.clients) + } + + if 0 != h.Len() { + t.Error("Unexpected length.") + } +} diff --git a/client/meek-webrtc.go b/client/meek-webrtc.go index df59e98..2d68770 100644 --- a/client/meek-webrtc.go +++ b/client/meek-webrtc.go @@ -37,7 +37,7 @@ func NewMeekChannel(broker string, front string) *MeekChannel { mc.Method = "POST" mc.trueURL = targetUrl - mc.externalUrl = front + "/reg/test" // TODO: Have a better suffix. + mc.externalUrl = front + "/client" // We make a copy of DefaultTransport because we want the default Dial // and TLSHandshakeTimeout settings. But we want to disable the default @@ -70,6 +70,7 @@ func (mc *MeekChannel) Negotiate(offer *webrtc.SessionDescription) ( if nil != err { return nil, err } + log.Println("Body: ", string(body)) answer := webrtc.DeserializeSessionDescription(string(body)) return answer, nil } diff --git a/proxy/broker.coffee b/proxy/broker.coffee index fb2add9..9432e5c 100644 --- a/proxy/broker.coffee +++ b/proxy/broker.coffee @@ -21,7 +21,6 @@ class Broker xhr = new XMLHttpRequest() try xhr.open 'POST', @url - xhr catch err ### An exception happens here when, for example, NoScript allows the domain on @@ -35,14 +34,14 @@ class Broker # xhr.responseType = 'text' xhr.onreadystatechange = -> if xhr.DONE == xhr.readyState + log 'Broker: ' + xhr.status if 200 == xhr.status - log 'Broker: success' log 'Response: ' + xhr.responseText - # @fac_complete xhr.responseText + log xhr else log 'Broker error ' + xhr.status + ' - ' + xhr.statusText - xhr.send 'snowflake-testing' + log "Broker: sent a registration message, waiting for reply..." sendAnswer: (answer) -> log 'Sending answer to broker.' diff --git a/proxy/snowflake.coffee b/proxy/snowflake.coffee index ac59419..1b837d5 100644 --- a/proxy/snowflake.coffee +++ b/proxy/snowflake.coffee @@ -8,7 +8,7 @@ Assume that the webrtc client plugin is always the offerer, in which case this must always act as the answerer. ### DEFAULT_WEBSOCKET = '192.81.135.242:9901' -DEFAULT_BROKER = 'https://snowflake-reg.appspot.com/reg/test' +DEFAULT_BROKER = 'https://snowflake-reg.appspot.com/proxy' DEFAULT_PORTS = http: 80 https: 443
participants (1)
-
arlo@torproject.org