diff --git a/api/src/controllers/seed/images.ts b/api/src/controllers/seed/images.ts index 106f9277..7faa39dc 100644 --- a/api/src/controllers/seed/images.ts +++ b/api/src/controllers/seed/images.ts @@ -68,7 +68,7 @@ export const enqueueOptImage = async ( kind: "image", message, }); - await tx.execute(sql`notify image`); + await tx.execute(sql`notify kyoo.image`); return { id, @@ -128,7 +128,7 @@ export const processImages = async () => { const client = (await db.$client.connect()) as PoolClient; client.on("notification", (evt) => { - if (evt.channel !== "image") return; + if (evt.channel !== "kyoo.image") return; processAll(); }); await client.query("listen image"); diff --git a/scanner/migrations/000001_request.up.sql b/scanner/migrations/000001_request.up.sql index d8601e18..54895ac4 100644 --- a/scanner/migrations/000001_request.up.sql +++ b/scanner/migrations/000001_request.up.sql @@ -21,8 +21,7 @@ create table scanner.requests( status scanner.request_status not null, started_at created_at timestamptz, created_at created_at timestamptz not null default now() ::timestamptz, - constraint unique_kty(kind, title, year), - constraint unique_eid(external_id) + constraint unique_kty(kind, title, year) ); commit; diff --git a/scanner/scanner/client.py b/scanner/scanner/client.py index 65fffa93..e211dcec 100644 --- a/scanner/scanner/client.py +++ b/scanner/scanner/client.py @@ -5,7 +5,7 @@ from aiohttp import ClientSession from .models.movie import Movie from .models.serie import Serie -from .models.videos import Video, VideoCreated, VideoInfo +from .models.videos import Resource, Video, VideoCreated, VideoInfo logger = getLogger(__name__) @@ -50,16 +50,18 @@ class KyooClient: ) as r: r.raise_for_status() - async def create_movie(self, movie: Movie): + async def create_movie(self, movie: Movie) -> Resource: async with self._client.post( "movies", json=movie.model_dump_json(), ) as r: r.raise_for_status() + return Resource(**await r.json()) - async def create_serie(self, serie: Serie): + async def create_serie(self, serie: Serie) -> Resource: async with self._client.post( "series", json=serie.model_dump_json(), ) as r: r.raise_for_status() + return Resource(**await r.json()) diff --git a/scanner/scanner/requests.py b/scanner/scanner/requests.py index 8cc418ce..acb5abb8 100644 --- a/scanner/scanner/requests.py +++ b/scanner/scanner/requests.py @@ -4,11 +4,11 @@ from logging import getLogger from typing import Literal from psycopg import AsyncConnection -from psycopg.rows import class_row +from psycopg.rows import class_row, dict_row from pydantic import Field from .client import KyooClient -from .models.videos import Guess +from .models.videos import Guess, Resource from .providers.composite import CompositeProvider from .utils import Model @@ -16,7 +16,7 @@ logger = getLogger(__name__) class Request(Model, extra="allow"): - pk: int = Field(exclude=True) + pk: int | None = Field(exclude=True, default=None) kind: Literal["episode", "movie"] title: str year: int | None @@ -51,15 +51,15 @@ class RequestProcessor: """, (x.model_dump() for x in requests), ) - # TODO: how will this conflict be handled if the request is already locked for update (being processed) + # TODO: how will this conflict be handled if the request is already running if cur.rowcount > 0: - _ = await cur.execute("notify requests") + _ = await cur.execute("notify scanner.requests") async def process_requests(self): - _ = await self._database.execute("listen requests") - gen = self._database.notifies() - async for _ in gen: - await self._process_request() + async with await self._database.execute("listen scanner.requests"): + gen = self._database.notifies() + async for _ in gen: + await self._process_request() async def _process_request(self): async with self._database.cursor(row_factory=class_row(Request)) as cur: @@ -91,14 +91,21 @@ class RequestProcessor: logger.info(f"Starting to process {request.title}") try: - await self._run_request(request) - cur = await cur.execute( - """ - delete from scanner.requests - where pk = %s - """, - [request.pk], - ) + show = await self._run_request(request) + + async with self._database.cursor(row_factory=dict_row) as cur: + cur = await cur.execute( + """ + delete from scanner.requests + where pk = %s + returning + videos + """, + [request.pk], + ) + finished = await anext(cur) + if finished["videos"] != request.videos: + await self._client.link_videos(show.slug, finished["videos"]) except Exception as e: logger.error("Couldn't process request", exc_info=e) cur = await cur.execute( @@ -113,7 +120,7 @@ class RequestProcessor: [request.pk], ) - async def _run_request(self, request: Request): + async def _run_request(self, request: Request) -> Resource: if request.kind == "movie": movie = await self._providers.find_movie( request.title, @@ -121,8 +128,7 @@ class RequestProcessor: request.external_id, ) movie.videos = [x.id for x in request.videos] - await self._client.create_movie(movie) - return + return await self._client.create_movie(movie) serie = await self._providers.find_serie( request.title, @@ -149,4 +155,4 @@ class RequestProcessor: ) continue entry.videos.append(vid.id) - await self._client.create_serie(serie) + return await self._client.create_serie(serie)