Split fs scanner into scanner and matcher components (#410)

This commit is contained in:
Zoe Roux 2024-04-10 00:11:54 +02:00 committed by GitHub
commit 81557741b2
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
27 changed files with 574 additions and 433 deletions

View File

@ -35,7 +35,7 @@ def on_message(
body: bytes,
):
try:
message = Message.from_json(body) # type: Message
message = Message.from_json(body)
service.update(message.value.user, message.value.resource, message.value)
except Exception as e:
logging.exception("Error processing message.", exc_info=e)

View File

@ -1,5 +1,5 @@
from dataclasses import dataclass
from dataclasses_json import dataclass_json, LetterCase
from dataclasses_json import DataClassJsonMixin, dataclass_json, LetterCase
from autosync.models.episode import Episode
from autosync.models.movie import Movie
@ -17,7 +17,7 @@ class WatchStatusMessage(WatchStatus):
@dataclass_json(letter_case=LetterCase.CAMEL)
@dataclass
class Message:
class Message(DataClassJsonMixin):
action: str
type: str
value: WatchStatusMessage

View File

@ -84,6 +84,18 @@ services:
volumes:
- ${LIBRARY_ROOT}:/video:ro
matcher:
build: ./scanner
command: matcher
restart: on-failure
depends_on:
back:
condition: service_healthy
env_file:
- ./.env
environment:
- KYOO_URL=${KYOO_URL:-http://back:5000}
autosync:
build: ./autosync
restart: on-failure

View File

@ -59,6 +59,18 @@ services:
volumes:
- ${LIBRARY_ROOT}:/video:ro
matcher:
image: zoriya/kyoo_scanner:latest
command: matcher
restart: unless-stopped
depends_on:
back:
condition: service_healthy
env_file:
- ./.env
environment:
- KYOO_URL=${KYOO_URL:-http://back:5000}
autosync:
image: zoriya/kyoo_autosync:latest
restart: on-failure

View File

@ -60,6 +60,18 @@ services:
volumes:
- ${LIBRARY_ROOT}:/video:ro
matcher:
build: ./scanner
command: matcher
restart: unless-stopped
depends_on:
back:
condition: service_healthy
env_file:
- ./.env
environment:
- KYOO_URL=${KYOO_URL:-http://back:5000}
autosync:
build: ./autosync
restart: on-failure

View File

@ -37,4 +37,10 @@ export enum Genre {
Thriller = "Thriller",
War = "War",
Western = "Western",
Kids = "Kids",
News = "News",
Reality = "Reality",
Soap = "Soap",
Talk = "Talk",
Politics = "Politics",
}

2
scanner/.dockerignore Normal file
View File

@ -0,0 +1,2 @@
Dockerfile*

View File

@ -5,4 +5,5 @@ COPY ./requirements.txt .
RUN pip3 install -r ./requirements.txt
COPY . .
ENTRYPOINT ["python3", "-m", "scanner", "-v"]
ENTRYPOINT ["python3", "-m"]
CMD ["scanner"]

View File

@ -0,0 +1,18 @@
async def main():
    """Entry point of the matcher service: consume scan/delete requests from the queue."""
    import logging
    import sys

    from providers.provider import Provider
    from providers.kyoo_client import KyooClient
    from .matcher import Matcher
    from .subscriber import Subscriber

    # logging.basicConfig is a no-op once the root logger has handlers, so the
    # original second call (to switch to DEBUG under -v) never took effect.
    # Decide the level first and configure exactly once.
    verbose = len(sys.argv) > 1 and sys.argv[1] == "-v"
    logging.basicConfig(level=logging.DEBUG if verbose else logging.INFO)
    # Those two libraries are very chatty; keep them at WARNING regardless.
    logging.getLogger("watchfiles").setLevel(logging.WARNING)
    logging.getLogger("rebulk").setLevel(logging.WARNING)

    async with KyooClient() as kyoo, Subscriber() as sub:
        provider, xem = Provider.get_all(kyoo.client)
        scanner = Matcher(kyoo, provider, xem)
        await sub.listen(scanner)

View File

@ -0,0 +1,6 @@
#!/usr/bin/env python
"""Run the matcher service (``python -m matcher``)."""
import asyncio

import matcher

# Guard the entry point so importing this module does not start the event loop.
if __name__ == "__main__":
    asyncio.run(matcher.main())

169
scanner/matcher/matcher.py Normal file
View File

@ -0,0 +1,169 @@
from datetime import timedelta
import asyncio
from logging import getLogger

from providers.implementations.thexem import TheXem
from providers.provider import Provider, ProviderError
from providers.types.collection import Collection
from providers.types.show import Show
from providers.types.episode import Episode, PartialShow
from providers.types.season import Season
from providers.kyoo_client import KyooClient

from .parser.guess import guessit
from .cache import cache, exec_as_cache, make_key

logger = getLogger(__name__)


class Matcher:
    """Identify video files against a metadata provider and register them on Kyoo."""

    def __init__(self, client: KyooClient, provider: Provider, xem: TheXem) -> None:
        self._client = client
        self._provider = provider
        self._xem = xem
        # Per-instance caches so the same collection/show/season identified by
        # concurrent requests is only created once on the API.
        self._collection_cache = {}
        self._show_cache = {}
        self._season_cache = {}

    async def delete(self, path: str):
        """Delete whatever is registered under `path`. Returns True on success."""
        try:
            await self._client.delete(path)
            return True
        except Exception as e:
            logger.exception("Unhandled error", exc_info=e)
            return False

    async def identify(self, path: str):
        """Identify and register `path`; files an issue on failure.

        Returns True when the message can be acked (identified, or a known
        provider error that was recorded as an issue raised no further work).
        """
        try:
            await self._identify(path)
            await self._client.delete_issue(path)
        except ProviderError as e:
            logger.error(e)
            await self._client.create_issue(path, str(e))
        except Exception as e:
            logger.exception("Unhandled error", exc_info=e)
            await self._client.create_issue(
                path, "Unknown error", {"type": type(e).__name__, "message": str(e)}
            )
            return False
        return True

    async def _identify(self, path: str):
        raw = guessit(path, xem_titles=await self._xem.get_expected_titles())

        # Non-video files (subtitles, nfo, ...) are silently skipped.
        if "mimetype" not in raw or not raw["mimetype"].startswith("video"):
            return
        # Remove seasons in "One Piece (1999) 152.mkv" for example
        if raw.get("season") == raw.get("year") and "season" in raw:
            del raw["season"]

        if isinstance(raw.get("season"), list):
            raise ProviderError(
                f"An episode can't have multiple seasons (found {raw.get('season')} for {path})"
            )
        if isinstance(raw.get("episode"), list):
            raise ProviderError(
                f"Multi-episodes files are not yet supported (for {path})"
            )

        # Fix: log message typo ("Identied" -> "Identified").
        logger.info("Identified %s: %s", path, raw)

        if raw["type"] == "movie":
            movie = await self._provider.identify_movie(raw["title"], raw.get("year"))
            movie.path = str(path)
            logger.debug("Got movie: %s", movie)
            movie_id = await self._client.post("movies", data=movie.to_kyoo())

            if any(movie.collections):
                ids = await asyncio.gather(
                    *(self.create_or_get_collection(x) for x in movie.collections)
                )
                await asyncio.gather(
                    *(self._client.link_collection(x, "movie", movie_id) for x in ids)
                )
        elif raw["type"] == "episode":
            episode = await self._provider.identify_episode(
                raw["title"],
                season=raw.get("season"),
                episode_nbr=raw.get("episode"),
                # When no season was guessed, treat the number as an absolute one.
                absolute=raw.get("episode") if "season" not in raw else None,
                year=raw.get("year"),
            )
            episode.path = str(path)
            logger.debug("Got episode: %s", episode)
            episode.show_id = await self.create_or_get_show(episode)

            if episode.season_number is not None:
                episode.season_id = await self.register_seasons(
                    episode.show, episode.show_id, episode.season_number
                )
            await self._client.post("episodes", data=episode.to_kyoo())
        else:
            # Fix: logger.warn is deprecated in favor of logger.warning.
            logger.warning("Unknown video file type: %s", raw["type"])

    async def create_or_get_collection(self, collection: Collection) -> str:
        @cache(ttl=timedelta(days=1), cache=self._collection_cache)
        async def create_collection(provider_id: str):
            # TODO: Check if a collection with the same metadata id exists already on kyoo.
            new_collection = (
                await self._provider.identify_collection(provider_id)
                if not any(collection.translations.keys())
                else collection
            )
            logger.debug("Got collection: %s", new_collection)
            return await self._client.post("collection", data=new_collection.to_kyoo())

        # The parameter is only used as a key for the cache.
        provider_id = collection.external_id[self._provider.name].data_id
        return await create_collection(provider_id)

    async def create_or_get_show(self, episode: Episode) -> str:
        @cache(ttl=timedelta(days=1), cache=self._show_cache)
        async def create_show(_: str):
            # TODO: Check if a show with the same metadata id exists already on kyoo.
            show = (
                await self._provider.identify_show(
                    episode.show.external_id[self._provider.name].data_id,
                )
                if isinstance(episode.show, PartialShow)
                else episode.show
            )
            # TODO: collections
            logger.debug("Got show: %s", episode)
            ret = await self._client.post("show", data=show.to_kyoo())

            async def create_season(season: Season, id: str):
                try:
                    season.show_id = id
                    return await self._client.post("seasons", data=season.to_kyoo())
                except Exception as e:
                    logger.exception("Unhandled error create a season", exc_info=e)

            # Seed the season cache so register_seasons won't re-create them.
            season_tasks = map(
                lambda s: exec_as_cache(
                    self._season_cache,
                    make_key((ret, s.season_number)),
                    lambda: create_season(s, ret),
                ),
                show.seasons,
            )
            await asyncio.gather(*season_tasks)

            return ret

        # The parameter is only used as a key for the cache.
        provider_id = episode.show.external_id[self._provider.name].data_id
        return await create_show(provider_id)

    async def register_seasons(
        self, show: Show | PartialShow, show_id: str, season_number: int
    ) -> str:
        # We use an external season cache because we want to edit this cache programatically
        @cache(ttl=timedelta(days=1), cache=self._season_cache)
        async def create_season(_: str, __: int):
            season = await self._provider.identify_season(
                show.external_id[self._provider.name].data_id, season_number
            )
            season.show_id = show_id
            return await self._client.post("seasons", data=season.to_kyoo())

        return await create_season(show_id, season_number)

View File

@ -0,0 +1,57 @@
import asyncio
from dataclasses import dataclass
from dataclasses_json import DataClassJsonMixin
from typing import Literal
import os
import logging
from aio_pika import connect_robust
from aio_pika.abc import AbstractIncomingMessage
from matcher.matcher import Matcher
logger = logging.getLogger(__name__)
@dataclass
class Message(DataClassJsonMixin):
action: Literal["scan", "delete"]
path: str
class Subscriber:
QUEUE = "scanner"
async def __aenter__(self):
self._con = await connect_robust(
host=os.environ.get("RABBITMQ_HOST", "rabbitmq"),
login=os.environ.get("RABBITMQ_DEFAULT_USER", "guest"),
password=os.environ.get("RABBITMQ_DEFAULT_PASS", "guest"),
)
self._channel = await self._con.channel()
self._queue = await self._channel.declare_queue(self.QUEUE)
return self
async def __aexit__(self, exc_type, exc_value, exc_tb):
await self._con.close()
async def listen(self, scanner: Matcher):
async def on_message(message: AbstractIncomingMessage):
msg = Message.from_json(message.body)
ack = False
match msg.action:
case "scan":
ack = await scanner.identify(msg.path)
case "delete":
ack = await scanner.delete(msg.path)
case _:
logger.error(f"Invalid action: {msg.action}")
if ack:
await message.ack()
else:
await message.reject()
# Allow up to 20 scan requests to run in parallel on the same listener.
# Since most work is calling API not doing that is a waste.
await self._channel.set_qos(prefetch_count=20)
await self._queue.consume(on_message)
await asyncio.Future()

View File

@ -1,14 +1,14 @@
import asyncio
import logging
from aiohttp import ClientSession
from datetime import datetime, timedelta
from logging import getLogger
from typing import Awaitable, Callable, Dict, List, Optional, Any, TypeVar
from itertools import accumulate, zip_longest
from providers.idmapper import IdMapper
from providers.implementations.thexem import TheXem
from providers.utils import ProviderError
from scanner.cache import cache
from matcher.cache import cache
from ..provider import Provider
from ..types.movie import Movie, MovieTranslation, Status as MovieStatus
@ -20,11 +20,13 @@ from ..types.metadataid import MetadataID
from ..types.show import Show, ShowTranslation, Status as ShowStatus
from ..types.collection import Collection, CollectionTranslation
logger = getLogger(__name__)
class TheMovieDatabase(Provider):
def __init__(
self,
languages,
languages: list[str],
client: ClientSession,
api_key: str,
xem: TheXem,
@ -158,7 +160,7 @@ class TheMovieDatabase(Provider):
"append_to_response": "alternative_titles,videos,credits,keywords,images",
},
)
logging.debug("TMDb responded: %s", movie)
logger.debug("TMDb responded: %s", movie)
ret = Movie(
original_language=movie["original_language"],
@ -256,7 +258,7 @@ class TheMovieDatabase(Provider):
"append_to_response": "alternative_titles,videos,credits,keywords,images,external_ids",
},
)
logging.debug("TMDb responded: %s", show)
logger.debug("TMDb responded: %s", show)
ret = Show(
original_language=show["original_language"],
@ -427,7 +429,7 @@ class TheMovieDatabase(Provider):
if self.name in ret.external_id:
return ret
logging.warn(
logger.warn(
"Could not map xem exception to themoviedb, searching instead for %s",
new_name,
)
@ -473,7 +475,7 @@ class TheMovieDatabase(Provider):
else None
)
if tvdb_id is None:
logging.info(
logger.info(
"Tvdb could not be found, trying xem name lookup for %s", name
)
_, tvdb_id = await self._xem.get_show_override("tvdb", old_name)
@ -518,7 +520,7 @@ class TheMovieDatabase(Provider):
},
not_found_fail=f"Could not find episode {episode_nbr} of season {season} of serie {name} (absolute: {absolute})",
)
logging.debug("TMDb responded: %s", episode)
logger.debug("TMDb responded: %s", episode)
ret = Episode(
show=show,
@ -616,7 +618,7 @@ class TheMovieDatabase(Provider):
grp = next(iter(group["groups"]), None)
return grp["episodes"] if grp else None
except Exception as e:
logging.exception(
logger.exception(
"Could not retrieve absolute ordering information", exc_info=e
)
return None
@ -697,7 +699,7 @@ class TheMovieDatabase(Provider):
"language": lng,
},
)
logging.debug("TMDb responded: %s", collection)
logger.debug("TMDb responded: %s", collection)
ret = Collection(
external_id={

View File

@ -1,11 +1,13 @@
import re
import logging
from typing import Dict, List, Literal
from aiohttp import ClientSession
from logging import getLogger
from datetime import timedelta
from providers.utils import ProviderError
from scanner.cache import cache
from matcher.cache import cache
logger = getLogger(__name__)
def clean(s: str):
@ -28,7 +30,7 @@ class TheXem:
async def get_map(
self, provider: Literal["tvdb"] | Literal["anidb"]
) -> Dict[str, List[Dict[str, int]]]:
logging.info("Fetching data from thexem for %s", provider)
logger.info("Fetching data from thexem for %s", provider)
async with self._client.get(
f"{self.base}/map/allNames",
params={
@ -40,7 +42,7 @@ class TheXem:
r.raise_for_status()
ret = await r.json()
if "data" not in ret or ret["result"] == "failure":
logging.error("Could not fetch xem metadata. Error: %s", ret["message"])
logger.error("Could not fetch xem metadata. Error: %s", ret["message"])
raise ProviderError("Could not fetch xem metadata")
return ret["data"]
@ -53,7 +55,7 @@ class TheXem:
Dict[Literal["season"] | Literal["episode"] | Literal["absolute"], int],
]
]:
logging.info("Fetching from thexem the map of %s (%s)", id, provider)
logger.info("Fetching from thexem the map of %s (%s)", id, provider)
async with self._client.get(
f"{self.base}/map/all",
params={
@ -64,7 +66,7 @@ class TheXem:
r.raise_for_status()
ret = await r.json()
if "data" not in ret or ret["result"] == "failure":
logging.error("Could not fetch xem mapping. Error: %s", ret["message"])
logger.error("Could not fetch xem mapping. Error: %s", ret["message"])
return []
return ret["data"]
@ -111,7 +113,7 @@ class TheXem:
if master_season is None or master_season == -1:
return [None, None, episode]
logging.info(
logger.info(
"Fount xem override for show %s, ep %d. Master season: %d",
show_name,
episode,
@ -130,7 +132,7 @@ class TheXem:
None,
)
if ep is None:
logging.warning(
logger.warning(
"Could not get xem mapping for show %s, falling back to identifier mapping.",
show_name,
)

View File

@ -0,0 +1,156 @@
import os
import jsons
from aiohttp import ClientSession
from datetime import date
from logging import getLogger
from typing import List, Literal, Any, Optional
from urllib.parse import quote

from .utils import format_date

logger = getLogger(__name__)


class KyooClient:
    """Thin async client for the Kyoo HTTP API (authenticated via X-API-Key)."""

    def __init__(self) -> None:
        self._api_key = os.environ.get("KYOO_APIKEY")
        if not self._api_key:
            # Fallback: first key of the comma-separated KYOO_APIKEYS list.
            self._api_key = os.environ.get("KYOO_APIKEYS")
            if not self._api_key:
                print("Missing environment variable 'KYOO_APIKEY'.")
                exit(2)
            self._api_key = self._api_key.split(",")[0]
        self._url = os.environ.get("KYOO_URL", "http://back:5000")

    async def __aenter__(self):
        jsons.set_serializer(lambda x, **_: format_date(x), type[Optional[date | int]])
        self.client = ClientSession(
            json_serialize=lambda *args, **kwargs: jsons.dumps(
                *args, key_transformer=jsons.KEY_TRANSFORMER_CAMELCASE, **kwargs
            ),
        )
        return self

    async def __aexit__(self, exc_type, exc_value, exc_tb):
        await self.client.close()

    async def get_registered_paths(self) -> List[str]:
        """Return every episode and movie path already known to Kyoo."""
        paths = None
        async with self.client.get(
            f"{self._url}/episodes",
            params={"limit": 0},
            headers={"X-API-Key": self._api_key},
        ) as r:
            r.raise_for_status()
            ret = await r.json()
            paths = list(x["path"] for x in ret["items"])

        async with self.client.get(
            f"{self._url}/movies",
            params={"limit": 0},
            headers={"X-API-Key": self._api_key},
        ) as r:
            r.raise_for_status()
            ret = await r.json()
            paths += list(x["path"] for x in ret["items"])
        return paths

    async def create_issue(self, path: str, issue: str, extra: dict | None = None):
        async with self.client.post(
            f"{self._url}/issues",
            json={
                "domain": "scanner",
                "cause": path,
                "reason": issue,
                "extra": extra if extra is not None else {},
            },
            headers={"X-API-Key": self._api_key},
        ) as r:
            if not r.ok:
                logger.error(f"Request error: {await r.text()}")
                r.raise_for_status()

    async def delete_issue(self, path: str):
        async with self.client.delete(
            f'{self._url}/issues?filter=domain eq scanner and cause eq "{path}"',
            headers={"X-API-Key": self._api_key},
        ) as r:
            if not r.ok:
                logger.error(f"Request error: {await r.text()}")
                r.raise_for_status()

    async def link_collection(
        self, collection: str, type: Literal["movie"] | Literal["show"], id: str
    ):
        async with self.client.put(
            f"{self._url}/collections/{collection}/{type}/{id}",
            headers={"X-API-Key": self._api_key},
        ) as r:
            # Allow 409 and continue as if it worked.
            if not r.ok and r.status != 409:
                logger.error(f"Request error: {await r.text()}")
                r.raise_for_status()

    async def post(self, path: str, *, data: dict[str, Any]) -> str:
        """POST an entity and return its id, disambiguating slug conflicts by year."""
        logger.debug(
            "Sending %s: %s",
            path,
            jsons.dumps(
                data,
                key_transformer=jsons.KEY_TRANSFORMER_CAMELCASE,
                jdkwargs={"indent": 4},
            ),
        )
        async with self.client.post(
            f"{self._url}/{path}",
            json=data,
            headers={"X-API-Key": self._api_key},
        ) as r:
            # Allow 409 and continue as if it worked.
            if not r.ok and r.status != 409:
                logger.error(f"Request error: {await r.text()}")
                r.raise_for_status()
            ret = await r.json()

            if r.status == 409 and (
                (path == "shows" and ret["startAir"][:4] != str(data["start_air"].year))
                or (
                    path == "movies"
                    and ret["airDate"][:4] != str(data["air_date"].year)
                )
            ):
                logger.info(
                    f"Found a {path} with the same slug ({ret['slug']}) and a different date, using the date as part of the slug"
                )
                # Fix: the original tested `path == "movie"` here, which is never
                # true (paths are "shows"/"movies"), so shows read the missing
                # "air_date" key and crashed. Shows use start_air, movies air_date.
                year = (data["start_air"] if path == "shows" else data["air_date"]).year
                data["slug"] = f"{ret['slug']}-{year}"
                return await self.post(path, data=data)
            return ret["id"]

    async def delete(
        self,
        path: str,
        type: Literal["episode", "movie"] | None = None,
    ):
        """Delete the movie and/or episode at `path`, plus any related issue."""
        logger.info("Deleting %s", path)

        if type is None or type == "movie":
            async with self.client.delete(
                f'{self._url}/movies?filter=path eq "{quote(path)}"',
                headers={"X-API-Key": self._api_key},
            ) as r:
                if not r.ok:
                    logger.error(f"Request error: {await r.text()}")
                    r.raise_for_status()

        if type is None or type == "episode":
            async with self.client.delete(
                f'{self._url}/episodes?filter=path eq "{quote(path)}"',
                headers={"X-API-Key": self._api_key},
            ) as r:
                if not r.ok:
                    logger.error(f"Request error: {await r.text()}")
                    r.raise_for_status()

        await self.delete_issue(path)

View File

@ -1,7 +1,7 @@
import os
from aiohttp import ClientSession
from abc import abstractmethod, abstractproperty
from typing import Optional, TypeVar
from typing import Optional, Self
from providers.implementations.thexem import TheXem
from providers.utils import ProviderError
@ -13,14 +13,14 @@ from .types.movie import Movie
from .types.collection import Collection
Self = TypeVar("Self", bound="Provider")
class Provider:
@classmethod
def get_all(
cls: type[Self], client: ClientSession, languages: list[str]
) -> tuple[list[Self], TheXem]:
def get_all(cls, client: ClientSession) -> tuple[Self, TheXem]:
languages = os.environ.get("LIBRARY_LANGUAGES")
if not languages:
print("Missing environment variable 'LIBRARY_LANGUAGES'.")
exit(2)
languages = languages.split(",")
providers = []
from providers.idmapper import IdMapper
@ -44,7 +44,7 @@ class Provider:
idmapper.init(tmdb=tmdb, language=languages[0])
return providers, xem
return next(iter(providers)), xem
@abstractproperty
def name(self) -> str:

View File

@ -2,3 +2,5 @@ guessit
aiohttp
jsons
watchfiles
aio-pika
dataclasses-json

View File

@ -2,46 +2,17 @@ async def main():
import asyncio
import os
import logging
import sys
import jsons
from datetime import date
from typing import Optional
from aiohttp import ClientSession
from providers.utils import format_date, ProviderError
from .scanner import Scanner
from .monitor import monitor
from .scanner import scan
from .publisher import Publisher
from providers.kyoo_client import KyooClient
path = os.environ.get("SCANNER_LIBRARY_ROOT", "/video")
languages = os.environ.get("LIBRARY_LANGUAGES")
if not languages:
print("Missing environment variable 'LIBRARY_LANGUAGES'.")
exit(2)
api_key = os.environ.get("KYOO_APIKEY")
if not api_key:
api_key = os.environ.get("KYOO_APIKEYS")
if not api_key:
print("Missing environment variable 'KYOO_APIKEY'.")
exit(2)
api_key = api_key.split(",")[0]
if len(sys.argv) > 1 and sys.argv[1] == "-v":
logging.basicConfig(level=logging.INFO)
if len(sys.argv) > 1 and sys.argv[1] == "-vv":
logging.basicConfig(level=logging.DEBUG)
logging.getLogger("watchfiles").setLevel(logging.WARNING)
logging.getLogger("rebulk").setLevel(logging.WARNING)
jsons.set_serializer(lambda x, **_: format_date(x), Optional[date | int]) # type: ignore
async with ClientSession(
json_serialize=lambda *args, **kwargs: jsons.dumps(
*args, key_transformer=jsons.KEY_TRANSFORMER_CAMELCASE, **kwargs
),
) as client:
try:
scanner = Scanner(client, languages=languages.split(","), api_key=api_key)
async with Publisher() as publisher, KyooClient() as client:
path = os.environ.get("SCANNER_LIBRARY_ROOT", "/video")
await asyncio.gather(
monitor(path, scanner),
scanner.scan(path),
monitor(path, publisher),
scan(path, publisher, client),
)
except ProviderError as e:
logging.error(e)

View File

@ -1,22 +1,19 @@
import logging
from logging import getLogger
from watchfiles import awatch, Change
from .utils import ProviderError
from .scanner import Scanner
from .publisher import Publisher
logger = getLogger(__name__)
async def monitor(path: str, scanner: Scanner):
async def monitor(path: str, publisher: Publisher):
async for changes in awatch(path):
for event, file in changes:
try:
if event == Change.added:
await scanner.identify(file)
await publisher.add(file)
elif event == Change.deleted:
await scanner.delete(file)
await publisher.delete(file)
elif event == Change.modified:
pass
else:
print(f"Change {event} occured for file {file}")
except ProviderError as e:
logging.error(str(e))
except Exception as e:
logging.exception("Unhandled error", exc_info=e)
logger.info(f"Change {event} occured for file {file}")

View File

@ -0,0 +1,32 @@
import os
import json

from aio_pika import Message, connect_robust


class Publisher:
    """Publish scan/delete requests for the matcher on the "scanner" queue."""

    QUEUE = "scanner"

    async def __aenter__(self):
        self._con = await connect_robust(
            host=os.environ.get("RABBITMQ_HOST", "rabbitmq"),
            login=os.environ.get("RABBITMQ_DEFAULT_USER", "guest"),
            password=os.environ.get("RABBITMQ_DEFAULT_PASS", "guest"),
        )
        self._channel = await self._con.channel()
        self._queue = await self._channel.declare_queue(self.QUEUE)
        return self

    async def __aexit__(self, exc_type, exc_value, exc_tb):
        await self._con.close()

    async def _publish(self, data: dict):
        # Fix: `json` was imported from guessit.jsonutils (a third-party
        # internal module); the stdlib json module serializes these plain
        # str dicts identically and removes the fragile dependency.
        await self._channel.default_exchange.publish(
            Message(json.dumps(data).encode()),
            routing_key=self.QUEUE,
        )

    async def add(self, path: str):
        await self._publish({"action": "scan", "path": path})

    async def delete(self, path: str):
        await self._publish({"action": "delete", "path": path})

View File

@ -0,0 +1 @@
aio-pika

View File

@ -1,298 +1,36 @@
from datetime import timedelta
import os
import asyncio
import logging
import jsons
import re
from aiohttp import ClientSession
from pathlib import Path
from typing import List, Literal, Any
from urllib.parse import quote
from providers.provider import Provider, ProviderError
from providers.types.collection import Collection
from providers.types.show import Show
from providers.types.episode import Episode, PartialShow
from providers.types.season import Season
from .parser.guess import guessit
from .utils import batch, handle_errors
from .cache import cache, exec_as_cache, make_key
import asyncio
from logging import getLogger
from .publisher import Publisher
from providers.kyoo_client import KyooClient
logger = getLogger(__name__)
class Scanner:
def __init__(
self, client: ClientSession, *, languages: list[str], api_key: str
) -> None:
self._client = client
self._api_key = api_key
self._url = os.environ.get("KYOO_URL", "http://back:5000")
async def scan(path: str, publisher: Publisher, client: KyooClient):
logger.info("Starting the scan. It can take some times...")
ignore_pattern = None
try:
self._ignore_pattern = re.compile(
os.environ.get("LIBRARY_IGNORE_PATTERN", "")
)
ignore_pattern = re.compile(os.environ.get("LIBRARY_IGNORE_PATTERN", ""))
except Exception as e:
self._ignore_pattern = re.compile("")
logging.error(f"Invalid ignore pattern. Ignoring. Error: {e}")
[self.provider, *_], self._xem = Provider.get_all(client, languages)
self.languages = languages
ignore_pattern = re.compile("")
logger.error(f"Invalid ignore pattern. Ignoring. Error: {e}")
self._collection_cache = {}
self._show_cache = {}
self._season_cache = {}
registered = await client.get_registered_paths()
videos = [
os.path.join(dir, file) for dir, _, files in os.walk(path) for file in files
]
to_register = [
p for p in videos if p not in registered and not ignore_pattern.match(p)
]
deleted = [x for x in registered if x not in videos]
async def scan(self, path: str):
logging.info("Starting the scan. It can take some times...")
self.registered = await self.get_registered_paths()
self.issues = await self.get_issues()
videos = [str(p) for p in Path(path).rglob("*") if p.is_file()]
deleted = [x for x in self.registered if x not in videos]
if len(deleted) != len(self.registered):
for x in deleted:
await self.delete(x)
for x in self.issues:
if x not in videos:
await self.delete(x, "issue")
if len(deleted) != len(registered):
await asyncio.gather(*map(publisher.delete, deleted))
elif len(deleted) > 0:
logging.warning("All video files are unavailable. Check your disks.")
logger.warning("All video files are unavailable. Check your disks.")
# We batch videos by 20 because too mutch at once kinda DDOS everything.
for group in batch(iter(videos), 20):
await asyncio.gather(*map(self.identify, group))
logging.info("Scan finished.")
async def get_registered_paths(self) -> List[str]:
paths = None
async with self._client.get(
f"{self._url}/episodes",
params={"limit": 0},
headers={"X-API-Key": self._api_key},
) as r:
r.raise_for_status()
ret = await r.json()
paths = list(x["path"] for x in ret["items"])
async with self._client.get(
f"{self._url}/movies",
params={"limit": 0},
headers={"X-API-Key": self._api_key},
) as r:
r.raise_for_status()
ret = await r.json()
paths += list(x["path"] for x in ret["items"])
return paths
async def get_issues(self) -> List[str]:
async with self._client.get(
f"{self._url}/issues",
params={"limit": 0},
headers={"X-API-Key": self._api_key},
) as r:
r.raise_for_status()
ret = await r.json()
return [x["cause"] for x in ret if x["domain"] == "scanner"]
@handle_errors
async def identify(self, path: str):
if path in self.registered or self._ignore_pattern.match(path):
return
raw = guessit(path, xem_titles=await self._xem.get_expected_titles())
if "mimetype" not in raw or not raw["mimetype"].startswith("video"):
return
# Remove seasons in "One Piece (1999) 152.mkv" for example
if raw.get("season") == raw.get("year") and "season" in raw:
del raw["season"]
if isinstance(raw.get("season"), List):
raise ProviderError(
f"An episode can't have multiple seasons (found {raw.get('season')} for {path})"
)
if isinstance(raw.get("episode"), List):
raise ProviderError(
f"Multi-episodes files are not yet supported (for {path})"
)
logging.info("Identied %s: %s", path, raw)
if raw["type"] == "movie":
movie = await self.provider.identify_movie(raw["title"], raw.get("year"))
movie.path = str(path)
logging.debug("Got movie: %s", movie)
movie_id = await self.post("movies", data=movie.to_kyoo())
if any(movie.collections):
ids = await asyncio.gather(
*(self.create_or_get_collection(x) for x in movie.collections)
)
await asyncio.gather(
*(self.link_collection(x, "movie", movie_id) for x in ids)
)
elif raw["type"] == "episode":
episode = await self.provider.identify_episode(
raw["title"],
season=raw.get("season"),
episode_nbr=raw.get("episode"),
absolute=raw.get("episode") if "season" not in raw else None,
year=raw.get("year"),
)
episode.path = str(path)
logging.debug("Got episode: %s", episode)
episode.show_id = await self.create_or_get_show(episode)
if episode.season_number is not None:
episode.season_id = await self.register_seasons(
episode.show, episode.show_id, episode.season_number
)
await self.post("episodes", data=episode.to_kyoo())
else:
logging.warn("Unknown video file type: %s", raw["type"])
async def create_or_get_collection(self, collection: Collection) -> str:
@cache(ttl=timedelta(days=1), cache=self._collection_cache)
async def create_collection(provider_id: str):
# TODO: Check if a collection with the same metadata id exists already on kyoo.
new_collection = (
await self.provider.identify_collection(provider_id)
if not any(collection.translations.keys())
else collection
)
logging.debug("Got collection: %s", new_collection)
return await self.post("collection", data=new_collection.to_kyoo())
# The parameter is only used as a key for the cache.
provider_id = collection.external_id[self.provider.name].data_id
return await create_collection(provider_id)
async def link_collection(
self, collection: str, type: Literal["movie"] | Literal["show"], id: str
):
async with self._client.put(
f"{self._url}/collections/{collection}/{type}/{id}",
headers={"X-API-Key": self._api_key},
) as r:
# Allow 409 and continue as if it worked.
if not r.ok and r.status != 409:
logging.error(f"Request error: {await r.text()}")
r.raise_for_status()
async def create_or_get_show(self, episode: Episode) -> str:
@cache(ttl=timedelta(days=1), cache=self._show_cache)
async def create_show(_: str):
# TODO: Check if a show with the same metadata id exists already on kyoo.
show = (
await self.provider.identify_show(
episode.show.external_id[self.provider.name].data_id,
)
if isinstance(episode.show, PartialShow)
else episode.show
)
# TODO: collections
logging.debug("Got show: %s", episode)
ret = await self.post("show", data=show.to_kyoo())
async def create_season(season: Season, id: str):
try:
season.show_id = id
return await self.post("seasons", data=season.to_kyoo())
except Exception as e:
logging.exception("Unhandled error create a season", exc_info=e)
season_tasks = map(
lambda s: exec_as_cache(
self._season_cache,
make_key((ret, s.season_number)),
lambda: create_season(s, ret),
),
show.seasons,
)
await asyncio.gather(*season_tasks)
return ret
# The parameter is only used as a key for the cache.
provider_id = episode.show.external_id[self.provider.name].data_id
return await create_show(provider_id)
async def register_seasons(
self, show: Show | PartialShow, show_id: str, season_number: int
) -> str:
# We use an external season cache because we want to edit this cache programatically
@cache(ttl=timedelta(days=1), cache=self._season_cache)
async def create_season(_: str, __: int):
season = await self.provider.identify_season(
show.external_id[self.provider.name].data_id, season_number
)
season.show_id = show_id
return await self.post("seasons", data=season.to_kyoo())
return await create_season(show_id, season_number)
async def post(self, path: str, *, data: dict[str, Any]) -> str:
logging.debug(
"Sending %s: %s",
path,
jsons.dumps(
data,
key_transformer=jsons.KEY_TRANSFORMER_CAMELCASE,
jdkwargs={"indent": 4},
),
)
async with self._client.post(
f"{self._url}/{path}",
json=data,
headers={"X-API-Key": self._api_key},
) as r:
# Allow 409 and continue as if it worked.
if not r.ok and r.status != 409:
logging.error(f"Request error: {await r.text()}")
r.raise_for_status()
ret = await r.json()
if r.status == 409 and (
(path == "shows" and ret["startAir"][:4] != str(data["start_air"].year))
or (
path == "movies"
and ret["airDate"][:4] != str(data["air_date"].year)
)
):
logging.info(
f"Found a {path} with the same slug ({ret['slug']}) and a different date, using the date as part of the slug"
)
year = (data["start_air"] if path == "movie" else data["air_date"]).year
data["slug"] = f"{ret['slug']}-{year}"
return await self.post(path, data=data)
return ret["id"]
async def delete(
self,
path: str,
type: Literal["episode", "movie", "issue"] | None = None,
):
logging.info("Deleting %s", path)
self.registered = filter(lambda x: x != path, self.registered)
if type is None or type == "movie":
async with self._client.delete(
f'{self._url}/movies?filter=path eq "{quote(path)}"',
headers={"X-API-Key": self._api_key},
) as r:
if not r.ok:
logging.error(f"Request error: {await r.text()}")
r.raise_for_status()
if type is None or type == "episode":
async with self._client.delete(
f'{self._url}/episodes?filter=path eq "{quote(path)}"',
headers={"X-API-Key": self._api_key},
) as r:
if not r.ok:
logging.error(f"Request error: {await r.text()}")
r.raise_for_status()
if path in self.issues:
self.issues = filter(lambda x: x != path, self.issues)
await self._client.delete(
f'{self._url}/issues?filter=domain eq scanner and cause eq "{quote(path)}"',
headers={"X-API-Key": self._api_key},
)
await asyncio.gather(*map(publisher.add, to_register))
logger.info("Scan finished.")

View File

@ -1,56 +0,0 @@
from __future__ import annotations
import logging
from functools import wraps
from itertools import islice
from typing import TYPE_CHECKING, Iterator, List, TypeVar
from providers.utils import ProviderError
if TYPE_CHECKING:
from scanner.scanner import Scanner
T = TypeVar("T")
def batch(iterable: Iterator[T], n: int) -> Iterator[List[T]]:
"Batch data into lists of length n. The last batch may be shorter."
# batched('ABCDEFG', 3) --> ABC DEF G
it = iter(iterable)
while True:
batch = list(islice(it, n))
if not batch:
return
yield batch
def handle_errors(f):
@wraps(f)
async def internal(self: Scanner, path: str):
try:
await f(self, path)
if path in self.issues:
await self._client.delete(
f'{self._url}/issues?filter=domain eq scanner and cause eq "{path}"',
headers={"X-API-Key": self._api_key},
)
except ProviderError as e:
logging.error(str(e))
await self._client.post(
f"{self._url}/issues",
json={"domain": "scanner", "cause": path, "reason": str(e)},
headers={"X-API-Key": self._api_key},
)
except Exception as e:
logging.exception("Unhandled error", exc_info=e)
await self._client.post(
f"{self._url}/issues",
json={
"domain": "scanner",
"cause": path,
"reason": "Unknown error",
"extra": {"type": type(e).__name__, "message": str(e)},
},
headers={"X-API-Key": self._api_key},
)
return internal

View File

@ -6,6 +6,7 @@
jsons
watchfiles
pika
aio-pika
requests
dataclasses-json
]);