diff --git a/.pg_format b/.pg_format index 188fd3bd..c9795ffe 100644 --- a/.pg_format +++ b/.pg_format @@ -5,4 +5,3 @@ type-case=1 no-space-function=1 keep-newline=1 nogrouping=1 -placeholder=%(\(\w+\))?s diff --git a/scanner/migrations/000001_request.up.sql b/scanner/migrations/000001_request.up.sql index 54895ac4..15842e7c 100644 --- a/scanner/migrations/000001_request.up.sql +++ b/scanner/migrations/000001_request.up.sql @@ -1,5 +1,3 @@ -begin; - create type scanner.request_kind as enum( 'episode', 'movie' @@ -17,12 +15,9 @@ create table scanner.requests( kind scanner.request_kind not null, title text not null, year integer, - external_id jsonb not null default '{}' ::jsonb, + external_id jsonb not null default '{}'::jsonb, status scanner.request_status not null, - started_at created_at timestamptz, - created_at created_at timestamptz not null default now() ::timestamptz, - constraint unique_kty(kind, title, year) + started_at timestamptz, + created_at timestamptz not null default now()::timestamptz, + constraint unique_kty unique(kind, title, year) ); - -commit; - diff --git a/scanner/scanner/__init__.py b/scanner/scanner/__init__.py index 28926962..cd6fdd3d 100644 --- a/scanner/scanner/__init__.py +++ b/scanner/scanner/__init__.py @@ -10,7 +10,7 @@ from scanner.providers.composite import CompositeProvider from scanner.providers.themoviedatabase import TheMovieDatabase from scanner.requests import RequestCreator, RequestProcessor -from .database import get_db, init_pool +from .database import get_db, init_pool, migrate from .routers.routes import router logging.basicConfig(level=logging.DEBUG) @@ -26,6 +26,7 @@ async def lifespan(_): KyooClient() as client, TheMovieDatabase() as tmdb, ): + await migrate(); # creating the processor makes it listen to requests event in pg async with ( RequestProcessor(db, client, CompositeProvider(tmdb)) as processor, diff --git a/scanner/scanner/database.py b/scanner/scanner/database.py index 0760775d..3ccb00e9 100644 --- a/scanner/scanner/database.py +++ b/scanner/scanner/database.py @@ -1,9 +1,12 @@ import os from contextlib import asynccontextmanager +from logging import getLogger from typing import Any, cast from asyncpg import Connection, Pool, create_pool +logger = getLogger(__name__) + pool: Pool @@ -30,3 +33,53 @@ async def init_pool(): async def get_db(): async with pool.acquire() as db: yield cast(Connection, db) + + +async def migrate(migrations_dir="./migrations"): + async with get_db() as db: + _ = await db.execute( + """ + create schema if not exists scanner; + + create table if not exists scanner._migrations( + pk serial primary key, + name text not null, + applied_at timestamptz not null default now() ::timestamptz + ); + """, + ) + + applied = await db.fetchval( + """ + select + count(*) + from + scanner._migrations + """ + ) + + if not os.path.exists(migrations_dir): + logger.warning(f"Migrations directory '{migrations_dir}' not found") + return + + migrations = sorted( + f for f in os.listdir(migrations_dir) if f.endswith("up.sql") + ) + for migration in migrations[applied:]: + file_path = os.path.join(migrations_dir, migration) + logger.info(f"Applying migration: {migration}") + try: + with open(file_path, "r") as f: + sql = f.read() + async with db.transaction(): + _ = await db.execute(sql) + _ = await db.execute( + """ + insert into scanner._migrations(name) + values ($1) + """, + migration, + ) + except Exception as e: + logger.error(f"Failed to apply migration {migration}", exc_info=e) + raise