Use reworked image queue in every insert

This commit is contained in:
Zoe Roux 2025-03-16 17:58:13 +01:00
parent 71b8cbca4a
commit 9ef114b91a
No known key found for this signature in database
11 changed files with 218 additions and 147 deletions

View File

@ -2,10 +2,11 @@ import { mkdir, writeFile } from "node:fs/promises";
import path from "node:path"; import path from "node:path";
import { encode } from "blurhash"; import { encode } from "blurhash";
import { eq, sql } from "drizzle-orm"; import { eq, sql } from "drizzle-orm";
import type { PgColumn } from "drizzle-orm/pg-core";
import { version } from "package.json"; import { version } from "package.json";
import type { PoolClient } from "pg"; import type { PoolClient } from "pg";
import sharp from "sharp"; import sharp from "sharp";
import { db } from "~/db"; import { type Transaction, db } from "~/db";
import * as schema from "~/db/schema"; import * as schema from "~/db/schema";
import { mqueue } from "~/db/schema/queue"; import { mqueue } from "~/db/schema/queue";
import type { Image } from "~/models/utils"; import type { Image } from "~/models/utils";
@ -20,18 +21,31 @@ type ImageTask = {
column: string; column: string;
}; };
type ImageTaskC = {
url: string;
column: PgColumn;
};
// this will only push a task to the image downloader service and not download it instantly. // this will only push a task to the image downloader service and not download it instantly.
// this is both done to prevent to many requests to be sent at once and to make sure POST // this is both done to prevent to many requests to be sent at once and to make sure POST
// requests are not blocked by image downloading or blurhash calculation // requests are not blocked by image downloading or blurhash calculation
export const enqueueImage = async ( export const enqueueImage = async (
tx: typeof db, tx: Transaction,
img: Omit<ImageTask, "id">, img: ImageTaskC,
): Promise<Image> => { ): Promise<Image> => {
const hasher = new Bun.CryptoHasher("sha256"); const hasher = new Bun.CryptoHasher("sha256");
hasher.update(img.url); hasher.update(img.url);
const id = hasher.digest().toString("hex"); const id = hasher.digest().toString("hex");
await tx.insert(mqueue).values({ kind: "image", message: { id, ...img } }); await tx.insert(mqueue).values({
kind: "image",
message: {
id,
url: img.url,
table: img.column.table._.name,
column: img.column.name,
} satisfies ImageTask,
});
await tx.execute(sql`notify image`); await tx.execute(sql`notify image`);
return { return {
@ -42,11 +56,11 @@ export const enqueueImage = async (
}; };
export const enqueueOptImage = async ( export const enqueueOptImage = async (
tx: typeof db, tx: Transaction,
img: Omit<ImageTask, "id">, img: { url: string | null; column: PgColumn },
): Promise<Image | null> => { ): Promise<Image | null> => {
if (!img.url) return null; if (!img.url) return null;
return await enqueueImage(tx, img); return await enqueueImage(tx, { url: img.url, column: img.column });
}; };
export const processImages = async () => { export const processImages = async () => {
@ -107,6 +121,7 @@ export const processImages = async () => {
}; };
async function downloadImage(id: string, url: string): Promise<string> { async function downloadImage(id: string, url: string): Promise<string> {
// TODO: check if file exists before downloading
const resp = await fetch(url, { const resp = await fetch(url, {
headers: { "User-Agent": `Kyoo v${version}` }, headers: { "User-Agent": `Kyoo v${version}` },
}); });

View File

@ -48,16 +48,28 @@ export const insertCollection = async (
}) })
.returning({ pk: shows.pk, id: shows.id, slug: shows.slug }); .returning({ pk: shows.pk, id: shows.id, slug: shows.slug });
const trans: ShowTrans[] = Object.entries(translations).map( const trans: ShowTrans[] = await Promise.all(
([lang, tr]) => ({ Object.entries(translations).map(async ([lang, tr]) => ({
pk: ret.pk, pk: ret.pk,
language: lang, language: lang,
...tr, ...tr,
poster: enqueueOptImage(tr.poster), poster: await enqueueOptImage(tx, {
thumbnail: enqueueOptImage(tr.thumbnail), url: tr.poster,
logo: enqueueOptImage(tr.logo), column: showTranslations.poster,
banner: enqueueOptImage(tr.banner), }),
}), thumbnail: await enqueueOptImage(tx, {
url: tr.thumbnail,
column: showTranslations.thumbnail,
}),
logo: await enqueueOptImage(tx, {
url: tr.logo,
column: showTranslations.logo,
}),
banner: await enqueueOptImage(tx, {
url: tr.banner,
column: showTranslations.banner,
}),
})),
); );
await tx await tx
.insert(showTranslations) .insert(showTranslations)

View File

@ -23,6 +23,7 @@ type SeedExtra = Omit<SExtra, "kind"> & {
}; };
type EntryI = typeof entries.$inferInsert; type EntryI = typeof entries.$inferInsert;
type EntryTransI = typeof entryTranslations.$inferInsert;
const generateSlug = ( const generateSlug = (
showSlug: string, showSlug: string,
@ -49,25 +50,30 @@ export const insertEntries = async (
if (!items) return []; if (!items) return [];
const retEntries = await db.transaction(async (tx) => { const retEntries = await db.transaction(async (tx) => {
const vals: EntryI[] = items.map((seed) => { const vals: EntryI[] = await Promise.all(
const { translations, videos, video, ...entry } = seed; items.map(async (seed) => {
return { const { translations, videos, video, ...entry } = seed;
...entry, return {
showPk: show.pk, ...entry,
slug: generateSlug(show.slug, seed), showPk: show.pk,
thumbnail: enqueueOptImage(seed.thumbnail), slug: generateSlug(show.slug, seed),
nextRefresh: thumbnail: await enqueueOptImage(tx, {
entry.kind !== "extra" url: seed.thumbnail,
? guessNextRefresh(entry.airDate ?? new Date()) column: entries.thumbnail,
: guessNextRefresh(new Date()), }),
episodeNumber: nextRefresh:
entry.kind === "episode" entry.kind !== "extra"
? entry.episodeNumber ? guessNextRefresh(entry.airDate ?? new Date())
: entry.kind === "special" : guessNextRefresh(new Date()),
? entry.number episodeNumber:
: undefined, entry.kind === "episode"
}; ? entry.episodeNumber
}); : entry.kind === "special"
? entry.number
: undefined,
};
}),
);
const ret = await tx const ret = await tx
.insert(entries) .insert(entries)
.values(vals) .values(vals)
@ -83,30 +89,41 @@ export const insertEntries = async (
}) })
.returning({ pk: entries.pk, id: entries.id, slug: entries.slug }); .returning({ pk: entries.pk, id: entries.id, slug: entries.slug });
const trans = items.flatMap((seed, i) => { const trans: EntryTransI[] = (
if (seed.kind === "extra") { await Promise.all(
return { items.map(async (seed, i) => {
pk: ret[i].pk, if (seed.kind === "extra") {
// yeah we hardcode the language to extra because if we want to support return [
// translations one day it won't be awkward {
language: "extra", pk: ret[i].pk,
name: seed.name, // yeah we hardcode the language to extra because if we want to support
description: null, // translations one day it won't be awkward
poster: undefined, language: "extra",
}; name: seed.name,
} description: null,
poster: undefined,
},
];
}
return Object.entries(seed.translations).map(([lang, tr]) => ({ return await Promise.all(
// assumes ret is ordered like items. Object.entries(seed.translations).map(async ([lang, tr]) => ({
pk: ret[i].pk, // assumes ret is ordered like items.
language: lang, pk: ret[i].pk,
...tr, language: lang,
poster: ...tr,
seed.kind === "movie" poster:
? enqueueOptImage((tr as any).poster) seed.kind === "movie"
: undefined, ? await enqueueOptImage(tx, {
})); url: (tr as any).poster,
}); column: entryTranslations.poster,
})
: undefined,
})),
);
}),
)
).flat();
await tx await tx
.insert(entryTranslations) .insert(entryTranslations)
.values(trans) .values(trans)

View File

@ -37,17 +37,33 @@ export const insertSeasons = async (
}) })
.returning({ pk: seasons.pk, id: seasons.id, slug: seasons.slug }); .returning({ pk: seasons.pk, id: seasons.id, slug: seasons.slug });
const trans: SeasonTransI[] = items.flatMap((seed, i) => const trans: SeasonTransI[] = (
Object.entries(seed.translations).map(([lang, tr]) => ({ await Promise.all(
// assumes ret is ordered like items. items.map(
pk: ret[i].pk, async (seed, i) =>
language: lang, await Promise.all(
...tr, Object.entries(seed.translations).map(async ([lang, tr]) => ({
poster: enqueueOptImage(tr.poster), // assumes ret is ordered like items.
thumbnail: enqueueOptImage(tr.thumbnail), pk: ret[i].pk,
banner: enqueueOptImage(tr.banner), language: lang,
})), ...tr,
); poster: await enqueueOptImage(tx, {
url: tr.poster,
column: seasonTranslations.poster,
}),
thumbnail: await enqueueOptImage(tx, {
url: tr.thumbnail,
column: seasonTranslations.thumbnail,
}),
banner: await enqueueOptImage(tx, {
url: tr.banner,
column: seasonTranslations.banner,
}),
})),
),
),
)
).flat();
await tx await tx
.insert(seasonTranslations) .insert(seasonTranslations)
.values(trans) .values(trans)

