From bedbc4d9b1d5b73876fd8d8dfca602a0eafbea84 Mon Sep 17 00:00:00 2001 From: Thomas Keck Date: Sat, 13 Mar 2010 01:08:44 +0100 Subject: Big blocks now and second instance --- proxy.py | 39 +++++++++++++++++++++++++++++---------- 1 file changed, 29 insertions(+), 10 deletions(-) diff --git a/proxy.py b/proxy.py index 31a2936..2f98eaf 100755 --- a/proxy.py +++ b/proxy.py @@ -1,5 +1,5 @@ #!/usr/bin/python -t -import pwd, os, sys, logging, logging.handlers, string, time +import os, sys, string, time import asynchat, asyncore, socket, httplib, urlparse try: import cStringIO as StringIO @@ -16,12 +16,12 @@ ENDPOINTS = [ #minimum entity size to start a paralel fetch KB = 1024 THRESHOLD = 512 * KB -INIT_BLOCKSIZE = 2048 * KB +INIT_BLOCKSIZE = 512 * KB MIN_BLOCKSIZE = 512 * KB -TIME_SLICE = 5 +TIME_SLICE = 20 class Fetcher(asynchat.async_chat): - def __init__(self, reader, endpoint, url, header): + def __init__(self, reader, endpoint, url, header, first_instance): self.reader = reader self.proxy = ENDPOINTS[endpoint] self.url = url @@ -30,6 +30,8 @@ class Fetcher(asynchat.async_chat): self.start_time = 0 self.stop_time = 0 self.endpoint = endpoint + self.first_instance = first_instance + self.started_second_instance = False self.range = (0,0) asynchat.async_chat.__init__(self) self.set_terminator("\r\n") @@ -42,14 +44,21 @@ class Fetcher(asynchat.async_chat): #XXXshould include original request headers def handle_connect (self): - print self,"handle_connect" + self.set_terminator("\r\n") + self.http_status = "" + self.http_header = "" + self.state = 0 self.fetch() + print self,"handle_connect" def time(self): return self.stop_time - self.start_time def fetch(self): - self.range = self.reader.next_range(self.endpoint, self.time()) + if self.first_instance: + self.range = self.reader.next_range(self.endpoint, self.time()) + else: + self.range = self.reader.next_range(self.endpoint, 0) if not self.range: self.close_when_done() else: @@ -64,12 +73,20 @@ class Fetcher(asynchat.async_chat): def collect_incoming_data(self, data): if self.state==2: #body + if not self.started_second_instance and self.first_instance: + Fetcher(self.reader, self.endpoint,self.url,self.header,False) + self.started_second_instance = True self.reader.handle_incoming_data(self.pos, data) self.pos += len(data) if self.pos >= self.range[1]: self.stop_time = time.time() - self.fetch() print self, "finished" + if self.first_instance: + self.close() + self.create_socket(socket.AF_INET, socket.SOCK_STREAM) + self.connect(self.proxy) + else: + self.close_when_done() elif self.state ==1: #header self.http_header += data else: #status @@ -106,11 +123,14 @@ class MultipleProxyReader(object): self.endpoints_time = [] self.endpoints_blocksize = [] + self.time_slice = int(content_length / (100.0 * KB) / len(ENDPOINTS) / 10.0) + 1 + print self, "Time slice: %s" % self.time_slice + for i in range(0,len(ENDPOINTS)): self.endpoints_blocksize.append(INIT_BLOCKSIZE) for i in range(0,len(ENDPOINTS)): - Fetcher(self, i, self.url, self.header) + Fetcher(self,i,self.url,self.header,True) def handle_incoming_data(self, pos, data): self.blocks[pos] = data @@ -138,10 +158,9 @@ class MultipleProxyReader(object): if time != 0: speed = (self.endpoints_blocksize[endpoint] / time) - self.endpoints_blocksize[endpoint] = int(speed * TIME_SLICE) + self.endpoints_blocksize[endpoint] = int(speed * self.time_slice) if self.fetch_pos +1