From c18e0ab17a23ddef42768151c6905818ad1d40d3 Mon Sep 17 00:00:00 2001 From: Yves Date: Fri, 2 Apr 2010 20:48:13 +0200 Subject: lays a egg --- magicproxy/__init__.py | 286 +++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 286 insertions(+) create mode 100755 magicproxy/__init__.py (limited to 'magicproxy/__init__.py') diff --git a/magicproxy/__init__.py b/magicproxy/__init__.py new file mode 100755 index 0000000..5ab9773 --- /dev/null +++ b/magicproxy/__init__.py @@ -0,0 +1,286 @@ +#!/usr/bin/python -t +import os, sys, string, time, md5, random +import asynchat, asyncore, socket, httplib, urlparse +from heapq import heappush, heappop +import cStringIO as StringIO + +kB = 1024 + +class DefaultConfiguration: + """bind to that""" + listen=("",8080) + + """available http-proxies""" + endpoints=[ ('10.2.2.11', 8888), ('10.3.1.2',8888) ] + + """minimum entity size to start parallelize fetch""" + threshold = 512*kB + + """initial size/range for a fetcher-job""" + initial_blocksize=512*kB + + """minimal size/range for a fetcher-job""" + minimal_blocksize=512*kB + + """(sec) #time each fetcher spent on his range, + calculated using speed measured while using initial_blocksize""" + time_slice=5 + + """start a new fetcher on a endpoint X-bytes before the old one finished""" + fetcher_jumpstart=32*kB + +################# + +class Fetcher(asynchat.async_chat): + def __init__(self, reader, proxy, url, headers, range): + self.reader = reader + self.proxy = proxy + self.url = url + self.headers = headers + self.range = range + + self.pos = (self.range[0] != -1) and self.range[0] or 0 + self.start_time = 0 + self.stop_time = 0 + self.http_status = "" + self.http_header = StringIO.StringIO() + self.state = 0 #0=status, 1=header, 2=body + asynchat.async_chat.__init__(self) + self.set_terminator("\r\n") + self.create_socket(socket.AF_INET, socket.SOCK_STREAM) + self.connect(self.proxy) + + def __str__(self): + return "= self.range[1] + 1: + #if this request is the first one (whithout Range: ...) then the server + # dont send us our expected range, we must cut it at some point (here) + bytes_remaining = self.range[1] - ( self.pos ) + data=data[:bytes_remaining+1] + print self,"cut: pos=%s length=%s => %s" % (self.pos, length, len(data)) + length = len(data) + if length == 0: + self.reader.handle_incoming_data(self) + self.close() + return + if not self.reader.handle_incoming_data(self, data, length): + self.close() + return + self.pos += length + if self.range != (-1,-1) and self.pos >= self.range[1]: + self.stop_time = time.time() + print self, "finished with %s kB/s" % (self.speed() / 1024) + self.reader.handle_incoming_data(self) + self.close() + elif self.state ==1: #header + self.http_header.write( data ) + else: #status + self.http_status += data + + def found_terminator(self): + if self.state == 0: #got status-line + self.state = 1 + self.set_terminator("\r\n\r\n") + elif self.state == 1: #got headers + self.state = 2 + self.set_terminator(None) + self.reader.handle_incoming_http_header(self, self.http_header) + +class MagicHTTPProxyClient(object): + def __init__(self, channel, url, header): + self.channel = channel + self.config = self.channel.server.config + self.url = url + self.header = header + + self.content_length = -1 + self.header_sent = False + self.fetch_pos = 0 + self.write_pos = 0 + self.buffer = "" + self.blocks = list() + self.fetchers = list() + + proxy = self.config.endpoints[ random.randint(0, len(self.config.endpoints)-1) ] + self.fetchers.append( Fetcher(self, proxy, self.url, self.header, (-1,-1)) ) + + def __str__(self): + return "" % (self.url.hostname, self.url.path, self.content_length) + + def handle_incoming_data(self, fetcher, data=None, length=0): + if not data: + #fetcher is done, remove from list + self.fetchers = filter(lambda f: f != fetcher, self.fetchers) + print "Remove: %s" % fetcher + else: + assert fetcher.pos < fetcher.range[1] or fetcher.range == (-1,-1) + heappush(self.blocks, (fetcher.pos, data, length)) + + if not self.channel.connected: + print self, "request side closed the connection" + return False + + if fetcher.range != (-1,-1) \ + and fetcher.range[1] - (fetcher.pos+length) < self.config.fetcher_jumpstart \ + and self.fetch_pos + 1 < self.content_length \ + and len( filter(lambda f: f.proxy == fetcher.proxy, self.fetchers) ) < 2: + #Start a new fetcher if this fetcher is X-Bytes before finished his job + blocksize = max(int(self.config.time_slice * fetcher.speed()), self.config.minimal_blocksize) + fetch_range = self.next_range(blocksize) + self.fetchers.append( Fetcher(self, fetcher.proxy, self.url, self.header, fetch_range) ) + + #if len(self.blocks)>0: + #print self,"fetch_pos=%s write_pos=%s get=%s with length=%s pending=%s" % (self.fetch_pos, self.write_pos, min(self.blocks)[0],min(self.blocks)[2], len(self.blocks)) + + buf = "" + while len(self.blocks)>0 and min(self.blocks)[0] == self.write_pos: + item = heappop(self.blocks) + buf += item[1] + self.write_pos += item[2] + + if buf != "": + self.channel.push(buf) + + if self.write_pos + 1 >= self.content_length: + print self, "job done %s blocks left" % len(self.blocks) + self.channel.close_when_done() + return True + + def next_range(self, suggested_blocksize): + assert self.content_length != -1 + start = self.fetch_pos + self.fetch_pos = min(self.fetch_pos + suggested_blocksize, self.content_length) + return (start, self.fetch_pos-1) + + def handle_incoming_http_header(self, fetcher, header): + if not self.channel.connected: + return + if self.header_sent: + pass + else: + self.header_sent = True + + # Sends header from first response + header.seek(0) + headers = httplib.HTTPMessage(header) + + content_length = filter(lambda i: i == "content-length", headers.dict.keys()) + if len(content_length) == 1: + content_length = int(headers.dict["content-length"]) + if content_length >= self.config.threshold: + self.content_length = content_length + fetcher.range = self.next_range(self.config.initial_blocksize) + for proxy in filter(lambda p: fetcher.proxy != p, self.config.endpoints): + if self.fetch_pos == self.content_length -1: + break + self.fetchers.append(Fetcher( self, proxy, self.url, self.header, self.next_range(self.config.initial_blocksize))) + + else: + content_length = None + + buf = "HTTP/1.1 200 OK\r\n" + for key in filter(lambda k: k not in ("content-range", "content-length"), headers.dict.keys()): + buf += "%s: %s\r\n" % (key, headers.dict[key]) + if content_length: + buf += "Content-Length: %s\r\n" % content_length + buf += "Content-Range: bytes %s-%s/%s\r\n" % (0, content_length-1, content_length) + buf += "X-Proxy: Magicproxy (superpower activated)\r\n" + buf += "\r\n" + self.channel.push(buf) + +class HTTPChannel(asynchat.async_chat): + def __init__(self, server, sock, addr): + self.server = server + self.data = StringIO.StringIO() + + asynchat.async_chat.__init__(self, sock) + self.set_terminator("\r\n\r\n") + + def handle_close(self): + self.connected = False + self.close() + + def collect_incoming_data(self, data): + self.data.write(data) + if self.data.tell() > 16384: + self.close_when_done() + + def found_terminator(self): + # parse http header + self.data.seek(0) + self.request = string.split(self.data.readline(), None, 2) + if len(self.request) != 3: + # badly formed request; just shut down + self.close_when_done() + else: + self.set_terminator(None) + headers = httplib.HTTPMessage(self.data).dict + self.handle_request(self.request[0], self.request[1], headers) + + def handle_request(self, method, path, headers): + url = urlparse.urlparse(path) + if method != "GET" or url.query != "": + #do not handle non-GET or GET with Query (?foo=bla) requests + proxy = self.server.config.endpoints[ int( md5.md5(url.hostname).hexdigest(),16 ) % len(self.server.config.endpoints) ] + print Fetcher(self, proxy, url, headers, (-1,-1)) + else: + MagicHTTPProxyClient(self, url, headers) + + def handle_incoming_http_header(self,fetcher,header): + header.seek(0) + headers = httplib.HTTPMessage(header) + buf = "HTTP/1.1 200 OK\r\n" + buf += "\r\n".join(map(lambda hdr: "%s: %s" % (hdr,headers.dict[hdr]), headers.dict.keys())) + buf += "\r\n\r\n" + self.push(buf) + + + def handle_incoming_data(self, fetcher, data=None, length=0): + if data: + self.push(data) + return True + +class HTTPProxyServer(asyncore.dispatcher): + def __init__(self,config): + self.config = config + asyncore.dispatcher.__init__(self) + self.create_socket(socket.AF_INET, socket.SOCK_STREAM) + self.set_reuse_addr() + self.bind(self.config.listen) + self.listen(5) + + def __str__(self): + return "" % self.config.listen + + def handle_accept(self): + conn, addr = self.accept() + HTTPChannel(self, conn, addr) + +def main(): + proxy = HTTPProxyServer(DefaultConfiguration) + print proxy + asyncore.loop() -- cgit v1.2.1 From 680fefcd3a73d46408f00b973578991395721a4d Mon Sep 17 00:00:00 2001 From: yvesf Date: Sat, 3 Apr 2010 23:36:38 +0200 Subject: handle http-status codes --- magicproxy/__init__.py | 45 ++++++++++++++++++++++++++++++--------------- 1 file changed, 30 insertions(+), 15 deletions(-) (limited to 'magicproxy/__init__.py') diff --git a/magicproxy/__init__.py b/magicproxy/__init__.py index 5ab9773..8a78a54 100755 --- a/magicproxy/__init__.py +++ b/magicproxy/__init__.py @@ -5,13 +5,15 @@ from heapq import heappush, heappop import cStringIO as StringIO kB = 1024 - class DefaultConfiguration: """bind to that""" listen=("",8080) """available http-proxies""" - endpoints=[ ('10.2.2.11', 8888), ('10.3.1.2',8888) ] + endpoints=[ + ('10.2.2.11', 8888), + #('10.3.1.2',8888) + ] """minimum entity size to start parallelize fetch""" threshold = 512*kB @@ -32,12 +34,12 @@ class DefaultConfiguration: ################# class Fetcher(asynchat.async_chat): - def __init__(self, reader, proxy, url, headers, range): + def __init__(self, reader, proxy, url, headers, fetch_range): self.reader = reader self.proxy = proxy self.url = url self.headers = headers - self.range = range + self.range = fetch_range self.pos = (self.range[0] != -1) and self.range[0] or 0 self.start_time = 0 @@ -103,12 +105,20 @@ class Fetcher(asynchat.async_chat): def found_terminator(self): if self.state == 0: #got status-line + status = self.http_status.split(" ") + if len(status) > 1: + try: + self.http_status_code = int(status[1]) + except: + self.http_status_code = 520 #Bad Gateway + else: + self.http_status_code = 520 #Bad Gateway self.state = 1 - self.set_terminator("\r\n\r\n") + self.set_terminator("\r\n\r\n") #end of header elif self.state == 1: #got headers self.state = 2 self.set_terminator(None) - self.reader.handle_incoming_http_header(self, self.http_header) + self.reader.handle_incoming_http_header(self, self.http_status_code, self.http_header) class MagicHTTPProxyClient(object): def __init__(self, channel, url, header): @@ -176,11 +186,13 @@ class MagicHTTPProxyClient(object): self.fetch_pos = min(self.fetch_pos + suggested_blocksize, self.content_length) return (start, self.fetch_pos-1) - def handle_incoming_http_header(self, fetcher, header): + def handle_incoming_http_header(self, fetcher, status_code, header): if not self.channel.connected: return if self.header_sent: - pass + if status_code < 200 or status_code >= 300: + print self, "Error: got error code %s in %s. Giving up" % (status_code, fetcher) + self.channel.close() else: self.header_sent = True @@ -189,6 +201,8 @@ class MagicHTTPProxyClient(object): headers = httplib.HTTPMessage(header) content_length = filter(lambda i: i == "content-length", headers.dict.keys()) + #if there are content-length headers decide if entity size is + #bigger then threshold, if true then start n proxies (n=#endpoints) if len(content_length) == 1: content_length = int(headers.dict["content-length"]) if content_length >= self.config.threshold: @@ -202,13 +216,14 @@ class MagicHTTPProxyClient(object): else: content_length = None - buf = "HTTP/1.1 200 OK\r\n" - for key in filter(lambda k: k not in ("content-range", "content-length"), headers.dict.keys()): - buf += "%s: %s\r\n" % (key, headers.dict[key]) + buf = "HTTP/1.1 %s OK\r\n" % (status_code) + buf += "".join(map(lambda key: "%s: %s\r\n" % (key, headers.dict[key]), + filter(lambda k: k not in ("content-range", "content-length"), + headers.dict.keys()))) if content_length: buf += "Content-Length: %s\r\n" % content_length - buf += "Content-Range: bytes %s-%s/%s\r\n" % (0, content_length-1, content_length) - buf += "X-Proxy: Magicproxy (superpower activated)\r\n" + buf += "Content-Range: bytes 0-%s/%s\r\n" % (content_length-1, content_length) + buf += "X-Proxy: Magicproxy; using proxies %s\r\n" % ", ".join(map(lambda host: "%s:%s"%host, self.config.endpoints)) buf += "\r\n" self.channel.push(buf) @@ -250,10 +265,10 @@ class HTTPChannel(asynchat.async_chat): else: MagicHTTPProxyClient(self, url, headers) - def handle_incoming_http_header(self,fetcher,header): + def handle_incoming_http_header(self,fetcher, status_code, header): header.seek(0) headers = httplib.HTTPMessage(header) - buf = "HTTP/1.1 200 OK\r\n" + buf = "HTTP/1.1 %s OK\r\n" % status_code buf += "\r\n".join(map(lambda hdr: "%s: %s" % (hdr,headers.dict[hdr]), headers.dict.keys())) buf += "\r\n\r\n" self.push(buf) -- cgit v1.2.1 From 179297b767ef58c2452c66276e53f31a0d8075cc Mon Sep 17 00:00:00 2001 From: Yves Date: Mon, 5 Apr 2010 11:53:26 +0200 Subject: update todo --- magicproxy/__init__.py | 3 +++ 1 file changed, 3 insertions(+) (limited to 'magicproxy/__init__.py') diff --git a/magicproxy/__init__.py b/magicproxy/__init__.py index 5ab9773..90ab741 100755 --- a/magicproxy/__init__.py +++ b/magicproxy/__init__.py @@ -1,4 +1,7 @@ #!/usr/bin/python -t +#TODO: ignore non-functional endpoints +#TODO: support Range: ... in requests + import os, sys, string, time, md5, random import asynchat, asyncore, socket, httplib, urlparse from heapq import heappush, heappop -- cgit v1.2.1