summaryrefslogtreecommitdiff
path: root/proxy.py
diff options
context:
space:
mode:
authorThomas Keck <thomas@macbook.macbook>2010-03-13 16:19:46 +0100
committerThomas Keck <thomas@macbook.macbook>2010-03-13 16:19:46 +0100
commitce35065f0b12f509dea6940bbf3794fb80ffa403 (patch)
tree582975d8a171f74f79ba9a5700d78d3a9a47b8c3 /proxy.py
parent94e33924fad7b7a38d6d1e848d01b5e830a45360 (diff)
parent69247decb19a5c3b366e3489ffdf8426367a9b40 (diff)
downloadmagicproxy-ce35065f0b12f509dea6940bbf3794fb80ffa403.tar.gz
magicproxy-ce35065f0b12f509dea6940bbf3794fb80ffa403.zip
Merge branch 'master' of git://github.com/yvesf/magicproxy
Diffstat (limited to 'proxy.py')
-rwxr-xr-xproxy.py63
1 files changed, 35 insertions, 28 deletions
diff --git a/proxy.py b/proxy.py
index 0b63c52..99793a8 100755
--- a/proxy.py
+++ b/proxy.py
@@ -42,7 +42,7 @@ class Fetcher(asynchat.async_chat):
self.start_time = 0
self.stop_time = 0
self.http_status = ""
- self.http_header = ""
+ self.http_header = StringIO.StringIO()
self.state = 0 #0=status, 1=header, 2=body
asynchat.async_chat.__init__(self)
@@ -70,8 +70,9 @@ class Fetcher(asynchat.async_chat):
def collect_incoming_data(self, data):
if self.state==2: #body
- self.reader.handle_incoming_data(self, data)
- self.pos += len(data)
+ length = len(data)
+ self.reader.handle_incoming_data(self, data, length)
+ self.pos += length
if self.pos >= self.range[1]:
self.stop_time = time.time()
print self, "finished"
@@ -79,7 +80,7 @@ class Fetcher(asynchat.async_chat):
self.reader.handle_incoming_data(self)
self.close()
elif self.state ==1: #header
- self.http_header += data
+ self.http_header.write( data )
else: #status
self.http_status += data
@@ -95,6 +96,15 @@ class Fetcher(asynchat.async_chat):
def __str__(self):
return "<Fetcher proxy=%s url=%s range=%s" % (self.proxy, urlparse.urlunparse(self.url), self.range)
+class StringIOProducer(object):
+ def __init__(self,buf, amt=1440):
+ self.buf=buf
+ self.amt = amt
+ self.buf.seek(0)
+
+ def more(self):
+ return self.buf.read(self.amt)
+
class MultipleProxyReader(object):
def __init__(self, channel, url, header, content_length):
self.channel = channel
@@ -112,11 +122,15 @@ class MultipleProxyReader(object):
for proxy in ENDPOINTS:
self.fetchers.append( Fetcher(self, proxy, self.url, self.header, self.next_range(INIT_BLOCKSIZE)) )
- def handle_incoming_data(self, fetcher, data=None):
+ def handle_incoming_data(self, fetcher, data=None, length=0):
if not data:
self.fetchers = filter(lambda f: f != fetcher, self.fetchers)
else:
- heappush(self.blocks, (fetcher.pos, data))
+ heappush(self.blocks, (fetcher.pos, data, length))
+
+ if self.channel.is_closed:
+ print self, "request side closed the connection"
+ return
if fetcher.range[1] - fetcher.pos < FETCHER_JUMPSTART \
and self.fetch_pos + 1 < self.content_length and not self.channel.is_closed \
@@ -127,8 +141,19 @@ class MultipleProxyReader(object):
print "Start new Fetcher, bs=%s range=%s" % (blocksize,fetch_range)
self.fetchers.append( Fetcher(self, fetcher.proxy, self.url, self.header, fetch_range) )
- while self.send_next_data():
- pass
+ buf = StringIO.StringIO()
+ while len(self.blocks)>0 and min(self.blocks)[0] == self.write_pos:
+ item = heappop(self.blocks)
+ buf.write(item[1])
+ self.write_pos += item[2]
+
+ if buf.tell() > 0:
+ self.channel.push_with_producer(StringIOProducer(buf))
+
+ if self.write_pos + 1 >= self.content_length:
+ print self, "job done %s blocks left" % len(self.blocks)
+ self.channel.is_closed = True
+ self.channel.close_when_done()
def next_range(self, suggested_blocksize):
start = self.fetch_pos
@@ -143,33 +168,15 @@ class MultipleProxyReader(object):
self.channel.push("HTTP/1.0 200 OK\r\n")
# Sends header from first response
- headers = httplib.HTTPMessage(StringIO.StringIO(header))
+ header.seek(0)
+ headers = httplib.HTTPMessage(header)
for key in filter(lambda k: k not in ("content-range", "content-length"), headers.dict.keys()):
self.channel.push("%s: %s\r\n" % (key, headers.dict[key]))
self.channel.push("Content-Length: %s" % self.content_length)
self.channel.push("X-Proxy: Magicproxy (superpower activated)\r\n")
self.channel.push("\r\n")
- def send_next_data(self):
- if self.channel.is_closed:
- print self, "request side closed the connection"
- self.channel.close_when_done()
- #XXX terminate all running fetcher
- return False
-
- #print self, "expect data at %s in" % self.write_pos, self.blocks.keys()
- if len(self.blocks)>0 and min(self.blocks)[0] == self.write_pos:
- item = heappop(self.blocks)
- self.channel.push(item[1])
- self.write_pos += len(item[1])
- return True
-
- if self.write_pos + 1 >= self.content_length:
- print self, "job done %s blocks left" % len(self.blocks)
- #XXX prevent next calls to send_next_data
- self.channel.close_when_done()
- return False
def __str__(self):
return "<MultipleProxyReader url=%s content_length=%s>" % (urlparse.urlunparse(self.url), self.content_length)