#!/usr/bin/python -t
"""Magicproxy: an HTTP proxy that spreads large downloads over several proxies.

A client request is accepted by ``HTTPProxyServer`` / ``HTTPChannel``.  Plain
GET requests are handed to a ``MagicHTTPProxyClient``, which starts one
``Fetcher`` per upstream proxy ("endpoint") once the entity turns out to be at
least ``threshold`` bytes long, and re-assembles the byte ranges in order.
Every other request is relayed through a single ``Fetcher``.

This is Python 2 code (asyncore/asynchat, str payloads).
"""
#TODO: ignore non-functional endpoints
#TODO: support Range: ... in requests

import os
import sys
import string
import time
import random
import hashlib
import asynchat
import asyncore
import socket
from heapq import heappush, heappop

try:
    # Python 2, the runtime this proxy is written for.
    import httplib
    import urlparse
    import cStringIO as StringIO
    import md5  # no longer used here (hashlib replaces it); kept for other users
except ImportError:
    # Fallback names so the module can at least be imported by tooling and
    # unit tests on Python 3.  The socket code below still assumes Python 2
    # str payloads and httplib.HTTPMessage(fp) semantics.
    import http.client as httplib
    import urllib.parse as urlparse
    import io as StringIO

kB = 1024


class DefaultConfiguration:
    """Default settings handed to HTTPProxyServer by main()."""

    # (host, port) the proxy listens on
    listen = ("", 8080)

    # available upstream http-proxies as (host, port)
    endpoints = [
        ('10.2.2.11', 8888),
        #('10.3.1.2',8888)
    ]

    # minimum entity size (bytes) to start a parallel fetch
    threshold = 512 * kB

    # initial size/range (bytes) for a fetcher job
    initial_blocksize = 512 * kB

    # minimal size/range (bytes) for a fetcher job
    minimal_blocksize = 512 * kB

    # (sec) time each fetcher should spend on its range; the range size is
    # derived from the speed measured on the fetcher's previous range
    time_slice = 5

    # start a new fetcher on an endpoint this many bytes before the old one
    # has finished
    fetcher_jumpstart = 32 * kB


def _status_line(status_code):
    """Return the HTTP/1.1 status line (with CRLF) for status_code.

    The reason phrase is looked up in httplib.responses so that e.g. a 404 is
    not relayed as "404 OK"; unknown codes get the phrase "Unknown".
    """
    reason = httplib.responses.get(status_code, "Unknown")
    return "HTTP/1.1 %s %s\r\n" % (status_code, reason)


#################

