#!/usr/bin/python -t
"""Magicproxy: an asyncore-based HTTP proxy that splits large downloads.

HTTPProxyServer listens on a TCP port (8080 by default).  A GET without a
query string whose HEAD probe answers 200, advertises byte-range support and
reports a Content-Length of at least MINIMAL_SIZE is split into byte ranges of
BLOCKSIZE_FETCH bytes.  Each range is downloaded by a Fetcher through one of
the upstream proxies in ENDPOINTS (round robin), and MultipleProxyReader
writes the ranges back to the client in order.  Every other request is relayed
directly to the origin server (HTTPProxyServer._bypass_request).

This is Python 2 code (httplib, urlparse, asynchat/asyncore).
"""
import pwd
import os
import sys
import logging
import logging.handlers
import string
import asynchat
import asyncore
import socket
import httplib
import urlparse

try:
    import cStringIO as StringIO
except ImportError:
    import StringIO

# Upstream HTTP proxies, as (host, port) tuples, used for the ranged fetches.
ENDPOINTS = [
    # ('10.1.0.1', 8080),
    # ('10.2.2.11', 8081),
    ('10.3.1.2', 8888),
]

# Minimum entity size (bytes) before a download is split into parallel fetches.
MINIMAL_SIZE = 524288
# Size (bytes) of the byte range requested by each Fetcher.
BLOCKSIZE_FETCH = 524288

# Connection-management headers that must not be relayed to the client: this
# proxy always answers with HTTP/1.0 and closes the client connection.
_CONNECTION_HEADERS = frozenset(["connection", "keep-alive", "proxy-connection"])

# Headers that describe a single 206 range; replaced when the ranges are
# merged into one complete response for the client.
_RANGE_HEADERS = frozenset(["content-range", "content-length"])


def _request_path(url):
    """Return the request target (path, ;params, ?query) of a urlparse result."""
    path = url.path or "/"
    if url.params:
        path += ";" + url.params
    if url.query:
        path += "?" + url.query
    return path


def _header_name(line):
    """Return the lower-cased field name of a raw 'Name: value' header line."""
    return line.split(":", 1)[0].strip().lower()


class Fetcher(asynchat.async_chat):
    """Download one byte range of `url` through an upstream proxy.

    The raw response header (status line plus header lines) is passed to
    reader.handle_incoming_http_header(); body bytes are passed to
    reader.handle_incoming_data(pos, data) tagged with their absolute offset;
    reader.finished() is called exactly once, when the last byte of the range
    has been delivered.
    """

    def __init__(self, reader, proxy, url, header, range, content_length):
        # `range` (shadows the builtin; name kept for interface compatibility)
        # is an inclusive (first_byte, last_byte) tuple.  `header` is unused.
        print("Fetcher using proxy=%s for url=%s range=%s content_length=%s"
              % (proxy, url, range, content_length))
        self.reader = reader
        self.proxy = proxy
        self.url = url
        self.range = range
        self.content_length = content_length
        self.pos = range[0]  # absolute offset of the next body byte
        self.done = False  # True once the whole range has been delivered
        asynchat.async_chat.__init__(self)
        self.set_terminator("\r\n\r\n")  # first collect the response header
        self.http_header = ""
        self.is_body = False
        self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
        self.connect(proxy)

    def handle_connect(self):
        """Send the proxy-style range request once the connection is up."""
        print("handle_connect")
        # XXX should include the original request headers
        # push() buffers and retries partial writes; dispatcher.send() does not.
        self.push("GET http://%s:%s%s HTTP/1.0\r\n" % (
            self.url.hostname, self.url.port or 80, _request_path(self.url)))
        # "Range: bytes=first-last" (inclusive) is the request header;
        # Content-Range only appears in responses.
        self.push("Range: bytes=%s-%s\r\n" % (self.range[0], self.range[1]))
        self.push("\r\n")
        print("sent request")

    def collect_incoming_data(self, data):
        """Accumulate the header, then forward body bytes of our range."""
        if not self.is_body:
            print("get header data")
            print(data)
            self.http_header += data
            return
        if self.done:
            # Bytes beyond the requested range: never forward them.
            return
        remaining = self.range[1] + 1 - self.pos  # range end is inclusive
        data = data[:remaining]
        print("read %s - range_start=%s range_end=%s"
              % (self.pos, self.range[0], self.range[1]))
        self.reader.handle_incoming_data(self.pos, data)
        self.pos += len(data)
        if self.pos > self.range[1]:
            # Mark done first so finished() can never be reported twice.
            self.done = True
            self.reader.finished()
            print("finished")
            self.close_when_done()

    def found_terminator(self):
        """End of the response header: hand it over and switch to body mode."""
        print("header ends now")
        self.set_terminator(None)  # the rest of the stream is body data
        self.reader.handle_incoming_http_header(self.http_header)
        self.is_body = True

    def handle_close(self):
        """Close the socket, reporting a connection that ended too early."""
        if not self.done:
            # XXX the reader is not notified, so the client transfer stalls.
            print("Fetcher: connection closed at %s before range end %s"
                  % (self.pos, self.range[1]))
        self.close()


