Implement proper shutdown of scan/monitor tasks

This commit is contained in:
Zoe Roux 2025-05-18 22:22:27 +02:00
parent bff46f7319
commit 71cc8c06c2
No known key found for this signature in database

View File

@ -1,5 +1,5 @@
import asyncio
import logging import logging
from asyncio import CancelledError, TaskGroup
from contextlib import asynccontextmanager from contextlib import asynccontextmanager
from fastapi import FastAPI from fastapi import FastAPI
@ -29,19 +29,29 @@ async def lifespan(_):
# there's no way someone else used the same id, right? # there's no way someone else used the same id, right?
is_master = await db.fetchval("select pg_try_advisory_lock(198347)") is_master = await db.fetchval("select pg_try_advisory_lock(198347)")
if is_master: if is_master:
await migrate(); await migrate()
# creating the processor makes it listen to requests event in pg # creating the processor makes it listen to requests event in pg
provider = tmdb #CompositeProvider(tmdb) provider = tmdb # CompositeProvider(tmdb)
async with ( async with (
RequestProcessor(db, client, provider) as processor, RequestProcessor(db, client, provider) as processor,
get_db() as db, get_db() as db,
): ):
_ = asyncio.create_task(processor.process_all()) # see https://github.com/python/cpython/issues/108951
try:
async with TaskGroup() as tg:
_ = tg.create_task(processor.process_all())
scanner = FsScanner(client, RequestCreator(db)) scanner = FsScanner(client, RequestCreator(db))
if is_master: if is_master:
_ = asyncio.create_task(scanner.monitor()) _ = tg.create_task(scanner.monitor())
_ = asyncio.create_task(scanner.scan(remove_deleted=True)) _ = tg.create_task(scanner.scan(remove_deleted=True))
yield yield
_ = tg.create_task(cancel())
except CancelledError:
pass
async def cancel():
raise CancelledError()
app = FastAPI( app = FastAPI(