summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rwxr-xr-xproxy.py136
1 files changed, 56 insertions, 80 deletions
diff --git a/proxy.py b/proxy.py
index c11236d..edf6be2 100755
--- a/proxy.py
+++ b/proxy.py
@@ -4,27 +4,31 @@ import asynchat, asyncore, socket, httplib, urlparse
from heapq import heappush, heappop
import cStringIO as StringIO
+kB = 1024
-ENDPOINTS = [
- ('10.2.2.11', 8888),
-# ('10.3.1.2', 8888),
-# ('10.1.1.156', 8888),
-]
+class DefaultConfiguration:
+ """bind to that"""
+ listen=("",8080)
-kB = 1024
-#minimum entity size to start a paralel fetch
-THRESHOLD = 512 * kB
-#first fetch-range blocksize
-INIT_BLOCKSIZE = 512 * kB
-#lower bound of fetch-range blocksize optimization
-MIN_BLOCKSIZE = 512 * kB
-#time each fetcher spent on his range, calculated using
-#speed measured while using INIT_BLOCKSIZE
-TIME_SLICE = 5
-#start a new fetcher on a endpoint X-bytes before the
-#old one finished
-FETCHER_JUMPSTART = 32 * kB
+ """available http-proxies"""
+ endpoints=[ ('10.2.2.11', 8888) ]
+
+ """minimum entity size to start parallelize fetch"""
+ threshold = 512*kB
+
+ """initial size/range for a fetcher-job"""
+ initial_blocksize=512*kB
+ """minimal size/range for a fetcher-job"""
+ minimal_blocksize=512*kB
+
+ """(sec) #time each fetcher spent on his range,
+ calculated using speed measured while using initial_blocksize"""
+ time_slice=5
+
+ """start a new fetcher on a endpoint X-bytes before the old one finished"""
+ fetcher_jumpstart=32*kB
+
#################
class Fetcher(asynchat.async_chat):
@@ -41,7 +45,6 @@ class Fetcher(asynchat.async_chat):
self.http_status = ""
self.http_header = StringIO.StringIO()
self.state = 0 #0=status, 1=header, 2=body
- print self, "__init__"
asynchat.async_chat.__init__(self)
self.set_terminator("\r\n")
self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
@@ -51,7 +54,7 @@ class Fetcher(asynchat.async_chat):
return "<Fetcher proxy=%s host=%s path=%s range=%s" % (self.proxy, self.url.hostname, self.url.path, self.range)
def handle_connect (self):
- print self, "Start"
+ print self, "connected"
buf = "GET http://%s:%s%s HTTP/1.0\r\n" % ( self.url.hostname, self.url.port or 80, self.url.path )
for key in filter(lambda k: k not in ("range"), self.headers.keys()): #send origin request headers
buf += "%s: %s\r\n" % (key, self.headers[key])
@@ -110,6 +113,7 @@ class Fetcher(asynchat.async_chat):
class MagicHTTPProxyClient(object):
def __init__(self, channel, url, header):
self.channel = channel
+ self.config = self.channel.server.config
self.url = url
self.header = header
@@ -121,7 +125,7 @@ class MagicHTTPProxyClient(object):
self.blocks = list()
self.fetchers = list()
- proxy = ENDPOINTS[ random.randint(0, len(ENDPOINTS)-1) ]
+ proxy = self.config.endpoints[ random.randint(0, len(self.config.endpoints)-1) ]
self.fetchers.append( Fetcher(self, proxy, self.url, self.header, (-1,-1)) )
def __str__(self):
@@ -141,11 +145,11 @@ class MagicHTTPProxyClient(object):
return False
if fetcher.range != (-1,-1) \
- and fetcher.range[1] - (fetcher.pos+length) < FETCHER_JUMPSTART \
+ and fetcher.range[1] - (fetcher.pos+length) < self.config.fetcher_jumpstart \
and self.fetch_pos + 1 < self.content_length \
and len( filter(lambda f: f.proxy == fetcher.proxy, self.fetchers) ) < 2:
#Start a new fetcher if this fetcher is X-Bytes before finished his job
- blocksize = max(int(TIME_SLICE * fetcher.speed()), MIN_BLOCKSIZE)
+ blocksize = max(int(self.config.time_slice * fetcher.speed()), self.config.minimal_blocksize)
fetch_range = self.next_range(blocksize)
self.fetchers.append( Fetcher(self, fetcher.proxy, self.url, self.header, fetch_range) )
@@ -187,13 +191,13 @@ class MagicHTTPProxyClient(object):
content_length = filter(lambda i: i == "content-length", headers.dict.keys())
if len(content_length) == 1:
content_length = int(headers.dict["content-length"])
- if content_length >= THRESHOLD:
+ if content_length >= self.config.threshold:
self.content_length = content_length
- fetcher.range = self.next_range(INIT_BLOCKSIZE)
- for proxy in filter(lambda p: fetcher.proxy != p, ENDPOINTS):
+ fetcher.range = self.next_range(self.config.initial_blocksize)
+ for proxy in filter(lambda p: fetcher.proxy != p, self.config.endpoints):
if self.fetch_pos == self.content_length -1:
break
- self.fetchers.append(Fetcher( self, proxy, self.url, self.header, self.next_range(INIT_BLOCKSIZE)))
+ self.fetchers.append(Fetcher( self, proxy, self.url, self.header, self.next_range(self.config.initial_blocksize)))
else:
content_length = None
@@ -208,43 +212,9 @@ class MagicHTTPProxyClient(object):
buf += "\r\n"
self.channel.push(buf)
-"""Transparent forward to other proxy server"""
-class HTTPProxyClient(asynchat.async_chat):
- def __init__(self, proxy, channel, method, url, headers):
- self.proxy = proxy
- self.other = channel
- self.method = method
- self.headers = headers
-
- asynchat.async_chat.__init__(self)
- self.set_terminator(None)
- self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
- self.connect(self.proxy)
-
- self.buf = ""
- self.buf += "%s %s HTTP/1.0\r\n" % (method, urlparse.urlunparse(url))
- for key in headers.keys():
- self.buf += "%s: %s\r\n" % (key, headers[key])
- self.buf += "\r\n"
-
- def __str__(self):
- return "<HTTPProxyClient proxy=%s:%s>" % self.proxy
-
- def collect_incoming_data(self, data):
- self.other.push(data)
-
- def handle_close(self):
- self.close()
- self.other.close_when_done()
-
- def handle_connect(self):
- self.push(self.buf)
-
class HTTPChannel(asynchat.async_chat):
def __init__(self, server, sock, addr):
- print "Channel opened"
self.server = server
-
self.data = StringIO.StringIO()
asynchat.async_chat.__init__(self, sock)
@@ -269,42 +239,48 @@ class HTTPChannel(asynchat.async_chat):
else:
self.set_terminator(None)
headers = httplib.HTTPMessage(self.data).dict
- self.handle_request(self, self.request[0], self.request[1], headers)
+ self.handle_request(self.request[0], self.request[1], headers)
- def handle_request(self, channel, method, path, headers):
+ def handle_request(self, method, path, headers):
url = urlparse.urlparse(path)
- if method != "GET":
+ if method != "GET" or url.query != "":
#do not handle non-GET or GET with Query (?foo=bla) requests
- return self._bypass_request(channel, method, url, headers)
+ proxy = self.server.config.endpoints[ int( md5.md5(url.hostname).hexdigest(),16 ) % len(self.server.config.endpoints) ]
+ print Fetcher(self, proxy, url, headers, (-1,-1))
else:
- MagicHTTPProxyClient(channel, url, headers)
-
- def _bypass_request(self, channel, method, url, headers):
- proxy = ENDPOINTS[ int( md5.md5(url.hostname).hexdigest(),16 ) % len(ENDPOINTS) ]
- print self, "_bypass request via %s: %s %s" % (proxy, method, urlparse.urlunparse(url))
- HTTPProxyClient(proxy, channel, method, url, headers)
- #FIXME use this other thing
-
-
+ MagicHTTPProxyClient(self, url, headers)
+
+ def handle_incoming_http_header(self,fetcher,header):
+ header.seek(0)
+ headers = httplib.HTTPMessage(header)
+ buf = "HTTP/1.1 200 OK\r\n"
+ buf += "\r\n".join(map(lambda hdr: "%s: %s" % (hdr,headers.dict[hdr]), headers.dict.keys()))
+ buf += "\r\n\r\n"
+ self.push(buf)
-class HTTPProxyServer(asyncore.dispatcher):
- def __init__(self):
- self.port = 8080
+ def handle_incoming_data(self, fetcher, data=None, length=0):
+ if data:
+ self.push(data)
+ return True
+
+class HTTPProxyServer(asyncore.dispatcher):
+ def __init__(self,config):
+ self.config = config
asyncore.dispatcher.__init__(self)
self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
self.set_reuse_addr()
- self.bind(("", 8080))
+ self.bind(self.config.listen)
self.listen(5)
def __str__(self):
- return "<HTTPProxyServer port=%s>" % self.port
+ return "<HTTPProxyServer listen=%s:%s>" % self.config.listen
def handle_accept(self):
conn, addr = self.accept()
HTTPChannel(self, conn, addr)
if __name__ == "__main__":
- proxy = HTTPProxyServer()
+ proxy = HTTPProxyServer(DefaultConfiguration)
print proxy
asyncore.loop()