From a71a8ebe8fd2c7a25bf7188fbfcf73c068b0b84d Mon Sep 17 00:00:00 2001 From: Yves Date: Sat, 13 Mar 2010 15:55:16 +0100 Subject: vereinfacht --- proxy.py | 63 +++++++++++++++++++++++++++++++++++---------------------------- 1 file changed, 35 insertions(+), 28 deletions(-) (limited to 'proxy.py') diff --git a/proxy.py b/proxy.py index 44c7365..0e3f7b5 100755 --- a/proxy.py +++ b/proxy.py @@ -42,7 +42,7 @@ class Fetcher(asynchat.async_chat): self.start_time = 0 self.stop_time = 0 self.http_status = "" - self.http_header = "" + self.http_header = StringIO.StringIO() self.state = 0 #0=status, 1=header, 2=body asynchat.async_chat.__init__(self) @@ -70,8 +70,9 @@ class Fetcher(asynchat.async_chat): def collect_incoming_data(self, data): if self.state==2: #body - self.reader.handle_incoming_data(self, data) - self.pos += len(data) + length = len(data) + self.reader.handle_incoming_data(self, data, length) + self.pos += length if self.pos >= self.range[1]: self.stop_time = time.time() print self, "finished" @@ -79,7 +80,7 @@ class Fetcher(asynchat.async_chat): self.reader.handle_incoming_data(self) self.close() elif self.state ==1: #header - self.http_header += data + self.http_header.write( data ) else: #status self.http_status += data @@ -95,6 +96,15 @@ class Fetcher(asynchat.async_chat): def __str__(self): return "0 and min(self.blocks)[0] == self.write_pos: + item = heappop(self.blocks) + buf.write(item[1]) + self.write_pos += item[2] + + if buf.tell() > 0: + self.channel.push_with_producer(StringIOProducer(buf)) + + if self.write_pos + 1 >= self.content_length: + print self, "job done %s blocks left" % len(self.blocks) + self.channel.is_closed = True + self.channel.close_when_done() def next_range(self, suggested_blocksize): start = self.fetch_pos @@ -143,33 +168,15 @@ class MultipleProxyReader(object): self.channel.push("HTTP/1.0 200 OK\r\n") # Sends header from first response - headers = httplib.HTTPMessage(StringIO.StringIO(header)) + header.seek(0) + headers = httplib.HTTPMessage(header) for key in filter(lambda k: k not in ("content-range", "content-length"), headers.dict.keys()): self.channel.push("%s: %s\r\n" % (key, headers.dict[key])) self.channel.push("Content-Length: %s" % self.content_length) self.channel.push("X-Proxy: Magicproxy (superpower activated)\r\n") self.channel.push("\r\n") - def send_next_data(self): - if self.channel.is_closed: - print self, "request side closed the connection" - self.channel.close_when_done() - #XXX terminate all running fetcher - return False - - #print self, "expect data at %s in" % self.write_pos, self.blocks.keys() - if len(self.blocks)>0 and min(self.blocks)[0] == self.write_pos: - item = heappop(self.blocks) - self.channel.push(item[1]) - self.write_pos += len(item[1]) - return True - - if self.write_pos + 1 >= self.content_length: - print self, "job done %s blocks left" % len(self.blocks) - #XXX prevent next calls to send_next_data - self.channel.close_when_done() - return False def __str__(self): return "" % (urlparse.urlunparse(self.url), self.content_length) -- cgit v1.2.1 From 94e33924fad7b7a38d6d1e848d01b5e830a45360 Mon Sep 17 00:00:00 2001 From: Thomas Keck Date: Sat, 13 Mar 2010 16:19:20 +0100 Subject: First setting implemented --- proxy.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'proxy.py') diff --git a/proxy.py b/proxy.py index da30636..0b63c52 100755 --- a/proxy.py +++ b/proxy.py @@ -122,7 +122,7 @@ class MultipleProxyReader(object): and self.fetch_pos + 1 < self.content_length and not self.channel.is_closed \ and len( filter( (lambda f: f.proxy == fetcher.proxy), self.fetchers) ) < 2: #Start a new fetcher on this line if this fetchers is X-Bytes before finishing his jobs - blocksize = min(TIME_SLICE * int(fetcher.speed()), MIN_BLOCKSIZE) + blocksize = min(int(TIME_SLICE * fetcher.speed()), MIN_BLOCKSIZE) fetch_range = self.next_range(blocksize) print "Start new Fetcher, bs=%s range=%s" % (blocksize,fetch_range) self.fetchers.append( Fetcher(self, fetcher.proxy, self.url, self.header, fetch_range) ) -- cgit v1.2.1 From 9b0ffb71e5b47fff2095c24551b9e0fce4c0650e Mon Sep 17 00:00:00 2001 From: Thomas Keck Date: Sat, 13 Mar 2010 17:35:28 +0100 Subject: Min/Max mistake at blocksize calculation --- proxy.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'proxy.py') diff --git a/proxy.py b/proxy.py index 99793a8..ffd09a0 100755 --- a/proxy.py +++ b/proxy.py @@ -136,7 +136,7 @@ class MultipleProxyReader(object): and self.fetch_pos + 1 < self.content_length and not self.channel.is_closed \ and len( filter( (lambda f: f.proxy == fetcher.proxy), self.fetchers) ) < 2: #Start a new fetcher on this line if this fetchers is X-Bytes before finishing his jobs - blocksize = min(int(TIME_SLICE * fetcher.speed()), MIN_BLOCKSIZE) + blocksize = max(int(TIME_SLICE * fetcher.speed()), MIN_BLOCKSIZE) fetch_range = self.next_range(blocksize) print "Start new Fetcher, bs=%s range=%s" % (blocksize,fetch_range) self.fetchers.append( Fetcher(self, fetcher.proxy, self.url, self.header, fetch_range) ) -- cgit v1.2.1 From 9be7d75ba371027b0a1e0e7d289e3ac2639a4ae7 Mon Sep 17 00:00:00 2001 From: Yves Date: Sun, 14 Mar 2010 00:04:44 +0100 Subject: fix content-length and better request bypassing --- proxy.py | 80 +++++++++++++++++++++++++++++++++++++++++----------------------- 1 file changed, 52 insertions(+), 28 deletions(-) (limited to 'proxy.py') diff --git a/proxy.py b/proxy.py index 99793a8..0d50d67 100755 --- a/proxy.py +++ b/proxy.py @@ -1,5 +1,5 @@ #!/usr/bin/python -t -import os, sys, string, time +import os, sys, string, time, md5 import asynchat, asyncore, socket, httplib, urlparse from heapq import heappush, heappop try: @@ -11,7 +11,7 @@ except ImportError: ENDPOINTS = [ ('10.2.2.11', 8888), ('10.3.1.2', 8888), -# ('10.1.1.156', 8888), + ('10.1.1.156', 8888), ] kB = 1024 @@ -50,6 +50,9 @@ class Fetcher(asynchat.async_chat): self.create_socket(socket.AF_INET, socket.SOCK_STREAM) self.connect(self.proxy) + def __str__(self): + return "" % (urlparse.urlunparse(self.url), self.content_length) + def handle_incoming_data(self, fetcher, data=None, length=0): if not data: self.fetchers = filter(lambda f: f != fetcher, self.fetchers) @@ -172,15 +176,44 @@ class MultipleProxyReader(object): headers = httplib.HTTPMessage(header) for key in filter(lambda k: k not in ("content-range", "content-length"), headers.dict.keys()): self.channel.push("%s: %s\r\n" % (key, headers.dict[key])) - self.channel.push("Content-Length: %s" % self.content_length) + self.channel.push("Content-Length: %s\r\n" % self.content_length) self.channel.push("X-Proxy: Magicproxy (superpower activated)\r\n") self.channel.push("\r\n") +"""Transparent forward to other proxy server""" +class HTTPProxyClient(asynchat.async_chat): + def __init__(self, proxy, channel, method, url, headers): + self.proxy = proxy + self.other = channel + self.method = method + self.headers = headers + asynchat.async_chat.__init__(self) + self.set_terminator(None) + self.create_socket(socket.AF_INET, socket.SOCK_STREAM) + self.connect(self.proxy) + self.buf = StringIO.StringIO() + self.buf.write("%s %s HTTP/1.0\r\n" % (method, urlparse.urlunparse(url))) + for key in headers.keys(): + self.buf.write("%s: %s\r\n" % (key, headers[key])) + self.buf.write("\r\n") + def __str__(self): - return "" % (urlparse.urlunparse(self.url), self.content_length) + return "" % self.proxy + def collect_incoming_data(self, data): + self.other.push(data) + + def handle_close(self): + self.close() + self.other.close_when_done() +# print self, "Done" + + def handle_connect(self): + self.push_with_producer(StringIOProducer(self.buf)) + + class HTTPChannel(asynchat.async_chat): def __init__(self, server, sock, addr): self.server = server @@ -193,6 +226,7 @@ class HTTPChannel(asynchat.async_chat): self.set_terminator("\r\n\r\n") def handle_close(self): + self.close() self.is_closed = True def collect_incoming_data(self, data): @@ -224,6 +258,9 @@ class HTTPProxyServer(asyncore.dispatcher): self.bind(("", 8080)) self.listen(5) + def __str__(self): + return "" % self.port + def shutdown(self): #TODO Hier Proxy sauber beenden #self.channel.close_when_done() @@ -246,32 +283,19 @@ class HTTPProxyServer(asyncore.dispatcher): content_length = filter(lambda it: it[0] == "content-length", resp.getheaders()) if len( content_length ) == 0: # no content length given, bypass this request - print "missing content-length, bypass" self._bypass_request(channel, "GET", url, headers) else: content_length = int(content_length[0][1]) - - if content_length < THRESHOLD: - self._bypass_request(channel, "GET", url, headers) - else: - MultipleProxyReader(channel, url, headers, content_length) + if content_length < THRESHOLD: + self._bypass_request(channel, "GET", url, headers) + else: + MultipleProxyReader(channel, url, headers, content_length) def _bypass_request(self, channel, method, url, headers): - print "_bypass request: %s %s" % (method, urlparse.urlunparse(url)) - #XXX hier sollte nicht proxy gespielt werden sondern - #die daten 1-zu-1 durchgereicht werden. - #Weiterhin sollte sichergestellt werden, dass die requests - #zu Host X1 immer ueber Proxy Y1 geroutet werden - # etwa proxy=proxies[ stuff(hostname) % len(proxies) ] - conn = httplib.HTTPConnection(url.hostname, url.port or 80) - conn.request(method, url.path, body="", headers=headers) - resp = conn.getresponse() - channel.push("HTTP/1.0 200 OK\r\nX-Proxy: Magicproxy (superpower disabled)\r\n") - channel.push( "\r\n".join(map(lambda k: "%s: %s" % (k[0],k[1]), resp.getheaders())) ) - channel.push("\r\n\r\n") - channel.push(resp.read()) - channel.close_when_done() - + proxy = ENDPOINTS[ int( md5.md5(url.hostname).hexdigest(),16 ) % len(ENDPOINTS) ] + print self, "_bypass request via %s: %s %s" % (proxy, method, urlparse.urlunparse(url)) + HTTPProxyClient(proxy, channel, method, url, headers) + if __name__ == "__main__": proxy = HTTPProxyServer() asyncore.loop() -- cgit v1.2.1 From 90e44d2a7f88e73bba511f945dba60cdc9cefae6 Mon Sep 17 00:00:00 2001 From: yvesf Date: Sun, 14 Mar 2010 20:28:23 +0100 Subject: better request bypassing logic but somehow broken magicproxy abilities --- proxy.py | 175 ++++++++++++++++++++++++--------------------------------------- 1 file changed, 66 insertions(+), 109 deletions(-) (limited to 'proxy.py') diff --git a/proxy.py b/proxy.py index 0d50d67..393082f 100755 --- a/proxy.py +++ b/proxy.py @@ -1,16 +1,13 @@ #!/usr/bin/python -t -import os, sys, string, time, md5 +import os, sys, string, time, md5, random import asynchat, asyncore, socket, httplib, urlparse from heapq import heappush, heappop -try: - import cStringIO as StringIO -except ImportError: - import StringIO +import cStringIO as StringIO ENDPOINTS = [ ('10.2.2.11', 8888), - ('10.3.1.2', 8888), +# ('10.3.1.2', 8888), ('10.1.1.156', 8888), ] @@ -38,7 +35,7 @@ class Fetcher(asynchat.async_chat): self.headers = headers self.range = range - self.pos = self.range[0] + self.pos = (self.range[0] != -1) and self.range[0] or 0 self.start_time = 0 self.stop_time = 0 self.http_status = "" @@ -58,7 +55,8 @@ class Fetcher(asynchat.async_chat): self.send("GET http://%s:%s%s HTTP/1.0\r\n" % ( self.url.hostname, self.url.port or 80, self.url.path )) for key in filter(lambda k: k not in ("range"), self.headers.keys()): #send origin request headers self.send("%s: %s\r\n" % (key, self.headers[key])) - self.send("Range: bytes=%s-%s\r\n" % (self.range[0], self.range[1])) + if self.range != (-1,-1): + self.send("Range: bytes=%s-%s\r\n" % (self.range[0], self.range[1])) self.send("\r\n") self.start_time = time.time() @@ -76,9 +74,9 @@ class Fetcher(asynchat.async_chat): length = len(data) self.reader.handle_incoming_data(self, data, length) self.pos += length - if self.pos >= self.range[1]: + if self.range != (-1,-1) and self.pos >= self.range[1]: self.stop_time = time.time() - print self, "finished" + print self, "finished with %s kB/s" % (self.speed() / 1024) #make sure the next fetcher will be started self.reader.handle_incoming_data(self) self.close() @@ -94,25 +92,15 @@ class Fetcher(asynchat.async_chat): elif self.state == 1: #got headers self.state = 2 self.set_terminator(None) - self.reader.handle_incoming_http_header(self.http_header) + self.reader.handle_incoming_http_header(self, self.http_header) -class StringIOProducer(object): - def __init__(self,buf, amt=1440): - self.buf=buf - self.amt = amt - self.buf.seek(0) - - def more(self): - return self.buf.read(self.amt) - -class MultipleProxyReader(object): - def __init__(self, channel, url, header, content_length): +class MagicHTTPProxyClient(object): + def __init__(self, channel, url, header): self.channel = channel self.url = url self.header = header - self.content_length = content_length - print self, "New Instance" - + + self.content_length = -1 self.header_sent = False self.fetch_pos = 0 self.write_pos = 0 @@ -120,98 +108,88 @@ class MultipleProxyReader(object): self.blocks = list() self.fetchers = list() - for proxy in ENDPOINTS: - self.fetchers.append( Fetcher(self, proxy, self.url, self.header, self.next_range(INIT_BLOCKSIZE)) ) + print self, "New Instance" + + proxy = ENDPOINTS[ random.randint(0, len(ENDPOINTS)-1) ] + self.fetchers.append( Fetcher(self, proxy, self.url, self.header, (-1,-1)) ) def __str__(self): - return "" % (urlparse.urlunparse(self.url), self.content_length) + return "" % (urlparse.urlunparse(self.url), self.content_length) def handle_incoming_data(self, fetcher, data=None, length=0): if not data: + #fetcher is done, remove from list self.fetchers = filter(lambda f: f != fetcher, self.fetchers) else: heappush(self.blocks, (fetcher.pos, data, length)) - if self.channel.is_closed: + if not self.channel.connected: print self, "request side closed the connection" return - if fetcher.range[1] - fetcher.pos < FETCHER_JUMPSTART \ - and self.fetch_pos + 1 < self.content_length and not self.channel.is_closed \ - and len( filter( (lambda f: f.proxy == fetcher.proxy), self.fetchers) ) < 2: - #Start a new fetcher on this line if this fetchers is X-Bytes before finishing his jobs + if fetcher.range != (-1,-1) \ + and fetcher.range[1] - fetcher.pos < FETCHER_JUMPSTART \ + and self.fetch_pos + 1 < self.content_length and self.channel.connected \ + and len( filter(lambda f: f.proxy == fetcher.proxy, self.fetchers) ) < 2: + #Start a new fetcher on this line if this fetchers is X-Bytes before finished his job blocksize = min(int(TIME_SLICE * fetcher.speed()), MIN_BLOCKSIZE) fetch_range = self.next_range(blocksize) print "Start new Fetcher, bs=%s range=%s" % (blocksize,fetch_range) self.fetchers.append( Fetcher(self, fetcher.proxy, self.url, self.header, fetch_range) ) - buf = StringIO.StringIO() + buf = "" while len(self.blocks)>0 and min(self.blocks)[0] == self.write_pos: item = heappop(self.blocks) - buf.write(item[1]) + buf += item[1] self.write_pos += item[2] - - if buf.tell() > 0: - self.channel.push_with_producer(StringIOProducer(buf)) + if len(self.blocks)>0: + print "missed: %s => %s" % (self.write_pos, min(self.blocks)[0]) + if buf != "": + self.channel.push(buf) if self.write_pos + 1 >= self.content_length: print self, "job done %s blocks left" % len(self.blocks) - self.channel.is_closed = True self.channel.close_when_done() def next_range(self, suggested_blocksize): + assert self.content_length != -1 start = self.fetch_pos self.fetch_pos = min(self.fetch_pos + suggested_blocksize, self.content_length) return (start, self.fetch_pos-1) - def handle_incoming_http_header(self, header): + def handle_incoming_http_header(self, fetcher, header): if self.header_sent: pass else: self.header_sent = True - self.channel.push("HTTP/1.0 200 OK\r\n") # Sends header from first response header.seek(0) headers = httplib.HTTPMessage(header) - for key in filter(lambda k: k not in ("content-range", "content-length"), headers.dict.keys()): - self.channel.push("%s: %s\r\n" % (key, headers.dict[key])) - self.channel.push("Content-Length: %s\r\n" % self.content_length) - self.channel.push("X-Proxy: Magicproxy (superpower activated)\r\n") - self.channel.push("\r\n") - -"""Transparent forward to other proxy server""" -class HTTPProxyClient(asynchat.async_chat): - def __init__(self, proxy, channel, method, url, headers): - self.proxy = proxy - self.other = channel - self.method = method - self.headers = headers - - asynchat.async_chat.__init__(self) - self.set_terminator(None) - self.create_socket(socket.AF_INET, socket.SOCK_STREAM) - self.connect(self.proxy) - - self.buf = StringIO.StringIO() - self.buf.write("%s %s HTTP/1.0\r\n" % (method, urlparse.urlunparse(url))) - for key in headers.keys(): - self.buf.write("%s: %s\r\n" % (key, headers[key])) - self.buf.write("\r\n") - - def __str__(self): - return "" % self.proxy - def collect_incoming_data(self, data): - self.other.push(data) + content_length = filter(lambda i: i == "content-length", headers.dict.keys()) + if len(content_length) == 1: + content_length = int(headers.dict["content-length"]) + if content_length >= THRESHOLD: + self.content_length = content_length + fetcher.range = self.next_range(INIT_BLOCKSIZE) + for proxy in filter(lambda p: fetcher.proxy != p, ENDPOINTS): + if self.fetch_pos == self.content_length -1: + break + self.fetchers.append(Fetcher( self, proxy, self.url, self.header, self.next_range(INIT_BLOCKSIZE))) - def handle_close(self): - self.close() - self.other.close_when_done() -# print self, "Done" + else: + content_length = None - def handle_connect(self): - self.push_with_producer(StringIOProducer(self.buf)) + buf = "HTTP/1.1 200 OK\r\n" + for key in filter(lambda k: k not in ("content-range", "content-length"), headers.dict.keys()): + buf += "%s: %s\r\n" % (key, headers.dict[key]) + if content_length: + buf += "Content-Length: %s\r\n" % content_length + buf += "Content-Range: bytes %s-%s/%s\r\n" % (0, content_length-1, content_length) + buf += "X-Proxy: Magicproxy (superpower activated)\r\n" + buf += "\r\n" + self.channel.push(buf) class HTTPChannel(asynchat.async_chat): @@ -219,7 +197,6 @@ class HTTPChannel(asynchat.async_chat): self.server = server self.data = StringIO.StringIO() - self.is_closed = False self.request = None asynchat.async_chat.__init__(self, sock) @@ -227,7 +204,6 @@ class HTTPChannel(asynchat.async_chat): def handle_close(self): self.close() - self.is_closed = True def collect_incoming_data(self, data): self.data.write(data) @@ -235,18 +211,17 @@ class HTTPChannel(asynchat.async_chat): self.close_when_done() def found_terminator(self): - if not self.request: - # parse http header - self.data.seek(0) - self.request = string.split(self.data.readline(), None, 2) - if len(self.request) != 3: - # badly formed request; just shut down - self.close_when_done() - else: - headers = httplib.HTTPMessage(self.data).dict - self.server.handle_request(self, self.request[0], self.request[1], headers) + # parse http header + self.data.seek(0) + self.request = string.split(self.data.readline(), None, 2) + if len(self.request) != 3: + # badly formed request; just shut down + self.close_when_done() else: - pass # ignore body data, for now + self.set_terminator(None) + headers = httplib.HTTPMessage(self.data).dict + self.server.handle_request(self, self.request[0], self.request[1], headers) + class HTTPProxyServer(asyncore.dispatcher): def __init__(self): @@ -256,16 +231,11 @@ class HTTPProxyServer(asyncore.dispatcher): self.create_socket(socket.AF_INET, socket.SOCK_STREAM) self.set_reuse_addr() self.bind(("", 8080)) - self.listen(5) + self.listen(15) def __str__(self): return "" % self.port - def shutdown(self): - #TODO Hier Proxy sauber beenden - #self.channel.close_when_done() - sys.exit() - def handle_accept(self): conn, addr = self.accept() HTTPChannel(self, conn, addr) @@ -275,21 +245,8 @@ class HTTPProxyServer(asyncore.dispatcher): if method != "GET" or url.query != "": #do not handle non-GET or GET with Query (?foo=bla) requests return self._bypass_request(channel, method, url, headers) - - #check for content-length header with a HEAD request - conn = httplib.HTTPConnection(url.hostname, url.port or 80) - conn.request("HEAD", url.path) - resp = conn.getresponse() - content_length = filter(lambda it: it[0] == "content-length", resp.getheaders()) - if len( content_length ) == 0: - # no content length given, bypass this request - self._bypass_request(channel, "GET", url, headers) else: - content_length = int(content_length[0][1]) - if content_length < THRESHOLD: - self._bypass_request(channel, "GET", url, headers) - else: - MultipleProxyReader(channel, url, headers, content_length) + MagicHTTPProxyClient(channel, url, headers) def _bypass_request(self, channel, method, url, headers): proxy = ENDPOINTS[ int( md5.md5(url.hostname).hexdigest(),16 ) % len(ENDPOINTS) ] -- cgit v1.2.1 From c739d682aab415cb5e18558622eba92d1e2110ca Mon Sep 17 00:00:00 2001 From: Yves Date: Tue, 16 Mar 2010 19:19:41 +0100 Subject: fix error from previous commit, contains still some debugging stuff --- proxy.py | 26 +++++++++++++++++++------- 1 file changed, 19 insertions(+), 7 deletions(-) (limited to 'proxy.py') diff --git a/proxy.py b/proxy.py index ee5d083..eb408b6 100755 --- a/proxy.py +++ b/proxy.py @@ -6,9 +6,9 @@ import cStringIO as StringIO ENDPOINTS = [ - ('10.2.2.11', 8888), -# ('10.3.1.2', 8888), - ('10.1.1.156', 8888), +# ('10.2.2.11', 8888), + ('10.3.1.2', 8888), +# ('10.1.1.156', 8888), ] kB = 1024 @@ -48,7 +48,7 @@ class Fetcher(asynchat.async_chat): self.connect(self.proxy) def __str__(self): - return "= self.range[1]: + #if this request is the first one (whithout Range: ...) then the server + # dont send us our expected range, we must cut it at some point (here) + data=data[:(self.range[1]-self.pos+1)] #XXX explain this + print "cut range=%s pos=%s length=%s => %s" % (self.range, self.pos, length, len(data)) + length = len(data) + if length == 0: + self.reader.handle_incoming_data(self) + self.close() + return self.reader.handle_incoming_data(self, data, length) self.pos += length if self.range != (-1,-1) and self.pos >= self.range[1]: @@ -121,6 +131,7 @@ class MagicHTTPProxyClient(object): #fetcher is done, remove from list self.fetchers = filter(lambda f: f != fetcher, self.fetchers) else: + assert fetcher.pos < fetcher.range[1] or fetcher.range == (-1,-1) heappush(self.blocks, (fetcher.pos, data, length)) if not self.channel.connected: @@ -131,7 +142,7 @@ class MagicHTTPProxyClient(object): and fetcher.range[1] - fetcher.pos < FETCHER_JUMPSTART \ and self.fetch_pos + 1 < self.content_length and self.channel.connected \ and len( filter(lambda f: f.proxy == fetcher.proxy, self.fetchers) ) < 2: - #Start a new fetcher on this line if this fetchers is X-Bytes before finished his job + #Start a new fetcher if this fetcher is X-Bytes before finished his job blocksize = max(int(TIME_SLICE * fetcher.speed()), MIN_BLOCKSIZE) fetch_range = self.next_range(blocksize) print "Start new Fetcher, bs=%s range=%s" % (blocksize,fetch_range) @@ -142,8 +153,9 @@ class MagicHTTPProxyClient(object): item = heappop(self.blocks) buf += item[1] self.write_pos += item[2] - if len(self.blocks)>0: - print "missed: %s => %s" % (self.write_pos, min(self.blocks)[0]) + + if buf == "" and len(self.blocks)>0: + print "search=%s get=%s with length=%s pending=%s" % (self.write_pos, min(self.blocks)[0],min(self.blocks)[2], len(self.blocks)) if buf != "": self.channel.push(buf) -- cgit v1.2.1 From e40b169239b9a257be66480c164554652d2123bb Mon Sep 17 00:00:00 2001 From: Yves Date: Wed, 17 Mar 2010 00:14:06 +0100 Subject: blablablablabla --- proxy.py | 80 +++++++++++++++++++++++++++++++++++++++++++++------------------- 1 file changed, 57 insertions(+), 23 deletions(-) (limited to 'proxy.py') diff --git a/proxy.py b/proxy.py index eb408b6..ac01249 100755 --- a/proxy.py +++ b/proxy.py @@ -41,7 +41,7 @@ class Fetcher(asynchat.async_chat): self.http_status = "" self.http_header = StringIO.StringIO() self.state = 0 #0=status, 1=header, 2=body - + print self, "__init__" asynchat.async_chat.__init__(self) self.set_terminator("\r\n") self.create_socket(socket.AF_INET, socket.SOCK_STREAM) @@ -52,12 +52,13 @@ class Fetcher(asynchat.async_chat): def handle_connect (self): print self, "Start" - self.send("GET http://%s:%s%s HTTP/1.0\r\n" % ( self.url.hostname, self.url.port or 80, self.url.path )) + buf = "GET http://%s:%s%s HTTP/1.0\r\n" % ( self.url.hostname, self.url.port or 80, self.url.path ) for key in filter(lambda k: k not in ("range"), self.headers.keys()): #send origin request headers - self.send("%s: %s\r\n" % (key, self.headers[key])) + buf += "%s: %s\r\n" % (key, self.headers[key]) if self.range != (-1,-1): - self.send("Range: bytes=%s-%s\r\n" % (self.range[0], self.range[1])) - self.send("\r\n") + buf += "Range: bytes=%s-%s\r\n" % (self.range[0], self.range[1]) + buf += "\r\n" + self.push(buf) self.start_time = time.time() def time(self): @@ -72,23 +73,24 @@ class Fetcher(asynchat.async_chat): def collect_incoming_data(self, data): if self.state==2: #body length = len(data) - if self.range != (-1,-1) and self.pos + length >= self.range[1]: + if self.range != (-1,-1) and self.pos + length > self.range[1]: #if this request is the first one (whithout Range: ...) then the server # dont send us our expected range, we must cut it at some point (here) - data=data[:(self.range[1]-self.pos+1)] #XXX explain this - print "cut range=%s pos=%s length=%s => %s" % (self.range, self.pos, length, len(data)) + bytes_remaining = self.range[1] - self.pos + 1 #XXX + data=data[:bytes_remaining] + print self,"cut: pos=%s length=%s => %s" % (self.pos, length, len(data)) length = len(data) if length == 0: self.reader.handle_incoming_data(self) self.close() return - self.reader.handle_incoming_data(self, data, length) + if not self.reader.handle_incoming_data(self, data, length): + self.close() + return self.pos += length if self.range != (-1,-1) and self.pos >= self.range[1]: self.stop_time = time.time() print self, "finished with %s kB/s" % (self.speed() / 1024) - #make sure the next fetcher will be started - self.reader.handle_incoming_data(self) self.close() elif self.state ==1: #header self.http_header.write( data ) @@ -118,13 +120,11 @@ class MagicHTTPProxyClient(object): self.blocks = list() self.fetchers = list() - print self, "New Instance" - proxy = ENDPOINTS[ random.randint(0, len(ENDPOINTS)-1) ] self.fetchers.append( Fetcher(self, proxy, self.url, self.header, (-1,-1)) ) def __str__(self): - return "" % (urlparse.urlunparse(self.url), self.content_length) + return "" % (self.url.hostname, self.url.path, self.content_length) def handle_incoming_data(self, fetcher, data=None, length=0): if not data: @@ -136,32 +136,33 @@ class MagicHTTPProxyClient(object): if not self.channel.connected: print self, "request side closed the connection" - return + return False if fetcher.range != (-1,-1) \ - and fetcher.range[1] - fetcher.pos < FETCHER_JUMPSTART \ + and fetcher.range[1] - (fetcher.pos+length) < FETCHER_JUMPSTART \ and self.fetch_pos + 1 < self.content_length and self.channel.connected \ and len( filter(lambda f: f.proxy == fetcher.proxy, self.fetchers) ) < 2: #Start a new fetcher if this fetcher is X-Bytes before finished his job blocksize = max(int(TIME_SLICE * fetcher.speed()), MIN_BLOCKSIZE) fetch_range = self.next_range(blocksize) - print "Start new Fetcher, bs=%s range=%s" % (blocksize,fetch_range) self.fetchers.append( Fetcher(self, fetcher.proxy, self.url, self.header, fetch_range) ) + if len(self.blocks)>0: + print self,"fetch_pos=%s write_pos=%s get=%s with length=%s pending=%s" % (self.fetch_pos, self.write_pos, min(self.blocks)[0],min(self.blocks)[2], len(self.blocks)) + buf = "" while len(self.blocks)>0 and min(self.blocks)[0] == self.write_pos: item = heappop(self.blocks) buf += item[1] self.write_pos += item[2] - if buf == "" and len(self.blocks)>0: - print "search=%s get=%s with length=%s pending=%s" % (self.write_pos, min(self.blocks)[0],min(self.blocks)[2], len(self.blocks)) if buf != "": self.channel.push(buf) if self.write_pos + 1 >= self.content_length: print self, "job done %s blocks left" % len(self.blocks) self.channel.close_when_done() + return True def next_range(self, suggested_blocksize): assert self.content_length != -1 @@ -170,6 +171,8 @@ class MagicHTTPProxyClient(object): return (start, self.fetch_pos-1) def handle_incoming_http_header(self, fetcher, header): + if not self.channel.connected: + return if self.header_sent: pass else: @@ -203,18 +206,49 @@ class MagicHTTPProxyClient(object): buf += "\r\n" self.channel.push(buf) - +"""Transparent forward to other proxy server""" +class HTTPProxyClient(asynchat.async_chat): + def __init__(self, proxy, channel, method, url, headers): + self.proxy = proxy + self.other = channel + self.method = method + self.headers = headers + + asynchat.async_chat.__init__(self) + self.set_terminator(None) + self.create_socket(socket.AF_INET, socket.SOCK_STREAM) + self.connect(self.proxy) + + self.buf = "" + self.buf += "%s %s HTTP/1.0\r\n" % (method, urlparse.urlunparse(url)) + for key in headers.keys(): + self.buf += "%s: %s\r\n" % (key, headers[key]) + self.buf += "\r\n" + + def __str__(self): + return "" % self.proxy + + def collect_incoming_data(self, data): + self.other.push(data) + + def handle_close(self): + self.close() + self.other.close_when_done() + + def handle_connect(self): + self.push(self.buf) + class HTTPChannel(asynchat.async_chat): def __init__(self, server, sock, addr): self.server = server self.data = StringIO.StringIO() - self.request = None asynchat.async_chat.__init__(self, sock) self.set_terminator("\r\n\r\n") def handle_close(self): + self.connected = False self.close() def collect_incoming_data(self, data): @@ -243,7 +277,7 @@ class HTTPProxyServer(asyncore.dispatcher): self.create_socket(socket.AF_INET, socket.SOCK_STREAM) self.set_reuse_addr() self.bind(("", 8080)) - self.listen(15) + self.listen(5) def __str__(self): return "" % self.port @@ -254,7 +288,7 @@ class HTTPProxyServer(asyncore.dispatcher): def handle_request(self, channel, method, path, headers): url = urlparse.urlparse(path) - if method != "GET" or url.query != "": + if method != "GET": #do not handle non-GET or GET with Query (?foo=bla) requests return self._bypass_request(channel, method, url, headers) else: -- cgit v1.2.1 From b0d0da7cfc7116a0234b2021d03085a9ce7f5595 Mon Sep 17 00:00:00 2001 From: Yves Date: Sun, 28 Mar 2010 03:35:17 +0000 Subject: fix job partitioning logic, but no error handling so far --- proxy.py | 52 +++++++++++++++++++++++++++++----------------------- 1 file changed, 29 insertions(+), 23 deletions(-) (limited to 'proxy.py') diff --git a/proxy.py b/proxy.py index ac01249..c11236d 100755 --- a/proxy.py +++ b/proxy.py @@ -6,8 +6,8 @@ import cStringIO as StringIO ENDPOINTS = [ -# ('10.2.2.11', 8888), - ('10.3.1.2', 8888), + ('10.2.2.11', 8888), +# ('10.3.1.2', 8888), # ('10.1.1.156', 8888), ] @@ -73,11 +73,11 @@ class Fetcher(asynchat.async_chat): def collect_incoming_data(self, data): if self.state==2: #body length = len(data) - if self.range != (-1,-1) and self.pos + length > self.range[1]: + if self.range != (-1,-1) and self.pos + length >= self.range[1] + 1: #if this request is the first one (whithout Range: ...) then the server # dont send us our expected range, we must cut it at some point (here) - bytes_remaining = self.range[1] - self.pos + 1 #XXX - data=data[:bytes_remaining] + bytes_remaining = self.range[1] - ( self.pos ) + data=data[:bytes_remaining+1] print self,"cut: pos=%s length=%s => %s" % (self.pos, length, len(data)) length = len(data) if length == 0: @@ -91,6 +91,7 @@ class Fetcher(asynchat.async_chat): if self.range != (-1,-1) and self.pos >= self.range[1]: self.stop_time = time.time() print self, "finished with %s kB/s" % (self.speed() / 1024) + self.reader.handle_incoming_data(self) self.close() elif self.state ==1: #header self.http_header.write( data ) @@ -130,6 +131,7 @@ class MagicHTTPProxyClient(object): if not data: #fetcher is done, remove from list self.fetchers = filter(lambda f: f != fetcher, self.fetchers) + print "Remove: %s" % fetcher else: assert fetcher.pos < fetcher.range[1] or fetcher.range == (-1,-1) heappush(self.blocks, (fetcher.pos, data, length)) @@ -140,15 +142,15 @@ class MagicHTTPProxyClient(object): if fetcher.range != (-1,-1) \ and fetcher.range[1] - (fetcher.pos+length) < FETCHER_JUMPSTART \ - and self.fetch_pos + 1 < self.content_length and self.channel.connected \ + and self.fetch_pos + 1 < self.content_length \ and len( filter(lambda f: f.proxy == fetcher.proxy, self.fetchers) ) < 2: #Start a new fetcher if this fetcher is X-Bytes before finished his job blocksize = max(int(TIME_SLICE * fetcher.speed()), MIN_BLOCKSIZE) fetch_range = self.next_range(blocksize) self.fetchers.append( Fetcher(self, fetcher.proxy, self.url, self.header, fetch_range) ) - if len(self.blocks)>0: - print self,"fetch_pos=%s write_pos=%s get=%s with length=%s pending=%s" % (self.fetch_pos, self.write_pos, min(self.blocks)[0],min(self.blocks)[2], len(self.blocks)) + #if len(self.blocks)>0: + #print self,"fetch_pos=%s write_pos=%s get=%s with length=%s pending=%s" % (self.fetch_pos, self.write_pos, min(self.blocks)[0],min(self.blocks)[2], len(self.blocks)) buf = "" while len(self.blocks)>0 and min(self.blocks)[0] == self.write_pos: @@ -240,6 +242,7 @@ class HTTPProxyClient(asynchat.async_chat): class HTTPChannel(asynchat.async_chat): def __init__(self, server, sock, addr): + print "Channel opened" self.server = server self.data = StringIO.StringIO() @@ -266,8 +269,23 @@ class HTTPChannel(asynchat.async_chat): else: self.set_terminator(None) headers = httplib.HTTPMessage(self.data).dict - self.server.handle_request(self, self.request[0], self.request[1], headers) - + self.handle_request(self, self.request[0], self.request[1], headers) + + def handle_request(self, channel, method, path, headers): + url = urlparse.urlparse(path) + if method != "GET": + #do not handle non-GET or GET with Query (?foo=bla) requests + return self._bypass_request(channel, method, url, headers) + else: + MagicHTTPProxyClient(channel, url, headers) + + def _bypass_request(self, channel, method, url, headers): + proxy = ENDPOINTS[ int( md5.md5(url.hostname).hexdigest(),16 ) % len(ENDPOINTS) ] + print self, "_bypass request via %s: %s %s" % (proxy, method, urlparse.urlunparse(url)) + HTTPProxyClient(proxy, channel, method, url, headers) + #FIXME use this other thing + + class HTTPProxyServer(asyncore.dispatcher): def __init__(self): @@ -286,19 +304,7 @@ class HTTPProxyServer(asyncore.dispatcher): conn, addr = self.accept() HTTPChannel(self, conn, addr) - def handle_request(self, channel, method, path, headers): - url = urlparse.urlparse(path) - if method != "GET": - #do not handle non-GET or GET with Query (?foo=bla) requests - return self._bypass_request(channel, method, url, headers) - else: - MagicHTTPProxyClient(channel, url, headers) - - def _bypass_request(self, channel, method, url, headers): - proxy = ENDPOINTS[ int( md5.md5(url.hostname).hexdigest(),16 ) % len(ENDPOINTS) ] - print self, "_bypass request via %s: %s %s" % (proxy, method, urlparse.urlunparse(url)) - HTTPProxyClient(proxy, channel, method, url, headers) - if __name__ == "__main__": proxy = HTTPProxyServer() + print proxy asyncore.loop() -- cgit v1.2.1 From fe7a395f73bd2fbb9e01d24910722cc13c0ee649 Mon Sep 17 00:00:00 2001 From: Yves Date: Fri, 2 Apr 2010 20:23:13 +0200 Subject: python-class for config, code dedup --- proxy.py | 136 ++++++++++++++++++++++++++------------------------------------- 1 file changed, 56 insertions(+), 80 deletions(-) (limited to 'proxy.py') diff --git a/proxy.py b/proxy.py index c11236d..edf6be2 100755 --- a/proxy.py +++ b/proxy.py @@ -4,27 +4,31 @@ import asynchat, asyncore, socket, httplib, urlparse from heapq import heappush, heappop import cStringIO as StringIO +kB = 1024 -ENDPOINTS = [ - ('10.2.2.11', 8888), -# ('10.3.1.2', 8888), -# ('10.1.1.156', 8888), -] +class DefaultConfiguration: + """bind to that""" + listen=("",8080) -kB = 1024 -#minimum entity size to start a paralel fetch -THRESHOLD = 512 * kB -#first fetch-range blocksize -INIT_BLOCKSIZE = 512 * kB -#lower bound of fetch-range blocksize optimization -MIN_BLOCKSIZE = 512 * kB -#time each fetcher spent on his range, calculated using -#speed measured while using INIT_BLOCKSIZE -TIME_SLICE = 5 -#start a new fetcher on a endpoint X-bytes before the -#old one finished -FETCHER_JUMPSTART = 32 * kB + """available http-proxies""" + endpoints=[ ('10.2.2.11', 8888) ] + + """minimum entity size to start parallelize fetch""" + threshold = 512*kB + + """initial size/range for a fetcher-job""" + initial_blocksize=512*kB + """minimal size/range for a fetcher-job""" + minimal_blocksize=512*kB + + """(sec) #time each fetcher spent on his range, + calculated using speed measured while using initial_blocksize""" + time_slice=5 + + """start a new fetcher on a endpoint X-bytes before the old one finished""" + fetcher_jumpstart=32*kB + ################# class Fetcher(asynchat.async_chat): @@ -41,7 +45,6 @@ class Fetcher(asynchat.async_chat): self.http_status = "" self.http_header = StringIO.StringIO() self.state = 0 #0=status, 1=header, 2=body - print self, "__init__" asynchat.async_chat.__init__(self) self.set_terminator("\r\n") self.create_socket(socket.AF_INET, socket.SOCK_STREAM) @@ -51,7 +54,7 @@ class Fetcher(asynchat.async_chat): return "= THRESHOLD: + if content_length >= self.config.threshold: self.content_length = content_length - fetcher.range = self.next_range(INIT_BLOCKSIZE) - for proxy in filter(lambda p: fetcher.proxy != p, ENDPOINTS): + fetcher.range = self.next_range(self.config.initial_blocksize) + for proxy in filter(lambda p: fetcher.proxy != p, self.config.endpoints): if self.fetch_pos == self.content_length -1: break - self.fetchers.append(Fetcher( self, proxy, self.url, self.header, self.next_range(INIT_BLOCKSIZE))) + self.fetchers.append(Fetcher( self, proxy, self.url, self.header, self.next_range(self.config.initial_blocksize))) else: content_length = None @@ -208,43 +212,9 @@ class MagicHTTPProxyClient(object): buf += "\r\n" self.channel.push(buf) -"""Transparent forward to other proxy server""" -class HTTPProxyClient(asynchat.async_chat): - def __init__(self, proxy, channel, method, url, headers): - self.proxy = proxy - self.other = channel - self.method = method - self.headers = headers - - asynchat.async_chat.__init__(self) - self.set_terminator(None) - self.create_socket(socket.AF_INET, socket.SOCK_STREAM) - self.connect(self.proxy) - - self.buf = "" - self.buf += "%s %s HTTP/1.0\r\n" % (method, urlparse.urlunparse(url)) - for key in headers.keys(): - self.buf += "%s: %s\r\n" % (key, headers[key]) - self.buf += "\r\n" - - def __str__(self): - return "" % self.proxy - - def collect_incoming_data(self, data): - self.other.push(data) - - def handle_close(self): - self.close() - self.other.close_when_done() - - def handle_connect(self): - self.push(self.buf) - class HTTPChannel(asynchat.async_chat): def __init__(self, server, sock, addr): - print "Channel opened" self.server = server - self.data = StringIO.StringIO() asynchat.async_chat.__init__(self, sock) @@ -269,42 +239,48 @@ class HTTPChannel(asynchat.async_chat): else: self.set_terminator(None) headers = httplib.HTTPMessage(self.data).dict - self.handle_request(self, self.request[0], self.request[1], headers) + self.handle_request(self.request[0], self.request[1], headers) - def handle_request(self, channel, method, path, headers): + def handle_request(self, method, path, headers): url = urlparse.urlparse(path) - if method != "GET": + if method != "GET" or url.query != "": #do not handle non-GET or GET with Query (?foo=bla) requests - return self._bypass_request(channel, method, url, headers) + proxy = self.server.config.endpoints[ int( md5.md5(url.hostname).hexdigest(),16 ) % len(self.server.config.endpoints) ] + print Fetcher(self, proxy, url, headers, (-1,-1)) else: - MagicHTTPProxyClient(channel, url, headers) - - def _bypass_request(self, channel, method, url, headers): - proxy = ENDPOINTS[ int( md5.md5(url.hostname).hexdigest(),16 ) % len(ENDPOINTS) ] - print self, "_bypass request via %s: %s %s" % (proxy, method, urlparse.urlunparse(url)) - HTTPProxyClient(proxy, channel, method, url, headers) - #FIXME use this other thing - - + MagicHTTPProxyClient(self, url, headers) + + def handle_incoming_http_header(self,fetcher,header): + header.seek(0) + headers = httplib.HTTPMessage(header) + buf = "HTTP/1.1 200 OK\r\n" + buf += "\r\n".join(map(lambda hdr: "%s: %s" % (hdr,headers.dict[hdr]), headers.dict.keys())) + buf += "\r\n\r\n" + self.push(buf) -class HTTPProxyServer(asyncore.dispatcher): - def __init__(self): - self.port = 8080 + def handle_incoming_data(self, fetcher, data=None, length=0): + if data: + self.push(data) + return True + +class HTTPProxyServer(asyncore.dispatcher): + def __init__(self,config): + self.config = config asyncore.dispatcher.__init__(self) self.create_socket(socket.AF_INET, socket.SOCK_STREAM) self.set_reuse_addr() - self.bind(("", 8080)) + self.bind(self.config.listen) self.listen(5) def __str__(self): - return "" % self.port + return "" % self.config.listen def handle_accept(self): conn, addr = self.accept() HTTPChannel(self, conn, addr) if __name__ == "__main__": - proxy = HTTPProxyServer() + proxy = HTTPProxyServer(DefaultConfiguration) print proxy asyncore.loop() -- cgit v1.2.1 From c18e0ab17a23ddef42768151c6905818ad1d40d3 Mon Sep 17 00:00:00 2001 From: Yves Date: Fri, 2 Apr 2010 20:48:13 +0200 Subject: lays a egg --- proxy.py | 286 --------------------------------------------------------------- 1 file changed, 286 deletions(-) delete mode 100755 proxy.py (limited to 'proxy.py') diff --git a/proxy.py b/proxy.py deleted file mode 100755 index edf6be2..0000000 --- a/proxy.py +++ /dev/null @@ -1,286 +0,0 @@ -#!/usr/bin/python -t -import os, sys, string, time, md5, random -import asynchat, asyncore, socket, httplib, urlparse -from heapq import heappush, heappop -import cStringIO as StringIO - -kB = 1024 - -class DefaultConfiguration: - """bind to that""" - listen=("",8080) - - """available http-proxies""" - endpoints=[ ('10.2.2.11', 8888) ] - - """minimum entity size to start parallelize fetch""" - threshold = 512*kB - - """initial size/range for a fetcher-job""" - initial_blocksize=512*kB - - """minimal size/range for a fetcher-job""" - minimal_blocksize=512*kB - - """(sec) #time each fetcher spent on his range, - calculated using speed measured while using initial_blocksize""" - time_slice=5 - - """start a new fetcher on a endpoint X-bytes before the old one finished""" - fetcher_jumpstart=32*kB - -################# - -class Fetcher(asynchat.async_chat): - def __init__(self, reader, proxy, url, headers, range): - self.reader = reader - self.proxy = proxy - self.url = url - self.headers = headers - self.range = range - - self.pos = (self.range[0] != -1) and self.range[0] or 0 - self.start_time = 0 - self.stop_time = 0 - self.http_status = "" - self.http_header = StringIO.StringIO() - self.state = 0 #0=status, 1=header, 2=body - asynchat.async_chat.__init__(self) - self.set_terminator("\r\n") - self.create_socket(socket.AF_INET, socket.SOCK_STREAM) - self.connect(self.proxy) - - def __str__(self): - return "= self.range[1] + 1: - #if this request is the first one (whithout Range: ...) then the server - # dont send us our expected range, we must cut it at some point (here) - bytes_remaining = self.range[1] - ( self.pos ) - data=data[:bytes_remaining+1] - print self,"cut: pos=%s length=%s => %s" % (self.pos, length, len(data)) - length = len(data) - if length == 0: - self.reader.handle_incoming_data(self) - self.close() - return - if not self.reader.handle_incoming_data(self, data, length): - self.close() - return - self.pos += length - if self.range != (-1,-1) and self.pos >= self.range[1]: - self.stop_time = time.time() - print self, "finished with %s kB/s" % (self.speed() / 1024) - self.reader.handle_incoming_data(self) - self.close() - elif self.state ==1: #header - self.http_header.write( data ) - else: #status - self.http_status += data - - def found_terminator(self): - if self.state == 0: #got status-line - self.state = 1 - self.set_terminator("\r\n\r\n") - elif self.state == 1: #got headers - self.state = 2 - self.set_terminator(None) - self.reader.handle_incoming_http_header(self, self.http_header) - -class MagicHTTPProxyClient(object): - def __init__(self, channel, url, header): - self.channel = channel - self.config = self.channel.server.config - self.url = url - self.header = header - - self.content_length = -1 - self.header_sent = False - self.fetch_pos = 0 - self.write_pos = 0 - self.buffer = "" - self.blocks = list() - self.fetchers = list() - - proxy = self.config.endpoints[ random.randint(0, len(self.config.endpoints)-1) ] - self.fetchers.append( Fetcher(self, proxy, self.url, self.header, (-1,-1)) ) - - def __str__(self): - return "" % (self.url.hostname, self.url.path, self.content_length) - - def handle_incoming_data(self, fetcher, data=None, length=0): - if not data: - #fetcher is done, remove from list - self.fetchers = filter(lambda f: f != fetcher, self.fetchers) - print "Remove: %s" % fetcher - else: - assert fetcher.pos < fetcher.range[1] or fetcher.range == (-1,-1) - heappush(self.blocks, (fetcher.pos, data, length)) - - if not self.channel.connected: - print self, "request side closed the connection" - return False - - if fetcher.range != (-1,-1) \ - and fetcher.range[1] - (fetcher.pos+length) < self.config.fetcher_jumpstart \ - and self.fetch_pos + 1 < self.content_length \ - and len( filter(lambda f: f.proxy == fetcher.proxy, self.fetchers) ) < 2: - #Start a new fetcher if this fetcher is X-Bytes before finished his job - blocksize = max(int(self.config.time_slice * fetcher.speed()), self.config.minimal_blocksize) - fetch_range = self.next_range(blocksize) - self.fetchers.append( Fetcher(self, fetcher.proxy, self.url, self.header, fetch_range) ) - - #if len(self.blocks)>0: - #print self,"fetch_pos=%s write_pos=%s get=%s with length=%s pending=%s" % (self.fetch_pos, self.write_pos, min(self.blocks)[0],min(self.blocks)[2], len(self.blocks)) - - buf = "" - while len(self.blocks)>0 and min(self.blocks)[0] == self.write_pos: - item = heappop(self.blocks) - buf += item[1] - self.write_pos += item[2] - - if buf != "": - self.channel.push(buf) - - if self.write_pos + 1 >= self.content_length: - print self, "job done %s blocks left" % len(self.blocks) - self.channel.close_when_done() - return True - - def next_range(self, suggested_blocksize): - assert self.content_length != -1 - start = self.fetch_pos - self.fetch_pos = min(self.fetch_pos + suggested_blocksize, self.content_length) - return (start, self.fetch_pos-1) - - def handle_incoming_http_header(self, fetcher, header): - if not self.channel.connected: - return - if self.header_sent: - pass - else: - self.header_sent = True - - # Sends header from first response - header.seek(0) - headers = httplib.HTTPMessage(header) - - content_length = filter(lambda i: i == "content-length", headers.dict.keys()) - if len(content_length) == 1: - content_length = int(headers.dict["content-length"]) - if content_length >= self.config.threshold: - self.content_length = content_length - fetcher.range = self.next_range(self.config.initial_blocksize) - for proxy in filter(lambda p: fetcher.proxy != p, self.config.endpoints): - if self.fetch_pos == self.content_length -1: - break - self.fetchers.append(Fetcher( self, proxy, self.url, self.header, self.next_range(self.config.initial_blocksize))) - - else: - content_length = None - - buf = "HTTP/1.1 200 OK\r\n" - for key in filter(lambda k: k not in ("content-range", "content-length"), headers.dict.keys()): - buf += "%s: %s\r\n" % (key, headers.dict[key]) - if content_length: - buf += "Content-Length: %s\r\n" % content_length - buf += "Content-Range: bytes %s-%s/%s\r\n" % (0, content_length-1, content_length) - buf += "X-Proxy: Magicproxy (superpower activated)\r\n" - buf += "\r\n" - self.channel.push(buf) - -class HTTPChannel(asynchat.async_chat): - def __init__(self, server, sock, addr): - self.server = server - self.data = StringIO.StringIO() - - asynchat.async_chat.__init__(self, sock) - self.set_terminator("\r\n\r\n") - - def handle_close(self): - self.connected = False - self.close() - - def collect_incoming_data(self, data): - self.data.write(data) - if self.data.tell() > 16384: - self.close_when_done() - - def found_terminator(self): - # parse http header - self.data.seek(0) - self.request = string.split(self.data.readline(), None, 2) - if len(self.request) != 3: - # badly formed request; just shut down - self.close_when_done() - else: - self.set_terminator(None) - headers = httplib.HTTPMessage(self.data).dict - self.handle_request(self.request[0], self.request[1], headers) - - def handle_request(self, method, path, headers): - url = urlparse.urlparse(path) - if method != "GET" or url.query != "": - #do not handle non-GET or GET with Query (?foo=bla) requests - proxy = self.server.config.endpoints[ int( md5.md5(url.hostname).hexdigest(),16 ) % len(self.server.config.endpoints) ] - print Fetcher(self, proxy, url, headers, (-1,-1)) - else: - MagicHTTPProxyClient(self, url, headers) - - def handle_incoming_http_header(self,fetcher,header): - header.seek(0) - headers = httplib.HTTPMessage(header) - buf = "HTTP/1.1 200 OK\r\n" - buf += "\r\n".join(map(lambda hdr: "%s: %s" % (hdr,headers.dict[hdr]), headers.dict.keys())) - buf += "\r\n\r\n" - self.push(buf) - - - def handle_incoming_data(self, fetcher, data=None, length=0): - if data: - self.push(data) - return True - -class HTTPProxyServer(asyncore.dispatcher): - def __init__(self,config): - self.config = config - asyncore.dispatcher.__init__(self) - self.create_socket(socket.AF_INET, socket.SOCK_STREAM) - self.set_reuse_addr() - self.bind(self.config.listen) - self.listen(5) - - def __str__(self): - return "" % self.config.listen - - def handle_accept(self): - conn, addr = self.accept() - HTTPChannel(self, conn, addr) - -if __name__ == "__main__": - proxy = HTTPProxyServer(DefaultConfiguration) - print proxy - asyncore.loop() -- cgit v1.2.1