wip: Link videos found after request started

This commit is contained in:
Zoe Roux 2025-05-10 01:14:56 +02:00
parent dda496d88b
commit 479e91e2e2
No known key found for this signature in database
4 changed files with 35 additions and 28 deletions

View File

@ -68,7 +68,7 @@ export const enqueueOptImage = async (
kind: "image",
message,
});
await tx.execute(sql`notify image`);
await tx.execute(sql`notify kyoo.image`);
return {
id,
@ -128,7 +128,7 @@ export const processImages = async () => {
const client = (await db.$client.connect()) as PoolClient;
client.on("notification", (evt) => {
if (evt.channel !== "image") return;
if (evt.channel !== "kyoo.image") return;
processAll();
});
await client.query("listen image");

View File

@ -21,8 +21,7 @@ create table scanner.requests(
status scanner.request_status not null,
started_at created_at timestamptz,
created_at created_at timestamptz not null default now() ::timestamptz,
constraint unique_kty(kind, title, year),
constraint unique_eid(external_id)
constraint unique_kty(kind, title, year)
);
commit;

View File

@ -5,7 +5,7 @@ from aiohttp import ClientSession
from .models.movie import Movie
from .models.serie import Serie
from .models.videos import Video, VideoCreated, VideoInfo
from .models.videos import Resource, Video, VideoCreated, VideoInfo
logger = getLogger(__name__)
@ -50,16 +50,18 @@ class KyooClient:
) as r:
r.raise_for_status()
async def create_movie(self, movie: Movie):
async def create_movie(self, movie: Movie) -> Resource:
async with self._client.post(
"movies",
json=movie.model_dump_json(),
) as r:
r.raise_for_status()
return Resource(**await r.json())
async def create_serie(self, serie: Serie):
async def create_serie(self, serie: Serie) -> Resource:
async with self._client.post(
"series",
json=serie.model_dump_json(),
) as r:
r.raise_for_status()
return Resource(**await r.json())

View File

@ -4,11 +4,11 @@ from logging import getLogger
from typing import Literal
from psycopg import AsyncConnection
from psycopg.rows import class_row
from psycopg.rows import class_row, dict_row
from pydantic import Field
from .client import KyooClient
from .models.videos import Guess
from .models.videos import Guess, Resource
from .providers.composite import CompositeProvider
from .utils import Model
@ -16,7 +16,7 @@ logger = getLogger(__name__)
class Request(Model, extra="allow"):
pk: int = Field(exclude=True)
pk: int | None = Field(exclude=True, default=None)
kind: Literal["episode", "movie"]
title: str
year: int | None
@ -51,15 +51,15 @@ class RequestProcessor:
""",
(x.model_dump() for x in requests),
)
# TODO: how will this conflict be handled if the request is already locked for update (being processed)
# TODO: how will this conflict be handled if the request is already running
if cur.rowcount > 0:
_ = await cur.execute("notify requests")
_ = await cur.execute("notify scanner.requests")
async def process_requests(self):
_ = await self._database.execute("listen requests")
gen = self._database.notifies()
async for _ in gen:
await self._process_request()
async with await self._database.execute("listen scanner.requests"):
gen = self._database.notifies()
async for _ in gen:
await self._process_request()
async def _process_request(self):
async with self._database.cursor(row_factory=class_row(Request)) as cur:
@ -91,14 +91,21 @@ class RequestProcessor:
logger.info(f"Starting to process {request.title}")
try:
await self._run_request(request)
cur = await cur.execute(
"""
delete from scanner.requests
where pk = %s
""",
[request.pk],
)
show = await self._run_request(request)
async with self._database.cursor(row_factory=dict_row) as cur:
cur = await cur.execute(
"""
delete from scanner.requests
where pk = %s
returning
videos
""",
[request.pk],
)
finished = await anext(cur)
if finished["videos"] != request.videos:
await self._client.link_videos(show.slug, finished["videos"])
except Exception as e:
logger.error("Couldn't process request", exc_info=e)
cur = await cur.execute(
@ -113,7 +120,7 @@ class RequestProcessor:
[request.pk],
)
async def _run_request(self, request: Request):
async def _run_request(self, request: Request) -> Resource:
if request.kind == "movie":
movie = await self._providers.find_movie(
request.title,
@ -121,8 +128,7 @@ class RequestProcessor:
request.external_id,
)
movie.videos = [x.id for x in request.videos]
await self._client.create_movie(movie)
return
return await self._client.create_movie(movie)
serie = await self._providers.find_serie(
request.title,
@ -149,4 +155,4 @@ class RequestProcessor:
)
continue
entry.videos.append(vid.id)
await self._client.create_serie(serie)
return await self._client.create_serie(serie)