commit b631fc0bc82957e07ad2a18ca1747eced2d6ca6e Author: David Fifield david@bamsoftware.com Date: Sun Jun 12 00:25:39 2011 -0700
Do RTMFP client and proxy using the new RTMFPSocket and ProxyPair. --- swfcat.as | 294 ++++++++++++++++++++++++++++++++++--------------------------- 1 files changed, 163 insertions(+), 131 deletions(-)
diff --git a/swfcat.as b/swfcat.as index ac52ec8..6125187 100644 --- a/swfcat.as +++ b/swfcat.as @@ -5,26 +5,24 @@ package import flash.display.StageScaleMode; import flash.text.TextField; import flash.text.TextFormat; + import flash.net.Socket; + import flash.net.URLLoader; + import flash.net.URLLoaderDataFormat; + import flash.net.URLRequest; + import flash.net.URLRequestMethod; + import flash.net.URLVariables; import flash.events.Event; + import flash.events.IOErrorEvent; + import flash.events.SecurityErrorEvent; import flash.utils.setTimeout;
- import FacilitatorSocket; - import events.FacilitatorSocketEvent; - - import ProxyPair; - import RTMFPProxyPair; - import TCPProxyPair; - - import rtmfp.CirrusSocket; - import rtmfp.events.CirrusSocketEvent; - public class swfcat extends Sprite { /* Adobe's Cirrus server for RTMFP connections. The Cirrus key is defined at compile time by reading from the CIRRUS_KEY environment var. */ - private const DEFAULT_CIRRUS_ADDR:String = "rtmfp://p2p.rtmfp.net"; - private const DEFAULT_CIRRUS_KEY:String = RTMFP::CIRRUS_KEY; + private const CIRRUS_URL:String = "rtmfp://p2p.rtmfp.net"; + private const CIRRUS_KEY:String = RTMFP::CIRRUS_KEY;
private const DEFAULT_FACILITATOR_ADDR:Object = { host: "tor-facilitator.bamsoftware.com", @@ -47,17 +45,6 @@ package // Seconds. private const RATE_LIMIT_HISTORY:Number = 5.0;
- // Socket to Cirrus server - private var s_c:CirrusSocket; - // Socket to facilitator. - private var s_f:FacilitatorSocket; - // Handle local-remote traffic - private var p_p:ProxyPair; - - private var client_id:String; - private var proxy_pair_factory:Function; - private var relay_addr:Object; - private var proxy_mode:Boolean;
/* TextField for debug output. */ @@ -124,7 +111,10 @@ package return; }
- main(); + if (proxy_mode) + proxy_main(); + else + client_main(); }
/* Get an address structure from the given movie parameter, or the given @@ -141,129 +131,171 @@ package }
/* The main logic begins here, after start-up issues are taken care of. */ - private function main():void + private function proxy_main():void { - if (proxy_mode) { - establish_facilitator_connection(); - } else { - establish_cirrus_connection(); + var fac_url:String; + var loader:URLLoader; + + if (num_proxy_pairs >= MAX_NUM_PROXY_PAIRS) { + setTimeout(proxy_main, FACILITATOR_POLL_INTERVAL); + return; } - }
- private function establish_cirrus_connection():void - { - s_c = new CirrusSocket(); - s_c.addEventListener(CirrusSocketEvent.CONNECT_SUCCESS, function (e:CirrusSocketEvent):void { - puts("Cirrus: connected with id " + s_c.id + "."); - if (proxy_mode) { - start_proxy_pair(); - s_c.send_hello(client_id); - } else { - establish_facilitator_connection(); - } - }); - s_c.addEventListener(CirrusSocketEvent.CONNECT_FAILED, function (e:CirrusSocketEvent):void { - puts("Error: failed to connect to Cirrus."); - }); - s_c.addEventListener(CirrusSocketEvent.CONNECT_CLOSED, function (e:CirrusSocketEvent):void { - puts("Cirrus: closed connection."); + loader = new URLLoader(); + /* Get the x-www-form-urlencoded values. */ + loader.dataFormat = URLLoaderDataFormat.VARIABLES; + loader.addEventListener(Event.COMPLETE, fac_complete); + loader.addEventListener(IOErrorEvent.IO_ERROR, function (e:IOErrorEvent):void { + puts("Facilitator: I/O error: " + e.text + "."); }); - s_c.addEventListener(CirrusSocketEvent.HELLO_RECEIVED, function (e:CirrusSocketEvent):void { - puts("Cirrus: received hello from peer " + e.peer); - - /* don't bother if we already have a proxy going */ - if (p_p != null && p_p.connected) { - return; - } - - /* if we're in proxy mode, we should have already set - up a proxy pair */ - if (!proxy_mode) { - relay_addr = DEFAULT_TOR_CLIENT_ADDR; - proxy_pair_factory = rtmfp_proxy_pair_factory; - start_proxy_pair(); - s_c.send_hello(e.peer); - } else { - num_proxy_pairs++; - badge.proxy_begin(); - } - - p_p.client = {peer: e.peer, stream: e.stream}; + loader.addEventListener(SecurityErrorEvent.SECURITY_ERROR, function (e:SecurityErrorEvent):void { + puts("Facilitator: security error: " + e.text + "."); }); - - s_c.connect(DEFAULT_CIRRUS_ADDR, DEFAULT_CIRRUS_KEY); + + fac_url = "http://" + encodeURIComponent(fac_addr.host) + + ":" + encodeURIComponent(fac_addr.port) + "/"; + puts("Facilitator: connecting to " + fac_url + "."); + loader.load(new URLRequest(fac_url)); }
- private function establish_facilitator_connection():void + private function fac_complete(e:Event):void { - s_f = new FacilitatorSocket(fac_addr.host, fac_addr.port); - s_f.addEventListener(FacilitatorSocketEvent.CONNECT_FAILED, function (e:Event):void { - puts("Facilitator: connect failed."); - setTimeout(establish_facilitator_connection, FACILITATOR_POLL_INTERVAL); - }); - - if (proxy_mode) { - s_f.addEventListener(FacilitatorSocketEvent.REGISTRATION_RECEIVED, function (e:FacilitatorSocketEvent):void { - var client_addr:Object = parse_addr_spec(e.client); - relay_addr = parse_addr_spec(e.relay); - if (client_addr == null) { - puts("Facilitator: got registration " + e.client); - proxy_pair_factory = rtmfp_proxy_pair_factory; - if (s_c == null || !s_c.connected) { - client_id = e.client; - establish_cirrus_connection(); - } else { - start_proxy_pair(); - s_c.send_hello(e.client); - } - } else { - proxy_pair_factory = tcp_proxy_pair_factory; - start_proxy_pair(); - p_p.client = client_addr; - } - }); - s_f.addEventListener(FacilitatorSocketEvent.REGISTRATIONS_EMPTY, function (e:Event):void { - puts("Facilitator: no registrations available."); - setTimeout(establish_facilitator_connection, FACILITATOR_POLL_INTERVAL); - }); - puts("Facilitator: getting registration."); - s_f.get_registration(); - } else { - s_f.addEventListener(FacilitatorSocketEvent.REGISTRATION_FAILED, function (e:Event):void { - puts("Facilitator: registration failed."); - setTimeout(establish_facilitator_connection, FACILITATOR_POLL_INTERVAL); - }); - puts("Facilitator: posting registration."); - s_f.post_registration(s_c.id); + var loader:URLLoader; + var client_spec:String; + var relay_spec:String; + var proxy_pair:Object; + + setTimeout(proxy_main, FACILITATOR_POLL_INTERVAL); + + loader = e.target as URLLoader; + client_spec = loader.data.client; + if (client_spec == "") { + puts("No clients."); + return; + } else if (!client_spec) { + puts("Error: missing "client" in response."); + return; } + relay_spec = loader.data.relay; + if (!relay_spec) { + puts("Error: missing "relay" in response."); + return; + } + puts("Facilitator: got client:"" + client_spec + "" " + + "relay:"" + relay_spec + ""."); + + try { + proxy_pair = make_proxy_pair(client_spec, relay_spec); + } catch (e:ArgumentError) { + puts("Error: " + e); + return; + } + proxy_pair.addEventListener(Event.COMPLETE, function(e:Event):void { + proxy_pair.log("Complete."); + num_proxy_pairs--; + badge.proxy_end(); + }); + proxy_pair.connect(); + + num_proxy_pairs++; + badge.proxy_begin(); } - - private function start_proxy_pair():void + + private function client_main():void { - p_p = proxy_pair_factory(); - p_p.addEventListener(Event.CONNECT, function (e:Event):void { - puts("ProxyPair: connected!"); + var rs:RTMFPSocket; + + rs = new RTMFPSocket(CIRRUS_URL, CIRRUS_KEY); + rs.addEventListener(Event.COMPLETE, function (e:Event):void { + puts("Got RTMFP id " + rs.id); + register(rs); }); - p_p.addEventListener(Event.CLOSE, function (e:Event):void { - puts("ProxyPair: connection closed."); - p_p = null; - if (proxy_mode) { - num_proxy_pairs--; - badge.proxy_end(); - } - establish_facilitator_connection(); + rs.addEventListener(RTMFPSocket.ACCEPT_EVENT, client_accept); + + rs.listen(); + } + + private function client_accept(e:Event):void { + var rs:RTMFPSocket; + var s_t:Socket; + var proxy_pair:ProxyPair; + + rs = e.target as RTMFPSocket; + s_t = new Socket(); + + puts("Got RTMFP connection from " + rs.peer_id); + + proxy_pair = new ProxyPair(this, rs, function ():void { + /* Do nothing; already connected. */ + }, s_t, function ():void { + s_t.connect(DEFAULT_TOR_CLIENT_ADDR.host, DEFAULT_TOR_CLIENT_ADDR.port); }); - p_p.relay = relay_addr; + proxy_pair.connect(); } - - private function rtmfp_proxy_pair_factory():ProxyPair - { - return new RTMFPProxyPair(this, s_c, s_c.local_stream_name); + + private function register(rs:RTMFPSocket):void { + var fac_url:String; + var loader:URLLoader; + var request:URLRequest; + var variables:URLVariables; + + loader = new URLLoader(); + loader.addEventListener(Event.COMPLETE, function (e:Event):void { + puts("Facilitator: registered."); + }); + loader.addEventListener(SecurityErrorEvent.SECURITY_ERROR, function (e:SecurityErrorEvent):void { + puts("Facilitator: security error: " + e.text + "."); + rs.close(); + }); + loader.addEventListener(IOErrorEvent.IO_ERROR, function (e:IOErrorEvent):void { + puts("Facilitator: I/O error: " + e.text + "."); + rs.close(); + }); + + fac_url = "http://" + encodeURIComponent(fac_addr.host) + + ":" + encodeURIComponent(fac_addr.port) + "/"; + request = new URLRequest(fac_url); + request.method = URLRequestMethod.POST; + request.data = new URLVariables; + request.data["client"] = rs.id; + + puts("Facilitator: connecting to " + fac_url + "."); + loader.load(request); } - - private function tcp_proxy_pair_factory():ProxyPair + + private function make_proxy_pair(client_spec:String, relay_spec:String):ProxyPair { - return new TCPProxyPair(this); + var addr_c:Object; + var addr_r:Object; + var s_c:*; + var s_r:Socket; + + addr_r = swfcat.parse_addr_spec(relay_spec); + if (!addr_r) + throw new ArgumentError("Relay spec must be in the form "host:port"."); + + addr_c = swfcat.parse_addr_spec(client_spec); + if (addr_c) { + s_c = new Socket(); + s_r = new Socket(); + return new ProxyPair(this, s_c, function ():void { + s_c.connect(addr_c.host, addr_c.port); + }, s_r, function ():void { + s_r.connect(addr_r.host, addr_r.port); + }); + } + + if (client_spec.match(/^[0-9A-Fa-f]{64}$/)) { + s_c = new RTMFPSocket(CIRRUS_URL, CIRRUS_KEY); + s_r = new Socket(); + return new ProxyPair(this, s_c, function ():void { + s_c.connect(client_spec); + }, s_r, function ():void { + s_r.connect(addr_r.host, addr_r.port); + }); + } + + throw new ArgumentError("Can't parse client spec "" + client_spec + ""."); }
/* Parse an address in the form "host:port". Returns an Object with