commit 87ad06a5e2f1b0d72c64dd0ca17543f524ac1d63
Author: Arlo Breault <arlolra@gmail.com>
Date:   Thu Jun 3 17:04:58 2021 -0400

Get rid of legacy version

Move the logic for the legacy version into the http handlers and use a
shim when doing ipc.
---
 broker/http.go         | 51 ++++++++++++++++++++++++++++++++++++++------------
 broker/ipc.go          | 44 ++++++++++++-------------------------------
 common/messages/ipc.go |  7 ++-----
 3 files changed, 53 insertions(+), 49 deletions(-)

diff --git a/broker/http.go b/broker/http.go index 6555d7a..2c45b2b 100644 --- a/broker/http.go +++ b/broker/http.go @@ -102,7 +102,6 @@ func proxyPolls(i *IPC, w http.ResponseWriter, r *http.Request) { arg := messages.Arg{ Body: body, RemoteAddr: r.RemoteAddr, - NatType: "", }
var response []byte @@ -138,28 +137,57 @@ func clientOffers(i *IPC, w http.ResponseWriter, r *http.Request) { return }
+ // Handle the legacy version + isLegacy := false + if len(body) > 0 && body[0] == '{' { + isLegacy = true + req := messages.ClientPollRequest{ + Offer: string(body), + NAT: r.Header.Get("Snowflake-NAT-Type"), + } + body, err = req.EncodePollRequest() + if err != nil { + log.Printf("Error shimming the legacy request: %s", err.Error()) + w.WriteHeader(http.StatusInternalServerError) + return + } + } + arg := messages.Arg{ Body: body, RemoteAddr: "", - NatType: r.Header.Get("Snowflake-NAT-Type"), }
var response []byte err = i.ClientOffers(arg, &response) - switch { - case err == nil: - case errors.Is(err, messages.ErrUnavailable): - w.WriteHeader(http.StatusServiceUnavailable) - return - case errors.Is(err, messages.ErrTimeout): - w.WriteHeader(http.StatusGatewayTimeout) - return - default: + if err != nil { + // Assert err == messages.ErrInternal log.Println(err) w.WriteHeader(http.StatusInternalServerError) return }
+ if isLegacy { + resp, err := messages.DecodeClientPollResponse(response) + if err != nil { + log.Println(err) + w.WriteHeader(http.StatusInternalServerError) + return + } + switch resp.Error { + case "": + response = []byte(resp.Answer) + case "no snowflake proxies currently available": + w.WriteHeader(http.StatusServiceUnavailable) + return + case "timed out waiting for answer!": + w.WriteHeader(http.StatusGatewayTimeout) + return + default: + panic("unknown error") + } + } + if _, err := w.Write(response); err != nil { log.Printf("clientOffers unable to write answer with error: %v", err) } @@ -181,7 +209,6 @@ func proxyAnswers(i *IPC, w http.ResponseWriter, r *http.Request) { arg := messages.Arg{ Body: body, RemoteAddr: "", - NatType: "", }
var response []byte diff --git a/broker/ipc.go b/broker/ipc.go index 79ccf0f..a05f560 100644 --- a/broker/ipc.go +++ b/broker/ipc.go @@ -21,14 +21,10 @@ const ( NATUnrestricted = "unrestricted" )
-// We support two client message formats. The legacy format is for backwards -// combatability and relies heavily on HTTP headers and status codes to convey -// information. type clientVersion int
const ( - v0 clientVersion = iota //legacy version - v1 + v1 clientVersion = iota )
type IPC struct { @@ -141,32 +137,22 @@ func (i *IPC) ClientOffers(arg messages.Arg, response *[]byte) error { startTime := time.Now() body := arg.Body
- if len(body) > 0 && body[0] == '{' { - version = v0 + parts := bytes.SplitN(body, []byte("\n"), 2) + if len(parts) < 2 { + // no version number found + err := fmt.Errorf("unsupported message version") + return sendClientResponse(&messages.ClientPollResponse{Error: err.Error()}, response) + } + body = parts[1] + if string(parts[0]) == "1.0" { + version = v1 } else { - parts := bytes.SplitN(body, []byte("\n"), 2) - if len(parts) < 2 { - // no version number found - err := fmt.Errorf("unsupported message version") - return sendClientResponse(&messages.ClientPollResponse{Error: err.Error()}, response) - } - body = parts[1] - if string(parts[0]) == "1.0" { - version = v1 - - } else { - err := fmt.Errorf("unsupported message version") - return sendClientResponse(&messages.ClientPollResponse{Error: err.Error()}, response) - } + err := fmt.Errorf("unsupported message version") + return sendClientResponse(&messages.ClientPollResponse{Error: err.Error()}, response) }
var offer *ClientOffer switch version { - case v0: - offer = &ClientOffer{ - natType: arg.NatType, - sdp: body, - } case v1: req, err := messages.DecodeClientPollRequest(body) if err != nil { @@ -203,8 +189,6 @@ func (i *IPC) ClientOffers(arg messages.Arg, response *[]byte) error { } i.ctx.metrics.lock.Unlock() switch version { - case v0: - return messages.ErrUnavailable case v1: resp := &messages.ClientPollResponse{Error: "no snowflake proxies currently available"} return sendClientResponse(resp, response) @@ -230,8 +214,6 @@ func (i *IPC) ClientOffers(arg messages.Arg, response *[]byte) error { i.ctx.metrics.promMetrics.ClientPollTotal.With(prometheus.Labels{"nat": offer.natType, "status": "matched"}).Inc() i.ctx.metrics.lock.Unlock() switch version { - case v0: - *response = []byte(answer) case v1: resp := &messages.ClientPollResponse{Answer: answer} err = sendClientResponse(resp, response) @@ -243,8 +225,6 @@ func (i *IPC) ClientOffers(arg messages.Arg, response *[]byte) error { case <-time.After(time.Second * ClientTimeout): log.Println("Client: Timed out.") switch version { - case v0: - err = messages.ErrTimeout case v1: resp := &messages.ClientPollResponse{ Error: "timed out waiting for answer!"} diff --git a/common/messages/ipc.go b/common/messages/ipc.go index 3f89200..ee29a57 100644 --- a/common/messages/ipc.go +++ b/common/messages/ipc.go @@ -7,12 +7,9 @@ import ( type Arg struct { Body []byte RemoteAddr string - NatType string }
var ( - ErrBadRequest = errors.New("bad request") - ErrInternal = errors.New("internal error") - ErrUnavailable = errors.New("service unavailable") - ErrTimeout = errors.New("timeout") + ErrBadRequest = errors.New("bad request") + ErrInternal = errors.New("internal error") )