View File

@ -1,5 +1,5 @@
import { and, count, eq, exists, ne, sql } from "drizzle-orm"; import { and, count, eq, exists, ne, sql } from "drizzle-orm";
import { db } from "~/db"; import { type Transaction, db } from "~/db";
import { entries, entryVideoJoin, showTranslations, shows } from "~/db/schema"; import { entries, entryVideoJoin, showTranslations, shows } from "~/db/schema";
import { conflictUpdateAllExcept, sqlarr } from "~/db/utils"; import { conflictUpdateAllExcept, sqlarr } from "~/db/utils";
import type { SeedCollection } from "~/models/collections"; import type { SeedCollection } from "~/models/collections";
@ -12,30 +12,53 @@ type Show = typeof shows.$inferInsert;
type ShowTrans = typeof showTranslations.$inferInsert; type ShowTrans = typeof showTranslations.$inferInsert;
export const insertShow = async ( export const insertShow = async (
show: Show, show: Omit<Show, "original"> & { originalLanguage: string },
translations: translations:
| SeedMovie["translations"] | SeedMovie["translations"]
| SeedSerie["translations"] | SeedSerie["translations"]
| SeedCollection["translations"], | SeedCollection["translations"],
) => { ) => {
return await db.transaction(async (tx) => { return await db.transaction(async (tx) => {
const ret = await insertBaseShow(tx, show); const trans: (Omit<ShowTrans, "pk"> & { latinName: string | null })[] =
await Promise.all(
Object.entries(translations).map(async ([lang, tr]) => ({
language: lang,
...tr,
latinName: tr.latinName ?? null,
poster: await enqueueOptImage(tx, {
url: tr.poster,
column: showTranslations.poster,
}),
thumbnail: await enqueueOptImage(tx, {
url: tr.thumbnail,
column: showTranslations.thumbnail,
}),
logo: await enqueueOptImage(tx, {
url: tr.logo,
column: showTranslations.logo,
}),
banner: await enqueueOptImage(tx, {
url: tr.banner,
column: showTranslations.banner,
}),
})),
);
const original = trans.find((x) => x.language === show.originalLanguage);
if (!original) {
tx.rollback();
return {
status: 422 as const,
message: "No translation available in the original language.",
};
}
const ret = await insertBaseShow(tx, { ...show, original });
if ("status" in ret) return ret; if ("status" in ret) return ret;
const trans: ShowTrans[] = Object.entries(translations).map(
([lang, tr]) => ({
pk: ret.pk,
language: lang,
...tr,
poster: enqueueOptImage(tr.poster),
thumbnail: enqueueOptImage(tr.thumbnail),
logo: enqueueOptImage(tr.logo),
banner: enqueueOptImage(tr.banner),
}),
);
await tx await tx
.insert(showTranslations) .insert(showTranslations)
.values(trans) .values(trans.map((x) => ({ ...x, pk: ret.pk })))
.onConflictDoUpdate({ .onConflictDoUpdate({
target: [showTranslations.pk, showTranslations.language], target: [showTranslations.pk, showTranslations.language],
set: conflictUpdateAllExcept(showTranslations, ["pk", "language"]), set: conflictUpdateAllExcept(showTranslations, ["pk", "language"]),
@ -44,10 +67,7 @@ export const insertShow = async (
}); });
}; };
async function insertBaseShow( async function insertBaseShow(tx: Transaction, show: Show) {
tx: Parameters<Parameters<typeof db.transaction>[0]>[0],
show: Show,
) {
function insert() { function insert() {
return tx return tx
.insert(shows) .insert(shows)
@ -97,7 +117,7 @@ async function insertBaseShow(
} }
export async function updateAvailableCount( export async function updateAvailableCount(
tx: typeof db | Parameters<Parameters<typeof db.transaction>[0]>[0], tx: Transaction,
showPks: number[], showPks: number[],
updateEntryCount = true, updateEntryCount = true,
) { ) {

View File

@ -12,10 +12,15 @@ export const insertStaff = async (
if (!seed?.length) return []; if (!seed?.length) return [];
return await db.transaction(async (tx) => { return await db.transaction(async (tx) => {
const people = seed.map((x) => ({ const people = await Promise.all(
...x.staff, seed.map(async (x) => ({
image: enqueueOptImage(x.staff.image), ...x.staff,
})); image: await enqueueOptImage(tx, {
url: x.staff.image,
column: staff.image,
}),
})),
);
const ret = await tx const ret = await tx
.insert(staff) .insert(staff)
.values(people) .values(people)
@ -25,16 +30,21 @@ export const insertStaff = async (
}) })
.returning({ pk: staff.pk, id: staff.id, slug: staff.slug }); .returning({ pk: staff.pk, id: staff.id, slug: staff.slug });
const rval = seed.map((x, i) => ({ const rval = await Promise.all(
showPk, seed.map(async (x, i) => ({
staffPk: ret[i].pk, showPk,
kind: x.kind, staffPk: ret[i].pk,
order: i, kind: x.kind,
character: { order: i,
...x.character, character: {
image: enqueueOptImage(x.character.image), ...x.character,
}, image: await enqueueOptImage(tx, {
})); url: x.character.image,
column: roles.character.image,
}),
},
})),
);
// always replace all roles. this is because: // always replace all roles. this is because:
// - we want `order` to stay in sync (& without duplicates) // - we want `order` to stay in sync (& without duplicates)

View File

@ -33,14 +33,24 @@ export const insertStudios = async (
}) })
.returning({ pk: studios.pk, id: studios.id, slug: studios.slug }); .returning({ pk: studios.pk, id: studios.id, slug: studios.slug });
const trans: StudioTransI[] = seed.flatMap((x, i) => const trans: StudioTransI[] = (
Object.entries(x.translations).map(([lang, tr]) => ({ await Promise.all(
pk: ret[i].pk, seed.map(
language: lang, async (x, i) =>
name: tr.name, await Promise.all(
logo: enqueueOptImage(tr.logo), Object.entries(x.translations).map(async ([lang, tr]) => ({
})), pk: ret[i].pk,
); language: lang,
name: tr.name,
logo: await enqueueOptImage(tx, {
url: tr.logo,
column: studioTranslations.logo,
}),
})),
),
),
)
).flat();
await tx await tx
.insert(studioTranslations) .insert(studioTranslations)
.values(trans) .values(trans)

View File

@ -1,7 +1,6 @@
import { t } from "elysia"; import { t } from "elysia";
import type { SeedMovie } from "~/models/movie"; import type { SeedMovie } from "~/models/movie";
import { getYear } from "~/utils"; import { getYear } from "~/utils";
import { enqueueOptImage } from "./images";
import { insertCollection } from "./insert/collection"; import { insertCollection } from "./insert/collection";
import { insertEntries } from "./insert/entries"; import { insertEntries } from "./insert/entries";
import { insertShow, updateAvailableCount } from "./insert/shows"; import { insertShow, updateAvailableCount } from "./insert/shows";
@ -55,13 +54,6 @@ export const seedMovie = async (
const { translations, videos, collection, studios, staff, ...movie } = seed; const { translations, videos, collection, studios, staff, ...movie } = seed;
const nextRefresh = guessNextRefresh(movie.airDate ?? new Date()); const nextRefresh = guessNextRefresh(movie.airDate ?? new Date());
const original = translations[movie.originalLanguage];
if (!original) {
return {
status: 422,
message: "No translation available in the original language.",
};
}
const col = await insertCollection(collection, { const col = await insertCollection(collection, {
kind: "movie", kind: "movie",
@ -76,15 +68,6 @@ export const seedMovie = async (
nextRefresh, nextRefresh,
collectionPk: col?.pk, collectionPk: col?.pk,
entriesCount: 1, entriesCount: 1,
original: {
language: movie.originalLanguage,
name: original.name,
latinName: original.latinName ?? null,
poster: enqueueOptImage(original.poster),
thumbnail: enqueueOptImage(original.thumbnail),
logo: enqueueOptImage(original.logo),
banner: enqueueOptImage(original.banner),
},
...movie, ...movie,
}, },
translations, translations,

View File

@ -5,7 +5,7 @@ import { enqueueOptImage } from "./images";
import { insertCollection } from "./insert/collection"; import { insertCollection } from "./insert/collection";
import { insertEntries } from "./insert/entries"; import { insertEntries } from "./insert/entries";
import { insertSeasons } from "./insert/seasons"; import { insertSeasons } from "./insert/seasons";
import { insertShow, updateAvailableCount } from "./insert/shows"; import { insertShow } from "./insert/shows";
import { insertStaff } from "./insert/staff"; import { insertStaff } from "./insert/staff";
import { insertStudios } from "./insert/studios"; import { insertStudios } from "./insert/studios";
import { guessNextRefresh } from "./refresh"; import { guessNextRefresh } from "./refresh";
@ -91,13 +91,6 @@ export const seedSerie = async (
...serie ...serie
} = seed; } = seed;
const nextRefresh = guessNextRefresh(serie.startAir ?? new Date()); const nextRefresh = guessNextRefresh(serie.startAir ?? new Date());
const original = translations[serie.originalLanguage];
if (!original) {
return {
status: 422,
message: "No translation available in the original language.",
};
}
const col = await insertCollection(collection, { const col = await insertCollection(collection, {
kind: "serie", kind: "serie",
@ -111,15 +104,6 @@ export const seedSerie = async (
nextRefresh, nextRefresh,
collectionPk: col?.pk, collectionPk: col?.pk,
entriesCount: entries.length, entriesCount: entries.length,
original: {
language: serie.originalLanguage,
name: original.name,
latinName: original.latinName ?? null,
poster: enqueueOptImage(original.poster),
thumbnail: enqueueOptImage(original.thumbnail),
logo: enqueueOptImage(original.logo),
banner: enqueueOptImage(original.banner),
},
...serie, ...serie,
}, },
translations, translations,

View File

@ -31,3 +31,7 @@ export const migrate = async () => {
}); });
console.log(`Database ${dbConfig.database} migrated!`); console.log(`Database ${dbConfig.database} migrated!`);
}; };
export type Transaction =
| typeof db
| Parameters<Parameters<typeof db.transaction>[0]>[0];

View File

@ -58,10 +58,10 @@ export const genres = schema.enum("genres", [
]); ]);
type OriginalWithImages = Original & { type OriginalWithImages = Original & {
poster: Image | null; poster?: Image | null;
thumbnail: Image | null; thumbnail?: Image | null;
banner: Image | null; banner?: Image | null;
logo: Image | null; logo?: Image | null;
}; };
export const shows = schema.table( export const shows = schema.table(