
commit 858801c0e8965aa09f8e68fe14a6ac27679b759c
Author: Nate Hardison <nate@rescomp-09-154551.stanford.edu>
Date:   Mon May 9 02:45:24 2011 -0700

    New com/jscat structure for Nate's stuff
---
 com/jscat/Connector.as                     |  219 ++++++++++++++++++++++++++
 com/jscat/Utils.as                         |   31 ++++
 com/jscat/facilitator.py                   |  171 ++++++++++++++++++++
 com/jscat/rtmfp/RTMFPSocket.as             |  237 ++++++++++++++++++++++++++++
 com/jscat/rtmfp/RTMFPSocketClient.as       |   64 ++++++++
 com/jscat/rtmfp/events/RTMFPSocketEvent.as |   33 ++++
 6 files changed, 755 insertions(+), 0 deletions(-)

diff --git a/com/jscat/Connector.as b/com/jscat/Connector.as
new file mode 100644
index 0000000..dab7d0a
--- /dev/null
+++ b/com/jscat/Connector.as
@@ -0,0 +1,219 @@
+package
+{
+    import flash.display.Sprite;
+    import flash.text.TextField;
+    import flash.net.Socket;
+    import flash.events.Event;
+    import flash.events.EventDispatcher;
+    import flash.events.IOErrorEvent;
+    import flash.events.NetStatusEvent;
+    import flash.events.ProgressEvent;
+    import flash.events.SecurityErrorEvent;
+    import flash.utils.ByteArray;
+
+    import rtmfp.RTMFPSocket;
+    import rtmfp.events.RTMFPSocketEvent;
+    import Utils;
+
+    /* Bridges three connections: an RTMFP peer stream (s_r), a TCP socket
+       to a Tor relay (s_t) and a TCP socket to the facilitator (s_f).
+       Bytes read from the peer are written to the relay and vice versa.
+       The facilitator socket registers our RTMFP id or, when the "proxy"
+       parameter is set, fetches the id of a client to peer with. */
+    public class Connector extends Sprite {
+
+        /* David's relay (nickname 3VXRyxz67OeRoqHn) that also serves a
+           crossdomain policy. */
+        private const DEFAULT_TOR_RELAY:Object = {
+            host: "173.255.221.44",
+            port: 9001
+        };
+
+        private var output_text:TextField;
+
+        /* Facilitator, RTMFP peer and Tor relay connections. */
+        private var s_f:Socket;
+        private var s_r:RTMFPSocket;
+        private var s_t:Socket;
+
+        private var fac_addr:Object;
+        private var tor_addr:Object;
+
+        /* Builds the on-screen log and waits for the loader parameters
+           before doing anything else. */
+        public function Connector()
+        {
+            output_text = new TextField();
+            output_text.width = 400;
+            output_text.height = 300;
+            output_text.background = true;
+            output_text.backgroundColor = 0x001f0f;
+            output_text.textColor = 0x44CC44;
+            addChild(output_text);
+
+            puts("Starting.");
+
+            this.loaderInfo.addEventListener(Event.COMPLETE, onLoaderInfoComplete);
+        }
+
+        /* Appends a line to the on-screen log and scrolls to the end. */
+        protected function puts(s:String):void
+        {
+            output_text.appendText(s + "\n");
+            output_text.scrollV = output_text.maxScrollV;
+        }
+
+        /* Reads the "facilitator" and "tor" parameters (both "host:port"),
+           then starts the RTMFP connection. Logs an error and stops when
+           either parameter is missing or malformed. */
+        private function onLoaderInfoComplete(e:Event):void
+        {
+            var fac_spec:String;
+            var tor_spec:String;
+
+            puts("Parameters loaded.");
+
+            fac_spec = this.loaderInfo.parameters["facilitator"];
+            if (!fac_spec) {
+                puts("Error: no \"facilitator\" specification provided.");
+                return;
+            }
+            puts("Facilitator spec: \"" + fac_spec + "\"");
+            fac_addr = Utils.parseAddrSpec(fac_spec);
+            if (!fac_addr) {
+                puts("Error: Facilitator spec must be in the form \"host:port\".");
+                return;
+            }
+
+            tor_spec = this.loaderInfo.parameters["tor"];
+            if (!tor_spec) {
+                puts("Error: No Tor specification provided.");
+                return;
+            }
+            puts("Tor spec: \"" + tor_spec + "\"");
+            tor_addr = Utils.parseAddrSpec(tor_spec);
+            if (!tor_addr) {
+                puts("Error: Tor spec must be in the form \"host:port\".");
+                return;
+            }
+
+            s_r = new RTMFPSocket();
+            s_r.addEventListener(RTMFPSocketEvent.CONNECT_SUCCESS, onRTMFPSocketConnect);
+            s_r.addEventListener(RTMFPSocketEvent.CONNECT_FAIL, function (e:Event):void {
+                puts("Error: failed to connect to Cirrus.");
+            });
+            s_r.addEventListener(RTMFPSocketEvent.PUBLISH_START, function(e:RTMFPSocketEvent):void {
+
+            });
+            s_r.addEventListener(RTMFPSocketEvent.PEER_CONNECTED, function(e:RTMFPSocketEvent):void {
+
+            });
+            s_r.addEventListener(RTMFPSocketEvent.PEER_DISCONNECTED, function(e:RTMFPSocketEvent):void {
+
+            });
+            s_r.addEventListener(RTMFPSocketEvent.PEERING_SUCCESS, function(e:RTMFPSocketEvent):void {
+
+            });
+            s_r.addEventListener(RTMFPSocketEvent.PEERING_FAIL, function(e:RTMFPSocketEvent):void {
+
+            });
+            s_r.addEventListener(ProgressEvent.SOCKET_DATA, function (e:ProgressEvent):void {
+                var bytes:ByteArray = new ByteArray();
+                s_r.readBytes(bytes);
+                puts("RTMFP: read " + bytes.length + " bytes.");
+                s_t.writeBytes(bytes);
+                /* Socket output is buffered; flush() actually sends it. */
+                s_t.flush();
+            });
+
+            s_r.connect();
+        }
+
+        /* Cirrus is connected: open the Tor relay socket and forward
+           whatever it sends to the RTMFP peer. */
+        private function onRTMFPSocketConnect(event:RTMFPSocketEvent):void
+        {
+            puts("Cirrus: connected with id " + s_r.id + ".");
+            s_t = new Socket();
+            s_t.addEventListener(Event.CONNECT, onTorSocketConnect);
+            s_t.addEventListener(Event.CLOSE, function (e:Event):void {
+
+            });
+            s_t.addEventListener(IOErrorEvent.IO_ERROR, function (e:IOErrorEvent):void {
+                puts("Tor: I/O error: " + e.text + ".");
+            });
+            s_t.addEventListener(ProgressEvent.SOCKET_DATA, function (e:ProgressEvent):void {
+                var bytes:ByteArray = new ByteArray();
+                s_t.readBytes(bytes, 0, e.bytesLoaded);
+                puts("Tor: read " + bytes.length + " bytes.");
+                s_r.writeBytes(bytes);
+            });
+            s_t.addEventListener(SecurityErrorEvent.SECURITY_ERROR, function (e:SecurityErrorEvent):void {
+                puts("Tor: security error: " + e.text + ".");
+            });
+
+            s_t.connect(tor_addr.host, tor_addr.port);
+            /* NOTE(review): invoked without waiting for Event.CONNECT, so the
+               facilitator is contacted even if the relay never connects.
+               Looks like a debugging shortcut; confirm before relying on it. */
+            onTorSocketConnect(new Event(""));
+        }
+
+        /* Opens the facilitator socket. This handler runs twice when the
+           relay connects (once from the direct call in
+           onRTMFPSocketConnect, once from Event.CONNECT), so the socket is
+           only created the first time. */
+        private function onTorSocketConnect(event:Event):void
+        {
+            puts("Tor: connected to " + tor_addr.host + ":" + tor_addr.port + ".");
+
+            if (s_f != null) {
+                return;
+            }
+
+            s_f = new Socket();
+            s_f.addEventListener(Event.CONNECT, onFacilitatorSocketConnect);
+            s_f.addEventListener(Event.CLOSE, function (e:Event):void {
+
+            });
+            s_f.addEventListener(IOErrorEvent.IO_ERROR, function (e:IOErrorEvent):void {
+                puts("Facilitator: I/O error: " + e.text + ".");
+            });
+            s_f.addEventListener(ProgressEvent.SOCKET_DATA, function (e:ProgressEvent):void {
+                var clientID:String = s_f.readMultiByte(e.bytesLoaded, "utf-8");
+                puts("Facilitator: got \"" + clientID + "\"");
+                puts("Connecting to " + clientID + ".");
+                s_r.peer = clientID;
+            });
+            s_f.addEventListener(SecurityErrorEvent.SECURITY_ERROR, function (e:SecurityErrorEvent):void {
+                puts("Facilitator: security error: " + e.text + ".");
+            });
+
+            s_f.connect(fac_addr.host, fac_addr.port);
+        }
+
+        private function onFacilitatorSocketConnect(event:Event):void
+        {
+            puts("Facilitator: connected to " + fac_addr.host + ":" + fac_addr.port + ".");
+            onConnectionEvent();
+        }
+
+        /* Once the facilitator and Cirrus connections are up, either asks
+           the facilitator for a client id ("proxy" parameter set) or
+           registers our own RTMFP id with it. */
+        private function onConnectionEvent():void
+        {
+            if (s_f != null && s_f.connected && s_t != null && /*s_t.connected && */
+                s_r != null && s_r.connected) {
+                if (this.loaderInfo.parameters["proxy"]) {
+                    s_f.writeUTFBytes("GET / HTTP/1.0\r\n\r\n");
+                } else {
+                    var str:String = "POST / HTTP/1.0\r\n\r\nclient=" + s_r.id + "\r\n";
+                    puts(str);
+                    s_f.writeUTFBytes(str);
+                }
+                s_f.flush();
+            }
+        }
+    }
+}
\ No newline at end of file
diff --git a/com/jscat/Utils.as b/com/jscat/Utils.as
new file mode 100644
index 0000000..48f9a62
--- /dev/null
+++ b/com/jscat/Utils.as
@@ -0,0 +1,31 @@
+package
+{
+
+    public class Utils {
+
+        /* Parse an address in the form "host:port". Returns an Object with
+           keys "host" (String) and "port" (int). Returns null on error,
+           which includes a port outside the range 1-65535. */
+        public static function parseAddrSpec(spec:String):Object
+        {
+            var parts:Array;
+            var addr:Object;
+            var port:Number;
+
+            parts = spec.split(":", 2);
+            if (parts.length != 2)
+                return null;
+            /* Radix 10 so that "0x50" is rejected rather than read as hex. */
+            port = parseInt(parts[1], 10);
+            if (isNaN(port) || port < 1 || port > 65535)
+                return null;
+            addr = {};
+            addr.host = parts[0];
+            addr.port = int(port);
+
+            return addr;
+        }
+
+    }
+
+}
\ No newline at end of file
diff --git a/com/jscat/facilitator.py b/com/jscat/facilitator.py
new file mode 100755
index 0000000..8c6a44e
--- /dev/null
+++ b/com/jscat/facilitator.py
@@ -0,0 +1,171 @@
+#!/usr/bin/env python
+
+import BaseHTTPServer
+import getopt
+import cgi
+import re
+import sys
+import socket
+from collections import deque
+
+DEFAULT_ADDRESS = "0.0.0.0"
+DEFAULT_PORT = 9002
+
+def usage(f = sys.stdout):
+    """Write the command-line help text to f."""
+    print >> f, """\
+Usage: %(progname)s <OPTIONS> [HOST] [PORT]
+Flash bridge facilitator: Register client addresses with HTTP POST requests
+and serve them out again with HTTP GET. Listen on HOST and PORT, by default
+%(addr)s %(port)d.
+  -h, --help           show this help.\
+""" % {
+    "progname": sys.argv[0],
+    "addr": DEFAULT_ADDRESS,
+    "port": DEFAULT_PORT,
+}
+
+# Pending client registrations, handed out oldest first.
+REGS = deque()
+
+class Reg(object):
+    """A client registration, identified by an opaque id string."""
+
+    def __init__(self, id):
+        self.id = id
+
+    def __unicode__(self):
+        return u"%s" % (self.id)
+
+    def __str__(self):
+        return unicode(self).encode("UTF-8")
+
+    def __cmp__(self, other):
+        return cmp((self.id), (other.id))
+
+    @staticmethod
+    def parse(spec, defhost = None, defport = None):
+        """Build a Reg from a numeric "host:port" or "[host]:port" spec.
+
+        Missing parts are taken from defhost and defport. The resulting
+        id is the normalized address, with IPv6 hosts in brackets.
+        Raises ValueError if the spec cannot be resolved to an address.
+        """
+        host = None
+        port = None
+        # Keeps af bound for specs that match neither pattern below, where
+        # host and port come entirely from the defaults.
+        af = 0
+        m = re.match(r'^\[(.+)\]:(\d*)$', spec)
+        if m:
+            host, port = m.groups()
+            af = socket.AF_INET6
+        else:
+            m = re.match(r'^(.*):(\d*)$', spec)
+            if m:
+                host, port = m.groups()
+                if host:
+                    af = socket.AF_INET
+                else:
+                    # Has to be guessed from format of defhost.
+                    af = 0
+        host = host or defhost
+        port = port or defport
+        if not (host and port):
+            raise ValueError("Bad address specification \"%s\"" % spec)
+
+        try:
+            addrs = socket.getaddrinfo(host, port, af, socket.SOCK_STREAM, socket.IPPROTO_TCP, socket.AI_NUMERICHOST)
+        except socket.gaierror, e:
+            raise ValueError("Bad host or port: \"%s\" \"%s\": %s" % (host, port, str(e)))
+        if not addrs:
+            raise ValueError("Bad host or port: \"%s\" \"%s\"" % (host, port))
+
+        af = addrs[0][0]
+        host, port = socket.getnameinfo(addrs[0][4], socket.NI_NUMERICHOST | socket.NI_NUMERICSERV)
+        # Reg takes a single id, so fold the address into one string.
+        if af == socket.AF_INET6:
+            return Reg("[%s]:%d" % (host, int(port)))
+        return Reg("%s:%d" % (host, int(port)))
+
+def fetch_reg():
+    """Get a client registration, or None if none is available."""
+    if not REGS:
+        return None
+    return REGS.popleft()
+
+class Handler(BaseHTTPServer.BaseHTTPRequestHandler):
+    """Serves registrations on GET and records them on POST.
+
+    Replies are written as bare text on the socket, without an HTTP
+    status line or headers.
+    """
+
+    def do_GET(self):
+        """Send the oldest registration, or a fixed message if none."""
+        print "From " + str(self.client_address) + " received: GET:",
+        reg = fetch_reg()
+        if reg:
+            print "Handing out " + str(reg) + ". Clients: " + str(len(REGS))
+            self.request.sendall(str(reg))
+        else:
+            print "Registration list is empty"
+            self.request.sendall("Registration list empty")
+
+    def do_POST(self):
+        """Register the single "client" value found in the POST body."""
+        print "From " + str(self.client_address) + " received: POST:",
+        data = self.rfile.readline().strip()
+        print data + " :",
+        try:
+            vals = cgi.parse_qs(data, False, True)
+        except ValueError, e:
+            print "Syntax error in POST:", str(e)
+            return
+
+        client_specs = vals.get("client")
+        if client_specs is None or len(client_specs) != 1:
+            print "In POST: need exactly one \"client\" param"
+            return
+        val = client_specs[0]
+
+        try:
+            reg = Reg(val)
+        except ValueError, e:
+            print "Can't parse client \"%s\": %s" % (val, str(e))
+            return
+
+        if reg not in REGS:
+            REGS.append(reg)
+            print "Registration " + str(reg) + " added. Registrations: " + str(len(REGS))
+        else:
+            print "Registration " + str(reg) + " already present. Registrations: " + str(len(REGS))
+
+opts, args = getopt.gnu_getopt(sys.argv[1:], "h", ["help"])
+for o, a in opts:
+    if o == "-h" or o == "--help":
+        usage()
+        sys.exit()
+
+# The port must be an int: HTTPServer passes the address to socket.bind().
+if len(args) == 0:
+    address = (DEFAULT_ADDRESS, DEFAULT_PORT)
+elif len(args) == 1:
+    # Either HOST or PORT may be omitted; figure out which one.
+    if args[0].isdigit():
+        address = (DEFAULT_ADDRESS, int(args[0]))
+    else:
+        address = (args[0], DEFAULT_PORT)
+elif len(args) == 2:
+    address = (args[0], int(args[1]))
+else:
+    usage(sys.stderr)
+    sys.exit(1)
+
+# Setup the server
+server = BaseHTTPServer.HTTPServer(address, Handler)
+
+print "Starting Facilitator on " + str(address) + "..."
+
+# Run server... Single threaded serving of requests...
+server.serve_forever()
diff --git a/com/jscat/rtmfp/RTMFPSocket.as b/com/jscat/rtmfp/RTMFPSocket.as
new file mode 100644
index 0000000..4b83784
--- /dev/null
+++ b/com/jscat/rtmfp/RTMFPSocket.as
@@ -0,0 +1,237 @@
+package rtmfp
+{
+    import flash.events.Event;
+    import flash.events.EventDispatcher;
+    import flash.events.IOErrorEvent;
+    import flash.events.NetStatusEvent;
+    import flash.events.ProgressEvent;
+    import flash.events.SecurityErrorEvent;
+    import flash.net.NetConnection;
+    import flash.net.NetStream;
+    import flash.utils.ByteArray;
+    import flash.utils.clearInterval;
+    import flash.utils.setInterval;
+    import flash.utils.setTimeout;
+
+    import rtmfp.RTMFPSocketClient;
+    import rtmfp.events.RTMFPSocketEvent;
+
+    /* Socket-like wrapper around an RTMFP peer-to-peer link: connects to
+       Cirrus, publishes a send stream and plays the stream of a single
+       peer. Incoming data is announced with ProgressEvent.SOCKET_DATA. */
+    [Event(name="connectSuccess", type="rtmfp.events.RTMFPSocketEvent")]
+    [Event(name="connectFail", type="rtmfp.events.RTMFPSocketEvent")]
+    [Event(name="publishStart", type="rtmfp.events.RTMFPSocketEvent")]
+    [Event(name="peerConnected", type="rtmfp.events.RTMFPSocketEvent")]
+    [Event(name="peeringSuccess", type="rtmfp.events.RTMFPSocketEvent")]
+    [Event(name="peeringFail", type="rtmfp.events.RTMFPSocketEvent")]
+    [Event(name="peerDisconnected", type="rtmfp.events.RTMFPSocketEvent")]
+    public class RTMFPSocket extends EventDispatcher
+    {
+        /* The name of the "media" to pass between peers */
+        private static const DATA:String = "data";
+        private static const DEFAULT_CIRRUS_ADDRESS:String = "rtmfp://p2p.rtmfp.net";
+        private static const DEFAULT_CIRRUS_KEY:String = RTMFP::CIRRUS_KEY;
+        private static const DEFAULT_CONNECT_TIMEOUT:uint = 4000;
+        private static const DEFAULT_PEER_CONNECT_TIMEOUT:uint = 4000;
+
+        /* Connection to the Cirrus rendezvous service */
+        private var connection:NetConnection;
+
+        /* ID of the peer to connect to */
+        private var peerID:String;
+
+        /* Data streams to be established with peer */
+        private var sendStream:NetStream;
+        private var recvStream:NetStream;
+
+        /* Timeouts. peerConnectTimeout must be non-zero: at the uint
+           default of 0 the peering check would run before the peer could
+           acknowledge. NOTE(review): 4000 ms mirrors the connect timeout. */
+        private var connectionTimeout:uint;
+        private var peerConnectTimeout:uint = DEFAULT_PEER_CONNECT_TIMEOUT;
+
+        public function RTMFPSocket(){}
+
+        /* Connects to the rendezvous service at addr using the developer
+           key; fail() runs if nothing succeeds within the timeout. */
+        public function connect(addr:String = DEFAULT_CIRRUS_ADDRESS, key:String = DEFAULT_CIRRUS_KEY):void
+        {
+            connection = new NetConnection();
+            connection.addEventListener(NetStatusEvent.NET_STATUS, onNetStatusEvent);
+            connection.addEventListener(IOErrorEvent.IO_ERROR, onIOErrorEvent);
+            connection.addEventListener(SecurityErrorEvent.SECURITY_ERROR, onSecurityErrorEvent);
+            connection.connect(addr + "/" + key);
+            connectionTimeout = setInterval(fail, DEFAULT_CONNECT_TIMEOUT);
+        }
+
+        /* Our own peer ID, or null while not connected. */
+        public function get id():String
+        {
+            if (connection != null && connection.connected) {
+                return connection.nearID;
+            }
+
+            return null;
+        }
+
+        public function get connected():Boolean
+        {
+            return (connection != null && connection.connected);
+        }
+
+        /* Copies the data most recently received from the peer into bytes. */
+        public function readBytes(bytes:ByteArray):void
+        {
+            recvStream.client.bytes.readBytes(bytes);
+        }
+
+        /* Sends bytes to the peer as a "dataAvailable" call on the send
+           stream. */
+        public function writeBytes(bytes:ByteArray):void
+        {
+            sendStream.send("dataAvailable", bytes);
+        }
+
+        public function get peer():String
+        {
+            return this.peerID;
+        }
+
+        /* Starts playing the data stream of the given peer. Throws an Error
+           for an empty ID, our own ID, or when a peer is already set. */
+        public function set peer(peerID:String):void
+        {
+            if (peerID == null || peerID.length == 0) {
+                throw new Error("Peer ID is null/empty.");
+            } else if (peerID == connection.nearID) {
+                throw new Error("Peer ID cannot be the same as our ID.");
+            } else if (this.peerID == peerID) {
+                throw new Error("Already connected to peer " + peerID + ".");
+            } else if (this.recvStream != null) {
+                throw new Error("Cannot connect to a second peer.");
+            }
+
+            this.peerID = peerID;
+
+            recvStream = new NetStream(connection, peerID);
+            var client:RTMFPSocketClient = new RTMFPSocketClient();
+            client.addEventListener(ProgressEvent.SOCKET_DATA, onDataAvailable, false, 0, true);
+            client.addEventListener(RTMFPSocketClient.PEER_CONNECT_ACKNOWLEDGED, onPeerConnectAcknowledged, false, 0, true);
+            recvStream.client = client;
+            recvStream.addEventListener(NetStatusEvent.NET_STATUS, onRecvStreamEvent);
+            recvStream.play(DATA);
+            setTimeout(onPeerConnectTimeout, peerConnectTimeout, recvStream);
+        }
+
+        /* Publishes the send stream for direct peer connections. */
+        private function startPublishStream():void
+        {
+            sendStream = new NetStream(connection, NetStream.DIRECT_CONNECTIONS);
+            sendStream.addEventListener(NetStatusEvent.NET_STATUS, onSendStreamEvent);
+            var o:Object = new Object();
+            o.onPeerConnect = onPeerConnect;
+            sendStream.client = o;
+            sendStream.publish(DATA);
+        }
+
+        /* Cancels the connection timer and reports CONNECT_FAIL. */
+        private function fail():void
+        {
+            clearInterval(connectionTimeout);
+            dispatchEvent(new RTMFPSocketEvent(RTMFPSocketEvent.CONNECT_FAIL));
+        }
+
+        private function onDataAvailable(event:ProgressEvent):void
+        {
+            dispatchEvent(event);
+        }
+
+        private function onIOErrorEvent(event:IOErrorEvent):void
+        {
+            fail();
+        }
+
+        private function onNetStatusEvent(event:NetStatusEvent):void
+        {
+            switch (event.info.code) {
+            case "NetConnection.Connect.Success" :
+                clearInterval(connectionTimeout);
+                startPublishStream();
+                dispatchEvent(new RTMFPSocketEvent(RTMFPSocketEvent.CONNECT_SUCCESS));
+                break;
+            case "NetStream.Connect.Success" :
+                break;
+            case "NetStream.Publish.BadName" :
+                fail();
+                break;
+            case "NetStream.Connect.Closed" :
+                // we've disconnected from the peer
+                // can reset to accept another
+                // clear the publish stream and re-publish another
+                dispatchEvent(new RTMFPSocketEvent(RTMFPSocketEvent.PEER_DISCONNECTED, recvStream));
+                break;
+            }
+        }
+
+        /* Called on the send stream's client when a peer subscribes.
+           Accepts the first peer (and peers back with it), rejects any
+           other. */
+        private function onPeerConnect(peer:NetStream):Boolean
+        {
+            // establish a bidirectional stream with the peer
+            if (peerID == null) {
+                this.peer = peer.farID;
+            }
+
+            // disallow additional peers connecting to us
+            if (peer.farID != peerID) return false;
+
+            peer.send("setPeerConnectAcknowledged");
+            dispatchEvent(new RTMFPSocketEvent(RTMFPSocketEvent.PEER_CONNECTED, peer));
+
+            return true;
+        }
+
+        private function onPeerConnectAcknowledged(event:Event):void
+        {
+            dispatchEvent(new RTMFPSocketEvent(RTMFPSocketEvent.PEERING_SUCCESS, recvStream));
+        }
+
+        /* Reports PEERING_FAIL if the peer has not acknowledged by now. */
+        private function onPeerConnectTimeout(peer:NetStream):void
+        {
+            if (!recvStream.client) return;
+            if (!RTMFPSocketClient(recvStream.client).peerConnectAcknowledged) {
+                dispatchEvent(new RTMFPSocketEvent(RTMFPSocketEvent.PEERING_FAIL, recvStream));
+            }
+        }
+
+        private function onSecurityErrorEvent(event:SecurityErrorEvent):void
+        {
+            fail();
+        }
+
+        private function onSendStreamEvent(event:NetStatusEvent):void
+        {
+            switch (event.info.code) {
+            case ("NetStream.Publish.Start") :
+                dispatchEvent(new RTMFPSocketEvent(RTMFPSocketEvent.PUBLISH_START));
+                break;
+            case ("NetStream.Play.Reset") :
+            case ("NetStream.Play.Start") :
+                break;
+            }
+        }
+
+        private function onRecvStreamEvent(event:NetStatusEvent):void
+        {
+            switch (event.info.code) {
+            case ("NetStream.Publish.Start") :
+            case ("NetStream.Play.Reset") :
+            case ("NetStream.Play.Start") :
+                break;
+            }
+        }
+    }
+}
diff --git a/com/jscat/rtmfp/RTMFPSocketClient.as b/com/jscat/rtmfp/RTMFPSocketClient.as
new file mode 100644
index 0000000..d9fcffa
--- /dev/null
+++ b/com/jscat/rtmfp/RTMFPSocketClient.as
@@ -0,0 +1,64 @@
+package rtmfp
+{
+    import flash.events.Event;
+    import flash.events.EventDispatcher;
+    import flash.events.ProgressEvent;
+    import flash.utils.ByteArray;
+
+    /* NetStream client object for the receive side: the remote peer
+       invokes dataAvailable() and setPeerConnectAcknowledged() on it by
+       name, and each call is turned into an event. */
+    [Event(name="peerConnectAcknowledged", type="flash.events.Event")]
+    public dynamic class RTMFPSocketClient extends EventDispatcher {
+        public static const PEER_CONNECT_ACKNOWLEDGED:String = "peerConnectAcknowledged";
+
+        private var _bytes:ByteArray;
+        private var _peerID:String;
+        private var _peerConnectAcknowledged:Boolean;
+
+        public function RTMFPSocketClient()
+        {
+            super();
+            _bytes = new ByteArray();
+            _peerID = null;
+            _peerConnectAcknowledged = false;
+        }
+
+        public function get bytes():ByteArray
+        {
+            return _bytes;
+        }
+
+        /* Replaces the buffered data with the contents of bytes and
+           dispatches ProgressEvent.SOCKET_DATA. */
+        public function dataAvailable(bytes:ByteArray):void
+        {
+            this._bytes.clear();
+            bytes.readBytes(this._bytes);
+            dispatchEvent(new ProgressEvent(ProgressEvent.SOCKET_DATA, false, false, this._bytes.bytesAvailable, this._bytes.length));
+        }
+
+        public function get peerConnectAcknowledged():Boolean
+        {
+            return _peerConnectAcknowledged;
+        }
+
+        /* Records the acknowledgement and dispatches
+           PEER_CONNECT_ACKNOWLEDGED. */
+        public function setPeerConnectAcknowledged():void
+        {
+            _peerConnectAcknowledged = true;
+            dispatchEvent(new Event(PEER_CONNECT_ACKNOWLEDGED));
+        }
+
+        public function get peerID():String
+        {
+            return _peerID;
+        }
+
+        public function set peerID(id:String):void
+        {
+            _peerID = id;
+        }
+    }
+}
\ No newline at end of file
diff --git a/com/jscat/rtmfp/events/RTMFPSocketEvent.as b/com/jscat/rtmfp/events/RTMFPSocketEvent.as
new file mode 100644
index 0000000..c5b4af1
--- /dev/null
+++ b/com/jscat/rtmfp/events/RTMFPSocketEvent.as
@@ -0,0 +1,33 @@
+package rtmfp.events
+{
+    import flash.events.Event;
+    import flash.net.NetStream;
+
+    /* Event dispatched by RTMFPSocket; carries the NetStream it concerns,
+       if any. */
+    public class RTMFPSocketEvent extends Event
+    {
+        public static const CONNECT_SUCCESS:String = "connectSuccess";
+        public static const CONNECT_FAIL:String = "connectFail";
+        public static const PUBLISH_START:String = "publishStart";
+        public static const PEER_CONNECTED:String = "peerConnected";
+        public static const PEER_DISCONNECTED:String = "peerDisconnected";
+        public static const PEERING_SUCCESS:String = "peeringSuccess";
+        public static const PEERING_FAIL:String = "peeringFail";
+
+        public var stream:NetStream;
+
+        public function RTMFPSocketEvent(type:String, streamVal:NetStream = null, bubbles:Boolean = false, cancelable:Boolean = false)
+        {
+            super(type, bubbles, cancelable);
+            stream = streamVal;
+        }
+
+        /* Needed so that a re-dispatched event keeps its type and stream. */
+        override public function clone():Event
+        {
+            return new RTMFPSocketEvent(type, stream, bubbles, cancelable);
+        }
+
+    }
+}
\ No newline at end of file