commit 01e28aa4604fea7a2af0259c8b18be1bd5f9b3d7 Author: David Fifield david@bamsoftware.com Date: Mon Feb 3 12:31:00 2020 -0700
Rewrite websocketconn with synchronous pipes.
Makes the following changes: * permits concurrent Read/Write/Close * converts certain CloseErrors into io.EOF
https://bugs.torproject.org/33144 --- common/websocketconn/websocketconn.go | 122 +++++++++++++++++++++++----------- 1 file changed, 82 insertions(+), 40 deletions(-)
diff --git a/common/websocketconn/websocketconn.go b/common/websocketconn/websocketconn.go index b87e657..fa2b0da 100644 --- a/common/websocketconn/websocketconn.go +++ b/common/websocketconn/websocketconn.go @@ -10,61 +10,103 @@ import ( // An abstraction that makes an underlying WebSocket connection look like an // io.ReadWriteCloser. type Conn struct { - Ws *websocket.Conn - r io.Reader + ws *websocket.Conn + Reader io.Reader + Writer io.Writer }
// Implements io.Reader. func (conn *Conn) Read(b []byte) (n int, err error) { - var opCode int - if conn.r == nil { - // New message - var r io.Reader - for { - if opCode, r, err = conn.Ws.NextReader(); err != nil { - return - } - if opCode != websocket.BinaryMessage && opCode != websocket.TextMessage { - continue - } - - conn.r = r - break - } - } - - n, err = conn.r.Read(b) - if err == io.EOF { - // Message finished - conn.r = nil - err = nil - } - return + return conn.Reader.Read(b) }
// Implements io.Writer. func (conn *Conn) Write(b []byte) (n int, err error) { - var w io.WriteCloser - if w, err = conn.Ws.NextWriter(websocket.BinaryMessage); err != nil { - return - } - if n, err = w.Write(b); err != nil { - return - } - err = w.Close() - return + return conn.Writer.Write(b) }
// Implements io.Closer. func (conn *Conn) Close() error { // Ignore any error in trying to write a Close frame. - _ = conn.Ws.WriteControl(websocket.CloseMessage, []byte{}, time.Now().Add(time.Second)) - return conn.Ws.Close() + _ = conn.ws.WriteControl(websocket.CloseMessage, []byte{}, time.Now().Add(time.Second)) + return conn.ws.Close() +} + +func readLoop(w io.Writer, ws *websocket.Conn) error { + for { + messageType, r, err := ws.NextReader() + if err != nil { + return err + } + if messageType != websocket.BinaryMessage && messageType != websocket.TextMessage { + continue + } + _, err = io.Copy(w, r) + if err != nil { + return err + } + } + return nil +} + +func writeLoop(ws *websocket.Conn, r io.Reader) error { + for { + var buf [2048]byte + n, err := r.Read(buf[:]) + if err != nil { + return err + } + data := buf[:n] + w, err := ws.NextWriter(websocket.BinaryMessage) + if err != nil { + return err + } + n, err = w.Write(data) + if err != nil { + return err + } + err = w.Close() + if err != nil { + return err + } + } +} + +// websocket.Conn methods start returning websocket.CloseError after the +// connection has been closed. We want to instead interpret that as io.EOF, just +// as you would find with a normal net.Conn. This only converts +// websocket.CloseErrors with known codes; other codes like CloseProtocolError +// and CloseAbnormalClosure will still be reported as anomalous. +func closeErrorToEOF(err error) error { + if websocket.IsCloseError(err, websocket.CloseNormalClosure, websocket.CloseNoStatusReceived) { + err = io.EOF + } + return err }
// Create a new Conn. func New(ws *websocket.Conn) *Conn { - var conn Conn - conn.Ws = ws - return &conn + // Set up synchronous pipes to serialize reads and writes to the + // underlying websocket.Conn. + // + // https://godoc.org/github.com/gorilla/websocket#hdr-Concurrency + // "Connections support one concurrent reader and one concurrent writer. + // Applications are responsible for ensuring that no more than one + // goroutine calls the write methods (NextWriter, etc.) concurrently and + // that no more than one goroutine calls the read methods (NextReader, + // etc.) concurrently. The Close and WriteControl methods can be called + // concurrently with all other methods." + pr1, pw1 := io.Pipe() + go func() { + pw1.CloseWithError(closeErrorToEOF(readLoop(pw1, ws))) + }() + pr2, pw2 := io.Pipe() + go func() { + pr2.CloseWithError(closeErrorToEOF(writeLoop(ws, pr2))) + }() + return &Conn{ + ws: ws, + Reader: pr1, + Writer: pw2, + } }
tor-commits@lists.torproject.org