commit 3a7e0ea620107dd35f7cea3fac25dd3c8591b0c5 Author: Serene Han <keroserene+git@gmail.com> Date: Tue Feb 23 16:51:04 2016 -0800
simplify client handler & dialWebRTC with independent webRTCConn constructor --- client/client_test.go | 12 +++--- client/snowflake.go | 55 ++++++++++---------------- client/webrtc.go | 105 +++++++++++++++++++++++++++++++------------------- 3 files changed, 92 insertions(+), 80 deletions(-)
diff --git a/client/client_test.go b/client/client_test.go index 14e9393..cad40de 100644 --- a/client/client_test.go +++ b/client/client_test.go @@ -33,19 +33,19 @@ func TestConnect(t *testing.T) { } So(c.buffer.Bytes(), ShouldEqual, nil)
- Convey("SendData buffers when datachannel is nil", func() { - c.SendData([]byte("test")) + Convey("sendData buffers when datachannel is nil", func() { + c.sendData([]byte("test")) c.snowflake = nil So(c.buffer.Bytes(), ShouldResemble, []byte("test")) })
- Convey("SendData sends to datachannel when not nil", func() { + Convey("sendData sends to datachannel when not nil", func() { mock := new(MockDataChannel) mock.done = make(chan bool) go c.SendLoop() c.writeChannel = make(chan []byte) c.snowflake = mock - c.SendData([]byte("test")) + c.sendData([]byte("test")) <-mock.done So(c.buffer.Bytes(), ShouldEqual, nil) So(mock.destination.Bytes(), ShouldResemble, []byte("test")) @@ -55,7 +55,7 @@ func TestConnect(t *testing.T) { c.answerChannel = make(chan *webrtc.SessionDescription) c.config = webrtc.NewConfiguration() c.PreparePeerConnection() - c.ReceiveAnswer() + c.receiveAnswer() sdp := webrtc.DeserializeSessionDescription("test") c.answerChannel <- sdp So(c.pc.RemoteDescription(), ShouldEqual, sdp) @@ -65,7 +65,7 @@ func TestConnect(t *testing.T) { Convey("Receive answer fails on nil answer", func() { c.reset = make(chan struct{}) c.answerChannel = make(chan *webrtc.SessionDescription) - c.ReceiveAnswer() + c.receiveAnswer() c.answerChannel <- nil <-c.reset }) diff --git a/client/snowflake.go b/client/snowflake.go index cc7f547..29d263a 100644 --- a/client/snowflake.go +++ b/client/snowflake.go @@ -18,7 +18,8 @@ import ( )
var ptInfo pt.ClientInfo -var logFile *os.File + +// var logFile *os.File var brokerURL string var frontDomain string
@@ -35,13 +36,13 @@ func copyLoop(a, b net.Conn) { wg.Add(2) // TODO fix copy loop recovery go func() { - io.Copy(b, a) - log.Println("copy loop b-a break") + written, err := io.Copy(b, a) + log.Println("copy loop b-a break", err, written) wg.Done() }() go func() { - io.Copy(a, b) - log.Println("copy loop a-b break") + written, err := io.Copy(a, b) + log.Println("copy loop a-b break", err, written) wg.Done() }() wg.Wait() @@ -55,27 +56,22 @@ type SnowflakeChannel interface { }
// Initialize a WebRTC Connection. -func dialWebRTC(config *webrtc.Configuration, broker *BrokerChannel) ( - *webRTCConn, error) { - connection := new(webRTCConn) - connection.config = config - connection.broker = broker - connection.offerChannel = make(chan *webrtc.SessionDescription) - connection.answerChannel = make(chan *webrtc.SessionDescription) - connection.writeChannel = make(chan []byte) - connection.errorChannel = make(chan error) - connection.reset = make(chan struct{}) - connection.BytesInfo = &BytesInfo{ - inboundChan: make(chan int), outboundChan: make(chan int), - inbound: 0, outbound: 0, inEvents: 0, outEvents: 0, - } - go connection.BytesInfo.Log() +func dialWebRTC() (*webRTCConn, error) { + + // TODO: [#3] Fetch ICE server information from Broker. + // TODO: [#18] Consider TURN servers here too. + config := webrtc.NewConfiguration( + webrtc.OptionIceServer("stun:stun.l.google.com:19302"))
- // Pipes remain the same even when DataChannel gets switched. - connection.recvPipe, connection.writePipe = io.Pipe() + broker := NewBrokerChannel(brokerURL, frontDomain) + if nil == broker { + return nil, errors.New("Failed to prepare BrokerChannel") + }
+ connection := NewWebRTCConnection(config, broker) go connection.ConnectLoop() go connection.SendLoop() + return connection, nil }
@@ -105,16 +101,7 @@ func handler(conn *pt.SocksConn) error { defer conn.Close() log.Println("handler fired:", conn)
- // TODO: [#3] Fetch ICE server information from Broker. - // TODO: [#18] Consider TURN servers here too. - config := webrtc.NewConfiguration( - webrtc.OptionIceServer("stun:stun.l.google.com:19302")) - broker := NewBrokerChannel(brokerURL, frontDomain) - if nil == broker { - conn.Reject() - return errors.New("Failed to prepare BrokerChannel") - } - remote, err := dialWebRTC(config, broker) + remote, err := dialWebRTC() if err != nil { conn.Reject() return err @@ -174,12 +161,12 @@ func readSignalingMessages(f *os.File) { }
func main() { - var err error + // var err error webrtc.SetLoggingVerbosity(1) flag.StringVar(&brokerURL, "url", "", "URL of signaling broker") flag.StringVar(&frontDomain, "front", "", "front domain") flag.Parse() - logFile, err = os.OpenFile("snowflake.log", os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0600) + logFile, err := os.OpenFile("snowflake.log", os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0600) if err != nil { log.Fatal(err) } diff --git a/client/webrtc.go b/client/webrtc.go index b1e1628..89d1592 100644 --- a/client/webrtc.go +++ b/client/webrtc.go @@ -34,7 +34,7 @@ func (c *webRTCConn) Read(b []byte) (int, error) { }
func (c *webRTCConn) Write(b []byte) (int, error) { - c.SendData(b) + c.sendData(b) return len(b), nil }
@@ -63,7 +63,64 @@ func (c *webRTCConn) SetWriteDeadline(t time.Time) error { return fmt.Errorf("SetWriteDeadline not implemented") }
-func (c *webRTCConn) PreparePeerConnection() { +func NewWebRTCConnection(config *webrtc.Configuration, + broker *BrokerChannel) *webRTCConn { + connection := new(webRTCConn) + connection.config = config + connection.broker = broker + connection.offerChannel = make(chan *webrtc.SessionDescription) + connection.answerChannel = make(chan *webrtc.SessionDescription) + connection.writeChannel = make(chan []byte) + connection.errorChannel = make(chan error) + connection.reset = make(chan struct{}) + + // Log every few seconds. + connection.BytesInfo = &BytesInfo{ + inboundChan: make(chan int), outboundChan: make(chan int), + inbound: 0, outbound: 0, inEvents: 0, outEvents: 0, + } + go connection.BytesInfo.Log() + + // Pipes remain the same even when DataChannel gets switched. + connection.recvPipe, connection.writePipe = io.Pipe() + + return connection +} + +// WebRTC re-establishment loop. Expected in own goroutine. +func (c *webRTCConn) ConnectLoop() { + for { + log.Println("Establishing WebRTC connection...") + // TODO: When go-webrtc is more stable, it's possible that a new + // PeerConnection won't need to be re-prepared each time. + c.preparePeerConnection() + err := c.establishDataChannel() + if err == nil { + c.sendOffer() + c.receiveAnswer() + <-c.reset + log.Println(" --- snowflake connection reset ---") + } else { + log.Println("WebRTC: Could not establish DataChannel.") + } + } +} + +// Expected in own goroutine. +func (c *webRTCConn) SendLoop() { + log.Println("send loop") + for data := range c.writeChannel { + // Flush buffer if necessary. + for c.buffer.Len() > 0 { + c.snowflake.Send(c.buffer.Bytes()) + log.Println("Flushed", c.buffer.Len(), "bytes") + c.buffer.Reset() + } + c.snowflake.Send(data) + } +} + +func (c *webRTCConn) preparePeerConnection() { if nil != c.pc { c.pc.Close() c.pc = nil @@ -110,7 +167,7 @@ func (c *webRTCConn) PreparePeerConnection() { }
// Create a WebRTC DataChannel locally. -func (c *webRTCConn) EstablishDataChannel() error { +func (c *webRTCConn) establishDataChannel() error { dc, err := c.pc.CreateDataChannel("snowflake", webrtc.Init{}) // Triggers "OnNegotiationNeeded" on the PeerConnection, which will prepare // an SDP offer while other goroutines operating on this struct handle the @@ -158,13 +215,14 @@ func (c *webRTCConn) EstablishDataChannel() error {
// Block until an offer is available, then send it to either // the Broker or signal pipe. -func (c *webRTCConn) SendOffer() error { +func (c *webRTCConn) sendOffer() error { + log.Println("sendOffer...") select { case offer := <-c.offerChannel: if "" == brokerURL { log.Printf("Please Copy & Paste the following to the peer:") log.Printf("----------------") - fmt.Fprintln(logFile, "\n"+offer.Serialize()+"\n") + log.Printf("\n" + offer.Serialize() + "\n") log.Printf("----------------") return nil } @@ -186,7 +244,7 @@ func (c *webRTCConn) SendOffer() error { return nil }
-func (c *webRTCConn) ReceiveAnswer() { +func (c *webRTCConn) receiveAnswer() { go func() { answer, ok := <-c.answerChannel if !ok || nil == answer { @@ -203,7 +261,7 @@ func (c *webRTCConn) ReceiveAnswer() { }() }
-func (c *webRTCConn) SendData(data []byte) { +func (c *webRTCConn) sendData(data []byte) { c.BytesInfo.AddOutbound(len(data)) // Buffer the data in case datachannel isn't available yet. if nil == c.snowflake { @@ -214,39 +272,6 @@ func (c *webRTCConn) SendData(data []byte) { c.writeChannel <- data }
-// Expected in own goroutine. -func (c *webRTCConn) SendLoop() { - log.Println("send loop") - for data := range c.writeChannel { - // Flush buffer if necessary. - for c.buffer.Len() > 0 { - c.snowflake.Send(c.buffer.Bytes()) - log.Println("Flushed", c.buffer.Len(), "bytes") - c.buffer.Reset() - } - c.snowflake.Send(data) - } -} - -// WebRTC re-establishment loop. Expected in own goroutine. -func (c *webRTCConn) ConnectLoop() { - for { - log.Println("Establishing WebRTC connection...") - // TODO: When go-webrtc is more stable, it's possible that a new - // PeerConnection won't need to be re-prepared each time. - c.PreparePeerConnection() - err := c.EstablishDataChannel() - if err == nil { - c.SendOffer() - c.ReceiveAnswer() - <-c.reset - log.Println(" --- snowflake connection reset ---") - } else { - log.Println("WebRTC: Could not establish DataChannel.") - } - } -} - func (c *webRTCConn) Reset() { go func() { c.reset <- struct{}{} // Attempt to negotiate a new datachannel..