class Fetcher(asynchat.async_chat):
    """Fetches one URL (or one byte range of it) through one upstream proxy.

    reader      -- object receiving the result; must provide
                   handle_incoming_http_header(fetcher, status_code, header)
                   and handle_incoming_data(fetcher, data=None, length=0)
    proxy       -- (host, port) of the upstream proxy to connect to
    url         -- parsed URL (urlparse result) to request
    headers     -- dict of request headers received from the client
    fetch_range -- inclusive (first_byte, last_byte), or (-1, -1) for the
                   whole entity without a Range header
    """

    def __init__(self, reader, proxy, url, headers, fetch_range):
        self.reader = reader
        self.proxy = proxy
        self.url = url
        self.headers = headers
        self.range = fetch_range
        # absolute offset of the next body byte to hand to the reader
        if self.range[0] != -1:
            self.pos = self.range[0]
        else:
            self.pos = 0
        self.start_time = 0
        self.stop_time = 0
        self.http_status = ""
        self.http_header = StringIO.StringIO()
        self.state = 0  #0=status, 1=header, 2=body
        asynchat.async_chat.__init__(self)
        self.set_terminator("\r\n")
        self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
        self.connect(self.proxy)

    # NOTE(review): in the reviewed copy of this file everything from the
    # format string of __str__ up to the range check in collect_incoming_data
    # was missing.  __str__, handle_connect, _elapsed, speed and the first
    # lines of collect_incoming_data are reconstructed from how the rest of
    # the file uses them -- please confirm against the original source.

    def __str__(self):
        return "<Fetcher proxy=%s url=%s range=%s>" % (
            self.proxy, urlparse.urlunparse(self.url), self.range)

    def handle_connect(self):
        """Send the request to the upstream proxy once connected."""
        print("%s Start" % self)
        # NOTE(review): the request method is not passed to Fetcher, so even
        # relayed non-GET requests go out as GET -- confirm this is intended.
        request = ["GET %s HTTP/1.0\r\n" % urlparse.urlunparse(self.url)]
        for key in self.headers:
            # the client's own Range header is dropped; we send ours below
            if key != "range":
                request.append("%s: %s\r\n" % (key, self.headers[key]))
        if self.range != (-1, -1):
            request.append("Range: bytes=%s-%s\r\n" % self.range)
        request.append("\r\n")
        self.push("".join(request))
        self.start_time = time.time()

    def _elapsed(self):
        """Return the seconds spent on this job so far (or in total)."""
        end = self.stop_time or time.time()
        return end - self.start_time

    def speed(self):
        """Return the transfer speed of this job in bytes per second."""
        elapsed = self._elapsed()
        if elapsed <= 0:
            return 0.0
        return (self.pos - max(self.range[0], 0)) / float(elapsed)

    def handle_close(self):
        """Upstream closed the connection: report end of data to the reader.

        Without this the reader of an unranged fetch never learns that the
        body is complete.
        """
        if self.stop_time == 0:
            self.stop_time = time.time()
        self.close()
        self.reader.handle_incoming_data(self)

    def collect_incoming_data(self, data):
        """Dispatch received bytes according to the current parser state."""
        if self.state == 2:  #body
            length = len(data)
            if self.range != (-1, -1) and self.pos + length >= self.range[1] + 1:
                #if this request is the first one (without Range: ...) then
                #the server does not send us only our expected range, so we
                #must cut the data at the end of the range (here)
                bytes_remaining = self.range[1] - self.pos
                data = data[:bytes_remaining + 1]
                print("%s cut: pos=%s length=%s => %s"
                      % (self, self.pos, length, len(data)))
                length = len(data)
            if length == 0:
                self.reader.handle_incoming_data(self)
                self.close()
                return
            if not self.reader.handle_incoming_data(self, data, length):
                self.close()
                return
            self.pos += length
            # range[1] is inclusive: the job is complete only once pos has
            # moved past it (">=" here used to drop the last byte)
            if self.range != (-1, -1) and self.pos > self.range[1]:
                self.stop_time = time.time()
                print("%s finished with %s kB/s" % (self, self.speed() / 1024))
                self.reader.handle_incoming_data(self)
                self.close()
        elif self.state == 1:  #header
            self.http_header.write(data)
        else:  #status
            self.http_status += data

    def found_terminator(self):
        """Advance the parser: status line -> headers -> body."""
        if self.state == 0:  #got status-line
            status = self.http_status.split(" ")
            try:
                self.http_status_code = int(status[1])
            except (IndexError, ValueError):
                # unparsable status line from upstream
                self.http_status_code = 502  #Bad Gateway
            self.state = 1
            self.set_terminator("\r\n\r\n")  #end of header
        elif self.state == 1:  #got headers
            self.state = 2
            self.set_terminator(None)
            self.reader.handle_incoming_http_header(
                self, self.http_status_code, self.http_header)


