Cleanup task handling/cancellation of background tasks

This commit is contained in:
Zoe Roux 2025-05-19 01:17:33 +02:00
parent 90cee41595
commit 6ec2c3e61b
No known key found for this signature in database
2 changed files with 20 additions and 20 deletions

View File

@ -1,5 +1,5 @@
import logging import logging
from asyncio import CancelledError, TaskGroup from asyncio import CancelledError, TaskGroup, create_task
from contextlib import asynccontextmanager from contextlib import asynccontextmanager
from fastapi import FastAPI from fastapi import FastAPI
@ -25,29 +25,29 @@ async def lifespan(_):
get_db() as db, get_db() as db,
KyooClient() as client, KyooClient() as client,
TheMovieDatabase() as tmdb, TheMovieDatabase() as tmdb,
RequestProcessor(db, client, tmdb) as processor,
): ):
# there's no way someone else used the same id, right? # there's no way someone else used the same id, right?
is_master = await db.fetchval("select pg_try_advisory_lock(198347)") is_master = await db.fetchval("select pg_try_advisory_lock(198347)")
if is_master: if is_master:
await migrate() await migrate()
# creating the processor makes it listen to requests event in pg async with get_db() as db:
provider = tmdb # CompositeProvider(tmdb) scanner = FsScanner(client, RequestCreator(db))
async with ( tasks = create_task(background_startup(scanner, processor, is_master))
RequestProcessor(db, client, provider) as processor, yield
get_db() as db, _ = tasks.cancel()
):
# see https://github.com/python/cpython/issues/108951
try: async def background_startup(
async with TaskGroup() as tg: scanner: FsScanner,
_ = tg.create_task(processor.process_all()) processor: RequestProcessor,
scanner = FsScanner(client, RequestCreator(db)) is_master: bool | None,
if is_master: ):
_ = tg.create_task(scanner.monitor()) async with TaskGroup() as tg:
_ = tg.create_task(scanner.scan(remove_deleted=True)) _ = tg.create_task(processor.listen())
yield if is_master:
_ = tg.create_task(cancel()) _ = tg.create_task(scanner.monitor())
except CancelledError: _ = tg.create_task(scanner.scan(remove_deleted=True))
pass
async def cancel(): async def cancel():

View File

@ -26,7 +26,7 @@ async def init_pool():
async with await create_pool(**connection) as p: async with await create_pool(**connection) as p:
global pool global pool
pool = p pool = p
yield yield pool
pool = None # type: ignore pool = None # type: ignore