commit 43cdc20e7e7f136c96814bf752ef1fbc9b6fec33
Author: Yawning Angel <yawning(a)torproject.org>
Date: Fri Oct 30 09:45:26 2015 +0000
meek-lite: combine small writes at request dispatch time.
This dramatically improves bulk upload performance, from totally shit
to just shit.
---
transports/meeklite/meek.go | 70 ++++++++++++++++++++++++++-----------------
1 file changed, 42 insertions(+), 28 deletions(-)
diff --git a/transports/meeklite/meek.go b/transports/meeklite/meek.go
index 5842704..8957ceb 100644
--- a/transports/meeklite/meek.go
+++ b/transports/meeklite/meek.go
@@ -161,33 +161,22 @@ func (c *meekConn) Write(b []byte) (n int, err error) {
return 0, io.ErrClosedPipe
}
- if len(b) > 0 {
- // Copy the data to be written to a new slice, since
- // we return immediately after queuing and the peer can
- // happily reuse `b` before data has been sent.
- toWrite := len(b)
- b2 := make([]byte, toWrite)
- copy(b2, b)
- offset := 0
- for toWrite > 0 {
- // Chunk up the writes to keep them under the maximum
- // payload length.
- sz := toWrite
- if sz > maxPayloadLength {
- sz = maxPayloadLength
- }
+ if len(b) == 0 {
+ return 0, nil
+ }
- // Enqueue a properly sized subslice of our copy.
- if ok := c.enqueueWrite(b2[offset : offset+sz]); !ok {
- // Technically we did enqueue data, but the worker's
- // got closed out from under us.
- return 0, io.ErrClosedPipe
- }
- toWrite -= sz
- offset += sz
- runtime.Gosched()
- }
+ // Copy the data to be written to a new slice, since
+ // we return immediately after queuing and the peer can
+ // happily reuse `b` before data has been sent.
+ toWrite := len(b)
+ b2 := make([]byte, toWrite)
+ copy(b2, b)
+ if ok := c.enqueueWrite(b2); !ok {
+ // Technically we did enqueue data, but the worker's
+ // got closed out from under us.
+ return 0, io.ErrClosedPipe
}
+ runtime.Gosched()
return len(b), nil
}
@@ -269,9 +258,11 @@ func (c *meekConn) roundTrip(sndBuf []byte) (recvBuf []byte, err error) {
func (c *meekConn) ioWorker() {
interval := initPollInterval
+ var sndBuf, leftBuf []byte
loop:
+
for {
- var sndBuf []byte
+ sndBuf = nil
select {
case <-time.After(interval):
// If the poll interval has elapsed, issue a request.
@@ -281,19 +272,42 @@ loop:
break loop
}
+ // Combine short writes as long as data is available to be
+ // sent immediately and it will not put us over the max
+ // payload limit. Any excess data is stored and dispatched
+ // as the next request.
+ sndBuf = append(leftBuf, sndBuf...)
+ wrSz := len(sndBuf)
+ for len(c.workerWrChan) > 0 && wrSz < maxPayloadLength {
+ b := <-c.workerWrChan
+ sndBuf = append(sndBuf, b...)
+ wrSz = len(sndBuf)
+ }
+ if wrSz > maxPayloadLength {
+ wrSz = maxPayloadLength
+ }
+
// Issue a request.
- rdBuf, err := c.roundTrip(sndBuf)
+ rdBuf, err := c.roundTrip(sndBuf[:wrSz])
if err != nil {
// Welp, something went horrifically wrong.
break loop
}
+
+ // Stash the remaining payload if any.
+ leftBuf = sndBuf[wrSz:] // Store the remaining data
+ if len(leftBuf) == 0 {
+ leftBuf = nil
+ }
+
+ // Determine the next poll interval.
if len(rdBuf) > 0 {
// Received data, enqueue the read.
c.workerRdChan <- rdBuf
// And poll immediately.
interval = 0
- } else if sndBuf != nil {
+ } else if wrSz > 0 {
// Sent data, poll immediately.
interval = 0
} else if interval == 0 {