commit 0f34a7778fa1f4c28c7cc161991080d146689591 Author: David Fifield <david@bamsoftware.com> Date: Sun Jul 18 12:36:16 2021 -0600
Factor out httpRendezvous separate from BrokerChannel.
Makes BrokerChannel abstract over a rendezvousMethod. BrokerChannel itself is responsible for keepLocalAddresses and the NAT type state, as well as encoding and decoding client poll messages. rendezvousMethod is only responsible for delivery of encoded messages. --- client/lib/lib_test.go | 93 +-------------------------- client/lib/rendezvous.go | 99 ++++++++++------------------ client/lib/rendezvous_http.go | 77 ++++++++++++++++++++++ client/lib/rendezvous_test.go | 145 ++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 258 insertions(+), 156 deletions(-)
diff --git a/client/lib/lib_test.go b/client/lib/lib_test.go index 9087eed..86601b1 100644 --- a/client/lib/lib_test.go +++ b/client/lib/lib_test.go @@ -1,33 +1,14 @@ package lib
import ( - "bytes" "fmt" - "io/ioutil" "net" - "net/http" "testing" "time"
- "git.torproject.org/pluggable-transports/snowflake.git/common/util" . "github.com/smartystreets/goconvey/convey" )
-type MockTransport struct { - statusOverride int - body []byte -} - -// Just returns a response with fake SDP answer. -func (m *MockTransport) RoundTrip(req *http.Request) (*http.Response, error) { - s := ioutil.NopCloser(bytes.NewReader(m.body)) - r := &http.Response{ - StatusCode: m.statusOverride, - Body: s, - } - return r, nil -} - type FakeDialer struct { max int } @@ -172,11 +153,10 @@ func TestSnowflakeClient(t *testing.T) {
Convey("Dialers", t, func() { Convey("Can construct WebRTCDialer.", func() { - broker := &BrokerChannel{front: "test"} + broker := &BrokerChannel{} d := NewWebRTCDialer(broker, nil, 1) So(d, ShouldNotBeNil) So(d.BrokerChannel, ShouldNotBeNil) - So(d.BrokerChannel.front, ShouldEqual, "test") }) SkipConvey("WebRTCDialer can Catch a snowflake.", func() { broker := &BrokerChannel{} @@ -187,77 +167,6 @@ func TestSnowflakeClient(t *testing.T) { }) })
- Convey("Rendezvous", t, func() { - transport := &MockTransport{ - http.StatusOK, - []byte(`{"answer": "{"type":"answer","sdp":"fake"}" }`), - } - fakeOffer, err := util.DeserializeSessionDescription(`{"type":"offer","sdp":"test"}`) - if err != nil { - panic(err) - } - - Convey("Construct BrokerChannel with no front domain", func() { - b, err := NewBrokerChannel("http://test.broker", "", transport, false) - So(b.url, ShouldNotBeNil) - So(err, ShouldBeNil) - So(b.url.Host, ShouldResemble, "test.broker") - So(b.front, ShouldResemble, "") - So(b.transport, ShouldNotBeNil) - }) - - Convey("Construct BrokerChannel *with* front domain", func() { - b, err := NewBrokerChannel("http://test.broker", "front", transport, false) - So(b.url, ShouldNotBeNil) - So(err, ShouldBeNil) - So(b.url.Host, ShouldResemble, "test.broker") - So(b.front, ShouldResemble, "front") - So(b.transport, ShouldNotBeNil) - }) - - Convey("BrokerChannel.Negotiate responds with answer", func() { - b, err := NewBrokerChannel("http://test.broker", "", transport, false) - So(err, ShouldBeNil) - answer, err := b.Negotiate(fakeOffer) - So(err, ShouldBeNil) - So(answer, ShouldNotBeNil) - So(answer.SDP, ShouldResemble, "fake") - }) - - Convey("BrokerChannel.Negotiate fails", func() { - b, err := NewBrokerChannel("http://test.broker", "", - &MockTransport{http.StatusOK, []byte(`{"error": "no snowflake proxies currently available"}`)}, - false) - So(err, ShouldBeNil) - answer, err := b.Negotiate(fakeOffer) - So(err, ShouldNotBeNil) - So(answer, ShouldBeNil) - }) - - Convey("BrokerChannel.Negotiate fails with unexpected error", func() { - b, err := NewBrokerChannel("http://test.broker", "", - &MockTransport{http.StatusInternalServerError, []byte("\n")}, - false) - So(err, ShouldBeNil) - answer, err := b.Negotiate(fakeOffer) - So(err, ShouldNotBeNil) - So(answer, ShouldBeNil) - So(err.Error(), ShouldResemble, BrokerErrorUnexpected) - }) - - Convey("BrokerChannel.Negotiate fails with large read", func() { - b, err 
:= NewBrokerChannel("http://test.broker", "", - &MockTransport{http.StatusOK, make([]byte, readLimit+1)}, - false) - So(err, ShouldBeNil) - answer, err := b.Negotiate(fakeOffer) - So(err, ShouldNotBeNil) - So(answer, ShouldBeNil) - So(err.Error(), ShouldResemble, "unexpected EOF") - }) - - }) - }
func TestWebRTCPeer(t *testing.T) { diff --git a/client/lib/rendezvous.go b/client/lib/rendezvous.go index caa4ae4..8568120 100644 --- a/client/lib/rendezvous.go +++ b/client/lib/rendezvous.go @@ -9,13 +9,9 @@ package lib
import ( - "bytes" "errors" - "io" - "io/ioutil" "log" "net/http" - "net/url" "sync" "time"
@@ -30,11 +26,21 @@ const ( readLimit = 100000 //Maximum number of bytes to be read from an HTTP response )
-// Signalling Channel to the Broker. +// rendezvousMethod represents a way of communicating with the broker: sending +// an encoded client poll request (SDP offer) and receiving an encoded client +// poll response (SDP answer) in return. rendezvousMethod is used by +// BrokerChannel, which is in charge of encoding and decoding, and all other +// tasks that are independent of the rendezvous method. +type rendezvousMethod interface { + Exchange([]byte) ([]byte, error) +} + +// BrokerChannel contains a rendezvousMethod, as well as data that is not +// specific to any rendezvousMethod. BrokerChannel has the responsibility of +// encoding and decoding SDP offers and answers; rendezvousMethod is responsible +// for the exchange of encoded information. type BrokerChannel struct { - url *url.URL - front string // Optional front domain to replace url.Host in requests. - transport http.RoundTripper // Used to make all requests. + rendezvous rendezvousMethod keepLocalAddresses bool NATType string lock sync.Mutex @@ -54,31 +60,21 @@ func CreateBrokerTransport() http.RoundTripper { // |broker| is the full URL of the facilitating program which assigns proxies // to clients, and |front| is the option fronting domain. func NewBrokerChannel(broker string, front string, transport http.RoundTripper, keepLocalAddresses bool) (*BrokerChannel, error) { - targetURL, err := url.Parse(broker) - if err != nil { - return nil, err - } log.Println("Rendezvous using Broker at:", broker) if front != "" { log.Println("Domain fronting using:", front) } - bc := new(BrokerChannel) - bc.url = targetURL - bc.front = front - bc.transport = transport - bc.keepLocalAddresses = keepLocalAddresses - bc.NATType = nat.NATUnknown - return bc, nil -}
-func limitedRead(r io.Reader, limit int64) ([]byte, error) { - p, err := ioutil.ReadAll(&io.LimitedReader{R: r, N: limit + 1}) + rendezvous, err := newHTTPRendezvous(broker, front, transport) if err != nil { - return p, err - } else if int64(len(p)) == limit+1 { - return p[0:limit], io.ErrUnexpectedEOF + return nil, err } - return p, err + + return &BrokerChannel{ + rendezvous: rendezvous, + keepLocalAddresses: keepLocalAddresses, + NATType: nat.NATUnknown, + }, nil }
// Roundtrip HTTP POST using WebRTC SessionDescriptions. @@ -87,8 +83,6 @@ func limitedRead(r io.Reader, limit int64) ([]byte, error) { // with an SDP answer from a designated remote WebRTC peer. func (bc *BrokerChannel) Negotiate(offer *webrtc.SessionDescription) ( *webrtc.SessionDescription, error) { - log.Println("Negotiating via BrokerChannel...\nTarget URL: ", - bc.url.Host, "\nFront URL: ", bc.front) // Ideally, we could specify an `RTCIceTransportPolicy` that would handle // this for us. However, "public" was removed from the draft spec. // See https://developer.mozilla.org/en-US/docs/Web/API/RTCConfiguration#RTCIceTran... @@ -103,57 +97,34 @@ func (bc *BrokerChannel) Negotiate(offer *webrtc.SessionDescription) ( return nil, err }
- // Encode client poll request + // Encode the client poll request. bc.lock.Lock() req := &messages.ClientPollRequest{ Offer: offerSDP, NAT: bc.NATType, } - body, err := req.EncodePollRequest() + encReq, err := req.EncodePollRequest() bc.lock.Unlock() if err != nil { return nil, err }
- data := bytes.NewReader([]byte(body)) - // Suffix with broker's client registration handler. - clientURL := bc.url.ResolveReference(&url.URL{Path: "client"}) - request, err := http.NewRequest("POST", clientURL.String(), data) - if nil != err { + // Do the exchange using our rendezvousMethod. + encResp, err := bc.rendezvous.Exchange(encReq) + if err != nil { return nil, err } - if bc.front != "" { - // Do domain fronting. Replace the domain in the URL's with the - // front, and store the original domain the HTTP Host header. - request.Host = request.URL.Host - request.URL.Host = bc.front - } - resp, err := bc.transport.RoundTrip(request) - if nil != err { + log.Printf("Received answer: %s", string(encResp)) + + // Decode the client poll response. + resp, err := messages.DecodeClientPollResponse(encResp) + if err != nil { return nil, err } - defer resp.Body.Close() - log.Printf("BrokerChannel Response:\n%s\n\n", resp.Status) - - switch resp.StatusCode { - case http.StatusOK: - body, err := limitedRead(resp.Body, readLimit) - if nil != err { - return nil, err - } - log.Printf("Received answer: %s", string(body)) - - resp, err := messages.DecodeClientPollResponse(body) - if err != nil { - return nil, err - } - if resp.Error != "" { - return nil, errors.New(resp.Error) - } - return util.DeserializeSessionDescription(resp.Answer) - default: - return nil, errors.New(BrokerErrorUnexpected) + if resp.Error != "" { + return nil, errors.New(resp.Error) } + return util.DeserializeSessionDescription(resp.Answer) }
func (bc *BrokerChannel) SetNATType(NATType string) { diff --git a/client/lib/rendezvous_http.go b/client/lib/rendezvous_http.go new file mode 100644 index 0000000..01219cb --- /dev/null +++ b/client/lib/rendezvous_http.go @@ -0,0 +1,77 @@ +package lib + +import ( + "bytes" + "errors" + "io" + "io/ioutil" + "log" + "net/http" + "net/url" +) + +// httpRendezvous is a rendezvousMethod that communicates with the .../client +// route of the broker over HTTP or HTTPS, with optional domain fronting. +type httpRendezvous struct { + brokerURL *url.URL + front string // Optional front domain to replace url.Host in requests. + transport http.RoundTripper // Used to make all requests. +} + +// newHTTPRendezvous creates a new httpRendezvous that contacts the broker at +// the given URL, with an optional front domain. transport is the +// http.RoundTripper used to make all requests. +func newHTTPRendezvous(broker, front string, transport http.RoundTripper) (*httpRendezvous, error) { + brokerURL, err := url.Parse(broker) + if err != nil { + return nil, err + } + return &httpRendezvous{ + brokerURL: brokerURL, + front: front, + transport: transport, + }, nil +} + +func (r *httpRendezvous) Exchange(encPollReq []byte) ([]byte, error) { + log.Println("Negotiating via HTTP rendezvous...") + log.Println("Target URL: ", r.brokerURL.Host) + log.Println("Front URL: ", r.front) + + // Suffix the path with the broker's client registration handler. + reqURL := r.brokerURL.ResolveReference(&url.URL{Path: "client"}) + req, err := http.NewRequest("POST", reqURL.String(), bytes.NewReader(encPollReq)) + if err != nil { + return nil, err + } + + if r.front != "" { + // Do domain fronting. Replace the domain in the URL's with the + // front, and store the original domain the HTTP Host header. 
+ req.Host = req.URL.Host + req.URL.Host = r.front + } + + resp, err := r.transport.RoundTrip(req) + if err != nil { + return nil, err + } + defer resp.Body.Close() + + log.Printf("HTTP rendezvous response: %s", resp.Status) + if resp.StatusCode != http.StatusOK { + return nil, errors.New(BrokerErrorUnexpected) + } + + return limitedRead(resp.Body, readLimit) +} + +func limitedRead(r io.Reader, limit int64) ([]byte, error) { + p, err := ioutil.ReadAll(&io.LimitedReader{R: r, N: limit + 1}) + if err != nil { + return p, err + } else if int64(len(p)) == limit+1 { + return p[0:limit], io.ErrUnexpectedEOF + } + return p, err +} diff --git a/client/lib/rendezvous_test.go b/client/lib/rendezvous_test.go new file mode 100644 index 0000000..c263e37 --- /dev/null +++ b/client/lib/rendezvous_test.go @@ -0,0 +1,145 @@ +package lib + +import ( + "bytes" + "errors" + "fmt" + "io" + "io/ioutil" + "net/http" + "testing" + + "git.torproject.org/pluggable-transports/snowflake.git/common/messages" + "git.torproject.org/pluggable-transports/snowflake.git/common/nat" + . "github.com/smartystreets/goconvey/convey" +) + +// mockTransport's RoundTrip method returns a response with a fake status and +// body. +type mockTransport struct { + statusCode int + body []byte +} + +func (t *mockTransport) RoundTrip(req *http.Request) (*http.Response, error) { + return &http.Response{ + Status: fmt.Sprintf("%d %s", t.statusCode, http.StatusText(t.statusCode)), + StatusCode: t.statusCode, + Body: ioutil.NopCloser(bytes.NewReader(t.body)), + }, nil +} + +// errorTransport's RoundTrip method returns an error. +type errorTransport struct { + err error +} + +func (t errorTransport) RoundTrip(req *http.Request) (*http.Response, error) { + return nil, t.err +} + +// makeEncPollReq returns an encoded client poll request containing a given +// offer. 
+func makeEncPollReq(offer string) []byte { + encPollReq, err := (&messages.ClientPollRequest{ + Offer: offer, + NAT: nat.NATUnknown, + }).EncodePollRequest() + if err != nil { + panic(err) + } + return encPollReq +} + +// makeEncPollResp returns an encoded client poll response with given answer and +// error strings. +func makeEncPollResp(answer, errorStr string) []byte { + encPollResp, err := (&messages.ClientPollResponse{ + Answer: answer, + Error: errorStr, + }).EncodePollResponse() + if err != nil { + panic(err) + } + return encPollResp +} + +func TestHTTPRendezvous(t *testing.T) { + Convey("HTTP rendezvous", t, func() { + Convey("Construct httpRendezvous with no front domain", func() { + transport := &mockTransport{http.StatusOK, []byte{}} + rend, err := newHTTPRendezvous("http://test.broker", "", transport) + So(err, ShouldBeNil) + So(rend.brokerURL, ShouldNotBeNil) + So(rend.brokerURL.Host, ShouldResemble, "test.broker") + So(rend.front, ShouldResemble, "") + So(rend.transport, ShouldEqual, transport) + }) + + Convey("Construct httpRendezvous *with* front domain", func() { + transport := &mockTransport{http.StatusOK, []byte{}} + rend, err := newHTTPRendezvous("http://test.broker", "front", transport) + So(err, ShouldBeNil) + So(rend.brokerURL, ShouldNotBeNil) + So(rend.brokerURL.Host, ShouldResemble, "test.broker") + So(rend.front, ShouldResemble, "front") + So(rend.transport, ShouldEqual, transport) + }) + + fakeEncPollReq := makeEncPollReq(`{"type":"offer","sdp":"test"}`) + + Convey("httpRendezvous.Exchange responds with answer", func() { + fakeEncPollResp := makeEncPollResp( + `{"answer": "{"type":"answer","sdp":"fake"}" }`, + "", + ) + rend, err := newHTTPRendezvous("http://test.broker", "", + &mockTransport{http.StatusOK, fakeEncPollResp}) + So(err, ShouldBeNil) + answer, err := rend.Exchange(fakeEncPollReq) + So(err, ShouldBeNil) + So(answer, ShouldResemble, fakeEncPollResp) + }) + + Convey("httpRendezvous.Exchange responds with no answer", func() { + 
fakeEncPollResp := makeEncPollResp( + "", + `{"error": "no snowflake proxies currently available"}`, + ) + rend, err := newHTTPRendezvous("http://test.broker", "", + &mockTransport{http.StatusOK, fakeEncPollResp}) + So(err, ShouldBeNil) + answer, err := rend.Exchange(fakeEncPollReq) + So(err, ShouldBeNil) + So(answer, ShouldResemble, fakeEncPollResp) + }) + + Convey("httpRendezvous.Exchange fails with unexpected HTTP status code", func() { + rend, err := newHTTPRendezvous("http://test.broker", "", + &mockTransport{http.StatusInternalServerError, []byte{}}) + So(err, ShouldBeNil) + answer, err := rend.Exchange(fakeEncPollReq) + So(err, ShouldNotBeNil) + So(answer, ShouldBeNil) + So(err.Error(), ShouldResemble, BrokerErrorUnexpected) + }) + + Convey("httpRendezvous.Exchange fails with error", func() { + transportErr := errors.New("error") + rend, err := newHTTPRendezvous("http://test.broker", "", + &errorTransport{err: transportErr}) + So(err, ShouldBeNil) + answer, err := rend.Exchange(fakeEncPollReq) + So(err, ShouldEqual, transportErr) + So(answer, ShouldBeNil) + }) + + Convey("httpRendezvous.Exchange fails with large read", func() { + rend, err := newHTTPRendezvous("http://test.broker", "", + &mockTransport{http.StatusOK, make([]byte, readLimit+1)}) + So(err, ShouldBeNil) + _, err = rend.Exchange(fakeEncPollReq) + So(err, ShouldEqual, io.ErrUnexpectedEOF) + }) + }) +}