commit ab4e90626b6f6a7152c79b2d2aeeae601deb9d64 Author: Nate Hardison <nate@rescomp-09-154551.stanford.edu> Date: Thu Jun 2 13:23:08 2011 -0700
Adding in David's rate limiting code to rtmfpcat --- rtmfp/ProxyPair.as | 218 +++++++++++++++++++++++++++++++++++++------- rtmfp/RTMFPSocketClient.as | 5 +- rtmfpcat.as | 38 +++----- 3 files changed, 202 insertions(+), 59 deletions(-)
diff --git a/rtmfp/ProxyPair.as b/rtmfp/ProxyPair.as index 42d4b81..fb184cc 100644 --- a/rtmfp/ProxyPair.as +++ b/rtmfp/ProxyPair.as @@ -7,6 +7,8 @@ package rtmfp import flash.events.SecurityErrorEvent; import flash.net.Socket; import flash.utils.ByteArray; + import flash.utils.clearTimeout; + import flash.utils.setTimeout;
import rtmfp.CirrusSocket; import rtmfp.RTMFPSocket; @@ -14,20 +16,41 @@ package rtmfp
public class ProxyPair extends EventDispatcher { - private var parent:rtmfpcat; + private var ui:rtmfpcat;
private var s_r:RTMFPSocket; private var s_t:Socket;
private var tor_host:String; private var tor_port:uint; + + private var p2t_schedule:Array; + private var t2p_schedule:Array; + + // Bytes per second. Set to 0 to disable the limit (any falsy value selects RateUnlimit). + public const RATE_LIMIT:Number = 10000; + // Seconds of history; the bucket capacity is RATE_LIMIT * RATE_LIMIT_HISTORY bytes. + private const RATE_LIMIT_HISTORY:Number = 5.0; + + private var rate_limit:RateLimit; + + // Id of the pending flush() timeout; falsy (0) when none is scheduled. + private var flush_id:uint;
- public function ProxyPair(parent:rtmfpcat, s_c:CirrusSocket, tor_host:String, tor_port:uint) + public function ProxyPair(ui:rtmfpcat, s_c:CirrusSocket, tor_host:String, tor_port:uint) { - this.parent = parent; + this.ui = ui; this.tor_host = tor_host; this.tor_port = tor_port;
+ this.p2t_schedule = new Array(); + this.t2p_schedule = new Array(); + + if (RATE_LIMIT) + rate_limit = new BucketRateLimit(RATE_LIMIT * RATE_LIMIT_HISTORY, RATE_LIMIT_HISTORY); + else + rate_limit = new RateUnlimit(); + setup_rtmfp_socket(s_c); setup_tor_socket(); } @@ -61,65 +84,196 @@ package rtmfp private function setup_rtmfp_socket(s_c:CirrusSocket):void { s_r = new RTMFPSocket(s_c); - s_r.addEventListener(RTMFPSocketEvent.PLAY_STARTED, function (e:RTMFPSocketEvent):void { - puts("Play started."); + s_r.addEventListener(RTMFPSocketEvent.CONNECT_FAILED, function (e:RTMFPSocketEvent):void { + ui.puts("Peering failed."); }); - s_r.addEventListener(RTMFPSocketEvent.PUBLISH_STARTED, function (e:RTMFPSocketEvent):void { - puts("Publishing started."); + s_r.addEventListener(RTMFPSocketEvent.CONNECT_SUCCESS, function (e:RTMFPSocketEvent):void { + ui.puts("Peering success."); + s_t.connect(tor_host, tor_port); }); s_r.addEventListener(RTMFPSocketEvent.PEER_CONNECTED, function (e:RTMFPSocketEvent):void { - puts("Peer connected."); + ui.puts("Peer connected."); }); s_r.addEventListener(RTMFPSocketEvent.PEER_DISCONNECTED, function (e:RTMFPSocketEvent):void { - puts("Peer disconnected."); + ui.puts("Peer disconnected."); close(); }); - s_r.addEventListener(RTMFPSocketEvent.CONNECT_SUCCESS, function (e:RTMFPSocketEvent):void { - puts("Peering success."); - s_t.connect(tor_host, tor_port); + s_r.addEventListener(RTMFPSocketEvent.PLAY_STARTED, function (e:RTMFPSocketEvent):void { + ui.puts("Play started."); }); - s_r.addEventListener(RTMFPSocketEvent.CONNECT_FAILED, function (e:RTMFPSocketEvent):void { - puts("Peering failed."); + s_r.addEventListener(RTMFPSocketEvent.PUBLISH_STARTED, function (e:RTMFPSocketEvent):void { + ui.puts("Publishing started."); }); + s_r.addEventListener(ProgressEvent.SOCKET_DATA, proxy_to_tor); }
private function setup_tor_socket():void { s_t = new Socket(); s_t.addEventListener(Event.CONNECT, function (e:Event):void { - puts("Tor: connected to " + tor_host + ":" + tor_port + "."); - s_t.addEventListener(ProgressEvent.SOCKET_DATA, function (e:ProgressEvent):void { - var bytes:ByteArray = new ByteArray(); - s_t.readBytes(bytes, 0, e.bytesLoaded); - puts("RTMFPSocket: Tor: read " + bytes.length + " bytes."); - s_r.writeBytes(bytes); - }); - s_r.addEventListener(ProgressEvent.SOCKET_DATA, function (e:ProgressEvent):void { - var bytes:ByteArray = new ByteArray(); - s_r.readBytes(bytes, 0, e.bytesLoaded); - puts("RTMFPSocket: RTMFP: read " + bytes.length + " bytes."); - s_t.writeBytes(bytes); - }); + ui.puts("Tor: connected to " + tor_host + ":" + tor_port + "."); dispatchEvent(new Event(Event.CONNECT)); }); s_t.addEventListener(Event.CLOSE, function (e:Event):void { - puts("Tor: closed connection."); + ui.puts("Tor: closed connection."); close(); }); s_t.addEventListener(IOErrorEvent.IO_ERROR, function (e:IOErrorEvent):void { - puts("Tor: I/O error: " + e.text + "."); + ui.puts("Tor: I/O error: " + e.text + "."); close(); }); s_t.addEventListener(SecurityErrorEvent.SECURITY_ERROR, function (e:SecurityErrorEvent):void { - puts("Tor: security error: " + e.text + "."); + ui.puts("Tor: security error: " + e.text + "."); close(); }); + s_t.addEventListener(ProgressEvent.SOCKET_DATA, tor_to_proxy); }
- private function puts(s:String):void + private function tor_to_proxy(e:ProgressEvent):void + { + t2p_schedule.push(e.bytesLoaded); + flush(); + } + + private function proxy_to_tor(e:ProgressEvent):void { - parent.puts(s); + p2t_schedule.push(e.bytesLoaded); + flush(); }
+ /* Send as much data as the rate limit currently allows. */ + private function flush():void + { + if (flush_id) + clearTimeout(flush_id); + flush_id = undefined; + + if (!(s_r.connected && s_t.connected)) + /* Can't do anything until both sockets are connected. */ + return; + + while (!rate_limit.is_limited() && (p2t_schedule.length > 0 || t2p_schedule.length > 0)) { + var numBytes:uint; + var bytes:ByteArray; + + if (p2t_schedule.length > 0) { + numBytes = p2t_schedule.shift(); + bytes = new ByteArray(); + s_r.readBytes(bytes, 0, numBytes); + ui.puts("ProxyPair: RTMFP: read " + bytes.length + " bytes."); + s_t.writeBytes(bytes); + rate_limit.update(numBytes); + } + if (t2p_schedule.length > 0) { + numBytes = t2p_schedule.shift(); + bytes = new ByteArray(); + s_t.readBytes(bytes, 0, numBytes); + ui.puts("ProxyPair: Tor: read " + bytes.length + " bytes."); + s_r.writeBytes(bytes); + rate_limit.update(numBytes); + } + } + + /* Call again when safe, if necessary. */ + if (p2t_schedule.length > 0 || t2p_schedule.length > 0) + flush_id = setTimeout(flush, rate_limit.when() * 1000); + } + } +} + +import flash.utils.getTimer; + +class RateLimit +{ + public function RateLimit() + { + } + + public function update(n:Number):Boolean + { + return true; + } + + public function when():Number + { + return 0.0; + } + + public function is_limited():Boolean + { + return false; + } +} + +class RateUnlimit extends RateLimit +{ + public function RateUnlimit() + { + } + + public override function update(n:Number):Boolean + { + return true; + } + + public override function when():Number + { + return 0.0; + } + + public override function is_limited():Boolean + { + return false; + } +} + +class BucketRateLimit extends RateLimit +{ + private var amount:Number; + private var capacity:Number; + private var time:Number; + private var last_update:uint; + + public function BucketRateLimit(capacity:Number, time:Number) + { + this.amount = 0.0; + /* capacity / time is the rate we are aiming for. 
*/ + this.capacity = capacity; + this.time = time; + this.last_update = getTimer(); + } + + private function age():void + { + var now:uint; + var delta:Number; + + now = getTimer(); + delta = (now - last_update) / 1000.0; + last_update = now; + + amount -= delta * capacity / time; + if (amount < 0.0) + amount = 0.0; + } + + public override function update(n:Number):Boolean + { + age(); + amount += n; + + return amount <= capacity; + } + + public override function when():Number + { + age(); + return (amount - capacity) / (capacity / time); + } + + public override function is_limited():Boolean + { + age(); + return amount > capacity; } } \ No newline at end of file diff --git a/rtmfp/RTMFPSocketClient.as b/rtmfp/RTMFPSocketClient.as index 7469a53..b917bcb 100644 --- a/rtmfp/RTMFPSocketClient.as +++ b/rtmfp/RTMFPSocketClient.as @@ -28,9 +28,8 @@ package rtmfp
public function data_available(bytes:ByteArray):void { - this._bytes.clear(); - bytes.readBytes(this._bytes); - dispatchEvent(new ProgressEvent(ProgressEvent.SOCKET_DATA, false, false, this._bytes.bytesAvailable, this._bytes.length)); + bytes.readBytes(_bytes, _bytes.length, 0); + dispatchEvent(new ProgressEvent(ProgressEvent.SOCKET_DATA, false, false, _bytes.bytesAvailable, _bytes.bytesAvailable)); }
public function get connect_acknowledged():Boolean diff --git a/rtmfpcat.as b/rtmfpcat.as index 25099dd..2273bc9 100644 --- a/rtmfpcat.as +++ b/rtmfpcat.as @@ -6,8 +6,7 @@ package import flash.text.TextField; import flash.text.TextFormat; import flash.events.Event; - import flash.utils.clearInterval; - import flash.utils.setInterval; + import flash.utils.setTimeout;
import rtmfp.CirrusSocket; import rtmfp.FacilitatorSocket; @@ -39,9 +38,8 @@ package port: 9001 };
- /* Poll facilitator every 3 sec if in proxy mode and haven't found - anyone to proxy */ - private const DEFAULT_FAC_POLL_INTERVAL:uint = 3000; + /* Milliseconds to wait before retrying the facilitator after a failed connect, a failed registration, or an empty registration list (10 sec) */ + private const DEFAULT_FAC_POLL_INTERVAL:uint = 10000;
// Socket to Cirrus server private var s_c:CirrusSocket; @@ -54,8 +52,6 @@ package
private var proxy_mode:Boolean;
- private var fac_poll_interval:uint; - /* TextField for debug output. */ private var output_text:TextField;
@@ -80,6 +76,12 @@ package // Wait until the query string parameters are loaded. this.loaderInfo.addEventListener(Event.COMPLETE, loaderinfo_complete); } + + public function puts(s:String):void + { + output_text.appendText(s + "\n"); + output_text.scrollV = output_text.maxScrollV; + }
private function loaderinfo_complete(e:Event):void { @@ -133,11 +135,7 @@ package s_c = new CirrusSocket(); s_c.addEventListener(CirrusSocketEvent.CONNECT_SUCCESS, function (e:CirrusSocketEvent):void { puts("Cirrus: connected with id " + s_c.id + "."); - if (proxy_mode) { - fac_poll_interval = setInterval(establish_facilitator_connection, DEFAULT_FAC_POLL_INTERVAL); - } else { - establish_facilitator_connection(); - } + establish_facilitator_connection(); }); s_c.addEventListener(CirrusSocketEvent.CONNECT_FAILED, function (e:CirrusSocketEvent):void { puts("Error: failed to connect to Cirrus."); @@ -180,6 +178,7 @@ package }); s_f.addEventListener(FacilitatorSocketEvent.CONNECT_FAILED, function (e:Event):void { puts("Facilitator: connect failed."); + setTimeout(establish_facilitator_connection, DEFAULT_FAC_POLL_INTERVAL); }); s_f.addEventListener(FacilitatorSocketEvent.CONNECT_CLOSED, function (e:Event):void { puts("Facilitator: connect closed."); @@ -188,16 +187,17 @@ package if (proxy_mode) { s_f.addEventListener(FacilitatorSocketEvent.REGISTRATION_RECEIVED, function (e:FacilitatorSocketEvent):void { puts("Facilitator: got registration " + e.client); - clearInterval(fac_poll_interval); start_proxy_pair(); s_c.send_hello(e.client); }); s_f.addEventListener(FacilitatorSocketEvent.REGISTRATIONS_EMPTY, function (e:Event):void { puts("Facilitator: no registrations available."); + setTimeout(establish_facilitator_connection, DEFAULT_FAC_POLL_INTERVAL); }); } else { s_f.addEventListener(FacilitatorSocketEvent.REGISTRATION_FAILED, function (e:Event):void { puts("Facilitator: registration failed."); + setTimeout(establish_facilitator_connection, DEFAULT_FAC_POLL_INTERVAL); }); } s_f.connect(fac_addr.host, fac_addr.port); @@ -213,11 +213,7 @@ package p_p.addEventListener(Event.CLOSE, function (e:Event):void { puts("ProxyPair: connection closed."); p_p = null; - if (proxy_mode) { - fac_poll_interval = setInterval(establish_facilitator_connection, DEFAULT_FAC_POLL_INTERVAL); 
- } else { - establish_facilitator_connection(); - } + establish_facilitator_connection(); }); p_p.listen(s_c.local_stream_name); } @@ -238,11 +234,5 @@ package
return addr; } - - public function puts(s:String):void - { - output_text.appendText(s + "\n"); - output_text.scrollV = output_text.maxScrollV; - } } }