[tor-commits] [meek/master] Read from the local tor in a goroutine.

dcf at torproject.org dcf at torproject.org
Mon May 5 18:20:04 UTC 2014


commit 5e8afc7eedfe9db6a73efeb1ca75a3ce4089d082
Author: David Fifield <david at bamsoftware.com>
Date:   Sun May 4 23:32:20 2014 -0700

    Read from the local tor in a goroutine.
    
    Previously we were using a read timeout to control the polling interval.
    We would try to read from the SOCKS port until the next poll was due; if
    anything was read we would send it immediately, otherwise we would send
    an empty request. In order to be able to read anything from the SOCKS
    port, the timeout had to be set not too short: I had it at 100 ms for a
    minimum. The downside of that was that when only downloading, and not
    uploading, the minimum timeout would have to expire in between every
    request, adding an extra 100 ms to every request.
    
    So now we read from the SOCKS port in a goroutine, and have it write the
    chunks it reads to a channel. The channel can be polled without waiting.
---
 meek-client/meek-client.go |   62 +++++++++++++++++++++++++++++++++++---------
 1 file changed, 50 insertions(+), 12 deletions(-)

diff --git a/meek-client/meek-client.go b/meek-client/meek-client.go
index 8f0f319..25ce0ad 100644
--- a/meek-client/meek-client.go
+++ b/meek-client/meek-client.go
@@ -1,6 +1,7 @@
 package main
 
 import (
+	"bufio"
 	"bytes"
 	"crypto/rand"
 	"encoding/base64"
@@ -100,31 +101,68 @@ func sendRecv(buf []byte, conn net.Conn, info *RequestInfo) (int64, error) {
 }
 
 func copyLoop(conn net.Conn, info *RequestInfo) error {
-	buf := make([]byte, maxPayloadLength)
 	var interval time.Duration
 
+	ch := make(chan []byte)
+
+	// Read from the Conn and send byte slices on the channel.
+	go func() {
+		var buf [maxPayloadLength]byte
+		r := bufio.NewReader(conn)
+		for {
+			n, err := r.Read(buf[:])
+			b := make([]byte, n)
+			copy(b, buf[:n])
+			// log.Printf("read from local: %q", b)
+			ch <- b
+			if err != nil {
+				log.Printf("error reading from local: %s", err)
+				break
+			}
+		}
+		close(ch)
+	}()
+
 	interval = initPollInterval
+loop:
 	for {
-		conn.SetReadDeadline(time.Now().Add(interval))
-		// log.Printf("next poll %.6f s", interval.Seconds())
-		nr, readErr := conn.Read(buf)
-		// log.Printf("read from local: %q", buf[:nr])
+		var buf []byte
+		var ok bool
 
-		nw, err := sendRecv(buf[:nr], conn, info)
+		// log.Printf("waiting up to %.2f s", interval.Seconds())
+		// start := time.Now()
+		select {
+		case buf, ok = <-ch:
+			if !ok {
+				break loop
+			}
+			// log.Printf("read %d bytes from local after %.2f s", len(buf), time.Since(start).Seconds())
+		case <-time.After(interval):
+			// log.Printf("read nothing from local after %.2f s", time.Since(start).Seconds())
+			buf = nil
+		}
+
+		nw, err := sendRecv(buf, conn, info)
 		if err != nil {
 			return err
 		}
-		// log.Printf("read from remote: %d", nw)
-
-		if readErr != nil {
-			if e, ok := readErr.(net.Error); !ok || !e.Timeout() {
-				return readErr
+		/*
+			if nw > 0 {
+				log.Printf("got %d bytes from remote", nw)
+			} else {
+				log.Printf("got nothing from remote")
 			}
-		}
+		*/
 
 		if nw > 0 {
+			// If we received anything, poll again immediately.
+			interval = 0
+		} else if interval == 0 {
+			// The first time we don't receive anything, wait a
+			// while.
 			interval = initPollInterval
 		} else {
+			// After that, wait a little longer.
 			interval = time.Duration(float64(interval) * pollIntervalMultiplier)
 		}
 		if interval > maxPollInterval {





More information about the tor-commits mailing list