Add entry & seasons parsing from tmdb

This commit is contained in:
Zoe Roux 2025-05-08 23:50:49 +02:00
parent 8269546dd0
commit d209026094
No known key found for this signature in database
5 changed files with 211 additions and 238 deletions

View File

@ -3,6 +3,8 @@ from __future__ import annotations
from datetime import date
from typing import Literal
from langcodes import Language
from ..utils import Model
from .metadataid import EpisodeId, MetadataId
@ -10,27 +12,27 @@ from .metadataid import EpisodeId, MetadataId
class Entry(Model):
kind: Literal["episode", "movie", "special"]
order: float
runtime: int | None = None
air_date: date | None = None
thumbnail: str | None = None
runtime: int | None
air_date: date | None
thumbnail: str | None
# Movie-specific fields
slug: str | None = None
slug: str | None
# Episode-specific fields
season_number: int | None = None
episode_number: int | None = None
season_number: int | None
episode_number: int | None
# Special-specific fields
number: int | None = None
number: int | None
externalId: dict[str, MetadataId | EpisodeId]
translations: dict[str, EntryTranslation] = {}
external_id: dict[str, MetadataId | EpisodeId]
translations: dict[Language, EntryTranslation] = {}
videos: list[str] = []
class EntryTranslation(Model):
name: str | None = None
description: str | None = None
tagline: str | None = None
poster: str | None = None
name: str | None
description: str | None
tagline: str | None
poster: str | None

View File

@ -6,6 +6,12 @@ class MetadataId(Model):
link: str | None = None
class SeasonId(Model):
serie_id: str
season: int
link: str | None = None
class EpisodeId(Model):
serie_id: str
season: int | None

View File

@ -2,16 +2,18 @@ from __future__ import annotations
from datetime import date
from langcodes import Language
from ..utils import Model
from .metadataid import MetadataId
from .metadataid import SeasonId
class Season(Model):
season_number: int
start_air: date | None
end_air: date | None
external_id: dict[str, MetadataId]
translations: dict[str, SeasonTranslation] = {}
external_id: dict[str, SeasonId]
translations: dict[Language, SeasonTranslation] = {}
class SeasonTranslation(Model):

View File

@ -34,7 +34,7 @@ class Serie(Model):
end_air: date | None
external_id: dict[str, MetadataId]
translations: dict[str, SerieTranslation] = {}
translations: dict[Language, SerieTranslation] = {}
seasons: list[Season] = []
entries: list[Entry] = []
extra: list[Extra] = []

View File

