From 28fe8bd9bc1ddbbec6e953ca0b3cdda8240c1e72 Mon Sep 17 00:00:00 2001 From: Felipe Marinho Date: Thu, 14 Nov 2024 15:12:56 -0300 Subject: [PATCH] chg: refactor: use langcodes instead of string for language representation --- .../implementations/themoviedatabase.py | 98 +++++++++++-------- 1 file changed, 57 insertions(+), 41 deletions(-) diff --git a/scanner/providers/implementations/themoviedatabase.py b/scanner/providers/implementations/themoviedatabase.py index 93bdc8f5..b5b8b86d 100644 --- a/scanner/providers/implementations/themoviedatabase.py +++ b/scanner/providers/implementations/themoviedatabase.py @@ -4,6 +4,7 @@ from datetime import datetime, timedelta from logging import getLogger from typing import Awaitable, Callable, Dict, List, Optional, Any, TypeVar from itertools import accumulate, zip_longest +from langcodes import Language from providers.utils import ProviderError from matcher.cache import cache @@ -31,7 +32,7 @@ class TheMovieDatabase(Provider): api_key: str, ) -> None: super().__init__() - self._languages = languages + self._languages = [Language.get(l) for l in languages] self._client = client self.base = "https://api.themoviedb.org/3" self.api_key = api_key @@ -78,7 +79,7 @@ class TheMovieDatabase(Provider): [self.genre_map[x["id"]] for x in genres if x["id"] in self.genre_map] ) - def get_languages(self, *args): + def get_languages(self, *args) -> list[Language]: return self._languages + list(args) async def get( @@ -99,16 +100,16 @@ class TheMovieDatabase(Provider): T = TypeVar("T") - def merge_translations(self, host, translations, *, languages: list[str]): + def merge_translations(self, host, translations, *, languages: list[Language]): host.translations = { - k: v.translations[k] for k, v in zip(languages, translations) + k.to_tag(): v.translations[k.to_tag()] for k, v in zip(languages, translations) } return host async def process_translations( self, for_language: Callable[[str], Awaitable[T]], - languages: list[str], + languages: list[Language], post_merge: Callable[[T, list[T]], T] | None = None, ) -> T: tasks = map(lambda lng: for_language(lng), languages) @@ -138,13 +139,13 @@ class TheMovieDatabase(Provider): }, ) - def get_best_image(self, item, lng, key): + async def get_best_image(self, item, lng, key): """ Retrieves the best available images for a item based on localization. Args: item (dict): A dictionary containing item information, including images and language details. - lng (str): The preferred language code for the images in ISO 639-1 format. + lng (Language): The preferred language for the images. key (str): The key to access the images in the item dictionary. (e.g. "posters", "backdrops", "logos") Returns: list: A list of images, prioritized by localization, original language, and any available image. @@ -160,7 +161,7 @@ class TheMovieDatabase(Provider): localized_images = [ image for image in item["images"][key] - if image.get("iso_639_1") == lng + if image.get("iso_639_1") == lng.language ] # Step 2: If no localized images, try images in the original language @@ -175,6 +176,23 @@ class TheMovieDatabase(Provider): if not localized_images: localized_images = item["images"][key] + # Corner case: If there are no images at all, call TMDB images API. + # Although doing another API call is not ideal, this would only be called very rarely. + if not localized_images: + logger.debug( + "No images found for %s %s. Calling TMDB images API.", + item["media_type"], + item["id"], + ) + images = await self.get( + f"{item['media_type']}/{item['id']}/images", + ) + localized_images = sorted( + images[key], + key=lambda x: (x.get("width", 0), x.get("vote_average", 0)), + reverse=True, + ) + return localized_images async def search_movie(self, name: str, year: Optional[int]) -> Movie: @@ -189,14 +207,13 @@ class TheMovieDatabase(Provider): async def identify_movie(self, movie_id: str, original_language: str) -> Movie: languages = self.get_languages() - async def for_language(lng: str) -> Movie: - lng_iso639_1 = lng.split("-").pop(0) + async def for_language(lng: Language) -> Movie: movie = await self.get( f"movie/{movie_id}", params={ - "language": lng, + "language": lng.to_tag(), "append_to_response": "alternative_titles,videos,credits,keywords,images", - "include_image_language": f"{lng_iso639_1},null,{original_language}", + "include_image_language": f"{lng.language},null,{original_language}", }, ) logger.debug("TMDb responded: %s", movie) @@ -252,11 +269,11 @@ class TheMovieDatabase(Provider): tags=list(map(lambda x: x["name"], movie["keywords"]["keywords"])), overview=movie["overview"], posters=self.get_image( - self.get_best_image(movie, lng_iso639_1, "posters") + await self.get_best_image(movie, lng, "posters") ), - logos=self.get_image(self.get_best_image(movie, lng_iso639_1, "logos")), + logos=self.get_image(await self.get_best_image(movie, lng, "logos")), thumbnails=self.get_image( - self.get_best_image(movie, lng_iso639_1, "backdrops") + await self.get_best_image(movie, lng, "backdrops") ), trailers=[ f"https://www.youtube.com/watch?v={x['key']}" @@ -264,7 +281,7 @@ class TheMovieDatabase(Provider): if x["type"] == "Trailer" and x["site"] == "YouTube" ], ) - ret.translations = {lng: translation} + ret.translations = {lng.to_tag(): translation} return ret ret = await self.process_translations(for_language, languages) @@ -272,9 +289,10 @@ class TheMovieDatabase(Provider): ret.original_language is not None and ret.original_language not in ret.translations ): - ret.translations[ret.original_language] = ( - await for_language(ret.original_language) - ).translations[ret.original_language] + orig_language = Language.get(ret.original_language) + ret.translations[orig_language.to_tag()] = ( + await for_language(orig_language) + ).translations[orig_language.to_tag()] return ret @cache(ttl=timedelta(days=1)) @@ -284,16 +302,13 @@ class TheMovieDatabase(Provider): ) -> Show: languages = self.get_languages() - async def for_language(lng: str) -> Show: - # ISO 639-1 (eg. "en") is required as TMDB does not support fetching images - # by ISO 639-2 + ISO 3166-1 alpha-2 (eg. "en-US") - lng_iso639_1 = lng.split("-").pop(0) + async def for_language(lng: Language) -> Show: show = await self.get( f"tv/{show_id}", params={ - "language": lng, + "language": lng.to_tag(), "append_to_response": "alternative_titles,videos,credits,keywords,images,external_ids", - "include_image_language": f"{lng_iso639_1},null,en", + "include_image_language": f"{lng.language},null,en", }, ) logger.debug("TMDb responded: %s", show) @@ -347,11 +362,11 @@ class TheMovieDatabase(Provider): tags=list(map(lambda x: x["name"], show["keywords"]["results"])), overview=show["overview"], posters=self.get_image( - self.get_best_image(show, lng_iso639_1, "posters") + await self.get_best_image(show, lng, "posters") ), - logos=self.get_image(self.get_best_image(show, lng_iso639_1, "logos")), + logos=self.get_image(await self.get_best_image(show, lng, "logos")), thumbnails=self.get_image( - self.get_best_image(show, lng_iso639_1, "backdrops") + await self.get_best_image(show, lng, "backdrops") ), trailers=[ f"https://www.youtube.com/watch?v={x['key']}" @@ -359,7 +374,7 @@ class TheMovieDatabase(Provider): if x["type"] == "Trailer" and x["site"] == "YouTube" ], ) - ret.translations = {lng: translation} + ret.translations = {lng.to_tag(): translation} return ret def merge_seasons_translations(item: Show, items: list[Show]) -> Show: @@ -387,13 +402,14 @@ class TheMovieDatabase(Provider): ret.original_language is not None and ret.original_language not in ret.translations ): - ret.translations[ret.original_language] = ( - await for_language(ret.original_language) - ).translations[ret.original_language] + orig_language = Language.get(ret.original_language) + ret.translations[orig_language.to_tag()] = ( + await for_language(orig_language) + ).translations[orig_language.to_tag()] return ret def to_season( - self, season: dict[str, Any], *, language: str, show_id: str + self, season: dict[str, Any], *, language: Language, show_id: str ) -> Season: return Season( season_number=season["season_number"], @@ -409,7 +425,7 @@ class TheMovieDatabase(Provider): ) }, translations={ - language: SeasonTranslation( + language.to_tag(): SeasonTranslation( name=season["name"], overview=season["overview"], posters=[ @@ -481,19 +497,19 @@ class TheMovieDatabase(Provider): async def identify_episode( self, show_id: str, season: Optional[int], episode_nbr: int, absolute: int ) -> Episode: - async def for_language(lng: str) -> Episode: + async def for_language(lng: Language) -> Episode: try: episode = await self.get( f"tv/{show_id}/season/{season}/episode/{episode_nbr}", params={ - "language": lng, + "language": lng.to_tag(), }, ) except: episode = await self.get( f"tv/{show_id}/season/{season}/episode/{absolute}", params={ - "language": lng, + "language": lng.to_tag(), }, not_found_fail=f"Could not find episode {episode_nbr} of season {season} of serie {show_id} (absolute: {absolute})", ) @@ -534,7 +550,7 @@ class TheMovieDatabase(Provider): name=episode["name"], overview=episode["overview"], ) - ret.translations = {lng: translation} + ret.translations = {lng.to_tag(): translation} return ret return await self.process_translations(for_language, self.get_languages()) @@ -723,11 +739,11 @@ class TheMovieDatabase(Provider): async def identify_collection(self, provider_id: str) -> Collection: languages = self.get_languages() - async def for_language(lng: str) -> Collection: + async def for_language(lng: Language) -> Collection: collection = await self.get( f"collection/{provider_id}", params={ - "language": lng, + "language": lng.to_tag(), }, ) logger.debug("TMDb responded: %s", collection) @@ -751,7 +767,7 @@ class TheMovieDatabase(Provider): f"https://image.tmdb.org/t/p/original{collection['backdrop_path']}" ], ) - ret.translations = {lng: translation} + ret.translations = {lng.to_tag(): translation} return ret return await self.process_translations(for_language, languages)