Parse staff & collections

This commit is contained in:
Zoe Roux 2025-05-08 04:07:20 +02:00
parent a4fdeb8a9b
commit bf494720f9
No known key found for this signature in database
2 changed files with 100 additions and 75 deletions

View File

@ -9,12 +9,12 @@ from .metadataid import MetadataId
class Collection(Model): class Collection(Model):
slug: str slug: str
original_language: Language original_language: Language | None
genres: list[Genre] genres: list[Genre]
rating: int | None rating: int | None
external_id: dict[str, MetadataId] external_id: dict[str, MetadataId]
translations: dict[str, CollectionTranslation] = {} translations: dict[Language, CollectionTranslation] = {}
class CollectionTranslation(Model): class CollectionTranslation(Model):

View File

@ -1,14 +1,15 @@
import asyncio
import os import os
from datetime import datetime, timedelta from datetime import datetime, timedelta
from itertools import accumulate, zip_longest from itertools import accumulate, zip_longest
from logging import getLogger from logging import getLogger
from typing import Any, Awaitable, Callable, Dict, List, Optional, TypeVar, override from statistics import mean
from typing import Any, Generator, List, Optional, override
from aiohttp import ClientSession from aiohttp import ClientSession
from langcodes import Language from langcodes import Language
from matcher.cache import cache from matcher.cache import cache
from scanner.models.staff import Character, Person, Role, Staff
from ..models.collection import Collection, CollectionTranslation from ..models.collection import Collection, CollectionTranslation
from ..models.entry import Entry, EntryTranslation from ..models.entry import Entry, EntryTranslation
@ -35,6 +36,7 @@ class TheMovieDatabase(Provider):
super().__init__() super().__init__()
self._client = client self._client = client
self._base = "https://api.themoviedb.org/3" self._base = "https://api.themoviedb.org/3"
self._image_path = "https://image.tmdb.org/t/p/original"
self._api_key = ( self._api_key = (
os.environ.get("THEMOVIEDB_APIKEY") or TheMovieDatabase.DEFAULT_API_KEY os.environ.get("THEMOVIEDB_APIKEY") or TheMovieDatabase.DEFAULT_API_KEY
) )
@ -59,7 +61,6 @@ class TheMovieDatabase(Provider):
37: Genre.WESTERN, 37: Genre.WESTERN,
10759: [Genre.ACTION, Genre.ADVENTURE], 10759: [Genre.ACTION, Genre.ADVENTURE],
10762: Genre.KIDS, 10762: Genre.KIDS,
10763: [],
10764: Genre.REALITY, 10764: Genre.REALITY,
10765: [Genre.SCIENCE_FICTION, Genre.FANTASY], 10765: [Genre.SCIENCE_FICTION, Genre.FANTASY],
10766: Genre.SOAP, 10766: Genre.SOAP,
@ -72,17 +73,7 @@ class TheMovieDatabase(Provider):
def name(self) -> str: def name(self) -> str:
return "themoviedatabase" return "themoviedatabase"
def process_genres(self, genres) -> list[Genre]: async def _get(
def flatten(x: Genre | list[Genre]) -> list[Genre]:
if isinstance(x, list):
return [j for i in x for j in flatten(i)]
return [x]
return flatten(
[self._genre_map[x["id"]] for x in genres if x["id"] in self._genre_map]
)
async def get(
self, self,
path: str, path: str,
*, *,
@ -98,7 +89,15 @@ class TheMovieDatabase(Provider):
r.raise_for_status() r.raise_for_status()
return await r.json() return await r.json()
def to_studio(self, company: dict[str, Any]) -> Studio: def _map_genres(self, genres: Generator[int]) -> list[Genre]:
def flatten(x: Genre | list[Genre] | list[Genre | list[Genre]]) -> list[Genre]:
if isinstance(x, list):
return [j for i in x for j in flatten(i)]
return [x]
return flatten([self._genre_map[x] for x in genres if x in self._genre_map])
def _map_studio(self, company: dict[str, Any]) -> Studio:
return Studio( return Studio(
slug=to_slug(company["name"]), slug=to_slug(company["name"]),
external_id={ external_id={
@ -117,9 +116,30 @@ class TheMovieDatabase(Provider):
}, },
) )
def _pick_image(self, item: dict[str, Any], lng: str, key: str) -> str | None: def _map_staff(self, person: dict[str, Any]) -> Staff:
base_path = "https://image.tmdb.org/t/p/original" return Staff(
# TODO: map those to Role (see https://developer.themoviedb.org/reference/configuration-jobs for list)
kind=person["known_for_department"],
character=Character(
name=person["character"],
latin_name=None,
image=None,
),
staff=Person(
slug=to_slug(person["name"]),
name=person["name"],
latin_name=person["original_name"],
image=self._image_path + person["profile_path"],
external_id={
self.name: MetadataId(
data_id=person["id"],
link=f"https://www.themoviedb.org/person/{person['id']}",
)
},
),
)
def _pick_image(self, item: dict[str, Any], lng: str, key: str) -> str | None:
images = sorted( images = sorted(
item["images"][key], item["images"][key],
key=lambda x: (x.get("vote_average", 0), x.get("width", 0)), key=lambda x: (x.get("vote_average", 0), x.get("width", 0)),
@ -129,20 +149,20 @@ class TheMovieDatabase(Provider):
# check images in your language # check images in your language
localized = next((x for x in images if x["iso_639_1"] == lng), None) localized = next((x for x in images if x["iso_639_1"] == lng), None)
if localized: if localized:
return base_path + localized return self._image_path + localized
# if failed, check images without text # if failed, check images without text
notext = next((x for x in images if x["iso_639_1"] == None), None) notext = next((x for x in images if x["iso_639_1"] == None), None)
if notext: if notext:
return base_path + notext return self._image_path + notext
# take a random image, it's better than nothing # take a random image, it's better than nothing
random_img = next((x for x in images if x["iso_639_1"] == None), None) random_img = next((x for x in images if x["iso_639_1"] == None), None)
if random_img: if random_img:
return base_path + random_img return self._image_path + random_img
return None return None
async def search_movie(self, name: str, year: Optional[int]) -> Movie: async def search_movie(self, name: str, year: Optional[int]) -> Movie:
search_results = ( search_results = (
await self.get("search/movie", params={"query": name, "year": year}) await self._get("search/movie", params={"query": name, "year": year})
)["results"] )["results"]
if len(search_results) == 0: if len(search_results) == 0:
raise ProviderError(f"No result for a movie named: {name}") raise ProviderError(f"No result for a movie named: {name}")
@ -158,7 +178,7 @@ class TheMovieDatabase(Provider):
if self.name not in external_id: if self.name not in external_id:
return None return None
movie = await self.get( movie = await self._get(
f"movie/{external_id[self.name]}", f"movie/{external_id[self.name]}",
params={ params={
"append_to_response": "alternative_titles,videos,credits,keywords,images,translations", "append_to_response": "alternative_titles,videos,credits,keywords,images,translations",
@ -169,7 +189,7 @@ class TheMovieDatabase(Provider):
return Movie( return Movie(
slug=to_slug(movie["title"]), slug=to_slug(movie["title"]),
original_language=Language.get(movie["original_language"]), original_language=Language.get(movie["original_language"]),
genres=self.process_genres(movie["genres"]), genres=self._map_genres(x["id"] for x in movie["genres"]),
rating=round(float(movie["vote_average"]) * 10), rating=round(float(movie["vote_average"]) * 10),
status=MovieStatus.FINISHED status=MovieStatus.FINISHED
if movie["status"] == "Released" if movie["status"] == "Released"
@ -239,19 +259,12 @@ class TheMovieDatabase(Provider):
for trans in movie["translations"]["translations"] for trans in movie["translations"]["translations"]
}, },
collections=[ collections=[
# Collection( await self._get_collection(movie["belongs_to_collection"]["id"])
# external_id={
# self.name: MetadataID(
# movie["belongs_to_collection"]["id"],
# f"https://www.themoviedb.org/collection/{movie['belongs_to_collection']['id']}",
# )
# },
# )
] ]
if movie["belongs_to_collection"] is not None if movie["belongs_to_collection"] is not None
else [], else [],
studios=[self.to_studio(x) for x in movie["production_companies"]], studios=[self._map_studio(x) for x in movie["production_companies"]],
staff=[], staff=[self._map_staff(x) for x in movie["credits"]["cast"]],
) )
@cache(ttl=timedelta(days=1)) @cache(ttl=timedelta(days=1))
@ -262,7 +275,7 @@ class TheMovieDatabase(Provider):
languages = self.get_languages() languages = self.get_languages()
async def for_language(lng: Language) -> Show: async def for_language(lng: Language) -> Show:
show = await self.get( show = await self._get(
f"tv/{show_id}", f"tv/{show_id}",
params={ params={
"language": lng.to_tag(), "language": lng.to_tag(),
@ -287,8 +300,8 @@ class TheMovieDatabase(Provider):
if show["in_production"] if show["in_production"]
else ShowStatus.FINISHED, else ShowStatus.FINISHED,
rating=round(float(show["vote_average"]) * 10), rating=round(float(show["vote_average"]) * 10),
studios=[self.to_studio(x) for x in show["production_companies"]], studios=[self._map_studio(x) for x in show["production_companies"]],
genres=self.process_genres(show["genres"]), genres=self._map_genres(show["genres"]),
external_id={ external_id={
self.name: MetadataID( self.name: MetadataID(
show["id"], f"https://www.themoviedb.org/tv/{show['id']}" show["id"], f"https://www.themoviedb.org/tv/{show['id']}"
@ -406,7 +419,7 @@ class TheMovieDatabase(Provider):
@cache(ttl=timedelta(days=1)) @cache(ttl=timedelta(days=1))
async def search_show(self, name: str, year: Optional[int]) -> PartialShow: async def search_show(self, name: str, year: Optional[int]) -> PartialShow:
search_results = ( search_results = (
await self.get("search/tv", params={"query": name, "year": year}) await self._get("search/tv", params={"query": name, "year": year})
)["results"] )["results"]
if len(search_results) == 0: if len(search_results) == 0:
@ -454,14 +467,14 @@ class TheMovieDatabase(Provider):
) -> Episode: ) -> Episode:
async def for_language(lng: Language) -> Episode: async def for_language(lng: Language) -> Episode:
try: try:
episode = await self.get( episode = await self._get(
f"tv/{show_id}/season/{season}/episode/{episode_nbr}", f"tv/{show_id}/season/{season}/episode/{episode_nbr}",
params={ params={
"language": lng.to_tag(), "language": lng.to_tag(),
}, },
) )
except: except:
episode = await self.get( episode = await self._get(
f"tv/{show_id}/season/{season}/episode/{absolute}", f"tv/{show_id}/season/{season}/episode/{absolute}",
params={ params={
"language": lng.to_tag(), "language": lng.to_tag(),
@ -559,7 +572,7 @@ class TheMovieDatabase(Provider):
show = await self.identify_show(show_id) show = await self.identify_show(show_id)
try: try:
groups = await self.get(f"tv/{show_id}/episode_groups") groups = await self._get(f"tv/{show_id}/episode_groups")
ep_count = max((x["episode_count"] for x in groups["results"]), default=0) ep_count = max((x["episode_count"] for x in groups["results"]), default=0)
if ep_count == 0: if ep_count == 0:
return None return None
@ -575,7 +588,7 @@ class TheMovieDatabase(Provider):
if group_id is None: if group_id is None:
return None return None
group = await self.get(f"tv/episode_group/{group_id}") group = await self._get(f"tv/episode_group/{group_id}")
absgrp = [ absgrp = [
ep ep
for grp in sorted(group["groups"], key=lambda x: x["order"]) for grp in sorted(group["groups"], key=lambda x: x["order"])
@ -691,36 +704,48 @@ class TheMovieDatabase(Provider):
# but tmdb registered it as S21E831 since S21's first ep is 800 # but tmdb registered it as S21E831 since S21's first ep is 800
return await self.get_absolute_number(show_id, season, episode_nbr + start) return await self.get_absolute_number(show_id, season, episode_nbr + start)
async def identify_collection(self, provider_id: str) -> Collection: async def _get_collection(self, provider_id: str) -> Collection:
languages = self.get_languages() collection = await self._get(
async def for_language(lng: Language) -> Collection:
collection = await self.get(
f"collection/{provider_id}", f"collection/{provider_id}",
params={ params={
"language": lng.to_tag(), "append_to_response": "images,translations",
"append_to_response": "images",
"include_image_language": f"{lng.language},null,en",
}, },
) )
logger.debug("TMDb responded: %s", collection) logger.debug("TMDb responded: %s", collection)
ret = Collection( return Collection(
slug=to_slug(collection["name"]),
# assume all parts are in the same language
original_language=Language.get(collection["part"][0]["original_language"]),
genres=[
y for x in collection["part"] for y in self._map_genres(x["genres"])
],
rating=round(
mean(float(x["vote_average"]) * 10 for x in collection["part"])
),
external_id={ external_id={
self.name: MetadataID( self.name: MetadataId(
collection["id"], data_id=collection["id"],
f"https://www.themoviedb.org/collection/{collection['id']}", link=f"https://www.themoviedb.org/collection/{collection['id']}",
) )
}, },
translations={
Language.get(
f"{trans['iso_639_1']}-{trans['iso_3166_1']}"
): CollectionTranslation(
name=clean(trans["data"]["title"]) or collection["title"],
latin_name=None,
description=trans["overview"],
tagline=None,
aliases=[],
tags=[],
poster=self._pick_image(collection, trans["iso_639_1"], "posters"),
thumbnail=self._pick_image(
collection, trans["iso_639_1"], "backdrops"
),
banner=None,
logo=None,
) )
translation = CollectionTranslation( for trans in collection["translations"]["translations"]
name=collection["name"], },
overview=collection["overview"],
posters=self._pick_image(collection, lng, "posters"),
logos=[],
thumbnails=self._pick_image(collection, lng, "backdrops"),
) )
ret.translations = {lng.to_tag(): translation}
return ret
return await self.process_translations(for_language, languages)