diff options
Diffstat (limited to 'proxy.py')
-rw-r--r-- | proxy.py | 147 |
1 files changed, 126 insertions, 21 deletions
@@ -7,19 +7,128 @@ except ImportError: import StringIO -endpoints = { - {'host':'10.1.0.1', 'port':8080, 'speed':220, 'name':'Proxy 10.1'}, - {'host':'10.2.2.11', 'port':8081, 'speed':340, 'name':'Proxy 10.2'}, - {'host':'10.3.0.99', 'port':8080, 'speed':340, 'name':'Proxy 10.3'}, -} +ENDPOINTS = [ +# {'host':'10.1.0.1', 'port':8080}, +# {'host':'10.2.2.11', 'port':8081}, + ('10.3.1.2', 8888), +] +#minimum entity size to start a paralel fetch +MINIMAL_SIZE = 524288 +BLOCKSIZE_FETCH = 524288 + +class Fetcher(asynchat.async_chat): + def __init__(self, reader, proxy, url, header, range, content_length): + print "Fetcher using proxy=%s for url=%s range=%s content_length=%s" % (proxy, url, range, content_length) + self.reader = reader + self.proxy = proxy + self.url = url + self.range = range + self.content_length = content_length + + self.pos = range[0] + asynchat.async_chat.__init__(self) + self.set_terminator("\r\n\r\n") + self.http_header = "" + self.is_body = False + self.create_socket(socket.AF_INET, socket.SOCK_STREAM) + self.connect(proxy) + + #XXXshould include original request headers + def handle_connect (self): + print "handle_connect" + self.send("GET http://%s:%s%s HTTP/1.0\r\n" % ( self.url.hostname, self.url.port or 80, self.url.path )) + self.send("Content-Range: bytes %s-%s/%s\r\n" % (self.range[0], self.range[1], self.content_length)) + self.send("\r\n") + print "sent request" + + def collect_incoming_data(self, data): + if self.is_body: + print "read %s - range_start=%s range_end=%s" % (self.pos, self.range[0], self.range[1]) + self.reader.handle_incoming_data(self.pos, data) + self.pos += len(data) + if self.pos >= self.range[1]: + self.reader.finished() + print "finished" + self.close_when_done() + else: + print "get header data" + print data + self.http_header += data + + def found_terminator(self): + print "header ends now" + self.set_terminator(None) + self.reader.handle_incoming_http_header(self.http_header) + self.is_body = True + +class MultipleProxyReader(object): + def __init__(self, channel, url, header, content_length): + print "MultipleProxyReader url=%s content_length=%s" % (url, content_length) + self.channel = channel + self.url = url + self.header = header + self.content_length = content_length + + self.header_sent = False + self.fetch_pos = 0 + self.write_pos = 0 + self.buffer = "" + self.blocks = dict() + self.next_endpoint = 0 + + self.finished() #bad-named XXX + + def handle_incoming_data(self, pos, data): + print "got body data at pos %s" % pos, + if self.write_pos == pos: + self.write_pos += len(data) + self.channel.push(data) + print " ..sent" + else: + self.blocks[pos] = data + print " ..stored" + + self.find_next_data() + + def handle_incoming_http_header(self, header): + if self.header_sent: + pass + else: + self.header_sent = True + self.channel.push(header) + self.channel.push("X-Proxy: Magicproxy (superpower activated)\r\n") + self.channel.push("\r\n") + + def finished(self): + self.find_next_data() + if self.fetch_pos != self.content_length: + print "start new fetcher" + start = self.fetch_pos + if self.fetch_pos+BLOCKSIZE_FETCH < self.content_length: + self.fetch_pos += BLOCKSIZE_FETCH + else: + self.fetch_pos = self.content_length + range = (start, self.fetch_pos-1) + Fetcher(self, ENDPOINTS[self.next_endpoint], self.url, self.header, range, self.content_length) + self.next_endpoint = (self.next_endpoint+1) % len(ENDPOINTS) + else: + #bin fertig + self.channel.close_when_done() + + def find_next_data(self): + if self.write_pos in self.blocks: + data = self.blocks.pop(self.write_pos) + self.channel.push(data) + self.write_pos += len(data) + self.find_next_data() class HTTPResponseProducer(object): def __init__(self, resp, amt=512): self.resp = resp self.amt = amt def more(self): - return self.resp.read(self.amt) + self.resp.read(self.amt) class HTTPChannel(asynchat.async_chat): def __init__(self, server, sock, addr): @@ -44,7 +153,6 @@ class HTTPChannel(asynchat.async_chat): self.close_when_done() else: self.server.handle_request(self, self.request[0], self.request[1]) - self.close_when_done() else: pass # ignore body data, for now @@ -75,32 +183,29 @@ class HTTPProxyServer(asyncore.dispatcher): content_length = filter(lambda it: it[0] == "content-length", resp.getheaders()) if len( content_length ) == 0: # no content length given, bypass this request - return self._bypass_request(channel, method, url) + return self._bypass_request(channel, "GET", url) else: - content_length = content_length[0][1] + content_length = int(content_length[0][1]) - if content_length < 524288: - # do not handle requests smaller than 512kb - return self._bypass_request(channel, method, url) + if content_length < MINIMAL_SIZE: + return self._bypass_request(channel, "GET", url) - print "Content-Length: %s" % (content_length) - - # XXX an dieser stelle muss de request aufgeteilt werden - return self._bypass_request(channel, method, url) - #print "do some magic for " +str(url) - #channel.push("HTTP/1.0 200 OK\r\nX-Proxy: Magicproxy (request handled in boost mode)\r\n") - #channel.close_when_done() + print "start reader" + ##XXX + header = (("foo", "bla")) + MultipleProxyReader(channel, url, header, content_length) def _bypass_request(self, channel, method, url): + print "_bypass request" #XXX hier sollte nicht proxy gespielt werden sondern #die daten 1-zu-1 durchgereicht werden. #Weiterhin sollte sichergestellt werden, dass die requests - #zu Host X1 immer über Proxy Y1 geroutet werden + #zu Host X1 immer ueber Proxy Y1 geroutet werden # etwa proxy=proxies[ stuff(hostname) % len(proxies) ] conn = httplib.HTTPConnection(url.hostname, url.port or 80) conn.request(method, url.path) resp = conn.getresponse() - channel.push("HTTP/1.0 200 OK\r\nX-Proxy: Magicproxy (request handled in standard mode)\r\n") + channel.push("HTTP/1.0 200 OK\r\nX-Proxy: Magicproxy (superpower disabled)\r\n") channel.push( "\r\n".join(map(lambda k: "%s: %s" % (k[0],k[1]), resp.getheaders())) ) channel.push("\r\n\r\n") channel.push_with_producer( HTTPResponseProducer(resp) ) |