commit 7092b2cb2c24759286f3ecc7713ad30115415e41
Author: Arlo Breault <arlolra@gmail.com>
Date:   Thu Nov 21 19:33:39 2019 -0500
    Revert abstracting copyloop
---
 common/websocketconn/websocketconn.go      | 19 ----------------
 common/websocketconn/websocketconn_test.go | 30 -------------------------
 proxy-go/proxy-go_test.go                  | 19 ++++++++++++++++
 proxy-go/snowflake.go                      | 18 ++++++++++++++-
 server/server.go                           | 36 +++++++++++++++++++++++++++---
 5 files changed, 69 insertions(+), 53 deletions(-)
diff --git a/common/websocketconn/websocketconn.go b/common/websocketconn/websocketconn.go index 399cbaa..7e12abf 100644 --- a/common/websocketconn/websocketconn.go +++ b/common/websocketconn/websocketconn.go @@ -2,8 +2,6 @@ package websocketconn
import ( "io" - "log" - "sync" "time"
"github.com/gorilla/websocket" @@ -70,20 +68,3 @@ func NewWebSocketConn(ws *websocket.Conn) WebSocketConn { conn.Ws = ws return conn } - -// Copy from WebSocket to socket and vice versa. -func CopyLoop(c1 io.ReadWriteCloser, c2 io.ReadWriteCloser) { - var wg sync.WaitGroup - copyer := func(dst io.ReadWriteCloser, src io.ReadWriteCloser) { - defer wg.Done() - if _, err := io.Copy(dst, src); err != nil { - log.Printf("io.Copy inside CopyLoop generated an error: %v", err) - } - dst.Close() - src.Close() - } - wg.Add(2) - go copyer(c1, c2) - go copyer(c2, c1) - wg.Wait() -} diff --git a/common/websocketconn/websocketconn_test.go b/common/websocketconn/websocketconn_test.go deleted file mode 100644 index 3293165..0000000 --- a/common/websocketconn/websocketconn_test.go +++ /dev/null @@ -1,30 +0,0 @@ -package websocketconn - -import ( - "net" - "testing" - - . "github.com/smartystreets/goconvey/convey" -) - -func TestWebsocketConn(t *testing.T) { - Convey("CopyLoop", t, func() { - c1, s1 := net.Pipe() - c2, s2 := net.Pipe() - go CopyLoop(s1, s2) - go func() { - bytes := []byte("Hello!") - c1.Write(bytes) - }() - bytes := make([]byte, 6) - n, err := c2.Read(bytes) - So(n, ShouldEqual, 6) - So(err, ShouldEqual, nil) - So(bytes, ShouldResemble, []byte("Hello!")) - s1.Close() - - // Check that copy loop has closed other connection - _, err = s2.Write(bytes) - So(err, ShouldNotBeNil) - }) -} diff --git a/proxy-go/proxy-go_test.go b/proxy-go/proxy-go_test.go index 538957b..ebe4381 100644 --- a/proxy-go/proxy-go_test.go +++ b/proxy-go/proxy-go_test.go @@ -374,4 +374,23 @@ func TestUtilityFuncs(t *testing.T) { sid2 := genSessionID() So(sid1, ShouldNotEqual, sid2) }) + Convey("CopyLoop", t, func() { + c1, s1 := net.Pipe() + c2, s2 := net.Pipe() + go CopyLoop(s1, s2) + go func() { + bytes := []byte("Hello!") + c1.Write(bytes) + }() + bytes := make([]byte, 6) + n, err := c2.Read(bytes) + So(n, ShouldEqual, 6) + So(err, ShouldEqual, nil) + So(bytes, ShouldResemble, []byte("Hello!")) 
+ s1.Close() + + //Check that copy loop has closed other connection + _, err = s2.Write(bytes) + So(err, ShouldNotBeNil) + }) } diff --git a/proxy-go/snowflake.go b/proxy-go/snowflake.go index 0e14eb2..c10093a 100644 --- a/proxy-go/snowflake.go +++ b/proxy-go/snowflake.go @@ -240,6 +240,22 @@ func (b *Broker) sendAnswer(sid string, pc *webrtc.PeerConnection) error { return nil }
+func CopyLoop(c1 io.ReadWriteCloser, c2 io.ReadWriteCloser) { + var wg sync.WaitGroup + copyer := func(dst io.ReadWriteCloser, src io.ReadWriteCloser) { + defer wg.Done() + if _, err := io.Copy(dst, src); err != nil { + log.Printf("io.Copy inside CopyLoop generated an error: %v", err) + } + dst.Close() + src.Close() + } + wg.Add(2) + go copyer(c1, c2) + go copyer(c2, c1) + wg.Wait() +} + // We pass conn.RemoteAddr() as an additional parameter, rather than calling // conn.RemoteAddr() inside this function, as a workaround for a hang that // otherwise occurs inside of conn.pc.RemoteDescription() (called by @@ -272,7 +288,7 @@ func datachannelHandler(conn *webRTCConn, remoteAddr net.Addr) { wsConn := websocketconn.NewWebSocketConn(ws) log.Printf("connected to relay") defer wsConn.Close() - websocketconn.CopyLoop(conn, &wsConn) + CopyLoop(conn, &wsConn) log.Printf("datachannelHandler ends") }
diff --git a/server/server.go b/server/server.go index d950ddc..e3f4c6f 100644 --- a/server/server.go +++ b/server/server.go @@ -15,6 +15,7 @@ import ( "os/signal" "path/filepath" "strings" + "sync" "syscall" "time"
@@ -50,6 +51,35 @@ additional HTTP listener on port 80 to work with ACME. flag.PrintDefaults() }
+// Copy from WebSocket to socket and vice versa. +func proxy(local *net.TCPConn, conn *websocketconn.WebSocketConn) { + var wg sync.WaitGroup + wg.Add(2) + + go func() { + if _, err := io.Copy(conn, local); err != nil { + log.Printf("error copying ORPort to WebSocket %v", err) + } + if err := local.CloseRead(); err != nil { + log.Printf("error closing read after copying ORPort to WebSocket %v", err) + } + conn.Close() + wg.Done() + }() + go func() { + if _, err := io.Copy(local, conn); err != nil { + log.Printf("error copying WebSocket to ORPort") + } + if err := local.CloseWrite(); err != nil { + log.Printf("error closing write after copying WebSocket to ORPort %v", err) + } + conn.Close() + wg.Done() + }() + + wg.Wait() +} + // Return an address string suitable to pass into pt.DialOr. func clientAddr(clientIPParam string) string { if clientIPParam == "" { @@ -75,8 +105,8 @@ func (handler *HTTPHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { return }
- wsConn := websocketconn.NewWebSocketConn(ws) - defer wsConn.Close() + conn := websocketconn.NewWebSocketConn(ws) + defer conn.Close()
// Pass the address of client as the remote address of incoming connection clientIPParam := r.URL.Query().Get("client_ip") @@ -93,7 +123,7 @@ func (handler *HTTPHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { } defer or.Close()
- websocketconn.CopyLoop(or, &wsConn) + proxy(or, &conn) }
func initServer(addr *net.TCPAddr,
tor-commits@lists.torproject.org