diff options
author | yvesf <yvesf@aurora.xapek.org> | 2010-03-11 21:22:13 +0100 |
---|---|---|
committer | yvesf <yvesf@aurora.xapek.org> | 2010-03-11 21:22:13 +0100 |
commit | 18f6784874a89c924e6fda75cb6fafceecc69c82 (patch) | |
tree | eb6db2704d096ca8937a35823f698e324d14d532 | |
parent | 252c318dadbfcc594d90e050f70dc40b1238aa2a (diff) | |
download | magicproxy-18f6784874a89c924e6fda75cb6fafceecc69c82.tar.gz magicproxy-18f6784874a89c924e6fda75cb6fafceecc69c82.zip |
Still not parallel
-rw-r--r-- | proxy.py | 81 |
1 files changed, 49 insertions, 32 deletions
@@ -8,14 +8,13 @@ except ImportError: ENDPOINTS = [ -# {'host':'10.1.0.1', 'port':8080}, -# {'host':'10.2.2.11', 'port':8081}, + ('10.2.2.11', 8888), ('10.3.1.2', 8888), ] #minimum entity size to start a paralel fetch MINIMAL_SIZE = 524288 -BLOCKSIZE_FETCH = 524288 +BLOCKSIZE_FETCH = 131072 class Fetcher(asynchat.async_chat): def __init__(self, reader, proxy, url, header, range, content_length): @@ -28,39 +27,49 @@ class Fetcher(asynchat.async_chat): self.pos = range[0] asynchat.async_chat.__init__(self) - self.set_terminator("\r\n\r\n") + self.set_terminator("\r\n") + self.http_status = "" self.http_header = "" - self.is_body = False + self.state = 0 #0=status, 1=header, 2=body self.create_socket(socket.AF_INET, socket.SOCK_STREAM) self.connect(proxy) #XXXshould include original request headers def handle_connect (self): print "handle_connect" - self.send("GET http://%s:%s%s HTTP/1.0\r\n" % ( self.url.hostname, self.url.port or 80, self.url.path )) - self.send("Content-Range: bytes %s-%s/%s\r\n" % (self.range[0], self.range[1], self.content_length)) - self.send("\r\n") + self.debug_send("GET http://%s:%s%s HTTP/1.0\r\n" % ( self.url.hostname, self.url.port or 80, self.url.path )) + self.debug_send("Range: bytes=%s-%s\r\n" % (self.range[0], self.range[1])) + self.debug_send("\r\n") print "sent request" + + def debug_send(self, data): + print "SEND", data + self.send(data) def collect_incoming_data(self, data): - if self.is_body: - print "read %s - range_start=%s range_end=%s" % (self.pos, self.range[0], self.range[1]) + if self.state==2:#body + print "read at %s - range_start=%s range_end=%s" % (self.pos, self.range[0], self.range[1]) self.reader.handle_incoming_data(self.pos, data) self.pos += len(data) if self.pos >= self.range[1]: self.reader.finished() print "finished" self.close_when_done() - else: - print "get header data" - print data + elif self.state ==1: #header self.http_header += data + else: #status + self.http_status += data def found_terminator(self): - print "header ends now" - self.set_terminator(None) - self.reader.handle_incoming_http_header(self.http_header) - self.is_body = True + if self.state == 0: + self.state = 1 + self.set_terminator("\r\n\r\n") + print "got status line" + elif self.state == 1: + self.state = 2 + self.set_terminator(None) + print "header ends now" + self.reader.handle_incoming_http_header(self.http_header) class MultipleProxyReader(object): def __init__(self, channel, url, header, content_length): @@ -80,14 +89,8 @@ class MultipleProxyReader(object): self.finished() #bad-named XXX def handle_incoming_data(self, pos, data): - print "got body data at pos %s" % pos, - if self.write_pos == pos: - self.write_pos += len(data) - self.channel.push(data) - print " ..sent" - else: - self.blocks[pos] = data - print " ..stored" + print "got body data at pos %s" % pos + self.blocks[pos] = data self.find_next_data() @@ -96,9 +99,20 @@ class MultipleProxyReader(object): pass else: self.header_sent = True - self.channel.push(header) - self.channel.push("X-Proxy: Magicproxy (superpower activated)\r\n") - self.channel.push("\r\n") + + self.debug_send("HTTP/1.0 200 OK\r\n") + headers = httplib.HTTPMessage(StringIO.StringIO(header)) + for key in headers.dict.keys(): + if key in ("content-range", "content-length"): + continue + self.debug_send("%s: %s\r\n" % (key, headers.dict[key])) + self.debug_send("Content-Length: %s" % self.content_length) + self.debug_send("X-Proxy: Magicproxy (superpower activated)\r\n") + self.debug_send("\r\n") + + def debug_send(self, data): + print "SEND", data + self.channel.push(data) def finished(self): self.find_next_data() @@ -117,10 +131,12 @@ class MultipleProxyReader(object): self.channel.close_when_done() def find_next_data(self): - if self.write_pos in self.blocks: + print "expect at %s" % self.write_pos, self.blocks.keys() + if self.write_pos in self.blocks.keys(): data = self.blocks.pop(self.write_pos) self.channel.push(data) self.write_pos += len(data) + print "Send %s bytes" % len(data) self.find_next_data() class HTTPResponseProducer(object): @@ -173,6 +189,7 @@ class HTTPProxyServer(asyncore.dispatcher): url = urlparse.urlparse(path) print method, path if method != "GET" or url.query != "": + print "non-GET or query, bypass" #do not handle non-GET or GET with Query (?foo=bla) requests return self._bypass_request(channel, method, url) @@ -183,14 +200,14 @@ class HTTPProxyServer(asyncore.dispatcher): content_length = filter(lambda it: it[0] == "content-length", resp.getheaders()) if len( content_length ) == 0: # no content length given, bypass this request + print "missing content-length, bypass" return self._bypass_request(channel, "GET", url) else: content_length = int(content_length[0][1]) - if content_length < MINIMAL_SIZE: - return self._bypass_request(channel, "GET", url) + #if content_length < MINIMAL_SIZE: + # return self._bypass_request(channel, "GET", url) - print "start reader" ##XXX header = (("foo", "bla")) MultipleProxyReader(channel, url, header, content_length) |