This is an automated email from the git hooks/post-receive script.
dcf pushed a commit to branch turbotunnel in repository pluggable-transports/meek.
commit 46c988b2582fcfc9d6ff17114a1961a058edd556 Author: David Fifield david@bamsoftware.com AuthorDate: Fri Oct 28 01:01:17 2022 -0600
Halt requestLoop when PollingPacketConn is closed. --- meek-client/meek-client.go | 4 +++- meek-client/turbotunnel.go | 4 ++-- meek-client/turbotunnel_test.go | 50 +++++++++++++++++++++++++++++++++-------- 3 files changed, 46 insertions(+), 12 deletions(-)
diff --git a/meek-client/meek-client.go b/meek-client/meek-client.go index 683aa90..813a373 100644 --- a/meek-client/meek-client.go +++ b/meek-client/meek-client.go @@ -27,6 +27,7 @@ package main
import ( + "context" "flag" "fmt" "io" @@ -97,13 +98,14 @@ type RequestInfo struct { RoundTripper http.RoundTripper }
-func (info *RequestInfo) Poll(out io.Reader) (in io.ReadCloser, err error) { +func (info *RequestInfo) Poll(ctx context.Context, out io.Reader) (in io.ReadCloser, err error) { req, err := http.NewRequest("POST", info.URL.String(), out) // Prevent Content-Type sniffing by net/http and middleboxes. req.Header.Set("Content-Type", "application/octet-stream") if err != nil { return nil, err } + req = req.WithContext(ctx) if info.Host != "" { req.Host = info.Host } diff --git a/meek-client/turbotunnel.go b/meek-client/turbotunnel.go index cbb3380..43c0c46 100644 --- a/meek-client/turbotunnel.go +++ b/meek-client/turbotunnel.go @@ -40,7 +40,7 @@ const ( // Poller is an abstract interface over an operation that writes a stream of // bytes and reads a stream of bytes in return, like an HTTP request. type Poller interface { - Poll(out io.Reader) (in io.ReadCloser, err error) + Poll(ctx context.Context, out io.Reader) (in io.ReadCloser, err error) }
// PollingPacketConn implements the net.PacketConn interface over a carrier of @@ -173,7 +173,7 @@ func (c *PollingPacketConn) requestLoop() { }
go func() { - resp, err := c.poller.Poll(&body) + resp, err := c.poller.Poll(c.ctx, &body) if err != nil { c.Close() return diff --git a/meek-client/turbotunnel_test.go b/meek-client/turbotunnel_test.go index 11640ec..89aadab 100644 --- a/meek-client/turbotunnel_test.go +++ b/meek-client/turbotunnel_test.go @@ -2,6 +2,7 @@ package main
import ( "bytes" + "context" "errors" "io" "io/ioutil" @@ -15,24 +16,58 @@ func (_ emptyAddr) Network() string { return "empty" } func (_ emptyAddr) String() string { return "empty" }
type funcPoller struct { - poll func(out io.Reader) (in io.ReadCloser, err error) + poll func(ctx context.Context, out io.Reader) (in io.ReadCloser, err error) }
-func (fp funcPoller) Poll(out io.Reader) (in io.ReadCloser, err error) { - return fp.poll(out) +func (fp funcPoller) Poll(ctx context.Context, out io.Reader) (in io.ReadCloser, err error) { + return fp.poll(ctx, out) +} + +// TestCloseCancelsPoll tests that calling Close cancels the context passed to +// the poller. +func TestCloseCancelsPoll(t *testing.T) { + beginCh := make(chan struct{}) + resultCh := make(chan error) + // The poller returns immediately with a nil error when its context is + // canceled. It returns after a delay with a non-nil error if its + // context is not canceled. + poller := funcPoller{poll: func(ctx context.Context, _ io.Reader) (io.ReadCloser, error) { + defer close(resultCh) + beginCh <- struct{}{} + select { + case <-ctx.Done(): + resultCh <- nil + case <-time.After(5 * time.Second): + resultCh <- errors.New("poll was not canceled") + } + return ioutil.NopCloser(bytes.NewReader(nil)), nil + }} + pconn := NewPollingPacketConn(emptyAddr{}, poller) + // Wait until the poll function has been called. + <-beginCh + // Close the connection. + err := pconn.Close() + if err != nil { + t.Fatal(err) + } + // Observe what happened inside the poll function. Closing the + // connection should have canceled the context. + err = <-resultCh + if err != nil { + t.Fatal(err) + } }
// TestCloseHaltsRequestLoop tests that requestLoop terminates and stops calling // its Poller after Close is called. func TestCloseHaltsRequestLoop(t *testing.T) { - closedCh := make(chan struct{}) resultCh := make(chan error) // The poller returns immediately with a nil error as long as closedCh // is not closed. When closedCh is closed, the poller returns // immediately with a non-nil error. - poller := funcPoller{poll: func(_ io.Reader) (io.ReadCloser, error) { + poller := funcPoller{poll: func(ctx context.Context, _ io.Reader) (io.ReadCloser, error) { select { - case <-closedCh: + case <-ctx.Done(): resultCh <- errors.New("poll called after close") default: } @@ -44,9 +79,6 @@ func TestCloseHaltsRequestLoop(t *testing.T) { if err != nil { t.Fatal(err) } - // Tell the poll function to return an error if it is called after this - // point. - close(closedCh) // Wait a few seconds to see if the poll function is called after the // conn is closed. select {