diff options
-rw-r--r-- | proxy.py | 36 |
1 files changed, 24 insertions, 12 deletions
@@ -1,5 +1,5 @@ #!/usr/bin/python -t -import pwd, os, sys, logging, logging.handlers, string +import pwd, os, sys, logging, logging.handlers, string, time import asynchat, asyncore, socket, httplib, urlparse try: import cStringIO as StringIO @@ -17,15 +17,18 @@ ENDPOINTS = [ KB = 1024 MINIMAL_SIZE = 512 * KB BLOCKSIZE_FETCH = 2048 * KB +BLOCKSIZE_STEP = 512 * KB + class Fetcher(asynchat.async_chat): - def __init__(self, reader, proxy, url, header, range): + def __init__(self, reader, endpoint, url, header, range): self.reader = reader - self.proxy = proxy + self.proxy = ENDPOINTS(endpoint) self.url = url self.header = self.header self.range = range self.start_time = time.time() + self.endpoint = endpoint self.pos = range[0] asynchat.async_chat.__init__(self) @@ -51,9 +54,9 @@ class Fetcher(asynchat.async_chat): if self.state==2: #body self.reader.handle_incoming_data(self.pos, data) self.pos += len(data) - if self.pos >= self.range[1]: - self.reader.finished() + if self.pos >= self.range[1]: self.stop_time = time.time() + self.start_fetcher(endpoint, self.time()) print self, "finished" self.close_when_done() elif self.state ==1: #header @@ -88,10 +91,13 @@ class MultipleProxyReader(object): self.write_pos = 0 self.buffer = "" self.blocks = dict() - self.next_endpoint = 0 + + for i in range(0,len(ENDPOINTS)): + self.endpoints_time.append(1) + self.endpoints_blocksize.append(BLOCKSIZE_FETCH) for i in range(0,len(ENDPOINTS)): - self.finished() #bad-named XXX + self.start_fetcher(i,1) def handle_incoming_data(self, pos, data): self.blocks[pos] = data @@ -114,18 +120,24 @@ class MultipleProxyReader(object): self.channel.push("X-Proxy: Magicproxy (superpower activated)\r\n") self.channel.push("\r\n") - def finished(self): + def start_fetcher(self,endpoint,time): self.find_next_data() + + self.endpoints_time[endpoint] = time + if max(self.endpoints_times[endpoint]) == time: + self.endpoint_blocksize[endpoint] += BLOCKSIZE_STEP + elif min(self.endpoints_times[endpoint]) == time: + self.endpoint_blocksize[endpoint] -= BLOCKSIZE_STEP + if self.fetch_pos +1 <self.content_length: print self, "start new fetcher" start = self.fetch_pos - if self.fetch_pos+BLOCKSIZE_FETCH < self.content_length: - self.fetch_pos += BLOCKSIZE_FETCH + if self.fetch_pos+self.endpoint_blocksize[endpoint] < self.content_length: + self.fetch_pos += self.endpoint_blocksize[endpoint] else: self.fetch_pos = self.content_length range = (start, self.fetch_pos-1) - Fetcher(self, ENDPOINTS[self.next_endpoint], self.url, self.header, range, self.content_length) - self.next_endpoint = (self.next_endpoint+1) % len(ENDPOINTS) + Fetcher(self, ENDPOINTS[endpoint], self.url, self.header, range, self.content_length) def find_next_data(self): if not self.channel.writeable(): # request-side closed the connection |