[tor-commits] [pluggable-transports/meek] 07/09: Reqwrite PollingPacketConn in terms of QueuePacketConn.

gitolite role git at cupani.torproject.org
Fri Oct 28 07:13:06 UTC 2022


This is an automated email from the git hooks/post-receive script.

dcf pushed a commit to branch turbotunnel
in repository pluggable-transports/meek.

commit a61ec4e3c8ab2a606f643f3a8159e3515295bcda
Author: David Fifield <david at bamsoftware.com>
AuthorDate: Fri Oct 28 00:22:03 2022 -0600

    Reqwrite PollingPacketConn in terms of QueuePacketConn.
    
    As in Champa. Use one main polling loop, not multiple, but execute each
    poll asynchronously.
---
 meek-client/turbotunnel.go | 242 ++++++++++++++-------------------------------
 1 file changed, 75 insertions(+), 167 deletions(-)

diff --git a/meek-client/turbotunnel.go b/meek-client/turbotunnel.go
index d4274e2..cbb3380 100644
--- a/meek-client/turbotunnel.go
+++ b/meek-client/turbotunnel.go
@@ -10,11 +10,9 @@ package main
 
 import (
 	"bytes"
-	"errors"
+	"context"
 	"io"
 	"net"
-	"sync"
-	"sync/atomic"
 	"time"
 
 	"git.torproject.org/pluggable-transports/meek.git/common/encapsulation"
@@ -22,18 +20,11 @@ import (
 )
 
 const (
-	// The size of receive and send queues.
-	queueSize = 256
-
 	// The size of the largest bundle of packets we will send in a poll.
 	// (Actually it's not quite a maximum, we will quit bundling as soon as
 	// it is exceeded.)
 	maxSendBundleLength = 0x10000
 
-	// How many goroutines stand ready to do a poll when an outgoing packet
-	// needs to be sent.
-	numRequestLoops = 32
-
 	// We must poll the server to see if it has anything to send; there is
 	// no way for the server to push data back to us until we poll it. When
 	// a timer expires, we send a request even if it has an empty body. The
@@ -66,16 +57,13 @@ type PollingPacketConn struct {
 	clientID   turbotunnel.ClientID
 	remoteAddr net.Addr
 	poller     Poller
-	recvQueue  chan []byte
-	sendQueue  chan []byte
-	// Sending on pollChan permits requestLoop to send an empty polling
-	// query. requestLoop also does its own polling according to a time
-	// schedule.
-	pollChan  chan struct{}
-	closeOnce sync.Once
-	closed    chan struct{}
-	// What error to return when the PollingPacketConn is closed.
-	err atomic.Value
+	ctx        context.Context
+	cancel     context.CancelFunc
+	// QueuePacketConn is the direct receiver of ReadFrom and WriteTo calls.
+	// requestLoop, via send, removes messages from the outgoing queue that
+	// were placed there by WriteTo, and inserts messages into the incoming
+	// queue to be returned from ReadFrom.
+	*turbotunnel.QueuePacketConn
 }
 
 // NewPollingPacketConn creates a PollingPacketConn with a random ClientID as
@@ -83,99 +71,31 @@ type PollingPacketConn struct {
 // ReadFrom, and the RemoteAddr method; is is poller that really controls the
 // effective remote address.
 func NewPollingPacketConn(remoteAddr net.Addr, poller Poller) *PollingPacketConn {
+	clientID := turbotunnel.NewClientID()
+	ctx, cancel := context.WithCancel(context.Background())
 	c := &PollingPacketConn{
-		clientID:   turbotunnel.NewClientID(),
-		remoteAddr: remoteAddr,
-		poller:     poller,
-		recvQueue:  make(chan []byte, queueSize),
-		sendQueue:  make(chan []byte, queueSize),
-		pollChan:   make(chan struct{}),
-		closed:     make(chan struct{}),
-	}
-	for i := 0; i < numRequestLoops; i++ {
-		go c.requestLoop()
+		clientID:        clientID,
+		remoteAddr:      remoteAddr,
+		poller:          poller,
+		ctx:             ctx,
+		cancel:          cancel,
+		QueuePacketConn: turbotunnel.NewQueuePacketConn(clientID, 0),
 	}
+	go c.requestLoop()
 	return c
 }
 
-var errClosedPacketConn = errors.New("operation on closed connection")
-var errNotImplemented = errors.New("not implemented")
-
-// ReadFrom returns a packet received from a previous poll, blocking until there
-// is a packet to return. Unless the returned error is non-nil, the returned
-// net.Addr is always c.RemoteAddr(),
-func (c *PollingPacketConn) ReadFrom(p []byte) (int, net.Addr, error) {
-	select {
-	case <-c.closed:
-		return 0, nil, &net.OpError{Op: "read", Net: c.RemoteAddr().Network(), Source: c.LocalAddr(), Addr: c.RemoteAddr(), Err: c.err.Load().(error)}
-	default:
-	}
-	select {
-	case <-c.closed:
-		return 0, nil, &net.OpError{Op: "read", Net: c.RemoteAddr().Network(), Source: c.LocalAddr(), Addr: c.RemoteAddr(), Err: c.err.Load().(error)}
-	case buf := <-c.recvQueue:
-		return copy(p, buf), c.RemoteAddr(), nil
-	}
-}
-
-// WriteTo queues a packet to be sent (possibly batched) by the underlying
-// poller.
-func (c *PollingPacketConn) WriteTo(p []byte, addr net.Addr) (int, error) {
-	// The addr argument is ignored.
-	select {
-	case <-c.closed:
-		return 0, &net.OpError{Op: "write", Net: c.RemoteAddr().Network(), Source: c.LocalAddr(), Addr: c.RemoteAddr(), Err: c.err.Load().(error)}
-	default:
-	}
-	// Copy the slice so that the caller may reuse it.
-	buf := make([]byte, len(p))
-	copy(buf, p)
-	select {
-	case c.sendQueue <- buf:
-		return len(buf), nil
-	default:
-		// Drop the outgoing packet if the send queue is full.
-		return len(buf), nil
-	}
-}
-
-// closeWithError unblocks pending operations and makes future operations fail
-// with the given error. If err is nil, it becomes errClosedPacketConn.
-func (c *PollingPacketConn) closeWithError(err error) error {
-	var newlyClosed bool
-	c.closeOnce.Do(func() {
-		newlyClosed = true
-		// Store the error to be returned by future PacketConn
-		// operations.
-		if err == nil {
-			err = errClosedPacketConn
-		}
-		c.err.Store(err)
-		close(c.closed)
-	})
-	if !newlyClosed {
-		return &net.OpError{Op: "close", Net: c.LocalAddr().Network(), Addr: c.LocalAddr(), Err: c.err.Load().(error)}
-	}
-	return nil
-}
-
-// Close unblocks pending operations and makes future operations fail with a
-// "closed connection" error.
+// Close cancels any in-progress polls and closes the underlying
+// QueuePacketConn.
 func (c *PollingPacketConn) Close() error {
-	return c.closeWithError(nil)
+	c.cancel()
+	return c.QueuePacketConn.Close()
 }
 
-// LocalAddr returns this connection's random Client ID.
-func (c *PollingPacketConn) LocalAddr() net.Addr { return c.clientID }
-
 // RemoteAddr returns the remoteAddr value that was passed to
 // NewPollingPacketConn.
 func (c *PollingPacketConn) RemoteAddr() net.Addr { return c.remoteAddr }
 
-func (c *PollingPacketConn) SetDeadline(t time.Time) error      { return errNotImplemented }
-func (c *PollingPacketConn) SetReadDeadline(t time.Time) error  { return errNotImplemented }
-func (c *PollingPacketConn) SetWriteDeadline(t time.Time) error { return errNotImplemented }
-
 func (c *PollingPacketConn) requestLoop() {
 	pollDelay := initPollDelay
 	pollTimer := time.NewTimer(pollDelay)
@@ -184,52 +104,31 @@ func (c *PollingPacketConn) requestLoop() {
 		body.Write(c.clientID[:])
 
 		var p []byte
+		unstash := c.QueuePacketConn.Unstash(c.remoteAddr)
+		outgoing := c.QueuePacketConn.OutgoingQueue(c.remoteAddr)
 		pollTimerExpired := false
-		// Block, waiting for at least one packet.
+		// Block, waiting for one packet or a demand to poll. Prioritize
+		// taking a packet from the stash, then taking one from the
+		// outgoing queue, then finally also consider polls.
 		select {
-		case <-c.closed:
+		case <-c.ctx.Done():
 			return
-		case p = <-c.sendQueue:
+		case p = <-unstash:
 		default:
 			select {
-			case <-c.closed:
+			case <-c.ctx.Done():
 				return
-			case p = <-c.sendQueue:
-			case <-c.pollChan:
-				p = nil
-			case <-pollTimer.C:
-				p = nil
-				pollTimerExpired = true
-			}
-		}
-
-		if len(p) > 0 {
-			// Encapsulate the packet into the request body.
-			encapsulation.WriteData(&body, p)
-
-			// A data-carrying request displaces one pending poll
-			// opportunity, if any.
-			select {
-			case <-c.pollChan:
-			default:
-			}
-		}
-
-		// Send any other packets that are immediately available, up to
-		// maxSendBundleLength.
-	loop:
-		// TODO: It would be better if maxSendBundleLength were a true
-		// maximum (we don't remove a packet from c.sendQueue unless it
-		// fits in the remaining length). That would also allow for
-		// arbitrary shaping, along with encapsulation.WritePadding.
-		for body.Len() < maxSendBundleLength {
-			select {
-			case <-c.closed:
-				return
-			case p := <-c.sendQueue:
-				encapsulation.WriteData(&body, p)
+			case p = <-unstash:
+			case p = <-outgoing:
 			default:
-				break loop
+				select {
+				case <-c.ctx.Done():
+					return
+				case p = <-unstash:
+				case p = <-outgoing:
+				case <-pollTimer.C:
+					pollTimerExpired = true
+				}
 			}
 		}
 
@@ -241,8 +140,7 @@ func (c *PollingPacketConn) requestLoop() {
 				pollDelay = maxPollDelay
 			}
 		} else {
-			// We're sending an actual data packet, or we're polling
-			// in response to a received packet. Reset the poll
+			// We're sending an actual data packet. Reset the poll
 			// delay to initial.
 			if !pollTimer.Stop() {
 				<-pollTimer.C
@@ -251,36 +149,46 @@ func (c *PollingPacketConn) requestLoop() {
 		}
 		pollTimer.Reset(pollDelay)
 
-		resp, err := c.poller.Poll(&body)
-		if err != nil {
-			c.closeWithError(err)
-			return
-		}
-		defer resp.Close()
+		// Grab as many more packets as are immediately available and
+		// fit in maxSendBundleLength. Always include the first packet,
+		// even if it doesn't fit.
+		first := true
+		for len(p) > 0 && (first || body.Len()+len(p) <= maxSendBundleLength) {
+			first = false
+
+			// Encapsulate the packet into the request body.
+			encapsulation.WriteData(&body, p)
 
-		queuedPoll := false
-		for {
-			p, err := encapsulation.ReadData(resp)
-			if err == io.EOF {
-				break
-			} else if err != nil {
-				c.closeWithError(err)
-				break
-			}
 			select {
-			case c.recvQueue <- p:
+			case p = <-outgoing:
 			default:
-				// Drop incoming packets when queue is full.
+				p = nil
+			}
+		}
+		if len(p) > 0 {
+			// We read an actual packet, but it didn't fit under the
+			// limit. Stash it so that it will be first in line for
+			// the next poll.
+			c.QueuePacketConn.Stash(p, c.remoteAddr)
+		}
+
+		go func() {
+			resp, err := c.poller.Poll(&body)
+			if err != nil {
+				c.Close()
+				return
 			}
-			if !queuedPoll {
-				queuedPoll = true
-				for i := 0; i < 2; i++ {
-					select {
-					case c.pollChan <- struct{}{}:
-					default:
-					}
+			defer resp.Close()
+			for {
+				p, err := encapsulation.ReadData(resp)
+				if err == io.EOF {
+					break
+				} else if err != nil {
+					c.Close()
+					return
 				}
+				c.QueuePacketConn.QueueIncoming(p, c.remoteAddr)
 			}
-		}
+		}()
 	}
 }

-- 
To stop receiving notification emails like this one, please contact
the administrator of this repository.


More information about the tor-commits mailing list