From d34d87957e4d1cba673475c9aa8081b42d182daa Mon Sep 17 00:00:00 2001 From: Zoe Roux Date: Wed, 17 Apr 2024 21:59:18 +0200 Subject: [PATCH] Use msgspec for messages deserialization --- scanner/matcher/__init__.py | 4 ++-- scanner/matcher/subscriber.py | 32 ++++++++++++++++++++------------ scanner/requirements.txt | 2 +- scanner/scanner/requirements.txt | 1 - shell.nix | 1 + 5 files changed, 24 insertions(+), 16 deletions(-) delete mode 100644 scanner/scanner/requirements.txt diff --git a/scanner/matcher/__init__.py b/scanner/matcher/__init__.py index e7ec19da..4445f9aa 100644 --- a/scanner/matcher/__init__.py +++ b/scanner/matcher/__init__.py @@ -14,5 +14,5 @@ async def main(): async with KyooClient() as kyoo, Subscriber() as sub: provider = Provider.get_default(kyoo.client) - scanner = Matcher(kyoo, provider) - await sub.listen(scanner) + matcher = Matcher(kyoo, provider) + await sub.listen(matcher) diff --git a/scanner/matcher/subscriber.py b/scanner/matcher/subscriber.py index 15d25710..b8b7bac9 100644 --- a/scanner/matcher/subscriber.py +++ b/scanner/matcher/subscriber.py @@ -1,7 +1,6 @@ import asyncio -from dataclasses import dataclass -from dataclasses_json import DataClassJsonMixin -from typing import Literal +from typing import Union +from msgspec import Struct, json import os import logging from aio_pika import connect_robust @@ -11,12 +10,19 @@ from matcher.matcher import Matcher logger = logging.getLogger(__name__) +class Message(Struct, tag_field="action", tag=str.lower): + pass -@dataclass -class Message(DataClassJsonMixin): - action: Literal["scan", "delete"] +class Scan(Message): path: str +class Delete(Message): + path: str + +class Identify(Message): + pass + +decoder = json.Decoder(Union[Scan, Delete, Identify]) class Subscriber: QUEUE = "scanner" @@ -36,13 +42,15 @@ class Subscriber: async def listen(self, scanner: Matcher): async def on_message(message: AbstractIncomingMessage): - msg = Message.from_json(message.body) + msg = decoder.decode(message.body) ack = False - match msg.action: - case "scan": - ack = await scanner.identify(msg.path) - case "delete": - ack = await scanner.delete(msg.path) + match msg: + case Scan(path): + ack = await scanner.identify(path) + case Delete(path): + ack = await scanner.delete(path) + # case Identify(): + # ack = await scanner.delete(msg.path) case _: logger.error(f"Invalid action: {msg.action}") if ack: diff --git a/scanner/requirements.txt b/scanner/requirements.txt index cdeb31bd..ca23e9ed 100644 --- a/scanner/requirements.txt +++ b/scanner/requirements.txt @@ -3,4 +3,4 @@ aiohttp jsons watchfiles aio-pika -dataclasses-json +msgspec diff --git a/scanner/scanner/requirements.txt b/scanner/scanner/requirements.txt deleted file mode 100644 index e092edcc..00000000 --- a/scanner/scanner/requirements.txt +++ /dev/null @@ -1 +0,0 @@ -aio-pika diff --git a/shell.nix b/shell.nix index 482414f3..6f92b2eb 100644 --- a/shell.nix +++ b/shell.nix @@ -9,6 +9,7 @@ aio-pika requests dataclasses-json + msgspec ]); dotnet = with pkgs.dotnetCorePackages; combinePackages [