summaryrefslogtreecommitdiff
path: root/proxy.py
diff options
context:
space:
mode:
authorYves <yvesf-git@xapek.org>2010-03-13 11:26:12 +0100
committerYves <yvesf-git@xapek.org>2010-03-13 11:26:12 +0100
commit5101fd299f6676d581ca6c45b556eaa0c4c229a5 (patch)
tree73f8dc765fe1458bb96aecc23dc62055f7e4b68d /proxy.py
parentbedbc4d9b1d5b73876fd8d8dfca602a0eafbea84 (diff)
downloadmagicproxy-5101fd299f6676d581ca6c45b556eaa0c4c229a5.tar.gz
magicproxy-5101fd299f6676d581ca6c45b556eaa0c4c229a5.zip
simpler and faster
Diffstat (limited to 'proxy.py')
-rwxr-xr-xproxy.py159
1 files changed, 72 insertions, 87 deletions
diff --git a/proxy.py b/proxy.py
index 2f98eaf..f7fa2ac 100755
--- a/proxy.py
+++ b/proxy.py
@@ -13,94 +13,82 @@ ENDPOINTS = [
# ('10.1.1.156', 8888),
]
-#minimum entity size to start a paralel fetch
KB = 1024
-THRESHOLD = 512 * KB
+#minimum entity size to start a parallel fetch
+THRESHOLD = 0 #512 * KB
+#first fetch-range blocksize
INIT_BLOCKSIZE = 512 * KB
+#lower bound of fetch-range blocksize optimization
MIN_BLOCKSIZE = 512 * KB
-TIME_SLICE = 20
+#time each fetcher spends on its range, calculated using
+#speed measured while using INIT_BLOCKSIZE
+TIME_SLICE = 5
+#start a new fetcher on an endpoint X bytes before the
+#old one finishes
+FETCHER_JUMPSTART = 32 * KB
+
+#################
class Fetcher(asynchat.async_chat):
- def __init__(self, reader, endpoint, url, header, first_instance):
+ def __init__(self, reader, endpoint, url, header, range):
self.reader = reader
- self.proxy = ENDPOINTS[endpoint]
+ self.endpoint = endpoint
self.url = url
self.header = header
- self.pos = 0
+ self.range = range
+
+ self.pos = self.range[0]
+ self.proxy = ENDPOINTS[endpoint]
self.start_time = 0
self.stop_time = 0
- self.endpoint = endpoint
- self.first_instance = first_instance
- self.started_second_instance = False
- self.range = (0,0)
- asynchat.async_chat.__init__(self)
- self.set_terminator("\r\n")
self.http_status = ""
self.http_header = ""
self.state = 0 #0=status, 1=header, 2=body
+
+ asynchat.async_chat.__init__(self)
+ self.set_terminator("\r\n")
self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
self.connect(self.proxy)
-
- #XXXshould include original request headers
def handle_connect (self):
- self.set_terminator("\r\n")
- self.http_status = ""
- self.http_header = ""
- self.state = 0
- self.fetch()
- print self,"handle_connect"
+ print self, "Start"
+ self.send("GET http://%s:%s%s HTTP/1.0\r\n" % ( self.url.hostname, self.url.port or 80, self.url.path ))
+ #XXX insert the client's request headers (self.header) here, and use keep-alive if supported
+ self.send("Range: bytes=%s-%s\r\n" % (self.range[0], self.range[1]))
+ self.send("\r\n")
+ self.start_time = time.time()
def time(self):
- return self.stop_time - self.start_time
-
- def fetch(self):
- if self.first_instance:
- self.range = self.reader.next_range(self.endpoint, self.time())
- else:
- self.range = self.reader.next_range(self.endpoint, 0)
- if not self.range:
- self.close_when_done()
+ if self.stop_time == 0:
+ return time.time() - self.start_time
else:
- self.pos = self.range[0]
- self.send("GET http://%s:%s%s HTTP/1.0\r\n" % ( self.url.hostname, self.url.port or 80, self.url.path ))
- #XXX self.header, and keep alive if supported
- self.send("Range: bytes=%s-%s\r\n" % (self.range[0], self.range[1]))
- #self.send("Keep-Alive: 300\r\n")
- #self.send("Connection: keep-alive\r\n")
- self.send("\r\n")
- self.start_time = time.time()
+ return self.stop_time - self.start_time
+
+ def speed(self):
+ return (self.pos - self.range[0]) / self.time()
def collect_incoming_data(self, data):
if self.state==2: #body
- if not self.started_second_instance and self.first_instance:
- Fetcher(self.reader, self.endpoint,self.url,self.header,False)
- self.started_second_instance = True
- self.reader.handle_incoming_data(self.pos, data)
+ self.reader.handle_incoming_data(self, data)
self.pos += len(data)
if self.pos >= self.range[1]:
self.stop_time = time.time()
print self, "finished"
- if self.first_instance:
- self.close()
- self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
- self.connect(self.proxy)
- else:
- self.close_when_done()
+ #XXX just to make sure the next fetcher will be started
+ self.reader.handle_incoming_data(self, "")
+ self.close()
elif self.state ==1: #header
self.http_header += data
else: #status
self.http_status += data
def found_terminator(self):
- if self.state == 0:
+ if self.state == 0: #got status-line
self.state = 1
self.set_terminator("\r\n\r\n")
- #print "got status line"
- elif self.state == 1:
+ elif self.state == 1: #got headers
self.state = 2
self.set_terminator(None)
- print self, "header ends now"
self.reader.handle_incoming_http_header(self.http_header)
def __str__(self):
@@ -113,31 +101,40 @@ class MultipleProxyReader(object):
self.header = header
self.content_length = content_length
- print self
-
self.header_sent = False
self.fetch_pos = 0
self.write_pos = 0
self.buffer = ""
self.blocks = dict()
- self.endpoints_time = []
- self.endpoints_blocksize = []
-
- self.time_slice = int(content_length / (100.0 * KB) / len(ENDPOINTS) / 10.0) + 1
- print self, "Time slice: %s" % self.time_slice
-
- for i in range(0,len(ENDPOINTS)):
- self.endpoints_blocksize.append(INIT_BLOCKSIZE)
-
- for i in range(0,len(ENDPOINTS)):
- Fetcher(self,i,self.url,self.header,True)
+ self.fetchers = list()
- def handle_incoming_data(self, pos, data):
- self.blocks[pos] = data
+ for i in range(len(ENDPOINTS)):
+ self.fetchers.append( Fetcher(self, i, self.url, self.header, self.next_range(INIT_BLOCKSIZE)) )
- while self.find_next_data():
+ def handle_incoming_data(self, fetcher, data):
+ if len(data) == 0:
+ self.fetchers = filter(lambda f: f != fetcher, self.fetchers)
+ else:
+ self.blocks[fetcher.pos] = data
+
+ if fetcher.range[1] - fetcher.pos < FETCHER_JUMPSTART \
+ and self.fetch_pos + 1 < self.content_length and not self.channel.is_closed \
+ and len( filter( lambda f: f.endpoint == fetcher.endpoint, self.fetchers) ) < 2:
+ #Start a new fetcher on this endpoint if this fetcher is X bytes away from finishing its job
+ blocksize = min(TIME_SLICE * int(fetcher.speed()), MIN_BLOCKSIZE)
+ range = self.next_range(blocksize)
+ print "Start new Fetcher, bs=%s range=%s" % (blocksize,range)
+ self.fetchers.append( Fetcher(self, fetcher.endpoint, self.url, self.header, range) )
+
+ while self.send_next_data():
pass
+ def next_range(self, suggested_blocksize):
+ start = self.fetch_pos
+ self.fetch_pos = min(self.fetch_pos + suggested_blocksize, self.content_length)
+ r=(start, self.fetch_pos-1)
+ return r
+
def handle_incoming_http_header(self, header):
if self.header_sent:
pass
@@ -153,30 +150,14 @@ class MultipleProxyReader(object):
self.channel.push("X-Proxy: Magicproxy (superpower activated)\r\n")
self.channel.push("\r\n")
- def next_range(self,endpoint,time):
- self.find_next_data()
-
- if time != 0:
- speed = (self.endpoints_blocksize[endpoint] / time)
- self.endpoints_blocksize[endpoint] = int(speed * self.time_slice)
-
- if self.fetch_pos +1 <self.content_length and not self.channel.is_closed:
- start = self.fetch_pos
- if self.fetch_pos+self.endpoints_blocksize[endpoint] < self.content_length:
- self.fetch_pos += self.endpoints_blocksize[endpoint]
- else:
- self.fetch_pos = self.content_length
- return (start, self.fetch_pos-1)
- return None
-
- def find_next_data(self):
+ def send_next_data(self):
if self.channel.is_closed:
print self, "request side closed the connection"
self.channel.close_when_done()
#XXX terminate all running fetcher
return False
- #print self, "expect at %s" % self.write_pos, self.blocks.keys()
+ #print self, "expect data at %s in" % self.write_pos, self.blocks.keys()
if self.write_pos in self.blocks.keys():
data = self.blocks.pop(self.write_pos)
self.channel.push(data)
@@ -190,7 +171,7 @@ class MultipleProxyReader(object):
return False
def __str__(self):
- return "<MultipleProxyReader url=%s content_length=%s>" % (self.url, self.content_length)
+ return "<MultipleProxyReader url=%s content_length=%s>" % (urlparse.urlunparse(self.url), self.content_length)
class HTTPChannel(asynchat.async_chat):
def __init__(self, server, sock, addr):
@@ -254,7 +235,11 @@ class HTTPProxyServer(asyncore.dispatcher):
self._bypass_request(channel, "GET", url)
else:
content_length = int(content_length[0][1])
- ##XXX
+
+ if content_length < THRESHOLD:
+ self._bypass_request(channel, "GET", url)
+ else:
+ #XXX parse original headers to send them with all fetcher-requests
header = (("foo", "bla"))
MultipleProxyReader(channel, url, header, content_length)