commit 270eb218037ca78c5a09d8e8cae9187a22cee122 Author: Cecylia Bocovich cohosh@torproject.org Date: Wed May 5 15:31:39 2021 -0400
Encode client-broker messages as json in HTTP body
Send the client poll request and response in a json-encoded format in the HTTP request body rather than sending the data in HTTP headers. This will pave the way for using domain-fronting alternatives for the Snowflake rendezvous. --- broker/broker.go | 122 ++++++++++++++++--- broker/snowflake-broker_test.go | 130 ++++++++++++++++++--- broker/snowflake-heap.go | 2 +- client/lib/lib_test.go | 22 +--- client/lib/rendezvous.go | 36 ++++-- common/messages/client.go | 107 +++++++++++++++++ .../messages/{proxy_test.go => messages_test.go} | 116 ++++++++++++++++++ 7 files changed, 472 insertions(+), 63 deletions(-)
diff --git a/broker/broker.go b/broker/broker.go index 8c1159e..906c210 100644 --- a/broker/broker.go +++ b/broker/broker.go @@ -6,6 +6,7 @@ SessionDescriptions in order to negotiate a WebRTC connection. package main
import ( + "bytes" "container/heap" "crypto/tls" "flag" @@ -39,6 +40,16 @@ const ( NATUnrestricted = "unrestricted" )
+// We support two client message formats. The legacy format is for backwards +// compatibility and relies heavily on HTTP headers and status codes to convey +// information. +type clientVersion int + +const ( + v0 clientVersion = iota //legacy version + v1 +) + type BrokerContext struct { snowflakes *SnowflakeHeap restrictedSnowflakes *SnowflakeHeap @@ -90,7 +101,7 @@ type MetricsHandler struct {
func (sh SnowflakeHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { w.Header().Set("Access-Control-Allow-Origin", "*") - w.Header().Set("Access-Control-Allow-Headers", "Origin, X-Session-ID, Snowflake-NAT-Type") + w.Header().Set("Access-Control-Allow-Headers", "Origin, X-Session-ID") // Return early if it's CORS preflight. if "OPTIONS" == r.Method { return @@ -170,7 +181,7 @@ func (ctx *BrokerContext) AddSnowflake(id string, proxyType string, natType stri snowflake.proxyType = proxyType snowflake.natType = natType snowflake.offerChannel = make(chan *ClientOffer) - snowflake.answerChannel = make(chan []byte) + snowflake.answerChannel = make(chan string) ctx.snowflakeLock.Lock() if natType == NATUnrestricted { heap.Push(ctx.snowflakes, snowflake) @@ -245,6 +256,20 @@ type ClientOffer struct { sdp []byte }
+// Sends an encoded response to the client and an +// HTTP server error if the response encoding fails +func sendClientResponse(resp *messages.ClientPollResponse, w http.ResponseWriter) { + data, err := resp.EncodePollResponse() + if err != nil { + log.Printf("error encoding answer") + w.WriteHeader(http.StatusInternalServerError) + } else { + if _, err := w.Write([]byte(data)); err != nil { + log.Printf("unable to write answer with error: %v", err) + } + } +} + /* Expects a WebRTC SDP offer in the Request to give to an assigned snowflake proxy, which responds with the SDP answer to be sent in @@ -252,19 +277,55 @@ the HTTP response back to the client. */ func clientOffers(ctx *BrokerContext, w http.ResponseWriter, r *http.Request) { var err error + var version clientVersion
startTime := time.Now() - offer := &ClientOffer{} - offer.sdp, err = ioutil.ReadAll(http.MaxBytesReader(w, r.Body, readLimit)) - if nil != err { - log.Println("Invalid data.") - w.WriteHeader(http.StatusBadRequest) + body, err := ioutil.ReadAll(http.MaxBytesReader(w, r.Body, readLimit)) + if err != nil { + log.Printf("Error reading client request: %s", err.Error()) + w.WriteHeader(http.StatusInternalServerError) return } + if len(body) > 0 && body[0] == '{' { + version = v0 + } else { + parts := bytes.SplitN(body, []byte("\n"), 2) + if len(parts) < 2 { + // no version number found + err := fmt.Errorf("unsupported message version") + sendClientResponse(&messages.ClientPollResponse{Error: err.Error()}, w) + return + } + body = parts[1] + if string(parts[0]) == "1.0" { + version = v1 + + } else { + err := fmt.Errorf("unsupported message version") + sendClientResponse(&messages.ClientPollResponse{Error: err.Error()}, w) + return + } + }
- offer.natType = r.Header.Get("Snowflake-NAT-Type") - if offer.natType == "" { - offer.natType = NATUnknown + var offer *ClientOffer + switch version { + case v0: + offer = &ClientOffer{ + natType: r.Header.Get("Snowflake-NAT-Type"), + sdp: body, + } + case v1: + req, err := messages.DecodeClientPollRequest(body) + if err != nil { + sendClientResponse(&messages.ClientPollResponse{Error: err.Error()}, w) + return + } + offer = &ClientOffer{ + natType: req.NAT, + sdp: []byte(req.Offer), + } + default: + panic("unknown version") }
// Only hand out known restricted snowflakes to unrestricted clients @@ -289,7 +350,15 @@ func clientOffers(ctx *BrokerContext, w http.ResponseWriter, r *http.Request) { ctx.metrics.clientRestrictedDeniedCount++ } ctx.metrics.lock.Unlock() - w.WriteHeader(http.StatusServiceUnavailable) + switch version { + case v0: + w.WriteHeader(http.StatusServiceUnavailable) + case v1: + resp := &messages.ClientPollResponse{Error: "no snowflake proxies currently available"} + sendClientResponse(resp, w) + default: + panic("unknown version") + } return } // Otherwise, find the most available snowflake proxy, and pass the offer to it. @@ -306,17 +375,36 @@ func clientOffers(ctx *BrokerContext, w http.ResponseWriter, r *http.Request) { ctx.metrics.clientProxyMatchCount++ ctx.metrics.promMetrics.ClientPollTotal.With(prometheus.Labels{"nat": offer.natType, "status": "matched"}).Inc() ctx.metrics.lock.Unlock() - if _, err := w.Write(answer); err != nil { - log.Printf("unable to write answer with error: %v", err) + switch version { + case v0: + if _, err := w.Write([]byte(answer)); err != nil { + log.Printf("unable to write answer with error: %v", err) + } + case v1: + resp := &messages.ClientPollResponse{Answer: answer} + sendClientResponse(resp, w) + default: + panic("unknown version") } // Initial tracking of elapsed time. 
ctx.metrics.clientRoundtripEstimate = time.Since(startTime) / time.Millisecond case <-time.After(time.Second * ClientTimeout): log.Println("Client: Timed out.") - w.WriteHeader(http.StatusGatewayTimeout) - if _, err := w.Write([]byte("timed out waiting for answer!")); err != nil { - log.Printf("unable to write timeout error, failed with error: %v", err) + switch version { + case v0: + w.WriteHeader(http.StatusGatewayTimeout) + if _, err := w.Write( + []byte("timed out waiting for answer!")); err != nil { + log.Printf("unable to write timeout error, failed with error: %v", + err) + } + case v1: + resp := &messages.ClientPollResponse{ + Error: "timed out waiting for answer!"} + sendClientResponse(resp, w) + default: + panic("unknown version") } }
@@ -364,7 +452,7 @@ func proxyAnswers(ctx *BrokerContext, w http.ResponseWriter, r *http.Request) { w.Write(b)
if success { - snowflake.answerChannel <- []byte(answer) + snowflake.answerChannel <- answer }
} diff --git a/broker/snowflake-broker_test.go b/broker/snowflake-broker_test.go index b676b04..646fb02 100644 --- a/broker/snowflake-broker_test.go +++ b/broker/snowflake-broker_test.go @@ -70,10 +70,59 @@ func TestBroker(t *testing.T) {
Convey("Responds to client offers...", func() { w := httptest.NewRecorder() - data := bytes.NewReader([]byte("test")) + data := bytes.NewReader( + []byte("1.0\n{"offer": "fake", "nat": "unknown"}")) r, err := http.NewRequest("POST", "snowflake.broker/client", data) So(err, ShouldBeNil)
+ Convey("with error when no snowflakes are available.", func() { + clientOffers(ctx, w, r) + So(w.Code, ShouldEqual, http.StatusOK) + So(w.Body.String(), ShouldEqual, `{"error":"no snowflake proxies currently available"}`) + }) + + Convey("with a proxy answer if available.", func() { + done := make(chan bool) + // Prepare a fake proxy to respond with. + snowflake := ctx.AddSnowflake("fake", "", NATUnrestricted) + go func() { + clientOffers(ctx, w, r) + done <- true + }() + offer := <-snowflake.offerChannel + So(offer.sdp, ShouldResemble, []byte("fake")) + snowflake.answerChannel <- "fake answer" + <-done + So(w.Body.String(), ShouldEqual, `{"answer":"fake answer"}`) + So(w.Code, ShouldEqual, http.StatusOK) + }) + + Convey("Times out when no proxy responds.", func() { + if testing.Short() { + return + } + done := make(chan bool) + snowflake := ctx.AddSnowflake("fake", "", NATUnrestricted) + go func() { + clientOffers(ctx, w, r) + // Takes a few seconds here... + done <- true + }() + offer := <-snowflake.offerChannel + So(offer.sdp, ShouldResemble, []byte("fake")) + <-done + So(w.Code, ShouldEqual, http.StatusOK) + So(w.Body.String(), ShouldEqual, `{"error":"timed out waiting for answer!"}`) + }) + }) + + Convey("Responds to legacy client offers...", func() { + w := httptest.NewRecorder() + data := bytes.NewReader([]byte("{test}")) + r, err := http.NewRequest("POST", "snowflake.broker/client", data) + So(err, ShouldBeNil) + r.Header.Set("Snowflake-NAT-TYPE", "restricted") + Convey("with 503 when no snowflakes are available.", func() { clientOffers(ctx, w, r) So(w.Code, ShouldEqual, http.StatusServiceUnavailable) @@ -89,8 +138,8 @@ func TestBroker(t *testing.T) { done <- true }() offer := <-snowflake.offerChannel - So(offer.sdp, ShouldResemble, []byte("test")) - snowflake.answerChannel <- []byte("fake answer") + So(offer.sdp, ShouldResemble, []byte("{test}")) + snowflake.answerChannel <- "fake answer" <-done So(w.Body.String(), ShouldEqual, "fake answer") So(w.Code, 
ShouldEqual, http.StatusOK) @@ -108,10 +157,11 @@ func TestBroker(t *testing.T) { done <- true }() offer := <-snowflake.offerChannel - So(offer.sdp, ShouldResemble, []byte("test")) + So(offer.sdp, ShouldResemble, []byte("{test}")) <-done So(w.Code, ShouldEqual, http.StatusGatewayTimeout) }) + })
Convey("Responds to proxy polls...", func() { @@ -163,7 +213,7 @@ func TestBroker(t *testing.T) { }(ctx) answer := <-s.answerChannel So(w.Code, ShouldEqual, http.StatusOK) - So(answer, ShouldResemble, []byte("test")) + So(answer, ShouldResemble, "test") })
Convey("with client gone status if the proxy is not recognized", func() { @@ -272,7 +322,8 @@ func TestBroker(t *testing.T) { So(ctx.idToSnowflake["ymbcCMto7KHNGYlp"], ShouldNotBeNil)
// Client request blocks until proxy answer arrives. - dataC := bytes.NewReader([]byte("fake offer")) + dataC := bytes.NewReader( + []byte("1.0\n{"offer": "fake", "nat": "unknown"}")) wC := httptest.NewRecorder() rC, err := http.NewRequest("POST", "snowflake.broker/client", dataC) So(err, ShouldBeNil) @@ -283,7 +334,7 @@ func TestBroker(t *testing.T) {
<-polled So(wP.Code, ShouldEqual, http.StatusOK) - So(wP.Body.String(), ShouldResemble, `{"Status":"client match","Offer":"fake offer","NAT":"unknown"}`) + So(wP.Body.String(), ShouldResemble, `{"Status":"client match","Offer":"fake","NAT":"unknown"}`) So(ctx.idToSnowflake["ymbcCMto7KHNGYlp"], ShouldNotBeNil) // Follow up with the answer request afterwards wA := httptest.NewRecorder() @@ -295,7 +346,7 @@ func TestBroker(t *testing.T) {
<-done So(wC.Code, ShouldEqual, http.StatusOK) - So(wC.Body.String(), ShouldEqual, "test") + So(wC.Body.String(), ShouldEqual, `{"answer":"test"}`) }) }) } @@ -517,7 +568,8 @@ func TestMetrics(t *testing.T) { //Test addition of client failures Convey("for no proxies available", func() { w := httptest.NewRecorder() - data := bytes.NewReader([]byte("test")) + data := bytes.NewReader( + []byte("1.0\n{"offer": "fake", "nat": "unknown"}")) r, err := http.NewRequest("POST", "snowflake.broker/client", data) So(err, ShouldBeNil)
@@ -535,7 +587,8 @@ func TestMetrics(t *testing.T) { //Test addition of client matches Convey("for client-proxy match", func() { w := httptest.NewRecorder() - data := bytes.NewReader([]byte("test")) + data := bytes.NewReader( + []byte("1.0\n{"offer": "fake", "nat": "unknown"}")) r, err := http.NewRequest("POST", "snowflake.broker/client", data) So(err, ShouldBeNil)
@@ -546,8 +599,8 @@ func TestMetrics(t *testing.T) { done <- true }() offer := <-snowflake.offerChannel - So(offer.sdp, ShouldResemble, []byte("test")) - snowflake.answerChannel <- []byte("fake answer") + So(offer.sdp, ShouldResemble, []byte("fake")) + snowflake.answerChannel <- "fake answer" <-done
ctx.metrics.printMetrics() @@ -556,22 +609,63 @@ func TestMetrics(t *testing.T) { //Test rounding boundary Convey("binning boundary", func() { w := httptest.NewRecorder() - data := bytes.NewReader([]byte("test")) + data := bytes.NewReader( + []byte("1.0\n{"offer": "fake", "nat": "restricted"}")) r, err := http.NewRequest("POST", "snowflake.broker/client", data) So(err, ShouldBeNil)
clientOffers(ctx, w, r) + w = httptest.NewRecorder() + data = bytes.NewReader( + []byte("1.0\n{"offer": "fake", "nat": "restricted"}")) + r, err = http.NewRequest("POST", "snowflake.broker/client", data) + So(err, ShouldBeNil) clientOffers(ctx, w, r) + w = httptest.NewRecorder() + data = bytes.NewReader( + []byte("1.0\n{"offer": "fake", "nat": "restricted"}")) + r, err = http.NewRequest("POST", "snowflake.broker/client", data) + So(err, ShouldBeNil) clientOffers(ctx, w, r) + w = httptest.NewRecorder() + data = bytes.NewReader( + []byte("1.0\n{"offer": "fake", "nat": "restricted"}")) + r, err = http.NewRequest("POST", "snowflake.broker/client", data) + So(err, ShouldBeNil) clientOffers(ctx, w, r) + w = httptest.NewRecorder() + data = bytes.NewReader( + []byte("1.0\n{"offer": "fake", "nat": "restricted"}")) + r, err = http.NewRequest("POST", "snowflake.broker/client", data) + So(err, ShouldBeNil) clientOffers(ctx, w, r) + w = httptest.NewRecorder() + data = bytes.NewReader( + []byte("1.0\n{"offer": "fake", "nat": "restricted"}")) + r, err = http.NewRequest("POST", "snowflake.broker/client", data) + So(err, ShouldBeNil) clientOffers(ctx, w, r) + w = httptest.NewRecorder() + data = bytes.NewReader( + []byte("1.0\n{"offer": "fake", "nat": "restricted"}")) + r, err = http.NewRequest("POST", "snowflake.broker/client", data) + So(err, ShouldBeNil) clientOffers(ctx, w, r) + w = httptest.NewRecorder() + data = bytes.NewReader( + []byte("1.0\n{"offer": "fake", "nat": "restricted"}")) + r, err = http.NewRequest("POST", "snowflake.broker/client", data) + So(err, ShouldBeNil) clientOffers(ctx, w, r)
ctx.metrics.printMetrics() So(buf.String(), ShouldContainSubstring, "client-denied-count 8\nclient-restricted-denied-count 8\nclient-unrestricted-denied-count 0\n")
+ w = httptest.NewRecorder() + data = bytes.NewReader( + []byte("1.0\n{"offer": "fake", "nat": "restricted"}")) + r, err = http.NewRequest("POST", "snowflake.broker/client", data) + So(err, ShouldBeNil) clientOffers(ctx, w, r) buf.Reset() ctx.metrics.printMetrics() @@ -648,9 +742,9 @@ func TestMetrics(t *testing.T) { //Test client failures by NAT type Convey("client failures by NAT type", func() { w := httptest.NewRecorder() - data := bytes.NewReader([]byte("test")) + data := bytes.NewReader( + []byte("1.0\n{"offer": "fake", "nat": "restricted"}")) r, err := http.NewRequest("POST", "snowflake.broker/client", data) - r.Header.Set("Snowflake-NAT-TYPE", "restricted") So(err, ShouldBeNil)
clientOffers(ctx, w, r) @@ -661,8 +755,9 @@ func TestMetrics(t *testing.T) { buf.Reset() ctx.metrics.zeroMetrics()
+ data = bytes.NewReader( + []byte("1.0\n{"offer": "fake", "nat": "unrestricted"}")) r, err = http.NewRequest("POST", "snowflake.broker/client", data) - r.Header.Set("Snowflake-NAT-TYPE", "unrestricted") So(err, ShouldBeNil)
clientOffers(ctx, w, r) @@ -673,8 +768,9 @@ func TestMetrics(t *testing.T) { buf.Reset() ctx.metrics.zeroMetrics()
+ data = bytes.NewReader( + []byte("1.0\n{"offer": "fake", "nat": "unknown"}")) r, err = http.NewRequest("POST", "snowflake.broker/client", data) - r.Header.Set("Snowflake-NAT-TYPE", "unknown") So(err, ShouldBeNil)
clientOffers(ctx, w, r) diff --git a/broker/snowflake-heap.go b/broker/snowflake-heap.go index 16dd264..80c1f57 100644 --- a/broker/snowflake-heap.go +++ b/broker/snowflake-heap.go @@ -13,7 +13,7 @@ type Snowflake struct { proxyType string natType string offerChannel chan *ClientOffer - answerChannel chan []byte + answerChannel chan string clients int index int } diff --git a/client/lib/lib_test.go b/client/lib/lib_test.go index 6140e0b..e742e06 100644 --- a/client/lib/lib_test.go +++ b/client/lib/lib_test.go @@ -176,7 +176,7 @@ func TestSnowflakeClient(t *testing.T) { Convey("Rendezvous", t, func() { transport := &MockTransport{ http.StatusOK, - []byte(`{"type":"answer","sdp":"fake"}`), + []byte(`{"answer": "{"type":"answer","sdp":"fake"}" }`), } fakeOffer, err := util.DeserializeSessionDescription(`{"type":"offer","sdp":"test"}`) if err != nil { @@ -209,26 +209,25 @@ func TestSnowflakeClient(t *testing.T) { So(answer.SDP, ShouldResemble, "fake") })
- Convey("BrokerChannel.Negotiate fails with 503", func() { + Convey("BrokerChannel.Negotiate fails", func() { b, err := NewBrokerChannel("test.broker", "", - &MockTransport{http.StatusServiceUnavailable, []byte("\n")}, + &MockTransport{http.StatusOK, []byte(`{"error": "no snowflake proxies currently available"}`)}, false) So(err, ShouldBeNil) answer, err := b.Negotiate(fakeOffer) So(err, ShouldNotBeNil) So(answer, ShouldBeNil) - So(err.Error(), ShouldResemble, BrokerError503) })
- Convey("BrokerChannel.Negotiate fails with 400", func() { + Convey("BrokerChannel.Negotiate fails with unexpected error", func() { b, err := NewBrokerChannel("test.broker", "", - &MockTransport{http.StatusBadRequest, []byte("\n")}, + &MockTransport{http.StatusInternalServerError, []byte("\n")}, false) So(err, ShouldBeNil) answer, err := b.Negotiate(fakeOffer) So(err, ShouldNotBeNil) So(answer, ShouldBeNil) - So(err.Error(), ShouldResemble, BrokerError400) + So(err.Error(), ShouldResemble, BrokerErrorUnexpected) })
Convey("BrokerChannel.Negotiate fails with large read", func() { @@ -242,15 +241,6 @@ func TestSnowflakeClient(t *testing.T) { So(err.Error(), ShouldResemble, "unexpected EOF") })
- Convey("BrokerChannel.Negotiate fails with unexpected error", func() { - b, err := NewBrokerChannel("test.broker", "", - &MockTransport{123, []byte("")}, false) - So(err, ShouldBeNil) - answer, err := b.Negotiate(fakeOffer) - So(err, ShouldNotBeNil) - So(answer, ShouldBeNil) - So(err.Error(), ShouldResemble, BrokerErrorUnexpected) - }) })
} diff --git a/client/lib/rendezvous.go b/client/lib/rendezvous.go index 32da081..b89f432 100644 --- a/client/lib/rendezvous.go +++ b/client/lib/rendezvous.go @@ -19,14 +19,13 @@ import ( "sync" "time"
+ "git.torproject.org/pluggable-transports/snowflake.git/common/messages" "git.torproject.org/pluggable-transports/snowflake.git/common/nat" "git.torproject.org/pluggable-transports/snowflake.git/common/util" "github.com/pion/webrtc/v3" )
const ( - BrokerError503 string = "No snowflake proxies currently available." - BrokerError400 string = "You sent an invalid offer in the request." BrokerErrorUnexpected string = "Unexpected error, no answer." readLimit = 100000 //Maximum number of bytes to be read from an HTTP response ) @@ -107,7 +106,20 @@ func (bc *BrokerChannel) Negotiate(offer *webrtc.SessionDescription) ( if err != nil { return nil, err } - data := bytes.NewReader([]byte(offerSDP)) + + // Encode client poll request + bc.lock.Lock() + req := &messages.ClientPollRequest{ + Offer: offerSDP, + NAT: bc.NATType, + } + body, err := req.EncodePollRequest() + bc.lock.Unlock() + if err != nil { + return nil, err + } + + data := bytes.NewReader([]byte(body)) // Suffix with broker's client registration handler. clientURL := bc.url.ResolveReference(&url.URL{Path: "client"}) request, err := http.NewRequest("POST", clientURL.String(), data) @@ -117,10 +129,6 @@ func (bc *BrokerChannel) Negotiate(offer *webrtc.SessionDescription) ( if "" != bc.Host { // Set true host if necessary. 
request.Host = bc.Host } - // include NAT-TYPE - bc.lock.Lock() - request.Header.Set("Snowflake-NAT-TYPE", bc.NATType) - bc.lock.Unlock() resp, err := bc.transport.RoundTrip(request) if nil != err { return nil, err @@ -135,11 +143,15 @@ func (bc *BrokerChannel) Negotiate(offer *webrtc.SessionDescription) ( return nil, err } log.Printf("Received answer: %s", string(body)) - return util.DeserializeSessionDescription(string(body)) - case http.StatusServiceUnavailable: - return nil, errors.New(BrokerError503) - case http.StatusBadRequest: - return nil, errors.New(BrokerError400) + + resp, err := messages.DecodeClientPollResponse(body) + if err != nil { + return nil, err + } + if resp.Error != "" { + return nil, errors.New(resp.Error) + } + return util.DeserializeSessionDescription(resp.Answer) default: return nil, errors.New(BrokerErrorUnexpected) } diff --git a/common/messages/client.go b/common/messages/client.go new file mode 100644 index 0000000..1918e34 --- /dev/null +++ b/common/messages/client.go @@ -0,0 +1,107 @@ +//Package for communication with the snowflake broker + +//import "git.torproject.org/pluggable-transports/snowflake.git/common/messages" +package messages + +import ( + "encoding/json" + "fmt" +) + +const ClientVersion = "1.0" + +/* Client--Broker protocol v1.x specification: + +All messages contain the version number +followed by a new line and then the message body +<message> := <version>\n<body> +<version> := <digit>.<digit> +<body> := <poll request>|<poll response> + +There are two different types of body messages, +each encoded in JSON format + +== ClientPollRequest == +<poll request> := +{ + offer: <sdp offer> + [nat: (unknown|restricted|unrestricted)] +} + +The NAT field is optional, and if it is missing a +value of "unknown" will be assumed. 
+ +== ClientPollResponse == +<poll response> := +{ + [answer: <sdp answer>] + [error: <error string>] +} + +If the broker succeeded in matching the client with a proxy, +the answer field MUST contain a valid SDP answer, and the +error field MUST be empty. If the answer field is empty, the +error field MUST contain a string explaining the reason +for the error. + +*/ + +type ClientPollRequest struct { + Offer string `json:"offer"` + NAT string `json:"nat"` +} + +// Encodes a poll message from a snowflake client +func (req *ClientPollRequest) EncodePollRequest() ([]byte, error) { + body, err := json.Marshal(req) + if err != nil { + return nil, err + } + return append([]byte(ClientVersion+"\n"), body...), nil +} + +// Decodes a poll message from a snowflake client +func DecodeClientPollRequest(data []byte) (*ClientPollRequest, error) { + var message ClientPollRequest + + err := json.Unmarshal(data, &message) + if err != nil { + return nil, err + } + + if message.Offer == "" { + return nil, fmt.Errorf("no supplied offer") + } + + if message.NAT == "" { + message.NAT = "unknown" + } + + return &message, nil +} + +type ClientPollResponse struct { + Answer string `json:"answer,omitempty"` + Error string `json:"error,omitempty"` +} + +// Encodes a poll response for a snowflake client +func (resp *ClientPollResponse) EncodePollResponse() ([]byte, error) { + return json.Marshal(resp) +} + +// Decodes a poll response for a snowflake client +// If the Error field is empty, the Answer should be non-empty +func DecodeClientPollResponse(data []byte) (*ClientPollResponse, error) { + var message ClientPollResponse + + err := json.Unmarshal(data, &message) + if err != nil { + return nil, err + } + if message.Error == "" && message.Answer == "" { + return nil, fmt.Errorf("received empty broker response") + } + + return &message, nil +} diff --git a/common/messages/proxy_test.go b/common/messages/messages_test.go similarity index 71% rename from common/messages/proxy_test.go rename 
to common/messages/messages_test.go index f4191e1..3962d3b 100644 --- a/common/messages/proxy_test.go +++ b/common/messages/messages_test.go @@ -1,6 +1,7 @@ package messages
import ( + "bytes" "encoding/json" "fmt" "testing" @@ -252,3 +253,118 @@ func TestEncodeProxyAnswerResponse(t *testing.T) { So(err, ShouldEqual, nil) }) } + +func TestDecodeClientPollRequest(t *testing.T) { + Convey("Context", t, func() { + for _, test := range []struct { + natType string + offer string + data string + err error + }{ + { + //version 1.0 client message + "unknown", + "fake", + `{"nat":"unknown","offer":"fake"}`, + nil, + }, + { + //version 1.0 client message + "unknown", + "fake", + `{"offer":"fake"}`, + nil, + }, + { + //unknown version + "", + "", + `{"version":"2.0"}`, + fmt.Errorf(""), + }, + { + //no offer + "", + "", + `{"nat":"unknown"}`, + fmt.Errorf(""), + }, + } { + req, err := DecodeClientPollRequest([]byte(test.data)) + if test.err == nil { + So(req.NAT, ShouldResemble, test.natType) + So(req.Offer, ShouldResemble, test.offer) + } + So(err, ShouldHaveSameTypeAs, test.err) + } + + }) +} + +func TestEncodeClientPollRequests(t *testing.T) { + Convey("Context", t, func() { + req1 := &ClientPollRequest{ + NAT: "unknown", + Offer: "fake", + } + b, err := req1.EncodePollRequest() + So(err, ShouldEqual, nil) + fmt.Println(string(b)) + parts := bytes.SplitN(b, []byte("\n"), 2) + So(string(parts[0]), ShouldEqual, "1.0") + b = parts[1] + req2, err := DecodeClientPollRequest(b) + So(err, ShouldEqual, nil) + So(req2, ShouldResemble, req1) + }) +} + +func TestDecodeClientPollResponse(t *testing.T) { + Convey("Context", t, func() { + for _, test := range []struct { + answer string + msg string + data string + }{ + { + "fake answer", + "", + `{"answer":"fake answer"}`, + }, + { + "", + "no snowflakes", + `{"error":"no snowflakes"}`, + }, + } { + resp, err := DecodeClientPollResponse([]byte(test.data)) + So(err, ShouldBeNil) + So(resp.Answer, ShouldResemble, test.answer) + So(resp.Error, ShouldResemble, test.msg) + } + + }) +} + +func TestEncodeClientPollResponse(t *testing.T) { + Convey("Context", t, func() { + resp1 := &ClientPollResponse{ + Answer: 
"fake answer", + } + b, err := resp1.EncodePollResponse() + So(err, ShouldEqual, nil) + resp2, err := DecodeClientPollResponse(b) + So(err, ShouldEqual, nil) + So(resp1, ShouldResemble, resp2) + + resp1 = &ClientPollResponse{ + Error: "failed", + } + b, err = resp1.EncodePollResponse() + So(err, ShouldEqual, nil) + resp2, err = DecodeClientPollResponse(b) + So(err, ShouldEqual, nil) + So(resp1, ShouldResemble, resp2) + }) +}