[tor-commits] [obfs4/master] meek-lite: combine small writes at request dispatch time.

yawning at torproject.org yawning at torproject.org
Fri Oct 30 09:51:38 UTC 2015


commit 43cdc20e7e7f136c96814bf752ef1fbc9b6fec33
Author: Yawning Angel <yawning at torproject.org>
Date:   Fri Oct 30 09:45:26 2015 +0000

    meek-lite: combine small writes at request dispatch time.
    
    This dramatically improves bulk upload performance, from totally shit
    to just shit.
---
 transports/meeklite/meek.go |   70 ++++++++++++++++++++++++++-----------------
 1 file changed, 42 insertions(+), 28 deletions(-)

diff --git a/transports/meeklite/meek.go b/transports/meeklite/meek.go
index 5842704..8957ceb 100644
--- a/transports/meeklite/meek.go
+++ b/transports/meeklite/meek.go
@@ -161,33 +161,22 @@ func (c *meekConn) Write(b []byte) (n int, err error) {
 		return 0, io.ErrClosedPipe
 	}
 
-	if len(b) > 0 {
-		// Copy the data to be written to a new slice, since
-		// we return immediately after queuing and the peer can
-		// happily reuse `b` before data has been sent.
-		toWrite := len(b)
-		b2 := make([]byte, toWrite)
-		copy(b2, b)
-		offset := 0
-		for toWrite > 0 {
-			// Chunk up the writes to keep them under the maximum
-			// payload length.
-			sz := toWrite
-			if sz > maxPayloadLength {
-				sz = maxPayloadLength
-			}
+	if len(b) == 0 {
+		return 0, nil
+	}
 
-			// Enqueue a properly sized subslice of our copy.
-			if ok := c.enqueueWrite(b2[offset : offset+sz]); !ok {
-				// Technically we did enqueue data, but the worker's
-				// got closed out from under us.
-				return 0, io.ErrClosedPipe
-			}
-			toWrite -= sz
-			offset += sz
-			runtime.Gosched()
-		}
+	// Copy the data to be written to a new slice, since
+	// we return immediately after queuing and the peer can
+	// happily reuse `b` before data has been sent.
+	toWrite := len(b)
+	b2 := make([]byte, toWrite)
+	copy(b2, b)
+	if ok := c.enqueueWrite(b2); !ok {
+		// Technically we did enqueue data, but the worker's
+		// got closed out from under us.
+		return 0, io.ErrClosedPipe
 	}
+	runtime.Gosched()
 	return len(b), nil
 }
 
@@ -269,9 +258,11 @@ func (c *meekConn) roundTrip(sndBuf []byte) (recvBuf []byte, err error) {
 
 func (c *meekConn) ioWorker() {
 	interval := initPollInterval
+	var sndBuf, leftBuf []byte
 loop:
+
 	for {
-		var sndBuf []byte
+		sndBuf = nil
 		select {
 		case <-time.After(interval):
 			// If the poll interval has elapsed, issue a request.
@@ -281,19 +272,42 @@ loop:
 			break loop
 		}
 
+		// Combine short writes as long as data is available to be
+		// sent immediately and it will not put us over the max
+		// payload limit.  Any excess data is stored and dispatched
+		// as the next request).
+		sndBuf = append(leftBuf, sndBuf...)
+		wrSz := len(sndBuf)
+		for len(c.workerWrChan) > 0 && wrSz < maxPayloadLength {
+			b := <-c.workerWrChan
+			sndBuf = append(sndBuf, b...)
+			wrSz = len(sndBuf)
+		}
+		if wrSz > maxPayloadLength {
+			wrSz = maxPayloadLength
+		}
+
 		// Issue a request.
-		rdBuf, err := c.roundTrip(sndBuf)
+		rdBuf, err := c.roundTrip(sndBuf[:wrSz])
 		if err != nil {
 			// Welp, something went horrifically wrong.
 			break loop
 		}
+
+		// Stash the remaining payload if any.
+		leftBuf = sndBuf[wrSz:] // Store the remaining data
+		if len(leftBuf) == 0 {
+			leftBuf = nil
+		}
+
+		// Determine the next poll interval.
 		if len(rdBuf) > 0 {
 			// Received data, enqueue the read.
 			c.workerRdChan <- rdBuf
 
 			// And poll immediately.
 			interval = 0
-		} else if sndBuf != nil {
+		} else if wrSz > 0 {
 			// Sent data, poll immediately.
 			interval = 0
 		} else if interval == 0 {





More information about the tor-commits mailing list