Finish client/scanner split

This commit is contained in:
Zoe Roux 2024-04-08 22:33:37 +02:00
parent e284f771df
commit a18fa7ebad
No known key found for this signature in database
11 changed files with 138 additions and 138 deletions

View File

@ -35,7 +35,7 @@ def on_message(
body: bytes,
):
try:
message = Message.from_json(body) # type: Message
message = Message.from_json(body)
service.update(message.value.user, message.value.resource, message.value)
except Exception as e:
logging.exception("Error processing message.", exc_info=e)

View File

@ -1,5 +1,5 @@
from dataclasses import dataclass
from dataclasses_json import dataclass_json, LetterCase
from dataclasses_json import DataClassJsonMixin, dataclass_json, LetterCase
from autosync.models.episode import Episode
from autosync.models.movie import Movie
@ -17,7 +17,7 @@ class WatchStatusMessage(WatchStatus):
@dataclass_json(letter_case=LetterCase.CAMEL)
@dataclass
class Message:
class Message(DataClassJsonMixin):
action: str
type: str
value: WatchStatusMessage

View File

@ -5,4 +5,4 @@ COPY ./requirements.txt .
RUN pip3 install -r ./requirements.txt
COPY . .
ENTRYPOINT ["python3", "-m", "scanner", "-v"]
ENTRYPOINT ["python3", "-m", "scanner"]

View File

@ -7,6 +7,7 @@ async def main():
from .publisher import Publisher
from providers.kyoo_client import KyooClient
logging.basicConfig(level=logging.INFO)
logging.getLogger("watchfiles").setLevel(logging.WARNING)
async with Publisher() as publisher, KyooClient() as client:

View File

@ -8,6 +8,9 @@ async def scan(path: str):
self.issues = await self.get_issues()
videos = [str(p) for p in Path(path).rglob("*") if p.is_file()]
deleted = [x for x in self.registered if x not in videos]
# if path in self.registered or self._ignore_pattern.match(path):
# return
#
# try:
# self._ignore_pattern = re.compile(

View File

