Split fs scaning and metadata matching

This commit is contained in:
Zoe Roux 2024-04-08 00:15:02 +02:00
parent 00c41fb704
commit e284f771df
No known key found for this signature in database
13 changed files with 261 additions and 204 deletions

2
scanner/.dockerignore Normal file
View File

@ -0,0 +1,2 @@
Dockerfile*

View File

@ -0,0 +1,17 @@
async def main():
import asyncio
import os
import logging
from .monitor import monitor
from .scanner import scan
from .publisher import Publisher
from providers.kyoo_client import KyooClient
logging.getLogger("watchfiles").setLevel(logging.WARNING)
async with Publisher() as publisher, KyooClient() as client:
path = os.environ.get("SCANNER_LIBRARY_ROOT", "/video")
await asyncio.gather(
monitor(path, publisher),
scan(path),
)

View File

@ -0,0 +1,6 @@
#!/usr/bin/env python
import asyncio
import monitor
asyncio.run(monitor.main())

View File

@ -0,0 +1,19 @@
from logging import getLogger
from watchfiles import awatch, Change
from monitor.publisher import Publisher
logger = getLogger(__name__)
async def monitor(path: str, publisher: Publisher):
async for changes in awatch(path):
for event, file in changes:
if event == Change.added:
await publisher.add(file)
elif event == Change.deleted:
await publisher.delete(file)
elif event == Change.modified:
pass
else:
logger.info(f"Change {event} occured for file {file}")

View File

@ -0,0 +1,32 @@
import os
from guessit.jsonutils import json
from aio_pika import Message, connect_robust
class Publisher:
QUEUE = "scanner"
async def __aenter__(self):
self._con = await connect_robust(
host=os.environ.get("RABBITMQ_HOST", "rabbitmq"),
login=os.environ.get("RABBITMQ_DEFAULT_USER", "guest"),
password=os.environ.get("RABBITMQ_DEFAULT_PASS", "guest"),
)
self._channel = await self._con.channel()
self._queue = await self._channel.declare_queue(self.QUEUE)
return self
async def __aexit__(self, exc_type, exc_value, exc_tb):
await self._con.close()
async def _publish(self, data: dict):
await self._channel.default_exchange.publish(
Message(json.dumps(data).encode()),
routing_key=self.QUEUE,
)
async def add(self, path: str):
await self._publish({"action": "scan", "path": path})
async def delete(self, path: str):
await self._publish({"action": "delete", "path": path})

View File

@ -0,0 +1 @@
aio-pika

View File

@ -0,0 +1,32 @@
from logging import getLogger
logger = getLogger(__name__)
async def scan(path: str):
logger.info("Starting the scan. It can take some times...")
registered = await _get_registered_paths()
self.issues = await self.get_issues()
videos = [str(p) for p in Path(path).rglob("*") if p.is_file()]
deleted = [x for x in self.registered if x not in videos]
# try:
# self._ignore_pattern = re.compile(
# os.environ.get("LIBRARY_IGNORE_PATTERN", "")
# )
# except Exception as e:
# self._ignore_pattern = re.compile("")
# logging.error(f"Invalid ignore pattern. Ignoring. Error: {e}")
if len(deleted) != len(self.registered):
for x in deleted:
await self.delete(x)
for x in self.issues:
if x not in videos:
await self.delete(x, "issue")
elif len(deleted) > 0:
logging.warning("All video files are unavailable. Check your disks.")
# We batch videos by 20 because too mutch at once kinda DDOS everything.
for group in batch(iter(videos), 20):
await asyncio.gather(*map(self.identify, group))
logging.info("Scan finished.")

View File

