Write enqueue logic for identify requests

This commit is contained in:
Zoe Roux 2025-05-07 22:03:46 +02:00
parent ddf4617585
commit e9ce1016a8
No known key found for this signature in database
9 changed files with 57 additions and 96 deletions

View File

@ -1,18 +0,0 @@
async def main():
import logging
import sys
from providers.provider import Provider
from providers.kyoo_client import KyooClient
from .matcher import Matcher
from .subscriber import Subscriber
logging.basicConfig(level=logging.INFO)
if len(sys.argv) > 1 and sys.argv[1] == "-v":
logging.basicConfig(level=logging.DEBUG)
logging.getLogger("watchfiles").setLevel(logging.WARNING)
logging.getLogger("rebulk").setLevel(logging.WARNING)
async with KyooClient() as kyoo, Subscriber() as sub:
provider = Provider.get_default(kyoo.client)
matcher = Matcher(kyoo, provider)
await sub.listen(matcher)

View File

@ -1,6 +0,0 @@
#!/usr/bin/env python
import asyncio
import matcher
asyncio.run(matcher.main())

View File

@ -1,6 +0,0 @@
#!/usr/bin/env python
import asyncio
import scanner
asyncio.run(scanner.main())

View File

@ -1,30 +0,0 @@
from guessit.jsonutils import json
from aio_pika import Message
from logging import getLogger
from typing import Literal
from providers.rabbit_base import RabbitBase
logger = getLogger(__name__)
class Publisher(RabbitBase):
async def _publish(self, data: dict):
await self._channel.default_exchange.publish(
Message(json.dumps(data).encode()),
routing_key=self.QUEUE,
)
async def add(self, path: str):
await self._publish({"action": "scan", "path": path})
async def delete(self, path: str):
await self._publish({"action": "delete", "path": path})
async def refresh(
self,
kind: Literal["collection", "show", "movie", "season", "episode"],
id: str,
**_kwargs,
):
await self._publish({"action": "refresh", "kind": kind, "id": id})

View File

@ -1,24 +0,0 @@
import asyncio
from guessit.jsonutils import json
from aio_pika.abc import AbstractIncomingMessage
from logging import getLogger
from providers.rabbit_base import RabbitBase
logger = getLogger(__name__)
class Subscriber(RabbitBase):
QUEUE = "scanner.rescan"
async def listen(self, scan):
async def on_message(message: AbstractIncomingMessage):
try:
await scan()
await message.ack()
except Exception as e:
logger.exception("Unhandled error", exc_info=e)
await message.reject()
await self._queue.consume(on_message)
await asyncio.Future()

View File

@ -1,5 +1,12 @@
import logging
from fastapi import FastAPI
logging.basicConfig(level=logging.INFO)
logging.getLogger("watchfiles").setLevel(logging.WARNING)
logging.getLogger("rebulk").setLevel(logging.WARNING)
app = FastAPI(
title="Scanner",
description="API to control the long running scanner or interacting with external databases (themoviedb, tvdb...)\n\n"

View File

@ -7,7 +7,8 @@ from typing import Optional
from .client import KyooClient
from .identify import identify
from .models.metadataid import EpisodeId, MetadataId
from .models.videos import For, Video, VideoInfo
from .models.videos import For, Guess, Video, VideoInfo
from .queue import Request, enqueue
logger = getLogger(__name__)
@ -29,16 +30,9 @@ def is_video(path: str) -> bool:
return mime is not None and mime.startswith("video/")
async def scan(path: Optional[str], client: KyooClient, remove_deleted=False):
path = path or os.environ.get("SCANNER_LIBRARY_ROOT", "/video")
logger.info("Starting scan at %s. This may take some time...", path)
if ignore_pattern:
logger.info(f"Applying ignore pattern: {ignore_pattern}")
info = await client.get_videos_info()
def walk_fs(root_path: str) -> set[str]:
videos: set[str] = set()
for dirpath, dirnames, files in os.walk(path):
for dirpath, dirnames, files in os.walk(root_path):
# Skip directories with a `.ignore` file
if ".ignore" in files:
# Prevents os.walk from descending into this directory
@ -52,6 +46,17 @@ async def scan(path: Optional[str], client: KyooClient, remove_deleted=False):
continue
if is_video(file_path):
videos.add(file_path)
return videos
async def scan(path: Optional[str], client: KyooClient, remove_deleted=False):
path = path or os.environ.get("SCANNER_LIBRARY_ROOT", "/video")
logger.info("Starting scan at %s. This may take some time...", path)
if ignore_pattern:
logger.info(f"Applying ignore pattern: {ignore_pattern}")
videos = walk_fs(path)
info = await client.get_videos_info()
# TODO: handle unmatched
to_register = videos - info.paths
@ -80,8 +85,18 @@ async def scan(path: Optional[str], client: KyooClient, remove_deleted=False):
logger.error("Couldn't identify %s.", path, exc_info=e)
created = await client.create_videos(vids)
# TODO: queue those
need_scan = [x for x in created if not any(x.entries)]
await enqueue(
[
Request(
kind=x.guess.kind,
title=x.guess.title,
year=next(iter(x.guess.years), None),
videos=[Request.Video(id=x.id, episodes=x.guess.episodes)],
)
for x in created
if not any(x.entries) and x.guess.kind != "extra"
]
)
logger.info("Scan finished for %s.", path)

23
scanner/scanner/queue.py Normal file
View File

@ -0,0 +1,23 @@
from __future__ import annotations
from typing import Literal, Optional
from .models.videos import Guess
from .utils import Model
class Request(Model):
kind: Literal["episode"] | Literal["movie"]
title: str
year: Optional[int]
videos: list[Video]
class Video(Model):
id: str
episodes: list[Guess.Episode]
async def enqueue(requests: list[Request]):
# insert all requests
# on conflict(kind,title,year) add to the `videos` list
pass