[flashproxy/master] Remove .py extension from executables.

commit a9119dd2b14bb845b0a19f96e8a5d48373c5df0c Author: David Fifield <david@bamsoftware.com> Date: Sun Jul 22 18:26:06 2012 -0700 Remove .py extension from executables. There's no reason to tie the executable names to their implementation language; also maybe the extensionless names are less scary for users. --- Makefile | 6 +- README | 16 +- experiments/exercise/exercise.sh | 2 +- experiments/switching/local-http-alternating.sh | 4 +- experiments/switching/local-http-constant.sh | 4 +- experiments/switching/remote-tor-alternating.sh | 4 +- experiments/switching/remote-tor-constant.sh | 4 +- experiments/throughput/throughput.sh | 4 +- facilitator | 364 +++++++++ facilitator.py | 364 --------- flashproxy-client | 932 +++++++++++++++++++++++ flashproxy-client-test | 233 ++++++ flashproxy-client-test.py | 224 ------ flashproxy-client.py | 932 ----------------------- flashproxy-reg-http | 111 +++ flashproxy-reg-http.py | 111 --- init.d/facilitator | 4 +- torrc | 2 +- 18 files changed, 1665 insertions(+), 1656 deletions(-) diff --git a/Makefile b/Makefile index fc65672..7b204dd 100644 --- a/Makefile +++ b/Makefile @@ -3,21 +3,21 @@ BINDIR = $(PREFIX)/bin VERSION = 0.2 -CLIENT_DIST_FILES = flashproxy-client.py flashproxy-reg-http.py README LICENSE torrc +CLIENT_DIST_FILES = flashproxy-client flashproxy-reg-http README LICENSE torrc all: : install: mkdir -p $(BINDIR) - cp -f flashproxy-client.py flashproxy-reg-http.py facilitator.py $(BINDIR) + cp -f flashproxy-client flashproxy-reg-http facilitator $(BINDIR) clean: rm -f *.pyc rm -rf dist test: - ./flashproxy-client-test.py + ./flashproxy-client-test ./flashproxy-test.js DISTNAME = flashproxy-client-$(VERSION) diff --git a/README b/README index 2b4b5a9..d6ad66d 100644 --- a/README +++ b/README @@ -6,7 +6,7 @@ means version 0.2.3.2-alpha or later. All the flashproxy programs and source code can be downloaded this way: git clone https://git.torproject.org/flashproxy.git But as a user you only need these files: - https://gitweb.torproject.org/flashproxy.git/blob_plain/HEAD:/flashproxy-cli... + https://gitweb.torproject.org/flashproxy.git/blob_plain/HEAD:/flashproxy-cli... https://gitweb.torproject.org/flashproxy.git/blob_plain/HEAD:/torrc You must be able to receive TCP connections; unfortunately means that @@ -14,17 +14,17 @@ you cannot be behind NAT. See the section "Using a public client transport plugin" below to try out the system even behind NAT. 1. Run the client transport plugin. - $ python flashproxy-client.py --register + $ flashproxy-client --register By default the transport plugin listens on Internet-facing TCP port 9000. If you have to use a different port (to get through a firewall, for example), give it on the command lines like this (here using port 8888): - $ python flashproxy-client.py --register 127.0.0.1:9001 :8888 + $ flashproxy-client --register 127.0.0.1:9001 :8888 2. Run Tor using the included torrc file. $ tor -f torrc -Watch the output of flashproxy-client.py and tor. From -flashproxy-client.py you are expecting output lines like this: +Watch the output of flashproxy-client and tor. From +flashproxy-client you are expecting output lines like this: Remote connection from [scrubbed]. Local connection from [scrubbed]. Linking [scrubbed] and [scrubbed]. @@ -72,7 +72,7 @@ http://crypto.stanford.edu/flashproxy/ == Using a public client transport plugin -Rather than running flashproxy-client.py on your computer, you can use a +Rather than running flashproxy-client on your computer, you can use a public instance of it. This way is not as realistic because all your Tor traffic will first go to a fixed address and can be easily blocked. However this is an easy way to try out the system without having to do @@ -106,7 +106,7 @@ wait a few minutes. It can take a while to download relay descriptors. If you suspect that the facilitator has lost your client registration, you can re-register: - $ python flashproxy-reg-http.py + $ flashproxy-reg-http == How to run a relay @@ -136,7 +136,7 @@ See design.txt for some more technical information. === Running a facilitator On the facilitator, run - $ ./facilitator.py -r <relay-ip> + $ facilitator -r <relay-ip> You can use "tor1.bamsoftware.com" for <relay-ip>. The facilitator runs on port 9002 by default. diff --git a/experiments/exercise/exercise.sh b/experiments/exercise/exercise.sh index 91ab3d7..8cfd5b3 100755 --- a/experiments/exercise/exercise.sh +++ b/experiments/exercise/exercise.sh @@ -21,7 +21,7 @@ trap stop EXIT date cd "$FLASHPROXY_DIR" -./flashproxy-client.py --register "127.0.0.1:$LOCAL_PORT" ":$REMOTE_PORT" & +./flashproxy-client --register "127.0.0.1:$LOCAL_PORT" ":$REMOTE_PORT" & PIDS_TO_KILL+=($!) sleep 20 diff --git a/experiments/switching/local-http-alternating.sh b/experiments/switching/local-http-alternating.sh index e0807e2..83e33eb 100755 --- a/experiments/switching/local-http-alternating.sh +++ b/experiments/switching/local-http-alternating.sh @@ -36,12 +36,12 @@ echo "Start web server." PIDS_TO_KILL+=($!) echo "Start facilitator." -"$FLASHPROXY_DIR"/facilitator.py -d --relay 127.0.0.1:8000 >/dev/null & +"$FLASHPROXY_DIR"/facilitator -d --relay 127.0.0.1:8000 >/dev/null & PIDS_TO_KILL+=($!) visible_sleep 5 echo "Start client transport plugin." -"$FLASHPROXY_DIR"/flashproxy-client.py --register --facilitator 127.0.0.1:9002 >/dev/null & +"$FLASHPROXY_DIR"/flashproxy-client --register --facilitator 127.0.0.1:9002 >/dev/null & PIDS_TO_KILL+=($!) visible_sleep 1 diff --git a/experiments/switching/local-http-constant.sh b/experiments/switching/local-http-constant.sh index 9efaafd..41c1a6e 100755 --- a/experiments/switching/local-http-constant.sh +++ b/experiments/switching/local-http-constant.sh @@ -40,12 +40,12 @@ echo "Start websockify." PIDS_TO_KILL+=($!) echo "Start facilitator." -"$FLASHPROXY_DIR"/facilitator.py -d --relay 127.0.0.1:8001 >/dev/null & +"$FLASHPROXY_DIR"/facilitator -d --relay 127.0.0.1:8001 >/dev/null & PIDS_TO_KILL+=($!) visible_sleep 5 echo "Start client transport plugin." -"$FLASHPROXY_DIR"/flashproxy-client.py --register --facilitator 127.0.0.1:9002 >/dev/null & +"$FLASHPROXY_DIR"/flashproxy-client --register --facilitator 127.0.0.1:9002 >/dev/null & PIDS_TO_KILL+=($!) visible_sleep 1 diff --git a/experiments/switching/remote-tor-alternating.sh b/experiments/switching/remote-tor-alternating.sh index e4ad8c0..08d6dc0 100755 --- a/experiments/switching/remote-tor-alternating.sh +++ b/experiments/switching/remote-tor-alternating.sh @@ -33,12 +33,12 @@ echo "Start web server." PIDS_TO_KILL+=($!) echo "Start facilitator." -"$FLASHPROXY_DIR"/facilitator.py -d --relay tor1.bamsoftware.com:9901 >/dev/null & +"$FLASHPROXY_DIR"/facilitator -d --relay tor1.bamsoftware.com:9901 >/dev/null & PIDS_TO_KILL+=($!) visible_sleep 15 echo "Start client transport plugin." -"$FLASHPROXY_DIR"/flashproxy-client.py --register --facilitator 127.0.0.1:9002 >/dev/null & +"$FLASHPROXY_DIR"/flashproxy-client --register --facilitator 127.0.0.1:9002 >/dev/null & PIDS_TO_KILL+=($!) visible_sleep 1 diff --git a/experiments/switching/remote-tor-constant.sh b/experiments/switching/remote-tor-constant.sh index a26a323..33c54f4 100755 --- a/experiments/switching/remote-tor-constant.sh +++ b/experiments/switching/remote-tor-constant.sh @@ -32,12 +32,12 @@ echo "Start web server." PIDS_TO_KILL+=($!) echo "Start facilitator." -"$FLASHPROXY_DIR"/facilitator.py -d --relay tor1.bamsoftware.com:9901 >/dev/null & +"$FLASHPROXY_DIR"/facilitator -d --relay tor1.bamsoftware.com:9901 >/dev/null & PIDS_TO_KILL+=($!) visible_sleep 15 echo "Start client transport plugin." -"$FLASHPROXY_DIR"/flashproxy-client.py --register --facilitator 127.0.0.1:9002 >/dev/null & +"$FLASHPROXY_DIR"/flashproxy-client --register --facilitator 127.0.0.1:9002 >/dev/null & PIDS_TO_KILL+=($!) visible_sleep 1 diff --git a/experiments/throughput/throughput.sh b/experiments/throughput/throughput.sh index 2290e94..99ca7fc 100755 --- a/experiments/throughput/throughput.sh +++ b/experiments/throughput/throughput.sh @@ -53,12 +53,12 @@ echo "Start websockify." PIDS_TO_KILL+=($!) echo "Start facilitator." -"$FLASHPROXY_DIR"/facilitator.py -d --relay 127.0.0.1:8001 127.0.0.1 9002 >/dev/null & +"$FLASHPROXY_DIR"/facilitator -d --relay 127.0.0.1:8001 127.0.0.1 9002 >/dev/null & PIDS_TO_KILL+=($!) visible_sleep 1 echo "Start client transport plugin." -"$FLASHPROXY_DIR"/flashproxy-client.py >/dev/null & +"$FLASHPROXY_DIR"/flashproxy-client >/dev/null & PIDS_TO_KILL+=($!) visible_sleep 1 diff --git a/facilitator b/facilitator new file mode 100755 index 0000000..2a04e71 --- /dev/null +++ b/facilitator @@ -0,0 +1,364 @@ +#!/usr/bin/env python + +import BaseHTTPServer +import SocketServer +import cgi +import errno +import getopt +import os +import re +import socket +import sys +import threading +import time +import urllib +import urlparse + +DEFAULT_ADDRESS = "0.0.0.0" +DEFAULT_PORT = 9002 +DEFAULT_RELAY_PORT = 9001 +DEFAULT_LOG_FILENAME = "facilitator.log" + +LOG_DATE_FORMAT = "%Y-%m-%d %H:%M:%S" + +class options(object): + log_filename = DEFAULT_LOG_FILENAME + log_file = sys.stdout + relay_spec = None + daemonize = True + pid_filename = None + safe_logging = True + + @staticmethod + def set_relay_spec(spec): + af, host, port = parse_addr_spec(spec, defport = DEFAULT_RELAY_PORT) + # Resolve to get an IP address. + addrs = socket.getaddrinfo(host, port, af) + options.relay_spec = format_addr(addrs[0][4]) + +def usage(f = sys.stdout): + print >> f, """\ +Usage: %(progname)s -r RELAY <OPTIONS> [HOST] [PORT] +Flash proxy facilitator: Register client addresses with HTTP POST requests +and serve them out again with HTTP GET. Listen on HOST and PORT, by default +%(addr)s %(port)d. + -d, --debug don't daemonize, log to stdout. + -h, --help show this help. + -l, --log FILENAME write log to FILENAME (default \"%(log)s\"). + --pidfile FILENAME write PID to FILENAME after daemonizing. + -r, --relay RELAY send RELAY (host:port) to proxies as the relay to use. + --unsafe-logging don't scrub IP addresses from logs.\ +""" % { + "progname": sys.argv[0], + "addr": DEFAULT_ADDRESS, + "port": DEFAULT_PORT, + "log": DEFAULT_LOG_FILENAME, +} + +def safe_str(s): + """Return s if options.safe_logging is true, and "[scrubbed]" otherwise.""" + if options.safe_logging: + return "[scrubbed]" + else: + return s + +log_lock = threading.Lock() +def log(msg): + log_lock.acquire() + try: + print >> options.log_file, (u"%s %s" % (time.strftime(LOG_DATE_FORMAT), msg)).encode("UTF-8") + options.log_file.flush() + finally: + log_lock.release() + +def parse_addr_spec(spec, defhost = None, defport = None): + host = None + port = None + m = None + # IPv6 syntax. + if not m: + m = re.match(ur'^\[(.+)\]:(\d+)$', spec) + if m: + host, port = m.groups() + af = socket.AF_INET6 + if not m: + m = re.match(ur'^\[(.+)\]:?$', spec) + if m: + host, = m.groups() + af = socket.AF_INET6 + # IPv4 syntax. + if not m: + m = re.match(ur'^(.+):(\d+)$', spec) + if m: + host, port = m.groups() + af = socket.AF_INET + if not m: + m = re.match(ur'^:?(\d+)$', spec) + if m: + port, = m.groups() + af = 0 + if not m: + host = spec + af = 0 + host = host or defhost + port = port or defport + if not (host and port): + raise ValueError("Bad address specification \"%s\"" % spec) + return af, host, int(port) + +def format_addr(addr): + host, port = addr + if not host: + return u":%d" % port + # Numeric IPv6 address? + try: + addrs = socket.getaddrinfo(host, port, 0, socket.SOCK_STREAM, socket.IPPROTO_TCP, socket.AI_NUMERICHOST) + af = addrs[0][0] + except socket.gaierror, e: + af = 0 + if af == socket.AF_INET6: + return u"[%s]:%d" % (host, port) + else: + return u"%s:%d" % (host, port) + +class TCPReg(object): + def __init__(self, host, port): + self.host = host + self.port = port + + def __unicode__(self): + return format_addr((self.host, self.port)) + + def __str__(self): + return unicode(self).encode("UTF-8") + + def __cmp__(self, other): + if isinstance(other, TCPReg): + return cmp((self.host, self.port), (other.host, other.port)) + else: + return False + +class Reg(object): + @staticmethod + def parse(spec, defhost = None, defport = None): + try: + af, host, port = parse_addr_spec(spec, defhost, defport) + except ValueError: + pass + else: + try: + addrs = socket.getaddrinfo(host, port, af, socket.SOCK_STREAM, socket.IPPROTO_TCP, socket.AI_NUMERICHOST) + except socket.gaierror, e: + raise ValueError("Bad host or port: \"%s\" \"%s\": %s" % (host, port, str(e))) + if not addrs: + raise ValueError("Bad host or port: \"%s\" \"%s\"" % (host, port)) + + host, port = socket.getnameinfo(addrs[0][4], socket.NI_NUMERICHOST | socket.NI_NUMERICSERV) + return TCPReg(host, int(port)) + + raise ValueError("Bad spec format: %s" % repr(spec)) + +class RegSet(object): + def __init__(self): + self.set = [] + self.cv = threading.Condition() + + def add(self, reg): + self.cv.acquire() + try: + if reg not in list(self.set): + self.set.append(reg) + self.cv.notify() + return True + else: + return False + finally: + self.cv.release() + + def fetch(self): + self.cv.acquire() + try: + if not self.set: + return None + return self.set.pop(0) + finally: + self.cv.release() + + def __len__(self): + self.cv.acquire() + try: + return len(self.set) + finally: + self.cv.release() + +class Handler(BaseHTTPServer.BaseHTTPRequestHandler): + def do_GET(self): + proxy_addr_s = format_addr(self.client_address) + + path = urlparse.urlsplit(self.path)[2] + + reg = REGS.fetch() + if reg: + log(u"proxy %s gets %s, relay %s (now %d)" % + (safe_str(proxy_addr_s), safe_str(unicode(reg)), options.relay_spec, len(REGS))) + else: + log(u"proxy %s gets none" % safe_str(proxy_addr_s)) + self.send_client(reg) + + def do_POST(self): + client_addr_s = format_addr(self.client_address) + + data = cgi.FieldStorage(fp = self.rfile, headers = self.headers, + environ = {"REQUEST_METHOD": "POST"}) + + client_spec = data.getfirst("client") + if client_spec is None: + log(u"client %s missing \"client\" param" % safe_str(client_addr_s)) + self.send_error(400) + return + + try: + reg = Reg.parse(client_spec, self.client_address[0]) + except ValueError, e: + log(u"client %s syntax error in %s: %s" + % (safe_str(client_addr_s), safe_str(repr(client_spec)), repr(str(e)))) + self.send_error(400) + return + + if REGS.add(reg): + log(u"client %s %s (now %d)" + % (safe_str(client_addr_s), safe_str(unicode(reg)), len(REGS))) + else: + log(u"client %s %s (already present, now %d)" + % (safe_str(client_addr_s), safe_str(unicode(reg)), len(REGS))) + + self.send_response(200) + self.end_headers() + + def send_error(self, code, message = None): + self.send_response(code) + self.end_headers() + if message: + self.wfile.write(message) + + def log_request(self, code): + pass + + def log_message(self, format, *args): + msg = format % args + log(u"message from HTTP handler for %s: %s" + % (format_addr(self.client_address), repr(msg))) + + def send_client(self, reg): + if reg: + client_str = str(reg) + else: + client_str = "" + self.send_response(200) + self.send_header("Content-Type", "application/x-www-form-urlencoded") + self.send_header("Cache-Control", "no-cache") + # Allow XMLHttpRequest from any domain. http://www.w3.org/TR/cors/. + self.send_header("Access-Control-Allow-Origin", "*") + self.end_headers() + + data = {} + data["client"] = client_str + data["relay"] = options.relay_spec + self.request.send(urllib.urlencode(data)) + + # Catch "broken pipe" errors that otherwise cause a stack trace in the log. + def catch_epipe(fn): + def ret(self, *args): + try: + fn(self, *args) + except socket.error, e: + try: + err_num = e.errno + except AttributeError: + # Before Python 2.6, exception can be a pair. + err_num, errstr = e + except: + raise + if err_num != errno.EPIPE: + raise + return ret + handle = catch_epipe(BaseHTTPServer.BaseHTTPRequestHandler.handle) + finish = catch_epipe(BaseHTTPServer.BaseHTTPRequestHandler.finish) + +REGS = RegSet() + +opts, args = getopt.gnu_getopt(sys.argv[1:], "dhl:r:", + ["debug", "help", "log=", "pidfile=", "relay=", "unsafe-logging"]) +for o, a in opts: + if o == "-d" or o == "--debug": + options.daemonize = False + options.log_filename = None + elif o == "-h" or o == "--help": + usage() + sys.exit() + elif o == "-l" or o == "--log": + options.log_filename = a + elif o == "--pidfile": + options.pid_filename = a + elif o == "-r" or o == "--relay": + try: + options.set_relay_spec(a) + except socket.gaierror, e: + print >> sys.stderr, u"Can't resolve relay %s: %s" % (repr(a), str(e)) + sys.exit(1) + elif o == "--unsafe-logging": + options.safe_logging = False + +if not options.relay_spec: + print >> sys.stderr, """\ +The -r option is required. Give it the relay that will be sent to proxies. + -r HOST[:PORT]\ +""" + sys.exit(1) + +if len(args) == 0: + address = (DEFAULT_ADDRESS, DEFAULT_PORT) +elif len(args) == 1: + # Either HOST or PORT may be omitted; figure out which one. + if args[0].isdigit(): + address = (DEFAULT_ADDRESS, args[0]) + else: + address = (args[0], DEFAULT_PORT) +elif len(args) == 2: + address = (args[0], args[1]) +else: + usage(sys.stderr) + sys.exit(1) + +if options.log_filename: + options.log_file = open(options.log_filename, "a") + # Send error tracebacks to the log. + sys.stderr = options.log_file +else: + options.log_file = sys.stdout + +addrinfo = socket.getaddrinfo(address[0], address[1], 0, socket.SOCK_STREAM, socket.IPPROTO_TCP)[0] + +class Server(SocketServer.ThreadingMixIn, BaseHTTPServer.HTTPServer): + pass + +# Setup the server +server = Server(addrinfo[4], Handler) + +log(u"start on %s" % format_addr(addrinfo[4])) +log(u"using relay address %s" % options.relay_spec) + +if options.daemonize: + log(u"daemonizing") + pid = os.fork() + if pid != 0: + if options.pid_filename: + f = open(options.pid_filename, "w") + print >> f, pid + f.close() + sys.exit(0) + +try: + server.serve_forever() +except KeyboardInterrupt: + sys.exit(0) diff --git a/facilitator.py b/facilitator.py deleted file mode 100755 index 2a04e71..0000000 --- a/facilitator.py +++ /dev/null @@ -1,364 +0,0 @@ -#!/usr/bin/env python - -import BaseHTTPServer -import SocketServer -import cgi -import errno -import getopt -import os -import re -import socket -import sys -import threading -import time -import urllib -import urlparse - -DEFAULT_ADDRESS = "0.0.0.0" -DEFAULT_PORT = 9002 -DEFAULT_RELAY_PORT = 9001 -DEFAULT_LOG_FILENAME = "facilitator.log" - -LOG_DATE_FORMAT = "%Y-%m-%d %H:%M:%S" - -class options(object): - log_filename = DEFAULT_LOG_FILENAME - log_file = sys.stdout - relay_spec = None - daemonize = True - pid_filename = None - safe_logging = True - - @staticmethod - def set_relay_spec(spec): - af, host, port = parse_addr_spec(spec, defport = DEFAULT_RELAY_PORT) - # Resolve to get an IP address. - addrs = socket.getaddrinfo(host, port, af) - options.relay_spec = format_addr(addrs[0][4]) - -def usage(f = sys.stdout): - print >> f, """\ -Usage: %(progname)s -r RELAY <OPTIONS> [HOST] [PORT] -Flash proxy facilitator: Register client addresses with HTTP POST requests -and serve them out again with HTTP GET. Listen on HOST and PORT, by default -%(addr)s %(port)d. - -d, --debug don't daemonize, log to stdout. - -h, --help show this help. - -l, --log FILENAME write log to FILENAME (default \"%(log)s\"). - --pidfile FILENAME write PID to FILENAME after daemonizing. - -r, --relay RELAY send RELAY (host:port) to proxies as the relay to use. - --unsafe-logging don't scrub IP addresses from logs.\ -""" % { - "progname": sys.argv[0], - "addr": DEFAULT_ADDRESS, - "port": DEFAULT_PORT, - "log": DEFAULT_LOG_FILENAME, -} - -def safe_str(s): - """Return s if options.safe_logging is true, and "[scrubbed]" otherwise.""" - if options.safe_logging: - return "[scrubbed]" - else: - return s - -log_lock = threading.Lock() -def log(msg): - log_lock.acquire() - try: - print >> options.log_file, (u"%s %s" % (time.strftime(LOG_DATE_FORMAT), msg)).encode("UTF-8") - options.log_file.flush() - finally: - log_lock.release() - -def parse_addr_spec(spec, defhost = None, defport = None): - host = None - port = None - m = None - # IPv6 syntax. - if not m: - m = re.match(ur'^\[(.+)\]:(\d+)$', spec) - if m: - host, port = m.groups() - af = socket.AF_INET6 - if not m: - m = re.match(ur'^\[(.+)\]:?$', spec) - if m: - host, = m.groups() - af = socket.AF_INET6 - # IPv4 syntax. - if not m: - m = re.match(ur'^(.+):(\d+)$', spec) - if m: - host, port = m.groups() - af = socket.AF_INET - if not m: - m = re.match(ur'^:?(\d+)$', spec) - if m: - port, = m.groups() - af = 0 - if not m: - host = spec - af = 0 - host = host or defhost - port = port or defport - if not (host and port): - raise ValueError("Bad address specification \"%s\"" % spec) - return af, host, int(port) - -def format_addr(addr): - host, port = addr - if not host: - return u":%d" % port - # Numeric IPv6 address? - try: - addrs = socket.getaddrinfo(host, port, 0, socket.SOCK_STREAM, socket.IPPROTO_TCP, socket.AI_NUMERICHOST) - af = addrs[0][0] - except socket.gaierror, e: - af = 0 - if af == socket.AF_INET6: - return u"[%s]:%d" % (host, port) - else: - return u"%s:%d" % (host, port) - -class TCPReg(object): - def __init__(self, host, port): - self.host = host - self.port = port - - def __unicode__(self): - return format_addr((self.host, self.port)) - - def __str__(self): - return unicode(self).encode("UTF-8") - - def __cmp__(self, other): - if isinstance(other, TCPReg): - return cmp((self.host, self.port), (other.host, other.port)) - else: - return False - -class Reg(object): - @staticmethod - def parse(spec, defhost = None, defport = None): - try: - af, host, port = parse_addr_spec(spec, defhost, defport) - except ValueError: - pass - else: - try: - addrs = socket.getaddrinfo(host, port, af, socket.SOCK_STREAM, socket.IPPROTO_TCP, socket.AI_NUMERICHOST) - except socket.gaierror, e: - raise ValueError("Bad host or port: \"%s\" \"%s\": %s" % (host, port, str(e))) - if not addrs: - raise ValueError("Bad host or port: \"%s\" \"%s\"" % (host, port)) - - host, port = socket.getnameinfo(addrs[0][4], socket.NI_NUMERICHOST | socket.NI_NUMERICSERV) - return TCPReg(host, int(port)) - - raise ValueError("Bad spec format: %s" % repr(spec)) - -class RegSet(object): - def __init__(self): - self.set = [] - self.cv = threading.Condition() - - def add(self, reg): - self.cv.acquire() - try: - if reg not in list(self.set): - self.set.append(reg) - self.cv.notify() - return True - else: - return False - finally: - self.cv.release() - - def fetch(self): - self.cv.acquire() - try: - if not self.set: - return None - return self.set.pop(0) - finally: - self.cv.release() - - def __len__(self): - self.cv.acquire() - try: - return len(self.set) - finally: - self.cv.release() - -class Handler(BaseHTTPServer.BaseHTTPRequestHandler): - def do_GET(self): - proxy_addr_s = format_addr(self.client_address) - - path = urlparse.urlsplit(self.path)[2] - - reg = REGS.fetch() - if reg: - log(u"proxy %s gets %s, relay %s (now %d)" % - (safe_str(proxy_addr_s), safe_str(unicode(reg)), options.relay_spec, len(REGS))) - else: - log(u"proxy %s gets none" % safe_str(proxy_addr_s)) - self.send_client(reg) - - def do_POST(self): - client_addr_s = format_addr(self.client_address) - - data = cgi.FieldStorage(fp = self.rfile, headers = self.headers, - environ = {"REQUEST_METHOD": "POST"}) - - client_spec = data.getfirst("client") - if client_spec is None: - log(u"client %s missing \"client\" param" % safe_str(client_addr_s)) - self.send_error(400) - return - - try: - reg = Reg.parse(client_spec, self.client_address[0]) - except ValueError, e: - log(u"client %s syntax error in %s: %s" - % (safe_str(client_addr_s), safe_str(repr(client_spec)), repr(str(e)))) - self.send_error(400) - return - - if REGS.add(reg): - log(u"client %s %s (now %d)" - % (safe_str(client_addr_s), safe_str(unicode(reg)), len(REGS))) - else: - log(u"client %s %s (already present, now %d)" - % (safe_str(client_addr_s), safe_str(unicode(reg)), len(REGS))) - - self.send_response(200) - self.end_headers() - - def send_error(self, code, message = None): - self.send_response(code) - self.end_headers() - if message: - self.wfile.write(message) - - def log_request(self, code): - pass - - def log_message(self, format, *args): - msg = format % args - log(u"message from HTTP handler for %s: %s" - % (format_addr(self.client_address), repr(msg))) - - def send_client(self, reg): - if reg: - client_str = str(reg) - else: - client_str = "" - self.send_response(200) - self.send_header("Content-Type", "application/x-www-form-urlencoded") - self.send_header("Cache-Control", "no-cache") - # Allow XMLHttpRequest from any domain. http://www.w3.org/TR/cors/. - self.send_header("Access-Control-Allow-Origin", "*") - self.end_headers() - - data = {} - data["client"] = client_str - data["relay"] = options.relay_spec - self.request.send(urllib.urlencode(data)) - - # Catch "broken pipe" errors that otherwise cause a stack trace in the log. - def catch_epipe(fn): - def ret(self, *args): - try: - fn(self, *args) - except socket.error, e: - try: - err_num = e.errno - except AttributeError: - # Before Python 2.6, exception can be a pair. - err_num, errstr = e - except: - raise - if err_num != errno.EPIPE: - raise - return ret - handle = catch_epipe(BaseHTTPServer.BaseHTTPRequestHandler.handle) - finish = catch_epipe(BaseHTTPServer.BaseHTTPRequestHandler.finish) - -REGS = RegSet() - -opts, args = getopt.gnu_getopt(sys.argv[1:], "dhl:r:", - ["debug", "help", "log=", "pidfile=", "relay=", "unsafe-logging"]) -for o, a in opts: - if o == "-d" or o == "--debug": - options.daemonize = False - options.log_filename = None - elif o == "-h" or o == "--help": - usage() - sys.exit() - elif o == "-l" or o == "--log": - options.log_filename = a - elif o == "--pidfile": - options.pid_filename = a - elif o == "-r" or o == "--relay": - try: - options.set_relay_spec(a) - except socket.gaierror, e: - print >> sys.stderr, u"Can't resolve relay %s: %s" % (repr(a), str(e)) - sys.exit(1) - elif o == "--unsafe-logging": - options.safe_logging = False - -if not options.relay_spec: - print >> sys.stderr, """\ -The -r option is required. Give it the relay that will be sent to proxies. - -r HOST[:PORT]\ -""" - sys.exit(1) - -if len(args) == 0: - address = (DEFAULT_ADDRESS, DEFAULT_PORT) -elif len(args) == 1: - # Either HOST or PORT may be omitted; figure out which one. - if args[0].isdigit(): - address = (DEFAULT_ADDRESS, args[0]) - else: - address = (args[0], DEFAULT_PORT) -elif len(args) == 2: - address = (args[0], args[1]) -else: - usage(sys.stderr) - sys.exit(1) - -if options.log_filename: - options.log_file = open(options.log_filename, "a") - # Send error tracebacks to the log. - sys.stderr = options.log_file -else: - options.log_file = sys.stdout - -addrinfo = socket.getaddrinfo(address[0], address[1], 0, socket.SOCK_STREAM, socket.IPPROTO_TCP)[0] - -class Server(SocketServer.ThreadingMixIn, BaseHTTPServer.HTTPServer): - pass - -# Setup the server -server = Server(addrinfo[4], Handler) - -log(u"start on %s" % format_addr(addrinfo[4])) -log(u"using relay address %s" % options.relay_spec) - -if options.daemonize: - log(u"daemonizing") - pid = os.fork() - if pid != 0: - if options.pid_filename: - f = open(options.pid_filename, "w") - print >> f, pid - f.close() - sys.exit(0) - -try: - server.serve_forever() -except KeyboardInterrupt: - sys.exit(0) diff --git a/flashproxy-client b/flashproxy-client new file mode 100755 index 0000000..1f85f87 --- /dev/null +++ b/flashproxy-client @@ -0,0 +1,932 @@ +#!/usr/bin/env python + +import array +import base64 +import cStringIO +import getopt +import httplib +import os +import re +import select +import socket +import struct +import subprocess +import sys +import time +import traceback +import urllib +import xml.sax.saxutils +import BaseHTTPServer + +try: + from hashlib import sha1 +except ImportError: + # Python 2.4 uses this name. + from sha import sha as sha1 + +try: + import numpy +except ImportError: + numpy = None + +DEFAULT_REMOTE_ADDRESS = "0.0.0.0" +DEFAULT_REMOTE_PORT = 9000 +DEFAULT_LOCAL_ADDRESS = "127.0.0.1" +DEFAULT_LOCAL_PORT = 9001 + +LOG_DATE_FORMAT = "%Y-%m-%d %H:%M:%S" + +class options(object): + local_addr = None + remote_addr = None + facilitator_addr = None + + log_filename = None + log_file = sys.stdout + daemonize = False + register = False + pid_filename = None + safe_logging = True + +# We accept up to this many bytes from a socket not yet matched with a partner +# before disconnecting it. +UNCONNECTED_BUFFER_LIMIT = 10240 + +def usage(f = sys.stdout): + print >> f, """\ +Usage: %(progname)s --register [LOCAL][:PORT] [REMOTE][:PORT] +Wait for connections on a local and a remote port. When any pair of connections +exists, data is ferried between them until one side is closed. By default +LOCAL is "%(local)s" and REMOTE is "%(remote)s". + +The local connection acts as a SOCKS4a proxy, but the host and port in the SOCKS +request are ignored and the local connection is always linked to a remote +connection. + +If the --register option is used, then your IP address will be sent to the +facilitator so that proxies can connect to you. You need to register in some way +in order to get any service. The --facilitator option allows controlling which +facilitator is used; if omitted, it uses a public default. + --daemon daemonize (Unix only). + -f, --facilitator=HOST[:PORT] advertise willingness to receive connections to + HOST:PORT. + -h, --help show this help. + -l, --log FILENAME write log to FILENAME (default stdout). + --pidfile FILENAME write PID to FILENAME after daemonizing. + -r, --register register with the facilitator. + --unsafe-logging don't scrub IP addresses from logs.\ +""" % { + "progname": sys.argv[0], + "local": format_addr((DEFAULT_LOCAL_ADDRESS, DEFAULT_LOCAL_PORT)), + "remote": format_addr((DEFAULT_REMOTE_ADDRESS, DEFAULT_REMOTE_PORT)), +} + +def safe_str(s): + """Return s if options.safe_logging is true, and "[scrubbed]" otherwise.""" + if options.safe_logging: + return "[scrubbed]" + else: + return s + +def log(msg): + print >> options.log_file, (u"%s %s" % (time.strftime(LOG_DATE_FORMAT), msg)).encode("UTF-8") + options.log_file.flush() + +def parse_addr_spec(spec, defhost = None, defport = None): + host = None + port = None + m = None + # IPv6 syntax. + if not m: + m = re.match(ur'^\[(.+)\]:(\d+)$', spec) + if m: + host, port = m.groups() + af = socket.AF_INET6 + if not m: + m = re.match(ur'^\[(.+)\]:?$', spec) + if m: + host, = m.groups() + af = socket.AF_INET6 + # IPv4 syntax. + if not m: + m = re.match(ur'^(.+):(\d+)$', spec) + if m: + host, port = m.groups() + af = socket.AF_INET + if not m: + m = re.match(ur'^:?(\d+)$', spec) + if m: + port, = m.groups() + af = 0 + if not m: + host = spec + af = 0 + host = host or defhost + port = port or defport + if not (host and port): + raise ValueError("Bad address specification \"%s\"" % spec) + return host, int(port) + +def format_addr(addr): + host, port = addr + if not host: + return u":%d" % port + # Numeric IPv6 address? + try: + addrs = socket.getaddrinfo(host, port, 0, socket.SOCK_STREAM, socket.IPPROTO_TCP, socket.AI_NUMERICHOST) + af = addrs[0][0] + except socket.gaierror, e: + af = 0 + if af == socket.AF_INET6: + return u"[%s]:%d" % (host, port) + else: + return u"%s:%d" % (host, port) + +def safe_format_addr(addr): + return safe_str(format_addr(addr)) + host, port = addr + if not host: + return u":%d" % port + # Numeric IPv6 address? + try: + addrs = socket.getaddrinfo(host, port, 0, socket.SOCK_STREAM, socket.IPPROTO_TCP, socket.AI_NUMERICHOST) + af = addrs[0][0] + except socket.gaierror, e: + af = 0 + if af == socket.AF_INET6: + return u"[%s]:%d" % (host, port) + else: + return u"%s:%d" % (host, port) + + + +def apply_mask_numpy(payload, mask_key): + if len(payload) == 0: + return "" + payload_a = numpy.frombuffer(payload, dtype="|u4", count=len(payload)//4) + m, = numpy.frombuffer(mask_key, dtype="|u4", count=1) + result = numpy.bitwise_xor(payload_a, m).tostring() + i = len(payload) // 4 * 4 + if i < len(payload): + remainder = [] + while i < len(payload): + remainder.append(chr(ord(payload[i]) ^ ord(mask_key[i % 4]))) + i += 1 + result = result + "".join(remainder) + return result + +def apply_mask_py(payload, mask_key): + result = array.array("B", payload) + m = array.array("B", mask_key) + i = 0 + while i < len(result) - 7: + result[i] ^= m[0] + result[i+1] ^= m[1] + result[i+2] ^= m[2] + result[i+3] ^= m[3] + result[i+4] ^= m[0] + result[i+5] ^= m[1] + result[i+6] ^= m[2] + result[i+7] ^= m[3] + i += 8 + while i < len(result): + result[i] ^= m[i%4] + i += 1 + return result.tostring() + +if numpy is not None: + apply_mask = apply_mask_numpy +else: + apply_mask = apply_mask_py + +class WebSocketFrame(object): + def __init__(self): + self.fin = False + self.opcode = None + self.payload = None + + def is_control(self): + return (self.opcode & 0x08) != 0 + +class WebSocketMessage(object): + def __init__(self): + self.opcode = None + self.payload = None + + def is_control(self): + return (self.opcode & 0x08) != 0 + +class WebSocketDecoder(object): + """RFC 6455 section 5 is about the WebSocket framing format.""" + # Raise an exception rather than buffer anything larger than this. + MAX_MESSAGE_LENGTH = 1024 * 1024 + + class MaskingError(ValueError): + pass + + def __init__(self, use_mask = False): + """use_mask should be True for server-to-client sockets, and False for + client-to-server sockets.""" + self.use_mask = use_mask + + # Per-frame state. + self.buf = "" + + # Per-message state. + self.message_buf = "" + self.message_opcode = None + + def feed(self, data): + self.buf += data + + def read_frame(self): + """Read a frame from the internal buffer, if one is available. Returns a + WebSocketFrame object, or None if there are no complete frames to + read.""" + # RFC 6255 section 5.2. + if len(self.buf) < 2: + return None + offset = 0 + b0, b1 = struct.unpack_from(">BB", self.buf, offset) + offset += 2 + fin = (b0 & 0x80) != 0 + opcode = b0 & 0x0f + frame_masked = (b1 & 0x80) != 0 + payload_len = b1 & 0x7f + + if payload_len == 126: + if len(self.buf) < offset + 2: + return None + payload_len, = struct.unpack_from(">H", self.buf, offset) + offset += 2 + elif payload_len == 127: + if len(self.buf) < offset + 8: + return None + payload_len, = struct.unpack_from(">Q", self.buf, offset) + offset += 8 + + if frame_masked: + if not self.use_mask: + # "A client MUST close a connection if it detects a masked + # frame." + raise self.MaskingError("Got masked payload from server") + if len(self.buf) < offset + 4: + return None + mask_key = self.buf[offset:offset+4] + offset += 4 + else: + if self.use_mask: + # "The server MUST close the connection upon receiving a frame + # that is not masked." + raise self.MaskingError("Got unmasked payload from client") + mask_key = None + + if payload_len > self.MAX_MESSAGE_LENGTH: + raise ValueError("Refusing to buffer payload of %d bytes" % payload_len) + + if len(self.buf) < offset + payload_len: + return None + if mask_key: + payload = apply_mask(self.buf[offset:offset+payload_len], mask_key) + else: + payload = self.buf[offset:offset+payload_len] + self.buf = self.buf[offset+payload_len:] + + frame = WebSocketFrame() + frame.fin = fin + frame.opcode = opcode + frame.payload = payload + + return frame + + def read_message(self): + """Read a complete message. If the opcode is 1, the payload is decoded + from a UTF-8 binary string to a unicode string. If a control frame is + read while another fragmented message is in progress, the control frame + is returned as a new message immediately. Returns None if there is no + complete frame to be read.""" + # RFC 6455 section 5.4 is about fragmentation. + while True: + frame = self.read_frame() + if frame is None: + return None + # "Control frames (see Section 5.5) MAY be injected in the middle of + # a fragmented message. Control frames themselves MUST NOT be + # fragmented." + if frame.is_control(): + if not frame.fin: + raise ValueError("Control frame (opcode %d) has FIN bit clear" % frame.opcode) + message = WebSocketMessage() + message.opcode = frame.opcode + message.payload = frame.payload + return message + + if self.message_opcode is None: + if frame.opcode == 0: + raise ValueError("First frame has opcode 0") + self.message_opcode = frame.opcode + else: + if frame.opcode != 0: + raise ValueError("Non-first frame has nonzero opcode %d" % frame.opcode) + self.message_buf += frame.payload + + if frame.fin: + break + message = WebSocketMessage() + message.opcode = self.message_opcode + message.payload = self.message_buf + self.postprocess_message(message) + self.message_opcode = None + self.message_buf = "" + + return message + + def postprocess_message(self, message): + if message.opcode == 1: + message.payload = message.payload.decode("utf-8") + return message + +class WebSocketEncoder(object): + def __init__(self, use_mask = False): + self.use_mask = use_mask + + def encode_frame(self, opcode, payload): + if opcode >= 16: + raise ValueError("Opcode of %d is >= 16" % opcode) + length = len(payload) + + if self.use_mask: + mask_key = os.urandom(4) + payload = apply_mask(payload, mask_key) + mask_bit = 0x80 + else: + mask_key = "" + mask_bit = 0x00 + + if length < 126: + len_b, len_ext = length, "" + elif length < 0x10000: + len_b, len_ext = 126, struct.pack(">H", length) + elif length < 0x10000000000000000: + len_b, len_ext = 127, struct.pack(">Q", length) + else: + raise ValueError("payload length of %d is too long" % length) + + return chr(0x80 | opcode) + chr(mask_bit | len_b) + len_ext + mask_key + payload + + def encode_message(self, opcode, payload): + if opcode == 1: + payload = payload.encode("utf-8") + return self.encode_frame(opcode, payload) + +# WebSocket implementations generally support text (opcode 1) messages, which +# are UTF-8-encoded text. Not all support binary (opcode 2) messages. During the +# WebSocket handshake, we use the "base64" value of the Sec-WebSocket-Protocol +# header field to indicate that text frames should encoded UTF-8-encoded +# base64-encoded binary data. Binary messages are always interpreted verbatim, +# but text messages are rejected if "base64" was not negotiated. +# +# The idea here is that browsers that know they don't support binary messages +# can negotiate "base64" with both endpoints and still reliably transport binary +# data. Those that know they can support binary messages can just use binary +# messages in the straightforward way. + +class WebSocketBinaryDecoder(object): + def __init__(self, protocols, use_mask = False): + self.dec = WebSocketDecoder(use_mask) + self.base64 = "base64" in protocols + + def feed(self, data): + self.dec.feed(data) + + def read(self): + """Returns None when there are currently no data to be read. Returns "" + when a close message is received.""" + while True: + message = self.dec.read_message() + if message is None: + return None + elif message.opcode == 1: + if not self.base64: + raise ValueError("Received text message on decoder incapable of base64") + payload = base64.b64decode(message.payload) + if payload: + return payload + elif message.opcode == 2: + if message.payload: + return message.payload + elif message.opcode == 8: + return "" + # Ignore all other opcodes. + return None + +class WebSocketBinaryEncoder(object): + def __init__(self, protocols, use_mask = False): + self.enc = WebSocketEncoder(use_mask) + self.base64 = "base64" in protocols + + def encode(self, data): + if self.base64: + return self.enc.encode_message(1, base64.b64encode(data)) + else: + return self.enc.encode_message(2, data) + + +def listen_socket(addr): + """Return a nonblocking socket listening on the given address.""" + addrinfo = socket.getaddrinfo(addr[0], addr[1], 0, socket.SOCK_STREAM, socket.IPPROTO_TCP)[0] + s = socket.socket(addrinfo[0], addrinfo[1], addrinfo[2]) + s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + s.bind(addr) + s.listen(10) + s.setblocking(0) + return s + +def format_peername(s): + try: + return safe_format_addr(s.getpeername()) + except socket.error, e: + return "<unconnected>" + +# How long to wait for a WebSocket request on the remote socket. It is limited +# to avoid Slowloris-like attacks. +WEBSOCKET_REQUEST_TIMEOUT = 2.0 + +class WebSocketRequestHandler(BaseHTTPServer.BaseHTTPRequestHandler): + def __init__(self, request_text, fd): + self.rfile = cStringIO.StringIO(request_text) + self.wfile = fd.makefile() + self.error = False + self.raw_requestline = self.rfile.readline() + self.parse_request() + + def log_message(self, *args): + pass + + def send_error(self, code, message = None): + BaseHTTPServer.BaseHTTPRequestHandler.send_error(self, code, message) + self.error = True + +MAGIC_GUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11" + +def handle_websocket_request(fd): + request_text = fd.recv(10 * 1024) + handler = WebSocketRequestHandler(request_text, fd) + if handler.error or not hasattr(handler, "path"): + return None + method = handler.command + path = handler.path + headers = handler.headers + + # See RFC 6455 section 4.2.1 for this sequence of checks. + # + # 1. An HTTP/1.1 or higher GET request, including a "Request-URI"... + if method != "GET": + handler.send_error(405) + return None + if path != "/": + handler.send_error(404) + return None + + # 2. A |Host| header field containing the server's authority. + # We deliberately skip this test. + + # 3. An |Upgrade| header field containing the value "websocket", treated as + # an ASCII case-insensitive value. + if "websocket" not in [x.strip().lower() for x in headers.get("upgrade").split(",")]: + handler.send_error(400) + return None + + # 4. A |Connection| header field that includes the token "Upgrade", treated + # as an ASCII case-insensitive value. + if "upgrade" not in [x.strip().lower() for x in headers.get("connection").split(",")]: + handler.send_error(400) + return None + + # 5. A |Sec-WebSocket-Key| header field with a base64-encoded value that, + # when decoded, is 16 bytes in length. + try: + key = headers.get("sec-websocket-key") + if len(base64.b64decode(key)) != 16: + raise TypeError("Sec-WebSocket-Key must be 16 bytes") + except TypeError: + handler.send_error(400) + return None + + # 6. A |Sec-WebSocket-Version| header field, with a value of 13. We also + # allow 8 from draft-ietf-hybi-thewebsocketprotocol-10. + version = headers.get("sec-websocket-version") + KNOWN_VERSIONS = ["8", "13"] + if version not in KNOWN_VERSIONS: + # "If this version does not match a version understood by the server, + # the server MUST abort the WebSocket handshake described in this + # section and instead send an appropriate HTTP error code (such as 426 + # Upgrade Required) and a |Sec-WebSocket-Version| header field + # indicating the version(s) the server is capable of understanding." + handler.send_response(426) + handler.send_header("Sec-WebSocket-Version", ", ".join(KNOWN_VERSIONS)) + handler.end_headers() + return None + + # 7. Optionally, an |Origin| header field. + + # 8. Optionally, a |Sec-WebSocket-Protocol| header field, with a list of + # values indicating which protocols the client would like to speak, ordered + # by preference. + protocols_str = headers.get("sec-websocket-protocol") + if protocols_str is None: + protocols = [] + else: + protocols = [x.strip().lower() for x in protocols_str.split(",")] + + # 9. Optionally, a |Sec-WebSocket-Extensions| header field... + + # 10. Optionally, other header fields... + + # See RFC 6455 section 4.2.2, item 5 for these steps. + + # 1. A Status-Line with a 101 response code as per RFC 2616. + handler.send_response(101) + # 2. An |Upgrade| header field with value "websocket" as per RFC 2616. + handler.send_header("Upgrade", "websocket") + # 3. A |Connection| header field with value "Upgrade". + handler.send_header("Connection", "Upgrade") + # 4. A |Sec-WebSocket-Accept| header field. The value of this header field + # is constructed by concatenating /key/, defined above in step 4 in Section + # 4.2.2, with the string "258EAFA5-E914-47DA-95CA-C5AB0DC85B11", taking the + # SHA-1 hash of this concatenated value to obtain a 20-byte value and + # base64-encoding (see Section 4 of [RFC4648]) this 20-byte hash. + accept_key = base64.b64encode(sha1(key + MAGIC_GUID).digest()) + handler.send_header("Sec-WebSocket-Accept", accept_key) + # 5. Optionally, a |Sec-WebSocket-Protocol| header field, with a value + # /subprotocol/ as defined in step 4 in Section 4.2.2. + if "base64" in protocols: + handler.send_header("Sec-WebSocket-Protocol", "base64") + # 6. Optionally, a |Sec-WebSocket-Extensions| header field... + + handler.end_headers() + + return protocols + +def grab_string(s, pos): + """Grab a NUL-terminated string from the given string, starting at the given + offset. Return (pos, str) tuple, or (pos, None) on error.""" + i = pos + while i < len(s): + if s[i] == '\0': + return (i + 1, s[pos:i]) + i += 1 + return pos, None + +# http://ftp.icm.edu.pl/packages/socks/socks4/SOCKS4.protocol +# https://en.wikipedia.org/wiki/SOCKS#SOCKS4a +def parse_socks_request(data): + """Parse the 8-byte SOCKS header at the beginning of data. Returns a + (dest, port) tuple. Raises ValueError on error.""" + try: + ver, cmd, dport, o1, o2, o3, o4 = struct.unpack(">BBHBBBB", data[:8]) + except struct.error: + raise ValueError("Couldn't unpack SOCKS4 header") + if ver != 4: + raise ValueError("Wrong SOCKS version (%d)" % ver) + if cmd != 1: + raise ValueError("Wrong SOCKS command (%d)" % cmd) + pos, userid = grab_string(data, 8) + if userid is None: + raise ValueError("Couldn't read userid") + if o1 == 0 and o2 == 0 and o3 == 0 and o4 != 0: + pos, dest = grab_string(data, pos) + if dest is None: + raise ValueError("Couldn't read destination") + else: + dest = "%d.%d.%d.%d" % (o1, o2, o3, o4) + return dest, dport + +def handle_socks_request(fd): + try: + addr = fd.getpeername() + data = fd.recv(100) + except socket.error, e: + log(u"Socket error from SOCKS-pending: %s" % repr(str(e))) + return False + try: + dest_addr = parse_socks_request(data) + except ValueError, e: + log(u"Error parsing SOCKS request: %s." % str(e)) + # Error reply. + fd.sendall(struct.pack(">BBHBBBB", 0, 91, 0, 0, 0, 0, 0)) + return False + log(u"Got SOCKS request for %s." % safe_format_addr(dest_addr)) + fd.sendall(struct.pack(">BBHBBBB", 0, 90, dest_addr[1], 127, 0, 0, 1)) + # Note we throw away the requested address and port. + return True + +def report_pending(): + log(u"locals (%d): %s" % (len(locals), [format_peername(x) for x in locals])) + log(u"remotes (%d): %s" % (len(remotes), [format_peername(x) for x in remotes])) + +def register(): + if not options.register: + return + + # sys.path[0] is initialized to the directory containing the Python script file. + script_dir = sys.path[0] + if not script_dir: + # Maybe the script was read from stdin; in any case don't guess at the directory. + return + command = [os.path.join(script_dir, "flashproxy-reg-http")] + spec = format_addr((None, options.remote_addr[1])) + if options.facilitator_addr is None: + log(u"Registering \"%s\"." % spec) + else: + command += [format_addr(options.facilitator_addr)] + command += ["-a", spec] + try: + p = subprocess.Popen(command) + except OSError, e: + log(u"Failed to register: %s" % str(e)) + +def proxy_chunk_local_to_remote(local, remote, data = None): + if data is None: + try: + data = local.recv(65536) + except socket.error, e: # Can be "Connection reset by peer". + log(u"Socket error from local: %s" % repr(str(e))) + remote.close() + return False + if not data: + log(u"EOF from local %s." % format_peername(local)) + local.close() + remote.close() + return False + else: + try: + remote.send_chunk(data) + except socket.error, e: + log(u"Socket error writing to remote: %s" % repr(str(e))) + local.close() + return False + return True + +def proxy_chunk_remote_to_local(remote, local, data = None): + if data is None: + try: + data = remote.recv(65536) + except socket.error, e: # Can be "Connection reset by peer". + log(u"Socket error from remote: %s" % repr(str(e))) + local.close() + return False + if not data: + log(u"EOF from remote %s." % format_peername(remote)) + remote.close() + local.close() + return False + else: + remote.dec.feed(data) + while True: + try: + data = remote.dec.read() + except (WebSocketDecoder.MaskingError, ValueError), e: + log(u"WebSocket decode error from remote: %s" % repr(str(e))) + remote.close() + local.close() + return False + if data is None: + break + elif not data: + log(u"WebSocket close from remote %s." % format_peername(remote)) + remote.close() + local.close() + return False + try: + local.send_chunk(data) + except socket.error, e: + log(u"Socket error writing to local: %s" % repr(str(e))) + remote.close() + return False + return True + +def receive_unlinked(fd, label): + """Receive and buffer data on a socket that has not been linked yet. Returns + True iff there was no error and the socket may still be used; otherwise, the + socket will be closed before returning.""" + + try: + data = fd.recv(1024) + except socket.error, e: + log(u"Socket error from %s: %s" % (label, repr(str(e)))) + fd.close() + return False + if not data: + log(u"EOF from unlinked %s %s with %d bytes buffered." % (label, format_peername(fd), len(fd.buf))) + fd.close() + return False + else: + log(u"Data from unlinked %s %s (%d bytes)." % (label, format_peername(fd), len(data))) + fd.buf += data + if len(fd.buf) >= UNCONNECTED_BUFFER_LIMIT: + log(u"Refusing to buffer more than %d bytes from %s %s." % (UNCONNECTED_BUFFER_LIMIT, label, format_peername(fd))) + fd.close() + return False + return True + +def match_proxies(): + while unlinked_remotes and unlinked_locals: + remote = unlinked_remotes.pop(0) + local = unlinked_locals.pop(0) + remote_addr, remote_port = remote.getpeername() + local_addr, local_port = local.getpeername() + log(u"Linking %s and %s." % (format_peername(local), format_peername(remote))) + remote.partner = local + local.partner = remote + if remote.buf: + if not proxy_chunk_remote_to_local(remote, local, remote.buf): + remotes.remove(remote) + locals.remove(local) + register() + return + if local.buf: + if not proxy_chunk_local_to_remote(local, remote, local.buf): + remotes.remove(remote) + locals.remove(local) + return + +class TimeoutSocket(object): + def __init__(self, fd): + self.fd = fd + self.birthday = time.time() + + def age(self): + return time.time() - self.birthday + + def __getattr__(self, name): + return getattr(self.fd, name) + +class RemoteSocket(object): + def __init__(self, fd, protocols): + self.fd = fd + self.buf = "" + self.partner = None + self.dec = WebSocketBinaryDecoder(protocols, use_mask = True) + self.enc = WebSocketBinaryEncoder(protocols, use_mask = False) + + def send_chunk(self, data): + self.sendall(self.enc.encode(data)) + + def __getattr__(self, name): + return getattr(self.fd, name) + +class LocalSocket(object): + def __init__(self, fd): + self.fd = fd + self.buf = "" + self.partner = None + + def send_chunk(self, data): + self.sendall(data) + + def __getattr__(self, name): + return getattr(self.fd, name) + +def main(): + while True: + rset = [remote_s, local_s] + websocket_pending + socks_pending + locals + remotes + rset, _, _ = select.select(rset, [], [], WEBSOCKET_REQUEST_TIMEOUT) + for fd in rset: + if fd == remote_s: + remote_c, addr = fd.accept() + log(u"Remote connection from %s." % safe_format_addr(addr)) + websocket_pending.append(TimeoutSocket(remote_c)) + elif fd == local_s: + local_c, addr = fd.accept() + log(u"Local connection from %s." % safe_format_addr(addr)) + socks_pending.append(local_c) + register() + elif fd in websocket_pending: + log(u"Data from WebSocket-pending %s." % safe_format_addr(addr)) + protocols = handle_websocket_request(fd) + if protocols is not None: + wrapped = RemoteSocket(fd, protocols) + remotes.append(wrapped) + unlinked_remotes.append(wrapped) + else: + fd.close() + websocket_pending.remove(fd) + report_pending() + elif fd in socks_pending: + log(u"SOCKS request from %s." % safe_format_addr(addr)) + if handle_socks_request(fd): + wrapped = LocalSocket(fd) + locals.append(wrapped) + unlinked_locals.append(wrapped) + else: + fd.close() + socks_pending.remove(fd) + report_pending() + elif fd in remotes: + local = fd.partner + if local: + if not proxy_chunk_remote_to_local(fd, local): + remotes.remove(fd) + locals.remove(local) + register() + else: + if not receive_unlinked(fd, "remote"): + remotes.remove(fd) + unlinked_remotes.remove(fd) + register() + report_pending() + elif fd in locals: + remote = fd.partner + if remote: + if not proxy_chunk_local_to_remote(fd, remote): + remotes.remove(remote) + locals.remove(fd) + else: + if not receive_unlinked(fd, "local"): + locals.remove(fd) + unlinked_locals.remove(fd) + report_pending() + match_proxies() + while websocket_pending: + pending = websocket_pending[0] + if pending.age() < WEBSOCKET_REQUEST_TIMEOUT: + break + log(u"Expired remote connection from %s." % format_peername(pending)) + pending.close() + websocket_pending.pop(0) + report_pending() + +if __name__ == "__main__": + opts, args = getopt.gnu_getopt(sys.argv[1:], "f:hl:r", ["daemon", "facilitator=", "help", "log=", "pidfile=", "register", "unsafe-logging"]) + for o, a in opts: + if o == "--daemon": + options.daemonize = True + elif o == "-f" or o == "--facilitator": + options.facilitator_addr = parse_addr_spec(a) + elif o == "-h" or o == "--help": + usage() + sys.exit() + elif o == "-l" or o == "--log": + options.log_filename = a + elif o == "--pidfile": + options.pid_filename = a + elif o == "-r" or o == "--register": + options.register = True + elif o == "--unsafe-logging": + options.safe_logging = False + + if len(args) == 0: + options.local_addr = (DEFAULT_LOCAL_ADDRESS, DEFAULT_LOCAL_PORT) + options.remote_addr = (DEFAULT_REMOTE_ADDRESS, DEFAULT_REMOTE_PORT) + elif len(args) == 1: + options.local_addr = parse_addr_spec(args[0], DEFAULT_LOCAL_ADDRESS, DEFAULT_LOCAL_PORT) + options.remote_addr = (DEFAULT_REMOTE_ADDRESS, DEFAULT_REMOTE_PORT) + elif len(args) == 2: + options.local_addr = parse_addr_spec(args[0], DEFAULT_LOCAL_ADDRESS, DEFAULT_LOCAL_PORT) + options.remote_addr = parse_addr_spec(args[1], DEFAULT_REMOTE_ADDRESS, DEFAULT_REMOTE_PORT) + else: + usage(sys.stderr) + sys.exit(1) + + if options.log_filename: + options.log_file = open(options.log_filename, "a") + # Send error tracebacks to the log. + sys.stderr = options.log_file + else: + options.log_file = sys.stdout + + # Local socket, accepting SOCKS requests from localhost + local_s = listen_socket(options.local_addr) + # Remote socket, accepting remote WebSocket connections from proxies. + remote_s = listen_socket(options.remote_addr) + + # New remote sockets waiting to finish their WebSocket negotiation. + websocket_pending = [] + # Remote connection sockets. + remotes = [] + # Remotes not yet linked with a local. This is a subset of remotes. + unlinked_remotes = [] + # New local sockets waiting to finish their SOCKS negotiation. + socks_pending = [] + # Local Tor sockets, after SOCKS negotiation. + locals = [] + # Locals not yet linked with a remote. This is a subset of remotes. + unlinked_locals = [] + + register() + + if options.daemonize: + log(u"Daemonizing.") + pid = os.fork() + if pid != 0: + if options.pid_filename: + f = open(options.pid_filename, "w") + print >> f, pid + f.close() + sys.exit(0) + try: + main() + except Exception: + exc = traceback.format_exc() + log("".join(exc)) diff --git a/flashproxy-client-test b/flashproxy-client-test new file mode 100755 index 0000000..2992bd0 --- /dev/null +++ b/flashproxy-client-test @@ -0,0 +1,233 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +import socket +import subprocess +import sys +import unittest + +# Special tricks to load a module whose filename contains a dash and doesn't end +# in ".py". +import imp +dont_write_bytecode = sys.dont_write_bytecode +sys.dont_write_bytecode = True +flashproxy = imp.load_source("flashproxy", "flashproxy-client") +parse_socks_request = flashproxy.parse_socks_request +WebSocketDecoder = flashproxy.WebSocketDecoder +WebSocketEncoder = flashproxy.WebSocketEncoder +sys.dont_write_bytecode = dont_write_bytecode +del dont_write_bytecode +del flashproxy + +LOCAL_ADDRESS = ("127.0.0.1", 40000) +REMOTE_ADDRESS = ("127.0.0.1", 40001) + +class TestSocks(unittest.TestCase): + def test_parse_socks_request_empty(self): + self.assertRaises(ValueError, parse_socks_request, "") + def test_parse_socks_request_short(self): + self.assertRaises(ValueError, parse_socks_request, "\x04\x01\x99\x99\x01\x02\x03\x04") + def test_parse_socks_request_ip_userid_missing(self): + dest, port = parse_socks_request("\x04\x01\x99\x99\x01\x02\x03\x04\x00") + dest, port = parse_socks_request("\x04\x01\x99\x99\x01\x02\x03\x04\x00userid") + self.assertEqual((dest, port), ("1.2.3.4", 0x9999)) + def test_parse_socks_request_ip(self): + dest, port = parse_socks_request("\x04\x01\x99\x99\x01\x02\x03\x04userid\x00") + self.assertEqual((dest, port), ("1.2.3.4", 0x9999)) + def test_parse_socks_request_hostname_missing(self): + self.assertRaises(ValueError, parse_socks_request, "\x04\x01\x99\x99\x00\x00\x00\x01userid\x00") + self.assertRaises(ValueError, parse_socks_request, "\x04\x01\x99\x99\x00\x00\x00\x01userid\x00abc") + def test_parse_socks_request_hostname(self): + dest, port = parse_socks_request("\x04\x01\x99\x99\x00\x00\x00\x01userid\x00abc\x00") + +def read_frames(dec): + frames = [] + while True: + frame = dec.read_frame() + if frame is None: + break + frames.append((frame.fin, frame.opcode, frame.payload)) + return frames + +def read_messages(dec): + messages = [] + while True: + message = dec.read_message() + if message is None: + break + messages.append((message.opcode, message.payload)) + return messages + +class TestWebSocketDecoder(unittest.TestCase): + def test_rfc(self): + """Test samples from RFC 6455 section 5.7.""" + TESTS = [ + ("\x81\x05\x48\x65\x6c\x6c\x6f", False, + [(True, 1, "Hello")], + [(1, u"Hello")]), + ("\x81\x85\x37\xfa\x21\x3d\x7f\x9f\x4d\x51\x58", True, + [(True, 1, "Hello")], + [(1, u"Hello")]), + ("\x01\x03\x48\x65\x6c\x80\x02\x6c\x6f", False, + [(False, 1, "Hel"), (True, 0, "lo")], + [(1, u"Hello")]), + ("\x89\x05\x48\x65\x6c\x6c\x6f", False, + [(True, 9, "Hello")], + [(9, u"Hello")]), + ("\x8a\x85\x37\xfa\x21\x3d\x7f\x9f\x4d\x51\x58", True, + [(True, 10, "Hello")], + [(10, u"Hello")]), + ("\x82\x7e\x01\x00" + "\x00" * 256, False, + [(True, 2, "\x00" * 256)], + [(2, "\x00" * 256)]), + ("\x82\x7f\x00\x00\x00\x00\x00\x01\x00\x00" + "\x00" * 65536, False, + [(True, 2, "\x00" * 65536)], + [(2, "\x00" * 65536)]), + ("\x82\x7f\x00\x00\x00\x00\x00\x01\x00\x03" + "ABCD" * 16384 + "XYZ", False, + [(True, 2, "ABCD" * 16384 + "XYZ")], + [(2, "ABCD" * 16384 + "XYZ")]), + ] + for data, use_mask, expected_frames, expected_messages in TESTS: + dec = WebSocketDecoder(use_mask = use_mask) + dec.feed(data) + actual_frames = read_frames(dec) + self.assertEqual(actual_frames, expected_frames) + + dec = WebSocketDecoder(use_mask = use_mask) + dec.feed(data) + actual_messages = read_messages(dec) + self.assertEqual(actual_messages, expected_messages) + + dec = WebSocketDecoder(use_mask = not use_mask) + dec.feed(data) + self.assertRaises(WebSocketDecoder.MaskingError, dec.read_frame) + + def test_empty_feed(self): + """Test that the decoder can handle a zero-byte feed.""" + dec = WebSocketDecoder() + self.assertEqual(dec.read_frame(), None) + dec.feed("") + self.assertEqual(dec.read_frame(), None) + dec.feed("\x81\x05H") + self.assertEqual(dec.read_frame(), None) + dec.feed("ello") + self.assertEqual(read_frames(dec), [(True, 1, u"Hello")]) + + def test_empty_frame(self): + """Test that a frame may contain a zero-byte payload.""" + dec = WebSocketDecoder() + dec.feed("\x81\x00") + self.assertEqual(read_frames(dec), [(True, 1, u"")]) + dec.feed("\x82\x00") + self.assertEqual(read_frames(dec), [(True, 2, "")]) + + def test_empty_message(self): + """Test that a message may have a zero-byte payload.""" + dec = WebSocketDecoder() + dec.feed("\x01\x00\x00\x00\x80\x00") + self.assertEqual(read_messages(dec), [(1, u"")]) + dec.feed("\x02\x00\x00\x00\x80\x00") + self.assertEqual(read_messages(dec), [(2, "")]) + + def test_interleaved_control(self): + """Test that control messages interleaved with fragmented messages are + returned.""" + dec = WebSocketDecoder() + dec.feed("\x89\x04PING\x01\x03Hel\x8a\x04PONG\x80\x02lo\x89\x04PING") + self.assertEqual(read_messages(dec), [(9, "PING"), (10, "PONG"), (1, u"Hello"), (9, "PING")]) + + def test_fragmented_control(self): + """Test that illegal fragmented control messages cause an error.""" + dec = WebSocketDecoder() + dec.feed("\x09\x04PING") + self.assertRaises(ValueError, dec.read_message) + + def test_zero_opcode(self): + """Test that it is an error for the first frame in a message to have an + opcode of 0.""" + dec = WebSocketDecoder() + dec.feed("\x80\x05Hello") + self.assertRaises(ValueError, dec.read_message) + dec = WebSocketDecoder() + dec.feed("\x00\x05Hello") + self.assertRaises(ValueError, dec.read_message) + + def test_nonzero_opcode(self): + """Test that every frame after the first must have a zero opcode.""" + dec = WebSocketDecoder() + dec.feed("\x01\x01H\x01\x02el\x80\x02lo") + self.assertRaises(ValueError, dec.read_message) + dec = WebSocketDecoder() + dec.feed("\x01\x01H\x00\x02el\x01\x02lo") + self.assertRaises(ValueError, dec.read_message) + + def test_utf8(self): + """Test that text frames (opcode 1) are decoded from UTF-8.""" + text = u"Hello World or Καλημέρα κόσμε or こんにちは 世界 or \U0001f639" + utf8_text = text.encode("utf-8") + dec = WebSocketDecoder() + dec.feed("\x81" + chr(len(utf8_text)) + utf8_text) + self.assertEqual(read_messages(dec), [(1, text)]) + + def test_wrong_utf8(self): + """Test that failed UTF-8 decoding causes an error.""" + TESTS = [ + "\xc0\x41", # Non-shortest form. + "\xc2", # Unfinished sequence. + ] + for test in TESTS: + dec = WebSocketDecoder() + dec.feed("\x81" + chr(len(test)) + test) + self.assertRaises(ValueError, dec.read_message) + + def test_overly_large_payload(self): + """Test that large payloads are rejected.""" + dec = WebSocketDecoder() + dec.feed("\x82\x7f\x00\x00\x00\x00\x01\x00\x00\x00") + self.assertRaises(ValueError, dec.read_frame) + +class TestWebSocketEncoder(unittest.TestCase): + def test_length(self): + """Test that payload lengths are encoded using the smallest number of + bytes.""" + TESTS = [(0, 0), (125, 0), (126, 2), (65535, 2), (65536, 8)] + for length, encoded_length in TESTS: + enc = WebSocketEncoder(use_mask = False) + eframe = enc.encode_frame(2, "\x00" * length) + self.assertEqual(len(eframe), 1 + 1 + encoded_length + length) + enc = WebSocketEncoder(use_mask = True) + eframe = enc.encode_frame(2, "\x00" * length) + self.assertEqual(len(eframe), 1 + 1 + encoded_length + 4 + length) + + def test_roundtrip(self): + TESTS = [ + (1, u"Hello world"), + (1, u"Hello \N{WHITE SMILING FACE}"), + ] + for opcode, payload in TESTS: + for use_mask in (False, True): + enc = WebSocketEncoder(use_mask = use_mask) + enc_message = enc.encode_message(opcode, payload) + dec = WebSocketDecoder(use_mask = use_mask) + dec.feed(enc_message) + self.assertEqual(read_messages(dec), [(opcode, payload)]) + +def format_address(addr): + return "%s:%d" % addr + +class TestConnectionLimit(unittest.TestCase): + def setUp(self): + self.p = subprocess.Popen(["./flashproxy-client", format_address(LOCAL_ADDRESS), format_address(REMOTE_ADDRESS)]) + + def tearDown(self): + self.p.terminate() + + def test_remote_limit(self): + """Test that the client transport plugin limits the number of remote + connections that it will accept.""" + for i in range(5): + s = socket.create_connection(REMOTE_ADDRESS, 2) + self.assertRaises(socket.error, socket.create_connection, REMOTE_ADDRESS) + +if __name__ == "__main__": + unittest.main() diff --git a/flashproxy-client-test.py b/flashproxy-client-test.py deleted file mode 100755 index 35d49ed..0000000 --- a/flashproxy-client-test.py +++ /dev/null @@ -1,224 +0,0 @@ -#!/usr/bin/env python -# -*- coding: utf-8 -*- - -import socket -import subprocess -import unittest -flashproxy = __import__("flashproxy-client") -parse_socks_request = flashproxy.parse_socks_request -WebSocketDecoder = flashproxy.WebSocketDecoder -WebSocketEncoder = flashproxy.WebSocketEncoder -del flashproxy - -LOCAL_ADDRESS = ("127.0.0.1", 40000) -REMOTE_ADDRESS = ("127.0.0.1", 40001) - -class TestSocks(unittest.TestCase): - def test_parse_socks_request_empty(self): - self.assertRaises(ValueError, parse_socks_request, "") - def test_parse_socks_request_short(self): - self.assertRaises(ValueError, parse_socks_request, "\x04\x01\x99\x99\x01\x02\x03\x04") - def test_parse_socks_request_ip_userid_missing(self): - dest, port = parse_socks_request("\x04\x01\x99\x99\x01\x02\x03\x04\x00") - dest, port = parse_socks_request("\x04\x01\x99\x99\x01\x02\x03\x04\x00userid") - self.assertEqual((dest, port), ("1.2.3.4", 0x9999)) - def test_parse_socks_request_ip(self): - dest, port = parse_socks_request("\x04\x01\x99\x99\x01\x02\x03\x04userid\x00") - self.assertEqual((dest, port), ("1.2.3.4", 0x9999)) - def test_parse_socks_request_hostname_missing(self): - self.assertRaises(ValueError, parse_socks_request, "\x04\x01\x99\x99\x00\x00\x00\x01userid\x00") - self.assertRaises(ValueError, parse_socks_request, "\x04\x01\x99\x99\x00\x00\x00\x01userid\x00abc") - def test_parse_socks_request_hostname(self): - dest, port = parse_socks_request("\x04\x01\x99\x99\x00\x00\x00\x01userid\x00abc\x00") - -def read_frames(dec): - frames = [] - while True: - frame = dec.read_frame() - if frame is None: - break - frames.append((frame.fin, frame.opcode, frame.payload)) - return frames - -def read_messages(dec): - messages = [] - while True: - message = dec.read_message() - if message is None: - break - messages.append((message.opcode, message.payload)) - return messages - -class TestWebSocketDecoder(unittest.TestCase): - def test_rfc(self): - """Test samples from RFC 6455 section 5.7.""" - TESTS = [ - ("\x81\x05\x48\x65\x6c\x6c\x6f", False, - [(True, 1, "Hello")], - [(1, u"Hello")]), - ("\x81\x85\x37\xfa\x21\x3d\x7f\x9f\x4d\x51\x58", True, - [(True, 1, "Hello")], - [(1, u"Hello")]), - ("\x01\x03\x48\x65\x6c\x80\x02\x6c\x6f", False, - [(False, 1, "Hel"), (True, 0, "lo")], - [(1, u"Hello")]), - ("\x89\x05\x48\x65\x6c\x6c\x6f", False, - [(True, 9, "Hello")], - [(9, u"Hello")]), - ("\x8a\x85\x37\xfa\x21\x3d\x7f\x9f\x4d\x51\x58", True, - [(True, 10, "Hello")], - [(10, u"Hello")]), - ("\x82\x7e\x01\x00" + "\x00" * 256, False, - [(True, 2, "\x00" * 256)], - [(2, "\x00" * 256)]), - ("\x82\x7f\x00\x00\x00\x00\x00\x01\x00\x00" + "\x00" * 65536, False, - [(True, 2, "\x00" * 65536)], - [(2, "\x00" * 65536)]), - ("\x82\x7f\x00\x00\x00\x00\x00\x01\x00\x03" + "ABCD" * 16384 + "XYZ", False, - [(True, 2, "ABCD" * 16384 + "XYZ")], - [(2, "ABCD" * 16384 + "XYZ")]), - ] - for data, use_mask, expected_frames, expected_messages in TESTS: - dec = WebSocketDecoder(use_mask = use_mask) - dec.feed(data) - actual_frames = read_frames(dec) - self.assertEqual(actual_frames, expected_frames) - - dec = WebSocketDecoder(use_mask = use_mask) - dec.feed(data) - actual_messages = read_messages(dec) - self.assertEqual(actual_messages, expected_messages) - - dec = WebSocketDecoder(use_mask = not use_mask) - dec.feed(data) - self.assertRaises(WebSocketDecoder.MaskingError, dec.read_frame) - - def test_empty_feed(self): - """Test that the decoder can handle a zero-byte feed.""" - dec = WebSocketDecoder() - self.assertEqual(dec.read_frame(), None) - dec.feed("") - self.assertEqual(dec.read_frame(), None) - dec.feed("\x81\x05H") - self.assertEqual(dec.read_frame(), None) - dec.feed("ello") - self.assertEqual(read_frames(dec), [(True, 1, u"Hello")]) - - def test_empty_frame(self): - """Test that a frame may contain a zero-byte payload.""" - dec = WebSocketDecoder() - dec.feed("\x81\x00") - self.assertEqual(read_frames(dec), [(True, 1, u"")]) - dec.feed("\x82\x00") - self.assertEqual(read_frames(dec), [(True, 2, "")]) - - def test_empty_message(self): - """Test that a message may have a zero-byte payload.""" - dec = WebSocketDecoder() - dec.feed("\x01\x00\x00\x00\x80\x00") - self.assertEqual(read_messages(dec), [(1, u"")]) - dec.feed("\x02\x00\x00\x00\x80\x00") - self.assertEqual(read_messages(dec), [(2, "")]) - - def test_interleaved_control(self): - """Test that control messages interleaved with fragmented messages are - returned.""" - dec = WebSocketDecoder() - dec.feed("\x89\x04PING\x01\x03Hel\x8a\x04PONG\x80\x02lo\x89\x04PING") - self.assertEqual(read_messages(dec), [(9, "PING"), (10, "PONG"), (1, u"Hello"), (9, "PING")]) - - def test_fragmented_control(self): - """Test that illegal fragmented control messages cause an error.""" - dec = WebSocketDecoder() - dec.feed("\x09\x04PING") - self.assertRaises(ValueError, dec.read_message) - - def test_zero_opcode(self): - """Test that it is an error for the first frame in a message to have an - opcode of 0.""" - dec = WebSocketDecoder() - dec.feed("\x80\x05Hello") - self.assertRaises(ValueError, dec.read_message) - dec = WebSocketDecoder() - dec.feed("\x00\x05Hello") - self.assertRaises(ValueError, dec.read_message) - - def test_nonzero_opcode(self): - """Test that every frame after the first must have a zero opcode.""" - dec = WebSocketDecoder() - dec.feed("\x01\x01H\x01\x02el\x80\x02lo") - self.assertRaises(ValueError, dec.read_message) - dec = WebSocketDecoder() - dec.feed("\x01\x01H\x00\x02el\x01\x02lo") - self.assertRaises(ValueError, dec.read_message) - - def test_utf8(self): - """Test that text frames (opcode 1) are decoded from UTF-8.""" - text = u"Hello World or Καλημέρα κόσμε or こんにちは 世界 or \U0001f639" - utf8_text = text.encode("utf-8") - dec = WebSocketDecoder() - dec.feed("\x81" + chr(len(utf8_text)) + utf8_text) - self.assertEqual(read_messages(dec), [(1, text)]) - - def test_wrong_utf8(self): - """Test that failed UTF-8 decoding causes an error.""" - TESTS = [ - "\xc0\x41", # Non-shortest form. - "\xc2", # Unfinished sequence. - ] - for test in TESTS: - dec = WebSocketDecoder() - dec.feed("\x81" + chr(len(test)) + test) - self.assertRaises(ValueError, dec.read_message) - - def test_overly_large_payload(self): - """Test that large payloads are rejected.""" - dec = WebSocketDecoder() - dec.feed("\x82\x7f\x00\x00\x00\x00\x01\x00\x00\x00") - self.assertRaises(ValueError, dec.read_frame) - -class TestWebSocketEncoder(unittest.TestCase): - def test_length(self): - """Test that payload lengths are encoded using the smallest number of - bytes.""" - TESTS = [(0, 0), (125, 0), (126, 2), (65535, 2), (65536, 8)] - for length, encoded_length in TESTS: - enc = WebSocketEncoder(use_mask = False) - eframe = enc.encode_frame(2, "\x00" * length) - self.assertEqual(len(eframe), 1 + 1 + encoded_length + length) - enc = WebSocketEncoder(use_mask = True) - eframe = enc.encode_frame(2, "\x00" * length) - self.assertEqual(len(eframe), 1 + 1 + encoded_length + 4 + length) - - def test_roundtrip(self): - TESTS = [ - (1, u"Hello world"), - (1, u"Hello \N{WHITE SMILING FACE}"), - ] - for opcode, payload in TESTS: - for use_mask in (False, True): - enc = WebSocketEncoder(use_mask = use_mask) - enc_message = enc.encode_message(opcode, payload) - dec = WebSocketDecoder(use_mask = use_mask) - dec.feed(enc_message) - self.assertEqual(read_messages(dec), [(opcode, payload)]) - -def format_address(addr): - return "%s:%d" % addr - -class TestConnectionLimit(unittest.TestCase): - def setUp(self): - self.p = subprocess.Popen(["./flashproxy-client.py", format_address(LOCAL_ADDRESS), format_address(REMOTE_ADDRESS)]) - - def tearDown(self): - self.p.terminate() - - def test_remote_limit(self): - """Test that the client transport plugin limits the number of remote - connections that it will accept.""" - for i in range(5): - s = socket.create_connection(REMOTE_ADDRESS, 2) - self.assertRaises(socket.error, socket.create_connection, REMOTE_ADDRESS) - -if __name__ == "__main__": - unittest.main() diff --git a/flashproxy-client.py b/flashproxy-client.py deleted file mode 100755 index 8afd0c0..0000000 --- a/flashproxy-client.py +++ /dev/null @@ -1,932 +0,0 @@ -#!/usr/bin/env python - -import array -import base64 -import cStringIO -import getopt -import httplib -import os -import re -import select -import socket -import struct -import subprocess -import sys -import time -import traceback -import urllib -import xml.sax.saxutils -import BaseHTTPServer - -try: - from hashlib import sha1 -except ImportError: - # Python 2.4 uses this name. - from sha import sha as sha1 - -try: - import numpy -except ImportError: - numpy = None - -DEFAULT_REMOTE_ADDRESS = "0.0.0.0" -DEFAULT_REMOTE_PORT = 9000 -DEFAULT_LOCAL_ADDRESS = "127.0.0.1" -DEFAULT_LOCAL_PORT = 9001 - -LOG_DATE_FORMAT = "%Y-%m-%d %H:%M:%S" - -class options(object): - local_addr = None - remote_addr = None - facilitator_addr = None - - log_filename = None - log_file = sys.stdout - daemonize = False - register = False - pid_filename = None - safe_logging = True - -# We accept up to this many bytes from a socket not yet matched with a partner -# before disconnecting it. -UNCONNECTED_BUFFER_LIMIT = 10240 - -def usage(f = sys.stdout): - print >> f, """\ -Usage: %(progname)s --register [LOCAL][:PORT] [REMOTE][:PORT] -Wait for connections on a local and a remote port. When any pair of connections -exists, data is ferried between them until one side is closed. By default -LOCAL is "%(local)s" and REMOTE is "%(remote)s". - -The local connection acts as a SOCKS4a proxy, but the host and port in the SOCKS -request are ignored and the local connection is always linked to a remote -connection. - -If the --register option is used, then your IP address will be sent to the -facilitator so that proxies can connect to you. You need to register in some way -in order to get any service. The --facilitator option allows controlling which -facilitator is used; if omitted, it uses a public default. - --daemon daemonize (Unix only). - -f, --facilitator=HOST[:PORT] advertise willingness to receive connections to - HOST:PORT. - -h, --help show this help. - -l, --log FILENAME write log to FILENAME (default stdout). - --pidfile FILENAME write PID to FILENAME after daemonizing. - -r, --register register with the facilitator. - --unsafe-logging don't scrub IP addresses from logs.\ -""" % { - "progname": sys.argv[0], - "local": format_addr((DEFAULT_LOCAL_ADDRESS, DEFAULT_LOCAL_PORT)), - "remote": format_addr((DEFAULT_REMOTE_ADDRESS, DEFAULT_REMOTE_PORT)), -} - -def safe_str(s): - """Return s if options.safe_logging is true, and "[scrubbed]" otherwise.""" - if options.safe_logging: - return "[scrubbed]" - else: - return s - -def log(msg): - print >> options.log_file, (u"%s %s" % (time.strftime(LOG_DATE_FORMAT), msg)).encode("UTF-8") - options.log_file.flush() - -def parse_addr_spec(spec, defhost = None, defport = None): - host = None - port = None - m = None - # IPv6 syntax. - if not m: - m = re.match(ur'^\[(.+)\]:(\d+)$', spec) - if m: - host, port = m.groups() - af = socket.AF_INET6 - if not m: - m = re.match(ur'^\[(.+)\]:?$', spec) - if m: - host, = m.groups() - af = socket.AF_INET6 - # IPv4 syntax. - if not m: - m = re.match(ur'^(.+):(\d+)$', spec) - if m: - host, port = m.groups() - af = socket.AF_INET - if not m: - m = re.match(ur'^:?(\d+)$', spec) - if m: - port, = m.groups() - af = 0 - if not m: - host = spec - af = 0 - host = host or defhost - port = port or defport - if not (host and port): - raise ValueError("Bad address specification \"%s\"" % spec) - return host, int(port) - -def format_addr(addr): - host, port = addr - if not host: - return u":%d" % port - # Numeric IPv6 address? - try: - addrs = socket.getaddrinfo(host, port, 0, socket.SOCK_STREAM, socket.IPPROTO_TCP, socket.AI_NUMERICHOST) - af = addrs[0][0] - except socket.gaierror, e: - af = 0 - if af == socket.AF_INET6: - return u"[%s]:%d" % (host, port) - else: - return u"%s:%d" % (host, port) - -def safe_format_addr(addr): - return safe_str(format_addr(addr)) - host, port = addr - if not host: - return u":%d" % port - # Numeric IPv6 address? - try: - addrs = socket.getaddrinfo(host, port, 0, socket.SOCK_STREAM, socket.IPPROTO_TCP, socket.AI_NUMERICHOST) - af = addrs[0][0] - except socket.gaierror, e: - af = 0 - if af == socket.AF_INET6: - return u"[%s]:%d" % (host, port) - else: - return u"%s:%d" % (host, port) - - - -def apply_mask_numpy(payload, mask_key): - if len(payload) == 0: - return "" - payload_a = numpy.frombuffer(payload, dtype="|u4", count=len(payload)//4) - m, = numpy.frombuffer(mask_key, dtype="|u4", count=1) - result = numpy.bitwise_xor(payload_a, m).tostring() - i = len(payload) // 4 * 4 - if i < len(payload): - remainder = [] - while i < len(payload): - remainder.append(chr(ord(payload[i]) ^ ord(mask_key[i % 4]))) - i += 1 - result = result + "".join(remainder) - return result - -def apply_mask_py(payload, mask_key): - result = array.array("B", payload) - m = array.array("B", mask_key) - i = 0 - while i < len(result) - 7: - result[i] ^= m[0] - result[i+1] ^= m[1] - result[i+2] ^= m[2] - result[i+3] ^= m[3] - result[i+4] ^= m[0] - result[i+5] ^= m[1] - result[i+6] ^= m[2] - result[i+7] ^= m[3] - i += 8 - while i < len(result): - result[i] ^= m[i%4] - i += 1 - return result.tostring() - -if numpy is not None: - apply_mask = apply_mask_numpy -else: - apply_mask = apply_mask_py - -class WebSocketFrame(object): - def __init__(self): - self.fin = False - self.opcode = None - self.payload = None - - def is_control(self): - return (self.opcode & 0x08) != 0 - -class WebSocketMessage(object): - def __init__(self): - self.opcode = None - self.payload = None - - def is_control(self): - return (self.opcode & 0x08) != 0 - -class WebSocketDecoder(object): - """RFC 6455 section 5 is about the WebSocket framing format.""" - # Raise an exception rather than buffer anything larger than this. - MAX_MESSAGE_LENGTH = 1024 * 1024 - - class MaskingError(ValueError): - pass - - def __init__(self, use_mask = False): - """use_mask should be True for server-to-client sockets, and False for - client-to-server sockets.""" - self.use_mask = use_mask - - # Per-frame state. - self.buf = "" - - # Per-message state. - self.message_buf = "" - self.message_opcode = None - - def feed(self, data): - self.buf += data - - def read_frame(self): - """Read a frame from the internal buffer, if one is available. Returns a - WebSocketFrame object, or None if there are no complete frames to - read.""" - # RFC 6255 section 5.2. - if len(self.buf) < 2: - return None - offset = 0 - b0, b1 = struct.unpack_from(">BB", self.buf, offset) - offset += 2 - fin = (b0 & 0x80) != 0 - opcode = b0 & 0x0f - frame_masked = (b1 & 0x80) != 0 - payload_len = b1 & 0x7f - - if payload_len == 126: - if len(self.buf) < offset + 2: - return None - payload_len, = struct.unpack_from(">H", self.buf, offset) - offset += 2 - elif payload_len == 127: - if len(self.buf) < offset + 8: - return None - payload_len, = struct.unpack_from(">Q", self.buf, offset) - offset += 8 - - if frame_masked: - if not self.use_mask: - # "A client MUST close a connection if it detects a masked - # frame." - raise self.MaskingError("Got masked payload from server") - if len(self.buf) < offset + 4: - return None - mask_key = self.buf[offset:offset+4] - offset += 4 - else: - if self.use_mask: - # "The server MUST close the connection upon receiving a frame - # that is not masked." - raise self.MaskingError("Got unmasked payload from client") - mask_key = None - - if payload_len > self.MAX_MESSAGE_LENGTH: - raise ValueError("Refusing to buffer payload of %d bytes" % payload_len) - - if len(self.buf) < offset + payload_len: - return None - if mask_key: - payload = apply_mask(self.buf[offset:offset+payload_len], mask_key) - else: - payload = self.buf[offset:offset+payload_len] - self.buf = self.buf[offset+payload_len:] - - frame = WebSocketFrame() - frame.fin = fin - frame.opcode = opcode - frame.payload = payload - - return frame - - def read_message(self): - """Read a complete message. If the opcode is 1, the payload is decoded - from a UTF-8 binary string to a unicode string. If a control frame is - read while another fragmented message is in progress, the control frame - is returned as a new message immediately. Returns None if there is no - complete frame to be read.""" - # RFC 6455 section 5.4 is about fragmentation. - while True: - frame = self.read_frame() - if frame is None: - return None - # "Control frames (see Section 5.5) MAY be injected in the middle of - # a fragmented message. Control frames themselves MUST NOT be - # fragmented." - if frame.is_control(): - if not frame.fin: - raise ValueError("Control frame (opcode %d) has FIN bit clear" % frame.opcode) - message = WebSocketMessage() - message.opcode = frame.opcode - message.payload = frame.payload - return message - - if self.message_opcode is None: - if frame.opcode == 0: - raise ValueError("First frame has opcode 0") - self.message_opcode = frame.opcode - else: - if frame.opcode != 0: - raise ValueError("Non-first frame has nonzero opcode %d" % frame.opcode) - self.message_buf += frame.payload - - if frame.fin: - break - message = WebSocketMessage() - message.opcode = self.message_opcode - message.payload = self.message_buf - self.postprocess_message(message) - self.message_opcode = None - self.message_buf = "" - - return message - - def postprocess_message(self, message): - if message.opcode == 1: - message.payload = message.payload.decode("utf-8") - return message - -class WebSocketEncoder(object): - def __init__(self, use_mask = False): - self.use_mask = use_mask - - def encode_frame(self, opcode, payload): - if opcode >= 16: - raise ValueError("Opcode of %d is >= 16" % opcode) - length = len(payload) - - if self.use_mask: - mask_key = os.urandom(4) - payload = apply_mask(payload, mask_key) - mask_bit = 0x80 - else: - mask_key = "" - mask_bit = 0x00 - - if length < 126: - len_b, len_ext = length, "" - elif length < 0x10000: - len_b, len_ext = 126, struct.pack(">H", length) - elif length < 0x10000000000000000: - len_b, len_ext = 127, struct.pack(">Q", length) - else: - raise ValueError("payload length of %d is too long" % length) - - return chr(0x80 | opcode) + chr(mask_bit | len_b) + len_ext + mask_key + payload - - def encode_message(self, opcode, payload): - if opcode == 1: - payload = payload.encode("utf-8") - return self.encode_frame(opcode, payload) - -# WebSocket implementations generally support text (opcode 1) messages, which -# are UTF-8-encoded text. Not all support binary (opcode 2) messages. During the -# WebSocket handshake, we use the "base64" value of the Sec-WebSocket-Protocol -# header field to indicate that text frames should encoded UTF-8-encoded -# base64-encoded binary data. Binary messages are always interpreted verbatim, -# but text messages are rejected if "base64" was not negotiated. -# -# The idea here is that browsers that know they don't support binary messages -# can negotiate "base64" with both endpoints and still reliably transport binary -# data. Those that know they can support binary messages can just use binary -# messages in the straightforward way. - -class WebSocketBinaryDecoder(object): - def __init__(self, protocols, use_mask = False): - self.dec = WebSocketDecoder(use_mask) - self.base64 = "base64" in protocols - - def feed(self, data): - self.dec.feed(data) - - def read(self): - """Returns None when there are currently no data to be read. Returns "" - when a close message is received.""" - while True: - message = self.dec.read_message() - if message is None: - return None - elif message.opcode == 1: - if not self.base64: - raise ValueError("Received text message on decoder incapable of base64") - payload = base64.b64decode(message.payload) - if payload: - return payload - elif message.opcode == 2: - if message.payload: - return message.payload - elif message.opcode == 8: - return "" - # Ignore all other opcodes. - return None - -class WebSocketBinaryEncoder(object): - def __init__(self, protocols, use_mask = False): - self.enc = WebSocketEncoder(use_mask) - self.base64 = "base64" in protocols - - def encode(self, data): - if self.base64: - return self.enc.encode_message(1, base64.b64encode(data)) - else: - return self.enc.encode_message(2, data) - - -def listen_socket(addr): - """Return a nonblocking socket listening on the given address.""" - addrinfo = socket.getaddrinfo(addr[0], addr[1], 0, socket.SOCK_STREAM, socket.IPPROTO_TCP)[0] - s = socket.socket(addrinfo[0], addrinfo[1], addrinfo[2]) - s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - s.bind(addr) - s.listen(10) - s.setblocking(0) - return s - -def format_peername(s): - try: - return safe_format_addr(s.getpeername()) - except socket.error, e: - return "<unconnected>" - -# How long to wait for a WebSocket request on the remote socket. It is limited -# to avoid Slowloris-like attacks. -WEBSOCKET_REQUEST_TIMEOUT = 2.0 - -class WebSocketRequestHandler(BaseHTTPServer.BaseHTTPRequestHandler): - def __init__(self, request_text, fd): - self.rfile = cStringIO.StringIO(request_text) - self.wfile = fd.makefile() - self.error = False - self.raw_requestline = self.rfile.readline() - self.parse_request() - - def log_message(self, *args): - pass - - def send_error(self, code, message = None): - BaseHTTPServer.BaseHTTPRequestHandler.send_error(self, code, message) - self.error = True - -MAGIC_GUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11" - -def handle_websocket_request(fd): - request_text = fd.recv(10 * 1024) - handler = WebSocketRequestHandler(request_text, fd) - if handler.error or not hasattr(handler, "path"): - return None - method = handler.command - path = handler.path - headers = handler.headers - - # See RFC 6455 section 4.2.1 for this sequence of checks. - # - # 1. An HTTP/1.1 or higher GET request, including a "Request-URI"... - if method != "GET": - handler.send_error(405) - return None - if path != "/": - handler.send_error(404) - return None - - # 2. A |Host| header field containing the server's authority. - # We deliberately skip this test. - - # 3. An |Upgrade| header field containing the value "websocket", treated as - # an ASCII case-insensitive value. - if "websocket" not in [x.strip().lower() for x in headers.get("upgrade").split(",")]: - handler.send_error(400) - return None - - # 4. A |Connection| header field that includes the token "Upgrade", treated - # as an ASCII case-insensitive value. - if "upgrade" not in [x.strip().lower() for x in headers.get("connection").split(",")]: - handler.send_error(400) - return None - - # 5. A |Sec-WebSocket-Key| header field with a base64-encoded value that, - # when decoded, is 16 bytes in length. - try: - key = headers.get("sec-websocket-key") - if len(base64.b64decode(key)) != 16: - raise TypeError("Sec-WebSocket-Key must be 16 bytes") - except TypeError: - handler.send_error(400) - return None - - # 6. A |Sec-WebSocket-Version| header field, with a value of 13. We also - # allow 8 from draft-ietf-hybi-thewebsocketprotocol-10. - version = headers.get("sec-websocket-version") - KNOWN_VERSIONS = ["8", "13"] - if version not in KNOWN_VERSIONS: - # "If this version does not match a version understood by the server, - # the server MUST abort the WebSocket handshake described in this - # section and instead send an appropriate HTTP error code (such as 426 - # Upgrade Required) and a |Sec-WebSocket-Version| header field - # indicating the version(s) the server is capable of understanding." - handler.send_response(426) - handler.send_header("Sec-WebSocket-Version", ", ".join(KNOWN_VERSIONS)) - handler.end_headers() - return None - - # 7. Optionally, an |Origin| header field. - - # 8. Optionally, a |Sec-WebSocket-Protocol| header field, with a list of - # values indicating which protocols the client would like to speak, ordered - # by preference. - protocols_str = headers.get("sec-websocket-protocol") - if protocols_str is None: - protocols = [] - else: - protocols = [x.strip().lower() for x in protocols_str.split(",")] - - # 9. Optionally, a |Sec-WebSocket-Extensions| header field... - - # 10. Optionally, other header fields... - - # See RFC 6455 section 4.2.2, item 5 for these steps. - - # 1. A Status-Line with a 101 response code as per RFC 2616. - handler.send_response(101) - # 2. An |Upgrade| header field with value "websocket" as per RFC 2616. - handler.send_header("Upgrade", "websocket") - # 3. A |Connection| header field with value "Upgrade". - handler.send_header("Connection", "Upgrade") - # 4. A |Sec-WebSocket-Accept| header field. The value of this header field - # is constructed by concatenating /key/, defined above in step 4 in Section - # 4.2.2, with the string "258EAFA5-E914-47DA-95CA-C5AB0DC85B11", taking the - # SHA-1 hash of this concatenated value to obtain a 20-byte value and - # base64-encoding (see Section 4 of [RFC4648]) this 20-byte hash. - accept_key = base64.b64encode(sha1(key + MAGIC_GUID).digest()) - handler.send_header("Sec-WebSocket-Accept", accept_key) - # 5. Optionally, a |Sec-WebSocket-Protocol| header field, with a value - # /subprotocol/ as defined in step 4 in Section 4.2.2. - if "base64" in protocols: - handler.send_header("Sec-WebSocket-Protocol", "base64") - # 6. Optionally, a |Sec-WebSocket-Extensions| header field... - - handler.end_headers() - - return protocols - -def grab_string(s, pos): - """Grab a NUL-terminated string from the given string, starting at the given - offset. Return (pos, str) tuple, or (pos, None) on error.""" - i = pos - while i < len(s): - if s[i] == '\0': - return (i + 1, s[pos:i]) - i += 1 - return pos, None - -# http://ftp.icm.edu.pl/packages/socks/socks4/SOCKS4.protocol -# https://en.wikipedia.org/wiki/SOCKS#SOCKS4a -def parse_socks_request(data): - """Parse the 8-byte SOCKS header at the beginning of data. Returns a - (dest, port) tuple. Raises ValueError on error.""" - try: - ver, cmd, dport, o1, o2, o3, o4 = struct.unpack(">BBHBBBB", data[:8]) - except struct.error: - raise ValueError("Couldn't unpack SOCKS4 header") - if ver != 4: - raise ValueError("Wrong SOCKS version (%d)" % ver) - if cmd != 1: - raise ValueError("Wrong SOCKS command (%d)" % cmd) - pos, userid = grab_string(data, 8) - if userid is None: - raise ValueError("Couldn't read userid") - if o1 == 0 and o2 == 0 and o3 == 0 and o4 != 0: - pos, dest = grab_string(data, pos) - if dest is None: - raise ValueError("Couldn't read destination") - else: - dest = "%d.%d.%d.%d" % (o1, o2, o3, o4) - return dest, dport - -def handle_socks_request(fd): - try: - addr = fd.getpeername() - data = fd.recv(100) - except socket.error, e: - log(u"Socket error from SOCKS-pending: %s" % repr(str(e))) - return False - try: - dest_addr = parse_socks_request(data) - except ValueError, e: - log(u"Error parsing SOCKS request: %s." % str(e)) - # Error reply. - fd.sendall(struct.pack(">BBHBBBB", 0, 91, 0, 0, 0, 0, 0)) - return False - log(u"Got SOCKS request for %s." % safe_format_addr(dest_addr)) - fd.sendall(struct.pack(">BBHBBBB", 0, 90, dest_addr[1], 127, 0, 0, 1)) - # Note we throw away the requested address and port. - return True - -def report_pending(): - log(u"locals (%d): %s" % (len(locals), [format_peername(x) for x in locals])) - log(u"remotes (%d): %s" % (len(remotes), [format_peername(x) for x in remotes])) - -def register(): - if not options.register: - return - - # sys.path[0] is initialized to the directory containing the Python script file. - script_dir = sys.path[0] - if not script_dir: - # Maybe the script was read from stdin; in any case don't guess at the directory. - return - command = [os.path.join(script_dir, "flashproxy-reg-http.py")] - spec = format_addr((None, options.remote_addr[1])) - if options.facilitator_addr is None: - log(u"Registering \"%s\"." % spec) - else: - command += [format_addr(options.facilitator_addr)] - command += ["-a", spec] - try: - p = subprocess.Popen(command) - except OSError, e: - log(u"Failed to register: %s" % str(e)) - -def proxy_chunk_local_to_remote(local, remote, data = None): - if data is None: - try: - data = local.recv(65536) - except socket.error, e: # Can be "Connection reset by peer". - log(u"Socket error from local: %s" % repr(str(e))) - remote.close() - return False - if not data: - log(u"EOF from local %s." % format_peername(local)) - local.close() - remote.close() - return False - else: - try: - remote.send_chunk(data) - except socket.error, e: - log(u"Socket error writing to remote: %s" % repr(str(e))) - local.close() - return False - return True - -def proxy_chunk_remote_to_local(remote, local, data = None): - if data is None: - try: - data = remote.recv(65536) - except socket.error, e: # Can be "Connection reset by peer". - log(u"Socket error from remote: %s" % repr(str(e))) - local.close() - return False - if not data: - log(u"EOF from remote %s." % format_peername(remote)) - remote.close() - local.close() - return False - else: - remote.dec.feed(data) - while True: - try: - data = remote.dec.read() - except (WebSocketDecoder.MaskingError, ValueError), e: - log(u"WebSocket decode error from remote: %s" % repr(str(e))) - remote.close() - local.close() - return False - if data is None: - break - elif not data: - log(u"WebSocket close from remote %s." % format_peername(remote)) - remote.close() - local.close() - return False - try: - local.send_chunk(data) - except socket.error, e: - log(u"Socket error writing to local: %s" % repr(str(e))) - remote.close() - return False - return True - -def receive_unlinked(fd, label): - """Receive and buffer data on a socket that has not been linked yet. Returns - True iff there was no error and the socket may still be used; otherwise, the - socket will be closed before returning.""" - - try: - data = fd.recv(1024) - except socket.error, e: - log(u"Socket error from %s: %s" % (label, repr(str(e)))) - fd.close() - return False - if not data: - log(u"EOF from unlinked %s %s with %d bytes buffered." % (label, format_peername(fd), len(fd.buf))) - fd.close() - return False - else: - log(u"Data from unlinked %s %s (%d bytes)." % (label, format_peername(fd), len(data))) - fd.buf += data - if len(fd.buf) >= UNCONNECTED_BUFFER_LIMIT: - log(u"Refusing to buffer more than %d bytes from %s %s." % (UNCONNECTED_BUFFER_LIMIT, label, format_peername(fd))) - fd.close() - return False - return True - -def match_proxies(): - while unlinked_remotes and unlinked_locals: - remote = unlinked_remotes.pop(0) - local = unlinked_locals.pop(0) - remote_addr, remote_port = remote.getpeername() - local_addr, local_port = local.getpeername() - log(u"Linking %s and %s." % (format_peername(local), format_peername(remote))) - remote.partner = local - local.partner = remote - if remote.buf: - if not proxy_chunk_remote_to_local(remote, local, remote.buf): - remotes.remove(remote) - locals.remove(local) - register() - return - if local.buf: - if not proxy_chunk_local_to_remote(local, remote, local.buf): - remotes.remove(remote) - locals.remove(local) - return - -class TimeoutSocket(object): - def __init__(self, fd): - self.fd = fd - self.birthday = time.time() - - def age(self): - return time.time() - self.birthday - - def __getattr__(self, name): - return getattr(self.fd, name) - -class RemoteSocket(object): - def __init__(self, fd, protocols): - self.fd = fd - self.buf = "" - self.partner = None - self.dec = WebSocketBinaryDecoder(protocols, use_mask = True) - self.enc = WebSocketBinaryEncoder(protocols, use_mask = False) - - def send_chunk(self, data): - self.sendall(self.enc.encode(data)) - - def __getattr__(self, name): - return getattr(self.fd, name) - -class LocalSocket(object): - def __init__(self, fd): - self.fd = fd - self.buf = "" - self.partner = None - - def send_chunk(self, data): - self.sendall(data) - - def __getattr__(self, name): - return getattr(self.fd, name) - -def main(): - while True: - rset = [remote_s, local_s] + websocket_pending + socks_pending + locals + remotes - rset, _, _ = select.select(rset, [], [], WEBSOCKET_REQUEST_TIMEOUT) - for fd in rset: - if fd == remote_s: - remote_c, addr = fd.accept() - log(u"Remote connection from %s." % safe_format_addr(addr)) - websocket_pending.append(TimeoutSocket(remote_c)) - elif fd == local_s: - local_c, addr = fd.accept() - log(u"Local connection from %s." % safe_format_addr(addr)) - socks_pending.append(local_c) - register() - elif fd in websocket_pending: - log(u"Data from WebSocket-pending %s." % safe_format_addr(addr)) - protocols = handle_websocket_request(fd) - if protocols is not None: - wrapped = RemoteSocket(fd, protocols) - remotes.append(wrapped) - unlinked_remotes.append(wrapped) - else: - fd.close() - websocket_pending.remove(fd) - report_pending() - elif fd in socks_pending: - log(u"SOCKS request from %s." % safe_format_addr(addr)) - if handle_socks_request(fd): - wrapped = LocalSocket(fd) - locals.append(wrapped) - unlinked_locals.append(wrapped) - else: - fd.close() - socks_pending.remove(fd) - report_pending() - elif fd in remotes: - local = fd.partner - if local: - if not proxy_chunk_remote_to_local(fd, local): - remotes.remove(fd) - locals.remove(local) - register() - else: - if not receive_unlinked(fd, "remote"): - remotes.remove(fd) - unlinked_remotes.remove(fd) - register() - report_pending() - elif fd in locals: - remote = fd.partner - if remote: - if not proxy_chunk_local_to_remote(fd, remote): - remotes.remove(remote) - locals.remove(fd) - else: - if not receive_unlinked(fd, "local"): - locals.remove(fd) - unlinked_locals.remove(fd) - report_pending() - match_proxies() - while websocket_pending: - pending = websocket_pending[0] - if pending.age() < WEBSOCKET_REQUEST_TIMEOUT: - break - log(u"Expired remote connection from %s." % format_peername(pending)) - pending.close() - websocket_pending.pop(0) - report_pending() - -if __name__ == "__main__": - opts, args = getopt.gnu_getopt(sys.argv[1:], "f:hl:r", ["daemon", "facilitator=", "help", "log=", "pidfile=", "register", "unsafe-logging"]) - for o, a in opts: - if o == "--daemon": - options.daemonize = True - elif o == "-f" or o == "--facilitator": - options.facilitator_addr = parse_addr_spec(a) - elif o == "-h" or o == "--help": - usage() - sys.exit() - elif o == "-l" or o == "--log": - options.log_filename = a - elif o == "--pidfile": - options.pid_filename = a - elif o == "-r" or o == "--register": - options.register = True - elif o == "--unsafe-logging": - options.safe_logging = False - - if len(args) == 0: - options.local_addr = (DEFAULT_LOCAL_ADDRESS, DEFAULT_LOCAL_PORT) - options.remote_addr = (DEFAULT_REMOTE_ADDRESS, DEFAULT_REMOTE_PORT) - elif len(args) == 1: - options.local_addr = parse_addr_spec(args[0], DEFAULT_LOCAL_ADDRESS, DEFAULT_LOCAL_PORT) - options.remote_addr = (DEFAULT_REMOTE_ADDRESS, DEFAULT_REMOTE_PORT) - elif len(args) == 2: - options.local_addr = parse_addr_spec(args[0], DEFAULT_LOCAL_ADDRESS, DEFAULT_LOCAL_PORT) - options.remote_addr = parse_addr_spec(args[1], DEFAULT_REMOTE_ADDRESS, DEFAULT_REMOTE_PORT) - else: - usage(sys.stderr) - sys.exit(1) - - if options.log_filename: - options.log_file = open(options.log_filename, "a") - # Send error tracebacks to the log. - sys.stderr = options.log_file - else: - options.log_file = sys.stdout - - # Local socket, accepting SOCKS requests from localhost - local_s = listen_socket(options.local_addr) - # Remote socket, accepting remote WebSocket connections from proxies. - remote_s = listen_socket(options.remote_addr) - - # New remote sockets waiting to finish their WebSocket negotiation. - websocket_pending = [] - # Remote connection sockets. - remotes = [] - # Remotes not yet linked with a local. This is a subset of remotes. - unlinked_remotes = [] - # New local sockets waiting to finish their SOCKS negotiation. - socks_pending = [] - # Local Tor sockets, after SOCKS negotiation. - locals = [] - # Locals not yet linked with a remote. This is a subset of remotes. - unlinked_locals = [] - - register() - - if options.daemonize: - log(u"Daemonizing.") - pid = os.fork() - if pid != 0: - if options.pid_filename: - f = open(options.pid_filename, "w") - print >> f, pid - f.close() - sys.exit(0) - try: - main() - except Exception: - exc = traceback.format_exc() - log("".join(exc)) diff --git a/flashproxy-reg-http b/flashproxy-reg-http new file mode 100755 index 0000000..209f77e --- /dev/null +++ b/flashproxy-reg-http @@ -0,0 +1,111 @@ +#!/usr/bin/env python + +import getopt +import httplib +import socket +import re +import sys +import urllib + +DEFAULT_REMOTE_ADDR = "" +DEFAULT_REMOTE_PORT = 9000 +DEFAULT_FACILITATOR_HOST = "tor-facilitator.bamsoftware.com" +DEFAULT_FACILITATOR_PORT = 9002 + +class options(object): + facilitator_addr = None + remote_addr = None + +def usage(f = sys.stdout): + print >> f, """\ +Usage: %(progname)s [HOSTNAME][:PORT] +Register with a flash proxy facilitator using an HTTP POST. By default the +facilitator address is "%(fac_addr)s". + + -a, --address=ADDRESS register the given address instead of \"%(remote_addr)s\". + -h, --help show this help. \ +""" % { + "progname": sys.argv[0], + "fac_addr": format_addr((DEFAULT_FACILITATOR_HOST, DEFAULT_FACILITATOR_PORT)), + "remote_addr": format_addr((DEFAULT_REMOTE_ADDR, DEFAULT_REMOTE_PORT)), +} + +def parse_addr_spec(spec, defhost = None, defport = None): + host = None + port = None + m = None + # IPv6 syntax. + if not m: + m = re.match(ur'^\[(.+)\]:(\d+)$', spec) + if m: + host, port = m.groups() + af = socket.AF_INET6 + if not m: + m = re.match(ur'^\[(.+)\]:?$', spec) + if m: + host, = m.groups() + af = socket.AF_INET6 + # IPv4 syntax. + if not m: + m = re.match(ur'^(.+):(\d+)$', spec) + if m: + host, port = m.groups() + af = socket.AF_INET + if not m: + m = re.match(ur'^:?(\d+)$', spec) + if m: + port, = m.groups() + af = 0 + if not m: + host = spec + af = 0 + host = host or defhost + port = port or defport + if not port: + raise ValueError("Bad address specification \"%s\"" % spec) + return host, int(port) + +def format_addr(addr): + host, port = addr + if not host: + return u":%d" % port + # Numeric IPv6 address? + try: + addrs = socket.getaddrinfo(host, port, 0, socket.SOCK_STREAM, socket.IPPROTO_TCP, socket.AI_NUMERICHOST) + af = addrs[0][0] + except socket.gaierror, e: + af = 0 + if af == socket.AF_INET6: + return u"[%s]:%d" % (host, port) + else: + return u"%s:%d" % (host, port) + +options.facilitator_addr = (DEFAULT_FACILITATOR_HOST, DEFAULT_FACILITATOR_PORT) +options.remote_addr = (DEFAULT_REMOTE_ADDR, DEFAULT_REMOTE_PORT) + +opts, args = getopt.gnu_getopt(sys.argv[1:], "a:h", ["address=", "help"]) +for o, a in opts: + if o == "-a" or o == "--address": + options.remote_addr = parse_addr_spec(a, DEFAULT_REMOTE_ADDR, DEFAULT_REMOTE_PORT) + elif o == "-h" or o == "--help": + usage() + sys.exit() + +if len(args) == 0: + pass +elif len(args) == 1: + options.facilitator_addr = parse_addr_spec(args[0], DEFAULT_FACILITATOR_HOST, DEFAULT_FACILITATOR_PORT) +else: + usage(sys.stderr) + sys.exit(1) + +spec = format_addr(options.remote_addr) +http = httplib.HTTPConnection(*options.facilitator_addr) +try: + http.request("POST", "/", urllib.urlencode({"client": spec})) +except Exception, e: + print >> sys.stderr, "Failed to register: %s" % str(e) + sys.exit(1) +http.close() + +print "Registered \"%s\" with %s." % (spec, format_addr(options.facilitator_addr)) diff --git a/flashproxy-reg-http.py b/flashproxy-reg-http.py deleted file mode 100755 index 209f77e..0000000 --- a/flashproxy-reg-http.py +++ /dev/null @@ -1,111 +0,0 @@ -#!/usr/bin/env python - -import getopt -import httplib -import socket -import re -import sys -import urllib - -DEFAULT_REMOTE_ADDR = "" -DEFAULT_REMOTE_PORT = 9000 -DEFAULT_FACILITATOR_HOST = "tor-facilitator.bamsoftware.com" -DEFAULT_FACILITATOR_PORT = 9002 - -class options(object): - facilitator_addr = None - remote_addr = None - -def usage(f = sys.stdout): - print >> f, """\ -Usage: %(progname)s [HOSTNAME][:PORT] -Register with a flash proxy facilitator using an HTTP POST. By default the -facilitator address is "%(fac_addr)s". - - -a, --address=ADDRESS register the given address instead of \"%(remote_addr)s\". - -h, --help show this help. \ -""" % { - "progname": sys.argv[0], - "fac_addr": format_addr((DEFAULT_FACILITATOR_HOST, DEFAULT_FACILITATOR_PORT)), - "remote_addr": format_addr((DEFAULT_REMOTE_ADDR, DEFAULT_REMOTE_PORT)), -} - -def parse_addr_spec(spec, defhost = None, defport = None): - host = None - port = None - m = None - # IPv6 syntax. - if not m: - m = re.match(ur'^\[(.+)\]:(\d+)$', spec) - if m: - host, port = m.groups() - af = socket.AF_INET6 - if not m: - m = re.match(ur'^\[(.+)\]:?$', spec) - if m: - host, = m.groups() - af = socket.AF_INET6 - # IPv4 syntax. - if not m: - m = re.match(ur'^(.+):(\d+)$', spec) - if m: - host, port = m.groups() - af = socket.AF_INET - if not m: - m = re.match(ur'^:?(\d+)$', spec) - if m: - port, = m.groups() - af = 0 - if not m: - host = spec - af = 0 - host = host or defhost - port = port or defport - if not port: - raise ValueError("Bad address specification \"%s\"" % spec) - return host, int(port) - -def format_addr(addr): - host, port = addr - if not host: - return u":%d" % port - # Numeric IPv6 address? - try: - addrs = socket.getaddrinfo(host, port, 0, socket.SOCK_STREAM, socket.IPPROTO_TCP, socket.AI_NUMERICHOST) - af = addrs[0][0] - except socket.gaierror, e: - af = 0 - if af == socket.AF_INET6: - return u"[%s]:%d" % (host, port) - else: - return u"%s:%d" % (host, port) - -options.facilitator_addr = (DEFAULT_FACILITATOR_HOST, DEFAULT_FACILITATOR_PORT) -options.remote_addr = (DEFAULT_REMOTE_ADDR, DEFAULT_REMOTE_PORT) - -opts, args = getopt.gnu_getopt(sys.argv[1:], "a:h", ["address=", "help"]) -for o, a in opts: - if o == "-a" or o == "--address": - options.remote_addr = parse_addr_spec(a, DEFAULT_REMOTE_ADDR, DEFAULT_REMOTE_PORT) - elif o == "-h" or o == "--help": - usage() - sys.exit() - -if len(args) == 0: - pass -elif len(args) == 1: - options.facilitator_addr = parse_addr_spec(args[0], DEFAULT_FACILITATOR_HOST, DEFAULT_FACILITATOR_PORT) -else: - usage(sys.stderr) - sys.exit(1) - -spec = format_addr(options.remote_addr) -http = httplib.HTTPConnection(*options.facilitator_addr) -try: - http.request("POST", "/", urllib.urlencode({"client": spec})) -except Exception, e: - print >> sys.stderr, "Failed to register: %s" % str(e) - sys.exit(1) -http.close() - -print "Registered \"%s\" with %s." % (spec, format_addr(options.facilitator_addr)) diff --git a/init.d/facilitator b/init.d/facilitator index 152f03c..4e8ab1d 100755 --- a/init.d/facilitator +++ b/init.d/facilitator @@ -5,7 +5,7 @@ # # chkconfig: 2345 90 10 # description: Flash proxy facilitator. -# processname: facilitator.py +# processname: facilitator # pidfile: /var/flashproxy/facilitator.pid # Installation instructions: @@ -22,7 +22,7 @@ RELAY=... BINDIR=/usr/local/bin VARDIR=/var/flashproxy -PROG=$BINDIR/facilitator.py +PROG=$BINDIR/facilitator PIDFILE=$VARDIR/facilitator.pid USER=flashproxy diff --git a/torrc b/torrc index 216c292..4ee54e1 100644 --- a/torrc +++ b/torrc @@ -1,6 +1,6 @@ ## Configuration file for Tor over flash proxies. ## Usage: -## python flashproxy-client.py --register +## flashproxy-client --register ## tor -f torrc ClientTransportPlugin websocket socks4 127.0.0.1:9001
participants (1)
-
dcf@torproject.org