summaryrefslogtreecommitdiff
path: root/proxy.py
diff options
context:
space:
mode:
Diffstat (limited to 'proxy.py')
-rw-r--r--proxy.py147
1 files changed, 126 insertions, 21 deletions
diff --git a/proxy.py b/proxy.py
index 585b58b..1ebed70 100644
--- a/proxy.py
+++ b/proxy.py
@@ -7,19 +7,128 @@ except ImportError:
import StringIO
-endpoints = {
- {'host':'10.1.0.1', 'port':8080, 'speed':220, 'name':'Proxy 10.1'},
- {'host':'10.2.2.11', 'port':8081, 'speed':340, 'name':'Proxy 10.2'},
- {'host':'10.3.0.99', 'port':8080, 'speed':340, 'name':'Proxy 10.3'},
-}
+ENDPOINTS = [
+# {'host':'10.1.0.1', 'port':8080},
+# {'host':'10.2.2.11', 'port':8081},
+ ('10.3.1.2', 8888),
+]
+#minimum entity size to start a paralel fetch
+MINIMAL_SIZE = 524288
+BLOCKSIZE_FETCH = 524288
+
+class Fetcher(asynchat.async_chat):
+ def __init__(self, reader, proxy, url, header, range, content_length):
+ print "Fetcher using proxy=%s for url=%s range=%s content_length=%s" % (proxy, url, range, content_length)
+ self.reader = reader
+ self.proxy = proxy
+ self.url = url
+ self.range = range
+ self.content_length = content_length
+
+ self.pos = range[0]
+ asynchat.async_chat.__init__(self)
+ self.set_terminator("\r\n\r\n")
+ self.http_header = ""
+ self.is_body = False
+ self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
+ self.connect(proxy)
+
+ #XXXshould include original request headers
+ def handle_connect (self):
+ print "handle_connect"
+ self.send("GET http://%s:%s%s HTTP/1.0\r\n" % ( self.url.hostname, self.url.port or 80, self.url.path ))
+ self.send("Content-Range: bytes %s-%s/%s\r\n" % (self.range[0], self.range[1], self.content_length))
+ self.send("\r\n")
+ print "sent request"
+
+ def collect_incoming_data(self, data):
+ if self.is_body:
+ print "read %s - range_start=%s range_end=%s" % (self.pos, self.range[0], self.range[1])
+ self.reader.handle_incoming_data(self.pos, data)
+ self.pos += len(data)
+ if self.pos >= self.range[1]:
+ self.reader.finished()
+ print "finished"
+ self.close_when_done()
+ else:
+ print "get header data"
+ print data
+ self.http_header += data
+
+ def found_terminator(self):
+ print "header ends now"
+ self.set_terminator(None)
+ self.reader.handle_incoming_http_header(self.http_header)
+ self.is_body = True
+
+class MultipleProxyReader(object):
+ def __init__(self, channel, url, header, content_length):
+ print "MultipleProxyReader url=%s content_length=%s" % (url, content_length)
+ self.channel = channel
+ self.url = url
+ self.header = header
+ self.content_length = content_length
+
+ self.header_sent = False
+ self.fetch_pos = 0
+ self.write_pos = 0
+ self.buffer = ""
+ self.blocks = dict()
+ self.next_endpoint = 0
+
+ self.finished() #bad-named XXX
+
+ def handle_incoming_data(self, pos, data):
+ print "got body data at pos %s" % pos,
+ if self.write_pos == pos:
+ self.write_pos += len(data)
+ self.channel.push(data)
+ print " ..sent"
+ else:
+ self.blocks[pos] = data
+ print " ..stored"
+
+ self.find_next_data()
+
+ def handle_incoming_http_header(self, header):
+ if self.header_sent:
+ pass
+ else:
+ self.header_sent = True
+ self.channel.push(header)
+ self.channel.push("X-Proxy: Magicproxy (superpower activated)\r\n")
+ self.channel.push("\r\n")
+
+ def finished(self):
+ self.find_next_data()
+ if self.fetch_pos != self.content_length:
+ print "start new fetcher"
+ start = self.fetch_pos
+ if self.fetch_pos+BLOCKSIZE_FETCH < self.content_length:
+ self.fetch_pos += BLOCKSIZE_FETCH
+ else:
+ self.fetch_pos = self.content_length
+ range = (start, self.fetch_pos-1)
+ Fetcher(self, ENDPOINTS[self.next_endpoint], self.url, self.header, range, self.content_length)
+ self.next_endpoint = (self.next_endpoint+1) % len(ENDPOINTS)
+ else:
+ #bin fertig
+ self.channel.close_when_done()
+
+ def find_next_data(self):
+ if self.write_pos in self.blocks:
+ data = self.blocks.pop(self.write_pos)
+ self.channel.push(data)
+ self.write_pos += len(data)
+ self.find_next_data()
class HTTPResponseProducer(object):
def __init__(self, resp, amt=512):
self.resp = resp
self.amt = amt
def more(self):
- return self.resp.read(self.amt)
+ self.resp.read(self.amt)
class HTTPChannel(asynchat.async_chat):
def __init__(self, server, sock, addr):
@@ -44,7 +153,6 @@ class HTTPChannel(asynchat.async_chat):
self.close_when_done()
else:
self.server.handle_request(self, self.request[0], self.request[1])
- self.close_when_done()
else:
pass # ignore body data, for now
@@ -75,32 +183,29 @@ class HTTPProxyServer(asyncore.dispatcher):
content_length = filter(lambda it: it[0] == "content-length", resp.getheaders())
if len( content_length ) == 0:
# no content length given, bypass this request
- return self._bypass_request(channel, method, url)
+ return self._bypass_request(channel, "GET", url)
else:
- content_length = content_length[0][1]
+ content_length = int(content_length[0][1])
- if content_length < 524288:
- # do not handle requests smaller than 512kb
- return self._bypass_request(channel, method, url)
+ if content_length < MINIMAL_SIZE:
+ return self._bypass_request(channel, "GET", url)
- print "Content-Length: %s" % (content_length)
-
- # XXX an dieser stelle muss de request aufgeteilt werden
- return self._bypass_request(channel, method, url)
- #print "do some magic for " +str(url)
- #channel.push("HTTP/1.0 200 OK\r\nX-Proxy: Magicproxy (request handled in boost mode)\r\n")
- #channel.close_when_done()
+ print "start reader"
+ ##XXX
+ header = (("foo", "bla"))
+ MultipleProxyReader(channel, url, header, content_length)
def _bypass_request(self, channel, method, url):
+ print "_bypass request"
#XXX hier sollte nicht proxy gespielt werden sondern
#die daten 1-zu-1 durchgereicht werden.
#Weiterhin sollte sichergestellt werden, dass die requests
- #zu Host X1 immer über Proxy Y1 geroutet werden
+ #zu Host X1 immer ueber Proxy Y1 geroutet werden
# etwa proxy=proxies[ stuff(hostname) % len(proxies) ]
conn = httplib.HTTPConnection(url.hostname, url.port or 80)
conn.request(method, url.path)
resp = conn.getresponse()
- channel.push("HTTP/1.0 200 OK\r\nX-Proxy: Magicproxy (request handled in standard mode)\r\n")
+ channel.push("HTTP/1.0 200 OK\r\nX-Proxy: Magicproxy (superpower disabled)\r\n")
channel.push( "\r\n".join(map(lambda k: "%s: %s" % (k[0],k[1]), resp.getheaders())) )
channel.push("\r\n\r\n")
channel.push_with_producer( HTTPResponseProducer(resp) )