diff --git a/back/src/Kyoo.RabbitMq/ScannerProducer.cs b/back/src/Kyoo.RabbitMq/ScannerProducer.cs index 75c575a0..1034a2c5 100644 --- a/back/src/Kyoo.RabbitMq/ScannerProducer.cs +++ b/back/src/Kyoo.RabbitMq/ScannerProducer.cs @@ -32,19 +32,20 @@ public class ScannerProducer : IScanner { _channel = rabbitConnection.CreateModel(); _channel.QueueDeclare("scanner", exclusive: false, autoDelete: false); + _channel.QueueDeclare("scanner.rescan", exclusive: false, autoDelete: false); } - private Task _Publish(T message) + private Task _Publish(T message, string queue = "scanner") { var body = Encoding.UTF8.GetBytes(JsonSerializer.Serialize(message, Utility.JsonOptions)); - _channel.BasicPublish("", routingKey: "scanner", body: body); + _channel.BasicPublish("", routingKey: queue, body: body); return Task.CompletedTask; } public Task SendRescanRequest() { var message = new { Action = "rescan", }; - return _Publish(message); + return _Publish(message, queue: "scanner.rescan"); } public Task SendRefreshRequest(string kind, Guid id) diff --git a/scanner/matcher/subscriber.py b/scanner/matcher/subscriber.py index fc7ec589..167654c8 100644 --- a/scanner/matcher/subscriber.py +++ b/scanner/matcher/subscriber.py @@ -4,9 +4,7 @@ from msgspec import Struct, json from logging import getLogger from aio_pika.abc import AbstractIncomingMessage -from scanner.publisher import Publisher -from scanner.scanner import scan - +from providers.rabbit_base import RabbitBase from matcher.matcher import Matcher logger = getLogger(__name__) @@ -29,14 +27,10 @@ class Refresh(Message): id: str -class Rescan(Message): - pass +decoder = json.Decoder(Union[Scan, Delete, Refresh]) -decoder = json.Decoder(Union[Scan, Delete, Refresh, Rescan]) - - -class Subscriber(Publisher): +class Subscriber(RabbitBase): async def listen(self, matcher: Matcher): async def on_message(message: AbstractIncomingMessage): try: @@ -49,9 +43,6 @@ class Subscriber(Publisher): ack = await matcher.delete(path) case Refresh(kind, id): ack = await matcher.refresh(kind, id) - case Rescan(): - await scan(None, self, matcher._client) - ack = True case _: logger.error(f"Invalid action: {msg.action}") if ack: diff --git a/scanner/providers/implementations/themoviedatabase.py b/scanner/providers/implementations/themoviedatabase.py index 3f73e1d5..e675e4c7 100644 --- a/scanner/providers/implementations/themoviedatabase.py +++ b/scanner/providers/implementations/themoviedatabase.py @@ -533,10 +533,10 @@ class TheMovieDatabase(Provider): ( x for x in results - if ("name" in x and x["name"] == name) - or ("title" in x and x["title"] == name) + if ("name" in x and x["name"].casefold() == name.casefold()) + or ("title" in x and x["title"].casefold() == name.casefold()) ), - key=lambda x: x["popularity"], + key=lambda x: (x["vote_count"], x["popularity"]), reverse=True, ) if res: @@ -579,9 +579,17 @@ class TheMovieDatabase(Provider): return None group = await self.get(f"tv/episode_group/{group_id}") absgrp = [ep for grp in group["groups"] for ep in grp["episodes"]] - logger.warn( - f"Incomplete absolute group for show {show_id}. Filling missing values by assuming season/episode order is ascending" - ) + season_starts = [ + next( + ( + x["episode_number"] + for x in absgrp + if x["season_number"] == s.season_number + ), + 1, + ) + for s in show.seasons + ] complete_abs = absgrp + [ {"season_number": s.season_number, "episode_number": e} for s in show.seasons @@ -589,10 +597,19 @@ class TheMovieDatabase(Provider): if s.season_number > 0 for e in range(1, s.episodes_count) if not any( - x["season_number"] == s.season_number and x["episode_number"] == e + x["season_number"] == s.season_number + and ( + x["episode_number"] == e + # take into account weird absolute (for example one piece, episodes are not reset to 1 when the season starts) + or x["episode_number"] == season_starts[s.season_number - 1] + e + ) for x in absgrp ) ] + if len(complete_abs) != len(absgrp): + logger.warn( + f"Incomplete absolute group for show {show_id}. Filling missing values by assuming season/episode order is ascending" + ) return complete_abs except Exception as e: logger.exception( diff --git a/scanner/providers/rabbit_base.py b/scanner/providers/rabbit_base.py new file mode 100644 index 00000000..ff29c4f1 --- /dev/null +++ b/scanner/providers/rabbit_base.py @@ -0,0 +1,20 @@ +import os +from aio_pika import connect_robust + + +class RabbitBase: + QUEUE = "scanner" + + async def __aenter__(self): + self._con = await connect_robust( + host=os.environ.get("RABBITMQ_HOST", "rabbitmq"), + port=int(os.environ.get("RABBITMQ_PORT", "5672")), + login=os.environ.get("RABBITMQ_DEFAULT_USER", "guest"), + password=os.environ.get("RABBITMQ_DEFAULT_PASS", "guest"), + ) + self._channel = await self._con.channel() + self._queue = await self._channel.declare_queue(self.QUEUE) + return self + + async def __aexit__(self, exc_type, exc_value, exc_tb): + await self._con.close() diff --git a/scanner/scanner/__init__.py b/scanner/scanner/__init__.py index bce4273e..c7ffd55a 100644 --- a/scanner/scanner/__init__.py +++ b/scanner/scanner/__init__.py @@ -13,8 +13,13 @@ async def main(): async with Publisher() as publisher, KyooClient() as client: path = os.environ.get("SCANNER_LIBRARY_ROOT", "/video") + + async def scan_all(): + await scan(path, publisher, client, remove_deleted=True) + await asyncio.gather( monitor(path, publisher, client), - scan(path, publisher, client, remove_deleted=True), + scan_all(), refresh(publisher, client), + publisher.listen(scan_all), ) diff --git a/scanner/scanner/publisher.py b/scanner/scanner/publisher.py index 315855e1..0132a781 100644 --- a/scanner/scanner/publisher.py +++ b/scanner/scanner/publisher.py @@ -1,26 +1,23 @@ -import os +import asyncio from guessit.jsonutils import json -from aio_pika import Message, connect_robust +from aio_pika import Message +from aio_pika.abc import AbstractIncomingMessage +from logging import getLogger from typing import Literal +from providers.rabbit_base import RabbitBase -class Publisher: - QUEUE = "scanner" +logger = getLogger(__name__) + + +class Publisher(RabbitBase): + QUEUE_RESCAN = "scanner.rescan" async def __aenter__(self): - self._con = await connect_robust( - host=os.environ.get("RABBITMQ_HOST", "rabbitmq"), - port=int(os.environ.get("RABBITMQ_PORT", "5672")), - login=os.environ.get("RABBITMQ_DEFAULT_USER", "guest"), - password=os.environ.get("RABBITMQ_DEFAULT_PASS", "guest"), - ) - self._channel = await self._con.channel() - self._queue = await self._channel.declare_queue(self.QUEUE) + await super().__aenter__() + self._queue = await self._channel.declare_queue(self.QUEUE_RESCAN) return self - async def __aexit__(self, exc_type, exc_value, exc_tb): - await self._con.close() - async def _publish(self, data: dict): await self._channel.default_exchange.publish( Message(json.dumps(data).encode()), @@ -40,3 +37,15 @@ class Publisher: **_kwargs, ): await self._publish({"action": "refresh", "kind": kind, "id": id}) + + async def listen(self, scan): + async def on_message(message: AbstractIncomingMessage): + try: + await scan() + await message.ack() + except Exception as e: + logger.exception("Unhandled error", exc_info=e) + await message.reject() + + await self._queue.consume(on_message) + await asyncio.Future()