summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--proxy.py37
1 files changed, 25 insertions, 12 deletions
diff --git a/proxy.py b/proxy.py
index 7c873b9..93c1994 100644
--- a/proxy.py
+++ b/proxy.py
@@ -17,15 +17,19 @@ ENDPOINTS = [
KB = 1024
MINIMAL_SIZE = 512 * KB
BLOCKSIZE_FETCH = 2048 * KB
+BLOCKSIZE_STEP = 512 * KB
+
class Fetcher(asynchat.async_chat):
- def __init__(self, reader, proxy, url, header, range):
+ def __init__(self, reader, endpoint, url, header, range):
self.reader = reader
- self.proxy = proxy
+ self.proxy = ENDPOINTS(endpoint)
self.url = url
self.header = header
self.range = range
self.start_time = time.time()
+ self.endpoint = endpoint
+
self.pos = range[0]
asynchat.async_chat.__init__(self)
self.set_terminator("\r\n")
@@ -33,7 +37,7 @@ class Fetcher(asynchat.async_chat):
self.http_header = ""
self.state = 0 #0=status, 1=header, 2=body
self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
- self.connect(proxy)
+ self.connect(self.proxy)
#XXXshould include original request headers
def handle_connect (self):
@@ -50,9 +54,9 @@ class Fetcher(asynchat.async_chat):
if self.state==2: #body
self.reader.handle_incoming_data(self.pos, data)
self.pos += len(data)
- if self.pos >= self.range[1]:
- self.reader.finished()
+ if self.pos >= self.range[1]:
self.stop_time = time.time()
+ self.start_fetcher(endpoint, self.time())
print self, "finished"
self.close_when_done()
elif self.state ==1: #header
@@ -88,10 +92,13 @@ class MultipleProxyReader(object):
self.write_pos = 0
self.buffer = ""
self.blocks = dict()
- self.next_endpoint = 0
+
+ for i in range(0,len(ENDPOINTS)):
+ self.endpoints_time.append(1)
+ self.endpoints_blocksize.append(BLOCKSIZE_FETCH)
for i in range(0,len(ENDPOINTS)):
- self.finished() #bad-named XXX
+ self.start_fetcher(i,1)
def handle_incoming_data(self, pos, data):
self.blocks[pos] = data
@@ -114,18 +121,24 @@ class MultipleProxyReader(object):
self.channel.push("X-Proxy: Magicproxy (superpower activated)\r\n")
self.channel.push("\r\n")
- def finished(self):
+ def start_fetcher(self,endpoint,time):
self.find_next_data()
+
+ self.endpoints_time[endpoint] = time
+ if max(self.endpoints_times[endpoint]) == time:
+ self.endpoint_blocksize[endpoint] += BLOCKSIZE_STEP
+ elif min(self.endpoints_times[endpoint]) == time:
+ self.endpoint_blocksize[endpoint] -= BLOCKSIZE_STEP
+
if self.fetch_pos +1 <self.content_length:
print self, "start new fetcher"
start = self.fetch_pos
- if self.fetch_pos+BLOCKSIZE_FETCH < self.content_length:
- self.fetch_pos += BLOCKSIZE_FETCH
+ if self.fetch_pos+self.endpoint_blocksize[endpoint] < self.content_length:
+ self.fetch_pos += self.endpoint_blocksize[endpoint]
else:
self.fetch_pos = self.content_length
range = (start, self.fetch_pos-1)
- Fetcher(self, ENDPOINTS[self.next_endpoint], self.url, self.header, range)
- self.next_endpoint = (self.next_endpoint+1) % len(ENDPOINTS)
+ Fetcher(self, ENDPOINTS[endpoint], self.url, self.header, range)
def find_next_data(self):
if self.channel.is_closed: