commit a482cca6aff72b40529d02dc1c6f0479dc2a76ee Author: David Fifield david@bamsoftware.com Date: Thu Aug 30 10:31:20 2012 -0700
Add prototype of new facilitator. --- facilitator | 316 +++++++++++++---------------------------------------------- 1 files changed, 68 insertions(+), 248 deletions(-)
diff --git a/facilitator b/facilitator index 2a04e71..61129c7 100755 --- a/facilitator +++ b/facilitator @@ -1,9 +1,6 @@ #!/usr/bin/env python
-import BaseHTTPServer import SocketServer -import cgi -import errno import getopt import os import re @@ -11,17 +8,16 @@ import socket import sys import threading import time -import urllib -import urlparse
-DEFAULT_ADDRESS = "0.0.0.0" -DEFAULT_PORT = 9002 +LISTEN_ADDRESS = "127.0.0.1" +DEFAULT_LISTEN_PORT = 9002 DEFAULT_RELAY_PORT = 9001 DEFAULT_LOG_FILENAME = "facilitator.log"
LOG_DATE_FORMAT = "%Y-%m-%d %H:%M:%S"
class options(object): + listen_port = DEFAULT_LISTEN_PORT log_filename = DEFAULT_LOG_FILENAME log_file = sys.stdout relay_spec = None @@ -38,30 +34,22 @@ class options(object):
def usage(f = sys.stdout): print >> f, """\ -Usage: %(progname)s -r RELAY <OPTIONS> [HOST] [PORT] -Flash proxy facilitator: Register client addresses with HTTP POST requests -and serve them out again with HTTP GET. Listen on HOST and PORT, by default -%(addr)s %(port)d. +Usage: %(progname)s -r RELAY <OPTIONS> +Flash proxy facilitator: Register client addresses and serve them out +again. Listen on 127.0.0.1 and port PORT (by default %(port)d). -d, --debug don't daemonize, log to stdout. -h, --help show this help. -l, --log FILENAME write log to FILENAME (default "%(log)s"). + -p, --port PORT listen on PORT (by default %(port)d). --pidfile FILENAME write PID to FILENAME after daemonizing. -r, --relay RELAY send RELAY (host:port) to proxies as the relay to use. - --unsafe-logging don't scrub IP addresses from logs.\ + --unsafe-logging don't scrub IP addresses from logs.\ """ % { "progname": sys.argv[0], - "addr": DEFAULT_ADDRESS, - "port": DEFAULT_PORT, + "port": DEFAULT_LISTEN_PORT, "log": DEFAULT_LOG_FILENAME, }
-def safe_str(s): - """Return s if options.safe_logging is true, and "[scrubbed]" otherwise.""" - if options.safe_logging: - return "[scrubbed]" - else: - return s - log_lock = threading.Lock() def log(msg): log_lock.acquire() @@ -121,244 +109,76 @@ def format_addr(addr): else: return u"%s:%d" % (host, port)
-class TCPReg(object): - def __init__(self, host, port): - self.host = host - self.port = port - - def __unicode__(self): - return format_addr((self.host, self.port)) - - def __str__(self): - return unicode(self).encode("UTF-8") - - def __cmp__(self, other): - if isinstance(other, TCPReg): - return cmp((self.host, self.port), (other.host, other.port)) - else: - return False - -class Reg(object): - @staticmethod - def parse(spec, defhost = None, defport = None): - try: - af, host, port = parse_addr_spec(spec, defhost, defport) - except ValueError: - pass - else: +class Handler(SocketServer.StreamRequestHandler): + def handle(self): + self.connection.settimeout(1.0) + while True: + data = self.connection.recv(1024) + print repr(data) + +class Server(SocketServer.ThreadingMixIn, SocketServer.TCPServer): + allow_reuse_address = True + +def main(): + opts, args = getopt.gnu_getopt(sys.argv[1:], "dhl:p:r:", + ["debug", "help", "log=", "port=", "pidfile=", "relay=", "unsafe-logging"]) + for o, a in opts: + if o == "-d" or o == "--debug": + options.daemonize = False + options.log_filename = None + elif o == "-h" or o == "--help": + usage() + sys.exit() + elif o == "-l" or o == "--log": + options.log_filename = a + elif o == "-p" or o == "--port": + options.listen_port = int(a) + elif o == "--pidfile": + options.pid_filename = a + elif o == "-r" or o == "--relay": try: - addrs = socket.getaddrinfo(host, port, af, socket.SOCK_STREAM, socket.IPPROTO_TCP, socket.AI_NUMERICHOST) + options.set_relay_spec(a) except socket.gaierror, e: - raise ValueError("Bad host or port: "%s" "%s": %s" % (host, port, str(e))) - if not addrs: - raise ValueError("Bad host or port: "%s" "%s"" % (host, port)) - - host, port = socket.getnameinfo(addrs[0][4], socket.NI_NUMERICHOST | socket.NI_NUMERICSERV) - return TCPReg(host, int(port)) - - raise ValueError("Bad spec format: %s" % repr(spec)) - -class RegSet(object): - def __init__(self): - self.set = [] - self.cv = threading.Condition() - - def add(self, reg): - self.cv.acquire() - try: - if reg not in list(self.set): - self.set.append(reg) - self.cv.notify() - return True - else: - return False - finally: - self.cv.release() - - def fetch(self): - self.cv.acquire() - try: - if not self.set: - return None - return self.set.pop(0) - finally: - self.cv.release() - - def __len__(self): - self.cv.acquire() - try: - return len(self.set) - finally: - self.cv.release() - -class Handler(BaseHTTPServer.BaseHTTPRequestHandler): - def do_GET(self): - proxy_addr_s = format_addr(self.client_address) - - path = urlparse.urlsplit(self.path)[2] - - reg = REGS.fetch() - if reg: - log(u"proxy %s gets %s, relay %s (now %d)" % - (safe_str(proxy_addr_s), safe_str(unicode(reg)), options.relay_spec, len(REGS))) - else: - log(u"proxy %s gets none" % safe_str(proxy_addr_s)) - self.send_client(reg) - - def do_POST(self): - client_addr_s = format_addr(self.client_address) + print >> sys.stderr, u"Can't resolve relay %s: %s" % (repr(a), str(e)) + sys.exit(1) + elif o == "--unsafe-logging": + options.safe_logging = False
- data = cgi.FieldStorage(fp = self.rfile, headers = self.headers, - environ = {"REQUEST_METHOD": "POST"}) - - client_spec = data.getfirst("client") - if client_spec is None: - log(u"client %s missing "client" param" % safe_str(client_addr_s)) - self.send_error(400) - return - - try: - reg = Reg.parse(client_spec, self.client_address[0]) - except ValueError, e: - log(u"client %s syntax error in %s: %s" - % (safe_str(client_addr_s), safe_str(repr(client_spec)), repr(str(e)))) - self.send_error(400) - return - - if REGS.add(reg): - log(u"client %s %s (now %d)" - % (safe_str(client_addr_s), safe_str(unicode(reg)), len(REGS))) - else: - log(u"client %s %s (already present, now %d)" - % (safe_str(client_addr_s), safe_str(unicode(reg)), len(REGS))) - - self.send_response(200) - self.end_headers() - - def send_error(self, code, message = None): - self.send_response(code) - self.end_headers() - if message: - self.wfile.write(message) - - def log_request(self, code): - pass - - def log_message(self, format, *args): - msg = format % args - log(u"message from HTTP handler for %s: %s" - % (format_addr(self.client_address), repr(msg))) - - def send_client(self, reg): - if reg: - client_str = str(reg) - else: - client_str = "" - self.send_response(200) - self.send_header("Content-Type", "application/x-www-form-urlencoded") - self.send_header("Cache-Control", "no-cache") - # Allow XMLHttpRequest from any domain. http://www.w3.org/TR/cors/. - self.send_header("Access-Control-Allow-Origin", "*") - self.end_headers() - - data = {} - data["client"] = client_str - data["relay"] = options.relay_spec - self.request.send(urllib.urlencode(data)) - - # Catch "broken pipe" errors that otherwise cause a stack trace in the log. - def catch_epipe(fn): - def ret(self, *args): - try: - fn(self, *args) - except socket.error, e: - try: - err_num = e.errno - except AttributeError: - # Before Python 2.6, exception can be a pair. - err_num, errstr = e - except: - raise - if err_num != errno.EPIPE: - raise - return ret - handle = catch_epipe(BaseHTTPServer.BaseHTTPRequestHandler.handle) - finish = catch_epipe(BaseHTTPServer.BaseHTTPRequestHandler.finish) - -REGS = RegSet() - -opts, args = getopt.gnu_getopt(sys.argv[1:], "dhl:r:", - ["debug", "help", "log=", "pidfile=", "relay=", "unsafe-logging"]) -for o, a in opts: - if o == "-d" or o == "--debug": - options.daemonize = False - options.log_filename = None - elif o == "-h" or o == "--help": - usage() - sys.exit() - elif o == "-l" or o == "--log": - options.log_filename = a - elif o == "--pidfile": - options.pid_filename = a - elif o == "-r" or o == "--relay": - try: - options.set_relay_spec(a) - except socket.gaierror, e: - print >> sys.stderr, u"Can't resolve relay %s: %s" % (repr(a), str(e)) - sys.exit(1) - elif o == "--unsafe-logging": - options.safe_logging = False - -if not options.relay_spec: - print >> sys.stderr, """\ + if not options.relay_spec: + print >> sys.stderr, """\ The -r option is required. Give it the relay that will be sent to proxies. -r HOST[:PORT]\ -""" - sys.exit(1) + """ + sys.exit(1)
-if len(args) == 0: - address = (DEFAULT_ADDRESS, DEFAULT_PORT) -elif len(args) == 1: - # Either HOST or PORT may be omitted; figure out which one. - if args[0].isdigit(): - address = (DEFAULT_ADDRESS, args[0]) + if options.log_filename: + options.log_file = open(options.log_filename, "a") + # Send error tracebacks to the log. + sys.stderr = options.log_file else: - address = (args[0], DEFAULT_PORT) -elif len(args) == 2: - address = (args[0], args[1]) -else: - usage(sys.stderr) - sys.exit(1) + options.log_file = sys.stdout
-if options.log_filename: - options.log_file = open(options.log_filename, "a") - # Send error tracebacks to the log. - sys.stderr = options.log_file -else: - options.log_file = sys.stdout + addrinfo = socket.getaddrinfo(LISTEN_ADDRESS, options.listen_port, 0, socket.SOCK_STREAM, socket.IPPROTO_TCP)[0]
-addrinfo = socket.getaddrinfo(address[0], address[1], 0, socket.SOCK_STREAM, socket.IPPROTO_TCP)[0] + server = Server(addrinfo[4], Handler)
-class Server(SocketServer.ThreadingMixIn, BaseHTTPServer.HTTPServer): - pass + log(u"start on %s" % format_addr(addrinfo[4])) + log(u"using relay address %s" % options.relay_spec)
-# Setup the server -server = Server(addrinfo[4], Handler) + if options.daemonize: + log(u"daemonizing") + pid = os.fork() + if pid != 0: + if options.pid_filename: + f = open(options.pid_filename, "w") + print >> f, pid + f.close() + sys.exit(0)
-log(u"start on %s" % format_addr(addrinfo[4])) -log(u"using relay address %s" % options.relay_spec) - -if options.daemonize: - log(u"daemonizing") - pid = os.fork() - if pid != 0: - if options.pid_filename: - f = open(options.pid_filename, "w") - print >> f, pid - f.close() + try: + server.serve_forever() + except KeyboardInterrupt: sys.exit(0)
-try: - server.serve_forever() -except KeyboardInterrupt: - sys.exit(0) +if __name__ == "__main__": + main()