commit e833119befa052e4837fe147f8bc2766a4ca7c54 Author: David Fifield david@bamsoftware.com Date: Sun Jul 18 23:37:41 2021 -0600
Broker /amp/client route (AMP cache client registration). --- broker/amp.go | 76 +++++++++++++++++++++++++++++++++++++++ broker/broker.go | 2 ++ broker/snowflake-broker_test.go | 78 +++++++++++++++++++++++++++++++++++++++-- 3 files changed, 154 insertions(+), 2 deletions(-)
diff --git a/broker/amp.go b/broker/amp.go new file mode 100644 index 0000000..8641e51 --- /dev/null +++ b/broker/amp.go @@ -0,0 +1,76 @@ +package main + +import ( + "log" + "net/http" + "strings" + + "git.torproject.org/pluggable-transports/snowflake.git/common/amp" + "git.torproject.org/pluggable-transports/snowflake.git/common/messages" +) + +// ampClientOffers is the AMP-speaking endpoint for client poll messages, +// intended for access via an AMP cache. In contrast to the other clientOffers, +// the client's encoded poll message is stored in the URL path rather than the +// HTTP request body (because an AMP cache does not support POST), and the +// encoded client poll response is sent back as AMP-armored HTML. +func ampClientOffers(i *IPC, w http.ResponseWriter, r *http.Request) { + // The encoded client poll message immediately follows the /amp/client/ + // path prefix, so this function unfortunately needs to be aware of and + // remove its own routing prefix. + path := strings.TrimPrefix(r.URL.Path, "/amp/client/") + if path == r.URL.Path { + // The path didn't start with the expected prefix. This probably + // indicates an internal bug. + log.Println("ampClientOffers: unexpected prefix in path") + w.WriteHeader(http.StatusInternalServerError) + return + } + + var encPollReq []byte + var response []byte + var err error + + encPollReq, err = amp.DecodePath(path) + if err == nil { + arg := messages.Arg{ + Body: encPollReq, + RemoteAddr: "", + } + err = i.ClientOffers(arg, &response) + } else { + response, err = (&messages.ClientPollResponse{ + Error: "cannot decode URL path", + }).EncodePollResponse() + } + + if err != nil { + // We couldn't even construct a JSON object containing an error + // message :( Nothing to do but signal an error at the HTTP + // layer. The AMP cache will translate this 500 status into a + // 404 status. + // https://amp.dev/documentation/guides-and-tutorials/learn/amp-caches-and-cors... 
+ log.Printf("ampClientOffers: %v", err) + w.WriteHeader(http.StatusInternalServerError) + return + } + + w.Header().Set("Content-Type", "text/html") + // Attempt to hint to an AMP cache not to waste resources caching this + // document. "The Google AMP Cache considers any document fresh for at + // least 15 seconds." + // https://developers.google.com/amp/cache/overview#google-amp-cache-updates + w.Header().Set("Cache-Control", "max-age=15") + w.WriteHeader(http.StatusOK) + + enc, err := amp.NewArmorEncoder(w) + if err != nil { + log.Printf("amp.NewArmorEncoder: %v", err) + return + } + defer enc.Close() + + if _, err := enc.Write(response); err != nil { + log.Printf("ampClientOffers: unable to write answer: %v", err) + } +} diff --git a/broker/broker.go b/broker/broker.go index 437a4d1..6c855f3 100644 --- a/broker/broker.go +++ b/broker/broker.go @@ -218,6 +218,8 @@ func main() { http.Handle("/metrics", MetricsHandler{metricsFilename, metricsHandler}) http.Handle("/prometheus", promhttp.HandlerFor(ctx.metrics.promMetrics.registry, promhttp.HandlerOpts{}))
+ http.Handle("/amp/client/", SnowflakeHandler{i, ampClientOffers}) + server := http.Server{ Addr: addr, } diff --git a/broker/snowflake-broker_test.go b/broker/snowflake-broker_test.go index 9e1c9f1..233cfea 100644 --- a/broker/snowflake-broker_test.go +++ b/broker/snowflake-broker_test.go @@ -3,6 +3,7 @@ package main import ( "bytes" "container/heap" + "io" "io/ioutil" "log" "net" @@ -13,6 +14,7 @@ import ( "testing" "time"
+ "git.torproject.org/pluggable-transports/snowflake.git/common/amp" . "github.com/smartystreets/goconvey/convey" )
@@ -24,6 +26,15 @@ func NullLogger() *log.Logger {
var promOnce sync.Once
+func decodeAMPArmorToString(r io.Reader) (string, error) { + dec, err := amp.NewArmorDecoder(r) + if err != nil { + return "", err + } + p, err := ioutil.ReadAll(dec) + return string(p), err +} + func TestBroker(t *testing.T) {
Convey("Context", t, func() { @@ -69,7 +80,7 @@ func TestBroker(t *testing.T) { So(offer.sdp, ShouldResemble, []byte("test offer")) })
- Convey("Responds to client offers...", func() { + Convey("Responds to HTTP client offers...", func() { w := httptest.NewRecorder() data := bytes.NewReader( []byte("1.0\n{\"offer\": \"fake\", \"nat\": \"unknown\"}")) @@ -117,7 +128,7 @@ func TestBroker(t *testing.T) { }) })
- Convey("Responds to legacy client offers...", func() { + Convey("Responds to HTTP legacy client offers...", func() { w := httptest.NewRecorder() data := bytes.NewReader([]byte("{test}")) r, err := http.NewRequest("POST", "snowflake.broker/client", data) @@ -165,6 +176,69 @@ func TestBroker(t *testing.T) {
})
+ Convey("Responds to AMP client offers...", func() { + w := httptest.NewRecorder() + encPollReq := []byte("1.0\n{\"offer\": \"fake\", \"nat\": \"unknown\"}") + r, err := http.NewRequest("GET", "/amp/client/"+amp.EncodePath(encPollReq), nil) + So(err, ShouldBeNil) + + Convey("with status 200 when request is badly formatted.", func() { + r, err := http.NewRequest("GET", "/amp/client/bad", nil) + So(err, ShouldBeNil) + ampClientOffers(i, w, r) + body, err := decodeAMPArmorToString(w.Body) + So(err, ShouldBeNil) + So(body, ShouldEqual, `{"error":"cannot decode URL path"}`) + }) + + Convey("with error when no snowflakes are available.", func() { + ampClientOffers(i, w, r) + So(w.Code, ShouldEqual, http.StatusOK) + body, err := decodeAMPArmorToString(w.Body) + So(err, ShouldBeNil) + So(body, ShouldEqual, `{"error":"no snowflake proxies currently available"}`) + }) + + Convey("with a proxy answer if available.", func() { + done := make(chan bool) + // Prepare a fake proxy to respond with. + snowflake := ctx.AddSnowflake("fake", "", NATUnrestricted, 0) + go func() { + ampClientOffers(i, w, r) + done <- true + }() + offer := <-snowflake.offerChannel + So(offer.sdp, ShouldResemble, []byte("fake")) + snowflake.answerChannel <- "fake answer" + <-done + body, err := decodeAMPArmorToString(w.Body) + So(err, ShouldBeNil) + So(body, ShouldEqual, `{"answer":"fake answer"}`) + So(w.Code, ShouldEqual, http.StatusOK) + }) + + Convey("Times out when no proxy responds.", func() { + if testing.Short() { + return + } + done := make(chan bool) + snowflake := ctx.AddSnowflake("fake", "", NATUnrestricted, 0) + go func() { + ampClientOffers(i, w, r) + // Takes a few seconds here... 
+ done <- true + }() + offer := <-snowflake.offerChannel + So(offer.sdp, ShouldResemble, []byte("fake")) + <-done + So(w.Code, ShouldEqual, http.StatusOK) + body, err := decodeAMPArmorToString(w.Body) + So(err, ShouldBeNil) + So(body, ShouldEqual, `{"error":"timed out waiting for answer!"}`) + }) + + }) + Convey("Responds to proxy polls...", func() { done := make(chan bool) w := httptest.NewRecorder()