From fe7a395f73bd2fbb9e01d24910722cc13c0ee649 Mon Sep 17 00:00:00 2001 From: Yves Date: Fri, 2 Apr 2010 20:23:13 +0200 Subject: python-class for config, code dedup --- proxy.py | 136 ++++++++++++++++++++++++++------------------------------------- 1 file changed, 56 insertions(+), 80 deletions(-) diff --git a/proxy.py b/proxy.py index c11236d..edf6be2 100755 --- a/proxy.py +++ b/proxy.py @@ -4,27 +4,31 @@ import asynchat, asyncore, socket, httplib, urlparse from heapq import heappush, heappop import cStringIO as StringIO +kB = 1024 -ENDPOINTS = [ - ('10.2.2.11', 8888), -# ('10.3.1.2', 8888), -# ('10.1.1.156', 8888), -] +class DefaultConfiguration: + """bind to that""" + listen=("",8080) -kB = 1024 -#minimum entity size to start a paralel fetch -THRESHOLD = 512 * kB -#first fetch-range blocksize -INIT_BLOCKSIZE = 512 * kB -#lower bound of fetch-range blocksize optimization -MIN_BLOCKSIZE = 512 * kB -#time each fetcher spent on his range, calculated using -#speed measured while using INIT_BLOCKSIZE -TIME_SLICE = 5 -#start a new fetcher on a endpoint X-bytes before the -#old one finished -FETCHER_JUMPSTART = 32 * kB + """available http-proxies""" + endpoints=[ ('10.2.2.11', 8888) ] + + """minimum entity size to start parallelize fetch""" + threshold = 512*kB + + """initial size/range for a fetcher-job""" + initial_blocksize=512*kB + """minimal size/range for a fetcher-job""" + minimal_blocksize=512*kB + + """(sec) #time each fetcher spent on his range, + calculated using speed measured while using initial_blocksize""" + time_slice=5 + + """start a new fetcher on a endpoint X-bytes before the old one finished""" + fetcher_jumpstart=32*kB + ################# class Fetcher(asynchat.async_chat): @@ -41,7 +45,6 @@ class Fetcher(asynchat.async_chat): self.http_status = "" self.http_header = StringIO.StringIO() self.state = 0 #0=status, 1=header, 2=body - print self, "__init__" asynchat.async_chat.__init__(self) self.set_terminator("\r\n") self.create_socket(socket.AF_INET, socket.SOCK_STREAM) @@ -51,7 +54,7 @@ class Fetcher(asynchat.async_chat): return "= THRESHOLD: + if content_length >= self.config.threshold: self.content_length = content_length - fetcher.range = self.next_range(INIT_BLOCKSIZE) - for proxy in filter(lambda p: fetcher.proxy != p, ENDPOINTS): + fetcher.range = self.next_range(self.config.initial_blocksize) + for proxy in filter(lambda p: fetcher.proxy != p, self.config.endpoints): if self.fetch_pos == self.content_length -1: break - self.fetchers.append(Fetcher( self, proxy, self.url, self.header, self.next_range(INIT_BLOCKSIZE))) + self.fetchers.append(Fetcher( self, proxy, self.url, self.header, self.next_range(self.config.initial_blocksize))) else: content_length = None @@ -208,43 +212,9 @@ class MagicHTTPProxyClient(object): buf += "\r\n" self.channel.push(buf) -"""Transparent forward to other proxy server""" -class HTTPProxyClient(asynchat.async_chat): - def __init__(self, proxy, channel, method, url, headers): - self.proxy = proxy - self.other = channel - self.method = method - self.headers = headers - - asynchat.async_chat.__init__(self) - self.set_terminator(None) - self.create_socket(socket.AF_INET, socket.SOCK_STREAM) - self.connect(self.proxy) - - self.buf = "" - self.buf += "%s %s HTTP/1.0\r\n" % (method, urlparse.urlunparse(url)) - for key in headers.keys(): - self.buf += "%s: %s\r\n" % (key, headers[key]) - self.buf += "\r\n" - - def __str__(self): - return "" % self.proxy - - def collect_incoming_data(self, data): - self.other.push(data) - - def handle_close(self): - self.close() - self.other.close_when_done() - - def handle_connect(self): - self.push(self.buf) - class HTTPChannel(asynchat.async_chat): def __init__(self, server, sock, addr): - print "Channel opened" self.server = server - self.data = StringIO.StringIO() asynchat.async_chat.__init__(self, sock) @@ -269,42 +239,48 @@ class HTTPChannel(asynchat.async_chat): else: self.set_terminator(None) headers = httplib.HTTPMessage(self.data).dict - self.handle_request(self, self.request[0], self.request[1], headers) + self.handle_request(self.request[0], self.request[1], headers) - def handle_request(self, channel, method, path, headers): + def handle_request(self, method, path, headers): url = urlparse.urlparse(path) - if method != "GET": + if method != "GET" or url.query != "": #do not handle non-GET or GET with Query (?foo=bla) requests - return self._bypass_request(channel, method, url, headers) + proxy = self.server.config.endpoints[ int( md5.md5(url.hostname).hexdigest(),16 ) % len(self.server.config.endpoints) ] + print Fetcher(self, proxy, url, headers, (-1,-1)) else: - MagicHTTPProxyClient(channel, url, headers) - - def _bypass_request(self, channel, method, url, headers): - proxy = ENDPOINTS[ int( md5.md5(url.hostname).hexdigest(),16 ) % len(ENDPOINTS) ] - print self, "_bypass request via %s: %s %s" % (proxy, method, urlparse.urlunparse(url)) - HTTPProxyClient(proxy, channel, method, url, headers) - #FIXME use this other thing - - + MagicHTTPProxyClient(self, url, headers) + + def handle_incoming_http_header(self,fetcher,header): + header.seek(0) + headers = httplib.HTTPMessage(header) + buf = "HTTP/1.1 200 OK\r\n" + buf += "\r\n".join(map(lambda hdr: "%s: %s" % (hdr,headers.dict[hdr]), headers.dict.keys())) + buf += "\r\n\r\n" + self.push(buf) -class HTTPProxyServer(asyncore.dispatcher): - def __init__(self): - self.port = 8080 + def handle_incoming_data(self, fetcher, data=None, length=0): + if data: + self.push(data) + return True + +class HTTPProxyServer(asyncore.dispatcher): + def __init__(self,config): + self.config = config asyncore.dispatcher.__init__(self) self.create_socket(socket.AF_INET, socket.SOCK_STREAM) self.set_reuse_addr() - self.bind(("", 8080)) + self.bind(self.config.listen) self.listen(5) def __str__(self): - return "" % self.port + return "" % self.config.listen def handle_accept(self): conn, addr = self.accept() HTTPChannel(self, conn, addr) if __name__ == "__main__": - proxy = HTTPProxyServer() + proxy = HTTPProxyServer(DefaultConfiguration) print proxy asyncore.loop() -- cgit v1.2.1