Fix localized images selection (#677)

This commit is contained in:
Zoe Roux 2025-04-09 00:21:25 +02:00 committed by GitHub
commit 29f38e42ae
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 116 additions and 72 deletions

View File

@ -10,8 +10,10 @@ LIBRARY_ROOT=./video
# It will automatically be cleaned up on kyoo's startup/shutdown/runtime.
CACHE_ROOT=/tmp/kyoo_cache
LIBRARY_LANGUAGES=en
# A pattern (regex) to ignore video files.
LIBRARY_IGNORE_PATTERN=".*/[dD]ownloads?/.*"
# If this is true, kyoo will prefer to download the media in the original language of the item.
MEDIA_PREFER_ORIGINAL_LANGUAGE=false
# A pattern (regex) to ignore files.
LIBRARY_IGNORE_PATTERN=".*/[dD]ownloads?/.*|.*[Tt][Rr][Aa][Ii][Ll][Ee][Rr].*"
# If this is true, new accounts wont have any permissions before you approve them in your admin dashboard.
REQUIRE_ACCOUNT_VERIFICATION=true

View File

@ -52,7 +52,7 @@ class Matcher:
if "mimetype" not in raw or not raw["mimetype"].startswith("video"):
return
logger.info("Identied %s: %s", path, raw)
logger.info("Identified %s: %s", path, raw)
title = raw.get("title")
if not isinstance(title, str):

View File

@ -4,6 +4,7 @@ from datetime import datetime, timedelta
from logging import getLogger
from typing import Awaitable, Callable, Dict, List, Optional, Any, TypeVar
from itertools import accumulate, zip_longest
from langcodes import Language
from providers.utils import ProviderError
from matcher.cache import cache
@ -31,7 +32,7 @@ class TheMovieDatabase(Provider):
api_key: str,
) -> None:
super().__init__()
self._languages = languages
self._languages = [Language.get(l) for l in languages]
self._client = client
self.base = "https://api.themoviedb.org/3"
self.api_key = api_key
@ -78,7 +79,7 @@ class TheMovieDatabase(Provider):
[self.genre_map[x["id"]] for x in genres if x["id"] in self.genre_map]
)
def get_languages(self, *args):
def get_languages(self, *args) -> list[Language]:
return self._languages + list(args)
async def get(
@ -99,16 +100,17 @@ class TheMovieDatabase(Provider):
T = TypeVar("T")
def merge_translations(self, host, translations, *, languages: list[str]):
def merge_translations(self, host, translations, *, languages: list[Language]):
host.translations = {
k: v.translations[k] for k, v in zip(languages, translations)
k.to_tag(): v.translations[k.to_tag()]
for k, v in zip(languages, translations)
}
return host
async def process_translations(
self,
for_language: Callable[[str], Awaitable[T]],
languages: list[str],
languages: list[Language],
post_merge: Callable[[T, list[T]], T] | None = None,
) -> T:
tasks = map(lambda lng: for_language(lng), languages)
@ -138,6 +140,62 @@ class TheMovieDatabase(Provider):
},
)
def get_best_image(
self, item: dict[str, Any], lng: Language, key: str
) -> list[dict]:
"""
Retrieves the best available images for a item based on localization.
Args:
item (dict): A dictionary containing item information, including images and language details.
lng (Language): The preferred language for the images.
key (str): The key to access the images in the item dictionary. (e.g. "posters", "backdrops", "logos")
Returns:
list: A list of images, prioritized by localization, original language, and any available image.
"""
# Order images by size and vote average
item["images"][key] = sorted(
item["images"][key],
key=lambda x: (x.get("vote_average", 0), x.get("width", 0)),
reverse=True,
)
# Step 1: Try to get localized images
localized_images = [
image
for image in item["images"][key]
if image.get("iso_639_1") == lng.language
]
# Step 2: If no localized images, try images in the original language
if not localized_images:
localized_images = [
image
for image in item["images"][key]
if image.get("iso_639_1") == item.get("original_language")
]
# Step 3: If still no images, use any available images
if not localized_images:
localized_images = item["images"][key]
# Step 4: If there are no images at all, fallback to _path attribute.
if not localized_images:
localized_images = self._get_image_fallback(item, key)
return self.get_image(localized_images)
def _get_image_fallback(self, item: dict[str, Any], key: str) -> list[dict]:
"""
Fallback to _path attribute if there are no images available in the images list.
"""
if key == "posters":
return [{"file_path": item.get("poster_path")}]
elif key == "backdrops":
return [{"file_path": item.get("backdrop_path")}]
return []
async def search_movie(self, name: str, year: Optional[int]) -> Movie:
search_results = (
await self.get("search/movie", params={"query": name, "year": year})
@ -145,17 +203,23 @@ class TheMovieDatabase(Provider):
if len(search_results) == 0:
raise ProviderError(f"No result for a movie named: {name}")
search = self.get_best_result(search_results, name, year)
return await self.identify_movie(search["id"])
original_language = Language.get(search["original_language"])
return await self.identify_movie(
search["id"], original_language=original_language
)
async def identify_movie(self, movie_id: str) -> Movie:
async def identify_movie(
self, movie_id: str, original_language: Optional[Language] = None
) -> Movie:
languages = self.get_languages()
async def for_language(lng: str) -> Movie:
async def for_language(lng: Language) -> Movie:
movie = await self.get(
f"movie/{movie_id}",
params={
"language": lng,
"language": lng.to_tag(),
"append_to_response": "alternative_titles,videos,credits,keywords,images",
"include_image_language": f"{lng.language},null,{original_language.language if original_language else ''}",
},
)
logger.debug("TMDb responded: %s", movie)
@ -210,30 +274,16 @@ class TheMovieDatabase(Provider):
tagline=movie["tagline"] if movie["tagline"] else None,
tags=list(map(lambda x: x["name"], movie["keywords"]["keywords"])),
overview=movie["overview"],
posters=self.get_image(
movie["images"]["posters"]
+ (
[{"file_path": movie["poster_path"]}]
if lng == movie["original_language"]
else []
)
),
logos=self.get_image(movie["images"]["logos"]),
thumbnails=self.get_image(
movie["images"]["backdrops"]
+ (
[{"file_path": movie["backdrop_path"]}]
if lng == movie["original_language"]
else []
)
),
posters=self.get_best_image(movie, lng, "posters"),
logos=self.get_best_image(movie, lng, "logos"),
thumbnails=self.get_best_image(movie, lng, "backdrops"),
trailers=[
f"https://www.youtube.com/watch?v={x['key']}"
for x in movie["videos"]["results"]
if x["type"] == "Trailer" and x["site"] == "YouTube"
],
)
ret.translations = {lng: translation}
ret.translations = {lng.to_tag(): translation}
return ret
ret = await self.process_translations(for_language, languages)
@ -241,9 +291,10 @@ class TheMovieDatabase(Provider):
ret.original_language is not None
and ret.original_language not in ret.translations
):
ret.translations[ret.original_language] = (
await for_language(ret.original_language)
).translations[ret.original_language]
orig_language = Language.get(ret.original_language)
ret.translations[orig_language.to_tag()] = (
await for_language(orig_language)
).translations[orig_language.to_tag()]
return ret
@cache(ttl=timedelta(days=1))
@ -253,12 +304,13 @@ class TheMovieDatabase(Provider):
) -> Show:
languages = self.get_languages()
async def for_language(lng: str) -> Show:
async def for_language(lng: Language) -> Show:
show = await self.get(
f"tv/{show_id}",
params={
"language": lng,
"language": lng.to_tag(),
"append_to_response": "alternative_titles,videos,credits,keywords,images,external_ids",
"include_image_language": f"{lng.language},null,en",
},
)
logger.debug("TMDb responded: %s", show)
@ -311,30 +363,16 @@ class TheMovieDatabase(Provider):
tagline=show["tagline"] if show["tagline"] else None,
tags=list(map(lambda x: x["name"], show["keywords"]["results"])),
overview=show["overview"],
posters=self.get_image(
show["images"]["posters"]
+ (
[{"file_path": show["poster_path"]}]
if lng == show["original_language"]
else []
)
),
logos=self.get_image(show["images"]["logos"]),
thumbnails=self.get_image(
show["images"]["backdrops"]
+ (
[{"file_path": show["backdrop_path"]}]
if lng == show["original_language"]
else []
)
),
posters=self.get_best_image(show, lng, "posters"),
logos=self.get_best_image(show, lng, "logos"),
thumbnails=self.get_best_image(show, lng, "backdrops"),
trailers=[
f"https://www.youtube.com/watch?v={x['key']}"
for x in show["videos"]["results"]
if x["type"] == "Trailer" and x["site"] == "YouTube"
],
)
ret.translations = {lng: translation}
ret.translations = {lng.to_tag(): translation}
return ret
def merge_seasons_translations(item: Show, items: list[Show]) -> Show:
@ -362,13 +400,14 @@ class TheMovieDatabase(Provider):
ret.original_language is not None
and ret.original_language not in ret.translations
):
ret.translations[ret.original_language] = (
await for_language(ret.original_language)
).translations[ret.original_language]
orig_language = Language.get(ret.original_language)
ret.translations[orig_language.to_tag()] = (
await for_language(orig_language)
).translations[orig_language.to_tag()]
return ret
def to_season(
self, season: dict[str, Any], *, language: str, show_id: str
self, season: dict[str, Any], *, language: Language, show_id: str
) -> Season:
return Season(
season_number=season["season_number"],
@ -384,7 +423,7 @@ class TheMovieDatabase(Provider):
)
},
translations={
language: SeasonTranslation(
language.to_tag(): SeasonTranslation(
name=season["name"],
overview=season["overview"],
posters=[
@ -456,19 +495,19 @@ class TheMovieDatabase(Provider):
async def identify_episode(
self, show_id: str, season: Optional[int], episode_nbr: int, absolute: int
) -> Episode:
async def for_language(lng: str) -> Episode:
async def for_language(lng: Language) -> Episode:
try:
episode = await self.get(
f"tv/{show_id}/season/{season}/episode/{episode_nbr}",
params={
"language": lng,
"language": lng.to_tag(),
},
)
except:
episode = await self.get(
f"tv/{show_id}/season/{season}/episode/{absolute}",
params={
"language": lng,
"language": lng.to_tag(),
},
not_found_fail=f"Could not find episode {episode_nbr} of season {season} of serie {show_id} (absolute: {absolute})",
)
@ -509,7 +548,7 @@ class TheMovieDatabase(Provider):
name=episode["name"],
overview=episode["overview"],
)
ret.translations = {lng: translation}
ret.translations = {lng.to_tag(): translation}
return ret
return await self.process_translations(for_language, self.get_languages())
@ -698,11 +737,13 @@ class TheMovieDatabase(Provider):
async def identify_collection(self, provider_id: str) -> Collection:
languages = self.get_languages()
async def for_language(lng: str) -> Collection:
async def for_language(lng: Language) -> Collection:
collection = await self.get(
f"collection/{provider_id}",
params={
"language": lng,
"language": lng.to_tag(),
"append_to_response": "images",
"include_image_language": f"{lng.language},null,en",
},
)
logger.debug("TMDb responded: %s", collection)
@ -718,15 +759,11 @@ class TheMovieDatabase(Provider):
translation = CollectionTranslation(
name=collection["name"],
overview=collection["overview"],
posters=[
f"https://image.tmdb.org/t/p/original{collection['poster_path']}"
],
posters=self.get_best_image(collection, lng, "posters"),
logos=[],
thumbnails=[
f"https://image.tmdb.org/t/p/original{collection['backdrop_path']}"
],
thumbnails=self.get_best_image(collection, lng, "backdrops"),
)
ret.translations = {lng: translation}
ret.translations = {lng.to_tag(): translation}
return ret
return await self.process_translations(for_language, languages)

View File

@ -28,6 +28,9 @@ def normalize_lang(lang: str) -> str:
# For now, the API of kyoo only support one language so we remove the others.
default_languages = os.environ.get("LIBRARY_LANGUAGES", "").split(",")
media_prefer_original_language = (
os.environ.get("MEDIA_PREFER_ORIGINAL_LANGUAGE", "false").lower() == "true"
)
def sort_translations(
@ -64,7 +67,9 @@ def select_image(
chain(
*(
getattr(trans, kind)
for trans in sort_translations(value, prefer_orginal=True)
for trans in sort_translations(
value, prefer_orginal=media_prefer_original_language
)
)
),
None,