Move rescan requests to the scanner service (instead of the matcher)

This commit is contained in:
Zoe Roux 2024-05-03 12:12:40 +02:00
parent b07f164ace
commit 4130601856
No known key found for this signature in database
5 changed files with 57 additions and 31 deletions

View File

@ -32,19 +32,20 @@ public class ScannerProducer : IScanner
{ {
_channel = rabbitConnection.CreateModel(); _channel = rabbitConnection.CreateModel();
_channel.QueueDeclare("scanner", exclusive: false, autoDelete: false); _channel.QueueDeclare("scanner", exclusive: false, autoDelete: false);
_channel.QueueDeclare("scanner.rescan", exclusive: false, autoDelete: false);
} }
private Task _Publish<T>(T message) private Task _Publish<T>(T message, string queue = "scanner")
{ {
var body = Encoding.UTF8.GetBytes(JsonSerializer.Serialize(message, Utility.JsonOptions)); var body = Encoding.UTF8.GetBytes(JsonSerializer.Serialize(message, Utility.JsonOptions));
_channel.BasicPublish("", routingKey: "scanner", body: body); _channel.BasicPublish("", routingKey: queue, body: body);
return Task.CompletedTask; return Task.CompletedTask;
} }
public Task SendRescanRequest() public Task SendRescanRequest()
{ {
var message = new { Action = "rescan", }; var message = new { Action = "rescan", };
return _Publish(message); return _Publish(message, queue: "scanner.rescan");
} }
public Task SendRefreshRequest(string kind, Guid id) public Task SendRefreshRequest(string kind, Guid id)

View File

@ -4,9 +4,7 @@ from msgspec import Struct, json
from logging import getLogger from logging import getLogger
from aio_pika.abc import AbstractIncomingMessage from aio_pika.abc import AbstractIncomingMessage
from scanner.publisher import Publisher from providers.rabbit_base import RabbitBase
from scanner.scanner import scan
from matcher.matcher import Matcher from matcher.matcher import Matcher
logger = getLogger(__name__) logger = getLogger(__name__)
@ -29,14 +27,10 @@ class Refresh(Message):
id: str id: str
class Rescan(Message): decoder = json.Decoder(Union[Scan, Delete, Refresh])
pass
decoder = json.Decoder(Union[Scan, Delete, Refresh, Rescan]) class Subscriber(RabbitBase):
class Subscriber(Publisher):
async def listen(self, matcher: Matcher): async def listen(self, matcher: Matcher):
async def on_message(message: AbstractIncomingMessage): async def on_message(message: AbstractIncomingMessage):
try: try:
@ -49,9 +43,6 @@ class Subscriber(Publisher):
ack = await matcher.delete(path) ack = await matcher.delete(path)
case Refresh(kind, id): case Refresh(kind, id):
ack = await matcher.refresh(kind, id) ack = await matcher.refresh(kind, id)
case Rescan():
await scan(None, self, matcher._client, remove_deleted=True)
ack = True
case _: case _:
logger.error(f"Invalid action: {msg.action}") logger.error(f"Invalid action: {msg.action}")
if ack: if ack:

View File

@ -0,0 +1,20 @@
import os
from aio_pika import connect_robust
class RabbitBase:
QUEUE = "scanner"
async def __aenter__(self):
self._con = await connect_robust(
host=os.environ.get("RABBITMQ_HOST", "rabbitmq"),
port=int(os.environ.get("RABBITMQ_PORT", "5672")),
login=os.environ.get("RABBITMQ_DEFAULT_USER", "guest"),
password=os.environ.get("RABBITMQ_DEFAULT_PASS", "guest"),
)
self._channel = await self._con.channel()
self._queue = await self._channel.declare_queue(self.QUEUE)
return self
async def __aexit__(self, exc_type, exc_value, exc_tb):
await self._con.close()

View File

@ -13,8 +13,13 @@ async def main():
async with Publisher() as publisher, KyooClient() as client: async with Publisher() as publisher, KyooClient() as client:
path = os.environ.get("SCANNER_LIBRARY_ROOT", "/video") path = os.environ.get("SCANNER_LIBRARY_ROOT", "/video")
async def scan_all():
await scan(path, publisher, client, remove_deleted=True)
await asyncio.gather( await asyncio.gather(
monitor(path, publisher, client), monitor(path, publisher, client),
scan(path, publisher, client, remove_deleted=True), scan_all(),
refresh(publisher, client), refresh(publisher, client),
publisher.listen(scan_all),
) )

View File

@ -1,26 +1,23 @@
import os import asyncio
from guessit.jsonutils import json from guessit.jsonutils import json
from aio_pika import Message, connect_robust from aio_pika import Message
from aio_pika.abc import AbstractIncomingMessage
from logging import getLogger
from typing import Literal from typing import Literal
from providers.rabbit_base import RabbitBase
class Publisher: logger = getLogger(__name__)
QUEUE = "scanner"
class Publisher(RabbitBase):
QUEUE_RESCAN = "scanner.rescan"
async def __aenter__(self): async def __aenter__(self):
self._con = await connect_robust( await super().__aenter__()
host=os.environ.get("RABBITMQ_HOST", "rabbitmq"), self._queue = await self._channel.declare_queue(self.QUEUE_RESCAN)
port=int(os.environ.get("RABBITMQ_PORT", "5672")),
login=os.environ.get("RABBITMQ_DEFAULT_USER", "guest"),
password=os.environ.get("RABBITMQ_DEFAULT_PASS", "guest"),
)
self._channel = await self._con.channel()
self._queue = await self._channel.declare_queue(self.QUEUE)
return self return self
async def __aexit__(self, exc_type, exc_value, exc_tb):
await self._con.close()
async def _publish(self, data: dict): async def _publish(self, data: dict):
await self._channel.default_exchange.publish( await self._channel.default_exchange.publish(
Message(json.dumps(data).encode()), Message(json.dumps(data).encode()),
@ -40,3 +37,15 @@ class Publisher:
**_kwargs, **_kwargs,
): ):
await self._publish({"action": "refresh", "kind": kind, "id": id}) await self._publish({"action": "refresh", "kind": kind, "id": id})
async def listen(self, scan):
async def on_message(message: AbstractIncomingMessage):
try:
await scan()
await message.ack()
except Exception as e:
logger.exception("Unhandled error", exc_info=e)
await message.reject()
await self._queue.consume(on_message)
await asyncio.Future()