class MagicHTTPProxyClient(object):
    """Serves one GET request, fetching the entity in parallel if it is big.

    channel -- the client connection; must provide server.config, connected,
               push(), close() and close_when_done()
    url     -- parsed URL (urlparse result) of the requested entity
    header  -- dict of request headers received from the client
    """

    def __init__(self, channel, url, header):
        self.channel = channel
        self.config = self.channel.server.config
        self.url = url
        self.header = header
        # entity size when fetching in parallel, -1 otherwise
        self.content_length = -1
        # entity size announced by upstream, None while unknown
        self.entity_length = None
        self.header_sent = False
        # first byte that has not been assigned to a fetcher yet
        self.fetch_pos = 0
        # number of body bytes already pushed to the client
        self.write_pos = 0
        self.buffer = ""
        # heap of (offset, data, length) waiting to be written in order
        self.blocks = list()
        self.fetchers = list()

        proxy = random.choice(self.config.endpoints)
        self.fetchers.append(
            Fetcher(self, proxy, self.url, self.header, (-1, -1)))

    def __str__(self):
        return "<MagicHTTPProxyClient host=%s path=%s content_length=%s>" % (
            self.url.hostname, self.url.path, self.content_length)

    def handle_incoming_data(self, fetcher, data=None, length=0):
        """Accept a body chunk from fetcher, or its "I am done" notification.

        data is None (or empty) when the fetcher has finished.  Returns False
        when the client is gone and the fetcher should stop, True otherwise.
        """
        if not data:
            #fetcher is done, remove from list
            self.fetchers = [f for f in self.fetchers if f != fetcher]
            print("Remove: %s" % fetcher)
        else:
            # range[1] is inclusive, so a chunk may start on the last byte
            assert fetcher.range == (-1, -1) or fetcher.pos <= fetcher.range[1]
            heappush(self.blocks, (fetcher.pos, data, length))

        if not self.channel.connected:
            print("%s request side closed the connection" % self)
            return False

        if fetcher.range != (-1, -1) \
           and fetcher.range[1] - (fetcher.pos + length) < self.config.fetcher_jumpstart \
           and self.fetch_pos < self.content_length \
           and len([f for f in self.fetchers if f.proxy == fetcher.proxy]) < 2:
            #Start a new fetcher if this fetcher is X bytes before the end of
            #its job; size the new job from the speed measured so far
            blocksize = max(int(self.config.time_slice * fetcher.speed()),
                            self.config.minimal_blocksize)
            fetch_range = self.next_range(blocksize)
            self.fetchers.append(
                Fetcher(self, fetcher.proxy, self.url, self.header, fetch_range))

        # flush every block that continues the stream at write_pos;
        # blocks[0] is the smallest offset because blocks is a heap
        parts = []
        while self.blocks and self.blocks[0][0] == self.write_pos:
            item = heappop(self.blocks)
            parts.append(item[1])
            self.write_pos += item[2]
        if parts:
            self.channel.push("".join(parts))

        # Done when the announced entity has been written completely, or when
        # no fetcher is left that could deliver more.  The old test
        # "write_pos + 1 >= content_length" was always true while
        # content_length was -1 and closed the client after the first chunk.
        all_written = (self.entity_length is not None
                       and self.write_pos >= self.entity_length)
        upstream_finished = not data and not self.fetchers
        if all_written or upstream_finished:
            print("%s job done %s blocks left" % (self, len(self.blocks)))
            self.channel.close_when_done()
        return True

    def next_range(self, suggested_blocksize):
        """Reserve the next unassigned byte range and return it.

        Returns the inclusive range (first, last), at most suggested_blocksize
        bytes long and clipped to the end of the entity.
        """
        assert self.content_length != -1
        start = self.fetch_pos
        self.fetch_pos = min(self.fetch_pos + suggested_blocksize,
                             self.content_length)
        return (start, self.fetch_pos - 1)

    def handle_incoming_http_header(self, fetcher, status_code, header):
        """Handle the response header of fetcher.

        The first header received is relayed to the client and decides whether
        the entity is fetched in parallel.  Later headers are only checked for
        errors.
        """
        if not self.channel.connected:
            return

        if self.header_sent:
            if status_code < 200 or status_code >= 300:
                print("%s Error: got error code %s in %s. Giving up"
                      % (self, status_code, fetcher))
                self.channel.close()
            return

        self.header_sent = True

        # Sends header from first response
        header.seek(0)
        headers = httplib.HTTPMessage(header)

        content_length = None
        if "content-length" in headers.dict:
            try:
                content_length = int(headers.dict["content-length"])
            except ValueError:
                # malformed header: treat the length as unknown
                content_length = None
        self.entity_length = content_length

        #if the entity is at least threshold bytes long, start one fetcher
        #per remaining endpoint
        if content_length is not None and content_length >= self.config.threshold:
            self.content_length = content_length
            # the first fetcher asked for everything; limit it to one block
            fetcher.range = self.next_range(self.config.initial_blocksize)
            for proxy in self.config.endpoints:
                if proxy == fetcher.proxy:
                    continue
                # fetch_pos == content_length once everything is assigned
                if self.fetch_pos >= self.content_length:
                    break
                self.fetchers.append(Fetcher(
                    self, proxy, self.url, self.header,
                    self.next_range(self.config.initial_blocksize)))

        lines = [_status_line(status_code)]
        for key in headers.dict:
            if key not in ("content-range", "content-length"):
                lines.append("%s: %s\r\n" % (key, headers.dict[key]))
        if content_length is not None:
            # "is not None" so that "Content-Length: 0" is relayed as well
            lines.append("Content-Length: %s\r\n" % content_length)
            if content_length > 0:
                lines.append("Content-Range: bytes 0-%s/%s\r\n"
                             % (content_length - 1, content_length))
        lines.append("X-Proxy: Magicproxy; using proxies %s\r\n" % ", ".join(
            ["%s:%s" % host for host in self.config.endpoints]))
        lines.append("\r\n")
        self.channel.push("".join(lines))


