#!/usr/bin/python -t
"""Magicproxy: an HTTP proxy that downloads one entity over several proxies.

The server listens on port 8080.  For a plain GET request it asks the origin
for the entity size (HEAD) and, if the size is known, splits the entity into
byte ranges that are fetched in parallel through the upstream proxies listed
in ENDPOINTS and streamed back to the client in order.  Everything else
(non-GET methods, URLs with a query string, unknown entity size) is fetched
directly and relayed unchanged ("bypass").

Written for Python 2 (httplib, urlparse, cStringIO, asyncore/asynchat).
"""

import os
import sys
import string
import time
import asynchat
import asyncore
import socket
import httplib
import urlparse

try:
    import cStringIO as StringIO
except ImportError:
    import StringIO

# Upstream proxies as (host, port); a Fetcher refers to them by list index.
ENDPOINTS = [
    ('10.2.2.11', 8888),
    ('10.3.1.2', 8888),
    # ('10.1.1.156', 8888),
]

kB = 1024

# minimum entity size to start a parallel fetch
THRESHOLD = 0 * kB  # 512 * kB

# first fetch-range blocksize
INIT_BLOCKSIZE = 512 * kB

# lower bound of fetch-range blocksize optimization
MIN_BLOCKSIZE = 512 * kB

# time each fetcher should spend on its range, calculated using the
# speed measured while using INIT_BLOCKSIZE
TIME_SLICE = 5

# start a new fetcher on an endpoint X bytes before the old one finishes
FETCHER_JUMPSTART = 32 * kB

# Upstream response headers that must not be relayed verbatim: the body is
# forwarded de-chunked and the client connection is closed after the reply.
_HOP_HEADERS = ("connection", "keep-alive", "proxy-connection",
                "transfer-encoding")


def _request_target(url):
    """Return the origin-form request target (path;params?query) of *url*."""
    return urlparse.urlunparse(
        ("", "", url.path or "/", url.params, url.query, ""))


#################

class Fetcher(asynchat.async_chat):
    """Fetch one byte range of *url* through one upstream proxy.

    reader   -- object notified through handle_incoming_http_header(header)
                and handle_incoming_data(fetcher, data)
    endpoint -- index into ENDPOINTS selecting the upstream proxy
    url      -- parsed URL (urlparse result) of the entity
    headers  -- dict of the client's request headers, forwarded upstream
    range    -- (first_byte, last_byte); sent as an HTTP Range header, so
                both ends are inclusive
    """

    def __init__(self, reader, endpoint, url, headers, range):
        self.reader = reader
        self.endpoint = endpoint
        self.url = url
        self.headers = headers
        self.range = range

        # absolute offset (within the entity) of the next expected body byte
        self.pos = self.range[0]
        self.proxy = ENDPOINTS[endpoint]

        self.start_time = 0
        self.stop_time = 0

        self.http_status = ""
        self.http_header = ""
        self.state = 0  # 0=status, 1=header, 2=body

        asynchat.async_chat.__init__(self)
        self.set_terminator("\r\n")
        self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
        self.connect(self.proxy)

    def handle_connect(self):
        """Send the ranged GET request once the proxy connection is up."""
        print("%s Start" % self)
        lines = ["GET http://%s:%s%s HTTP/1.0" % (
            self.url.hostname, self.url.port or 80, self.url.path or "/")]
        # Forward the origin request headers, replacing any client-supplied
        # Range.  (The former test `k not in ("range")` was a substring test
        # against the string "range" and also dropped keys such as "a".)
        for key in self.headers:
            if key != "range":
                lines.append("%s: %s" % (key, self.headers[key]))
        lines.append("Range: bytes=%s-%s" % (self.range[0], self.range[1]))
        # push() queues the data; a bare send() may transmit only a part.
        self.push("\r\n".join(lines) + "\r\n\r\n")
        self.start_time = time.time()

    def time(self):
        """Return the seconds spent so far (or in total, once finished)."""
        if self.stop_time == 0:
            return time.time() - self.start_time
        return self.stop_time - self.start_time

    def speed(self):
        """Return the average throughput in bytes per second (0 if unknown)."""
        elapsed = self.time()
        if elapsed <= 0:
            return 0
        return (self.pos - self.range[0]) / elapsed

    def collect_incoming_data(self, data):
        """Accumulate status/header text, or hand body data to the reader."""
        if self.state == 2:  # body
            # The reader is called before self.pos is advanced, so during
            # the call self.pos is the offset of the first byte of *data*.
            self.reader.handle_incoming_data(self, data)
            self.pos += len(data)
            # range[1] is the last byte (inclusive): the range is complete
            # only once pos has moved past it.
            if self.pos > self.range[1]:
                self.stop_time = time.time()
                print("%s finished" % self)
                # XXX just to make sure the next fetcher will be started
                self.reader.handle_incoming_data(self, "")
                self.close()
        elif self.state == 1:  # header
            self.http_header += data
        else:  # status
            self.http_status += data

    def found_terminator(self):
        """Advance the status -> header -> body state machine."""
        if self.state == 0:  # got status-line
            self.state = 1
            self.set_terminator("\r\n\r\n")
        elif self.state == 1:  # got headers
            self.state = 2
            self.set_terminator(None)
            self.reader.handle_incoming_http_header(self.http_header)

    def __str__(self):
        # NOTE(review): the original format string was lost from the source
        # (text between '<' and '>' was stripped); this one is a stand-in.
        return "<Fetcher proxy=%s:%s range=%s-%s>" % (
            self.proxy[0], self.proxy[1], self.range[0], self.range[1])


