commit b969a80fa9d90781e45d2d0c4a40fdbb0225bfb8 Author: David Fifield david@bamsoftware.com Date: Thu Jul 5 03:19:25 2012 -0700
Rename "connector" → "flashproxy-client" generally.
"Connector" was never a good name because this component listens on two different ports and doesn't connect to anything. It also doesn't make sense to invent new terminology when we fit into the pluggable transports model. --- Makefile | 4 +- README | 36 +- connector-test.py | 220 ------ connector.py | 932 ----------------------- doc/design.txt | 72 +- experiments/exercise/exercise.sh | 2 +- experiments/switching/local-http-alternating.sh | 4 +- experiments/switching/local-http-constant.sh | 4 +- experiments/switching/remote-tor-alternating.sh | 4 +- experiments/switching/remote-tor-constant.sh | 4 +- experiments/throughput/throughput.sh | 18 +- flashproxy-client-test.py | 224 ++++++ flashproxy-client.py | 932 +++++++++++++++++++++++ torrc | 2 +- 14 files changed, 1233 insertions(+), 1225 deletions(-)
diff --git a/Makefile b/Makefile index 9a2f10c..3b30d61 100644 --- a/Makefile +++ b/Makefile @@ -6,13 +6,13 @@ all:
install: mkdir -p $(BINDIR) - cp -f connector.py facilitator.py flashproxy-reg-http.py $(BINDIR) + cp -f flashproxy-client.py flashproxy-reg-http.py facilitator.py $(BINDIR)
clean: rm -f *.pyc
test: - ./connector-test.py + ./flashproxy-client-test.py ./flashproxy-test.js
.PHONY: all clean test diff --git a/README b/README index 1dbca7a..d14c9cc 100644 --- a/README +++ b/README @@ -6,25 +6,25 @@ means version 0.2.2.32 (released 2011-08-29) or later. All the flashproxy programs and source code can be downloaded this way: git clone https://git.torproject.org/flashproxy.git But as a user you only need these files: - https://gitweb.torproject.org/flashproxy.git/blob_plain/HEAD:/connector.py + https://gitweb.torproject.org/flashproxy.git/blob_plain/HEAD:/flashproxy-client.py https://gitweb.torproject.org/flashproxy.git/blob_plain/HEAD:/torrc
You must be able to receive TCP connections; unfortunately means that -you cannot be behind NAT. See the section "Using a public connector" -below to try out the system even behind NAT. - -1. Run the connector. - $ python connector.py --register - By default the connector listens on Internet-facing TCP port 9000. If - you have to use a different port (to get through a firewall, for - example), give it on the command lines like this (here using port +you cannot be behind NAT. See the section "Using a public client +transport plugin" below to try out the system even behind NAT. + +1. Run the client transport plugin. + $ python flashproxy-client.py --register + By default the transport plugin listens on Internet-facing TCP port + 9000. If you have to use a different port (to get through a firewall, + for example), give it on the command lines like this (here using port 8888): - $ python connector.py --register 127.0.0.1:9001 :8888 + $ python flashproxy-client.py --register 127.0.0.1:9001 :8888 2. Run Tor using the included torrc file. $ tor -f torrc
-Watch the output of connector.py and tor. From connector.py you are -expecting output lines like this: +Watch the output of flashproxy-client.py and tor. From +flashproxy-client.py you are expecting output lines like this: Remote connection from [scrubbed]. Local connection from [scrubbed]. Linking [scrubbed] and [scrubbed]. @@ -70,13 +70,13 @@ computer into a flash proxy as long as the page is open. http://crypto.stanford.edu/flashproxy/
-== Using a public connector +== Using a public client transport plugin
-Rather than running connector.py on your computer, you can use a public -connector. This way is not as realistic because all your Tor traffic -will first go to a public connector, which is at a fixed address and can -be easily blocked. However this is an easy way to try out the system -without having to do port forwarding. +Rather than running flashproxy-client.py on your computer, you can use a +public instance of it. This way is not as realistic because all your Tor +traffic will first go to a fixed address and can be easily blocked. +However this is an easy way to try out the system without having to do +port forwarding. $ tor ClientTransportPlugin "websocket socks4 tor-facilitator.bamsoftware.com:9999" UseBridges 1 Bridge "websocket 127.0.0.1:9001" LearnCircuitBuildTimeout 0
diff --git a/connector-test.py b/connector-test.py deleted file mode 100755 index 374b173..0000000 --- a/connector-test.py +++ /dev/null @@ -1,220 +0,0 @@ -#!/usr/bin/env python -# -*- coding: utf-8 -*- - -import socket -import subprocess -import unittest -from connector import parse_socks_request, WebSocketDecoder, WebSocketEncoder - -LOCAL_ADDRESS = ("127.0.0.1", 40000) -REMOTE_ADDRESS = ("127.0.0.1", 40001) - -class TestSocks(unittest.TestCase): - def test_parse_socks_request_empty(self): - self.assertRaises(ValueError, parse_socks_request, "") - def test_parse_socks_request_short(self): - self.assertRaises(ValueError, parse_socks_request, "\x04\x01\x99\x99\x01\x02\x03\x04") - def test_parse_socks_request_ip_userid_missing(self): - dest, port = parse_socks_request("\x04\x01\x99\x99\x01\x02\x03\x04\x00") - dest, port = parse_socks_request("\x04\x01\x99\x99\x01\x02\x03\x04\x00userid") - self.assertEqual((dest, port), ("1.2.3.4", 0x9999)) - def test_parse_socks_request_ip(self): - dest, port = parse_socks_request("\x04\x01\x99\x99\x01\x02\x03\x04userid\x00") - self.assertEqual((dest, port), ("1.2.3.4", 0x9999)) - def test_parse_socks_request_hostname_missing(self): - self.assertRaises(ValueError, parse_socks_request, "\x04\x01\x99\x99\x00\x00\x00\x01userid\x00") - self.assertRaises(ValueError, parse_socks_request, "\x04\x01\x99\x99\x00\x00\x00\x01userid\x00abc") - def test_parse_socks_request_hostname(self): - dest, port = parse_socks_request("\x04\x01\x99\x99\x00\x00\x00\x01userid\x00abc\x00") - -def read_frames(dec): - frames = [] - while True: - frame = dec.read_frame() - if frame is None: - break - frames.append((frame.fin, frame.opcode, frame.payload)) - return frames - -def read_messages(dec): - messages = [] - while True: - message = dec.read_message() - if message is None: - break - messages.append((message.opcode, message.payload)) - return messages - -class TestWebSocketDecoder(unittest.TestCase): - def test_rfc(self): - """Test samples from RFC 6455 
section 5.7.""" - TESTS = [ - ("\x81\x05\x48\x65\x6c\x6c\x6f", False, - [(True, 1, "Hello")], - [(1, u"Hello")]), - ("\x81\x85\x37\xfa\x21\x3d\x7f\x9f\x4d\x51\x58", True, - [(True, 1, "Hello")], - [(1, u"Hello")]), - ("\x01\x03\x48\x65\x6c\x80\x02\x6c\x6f", False, - [(False, 1, "Hel"), (True, 0, "lo")], - [(1, u"Hello")]), - ("\x89\x05\x48\x65\x6c\x6c\x6f", False, - [(True, 9, "Hello")], - [(9, u"Hello")]), - ("\x8a\x85\x37\xfa\x21\x3d\x7f\x9f\x4d\x51\x58", True, - [(True, 10, "Hello")], - [(10, u"Hello")]), - ("\x82\x7e\x01\x00" + "\x00" * 256, False, - [(True, 2, "\x00" * 256)], - [(2, "\x00" * 256)]), - ("\x82\x7f\x00\x00\x00\x00\x00\x01\x00\x00" + "\x00" * 65536, False, - [(True, 2, "\x00" * 65536)], - [(2, "\x00" * 65536)]), - ("\x82\x7f\x00\x00\x00\x00\x00\x01\x00\x03" + "ABCD" * 16384 + "XYZ", False, - [(True, 2, "ABCD" * 16384 + "XYZ")], - [(2, "ABCD" * 16384 + "XYZ")]), - ] - for data, use_mask, expected_frames, expected_messages in TESTS: - dec = WebSocketDecoder(use_mask = use_mask) - dec.feed(data) - actual_frames = read_frames(dec) - self.assertEqual(actual_frames, expected_frames) - - dec = WebSocketDecoder(use_mask = use_mask) - dec.feed(data) - actual_messages = read_messages(dec) - self.assertEqual(actual_messages, expected_messages) - - dec = WebSocketDecoder(use_mask = not use_mask) - dec.feed(data) - self.assertRaises(WebSocketDecoder.MaskingError, dec.read_frame) - - def test_empty_feed(self): - """Test that the decoder can handle a zero-byte feed.""" - dec = WebSocketDecoder() - self.assertEqual(dec.read_frame(), None) - dec.feed("") - self.assertEqual(dec.read_frame(), None) - dec.feed("\x81\x05H") - self.assertEqual(dec.read_frame(), None) - dec.feed("ello") - self.assertEqual(read_frames(dec), [(True, 1, u"Hello")]) - - def test_empty_frame(self): - """Test that a frame may contain a zero-byte payload.""" - dec = WebSocketDecoder() - dec.feed("\x81\x00") - self.assertEqual(read_frames(dec), [(True, 1, u"")]) - dec.feed("\x82\x00") - 
self.assertEqual(read_frames(dec), [(True, 2, "")]) - - def test_empty_message(self): - """Test that a message may have a zero-byte payload.""" - dec = WebSocketDecoder() - dec.feed("\x01\x00\x00\x00\x80\x00") - self.assertEqual(read_messages(dec), [(1, u"")]) - dec.feed("\x02\x00\x00\x00\x80\x00") - self.assertEqual(read_messages(dec), [(2, "")]) - - def test_interleaved_control(self): - """Test that control messages interleaved with fragmented messages are - returned.""" - dec = WebSocketDecoder() - dec.feed("\x89\x04PING\x01\x03Hel\x8a\x04PONG\x80\x02lo\x89\x04PING") - self.assertEqual(read_messages(dec), [(9, "PING"), (10, "PONG"), (1, u"Hello"), (9, "PING")]) - - def test_fragmented_control(self): - """Test that illegal fragmented control messages cause an error.""" - dec = WebSocketDecoder() - dec.feed("\x09\x04PING") - self.assertRaises(ValueError, dec.read_message) - - def test_zero_opcode(self): - """Test that it is an error for the first frame in a message to have an - opcode of 0.""" - dec = WebSocketDecoder() - dec.feed("\x80\x05Hello") - self.assertRaises(ValueError, dec.read_message) - dec = WebSocketDecoder() - dec.feed("\x00\x05Hello") - self.assertRaises(ValueError, dec.read_message) - - def test_nonzero_opcode(self): - """Test that every frame after the first must have a zero opcode.""" - dec = WebSocketDecoder() - dec.feed("\x01\x01H\x01\x02el\x80\x02lo") - self.assertRaises(ValueError, dec.read_message) - dec = WebSocketDecoder() - dec.feed("\x01\x01H\x00\x02el\x01\x02lo") - self.assertRaises(ValueError, dec.read_message) - - def test_utf8(self): - """Test that text frames (opcode 1) are decoded from UTF-8.""" - text = u"Hello World or Καλημέρα κόσμε or こんにちは 世界 or \U0001f639" - utf8_text = text.encode("utf-8") - dec = WebSocketDecoder() - dec.feed("\x81" + chr(len(utf8_text)) + utf8_text) - self.assertEqual(read_messages(dec), [(1, text)]) - - def test_wrong_utf8(self): - """Test that failed UTF-8 decoding causes an error.""" - TESTS = [ - 
"\xc0\x41", # Non-shortest form. - "\xc2", # Unfinished sequence. - ] - for test in TESTS: - dec = WebSocketDecoder() - dec.feed("\x81" + chr(len(test)) + test) - self.assertRaises(ValueError, dec.read_message) - - def test_overly_large_payload(self): - """Test that large payloads are rejected.""" - dec = WebSocketDecoder() - dec.feed("\x82\x7f\x00\x00\x00\x00\x01\x00\x00\x00") - self.assertRaises(ValueError, dec.read_frame) - -class TestWebSocketEncoder(unittest.TestCase): - def test_length(self): - """Test that payload lengths are encoded using the smallest number of - bytes.""" - TESTS = [(0, 0), (125, 0), (126, 2), (65535, 2), (65536, 8)] - for length, encoded_length in TESTS: - enc = WebSocketEncoder(use_mask = False) - eframe = enc.encode_frame(2, "\x00" * length) - self.assertEqual(len(eframe), 1 + 1 + encoded_length + length) - enc = WebSocketEncoder(use_mask = True) - eframe = enc.encode_frame(2, "\x00" * length) - self.assertEqual(len(eframe), 1 + 1 + encoded_length + 4 + length) - - def test_roundtrip(self): - TESTS = [ - (1, u"Hello world"), - (1, u"Hello \N{WHITE SMILING FACE}"), - ] - for opcode, payload in TESTS: - for use_mask in (False, True): - enc = WebSocketEncoder(use_mask = use_mask) - enc_message = enc.encode_message(opcode, payload) - dec = WebSocketDecoder(use_mask = use_mask) - dec.feed(enc_message) - self.assertEqual(read_messages(dec), [(opcode, payload)]) - -def format_address(addr): - return "%s:%d" % addr - -class TestConnectionLimit(unittest.TestCase): - def setUp(self): - self.p = subprocess.Popen(["./connector.py", format_address(LOCAL_ADDRESS), format_address(REMOTE_ADDRESS)]) - - def tearDown(self): - self.p.terminate() - - def test_remote_limit(self): - """Test that the connector limits the number of remote connections that - it will accept.""" - for i in range(5): - s = socket.create_connection(REMOTE_ADDRESS, 2) - self.assertRaises(socket.error, socket.create_connection, REMOTE_ADDRESS) - -if __name__ == "__main__": - 
unittest.main() diff --git a/connector.py b/connector.py deleted file mode 100755 index 8afd0c0..0000000 --- a/connector.py +++ /dev/null @@ -1,932 +0,0 @@ -#!/usr/bin/env python - -import array -import base64 -import cStringIO -import getopt -import httplib -import os -import re -import select -import socket -import struct -import subprocess -import sys -import time -import traceback -import urllib -import xml.sax.saxutils -import BaseHTTPServer - -try: - from hashlib import sha1 -except ImportError: - # Python 2.4 uses this name. - from sha import sha as sha1 - -try: - import numpy -except ImportError: - numpy = None - -DEFAULT_REMOTE_ADDRESS = "0.0.0.0" -DEFAULT_REMOTE_PORT = 9000 -DEFAULT_LOCAL_ADDRESS = "127.0.0.1" -DEFAULT_LOCAL_PORT = 9001 - -LOG_DATE_FORMAT = "%Y-%m-%d %H:%M:%S" - -class options(object): - local_addr = None - remote_addr = None - facilitator_addr = None - - log_filename = None - log_file = sys.stdout - daemonize = False - register = False - pid_filename = None - safe_logging = True - -# We accept up to this many bytes from a socket not yet matched with a partner -# before disconnecting it. -UNCONNECTED_BUFFER_LIMIT = 10240 - -def usage(f = sys.stdout): - print >> f, """\ -Usage: %(progname)s --register [LOCAL][:PORT] [REMOTE][:PORT] -Wait for connections on a local and a remote port. When any pair of connections -exists, data is ferried between them until one side is closed. By default -LOCAL is "%(local)s" and REMOTE is "%(remote)s". - -The local connection acts as a SOCKS4a proxy, but the host and port in the SOCKS -request are ignored and the local connection is always linked to a remote -connection. - -If the --register option is used, then your IP address will be sent to the -facilitator so that proxies can connect to you. You need to register in some way -in order to get any service. The --facilitator option allows controlling which -facilitator is used; if omitted, it uses a public default. - --daemon daemonize (Unix only). 
- -f, --facilitator=HOST[:PORT] advertise willingness to receive connections to - HOST:PORT. - -h, --help show this help. - -l, --log FILENAME write log to FILENAME (default stdout). - --pidfile FILENAME write PID to FILENAME after daemonizing. - -r, --register register with the facilitator. - --unsafe-logging don't scrub IP addresses from logs.\ -""" % { - "progname": sys.argv[0], - "local": format_addr((DEFAULT_LOCAL_ADDRESS, DEFAULT_LOCAL_PORT)), - "remote": format_addr((DEFAULT_REMOTE_ADDRESS, DEFAULT_REMOTE_PORT)), -} - -def safe_str(s): - """Return s if options.safe_logging is true, and "[scrubbed]" otherwise.""" - if options.safe_logging: - return "[scrubbed]" - else: - return s - -def log(msg): - print >> options.log_file, (u"%s %s" % (time.strftime(LOG_DATE_FORMAT), msg)).encode("UTF-8") - options.log_file.flush() - -def parse_addr_spec(spec, defhost = None, defport = None): - host = None - port = None - m = None - # IPv6 syntax. - if not m: - m = re.match(ur'^[(.+)]:(\d+)$', spec) - if m: - host, port = m.groups() - af = socket.AF_INET6 - if not m: - m = re.match(ur'^[(.+)]:?$', spec) - if m: - host, = m.groups() - af = socket.AF_INET6 - # IPv4 syntax. - if not m: - m = re.match(ur'^(.+):(\d+)$', spec) - if m: - host, port = m.groups() - af = socket.AF_INET - if not m: - m = re.match(ur'^:?(\d+)$', spec) - if m: - port, = m.groups() - af = 0 - if not m: - host = spec - af = 0 - host = host or defhost - port = port or defport - if not (host and port): - raise ValueError("Bad address specification "%s"" % spec) - return host, int(port) - -def format_addr(addr): - host, port = addr - if not host: - return u":%d" % port - # Numeric IPv6 address? 
- try: - addrs = socket.getaddrinfo(host, port, 0, socket.SOCK_STREAM, socket.IPPROTO_TCP, socket.AI_NUMERICHOST) - af = addrs[0][0] - except socket.gaierror, e: - af = 0 - if af == socket.AF_INET6: - return u"[%s]:%d" % (host, port) - else: - return u"%s:%d" % (host, port) - -def safe_format_addr(addr): - return safe_str(format_addr(addr)) - host, port = addr - if not host: - return u":%d" % port - # Numeric IPv6 address? - try: - addrs = socket.getaddrinfo(host, port, 0, socket.SOCK_STREAM, socket.IPPROTO_TCP, socket.AI_NUMERICHOST) - af = addrs[0][0] - except socket.gaierror, e: - af = 0 - if af == socket.AF_INET6: - return u"[%s]:%d" % (host, port) - else: - return u"%s:%d" % (host, port) - - - -def apply_mask_numpy(payload, mask_key): - if len(payload) == 0: - return "" - payload_a = numpy.frombuffer(payload, dtype="|u4", count=len(payload)//4) - m, = numpy.frombuffer(mask_key, dtype="|u4", count=1) - result = numpy.bitwise_xor(payload_a, m).tostring() - i = len(payload) // 4 * 4 - if i < len(payload): - remainder = [] - while i < len(payload): - remainder.append(chr(ord(payload[i]) ^ ord(mask_key[i % 4]))) - i += 1 - result = result + "".join(remainder) - return result - -def apply_mask_py(payload, mask_key): - result = array.array("B", payload) - m = array.array("B", mask_key) - i = 0 - while i < len(result) - 7: - result[i] ^= m[0] - result[i+1] ^= m[1] - result[i+2] ^= m[2] - result[i+3] ^= m[3] - result[i+4] ^= m[0] - result[i+5] ^= m[1] - result[i+6] ^= m[2] - result[i+7] ^= m[3] - i += 8 - while i < len(result): - result[i] ^= m[i%4] - i += 1 - return result.tostring() - -if numpy is not None: - apply_mask = apply_mask_numpy -else: - apply_mask = apply_mask_py - -class WebSocketFrame(object): - def __init__(self): - self.fin = False - self.opcode = None - self.payload = None - - def is_control(self): - return (self.opcode & 0x08) != 0 - -class WebSocketMessage(object): - def __init__(self): - self.opcode = None - self.payload = None - - def 
is_control(self): - return (self.opcode & 0x08) != 0 - -class WebSocketDecoder(object): - """RFC 6455 section 5 is about the WebSocket framing format.""" - # Raise an exception rather than buffer anything larger than this. - MAX_MESSAGE_LENGTH = 1024 * 1024 - - class MaskingError(ValueError): - pass - - def __init__(self, use_mask = False): - """use_mask should be True for server-to-client sockets, and False for - client-to-server sockets.""" - self.use_mask = use_mask - - # Per-frame state. - self.buf = "" - - # Per-message state. - self.message_buf = "" - self.message_opcode = None - - def feed(self, data): - self.buf += data - - def read_frame(self): - """Read a frame from the internal buffer, if one is available. Returns a - WebSocketFrame object, or None if there are no complete frames to - read.""" - # RFC 6255 section 5.2. - if len(self.buf) < 2: - return None - offset = 0 - b0, b1 = struct.unpack_from(">BB", self.buf, offset) - offset += 2 - fin = (b0 & 0x80) != 0 - opcode = b0 & 0x0f - frame_masked = (b1 & 0x80) != 0 - payload_len = b1 & 0x7f - - if payload_len == 126: - if len(self.buf) < offset + 2: - return None - payload_len, = struct.unpack_from(">H", self.buf, offset) - offset += 2 - elif payload_len == 127: - if len(self.buf) < offset + 8: - return None - payload_len, = struct.unpack_from(">Q", self.buf, offset) - offset += 8 - - if frame_masked: - if not self.use_mask: - # "A client MUST close a connection if it detects a masked - # frame." - raise self.MaskingError("Got masked payload from server") - if len(self.buf) < offset + 4: - return None - mask_key = self.buf[offset:offset+4] - offset += 4 - else: - if self.use_mask: - # "The server MUST close the connection upon receiving a frame - # that is not masked." 
- raise self.MaskingError("Got unmasked payload from client") - mask_key = None - - if payload_len > self.MAX_MESSAGE_LENGTH: - raise ValueError("Refusing to buffer payload of %d bytes" % payload_len) - - if len(self.buf) < offset + payload_len: - return None - if mask_key: - payload = apply_mask(self.buf[offset:offset+payload_len], mask_key) - else: - payload = self.buf[offset:offset+payload_len] - self.buf = self.buf[offset+payload_len:] - - frame = WebSocketFrame() - frame.fin = fin - frame.opcode = opcode - frame.payload = payload - - return frame - - def read_message(self): - """Read a complete message. If the opcode is 1, the payload is decoded - from a UTF-8 binary string to a unicode string. If a control frame is - read while another fragmented message is in progress, the control frame - is returned as a new message immediately. Returns None if there is no - complete frame to be read.""" - # RFC 6455 section 5.4 is about fragmentation. - while True: - frame = self.read_frame() - if frame is None: - return None - # "Control frames (see Section 5.5) MAY be injected in the middle of - # a fragmented message. Control frames themselves MUST NOT be - # fragmented." 
- if frame.is_control(): - if not frame.fin: - raise ValueError("Control frame (opcode %d) has FIN bit clear" % frame.opcode) - message = WebSocketMessage() - message.opcode = frame.opcode - message.payload = frame.payload - return message - - if self.message_opcode is None: - if frame.opcode == 0: - raise ValueError("First frame has opcode 0") - self.message_opcode = frame.opcode - else: - if frame.opcode != 0: - raise ValueError("Non-first frame has nonzero opcode %d" % frame.opcode) - self.message_buf += frame.payload - - if frame.fin: - break - message = WebSocketMessage() - message.opcode = self.message_opcode - message.payload = self.message_buf - self.postprocess_message(message) - self.message_opcode = None - self.message_buf = "" - - return message - - def postprocess_message(self, message): - if message.opcode == 1: - message.payload = message.payload.decode("utf-8") - return message - -class WebSocketEncoder(object): - def __init__(self, use_mask = False): - self.use_mask = use_mask - - def encode_frame(self, opcode, payload): - if opcode >= 16: - raise ValueError("Opcode of %d is >= 16" % opcode) - length = len(payload) - - if self.use_mask: - mask_key = os.urandom(4) - payload = apply_mask(payload, mask_key) - mask_bit = 0x80 - else: - mask_key = "" - mask_bit = 0x00 - - if length < 126: - len_b, len_ext = length, "" - elif length < 0x10000: - len_b, len_ext = 126, struct.pack(">H", length) - elif length < 0x10000000000000000: - len_b, len_ext = 127, struct.pack(">Q", length) - else: - raise ValueError("payload length of %d is too long" % length) - - return chr(0x80 | opcode) + chr(mask_bit | len_b) + len_ext + mask_key + payload - - def encode_message(self, opcode, payload): - if opcode == 1: - payload = payload.encode("utf-8") - return self.encode_frame(opcode, payload) - -# WebSocket implementations generally support text (opcode 1) messages, which -# are UTF-8-encoded text. Not all support binary (opcode 2) messages. 
During the -# WebSocket handshake, we use the "base64" value of the Sec-WebSocket-Protocol -# header field to indicate that text frames should encoded UTF-8-encoded -# base64-encoded binary data. Binary messages are always interpreted verbatim, -# but text messages are rejected if "base64" was not negotiated. -# -# The idea here is that browsers that know they don't support binary messages -# can negotiate "base64" with both endpoints and still reliably transport binary -# data. Those that know they can support binary messages can just use binary -# messages in the straightforward way. - -class WebSocketBinaryDecoder(object): - def __init__(self, protocols, use_mask = False): - self.dec = WebSocketDecoder(use_mask) - self.base64 = "base64" in protocols - - def feed(self, data): - self.dec.feed(data) - - def read(self): - """Returns None when there are currently no data to be read. Returns "" - when a close message is received.""" - while True: - message = self.dec.read_message() - if message is None: - return None - elif message.opcode == 1: - if not self.base64: - raise ValueError("Received text message on decoder incapable of base64") - payload = base64.b64decode(message.payload) - if payload: - return payload - elif message.opcode == 2: - if message.payload: - return message.payload - elif message.opcode == 8: - return "" - # Ignore all other opcodes. 
- return None - -class WebSocketBinaryEncoder(object): - def __init__(self, protocols, use_mask = False): - self.enc = WebSocketEncoder(use_mask) - self.base64 = "base64" in protocols - - def encode(self, data): - if self.base64: - return self.enc.encode_message(1, base64.b64encode(data)) - else: - return self.enc.encode_message(2, data) - - -def listen_socket(addr): - """Return a nonblocking socket listening on the given address.""" - addrinfo = socket.getaddrinfo(addr[0], addr[1], 0, socket.SOCK_STREAM, socket.IPPROTO_TCP)[0] - s = socket.socket(addrinfo[0], addrinfo[1], addrinfo[2]) - s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - s.bind(addr) - s.listen(10) - s.setblocking(0) - return s - -def format_peername(s): - try: - return safe_format_addr(s.getpeername()) - except socket.error, e: - return "<unconnected>" - -# How long to wait for a WebSocket request on the remote socket. It is limited -# to avoid Slowloris-like attacks. -WEBSOCKET_REQUEST_TIMEOUT = 2.0 - -class WebSocketRequestHandler(BaseHTTPServer.BaseHTTPRequestHandler): - def __init__(self, request_text, fd): - self.rfile = cStringIO.StringIO(request_text) - self.wfile = fd.makefile() - self.error = False - self.raw_requestline = self.rfile.readline() - self.parse_request() - - def log_message(self, *args): - pass - - def send_error(self, code, message = None): - BaseHTTPServer.BaseHTTPRequestHandler.send_error(self, code, message) - self.error = True - -MAGIC_GUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11" - -def handle_websocket_request(fd): - request_text = fd.recv(10 * 1024) - handler = WebSocketRequestHandler(request_text, fd) - if handler.error or not hasattr(handler, "path"): - return None - method = handler.command - path = handler.path - headers = handler.headers - - # See RFC 6455 section 4.2.1 for this sequence of checks. - # - # 1. An HTTP/1.1 or higher GET request, including a "Request-URI"... 
- if method != "GET": - handler.send_error(405) - return None - if path != "/": - handler.send_error(404) - return None - - # 2. A |Host| header field containing the server's authority. - # We deliberately skip this test. - - # 3. An |Upgrade| header field containing the value "websocket", treated as - # an ASCII case-insensitive value. - if "websocket" not in [x.strip().lower() for x in headers.get("upgrade").split(",")]: - handler.send_error(400) - return None - - # 4. A |Connection| header field that includes the token "Upgrade", treated - # as an ASCII case-insensitive value. - if "upgrade" not in [x.strip().lower() for x in headers.get("connection").split(",")]: - handler.send_error(400) - return None - - # 5. A |Sec-WebSocket-Key| header field with a base64-encoded value that, - # when decoded, is 16 bytes in length. - try: - key = headers.get("sec-websocket-key") - if len(base64.b64decode(key)) != 16: - raise TypeError("Sec-WebSocket-Key must be 16 bytes") - except TypeError: - handler.send_error(400) - return None - - # 6. A |Sec-WebSocket-Version| header field, with a value of 13. We also - # allow 8 from draft-ietf-hybi-thewebsocketprotocol-10. - version = headers.get("sec-websocket-version") - KNOWN_VERSIONS = ["8", "13"] - if version not in KNOWN_VERSIONS: - # "If this version does not match a version understood by the server, - # the server MUST abort the WebSocket handshake described in this - # section and instead send an appropriate HTTP error code (such as 426 - # Upgrade Required) and a |Sec-WebSocket-Version| header field - # indicating the version(s) the server is capable of understanding." - handler.send_response(426) - handler.send_header("Sec-WebSocket-Version", ", ".join(KNOWN_VERSIONS)) - handler.end_headers() - return None - - # 7. Optionally, an |Origin| header field. - - # 8. Optionally, a |Sec-WebSocket-Protocol| header field, with a list of - # values indicating which protocols the client would like to speak, ordered - # by preference. 
- protocols_str = headers.get("sec-websocket-protocol") - if protocols_str is None: - protocols = [] - else: - protocols = [x.strip().lower() for x in protocols_str.split(",")] - - # 9. Optionally, a |Sec-WebSocket-Extensions| header field... - - # 10. Optionally, other header fields... - - # See RFC 6455 section 4.2.2, item 5 for these steps. - - # 1. A Status-Line with a 101 response code as per RFC 2616. - handler.send_response(101) - # 2. An |Upgrade| header field with value "websocket" as per RFC 2616. - handler.send_header("Upgrade", "websocket") - # 3. A |Connection| header field with value "Upgrade". - handler.send_header("Connection", "Upgrade") - # 4. A |Sec-WebSocket-Accept| header field. The value of this header field - # is constructed by concatenating /key/, defined above in step 4 in Section - # 4.2.2, with the string "258EAFA5-E914-47DA-95CA-C5AB0DC85B11", taking the - # SHA-1 hash of this concatenated value to obtain a 20-byte value and - # base64-encoding (see Section 4 of [RFC4648]) this 20-byte hash. - accept_key = base64.b64encode(sha1(key + MAGIC_GUID).digest()) - handler.send_header("Sec-WebSocket-Accept", accept_key) - # 5. Optionally, a |Sec-WebSocket-Protocol| header field, with a value - # /subprotocol/ as defined in step 4 in Section 4.2.2. - if "base64" in protocols: - handler.send_header("Sec-WebSocket-Protocol", "base64") - # 6. Optionally, a |Sec-WebSocket-Extensions| header field... - - handler.end_headers() - - return protocols - -def grab_string(s, pos): - """Grab a NUL-terminated string from the given string, starting at the given - offset. Return (pos, str) tuple, or (pos, None) on error.""" - i = pos - while i < len(s): - if s[i] == '\0': - return (i + 1, s[pos:i]) - i += 1 - return pos, None - -# http://ftp.icm.edu.pl/packages/socks/socks4/SOCKS4.protocol -# https://en.wikipedia.org/wiki/SOCKS#SOCKS4a -def parse_socks_request(data): - """Parse the 8-byte SOCKS header at the beginning of data. Returns a - (dest, port) tuple. 
Raises ValueError on error.""" - try: - ver, cmd, dport, o1, o2, o3, o4 = struct.unpack(">BBHBBBB", data[:8]) - except struct.error: - raise ValueError("Couldn't unpack SOCKS4 header") - if ver != 4: - raise ValueError("Wrong SOCKS version (%d)" % ver) - if cmd != 1: - raise ValueError("Wrong SOCKS command (%d)" % cmd) - pos, userid = grab_string(data, 8) - if userid is None: - raise ValueError("Couldn't read userid") - if o1 == 0 and o2 == 0 and o3 == 0 and o4 != 0: - pos, dest = grab_string(data, pos) - if dest is None: - raise ValueError("Couldn't read destination") - else: - dest = "%d.%d.%d.%d" % (o1, o2, o3, o4) - return dest, dport - -def handle_socks_request(fd): - try: - addr = fd.getpeername() - data = fd.recv(100) - except socket.error, e: - log(u"Socket error from SOCKS-pending: %s" % repr(str(e))) - return False - try: - dest_addr = parse_socks_request(data) - except ValueError, e: - log(u"Error parsing SOCKS request: %s." % str(e)) - # Error reply. - fd.sendall(struct.pack(">BBHBBBB", 0, 91, 0, 0, 0, 0, 0)) - return False - log(u"Got SOCKS request for %s." % safe_format_addr(dest_addr)) - fd.sendall(struct.pack(">BBHBBBB", 0, 90, dest_addr[1], 127, 0, 0, 1)) - # Note we throw away the requested address and port. - return True - -def report_pending(): - log(u"locals (%d): %s" % (len(locals), [format_peername(x) for x in locals])) - log(u"remotes (%d): %s" % (len(remotes), [format_peername(x) for x in remotes])) - -def register(): - if not options.register: - return - - # sys.path[0] is initialized to the directory containing the Python script file. - script_dir = sys.path[0] - if not script_dir: - # Maybe the script was read from stdin; in any case don't guess at the directory. - return - command = [os.path.join(script_dir, "flashproxy-reg-http.py")] - spec = format_addr((None, options.remote_addr[1])) - if options.facilitator_addr is None: - log(u"Registering "%s"." 
% spec) - else: - command += [format_addr(options.facilitator_addr)] - command += ["-a", spec] - try: - p = subprocess.Popen(command) - except OSError, e: - log(u"Failed to register: %s" % str(e)) - -def proxy_chunk_local_to_remote(local, remote, data = None): - if data is None: - try: - data = local.recv(65536) - except socket.error, e: # Can be "Connection reset by peer". - log(u"Socket error from local: %s" % repr(str(e))) - remote.close() - return False - if not data: - log(u"EOF from local %s." % format_peername(local)) - local.close() - remote.close() - return False - else: - try: - remote.send_chunk(data) - except socket.error, e: - log(u"Socket error writing to remote: %s" % repr(str(e))) - local.close() - return False - return True - -def proxy_chunk_remote_to_local(remote, local, data = None): - if data is None: - try: - data = remote.recv(65536) - except socket.error, e: # Can be "Connection reset by peer". - log(u"Socket error from remote: %s" % repr(str(e))) - local.close() - return False - if not data: - log(u"EOF from remote %s." % format_peername(remote)) - remote.close() - local.close() - return False - else: - remote.dec.feed(data) - while True: - try: - data = remote.dec.read() - except (WebSocketDecoder.MaskingError, ValueError), e: - log(u"WebSocket decode error from remote: %s" % repr(str(e))) - remote.close() - local.close() - return False - if data is None: - break - elif not data: - log(u"WebSocket close from remote %s." % format_peername(remote)) - remote.close() - local.close() - return False - try: - local.send_chunk(data) - except socket.error, e: - log(u"Socket error writing to local: %s" % repr(str(e))) - remote.close() - return False - return True - -def receive_unlinked(fd, label): - """Receive and buffer data on a socket that has not been linked yet. 
Returns - True iff there was no error and the socket may still be used; otherwise, the - socket will be closed before returning.""" - - try: - data = fd.recv(1024) - except socket.error, e: - log(u"Socket error from %s: %s" % (label, repr(str(e)))) - fd.close() - return False - if not data: - log(u"EOF from unlinked %s %s with %d bytes buffered." % (label, format_peername(fd), len(fd.buf))) - fd.close() - return False - else: - log(u"Data from unlinked %s %s (%d bytes)." % (label, format_peername(fd), len(data))) - fd.buf += data - if len(fd.buf) >= UNCONNECTED_BUFFER_LIMIT: - log(u"Refusing to buffer more than %d bytes from %s %s." % (UNCONNECTED_BUFFER_LIMIT, label, format_peername(fd))) - fd.close() - return False - return True - -def match_proxies(): - while unlinked_remotes and unlinked_locals: - remote = unlinked_remotes.pop(0) - local = unlinked_locals.pop(0) - remote_addr, remote_port = remote.getpeername() - local_addr, local_port = local.getpeername() - log(u"Linking %s and %s." 
% (format_peername(local), format_peername(remote))) - remote.partner = local - local.partner = remote - if remote.buf: - if not proxy_chunk_remote_to_local(remote, local, remote.buf): - remotes.remove(remote) - locals.remove(local) - register() - return - if local.buf: - if not proxy_chunk_local_to_remote(local, remote, local.buf): - remotes.remove(remote) - locals.remove(local) - return - -class TimeoutSocket(object): - def __init__(self, fd): - self.fd = fd - self.birthday = time.time() - - def age(self): - return time.time() - self.birthday - - def __getattr__(self, name): - return getattr(self.fd, name) - -class RemoteSocket(object): - def __init__(self, fd, protocols): - self.fd = fd - self.buf = "" - self.partner = None - self.dec = WebSocketBinaryDecoder(protocols, use_mask = True) - self.enc = WebSocketBinaryEncoder(protocols, use_mask = False) - - def send_chunk(self, data): - self.sendall(self.enc.encode(data)) - - def __getattr__(self, name): - return getattr(self.fd, name) - -class LocalSocket(object): - def __init__(self, fd): - self.fd = fd - self.buf = "" - self.partner = None - - def send_chunk(self, data): - self.sendall(data) - - def __getattr__(self, name): - return getattr(self.fd, name) - -def main(): - while True: - rset = [remote_s, local_s] + websocket_pending + socks_pending + locals + remotes - rset, _, _ = select.select(rset, [], [], WEBSOCKET_REQUEST_TIMEOUT) - for fd in rset: - if fd == remote_s: - remote_c, addr = fd.accept() - log(u"Remote connection from %s." % safe_format_addr(addr)) - websocket_pending.append(TimeoutSocket(remote_c)) - elif fd == local_s: - local_c, addr = fd.accept() - log(u"Local connection from %s." % safe_format_addr(addr)) - socks_pending.append(local_c) - register() - elif fd in websocket_pending: - log(u"Data from WebSocket-pending %s." 
% safe_format_addr(addr)) - protocols = handle_websocket_request(fd) - if protocols is not None: - wrapped = RemoteSocket(fd, protocols) - remotes.append(wrapped) - unlinked_remotes.append(wrapped) - else: - fd.close() - websocket_pending.remove(fd) - report_pending() - elif fd in socks_pending: - log(u"SOCKS request from %s." % safe_format_addr(addr)) - if handle_socks_request(fd): - wrapped = LocalSocket(fd) - locals.append(wrapped) - unlinked_locals.append(wrapped) - else: - fd.close() - socks_pending.remove(fd) - report_pending() - elif fd in remotes: - local = fd.partner - if local: - if not proxy_chunk_remote_to_local(fd, local): - remotes.remove(fd) - locals.remove(local) - register() - else: - if not receive_unlinked(fd, "remote"): - remotes.remove(fd) - unlinked_remotes.remove(fd) - register() - report_pending() - elif fd in locals: - remote = fd.partner - if remote: - if not proxy_chunk_local_to_remote(fd, remote): - remotes.remove(remote) - locals.remove(fd) - else: - if not receive_unlinked(fd, "local"): - locals.remove(fd) - unlinked_locals.remove(fd) - report_pending() - match_proxies() - while websocket_pending: - pending = websocket_pending[0] - if pending.age() < WEBSOCKET_REQUEST_TIMEOUT: - break - log(u"Expired remote connection from %s." 
% format_peername(pending)) - pending.close() - websocket_pending.pop(0) - report_pending() - -if __name__ == "__main__": - opts, args = getopt.gnu_getopt(sys.argv[1:], "f:hl:r", ["daemon", "facilitator=", "help", "log=", "pidfile=", "register", "unsafe-logging"]) - for o, a in opts: - if o == "--daemon": - options.daemonize = True - elif o == "-f" or o == "--facilitator": - options.facilitator_addr = parse_addr_spec(a) - elif o == "-h" or o == "--help": - usage() - sys.exit() - elif o == "-l" or o == "--log": - options.log_filename = a - elif o == "--pidfile": - options.pid_filename = a - elif o == "-r" or o == "--register": - options.register = True - elif o == "--unsafe-logging": - options.safe_logging = False - - if len(args) == 0: - options.local_addr = (DEFAULT_LOCAL_ADDRESS, DEFAULT_LOCAL_PORT) - options.remote_addr = (DEFAULT_REMOTE_ADDRESS, DEFAULT_REMOTE_PORT) - elif len(args) == 1: - options.local_addr = parse_addr_spec(args[0], DEFAULT_LOCAL_ADDRESS, DEFAULT_LOCAL_PORT) - options.remote_addr = (DEFAULT_REMOTE_ADDRESS, DEFAULT_REMOTE_PORT) - elif len(args) == 2: - options.local_addr = parse_addr_spec(args[0], DEFAULT_LOCAL_ADDRESS, DEFAULT_LOCAL_PORT) - options.remote_addr = parse_addr_spec(args[1], DEFAULT_REMOTE_ADDRESS, DEFAULT_REMOTE_PORT) - else: - usage(sys.stderr) - sys.exit(1) - - if options.log_filename: - options.log_file = open(options.log_filename, "a") - # Send error tracebacks to the log. - sys.stderr = options.log_file - else: - options.log_file = sys.stdout - - # Local socket, accepting SOCKS requests from localhost - local_s = listen_socket(options.local_addr) - # Remote socket, accepting remote WebSocket connections from proxies. - remote_s = listen_socket(options.remote_addr) - - # New remote sockets waiting to finish their WebSocket negotiation. - websocket_pending = [] - # Remote connection sockets. - remotes = [] - # Remotes not yet linked with a local. This is a subset of remotes. 
- unlinked_remotes = [] - # New local sockets waiting to finish their SOCKS negotiation. - socks_pending = [] - # Local Tor sockets, after SOCKS negotiation. - locals = [] - # Locals not yet linked with a remote. This is a subset of remotes. - unlinked_locals = [] - - register() - - if options.daemonize: - log(u"Daemonizing.") - pid = os.fork() - if pid != 0: - if options.pid_filename: - f = open(options.pid_filename, "w") - print >> f, pid - f.close() - sys.exit(0) - try: - main() - except Exception: - exc = traceback.format_exc() - log("".join(exc)) diff --git a/doc/design.txt b/doc/design.txt index c8141dd..71cf554 100644 --- a/doc/design.txt +++ b/doc/design.txt @@ -38,14 +38,15 @@ Design of flash proxies 1. Tor client: Is just ordinary Tor with a special configuration to allow it to connect through a flash proxy. It advertises its need for a connection the the facilitator, and communicates with the - flash proxy through the connector. - 2. Connector: Runs on the same computer as the Tor client. It opens - one socket to the Internet and another to localhost. It waits for a - connection on both sockets, then starts proxying data between them. - The connector speaks SOCKS on the localhost side so that Tor can - connect to it using the Socks4Proxy configuration option. On - startup, the connector informs the facilitator that it is waiting - for a connection. + flash proxy through the client transport plugin. + 2. Client transport plugin: Runs on the same computer as the Tor + client. It opens one socket to the Internet and another to + localhost. It waits for a connection on both sockets, then starts + proxying data between them. The transport plugin speaks SOCKS on + the localhost side so that it can work as a pluggable transport for + Tor using the ClientTransportPlugin configuration option. On + startup, the transport plugin registers with the facilitator to + inform the facilitator that it is waiting for a connection. 3. 
Flash proxy: Runs in someone's browser, in an uncensored region of the Internet. The flash proxy first connects to the facilitator to get a client registration. It then makes two outgoing connections, @@ -60,17 +61,18 @@ Design of flash proxies
4. Sample session
- 1. The restricted Tor user starts the connector program. - 2. The restricted user starts Tor, which makes a SOCKS connection to - the connector. - 3. The connector notifies the facilitator that it needs a connection. + 1. The restricted Tor user starts the client transport plugin. + 2. The client transport plugin notifies the facilitator that it needs + a connection. + 3. The restricted user starts Tor, which connects to the client + transport plugin. 4. An unrestricted user opens the web page containing the flash proxy. 5. The flash proxy connects to the facilitator and asks for a client. 6. The facilitator sends one of its client registrations to the proxy. 7. The flash proxy connects to a Tor relay and to the waiting client - connector. - 8. The connector receives the flash proxy's connection and begins - relaying data between it and the Tor relay. + transport plugin. + 8. The client transport plugin receives the flash proxy's connection + and begins relaying data between it and the Tor relay.
Later, the flash proxy may go offline. Assuming that another flash proxy is available, it will receive the same client's address from the @@ -80,22 +82,24 @@ Design of flash proxies 5. Behavior of the Tor client
The Tor client must be configured to make its connections through a - local proxy (the connector). This configuration is sufficient: + local proxy (the client transport plugin). This configuration is + sufficient: UseBridges 1 Bridge 127.0.0.1:9001 Socks4Proxy 127.0.0.1:9001 The address given for the "Bridge" option is actually irrelevant. The - connector will ignore it and connect (through the flash proxy) to a - Tor relay. The Tor client does not have control of its first hop. + client transport plugin will ignore it and connect (through the flash + proxy) to a Tor relay. The Tor client does not have control of its + first hop.
-6. Behavior of the connector +6. Behavior of the client transport plugin
- The connector serves two purposes: It sends a registration message to - the facilitator and it carries data between a flash proxy and the - local Tor client. + The client transport plugin serves two purposes: It sends a + registration message to the facilitator and it carries data between a + flash proxy and the local Tor client.
- On startup, the connector sends a registration message to the - facilitator, informing the facilitator that it is waiting for a + On startup, the client transport plugin sends a registration message + to the facilitator, informing the facilitator that it is waiting for a connection. The facilitator will later hand this registration to a flash proxy. The registration message is an HTTP POST request of the form: @@ -105,16 +109,16 @@ Design of flash proxies client=[<address>]:<port>
The facilitator sends a 200 reply if the registration was successful - and an error status otherwise. If the connector omits the [<address>] - part, the facilitator will automatically fill it in based on the HTTP - client address, which means the connector doesn't have to know its - external address. - - The connector solves the impedance mismatch between the Tor client and - the flash proxy, both of which want to make outgoing connections to - the other. The connector sits in between, listens for connections from - both ends, and matches them together. The remote socket listens on - port 9000 and the local on port 9001. + and an error status otherwise. If the transport plugin omits the + [<address>] part, the facilitator will automatically fill it in based + on the HTTP client address, which means the transport plugin doesn't + have to know its external address. + + The client transport plugin solves the impedance mismatch between the + Tor client and the flash proxy, both of which want to make outgoing + connections to the other. The transport plugin sits in between, + listens for connections from both ends, and matches them together. The + remote socket listens on port 9000 and the local on port 9001.
On the local side, it acts as a SOCKS proxy (albeit one that always goes to the same destination). diff --git a/experiments/exercise/exercise.sh b/experiments/exercise/exercise.sh index 117b2f5..554e5fb 100755 --- a/experiments/exercise/exercise.sh +++ b/experiments/exercise/exercise.sh @@ -21,7 +21,7 @@ trap stop EXIT date
cd "$FLASHPROXY_DIR" -./connector.py --register ":$LOCAL_PORT" ":$REMOTE_PORT" & +./flashproxy-client.py --register ":$LOCAL_PORT" ":$REMOTE_PORT" & PIDS_TO_KILL+=($!)
sleep 20 diff --git a/experiments/switching/local-http-alternating.sh b/experiments/switching/local-http-alternating.sh index ee2dd70..e0807e2 100755 --- a/experiments/switching/local-http-alternating.sh +++ b/experiments/switching/local-http-alternating.sh @@ -40,8 +40,8 @@ echo "Start facilitator." PIDS_TO_KILL+=($!) visible_sleep 5
-echo "Start connector." -"$FLASHPROXY_DIR"/connector.py --register --facilitator 127.0.0.1:9002 >/dev/null & +echo "Start client transport plugin." +"$FLASHPROXY_DIR"/flashproxy-client.py --register --facilitator 127.0.0.1:9002 >/dev/null & PIDS_TO_KILL+=($!) visible_sleep 1
diff --git a/experiments/switching/local-http-constant.sh b/experiments/switching/local-http-constant.sh index 4eb51b2..9efaafd 100755 --- a/experiments/switching/local-http-constant.sh +++ b/experiments/switching/local-http-constant.sh @@ -44,8 +44,8 @@ echo "Start facilitator." PIDS_TO_KILL+=($!) visible_sleep 5
-echo "Start connector." -"$FLASHPROXY_DIR"/connector.py --register --facilitator 127.0.0.1:9002 >/dev/null & +echo "Start client transport plugin." +"$FLASHPROXY_DIR"/flashproxy-client.py --register --facilitator 127.0.0.1:9002 >/dev/null & PIDS_TO_KILL+=($!) visible_sleep 1
diff --git a/experiments/switching/remote-tor-alternating.sh b/experiments/switching/remote-tor-alternating.sh index 4b0df87..e4ad8c0 100755 --- a/experiments/switching/remote-tor-alternating.sh +++ b/experiments/switching/remote-tor-alternating.sh @@ -37,8 +37,8 @@ echo "Start facilitator." PIDS_TO_KILL+=($!) visible_sleep 15
-echo "Start connector." -"$FLASHPROXY_DIR"/connector.py --register --facilitator 127.0.0.1:9002 >/dev/null & +echo "Start client transport plugin." +"$FLASHPROXY_DIR"/flashproxy-client.py --register --facilitator 127.0.0.1:9002 >/dev/null & PIDS_TO_KILL+=($!) visible_sleep 1
diff --git a/experiments/switching/remote-tor-constant.sh b/experiments/switching/remote-tor-constant.sh index 7320999..a26a323 100755 --- a/experiments/switching/remote-tor-constant.sh +++ b/experiments/switching/remote-tor-constant.sh @@ -36,8 +36,8 @@ echo "Start facilitator." PIDS_TO_KILL+=($!) visible_sleep 15
-echo "Start connector." -"$FLASHPROXY_DIR"/connector.py --register --facilitator 127.0.0.1:9002 >/dev/null & +echo "Start client transport plugin." +"$FLASHPROXY_DIR"/flashproxy-client.py --register --facilitator 127.0.0.1:9002 >/dev/null & PIDS_TO_KILL+=($!) visible_sleep 1
diff --git a/experiments/throughput/throughput.sh b/experiments/throughput/throughput.sh index 8befe65..2290e94 100755 --- a/experiments/throughput/throughput.sh +++ b/experiments/throughput/throughput.sh @@ -2,15 +2,15 @@
# Usage: ./throughput.sh [-n NUM_CLIENTS] # -# Tests the raw throughput of a single proxy. This script starts a web server -# serving swfcat.swf and a large data file, starts a facilitator, connector, and -# socat shim, and then starts multiple downloads through the proxy at once. -# Results are saved in a file called results-NUM_CLIENTS-DATE, where DATE is the -# current date. +# Tests the raw throughput of a single proxy. This script starts a web +# server serving swfcat.swf and a large data file, starts a facilitator, +# client transport plugin, and socat shim, and then starts multiple +# downloads through the proxy at once. Results are saved in a file +# called results-NUM_CLIENTS-DATE, where DATE is the current date.
# plain socks ws ws plain -# httpget <---> socat <---> connector <---> flashproxy <---> websockify <---> thttpd -# 2000 9001 9000 8001 8000 +# httpget <---> socat <---> flashproxy-client <---> flashproxy <---> websockify <---> thttpd +# 2000 9001 9000 8001 8000
. ../common.sh
@@ -57,8 +57,8 @@ echo "Start facilitator." PIDS_TO_KILL+=($!) visible_sleep 1
-echo "Start connector." -"$FLASHPROXY_DIR"/connector.py >/dev/null & +echo "Start client transport plugin." +"$FLASHPROXY_DIR"/flashproxy-client.py >/dev/null & PIDS_TO_KILL+=($!) visible_sleep 1
diff --git a/flashproxy-client-test.py b/flashproxy-client-test.py new file mode 100755 index 0000000..35d49ed --- /dev/null +++ b/flashproxy-client-test.py @@ -0,0 +1,224 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +import socket +import subprocess +import unittest +flashproxy = __import__("flashproxy-client") +parse_socks_request = flashproxy.parse_socks_request +WebSocketDecoder = flashproxy.WebSocketDecoder +WebSocketEncoder = flashproxy.WebSocketEncoder +del flashproxy + +LOCAL_ADDRESS = ("127.0.0.1", 40000) +REMOTE_ADDRESS = ("127.0.0.1", 40001) + +class TestSocks(unittest.TestCase): + def test_parse_socks_request_empty(self): + self.assertRaises(ValueError, parse_socks_request, "") + def test_parse_socks_request_short(self): + self.assertRaises(ValueError, parse_socks_request, "\x04\x01\x99\x99\x01\x02\x03\x04") + def test_parse_socks_request_ip_userid_missing(self): + dest, port = parse_socks_request("\x04\x01\x99\x99\x01\x02\x03\x04\x00") + dest, port = parse_socks_request("\x04\x01\x99\x99\x01\x02\x03\x04\x00userid") + self.assertEqual((dest, port), ("1.2.3.4", 0x9999)) + def test_parse_socks_request_ip(self): + dest, port = parse_socks_request("\x04\x01\x99\x99\x01\x02\x03\x04userid\x00") + self.assertEqual((dest, port), ("1.2.3.4", 0x9999)) + def test_parse_socks_request_hostname_missing(self): + self.assertRaises(ValueError, parse_socks_request, "\x04\x01\x99\x99\x00\x00\x00\x01userid\x00") + self.assertRaises(ValueError, parse_socks_request, "\x04\x01\x99\x99\x00\x00\x00\x01userid\x00abc") + def test_parse_socks_request_hostname(self): + dest, port = parse_socks_request("\x04\x01\x99\x99\x00\x00\x00\x01userid\x00abc\x00") + +def read_frames(dec): + frames = [] + while True: + frame = dec.read_frame() + if frame is None: + break + frames.append((frame.fin, frame.opcode, frame.payload)) + return frames + +def read_messages(dec): + messages = [] + while True: + message = dec.read_message() + if message is None: + break + 
messages.append((message.opcode, message.payload)) + return messages + +class TestWebSocketDecoder(unittest.TestCase): + def test_rfc(self): + """Test samples from RFC 6455 section 5.7.""" + TESTS = [ + ("\x81\x05\x48\x65\x6c\x6c\x6f", False, + [(True, 1, "Hello")], + [(1, u"Hello")]), + ("\x81\x85\x37\xfa\x21\x3d\x7f\x9f\x4d\x51\x58", True, + [(True, 1, "Hello")], + [(1, u"Hello")]), + ("\x01\x03\x48\x65\x6c\x80\x02\x6c\x6f", False, + [(False, 1, "Hel"), (True, 0, "lo")], + [(1, u"Hello")]), + ("\x89\x05\x48\x65\x6c\x6c\x6f", False, + [(True, 9, "Hello")], + [(9, u"Hello")]), + ("\x8a\x85\x37\xfa\x21\x3d\x7f\x9f\x4d\x51\x58", True, + [(True, 10, "Hello")], + [(10, u"Hello")]), + ("\x82\x7e\x01\x00" + "\x00" * 256, False, + [(True, 2, "\x00" * 256)], + [(2, "\x00" * 256)]), + ("\x82\x7f\x00\x00\x00\x00\x00\x01\x00\x00" + "\x00" * 65536, False, + [(True, 2, "\x00" * 65536)], + [(2, "\x00" * 65536)]), + ("\x82\x7f\x00\x00\x00\x00\x00\x01\x00\x03" + "ABCD" * 16384 + "XYZ", False, + [(True, 2, "ABCD" * 16384 + "XYZ")], + [(2, "ABCD" * 16384 + "XYZ")]), + ] + for data, use_mask, expected_frames, expected_messages in TESTS: + dec = WebSocketDecoder(use_mask = use_mask) + dec.feed(data) + actual_frames = read_frames(dec) + self.assertEqual(actual_frames, expected_frames) + + dec = WebSocketDecoder(use_mask = use_mask) + dec.feed(data) + actual_messages = read_messages(dec) + self.assertEqual(actual_messages, expected_messages) + + dec = WebSocketDecoder(use_mask = not use_mask) + dec.feed(data) + self.assertRaises(WebSocketDecoder.MaskingError, dec.read_frame) + + def test_empty_feed(self): + """Test that the decoder can handle a zero-byte feed.""" + dec = WebSocketDecoder() + self.assertEqual(dec.read_frame(), None) + dec.feed("") + self.assertEqual(dec.read_frame(), None) + dec.feed("\x81\x05H") + self.assertEqual(dec.read_frame(), None) + dec.feed("ello") + self.assertEqual(read_frames(dec), [(True, 1, u"Hello")]) + + def test_empty_frame(self): + """Test that a frame 
may contain a zero-byte payload.""" + dec = WebSocketDecoder() + dec.feed("\x81\x00") + self.assertEqual(read_frames(dec), [(True, 1, u"")]) + dec.feed("\x82\x00") + self.assertEqual(read_frames(dec), [(True, 2, "")]) + + def test_empty_message(self): + """Test that a message may have a zero-byte payload.""" + dec = WebSocketDecoder() + dec.feed("\x01\x00\x00\x00\x80\x00") + self.assertEqual(read_messages(dec), [(1, u"")]) + dec.feed("\x02\x00\x00\x00\x80\x00") + self.assertEqual(read_messages(dec), [(2, "")]) + + def test_interleaved_control(self): + """Test that control messages interleaved with fragmented messages are + returned.""" + dec = WebSocketDecoder() + dec.feed("\x89\x04PING\x01\x03Hel\x8a\x04PONG\x80\x02lo\x89\x04PING") + self.assertEqual(read_messages(dec), [(9, "PING"), (10, "PONG"), (1, u"Hello"), (9, "PING")]) + + def test_fragmented_control(self): + """Test that illegal fragmented control messages cause an error.""" + dec = WebSocketDecoder() + dec.feed("\x09\x04PING") + self.assertRaises(ValueError, dec.read_message) + + def test_zero_opcode(self): + """Test that it is an error for the first frame in a message to have an + opcode of 0.""" + dec = WebSocketDecoder() + dec.feed("\x80\x05Hello") + self.assertRaises(ValueError, dec.read_message) + dec = WebSocketDecoder() + dec.feed("\x00\x05Hello") + self.assertRaises(ValueError, dec.read_message) + + def test_nonzero_opcode(self): + """Test that every frame after the first must have a zero opcode.""" + dec = WebSocketDecoder() + dec.feed("\x01\x01H\x01\x02el\x80\x02lo") + self.assertRaises(ValueError, dec.read_message) + dec = WebSocketDecoder() + dec.feed("\x01\x01H\x00\x02el\x01\x02lo") + self.assertRaises(ValueError, dec.read_message) + + def test_utf8(self): + """Test that text frames (opcode 1) are decoded from UTF-8.""" + text = u"Hello World or Καλημέρα κόσμε or こんにちは 世界 or \U0001f639" + utf8_text = text.encode("utf-8") + dec = WebSocketDecoder() + dec.feed("\x81" + chr(len(utf8_text)) + 
utf8_text) + self.assertEqual(read_messages(dec), [(1, text)]) + + def test_wrong_utf8(self): + """Test that failed UTF-8 decoding causes an error.""" + TESTS = [ + "\xc0\x41", # Non-shortest form. + "\xc2", # Unfinished sequence. + ] + for test in TESTS: + dec = WebSocketDecoder() + dec.feed("\x81" + chr(len(test)) + test) + self.assertRaises(ValueError, dec.read_message) + + def test_overly_large_payload(self): + """Test that large payloads are rejected.""" + dec = WebSocketDecoder() + dec.feed("\x82\x7f\x00\x00\x00\x00\x01\x00\x00\x00") + self.assertRaises(ValueError, dec.read_frame) + +class TestWebSocketEncoder(unittest.TestCase): + def test_length(self): + """Test that payload lengths are encoded using the smallest number of + bytes.""" + TESTS = [(0, 0), (125, 0), (126, 2), (65535, 2), (65536, 8)] + for length, encoded_length in TESTS: + enc = WebSocketEncoder(use_mask = False) + eframe = enc.encode_frame(2, "\x00" * length) + self.assertEqual(len(eframe), 1 + 1 + encoded_length + length) + enc = WebSocketEncoder(use_mask = True) + eframe = enc.encode_frame(2, "\x00" * length) + self.assertEqual(len(eframe), 1 + 1 + encoded_length + 4 + length) + + def test_roundtrip(self): + TESTS = [ + (1, u"Hello world"), + (1, u"Hello \N{WHITE SMILING FACE}"), + ] + for opcode, payload in TESTS: + for use_mask in (False, True): + enc = WebSocketEncoder(use_mask = use_mask) + enc_message = enc.encode_message(opcode, payload) + dec = WebSocketDecoder(use_mask = use_mask) + dec.feed(enc_message) + self.assertEqual(read_messages(dec), [(opcode, payload)]) + +def format_address(addr): + return "%s:%d" % addr + +class TestConnectionLimit(unittest.TestCase): + def setUp(self): + self.p = subprocess.Popen(["./flashproxy-client.py", format_address(LOCAL_ADDRESS), format_address(REMOTE_ADDRESS)]) + + def tearDown(self): + self.p.terminate() + + def test_remote_limit(self): + """Test that the client transport plugin limits the number of remote + connections that it will accept.""" 
+ for i in range(5): + s = socket.create_connection(REMOTE_ADDRESS, 2) + self.assertRaises(socket.error, socket.create_connection, REMOTE_ADDRESS) + +if __name__ == "__main__": + unittest.main() diff --git a/flashproxy-client.py b/flashproxy-client.py new file mode 100755 index 0000000..8afd0c0 --- /dev/null +++ b/flashproxy-client.py @@ -0,0 +1,932 @@ +#!/usr/bin/env python + +import array +import base64 +import cStringIO +import getopt +import httplib +import os +import re +import select +import socket +import struct +import subprocess +import sys +import time +import traceback +import urllib +import xml.sax.saxutils +import BaseHTTPServer + +try: + from hashlib import sha1 +except ImportError: + # Python 2.4 uses this name. + from sha import sha as sha1 + +try: + import numpy +except ImportError: + numpy = None + +DEFAULT_REMOTE_ADDRESS = "0.0.0.0" +DEFAULT_REMOTE_PORT = 9000 +DEFAULT_LOCAL_ADDRESS = "127.0.0.1" +DEFAULT_LOCAL_PORT = 9001 + +LOG_DATE_FORMAT = "%Y-%m-%d %H:%M:%S" + +class options(object): + local_addr = None + remote_addr = None + facilitator_addr = None + + log_filename = None + log_file = sys.stdout + daemonize = False + register = False + pid_filename = None + safe_logging = True + +# We accept up to this many bytes from a socket not yet matched with a partner +# before disconnecting it. +UNCONNECTED_BUFFER_LIMIT = 10240 + +def usage(f = sys.stdout): + print >> f, """\ +Usage: %(progname)s --register [LOCAL][:PORT] [REMOTE][:PORT] +Wait for connections on a local and a remote port. When any pair of connections +exists, data is ferried between them until one side is closed. By default +LOCAL is "%(local)s" and REMOTE is "%(remote)s". + +The local connection acts as a SOCKS4a proxy, but the host and port in the SOCKS +request are ignored and the local connection is always linked to a remote +connection. + +If the --register option is used, then your IP address will be sent to the +facilitator so that proxies can connect to you. 
You need to register in some way +in order to get any service. The --facilitator option allows controlling which +facilitator is used; if omitted, it uses a public default. + --daemon daemonize (Unix only). + -f, --facilitator=HOST[:PORT] advertise willingness to receive connections to + HOST:PORT. + -h, --help show this help. + -l, --log FILENAME write log to FILENAME (default stdout). + --pidfile FILENAME write PID to FILENAME after daemonizing. + -r, --register register with the facilitator. + --unsafe-logging don't scrub IP addresses from logs.\ +""" % { + "progname": sys.argv[0], + "local": format_addr((DEFAULT_LOCAL_ADDRESS, DEFAULT_LOCAL_PORT)), + "remote": format_addr((DEFAULT_REMOTE_ADDRESS, DEFAULT_REMOTE_PORT)), +} + +def safe_str(s): + """Return "[scrubbed]" if options.safe_logging is true, and s otherwise.""" + if options.safe_logging: + return "[scrubbed]" + else: + return s + +def log(msg): + print >> options.log_file, (u"%s %s" % (time.strftime(LOG_DATE_FORMAT), msg)).encode("UTF-8") + options.log_file.flush() + +def parse_addr_spec(spec, defhost = None, defport = None): + host = None + port = None + m = None + # IPv6 syntax. + if not m: + m = re.match(ur'^[(.+)]:(\d+)$', spec) + if m: + host, port = m.groups() + af = socket.AF_INET6 + if not m: + m = re.match(ur'^[(.+)]:?$', spec) + if m: + host, = m.groups() + af = socket.AF_INET6 + # IPv4 syntax. + if not m: + m = re.match(ur'^(.+):(\d+)$', spec) + if m: + host, port = m.groups() + af = socket.AF_INET + if not m: + m = re.match(ur'^:?(\d+)$', spec) + if m: + port, = m.groups() + af = 0 + if not m: + host = spec + af = 0 + host = host or defhost + port = port or defport + if not (host and port): + raise ValueError("Bad address specification "%s"" % spec) + return host, int(port) + +def format_addr(addr): + host, port = addr + if not host: + return u":%d" % port + # Numeric IPv6 address? 
+ try: + addrs = socket.getaddrinfo(host, port, 0, socket.SOCK_STREAM, socket.IPPROTO_TCP, socket.AI_NUMERICHOST) + af = addrs[0][0] + except socket.gaierror, e: + af = 0 + if af == socket.AF_INET6: + return u"[%s]:%d" % (host, port) + else: + return u"%s:%d" % (host, port) + +def safe_format_addr(addr): + return safe_str(format_addr(addr)) + host, port = addr + if not host: + return u":%d" % port + # Numeric IPv6 address? + try: + addrs = socket.getaddrinfo(host, port, 0, socket.SOCK_STREAM, socket.IPPROTO_TCP, socket.AI_NUMERICHOST) + af = addrs[0][0] + except socket.gaierror, e: + af = 0 + if af == socket.AF_INET6: + return u"[%s]:%d" % (host, port) + else: + return u"%s:%d" % (host, port) + + + +def apply_mask_numpy(payload, mask_key): + if len(payload) == 0: + return "" + payload_a = numpy.frombuffer(payload, dtype="|u4", count=len(payload)//4) + m, = numpy.frombuffer(mask_key, dtype="|u4", count=1) + result = numpy.bitwise_xor(payload_a, m).tostring() + i = len(payload) // 4 * 4 + if i < len(payload): + remainder = [] + while i < len(payload): + remainder.append(chr(ord(payload[i]) ^ ord(mask_key[i % 4]))) + i += 1 + result = result + "".join(remainder) + return result + +def apply_mask_py(payload, mask_key): + result = array.array("B", payload) + m = array.array("B", mask_key) + i = 0 + while i < len(result) - 7: + result[i] ^= m[0] + result[i+1] ^= m[1] + result[i+2] ^= m[2] + result[i+3] ^= m[3] + result[i+4] ^= m[0] + result[i+5] ^= m[1] + result[i+6] ^= m[2] + result[i+7] ^= m[3] + i += 8 + while i < len(result): + result[i] ^= m[i%4] + i += 1 + return result.tostring() + +if numpy is not None: + apply_mask = apply_mask_numpy +else: + apply_mask = apply_mask_py + +class WebSocketFrame(object): + def __init__(self): + self.fin = False + self.opcode = None + self.payload = None + + def is_control(self): + return (self.opcode & 0x08) != 0 + +class WebSocketMessage(object): + def __init__(self): + self.opcode = None + self.payload = None + + def 
is_control(self): + return (self.opcode & 0x08) != 0 + +class WebSocketDecoder(object): + """RFC 6455 section 5 is about the WebSocket framing format.""" + # Raise an exception rather than buffer anything larger than this. + MAX_MESSAGE_LENGTH = 1024 * 1024 + + class MaskingError(ValueError): + pass + + def __init__(self, use_mask = False): + """use_mask should be True for server-to-client sockets, and False for + client-to-server sockets.""" + self.use_mask = use_mask + + # Per-frame state. + self.buf = "" + + # Per-message state. + self.message_buf = "" + self.message_opcode = None + + def feed(self, data): + self.buf += data + + def read_frame(self): + """Read a frame from the internal buffer, if one is available. Returns a + WebSocketFrame object, or None if there are no complete frames to + read.""" + # RFC 6255 section 5.2. + if len(self.buf) < 2: + return None + offset = 0 + b0, b1 = struct.unpack_from(">BB", self.buf, offset) + offset += 2 + fin = (b0 & 0x80) != 0 + opcode = b0 & 0x0f + frame_masked = (b1 & 0x80) != 0 + payload_len = b1 & 0x7f + + if payload_len == 126: + if len(self.buf) < offset + 2: + return None + payload_len, = struct.unpack_from(">H", self.buf, offset) + offset += 2 + elif payload_len == 127: + if len(self.buf) < offset + 8: + return None + payload_len, = struct.unpack_from(">Q", self.buf, offset) + offset += 8 + + if frame_masked: + if not self.use_mask: + # "A client MUST close a connection if it detects a masked + # frame." + raise self.MaskingError("Got masked payload from server") + if len(self.buf) < offset + 4: + return None + mask_key = self.buf[offset:offset+4] + offset += 4 + else: + if self.use_mask: + # "The server MUST close the connection upon receiving a frame + # that is not masked." 
+ raise self.MaskingError("Got unmasked payload from client") + mask_key = None + + if payload_len > self.MAX_MESSAGE_LENGTH: + raise ValueError("Refusing to buffer payload of %d bytes" % payload_len) + + if len(self.buf) < offset + payload_len: + return None + if mask_key: + payload = apply_mask(self.buf[offset:offset+payload_len], mask_key) + else: + payload = self.buf[offset:offset+payload_len] + self.buf = self.buf[offset+payload_len:] + + frame = WebSocketFrame() + frame.fin = fin + frame.opcode = opcode + frame.payload = payload + + return frame + + def read_message(self): + """Read a complete message. If the opcode is 1, the payload is decoded + from a UTF-8 binary string to a unicode string. If a control frame is + read while another fragmented message is in progress, the control frame + is returned as a new message immediately. Returns None if there is no + complete frame to be read.""" + # RFC 6455 section 5.4 is about fragmentation. + while True: + frame = self.read_frame() + if frame is None: + return None + # "Control frames (see Section 5.5) MAY be injected in the middle of + # a fragmented message. Control frames themselves MUST NOT be + # fragmented." 
+ if frame.is_control(): + if not frame.fin: + raise ValueError("Control frame (opcode %d) has FIN bit clear" % frame.opcode) + message = WebSocketMessage() + message.opcode = frame.opcode + message.payload = frame.payload + return message + + if self.message_opcode is None: + if frame.opcode == 0: + raise ValueError("First frame has opcode 0") + self.message_opcode = frame.opcode + else: + if frame.opcode != 0: + raise ValueError("Non-first frame has nonzero opcode %d" % frame.opcode) + self.message_buf += frame.payload + + if frame.fin: + break + message = WebSocketMessage() + message.opcode = self.message_opcode + message.payload = self.message_buf + self.postprocess_message(message) + self.message_opcode = None + self.message_buf = "" + + return message + + def postprocess_message(self, message): + if message.opcode == 1: + message.payload = message.payload.decode("utf-8") + return message + +class WebSocketEncoder(object): + def __init__(self, use_mask = False): + self.use_mask = use_mask + + def encode_frame(self, opcode, payload): + if opcode >= 16: + raise ValueError("Opcode of %d is >= 16" % opcode) + length = len(payload) + + if self.use_mask: + mask_key = os.urandom(4) + payload = apply_mask(payload, mask_key) + mask_bit = 0x80 + else: + mask_key = "" + mask_bit = 0x00 + + if length < 126: + len_b, len_ext = length, "" + elif length < 0x10000: + len_b, len_ext = 126, struct.pack(">H", length) + elif length < 0x10000000000000000: + len_b, len_ext = 127, struct.pack(">Q", length) + else: + raise ValueError("payload length of %d is too long" % length) + + return chr(0x80 | opcode) + chr(mask_bit | len_b) + len_ext + mask_key + payload + + def encode_message(self, opcode, payload): + if opcode == 1: + payload = payload.encode("utf-8") + return self.encode_frame(opcode, payload) + +# WebSocket implementations generally support text (opcode 1) messages, which +# are UTF-8-encoded text. Not all support binary (opcode 2) messages. 
During the +# WebSocket handshake, we use the "base64" value of the Sec-WebSocket-Protocol +# header field to indicate that text frames should encoded UTF-8-encoded +# base64-encoded binary data. Binary messages are always interpreted verbatim, +# but text messages are rejected if "base64" was not negotiated. +# +# The idea here is that browsers that know they don't support binary messages +# can negotiate "base64" with both endpoints and still reliably transport binary +# data. Those that know they can support binary messages can just use binary +# messages in the straightforward way. + +class WebSocketBinaryDecoder(object): + def __init__(self, protocols, use_mask = False): + self.dec = WebSocketDecoder(use_mask) + self.base64 = "base64" in protocols + + def feed(self, data): + self.dec.feed(data) + + def read(self): + """Returns None when there are currently no data to be read. Returns "" + when a close message is received.""" + while True: + message = self.dec.read_message() + if message is None: + return None + elif message.opcode == 1: + if not self.base64: + raise ValueError("Received text message on decoder incapable of base64") + payload = base64.b64decode(message.payload) + if payload: + return payload + elif message.opcode == 2: + if message.payload: + return message.payload + elif message.opcode == 8: + return "" + # Ignore all other opcodes. 
+ return None + +class WebSocketBinaryEncoder(object): + def __init__(self, protocols, use_mask = False): + self.enc = WebSocketEncoder(use_mask) + self.base64 = "base64" in protocols + + def encode(self, data): + if self.base64: + return self.enc.encode_message(1, base64.b64encode(data)) + else: + return self.enc.encode_message(2, data) + + +def listen_socket(addr): + """Return a nonblocking socket listening on the given address.""" + addrinfo = socket.getaddrinfo(addr[0], addr[1], 0, socket.SOCK_STREAM, socket.IPPROTO_TCP)[0] + s = socket.socket(addrinfo[0], addrinfo[1], addrinfo[2]) + s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + s.bind(addr) + s.listen(10) + s.setblocking(0) + return s + +def format_peername(s): + try: + return safe_format_addr(s.getpeername()) + except socket.error, e: + return "<unconnected>" + +# How long to wait for a WebSocket request on the remote socket. It is limited +# to avoid Slowloris-like attacks. +WEBSOCKET_REQUEST_TIMEOUT = 2.0 + +class WebSocketRequestHandler(BaseHTTPServer.BaseHTTPRequestHandler): + def __init__(self, request_text, fd): + self.rfile = cStringIO.StringIO(request_text) + self.wfile = fd.makefile() + self.error = False + self.raw_requestline = self.rfile.readline() + self.parse_request() + + def log_message(self, *args): + pass + + def send_error(self, code, message = None): + BaseHTTPServer.BaseHTTPRequestHandler.send_error(self, code, message) + self.error = True + +MAGIC_GUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11" + +def handle_websocket_request(fd): + request_text = fd.recv(10 * 1024) + handler = WebSocketRequestHandler(request_text, fd) + if handler.error or not hasattr(handler, "path"): + return None + method = handler.command + path = handler.path + headers = handler.headers + + # See RFC 6455 section 4.2.1 for this sequence of checks. + # + # 1. An HTTP/1.1 or higher GET request, including a "Request-URI"... 
+ if method != "GET": + handler.send_error(405) + return None + if path != "/": + handler.send_error(404) + return None + + # 2. A |Host| header field containing the server's authority. + # We deliberately skip this test. + + # 3. An |Upgrade| header field containing the value "websocket", treated as + # an ASCII case-insensitive value. + if "websocket" not in [x.strip().lower() for x in headers.get("upgrade").split(",")]: + handler.send_error(400) + return None + + # 4. A |Connection| header field that includes the token "Upgrade", treated + # as an ASCII case-insensitive value. + if "upgrade" not in [x.strip().lower() for x in headers.get("connection").split(",")]: + handler.send_error(400) + return None + + # 5. A |Sec-WebSocket-Key| header field with a base64-encoded value that, + # when decoded, is 16 bytes in length. + try: + key = headers.get("sec-websocket-key") + if len(base64.b64decode(key)) != 16: + raise TypeError("Sec-WebSocket-Key must be 16 bytes") + except TypeError: + handler.send_error(400) + return None + + # 6. A |Sec-WebSocket-Version| header field, with a value of 13. We also + # allow 8 from draft-ietf-hybi-thewebsocketprotocol-10. + version = headers.get("sec-websocket-version") + KNOWN_VERSIONS = ["8", "13"] + if version not in KNOWN_VERSIONS: + # "If this version does not match a version understood by the server, + # the server MUST abort the WebSocket handshake described in this + # section and instead send an appropriate HTTP error code (such as 426 + # Upgrade Required) and a |Sec-WebSocket-Version| header field + # indicating the version(s) the server is capable of understanding." + handler.send_response(426) + handler.send_header("Sec-WebSocket-Version", ", ".join(KNOWN_VERSIONS)) + handler.end_headers() + return None + + # 7. Optionally, an |Origin| header field. + + # 8. Optionally, a |Sec-WebSocket-Protocol| header field, with a list of + # values indicating which protocols the client would like to speak, ordered + # by preference. 
+ protocols_str = headers.get("sec-websocket-protocol") + if protocols_str is None: + protocols = [] + else: + protocols = [x.strip().lower() for x in protocols_str.split(",")] + + # 9. Optionally, a |Sec-WebSocket-Extensions| header field... + + # 10. Optionally, other header fields... + + # See RFC 6455 section 4.2.2, item 5 for these steps. + + # 1. A Status-Line with a 101 response code as per RFC 2616. + handler.send_response(101) + # 2. An |Upgrade| header field with value "websocket" as per RFC 2616. + handler.send_header("Upgrade", "websocket") + # 3. A |Connection| header field with value "Upgrade". + handler.send_header("Connection", "Upgrade") + # 4. A |Sec-WebSocket-Accept| header field. The value of this header field + # is constructed by concatenating /key/, defined above in step 4 in Section + # 4.2.2, with the string "258EAFA5-E914-47DA-95CA-C5AB0DC85B11", taking the + # SHA-1 hash of this concatenated value to obtain a 20-byte value and + # base64-encoding (see Section 4 of [RFC4648]) this 20-byte hash. + accept_key = base64.b64encode(sha1(key + MAGIC_GUID).digest()) + handler.send_header("Sec-WebSocket-Accept", accept_key) + # 5. Optionally, a |Sec-WebSocket-Protocol| header field, with a value + # /subprotocol/ as defined in step 4 in Section 4.2.2. + if "base64" in protocols: + handler.send_header("Sec-WebSocket-Protocol", "base64") + # 6. Optionally, a |Sec-WebSocket-Extensions| header field... + + handler.end_headers() + + return protocols + +def grab_string(s, pos): + """Grab a NUL-terminated string from the given string, starting at the given + offset. Return (pos, str) tuple, or (pos, None) on error.""" + i = pos + while i < len(s): + if s[i] == '\0': + return (i + 1, s[pos:i]) + i += 1 + return pos, None + +# http://ftp.icm.edu.pl/packages/socks/socks4/SOCKS4.protocol +# https://en.wikipedia.org/wiki/SOCKS#SOCKS4a +def parse_socks_request(data): + """Parse the 8-byte SOCKS header at the beginning of data. Returns a + (dest, port) tuple. 
Raises ValueError on error.""" + try: + ver, cmd, dport, o1, o2, o3, o4 = struct.unpack(">BBHBBBB", data[:8]) + except struct.error: + raise ValueError("Couldn't unpack SOCKS4 header") + if ver != 4: + raise ValueError("Wrong SOCKS version (%d)" % ver) + if cmd != 1: + raise ValueError("Wrong SOCKS command (%d)" % cmd) + pos, userid = grab_string(data, 8) + if userid is None: + raise ValueError("Couldn't read userid") + if o1 == 0 and o2 == 0 and o3 == 0 and o4 != 0: + pos, dest = grab_string(data, pos) + if dest is None: + raise ValueError("Couldn't read destination") + else: + dest = "%d.%d.%d.%d" % (o1, o2, o3, o4) + return dest, dport + +def handle_socks_request(fd): + try: + addr = fd.getpeername() + data = fd.recv(100) + except socket.error, e: + log(u"Socket error from SOCKS-pending: %s" % repr(str(e))) + return False + try: + dest_addr = parse_socks_request(data) + except ValueError, e: + log(u"Error parsing SOCKS request: %s." % str(e)) + # Error reply. + fd.sendall(struct.pack(">BBHBBBB", 0, 91, 0, 0, 0, 0, 0)) + return False + log(u"Got SOCKS request for %s." % safe_format_addr(dest_addr)) + fd.sendall(struct.pack(">BBHBBBB", 0, 90, dest_addr[1], 127, 0, 0, 1)) + # Note we throw away the requested address and port. + return True + +def report_pending(): + log(u"locals (%d): %s" % (len(locals), [format_peername(x) for x in locals])) + log(u"remotes (%d): %s" % (len(remotes), [format_peername(x) for x in remotes])) + +def register(): + if not options.register: + return + + # sys.path[0] is initialized to the directory containing the Python script file. + script_dir = sys.path[0] + if not script_dir: + # Maybe the script was read from stdin; in any case don't guess at the directory. + return + command = [os.path.join(script_dir, "flashproxy-reg-http.py")] + spec = format_addr((None, options.remote_addr[1])) + if options.facilitator_addr is None: + log(u"Registering "%s"." 
% spec) + else: + command += [format_addr(options.facilitator_addr)] + command += ["-a", spec] + try: + p = subprocess.Popen(command) + except OSError, e: + log(u"Failed to register: %s" % str(e)) + +def proxy_chunk_local_to_remote(local, remote, data = None): + if data is None: + try: + data = local.recv(65536) + except socket.error, e: # Can be "Connection reset by peer". + log(u"Socket error from local: %s" % repr(str(e))) + remote.close() + return False + if not data: + log(u"EOF from local %s." % format_peername(local)) + local.close() + remote.close() + return False + else: + try: + remote.send_chunk(data) + except socket.error, e: + log(u"Socket error writing to remote: %s" % repr(str(e))) + local.close() + return False + return True + +def proxy_chunk_remote_to_local(remote, local, data = None): + if data is None: + try: + data = remote.recv(65536) + except socket.error, e: # Can be "Connection reset by peer". + log(u"Socket error from remote: %s" % repr(str(e))) + local.close() + return False + if not data: + log(u"EOF from remote %s." % format_peername(remote)) + remote.close() + local.close() + return False + else: + remote.dec.feed(data) + while True: + try: + data = remote.dec.read() + except (WebSocketDecoder.MaskingError, ValueError), e: + log(u"WebSocket decode error from remote: %s" % repr(str(e))) + remote.close() + local.close() + return False + if data is None: + break + elif not data: + log(u"WebSocket close from remote %s." % format_peername(remote)) + remote.close() + local.close() + return False + try: + local.send_chunk(data) + except socket.error, e: + log(u"Socket error writing to local: %s" % repr(str(e))) + remote.close() + return False + return True + +def receive_unlinked(fd, label): + """Receive and buffer data on a socket that has not been linked yet. 
Returns + True iff there was no error and the socket may still be used; otherwise, the + socket will be closed before returning.""" + + try: + data = fd.recv(1024) + except socket.error, e: + log(u"Socket error from %s: %s" % (label, repr(str(e)))) + fd.close() + return False + if not data: + log(u"EOF from unlinked %s %s with %d bytes buffered." % (label, format_peername(fd), len(fd.buf))) + fd.close() + return False + else: + log(u"Data from unlinked %s %s (%d bytes)." % (label, format_peername(fd), len(data))) + fd.buf += data + if len(fd.buf) >= UNCONNECTED_BUFFER_LIMIT: + log(u"Refusing to buffer more than %d bytes from %s %s." % (UNCONNECTED_BUFFER_LIMIT, label, format_peername(fd))) + fd.close() + return False + return True + +def match_proxies(): + while unlinked_remotes and unlinked_locals: + remote = unlinked_remotes.pop(0) + local = unlinked_locals.pop(0) + remote_addr, remote_port = remote.getpeername() + local_addr, local_port = local.getpeername() + log(u"Linking %s and %s." 
% (format_peername(local), format_peername(remote))) + remote.partner = local + local.partner = remote + if remote.buf: + if not proxy_chunk_remote_to_local(remote, local, remote.buf): + remotes.remove(remote) + locals.remove(local) + register() + return + if local.buf: + if not proxy_chunk_local_to_remote(local, remote, local.buf): + remotes.remove(remote) + locals.remove(local) + return + +class TimeoutSocket(object): + def __init__(self, fd): + self.fd = fd + self.birthday = time.time() + + def age(self): + return time.time() - self.birthday + + def __getattr__(self, name): + return getattr(self.fd, name) + +class RemoteSocket(object): + def __init__(self, fd, protocols): + self.fd = fd + self.buf = "" + self.partner = None + self.dec = WebSocketBinaryDecoder(protocols, use_mask = True) + self.enc = WebSocketBinaryEncoder(protocols, use_mask = False) + + def send_chunk(self, data): + self.sendall(self.enc.encode(data)) + + def __getattr__(self, name): + return getattr(self.fd, name) + +class LocalSocket(object): + def __init__(self, fd): + self.fd = fd + self.buf = "" + self.partner = None + + def send_chunk(self, data): + self.sendall(data) + + def __getattr__(self, name): + return getattr(self.fd, name) + +def main(): + while True: + rset = [remote_s, local_s] + websocket_pending + socks_pending + locals + remotes + rset, _, _ = select.select(rset, [], [], WEBSOCKET_REQUEST_TIMEOUT) + for fd in rset: + if fd == remote_s: + remote_c, addr = fd.accept() + log(u"Remote connection from %s." % safe_format_addr(addr)) + websocket_pending.append(TimeoutSocket(remote_c)) + elif fd == local_s: + local_c, addr = fd.accept() + log(u"Local connection from %s." % safe_format_addr(addr)) + socks_pending.append(local_c) + register() + elif fd in websocket_pending: + log(u"Data from WebSocket-pending %s." 
% safe_format_addr(addr)) + protocols = handle_websocket_request(fd) + if protocols is not None: + wrapped = RemoteSocket(fd, protocols) + remotes.append(wrapped) + unlinked_remotes.append(wrapped) + else: + fd.close() + websocket_pending.remove(fd) + report_pending() + elif fd in socks_pending: + log(u"SOCKS request from %s." % safe_format_addr(addr)) + if handle_socks_request(fd): + wrapped = LocalSocket(fd) + locals.append(wrapped) + unlinked_locals.append(wrapped) + else: + fd.close() + socks_pending.remove(fd) + report_pending() + elif fd in remotes: + local = fd.partner + if local: + if not proxy_chunk_remote_to_local(fd, local): + remotes.remove(fd) + locals.remove(local) + register() + else: + if not receive_unlinked(fd, "remote"): + remotes.remove(fd) + unlinked_remotes.remove(fd) + register() + report_pending() + elif fd in locals: + remote = fd.partner + if remote: + if not proxy_chunk_local_to_remote(fd, remote): + remotes.remove(remote) + locals.remove(fd) + else: + if not receive_unlinked(fd, "local"): + locals.remove(fd) + unlinked_locals.remove(fd) + report_pending() + match_proxies() + while websocket_pending: + pending = websocket_pending[0] + if pending.age() < WEBSOCKET_REQUEST_TIMEOUT: + break + log(u"Expired remote connection from %s." 
% format_peername(pending)) + pending.close() + websocket_pending.pop(0) + report_pending() + +if __name__ == "__main__": + opts, args = getopt.gnu_getopt(sys.argv[1:], "f:hl:r", ["daemon", "facilitator=", "help", "log=", "pidfile=", "register", "unsafe-logging"]) + for o, a in opts: + if o == "--daemon": + options.daemonize = True + elif o == "-f" or o == "--facilitator": + options.facilitator_addr = parse_addr_spec(a) + elif o == "-h" or o == "--help": + usage() + sys.exit() + elif o == "-l" or o == "--log": + options.log_filename = a + elif o == "--pidfile": + options.pid_filename = a + elif o == "-r" or o == "--register": + options.register = True + elif o == "--unsafe-logging": + options.safe_logging = False + + if len(args) == 0: + options.local_addr = (DEFAULT_LOCAL_ADDRESS, DEFAULT_LOCAL_PORT) + options.remote_addr = (DEFAULT_REMOTE_ADDRESS, DEFAULT_REMOTE_PORT) + elif len(args) == 1: + options.local_addr = parse_addr_spec(args[0], DEFAULT_LOCAL_ADDRESS, DEFAULT_LOCAL_PORT) + options.remote_addr = (DEFAULT_REMOTE_ADDRESS, DEFAULT_REMOTE_PORT) + elif len(args) == 2: + options.local_addr = parse_addr_spec(args[0], DEFAULT_LOCAL_ADDRESS, DEFAULT_LOCAL_PORT) + options.remote_addr = parse_addr_spec(args[1], DEFAULT_REMOTE_ADDRESS, DEFAULT_REMOTE_PORT) + else: + usage(sys.stderr) + sys.exit(1) + + if options.log_filename: + options.log_file = open(options.log_filename, "a") + # Send error tracebacks to the log. + sys.stderr = options.log_file + else: + options.log_file = sys.stdout + + # Local socket, accepting SOCKS requests from localhost + local_s = listen_socket(options.local_addr) + # Remote socket, accepting remote WebSocket connections from proxies. + remote_s = listen_socket(options.remote_addr) + + # New remote sockets waiting to finish their WebSocket negotiation. + websocket_pending = [] + # Remote connection sockets. + remotes = [] + # Remotes not yet linked with a local. This is a subset of remotes. 
+ unlinked_remotes = [] + # New local sockets waiting to finish their SOCKS negotiation. + socks_pending = [] + # Local Tor sockets, after SOCKS negotiation. + locals = [] + # Locals not yet linked with a remote. This is a subset of remotes. + unlinked_locals = [] + + register() + + if options.daemonize: + log(u"Daemonizing.") + pid = os.fork() + if pid != 0: + if options.pid_filename: + f = open(options.pid_filename, "w") + print >> f, pid + f.close() + sys.exit(0) + try: + main() + except Exception: + exc = traceback.format_exc() + log("".join(exc)) diff --git a/torrc b/torrc index 66dd3b6..d502265 100644 --- a/torrc +++ b/torrc @@ -1,6 +1,6 @@ ## Configuration file for Tor over flash proxies. ## Usage: -## python connector.py --register +## python flashproxy-client.py --register ## tor -f torrc
ClientTransportPlugin websocket socks4 127.0.0.1:9001
tor-commits@lists.torproject.org