[tor-commits] [flashproxy/master] Adding in David's rate limiting code to rtmfpcat

dcf at torproject.org dcf at torproject.org
Sun Jun 12 08:56:29 UTC 2011


commit ab4e90626b6f6a7152c79b2d2aeeae601deb9d64
Author: Nate Hardison <nate at rescomp-09-154551.stanford.edu>
Date:   Thu Jun 2 13:23:08 2011 -0700

    Adding in David's rate limiting code to rtmfpcat
---
 rtmfp/ProxyPair.as         |  218 +++++++++++++++++++++++++++++++++++++-------
 rtmfp/RTMFPSocketClient.as |    5 +-
 rtmfpcat.as                |   38 +++-----
 3 files changed, 202 insertions(+), 59 deletions(-)

diff --git a/rtmfp/ProxyPair.as b/rtmfp/ProxyPair.as
index 42d4b81..fb184cc 100644
--- a/rtmfp/ProxyPair.as
+++ b/rtmfp/ProxyPair.as
@@ -7,6 +7,8 @@ package rtmfp
     import flash.events.SecurityErrorEvent;
     import flash.net.Socket;
     import flash.utils.ByteArray;
+    import flash.utils.clearTimeout;
+    import flash.utils.setTimeout;
     
     import rtmfp.CirrusSocket;
     import rtmfp.RTMFPSocket;
