commit 7c4ca7bf575684c35773605e522ee118c1f8f0bf Author: Nate Hardison nate@rescomp-09-154551.stanford.edu Date: Thu Jun 2 03:45:18 2011 -0700
New version of rtmfpcat that works with reconnection --- Makefile | 2 +- rtmfp/CirrusSocket.as | 140 +++++++++++++ rtmfp/FacilitatorSocket.as | 115 +++++++++++ rtmfp/ProxyPair.as | 125 ++++++++++++ rtmfp/RTMFPSocket.as | 340 +++++++++++++++----------------- rtmfp/RTMFPSocketClient.as | 34 ++-- rtmfp/events/CirrusSocketEvent.as | 22 ++ rtmfp/events/FacilitatorSocketEvent.as | 22 ++ rtmfp/events/RTMFPSocketEvent.as | 13 +- rtmfpcat.as | 240 ++++++++++++----------- 10 files changed, 729 insertions(+), 324 deletions(-)
diff --git a/Makefile b/Makefile index bc1f8bf..3595385 100644 --- a/Makefile +++ b/Makefile @@ -1,6 +1,6 @@ MXMLC ?= mxmlc
-TARGETS = rtmfpcat.swf return_of_the_rtmfpcat.swf +TARGETS = rtmfpcat.swf
all: $(TARGETS)
diff --git a/rtmfp/CirrusSocket.as b/rtmfp/CirrusSocket.as new file mode 100644 index 0000000..d9996f2 --- /dev/null +++ b/rtmfp/CirrusSocket.as @@ -0,0 +1,140 @@ +/* CirrusSocket abstraction + * ------------------------ + * Manages the NetConnection portion of RTMFP and also handles + * the handshake between two Flash players to decide what their + * data stream names will be. + * + * TODO: consider using farNonce/nearNonce instead of sending bytes? + */ + +package rtmfp +{ + import flash.events.Event; + import flash.events.EventDispatcher; + import flash.events.IOErrorEvent; + import flash.events.NetStatusEvent; + import flash.events.ProgressEvent; + import flash.events.SecurityErrorEvent; + import flash.net.NetConnection; + import flash.utils.clearInterval; + import flash.utils.setInterval; + + import rtmfp.RTMFPSocket; + import rtmfp.events.CirrusSocketEvent; + import rtmfp.events.RTMFPSocketEvent; + + [Event(name=CirrusSocketEvent.CONNECT_CLOSED, type="com.flashproxy.rtmfp.events.CirrusSocketEvent")] + [Event(name=CirrusSocketEvent.CONNECT_FAILED, type="com.flashproxy.rtmfp.events.CirrusSocketEvent")] + [Event(name=CirrusSocketEvent.CONNECT_SUCCESS, type="com.flashproxy.rtmfp.events.CirrusSocketEvent")] + [Event(name=CirrusSocketEvent.HELLO_RECEIVED, type="com.flashproxy.rtmfp.events.CirrusSocketEvent")] + public class CirrusSocket extends EventDispatcher + { + private static const CONNECT_TIMEOUT:uint = 4000; // in milliseconds + + /* We'll append a unique number to the DATA_STREAM_PREFIX for each + new stream we create so that we have unique streams per player. */ + private static const DATA_STREAM_PREFIX:String = "DATA"; + private var data_stream_suffix:uint = 0; + + /* Connection to the Cirrus rendezvous service */ + public var connection:NetConnection; + + /* Timeouts */ + private var connect_timeout:int; + private var hello_timeout:int; + + public function CirrusSocket() + { + connection = new NetConnection(); + connection.addEventListener(NetStatusEvent.NET_STATUS, on_net_status_event); + connection.addEventListener(IOErrorEvent.IO_ERROR, on_io_error_event); + connection.addEventListener(SecurityErrorEvent.SECURITY_ERROR, on_security_error_event); + var client:Object = new Object(); + client.onRelay = on_hello; + connection.client = client; + } + + public function connect(addr:String, key:String):void + { + if (!this.connected) { + connect_timeout = setInterval(fail, CONNECT_TIMEOUT); + connection.connect(addr, key); + } else { + throw new Error("Cannot connect Cirrus socket: already connected."); + } + } + + public function close():void + { + if (this.connected) { + connection.close(); + } else { + throw new Error("Cannot close Cirrus socket: not connected."); + } + } + + public function get connected():Boolean + { + return (connection != null && connection.connected); + } + + public function get id():String + { + if (this.connected) { + return connection.nearID; + } + + return null; + } + + public function get local_stream_name():String + { + return DATA_STREAM_PREFIX + data_stream_suffix; + } + + /* Sends a hello message to the Flash player with Cirrus ID "id" + We use this new call protocol outlined here: + http://forums.adobe.com/thread/780788?tstart=0 */ + public function send_hello(id:String):void + { + if (this.connected) { + connection.call("relay", null, id, local_stream_name); + } else { + throw new Error("Cannot send hello: Cirrus socket not connected."); + } + } + +/*************************** PRIVATE HELPER FUNCTIONS *************************/ + + private function fail():void + { + clearInterval(connect_timeout); + dispatchEvent(new CirrusSocketEvent(CirrusSocketEvent.CONNECT_FAILED)); + } + + private function on_hello(peer:String, ...args):void + { + var stream:String = args[0]; + dispatchEvent(new CirrusSocketEvent(CirrusSocketEvent.HELLO_RECEIVED, peer, stream)); + data_stream_suffix++; + } + + private function on_io_error_event(event:IOErrorEvent):void + { + fail(); + } + + private function on_net_status_event(event:NetStatusEvent):void + { + if (event.info.code == "NetConnection.Connect.Success") { + clearInterval(connect_timeout); + dispatchEvent(new CirrusSocketEvent(CirrusSocketEvent.CONNECT_SUCCESS)); + } + } + + private function on_security_error_event(event:SecurityErrorEvent):void + { + fail(); + } + } +} diff --git a/rtmfp/FacilitatorSocket.as b/rtmfp/FacilitatorSocket.as new file mode 100644 index 0000000..e175290 --- /dev/null +++ b/rtmfp/FacilitatorSocket.as @@ -0,0 +1,115 @@ +package rtmfp +{ + import flash.net.Socket; + import flash.events.Event; + import flash.events.EventDispatcher; + import flash.events.IOErrorEvent; + import flash.events.NetStatusEvent; + import flash.events.ProgressEvent; + import flash.events.SecurityErrorEvent; + import flash.utils.clearInterval; + import flash.utils.setInterval; + + import rtmfp.events.FacilitatorSocketEvent; + + [Event(name=FacilitatorSocketEvent.CONNECT_CLOSED, type="com.flashproxy.rtmfp.events.FacilitatorSocketEvent")] + [Event(name=FacilitatorSocketEvent.CONNECT_FAILED, type="com.flashproxy.rtmfp.events.FacilitatorSocketEvent")] + [Event(name=FacilitatorSocketEvent.CONNECT_SUCCESS, type="com.flashproxy.rtmfp.events.FacilitatorSocketEvent")] + [Event(name=FacilitatorSocketEvent.REGISTRATION_FAILED, type="com.flashproxy.rtmfp.events.FacilitatorSocketEvent")] + [Event(name=FacilitatorSocketEvent.REGISTRATION_RECEIVED, type="com.flashproxy.rtmfp.events.FacilitatorSocketEvent")] + [Event(name=FacilitatorSocketEvent.REGISTRATIONS_EMPTY, type="com.flashproxy.rtmfp.events.FacilitatorSocketEvent")] + public class FacilitatorSocket extends EventDispatcher + { + private var socket:Socket; + private var connected:Boolean; + private var connection_timeout:uint; + + public function FacilitatorSocket() + { + socket = null; + connected = false; + } + + public function close():void + { + connected = false; + if (socket != null) { + socket.removeEventListener(Event.CONNECT, on_connect_event); + socket.removeEventListener(Event.CLOSE, on_close_event); + socket.removeEventListener(IOErrorEvent.IO_ERROR, on_io_error_event); + socket.removeEventListener(ProgressEvent.SOCKET_DATA, on_progress_event); + socket.removeEventListener(SecurityErrorEvent.SECURITY_ERROR, on_security_error_event); + if (connected) { + socket.close(); + } + } + } + + public function connect(host:String, port:uint):void + { + if (socket != null || connected) { + return; + } + + socket = new Socket(); + socket.addEventListener(Event.CONNECT, on_connect_event); + socket.addEventListener(Event.CLOSE, on_close_event); + socket.addEventListener(IOErrorEvent.IO_ERROR, on_io_error_event); + socket.addEventListener(ProgressEvent.SOCKET_DATA, on_progress_event); + socket.addEventListener(SecurityErrorEvent.SECURITY_ERROR, on_security_error_event); + socket.connect(host, port); + } + + public function get_registration():void + { + if (!connected) return; + socket.writeUTFBytes("GET / HTTP/1.0\r\n\r\n"); + } + + public function post_registration(registration_data:String):void + { + if (!connected) return; + socket.writeUTFBytes("POST / HTTP/1.0\r\n\r\nclient=" + registration_data + "\r\n"); + } + + private function fail():void + { + clearInterval(connection_timeout); + dispatchEvent(new FacilitatorSocketEvent(FacilitatorSocketEvent.CONNECT_FAILED)); + } + + private function on_close_event(event:Event):void + { + close(); + dispatchEvent(new FacilitatorSocketEvent(FacilitatorSocketEvent.CONNECT_CLOSED)); + } + + private function on_connect_event(event:Event):void + { + connected = true; + dispatchEvent(new FacilitatorSocketEvent(FacilitatorSocketEvent.CONNECT_SUCCESS)); + } + + private function on_io_error_event(event:IOErrorEvent):void + { + fail(); + } + + private function on_progress_event(event:ProgressEvent):void + { + var client_id:String = socket.readUTFBytes(event.bytesLoaded); + if (client_id == "Registration list empty") { + dispatchEvent(new FacilitatorSocketEvent(FacilitatorSocketEvent.REGISTRATIONS_EMPTY)); + } else { + dispatchEvent(new FacilitatorSocketEvent(FacilitatorSocketEvent.REGISTRATION_RECEIVED, client_id)); + } + } + + private function on_security_error_event(event:SecurityErrorEvent):void + { + fail(); + } + + + } +} \ No newline at end of file diff --git a/rtmfp/ProxyPair.as b/rtmfp/ProxyPair.as new file mode 100644 index 0000000..42d4b81 --- /dev/null +++ b/rtmfp/ProxyPair.as @@ -0,0 +1,125 @@ +package rtmfp +{ + import flash.events.Event; + import flash.events.EventDispatcher; + import flash.events.IOErrorEvent; + import flash.events.ProgressEvent; + import flash.events.SecurityErrorEvent; + import flash.net.Socket; + import flash.utils.ByteArray; + + import rtmfp.CirrusSocket; + import rtmfp.RTMFPSocket; + import rtmfp.events.RTMFPSocketEvent; + + public class ProxyPair extends EventDispatcher + { + private var parent:rtmfpcat; + + private var s_r:RTMFPSocket; + private var s_t:Socket; + + private var tor_host:String; + private var tor_port:uint; + + public function ProxyPair(parent:rtmfpcat, s_c:CirrusSocket, tor_host:String, tor_port:uint) + { + this.parent = parent; + this.tor_host = tor_host; + this.tor_port = tor_port; + + setup_rtmfp_socket(s_c); + setup_tor_socket(); + } + + public function close():void + { + if (s_r.connected) { + s_r.close(); + } + if (s_t.connected) { + s_t.close(); + } + dispatchEvent(new Event(Event.CLOSE)); + } + + public function connect(peer:String, stream:String):void + { + s_r.connect(peer, stream); + } + + public function get connected():Boolean + { + return (s_r.connected && s_t.connected); + } + + public function listen(stream:String):void + { + s_r.listen(stream); + } + + private function setup_rtmfp_socket(s_c:CirrusSocket):void + { + s_r = new RTMFPSocket(s_c); + s_r.addEventListener(RTMFPSocketEvent.PLAY_STARTED, function (e:RTMFPSocketEvent):void { + puts("Play started."); + }); + s_r.addEventListener(RTMFPSocketEvent.PUBLISH_STARTED, function (e:RTMFPSocketEvent):void { + puts("Publishing started."); + }); + s_r.addEventListener(RTMFPSocketEvent.PEER_CONNECTED, function (e:RTMFPSocketEvent):void { + puts("Peer connected."); + }); + s_r.addEventListener(RTMFPSocketEvent.PEER_DISCONNECTED, function (e:RTMFPSocketEvent):void { + puts("Peer disconnected."); + close(); + }); + s_r.addEventListener(RTMFPSocketEvent.CONNECT_SUCCESS, function (e:RTMFPSocketEvent):void { + puts("Peering success."); + s_t.connect(tor_host, tor_port); + }); + s_r.addEventListener(RTMFPSocketEvent.CONNECT_FAILED, function (e:RTMFPSocketEvent):void { + puts("Peering failed."); + }); + } + + private function setup_tor_socket():void + { + s_t = new Socket(); + s_t.addEventListener(Event.CONNECT, function (e:Event):void { + puts("Tor: connected to " + tor_host + ":" + tor_port + "."); + s_t.addEventListener(ProgressEvent.SOCKET_DATA, function (e:ProgressEvent):void { + var bytes:ByteArray = new ByteArray(); + s_t.readBytes(bytes, 0, e.bytesLoaded); + puts("RTMFPSocket: Tor: read " + bytes.length + " bytes."); + s_r.writeBytes(bytes); + }); + s_r.addEventListener(ProgressEvent.SOCKET_DATA, function (e:ProgressEvent):void { + var bytes:ByteArray = new ByteArray(); + s_r.readBytes(bytes, 0, e.bytesLoaded); + puts("RTMFPSocket: RTMFP: read " + bytes.length + " bytes."); + s_t.writeBytes(bytes); + }); + dispatchEvent(new Event(Event.CONNECT)); + }); + s_t.addEventListener(Event.CLOSE, function (e:Event):void { + puts("Tor: closed connection."); + close(); + }); + s_t.addEventListener(IOErrorEvent.IO_ERROR, function (e:IOErrorEvent):void { + puts("Tor: I/O error: " + e.text + "."); + close(); + }); + s_t.addEventListener(SecurityErrorEvent.SECURITY_ERROR, function (e:SecurityErrorEvent):void { + puts("Tor: security error: " + e.text + "."); + close(); + }); + } + + private function puts(s:String):void + { + parent.puts(s); + } + + } +} \ No newline at end of file diff --git a/rtmfp/RTMFPSocket.as b/rtmfp/RTMFPSocket.as index d524699..783a0e0 100644 --- a/rtmfp/RTMFPSocket.as +++ b/rtmfp/RTMFPSocket.as @@ -1,13 +1,3 @@ -/* RTMFPSocket abstraction - * Author: Nate Hardison, May 2011 - * - * This code is heavily based off of BelugaFile, an open-source - * Air file-transfer application written by Nicholas Bliyk. - * Website: http://www.belugafile.com/ - * Source: http://code.google.com/p/belugafile/ - * - */ - package rtmfp { import flash.events.Event; @@ -19,213 +9,205 @@ package rtmfp import flash.net.NetConnection; import flash.net.NetStream; import flash.utils.ByteArray; - import flash.utils.clearInterval; - import flash.utils.setInterval; + import flash.utils.clearTimeout; import flash.utils.setTimeout; - + + import rtmfp.CirrusSocket; import rtmfp.RTMFPSocketClient; + import rtmfp.events.CirrusSocketEvent; import rtmfp.events.RTMFPSocketEvent; - - [Event(name="connectSuccess", type="com.jscat.rtmfp.events.RTMFPSocketEvent")] - [Event(name="connectFail", type="com.jscat.rtmfp.events.RTMFPSocketEvent")] - [Event(name="publishStart", type="com.jscat.rtmfp.events.RTMFPSocketEvent")] - [Event(name="peerConnected", type="com.jscat.rtmfp.events.RTMFPSocketEvent")] - [Event(name="peeringSuccess", type="com.jscat.rtmfp.events.RTMFPSocketEvent")] - [Event(name="peeringFail", type="com.jscat.rtmfp.events.RTMFPSocketEvent")] - [Event(name="peerDisconnected", type="com.jscat.rtmfp.events.RTMFPSocketEvent")] + + [Event(name=RTMFPSocketEvent.CONNECT_FAILED, type="com.flashproxy.rtmfp.events.RTMFPSocketEvent")] + [Event(name=RTMFPSocketEvent.CONNECT_CLOSED, type="com.flashproxy.rtmfp.events.RTMFPSocketEvent")] + [Event(name=RTMFPSocketEvent.CONNECT_SUCCESS, type="com.flashproxy.rtmfp.events.RTMFPSocketEvent")] + [Event(name=RTMFPSocketEvent.PEER_CONNECTED, type="com.flashproxy.rtmfp.events.RTMFPSocketEvent")] + [Event(name=RTMFPSocketEvent.PEER_DISCONNECTED, type="com.flashproxy.rtmfp.events.RTMFPSocketEvent")] + [Event(name=RTMFPSocketEvent.PLAY_STARTED, type="com.flashproxy.rtmfp.events.RTMFPSocketEvent")] + [Event(name=RTMFPSocketEvent.PUBLISH_STARTED, type="com.flashproxy.rtmfp.events.RTMFPSocketEvent")] + [Event(name=RTMFPSocketEvent.PUBLISH_FAILED, type="com.flashproxy.rtmfp.events.RTMFPSocketEvent")] public class RTMFPSocket extends EventDispatcher - { - /* The name of the "media" to pass between peers */ - private static const DATA:String = "data"; - private static const DEFAULT_CIRRUS_ADDRESS:String = "rtmfp://p2p.rtmfp.net"; - private static const DEFAULT_CIRRUS_KEY:String = RTMFP::CIRRUS_KEY; - private static const DEFAULT_CONNECT_TIMEOUT:uint = 4000; - - /* Connection to the Cirrus rendezvous service */ - private var connection:NetConnection; - - /* ID of the peer to connect to */ - private var peerID:String; - - /* Data streams to be established with peer */ - private var sendStream:NetStream; - private var recvStream:NetStream; - - /* Timeouts */ - private var connectionTimeout:int; - private var peerConnectTimeout:uint; - - public function RTMFPSocket(){} - - public function connect(addr:String = DEFAULT_CIRRUS_ADDRESS, key:String = DEFAULT_CIRRUS_KEY):void - { - connection = new NetConnection(); - connection.addEventListener(NetStatusEvent.NET_STATUS, onNetStatusEvent); - connection.addEventListener(IOErrorEvent.IO_ERROR, onIOErrorEvent); - connection.addEventListener(SecurityErrorEvent.SECURITY_ERROR, onSecurityErrorEvent); - connection.connect(addr + "/" + key); - connectionTimeout = setInterval(fail, DEFAULT_CONNECT_TIMEOUT); - } - - public function close():void - { - connection.close(); - } - - public function get id():String - { - if (connection != null && connection.connected) { - return connection.nearID; + { + private const CONNECT_TIMEOUT:uint = 10000; + + private var s_c:CirrusSocket; + + private var recv_stream:NetStream; + private var send_stream:NetStream; + + private var connect_timeout:int; + + public function RTMFPSocket(s_c:CirrusSocket) + { + this.s_c = s_c; + recv_stream = null; + send_stream = null; + connect_timeout = 0; + } + + /* Tears down this RTMFPSocket, closing both its streams. + To be used when destroying this object. If you just want + to disconnect from a client, call disconnect() below */ + public function close():void + { + if (send_stream != null) { + s_c.connection.removeEventListener(NetStatusEvent.NET_STATUS, on_stream_disconnection_event); + send_stream.close(); + } + + if (recv_stream != null) { + recv_stream.close(); } - - return null; - } - - public function get connected():Boolean - { - return (connection != null && connection.connected); }
- public function readBytes(bytes:ByteArray):void + /* In RTMFP, you connect to a remote socket by requesting to + "play" the data being published on a named stream by the + host identified by id. The connection request goes through + the Cirrus server which handles the mapping from id/stream + to IP/port and any necessary NAT traversal. */ + public function connect(id:String, stream:String):void { - recvStream.client.bytes.readBytes(bytes); - } - - public function writeBytes(bytes:ByteArray):void + recv_stream = new NetStream(s_c.connection, id); + var client:RTMFPSocketClient = new RTMFPSocketClient(); + client.addEventListener(ProgressEvent.SOCKET_DATA, on_data_available, false, 0, true); + client.addEventListener(RTMFPSocketClient.CONNECT_ACKNOWLEDGED, on_connect_acknowledged, false, 0, true); + recv_stream.client = client; + recv_stream.addEventListener(NetStatusEvent.NET_STATUS, on_recv_stream_event); + recv_stream.play(stream); + connect_timeout = setTimeout(on_connect_timeout, CONNECT_TIMEOUT, recv_stream); + } + + public function get connected():Boolean { - sendStream.send("dataAvailable", bytes); - } - + return (recv_stream != null && recv_stream.client != null && + RTMFPSocketClient(recv_stream.client).connect_acknowledged); + } + + public function disconnect():void + { + if (recv_stream != null) { + if (recv_stream.client != null) { + recv_stream.client.removeEventListener(ProgressEvent.SOCKET_DATA, on_data_available); + recv_stream.client.removeEventListener(RTMFPSocketClient.CONNECT_ACKNOWLEDGED, on_connect_acknowledged); + } + recv_stream.removeEventListener(NetStatusEvent.NET_STATUS, on_recv_stream_event); + recv_stream.close(); + recv_stream = null; + } + } + + /* In RTMFP, you open a listening socket by publishing a named + stream that others can connect to instead of listening on a port. + You register this stream with the Cirrus server via the Cirrus + socket so that it can redirect connection requests for an id/stream + tuple to this socket. */ + public function listen(stream:String):void + { + // apparently streams don't get disconnection events, only the NetConnection + // object does...bleh. + s_c.connection.addEventListener(NetStatusEvent.NET_STATUS, on_stream_disconnection_event); + + send_stream = new NetStream(s_c.connection, NetStream.DIRECT_CONNECTIONS); + send_stream.addEventListener(NetStatusEvent.NET_STATUS, on_send_stream_event); + var client:Object = new Object(); + client.onPeerConnect = on_peer_connect; + send_stream.client = client; + send_stream.publish(stream); + } + public function get peer():String { - return this.peerID; + if (!connected) return null; + return recv_stream.farID; } - - public function set peer(peerID:String):void - { - if (peerID == null || peerID.length == 0) { - throw new Error("Peer ID is null/empty.") - } else if (peerID == connection.nearID) { - throw new Error("Peer ID cannot be the same as our ID."); - } else if (this.peerID == peerID) { - throw new Error("Already connected to peer " + peerID + "."); - } else if (this.recvStream != null) { - throw new Error("Cannot connect to a second peer."); - } - - this.peerID = peerID; - - recvStream = new NetStream(connection, peerID); - var client:RTMFPSocketClient = new RTMFPSocketClient(); - client.addEventListener(ProgressEvent.SOCKET_DATA, onDataAvailable, false, 0, true); - client.addEventListener(RTMFPSocketClient.PEER_CONNECT_ACKNOWLEDGED, onPeerConnectAcknowledged, false, 0, true); - recvStream.client = client; - recvStream.addEventListener(NetStatusEvent.NET_STATUS, onRecvStreamEvent); - recvStream.play(DATA); - setTimeout(onPeerConnectTimeout, peerConnectTimeout, recvStream); - } - - private function startPublishStream():void + + public function get peer_connected():Boolean { - sendStream = new NetStream(connection, NetStream.DIRECT_CONNECTIONS); - sendStream.addEventListener(NetStatusEvent.NET_STATUS, onSendStreamEvent); - var o:Object = new Object(); - o.onPeerConnect = onPeerConnect; - sendStream.client = o; - sendStream.publish(DATA); + return send_stream.peerStreams.length > 0; } - - private function fail():void + + public function readBytes(bytes:ByteArray, offset:uint = 0, length:uint = 0):void { - clearInterval(connectionTimeout); - dispatchEvent(new RTMFPSocketEvent(RTMFPSocketEvent.CONNECT_FAIL)); + if (recv_stream != null && recv_stream.client != null) { + recv_stream.client.bytes.readBytes(bytes, offset, length); + } }
- private function onDataAvailable(event:ProgressEvent):void - { - dispatchEvent(event); - }
- private function onIOErrorEvent(event:IOErrorEvent):void + public function writeBytes(bytes:ByteArray):void { - fail(); + if (send_stream != null && peer_connected) { + send_stream.send(RTMFPSocketClient.DATA_AVAILABLE, bytes); + } } - - private function onNetStatusEvent(event:NetStatusEvent):void + + /* Listens for acknowledgement of a connection attempt to a + remote peer. */ + private function on_connect_acknowledged(event:Event):void { - switch (event.info.code) { - case "NetConnection.Connect.Success" : - clearInterval(connectionTimeout); - startPublishStream(); - dispatchEvent(new RTMFPSocketEvent(RTMFPSocketEvent.CONNECT_SUCCESS)); - break; - case "NetStream.Connect.Success" : - break; - case "NetStream.Publish.BadName" : - fail(); - break; - case "NetStream.Connect.Closed" : - // we've disconnected from the peer - // can reset to accept another - // clear the publish stream and re-publish another - dispatchEvent(new RTMFPSocketEvent(RTMFPSocketEvent.PEER_DISCONNECTED, recvStream)); - break; - } + clearTimeout(connect_timeout); + dispatchEvent(new RTMFPSocketEvent(RTMFPSocketEvent.CONNECT_SUCCESS, recv_stream)); }
- private function onPeerConnect(peer:NetStream):Boolean + /* If we don't get a connection acknowledgement by the time this + timeout function is called, we punt. */ + private function on_connect_timeout(peer:NetStream):void { - // establish a bidirectional stream with the peer - if (peerID == null) { - this.peer = peer.farID; + if (!this.connected) { + dispatchEvent(new RTMFPSocketEvent(RTMFPSocketEvent.CONNECT_FAILED, recv_stream)); } - - // disallow additional peers connecting to us - if (peer.farID != peerID) - return false; - - peer.send("setPeerConnectAcknowledged"); - dispatchEvent(new RTMFPSocketEvent(RTMFPSocketEvent.PEER_CONNECTED, peer)); - - return true; } - - private function onPeerConnectAcknowledged(event:Event):void + + private function on_data_available(event:ProgressEvent):void { - dispatchEvent(new RTMFPSocketEvent(RTMFPSocketEvent.PEERING_SUCCESS, recvStream)); + dispatchEvent(event); } - - private function onPeerConnectTimeout(peer:NetStream):void + + private function on_recv_stream_event(event:NetStatusEvent):void { - if (!recvStream.client) return; - if (!RTMFPSocketClient(recvStream.client).peerConnectAcknowledged) { - dispatchEvent(new RTMFPSocketEvent(RTMFPSocketEvent.PEERING_FAIL, recvStream)); - } + /* empty, here for symmetry */ }
- private function onSecurityErrorEvent(event:SecurityErrorEvent):void + /* This function gets called whenever someone tries to connect + to this socket's send_stream tuple. We don't want multiple + peers connecting at once, so we disallow that. The socket + acknowledges the connection back to the peer with the + SET_CONNECTION_ACKNOWLEDGED message. */ + private function on_peer_connect(peer:NetStream):Boolean { - fail(); + if (peer_connected) { + return false; + } + + peer.send(RTMFPSocketClient.SET_CONNECT_ACKNOWLEDGED); + + // need to do this in a timeout so that this function can + // return true to finalize the connection before firing the event + setTimeout(function (stream:NetStream):void { + dispatchEvent(new RTMFPSocketEvent(RTMFPSocketEvent.PEER_CONNECTED, stream)); + }, 0, peer); + + return true; } - - private function onSendStreamEvent(event:NetStatusEvent):void + + private function on_send_stream_event(event:NetStatusEvent):void { switch (event.info.code) { - case ("NetStream.Publish.Start") : - dispatchEvent(new RTMFPSocketEvent(RTMFPSocketEvent.PUBLISH_START)); - break; - case ("NetStream.Play.Reset") : - case ("NetStream.Play.Start") : - break; + case "NetStream.Publish.Start": + dispatchEvent(new RTMFPSocketEvent(RTMFPSocketEvent.PUBLISH_STARTED)); + break; + case "NetStream.Publish.BadName": + dispatchEvent(new RTMFPSocketEvent(RTMFPSocketEvent.PUBLISH_FAILED)); + break; + default: + break; } } - private function onRecvStreamEvent(event:NetStatusEvent):void + + private function on_stream_disconnection_event(event:NetStatusEvent):void { - switch (event.info.code) { - case ("NetStream.Publish.Start") : - case ("NetStream.Play.Reset") : - case ("NetStream.Play.Start") : - break; + if (event.info.code == "NetStream.Connect.Closed") { + dispatchEvent(new RTMFPSocketEvent(RTMFPSocketEvent.PEER_DISCONNECTED)); + //disconnect(); } } } diff --git a/rtmfp/RTMFPSocketClient.as b/rtmfp/RTMFPSocketClient.as index e2f93ef..7469a53 100644 --- a/rtmfp/RTMFPSocketClient.as +++ b/rtmfp/RTMFPSocketClient.as @@ -5,20 +5,20 @@ package rtmfp import flash.events.ProgressEvent; import flash.utils.ByteArray;
- [Event(name="peerConnectAcknowledged", type="flash.events.Event")] + [Event(name=RTMFPSocketClient.CONNECT_ACKNOWLEDGED, type="flash.events.Event")] public dynamic class RTMFPSocketClient extends EventDispatcher { - public static const PEER_CONNECT_ACKNOWLEDGED:String = "peerConnectAcknowledged"; + public static const DATA_AVAILABLE:String = "data_available"; + public static const CONNECT_ACKNOWLEDGED:String = "connectAcknowledged"; + public static const SET_CONNECT_ACKNOWLEDGED:String = "set_connect_acknowledged";
private var _bytes:ByteArray; - private var _peerID:String; - private var _peerConnectAcknowledged:Boolean; + private var _connect_acknowledged:Boolean;
public function RTMFPSocketClient() { super(); _bytes = new ByteArray(); - _peerID = null; - _peerConnectAcknowledged = false; + _connect_acknowledged = false; }
public function get bytes():ByteArray @@ -26,32 +26,22 @@ package rtmfp return _bytes; }
- public function dataAvailable(bytes:ByteArray):void + public function data_available(bytes:ByteArray):void { this._bytes.clear(); bytes.readBytes(this._bytes); dispatchEvent(new ProgressEvent(ProgressEvent.SOCKET_DATA, false, false, this._bytes.bytesAvailable, this._bytes.length)); }
- public function get peerConnectAcknowledged():Boolean + public function get connect_acknowledged():Boolean { - return _peerConnectAcknowledged; + return _connect_acknowledged; }
- public function setPeerConnectAcknowledged():void + public function set_connect_acknowledged():void { - _peerConnectAcknowledged = true; - dispatchEvent(new Event(PEER_CONNECT_ACKNOWLEDGED)); - } - - public function get peerID():String - { - return _peerID; - } - - public function set peerID(id:String):void - { - _peerID = id; + _connect_acknowledged = true; + dispatchEvent(new Event(CONNECT_ACKNOWLEDGED)); } } } diff --git a/rtmfp/events/CirrusSocketEvent.as b/rtmfp/events/CirrusSocketEvent.as new file mode 100644 index 0000000..831ad73 --- /dev/null +++ b/rtmfp/events/CirrusSocketEvent.as @@ -0,0 +1,22 @@ +package rtmfp.events +{ + import flash.events.Event; + + public class CirrusSocketEvent extends Event + { + public static const CONNECT_CLOSED:String = "connectClosed"; + public static const CONNECT_FAILED:String = "connectFailed"; + public static const CONNECT_SUCCESS:String = "connectSuccess"; + public static const HELLO_RECEIVED:String = "helloReceived"; + + public var peer:String; + public var stream:String; + + public function CirrusSocketEvent(type:String, peer:String = null, stream:String = null, bubbles:Boolean = false, cancelable:Boolean = false) + { + super(type, bubbles, cancelable); + this.peer = peer; + this.stream = stream; + } + } +} diff --git a/rtmfp/events/FacilitatorSocketEvent.as b/rtmfp/events/FacilitatorSocketEvent.as new file mode 100644 index 0000000..a0599aa --- /dev/null +++ b/rtmfp/events/FacilitatorSocketEvent.as @@ -0,0 +1,22 @@ +package rtmfp.events +{ + import flash.events.Event; + + public class FacilitatorSocketEvent extends Event + { + public static const CONNECT_CLOSED:String = "connectClosed"; + public static const CONNECT_FAILED:String = "connectFailed"; + public static const CONNECT_SUCCESS:String = "connectSuccess"; + public static const REGISTRATION_RECEIVED:String = "registrationReceived"; + public static const REGISTRATION_FAILED:String = "registrationFailed"; + public static const REGISTRATIONS_EMPTY:String = "registrationsEmpty"; + + public var client:String; + + public function FacilitatorSocketEvent(type:String, client:String = null, bubbles:Boolean = false, cancelable:Boolean = false) + { + super(type, bubbles, cancelable); + this.client = client; + } + } +} diff --git a/rtmfp/events/RTMFPSocketEvent.as b/rtmfp/events/RTMFPSocketEvent.as index 5bc08e5..87a7e09 100644 --- a/rtmfp/events/RTMFPSocketEvent.as +++ b/rtmfp/events/RTMFPSocketEvent.as @@ -5,20 +5,21 @@ package rtmfp.events
public class RTMFPSocketEvent extends Event { + public static const CONNECT_FAILED:String = "connectFailed"; public static const CONNECT_SUCCESS:String = "connectSuccess"; - public static const CONNECT_FAIL:String = "connectFail"; - public static const PUBLISH_START:String = "publishStart"; + public static const CONNECT_CLOSED:String = "connectClosed" public static const PEER_CONNECTED:String = "peerConnected"; public static const PEER_DISCONNECTED:String = "peerDisconnected"; - public static const PEERING_SUCCESS:String = "peeringSuccess"; - public static const PEERING_FAIL:String = "peeringFail"; + public static const PLAY_STARTED:String = "playStarted"; + public static const PUBLISH_STARTED:String = "publishStarted"; + public static const PUBLISH_FAILED:String = "publishFailed";
public var stream:NetStream;
- public function RTMFPSocketEvent(type:String, streamVal:NetStream = null, bubbles:Boolean = false, cancelable:Boolean = false) + public function RTMFPSocketEvent(type:String, stream:NetStream = null, bubbles:Boolean = false, cancelable:Boolean = false) { super(type, bubbles, cancelable); - stream = streamVal; + this.stream = stream; } } } diff --git a/rtmfpcat.as b/rtmfpcat.as index d01bb37..866175e 100644 --- a/rtmfpcat.as +++ b/rtmfpcat.as @@ -5,46 +5,54 @@ package import flash.display.StageScaleMode; import flash.text.TextField; import flash.text.TextFormat; - import flash.net.Socket; import flash.events.Event; - import flash.events.EventDispatcher; - import flash.events.IOErrorEvent; - import flash.events.NetStatusEvent; - import flash.events.ProgressEvent; - import flash.events.SecurityErrorEvent; - import flash.utils.ByteArray; - import flash.utils.setTimeout; + import flash.utils.clearInterval; + import flash.utils.setInterval;
- import rtmfp.RTMFPSocket; - import rtmfp.events.RTMFPSocketEvent; + import rtmfp.CirrusSocket; + import rtmfp.FacilitatorSocket; + import rtmfp.ProxyPair; + import rtmfp.events.CirrusSocketEvent; + import rtmfp.events.FacilitatorSocketEvent;
public class rtmfpcat extends Sprite { - /* David's relay (nickname 3VXRyxz67OeRoqHn) that also serves a - crossdomain policy. */ - private const DEFAULT_TOR_PROXY_ADDR:Object = { - host: "173.255.221.44", - port: 9001 - }; - /* Nate's facilitator -- also serving a crossdomain policy */ + /* Adobe's Cirrus server and Nate's key */ + private const DEFAULT_CIRRUS_ADDR:String = "rtmfp://p2p.rtmfp.net"; + private const DEFAULT_CIRRUS_KEY:String = RTMFP::CIRRUS_KEY; + + /* Nate's facilitator -- serves a crossdomain policy */ private const DEFAULT_FACILITATOR_ADDR:Object = { host: "128.12.179.80", port: 9002 }; + private const DEFAULT_TOR_CLIENT_ADDR:Object = { host: "127.0.0.1", port: 3333 }; + + /* David's relay (nickname 3VXRyxz67OeRoqHn) that also serves a + crossdomain policy. */ + private const DEFAULT_TOR_PROXY_ADDR:Object = { + host: "173.255.221.44", + port: 9001 + }; + + /* Poll facilitator every 3 sec if in proxy mode and haven't found + anyone to proxy */ + private const DEFAULT_FAC_POLL_INTERVAL:uint = 3000;
- // Milliseconds. - private const FACILITATOR_POLL_INTERVAL:int = 10000; - + // Socket to Cirrus server + private var s_c:CirrusSocket; // Socket to facilitator. - private var s_f:Socket; - // Socket to RTMFP peer (flash proxy). - private var s_r:RTMFPSocket; - // Socket to local Tor client. - private var s_t:Socket; + private var s_f:FacilitatorSocket; + // Handle local-remote traffic + private var p_p:ProxyPair; + + private var proxy_mode:Boolean; + + private var fac_poll_interval:uint;
/* TextField for debug output. */ private var output_text:TextField; @@ -52,14 +60,6 @@ package private var fac_addr:Object; private var tor_addr:Object;
- private var proxy_mode:Boolean; - - public function puts(s:String):void - { - output_text.appendText(s + "\n"); - output_text.scrollV = output_text.maxScrollV; - } - public function rtmfpcat() { // Absolute positioning. @@ -72,7 +72,8 @@ package output_text.background = true; output_text.backgroundColor = 0x001f0f; output_text.textColor = 0x44cc44; - + addChild(output_text); + puts("Starting."); // Wait until the query string parameters are loaded. this.loaderInfo.addEventListener(Event.COMPLETE, loaderinfo_complete); @@ -82,11 +83,10 @@ package { var fac_spec:String; var tor_spec:String; - + puts("Parameters loaded.");
proxy_mode = (this.loaderInfo.parameters["proxy"] != null); - addChild(output_text);
fac_spec = this.loaderInfo.parameters["facilitator"]; if (fac_spec) { @@ -109,10 +109,11 @@ package return; } } else { - if (proxy_mode) + if (proxy_mode) { tor_addr = DEFAULT_TOR_PROXY_ADDR; - else + } else { tor_addr = DEFAULT_TOR_CLIENT_ADDR; + } }
main(); @@ -121,105 +122,106 @@ package /* The main logic begins here, after start-up issues are taken care of. */ private function main():void { - establishRTMFPConnection(); + establish_cirrus_connection(); }
- private function establishRTMFPConnection():void + private function establish_cirrus_connection():void { - s_r = new RTMFPSocket(); - s_r.addEventListener(RTMFPSocketEvent.CONNECT_SUCCESS, function (e:Event):void { - puts("Cirrus: connected with id " + s_r.id + "."); - establishFacilitatorConnection(); + s_c = new CirrusSocket(); + s_c.addEventListener(CirrusSocketEvent.CONNECT_SUCCESS, function (e:CirrusSocketEvent):void { + puts("Cirrus: connected with id " + s_c.id + "."); + if (proxy_mode) { + fac_poll_interval = setInterval(establish_facilitator_connection, DEFAULT_FAC_POLL_INTERVAL); + } else { + establish_facilitator_connection(); + } }); - s_r.addEventListener(RTMFPSocketEvent.CONNECT_FAIL, function (e:Event):void { + s_c.addEventListener(CirrusSocketEvent.CONNECT_FAILED, function (e:CirrusSocketEvent):void { puts("Error: failed to connect to Cirrus."); }); - s_r.addEventListener(RTMFPSocketEvent.PUBLISH_START, function(e:RTMFPSocketEvent):void { - puts("Publishing started."); + s_c.addEventListener(CirrusSocketEvent.CONNECT_CLOSED, function (e:CirrusSocketEvent):void { + puts("Cirrus: closed connection."); }); - s_r.addEventListener(RTMFPSocketEvent.PEER_CONNECTED, function(e:RTMFPSocketEvent):void { - puts("Peer connected."); - }); - s_r.addEventListener(RTMFPSocketEvent.PEER_DISCONNECTED, function(e:RTMFPSocketEvent):void { - puts("Peer disconnected."); - }); - s_r.addEventListener(RTMFPSocketEvent.PEERING_SUCCESS, function(e:RTMFPSocketEvent):void { - puts("Peering success."); - establishTorConnection(); - }); - s_r.addEventListener(RTMFPSocketEvent.PEERING_FAIL, function(e:RTMFPSocketEvent):void { - puts("Peering fail."); - }); - s_r.addEventListener(ProgressEvent.SOCKET_DATA, function (e:ProgressEvent):void { - var bytes:ByteArray = new ByteArray(); - s_r.readBytes(bytes); - puts("RTMFP: read " + bytes.length + " bytes."); - s_t.writeBytes(bytes); + + s_c.addEventListener(CirrusSocketEvent.HELLO_RECEIVED, function (e:CirrusSocketEvent):void { + puts("Cirrus: received hello from peer " + e.peer); + + /* don't bother if we already have a proxy going */ + if (p_p != null && p_p.connected) { + return; + } + + /* if we're in proxy mode, we should have already set + up a proxy pair */ + if (!proxy_mode) { + start_proxy_pair(); + s_c.send_hello(e.peer); + } + p_p.connect(e.peer, e.stream); }); - - s_r.connect(); + + s_c.connect(DEFAULT_CIRRUS_ADDR, DEFAULT_CIRRUS_KEY); }
- private function establishTorConnection():void + private function establish_facilitator_connection():void { - s_t = new Socket(); - s_t.addEventListener(Event.CONNECT, function (e:Event):void { - puts("Tor: connected to " + tor_addr.host + ":" + tor_addr.port + "."); - }); - s_t.addEventListener(Event.CLOSE, function (e:Event):void { - puts("Tor: closed connection."); - }); - s_t.addEventListener(IOErrorEvent.IO_ERROR, function (e:IOErrorEvent):void { - puts("Tor: I/O error: " + e.text + "."); - }); - s_t.addEventListener(ProgressEvent.SOCKET_DATA, function (e:ProgressEvent):void { - var bytes:ByteArray = new ByteArray(); - s_t.readBytes(bytes, 0, e.bytesLoaded); - puts("Tor: read " + bytes.length + " bytes."); - s_r.writeBytes(bytes); - }); - s_t.addEventListener(SecurityErrorEvent.SECURITY_ERROR, function (e:SecurityErrorEvent):void { - puts("Tor: security error: " + e.text + "."); + s_f = new FacilitatorSocket(); + s_f.addEventListener(FacilitatorSocketEvent.CONNECT_SUCCESS, function (e:Event):void { + if (proxy_mode) { + puts("Facilitator: getting registration."); + s_f.get_registration(); + } else { + puts("Facilitator: posting registration."); + s_f.post_registration(s_c.id); + } }); - - s_t.connect(tor_addr.host, tor_addr.port); + s_f.addEventListener(FacilitatorSocketEvent.CONNECT_FAILED, function (e:Event):void { + puts("Facilitator: connect failed."); + }); + s_f.addEventListener(FacilitatorSocketEvent.CONNECT_CLOSED, function (e:Event):void { + puts("Facilitator: connect closed."); + }); + + if (proxy_mode) { + s_f.addEventListener(FacilitatorSocketEvent.REGISTRATION_RECEIVED, function (e:FacilitatorSocketEvent):void { + puts("Facilitator: got registration " + e.client); + clearInterval(fac_poll_interval); + start_proxy_pair(); + s_c.send_hello(e.client); + }); + s_f.addEventListener(FacilitatorSocketEvent.REGISTRATIONS_EMPTY, function (e:Event):void { + puts("Facilitator: no registrations available."); + }); + } else { + s_f.addEventListener(FacilitatorSocketEvent.REGISTRATION_FAILED, function (e:Event):void { + puts("Facilitator: registration failed."); + }); + } + s_f.connect(fac_addr.host, fac_addr.port); } - - private function establishFacilitatorConnection():void + + private function start_proxy_pair():void { - s_f = new Socket(); - s_f.addEventListener(Event.CONNECT, function (e:Event):void { - puts("Facilitator: connected to " + fac_addr.host + ":" + fac_addr.port + "."); - if (proxy_mode) s_f.writeUTFBytes("GET / HTTP/1.0\r\n\r\n"); - else s_f.writeUTFBytes("POST / HTTP/1.0\r\n\r\nclient=" + s_r.id + "\r\n"); - }); - s_f.addEventListener(Event.CLOSE, function (e:Event):void { - puts("Facilitator: connection closed."); + puts("Starting proxy pair on stream " + s_c.local_stream_name); + p_p = new ProxyPair(this, s_c, tor_addr.host, tor_addr.port); + p_p.addEventListener(Event.CONNECT, function (e:Event):void { + puts("ProxyPair: connected!"); + }); + p_p.addEventListener(Event.CLOSE, function (e:Event):void { + puts("ProxyPair: connection closed."); + p_p = null; if (proxy_mode) { - setTimeout(establishFacilitatorConnection, FACILITATOR_POLL_INTERVAL); + fac_poll_interval = setInterval(establish_facilitator_connection, DEFAULT_FAC_POLL_INTERVAL); + } else { + establish_facilitator_connection(); } }); - s_f.addEventListener(IOErrorEvent.IO_ERROR, function (e:IOErrorEvent):void { - puts("Facilitator: I/O error: " + e.text + "."); - }); - s_f.addEventListener(ProgressEvent.SOCKET_DATA, function (e:ProgressEvent):void { - var clientID:String = s_f.readMultiByte(e.bytesLoaded, "utf-8"); - puts("Facilitator: got "" + clientID + """); - if (clientID != "Registration list empty") { - puts("Connecting to " + clientID + "."); - s_r.peer = clientID; - } - }); - s_f.addEventListener(SecurityErrorEvent.SECURITY_ERROR, function (e:SecurityErrorEvent):void { - puts("Facilitator: security error: " + e.text + "."); - }); - - s_f.connect(fac_addr.host, fac_addr.port); + p_p.listen(s_c.local_stream_name); }
/* Parse an address in the form "host:port". Returns an Object with keys "host" (String) and "port" (int). Returns null on error. */ - private static function parse_addr_spec(spec:String):Object + private function parse_addr_spec(spec:String):Object { var parts:Array; var addr:Object; @@ -233,5 +235,11 @@ package
return addr; } + + public function puts(s:String):void + { + output_text.appendText(s + "\n"); + output_text.scrollV = output_text.maxScrollV; + } } }