@ -0,0 +1,117 @@
from datetime import timedelta
import os
import asyncio
import logging
import jsons
import re
from aiohttp import ClientSession
from pathlib import Path
from typing import List, Literal, Any
from urllib.parse import quote
from providers.provider import Provider, ProviderError
from providers.types.collection import Collection
from providers.types.show import Show
from providers.types.episode import Episode, PartialShow
from providers.types.season import Season
class KyooClient:
def __init__(
self, client: ClientSession, *, api_key: str
) -> None:
self._client = client
self._api_key = api_key
self._url = os.environ.get("KYOO_URL", "http://back:5000")
async def get_issues(self) -> List[str]:
async with self._client.get(
f"{self._url}/issues",
params={"limit": 0},
headers={"X-API-Key": self._api_key},
) as r:
r.raise_for_status()
ret = await r.json()
return [x["cause"] for x in ret if x["domain"] == "scanner"]
async def link_collection(
self, collection: str, type: Literal["movie"] | Literal["show"], id: str
):
async with self._client.put(
f"{self._url}/collections/{collection}/{type}/{id}",
headers={"X-API-Key": self._api_key},
) as r:
# Allow 409 and continue as if it worked.
if not r.ok and r.status != 409:
logging.error(f"Request error: {await r.text()}")
r.raise_for_status()
async def post(self, path: str, *, data: dict[str, Any]) -> str:
logging.debug(
"Sending %s: %s",
path,
jsons.dumps(
data,
key_transformer=jsons.KEY_TRANSFORMER_CAMELCASE,
jdkwargs={"indent": 4},
),
)
async with self._client.post(
f"{self._url}/{path}",
json=data,
headers={"X-API-Key": self._api_key},
) as r:
# Allow 409 and continue as if it worked.
if not r.ok and r.status != 409:
logging.error(f"Request error: {await r.text()}")
r.raise_for_status()
ret = await r.json()
if r.status == 409 and (
(path == "shows" and ret["startAir"][:4] != str(data["start_air"].year))
or (
path == "movies"
and ret["airDate"][:4] != str(data["air_date"].year)
)
):
logging.info(
f"Found a {path} with the same slug ({ret['slug']}) and a different date, using the date as part of the slug"
)
year = (data["start_air"] if path == "movie" else data["air_date"]).year
data["slug"] = f"{ret['slug']}-{year}"
return await self.post(path, data=data)
return ret["id"]
async def delete(
self,
path: str,
type: Literal["episode", "movie", "issue"] | None = None,
):
logging.info("Deleting %s", path)
self.registered = filter(lambda x: x != path, self.registered)
if type is None or type == "movie":
async with self._client.delete(
f'{self._url}/movies?filter=path eq "{quote(path)}"',
headers={"X-API-Key": self._api_key},
) as r:
if not r.ok:
logging.error(f"Request error: {await r.text()}")
r.raise_for_status()
if type is None or type == "episode":
async with self._client.delete(
f'{self._url}/episodes?filter=path eq "{quote(path)}"',
headers={"X-API-Key": self._api_key},
) as r:
if not r.ok:
logging.error(f"Request error: {await r.text()}")
r.raise_for_status()
if path in self.issues:
self.issues = filter(lambda x: x != path, self.issues)
await self._client.delete(
f'{self._url}/issues?filter=domain eq scanner and cause eq "{quote(path)}"',
headers={"X-API-Key": self._api_key},
)

View File

@ -2,3 +2,4 @@ guessit
aiohttp aiohttp
jsons jsons
watchfiles watchfiles
aio-pika

View File

@ -1,3 +1,6 @@
from providers.kyoo_client import KyooClient
async def main(): async def main():
import asyncio import asyncio
import os import os
@ -9,7 +12,6 @@ async def main():
from aiohttp import ClientSession from aiohttp import ClientSession
from providers.utils import format_date, ProviderError from providers.utils import format_date, ProviderError
from .scanner import Scanner from .scanner import Scanner
from .monitor import monitor
path = os.environ.get("SCANNER_LIBRARY_ROOT", "/video") path = os.environ.get("SCANNER_LIBRARY_ROOT", "/video")
languages = os.environ.get("LIBRARY_LANGUAGES") languages = os.environ.get("LIBRARY_LANGUAGES")
@ -37,11 +39,9 @@ async def main():
*args, key_transformer=jsons.KEY_TRANSFORMER_CAMELCASE, **kwargs *args, key_transformer=jsons.KEY_TRANSFORMER_CAMELCASE, **kwargs
), ),
) as client: ) as client:
kyoo = KyooClient(client, api_key=api_key)
provider =
try: try:
scanner = Scanner(client, languages=languages.split(","), api_key=api_key) scanner = Scanner(kyoo, languages=languages.split(","), api_key=api_key)
await asyncio.gather(
monitor(path, scanner),
scanner.scan(path),
)
except ProviderError as e: except ProviderError as e:
logging.error(e) logging.error(e)

View File

