Fix scanner message storm caused by #916

Signed-off-by: Fred Heinecke <fred.heinecke@yahoo.com>
This commit is contained in:
Fred Heinecke 2025-05-02 21:59:22 +00:00 committed by Zoe Roux
parent a16d09d692
commit 099032c165
3 changed files with 31 additions and 18 deletions

View File

@ -6,12 +6,17 @@ async def main():
from .scanner import scan
from .refresher import refresh
from .publisher import Publisher
from .subscriber import Subscriber
from providers.kyoo_client import KyooClient
logging.basicConfig(level=logging.INFO)
logging.getLogger("watchfiles").setLevel(logging.WARNING)
async with Publisher() as publisher, KyooClient() as client:
async with (
Publisher() as publisher,
Subscriber() as subscriber,
KyooClient() as client,
):
path = os.environ.get("SCANNER_LIBRARY_ROOT", "/video")
async def scan_all():
@ -21,5 +26,5 @@ async def main():
monitor(path, publisher, client),
scan_all(),
refresh(publisher, client),
publisher.listen(scan_all),
subscriber.listen(scan_all),
)

View File

@ -1,7 +1,5 @@
import asyncio
from guessit.jsonutils import json
from aio_pika import Message
from aio_pika.abc import AbstractIncomingMessage
from logging import getLogger
from typing import Literal
@ -11,8 +9,6 @@ logger = getLogger(__name__)
class Publisher(RabbitBase):
QUEUE_RESCAN = "scanner.rescan"
async def _publish(self, data: dict):
await self._channel.default_exchange.publish(
Message(json.dumps(data).encode()),
@ -32,15 +28,3 @@ class Publisher(RabbitBase):
**_kwargs,
):
await self._publish({"action": "refresh", "kind": kind, "id": id})
async def listen(self, scan):
async def on_message(message: AbstractIncomingMessage):
try:
await scan()
await message.ack()
except Exception as e:
logger.exception("Unhandled error", exc_info=e)
await message.reject()
await self._queue.consume(on_message)
await asyncio.Future()

View File

@ -0,0 +1,24 @@
import asyncio
from guessit.jsonutils import json
from aio_pika.abc import AbstractIncomingMessage
from logging import getLogger
from providers.rabbit_base import RabbitBase
logger = getLogger(__name__)
class Subscriber(RabbitBase):
QUEUE = "scanner.rescan"
async def listen(self, scan):
async def on_message(message: AbstractIncomingMessage):
try:
await scan()
await message.ack()
except Exception as e:
logger.exception("Unhandled error", exc_info=e)
await message.reject()
await self._queue.consume(on_message)
await asyncio.Future()