Rework get movie for tmdb

This commit is contained in:
Zoe Roux 2025-05-08 03:27:59 +02:00
parent 7a00878eae
commit a4fdeb8a9b
No known key found for this signature in database
4 changed files with 163 additions and 209 deletions

View File

@ -29,11 +29,11 @@ class Movie(Model):
air_date: date | None
external_id: dict[str, MetadataId]
translations: dict[str, MovieTranslation] = {}
videos: list[str] = []
translations: dict[Language, MovieTranslation] = {}
collections: list[Collection] = []
studios: list[Studio] = []
staff: list[Staff] = []
videos: list[str] = []
class MovieTranslation(Model):

View File

@ -7,9 +7,9 @@ from .metadataid import MetadataId
class Studio(Model):
slug: str
external_id: dict[str, MetadataId]
translations: dict[str, StudioTranslations] = {}
translations: dict[str, StudioTranslation] = {}
class StudioTranslations(Model):
class StudioTranslation(Model):
name: str
logo: str | None

View File

@ -1,23 +1,25 @@
import asyncio
from aiohttp import ClientSession
import os
from datetime import datetime, timedelta
from logging import getLogger
from typing import Awaitable, Callable, Dict, List, Optional, Any, TypeVar
from itertools import accumulate, zip_longest
from logging import getLogger
from typing import Any, Awaitable, Callable, Dict, List, Optional, TypeVar, override
from aiohttp import ClientSession
from langcodes import Language
from providers.utils import ProviderError
from matcher.cache import cache
from ..provider import Provider
from ..types.movie import Movie, MovieTranslation, Status as MovieStatus
from ..types.season import Season, SeasonTranslation
from ..types.episode import Episode, EpisodeTranslation, PartialShow, EpisodeID
from ..types.studio import Studio
from ..types.genre import Genre
from ..types.metadataid import MetadataID
from ..types.show import Show, ShowTranslation, Status as ShowStatus
from ..types.collection import Collection, CollectionTranslation
from ..models.collection import Collection, CollectionTranslation
from ..models.entry import Entry, EntryTranslation
from ..models.genre import Genre
from ..models.metadataid import EpisodeId, MetadataId
from ..models.movie import Movie, MovieStatus, MovieTranslation
from ..models.season import Season, SeasonTranslation
from ..models.serie import Serie, SerieStatus, SerieTranslation
from ..models.studio import Studio, StudioTranslation
from ..utils import clean, to_slug
from .provider import Provider, ProviderError
logger = getLogger(__name__)
@ -27,16 +29,16 @@ class TheMovieDatabase(Provider):
def __init__(
self,
languages: list[str],
client: ClientSession,
api_key: str,
) -> None:
super().__init__()
self._languages = [Language.get(l) for l in languages]
self._client = client
self.base = "https://api.themoviedb.org/3"
self.api_key = api_key
self.genre_map = {
self._base = "https://api.themoviedb.org/3"
self._api_key = (
os.environ.get("THEMOVIEDB_APIKEY") or TheMovieDatabase.DEFAULT_API_KEY
)
self._genre_map = {
28: Genre.ACTION,
12: Genre.ADVENTURE,
16: Genre.ANIMATION,
@ -57,7 +59,7 @@ class TheMovieDatabase(Provider):
37: Genre.WESTERN,
10759: [Genre.ACTION, Genre.ADVENTURE],
10762: Genre.KIDS,
10763: Genre.NEWS,
10763: [],
10764: Genre.REALITY,
10765: [Genre.SCIENCE_FICTION, Genre.FANTASY],
10766: Genre.SOAP,
@ -66,6 +68,7 @@ class TheMovieDatabase(Provider):
}
@property
@override
def name(self) -> str:
return "themoviedatabase"
@ -76,12 +79,9 @@ class TheMovieDatabase(Provider):
return [x]
return flatten(
[self.genre_map[x["id"]] for x in genres if x["id"] in self.genre_map]
[self._genre_map[x["id"]] for x in genres if x["id"] in self._genre_map]
)
def get_languages(self, *args) -> list[Language]:
return self._languages + list(args)
async def get(
self,
path: str,
@ -91,110 +91,54 @@ class TheMovieDatabase(Provider):
):
params = {k: v for k, v in params.items() if v is not None}
async with self._client.get(
f"{self.base}/{path}", params={"api_key": self.api_key, **params}
f"{self._base}/{path}", params={"api_key": self._api_key, **params}
) as r:
if not_found_fail and r.status == 404:
raise ProviderError(not_found_fail)
r.raise_for_status()
return await r.json()
T = TypeVar("T")
def merge_translations(self, host, translations, *, languages: list[Language]):
host.translations = {
k.to_tag(): v.translations[k.to_tag()]
for k, v in zip(languages, translations)
}
return host
async def process_translations(
self,
for_language: Callable[[str], Awaitable[T]],
languages: list[Language],
post_merge: Callable[[T, list[T]], T] | None = None,
) -> T:
tasks = map(lambda lng: for_language(lng), languages)
items: list[Any] = await asyncio.gather(*tasks)
item = self.merge_translations(items[0], items, languages=languages)
if post_merge:
item = post_merge(item, items)
return item
def get_image(self, images: list[Dict[str, Any]]) -> list[str]:
return [
f"https://image.tmdb.org/t/p/original{x['file_path']}"
for x in images
if x["file_path"]
]
def to_studio(self, company: dict[str, Any]) -> Studio:
return Studio(
name=company["name"],
logos=[f"https://image.tmdb.org/t/p/original{company['logo_path']}"]
if "logo_path" in company
else [],
slug=to_slug(company["name"]),
external_id={
self.name: MetadataID(
company["id"], f"https://www.themoviedb.org/company/{company['id']}"
self.name: MetadataId(
data_id=company["id"],
link=f"https://www.themoviedb.org/company/{company['id']}",
)
},
translations={
"en": StudioTranslation(
name=company["name"],
logo=f"https://image.tmdb.org/t/p/original{company['logo_path']}"
if "logo_path" in company
else None,
),
},
)
def get_best_image(
self, item: dict[str, Any], lng: Language, key: str
) -> list[dict]:
"""
Retrieves the best available images for a item based on localization.
def _pick_image(self, item: dict[str, Any], lng: str, key: str) -> str | None:
base_path = "https://image.tmdb.org/t/p/original"
Args:
item (dict): A dictionary containing item information, including images and language details.
lng (Language): The preferred language for the images.
key (str): The key to access the images in the item dictionary. (e.g. "posters", "backdrops", "logos")
Returns:
list: A list of images, prioritized by localization, original language, and any available image.
"""
# Order images by size and vote average
item["images"][key] = sorted(
images = sorted(
item["images"][key],
key=lambda x: (x.get("vote_average", 0), x.get("width", 0)),
reverse=True,
)
# Step 1: Try to get localized images
localized_images = [
image
for image in item["images"][key]
if image.get("iso_639_1") == lng.language
]
# Step 2: If no localized images, try images in the original language
if not localized_images:
localized_images = [
image
for image in item["images"][key]
if image.get("iso_639_1") == item.get("original_language")
]
# Step 3: If still no images, use any available images
if not localized_images:
localized_images = item["images"][key]
# Step 4: If there are no images at all, fallback to _path attribute.
if not localized_images:
localized_images = self._get_image_fallback(item, key)
return self.get_image(localized_images)
def _get_image_fallback(self, item: dict[str, Any], key: str) -> list[dict]:
"""
Fallback to _path attribute if there are no images available in the images list.
"""
if key == "posters":
return [{"file_path": item.get("poster_path")}]
elif key == "backdrops":
return [{"file_path": item.get("backdrop_path")}]
return []
# check images in your language
localized = next((x for x in images if x["iso_639_1"] == lng), None)
if localized:
return base_path + localized
# if failed, check images without text
notext = next((x for x in images if x["iso_639_1"] == None), None)
if notext:
return base_path + notext
# take a random image, it's better than nothing
random_img = next((x for x in images if x["iso_639_1"] == None), None)
if random_img:
return base_path + random_img
return None
async def search_movie(self, name: str, year: Optional[int]) -> Movie:
search_results = (
@ -208,94 +152,107 @@ class TheMovieDatabase(Provider):
search["id"], original_language=original_language
)
async def identify_movie(
self, movie_id: str, original_language: Optional[Language] = None
) -> Movie:
languages = self.get_languages()
@override
async def get_movie(self, external_id: dict[str, str]) -> Movie | None:
# TODO: fallback to search via another id
if self.name not in external_id:
return None
async def for_language(lng: Language) -> Movie:
movie = await self.get(
f"movie/{movie_id}",
params={
"language": lng.to_tag(),
"append_to_response": "alternative_titles,videos,credits,keywords,images",
"include_image_language": f"{lng.language},null,{original_language.language if original_language else ''}",
},
)
logger.debug("TMDb responded: %s", movie)
movie = await self.get(
f"movie/{external_id[self.name]}",
params={
"append_to_response": "alternative_titles,videos,credits,keywords,images,translations",
},
)
logger.debug("TMDb responded: %s", movie)
ret = Movie(
original_language=movie["original_language"],
aliases=[x["title"] for x in movie["alternative_titles"]["titles"]],
air_date=datetime.strptime(movie["release_date"], "%Y-%m-%d").date()
if movie["release_date"]
else None,
status=MovieStatus.FINISHED
if movie["status"] == "Released"
else MovieStatus.PLANNED,
rating=round(float(movie["vote_average"]) * 10),
runtime=int(movie["runtime"]) if movie["runtime"] is not None else None,
studios=[self.to_studio(x) for x in movie["production_companies"]],
genres=self.process_genres(movie["genres"]),
external_id=(
return Movie(
slug=to_slug(movie["title"]),
original_language=Language.get(movie["original_language"]),
genres=self.process_genres(movie["genres"]),
rating=round(float(movie["vote_average"]) * 10),
status=MovieStatus.FINISHED
if movie["status"] == "Released"
else MovieStatus.PLANNED,
runtime=int(movie["runtime"]) if movie["runtime"] is not None else None,
air_date=datetime.strptime(movie["release_date"], "%Y-%m-%d").date()
if movie["release_date"]
else None,
external_id=(
{
self.name: MetadataId(
data_id=movie["id"],
link=f"https://www.themoviedb.org/movie/{movie['id']}",
)
}
| (
{
self.name: MetadataID(
movie["id"],
f"https://www.themoviedb.org/movie/{movie['id']}",
"imdb": MetadataId(
data_id=movie["imdb_id"],
link=f"https://www.imdb.com/title/{movie['imdb_id']}",
)
}
| (
{
"imdb": MetadataID(
movie["imdb_id"],
f"https://www.imdb.com/title/{movie['imdb_id']}",
)
}
if movie["imdb_id"]
else {}
if movie["imdb_id"]
else {}
)
),
translations={
Language.get(
f"{trans['iso_639_1']}-{trans['iso_3166_1']}"
): MovieTranslation(
name=clean(trans["data"]["title"])
or (
clean(movie["original_title"])
if movie["original_language"] == trans["iso_639_1"]
else None
)
),
collections=[
Collection(
external_id={
self.name: MetadataID(
movie["belongs_to_collection"]["id"],
f"https://www.themoviedb.org/collection/{movie['belongs_to_collection']['id']}",
)
},
)
]
if movie["belongs_to_collection"] is not None
else [],
# TODO: Add cast information
)
translation = MovieTranslation(
name=movie["title"],
tagline=movie["tagline"] if movie["tagline"] else None,
tags=list(map(lambda x: x["name"], movie["keywords"]["keywords"])),
overview=movie["overview"],
posters=self.get_best_image(movie, lng, "posters"),
logos=self.get_best_image(movie, lng, "logos"),
thumbnails=self.get_best_image(movie, lng, "backdrops"),
trailers=[
f"https://www.youtube.com/watch?v={x['key']}"
for x in movie["videos"]["results"]
if x["type"] == "Trailer" and x["site"] == "YouTube"
],
)
ret.translations = {lng.to_tag(): translation}
return ret
ret = await self.process_translations(for_language, languages)
if (
ret.original_language is not None
and ret.original_language not in ret.translations
):
orig_language = Language.get(ret.original_language)
ret.translations[orig_language.to_tag()] = (
await for_language(orig_language)
).translations[orig_language.to_tag()]
return ret
or movie["title"],
latin_name=next(
(
x["title"]
for x in movie["alternative_titles"]["titles"]
if x["iso_3166_1"] == trans["iso_3166_1"]
and x["type"] == "Romaji"
),
None,
),
description=clean(trans["data"]["overview"]),
tagline=clean(trans["data"]["tagline"]),
aliases=[
x["title"]
for x in movie["alternative_titles"]["titles"]
if x["iso_3166_1"] == trans["iso_3166_1"]
],
tags=[x["name"] for x in movie["keywords"]["keywords"]],
poster=self._pick_image(movie, trans["iso_639_1"], "posters"),
logo=self._pick_image(movie, trans["iso_639_1"], "logos"),
banner=None,
thumbnail=self._pick_image(movie, trans["iso_639_1"], "backdrops"),
trailer=None,
# TODO: should the trailer be added? or all of them as extra?
# [
# f"https://www.youtube.com/watch?v={x['key']}"
# for x in movie["videos"]["results"]
# if x["type"] == "Trailer" and x["site"] == "YouTube"
# ],
)
for trans in movie["translations"]["translations"]
},
collections=[
# Collection(
# external_id={
# self.name: MetadataID(
# movie["belongs_to_collection"]["id"],
# f"https://www.themoviedb.org/collection/{movie['belongs_to_collection']['id']}",
# )
# },
# )
]
if movie["belongs_to_collection"] is not None
else [],
studios=[self.to_studio(x) for x in movie["production_companies"]],
staff=[],
)
@cache(ttl=timedelta(days=1))
async def identify_show(
@ -363,9 +320,9 @@ class TheMovieDatabase(Provider):
tagline=show["tagline"] if show["tagline"] else None,
tags=list(map(lambda x: x["name"], show["keywords"]["results"])),
overview=show["overview"],
posters=self.get_best_image(show, lng, "posters"),
logos=self.get_best_image(show, lng, "logos"),
thumbnails=self.get_best_image(show, lng, "backdrops"),
posters=self._pick_image(show, lng, "posters"),
logos=self._pick_image(show, lng, "logos"),
thumbnails=self._pick_image(show, lng, "backdrops"),
trailers=[
f"https://www.youtube.com/watch?v={x['key']}"
for x in show["videos"]["results"]
@ -759,9 +716,9 @@ class TheMovieDatabase(Provider):
translation = CollectionTranslation(
name=collection["name"],
overview=collection["overview"],
posters=self.get_best_image(collection, lng, "posters"),
posters=self._pick_image(collection, lng, "posters"),
logos=[],
thumbnails=self.get_best_image(collection, lng, "backdrops"),
thumbnails=self._pick_image(collection, lng, "backdrops"),
)
ret.translations = {lng.to_tag(): translation}
return ret

View File

@ -1,22 +1,19 @@
from datetime import date
from langcodes import Language
from pydantic import AliasGenerator, BaseModel, ConfigDict
from pydantic.alias_generators import to_camel
def format_date(date: date | int | None) -> str | None:
if date is None:
return None
if isinstance(date, int):
return f"{date}-01-01"
return date.isoformat()
def normalize_lang(lang: str) -> str:
return str(Language.get(lang))
def to_slug(title: str) -> str:
return title
def clean(val: str) -> str | None:
return val or None
class Model(BaseModel):
model_config = ConfigDict(
use_enum_values=True,