#!/usr/bin/env python3
# coding: utf-8
"""Command line helper around TheMovieDB (TMDB) data for local movie files.

Movie files are expected to carry their IMDb id in the file name in the form
``#tt1234567``.  Looked-up data is cached in dbm files so repeated runs do
not hit the network again.  Sub-commands print alternative titles, raw data,
release years and ratings, prune cache entries or render an ``index.html``.
"""
import sys

# Explicit check instead of ``assert`` so it also fires under ``python -O``.
if sys.version_info[0] < 3:
    raise RuntimeError("Requires python3")

import os
import re
import dbm
import json
import time
import base64
import argparse
import math
import logging
import hashlib
from io import BytesIO
from io import StringIO
from urllib.parse import quote as urlencode

try:
    from PIL import Image
    import tmdbsimple as tmdb
    import requests
except ImportError as e:
    print("Missing dependency: {0}".format(str(e)))
    print("Install using system package manager or `pip install --user (one of: Pillow tmdbsimple requests)`")
    sys.exit(1)


# File names carry the IMDb id as "#tt<digits>".  At least seven digits are
# required; longer ids are captured completely instead of being cut to seven.
IMDB_ID_PATTERN = re.compile(r".*#(tt[0-9]{7,}).*")


def read_key():
    """Return the TMDB API key.

    Lookup order: the ``TMDB_KEY`` environment variable, then the file
    ``tmdbkey`` inside ``$XDG_CONFIG_HOME`` (``~/.config`` when the variable
    is unset), then ``~/.tmdbkey``.  File contents are stripped of
    surrounding whitespace.

    Raises:
        Exception: if none of the locations provides a key.
    """
    if "TMDB_KEY" in os.environ:
        return os.environ["TMDB_KEY"]

    home = os.path.expanduser("~")
    cfg_home = os.environ.get("XDG_CONFIG_HOME", os.path.join(home, ".config"))
    candidates = (
        os.path.join(cfg_home, "tmdbkey"),
        os.path.join(home, ".tmdbkey"),
    )
    for path in candidates:
        if os.path.exists(path):
            with open(path, "r") as keyfile:
                return keyfile.read().strip()

    raise Exception("No TheMovieDB Key defined. Set Env. var. TMDB_KEY or .tmdbkey file")


