commit 57c267849ce64c15a597211c07072414baf2e267 Author: ellitron jdellit@stanford.edu Date: Thu May 26 18:39:32 2011 -0700
completed proxy side of the code, untested --- return_of_the_rtmfpcat.as | 112 +++++++++++++++++++++++++++++++++++++++++--- 1 files changed, 104 insertions(+), 8 deletions(-)
diff --git a/return_of_the_rtmfpcat.as b/return_of_the_rtmfpcat.as index 3958c69..ed582b8 100644 --- a/return_of_the_rtmfpcat.as +++ b/return_of_the_rtmfpcat.as @@ -59,6 +59,9 @@ package /* Maximum connections. */ private const MAXIMUM_RCP_PAIRS:uint = 1;
+ /* Milliseconds. */ + private const FACILITATOR_POLL_INTERVAL:int = 10000; + /* TextField for debug output. */ private var output_text:TextField;
@@ -88,6 +91,9 @@ package /* Number of connected RTMFPConnectionPairs. */ private var rcp_pairs:uint;
+ /* Keep track of facilitator polling timer. */ + private var fac_poll_timeo_id:uint; + /* Put a string to the screen. */ public function puts(s:String):void { @@ -192,7 +198,7 @@ package clearInterval(circon_timeo_id);
if(proxy_mode) { - + poll_for_id(); } else { puts("Setting up listening RTMFPConnectionPair"); var rcp:RTMFPConnectionPair = new RTMFPConnectionPair(circon, tor_addr, output_text); @@ -209,6 +215,43 @@ package } }
+ private function poll_for_id():void + { + var s_f:Socket = new Socket(); + s_f.addEventListener(Event.CONNECT, function (e:Event):void { + puts("Facilitator: connected to " + fac_addr.host + ":" + fac_addr.port + "."); + s_f.writeUTFBytes("GET / HTTP/1.0\r\n\r\n"); + }); + s_f.addEventListener(Event.CLOSE, function (e:Event):void { + puts("Facilitator: connection closed."); + }); + s_f.addEventListener(IOErrorEvent.IO_ERROR, function (e:IOErrorEvent):void { + puts("Facilitator: I/O error: " + e.text + "."); + }); + s_f.addEventListener(ProgressEvent.SOCKET_DATA, function (e:ProgressEvent):void { + var clientID:String = s_f.readMultiByte(e.bytesLoaded, "utf-8"); + puts("Facilitator: got "" + clientID + """); + if (clientID != "Registration list empty") { + puts("Connecting to " + clientID + "."); + var rcp:RTMFPConnectionPair = new RTMFPConnectionPair(circon, tor_addr, output_text); + rcp.addEventListener(Event.CONNECT, rcp_connect_event); + rcp.addEventListener(Event.CLOSE, rcp_close_event); + rcp.connect(clientID); + } else { + /* Need to clear any outstanding timers to ensure + * that only one timer ever runs. */ + clearTimeout(fac_poll_timeo_id); + fac_poll_timeo_id = setTimeout(poll_for_id, FACILITATOR_POLL_INTERVAL); + } + }); + s_f.addEventListener(SecurityErrorEvent.SECURITY_ERROR, function (e:SecurityErrorEvent):void { + puts("Facilitator: security error: " + e.text + "."); + }); + + s_f.connect(fac_addr.host, fac_addr.port); + + } + private function rcp_connect_event(e:Event):void { puts("RTMFPConnectionPair connected"); @@ -216,7 +259,9 @@ package rcp_pairs++;
if(proxy_mode) { - + if(rcp_pairs < MAXIMUM_RCP_PAIRS) { + poll_for_id(); + } } else { /* Setup listening RTMFPConnectionPair. */ if(rcp_pairs < MAXIMUM_RCP_PAIRS) { @@ -237,6 +282,19 @@ package
/* FIXME: Do I need to unregister the event listeners so * that the system can garbage collect the rcp object? */ + if(proxy_mode) { + if(rcp_pairs < MAXIMUM_RCP_PAIRS) { + poll_for_id(); + } + } else { + if(rcp_pairs < MAXIMUM_RCP_PAIRS) { + puts("Setting up listening RTMFPConnectionPair"); + var rcp:RTMFPConnectionPair = new RTMFPConnectionPair(circon, tor_addr, output_text); + rcp.addEventListener(Event.CONNECT, rcp_connect_event); + rcp.addEventListener(Event.CLOSE, rcp_close_event); + rcp.listen(); + } + } }
private function register_id(id:String, fac_addr:Object):void @@ -349,6 +407,30 @@ class RTMFPSocket extends EventDispatcher send_stream.publish(DATA); }
+ public function connect(clientID:String):void + { + puts("RTMFPSocket: connecting to peer..."); + + send_stream = new NetStream(circon, NetStream.DIRECT_CONNECTIONS); + var client:Object = new Object(); + client.onPeerConnect = function (peer:NetStream):Boolean { + puts("RTMFPSocket: connected to peer"); + connected = true; + dispatchEvent(new Event(Event.CONNECT)); + return true; + }; + send_stream.client = client; + send_stream.publish(DATA); + + recv_stream = new NetStream(circon, clientID); + var client:RTMFPSocketClient = new RTMFPSocketClient(); + client.addEventListener(ProgressEvent.SOCKET_DATA, function (event:ProgressEvent):void { + dispatchEvent(event); + }, false, 0, true); + recv_stream.client = client; + recv_stream.play(DATA); + } + private function send_stream_peer_connect(peer:NetStream):Boolean { puts("RTMFPSocket: peer connecting..."); @@ -483,6 +565,14 @@ class RTMFPConnectionPair extends EventDispatcher this.output_text = output_text; }
+ public function connect(clientID:String):void + { + s_r = new RTMFPSocket(circon, output_text); + s_r.addEventListener(Event.CONNECT, rtmfp_connect_event); + s_r.addEventListener(Event.CLOSE, rtmfp_close_event); + s_r.connect(clientID); + } + public function listen():void { s_r = new RTMFPSocket(circon, output_text); @@ -500,6 +590,18 @@ class RTMFPConnectionPair extends EventDispatcher s_t = new Socket(); s_t.addEventListener(Event.CONNECT, function (e:Event):void { puts("RTMFPConnectionPair: Tor: connected to " + tor_addr.host + ":" + tor_addr.port + "."); + s_r.addEventListener(ProgressEvent.SOCKET_DATA, function (e:ProgressEvent):void { + var bytes:ByteArray = new ByteArray(); + s_r.readBytes(bytes, 0, e.bytesLoaded); + puts("RTMFPConnectionPair: RTMFP: read " + bytes.length + " bytes."); + s_t.writeBytes(bytes); + }); + s_t.addEventListener(ProgressEvent.SOCKET_DATA, function (e:ProgressEvent):void { + var bytes:ByteArray = new ByteArray(); + s_t.readBytes(bytes, 0, e.bytesLoaded); + puts("RTMFPConnectionPair: Tor: read " + bytes.length + " bytes."); + s_r.writeBytes(bytes); + }); dispatchEvent(new Event(Event.CONNECT)); }); s_t.addEventListener(Event.CLOSE, function (e:Event):void { @@ -513,12 +615,6 @@ class RTMFPConnectionPair extends EventDispatcher s_t.addEventListener(IOErrorEvent.IO_ERROR, function (e:IOErrorEvent):void { puts("RTMFPConnectionPair: Tor: I/O error: " + e.text + "."); }); - s_t.addEventListener(ProgressEvent.SOCKET_DATA, function (e:ProgressEvent):void { - var bytes:ByteArray = new ByteArray(); - s_t.readBytes(bytes, 0, e.bytesLoaded); - puts("RTMFPConnectionPair: Tor: read " + bytes.length + " bytes."); - s_r.writeBytes(bytes); - }); s_t.addEventListener(SecurityErrorEvent.SECURITY_ERROR, function (e:SecurityErrorEvent):void { puts("RTMFPConnectionPair: Tor: security error: " + e.text + "."); });