Make the scanner's consumer also a publisher

This commit is contained in:
Zoe Roux 2024-05-02 01:07:46 +02:00
parent c1ed16b871
commit 21414d6c2f
No known key found for this signature in database

View File

@ -1,7 +1,6 @@
import asyncio import asyncio
from typing import Union, Literal from typing import Union, Literal
from msgspec import Struct, json from msgspec import Struct, json
import os
from logging import getLogger from logging import getLogger
from aio_pika import connect_robust from aio_pika import connect_robust
from aio_pika.abc import AbstractIncomingMessage from aio_pika.abc import AbstractIncomingMessage
@ -28,38 +27,24 @@ class Refresh(Message):
id: str id: str
decoder = json.Decoder(Union[Scan, Delete, Refresh]) decoder = json.Decoder(Union[Scan, Delete, Refresh])
class Subscriber: class Subscriber(Publisher):
QUEUE = "scanner" async def listen(self, matcher: Matcher):
async def __aenter__(self):
self._con = await connect_robust(
host=os.environ.get("RABBITMQ_HOST", "rabbitmq"),
port=int(os.environ.get("RABBITMQ_PORT", "5672")),
login=os.environ.get("RABBITMQ_DEFAULT_USER", "guest"),
password=os.environ.get("RABBITMQ_DEFAULT_PASS", "guest"),
)
self._channel = await self._con.channel()
self._queue = await self._channel.declare_queue(self.QUEUE)
return self
async def __aexit__(self, exc_type, exc_value, exc_tb):
await self._con.close()
async def listen(self, scanner: Matcher):
async def on_message(message: AbstractIncomingMessage): async def on_message(message: AbstractIncomingMessage):
try: try:
msg = decoder.decode(message.body) msg = decoder.decode(message.body)
ack = False ack = False
match msg: match msg:
case Scan(path): case Scan(path):
ack = await scanner.identify(path) ack = await matcher.identify(path)
case Delete(path): case Delete(path):
ack = await scanner.delete(path) ack = await matcher.delete(path)
case Refresh(kind, id): case Refresh(kind, id):
ack = await scanner.refresh(kind, id) ack = await matcher.refresh(kind, id)
case _: case _:
logger.error(f"Invalid action: {msg.action}") logger.error(f"Invalid action: {msg.action}")
if ack: if ack: