summaryrefslogtreecommitdiff
path: root/proxy.py
diff options
context:
space:
mode:
Diffstat (limited to 'proxy.py')
-rwxr-xr-xproxy.py80
1 files changed, 57 insertions, 23 deletions
diff --git a/proxy.py b/proxy.py
index eb408b6..ac01249 100755
--- a/proxy.py
+++ b/proxy.py
@@ -41,7 +41,7 @@ class Fetcher(asynchat.async_chat):
self.http_status = ""
self.http_header = StringIO.StringIO()
self.state = 0 #0=status, 1=header, 2=body
-
+ print self, "__init__"
asynchat.async_chat.__init__(self)
self.set_terminator("\r\n")
self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
@@ -52,12 +52,13 @@ class Fetcher(asynchat.async_chat):
def handle_connect (self):
print self, "Start"
- self.send("GET http://%s:%s%s HTTP/1.0\r\n" % ( self.url.hostname, self.url.port or 80, self.url.path ))
+ buf = "GET http://%s:%s%s HTTP/1.0\r\n" % ( self.url.hostname, self.url.port or 80, self.url.path )
for key in filter(lambda k: k not in ("range"), self.headers.keys()): #send origin request headers
- self.send("%s: %s\r\n" % (key, self.headers[key]))
+ buf += "%s: %s\r\n" % (key, self.headers[key])
if self.range != (-1,-1):
- self.send("Range: bytes=%s-%s\r\n" % (self.range[0], self.range[1]))
- self.send("\r\n")
+ buf += "Range: bytes=%s-%s\r\n" % (self.range[0], self.range[1])
+ buf += "\r\n"
+ self.push(buf)
self.start_time = time.time()
def time(self):
@@ -72,23 +73,24 @@ class Fetcher(asynchat.async_chat):
def collect_incoming_data(self, data):
if self.state==2: #body
length = len(data)
- if self.range != (-1,-1) and self.pos + length >= self.range[1]:
+ if self.range != (-1,-1) and self.pos + length > self.range[1]:
#if this request is the first one (whithout Range: ...) then the server
# dont send us our expected range, we must cut it at some point (here)
- data=data[:(self.range[1]-self.pos+1)] #XXX explain this
- print "cut range=%s pos=%s length=%s => %s" % (self.range, self.pos, length, len(data))
+ bytes_remaining = self.range[1] - self.pos + 1 #XXX
+ data=data[:bytes_remaining]
+ print self,"cut: pos=%s length=%s => %s" % (self.pos, length, len(data))
length = len(data)
if length == 0:
self.reader.handle_incoming_data(self)
self.close()
return
- self.reader.handle_incoming_data(self, data, length)
+ if not self.reader.handle_incoming_data(self, data, length):
+ self.close()
+ return
self.pos += length
if self.range != (-1,-1) and self.pos >= self.range[1]:
self.stop_time = time.time()
print self, "finished with %s kB/s" % (self.speed() / 1024)
- #make sure the next fetcher will be started
- self.reader.handle_incoming_data(self)
self.close()
elif self.state ==1: #header
self.http_header.write( data )
@@ -118,13 +120,11 @@ class MagicHTTPProxyClient(object):
self.blocks = list()
self.fetchers = list()
- print self, "New Instance"
-
proxy = ENDPOINTS[ random.randint(0, len(ENDPOINTS)-1) ]
self.fetchers.append( Fetcher(self, proxy, self.url, self.header, (-1,-1)) )
def __str__(self):
- return "<MagicHTTPProxyClient url=%s content_length=%s>" % (urlparse.urlunparse(self.url), self.content_length)
+ return "<MagicHTTPProxyClient host=%s path=%s content_length=%s>" % (self.url.hostname, self.url.path, self.content_length)
def handle_incoming_data(self, fetcher, data=None, length=0):
if not data:
@@ -136,32 +136,33 @@ class MagicHTTPProxyClient(object):
if not self.channel.connected:
print self, "request side closed the connection"
- return
+ return False
if fetcher.range != (-1,-1) \
- and fetcher.range[1] - fetcher.pos < FETCHER_JUMPSTART \
+ and fetcher.range[1] - (fetcher.pos+length) < FETCHER_JUMPSTART \
and self.fetch_pos + 1 < self.content_length and self.channel.connected \
and len( filter(lambda f: f.proxy == fetcher.proxy, self.fetchers) ) < 2:
#Start a new fetcher if this fetcher is X-Bytes before finished his job
blocksize = max(int(TIME_SLICE * fetcher.speed()), MIN_BLOCKSIZE)
fetch_range = self.next_range(blocksize)
- print "Start new Fetcher, bs=%s range=%s" % (blocksize,fetch_range)
self.fetchers.append( Fetcher(self, fetcher.proxy, self.url, self.header, fetch_range) )
+ if len(self.blocks)>0:
+ print self,"fetch_pos=%s write_pos=%s get=%s with length=%s pending=%s" % (self.fetch_pos, self.write_pos, min(self.blocks)[0],min(self.blocks)[2], len(self.blocks))
+
buf = ""
while len(self.blocks)>0 and min(self.blocks)[0] == self.write_pos:
item = heappop(self.blocks)
buf += item[1]
self.write_pos += item[2]
- if buf == "" and len(self.blocks)>0:
- print "search=%s get=%s with length=%s pending=%s" % (self.write_pos, min(self.blocks)[0],min(self.blocks)[2], len(self.blocks))
if buf != "":
self.channel.push(buf)
if self.write_pos + 1 >= self.content_length:
print self, "job done %s blocks left" % len(self.blocks)
self.channel.close_when_done()
+ return True
def next_range(self, suggested_blocksize):
assert self.content_length != -1
@@ -170,6 +171,8 @@ class MagicHTTPProxyClient(object):
return (start, self.fetch_pos-1)
def handle_incoming_http_header(self, fetcher, header):
+ if not self.channel.connected:
+ return
if self.header_sent:
pass
else:
@@ -203,18 +206,49 @@ class MagicHTTPProxyClient(object):
buf += "\r\n"
self.channel.push(buf)
-
+"""Transparent forward to other proxy server"""
+class HTTPProxyClient(asynchat.async_chat):
+ def __init__(self, proxy, channel, method, url, headers):
+ self.proxy = proxy
+ self.other = channel
+ self.method = method
+ self.headers = headers
+
+ asynchat.async_chat.__init__(self)
+ self.set_terminator(None)
+ self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
+ self.connect(self.proxy)
+
+ self.buf = ""
+ self.buf += "%s %s HTTP/1.0\r\n" % (method, urlparse.urlunparse(url))
+ for key in headers.keys():
+ self.buf += "%s: %s\r\n" % (key, headers[key])
+ self.buf += "\r\n"
+
+ def __str__(self):
+ return "<HTTPProxyClient proxy=%s:%s>" % self.proxy
+
+ def collect_incoming_data(self, data):
+ self.other.push(data)
+
+ def handle_close(self):
+ self.close()
+ self.other.close_when_done()
+
+ def handle_connect(self):
+ self.push(self.buf)
+
class HTTPChannel(asynchat.async_chat):
def __init__(self, server, sock, addr):
self.server = server
self.data = StringIO.StringIO()
- self.request = None
asynchat.async_chat.__init__(self, sock)
self.set_terminator("\r\n\r\n")
def handle_close(self):
+ self.connected = False
self.close()
def collect_incoming_data(self, data):
@@ -243,7 +277,7 @@ class HTTPProxyServer(asyncore.dispatcher):
self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
self.set_reuse_addr()
self.bind(("", 8080))
- self.listen(15)
+ self.listen(5)
def __str__(self):
return "<HTTPProxyServer port=%s>" % self.port
@@ -254,7 +288,7 @@ class HTTPProxyServer(asyncore.dispatcher):
def handle_request(self, channel, method, path, headers):
url = urlparse.urlparse(path)
- if method != "GET" or url.query != "":
+ if method != "GET":
#do not handle non-GET or GET with Query (?foo=bla) requests
return self._bypass_request(channel, method, url, headers)
else: