From 5101fd299f6676d581ca6c45b556eaa0c4c229a5 Mon Sep 17 00:00:00 2001 From: Yves Date: Sat, 13 Mar 2010 11:26:12 +0100 Subject: simpler and faster --- proxy.py | 159 +++++++++++++++++++++++++++++---------------------------------- 1 file changed, 72 insertions(+), 87 deletions(-) diff --git a/proxy.py b/proxy.py index 2f98eaf..f7fa2ac 100755 --- a/proxy.py +++ b/proxy.py @@ -13,94 +13,82 @@ ENDPOINTS = [ # ('10.1.1.156', 8888), ] -#minimum entity size to start a paralel fetch KB = 1024 -THRESHOLD = 512 * KB +#minimum entity size to start a paralel fetch +THRESHOLD = 0 #512 * KB +#first fetch-range blocksize INIT_BLOCKSIZE = 512 * KB +#lower bound of fetch-range blocksize optimization MIN_BLOCKSIZE = 512 * KB -TIME_SLICE = 20 +#time each fetcher spent on his range, calculated using +#speed measured while using INIT_BLOCKSIZE +TIME_SLICE = 5 +#start a new fetcher on a endpoint X-bytes before the +#old one finished +FETCHER_JUMPSTART = 32 * KB + +################# class Fetcher(asynchat.async_chat): - def __init__(self, reader, endpoint, url, header, first_instance): + def __init__(self, reader, endpoint, url, header, range): self.reader = reader - self.proxy = ENDPOINTS[endpoint] + self.endpoint = endpoint self.url = url self.header = header - self.pos = 0 + self.range = range + + self.pos = self.range[0] + self.proxy = ENDPOINTS[endpoint] self.start_time = 0 self.stop_time = 0 - self.endpoint = endpoint - self.first_instance = first_instance - self.started_second_instance = False - self.range = (0,0) - asynchat.async_chat.__init__(self) - self.set_terminator("\r\n") self.http_status = "" self.http_header = "" self.state = 0 #0=status, 1=header, 2=body + + asynchat.async_chat.__init__(self) + self.set_terminator("\r\n") self.create_socket(socket.AF_INET, socket.SOCK_STREAM) self.connect(self.proxy) - - #XXXshould include original request headers def handle_connect (self): - self.set_terminator("\r\n") - self.http_status = "" - self.http_header = "" - self.state = 0 - self.fetch() - print self,"handle_connect" + print self, "Start" + self.send("GET http://%s:%s%s HTTP/1.0\r\n" % ( self.url.hostname, self.url.port or 80, self.url.path )) + #XXX self.header insert request header from client, and keep alive if supported + self.send("Range: bytes=%s-%s\r\n" % (self.range[0], self.range[1])) + self.send("\r\n") + self.start_time = time.time() def time(self): - return self.stop_time - self.start_time - - def fetch(self): - if self.first_instance: - self.range = self.reader.next_range(self.endpoint, self.time()) - else: - self.range = self.reader.next_range(self.endpoint, 0) - if not self.range: - self.close_when_done() + if self.stop_time == 0: + return time.time() - self.start_time else: - self.pos = self.range[0] - self.send("GET http://%s:%s%s HTTP/1.0\r\n" % ( self.url.hostname, self.url.port or 80, self.url.path )) - #XXX self.header, and keep alive if supported - self.send("Range: bytes=%s-%s\r\n" % (self.range[0], self.range[1])) - #self.send("Keep-Alive: 300\r\n") - #self.send("Connection: keep-alive\r\n") - self.send("\r\n") - self.start_time = time.time() + return self.stop_time - self.start_time + + def speed(self): + return (self.pos - self.range[0]) / self.time() def collect_incoming_data(self, data): if self.state==2: #body - if not self.started_second_instance and self.first_instance: - Fetcher(self.reader, self.endpoint,self.url,self.header,False) - self.started_second_instance = True - self.reader.handle_incoming_data(self.pos, data) + self.reader.handle_incoming_data(self, data) self.pos += len(data) if self.pos >= self.range[1]: self.stop_time = time.time() print self, "finished" - if self.first_instance: - self.close() - self.create_socket(socket.AF_INET, socket.SOCK_STREAM) - self.connect(self.proxy) - else: - self.close_when_done() + #XXX just to make sure the next fetcher will be started + self.reader.handle_incoming_data(self, "") + self.close() elif self.state ==1: #header self.http_header += data else: #status self.http_status += data def found_terminator(self): - if self.state == 0: + if self.state == 0: #got status-line self.state = 1 self.set_terminator("\r\n\r\n") - #print "got status line" - elif self.state == 1: + elif self.state == 1: #got headers self.state = 2 self.set_terminator(None) - print self, "header ends now" self.reader.handle_incoming_http_header(self.http_header) def __str__(self): @@ -113,31 +101,40 @@ class MultipleProxyReader(object): self.header = header self.content_length = content_length - print self - self.header_sent = False self.fetch_pos = 0 self.write_pos = 0 self.buffer = "" self.blocks = dict() - self.endpoints_time = [] - self.endpoints_blocksize = [] - - self.time_slice = int(content_length / (100.0 * KB) / len(ENDPOINTS) / 10.0) + 1 - print self, "Time slice: %s" % self.time_slice - - for i in range(0,len(ENDPOINTS)): - self.endpoints_blocksize.append(INIT_BLOCKSIZE) - - for i in range(0,len(ENDPOINTS)): - Fetcher(self,i,self.url,self.header,True) + self.fetchers = list() - def handle_incoming_data(self, pos, data): - self.blocks[pos] = data + for i in range(len(ENDPOINTS)): + self.fetchers.append( Fetcher(self, i, self.url, self.header, self.next_range(INIT_BLOCKSIZE)) ) - while self.find_next_data(): + def handle_incoming_data(self, fetcher, data): + if len(data) == 0: + self.fetchers = filter(lambda f: f != fetcher, self.fetchers) + else: + self.blocks[fetcher.pos] = data + + if fetcher.range[1] - fetcher.pos < FETCHER_JUMPSTART \ + and self.fetch_pos + 1 < self.content_length and not self.channel.is_closed \ + and len( filter( lambda f: f.endpoint == fetcher.endpoint, self.fetchers) ) < 2: + #Start a new fetcher on this line if this fetchers is X-Bytes before finishing his jobs + blocksize = min(TIME_SLICE * int(fetcher.speed()), MIN_BLOCKSIZE) + range = self.next_range(blocksize) + print "Start new Fetcher, bs=%s range=%s" % (blocksize,range) + self.fetchers.append( Fetcher(self, fetcher.endpoint, self.url, self.header, range) ) + + while self.send_next_data(): pass + def next_range(self, suggested_blocksize): + start = self.fetch_pos + self.fetch_pos = min(self.fetch_pos + suggested_blocksize, self.content_length) + r=(start, self.fetch_pos-1) + return r + def handle_incoming_http_header(self, header): if self.header_sent: pass @@ -153,30 +150,14 @@ class MultipleProxyReader(object): self.channel.push("X-Proxy: Magicproxy (superpower activated)\r\n") self.channel.push("\r\n") - def next_range(self,endpoint,time): - self.find_next_data() - - if time != 0: - speed = (self.endpoints_blocksize[endpoint] / time) - self.endpoints_blocksize[endpoint] = int(speed * self.time_slice) - - if self.fetch_pos +1 " % (self.url, self.content_length) + return "" % (urlparse.urlunparse(self.url), self.content_length) class HTTPChannel(asynchat.async_chat): def __init__(self, server, sock, addr): @@ -254,7 +235,11 @@ class HTTPProxyServer(asyncore.dispatcher): self._bypass_request(channel, "GET", url) else: content_length = int(content_length[0][1]) - ##XXX + + if content_length < THRESHOLD: + self._bypass_request(channel, "GET", url) + else: + #XXX parse original headers to send them with all fetcher-requests header = (("foo", "bla")) MultipleProxyReader(channel, url, header, content_length) -- cgit v1.2.1