diff --git a/docker-compose.dev.yml b/docker-compose.dev.yml index 3856779c..caf8138d 100644 --- a/docker-compose.dev.yml +++ b/docker-compose.dev.yml @@ -84,6 +84,18 @@ services: volumes: - ${LIBRARY_ROOT}:/video:ro + matcher: + build: ./scanner + command: "matcher" + restart: on-failure + depends_on: + back: + condition: service_healthy + env_file: + - ./.env + environment: + - KYOO_URL=${KYOO_URL:-http://back:5000} + autosync: build: ./autosync restart: on-failure diff --git a/scanner/Dockerfile b/scanner/Dockerfile index 785dd1bc..162663e2 100644 --- a/scanner/Dockerfile +++ b/scanner/Dockerfile @@ -5,4 +5,5 @@ COPY ./requirements.txt . RUN pip3 install -r ./requirements.txt COPY . . -ENTRYPOINT ["python3", "-m", "scanner"] +ENTRYPOINT ["python3", "-m"] +CMD ["scanner"] diff --git a/scanner/matcher/__init__.py b/scanner/matcher/__init__.py new file mode 100644 index 00000000..195a1336 --- /dev/null +++ b/scanner/matcher/__init__.py @@ -0,0 +1,18 @@ +async def main(): + import logging + import sys + from providers.provider import Provider + from providers.kyoo_client import KyooClient + from .matcher import Matcher + from .subscriber import Subscriber + + logging.basicConfig(level=logging.INFO) + if len(sys.argv) > 1 and sys.argv[1] == "-v": + logging.basicConfig(level=logging.DEBUG) + logging.getLogger("watchfiles").setLevel(logging.WARNING) + logging.getLogger("rebulk").setLevel(logging.WARNING) + + async with KyooClient() as kyoo, Subscriber() as sub: + provider, xem = Provider.get_all(kyoo.client) + scanner = Matcher(kyoo, provider, xem) + await sub.listen(scanner) diff --git a/scanner/matcher/__main__.py b/scanner/matcher/__main__.py new file mode 100644 index 00000000..670779da --- /dev/null +++ b/scanner/matcher/__main__.py @@ -0,0 +1,6 @@ +#!/usr/bin/env python + +import asyncio +import matcher + +asyncio.run(matcher.main()) diff --git a/scanner/scanner/cache.py b/scanner/matcher/cache.py similarity index 100% rename from scanner/scanner/cache.py rename to scanner/matcher/cache.py diff --git a/scanner/matcher/matcher.py b/scanner/matcher/matcher.py new file mode 100644 index 00000000..534f0d8e --- /dev/null +++ b/scanner/matcher/matcher.py @@ -0,0 +1,167 @@ +from datetime import timedelta +import asyncio +import logging +from providers.implementations.thexem import TheXem +from providers.provider import Provider, ProviderError +from providers.types.collection import Collection +from providers.types.show import Show +from providers.types.episode import Episode, PartialShow +from providers.types.season import Season +from providers.kyoo_client import KyooClient +from .parser.guess import guessit +from .cache import cache, exec_as_cache, make_key + + +class Matcher: + def __init__(self, client: KyooClient, provider: Provider, xem: TheXem) -> None: + self._client = client + self._provider = provider + self._xem = xem + + self._collection_cache = {} + self._show_cache = {} + self._season_cache = {} + + async def delete(self, path: str): + try: + await self._client.delete(path) + return True + except Exception as e: + logging.exception("Unhandled error", exc_info=e) + return False + + async def identify(self, path: str): + try: + await self.identify(path) + await self._client.delete_issue(path) + except ProviderError as e: + logging.error(e) + await self._client.create_issue(path, str(e)) + except Exception as e: + logging.exception("Unhandled error", exc_info=e) + await self._client.create_issue( + path, "Unknown error", {"type": type(e).__name__, "message": str(e)} + ) + return False + return True + + async def _identify(self, path: str): + raw = guessit(path, xem_titles=await self._xem.get_expected_titles()) + + if "mimetype" not in raw or not raw["mimetype"].startswith("video"): + return + # Remove seasons in "One Piece (1999) 152.mkv" for example + if raw.get("season") == raw.get("year") and "season" in raw: + del raw["season"] + + if isinstance(raw.get("season"), list): + raise ProviderError( + f"An episode can't have multiple seasons (found {raw.get('season')} for {path})" + ) + if isinstance(raw.get("episode"), list): + raise ProviderError( + f"Multi-episodes files are not yet supported (for {path})" + ) + + logging.info("Identied %s: %s", path, raw) + + if raw["type"] == "movie": + movie = await self._provider.identify_movie(raw["title"], raw.get("year")) + movie.path = str(path) + logging.debug("Got movie: %s", movie) + movie_id = await self._client.post("movies", data=movie.to_kyoo()) + + if any(movie.collections): + ids = await asyncio.gather( + *(self.create_or_get_collection(x) for x in movie.collections) + ) + await asyncio.gather( + *(self._client.link_collection(x, "movie", movie_id) for x in ids) + ) + elif raw["type"] == "episode": + episode = await self._provider.identify_episode( + raw["title"], + season=raw.get("season"), + episode_nbr=raw.get("episode"), + absolute=raw.get("episode") if "season" not in raw else None, + year=raw.get("year"), + ) + episode.path = str(path) + logging.debug("Got episode: %s", episode) + episode.show_id = await self.create_or_get_show(episode) + + if episode.season_number is not None: + episode.season_id = await self.register_seasons( + episode.show, episode.show_id, episode.season_number + ) + await self._client.post("episodes", data=episode.to_kyoo()) + else: + logging.warn("Unknown video file type: %s", raw["type"]) + + async def create_or_get_collection(self, collection: Collection) -> str: + @cache(ttl=timedelta(days=1), cache=self._collection_cache) + async def create_collection(provider_id: str): + # TODO: Check if a collection with the same metadata id exists already on kyoo. + new_collection = ( + await self._provider.identify_collection(provider_id) + if not any(collection.translations.keys()) + else collection + ) + logging.debug("Got collection: %s", new_collection) + return await self._client.post("collection", data=new_collection.to_kyoo()) + + # The parameter is only used as a key for the cache. + provider_id = collection.external_id[self._provider.name].data_id + return await create_collection(provider_id) + + async def create_or_get_show(self, episode: Episode) -> str: + @cache(ttl=timedelta(days=1), cache=self._show_cache) + async def create_show(_: str): + # TODO: Check if a show with the same metadata id exists already on kyoo. + show = ( + await self._provider.identify_show( + episode.show.external_id[self._provider.name].data_id, + ) + if isinstance(episode.show, PartialShow) + else episode.show + ) + # TODO: collections + logging.debug("Got show: %s", episode) + ret = await self._client.post("show", data=show.to_kyoo()) + + async def create_season(season: Season, id: str): + try: + season.show_id = id + return await self._client.post("seasons", data=season.to_kyoo()) + except Exception as e: + logging.exception("Unhandled error create a season", exc_info=e) + + season_tasks = map( + lambda s: exec_as_cache( + self._season_cache, + make_key((ret, s.season_number)), + lambda: create_season(s, ret), + ), + show.seasons, + ) + await asyncio.gather(*season_tasks) + + return ret + + # The parameter is only used as a key for the cache. + provider_id = episode.show.external_id[self._provider.name].data_id + return await create_show(provider_id) + + async def register_seasons( + self, show: Show | PartialShow, show_id: str, season_number: int + ) -> str: + # We use an external season cache because we want to edit this cache programatically + @cache(ttl=timedelta(days=1), cache=self._season_cache) + async def create_season(_: str, __: int): + season = await self._provider.identify_season( + show.external_id[self._provider.name].data_id, season_number + ) + season.show_id = show_id + return await self._client.post("seasons", data=season.to_kyoo()) + + return await create_season(show_id, season_number) diff --git a/scanner/scanner/parser/guess.py b/scanner/matcher/parser/guess.py similarity index 100% rename from scanner/scanner/parser/guess.py rename to scanner/matcher/parser/guess.py diff --git a/scanner/scanner/parser/rules.py b/scanner/matcher/parser/rules.py similarity index 100% rename from scanner/scanner/parser/rules.py rename to scanner/matcher/parser/rules.py diff --git a/scanner/scanner/subscriber.py b/scanner/matcher/subscriber.py similarity index 92% rename from scanner/scanner/subscriber.py rename to scanner/matcher/subscriber.py index eaca56fe..035c09ee 100644 --- a/scanner/scanner/subscriber.py +++ b/scanner/matcher/subscriber.py @@ -6,13 +6,13 @@ import logging from aio_pika import connect_robust from aio_pika.abc import AbstractIncomingMessage -from scanner.scanner import Scanner +from matcher.matcher import Matcher logger = logging.getLogger(__name__) @dataclass class Message(DataClassJsonMixin): - action: Literal["scan"] | Literal["delete"] + action: Literal["scan", "delete"] path: str @@ -32,7 +32,7 @@ class Subscriber: async def __aexit__(self, exc_type, exc_value, exc_tb): await self._con.close() - async def listen(self, scanner: Scanner): + async def listen(self, scanner: Matcher): async def on_message(message: AbstractIncomingMessage): async with message.process(): msg = Message.from_json(message.body) diff --git a/scanner/monitor/__init__.py b/scanner/monitor/__init__.py deleted file mode 100644 index 76951d53..00000000 --- a/scanner/monitor/__init__.py +++ /dev/null @@ -1,18 +0,0 @@ -async def main(): - import asyncio - import os - import logging - from .monitor import monitor - from .scanner import scan - from .publisher import Publisher - from providers.kyoo_client import KyooClient - - logging.basicConfig(level=logging.INFO) - logging.getLogger("watchfiles").setLevel(logging.WARNING) - - async with Publisher() as publisher, KyooClient() as client: - path = os.environ.get("SCANNER_LIBRARY_ROOT", "/video") - await asyncio.gather( - monitor(path, publisher), - scan(path, publisher, client), - ) diff --git a/scanner/monitor/__main__.py b/scanner/monitor/__main__.py deleted file mode 100644 index b75252d5..00000000 --- a/scanner/monitor/__main__.py +++ /dev/null @@ -1,6 +0,0 @@ -#!/usr/bin/env python - -import asyncio -import monitor - -asyncio.run(monitor.main()) diff --git a/scanner/monitor/scanner.py b/scanner/monitor/scanner.py deleted file mode 100644 index b4dba07d..00000000 --- a/scanner/monitor/scanner.py +++ /dev/null @@ -1,36 +0,0 @@ -import os -import re -import asyncio -from logging import getLogger - -from monitor.publisher import Publisher -from providers.kyoo_client import KyooClient - -logger = getLogger(__name__) - - -async def scan(path: str, publisher: Publisher, client: KyooClient): - logger.info("Starting the scan. It can take some times...") - ignore_pattern = None - try: - ignore_pattern = re.compile(os.environ.get("LIBRARY_IGNORE_PATTERN", "")) - except Exception as e: - ignore_pattern = re.compile("") - logger.error(f"Invalid ignore pattern. Ignoring. Error: {e}") - - registered = await client.get_registered_paths() - videos = [ - os.path.join(dir, file) for dir, _, files in os.walk(path) for file in files - ] - to_register = [ - p for p in videos if p not in registered and not ignore_pattern.match(p) - ] - deleted = [x for x in registered if x not in videos] - - if len(deleted) != len(registered): - await asyncio.gather(*map(publisher.delete, deleted)) - elif len(deleted) > 0: - logger.warning("All video files are unavailable. Check your disks.") - - await asyncio.gather(*map(publisher.add, to_register)) - logger.info("Scan finished.") diff --git a/scanner/providers/implementations/themoviedatabase.py b/scanner/providers/implementations/themoviedatabase.py index eb1971fa..b8b9c310 100644 --- a/scanner/providers/implementations/themoviedatabase.py +++ b/scanner/providers/implementations/themoviedatabase.py @@ -8,7 +8,7 @@ from itertools import accumulate, zip_longest from providers.idmapper import IdMapper from providers.implementations.thexem import TheXem from providers.utils import ProviderError -from scanner.cache import cache +from matcher.cache import cache from ..provider import Provider from ..types.movie import Movie, MovieTranslation, Status as MovieStatus diff --git a/scanner/providers/implementations/thexem.py b/scanner/providers/implementations/thexem.py index 565ae1fc..8fcc75b6 100644 --- a/scanner/providers/implementations/thexem.py +++ b/scanner/providers/implementations/thexem.py @@ -5,7 +5,7 @@ from aiohttp import ClientSession from datetime import timedelta from providers.utils import ProviderError -from scanner.cache import cache +from matcher.cache import cache def clean(s: str): diff --git a/scanner/providers/kyoo_client.py b/scanner/providers/kyoo_client.py index 3e7b2d9c..ec271817 100644 --- a/scanner/providers/kyoo_client.py +++ b/scanner/providers/kyoo_client.py @@ -2,9 +2,12 @@ import os import logging import jsons from aiohttp import ClientSession -from typing import List, Literal, Any +from datetime import date +from typing import List, Literal, Any, Optional from urllib.parse import quote +from .utils import format_date + class KyooClient: def __init__(self) -> None: @@ -19,7 +22,7 @@ class KyooClient: self._url = os.environ.get("KYOO_URL", "http://back:5000") async def __aenter__(self): - jsons.set_serializer(lambda x, **_: format_date(x), Optional[date | int]) # type: ignore + jsons.set_serializer(lambda x, **_: format_date(x), type[Optional[date | int]]) self.client = ClientSession( json_serialize=lambda *args, **kwargs: jsons.dumps( *args, key_transformer=jsons.KEY_TRANSFORMER_CAMELCASE, **kwargs diff --git a/scanner/providers/provider.py b/scanner/providers/provider.py index 12a7f655..545e2d21 100644 --- a/scanner/providers/provider.py +++ b/scanner/providers/provider.py @@ -43,7 +43,7 @@ class Provider: idmapper.init(tmdb=tmdb, language=languages[0]) - return next(providers), xem + return next(iter(providers)), xem @abstractproperty def name(self) -> str: diff --git a/scanner/requirements.txt b/scanner/requirements.txt index 74801dd9..cdeb31bd 100644 --- a/scanner/requirements.txt +++ b/scanner/requirements.txt @@ -3,3 +3,4 @@ aiohttp jsons watchfiles aio-pika +dataclasses-json diff --git a/scanner/scanner/__init__.py b/scanner/scanner/__init__.py index e049dff0..76951d53 100644 --- a/scanner/scanner/__init__.py +++ b/scanner/scanner/__init__.py @@ -1,18 +1,18 @@ async def main(): + import asyncio + import os import logging - import sys - from providers.provider import Provider + from .monitor import monitor + from .scanner import scan + from .publisher import Publisher from providers.kyoo_client import KyooClient - from .scanner import Scanner - from .subscriber import Subscriber logging.basicConfig(level=logging.INFO) - if len(sys.argv) > 1 and sys.argv[1] == "-v": - logging.basicConfig(level=logging.DEBUG) logging.getLogger("watchfiles").setLevel(logging.WARNING) - logging.getLogger("rebulk").setLevel(logging.WARNING) - async with KyooClient() as kyoo, Subscriber() as sub: - provider, xem = Provider.get_all(kyoo.client) - scanner = Scanner(kyoo, provider, xem) - await sub.listen(scanner) + async with Publisher() as publisher, KyooClient() as client: + path = os.environ.get("SCANNER_LIBRARY_ROOT", "/video") + await asyncio.gather( + monitor(path, publisher), + scan(path, publisher, client), + ) diff --git a/scanner/scanner/__main__.py b/scanner/scanner/__main__.py index ac4e42e3..670779da 100644 --- a/scanner/scanner/__main__.py +++ b/scanner/scanner/__main__.py @@ -1,6 +1,6 @@ #!/usr/bin/env python import asyncio -import scanner +import matcher -asyncio.run(scanner.main()) +asyncio.run(matcher.main()) diff --git a/scanner/monitor/monitor.py b/scanner/scanner/monitor.py similarity index 91% rename from scanner/monitor/monitor.py rename to scanner/scanner/monitor.py index 56ed18b4..1fe24ace 100644 --- a/scanner/monitor/monitor.py +++ b/scanner/scanner/monitor.py @@ -1,7 +1,7 @@ from logging import getLogger from watchfiles import awatch, Change -from monitor.publisher import Publisher +from .publisher import Publisher logger = getLogger(__name__) diff --git a/scanner/monitor/publisher.py b/scanner/scanner/publisher.py similarity index 100% rename from scanner/monitor/publisher.py rename to scanner/scanner/publisher.py diff --git a/scanner/monitor/requirements.txt b/scanner/scanner/requirements.txt similarity index 100% rename from scanner/monitor/requirements.txt rename to scanner/scanner/requirements.txt diff --git a/scanner/scanner/scanner.py b/scanner/scanner/scanner.py index 22010b34..b74ecc14 100644 --- a/scanner/scanner/scanner.py +++ b/scanner/scanner/scanner.py @@ -1,167 +1,36 @@ -from datetime import timedelta +import os +import re import asyncio -import logging -from providers.implementations.thexem import TheXem -from providers.provider import Provider, ProviderError -from providers.types.collection import Collection -from providers.types.show import Show -from providers.types.episode import Episode, PartialShow -from providers.types.season import Season +from logging import getLogger + +from .publisher import Publisher from providers.kyoo_client import KyooClient -from .parser.guess import guessit -from .cache import cache, exec_as_cache, make_key + +logger = getLogger(__name__) -class Scanner: - def __init__(self, client: KyooClient, provider: Provider, xem: TheXem) -> None: - self._client = client - self._provider = provider - self._xem = xem +async def scan(path: str, publisher: Publisher, client: KyooClient): + logger.info("Starting the scan. It can take some times...") + ignore_pattern = None + try: + ignore_pattern = re.compile(os.environ.get("LIBRARY_IGNORE_PATTERN", "")) + except Exception as e: + ignore_pattern = re.compile("") + logger.error(f"Invalid ignore pattern. Ignoring. Error: {e}") - self._collection_cache = {} - self._show_cache = {} - self._season_cache = {} + registered = await client.get_registered_paths() + videos = [ + os.path.join(dir, file) for dir, _, files in os.walk(path) for file in files + ] + to_register = [ + p for p in videos if p not in registered and not ignore_pattern.match(p) + ] + deleted = [x for x in registered if x not in videos] - async def delete(self, path: str): - try: - await self._client.delete(path) - return True - except Exception as e: - logging.exception("Unhandled error", exc_info=e) - return False + if len(deleted) != len(registered): + await asyncio.gather(*map(publisher.delete, deleted)) + elif len(deleted) > 0: + logger.warning("All video files are unavailable. Check your disks.") - async def identify(self, path: str): - try: - await self.identify(path) - await self._client.delete_issue(path) - except ProviderError as e: - logging.error(e) - await self._client.create_issue(path, str(e)) - except Exception as e: - logging.exception("Unhandled error", exc_info=e) - await self._client.create_issue( - path, "Unknown error", {"type": type(e).__name__, "message": str(e)} - ) - return False - return True - - async def _identify(self, path: str): - raw = guessit(path, xem_titles=await self._xem.get_expected_titles()) - - if "mimetype" not in raw or not raw["mimetype"].startswith("video"): - return - # Remove seasons in "One Piece (1999) 152.mkv" for example - if raw.get("season") == raw.get("year") and "season" in raw: - del raw["season"] - - if isinstance(raw.get("season"), list): - raise ProviderError( - f"An episode can't have multiple seasons (found {raw.get('season')} for {path})" - ) - if isinstance(raw.get("episode"), list): - raise ProviderError( - f"Multi-episodes files are not yet supported (for {path})" - ) - - logging.info("Identied %s: %s", path, raw) - - if raw["type"] == "movie": - movie = await self._provider.identify_movie(raw["title"], raw.get("year")) - movie.path = str(path) - logging.debug("Got movie: %s", movie) - movie_id = await self._client.post("movies", data=movie.to_kyoo()) - - if any(movie.collections): - ids = await asyncio.gather( - *(self.create_or_get_collection(x) for x in movie.collections) - ) - await asyncio.gather( - *(self._client.link_collection(x, "movie", movie_id) for x in ids) - ) - elif raw["type"] == "episode": - episode = await self._provider.identify_episode( - raw["title"], - season=raw.get("season"), - episode_nbr=raw.get("episode"), - absolute=raw.get("episode") if "season" not in raw else None, - year=raw.get("year"), - ) - episode.path = str(path) - logging.debug("Got episode: %s", episode) - episode.show_id = await self.create_or_get_show(episode) - - if episode.season_number is not None: - episode.season_id = await self.register_seasons( - episode.show, episode.show_id, episode.season_number - ) - await self._client.post("episodes", data=episode.to_kyoo()) - else: - logging.warn("Unknown video file type: %s", raw["type"]) - - async def create_or_get_collection(self, collection: Collection) -> str: - @cache(ttl=timedelta(days=1), cache=self._collection_cache) - async def create_collection(provider_id: str): - # TODO: Check if a collection with the same metadata id exists already on kyoo. - new_collection = ( - await self._provider.identify_collection(provider_id) - if not any(collection.translations.keys()) - else collection - ) - logging.debug("Got collection: %s", new_collection) - return await self._client.post("collection", data=new_collection.to_kyoo()) - - # The parameter is only used as a key for the cache. - provider_id = collection.external_id[self._provider.name].data_id - return await create_collection(provider_id) - - async def create_or_get_show(self, episode: Episode) -> str: - @cache(ttl=timedelta(days=1), cache=self._show_cache) - async def create_show(_: str): - # TODO: Check if a show with the same metadata id exists already on kyoo. - show = ( - await self._provider.identify_show( - episode.show.external_id[self._provider.name].data_id, - ) - if isinstance(episode.show, PartialShow) - else episode.show - ) - # TODO: collections - logging.debug("Got show: %s", episode) - ret = await self._client.post("show", data=show.to_kyoo()) - - async def create_season(season: Season, id: str): - try: - season.show_id = id - return await self._client.post("seasons", data=season.to_kyoo()) - except Exception as e: - logging.exception("Unhandled error create a season", exc_info=e) - - season_tasks = map( - lambda s: exec_as_cache( - self._season_cache, - make_key((ret, s.season_number)), - lambda: create_season(s, ret), - ), - show.seasons, - ) - await asyncio.gather(*season_tasks) - - return ret - - # The parameter is only used as a key for the cache. - provider_id = episode.show.external_id[self._provider.name].data_id - return await create_show(provider_id) - - async def register_seasons( - self, show: Show | PartialShow, show_id: str, season_number: int - ) -> str: - # We use an external season cache because we want to edit this cache programatically - @cache(ttl=timedelta(days=1), cache=self._season_cache) - async def create_season(_: str, __: int): - season = await self._provider.identify_season( - show.external_id[self._provider.name].data_id, season_number - ) - season.show_id = show_id - return await self._client.post("seasons", data=season.to_kyoo()) - - return await create_season(show_id, season_number) + await asyncio.gather(*map(publisher.add, to_register)) + logger.info("Scan finished.")