From e284f771df930e4e4de23c37de0caafc6199408c Mon Sep 17 00:00:00 2001 From: Zoe Roux Date: Mon, 8 Apr 2024 00:15:02 +0200 Subject: [PATCH 01/12] Split fs scaning and metadata matching --- scanner/.dockerignore | 2 + scanner/monitor/__init__.py | 17 +++ scanner/monitor/__main__.py | 6 + scanner/monitor/monitor.py | 19 +++ scanner/monitor/publisher.py | 32 +++++ scanner/monitor/requirements.txt | 1 + scanner/monitor/scanner.py | 32 +++++ scanner/providers/kyoo_client.py | 117 ++++++++++++++++++ scanner/requirements.txt | 1 + scanner/scanner/__init__.py | 12 +- scanner/scanner/monitor.py | 22 ---- scanner/scanner/scanner.py | 203 ++++--------------------------- shell.nix | 1 + 13 files changed, 261 insertions(+), 204 deletions(-) create mode 100644 scanner/.dockerignore create mode 100644 scanner/monitor/__init__.py create mode 100644 scanner/monitor/__main__.py create mode 100644 scanner/monitor/monitor.py create mode 100644 scanner/monitor/publisher.py create mode 100644 scanner/monitor/requirements.txt create mode 100644 scanner/monitor/scanner.py create mode 100644 scanner/providers/kyoo_client.py delete mode 100644 scanner/scanner/monitor.py diff --git a/scanner/.dockerignore b/scanner/.dockerignore new file mode 100644 index 00000000..e136516d --- /dev/null +++ b/scanner/.dockerignore @@ -0,0 +1,2 @@ +Dockerfile* + diff --git a/scanner/monitor/__init__.py b/scanner/monitor/__init__.py new file mode 100644 index 00000000..3716b018 --- /dev/null +++ b/scanner/monitor/__init__.py @@ -0,0 +1,17 @@ +async def main(): + import asyncio + import os + import logging + from .monitor import monitor + from .scanner import scan + from .publisher import Publisher + from providers.kyoo_client import KyooClient + + logging.getLogger("watchfiles").setLevel(logging.WARNING) + + async with Publisher() as publisher, KyooClient() as client: + path = os.environ.get("SCANNER_LIBRARY_ROOT", "/video") + await asyncio.gather( + monitor(path, publisher), + scan(path), + ) diff --git a/scanner/monitor/__main__.py b/scanner/monitor/__main__.py new file mode 100644 index 00000000..b75252d5 --- /dev/null +++ b/scanner/monitor/__main__.py @@ -0,0 +1,6 @@ +#!/usr/bin/env python + +import asyncio +import monitor + +asyncio.run(monitor.main()) diff --git a/scanner/monitor/monitor.py b/scanner/monitor/monitor.py new file mode 100644 index 00000000..56ed18b4 --- /dev/null +++ b/scanner/monitor/monitor.py @@ -0,0 +1,19 @@ +from logging import getLogger +from watchfiles import awatch, Change + +from monitor.publisher import Publisher + +logger = getLogger(__name__) + + +async def monitor(path: str, publisher: Publisher): + async for changes in awatch(path): + for event, file in changes: + if event == Change.added: + await publisher.add(file) + elif event == Change.deleted: + await publisher.delete(file) + elif event == Change.modified: + pass + else: + logger.info(f"Change {event} occured for file {file}") diff --git a/scanner/monitor/publisher.py b/scanner/monitor/publisher.py new file mode 100644 index 00000000..7a99295d --- /dev/null +++ b/scanner/monitor/publisher.py @@ -0,0 +1,32 @@ +import os +from guessit.jsonutils import json +from aio_pika import Message, connect_robust + + +class Publisher: + QUEUE = "scanner" + + async def __aenter__(self): + self._con = await connect_robust( + host=os.environ.get("RABBITMQ_HOST", "rabbitmq"), + login=os.environ.get("RABBITMQ_DEFAULT_USER", "guest"), + password=os.environ.get("RABBITMQ_DEFAULT_PASS", "guest"), + ) + self._channel = await self._con.channel() + self._queue = await self._channel.declare_queue(self.QUEUE) + return self + + async def __aexit__(self, exc_type, exc_value, exc_tb): + await self._con.close() + + async def _publish(self, data: dict): + await self._channel.default_exchange.publish( + Message(json.dumps(data).encode()), + routing_key=self.QUEUE, + ) + + async def add(self, path: str): + await self._publish({"action": "scan", "path": path}) + + async def delete(self, path: str): + await self._publish({"action": "delete", "path": path}) diff --git a/scanner/monitor/requirements.txt b/scanner/monitor/requirements.txt new file mode 100644 index 00000000..e092edcc --- /dev/null +++ b/scanner/monitor/requirements.txt @@ -0,0 +1 @@ +aio-pika diff --git a/scanner/monitor/scanner.py b/scanner/monitor/scanner.py new file mode 100644 index 00000000..30ecf5db --- /dev/null +++ b/scanner/monitor/scanner.py @@ -0,0 +1,32 @@ +from logging import getLogger + +logger = getLogger(__name__) + +async def scan(path: str): + logger.info("Starting the scan. It can take some times...") + registered = await _get_registered_paths() + self.issues = await self.get_issues() + videos = [str(p) for p in Path(path).rglob("*") if p.is_file()] + deleted = [x for x in self.registered if x not in videos] + + # try: + # self._ignore_pattern = re.compile( + # os.environ.get("LIBRARY_IGNORE_PATTERN", "") + # ) + # except Exception as e: + # self._ignore_pattern = re.compile("") + # logging.error(f"Invalid ignore pattern. Ignoring. Error: {e}") + + if len(deleted) != len(self.registered): + for x in deleted: + await self.delete(x) + for x in self.issues: + if x not in videos: + await self.delete(x, "issue") + elif len(deleted) > 0: + logging.warning("All video files are unavailable. Check your disks.") + + # We batch videos by 20 because too mutch at once kinda DDOS everything. + for group in batch(iter(videos), 20): + await asyncio.gather(*map(self.identify, group)) + logging.info("Scan finished.") diff --git a/scanner/providers/kyoo_client.py b/scanner/providers/kyoo_client.py new file mode 100644 index 00000000..dc980a19 --- /dev/null +++ b/scanner/providers/kyoo_client.py @@ -0,0 +1,117 @@ +from datetime import timedelta +import os +import asyncio +import logging +import jsons +import re +from aiohttp import ClientSession +from pathlib import Path +from typing import List, Literal, Any +from urllib.parse import quote +from providers.provider import Provider, ProviderError +from providers.types.collection import Collection +from providers.types.show import Show +from providers.types.episode import Episode, PartialShow +from providers.types.season import Season + + +class KyooClient: + def __init__( + self, client: ClientSession, *, api_key: str + ) -> None: + self._client = client + self._api_key = api_key + self._url = os.environ.get("KYOO_URL", "http://back:5000") + + + async def get_issues(self) -> List[str]: + async with self._client.get( + f"{self._url}/issues", + params={"limit": 0}, + headers={"X-API-Key": self._api_key}, + ) as r: + r.raise_for_status() + ret = await r.json() + return [x["cause"] for x in ret if x["domain"] == "scanner"] + + async def link_collection( + self, collection: str, type: Literal["movie"] | Literal["show"], id: str + ): + async with self._client.put( + f"{self._url}/collections/{collection}/{type}/{id}", + headers={"X-API-Key": self._api_key}, + ) as r: + # Allow 409 and continue as if it worked. + if not r.ok and r.status != 409: + logging.error(f"Request error: {await r.text()}") + r.raise_for_status() + + async def post(self, path: str, *, data: dict[str, Any]) -> str: + logging.debug( + "Sending %s: %s", + path, + jsons.dumps( + data, + key_transformer=jsons.KEY_TRANSFORMER_CAMELCASE, + jdkwargs={"indent": 4}, + ), + ) + async with self._client.post( + f"{self._url}/{path}", + json=data, + headers={"X-API-Key": self._api_key}, + ) as r: + # Allow 409 and continue as if it worked. + if not r.ok and r.status != 409: + logging.error(f"Request error: {await r.text()}") + r.raise_for_status() + ret = await r.json() + + if r.status == 409 and ( + (path == "shows" and ret["startAir"][:4] != str(data["start_air"].year)) + or ( + path == "movies" + and ret["airDate"][:4] != str(data["air_date"].year) + ) + ): + logging.info( + f"Found a {path} with the same slug ({ret['slug']}) and a different date, using the date as part of the slug" + ) + year = (data["start_air"] if path == "movie" else data["air_date"]).year + data["slug"] = f"{ret['slug']}-{year}" + return await self.post(path, data=data) + return ret["id"] + + async def delete( + self, + path: str, + type: Literal["episode", "movie", "issue"] | None = None, + ): + logging.info("Deleting %s", path) + self.registered = filter(lambda x: x != path, self.registered) + + if type is None or type == "movie": + async with self._client.delete( + f'{self._url}/movies?filter=path eq "{quote(path)}"', + headers={"X-API-Key": self._api_key}, + ) as r: + if not r.ok: + logging.error(f"Request error: {await r.text()}") + r.raise_for_status() + + if type is None or type == "episode": + async with self._client.delete( + f'{self._url}/episodes?filter=path eq "{quote(path)}"', + headers={"X-API-Key": self._api_key}, + ) as r: + if not r.ok: + logging.error(f"Request error: {await r.text()}") + r.raise_for_status() + + if path in self.issues: + self.issues = filter(lambda x: x != path, self.issues) + await self._client.delete( + f'{self._url}/issues?filter=domain eq scanner and cause eq "{quote(path)}"', + headers={"X-API-Key": self._api_key}, + ) + diff --git a/scanner/requirements.txt b/scanner/requirements.txt index 5a49dfae..74801dd9 100644 --- a/scanner/requirements.txt +++ b/scanner/requirements.txt @@ -2,3 +2,4 @@ guessit aiohttp jsons watchfiles +aio-pika diff --git a/scanner/scanner/__init__.py b/scanner/scanner/__init__.py index 587f1086..c21a9dde 100644 --- a/scanner/scanner/__init__.py +++ b/scanner/scanner/__init__.py @@ -1,3 +1,6 @@ +from providers.kyoo_client import KyooClient + + async def main(): import asyncio import os @@ -9,7 +12,6 @@ async def main(): from aiohttp import ClientSession from providers.utils import format_date, ProviderError from .scanner import Scanner - from .monitor import monitor path = os.environ.get("SCANNER_LIBRARY_ROOT", "/video") languages = os.environ.get("LIBRARY_LANGUAGES") @@ -37,11 +39,9 @@ async def main(): *args, key_transformer=jsons.KEY_TRANSFORMER_CAMELCASE, **kwargs ), ) as client: + kyoo = KyooClient(client, api_key=api_key) + provider = try: - scanner = Scanner(client, languages=languages.split(","), api_key=api_key) - await asyncio.gather( - monitor(path, scanner), - scanner.scan(path), - ) + scanner = Scanner(kyoo, languages=languages.split(","), api_key=api_key) except ProviderError as e: logging.error(e) diff --git a/scanner/scanner/monitor.py b/scanner/scanner/monitor.py deleted file mode 100644 index 7949541d..00000000 --- a/scanner/scanner/monitor.py +++ /dev/null @@ -1,22 +0,0 @@ -import logging -from watchfiles import awatch, Change -from .utils import ProviderError -from .scanner import Scanner - - -async def monitor(path: str, scanner: Scanner): - async for changes in awatch(path): - for event, file in changes: - try: - if event == Change.added: - await scanner.identify(file) - elif event == Change.deleted: - await scanner.delete(file) - elif event == Change.modified: - pass - else: - print(f"Change {event} occured for file {file}") - except ProviderError as e: - logging.error(str(e)) - except Exception as e: - logging.exception("Unhandled error", exc_info=e) diff --git a/scanner/scanner/scanner.py b/scanner/scanner/scanner.py index 984cde7f..c723f3a9 100644 --- a/scanner/scanner/scanner.py +++ b/scanner/scanner/scanner.py @@ -1,101 +1,33 @@ from datetime import timedelta -import os import asyncio import logging -import jsons -import re -from aiohttp import ClientSession -from pathlib import Path -from typing import List, Literal, Any -from urllib.parse import quote +from providers.implementations.thexem import TheXem from providers.provider import Provider, ProviderError from providers.types.collection import Collection from providers.types.show import Show from providers.types.episode import Episode, PartialShow from providers.types.season import Season +from providers.kyoo_client import KyooClient from .parser.guess import guessit -from .utils import batch, handle_errors +from .utils import handle_errors from .cache import cache, exec_as_cache, make_key class Scanner: - def __init__( - self, client: ClientSession, *, languages: list[str], api_key: str - ) -> None: + def __init__(self, client: KyooClient, provider: Provider, xem: TheXem) -> None: self._client = client - self._api_key = api_key - self._url = os.environ.get("KYOO_URL", "http://back:5000") - try: - self._ignore_pattern = re.compile( - os.environ.get("LIBRARY_IGNORE_PATTERN", "") - ) - except Exception as e: - self._ignore_pattern = re.compile("") - logging.error(f"Invalid ignore pattern. Ignoring. Error: {e}") - [self.provider, *_], self._xem = Provider.get_all(client, languages) - self.languages = languages + self._provider = provider + self._xem = xem self._collection_cache = {} self._show_cache = {} self._season_cache = {} - async def scan(self, path: str): - logging.info("Starting the scan. It can take some times...") - self.registered = await self.get_registered_paths() - self.issues = await self.get_issues() - videos = [str(p) for p in Path(path).rglob("*") if p.is_file()] - deleted = [x for x in self.registered if x not in videos] - - if len(deleted) != len(self.registered): - for x in deleted: - await self.delete(x) - for x in self.issues: - if x not in videos: - await self.delete(x, "issue") - elif len(deleted) > 0: - logging.warning("All video files are unavailable. Check your disks.") - - # We batch videos by 20 because too mutch at once kinda DDOS everything. - for group in batch(iter(videos), 20): - await asyncio.gather(*map(self.identify, group)) - logging.info("Scan finished.") - - async def get_registered_paths(self) -> List[str]: - paths = None - async with self._client.get( - f"{self._url}/episodes", - params={"limit": 0}, - headers={"X-API-Key": self._api_key}, - ) as r: - r.raise_for_status() - ret = await r.json() - paths = list(x["path"] for x in ret["items"]) - - async with self._client.get( - f"{self._url}/movies", - params={"limit": 0}, - headers={"X-API-Key": self._api_key}, - ) as r: - r.raise_for_status() - ret = await r.json() - paths += list(x["path"] for x in ret["items"]) - return paths - - async def get_issues(self) -> List[str]: - async with self._client.get( - f"{self._url}/issues", - params={"limit": 0}, - headers={"X-API-Key": self._api_key}, - ) as r: - r.raise_for_status() - ret = await r.json() - return [x["cause"] for x in ret if x["domain"] == "scanner"] - @handle_errors async def identify(self, path: str): - if path in self.registered or self._ignore_pattern.match(path): - return - + # if path in self.registered or self._ignore_pattern.match(path): + # return + # raw = guessit(path, xem_titles=await self._xem.get_expected_titles()) if "mimetype" not in raw or not raw["mimetype"].startswith("video"): @@ -104,11 +36,11 @@ class Scanner: if raw.get("season") == raw.get("year") and "season" in raw: del raw["season"] - if isinstance(raw.get("season"), List): + if isinstance(raw.get("season"), list): raise ProviderError( f"An episode can't have multiple seasons (found {raw.get('season')} for {path})" ) - if isinstance(raw.get("episode"), List): + if isinstance(raw.get("episode"), list): raise ProviderError( f"Multi-episodes files are not yet supported (for {path})" ) @@ -116,20 +48,20 @@ class Scanner: logging.info("Identied %s: %s", path, raw) if raw["type"] == "movie": - movie = await self.provider.identify_movie(raw["title"], raw.get("year")) + movie = await self._provider.identify_movie(raw["title"], raw.get("year")) movie.path = str(path) logging.debug("Got movie: %s", movie) - movie_id = await self.post("movies", data=movie.to_kyoo()) + movie_id = await self._client.post("movies", data=movie.to_kyoo()) if any(movie.collections): ids = await asyncio.gather( *(self.create_or_get_collection(x) for x in movie.collections) ) await asyncio.gather( - *(self.link_collection(x, "movie", movie_id) for x in ids) + *(self._client.link_collection(x, "movie", movie_id) for x in ids) ) elif raw["type"] == "episode": - episode = await self.provider.identify_episode( + episode = await self._provider.identify_episode( raw["title"], season=raw.get("season"), episode_nbr=raw.get("episode"), @@ -144,7 +76,7 @@ class Scanner: episode.season_id = await self.register_seasons( episode.show, episode.show_id, episode.season_number ) - await self.post("episodes", data=episode.to_kyoo()) + await self._client.post("episodes", data=episode.to_kyoo()) else: logging.warn("Unknown video file type: %s", raw["type"]) @@ -153,48 +85,36 @@ class Scanner: async def create_collection(provider_id: str): # TODO: Check if a collection with the same metadata id exists already on kyoo. new_collection = ( - await self.provider.identify_collection(provider_id) + await self._provider.identify_collection(provider_id) if not any(collection.translations.keys()) else collection ) logging.debug("Got collection: %s", new_collection) - return await self.post("collection", data=new_collection.to_kyoo()) + return await self._client.post("collection", data=new_collection.to_kyoo()) # The parameter is only used as a key for the cache. - provider_id = collection.external_id[self.provider.name].data_id + provider_id = collection.external_id[self._provider.name].data_id return await create_collection(provider_id) - async def link_collection( - self, collection: str, type: Literal["movie"] | Literal["show"], id: str - ): - async with self._client.put( - f"{self._url}/collections/{collection}/{type}/{id}", - headers={"X-API-Key": self._api_key}, - ) as r: - # Allow 409 and continue as if it worked. - if not r.ok and r.status != 409: - logging.error(f"Request error: {await r.text()}") - r.raise_for_status() - async def create_or_get_show(self, episode: Episode) -> str: @cache(ttl=timedelta(days=1), cache=self._show_cache) async def create_show(_: str): # TODO: Check if a show with the same metadata id exists already on kyoo. show = ( - await self.provider.identify_show( - episode.show.external_id[self.provider.name].data_id, + await self._provider.identify_show( + episode.show.external_id[self._provider.name].data_id, ) if isinstance(episode.show, PartialShow) else episode.show ) # TODO: collections logging.debug("Got show: %s", episode) - ret = await self.post("show", data=show.to_kyoo()) + ret = await self._client.post("show", data=show.to_kyoo()) async def create_season(season: Season, id: str): try: season.show_id = id - return await self.post("seasons", data=season.to_kyoo()) + return await self._client.post("seasons", data=season.to_kyoo()) except Exception as e: logging.exception("Unhandled error create a season", exc_info=e) @@ -211,7 +131,7 @@ class Scanner: return ret # The parameter is only used as a key for the cache. - provider_id = episode.show.external_id[self.provider.name].data_id + provider_id = episode.show.external_id[self._provider.name].data_id return await create_show(provider_id) async def register_seasons( @@ -220,79 +140,10 @@ class Scanner: # We use an external season cache because we want to edit this cache programatically @cache(ttl=timedelta(days=1), cache=self._season_cache) async def create_season(_: str, __: int): - season = await self.provider.identify_season( - show.external_id[self.provider.name].data_id, season_number + season = await self._provider.identify_season( + show.external_id[self._provider.name].data_id, season_number ) season.show_id = show_id - return await self.post("seasons", data=season.to_kyoo()) + return await self._client.post("seasons", data=season.to_kyoo()) return await create_season(show_id, season_number) - - async def post(self, path: str, *, data: dict[str, Any]) -> str: - logging.debug( - "Sending %s: %s", - path, - jsons.dumps( - data, - key_transformer=jsons.KEY_TRANSFORMER_CAMELCASE, - jdkwargs={"indent": 4}, - ), - ) - async with self._client.post( - f"{self._url}/{path}", - json=data, - headers={"X-API-Key": self._api_key}, - ) as r: - # Allow 409 and continue as if it worked. - if not r.ok and r.status != 409: - logging.error(f"Request error: {await r.text()}") - r.raise_for_status() - ret = await r.json() - - if r.status == 409 and ( - (path == "shows" and ret["startAir"][:4] != str(data["start_air"].year)) - or ( - path == "movies" - and ret["airDate"][:4] != str(data["air_date"].year) - ) - ): - logging.info( - f"Found a {path} with the same slug ({ret['slug']}) and a different date, using the date as part of the slug" - ) - year = (data["start_air"] if path == "movie" else data["air_date"]).year - data["slug"] = f"{ret['slug']}-{year}" - return await self.post(path, data=data) - return ret["id"] - - async def delete( - self, - path: str, - type: Literal["episode", "movie", "issue"] | None = None, - ): - logging.info("Deleting %s", path) - self.registered = filter(lambda x: x != path, self.registered) - - if type is None or type == "movie": - async with self._client.delete( - f'{self._url}/movies?filter=path eq "{quote(path)}"', - headers={"X-API-Key": self._api_key}, - ) as r: - if not r.ok: - logging.error(f"Request error: {await r.text()}") - r.raise_for_status() - - if type is None or type == "episode": - async with self._client.delete( - f'{self._url}/episodes?filter=path eq "{quote(path)}"', - headers={"X-API-Key": self._api_key}, - ) as r: - if not r.ok: - logging.error(f"Request error: {await r.text()}") - r.raise_for_status() - - if path in self.issues: - self.issues = filter(lambda x: x != path, self.issues) - await self._client.delete( - f'{self._url}/issues?filter=domain eq scanner and cause eq "{quote(path)}"', - headers={"X-API-Key": self._api_key}, - ) diff --git a/shell.nix b/shell.nix index 7c1b81ee..482414f3 100644 --- a/shell.nix +++ b/shell.nix @@ -6,6 +6,7 @@ jsons watchfiles pika + aio-pika requests dataclasses-json ]); From a18fa7ebade0af79154a275b336dd458c9eaeb30 Mon Sep 17 00:00:00 2001 From: Zoe Roux Date: Mon, 8 Apr 2024 22:33:37 +0200 Subject: [PATCH 02/12] Finish client/scanner split --- autosync/autosync/__init__.py | 2 +- autosync/autosync/models/message.py | 4 +- scanner/Dockerfile | 2 +- scanner/monitor/__init__.py | 1 + scanner/monitor/scanner.py | 3 ++ scanner/providers/kyoo_client.py | 68 +++++++++++++++++------------ scanner/providers/provider.py | 15 +++---- scanner/scanner/__init__.py | 45 ++++--------------- scanner/scanner/scanner.py | 28 +++++++++--- scanner/scanner/subscriber.py | 52 ++++++++++++++++++++++ scanner/scanner/utils.py | 56 ------------------------ 11 files changed, 138 insertions(+), 138 deletions(-) create mode 100644 scanner/scanner/subscriber.py delete mode 100644 scanner/scanner/utils.py diff --git a/autosync/autosync/__init__.py b/autosync/autosync/__init__.py index e6e44ec2..8b1eda73 100644 --- a/autosync/autosync/__init__.py +++ b/autosync/autosync/__init__.py @@ -35,7 +35,7 @@ def on_message( body: bytes, ): try: - message = Message.from_json(body) # type: Message + message = Message.from_json(body) service.update(message.value.user, message.value.resource, message.value) except Exception as e: logging.exception("Error processing message.", exc_info=e) diff --git a/autosync/autosync/models/message.py b/autosync/autosync/models/message.py index 94b3312e..d24f39fc 100644 --- a/autosync/autosync/models/message.py +++ b/autosync/autosync/models/message.py @@ -1,5 +1,5 @@ from dataclasses import dataclass -from dataclasses_json import dataclass_json, LetterCase +from dataclasses_json import DataClassJsonMixin, dataclass_json, LetterCase from autosync.models.episode import Episode from autosync.models.movie import Movie @@ -17,7 +17,7 @@ class WatchStatusMessage(WatchStatus): @dataclass_json(letter_case=LetterCase.CAMEL) @dataclass -class Message: +class Message(DataClassJsonMixin): action: str type: str value: WatchStatusMessage diff --git a/scanner/Dockerfile b/scanner/Dockerfile index 372016df..785dd1bc 100644 --- a/scanner/Dockerfile +++ b/scanner/Dockerfile @@ -5,4 +5,4 @@ COPY ./requirements.txt . RUN pip3 install -r ./requirements.txt COPY . . -ENTRYPOINT ["python3", "-m", "scanner", "-v"] +ENTRYPOINT ["python3", "-m", "scanner"] diff --git a/scanner/monitor/__init__.py b/scanner/monitor/__init__.py index 3716b018..5e3b3366 100644 --- a/scanner/monitor/__init__.py +++ b/scanner/monitor/__init__.py @@ -7,6 +7,7 @@ async def main(): from .publisher import Publisher from providers.kyoo_client import KyooClient + logging.basicConfig(level=logging.INFO) logging.getLogger("watchfiles").setLevel(logging.WARNING) async with Publisher() as publisher, KyooClient() as client: diff --git a/scanner/monitor/scanner.py b/scanner/monitor/scanner.py index 30ecf5db..dc8611ed 100644 --- a/scanner/monitor/scanner.py +++ b/scanner/monitor/scanner.py @@ -8,6 +8,9 @@ async def scan(path: str): self.issues = await self.get_issues() videos = [str(p) for p in Path(path).rglob("*") if p.is_file()] deleted = [x for x in self.registered if x not in videos] + # if path in self.registered or self._ignore_pattern.match(path): + # return + # # try: # self._ignore_pattern = re.compile( diff --git a/scanner/providers/kyoo_client.py b/scanner/providers/kyoo_client.py index dc980a19..5a3af9b0 100644 --- a/scanner/providers/kyoo_client.py +++ b/scanner/providers/kyoo_client.py @@ -1,31 +1,37 @@ -from datetime import timedelta import os -import asyncio import logging import jsons -import re from aiohttp import ClientSession -from pathlib import Path from typing import List, Literal, Any from urllib.parse import quote -from providers.provider import Provider, ProviderError -from providers.types.collection import Collection -from providers.types.show import Show -from providers.types.episode import Episode, PartialShow -from providers.types.season import Season class KyooClient: - def __init__( - self, client: ClientSession, *, api_key: str - ) -> None: - self._client = client - self._api_key = api_key + def __init__(self) -> None: + self._api_key = os.environ.get("KYOO_APIKEY") + if not self._api_key: + self._api_key = os.environ.get("KYOO_APIKEYS") + if not self._api_key: + print("Missing environment variable 'KYOO_APIKEY'.") + exit(2) + self._api_key = self._api_key.split(",")[0] + self._url = os.environ.get("KYOO_URL", "http://back:5000") + async def __aenter__(self): + jsons.set_serializer(lambda x, **_: format_date(x), Optional[date | int]) # type: ignore + self.client = ClientSession( + json_serialize=lambda *args, **kwargs: jsons.dumps( + *args, key_transformer=jsons.KEY_TRANSFORMER_CAMELCASE, **kwargs + ), + ) + return self + + async def __aexit__(self, exc_type, exc_value, exc_tb): + await self.client.close() async def get_issues(self) -> List[str]: - async with self._client.get( + async with self.client.get( f"{self._url}/issues", params={"limit": 0}, headers={"X-API-Key": self._api_key}, @@ -34,10 +40,23 @@ class KyooClient: ret = await r.json() return [x["cause"] for x in ret if x["domain"] == "scanner"] + async def create_issue(self, path: str, issue: str, extra: dict | None = None): + await self.client.post( + f"{self._url}/issues", + json={"domain": "scanner", "cause": path, "reason": issue, "extra": extra}, + headers={"X-API-Key": self._api_key}, + ) + + async def delete_issue(self, path: str): + await self.client.delete( + f'{self._url}/issues?filter=domain eq scanner and cause eq "{path}"', + headers={"X-API-Key": self._api_key}, + ) + async def link_collection( self, collection: str, type: Literal["movie"] | Literal["show"], id: str ): - async with self._client.put( + async with self.client.put( f"{self._url}/collections/{collection}/{type}/{id}", headers={"X-API-Key": self._api_key}, ) as r: @@ -56,7 +75,7 @@ class KyooClient: jdkwargs={"indent": 4}, ), ) - async with self._client.post( + async with self.client.post( f"{self._url}/{path}", json=data, headers={"X-API-Key": self._api_key}, @@ -85,13 +104,12 @@ class KyooClient: async def delete( self, path: str, - type: Literal["episode", "movie", "issue"] | None = None, + type: Literal["episode", "movie"] | None = None, ): logging.info("Deleting %s", path) - self.registered = filter(lambda x: x != path, self.registered) if type is None or type == "movie": - async with self._client.delete( + async with self.client.delete( f'{self._url}/movies?filter=path eq "{quote(path)}"', headers={"X-API-Key": self._api_key}, ) as r: @@ -100,7 +118,7 @@ class KyooClient: r.raise_for_status() if type is None or type == "episode": - async with self._client.delete( + async with self.client.delete( f'{self._url}/episodes?filter=path eq "{quote(path)}"', headers={"X-API-Key": self._api_key}, ) as r: @@ -108,10 +126,4 @@ class KyooClient: logging.error(f"Request error: {await r.text()}") r.raise_for_status() - if path in self.issues: - self.issues = filter(lambda x: x != path, self.issues) - await self._client.delete( - f'{self._url}/issues?filter=domain eq scanner and cause eq "{quote(path)}"', - headers={"X-API-Key": self._api_key}, - ) - + await self.delete_issue(path) diff --git a/scanner/providers/provider.py b/scanner/providers/provider.py index f9e449e4..12a7f655 100644 --- a/scanner/providers/provider.py +++ b/scanner/providers/provider.py @@ -1,7 +1,7 @@ import os from aiohttp import ClientSession from abc import abstractmethod, abstractproperty -from typing import Optional, TypeVar +from typing import Optional, Self from providers.implementations.thexem import TheXem from providers.utils import ProviderError @@ -13,14 +13,13 @@ from .types.movie import Movie from .types.collection import Collection -Self = TypeVar("Self", bound="Provider") - - class Provider: @classmethod - def get_all( - cls: type[Self], client: ClientSession, languages: list[str] - ) -> tuple[list[Self], TheXem]: + def get_all(cls, client: ClientSession) -> tuple[Self, TheXem]: + languages = os.environ.get("LIBRARY_LANGUAGES") + if not languages: + print("Missing environment variable 'LIBRARY_LANGUAGES'.") + exit(2) providers = [] from providers.idmapper import IdMapper @@ -44,7 +43,7 @@ class Provider: idmapper.init(tmdb=tmdb, language=languages[0]) - return providers, xem + return next(providers), xem @abstractproperty def name(self) -> str: diff --git a/scanner/scanner/__init__.py b/scanner/scanner/__init__.py index c21a9dde..e049dff0 100644 --- a/scanner/scanner/__init__.py +++ b/scanner/scanner/__init__.py @@ -1,47 +1,18 @@ -from providers.kyoo_client import KyooClient - - async def main(): - import asyncio - import os import logging import sys - import jsons - from datetime import date - from typing import Optional - from aiohttp import ClientSession - from providers.utils import format_date, ProviderError + from providers.provider import Provider + from providers.kyoo_client import KyooClient from .scanner import Scanner + from .subscriber import Subscriber - path = os.environ.get("SCANNER_LIBRARY_ROOT", "/video") - languages = os.environ.get("LIBRARY_LANGUAGES") - if not languages: - print("Missing environment variable 'LIBRARY_LANGUAGES'.") - exit(2) - api_key = os.environ.get("KYOO_APIKEY") - if not api_key: - api_key = os.environ.get("KYOO_APIKEYS") - if not api_key: - print("Missing environment variable 'KYOO_APIKEY'.") - exit(2) - api_key = api_key.split(",")[0] - + logging.basicConfig(level=logging.INFO) if len(sys.argv) > 1 and sys.argv[1] == "-v": - logging.basicConfig(level=logging.INFO) - if len(sys.argv) > 1 and sys.argv[1] == "-vv": logging.basicConfig(level=logging.DEBUG) logging.getLogger("watchfiles").setLevel(logging.WARNING) logging.getLogger("rebulk").setLevel(logging.WARNING) - jsons.set_serializer(lambda x, **_: format_date(x), Optional[date | int]) # type: ignore - async with ClientSession( - json_serialize=lambda *args, **kwargs: jsons.dumps( - *args, key_transformer=jsons.KEY_TRANSFORMER_CAMELCASE, **kwargs - ), - ) as client: - kyoo = KyooClient(client, api_key=api_key) - provider = - try: - scanner = Scanner(kyoo, languages=languages.split(","), api_key=api_key) - except ProviderError as e: - logging.error(e) + async with KyooClient() as kyoo, Subscriber() as sub: + provider, xem = Provider.get_all(kyoo.client) + scanner = Scanner(kyoo, provider, xem) + await sub.listen(scanner) diff --git a/scanner/scanner/scanner.py b/scanner/scanner/scanner.py index c723f3a9..22010b34 100644 --- a/scanner/scanner/scanner.py +++ b/scanner/scanner/scanner.py @@ -9,7 +9,6 @@ from providers.types.episode import Episode, PartialShow from providers.types.season import Season from providers.kyoo_client import KyooClient from .parser.guess import guessit -from .utils import handle_errors from .cache import cache, exec_as_cache, make_key @@ -23,11 +22,30 @@ class Scanner: self._show_cache = {} self._season_cache = {} - @handle_errors + async def delete(self, path: str): + try: + await self._client.delete(path) + return True + except Exception as e: + logging.exception("Unhandled error", exc_info=e) + return False + async def identify(self, path: str): - # if path in self.registered or self._ignore_pattern.match(path): - # return - # + try: + await self.identify(path) + await self._client.delete_issue(path) + except ProviderError as e: + logging.error(e) + await self._client.create_issue(path, str(e)) + except Exception as e: + logging.exception("Unhandled error", exc_info=e) + await self._client.create_issue( + path, "Unknown error", {"type": type(e).__name__, "message": str(e)} + ) + return False + return True + + async def _identify(self, path: str): raw = guessit(path, xem_titles=await self._xem.get_expected_titles()) if "mimetype" not in raw or not raw["mimetype"].startswith("video"): diff --git a/scanner/scanner/subscriber.py b/scanner/scanner/subscriber.py new file mode 100644 index 00000000..55ef3ae4 --- /dev/null +++ b/scanner/scanner/subscriber.py @@ -0,0 +1,52 @@ +from dataclasses import dataclass +from dataclasses_json import DataClassJsonMixin +from typing import Literal +import os +import logging +from aio_pika import connect_robust +from aio_pika.abc import AbstractIncomingMessage + +from scanner.scanner import Scanner + +logger = logging.getLogger(__name__) + +@dataclass +class Message(DataClassJsonMixin): + action: Literal["scan"] | Literal["delete"] + path: str + + +class Subscriber: + QUEUE = "scanner" + + async def __aenter__(self): + self._con = await connect_robust( + host=os.environ.get("RABBITMQ_HOST", "rabbitmq"), + login=os.environ.get("RABBITMQ_DEFAULT_USER", "guest"), + password=os.environ.get("RABBITMQ_DEFAULT_PASS", "guest"), + ) + self._channel = await self._con.channel() + self._queue = await self._channel.declare_queue(self.QUEUE) + return self + + async def __aexit__(self, exc_type, exc_value, exc_tb): + await self._con.close() + + async def listen(self, scanner: Scanner): + async def on_message(message: AbstractIncomingMessage): + async with message.process(): + msg = Message.from_json(message.body) + ack = False + match msg.action: + case "scan": + ack = await scanner.identify(msg.path) + case "delete": + ack = await scanner.delete(msg.path) + case _: + logger.error(f"Invalid action: {msg.action}") + if ack: + await message.ack() + else: + await message.nack(requeue=False) + + await self._queue.consume(on_message, no_ack=True) diff --git a/scanner/scanner/utils.py b/scanner/scanner/utils.py deleted file mode 100644 index ae3de92a..00000000 --- a/scanner/scanner/utils.py +++ /dev/null @@ -1,56 +0,0 @@ -from __future__ import annotations -import logging -from functools import wraps -from itertools import islice -from typing import TYPE_CHECKING, Iterator, List, TypeVar -from providers.utils import ProviderError - -if TYPE_CHECKING: - from scanner.scanner import Scanner - - -T = TypeVar("T") - - -def batch(iterable: Iterator[T], n: int) -> Iterator[List[T]]: - "Batch data into lists of length n. The last batch may be shorter." - # batched('ABCDEFG', 3) --> ABC DEF G - it = iter(iterable) - while True: - batch = list(islice(it, n)) - if not batch: - return - yield batch - - -def handle_errors(f): - @wraps(f) - async def internal(self: Scanner, path: str): - try: - await f(self, path) - if path in self.issues: - await self._client.delete( - f'{self._url}/issues?filter=domain eq scanner and cause eq "{path}"', - headers={"X-API-Key": self._api_key}, - ) - except ProviderError as e: - logging.error(str(e)) - await self._client.post( - f"{self._url}/issues", - json={"domain": "scanner", "cause": path, "reason": str(e)}, - headers={"X-API-Key": self._api_key}, - ) - except Exception as e: - logging.exception("Unhandled error", exc_info=e) - await self._client.post( - f"{self._url}/issues", - json={ - "domain": "scanner", - "cause": path, - "reason": "Unknown error", - "extra": {"type": type(e).__name__, "message": str(e)}, - }, - headers={"X-API-Key": self._api_key}, - ) - - return internal From 52380bcb29ec0c935b184978da71abc1c78d9dc9 Mon Sep 17 00:00:00 2001 From: Zoe Roux Date: Tue, 9 Apr 2024 00:07:31 +0200 Subject: [PATCH 03/12] Update scan method --- scanner/monitor/__init__.py | 2 +- scanner/monitor/scanner.py | 52 ++++++++++++++++---------------- scanner/providers/kyoo_client.py | 17 +++++++++-- 3 files changed, 41 insertions(+), 30 deletions(-) diff --git a/scanner/monitor/__init__.py b/scanner/monitor/__init__.py index 5e3b3366..76951d53 100644 --- a/scanner/monitor/__init__.py +++ b/scanner/monitor/__init__.py @@ -14,5 +14,5 @@ async def main(): path = os.environ.get("SCANNER_LIBRARY_ROOT", "/video") await asyncio.gather( monitor(path, publisher), - scan(path), + scan(path, publisher, client), ) diff --git a/scanner/monitor/scanner.py b/scanner/monitor/scanner.py index dc8611ed..1d7cf844 100644 --- a/scanner/monitor/scanner.py +++ b/scanner/monitor/scanner.py @@ -1,35 +1,35 @@ +import os +import re +import asyncio from logging import getLogger +from pathlib import Path + +from monitor.publisher import Publisher +from providers.kyoo_client import KyooClient logger = getLogger(__name__) -async def scan(path: str): + +async def scan(path: str, publisher: Publisher, client: KyooClient): logger.info("Starting the scan. It can take some times...") - registered = await _get_registered_paths() - self.issues = await self.get_issues() + ignore_pattern = None + try: + ignore_pattern = re.compile(os.environ.get("LIBRARY_IGNORE_PATTERN", "")) + except Exception as e: + ignore_pattern = re.compile("") + logger.error(f"Invalid ignore pattern. Ignoring. Error: {e}") + + registered = await client.get_registered_paths() videos = [str(p) for p in Path(path).rglob("*") if p.is_file()] - deleted = [x for x in self.registered if x not in videos] - # if path in self.registered or self._ignore_pattern.match(path): - # return - # + to_register = [ + p for p in videos if p not in registered and not ignore_pattern.match(p) + ] + deleted = [x for x in registered if x not in videos] - # try: - # self._ignore_pattern = re.compile( - # os.environ.get("LIBRARY_IGNORE_PATTERN", "") - # ) - # except Exception as e: - # self._ignore_pattern = re.compile("") - # logging.error(f"Invalid ignore pattern. Ignoring. Error: {e}") - - if len(deleted) != len(self.registered): - for x in deleted: - await self.delete(x) - for x in self.issues: - if x not in videos: - await self.delete(x, "issue") + if len(deleted) != len(registered): + await asyncio.gather(*map(publisher.delete, deleted)) elif len(deleted) > 0: - logging.warning("All video files are unavailable. Check your disks.") + logger.warning("All video files are unavailable. Check your disks.") - # We batch videos by 20 because too mutch at once kinda DDOS everything. - for group in batch(iter(videos), 20): - await asyncio.gather(*map(self.identify, group)) - logging.info("Scan finished.") + await asyncio.gather(*map(publisher.add, to_register)) + logger.info("Scan finished.") diff --git a/scanner/providers/kyoo_client.py b/scanner/providers/kyoo_client.py index 5a3af9b0..3e7b2d9c 100644 --- a/scanner/providers/kyoo_client.py +++ b/scanner/providers/kyoo_client.py @@ -30,15 +30,26 @@ class KyooClient: async def __aexit__(self, exc_type, exc_value, exc_tb): await self.client.close() - async def get_issues(self) -> List[str]: + async def get_registered_paths(self) -> List[str]: + paths = None async with self.client.get( - f"{self._url}/issues", + f"{self._url}/episodes", params={"limit": 0}, headers={"X-API-Key": self._api_key}, ) as r: r.raise_for_status() ret = await r.json() - return [x["cause"] for x in ret if x["domain"] == "scanner"] + paths = list(x["path"] for x in ret["items"]) + + async with self.client.get( + f"{self._url}/movies", + params={"limit": 0}, + headers={"X-API-Key": self._api_key}, + ) as r: + r.raise_for_status() + ret = await r.json() + paths += list(x["path"] for x in ret["items"]) + return paths async def create_issue(self, path: str, issue: str, extra: dict | None = None): await self.client.post( From 8da6085df0774d0fa6e0fb17b7a8ce516ec4162e Mon Sep 17 00:00:00 2001 From: Zoe Roux Date: Tue, 9 Apr 2024 00:22:45 +0200 Subject: [PATCH 04/12] Process 20 messages at the same time --- scanner/scanner/subscriber.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/scanner/scanner/subscriber.py b/scanner/scanner/subscriber.py index 55ef3ae4..eaca56fe 100644 --- a/scanner/scanner/subscriber.py +++ b/scanner/scanner/subscriber.py @@ -49,4 +49,7 @@ class Subscriber: else: await message.nack(requeue=False) - await self._queue.consume(on_message, no_ack=True) + # Allow up to 20 scan requests to run in parallel on the same listener. + # Since most work is calling API not doing that is a waste. + await self._channel.set_qos(prefetch_count=20) + await self._queue.consume(on_message) From 3bb0565f18f66c98ce6f0e0c56f213c2d009afd8 Mon Sep 17 00:00:00 2001 From: Zoe Roux Date: Tue, 9 Apr 2024 16:58:35 +0200 Subject: [PATCH 05/12] Use os.walk instead of Path.rglob to fix #409 --- scanner/monitor/scanner.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/scanner/monitor/scanner.py b/scanner/monitor/scanner.py index 1d7cf844..b4dba07d 100644 --- a/scanner/monitor/scanner.py +++ b/scanner/monitor/scanner.py @@ -2,7 +2,6 @@ import os import re import asyncio from logging import getLogger -from pathlib import Path from monitor.publisher import Publisher from providers.kyoo_client import KyooClient @@ -20,7 +19,9 @@ async def scan(path: str, publisher: Publisher, client: KyooClient): logger.error(f"Invalid ignore pattern. Ignoring. Error: {e}") registered = await client.get_registered_paths() - videos = [str(p) for p in Path(path).rglob("*") if p.is_file()] + videos = [ + os.path.join(dir, file) for dir, _, files in os.walk(path) for file in files + ] to_register = [ p for p in videos if p not in registered and not ignore_pattern.match(p) ] From 27dfb71838385aec80647789a9913b7d8f41b89a Mon Sep 17 00:00:00 2001 From: Zoe Roux Date: Tue, 9 Apr 2024 22:22:22 +0200 Subject: [PATCH 06/12] Rename scanner modules --- docker-compose.dev.yml | 12 ++ scanner/Dockerfile | 3 +- scanner/matcher/__init__.py | 18 ++ scanner/matcher/__main__.py | 6 + scanner/{scanner => matcher}/cache.py | 0 scanner/matcher/matcher.py | 167 ++++++++++++++++ scanner/{scanner => matcher}/parser/guess.py | 0 scanner/{scanner => matcher}/parser/rules.py | 0 scanner/{scanner => matcher}/subscriber.py | 6 +- scanner/monitor/__init__.py | 18 -- scanner/monitor/__main__.py | 6 - scanner/monitor/scanner.py | 36 ---- .../implementations/themoviedatabase.py | 2 +- scanner/providers/implementations/thexem.py | 2 +- scanner/providers/kyoo_client.py | 7 +- scanner/providers/provider.py | 2 +- scanner/requirements.txt | 1 + scanner/scanner/__init__.py | 22 +- scanner/scanner/__main__.py | 4 +- scanner/{monitor => scanner}/monitor.py | 2 +- scanner/{monitor => scanner}/publisher.py | 0 scanner/{monitor => scanner}/requirements.txt | 0 scanner/scanner/scanner.py | 189 +++--------------- 23 files changed, 260 insertions(+), 243 deletions(-) create mode 100644 scanner/matcher/__init__.py create mode 100644 scanner/matcher/__main__.py rename scanner/{scanner => matcher}/cache.py (100%) create mode 100644 scanner/matcher/matcher.py rename scanner/{scanner => matcher}/parser/guess.py (100%) rename scanner/{scanner => matcher}/parser/rules.py (100%) rename scanner/{scanner => matcher}/subscriber.py (92%) delete mode 100644 scanner/monitor/__init__.py delete mode 100644 scanner/monitor/__main__.py delete mode 100644 scanner/monitor/scanner.py rename scanner/{monitor => scanner}/monitor.py (91%) rename scanner/{monitor => scanner}/publisher.py (100%) rename scanner/{monitor => scanner}/requirements.txt (100%) diff --git a/docker-compose.dev.yml b/docker-compose.dev.yml index 3856779c..caf8138d 100644 --- a/docker-compose.dev.yml +++ b/docker-compose.dev.yml @@ -84,6 +84,18 @@ services: volumes: - ${LIBRARY_ROOT}:/video:ro + matcher: + build: ./scanner + command: "matcher" + restart: on-failure + depends_on: + back: + condition: service_healthy + env_file: + - ./.env + environment: + - KYOO_URL=${KYOO_URL:-http://back:5000} + autosync: build: ./autosync restart: on-failure diff --git a/scanner/Dockerfile b/scanner/Dockerfile index 785dd1bc..162663e2 100644 --- a/scanner/Dockerfile +++ b/scanner/Dockerfile @@ -5,4 +5,5 @@ COPY ./requirements.txt . RUN pip3 install -r ./requirements.txt COPY . . -ENTRYPOINT ["python3", "-m", "scanner"] +ENTRYPOINT ["python3", "-m"] +CMD ["scanner"] diff --git a/scanner/matcher/__init__.py b/scanner/matcher/__init__.py new file mode 100644 index 00000000..195a1336 --- /dev/null +++ b/scanner/matcher/__init__.py @@ -0,0 +1,18 @@ +async def main(): + import logging + import sys + from providers.provider import Provider + from providers.kyoo_client import KyooClient + from .matcher import Matcher + from .subscriber import Subscriber + + logging.basicConfig(level=logging.INFO) + if len(sys.argv) > 1 and sys.argv[1] == "-v": + logging.basicConfig(level=logging.DEBUG) + logging.getLogger("watchfiles").setLevel(logging.WARNING) + logging.getLogger("rebulk").setLevel(logging.WARNING) + + async with KyooClient() as kyoo, Subscriber() as sub: + provider, xem = Provider.get_all(kyoo.client) + scanner = Matcher(kyoo, provider, xem) + await sub.listen(scanner) diff --git a/scanner/matcher/__main__.py b/scanner/matcher/__main__.py new file mode 100644 index 00000000..670779da --- /dev/null +++ b/scanner/matcher/__main__.py @@ -0,0 +1,6 @@ +#!/usr/bin/env python + +import asyncio +import matcher + +asyncio.run(matcher.main()) diff --git a/scanner/scanner/cache.py b/scanner/matcher/cache.py similarity index 100% rename from scanner/scanner/cache.py rename to scanner/matcher/cache.py diff --git a/scanner/matcher/matcher.py b/scanner/matcher/matcher.py new file mode 100644 index 00000000..534f0d8e --- /dev/null +++ b/scanner/matcher/matcher.py @@ -0,0 +1,167 @@ +from datetime import timedelta +import asyncio +import logging +from providers.implementations.thexem import TheXem +from providers.provider import Provider, ProviderError +from providers.types.collection import Collection +from providers.types.show import Show +from providers.types.episode import Episode, PartialShow +from providers.types.season import Season +from providers.kyoo_client import KyooClient +from .parser.guess import guessit +from .cache import cache, exec_as_cache, make_key + + +class Matcher: + def __init__(self, client: KyooClient, provider: Provider, xem: TheXem) -> None: + self._client = client + self._provider = provider + self._xem = xem + + self._collection_cache = {} + self._show_cache = {} + self._season_cache = {} + + async def delete(self, path: str): + try: + await self._client.delete(path) + return True + except Exception as e: + logging.exception("Unhandled error", exc_info=e) + return False + + async def identify(self, path: str): + try: + await self.identify(path) + await self._client.delete_issue(path) + except ProviderError as e: + logging.error(e) + await self._client.create_issue(path, str(e)) + except Exception as e: + logging.exception("Unhandled error", exc_info=e) + await self._client.create_issue( + path, "Unknown error", {"type": type(e).__name__, "message": str(e)} + ) + return False + return True + + async def _identify(self, path: str): + raw = guessit(path, xem_titles=await self._xem.get_expected_titles()) + + if "mimetype" not in raw or not raw["mimetype"].startswith("video"): + return + # Remove seasons in "One Piece (1999) 152.mkv" for example + if raw.get("season") == raw.get("year") and "season" in raw: + del raw["season"] + + if isinstance(raw.get("season"), list): + raise ProviderError( + f"An episode can't have multiple seasons (found {raw.get('season')} for {path})" + ) + if isinstance(raw.get("episode"), list): + raise ProviderError( + f"Multi-episodes files are not yet supported (for {path})" + ) + + logging.info("Identied %s: %s", path, raw) + + if raw["type"] == "movie": + movie = await self._provider.identify_movie(raw["title"], raw.get("year")) + movie.path = str(path) + logging.debug("Got movie: %s", movie) + movie_id = await self._client.post("movies", data=movie.to_kyoo()) + + if any(movie.collections): + ids = await asyncio.gather( + *(self.create_or_get_collection(x) for x in movie.collections) + ) + await asyncio.gather( + *(self._client.link_collection(x, "movie", movie_id) for x in ids) + ) + elif raw["type"] == "episode": + episode = await self._provider.identify_episode( + raw["title"], + season=raw.get("season"), + episode_nbr=raw.get("episode"), + absolute=raw.get("episode") if "season" not in raw else None, + year=raw.get("year"), + ) + episode.path = str(path) + logging.debug("Got episode: %s", episode) + episode.show_id = await self.create_or_get_show(episode) + + if episode.season_number is not None: + episode.season_id = await self.register_seasons( + episode.show, episode.show_id, episode.season_number + ) + await self._client.post("episodes", data=episode.to_kyoo()) + else: + logging.warn("Unknown video file type: %s", raw["type"]) + + async def create_or_get_collection(self, collection: Collection) -> str: + @cache(ttl=timedelta(days=1), cache=self._collection_cache) + async def create_collection(provider_id: str): + # TODO: Check if a collection with the same metadata id exists already on kyoo. + new_collection = ( + await self._provider.identify_collection(provider_id) + if not any(collection.translations.keys()) + else collection + ) + logging.debug("Got collection: %s", new_collection) + return await self._client.post("collection", data=new_collection.to_kyoo()) + + # The parameter is only used as a key for the cache. + provider_id = collection.external_id[self._provider.name].data_id + return await create_collection(provider_id) + + async def create_or_get_show(self, episode: Episode) -> str: + @cache(ttl=timedelta(days=1), cache=self._show_cache) + async def create_show(_: str): + # TODO: Check if a show with the same metadata id exists already on kyoo. + show = ( + await self._provider.identify_show( + episode.show.external_id[self._provider.name].data_id, + ) + if isinstance(episode.show, PartialShow) + else episode.show + ) + # TODO: collections + logging.debug("Got show: %s", episode) + ret = await self._client.post("show", data=show.to_kyoo()) + + async def create_season(season: Season, id: str): + try: + season.show_id = id + return await self._client.post("seasons", data=season.to_kyoo()) + except Exception as e: + logging.exception("Unhandled error create a season", exc_info=e) + + season_tasks = map( + lambda s: exec_as_cache( + self._season_cache, + make_key((ret, s.season_number)), + lambda: create_season(s, ret), + ), + show.seasons, + ) + await asyncio.gather(*season_tasks) + + return ret + + # The parameter is only used as a key for the cache. + provider_id = episode.show.external_id[self._provider.name].data_id + return await create_show(provider_id) + + async def register_seasons( + self, show: Show | PartialShow, show_id: str, season_number: int + ) -> str: + # We use an external season cache because we want to edit this cache programatically + @cache(ttl=timedelta(days=1), cache=self._season_cache) + async def create_season(_: str, __: int): + season = await self._provider.identify_season( + show.external_id[self._provider.name].data_id, season_number + ) + season.show_id = show_id + return await self._client.post("seasons", data=season.to_kyoo()) + + return await create_season(show_id, season_number) diff --git a/scanner/scanner/parser/guess.py b/scanner/matcher/parser/guess.py similarity index 100% rename from scanner/scanner/parser/guess.py rename to scanner/matcher/parser/guess.py diff --git a/scanner/scanner/parser/rules.py b/scanner/matcher/parser/rules.py similarity index 100% rename from scanner/scanner/parser/rules.py rename to scanner/matcher/parser/rules.py diff --git a/scanner/scanner/subscriber.py b/scanner/matcher/subscriber.py similarity index 92% rename from scanner/scanner/subscriber.py rename to scanner/matcher/subscriber.py index eaca56fe..035c09ee 100644 --- a/scanner/scanner/subscriber.py +++ b/scanner/matcher/subscriber.py @@ -6,13 +6,13 @@ import logging from aio_pika import connect_robust from aio_pika.abc import AbstractIncomingMessage -from scanner.scanner import Scanner +from matcher.matcher import Matcher logger = logging.getLogger(__name__) @dataclass class Message(DataClassJsonMixin): - action: Literal["scan"] | Literal["delete"] + action: Literal["scan", "delete"] path: str @@ -32,7 +32,7 @@ class Subscriber: async def __aexit__(self, exc_type, exc_value, exc_tb): await self._con.close() - async def listen(self, scanner: Scanner): + async def listen(self, scanner: Matcher): async def on_message(message: AbstractIncomingMessage): async with message.process(): msg = Message.from_json(message.body) diff --git a/scanner/monitor/__init__.py b/scanner/monitor/__init__.py deleted file mode 100644 index 76951d53..00000000 --- a/scanner/monitor/__init__.py +++ /dev/null @@ -1,18 +0,0 @@ -async def main(): - import asyncio - import os - import logging - from .monitor import monitor - from .scanner import scan - from .publisher import Publisher - from providers.kyoo_client import KyooClient - - logging.basicConfig(level=logging.INFO) - logging.getLogger("watchfiles").setLevel(logging.WARNING) - - async with Publisher() as publisher, KyooClient() as client: - path = os.environ.get("SCANNER_LIBRARY_ROOT", "/video") - await asyncio.gather( - monitor(path, publisher), - scan(path, publisher, client), - ) diff --git a/scanner/monitor/__main__.py b/scanner/monitor/__main__.py deleted file mode 100644 index b75252d5..00000000 --- a/scanner/monitor/__main__.py +++ /dev/null @@ -1,6 +0,0 @@ -#!/usr/bin/env python - -import asyncio -import monitor - -asyncio.run(monitor.main()) diff --git a/scanner/monitor/scanner.py b/scanner/monitor/scanner.py deleted file mode 100644 index b4dba07d..00000000 --- a/scanner/monitor/scanner.py +++ /dev/null @@ -1,36 +0,0 @@ -import os -import re -import asyncio -from logging import getLogger - -from monitor.publisher import Publisher -from providers.kyoo_client import KyooClient - -logger = getLogger(__name__) - - -async def scan(path: str, publisher: Publisher, client: KyooClient): - logger.info("Starting the scan. It can take some times...") - ignore_pattern = None - try: - ignore_pattern = re.compile(os.environ.get("LIBRARY_IGNORE_PATTERN", "")) - except Exception as e: - ignore_pattern = re.compile("") - logger.error(f"Invalid ignore pattern. Ignoring. Error: {e}") - - registered = await client.get_registered_paths() - videos = [ - os.path.join(dir, file) for dir, _, files in os.walk(path) for file in files - ] - to_register = [ - p for p in videos if p not in registered and not ignore_pattern.match(p) - ] - deleted = [x for x in registered if x not in videos] - - if len(deleted) != len(registered): - await asyncio.gather(*map(publisher.delete, deleted)) - elif len(deleted) > 0: - logger.warning("All video files are unavailable. Check your disks.") - - await asyncio.gather(*map(publisher.add, to_register)) - logger.info("Scan finished.") diff --git a/scanner/providers/implementations/themoviedatabase.py b/scanner/providers/implementations/themoviedatabase.py index eb1971fa..b8b9c310 100644 --- a/scanner/providers/implementations/themoviedatabase.py +++ b/scanner/providers/implementations/themoviedatabase.py @@ -8,7 +8,7 @@ from itertools import accumulate, zip_longest from providers.idmapper import IdMapper from providers.implementations.thexem import TheXem from providers.utils import ProviderError -from scanner.cache import cache +from matcher.cache import cache from ..provider import Provider from ..types.movie import Movie, MovieTranslation, Status as MovieStatus diff --git a/scanner/providers/implementations/thexem.py b/scanner/providers/implementations/thexem.py index 565ae1fc..8fcc75b6 100644 --- a/scanner/providers/implementations/thexem.py +++ b/scanner/providers/implementations/thexem.py @@ -5,7 +5,7 @@ from aiohttp import ClientSession from datetime import timedelta from providers.utils import ProviderError -from scanner.cache import cache +from matcher.cache import cache def clean(s: str): diff --git a/scanner/providers/kyoo_client.py b/scanner/providers/kyoo_client.py index 3e7b2d9c..ec271817 100644 --- a/scanner/providers/kyoo_client.py +++ b/scanner/providers/kyoo_client.py @@ -2,9 +2,12 @@ import os import logging import jsons from aiohttp import ClientSession -from typing import List, Literal, Any +from datetime import date +from typing import List, Literal, Any, Optional from urllib.parse import quote +from .utils import format_date + class KyooClient: def __init__(self) -> None: @@ -19,7 +22,7 @@ class KyooClient: self._url = os.environ.get("KYOO_URL", "http://back:5000") async def __aenter__(self): - jsons.set_serializer(lambda x, **_: format_date(x), Optional[date | int]) # type: ignore + jsons.set_serializer(lambda x, **_: format_date(x), type[Optional[date | int]]) self.client = ClientSession( json_serialize=lambda *args, **kwargs: jsons.dumps( *args, key_transformer=jsons.KEY_TRANSFORMER_CAMELCASE, **kwargs diff --git a/scanner/providers/provider.py b/scanner/providers/provider.py index 12a7f655..545e2d21 100644 --- a/scanner/providers/provider.py +++ b/scanner/providers/provider.py @@ -43,7 +43,7 @@ class Provider: idmapper.init(tmdb=tmdb, language=languages[0]) - return next(providers), xem + return next(iter(providers)), xem @abstractproperty def name(self) -> str: diff --git a/scanner/requirements.txt b/scanner/requirements.txt index 74801dd9..cdeb31bd 100644 --- a/scanner/requirements.txt +++ b/scanner/requirements.txt @@ -3,3 +3,4 @@ aiohttp jsons watchfiles aio-pika +dataclasses-json diff --git a/scanner/scanner/__init__.py b/scanner/scanner/__init__.py index e049dff0..76951d53 100644 --- a/scanner/scanner/__init__.py +++ b/scanner/scanner/__init__.py @@ -1,18 +1,18 @@ async def main(): + import asyncio + import os import logging - import sys - from providers.provider import Provider + from .monitor import monitor + from .scanner import scan + from .publisher import Publisher from providers.kyoo_client import KyooClient - from .scanner import Scanner - from .subscriber import Subscriber logging.basicConfig(level=logging.INFO) - if len(sys.argv) > 1 and sys.argv[1] == "-v": - logging.basicConfig(level=logging.DEBUG) logging.getLogger("watchfiles").setLevel(logging.WARNING) - logging.getLogger("rebulk").setLevel(logging.WARNING) - async with KyooClient() as kyoo, Subscriber() as sub: - provider, xem = Provider.get_all(kyoo.client) - scanner = Scanner(kyoo, provider, xem) - await sub.listen(scanner) + async with Publisher() as publisher, KyooClient() as client: + path = os.environ.get("SCANNER_LIBRARY_ROOT", "/video") + await asyncio.gather( + monitor(path, publisher), + scan(path, publisher, client), + ) diff --git a/scanner/scanner/__main__.py b/scanner/scanner/__main__.py index ac4e42e3..670779da 100644 --- a/scanner/scanner/__main__.py +++ b/scanner/scanner/__main__.py @@ -1,6 +1,6 @@ #!/usr/bin/env python import asyncio -import scanner +import matcher -asyncio.run(scanner.main()) +asyncio.run(matcher.main()) diff --git a/scanner/monitor/monitor.py b/scanner/scanner/monitor.py similarity index 91% rename from scanner/monitor/monitor.py rename to scanner/scanner/monitor.py index 56ed18b4..1fe24ace 100644 --- a/scanner/monitor/monitor.py +++ b/scanner/scanner/monitor.py @@ -1,7 +1,7 @@ from logging import getLogger from watchfiles import awatch, Change -from monitor.publisher import Publisher +from .publisher import Publisher logger = getLogger(__name__) diff --git a/scanner/monitor/publisher.py b/scanner/scanner/publisher.py similarity index 100% rename from scanner/monitor/publisher.py rename to scanner/scanner/publisher.py diff --git a/scanner/monitor/requirements.txt b/scanner/scanner/requirements.txt similarity index 100% rename from scanner/monitor/requirements.txt rename to scanner/scanner/requirements.txt diff --git a/scanner/scanner/scanner.py b/scanner/scanner/scanner.py index 22010b34..b74ecc14 100644 --- a/scanner/scanner/scanner.py +++ b/scanner/scanner/scanner.py @@ -1,167 +1,36 @@ -from datetime import timedelta +import os +import re import asyncio -import logging -from providers.implementations.thexem import TheXem -from providers.provider import Provider, ProviderError -from providers.types.collection import Collection -from providers.types.show import Show -from providers.types.episode import Episode, PartialShow -from providers.types.season import Season +from logging import getLogger + +from .publisher import Publisher from providers.kyoo_client import KyooClient -from .parser.guess import guessit -from .cache import cache, exec_as_cache, make_key + +logger = getLogger(__name__) -class Scanner: - def __init__(self, client: KyooClient, provider: Provider, xem: TheXem) -> None: - self._client = client - self._provider = provider - self._xem = xem +async def scan(path: str, publisher: Publisher, client: KyooClient): + logger.info("Starting the scan. It can take some times...") + ignore_pattern = None + try: + ignore_pattern = re.compile(os.environ.get("LIBRARY_IGNORE_PATTERN", "")) + except Exception as e: + ignore_pattern = re.compile("") + logger.error(f"Invalid ignore pattern. Ignoring. Error: {e}") - self._collection_cache = {} - self._show_cache = {} - self._season_cache = {} + registered = await client.get_registered_paths() + videos = [ + os.path.join(dir, file) for dir, _, files in os.walk(path) for file in files + ] + to_register = [ + p for p in videos if p not in registered and not ignore_pattern.match(p) + ] + deleted = [x for x in registered if x not in videos] - async def delete(self, path: str): - try: - await self._client.delete(path) - return True - except Exception as e: - logging.exception("Unhandled error", exc_info=e) - return False + if len(deleted) != len(registered): + await asyncio.gather(*map(publisher.delete, deleted)) + elif len(deleted) > 0: + logger.warning("All video files are unavailable. Check your disks.") - async def identify(self, path: str): - try: - await self.identify(path) - await self._client.delete_issue(path) - except ProviderError as e: - logging.error(e) - await self._client.create_issue(path, str(e)) - except Exception as e: - logging.exception("Unhandled error", exc_info=e) - await self._client.create_issue( - path, "Unknown error", {"type": type(e).__name__, "message": str(e)} - ) - return False - return True - - async def _identify(self, path: str): - raw = guessit(path, xem_titles=await self._xem.get_expected_titles()) - - if "mimetype" not in raw or not raw["mimetype"].startswith("video"): - return - # Remove seasons in "One Piece (1999) 152.mkv" for example - if raw.get("season") == raw.get("year") and "season" in raw: - del raw["season"] - - if isinstance(raw.get("season"), list): - raise ProviderError( - f"An episode can't have multiple seasons (found {raw.get('season')} for {path})" - ) - if isinstance(raw.get("episode"), list): - raise ProviderError( - f"Multi-episodes files are not yet supported (for {path})" - ) - - logging.info("Identied %s: %s", path, raw) - - if raw["type"] == "movie": - movie = await self._provider.identify_movie(raw["title"], raw.get("year")) - movie.path = str(path) - logging.debug("Got movie: %s", movie) - movie_id = await self._client.post("movies", data=movie.to_kyoo()) - - if any(movie.collections): - ids = await asyncio.gather( - *(self.create_or_get_collection(x) for x in movie.collections) - ) - await asyncio.gather( - *(self._client.link_collection(x, "movie", movie_id) for x in ids) - ) - elif raw["type"] == "episode": - episode = await self._provider.identify_episode( - raw["title"], - season=raw.get("season"), - episode_nbr=raw.get("episode"), - absolute=raw.get("episode") if "season" not in raw else None, - year=raw.get("year"), - ) - episode.path = str(path) - logging.debug("Got episode: %s", episode) - episode.show_id = await self.create_or_get_show(episode) - - if episode.season_number is not None: - episode.season_id = await self.register_seasons( - episode.show, episode.show_id, episode.season_number - ) - await self._client.post("episodes", data=episode.to_kyoo()) - else: - logging.warn("Unknown video file type: %s", raw["type"]) - - async def create_or_get_collection(self, collection: Collection) -> str: - @cache(ttl=timedelta(days=1), cache=self._collection_cache) - async def create_collection(provider_id: str): - # TODO: Check if a collection with the same metadata id exists already on kyoo. - new_collection = ( - await self._provider.identify_collection(provider_id) - if not any(collection.translations.keys()) - else collection - ) - logging.debug("Got collection: %s", new_collection) - return await self._client.post("collection", data=new_collection.to_kyoo()) - - # The parameter is only used as a key for the cache. - provider_id = collection.external_id[self._provider.name].data_id - return await create_collection(provider_id) - - async def create_or_get_show(self, episode: Episode) -> str: - @cache(ttl=timedelta(days=1), cache=self._show_cache) - async def create_show(_: str): - # TODO: Check if a show with the same metadata id exists already on kyoo. - show = ( - await self._provider.identify_show( - episode.show.external_id[self._provider.name].data_id, - ) - if isinstance(episode.show, PartialShow) - else episode.show - ) - # TODO: collections - logging.debug("Got show: %s", episode) - ret = await self._client.post("show", data=show.to_kyoo()) - - async def create_season(season: Season, id: str): - try: - season.show_id = id - return await self._client.post("seasons", data=season.to_kyoo()) - except Exception as e: - logging.exception("Unhandled error create a season", exc_info=e) - - season_tasks = map( - lambda s: exec_as_cache( - self._season_cache, - make_key((ret, s.season_number)), - lambda: create_season(s, ret), - ), - show.seasons, - ) - await asyncio.gather(*season_tasks) - - return ret - - # The parameter is only used as a key for the cache. - provider_id = episode.show.external_id[self._provider.name].data_id - return await create_show(provider_id) - - async def register_seasons( - self, show: Show | PartialShow, show_id: str, season_number: int - ) -> str: - # We use an external season cache because we want to edit this cache programatically - @cache(ttl=timedelta(days=1), cache=self._season_cache) - async def create_season(_: str, __: int): - season = await self._provider.identify_season( - show.external_id[self._provider.name].data_id, season_number - ) - season.show_id = show_id - return await self._client.post("seasons", data=season.to_kyoo()) - - return await create_season(show_id, season_number) + await asyncio.gather(*map(publisher.add, to_register)) + logger.info("Scan finished.") From 8d8e9846696f7120ebef4fa782d173bb765d4924 Mon Sep 17 00:00:00 2001 From: Zoe Roux Date: Tue, 9 Apr 2024 22:37:26 +0200 Subject: [PATCH 07/12] Cleanup scanner logger --- docker-compose.dev.yml | 2 +- scanner/matcher/matcher.py | 24 ++++++++++--------- .../implementations/themoviedatabase.py | 18 +++++++------- scanner/providers/implementations/thexem.py | 16 +++++++------ scanner/providers/kyoo_client.py | 18 +++++++------- 5 files changed, 43 insertions(+), 35 deletions(-) diff --git a/docker-compose.dev.yml b/docker-compose.dev.yml index caf8138d..75c03090 100644 --- a/docker-compose.dev.yml +++ b/docker-compose.dev.yml @@ -86,7 +86,7 @@ services: matcher: build: ./scanner - command: "matcher" + command: matcher restart: on-failure depends_on: back: diff --git a/scanner/matcher/matcher.py b/scanner/matcher/matcher.py index 534f0d8e..b8149b77 100644 --- a/scanner/matcher/matcher.py +++ b/scanner/matcher/matcher.py @@ -1,6 +1,6 @@ from datetime import timedelta import asyncio -import logging +from logging import getLogger from providers.implementations.thexem import TheXem from providers.provider import Provider, ProviderError from providers.types.collection import Collection @@ -11,6 +11,8 @@ from providers.kyoo_client import KyooClient from .parser.guess import guessit from .cache import cache, exec_as_cache, make_key +logger = getLogger(__name__) + class Matcher: def __init__(self, client: KyooClient, provider: Provider, xem: TheXem) -> None: @@ -27,7 +29,7 @@ class Matcher: await self._client.delete(path) return True except Exception as e: - logging.exception("Unhandled error", exc_info=e) + logger.exception("Unhandled error", exc_info=e) return False async def identify(self, path: str): @@ -35,10 +37,10 @@ class Matcher: await self.identify(path) await self._client.delete_issue(path) except ProviderError as e: - logging.error(e) + logger.error(e) await self._client.create_issue(path, str(e)) except Exception as e: - logging.exception("Unhandled error", exc_info=e) + logger.exception("Unhandled error", exc_info=e) await self._client.create_issue( path, "Unknown error", {"type": type(e).__name__, "message": str(e)} ) @@ -63,12 +65,12 @@ class Matcher: f"Multi-episodes files are not yet supported (for {path})" ) - logging.info("Identied %s: %s", path, raw) + logger.info("Identied %s: %s", path, raw) if raw["type"] == "movie": movie = await self._provider.identify_movie(raw["title"], raw.get("year")) movie.path = str(path) - logging.debug("Got movie: %s", movie) + logger.debug("Got movie: %s", movie) movie_id = await self._client.post("movies", data=movie.to_kyoo()) if any(movie.collections): @@ -87,7 +89,7 @@ class Matcher: year=raw.get("year"), ) episode.path = str(path) - logging.debug("Got episode: %s", episode) + logger.debug("Got episode: %s", episode) episode.show_id = await self.create_or_get_show(episode) if episode.season_number is not None: @@ -96,7 +98,7 @@ class Matcher: ) await self._client.post("episodes", data=episode.to_kyoo()) else: - logging.warn("Unknown video file type: %s", raw["type"]) + logger.warn("Unknown video file type: %s", raw["type"]) async def create_or_get_collection(self, collection: Collection) -> str: @cache(ttl=timedelta(days=1), cache=self._collection_cache) @@ -107,7 +109,7 @@ class Matcher: if not any(collection.translations.keys()) else collection ) - logging.debug("Got collection: %s", new_collection) + logger.debug("Got collection: %s", new_collection) return await self._client.post("collection", data=new_collection.to_kyoo()) # The parameter is only used as a key for the cache. @@ -126,7 +128,7 @@ class Matcher: else episode.show ) # TODO: collections - logging.debug("Got show: %s", episode) + logger.debug("Got show: %s", episode) ret = await self._client.post("show", data=show.to_kyoo()) async def create_season(season: Season, id: str): @@ -134,7 +136,7 @@ class Matcher: season.show_id = id return await self._client.post("seasons", data=season.to_kyoo()) except Exception as e: - logging.exception("Unhandled error create a season", exc_info=e) + logger.exception("Unhandled error create a season", exc_info=e) season_tasks = map( lambda s: exec_as_cache( diff --git a/scanner/providers/implementations/themoviedatabase.py b/scanner/providers/implementations/themoviedatabase.py index b8b9c310..b3866da4 100644 --- a/scanner/providers/implementations/themoviedatabase.py +++ b/scanner/providers/implementations/themoviedatabase.py @@ -1,7 +1,7 @@ import asyncio -import logging from aiohttp import ClientSession from datetime import datetime, timedelta +from logging import getLogger from typing import Awaitable, Callable, Dict, List, Optional, Any, TypeVar from itertools import accumulate, zip_longest @@ -20,6 +20,8 @@ from ..types.metadataid import MetadataID from ..types.show import Show, ShowTranslation, Status as ShowStatus from ..types.collection import Collection, CollectionTranslation +logger = getLogger(__name__) + class TheMovieDatabase(Provider): def __init__( @@ -158,7 +160,7 @@ class TheMovieDatabase(Provider): "append_to_response": "alternative_titles,videos,credits,keywords,images", }, ) - logging.debug("TMDb responded: %s", movie) + logger.debug("TMDb responded: %s", movie) ret = Movie( original_language=movie["original_language"], @@ -256,7 +258,7 @@ class TheMovieDatabase(Provider): "append_to_response": "alternative_titles,videos,credits,keywords,images,external_ids", }, ) - logging.debug("TMDb responded: %s", show) + logger.debug("TMDb responded: %s", show) ret = Show( original_language=show["original_language"], @@ -427,7 +429,7 @@ class TheMovieDatabase(Provider): if self.name in ret.external_id: return ret - logging.warn( + logger.warn( "Could not map xem exception to themoviedb, searching instead for %s", new_name, ) @@ -473,7 +475,7 @@ class TheMovieDatabase(Provider): else None ) if tvdb_id is None: - logging.info( + logger.info( "Tvdb could not be found, trying xem name lookup for %s", name ) _, tvdb_id = await self._xem.get_show_override("tvdb", old_name) @@ -518,7 +520,7 @@ class TheMovieDatabase(Provider): }, not_found_fail=f"Could not find episode {episode_nbr} of season {season} of serie {name} (absolute: {absolute})", ) - logging.debug("TMDb responded: %s", episode) + logger.debug("TMDb responded: %s", episode) ret = Episode( show=show, @@ -616,7 +618,7 @@ class TheMovieDatabase(Provider): grp = next(iter(group["groups"]), None) return grp["episodes"] if grp else None except Exception as e: - logging.exception( + logger.exception( "Could not retrieve absolute ordering information", exc_info=e ) return None @@ -697,7 +699,7 @@ class TheMovieDatabase(Provider): "language": lng, }, ) - logging.debug("TMDb responded: %s", collection) + logger.debug("TMDb responded: %s", collection) ret = Collection( external_id={ diff --git a/scanner/providers/implementations/thexem.py b/scanner/providers/implementations/thexem.py index 8fcc75b6..23e22c84 100644 --- a/scanner/providers/implementations/thexem.py +++ b/scanner/providers/implementations/thexem.py @@ -1,12 +1,14 @@ import re -import logging from typing import Dict, List, Literal from aiohttp import ClientSession +from logging import getLogger from datetime import timedelta from providers.utils import ProviderError from matcher.cache import cache +logger = getLogger(__name__) + def clean(s: str): s = s.lower() @@ -28,7 +30,7 @@ class TheXem: async def get_map( self, provider: Literal["tvdb"] | Literal["anidb"] ) -> Dict[str, List[Dict[str, int]]]: - logging.info("Fetching data from thexem for %s", provider) + logger.info("Fetching data from thexem for %s", provider) async with self._client.get( f"{self.base}/map/allNames", params={ @@ -40,7 +42,7 @@ class TheXem: r.raise_for_status() ret = await r.json() if "data" not in ret or ret["result"] == "failure": - logging.error("Could not fetch xem metadata. Error: %s", ret["message"]) + logger.error("Could not fetch xem metadata. Error: %s", ret["message"]) raise ProviderError("Could not fetch xem metadata") return ret["data"] @@ -53,7 +55,7 @@ class TheXem: Dict[Literal["season"] | Literal["episode"] | Literal["absolute"], int], ] ]: - logging.info("Fetching from thexem the map of %s (%s)", id, provider) + logger.info("Fetching from thexem the map of %s (%s)", id, provider) async with self._client.get( f"{self.base}/map/all", params={ @@ -64,7 +66,7 @@ class TheXem: r.raise_for_status() ret = await r.json() if "data" not in ret or ret["result"] == "failure": - logging.error("Could not fetch xem mapping. Error: %s", ret["message"]) + logger.error("Could not fetch xem mapping. Error: %s", ret["message"]) return [] return ret["data"] @@ -111,7 +113,7 @@ class TheXem: if master_season is None or master_season == -1: return [None, None, episode] - logging.info( + logger.info( "Fount xem override for show %s, ep %d. Master season: %d", show_name, episode, @@ -130,7 +132,7 @@ class TheXem: None, ) if ep is None: - logging.warning( + logger.warning( "Could not get xem mapping for show %s, falling back to identifier mapping.", show_name, ) diff --git a/scanner/providers/kyoo_client.py b/scanner/providers/kyoo_client.py index ec271817..7f95a3a9 100644 --- a/scanner/providers/kyoo_client.py +++ b/scanner/providers/kyoo_client.py @@ -1,13 +1,15 @@ import os -import logging import jsons from aiohttp import ClientSession from datetime import date +from logging import getLogger from typing import List, Literal, Any, Optional from urllib.parse import quote from .utils import format_date +logger = getLogger(__name__) + class KyooClient: def __init__(self) -> None: @@ -76,11 +78,11 @@ class KyooClient: ) as r: # Allow 409 and continue as if it worked. if not r.ok and r.status != 409: - logging.error(f"Request error: {await r.text()}") + logger.error(f"Request error: {await r.text()}") r.raise_for_status() async def post(self, path: str, *, data: dict[str, Any]) -> str: - logging.debug( + logger.debug( "Sending %s: %s", path, jsons.dumps( @@ -96,7 +98,7 @@ class KyooClient: ) as r: # Allow 409 and continue as if it worked. if not r.ok and r.status != 409: - logging.error(f"Request error: {await r.text()}") + logger.error(f"Request error: {await r.text()}") r.raise_for_status() ret = await r.json() @@ -107,7 +109,7 @@ class KyooClient: and ret["airDate"][:4] != str(data["air_date"].year) ) ): - logging.info( + logger.info( f"Found a {path} with the same slug ({ret['slug']}) and a different date, using the date as part of the slug" ) year = (data["start_air"] if path == "movie" else data["air_date"]).year @@ -120,7 +122,7 @@ class KyooClient: path: str, type: Literal["episode", "movie"] | None = None, ): - logging.info("Deleting %s", path) + logger.info("Deleting %s", path) if type is None or type == "movie": async with self.client.delete( @@ -128,7 +130,7 @@ class KyooClient: headers={"X-API-Key": self._api_key}, ) as r: if not r.ok: - logging.error(f"Request error: {await r.text()}") + logger.error(f"Request error: {await r.text()}") r.raise_for_status() if type is None or type == "episode": @@ -137,7 +139,7 @@ class KyooClient: headers={"X-API-Key": self._api_key}, ) as r: if not r.ok: - logging.error(f"Request error: {await r.text()}") + logger.error(f"Request error: {await r.text()}") r.raise_for_status() await self.delete_issue(path) From e4403cc17c227cb708d3f75fffc81d07b89333c7 Mon Sep 17 00:00:00 2001 From: Zoe Roux Date: Tue, 9 Apr 2024 23:17:08 +0200 Subject: [PATCH 08/12] Fix matcher/scanner issues --- scanner/matcher/matcher.py | 2 +- scanner/matcher/subscriber.py | 30 ++++++++++--------- .../implementations/themoviedatabase.py | 2 +- scanner/providers/provider.py | 1 + scanner/scanner/__main__.py | 4 +-- 5 files changed, 21 insertions(+), 18 deletions(-) diff --git a/scanner/matcher/matcher.py b/scanner/matcher/matcher.py index b8149b77..d09a4b80 100644 --- a/scanner/matcher/matcher.py +++ b/scanner/matcher/matcher.py @@ -34,7 +34,7 @@ class Matcher: async def identify(self, path: str): try: - await self.identify(path) + await self._identify(path) await self._client.delete_issue(path) except ProviderError as e: logger.error(e) diff --git a/scanner/matcher/subscriber.py b/scanner/matcher/subscriber.py index 035c09ee..126307c7 100644 --- a/scanner/matcher/subscriber.py +++ b/scanner/matcher/subscriber.py @@ -1,3 +1,4 @@ +import asyncio from dataclasses import dataclass from dataclasses_json import DataClassJsonMixin from typing import Literal @@ -34,22 +35,23 @@ class Subscriber: async def listen(self, scanner: Matcher): async def on_message(message: AbstractIncomingMessage): - async with message.process(): - msg = Message.from_json(message.body) - ack = False - match msg.action: - case "scan": - ack = await scanner.identify(msg.path) - case "delete": - ack = await scanner.delete(msg.path) - case _: - logger.error(f"Invalid action: {msg.action}") - if ack: - await message.ack() - else: - await message.nack(requeue=False) + msg = Message.from_json(message.body) + ack = False + match msg.action: + case "scan": + ack = await scanner.identify(msg.path) + case "delete": + ack = await scanner.delete(msg.path) + case _: + logger.error(f"Invalid action: {msg.action}") + if ack: + await message.ack() + else: + await message.reject() # Allow up to 20 scan requests to run in parallel on the same listener. # Since most work is calling API not doing that is a waste. await self._channel.set_qos(prefetch_count=20) await self._queue.consume(on_message) + await asyncio.Future() + diff --git a/scanner/providers/implementations/themoviedatabase.py b/scanner/providers/implementations/themoviedatabase.py index b3866da4..3f1c861e 100644 --- a/scanner/providers/implementations/themoviedatabase.py +++ b/scanner/providers/implementations/themoviedatabase.py @@ -26,7 +26,7 @@ logger = getLogger(__name__) class TheMovieDatabase(Provider): def __init__( self, - languages, + languages: list[str], client: ClientSession, api_key: str, xem: TheXem, diff --git a/scanner/providers/provider.py b/scanner/providers/provider.py index 545e2d21..5f5e7f53 100644 --- a/scanner/providers/provider.py +++ b/scanner/providers/provider.py @@ -20,6 +20,7 @@ class Provider: if not languages: print("Missing environment variable 'LIBRARY_LANGUAGES'.") exit(2) + languages = languages.split(",") providers = [] from providers.idmapper import IdMapper diff --git a/scanner/scanner/__main__.py b/scanner/scanner/__main__.py index 670779da..ac4e42e3 100644 --- a/scanner/scanner/__main__.py +++ b/scanner/scanner/__main__.py @@ -1,6 +1,6 @@ #!/usr/bin/env python import asyncio -import matcher +import scanner -asyncio.run(matcher.main()) +asyncio.run(scanner.main()) From a085650a2e613a2baa71171fa683cad9ba867069 Mon Sep 17 00:00:00 2001 From: Zoe Roux Date: Tue, 9 Apr 2024 23:17:21 +0200 Subject: [PATCH 09/12] Add missing genres on the front --- front/packages/models/src/resources/genre.ts | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/front/packages/models/src/resources/genre.ts b/front/packages/models/src/resources/genre.ts index dfb04d9a..c54004bb 100644 --- a/front/packages/models/src/resources/genre.ts +++ b/front/packages/models/src/resources/genre.ts @@ -37,4 +37,10 @@ export enum Genre { Thriller = "Thriller", War = "War", Western = "Western", + Kids = "Kids", + News = "News", + Reality = "Reality", + Soap = "Soap", + Talk = "Talk", + Politics = "Politics", } From d0901c7267c06100bb7f2ec71d883f3ed6b351d1 Mon Sep 17 00:00:00 2001 From: Zoe Roux Date: Tue, 9 Apr 2024 23:37:23 +0200 Subject: [PATCH 10/12] Fix issues creation --- scanner/providers/kyoo_client.py | 21 ++++++++++++++++----- 1 file changed, 16 insertions(+), 5 deletions(-) diff --git a/scanner/providers/kyoo_client.py b/scanner/providers/kyoo_client.py index 7f95a3a9..5dfbdc7b 100644 --- a/scanner/providers/kyoo_client.py +++ b/scanner/providers/kyoo_client.py @@ -57,17 +57,28 @@ class KyooClient: return paths async def create_issue(self, path: str, issue: str, extra: dict | None = None): - await self.client.post( + async with self.client.post( f"{self._url}/issues", - json={"domain": "scanner", "cause": path, "reason": issue, "extra": extra}, + json={ + "domain": "scanner", + "cause": path, + "reason": issue, + "extra": extra if extra is not None else {}, + }, headers={"X-API-Key": self._api_key}, - ) + ) as r: + if not r.ok: + logger.error(f"Request error: {await r.text()}") + r.raise_for_status() async def delete_issue(self, path: str): - await self.client.delete( + async with self.client.delete( f'{self._url}/issues?filter=domain eq scanner and cause eq "{path}"', headers={"X-API-Key": self._api_key}, - ) + ) as r: + if not r.ok: + logger.error(f"Request error: {await r.text()}") + r.raise_for_status() async def link_collection( self, collection: str, type: Literal["movie"] | Literal["show"], id: str From 0f3f33a9e6fdc68035927f745b5e7107913c38d0 Mon Sep 17 00:00:00 2001 From: Zoe Roux Date: Tue, 9 Apr 2024 23:45:37 +0200 Subject: [PATCH 11/12] Add matcher to docker-compose files --- docker-compose.prod.yml | 12 ++++++++++++ docker-compose.yml | 12 ++++++++++++ 2 files changed, 24 insertions(+) diff --git a/docker-compose.prod.yml b/docker-compose.prod.yml index 57dfb271..0e7a9872 100644 --- a/docker-compose.prod.yml +++ b/docker-compose.prod.yml @@ -59,6 +59,18 @@ services: volumes: - ${LIBRARY_ROOT}:/video:ro + matcher: + image: zoriya/kyoo_scanner:latest + command: matcher + restart: unless-stopped + depends_on: + back: + condition: service_healthy + env_file: + - ./.env + environment: + - KYOO_URL=${KYOO_URL:-http://back:5000} + autosync: image: zoriya/kyoo_autosync:latest restart: on-failure diff --git a/docker-compose.yml b/docker-compose.yml index 4a3db931..c8815528 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -60,6 +60,18 @@ services: volumes: - ${LIBRARY_ROOT}:/video:ro + matcher: + build: ./scanner + command: matcher + restart: unless-stopped + depends_on: + back: + condition: service_healthy + env_file: + - ./.env + environment: + - KYOO_URL=${KYOO_URL:-http://back:5000} + autosync: build: ./autosync restart: on-failure From 18b56724bb2ab61f67d374f378ce1155223f564d Mon Sep 17 00:00:00 2001 From: Zoe Roux Date: Wed, 10 Apr 2024 00:11:42 +0200 Subject: [PATCH 12/12] Format code --- scanner/matcher/subscriber.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/scanner/matcher/subscriber.py b/scanner/matcher/subscriber.py index 126307c7..15d25710 100644 --- a/scanner/matcher/subscriber.py +++ b/scanner/matcher/subscriber.py @@ -11,6 +11,7 @@ from matcher.matcher import Matcher logger = logging.getLogger(__name__) + @dataclass class Message(DataClassJsonMixin): action: Literal["scan", "delete"] @@ -54,4 +55,3 @@ class Subscriber: await self._channel.set_qos(prefetch_count=20) await self._queue.consume(on_message) await asyncio.Future() -