#!/usr/bin/env python3 # coding: utf-8 # python3 works as well import sys assert sys.version.startswith("3."), "Requires python3" import os import re import dbm import json import time import argparse import math import shutil import logging import hashlib from io import BytesIO try: from PIL import Image, ImageFilter import tmdbsimple as tmdb import requests except ImportError as e: print("Missing dependency: {0}".format(str(e))) print("Install using system package manager") print("or `pip install --user (one of: Pillow tmdbsimple requests)`") sys.exit(1) def read_key(): if "TMDB_KEY" in os.environ.keys(): return os.environ["TMDB_KEY"] if "XDG_CONFIG_HOME" in os.environ.keys(): cfg_home = os.environ["XDG_CONFIG_HOME"] else: cfg_home = os.path.join(os.path.expanduser("~"), ".config") if os.path.exists(os.path.join(cfg_home, "tmdbkey")): return open(os.path.join(cfg_home, "tmdbkey"), "r").read().strip() if os.path.exists(os.path.join(os.path.expanduser("~"), ".tmdbkey")): return open(os.path.join(os.path.expanduser("~"), ".tmdbkey")).read().strip() raise Exception("No TMDB Key defined. Set TMDB_KEY=.. or .tmdbkey file") class TMDBCache(object): def __enter__(self): logger_name = self.__class__.__module__ + "." + self.__class__.__name__ self.logger = logging.getLogger(logger_name) self.logger.info("Open db") self.db = dbm.open(self._get_db_filename("tmdbmovie.dbm"), "c") self.db_images = dbm.open(self._get_db_filename("tmdbposter.dbm"), "c") return self def __exit__(self, type, value, traceback): self.logger.info("Close db") self.db.close() self.db_images.close() def _get_db_filename(self, name): if "XDG_CACHE_HOME" in os.environ.keys(): cachedir = os.environ["XDG_CACHE_HOME"] else: cachedir = os.path.join(os.path.expanduser("~"), ".cache") return os.path.join(cachedir, name) def _cache(self, key, callable_func): if key not in self.db: self.db[key] = json.dumps(callable_func()) d = self.db[key].decode('utf-8') return json.loads(d) def infos(self, movie_id): try: self.logger.debug("movie %s", movie_id) return self._cache(movie_id + "movies.info", tmdb.Movies(movie_id).info) except Exception as e: raise Exception("Failed to query movie-id {id}: {reason}".format( id=movie_id, reason=str(e))) def alternative_title(self, movie_id, locale): """Returns the title in selected locale or original title otherwise""" try: key = movie_id + "movies.alt_titles" search = tmdb.Movies(movie_id).alternative_titles titles = self._cache(key, search) alt_title = list(filter(lambda l: l["iso_3166_1"] == locale, titles["titles"])) if alt_title: return alt_title[0]["title"] else: infos = self.infos(movie_id) return infos["title"] or infos["original_title"] except Exception as e: raise Exception("Failed to query movie-id {id}: {reason}".format( id=movie_id, reason=str(e))) def prune(self, movie_id): self.logger.debug("prune {}".format(movie_id)) keys = [ "imdb_maindetails_{}".format(movie_id), movie_id + "movies.info", movie_id + "movies.alt_titles", ] for key in keys: if key in self.db: self.logger.warn("Remove {}".format(key)) del self.db[key] else: self.logger.debug("Key not in db {}".format(key)) def poster(self, poster_path, f="w154"): self.logger.debug("poster %s", poster_path) key = "poster_{}_{}".format(f, hashlib.md5(poster_path.encode('utf-8')).hexdigest()[0:10]) keyContentType = "{}_ct".format(key) url = "http://image.tmdb.org/t/p/{}/{}".format(f, poster_path) if key not in self.db_images or keyContentType not in self.db_images: r = requests.get(url) self.db_images[key] = r.content self.db_images[keyContentType] = r.headers['content-type'] return (self.db_images[keyContentType], self.db_images[key]) def poster_low(self, poster_path, format="w154"): p = self.poster(poster_path, format) if not p: return None contentType, data = p image = Image.open(BytesIO(data)) image = image.filter(ImageFilter.GaussianBlur(radius=1)) buf = BytesIO() image.save(buf, "JPEG", quality=18, optimize=True) return ("image/jpeg", buf.getvalue()) def _imdb_request(self, path, query): # see also https://github.com/richardasaurus/imdb-pie # nice library but a bit strange API, # so we chose to reimplement stuff here BASE_URI = 'app.imdb.com' API_KEY = '2wex6aeu6a8q9e49k7sfvufd6rhh0n' SHA1_KEY = hashlib.sha1(API_KEY.encode('utf8')).hexdigest() HEADERS = { 'user-agent': 'AppleWebKit/534.46 (KHTML, like Gecko) ' + 'Version/5.1 Mobile/9A405', } PARAMS = { "api": "v1", "appid": "iphone1_1", "apiPolicy": "app1_1", "apiKey": SHA1_KEY, "locale": "en_US", "timestamp": "{:.0f}".format(time.time()) } q = query.copy() q.update(PARAMS) return requests.get("https://{}{}".format(BASE_URI, path), params=q, headers=HEADERS) def imdb_movie(self, movie_id): def do_request(): r = self._imdb_request("/title/maindetails", {'tconst': movie_id}) assert r.status_code == 200, "Request must return status-code 200" data = json.loads(r.text) assert data is not None and data['data'], "Data must not be empty" return data key = "imdb_maindetails_{}".format(movie_id) return self._cache(key, do_request) def omdb_movie(self, movie_id): def do_request(): BASE_URI="http://www.omdbapi.com/" params={'i':movie_id, 'plot':'short', 'r':'json', 'tomatoes':'true'} r = requests.get(BASE_URI, params=params) assert r.status_code == 200, "Request must return status-code 200" data = json.loads(r.text) assert data is not None and data['Response'] == 'True', "Data must not be empty" return data key = "omdb_{}".format(movie_id) return self._cache(key, do_request) def weight_rating(infos): """ add 'rating' to all infos""" infos = list(infos) maxvotes = max(map(lambda i: i["vote_count"], infos)) for info in infos: f = math.sin(math.pi * (info["vote_average"]/10.0)) d = (float(info["vote_count"]) / maxvotes) - 0.5 info['rating'] = info["vote_average"] + 2 * d * f return infos class Protector(object): """ The Protector saves the caller from exception. All callable attributes of child are dynamically replaced by function returning 'None' in case of an exception - except for KeyboardInterrupt exceptions. >>> class Thrower(object): ... def some_func(self): ... raise Exception("I'm evil") >>> t = Thrower() >>> t.some_func() == None Traceback (most recent call last): Exception: I'm evil >>> p = Protector(t) >>> p.some_func() == None True """ def __init__(self, child): self.child = child def __getattr__(self, name): attr = getattr(self.child, name) def protected(*a, **kw): try: return attr(*a, **kw) except KeyboardInterrupt as e: raise e except Exception as e: logging.error("Error calling %s: %s: %s", name, type(e), e) return None if callable(attr): return protected else: return attr def do_aka(args, imdb_ids): with TMDBCache() as tmdbcache: for (filename, imdb_id) in imdb_ids: print(tmdbcache.alternative_title(imdb_id, locale=args.lang)) def do_data(args, imdb_ids): def print_data(data, io, indent=0): if isinstance(data, dict): for (key, val) in data.items(): if val is None: continue else: io.write("\n{}{}: ".format(indent*" ", key)) print_data(val, io, indent+1) elif isinstance(data, list) and len(data) > 0: for val in data: print_data(val, io, indent+2) elif type(data) in (bool, str, int, float): io.write(str(data)) with TMDBCache() as tmdbcache: for (filename, imdb_id) in imdb_ids: print_data({"TMDB": tmdbcache.infos(imdb_id), "IMDB": tmdbcache.imdb_movie(imdb_id), "OMDB": tmdbcache.omdb_movie(imdb_id)}, sys.stdout) def do_year(args, imdb_ids): with TMDBCache() as tmdbcache: for (filename, imdb_id) in imdb_ids: print(tmdbcache.infos(imdb_id)["release_date"].split("-")[0]) def do_prune(args, imdb_ids): with TMDBCache() as tmdbcache: for (filename, imdb_id) in imdb_ids: tmdbcache.prune(imdb_id) def do_rating(args, imdb_ids): """Calculates a rating based on vote_average and vote_count""" with TMDBCache() as tmdbcache: infos = filter(lambda i: "vote_average" in i and "vote_count" in i, map(lambda fid: tmdbcache.infos(fid[1]), imdb_ids)) weight_rating(infos) for info in infos: print("{rating:.02f} {imdb_id} {title:30s} avg=" + "{vote_average:.1f} count={vote_count:.0f}".format(**info)) def do_index(args, imdb_ids): """creates a index website""" valid_extensions = [".mkv", ".avi", ".mov", ".mp4", ".mpg"] def listMovieFiles(path): for root, dirs, files in os.walk(path): for curfile in files: if sum(map(curfile.endswith, valid_extensions)): yield os.path.join(root, curfile) def install(filename): outpath = os.path.join(".index.html", os.path.dirname(filename)) if not os.path.exists(outpath): os.makedirs(outpath) src = os.path.join(os.path.dirname(__file__),filename) out = os.path.join(outpath, os.path.basename(filename)) shutil.copy(src, out) return out def data(callbackName): db = Protector(tmdbcache) def poster(imdb_id, data): if data: if not os.path.exists(".index.html/poster"): os.makedirs(".index.html/poster") out = os.path.join(".index.html/poster",imdb_id+".jpg") open(out, "wb").write(data[1]) return out else: return None def getInfo(a): path, imdb_id = a imdb = db.imdb_movie(imdb_id)['data'] omdb = db.omdb_movie(imdb_id) tmdb = db.infos(imdb_id) if not imdb or not omdb or not tmdb: print("Error in {} {}".format(path, imdb_id)) return None else: return { 'id': imdb_id, 'title': tmdb['title'], 'path': path, 'poster': poster(imdb_id, db.poster_low(tmdb['poster_path'])), 'tagline': 'tagline' in imdb and imdb['tagline'] or None, 'plot': 'plot' in imdb and imdb['plot']['outline'] or None, 'website': 'homepage' in imdb and imdb['homepage'] or omdb['Website'] != 'N/A' and omdb['Website'] or None, 'release': 'release_date' in tmdb and tmdb['release_date'] or None, 'movieFiles': list(listMovieFiles(path)), 'imdbRating': imdb['rating'], 'imdbVotes': imdb['num_votes'], 'omdbTomatoConsensus': (omdb['tomatoConsensus'] != 'N/A') and omdb['tomatoConsensus'] or None, 'omdbTomato': (omdb['tomatoMeter'] != 'N/A') and float(omdb['tomatoMeter']) or None, 'omdbUserTomato': (omdb['tomatoUserMeter'] != 'N/A') and float(omdb['tomatoUserMeter']) or None, 'omdbTomatoRating': (omdb['tomatoRating'] != 'N/A') and float(omdb['tomatoRating']) or None, 'omdbTomatoUserRating': (omdb['tomatoUserRating'] != 'N/A') and float(omdb['tomatoUserRating']) or None, 'omdbTomatoFresh': (omdb['tomatoFresh'] != 'N/A') and int(omdb['tomatoFresh']) or None, 'tmdbId': tmdb['id'], } filename = ".index.html/js/data.js" if not os.path.exists(os.path.dirname(filename)): os.makedirs(os.path.dirname(filename)) out = open(filename, "w") out.write(callbackName) out.write("(") json.dump(list(filter(bool, map(getInfo, imdb_ids))), out) out.write(")") out.close() return filename try: from jinja2 import Template except ImportError: print("Failed to import jinja2 library for html-templating") sys.exit(1) template_file = os.path.join(os.path.dirname(__file__), "index.jinja2.html") template = Template(open(template_file, "r").read()) with TMDBCache() as tmdbcache: mapping = { 'title': 'Movie overview', 'install': install, 'data': data, } assert not os.path.exists("index.html") and not os.path.exists(".index.html"), \ "index.html or folder .index.html already exists" stream = template.generate(mapping) outfile = open("index.html", "wb") for output in stream: out = output.strip() out = re.sub(" {2,}", " ", out) outfile.write(out.encode('utf-8')) class HelpAction(argparse._HelpAction): def __call__(self, parser, namespace, values, option_string=None): formatter = parser._get_formatter() formatter.add_usage(parser.usage, parser._actions, parser._mutually_exclusive_groups) formatter.start_section(parser._optionals.title) formatter.add_text(parser._optionals.description) formatter.add_arguments(parser._optionals._group_actions) formatter.end_section() subparsers_actions = [ action for action in parser._actions if isinstance(action, argparse._SubParsersAction)] for subparsers_action in subparsers_actions: # get all subparsers and print help subparsers = subparsers_action.choices for subaction in subparsers_action._get_subactions(): subparser = subparsers[subaction.dest] usage = formatter._format_actions_usage(subparser._actions, []) formatter.start_section("{} {} {}".format(formatter._prog, subaction.dest, usage)) formatter.add_text(subaction.help) formatter.add_arguments(subparser._positionals._group_actions) formatter.add_arguments(subparser._optionals._group_actions) formatter.end_section() print(formatter.format_help()) parser.exit(0) def do_test(args): import doctest doctest.testmod() if __name__ == "__main__": tmdb.API_KEY = read_key() parser = argparse.ArgumentParser(add_help=False) parser.add_argument("--help", action=HelpAction, help="Display full help") parser.add_argument("--log-level", action='store', type=int, help="Set log level (CRITICAL=50," + "ERROR=40,WARNING=30,INFO=20,DEBUG=10,NOTSET=0") parser.add_argument("-h", action=argparse._HelpAction, help="Display short help") subparsers = parser.add_subparsers() parser_aka = subparsers.add_parser("aka", add_help=False, help="Print alternative titles") parser_aka.add_argument("--lang", help="Language code (default 'DE')") parser_aka.set_defaults(func=do_aka) parser_aka.add_argument("files", action="append", nargs="+", help="Files containing distinct movie-ids") parser_data = subparsers.add_parser("data", add_help=False, help="Print all available data") parser_data.set_defaults(func=do_data) parser_data.add_argument("files", action="append", nargs="+", help="Files containing distinct movie-ids") parser_year = subparsers.add_parser("year", add_help=False, help="Print only the release year") parser_year.set_defaults(func=do_year) parser_year.add_argument("files", action="append", nargs="+", help="Files containing distinct movie-ids") parser_prune = subparsers.add_parser("prune", add_help=False, help="Delete cache entries") parser_prune.set_defaults(func=do_prune) parser_prune.add_argument("files", action="append", nargs="+", help="Files containing distinct movie-ids") parser_rating = subparsers.add_parser("rating", add_help=False, help="Print movie ratings") parser_rating.set_defaults(func=do_rating) parser_rating.add_argument("files", action="append", nargs="+", help="Files containing distinct movie-ids") parser_index = subparsers.add_parser("index", add_help=False, help="Generate index.html file") parser_index.set_defaults(func=do_index) parser_index.add_argument("files", action="append", nargs="+", help="Files containing distinct movie-ids") parser_test = subparsers.add_parser("test", add_help=False, help="Run testsuite") parser_test.set_defaults(func=do_test) args = parser.parse_args(sys.argv[1:]) if "log_level" in args: logging.basicConfig(level=args.log_level) if "files" in args: ids = args.files[0] ids = map(lambda filename: (lambda x: (filename, x.groups()[0]) if x else None) (re.match(".*#(tt[0-9]{7}).*", filename)), ids) args.func(args, list(filter(lambda i: i is not None, ids))) elif "func" in args: args.func(args) else: HelpAction("")(parser, None, None)