chg: refactor: use langcodes instead of string for language representation

This commit is contained in:
Felipe Marinho 2024-11-14 15:12:56 -03:00 committed by Zoe Roux
parent 7c0424fcda
commit 28fe8bd9bc
No known key found for this signature in database

View File

@ -4,6 +4,7 @@ from datetime import datetime, timedelta
from logging import getLogger
from typing import Awaitable, Callable, Dict, List, Optional, Any, TypeVar
from itertools import accumulate, zip_longest
from langcodes import Language
from providers.utils import ProviderError
from matcher.cache import cache
@ -31,7 +32,7 @@ class TheMovieDatabase(Provider):
api_key: str,
) -> None:
super().__init__()
self._languages = languages
self._languages = [Language.get(l) for l in languages]
self._client = client
self.base = "https://api.themoviedb.org/3"
self.api_key = api_key
@ -78,7 +79,7 @@ class TheMovieDatabase(Provider):
[self.genre_map[x["id"]] for x in genres if x["id"] in self.genre_map]
)
def get_languages(self, *args):
def get_languages(self, *args) -> list[Language]:
return self._languages + list(args)
async def get(
@ -99,16 +100,16 @@ class TheMovieDatabase(Provider):
T = TypeVar("T")
def merge_translations(self, host, translations, *, languages: list[str]):
def merge_translations(self, host, translations, *, languages: list[Language]):
host.translations = {
k: v.translations[k] for k, v in zip(languages, translations)
k.to_tag(): v.translations[k.to_tag()] for k, v in zip(languages, translations)
}
return host
async def process_translations(
self,
for_language: Callable[[str], Awaitable[T]],
languages: list[str],
languages: list[Language],
post_merge: Callable[[T, list[T]], T] | None = None,
) -> T:
tasks = map(lambda lng: for_language(lng), languages)
@ -138,13 +139,13 @@ class TheMovieDatabase(Provider):
},
)
def get_best_image(self, item, lng, key):
async def get_best_image(self, item, lng, key):
"""
Retrieves the best available images for a item based on localization.
Args:
item (dict): A dictionary containing item information, including images and language details.
lng (str): The preferred language code for the images in ISO 639-1 format.
lng (Language): The preferred language for the images.
key (str): The key to access the images in the item dictionary. (e.g. "posters", "backdrops", "logos")
Returns:
list: A list of images, prioritized by localization, original language, and any available image.
@ -160,7 +161,7 @@ class TheMovieDatabase(Provider):
localized_images = [
image
for image in item["images"][key]
if image.get("iso_639_1") == lng
if image.get("iso_639_1") == lng.language
]
# Step 2: If no localized images, try images in the original language
@ -175,6 +176,23 @@ class TheMovieDatabase(Provider):
if not localized_images:
localized_images = item["images"][key]
# Corner case: If there are no images at all, call TMDB images API.
# Although doing another API call is not ideal, this would only be called very rarely.
if not localized_images:
logger.debug(
"No images found for %s %s. Calling TMDB images API.",
item["media_type"],
item["id"],
)
images = await self.get(
f"{item['media_type']}/{item['id']}/images",
)
localized_images = sorted(
images[key],
key=lambda x: (x.get("width", 0), x.get("vote_average", 0)),
reverse=True,
)
return localized_images
async def search_movie(self, name: str, year: Optional[int]) -> Movie:
@ -189,14 +207,13 @@ class TheMovieDatabase(Provider):
async def identify_movie(self, movie_id: str, original_language: str) -> Movie:
languages = self.get_languages()
async def for_language(lng: str) -> Movie:
lng_iso639_1 = lng.split("-").pop(0)
async def for_language(lng: Language) -> Movie:
movie = await self.get(
f"movie/{movie_id}",
params={
"language": lng,
"language": lng.to_tag(),
"append_to_response": "alternative_titles,videos,credits,keywords,images",
"include_image_language": f"{lng_iso639_1},null,{original_language}",
"include_image_language": f"{lng.language},null,{original_language}",
},
)
logger.debug("TMDb responded: %s", movie)
@ -252,11 +269,11 @@ class TheMovieDatabase(Provider):
tags=list(map(lambda x: x["name"], movie["keywords"]["keywords"])),
overview=movie["overview"],
posters=self.get_image(
self.get_best_image(movie, lng_iso639_1, "posters")
await self.get_best_image(movie, lng, "posters")
),
logos=self.get_image(self.get_best_image(movie, lng_iso639_1, "logos")),
logos=self.get_image(await self.get_best_image(movie, lng, "logos")),
thumbnails=self.get_image(
self.get_best_image(movie, lng_iso639_1, "backdrops")
await self.get_best_image(movie, lng, "backdrops")
),
trailers=[
f"https://www.youtube.com/watch?v={x['key']}"
@ -264,7 +281,7 @@ class TheMovieDatabase(Provider):
if x["type"] == "Trailer" and x["site"] == "YouTube"
],
)
ret.translations = {lng: translation}
ret.translations = {lng.to_tag(): translation}
return ret
ret = await self.process_translations(for_language, languages)
@ -272,9 +289,10 @@ class TheMovieDatabase(Provider):
ret.original_language is not None
and ret.original_language not in ret.translations
):
ret.translations[ret.original_language] = (
await for_language(ret.original_language)
).translations[ret.original_language]
orig_language = Language.get(ret.original_language)
ret.translations[orig_language.to_tag()] = (
await for_language(orig_language)
).translations[orig_language.to_tag()]
return ret
@cache(ttl=timedelta(days=1))
@ -284,16 +302,13 @@ class TheMovieDatabase(Provider):
) -> Show:
languages = self.get_languages()
async def for_language(lng: str) -> Show:
# ISO 639-1 (eg. "en") is required as TMDB does not support fetching images
# by ISO 639-2 + ISO 3166-1 alpha-2 (eg. "en-US")
lng_iso639_1 = lng.split("-").pop(0)
async def for_language(lng: Language) -> Show:
show = await self.get(
f"tv/{show_id}",
params={
"language": lng,
"language": lng.to_tag(),
"append_to_response": "alternative_titles,videos,credits,keywords,images,external_ids",
"include_image_language": f"{lng_iso639_1},null,en",
"include_image_language": f"{lng.language},null,en",
},
)
logger.debug("TMDb responded: %s", show)
@ -347,11 +362,11 @@ class TheMovieDatabase(Provider):
tags=list(map(lambda x: x["name"], show["keywords"]["results"])),
overview=show["overview"],
posters=self.get_image(
self.get_best_image(show, lng_iso639_1, "posters")
await self.get_best_image(show, lng, "posters")
),
logos=self.get_image(self.get_best_image(show, lng_iso639_1, "logos")),
logos=self.get_image(await self.get_best_image(show, lng, "logos")),
thumbnails=self.get_image(
self.get_best_image(show, lng_iso639_1, "backdrops")
await self.get_best_image(show, lng, "backdrops")
),
trailers=[
f"https://www.youtube.com/watch?v={x['key']}"
@ -359,7 +374,7 @@ class TheMovieDatabase(Provider):
if x["type"] == "Trailer" and x["site"] == "YouTube"
],
)
ret.translations = {lng: translation}
ret.translations = {lng.to_tag(): translation}
return ret
def merge_seasons_translations(item: Show, items: list[Show]) -> Show:
@ -387,13 +402,14 @@ class TheMovieDatabase(Provider):
ret.original_language is not None
and ret.original_language not in ret.translations
):
ret.translations[ret.original_language] = (
await for_language(ret.original_language)
).translations[ret.original_language]
orig_language = Language.get(ret.original_language)
ret.translations[orig_language.to_tag()] = (
await for_language(orig_language)
).translations[orig_language.to_tag()]
return ret
def to_season(
self, season: dict[str, Any], *, language: str, show_id: str
self, season: dict[str, Any], *, language: Language, show_id: str
) -> Season:
return Season(
season_number=season["season_number"],
@ -409,7 +425,7 @@ class TheMovieDatabase(Provider):
)
},
translations={
language: SeasonTranslation(
language.to_tag(): SeasonTranslation(
name=season["name"],
overview=season["overview"],
posters=[
@ -481,19 +497,19 @@ class TheMovieDatabase(Provider):
async def identify_episode(
self, show_id: str, season: Optional[int], episode_nbr: int, absolute: int
) -> Episode:
async def for_language(lng: str) -> Episode:
async def for_language(lng: Language) -> Episode:
try:
episode = await self.get(
f"tv/{show_id}/season/{season}/episode/{episode_nbr}",
params={
"language": lng,
"language": lng.to_tag(),
},
)
except:
episode = await self.get(
f"tv/{show_id}/season/{season}/episode/{absolute}",
params={
"language": lng,
"language": lng.to_tag(),
},
not_found_fail=f"Could not find episode {episode_nbr} of season {season} of serie {show_id} (absolute: {absolute})",
)
@ -534,7 +550,7 @@ class TheMovieDatabase(Provider):
name=episode["name"],
overview=episode["overview"],
)
ret.translations = {lng: translation}
ret.translations = {lng.to_tag(): translation}
return ret
return await self.process_translations(for_language, self.get_languages())
@ -723,11 +739,11 @@ class TheMovieDatabase(Provider):
async def identify_collection(self, provider_id: str) -> Collection:
languages = self.get_languages()
async def for_language(lng: str) -> Collection:
async def for_language(lng: Language) -> Collection:
collection = await self.get(
f"collection/{provider_id}",
params={
"language": lng,
"language": lng.to_tag(),
},
)
logger.debug("TMDb responded: %s", collection)
@ -751,7 +767,7 @@ class TheMovieDatabase(Provider):
f"https://image.tmdb.org/t/p/original{collection['backdrop_path']}"
],
)
ret.translations = {lng: translation}
ret.translations = {lng.to_tag(): translation}
return ret
return await self.process_translations(for_language, languages)