commit 816cff15f425d0cb87a1b996366989aa01833f99 Author: Yawning Angel <yawning@schwanenlied.me> Date: Sun Jan 20 16:14:28 2019 +0000
transports/meeklite: Cleanups, bugfixes and improvements
* Properly close the response body on HTTP error. * Clean up close signaling. * Write() should return faster on closed connections. --- transports/meeklite/meek.go | 70 +++++++++++++++++++++------------------------ 1 file changed, 32 insertions(+), 38 deletions(-)
diff --git a/transports/meeklite/meek.go b/transports/meeklite/meek.go index a99556b..39a6d4b 100644 --- a/transports/meeklite/meek.go +++ b/transports/meeklite/meek.go @@ -39,6 +39,7 @@ import ( "net" "net/http" gourl "net/url" + "os" "runtime" "sync" "time" @@ -107,16 +108,14 @@ func newClientArgs(args *pt.Args) (ca *meekClientArgs, err error) { }
type meekConn struct { - sync.Mutex - args *meekClientArgs sessionID string transport *http.Transport
- workerRunning bool + closeOnce sync.Once workerWrChan chan []byte workerRdChan chan []byte - workerCloseChan chan bool + workerCloseChan chan struct{} rdBuf *bytes.Buffer }
@@ -154,11 +153,10 @@ func (c *meekConn) Read(p []byte) (n int, err error) {
func (c *meekConn) Write(b []byte) (n int, err error) { // Check to see if the connection is actually open. - c.Lock() - closed := !c.workerRunning - c.Unlock() - if closed { + select { + case <-c.workerCloseChan: return 0, io.ErrClosedPipe + default: }
if len(b) == 0 { @@ -168,9 +166,7 @@ func (c *meekConn) Write(b []byte) (n int, err error) { // Copy the data to be written to a new slice, since // we return immediately after queuing and the peer can // happily reuse `b` before data has been sent. - toWrite := len(b) - b2 := make([]byte, toWrite) - copy(b2, b) + b2 := append([]byte{}, b...) if ok := c.enqueueWrite(b2); !ok { // Technically we did enqueue data, but the worker's // got closed out from under us. @@ -181,18 +177,15 @@ func (c *meekConn) Write(b []byte) (n int, err error) { }
func (c *meekConn) Close() error { - // Ensure that we do this once and only once. - c.Lock() - defer c.Unlock() - if !c.workerRunning { - return nil - } + err := os.ErrClosed
- // Tear down the worker. - c.workerRunning = false - c.workerCloseChan <- true + c.closeOnce.Do(func() { + // Tear down the worker, if it is still running. + close(c.workerCloseChan) + err = nil + })
- return nil + return err }
func (c *meekConn) LocalAddr() net.Addr { @@ -216,7 +209,11 @@ func (c *meekConn) SetWriteDeadline(t time.Time) error { }
func (c *meekConn) enqueueWrite(b []byte) (ok bool) { - defer func() { _ = recover() }() + defer func() { + if err := recover(); err != nil { + ok = false + } + }() c.workerWrChan <- b return true } @@ -249,14 +246,16 @@ func (c *meekConn) roundTrip(sndBuf []byte) (recvBuf []byte, err error) { if err != nil { return nil, err } - if resp.StatusCode != http.StatusOK { - err = fmt.Errorf("status code was %d, not %d", resp.StatusCode, http.StatusOK) - time.Sleep(retryDelay) - } else { - defer resp.Body.Close() + + if resp.StatusCode == http.StatusOK { recvBuf, err = ioutil.ReadAll(io.LimitReader(resp.Body, maxPayloadLength)) + resp.Body.Close() return } + + resp.Body.Close() + err = fmt.Errorf("status code was %d, not %d", resp.StatusCode, http.StatusOK) + time.Sleep(retryDelay) } return } @@ -264,8 +263,8 @@ func (c *meekConn) roundTrip(sndBuf []byte) (recvBuf []byte, err error) { func (c *meekConn) ioWorker() { interval := initPollInterval var sndBuf, leftBuf []byte -loop:
+loop: for { sndBuf = nil select { @@ -316,7 +315,7 @@ loop: // Sent data, poll immediately. interval = 0 } else if interval == 0 { - // Neither sent nor received data, initialize the delay. + // Neither sent nor received data after a poll, re-initialize the delay. interval = initPollInterval } else { // Apply a multiplicative backoff. @@ -334,11 +333,8 @@ loop: close(c.workerRdChan) close(c.workerWrChan)
- // In case the close was done on an error condition, update the state - // variable so that further calls to Write() will fail. - c.Lock() - defer c.Unlock() - c.workerRunning = false + // Close the connection (extra calls to Close() are harmless). + _ = c.Close() }
func newMeekConn(network, addr string, dialFn base.DialFunc, ca *meekClientArgs) (net.Conn, error) { @@ -347,15 +343,13 @@ func newMeekConn(network, addr string, dialFn base.DialFunc, ca *meekClientArgs) return nil, err }
- tr := &http.Transport{Dial: dialFn} conn := &meekConn{ args: ca, sessionID: id, - transport: tr, - workerRunning: true, + transport: &http.Transport{Dial: dialFn}, workerWrChan: make(chan []byte, maxChanBacklog), workerRdChan: make(chan []byte, maxChanBacklog), - workerCloseChan: make(chan bool), + workerCloseChan: make(chan struct{}), }
// Start the I/O worker.