Implement request processor (listen for requests)

This commit is contained in:
Zoe Roux 2025-05-10 00:12:50 +02:00
parent 3e15b28ec1
commit dda496d88b
No known key found for this signature in database
4 changed files with 136 additions and 49 deletions

View File

@ -3,3 +3,6 @@ function-case=1 #lowercase
keyword-case=1 keyword-case=1
type-case=1 type-case=1
no-space-function=1 no-space-function=1
keep-newline=1
nogrouping=1
placeholder=%(\(\w+\))?s

View File

@ -1,6 +1,7 @@
begin; begin;
drop table scanner.requests; drop table scanner.requests;
drop type scanner.request_kind; drop type scanner.request_kind;
commit; commit;

View File

@ -1,6 +1,15 @@
begin; begin;
create type scanner.request_kind as enum('episode', 'movie'); create type scanner.request_kind as enum(
'episode',
'movie'
);
create type scanner.request_status as enum(
'pending',
'running',
'failed'
);
create table scanner.requests( create table scanner.requests(
pk serial primary key, pk serial primary key,
@ -9,10 +18,12 @@ create table scanner.requests(
title text not null, title text not null,
year integer, year integer,
external_id jsonb not null default '{}' ::jsonb, external_id jsonb not null default '{}' ::jsonb,
status scanner.request_status not null,
created_at timestamptz not null default now()::timestamptz, started_at created_at timestamptz,
created_at created_at timestamptz not null default now() ::timestamptz,
constraint unique_kty (kind, title, year) constraint unique_kty(kind, title, year),
constraint unique_eid(external_id)
); );
commit; commit;

View File

@ -3,15 +3,20 @@ from __future__ import annotations
from logging import getLogger from logging import getLogger
from typing import Literal from typing import Literal
from psycopg import AsyncConnection
from psycopg.rows import class_row
from pydantic import Field
from .client import KyooClient from .client import KyooClient
from .models.videos import Guess from .models.videos import Guess
from .old.composite import CompositeProvider from .providers.composite import CompositeProvider
from .utils import Model from .utils import Model
logger = getLogger(__name__) logger = getLogger(__name__)
class Request(Model): class Request(Model, extra="allow"):
pk: int = Field(exclude=True)
kind: Literal["episode", "movie"] kind: Literal["episode", "movie"]
title: str title: str
year: int | None year: int | None
@ -23,25 +28,92 @@ class Request(Model):
episodes: list[Guess.Episode] episodes: list[Guess.Episode]
async def enqueue(requests: list[Request]):
# insert all requests
# on conflict(kind,title,year) add to the `videos` list
# notify
# TODO: how will this conflict be handled if the request is already locked for update (being processed)
pass
class RequestProcessor: class RequestProcessor:
def __init__(self, client: KyooClient, providers: CompositeProvider): def __init__(
self,
database: AsyncConnection,
client: KyooClient,
providers: CompositeProvider,
):
self._database = database
self._client = client self._client = client
self._providers = providers self._providers = providers
async def process_scan_requests(self): async def enqueue(self, requests: list[Request]):
# select for update skip_locked limit 1 async with self._database.cursor() as cur:
request: Request = ... await cur.executemany(
"""
insert into scanner.requests(kind, title, year, external_id, videos)
values (%(kind)s, %(title) s, %(year)s, %(external_id)s, %(videos)s)
on conflict (kind, title, year)
do update set
videos = videos || excluded.videos
""",
(x.model_dump() for x in requests),
)
# TODO: how will this conflict be handled if the request is already locked for update (being processed)
if cur.rowcount > 0:
_ = await cur.execute("notify requests")
async def process_requests(self):
_ = await self._database.execute("listen requests")
gen = self._database.notifies()
async for _ in gen:
await self._process_request()
async def _process_request(self):
async with self._database.cursor(row_factory=class_row(Request)) as cur:
cur = await cur.execute(
"""
update
scanner.requests
set
status = 'running',
started_at = nom()::timestamptz
where
pk in (
select
*
from
scanner.requests
where
status = 'pending'
limit 1
for update
skip locked)
returning
*
"""
)
request = await cur.fetchone()
if request is None:
return
logger.info(f"Starting to process {request.title}")
try:
await self._run_request(request)
cur = await cur.execute(
"""
delete from scanner.requests
where pk = %s
""",
[request.pk],
)
except Exception as e:
logger.error("Couldn't process request", exc_info=e)
cur = await cur.execute(
"""
update
scanner.requests
set
status = 'failed'
where
pk = %s
""",
[request.pk],
)
async def _run_request(self, request: Request):
if request.kind == "movie": if request.kind == "movie":
movie = await self._providers.find_movie( movie = await self._providers.find_movie(
request.title, request.title,
@ -50,7 +122,8 @@ class RequestProcessor:
) )
movie.videos = [x.id for x in request.videos] movie.videos = [x.id for x in request.videos]
await self._client.create_movie(movie) await self._client.create_movie(movie)
else: return
serie = await self._providers.find_serie( serie = await self._providers.find_serie(
request.title, request.title,
request.year, request.year,
@ -77,4 +150,3 @@ class RequestProcessor:
continue continue
entry.videos.append(vid.id) entry.videos.append(vid.id)
await self._client.create_serie(serie) await self._client.create_serie(serie)
# delete request