class TMDBCache(object):
    """Context manager giving cached access to TMDB/IMDb data and posters.

    Two dbm databases are opened on entry and closed on exit:
    ``tmdbmovie.dbm`` holds JSON encoded API answers and ``tmdbposter.dbm``
    holds poster image bytes together with their content type.
    """

    def __enter__(self):
        """Open both cache databases (creating them if needed)."""
        self.logger = logging.getLogger(self.__class__.__module__ + "." + self.__class__.__name__)
        self.logger.info("Open db")
        self.db = dbm.open(self._get_db_filename("tmdbmovie.dbm"), "c")
        try:
            self.db_images = dbm.open(self._get_db_filename("tmdbposter.dbm"), "c")
        except Exception:
            # Do not leak the first handle when the second one cannot be opened.
            self.db.close()
            raise
        return self

    def __exit__(self, type, value, traceback):
        """Close both cache databases; exceptions are not suppressed."""
        self.logger.info("Close db")
        self.db.close()
        self.db_images.close()

    def _get_db_filename(self, name):
        """Return the path of cache file *name* inside the cache directory.

        The directory is ``$XDG_CACHE_HOME`` or ``~/.cache`` when the variable
        is unset.  It is created when missing so that ``dbm.open`` can create
        the database file inside it.
        """
        cachedir = os.environ.get(
            "XDG_CACHE_HOME", os.path.join(os.path.expanduser("~"), ".cache"))
        if cachedir:
            os.makedirs(cachedir, exist_ok=True)
        return os.path.join(cachedir, name)

    def _cache(self, key, callable_func):
        """Return the cached value for *key*, computing it on a miss.

        On a miss ``callable_func()`` is called and its result is stored JSON
        encoded.  The returned value is always the JSON round-tripped form.
        """
        if key not in self.db:
            self.db[key] = json.dumps(callable_func())
        # dbm hands back bytes; decode before parsing.
        return json.loads(self.db[key].decode('utf-8'))

    def infos(self, movie_id):
        """Return the (cached) ``tmdb.Movies(movie_id).info()`` answer.

        Raises:
            Exception: wrapping any failure, with the movie id in the message.
        """
        try:
            self.logger.debug("movie %s", movie_id)
            return self._cache(movie_id + "movies.info", tmdb.Movies(movie_id).info)
        except Exception as e:
            raise Exception("Failed to query movie with id {id}: {reason}".format(id=movie_id, reason=str(e))) from e

    def alternative_title(self, movie_id, locale):
        """Return the alternative title whose ``iso_3166_1`` equals *locale*.

        When no alternative title matches, the movie's ``title`` is returned,
        or its ``original_title`` when ``title`` is empty.

        Raises:
            Exception: wrapping any failure, with the movie id in the message.
        """
        try:
            titles = self._cache(
                movie_id + "movies.alt_titles",
                tmdb.Movies(movie_id).alternative_titles)["titles"]
            for entry in titles:
                if entry["iso_3166_1"] == locale:
                    return entry["title"]
            infos = self.infos(movie_id)
            return infos["title"] or infos["original_title"]
        except Exception as e:
            raise Exception("Failed to query movie with id {id}: {reason}".format(id=movie_id, reason=str(e))) from e

    def prune(self, movie_id):
        """Delete all entries of *movie_id* from the movie cache.

        Each removed key is printed.  The poster cache is not touched.
        """
        keys = [
            "imdb_maindetails_{}".format(movie_id),
            movie_id + "movies.info",
            movie_id + "movies.alt_titles",
        ]
        for key in keys:
            if key in self.db:
                print("Remove {}".format(key))
                del self.db[key]

    def poster(self, poster_path, format="w185"):
        """Return ``(content_type, image_bytes)`` for a poster.

        The image is downloaded from image.tmdb.org in size *format* on a
        cache miss and served from the poster cache afterwards.  Both tuple
        members come straight from dbm.

        Raises:
            requests.HTTPError: when the download answers with an error
                status; nothing is cached in that case.
        """
        self.logger.debug("poster %s", poster_path)
        key = "poster_{}_{}".format(format, poster_path)
        keyContentType = "poster_{}_{}_content_type".format(format, poster_path)
        url = "http://image.tmdb.org/t/p/{}/{}".format(format, poster_path)
        if key not in self.db_images:
            r = requests.get(url)
            # Never cache an error page as if it were image data.
            r.raise_for_status()
            self.db_images[key] = r.content
            self.db_images[keyContentType] = r.headers['content-type']
        return (self.db_images[keyContentType], self.db_images[key])

    def poster_base64(self, poster_path, format="w185"):
        """Return the poster as a ``data:`` URI holding a small JPEG.

        The cached poster is re-encoded as JPEG with quality 10 and embedded
        base64 encoded.  Returns ``None`` when no poster is available.
        """
        p = self.poster(poster_path, format)
        if not p:
            return None
        contentType, data = p
        if isinstance(contentType, bytes):
            # Values read back from dbm are bytes; formatting them directly
            # would render as "b'image/jpeg'" inside the URI.
            contentType = contentType.decode("latin-1")
        # Image data is binary, so an in-memory *bytes* buffer is required.
        image = Image.open(BytesIO(data))
        if image.mode not in ("RGB", "L", "CMYK"):
            # JPEG cannot store palette or alpha modes.
            image = image.convert("RGB")
        buf = BytesIO()
        image.save(buf, "JPEG", quality=10, optimize=True)
        # b64encode emits no line breaks, unlike the removed encodestring().
        data64 = base64.b64encode(buf.getvalue()).decode("ascii")
        # NOTE(review): the payload is always JPEG while the advertised type
        # is whatever the download reported - confirm this is intended.
        return "data:{};base64,{}".format(contentType, data64)

    def _imdb_request(self, path, query):
        """Send a GET request to the IMDb app API and return the response.

        *query* is copied and extended with the fixed API parameters and the
        current timestamp; the caller's dict is left untouched.
        """
        # see also https://github.com/richardasaurus/imdb-pie
        # nice library but a bit strange API, so we chose to reimplement stuff here
        BASE_URI = 'app.imdb.com'
        API_KEY = '2wex6aeu6a8q9e49k7sfvufd6rhh0n'
        SHA1_KEY = hashlib.sha1(API_KEY.encode('utf8')).hexdigest()
        HEADERS = {
            'user-agent' : 'AppleWebKit/534.46 (KHTML, like Gecko) Ver sion/5.1 Mobile/9A405',
        }
        PARAMS = {
            "api": "v1",
            "appid": "iphone1_1",
            "apiPolicy": "app1_1",
            "apiKey": SHA1_KEY,
            "locale": "en_US",
            "timestamp": "{:.0f}".format(time.time())
        }
        q = query.copy()
        q.update(PARAMS)
        return requests.get("http://{}{}".format(BASE_URI, path), params=q, headers=HEADERS)

    def imdb_movie(self, movie_id):
        """Return the (cached) IMDb ``/title/maindetails`` answer for *movie_id*.

        Raises:
            RuntimeError: when the request does not answer with status 200.
        """
        def do_request():
            r = self._imdb_request("/title/maindetails", {'tconst': movie_id})
            # Explicit check: an assert would vanish under ``python -O`` and
            # let error answers end up in the cache.
            if r.status_code != 200:
                raise RuntimeError("Request must return status-code 200")
            return json.loads(r.content)
        key = "imdb_maindetails_{}".format(movie_id)
        return self._cache(key, do_request)


