summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorYves <yvesf-git@xapek.org>2010-03-28 03:35:17 +0000
committerYves <yvesf-git@xapek.org>2010-03-28 03:35:17 +0000
commitb0d0da7cfc7116a0234b2021d03085a9ce7f5595 (patch)
tree644f3c00580502c28fcc1d4c70ed486a09928687
parente40b169239b9a257be66480c164554652d2123bb (diff)
downloadmagicproxy-b0d0da7cfc7116a0234b2021d03085a9ce7f5595.tar.gz
magicproxy-b0d0da7cfc7116a0234b2021d03085a9ce7f5595.zip
fix job partitioning logic, but no error handling so far
-rwxr-xr-xproxy.py52
1 files changed, 29 insertions, 23 deletions
diff --git a/proxy.py b/proxy.py
index ac01249..c11236d 100755
--- a/proxy.py
+++ b/proxy.py
@@ -6,8 +6,8 @@ import cStringIO as StringIO
ENDPOINTS = [
-# ('10.2.2.11', 8888),
- ('10.3.1.2', 8888),
+ ('10.2.2.11', 8888),
+# ('10.3.1.2', 8888),
# ('10.1.1.156', 8888),
]
@@ -73,11 +73,11 @@ class Fetcher(asynchat.async_chat):
def collect_incoming_data(self, data):
if self.state==2: #body
length = len(data)
- if self.range != (-1,-1) and self.pos + length > self.range[1]:
+ if self.range != (-1,-1) and self.pos + length >= self.range[1] + 1:
#if this request is the first one (whithout Range: ...) then the server
# dont send us our expected range, we must cut it at some point (here)
- bytes_remaining = self.range[1] - self.pos + 1 #XXX
- data=data[:bytes_remaining]
+ bytes_remaining = self.range[1] - ( self.pos )
+ data=data[:bytes_remaining+1]
print self,"cut: pos=%s length=%s => %s" % (self.pos, length, len(data))
length = len(data)
if length == 0:
@@ -91,6 +91,7 @@ class Fetcher(asynchat.async_chat):
if self.range != (-1,-1) and self.pos >= self.range[1]:
self.stop_time = time.time()
print self, "finished with %s kB/s" % (self.speed() / 1024)
+ self.reader.handle_incoming_data(self)
self.close()
elif self.state ==1: #header
self.http_header.write( data )
@@ -130,6 +131,7 @@ class MagicHTTPProxyClient(object):
if not data:
#fetcher is done, remove from list
self.fetchers = filter(lambda f: f != fetcher, self.fetchers)
+ print "Remove: %s" % fetcher
else:
assert fetcher.pos < fetcher.range[1] or fetcher.range == (-1,-1)
heappush(self.blocks, (fetcher.pos, data, length))
@@ -140,15 +142,15 @@ class MagicHTTPProxyClient(object):
if fetcher.range != (-1,-1) \
and fetcher.range[1] - (fetcher.pos+length) < FETCHER_JUMPSTART \
- and self.fetch_pos + 1 < self.content_length and self.channel.connected \
+ and self.fetch_pos + 1 < self.content_length \
and len( filter(lambda f: f.proxy == fetcher.proxy, self.fetchers) ) < 2:
#Start a new fetcher if this fetcher is X-Bytes before finished his job
blocksize = max(int(TIME_SLICE * fetcher.speed()), MIN_BLOCKSIZE)
fetch_range = self.next_range(blocksize)
self.fetchers.append( Fetcher(self, fetcher.proxy, self.url, self.header, fetch_range) )
- if len(self.blocks)>0:
- print self,"fetch_pos=%s write_pos=%s get=%s with length=%s pending=%s" % (self.fetch_pos, self.write_pos, min(self.blocks)[0],min(self.blocks)[2], len(self.blocks))
+ #if len(self.blocks)>0:
+ #print self,"fetch_pos=%s write_pos=%s get=%s with length=%s pending=%s" % (self.fetch_pos, self.write_pos, min(self.blocks)[0],min(self.blocks)[2], len(self.blocks))
buf = ""
while len(self.blocks)>0 and min(self.blocks)[0] == self.write_pos:
@@ -240,6 +242,7 @@ class HTTPProxyClient(asynchat.async_chat):
class HTTPChannel(asynchat.async_chat):
def __init__(self, server, sock, addr):
+ print "Channel opened"
self.server = server
self.data = StringIO.StringIO()
@@ -266,8 +269,23 @@ class HTTPChannel(asynchat.async_chat):
else:
self.set_terminator(None)
headers = httplib.HTTPMessage(self.data).dict
- self.server.handle_request(self, self.request[0], self.request[1], headers)
-
+ self.handle_request(self, self.request[0], self.request[1], headers)
+
+ def handle_request(self, channel, method, path, headers):
+ url = urlparse.urlparse(path)
+ if method != "GET":
+ #do not handle non-GET or GET with Query (?foo=bla) requests
+ return self._bypass_request(channel, method, url, headers)
+ else:
+ MagicHTTPProxyClient(channel, url, headers)
+
+ def _bypass_request(self, channel, method, url, headers):
+ proxy = ENDPOINTS[ int( md5.md5(url.hostname).hexdigest(),16 ) % len(ENDPOINTS) ]
+ print self, "_bypass request via %s: %s %s" % (proxy, method, urlparse.urlunparse(url))
+ HTTPProxyClient(proxy, channel, method, url, headers)
+ #FIXME use this other thing
+
+
class HTTPProxyServer(asyncore.dispatcher):
def __init__(self):
@@ -286,19 +304,7 @@ class HTTPProxyServer(asyncore.dispatcher):
conn, addr = self.accept()
HTTPChannel(self, conn, addr)
- def handle_request(self, channel, method, path, headers):
- url = urlparse.urlparse(path)
- if method != "GET":
- #do not handle non-GET or GET with Query (?foo=bla) requests
- return self._bypass_request(channel, method, url, headers)
- else:
- MagicHTTPProxyClient(channel, url, headers)
-
- def _bypass_request(self, channel, method, url, headers):
- proxy = ENDPOINTS[ int( md5.md5(url.hostname).hexdigest(),16 ) % len(ENDPOINTS) ]
- print self, "_bypass request via %s: %s %s" % (proxy, method, urlparse.urlunparse(url))
- HTTPProxyClient(proxy, channel, method, url, headers)
-
if __name__ == "__main__":
proxy = HTTPProxyServer()
+ print proxy
asyncore.loop()