[tor-commits] [snowflake/master] broker tracking a heap of snowflakes

arlo at torproject.org arlo at torproject.org
Thu Jan 21 22:15:14 UTC 2016


commit 0cd6852ad0060fad40aaf31d5ba1f6951127fc35
Author: Serene Han <keroserene+git at gmail.com>
Date:   Thu Jan 21 10:44:14 2016 -0800

    broker tracking a heap of snowflakes
---
 broker/snowflake-broker.go      |  130 +++++++++++++++++++++++++++++++++++----
 broker/snowflake-broker_test.go |   59 ++++++++++++++++++
 client/meek-webrtc.go           |    3 +-
 proxy/broker.coffee             |    7 +--
 proxy/snowflake.coffee          |    2 +-
 5 files changed, 182 insertions(+), 19 deletions(-)

diff --git a/broker/snowflake-broker.go b/broker/snowflake-broker.go
index 4c0e62f..279b2b6 100644
--- a/broker/snowflake-broker.go
+++ b/broker/snowflake-broker.go
@@ -1,13 +1,13 @@
 package snowflake_broker
 
 import (
-	// "io"
+	"container/heap"
+	"fmt"
 	"io/ioutil"
 	"log"
 	"net"
 	"net/http"
-	"path"
-
+	"time"
 	// "appengine"
 	// "appengine/urlfetch"
 )
