Init new KyooClient & setup pydantic

This commit is contained in:
Zoe Roux 2025-05-05 01:21:19 +02:00
parent f403004842
commit 742ae4e771
No known key found for this signature in database
8 changed files with 169 additions and 229 deletions

View File

@ -1,6 +1,9 @@
# vi: ft=sh
# shellcheck disable=SC2034
KYOO_URL="http://api:3567/api"
KYOO_APIKEY=""
# Root directory that will be traversed to find video files (inside the container)
SCANNER_LIBRARY_ROOT="/video"
# A pattern (regex) to ignore video files.

View File

@ -1,159 +0,0 @@
import os
import jsons
from aiohttp import ClientSession
from datetime import date
from logging import getLogger
from typing import List, Literal, Any, Optional
from urllib.parse import quote
from .utils import format_date
logger = getLogger(__name__)
class KyooClient:
def __init__(self) -> None:
self._api_key = os.environ.get("KYOO_APIKEY")
if not self._api_key:
self._api_key = os.environ.get("KYOO_APIKEYS")
if not self._api_key:
print("Missing environment variable 'KYOO_APIKEY'.")
exit(2)
self._api_key = self._api_key.split(",")[0]
self._url = os.environ.get("KYOO_URL", "http://back:5000")
async def __aenter__(self):
jsons.set_serializer(lambda x, **_: format_date(x), type[Optional[date | int]])
self.client = ClientSession(
headers={
"User-Agent": "kyoo",
},
json_serialize=lambda *args, **kwargs: jsons.dumps(
*args, key_transformer=jsons.KEY_TRANSFORMER_CAMELCASE, **kwargs
),
)
return self
async def __aexit__(self, exc_type, exc_value, exc_tb):
await self.client.close()
async def get_registered_paths(self) -> List[str]:
async with self.client.get(
f"{self._url}/paths",
headers={"X-API-Key": self._api_key},
) as r:
r.raise_for_status()
return await r.json()
async def create_issue(self, path: str, issue: str, extra: dict | None = None):
async with self.client.post(
f"{self._url}/issues",
json={
"domain": "scanner",
"cause": path,
"reason": issue,
"extra": extra if extra is not None else {},
},
headers={"X-API-Key": self._api_key},
) as r:
if not r.ok:
logger.error(f"Request error: {await r.text()}")
r.raise_for_status()
async def get_issues(self) -> List[str]:
async with self.client.get(
f"{self._url}/issues",
params={"limit": 0},
headers={"X-API-Key": self._api_key},
) as r:
r.raise_for_status()
ret = await r.json()
return [x["cause"] for x in ret if x["domain"] == "scanner"]
async def delete_issue(self, path: str):
async with self.client.delete(
f'{self._url}/issues?filter=domain eq scanner and cause eq "{quote(path)}"',
headers={"X-API-Key": self._api_key},
) as r:
if not r.ok:
logger.error(f"Request error: {await r.text()}")
r.raise_for_status()
async def link_collection(
self, collection: str, type: Literal["movie"] | Literal["show"], id: str
):
async with self.client.put(
f"{self._url}/collections/{collection}/{type}/{id}",
headers={"X-API-Key": self._api_key},
) as r:
# Allow 409 and continue as if it worked.
if not r.ok and r.status != 409:
logger.error(f"Request error: {await r.text()}")
r.raise_for_status()
async def post(self, path: str, *, data: dict[str, Any]) -> str:
logger.debug(
"Sending %s: %s",
path,
jsons.dumps(
data,
key_transformer=jsons.KEY_TRANSFORMER_CAMELCASE,
jdkwargs={"indent": 4},
),
)
async with self.client.post(
f"{self._url}/{path}",
json=data,
headers={"X-API-Key": self._api_key},
) as r:
# Allow 409 and continue as if it worked.
if not r.ok and r.status != 409:
logger.error(f"Request error: {await r.text()}")
r.raise_for_status()
ret = await r.json()
return ret["id"]
async def delete(
self,
path: str,
):
logger.info("Deleting %s", path)
async with self.client.delete(
f"{self._url}/paths?recursive=true&path={quote(path)}",
headers={"X-API-Key": self._api_key},
) as r:
if not r.ok:
logger.error(f"Request error: {await r.text()}")
r.raise_for_status()
async def get(self, path: str):
async with self.client.get(
f"{self._url}/{path}",
headers={"X-API-Key": self._api_key},
) as r:
if not r.ok:
logger.error(f"Request error: {await r.text()}")
r.raise_for_status()
return await r.json()
async def put(self, path: str, *, data: dict[str, Any]):
logger.debug(
"Sending %s: %s",
path,
jsons.dumps(
data,
key_transformer=jsons.KEY_TRANSFORMER_CAMELCASE,
jdkwargs={"indent": 4},
),
)
async with self.client.put(
f"{self._url}/{path}",
json=data,
headers={"X-API-Key": self._api_key},
) as r:
# Allow 409 and continue as if it worked.
if not r.ok and r.status != 409:
logger.error(f"Request error: {await r.text()}")
r.raise_for_status()

View File

@ -1,34 +0,0 @@
import os
from aio_pika import connect_robust
class RabbitBase:
QUEUE = "scanner"
async def __aenter__(self):
self._con = await connect_robust(
os.environ.get("RABBITMQ_URL"),
host=os.environ.get("RABBITMQ_HOST", "rabbitmq"),
port=int(os.environ.get("RABBITMQ_PORT", "5672")),
login=os.environ.get("RABBITMQ_DEFAULT_USER", "guest"),
password=os.environ.get("RABBITMQ_DEFAULT_PASS", "guest"),
)
# Attempt to declare the queue passively in case it already exists.
try:
self._channel = await self._con.channel()
self._queue = await self._channel.declare_queue(self.QUEUE, passive=True)
return self
except Exception:
# The server will close the channel on error.
# Cleanup the reference to it.
await self._channel.close()
# The queue does not exist, so actively declare it.
self._channel = await self._con.channel()
self._queue = await self._channel.declare_queue(self.QUEUE)
return self
async def __aexit__(self, exc_type, exc_value, exc_tb):
await self._channel.close()
await self._con.close()

View File

@ -1,9 +1,6 @@
guessit@git+https://github.com/zoriya/guessit
fastapi[standard]
pydantic
guessit@git+https://github.com/zoriya/guessit
aiohttp
jsons
watchfiles
aio-pika
msgspec
langcodes

134
scanner/scanner/client.py Normal file
View File

@ -0,0 +1,134 @@
import os
import jsons
from aiohttp import ClientSession
from datetime import date
from logging import getLogger
from typing import Optional
from .utils import format_date
from .models.videos import VideoInfo
logger = getLogger(__name__)
class KyooClient:
def __init__(self) -> None:
self._api_key: str = os.environ.get("KYOO_APIKEY") # type: ignore
if not self._api_key:
print("Missing environment variable 'KYOO_APIKEY'.")
exit(2)
self._url = os.environ.get("KYOO_URL", "http://api:3567/api")
async def __aenter__(self):
jsons.set_serializer(lambda x, **_: format_date(x), type[Optional[date | int]])
self._client = ClientSession(
headers={
"User-Agent": "kyoo",
},
json_serialize=lambda *args, **kwargs: jsons.dumps(
*args, key_transformer=jsons.KEY_TRANSFORMER_CAMELCASE, **kwargs
),
)
return self
async def __aexit__(self):
await self._client.close()
async def get_videos_info(self) -> VideoInfo:
async with self._client.get(
f"{self._url}/videos",
) as r:
r.raise_for_status()
return VideoInfo(**await r.json())
async def create_videos(self, videos: list[Video]):
async with self._client.post(
f"{self._url}/videos",
json=[x.model_dump() for x in videos]
) as r:
r.raise_for_status()
async def delete_videos(self, videos: list[str] | set[str]):
async with self._client.delete(
f"{self._url}/videos",
json=videos,
) as r:
r.raise_for_status()
# async def link_collection(
# self, collection: str, type: Literal["movie"] | Literal["show"], id: str
# ):
# async with self.client.put(
# f"{self._url}/collections/{collection}/{type}/{id}",
# headers={"X-API-Key": self._api_key},
# ) as r:
# # Allow 409 and continue as if it worked.
# if not r.ok and r.status != 409:
# logger.error(f"Request error: {await r.text()}")
# r.raise_for_status()
#
# async def post(self, path: str, *, data: dict[str, Any]) -> str:
# logger.debug(
# "Sending %s: %s",
# path,
# jsons.dumps(
# data,
# key_transformer=jsons.KEY_TRANSFORMER_CAMELCASE,
# jdkwargs={"indent": 4},
# ),
# )
# async with self.client.post(
# f"{self._url}/{path}",
# json=data,
# headers={"X-API-Key": self._api_key},
# ) as r:
# # Allow 409 and continue as if it worked.
# if not r.ok and r.status != 409:
# logger.error(f"Request error: {await r.text()}")
# r.raise_for_status()
# ret = await r.json()
# return ret["id"]
#
# async def delete(
# self,
# path: str,
# ):
# logger.info("Deleting %s", path)
#
# async with self.client.delete(
# f"{self._url}/paths?recursive=true&path={quote(path)}",
# headers={"X-API-Key": self._api_key},
# ) as r:
# if not r.ok:
# logger.error(f"Request error: {await r.text()}")
# r.raise_for_status()
#
# async def get(self, path: str):
# async with self.client.get(
# f"{self._url}/{path}",
# headers={"X-API-Key": self._api_key},
# ) as r:
# if not r.ok:
# logger.error(f"Request error: {await r.text()}")
# r.raise_for_status()
# return await r.json()
#
# async def put(self, path: str, *, data: dict[str, Any]):
# logger.debug(
# "Sending %s: %s",
# path,
# jsons.dumps(
# data,
# key_transformer=jsons.KEY_TRANSFORMER_CAMELCASE,
# jdkwargs={"indent": 4},
# ),
# )
# async with self.client.put(
# f"{self._url}/{path}",
# json=data,
# headers={"X-API-Key": self._api_key},
# ) as r:
# # Allow 409 and continue as if it worked.
# if not r.ok and r.status != 409:
# logger.error(f"Request error: {await r.text()}")
# r.raise_for_status()

View File

@ -1,12 +1,10 @@
import os
import re
import asyncio
from typing import Optional
from logging import getLogger
from .publisher import Publisher
from providers.kyoo_client import KyooClient
from .client import KyooClient
logger = getLogger(__name__)
@ -20,23 +18,23 @@ def get_ignore_pattern():
return None
async def scan(
path_: Optional[str], publisher: Publisher, client: KyooClient, remove_deleted=False
):
path = path_ or os.environ.get("SCANNER_LIBRARY_ROOT", "/video")
logger.info("Starting scan at %s. This may take some time...", path)
ignore_pattern = get_ignore_pattern()
ignore_pattern = get_ignore_pattern()
async def scan(path: Optional[str], client: KyooClient, remove_deleted=False):
path = path or os.environ.get("SCANNER_LIBRARY_ROOT", "/video")
logger.info("Starting scan at %s. This may take some time...", path)
if ignore_pattern:
logger.info(f"Applying ignore pattern: {ignore_pattern}")
registered = set(await client.get_registered_paths())
videos = set()
info = await client.get_videos_info()
videos = set()
for dirpath, dirnames, files in os.walk(path):
# Skip directories with a `.ignore` file
if ".ignore" in files:
dirnames.clear() # Prevents os.walk from descending into this directory
# Prevents os.walk from descending into this directory
dirnames.clear()
continue
for file in files:
@ -46,29 +44,20 @@ async def scan(
continue
videos.add(file_path)
to_register = videos - registered
to_delete = registered - videos if remove_deleted else set()
to_register = videos - info.paths
to_delete = info.paths - videos if remove_deleted else set()
if not any(to_register) and any(to_delete) and len(to_delete) == len(registered):
if not any(to_register) and any(to_delete) and len(to_delete) == len(info.paths):
logger.warning("All video files are unavailable. Check your disks.")
return
# delete stale files before creating new ones to prevent potential conflicts
if to_delete:
logger.info("Removing %d stale files.", len(to_delete))
await asyncio.gather(*[publisher.delete(path) for path in to_delete])
await client.delete_videos(to_delete)
if to_register:
logger.info("Found %d new files to register.", len(to_register))
await asyncio.gather(*[publisher.add(path) for path in to_register])
if remove_deleted:
issues = set(await client.get_issues())
issues_to_delete = issues - videos
if issues_to_delete:
logger.info("Removing %d stale issues.", len(issues_to_delete))
await asyncio.gather(
*[client.delete_issue(issue) for issue in issues_to_delete]
)
logger.info("Scan finished for %s.", path)

View File

@ -0,0 +1,15 @@
from __future__ import annotations
from ..utils import Model
from typing import Optional
class Resource(Model):
id: str
slug: str
class VideoInfo(Model):
paths: set[str]
unmatched: set[str]
guesses: dict[str, dict[str, Resource]]

View File

@ -2,15 +2,10 @@
python = pkgs.python313.withPackages (ps:
with ps; [
fastapi
pydantic
guessit
aiohttp
jsons
watchfiles
pika
aio-pika
requests
dataclasses-json
msgspec
langcodes
]);
in