diff options
author | Thomas Keck <thomas@macbook.macbook> | 2010-03-14 22:43:25 +0100 |
---|---|---|
committer | Thomas Keck <thomas@macbook.macbook> | 2010-03-14 22:43:25 +0100 |
commit | b36eef6862a0ce3f0357f1769932a8409495496c (patch) | |
tree | fd38a369854fb40be5671730e8f07227b64780a5 /proxy.py | |
parent | f3a893bb234351bb82a8dc7dce17d4bd9d7887d5 (diff) | |
parent | 90e44d2a7f88e73bba511f945dba60cdc9cefae6 (diff) | |
download | magicproxy-b36eef6862a0ce3f0357f1769932a8409495496c.tar.gz magicproxy-b36eef6862a0ce3f0357f1769932a8409495496c.zip |
Merge branch 'master' of git://github.com/yvesf/magicproxy
Conflicts:
proxy.py
Diffstat (limited to 'proxy.py')
-rwxr-xr-x | proxy.py | 175 |
1 files changed, 66 insertions, 109 deletions
@@ -1,16 +1,13 @@ #!/usr/bin/python -t -import os, sys, string, time, md5 +import os, sys, string, time, md5, random import asynchat, asyncore, socket, httplib, urlparse from heapq import heappush, heappop -try: - import cStringIO as StringIO -except ImportError: - import StringIO +import cStringIO as StringIO ENDPOINTS = [ ('10.2.2.11', 8888), - ('10.3.1.2', 8888), +# ('10.3.1.2', 8888), ('10.1.1.156', 8888), ] @@ -38,7 +35,7 @@ class Fetcher(asynchat.async_chat): self.headers = headers self.range = range - self.pos = self.range[0] + self.pos = (self.range[0] != -1) and self.range[0] or 0 self.start_time = 0 self.stop_time = 0 self.http_status = "" @@ -58,7 +55,8 @@ class Fetcher(asynchat.async_chat): self.send("GET http://%s:%s%s HTTP/1.0\r\n" % ( self.url.hostname, self.url.port or 80, self.url.path )) for key in filter(lambda k: k not in ("range"), self.headers.keys()): #send origin request headers self.send("%s: %s\r\n" % (key, self.headers[key])) - self.send("Range: bytes=%s-%s\r\n" % (self.range[0], self.range[1])) + if self.range != (-1,-1): + self.send("Range: bytes=%s-%s\r\n" % (self.range[0], self.range[1])) self.send("\r\n") self.start_time = time.time() @@ -76,9 +74,9 @@ class Fetcher(asynchat.async_chat): length = len(data) self.reader.handle_incoming_data(self, data, length) self.pos += length - if self.pos >= self.range[1]: + if self.range != (-1,-1) and self.pos >= self.range[1]: self.stop_time = time.time() - print self, "finished" + print self, "finished with %s kB/s" % (self.speed() / 1024) #make sure the next fetcher will be started self.reader.handle_incoming_data(self) self.close() @@ -94,25 +92,15 @@ class Fetcher(asynchat.async_chat): elif self.state == 1: #got headers self.state = 2 self.set_terminator(None) - self.reader.handle_incoming_http_header(self.http_header) + self.reader.handle_incoming_http_header(self, self.http_header) -class StringIOProducer(object): - def __init__(self,buf, amt=1440): - self.buf=buf - self.amt = amt - self.buf.seek(0) - - def more(self): - return self.buf.read(self.amt) - -class MultipleProxyReader(object): - def __init__(self, channel, url, header, content_length): +class MagicHTTPProxyClient(object): + def __init__(self, channel, url, header): self.channel = channel self.url = url self.header = header - self.content_length = content_length - print self, "New Instance" - + + self.content_length = -1 self.header_sent = False self.fetch_pos = 0 self.write_pos = 0 @@ -120,98 +108,88 @@ class MultipleProxyReader(object): self.blocks = list() self.fetchers = list() - for proxy in ENDPOINTS: - self.fetchers.append( Fetcher(self, proxy, self.url, self.header, self.next_range(INIT_BLOCKSIZE)) ) + print self, "New Instance" + + proxy = ENDPOINTS[ random.randint(0, len(ENDPOINTS)-1) ] + self.fetchers.append( Fetcher(self, proxy, self.url, self.header, (-1,-1)) ) def __str__(self): - return "<MultipleProxyReader url=%s content_length=%s>" % (urlparse.urlunparse(self.url), self.content_length) + return "<MagicHTTPProxyClient url=%s content_length=%s>" % (urlparse.urlunparse(self.url), self.content_length) def handle_incoming_data(self, fetcher, data=None, length=0): if not data: + #fetcher is done, remove from list self.fetchers = filter(lambda f: f != fetcher, self.fetchers) else: heappush(self.blocks, (fetcher.pos, data, length)) - if self.channel.is_closed: + if not self.channel.connected: print self, "request side closed the connection" return - if fetcher.range[1] - fetcher.pos < FETCHER_JUMPSTART \ - and self.fetch_pos + 1 < self.content_length and not self.channel.is_closed \ - and len( filter( (lambda f: f.proxy == fetcher.proxy), self.fetchers) ) < 2: - #Start a new fetcher on this line if this fetchers is X-Bytes before finishing his jobs + if fetcher.range != (-1,-1) \ + and fetcher.range[1] - fetcher.pos < FETCHER_JUMPSTART \ + and self.fetch_pos + 1 < self.content_length and self.channel.connected \ + and len( filter(lambda f: f.proxy == fetcher.proxy, self.fetchers) ) < 2: + #Start a new fetcher on this line if this fetchers is X-Bytes before finished his job blocksize = max(int(TIME_SLICE * fetcher.speed()), MIN_BLOCKSIZE) fetch_range = self.next_range(blocksize) print "Start new Fetcher, bs=%s range=%s" % (blocksize,fetch_range) self.fetchers.append( Fetcher(self, fetcher.proxy, self.url, self.header, fetch_range) ) - buf = StringIO.StringIO() + buf = "" while len(self.blocks)>0 and min(self.blocks)[0] == self.write_pos: item = heappop(self.blocks) - buf.write(item[1]) + buf += item[1] self.write_pos += item[2] - - if buf.tell() > 0: - self.channel.push_with_producer(StringIOProducer(buf)) + if len(self.blocks)>0: + print "missed: %s => %s" % (self.write_pos, min(self.blocks)[0]) + if buf != "": + self.channel.push(buf) if self.write_pos + 1 >= self.content_length: print self, "job done %s blocks left" % len(self.blocks) - self.channel.is_closed = True self.channel.close_when_done() def next_range(self, suggested_blocksize): + assert self.content_length != -1 start = self.fetch_pos self.fetch_pos = min(self.fetch_pos + suggested_blocksize, self.content_length) return (start, self.fetch_pos-1) - def handle_incoming_http_header(self, header): + def handle_incoming_http_header(self, fetcher, header): if self.header_sent: pass else: self.header_sent = True - self.channel.push("HTTP/1.0 200 OK\r\n") # Sends header from first response header.seek(0) headers = httplib.HTTPMessage(header) - for key in filter(lambda k: k not in ("content-range", "content-length"), headers.dict.keys()): - self.channel.push("%s: %s\r\n" % (key, headers.dict[key])) - self.channel.push("Content-Length: %s\r\n" % self.content_length) - self.channel.push("X-Proxy: Magicproxy (superpower activated)\r\n") - self.channel.push("\r\n") - -"""Transparent forward to other proxy server""" -class HTTPProxyClient(asynchat.async_chat): - def __init__(self, proxy, channel, method, url, headers): - self.proxy = proxy - self.other = channel - self.method = method - self.headers = headers - - asynchat.async_chat.__init__(self) - self.set_terminator(None) - self.create_socket(socket.AF_INET, socket.SOCK_STREAM) - self.connect(self.proxy) - - self.buf = StringIO.StringIO() - self.buf.write("%s %s HTTP/1.0\r\n" % (method, urlparse.urlunparse(url))) - for key in headers.keys(): - self.buf.write("%s: %s\r\n" % (key, headers[key])) - self.buf.write("\r\n") - - def __str__(self): - return "<HTTPProxyClient proxy=%s:%s>" % self.proxy - def collect_incoming_data(self, data): - self.other.push(data) + content_length = filter(lambda i: i == "content-length", headers.dict.keys()) + if len(content_length) == 1: + content_length = int(headers.dict["content-length"]) + if content_length >= THRESHOLD: + self.content_length = content_length + fetcher.range = self.next_range(INIT_BLOCKSIZE) + for proxy in filter(lambda p: fetcher.proxy != p, ENDPOINTS): + if self.fetch_pos == self.content_length -1: + break + self.fetchers.append(Fetcher( self, proxy, self.url, self.header, self.next_range(INIT_BLOCKSIZE))) - def handle_close(self): - self.close() - self.other.close_when_done() -# print self, "Done" + else: + content_length = None - def handle_connect(self): - self.push_with_producer(StringIOProducer(self.buf)) + buf = "HTTP/1.1 200 OK\r\n" + for key in filter(lambda k: k not in ("content-range", "content-length"), headers.dict.keys()): + buf += "%s: %s\r\n" % (key, headers.dict[key]) + if content_length: + buf += "Content-Length: %s\r\n" % content_length + buf += "Content-Range: bytes %s-%s/%s\r\n" % (0, content_length-1, content_length) + buf += "X-Proxy: Magicproxy (superpower activated)\r\n" + buf += "\r\n" + self.channel.push(buf) class HTTPChannel(asynchat.async_chat): @@ -219,7 +197,6 @@ class HTTPChannel(asynchat.async_chat): self.server = server self.data = StringIO.StringIO() - self.is_closed = False self.request = None asynchat.async_chat.__init__(self, sock) @@ -227,7 +204,6 @@ class HTTPChannel(asynchat.async_chat): def handle_close(self): self.close() - self.is_closed = True def collect_incoming_data(self, data): self.data.write(data) @@ -235,18 +211,17 @@ class HTTPChannel(asynchat.async_chat): self.close_when_done() def found_terminator(self): - if not self.request: - # parse http header - self.data.seek(0) - self.request = string.split(self.data.readline(), None, 2) - if len(self.request) != 3: - # badly formed request; just shut down - self.close_when_done() - else: - headers = httplib.HTTPMessage(self.data).dict - self.server.handle_request(self, self.request[0], self.request[1], headers) + # parse http header + self.data.seek(0) + self.request = string.split(self.data.readline(), None, 2) + if len(self.request) != 3: + # badly formed request; just shut down + self.close_when_done() else: - pass # ignore body data, for now + self.set_terminator(None) + headers = httplib.HTTPMessage(self.data).dict + self.server.handle_request(self, self.request[0], self.request[1], headers) + class HTTPProxyServer(asyncore.dispatcher): def __init__(self): @@ -256,16 +231,11 @@ class HTTPProxyServer(asyncore.dispatcher): self.create_socket(socket.AF_INET, socket.SOCK_STREAM) self.set_reuse_addr() self.bind(("", 8080)) - self.listen(5) + self.listen(15) def __str__(self): return "<HTTPProxyServer port=%s>" % self.port - def shutdown(self): - #TODO Hier Proxy sauber beenden - #self.channel.close_when_done() - sys.exit() - def handle_accept(self): conn, addr = self.accept() HTTPChannel(self, conn, addr) @@ -275,21 +245,8 @@ class HTTPProxyServer(asyncore.dispatcher): if method != "GET" or url.query != "": #do not handle non-GET or GET with Query (?foo=bla) requests return self._bypass_request(channel, method, url, headers) - - #check for content-length header with a HEAD request - conn = httplib.HTTPConnection(url.hostname, url.port or 80) - conn.request("HEAD", url.path) - resp = conn.getresponse() - content_length = filter(lambda it: it[0] == "content-length", resp.getheaders()) - if len( content_length ) == 0: - # no content length given, bypass this request - self._bypass_request(channel, "GET", url, headers) else: - content_length = int(content_length[0][1]) - if content_length < THRESHOLD: - self._bypass_request(channel, "GET", url, headers) - else: - MultipleProxyReader(channel, url, headers, content_length) + MagicHTTPProxyClient(channel, url, headers) def _bypass_request(self, channel, method, url, headers): proxy = ENDPOINTS[ int( md5.md5(url.hostname).hexdigest(),16 ) % len(ENDPOINTS) ] |