diff options
author | yvesf <yvesf-git@xapek.org> | 2010-04-05 22:11:37 +0200 |
---|---|---|
committer | yvesf <yvesf-git@xapek.org> | 2010-04-05 22:11:37 +0200 |
commit | d1914490462869fff47ba2294c32a5e6959e5832 (patch) | |
tree | 23e83707243a2aa9e0eb354ec9cdd15a8eb302db /proxy.py | |
parent | 75041878bd789d2b14d3ccebcb0eaeca2c3a6aca (diff) | |
parent | efe938d11ed46b8c328e9707ddfa61a3f5da3a4c (diff) | |
download | magicproxy-d1914490462869fff47ba2294c32a5e6959e5832.tar.gz magicproxy-d1914490462869fff47ba2294c32a5e6959e5832.zip |
Merge branch 'next'
Diffstat (limited to 'proxy.py')
-rwxr-xr-x | proxy.py | 270 |
1 files changed, 0 insertions, 270 deletions
diff --git a/proxy.py b/proxy.py deleted file mode 100755 index b0f31e8..0000000 --- a/proxy.py +++ /dev/null @@ -1,270 +0,0 @@ -#!/usr/bin/python -t -import os, sys, string, time -import asynchat, asyncore, socket, httplib, urlparse -from heapq import heappush, heappop -try: - import cStringIO as StringIO -except ImportError: - import StringIO - - -ENDPOINTS = [ - ('10.2.2.11', 8888), - ('10.3.1.2', 8888), -# ('10.1.1.156', 8888), -] - -kB = 1024 -#minimum entity size to start a paralel fetch -THRESHOLD = 512 * kB -#first fetch-range blocksize -INIT_BLOCKSIZE = 512 * kB -#lower bound of fetch-range blocksize optimization -MIN_BLOCKSIZE = 512 * kB -#time each fetcher spent on his range, calculated using -#speed measured while using INIT_BLOCKSIZE -TIME_SLICE = 5 -#start a new fetcher on a endpoint X-bytes before the -#old one finished -FETCHER_JUMPSTART = 32 * kB - -################# - -class Fetcher(asynchat.async_chat): - def __init__(self, reader, proxy, url, headers, range): - self.reader = reader - self.proxy = proxy - self.url = url - self.headers = headers - self.range = range - - self.pos = self.range[0] - self.start_time = 0 - self.stop_time = 0 - self.http_status = "" - self.http_header = "" - self.state = 0 #0=status, 1=header, 2=body - - asynchat.async_chat.__init__(self) - self.set_terminator("\r\n") - self.create_socket(socket.AF_INET, socket.SOCK_STREAM) - self.connect(self.proxy) - - def handle_connect (self): - print self, "Start" - self.send("GET http://%s:%s%s HTTP/1.0\r\n" % ( self.url.hostname, self.url.port or 80, self.url.path )) - for key in filter(lambda k: k not in ("range"), self.headers.keys()): #send origin request headers - self.send("%s: %s\r\n" % (key, self.headers[key])) - self.send("Range: bytes=%s-%s\r\n" % (self.range[0], self.range[1])) - self.send("\r\n") - self.start_time = time.time() - - def time(self): - if self.stop_time == 0: - return time.time() - self.start_time - else: - return self.stop_time - self.start_time - - def speed(self): - return (self.pos - self.range[0]) / self.time() - - def collect_incoming_data(self, data): - if self.state==2: #body - self.reader.handle_incoming_data(self, data) - self.pos += len(data) - if self.pos >= self.range[1]: - self.stop_time = time.time() - print self, "finished" - #make sure the next fetcher will be started - self.reader.handle_incoming_data(self) - self.close() - elif self.state ==1: #header - self.http_header += data - else: #status - self.http_status += data - - def found_terminator(self): - if self.state == 0: #got status-line - self.state = 1 - self.set_terminator("\r\n\r\n") - elif self.state == 1: #got headers - self.state = 2 - self.set_terminator(None) - self.reader.handle_incoming_http_header(self.http_header) - - def __str__(self): - return "<Fetcher proxy=%s url=%s range=%s" % (self.proxy, urlparse.urlunparse(self.url), self.range) - -class MultipleProxyReader(object): - def __init__(self, channel, url, header, content_length): - self.channel = channel - self.url = url - self.header = header - self.content_length = content_length - - self.header_sent = False - self.fetch_pos = 0 - self.write_pos = 0 - self.buffer = "" - self.blocks = list() - self.fetchers = list() - - for proxy in ENDPOINTS: - self.fetchers.append( Fetcher(self, proxy, self.url, self.header, self.next_range(INIT_BLOCKSIZE)) ) - - def handle_incoming_data(self, fetcher, data=None): - if not data: - self.fetchers = filter(lambda f: f != fetcher, self.fetchers) - else: - heappush(self.blocks, (fetcher.pos, data)) - - if fetcher.range[1] - fetcher.pos < FETCHER_JUMPSTART \ - and self.fetch_pos + 1 < self.content_length and not self.channel.is_closed \ - and len( filter( (lambda f: f.proxy == fetcher.proxy), self.fetchers) ) < 2: - #Start a new fetcher on this line if this fetchers is X-Bytes before finishing his jobs - blocksize = max(int(TIME_SLICE * fetcher.speed()), MIN_BLOCKSIZE) - fetch_range = self.next_range(blocksize) - print "Start new Fetcher, bs=%s range=%s" % (blocksize,fetch_range) - self.fetchers.append( Fetcher(self, fetcher.proxy, self.url, self.header, fetch_range) ) - - while self.send_next_data(): - pass - - def next_range(self, suggested_blocksize): - start = self.fetch_pos - self.fetch_pos = min(self.fetch_pos + suggested_blocksize, self.content_length) - return (start, self.fetch_pos-1) - - def handle_incoming_http_header(self, header): - if self.header_sent: - pass - else: - self.header_sent = True - - self.channel.push("HTTP/1.0 200 OK\r\n") - # Sends header from first response - headers = httplib.HTTPMessage(StringIO.StringIO(header)) - for key in filter(lambda k: k not in ("content-range", "content-length"), headers.dict.keys()): - self.channel.push("%s: %s\r\n" % (key, headers.dict[key])) - self.channel.push("Content-Length: %s" % self.content_length) - self.channel.push("X-Proxy: Magicproxy (superpower activated)\r\n") - self.channel.push("\r\n") - - def send_next_data(self): - if self.channel.is_closed: - print self, "request side closed the connection" - self.channel.close_when_done() - #XXX terminate all running fetcher - return False - - #print self, "expect data at %s in" % self.write_pos, self.blocks.keys() - if len(self.blocks)>0 and min(self.blocks)[0] == self.write_pos: - item = heappop(self.blocks) - self.channel.push(item[1]) - self.write_pos += len(item[1]) - return True - - if self.write_pos + 1 >= self.content_length: - print self, "job done %s blocks left" % len(self.blocks) - #XXX prevent next calls to send_next_data - self.channel.close_when_done() - - return False - - def __str__(self): - return "<MultipleProxyReader url=%s content_length=%s>" % (urlparse.urlunparse(self.url), self.content_length) - -class HTTPChannel(asynchat.async_chat): - def __init__(self, server, sock, addr): - self.server = server - - self.data = StringIO.StringIO() - self.is_closed = False - self.request = None - - asynchat.async_chat.__init__(self, sock) - self.set_terminator("\r\n\r\n") - - def handle_close(self): - self.is_closed = True - - def collect_incoming_data(self, data): - self.data.write(data) - if self.data.tell() > 16384: - self.close_when_done() - - def found_terminator(self): - if not self.request: - # parse http header - self.data.seek(0) - self.request = string.split(self.data.readline(), None, 2) - if len(self.request) != 3: - # badly formed request; just shut down - self.close_when_done() - else: - headers = httplib.HTTPMessage(self.data).dict - self.server.handle_request(self, self.request[0], self.request[1], headers) - else: - pass # ignore body data, for now - -class HTTPProxyServer(asyncore.dispatcher): - def __init__(self): - self.port = 8080 - - asyncore.dispatcher.__init__(self) - self.create_socket(socket.AF_INET, socket.SOCK_STREAM) - self.set_reuse_addr() - self.bind(("", 8080)) - self.listen(5) - - def shutdown(self): - #TODO Hier Proxy sauber beenden - #self.channel.close_when_done() - sys.exit() - - def handle_accept(self): - conn, addr = self.accept() - HTTPChannel(self, conn, addr) - - def handle_request(self, channel, method, path, headers): - url = urlparse.urlparse(path) - if method != "GET" or url.query != "": - #do not handle non-GET or GET with Query (?foo=bla) requests - return self._bypass_request(channel, method, url, headers) - - #check for content-length header with a HEAD request - conn = httplib.HTTPConnection(url.hostname, url.port or 80) - conn.request("HEAD", url.path) - resp = conn.getresponse() - content_length = filter(lambda it: it[0] == "content-length", resp.getheaders()) - if len( content_length ) == 0: - # no content length given, bypass this request - print "missing content-length, bypass" - self._bypass_request(channel, "GET", url, headers) - else: - content_length = int(content_length[0][1]) - - if content_length < THRESHOLD: - self._bypass_request(channel, "GET", url, headers) - else: - MultipleProxyReader(channel, url, headers, content_length) - - def _bypass_request(self, channel, method, url, headers): - print "_bypass request: %s %s" % (method, urlparse.urlunparse(url)) - #XXX hier sollte nicht proxy gespielt werden sondern - #die daten 1-zu-1 durchgereicht werden. - #Weiterhin sollte sichergestellt werden, dass die requests - #zu Host X1 immer ueber Proxy Y1 geroutet werden - # etwa proxy=proxies[ stuff(hostname) % len(proxies) ] - conn = httplib.HTTPConnection(url.hostname, url.port or 80) - conn.request(method, url.path, body="", headers=headers) - resp = conn.getresponse() - channel.push("HTTP/1.0 200 OK\r\nX-Proxy: Magicproxy (superpower disabled)\r\n") - channel.push( "\r\n".join(map(lambda k: "%s: %s" % (k[0],k[1]), resp.getheaders())) ) - channel.push("\r\n\r\n") - channel.push(resp.read()) - channel.close_when_done() - -if __name__ == "__main__": - proxy = HTTPProxyServer() - asyncore.loop() |