commit a6f6284229689330be5f69cddda27f98d15642f2
Author: Serene Han <keroserene+git(a)gmail.com>
Date: Fri Jan 29 08:56:31 2016 -0800
Separate broker HTTP and snowflake heap go files
---
broker/broker.go | 178 ++++++++++++++++++++++++++++++++++++
broker/snowflake-broker.go | 223 ---------------------------------------------
broker/snowflake-heap.go | 46 ++++++++++
3 files changed, 224 insertions(+), 223 deletions(-)
diff --git a/broker/broker.go b/broker/broker.go
new file mode 100644
index 0000000..dda20cb
--- /dev/null
+++ b/broker/broker.go
@@ -0,0 +1,178 @@
+/*
+Broker acts as the HTTP signaling channel.
+It matches clients and snowflake proxies by passing corresponding
+SessionDescriptions in order to negotiate a WebRTC connection.
+
+TODO(serene): This code is currently the absolute minimum required to
+cause a successful negotiation.
+It's otherwise very unsafe and problematic, and needs quite some work...
+*/
+package snowflake_broker
+
+import (
+ "container/heap"
+ "io/ioutil"
+ "log"
+ "net"
+ "net/http"
+ "time"
+)
+
+// This is minimum viable client-proxy registration.
+// TODO(#13): better, more secure registration corresponding to what's in
+// the python flashproxy facilitator.
+
+var snowflakes *SnowflakeHeap
+
+// Map keeping track of snowflakeIDs required to match SDP answers from
+// the second http POST.
+var snowflakeMap map[string]*Snowflake
+
+// Create and add a Snowflake to the heap.
+func AddSnowflake(id string) *Snowflake {
+ snowflake := new(Snowflake)
+ snowflake.id = id
+ snowflake.clients = 0
+ snowflake.offerChannel = make(chan []byte)
+ snowflake.answerChannel = make(chan []byte)
+ heap.Push(snowflakes, snowflake)
+ snowflakeMap[id] = snowflake
+ return snowflake
+}
+
+func robotsTxtHandler(w http.ResponseWriter, r *http.Request) {
+ w.Header().Set("Content-Type", "text/plain; charset=utf-8")
+ w.Write([]byte("User-agent: *\nDisallow:\n"))
+}
+
+func ipHandler(w http.ResponseWriter, r *http.Request) {
+ remoteAddr := r.RemoteAddr
+ if net.ParseIP(remoteAddr).To4() == nil {
+ remoteAddr = "[" + remoteAddr + "]"
+ }
+ w.Header().Set("Content-Type", "text/plain; charset=utf-8")
+ w.Write([]byte(remoteAddr))
+}
+
+/*
+Expects a WebRTC SDP offer in the Request to give to an assigned
+snowflake proxy, which responds with the SDP answer to be sent in
+the HTTP response back to the client.
+*/
+func clientHandler(w http.ResponseWriter, r *http.Request) {
+ offer, err := ioutil.ReadAll(r.Body)
+ if nil != err {
+ log.Println("Invalid data.")
+ w.WriteHeader(http.StatusBadRequest)
+ return
+ }
+ w.Header().Set("Access-Control-Allow-Origin", "*")
+ w.Header().Set("Access-Control-Allow-Headers", "X-Session-ID")
+
+ // Find the most available snowflake proxy, and pass the offer to it.
+ // TODO: Needs improvement.
+ snowflake := heap.Pop(snowflakes).(*Snowflake)
+ if nil == snowflake {
+ w.WriteHeader(http.StatusServiceUnavailable)
+ // w.Write([]byte("no snowflake proxies available"))
+ return
+ }
+ snowflake.offerChannel <- offer
+
+ // Wait for the answer to be returned on the channel.
+ select {
+ case answer := <-snowflake.answerChannel:
+ log.Println("Retrieving answer")
+ w.Write(answer)
+ // Only remove from the snowflake map once the answer is set.
+ delete(snowflakeMap, snowflake.id)
+
+ case <-time.After(time.Second * 10):
+ w.WriteHeader(http.StatusGatewayTimeout)
+ w.Write([]byte("timed out waiting for answer!"))
+ }
+}
+
+/*
+For snowflake proxies to request a client from the Broker.
+*/
+func proxyHandler(w http.ResponseWriter, r *http.Request) {
+ w.Header().Set("Access-Control-Allow-Origin", "*")
+ w.Header().Set("Access-Control-Allow-Headers", "Origin, X-Session-ID")
+ // For CORS preflight.
+ if "OPTIONS" == r.Method {
+ return
+ }
+
+ id := r.Header.Get("X-Session-ID")
+ body, err := ioutil.ReadAll(r.Body)
+ if nil != err {
+ log.Println("Invalid data.")
+ w.WriteHeader(http.StatusBadRequest)
+ return
+ }
+ if string(body) != id { // Mismatched IDs!
+ w.WriteHeader(http.StatusBadRequest)
+ }
+ // Maybe confirm that X-Session-ID is the same.
+ log.Println("Received snowflake: ", id)
+ snowflake := AddSnowflake(id)
+
+ // Wait for a client to avail an offer to the snowflake, or timeout
+ // and ask the snowflake to poll later.
+ select {
+ case offer := <-snowflake.offerChannel:
+ log.Println("Passing client offer to snowflake.")
+ w.Write(offer)
+
+ case <-time.After(time.Second * 10):
+ // This snowflake is no longer available to serve clients.
+ heap.Remove(snowflakes, snowflake.index)
+ delete(snowflakeMap, snowflake.id)
+ w.WriteHeader(http.StatusGatewayTimeout)
+ }
+}
+
+/*
+Expects snowflake proxes which have previously successfully received
+an offer from proxyHandler to respond with an answer in an HTTP POST,
+which the broker will pass back to the original client.
+*/
+func answerHandler(w http.ResponseWriter, r *http.Request) {
+ w.Header().Set("Access-Control-Allow-Origin", "*")
+ w.Header().Set("Access-Control-Allow-Headers", "X-Session-ID")
+ // For CORS preflight.
+ if "OPTIONS" == r.Method {
+ return
+ }
+
+ id := r.Header.Get("X-Session-ID")
+ snowflake, ok := snowflakeMap[id]
+ if !ok || nil == snowflake {
+ // The snowflake took too long to respond with an answer,
+ // and the designated client is no longer around / recognized by the Broker.
+ w.WriteHeader(http.StatusGone)
+ return
+ }
+ body, err := ioutil.ReadAll(r.Body)
+ if nil != err {
+ log.Println("Invalid data.")
+ w.WriteHeader(http.StatusBadRequest)
+ return
+ }
+ log.Println("Received answer: ", body)
+ snowflake.answerChannel <- body
+}
+
+func init() {
+ snowflakes = new(SnowflakeHeap)
+ snowflakeMap = make(map[string]*Snowflake)
+ heap.Init(snowflakes)
+
+ http.HandleFunc("/robots.txt", robotsTxtHandler)
+ http.HandleFunc("/ip", ipHandler)
+
+ http.HandleFunc("/client", clientHandler)
+ http.HandleFunc("/proxy", proxyHandler)
+ http.HandleFunc("/answer", answerHandler)
+}
diff --git a/broker/snowflake-broker.go b/broker/snowflake-broker.go
deleted file mode 100644
index a6daa87..0000000
--- a/broker/snowflake-broker.go
+++ /dev/null
@@ -1,223 +0,0 @@
-/*
-Broker acts as the HTTP signaling channel.
-It matches clients and snowflake proxies by passing corresponding
-SessionDescriptions in order to negotiate a WebRTC connection.
-
-TODO(serene): This code is currently the absolute minimum required to
-cause a successful negotiation.
-It's otherwise very unsafe and problematic, and needs quite some work...
-*/
-package snowflake_broker
-
-import (
- "container/heap"
- // "fmt"
- "io/ioutil"
- "log"
- "net"
- "net/http"
- "time"
-)
-
-// This is an intermediate step - a basic hardcoded appengine rendezvous
-// to a single browser snowflake.
-
-// This is minimum viable client-proxy registration.
-// TODO: better, more secure registration corresponding to what's in
-// the python flashproxy facilitator.
-
-// Slice of available snowflake proxies.
-// var snowflakes []chan []byte
-
-type Snowflake struct {
- id string
- offerChannel chan []byte
- answerChannel chan []byte
- clients int
- index int
-}
-
-// Implements heap.Interface, and holds Snowflakes.
-type SnowflakeHeap []*Snowflake
-
-func (sh SnowflakeHeap) Len() int { return len(sh) }
-
-func (sh SnowflakeHeap) Less(i, j int) bool {
- // Snowflakes serving less clients should sort earlier.
- return sh[i].clients < sh[j].clients
-}
-
-func (sh SnowflakeHeap) Swap(i, j int) {
- sh[i], sh[j] = sh[j], sh[i]
- sh[i].index = i
- sh[j].index = j
-}
-
-func (sh *SnowflakeHeap) Push(s interface{}) {
- n := len(*sh)
- snowflake := s.(*Snowflake)
- snowflake.index = n
- *sh = append(*sh, snowflake)
-}
-
-// Only valid when Len() > 0.
-func (sh *SnowflakeHeap) Pop() interface{} {
- flakes := *sh
- n := len(flakes)
- snowflake := flakes[n-1]
- snowflake.index = -1
- *sh = flakes[0 : n-1]
- return snowflake
-}
-
-var snowflakes *SnowflakeHeap
-var snowflakeMap map[string]*Snowflake
-
-// Create and add a Snowflake to the heap.
-func AddSnowflake(id string) *Snowflake {
- snowflake := new(Snowflake)
- snowflake.id = id
- snowflake.clients = 0
- snowflake.offerChannel = make(chan []byte)
- snowflake.answerChannel = make(chan []byte)
- heap.Push(snowflakes, snowflake)
- snowflakeMap[id] = snowflake
- return snowflake
-}
-
-func robotsTxtHandler(w http.ResponseWriter, r *http.Request) {
- w.Header().Set("Content-Type", "text/plain; charset=utf-8")
- w.Write([]byte("User-agent: *\nDisallow:\n"))
-}
-
-func ipHandler(w http.ResponseWriter, r *http.Request) {
- remoteAddr := r.RemoteAddr
- if net.ParseIP(remoteAddr).To4() == nil {
- remoteAddr = "[" + remoteAddr + "]"
- }
- w.Header().Set("Content-Type", "text/plain; charset=utf-8")
- w.Write([]byte(remoteAddr))
-}
-
-/*
-Expects a WebRTC SDP offer in the Request to give to an assigned
-snowflake proxy, which responds with the SDP answer to be sent in
-the HTTP response back to the client.
-*/
-func clientHandler(w http.ResponseWriter, r *http.Request) {
- offer, err := ioutil.ReadAll(r.Body)
- if nil != err {
- log.Println("Invalid data.")
- w.WriteHeader(http.StatusBadRequest)
- return
- }
- w.Header().Set("Access-Control-Allow-Origin", "*")
- w.Header().Set("Access-Control-Allow-Headers", "X-Session-ID")
-
- // Find the most available snowflake proxy, and pass the offer to it.
- // TODO: Needs improvement.
- snowflake := heap.Pop(snowflakes).(*Snowflake)
- if nil == snowflake {
- w.WriteHeader(http.StatusServiceUnavailable)
- // w.Write([]byte("no snowflake proxies available"))
- return
- }
- snowflake.offerChannel <- offer
-
- // Wait for the answer to be returned on the channel.
- select {
- case answer := <-snowflake.answerChannel:
- log.Println("Retrieving answer")
- w.Write(answer)
- // Only remove from the snowflake map once the answer is set.
- delete(snowflakeMap, snowflake.id)
-
- case <-time.After(time.Second * 10):
- w.WriteHeader(http.StatusGatewayTimeout)
- w.Write([]byte("timed out waiting for answer!"))
- }
-}
-
-/*
-For snowflake proxies to request a client from the Broker.
-*/
-func proxyHandler(w http.ResponseWriter, r *http.Request) {
- w.Header().Set("Access-Control-Allow-Origin", "*")
- w.Header().Set("Access-Control-Allow-Headers", "Origin, X-Session-ID")
- // For CORS preflight.
- if "OPTIONS" == r.Method {
- return
- }
-
- id := r.Header.Get("X-Session-ID")
- body, err := ioutil.ReadAll(r.Body)
- if nil != err {
- log.Println("Invalid data.")
- w.WriteHeader(http.StatusBadRequest)
- return
- }
- if string(body) != id { // Mismatched IDs!
- w.WriteHeader(http.StatusBadRequest)
- }
- // Maybe confirm that X-Session-ID is the same.
- log.Println("Received snowflake: ", id)
- snowflake := AddSnowflake(id)
-
- // Wait for a client to avail an offer to the snowflake, or timeout
- // and ask the snowflake to poll later.
- select {
- case offer := <-snowflake.offerChannel:
- log.Println("Passing client offer to snowflake.")
- w.Write(offer)
-
- case <-time.After(time.Second * 10):
- // This snowflake is no longer available to serve clients.
- heap.Remove(snowflakes, snowflake.index)
- delete(snowflakeMap, snowflake.id)
- w.WriteHeader(http.StatusGatewayTimeout)
- }
-}
-
-/*
-Expects snowflake proxes which have previously successfully received
-an offer from proxyHandler to respond with an answer in an HTTP POST,
-which the broker will pass back to the original client.
-*/
-func answerHandler(w http.ResponseWriter, r *http.Request) {
- w.Header().Set("Access-Control-Allow-Origin", "*")
- w.Header().Set("Access-Control-Allow-Headers", "X-Session-ID")
- // For CORS preflight.
- if "OPTIONS" == r.Method {
- return
- }
-
- id := r.Header.Get("X-Session-ID")
- snowflake, ok := snowflakeMap[id]
- if !ok || nil == snowflake {
- // The snowflake took too long to respond with an answer,
- // and the designated client is no longer around / recognized by the Broker.
- w.WriteHeader(http.StatusGone)
- return
- }
- body, err := ioutil.ReadAll(r.Body)
- if nil != err {
- log.Println("Invalid data.")
- w.WriteHeader(http.StatusBadRequest)
- return
- }
- log.Println("Received answer: ", body)
- snowflake.answerChannel <- body
-}
-
-func init() {
- snowflakes = new(SnowflakeHeap)
- snowflakeMap = make(map[string]*Snowflake)
- heap.Init(snowflakes)
-
- http.HandleFunc("/robots.txt", robotsTxtHandler)
- http.HandleFunc("/ip", ipHandler)
-
- http.HandleFunc("/client", clientHandler)
- http.HandleFunc("/proxy", proxyHandler)
- http.HandleFunc("/answer", answerHandler)
-}
diff --git a/broker/snowflake-heap.go b/broker/snowflake-heap.go
new file mode 100644
index 0000000..d37228f
--- /dev/null
+++ b/broker/snowflake-heap.go
@@ -0,0 +1,46 @@
+/*
+Keeping track of pending available snowflake proxies.
+*/
+
+package snowflake_broker
+
+type Snowflake struct {
+ id string
+ offerChannel chan []byte
+ answerChannel chan []byte
+ clients int
+ index int
+}
+
+// Implements heap.Interface, and holds Snowflakes.
+type SnowflakeHeap []*Snowflake
+
+func (sh SnowflakeHeap) Len() int { return len(sh) }
+
+func (sh SnowflakeHeap) Less(i, j int) bool {
+ // Snowflakes serving less clients should sort earlier.
+ return sh[i].clients < sh[j].clients
+}
+
+func (sh SnowflakeHeap) Swap(i, j int) {
+ sh[i], sh[j] = sh[j], sh[i]
+ sh[i].index = i
+ sh[j].index = j
+}
+
+func (sh *SnowflakeHeap) Push(s interface{}) {
+ n := len(*sh)
+ snowflake := s.(*Snowflake)
+ snowflake.index = n
+ *sh = append(*sh, snowflake)
+}
+
+// Only valid when Len() > 0.
+func (sh *SnowflakeHeap) Pop() interface{} {
+ flakes := *sh
+ n := len(flakes)
+ snowflake := flakes[n-1]
+ snowflake.index = -1
+ *sh = flakes[0 : n-1]
+ return snowflake
+}