commit 816cff15f425d0cb87a1b996366989aa01833f99
Author: Yawning Angel <yawning(a)schwanenlied.me>
Date: Sun Jan 20 16:14:28 2019 +0000
transports/meeklite: Cleanups, bugfixes and improvements
* Properly close the response body on HTTP error.
* Clean up close signaling.
* Write() should return faster on closed connections.
---
transports/meeklite/meek.go | 70 +++++++++++++++++++++------------------------
1 file changed, 32 insertions(+), 38 deletions(-)
diff --git a/transports/meeklite/meek.go b/transports/meeklite/meek.go
index a99556b..39a6d4b 100644
--- a/transports/meeklite/meek.go
+++ b/transports/meeklite/meek.go
@@ -39,6 +39,7 @@ import (
"net"
"net/http"
gourl "net/url"
+ "os"
"runtime"
"sync"
"time"
@@ -107,16 +108,14 @@ func newClientArgs(args *pt.Args) (ca *meekClientArgs, err error) {
}
type meekConn struct {
- sync.Mutex
-
args *meekClientArgs
sessionID string
transport *http.Transport
- workerRunning bool
+ closeOnce sync.Once
workerWrChan chan []byte
workerRdChan chan []byte
- workerCloseChan chan bool
+ workerCloseChan chan struct{}
rdBuf *bytes.Buffer
}
@@ -154,11 +153,10 @@ func (c *meekConn) Read(p []byte) (n int, err error) {
func (c *meekConn) Write(b []byte) (n int, err error) {
// Check to see if the connection is actually open.
- c.Lock()
- closed := !c.workerRunning
- c.Unlock()
- if closed {
+ select {
+ case <-c.workerCloseChan:
return 0, io.ErrClosedPipe
+ default:
}
if len(b) == 0 {
@@ -168,9 +166,7 @@ func (c *meekConn) Write(b []byte) (n int, err error) {
// Copy the data to be written to a new slice, since
// we return immediately after queuing and the peer can
// happily reuse `b` before data has been sent.
- toWrite := len(b)
- b2 := make([]byte, toWrite)
- copy(b2, b)
+ b2 := append([]byte{}, b...)
if ok := c.enqueueWrite(b2); !ok {
// Technically we did enqueue data, but the worker's
// got closed out from under us.
@@ -181,18 +177,15 @@ func (c *meekConn) Write(b []byte) (n int, err error) {
}
func (c *meekConn) Close() error {
- // Ensure that we do this once and only once.
- c.Lock()
- defer c.Unlock()
- if !c.workerRunning {
- return nil
- }
+ err := os.ErrClosed
- // Tear down the worker.
- c.workerRunning = false
- c.workerCloseChan <- true
+ c.closeOnce.Do(func() {
+ // Tear down the worker, if it is still running.
+ close(c.workerCloseChan)
+ err = nil
+ })
- return nil
+ return err
}
func (c *meekConn) LocalAddr() net.Addr {
@@ -216,7 +209,11 @@ func (c *meekConn) SetWriteDeadline(t time.Time) error {
}
func (c *meekConn) enqueueWrite(b []byte) (ok bool) {
- defer func() { _ = recover() }()
+ defer func() {
+ if err := recover(); err != nil {
+ ok = false
+ }
+ }()
c.workerWrChan <- b
return true
}
@@ -249,14 +246,16 @@ func (c *meekConn) roundTrip(sndBuf []byte) (recvBuf []byte, err error) {
if err != nil {
return nil, err
}
- if resp.StatusCode != http.StatusOK {
- err = fmt.Errorf("status code was %d, not %d", resp.StatusCode, http.StatusOK)
- time.Sleep(retryDelay)
- } else {
- defer resp.Body.Close()
+
+ if resp.StatusCode == http.StatusOK {
recvBuf, err = ioutil.ReadAll(io.LimitReader(resp.Body, maxPayloadLength))
+ resp.Body.Close()
return
}
+
+ resp.Body.Close()
+ err = fmt.Errorf("status code was %d, not %d", resp.StatusCode, http.StatusOK)
+ time.Sleep(retryDelay)
}
return
}
@@ -264,8 +263,8 @@ func (c *meekConn) roundTrip(sndBuf []byte) (recvBuf []byte, err error) {
func (c *meekConn) ioWorker() {
interval := initPollInterval
var sndBuf, leftBuf []byte
-loop:
+loop:
for {
sndBuf = nil
select {
@@ -316,7 +315,7 @@ loop:
// Sent data, poll immediately.
interval = 0
} else if interval == 0 {
- // Neither sent nor received data, initialize the delay.
+ // Neither sent nor received data after a poll, re-initialize the delay.
interval = initPollInterval
} else {
// Apply a multiplicative backoff.
@@ -334,11 +333,8 @@ loop:
close(c.workerRdChan)
close(c.workerWrChan)
- // In case the close was done on an error condition, update the state
- // variable so that further calls to Write() will fail.
- c.Lock()
- defer c.Unlock()
- c.workerRunning = false
+ // Close the connection (extra calls to Close() are harmless).
+ _ = c.Close()
}
func newMeekConn(network, addr string, dialFn base.DialFunc, ca *meekClientArgs) (net.Conn, error) {
@@ -347,15 +343,13 @@ func newMeekConn(network, addr string, dialFn base.DialFunc, ca *meekClientArgs)
return nil, err
}
- tr := &http.Transport{Dial: dialFn}
conn := &meekConn{
args: ca,
sessionID: id,
- transport: tr,
- workerRunning: true,
+ transport: &http.Transport{Dial: dialFn},
workerWrChan: make(chan []byte, maxChanBacklog),
workerRdChan: make(chan []byte, maxChanBacklog),
- workerCloseChan: make(chan bool),
+ workerCloseChan: make(chan struct{}),
}
// Start the I/O worker.