diff --git a/autosync/autosync/__init__.py b/autosync/autosync/__init__.py index e6e44ec2..8b1eda73 100644 --- a/autosync/autosync/__init__.py +++ b/autosync/autosync/__init__.py @@ -35,7 +35,7 @@ def on_message( body: bytes, ): try: - message = Message.from_json(body) # type: Message + message = Message.from_json(body) service.update(message.value.user, message.value.resource, message.value) except Exception as e: logging.exception("Error processing message.", exc_info=e) diff --git a/autosync/autosync/models/message.py b/autosync/autosync/models/message.py index 94b3312e..d24f39fc 100644 --- a/autosync/autosync/models/message.py +++ b/autosync/autosync/models/message.py @@ -1,5 +1,5 @@ from dataclasses import dataclass -from dataclasses_json import dataclass_json, LetterCase +from dataclasses_json import DataClassJsonMixin, dataclass_json, LetterCase from autosync.models.episode import Episode from autosync.models.movie import Movie @@ -17,7 +17,7 @@ class WatchStatusMessage(WatchStatus): @dataclass_json(letter_case=LetterCase.CAMEL) @dataclass -class Message: +class Message(DataClassJsonMixin): action: str type: str value: WatchStatusMessage diff --git a/docker-compose.dev.yml b/docker-compose.dev.yml index 3856779c..75c03090 100644 --- a/docker-compose.dev.yml +++ b/docker-compose.dev.yml @@ -84,6 +84,18 @@ services: volumes: - ${LIBRARY_ROOT}:/video:ro + matcher: + build: ./scanner + command: matcher + restart: on-failure + depends_on: + back: + condition: service_healthy + env_file: + - ./.env + environment: + - KYOO_URL=${KYOO_URL:-http://back:5000} + autosync: build: ./autosync restart: on-failure diff --git a/docker-compose.prod.yml b/docker-compose.prod.yml index 57dfb271..0e7a9872 100644 --- a/docker-compose.prod.yml +++ b/docker-compose.prod.yml @@ -59,6 +59,18 @@ services: volumes: - ${LIBRARY_ROOT}:/video:ro + matcher: + image: zoriya/kyoo_scanner:latest + command: matcher + restart: unless-stopped + depends_on: + back: + condition: service_healthy + env_file: + - ./.env + environment: + - KYOO_URL=${KYOO_URL:-http://back:5000} + autosync: image: zoriya/kyoo_autosync:latest restart: on-failure diff --git a/docker-compose.yml b/docker-compose.yml index 4a3db931..c8815528 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -60,6 +60,18 @@ services: volumes: - ${LIBRARY_ROOT}:/video:ro + matcher: + build: ./scanner + command: matcher + restart: unless-stopped + depends_on: + back: + condition: service_healthy + env_file: + - ./.env + environment: + - KYOO_URL=${KYOO_URL:-http://back:5000} + autosync: build: ./autosync restart: on-failure diff --git a/front/packages/models/src/resources/genre.ts b/front/packages/models/src/resources/genre.ts index dfb04d9a..c54004bb 100644 --- a/front/packages/models/src/resources/genre.ts +++ b/front/packages/models/src/resources/genre.ts @@ -37,4 +37,10 @@ export enum Genre { Thriller = "Thriller", War = "War", Western = "Western", + Kids = "Kids", + News = "News", + Reality = "Reality", + Soap = "Soap", + Talk = "Talk", + Politics = "Politics", } diff --git a/scanner/.dockerignore b/scanner/.dockerignore new file mode 100644 index 00000000..e136516d --- /dev/null +++ b/scanner/.dockerignore @@ -0,0 +1,2 @@ +Dockerfile* + diff --git a/scanner/Dockerfile b/scanner/Dockerfile index 372016df..162663e2 100644 --- a/scanner/Dockerfile +++ b/scanner/Dockerfile @@ -5,4 +5,5 @@ COPY ./requirements.txt . RUN pip3 install -r ./requirements.txt COPY . . -ENTRYPOINT ["python3", "-m", "scanner", "-v"] +ENTRYPOINT ["python3", "-m"] +CMD ["scanner"] diff --git a/scanner/matcher/__init__.py b/scanner/matcher/__init__.py new file mode 100644 index 00000000..195a1336 --- /dev/null +++ b/scanner/matcher/__init__.py @@ -0,0 +1,18 @@ +async def main(): + import logging + import sys + from providers.provider import Provider + from providers.kyoo_client import KyooClient + from .matcher import Matcher + from .subscriber import Subscriber + + logging.basicConfig(level=logging.INFO) + if len(sys.argv) > 1 and sys.argv[1] == "-v": + logging.basicConfig(level=logging.DEBUG) + logging.getLogger("watchfiles").setLevel(logging.WARNING) + logging.getLogger("rebulk").setLevel(logging.WARNING) + + async with KyooClient() as kyoo, Subscriber() as sub: + provider, xem = Provider.get_all(kyoo.client) + scanner = Matcher(kyoo, provider, xem) + await sub.listen(scanner) diff --git a/scanner/matcher/__main__.py b/scanner/matcher/__main__.py new file mode 100644 index 00000000..670779da --- /dev/null +++ b/scanner/matcher/__main__.py @@ -0,0 +1,6 @@ +#!/usr/bin/env python + +import asyncio +import matcher + +asyncio.run(matcher.main()) diff --git a/scanner/scanner/cache.py b/scanner/matcher/cache.py similarity index 100% rename from scanner/scanner/cache.py rename to scanner/matcher/cache.py diff --git a/scanner/matcher/matcher.py b/scanner/matcher/matcher.py new file mode 100644 index 00000000..d09a4b80 --- /dev/null +++ b/scanner/matcher/matcher.py @@ -0,0 +1,169 @@ +from datetime import timedelta +import asyncio +from logging import getLogger +from providers.implementations.thexem import TheXem +from providers.provider import Provider, ProviderError +from providers.types.collection import Collection +from providers.types.show import Show +from providers.types.episode import Episode, PartialShow +from providers.types.season import Season +from providers.kyoo_client import KyooClient +from .parser.guess import guessit +from .cache import cache, exec_as_cache, make_key + +logger = getLogger(__name__) + + +class Matcher: + def __init__(self, client: KyooClient, provider: Provider, xem: TheXem) -> None: + self._client = client + self._provider = provider + self._xem = xem + + self._collection_cache = {} + self._show_cache = {} + self._season_cache = {} + + async def delete(self, path: str): + try: + await self._client.delete(path) + return True + except Exception as e: + logger.exception("Unhandled error", exc_info=e) + return False + + async def identify(self, path: str): + try: + await self._identify(path) + await self._client.delete_issue(path) + except ProviderError as e: + logger.error(e) + await self._client.create_issue(path, str(e)) + except Exception as e: + logger.exception("Unhandled error", exc_info=e) + await self._client.create_issue( + path, "Unknown error", {"type": type(e).__name__, "message": str(e)} + ) + return False + return True + + async def _identify(self, path: str): + raw = guessit(path, xem_titles=await self._xem.get_expected_titles()) + + if "mimetype" not in raw or not raw["mimetype"].startswith("video"): + return + # Remove seasons in "One Piece (1999) 152.mkv" for example + if raw.get("season") == raw.get("year") and "season" in raw: + del raw["season"] + + if isinstance(raw.get("season"), list): + raise ProviderError( + f"An episode can't have multiple seasons (found {raw.get('season')} for {path})" + ) + if isinstance(raw.get("episode"), list): + raise ProviderError( + f"Multi-episodes files are not yet supported (for {path})" + ) + + logger.info("Identied %s: %s", path, raw) + + if raw["type"] == "movie": + movie = await self._provider.identify_movie(raw["title"], raw.get("year")) + movie.path = str(path) + logger.debug("Got movie: %s", movie) + movie_id = await self._client.post("movies", data=movie.to_kyoo()) + + if any(movie.collections): + ids = await asyncio.gather( + *(self.create_or_get_collection(x) for x in movie.collections) + ) + await asyncio.gather( + *(self._client.link_collection(x, "movie", movie_id) for x in ids) + ) + elif raw["type"] == "episode": + episode = await self._provider.identify_episode( + raw["title"], + season=raw.get("season"), + episode_nbr=raw.get("episode"), + absolute=raw.get("episode") if "season" not in raw else None, + year=raw.get("year"), + ) + episode.path = str(path) + logger.debug("Got episode: %s", episode) + episode.show_id = await self.create_or_get_show(episode) + + if episode.season_number is not None: + episode.season_id = await self.register_seasons( + episode.show, episode.show_id, episode.season_number + ) + await self._client.post("episodes", data=episode.to_kyoo()) + else: + logger.warn("Unknown video file type: %s", raw["type"]) + + async def create_or_get_collection(self, collection: Collection) -> str: + @cache(ttl=timedelta(days=1), cache=self._collection_cache) + async def create_collection(provider_id: str): + # TODO: Check if a collection with the same metadata id exists already on kyoo. + new_collection = ( + await self._provider.identify_collection(provider_id) + if not any(collection.translations.keys()) + else collection + ) + logger.debug("Got collection: %s", new_collection) + return await self._client.post("collection", data=new_collection.to_kyoo()) + + # The parameter is only used as a key for the cache. + provider_id = collection.external_id[self._provider.name].data_id + return await create_collection(provider_id) + + async def create_or_get_show(self, episode: Episode) -> str: + @cache(ttl=timedelta(days=1), cache=self._show_cache) + async def create_show(_: str): + # TODO: Check if a show with the same metadata id exists already on kyoo. + show = ( + await self._provider.identify_show( + episode.show.external_id[self._provider.name].data_id, + ) + if isinstance(episode.show, PartialShow) + else episode.show + ) + # TODO: collections + logger.debug("Got show: %s", episode) + ret = await self._client.post("show", data=show.to_kyoo()) + + async def create_season(season: Season, id: str): + try: + season.show_id = id + return await self._client.post("seasons", data=season.to_kyoo()) + except Exception as e: + logger.exception("Unhandled error create a season", exc_info=e) + + season_tasks = map( + lambda s: exec_as_cache( + self._season_cache, + make_key((ret, s.season_number)), + lambda: create_season(s, ret), + ), + show.seasons, + ) + await asyncio.gather(*season_tasks) + + return ret + + # The parameter is only used as a key for the cache. + provider_id = episode.show.external_id[self._provider.name].data_id + return await create_show(provider_id) + + async def register_seasons( + self, show: Show | PartialShow, show_id: str, season_number: int + ) -> str: + # We use an external season cache because we want to edit this cache programatically + @cache(ttl=timedelta(days=1), cache=self._season_cache) + async def create_season(_: str, __: int): + season = await self._provider.identify_season( + show.external_id[self._provider.name].data_id, season_number + ) + season.show_id = show_id + return await self._client.post("seasons", data=season.to_kyoo()) + + return await create_season(show_id, season_number) diff --git a/scanner/scanner/parser/guess.py b/scanner/matcher/parser/guess.py similarity index 100% rename from scanner/scanner/parser/guess.py rename to scanner/matcher/parser/guess.py diff --git a/scanner/scanner/parser/rules.py b/scanner/matcher/parser/rules.py similarity index 100% rename from scanner/scanner/parser/rules.py rename to scanner/matcher/parser/rules.py diff --git a/scanner/matcher/subscriber.py b/scanner/matcher/subscriber.py new file mode 100644 index 00000000..15d25710 --- /dev/null +++ b/scanner/matcher/subscriber.py @@ -0,0 +1,57 @@ +import asyncio +from dataclasses import dataclass +from dataclasses_json import DataClassJsonMixin +from typing import Literal +import os +import logging +from aio_pika import connect_robust +from aio_pika.abc import AbstractIncomingMessage + +from matcher.matcher import Matcher + +logger = logging.getLogger(__name__) + + +@dataclass +class Message(DataClassJsonMixin): + action: Literal["scan", "delete"] + path: str + + +class Subscriber: + QUEUE = "scanner" + + async def __aenter__(self): + self._con = await connect_robust( + host=os.environ.get("RABBITMQ_HOST", "rabbitmq"), + login=os.environ.get("RABBITMQ_DEFAULT_USER", "guest"), + password=os.environ.get("RABBITMQ_DEFAULT_PASS", "guest"), + ) + self._channel = await self._con.channel() + self._queue = await self._channel.declare_queue(self.QUEUE) + return self + + async def __aexit__(self, exc_type, exc_value, exc_tb): + await self._con.close() + + async def listen(self, scanner: Matcher): + async def on_message(message: AbstractIncomingMessage): + msg = Message.from_json(message.body) + ack = False + match msg.action: + case "scan": + ack = await scanner.identify(msg.path) + case "delete": + ack = await scanner.delete(msg.path) + case _: + logger.error(f"Invalid action: {msg.action}") + if ack: + await message.ack() + else: + await message.reject() + + # Allow up to 20 scan requests to run in parallel on the same listener. + # Since most work is calling API not doing that is a waste. + await self._channel.set_qos(prefetch_count=20) + await self._queue.consume(on_message) + await asyncio.Future() diff --git a/scanner/providers/implementations/themoviedatabase.py b/scanner/providers/implementations/themoviedatabase.py index eb1971fa..3f1c861e 100644 --- a/scanner/providers/implementations/themoviedatabase.py +++ b/scanner/providers/implementations/themoviedatabase.py @@ -1,14 +1,14 @@ import asyncio -import logging from aiohttp import ClientSession from datetime import datetime, timedelta +from logging import getLogger from typing import Awaitable, Callable, Dict, List, Optional, Any, TypeVar from itertools import accumulate, zip_longest from providers.idmapper import IdMapper from providers.implementations.thexem import TheXem from providers.utils import ProviderError -from scanner.cache import cache +from matcher.cache import cache from ..provider import Provider from ..types.movie import Movie, MovieTranslation, Status as MovieStatus @@ -20,11 +20,13 @@ from ..types.metadataid import MetadataID from ..types.show import Show, ShowTranslation, Status as ShowStatus from ..types.collection import Collection, CollectionTranslation +logger = getLogger(__name__) + class TheMovieDatabase(Provider): def __init__( self, - languages, + languages: list[str], client: ClientSession, api_key: str, xem: TheXem, @@ -158,7 +160,7 @@ class TheMovieDatabase(Provider): "append_to_response": "alternative_titles,videos,credits,keywords,images", }, ) - logging.debug("TMDb responded: %s", movie) + logger.debug("TMDb responded: %s", movie) ret = Movie( original_language=movie["original_language"], @@ -256,7 +258,7 @@ class TheMovieDatabase(Provider): "append_to_response": "alternative_titles,videos,credits,keywords,images,external_ids", }, ) - logging.debug("TMDb responded: %s", show) + logger.debug("TMDb responded: %s", show) ret = Show( original_language=show["original_language"], @@ -427,7 +429,7 @@ class TheMovieDatabase(Provider): if self.name in ret.external_id: return ret - logging.warn( + logger.warn( "Could not map xem exception to themoviedb, searching instead for %s", new_name, ) @@ -473,7 +475,7 @@ class TheMovieDatabase(Provider): else None ) if tvdb_id is None: - logging.info( + logger.info( "Tvdb could not be found, trying xem name lookup for %s", name ) _, tvdb_id = await self._xem.get_show_override("tvdb", old_name) @@ -518,7 +520,7 @@ class TheMovieDatabase(Provider): }, not_found_fail=f"Could not find episode {episode_nbr} of season {season} of serie {name} (absolute: {absolute})", ) - logging.debug("TMDb responded: %s", episode) + logger.debug("TMDb responded: %s", episode) ret = Episode( show=show, @@ -616,7 +618,7 @@ class TheMovieDatabase(Provider): grp = next(iter(group["groups"]), None) return grp["episodes"] if grp else None except Exception as e: - logging.exception( + logger.exception( "Could not retrieve absolute ordering information", exc_info=e ) return None @@ -697,7 +699,7 @@ class TheMovieDatabase(Provider): "language": lng, }, ) - logging.debug("TMDb responded: %s", collection) + logger.debug("TMDb responded: %s", collection) ret = Collection( external_id={ diff --git a/scanner/providers/implementations/thexem.py b/scanner/providers/implementations/thexem.py index 565ae1fc..23e22c84 100644 --- a/scanner/providers/implementations/thexem.py +++ b/scanner/providers/implementations/thexem.py @@ -1,11 +1,13 @@ import re -import logging from typing import Dict, List, Literal from aiohttp import ClientSession +from logging import getLogger from datetime import timedelta from providers.utils import ProviderError -from scanner.cache import cache +from matcher.cache import cache + +logger = getLogger(__name__) def clean(s: str): @@ -28,7 +30,7 @@ class TheXem: async def get_map( self, provider: Literal["tvdb"] | Literal["anidb"] ) -> Dict[str, List[Dict[str, int]]]: - logging.info("Fetching data from thexem for %s", provider) + logger.info("Fetching data from thexem for %s", provider) async with self._client.get( f"{self.base}/map/allNames", params={ @@ -40,7 +42,7 @@ class TheXem: r.raise_for_status() ret = await r.json() if "data" not in ret or ret["result"] == "failure": - logging.error("Could not fetch xem metadata. Error: %s", ret["message"]) + logger.error("Could not fetch xem metadata. Error: %s", ret["message"]) raise ProviderError("Could not fetch xem metadata") return ret["data"] @@ -53,7 +55,7 @@ class TheXem: Dict[Literal["season"] | Literal["episode"] | Literal["absolute"], int], ] ]: - logging.info("Fetching from thexem the map of %s (%s)", id, provider) + logger.info("Fetching from thexem the map of %s (%s)", id, provider) async with self._client.get( f"{self.base}/map/all", params={ @@ -64,7 +66,7 @@ class TheXem: r.raise_for_status() ret = await r.json() if "data" not in ret or ret["result"] == "failure": - logging.error("Could not fetch xem mapping. Error: %s", ret["message"]) + logger.error("Could not fetch xem mapping. Error: %s", ret["message"]) return [] return ret["data"] @@ -111,7 +113,7 @@ class TheXem: if master_season is None or master_season == -1: return [None, None, episode] - logging.info( + logger.info( "Fount xem override for show %s, ep %d. Master season: %d", show_name, episode, @@ -130,7 +132,7 @@ class TheXem: None, ) if ep is None: - logging.warning( + logger.warning( "Could not get xem mapping for show %s, falling back to identifier mapping.", show_name, ) diff --git a/scanner/providers/kyoo_client.py b/scanner/providers/kyoo_client.py new file mode 100644 index 00000000..5dfbdc7b --- /dev/null +++ b/scanner/providers/kyoo_client.py @@ -0,0 +1,156 @@ +import os +import jsons +from aiohttp import ClientSession +from datetime import date +from logging import getLogger +from typing import List, Literal, Any, Optional +from urllib.parse import quote + +from .utils import format_date + +logger = getLogger(__name__) + + +class KyooClient: + def __init__(self) -> None: + self._api_key = os.environ.get("KYOO_APIKEY") + if not self._api_key: + self._api_key = os.environ.get("KYOO_APIKEYS") + if not self._api_key: + print("Missing environment variable 'KYOO_APIKEY'.") + exit(2) + self._api_key = self._api_key.split(",")[0] + + self._url = os.environ.get("KYOO_URL", "http://back:5000") + + async def __aenter__(self): + jsons.set_serializer(lambda x, **_: format_date(x), type[Optional[date | int]]) + self.client = ClientSession( + json_serialize=lambda *args, **kwargs: jsons.dumps( + *args, key_transformer=jsons.KEY_TRANSFORMER_CAMELCASE, **kwargs + ), + ) + return self + + async def __aexit__(self, exc_type, exc_value, exc_tb): + await self.client.close() + + async def get_registered_paths(self) -> List[str]: + paths = None + async with self.client.get( + f"{self._url}/episodes", + params={"limit": 0}, + headers={"X-API-Key": self._api_key}, + ) as r: + r.raise_for_status() + ret = await r.json() + paths = list(x["path"] for x in ret["items"]) + + async with self.client.get( + f"{self._url}/movies", + params={"limit": 0}, + headers={"X-API-Key": self._api_key}, + ) as r: + r.raise_for_status() + ret = await r.json() + paths += list(x["path"] for x in ret["items"]) + return paths + + async def create_issue(self, path: str, issue: str, extra: dict | None = None): + async with self.client.post( + f"{self._url}/issues", + json={ + "domain": "scanner", + "cause": path, + "reason": issue, + "extra": extra if extra is not None else {}, + }, + headers={"X-API-Key": self._api_key}, + ) as r: + if not r.ok: + logger.error(f"Request error: {await r.text()}") + r.raise_for_status() + + async def delete_issue(self, path: str): + async with self.client.delete( + f'{self._url}/issues?filter=domain eq scanner and cause eq "{path}"', + headers={"X-API-Key": self._api_key}, + ) as r: + if not r.ok: + logger.error(f"Request error: {await r.text()}") + r.raise_for_status() + + async def link_collection( + self, collection: str, type: Literal["movie"] | Literal["show"], id: str + ): + async with self.client.put( + f"{self._url}/collections/{collection}/{type}/{id}", + headers={"X-API-Key": self._api_key}, + ) as r: + # Allow 409 and continue as if it worked. + if not r.ok and r.status != 409: + logger.error(f"Request error: {await r.text()}") + r.raise_for_status() + + async def post(self, path: str, *, data: dict[str, Any]) -> str: + logger.debug( + "Sending %s: %s", + path, + jsons.dumps( + data, + key_transformer=jsons.KEY_TRANSFORMER_CAMELCASE, + jdkwargs={"indent": 4}, + ), + ) + async with self.client.post( + f"{self._url}/{path}", + json=data, + headers={"X-API-Key": self._api_key}, + ) as r: + # Allow 409 and continue as if it worked. + if not r.ok and r.status != 409: + logger.error(f"Request error: {await r.text()}") + r.raise_for_status() + ret = await r.json() + + if r.status == 409 and ( + (path == "shows" and ret["startAir"][:4] != str(data["start_air"].year)) + or ( + path == "movies" + and ret["airDate"][:4] != str(data["air_date"].year) + ) + ): + logger.info( + f"Found a {path} with the same slug ({ret['slug']}) and a different date, using the date as part of the slug" + ) + year = (data["start_air"] if path == "movie" else data["air_date"]).year + data["slug"] = f"{ret['slug']}-{year}" + return await self.post(path, data=data) + return ret["id"] + + async def delete( + self, + path: str, + type: Literal["episode", "movie"] | None = None, + ): + logger.info("Deleting %s", path) + + if type is None or type == "movie": + async with self.client.delete( + f'{self._url}/movies?filter=path eq "{quote(path)}"', + headers={"X-API-Key": self._api_key}, + ) as r: + if not r.ok: + logger.error(f"Request error: {await r.text()}") + r.raise_for_status() + + if type is None or type == "episode": + async with self.client.delete( + f'{self._url}/episodes?filter=path eq "{quote(path)}"', + headers={"X-API-Key": self._api_key}, + ) as r: + if not r.ok: + logger.error(f"Request error: {await r.text()}") + r.raise_for_status() + + await self.delete_issue(path) diff --git a/scanner/providers/provider.py b/scanner/providers/provider.py index f9e449e4..5f5e7f53 100644 --- a/scanner/providers/provider.py +++ b/scanner/providers/provider.py @@ -1,7 +1,7 @@ import os from aiohttp import ClientSession from abc import abstractmethod, abstractproperty -from typing import Optional, TypeVar +from typing import Optional, Self from providers.implementations.thexem import TheXem from providers.utils import ProviderError @@ -13,14 +13,14 @@ from .types.movie import Movie from .types.collection import Collection -Self = TypeVar("Self", bound="Provider") - - class Provider: @classmethod - def get_all( - cls: type[Self], client: ClientSession, languages: list[str] - ) -> tuple[list[Self], TheXem]: + def get_all(cls, client: ClientSession) -> tuple[Self, TheXem]: + languages = os.environ.get("LIBRARY_LANGUAGES") + if not languages: + print("Missing environment variable 'LIBRARY_LANGUAGES'.") + exit(2) + languages = languages.split(",") providers = [] from providers.idmapper import IdMapper @@ -44,7 +44,7 @@ class Provider: idmapper.init(tmdb=tmdb, language=languages[0]) - return providers, xem + return next(iter(providers)), xem @abstractproperty def name(self) -> str: diff --git a/scanner/requirements.txt b/scanner/requirements.txt index 5a49dfae..cdeb31bd 100644 --- a/scanner/requirements.txt +++ b/scanner/requirements.txt @@ -2,3 +2,5 @@ guessit aiohttp jsons watchfiles +aio-pika +dataclasses-json diff --git a/scanner/scanner/__init__.py b/scanner/scanner/__init__.py index 587f1086..76951d53 100644 --- a/scanner/scanner/__init__.py +++ b/scanner/scanner/__init__.py @@ -2,46 +2,17 @@ async def main(): import asyncio import os import logging - import sys - import jsons - from datetime import date - from typing import Optional - from aiohttp import ClientSession - from providers.utils import format_date, ProviderError - from .scanner import Scanner from .monitor import monitor + from .scanner import scan + from .publisher import Publisher + from providers.kyoo_client import KyooClient - path = os.environ.get("SCANNER_LIBRARY_ROOT", "/video") - languages = os.environ.get("LIBRARY_LANGUAGES") - if not languages: - print("Missing environment variable 'LIBRARY_LANGUAGES'.") - exit(2) - api_key = os.environ.get("KYOO_APIKEY") - if not api_key: - api_key = os.environ.get("KYOO_APIKEYS") - if not api_key: - print("Missing environment variable 'KYOO_APIKEY'.") - exit(2) - api_key = api_key.split(",")[0] - - if len(sys.argv) > 1 and sys.argv[1] == "-v": - logging.basicConfig(level=logging.INFO) - if len(sys.argv) > 1 and sys.argv[1] == "-vv": - logging.basicConfig(level=logging.DEBUG) + logging.basicConfig(level=logging.INFO) logging.getLogger("watchfiles").setLevel(logging.WARNING) - logging.getLogger("rebulk").setLevel(logging.WARNING) - jsons.set_serializer(lambda x, **_: format_date(x), Optional[date | int]) # type: ignore - async with ClientSession( - json_serialize=lambda *args, **kwargs: jsons.dumps( - *args, key_transformer=jsons.KEY_TRANSFORMER_CAMELCASE, **kwargs - ), - ) as client: - try: - scanner = Scanner(client, languages=languages.split(","), api_key=api_key) - await asyncio.gather( - monitor(path, scanner), - scanner.scan(path), - ) - except ProviderError as e: - logging.error(e) + async with Publisher() as publisher, KyooClient() as client: + path = os.environ.get("SCANNER_LIBRARY_ROOT", "/video") + await asyncio.gather( + monitor(path, publisher), + scan(path, publisher, client), + ) diff --git a/scanner/scanner/monitor.py b/scanner/scanner/monitor.py index 7949541d..1fe24ace 100644 --- a/scanner/scanner/monitor.py +++ b/scanner/scanner/monitor.py @@ -1,22 +1,19 @@ -import logging +from logging import getLogger from watchfiles import awatch, Change -from .utils import ProviderError -from .scanner import Scanner + +from .publisher import Publisher + +logger = getLogger(__name__) -async def monitor(path: str, scanner: Scanner): +async def monitor(path: str, publisher: Publisher): async for changes in awatch(path): for event, file in changes: - try: - if event == Change.added: - await scanner.identify(file) - elif event == Change.deleted: - await scanner.delete(file) - elif event == Change.modified: - pass - else: - print(f"Change {event} occured for file {file}") - except ProviderError as e: - logging.error(str(e)) - except Exception as e: - logging.exception("Unhandled error", exc_info=e) + if event == Change.added: + await publisher.add(file) + elif event == Change.deleted: + await publisher.delete(file) + elif event == Change.modified: + pass + else: + logger.info(f"Change {event} occured for file {file}") diff --git a/scanner/scanner/publisher.py b/scanner/scanner/publisher.py new file mode 100644 index 00000000..7a99295d --- /dev/null +++ b/scanner/scanner/publisher.py @@ -0,0 +1,32 @@ +import os +from guessit.jsonutils import json +from aio_pika import Message, connect_robust + + +class Publisher: + QUEUE = "scanner" + + async def __aenter__(self): + self._con = await connect_robust( + host=os.environ.get("RABBITMQ_HOST", "rabbitmq"), + login=os.environ.get("RABBITMQ_DEFAULT_USER", "guest"), + password=os.environ.get("RABBITMQ_DEFAULT_PASS", "guest"), + ) + self._channel = await self._con.channel() + self._queue = await self._channel.declare_queue(self.QUEUE) + return self + + async def __aexit__(self, exc_type, exc_value, exc_tb): + await self._con.close() + + async def _publish(self, data: dict): + await self._channel.default_exchange.publish( + Message(json.dumps(data).encode()), + routing_key=self.QUEUE, + ) + + async def add(self, path: str): + await self._publish({"action": "scan", "path": path}) + + async def delete(self, path: str): + await self._publish({"action": "delete", "path": path}) diff --git a/scanner/scanner/requirements.txt b/scanner/scanner/requirements.txt new file mode 100644 index 00000000..e092edcc --- /dev/null +++ b/scanner/scanner/requirements.txt @@ -0,0 +1 @@ +aio-pika diff --git a/scanner/scanner/scanner.py b/scanner/scanner/scanner.py index 984cde7f..b74ecc14 100644 --- a/scanner/scanner/scanner.py +++ b/scanner/scanner/scanner.py @@ -1,298 +1,36 @@ -from datetime import timedelta import os -import asyncio -import logging -import jsons import re -from aiohttp import ClientSession -from pathlib import Path -from typing import List, Literal, Any -from urllib.parse import quote -from providers.provider import Provider, ProviderError -from providers.types.collection import Collection -from providers.types.show import Show -from providers.types.episode import Episode, PartialShow -from providers.types.season import Season -from .parser.guess import guessit -from .utils import batch, handle_errors -from .cache import cache, exec_as_cache, make_key +import asyncio +from logging import getLogger + +from .publisher import Publisher +from providers.kyoo_client import KyooClient + +logger = getLogger(__name__) -class Scanner: - def __init__( - self, client: ClientSession, *, languages: list[str], api_key: str - ) -> None: - self._client = client - self._api_key = api_key - self._url = os.environ.get("KYOO_URL", "http://back:5000") - try: - self._ignore_pattern = re.compile( - os.environ.get("LIBRARY_IGNORE_PATTERN", "") - ) - except Exception as e: - self._ignore_pattern = re.compile("") - logging.error(f"Invalid ignore pattern. Ignoring. Error: {e}") - [self.provider, *_], self._xem = Provider.get_all(client, languages) - self.languages = languages +async def scan(path: str, publisher: Publisher, client: KyooClient): + logger.info("Starting the scan. It can take some times...") + ignore_pattern = None + try: + ignore_pattern = re.compile(os.environ.get("LIBRARY_IGNORE_PATTERN", "")) + except Exception as e: + ignore_pattern = re.compile("") + logger.error(f"Invalid ignore pattern. Ignoring. Error: {e}") - self._collection_cache = {} - self._show_cache = {} - self._season_cache = {} + registered = await client.get_registered_paths() + videos = [ + os.path.join(dir, file) for dir, _, files in os.walk(path) for file in files + ] + to_register = [ + p for p in videos if p not in registered and not ignore_pattern.match(p) + ] + deleted = [x for x in registered if x not in videos] - async def scan(self, path: str): - logging.info("Starting the scan. It can take some times...") - self.registered = await self.get_registered_paths() - self.issues = await self.get_issues() - videos = [str(p) for p in Path(path).rglob("*") if p.is_file()] - deleted = [x for x in self.registered if x not in videos] + if len(deleted) != len(registered): + await asyncio.gather(*map(publisher.delete, deleted)) + elif len(deleted) > 0: + logger.warning("All video files are unavailable. Check your disks.") - if len(deleted) != len(self.registered): - for x in deleted: - await self.delete(x) - for x in self.issues: - if x not in videos: - await self.delete(x, "issue") - elif len(deleted) > 0: - logging.warning("All video files are unavailable. Check your disks.") - - # We batch videos by 20 because too mutch at once kinda DDOS everything. - for group in batch(iter(videos), 20): - await asyncio.gather(*map(self.identify, group)) - logging.info("Scan finished.") - - async def get_registered_paths(self) -> List[str]: - paths = None - async with self._client.get( - f"{self._url}/episodes", - params={"limit": 0}, - headers={"X-API-Key": self._api_key}, - ) as r: - r.raise_for_status() - ret = await r.json() - paths = list(x["path"] for x in ret["items"]) - - async with self._client.get( - f"{self._url}/movies", - params={"limit": 0}, - headers={"X-API-Key": self._api_key}, - ) as r: - r.raise_for_status() - ret = await r.json() - paths += list(x["path"] for x in ret["items"]) - return paths - - async def get_issues(self) -> List[str]: - async with self._client.get( - f"{self._url}/issues", - params={"limit": 0}, - headers={"X-API-Key": self._api_key}, - ) as r: - r.raise_for_status() - ret = await r.json() - return [x["cause"] for x in ret if x["domain"] == "scanner"] - - @handle_errors - async def identify(self, path: str): - if path in self.registered or self._ignore_pattern.match(path): - return - - raw = guessit(path, xem_titles=await self._xem.get_expected_titles()) - - if "mimetype" not in raw or not raw["mimetype"].startswith("video"): - return - # Remove seasons in "One Piece (1999) 152.mkv" for example - if raw.get("season") == raw.get("year") and "season" in raw: - del raw["season"] - - if isinstance(raw.get("season"), List): - raise ProviderError( - f"An episode can't have multiple seasons (found {raw.get('season')} for {path})" - ) - if isinstance(raw.get("episode"), List): - raise ProviderError( - f"Multi-episodes files are not yet supported (for {path})" - ) - - logging.info("Identied %s: %s", path, raw) - - if raw["type"] == "movie": - movie = await self.provider.identify_movie(raw["title"], raw.get("year")) - movie.path = str(path) - logging.debug("Got movie: %s", movie) - movie_id = await self.post("movies", data=movie.to_kyoo()) - - if any(movie.collections): - ids = await asyncio.gather( - *(self.create_or_get_collection(x) for x in movie.collections) - ) - await asyncio.gather( - *(self.link_collection(x, "movie", movie_id) for x in ids) - ) - elif raw["type"] == "episode": - episode = await self.provider.identify_episode( - raw["title"], - season=raw.get("season"), - episode_nbr=raw.get("episode"), - absolute=raw.get("episode") if "season" not in raw else None, - year=raw.get("year"), - ) - episode.path = str(path) - logging.debug("Got episode: %s", episode) - episode.show_id = await self.create_or_get_show(episode) - - if episode.season_number is not None: - episode.season_id = await self.register_seasons( - episode.show, episode.show_id, episode.season_number - ) - await self.post("episodes", data=episode.to_kyoo()) - else: - logging.warn("Unknown video file type: %s", raw["type"]) - - async def create_or_get_collection(self, collection: Collection) -> str: - @cache(ttl=timedelta(days=1), cache=self._collection_cache) - async def create_collection(provider_id: str): - # TODO: Check if a collection with the same metadata id exists already on kyoo. - new_collection = ( - await self.provider.identify_collection(provider_id) - if not any(collection.translations.keys()) - else collection - ) - logging.debug("Got collection: %s", new_collection) - return await self.post("collection", data=new_collection.to_kyoo()) - - # The parameter is only used as a key for the cache. - provider_id = collection.external_id[self.provider.name].data_id - return await create_collection(provider_id) - - async def link_collection( - self, collection: str, type: Literal["movie"] | Literal["show"], id: str - ): - async with self._client.put( - f"{self._url}/collections/{collection}/{type}/{id}", - headers={"X-API-Key": self._api_key}, - ) as r: - # Allow 409 and continue as if it worked. - if not r.ok and r.status != 409: - logging.error(f"Request error: {await r.text()}") - r.raise_for_status() - - async def create_or_get_show(self, episode: Episode) -> str: - @cache(ttl=timedelta(days=1), cache=self._show_cache) - async def create_show(_: str): - # TODO: Check if a show with the same metadata id exists already on kyoo. - show = ( - await self.provider.identify_show( - episode.show.external_id[self.provider.name].data_id, - ) - if isinstance(episode.show, PartialShow) - else episode.show - ) - # TODO: collections - logging.debug("Got show: %s", episode) - ret = await self.post("show", data=show.to_kyoo()) - - async def create_season(season: Season, id: str): - try: - season.show_id = id - return await self.post("seasons", data=season.to_kyoo()) - except Exception as e: - logging.exception("Unhandled error create a season", exc_info=e) - - season_tasks = map( - lambda s: exec_as_cache( - self._season_cache, - make_key((ret, s.season_number)), - lambda: create_season(s, ret), - ), - show.seasons, - ) - await asyncio.gather(*season_tasks) - - return ret - - # The parameter is only used as a key for the cache. - provider_id = episode.show.external_id[self.provider.name].data_id - return await create_show(provider_id) - - async def register_seasons( - self, show: Show | PartialShow, show_id: str, season_number: int - ) -> str: - # We use an external season cache because we want to edit this cache programatically - @cache(ttl=timedelta(days=1), cache=self._season_cache) - async def create_season(_: str, __: int): - season = await self.provider.identify_season( - show.external_id[self.provider.name].data_id, season_number - ) - season.show_id = show_id - return await self.post("seasons", data=season.to_kyoo()) - - return await create_season(show_id, season_number) - - async def post(self, path: str, *, data: dict[str, Any]) -> str: - logging.debug( - "Sending %s: %s", - path, - jsons.dumps( - data, - key_transformer=jsons.KEY_TRANSFORMER_CAMELCASE, - jdkwargs={"indent": 4}, - ), - ) - async with self._client.post( - f"{self._url}/{path}", - json=data, - headers={"X-API-Key": self._api_key}, - ) as r: - # Allow 409 and continue as if it worked. - if not r.ok and r.status != 409: - logging.error(f"Request error: {await r.text()}") - r.raise_for_status() - ret = await r.json() - - if r.status == 409 and ( - (path == "shows" and ret["startAir"][:4] != str(data["start_air"].year)) - or ( - path == "movies" - and ret["airDate"][:4] != str(data["air_date"].year) - ) - ): - logging.info( - f"Found a {path} with the same slug ({ret['slug']}) and a different date, using the date as part of the slug" - ) - year = (data["start_air"] if path == "movie" else data["air_date"]).year - data["slug"] = f"{ret['slug']}-{year}" - return await self.post(path, data=data) - return ret["id"] - - async def delete( - self, - path: str, - type: Literal["episode", "movie", "issue"] | None = None, - ): - logging.info("Deleting %s", path) - self.registered = filter(lambda x: x != path, self.registered) - - if type is None or type == "movie": - async with self._client.delete( - f'{self._url}/movies?filter=path eq "{quote(path)}"', - headers={"X-API-Key": self._api_key}, - ) as r: - if not r.ok: - logging.error(f"Request error: {await r.text()}") - r.raise_for_status() - - if type is None or type == "episode": - async with self._client.delete( - f'{self._url}/episodes?filter=path eq "{quote(path)}"', - headers={"X-API-Key": self._api_key}, - ) as r: - if not r.ok: - logging.error(f"Request error: {await r.text()}") - r.raise_for_status() - - if path in self.issues: - self.issues = filter(lambda x: x != path, self.issues) - await self._client.delete( - f'{self._url}/issues?filter=domain eq scanner and cause eq "{quote(path)}"', - headers={"X-API-Key": self._api_key}, - ) + await asyncio.gather(*map(publisher.add, to_register)) + logger.info("Scan finished.") diff --git a/scanner/scanner/utils.py b/scanner/scanner/utils.py deleted file mode 100644 index ae3de92a..00000000 --- a/scanner/scanner/utils.py +++ /dev/null @@ -1,56 +0,0 @@ -from __future__ import annotations -import logging -from functools import wraps -from itertools import islice -from typing import TYPE_CHECKING, Iterator, List, TypeVar -from providers.utils import ProviderError - -if TYPE_CHECKING: - from scanner.scanner import Scanner - - -T = TypeVar("T") - - -def batch(iterable: Iterator[T], n: int) -> Iterator[List[T]]: - "Batch data into lists of length n. The last batch may be shorter." - # batched('ABCDEFG', 3) --> ABC DEF G - it = iter(iterable) - while True: - batch = list(islice(it, n)) - if not batch: - return - yield batch - - -def handle_errors(f): - @wraps(f) - async def internal(self: Scanner, path: str): - try: - await f(self, path) - if path in self.issues: - await self._client.delete( - f'{self._url}/issues?filter=domain eq scanner and cause eq "{path}"', - headers={"X-API-Key": self._api_key}, - ) - except ProviderError as e: - logging.error(str(e)) - await self._client.post( - f"{self._url}/issues", - json={"domain": "scanner", "cause": path, "reason": str(e)}, - headers={"X-API-Key": self._api_key}, - ) - except Exception as e: - logging.exception("Unhandled error", exc_info=e) - await self._client.post( - f"{self._url}/issues", - json={ - "domain": "scanner", - "cause": path, - "reason": "Unknown error", - "extra": {"type": type(e).__name__, "message": str(e)}, - }, - headers={"X-API-Key": self._api_key}, - ) - - return internal diff --git a/shell.nix b/shell.nix index 7c1b81ee..482414f3 100644 --- a/shell.nix +++ b/shell.nix @@ -6,6 +6,7 @@ jsons watchfiles pika + aio-pika requests dataclasses-json ]);