diff options
author | Yves <yvesf-git@xapek.org> | 2010-03-17 00:14:06 +0100 |
---|---|---|
committer | Yves <yvesf-git@xapek.org> | 2010-03-17 00:14:06 +0100 |
commit | e40b169239b9a257be66480c164554652d2123bb (patch) | |
tree | d2232e52a46d07abd072523c3c970ba86de6e81f | |
parent | c739d682aab415cb5e18558622eba92d1e2110ca (diff) | |
download | magicproxy-e40b169239b9a257be66480c164554652d2123bb.tar.gz magicproxy-e40b169239b9a257be66480c164554652d2123bb.zip |
blablablablabla
-rwxr-xr-x | proxy.py | 80 |
1 files changed, 57 insertions, 23 deletions
@@ -41,7 +41,7 @@ class Fetcher(asynchat.async_chat): self.http_status = "" self.http_header = StringIO.StringIO() self.state = 0 #0=status, 1=header, 2=body - + print self, "__init__" asynchat.async_chat.__init__(self) self.set_terminator("\r\n") self.create_socket(socket.AF_INET, socket.SOCK_STREAM) @@ -52,12 +52,13 @@ class Fetcher(asynchat.async_chat): def handle_connect (self): print self, "Start" - self.send("GET http://%s:%s%s HTTP/1.0\r\n" % ( self.url.hostname, self.url.port or 80, self.url.path )) + buf = "GET http://%s:%s%s HTTP/1.0\r\n" % ( self.url.hostname, self.url.port or 80, self.url.path ) for key in filter(lambda k: k not in ("range"), self.headers.keys()): #send origin request headers - self.send("%s: %s\r\n" % (key, self.headers[key])) + buf += "%s: %s\r\n" % (key, self.headers[key]) if self.range != (-1,-1): - self.send("Range: bytes=%s-%s\r\n" % (self.range[0], self.range[1])) - self.send("\r\n") + buf += "Range: bytes=%s-%s\r\n" % (self.range[0], self.range[1]) + buf += "\r\n" + self.push(buf) self.start_time = time.time() def time(self): @@ -72,23 +73,24 @@ class Fetcher(asynchat.async_chat): def collect_incoming_data(self, data): if self.state==2: #body length = len(data) - if self.range != (-1,-1) and self.pos + length >= self.range[1]: + if self.range != (-1,-1) and self.pos + length > self.range[1]: #if this request is the first one (whithout Range: ...) then the server # dont send us our expected range, we must cut it at some point (here) - data=data[:(self.range[1]-self.pos+1)] #XXX explain this - print "cut range=%s pos=%s length=%s => %s" % (self.range, self.pos, length, len(data)) + bytes_remaining = self.range[1] - self.pos + 1 #XXX + data=data[:bytes_remaining] + print self,"cut: pos=%s length=%s => %s" % (self.pos, length, len(data)) length = len(data) if length == 0: self.reader.handle_incoming_data(self) self.close() return - self.reader.handle_incoming_data(self, data, length) + if not self.reader.handle_incoming_data(self, data, length): + self.close() + return self.pos += length if self.range != (-1,-1) and self.pos >= self.range[1]: self.stop_time = time.time() print self, "finished with %s kB/s" % (self.speed() / 1024) - #make sure the next fetcher will be started - self.reader.handle_incoming_data(self) self.close() elif self.state ==1: #header self.http_header.write( data ) @@ -118,13 +120,11 @@ class MagicHTTPProxyClient(object): self.blocks = list() self.fetchers = list() - print self, "New Instance" - proxy = ENDPOINTS[ random.randint(0, len(ENDPOINTS)-1) ] self.fetchers.append( Fetcher(self, proxy, self.url, self.header, (-1,-1)) ) def __str__(self): - return "<MagicHTTPProxyClient url=%s content_length=%s>" % (urlparse.urlunparse(self.url), self.content_length) + return "<MagicHTTPProxyClient host=%s path=%s content_length=%s>" % (self.url.hostname, self.url.path, self.content_length) def handle_incoming_data(self, fetcher, data=None, length=0): if not data: @@ -136,32 +136,33 @@ class MagicHTTPProxyClient(object): if not self.channel.connected: print self, "request side closed the connection" - return + return False if fetcher.range != (-1,-1) \ - and fetcher.range[1] - fetcher.pos < FETCHER_JUMPSTART \ + and fetcher.range[1] - (fetcher.pos+length) < FETCHER_JUMPSTART \ and self.fetch_pos + 1 < self.content_length and self.channel.connected \ and len( filter(lambda f: f.proxy == fetcher.proxy, self.fetchers) ) < 2: #Start a new fetcher if this fetcher is X-Bytes before finished his job blocksize = max(int(TIME_SLICE * fetcher.speed()), MIN_BLOCKSIZE) fetch_range = self.next_range(blocksize) - print "Start new Fetcher, bs=%s range=%s" % (blocksize,fetch_range) self.fetchers.append( Fetcher(self, fetcher.proxy, self.url, self.header, fetch_range) ) + if len(self.blocks)>0: + print self,"fetch_pos=%s write_pos=%s get=%s with length=%s pending=%s" % (self.fetch_pos, self.write_pos, min(self.blocks)[0],min(self.blocks)[2], len(self.blocks)) + buf = "" while len(self.blocks)>0 and min(self.blocks)[0] == self.write_pos: item = heappop(self.blocks) buf += item[1] self.write_pos += item[2] - if buf == "" and len(self.blocks)>0: - print "search=%s get=%s with length=%s pending=%s" % (self.write_pos, min(self.blocks)[0],min(self.blocks)[2], len(self.blocks)) if buf != "": self.channel.push(buf) if self.write_pos + 1 >= self.content_length: print self, "job done %s blocks left" % len(self.blocks) self.channel.close_when_done() + return True def next_range(self, suggested_blocksize): assert self.content_length != -1 @@ -170,6 +171,8 @@ class MagicHTTPProxyClient(object): return (start, self.fetch_pos-1) def handle_incoming_http_header(self, fetcher, header): + if not self.channel.connected: + return if self.header_sent: pass else: @@ -203,18 +206,49 @@ class MagicHTTPProxyClient(object): buf += "\r\n" self.channel.push(buf) - +"""Transparent forward to other proxy server""" +class HTTPProxyClient(asynchat.async_chat): + def __init__(self, proxy, channel, method, url, headers): + self.proxy = proxy + self.other = channel + self.method = method + self.headers = headers + + asynchat.async_chat.__init__(self) + self.set_terminator(None) + self.create_socket(socket.AF_INET, socket.SOCK_STREAM) + self.connect(self.proxy) + + self.buf = "" + self.buf += "%s %s HTTP/1.0\r\n" % (method, urlparse.urlunparse(url)) + for key in headers.keys(): + self.buf += "%s: %s\r\n" % (key, headers[key]) + self.buf += "\r\n" + + def __str__(self): + return "<HTTPProxyClient proxy=%s:%s>" % self.proxy + + def collect_incoming_data(self, data): + self.other.push(data) + + def handle_close(self): + self.close() + self.other.close_when_done() + + def handle_connect(self): + self.push(self.buf) + class HTTPChannel(asynchat.async_chat): def __init__(self, server, sock, addr): self.server = server self.data = StringIO.StringIO() - self.request = None asynchat.async_chat.__init__(self, sock) self.set_terminator("\r\n\r\n") def handle_close(self): + self.connected = False self.close() def collect_incoming_data(self, data): @@ -243,7 +277,7 @@ class HTTPProxyServer(asyncore.dispatcher): self.create_socket(socket.AF_INET, socket.SOCK_STREAM) self.set_reuse_addr() self.bind(("", 8080)) - self.listen(15) + self.listen(5) def __str__(self): return "<HTTPProxyServer port=%s>" % self.port @@ -254,7 +288,7 @@ class HTTPProxyServer(asyncore.dispatcher): def handle_request(self, channel, method, path, headers): url = urlparse.urlparse(path) - if method != "GET" or url.query != "": + if method != "GET": #do not handle non-GET or GET with Query (?foo=bla) requests return self._bypass_request(channel, method, url, headers) else: |