@@ -14,20 +16,41 @@ package rtmfp
     
     public class ProxyPair extends EventDispatcher
     {   
-        private var parent:rtmfpcat;
+        private var ui:rtmfpcat;
 
         private var s_r:RTMFPSocket;
         private var s_t:Socket;
         
         private var tor_host:String;
         private var tor_port:uint;
+        
+        private var p2t_schedule:Array;
+        private var t2p_schedule:Array;
+        
+        // Bytes per second. Set to 0 (or NaN) to disable the limit; a Number cannot hold undefined.
+        public const RATE_LIMIT:Number = 10000;
+        // Seconds.
+        private const RATE_LIMIT_HISTORY:Number = 5.0;
+        
+        private var rate_limit:RateLimit;
+        
+        // Id of the pending setTimeout() call that will re-run flush(), if any.
+        private var flush_id:uint;
 
-        public function ProxyPair(parent:rtmfpcat, s_c:CirrusSocket, tor_host:String, tor_port:uint)
+        public function ProxyPair(ui:rtmfpcat, s_c:CirrusSocket, tor_host:String, tor_port:uint)
         {
-            this.parent = parent;
+            this.ui = ui;
             this.tor_host = tor_host;
             this.tor_port = tor_port;
             
+            this.p2t_schedule = new Array();
+            this.t2p_schedule = new Array();
+            
+            if (RATE_LIMIT)
+                rate_limit = new BucketRateLimit(RATE_LIMIT * RATE_LIMIT_HISTORY, RATE_LIMIT_HISTORY);
+            else
+                rate_limit = new RateUnlimit();
+            
             setup_rtmfp_socket(s_c);
             setup_tor_socket();
         }
@@ -61,65 +84,196 @@ package rtmfp
         private function setup_rtmfp_socket(s_c:CirrusSocket):void
         {
             s_r = new RTMFPSocket(s_c);
-            s_r.addEventListener(RTMFPSocketEvent.PLAY_STARTED, function (e:RTMFPSocketEvent):void {
-                puts("Play started.");
+            s_r.addEventListener(RTMFPSocketEvent.CONNECT_FAILED, function (e:RTMFPSocketEvent):void {
+                ui.puts("Peering failed.");
             });
-            s_r.addEventListener(RTMFPSocketEvent.PUBLISH_STARTED, function (e:RTMFPSocketEvent):void {
-                puts("Publishing started.");
+            s_r.addEventListener(RTMFPSocketEvent.CONNECT_SUCCESS, function (e:RTMFPSocketEvent):void {
+                ui.puts("Peering success.");
+                s_t.connect(tor_host, tor_port);
             });
             s_r.addEventListener(RTMFPSocketEvent.PEER_CONNECTED, function (e:RTMFPSocketEvent):void {
-                puts("Peer connected.");
+                ui.puts("Peer connected.");
             });
             s_r.addEventListener(RTMFPSocketEvent.PEER_DISCONNECTED, function (e:RTMFPSocketEvent):void {
-                puts("Peer disconnected.");
+                ui.puts("Peer disconnected.");
                 close();
             });
-            s_r.addEventListener(RTMFPSocketEvent.CONNECT_SUCCESS, function (e:RTMFPSocketEvent):void {
-                puts("Peering success.");
-                s_t.connect(tor_host, tor_port);
+            s_r.addEventListener(RTMFPSocketEvent.PLAY_STARTED, function (e:RTMFPSocketEvent):void {
+                ui.puts("Play started.");
             });
-            s_r.addEventListener(RTMFPSocketEvent.CONNECT_FAILED, function (e:RTMFPSocketEvent):void {
-                puts("Peering failed.");
+            s_r.addEventListener(RTMFPSocketEvent.PUBLISH_STARTED, function (e:RTMFPSocketEvent):void {
+                ui.puts("Publishing started.");
             });
+            s_r.addEventListener(ProgressEvent.SOCKET_DATA, proxy_to_tor);
         }
         
         private function setup_tor_socket():void
         {
             s_t = new Socket();
             s_t.addEventListener(Event.CONNECT, function (e:Event):void {
-                puts("Tor: connected to " + tor_host + ":" + tor_port + ".");
-                s_t.addEventListener(ProgressEvent.SOCKET_DATA, function (e:ProgressEvent):void {
-                    var bytes:ByteArray = new ByteArray();
-                    s_t.readBytes(bytes, 0, e.bytesLoaded);
-                    puts("RTMFPSocket: Tor: read " + bytes.length + " bytes.");
-                    s_r.writeBytes(bytes);
-                });
-                s_r.addEventListener(ProgressEvent.SOCKET_DATA, function (e:ProgressEvent):void {
-                    var bytes:ByteArray = new ByteArray();
-                    s_r.readBytes(bytes, 0, e.bytesLoaded);
-                    puts("RTMFPSocket: RTMFP: read " + bytes.length + " bytes.");
-                    s_t.writeBytes(bytes);
-                });
+                ui.puts("Tor: connected to " + tor_host + ":" + tor_port + ".");
                 dispatchEvent(new Event(Event.CONNECT));
             });
             s_t.addEventListener(Event.CLOSE, function (e:Event):void {
-                puts("Tor: closed connection.");
+                ui.puts("Tor: closed connection.");
                 close();
             });
             s_t.addEventListener(IOErrorEvent.IO_ERROR, function (e:IOErrorEvent):void {
-                puts("Tor: I/O error: " + e.text + ".");
+                ui.puts("Tor: I/O error: " + e.text + ".");
                 close();
             });
             s_t.addEventListener(SecurityErrorEvent.SECURITY_ERROR, function (e:SecurityErrorEvent):void {
-                puts("Tor: security error: " + e.text + ".");
+                ui.puts("Tor: security error: " + e.text + ".");
                 close();
             });
+            s_t.addEventListener(ProgressEvent.SOCKET_DATA, tor_to_proxy);
         }
         
-        private function puts(s:String):void
+        private function tor_to_proxy(e:ProgressEvent):void
+        {
+            t2p_schedule.push(e.bytesLoaded);
+            flush();
+        }
+
+        private function proxy_to_tor(e:ProgressEvent):void
         {
-            parent.puts(s);
+            p2t_schedule.push(e.bytesLoaded);
+            flush();
         }
         
+        /* Send as much data as the rate limit currently allows. */
+        private function flush():void
+        {
+            if (flush_id)
+                clearTimeout(flush_id);
+            flush_id = undefined;
+
+            if (!(s_r.connected && s_t.connected))
+                /* Can't do anything until both sockets are connected. */
+                return;
+
+            while (!rate_limit.is_limited() && (p2t_schedule.length > 0 || t2p_schedule.length > 0)) {
+                var numBytes:uint;
+                var bytes:ByteArray;
+                
+                if (p2t_schedule.length > 0) {
+                    numBytes = p2t_schedule.shift();
+                    bytes = new ByteArray();
+                    s_r.readBytes(bytes, 0, numBytes);
+                    ui.puts("ProxyPair: RTMFP: read " + bytes.length + " bytes.");
+                    s_t.writeBytes(bytes);
+                    rate_limit.update(numBytes);
+                }
+                if (t2p_schedule.length > 0) {
+                    numBytes = t2p_schedule.shift();
+                    bytes = new ByteArray();
+                    s_t.readBytes(bytes, 0, numBytes);
+                    ui.puts("ProxyPair: Tor: read " + bytes.length + " bytes.");
+                    s_r.writeBytes(bytes);
+                    rate_limit.update(numBytes);
+                }
+            }
+
+            /* Call again when safe, if necessary. */
+            if (p2t_schedule.length > 0 || t2p_schedule.length > 0)
+                flush_id = setTimeout(flush, rate_limit.when() * 1000);
+        }
+    }
+}
+
+import flash.utils.getTimer;
+
+class RateLimit
+{
+    public function RateLimit()
+    {
+    }
+
+    public function update(n:Number):Boolean
+    {
+        return true;
+    }
+
+    public function when():Number
+    {
+        return 0.0;
+    }
+
+    public function is_limited():Boolean
+    {
+        return false;
+    }
+}
+
+class RateUnlimit extends RateLimit
+{
+    public function RateUnlimit()
+    {
+    }
+
+    public override function update(n:Number):Boolean
+    {
+        return true;
+    }
+
+    public override function when():Number
+    {
+        return 0.0;
+    }
+
+    public override function is_limited():Boolean
+    {
+        return false;
+    }
+}
+
+class BucketRateLimit extends RateLimit
+{
+    private var amount:Number;
+    private var capacity:Number;
+    private var time:Number;
+    private var last_update:uint;
+
+    public function BucketRateLimit(capacity:Number, time:Number)
+    {
+        this.amount = 0.0;
+        /* capacity / time is the rate we are aiming for. */
+        this.capacity = capacity;
+        this.time = time;
+        this.last_update = getTimer();
+    }
+
+    private function age():void
+    {
+        var now:uint;
+        var delta:Number;
+
+        now = getTimer();
+        delta = (now - last_update) / 1000.0;
+        last_update = now;
+
+        amount -= delta * capacity / time;
+        if (amount < 0.0)
+            amount = 0.0;
+    }
+
+    public override function update(n:Number):Boolean
+    {
+        age();
+        amount += n;
+
+        return amount <= capacity;
+    }
+
+    public override function when():Number
+    {
+        age();
+        return (amount - capacity) / (capacity / time);
+    }
+
+    public override function is_limited():Boolean
+    {
+        age();
+        return amount > capacity;
     }
 }
\ No newline at end of file
diff --git a/rtmfp/RTMFPSocketClient.as b/rtmfp/RTMFPSocketClient.as
index 7469a53..b917bcb 100644
--- a/rtmfp/RTMFPSocketClient.as
+++ b/rtmfp/RTMFPSocketClient.as
@@ -28,9 +28,8 @@ package rtmfp
 
         public function data_available(bytes:ByteArray):void
         {
-            this._bytes.clear();
-            bytes.readBytes(this._bytes);
-            dispatchEvent(new ProgressEvent(ProgressEvent.SOCKET_DATA, false, false, this._bytes.bytesAvailable, this._bytes.length));
+            bytes.readBytes(_bytes, _bytes.length, 0);
+            dispatchEvent(new ProgressEvent(ProgressEvent.SOCKET_DATA, false, false, _bytes.bytesAvailable, _bytes.bytesAvailable));
         }
 
         public function get connect_acknowledged():Boolean
diff --git a/rtmfpcat.as b/rtmfpcat.as
index 25099dd..2273bc9 100644
--- a/rtmfpcat.as
+++ b/rtmfpcat.as
@@ -6,8 +6,7 @@ package
     import flash.text.TextField;
     import flash.text.TextFormat;
     import flash.events.Event;
-    import flash.utils.clearInterval;
-    import flash.utils.setInterval;
+    import flash.utils.setTimeout;
 
     import rtmfp.CirrusSocket;
     import rtmfp.FacilitatorSocket;
@@ -39,9 +38,8 @@ package
             port: 9001
         };
         
-        /* Poll facilitator every 3 sec if in proxy mode and haven't found
-           anyone to proxy */
-        private const DEFAULT_FAC_POLL_INTERVAL:uint = 3000;
+        /* Poll facilitator every 10 sec */
+        private const DEFAULT_FAC_POLL_INTERVAL:uint = 10000;
 
         // Socket to Cirrus server
         private var s_c:CirrusSocket;
@@ -54,8 +52,6 @@ package
 
         private var proxy_mode:Boolean;
 
-        private var fac_poll_interval:uint;
-
         /* TextField for debug output. */
         private var output_text:TextField;
 
@@ -80,6 +76,12 @@ package
             // Wait until the query string parameters are loaded.
             this.loaderInfo.addEventListener(Event.COMPLETE, loaderinfo_complete);
         }
+        
+        public function puts(s:String):void
+        {
+            output_text.appendText(s + "\n");
+            output_text.scrollV = output_text.maxScrollV;
+        }
 
         private function loaderinfo_complete(e:Event):void
         {
@@ -133,11 +135,7 @@ package
             s_c = new CirrusSocket();
             s_c.addEventListener(CirrusSocketEvent.CONNECT_SUCCESS, function (e:CirrusSocketEvent):void {
                 puts("Cirrus: connected with id " + s_c.id + ".");
-                if (proxy_mode) {
-                    fac_poll_interval = setInterval(establish_facilitator_connection, DEFAULT_FAC_POLL_INTERVAL);
-                } else {
-                    establish_facilitator_connection();
-                }
+                establish_facilitator_connection();
             });
             s_c.addEventListener(CirrusSocketEvent.CONNECT_FAILED, function (e:CirrusSocketEvent):void {
                 puts("Error: failed to connect to Cirrus.");
@@ -180,6 +178,7 @@ package
             });
             s_f.addEventListener(FacilitatorSocketEvent.CONNECT_FAILED, function (e:Event):void {
                 puts("Facilitator: connect failed.");
+                setTimeout(establish_facilitator_connection, DEFAULT_FAC_POLL_INTERVAL);
             });
             s_f.addEventListener(FacilitatorSocketEvent.CONNECT_CLOSED, function (e:Event):void {
                 puts("Facilitator: connect closed.");
@@ -188,16 +187,17 @@ package
             if (proxy_mode) {
                 s_f.addEventListener(FacilitatorSocketEvent.REGISTRATION_RECEIVED, function (e:FacilitatorSocketEvent):void {
                     puts("Facilitator: got registration " + e.client);
-                    clearInterval(fac_poll_interval);
                     start_proxy_pair();
                     s_c.send_hello(e.client);
                 });
                 s_f.addEventListener(FacilitatorSocketEvent.REGISTRATIONS_EMPTY, function (e:Event):void {
                     puts("Facilitator: no registrations available.");
+                    setTimeout(establish_facilitator_connection, DEFAULT_FAC_POLL_INTERVAL);
                 });
             } else {
                 s_f.addEventListener(FacilitatorSocketEvent.REGISTRATION_FAILED, function (e:Event):void {
                     puts("Facilitator: registration failed.");
+                    setTimeout(establish_facilitator_connection, DEFAULT_FAC_POLL_INTERVAL);
                 });
             }
             s_f.connect(fac_addr.host, fac_addr.port);
@@ -213,11 +213,7 @@ package
             p_p.addEventListener(Event.CLOSE, function (e:Event):void {
                 puts("ProxyPair: connection closed.");
                 p_p = null;
-                if (proxy_mode) {
-                    fac_poll_interval = setInterval(establish_facilitator_connection, DEFAULT_FAC_POLL_INTERVAL);
-                } else {
-                    establish_facilitator_connection();
-                }
+                establish_facilitator_connection();
             });
             p_p.listen(s_c.local_stream_name);
         }
@@ -238,11 +234,5 @@ package
 
             return addr;
         }
-        
-        public function puts(s:String):void
-        {
-            output_text.appendText(s + "\n");
-            output_text.scrollV = output_text.maxScrollV;
-        }
     }
 }





More information about the tor-commits mailing list