This is an automated email from the git hooks/post-receive script.
dcf pushed a commit to branch turbotunnel in repository pluggable-transports/meek.
commit 70b33a768d66497e4ed8f02e8e4bba10f2e26ab7 Author: David Fifield <david@bamsoftware.com> AuthorDate: Thu Oct 27 18:33:57 2022 -0600
Import RemoteMap from Champa/dnstt.
Adds support for a one-packet "stash," and closes send queues when expiring. --- common/turbotunnel/client_map.go | 144 --------------------------- common/turbotunnel/queuepacketconn.go | 37 +++++-- common/turbotunnel/remotemap.go | 177 ++++++++++++++++++++++++++++++++++ 3 files changed, 208 insertions(+), 150 deletions(-)
diff --git a/common/turbotunnel/client_map.go b/common/turbotunnel/client_map.go deleted file mode 100644 index 733480a..0000000 --- a/common/turbotunnel/client_map.go +++ /dev/null @@ -1,144 +0,0 @@ -package turbotunnel - -import ( - "container/heap" - "net" - "sync" - "time" -) - -// clientRecord is a record of a recently seen client, with the time it was last -// seen and a send queue. -type clientRecord struct { - Addr net.Addr - LastSeen time.Time - SendQueue chan []byte -} - -// ClientMap manages a mapping of live clients (keyed by address, which will be -// a ClientID) to their respective send queues. ClientMap's functions are safe -// to call from multiple goroutines. -type ClientMap struct { - // We use an inner structure to avoid exposing public heap.Interface - // functions to users of clientMap. - inner clientMapInner - // Synchronizes access to inner. - lock sync.Mutex -} - -// NewClientMap creates a ClientMap that expires clients after a timeout. -// -// The timeout does not have to be kept in sync with smux's internal idle -// timeout. If a client is removed from the client map while the smux session is -// still live, the worst that can happen is a loss of whatever packets were in -// the send queue at the time. If smux later decides to send more packets to the -// same client, we'll instantiate a new send queue, and if the client ever -// connects again with the proper client ID, we'll deliver them. -func NewClientMap(timeout time.Duration) *ClientMap { - m := &ClientMap{ - inner: clientMapInner{ - byAge: make([]*clientRecord, 0), - byAddr: make(map[net.Addr]int), - }, - } - go func() { - for { - time.Sleep(timeout / 2) - now := time.Now() - m.lock.Lock() - m.inner.removeExpired(now, timeout) - m.lock.Unlock() - } - }() - return m -} - -// SendQueue returns the send queue corresponding to addr, creating it if -// necessary. 
-func (m *ClientMap) SendQueue(addr net.Addr) chan []byte { - m.lock.Lock() - defer m.lock.Unlock() - return m.inner.SendQueue(addr, time.Now()) -} - -// clientMapInner is the inner type of ClientMap, implementing heap.Interface. -// byAge is the backing store, a heap ordered by LastSeen time, to facilitate -// expiring old client records. byAddr is a map from addresses (i.e., ClientIDs) -// to heap indices, to allow looking up by address. Unlike ClientMap, -// clientMapInner requires external synchonization. -type clientMapInner struct { - byAge []*clientRecord - byAddr map[net.Addr]int -} - -// removeExpired removes all client records whose LastSeen timestamp is more -// than timeout in the past. -func (inner *clientMapInner) removeExpired(now time.Time, timeout time.Duration) { - for len(inner.byAge) > 0 && now.Sub(inner.byAge[0].LastSeen) >= timeout { - heap.Pop(inner) - } -} - -// SendQueue finds the existing client record corresponding to addr, or creates -// a new one if none exists yet. It updates the client record's LastSeen time -// and returns its SendQueue. -func (inner *clientMapInner) SendQueue(addr net.Addr, now time.Time) chan []byte { - var record *clientRecord - i, ok := inner.byAddr[addr] - if ok { - // Found one, update its LastSeen. - record = inner.byAge[i] - record.LastSeen = now - heap.Fix(inner, i) - } else { - // Not found, create a new one. - record = &clientRecord{ - Addr: addr, - LastSeen: now, - SendQueue: make(chan []byte, queueSize), - } - heap.Push(inner, record) - } - return record.SendQueue -} - -// heap.Interface for clientMapInner. 
- -func (inner *clientMapInner) Len() int { - if len(inner.byAge) != len(inner.byAddr) { - panic("inconsistent clientMap") - } - return len(inner.byAge) -} - -func (inner *clientMapInner) Less(i, j int) bool { - return inner.byAge[i].LastSeen.Before(inner.byAge[j].LastSeen) -} - -func (inner *clientMapInner) Swap(i, j int) { - inner.byAge[i], inner.byAge[j] = inner.byAge[j], inner.byAge[i] - inner.byAddr[inner.byAge[i].Addr] = i - inner.byAddr[inner.byAge[j].Addr] = j -} - -func (inner *clientMapInner) Push(x interface{}) { - record := x.(*clientRecord) - if _, ok := inner.byAddr[record.Addr]; ok { - panic("duplicate address in clientMap") - } - // Insert into byAddr map. - inner.byAddr[record.Addr] = len(inner.byAge) - // Insert into byAge slice. - inner.byAge = append(inner.byAge, record) -} - -func (inner *clientMapInner) Pop() interface{} { - n := len(inner.byAddr) - // Remove from byAge slice. - record := inner.byAge[n-1] - inner.byAge[n-1] = nil - inner.byAge = inner.byAge[:n-1] - // Remove from byAddr map. - delete(inner.byAddr, record.Addr) - return record -} diff --git a/common/turbotunnel/queuepacketconn.go b/common/turbotunnel/queuepacketconn.go index acb04c1..eb4df4b 100644 --- a/common/turbotunnel/queuepacketconn.go +++ b/common/turbotunnel/queuepacketconn.go @@ -21,8 +21,19 @@ type taggedPacket struct { // method inserts a packet into the incoming queue, to eventually be returned by // ReadFrom. WriteTo inserts a packet into an address-specific outgoing queue, // which can later by accessed through the OutgoingQueue method. +// +// Besides the outgoing queues, there is also a one-element "stash" for each +// remote peer address. You can stash a packet using the Stash method, and get +// it back later by receiving from the channel returned by Unstash. 
The stash is +// meant as a convenient place to temporarily store a single packet, such as +// when you've read one too many packets from the send queue and need to store +// the extra packet to be processed first in the next pass. It's the caller's +// responsibility to Unstash what they have Stashed. Calling Stash does not put +// the packet at the head of the send queue; if there is the possibility that a +// packet has been stashed, it must be checked for by calling Unstash in +// addition to OutgoingQueue. type QueuePacketConn struct { - clients *ClientMap + remotes *RemoteMap localAddr net.Addr recvQueue chan taggedPacket closeOnce sync.Once @@ -31,11 +42,11 @@ type QueuePacketConn struct { err atomic.Value }
-// NewQueuePacketConn makes a new QueuePacketConn, set to track recent peers for -// at least a duration of timeout. +// NewQueuePacketConn makes a new QueuePacketConn, set to track recent peers +// for at least a duration of timeout. func NewQueuePacketConn(localAddr net.Addr, timeout time.Duration) *QueuePacketConn { return &QueuePacketConn{ - clients: NewClientMap(timeout), + remotes: NewRemoteMap(timeout), localAddr: localAddr, recvQueue: make(chan taggedPacket, queueSize), closed: make(chan struct{}), @@ -65,7 +76,21 @@ func (c *QueuePacketConn) QueueIncoming(p []byte, addr net.Addr) { // creating it if necessary. The contents of the queue will be packets that are // written to the address in question using WriteTo. func (c *QueuePacketConn) OutgoingQueue(addr net.Addr) <-chan []byte { - return c.clients.SendQueue(addr) + return c.remotes.SendQueue(addr) +} + +// Stash places p in the stash for addr, if the stash is not already occupied. +// Returns true if the packet was placed in the stash, or false if the stash was +// already occupied. This method is similar to WriteTo, except that it puts the +// packet in the stash queue (accessible via Unstash), rather than the outgoing +// queue (accessible via OutgoingQueue). +func (c *QueuePacketConn) Stash(p []byte, addr net.Addr) bool { + return c.remotes.Stash(addr, p) +} + +// Unstash returns the channel that represents the stash for addr. +func (c *QueuePacketConn) Unstash(addr net.Addr) <-chan []byte { + return c.remotes.Unstash(addr) }
// ReadFrom returns a packet and address previously stored by QueueIncoming. @@ -95,7 +120,7 @@ func (c *QueuePacketConn) WriteTo(p []byte, addr net.Addr) (int, error) { buf := make([]byte, len(p)) copy(buf, p) select { - case c.clients.SendQueue(addr) <- buf: + case c.remotes.SendQueue(addr) <- buf: return len(buf), nil default: // Drop the outgoing packet if the send queue is full. diff --git a/common/turbotunnel/remotemap.go b/common/turbotunnel/remotemap.go new file mode 100644 index 0000000..c679bfa --- /dev/null +++ b/common/turbotunnel/remotemap.go @@ -0,0 +1,177 @@ +package turbotunnel + +import ( + "container/heap" + "net" + "sync" + "time" +) + +// remoteRecord is a record of a recently seen remote peer, with the time it was +// last seen and queues of outgoing packets. +type remoteRecord struct { + Addr net.Addr + LastSeen time.Time + SendQueue chan []byte + Stash chan []byte +} + +// RemoteMap manages a mapping of live remote peers, keyed by address, to their +// respective send queues. Each peer has two queues: a primary send queue, and a +// "stash". The primary send queue is returned by the SendQueue method. The +// stash is an auxiliary one-element queue accessed using the Stash and Unstash +// methods. The stash is meant for use by callers that need to "unread" a packet +// that's already been removed from the primary send queue. +// +// RemoteMap's functions are safe to call from multiple goroutines. +type RemoteMap struct { + // We use an inner structure to avoid exposing public heap.Interface + // functions to users of remoteMap. + inner remoteMapInner + // Synchronizes access to inner. + lock sync.Mutex +} + +// NewRemoteMap creates a RemoteMap that expires peers after a timeout. +// +// If the timeout is 0, peers never expire. +// +// The timeout does not have to be kept in sync with smux's idle timeout. 
If a +// peer is removed from the map while the smux session is still live, the worst +// that can happen is a loss of whatever packets were in the send queue at the +// time. If smux later decides to send more packets to the same peer, we'll +// instantiate a new send queue, and if the peer is ever seen again with a +// matching address, we'll deliver them. +func NewRemoteMap(timeout time.Duration) *RemoteMap { + m := &RemoteMap{ + inner: remoteMapInner{ + byAge: make([]*remoteRecord, 0), + byAddr: make(map[net.Addr]int), + }, + } + if timeout > 0 { + go func() { + for { + time.Sleep(timeout / 2) + now := time.Now() + m.lock.Lock() + m.inner.removeExpired(now, timeout) + m.lock.Unlock() + } + }() + } + return m +} + +// SendQueue returns the send queue corresponding to addr, creating it if +// necessary. +func (m *RemoteMap) SendQueue(addr net.Addr) chan []byte { + m.lock.Lock() + defer m.lock.Unlock() + return m.inner.Lookup(addr, time.Now()).SendQueue +} + +// Stash places p in the stash corresponding to addr, if the stash is not +// already occupied. Returns true if the p was placed in the stash, false +// otherwise. +func (m *RemoteMap) Stash(addr net.Addr, p []byte) bool { + m.lock.Lock() + defer m.lock.Unlock() + select { + case m.inner.Lookup(addr, time.Now()).Stash <- p: + return true + default: + return false + } +} + +// Unstash returns the channel that reads from the stash for addr. +func (m *RemoteMap) Unstash(addr net.Addr) <-chan []byte { + m.lock.Lock() + defer m.lock.Unlock() + return m.inner.Lookup(addr, time.Now()).Stash +} + +// remoteMapInner is the inner type of RemoteMap, implementing heap.Interface. +// byAge is the backing store, a heap ordered by LastSeen time, to facilitate +// expiring old records. byAddr is a map from addresses to heap indices, to +// allow looking up by address. Unlike RemoteMap, remoteMapInner requires +// external synchonization. 
+type remoteMapInner struct { + byAge []*remoteRecord + byAddr map[net.Addr]int +} + +// removeExpired removes all records whose LastSeen timestamp is more than +// timeout in the past. +func (inner *remoteMapInner) removeExpired(now time.Time, timeout time.Duration) { + for len(inner.byAge) > 0 && now.Sub(inner.byAge[0].LastSeen) >= timeout { + record := heap.Pop(inner).(*remoteRecord) + close(record.SendQueue) + } +} + +// Lookup finds the existing record corresponding to addr, or creates a new +// one if none exists yet. It updates the record's LastSeen time and returns the +// record. +func (inner *remoteMapInner) Lookup(addr net.Addr, now time.Time) *remoteRecord { + var record *remoteRecord + i, ok := inner.byAddr[addr] + if ok { + // Found one, update its LastSeen. + record = inner.byAge[i] + record.LastSeen = now + heap.Fix(inner, i) + } else { + // Not found, create a new one. + record = &remoteRecord{ + Addr: addr, + LastSeen: now, + SendQueue: make(chan []byte, queueSize), + Stash: make(chan []byte, 1), + } + heap.Push(inner, record) + } + return record +} + +// heap.Interface for remoteMapInner. + +func (inner *remoteMapInner) Len() int { + if len(inner.byAge) != len(inner.byAddr) { + panic("inconsistent remoteMap") + } + return len(inner.byAge) +} + +func (inner *remoteMapInner) Less(i, j int) bool { + return inner.byAge[i].LastSeen.Before(inner.byAge[j].LastSeen) +} + +func (inner *remoteMapInner) Swap(i, j int) { + inner.byAge[i], inner.byAge[j] = inner.byAge[j], inner.byAge[i] + inner.byAddr[inner.byAge[i].Addr] = i + inner.byAddr[inner.byAge[j].Addr] = j +} + +func (inner *remoteMapInner) Push(x interface{}) { + record := x.(*remoteRecord) + if _, ok := inner.byAddr[record.Addr]; ok { + panic("duplicate address in remoteMap") + } + // Insert into byAddr map. + inner.byAddr[record.Addr] = len(inner.byAge) + // Insert into byAge slice. 
+ inner.byAge = append(inner.byAge, record) +} + +func (inner *remoteMapInner) Pop() interface{} { + n := len(inner.byAddr) + // Remove from byAge slice. + record := inner.byAge[n-1] + inner.byAge[n-1] = nil + inner.byAge = inner.byAge[:n-1] + // Remove from byAddr map. + delete(inner.byAddr, record.Addr) + return record +}