diff --git a/scanner/scanner/__init__.py b/scanner/scanner/__init__.py index c7ffd55a..208dc4c7 100644 --- a/scanner/scanner/__init__.py +++ b/scanner/scanner/__init__.py @@ -6,12 +6,17 @@ async def main(): from .scanner import scan from .refresher import refresh from .publisher import Publisher + from .subscriber import Subscriber from providers.kyoo_client import KyooClient logging.basicConfig(level=logging.INFO) logging.getLogger("watchfiles").setLevel(logging.WARNING) - async with Publisher() as publisher, KyooClient() as client: + async with ( + Publisher() as publisher, + Subscriber() as subscriber, + KyooClient() as client, + ): path = os.environ.get("SCANNER_LIBRARY_ROOT", "/video") async def scan_all(): @@ -21,5 +26,5 @@ async def main(): monitor(path, publisher, client), scan_all(), refresh(publisher, client), - publisher.listen(scan_all), + subscriber.listen(scan_all), ) diff --git a/scanner/scanner/publisher.py b/scanner/scanner/publisher.py index f7821d8f..d8ad9cc0 100644 --- a/scanner/scanner/publisher.py +++ b/scanner/scanner/publisher.py @@ -1,7 +1,5 @@ -import asyncio from guessit.jsonutils import json from aio_pika import Message -from aio_pika.abc import AbstractIncomingMessage from logging import getLogger from typing import Literal @@ -11,8 +9,6 @@ logger = getLogger(__name__) class Publisher(RabbitBase): - QUEUE_RESCAN = "scanner.rescan" - async def _publish(self, data: dict): await self._channel.default_exchange.publish( Message(json.dumps(data).encode()), @@ -32,15 +28,3 @@ class Publisher(RabbitBase): **_kwargs, ): await self._publish({"action": "refresh", "kind": kind, "id": id}) - - async def listen(self, scan): - async def on_message(message: AbstractIncomingMessage): - try: - await scan() - await message.ack() - except Exception as e: - logger.exception("Unhandled error", exc_info=e) - await message.reject() - - await self._queue.consume(on_message) - await asyncio.Future() diff --git a/scanner/scanner/subscriber.py b/scanner/scanner/subscriber.py new file mode 100644 index 00000000..98149d26 --- /dev/null +++ b/scanner/scanner/subscriber.py @@ -0,0 +1,24 @@ +import asyncio +from guessit.jsonutils import json +from aio_pika.abc import AbstractIncomingMessage +from logging import getLogger + +from providers.rabbit_base import RabbitBase + +logger = getLogger(__name__) + + +class Subscriber(RabbitBase): + QUEUE = "scanner.rescan" + + async def listen(self, scan): + async def on_message(message: AbstractIncomingMessage): + try: + await scan() + await message.ack() + except Exception as e: + logger.exception("Unhandled error", exc_info=e) + await message.reject() + + await self._queue.consume(on_message) + await asyncio.Future()