@ -1,3 +1,4 @@
import asyncio
import os
from datetime import datetime, timedelta
from itertools import accumulate, zip_longest
@ -13,7 +14,7 @@ from matcher.cache import cache
from ..models.collection import Collection, CollectionTranslation
from ..models.entry import Entry, EntryTranslation
from ..models.genre import Genre
from ..models.metadataid import EpisodeId, MetadataId
from ..models.metadataid import EpisodeId, MetadataId, SeasonId
from ..models.movie import Movie, MovieStatus, MovieTranslation, SearchMovie
from ..models.season import Season, SeasonTranslation
from ..models.serie import SearchSerie, Serie, SerieStatus, SerieTranslation
@ -94,7 +95,7 @@ class TheMovieDatabase(Provider):
name=x["title"],
description=x["overview"],
air_date=datetime.strptime(x["release_date"], "%Y-%m-%d").date(),
poster=self._image_path + x["poster_path"],
poster=self._map_image(x["poster_path"]),
original_language=Language.get(x["original_language"]),
external_id={
self.name: MetadataId(
@ -198,6 +199,7 @@ class TheMovieDatabase(Provider):
if movie["belongs_to_collection"] is not None
else [],
studios=[self._map_studio(x) for x in movie["production_companies"]],
# TODO: add crew
staff=[self._map_staff(x) for x in movie["credits"]["cast"]],
)
@ -223,7 +225,7 @@ class TheMovieDatabase(Provider):
description=x["overview"],
start_air=datetime.strptime(x["first_air_date"], "%Y-%m-%d").date(),
end_air=None,
poster=self._image_path + x["poster_path"],
poster=self._map_image(x["poster_path"]),
original_language=Language.get(x["original_language"]),
external_id={
self.name: MetadataId(
@ -235,240 +237,196 @@ class TheMovieDatabase(Provider):
for x in search
]
@cache(ttl=timedelta(days=1))
async def identify_show(
self,
show_id: str,
) -> Show:
languages = self.get_languages()
@override
async def get_serie(self, external_id: dict[str, str]) -> Serie | None:
# TODO: fallback to search via another id
if self.name not in external_id:
return None
async def for_language(lng: Language) -> Show:
show = await self._get(
f"tv/{show_id}",
params={
"language": lng.to_tag(),
"append_to_response": "alternative_titles,videos,credits,keywords,images,external_ids",
"include_image_language": f"{lng.language},null,en",
},
)
logger.debug("TMDb responded: %s", show)
ret = Show(
original_language=show["original_language"],
aliases=[x["title"] for x in show["alternative_titles"]["results"]],
start_air=datetime.strptime(show["first_air_date"], "%Y-%m-%d").date()
if show["first_air_date"]
else None,
end_air=datetime.strptime(show["last_air_date"], "%Y-%m-%d").date()
if show["last_air_date"]
else None,
status=ShowStatus.FINISHED
if show["status"] == "Released"
else ShowStatus.AIRING
if show["in_production"]
else ShowStatus.FINISHED,
rating=round(float(show["vote_average"]) * 10),
studios=[self._map_studio(x) for x in show["production_companies"]],
genres=self._map_genres(show["genres"]),
external_id={
self.name: MetadataID(
show["id"], f"https://www.themoviedb.org/tv/{show['id']}"
),
}
| (
{
"imdb": MetadataID(
show["external_ids"]["imdb_id"],
f"https://www.imdb.com/title/{show['external_ids']['imdb_id']}",
)
}
if show["external_ids"]["imdb_id"]
else {}
)
| (
{"tvdb": MetadataID(show["external_ids"]["tvdb_id"], link=None)}
if show["external_ids"]["tvdb_id"]
else {}
),
seasons=[
self.to_season(x, language=lng, show_id=show["id"])
for x in show["seasons"]
],
# TODO: Add cast information
)
translation = ShowTranslation(
name=show["name"],
tagline=show["tagline"] if show["tagline"] else None,
tags=list(map(lambda x: x["name"], show["keywords"]["results"])),
overview=show["overview"],
posters=self._pick_image(show, lng, "posters"),
logos=self._pick_image(show, lng, "logos"),
thumbnails=self._pick_image(show, lng, "backdrops"),
trailers=[
f"https://www.youtube.com/watch?v={x['key']}"
for x in show["videos"]["results"]
if x["type"] == "Trailer" and x["site"] == "YouTube"
],
)
ret.translations = {lng.to_tag(): translation}
return ret
def merge_seasons_translations(item: Show, items: list[Show]) -> Show:
item.seasons = [
self.merge_translations(
season,
[
next(
y
for y in x.seasons
if y.season_number == season.season_number
)
for x in items
],
languages=languages,
)
for season in item.seasons
]
return item
ret = await self.process_translations(
for_language, languages, merge_seasons_translations
serie = await self._get(
f"tv/{external_id[self.name]}",
params={
"append_to_response": "alternative_titles,videos,credits,keywords,images,external_ids,translations",
},
)
if (
ret.original_language is not None
and ret.original_language not in ret.translations
):
orig_language = Language.get(ret.original_language)
ret.translations[orig_language.to_tag()] = (
await for_language(orig_language)
).translations[orig_language.to_tag()]
return ret
logger.debug("TMDb responded: %s", serie)
return Serie(
slug=to_slug(serie["name"]),
original_language=Language.get(serie["original_language"]),
genres=self._map_genres(x["id"] for x in serie["genres"]),
rating=round(float(serie["vote_average"]) * 10),
status=SerieStatus.FINISHED
if serie["status"] == "Released"
else SerieStatus.AIRING
if serie["in_production"]
else SerieStatus.FINISHED,
runtime=serie["last_episode_to_air"]["runtime"],
start_air=datetime.strptime(serie["first_air_date"], "%Y-%m-%d").date()
if serie["first_air_date"]
else None,
end_air=datetime.strptime(serie["last_air_date"], "%Y-%m-%d").date()
if serie["last_air_date"]
else None,
external_id={
self.name: MetadataId(
data_id=serie["id"],
link=f"https://www.themoviedb.org/tv/{serie['id']}",
),
}
| (
{
"imdb": MetadataId(
data_id=serie["external_ids"]["imdb_id"],
link=f"https://www.imdb.com/title/{serie['external_ids']['imdb_id']}",
)
}
if serie["external_ids"]["imdb_id"]
else {}
)
| (
{
"tvdb": MetadataId(
data_id=serie["external_ids"]["tvdb_id"],
link=None,
)
}
if serie["external_ids"]["tvdb_id"]
else {}
),
translations={
Language.get(
f"{trans['iso_639_1']}-{trans['iso_3166_1']}"
): SerieTranslation(
name=clean(trans["data"]["title"])
or (
clean(serie["original_title"])
if serie["original_language"] == trans["iso_639_1"]
else None
)
or serie["title"],
latin_name=next(
(
x["title"]
for x in serie["alternative_titles"]["titles"]
if x["iso_3166_1"] == trans["iso_3166_1"]
and x["type"] == "Romaji"
),
None,
),
description=clean(trans["data"]["overview"]),
tagline=clean(trans["data"]["tagline"]),
aliases=[
x["title"]
for x in serie["alternative_titles"]["titles"]
if x["iso_3166_1"] == trans["iso_3166_1"]
],
tags=[x["name"] for x in serie["keywords"]["keywords"]],
poster=self._pick_image(serie, trans["iso_639_1"], "posters"),
logo=self._pick_image(serie, trans["iso_639_1"], "logos"),
banner=None,
thumbnail=self._pick_image(serie, trans["iso_639_1"], "backdrops"),
trailer=None,
# TODO: should the trailer be added? or all of them as extra?
# [
# f"https://www.youtube.com/watch?v={x['key']}"
# for x in show["videos"]["results"]
# if x["type"] == "Trailer" and x["site"] == "YouTube"
# ],
)
for trans in serie["translations"]["translations"]
},
seasons=await asyncio.gather(
*[
self._get_season(serie["id"], x["season_number"])
for x in serie["seasons"]
]
),
entries=[],
extra=[],
collections=[],
studios=[self._map_studio(x) for x in serie["production_companies"]],
# TODO: add crew
staff=[self._map_staff(x) for x in serie["credits"]["cast"]],
)
async def _get_season(self, serie_id: str, season_number: int) -> Season:
season = await self._get(
f"tv/{serie_id}/season/{season_number}",
params={
"append_to_response": "translations,images",
},
)
logger.debug("TMDb responded: %s", season)
def to_season(
self, season: dict[str, Any], *, language: Language, show_id: str
) -> Season:
return Season(
season_number=season["season_number"],
episodes_count=season["episode_count"],
start_air=datetime.strptime(season["air_date"], "%Y-%m-%d").date()
if season["air_date"]
else None,
end_air=None,
external_id={
self.name: MetadataID(
show_id,
f"https://www.themoviedb.org/tv/{show_id}/season/{season['season_number']}",
self.name: SeasonId(
serie_id=serie_id,
season=season["season_number"],
link=f"https://www.themoviedb.org/tv/{serie_id}/season/{season['season_number']}",
)
},
translations={
language.to_tag(): SeasonTranslation(
name=season["name"],
overview=season["overview"],
posters=[
f"https://image.tmdb.org/t/p/original{season['poster_path']}"
]
if season["poster_path"] is not None
else [],
thumbnails=[],
Language.get(
f"{trans['iso_639_1']}-{trans['iso_3166_1']}"
): SeasonTranslation(
name=clean(trans["data"]["name"]),
description=clean(trans["data"]["overview"]),
poster=self._pick_image(season, trans["iso_639_1"], "posters"),
thumbnail=None,
banner=None,
)
for trans in season["translations"]["translations"]
},
)
async def identify_season(self, show_id: str, season: int) -> Season:
# We already get seasons info in the identify_show and chances are this gets cached already
show = await self.identify_show(show_id)
ret = next((x for x in show.seasons if x.season_number == season), None)
if ret is None:
raise ProviderError(
f"Could not find season {season} for show {show.to_kyoo()['name']}"
)
return ret
async def _get_entry(self, serie_id: str, season: int, episode_nbr: int) -> Entry:
episode = await self._get(
f"tv/{serie_id}/season/{season}/episode/{episode_nbr}",
params={
"append_to_response": "translations",
},
)
logger.debug("TMDb responded: %s", episode)
async def search_episode(
self,
name: str,
season: Optional[int],
episode_nbr: Optional[int],
absolute: Optional[int],
year: Optional[int],
) -> Episode:
show = await self.search_show(name, year)
show_id = show.external_id[self.name].data_id
if absolute is not None and (season is None or episode_nbr is None):
(season, episode_nbr) = await self.get_episode_from_absolute(
show_id, absolute
)
if season is None or episode_nbr is None:
raise ProviderError(
f"Could not guess season or episode number of the episode {show.name} {season}-{episode_nbr} ({absolute})",
)
if absolute is None:
absolute = await self.get_absolute_number(show_id, season, episode_nbr)
return await self.identify_episode(show_id, season, episode_nbr, absolute)
async def identify_episode(
self, show_id: str, season: Optional[int], episode_nbr: int, absolute: int
) -> Episode:
async def for_language(lng: Language) -> Episode:
try:
episode = await self._get(
f"tv/{show_id}/season/{season}/episode/{episode_nbr}",
params={
"language": lng.to_tag(),
},
)
except:
episode = await self._get(
f"tv/{show_id}/season/{season}/episode/{absolute}",
params={
"language": lng.to_tag(),
},
not_found_fail=f"Could not find episode {episode_nbr} of season {season} of serie {show_id} (absolute: {absolute})",
)
logger.debug("TMDb responded: %s", episode)
ret = Episode(
show=PartialShow(
name=show_id,
original_language=None,
external_id={
self.name: MetadataID(
show_id, f"https://www.themoviedb.org/tv/{show_id}"
)
},
return Entry(
kind="episode" if episode["season_number"] != 0 else "special",
order=0,
runtime=int(episode["runtime"]) if episode["runtime"] is not None else None,
air_date=datetime.strptime(episode["air_date"], "%Y-%m-%d").date()
if episode["air_date"]
else None,
thumbnail=self._map_image(episode["still_path"]),
slug=None,
season_number=episode["season_number"],
episode_number=episode["episode_number"],
number=episode["episode_number"],
external_id={
self.name: EpisodeId(
serie_id=serie_id,
season=episode["season_number"],
episode=episode["episode_number"],
link=f"https://www.themoviedb.org/tv/{serie_id}/season/{episode['season_number']}/episode/{episode['episode_number']}",
),
season_number=episode["season_number"],
episode_number=episode["episode_number"],
absolute_number=absolute,
runtime=int(episode["runtime"])
if episode["runtime"] is not None
else None,
release_date=datetime.strptime(episode["air_date"], "%Y-%m-%d").date()
if episode["air_date"]
else None,
thumbnail=f"https://image.tmdb.org/t/p/original{episode['still_path']}"
if "still_path" in episode and episode["still_path"] is not None
else None,
external_id={
self.name: EpisodeID(
show_id,
episode["season_number"],
episode["episode_number"],
f"https://www.themoviedb.org/tv/{show_id}/season/{episode['season_number']}/episode/{episode['episode_number']}",
),
},
)
translation = EpisodeTranslation(
name=episode["name"],
overview=episode["overview"],
)
ret.translations = {lng.to_tag(): translation}
return ret
return await self.process_translations(for_language, self.get_languages())
},
translations={
Language.get(
f"{trans['iso_639_1']}-{trans['iso_3166_1']}"
): EntryTranslation(
name=clean(trans["data"]["name"]),
description=clean(trans["data"]["overview"]),
tagline=None,
poster=None,
)
for trans in episode["translations"]["translations"]
},
)
@cache(ttl=timedelta(days=1))
async def get_absolute_order(self, show_id: str):
@ -699,10 +657,10 @@ class TheMovieDatabase(Provider):
self,
path: str,
*,
params: dict[str, Any] = {},
params: dict[str, Any] | None = None,
not_found_fail: str | None = None,
):
params = {k: v for k, v in params.items() if v is not None}
params = {k: v for k, v in params.items() if v is not None} if params else {}
async with self._client.get(
f"{self._base}/{path}", params={"api_key": self._api_key, **params}
) as r:
@ -751,7 +709,7 @@ class TheMovieDatabase(Provider):
slug=to_slug(person["name"]),
name=person["name"],
latin_name=person["original_name"],
image=self._image_path + person["profile_path"],
image=self._map_image(person["profile_path"]),
external_id={
self.name: MetadataId(
data_id=person["id"],
@ -761,6 +719,11 @@ class TheMovieDatabase(Provider):
),
)
def _map_image(self, image: str | None) -> str | None:
if not image:
return None
return self._image_path + image
def _pick_image(self, item: dict[str, Any], lng: str, key: str) -> str | None:
images = sorted(
item["images"][key],