summaryrefslogtreecommitdiff
path: root/proxy.py
diff options
context:
space:
mode:
Diffstat (limited to 'proxy.py')
-rwxr-xr-xproxy.py286
1 files changed, 0 insertions, 286 deletions
diff --git a/proxy.py b/proxy.py
deleted file mode 100755
index edf6be2..0000000
--- a/proxy.py
+++ /dev/null
@@ -1,286 +0,0 @@
-#!/usr/bin/python -t
-import os, sys, string, time, md5, random
-import asynchat, asyncore, socket, httplib, urlparse
-from heapq import heappush, heappop
-import cStringIO as StringIO
-
-kB = 1024
-
-class DefaultConfiguration:
- """bind to that"""
- listen=("",8080)
-
- """available http-proxies"""
- endpoints=[ ('10.2.2.11', 8888) ]
-
- """minimum entity size to start parallelize fetch"""
- threshold = 512*kB
-
- """initial size/range for a fetcher-job"""
- initial_blocksize=512*kB
-
- """minimal size/range for a fetcher-job"""
- minimal_blocksize=512*kB
-
- """(sec) #time each fetcher spent on his range,
- calculated using speed measured while using initial_blocksize"""
- time_slice=5
-
- """start a new fetcher on a endpoint X-bytes before the old one finished"""
- fetcher_jumpstart=32*kB
-
-#################
-
-class Fetcher(asynchat.async_chat):
- def __init__(self, reader, proxy, url, headers, range):
- self.reader = reader
- self.proxy = proxy
- self.url = url
- self.headers = headers
- self.range = range
-
- self.pos = (self.range[0] != -1) and self.range[0] or 0
- self.start_time = 0
- self.stop_time = 0
- self.http_status = ""
- self.http_header = StringIO.StringIO()
- self.state = 0 #0=status, 1=header, 2=body
- asynchat.async_chat.__init__(self)
- self.set_terminator("\r\n")
- self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
- self.connect(self.proxy)
-
- def __str__(self):
- return "<Fetcher proxy=%s host=%s path=%s range=%s" % (self.proxy, self.url.hostname, self.url.path, self.range)
-
- def handle_connect (self):
- print self, "connected"
- buf = "GET http://%s:%s%s HTTP/1.0\r\n" % ( self.url.hostname, self.url.port or 80, self.url.path )
- for key in filter(lambda k: k not in ("range"), self.headers.keys()): #send origin request headers
- buf += "%s: %s\r\n" % (key, self.headers[key])
- if self.range != (-1,-1):
- buf += "Range: bytes=%s-%s\r\n" % (self.range[0], self.range[1])
- buf += "\r\n"
- self.push(buf)
- self.start_time = time.time()
-
- def time(self):
- if self.stop_time == 0:
- return time.time() - self.start_time
- else:
- return self.stop_time - self.start_time
-
- def speed(self):
- return (self.pos - self.range[0]) / self.time()
-
- def collect_incoming_data(self, data):
- if self.state==2: #body
- length = len(data)
- if self.range != (-1,-1) and self.pos + length >= self.range[1] + 1:
- #if this request is the first one (whithout Range: ...) then the server
- # dont send us our expected range, we must cut it at some point (here)
- bytes_remaining = self.range[1] - ( self.pos )
- data=data[:bytes_remaining+1]
- print self,"cut: pos=%s length=%s => %s" % (self.pos, length, len(data))
- length = len(data)
- if length == 0:
- self.reader.handle_incoming_data(self)
- self.close()
- return
- if not self.reader.handle_incoming_data(self, data, length):
- self.close()
- return
- self.pos += length
- if self.range != (-1,-1) and self.pos >= self.range[1]:
- self.stop_time = time.time()
- print self, "finished with %s kB/s" % (self.speed() / 1024)
- self.reader.handle_incoming_data(self)
- self.close()
- elif self.state ==1: #header
- self.http_header.write( data )
- else: #status
- self.http_status += data
-
- def found_terminator(self):
- if self.state == 0: #got status-line
- self.state = 1
- self.set_terminator("\r\n\r\n")
- elif self.state == 1: #got headers
- self.state = 2
- self.set_terminator(None)
- self.reader.handle_incoming_http_header(self, self.http_header)
-
-class MagicHTTPProxyClient(object):
- def __init__(self, channel, url, header):
- self.channel = channel
- self.config = self.channel.server.config
- self.url = url
- self.header = header
-
- self.content_length = -1
- self.header_sent = False
- self.fetch_pos = 0
- self.write_pos = 0
- self.buffer = ""
- self.blocks = list()
- self.fetchers = list()
-
- proxy = self.config.endpoints[ random.randint(0, len(self.config.endpoints)-1) ]
- self.fetchers.append( Fetcher(self, proxy, self.url, self.header, (-1,-1)) )
-
- def __str__(self):
- return "<MagicHTTPProxyClient host=%s path=%s content_length=%s>" % (self.url.hostname, self.url.path, self.content_length)
-
- def handle_incoming_data(self, fetcher, data=None, length=0):
- if not data:
- #fetcher is done, remove from list
- self.fetchers = filter(lambda f: f != fetcher, self.fetchers)
- print "Remove: %s" % fetcher
- else:
- assert fetcher.pos < fetcher.range[1] or fetcher.range == (-1,-1)
- heappush(self.blocks, (fetcher.pos, data, length))
-
- if not self.channel.connected:
- print self, "request side closed the connection"
- return False
-
- if fetcher.range != (-1,-1) \
- and fetcher.range[1] - (fetcher.pos+length) < self.config.fetcher_jumpstart \
- and self.fetch_pos + 1 < self.content_length \
- and len( filter(lambda f: f.proxy == fetcher.proxy, self.fetchers) ) < 2:
- #Start a new fetcher if this fetcher is X-Bytes before finished his job
- blocksize = max(int(self.config.time_slice * fetcher.speed()), self.config.minimal_blocksize)
- fetch_range = self.next_range(blocksize)
- self.fetchers.append( Fetcher(self, fetcher.proxy, self.url, self.header, fetch_range) )
-
- #if len(self.blocks)>0:
- #print self,"fetch_pos=%s write_pos=%s get=%s with length=%s pending=%s" % (self.fetch_pos, self.write_pos, min(self.blocks)[0],min(self.blocks)[2], len(self.blocks))
-
- buf = ""
- while len(self.blocks)>0 and min(self.blocks)[0] == self.write_pos:
- item = heappop(self.blocks)
- buf += item[1]
- self.write_pos += item[2]
-
- if buf != "":
- self.channel.push(buf)
-
- if self.write_pos + 1 >= self.content_length:
- print self, "job done %s blocks left" % len(self.blocks)
- self.channel.close_when_done()
- return True
-
- def next_range(self, suggested_blocksize):
- assert self.content_length != -1
- start = self.fetch_pos
- self.fetch_pos = min(self.fetch_pos + suggested_blocksize, self.content_length)
- return (start, self.fetch_pos-1)
-
- def handle_incoming_http_header(self, fetcher, header):
- if not self.channel.connected:
- return
- if self.header_sent:
- pass
- else:
- self.header_sent = True
-
- # Sends header from first response
- header.seek(0)
- headers = httplib.HTTPMessage(header)
-
- content_length = filter(lambda i: i == "content-length", headers.dict.keys())
- if len(content_length) == 1:
- content_length = int(headers.dict["content-length"])
- if content_length >= self.config.threshold:
- self.content_length = content_length
- fetcher.range = self.next_range(self.config.initial_blocksize)
- for proxy in filter(lambda p: fetcher.proxy != p, self.config.endpoints):
- if self.fetch_pos == self.content_length -1:
- break
- self.fetchers.append(Fetcher( self, proxy, self.url, self.header, self.next_range(self.config.initial_blocksize)))
-
- else:
- content_length = None
-
- buf = "HTTP/1.1 200 OK\r\n"
- for key in filter(lambda k: k not in ("content-range", "content-length"), headers.dict.keys()):
- buf += "%s: %s\r\n" % (key, headers.dict[key])
- if content_length:
- buf += "Content-Length: %s\r\n" % content_length
- buf += "Content-Range: bytes %s-%s/%s\r\n" % (0, content_length-1, content_length)
- buf += "X-Proxy: Magicproxy (superpower activated)\r\n"
- buf += "\r\n"
- self.channel.push(buf)
-
-class HTTPChannel(asynchat.async_chat):
- def __init__(self, server, sock, addr):
- self.server = server
- self.data = StringIO.StringIO()
-
- asynchat.async_chat.__init__(self, sock)
- self.set_terminator("\r\n\r\n")
-
- def handle_close(self):
- self.connected = False
- self.close()
-
- def collect_incoming_data(self, data):
- self.data.write(data)
- if self.data.tell() > 16384:
- self.close_when_done()
-
- def found_terminator(self):
- # parse http header
- self.data.seek(0)
- self.request = string.split(self.data.readline(), None, 2)
- if len(self.request) != 3:
- # badly formed request; just shut down
- self.close_when_done()
- else:
- self.set_terminator(None)
- headers = httplib.HTTPMessage(self.data).dict
- self.handle_request(self.request[0], self.request[1], headers)
-
- def handle_request(self, method, path, headers):
- url = urlparse.urlparse(path)
- if method != "GET" or url.query != "":
- #do not handle non-GET or GET with Query (?foo=bla) requests
- proxy = self.server.config.endpoints[ int( md5.md5(url.hostname).hexdigest(),16 ) % len(self.server.config.endpoints) ]
- print Fetcher(self, proxy, url, headers, (-1,-1))
- else:
- MagicHTTPProxyClient(self, url, headers)
-
- def handle_incoming_http_header(self,fetcher,header):
- header.seek(0)
- headers = httplib.HTTPMessage(header)
- buf = "HTTP/1.1 200 OK\r\n"
- buf += "\r\n".join(map(lambda hdr: "%s: %s" % (hdr,headers.dict[hdr]), headers.dict.keys()))
- buf += "\r\n\r\n"
- self.push(buf)
-
-
- def handle_incoming_data(self, fetcher, data=None, length=0):
- if data:
- self.push(data)
- return True
-
-class HTTPProxyServer(asyncore.dispatcher):
- def __init__(self,config):
- self.config = config
- asyncore.dispatcher.__init__(self)
- self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
- self.set_reuse_addr()
- self.bind(self.config.listen)
- self.listen(5)
-
- def __str__(self):
- return "<HTTPProxyServer listen=%s:%s>" % self.config.listen
-
- def handle_accept(self):
- conn, addr = self.accept()
- HTTPChannel(self, conn, addr)
-
-if __name__ == "__main__":
- proxy = HTTPProxyServer(DefaultConfiguration)
- print proxy
- asyncore.loop()