diff --git a/scanner/scanner/__init__.py b/scanner/scanner/__init__.py index 25ab41f9..c13438d0 100644 --- a/scanner/scanner/__init__.py +++ b/scanner/scanner/__init__.py @@ -1,5 +1,5 @@ -import asyncio import logging +from asyncio import CancelledError, TaskGroup from contextlib import asynccontextmanager from fastapi import FastAPI @@ -29,19 +29,29 @@ async def lifespan(_): # there's no way someone else used the same id, right? is_master = await db.fetchval("select pg_try_advisory_lock(198347)") if is_master: - await migrate(); + await migrate() # creating the processor makes it listen to requests event in pg - provider = tmdb #CompositeProvider(tmdb) + provider = tmdb # CompositeProvider(tmdb) async with ( RequestProcessor(db, client, provider) as processor, get_db() as db, ): - _ = asyncio.create_task(processor.process_all()) - scanner = FsScanner(client, RequestCreator(db)) - if is_master: - _ = asyncio.create_task(scanner.monitor()) - _ = asyncio.create_task(scanner.scan(remove_deleted=True)) - yield + # see https://github.com/python/cpython/issues/108951 + try: + async with TaskGroup() as tg: + _ = tg.create_task(processor.process_all()) + scanner = FsScanner(client, RequestCreator(db)) + if is_master: + _ = tg.create_task(scanner.monitor()) + _ = tg.create_task(scanner.scan(remove_deleted=True)) + yield + _ = tg.create_task(cancel()) + except CancelledError: + pass + + +async def cancel(): + raise CancelledError() app = FastAPI(