commit f77bd570a94428678cedbcf0899d11b569b599c4 Author: ellitron <jdellit@stanford.edu> Date: Fri May 27 09:54:55 2011 -0700
Appended unique data stream name to registration to prevent connection aliasing --- return_of_the_rtmfpcat.as | 87 ++++++++++++++++++++++++++++++-------------- 1 files changed, 59 insertions(+), 28 deletions(-)
diff --git a/return_of_the_rtmfpcat.as b/return_of_the_rtmfpcat.as index f1842e4..a1d14cc 100644 --- a/return_of_the_rtmfpcat.as +++ b/return_of_the_rtmfpcat.as @@ -55,11 +55,13 @@ package private static const DEFAULT_CIRCON_TIMEOUT:uint = 4000;
/* Maximum connections. */ - private const MAXIMUM_RCP_PAIRS:uint = 1; + private const DEFAULT_MAXIMUM_RCP_PAIRS:uint = 1;
/* Milliseconds. */ private const FACILITATOR_POLL_INTERVAL:int = 10000;
+ private var max_rcp_pairs; + /* TextField for debug output. */ private var output_text:TextField;
@@ -89,6 +91,8 @@ package /* Number of connected RTMFPConnectionPairs. */ private var rcp_pairs:uint;
+ private var rtmfp_data_counter:uint; + /* Keep track of facilitator polling timer. */ private var fac_poll_timeo_id:uint;
@@ -115,6 +119,9 @@ package /* Initialize connection pair count. */ rcp_pairs = 0;
+ /* Unique counter for RTMFP data publishing. */ + rtmfp_data_counter = 0; + puts("Meow!"); puts("Starting."); // Wait until the query string parameters are loaded. @@ -168,6 +175,11 @@ package else cir_key = DEFAULT_CIRRUS_KEY;
+ if(this.loaderInfo.parameters["max_con"]) + max_rcp_pairs = this.loaderInfo.parameters["max_con"]; + else + max_rcp_pairs = DEFAULT_MAXIMUM_RCP_PAIRS; + main(); }
@@ -202,11 +214,12 @@ package var rcp:RTMFPConnectionPair = new RTMFPConnectionPair(circon, tor_addr, output_text); rcp.addEventListener(Event.CONNECT, rcp_connect_event); rcp.addEventListener(Event.CLOSE, rcp_close_event); - rcp.listen(); + rcp.listen(String(rtmfp_data_counter));
- puts("Registering with facilitator"); - /* Register ID with facilitator. */ - register_id(circon.nearID, fac_addr); + var reg_str:String = circon.nearID + ":" + String(rtmfp_data_counter); + puts("Registering " + reg_str + " with facilitator"); + register_id(reg_str, fac_addr); + rtmfp_data_counter++; }
break; @@ -229,19 +242,25 @@ package puts("Facilitator: I/O error: " + e.text + "."); }); s_f.addEventListener(ProgressEvent.SOCKET_DATA, function (e:ProgressEvent):void { - var clientID:String = s_f.readMultiByte(e.bytesLoaded, "utf-8"); - puts("Facilitator: got "" + clientID + """); - if (clientID != "Registration list empty") { - puts("Connecting to " + clientID + "."); + var client:String = s_f.readMultiByte(e.bytesLoaded, "utf-8"); + puts("Facilitator: got "" + client + """); + if (client != "Registration list empty") { + puts("Connecting to " + client + "."); + + var client_id:String = client.split(":")[0]; + var client_data:String = client.split(":")[1]; + + clearTimeout(fac_poll_timeo_id); + var rcp:RTMFPConnectionPair = new RTMFPConnectionPair(circon, tor_addr, output_text); rcp.addEventListener(Event.CONNECT, rcp_connect_event); rcp.addEventListener(Event.CLOSE, rcp_close_event); - rcp.connect(clientID); + rcp.connect(client_id, client_data); } else { /* Need to clear any outstanding timers to ensure * that only one timer ever runs. */ clearTimeout(fac_poll_timeo_id); - if(rcp_pairs < MAXIMUM_RCP_PAIRS) + if(rcp_pairs < max_rcp_pairs) fac_poll_timeo_id = setTimeout(poll_for_id, FACILITATOR_POLL_INTERVAL); } }); @@ -260,17 +279,21 @@ package rcp_pairs++;
if(proxy_mode) { - if(rcp_pairs < MAXIMUM_RCP_PAIRS) { + if(rcp_pairs < max_rcp_pairs) { poll_for_id(); } } else { /* Setup listening RTMFPConnectionPair. */ - if(rcp_pairs < MAXIMUM_RCP_PAIRS) { + if(rcp_pairs < max_rcp_pairs) { puts("Setting up listening RTMFPConnectionPair"); var rcp:RTMFPConnectionPair = new RTMFPConnectionPair(circon, tor_addr, output_text); rcp.addEventListener(Event.CONNECT, rcp_connect_event); rcp.addEventListener(Event.CLOSE, rcp_close_event); - rcp.listen(); + rcp.listen(String(rtmfp_data_counter)); + var reg_str:String = circon.nearID + ":" + String(rtmfp_data_counter); + puts("Registering " + reg_str + " with facilitator"); + register_id(reg_str, fac_addr); + rtmfp_data_counter++; } } } @@ -284,16 +307,20 @@ package /* FIXME: Do I need to unregister the event listeners so * that the system can garbage collect the rcp object? */ if(proxy_mode) { - if(rcp_pairs < MAXIMUM_RCP_PAIRS) { + if(rcp_pairs < max_rcp_pairs) { poll_for_id(); } } else { - if(rcp_pairs < MAXIMUM_RCP_PAIRS) { + if(rcp_pairs < max_rcp_pairs) { puts("Setting up listening RTMFPConnectionPair"); var rcp:RTMFPConnectionPair = new RTMFPConnectionPair(circon, tor_addr, output_text); rcp.addEventListener(Event.CONNECT, rcp_connect_event); rcp.addEventListener(Event.CLOSE, rcp_close_event); - rcp.listen(); + rcp.listen(String(rtmfp_data_counter)); + var reg_str:String = circon.nearID + ":" + String(rtmfp_data_counter); + puts("Registering " + reg_str + " with facilitator"); + register_id(reg_str, fac_addr); + rtmfp_data_counter++; } } } @@ -362,7 +389,7 @@ import flash.text.TextField; class RTMFPSocket extends EventDispatcher { /* The name of the "media" to pass between peers. */ - private static const DATA:String = "data"; + private var data:String;
/* Connection to the Cirrus rendezvous service. * RTMFPSocket is established using this service. */ @@ -399,19 +426,23 @@ class RTMFPSocket extends EventDispatcher }); }
- public function listen():void + public function listen(data:String):void { + this.data = data; + send_stream = new NetStream(circon, NetStream.DIRECT_CONNECTIONS); var client:Object = new Object(); client.onPeerConnect = send_stream_peer_connect; send_stream.client = client; - send_stream.publish(DATA); + send_stream.publish(data); }
- public function connect(clientID:String):void + public function connect(clientID:String, data:String):void { puts("RTMFPSocket: connecting to peer...");
+ this.data = data; + send_stream = new NetStream(circon, NetStream.DIRECT_CONNECTIONS); var client:Object = new Object(); client.onPeerConnect = function (peer:NetStream):Boolean { @@ -422,7 +453,7 @@ class RTMFPSocket extends EventDispatcher return true; }; send_stream.client = client; - send_stream.publish(DATA); + send_stream.publish(data);
recv_stream = new NetStream(circon, clientID); var client_rtmfp:RTMFPSocketClient = new RTMFPSocketClient(); @@ -434,7 +465,7 @@ class RTMFPSocket extends EventDispatcher }, false, 0, true);
recv_stream.client = client_rtmfp; - recv_stream.play(DATA); + recv_stream.play(data); }
private function send_stream_peer_connect(peer:NetStream):Boolean @@ -451,7 +482,7 @@ class RTMFPSocket extends EventDispatcher dispatchEvent(new Event(Event.CONNECT)); }, false, 0, true); recv_stream.client = client; - recv_stream.play(DATA); + recv_stream.play(data);
peer.send("setPeerConnectAcknowledged");
@@ -574,20 +605,20 @@ class RTMFPConnectionPair extends EventDispatcher this.output_text = output_text; }
- public function connect(clientID:String):void + public function connect(clientID:String, rtmfp_data:String):void { s_r = new RTMFPSocket(circon, output_text); s_r.addEventListener(Event.CONNECT, rtmfp_connect_event); s_r.addEventListener(Event.CLOSE, rtmfp_close_event); - s_r.connect(clientID); + s_r.connect(clientID, rtmfp_data); }
- public function listen():void + public function listen(rtmfp_data:String):void { s_r = new RTMFPSocket(circon, output_text); s_r.addEventListener(Event.CONNECT, rtmfp_connect_event); s_r.addEventListener(Event.CLOSE, rtmfp_close_event); - s_r.listen(); + s_r.listen(rtmfp_data); }
private function rtmfp_connect_event(e:Event):void