commit ab4e90626b6f6a7152c79b2d2aeeae601deb9d64
Author: Nate Hardison <nate(a)rescomp-09-154551.stanford.edu>
Date: Thu Jun 2 13:23:08 2011 -0700
Adding in David's rate limiting code to rtmfpcat
---
rtmfp/ProxyPair.as | 218 +++++++++++++++++++++++++++++++++++++-------
rtmfp/RTMFPSocketClient.as | 5 +-
rtmfpcat.as | 38 +++-----
3 files changed, 202 insertions(+), 59 deletions(-)
diff --git a/rtmfp/ProxyPair.as b/rtmfp/ProxyPair.as
index 42d4b81..fb184cc 100644
--- a/rtmfp/ProxyPair.as
+++ b/rtmfp/ProxyPair.as
@@ -7,6 +7,8 @@ package rtmfp
import flash.events.SecurityErrorEvent;
import flash.net.Socket;
import flash.utils.ByteArray;
+ import flash.utils.clearTimeout;
+ import flash.utils.setTimeout;
import rtmfp.CirrusSocket;
import rtmfp.RTMFPSocket;
@@ -14,20 +16,41 @@ package rtmfp
public class ProxyPair extends EventDispatcher
{
- private var parent:rtmfpcat;
+ private var ui:rtmfpcat;
private var s_r:RTMFPSocket;
private var s_t:Socket;
private var tor_host:String;
private var tor_port:uint;
+
+ private var p2t_schedule:Array;
+ private var t2p_schedule:Array;
+
+ // Bytes per second. Set to 0 to disable the limit (any falsy value, e.g. NaN, fails the "if (RATE_LIMIT)" test).
+ public const RATE_LIMIT:Number = 10000;
+ // Seconds of traffic at RATE_LIMIT that the bucket can hold before limiting kicks in.
+ private const RATE_LIMIT_HISTORY:Number = 5.0;
+
+ private var rate_limit:RateLimit;
+
+ // Id of the pending flush() timeout returned by setTimeout; falsy (0) when none is scheduled.
+ private var flush_id:uint;
- public function ProxyPair(parent:rtmfpcat, s_c:CirrusSocket, tor_host:String, tor_port:uint)
+ public function ProxyPair(ui:rtmfpcat, s_c:CirrusSocket, tor_host:String, tor_port:uint)
{
- this.parent = parent;
+ this.ui = ui;
this.tor_host = tor_host;
this.tor_port = tor_port;
+ this.p2t_schedule = new Array();
+ this.t2p_schedule = new Array();
+
+ if (RATE_LIMIT)
+ rate_limit = new BucketRateLimit(RATE_LIMIT * RATE_LIMIT_HISTORY, RATE_LIMIT_HISTORY);
+ else
+ rate_limit = new RateUnlimit();
+
setup_rtmfp_socket(s_c);
setup_tor_socket();
}
@@ -61,65 +84,196 @@ package rtmfp
private function setup_rtmfp_socket(s_c:CirrusSocket):void
{
s_r = new RTMFPSocket(s_c);
- s_r.addEventListener(RTMFPSocketEvent.PLAY_STARTED, function (e:RTMFPSocketEvent):void {
- puts("Play started.");
+ s_r.addEventListener(RTMFPSocketEvent.CONNECT_FAILED, function (e:RTMFPSocketEvent):void {
+ ui.puts("Peering failed.");
});
- s_r.addEventListener(RTMFPSocketEvent.PUBLISH_STARTED, function (e:RTMFPSocketEvent):void {
- puts("Publishing started.");
+ s_r.addEventListener(RTMFPSocketEvent.CONNECT_SUCCESS, function (e:RTMFPSocketEvent):void {
+ ui.puts("Peering success.");
+ s_t.connect(tor_host, tor_port);
});
s_r.addEventListener(RTMFPSocketEvent.PEER_CONNECTED, function (e:RTMFPSocketEvent):void {
- puts("Peer connected.");
+ ui.puts("Peer connected.");
});
s_r.addEventListener(RTMFPSocketEvent.PEER_DISCONNECTED, function (e:RTMFPSocketEvent):void {
- puts("Peer disconnected.");
+ ui.puts("Peer disconnected.");
close();
});
- s_r.addEventListener(RTMFPSocketEvent.CONNECT_SUCCESS, function (e:RTMFPSocketEvent):void {
- puts("Peering success.");
- s_t.connect(tor_host, tor_port);
+ s_r.addEventListener(RTMFPSocketEvent.PLAY_STARTED, function (e:RTMFPSocketEvent):void {
+ ui.puts("Play started.");
});
- s_r.addEventListener(RTMFPSocketEvent.CONNECT_FAILED, function (e:RTMFPSocketEvent):void {
- puts("Peering failed.");
+ s_r.addEventListener(RTMFPSocketEvent.PUBLISH_STARTED, function (e:RTMFPSocketEvent):void {
+ ui.puts("Publishing started.");
});
+ s_r.addEventListener(ProgressEvent.SOCKET_DATA, proxy_to_tor);
}
private function setup_tor_socket():void
{
s_t = new Socket();
s_t.addEventListener(Event.CONNECT, function (e:Event):void {
- puts("Tor: connected to " + tor_host + ":" + tor_port + ".");
- s_t.addEventListener(ProgressEvent.SOCKET_DATA, function (e:ProgressEvent):void {
- var bytes:ByteArray = new ByteArray();
- s_t.readBytes(bytes, 0, e.bytesLoaded);
- puts("RTMFPSocket: Tor: read " + bytes.length + " bytes.");
- s_r.writeBytes(bytes);
- });
- s_r.addEventListener(ProgressEvent.SOCKET_DATA, function (e:ProgressEvent):void {
- var bytes:ByteArray = new ByteArray();
- s_r.readBytes(bytes, 0, e.bytesLoaded);
- puts("RTMFPSocket: RTMFP: read " + bytes.length + " bytes.");
- s_t.writeBytes(bytes);
- });
+ ui.puts("Tor: connected to " + tor_host + ":" + tor_port + ".");
dispatchEvent(new Event(Event.CONNECT));
});
s_t.addEventListener(Event.CLOSE, function (e:Event):void {
- puts("Tor: closed connection.");
+ ui.puts("Tor: closed connection.");
close();
});
s_t.addEventListener(IOErrorEvent.IO_ERROR, function (e:IOErrorEvent):void {
- puts("Tor: I/O error: " + e.text + ".");
+ ui.puts("Tor: I/O error: " + e.text + ".");
close();
});
s_t.addEventListener(SecurityErrorEvent.SECURITY_ERROR, function (e:SecurityErrorEvent):void {
- puts("Tor: security error: " + e.text + ".");
+ ui.puts("Tor: security error: " + e.text + ".");
close();
});
+ s_t.addEventListener(ProgressEvent.SOCKET_DATA, tor_to_proxy);
}
- private function puts(s:String):void
+ private function tor_to_proxy(e:ProgressEvent):void
+ {
+ t2p_schedule.push(e.bytesLoaded);
+ flush();
+ }
+
+ private function proxy_to_tor(e:ProgressEvent):void
{
- parent.puts(s);
+ p2t_schedule.push(e.bytesLoaded);
+ flush();
}
+ /* Send as much data as the rate limit currently allows. */
+ private function flush():void
+ {
+ if (flush_id)
+ clearTimeout(flush_id);
+ flush_id = undefined;
+
+ if (!(s_r.connected && s_t.connected))
+ /* Can't do anything until both sockets are connected. */
+ return;
+
+ while (!rate_limit.is_limited() && (p2t_schedule.length > 0 || t2p_schedule.length > 0)) {
+ var numBytes:uint;
+ var bytes:ByteArray;
+
+ if (p2t_schedule.length > 0) {
+ numBytes = p2t_schedule.shift();
+ bytes = new ByteArray();
+ s_r.readBytes(bytes, 0, numBytes);
+ ui.puts("ProxyPair: RTMFP: read " + bytes.length + " bytes.");
+ s_t.writeBytes(bytes);
+ rate_limit.update(numBytes);
+ }
+ if (t2p_schedule.length > 0) {
+ numBytes = t2p_schedule.shift();
+ bytes = new ByteArray();
+ s_t.readBytes(bytes, 0, numBytes);
+ ui.puts("ProxyPair: Tor: read " + bytes.length + " bytes.");
+ s_r.writeBytes(bytes);
+ rate_limit.update(numBytes);
+ }
+ }
+
+ /* Call again when safe, if necessary. */
+ if (p2t_schedule.length > 0 || t2p_schedule.length > 0)
+ flush_id = setTimeout(flush, rate_limit.when() * 1000);
+ }
+ }
+}
+
+import flash.utils.getTimer;
+
+class RateLimit
+{
+ public function RateLimit()
+ {
+ }
+
+ public function update(n:Number):Boolean
+ {
+ return true;
+ }
+
+ public function when():Number
+ {
+ return 0.0;
+ }
+
+ public function is_limited():Boolean
+ {
+ return false;
+ }
+}
+
+class RateUnlimit extends RateLimit
+{
+ public function RateUnlimit()
+ {
+ }
+
+ public override function update(n:Number):Boolean
+ {
+ return true;
+ }
+
+ public override function when():Number
+ {
+ return 0.0;
+ }
+
+ public override function is_limited():Boolean
+ {
+ return false;
+ }
+}
+
+class BucketRateLimit extends RateLimit
+{
+ private var amount:Number;
+ private var capacity:Number;
+ private var time:Number;
+ private var last_update:uint;
+
+ public function BucketRateLimit(capacity:Number, time:Number)
+ {
+ this.amount = 0.0;
+ /* capacity / time is the rate we are aiming for. */
+ this.capacity = capacity;
+ this.time = time;
+ this.last_update = getTimer();
+ }
+
+ private function age():void
+ {
+ var now:uint;
+ var delta:Number;
+
+ now = getTimer();
+ delta = (now - last_update) / 1000.0;
+ last_update = now;
+
+ amount -= delta * capacity / time;
+ if (amount < 0.0)
+ amount = 0.0;
+ }
+
+ public override function update(n:Number):Boolean
+ {
+ age();
+ amount += n;
+
+ return amount <= capacity;
+ }
+
+ public override function when():Number
+ {
+ age();
+ return (amount - capacity) / (capacity / time);
+ }
+
+ public override function is_limited():Boolean
+ {
+ age();
+ return amount > capacity;
}
}
\ No newline at end of file
diff --git a/rtmfp/RTMFPSocketClient.as b/rtmfp/RTMFPSocketClient.as
index 7469a53..b917bcb 100644
--- a/rtmfp/RTMFPSocketClient.as
+++ b/rtmfp/RTMFPSocketClient.as
@@ -28,9 +28,8 @@ package rtmfp
public function data_available(bytes:ByteArray):void
{
- this._bytes.clear();
- bytes.readBytes(this._bytes);
- dispatchEvent(new ProgressEvent(ProgressEvent.SOCKET_DATA, false, false, this._bytes.bytesAvailable, this._bytes.length));
+ bytes.readBytes(_bytes, _bytes.length, 0);
+ dispatchEvent(new ProgressEvent(ProgressEvent.SOCKET_DATA, false, false, _bytes.bytesAvailable, _bytes.bytesAvailable));
}
public function get connect_acknowledged():Boolean
diff --git a/rtmfpcat.as b/rtmfpcat.as
index 25099dd..2273bc9 100644
--- a/rtmfpcat.as
+++ b/rtmfpcat.as
@@ -6,8 +6,7 @@ package
import flash.text.TextField;
import flash.text.TextFormat;
import flash.events.Event;
- import flash.utils.clearInterval;
- import flash.utils.setInterval;
+ import flash.utils.setTimeout;
import rtmfp.CirrusSocket;
import rtmfp.FacilitatorSocket;
@@ -39,9 +38,8 @@ package
port: 9001
};
- /* Poll facilitator every 3 sec if in proxy mode and haven't found
- anyone to proxy */
- private const DEFAULT_FAC_POLL_INTERVAL:uint = 3000;
+ /* Delay in ms (10 sec) before retrying the facilitator after a failure or an empty registration list */
+ private const DEFAULT_FAC_POLL_INTERVAL:uint = 10000;
// Socket to Cirrus server
private var s_c:CirrusSocket;
@@ -54,8 +52,6 @@ package
private var proxy_mode:Boolean;
- private var fac_poll_interval:uint;
-
/* TextField for debug output. */
private var output_text:TextField;
@@ -80,6 +76,12 @@ package
// Wait until the query string parameters are loaded.
this.loaderInfo.addEventListener(Event.COMPLETE, loaderinfo_complete);
}
+
+ public function puts(s:String):void
+ {
+ output_text.appendText(s + "\n");
+ output_text.scrollV = output_text.maxScrollV;
+ }
private function loaderinfo_complete(e:Event):void
{
@@ -133,11 +135,7 @@ package
s_c = new CirrusSocket();
s_c.addEventListener(CirrusSocketEvent.CONNECT_SUCCESS, function (e:CirrusSocketEvent):void {
puts("Cirrus: connected with id " + s_c.id + ".");
- if (proxy_mode) {
- fac_poll_interval = setInterval(establish_facilitator_connection, DEFAULT_FAC_POLL_INTERVAL);
- } else {
- establish_facilitator_connection();
- }
+ establish_facilitator_connection();
});
s_c.addEventListener(CirrusSocketEvent.CONNECT_FAILED, function (e:CirrusSocketEvent):void {
puts("Error: failed to connect to Cirrus.");
@@ -180,6 +178,7 @@ package
});
s_f.addEventListener(FacilitatorSocketEvent.CONNECT_FAILED, function (e:Event):void {
puts("Facilitator: connect failed.");
+ setTimeout(establish_facilitator_connection, DEFAULT_FAC_POLL_INTERVAL);
});
s_f.addEventListener(FacilitatorSocketEvent.CONNECT_CLOSED, function (e:Event):void {
puts("Facilitator: connect closed.");
@@ -188,16 +187,17 @@ package
if (proxy_mode) {
s_f.addEventListener(FacilitatorSocketEvent.REGISTRATION_RECEIVED, function (e:FacilitatorSocketEvent):void {
puts("Facilitator: got registration " + e.client);
- clearInterval(fac_poll_interval);
start_proxy_pair();
s_c.send_hello(e.client);
});
s_f.addEventListener(FacilitatorSocketEvent.REGISTRATIONS_EMPTY, function (e:Event):void {
puts("Facilitator: no registrations available.");
+ setTimeout(establish_facilitator_connection, DEFAULT_FAC_POLL_INTERVAL);
});
} else {
s_f.addEventListener(FacilitatorSocketEvent.REGISTRATION_FAILED, function (e:Event):void {
puts("Facilitator: registration failed.");
+ setTimeout(establish_facilitator_connection, DEFAULT_FAC_POLL_INTERVAL);
});
}
s_f.connect(fac_addr.host, fac_addr.port);
@@ -213,11 +213,7 @@ package
p_p.addEventListener(Event.CLOSE, function (e:Event):void {
puts("ProxyPair: connection closed.");
p_p = null;
- if (proxy_mode) {
- fac_poll_interval = setInterval(establish_facilitator_connection, DEFAULT_FAC_POLL_INTERVAL);
- } else {
- establish_facilitator_connection();
- }
+ establish_facilitator_connection();
});
p_p.listen(s_c.local_stream_name);
}
@@ -238,11 +234,5 @@ package
return addr;
}
-
- public function puts(s:String):void
- {
- output_text.appendText(s + "\n");
- output_text.scrollV = output_text.maxScrollV;
- }
}
}