From e284f771df930e4e4de23c37de0caafc6199408c Mon Sep 17 00:00:00 2001 From: Zoe Roux Date: Mon, 8 Apr 2024 00:15:02 +0200 Subject: [PATCH] Split fs scaning and metadata matching --- scanner/.dockerignore | 2 + scanner/monitor/__init__.py | 17 +++ scanner/monitor/__main__.py | 6 + scanner/monitor/monitor.py | 19 +++ scanner/monitor/publisher.py | 32 +++++ scanner/monitor/requirements.txt | 1 + scanner/monitor/scanner.py | 32 +++++ scanner/providers/kyoo_client.py | 117 ++++++++++++++++++ scanner/requirements.txt | 1 + scanner/scanner/__init__.py | 12 +- scanner/scanner/monitor.py | 22 ---- scanner/scanner/scanner.py | 203 ++++--------------------------- shell.nix | 1 + 13 files changed, 261 insertions(+), 204 deletions(-) create mode 100644 scanner/.dockerignore create mode 100644 scanner/monitor/__init__.py create mode 100644 scanner/monitor/__main__.py create mode 100644 scanner/monitor/monitor.py create mode 100644 scanner/monitor/publisher.py create mode 100644 scanner/monitor/requirements.txt create mode 100644 scanner/monitor/scanner.py create mode 100644 scanner/providers/kyoo_client.py delete mode 100644 scanner/scanner/monitor.py diff --git a/scanner/.dockerignore b/scanner/.dockerignore new file mode 100644 index 00000000..e136516d --- /dev/null +++ b/scanner/.dockerignore @@ -0,0 +1,2 @@ +Dockerfile* + diff --git a/scanner/monitor/__init__.py b/scanner/monitor/__init__.py new file mode 100644 index 00000000..3716b018 --- /dev/null +++ b/scanner/monitor/__init__.py @@ -0,0 +1,17 @@ +async def main(): + import asyncio + import os + import logging + from .monitor import monitor + from .scanner import scan + from .publisher import Publisher + from providers.kyoo_client import KyooClient + + logging.getLogger("watchfiles").setLevel(logging.WARNING) + + async with Publisher() as publisher, KyooClient() as client: + path = os.environ.get("SCANNER_LIBRARY_ROOT", "/video") + await asyncio.gather( + monitor(path, publisher), + scan(path), + ) diff --git a/scanner/monitor/__main__.py b/scanner/monitor/__main__.py new file mode 100644 index 00000000..b75252d5 --- /dev/null +++ b/scanner/monitor/__main__.py @@ -0,0 +1,6 @@ +#!/usr/bin/env python + +import asyncio +import monitor + +asyncio.run(monitor.main()) diff --git a/scanner/monitor/monitor.py b/scanner/monitor/monitor.py new file mode 100644 index 00000000..56ed18b4 --- /dev/null +++ b/scanner/monitor/monitor.py @@ -0,0 +1,19 @@ +from logging import getLogger +from watchfiles import awatch, Change + +from monitor.publisher import Publisher + +logger = getLogger(__name__) + + +async def monitor(path: str, publisher: Publisher): + async for changes in awatch(path): + for event, file in changes: + if event == Change.added: + await publisher.add(file) + elif event == Change.deleted: + await publisher.delete(file) + elif event == Change.modified: + pass + else: + logger.info(f"Change {event} occured for file {file}") diff --git a/scanner/monitor/publisher.py b/scanner/monitor/publisher.py new file mode 100644 index 00000000..7a99295d --- /dev/null +++ b/scanner/monitor/publisher.py @@ -0,0 +1,32 @@ +import os +from guessit.jsonutils import json +from aio_pika import Message, connect_robust + + +class Publisher: + QUEUE = "scanner" + + async def __aenter__(self): + self._con = await connect_robust( + host=os.environ.get("RABBITMQ_HOST", "rabbitmq"), + login=os.environ.get("RABBITMQ_DEFAULT_USER", "guest"), + password=os.environ.get("RABBITMQ_DEFAULT_PASS", "guest"), + ) + self._channel = await self._con.channel() + self._queue = await self._channel.declare_queue(self.QUEUE) + return self + + async def __aexit__(self, exc_type, exc_value, exc_tb): + await self._con.close() + + async def _publish(self, data: dict): + await self._channel.default_exchange.publish( + Message(json.dumps(data).encode()), + routing_key=self.QUEUE, + ) + + async def add(self, path: str): + await self._publish({"action": "scan", "path": path}) + + async def delete(self, path: str): + await self._publish({"action": "delete", "path": path}) diff --git a/scanner/monitor/requirements.txt b/scanner/monitor/requirements.txt new file mode 100644 index 00000000..e092edcc --- /dev/null +++ b/scanner/monitor/requirements.txt @@ -0,0 +1 @@ +aio-pika diff --git a/scanner/monitor/scanner.py b/scanner/monitor/scanner.py new file mode 100644 index 00000000..30ecf5db --- /dev/null +++ b/scanner/monitor/scanner.py @@ -0,0 +1,32 @@ +from logging import getLogger + +logger = getLogger(__name__) + +async def scan(path: str): + logger.info("Starting the scan. It can take some times...") + registered = await _get_registered_paths() + self.issues = await self.get_issues() + videos = [str(p) for p in Path(path).rglob("*") if p.is_file()] + deleted = [x for x in self.registered if x not in videos] + + # try: + # self._ignore_pattern = re.compile( + # os.environ.get("LIBRARY_IGNORE_PATTERN", "") + # ) + # except Exception as e: + # self._ignore_pattern = re.compile("") + # logging.error(f"Invalid ignore pattern. Ignoring. Error: {e}") + + if len(deleted) != len(self.registered): + for x in deleted: + await self.delete(x) + for x in self.issues: + if x not in videos: + await self.delete(x, "issue") + elif len(deleted) > 0: + logging.warning("All video files are unavailable. Check your disks.") + + # We batch videos by 20 because too mutch at once kinda DDOS everything. + for group in batch(iter(videos), 20): + await asyncio.gather(*map(self.identify, group)) + logging.info("Scan finished.") diff --git a/scanner/providers/kyoo_client.py b/scanner/providers/kyoo_client.py new file mode 100644 index 00000000..dc980a19 --- /dev/null +++ b/scanner/providers/kyoo_client.py @@ -0,0 +1,117 @@ +from datetime import timedelta +import os +import asyncio +import logging +import jsons +import re +from aiohttp import ClientSession +from pathlib import Path +from typing import List, Literal, Any +from urllib.parse import quote +from providers.provider import Provider, ProviderError +from providers.types.collection import Collection +from providers.types.show import Show +from providers.types.episode import Episode, PartialShow +from providers.types.season import Season + + +class KyooClient: + def __init__( + self, client: ClientSession, *, api_key: str + ) -> None: + self._client = client + self._api_key = api_key + self._url = os.environ.get("KYOO_URL", "http://back:5000") + + + async def get_issues(self) -> List[str]: + async with self._client.get( + f"{self._url}/issues", + params={"limit": 0}, + headers={"X-API-Key": self._api_key}, + ) as r: + r.raise_for_status() + ret = await r.json() + return [x["cause"] for x in ret if x["domain"] == "scanner"] + + async def link_collection( + self, collection: str, type: Literal["movie"] | Literal["show"], id: str + ): + async with self._client.put( + f"{self._url}/collections/{collection}/{type}/{id}", + headers={"X-API-Key": self._api_key}, + ) as r: + # Allow 409 and continue as if it worked. + if not r.ok and r.status != 409: + logging.error(f"Request error: {await r.text()}") + r.raise_for_status() + + async def post(self, path: str, *, data: dict[str, Any]) -> str: + logging.debug( + "Sending %s: %s", + path, + jsons.dumps( + data, + key_transformer=jsons.KEY_TRANSFORMER_CAMELCASE, + jdkwargs={"indent": 4}, + ), + ) + async with self._client.post( + f"{self._url}/{path}", + json=data, + headers={"X-API-Key": self._api_key}, + ) as r: + # Allow 409 and continue as if it worked. + if not r.ok and r.status != 409: + logging.error(f"Request error: {await r.text()}") + r.raise_for_status() + ret = await r.json() + + if r.status == 409 and ( + (path == "shows" and ret["startAir"][:4] != str(data["start_air"].year)) + or ( + path == "movies" + and ret["airDate"][:4] != str(data["air_date"].year) + ) + ): + logging.info( + f"Found a {path} with the same slug ({ret['slug']}) and a different date, using the date as part of the slug" + ) + year = (data["start_air"] if path == "movie" else data["air_date"]).year + data["slug"] = f"{ret['slug']}-{year}" + return await self.post(path, data=data) + return ret["id"] + + async def delete( + self, + path: str, + type: Literal["episode", "movie", "issue"] | None = None, + ): + logging.info("Deleting %s", path) + self.registered = filter(lambda x: x != path, self.registered) + + if type is None or type == "movie": + async with self._client.delete( + f'{self._url}/movies?filter=path eq "{quote(path)}"', + headers={"X-API-Key": self._api_key}, + ) as r: + if not r.ok: + logging.error(f"Request error: {await r.text()}") + r.raise_for_status() + + if type is None or type == "episode": + async with self._client.delete( + f'{self._url}/episodes?filter=path eq "{quote(path)}"', + headers={"X-API-Key": self._api_key}, + ) as r: + if not r.ok: + logging.error(f"Request error: {await r.text()}") + r.raise_for_status() + + if path in self.issues: + self.issues = filter(lambda x: x != path, self.issues) + await self._client.delete( + f'{self._url}/issues?filter=domain eq scanner and cause eq "{quote(path)}"', + headers={"X-API-Key": self._api_key}, + ) + diff --git a/scanner/requirements.txt b/scanner/requirements.txt index 5a49dfae..74801dd9 100644 --- a/scanner/requirements.txt +++ b/scanner/requirements.txt @@ -2,3 +2,4 @@ guessit aiohttp jsons watchfiles +aio-pika diff --git a/scanner/scanner/__init__.py b/scanner/scanner/__init__.py index 587f1086..c21a9dde 100644 --- a/scanner/scanner/__init__.py +++ b/scanner/scanner/__init__.py @@ -1,3 +1,6 @@ +from providers.kyoo_client import KyooClient + + async def main(): import asyncio import os @@ -9,7 +12,6 @@ async def main(): from aiohttp import ClientSession from providers.utils import format_date, ProviderError from .scanner import Scanner - from .monitor import monitor path = os.environ.get("SCANNER_LIBRARY_ROOT", "/video") languages = os.environ.get("LIBRARY_LANGUAGES") @@ -37,11 +39,9 @@ async def main(): *args, key_transformer=jsons.KEY_TRANSFORMER_CAMELCASE, **kwargs ), ) as client: + kyoo = KyooClient(client, api_key=api_key) + provider = try: - scanner = Scanner(client, languages=languages.split(","), api_key=api_key) - await asyncio.gather( - monitor(path, scanner), - scanner.scan(path), - ) + scanner = Scanner(kyoo, languages=languages.split(","), api_key=api_key) except ProviderError as e: logging.error(e) diff --git a/scanner/scanner/monitor.py b/scanner/scanner/monitor.py deleted file mode 100644 index 7949541d..00000000 --- a/scanner/scanner/monitor.py +++ /dev/null @@ -1,22 +0,0 @@ -import logging -from watchfiles import awatch, Change -from .utils import ProviderError -from .scanner import Scanner - - -async def monitor(path: str, scanner: Scanner): - async for changes in awatch(path): - for event, file in changes: - try: - if event == Change.added: - await scanner.identify(file) - elif event == Change.deleted: - await scanner.delete(file) - elif event == Change.modified: - pass - else: - print(f"Change {event} occured for file {file}") - except ProviderError as e: - logging.error(str(e)) - except Exception as e: - logging.exception("Unhandled error", exc_info=e) diff --git a/scanner/scanner/scanner.py b/scanner/scanner/scanner.py index 984cde7f..c723f3a9 100644 --- a/scanner/scanner/scanner.py +++ b/scanner/scanner/scanner.py @@ -1,101 +1,33 @@ from datetime import timedelta -import os import asyncio import logging -import jsons -import re -from aiohttp import ClientSession -from pathlib import Path -from typing import List, Literal, Any -from urllib.parse import quote +from providers.implementations.thexem import TheXem from providers.provider import Provider, ProviderError from providers.types.collection import Collection from providers.types.show import Show from providers.types.episode import Episode, PartialShow from providers.types.season import Season +from providers.kyoo_client import KyooClient from .parser.guess import guessit -from .utils import batch, handle_errors +from .utils import handle_errors from .cache import cache, exec_as_cache, make_key class Scanner: - def __init__( - self, client: ClientSession, *, languages: list[str], api_key: str - ) -> None: + def __init__(self, client: KyooClient, provider: Provider, xem: TheXem) -> None: self._client = client - self._api_key = api_key - self._url = os.environ.get("KYOO_URL", "http://back:5000") - try: - self._ignore_pattern = re.compile( - os.environ.get("LIBRARY_IGNORE_PATTERN", "") - ) - except Exception as e: - self._ignore_pattern = re.compile("") - logging.error(f"Invalid ignore pattern. Ignoring. Error: {e}") - [self.provider, *_], self._xem = Provider.get_all(client, languages) - self.languages = languages + self._provider = provider + self._xem = xem self._collection_cache = {} self._show_cache = {} self._season_cache = {} - async def scan(self, path: str): - logging.info("Starting the scan. It can take some times...") - self.registered = await self.get_registered_paths() - self.issues = await self.get_issues() - videos = [str(p) for p in Path(path).rglob("*") if p.is_file()] - deleted = [x for x in self.registered if x not in videos] - - if len(deleted) != len(self.registered): - for x in deleted: - await self.delete(x) - for x in self.issues: - if x not in videos: - await self.delete(x, "issue") - elif len(deleted) > 0: - logging.warning("All video files are unavailable. Check your disks.") - - # We batch videos by 20 because too mutch at once kinda DDOS everything. - for group in batch(iter(videos), 20): - await asyncio.gather(*map(self.identify, group)) - logging.info("Scan finished.") - - async def get_registered_paths(self) -> List[str]: - paths = None - async with self._client.get( - f"{self._url}/episodes", - params={"limit": 0}, - headers={"X-API-Key": self._api_key}, - ) as r: - r.raise_for_status() - ret = await r.json() - paths = list(x["path"] for x in ret["items"]) - - async with self._client.get( - f"{self._url}/movies", - params={"limit": 0}, - headers={"X-API-Key": self._api_key}, - ) as r: - r.raise_for_status() - ret = await r.json() - paths += list(x["path"] for x in ret["items"]) - return paths - - async def get_issues(self) -> List[str]: - async with self._client.get( - f"{self._url}/issues", - params={"limit": 0}, - headers={"X-API-Key": self._api_key}, - ) as r: - r.raise_for_status() - ret = await r.json() - return [x["cause"] for x in ret if x["domain"] == "scanner"] - @handle_errors async def identify(self, path: str): - if path in self.registered or self._ignore_pattern.match(path): - return - + # if path in self.registered or self._ignore_pattern.match(path): + # return + # raw = guessit(path, xem_titles=await self._xem.get_expected_titles()) if "mimetype" not in raw or not raw["mimetype"].startswith("video"): @@ -104,11 +36,11 @@ class Scanner: if raw.get("season") == raw.get("year") and "season" in raw: del raw["season"] - if isinstance(raw.get("season"), List): + if isinstance(raw.get("season"), list): raise ProviderError( f"An episode can't have multiple seasons (found {raw.get('season')} for {path})" ) - if isinstance(raw.get("episode"), List): + if isinstance(raw.get("episode"), list): raise ProviderError( f"Multi-episodes files are not yet supported (for {path})" ) @@ -116,20 +48,20 @@ class Scanner: logging.info("Identied %s: %s", path, raw) if raw["type"] == "movie": - movie = await self.provider.identify_movie(raw["title"], raw.get("year")) + movie = await self._provider.identify_movie(raw["title"], raw.get("year")) movie.path = str(path) logging.debug("Got movie: %s", movie) - movie_id = await self.post("movies", data=movie.to_kyoo()) + movie_id = await self._client.post("movies", data=movie.to_kyoo()) if any(movie.collections): ids = await asyncio.gather( *(self.create_or_get_collection(x) for x in movie.collections) ) await asyncio.gather( - *(self.link_collection(x, "movie", movie_id) for x in ids) + *(self._client.link_collection(x, "movie", movie_id) for x in ids) ) elif raw["type"] == "episode": - episode = await self.provider.identify_episode( + episode = await self._provider.identify_episode( raw["title"], season=raw.get("season"), episode_nbr=raw.get("episode"), @@ -144,7 +76,7 @@ class Scanner: episode.season_id = await self.register_seasons( episode.show, episode.show_id, episode.season_number ) - await self.post("episodes", data=episode.to_kyoo()) + await self._client.post("episodes", data=episode.to_kyoo()) else: logging.warn("Unknown video file type: %s", raw["type"]) @@ -153,48 +85,36 @@ class Scanner: async def create_collection(provider_id: str): # TODO: Check if a collection with the same metadata id exists already on kyoo. new_collection = ( - await self.provider.identify_collection(provider_id) + await self._provider.identify_collection(provider_id) if not any(collection.translations.keys()) else collection ) logging.debug("Got collection: %s", new_collection) - return await self.post("collection", data=new_collection.to_kyoo()) + return await self._client.post("collection", data=new_collection.to_kyoo()) # The parameter is only used as a key for the cache. - provider_id = collection.external_id[self.provider.name].data_id + provider_id = collection.external_id[self._provider.name].data_id return await create_collection(provider_id) - async def link_collection( - self, collection: str, type: Literal["movie"] | Literal["show"], id: str - ): - async with self._client.put( - f"{self._url}/collections/{collection}/{type}/{id}", - headers={"X-API-Key": self._api_key}, - ) as r: - # Allow 409 and continue as if it worked. - if not r.ok and r.status != 409: - logging.error(f"Request error: {await r.text()}") - r.raise_for_status() - async def create_or_get_show(self, episode: Episode) -> str: @cache(ttl=timedelta(days=1), cache=self._show_cache) async def create_show(_: str): # TODO: Check if a show with the same metadata id exists already on kyoo. show = ( - await self.provider.identify_show( - episode.show.external_id[self.provider.name].data_id, + await self._provider.identify_show( + episode.show.external_id[self._provider.name].data_id, ) if isinstance(episode.show, PartialShow) else episode.show ) # TODO: collections logging.debug("Got show: %s", episode) - ret = await self.post("show", data=show.to_kyoo()) + ret = await self._client.post("show", data=show.to_kyoo()) async def create_season(season: Season, id: str): try: season.show_id = id - return await self.post("seasons", data=season.to_kyoo()) + return await self._client.post("seasons", data=season.to_kyoo()) except Exception as e: logging.exception("Unhandled error create a season", exc_info=e) @@ -211,7 +131,7 @@ class Scanner: return ret # The parameter is only used as a key for the cache. - provider_id = episode.show.external_id[self.provider.name].data_id + provider_id = episode.show.external_id[self._provider.name].data_id return await create_show(provider_id) async def register_seasons( @@ -220,79 +140,10 @@ class Scanner: # We use an external season cache because we want to edit this cache programatically @cache(ttl=timedelta(days=1), cache=self._season_cache) async def create_season(_: str, __: int): - season = await self.provider.identify_season( - show.external_id[self.provider.name].data_id, season_number + season = await self._provider.identify_season( + show.external_id[self._provider.name].data_id, season_number ) season.show_id = show_id - return await self.post("seasons", data=season.to_kyoo()) + return await self._client.post("seasons", data=season.to_kyoo()) return await create_season(show_id, season_number) - - async def post(self, path: str, *, data: dict[str, Any]) -> str: - logging.debug( - "Sending %s: %s", - path, - jsons.dumps( - data, - key_transformer=jsons.KEY_TRANSFORMER_CAMELCASE, - jdkwargs={"indent": 4}, - ), - ) - async with self._client.post( - f"{self._url}/{path}", - json=data, - headers={"X-API-Key": self._api_key}, - ) as r: - # Allow 409 and continue as if it worked. - if not r.ok and r.status != 409: - logging.error(f"Request error: {await r.text()}") - r.raise_for_status() - ret = await r.json() - - if r.status == 409 and ( - (path == "shows" and ret["startAir"][:4] != str(data["start_air"].year)) - or ( - path == "movies" - and ret["airDate"][:4] != str(data["air_date"].year) - ) - ): - logging.info( - f"Found a {path} with the same slug ({ret['slug']}) and a different date, using the date as part of the slug" - ) - year = (data["start_air"] if path == "movie" else data["air_date"]).year - data["slug"] = f"{ret['slug']}-{year}" - return await self.post(path, data=data) - return ret["id"] - - async def delete( - self, - path: str, - type: Literal["episode", "movie", "issue"] | None = None, - ): - logging.info("Deleting %s", path) - self.registered = filter(lambda x: x != path, self.registered) - - if type is None or type == "movie": - async with self._client.delete( - f'{self._url}/movies?filter=path eq "{quote(path)}"', - headers={"X-API-Key": self._api_key}, - ) as r: - if not r.ok: - logging.error(f"Request error: {await r.text()}") - r.raise_for_status() - - if type is None or type == "episode": - async with self._client.delete( - f'{self._url}/episodes?filter=path eq "{quote(path)}"', - headers={"X-API-Key": self._api_key}, - ) as r: - if not r.ok: - logging.error(f"Request error: {await r.text()}") - r.raise_for_status() - - if path in self.issues: - self.issues = filter(lambda x: x != path, self.issues) - await self._client.delete( - f'{self._url}/issues?filter=domain eq scanner and cause eq "{quote(path)}"', - headers={"X-API-Key": self._api_key}, - ) diff --git a/shell.nix b/shell.nix index 7c1b81ee..482414f3 100644 --- a/shell.nix +++ b/shell.nix @@ -6,6 +6,7 @@ jsons watchfiles pika + aio-pika requests dataclasses-json ]);