mirror of
https://github.com/zoriya/Kyoo.git
synced 2026-03-28 12:27:51 -04:00
Implement metadata refresh
This commit is contained in:
parent
3a2aa61ac1
commit
59187a024b
4
api/drizzle/0029_next_refresh.sql
Normal file
4
api/drizzle/0029_next_refresh.sql
Normal file
@ -0,0 +1,4 @@
|
||||
CREATE INDEX "evj_video_pk" ON "kyoo"."entry_video_join" USING btree ("video_pk");--> statement-breakpoint
|
||||
ALTER TABLE "kyoo"."entries" DROP COLUMN "next_refresh";--> statement-breakpoint
|
||||
ALTER TABLE "kyoo"."seasons" DROP COLUMN "next_refresh";
|
||||
ALTER TABLE "kyoo"."shows" ALTER COLUMN "next_refresh" SET DATA TYPE date;
|
||||
2010
api/drizzle/meta/0029_snapshot.json
Normal file
2010
api/drizzle/meta/0029_snapshot.json
Normal file
File diff suppressed because it is too large
Load Diff
@ -204,6 +204,13 @@
|
||||
"when": 1771600000000,
|
||||
"tag": "0028_rating_jsonb",
|
||||
"breakpoints": true
|
||||
},
|
||||
{
|
||||
"idx": 29,
|
||||
"version": "7",
|
||||
"when": 1774623568394,
|
||||
"tag": "0029_next_refresh",
|
||||
"breakpoints": true
|
||||
}
|
||||
]
|
||||
}
|
||||
|
||||
@ -99,7 +99,6 @@ export const entrySort = Sort(
|
||||
episodeNumber: entries.episodeNumber,
|
||||
number: entries.episodeNumber,
|
||||
airDate: entries.airDate,
|
||||
nextRefresh: entries.nextRefresh,
|
||||
playedDate: entryProgressQ.playedDate,
|
||||
},
|
||||
{
|
||||
|
||||
@ -35,7 +35,6 @@ const seasonSort = Sort(
|
||||
endAir: seasons.endAir,
|
||||
entriesCount: seasons.entriesCount,
|
||||
availableCount: seasons.availableCount,
|
||||
nextRefresh: seasons.nextRefresh,
|
||||
},
|
||||
{
|
||||
default: ["seasonNumber"],
|
||||
|
||||
@ -20,7 +20,7 @@ export const insertCollection = record(
|
||||
| ({ kind: "movie" } & SeedMovie)
|
||||
| ({ kind: "serie" } & SeedSerie)
|
||||
) & {
|
||||
nextRefresh: Date;
|
||||
nextRefresh: string;
|
||||
},
|
||||
original: Original,
|
||||
) => {
|
||||
|
||||
@ -12,7 +12,6 @@ import { KErrorT } from "~/models/error";
|
||||
import { record } from "~/otel";
|
||||
import { duplicates } from "~/utils";
|
||||
import { enqueueOptImage, flushImageQueue, type ImageTask } from "../images";
|
||||
import { guessNextRefresh } from "../refresh";
|
||||
import { updateAvailableCount, updateAvailableSince } from "./shows";
|
||||
|
||||
type SeedEntry = SEntry & {
|
||||
@ -66,10 +65,6 @@ export const insertEntries = record(
|
||||
url: seed.thumbnail,
|
||||
column: entries.thumbnail,
|
||||
}),
|
||||
nextRefresh:
|
||||
entry.kind !== "extra"
|
||||
? guessNextRefresh(entry.airDate ?? new Date())
|
||||
: guessNextRefresh(new Date()),
|
||||
episodeNumber:
|
||||
entry.kind === "episode"
|
||||
? entry.episodeNumber
|
||||
|
||||
@ -4,7 +4,6 @@ import { conflictUpdateAllExcept, unnestValues } from "~/db/utils";
|
||||
import type { SeedSeason } from "~/models/season";
|
||||
import { record } from "~/otel";
|
||||
import { enqueueOptImage, flushImageQueue, type ImageTask } from "../images";
|
||||
import { guessNextRefresh } from "../refresh";
|
||||
|
||||
type SeasonI = typeof seasons.$inferInsert;
|
||||
type SeasonTransI = typeof seasonTranslations.$inferInsert;
|
||||
@ -25,7 +24,6 @@ export const insertSeasons = record(
|
||||
season.seasonNumber === 0
|
||||
? `${show.slug}-specials`
|
||||
: `${show.slug}-s${season.seasonNumber}`,
|
||||
nextRefresh: guessNextRefresh(season.startAir ?? new Date()),
|
||||
};
|
||||
});
|
||||
const ret = await tx
|
||||
|
||||
@ -53,7 +53,7 @@ export const seedMovie = async (
|
||||
}
|
||||
|
||||
const { translations, videos, collection, studios, staff, ...movie } = seed;
|
||||
const nextRefresh = guessNextRefresh(movie.airDate ?? new Date());
|
||||
const nextRefresh = guessNextRefresh({ ...seed, kind: "movie" });
|
||||
const ori = translations[movie.originalLanguage];
|
||||
const original = ori
|
||||
? {
|
||||
|
||||
@ -1,6 +1,27 @@
|
||||
import type { SeedMovie } from "~/models/movie";
|
||||
import type { SeedSerie } from "~/models/serie";
|
||||
|
||||
export const guessNextRefresh = (
|
||||
show: (SeedSerie & { kind: "serie" }) | (SeedMovie & { kind: "movie" }),
|
||||
) => {
|
||||
if (show.kind === "movie") {
|
||||
return fromAirDate(show.airDate ?? new Date());
|
||||
}
|
||||
const lastAirDate = show.entries
|
||||
.filter((x) => x.airDate)
|
||||
.map((x) => new Date(x.airDate!))
|
||||
.reduce((max, cur) => (cur > max ? cur : max));
|
||||
return fromAirDate(lastAirDate);
|
||||
};
|
||||
|
||||
// oh i hate js dates so much.
|
||||
export const guessNextRefresh = (airDate: Date | string) => {
|
||||
const fromAirDate = (airDate: string | Date) => {
|
||||
if (typeof airDate === "string") airDate = new Date(airDate);
|
||||
|
||||
if (airDate.getTime() > Date.now()) {
|
||||
return airDate.toISOString().split("T")[0];
|
||||
}
|
||||
|
||||
const diff = Date.now() - airDate.getTime();
|
||||
const days = diff / (24 * 60 * 60 * 1000);
|
||||
|
||||
@ -8,5 +29,5 @@ export const guessNextRefresh = (airDate: Date | string) => {
|
||||
if (days <= 4) ret.setDate(ret.getDate() + 4);
|
||||
else if (days <= 21) ret.setDate(ret.getDate() + 14);
|
||||
else ret.setMonth(ret.getMonth() + 2);
|
||||
return ret;
|
||||
return ret.toISOString().split("T")[0];
|
||||
};
|
||||
|
||||
@ -89,7 +89,7 @@ export const seedSerie = async (
|
||||
staff,
|
||||
...serie
|
||||
} = seed;
|
||||
const nextRefresh = guessNextRefresh(serie.startAir ?? new Date());
|
||||
const nextRefresh = guessNextRefresh({ ...seed, kind: "serie" });
|
||||
const ori = translations[serie.originalLanguage];
|
||||
const original = ori
|
||||
? {
|
||||
|
||||
@ -91,6 +91,7 @@ export const showFilters: FilterDef = {
|
||||
column: (source: string) => sql`(${shows.rating}->>${source})::int`,
|
||||
type: "int",
|
||||
},
|
||||
nextRefresh: { column: shows.nextRefresh, type: "date" },
|
||||
};
|
||||
export const showSort = Sort(
|
||||
{
|
||||
|
||||
@ -78,7 +78,6 @@ export const entries = schema.table(
|
||||
.notNull()
|
||||
.$onUpdate(() => new Date()),
|
||||
availableSince: timestamp({ withTimezone: true, precision: 3 }),
|
||||
nextRefresh: timestamp({ withTimezone: true, precision: 3 }).notNull(),
|
||||
},
|
||||
(t) => [
|
||||
unique().on(t.showPk, t.seasonNumber, t.episodeNumber),
|
||||
|
||||
@ -51,7 +51,6 @@ export const seasons = schema.table(
|
||||
updatedAt: timestamp({ withTimezone: true, precision: 3 })
|
||||
.notNull()
|
||||
.$onUpdate(() => new Date()),
|
||||
nextRefresh: timestamp({ withTimezone: true, precision: 3 }).notNull(),
|
||||
},
|
||||
(t) => [
|
||||
unique().on(t.showPk, t.seasonNumber),
|
||||
|
||||
@ -93,7 +93,7 @@ export const shows = schema.table(
|
||||
updatedAt: timestamp({ withTimezone: true, precision: 3 })
|
||||
.notNull()
|
||||
.$onUpdate(() => new Date()),
|
||||
nextRefresh: timestamp({ withTimezone: true, precision: 3 }).notNull(),
|
||||
nextRefresh: date().notNull(),
|
||||
},
|
||||
(t) => [
|
||||
unique("kind_slug").on(t.kind, t.slug),
|
||||
|
||||
@ -32,7 +32,6 @@ const BaseCollection = t.Object({
|
||||
descrpition: "Date of the last item of the collection",
|
||||
}),
|
||||
),
|
||||
nextRefresh: t.Date(),
|
||||
externalId: ExternalId(),
|
||||
});
|
||||
|
||||
|
||||
@ -11,8 +11,6 @@ export const BaseEntry = () =>
|
||||
}),
|
||||
),
|
||||
thumbnail: t.Nullable(Image),
|
||||
|
||||
nextRefresh: t.Date(),
|
||||
});
|
||||
|
||||
export const EntryTranslation = () =>
|
||||
|
||||
@ -33,7 +33,7 @@ const BaseMovie = t.Object({
|
||||
t.Number({ minimum: 0, description: "Runtime of the movie in minutes." }),
|
||||
),
|
||||
airDate: t.Nullable(t.String({ format: "date" })),
|
||||
nextRefresh: t.Date(),
|
||||
nextRefresh: t.Nullable(t.String({ format: "date" })),
|
||||
externalId: ExternalId(),
|
||||
});
|
||||
|
||||
|
||||
@ -12,8 +12,6 @@ export const BaseSeason = t.Object({
|
||||
startAir: t.Nullable(t.String({ format: "date" })),
|
||||
endAir: t.Nullable(t.String({ format: "date" })),
|
||||
|
||||
nextRefresh: t.Date(),
|
||||
|
||||
externalId: SeasonId,
|
||||
});
|
||||
|
||||
|
||||
@ -43,7 +43,7 @@ const BaseSerie = t.Object({
|
||||
),
|
||||
startAir: t.Nullable(t.String({ format: "date" })),
|
||||
endAir: t.Nullable(t.String({ format: "date" })),
|
||||
nextRefresh: t.Date(),
|
||||
nextRefresh: t.Nullable(t.String({ format: "date" })),
|
||||
externalId: ExternalId(),
|
||||
});
|
||||
|
||||
|
||||
@ -34,7 +34,7 @@ export type Property = {
|
||||
export type Value =
|
||||
| { type: "int"; value: number }
|
||||
| { type: "float"; value: number }
|
||||
| { type: "date"; value: string }
|
||||
| { type: "date"; value: Date }
|
||||
| { type: "string"; value: string }
|
||||
| { type: "enum"; value: string }
|
||||
| { type: "bool"; value: boolean };
|
||||
@ -92,7 +92,7 @@ const dateVal = t(
|
||||
),
|
||||
map(([year, month, day]) => ({
|
||||
type: "date" as const,
|
||||
value: `${year}-${month}-${day}`,
|
||||
value: new Date(`${year}-${month}-${day}`),
|
||||
})),
|
||||
),
|
||||
).expects("a date");
|
||||
|
||||
@ -1,5 +1,7 @@
|
||||
from asyncio import CancelledError, TaskGroup, create_task, sleep
|
||||
from contextlib import asynccontextmanager
|
||||
from types import CoroutineType
|
||||
from typing import Any
|
||||
|
||||
from asyncpg import Connection
|
||||
from fastapi import FastAPI
|
||||
@ -9,6 +11,7 @@ from scanner.fsscan import FsScanner
|
||||
from scanner.log import configure_logging
|
||||
from scanner.otel import instrument, setup_otelproviders
|
||||
from scanner.providers.composite import CompositeProvider
|
||||
from scanner.refresh import ShowRefresh
|
||||
from scanner.providers.themoviedatabase import TheMovieDatabase
|
||||
from scanner.providers.thetvdb import TVDB
|
||||
from scanner.requests import RequestCreator, RequestProcessor
|
||||
@ -50,10 +53,13 @@ async def lifespan(app: FastAPI):
|
||||
client,
|
||||
app.state.provider,
|
||||
)
|
||||
scanner = FsScanner(client, RequestCreator(db))
|
||||
requests = RequestCreator(db)
|
||||
scanner = FsScanner(client, requests)
|
||||
refresh = ShowRefresh(client, requests)
|
||||
tasks = create_task(
|
||||
background_startup(
|
||||
scanner,
|
||||
refresh,
|
||||
processor,
|
||||
leader_db,
|
||||
is_master,
|
||||
@ -65,14 +71,15 @@ async def lifespan(app: FastAPI):
|
||||
|
||||
async def background_startup(
|
||||
scanner: FsScanner,
|
||||
refresh: ShowRefresh,
|
||||
processor: RequestProcessor,
|
||||
leader_db: Connection,
|
||||
is_master: bool | None,
|
||||
):
|
||||
async def scan():
|
||||
async def delay(task: CoroutineType[Any, Any, None]):
|
||||
# wait for everything to startup & resume before scanning
|
||||
await sleep(30)
|
||||
await scanner.scan(remove_deleted=True)
|
||||
await task
|
||||
|
||||
async def leader_worker(tg: TaskGroup):
|
||||
nonlocal is_master
|
||||
@ -83,7 +90,8 @@ async def background_startup(
|
||||
)
|
||||
|
||||
_ = tg.create_task(scanner.monitor())
|
||||
_ = tg.create_task(scan())
|
||||
_ = tg.create_task(delay(scanner.scan(remove_deleted=True)))
|
||||
_ = tg.create_task(delay(refresh.monitor()))
|
||||
|
||||
async with TaskGroup() as tg:
|
||||
_ = tg.create_task(processor.listen(tg))
|
||||
|
||||
@ -1,4 +1,5 @@
|
||||
import os
|
||||
from datetime import datetime, timezone
|
||||
from logging import getLogger
|
||||
from types import TracebackType
|
||||
from typing import Literal
|
||||
@ -7,8 +8,10 @@ from aiohttp import ClientResponse, ClientResponseError, ClientSession
|
||||
from pydantic import TypeAdapter
|
||||
|
||||
from .models.movie import Movie
|
||||
from .models.page import Page
|
||||
from .models.request import Request
|
||||
from .models.serie import Serie
|
||||
from .models.show import Show
|
||||
from .models.videos import For, Resource, Video, VideoCreated, VideoInfo, VideoLink
|
||||
from .utils import Singleton
|
||||
|
||||
@ -86,6 +89,14 @@ class KyooClient(metaclass=Singleton):
|
||||
await self.raise_for_status(r)
|
||||
return Resource.model_validate(await r.json())
|
||||
|
||||
async def get_shows_to_refresh(self, next: str | None) -> Page[Show]:
|
||||
now = datetime.now(timezone.utc).date()
|
||||
async with self._client.get(
|
||||
next or f"shows?sort=nextRefresh&filter=nextRefresh le {now}"
|
||||
) as r:
|
||||
await self.raise_for_status(r)
|
||||
return Page[Show].model_validate(await r.json())
|
||||
|
||||
async def link_videos(
|
||||
self,
|
||||
kind: Literal["movie", "serie"],
|
||||
|
||||
15
scanner/scanner/models/show.py
Normal file
15
scanner/scanner/models/show.py
Normal file
@ -0,0 +1,15 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from datetime import date
|
||||
from typing import Literal
|
||||
|
||||
from .metadataid import MetadataId
|
||||
from ..utils import Model
|
||||
|
||||
|
||||
class Show(Model):
|
||||
kind: Literal["movie", "serie", "collection"]
|
||||
name: str
|
||||
air_date: date | None = None
|
||||
start_air: date | None = None
|
||||
external_id: dict[str, list[MetadataId]]
|
||||
@ -241,7 +241,7 @@ class TVDB(Provider):
|
||||
original_language=Language.get(ret["originalLanguage"]),
|
||||
genres=[
|
||||
cast(Genre, self._genre_map[x["slug"]])
|
||||
for x in ret.get("genres", [])
|
||||
for x in (ret.get("genres", []) or [])
|
||||
if self._genre_map[x["slug"]] is not None
|
||||
],
|
||||
rating={}, # TODO: maybe use the `score` value.
|
||||
@ -438,7 +438,7 @@ class TVDB(Provider):
|
||||
original_language=Language.get(show["originalLanguage"]),
|
||||
genres=[
|
||||
cast(Genre, self._genre_map[x["slug"]])
|
||||
for x in show.get("genres", [])
|
||||
for x in (show.get("genres", []) or [])
|
||||
if self._genre_map[x["slug"]] is not None
|
||||
],
|
||||
rating={},
|
||||
@ -749,7 +749,7 @@ class TVDB(Provider):
|
||||
original_language=Language.get(ret["originalLanguage"]),
|
||||
genres=[
|
||||
cast(Genre, self._genre_map[x["slug"]])
|
||||
for x in ret.get("genres", [])
|
||||
for x in (ret.get("genres", []) or [])
|
||||
if self._genre_map[x["slug"]] is not None
|
||||
],
|
||||
rating={}, # TODO: maybe use the `score` value.
|
||||
|
||||
56
scanner/scanner/refresh.py
Normal file
56
scanner/scanner/refresh.py
Normal file
@ -0,0 +1,56 @@
|
||||
from asyncio import sleep
|
||||
from datetime import timedelta
|
||||
from logging import getLogger
|
||||
|
||||
from .client import KyooClient
|
||||
from .models.metadataid import MetadataId
|
||||
from .models.request import Request
|
||||
from .requests import RequestCreator
|
||||
|
||||
logger = getLogger(__name__)
|
||||
|
||||
|
||||
class ShowRefresh:
|
||||
def __init__(self, client: KyooClient, requests: RequestCreator):
|
||||
self._client = client
|
||||
self._requests = requests
|
||||
|
||||
async def monitor(self):
|
||||
while True:
|
||||
try:
|
||||
queued = await self.refresh_due_shows()
|
||||
logger.info("Queued %d shows for refresh.", queued)
|
||||
except Exception as e:
|
||||
logger.error("Unexpected error while refreshing shows.", exc_info=e)
|
||||
await sleep(timedelta(days=1).total_seconds())
|
||||
|
||||
async def refresh_due_shows(self) -> int:
|
||||
queued = 0
|
||||
next_url: str | None = None
|
||||
|
||||
while True:
|
||||
page = await self._client.get_shows_to_refresh(next_url)
|
||||
requests = [
|
||||
Request(
|
||||
kind="movie" if show.kind == "movie" else "episode",
|
||||
title=show.name,
|
||||
year=show.air_date.year
|
||||
if show.air_date is not None
|
||||
else show.start_air.year
|
||||
if show.start_air is not None
|
||||
else None,
|
||||
external_id=MetadataId.map_dict(show.external_id),
|
||||
videos=[],
|
||||
)
|
||||
for show in page.items
|
||||
if show.kind != "collection"
|
||||
]
|
||||
if requests:
|
||||
_ = await self._requests.enqueue(requests)
|
||||
queued += len(requests)
|
||||
|
||||
if not page.next:
|
||||
break
|
||||
next_url = page.next
|
||||
|
||||
return queued
|
||||
@ -87,8 +87,9 @@ func (s *MetadataService) extractThumbnail(ctx context.Context, path string, sha
|
||||
vttPath := getThumbVttPath(sha)
|
||||
spritePath := getThumbPath(sha)
|
||||
|
||||
alreadyOk, _ := s.storage.DoesItemExist(ctx, spritePath)
|
||||
if alreadyOk {
|
||||
spriteOk, _ := s.storage.DoesItemExist(ctx, spritePath)
|
||||
vttOk, _ := s.storage.DoesItemExist(ctx, vttPath)
|
||||
if spriteOk && vttOk {
|
||||
return nil
|
||||
}
|
||||
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user