diff --git a/scanner/scanner/models/movie.py b/scanner/scanner/models/movie.py index 72b18eb7..27de511b 100644 --- a/scanner/scanner/models/movie.py +++ b/scanner/scanner/models/movie.py @@ -29,11 +29,11 @@ class Movie(Model): air_date: date | None external_id: dict[str, MetadataId] - translations: dict[str, MovieTranslation] = {} - videos: list[str] = [] + translations: dict[Language, MovieTranslation] = {} collections: list[Collection] = [] studios: list[Studio] = [] staff: list[Staff] = [] + videos: list[str] = [] class MovieTranslation(Model): diff --git a/scanner/scanner/models/studio.py b/scanner/scanner/models/studio.py index ffd83336..eff7840b 100644 --- a/scanner/scanner/models/studio.py +++ b/scanner/scanner/models/studio.py @@ -7,9 +7,9 @@ from .metadataid import MetadataId class Studio(Model): slug: str external_id: dict[str, MetadataId] - translations: dict[str, StudioTranslations] = {} + translations: dict[str, StudioTranslation] = {} -class StudioTranslations(Model): +class StudioTranslation(Model): name: str logo: str | None diff --git a/scanner/scanner/providers/themoviedatabase.py b/scanner/scanner/providers/themoviedatabase.py index 85a8fb4b..02b87ea7 100644 --- a/scanner/scanner/providers/themoviedatabase.py +++ b/scanner/scanner/providers/themoviedatabase.py @@ -1,23 +1,25 @@ import asyncio -from aiohttp import ClientSession +import os from datetime import datetime, timedelta -from logging import getLogger -from typing import Awaitable, Callable, Dict, List, Optional, Any, TypeVar from itertools import accumulate, zip_longest +from logging import getLogger +from typing import Any, Awaitable, Callable, Dict, List, Optional, TypeVar, override + +from aiohttp import ClientSession from langcodes import Language -from providers.utils import ProviderError from matcher.cache import cache -from ..provider import Provider -from ..types.movie import Movie, MovieTranslation, Status as MovieStatus -from ..types.season import Season, SeasonTranslation -from ..types.episode import Episode, EpisodeTranslation, PartialShow, EpisodeID -from ..types.studio import Studio -from ..types.genre import Genre -from ..types.metadataid import MetadataID -from ..types.show import Show, ShowTranslation, Status as ShowStatus -from ..types.collection import Collection, CollectionTranslation +from ..models.collection import Collection, CollectionTranslation +from ..models.entry import Entry, EntryTranslation +from ..models.genre import Genre +from ..models.metadataid import EpisodeId, MetadataId +from ..models.movie import Movie, MovieStatus, MovieTranslation +from ..models.season import Season, SeasonTranslation +from ..models.serie import Serie, SerieStatus, SerieTranslation +from ..models.studio import Studio, StudioTranslation +from ..utils import clean, to_slug +from .provider import Provider, ProviderError logger = getLogger(__name__) @@ -27,16 +29,16 @@ class TheMovieDatabase(Provider): def __init__( self, - languages: list[str], client: ClientSession, api_key: str, ) -> None: super().__init__() - self._languages = [Language.get(l) for l in languages] self._client = client - self.base = "https://api.themoviedb.org/3" - self.api_key = api_key - self.genre_map = { + self._base = "https://api.themoviedb.org/3" + self._api_key = ( + os.environ.get("THEMOVIEDB_APIKEY") or TheMovieDatabase.DEFAULT_API_KEY + ) + self._genre_map = { 28: Genre.ACTION, 12: Genre.ADVENTURE, 16: Genre.ANIMATION, @@ -57,7 +59,7 @@ class TheMovieDatabase(Provider): 37: Genre.WESTERN, 10759: [Genre.ACTION, Genre.ADVENTURE], 10762: Genre.KIDS, - 10763: Genre.NEWS, + 10763: [], 10764: Genre.REALITY, 10765: [Genre.SCIENCE_FICTION, Genre.FANTASY], 10766: Genre.SOAP, @@ -66,6 +68,7 @@ class TheMovieDatabase(Provider): } @property + @override def name(self) -> str: return "themoviedatabase" @@ -76,12 +79,9 @@ class TheMovieDatabase(Provider): return [x] return flatten( - [self.genre_map[x["id"]] for x in genres if x["id"] in self.genre_map] + [self._genre_map[x["id"]] for x in genres if x["id"] in self._genre_map] ) - def get_languages(self, *args) -> list[Language]: - return self._languages + list(args) - async def get( self, path: str, @@ -91,110 +91,54 @@ class TheMovieDatabase(Provider): ): params = {k: v for k, v in params.items() if v is not None} async with self._client.get( - f"{self.base}/{path}", params={"api_key": self.api_key, **params} + f"{self._base}/{path}", params={"api_key": self._api_key, **params} ) as r: if not_found_fail and r.status == 404: raise ProviderError(not_found_fail) r.raise_for_status() return await r.json() - T = TypeVar("T") - - def merge_translations(self, host, translations, *, languages: list[Language]): - host.translations = { - k.to_tag(): v.translations[k.to_tag()] - for k, v in zip(languages, translations) - } - return host - - async def process_translations( - self, - for_language: Callable[[str], Awaitable[T]], - languages: list[Language], - post_merge: Callable[[T, list[T]], T] | None = None, - ) -> T: - tasks = map(lambda lng: for_language(lng), languages) - items: list[Any] = await asyncio.gather(*tasks) - item = self.merge_translations(items[0], items, languages=languages) - if post_merge: - item = post_merge(item, items) - return item - - def get_image(self, images: list[Dict[str, Any]]) -> list[str]: - return [ - f"https://image.tmdb.org/t/p/original{x['file_path']}" - for x in images - if x["file_path"] - ] - def to_studio(self, company: dict[str, Any]) -> Studio: return Studio( - name=company["name"], - logos=[f"https://image.tmdb.org/t/p/original{company['logo_path']}"] - if "logo_path" in company - else [], + slug=to_slug(company["name"]), external_id={ - self.name: MetadataID( - company["id"], f"https://www.themoviedb.org/company/{company['id']}" + self.name: MetadataId( + data_id=company["id"], + link=f"https://www.themoviedb.org/company/{company['id']}", ) }, + translations={ + "en": StudioTranslation( + name=company["name"], + logo=f"https://image.tmdb.org/t/p/original{company['logo_path']}" + if "logo_path" in company + else None, + ), + }, ) - def get_best_image( - self, item: dict[str, Any], lng: Language, key: str - ) -> list[dict]: - """ - Retrieves the best available images for a item based on localization. + def _pick_image(self, item: dict[str, Any], lng: str, key: str) -> str | None: + base_path = "https://image.tmdb.org/t/p/original" - Args: - item (dict): A dictionary containing item information, including images and language details. - lng (Language): The preferred language for the images. - key (str): The key to access the images in the item dictionary. (e.g. "posters", "backdrops", "logos") - Returns: - list: A list of images, prioritized by localization, original language, and any available image. - """ - # Order images by size and vote average - item["images"][key] = sorted( + images = sorted( item["images"][key], key=lambda x: (x.get("vote_average", 0), x.get("width", 0)), reverse=True, ) - # Step 1: Try to get localized images - localized_images = [ - image - for image in item["images"][key] - if image.get("iso_639_1") == lng.language - ] - - # Step 2: If no localized images, try images in the original language - if not localized_images: - localized_images = [ - image - for image in item["images"][key] - if image.get("iso_639_1") == item.get("original_language") - ] - - # Step 3: If still no images, use any available images - if not localized_images: - localized_images = item["images"][key] - - # Step 4: If there are no images at all, fallback to _path attribute. - if not localized_images: - localized_images = self._get_image_fallback(item, key) - - return self.get_image(localized_images) - - def _get_image_fallback(self, item: dict[str, Any], key: str) -> list[dict]: - """ - Fallback to _path attribute if there are no images available in the images list. - """ - if key == "posters": - return [{"file_path": item.get("poster_path")}] - elif key == "backdrops": - return [{"file_path": item.get("backdrop_path")}] - - return [] + # check images in your language + localized = next((x for x in images if x["iso_639_1"] == lng), None) + if localized: + return base_path + localized + # if failed, check images without text + notext = next((x for x in images if x["iso_639_1"] == None), None) + if notext: + return base_path + notext + # take a random image, it's better than nothing + random_img = next((x for x in images if x["iso_639_1"] == None), None) + if random_img: + return base_path + random_img + return None async def search_movie(self, name: str, year: Optional[int]) -> Movie: search_results = ( @@ -208,94 +152,107 @@ class TheMovieDatabase(Provider): search["id"], original_language=original_language ) - async def identify_movie( - self, movie_id: str, original_language: Optional[Language] = None - ) -> Movie: - languages = self.get_languages() + @override + async def get_movie(self, external_id: dict[str, str]) -> Movie | None: + # TODO: fallback to search via another id + if self.name not in external_id: + return None - async def for_language(lng: Language) -> Movie: - movie = await self.get( - f"movie/{movie_id}", - params={ - "language": lng.to_tag(), - "append_to_response": "alternative_titles,videos,credits,keywords,images", - "include_image_language": f"{lng.language},null,{original_language.language if original_language else ''}", - }, - ) - logger.debug("TMDb responded: %s", movie) + movie = await self.get( + f"movie/{external_id[self.name]}", + params={ + "append_to_response": "alternative_titles,videos,credits,keywords,images,translations", + }, + ) + logger.debug("TMDb responded: %s", movie) - ret = Movie( - original_language=movie["original_language"], - aliases=[x["title"] for x in movie["alternative_titles"]["titles"]], - air_date=datetime.strptime(movie["release_date"], "%Y-%m-%d").date() - if movie["release_date"] - else None, - status=MovieStatus.FINISHED - if movie["status"] == "Released" - else MovieStatus.PLANNED, - rating=round(float(movie["vote_average"]) * 10), - runtime=int(movie["runtime"]) if movie["runtime"] is not None else None, - studios=[self.to_studio(x) for x in movie["production_companies"]], - genres=self.process_genres(movie["genres"]), - external_id=( + return Movie( + slug=to_slug(movie["title"]), + original_language=Language.get(movie["original_language"]), + genres=self.process_genres(movie["genres"]), + rating=round(float(movie["vote_average"]) * 10), + status=MovieStatus.FINISHED + if movie["status"] == "Released" + else MovieStatus.PLANNED, + runtime=int(movie["runtime"]) if movie["runtime"] is not None else None, + air_date=datetime.strptime(movie["release_date"], "%Y-%m-%d").date() + if movie["release_date"] + else None, + external_id=( + { + self.name: MetadataId( + data_id=movie["id"], + link=f"https://www.themoviedb.org/movie/{movie['id']}", + ) + } + | ( { - self.name: MetadataID( - movie["id"], - f"https://www.themoviedb.org/movie/{movie['id']}", + "imdb": MetadataId( + data_id=movie["imdb_id"], + link=f"https://www.imdb.com/title/{movie['imdb_id']}", ) } - | ( - { - "imdb": MetadataID( - movie["imdb_id"], - f"https://www.imdb.com/title/{movie['imdb_id']}", - ) - } - if movie["imdb_id"] - else {} + if movie["imdb_id"] + else {} + ) + ), + translations={ + Language.get( + f"{trans['iso_639_1']}-{trans['iso_3166_1']}" + ): MovieTranslation( + name=clean(trans["data"]["title"]) + or ( + clean(movie["original_title"]) + if movie["original_language"] == trans["iso_639_1"] + else None ) - ), - collections=[ - Collection( - external_id={ - self.name: MetadataID( - movie["belongs_to_collection"]["id"], - f"https://www.themoviedb.org/collection/{movie['belongs_to_collection']['id']}", - ) - }, - ) - ] - if movie["belongs_to_collection"] is not None - else [], - # TODO: Add cast information - ) - translation = MovieTranslation( - name=movie["title"], - tagline=movie["tagline"] if movie["tagline"] else None, - tags=list(map(lambda x: x["name"], movie["keywords"]["keywords"])), - overview=movie["overview"], - posters=self.get_best_image(movie, lng, "posters"), - logos=self.get_best_image(movie, lng, "logos"), - thumbnails=self.get_best_image(movie, lng, "backdrops"), - trailers=[ - f"https://www.youtube.com/watch?v={x['key']}" - for x in movie["videos"]["results"] - if x["type"] == "Trailer" and x["site"] == "YouTube" - ], - ) - ret.translations = {lng.to_tag(): translation} - return ret - - ret = await self.process_translations(for_language, languages) - if ( - ret.original_language is not None - and ret.original_language not in ret.translations - ): - orig_language = Language.get(ret.original_language) - ret.translations[orig_language.to_tag()] = ( - await for_language(orig_language) - ).translations[orig_language.to_tag()] - return ret + or movie["title"], + latin_name=next( + ( + x["title"] + for x in movie["alternative_titles"]["titles"] + if x["iso_3166_1"] == trans["iso_3166_1"] + and x["type"] == "Romaji" + ), + None, + ), + description=clean(trans["data"]["overview"]), + tagline=clean(trans["data"]["tagline"]), + aliases=[ + x["title"] + for x in movie["alternative_titles"]["titles"] + if x["iso_3166_1"] == trans["iso_3166_1"] + ], + tags=[x["name"] for x in movie["keywords"]["keywords"]], + poster=self._pick_image(movie, trans["iso_639_1"], "posters"), + logo=self._pick_image(movie, trans["iso_639_1"], "logos"), + banner=None, + thumbnail=self._pick_image(movie, trans["iso_639_1"], "backdrops"), + trailer=None, + # TODO: should the trailer be added? or all of them as extra? + # [ + # f"https://www.youtube.com/watch?v={x['key']}" + # for x in movie["videos"]["results"] + # if x["type"] == "Trailer" and x["site"] == "YouTube" + # ], + ) + for trans in movie["translations"]["translations"] + }, + collections=[ + # Collection( + # external_id={ + # self.name: MetadataID( + # movie["belongs_to_collection"]["id"], + # f"https://www.themoviedb.org/collection/{movie['belongs_to_collection']['id']}", + # ) + # }, + # ) + ] + if movie["belongs_to_collection"] is not None + else [], + studios=[self.to_studio(x) for x in movie["production_companies"]], + staff=[], + ) @cache(ttl=timedelta(days=1)) async def identify_show( @@ -363,9 +320,9 @@ class TheMovieDatabase(Provider): tagline=show["tagline"] if show["tagline"] else None, tags=list(map(lambda x: x["name"], show["keywords"]["results"])), overview=show["overview"], - posters=self.get_best_image(show, lng, "posters"), - logos=self.get_best_image(show, lng, "logos"), - thumbnails=self.get_best_image(show, lng, "backdrops"), + posters=self._pick_image(show, lng, "posters"), + logos=self._pick_image(show, lng, "logos"), + thumbnails=self._pick_image(show, lng, "backdrops"), trailers=[ f"https://www.youtube.com/watch?v={x['key']}" for x in show["videos"]["results"] @@ -759,9 +716,9 @@ class TheMovieDatabase(Provider): translation = CollectionTranslation( name=collection["name"], overview=collection["overview"], - posters=self.get_best_image(collection, lng, "posters"), + posters=self._pick_image(collection, lng, "posters"), logos=[], - thumbnails=self.get_best_image(collection, lng, "backdrops"), + thumbnails=self._pick_image(collection, lng, "backdrops"), ) ret.translations = {lng.to_tag(): translation} return ret diff --git a/scanner/scanner/utils.py b/scanner/scanner/utils.py index 19ad42ca..7c0127f5 100644 --- a/scanner/scanner/utils.py +++ b/scanner/scanner/utils.py @@ -1,22 +1,19 @@ -from datetime import date - from langcodes import Language from pydantic import AliasGenerator, BaseModel, ConfigDict from pydantic.alias_generators import to_camel -def format_date(date: date | int | None) -> str | None: - if date is None: - return None - if isinstance(date, int): - return f"{date}-01-01" - return date.isoformat() - - def normalize_lang(lang: str) -> str: return str(Language.get(lang)) +def to_slug(title: str) -> str: + return title + +def clean(val: str) -> str | None: + return val or None + + class Model(BaseModel): model_config = ConfigDict( use_enum_values=True,