[tor-commits] [meek/master] Use persistent connections in the WSGI reflector.

dcf at torproject.org dcf at torproject.org
Sun Apr 12 07:52:16 UTC 2015


commit e06bcf2c849075cf241addf2182dfc0125a35c92
Author: David Fifield <david at bamsoftware.com>
Date:   Tue Apr 7 09:36:35 2015 -0700

    Use persistent connections in the WSGI reflector.
    
    This improves performance quite a lot--previously we were doing a
    complete TCP and TLS handshake to meek-server for every single request,
    which, apart from increasing latency, also caused a lot of CPU usage on
    meek-server. it was up above 80% when I checked it.
    
    Now we reuse connections until they error out, making a new connection
    if all others are currently busy.
---
 wsgi/reflect.py |   90 ++++++++++++++++++++++++++++++++++++-------------------
 1 file changed, 60 insertions(+), 30 deletions(-)

diff --git a/wsgi/reflect.py b/wsgi/reflect.py
index e9bff42..5b89f6a 100644
--- a/wsgi/reflect.py
+++ b/wsgi/reflect.py
@@ -2,35 +2,18 @@
 
 import httplib
 import urlparse
+import threading
 
 FORWARD_URL = "https://meek.bamsoftware.com/"
 TIMEOUT = 20
 BUFSIZ = 2048
+MAX_REQUEST_LENGTH = 0x10000
 
 REFLECTED_HEADER_FIELDS = [
     "Content-Type",
     "X-Session-Id",
 ]
 
-# Limits a file-like object to reading only n bytes. Used to limit wsgi.input to
-# the Content-Length, otherwise it blocks.
-class LimitedReader(object):
-    def __init__(self, f, n):
-        self.f = f
-        self.n = n
-
-    def __getattr__(self, name):
-        return getattr(self.f, name)
-
-    def read(self, size=None):
-        if self.n <= 0:
-            return ""
-        if size is None or size > self.n:
-            size = self.n
-        data = self.f.read(size)
-        self.n -= len(data)
-        return data
-
 # Join two URL paths.
 def path_join(a, b):
     if a.endswith("/"):
@@ -60,8 +43,14 @@ def copy_request(environ, url):
 
     content_length = environ.get("CONTENT_LENGTH")
     if content_length:
-        body = LimitedReader(environ["wsgi.input"], int(content_length))
-        headers.append(("Content-Length", content_length))
+        content_length = int(content_length)
+        # We read the whole response body (and limit its length). Normally we
+        # would just pass environ["wsgi.input"] as the body to
+        # HTTPSConnection.request. But make_request may need to try the request
+        # twice, in which case it needs to send the same body the second time.
+        if content_length > MAX_REQUEST_LENGTH:
+            raise ValueError("Content-Length too large: %d" % content_length)
+        body = environ["wsgi.input"].read(content_length)
     else:
         body = ""
 
@@ -73,18 +62,56 @@ def copy_request(environ, url):
 
     return method, url, body, headers
 
-def make_conn(url):
-    u = urlparse.urlsplit(url)
-    create_connection = httplib.HTTPConnection
-    if u.scheme == "https":
-        create_connection = httplib.HTTPSConnection
-    return create_connection(u.hostname, u.port, strict=True, timeout=TIMEOUT)
+# We want to reuse persistent HTTPSConnections. If we don't then every request
+# will start a branch new TCP and TLS connection, leading to increased latency
+# and high CPU use on meek-server. A pool just locks connections so only one
+# thread can use a connection at a time. If the connection is still good after
+# use, then the caller should put it back by calling restore_conn.
+class ConnectionPool(object):
+    def __init__(self, url):
+        self.url = urlparse.urlsplit(url)
+        self.conns = []
+        self.lock = threading.RLock()
+
+    def new_conn(self):
+        create_connection = httplib.HTTPConnection
+        if self.url.scheme == "https":
+            create_connection = httplib.HTTPSConnection
+        return create_connection(self.url.hostname, self.url.port, strict=True, timeout=TIMEOUT)
+
+    def get_conn(self):
+        with self.lock:
+            try:
+                return self.conns.pop(0)
+            except IndexError:
+                pass
+        return self.new_conn()
+
+    def restore_conn(self, conn):
+        with self.lock:
+            self.conns.append(conn)
 
 def make_request(conn, method, url, body, headers):
     u = urlparse.urlsplit(url)
     path = urlparse.urlunsplit(("", "", u.path, u.query, ""))
     conn.request(method, path, body, headers)
-    return conn.getresponse()
+    try:
+        return conn.getresponse()
+    except httplib.BadStatusLine, e:
+        if e.message != "":
+            raise
+        # There's a very common error with httplib persistent connections. If
+        # you let a connection idle until it times out, then issue a request,
+        # you will get a BadStatusLine("") exception, not when the request is
+        # sent, but when getresponse tries to read from a closed socket. When
+        # that happens, we reinitialize the connection by first closing it,
+        # which will cause a new TCP and TLS handshake to happen for the next
+        # request.
+        conn.close()
+        conn.request(method, path, body, headers)
+        return conn.getresponse()
+
+pool = ConnectionPool(FORWARD_URL)
 
 def main(environ, start_response):
     try:
@@ -93,10 +120,12 @@ def main(environ, start_response):
         start_response("400 Bad Request", [("Content-Type", "text/plain; charset=utf-8")])
         yield "Bad Request"
         return
+
     try:
-        conn = make_conn(url)
+        conn = pool.get_conn()
         resp = make_request(conn, method, url, body, headers)
     except Exception, e:
+        # Discard conn.
         start_response("500 Internal Server Error", [("Content-Type", "text/plain; charset=utf-8")])
         yield "Internal Server Error"
         return
@@ -114,7 +143,8 @@ def main(environ, start_response):
             break
         yield data
 
-    conn.close()
+    resp.close()
+    pool.restore_conn(conn)
 
 if __name__ == "__main__":
     import wsgiref.simple_server



More information about the tor-commits mailing list