From 8984061771b6d351e8851dfd5298f45e0c22df3d Mon Sep 17 00:00:00 2001 From: Zoe Roux Date: Mon, 19 May 2025 12:29:06 +0200 Subject: [PATCH] wip: Reconnect to database on connection failure --- scanner/scanner/__init__.py | 23 ++++++++++----------- scanner/scanner/requests.py | 40 +++++++++++++++++++++++++++---------- 2 files changed, 41 insertions(+), 22 deletions(-) diff --git a/scanner/scanner/__init__.py b/scanner/scanner/__init__.py index e9334a32..676e7d9e 100644 --- a/scanner/scanner/__init__.py +++ b/scanner/scanner/__init__.py @@ -21,7 +21,7 @@ logging.getLogger("rebulk").setLevel(logging.WARNING) @asynccontextmanager async def lifespan(_): async with ( - init_pool(), + init_pool() as pool, get_db() as db, KyooClient() as client, TheMovieDatabase() as tmdb, @@ -30,18 +30,17 @@ async def lifespan(_): is_master = await db.fetchval("select pg_try_advisory_lock(198347)") if is_master: await migrate() - async with get_db() as scanner_db: - processor = RequestProcessor(db, client, tmdb) - scanner = FsScanner(client, RequestCreator(scanner_db)) - tasks = create_task( - background_startup( - scanner, - processor, - is_master, - ) + processor = RequestProcessor(pool, client, tmdb) + scanner = FsScanner(client, RequestCreator(db)) + tasks = create_task( + background_startup( + scanner, + processor, + is_master, ) - yield - _ = tasks.cancel() + ) + yield + _ = tasks.cancel() async def background_startup( diff --git a/scanner/scanner/requests.py b/scanner/scanner/requests.py index a4d86dde..3a47e2e4 100644 --- a/scanner/scanner/requests.py +++ b/scanner/scanner/requests.py @@ -1,6 +1,6 @@ from __future__ import annotations -from asyncio import CancelledError, Future, TaskGroup, sleep +from asyncio import CancelledError, Event, Future, TaskGroup, sleep from logging import getLogger from types import TracebackType from typing import Literal, cast @@ -61,25 +61,45 @@ class RequestCreator: class RequestProcessor: def __init__( self, - database: Connection, + pool: Pool, client: KyooClient, providers: Provider, ): - self._database = database + self._pool = pool + self._database: Connection = None # type: ignore self._client = client self._providers = providers async def listen(self, tg: TaskGroup): + closed = Event() + def process(*_): _ = tg.create_task(self.process_all()) - try: - logger.info("Listening for requestes") - await self._database.add_listener("scanner_requests", process) - await Future() - except CancelledError as e: - logger.info("Stopped listening for requsets") - await self._database.remove_listener("scanner_requests", process) + def terminated(*_): + logger.info("terminated") + closed.set() + + while True: + closed.clear() + logger.info("aquire") + try: + async with self._pool.acquire(timeout=10) as db: + try: + self._database = cast(Connection, db) + self._database.add_termination_listener(terminated) + await self._database.add_listener("scanner_requests", process) + + logger.info("Listening for requestes") + _ = await closed.wait() + logger.info("stopping...") + except CancelledError as e: + logger.info("Stopped listening for requsets") + await self._database.remove_listener("scanner_requests", process) + self._database.remove_termination_listener(terminated) + raise + except TimeoutError: + logger.info("temiout") async def process_all(self): found = True