def weight_rating(infos):
    """Add a ``rating`` entry to every dict in *infos* and return *infos*.

    The rating is ``vote_average`` shifted by ``2 * d * f`` where ``d`` is the
    movie's vote count relative to the largest vote count in *infos* minus
    0.5, and ``f`` is ``sin(pi * vote_average / 10)``.  An empty list is
    returned unchanged; when no movie has any vote the relative vote count is
    taken as 0.
    """
    if not infos:
        return infos
    maxvotes = max(info["vote_count"] for info in infos)
    for info in infos:
        f = math.sin(math.pi * (info["vote_average"] / 10.0))
        share = float(info["vote_count"]) / maxvotes if maxvotes else 0.0
        d = share - 0.5
        info['rating'] = info["vote_average"] + 2 * d * f
    return infos


class Protector(object):
    """
    The Protector saves the caller from exception.

    All callable attributes of child are dynamically replaced by function
    returning 'None' in case of an exception - except for KeyboardInterrupt
    exceptions.

    >>> class Thrower(object):
    ...     def some_func(self):
    ...         raise Exception("I'm evil")
    >>> t = Thrower()
    >>> t.some_func() == None
    Traceback (most recent call last):
    Exception: I'm evil
    >>> p = Protector(t)
    >>> p.some_func() == None
    True
    """

    def __init__(self, child):
        self.child = child

    def __getattr__(self, name):
        """Return the child's attribute, wrapping callables defensively."""
        attr = getattr(self.child, name)
        if not callable(attr):
            return attr

        def protected(*a, **kw):
            try:
                return attr(*a, **kw)
            except Exception as e:
                # KeyboardInterrupt does not derive from Exception and
                # therefore still propagates to the caller.
                logging.error("Error calling %s: %s", name, e)
                return None
        return protected


def do_aka(args, imdb_ids):
    """Print the alternative title in ``args.lang`` for every movie."""
    with TMDBCache() as tmdbcache:
        for (filename, imdb_id) in imdb_ids:
            print(tmdbcache.alternative_title(imdb_id, locale=args.lang))