@@ -15,7 +15,64 @@ import (
 // This is an intermediate step - a basic hardcoded appengine rendezvous
 // to a single browser snowflake.
 
-var snowflakeProxy = ""
+// This is minimum viable client-proxy registration.
+// TODO: better, more secure registration corresponding to what's in
+// the python flashproxy facilitator.
+
+// Slice of available snowflake proxies.
+// var snowflakes []chan []byte
+
+type Snowflake struct {
+	id         string
+	sigChannel chan []byte
+	clients    int
+	index      int
+}
+
+// Implements heap.Interface, and holds Snowflakes.
+type SnowflakeHeap []*Snowflake
+
+func (sh SnowflakeHeap) Len() int { return len(sh) }
+
+func (sh SnowflakeHeap) Less(i, j int) bool {
+	// Snowflakes serving less clients should sort earlier.
+	return sh[i].clients < sh[j].clients
+}
+
+func (sh SnowflakeHeap) Swap(i, j int) {
+	sh[i], sh[j] = sh[j], sh[i]
+	sh[i].index = i
+	sh[j].index = j
+}
+
+func (sh *SnowflakeHeap) Push(s interface{}) {
+	n := len(*sh)
+	snowflake := s.(*Snowflake)
+	snowflake.index = n
+	*sh = append(*sh, snowflake)
+}
+
+// Only valid when Len() > 0.
+func (sh *SnowflakeHeap) Pop() interface{} {
+	flakes := *sh
+	n := len(flakes)
+	snowflake := flakes[n-1]
+	snowflake.index = -1
+	*sh = flakes[0 : n-1]
+	return snowflake
+}
+
+var snowflakes *SnowflakeHeap
+
+// Create and add a Snowflake to the heap.
+func AddSnowflake(id string) *Snowflake {
+	snowflake := new(Snowflake)
+	snowflake.id = id
+	snowflake.clients = 0
+	snowflake.sigChannel = make(chan []byte)
+	heap.Push(snowflakes, snowflake)
+	return snowflake
+}
 
 func robotsTxtHandler(w http.ResponseWriter, r *http.Request) {
 	w.Header().Set("Content-Type", "text/plain; charset=utf-8")
@@ -36,29 +93,76 @@ Expects a WebRTC SDP offer in the Request to give to an assigned
 snowflake proxy, which responds with the SDP answer to be sent in
 the HTTP response back to the client.
 */
-func regHandler(w http.ResponseWriter, r *http.Request) {
-	// TODO: Maybe don't pass anything on path, since it will always be bidirectional
-	dir, _ := path.Split(path.Clean(r.URL.Path))
-	if dir != "/reg/" {
-		http.NotFound(w, r)
+func clientHandler(w http.ResponseWriter, r *http.Request) {
+	offer, err := ioutil.ReadAll(r.Body)
+	if nil != err {
+		log.Println("Invalid data.")
+		return
+	}
+	w.Header().Set("Access-Control-Allow-Origin", "*")
+	// Pop the most available snowflake proxy, and pass the offer to it.
+	// TODO: Make this much better.
+	snowflake := heap.Pop(snowflakes).(*Snowflake)
+	if nil == snowflake {
+		w.Write([]byte("no snowflake proxies available"))
+		// w.WriteHeader(http.StatusServiceUnavailable)
 		return
 	}
+	// snowflakes = snowflakes[1:]
+	snowflake.sigChannel <- offer
+	w.Write([]byte("sent offer to proxy!"))
+	// TODO: Get browser snowflake to talkto this appengine instance
+	// so it can reply with an answer, and not just the offer again :)
+	// TODO: Real broker which matches clients and snowflake proxies.
+	w.Write(offer)
+}
+
+/*
+A snowflake browser proxy requests a client from the Broker.
+*/
+func proxyHandler(w http.ResponseWriter, r *http.Request) {
 	body, err := ioutil.ReadAll(r.Body)
 	if nil != err {
+		log.Println("Invalid data.")
 		return
+	}
+	w.Header().Set("Access-Control-Allow-Origin", "*")
+	snowflakeSession := body
+	log.Println("Received snowflake: ", snowflakeSession)
+	snowflake := AddSnowflake(string(snowflakeSession))
+	select {
+	case offer := <-snowflake.sigChannel:
+		log.Println("Passing client offer to snowflake.")
+		w.Write(offer)
+	case <-time.After(time.Second * 10):
+		s := fmt.Sprintf("%d snowflakes left.", snowflakes.Len())
+		w.Write([]byte("timed out. " + s))
+		heap.Remove(snowflakes, snowflake.index)
+		// w.WriteHeader(http.StatusRequestTimeout)
+	}
+}
+
+func reflectHandler(w http.ResponseWriter, r *http.Request) {
+	body, err := ioutil.ReadAll(r.Body)
+	if nil != err {
 		log.Println("Invalid data.")
+		return
 	}
 	w.Header().Set("Access-Control-Allow-Origin", "*")
-	// TODO: Get browser snowflake to talkto this appengine instance
-	// so it can reply with an answer, and not just the offer again :)
-	// TODO: Real broker which matches clients and snowflake proxies.
 	w.Write(body)
 }
 
 func init() {
+	// snowflakes = make([]chan []byte, 0)
+	snowflakes = new(SnowflakeHeap)
+	heap.Init(snowflakes)
+
 	http.HandleFunc("/robots.txt", robotsTxtHandler)
 	http.HandleFunc("/ip", ipHandler)
-	http.HandleFunc("/reg/", regHandler)
+
+	http.HandleFunc("/client", clientHandler)
+	http.HandleFunc("/proxy", proxyHandler)
+	http.HandleFunc("/reflect", reflectHandler)
 	// if SNOWFLAKE_BROKER == "" {
 	// panic("SNOWFLAKE_BROKER empty; did you forget to edit config.go?")
 	// }
diff --git a/broker/snowflake-broker_test.go b/broker/snowflake-broker_test.go
new file mode 100644
index 0000000..03f876b
--- /dev/null
+++ b/broker/snowflake-broker_test.go
@@ -0,0 +1,59 @@
+package snowflake_broker
+
+import (
+	"container/heap"
+	"testing"
+)
+
+func TestSnowflakeHeap(t *testing.T) {
+	h := new(SnowflakeHeap)
+	heap.Init(h)
+	if 0 != h.Len() {
+		t.Error("Unexpected length.")
+	}
+	s1 := new(Snowflake)
+	s2 := new(Snowflake)
+	s3 := new(Snowflake)
+	s4 := new(Snowflake)
+
+	s1.clients = 4
+	s2.clients = 5
+	s3.clients = 3
+	s4.clients = 1
+
+	heap.Push(h, s1)
+	heap.Push(h, s2)
+	heap.Push(h, s3)
+	heap.Push(h, s4)
+
+	if 4 != h.Len() {
+		t.Error("Unexpected length.")
+	}
+
+	heap.Remove(h, 0)
+	if 3 != h.Len() {
+		t.Error("Unexpected length.")
+	}
+
+	r := heap.Pop(h).(*Snowflake)
+	if r.clients != 3 {
+		t.Error("Unexpected clients: ", r.clients)
+	}
+	if r.index != -1 {
+		t.Error("Unexpected index: ", r.index)
+	}
+
+	r = heap.Pop(h).(*Snowflake)
+	if r.clients != 4 {
+		t.Error("Unexpected clients: ", r.clients)
+	}
+
+	r = heap.Pop(h).(*Snowflake)
+	if r.clients != 5 {
+		t.Error("Unexpected clients: ", r.clients)
+	}
+
+	if 0 != h.Len() {
+		t.Error("Unexpected length.")
+	}
+}
diff --git a/client/meek-webrtc.go b/client/meek-webrtc.go
index df59e98..2d68770 100644
--- a/client/meek-webrtc.go
+++ b/client/meek-webrtc.go
@@ -37,7 +37,7 @@ func NewMeekChannel(broker string, front string) *MeekChannel {
 	mc.Method = "POST"
 
 	mc.trueURL = targetUrl
-	mc.externalUrl = front + "/reg/test" // TODO: Have a better suffix.
+	mc.externalUrl = front + "/client"
 
 	// We make a copy of DefaultTransport because we want the default Dial
 	// and TLSHandshakeTimeout settings. But we want to disable the default
@@ -70,6 +70,7 @@ func (mc *MeekChannel) Negotiate(offer *webrtc.SessionDescription) (
 	if nil != err {
 		return nil, err
 	}
+	log.Println("Body: ", string(body))
 	answer := webrtc.DeserializeSessionDescription(string(body))
 	return answer, nil
 }
diff --git a/proxy/broker.coffee b/proxy/broker.coffee
index fb2add9..9432e5c 100644
--- a/proxy/broker.coffee
+++ b/proxy/broker.coffee
@@ -21,7 +21,6 @@ class Broker
     xhr = new XMLHttpRequest()
     try
       xhr.open 'POST', @url
-      xhr
     catch err
       ###
       An exception happens here when, for example, NoScript allows the domain on
@@ -35,14 +34,14 @@ class Broker
     # xhr.responseType = 'text'
     xhr.onreadystatechange = ->
       if xhr.DONE == xhr.readyState
+        log 'Broker: ' + xhr.status
         if 200 == xhr.status
-          log 'Broker: success'
           log 'Response: ' + xhr.responseText
-          # @fac_complete xhr.responseText
+          log xhr
         else
           log 'Broker error ' + xhr.status + ' - ' + xhr.statusText
-
     xhr.send 'snowflake-testing'
+    log "Broker: sent a registration message, waiting for reply..."
 
   sendAnswer: (answer) ->
     log 'Sending answer to broker.'
diff --git a/proxy/snowflake.coffee b/proxy/snowflake.coffee
index ac59419..1b837d5 100644
--- a/proxy/snowflake.coffee
+++ b/proxy/snowflake.coffee
@@ -8,7 +8,7 @@ Assume that the webrtc client plugin is always the offerer, in which case
 this must always act as the answerer.
 ###
 DEFAULT_WEBSOCKET = '192.81.135.242:9901'
-DEFAULT_BROKER = 'https://snowflake-reg.appspot.com/reg/test'
+DEFAULT_BROKER = 'https://snowflake-reg.appspot.com/proxy'
 DEFAULT_PORTS =
   http:  80
   https: 443





More information about the tor-commits mailing list