summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rwxr-xr-xproxy.py48
1 files changed, 29 insertions, 19 deletions
diff --git a/proxy.py b/proxy.py
index 5ccf3e0..d8ba754 100755
--- a/proxy.py
+++ b/proxy.py
@@ -10,7 +10,7 @@ except ImportError:
ENDPOINTS = [
('10.2.2.11', 8888),
# ('10.3.1.2', 8888),
- ('10.1.1.156', 8888),
+# ('10.1.1.156', 8888),
]
#minimum entity size to start a paralel fetch
@@ -21,17 +21,16 @@ MIN_BLOCKSIZE = 512 * KB
TIME_SLICE = 5
class Fetcher(asynchat.async_chat):
- def __init__(self, reader, endpoint, url, header, range):
+ def __init__(self, reader, endpoint, url, header):
self.reader = reader
self.proxy = ENDPOINTS[endpoint]
self.url = url
self.header = header
- self.range = range
-
- self.start_time = time.time()
+ self.pos = 0
+ self.start_time = 0
+ self.stop_time = 0
self.endpoint = endpoint
-
- self.pos = range[0]
+ self.range = (0,0)
asynchat.async_chat.__init__(self)
self.set_terminator("\r\n")
self.http_status = ""
@@ -40,30 +39,41 @@ class Fetcher(asynchat.async_chat):
self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
self.connect(self.proxy)
+
#XXXshould include original request headers
def handle_connect (self):
print self,"handle_connect"
- self.send("GET http://%s:%s%s HTTP/1.0\r\n" % ( self.url.hostname, self.url.port or 80, self.url.path ))
- #XXX self.header
- self.send("Range: bytes=%s-%s\r\n" % (self.range[0], self.range[1]))
- self.send("\r\n")
-
+ self.fetch()
+
def time(self):
return self.stop_time - self.start_time
+ def fetch(self):
+ self.range = self.reader.next_range(self.endpoint, self.time())
+ if not self.range:
+ self.close_when_done()
+ else:
+ self.pos = self.range[0]
+ self.send("GET http://%s:%s%s HTTP/1.0\r\n" % ( self.url.hostname, self.url.port or 80, self.url.path ))
+ #XXX self.header
+ self.send("Range: bytes=%s-%s\r\n" % (self.range[0], self.range[1]))
+ self.send("Keep-Alive: 300\r\n")
+ self.send("Connection: keep-alive\r\n")
+ self.send("\r\n")
+ self.start_time = time.time()
+
def collect_incoming_data(self, data):
if self.state==2: #body
self.reader.handle_incoming_data(self.pos, data)
self.pos += len(data)
if self.pos >= self.range[1]:
self.stop_time = time.time()
- self.reader.start_fetcher(self.endpoint, self.time())
+ self.fetch()
print self, "finished"
- self.close_when_done()
elif self.state ==1: #header
self.http_header += data
else: #status
- self.http_status += data
+ self.http_status += data
def found_terminator(self):
if self.state == 0:
@@ -100,7 +110,7 @@ class MultipleProxyReader(object):
self.endpoints_blocksize.append(INIT_BLOCKSIZE)
for i in range(0,len(ENDPOINTS)):
- self.start_fetcher(i,0)
+ Fetcher(self, i, self.url, self.header)
def handle_incoming_data(self, pos, data):
self.blocks[pos] = data
@@ -123,7 +133,7 @@ class MultipleProxyReader(object):
self.channel.push("X-Proxy: Magicproxy (superpower activated)\r\n")
self.channel.push("\r\n")
- def start_fetcher(self,endpoint,time):
+ def next_range(self,endpoint,time):
self.find_next_data()
if time != 0:
@@ -137,8 +147,8 @@ class MultipleProxyReader(object):
self.fetch_pos += self.endpoints_blocksize[endpoint]
else:
self.fetch_pos = self.content_length
- range = (start, self.fetch_pos-1)
- Fetcher(self, endpoint, self.url, self.header, range)
+ return (start, self.fetch_pos-1)
+ return None
def find_next_data(self):
if self.channel.is_closed: