Inprove search result sorting, fix manual rescan (#470)

This commit is contained in:
Zoe Roux 2024-05-03 12:17:57 +02:00 committed by GitHub
commit 2303baf15b
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 81 additions and 38 deletions

View File

@ -32,19 +32,20 @@ public class ScannerProducer : IScanner
{
_channel = rabbitConnection.CreateModel();
_channel.QueueDeclare("scanner", exclusive: false, autoDelete: false);
_channel.QueueDeclare("scanner.rescan", exclusive: false, autoDelete: false);
}
private Task _Publish<T>(T message)
private Task _Publish<T>(T message, string queue = "scanner")
{
var body = Encoding.UTF8.GetBytes(JsonSerializer.Serialize(message, Utility.JsonOptions));
_channel.BasicPublish("", routingKey: "scanner", body: body);
_channel.BasicPublish("", routingKey: queue, body: body);
return Task.CompletedTask;
}
public Task SendRescanRequest()
{
var message = new { Action = "rescan", };
return _Publish(message);
return _Publish(message, queue: "scanner.rescan");
}
public Task SendRefreshRequest(string kind, Guid id)

View File

@ -4,9 +4,7 @@ from msgspec import Struct, json
from logging import getLogger
from aio_pika.abc import AbstractIncomingMessage
from scanner.publisher import Publisher
from scanner.scanner import scan
from providers.rabbit_base import RabbitBase
from matcher.matcher import Matcher
logger = getLogger(__name__)
@ -29,14 +27,10 @@ class Refresh(Message):
id: str
class Rescan(Message):
pass
decoder = json.Decoder(Union[Scan, Delete, Refresh])
decoder = json.Decoder(Union[Scan, Delete, Refresh, Rescan])
class Subscriber(Publisher):
class Subscriber(RabbitBase):
async def listen(self, matcher: Matcher):
async def on_message(message: AbstractIncomingMessage):
try:
@ -49,9 +43,6 @@ class Subscriber(Publisher):
ack = await matcher.delete(path)
case Refresh(kind, id):
ack = await matcher.refresh(kind, id)
case Rescan():
await scan(None, self, matcher._client)
ack = True
case _:
logger.error(f"Invalid action: {msg.action}")
if ack:

View File

@ -533,10 +533,10 @@ class TheMovieDatabase(Provider):
(
x
for x in results
if ("name" in x and x["name"] == name)
or ("title" in x and x["title"] == name)
if ("name" in x and x["name"].casefold() == name.casefold())
or ("title" in x and x["title"].casefold() == name.casefold())
),
key=lambda x: x["popularity"],
key=lambda x: (x["vote_count"], x["popularity"]),
reverse=True,
)
if res:
@ -579,9 +579,17 @@ class TheMovieDatabase(Provider):
return None
group = await self.get(f"tv/episode_group/{group_id}")
absgrp = [ep for grp in group["groups"] for ep in grp["episodes"]]
logger.warn(
f"Incomplete absolute group for show {show_id}. Filling missing values by assuming season/episode order is ascending"
)
season_starts = [
next(
(
x["episode_number"]
for x in absgrp
if x["season_number"] == s.season_number
),
1,
)
for s in show.seasons
]
complete_abs = absgrp + [
{"season_number": s.season_number, "episode_number": e}
for s in show.seasons
@ -589,10 +597,19 @@ class TheMovieDatabase(Provider):
if s.season_number > 0
for e in range(1, s.episodes_count)
if not any(
x["season_number"] == s.season_number and x["episode_number"] == e
x["season_number"] == s.season_number
and (
x["episode_number"] == e
# take into account weird absolute (for example one piece, episodes are not reset to 1 when the season starts)
or x["episode_number"] == season_starts[s.season_number - 1] + e
)
for x in absgrp
)
]
if len(complete_abs) != len(absgrp):
logger.warn(
f"Incomplete absolute group for show {show_id}. Filling missing values by assuming season/episode order is ascending"
)
return complete_abs
except Exception as e:
logger.exception(

View File

@ -0,0 +1,20 @@
import os
from aio_pika import connect_robust
class RabbitBase:
QUEUE = "scanner"
async def __aenter__(self):
self._con = await connect_robust(
host=os.environ.get("RABBITMQ_HOST", "rabbitmq"),
port=int(os.environ.get("RABBITMQ_PORT", "5672")),
login=os.environ.get("RABBITMQ_DEFAULT_USER", "guest"),
password=os.environ.get("RABBITMQ_DEFAULT_PASS", "guest"),
)
self._channel = await self._con.channel()
self._queue = await self._channel.declare_queue(self.QUEUE)
return self
async def __aexit__(self, exc_type, exc_value, exc_tb):
await self._con.close()

View File

@ -13,8 +13,13 @@ async def main():
async with Publisher() as publisher, KyooClient() as client:
path = os.environ.get("SCANNER_LIBRARY_ROOT", "/video")
async def scan_all():
await scan(path, publisher, client, remove_deleted=True)
await asyncio.gather(
monitor(path, publisher, client),
scan(path, publisher, client, remove_deleted=True),
scan_all(),
refresh(publisher, client),
publisher.listen(scan_all),
)

View File

@ -1,26 +1,23 @@
import os
import asyncio
from guessit.jsonutils import json
from aio_pika import Message, connect_robust
from aio_pika import Message
from aio_pika.abc import AbstractIncomingMessage
from logging import getLogger
from typing import Literal
from providers.rabbit_base import RabbitBase
class Publisher:
QUEUE = "scanner"
logger = getLogger(__name__)
class Publisher(RabbitBase):
QUEUE_RESCAN = "scanner.rescan"
async def __aenter__(self):
self._con = await connect_robust(
host=os.environ.get("RABBITMQ_HOST", "rabbitmq"),
port=int(os.environ.get("RABBITMQ_PORT", "5672")),
login=os.environ.get("RABBITMQ_DEFAULT_USER", "guest"),
password=os.environ.get("RABBITMQ_DEFAULT_PASS", "guest"),
)
self._channel = await self._con.channel()
self._queue = await self._channel.declare_queue(self.QUEUE)
await super().__aenter__()
self._queue = await self._channel.declare_queue(self.QUEUE_RESCAN)
return self
async def __aexit__(self, exc_type, exc_value, exc_tb):
await self._con.close()
async def _publish(self, data: dict):
await self._channel.default_exchange.publish(
Message(json.dumps(data).encode()),
@ -40,3 +37,15 @@ class Publisher:
**_kwargs,
):
await self._publish({"action": "refresh", "kind": kind, "id": id})
async def listen(self, scan):
async def on_message(message: AbstractIncomingMessage):
try:
await scan()
await message.ack()
except Exception as e:
logger.exception("Unhandled error", exc_info=e)
await message.reject()
await self._queue.consume(on_message)
await asyncio.Future()