summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authoryvesf <yvesf@aurora.xapek.org>2010-03-11 21:22:13 +0100
committeryvesf <yvesf@aurora.xapek.org>2010-03-11 21:22:13 +0100
commit18f6784874a89c924e6fda75cb6fafceecc69c82 (patch)
treeeb6db2704d096ca8937a35823f698e324d14d532
parent252c318dadbfcc594d90e050f70dc40b1238aa2a (diff)
downloadmagicproxy-18f6784874a89c924e6fda75cb6fafceecc69c82.tar.gz
magicproxy-18f6784874a89c924e6fda75cb6fafceecc69c82.zip
Still not parallel
-rw-r--r--proxy.py81
1 files changed, 49 insertions, 32 deletions
diff --git a/proxy.py b/proxy.py
index 1ebed70..a0f5c6b 100644
--- a/proxy.py
+++ b/proxy.py
@@ -8,14 +8,13 @@ except ImportError:
ENDPOINTS = [
-# {'host':'10.1.0.1', 'port':8080},
-# {'host':'10.2.2.11', 'port':8081},
+ ('10.2.2.11', 8888),
('10.3.1.2', 8888),
]
#minimum entity size to start a paralel fetch
MINIMAL_SIZE = 524288
-BLOCKSIZE_FETCH = 524288
+BLOCKSIZE_FETCH = 131072
class Fetcher(asynchat.async_chat):
def __init__(self, reader, proxy, url, header, range, content_length):
@@ -28,39 +27,49 @@ class Fetcher(asynchat.async_chat):
self.pos = range[0]
asynchat.async_chat.__init__(self)
- self.set_terminator("\r\n\r\n")
+ self.set_terminator("\r\n")
+ self.http_status = ""
self.http_header = ""
- self.is_body = False
+ self.state = 0 #0=status, 1=header, 2=body
self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
self.connect(proxy)
#XXXshould include original request headers
def handle_connect (self):
print "handle_connect"
- self.send("GET http://%s:%s%s HTTP/1.0\r\n" % ( self.url.hostname, self.url.port or 80, self.url.path ))
- self.send("Content-Range: bytes %s-%s/%s\r\n" % (self.range[0], self.range[1], self.content_length))
- self.send("\r\n")
+ self.debug_send("GET http://%s:%s%s HTTP/1.0\r\n" % ( self.url.hostname, self.url.port or 80, self.url.path ))
+ self.debug_send("Range: bytes=%s-%s\r\n" % (self.range[0], self.range[1]))
+ self.debug_send("\r\n")
print "sent request"
+
+ def debug_send(self, data):
+ print "SEND", data
+ self.send(data)
def collect_incoming_data(self, data):
- if self.is_body:
- print "read %s - range_start=%s range_end=%s" % (self.pos, self.range[0], self.range[1])
+ if self.state==2:#body
+ print "read at %s - range_start=%s range_end=%s" % (self.pos, self.range[0], self.range[1])
self.reader.handle_incoming_data(self.pos, data)
self.pos += len(data)
if self.pos >= self.range[1]:
self.reader.finished()
print "finished"
self.close_when_done()
- else:
- print "get header data"
- print data
+ elif self.state ==1: #header
self.http_header += data
+ else: #status
+ self.http_status += data
def found_terminator(self):
- print "header ends now"
- self.set_terminator(None)
- self.reader.handle_incoming_http_header(self.http_header)
- self.is_body = True
+ if self.state == 0:
+ self.state = 1
+ self.set_terminator("\r\n\r\n")
+ print "got status line"
+ elif self.state == 1:
+ self.state = 2
+ self.set_terminator(None)
+ print "header ends now"
+ self.reader.handle_incoming_http_header(self.http_header)
class MultipleProxyReader(object):
def __init__(self, channel, url, header, content_length):
@@ -80,14 +89,8 @@ class MultipleProxyReader(object):
self.finished() #bad-named XXX
def handle_incoming_data(self, pos, data):
- print "got body data at pos %s" % pos,
- if self.write_pos == pos:
- self.write_pos += len(data)
- self.channel.push(data)
- print " ..sent"
- else:
- self.blocks[pos] = data
- print " ..stored"
+ print "got body data at pos %s" % pos
+ self.blocks[pos] = data
self.find_next_data()
@@ -96,9 +99,20 @@ class MultipleProxyReader(object):
pass
else:
self.header_sent = True
- self.channel.push(header)
- self.channel.push("X-Proxy: Magicproxy (superpower activated)\r\n")
- self.channel.push("\r\n")
+
+ self.debug_send("HTTP/1.0 200 OK\r\n")
+ headers = httplib.HTTPMessage(StringIO.StringIO(header))
+ for key in headers.dict.keys():
+ if key in ("content-range", "content-length"):
+ continue
+ self.debug_send("%s: %s\r\n" % (key, headers.dict[key]))
+ self.debug_send("Content-Length: %s" % self.content_length)
+ self.debug_send("X-Proxy: Magicproxy (superpower activated)\r\n")
+ self.debug_send("\r\n")
+
+ def debug_send(self, data):
+ print "SEND", data
+ self.channel.push(data)
def finished(self):
self.find_next_data()
@@ -117,10 +131,12 @@ class MultipleProxyReader(object):
self.channel.close_when_done()
def find_next_data(self):
- if self.write_pos in self.blocks:
+ print "expect at %s" % self.write_pos, self.blocks.keys()
+ if self.write_pos in self.blocks.keys():
data = self.blocks.pop(self.write_pos)
self.channel.push(data)
self.write_pos += len(data)
+ print "Send %s bytes" % len(data)
self.find_next_data()
class HTTPResponseProducer(object):
@@ -173,6 +189,7 @@ class HTTPProxyServer(asyncore.dispatcher):
url = urlparse.urlparse(path)
print method, path
if method != "GET" or url.query != "":
+ print "non-GET or query, bypass"
#do not handle non-GET or GET with Query (?foo=bla) requests
return self._bypass_request(channel, method, url)
@@ -183,14 +200,14 @@ class HTTPProxyServer(asyncore.dispatcher):
content_length = filter(lambda it: it[0] == "content-length", resp.getheaders())
if len( content_length ) == 0:
# no content length given, bypass this request
+ print "missing content-length, bypass"
return self._bypass_request(channel, "GET", url)
else:
content_length = int(content_length[0][1])
- if content_length < MINIMAL_SIZE:
- return self._bypass_request(channel, "GET", url)
+ #if content_length < MINIMAL_SIZE:
+ # return self._bypass_request(channel, "GET", url)
- print "start reader"
##XXX
header = (("foo", "bla"))
MultipleProxyReader(channel, url, header, content_length)