@ -1,31 +1,37 @@
from datetime import timedelta
import os
import asyncio
import logging
import jsons
import re
from aiohttp import ClientSession
from pathlib import Path
from typing import List, Literal, Any
from urllib.parse import quote
from providers.provider import Provider, ProviderError
from providers.types.collection import Collection
from providers.types.show import Show
from providers.types.episode import Episode, PartialShow
from providers.types.season import Season
class KyooClient:
def __init__(
self, client: ClientSession, *, api_key: str
) -> None:
self._client = client
self._api_key = api_key
def __init__(self) -> None:
self._api_key = os.environ.get("KYOO_APIKEY")
if not self._api_key:
self._api_key = os.environ.get("KYOO_APIKEYS")
if not self._api_key:
print("Missing environment variable 'KYOO_APIKEY'.")
exit(2)
self._api_key = self._api_key.split(",")[0]
self._url = os.environ.get("KYOO_URL", "http://back:5000")
async def __aenter__(self):
jsons.set_serializer(lambda x, **_: format_date(x), Optional[date | int]) # type: ignore
self.client = ClientSession(
json_serialize=lambda *args, **kwargs: jsons.dumps(
*args, key_transformer=jsons.KEY_TRANSFORMER_CAMELCASE, **kwargs
),
)
return self
async def __aexit__(self, exc_type, exc_value, exc_tb):
await self.client.close()
async def get_issues(self) -> List[str]:
async with self._client.get(
async with self.client.get(
f"{self._url}/issues",
params={"limit": 0},
headers={"X-API-Key": self._api_key},
@ -34,10 +40,23 @@ class KyooClient:
ret = await r.json()
return [x["cause"] for x in ret if x["domain"] == "scanner"]
async def create_issue(self, path: str, issue: str, extra: dict | None = None):
await self.client.post(
f"{self._url}/issues",
json={"domain": "scanner", "cause": path, "reason": issue, "extra": extra},
headers={"X-API-Key": self._api_key},
)
async def delete_issue(self, path: str):
await self.client.delete(
f'{self._url}/issues?filter=domain eq scanner and cause eq "{path}"',
headers={"X-API-Key": self._api_key},
)
async def link_collection(
self, collection: str, type: Literal["movie"] | Literal["show"], id: str
):
async with self._client.put(
async with self.client.put(
f"{self._url}/collections/{collection}/{type}/{id}",
headers={"X-API-Key": self._api_key},
) as r:
@ -56,7 +75,7 @@ class KyooClient:
jdkwargs={"indent": 4},
),
)
async with self._client.post(
async with self.client.post(
f"{self._url}/{path}",
json=data,
headers={"X-API-Key": self._api_key},
@ -85,13 +104,12 @@ class KyooClient:
async def delete(
self,
path: str,
type: Literal["episode", "movie", "issue"] | None = None,
type: Literal["episode", "movie"] | None = None,
):
logging.info("Deleting %s", path)
self.registered = filter(lambda x: x != path, self.registered)
if type is None or type == "movie":
async with self._client.delete(
async with self.client.delete(
f'{self._url}/movies?filter=path eq "{quote(path)}"',
headers={"X-API-Key": self._api_key},
) as r:
@ -100,7 +118,7 @@ class KyooClient:
r.raise_for_status()
if type is None or type == "episode":
async with self._client.delete(
async with self.client.delete(
f'{self._url}/episodes?filter=path eq "{quote(path)}"',
headers={"X-API-Key": self._api_key},
) as r:
@ -108,10 +126,4 @@ class KyooClient:
logging.error(f"Request error: {await r.text()}")
r.raise_for_status()
if path in self.issues:
self.issues = filter(lambda x: x != path, self.issues)
await self._client.delete(
f'{self._url}/issues?filter=domain eq scanner and cause eq "{quote(path)}"',
headers={"X-API-Key": self._api_key},
)
await self.delete_issue(path)

View File

@ -1,7 +1,7 @@
import os
from aiohttp import ClientSession
from abc import abstractmethod, abstractproperty
from typing import Optional, TypeVar
from typing import Optional, Self
from providers.implementations.thexem import TheXem
from providers.utils import ProviderError
@ -13,14 +13,13 @@ from .types.movie import Movie
from .types.collection import Collection
Self = TypeVar("Self", bound="Provider")
class Provider:
@classmethod
def get_all(
cls: type[Self], client: ClientSession, languages: list[str]
) -> tuple[list[Self], TheXem]:
def get_all(cls, client: ClientSession) -> tuple[Self, TheXem]:
languages = os.environ.get("LIBRARY_LANGUAGES")
if not languages:
print("Missing environment variable 'LIBRARY_LANGUAGES'.")
exit(2)
providers = []
from providers.idmapper import IdMapper
@ -44,7 +43,7 @@ class Provider:
idmapper.init(tmdb=tmdb, language=languages[0])
return providers, xem
return next(providers), xem
@abstractproperty
def name(self) -> str:

View File

@ -1,47 +1,18 @@
from providers.kyoo_client import KyooClient
async def main():
import asyncio
import os
import logging
import sys
import jsons
from datetime import date
from typing import Optional
from aiohttp import ClientSession
from providers.utils import format_date, ProviderError
from providers.provider import Provider
from providers.kyoo_client import KyooClient
from .scanner import Scanner
from .subscriber import Subscriber
path = os.environ.get("SCANNER_LIBRARY_ROOT", "/video")
languages = os.environ.get("LIBRARY_LANGUAGES")
if not languages:
print("Missing environment variable 'LIBRARY_LANGUAGES'.")
exit(2)
api_key = os.environ.get("KYOO_APIKEY")
if not api_key:
api_key = os.environ.get("KYOO_APIKEYS")
if not api_key:
print("Missing environment variable 'KYOO_APIKEY'.")
exit(2)
api_key = api_key.split(",")[0]
logging.basicConfig(level=logging.INFO)
if len(sys.argv) > 1 and sys.argv[1] == "-v":
logging.basicConfig(level=logging.INFO)
if len(sys.argv) > 1 and sys.argv[1] == "-vv":
logging.basicConfig(level=logging.DEBUG)
logging.getLogger("watchfiles").setLevel(logging.WARNING)
logging.getLogger("rebulk").setLevel(logging.WARNING)
jsons.set_serializer(lambda x, **_: format_date(x), Optional[date | int]) # type: ignore
async with ClientSession(
json_serialize=lambda *args, **kwargs: jsons.dumps(
*args, key_transformer=jsons.KEY_TRANSFORMER_CAMELCASE, **kwargs
),
) as client:
kyoo = KyooClient(client, api_key=api_key)
provider =
try:
scanner = Scanner(kyoo, languages=languages.split(","), api_key=api_key)
except ProviderError as e:
logging.error(e)
async with KyooClient() as kyoo, Subscriber() as sub:
provider, xem = Provider.get_all(kyoo.client)
scanner = Scanner(kyoo, provider, xem)
await sub.listen(scanner)

View File

@ -9,7 +9,6 @@ from providers.types.episode import Episode, PartialShow
from providers.types.season import Season
from providers.kyoo_client import KyooClient
from .parser.guess import guessit
from .utils import handle_errors
from .cache import cache, exec_as_cache, make_key
@ -23,11 +22,30 @@ class Scanner:
self._show_cache = {}
self._season_cache = {}
@handle_errors
async def delete(self, path: str):
try:
await self._client.delete(path)
return True
except Exception as e:
logging.exception("Unhandled error", exc_info=e)
return False
async def identify(self, path: str):
# if path in self.registered or self._ignore_pattern.match(path):
# return
#
try:
await self.identify(path)
await self._client.delete_issue(path)
except ProviderError as e:
logging.error(e)
await self._client.create_issue(path, str(e))
except Exception as e:
logging.exception("Unhandled error", exc_info=e)
await self._client.create_issue(
path, "Unknown error", {"type": type(e).__name__, "message": str(e)}
)
return False
return True
async def _identify(self, path: str):
raw = guessit(path, xem_titles=await self._xem.get_expected_titles())
if "mimetype" not in raw or not raw["mimetype"].startswith("video"):

View File

@ -0,0 +1,52 @@
from dataclasses import dataclass
from dataclasses_json import DataClassJsonMixin
from typing import Literal
import os
import logging
from aio_pika import connect_robust
from aio_pika.abc import AbstractIncomingMessage
from scanner.scanner import Scanner
logger = logging.getLogger(__name__)
@dataclass
class Message(DataClassJsonMixin):
action: Literal["scan"] | Literal["delete"]
path: str
class Subscriber:
QUEUE = "scanner"
async def __aenter__(self):
self._con = await connect_robust(
host=os.environ.get("RABBITMQ_HOST", "rabbitmq"),
login=os.environ.get("RABBITMQ_DEFAULT_USER", "guest"),
password=os.environ.get("RABBITMQ_DEFAULT_PASS", "guest"),
)
self._channel = await self._con.channel()
self._queue = await self._channel.declare_queue(self.QUEUE)
return self
async def __aexit__(self, exc_type, exc_value, exc_tb):
await self._con.close()
async def listen(self, scanner: Scanner):
async def on_message(message: AbstractIncomingMessage):
async with message.process():
msg = Message.from_json(message.body)
ack = False
match msg.action:
case "scan":
ack = await scanner.identify(msg.path)
case "delete":
ack = await scanner.delete(msg.path)
case _:
logger.error(f"Invalid action: {msg.action}")
if ack:
await message.ack()
else:
await message.nack(requeue=False)
await self._queue.consume(on_message, no_ack=True)

View File

@ -1,56 +0,0 @@
from __future__ import annotations
import logging
from functools import wraps
from itertools import islice
from typing import TYPE_CHECKING, Iterator, List, TypeVar
from providers.utils import ProviderError
if TYPE_CHECKING:
from scanner.scanner import Scanner
T = TypeVar("T")
def batch(iterable: Iterator[T], n: int) -> Iterator[List[T]]:
"Batch data into lists of length n. The last batch may be shorter."
# batched('ABCDEFG', 3) --> ABC DEF G
it = iter(iterable)
while True:
batch = list(islice(it, n))
if not batch:
return
yield batch
def handle_errors(f):
@wraps(f)
async def internal(self: Scanner, path: str):
try:
await f(self, path)
if path in self.issues:
await self._client.delete(
f'{self._url}/issues?filter=domain eq scanner and cause eq "{path}"',
headers={"X-API-Key": self._api_key},
)
except ProviderError as e:
logging.error(str(e))
await self._client.post(
f"{self._url}/issues",
json={"domain": "scanner", "cause": path, "reason": str(e)},
headers={"X-API-Key": self._api_key},
)
except Exception as e:
logging.exception("Unhandled error", exc_info=e)
await self._client.post(
f"{self._url}/issues",
json={
"domain": "scanner",
"cause": path,
"reason": "Unknown error",
"extra": {"type": type(e).__name__, "message": str(e)},
},
headers={"X-API-Key": self._api_key},
)
return internal