class MultipleProxyReader(object):
    """Reassemble the ranges fetched by several Fetchers for one client.

    One Fetcher per entry in ENDPOINTS is kept in flight.  Data that arrives
    out of order is buffered in `blocks` (keyed by absolute offset) until
    everything before it has been pushed to `channel`.
    """

    def __init__(self, channel, url, header, content_length):
        print("MultipleProxyReader url=%s content_length=%s" % (url, content_length))
        self.channel = channel
        self.url = url
        self.header = header
        self.content_length = content_length
        self.header_sent = False
        self.fetch_pos = 0  # first byte not yet assigned to a Fetcher
        self.write_pos = 0  # first byte not yet pushed to the client
        self.buffer = ""
        self.blocks = dict()
        self.next_endpoint = 0
        self.active_fetchers = 0
        # Start one fetch per endpoint so the ranges download in parallel.
        for _ in ENDPOINTS:
            if not self._start_fetcher():
                break
        if self.active_fetchers == 0:
            # nothing to fetch (zero-length entity)
            self.channel.close_when_done()

    def _start_fetcher(self):
        """Dispatch the next unassigned range; return False if none is left."""
        if self.fetch_pos >= self.content_length:
            return False
        print("start new fetcher")
        start = self.fetch_pos
        self.fetch_pos = min(start + BLOCKSIZE_FETCH, self.content_length)
        byte_range = (start, self.fetch_pos - 1)  # inclusive end
        Fetcher(self, ENDPOINTS[self.next_endpoint], self.url, self.header,
                byte_range, self.content_length)
        self.active_fetchers += 1
        self.next_endpoint = (self.next_endpoint + 1) % len(ENDPOINTS)
        return True

    def handle_incoming_data(self, pos, data):
        """Push `data` (starting at offset `pos`) now, or buffer it for later."""
        if pos == self.write_pos:
            self.channel.push(data)
            self.write_pos += len(data)
            print("got body data at pos %s ..sent" % pos)
        else:
            self.blocks[pos] = data
            print("got body data at pos %s ..stored" % pos)
        self.find_next_data()

    def handle_incoming_http_header(self, header):
        """Send the first upstream header received, rewritten for the client."""
        # Every Fetcher receives a header; only the first one reaches the client.
        if self.header_sent:
            return
        self.header_sent = True
        self.channel.push(self._client_header(header))

    def _client_header(self, header):
        """Build the client header from a raw upstream header.

        `header` is a status line plus header lines joined by CRLF, without the
        terminating blank line.  A 206 answer becomes "200 OK" carrying the full
        Content-Length and no Content-Range; connection-management headers are
        always dropped.
        """
        lines = header.split("\r\n")
        status = lines[0].split(None, 2)
        partial = len(status) >= 2 and status[1] == "206"
        if partial:
            dropped = _CONNECTION_HEADERS | _RANGE_HEADERS
        else:
            dropped = _CONNECTION_HEADERS
        out = ["HTTP/1.0 200 OK" if partial else lines[0]]
        skipping = False
        for line in lines[1:]:
            if not line:
                continue
            if line[0] in " \t":
                # folded continuation line: follows its header's decision
                if not skipping:
                    out.append(line)
                continue
            skipping = _header_name(line) in dropped
            if not skipping:
                out.append(line)
        if partial:
            out.append("Content-Length: %d" % self.content_length)
        out.append("X-Proxy: Magicproxy (superpower activated)")
        return "\r\n".join(out) + "\r\n\r\n"

    def finished(self):
        """Called by a Fetcher whose range is complete; refill or finish up."""
        self.active_fetchers -= 1
        self.find_next_data()
        if self._start_fetcher():
            return
        if self.active_fetchers <= 0:
            # no range left and nothing in flight: the transfer is complete
            self.channel.close_when_done()

    def find_next_data(self):
        """Push buffered blocks that continue exactly at write_pos."""
        # A loop rather than recursion: there can be thousands of buffered chunks.
        while self.write_pos in self.blocks:
            data = self.blocks.pop(self.write_pos)
            self.channel.push(data)
            self.write_pos += len(data)