class HTTPChannel(asynchat.async_chat):
    """One client connection: parses the request and starts the fetch.

    It is also the reader of the Fetcher used for relayed (non-GET or
    query-string) requests.
    """

    def __init__(self, server, sock, addr):
        self.server = server
        self.data = StringIO.StringIO()
        asynchat.async_chat.__init__(self, sock)
        self.set_terminator("\r\n\r\n")

    def handle_close(self):
        self.connected = False
        self.close()

    def collect_incoming_data(self, data):
        self.data.write(data)
        # give up on clients sending more than 16 kB of request data
        if self.data.tell() > 16384:
            self.close_when_done()

    def found_terminator(self):
        """Parse the request line and headers collected so far."""
        # parse http header
        self.data.seek(0)
        self.request = self.data.readline().split(None, 2)
        if len(self.request) != 3:
            # badly formed request; just shut down
            self.close_when_done()
        else:
            self.set_terminator(None)
            headers = httplib.HTTPMessage(self.data).dict
            self.handle_request(self.request[0], self.request[1], headers)

    def handle_request(self, method, path, headers):
        """Start a parallel fetch for plain GETs, a simple relay otherwise."""
        url = urlparse.urlparse(path)
        if method != "GET" or url.query != "":
            #do not handle non-GET or GET with Query (?foo=bla) requests;
            #pick the endpoint by host name so a host always uses the same one
            endpoints = self.server.config.endpoints
            # hostname is None for a request without an absolute URL
            digest = hashlib.md5(url.hostname or "").hexdigest()
            proxy = endpoints[int(digest, 16) % len(endpoints)]
            print(Fetcher(self, proxy, url, headers, (-1, -1)))
        else:
            MagicHTTPProxyClient(self, url, headers)

    def handle_incoming_http_header(self, fetcher, status_code, header):
        """Relay the upstream response header to the client."""
        header.seek(0)
        headers = httplib.HTTPMessage(header)
        lines = [_status_line(status_code)]
        for hdr in headers.dict:
            lines.append("%s: %s\r\n" % (hdr, headers.dict[hdr]))
        # exactly one blank line ends the header, also without any headers
        lines.append("\r\n")
        self.push("".join(lines))

    def handle_incoming_data(self, fetcher, data=None, length=0):
        """Relay a body chunk; finish the response when the fetcher is done."""
        if data:
            self.push(data)
        else:
            self.close_when_done()
        return True


class HTTPProxyServer(asyncore.dispatcher):
    """Listening socket that creates an HTTPChannel per client connection.

    config -- object with the attributes of DefaultConfiguration
    """

    def __init__(self, config):
        self.config = config
        asyncore.dispatcher.__init__(self)
        self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
        self.set_reuse_addr()
        self.bind(self.config.listen)
        self.listen(5)

    def __str__(self):
        return "<HTTPProxyServer listen=%s:%s>" % self.config.listen

    def handle_accept(self):
        pair = self.accept()
        if pair is None:
            # accept() returns None when no connection is ready after all
            return
        conn, addr = pair
        HTTPChannel(self, conn, addr)


def main():
    """Start the proxy with DefaultConfiguration and run the event loop."""
    proxy = HTTPProxyServer(DefaultConfiguration)
    print(proxy)
    asyncore.loop()