commit f2e878f6e17965b338094c0eb28106b4ae55633f Merge: c16f7d9 dd7cfb9 Author: David Fifield david@bamsoftware.com Date: Fri Jun 10 06:41:33 2011 -0700
Merge branch 'master' into rtmfp
Conflicts: README facilitator.py swfcat.as
FacilitatorSocket.as | 39 +++++------ ProxyPair.as | 27 +++++---- README | 16 +++-- SQSProxyPair.as | 17 +---- TCPProxyPair.as | 17 +---- connector.py | 2 +- design.txt | 28 ++++---- events/FacilitatorSocketEvent.as | 4 +- facilitator.py | 130 +++++++++++++++++++++++++------------- swfcat.as | 30 +-------- 10 files changed, 157 insertions(+), 153 deletions(-)
diff --cc FacilitatorSocket.as index 3930fea,0000000..70325c1 mode 100644,000000..100644 --- a/FacilitatorSocket.as +++ b/FacilitatorSocket.as @@@ -1,103 -1,0 +1,100 @@@ +package +{ + import flash.events.Event; + import flash.events.EventDispatcher; + import flash.events.HTTPStatusEvent; + import flash.events.IOErrorEvent; + import flash.events.SecurityErrorEvent; + import flash.net.URLLoader; + import flash.net.URLLoaderDataFormat; + import flash.net.URLRequest; + import flash.net.URLRequestMethod; + import flash.net.URLVariables; + + import events.FacilitatorSocketEvent; + + [Event(name=FacilitatorSocketEvent.CONNECT_FAILED, type="com.flashproxy.rtmfp.events.FacilitatorSocketEvent")] + [Event(name=FacilitatorSocketEvent.REGISTRATION_FAILED, type="com.flashproxy.rtmfp.events.FacilitatorSocketEvent")] + [Event(name=FacilitatorSocketEvent.REGISTRATION_RECEIVED, type="com.flashproxy.rtmfp.events.FacilitatorSocketEvent")] + [Event(name=FacilitatorSocketEvent.REGISTRATIONS_EMPTY, type="com.flashproxy.rtmfp.events.FacilitatorSocketEvent")] + public class FacilitatorSocket extends EventDispatcher + { + private var host:String; + private var port:uint; + + public function FacilitatorSocket(host:String, port:uint) + { + this.host = host; + this.port = port; + } + + public function get_registration():void + { + make_request(URLRequestMethod.GET); + } + + public function post_registration(registration_data:String):void + { + var data:URLVariables = new URLVariables(); + data.client = registration_data; + make_request(URLRequestMethod.POST, data); + } + + private function fail():void + { + dispatchEvent(new FacilitatorSocketEvent(FacilitatorSocketEvent.CONNECT_FAILED)); + } + + private function make_request(method:String, data:URLVariables = null):void + { - var request:URLRequest = new URLRequest(url) ++ var request:URLRequest; ++ var loader:URLLoader; ++ ++ loader = new URLLoader(); ++ /* Get the x-www-form-encoded-values. 
*/ ++ loader.dataFormat = URLLoaderDataFormat.VARIABLES; ++ loader.addEventListener(Event.COMPLETE, on_complete_event); ++ loader.addEventListener(SecurityErrorEvent.SECURITY_ERROR, on_security_error_event); ++ loader.addEventListener(IOErrorEvent.IO_ERROR, on_io_error_event); ++ ++ request = new URLRequest(url); + request.data = data; + request.method = method; - - var url_loader:URLLoader = new URLLoader(); - url_loader = new URLLoader(); - url_loader.dataFormat = URLLoaderDataFormat.VARIABLES; - - url_loader.addEventListener(Event.COMPLETE, on_complete_event); - url_loader.addEventListener(HTTPStatusEvent.HTTP_STATUS, on_http_status_event); - url_loader.addEventListener(SecurityErrorEvent.SECURITY_ERROR, on_security_error_event); - url_loader.addEventListener(IOErrorEvent.IO_ERROR, on_io_error_event); - - url_loader.load(request); ++ loader.load(request); + } + + private function on_complete_event(event:Event):void + { + try { + var client_id:String = event.target.data.client; - if (client_id == "Registration list empty") { ++ var relay_addr:String = event.target.data.relay; ++ if (client_id == "") { + dispatchEvent(new FacilitatorSocketEvent(FacilitatorSocketEvent.REGISTRATIONS_EMPTY)); + } else { - dispatchEvent(new FacilitatorSocketEvent(FacilitatorSocketEvent.REGISTRATION_RECEIVED, client_id)); ++ dispatchEvent(new FacilitatorSocketEvent(FacilitatorSocketEvent.REGISTRATION_RECEIVED, client_id, relay_addr)); + } + } catch (e:Error) { + /* error is thrown for POST when we don't care about + the response anyways */ + } + + event.target.close() + } + - private function on_http_status_event(event:HTTPStatusEvent):void - { - /* empty for now */ - } - + private function on_io_error_event(event:IOErrorEvent):void + { + fail(); + } + + private function on_security_error_event(event:SecurityErrorEvent):void + { + fail(); + } + + private function get url():String + { - return "http://" + host + ":" + port; ++ return "http://" + encodeURIComponent(host) ++ + ":" + 
encodeURIComponent(port.toString()) + "/"; + } + } - } ++} diff --cc ProxyPair.as index 75ae901,0000000..6ca60ef mode 100644,000000..100644 --- a/ProxyPair.as +++ b/ProxyPair.as @@@ -1,273 -1,0 +1,276 @@@ +package +{ + import flash.errors.IllegalOperationError; + import flash.events.Event; + import flash.events.EventDispatcher; + import flash.events.IOErrorEvent; + import flash.events.ProgressEvent; + import flash.events.SecurityErrorEvent; ++ import flash.events.TextEvent; + import flash.net.Socket; + import flash.utils.ByteArray; + import flash.utils.clearTimeout; + import flash.utils.setTimeout; + + import swfcat; + + public class ProxyPair extends EventDispatcher + { + private var ui:swfcat; + + protected var client_addr:Object; + + /* Not defined here: subclasses should define their own + * protected var client_socket:Object; + */ + + private var c2r_schedule:Array; + + private var relay_addr:Object; + private var relay_socket:Socket; + private var r2c_schedule:Array; + + // Bytes per second. Set to undefined to disable limit. + private const RATE_LIMIT:Number = undefined; //10000; + // Seconds. + private const RATE_LIMIT_HISTORY:Number = 5.0; + + private var rate_limit:RateLimit; + + // Callback id. 
+ private var flush_id:uint; + + public function ProxyPair(self:ProxyPair, ui:swfcat) + { + if (self != this) { + //only a subclass can pass a valid reference to self + throw new IllegalOperationError("ProxyPair cannot be instantiated directly."); + } + + this.ui = ui; + this.c2r_schedule = new Array(); + this.r2c_schedule = new Array(); + + if (RATE_LIMIT) + rate_limit = new BucketRateLimit(RATE_LIMIT * RATE_LIMIT_HISTORY, RATE_LIMIT_HISTORY); + else + rate_limit = new RateUnlimit(); + + setup_relay_socket(); + + /* client_socket setup should be taken */ + /* care of in the subclass constructor */ + } + + public function close():void + { + if (relay_socket != null && relay_socket.connected) { + relay_socket.close(); + } + + /* subclasses should override to close */ + /* their client_socket according to impl. */ + } + + public function get connected():Boolean + { + return (relay_socket != null && relay_socket.connected); + + /* subclasses should override to check */ + /* connectivity of their client_socket. 
*/ + } + + public function set client(client_addr:Object):void + { + /* subclasses should override to */ + /* connect the client_socket here */ + } + + public function set relay(relay_addr:Object):void + { + this.relay_addr = relay_addr; + log("Relay: connecting to " + relay_addr.host + ":" + relay_addr.port + "."); + relay_socket.connect(relay_addr.host, relay_addr.port); + } + + protected function transfer_bytes(src:Object, dst:Object, num_bytes:uint):void + { + /* No-op: must be overridden by subclasses */ + } ++ ++ protected function socket_error(message:String):Function ++ { ++ return function(e:Event):void { ++ if (e is TextEvent) ++ log(message + ": " + (e as TextEvent).text + "."); ++ else ++ log(message + "."); ++ close(); ++ }; ++ } + + private function setup_relay_socket():void + { + relay_socket = new Socket(); + relay_socket.addEventListener(Event.CONNECT, function (e:Event):void { + log("Relay: connected to " + relay_addr.host + ":" + relay_addr.port + "."); + if (connected) { + dispatchEvent(new Event(Event.CONNECT)); + } + }); - relay_socket.addEventListener(Event.CLOSE, function (e:Event):void { - log("Relay: closed connection."); - close(); - }); - relay_socket.addEventListener(IOErrorEvent.IO_ERROR, function (e:IOErrorEvent):void { - log("Relay: I/O error: " + e.text + "."); - close(); - }); - relay_socket.addEventListener(SecurityErrorEvent.SECURITY_ERROR, function (e:SecurityErrorEvent):void { - log("Relay: security error: " + e.text + "."); - close(); - }); ++ relay_socket.addEventListener(Event.CLOSE, socket_error("Relay: closed")); ++ relay_socket.addEventListener(IOErrorEvent.IO_ERROR, socket_error("Relay: I/O error")) ++ relay_socket.addEventListener(SecurityErrorEvent.SECURITY_ERROR, socket_error("Relay: security error")) + relay_socket.addEventListener(ProgressEvent.SOCKET_DATA, relay_to_client); + } + + protected function client_to_relay(e:ProgressEvent):void + { + c2r_schedule.push(e.bytesLoaded); + flush(); + } + + private function 
relay_to_client(e:ProgressEvent):void + { + r2c_schedule.push(e.bytesLoaded); + flush(); + } + + /* Send as much data as the rate limit currently allows. */ + private function flush():void + { + if (flush_id) + clearTimeout(flush_id); + flush_id = undefined; + + if (!connected) + /* Can't do anything until connected. */ + return; + + while (!rate_limit.is_limited() && (c2r_schedule.length > 0 || r2c_schedule.length > 0)) { + var num_bytes:uint; + + if (c2r_schedule.length > 0) { + num_bytes = c2r_schedule.shift(); + transfer_bytes(null, relay_socket, num_bytes); + rate_limit.update(num_bytes); + } + + if (r2c_schedule.length > 0) { + num_bytes = r2c_schedule.shift(); + transfer_bytes(relay_socket, null, num_bytes); + rate_limit.update(num_bytes); + } + } + + /* Call again when safe, if necessary. */ + if (c2r_schedule.length > 0 || r2c_schedule.length > 0) + flush_id = setTimeout(flush, rate_limit.when() * 1000); + } + + /* Helper function to write output to the + * swfcat console. Set as protected for + * subclasses */ + protected function log(s:String):void + { + ui.puts(s); + } + } +} + +import flash.utils.getTimer; + +class RateLimit +{ + public function RateLimit() + { + } + + public function update(n:Number):Boolean + { + return true; + } + + public function when():Number + { + return 0.0; + } + + public function is_limited():Boolean + { + return false; + } +} + +class RateUnlimit extends RateLimit +{ + public function RateUnlimit() + { + } + + public override function update(n:Number):Boolean + { + return true; + } + + public override function when():Number + { + return 0.0; + } + + public override function is_limited():Boolean + { + return false; + } +} + +class BucketRateLimit extends RateLimit +{ + private var amount:Number; + private var capacity:Number; + private var time:Number; + private var last_update:uint; + + public function BucketRateLimit(capacity:Number, time:Number) + { + this.amount = 0.0; + /* capacity / time is the rate we are aiming for. 
*/ + this.capacity = capacity; + this.time = time; + this.last_update = getTimer(); + } + + private function age():void + { + var now:uint; + var delta:Number; + + now = getTimer(); + delta = (now - last_update) / 1000.0; + last_update = now; + + amount -= delta * capacity / time; + if (amount < 0.0) + amount = 0.0; + } + + public override function update(n:Number):Boolean + { + age(); + amount += n; + + return amount <= capacity; + } + + public override function when():Number + { + age(); + return (amount - capacity) / (capacity / time); + } + + public override function is_limited():Boolean + { + age(); + return amount > capacity; + } +} diff --cc README index 05cd676,b4b1d0e..0e2a0eb --- a/README +++ b/README @@@ -102,26 -81,25 +102,30 @@@ changing pool of addresses
== Design notes
- The Tor relay address is hardcoded in rtmfpcat.as. It could be any relay, - with the caveat that the server also has to serve a crossdomain policy. + Any Tor relay can be used, as long as it also serves a crossdomain + policy.
- Client rtmfpcats register with the facilitator by sending an HTTP-like message: -The Tor client needs to be able to listen for an incoming connection, -which generally means not being behind NAT. - + Clients register with the facilitator by sending an HTTP message: POST / HTTP/1.0\r\n \r\n - client=:9000 + client=<CIRRUS-CLIENT-ID> + +The <CIRRUS-CLIENT-ID> is returned by Adobe's developer Cirrus server +as soon as the rtmfpcat can connect to it. Each rtmfpcat needs to connect +to a server like this to get one of these client IDs, since the Cirrus +server uses these to coordinate RTMFP connections (including NAT punching). +The need to communicate with a Cirrus server in addition to a facilitator is +one of the major weaknesses of this design. + - The proxy rtmfpcat gets a client id using something like HTTP: + The Flash proxy also gets a client address over HTTP: GET / HTTP/1.0\r\n \r\n - The server sends back an id specification (no HTTP header): - 51ae8ed56c3705e4ad3755cdd3328c27720984778bfff71d9ec9f2647331d39b + The server sends back address specifications of a client and a relay in + an HTTP response. + HTTP/1.0 200 OK\r\n + Server: BaseHTTP/0.3 Python/2.5.2\r\n + \r\n - client=1.2.3.4%3A9000&relay=9.9.9.9:9001 ++ client=<CIRRUS-CLIENT-ID>&relay=9.9.9.9:9001
== ActionScript programming
diff --cc SQSProxyPair.as index 54f763d,0000000..20a6a65 mode 100644,000000..100644 --- a/SQSProxyPair.as +++ b/SQSProxyPair.as @@@ -1,86 -1,0 +1,77 @@@ +package +{ + import flash.events.Event; + import flash.events.EventDispatcher; + import flash.events.IOErrorEvent; + import flash.events.ProgressEvent; + import flash.events.SecurityErrorEvent; + import flash.net.Socket; + import flash.utils.ByteArray; + + public class SQSProxyPair extends ProxyPair + { + private var client_socket:Socket; + + public function SQSProxyPair(ui:swfcat) + { + super(this, ui); + + log("Starting SQS proxy pair"); + setup_client_socket(); + } + + override public function set client(client_addr:Object):void + { + this.client_addr = client_addr; + log("Client: connecting to " + client_addr.host + ":" + client_addr.port + "."); + client_socket.connect(client_addr.host, client_addr.port); + } + + override public function close():void + { + super.close(); + if (client_socket != null && client_socket.connected) { + client_socket.close(); + } + dispatchEvent(new Event(Event.CLOSE)); + } + + override public function get connected():Boolean + { + return (super.connected && client_socket != null && client_socket.connected); + } + + override protected function transfer_bytes(src:Object, dst:Object, num_bytes:uint):void + { + var bytes:ByteArray = new ByteArray(); + + if (src == null) { + src = client_socket; + } + + if (dst == null) { + dst = client_socket; + } + + Socket(src).readBytes(bytes, 0, num_bytes); + log("SQSProxyPair: transferring " + num_bytes + " bytes."); + Socket(dst).writeBytes(bytes); + } + + private function setup_client_socket():void + { + client_socket = new Socket(); + + client_socket.addEventListener(Event.CONNECT, function (e:Event):void { + log("Client: connected to " + client_addr.host + ":" + client_addr.port + "."); + if (connected) { + dispatchEvent(new Event(Event.CONNECT)); + } + }); - client_socket.addEventListener(Event.CLOSE, function (e:Event):void { - log("Client: 
closed."); - close(); - }); - client_socket.addEventListener(IOErrorEvent.IO_ERROR, function (e:IOErrorEvent):void { - log("Client: I/O error: " + e.text + "."); - close(); - }); - client_socket.addEventListener(SecurityErrorEvent.SECURITY_ERROR, function (e:SecurityErrorEvent):void { - log("Client: security error: " + e.text + "."); - close(); - }); ++ client_socket.addEventListener(Event.CLOSE, socket_error("Client: closed")); ++ client_socket.addEventListener(IOErrorEvent.IO_ERROR, socket_error("Client: I/O error")); ++ client_socket.addEventListener(SecurityErrorEvent.SECURITY_ERROR, socket_error("Client: security error")) + client_socket.addEventListener(ProgressEvent.SOCKET_DATA, client_to_relay); + } + } - } ++} diff --cc TCPProxyPair.as index b81af3e,0000000..de18dd6 mode 100644,000000..100644 --- a/TCPProxyPair.as +++ b/TCPProxyPair.as @@@ -1,86 -1,0 +1,77 @@@ +package +{ + import flash.events.Event; + import flash.events.EventDispatcher; + import flash.events.IOErrorEvent; + import flash.events.ProgressEvent; + import flash.events.SecurityErrorEvent; + import flash.net.Socket; + import flash.utils.ByteArray; + + public class TCPProxyPair extends ProxyPair + { + private var client_socket:Socket; + + public function TCPProxyPair(ui:swfcat) + { + super(this, ui); + + log("Starting TCP proxy pair"); + setup_client_socket(); + } + + override public function set client(client_addr:Object):void + { + this.client_addr = client_addr; + log("Client: connecting to " + client_addr.host + ":" + client_addr.port + "."); + client_socket.connect(client_addr.host, client_addr.port); + } + + override public function close():void + { + super.close(); + if (client_socket != null && client_socket.connected) { + client_socket.close(); + } + dispatchEvent(new Event(Event.CLOSE)); + } + + override public function get connected():Boolean + { + return (super.connected && client_socket != null && client_socket.connected); + } + + override protected function 
transfer_bytes(src:Object, dst:Object, num_bytes:uint):void + { + var bytes:ByteArray = new ByteArray(); + + if (src == null) { + src = client_socket; + } + + if (dst == null) { + dst = client_socket; + } + + Socket(src).readBytes(bytes, 0, num_bytes); + log("TCPProxyPair: transferring " + num_bytes + " bytes."); + Socket(dst).writeBytes(bytes); + } + + private function setup_client_socket():void + { + client_socket = new Socket(); + + client_socket.addEventListener(Event.CONNECT, function (e:Event):void { + log("Client: connected to " + client_addr.host + ":" + client_addr.port + "."); + if (connected) { + dispatchEvent(new Event(Event.CONNECT)); + } + }); - client_socket.addEventListener(Event.CLOSE, function (e:Event):void { - log("Client: closed."); - close(); - }); - client_socket.addEventListener(IOErrorEvent.IO_ERROR, function (e:IOErrorEvent):void { - log("Client: I/O error: " + e.text + "."); - close(); - }); - client_socket.addEventListener(SecurityErrorEvent.SECURITY_ERROR, function (e:SecurityErrorEvent):void { - log("Client: security error: " + e.text + "."); - close(); - }); ++ client_socket.addEventListener(Event.CLOSE, socket_error("Client: closed")); ++ client_socket.addEventListener(IOErrorEvent.IO_ERROR, socket_error("Client: I/O error")); ++ client_socket.addEventListener(SecurityErrorEvent.SECURITY_ERROR, socket_error("Client: security error")) + client_socket.addEventListener(ProgressEvent.SOCKET_DATA, client_to_relay); + } + } - } ++} diff --cc events/FacilitatorSocketEvent.as index 9fc478b,0000000..5787309 mode 100644,000000..100644 --- a/events/FacilitatorSocketEvent.as +++ b/events/FacilitatorSocketEvent.as @@@ -1,22 -1,0 +1,24 @@@ +package events +{ + import flash.events.Event; + + public class FacilitatorSocketEvent extends Event + { + public static const CONNECT_CLOSED:String = "connectClosed"; + public static const CONNECT_FAILED:String = "connectFailed"; + public static const CONNECT_SUCCESS:String = "connectSuccess"; + public static 
const REGISTRATION_RECEIVED:String = "registrationReceived"; + public static const REGISTRATION_FAILED:String = "registrationFailed"; + public static const REGISTRATIONS_EMPTY:String = "registrationsEmpty"; + + public var client:String; ++ public var relay:String; + - public function FacilitatorSocketEvent(type:String, client:String = null, bubbles:Boolean = false, cancelable:Boolean = false) ++ public function FacilitatorSocketEvent(type:String, client:String = null, relay:String = null, bubbles:Boolean = false, cancelable:Boolean = false) + { + super(type, bubbles, cancelable); + this.client = client; ++ this.relay = relay; + } + } +} diff --cc swfcat.as index 1bb2ff5,4aa430b..dd2396d --- a/swfcat.as +++ b/swfcat.as @@@ -5,64 -5,33 +5,56 @@@ packag import flash.display.StageScaleMode; import flash.text.TextField; import flash.text.TextFormat; - import flash.net.Socket; - import flash.net.URLLoader; - import flash.net.URLLoaderDataFormat; - import flash.net.URLRequest; import flash.events.Event; - import flash.events.IOErrorEvent; - import flash.events.ProgressEvent; - import flash.events.SecurityErrorEvent; - import flash.utils.ByteArray; import flash.utils.setTimeout;
+ import FacilitatorSocket; + import events.FacilitatorSocketEvent; + + import ProxyPair; + import RTMFPProxyPair; + import SQSProxyPair; + import TCPProxyPair; + + import rtmfp.CirrusSocket; + import rtmfp.events.CirrusSocketEvent; + public class swfcat extends Sprite { + /* Adobe's Cirrus server for RTMFP connections. + The Cirrus key is defined at compile time by + reading from the CIRRUS_KEY environment var. */ + private const DEFAULT_CIRRUS_ADDR:String = "rtmfp://p2p.rtmfp.net"; + private const DEFAULT_CIRRUS_KEY:String = RTMFP::CIRRUS_KEY; + - /* David's facilitator. */ private const DEFAULT_FACILITATOR_ADDR:Object = { - host: "173.255.221.44", + host: "tor-facilitator.bamsoftware.com", port: 9002 }; - - private const MAX_NUM_PROXY_PAIRS:uint = 1; - - // Milliseconds. - private const FACILITATOR_POLL_INTERVAL:int = 10000; - - // Bytes per second. Set to undefined to disable limit. - public const RATE_LIMIT:Number = undefined; - // Seconds. - private const RATE_LIMIT_HISTORY:Number = 5.0; + + /* Default Tor client to use in case of RTMFP connection */ + private const DEFAULT_TOR_CLIENT_ADDR:Object = { + host: "127.0.0.1", + port: 9002 + }; + - /* David's relay (nickname 3VXRyxz67OeRoqHn) that also serves a - crossdomain policy. */ - private const DEFAULT_TOR_RELAY_ADDR:Object = { - host: "173.255.221.44", - port: 9001 - }; - + /* Poll facilitator every 10 sec */ + private const DEFAULT_FAC_POLL_INTERVAL:uint = 10000; + + // Socket to Cirrus server + private var s_c:CirrusSocket; + // Socket to facilitator. + private var s_f:FacilitatorSocket; + // Handle local-remote traffic + private var p_p:ProxyPair; + + private var client_id:String; + private var proxy_pair_factory:Function; + + private var proxy_pairs:Array; + + private var debug_mode:Boolean; + private var proxy_mode:Boolean;
/* TextField for debug output. */ private var output_text:TextField; @@@ -97,57 -133,24 +89,39 @@@ private function loaderinfo_complete(e:Event):void { var fac_spec:String; + var relay_spec:String;
- puts("Parameters loaded."); - - if (this.loaderInfo.parameters["debug"]) + debug_mode = (this.loaderInfo.parameters["debug"] != null) + proxy_mode = (this.loaderInfo.parameters["proxy"] != null); + if (proxy_mode && !debug_mode) { + badge = new InternetFreedomBadge(this); + badge.display(); + } else { + output_text = new TextField(); + output_text.width = stage.stageWidth; + output_text.height = stage.stageHeight; + output_text.background = true; + output_text.backgroundColor = 0x001f0f; + output_text.textColor = 0x44cc44; addChild(output_text); - else { - addChild(new BadgeImage()); - /* Tried unsuccessfully to add counter to badge. */ - /* For now, need two addChilds :( */ - addChild(tot_client_count_tf); - addChild(cur_client_count_tf); } - - fac_addr = get_param_addr("facilitator", DEFAULT_FACILITATOR_ADDR); - if (!fac_addr) { - puts("Error: Facilitator spec must be in the form "host:port"."); - return; + + puts("Starting: parameters loaded."); + + /* TODO: use this to have multiple proxies going at once */ + proxy_pairs = new Array(); + + fac_spec = this.loaderInfo.parameters["facilitator"]; + if (fac_spec) { + puts("Facilitator spec: "" + fac_spec + """); + fac_addr = parse_addr_spec(fac_spec); + if (!fac_addr) { + puts("Error: Facilitator spec must be in the form "host:port"."); + return; + } + } else { + fac_addr = DEFAULT_FACILITATOR_ADDR; } - - /* TODO: modify this for the client so that it can specify - a relay for the proxy to use */ - relay_spec = this.loaderInfo.parameters["relay"]; - if (relay_spec) { - puts("Relay spec: "" + relay_spec + """); - relay_addr = parse_addr_spec(relay_spec); - if (!relay_addr) { - puts("Error: Relay spec must be in the form "host:port"."); - return; - } - } else { - if (proxy_mode) { - relay_addr = DEFAULT_TOR_RELAY_ADDR; - } else { - relay_addr = DEFAULT_TOR_CLIENT_ADDR; - } - }
main(); } @@@ -155,132 -158,98 +129,134 @@@ /* The main logic begins here, after start-up issues are taken care of. */ private function main():void { - var fac_url:String; - var loader:URLLoader; - - if (num_proxy_pairs >= MAX_NUM_PROXY_PAIRS) { - setTimeout(main, FACILITATOR_POLL_INTERVAL); - return; + if (proxy_mode) { + establish_facilitator_connection(); + } else { + establish_cirrus_connection(); } + }
- loader = new URLLoader(); - /* Get the x-www-form-urlencoded values. */ - loader.dataFormat = URLLoaderDataFormat.VARIABLES; - loader.addEventListener(Event.COMPLETE, fac_complete); - loader.addEventListener(IOErrorEvent.IO_ERROR, function (e:IOErrorEvent):void { - puts("Facilitator: I/O error: " + e.text + "."); + private function establish_cirrus_connection():void + { + s_c = new CirrusSocket(); + s_c.addEventListener(CirrusSocketEvent.CONNECT_SUCCESS, function (e:CirrusSocketEvent):void { + puts("Cirrus: connected with id " + s_c.id + "."); + if (proxy_mode) { + start_proxy_pair(); + s_c.send_hello(client_id); + } else { + establish_facilitator_connection(); + } }); - loader.addEventListener(SecurityErrorEvent.SECURITY_ERROR, function (e:SecurityErrorEvent):void { - puts("Facilitator: security error: " + e.text + "."); + s_c.addEventListener(CirrusSocketEvent.CONNECT_FAILED, function (e:CirrusSocketEvent):void { + puts("Error: failed to connect to Cirrus."); }); - - fac_url = "http://" + encodeURIComponent(fac_addr.host) - + ":" + encodeURIComponent(fac_addr.port) + "/"; - puts("Facilitator: connecting to " + fac_url + "."); - loader.load(new URLRequest(fac_url)); + s_c.addEventListener(CirrusSocketEvent.CONNECT_CLOSED, function (e:CirrusSocketEvent):void { + puts("Cirrus: closed connection."); + }); + s_c.addEventListener(CirrusSocketEvent.HELLO_RECEIVED, function (e:CirrusSocketEvent):void { + puts("Cirrus: received hello from peer " + e.peer); + + /* don't bother if we already have a proxy going */ + if (p_p != null && p_p.connected) { + return; + } + + /* if we're in proxy mode, we should have already set + up a proxy pair */ + if (!proxy_mode) { ++ relay_addr = DEFAULT_TOR_CLIENT_ADDR; + proxy_pair_factory = rtmfp_proxy_pair_factory; + start_proxy_pair(); + s_c.send_hello(e.peer); + } else if (!debug_mode && badge != null) { + badge.total_proxy_pairs++; + badge.num_proxy_pairs++; + } + + p_p.client = {peer: e.peer, stream: e.stream}; + }); + + 
s_c.connect(DEFAULT_CIRRUS_ADDR, DEFAULT_CIRRUS_KEY); }
- private function fac_complete(e:Event):void + private function establish_facilitator_connection():void { - var loader:URLLoader; - var client_spec:String; - var client_addr:Object; - var relay_spec:String; - var relay_addr:Object; - var proxy_pair:Object; - - setTimeout(main, FACILITATOR_POLL_INTERVAL); - - loader = e.target as URLLoader; - client_spec = loader.data.client; - if (client_spec == "") { - puts("No clients."); - return; - } else if (!client_spec) { - puts("Error: missing "client" in response."); - return; - } - relay_spec = loader.data.relay; - if (!relay_spec) { - puts("Error: missing "relay" in response."); - return; - } - puts("Facilitator: got client:"" + client_spec + "" " - + "relay:"" + relay_spec + ""."); - - client_addr = parse_addr_spec(client_spec); - if (!client_addr) { - puts("Error: Client spec must be in the form "host:port"."); - return; - } - relay_addr = parse_addr_spec(relay_spec); - if (!client_addr) { - puts("Error: Relay spec must be in the form "host:port"."); - return; + s_f = new FacilitatorSocket(fac_addr.host, fac_addr.port); + s_f.addEventListener(FacilitatorSocketEvent.CONNECT_FAILED, function (e:Event):void { + puts("Facilitator: connect failed."); + setTimeout(establish_facilitator_connection, DEFAULT_FAC_POLL_INTERVAL); + }); + + if (proxy_mode) { + s_f.addEventListener(FacilitatorSocketEvent.REGISTRATION_RECEIVED, function (e:FacilitatorSocketEvent):void { + var client_addr:Object = parse_addr_spec(e.client); ++ relay_addr = parse_addr_spec(e.relay); + if (client_addr == null) { + puts("Facilitator: got registration " + e.client); + proxy_pair_factory = rtmfp_proxy_pair_factory; + if (s_c == null || !s_c.connected) { + client_id = e.client; + establish_cirrus_connection(); + } else { + start_proxy_pair(); + s_c.send_hello(e.client); + } + } else { + proxy_pair_factory = tcp_proxy_pair_factory; + start_proxy_pair(); + p_p.client = client_addr; + } + }); + s_f.addEventListener(FacilitatorSocketEvent.REGISTRATIONS_EMPTY, 
function (e:Event):void { + puts("Facilitator: no registrations available."); + setTimeout(establish_facilitator_connection, DEFAULT_FAC_POLL_INTERVAL); + }); + puts("Facilitator: getting registration."); + s_f.get_registration(); + } else { + s_f.addEventListener(FacilitatorSocketEvent.REGISTRATION_FAILED, function (e:Event):void { + puts("Facilitator: registration failed."); + setTimeout(establish_facilitator_connection, DEFAULT_FAC_POLL_INTERVAL); + }); + puts("Facilitator: posting registration."); + s_f.post_registration(s_c.id); } - - num_proxy_pairs++; - total_proxy_pairs++; - /* Update the client count on the badge. */ - update_client_count(); - - proxy_pair = new ProxyPair(this, client_addr, relay_addr); - proxy_pair.addEventListener(Event.COMPLETE, function(e:Event):void { - proxy_pair.log("Complete."); - - num_proxy_pairs--; - /* Update the client count on the badge. */ - update_client_count(); + } + + private function start_proxy_pair():void + { + p_p = proxy_pair_factory(); + p_p.addEventListener(Event.CONNECT, function (e:Event):void { + puts("ProxyPair: connected!"); }); - proxy_pair.connect(); - + p_p.addEventListener(Event.CLOSE, function (e:Event):void { + puts("ProxyPair: connection closed."); + p_p = null; + if (proxy_mode && !debug_mode && badge != null) { + badge.num_proxy_pairs--; + } + establish_facilitator_connection(); + }); + p_p.relay = relay_addr; + } + + private function rtmfp_proxy_pair_factory():ProxyPair + { + return new RTMFPProxyPair(this, s_c, s_c.local_stream_name); + } + + // currently is the same as TCPProxyPair + // could be interesting to see how this works + // can't imagine it will work terribly well... + private function sqs_proxy_pair_factory():ProxyPair + { + return new SQSProxyPair(this); + } + + private function tcp_proxy_pair_factory():ProxyPair + { + return new TCPProxyPair(this); }
/* Parse an address in the form "host:port". Returns an Object with