class HTTPResponseProducer(object):
    """asynchat producer that streams an httplib response in `amt`-byte reads."""

    def __init__(self, resp, amt=512):
        self.resp = resp
        self.amt = amt

    def more(self):
        # async_chat sends what more() returns; an empty string ends the producer.
        return self.resp.read(self.amt)


class HTTPChannel(asynchat.async_chat):
    """One client connection: parse the request line and hand it to the server."""

    def __init__(self, server, sock, addr):
        asynchat.async_chat.__init__(self, sock)
        self.server = server
        self.set_terminator("\r\n\r\n")  # end of the request header
        self.request = None
        self.data = StringIO.StringIO()

    def collect_incoming_data(self, data):
        self.data.write(data)
        if self.data.tell() > 16384:
            # refuse oversized request headers
            self.close_when_done()

    def found_terminator(self):
        if self.request:
            return  # ignore body data, for now
        # parse the request line: METHOD TARGET VERSION
        self.data.seek(0)
        self.request = self.data.readline().split(None, 2)
        if len(self.request) != 3:
            # badly formed request; just shut down
            self.close_when_done()
        else:
            self.server.handle_request(self, self.request[0], self.request[1])


class HTTPProxyServer(asyncore.dispatcher):
    """Listening socket that creates an HTTPChannel per client connection."""

    def __init__(self, port=8080):
        asyncore.dispatcher.__init__(self)
        self.port = port
        self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
        self.set_reuse_addr()
        self.bind(("", self.port))
        self.listen(5)

    def handle_accept(self):
        pair = self.accept()
        if pair is None:
            # accept() may return None (e.g. the client went away)
            return
        conn, addr = pair
        HTTPChannel(self, conn, addr)

    def handle_request(self, channel, method, path):
        """Serve one request: split large plain GETs, relay everything else."""
        url = urlparse.urlparse(path)
        print("%s %s" % (method, path))
        if not url.hostname:
            # only absolute-URI (proxy-style) request targets name a host
            channel.push("HTTP/1.0 400 Bad Request\r\n\r\n")
            channel.close_when_done()
            return
        if method != "GET" or url.query != "":
            # do not handle non-GET or GET with query (?foo=bla) requests
            return self._bypass_request(channel, method, url)
        # Probe the entity with a HEAD request.
        # XXX blocking call: stalls the asyncore loop while it runs.
        conn = httplib.HTTPConnection(url.hostname, url.port or 80)
        try:
            conn.request("HEAD", _request_path(url))
            resp = conn.getresponse()
            status = resp.status
            content_length = resp.getheader("content-length")
            accept_ranges = resp.getheader("accept-ranges") or ""
        finally:
            conn.close()
        try:
            content_length = int(content_length)
        except (TypeError, ValueError):
            # no (usable) content length given, bypass this request
            return self._bypass_request(channel, "GET", url)
        if (status != 200 or content_length < MINIMAL_SIZE
                or "bytes" not in accept_ranges.lower()):
            # small, failing, or not range-capable: a split fetch cannot help
            return self._bypass_request(channel, "GET", url)
        print("start reader")
        # XXX the original request headers should be forwarded here
        header = ()
        MultipleProxyReader(channel, url, header, content_length)

    def _bypass_request(self, channel, method, url):
        """Relay `method url` directly to the origin and stream the answer back."""
        print("_bypass request")
        # XXX Instead of acting as a proxy here, the data should be passed
        # through 1:1.  Furthermore it should be ensured that requests to
        # host X1 are always routed through proxy Y1,
        # e.g. proxy = proxies[stuff(hostname) % len(proxies)]
        # XXX blocking call; request headers and body are not forwarded.
        conn = httplib.HTTPConnection(url.hostname, url.port or 80)
        conn.request(method, _request_path(url))
        resp = conn.getresponse()
        lines = ["HTTP/1.0 %d %s" % (resp.status, resp.reason),
                 "X-Proxy: Magicproxy (superpower disabled)"]
        # resp.read() decodes a chunked transfer coding, so that header no
        # longer describes the body we relay.
        dropped = _CONNECTION_HEADERS | frozenset(["transfer-encoding"])
        lines.extend("%s: %s" % (name, value)
                     for name, value in resp.getheaders()
                     if name.lower() not in dropped)
        channel.push("\r\n".join(lines) + "\r\n\r\n")
        channel.push_with_producer(HTTPResponseProducer(resp))
        channel.close_when_done()


if __name__ == "__main__":
    proxy = HTTPProxyServer()
    asyncore.loop()