diff options
author | Thomas Keck <thomas@macbook.macbook> | 2010-03-13 16:19:46 +0100 |
---|---|---|
committer | Thomas Keck <thomas@macbook.macbook> | 2010-03-13 16:19:46 +0100 |
commit | ce35065f0b12f509dea6940bbf3794fb80ffa403 (patch) | |
tree | 582975d8a171f74f79ba9a5700d78d3a9a47b8c3 | |
parent | 94e33924fad7b7a38d6d1e848d01b5e830a45360 (diff) | |
parent | 69247decb19a5c3b366e3489ffdf8426367a9b40 (diff) | |
download | magicproxy-ce35065f0b12f509dea6940bbf3794fb80ffa403.tar.gz magicproxy-ce35065f0b12f509dea6940bbf3794fb80ffa403.zip |
Merge branch 'master' of git://github.com/yvesf/magicproxy
-rwxr-xr-x | proxy.py | 63 |
1 files changed, 35 insertions, 28 deletions
@@ -42,7 +42,7 @@ class Fetcher(asynchat.async_chat): self.start_time = 0 self.stop_time = 0 self.http_status = "" - self.http_header = "" + self.http_header = StringIO.StringIO() self.state = 0 #0=status, 1=header, 2=body asynchat.async_chat.__init__(self) @@ -70,8 +70,9 @@ class Fetcher(asynchat.async_chat): def collect_incoming_data(self, data): if self.state==2: #body - self.reader.handle_incoming_data(self, data) - self.pos += len(data) + length = len(data) + self.reader.handle_incoming_data(self, data, length) + self.pos += length if self.pos >= self.range[1]: self.stop_time = time.time() print self, "finished" @@ -79,7 +80,7 @@ class Fetcher(asynchat.async_chat): self.reader.handle_incoming_data(self) self.close() elif self.state ==1: #header - self.http_header += data + self.http_header.write( data ) else: #status self.http_status += data @@ -95,6 +96,15 @@ class Fetcher(asynchat.async_chat): def __str__(self): return "<Fetcher proxy=%s url=%s range=%s" % (self.proxy, urlparse.urlunparse(self.url), self.range) +class StringIOProducer(object): + def __init__(self,buf, amt=1440): + self.buf=buf + self.amt = amt + self.buf.seek(0) + + def more(self): + return self.buf.read(self.amt) + class MultipleProxyReader(object): def __init__(self, channel, url, header, content_length): self.channel = channel @@ -112,11 +122,15 @@ class MultipleProxyReader(object): for proxy in ENDPOINTS: self.fetchers.append( Fetcher(self, proxy, self.url, self.header, self.next_range(INIT_BLOCKSIZE)) ) - def handle_incoming_data(self, fetcher, data=None): + def handle_incoming_data(self, fetcher, data=None, length=0): if not data: self.fetchers = filter(lambda f: f != fetcher, self.fetchers) else: - heappush(self.blocks, (fetcher.pos, data)) + heappush(self.blocks, (fetcher.pos, data, length)) + + if self.channel.is_closed: + print self, "request side closed the connection" + return if fetcher.range[1] - fetcher.pos < FETCHER_JUMPSTART \ and self.fetch_pos + 1 < self.content_length and not self.channel.is_closed \ @@ -127,8 +141,19 @@ class MultipleProxyReader(object): print "Start new Fetcher, bs=%s range=%s" % (blocksize,fetch_range) self.fetchers.append( Fetcher(self, fetcher.proxy, self.url, self.header, fetch_range) ) - while self.send_next_data(): - pass + buf = StringIO.StringIO() + while len(self.blocks)>0 and min(self.blocks)[0] == self.write_pos: + item = heappop(self.blocks) + buf.write(item[1]) + self.write_pos += item[2] + + if buf.tell() > 0: + self.channel.push_with_producer(StringIOProducer(buf)) + + if self.write_pos + 1 >= self.content_length: + print self, "job done %s blocks left" % len(self.blocks) + self.channel.is_closed = True + self.channel.close_when_done() def next_range(self, suggested_blocksize): start = self.fetch_pos @@ -143,33 +168,15 @@ class MultipleProxyReader(object): self.channel.push("HTTP/1.0 200 OK\r\n") # Sends header from first response - headers = httplib.HTTPMessage(StringIO.StringIO(header)) + header.seek(0) + headers = httplib.HTTPMessage(header) for key in filter(lambda k: k not in ("content-range", "content-length"), headers.dict.keys()): self.channel.push("%s: %s\r\n" % (key, headers.dict[key])) self.channel.push("Content-Length: %s" % self.content_length) self.channel.push("X-Proxy: Magicproxy (superpower activated)\r\n") self.channel.push("\r\n") - def send_next_data(self): - if self.channel.is_closed: - print self, "request side closed the connection" - self.channel.close_when_done() - #XXX terminate all running fetcher - return False - - #print self, "expect data at %s in" % self.write_pos, self.blocks.keys() - if len(self.blocks)>0 and min(self.blocks)[0] == self.write_pos: - item = heappop(self.blocks) - self.channel.push(item[1]) - self.write_pos += len(item[1]) - return True - - if self.write_pos + 1 >= self.content_length: - print self, "job done %s blocks left" % len(self.blocks) - #XXX prevent next calls to send_next_data - self.channel.close_when_done() - return False def __str__(self): return "<MultipleProxyReader url=%s content_length=%s>" % (urlparse.urlunparse(self.url), self.content_length) |