commit 43cdc20e7e7f136c96814bf752ef1fbc9b6fec33
Author: Yawning Angel <yawning@torproject.org>
Date:   Fri Oct 30 09:45:26 2015 +0000
meek-lite: combine small writes at request dispatch time.
    This dramatically improves bulk upload performance, from totally shit to just shit.
---
 transports/meeklite/meek.go | 70 ++++++++++++++++++++++++++-----------------
 1 file changed, 42 insertions(+), 28 deletions(-)
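Before this change, Write() chunked each caller buffer into maxPayloadLength-sized pieces and enqueued every piece separately, so the worker tended to issue one HTTP request per small chunk. The diff below moves the combining to the worker, which drains whatever is already queued into a single request and carries any excess over to the next one. A minimal standalone sketch of that coalescing pattern follows; the helper name coalesce, the queue name pending, and the demo values are illustrative only and are not part of this commit.

// A minimal sketch of dispatch-time coalescing, assuming a buffered
// chan []byte of pending writes and a per-request payload cap; the
// names coalesce, pending and leftover are illustrative only.
package main

import "fmt"

const maxPayloadLength = 0x10000 // assumed request payload cap, for illustration

// coalesce glues the leftover from the last request and the first pending
// buffer together, then keeps draining the queue while more data is
// immediately available and the cap has not been reached.  It returns the
// slice to send now and whatever must wait for the next request.
func coalesce(leftover, first []byte, pending chan []byte) (send, keep []byte) {
	buf := append(leftover, first...)
	for len(pending) > 0 && len(buf) < maxPayloadLength {
		b := <-pending
		buf = append(buf, b...)
	}
	n := len(buf)
	if n > maxPayloadLength {
		n = maxPayloadLength
	}
	return buf[:n], buf[n:]
}

func main() {
	pending := make(chan []byte, 16)
	for i := 0; i < 4; i++ {
		pending <- []byte("a small write ") // several queued small writes
	}
	send, keep := coalesce(nil, <-pending, pending)
	fmt.Printf("one request carries %d bytes, %d bytes deferred\n", len(send), len(keep))
}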
diff --git a/transports/meeklite/meek.go b/transports/meeklite/meek.go
index 5842704..8957ceb 100644
--- a/transports/meeklite/meek.go
+++ b/transports/meeklite/meek.go
@@ -161,33 +161,22 @@ func (c *meekConn) Write(b []byte) (n int, err error) {
 		return 0, io.ErrClosedPipe
 	}
-	if len(b) > 0 {
-		// Copy the data to be written to a new slice, since
-		// we return immediately after queuing and the peer can
-		// happily reuse `b` before data has been sent.
-		toWrite := len(b)
-		b2 := make([]byte, toWrite)
-		copy(b2, b)
-		offset := 0
-		for toWrite > 0 {
-			// Chunk up the writes to keep them under the maximum
-			// payload length.
-			sz := toWrite
-			if sz > maxPayloadLength {
-				sz = maxPayloadLength
-			}
+	if len(b) == 0 {
+		return 0, nil
+	}
-			// Enqueue a properly sized subslice of our copy.
-			if ok := c.enqueueWrite(b2[offset : offset+sz]); !ok {
-				// Technically we did enqueue data, but the worker's
-				// got closed out from under us.
-				return 0, io.ErrClosedPipe
-			}
-			toWrite -= sz
-			offset += sz
-			runtime.Gosched()
-		}
+	// Copy the data to be written to a new slice, since
+	// we return immediately after queuing and the peer can
+	// happily reuse `b` before data has been sent.
+	toWrite := len(b)
+	b2 := make([]byte, toWrite)
+	copy(b2, b)
+	if ok := c.enqueueWrite(b2); !ok {
+		// Technically we did enqueue data, but the worker's
+		// got closed out from under us.
+		return 0, io.ErrClosedPipe
 	}
+	runtime.Gosched()
 	return len(b), nil
 }
@@ -269,9 +258,11 @@ func (c *meekConn) roundTrip(sndBuf []byte) (recvBuf []byte, err error) {
 func (c *meekConn) ioWorker() {
 	interval := initPollInterval
+	var sndBuf, leftBuf []byte
 loop:
+
 	for {
-		var sndBuf []byte
+		sndBuf = nil
 		select {
 		case <-time.After(interval):
 			// If the poll interval has elapsed, issue a request.
@@ -281,19 +272,42 @@ loop:
 			break loop
 		}
+		// Combine short writes as long as data is available to be
+		// sent immediately and it will not put us over the max
+		// payload limit.  Any excess data is stored and dispatched
+		// as the next request).
+		sndBuf = append(leftBuf, sndBuf...)
+		wrSz := len(sndBuf)
+		for len(c.workerWrChan) > 0 && wrSz < maxPayloadLength {
+			b := <-c.workerWrChan
+			sndBuf = append(sndBuf, b...)
+			wrSz = len(sndBuf)
+		}
+		if wrSz > maxPayloadLength {
+			wrSz = maxPayloadLength
+		}
+
 		// Issue a request.
-		rdBuf, err := c.roundTrip(sndBuf)
+		rdBuf, err := c.roundTrip(sndBuf[:wrSz])
 		if err != nil {
 			// Welp, something went horrifically wrong.
 			break loop
 		}
+
+		// Stash the remaining payload if any.
+		leftBuf = sndBuf[wrSz:] // Store the remaining data
+		if len(leftBuf) == 0 {
+			leftBuf = nil
+		}
+
+		// Determine the next poll interval.
 		if len(rdBuf) > 0 {
 			// Received data, enqueue the read.
 			c.workerRdChan <- rdBuf
 			// And poll immediately.
 			interval = 0
-		} else if sndBuf != nil {
+		} else if wrSz > 0 {
 			// Sent data, poll immediately.
 			interval = 0
 		} else if interval == 0 {
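The practical effect on bulk uploads is that a burst of small writes that previously each consumed its own poll/request can now share one request up to the payload cap. A back-of-the-envelope sketch of the best-case reduction in round trips, using wholly hypothetical sizes (the 0x10000 cap and 512-byte writes are assumptions for illustration, not figures from this commit):

// Rough best-case comparison of request counts before and after the change;
// all sizes below are hypothetical.
package main

import "fmt"

func main() {
	const (
		maxPayloadLength = 0x10000 // assumed request payload cap
		writeSize        = 512     // hypothetical size of each Write() call
		totalWrites      = 1024
	)
	// Before: in the worst case every queued write buffer became its own request.
	before := totalWrites
	// After: one request can carry as many queued writes as fit under the cap.
	perRequest := maxPayloadLength / writeSize
	after := (totalWrites + perRequest - 1) / perRequest
	fmt.Printf("requests before: %d, after (best case): %d\n", before, after)
}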