Add movie search

This commit is contained in:
Zoe Roux 2025-05-08 04:20:07 +02:00
parent bf494720f9
commit ff8455e6ec
No known key found for this signature in database

View File

@ -3,21 +3,21 @@ from datetime import datetime, timedelta
from itertools import accumulate, zip_longest
from logging import getLogger
from statistics import mean
from typing import Any, Generator, List, Optional, override
from typing import Any, Generator, Optional, override
from aiohttp import ClientSession
from langcodes import Language
from matcher.cache import cache
from scanner.models.staff import Character, Person, Role, Staff
from ..models.collection import Collection, CollectionTranslation
from ..models.entry import Entry, EntryTranslation
from ..models.genre import Genre
from ..models.metadataid import EpisodeId, MetadataId
from ..models.movie import Movie, MovieStatus, MovieTranslation
from ..models.movie import Movie, MovieStatus, MovieTranslation, SearchMovie
from ..models.season import Season, SeasonTranslation
from ..models.serie import Serie, SerieStatus, SerieTranslation
from ..models.staff import Character, Person, Role, Staff
from ..models.studio import Studio, StudioTranslation
from ..utils import clean, to_slug
from .provider import Provider, ProviderError
@ -73,104 +73,38 @@ class TheMovieDatabase(Provider):
def name(self) -> str:
return "themoviedatabase"
async def _get(
self,
path: str,
*,
params: dict[str, Any] = {},
not_found_fail: Optional[str] = None,
):
params = {k: v for k, v in params.items() if v is not None}
async with self._client.get(
f"{self._base}/{path}", params={"api_key": self._api_key, **params}
) as r:
if not_found_fail and r.status == 404:
raise ProviderError(not_found_fail)
r.raise_for_status()
return await r.json()
def _map_genres(self, genres: Generator[int]) -> list[Genre]:
def flatten(x: Genre | list[Genre] | list[Genre | list[Genre]]) -> list[Genre]:
if isinstance(x, list):
return [j for i in x for j in flatten(i)]
return [x]
return flatten([self._genre_map[x] for x in genres if x in self._genre_map])
def _map_studio(self, company: dict[str, Any]) -> Studio:
return Studio(
slug=to_slug(company["name"]),
external_id={
self.name: MetadataId(
data_id=company["id"],
link=f"https://www.themoviedb.org/company/{company['id']}",
)
},
translations={
"en": StudioTranslation(
name=company["name"],
logo=f"https://image.tmdb.org/t/p/original{company['logo_path']}"
if "logo_path" in company
else None,
),
},
)
def _map_staff(self, person: dict[str, Any]) -> Staff:
return Staff(
# TODO: map those to Role (see https://developer.themoviedb.org/reference/configuration-jobs for list)
kind=person["known_for_department"],
character=Character(
name=person["character"],
latin_name=None,
image=None,
),
staff=Person(
slug=to_slug(person["name"]),
name=person["name"],
latin_name=person["original_name"],
image=self._image_path + person["profile_path"],
@override
async def search_movies(
self, title: str, year: int | None, *, language: list[Language]
) -> list[SearchMovie]:
search = (
await self._get(
"search/movie",
params={
"query": title,
"year": year,
"languages": [str(x) for x in language],
},
)
)["results"]
search = self._sort_search(search, title, year)
return [
SearchMovie(
slug=to_slug(x["title"]),
name=x["title"],
description=x["overview"],
air_date=datetime.strptime(x["release_date"], "%Y-%m-%d").date(),
poster=self._image_path + x["poster_path"],
original_language=Language.get(x["original_language"]),
external_id={
self.name: MetadataId(
data_id=person["id"],
link=f"https://www.themoviedb.org/person/{person['id']}",
data_id=x["id"],
link=f"https://www.themoviedb.org/movie/{x['id']}",
)
},
),
)
def _pick_image(self, item: dict[str, Any], lng: str, key: str) -> str | None:
images = sorted(
item["images"][key],
key=lambda x: (x.get("vote_average", 0), x.get("width", 0)),
reverse=True,
)
# check images in your language
localized = next((x for x in images if x["iso_639_1"] == lng), None)
if localized:
return self._image_path + localized
# if failed, check images without text
notext = next((x for x in images if x["iso_639_1"] == None), None)
if notext:
return self._image_path + notext
# take a random image, it's better than nothing
random_img = next((x for x in images if x["iso_639_1"] == None), None)
if random_img:
return self._image_path + random_img
return None
async def search_movie(self, name: str, year: Optional[int]) -> Movie:
search_results = (
await self._get("search/movie", params={"query": name, "year": year})
)["results"]
if len(search_results) == 0:
raise ProviderError(f"No result for a movie named: {name}")
search = self.get_best_result(search_results, name, year)
original_language = Language.get(search["original_language"])
return await self.identify_movie(
search["id"], original_language=original_language
)
)
for x in search
]
@override
async def get_movie(self, external_id: dict[str, str]) -> Movie | None:
@ -425,7 +359,7 @@ class TheMovieDatabase(Provider):
if len(search_results) == 0:
raise ProviderError(f"No result for a tv show named: {name}")
search = self.get_best_result(search_results, name, year)
search = self._sort_search(search_results, name, year)
show_id = search["id"]
return PartialShow(
name=search["name"],
@ -523,44 +457,6 @@ class TheMovieDatabase(Provider):
return await self.process_translations(for_language, self.get_languages())
def get_best_result(
self, search_results: List[Any], name: str, year: Optional[int]
) -> Any:
results = search_results
# Find perfect match by year since sometime tmdb decides to discard the year parameter.
if year:
results = list(
x
for x in search_results
if ("first_air_date" in x and x["first_air_date"].startswith(str(year)))
or ("release_date" in x and x["release_date"].startswith(str(year)))
)
if not results:
results = search_results
# If there is a perfect match use it (and if there are multiple, use the most popular one)
res = sorted(
(
x
for x in results
if ("name" in x and x["name"].casefold() == name.casefold())
or ("title" in x and x["title"].casefold() == name.casefold())
),
key=lambda x: (x["vote_count"], x["popularity"]),
reverse=True,
)
if res:
results = res
else:
# Ignore totally unpopular shows or unknown ones.
# sorted is stable and False<True so doing this puts baddly rated items at the end of the list.
results = sorted(
results, key=lambda x: x["vote_count"] < 5 or x["popularity"] < 5
)
return results[0]
@cache(ttl=timedelta(days=1))
async def get_absolute_order(self, show_id: str):
"""
@ -749,3 +645,126 @@ class TheMovieDatabase(Provider):
for trans in collection["translations"]["translations"]
},
)
def _sort_search(self, search: list[Any], name: str, year: int | None) -> Any:
results = search
# Find perfect match by year since sometime tmdb decides to discard the year parameter.
if year:
results = [
x
for x in search
if ("first_air_date" in x and x["first_air_date"].startswith(str(year)))
or ("release_date" in x and x["release_date"].startswith(str(year)))
]
if not results:
results = search
# If there is a perfect match use it (and if there are multiple, use the most popular one)
res = sorted(
(
x
for x in results
if ("name" in x and x["name"].casefold() == name.casefold())
or ("title" in x and x["title"].casefold() == name.casefold())
),
key=lambda x: (x["vote_count"], x["popularity"]),
reverse=True,
)
if res:
results = res
else:
# Ignore totally unpopular shows or unknown ones.
# sorted is stable and False<True so doing this puts baddly rated items at the end of the list.
results = sorted(
results, key=lambda x: x["vote_count"] < 5 or x["popularity"] < 5
)
return results
async def _get(
self,
path: str,
*,
params: dict[str, Any] = {},
not_found_fail: Optional[str] = None,
):
params = {k: v for k, v in params.items() if v is not None}
async with self._client.get(
f"{self._base}/{path}", params={"api_key": self._api_key, **params}
) as r:
if not_found_fail and r.status == 404:
raise ProviderError(not_found_fail)
r.raise_for_status()
return await r.json()
def _map_genres(self, genres: Generator[int]) -> list[Genre]:
def flatten(x: Genre | list[Genre] | list[Genre | list[Genre]]) -> list[Genre]:
if isinstance(x, list):
return [j for i in x for j in flatten(i)]
return [x]
return flatten([self._genre_map[x] for x in genres if x in self._genre_map])
def _map_studio(self, company: dict[str, Any]) -> Studio:
return Studio(
slug=to_slug(company["name"]),
external_id={
self.name: MetadataId(
data_id=company["id"],
link=f"https://www.themoviedb.org/company/{company['id']}",
)
},
translations={
"en": StudioTranslation(
name=company["name"],
logo=f"https://image.tmdb.org/t/p/original{company['logo_path']}"
if "logo_path" in company
else None,
),
},
)
def _map_staff(self, person: dict[str, Any]) -> Staff:
return Staff(
# TODO: map those to Role (see https://developer.themoviedb.org/reference/configuration-jobs for list)
kind=person["known_for_department"],
character=Character(
name=person["character"],
latin_name=None,
image=None,
),
staff=Person(
slug=to_slug(person["name"]),
name=person["name"],
latin_name=person["original_name"],
image=self._image_path + person["profile_path"],
external_id={
self.name: MetadataId(
data_id=person["id"],
link=f"https://www.themoviedb.org/person/{person['id']}",
)
},
),
)
def _pick_image(self, item: dict[str, Any], lng: str, key: str) -> str | None:
images = sorted(
item["images"][key],
key=lambda x: (x.get("vote_average", 0), x.get("width", 0)),
reverse=True,
)
# check images in your language
localized = next((x for x in images if x["iso_639_1"] == lng), None)
if localized:
return self._image_path + localized
# if failed, check images without text
notext = next((x for x in images if x["iso_639_1"] == None), None)
if notext:
return self._image_path + notext
# take a random image, it's better than nothing
random_img = next((x for x in images if x["iso_639_1"] == None), None)
if random_img:
return self._image_path + random_img
return None