This is an automated email from the git hooks/post-receive script.
dcf pushed a commit to branch turbotunnel in repository pluggable-transports/meek.
commit a61ec4e3c8ab2a606f643f3a8159e3515295bcda Author: David Fifield david@bamsoftware.com AuthorDate: Fri Oct 28 00:22:03 2022 -0600
Rewrite PollingPacketConn in terms of QueuePacketConn.
As in Champa. Use one main polling loop, not multiple, but execute each poll asynchronously. --- meek-client/turbotunnel.go | 242 ++++++++++++++------------------------------- 1 file changed, 75 insertions(+), 167 deletions(-)
diff --git a/meek-client/turbotunnel.go b/meek-client/turbotunnel.go index d4274e2..cbb3380 100644 --- a/meek-client/turbotunnel.go +++ b/meek-client/turbotunnel.go @@ -10,11 +10,9 @@ package main
import ( "bytes" - "errors" + "context" "io" "net" - "sync" - "sync/atomic" "time"
"git.torproject.org/pluggable-transports/meek.git/common/encapsulation" @@ -22,18 +20,11 @@ import ( )
const ( - // The size of receive and send queues. - queueSize = 256 - // The size of the largest bundle of packets we will send in a poll. // (Actually it's not quite a maximum, we will quit bundling as soon as // it is exceeded.) maxSendBundleLength = 0x10000
- // How many goroutines stand ready to do a poll when an outgoing packet - // needs to be sent. - numRequestLoops = 32 - // We must poll the server to see if it has anything to send; there is // no way for the server to push data back to us until we poll it. When // a timer expires, we send a request even if it has an empty body. The @@ -66,16 +57,13 @@ type PollingPacketConn struct { clientID turbotunnel.ClientID remoteAddr net.Addr poller Poller - recvQueue chan []byte - sendQueue chan []byte - // Sending on pollChan permits requestLoop to send an empty polling - // query. requestLoop also does its own polling according to a time - // schedule. - pollChan chan struct{} - closeOnce sync.Once - closed chan struct{} - // What error to return when the PollingPacketConn is closed. - err atomic.Value + ctx context.Context + cancel context.CancelFunc + // QueuePacketConn is the direct receiver of ReadFrom and WriteTo calls. + // requestLoop, via send, removes messages from the outgoing queue that + // were placed there by WriteTo, and inserts messages into the incoming + // queue to be returned from ReadFrom. + *turbotunnel.QueuePacketConn }
// NewPollingPacketConn creates a PollingPacketConn with a random ClientID as @@ -83,99 +71,31 @@ type PollingPacketConn struct { // ReadFrom, and the RemoteAddr method; is is poller that really controls the // effective remote address. func NewPollingPacketConn(remoteAddr net.Addr, poller Poller) *PollingPacketConn { + clientID := turbotunnel.NewClientID() + ctx, cancel := context.WithCancel(context.Background()) c := &PollingPacketConn{ - clientID: turbotunnel.NewClientID(), - remoteAddr: remoteAddr, - poller: poller, - recvQueue: make(chan []byte, queueSize), - sendQueue: make(chan []byte, queueSize), - pollChan: make(chan struct{}), - closed: make(chan struct{}), - } - for i := 0; i < numRequestLoops; i++ { - go c.requestLoop() + clientID: clientID, + remoteAddr: remoteAddr, + poller: poller, + ctx: ctx, + cancel: cancel, + QueuePacketConn: turbotunnel.NewQueuePacketConn(clientID, 0), } + go c.requestLoop() return c }
-var errClosedPacketConn = errors.New("operation on closed connection") -var errNotImplemented = errors.New("not implemented") - -// ReadFrom returns a packet received from a previous poll, blocking until there -// is a packet to return. Unless the returned error is non-nil, the returned -// net.Addr is always c.RemoteAddr(), -func (c *PollingPacketConn) ReadFrom(p []byte) (int, net.Addr, error) { - select { - case <-c.closed: - return 0, nil, &net.OpError{Op: "read", Net: c.RemoteAddr().Network(), Source: c.LocalAddr(), Addr: c.RemoteAddr(), Err: c.err.Load().(error)} - default: - } - select { - case <-c.closed: - return 0, nil, &net.OpError{Op: "read", Net: c.RemoteAddr().Network(), Source: c.LocalAddr(), Addr: c.RemoteAddr(), Err: c.err.Load().(error)} - case buf := <-c.recvQueue: - return copy(p, buf), c.RemoteAddr(), nil - } -} - -// WriteTo queues a packet to be sent (possibly batched) by the underlying -// poller. -func (c *PollingPacketConn) WriteTo(p []byte, addr net.Addr) (int, error) { - // The addr argument is ignored. - select { - case <-c.closed: - return 0, &net.OpError{Op: "write", Net: c.RemoteAddr().Network(), Source: c.LocalAddr(), Addr: c.RemoteAddr(), Err: c.err.Load().(error)} - default: - } - // Copy the slice so that the caller may reuse it. - buf := make([]byte, len(p)) - copy(buf, p) - select { - case c.sendQueue <- buf: - return len(buf), nil - default: - // Drop the outgoing packet if the send queue is full. - return len(buf), nil - } -} - -// closeWithError unblocks pending operations and makes future operations fail -// with the given error. If err is nil, it becomes errClosedPacketConn. -func (c *PollingPacketConn) closeWithError(err error) error { - var newlyClosed bool - c.closeOnce.Do(func() { - newlyClosed = true - // Store the error to be returned by future PacketConn - // operations. 
- if err == nil { - err = errClosedPacketConn - } - c.err.Store(err) - close(c.closed) - }) - if !newlyClosed { - return &net.OpError{Op: "close", Net: c.LocalAddr().Network(), Addr: c.LocalAddr(), Err: c.err.Load().(error)} - } - return nil -} - -// Close unblocks pending operations and makes future operations fail with a -// "closed connection" error. +// Close cancels any in-progress polls and closes the underlying +// QueuePacketConn. func (c *PollingPacketConn) Close() error { - return c.closeWithError(nil) + c.cancel() + return c.QueuePacketConn.Close() }
-// LocalAddr returns this connection's random Client ID. -func (c *PollingPacketConn) LocalAddr() net.Addr { return c.clientID } - // RemoteAddr returns the remoteAddr value that was passed to // NewPollingPacketConn. func (c *PollingPacketConn) RemoteAddr() net.Addr { return c.remoteAddr }
-func (c *PollingPacketConn) SetDeadline(t time.Time) error { return errNotImplemented } -func (c *PollingPacketConn) SetReadDeadline(t time.Time) error { return errNotImplemented } -func (c *PollingPacketConn) SetWriteDeadline(t time.Time) error { return errNotImplemented } - func (c *PollingPacketConn) requestLoop() { pollDelay := initPollDelay pollTimer := time.NewTimer(pollDelay) @@ -184,52 +104,31 @@ func (c *PollingPacketConn) requestLoop() { body.Write(c.clientID[:])
var p []byte + unstash := c.QueuePacketConn.Unstash(c.remoteAddr) + outgoing := c.QueuePacketConn.OutgoingQueue(c.remoteAddr) pollTimerExpired := false - // Block, waiting for at least one packet. + // Block, waiting for one packet or a demand to poll. Prioritize + // taking a packet from the stash, then taking one from the + // outgoing queue, then finally also consider polls. select { - case <-c.closed: + case <-c.ctx.Done(): return - case p = <-c.sendQueue: + case p = <-unstash: default: select { - case <-c.closed: + case <-c.ctx.Done(): return - case p = <-c.sendQueue: - case <-c.pollChan: - p = nil - case <-pollTimer.C: - p = nil - pollTimerExpired = true - } - } - - if len(p) > 0 { - // Encapsulate the packet into the request body. - encapsulation.WriteData(&body, p) - - // A data-carrying request displaces one pending poll - // opportunity, if any. - select { - case <-c.pollChan: - default: - } - } - - // Send any other packets that are immediately available, up to - // maxSendBundleLength. - loop: - // TODO: It would be better if maxSendBundleLength were a true - // maximum (we don't remove a packet from c.sendQueue unless it - // fits in the remaining length). That would also allow for - // arbitrary shaping, along with encapsulation.WritePadding. - for body.Len() < maxSendBundleLength { - select { - case <-c.closed: - return - case p := <-c.sendQueue: - encapsulation.WriteData(&body, p) + case p = <-unstash: + case p = <-outgoing: default: - break loop + select { + case <-c.ctx.Done(): + return + case p = <-unstash: + case p = <-outgoing: + case <-pollTimer.C: + pollTimerExpired = true + } } }
@@ -241,8 +140,7 @@ func (c *PollingPacketConn) requestLoop() { pollDelay = maxPollDelay } } else { - // We're sending an actual data packet, or we're polling - // in response to a received packet. Reset the poll + // We're sending an actual data packet. Reset the poll // delay to initial. if !pollTimer.Stop() { <-pollTimer.C @@ -251,36 +149,46 @@ func (c *PollingPacketConn) requestLoop() { } pollTimer.Reset(pollDelay)
- resp, err := c.poller.Poll(&body) - if err != nil { - c.closeWithError(err) - return - } - defer resp.Close() + // Grab as many more packets as are immediately available and + // fit in maxSendBundleLength. Always include the first packet, + // even if it doesn't fit. + first := true + for len(p) > 0 && (first || body.Len()+len(p) <= maxSendBundleLength) { + first = false + + // Encapsulate the packet into the request body. + encapsulation.WriteData(&body, p)
- queuedPoll := false - for { - p, err := encapsulation.ReadData(resp) - if err == io.EOF { - break - } else if err != nil { - c.closeWithError(err) - break - } select { - case c.recvQueue <- p: + case p = <-outgoing: default: - // Drop incoming packets when queue is full. + p = nil + } + } + if len(p) > 0 { + // We read an actual packet, but it didn't fit under the + // limit. Stash it so that it will be first in line for + // the next poll. + c.QueuePacketConn.Stash(p, c.remoteAddr) + } + + go func() { + resp, err := c.poller.Poll(&body) + if err != nil { + c.Close() + return } - if !queuedPoll { - queuedPoll = true - for i := 0; i < 2; i++ { - select { - case c.pollChan <- struct{}{}: - default: - } + defer resp.Close() + for { + p, err := encapsulation.ReadData(resp) + if err == io.EOF { + break + } else if err != nil { + c.Close() + return } + c.QueuePacketConn.QueueIncoming(p, c.remoteAddr) } - } + }() } }