class MultipleProxyReader(object):
    """Reassemble an entity from Fetchers and stream it to the client.

    NOTE(review): most of this class was lost from the source (everything
    between a '<' and a later '>=' was stripped).  Only the end of the
    sending method ("job done" / close_when_done / return False) and
    __str__ survived.  The rest was rebuilt from the call sites -- the
    constructor call in HTTPProxyServer.handle_request and the two callbacks
    used by Fetcher -- and from the comments on the tuning constants.
    Compare against the original before relying on it.

    NOTE(review): a fetcher that fails, or an upstream that ignores the
    Range header, is not detected; the response then stalls or is corrupt.
    """

    def __init__(self, channel, url, headers, content_length):
        self.channel = channel
        self.url = url
        self.headers = headers
        self.content_length = content_length

        self.header_sent = False
        self.done = False
        self.fetch_pos = 0    # first byte not yet assigned to a fetcher
        self.write_pos = 0    # first byte not yet pushed to the client
        self.blocks = {}      # start offset -> data waiting to be pushed
        self.fetchers = []    # running fetchers
        self.succeeded = set()  # fetchers whose successor was started

        # one initial fetcher per endpoint, as long as there is data left
        for endpoint in range(len(ENDPOINTS)):
            if self.fetch_pos >= self.content_length:
                break
            self._start_fetcher(endpoint, INIT_BLOCKSIZE)

    def _start_fetcher(self, endpoint, blocksize):
        """Assign the next (at most *blocksize* bytes) range to *endpoint*."""
        first = self.fetch_pos
        self.fetch_pos = min(first + blocksize, self.content_length)
        self.fetchers.append(Fetcher(self, endpoint, self.url, self.headers,
                                     (first, self.fetch_pos - 1)))

    def handle_incoming_http_header(self, header):
        """Send the response head, built from the first fetcher's headers."""
        if self.header_sent or self.channel.is_closed:
            return
        self.header_sent = True
        upstream = httplib.HTTPMessage(StringIO.StringIO(header)).dict
        lines = ["HTTP/1.0 200 OK",
                 "X-Proxy: Magicproxy (superpower enabled)"]
        for key in upstream:
            # the range-specific headers describe one part, not the entity
            if key in ("content-range", "content-length"):
                continue
            if key in _HOP_HEADERS:
                continue
            lines.append("%s: %s" % (key, upstream[key]))
        lines.append("Content-Length: %s" % self.content_length)
        self.channel.push("\r\n".join(lines) + "\r\n\r\n")

    def handle_incoming_data(self, fetcher, data):
        """Store *data* from *fetcher*; "" signals that it has finished."""
        if data:
            self.blocks[fetcher.pos] = data

        # bytes the fetcher still has to receive after this chunk
        remaining = fetcher.range[1] + 1 - (fetcher.pos + len(data))
        if (remaining < FETCHER_JUMPSTART
                and fetcher not in self.succeeded
                and self.fetch_pos < self.content_length
                and not self.channel.is_closed):
            # Start the successor on the same endpoint slightly early, sized
            # so that it should take about TIME_SLICE seconds.
            self.succeeded.add(fetcher)
            blocksize = max(int(TIME_SLICE * fetcher.speed()), MIN_BLOCKSIZE)
            self._start_fetcher(fetcher.endpoint, blocksize)

        if not data:
            self.succeeded.discard(fetcher)
            if fetcher in self.fetchers:
                self.fetchers.remove(fetcher)

        while self._send_next_block():
            pass

    def _send_next_block(self):
        """Push the next in-order block; return True if one was pushed."""
        if self.done:
            return False
        if self.channel.is_closed:
            print("%s request side closed the connection" % self)
            self.done = True
            self.blocks.clear()
            for fetcher in self.fetchers:
                fetcher.close()
            self.fetchers = []
            return False

        data = self.blocks.pop(self.write_pos, None)
        if data is not None:
            self.channel.push(data)
            self.write_pos += len(data)
            return True

        if self.write_pos >= self.content_length:
            print("%s job done" % self)
            self.done = True
            self.channel.close_when_done()
        return False

    def __str__(self):
        # The format string was stripped down to "" in the source, which made
        # the %-formatting raise TypeError; restored from the argument list.
        return "<MultipleProxyReader url=%s content_length=%s>" % (
            urlparse.urlunparse(self.url), self.content_length)


