From c18e0ab17a23ddef42768151c6905818ad1d40d3 Mon Sep 17 00:00:00 2001 From: Yves Date: Fri, 2 Apr 2010 20:48:13 +0200 Subject: lays a egg --- proxy.py | 286 --------------------------------------------------------------- 1 file changed, 286 deletions(-) delete mode 100755 proxy.py (limited to 'proxy.py') diff --git a/proxy.py b/proxy.py deleted file mode 100755 index edf6be2..0000000 --- a/proxy.py +++ /dev/null @@ -1,286 +0,0 @@ -#!/usr/bin/python -t -import os, sys, string, time, md5, random -import asynchat, asyncore, socket, httplib, urlparse -from heapq import heappush, heappop -import cStringIO as StringIO - -kB = 1024 - -class DefaultConfiguration: - """bind to that""" - listen=("",8080) - - """available http-proxies""" - endpoints=[ ('10.2.2.11', 8888) ] - - """minimum entity size to start parallelize fetch""" - threshold = 512*kB - - """initial size/range for a fetcher-job""" - initial_blocksize=512*kB - - """minimal size/range for a fetcher-job""" - minimal_blocksize=512*kB - - """(sec) #time each fetcher spent on his range, - calculated using speed measured while using initial_blocksize""" - time_slice=5 - - """start a new fetcher on a endpoint X-bytes before the old one finished""" - fetcher_jumpstart=32*kB - -################# - -class Fetcher(asynchat.async_chat): - def __init__(self, reader, proxy, url, headers, range): - self.reader = reader - self.proxy = proxy - self.url = url - self.headers = headers - self.range = range - - self.pos = (self.range[0] != -1) and self.range[0] or 0 - self.start_time = 0 - self.stop_time = 0 - self.http_status = "" - self.http_header = StringIO.StringIO() - self.state = 0 #0=status, 1=header, 2=body - asynchat.async_chat.__init__(self) - self.set_terminator("\r\n") - self.create_socket(socket.AF_INET, socket.SOCK_STREAM) - self.connect(self.proxy) - - def __str__(self): - return "= self.range[1] + 1: - #if this request is the first one (whithout Range: ...) then the server - # dont send us our expected range, we must cut it at some point (here) - bytes_remaining = self.range[1] - ( self.pos ) - data=data[:bytes_remaining+1] - print self,"cut: pos=%s length=%s => %s" % (self.pos, length, len(data)) - length = len(data) - if length == 0: - self.reader.handle_incoming_data(self) - self.close() - return - if not self.reader.handle_incoming_data(self, data, length): - self.close() - return - self.pos += length - if self.range != (-1,-1) and self.pos >= self.range[1]: - self.stop_time = time.time() - print self, "finished with %s kB/s" % (self.speed() / 1024) - self.reader.handle_incoming_data(self) - self.close() - elif self.state ==1: #header - self.http_header.write( data ) - else: #status - self.http_status += data - - def found_terminator(self): - if self.state == 0: #got status-line - self.state = 1 - self.set_terminator("\r\n\r\n") - elif self.state == 1: #got headers - self.state = 2 - self.set_terminator(None) - self.reader.handle_incoming_http_header(self, self.http_header) - -class MagicHTTPProxyClient(object): - def __init__(self, channel, url, header): - self.channel = channel - self.config = self.channel.server.config - self.url = url - self.header = header - - self.content_length = -1 - self.header_sent = False - self.fetch_pos = 0 - self.write_pos = 0 - self.buffer = "" - self.blocks = list() - self.fetchers = list() - - proxy = self.config.endpoints[ random.randint(0, len(self.config.endpoints)-1) ] - self.fetchers.append( Fetcher(self, proxy, self.url, self.header, (-1,-1)) ) - - def __str__(self): - return "" % (self.url.hostname, self.url.path, self.content_length) - - def handle_incoming_data(self, fetcher, data=None, length=0): - if not data: - #fetcher is done, remove from list - self.fetchers = filter(lambda f: f != fetcher, self.fetchers) - print "Remove: %s" % fetcher - else: - assert fetcher.pos < fetcher.range[1] or fetcher.range == (-1,-1) - heappush(self.blocks, (fetcher.pos, data, length)) - - if not self.channel.connected: - print self, "request side closed the connection" - return False - - if fetcher.range != (-1,-1) \ - and fetcher.range[1] - (fetcher.pos+length) < self.config.fetcher_jumpstart \ - and self.fetch_pos + 1 < self.content_length \ - and len( filter(lambda f: f.proxy == fetcher.proxy, self.fetchers) ) < 2: - #Start a new fetcher if this fetcher is X-Bytes before finished his job - blocksize = max(int(self.config.time_slice * fetcher.speed()), self.config.minimal_blocksize) - fetch_range = self.next_range(blocksize) - self.fetchers.append( Fetcher(self, fetcher.proxy, self.url, self.header, fetch_range) ) - - #if len(self.blocks)>0: - #print self,"fetch_pos=%s write_pos=%s get=%s with length=%s pending=%s" % (self.fetch_pos, self.write_pos, min(self.blocks)[0],min(self.blocks)[2], len(self.blocks)) - - buf = "" - while len(self.blocks)>0 and min(self.blocks)[0] == self.write_pos: - item = heappop(self.blocks) - buf += item[1] - self.write_pos += item[2] - - if buf != "": - self.channel.push(buf) - - if self.write_pos + 1 >= self.content_length: - print self, "job done %s blocks left" % len(self.blocks) - self.channel.close_when_done() - return True - - def next_range(self, suggested_blocksize): - assert self.content_length != -1 - start = self.fetch_pos - self.fetch_pos = min(self.fetch_pos + suggested_blocksize, self.content_length) - return (start, self.fetch_pos-1) - - def handle_incoming_http_header(self, fetcher, header): - if not self.channel.connected: - return - if self.header_sent: - pass - else: - self.header_sent = True - - # Sends header from first response - header.seek(0) - headers = httplib.HTTPMessage(header) - - content_length = filter(lambda i: i == "content-length", headers.dict.keys()) - if len(content_length) == 1: - content_length = int(headers.dict["content-length"]) - if content_length >= self.config.threshold: - self.content_length = content_length - fetcher.range = self.next_range(self.config.initial_blocksize) - for proxy in filter(lambda p: fetcher.proxy != p, self.config.endpoints): - if self.fetch_pos == self.content_length -1: - break - self.fetchers.append(Fetcher( self, proxy, self.url, self.header, self.next_range(self.config.initial_blocksize))) - - else: - content_length = None - - buf = "HTTP/1.1 200 OK\r\n" - for key in filter(lambda k: k not in ("content-range", "content-length"), headers.dict.keys()): - buf += "%s: %s\r\n" % (key, headers.dict[key]) - if content_length: - buf += "Content-Length: %s\r\n" % content_length - buf += "Content-Range: bytes %s-%s/%s\r\n" % (0, content_length-1, content_length) - buf += "X-Proxy: Magicproxy (superpower activated)\r\n" - buf += "\r\n" - self.channel.push(buf) - -class HTTPChannel(asynchat.async_chat): - def __init__(self, server, sock, addr): - self.server = server - self.data = StringIO.StringIO() - - asynchat.async_chat.__init__(self, sock) - self.set_terminator("\r\n\r\n") - - def handle_close(self): - self.connected = False - self.close() - - def collect_incoming_data(self, data): - self.data.write(data) - if self.data.tell() > 16384: - self.close_when_done() - - def found_terminator(self): - # parse http header - self.data.seek(0) - self.request = string.split(self.data.readline(), None, 2) - if len(self.request) != 3: - # badly formed request; just shut down - self.close_when_done() - else: - self.set_terminator(None) - headers = httplib.HTTPMessage(self.data).dict - self.handle_request(self.request[0], self.request[1], headers) - - def handle_request(self, method, path, headers): - url = urlparse.urlparse(path) - if method != "GET" or url.query != "": - #do not handle non-GET or GET with Query (?foo=bla) requests - proxy = self.server.config.endpoints[ int( md5.md5(url.hostname).hexdigest(),16 ) % len(self.server.config.endpoints) ] - print Fetcher(self, proxy, url, headers, (-1,-1)) - else: - MagicHTTPProxyClient(self, url, headers) - - def handle_incoming_http_header(self,fetcher,header): - header.seek(0) - headers = httplib.HTTPMessage(header) - buf = "HTTP/1.1 200 OK\r\n" - buf += "\r\n".join(map(lambda hdr: "%s: %s" % (hdr,headers.dict[hdr]), headers.dict.keys())) - buf += "\r\n\r\n" - self.push(buf) - - - def handle_incoming_data(self, fetcher, data=None, length=0): - if data: - self.push(data) - return True - -class HTTPProxyServer(asyncore.dispatcher): - def __init__(self,config): - self.config = config - asyncore.dispatcher.__init__(self) - self.create_socket(socket.AF_INET, socket.SOCK_STREAM) - self.set_reuse_addr() - self.bind(self.config.listen) - self.listen(5) - - def __str__(self): - return "" % self.config.listen - - def handle_accept(self): - conn, addr = self.accept() - HTTPChannel(self, conn, addr) - -if __name__ == "__main__": - proxy = HTTPProxyServer(DefaultConfiguration) - print proxy - asyncore.loop() -- cgit v1.2.1