commit f77bd570a94428678cedbcf0899d11b569b599c4
Author: ellitron <jdellit(a)stanford.edu>
Date: Fri May 27 09:54:55 2011 -0700
Appended unique data stream name to registration to prevent connection aliasing
---
return_of_the_rtmfpcat.as | 87 ++++++++++++++++++++++++++++++--------------
1 files changed, 59 insertions(+), 28 deletions(-)
diff --git a/return_of_the_rtmfpcat.as b/return_of_the_rtmfpcat.as
index f1842e4..a1d14cc 100644
--- a/return_of_the_rtmfpcat.as
+++ b/return_of_the_rtmfpcat.as
@@ -55,11 +55,13 @@ package
private static const DEFAULT_CIRCON_TIMEOUT:uint = 4000;
/* Maximum connections. */
- private const MAXIMUM_RCP_PAIRS:uint = 1;
+ private const DEFAULT_MAXIMUM_RCP_PAIRS:uint = 1;
/* Milliseconds. */
private const FACILITATOR_POLL_INTERVAL:int = 10000;
+ private var max_rcp_pairs;
+
/* TextField for debug output. */
private var output_text:TextField;
@@ -89,6 +91,8 @@ package
/* Number of connected RTMFPConnectionPairs. */
private var rcp_pairs:uint;
+ private var rtmfp_data_counter:uint;
+
/* Keep track of facilitator polling timer. */
private var fac_poll_timeo_id:uint;
@@ -115,6 +119,9 @@ package
/* Initialize connection pair count. */
rcp_pairs = 0;
+ /* Unique counter for RTMFP data publishing. */
+ rtmfp_data_counter = 0;
+
puts("Meow!");
puts("Starting.");
// Wait until the query string parameters are loaded.
@@ -168,6 +175,11 @@ package
else
cir_key = DEFAULT_CIRRUS_KEY;
+ if(this.loaderInfo.parameters["max_con"])
+ max_rcp_pairs = this.loaderInfo.parameters["max_con"];
+ else
+ max_rcp_pairs = DEFAULT_MAXIMUM_RCP_PAIRS;
+
main();
}
@@ -202,11 +214,12 @@ package
var rcp:RTMFPConnectionPair = new RTMFPConnectionPair(circon, tor_addr, output_text);
rcp.addEventListener(Event.CONNECT, rcp_connect_event);
rcp.addEventListener(Event.CLOSE, rcp_close_event);
- rcp.listen();
+ rcp.listen(String(rtmfp_data_counter));
- puts("Registering with facilitator");
- /* Register ID with facilitator. */
- register_id(circon.nearID, fac_addr);
+ var reg_str:String = circon.nearID + ":" + String(rtmfp_data_counter);
+ puts("Registering " + reg_str + " with facilitator");
+ register_id(reg_str, fac_addr);
+ rtmfp_data_counter++;
}
break;
@@ -229,19 +242,25 @@ package
puts("Facilitator: I/O error: " + e.text + ".");
});
s_f.addEventListener(ProgressEvent.SOCKET_DATA, function (e:ProgressEvent):void {
- var clientID:String = s_f.readMultiByte(e.bytesLoaded, "utf-8");
- puts("Facilitator: got \"" + clientID + "\"");
- if (clientID != "Registration list empty") {
- puts("Connecting to " + clientID + ".");
+ var client:String = s_f.readMultiByte(e.bytesLoaded, "utf-8");
+ puts("Facilitator: got \"" + client + "\"");
+ if (client != "Registration list empty") {
+ puts("Connecting to " + client + ".");
+
+ var client_id:String = client.split(":")[0];
+ var client_data:String = client.split(":")[1];
+
+ clearTimeout(fac_poll_timeo_id);
+
var rcp:RTMFPConnectionPair = new RTMFPConnectionPair(circon, tor_addr, output_text);
rcp.addEventListener(Event.CONNECT, rcp_connect_event);
rcp.addEventListener(Event.CLOSE, rcp_close_event);
- rcp.connect(clientID);
+ rcp.connect(client_id, client_data);
} else {
/* Need to clear any outstanding timers to ensure
* that only one timer ever runs. */
clearTimeout(fac_poll_timeo_id);
- if(rcp_pairs < MAXIMUM_RCP_PAIRS)
+ if(rcp_pairs < max_rcp_pairs)
fac_poll_timeo_id = setTimeout(poll_for_id, FACILITATOR_POLL_INTERVAL);
}
});
@@ -260,17 +279,21 @@ package
rcp_pairs++;
if(proxy_mode) {
- if(rcp_pairs < MAXIMUM_RCP_PAIRS) {
+ if(rcp_pairs < max_rcp_pairs) {
poll_for_id();
}
} else {
/* Setup listening RTMFPConnectionPair. */
- if(rcp_pairs < MAXIMUM_RCP_PAIRS) {
+ if(rcp_pairs < max_rcp_pairs) {
puts("Setting up listening RTMFPConnectionPair");
var rcp:RTMFPConnectionPair = new RTMFPConnectionPair(circon, tor_addr, output_text);
rcp.addEventListener(Event.CONNECT, rcp_connect_event);
rcp.addEventListener(Event.CLOSE, rcp_close_event);
- rcp.listen();
+ rcp.listen(String(rtmfp_data_counter));
+ var reg_str:String = circon.nearID + ":" + String(rtmfp_data_counter);
+ puts("Registering " + reg_str + " with facilitator");
+ register_id(reg_str, fac_addr);
+ rtmfp_data_counter++;
}
}
}
@@ -284,16 +307,20 @@ package
/* FIXME: Do I need to unregister the event listeners so
* that the system can garbage collect the rcp object? */
if(proxy_mode) {
- if(rcp_pairs < MAXIMUM_RCP_PAIRS) {
+ if(rcp_pairs < max_rcp_pairs) {
poll_for_id();
}
} else {
- if(rcp_pairs < MAXIMUM_RCP_PAIRS) {
+ if(rcp_pairs < max_rcp_pairs) {
puts("Setting up listening RTMFPConnectionPair");
var rcp:RTMFPConnectionPair = new RTMFPConnectionPair(circon, tor_addr, output_text);
rcp.addEventListener(Event.CONNECT, rcp_connect_event);
rcp.addEventListener(Event.CLOSE, rcp_close_event);
- rcp.listen();
+ rcp.listen(String(rtmfp_data_counter));
+ var reg_str:String = circon.nearID + ":" + String(rtmfp_data_counter);
+ puts("Registering " + reg_str + " with facilitator");
+ register_id(reg_str, fac_addr);
+ rtmfp_data_counter++;
}
}
}
@@ -362,7 +389,7 @@ import flash.text.TextField;
class RTMFPSocket extends EventDispatcher
{
/* The name of the "media" to pass between peers. */
- private static const DATA:String = "data";
+ private var data:String;
/* Connection to the Cirrus rendezvous service.
* RTMFPSocket is established using this service. */
@@ -399,19 +426,23 @@ class RTMFPSocket extends EventDispatcher
});
}
- public function listen():void
+ public function listen(data:String):void
{
+ this.data = data;
+
send_stream = new NetStream(circon, NetStream.DIRECT_CONNECTIONS);
var client:Object = new Object();
client.onPeerConnect = send_stream_peer_connect;
send_stream.client = client;
- send_stream.publish(DATA);
+ send_stream.publish(data);
}
- public function connect(clientID:String):void
+ public function connect(clientID:String, data:String):void
{
puts("RTMFPSocket: connecting to peer...");
+ this.data = data;
+
send_stream = new NetStream(circon, NetStream.DIRECT_CONNECTIONS);
var client:Object = new Object();
client.onPeerConnect = function (peer:NetStream):Boolean {
@@ -422,7 +453,7 @@ class RTMFPSocket extends EventDispatcher
return true;
};
send_stream.client = client;
- send_stream.publish(DATA);
+ send_stream.publish(data);
recv_stream = new NetStream(circon, clientID);
var client_rtmfp:RTMFPSocketClient = new RTMFPSocketClient();
@@ -434,7 +465,7 @@ class RTMFPSocket extends EventDispatcher
}, false, 0, true);
recv_stream.client = client_rtmfp;
- recv_stream.play(DATA);
+ recv_stream.play(data);
}
private function send_stream_peer_connect(peer:NetStream):Boolean
@@ -451,7 +482,7 @@ class RTMFPSocket extends EventDispatcher
dispatchEvent(new Event(Event.CONNECT));
}, false, 0, true);
recv_stream.client = client;
- recv_stream.play(DATA);
+ recv_stream.play(data);
peer.send("setPeerConnectAcknowledged");
@@ -574,20 +605,20 @@ class RTMFPConnectionPair extends EventDispatcher
this.output_text = output_text;
}
- public function connect(clientID:String):void
+ public function connect(clientID:String, rtmfp_data:String):void
{
s_r = new RTMFPSocket(circon, output_text);
s_r.addEventListener(Event.CONNECT, rtmfp_connect_event);
s_r.addEventListener(Event.CLOSE, rtmfp_close_event);
- s_r.connect(clientID);
+ s_r.connect(clientID, rtmfp_data);
}
- public function listen():void
+ public function listen(rtmfp_data:String):void
{
s_r = new RTMFPSocket(circon, output_text);
s_r.addEventListener(Event.CONNECT, rtmfp_connect_event);
s_r.addEventListener(Event.CLOSE, rtmfp_close_event);
- s_r.listen();
+ s_r.listen(rtmfp_data);
}
private function rtmfp_connect_event(e:Event):void