diff --git a/.pg_format b/.pg_format index 882260a2..188fd3bd 100644 --- a/.pg_format +++ b/.pg_format @@ -3,3 +3,6 @@ function-case=1 #lowercase keyword-case=1 type-case=1 no-space-function=1 +keep-newline=1 +nogrouping=1 +placeholder=%(\(\w+\))?s diff --git a/scanner/migrations/000001_request.down.sql b/scanner/migrations/000001_request.down.sql index 3a175421..bb9ff43a 100644 --- a/scanner/migrations/000001_request.down.sql +++ b/scanner/migrations/000001_request.down.sql @@ -1,6 +1,7 @@ begin; drop table scanner.requests; + drop type scanner.request_kind; commit; diff --git a/scanner/migrations/000001_request.up.sql b/scanner/migrations/000001_request.up.sql index 5ab9e1a8..d8601e18 100644 --- a/scanner/migrations/000001_request.up.sql +++ b/scanner/migrations/000001_request.up.sql @@ -1,6 +1,15 @@ begin; -create type scanner.request_kind as enum('episode', 'movie'); +create type scanner.request_kind as enum( + 'episode', + 'movie' +); + +create type scanner.request_status as enum( + 'pending', + 'running', + 'failed' +); create table scanner.requests( pk serial primary key, @@ -8,11 +17,13 @@ create table scanner.requests( kind scanner.request_kind not null, title text not null, year integer, - external_id jsonb not null default '{}'::jsonb, - - created_at timestamptz not null default now()::timestamptz, - - constraint unique_kty (kind, title, year) + external_id jsonb not null default '{}' ::jsonb, + status scanner.request_status not null, + started_at created_at timestamptz, + created_at created_at timestamptz not null default now() ::timestamptz, + constraint unique_kty(kind, title, year), + constraint unique_eid(external_id) ); commit; + diff --git a/scanner/scanner/requests.py b/scanner/scanner/requests.py index 7edf7817..8cc418ce 100644 --- a/scanner/scanner/requests.py +++ b/scanner/scanner/requests.py @@ -3,15 +3,20 @@ from __future__ import annotations from logging import getLogger from typing import Literal +from psycopg import AsyncConnection +from psycopg.rows import class_row +from pydantic import Field + from .client import KyooClient from .models.videos import Guess -from .old.composite import CompositeProvider +from .providers.composite import CompositeProvider from .utils import Model logger = getLogger(__name__) -class Request(Model): +class Request(Model, extra="allow"): + pk: int = Field(exclude=True) kind: Literal["episode", "movie"] title: str year: int | None @@ -23,25 +28,92 @@ class Request(Model): episodes: list[Guess.Episode] -async def enqueue(requests: list[Request]): - # insert all requests - # on conflict(kind,title,year) add to the `videos` list - - # notify - - # TODO: how will this conflict be handled if the request is already locked for update (being processed) - pass - - class RequestProcessor: - def __init__(self, client: KyooClient, providers: CompositeProvider): + def __init__( + self, + database: AsyncConnection, + client: KyooClient, + providers: CompositeProvider, + ): + self._database = database self._client = client self._providers = providers - async def process_scan_requests(self): - # select for update skip_locked limit 1 - request: Request = ... + async def enqueue(self, requests: list[Request]): + async with self._database.cursor() as cur: + await cur.executemany( + """ + insert into scanner.requests(kind, title, year, external_id, videos) + values (%(kind)s, %(title) s, %(year)s, %(external_id)s, %(videos)s) + on conflict (kind, title, year) + do update set + videos = videos || excluded.videos + """, + (x.model_dump() for x in requests), + ) + # TODO: how will this conflict be handled if the request is already locked for update (being processed) + if cur.rowcount > 0: + _ = await cur.execute("notify requests") + async def process_requests(self): + _ = await self._database.execute("listen requests") + gen = self._database.notifies() + async for _ in gen: + await self._process_request() + + async def _process_request(self): + async with self._database.cursor(row_factory=class_row(Request)) as cur: + cur = await cur.execute( + """ + update + scanner.requests + set + status = 'running', + started_at = nom()::timestamptz + where + pk in ( + select + * + from + scanner.requests + where + status = 'pending' + limit 1 + for update + skip locked) + returning + * + """ + ) + request = await cur.fetchone() + if request is None: + return + + logger.info(f"Starting to process {request.title}") + try: + await self._run_request(request) + cur = await cur.execute( + """ + delete from scanner.requests + where pk = %s + """, + [request.pk], + ) + except Exception as e: + logger.error("Couldn't process request", exc_info=e) + cur = await cur.execute( + """ + update + scanner.requests + set + status = 'failed' + where + pk = %s + """, + [request.pk], + ) + + async def _run_request(self, request: Request): if request.kind == "movie": movie = await self._providers.find_movie( request.title, @@ -50,31 +122,31 @@ class RequestProcessor: ) movie.videos = [x.id for x in request.videos] await self._client.create_movie(movie) - else: - serie = await self._providers.find_serie( - request.title, - request.year, - request.external_id, - ) - for vid in request.videos: - for ep in vid.episodes: - entry = next( - ( - x - for x in serie.entries - if (ep.season is None and x.order == ep.episode) - or ( - x.season_number == ep.season - and x.episode_number == ep.episode - ) - ), - None, - ) - if entry is None: - logger.warning( - f"Couldn't match entry for {serie.slug} {ep.season or 'abs'}-e{ep.episode}." + return + + serie = await self._providers.find_serie( + request.title, + request.year, + request.external_id, + ) + for vid in request.videos: + for ep in vid.episodes: + entry = next( + ( + x + for x in serie.entries + if (ep.season is None and x.order == ep.episode) + or ( + x.season_number == ep.season + and x.episode_number == ep.episode ) - continue - entry.videos.append(vid.id) - await self._client.create_serie(serie) - # delete request + ), + None, + ) + if entry is None: + logger.warning( + f"Couldn't match entry for {serie.slug} {ep.season or 'abs'}-e{ep.episode}." + ) + continue + entry.videos.append(vid.id) + await self._client.create_serie(serie)