From b0d0da7cfc7116a0234b2021d03085a9ce7f5595 Mon Sep 17 00:00:00 2001 From: Yves Date: Sun, 28 Mar 2010 03:35:17 +0000 Subject: fix job partitioning logic, but no error handling so far --- proxy.py | 52 +++++++++++++++++++++++++++++----------------------- 1 file changed, 29 insertions(+), 23 deletions(-) diff --git a/proxy.py b/proxy.py index ac01249..c11236d 100755 --- a/proxy.py +++ b/proxy.py @@ -6,8 +6,8 @@ import cStringIO as StringIO ENDPOINTS = [ -# ('10.2.2.11', 8888), - ('10.3.1.2', 8888), + ('10.2.2.11', 8888), +# ('10.3.1.2', 8888), # ('10.1.1.156', 8888), ] @@ -73,11 +73,11 @@ class Fetcher(asynchat.async_chat): def collect_incoming_data(self, data): if self.state==2: #body length = len(data) - if self.range != (-1,-1) and self.pos + length > self.range[1]: + if self.range != (-1,-1) and self.pos + length >= self.range[1] + 1: #if this request is the first one (whithout Range: ...) then the server # dont send us our expected range, we must cut it at some point (here) - bytes_remaining = self.range[1] - self.pos + 1 #XXX - data=data[:bytes_remaining] + bytes_remaining = self.range[1] - ( self.pos ) + data=data[:bytes_remaining+1] print self,"cut: pos=%s length=%s => %s" % (self.pos, length, len(data)) length = len(data) if length == 0: @@ -91,6 +91,7 @@ class Fetcher(asynchat.async_chat): if self.range != (-1,-1) and self.pos >= self.range[1]: self.stop_time = time.time() print self, "finished with %s kB/s" % (self.speed() / 1024) + self.reader.handle_incoming_data(self) self.close() elif self.state ==1: #header self.http_header.write( data ) @@ -130,6 +131,7 @@ class MagicHTTPProxyClient(object): if not data: #fetcher is done, remove from list self.fetchers = filter(lambda f: f != fetcher, self.fetchers) + print "Remove: %s" % fetcher else: assert fetcher.pos < fetcher.range[1] or fetcher.range == (-1,-1) heappush(self.blocks, (fetcher.pos, data, length)) @@ -140,15 +142,15 @@ class MagicHTTPProxyClient(object): if fetcher.range != (-1,-1) \ and fetcher.range[1] - (fetcher.pos+length) < FETCHER_JUMPSTART \ - and self.fetch_pos + 1 < self.content_length and self.channel.connected \ + and self.fetch_pos + 1 < self.content_length \ and len( filter(lambda f: f.proxy == fetcher.proxy, self.fetchers) ) < 2: #Start a new fetcher if this fetcher is X-Bytes before finished his job blocksize = max(int(TIME_SLICE * fetcher.speed()), MIN_BLOCKSIZE) fetch_range = self.next_range(blocksize) self.fetchers.append( Fetcher(self, fetcher.proxy, self.url, self.header, fetch_range) ) - if len(self.blocks)>0: - print self,"fetch_pos=%s write_pos=%s get=%s with length=%s pending=%s" % (self.fetch_pos, self.write_pos, min(self.blocks)[0],min(self.blocks)[2], len(self.blocks)) + #if len(self.blocks)>0: + #print self,"fetch_pos=%s write_pos=%s get=%s with length=%s pending=%s" % (self.fetch_pos, self.write_pos, min(self.blocks)[0],min(self.blocks)[2], len(self.blocks)) buf = "" while len(self.blocks)>0 and min(self.blocks)[0] == self.write_pos: @@ -240,6 +242,7 @@ class HTTPProxyClient(asynchat.async_chat): class HTTPChannel(asynchat.async_chat): def __init__(self, server, sock, addr): + print "Channel opened" self.server = server self.data = StringIO.StringIO() @@ -266,8 +269,23 @@ class HTTPChannel(asynchat.async_chat): else: self.set_terminator(None) headers = httplib.HTTPMessage(self.data).dict - self.server.handle_request(self, self.request[0], self.request[1], headers) - + self.handle_request(self, self.request[0], self.request[1], headers) + + def handle_request(self, channel, method, path, headers): + url = urlparse.urlparse(path) + if method != "GET": + #do not handle non-GET or GET with Query (?foo=bla) requests + return self._bypass_request(channel, method, url, headers) + else: + MagicHTTPProxyClient(channel, url, headers) + + def _bypass_request(self, channel, method, url, headers): + proxy = ENDPOINTS[ int( md5.md5(url.hostname).hexdigest(),16 ) % len(ENDPOINTS) ] + print self, "_bypass request via %s: %s %s" % (proxy, method, urlparse.urlunparse(url)) + HTTPProxyClient(proxy, channel, method, url, headers) + #FIXME use this other thing + + class HTTPProxyServer(asyncore.dispatcher): def __init__(self): @@ -286,19 +304,7 @@ class HTTPProxyServer(asyncore.dispatcher): conn, addr = self.accept() HTTPChannel(self, conn, addr) - def handle_request(self, channel, method, path, headers): - url = urlparse.urlparse(path) - if method != "GET": - #do not handle non-GET or GET with Query (?foo=bla) requests - return self._bypass_request(channel, method, url, headers) - else: - MagicHTTPProxyClient(channel, url, headers) - - def _bypass_request(self, channel, method, url, headers): - proxy = ENDPOINTS[ int( md5.md5(url.hostname).hexdigest(),16 ) % len(ENDPOINTS) ] - print self, "_bypass request via %s: %s %s" % (proxy, method, urlparse.urlunparse(url)) - HTTPProxyClient(proxy, channel, method, url, headers) - if __name__ == "__main__": proxy = HTTPProxyServer() + print proxy asyncore.loop() -- cgit v1.2.1