Start scan & monitor on master instance of scanner

This commit is contained in:
Zoe Roux 2025-05-10 03:17:30 +02:00
parent 7098a8326d
commit de199eeec4
No known key found for this signature in database
3 changed files with 42 additions and 16 deletions

View File

@ -1,3 +1,4 @@
import asyncio
import logging
from contextlib import asynccontextmanager
@ -5,9 +6,10 @@ import asyncpg
from fastapi import FastAPI
from .client import KyooClient
from .fsscan import Scanner
from .providers.composite import CompositeProvider
from .providers.themoviedatabase import TheMovieDatabase
from .requests import RequestProcessor
from .requests import RequestCreator, RequestProcessor
logging.basicConfig(level=logging.INFO)
logging.getLogger("watchfiles").setLevel(logging.WARNING)
@ -19,8 +21,14 @@ async def lifetime():
async with (
await asyncpg.create_pool() as pool,
create_request_processor(pool) as processor,
create_scanner(pool) as (scanner, is_master),
):
await processor.listen_for_requests()
if is_master:
_ = await asyncio.gather(
scanner.scan(remove_deleted=True),
scanner.monitor(),
)
yield
@ -34,6 +42,17 @@ async def create_request_processor(pool: asyncpg.Pool):
yield RequestProcessor(db, client, CompositeProvider(themoviedb))
@asynccontextmanager
async def create_scanner(pool: asyncpg.Pool):
async with (
pool.acquire() as db,
KyooClient() as client,
):
# there's no way someone else used the same id, right?
is_master: bool = await db.fetchval("select pg_try_advisory_lock(198347)")
yield (Scanner(client, RequestCreator(db)), is_master)
app = FastAPI(
title="Scanner",
description="API to control the long running scanner or interacting with external databases (themoviedb, tvdb...)\n\n"

View File

@ -10,15 +10,17 @@ from .client import KyooClient
from .identifiers.identify import identify
from .models.metadataid import EpisodeId, MetadataId
from .models.videos import For, Video, VideoInfo
from .requests import Request, enqueue
from .requests import Request, RequestCreator
logger = getLogger(__name__)
class Scanner:
def __init__(self, client: KyooClient):
def __init__(self, client: KyooClient, requests: RequestCreator):
self._client = client
self._requests = requests
self._info: VideoInfo = None # type: ignore
self._root_path = os.environ.get("SCANNER_LIBRARY_ROOT", "/video")
try:
pattern = os.environ.get("LIBRARY_IGNORE_PATTERN")
self._ignore_pattern = re.compile(pattern) if pattern else None
@ -27,10 +29,10 @@ class Scanner:
async def scan(self, path: str | None = None, remove_deleted=False):
if path is None:
path = self._root_path
logger.info("Starting scan at %s. This may take some time...", path)
if self._ignore_pattern:
logger.info(f"Applying ignore pattern: {self._ignore_pattern}")
path = path or os.environ.get("SCANNER_LIBRARY_ROOT", "/video")
videos = self.walk_fs(path)
self._info = await self._client.get_videos_info()
@ -58,8 +60,8 @@ class Scanner:
logger.info("Scan finished for %s.", path)
async def monitor(self, path: str, client: KyooClient):
async for changes in awatch(path, ignore_permission_denied=True):
async def monitor(self):
async for changes in awatch(self._root_path, ignore_permission_denied=True):
for event, file in changes:
if not isdir(file) and not is_video(file):
continue
@ -78,7 +80,7 @@ class Scanner:
await self._register([file])
case Change.deleted:
logger.info("Delete video at: %s", file)
await client.delete_videos([file])
await self._client.delete_videos([file])
case Change.modified:
pass

View File

@ -27,16 +27,9 @@ class Request(Model, extra="allow"):
episodes: list[Guess.Episode]
class RequestProcessor:
def __init__(
self,
database: Connection,
client: KyooClient,
providers: CompositeProvider,
):
class RequestCreator:
def __init__(self, database: Connection):
self._database = database
self._client = client
self._providers = providers
async def enqueue(self, requests: list[Request]):
await self._database.executemany(
@ -51,6 +44,18 @@ class RequestProcessor:
)
_ = await self._database.execute("notify scanner.requests")
class RequestProcessor:
def __init__(
self,
database: Connection,
client: KyooClient,
providers: CompositeProvider,
):
self._database = database
self._client = client
self._providers = providers
async def listen_for_requests(self):
logger.info("Listening for requestes")
await self._database.add_listener("scanner.requests", self.process_request)