[tor-commits] [snowflake/master] Separate broker HTTP and snowflake heap go files

serene at torproject.org serene at torproject.org
Fri Jan 29 17:10:21 UTC 2016


commit a6f6284229689330be5f69cddda27f98d15642f2
Author: Serene Han <keroserene+git at gmail.com>
Date:   Fri Jan 29 08:56:31 2016 -0800

    Separate broker HTTP and snowflake heap go files
---
 broker/broker.go           | 178 ++++++++++++++++++++++++++++++++++++
 broker/snowflake-broker.go | 223 ---------------------------------------------
 broker/snowflake-heap.go   |  46 ++++++++++
 3 files changed, 224 insertions(+), 223 deletions(-)

diff --git a/broker/broker.go b/broker/broker.go
new file mode 100644
index 0000000..dda20cb
--- /dev/null
+++ b/broker/broker.go
@@ -0,0 +1,178 @@
+/*
+Broker acts as the HTTP signaling channel.
+It matches clients and snowflake proxies by passing corresponding
+SessionDescriptions in order to negotiate a WebRTC connection.
+
+TODO(serene): This code is currently the absolute minimum required to
+cause a successful negotiation.
+It's otherwise very unsafe and problematic, and needs quite some work...
+*/
+package snowflake_broker
+
+import (
+	"container/heap"
+	"io/ioutil"
+	"log"
+	"net"
+	"net/http"
+	"time"
+)
+
+// This is minimum viable client-proxy registration.
+// TODO(#13): better, more secure registration corresponding to what's in
+// the python flashproxy facilitator.
+
+var snowflakes *SnowflakeHeap
+
+// Map keeping track of snowflakeIDs required to match SDP answers from
+// the second http POST.
+var snowflakeMap map[string]*Snowflake
+
+// AddSnowflake creates a Snowflake for id, pushes it onto the heap and maps it.
+func AddSnowflake(id string) *Snowflake {
+	flake := &Snowflake{
+		id:            id,
+		offerChannel:  make(chan []byte),
+		answerChannel: make(chan []byte),
+	}
+	heap.Push(snowflakes, flake)
+	snowflakeMap[id] = flake
+	return flake
+}
+
+func robotsTxtHandler(w http.ResponseWriter, r *http.Request) {
+	w.Header().Set("Content-Type", "text/plain; charset=utf-8")
+	w.Write([]byte("User-agent: *\nDisallow:\n"))
+}
+
+func ipHandler(w http.ResponseWriter, r *http.Request) {
+	remoteAddr := r.RemoteAddr
+	if net.ParseIP(remoteAddr).To4() == nil {
+		remoteAddr = "[" + remoteAddr + "]"
+	}
+	w.Header().Set("Content-Type", "text/plain; charset=utf-8")
+	w.Write([]byte(remoteAddr))
+}
+
+/*
+Expects a WebRTC SDP offer in the Request to give to an assigned
+snowflake proxy, which responds with the SDP answer to be sent in
+the HTTP response back to the client. Replies 503 when no proxy is waiting.
+*/
+func clientHandler(w http.ResponseWriter, r *http.Request) {
+	offer, err := ioutil.ReadAll(r.Body)
+	if nil != err {
+		log.Println("Invalid data.")
+		w.WriteHeader(http.StatusBadRequest)
+		return
+	}
+	w.Header().Set("Access-Control-Allow-Origin", "*")
+	w.Header().Set("Access-Control-Allow-Headers", "X-Session-ID")
+
+	// Find the most available snowflake proxy, and pass the offer to it.
+	// TODO: Needs improvement. heap.Pop panics when empty, so check first.
+	if snowflakes.Len() == 0 {
+		w.WriteHeader(http.StatusServiceUnavailable)
+		// w.Write([]byte("no snowflake proxies available"))
+		return
+	}
+	snowflake := heap.Pop(snowflakes).(*Snowflake)
+	snowflake.offerChannel <- offer
+
+	// Wait for the answer to be returned on the channel. Forget the snowflake
+	// either way, so that a late answer gets 410 Gone from answerHandler.
+	defer delete(snowflakeMap, snowflake.id)
+	select {
+	case answer := <-snowflake.answerChannel:
+		log.Println("Retrieving answer")
+		w.Write(answer)
+
+	case <-time.After(time.Second * 10):
+		w.WriteHeader(http.StatusGatewayTimeout)
+		w.Write([]byte("timed out waiting for answer!"))
+	}
+}
+
+/*
+For snowflake proxies to request a client from the Broker.
+*/
+func proxyHandler(w http.ResponseWriter, r *http.Request) {
+	w.Header().Set("Access-Control-Allow-Origin", "*")
+	w.Header().Set("Access-Control-Allow-Headers", "Origin, X-Session-ID")
+	// For CORS preflight.
+	if "OPTIONS" == r.Method {
+		return
+	}
+
+	id := r.Header.Get("X-Session-ID")
+	body, err := ioutil.ReadAll(r.Body)
+	if nil != err {
+		log.Println("Invalid data.")
+		w.WriteHeader(http.StatusBadRequest)
+		return
+	}
+	if string(body) != id { // Mismatched IDs: reject, do not register.
+		w.WriteHeader(http.StatusBadRequest)
+		return
+	}
+	log.Println("Received snowflake: ", id)
+	snowflake := AddSnowflake(id)
+
+	// Wait for a client to avail an offer to the snowflake, or timeout
+	// and ask the snowflake to poll later.
+	select {
+	case offer := <-snowflake.offerChannel:
+		log.Println("Passing client offer to snowflake.")
+		w.Write(offer)
+
+	case <-time.After(time.Second * 10):
+		// This snowflake is no longer available to serve clients.
+		heap.Remove(snowflakes, snowflake.index)
+		delete(snowflakeMap, snowflake.id)
+		w.WriteHeader(http.StatusGatewayTimeout)
+	}
+}
+
+/*
+Expects snowflake proxies which have previously successfully received
+an offer from proxyHandler to respond with an answer in an HTTP POST,
+which the broker will pass back to the original client.
+*/
+func answerHandler(w http.ResponseWriter, r *http.Request) {
+	w.Header().Set("Access-Control-Allow-Origin", "*")
+	w.Header().Set("Access-Control-Allow-Headers", "X-Session-ID")
+	// For CORS preflight.
+	if "OPTIONS" == r.Method {
+		return
+	}
+
+	id := r.Header.Get("X-Session-ID")
+	snowflake, ok := snowflakeMap[id]
+	if !ok || nil == snowflake {
+		// The snowflake took too long to respond with an answer,
+		// and the designated client is no longer around / recognized by the Broker.
+		w.WriteHeader(http.StatusGone)
+		return
+	}
+	body, err := ioutil.ReadAll(r.Body)
+	if nil != err {
+		log.Println("Invalid data.")
+		w.WriteHeader(http.StatusBadRequest)
+		return
+	}
+	log.Println("Received answer: ", string(body)) // log the SDP text, not raw byte values
+	snowflake.answerChannel <- body
+}
+
+func init() {
+	snowflakes = new(SnowflakeHeap)
+	snowflakeMap = make(map[string]*Snowflake)
+	heap.Init(snowflakes)
+
+	http.HandleFunc("/robots.txt", robotsTxtHandler)
+	http.HandleFunc("/ip", ipHandler)
+
+	http.HandleFunc("/client", clientHandler)
+	http.HandleFunc("/proxy", proxyHandler)
+	http.HandleFunc("/answer", answerHandler)
+}
diff --git a/broker/snowflake-broker.go b/broker/snowflake-broker.go
deleted file mode 100644
index a6daa87..0000000
--- a/broker/snowflake-broker.go
+++ /dev/null
@@ -1,223 +0,0 @@
-/*
-Broker acts as the HTTP signaling channel.
-It matches clients and snowflake proxies by passing corresponding
-SessionDescriptions in order to negotiate a WebRTC connection.
-
-TODO(serene): This code is currently the absolute minimum required to
-cause a successful negotiation.
-It's otherwise very unsafe and problematic, and needs quite some work...
-*/
-package snowflake_broker
-
-import (
-	"container/heap"
-	// "fmt"
-	"io/ioutil"
-	"log"
-	"net"
-	"net/http"
-	"time"
-)
-
-// This is an intermediate step - a basic hardcoded appengine rendezvous
-// to a single browser snowflake.
-
-// This is minimum viable client-proxy registration.
-// TODO: better, more secure registration corresponding to what's in
-// the python flashproxy facilitator.
-
-// Slice of available snowflake proxies.
-// var snowflakes []chan []byte
-
-type Snowflake struct {
-	id            string
-	offerChannel  chan []byte
-	answerChannel chan []byte
-	clients       int
-	index         int
-}
-
-// Implements heap.Interface, and holds Snowflakes.
-type SnowflakeHeap []*Snowflake
-
-func (sh SnowflakeHeap) Len() int { return len(sh) }
-
-func (sh SnowflakeHeap) Less(i, j int) bool {
-	// Snowflakes serving less clients should sort earlier.
-	return sh[i].clients < sh[j].clients
-}
-
-func (sh SnowflakeHeap) Swap(i, j int) {
-	sh[i], sh[j] = sh[j], sh[i]
-	sh[i].index = i
-	sh[j].index = j
-}
-
-func (sh *SnowflakeHeap) Push(s interface{}) {
-	n := len(*sh)
-	snowflake := s.(*Snowflake)
-	snowflake.index = n
-	*sh = append(*sh, snowflake)
-}
-
-// Only valid when Len() > 0.
-func (sh *SnowflakeHeap) Pop() interface{} {
-	flakes := *sh
-	n := len(flakes)
-	snowflake := flakes[n-1]
-	snowflake.index = -1
-	*sh = flakes[0 : n-1]
-	return snowflake
-}
-
-var snowflakes *SnowflakeHeap
-var snowflakeMap map[string]*Snowflake
-
-// Create and add a Snowflake to the heap.
-func AddSnowflake(id string) *Snowflake {
-	snowflake := new(Snowflake)
-	snowflake.id = id
-	snowflake.clients = 0
-	snowflake.offerChannel = make(chan []byte)
-	snowflake.answerChannel = make(chan []byte)
-	heap.Push(snowflakes, snowflake)
-	snowflakeMap[id] = snowflake
-	return snowflake
-}
-
-func robotsTxtHandler(w http.ResponseWriter, r *http.Request) {
-	w.Header().Set("Content-Type", "text/plain; charset=utf-8")
-	w.Write([]byte("User-agent: *\nDisallow:\n"))
-}
-
-func ipHandler(w http.ResponseWriter, r *http.Request) {
-	remoteAddr := r.RemoteAddr
-	if net.ParseIP(remoteAddr).To4() == nil {
-		remoteAddr = "[" + remoteAddr + "]"
-	}
-	w.Header().Set("Content-Type", "text/plain; charset=utf-8")
-	w.Write([]byte(remoteAddr))
-}
-
-/*
-Expects a WebRTC SDP offer in the Request to give to an assigned
-snowflake proxy, which responds with the SDP answer to be sent in
-the HTTP response back to the client.
-*/
-func clientHandler(w http.ResponseWriter, r *http.Request) {
-	offer, err := ioutil.ReadAll(r.Body)
-	if nil != err {
-		log.Println("Invalid data.")
-		w.WriteHeader(http.StatusBadRequest)
-		return
-	}
-	w.Header().Set("Access-Control-Allow-Origin", "*")
-	w.Header().Set("Access-Control-Allow-Headers", "X-Session-ID")
-
-	// Find the most available snowflake proxy, and pass the offer to it.
-	// TODO: Needs improvement.
-	snowflake := heap.Pop(snowflakes).(*Snowflake)
-	if nil == snowflake {
-		w.WriteHeader(http.StatusServiceUnavailable)
-		// w.Write([]byte("no snowflake proxies available"))
-		return
-	}
-	snowflake.offerChannel <- offer
-
-	// Wait for the answer to be returned on the channel.
-	select {
-	case answer := <-snowflake.answerChannel:
-		log.Println("Retrieving answer")
-		w.Write(answer)
-		// Only remove from the snowflake map once the answer is set.
-		delete(snowflakeMap, snowflake.id)
-
-	case <-time.After(time.Second * 10):
-		w.WriteHeader(http.StatusGatewayTimeout)
-		w.Write([]byte("timed out waiting for answer!"))
-	}
-}
-
-/*
-For snowflake proxies to request a client from the Broker.
-*/
-func proxyHandler(w http.ResponseWriter, r *http.Request) {
-	w.Header().Set("Access-Control-Allow-Origin", "*")
-	w.Header().Set("Access-Control-Allow-Headers", "Origin, X-Session-ID")
-	// For CORS preflight.
-	if "OPTIONS" == r.Method {
-		return
-	}
-
-	id := r.Header.Get("X-Session-ID")
-	body, err := ioutil.ReadAll(r.Body)
-	if nil != err {
-		log.Println("Invalid data.")
-		w.WriteHeader(http.StatusBadRequest)
-		return
-	}
-	if string(body) != id { // Mismatched IDs!
-		w.WriteHeader(http.StatusBadRequest)
-	}
-	// Maybe confirm that X-Session-ID is the same.
-	log.Println("Received snowflake: ", id)
-	snowflake := AddSnowflake(id)
-
-	// Wait for a client to avail an offer to the snowflake, or timeout
-	// and ask the snowflake to poll later.
-	select {
-	case offer := <-snowflake.offerChannel:
-		log.Println("Passing client offer to snowflake.")
-		w.Write(offer)
-
-	case <-time.After(time.Second * 10):
-		// This snowflake is no longer available to serve clients.
-		heap.Remove(snowflakes, snowflake.index)
-		delete(snowflakeMap, snowflake.id)
-		w.WriteHeader(http.StatusGatewayTimeout)
-	}
-}
-
-/*
-Expects snowflake proxes which have previously successfully received
-an offer from proxyHandler to respond with an answer in an HTTP POST,
-which the broker will pass back to the original client.
-*/
-func answerHandler(w http.ResponseWriter, r *http.Request) {
-	w.Header().Set("Access-Control-Allow-Origin", "*")
-	w.Header().Set("Access-Control-Allow-Headers", "X-Session-ID")
-	// For CORS preflight.
-	if "OPTIONS" == r.Method {
-		return
-	}
-
-	id := r.Header.Get("X-Session-ID")
-	snowflake, ok := snowflakeMap[id]
-	if !ok || nil == snowflake {
-		// The snowflake took too long to respond with an answer,
-		// and the designated client is no longer around / recognized by the Broker.
-		w.WriteHeader(http.StatusGone)
-		return
-	}
-	body, err := ioutil.ReadAll(r.Body)
-	if nil != err {
-		log.Println("Invalid data.")
-		w.WriteHeader(http.StatusBadRequest)
-		return
-	}
-	log.Println("Received answer: ", body)
-	snowflake.answerChannel <- body
-}
-
-func init() {
-	snowflakes = new(SnowflakeHeap)
-	snowflakeMap = make(map[string]*Snowflake)
-	heap.Init(snowflakes)
-
-	http.HandleFunc("/robots.txt", robotsTxtHandler)
-	http.HandleFunc("/ip", ipHandler)
-
-	http.HandleFunc("/client", clientHandler)
-	http.HandleFunc("/proxy", proxyHandler)
-	http.HandleFunc("/answer", answerHandler)
-}
diff --git a/broker/snowflake-heap.go b/broker/snowflake-heap.go
new file mode 100644
index 0000000..d37228f
--- /dev/null
+++ b/broker/snowflake-heap.go
@@ -0,0 +1,46 @@
+/*
+Keeping track of pending available snowflake proxies.
+*/
+
+package snowflake_broker
+
+type Snowflake struct {
+	id            string
+	offerChannel  chan []byte
+	answerChannel chan []byte
+	clients       int
+	index         int
+}
+
+// Implements heap.Interface, and holds Snowflakes.
+type SnowflakeHeap []*Snowflake
+
+func (sh SnowflakeHeap) Len() int { return len(sh) }
+
+func (sh SnowflakeHeap) Less(i, j int) bool {
+	// Snowflakes serving fewer clients should sort earlier.
+	return sh[i].clients < sh[j].clients
+}
+
+func (sh SnowflakeHeap) Swap(i, j int) {
+	sh[i], sh[j] = sh[j], sh[i]
+	sh[i].index = i
+	sh[j].index = j
+}
+
+func (sh *SnowflakeHeap) Push(s interface{}) {
+	n := len(*sh)
+	snowflake := s.(*Snowflake)
+	snowflake.index = n
+	*sh = append(*sh, snowflake)
+}
+
+// Pop detaches the last element for container/heap. Only valid when Len() > 0.
+func (sh *SnowflakeHeap) Pop() interface{} {
+	flakes := *sh
+	n := len(flakes) - 1
+	snowflake := flakes[n]
+	flakes[n], snowflake.index = nil, -1 // nil the slot so the GC can reclaim it
+	*sh = flakes[:n]
+	return snowflake
+}





More information about the tor-commits mailing list