This is an automated email from the git hooks/post-receive script.
dcf pushed a change to branch turbotunnel in repository pluggable-transports/meek.
from 34cdac0 Port some performance tweaks from Champa. new 829ab9e Set queueSize = 256 in meek-client as well. new 4c4a6fd Remove obsolete comments referring to QUIC. new 80b6d3b Factor turbotunnel into a common package. new 70b33a7 Import RemoteMap from Champa/dnstt. new 0b83ea6 Import server send loop from Champa. new 4c76264 Add TestCloseHaltsRequestLoop. new a61ec4e Reqwrite PollingPacketConn in terms of QueuePacketConn. new 46c988b Halt requestLoop when PollingPacketConn is closed. add bc887de Use Clone to make copies of http.DefaultTransport. add ec78bba Remove d2zfqthxsdq309.cloudfront.net from the example torrc. add 9bd0572 Update utls fingerprints add 1b9883a Do the go.mod thing. add ae32853 Remove TestCopyPublicFieldsHTTPTransport. add d9d57c2 Use a non-empty ServerName in TestProxyHTTPSCONNECT. add 7aa47e7 Update utls to v0.0.0-20210713165636-0b2885c8c0d4. add 339f3d8 Document equalities of "_Auto" utls fingerprints. add ae42cad Fix markup in meek-server man page. add 87f2903 programVersion = "0.36.0" add 17083fc Actually recognize utls=HelloChrome_83 in meek-client. add 53974af programVersion = "0.37.0" add 6600c52 Add missing transport to ServerTransportListenAddr in meek-server man page. add 88fd723 Only lock the assignment to rt.rt, not the whole RoundTrip. add 46e612d Fix the locking around rt.rt. add 2da6dd9 Upgrade utls to v1.1.5. add 16854a7 `go fmt` with go1.19.1. add e9eff72 Move the error check immediately after http.NewRequest. new b93c5a0 Merge branch 'main' into turbotunnel
The 9 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference.
Summary of changes: common/encapsulation/encapsulation.go | 22 +- common/turbotunnel/clientid.go | 28 +++ common/turbotunnel/consts.go | 14 ++ .../turbotunnel/queuepacketconn.go | 79 +++--- common/turbotunnel/remotemap.go | 177 ++++++++++++++ doc/meek-client.1 | 208 +++++++++++++++- doc/meek-client.1.txt | 24 +- doc/meek-server.1 | 24 +- doc/meek-server.1.txt | 6 +- go.mod | 8 +- go.sum | 28 ++- meek-client-torbrowser/linux.go | 2 + meek-client-torbrowser/mac.go | 2 + meek-client-torbrowser/meek-client-torbrowser.go | 4 +- meek-client-torbrowser/terminate_other.go | 1 + meek-client-torbrowser/terminate_windows.go | 1 + meek-client-torbrowser/windows.go | 2 + meek-client/meek-client.go | 28 ++- meek-client/proxy_test.go | 2 +- meek-client/torrc | 1 - meek-client/turbotunnel.go | 272 ++++++--------------- meek-client/turbotunnel_test.go | 89 +++++++ meek-client/utls.go | 68 +++--- meek-client/utls_test.go | 45 +--- meek-server/certificate.go | 1 + meek-server/client_map.go | 144 ----------- meek-server/meek-server.go | 96 ++++++-- 27 files changed, 844 insertions(+), 532 deletions(-) create mode 100644 common/turbotunnel/clientid.go create mode 100644 common/turbotunnel/consts.go rename meek-server/turbotunnel.go => common/turbotunnel/queuepacketconn.go (70%) create mode 100644 common/turbotunnel/remotemap.go create mode 100644 meek-client/turbotunnel_test.go delete mode 100644 meek-server/client_map.go
This is an automated email from the git hooks/post-receive script.
dcf pushed a commit to branch turbotunnel in repository pluggable-transports/meek.
commit 829ab9e7b00fb0b223615b9693de1296ae2f4656 Author: David Fifield david@bamsoftware.com AuthorDate: Thu Oct 27 17:43:36 2022 -0600
Set queueSize = 256 in meek-client as well. --- meek-client/turbotunnel.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/meek-client/turbotunnel.go b/meek-client/turbotunnel.go index 17202b4..7e10d01 100644 --- a/meek-client/turbotunnel.go +++ b/meek-client/turbotunnel.go @@ -24,7 +24,7 @@ import (
const ( // The size of receive and send queues. - queueSize = 32 + queueSize = 256
// The size of the largest bundle of packets we will send in a poll. // (Actually it's not quite a maximum, we will quit bundling as soon as
This is an automated email from the git hooks/post-receive script.
dcf pushed a commit to branch turbotunnel in repository pluggable-transports/meek.
commit 4c4a6fd14e5ccaf35486fa798e5c206fee875168 Author: David Fifield david@bamsoftware.com AuthorDate: Thu Oct 27 17:49:52 2022 -0600
Remove obsolete comments referring to QUIC. --- meek-client/turbotunnel.go | 3 --- meek-server/client_map.go | 6 +++--- meek-server/turbotunnel.go | 3 --- 3 files changed, 3 insertions(+), 9 deletions(-)
diff --git a/meek-client/turbotunnel.go b/meek-client/turbotunnel.go index 7e10d01..535c05c 100644 --- a/meek-client/turbotunnel.go +++ b/meek-client/turbotunnel.go @@ -53,9 +53,6 @@ const ( // along with all HTTP requests, and the server uses the ClientID to // disambiguate requests among its many clients. ClientID implements the // net.Addr interface. -// -// ClientID duplicates the functionality of the QUIC connection ID, but quic-go -// does not provide accessors for the connection ID. type ClientID [8]byte
func newClientID() ClientID { diff --git a/meek-server/client_map.go b/meek-server/client_map.go index 33affff..6f2f69a 100644 --- a/meek-server/client_map.go +++ b/meek-server/client_map.go @@ -28,10 +28,10 @@ type ClientMap struct {
// NewClientMap creates a ClientMap that expires clients after a timeout. // -// The timeout does not have to be kept in sync with QUIC's internal idle -// timeout. If a client is removed from the client map while the QUIC session is +// The timeout does not have to be kept in sync with smux's internal idle +// timeout. If a client is removed from the client map while the smux session is // still live, the worst that can happen is a loss of whatever packets were in -// the send queue at the time. If QUIC later decides to send more packets to the +// the send queue at the time. If smux later decides to send more packets to the // same client, we'll instantiate a new send queue, and if the client ever // connects again with the proper client ID, we'll deliver them. func NewClientMap(timeout time.Duration) *ClientMap { diff --git a/meek-server/turbotunnel.go b/meek-server/turbotunnel.go index 277d656..d10b795 100644 --- a/meek-server/turbotunnel.go +++ b/meek-server/turbotunnel.go @@ -25,9 +25,6 @@ const queueSize = 256 // along with all HTTP requests, and the server uses the ClientID to // disambiguate requests among its many clients. ClientID implements the // net.Addr interface. -// -// ClientID duplicates the functionality of the QUIC connection ID, but quic-go -// does not provide accessors for the connection ID. type ClientID [8]byte
func newClientID() ClientID {
This is an automated email from the git hooks/post-receive script.
dcf pushed a commit to branch turbotunnel in repository pluggable-transports/meek.
commit 80b6d3b101108086375fda872c0f7c19d4c0c158 Author: David Fifield david@bamsoftware.com AuthorDate: Thu Oct 27 17:58:47 2022 -0600
Factor turbotunnel into a common package. --- {meek-server => common/turbotunnel}/client_map.go | 2 +- common/turbotunnel/clientid.go | 28 +++++++++++++ common/turbotunnel/consts.go | 14 +++++++ .../turbotunnel/queuepacketconn.go | 47 +++------------------- meek-client/turbotunnel.go | 27 ++----------- meek-server/meek-server.go | 7 ++-- 6 files changed, 56 insertions(+), 69 deletions(-)
diff --git a/meek-server/client_map.go b/common/turbotunnel/client_map.go similarity index 99% rename from meek-server/client_map.go rename to common/turbotunnel/client_map.go index 6f2f69a..733480a 100644 --- a/meek-server/client_map.go +++ b/common/turbotunnel/client_map.go @@ -1,4 +1,4 @@ -package main +package turbotunnel
import ( "container/heap" diff --git a/common/turbotunnel/clientid.go b/common/turbotunnel/clientid.go new file mode 100644 index 0000000..17257e1 --- /dev/null +++ b/common/turbotunnel/clientid.go @@ -0,0 +1,28 @@ +package turbotunnel + +import ( + "crypto/rand" + "encoding/hex" +) + +// ClientID is an abstract identifier that binds together all the communications +// belonging to a single client session, even though those communications may +// arrive from multiple IP addresses or over multiple lower-level connections. +// It plays the same role that an (IP address, port number) tuple plays in a +// net.UDPConn: it's the return address pertaining to a long-lived abstract +// client session. The client attaches its ClientID to each of its +// communications, enabling the server to disambiguate requests among its many +// clients. ClientID implements the net.Addr interface. +type ClientID [8]byte + +func NewClientID() ClientID { + var id ClientID + _, err := rand.Read(id[:]) + if err != nil { + panic(err) + } + return id +} + +func (id ClientID) Network() string { return "clientid" } +func (id ClientID) String() string { return hex.EncodeToString(id[:]) } diff --git a/common/turbotunnel/consts.go b/common/turbotunnel/consts.go new file mode 100644 index 0000000..0d280af --- /dev/null +++ b/common/turbotunnel/consts.go @@ -0,0 +1,14 @@ +// The code in this package provides a compatibility layer between an underlying +// packet-based connection and our polling-based domain-fronted HTTP carrier. +// The main interface is QueuePacketConn, which stores packets in buffered +// channels instead of sending/receiving them on some concrete network +// interface. Callers can inspect these channels. +package turbotunnel + +import "errors" + +// The size of receive and send queues. +const queueSize = 256 + +var errClosedPacketConn = errors.New("operation on closed connection") +var errNotImplemented = errors.New("not implemented") diff --git a/meek-server/turbotunnel.go b/common/turbotunnel/queuepacketconn.go similarity index 73% rename from meek-server/turbotunnel.go rename to common/turbotunnel/queuepacketconn.go index d10b795..acb04c1 100644 --- a/meek-server/turbotunnel.go +++ b/common/turbotunnel/queuepacketconn.go @@ -1,44 +1,12 @@ -// The code in this file provides a compatibility layer between an underlying -// packet-based connection and our polling-based domain-fronted HTTP carrier. -// The main interface is QueuePacketConn, which stores packets in buffered -// channels instead of sending/receiving them on some concrete network -// interface. Callers can inspect these channels. - -package main +package turbotunnel
import ( - "crypto/rand" - "encoding/hex" - "errors" "net" "sync" "sync/atomic" "time" )
-// The size of receive and send queues. -const queueSize = 256 - -// ClientID plays the role in QueuePacketConn that an (IP address, port) tuple -// plays in a net.UDPConn. It is a persistent identifier that binds together all -// the HTTP transactions that occur as part of a session. The ClientID is sent -// along with all HTTP requests, and the server uses the ClientID to -// disambiguate requests among its many clients. ClientID implements the -// net.Addr interface. -type ClientID [8]byte - -func newClientID() ClientID { - var id ClientID - _, err := rand.Read(id[:]) - if err != nil { - panic(err) - } - return id -} - -func (id ClientID) Network() string { return "clientid" } -func (id ClientID) String() string { return hex.EncodeToString(id[:]) } - // taggedPacket is a combination of a []byte and a net.Addr, encapsulating the // return type of PacketConn.ReadFrom. type taggedPacket struct { @@ -48,9 +16,9 @@ type taggedPacket struct {
// QueuePacketConn implements net.PacketConn by storing queues of packets. There // is one incoming queue (where packets are additionally tagged by the source -// address of the client that sent them). There are many outgoing queues, one -// for each client address that has been recently seen. The QueueIncoming method -// inserts a packet into the incoming queue, to eventually be returned by +// address of the peer that sent them). There are many outgoing queues, one for +// each remote peer address that has been recently seen. The QueueIncoming +// method inserts a packet into the incoming queue, to eventually be returned by // ReadFrom. WriteTo inserts a packet into an address-specific outgoing queue, // which can later by accessed through the OutgoingQueue method. type QueuePacketConn struct { @@ -63,8 +31,8 @@ type QueuePacketConn struct { err atomic.Value }
-// NewQueuePacketConn makes a new QueuePacketConn, set to track recent clients -// for at least a duration of timeout. +// NewQueuePacketConn makes a new QueuePacketConn, set to track recent peers for +// at least a duration of timeout. func NewQueuePacketConn(localAddr net.Addr, timeout time.Duration) *QueuePacketConn { return &QueuePacketConn{ clients: NewClientMap(timeout), @@ -100,9 +68,6 @@ func (c *QueuePacketConn) OutgoingQueue(addr net.Addr) <-chan []byte { return c.clients.SendQueue(addr) }
-var errClosedPacketConn = errors.New("operation on closed connection") -var errNotImplemented = errors.New("not implemented") - // ReadFrom returns a packet and address previously stored by QueueIncoming. func (c *QueuePacketConn) ReadFrom(p []byte) (int, net.Addr, error) { select { diff --git a/meek-client/turbotunnel.go b/meek-client/turbotunnel.go index 535c05c..d4274e2 100644 --- a/meek-client/turbotunnel.go +++ b/meek-client/turbotunnel.go @@ -10,8 +10,6 @@ package main
import ( "bytes" - "crypto/rand" - "encoding/hex" "errors" "io" "net" @@ -20,6 +18,7 @@ import ( "time"
"git.torproject.org/pluggable-transports/meek.git/common/encapsulation" + "git.torproject.org/pluggable-transports/meek.git/common/turbotunnel" )
const ( @@ -47,26 +46,6 @@ const ( pollDelayMultiplier = 2.0 )
-// ClientID plays the role in PollingPacketConn that an (IP address, port) tuple -// plays in a net.UDPConn. It is a persistent identifier that binds together all -// the HTTP transactions that occur as part of a session. The ClientID is sent -// along with all HTTP requests, and the server uses the ClientID to -// disambiguate requests among its many clients. ClientID implements the -// net.Addr interface. -type ClientID [8]byte - -func newClientID() ClientID { - var id ClientID - _, err := rand.Read(id[:]) - if err != nil { - panic(err) - } - return id -} - -func (id ClientID) Network() string { return "clientid" } -func (id ClientID) String() string { return hex.EncodeToString(id[:]) } - // Poller is an abstract interface over an operation that writes a stream of // bytes and reads a stream of bytes in return, like an HTTP request. type Poller interface { @@ -84,7 +63,7 @@ type Poller interface { // HTTP response body). The Poller can apply HTTP request customizations such as // domain fronting. type PollingPacketConn struct { - clientID ClientID + clientID turbotunnel.ClientID remoteAddr net.Addr poller Poller recvQueue chan []byte @@ -105,7 +84,7 @@ type PollingPacketConn struct { // effective remote address. func NewPollingPacketConn(remoteAddr net.Addr, poller Poller) *PollingPacketConn { c := &PollingPacketConn{ - clientID: newClientID(), + clientID: turbotunnel.NewClientID(), remoteAddr: remoteAddr, poller: poller, recvQueue: make(chan []byte, queueSize), diff --git a/meek-server/meek-server.go b/meek-server/meek-server.go index a8efad2..f971ab8 100644 --- a/meek-server/meek-server.go +++ b/meek-server/meek-server.go @@ -40,6 +40,7 @@ import (
"git.torproject.org/pluggable-transports/goptlib.git" "git.torproject.org/pluggable-transports/meek.git/common/encapsulation" + "git.torproject.org/pluggable-transports/meek.git/common/turbotunnel" "github.com/xtaci/kcp-go/v5" "github.com/xtaci/smux" "golang.org/x/crypto/acme/autocert" @@ -80,11 +81,11 @@ func httpInternalServerError(w http.ResponseWriter) { // listener, so there is just one global state. State also serves as the http // Handler. type State struct { - conn *QueuePacketConn + conn *turbotunnel.QueuePacketConn }
func NewState(localAddr net.Addr) (*State, error) { - pconn := NewQueuePacketConn(localAddr, smuxIdleTimeout) + pconn := turbotunnel.NewQueuePacketConn(localAddr, smuxIdleTimeout)
ln, err := kcp.ServeConn(nil, 0, 0, pconn) if err != nil { @@ -147,7 +148,7 @@ func (state *State) Post(w http.ResponseWriter, req *http.Request) { defer req.Body.Close() body := http.MaxBytesReader(w, req.Body, maxPayloadLength+1)
- var clientID ClientID + var clientID turbotunnel.ClientID _, err := io.ReadFull(body, clientID[:]) if err != nil { // The request body didn't even contain enough bytes for the
This is an automated email from the git hooks/post-receive script.
dcf pushed a commit to branch turbotunnel in repository pluggable-transports/meek.
commit 70b33a768d66497e4ed8f02e8e4bba10f2e26ab7 Author: David Fifield david@bamsoftware.com AuthorDate: Thu Oct 27 18:33:57 2022 -0600
Import RemoteMap from Champa/dnstt.
Adds support for a one-packet "stash," and closes send queues when expiring. --- common/turbotunnel/client_map.go | 144 --------------------------- common/turbotunnel/queuepacketconn.go | 37 +++++-- common/turbotunnel/remotemap.go | 177 ++++++++++++++++++++++++++++++++++ 3 files changed, 208 insertions(+), 150 deletions(-)
diff --git a/common/turbotunnel/client_map.go b/common/turbotunnel/client_map.go deleted file mode 100644 index 733480a..0000000 --- a/common/turbotunnel/client_map.go +++ /dev/null @@ -1,144 +0,0 @@ -package turbotunnel - -import ( - "container/heap" - "net" - "sync" - "time" -) - -// clientRecord is a record of a recently seen client, with the time it was last -// seen and a send queue. -type clientRecord struct { - Addr net.Addr - LastSeen time.Time - SendQueue chan []byte -} - -// ClientMap manages a mapping of live clients (keyed by address, which will be -// a ClientID) to their respective send queues. ClientMap's functions are safe -// to call from multiple goroutines. -type ClientMap struct { - // We use an inner structure to avoid exposing public heap.Interface - // functions to users of clientMap. - inner clientMapInner - // Synchronizes access to inner. - lock sync.Mutex -} - -// NewClientMap creates a ClientMap that expires clients after a timeout. -// -// The timeout does not have to be kept in sync with smux's internal idle -// timeout. If a client is removed from the client map while the smux session is -// still live, the worst that can happen is a loss of whatever packets were in -// the send queue at the time. If smux later decides to send more packets to the -// same client, we'll instantiate a new send queue, and if the client ever -// connects again with the proper client ID, we'll deliver them. -func NewClientMap(timeout time.Duration) *ClientMap { - m := &ClientMap{ - inner: clientMapInner{ - byAge: make([]*clientRecord, 0), - byAddr: make(map[net.Addr]int), - }, - } - go func() { - for { - time.Sleep(timeout / 2) - now := time.Now() - m.lock.Lock() - m.inner.removeExpired(now, timeout) - m.lock.Unlock() - } - }() - return m -} - -// SendQueue returns the send queue corresponding to addr, creating it if -// necessary. -func (m *ClientMap) SendQueue(addr net.Addr) chan []byte { - m.lock.Lock() - defer m.lock.Unlock() - return m.inner.SendQueue(addr, time.Now()) -} - -// clientMapInner is the inner type of ClientMap, implementing heap.Interface. -// byAge is the backing store, a heap ordered by LastSeen time, to facilitate -// expiring old client records. byAddr is a map from addresses (i.e., ClientIDs) -// to heap indices, to allow looking up by address. Unlike ClientMap, -// clientMapInner requires external synchonization. -type clientMapInner struct { - byAge []*clientRecord - byAddr map[net.Addr]int -} - -// removeExpired removes all client records whose LastSeen timestamp is more -// than timeout in the past. -func (inner *clientMapInner) removeExpired(now time.Time, timeout time.Duration) { - for len(inner.byAge) > 0 && now.Sub(inner.byAge[0].LastSeen) >= timeout { - heap.Pop(inner) - } -} - -// SendQueue finds the existing client record corresponding to addr, or creates -// a new one if none exists yet. It updates the client record's LastSeen time -// and returns its SendQueue. -func (inner *clientMapInner) SendQueue(addr net.Addr, now time.Time) chan []byte { - var record *clientRecord - i, ok := inner.byAddr[addr] - if ok { - // Found one, update its LastSeen. - record = inner.byAge[i] - record.LastSeen = now - heap.Fix(inner, i) - } else { - // Not found, create a new one. - record = &clientRecord{ - Addr: addr, - LastSeen: now, - SendQueue: make(chan []byte, queueSize), - } - heap.Push(inner, record) - } - return record.SendQueue -} - -// heap.Interface for clientMapInner. - -func (inner *clientMapInner) Len() int { - if len(inner.byAge) != len(inner.byAddr) { - panic("inconsistent clientMap") - } - return len(inner.byAge) -} - -func (inner *clientMapInner) Less(i, j int) bool { - return inner.byAge[i].LastSeen.Before(inner.byAge[j].LastSeen) -} - -func (inner *clientMapInner) Swap(i, j int) { - inner.byAge[i], inner.byAge[j] = inner.byAge[j], inner.byAge[i] - inner.byAddr[inner.byAge[i].Addr] = i - inner.byAddr[inner.byAge[j].Addr] = j -} - -func (inner *clientMapInner) Push(x interface{}) { - record := x.(*clientRecord) - if _, ok := inner.byAddr[record.Addr]; ok { - panic("duplicate address in clientMap") - } - // Insert into byAddr map. - inner.byAddr[record.Addr] = len(inner.byAge) - // Insert into byAge slice. - inner.byAge = append(inner.byAge, record) -} - -func (inner *clientMapInner) Pop() interface{} { - n := len(inner.byAddr) - // Remove from byAge slice. - record := inner.byAge[n-1] - inner.byAge[n-1] = nil - inner.byAge = inner.byAge[:n-1] - // Remove from byAddr map. - delete(inner.byAddr, record.Addr) - return record -} diff --git a/common/turbotunnel/queuepacketconn.go b/common/turbotunnel/queuepacketconn.go index acb04c1..eb4df4b 100644 --- a/common/turbotunnel/queuepacketconn.go +++ b/common/turbotunnel/queuepacketconn.go @@ -21,8 +21,19 @@ type taggedPacket struct { // method inserts a packet into the incoming queue, to eventually be returned by // ReadFrom. WriteTo inserts a packet into an address-specific outgoing queue, // which can later by accessed through the OutgoingQueue method. +// +// Besides the outgoing queues, there is also a one-element "stash" for each +// remote peer address. You can stash a packet using the Stash method, and get +// it back later by receiving from the channel returned by Unstash. The stash is +// meant as a convenient place to temporarily store a single packet, such as +// when you've read one too many packets from the send queue and need to store +// the extra packet to be processed first in the next pass. It's the caller's +// responsibility to Unstash what they have Stashed. Calling Stash does not put +// the packet at the head of the send queue; if there is the possibility that a +// packet has been stashed, it must be checked for by calling Unstash in +// addition to OutgoingQueue. type QueuePacketConn struct { - clients *ClientMap + remotes *RemoteMap localAddr net.Addr recvQueue chan taggedPacket closeOnce sync.Once @@ -31,11 +42,11 @@ type QueuePacketConn struct { err atomic.Value }
-// NewQueuePacketConn makes a new QueuePacketConn, set to track recent peers for -// at least a duration of timeout. +// NewQueuePacketConn makes a new QueuePacketConn, set to track recent peers +// for at least a duration of timeout. func NewQueuePacketConn(localAddr net.Addr, timeout time.Duration) *QueuePacketConn { return &QueuePacketConn{ - clients: NewClientMap(timeout), + remotes: NewRemoteMap(timeout), localAddr: localAddr, recvQueue: make(chan taggedPacket, queueSize), closed: make(chan struct{}), @@ -65,7 +76,21 @@ func (c *QueuePacketConn) QueueIncoming(p []byte, addr net.Addr) { // creating it if necessary. The contents of the queue will be packets that are // written to the address in question using WriteTo. func (c *QueuePacketConn) OutgoingQueue(addr net.Addr) <-chan []byte { - return c.clients.SendQueue(addr) + return c.remotes.SendQueue(addr) +} + +// Stash places p in the stash for addr, if the stash is not already occupied. +// Returns true if the packet was placed in the stash, or false if the stash was +// already occupied. This method is similar to WriteTo, except that it puts the +// packet in the stash queue (accessible via Unstash), rather than the outgoing +// queue (accessible via OutgoingQueue). +func (c *QueuePacketConn) Stash(p []byte, addr net.Addr) bool { + return c.remotes.Stash(addr, p) +} + +// Unstash returns the channel that represents the stash for addr. +func (c *QueuePacketConn) Unstash(addr net.Addr) <-chan []byte { + return c.remotes.Unstash(addr) }
// ReadFrom returns a packet and address previously stored by QueueIncoming. @@ -95,7 +120,7 @@ func (c *QueuePacketConn) WriteTo(p []byte, addr net.Addr) (int, error) { buf := make([]byte, len(p)) copy(buf, p) select { - case c.clients.SendQueue(addr) <- buf: + case c.remotes.SendQueue(addr) <- buf: return len(buf), nil default: // Drop the outgoing packet if the send queue is full. diff --git a/common/turbotunnel/remotemap.go b/common/turbotunnel/remotemap.go new file mode 100644 index 0000000..c679bfa --- /dev/null +++ b/common/turbotunnel/remotemap.go @@ -0,0 +1,177 @@ +package turbotunnel + +import ( + "container/heap" + "net" + "sync" + "time" +) + +// remoteRecord is a record of a recently seen remote peer, with the time it was +// last seen and queues of outgoing packets. +type remoteRecord struct { + Addr net.Addr + LastSeen time.Time + SendQueue chan []byte + Stash chan []byte +} + +// RemoteMap manages a mapping of live remote peers, keyed by address, to their +// respective send queues. Each peer has two queues: a primary send queue, and a +// "stash". The primary send queue is returned by the SendQueue method. The +// stash is an auxiliary one-element queue accessed using the Stash and Unstash +// methods. The stash is meant for use by callers that need to "unread" a packet +// that's already been removed from the primary send queue. +// +// RemoteMap's functions are safe to call from multiple goroutines. +type RemoteMap struct { + // We use an inner structure to avoid exposing public heap.Interface + // functions to users of remoteMap. + inner remoteMapInner + // Synchronizes access to inner. + lock sync.Mutex +} + +// NewRemoteMap creates a RemoteMap that expires peers after a timeout. +// +// If the timeout is 0, peers never expire. +// +// The timeout does not have to be kept in sync with smux's idle timeout. If a +// peer is removed from the map while the smux session is still live, the worst +// that can happen is a loss of whatever packets were in the send queue at the +// time. If smux later decides to send more packets to the same peer, we'll +// instantiate a new send queue, and if the peer is ever seen again with a +// matching address, we'll deliver them. +func NewRemoteMap(timeout time.Duration) *RemoteMap { + m := &RemoteMap{ + inner: remoteMapInner{ + byAge: make([]*remoteRecord, 0), + byAddr: make(map[net.Addr]int), + }, + } + if timeout > 0 { + go func() { + for { + time.Sleep(timeout / 2) + now := time.Now() + m.lock.Lock() + m.inner.removeExpired(now, timeout) + m.lock.Unlock() + } + }() + } + return m +} + +// SendQueue returns the send queue corresponding to addr, creating it if +// necessary. +func (m *RemoteMap) SendQueue(addr net.Addr) chan []byte { + m.lock.Lock() + defer m.lock.Unlock() + return m.inner.Lookup(addr, time.Now()).SendQueue +} + +// Stash places p in the stash corresponding to addr, if the stash is not +// already occupied. Returns true if the p was placed in the stash, false +// otherwise. +func (m *RemoteMap) Stash(addr net.Addr, p []byte) bool { + m.lock.Lock() + defer m.lock.Unlock() + select { + case m.inner.Lookup(addr, time.Now()).Stash <- p: + return true + default: + return false + } +} + +// Unstash returns the channel that reads from the stash for addr. +func (m *RemoteMap) Unstash(addr net.Addr) <-chan []byte { + m.lock.Lock() + defer m.lock.Unlock() + return m.inner.Lookup(addr, time.Now()).Stash +} + +// remoteMapInner is the inner type of RemoteMap, implementing heap.Interface. +// byAge is the backing store, a heap ordered by LastSeen time, to facilitate +// expiring old records. byAddr is a map from addresses to heap indices, to +// allow looking up by address. Unlike RemoteMap, remoteMapInner requires +// external synchonization. +type remoteMapInner struct { + byAge []*remoteRecord + byAddr map[net.Addr]int +} + +// removeExpired removes all records whose LastSeen timestamp is more than +// timeout in the past. +func (inner *remoteMapInner) removeExpired(now time.Time, timeout time.Duration) { + for len(inner.byAge) > 0 && now.Sub(inner.byAge[0].LastSeen) >= timeout { + record := heap.Pop(inner).(*remoteRecord) + close(record.SendQueue) + } +} + +// Lookup finds the existing record corresponding to addr, or creates a new +// one if none exists yet. It updates the record's LastSeen time and returns the +// record. +func (inner *remoteMapInner) Lookup(addr net.Addr, now time.Time) *remoteRecord { + var record *remoteRecord + i, ok := inner.byAddr[addr] + if ok { + // Found one, update its LastSeen. + record = inner.byAge[i] + record.LastSeen = now + heap.Fix(inner, i) + } else { + // Not found, create a new one. + record = &remoteRecord{ + Addr: addr, + LastSeen: now, + SendQueue: make(chan []byte, queueSize), + Stash: make(chan []byte, 1), + } + heap.Push(inner, record) + } + return record +} + +// heap.Interface for remoteMapInner. + +func (inner *remoteMapInner) Len() int { + if len(inner.byAge) != len(inner.byAddr) { + panic("inconsistent remoteMap") + } + return len(inner.byAge) +} + +func (inner *remoteMapInner) Less(i, j int) bool { + return inner.byAge[i].LastSeen.Before(inner.byAge[j].LastSeen) +} + +func (inner *remoteMapInner) Swap(i, j int) { + inner.byAge[i], inner.byAge[j] = inner.byAge[j], inner.byAge[i] + inner.byAddr[inner.byAge[i].Addr] = i + inner.byAddr[inner.byAge[j].Addr] = j +} + +func (inner *remoteMapInner) Push(x interface{}) { + record := x.(*remoteRecord) + if _, ok := inner.byAddr[record.Addr]; ok { + panic("duplicate address in remoteMap") + } + // Insert into byAddr map. + inner.byAddr[record.Addr] = len(inner.byAge) + // Insert into byAge slice. + inner.byAge = append(inner.byAge, record) +} + +func (inner *remoteMapInner) Pop() interface{} { + n := len(inner.byAddr) + // Remove from byAge slice. + record := inner.byAge[n-1] + inner.byAge[n-1] = nil + inner.byAge = inner.byAge[:n-1] + // Remove from byAddr map. + delete(inner.byAddr, record.Addr) + return record +}
This is an automated email from the git hooks/post-receive script.
dcf pushed a commit to branch turbotunnel in repository pluggable-transports/meek.
commit 0b83ea61d699eaed82af0b6e1c1a8a70d92f44fe Author: David Fifield david@bamsoftware.com AuthorDate: Thu Oct 27 18:54:21 2022 -0600
Import server send loop from Champa.
Enforces maxPayloadLength (by stashing packets that exceed the limit) and should have lower latency because it does not wait for the maximum delay every time. --- meek-server/meek-server.go | 70 ++++++++++++++++++++++++++++++++++++---------- 1 file changed, 55 insertions(+), 15 deletions(-)
diff --git a/meek-server/meek-server.go b/meek-server/meek-server.go index f971ab8..f715107 100644 --- a/meek-server/meek-server.go +++ b/meek-server/meek-server.go @@ -55,7 +55,7 @@ const ( // chunk of data we'll send back in a response. maxPayloadLength = 0x10000 // How long we try to read from the OR port before closing a response. - turnaroundTimeout = 10 * time.Millisecond + turnaroundTimeout = 100 * time.Millisecond // Passed as ReadTimeout and WriteTimeout when constructing the // http.Server. readWriteTimeout = 20 * time.Second @@ -169,26 +169,66 @@ func (state *State) Post(w http.ResponseWriter, req *http.Request) { state.conn.QueueIncoming(p, clientID) }
- // Write outgoing packets, if any, up to turnaroundTimeout. w.Header().Set("Content-Type", "application/octet-stream") - outgoing := state.conn.OutgoingQueue(clientID) + // Write outgoing packets, if any. We wait up to turnaroundTimeout for + // the first available packet; after that we only include whatever + // packets are immediately available. + limit := maxPayloadLength timer := time.NewTimer(turnaroundTimeout) -loop: + defer timer.Stop() + first := true for { + var p []byte + unstash := state.conn.Unstash(clientID) + outgoing := state.conn.OutgoingQueue(clientID) + // Prioritize taking a packet first from the stash, then from + // the outgoing queue, then finally check for expiration of the + // timer. (We continue to bundle packets even after the timer + // expires, as long as the packets are immediately available.) select { - case <-timer.C: - break loop - case p := <-outgoing: - _, err := encapsulation.WriteData(w, p) - if err != nil { - break loop - } - // Flush after each chunk, this is important for - // latency. - if w, ok := w.(http.Flusher); ok { - w.Flush() + case p = <-unstash: + default: + select { + case p = <-unstash: + case p = <-outgoing: + default: + select { + case p = <-unstash: + case p = <-outgoing: + case <-timer.C: + } } } + // We wait for the first packet only. Later packets must be + // immediately available. + timer.Reset(0) + + if len(p) == 0 { + // Timer expired, we are done bundling packets into this + // response. + break + } + + limit -= len(p) + if !first && limit < 0 { + // This packet doesn't fit in the payload size limit. + // Stash it so that it will be first in line for the + // next response. + state.conn.Stash(p, clientID) + break + } + first = false + + // Write the packet to the HTTP response. + _, err := encapsulation.WriteData(w, p) + if err != nil { + log.Printf("encapsulation.WriteData: %v", err) + break + } + // Flush after each chunk, this is important for latency. + if rw, ok := w.(http.Flusher); ok { + rw.Flush() + } } }
This is an automated email from the git hooks/post-receive script.
dcf pushed a commit to branch turbotunnel in repository pluggable-transports/meek.
commit 4c76264472b5071b48dfa44c8273eb13fddbaedb Author: David Fifield david@bamsoftware.com AuthorDate: Thu Oct 27 23:50:19 2022 -0600
Add TestCloseHaltsRequestLoop. --- meek-client/turbotunnel_test.go | 57 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 57 insertions(+)
diff --git a/meek-client/turbotunnel_test.go b/meek-client/turbotunnel_test.go new file mode 100644 index 0000000..11640ec --- /dev/null +++ b/meek-client/turbotunnel_test.go @@ -0,0 +1,57 @@ +package main + +import ( + "bytes" + "errors" + "io" + "io/ioutil" + "testing" + "time" +) + +type emptyAddr struct{} + +func (_ emptyAddr) Network() string { return "empty" } +func (_ emptyAddr) String() string { return "empty" } + +type funcPoller struct { + poll func(out io.Reader) (in io.ReadCloser, err error) +} + +func (fp funcPoller) Poll(out io.Reader) (in io.ReadCloser, err error) { + return fp.poll(out) +} + +// TestCloseHaltsRequestLoop tests that requestLoop terminates and stops calling +// its Poller after Close is called. +func TestCloseHaltsRequestLoop(t *testing.T) { + closedCh := make(chan struct{}) + resultCh := make(chan error) + // The poller returns immediately with a nil error as long as closedCh + // is not closed. When closedCh is closed, the poller returns + // immediately with a non-nil error. + poller := funcPoller{poll: func(_ io.Reader) (io.ReadCloser, error) { + select { + case <-closedCh: + resultCh <- errors.New("poll called after close") + default: + } + return ioutil.NopCloser(bytes.NewReader(nil)), nil + }} + pconn := NewPollingPacketConn(emptyAddr{}, poller) + // Close the connection. + err := pconn.Close() + if err != nil { + t.Fatal(err) + } + // Tell the poll function to return an error if it is called after this + // point. + close(closedCh) + // Wait a few seconds to see if the poll function is called after the + // conn is closed. + select { + case err := <-resultCh: + t.Fatal(err) + case <-time.After(3 * time.Second): + } +}
This is an automated email from the git hooks/post-receive script.
dcf pushed a commit to branch turbotunnel in repository pluggable-transports/meek.
commit a61ec4e3c8ab2a606f643f3a8159e3515295bcda Author: David Fifield david@bamsoftware.com AuthorDate: Fri Oct 28 00:22:03 2022 -0600
Reqwrite PollingPacketConn in terms of QueuePacketConn.
As in Champa. Use one main polling loop, not multiple, but execute each poll asynchronously. --- meek-client/turbotunnel.go | 242 ++++++++++++++------------------------------- 1 file changed, 75 insertions(+), 167 deletions(-)
diff --git a/meek-client/turbotunnel.go b/meek-client/turbotunnel.go index d4274e2..cbb3380 100644 --- a/meek-client/turbotunnel.go +++ b/meek-client/turbotunnel.go @@ -10,11 +10,9 @@ package main
import ( "bytes" - "errors" + "context" "io" "net" - "sync" - "sync/atomic" "time"
"git.torproject.org/pluggable-transports/meek.git/common/encapsulation" @@ -22,18 +20,11 @@ import ( )
const ( - // The size of receive and send queues. - queueSize = 256 - // The size of the largest bundle of packets we will send in a poll. // (Actually it's not quite a maximum, we will quit bundling as soon as // it is exceeded.) maxSendBundleLength = 0x10000
- // How many goroutines stand ready to do a poll when an outgoing packet - // needs to be sent. - numRequestLoops = 32 - // We must poll the server to see if it has anything to send; there is // no way for the server to push data back to us until we poll it. When // a timer expires, we send a request even if it has an empty body. The @@ -66,16 +57,13 @@ type PollingPacketConn struct { clientID turbotunnel.ClientID remoteAddr net.Addr poller Poller - recvQueue chan []byte - sendQueue chan []byte - // Sending on pollChan permits requestLoop to send an empty polling - // query. requestLoop also does its own polling according to a time - // schedule. - pollChan chan struct{} - closeOnce sync.Once - closed chan struct{} - // What error to return when the PollingPacketConn is closed. - err atomic.Value + ctx context.Context + cancel context.CancelFunc + // QueuePacketConn is the direct receiver of ReadFrom and WriteTo calls. + // requestLoop, via send, removes messages from the outgoing queue that + // were placed there by WriteTo, and inserts messages into the incoming + // queue to be returned from ReadFrom. + *turbotunnel.QueuePacketConn }
// NewPollingPacketConn creates a PollingPacketConn with a random ClientID as @@ -83,99 +71,31 @@ type PollingPacketConn struct { // ReadFrom, and the RemoteAddr method; is is poller that really controls the // effective remote address. func NewPollingPacketConn(remoteAddr net.Addr, poller Poller) *PollingPacketConn { + clientID := turbotunnel.NewClientID() + ctx, cancel := context.WithCancel(context.Background()) c := &PollingPacketConn{ - clientID: turbotunnel.NewClientID(), - remoteAddr: remoteAddr, - poller: poller, - recvQueue: make(chan []byte, queueSize), - sendQueue: make(chan []byte, queueSize), - pollChan: make(chan struct{}), - closed: make(chan struct{}), - } - for i := 0; i < numRequestLoops; i++ { - go c.requestLoop() + clientID: clientID, + remoteAddr: remoteAddr, + poller: poller, + ctx: ctx, + cancel: cancel, + QueuePacketConn: turbotunnel.NewQueuePacketConn(clientID, 0), } + go c.requestLoop() return c }
-var errClosedPacketConn = errors.New("operation on closed connection") -var errNotImplemented = errors.New("not implemented") - -// ReadFrom returns a packet received from a previous poll, blocking until there -// is a packet to return. Unless the returned error is non-nil, the returned -// net.Addr is always c.RemoteAddr(), -func (c *PollingPacketConn) ReadFrom(p []byte) (int, net.Addr, error) { - select { - case <-c.closed: - return 0, nil, &net.OpError{Op: "read", Net: c.RemoteAddr().Network(), Source: c.LocalAddr(), Addr: c.RemoteAddr(), Err: c.err.Load().(error)} - default: - } - select { - case <-c.closed: - return 0, nil, &net.OpError{Op: "read", Net: c.RemoteAddr().Network(), Source: c.LocalAddr(), Addr: c.RemoteAddr(), Err: c.err.Load().(error)} - case buf := <-c.recvQueue: - return copy(p, buf), c.RemoteAddr(), nil - } -} - -// WriteTo queues a packet to be sent (possibly batched) by the underlying -// poller. -func (c *PollingPacketConn) WriteTo(p []byte, addr net.Addr) (int, error) { - // The addr argument is ignored. - select { - case <-c.closed: - return 0, &net.OpError{Op: "write", Net: c.RemoteAddr().Network(), Source: c.LocalAddr(), Addr: c.RemoteAddr(), Err: c.err.Load().(error)} - default: - } - // Copy the slice so that the caller may reuse it. - buf := make([]byte, len(p)) - copy(buf, p) - select { - case c.sendQueue <- buf: - return len(buf), nil - default: - // Drop the outgoing packet if the send queue is full. - return len(buf), nil - } -} - -// closeWithError unblocks pending operations and makes future operations fail -// with the given error. If err is nil, it becomes errClosedPacketConn. -func (c *PollingPacketConn) closeWithError(err error) error { - var newlyClosed bool - c.closeOnce.Do(func() { - newlyClosed = true - // Store the error to be returned by future PacketConn - // operations. - if err == nil { - err = errClosedPacketConn - } - c.err.Store(err) - close(c.closed) - }) - if !newlyClosed { - return &net.OpError{Op: "close", Net: c.LocalAddr().Network(), Addr: c.LocalAddr(), Err: c.err.Load().(error)} - } - return nil -} - -// Close unblocks pending operations and makes future operations fail with a -// "closed connection" error. +// Close cancels any in-progress polls and closes the underlying +// QueuePacketConn. func (c *PollingPacketConn) Close() error { - return c.closeWithError(nil) + c.cancel() + return c.QueuePacketConn.Close() }
-// LocalAddr returns this connection's random Client ID. -func (c *PollingPacketConn) LocalAddr() net.Addr { return c.clientID } - // RemoteAddr returns the remoteAddr value that was passed to // NewPollingPacketConn. func (c *PollingPacketConn) RemoteAddr() net.Addr { return c.remoteAddr }
-func (c *PollingPacketConn) SetDeadline(t time.Time) error { return errNotImplemented } -func (c *PollingPacketConn) SetReadDeadline(t time.Time) error { return errNotImplemented } -func (c *PollingPacketConn) SetWriteDeadline(t time.Time) error { return errNotImplemented } - func (c *PollingPacketConn) requestLoop() { pollDelay := initPollDelay pollTimer := time.NewTimer(pollDelay) @@ -184,52 +104,31 @@ func (c *PollingPacketConn) requestLoop() { body.Write(c.clientID[:])
var p []byte + unstash := c.QueuePacketConn.Unstash(c.remoteAddr) + outgoing := c.QueuePacketConn.OutgoingQueue(c.remoteAddr) pollTimerExpired := false - // Block, waiting for at least one packet. + // Block, waiting for one packet or a demand to poll. Prioritize + // taking a packet from the stash, then taking one from the + // outgoing queue, then finally also consider polls. select { - case <-c.closed: + case <-c.ctx.Done(): return - case p = <-c.sendQueue: + case p = <-unstash: default: select { - case <-c.closed: + case <-c.ctx.Done(): return - case p = <-c.sendQueue: - case <-c.pollChan: - p = nil - case <-pollTimer.C: - p = nil - pollTimerExpired = true - } - } - - if len(p) > 0 { - // Encapsulate the packet into the request body. - encapsulation.WriteData(&body, p) - - // A data-carrying request displaces one pending poll - // opportunity, if any. - select { - case <-c.pollChan: - default: - } - } - - // Send any other packets that are immediately available, up to - // maxSendBundleLength. - loop: - // TODO: It would be better if maxSendBundleLength were a true - // maximum (we don't remove a packet from c.sendQueue unless it - // fits in the remaining length). That would also allow for - // arbitrary shaping, along with encapsulation.WritePadding. - for body.Len() < maxSendBundleLength { - select { - case <-c.closed: - return - case p := <-c.sendQueue: - encapsulation.WriteData(&body, p) + case p = <-unstash: + case p = <-outgoing: default: - break loop + select { + case <-c.ctx.Done(): + return + case p = <-unstash: + case p = <-outgoing: + case <-pollTimer.C: + pollTimerExpired = true + } } }
@@ -241,8 +140,7 @@ func (c *PollingPacketConn) requestLoop() { pollDelay = maxPollDelay } } else { - // We're sending an actual data packet, or we're polling - // in response to a received packet. Reset the poll + // We're sending an actual data packet. Reset the poll // delay to initial. if !pollTimer.Stop() { <-pollTimer.C @@ -251,36 +149,46 @@ func (c *PollingPacketConn) requestLoop() { } pollTimer.Reset(pollDelay)
- resp, err := c.poller.Poll(&body) - if err != nil { - c.closeWithError(err) - return - } - defer resp.Close() + // Grab as many more packets as are immediately available and + // fit in maxSendBundleLength. Always include the first packet, + // even if it doesn't fit. + first := true + for len(p) > 0 && (first || body.Len()+len(p) <= maxSendBundleLength) { + first = false + + // Encapsulate the packet into the request body. + encapsulation.WriteData(&body, p)
- queuedPoll := false - for { - p, err := encapsulation.ReadData(resp) - if err == io.EOF { - break - } else if err != nil { - c.closeWithError(err) - break - } select { - case c.recvQueue <- p: + case p = <-outgoing: default: - // Drop incoming packets when queue is full. + p = nil + } + } + if len(p) > 0 { + // We read an actual packet, but it didn't fit under the + // limit. Stash it so that it will be first in line for + // the next poll. + c.QueuePacketConn.Stash(p, c.remoteAddr) + } + + go func() { + resp, err := c.poller.Poll(&body) + if err != nil { + c.Close() + return } - if !queuedPoll { - queuedPoll = true - for i := 0; i < 2; i++ { - select { - case c.pollChan <- struct{}{}: - default: - } + defer resp.Close() + for { + p, err := encapsulation.ReadData(resp) + if err == io.EOF { + break + } else if err != nil { + c.Close() + return } + c.QueuePacketConn.QueueIncoming(p, c.remoteAddr) } - } + }() } }
This is an automated email from the git hooks/post-receive script.
dcf pushed a commit to branch turbotunnel in repository pluggable-transports/meek.
commit 46c988b2582fcfc9d6ff17114a1961a058edd556 Author: David Fifield david@bamsoftware.com AuthorDate: Fri Oct 28 01:01:17 2022 -0600
Halt requestLoop when PollingPacketConn is closed. --- meek-client/meek-client.go | 4 +++- meek-client/turbotunnel.go | 4 ++-- meek-client/turbotunnel_test.go | 50 +++++++++++++++++++++++++++++++++-------- 3 files changed, 46 insertions(+), 12 deletions(-)
diff --git a/meek-client/meek-client.go b/meek-client/meek-client.go index 683aa90..813a373 100644 --- a/meek-client/meek-client.go +++ b/meek-client/meek-client.go @@ -27,6 +27,7 @@ package main
import ( + "context" "flag" "fmt" "io" @@ -97,13 +98,14 @@ type RequestInfo struct { RoundTripper http.RoundTripper }
-func (info *RequestInfo) Poll(out io.Reader) (in io.ReadCloser, err error) { +func (info *RequestInfo) Poll(ctx context.Context, out io.Reader) (in io.ReadCloser, err error) { req, err := http.NewRequest("POST", info.URL.String(), out) // Prevent Content-Type sniffing by net/http and middleboxes. req.Header.Set("Content-Type", "application/octet-stream") if err != nil { return nil, err } + req = req.WithContext(ctx) if info.Host != "" { req.Host = info.Host } diff --git a/meek-client/turbotunnel.go b/meek-client/turbotunnel.go index cbb3380..43c0c46 100644 --- a/meek-client/turbotunnel.go +++ b/meek-client/turbotunnel.go @@ -40,7 +40,7 @@ const ( // Poller is an abstract interface over an operation that writes a stream of // bytes and reads a stream of bytes in return, like an HTTP request. type Poller interface { - Poll(out io.Reader) (in io.ReadCloser, err error) + Poll(ctx context.Context, out io.Reader) (in io.ReadCloser, err error) }
// PollingPacketConn implements the net.PacketConn interface over a carrier of @@ -173,7 +173,7 @@ func (c *PollingPacketConn) requestLoop() { }
go func() { - resp, err := c.poller.Poll(&body) + resp, err := c.poller.Poll(c.ctx, &body) if err != nil { c.Close() return diff --git a/meek-client/turbotunnel_test.go b/meek-client/turbotunnel_test.go index 11640ec..89aadab 100644 --- a/meek-client/turbotunnel_test.go +++ b/meek-client/turbotunnel_test.go @@ -2,6 +2,7 @@ package main
import ( "bytes" + "context" "errors" "io" "io/ioutil" @@ -15,24 +16,58 @@ func (_ emptyAddr) Network() string { return "empty" } func (_ emptyAddr) String() string { return "empty" }
type funcPoller struct { - poll func(out io.Reader) (in io.ReadCloser, err error) + poll func(ctx context.Context, out io.Reader) (in io.ReadCloser, err error) }
-func (fp funcPoller) Poll(out io.Reader) (in io.ReadCloser, err error) { - return fp.poll(out) +func (fp funcPoller) Poll(ctx context.Context, out io.Reader) (in io.ReadCloser, err error) { + return fp.poll(ctx, out) +} + +// TestCloseCancelsPoll tests that calling Close cancels the context passed to +// the poller. +func TestCloseCancelsPoll(t *testing.T) { + beginCh := make(chan struct{}) + resultCh := make(chan error) + // The poller returns immediately with a nil error when its context is + // canceled. It returns after a delay with a non-nil error if its + // context is not canceled. + poller := funcPoller{poll: func(ctx context.Context, _ io.Reader) (io.ReadCloser, error) { + defer close(resultCh) + beginCh <- struct{}{} + select { + case <-ctx.Done(): + resultCh <- nil + case <-time.After(5 * time.Second): + resultCh <- errors.New("poll was not canceled") + } + return ioutil.NopCloser(bytes.NewReader(nil)), nil + }} + pconn := NewPollingPacketConn(emptyAddr{}, poller) + // Wait until the poll function has been called. + <-beginCh + // Close the connection. + err := pconn.Close() + if err != nil { + t.Fatal(err) + } + // Observe what happened inside the poll function. Closing the + // connection should have canceled the context. + err = <-resultCh + if err != nil { + t.Fatal(err) + } }
// TestCloseHaltsRequestLoop tests that requestLoop terminates and stops calling // its Poller after Close is called. func TestCloseHaltsRequestLoop(t *testing.T) { - closedCh := make(chan struct{}) resultCh := make(chan error) // The poller returns immediately with a nil error as long as closedCh // is not closed. When closedCh is closed, the poller returns // immediately with a non-nil error. - poller := funcPoller{poll: func(_ io.Reader) (io.ReadCloser, error) { + poller := funcPoller{poll: func(ctx context.Context, _ io.Reader) (io.ReadCloser, error) { select { - case <-closedCh: + case <-ctx.Done(): resultCh <- errors.New("poll called after close") default: } @@ -44,9 +79,6 @@ func TestCloseHaltsRequestLoop(t *testing.T) { if err != nil { t.Fatal(err) } - // Tell the poll function to return an error if it is called after this - // point. - close(closedCh) // Wait a few seconds to see if the poll function is called after the // conn is closed. select {
This is an automated email from the git hooks/post-receive script.
dcf pushed a commit to branch turbotunnel in repository pluggable-transports/meek.
commit b93c5a027f7f60636c84bd405ec7c8089d4a8b0e Merge: 46c988b e9eff72 Author: David Fifield david@bamsoftware.com AuthorDate: Fri Oct 28 01:11:22 2022 -0600
Merge branch 'main' into turbotunnel
common/encapsulation/encapsulation.go | 22 ++- doc/meek-client.1 | 208 ++++++++++++++++++++++- doc/meek-client.1.txt | 24 ++- doc/meek-server.1 | 24 ++- doc/meek-server.1.txt | 6 +- go.mod | 8 +- go.sum | 28 ++- meek-client-torbrowser/linux.go | 2 + meek-client-torbrowser/mac.go | 2 + meek-client-torbrowser/meek-client-torbrowser.go | 4 +- meek-client-torbrowser/terminate_other.go | 1 + meek-client-torbrowser/terminate_windows.go | 1 + meek-client-torbrowser/windows.go | 2 + meek-client/meek-client.go | 24 ++- meek-client/proxy_test.go | 2 +- meek-client/torrc | 1 - meek-client/utls.go | 68 ++++---- meek-client/utls_test.go | 45 +---- meek-server/certificate.go | 1 + meek-server/meek-server.go | 19 ++- 20 files changed, 363 insertions(+), 129 deletions(-)
diff --cc common/encapsulation/encapsulation.go index 64a58d8,0000000..9fd7449 mode 100644,000000..100644 --- a/common/encapsulation/encapsulation.go +++ b/common/encapsulation/encapsulation.go @@@ -1,194 -1,0 +1,200 @@@ +// Package encapsulation implements a way of encoding variable-size chunks of +// data and padding into a byte stream. +// +// Each chunk of data or padding starts with a variable-size length prefix. One +// bit ("d") in the first byte of the prefix indicates whether the chunk +// represents data or padding (1=data, 0=padding). Another bit ("c" for +// "continuation") is the indicates whether there are more bytes in the length +// prefix. The remaining 6 bits ("x") encode part of the length value. - // dcxxxxxx ++// ++// dcxxxxxx ++// +// If the continuation bit is set, then the next byte is also part of the length +// prefix. It lacks the "d" bit, has its own "c" bit, and 7 value-carrying bits +// ("y"). - // cyyyyyyy ++// ++// cyyyyyyy ++// +// The length is decoded by concatenating value-carrying bits, from left to +// right, of all value-carrying bits, up to and including the first byte whose +// "c" bit is 0. Although in principle this encoding would allow for length +// prefixes of any size, length prefixes are arbitrarily limited to 3 bytes and +// any attempt to read or write a longer one is an error. These are therefore +// the only valid formats: - // 00xxxxxx xxxxxx₂ bytes of padding - // 10xxxxxx xxxxxx₂ bytes of data - // 01xxxxxx 0yyyyyyy xxxxxxyyyyyyy₂ bytes of padding - // 11xxxxxx 0yyyyyyy xxxxxxyyyyyyy₂ bytes of data - // 01xxxxxx 1yyyyyyy 0zzzzzzz xxxxxxyyyyyyyzzzzzzz₂ bytes of padding - // 11xxxxxx 1yyyyyyy 0zzzzzzz xxxxxxyyyyyyyzzzzzzz₂ bytes of data ++// ++// 00xxxxxx xxxxxx₂ bytes of padding ++// 10xxxxxx xxxxxx₂ bytes of data ++// 01xxxxxx 0yyyyyyy xxxxxxyyyyyyy₂ bytes of padding ++// 11xxxxxx 0yyyyyyy xxxxxxyyyyyyy₂ bytes of data ++// 01xxxxxx 1yyyyyyy 0zzzzzzz xxxxxxyyyyyyyzzzzzzz₂ bytes of padding ++// 11xxxxxx 1yyyyyyy 0zzzzzzz xxxxxxyyyyyyyzzzzzzz₂ bytes of data ++// +// The maximum encodable length is 11111111111111111111₂ = 0xfffff = 1048575. +// There is no requirement to use a length prefix of minimum size; i.e. 00000100 +// and 01000000 00000100 are both valid encodings of the value 4. +// +// After the length prefix follow that many bytes of padding or data. There are +// no restrictions on the value of bytes comprising padding. +// +// The idea for this encapsulation is sketched here: +// https://github.com/net4people/bbs/issues/9#issuecomment-524095186 +package encapsulation + +import ( + "errors" + "io" + "io/ioutil" +) + +// ErrTooLong is the error returned when an encoded length prefix is longer than +// 3 bytes, or when ReadData receives an input whose length is too large to +// encode in a 3-byte length prefix. +var ErrTooLong = errors.New("length prefix is too long") + +// ReadData returns a new slice with the contents of the next available data +// chunk, skipping over any padding chunks that may come first. The returned +// error value is nil if and only if a data chunk was present and was read in +// its entirety. The returned error is io.EOF only if r ended before the first +// byte of a length prefix. If r ended in the middle of a length prefix or +// data/padding, the returned error is io.ErrUnexpectedEOF. +func ReadData(r io.Reader) ([]byte, error) { + for { + var b [1]byte + _, err := r.Read(b[:]) + if err != nil { + // This is the only place we may return a real io.EOF. + return nil, err + } + isData := (b[0] & 0x80) != 0 + moreLength := (b[0] & 0x40) != 0 + n := int(b[0] & 0x3f) + for i := 0; moreLength; i++ { + if i >= 2 { + return nil, ErrTooLong + } + _, err := r.Read(b[:]) + if err == io.EOF { + err = io.ErrUnexpectedEOF + } + if err != nil { + return nil, err + } + moreLength = (b[0] & 0x80) != 0 + n = (n << 7) | int(b[0]&0x7f) + } + if isData { + p := make([]byte, n) + _, err := io.ReadFull(r, p) + if err == io.EOF { + err = io.ErrUnexpectedEOF + } + if err != nil { + return nil, err + } + return p, err + } else { + _, err := io.CopyN(ioutil.Discard, r, int64(n)) + if err == io.EOF { + err = io.ErrUnexpectedEOF + } + if err != nil { + return nil, err + } + } + } +} + +// dataPrefixForLength returns a length prefix for the given length, with the +// "d" bit set to 1. +func dataPrefixForLength(n int) ([]byte, error) { + switch { + case (n>>0)&0x3f == (n >> 0): + return []byte{0x80 | byte((n>>0)&0x3f)}, nil + case (n>>7)&0x3f == (n >> 7): + return []byte{0xc0 | byte((n>>7)&0x3f), byte((n >> 0) & 0x7f)}, nil + case (n>>14)&0x3f == (n >> 14): + return []byte{0xc0 | byte((n>>14)&0x3f), 0x80 | byte((n>>7)&0x7f), byte((n >> 0) & 0x7f)}, nil + default: + return nil, ErrTooLong + } +} + +// WriteData encodes a data chunk into w. It returns the total number of bytes +// written; i.e., including the length prefix. The error is ErrTooLong if the +// length of data cannot fit into a length prefix. +func WriteData(w io.Writer, data []byte) (int, error) { + prefix, err := dataPrefixForLength(len(data)) + if err != nil { + return 0, err + } + total := 0 + n, err := w.Write(prefix) + total += n + if err != nil { + return total, err + } + n, err = w.Write(data) + total += n + return total, err +} + +var paddingBuffer = make([]byte, 1024) + +// WritePadding encodes padding chunks, whose total size (including their own +// length prefixes) is n. Returns the total number of bytes written to w, which +// will be exactly n unless there was an error. The error cannot be ErrTooLong +// because this function will write multiple padding chunks if necessary to +// reach the requested size. Panics if n is negative. +func WritePadding(w io.Writer, n int) (int, error) { + if n < 0 { + panic("negative length") + } + total := 0 + for n > 0 { + p := len(paddingBuffer) + if p > n { + p = n + } + n -= p + var prefix []byte + switch { + case ((p-1)>>0)&0x3f == ((p - 1) >> 0): + p = p - 1 + prefix = []byte{byte((p >> 0) & 0x3f)} + case ((p-2)>>7)&0x3f == ((p - 2) >> 7): + p = p - 2 + prefix = []byte{0x40 | byte((p>>7)&0x3f), byte((p >> 0) & 0x7f)} + case ((p-3)>>14)&0x3f == ((p - 3) >> 14): + p = p - 3 + prefix = []byte{0x40 | byte((p>>14)&0x3f), 0x80 | byte((p>>7)&0x3f), byte((p >> 0) & 0x7f)} + } + nn, err := w.Write(prefix) + total += nn + if err != nil { + return total, err + } + nn, err = w.Write(paddingBuffer[:p]) + total += nn + if err != nil { + return total, err + } + } + return total, nil +} + +// MaxDataForSize returns the length of the longest slice that can be passed to +// WriteData, whose total encoded size (including length prefix) is no larger +// than n. Call this to find out if a chunk of data will fit into a length +// budget. Panics if n == 0. +func MaxDataForSize(n int) int { + if n == 0 { + panic("zero length") + } + prefix, err := dataPrefixForLength(n) + if err == ErrTooLong { + return (1 << (6 + 7 + 7)) - 1 - 3 + } else if err != nil { + panic(err) + } + return n - len(prefix) +} diff --cc doc/meek-client.1 index 8215537,ec64796..70ae1e1 --- a/doc/meek-client.1 +++ b/doc/meek-client.1 @@@ -2,12 -2,12 +2,12 @@@ ." Title: meek-client ." Author: [FIXME: author] [see http://docbook.sf.net/el/author] ." Generator: DocBook XSL Stylesheets v1.79.1 http://docbook.sf.net/ - ." Date: 04/29/2020 -." Date: 10/20/2022 ++." Date: 10/28/2022 ." Manual: \ & ." Source: \ & ." Language: English ." - .TH "MEEK-CLIENT" "1" "04/29/2020" "\ &" "\ &" -.TH "MEEK-CLIENT" "1" "10/20/2022" "\ &" "\ &" ++.TH "MEEK-CLIENT" "1" "10/28/2022" "\ &" "\ &" ." ----------------------------------------------------------------- ." * Define some portability stuff ." ----------------------------------------------------------------- diff --cc go.mod index e6c7dfb,cdeb2e8..99d67f7 --- a/go.mod +++ b/go.mod @@@ -4,10 -4,8 +4,10 @@@ go 1.1
require ( git.torproject.org/pluggable-transports/goptlib.git v1.1.0 - github.com/refraction-networking/utls v0.0.0-20190909200633-43c36d3c1f57 + github.com/refraction-networking/utls v1.1.5 + github.com/xtaci/kcp-go/v5 v5.5.14 + github.com/xtaci/smux v1.5.14 - golang.org/x/crypto v0.0.0-20191206172530-e9b2fee46413 - golang.org/x/net v0.0.0-20191209160850-c0dbc17a3553 - golang.org/x/sys v0.0.0-20191228213918-04cbcbbfeed8 + golang.org/x/crypto v0.0.0-20220829220503-c86fa9a7ed90 + golang.org/x/net v0.0.0-20220909164309-bea034e7d591 + golang.org/x/sys v0.0.0-20220728004956-3c1f35247d10 ) diff --cc go.sum index e22f000,3b60946..7b248c4 --- a/go.sum +++ b/go.sum @@@ -1,34 -1,24 +1,50 @@@ git.torproject.org/pluggable-transports/goptlib.git v1.1.0 h1:LMQAA8pAho+QtYrrVNimJQiINNEwcwuuD99vezD/PAo= git.torproject.org/pluggable-transports/goptlib.git v1.1.0/go.mod h1:YT4XMSkuEXbtqlydr9+OxqFAyspUv0Gr9qhM3B++o/Q= + github.com/andybalholm/brotli v1.0.4 h1:V7DdXeJtZscaqfNuAdSRuRFzuiKlHSC/Zh3zl9qY3JY= + github.com/andybalholm/brotli v1.0.4/go.mod h1:fO7iG3H7G2nSZ7m0zPUDn85XEX2GTukHGRSepvi9Eig= + github.com/klauspost/compress v1.15.9 h1:wKRjX6JRtDdrE9qwa4b/Cip7ACOshUI4smpCQanqjSY= + github.com/klauspost/compress v1.15.9/go.mod h1:PhcZ0MbTNciWF3rruxRgKxI5NkcHHrHUDtV4Yw2GlzU= +github.com/klauspost/cpuid v1.2.2 h1:1xAgYebNnsb9LKCdLOvFWtAxGU/33mjJtyOVbmUa0Us= +github.com/klauspost/cpuid v1.2.2/go.mod h1:Pj4uuM528wm8OyEC2QMXAi2YiTZ96dNQPGgoMS4s3ek= +github.com/klauspost/reedsolomon v1.9.3 h1:N/VzgeMfHmLc+KHMD1UL/tNkfXAt8FnUqlgXGIduwAY= +github.com/klauspost/reedsolomon v1.9.3/go.mod h1:CwCi+NUr9pqSVktrkN+Ondf06rkhYZ/pcNv7fu+8Un4= +github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I= +github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= - github.com/refraction-networking/utls v0.0.0-20190909200633-43c36d3c1f57 h1:SL1K0QAuC1b54KoY1pjPWe6kSlsFHwK9/oC960fKrTY= - github.com/refraction-networking/utls v0.0.0-20190909200633-43c36d3c1f57/go.mod h1:tz9gX959MEFfFN5whTIocCLUG57WiILqtdVxI8c6Wj0= + github.com/refraction-networking/utls v1.1.5 h1:JtrojoNhbUQkBqEg05sP3gDgDj6hIEAAVKbI9lx4n6w= + github.com/refraction-networking/utls v1.1.5/go.mod h1:jRQxtYi7nkq1p28HF2lwOH5zQm9aC8rpK0O9lIIzGh8= +github.com/templexxx/cpu v0.0.1 h1:hY4WdLOgKdc8y13EYklu9OUTXik80BkxHoWvTO6MQQY= +github.com/templexxx/cpu v0.0.1/go.mod h1:w7Tb+7qgcAlIyX4NhLuDKt78AHA5SzPmq0Wj6HiEnnk= +github.com/templexxx/xorsimd v0.4.1 h1:iUZcywbOYDRAZUasAs2eSCUW8eobuZDy0I9FJiORkVg= +github.com/templexxx/xorsimd v0.4.1/go.mod h1:W+ffZz8jJMH2SXwuKu9WhygqBMbFnp14G2fqEr8qaNo= +github.com/tjfoc/gmsm v1.0.1 h1:R11HlqhXkDospckjZEihx9SW/2VW0RgdwrykyWMFOQU= +github.com/tjfoc/gmsm v1.0.1/go.mod h1:XxO4hdhhrzAd+G4CjDqaOkd0hUzmtPR/d3EiBBMn/wc= +github.com/xtaci/kcp-go/v5 v5.5.14 h1:YODIwvTyZmOTj3SduJeIcQPxthDoHllMm8YIBlK44Ik= +github.com/xtaci/kcp-go/v5 v5.5.14/go.mod h1:H0T/EJ+lPNytnFYsKLH0JHUtiwZjG3KXlTM6c+Q4YUo= +github.com/xtaci/lossyconn v0.0.0-20190602105132-8df528c0c9ae h1:J0GxkO96kL4WF+AIT3M4mfUVinOCPgf2uUWYFUzN0sM= +github.com/xtaci/lossyconn v0.0.0-20190602105132-8df528c0c9ae/go.mod h1:gXtu8J62kEgmN++bm9BVICuT/e8yiLI2KFobd/TRFsE= +github.com/xtaci/smux v1.5.14 h1:1j+zJYDZRv9FHaWqCJfH5RPizIm0fSzJIFbfVn8zsfg= +github.com/xtaci/smux v1.5.14/go.mod h1:OMlQbT5vcgl2gb49mFkYo6SMf+zP3rcjcwQz7ZU7IGY= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= - golang.org/x/crypto v0.0.0-20191206172530-e9b2fee46413 h1:ULYEB3JvPRE/IfO+9uO7vKV/xzVTO7XPAwm8xbf4w2g= +golang.org/x/crypto v0.0.0-20191206172530-e9b2fee46413/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= + golang.org/x/crypto v0.0.0-20220829220503-c86fa9a7ed90 h1:Y/gsMcFOcR+6S6f3YeMKl5g+dZMEWqcz5Czj/GWYbkM= + golang.org/x/crypto v0.0.0-20220829220503-c86fa9a7ed90/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= +golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= - golang.org/x/net v0.0.0-20191209160850-c0dbc17a3553 h1:efeOvDhwQ29Dj3SdAV/MJf8oukgn+8D8WgaCaRMchF8= +golang.org/x/net v0.0.0-20191209160850-c0dbc17a3553/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= + golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= + golang.org/x/net v0.0.0-20220909164309-bea034e7d591 h1:D0B/7al0LLrVC8aWF4+oxpv/m8bc7ViFfVS8/gXGdqI= + golang.org/x/net v0.0.0-20220909164309-bea034e7d591/go.mod h1:YDH+HFinaLZZlnHAfSS6ZXJJ9M9t4Dl22yv3iI2vPwk= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= - golang.org/x/sys v0.0.0-20191228213918-04cbcbbfeed8 h1:JA8d3MPx/IToSyXZG/RhwYEtfrKO1Fxrqe8KrkiLXKM= +golang.org/x/sys v0.0.0-20191228213918-04cbcbbfeed8/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= - golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg= + golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= + golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= + golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= + golang.org/x/sys v0.0.0-20220728004956-3c1f35247d10 h1:WIoqL4EROvwiPdUtaip4VcDdpZ4kha7wBWZrbVKCIZg= + golang.org/x/sys v0.0.0-20220728004956-3c1f35247d10/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= + golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= + golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= + golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= + golang.org/x/text v0.3.7 h1:olpwvP2KacW1ZWvsR7uQhoyTYvKAupfQrRGBFM352Gk= + golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= + golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= diff --cc meek-client/meek-client.go index 813a373,8905612..28943a8 --- a/meek-client/meek-client.go +++ b/meek-client/meek-client.go @@@ -98,14 -121,28 +104,14 @@@ type RequestInfo struct RoundTripper http.RoundTripper }
-// Make an http.Request from the payload data in buf and the request metadata in -// info. -func makeRequest(buf []byte, info *RequestInfo) (*http.Request, error) { - var body io.Reader - if len(buf) > 0 { - // Leave body == nil when buf is empty. A nil body is an - // explicit signal that the body is empty. An empty - // *bytes.Reader or the magic value http.NoBody are supposed to - // be equivalent ways to signal an empty body, but in Go 1.8 the - // HTTP/2 code only understands nil. Not leaving body == nil - // causes the Content-Length header to be omitted from HTTP/2 - // requests, which in some cases can cause the server to return - // a 411 "Length Required" error. See - // https://bugs.torproject.org/22865. - body = bytes.NewReader(buf) - } - req, err := http.NewRequest("POST", info.URL.String(), body) +func (info *RequestInfo) Poll(ctx context.Context, out io.Reader) (in io.ReadCloser, err error) { + req, err := http.NewRequest("POST", info.URL.String(), out) - // Prevent Content-Type sniffing by net/http and middleboxes. - req.Header.Set("Content-Type", "application/octet-stream") if err != nil { return nil, err } + req = req.WithContext(ctx) + // Prevent Content-Type sniffing by net/http and middleboxes. + req.Header.Set("Content-Type", "application/octet-stream") if info.Host != "" { req.Host = info.Host } diff --cc meek-server/meek-server.go index f715107,626ddb0..a4e5683 --- a/meek-server/meek-server.go +++ b/meek-server/meek-server.go @@@ -48,9 -49,13 +53,9 @@@ import )
const ( - programVersion = "0.34" + programVersion = "0.37.0"
ptMethodName = "meek" - // Reject session ids shorter than this, as a weak defense against - // client bugs that send an empty session id or something similarly - // likely to collide. - minSessionIDLength = 8 // The largest request body we are willing to process, and the largest // chunk of data we'll send back in a response. maxPayloadLength = 0x10000
tor-commits@lists.torproject.org