diff --git a/scanner/scanner/providers/themoviedatabase.py b/scanner/scanner/providers/themoviedatabase.py index bc78cada..8fcd1cb9 100644 --- a/scanner/scanner/providers/themoviedatabase.py +++ b/scanner/scanner/providers/themoviedatabase.py @@ -3,21 +3,21 @@ from datetime import datetime, timedelta from itertools import accumulate, zip_longest from logging import getLogger from statistics import mean -from typing import Any, Generator, List, Optional, override +from typing import Any, Generator, Optional, override from aiohttp import ClientSession from langcodes import Language from matcher.cache import cache -from scanner.models.staff import Character, Person, Role, Staff from ..models.collection import Collection, CollectionTranslation from ..models.entry import Entry, EntryTranslation from ..models.genre import Genre from ..models.metadataid import EpisodeId, MetadataId -from ..models.movie import Movie, MovieStatus, MovieTranslation +from ..models.movie import Movie, MovieStatus, MovieTranslation, SearchMovie from ..models.season import Season, SeasonTranslation from ..models.serie import Serie, SerieStatus, SerieTranslation +from ..models.staff import Character, Person, Role, Staff from ..models.studio import Studio, StudioTranslation from ..utils import clean, to_slug from .provider import Provider, ProviderError @@ -73,104 +73,38 @@ class TheMovieDatabase(Provider): def name(self) -> str: return "themoviedatabase" - async def _get( - self, - path: str, - *, - params: dict[str, Any] = {}, - not_found_fail: Optional[str] = None, - ): - params = {k: v for k, v in params.items() if v is not None} - async with self._client.get( - f"{self._base}/{path}", params={"api_key": self._api_key, **params} - ) as r: - if not_found_fail and r.status == 404: - raise ProviderError(not_found_fail) - r.raise_for_status() - return await r.json() - - def _map_genres(self, genres: Generator[int]) -> list[Genre]: - def flatten(x: Genre | list[Genre] | list[Genre | list[Genre]]) -> list[Genre]: - if isinstance(x, list): - return [j for i in x for j in flatten(i)] - return [x] - - return flatten([self._genre_map[x] for x in genres if x in self._genre_map]) - - def _map_studio(self, company: dict[str, Any]) -> Studio: - return Studio( - slug=to_slug(company["name"]), - external_id={ - self.name: MetadataId( - data_id=company["id"], - link=f"https://www.themoviedb.org/company/{company['id']}", - ) - }, - translations={ - "en": StudioTranslation( - name=company["name"], - logo=f"https://image.tmdb.org/t/p/original{company['logo_path']}" - if "logo_path" in company - else None, - ), - }, - ) - - def _map_staff(self, person: dict[str, Any]) -> Staff: - return Staff( - # TODO: map those to Role (see https://developer.themoviedb.org/reference/configuration-jobs for list) - kind=person["known_for_department"], - character=Character( - name=person["character"], - latin_name=None, - image=None, - ), - staff=Person( - slug=to_slug(person["name"]), - name=person["name"], - latin_name=person["original_name"], - image=self._image_path + person["profile_path"], + @override + async def search_movies( + self, title: str, year: int | None, *, language: list[Language] + ) -> list[SearchMovie]: + search = ( + await self._get( + "search/movie", + params={ + "query": title, + "year": year, + "languages": [str(x) for x in language], + }, + ) + )["results"] + search = self._sort_search(search, title, year) + return [ + SearchMovie( + slug=to_slug(x["title"]), + name=x["title"], + description=x["overview"], + air_date=datetime.strptime(x["release_date"], "%Y-%m-%d").date(), + poster=self._image_path + x["poster_path"], + original_language=Language.get(x["original_language"]), external_id={ self.name: MetadataId( - data_id=person["id"], - link=f"https://www.themoviedb.org/person/{person['id']}", + data_id=x["id"], + link=f"https://www.themoviedb.org/movie/{x['id']}", ) }, - ), - ) - - def _pick_image(self, item: dict[str, Any], lng: str, key: str) -> str | None: - images = sorted( - item["images"][key], - key=lambda x: (x.get("vote_average", 0), x.get("width", 0)), - reverse=True, - ) - - # check images in your language - localized = next((x for x in images if x["iso_639_1"] == lng), None) - if localized: - return self._image_path + localized - # if failed, check images without text - notext = next((x for x in images if x["iso_639_1"] == None), None) - if notext: - return self._image_path + notext - # take a random image, it's better than nothing - random_img = next((x for x in images if x["iso_639_1"] == None), None) - if random_img: - return self._image_path + random_img - return None - - async def search_movie(self, name: str, year: Optional[int]) -> Movie: - search_results = ( - await self._get("search/movie", params={"query": name, "year": year}) - )["results"] - if len(search_results) == 0: - raise ProviderError(f"No result for a movie named: {name}") - search = self.get_best_result(search_results, name, year) - original_language = Language.get(search["original_language"]) - return await self.identify_movie( - search["id"], original_language=original_language - ) + ) + for x in search + ] @override async def get_movie(self, external_id: dict[str, str]) -> Movie | None: @@ -425,7 +359,7 @@ class TheMovieDatabase(Provider): if len(search_results) == 0: raise ProviderError(f"No result for a tv show named: {name}") - search = self.get_best_result(search_results, name, year) + search = self._sort_search(search_results, name, year) show_id = search["id"] return PartialShow( name=search["name"], @@ -523,44 +457,6 @@ class TheMovieDatabase(Provider): return await self.process_translations(for_language, self.get_languages()) - def get_best_result( - self, search_results: List[Any], name: str, year: Optional[int] - ) -> Any: - results = search_results - - # Find perfect match by year since sometime tmdb decides to discard the year parameter. - if year: - results = list( - x - for x in search_results - if ("first_air_date" in x and x["first_air_date"].startswith(str(year))) - or ("release_date" in x and x["release_date"].startswith(str(year))) - ) - if not results: - results = search_results - - # If there is a perfect match use it (and if there are multiple, use the most popular one) - res = sorted( - ( - x - for x in results - if ("name" in x and x["name"].casefold() == name.casefold()) - or ("title" in x and x["title"].casefold() == name.casefold()) - ), - key=lambda x: (x["vote_count"], x["popularity"]), - reverse=True, - ) - if res: - results = res - else: - # Ignore totally unpopular shows or unknown ones. - # sorted is stable and False Any: + results = search + + # Find perfect match by year since sometime tmdb decides to discard the year parameter. + if year: + results = [ + x + for x in search + if ("first_air_date" in x and x["first_air_date"].startswith(str(year))) + or ("release_date" in x and x["release_date"].startswith(str(year))) + ] + if not results: + results = search + + # If there is a perfect match use it (and if there are multiple, use the most popular one) + res = sorted( + ( + x + for x in results + if ("name" in x and x["name"].casefold() == name.casefold()) + or ("title" in x and x["title"].casefold() == name.casefold()) + ), + key=lambda x: (x["vote_count"], x["popularity"]), + reverse=True, + ) + if res: + results = res + else: + # Ignore totally unpopular shows or unknown ones. + # sorted is stable and False list[Genre]: + def flatten(x: Genre | list[Genre] | list[Genre | list[Genre]]) -> list[Genre]: + if isinstance(x, list): + return [j for i in x for j in flatten(i)] + return [x] + + return flatten([self._genre_map[x] for x in genres if x in self._genre_map]) + + def _map_studio(self, company: dict[str, Any]) -> Studio: + return Studio( + slug=to_slug(company["name"]), + external_id={ + self.name: MetadataId( + data_id=company["id"], + link=f"https://www.themoviedb.org/company/{company['id']}", + ) + }, + translations={ + "en": StudioTranslation( + name=company["name"], + logo=f"https://image.tmdb.org/t/p/original{company['logo_path']}" + if "logo_path" in company + else None, + ), + }, + ) + + def _map_staff(self, person: dict[str, Any]) -> Staff: + return Staff( + # TODO: map those to Role (see https://developer.themoviedb.org/reference/configuration-jobs for list) + kind=person["known_for_department"], + character=Character( + name=person["character"], + latin_name=None, + image=None, + ), + staff=Person( + slug=to_slug(person["name"]), + name=person["name"], + latin_name=person["original_name"], + image=self._image_path + person["profile_path"], + external_id={ + self.name: MetadataId( + data_id=person["id"], + link=f"https://www.themoviedb.org/person/{person['id']}", + ) + }, + ), + ) + + def _pick_image(self, item: dict[str, Any], lng: str, key: str) -> str | None: + images = sorted( + item["images"][key], + key=lambda x: (x.get("vote_average", 0), x.get("width", 0)), + reverse=True, + ) + + # check images in your language + localized = next((x for x in images if x["iso_639_1"] == lng), None) + if localized: + return self._image_path + localized + # if failed, check images without text + notext = next((x for x in images if x["iso_639_1"] == None), None) + if notext: + return self._image_path + notext + # take a random image, it's better than nothing + random_img = next((x for x in images if x["iso_639_1"] == None), None) + if random_img: + return self._image_path + random_img + return None