wip: Reconnect to database on connection failure

This commit is contained in:
Zoe Roux 2025-05-19 12:29:06 +02:00
parent 71893edaf3
commit 8984061771
No known key found for this signature in database
2 changed files with 41 additions and 22 deletions

View File

@ -21,7 +21,7 @@ logging.getLogger("rebulk").setLevel(logging.WARNING)
@asynccontextmanager
async def lifespan(_):
async with (
init_pool(),
init_pool() as pool,
get_db() as db,
KyooClient() as client,
TheMovieDatabase() as tmdb,
@ -30,18 +30,17 @@ async def lifespan(_):
is_master = await db.fetchval("select pg_try_advisory_lock(198347)")
if is_master:
await migrate()
async with get_db() as scanner_db:
processor = RequestProcessor(db, client, tmdb)
scanner = FsScanner(client, RequestCreator(scanner_db))
tasks = create_task(
background_startup(
scanner,
processor,
is_master,
)
processor = RequestProcessor(pool, client, tmdb)
scanner = FsScanner(client, RequestCreator(db))
tasks = create_task(
background_startup(
scanner,
processor,
is_master,
)
yield
_ = tasks.cancel()
)
yield
_ = tasks.cancel()
async def background_startup(

View File

@ -1,6 +1,6 @@
from __future__ import annotations
from asyncio import CancelledError, Future, TaskGroup, sleep
from asyncio import CancelledError, Event, Future, TaskGroup, sleep
from logging import getLogger
from types import TracebackType
from typing import Literal, cast
@ -61,25 +61,45 @@ class RequestCreator:
class RequestProcessor:
def __init__(
self,
database: Connection,
pool: Pool,
client: KyooClient,
providers: Provider,
):
self._database = database
self._pool = pool
self._database: Connection = None # type: ignore
self._client = client
self._providers = providers
async def listen(self, tg: TaskGroup):
closed = Event()
def process(*_):
_ = tg.create_task(self.process_all())
try:
logger.info("Listening for requestes")
await self._database.add_listener("scanner_requests", process)
await Future()
except CancelledError as e:
logger.info("Stopped listening for requsets")
await self._database.remove_listener("scanner_requests", process)
def terminated(*_):
logger.info("terminated")
closed.set()
while True:
closed.clear()
logger.info("aquire")
try:
async with self._pool.acquire(timeout=10) as db:
try:
self._database = cast(Connection, db)
self._database.add_termination_listener(terminated)
await self._database.add_listener("scanner_requests", process)
logger.info("Listening for requestes")
_ = await closed.wait()
logger.info("stopping...")
except CancelledError as e:
logger.info("Stopped listening for requsets")
await self._database.remove_listener("scanner_requests", process)
self._database.remove_termination_listener(terminated)
raise
except TimeoutError:
logger.info("temiout")
async def process_all(self):
found = True