@ -1,22 +0,0 @@
import logging
from watchfiles import awatch, Change
from .utils import ProviderError
from .scanner import Scanner
async def monitor(path: str, scanner: Scanner):
async for changes in awatch(path):
for event, file in changes:
try:
if event == Change.added:
await scanner.identify(file)
elif event == Change.deleted:
await scanner.delete(file)
elif event == Change.modified:
pass
else:
print(f"Change {event} occured for file {file}")
except ProviderError as e:
logging.error(str(e))
except Exception as e:
logging.exception("Unhandled error", exc_info=e)

View File

@ -1,101 +1,33 @@
from datetime import timedelta from datetime import timedelta
import os
import asyncio import asyncio
import logging import logging
import jsons from providers.implementations.thexem import TheXem
import re
from aiohttp import ClientSession
from pathlib import Path
from typing import List, Literal, Any
from urllib.parse import quote
from providers.provider import Provider, ProviderError from providers.provider import Provider, ProviderError
from providers.types.collection import Collection from providers.types.collection import Collection
from providers.types.show import Show from providers.types.show import Show
from providers.types.episode import Episode, PartialShow from providers.types.episode import Episode, PartialShow
from providers.types.season import Season from providers.types.season import Season
from providers.kyoo_client import KyooClient
from .parser.guess import guessit from .parser.guess import guessit
from .utils import batch, handle_errors from .utils import handle_errors
from .cache import cache, exec_as_cache, make_key from .cache import cache, exec_as_cache, make_key
class Scanner: class Scanner:
def __init__( def __init__(self, client: KyooClient, provider: Provider, xem: TheXem) -> None:
self, client: ClientSession, *, languages: list[str], api_key: str
) -> None:
self._client = client self._client = client
self._api_key = api_key self._provider = provider
self._url = os.environ.get("KYOO_URL", "http://back:5000") self._xem = xem
try:
self._ignore_pattern = re.compile(
os.environ.get("LIBRARY_IGNORE_PATTERN", "")
)
except Exception as e:
self._ignore_pattern = re.compile("")
logging.error(f"Invalid ignore pattern. Ignoring. Error: {e}")
[self.provider, *_], self._xem = Provider.get_all(client, languages)
self.languages = languages
self._collection_cache = {} self._collection_cache = {}
self._show_cache = {} self._show_cache = {}
self._season_cache = {} self._season_cache = {}
async def scan(self, path: str):
logging.info("Starting the scan. It can take some times...")
self.registered = await self.get_registered_paths()
self.issues = await self.get_issues()
videos = [str(p) for p in Path(path).rglob("*") if p.is_file()]
deleted = [x for x in self.registered if x not in videos]
if len(deleted) != len(self.registered):
for x in deleted:
await self.delete(x)
for x in self.issues:
if x not in videos:
await self.delete(x, "issue")
elif len(deleted) > 0:
logging.warning("All video files are unavailable. Check your disks.")
# We batch videos by 20 because too mutch at once kinda DDOS everything.
for group in batch(iter(videos), 20):
await asyncio.gather(*map(self.identify, group))
logging.info("Scan finished.")
async def get_registered_paths(self) -> List[str]:
paths = None
async with self._client.get(
f"{self._url}/episodes",
params={"limit": 0},
headers={"X-API-Key": self._api_key},
) as r:
r.raise_for_status()
ret = await r.json()
paths = list(x["path"] for x in ret["items"])
async with self._client.get(
f"{self._url}/movies",
params={"limit": 0},
headers={"X-API-Key": self._api_key},
) as r:
r.raise_for_status()
ret = await r.json()
paths += list(x["path"] for x in ret["items"])
return paths
async def get_issues(self) -> List[str]:
async with self._client.get(
f"{self._url}/issues",
params={"limit": 0},
headers={"X-API-Key": self._api_key},
) as r:
r.raise_for_status()
ret = await r.json()
return [x["cause"] for x in ret if x["domain"] == "scanner"]
@handle_errors @handle_errors
async def identify(self, path: str): async def identify(self, path: str):
if path in self.registered or self._ignore_pattern.match(path): # if path in self.registered or self._ignore_pattern.match(path):
return # return
#
raw = guessit(path, xem_titles=await self._xem.get_expected_titles()) raw = guessit(path, xem_titles=await self._xem.get_expected_titles())
if "mimetype" not in raw or not raw["mimetype"].startswith("video"): if "mimetype" not in raw or not raw["mimetype"].startswith("video"):
@ -104,11 +36,11 @@ class Scanner:
if raw.get("season") == raw.get("year") and "season" in raw: if raw.get("season") == raw.get("year") and "season" in raw:
del raw["season"] del raw["season"]
if isinstance(raw.get("season"), List): if isinstance(raw.get("season"), list):
raise ProviderError( raise ProviderError(
f"An episode can't have multiple seasons (found {raw.get('season')} for {path})" f"An episode can't have multiple seasons (found {raw.get('season')} for {path})"
) )
if isinstance(raw.get("episode"), List): if isinstance(raw.get("episode"), list):
raise ProviderError( raise ProviderError(
f"Multi-episodes files are not yet supported (for {path})" f"Multi-episodes files are not yet supported (for {path})"
) )
@ -116,20 +48,20 @@ class Scanner:
logging.info("Identied %s: %s", path, raw) logging.info("Identied %s: %s", path, raw)
if raw["type"] == "movie": if raw["type"] == "movie":
movie = await self.provider.identify_movie(raw["title"], raw.get("year")) movie = await self._provider.identify_movie(raw["title"], raw.get("year"))
movie.path = str(path) movie.path = str(path)
logging.debug("Got movie: %s", movie) logging.debug("Got movie: %s", movie)
movie_id = await self.post("movies", data=movie.to_kyoo()) movie_id = await self._client.post("movies", data=movie.to_kyoo())
if any(movie.collections): if any(movie.collections):
ids = await asyncio.gather( ids = await asyncio.gather(
*(self.create_or_get_collection(x) for x in movie.collections) *(self.create_or_get_collection(x) for x in movie.collections)
) )
await asyncio.gather( await asyncio.gather(
*(self.link_collection(x, "movie", movie_id) for x in ids) *(self._client.link_collection(x, "movie", movie_id) for x in ids)
) )
elif raw["type"] == "episode": elif raw["type"] == "episode":
episode = await self.provider.identify_episode( episode = await self._provider.identify_episode(
raw["title"], raw["title"],
season=raw.get("season"), season=raw.get("season"),
episode_nbr=raw.get("episode"), episode_nbr=raw.get("episode"),
@ -144,7 +76,7 @@ class Scanner:
episode.season_id = await self.register_seasons( episode.season_id = await self.register_seasons(
episode.show, episode.show_id, episode.season_number episode.show, episode.show_id, episode.season_number
) )
await self.post("episodes", data=episode.to_kyoo()) await self._client.post("episodes", data=episode.to_kyoo())
else: else:
logging.warn("Unknown video file type: %s", raw["type"]) logging.warn("Unknown video file type: %s", raw["type"])
@ -153,48 +85,36 @@ class Scanner:
async def create_collection(provider_id: str): async def create_collection(provider_id: str):
# TODO: Check if a collection with the same metadata id exists already on kyoo. # TODO: Check if a collection with the same metadata id exists already on kyoo.
new_collection = ( new_collection = (
await self.provider.identify_collection(provider_id) await self._provider.identify_collection(provider_id)
if not any(collection.translations.keys()) if not any(collection.translations.keys())
else collection else collection
) )
logging.debug("Got collection: %s", new_collection) logging.debug("Got collection: %s", new_collection)
return await self.post("collection", data=new_collection.to_kyoo()) return await self._client.post("collection", data=new_collection.to_kyoo())
# The parameter is only used as a key for the cache. # The parameter is only used as a key for the cache.
provider_id = collection.external_id[self.provider.name].data_id provider_id = collection.external_id[self._provider.name].data_id
return await create_collection(provider_id) return await create_collection(provider_id)
async def link_collection(
self, collection: str, type: Literal["movie"] | Literal["show"], id: str
):
async with self._client.put(
f"{self._url}/collections/{collection}/{type}/{id}",
headers={"X-API-Key": self._api_key},
) as r:
# Allow 409 and continue as if it worked.
if not r.ok and r.status != 409:
logging.error(f"Request error: {await r.text()}")
r.raise_for_status()
async def create_or_get_show(self, episode: Episode) -> str: async def create_or_get_show(self, episode: Episode) -> str:
@cache(ttl=timedelta(days=1), cache=self._show_cache) @cache(ttl=timedelta(days=1), cache=self._show_cache)
async def create_show(_: str): async def create_show(_: str):
# TODO: Check if a show with the same metadata id exists already on kyoo. # TODO: Check if a show with the same metadata id exists already on kyoo.
show = ( show = (
await self.provider.identify_show( await self._provider.identify_show(
episode.show.external_id[self.provider.name].data_id, episode.show.external_id[self._provider.name].data_id,
) )
if isinstance(episode.show, PartialShow) if isinstance(episode.show, PartialShow)
else episode.show else episode.show
) )
# TODO: collections # TODO: collections
logging.debug("Got show: %s", episode) logging.debug("Got show: %s", episode)
ret = await self.post("show", data=show.to_kyoo()) ret = await self._client.post("show", data=show.to_kyoo())
async def create_season(season: Season, id: str): async def create_season(season: Season, id: str):
try: try:
season.show_id = id season.show_id = id
return await self.post("seasons", data=season.to_kyoo()) return await self._client.post("seasons", data=season.to_kyoo())
except Exception as e: except Exception as e:
logging.exception("Unhandled error create a season", exc_info=e) logging.exception("Unhandled error create a season", exc_info=e)
@ -211,7 +131,7 @@ class Scanner:
return ret return ret
# The parameter is only used as a key for the cache. # The parameter is only used as a key for the cache.
provider_id = episode.show.external_id[self.provider.name].data_id provider_id = episode.show.external_id[self._provider.name].data_id
return await create_show(provider_id) return await create_show(provider_id)
async def register_seasons( async def register_seasons(
@ -220,79 +140,10 @@ class Scanner:
# We use an external season cache because we want to edit this cache programatically # We use an external season cache because we want to edit this cache programatically
@cache(ttl=timedelta(days=1), cache=self._season_cache) @cache(ttl=timedelta(days=1), cache=self._season_cache)
async def create_season(_: str, __: int): async def create_season(_: str, __: int):
season = await self.provider.identify_season( season = await self._provider.identify_season(
show.external_id[self.provider.name].data_id, season_number show.external_id[self._provider.name].data_id, season_number
) )
season.show_id = show_id season.show_id = show_id
return await self.post("seasons", data=season.to_kyoo()) return await self._client.post("seasons", data=season.to_kyoo())
return await create_season(show_id, season_number) return await create_season(show_id, season_number)
async def post(self, path: str, *, data: dict[str, Any]) -> str:
logging.debug(
"Sending %s: %s",
path,
jsons.dumps(
data,
key_transformer=jsons.KEY_TRANSFORMER_CAMELCASE,
jdkwargs={"indent": 4},
),
)
async with self._client.post(
f"{self._url}/{path}",
json=data,
headers={"X-API-Key": self._api_key},
) as r:
# Allow 409 and continue as if it worked.
if not r.ok and r.status != 409:
logging.error(f"Request error: {await r.text()}")
r.raise_for_status()
ret = await r.json()
if r.status == 409 and (
(path == "shows" and ret["startAir"][:4] != str(data["start_air"].year))
or (
path == "movies"
and ret["airDate"][:4] != str(data["air_date"].year)
)
):
logging.info(
f"Found a {path} with the same slug ({ret['slug']}) and a different date, using the date as part of the slug"
)
year = (data["start_air"] if path == "movie" else data["air_date"]).year
data["slug"] = f"{ret['slug']}-{year}"
return await self.post(path, data=data)
return ret["id"]
async def delete(
self,
path: str,
type: Literal["episode", "movie", "issue"] | None = None,
):
logging.info("Deleting %s", path)
self.registered = filter(lambda x: x != path, self.registered)
if type is None or type == "movie":
async with self._client.delete(
f'{self._url}/movies?filter=path eq "{quote(path)}"',
headers={"X-API-Key": self._api_key},
) as r:
if not r.ok:
logging.error(f"Request error: {await r.text()}")
r.raise_for_status()
if type is None or type == "episode":
async with self._client.delete(
f'{self._url}/episodes?filter=path eq "{quote(path)}"',
headers={"X-API-Key": self._api_key},
) as r:
if not r.ok:
logging.error(f"Request error: {await r.text()}")
r.raise_for_status()
if path in self.issues:
self.issues = filter(lambda x: x != path, self.issues)
await self._client.delete(
f'{self._url}/issues?filter=domain eq scanner and cause eq "{quote(path)}"',
headers={"X-API-Key": self._api_key},
)

View File

@ -6,6 +6,7 @@
jsons jsons
watchfiles watchfiles
pika pika
aio-pika
requests requests
dataclasses-json dataclasses-json
]); ]);