diff --git a/api/src/controllers/seed/images.ts b/api/src/controllers/seed/images.ts index 7faa39dc..621c91b0 100644 --- a/api/src/controllers/seed/images.ts +++ b/api/src/controllers/seed/images.ts @@ -68,7 +68,7 @@ export const enqueueOptImage = async ( kind: "image", message, }); - await tx.execute(sql`notify kyoo.image`); + await tx.execute(sql`notify kyoo_image`); return { id, @@ -128,7 +128,7 @@ export const processImages = async () => { const client = (await db.$client.connect()) as PoolClient; client.on("notification", (evt) => { - if (evt.channel !== "kyoo.image") return; + if (evt.channel !== "kyoo_image") return; processAll(); }); await client.query("listen image"); diff --git a/scanner/migrations/000001_request.down.sql b/scanner/migrations/000001_request.down.sql index bb9ff43a..06421546 100644 --- a/scanner/migrations/000001_request.down.sql +++ b/scanner/migrations/000001_request.down.sql @@ -1,8 +1,5 @@ -begin; - drop table scanner.requests; drop type scanner.request_kind; - -commit; +drop type scanner.request_status; diff --git a/scanner/migrations/000001_request.up.sql b/scanner/migrations/000001_request.up.sql index 15842e7c..fbbbe71d 100644 --- a/scanner/migrations/000001_request.up.sql +++ b/scanner/migrations/000001_request.up.sql @@ -16,7 +16,8 @@ create table scanner.requests( title text not null, year integer, external_id jsonb not null default '{}'::jsonb, - status scanner.request_status not null, + videos jsonb not null default '[]'::jsonb, + status scanner.request_status not null default 'pending', started_at timestamptz, created_at timestamptz not null default now()::timestamptz, constraint unique_kty unique(kind, title, year) diff --git a/scanner/scanner/__init__.py b/scanner/scanner/__init__.py index cd6fdd3d..b27457a1 100644 --- a/scanner/scanner/__init__.py +++ b/scanner/scanner/__init__.py @@ -26,15 +26,16 @@ async def lifespan(_): KyooClient() as client, TheMovieDatabase() as tmdb, ): - await migrate(); + # there's no way someone else used the same id, right? + is_master = await db.fetchval("select pg_try_advisory_lock(198347)") + if is_master: + await migrate(); # creating the processor makes it listen to requests event in pg async with ( RequestProcessor(db, client, CompositeProvider(tmdb)) as processor, get_db() as db, ): scanner = FsScanner(client, RequestCreator(db)) - # there's no way someone else used the same id, right? - is_master = await db.fetchval("select pg_try_advisory_lock(198347)") if is_master: _ = asyncio.create_task(scanner.monitor()) _ = asyncio.create_task(scanner.scan(remove_deleted=True)) diff --git a/scanner/scanner/database.py b/scanner/scanner/database.py index 3ccb00e9..aec0ce16 100644 --- a/scanner/scanner/database.py +++ b/scanner/scanner/database.py @@ -1,3 +1,4 @@ +import json import os from contextlib import asynccontextmanager from logging import getLogger @@ -32,6 +33,19 @@ async def init_pool(): @asynccontextmanager async def get_db(): async with pool.acquire() as db: + await db.set_type_codec( + "json", + encoder=json.dumps, + decoder=json.loads, + schema="pg_catalog", + ) + await db.set_type_codec( + "jsonb", + encoder=lambda data: b"\x01" + bytes(json.dumps(data), encoding="utf8"), + decoder=lambda data: json.loads(data[1:]), + schema="pg_catalog", + format="binary", + ) yield cast(Connection, db) @@ -44,9 +58,7 @@ async def migrate(migrations_dir="./migrations"): create table if not exists scanner._migrations( pk serial primary key, name text not null, - applied_at timestamptz not null default now() ::timestamptz - ); - """, + applied_at timestamptz not null default now() ::timestamptz)""", ) applied = await db.fetchval( diff --git a/scanner/scanner/requests.py b/scanner/scanner/requests.py index e6c91a77..2f6d7208 100644 --- a/scanner/scanner/requests.py +++ b/scanner/scanner/requests.py @@ -8,7 +8,7 @@ from asyncpg import Connection from pydantic import Field, TypeAdapter from .client import KyooClient -from .models.videos import Guess, Resource +from .models.videos import Guess, Resource, Video from .providers.composite import CompositeProvider from .utils import Model @@ -21,7 +21,7 @@ class Request(Model, extra="allow"): title: str year: int | None external_id: dict[str, str] - videos: list[Video] + videos: list[Request.Video] class Video(Model): id: str @@ -39,11 +39,14 @@ class RequestCreator: values ($1, $2, $3, $4, $5) on conflict (kind, title, year) do update set - videos = videos || excluded.videos + videos = requests.videos || excluded.videos """, - [[x.kind, x.title, x.year, x.external_id, x.videos] for x in requests], + [ + [x["kind"], x["title"], x["year"], x["external_id"], x["videos"]] + for x in TypeAdapter(list[Request]).dump_python(requests) + ], ) - _ = await self._database.execute("notify scanner.requests") + _ = await self._database.execute("notify scanner_requests") class RequestProcessor: @@ -59,7 +62,7 @@ class RequestProcessor: async def __aenter__(self): logger.info("Listening for requestes") - await self._database.add_listener("scanner.requests", self.process_request) + await self._database.add_listener("scanner_requests", self.process_request) return self async def __aexit__( @@ -68,7 +71,7 @@ class RequestProcessor: exc_value: BaseException | None, traceback: TracebackType | None, ): - await self._database.remove_listener("scanner.requests", self.process_request) + await self._database.remove_listener("scanner_requests", self.process_request) async def process_request(self): cur = await self._database.fetchrow( @@ -103,7 +106,7 @@ class RequestProcessor: finished = await self._database.fetchrow( """ delete from scanner.requests - where pk = %s + where pk = $1 returning videos """, @@ -120,7 +123,7 @@ class RequestProcessor: set status = 'failed' where - pk = %s + pk = $1 """, [request.pk], )