commit 9cef884c60fa5c4cea7eb1fa092c9fb53a8a1550 Author: David Fifield <david@bamsoftware.com> Date: Sat Jun 11 21:41:41 2011 -0700
Rewrite ProxyPair to be more generic. --- ProxyPair.as | 204 +++++++++++++++++++++++++++------------------------------- 1 files changed, 95 insertions(+), 109 deletions(-)
diff --git a/ProxyPair.as b/ProxyPair.as index fa51933..ca9f092 100644 --- a/ProxyPair.as +++ b/ProxyPair.as @@ -1,6 +1,5 @@ package { - import flash.errors.IllegalOperationError; import flash.events.Event; import flash.events.EventDispatcher; import flash.events.IOErrorEvent; @@ -11,119 +10,122 @@ package import flash.utils.ByteArray; import flash.utils.clearTimeout; import flash.utils.setTimeout; - - import swfcat; - + + /* An instance of a client-relay connection. */ public class ProxyPair extends EventDispatcher { + // Socket to client. + private var s_c:*; + private var connect_c:Function; + + // Socket to relay. + private var s_r:*; + private var connect_r:Function; + + // Parent swfcat, for UI updates and rate meter. private var ui:swfcat; - - protected var client_addr:Object; - - /* Not defined here: subclasses should define their own - * protected var client_socket:Object; - */ - - private var c2r_schedule:Array; - - private var relay_addr:Object; - private var relay_socket:Socket; + + // Pending byte read counts for relay and client sockets. private var r2c_schedule:Array; - + private var c2r_schedule:Array; // Callback id. private var flush_id:uint; - - public function ProxyPair(self:ProxyPair, ui:swfcat) - { - if (self != this) { - //only a subclass can pass a valid reference to self - throw new IllegalOperationError("ProxyPair cannot be instantiated directly."); - } - - this.ui = ui; - this.c2r_schedule = new Array(); - this.r2c_schedule = new Array(); - - setup_relay_socket(); - - /* client_socket setup should be taken */ - /* care of in the subclass constructor */ - } - - public function close():void - { - if (relay_socket != null && relay_socket.connected) { - relay_socket.close(); - } - - /* subclasses should override to close */ - /* their client_socket according to impl. 
*/ - } - - public function get connected():Boolean - { - return (relay_socket != null && relay_socket.connected); - - /* subclasses should override to check */ - /* connectivity of their client_socket. */ - } - - public function set client(client_addr:Object):void + + public function log(msg:String):void { - /* subclasses should override to */ - /* connect the client_socket here */ + ui.puts(id() + ": " + msg) } - - public function set relay(relay_addr:Object):void + + // String describing this pair for output. + public function id():String { - this.relay_addr = relay_addr; - log("Relay: connecting to " + relay_addr.host + ":" + relay_addr.port + "."); - relay_socket.connect(relay_addr.host, relay_addr.port); + return "<>"; } - - protected function transfer_bytes(src:Object, dst:Object, num_bytes:uint):void + + public function ProxyPair(ui:swfcat, s_c:*, connect_c:Function, s_r:*, connect_r:Function) { - /* No-op: must be overridden by subclasses */ + this.ui = ui; + /* s_c is a socket for connecting to the client. connect_c is a + function that, when called, connects s_c. Likewise for s_r and + connect_r. */ + this.s_c = s_c; + this.connect_c = connect_c; + this.s_r = s_r; + this.connect_r = connect_r; + + this.c2r_schedule = []; + this.r2c_schedule = []; }
- protected function socket_error(message:String):Function + /* Return a function that shows an error message and closes the other half + of a communication pair. */ + private function socket_error(message:String, other:*):Function { return function(e:Event):void { if (e is TextEvent) log(message + ": " + (e as TextEvent).text + "."); else log(message + "."); - close(); + if (other && other.connected) + other.close(); + dispatchEvent(new Event(Event.COMPLETE)); }; } - - private function setup_relay_socket():void + + public function connect():void + { + s_r.addEventListener(Event.CONNECT, relay_connected); + s_r.addEventListener(Event.CLOSE, socket_error("Relay: closed", s_c)); + s_r.addEventListener(IOErrorEvent.IO_ERROR, socket_error("Relay: I/O error", s_c)); + s_r.addEventListener(SecurityErrorEvent.SECURITY_ERROR, socket_error("Relay: security error", s_c)); + s_r.addEventListener(ProgressEvent.SOCKET_DATA, relay_to_client); + + s_c.addEventListener(Event.CONNECT, client_connected); + s_c.addEventListener(Event.CLOSE, socket_error("Client: closed", s_r)); + s_c.addEventListener(IOErrorEvent.IO_ERROR, socket_error("Client: I/O error", s_r)); + s_c.addEventListener(SecurityErrorEvent.SECURITY_ERROR, socket_error("Client: security error", s_r)); + s_c.addEventListener(ProgressEvent.SOCKET_DATA, client_to_relay); + + log("Relay: connecting."); + connect_r(); + log("Client: connecting."); + connect_c(); + } + + private function relay_connected(e:Event):void { - relay_socket = new Socket(); - relay_socket.addEventListener(Event.CONNECT, function (e:Event):void { - log("Relay: connected to " + relay_addr.host + ":" + relay_addr.port + "."); - if (connected) { - dispatchEvent(new Event(Event.CONNECT)); - } - }); - relay_socket.addEventListener(Event.CLOSE, socket_error("Relay: closed")); - relay_socket.addEventListener(IOErrorEvent.IO_ERROR, socket_error("Relay: I/O error")) - relay_socket.addEventListener(SecurityErrorEvent.SECURITY_ERROR, socket_error("Relay: 
security error")) - relay_socket.addEventListener(ProgressEvent.SOCKET_DATA, relay_to_client); + log("Relay: connected."); } - - protected function client_to_relay(e:ProgressEvent):void + + private function client_connected(e:Event):void { - c2r_schedule.push(e.bytesLoaded); - flush(); + log("Client: connected."); } - + private function relay_to_client(e:ProgressEvent):void { r2c_schedule.push(e.bytesLoaded); flush(); } - + + private function client_to_relay(e:ProgressEvent):void + { + c2r_schedule.push(e.bytesLoaded); + flush(); + } + + private function transfer_chunk(s_from:*, s_to:*, n:uint, + label:String):void + { + var bytes:ByteArray; + + bytes = new ByteArray(); + s_from.readBytes(bytes, 0, n); + s_to.writeBytes(bytes); + ui.rate_limit.update(n); + log(label + ": read " + bytes.length + "."); + } + /* Send as much data as the rate limit currently allows. */ private function flush():void { @@ -131,37 +133,21 @@ package clearTimeout(flush_id); flush_id = undefined;
- if (!connected) - /* Can't do anything until connected. */ + if (!(s_r.connected && s_c.connected)) + /* Can't do anything until both sockets are connected. */ return;
- while (!ui.rate_limit.is_limited() && (c2r_schedule.length > 0 || r2c_schedule.length > 0)) { - var num_bytes:uint; - - if (c2r_schedule.length > 0) { - num_bytes = c2r_schedule.shift(); - transfer_bytes(null, relay_socket, num_bytes); - ui.rate_limit.update(num_bytes); - } - - if (r2c_schedule.length > 0) { - num_bytes = r2c_schedule.shift(); - transfer_bytes(relay_socket, null, num_bytes); - ui.rate_limit.update(num_bytes); - } + while (!ui.rate_limit.is_limited() && + (r2c_schedule.length > 0 || c2r_schedule.length > 0)) { + if (r2c_schedule.length > 0) + transfer_chunk(s_r, s_c, r2c_schedule.shift(), "Relay"); + if (c2r_schedule.length > 0) + transfer_chunk(s_c, s_r, c2r_schedule.shift(), "Client"); }
/* Call again when safe, if necessary. */ - if (c2r_schedule.length > 0 || r2c_schedule.length > 0) + if (r2c_schedule.length > 0 || c2r_schedule.length > 0) flush_id = setTimeout(flush, ui.rate_limit.when() * 1000); } - - /* Helper function to write output to the - * swfcat console. Set as protected for - * subclasses */ - protected function log(s:String):void - { - ui.puts(s); - } } }