Fix request creation

This commit is contained in:
Zoe Roux 2025-05-14 10:44:16 +02:00
parent 2fc696dde9
commit a9b7d18a6e
No known key found for this signature in database
6 changed files with 36 additions and 22 deletions

View File

@ -68,7 +68,7 @@ export const enqueueOptImage = async (
kind: "image", kind: "image",
message, message,
}); });
await tx.execute(sql`notify kyoo.image`); await tx.execute(sql`notify kyoo_image`);
return { return {
id, id,
@ -128,7 +128,7 @@ export const processImages = async () => {
const client = (await db.$client.connect()) as PoolClient; const client = (await db.$client.connect()) as PoolClient;
client.on("notification", (evt) => { client.on("notification", (evt) => {
if (evt.channel !== "kyoo.image") return; if (evt.channel !== "kyoo_image") return;
processAll(); processAll();
}); });
await client.query("listen image"); await client.query("listen image");

View File

@ -1,8 +1,5 @@
begin;
drop table scanner.requests; drop table scanner.requests;
drop type scanner.request_kind; drop type scanner.request_kind;
drop type scanner.request_status;
commit;

View File

@ -16,7 +16,8 @@ create table scanner.requests(
title text not null, title text not null,
year integer, year integer,
external_id jsonb not null default '{}'::jsonb, external_id jsonb not null default '{}'::jsonb,
status scanner.request_status not null, videos jsonb not null default '[]'::jsonb,
status scanner.request_status not null default 'pending',
started_at timestamptz, started_at timestamptz,
created_at timestamptz not null default now()::timestamptz, created_at timestamptz not null default now()::timestamptz,
constraint unique_kty unique(kind, title, year) constraint unique_kty unique(kind, title, year)

View File

@ -26,15 +26,16 @@ async def lifespan(_):
KyooClient() as client, KyooClient() as client,
TheMovieDatabase() as tmdb, TheMovieDatabase() as tmdb,
): ):
await migrate(); # there's no way someone else used the same id, right?
is_master = await db.fetchval("select pg_try_advisory_lock(198347)")
if is_master:
await migrate();
# creating the processor makes it listen to requests event in pg # creating the processor makes it listen to requests event in pg
async with ( async with (
RequestProcessor(db, client, CompositeProvider(tmdb)) as processor, RequestProcessor(db, client, CompositeProvider(tmdb)) as processor,
get_db() as db, get_db() as db,
): ):
scanner = FsScanner(client, RequestCreator(db)) scanner = FsScanner(client, RequestCreator(db))
# there's no way someone else used the same id, right?
is_master = await db.fetchval("select pg_try_advisory_lock(198347)")
if is_master: if is_master:
_ = asyncio.create_task(scanner.monitor()) _ = asyncio.create_task(scanner.monitor())
_ = asyncio.create_task(scanner.scan(remove_deleted=True)) _ = asyncio.create_task(scanner.scan(remove_deleted=True))

View File

@ -1,3 +1,4 @@
import json
import os import os
from contextlib import asynccontextmanager from contextlib import asynccontextmanager
from logging import getLogger from logging import getLogger
@ -32,6 +33,19 @@ async def init_pool():
@asynccontextmanager @asynccontextmanager
async def get_db(): async def get_db():
async with pool.acquire() as db: async with pool.acquire() as db:
await db.set_type_codec(
"json",
encoder=json.dumps,
decoder=json.loads,
schema="pg_catalog",
)
await db.set_type_codec(
"jsonb",
encoder=lambda data: b"\x01" + bytes(json.dumps(data), encoding="utf8"),
decoder=lambda data: json.loads(data[1:]),
schema="pg_catalog",
format="binary",
)
yield cast(Connection, db) yield cast(Connection, db)
@ -44,9 +58,7 @@ async def migrate(migrations_dir="./migrations"):
create table if not exists scanner._migrations( create table if not exists scanner._migrations(
pk serial primary key, pk serial primary key,
name text not null, name text not null,
applied_at timestamptz not null default now() ::timestamptz applied_at timestamptz not null default now() ::timestamptz)""",
);
""",
) )
applied = await db.fetchval( applied = await db.fetchval(

View File

@ -8,7 +8,7 @@ from asyncpg import Connection
from pydantic import Field, TypeAdapter from pydantic import Field, TypeAdapter
from .client import KyooClient from .client import KyooClient
from .models.videos import Guess, Resource from .models.videos import Guess, Resource, Video
from .providers.composite import CompositeProvider from .providers.composite import CompositeProvider
from .utils import Model from .utils import Model
@ -21,7 +21,7 @@ class Request(Model, extra="allow"):
title: str title: str
year: int | None year: int | None
external_id: dict[str, str] external_id: dict[str, str]
videos: list[Video] videos: list[Request.Video]
class Video(Model): class Video(Model):
id: str id: str
@ -39,11 +39,14 @@ class RequestCreator:
values ($1, $2, $3, $4, $5) values ($1, $2, $3, $4, $5)
on conflict (kind, title, year) on conflict (kind, title, year)
do update set do update set
videos = videos || excluded.videos videos = requests.videos || excluded.videos
""", """,
[[x.kind, x.title, x.year, x.external_id, x.videos] for x in requests], [
[x["kind"], x["title"], x["year"], x["external_id"], x["videos"]]
for x in TypeAdapter(list[Request]).dump_python(requests)
],
) )
_ = await self._database.execute("notify scanner.requests") _ = await self._database.execute("notify scanner_requests")
class RequestProcessor: class RequestProcessor:
@ -59,7 +62,7 @@ class RequestProcessor:
async def __aenter__(self): async def __aenter__(self):
logger.info("Listening for requestes") logger.info("Listening for requestes")
await self._database.add_listener("scanner.requests", self.process_request) await self._database.add_listener("scanner_requests", self.process_request)
return self return self
async def __aexit__( async def __aexit__(
@ -68,7 +71,7 @@ class RequestProcessor:
exc_value: BaseException | None, exc_value: BaseException | None,
traceback: TracebackType | None, traceback: TracebackType | None,
): ):
await self._database.remove_listener("scanner.requests", self.process_request) await self._database.remove_listener("scanner_requests", self.process_request)
async def process_request(self): async def process_request(self):
cur = await self._database.fetchrow( cur = await self._database.fetchrow(
@ -103,7 +106,7 @@ class RequestProcessor:
finished = await self._database.fetchrow( finished = await self._database.fetchrow(
""" """
delete from scanner.requests delete from scanner.requests
where pk = %s where pk = $1
returning returning
videos videos
""", """,
@ -120,7 +123,7 @@ class RequestProcessor:
set set
status = 'failed' status = 'failed'
where where
pk = %s pk = $1
""", """,
[request.pk], [request.pk],
) )