commit 1d66a5de55e165334cbdd35ad5d38cf4157d9a2a Author: Nate Hardison nate@rescomp-09-154551.stanford.edu Date: Wed Jun 8 23:56:03 2011 -0700
Merged rtmfpcat with swfcat, now proxy can help TCP and RTMFP clients --- Makefile | 6 +- ProxyPair.as | 273 +++++++++++++++++++ RTMFPProxyPair.as | 101 +++++++ TCPProxyPair.as | 86 ++++++ connector.py | 4 +- facilitator.py | 7 +- rtmfp/ProxyPair.as | 279 ------------------- rtmfp/RTMFPSocket.as | 2 +- rtmfpcat.as | 353 ------------------------ swfcat.as | 723 ++++++++++++++++++++------------------------------ 10 files changed, 756 insertions(+), 1078 deletions(-)
diff --git a/Makefile b/Makefile index 3595385..4e22e1d 100644 --- a/Makefile +++ b/Makefile @@ -1,12 +1,10 @@ MXMLC ?= mxmlc
-TARGETS = rtmfpcat.swf +TARGETS = swfcat.swf
all: $(TARGETS)
-swfcat.swf: badge.png - -%.swf: %.as +%.swf: %.as badge.png $(MXMLC) -output $@ -static-link-runtime-shared-libraries -define=RTMFP::CIRRUS_KEY,"$(CIRRUS_KEY)" $<
clean:
diff --git a/ProxyPair.as b/ProxyPair.as
new file mode 100644
index 0000000..aef4e2e
--- /dev/null
+++ b/ProxyPair.as
@@ -0,0 +1,312 @@
+package
+{
+    import flash.errors.IllegalOperationError;
+    import flash.events.Event;
+    import flash.events.EventDispatcher;
+    import flash.events.IOErrorEvent;
+    import flash.events.ProgressEvent;
+    import flash.events.SecurityErrorEvent;
+    import flash.net.Socket;
+    import flash.utils.ByteArray;
+    import flash.utils.clearTimeout;
+    import flash.utils.setTimeout;
+
+    import swfcat;
+
+    /**
+     * Abstract base class for a client/relay proxy pair. It owns the TCP
+     * socket to the relay and the rate-limited scheduling of transfers in
+     * both directions; subclasses supply the client-side socket and
+     * override close(), connected, the client setter and transfer_bytes().
+     */
+    public class ProxyPair extends EventDispatcher
+    {
+        /* Owning swfcat instance; used only for log output. */
+        private var ui:swfcat;
+
+        protected var client_addr:Object;
+
+        /* Not defined here: subclasses should define their own
+         * protected var client_socket:Object;
+         */
+
+        /* Byte counts awaiting transfer, one entry per SOCKET_DATA event.
+         * c2r is client to relay, r2c is relay to client. */
+        private var c2r_schedule:Array;
+
+        private var relay_addr:Object;
+        private var relay_socket:Socket;
+        private var r2c_schedule:Array;
+
+        // Bytes per second. Set to undefined to disable limit.
+        private const RATE_LIMIT:Number = undefined; //10000;
+        // Seconds.
+        private const RATE_LIMIT_HISTORY:Number = 5.0;
+
+        private var rate_limit:RateLimit;
+
+        // Callback id.
+        private var flush_id:uint;
+
+        /**
+         * @param self Must be the instance under construction; subclasses
+         *             pass "this" from their own constructor.
+         * @param ui   swfcat instance that receives log output.
+         * @throws IllegalOperationError if self is not this instance.
+         */
+        public function ProxyPair(self:ProxyPair, ui:swfcat)
+        {
+            if (self != this) {
+                //only a subclass can pass a valid reference to self
+                throw new IllegalOperationError("ProxyPair cannot be instantiated directly.");
+            }
+
+            this.ui = ui;
+            this.c2r_schedule = new Array();
+            this.r2c_schedule = new Array();
+
+            if (RATE_LIMIT)
+                rate_limit = new BucketRateLimit(RATE_LIMIT * RATE_LIMIT_HISTORY, RATE_LIMIT_HISTORY);
+            else
+                rate_limit = new RateUnlimit();
+
+            setup_relay_socket();
+
+            /* client_socket setup should be taken */
+            /* care of in the subclass constructor */
+        }
+
+        /* Close the relay socket if it is connected and cancel any
+         * scheduled flush. */
+        public function close():void
+        {
+            if (flush_id) {
+                clearTimeout(flush_id);
+                flush_id = 0;
+            }
+
+            if (relay_socket != null && relay_socket.connected) {
+                relay_socket.close();
+            }
+
+            /* subclasses should override to close */
+            /* their client_socket according to impl. */
+        }
+
+        /* True when the relay socket exists and is connected. */
+        public function get connected():Boolean
+        {
+            return (relay_socket != null && relay_socket.connected);
+
+            /* subclasses should override to check */
+            /* connectivity of their client_socket. */
+        }
+
+        public function set client(client_addr:Object):void
+        {
+            /* subclasses should override to */
+            /* connect the client_socket here */
+        }
+
+        /* Remember the relay address and start connecting the relay socket to it. */
+        public function set relay(relay_addr:Object):void
+        {
+            this.relay_addr = relay_addr;
+            log("Relay: connecting to " + relay_addr.host + ":" + relay_addr.port + ".");
+            relay_socket.connect(relay_addr.host, relay_addr.port);
+        }
+
+        /* Move num_bytes from src to dst. flush() passes null in place of
+         * the client socket, which only the subclass knows. */
+        protected function transfer_bytes(src:Object, dst:Object, num_bytes:uint):void
+        {
+            /* No-op: must be overridden by subclasses */
+        }
+
+        /* Create the relay socket and attach its handlers: CONNECT is
+         * re-dispatched once the whole pair is connected, close and error
+         * events close the pair, and incoming data is scheduled. */
+        private function setup_relay_socket():void
+        {
+            relay_socket = new Socket();
+            relay_socket.addEventListener(Event.CONNECT, function (e:Event):void {
+                log("Relay: connected to " + relay_addr.host + ":" + relay_addr.port + ".");
+                if (connected) {
+                    dispatchEvent(new Event(Event.CONNECT));
+                }
+            });
+            relay_socket.addEventListener(Event.CLOSE, function (e:Event):void {
+                log("Relay: closed connection.");
+                close();
+            });
+            relay_socket.addEventListener(IOErrorEvent.IO_ERROR, function (e:IOErrorEvent):void {
+                log("Relay: I/O error: " + e.text + ".");
+                close();
+            });
+            relay_socket.addEventListener(SecurityErrorEvent.SECURITY_ERROR, function (e:SecurityErrorEvent):void {
+                log("Relay: security error: " + e.text + ".");
+                close();
+            });
+            relay_socket.addEventListener(ProgressEvent.SOCKET_DATA, relay_to_client);
+        }
+
+        /* SOCKET_DATA handler for the client socket; registered by subclasses. */
+        protected function client_to_relay(e:ProgressEvent):void
+        {
+            c2r_schedule.push(e.bytesLoaded);
+            flush();
+        }
+
+        /* SOCKET_DATA handler for the relay socket. */
+        private function relay_to_client(e:ProgressEvent):void
+        {
+            r2c_schedule.push(e.bytesLoaded);
+            flush();
+        }
+
+        /* Send as much data as the rate limit currently allows. */
+        private function flush():void
+        {
+            if (flush_id)
+                clearTimeout(flush_id);
+            flush_id = 0;
+
+            if (!connected)
+                /* Can't do anything until connected. */
+                return;
+
+            while (!rate_limit.is_limited() && (c2r_schedule.length > 0 || r2c_schedule.length > 0)) {
+                var num_bytes:uint;
+
+                if (c2r_schedule.length > 0) {
+                    num_bytes = c2r_schedule.shift();
+                    transfer_bytes(null, relay_socket, num_bytes);
+                    rate_limit.update(num_bytes);
+                }
+
+                if (r2c_schedule.length > 0) {
+                    num_bytes = r2c_schedule.shift();
+                    transfer_bytes(relay_socket, null, num_bytes);
+                    rate_limit.update(num_bytes);
+                }
+            }
+
+            /* Call again when safe, if necessary. */
+            if (c2r_schedule.length > 0 || r2c_schedule.length > 0)
+                flush_id = setTimeout(flush, rate_limit.when() * 1000);
+        }
+
+        /* Helper function to write output to the
+         * swfcat console. Set as protected for
+         * subclasses */
+        protected function log(s:String):void
+        {
+            ui.puts(s);
+        }
+    }
+}
+
+import flash.utils.getTimer;
+
+/* Rate limiter interface; this base implementation never limits. */
+class RateLimit
+{
+    public function RateLimit()
+    {
+    }
+
+    public function update(n:Number):Boolean
+    {
+        return true;
+    }
+
+    public function when():Number
+    {
+        return 0.0;
+    }
+
+    public function is_limited():Boolean
+    {
+        return false;
+    }
+}
+
+/* Explicit "no limit" policy, chosen when RATE_LIMIT is not set. */
+class RateUnlimit extends RateLimit
+{
+    public function RateUnlimit()
+    {
+    }
+
+    public override function update(n:Number):Boolean
+    {
+        return true;
+    }
+
+    public override function when():Number
+    {
+        return 0.0;
+    }
+
+    public override function is_limited():Boolean
+    {
+        return false;
+    }
+}
+
+/* Bucket limiter: update() adds bytes to the bucket, which drains at
+ * capacity / time bytes per second; sending is limited while the bucket
+ * holds more than capacity. */
+class BucketRateLimit extends RateLimit
+{
+    private var amount:Number;
+    private var capacity:Number;
+    private var time:Number;
+    private var last_update:uint;
+
+    public function BucketRateLimit(capacity:Number, time:Number)
+    {
+        this.amount = 0.0;
+        /* capacity / time is the rate we are aiming for. */
+        this.capacity = capacity;
+        this.time = time;
+        this.last_update = getTimer();
+    }
+
+    /* Drain the bucket for the time elapsed since the last call. */
+    private function age():void
+    {
+        var now:uint;
+        var delta:Number;
+
+        now = getTimer();
+        delta = (now - last_update) / 1000.0;
+        last_update = now;
+
+        amount -= delta * capacity / time;
+        if (amount < 0.0)
+            amount = 0.0;
+    }
+
+    public override function update(n:Number):Boolean
+    {
+        age();
+        amount += n;
+
+        return amount <= capacity;
+    }
+
+    /* Seconds until the bucket has drained back to capacity; clamped so
+     * callers never receive a negative delay. */
+    public override function when():Number
+    {
+        age();
+        return Math.max(0.0, (amount - capacity) / (capacity / time));
+    }
+
+    public override function is_limited():Boolean
+    {
+        age();
+        return amount > capacity;
+    }
+}
\ No newline at end of file
diff --git a/RTMFPProxyPair.as b/RTMFPProxyPair.as
new file mode 100644
index 0000000..346d63c
--- /dev/null
+++ b/RTMFPProxyPair.as
@@ -0,0 +1,110 @@
+package
+{
+    import flash.events.Event;
+    import flash.events.EventDispatcher;
+    import flash.events.ProgressEvent;
+    import flash.net.Socket;
+    import flash.utils.ByteArray;
+
+    import rtmfp.CirrusSocket;
+    import rtmfp.RTMFPSocket;
+    import rtmfp.events.RTMFPSocketEvent;
+
+    /**
+     * ProxyPair whose client side is an RTMFPSocket opened over a shared
+     * CirrusSocket; the relay side is the TCP socket of the base class.
+     */
+    public class RTMFPProxyPair extends ProxyPair
+    {
+        private var cirrus_socket:CirrusSocket;
+        private var client_socket:RTMFPSocket;
+        private var listen_stream:String;
+
+        public function RTMFPProxyPair(ui:swfcat, cirrus_socket:CirrusSocket, listen_stream:String)
+        {
+            super(this, ui);
+
+            log("Starting RTMFP proxy pair on stream " + listen_stream);
+
+            this.cirrus_socket = cirrus_socket;
+            this.listen_stream = listen_stream;
+
+            setup_client_socket();
+        }
+
+        override public function set client(client_addr:Object):void
+        {
+            this.client_addr = client_addr;
+            log("Client: connecting to " + client_addr.peer + " on stream " + client_addr.stream + ".");
+            client_socket.connect(client_addr.peer, client_addr.stream);
+        }
+
+        /* Close both sockets, then announce the closure with Event.CLOSE. */
+        override public function close():void
+        {
+            super.close();
+            if (client_socket != null && client_socket.connected) {
+                client_socket.close();
+            }
+            dispatchEvent(new Event(Event.CLOSE));
+        }
+
+        override public function get connected():Boolean
+        {
+            return (super.connected && client_socket != null && client_socket.connected);
+        }
+
+        /* A null src or dst stands for the client socket (see
+         * ProxyPair.flush); the other end is the relay Socket. */
+        override protected function transfer_bytes(src:Object, dst:Object, num_bytes:uint):void
+        {
+            var bytes:ByteArray = new ByteArray();
+
+            if (src == null) {
+                src = client_socket;
+                RTMFPSocket(src).readBytes(bytes, 0, num_bytes);
+                log("RTMFPProxyPair: read " + num_bytes + " bytes from client, writing to relay.");
+                Socket(dst).writeBytes(bytes);
+            }
+
+            if (dst == null) {
+                dst = client_socket;
+                Socket(src).readBytes(bytes, 0, num_bytes);
+                log("RTMFPProxyPair: read " + num_bytes + " bytes from relay, writing to client.");
+                RTMFPSocket(dst).writeBytes(bytes);
+            }
+        }
+
+        /* Create the client RTMFPSocket, attach its handlers and start
+         * listening on listen_stream. */
+        private function setup_client_socket():void
+        {
+            client_socket = new RTMFPSocket(cirrus_socket);
+            client_socket.addEventListener(RTMFPSocketEvent.CONNECT_FAILED, function (e:RTMFPSocketEvent):void {
+                log("Client: connection failed to " + client_addr.peer + " on stream " + client_addr.stream + ".");
+            });
+            client_socket.addEventListener(RTMFPSocketEvent.CONNECT_SUCCESS, function (e:RTMFPSocketEvent):void {
+                log("Client: connected to " + client_addr.peer + " on stream " + client_addr.stream + ".");
+                if (connected) {
+                    dispatchEvent(new Event(Event.CONNECT));
+                }
+            });
+            client_socket.addEventListener(RTMFPSocketEvent.PEER_CONNECTED, function (e:RTMFPSocketEvent):void {
+                log("Peer connected.");
+            });
+            client_socket.addEventListener(RTMFPSocketEvent.PEER_DISCONNECTED, function (e:RTMFPSocketEvent):void {
+                log("Client: disconnected from " + client_addr.peer + ".");
+                close();
+            });
+            client_socket.addEventListener(RTMFPSocketEvent.PLAY_STARTED, function (e:RTMFPSocketEvent):void {
+                log("Play started.");
+            });
+            client_socket.addEventListener(RTMFPSocketEvent.PUBLISH_STARTED, function (e:RTMFPSocketEvent):void {
+                log("Publishing started.");
+            });
+            client_socket.addEventListener(ProgressEvent.SOCKET_DATA, client_to_relay);
+
+            client_socket.listen(listen_stream);
+        }
+    }
+}
\ No newline at end of file
diff --git a/TCPProxyPair.as b/TCPProxyPair.as
new file mode 100644
index 0000000..b81af3e
--- /dev/null
+++ b/TCPProxyPair.as
@@ -0,0 +1,94 @@
+package
+{
+    import flash.events.Event;
+    import flash.events.EventDispatcher;
+    import flash.events.IOErrorEvent;
+    import flash.events.ProgressEvent;
+    import flash.events.SecurityErrorEvent;
+    import flash.net.Socket;
+    import flash.utils.ByteArray;
+
+    /**
+     * ProxyPair whose client side is a plain TCP Socket.
+     */
+    public class TCPProxyPair extends ProxyPair
+    {
+        private var client_socket:Socket;
+
+        public function TCPProxyPair(ui:swfcat)
+        {
+            super(this, ui);
+
+            log("Starting TCP proxy pair");
+            setup_client_socket();
+        }
+
+        override public function set client(client_addr:Object):void
+        {
+            this.client_addr = client_addr;
+            log("Client: connecting to " + client_addr.host + ":" + client_addr.port + ".");
+            client_socket.connect(client_addr.host, client_addr.port);
+        }
+
+        /* Close both sockets, then announce the closure with Event.CLOSE. */
+        override public function close():void
+        {
+            super.close();
+            if (client_socket != null && client_socket.connected) {
+                client_socket.close();
+            }
+            dispatchEvent(new Event(Event.CLOSE));
+        }
+
+        override public function get connected():Boolean
+        {
+            return (super.connected && client_socket != null && client_socket.connected);
+        }
+
+        /* A null src or dst stands for the client socket (see
+         * ProxyPair.flush); both ends are plain Sockets here. */
+        override protected function transfer_bytes(src:Object, dst:Object, num_bytes:uint):void
+        {
+            var bytes:ByteArray = new ByteArray();
+
+            if (src == null) {
+                src = client_socket;
+            }
+
+            if (dst == null) {
+                dst = client_socket;
+            }
+
+            Socket(src).readBytes(bytes, 0, num_bytes);
+            log("TCPProxyPair: transferring " + num_bytes + " bytes.");
+            Socket(dst).writeBytes(bytes);
+        }
+
+        /* Create the client socket and attach its handlers; connecting is
+         * deferred until the client setter is called. */
+        private function setup_client_socket():void
+        {
+            client_socket = new Socket();
+
+            client_socket.addEventListener(Event.CONNECT, function (e:Event):void {
+                log("Client: connected to " + client_addr.host + ":" + client_addr.port + ".");
+                if (connected) {
+                    dispatchEvent(new Event(Event.CONNECT));
+                }
+            });
+            client_socket.addEventListener(Event.CLOSE, function (e:Event):void {
+                log("Client: closed.");
+                close();
+            });
+            client_socket.addEventListener(IOErrorEvent.IO_ERROR, function (e:IOErrorEvent):void {
+                log("Client: I/O error: " + e.text + ".");
+                close();
+            });
+            client_socket.addEventListener(SecurityErrorEvent.SECURITY_ERROR, function (e:SecurityErrorEvent):void {
+                log("Client: security error: " + e.text + ".");
+                close();
+            });
+            client_socket.addEventListener(ProgressEvent.SOCKET_DATA, client_to_relay);
+        }
+    }
+}
\ No newline at end of file
diff --git a/connector.py b/connector.py
index a683de0..d20a5b6 100755
--- a/connector.py
+++ b/connector.py
@@ -13,7 +13,7 @@ import urllib
import xml.sax.saxutils
DEFAULT_REMOTE_ADDRESS = "127.0.0.1" -DEFAULT_REMOTE_PORT = 3333 +DEFAULT_REMOTE_PORT = 9002 DEFAULT_LOCAL_ADDRESS = "127.0.0.1" DEFAULT_LOCAL_PORT = 9001 DEFAULT_FACILITATOR_PORT = 9002 @@ -281,7 +281,7 @@ def register(): spec = format_addr((None, options.remote_addr[1])) log(u"Registering \"%s\" with %s." % (spec, format_addr(options.facilitator_addr))) http = httplib.HTTPConnection(*options.facilitator_addr) - http.request("POST", "/", urllib.urlencode({"client": spec})) + http.request("POST", "/", urllib.urlencode({"client": spec}), {"Content-Type":"application/x-www-form-urlencoded"}) http.close() return True
diff --git a/facilitator.py b/facilitator.py index 0ea86a1..d6c598d 100755 --- a/facilitator.py +++ b/facilitator.py @@ -11,6 +11,7 @@ import sys import threading import time import urllib +import xml.sax.saxutils
DEFAULT_ADDRESS = "0.0.0.0" DEFAULT_PORT = 9002 @@ -216,10 +217,6 @@ class Handler(BaseHTTPServer.BaseHTTPRequestHandler): environ = {'REQUEST_METHOD' : 'POST', 'CONTENT_TYPE' : self.headers['Content-Type']})
- if self.path == "/crossdomain.xml": - self.send_crossdomain() - return - client_specs = data["client"] if client_specs is None or client_specs.value is None: log(u"client %s missing "client" param" % format_addr(self.client_address)) @@ -257,7 +254,7 @@ class Handler(BaseHTTPServer.BaseHTTPRequestHandler): crossdomain = """\ <cross-domain-policy> <allow-access-from domain="*" to-ports="%s"/> -</cross-domain-policy>\r\n""" % (address[1]) +</cross-domain-policy>\r\n""" % xml.sax.saxutils.escape(str(address[1])) self.send_response(200) self.send_header('Content-Type', 'application/xml') self.send_header('Content-Length', str(len(crossdomain))) diff --git a/rtmfp/ProxyPair.as b/rtmfp/ProxyPair.as deleted file mode 100644 index 159abde..0000000 --- a/rtmfp/ProxyPair.as +++ /dev/null @@ -1,279 +0,0 @@ -package rtmfp -{ - import flash.events.Event; - import flash.events.EventDispatcher; - import flash.events.IOErrorEvent; - import flash.events.ProgressEvent; - import flash.events.SecurityErrorEvent; - import flash.net.Socket; - import flash.utils.ByteArray; - import flash.utils.clearTimeout; - import flash.utils.setTimeout; - - import rtmfp.CirrusSocket; - import rtmfp.RTMFPSocket; - import rtmfp.events.RTMFPSocketEvent; - - public class ProxyPair extends EventDispatcher - { - private var ui:rtmfpcat; - - private var s_p:RTMFPSocket; - private var s_r:Socket; - - private var relay_host:String; - private var relay_port:uint; - - private var p2r_schedule:Array; - private var r2p_schedule:Array; - - // Bytes per second. Set to undefined to disable limit. - public const RATE_LIMIT:Number = 10000; - // Seconds. - private const RATE_LIMIT_HISrelayY:Number = 5.0; - - private var rate_limit:RateLimit; - - // Callback id. 
- private var flush_id:uint; - - public function ProxyPair(ui:rtmfpcat, s_c:CirrusSocket, relay_host:String, relay_port:uint) - { - this.ui = ui; - this.relay_host = relay_host; - this.relay_port = relay_port; - - this.p2r_schedule = new Array(); - this.r2p_schedule = new Array(); - - if (RATE_LIMIT) - rate_limit = new BucketRateLimit(RATE_LIMIT * RATE_LIMIT_HISrelayY, RATE_LIMIT_HISrelayY); - else - rate_limit = new RateUnlimit(); - - setup_rtmfp_socket(s_c); - setup_relay_socket(); - } - - public function close():void - { - if (s_p.connected) { - s_p.close(); - } - if (s_r.connected) { - s_r.close(); - } - dispatchEvent(new Event(Event.CLOSE)); - } - - public function connect(peer:String, stream:String):void - { - s_p.connect(peer, stream); - } - - public function get connected():Boolean - { - return (s_p.connected && s_r.connected); - } - - public function listen(stream:String):void - { - s_p.listen(stream); - } - - private function setup_rtmfp_socket(s_c:CirrusSocket):void - { - s_p = new RTMFPSocket(s_c); - s_p.addEventListener(RTMFPSocketEvent.CONNECT_FAILED, function (e:RTMFPSocketEvent):void { - ui.puts("Peering failed."); - }); - s_p.addEventListener(RTMFPSocketEvent.CONNECT_SUCCESS, function (e:RTMFPSocketEvent):void { - ui.puts("Peering success."); - s_r.connect(relay_host, relay_port); - }); - s_p.addEventListener(RTMFPSocketEvent.PEER_CONNECTED, function (e:RTMFPSocketEvent):void { - ui.puts("Peer connected."); - }); - s_p.addEventListener(RTMFPSocketEvent.PEER_DISCONNECTED, function (e:RTMFPSocketEvent):void { - ui.puts("Peer disconnected."); - close(); - }); - s_p.addEventListener(RTMFPSocketEvent.PLAY_STARTED, function (e:RTMFPSocketEvent):void { - ui.puts("Play started."); - }); - s_p.addEventListener(RTMFPSocketEvent.PUBLISH_STARTED, function (e:RTMFPSocketEvent):void { - ui.puts("Publishing started."); - }); - s_p.addEventListener(ProgressEvent.SOCKET_DATA, proxy_to_relay); - } - - private function setup_relay_socket():void - { - s_r = new 
Socket(); - s_r.addEventListener(Event.CONNECT, function (e:Event):void { - ui.puts("Relay: connected to " + relay_host + ":" + relay_port + "."); - dispatchEvent(new Event(Event.CONNECT)); - }); - s_r.addEventListener(Event.CLOSE, function (e:Event):void { - ui.puts("Relay: closed connection."); - close(); - }); - s_r.addEventListener(IOErrorEvent.IO_ERROR, function (e:IOErrorEvent):void { - ui.puts("Relay: I/O error: " + e.text + "."); - close(); - }); - s_r.addEventListener(SecurityErrorEvent.SECURITY_ERROR, function (e:SecurityErrorEvent):void { - ui.puts("Relay: security error: " + e.text + "."); - close(); - }); - s_r.addEventListener(ProgressEvent.SOCKET_DATA, relay_to_proxy); - } - - private function relay_to_proxy(e:ProgressEvent):void - { - r2p_schedule.push(e.bytesLoaded); - flush(); - } - - private function proxy_to_relay(e:ProgressEvent):void - { - p2r_schedule.push(e.bytesLoaded); - flush(); - } - - /* Send as much data as the rate limit currently allows. */ - private function flush():void - { - if (flush_id) - clearTimeout(flush_id); - flush_id = undefined; - - if (!(s_p.connected && s_r.connected)) - /* Can't do anything until both sockets are connected. */ - return; - - while (!rate_limit.is_limited() && (p2r_schedule.length > 0 || r2p_schedule.length > 0)) { - var numBytes:uint; - var bytes:ByteArray; - - if (p2r_schedule.length > 0) { - numBytes = p2r_schedule.shift(); - bytes = new ByteArray(); - s_p.readBytes(bytes, 0, numBytes); - ui.puts("ProxyPair: RTMFP: read " + bytes.length + " bytes."); - s_r.writeBytes(bytes); - rate_limit.update(numBytes); - } - if (r2p_schedule.length > 0) { - numBytes = r2p_schedule.shift(); - bytes = new ByteArray(); - s_r.readBytes(bytes, 0, numBytes); - ui.puts("ProxyPair: Relay: read " + bytes.length + " bytes."); - s_p.writeBytes(bytes); - rate_limit.update(numBytes); - } - } - - /* Call again when safe, if necessary. 
*/ - if (p2r_schedule.length > 0 || r2p_schedule.length > 0) - flush_id = setTimeout(flush, rate_limit.when() * 1000); - } - } -} - -import flash.utils.getTimer; - -class RateLimit -{ - public function RateLimit() - { - } - - public function update(n:Number):Boolean - { - return true; - } - - public function when():Number - { - return 0.0; - } - - public function is_limited():Boolean - { - return false; - } -} - -class RateUnlimit extends RateLimit -{ - public function RateUnlimit() - { - } - - public override function update(n:Number):Boolean - { - return true; - } - - public override function when():Number - { - return 0.0; - } - - public override function is_limited():Boolean - { - return false; - } -} - -class BucketRateLimit extends RateLimit -{ - private var amount:Number; - private var capacity:Number; - private var time:Number; - private var last_update:uint; - - public function BucketRateLimit(capacity:Number, time:Number) - { - this.amount = 0.0; - /* capacity / time is the rate we are aiming for. */ - this.capacity = capacity; - this.time = time; - this.last_update = getTimer(); - } - - private function age():void - { - var now:uint; - var delta:Number; - - now = getTimer(); - delta = (now - last_update) / 1000.0; - last_update = now; - - amount -= delta * capacity / time; - if (amount < 0.0) - amount = 0.0; - } - - public override function update(n:Number):Boolean - { - age(); - amount += n; - - return amount <= capacity; - } - - public override function when():Number - { - age(); - return (amount - capacity) / (capacity / time); - } - - public override function is_limited():Boolean - { - age(); - return amount > capacity; - } -} \ No newline at end of file diff --git a/rtmfp/RTMFPSocket.as b/rtmfp/RTMFPSocket.as index bcbee67..4efe702 100644 --- a/rtmfp/RTMFPSocket.as +++ b/rtmfp/RTMFPSocket.as @@ -48,7 +48,7 @@ package rtmfp
/* Tears down this RTMFPSocket, closing both its streams. To be used when destroying this object. */ - public function close():void + public function close():void { if (send_stream != null) { s_c.connection.removeEventListener(NetStatusEvent.NET_STATUS, on_stream_disconnection_event); diff --git a/rtmfpcat.as b/rtmfpcat.as deleted file mode 100644 index 1876ef9..0000000 --- a/rtmfpcat.as +++ /dev/null @@ -1,353 +0,0 @@ -package -{ - import flash.display.Sprite; - import flash.display.StageAlign; - import flash.display.StageScaleMode; - import flash.text.TextField; - import flash.text.TextFormat; - import flash.events.Event; - import flash.utils.setTimeout; - - import rtmfp.CirrusSocket; - import rtmfp.FacilitatorSocket; - import rtmfp.ProxyPair; - import rtmfp.events.CirrusSocketEvent; - import rtmfp.events.FacilitatorSocketEvent; - - public class rtmfpcat extends Sprite - { - /* Adobe's Cirrus server and Nate's key */ - private const DEFAULT_CIRRUS_ADDR:String = "rtmfp://p2p.rtmfp.net"; - private const DEFAULT_CIRRUS_KEY:String = RTMFP::CIRRUS_KEY; - - /* Nate's facilitator -- serves a crossdomain policy */ - private const DEFAULT_FACILITATOR_ADDR:Object = { - host: "128.12.179.80", - port: 9002 - }; - - private const DEFAULT_TOR_CLIENT_ADDR:Object = { - host: "127.0.0.1", - port: 3333 - }; - - /* David's relay (nickname 3VXRyxz67OeRoqHn) that also serves a - crossdomain policy. */ - private const DEFAULT_TOR_RELAY_ADDR:Object = { - host: "173.255.221.44", - port: 9001 - }; - - /* Poll facilitator every 10 sec */ - private const DEFAULT_FAC_POLL_INTERVAL:uint = 10000; - - // Socket to Cirrus server - private var s_c:CirrusSocket; - // Socket to facilitator. - private var s_f:FacilitatorSocket; - // Handle local-remote traffic - private var p_p:ProxyPair; - - private var proxy_pairs:Array; - - private var debug_mode:Boolean; - private var proxy_mode:Boolean; - - /* TextField for debug output. 
*/ - private var output_text:TextField; - - /* Badge for display */ - private var badge:InternetFreedomBadge; - - private var fac_addr:Object; - private var relay_addr:Object; - - public function rtmfpcat() - { - proxy_mode = false; - debug_mode = false; - - // Absolute positioning. - stage.scaleMode = StageScaleMode.NO_SCALE; - stage.align = StageAlign.TOP_LEFT; - - // Wait until the query string parameters are loaded. - this.loaderInfo.addEventListener(Event.COMPLETE, loaderinfo_complete); - } - - public function puts(s:String):void - { - if (output_text != null) { - output_text.appendText(s + "\n"); - output_text.scrollV = output_text.maxScrollV; - } - } - - private function loaderinfo_complete(e:Event):void - { - var fac_spec:String; - var relay_spec:String; - - debug_mode = (this.loaderInfo.parameters["debug"] != null) - proxy_mode = (this.loaderInfo.parameters["proxy"] != null); - if (proxy_mode && !debug_mode) { - badge = new InternetFreedomBadge(this); - badge.display(); - } else { - output_text = new TextField(); - output_text.width = stage.stageWidth; - output_text.height = stage.stageHeight; - output_text.background = true; - output_text.backgroundColor = 0x001f0f; - output_text.textColor = 0x44cc44; - addChild(output_text); - } - - puts("Starting: parameters loaded."); - - /* TODO: use this to have multiple proxies going at once */ - proxy_pairs = new Array(); - - fac_spec = this.loaderInfo.parameters["facilitator"]; - if (fac_spec) { - puts("Facilitator spec: "" + fac_spec + """); - fac_addr = parse_addr_spec(fac_spec); - if (!fac_addr) { - puts("Error: Facilitator spec must be in the form "host:port"."); - return; - } - } else { - fac_addr = DEFAULT_FACILITATOR_ADDR; - } - - relay_spec = this.loaderInfo.parameters["relay"]; - if (relay_spec) { - puts("Relay spec: "" + relay_spec + """); - relay_addr = parse_addr_spec(relay_spec); - if (!relay_addr) { - puts("Error: Relay spec must be in the form "host:port"."); - return; - } - } else { - if 
(proxy_mode) { - relay_addr = DEFAULT_TOR_RELAY_ADDR; - } else { - relay_addr = DEFAULT_TOR_CLIENT_ADDR; - } - } - - main(); - } - - /* The main logic begins here, after start-up issues are taken care of. */ - private function main():void - { - establish_cirrus_connection(); - } - - private function establish_cirrus_connection():void - { - s_c = new CirrusSocket(); - s_c.addEventListener(CirrusSocketEvent.CONNECT_SUCCESS, function (e:CirrusSocketEvent):void { - puts("Cirrus: connected with id " + s_c.id + "."); - establish_facilitator_connection(); - }); - s_c.addEventListener(CirrusSocketEvent.CONNECT_FAILED, function (e:CirrusSocketEvent):void { - puts("Error: failed to connect to Cirrus."); - }); - s_c.addEventListener(CirrusSocketEvent.CONNECT_CLOSED, function (e:CirrusSocketEvent):void { - puts("Cirrus: closed connection."); - }); - - s_c.addEventListener(CirrusSocketEvent.HELLO_RECEIVED, function (e:CirrusSocketEvent):void { - puts("Cirrus: received hello from peer " + e.peer); - - /* don't bother if we already have a proxy going */ - if (p_p != null && p_p.connected) { - return; - } - - /* if we're in proxy mode, we should have already set - up a proxy pair */ - if (!proxy_mode) { - start_proxy_pair(); - s_c.send_hello(e.peer); - } else if (!debug_mode && badge != null) { - badge.total_proxy_pairs++; - badge.num_proxy_pairs++; - } - p_p.connect(e.peer, e.stream); - }); - - s_c.connect(DEFAULT_CIRRUS_ADDR, DEFAULT_CIRRUS_KEY); - } - - private function establish_facilitator_connection():void - { - s_f = new FacilitatorSocket(fac_addr.host, fac_addr.port); - s_f.addEventListener(FacilitatorSocketEvent.CONNECT_FAILED, function (e:Event):void { - puts("Facilitator: connect failed."); - setTimeout(establish_facilitator_connection, DEFAULT_FAC_POLL_INTERVAL); - }); - - if (proxy_mode) { - s_f.addEventListener(FacilitatorSocketEvent.REGISTRATION_RECEIVED, function (e:FacilitatorSocketEvent):void { - puts("Facilitator: got registration " + e.client); - 
start_proxy_pair(); - s_c.send_hello(e.client); - }); - s_f.addEventListener(FacilitatorSocketEvent.REGISTRATIONS_EMPTY, function (e:Event):void { - puts("Facilitator: no registrations available."); - setTimeout(establish_facilitator_connection, DEFAULT_FAC_POLL_INTERVAL); - }); - puts("Facilitator: getting registration."); - s_f.get_registration(); - } else { - s_f.addEventListener(FacilitatorSocketEvent.REGISTRATION_FAILED, function (e:Event):void { - puts("Facilitator: registration failed."); - setTimeout(establish_facilitator_connection, DEFAULT_FAC_POLL_INTERVAL); - }); - puts("Facilitator: posting registration."); - s_f.post_registration(s_c.id); - } - } - - private function start_proxy_pair():void - { - puts("Starting proxy pair on stream " + s_c.local_stream_name); - p_p = new ProxyPair(this, s_c, relay_addr.host, relay_addr.port); - p_p.addEventListener(Event.CONNECT, function (e:Event):void { - puts("ProxyPair: connected!"); - }); - p_p.addEventListener(Event.CLOSE, function (e:Event):void { - puts("ProxyPair: connection closed."); - p_p = null; - if (proxy_mode && !debug_mode && badge != null) { - badge.num_proxy_pairs--; - } - establish_facilitator_connection(); - }); - p_p.listen(s_c.local_stream_name); - } - - /* Parse an address in the form "host:port". Returns an Object with - keys "host" (String) and "port" (int). Returns null on error. 
*/ - private function parse_addr_spec(spec:String):Object - { - var parts:Array; - var addr:Object; - - parts = spec.split(":", 2); - if (parts.length != 2 || !parseInt(parts[1])) - return null; - addr = {} - addr.host = parts[0]; - addr.port = parseInt(parts[1]); - - return addr; - } - } -} - -import flash.text.TextField; -import flash.text.TextFormat; - -class InternetFreedomBadge { - - private var ui:rtmfpcat; - - private var _num_proxy_pairs:uint; - private var _total_proxy_pairs:uint; - - [Embed(source="badge.png")] - private var BadgeImage:Class; - private var tot_client_count_tf:TextField; - private var tot_client_count_fmt:TextFormat; - private var cur_client_count_tf:TextField; - private var cur_client_count_fmt:TextFormat; - - public function InternetFreedomBadge(ui:rtmfpcat) - { - this.ui = ui; - _num_proxy_pairs = 0; - _total_proxy_pairs = 0; - - /* Setup client counter for badge. */ - tot_client_count_fmt = new TextFormat(); - tot_client_count_fmt.color = 0xFFFFFF; - tot_client_count_fmt.align = "center"; - tot_client_count_fmt.font = "courier-new"; - tot_client_count_fmt.bold = true; - tot_client_count_fmt.size = 10; - tot_client_count_tf = new TextField(); - tot_client_count_tf.width = 20; - tot_client_count_tf.height = 17; - tot_client_count_tf.background = false; - tot_client_count_tf.defaultTextFormat = tot_client_count_fmt; - tot_client_count_tf.x=47; - tot_client_count_tf.y=0; - - cur_client_count_fmt = new TextFormat(); - cur_client_count_fmt.color = 0xFFFFFF; - cur_client_count_fmt.align = "center"; - cur_client_count_fmt.font = "courier-new"; - cur_client_count_fmt.bold = true; - cur_client_count_fmt.size = 10; - cur_client_count_tf = new TextField(); - cur_client_count_tf.width = 20; - cur_client_count_tf.height = 17; - cur_client_count_tf.background = false; - cur_client_count_tf.defaultTextFormat = cur_client_count_fmt; - cur_client_count_tf.x=47; - cur_client_count_tf.y=6; - - /* Update the client counter on badge. 
*/ - update_client_count(); - } - - public function display():void - { - ui.addChild(new BadgeImage()); - /* Tried unsuccessfully to add counter to badge. */ - /* For now, need two addChilds :( */ - ui.addChild(tot_client_count_tf); - ui.addChild(cur_client_count_tf); - } - - public function get num_proxy_pairs():uint - { - return _num_proxy_pairs; - } - - public function set num_proxy_pairs(amount:uint):void - { - _num_proxy_pairs = amount; - update_client_count(); - } - - public function get total_proxy_pairs():uint - { - return _total_proxy_pairs; - } - - public function set total_proxy_pairs(amount:uint):void - { - _total_proxy_pairs = amount; - /* note: doesn't update, so be sure to update this - before you update num_proxy_pairs! */ - } - - private function update_client_count():void - { - /* Update total client count. */ - if (String(total_proxy_pairs).length == 1) - tot_client_count_tf.text = "0" + String(total_proxy_pairs); - else - tot_client_count_tf.text = String(total_proxy_pairs); - - /* Update current client count. */ - cur_client_count_tf.text = ""; - for(var i:Number = 0; i < num_proxy_pairs; i++) - cur_client_count_tf.appendText("."); - } -} diff --git a/swfcat.as b/swfcat.as index 05da498..7729d2c 100644 --- a/swfcat.as +++ b/swfcat.as @@ -5,251 +5,268 @@ package import flash.display.StageScaleMode; import flash.text.TextField; import flash.text.TextFormat; - import flash.net.Socket; import flash.events.Event; - import flash.events.IOErrorEvent; - import flash.events.ProgressEvent; - import flash.events.SecurityErrorEvent; - import flash.utils.ByteArray; import flash.utils.setTimeout;
+ import rtmfp.CirrusSocket; + import rtmfp.FacilitatorSocket; + import rtmfp.events.CirrusSocketEvent; + import rtmfp.events.FacilitatorSocketEvent; + public class swfcat extends Sprite { + /* Adobe's Cirrus server and Nate's key */ + private const DEFAULT_CIRRUS_ADDR:String = "rtmfp://p2p.rtmfp.net"; + private const DEFAULT_CIRRUS_KEY:String = RTMFP::CIRRUS_KEY; + + /* Nate's facilitator -- serves a crossdomain policy */ + private const DEFAULT_FACILITATOR_ADDR:Object = { + host: "128.12.179.80", + port: 9002 + }; + + private const DEFAULT_TOR_CLIENT_ADDR:Object = { + host: "127.0.0.1", + port: 9002 + }; + /* David's relay (nickname 3VXRyxz67OeRoqHn) that also serves a crossdomain policy. */ - private const DEFAULT_RELAY_ADDR:Object = { + private const DEFAULT_TOR_RELAY_ADDR:Object = { host: "173.255.221.44", port: 9001 }; - private const DEFAULT_FACILITATOR_ADDR:Object = { - host: "173.255.221.44", - port: 9002 - }; - - private const MAX_NUM_PROXY_PAIRS:uint = 1; - - // Milliseconds. - private const FACILITATOR_POLL_INTERVAL:int = 10000; - - // Bytes per second. Set to undefined to disable limit. - public const RATE_LIMIT:Number = undefined; - // Seconds. - private const RATE_LIMIT_HISTORY:Number = 5.0; + + /* Poll facilitator every 10 sec */ + private const DEFAULT_FAC_POLL_INTERVAL:uint = 10000;
+ // Socket to Cirrus server + private var s_c:CirrusSocket; // Socket to facilitator. - private var s_f:Socket; + private var s_f:FacilitatorSocket; + // Handle local-remote traffic + private var p_p:ProxyPair; + + private var client_id:String; + private var proxy_pair_factory:Function; + + private var proxy_pairs:Array; + + private var debug_mode:Boolean; + private var proxy_mode:Boolean;
/* TextField for debug output. */ private var output_text:TextField; + + /* Badge for display */ + private var badge:InternetFreedomBadge;
private var fac_addr:Object; private var relay_addr:Object;
- /* Number of proxy pairs currently connected (up to - MAX_NUM_PROXY_PAIRS). */ - private var num_proxy_pairs:int = 0; - /* Number of proxy pairs ever connected. */ - private var total_proxy_pairs:int = 0; - - public var rate_limit:RateLimit; - - /* Badge with a client counter */ - [Embed(source="badge.png")] - private var BadgeImage:Class; - private var tot_client_count_tf:TextField; - private var tot_client_count_fmt:TextFormat; - private var cur_client_count_tf:TextField; - private var cur_client_count_fmt:TextFormat; - - public function puts(s:String):void - { - output_text.appendText(s + "\n"); - output_text.scrollV = output_text.maxScrollV; - } - - public function update_client_count():void - { - /* Update total client count. */ - if (String(total_proxy_pairs).length == 1) - tot_client_count_tf.text = "0" + String(total_proxy_pairs); - else - tot_client_count_tf.text = String(total_proxy_pairs); - - /* Update current client count. */ - cur_client_count_tf.text = ""; - for(var i:Number=0; i<num_proxy_pairs; i++) - cur_client_count_tf.appendText(".");; - } - public function swfcat() { + proxy_mode = false; + debug_mode = false; + // Absolute positioning. stage.scaleMode = StageScaleMode.NO_SCALE; stage.align = StageAlign.TOP_LEFT; - - output_text = new TextField(); - output_text.width = stage.stageWidth; - output_text.height = stage.stageHeight; - output_text.background = true; - output_text.backgroundColor = 0x001f0f; - output_text.textColor = 0x44cc44; - - /* Setup client counter for badge. 
*/ - tot_client_count_fmt = new TextFormat(); - tot_client_count_fmt.color = 0xFFFFFF; - tot_client_count_fmt.align = "center"; - tot_client_count_fmt.font = "courier-new"; - tot_client_count_fmt.bold = true; - tot_client_count_fmt.size = 10; - tot_client_count_tf = new TextField(); - tot_client_count_tf.width = 20; - tot_client_count_tf.height = 17; - tot_client_count_tf.background = false; - tot_client_count_tf.defaultTextFormat = tot_client_count_fmt; - tot_client_count_tf.x=47; - tot_client_count_tf.y=0; - - cur_client_count_fmt = new TextFormat(); - cur_client_count_fmt.color = 0xFFFFFF; - cur_client_count_fmt.align = "center"; - cur_client_count_fmt.font = "courier-new"; - cur_client_count_fmt.bold = true; - cur_client_count_fmt.size = 10; - cur_client_count_tf = new TextField(); - cur_client_count_tf.width = 20; - cur_client_count_tf.height = 17; - cur_client_count_tf.background = false; - cur_client_count_tf.defaultTextFormat = cur_client_count_fmt; - cur_client_count_tf.x=47; - cur_client_count_tf.y=6; - - - /* Update the client counter on badge. */ - update_client_count(); - - if (RATE_LIMIT) - rate_limit = new BucketRateLimit(RATE_LIMIT * RATE_LIMIT_HISTORY, RATE_LIMIT_HISTORY); - else - rate_limit = new RateUnlimit(); - - puts("Starting."); + // Wait until the query string parameters are loaded. this.loaderInfo.addEventListener(Event.COMPLETE, loaderinfo_complete); } + + public function puts(s:String):void + { + if (output_text != null) { + output_text.appendText(s + "\n"); + output_text.scrollV = output_text.maxScrollV; + } + }
private function loaderinfo_complete(e:Event):void { var fac_spec:String; - - puts("Parameters loaded."); - - if (this.loaderInfo.parameters["debug"]) + var relay_spec:String; + + debug_mode = (this.loaderInfo.parameters["debug"] != null) + proxy_mode = (this.loaderInfo.parameters["proxy"] != null); + if (proxy_mode && !debug_mode) { + badge = new InternetFreedomBadge(this); + badge.display(); + } else { + output_text = new TextField(); + output_text.width = stage.stageWidth; + output_text.height = stage.stageHeight; + output_text.background = true; + output_text.backgroundColor = 0x001f0f; + output_text.textColor = 0x44cc44; addChild(output_text); - else { - addChild(new BadgeImage()); - /* Tried unsuccessfully to add counter to badge. */ - /* For now, need two addChilds :( */ - addChild(tot_client_count_tf); - addChild(cur_client_count_tf); } - - fac_addr = get_param_addr("facilitator", DEFAULT_FACILITATOR_ADDR); - if (!fac_addr) { - puts("Error: Facilitator spec must be in the form "host:port"."); - return; + + puts("Starting: parameters loaded."); + + /* TODO: use this to have multiple proxies going at once */ + proxy_pairs = new Array(); + + fac_spec = this.loaderInfo.parameters["facilitator"]; + if (fac_spec) { + puts("Facilitator spec: "" + fac_spec + """); + fac_addr = parse_addr_spec(fac_spec); + if (!fac_addr) { + puts("Error: Facilitator spec must be in the form "host:port"."); + return; + } + } else { + fac_addr = DEFAULT_FACILITATOR_ADDR; } - relay_addr = get_param_addr("relay", DEFAULT_RELAY_ADDR); - if (!relay_addr) { - puts("Error: Relay spec must be in the form "host:port"."); - return; + + relay_spec = this.loaderInfo.parameters["relay"]; + if (relay_spec) { + puts("Relay spec: "" + relay_spec + """); + relay_addr = parse_addr_spec(relay_spec); + if (!relay_addr) { + puts("Error: Relay spec must be in the form "host:port"."); + return; + } + } else { + if (proxy_mode) { + relay_addr = DEFAULT_TOR_RELAY_ADDR; + } else { + relay_addr = 
DEFAULT_TOR_CLIENT_ADDR; + } }
main(); }
- /* Get an address structure from the given movie parameter, or the given - default. Returns null on error. */ - private function get_param_addr(param:String, default_addr:Object):Object - { - var spec:String, addr:Object; - - spec = this.loaderInfo.parameters[param]; - if (spec) - return parse_addr_spec(spec); - else - return default_addr; - } - /* The main logic begins here, after start-up issues are taken care of. */ private function main():void { - if (num_proxy_pairs >= MAX_NUM_PROXY_PAIRS) { - setTimeout(main, FACILITATOR_POLL_INTERVAL); - return; + if (proxy_mode) { + establish_facilitator_connection(); + } else { + establish_cirrus_connection(); } + }
- s_f = new Socket(); - - s_f.addEventListener(Event.CONNECT, fac_connected); - s_f.addEventListener(Event.CLOSE, function (e:Event):void { - puts("Facilitator: closed connection."); - setTimeout(main, FACILITATOR_POLL_INTERVAL); + private function establish_cirrus_connection():void + { + s_c = new CirrusSocket(); + s_c.addEventListener(CirrusSocketEvent.CONNECT_SUCCESS, function (e:CirrusSocketEvent):void { + puts("Cirrus: connected with id " + s_c.id + "."); + if (proxy_mode) { + start_proxy_pair(); + s_c.send_hello(client_id); + } else { + establish_facilitator_connection(); + } }); - s_f.addEventListener(IOErrorEvent.IO_ERROR, function (e:IOErrorEvent):void { - puts("Facilitator: I/O error: " + e.text + "."); + s_c.addEventListener(CirrusSocketEvent.CONNECT_FAILED, function (e:CirrusSocketEvent):void { + puts("Error: failed to connect to Cirrus."); }); - s_f.addEventListener(SecurityErrorEvent.SECURITY_ERROR, function (e:SecurityErrorEvent):void { - puts("Facilitator: security error: " + e.text + "."); + s_c.addEventListener(CirrusSocketEvent.CONNECT_CLOSED, function (e:CirrusSocketEvent):void { + puts("Cirrus: closed connection."); }); - - puts("Facilitator: connecting to " + fac_addr.host + ":" + fac_addr.port + "."); - s_f.connect(fac_addr.host, fac_addr.port); + s_c.addEventListener(CirrusSocketEvent.HELLO_RECEIVED, function (e:CirrusSocketEvent):void { + puts("Cirrus: received hello from peer " + e.peer); + + /* don't bother if we already have a proxy going */ + if (p_p != null && p_p.connected) { + return; + } + + /* if we're in proxy mode, we should have already set + up a proxy pair */ + if (!proxy_mode) { + proxy_pair_factory = rtmfp_proxy_pair_factory; + start_proxy_pair(); + s_c.send_hello(e.peer); + } else if (!debug_mode && badge != null) { + badge.total_proxy_pairs++; + badge.num_proxy_pairs++; + } + + p_p.client = {peer: e.peer, stream: e.stream}; + }); + + s_c.connect(DEFAULT_CIRRUS_ADDR, DEFAULT_CIRRUS_KEY); }
- private function fac_connected(e:Event):void + private function establish_facilitator_connection():void { - puts("Facilitator: connected."); - - s_f.addEventListener(ProgressEvent.SOCKET_DATA, fac_data); - - s_f.writeUTFBytes("GET / HTTP/1.0\r\n\r\n"); + s_f = new FacilitatorSocket(fac_addr.host, fac_addr.port); + s_f.addEventListener(FacilitatorSocketEvent.CONNECT_FAILED, function (e:Event):void { + puts("Facilitator: connect failed."); + setTimeout(establish_facilitator_connection, DEFAULT_FAC_POLL_INTERVAL); + }); + + if (proxy_mode) { + s_f.addEventListener(FacilitatorSocketEvent.REGISTRATION_RECEIVED, function (e:FacilitatorSocketEvent):void { + var client_addr:Object = parse_addr_spec(e.client); + if (client_addr == null) { + puts("Facilitator: got registration " + e.client); + proxy_pair_factory = rtmfp_proxy_pair_factory; + if (s_c == null || !s_c.connected) { + client_id = e.client; + establish_cirrus_connection(); + } else { + start_proxy_pair(); + s_c.send_hello(e.client); + } + } else { + proxy_pair_factory = tcp_proxy_pair_factory; + start_proxy_pair(); + p_p.client = client_addr; + } + }); + s_f.addEventListener(FacilitatorSocketEvent.REGISTRATIONS_EMPTY, function (e:Event):void { + puts("Facilitator: no registrations available."); + setTimeout(establish_facilitator_connection, DEFAULT_FAC_POLL_INTERVAL); + }); + puts("Facilitator: getting registration."); + s_f.get_registration(); + } else { + s_f.addEventListener(FacilitatorSocketEvent.REGISTRATION_FAILED, function (e:Event):void { + puts("Facilitator: registration failed."); + setTimeout(establish_facilitator_connection, DEFAULT_FAC_POLL_INTERVAL); + }); + puts("Facilitator: posting registration."); + s_f.post_registration(s_c.id); + } } - - private function fac_data(e:ProgressEvent):void + + private function start_proxy_pair():void { - var client_spec:String; - var client_addr:Object; - var proxy_pair:Object; - - client_spec = s_f.readMultiByte(e.bytesLoaded, "utf-8"); - puts("Facilitator: got 
"" + client_spec + ""."); - - client_addr = parse_addr_spec(client_spec); - if (!client_addr) { - puts("Error: Client spec must be in the form "host:port"."); - return; - } - - num_proxy_pairs++; - total_proxy_pairs++; - /* Update the client count on the badge. */ - update_client_count(); - - proxy_pair = new ProxyPair(this, client_addr, relay_addr); - proxy_pair.addEventListener(Event.COMPLETE, function(e:Event):void { - proxy_pair.log("Complete."); - - num_proxy_pairs--; - /* Update the client count on the badge. */ - update_client_count(); + p_p = proxy_pair_factory(); + p_p.addEventListener(Event.CONNECT, function (e:Event):void { + puts("ProxyPair: connected!"); }); - proxy_pair.connect(); - + p_p.addEventListener(Event.CLOSE, function (e:Event):void { + puts("ProxyPair: connection closed."); + p_p = null; + if (proxy_mode && !debug_mode && badge != null) { + badge.num_proxy_pairs--; + } + establish_facilitator_connection(); + }); + p_p.relay = relay_addr; + } + + private function rtmfp_proxy_pair_factory():ProxyPair + { + return new RTMFPProxyPair(this, s_c, s_c.local_stream_name); + } + + private function tcp_proxy_pair_factory():ProxyPair + { + return new TCPProxyPair(this); }
/* Parse an address in the form "host:port". Returns an Object with keys "host" (String) and "port" (int). Returns null on error. */ - private static function parse_addr_spec(spec:String):Object + private function parse_addr_spec(spec:String):Object { var parts:Array; var addr:Object; @@ -266,267 +283,105 @@ package } }
-import flash.display.Sprite; -import flash.events.Event; -import flash.events.EventDispatcher; -import flash.events.IOErrorEvent; -import flash.events.ProgressEvent; -import flash.events.SecurityErrorEvent; -import flash.net.Socket; -import flash.utils.ByteArray; -import flash.utils.clearTimeout; -import flash.utils.getTimer; -import flash.utils.setTimeout; - -class RateLimit -{ - public function RateLimit() - { - } - - public function update(n:Number):Boolean - { - return true; - } +import flash.text.TextField; +import flash.text.TextFormat;
- public function when():Number - { - return 0.0; - } - - public function is_limited():Boolean - { - return false; - } -} - -class RateUnlimit extends RateLimit -{ - public function RateUnlimit() - { - } - - public override function update(n:Number):Boolean - { - return true; - } - - public override function when():Number - { - return 0.0; - } - - public override function is_limited():Boolean - { - return false; - } -} - -class BucketRateLimit extends RateLimit -{ - private var amount:Number; - private var capacity:Number; - private var time:Number; - private var last_update:uint; - - public function BucketRateLimit(capacity:Number, time:Number) - { - this.amount = 0.0; - /* capacity / time is the rate we are aiming for. */ - this.capacity = capacity; - this.time = time; - this.last_update = getTimer(); - } - - private function age():void - { - var now:uint; - var delta:Number; - - now = getTimer(); - delta = (now - last_update) / 1000.0; - last_update = now; - - amount -= delta * capacity / time; - if (amount < 0.0) - amount = 0.0; - } - - public override function update(n:Number):Boolean - { - age(); - amount += n; - - return amount <= capacity; - } - - public override function when():Number - { - age(); - return (amount - capacity) / (capacity / time); - } - - public override function is_limited():Boolean - { - age(); - return amount > capacity; - } -} - -/* An instance of a client-relay connection. */ -class ProxyPair extends EventDispatcher -{ - // Address ({host, port}) of client. - private var addr_c:Object; - // Address ({host, port}) of relay. - private var addr_r:Object; - - // Socket to client. - private var s_c:Socket; - // Socket to relay. - private var s_r:Socket; - - // Parent swfcat, for UI updates and rate meter. +class InternetFreedomBadge { + private var ui:swfcat; - - // Pending byte read counts for relay and client sockets. - private var r2c_schedule:Array; - private var c2r_schedule:Array; - // Callback id. 
- private var flush_id:uint; - - public function log(msg:String):void - { - ui.puts(id() + ": " + msg) - } - - // String describing this pair for output. - public function id():String - { - return "<" + this.addr_c.host + ":" + this.addr_c.port + - "," + this.addr_r.host + ":" + this.addr_r.port + ">"; - } - - public function ProxyPair(ui:swfcat, addr_c:Object, addr_r:Object) + + private var _num_proxy_pairs:uint; + private var _total_proxy_pairs:uint; + + [Embed(source="badge.png")] + private var BadgeImage:Class; + private var tot_client_count_tf:TextField; + private var tot_client_count_fmt:TextFormat; + private var cur_client_count_tf:TextField; + private var cur_client_count_fmt:TextFormat; + + public function InternetFreedomBadge(ui:swfcat) { this.ui = ui; - this.addr_c = addr_c; - this.addr_r = addr_r; - - this.c2r_schedule = []; - this.r2c_schedule = []; - } - - public function connect():void - { - s_r = new Socket(); - - s_r.addEventListener(Event.CONNECT, relay_connected); - s_r.addEventListener(Event.CLOSE, function (e:Event):void { - log("Relay: closed."); - if (s_c.connected) - s_c.close(); - dispatchEvent(new Event(Event.COMPLETE)); - }); - s_r.addEventListener(IOErrorEvent.IO_ERROR, function (e:IOErrorEvent):void { - log("Relay: I/O error: " + e.text + "."); - if (s_c.connected) - s_c.close(); - dispatchEvent(new Event(Event.COMPLETE)); - }); - s_r.addEventListener(SecurityErrorEvent.SECURITY_ERROR, function (e:SecurityErrorEvent):void { - log("Relay: security error: " + e.text + "."); - if (s_c.connected) - s_c.close(); - dispatchEvent(new Event(Event.COMPLETE)); - }); - s_r.addEventListener(ProgressEvent.SOCKET_DATA, relay_to_client); - - log("Relay: connecting to " + addr_r.host + ":" + addr_r.port + "."); - s_r.connect(addr_r.host, addr_r.port); + _num_proxy_pairs = 0; + _total_proxy_pairs = 0; + + /* Setup client counter for badge. 
*/ + tot_client_count_fmt = new TextFormat(); + tot_client_count_fmt.color = 0xFFFFFF; + tot_client_count_fmt.align = "center"; + tot_client_count_fmt.font = "courier-new"; + tot_client_count_fmt.bold = true; + tot_client_count_fmt.size = 10; + tot_client_count_tf = new TextField(); + tot_client_count_tf.width = 20; + tot_client_count_tf.height = 17; + tot_client_count_tf.background = false; + tot_client_count_tf.defaultTextFormat = tot_client_count_fmt; + tot_client_count_tf.x=47; + tot_client_count_tf.y=0; + + cur_client_count_fmt = new TextFormat(); + cur_client_count_fmt.color = 0xFFFFFF; + cur_client_count_fmt.align = "center"; + cur_client_count_fmt.font = "courier-new"; + cur_client_count_fmt.bold = true; + cur_client_count_fmt.size = 10; + cur_client_count_tf = new TextField(); + cur_client_count_tf.width = 20; + cur_client_count_tf.height = 17; + cur_client_count_tf.background = false; + cur_client_count_tf.defaultTextFormat = cur_client_count_fmt; + cur_client_count_tf.x=47; + cur_client_count_tf.y=6; + + /* Update the client counter on badge. 
*/ + update_client_count(); } - - private function relay_connected(e:Event):void + + public function display():void { - log("Relay: connected."); - - s_c = new Socket(); - - s_c.addEventListener(Event.CONNECT, client_connected); - s_c.addEventListener(Event.CLOSE, function (e:Event):void { - log("Client: closed."); - if (s_r.connected) - s_r.close(); - dispatchEvent(new Event(Event.COMPLETE)); - }); - s_c.addEventListener(IOErrorEvent.IO_ERROR, function (e:IOErrorEvent):void { - log("Client: I/O error: " + e.text + "."); - if (s_r.connected) - s_r.close(); - dispatchEvent(new Event(Event.COMPLETE)); - }); - s_c.addEventListener(SecurityErrorEvent.SECURITY_ERROR, function (e:SecurityErrorEvent):void { - log("Client: security error: " + e.text + "."); - if (s_r.connected) - s_r.close(); - dispatchEvent(new Event(Event.COMPLETE)); - }); - s_c.addEventListener(ProgressEvent.SOCKET_DATA, client_to_relay); - - log("Client: connecting to " + addr_c.host + ":" + addr_c.port + "."); - s_c.connect(addr_c.host, addr_c.port); + ui.addChild(new BadgeImage()); + /* Tried unsuccessfully to add counter to badge. 
*/ + /* For now, need two addChilds :( */ + ui.addChild(tot_client_count_tf); + ui.addChild(cur_client_count_tf); } - - private function relay_to_client(e:ProgressEvent):void + + public function get num_proxy_pairs():uint { - r2c_schedule.push(e.bytesLoaded); - flush(); + return _num_proxy_pairs; } - - private function client_to_relay(e:ProgressEvent):void + + public function set num_proxy_pairs(amount:uint):void { - c2r_schedule.push(e.bytesLoaded); - flush(); + _num_proxy_pairs = amount; + update_client_count(); } - - private function client_connected(e:Event):void + + public function get total_proxy_pairs():uint { - log("Client: connected."); + return _total_proxy_pairs; } - - private function transfer_chunk(s_from:Socket, s_to:Socket, n:uint, - label:String):void + + public function set total_proxy_pairs(amount:uint):void { - var bytes:ByteArray; - - bytes = new ByteArray(); - s_from.readBytes(bytes, 0, n); - s_to.writeBytes(bytes); - ui.rate_limit.update(n); - log(label + ": read " + bytes.length + "."); + _total_proxy_pairs = amount; + /* note: doesn't update, so be sure to update this + before you update num_proxy_pairs! */ } - - /* Send as much data as the rate limit currently allows. */ - private function flush():void + + private function update_client_count():void { - if (flush_id) - clearTimeout(flush_id); - flush_id = undefined; - - if (!(s_r.connected && s_c.connected)) - /* Can't do anything until both sockets are connected. */ - return; - - while (!ui.rate_limit.is_limited() && - (r2c_schedule.length > 0 || c2r_schedule.length > 0)) { - if (r2c_schedule.length > 0) - transfer_chunk(s_r, s_c, r2c_schedule.shift(), "Relay"); - if (c2r_schedule.length > 0) - transfer_chunk(s_c, s_r, c2r_schedule.shift(), "Client"); - } - - /* Call again when safe, if necessary. */ - if (r2c_schedule.length > 0 || c2r_schedule.length > 0) - flush_id = setTimeout(flush, ui.rate_limit.when() * 1000); + /* Update total client count. 
*/ + if (String(total_proxy_pairs).length == 1) + tot_client_count_tf.text = "0" + String(total_proxy_pairs); + else + tot_client_count_tf.text = String(total_proxy_pairs); + + /* Update current client count. */ + cur_client_count_tf.text = ""; + for(var i:Number = 0; i < num_proxy_pairs; i++) + cur_client_count_tf.appendText("."); } }