[tor-commits] [snowflake/master] Golang implementation of standalone snowflake proxy

dcf at torproject.org dcf at torproject.org
Thu May 4 08:21:18 UTC 2017


commit 4b5794c93580d9a48d7a52c9e8f7e50db73836a7
Author: yinghuocho <yinghuocho at gmail.com>
Date:   Mon Apr 24 16:16:02 2017 -0700

    Golang implementation of standalone snowflake proxy
---
 proxy-go/snowflake.go | 326 ++++++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 326 insertions(+)

diff --git a/proxy-go/snowflake.go b/proxy-go/snowflake.go
new file mode 100644
index 0000000..c7ee24a
--- /dev/null
+++ b/proxy-go/snowflake.go
@@ -0,0 +1,326 @@
+package main
+
+import (
+	"bytes"
+	"encoding/base64"
+	"flag"
+	"fmt"
+	"io"
+	"io/ioutil"
+	"log"
+	"math/rand"
+	"net"
+	"net/http"
+	"net/url"
+	"strings"
+	"sync"
+	"time"
+
+	"github.com/keroserene/go-webrtc"
+	"golang.org/x/net/websocket"
+)
+
+// snowflakeOptions holds the proxy's command-line settings. capacity, broker,
+// stun and relay are filled in by the flag package in main; brokerURL, stunURL
+// and relayURL are the results of running url.Parse on the corresponding
+// strings, also in main.
+type snowflakeOptions struct {
+	capacity  int
+	broker    string
+	brokerURL *url.URL
+	stun      string
+	stunURL   *url.URL
+	relay     string
+	relayURL  *url.URL
+}
+
+const (
+	// sessionIDLength is the number of random bytes genSessionID generates
+	// for one session ID, before base64 encoding.
+	sessionIDLength = 16
+)
+
+// Package-level state shared by the whole proxy; everything except client is
+// initialised in main.
+//
+//   - tokens: buffered channel created with opt.capacity slots and pre-filled
+//     in main. getToken receives from it and retToken sends to it.
+//   - opt: the parsed command-line options.
+//   - config: WebRTC configuration built from the -stun flag and passed to
+//     makePeerConnectionFromOffer.
+//   - client: zero-value http.Client used by pollOffer and sendAnswer.
+//     NOTE(review): no Timeout is set on it, so broker requests can wait
+//     indefinitely — confirm that this is intended.
+var (
+	tokens chan bool
+	opt    *snowflakeOptions
+	config *webrtc.Configuration
+	client http.Client
+)
+
+// webRTCConn exposes a WebRTC data channel through the net.Conn method set so
+// that it can be handed to CopyLoopTimeout. Outgoing bytes go straight to the
+// data channel; incoming bytes are read from a pipe whose writing end is fed
+// by the channel's OnMessage callback.
+type webRTCConn struct {
+	pc *webrtc.PeerConnection
+	dc *webrtc.DataChannel
+	pr *io.PipeReader
+}
+
+// errNotImplemented builds the error returned by the webRTCConn methods that
+// have no real implementation.
+func errNotImplemented(method string) error {
+	return fmt.Errorf("%s not implemented", method)
+}
+
+// Read reads from the pipe that carries the data channel's incoming messages.
+func (wc *webRTCConn) Read(b []byte) (int, error) {
+	return wc.pr.Read(b)
+}
+
+// Write hands b to the data channel and always reports all of b as written.
+func (wc *webRTCConn) Write(b []byte) (int, error) {
+	size := len(b)
+	log.Printf("Write %d bytes --> WebRTC", size)
+	wc.dc.Send(b)
+	return size, nil
+}
+
+// Close closes the underlying peer connection.
+func (wc *webRTCConn) Close() error {
+	return wc.pc.Close()
+}
+
+// LocalAddr always returns nil.
+func (wc *webRTCConn) LocalAddr() net.Addr {
+	return nil
+}
+
+// RemoteAddr always returns nil.
+func (wc *webRTCConn) RemoteAddr() net.Addr {
+	return nil
+}
+
+// SetDeadline is not supported and always returns an error.
+func (wc *webRTCConn) SetDeadline(t time.Time) error {
+	return errNotImplemented("SetDeadline")
+}
+
+// SetReadDeadline is not supported and always returns an error.
+func (wc *webRTCConn) SetReadDeadline(t time.Time) error {
+	return errNotImplemented("SetReadDeadline")
+}
+
+// SetWriteDeadline is not supported and always returns an error.
+func (wc *webRTCConn) SetWriteDeadline(t time.Time) error {
+	return errNotImplemented("SetWriteDeadline")
+}
+
+// getToken takes one token out of the tokens channel, blocking until one is
+// available.
+func getToken() {
+	<-tokens
+}
+
+// retToken puts one token back into the tokens channel. The send blocks if
+// the channel's buffer is already full.
+func retToken() {
+	tokens <- true
+}
+
+func genSessionID() string {
+	buf := make([]byte, sessionIDLength)
+	_, err := rand.Read(buf)
+	if err != nil {
+		panic(err.Error())
+	}
+	return strings.TrimRight(base64.StdEncoding.EncodeToString(buf), "=")
+}
+
+func pollOffer(sid string, broker *url.URL) *webrtc.SessionDescription {
+	broker.Path = "/proxy"
+	for {
+		req, _ := http.NewRequest("POST", broker.String(), bytes.NewBuffer([]byte(sid)))
+		req.Header.Set("X-Session-ID", sid)
+		resp, err := client.Do(req)
+		if err != nil {
+			log.Printf("error to poll broker: %s", err)
+		} else {
+			defer resp.Body.Close()
+			if resp.StatusCode != http.StatusOK {
+				log.Printf("broker returns: %d", resp.StatusCode)
+			} else {
+				body, err := ioutil.ReadAll(resp.Body)
+				if err != nil {
+					log.Printf("error to read broker responses: %s", err)
+				} else {
+					return webrtc.DeserializeSessionDescription(string(body))
+				}
+			}
+		}
+	}
+}
+
+// sendAnswer posts pc's serialized local description to the broker's /answer
+// endpoint, identifying the session through the X-Session-ID header.
+//
+// It returns an error if the request cannot be built or sent, or if the
+// broker replies with anything other than 200 OK. The broker URL is copied
+// before its path is set, so the caller's URL is left untouched, and the
+// response body is always closed.
+func sendAnswer(sid string, broker *url.URL, pc *webrtc.PeerConnection) error {
+	endpoint := *broker
+	endpoint.Path = "/answer"
+	body := bytes.NewBufferString(pc.LocalDescription().Serialize())
+	req, err := http.NewRequest("POST", endpoint.String(), body)
+	if err != nil {
+		return err
+	}
+	req.Header.Set("X-Session-ID", sid)
+	resp, err := client.Do(req)
+	if err != nil {
+		return err
+	}
+	defer resp.Body.Close()
+	if resp.StatusCode != http.StatusOK {
+		return fmt.Errorf("broker returned %d", resp.StatusCode)
+	}
+	return nil
+}
+
+// timeoutConn wraps a net.Conn and, before every Read and Write, moves the
+// connection's deadline to t after the current time.
+type timeoutConn struct {
+	c net.Conn
+	t time.Duration
+}
+
+// touch pushes the wrapped connection's deadline t into the future. The
+// result of SetDeadline is discarded, so a connection whose SetDeadline
+// reports an error is still read from and written to.
+func (tc timeoutConn) touch() {
+	tc.c.SetDeadline(time.Now().Add(tc.t))
+}
+
+// Read refreshes the deadline and then reads from the wrapped connection.
+func (tc timeoutConn) Read(buf []byte) (int, error) {
+	tc.touch()
+	return tc.c.Read(buf)
+}
+
+// Write refreshes the deadline and then writes to the wrapped connection.
+func (tc timeoutConn) Write(buf []byte) (int, error) {
+	tc.touch()
+	return tc.c.Write(buf)
+}
+
+// Close closes the wrapped connection.
+func (tc timeoutConn) Close() error {
+	return tc.c.Close()
+}
+
+// CopyLoopTimeout relays data between c1 and c2 in both directions and
+// returns once both directions have stopped. Each connection is wrapped in a
+// timeoutConn, so every read and write first sets a deadline of timeout from
+// the current time. When either direction's copy ends, both connections are
+// closed.
+func CopyLoopTimeout(c1 net.Conn, c2 net.Conn, timeout time.Duration) {
+	ends := [2]timeoutConn{
+		{c: c1, t: timeout},
+		{c: c2, t: timeout},
+	}
+
+	var pending sync.WaitGroup
+	pending.Add(len(ends))
+	for i := range ends {
+		// One goroutine per direction: first c2 -> c1, then c1 -> c2.
+		dst, src := ends[i], ends[1-i]
+		go func() {
+			defer pending.Done()
+			io.Copy(dst, src)
+			dst.Close()
+			src.Close()
+		}()
+	}
+	pending.Wait()
+}
+
+// datachannelHandler serves one client: it dials the websocket relay named by
+// opt.relay (also used as the origin) and shuttles data between conn and the
+// relay, with a one-minute deadline refreshed on every read and write, until
+// the copy loop ends. On every exit path the session's token is returned and
+// conn is closed.
+func datachannelHandler(conn *webRTCConn) {
+	// Runs last: give the token back first, then close the WebRTC side.
+	defer func() {
+		retToken()
+		conn.Close()
+	}()
+
+	relayConn, dialErr := websocket.Dial(opt.relay, "", opt.relay)
+	if dialErr != nil {
+		log.Printf("error to dial relay: %s", dialErr)
+		return
+	}
+	log.Printf("connected to relay")
+	defer relayConn.Close()
+
+	relayConn.PayloadType = websocket.BinaryFrame
+	CopyLoopTimeout(conn, relayConn, time.Minute)
+	log.Printf("datachannelHandler ends")
+}
+
+// makePeerConnectionFromOffer creates a PeerConnection from an SDP offer. It
+// blocks until the gathering of ICE candidates is complete and the answer is
+// available in LocalDescription, or until creating or installing the answer
+// fails. It installs an OnDataChannel callback that creates a webRTCConn and
+// passes it to datachannelHandler in a new goroutine.
+//
+// On any error the PeerConnection is closed (if it was created) and nil is
+// returned together with the error.
+func makePeerConnectionFromOffer(sdp *webrtc.SessionDescription, config *webrtc.Configuration) (*webrtc.PeerConnection, error) {
+	// Both channels have room for one value so that their senders never
+	// block, even when the signal arrives after this function has returned.
+	errChan := make(chan error, 1)
+	answerChan := make(chan struct{}, 1)
+
+	pc, err := webrtc.NewPeerConnection(config)
+	if err != nil {
+		return nil, fmt.Errorf("accept: NewPeerConnection: %s", err)
+	}
+	pc.OnNegotiationNeeded = func() {
+		panic("OnNegotiationNeeded")
+	}
+	pc.OnIceComplete = func() {
+		// Non-blocking send: a repeated or late notification is dropped
+		// instead of stalling the callback.
+		select {
+		case answerChan <- struct{}{}:
+		default:
+		}
+	}
+	pc.OnDataChannel = func(dc *webrtc.DataChannel) {
+		log.Println("OnDataChannel")
+
+		// Incoming messages are written to pw; webRTCConn.Read drains pr.
+		pr, pw := io.Pipe()
+
+		dc.OnOpen = func() {
+			log.Println("OnOpen channel")
+		}
+		dc.OnClose = func() {
+			log.Println("OnClose channel")
+			pw.Close()
+		}
+		dc.OnMessage = func(msg []byte) {
+			log.Printf("OnMessage <--- %d bytes", len(msg))
+			n, err := pw.Write(msg)
+			if err != nil {
+				// A failed write is reported to the reader through the
+				// pipe; it must not fall through to the short-write
+				// check below, which would panic.
+				pw.CloseWithError(err)
+				return
+			}
+			if n != len(msg) {
+				panic("short write")
+			}
+		}
+		conn := &webRTCConn{pc: pc, dc: dc, pr: pr}
+		go datachannelHandler(conn)
+	}
+
+	err = pc.SetRemoteDescription(sdp)
+	if err != nil {
+		pc.Close()
+		return nil, fmt.Errorf("accept: SetRemoteDescription: %s", err)
+	}
+	log.Println("sdp offer successfully received.")
+
+	go func() {
+		log.Println("Generating answer...")
+		answer, err := pc.CreateAnswer() // blocking
+		if err != nil {
+			errChan <- err
+			return
+		}
+		err = pc.SetLocalDescription(answer)
+		if err != nil {
+			errChan <- err
+			return
+		}
+	}()
+
+	// Wait until the answer is ready or generating it has failed.
+	select {
+	case err = <-errChan:
+		pc.Close()
+		return nil, err
+	case <-answerChan:
+	}
+	return pc, nil
+}
+
+// runSession runs one signalling round for session sid: it waits for a client
+// offer from the broker, builds a PeerConnection that answers it, and posts
+// the answer back to the broker.
+//
+// If any of these steps fails, the failure is logged and the session's token
+// is returned here. After a successful round the token is kept; it is
+// returned by datachannelHandler once that handler finishes.
+// NOTE(review): if the client never opens a data channel, nothing visible in
+// this file returns the token — confirm whether that is handled elsewhere.
+func runSession(sid string) {
+	answered := false
+	defer func() {
+		if !answered {
+			retToken()
+		}
+	}()
+
+	offer := pollOffer(sid, opt.brokerURL)
+	if offer == nil {
+		log.Printf("bad offer from broker")
+		return
+	}
+
+	pc, err := makePeerConnectionFromOffer(offer, config)
+	if err != nil {
+		log.Printf("error to make WebRTC connection: %s", err)
+		return
+	}
+
+	if err = sendAnswer(sid, opt.brokerURL, pc); err != nil {
+		log.Printf("error to send answer to client through broker: %s", err)
+		pc.Close()
+		return
+	}
+	answered = true
+}
+
+// main parses the command-line flags, validates them, builds the WebRTC
+// configuration and the token pool, and then runs sessions forever. A new
+// session is started only after a token has been taken from the pool.
+func main() {
+	opt = new(snowflakeOptions)
+	flag.IntVar(&opt.capacity, "capacity", 10, "maximum concurrent clients")
+	flag.StringVar(&opt.broker, "broker", "https://snowflake-reg.appspot.com/", "broker URL")
+	flag.StringVar(&opt.relay, "relay", "wss://snowflake.bamsoftware.com/", "websocket relay URL")
+	flag.StringVar(&opt.stun, "stun", "stun:stun.l.google.com:19302", "stun URL")
+	flag.Parse()
+
+	// A negative capacity would make the channel allocation below panic,
+	// and a capacity of zero would leave the pool empty so that getToken
+	// blocks forever.
+	if opt.capacity <= 0 {
+		log.Fatalf("invalid capacity: %d (must be positive)", opt.capacity)
+	}
+
+	var err error
+	opt.brokerURL, err = url.Parse(opt.broker)
+	if err != nil {
+		log.Fatalf("invalid broker url: %s", err)
+	}
+	opt.stunURL, err = url.Parse(opt.stun)
+	if err != nil {
+		log.Fatalf("invalid stun url: %s", err)
+	}
+	opt.relayURL, err = url.Parse(opt.relay)
+	if err != nil {
+		log.Fatalf("invalid relay url: %s", err)
+	}
+
+	config = webrtc.NewConfiguration(webrtc.OptionIceServer(opt.stun))
+
+	// Fill the pool with one token per allowed concurrent client.
+	tokens = make(chan bool, opt.capacity)
+	for i := 0; i < opt.capacity; i++ {
+		tokens <- true
+	}
+
+	for {
+		getToken()
+		sessionID := genSessionID()
+		runSession(sessionID)
+	}
+}





More information about the tor-commits mailing list