Kyoo/scanner/providers/kyoo_client.py
2024-09-20 21:19:56 +02:00

160 lines
4.2 KiB
Python

import os
import jsons
from aiohttp import ClientSession
from datetime import date
from logging import getLogger
from typing import List, Literal, Any, Optional
from urllib.parse import quote
from .utils import format_date
logger = getLogger(__name__)
class KyooClient:
def __init__(self) -> None:
self._api_key = os.environ.get("KYOO_APIKEY")
if not self._api_key:
self._api_key = os.environ.get("KYOO_APIKEYS")
if not self._api_key:
print("Missing environment variable 'KYOO_APIKEY'.")
exit(2)
self._api_key = self._api_key.split(",")[0]
self._url = os.environ.get("KYOO_URL", "http://back:5000")
async def __aenter__(self):
jsons.set_serializer(lambda x, **_: format_date(x), type[Optional[date | int]])
self.client = ClientSession(
headers={
"User-Agent": "kyoo",
},
json_serialize=lambda *args, **kwargs: jsons.dumps(
*args, key_transformer=jsons.KEY_TRANSFORMER_CAMELCASE, **kwargs
),
)
return self
async def __aexit__(self, exc_type, exc_value, exc_tb):
await self.client.close()
async def get_registered_paths(self) -> List[str]:
async with self.client.get(
f"{self._url}/paths",
headers={"X-API-Key": self._api_key},
) as r:
r.raise_for_status()
return await r.json()
async def create_issue(self, path: str, issue: str, extra: dict | None = None):
async with self.client.post(
f"{self._url}/issues",
json={
"domain": "scanner",
"cause": path,
"reason": issue,
"extra": extra if extra is not None else {},
},
headers={"X-API-Key": self._api_key},
) as r:
if not r.ok:
logger.error(f"Request error: {await r.text()}")
r.raise_for_status()
async def get_issues(self) -> List[str]:
async with self.client.get(
f"{self._url}/issues",
params={"limit": 0},
headers={"X-API-Key": self._api_key},
) as r:
r.raise_for_status()
ret = await r.json()
return [x["cause"] for x in ret if x["domain"] == "scanner"]
async def delete_issue(self, path: str):
async with self.client.delete(
f'{self._url}/issues?filter=domain eq scanner and cause eq "{quote(path)}"',
headers={"X-API-Key": self._api_key},
) as r:
if not r.ok:
logger.error(f"Request error: {await r.text()}")
r.raise_for_status()
async def link_collection(
self, collection: str, type: Literal["movie"] | Literal["show"], id: str
):
async with self.client.put(
f"{self._url}/collections/{collection}/{type}/{id}",
headers={"X-API-Key": self._api_key},
) as r:
# Allow 409 and continue as if it worked.
if not r.ok and r.status != 409:
logger.error(f"Request error: {await r.text()}")
r.raise_for_status()
async def post(self, path: str, *, data: dict[str, Any]) -> str:
logger.debug(
"Sending %s: %s",
path,
jsons.dumps(
data,
key_transformer=jsons.KEY_TRANSFORMER_CAMELCASE,
jdkwargs={"indent": 4},
),
)
async with self.client.post(
f"{self._url}/{path}",
json=data,
headers={"X-API-Key": self._api_key},
) as r:
# Allow 409 and continue as if it worked.
if not r.ok and r.status != 409:
logger.error(f"Request error: {await r.text()}")
r.raise_for_status()
ret = await r.json()
return ret["id"]
async def delete(
self,
path: str,
):
logger.info("Deleting %s", path)
async with self.client.delete(
f"{self._url}/paths?recursive=true&path={quote(path)}",
headers={"X-API-Key": self._api_key},
) as r:
if not r.ok:
logger.error(f"Request error: {await r.text()}")
r.raise_for_status()
async def get(self, path: str):
async with self.client.get(
f"{self._url}/{path}",
headers={"X-API-Key": self._api_key},
) as r:
if not r.ok:
logger.error(f"Request error: {await r.text()}")
r.raise_for_status()
return await r.json()
async def put(self, path: str, *, data: dict[str, Any]):
logger.debug(
"Sending %s: %s",
path,
jsons.dumps(
data,
key_transformer=jsons.KEY_TRANSFORMER_CAMELCASE,
jdkwargs={"indent": 4},
),
)
async with self.client.put(
f"{self._url}/{path}",
json=data,
headers={"X-API-Key": self._api_key},
) as r:
# Allow 409 and continue as if it worked.
if not r.ok and r.status != 409:
logger.error(f"Request error: {await r.text()}")
r.raise_for_status()