From 28259e5cf5cb5bfbba2ef054ebe7b94096175b68 Mon Sep 17 00:00:00 2001 From: Yves Date: Sat, 13 Mar 2010 12:09:25 +0100 Subject: Add origin request headers to each proxy request --- proxy.py | 48 +++++++++++++++++++++++++----------------------- 1 file changed, 25 insertions(+), 23 deletions(-) diff --git a/proxy.py b/proxy.py index f7fa2ac..3249d2f 100755 --- a/proxy.py +++ b/proxy.py @@ -13,28 +13,28 @@ ENDPOINTS = [ # ('10.1.1.156', 8888), ] -KB = 1024 +kB = 1024 #minimum entity size to start a paralel fetch -THRESHOLD = 0 #512 * KB +THRESHOLD = 0 * kB #512 * kB #first fetch-range blocksize -INIT_BLOCKSIZE = 512 * KB +INIT_BLOCKSIZE = 512 * kB #lower bound of fetch-range blocksize optimization -MIN_BLOCKSIZE = 512 * KB +MIN_BLOCKSIZE = 512 * kB #time each fetcher spent on his range, calculated using #speed measured while using INIT_BLOCKSIZE TIME_SLICE = 5 #start a new fetcher on a endpoint X-bytes before the #old one finished -FETCHER_JUMPSTART = 32 * KB +FETCHER_JUMPSTART = 32 * kB ################# class Fetcher(asynchat.async_chat): - def __init__(self, reader, endpoint, url, header, range): + def __init__(self, reader, endpoint, url, headers, range): self.reader = reader self.endpoint = endpoint self.url = url - self.header = header + self.headers = headers self.range = range self.pos = self.range[0] @@ -53,7 +53,8 @@ class Fetcher(asynchat.async_chat): def handle_connect (self): print self, "Start" self.send("GET http://%s:%s%s HTTP/1.0\r\n" % ( self.url.hostname, self.url.port or 80, self.url.path )) - #XXX self.header insert request header from client, and keep alive if supported + for key in filter(lambda k: k not in ("range"), self.headers.keys()): #origin request headers + self.send("%s: %s\r\n" % (key, self.headers[key])) self.send("Range: bytes=%s-%s\r\n" % (self.range[0], self.range[1])) self.send("\r\n") self.start_time = time.time() @@ -119,12 +120,12 @@ class MultipleProxyReader(object): if fetcher.range[1] - fetcher.pos < FETCHER_JUMPSTART \ and self.fetch_pos + 1 < self.content_length and not self.channel.is_closed \ - and len( filter( lambda f: f.endpoint == fetcher.endpoint, self.fetchers) ) < 2: + and len( filter( (lambda f: f.endpoint == fetcher.endpoint), self.fetchers) ) < 2: #Start a new fetcher on this line if this fetchers is X-Bytes before finishing his jobs blocksize = min(TIME_SLICE * int(fetcher.speed()), MIN_BLOCKSIZE) - range = self.next_range(blocksize) - print "Start new Fetcher, bs=%s range=%s" % (blocksize,range) - self.fetchers.append( Fetcher(self, fetcher.endpoint, self.url, self.header, range) ) + fetch_range = self.next_range(blocksize) + print "Start new Fetcher, bs=%s range=%s" % (blocksize,fetch_range) + self.fetchers.append( Fetcher(self, fetcher.endpoint, self.url, self.header, fetch_range) ) while self.send_next_data(): pass @@ -132,8 +133,7 @@ class MultipleProxyReader(object): def next_range(self, suggested_blocksize): start = self.fetch_pos self.fetch_pos = min(self.fetch_pos + suggested_blocksize, self.content_length) - r=(start, self.fetch_pos-1) - return r + return (start, self.fetch_pos-1) def handle_incoming_http_header(self, header): if self.header_sent: @@ -175,12 +175,14 @@ class MultipleProxyReader(object): class HTTPChannel(asynchat.async_chat): def __init__(self, server, sock, addr): - asynchat.async_chat.__init__(self, sock) self.server = server - self.set_terminator("\r\n\r\n") - self.request = None + self.data = StringIO.StringIO() self.is_closed = False + self.request = None + + asynchat.async_chat.__init__(self, sock) + self.set_terminator("\r\n\r\n") def handle_close(self): self.is_closed = True @@ -199,14 +201,16 @@ class HTTPChannel(asynchat.async_chat): # badly formed request; just shut down self.close_when_done() else: - self.server.handle_request(self, self.request[0], self.request[1]) + headers = httplib.HTTPMessage(self.data).dict + self.server.handle_request(self, self.request[0], self.request[1], headers) else: pass # ignore body data, for now class HTTPProxyServer(asyncore.dispatcher): def __init__(self): - asyncore.dispatcher.__init__(self) self.port = 8080 + + asyncore.dispatcher.__init__(self) self.create_socket(socket.AF_INET, socket.SOCK_STREAM) self.set_reuse_addr() self.bind(("", 8080)) @@ -216,7 +220,7 @@ class HTTPProxyServer(asyncore.dispatcher): conn, addr = self.accept() HTTPChannel(self, conn, addr) - def handle_request(self, channel, method, path): + def handle_request(self, channel, method, path, headers): url = urlparse.urlparse(path) print method, path if method != "GET" or url.query != "": @@ -239,9 +243,7 @@ class HTTPProxyServer(asyncore.dispatcher): if content_length < THRESHOLD: self._bypass_request(channel, "GET", url) else: - #XXX parse original headers to send them with all fetcher-requests - header = (("foo", "bla")) - MultipleProxyReader(channel, url, header, content_length) + MultipleProxyReader(channel, url, headers, content_length) def _bypass_request(self, channel, method, url): print "_bypass request hostname=%s port=%s path=%s" % (url.hostname, url.port or 80, url.path) -- cgit v1.2.1