From c9fcbc8e8f5d7260ec453f45d11f730e842a34f2 Mon Sep 17 00:00:00 2001 From: Zoe Roux Date: Sun, 18 May 2025 22:22:27 +0200 Subject: [PATCH] Implement proper shutdown of scan/monitor tasks --- scanner/scanner/__init__.py | 28 +++++++++++++++++++--------- 1 file changed, 19 insertions(+), 9 deletions(-) diff --git a/scanner/scanner/__init__.py b/scanner/scanner/__init__.py index 25ab41f9..c13438d0 100644 --- a/scanner/scanner/__init__.py +++ b/scanner/scanner/__init__.py @@ -1,5 +1,5 @@ -import asyncio import logging +from asyncio import CancelledError, TaskGroup from contextlib import asynccontextmanager from fastapi import FastAPI @@ -29,19 +29,29 @@ async def lifespan(_): # there's no way someone else used the same id, right? is_master = await db.fetchval("select pg_try_advisory_lock(198347)") if is_master: - await migrate(); + await migrate() # creating the processor makes it listen to requests event in pg - provider = tmdb #CompositeProvider(tmdb) + provider = tmdb # CompositeProvider(tmdb) async with ( RequestProcessor(db, client, provider) as processor, get_db() as db, ): - _ = asyncio.create_task(processor.process_all()) - scanner = FsScanner(client, RequestCreator(db)) - if is_master: - _ = asyncio.create_task(scanner.monitor()) - _ = asyncio.create_task(scanner.scan(remove_deleted=True)) - yield + # see https://github.com/python/cpython/issues/108951 + try: + async with TaskGroup() as tg: + _ = tg.create_task(processor.process_all()) + scanner = FsScanner(client, RequestCreator(db)) + if is_master: + _ = tg.create_task(scanner.monitor()) + _ = tg.create_task(scanner.scan(remove_deleted=True)) + yield + _ = tg.create_task(cancel()) + except CancelledError: + pass + + +async def cancel(): + raise CancelledError() app = FastAPI(