diff --git a/autosync/autosync/__init__.py b/autosync/autosync/__init__.py index e6e44ec2..8b1eda73 100644 --- a/autosync/autosync/__init__.py +++ b/autosync/autosync/__init__.py @@ -35,7 +35,7 @@ def on_message( body: bytes, ): try: - message = Message.from_json(body) # type: Message + message = Message.from_json(body) service.update(message.value.user, message.value.resource, message.value) except Exception as e: logging.exception("Error processing message.", exc_info=e) diff --git a/autosync/autosync/models/message.py b/autosync/autosync/models/message.py index 94b3312e..d24f39fc 100644 --- a/autosync/autosync/models/message.py +++ b/autosync/autosync/models/message.py @@ -1,5 +1,5 @@ from dataclasses import dataclass -from dataclasses_json import dataclass_json, LetterCase +from dataclasses_json import DataClassJsonMixin, dataclass_json, LetterCase from autosync.models.episode import Episode from autosync.models.movie import Movie @@ -17,7 +17,7 @@ class WatchStatusMessage(WatchStatus): @dataclass_json(letter_case=LetterCase.CAMEL) @dataclass -class Message: +class Message(DataClassJsonMixin): action: str type: str value: WatchStatusMessage diff --git a/scanner/Dockerfile b/scanner/Dockerfile index 372016df..785dd1bc 100644 --- a/scanner/Dockerfile +++ b/scanner/Dockerfile @@ -5,4 +5,4 @@ COPY ./requirements.txt . RUN pip3 install -r ./requirements.txt COPY . . -ENTRYPOINT ["python3", "-m", "scanner", "-v"] +ENTRYPOINT ["python3", "-m", "scanner"] diff --git a/scanner/monitor/__init__.py b/scanner/monitor/__init__.py index 3716b018..5e3b3366 100644 --- a/scanner/monitor/__init__.py +++ b/scanner/monitor/__init__.py @@ -7,6 +7,7 @@ async def main(): from .publisher import Publisher from providers.kyoo_client import KyooClient + logging.basicConfig(level=logging.INFO) logging.getLogger("watchfiles").setLevel(logging.WARNING) async with Publisher() as publisher, KyooClient() as client: diff --git a/scanner/monitor/scanner.py b/scanner/monitor/scanner.py index 30ecf5db..dc8611ed 100644 --- a/scanner/monitor/scanner.py +++ b/scanner/monitor/scanner.py @@ -8,6 +8,9 @@ async def scan(path: str): self.issues = await self.get_issues() videos = [str(p) for p in Path(path).rglob("*") if p.is_file()] deleted = [x for x in self.registered if x not in videos] + # if path in self.registered or self._ignore_pattern.match(path): + # return + # # try: # self._ignore_pattern = re.compile( diff --git a/scanner/providers/kyoo_client.py b/scanner/providers/kyoo_client.py index dc980a19..5a3af9b0 100644 --- a/scanner/providers/kyoo_client.py +++ b/scanner/providers/kyoo_client.py @@ -1,31 +1,37 @@ -from datetime import timedelta import os -import asyncio import logging import jsons -import re from aiohttp import ClientSession -from pathlib import Path from typing import List, Literal, Any from urllib.parse import quote -from providers.provider import Provider, ProviderError -from providers.types.collection import Collection -from providers.types.show import Show -from providers.types.episode import Episode, PartialShow -from providers.types.season import Season class KyooClient: - def __init__( - self, client: ClientSession, *, api_key: str - ) -> None: - self._client = client - self._api_key = api_key + def __init__(self) -> None: + self._api_key = os.environ.get("KYOO_APIKEY") + if not self._api_key: + self._api_key = os.environ.get("KYOO_APIKEYS") + if not self._api_key: + print("Missing environment variable 'KYOO_APIKEY'.") + exit(2) + self._api_key = self._api_key.split(",")[0] + self._url = os.environ.get("KYOO_URL", "http://back:5000") + async def __aenter__(self): + jsons.set_serializer(lambda x, **_: format_date(x), Optional[date | int]) # type: ignore + self.client = ClientSession( + json_serialize=lambda *args, **kwargs: jsons.dumps( + *args, key_transformer=jsons.KEY_TRANSFORMER_CAMELCASE, **kwargs + ), + ) + return self + + async def __aexit__(self, exc_type, exc_value, exc_tb): + await self.client.close() async def get_issues(self) -> List[str]: - async with self._client.get( + async with self.client.get( f"{self._url}/issues", params={"limit": 0}, headers={"X-API-Key": self._api_key}, @@ -34,10 +40,23 @@ class KyooClient: ret = await r.json() return [x["cause"] for x in ret if x["domain"] == "scanner"] + async def create_issue(self, path: str, issue: str, extra: dict | None = None): + await self.client.post( + f"{self._url}/issues", + json={"domain": "scanner", "cause": path, "reason": issue, "extra": extra}, + headers={"X-API-Key": self._api_key}, + ) + + async def delete_issue(self, path: str): + await self.client.delete( + f'{self._url}/issues?filter=domain eq scanner and cause eq "{path}"', + headers={"X-API-Key": self._api_key}, + ) + async def link_collection( self, collection: str, type: Literal["movie"] | Literal["show"], id: str ): - async with self._client.put( + async with self.client.put( f"{self._url}/collections/{collection}/{type}/{id}", headers={"X-API-Key": self._api_key}, ) as r: @@ -56,7 +75,7 @@ class KyooClient: jdkwargs={"indent": 4}, ), ) - async with self._client.post( + async with self.client.post( f"{self._url}/{path}", json=data, headers={"X-API-Key": self._api_key}, @@ -85,13 +104,12 @@ class KyooClient: async def delete( self, path: str, - type: Literal["episode", "movie", "issue"] | None = None, + type: Literal["episode", "movie"] | None = None, ): logging.info("Deleting %s", path) - self.registered = filter(lambda x: x != path, self.registered) if type is None or type == "movie": - async with self._client.delete( + async with self.client.delete( f'{self._url}/movies?filter=path eq "{quote(path)}"', headers={"X-API-Key": self._api_key}, ) as r: @@ -100,7 +118,7 @@ class KyooClient: r.raise_for_status() if type is None or type == "episode": - async with self._client.delete( + async with self.client.delete( f'{self._url}/episodes?filter=path eq "{quote(path)}"', headers={"X-API-Key": self._api_key}, ) as r: @@ -108,10 +126,4 @@ class KyooClient: logging.error(f"Request error: {await r.text()}") r.raise_for_status() - if path in self.issues: - self.issues = filter(lambda x: x != path, self.issues) - await self._client.delete( - f'{self._url}/issues?filter=domain eq scanner and cause eq "{quote(path)}"', - headers={"X-API-Key": self._api_key}, - ) - + await self.delete_issue(path) diff --git a/scanner/providers/provider.py b/scanner/providers/provider.py index f9e449e4..12a7f655 100644 --- a/scanner/providers/provider.py +++ b/scanner/providers/provider.py @@ -1,7 +1,7 @@ import os from aiohttp import ClientSession from abc import abstractmethod, abstractproperty -from typing import Optional, TypeVar +from typing import Optional, Self from providers.implementations.thexem import TheXem from providers.utils import ProviderError @@ -13,14 +13,13 @@ from .types.movie import Movie from .types.collection import Collection -Self = TypeVar("Self", bound="Provider") - - class Provider: @classmethod - def get_all( - cls: type[Self], client: ClientSession, languages: list[str] - ) -> tuple[list[Self], TheXem]: + def get_all(cls, client: ClientSession) -> tuple[Self, TheXem]: + languages = os.environ.get("LIBRARY_LANGUAGES") + if not languages: + print("Missing environment variable 'LIBRARY_LANGUAGES'.") + exit(2) providers = [] from providers.idmapper import IdMapper @@ -44,7 +43,7 @@ class Provider: idmapper.init(tmdb=tmdb, language=languages[0]) - return providers, xem + return next(providers), xem @abstractproperty def name(self) -> str: diff --git a/scanner/scanner/__init__.py b/scanner/scanner/__init__.py index c21a9dde..e049dff0 100644 --- a/scanner/scanner/__init__.py +++ b/scanner/scanner/__init__.py @@ -1,47 +1,18 @@ -from providers.kyoo_client import KyooClient - - async def main(): - import asyncio - import os import logging import sys - import jsons - from datetime import date - from typing import Optional - from aiohttp import ClientSession - from providers.utils import format_date, ProviderError + from providers.provider import Provider + from providers.kyoo_client import KyooClient from .scanner import Scanner + from .subscriber import Subscriber - path = os.environ.get("SCANNER_LIBRARY_ROOT", "/video") - languages = os.environ.get("LIBRARY_LANGUAGES") - if not languages: - print("Missing environment variable 'LIBRARY_LANGUAGES'.") - exit(2) - api_key = os.environ.get("KYOO_APIKEY") - if not api_key: - api_key = os.environ.get("KYOO_APIKEYS") - if not api_key: - print("Missing environment variable 'KYOO_APIKEY'.") - exit(2) - api_key = api_key.split(",")[0] - + logging.basicConfig(level=logging.INFO) if len(sys.argv) > 1 and sys.argv[1] == "-v": - logging.basicConfig(level=logging.INFO) - if len(sys.argv) > 1 and sys.argv[1] == "-vv": logging.basicConfig(level=logging.DEBUG) logging.getLogger("watchfiles").setLevel(logging.WARNING) logging.getLogger("rebulk").setLevel(logging.WARNING) - jsons.set_serializer(lambda x, **_: format_date(x), Optional[date | int]) # type: ignore - async with ClientSession( - json_serialize=lambda *args, **kwargs: jsons.dumps( - *args, key_transformer=jsons.KEY_TRANSFORMER_CAMELCASE, **kwargs - ), - ) as client: - kyoo = KyooClient(client, api_key=api_key) - provider = - try: - scanner = Scanner(kyoo, languages=languages.split(","), api_key=api_key) - except ProviderError as e: - logging.error(e) + async with KyooClient() as kyoo, Subscriber() as sub: + provider, xem = Provider.get_all(kyoo.client) + scanner = Scanner(kyoo, provider, xem) + await sub.listen(scanner) diff --git a/scanner/scanner/scanner.py b/scanner/scanner/scanner.py index c723f3a9..22010b34 100644 --- a/scanner/scanner/scanner.py +++ b/scanner/scanner/scanner.py @@ -9,7 +9,6 @@ from providers.types.episode import Episode, PartialShow from providers.types.season import Season from providers.kyoo_client import KyooClient from .parser.guess import guessit -from .utils import handle_errors from .cache import cache, exec_as_cache, make_key @@ -23,11 +22,30 @@ class Scanner: self._show_cache = {} self._season_cache = {} - @handle_errors + async def delete(self, path: str): + try: + await self._client.delete(path) + return True + except Exception as e: + logging.exception("Unhandled error", exc_info=e) + return False + async def identify(self, path: str): - # if path in self.registered or self._ignore_pattern.match(path): - # return - # + try: + await self.identify(path) + await self._client.delete_issue(path) + except ProviderError as e: + logging.error(e) + await self._client.create_issue(path, str(e)) + except Exception as e: + logging.exception("Unhandled error", exc_info=e) + await self._client.create_issue( + path, "Unknown error", {"type": type(e).__name__, "message": str(e)} + ) + return False + return True + + async def _identify(self, path: str): raw = guessit(path, xem_titles=await self._xem.get_expected_titles()) if "mimetype" not in raw or not raw["mimetype"].startswith("video"): diff --git a/scanner/scanner/subscriber.py b/scanner/scanner/subscriber.py new file mode 100644 index 00000000..55ef3ae4 --- /dev/null +++ b/scanner/scanner/subscriber.py @@ -0,0 +1,52 @@ +from dataclasses import dataclass +from dataclasses_json import DataClassJsonMixin +from typing import Literal +import os +import logging +from aio_pika import connect_robust +from aio_pika.abc import AbstractIncomingMessage + +from scanner.scanner import Scanner + +logger = logging.getLogger(__name__) + +@dataclass +class Message(DataClassJsonMixin): + action: Literal["scan"] | Literal["delete"] + path: str + + +class Subscriber: + QUEUE = "scanner" + + async def __aenter__(self): + self._con = await connect_robust( + host=os.environ.get("RABBITMQ_HOST", "rabbitmq"), + login=os.environ.get("RABBITMQ_DEFAULT_USER", "guest"), + password=os.environ.get("RABBITMQ_DEFAULT_PASS", "guest"), + ) + self._channel = await self._con.channel() + self._queue = await self._channel.declare_queue(self.QUEUE) + return self + + async def __aexit__(self, exc_type, exc_value, exc_tb): + await self._con.close() + + async def listen(self, scanner: Scanner): + async def on_message(message: AbstractIncomingMessage): + async with message.process(): + msg = Message.from_json(message.body) + ack = False + match msg.action: + case "scan": + ack = await scanner.identify(msg.path) + case "delete": + ack = await scanner.delete(msg.path) + case _: + logger.error(f"Invalid action: {msg.action}") + if ack: + await message.ack() + else: + await message.nack(requeue=False) + + await self._queue.consume(on_message, no_ack=True) diff --git a/scanner/scanner/utils.py b/scanner/scanner/utils.py deleted file mode 100644 index ae3de92a..00000000 --- a/scanner/scanner/utils.py +++ /dev/null @@ -1,56 +0,0 @@ -from __future__ import annotations -import logging -from functools import wraps -from itertools import islice -from typing import TYPE_CHECKING, Iterator, List, TypeVar -from providers.utils import ProviderError - -if TYPE_CHECKING: - from scanner.scanner import Scanner - - -T = TypeVar("T") - - -def batch(iterable: Iterator[T], n: int) -> Iterator[List[T]]: - "Batch data into lists of length n. The last batch may be shorter." - # batched('ABCDEFG', 3) --> ABC DEF G - it = iter(iterable) - while True: - batch = list(islice(it, n)) - if not batch: - return - yield batch - - -def handle_errors(f): - @wraps(f) - async def internal(self: Scanner, path: str): - try: - await f(self, path) - if path in self.issues: - await self._client.delete( - f'{self._url}/issues?filter=domain eq scanner and cause eq "{path}"', - headers={"X-API-Key": self._api_key}, - ) - except ProviderError as e: - logging.error(str(e)) - await self._client.post( - f"{self._url}/issues", - json={"domain": "scanner", "cause": path, "reason": str(e)}, - headers={"X-API-Key": self._api_key}, - ) - except Exception as e: - logging.exception("Unhandled error", exc_info=e) - await self._client.post( - f"{self._url}/issues", - json={ - "domain": "scanner", - "cause": path, - "reason": "Unknown error", - "extra": {"type": type(e).__name__, "message": str(e)}, - }, - headers={"X-API-Key": self._api_key}, - ) - - return internal