diff --git a/scanner/.env.example b/scanner/.env.example index 6df9fd11..ffad6f13 100644 --- a/scanner/.env.example +++ b/scanner/.env.example @@ -1,6 +1,9 @@ # vi: ft=sh # shellcheck disable=SC2034 +KYOO_URL="http://api:3567/api" +KYOO_APIKEY="" + # Root directory that will be traversed to find video files (inside the container) SCANNER_LIBRARY_ROOT="/video" # A pattern (regex) to ignore video files. diff --git a/scanner/providers/kyoo_client.py b/scanner/providers/kyoo_client.py deleted file mode 100644 index 50d220aa..00000000 --- a/scanner/providers/kyoo_client.py +++ /dev/null @@ -1,159 +0,0 @@ -import os -import jsons -from aiohttp import ClientSession -from datetime import date -from logging import getLogger -from typing import List, Literal, Any, Optional -from urllib.parse import quote - -from .utils import format_date - -logger = getLogger(__name__) - - -class KyooClient: - def __init__(self) -> None: - self._api_key = os.environ.get("KYOO_APIKEY") - if not self._api_key: - self._api_key = os.environ.get("KYOO_APIKEYS") - if not self._api_key: - print("Missing environment variable 'KYOO_APIKEY'.") - exit(2) - self._api_key = self._api_key.split(",")[0] - - self._url = os.environ.get("KYOO_URL", "http://back:5000") - - async def __aenter__(self): - jsons.set_serializer(lambda x, **_: format_date(x), type[Optional[date | int]]) - self.client = ClientSession( - headers={ - "User-Agent": "kyoo", - }, - json_serialize=lambda *args, **kwargs: jsons.dumps( - *args, key_transformer=jsons.KEY_TRANSFORMER_CAMELCASE, **kwargs - ), - ) - return self - - async def __aexit__(self, exc_type, exc_value, exc_tb): - await self.client.close() - - async def get_registered_paths(self) -> List[str]: - async with self.client.get( - f"{self._url}/paths", - headers={"X-API-Key": self._api_key}, - ) as r: - r.raise_for_status() - return await r.json() - - async def create_issue(self, path: str, issue: str, extra: dict | None = None): - async with self.client.post( - f"{self._url}/issues", - json={ - "domain": "scanner", - "cause": path, - "reason": issue, - "extra": extra if extra is not None else {}, - }, - headers={"X-API-Key": self._api_key}, - ) as r: - if not r.ok: - logger.error(f"Request error: {await r.text()}") - r.raise_for_status() - - async def get_issues(self) -> List[str]: - async with self.client.get( - f"{self._url}/issues", - params={"limit": 0}, - headers={"X-API-Key": self._api_key}, - ) as r: - r.raise_for_status() - ret = await r.json() - return [x["cause"] for x in ret if x["domain"] == "scanner"] - - async def delete_issue(self, path: str): - async with self.client.delete( - f'{self._url}/issues?filter=domain eq scanner and cause eq "{quote(path)}"', - headers={"X-API-Key": self._api_key}, - ) as r: - if not r.ok: - logger.error(f"Request error: {await r.text()}") - r.raise_for_status() - - async def link_collection( - self, collection: str, type: Literal["movie"] | Literal["show"], id: str - ): - async with self.client.put( - f"{self._url}/collections/{collection}/{type}/{id}", - headers={"X-API-Key": self._api_key}, - ) as r: - # Allow 409 and continue as if it worked. - if not r.ok and r.status != 409: - logger.error(f"Request error: {await r.text()}") - r.raise_for_status() - - async def post(self, path: str, *, data: dict[str, Any]) -> str: - logger.debug( - "Sending %s: %s", - path, - jsons.dumps( - data, - key_transformer=jsons.KEY_TRANSFORMER_CAMELCASE, - jdkwargs={"indent": 4}, - ), - ) - async with self.client.post( - f"{self._url}/{path}", - json=data, - headers={"X-API-Key": self._api_key}, - ) as r: - # Allow 409 and continue as if it worked. - if not r.ok and r.status != 409: - logger.error(f"Request error: {await r.text()}") - r.raise_for_status() - ret = await r.json() - return ret["id"] - - async def delete( - self, - path: str, - ): - logger.info("Deleting %s", path) - - async with self.client.delete( - f"{self._url}/paths?recursive=true&path={quote(path)}", - headers={"X-API-Key": self._api_key}, - ) as r: - if not r.ok: - logger.error(f"Request error: {await r.text()}") - r.raise_for_status() - - async def get(self, path: str): - async with self.client.get( - f"{self._url}/{path}", - headers={"X-API-Key": self._api_key}, - ) as r: - if not r.ok: - logger.error(f"Request error: {await r.text()}") - r.raise_for_status() - return await r.json() - - async def put(self, path: str, *, data: dict[str, Any]): - logger.debug( - "Sending %s: %s", - path, - jsons.dumps( - data, - key_transformer=jsons.KEY_TRANSFORMER_CAMELCASE, - jdkwargs={"indent": 4}, - ), - ) - async with self.client.put( - f"{self._url}/{path}", - json=data, - headers={"X-API-Key": self._api_key}, - ) as r: - # Allow 409 and continue as if it worked. - if not r.ok and r.status != 409: - logger.error(f"Request error: {await r.text()}") - r.raise_for_status() diff --git a/scanner/providers/rabbit_base.py b/scanner/providers/rabbit_base.py deleted file mode 100644 index 064fa3d1..00000000 --- a/scanner/providers/rabbit_base.py +++ /dev/null @@ -1,34 +0,0 @@ -import os -from aio_pika import connect_robust - - -class RabbitBase: - QUEUE = "scanner" - - async def __aenter__(self): - self._con = await connect_robust( - os.environ.get("RABBITMQ_URL"), - host=os.environ.get("RABBITMQ_HOST", "rabbitmq"), - port=int(os.environ.get("RABBITMQ_PORT", "5672")), - login=os.environ.get("RABBITMQ_DEFAULT_USER", "guest"), - password=os.environ.get("RABBITMQ_DEFAULT_PASS", "guest"), - ) - - # Attempt to declare the queue passively in case it already exists. - try: - self._channel = await self._con.channel() - self._queue = await self._channel.declare_queue(self.QUEUE, passive=True) - return self - except Exception: - # The server will close the channel on error. - # Cleanup the reference to it. - await self._channel.close() - - # The queue does not exist, so actively declare it. - self._channel = await self._con.channel() - self._queue = await self._channel.declare_queue(self.QUEUE) - return self - - async def __aexit__(self, exc_type, exc_value, exc_tb): - await self._channel.close() - await self._con.close() diff --git a/scanner/requirements.txt b/scanner/requirements.txt index 717b6581..a957fb31 100644 --- a/scanner/requirements.txt +++ b/scanner/requirements.txt @@ -1,9 +1,6 @@ -guessit@git+https://github.com/zoriya/guessit fastapi[standard] - +pydantic +guessit@git+https://github.com/zoriya/guessit aiohttp -jsons watchfiles -aio-pika -msgspec langcodes diff --git a/scanner/scanner/client.py b/scanner/scanner/client.py new file mode 100644 index 00000000..14aedd87 --- /dev/null +++ b/scanner/scanner/client.py @@ -0,0 +1,134 @@ +import os +import jsons +from aiohttp import ClientSession +from datetime import date +from logging import getLogger +from typing import Optional + +from .utils import format_date +from .models.videos import VideoInfo + +logger = getLogger(__name__) + + +class KyooClient: + def __init__(self) -> None: + self._api_key: str = os.environ.get("KYOO_APIKEY") # type: ignore + if not self._api_key: + print("Missing environment variable 'KYOO_APIKEY'.") + exit(2) + self._url = os.environ.get("KYOO_URL", "http://api:3567/api") + + async def __aenter__(self): + jsons.set_serializer(lambda x, **_: format_date(x), type[Optional[date | int]]) + self._client = ClientSession( + headers={ + "User-Agent": "kyoo", + }, + json_serialize=lambda *args, **kwargs: jsons.dumps( + *args, key_transformer=jsons.KEY_TRANSFORMER_CAMELCASE, **kwargs + ), + ) + return self + + async def __aexit__(self): + await self._client.close() + + async def get_videos_info(self) -> VideoInfo: + async with self._client.get( + f"{self._url}/videos", + ) as r: + r.raise_for_status() + return VideoInfo(**await r.json()) + + async def create_videos(self, videos: list[Video]): + async with self._client.post( + f"{self._url}/videos", + json=[x.model_dump() for x in videos] + ) as r: + r.raise_for_status() + + async def delete_videos(self, videos: list[str] | set[str]): + async with self._client.delete( + f"{self._url}/videos", + json=videos, + ) as r: + r.raise_for_status() + + # async def link_collection( + # self, collection: str, type: Literal["movie"] | Literal["show"], id: str + # ): + # async with self.client.put( + # f"{self._url}/collections/{collection}/{type}/{id}", + # headers={"X-API-Key": self._api_key}, + # ) as r: + # # Allow 409 and continue as if it worked. + # if not r.ok and r.status != 409: + # logger.error(f"Request error: {await r.text()}") + # r.raise_for_status() + # + # async def post(self, path: str, *, data: dict[str, Any]) -> str: + # logger.debug( + # "Sending %s: %s", + # path, + # jsons.dumps( + # data, + # key_transformer=jsons.KEY_TRANSFORMER_CAMELCASE, + # jdkwargs={"indent": 4}, + # ), + # ) + # async with self.client.post( + # f"{self._url}/{path}", + # json=data, + # headers={"X-API-Key": self._api_key}, + # ) as r: + # # Allow 409 and continue as if it worked. + # if not r.ok and r.status != 409: + # logger.error(f"Request error: {await r.text()}") + # r.raise_for_status() + # ret = await r.json() + # return ret["id"] + # + # async def delete( + # self, + # path: str, + # ): + # logger.info("Deleting %s", path) + # + # async with self.client.delete( + # f"{self._url}/paths?recursive=true&path={quote(path)}", + # headers={"X-API-Key": self._api_key}, + # ) as r: + # if not r.ok: + # logger.error(f"Request error: {await r.text()}") + # r.raise_for_status() + # + # async def get(self, path: str): + # async with self.client.get( + # f"{self._url}/{path}", + # headers={"X-API-Key": self._api_key}, + # ) as r: + # if not r.ok: + # logger.error(f"Request error: {await r.text()}") + # r.raise_for_status() + # return await r.json() + # + # async def put(self, path: str, *, data: dict[str, Any]): + # logger.debug( + # "Sending %s: %s", + # path, + # jsons.dumps( + # data, + # key_transformer=jsons.KEY_TRANSFORMER_CAMELCASE, + # jdkwargs={"indent": 4}, + # ), + # ) + # async with self.client.put( + # f"{self._url}/{path}", + # json=data, + # headers={"X-API-Key": self._api_key}, + # ) as r: + # # Allow 409 and continue as if it worked. + # if not r.ok and r.status != 409: + # logger.error(f"Request error: {await r.text()}") + # r.raise_for_status() diff --git a/scanner/scanner/fsscan.py b/scanner/scanner/fsscan.py index ed3592ff..72934798 100644 --- a/scanner/scanner/fsscan.py +++ b/scanner/scanner/fsscan.py @@ -1,12 +1,10 @@ - import os import re import asyncio from typing import Optional from logging import getLogger -from .publisher import Publisher -from providers.kyoo_client import KyooClient +from .client import KyooClient logger = getLogger(__name__) @@ -20,23 +18,23 @@ def get_ignore_pattern(): return None -async def scan( - path_: Optional[str], publisher: Publisher, client: KyooClient, remove_deleted=False -): - path = path_ or os.environ.get("SCANNER_LIBRARY_ROOT", "/video") - logger.info("Starting scan at %s. This may take some time...", path) +ignore_pattern = get_ignore_pattern() - ignore_pattern = get_ignore_pattern() + +async def scan(path: Optional[str], client: KyooClient, remove_deleted=False): + path = path or os.environ.get("SCANNER_LIBRARY_ROOT", "/video") + logger.info("Starting scan at %s. This may take some time...", path) if ignore_pattern: logger.info(f"Applying ignore pattern: {ignore_pattern}") - registered = set(await client.get_registered_paths()) - videos = set() + info = await client.get_videos_info() + videos = set() for dirpath, dirnames, files in os.walk(path): # Skip directories with a `.ignore` file if ".ignore" in files: - dirnames.clear() # Prevents os.walk from descending into this directory + # Prevents os.walk from descending into this directory + dirnames.clear() continue for file in files: @@ -46,29 +44,20 @@ async def scan( continue videos.add(file_path) - to_register = videos - registered - to_delete = registered - videos if remove_deleted else set() + to_register = videos - info.paths + to_delete = info.paths - videos if remove_deleted else set() - if not any(to_register) and any(to_delete) and len(to_delete) == len(registered): + if not any(to_register) and any(to_delete) and len(to_delete) == len(info.paths): logger.warning("All video files are unavailable. Check your disks.") return # delete stale files before creating new ones to prevent potential conflicts if to_delete: logger.info("Removing %d stale files.", len(to_delete)) - await asyncio.gather(*[publisher.delete(path) for path in to_delete]) + await client.delete_videos(to_delete) if to_register: logger.info("Found %d new files to register.", len(to_register)) await asyncio.gather(*[publisher.add(path) for path in to_register]) - if remove_deleted: - issues = set(await client.get_issues()) - issues_to_delete = issues - videos - if issues_to_delete: - logger.info("Removing %d stale issues.", len(issues_to_delete)) - await asyncio.gather( - *[client.delete_issue(issue) for issue in issues_to_delete] - ) - logger.info("Scan finished for %s.", path) diff --git a/scanner/scanner/models/videos.py b/scanner/scanner/models/videos.py new file mode 100644 index 00000000..3b803fbc --- /dev/null +++ b/scanner/scanner/models/videos.py @@ -0,0 +1,15 @@ +from __future__ import annotations + +from ..utils import Model +from typing import Optional + + +class Resource(Model): + id: str + slug: str + + +class VideoInfo(Model): + paths: set[str] + unmatched: set[str] + guesses: dict[str, dict[str, Resource]] diff --git a/scanner/shell.nix b/scanner/shell.nix index 8305e1bb..b3fdd640 100644 --- a/scanner/shell.nix +++ b/scanner/shell.nix @@ -2,15 +2,10 @@ python = pkgs.python313.withPackages (ps: with ps; [ fastapi + pydantic guessit aiohttp - jsons watchfiles - pika - aio-pika - requests - dataclasses-json - msgspec langcodes ]); in