import asyncio
from datetime import datetime
import logging
from typing import Callable, Dict, Optional, Any

from aiohttp import ClientSession

from providers.types.genre import Genre
from providers.types.metadataid import MetadataID

from ..provider import Provider
from ..types.movie import Movie, MovieTranslation
from ..types.status import Status


class TheMovieDatabase(Provider):
    """Metadata provider that queries the TMDb v3 HTTP API for movies."""

    def __init__(self, client: ClientSession, api_key: str) -> None:
        """Store the HTTP session and API key used for every request.

        Args:
            client: aiohttp session used to perform the requests.
            api_key: Key sent as the ``api_key`` query parameter.
        """
        super().__init__()
        self._client = client
        self.base = "https://api.themoviedb.org/3"
        self.api_key = api_key
        # Maps TMDb numeric genre ids to the project's Genre enum. Ids that
        # are missing from this table are skipped when building a Movie.
        self.genre_map = {
            28: Genre.ACTION,
            12: Genre.ADVENTURE,
            16: Genre.ANIMATION,
            35: Genre.COMEDY,
            80: Genre.CRIME,
            99: Genre.DOCUMENTARY,
            18: Genre.DRAMA,
            10751: Genre.FAMILY,
            14: Genre.FANTASY,
            36: Genre.HISTORY,
            27: Genre.HORROR,
            10402: Genre.MUSIC,
            9648: Genre.MYSTERY,
            10749: Genre.ROMANCE,
            878: Genre.SCIENCE_FICTION,
            53: Genre.THRILLER,
            10752: Genre.WAR,
            37: Genre.WESTERN,
        }

    async def get(self, path: str, *, params: Optional[dict[str, Any]] = None) -> Any:
        """Perform a GET request against the API and return the decoded JSON.

        Args:
            path: Endpoint path relative to ``self.base``. Leading slashes
                are ignored so both ``"movie/1"`` and ``"/movie/1"`` work.
            params: Extra query parameters. Entries whose value is ``None``
                are dropped before the request is sent.

        Returns:
            The JSON body of the response.

        Raises:
            Whatever ``raise_for_status()`` raises for an error response.
        """
        query = {k: v for k, v in (params or {}).items() if v is not None}
        # Strip leading slashes so the URL never contains "//" after the base.
        url = f"{self.base}/{path.lstrip('/')}"
        async with self._client.get(
            url, params={"api_key": self.api_key, **query}
        ) as r:
            r.raise_for_status()
            return await r.json()

    def get_image(self, images: list[Dict[str, Any]]) -> list[str]:
        """Build full-size image URLs from a list of image entries.

        Entries whose ``file_path`` is falsy are skipped.
        """
        return [
            f"https://image.tmdb.org/t/p/original{x['file_path']}"
            for x in images
            if x["file_path"]
        ]

    async def identify_movie(
        self, name: str, year: Optional[int], *, language: list[str]
    ) -> Movie:
        """Search a movie by name/year and fetch its details per language.

        The first search result is used. One detail request is issued for
        each requested language, plus the movie's original language when it
        is not already requested; the translations are merged into the Movie
        built from the first language.

        Args:
            name: Title used for the search query.
            year: Optional year filter; omitted from the query when ``None``.
            language: Language codes to fetch translations for. The list is
                not modified.

        Returns:
            A Movie whose ``translations`` maps each language code to its
            MovieTranslation.

        Raises:
            IndexError: If the search returns no result.
        """
        results = (
            await self.get("search/movie", params={"query": name, "year": year})
        )["results"]
        if not results:
            # Same exception type as the bare `[0]` lookup used to raise,
            # but with a message that says what was being searched.
            raise IndexError(f"No TMDb result for movie {name!r} (year: {year})")
        search = results[0]
        movie_id = search["id"]

        # Work on a copy: appending to the caller's list would make it grow
        # on every call when the same list object is reused.
        languages = list(language)
        if search["original_language"] not in languages:
            languages.append(search["original_language"])

        async def for_language(lng: str) -> Movie:
            """Fetch the movie details in a single language."""
            movie = await self.get(
                f"movie/{movie_id}",
                params={
                    "language": lng,
                    "append_to_response": "alternative_titles,videos,credits,keywords,images",
                },
            )
            logging.debug("TMDb responded: %s", movie)
            # TODO: Use collection data

            ret = Movie(
                original_language=movie["original_language"],
                aliases=[x["title"] for x in movie["alternative_titles"]["titles"]],
                # NOTE(review): strptime raises ValueError on an empty
                # release_date; confirm whether the API can return one and
                # whether Movie accepts a missing date.
                release_date=datetime.strptime(
                    movie["release_date"], "%Y-%m-%d"
                ).date(),
                status=Status.FINISHED
                if movie["status"] == "Released"
                else Status.PLANNED,
                studios=[x["name"] for x in movie["production_companies"]],
                genres=[
                    self.genre_map[x["id"]]
                    for x in movie["genres"]
                    if x["id"] in self.genre_map
                ],
                external_id={
                    "themoviedatabase": MetadataID(
                        movie["id"], f"https://www.themoviedb.org/movie/{movie['id']}"
                    ),
                    # NOTE(review): if imdb_id can be null this produces a
                    # ".../title/None" link; verify against the API.
                    "imdb": MetadataID(
                        movie["imdb_id"],
                        f"https://www.imdb.com/title/{movie['imdb_id']}",
                    ),
                }
                # TODO: Add cast information
            )
            translation = MovieTranslation(
                name=movie["title"],
                tagline=movie["tagline"],
                keywords=[x["name"] for x in movie["keywords"]["keywords"]],
                overview=movie["overview"],
                posters=self.get_image(movie["images"]["posters"]),
                logos=self.get_image(movie["images"]["logos"]),
                thumbnails=self.get_image(movie["images"]["backdrops"]),
                trailers=[
                    # The video id goes in the `v` query parameter.
                    f"https://www.youtube.com/watch?v={x['key']}"
                    for x in movie["videos"]["results"]
                    if x["type"] == "Trailer" and x["site"] == "YouTube"
                ],
            )
            ret.translations = {lng: translation}
            return ret

        # TODO: make the following generic
        movies: list[Movie] = await asyncio.gather(
            *(for_language(lng) for lng in languages)
        )
        # gather() preserves argument order, so movies[i] matches languages[i].
        movie = movies[0]
        movie.translations = {
            lng: fetched.translations[lng] for lng, fetched in zip(languages, movies)
        }
        return movie