commit e06bcf2c849075cf241addf2182dfc0125a35c92
Author: David Fifield <david@bamsoftware.com>
Date:   Tue Apr 7 09:36:35 2015 -0700
Use persistent connections in the WSGI reflector.
This improves performance quite a lot. Previously we were doing a complete TCP and TLS handshake to meek-server for every single request, which, apart from increasing latency, also caused a lot of CPU usage on meek-server: it was above 80% when I checked.
Now we reuse connections until they error out, making a new connection if all others are currently busy.
---
 wsgi/reflect.py | 90 ++++++++++++++++++++++++++++++++++++-------------------
 1 file changed, 60 insertions(+), 30 deletions(-)
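The pattern the patch introduces, as a standalone sketch (assumed Python 2, matching the httplib/urlparse imports in the diff; the GET request at the end is a placeholder, not code from the patch):

    import httplib
    import threading
    import urlparse

    class ConnectionPool(object):
        # Hand out connections one caller at a time; a connection goes back
        # into the pool only after it has served a response cleanly.
        def __init__(self, url):
            self.url = urlparse.urlsplit(url)
            self.conns = []
            self.lock = threading.RLock()

        def get_conn(self):
            # Reuse an idle connection if there is one, else open a new one.
            with self.lock:
                if self.conns:
                    return self.conns.pop(0)
            return httplib.HTTPSConnection(self.url.hostname, self.url.port)

        def restore_conn(self, conn):
            with self.lock:
                self.conns.append(conn)

    pool = ConnectionPool("https://meek.bamsoftware.com/")
    conn = pool.get_conn()
    try:
        conn.request("GET", "/")
        resp = conn.getresponse()
        resp.read()
        resp.close()
    except Exception:
        conn.close()  # errored out: drop the connection rather than restoring it
    else:
        pool.restore_conn(conn)  # still good: make it available for reuse

Dropping the connection on failure is what "reuse connections until they error out" means in practice: a broken connection is simply never restored to the pool.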
diff --git a/wsgi/reflect.py b/wsgi/reflect.py
index e9bff42..5b89f6a 100644
--- a/wsgi/reflect.py
+++ b/wsgi/reflect.py
@@ -2,35 +2,18 @@
 import httplib
 import urlparse
+import threading
 FORWARD_URL = "https://meek.bamsoftware.com/"
 TIMEOUT = 20
 BUFSIZ = 2048
+MAX_REQUEST_LENGTH = 0x10000
 REFLECTED_HEADER_FIELDS = [
     "Content-Type",
     "X-Session-Id",
 ]
-# Limits a file-like object to reading only n bytes. Used to limit wsgi.input to
-# the Content-Length, otherwise it blocks.
-class LimitedReader(object):
-    def __init__(self, f, n):
-        self.f = f
-        self.n = n
-
-    def __getattr__(self, name):
-        return getattr(self.f, name)
-
-    def read(self, size=None):
-        if self.n <= 0:
-            return ""
-        if size is None or size > self.n:
-            size = self.n
-        data = self.f.read(size)
-        self.n -= len(data)
-        return data
-
 # Join two URL paths.
 def path_join(a, b):
     if a.endswith("/"):
@@ -60,8 +43,14 @@ def copy_request(environ, url):
     content_length = environ.get("CONTENT_LENGTH")
     if content_length:
-        body = LimitedReader(environ["wsgi.input"], int(content_length))
-        headers.append(("Content-Length", content_length))
+        content_length = int(content_length)
+        # We read the whole request body (and limit its length). Normally we
+        # would just pass environ["wsgi.input"] as the body to
+        # HTTPSConnection.request. But make_request may need to try the request
+        # twice, in which case it needs to send the same body the second time.
+        if content_length > MAX_REQUEST_LENGTH:
+            raise ValueError("Content-Length too large: %d" % content_length)
+        body = environ["wsgi.input"].read(content_length)
     else:
         body = ""
@@ -73,18 +62,56 @@ def copy_request(environ, url):
return method, url, body, headers
-def make_conn(url):
-    u = urlparse.urlsplit(url)
-    create_connection = httplib.HTTPConnection
-    if u.scheme == "https":
-        create_connection = httplib.HTTPSConnection
-    return create_connection(u.hostname, u.port, strict=True, timeout=TIMEOUT)
+# We want to reuse persistent HTTPSConnections. If we don't, then every request
+# will start a brand new TCP and TLS connection, leading to increased latency
+# and high CPU use on meek-server. A pool just locks connections so only one
+# thread can use a connection at a time. If the connection is still good after
+# use, then the caller should put it back by calling restore_conn.
+class ConnectionPool(object):
+    def __init__(self, url):
+        self.url = urlparse.urlsplit(url)
+        self.conns = []
+        self.lock = threading.RLock()
+
+    def new_conn(self):
+        create_connection = httplib.HTTPConnection
+        if self.url.scheme == "https":
+            create_connection = httplib.HTTPSConnection
+        return create_connection(self.url.hostname, self.url.port, strict=True, timeout=TIMEOUT)
+
+    def get_conn(self):
+        with self.lock:
+            try:
+                return self.conns.pop(0)
+            except IndexError:
+                pass
+        return self.new_conn()
+
+    def restore_conn(self, conn):
+        with self.lock:
+            self.conns.append(conn)
 def make_request(conn, method, url, body, headers):
     u = urlparse.urlsplit(url)
     path = urlparse.urlunsplit(("", "", u.path, u.query, ""))
     conn.request(method, path, body, headers)
-    return conn.getresponse()
+    try:
+        return conn.getresponse()
+    except httplib.BadStatusLine, e:
+        if e.message != "":
+            raise
+        # There's a very common error with httplib persistent connections. If
+        # you let a connection idle until it times out, then issue a request,
+        # you will get a BadStatusLine("") exception, not when the request is
+        # sent, but when getresponse tries to read from a closed socket. When
+        # that happens, we reinitialize the connection by first closing it,
+        # which will cause a new TCP and TLS handshake to happen for the next
+        # request.
+        conn.close()
+        conn.request(method, path, body, headers)
+        return conn.getresponse()
+
+pool = ConnectionPool(FORWARD_URL)
 def main(environ, start_response):
     try:
@@ -93,10 +120,12 @@
         start_response("400 Bad Request", [("Content-Type", "text/plain; charset=utf-8")])
         yield "Bad Request"
         return
+
     try:
-        conn = make_conn(url)
+        conn = pool.get_conn()
         resp = make_request(conn, method, url, body, headers)
     except Exception, e:
+        # Discard conn.
         start_response("500 Internal Server Error", [("Content-Type", "text/plain; charset=utf-8")])
         yield "Internal Server Error"
         return
@@ -114,7 +143,8 @@
             break
         yield data
-    conn.close()
+    resp.close()
+    pool.restore_conn(conn)
 if __name__ == "__main__":
     import wsgiref.simple_server
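The trailing context shows the module can be run standalone under wsgiref for testing. The rest of that block is not part of this diff, but it plausibly looks something like the following sketch (the bind address and port are assumptions, not taken from the source):

    if __name__ == "__main__":
        import wsgiref.simple_server
        # Serve the reflector locally; main is the WSGI application above.
        server = wsgiref.simple_server.make_server("127.0.0.1", 8000, main)
        server.serve_forever()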