This is an automated email from the git hooks/post-receive script.
itchyonion pushed a commit to branch main in repository pluggable-transports/snowflake.
commit 5dd0a31d95052eca7e89cef8ccc527b74a1c6be5 Author: itchyonion itchyonion@torproject.org AuthorDate: Fri Feb 24 01:56:22 2023 -0800
Add comments and improve logging --- proxy/lib/snowflake.go | 107 +++++++++++++++++++++++++++++++------------------ 1 file changed, 69 insertions(+), 38 deletions(-)
diff --git a/proxy/lib/snowflake.go b/proxy/lib/snowflake.go index 9616132..9b25405 100644 --- a/proxy/lib/snowflake.go +++ b/proxy/lib/snowflake.go @@ -51,30 +51,37 @@ import ( "github.com/pion/webrtc/v3" )
-const DefaultBrokerURL = "https://snowflake-broker.torproject.net/" - -const DefaultNATProbeURL = "https://snowflake-broker.torproject.net:8443/probe" - -const DefaultRelayURL = "wss://snowflake.bamsoftware.com/" - -const DefaultSTUNURL = "stun:stun.l.google.com:19302" -const DefaultProxyType = "standalone" -const pollInterval = 5 * time.Second +const ( + DefaultBrokerURL = "https://snowflake-broker.torproject.net/" + DefaultNATProbeURL = "https://snowflake-broker.torproject.net:8443/probe" + DefaultRelayURL = "wss://snowflake.bamsoftware.com/" + DefaultSTUNURL = "stun:stun.l.google.com:19302" + DefaultProxyType = "standalone" +)
const ( // NATUnknown represents a NAT type which is unknown. NATUnknown = "unknown" + // NATRestricted represents a restricted NAT. NATRestricted = "restricted" + // NATUnrestricted represents an unrestricted NAT. NATUnrestricted = "unrestricted" )
-// amount of time after sending an SDP answer before the proxy assumes the -// client is not going to connect -const dataChannelTimeout = 20 * time.Second +const ( + pollInterval = 5 * time.Second + + // Amount of time after sending an SDP answer before the proxy assumes the + // client is not going to connect + dataChannelTimeout = 20 * time.Second + + //Maximum number of bytes to be read from an HTTP request + readLimit = 100000
-const readLimit = 100000 //Maximum number of bytes to be read from an HTTP request + sessionIDLength = 16 +)
var broker *SignalingServer
@@ -90,10 +97,6 @@ func getCurrentNATType() string { return currentNATType }
-const ( - sessionIDLength = 16 -) - var ( tokens *tokens_t config webrtc.Configuration @@ -184,16 +187,15 @@ func newSignalingServer(rawURL string, keepLocalAddresses bool) (*SignalingServe
// Post sends a POST request to the SignalingServer func (s *SignalingServer) Post(path string, payload io.Reader) ([]byte, error) { - req, err := http.NewRequest("POST", path, payload) if err != nil { return nil, err } + resp, err := s.transport.RoundTrip(req) if err != nil { return nil, err } - if resp.StatusCode != http.StatusOK { return nil, fmt.Errorf("remote returned status code %d", resp.StatusCode) } @@ -202,6 +204,8 @@ func (s *SignalingServer) Post(path string, payload io.Reader) ([]byte, error) { return limitedRead(resp.Body, readLimit) }
+// pollOffer communicates the proxy's capabilities to the broker +// and retrieves a compatible SDP offer. func (s *SignalingServer) pollOffer(sid string, proxyType string, acceptedRelayPattern string, shutdown chan struct{}) (*webrtc.SessionDescription, string) { brokerPath := s.url.ResolveReference(&url.URL{Path: "proxy"})
@@ -221,6 +225,7 @@ func (s *SignalingServer) pollOffer(sid string, proxyType string, acceptedRelayP log.Printf("Error encoding poll message: %s", err.Error()) return nil, "" } + resp, err := s.Post(brokerPath.String(), bytes.NewBuffer(body)) if err != nil { log.Printf("error polling broker: %s", err.Error()) @@ -239,7 +244,6 @@ func (s *SignalingServer) pollOffer(sid string, proxyType string, acceptedRelayP return nil, "" } return offer, relayURL - } } } @@ -247,7 +251,6 @@ func (s *SignalingServer) pollOffer(sid string, proxyType string, acceptedRelayP }
func (s *SignalingServer) sendAnswer(sid string, pc *webrtc.PeerConnection) error { - brokerPath := s.url.ResolveReference(&url.URL{Path: "answer"}) ld := pc.LocalDescription() if !s.keepLocalAddresses { ld = &webrtc.SessionDescription{ @@ -255,14 +258,18 @@ func (s *SignalingServer) sendAnswer(sid string, pc *webrtc.PeerConnection) erro SDP: util.StripLocalAddresses(ld.SDP), } } + answer, err := util.SerializeSessionDescription(ld) if err != nil { return err } + body, err := messages.EncodeAnswerRequest(answer, sid) if err != nil { return err } + + brokerPath := s.url.ResolveReference(&url.URL{Path: "answer"}) resp, err := s.Post(brokerPath.String(), bytes.NewBuffer(body)) if err != nil { return fmt.Errorf("error sending answer to broker: %s", err.Error()) @@ -307,8 +314,8 @@ func copyLoop(c1 io.ReadWriteCloser, c2 io.ReadWriteCloser, shutdown chan struct
// We pass conn.RemoteAddr() as an additional parameter, rather than calling // conn.RemoteAddr() inside this function, as a workaround for a hang that -// otherwise occurs inside of conn.pc.RemoteDescription() (called by -// RemoteAddr). https://bugs.torproject.org/18628#comment:8 +// otherwise occurs inside conn.pc.RemoteDescription() (called by RemoteAddr). +// https://bugs.torproject.org/18628#comment:8 func (sf *SnowflakeProxy) datachannelHandler(conn *webRTCConn, remoteAddr net.Addr, relayURL string) { defer conn.Close() defer tokens.ret() @@ -316,6 +323,7 @@ func (sf *SnowflakeProxy) datachannelHandler(conn *webRTCConn, remoteAddr net.Ad if relayURL == "" { relayURL = sf.RelayURL } + u, err := url.Parse(relayURL) if err != nil { log.Fatalf("invalid relay url: %s", err) @@ -336,8 +344,9 @@ func (sf *SnowflakeProxy) datachannelHandler(conn *webRTCConn, remoteAddr net.Ad log.Printf("error dialing relay: %s = %s", u.String(), err) return } + wsConn := websocketconn.New(ws) - log.Printf("connected to relay: %v", relayURL) + log.Printf("Connected to relay: %v", relayURL) defer wsConn.Close() copyLoop(conn, wsConn, sf.shutdown) log.Printf("datachannelHandler ends") @@ -367,6 +376,7 @@ func (sf *SnowflakeProxy) makeWebRTCAPI() *webrtc.API { // still have server reflexive candidates to fall back on settingsEngine.SetNAT1To1IPs([]string{sf.OutboundAddress}, webrtc.ICECandidateTypeHost) } + settingsEngine.SetICEMulticastDNSMode(ice.MulticastDNSModeDisabled)
return webrtc.NewAPI(webrtc.WithSettingEngine(settingsEngine)) @@ -386,20 +396,33 @@ func (sf *SnowflakeProxy) makePeerConnectionFromOffer(sdp *webrtc.SessionDescrip if err != nil { return nil, fmt.Errorf("accept: NewPeerConnection: %s", err) } + pc.OnDataChannel(func(dc *webrtc.DataChannel) { - log.Println("OnDataChannel") + log.Printf("New Data Channel %s-%d\n", dc.Label(), dc.ID()) close(dataChan)
pr, pw := io.Pipe() conn := newWebRTCConn(pc, dc, pr, sf.EventDispatcher)
dc.OnOpen(func() { - log.Println("OnOpen channel") + log.Printf("Data Channel %s-%d open\n", dc.Label(), dc.ID()) + + if sf.OutboundAddress != "" { + selectedCandidatePair, err := pc.SCTP().Transport().ICETransport().GetSelectedCandidatePair() + if err != nil { + log.Printf("Warning: couldn't get the selected candidate pair") + } + + log.Printf("Selected Local Candidate: %s:%d", selectedCandidatePair.Local.Address, selectedCandidatePair.Local.Port) + if sf.OutboundAddress != selectedCandidatePair.Local.Address { + log.Printf("Warning: the IP address provided by --outbound-address is not used for establishing peerconnection") + } + } }) dc.OnClose(func() { conn.lock.Lock() defer conn.lock.Unlock() - log.Println("OnClose channel") + log.Printf("Data Channel %s-%d close\n", dc.Label(), dc.ID()) log.Println(conn.bytesLogger.ThroughputSummary()) in, out := conn.bytesLogger.GetStat() conn.eventLogger.OnNewSnowflakeEvent(event.EventOnProxyConnectionOver{ @@ -437,7 +460,6 @@ func (sf *SnowflakeProxy) makePeerConnectionFromOffer(sdp *webrtc.SessionDescrip } return nil, fmt.Errorf("accept: SetRemoteDescription: %s", err) } - log.Println("sdp offer successfully received.")
log.Println("Generating answer...") answer, err := pc.CreateAnswer(nil) @@ -458,8 +480,11 @@ func (sf *SnowflakeProxy) makePeerConnectionFromOffer(sdp *webrtc.SessionDescrip } return nil, err } + // Wait for ICE candidate gathering to complete <-done + log.Printf("Answer: \n\t%s", strings.ReplaceAll(pc.LocalDescription().SDP, "\n", "\n\t")) + return pc, nil }
@@ -497,7 +522,7 @@ func (sf *SnowflakeProxy) makeNewPeerConnection(config webrtc.Configuration, pc.Close() return nil, err } - log.Println("WebRTC: Created offer") + log.Println("Probetest: Creating offer")
// As of v3.0.0, pion-webrtc uses trickle ICE by default. // We have to wait for candidate gathering to complete @@ -509,7 +534,7 @@ func (sf *SnowflakeProxy) makeNewPeerConnection(config webrtc.Configuration, pc.Close() return nil, err } - log.Println("WebRTC: Set local description") + log.Println("Probetest: Set local description")
// Wait for ICE candidate gathering to complete <-done @@ -523,6 +548,8 @@ func (sf *SnowflakeProxy) runSession(sid string) { tokens.ret() return } + log.Printf("Received Offer From Broker: \n\t%s", strings.ReplaceAll(offer.SDP, "\n", "\n\t")) + matcher := namematcher.NewNameMatcher(sf.RelayDomainNamePattern) parsedRelayURL, err := url.Parse(relayURL) if err != nil { @@ -530,11 +557,13 @@ func (sf *SnowflakeProxy) runSession(sid string) { tokens.ret() return } + if relayURL != "" && (!matcher.IsMember(parsedRelayURL.Hostname()) || (!sf.AllowNonTLSRelay && parsedRelayURL.Scheme != "wss")) { log.Printf("bad offer from broker: rejected Relay URL") tokens.ret() return } + dataChan := make(chan struct{}) dataChannelAdaptor := dataChannelHandlerWithRelayURL{RelayURL: relayURL, sf: sf} pc, err := sf.makePeerConnectionFromOffer(offer, config, dataChan, dataChannelAdaptor.datachannelHandler) @@ -543,6 +572,7 @@ func (sf *SnowflakeProxy) runSession(sid string) { tokens.ret() return } + err = broker.sendAnswer(sid, pc) if err != nil { log.Printf("error sending answer to client through broker: %s", err) @@ -557,7 +587,7 @@ func (sf *SnowflakeProxy) runSession(sid string) { // destroy the peer connection and return the token. select { case <-dataChan: - log.Println("Connection successful.") + log.Println("Connection successful") case <-time.After(dataChannelTimeout): log.Println("Timed out waiting for client to open data channel.") if err := pc.Close(); err != nil { @@ -622,11 +652,8 @@ func (sf *SnowflakeProxy) Start() error { } tokens = newTokens(sf.Capacity)
- // use probetest to determine NAT compatability sf.checkNATType(config, sf.NATProbeURL) - currentNATTypeLoaded := getCurrentNATType() - sf.EventDispatcher.OnNewSnowflakeEvent(&event.EventOnCurrentNATTypeDetermined{CurNATType: currentNATTypeLoaded})
NatRetestTask := task.Periodic{ @@ -663,14 +690,16 @@ func (sf *SnowflakeProxy) Stop() { close(sf.shutdown) }
+// checkNATType uses probetest to determine NAT compatibility by +// attempting to connect with a known symmetric NAT. On success, the NAT +// is considered "unrestricted"; on timeout, it is considered "restricted". func (sf *SnowflakeProxy) checkNATType(config webrtc.Configuration, probeURL string) { - probe, err := newSignalingServer(probeURL, false) if err != nil { log.Printf("Error parsing url: %s", err.Error()) }
- // create offer + // create offer used for probetest dataChan := make(chan struct{}) pc, err := sf.makeNewPeerConnection(config, dataChan) if err != nil { @@ -679,7 +708,7 @@ func (sf *SnowflakeProxy) checkNATType(config webrtc.Configuration, probeURL str }
offer := pc.LocalDescription() - log.Printf("Offer: \n\t%s", strings.ReplaceAll(offer.SDP, "\n", "\n\t")) + log.Printf("Probetest offer: \n\t%s", strings.ReplaceAll(offer.SDP, "\n", "\n\t")) sdp, err := util.SerializeSessionDescription(offer) if err != nil { log.Printf("Error encoding probe message: %s", err.Error()) @@ -692,6 +721,7 @@ func (sf *SnowflakeProxy) checkNATType(config webrtc.Configuration, probeURL str log.Printf("Error encoding probe message: %s", err.Error()) return } + resp, err := probe.Post(probe.url.String(), bytes.NewBuffer(body)) if err != nil { log.Printf("error polling probe: %s", err.Error()) @@ -703,11 +733,13 @@ func (sf *SnowflakeProxy) checkNATType(config webrtc.Configuration, probeURL str log.Printf("Error reading probe response: %s", err.Error()) return } + answer, err := util.DeserializeSessionDescription(sdp) if err != nil { log.Printf("Error setting answer: %s", err.Error()) return } + err = pc.SetRemoteDescription(*answer) if err != nil { log.Printf("Error setting answer: %s", err.Error()) @@ -745,5 +777,4 @@ func (sf *SnowflakeProxy) checkNATType(config webrtc.Configuration, probeURL str if err := pc.Close(); err != nil { log.Printf("error calling pc.Close: %v", err) } - }