class HTTPChannel(asynchat.async_chat):
    """Client connection: reads the request head and hands it to the server."""

    def __init__(self, server, sock, addr):
        self.server = server

        self.data = StringIO.StringIO()
        self.is_closed = False
        self.request = None

        asynchat.async_chat.__init__(self, sock)
        self.set_terminator("\r\n\r\n")

    def handle_close(self):
        """Record that the client went away and release the socket."""
        self.is_closed = True
        # Without close() the dead socket stays in the asyncore map and is
        # reported readable on every loop iteration.
        self.close()

    def collect_incoming_data(self, data):
        """Buffer incoming bytes; give up on heads larger than 16 kB."""
        self.data.write(data)
        if self.data.tell() > 16384:
            self.close_when_done()

    def found_terminator(self):
        """Parse the request line and headers and dispatch the request."""
        if self.request:
            return  # ignore body data, for now

        # parse http header
        self.data.seek(0)
        self.request = self.data.readline().split(None, 2)
        if len(self.request) != 3:
            # badly formed request; just shut down
            self.close_when_done()
            return
        headers = httplib.HTTPMessage(self.data).dict
        self.server.handle_request(self, self.request[0], self.request[1],
                                   headers)


class HTTPProxyServer(asyncore.dispatcher):
    """Listening socket; decides per request between parallel fetch and bypass."""

    def __init__(self, port=8080):
        self.port = port
        asyncore.dispatcher.__init__(self)
        self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
        self.set_reuse_addr()
        self.bind(("", self.port))
        self.listen(5)

    def handle_accept(self):
        """Wrap a newly accepted client connection in an HTTPChannel."""
        accepted = self.accept()
        if accepted is None:
            # accept() returns None when the connection is already gone
            return
        conn, addr = accepted
        HTTPChannel(self, conn, addr)

    def handle_request(self, channel, method, path, headers):
        """Serve one request, by parallel fetch when possible, else by bypass.

        channel -- HTTPChannel of the client
        method  -- request method from the request line
        path    -- request target from the request line (an absolute URL)
        headers -- dict of request headers (lower-case keys)
        """
        url = urlparse.urlparse(path)
        print("%s %s" % (method, path))

        if not url.hostname:
            # not a proxy-style request; there is no origin to contact
            print("request without absolute URL, rejected")
            channel.push("HTTP/1.0 400 Bad Request\r\n\r\n")
            channel.close_when_done()
            return None

        if method != "GET" or url.query != "":
            print("non-GET or query, bypass")
            # do not handle non-GET or GET with Query (?foo=bla) requests
            return self._bypass_request(channel, method, url)

        # check for content-length header with a HEAD request
        # NOTE: this is a blocking call inside the asyncore loop.
        content_length = self._probe_content_length(url)
        if content_length is None:
            # no content length given, bypass this request
            print("missing content-length, bypass")
            self._bypass_request(channel, "GET", url)
        elif content_length <= 0 or content_length < THRESHOLD:
            # nothing to split (an empty entity would never complete)
            self._bypass_request(channel, "GET", url)
        else:
            MultipleProxyReader(channel, url, headers, content_length)
        return None

    def _probe_content_length(self, url):
        """Return the entity size reported for a HEAD request, or None."""
        conn = httplib.HTTPConnection(url.hostname, url.port or 80)
        try:
            conn.request("HEAD", url.path or "/")
            value = conn.getresponse().getheader("content-length")
        finally:
            conn.close()
        try:
            return int(value)
        except (TypeError, ValueError):
            # header absent (None) or not a number
            return None

    def _bypass_request(self, channel, method, url):
        """Fetch *url* directly from the origin and relay the response.

        Only the method and request target are forwarded; the client's
        headers and any request body are not.
        """
        print("_bypass request hostname=%s port=%s path=%s" % (
            url.hostname, url.port or 80, url.path))
        # XXX this should not act as a proxy itself; the data should be
        # passed through one-to-one.
        # Furthermore it should be ensured that requests to host X1 are
        # always routed through proxy Y1,
        # e.g. proxy=proxies[ stuff(hostname) % len(proxies) ]
        conn = httplib.HTTPConnection(url.hostname, url.port or 80)
        try:
            # include the query string: requests with a query always end up here
            conn.request(method, _request_target(url))
            resp = conn.getresponse()
            # relay the upstream status instead of claiming "200 OK"
            lines = ["HTTP/1.0 %s %s" % (resp.status, resp.reason),
                     "X-Proxy: Magicproxy (superpower disabled)"]
            for name, value in resp.getheaders():
                # httplib already decoded chunked bodies
                if name.lower() not in _HOP_HEADERS:
                    lines.append("%s: %s" % (name, value))
            body = resp.read()
        finally:
            conn.close()

        channel.push("\r\n".join(lines) + "\r\n\r\n")
        if body:
            channel.push(body)
        channel.close_when_done()


if __name__ == "__main__":
    proxy = HTTPProxyServer()
    asyncore.loop()