commit f86f5b5b07d8b9409a58d85f3169e85446fd17e8 Author: Serene Han keroserene+git@gmail.com Date: Sun Feb 21 22:03:17 2016 -0800
move webRTCConn into separate file --- client/snowflake.go | 247 -------------------------------------------------- client/webrtc.go | 255 ++++++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 255 insertions(+), 247 deletions(-)
diff --git a/client/snowflake.go b/client/snowflake.go index 4e31f15..cc7f547 100644 --- a/client/snowflake.go +++ b/client/snowflake.go @@ -3,10 +3,8 @@ package main
import ( "bufio" - "bytes" "errors" "flag" - "fmt" "io" "log" "net" @@ -14,7 +12,6 @@ import ( "os/signal" "sync" "syscall" - "time"
"git.torproject.org/pluggable-transports/goptlib.git" "github.com/keroserene/go-webrtc" @@ -57,250 +54,6 @@ type SnowflakeChannel interface { Close() error }
-// Implements net.Conn interface -type webRTCConn struct { - config *webrtc.Configuration - pc *webrtc.PeerConnection - snowflake SnowflakeChannel // Interface holding the WebRTC DataChannel. - broker *BrokerChannel - offerChannel chan *webrtc.SessionDescription - answerChannel chan *webrtc.SessionDescription - errorChannel chan error - writeChannel chan []byte - recvPipe *io.PipeReader - writePipe *io.PipeWriter - buffer bytes.Buffer - reset chan struct{} - *BytesInfo -} - -var webrtcRemote *webRTCConn - -func (c *webRTCConn) Read(b []byte) (int, error) { - return c.recvPipe.Read(b) -} - -func (c *webRTCConn) Write(b []byte) (int, error) { - c.SendData(b) - return len(b), nil -} - -func (c *webRTCConn) Close() error { - // Data channel closed implicitly? - return c.pc.Close() -} - -func (c *webRTCConn) LocalAddr() net.Addr { - return nil -} - -func (c *webRTCConn) RemoteAddr() net.Addr { - return nil -} - -func (c *webRTCConn) SetDeadline(t time.Time) error { - return fmt.Errorf("SetDeadline not implemented") -} - -func (c *webRTCConn) SetReadDeadline(t time.Time) error { - return fmt.Errorf("SetReadDeadline not implemented") -} - -func (c *webRTCConn) SetWriteDeadline(t time.Time) error { - return fmt.Errorf("SetWriteDeadline not implemented") -} - -func (c *webRTCConn) PreparePeerConnection() { - if nil != c.pc { - c.pc.Close() - c.pc = nil - } - pc, err := webrtc.NewPeerConnection(c.config) - if err != nil { - log.Printf("NewPeerConnection: %s", err) - c.errorChannel <- err - } - // Prepare PeerConnection callbacks. - pc.OnNegotiationNeeded = func() { - log.Println("WebRTC: OnNegotiationNeeded") - go func() { - offer, err := pc.CreateOffer() - // TODO: Potentially timeout and retry if ICE isn't working. - if err != nil { - c.errorChannel <- err - return - } - err = pc.SetLocalDescription(offer) - if err != nil { - c.errorChannel <- err - return - } - }() - } - // Allow candidates to accumulate until OnIceComplete. - pc.OnIceCandidate = func(candidate webrtc.IceCandidate) { - log.Printf(candidate.Candidate) - } - // TODO: This may soon be deprecated, consider OnIceGatheringStateChange. - pc.OnIceComplete = func() { - log.Printf("WebRTC: OnIceComplete") - c.offerChannel <- pc.LocalDescription() - } - // This callback is not expected, as the Client initiates the creation - // of the data channel, not the remote peer. - pc.OnDataChannel = func(channel *webrtc.DataChannel) { - log.Println("OnDataChannel") - panic("Unexpected OnDataChannel!") - } - c.pc = pc - log.Println("WebRTC: PeerConnection created.") -} - -// Create a WebRTC DataChannel locally. -func (c *webRTCConn) EstablishDataChannel() error { - dc, err := c.pc.CreateDataChannel("snowflake", webrtc.Init{}) - // Triggers "OnNegotiationNeeded" on the PeerConnection, which will prepare - // an SDP offer while other goroutines operating on this struct handle the - // signaling. Eventually fires "OnOpen". - if err != nil { - log.Printf("CreateDataChannel: %s", err) - return err - } - dc.OnOpen = func() { - log.Println("WebRTC: DataChannel.OnOpen") - if nil != c.snowflake { - panic("PeerConnection snowflake already exists.") - } - // Flush the buffer, then enable datachannel. - dc.Send(c.buffer.Bytes()) - log.Println("Flushed", c.buffer.Len(), "bytes") - c.buffer.Reset() - c.snowflake = dc - } - dc.OnClose = func() { - // Disable the DataChannel as a write destination. - // Future writes will go to the buffer until a new DataChannel is available. - log.Println("WebRTC: DataChannel.OnClose") - // Only reset if this OnClose was triggered remotely. - if nil != c.snowflake { - c.snowflake = nil - c.Reset() - } - } - dc.OnMessage = func(msg []byte) { - c.BytesInfo.AddInbound(len(msg)) - n, err := c.writePipe.Write(msg) - if err != nil { - // TODO: Maybe shouldn't actually close. - log.Println("Error writing to SOCKS pipe") - c.writePipe.CloseWithError(err) - } - if n != len(msg) { - panic("short write") - } - } - log.Println("WebRTC: DataChannel created.") - return nil -} - -// Block until an offer is available, then send it to either -// the Broker or signal pipe. -func (c *webRTCConn) SendOffer() error { - select { - case offer := <-c.offerChannel: - if "" == brokerURL { - log.Printf("Please Copy & Paste the following to the peer:") - log.Printf("----------------") - fmt.Fprintln(logFile, "\n"+offer.Serialize()+"\n") - log.Printf("----------------") - return nil - } - // Otherwise, use Broker. - go func() { - log.Println("Sending offer via BrokerChannel...\nTarget URL: ", brokerURL, - "\nFront URL: ", frontDomain) - answer, err := c.broker.Negotiate(c.pc.LocalDescription()) - if nil != err || nil == answer { - log.Printf("BrokerChannel error: %s", err) - answer = nil - } - c.answerChannel <- answer - }() - case err := <-c.errorChannel: - c.pc.Close() - return err - } - return nil -} - -func (c *webRTCConn) ReceiveAnswer() { - go func() { - answer, ok := <-c.answerChannel - if !ok || nil == answer { - log.Printf("Failed to retrieve answer. Retrying in %d seconds", ReconnectTimeout) - <-time.After(time.Second * ReconnectTimeout) - c.Reset() - return - } - log.Printf("Received Answer:\n\n%s\n", answer.Sdp) - err := c.pc.SetRemoteDescription(answer) - if nil != err { - c.errorChannel <- err - } - }() -} - -func (c *webRTCConn) SendData(data []byte) { - c.BytesInfo.AddOutbound(len(data)) - // Buffer the data in case datachannel isn't available yet. - if nil == c.snowflake { - log.Printf("Buffered %d bytes --> WebRTC", len(data)) - c.buffer.Write(data) - return - } - c.writeChannel <- data -} - -// Expected in own goroutine. -func (c *webRTCConn) SendLoop() { - log.Println("send loop") - for data := range c.writeChannel { - // Flush buffer if necessary. - for c.buffer.Len() > 0 { - c.snowflake.Send(c.buffer.Bytes()) - log.Println("Flushed", c.buffer.Len(), "bytes") - c.buffer.Reset() - } - c.snowflake.Send(data) - } -} - -// WebRTC re-establishment loop. Expected in own goroutine. -func (c *webRTCConn) ConnectLoop() { - for { - log.Println("Establishing WebRTC connection...") - // TODO: When go-webrtc is more stable, it's possible that a new - // PeerConnection won't need to be re-prepared each time. - c.PreparePeerConnection() - err := c.EstablishDataChannel() - if err == nil { - c.SendOffer() - c.ReceiveAnswer() - <-c.reset - log.Println(" --- snowflake connection reset ---") - } else { - log.Println("WebRTC: Could not establish DataChannel.") - } - } -} - -func (c *webRTCConn) Reset() { - go func() { - c.reset <- struct{}{} // Attempt to negotiate a new datachannel.. - log.Println("WebRTC resetting...") - }() -} - // Initialize a WebRTC Connection. func dialWebRTC(config *webrtc.Configuration, broker *BrokerChannel) ( *webRTCConn, error) { diff --git a/client/webrtc.go b/client/webrtc.go new file mode 100644 index 0000000..b1e1628 --- /dev/null +++ b/client/webrtc.go @@ -0,0 +1,255 @@ +package main + +import ( + "bytes" + "fmt" + "github.com/keroserene/go-webrtc" + "io" + "log" + "net" + "time" +) + +// Implements net.Conn interface +type webRTCConn struct { + config *webrtc.Configuration + pc *webrtc.PeerConnection + snowflake SnowflakeChannel // Interface holding the WebRTC DataChannel. + broker *BrokerChannel + offerChannel chan *webrtc.SessionDescription + answerChannel chan *webrtc.SessionDescription + errorChannel chan error + writeChannel chan []byte + recvPipe *io.PipeReader + writePipe *io.PipeWriter + buffer bytes.Buffer + reset chan struct{} + *BytesInfo +} + +var webrtcRemote *webRTCConn + +func (c *webRTCConn) Read(b []byte) (int, error) { + return c.recvPipe.Read(b) +} + +func (c *webRTCConn) Write(b []byte) (int, error) { + c.SendData(b) + return len(b), nil +} + +func (c *webRTCConn) Close() error { + // Data channel closed implicitly? + return c.pc.Close() +} + +func (c *webRTCConn) LocalAddr() net.Addr { + return nil +} + +func (c *webRTCConn) RemoteAddr() net.Addr { + return nil +} + +func (c *webRTCConn) SetDeadline(t time.Time) error { + return fmt.Errorf("SetDeadline not implemented") +} + +func (c *webRTCConn) SetReadDeadline(t time.Time) error { + return fmt.Errorf("SetReadDeadline not implemented") +} + +func (c *webRTCConn) SetWriteDeadline(t time.Time) error { + return fmt.Errorf("SetWriteDeadline not implemented") +} + +func (c *webRTCConn) PreparePeerConnection() { + if nil != c.pc { + c.pc.Close() + c.pc = nil + } + pc, err := webrtc.NewPeerConnection(c.config) + if err != nil { + log.Printf("NewPeerConnection: %s", err) + c.errorChannel <- err + } + // Prepare PeerConnection callbacks. + pc.OnNegotiationNeeded = func() { + log.Println("WebRTC: OnNegotiationNeeded") + go func() { + offer, err := pc.CreateOffer() + // TODO: Potentially timeout and retry if ICE isn't working. + if err != nil { + c.errorChannel <- err + return + } + err = pc.SetLocalDescription(offer) + if err != nil { + c.errorChannel <- err + return + } + }() + } + // Allow candidates to accumulate until OnIceComplete. + pc.OnIceCandidate = func(candidate webrtc.IceCandidate) { + log.Printf(candidate.Candidate) + } + // TODO: This may soon be deprecated, consider OnIceGatheringStateChange. + pc.OnIceComplete = func() { + log.Printf("WebRTC: OnIceComplete") + c.offerChannel <- pc.LocalDescription() + } + // This callback is not expected, as the Client initiates the creation + // of the data channel, not the remote peer. + pc.OnDataChannel = func(channel *webrtc.DataChannel) { + log.Println("OnDataChannel") + panic("Unexpected OnDataChannel!") + } + c.pc = pc + log.Println("WebRTC: PeerConnection created.") +} + +// Create a WebRTC DataChannel locally. +func (c *webRTCConn) EstablishDataChannel() error { + dc, err := c.pc.CreateDataChannel("snowflake", webrtc.Init{}) + // Triggers "OnNegotiationNeeded" on the PeerConnection, which will prepare + // an SDP offer while other goroutines operating on this struct handle the + // signaling. Eventually fires "OnOpen". + if err != nil { + log.Printf("CreateDataChannel: %s", err) + return err + } + dc.OnOpen = func() { + log.Println("WebRTC: DataChannel.OnOpen") + if nil != c.snowflake { + panic("PeerConnection snowflake already exists.") + } + // Flush the buffer, then enable datachannel. + dc.Send(c.buffer.Bytes()) + log.Println("Flushed", c.buffer.Len(), "bytes") + c.buffer.Reset() + c.snowflake = dc + } + dc.OnClose = func() { + // Disable the DataChannel as a write destination. + // Future writes will go to the buffer until a new DataChannel is available. + log.Println("WebRTC: DataChannel.OnClose") + // Only reset if this OnClose was triggered remotely. + if nil != c.snowflake { + c.snowflake = nil + c.Reset() + } + } + dc.OnMessage = func(msg []byte) { + c.BytesInfo.AddInbound(len(msg)) + n, err := c.writePipe.Write(msg) + if err != nil { + // TODO: Maybe shouldn't actually close. + log.Println("Error writing to SOCKS pipe") + c.writePipe.CloseWithError(err) + } + if n != len(msg) { + panic("short write") + } + } + log.Println("WebRTC: DataChannel created.") + return nil +} + +// Block until an offer is available, then send it to either +// the Broker or signal pipe. +func (c *webRTCConn) SendOffer() error { + select { + case offer := <-c.offerChannel: + if "" == brokerURL { + log.Printf("Please Copy & Paste the following to the peer:") + log.Printf("----------------") + fmt.Fprintln(logFile, "\n"+offer.Serialize()+"\n") + log.Printf("----------------") + return nil + } + // Otherwise, use Broker. + go func() { + log.Println("Sending offer via BrokerChannel...\nTarget URL: ", brokerURL, + "\nFront URL: ", frontDomain) + answer, err := c.broker.Negotiate(c.pc.LocalDescription()) + if nil != err || nil == answer { + log.Printf("BrokerChannel error: %s", err) + answer = nil + } + c.answerChannel <- answer + }() + case err := <-c.errorChannel: + c.pc.Close() + return err + } + return nil +} + +func (c *webRTCConn) ReceiveAnswer() { + go func() { + answer, ok := <-c.answerChannel + if !ok || nil == answer { + log.Printf("Failed to retrieve answer. Retrying in %d seconds", ReconnectTimeout) + <-time.After(time.Second * ReconnectTimeout) + c.Reset() + return + } + log.Printf("Received Answer:\n\n%s\n", answer.Sdp) + err := c.pc.SetRemoteDescription(answer) + if nil != err { + c.errorChannel <- err + } + }() +} + +func (c *webRTCConn) SendData(data []byte) { + c.BytesInfo.AddOutbound(len(data)) + // Buffer the data in case datachannel isn't available yet. + if nil == c.snowflake { + log.Printf("Buffered %d bytes --> WebRTC", len(data)) + c.buffer.Write(data) + return + } + c.writeChannel <- data +} + +// Expected in own goroutine. +func (c *webRTCConn) SendLoop() { + log.Println("send loop") + for data := range c.writeChannel { + // Flush buffer if necessary. + for c.buffer.Len() > 0 { + c.snowflake.Send(c.buffer.Bytes()) + log.Println("Flushed", c.buffer.Len(), "bytes") + c.buffer.Reset() + } + c.snowflake.Send(data) + } +} + +// WebRTC re-establishment loop. Expected in own goroutine. +func (c *webRTCConn) ConnectLoop() { + for { + log.Println("Establishing WebRTC connection...") + // TODO: When go-webrtc is more stable, it's possible that a new + // PeerConnection won't need to be re-prepared each time. + c.PreparePeerConnection() + err := c.EstablishDataChannel() + if err == nil { + c.SendOffer() + c.ReceiveAnswer() + <-c.reset + log.Println(" --- snowflake connection reset ---") + } else { + log.Println("WebRTC: Could not establish DataChannel.") + } + } +} + +func (c *webRTCConn) Reset() { + go func() { + c.reset <- struct{}{} // Attempt to negotiate a new datachannel.. + log.Println("WebRTC resetting...") + }() +}
tor-commits@lists.torproject.org