def do_data(args, imdb_ids):
    """Print selected ``key=value`` pairs of every movie, one line each."""
    selected_properties = (
        "imdb_id",
        "revenue",
        "vote_average",
        "vote_count",
        "runtime",
        "budget",
        "release_date",
        "popularity",
    )
    with TMDBCache() as tmdbcache:
        for (filename, imdb_id) in imdb_ids:
            info = tmdbcache.infos(imdb_id)
            print(" ".join(
                "{}={}".format(key, value)
                for key, value in info.items()
                if key in selected_properties))


def do_year(args, imdb_ids):
    """Print the release year (first part of ``release_date``) per movie."""
    with TMDBCache() as tmdbcache:
        for (filename, imdb_id) in imdb_ids:
            print(tmdbcache.infos(imdb_id)["release_date"].split("-")[0])


def do_prune(args, imdb_ids):
    """Remove the cached entries of every given movie."""
    with TMDBCache() as tmdbcache:
        for (filename, imdb_id) in imdb_ids:
            tmdbcache.prune(imdb_id)


def do_rating(args, imdb_ids):
    """Calculates a rating based on vote_average and vote_count"""
    with TMDBCache() as tmdbcache:
        all_infos = (tmdbcache.infos(imdb_id) for (filename, imdb_id) in imdb_ids)
        # Movies lacking vote data cannot be rated and are skipped.
        infos = [info for info in all_infos
                 if "vote_average" in info and "vote_count" in info]
        weight_rating(infos)
        for info in infos:
            print("{rating:.02f} {imdb_id} {title:30s} avg={vote_average:.1f} count={vote_count:.0f}".format(**info))


def do_index(args, imdb_ids):
    """Render ``index.html`` in the current directory from the jinja2 template.

    The template ``index.jinja2.html`` is read from the directory of this
    script.  An existing ``index.html`` is never overwritten.

    Raises:
        FileExistsError: if ``index.html`` already exists.
    """
    def listMovieFiles(path):
        """Yield the paths of all movie files found below *path*."""
        for root, dirs, files in os.walk(path):
            for curfile in files:
                if curfile.endswith((".mkv", ".avi", ".mov", ".mp4")):
                    yield os.path.join(root, curfile)

    try:
        from jinja2 import Template
    except ImportError:
        print("Failed to import jinja2 library for html-templating")
        sys.exit(1)

    template_file = os.path.join(os.path.dirname(__file__), "index.jinja2.html")
    with open(template_file, "rb") as infile:
        template = Template(infile.read().decode('utf-8'))

    with TMDBCache() as tmdbcache:
        mapping = {
            'gmtime' : time.gmtime(),
            'input' : imdb_ids,
            # Template lookups must not abort the whole page on one failure.
            'tmdbcache' : Protector(tmdbcache),
            'title' : 'Movie overview',
            'urlencode' : urlencode,
            'int' : int,
            'listMovieFiles' : listMovieFiles,
            'math' : math,
        }
        if os.path.exists("index.html"):
            raise FileExistsError("index.html already exists")
        stream = template.generate(mapping)
        with open("index.html", "wb") as outfile:
            for output in stream:
                outfile.write(output.encode("utf-8").strip())


