From 083899d6088492b1bc7e01f74be015070224f2c2 Mon Sep 17 00:00:00 2001 From: Yves Fischer Date: Fri, 20 Feb 2015 18:35:30 +0100 Subject: Some tests and using python netrc module --- httpfs/__init__.py | 319 +++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 319 insertions(+) create mode 100644 httpfs/__init__.py (limited to 'httpfs/__init__.py') diff --git a/httpfs/__init__.py b/httpfs/__init__.py new file mode 100644 index 0000000..f7d7ec3 --- /dev/null +++ b/httpfs/__init__.py @@ -0,0 +1,319 @@ +#!/usr/bin/env python3 +import os +import sys +import time +import netrc +import logging +import logging.config +from urllib.parse import quote, unquote +from email.utils import parsedate +from html.parser import HTMLParser +from stat import S_IFDIR, S_IFREG +from errno import EIO, ENOENT, EBADF, EHOSTUNREACH + +import fuse +import requests + + +class Config(object): + mountpoint = None + timeout = (5, 25) # connect_timeout, read_timeout + verify = None + + +class Path: + def __init__(self, parent, name): + self.parent = parent + self.name = name + self.initialized = False + + def buildUrl(self): + return self.parent.buildUrl() + "/" + quote(self.name) + + def getSession(self): + return self.parent.getSession() + + def getAttr(self): + raise fuse.FuseOSError(ENOENT) + + @classmethod + def fromPath(clazz, parent, pathElement): + p = clazz(parent, unquote(pathElement)) + if (pathElement == ""): + raise SystemExit(1) + logging.info("created {} '{}' referencing {}".format( + clazz.__name__, p.name, p.buildUrl())) + return p + + +class File(Path): + def __init__(self, parent, name): + super().__init__(parent, name) + self.lastModified = None + self.size = None + + def init(self): + url = self.buildUrl() + logging.info("File url={} name={}".format(url, self.name)) + r = self.getSession().head(url, timeout=Config.timeout) + r.close() + if r.status_code != 200: + error = "Status code != 200 for {}: {}".format(url, r.status_code) + raise Exception(error) + self.size = int(r.headers['content-length']) + self.lastModified = time.mktime(parsedate(r.headers['last-modified'])) + + logging.info("File initialized url={} name={}".format(url, self.name)) + self.initialized = True + + def get(self, size, offset): + if not self.initialized: + self.init() + url = self.buildUrl() + bytesRange = '{}-{}'.format(offset, min(self.size, offset+size-1)) + headers = {'range': 'bytes=' + bytesRange} + logging.info("File.get url={} range={}".format(url, bytesRange)) + r = self.getSession().get(url, headers=headers, timeout=Config.timeout) + r.close() + if r.status_code == 200 or r.status_code == 206: + d = r.content + logging.info("Received {} bytes".format(len(d))) + if len(d) > size: + errormsg = "size {} > than expected {}".format(len(d), size) + logging.error(errormsg) + raise fuse.FuseOSError(EIO) + return d + else: + raise fuse.FuseOSError(EIO) + + def getAttr(self): + if not self.initialized: + self.init() + t = self.lastModified + return dict(st_mode=(S_IFREG | 0o444), st_nlink=1, st_size=self.size, + st_ctime=t, st_mtime=t, st_atime=t) + + +class Directory(Path): + def __init__(self, parent, name): + super().__init__(parent, name) + self.entries = {} + + def init(self): + url = self.buildUrl() + logging.info("Directory url={} name={}".format(url, self.name)) + r = self.getSession().get(url, stream=True, timeout=Config.timeout) + if r.status_code != 200: + raise Exception("Status code not 200 for {}: {}".format( + url, r.status_code)) + + if "text/html" not in r.headers['content-type']: + raise Exception("Is not text/html: {}".format(url)) + + parser = RelativeLinkCollector(self) + for line in r.iter_content(decode_unicode=True): + parser.feed(line) + self.entries.update(parser.entries) + parser.entries.clear() + parser.close() + self.entries.update(parser.entries) + r.close() + + logging.info("Diretory loaded {}".format(url)) + self.initialized = True + + def getAttr(self): + t = time.time() + nentries = 1 + if self.initialized: + nentries += len(self.entries) + return dict(st_mode=(S_IFDIR | 0o555), st_nlink=nentries, + st_ctime=t, st_mtime=t, st_atime=t) + + +class Server(Directory): + def __init__(self, parent, name): + super().__init__(parent, name) + self.session = requests.Session() + self.session.verify = Config.verify + + def getSession(self): + return self.session + + def buildUrl(self): + return self.parent.buildUrl() + "/" + self.name + + +class Schema(Directory): + def __init__(self, parent, name): + super().__init__(parent, name) + self.initialized = True + + def buildUrl(self): + return self.name + ":/" + + +class Root(Directory): + def __init__(self): + super().__init__(None, "") + self.initialized = True + + def buildUrl(self): + return "" + + +class RelativeLinkCollector(HTMLParser): + def __init__(self, parent): + super().__init__(self, convert_charrefs=True) + self.parent = parent + self.entries = {} + + def handle_starttag(self, tag, attrs): + if tag == "a": + attrs = dict(attrs) + if "href" in attrs: + href = attrs["href"] + if "/" in href[:-1] or href[0] == ".": + return + + if href[-1:] == "/": + d = Directory.fromPath(self.parent, href[:-1]) + self.entries[unquote(href[:-1])] = d + else: + f = File.fromPath(self.parent, href) + self.entries[unquote(href)] = f + + +class Httpfs(fuse.LoggingMixIn, fuse.Operations): + """A read only http/https/ftp filesystem using python-requests.""" + def __init__(self): + self.root = Root() + + https = Schema(self.root, 'https') + https.entries = dict(self._getDefaultEntries(https)) + http = Schema(self.root, 'http') + http.entries = dict(self._getDefaultEntries(http)) + + self.root.entries = {'http': http, 'https': https} + + def _getDefaultEntries(self, parent): + try: + for machine in netrc.netrc().hosts.keys(): + yield (machine, Server(parent, machine)) + except IOError as e: + logging.warn("No .netrc file found, no default machines") + + def getattr(self, path, fh=None): + logging.info("getattr path={}".format(path)) + try: + entry = self._getPath(path) + if entry: + return entry.getAttr() + except Exception as e: + logging.error("Error", e) + raise fuse.FuseOSError(EHOSTUNREACH) + raise fuse.FuseOSError(ENOENT) + + def _getPath(self, path): + """ map path to self.root tree + a path is build like ///""" + logging.debug("getPath path={}".format(path)) + if path == "/": + return self.root + + if path[-1] == "/": + path = path[:-1] + + schema, *p = path[1:].split("/") + if schema not in self.root.entries: + return None + prevEntry = self.root.entries[schema] + if p == []: + return prevEntry + + server, *p = p + if server not in prevEntry.entries: + # create server if not exists + prevEntry.entries[server] = Server.fromPath(prevEntry, server) + prevEntry = prevEntry.entries[server] + if p == []: + return prevEntry + + *pathElements, lastElement = p + for pathElement in pathElements: + if pathElement not in prevEntry.entries: + d = Directory.fromPath(prevEntry, pathElement) + prevEntry.entries[pathElement] = d + prevEntry = prevEntry.entries[pathElement] + + if lastElement not in prevEntry.entries: + if not prevEntry.initialized: + prevEntry.init() + if lastElement not in prevEntry.entries: + # the server don't return it, then just create it + # assuming its an directory, if a HEAD is successful + d = Directory.fromPath(prevEntry, lastElement) + r = d.getSession().head(d.buildUrl(), + timeout=Config.timeout) + if r.status_code == 200: + logging.info("Create directory for path which was not " + + "discovered by Index of: {}".format(path)) + prevEntry.entries[lastElement] = d + else: + logging.info("Path not found: {}".format(path)) + return None + return prevEntry.entries[lastElement] + + def readdir(self, path, fh): + logging.info("readdir path=%s", path) + entry = self._getPath(path) + if not entry: + raise fuse.FuseOSError(EBADF) + if not entry.initialized: + entry.init() + return [(".", entry.getAttr(), 0), + ("..", (entry.parent and entry.parent.getAttr() or None), 0)] \ + + [(it.name, it.getAttr(), 0) for it in entry.entries.values()] + + def read(self, path, size, offset, fh): + entry = self._getPath(path) + if isinstance(entry, File): + return entry.get(size, offset) + else: + raise fuse.FuseOSError(EIO) + +if __name__ == '__main__': + import argparse + FORMAT = "%(threadName)s %(asctime)-15s %(levelname)s:%(name)s %(message)s" + logging.basicConfig(level=logging.INFO, format=FORMAT) + + p = argparse.ArgumentParser( + formatter_class=argparse.ArgumentDefaultsHelpFormatter) + p.add_argument("mountpoint", nargs=1, help="Target directory") + p.add_argument("--max_background", type=int, default=15, + help="Maximum number of background threads") + p.add_argument("--no_foreground", action="store_true", default=False, + help="Fork into background as a daemon") + p.add_argument("--debug", action="store_true", help="Enable fuse debug") + p.add_argument("--nothreads", action="store_true", + help="Disable fuse threads") + p.add_argument("--connect_timeout", type=int, default=Config.timeout[0], + help="HTTP connect timeout") + p.add_argument("--read_timeout", type=int, default=Config.timeout[1], + help="HTTP read timeout") + p.add_argument("--no-verify", action="store_true", + help="Disable ssl verify") + + args = vars(p.parse_args(sys.argv[1:])) + + Config.timeout = (args.pop("connect_timeout"), args.pop("read_timeout")) + Config.mountpoint = args.pop("mountpoint")[0] + Config.verify = not args.pop("no_verify") + kwargs = {} + if not args.pop("no_foreground"): + kwargs["foreground"] = True + if args.pop("debug"): + kwargs["debug"] = True + kwargs.update(args) + + fuse = fuse.FUSE(Httpfs(), Config.mountpoint, **kwargs) -- cgit v1.2.1