2024-05-15 00:11:37 +02:00

380 lines
10 KiB
Python

import asyncio
from datetime import timedelta, datetime
from urllib.parse import urlencode
from aiohttp import ClientSession
from logging import getLogger
from typing import Optional, Any, Callable, OrderedDict
from langcodes import Language
from matcher.cache import cache
from ..provider import Provider, ProviderError
from ..types.season import Season, SeasonTranslation
from ..types.episode import Episode, EpisodeTranslation, PartialShow, EpisodeID
from ..types.studio import Studio
from ..types.genre import Genre
from ..types.metadataid import MetadataID
from ..types.show import Show, ShowTranslation, Status as ShowStatus
logger = getLogger(__name__)
class TVDB(Provider):
DEFAULT_API_KEY = "3732560f-08b7-41db-9d9a-2966b4d90c10"
def __init__(
self,
client: ClientSession,
api_key: str,
pin: Optional[str],
languages: list[str],
) -> None:
super().__init__()
self._client = client
self.base = "https://api4.thetvdb.com/v4"
self._api_key = api_key
self._pin = pin
# tvdb use three letter codes for languages
# (with the terminology code as in 'fra' and not the biblographic code as in 'fre')
self._languages = [Language.get(lang).to_alpha3() for lang in languages]
self._genre_map = {
"soap": Genre.SOAP,
"science-fiction": Genre.SCIENCE_FICTION,
"reality": Genre.REALITY,
"news": Genre.NEWS,
"mini-series": None,
"horror": Genre.HORROR,
"home-and-garden": None,
"game-show": None,
"food": None,
"fantasy": Genre.FANTASY,
"family": Genre.FAMILY,
"drama": Genre.DRAMA,
"documentary": Genre.DOCUMENTARY,
"crime": Genre.CRIME,
"comedy": Genre.COMEDY,
"children": Genre.KIDS,
"animation": Genre.ANIMATION,
"adventure": Genre.ADVENTURE,
"action": Genre.ACTION,
"sport": None,
"suspense": None,
"talk-show": Genre.TALK,
"thriller": Genre.THRILLER,
"travel": None,
"western": Genre.WESTERN,
"anime": Genre.ANIMATION,
"romance": Genre.ROMANCE,
"musical": Genre.MUSIC,
"podcast": None,
"mystery": Genre.MYSTERY,
"indie": None,
"history": Genre.HISTORY,
"war": Genre.WAR,
"martial-arts": None,
"awards-show": None,
}
def normalize_lang(self, lang: str) -> str:
return str(Language.get(lang))
@cache(ttl=timedelta(days=30))
async def login(self) -> str:
async with self._client.post(
f"{self.base}/login",
json={
"apikey": self._api_key,
}
| ({"pin": self._pin} if self._pin else {}),
) as r:
r.raise_for_status()
ret = await r.json()
return ret["data"]["token"]
async def get(
self,
path: Optional[str] = None,
*,
fullPath: Optional[str] = None,
params: dict[str, Any] = {},
not_found_fail: Optional[str] = None,
):
token = await self.login()
params = {k: v for k, v in params.items() if v is not None}
async with self._client.get(
fullPath or f"{self.base}/{path}",
params={"api_key": self._api_key, **params},
headers={"Authorization": f"Bearer {token}"},
) as r:
if not_found_fail and r.status == 404:
raise ProviderError(not_found_fail)
r.raise_for_status()
return await r.json()
@property
def name(self) -> str:
return "tvdb"
@cache(ttl=timedelta(days=1))
async def search_show(self, name: str, year: Optional[int]) -> str:
query = OrderedDict(
query=name,
year=year,
type="series",
)
ret = await self.get(f"search?{urlencode(query)}")
return ret["data"][0]["tvdb_id"]
@cache(ttl=timedelta(days=1))
async def get_episodes(
self,
show_id: str,
language: Optional[str] = None,
):
path = f"series/{show_id}/episodes/default"
if language is not None:
path += f"/{language}"
ret = await self.get(
path, not_found_fail=f"Could not find show with id {show_id}"
)
episodes = ret["data"]["episodes"]
next = ret["links"]["next"]
while next != None:
ret = await self.get(fullPath=next)
next = ret["links"]["next"]
episodes += ret["data"]["episodes"]
return episodes, ret["data"]
async def search_episode(
self,
name: str,
season: Optional[int],
episode_nbr: Optional[int],
absolute: Optional[int],
year: Optional[int],
) -> Episode:
show_id = await self.search_show(name, year)
return await self.identify_episode(show_id, season, episode_nbr, absolute)
@cache(ttl=timedelta(days=1))
async def identify_episode(
self,
show_id: str,
season: Optional[int],
episode_nbr: Optional[int],
absolute: Optional[int],
) -> Episode:
flang, *olang = self._languages
episodes, show = await self.get_episodes(show_id, language=flang)
ret = next(
filter(
(lambda x: x["seasonNumber"] == 1 and x["number"] == absolute)
if absolute is not None
else (
lambda x: x["seasonNumber"] == season and x["number"] == episode_nbr
),
episodes,
),
None,
)
if ret == None:
raise ProviderError(
f"Could not retrive episode {show['name']} s{season}e{episode_nbr}, absolute {absolute}"
)
otrans = await asyncio.gather(
*(self.get_episodes(show_id, language=lang) for lang in olang)
)
translations = {
self.normalize_lang(lang): EpisodeTranslation(
name=val["name"],
overview=val["overview"],
)
for (lang, val) in zip(
self._languages,
[
ret,
*(next(x for x in e[0] if x["id"] == ret["id"]) for e in otrans),
],
)
}
return Episode(
show=PartialShow(
name=show["name"],
original_language=self.normalize_lang(show["originalLanguage"]),
external_id={
self.name: MetadataID(
show_id, f"https://thetvdb.com/series/{show['slug']}"
),
},
),
season_number=ret["seasonNumber"],
episode_number=ret["number"],
absolute_number=ret["absoluteNumber"],
runtime=ret["runtime"],
release_date=datetime.strptime(ret["aired"], "%Y-%m-%d").date(),
thumbnail=f"https://artworks.thetvdb.com{ret['image']}",
external_id={
self.name: EpisodeID(
show_id,
ret["seasonNumber"],
ret["number"],
f"https://thetvdb.com/series/{show_id}/episodes/{ret['id']}",
),
},
translations=translations,
)
@cache(ttl=timedelta(days=1))
async def identify_show(self, show_id: str) -> Show:
ret = await self.get(
f"series/{show_id}/extended",
not_found_fail=f"Could not find show with id {show_id}",
)
logger.debug("TVDB responded: %s", ret)
ret = ret["data"]
translations = await asyncio.gather(
*(
self.get(f"series/{show_id}/translations/{lang}")
for lang in self._languages
if lang != ret["originalLanguage"]
)
)
trans = {
self.normalize_lang(lang): ShowTranslation(
name=x["name"],
tagline=None,
tags=[],
overview=x["overview"],
posters=[
i["image"]
for i in x["artworks"]
if i["type"] == 2
and (i["language"] == lang or i["language"] is None)
],
logos=[
i["image"]
for i in x["artworks"]
if i["type"] == 5
and (i["language"] == lang or i["language"] is None)
],
thumbnails=[
i["image"]
for i in x["artworks"]
if i["type"] == 3
and (i["language"] == lang or i["language"] is None)
],
trailers=[x["url"] for t in ret["trailers"] if t["language"] == lang],
)
for (lang, x) in [
(ret["originalLanguage"], ret),
*zip(self._languages, translations),
]
}
return Show(
original_language=self.normalize_lang(ret["originalLanguage"]),
aliases=[x["name"] for x in ret["aliases"]],
start_air=datetime.strptime(ret["firstAired"], "%Y-%m-%d").date(),
end_air=datetime.strptime(ret["lastAired"], "%Y-%m-%d").date(),
status=ShowStatus.FINISHED
if ret["status"]["name"] == "Ended"
else ShowStatus.AIRING
if ret["status"]["name"] == "Continuing"
else ShowStatus.PLANNED,
rating=None,
studios=[
Studio(
name=x["name"],
logos=[],
external_id={
self.name: MetadataID(
x["id"], f"https://thetvdb.com/companies/{x['slug']}"
)
},
)
for x in ret["companies"]
if x["companyType"]["companyTypeName"] == "Studio"
],
genres=[
self._genre_map[x["slug"]]
for x in ret["genres"]
if self._genre_map[x["slug"]] is not None
],
external_id={
self.name: MetadataID(
ret["id"], f"https://thetvdb.com/series/{ret['slug']}"
),
}
| self.process_remote_id(
ret["remoteIds"],
"themoviedatabase",
lambda x: f"https://www.themoviedb.org/tv/{x}",
"TheMovieDB.com",
)
| self.process_remote_id(
ret["remoteIds"],
"imdb",
lambda x: f"https://www.imdb.com/title/{x}",
"IMDB",
),
translations=trans,
seasons=await asyncio.gather(
*(self.identify_season(x["id"], x["number"]) for x in ret["seasons"])
),
)
def process_remote_id(
self, ids: dict, name: str, link: Callable[[str], str], tvdb_name: str
) -> dict:
id = next((x["id"] for x in ids if x["sourceName"] == tvdb_name), None)
if id is None:
return {}
return {name: MetadataID(id, link(id))}
@cache(ttl=timedelta(days=1))
async def identify_season(self, show_id: str, season: int) -> Season:
"""
for tvdb, we don't save show_id but the season_id so we don't need to read `season`
"""
season_id = show_id
info = await self.get(
f"seasons/{season_id}/extended",
not_found_fail=f"Invalid season id {season_id}",
)
async def process_translation(lang: str) -> SeasonTranslation:
data = await self.get(f"seasons/{season_id}/translations/{lang}")
return SeasonTranslation(
name=data["data"]["name"],
overview=data["data"]["overview"],
posters=[
i["image"]
for i in data["data"]["artworks"]
if i["type"] == 7
and (i["language"] == lang or i["language"] is None)
],
thumbnails=[
i["image"]
for i in data["data"]["artworks"]
if i["type"] == 8
and (i["language"] == lang or i["language"] is None)
],
)
trans = await asyncio.gather(*(process_translation(x) for x in self._languages))
translations = {
self.normalize_lang(lang): tl for lang, tl in zip(self._languages, trans)
}
return Season(
season_number=info["data"]["number"],
episodes_count=len(info["data"]["episodes"]),
start_air=min(x["aired"] for x in info["data"]["episodes"]),
end_air=max(x["aired"] for x in info["data"]["episodes"]),
external_id={
self.name: MetadataID(season_id, None),
},
translations=translations,
)