class HelpAction(argparse._HelpAction):
    """Help action printing the global help plus a section per sub-command.

    NOTE(review): relies on private argparse attributes and methods
    (``_get_formatter``, ``_actions``, ``_format_actions_usage`` ...), which
    are not a stable API - verify after Python upgrades.
    """

    def __call__(self, parser, namespace, values, option_string=None):
        formatter = parser._get_formatter()
        formatter.add_usage(parser.usage, parser._actions, parser._mutually_exclusive_groups)
        formatter.add_text(parser.description)
        formatter.start_section(parser._optionals.title)
        formatter.add_text(parser._optionals.description)
        formatter.add_arguments(parser._optionals._group_actions)
        formatter.end_section()
        subparsers_actions = [
            action for action in parser._actions
            if isinstance(action, argparse._SubParsersAction)]
        for subparsers_action in subparsers_actions:
            # get all subparsers and print help
            subparsers = subparsers_action.choices
            for subaction in subparsers_action._get_subactions():
                subparser = subparsers[subaction.dest]
                formatter.start_section("{} {} {}".format(formatter._prog, subaction.dest, formatter._format_actions_usage(subparser._actions, [])))
                formatter.add_text(subaction.help)
                formatter.add_arguments(subparser._positionals._group_actions)
                formatter.add_arguments(subparser._optionals._group_actions)
                formatter.end_section()
        print(formatter.format_help())
        parser.exit(0)


def do_test(args):
    """Run the doctests of this module."""
    import doctest
    doctest.testmod()


def extract_imdb_ids(filenames):
    """Return ``(filename, imdb_id)`` pairs for all names carrying an id.

    Names without a ``#tt<digits>`` marker are skipped.  A list is returned
    so that consumers may iterate the result more than once.
    """
    ids = []
    for filename in filenames:
        match = IMDB_ID_PATTERN.match(filename)
        if match:
            ids.append((filename, match.group(1)))
    return ids


def build_parser():
    """Create the argument parser with all sub-commands."""
    def add_files_argument(subparser):
        subparser.add_argument("files", action="append", nargs="+", help="Files containing distinct movie-ids")

    parser = argparse.ArgumentParser(description="get movie data", add_help=False)
    parser.add_argument("--help", action=HelpAction, help="Display full help")
    parser.add_argument("--log-level", action='store', type=int, help="Set log level (CRITICAL=50,ERROR=40,WARNING=30,INFO=20,DEBUG=10,NOTSET=0")
    parser.add_argument("-h", action=argparse._HelpAction, help="Display short help")
    subparsers = parser.add_subparsers()

    parser_aka = subparsers.add_parser("aka", add_help=False, help="Print alternative title in other languages")
    # The help text always promised this default; now it is actually applied.
    parser_aka.add_argument("--lang", default="DE", help="Language code (default 'DE')")
    parser_aka.set_defaults(func=do_aka)
    add_files_argument(parser_aka)

    parser_data = subparsers.add_parser("data", add_help=False, help="Print all available data")
    parser_data.set_defaults(func=do_data)
    add_files_argument(parser_data)

    parser_year = subparsers.add_parser("year", add_help=False, help="Print only the release year")
    parser_year.set_defaults(func=do_year)
    add_files_argument(parser_year)

    parser_prune = subparsers.add_parser("prune", add_help=False, help="Delete cache entries")
    parser_prune.set_defaults(func=do_prune)
    add_files_argument(parser_prune)

    parser_rating = subparsers.add_parser("rating", add_help=False, help="Print movie ratings")
    parser_rating.set_defaults(func=do_rating)
    add_files_argument(parser_rating)

    parser_index = subparsers.add_parser("index", add_help=False, help="Generate index.html file")
    parser_index.set_defaults(func=do_index)
    add_files_argument(parser_index)

    parser_test = subparsers.add_parser("test", add_help=False, help="Run testsuite")
    parser_test.set_defaults(func=do_test)
    return parser


def main(argv=None):
    """Parse *argv* (default: ``sys.argv[1:]``) and run the chosen command."""
    parser = build_parser()
    args = parser.parse_args(sys.argv[1:] if argv is None else argv)
    logging.basicConfig(level=args.log_level)
    if not hasattr(args, "func"):
        # No sub-command given: report it instead of an AttributeError.
        parser.error("no command given, use --help for the list of commands")

    # Read the key only after parsing so that help works without a key.
    tmdb.API_KEY = read_key()
    if "files" in args:
        # action="append" combined with nargs="+" yields a list of lists.
        args.func(args, extract_imdb_ids(args.files[0]))
    else:
        args.func(args)


if __name__ == "__main__":
    main()