summaryrefslogtreecommitdiff
path: root/proxy.py
diff options
context:
space:
mode:
Diffstat (limited to 'proxy.py')
-rwxr-xr-xproxy.py175
1 files changed, 66 insertions, 109 deletions
diff --git a/proxy.py b/proxy.py
index 5af4fd3..ee5d083 100755
--- a/proxy.py
+++ b/proxy.py
@@ -1,16 +1,13 @@
#!/usr/bin/python -t
-import os, sys, string, time, md5
+import os, sys, string, time, md5, random
import asynchat, asyncore, socket, httplib, urlparse
from heapq import heappush, heappop
-try:
- import cStringIO as StringIO
-except ImportError:
- import StringIO
+import cStringIO as StringIO
ENDPOINTS = [
('10.2.2.11', 8888),
- ('10.3.1.2', 8888),
+# ('10.3.1.2', 8888),
('10.1.1.156', 8888),
]
@@ -38,7 +35,7 @@ class Fetcher(asynchat.async_chat):
self.headers = headers
self.range = range
- self.pos = self.range[0]
+ self.pos = (self.range[0] != -1) and self.range[0] or 0
self.start_time = 0
self.stop_time = 0
self.http_status = ""
@@ -58,7 +55,8 @@ class Fetcher(asynchat.async_chat):
self.send("GET http://%s:%s%s HTTP/1.0\r\n" % ( self.url.hostname, self.url.port or 80, self.url.path ))
for key in filter(lambda k: k not in ("range"), self.headers.keys()): #send origin request headers
self.send("%s: %s\r\n" % (key, self.headers[key]))
- self.send("Range: bytes=%s-%s\r\n" % (self.range[0], self.range[1]))
+ if self.range != (-1,-1):
+ self.send("Range: bytes=%s-%s\r\n" % (self.range[0], self.range[1]))
self.send("\r\n")
self.start_time = time.time()
@@ -76,9 +74,9 @@ class Fetcher(asynchat.async_chat):
length = len(data)
self.reader.handle_incoming_data(self, data, length)
self.pos += length
- if self.pos >= self.range[1]:
+ if self.range != (-1,-1) and self.pos >= self.range[1]:
self.stop_time = time.time()
- print self, "finished"
+ print self, "finished with %s kB/s" % (self.speed() / 1024)
#make sure the next fetcher will be started
self.reader.handle_incoming_data(self)
self.close()
@@ -94,25 +92,15 @@ class Fetcher(asynchat.async_chat):
elif self.state == 1: #got headers
self.state = 2
self.set_terminator(None)
- self.reader.handle_incoming_http_header(self.http_header)
+ self.reader.handle_incoming_http_header(self, self.http_header)
-class StringIOProducer(object):
- def __init__(self,buf, amt=1440):
- self.buf=buf
- self.amt = amt
- self.buf.seek(0)
-
- def more(self):
- return self.buf.read(self.amt)
-
-class MultipleProxyReader(object):
- def __init__(self, channel, url, header, content_length):
+class MagicHTTPProxyClient(object):
+ def __init__(self, channel, url, header):
self.channel = channel
self.url = url
self.header = header
- self.content_length = content_length
- print self, "New Instance"
-
+
+ self.content_length = -1
self.header_sent = False
self.fetch_pos = 0
self.write_pos = 0
@@ -120,98 +108,88 @@ class MultipleProxyReader(object):
self.blocks = list()
self.fetchers = list()
- for proxy in ENDPOINTS:
- self.fetchers.append( Fetcher(self, proxy, self.url, self.header, self.next_range(INIT_BLOCKSIZE)) )
+ print self, "New Instance"
+
+ proxy = ENDPOINTS[ random.randint(0, len(ENDPOINTS)-1) ]
+ self.fetchers.append( Fetcher(self, proxy, self.url, self.header, (-1,-1)) )
def __str__(self):
- return "<MultipleProxyReader url=%s content_length=%s>" % (urlparse.urlunparse(self.url), self.content_length)
+ return "<MagicHTTPProxyClient url=%s content_length=%s>" % (urlparse.urlunparse(self.url), self.content_length)
def handle_incoming_data(self, fetcher, data=None, length=0):
if not data:
+ #fetcher is done, remove from list
self.fetchers = filter(lambda f: f != fetcher, self.fetchers)
else:
heappush(self.blocks, (fetcher.pos, data, length))
- if self.channel.is_closed:
+ if not self.channel.connected:
print self, "request side closed the connection"
return
- if fetcher.range[1] - fetcher.pos < FETCHER_JUMPSTART \
- and self.fetch_pos + 1 < self.content_length and not self.channel.is_closed \
- and len( filter( (lambda f: f.proxy == fetcher.proxy), self.fetchers) ) < 2:
- #Start a new fetcher on this line if this fetchers is X-Bytes before finishing his jobs
+ if fetcher.range != (-1,-1) \
+ and fetcher.range[1] - fetcher.pos < FETCHER_JUMPSTART \
+ and self.fetch_pos + 1 < self.content_length and self.channel.connected \
+ and len( filter(lambda f: f.proxy == fetcher.proxy, self.fetchers) ) < 2:
+ #Start a new fetcher on this line if this fetchers is X-Bytes before finished his job
blocksize = max(int(TIME_SLICE * fetcher.speed()), MIN_BLOCKSIZE)
fetch_range = self.next_range(blocksize)
print "Start new Fetcher, bs=%s range=%s" % (blocksize,fetch_range)
self.fetchers.append( Fetcher(self, fetcher.proxy, self.url, self.header, fetch_range) )
- buf = StringIO.StringIO()
+ buf = ""
while len(self.blocks)>0 and min(self.blocks)[0] == self.write_pos:
item = heappop(self.blocks)
- buf.write(item[1])
+ buf += item[1]
self.write_pos += item[2]
-
- if buf.tell() > 0:
- self.channel.push_with_producer(StringIOProducer(buf))
+ if len(self.blocks)>0:
+ print "missed: %s => %s" % (self.write_pos, min(self.blocks)[0])
+ if buf != "":
+ self.channel.push(buf)
if self.write_pos + 1 >= self.content_length:
print self, "job done %s blocks left" % len(self.blocks)
- self.channel.is_closed = True
self.channel.close_when_done()
def next_range(self, suggested_blocksize):
+ assert self.content_length != -1
start = self.fetch_pos
self.fetch_pos = min(self.fetch_pos + suggested_blocksize, self.content_length)
return (start, self.fetch_pos-1)
- def handle_incoming_http_header(self, header):
+ def handle_incoming_http_header(self, fetcher, header):
if self.header_sent:
pass
else:
self.header_sent = True
- self.channel.push("HTTP/1.0 200 OK\r\n")
# Sends header from first response
header.seek(0)
headers = httplib.HTTPMessage(header)
- for key in filter(lambda k: k not in ("content-range", "content-length"), headers.dict.keys()):
- self.channel.push("%s: %s\r\n" % (key, headers.dict[key]))
- self.channel.push("Content-Length: %s\r\n" % self.content_length)
- self.channel.push("X-Proxy: Magicproxy (superpower activated)\r\n")
- self.channel.push("\r\n")
-
-"""Transparent forward to other proxy server"""
-class HTTPProxyClient(asynchat.async_chat):
- def __init__(self, proxy, channel, method, url, headers):
- self.proxy = proxy
- self.other = channel
- self.method = method
- self.headers = headers
-
- asynchat.async_chat.__init__(self)
- self.set_terminator(None)
- self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
- self.connect(self.proxy)
-
- self.buf = StringIO.StringIO()
- self.buf.write("%s %s HTTP/1.0\r\n" % (method, urlparse.urlunparse(url)))
- for key in headers.keys():
- self.buf.write("%s: %s\r\n" % (key, headers[key]))
- self.buf.write("\r\n")
-
- def __str__(self):
- return "<HTTPProxyClient proxy=%s:%s>" % self.proxy
- def collect_incoming_data(self, data):
- self.other.push(data)
+ content_length = filter(lambda i: i == "content-length", headers.dict.keys())
+ if len(content_length) == 1:
+ content_length = int(headers.dict["content-length"])
+ if content_length >= THRESHOLD:
+ self.content_length = content_length
+ fetcher.range = self.next_range(INIT_BLOCKSIZE)
+ for proxy in filter(lambda p: fetcher.proxy != p, ENDPOINTS):
+ if self.fetch_pos == self.content_length -1:
+ break
+ self.fetchers.append(Fetcher( self, proxy, self.url, self.header, self.next_range(INIT_BLOCKSIZE)))
- def handle_close(self):
- self.close()
- self.other.close_when_done()
-# print self, "Done"
+ else:
+ content_length = None
- def handle_connect(self):
- self.push_with_producer(StringIOProducer(self.buf))
+ buf = "HTTP/1.1 200 OK\r\n"
+ for key in filter(lambda k: k not in ("content-range", "content-length"), headers.dict.keys()):
+ buf += "%s: %s\r\n" % (key, headers.dict[key])
+ if content_length:
+ buf += "Content-Length: %s\r\n" % content_length
+ buf += "Content-Range: bytes %s-%s/%s\r\n" % (0, content_length-1, content_length)
+ buf += "X-Proxy: Magicproxy (superpower activated)\r\n"
+ buf += "\r\n"
+ self.channel.push(buf)
class HTTPChannel(asynchat.async_chat):
@@ -219,7 +197,6 @@ class HTTPChannel(asynchat.async_chat):
self.server = server
self.data = StringIO.StringIO()
- self.is_closed = False
self.request = None
asynchat.async_chat.__init__(self, sock)
@@ -227,7 +204,6 @@ class HTTPChannel(asynchat.async_chat):
def handle_close(self):
self.close()
- self.is_closed = True
def collect_incoming_data(self, data):
self.data.write(data)
@@ -235,18 +211,17 @@ class HTTPChannel(asynchat.async_chat):
self.close_when_done()
def found_terminator(self):
- if not self.request:
- # parse http header
- self.data.seek(0)
- self.request = string.split(self.data.readline(), None, 2)
- if len(self.request) != 3:
- # badly formed request; just shut down
- self.close_when_done()
- else:
- headers = httplib.HTTPMessage(self.data).dict
- self.server.handle_request(self, self.request[0], self.request[1], headers)
+ # parse http header
+ self.data.seek(0)
+ self.request = string.split(self.data.readline(), None, 2)
+ if len(self.request) != 3:
+ # badly formed request; just shut down
+ self.close_when_done()
else:
- pass # ignore body data, for now
+ self.set_terminator(None)
+ headers = httplib.HTTPMessage(self.data).dict
+ self.server.handle_request(self, self.request[0], self.request[1], headers)
+
class HTTPProxyServer(asyncore.dispatcher):
def __init__(self):
@@ -256,16 +231,11 @@ class HTTPProxyServer(asyncore.dispatcher):
self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
self.set_reuse_addr()
self.bind(("", 8080))
- self.listen(5)
+ self.listen(15)
def __str__(self):
return "<HTTPProxyServer port=%s>" % self.port
- def shutdown(self):
- #TODO Hier Proxy sauber beenden
- #self.channel.close_when_done()
- sys.exit()
-
def handle_accept(self):
conn, addr = self.accept()
HTTPChannel(self, conn, addr)
@@ -275,21 +245,8 @@ class HTTPProxyServer(asyncore.dispatcher):
if method != "GET" or url.query != "":
#do not handle non-GET or GET with Query (?foo=bla) requests
return self._bypass_request(channel, method, url, headers)
-
- #check for content-length header with a HEAD request
- conn = httplib.HTTPConnection(url.hostname, url.port or 80)
- conn.request("HEAD", url.path)
- resp = conn.getresponse()
- content_length = filter(lambda it: it[0] == "content-length", resp.getheaders())
- if len( content_length ) == 0:
- # no content length given, bypass this request
- self._bypass_request(channel, "GET", url, headers)
else:
- content_length = int(content_length[0][1])
- if content_length < THRESHOLD:
- self._bypass_request(channel, "GET", url, headers)
- else:
- MultipleProxyReader(channel, url, headers, content_length)
+ MagicHTTPProxyClient(channel, url, headers)
def _bypass_request(self, channel, method, url, headers):
proxy = ENDPOINTS[ int( md5.md5(url.hostname).hexdigest(),16 ) % len(ENDPOINTS) ]