#!/usr/bin/python -t
import os, sys, string, time, md5
import asynchat, asyncore, socket, httplib, urlparse
from heapq import heappush, heappop
try:
    import cStringIO as StringIO
except ImportError:
    import StringIO

ENDPOINTS = [
    ('10.2.2.11', 8888),
    ('10.3.1.2', 8888),
    ('10.1.1.156', 8888),
]

kB = 1024
#minimum entity size to start a parallel fetch
THRESHOLD = 512 * kB
#blocksize of the first fetch-range
INIT_BLOCKSIZE = 512 * kB
#lower bound for the fetch-range blocksize optimization
MIN_BLOCKSIZE = 512 * kB
#target time (seconds) each fetcher should spend on its range,
#calculated from the speed measured while using INIT_BLOCKSIZE
TIME_SLICE = 5
#start a new fetcher on an endpoint X bytes before the
#old one finishes
FETCHER_JUMPSTART = 32 * kB

#################

class Fetcher(asynchat.async_chat):
    """Fetches one byte range of the entity through one proxy endpoint."""

    def __init__(self, reader, proxy, url, headers, range):
        self.reader = reader
        self.proxy = proxy
        self.url = url
        self.headers = headers
        self.range = range
        self.pos = self.range[0]
        self.start_time = 0
        self.stop_time = 0
        self.http_status = ""
        self.http_header = StringIO.StringIO()
        self.state = 0 #0=status, 1=header, 2=body

        asynchat.async_chat.__init__(self)
        self.set_terminator("\r\n")
        self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
        self.connect(self.proxy)

    def __str__(self):
        return "<Fetcher proxy=%s url=%s range=%s pos=%d>" % \
            (self.proxy, urlparse.urlunparse(self.url), self.range, self.pos)

    def handle_connect(self):
        #send a ranged GET for our slice of the entity via the proxy
        self.start_time = time.time()
        self.push("GET %s HTTP/1.0\r\n" % urlparse.urlunparse(self.url))
        for key in filter(lambda k: k != "range", self.headers.keys()):
            self.push("%s: %s\r\n" % (key, self.headers[key]))
        self.push("Range: bytes=%d-%d\r\n" % self.range)
        self.push("\r\n")

    def speed(self):
        #average download speed in bytes per second so far
        return (self.pos - self.range[0]) / max(time.time() - self.start_time, 0.001)

    def collect_incoming_data(self, data):
        if self.state == 2: #body
            self.reader.handle_incoming_data(self, data, len(data))
            self.pos += len(data)
            #range[1] is inclusive, so the slice is complete once pos passes it
            if self.pos > self.range[1]:
                self.stop_time = time.time()
                print self, "finished"
                #make sure the next fetcher will be started
                self.reader.handle_incoming_data(self)
                self.close()
        elif self.state == 1: #header
            self.http_header.write(data)
        else: #status
            self.http_status += data

    def found_terminator(self):
        if self.state == 0: #got status-line
            self.state = 1
            self.set_terminator("\r\n\r\n")
        elif self.state == 1: #got headers
            self.state = 2
            self.set_terminator(None)
            self.reader.handle_incoming_http_header(self.http_header)

class StringIOProducer(object):
    """Producer that feeds a StringIO buffer to async_chat in small chunks."""

    def __init__(self, buf, amt=1440):
        self.buf = buf
        self.amt = amt
        self.buf.seek(0)

    def more(self):
        return self.buf.read(self.amt)
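#The reader below reorders chunks with a heap keyed by byte offset. As a
#minimal illustration of that technique (not used by the proxy itself;
#all names are local to this example), out-of-order chunks are flushed
#the moment the chunk starting at write_pos arrives:
def _example_reassembly():
    blocks, write_pos = [], 0
    out = StringIO.StringIO()
    heappush(blocks, (4, "efgh", 4)) #the second chunk arrives first
    heappush(blocks, (0, "abcd", 4))
    while blocks and blocks[0][0] == write_pos:
        offset, data, length = heappop(blocks)
        out.write(data)
        write_pos += length
    return out.getvalue() #-> "abcdefgh"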
class MultipleProxyReader(object):
    """Splits one GET into byte ranges and fetches them via several proxies."""

    def __init__(self, channel, url, header, content_length):
        self.channel = channel
        self.url = url
        self.header = header
        self.content_length = content_length
        print self, "New Instance"

        self.header_sent = False
        self.fetch_pos = 0
        self.write_pos = 0
        self.buffer = ""
        self.blocks = list()
        self.fetchers = list()

        for proxy in ENDPOINTS:
            self.fetchers.append(Fetcher(self, proxy, self.url, self.header,
                self.next_range(INIT_BLOCKSIZE)))

    def __str__(self):
        return "<MultipleProxyReader url=%s content_length=%d>" % \
            (urlparse.urlunparse(self.url), self.content_length)

    def handle_incoming_data(self, fetcher, data=None, length=0):
        if not data:
            #fetcher is done, drop it from the list
            self.fetchers = filter(lambda f: f != fetcher, self.fetchers)
        else:
            heappush(self.blocks, (fetcher.pos, data, length))

        if self.channel.is_closed:
            print self, "request side closed the connection"
            return

        if fetcher.range[1] - fetcher.pos < FETCHER_JUMPSTART \
        and self.fetch_pos + 1 < self.content_length and not self.channel.is_closed \
        and len(filter(lambda f: f.proxy == fetcher.proxy, self.fetchers)) < 2:
            #this fetcher is X bytes from finishing its job, so start the
            #next fetcher on the same endpoint
            blocksize = max(int(TIME_SLICE * fetcher.speed()), MIN_BLOCKSIZE)
            fetch_range = self.next_range(blocksize)
            print "Start new Fetcher, bs=%s range=%s" % (blocksize, fetch_range)
            self.fetchers.append(Fetcher(self, fetcher.proxy, self.url, self.header, fetch_range))

        #flush every block that continues the stream at write_pos;
        #self.blocks is a heap, so the smallest offset sits at index 0
        buf = StringIO.StringIO()
        while len(self.blocks) > 0 and self.blocks[0][0] == self.write_pos:
            item = heappop(self.blocks)
            buf.write(item[1])
            self.write_pos += item[2]

        if buf.tell() > 0:
            self.channel.push_with_producer(StringIOProducer(buf))

        if self.write_pos + 1 >= self.content_length:
            print self, "job done %s blocks left" % len(self.blocks)
            self.channel.is_closed = True
            self.channel.close_when_done()

    def next_range(self, suggested_blocksize):
        #hand out the next (inclusive) byte range to fetch
        start = self.fetch_pos
        self.fetch_pos = min(self.fetch_pos + suggested_blocksize, self.content_length)
        return (start, self.fetch_pos - 1)

    def handle_incoming_http_header(self, header):
        if self.header_sent:
            return
        self.header_sent = True

        self.channel.push("HTTP/1.0 200 OK\r\n")
        #send the headers from the first response, minus the range-specific ones
        header.seek(0)
        headers = httplib.HTTPMessage(header)
        for key in filter(lambda k: k not in ("content-range", "content-length"), headers.dict.keys()):
            self.channel.push("%s: %s\r\n" % (key, headers.dict[key]))
        self.channel.push("Content-Length: %s\r\n" % self.content_length)
        self.channel.push("X-Proxy: Magicproxy (superpower activated)\r\n")
        self.channel.push("\r\n")

class HTTPProxyClient(asynchat.async_chat):
    """Transparent forward to another proxy server."""

    def __init__(self, proxy, channel, method, url, headers):
        self.proxy = proxy
        self.other = channel
        self.method = method
        self.headers = headers

        asynchat.async_chat.__init__(self)
        self.set_terminator(None)
        self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
        self.connect(self.proxy)

        self.buf = StringIO.StringIO()
        self.buf.write("%s %s HTTP/1.0\r\n" % (method, urlparse.urlunparse(url)))
        for key in headers.keys():
            self.buf.write("%s: %s\r\n" % (key, headers[key]))
        self.buf.write("\r\n")

    def __str__(self):
        return "<HTTPProxyClient %s:%d>" % self.proxy

    def collect_incoming_data(self, data):
        self.other.push(data)

    def handle_close(self):
        self.close()
        self.other.close_when_done()
        # print self, "Done"

    def handle_connect(self):
        self.push_with_producer(StringIOProducer(self.buf))

class HTTPChannel(asynchat.async_chat):
    """One client connection to this proxy; parses the request header."""

    def __init__(self, server, sock, addr):
        self.server = server
        self.data = StringIO.StringIO()
        self.is_closed = False
        self.request = None

        asynchat.async_chat.__init__(self, sock)
        self.set_terminator("\r\n\r\n")

    def handle_close(self):
        self.close()
        self.is_closed = True

    def collect_incoming_data(self, data):
        self.data.write(data)
        if self.data.tell() > 16384:
            self.close_when_done()

    def found_terminator(self):
        if not self.request:
            #parse http request line
            self.data.seek(0)
            self.request = string.split(self.data.readline(), None, 2)
            if len(self.request) != 3:
                #badly formed request; just shut down
                self.close_when_done()
            else:
                headers = httplib.HTTPMessage(self.data).dict
                self.server.handle_request(self, self.request[0], self.request[1], headers)
        else:
            pass #ignore body data, for now
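#A sketch of how next_range() above carves an entity into the inclusive
#byte ranges used for "Range: bytes=a-b" headers. This helper exists only
#for illustration and is not called by the proxy:
def _example_ranges(content_length, blocksize):
    ranges, fetch_pos = [], 0
    while fetch_pos < content_length:
        start = fetch_pos
        fetch_pos = min(fetch_pos + blocksize, content_length)
        ranges.append((start, fetch_pos - 1))
    return ranges
#_example_ranges(1300 * kB, 512 * kB)
#  -> [(0, 524287), (524288, 1048575), (1048576, 1331199)]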
class HTTPProxyServer(asyncore.dispatcher):
    """Accepts client connections and decides between bypass and parallel fetch."""

    def __init__(self):
        self.port = 8080

        asyncore.dispatcher.__init__(self)
        self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
        self.set_reuse_addr()
        self.bind(("", self.port))
        self.listen(5)

    def __str__(self):
        return "<HTTPProxyServer port=%d>" % self.port

    def shutdown(self):
        #TODO: shut down the proxy cleanly here
        #self.channel.close_when_done()
        sys.exit()

    def handle_accept(self):
        conn, addr = self.accept()
        HTTPChannel(self, conn, addr)

    def handle_request(self, channel, method, path, headers):
        url = urlparse.urlparse(path)
        if method != "GET" or url.query != "":
            #do not handle non-GET requests or GETs with a query (?foo=bla)
            return self._bypass_request(channel, method, url, headers)

        #check for a content-length header with a HEAD request
        conn = httplib.HTTPConnection(url.hostname, url.port or 80)
        conn.request("HEAD", url.path)
        resp = conn.getresponse()
        content_length = filter(lambda it: it[0] == "content-length", resp.getheaders())
        conn.close()

        if len(content_length) == 0:
            #no content-length given, bypass this request
            self._bypass_request(channel, "GET", url, headers)
        else:
            content_length = int(content_length[0][1])
            if content_length < THRESHOLD:
                self._bypass_request(channel, "GET", url, headers)
            else:
                MultipleProxyReader(channel, url, headers, content_length)

    def _bypass_request(self, channel, method, url, headers):
        #hash the hostname so requests for one host always use the same endpoint
        proxy = ENDPOINTS[int(md5.md5(url.hostname).hexdigest(), 16) % len(ENDPOINTS)]
        print self, "_bypass request via %s: %s %s" % (proxy, method, urlparse.urlunparse(url))
        HTTPProxyClient(proxy, channel, method, url, headers)

if __name__ == "__main__":
    proxy = HTTPProxyServer()
    asyncore.loop()
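#Usage sketch, assuming the ENDPOINTS above point at reachable HTTP
#proxies that honor Range requests (the file name is hypothetical):
#
#  $ python magicproxy.py                # listens on port 8080
#  $ curl -x localhost:8080 http://example.com/big.iso -o big.iso
#
#Entities smaller than THRESHOLD and non-GET requests are forwarded
#unchanged to a single endpoint chosen by hashing the hostname; anything
#larger is fetched in parallel byte ranges across all endpoints.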