diff --git a/api/src/controllers/profiles/history.ts b/api/src/controllers/profiles/history.ts index 9736ee23..324a2738 100644 --- a/api/src/controllers/profiles/history.ts +++ b/api/src/controllers/profiles/history.ts @@ -1,11 +1,28 @@ -import { and, count, eq, exists, gt, isNotNull, ne, sql } from "drizzle-orm"; +import { + and, + count, + eq, + exists, + gt, + isNotNull, + lte, + ne, + sql, +} from "drizzle-orm"; import { alias } from "drizzle-orm/pg-core"; import Elysia, { t } from "elysia"; import { auth, getUserInfo } from "~/auth"; -import { db } from "~/db"; -import { entries, history, profiles, shows, videos } from "~/db/schema"; +import { db, type Transaction } from "~/db"; +import { + entries, + entryVideoJoin, + history, + profiles, + shows, + videos, +} from "~/db/schema"; import { watchlist } from "~/db/schema/watchlist"; -import { coalesce, values } from "~/db/utils"; +import { coalesce } from "~/db/utils"; import { Entry } from "~/models/entry"; import { KError } from "~/models/error"; import { SeedHistory } from "~/models/history"; @@ -13,12 +30,12 @@ import { AcceptLanguage, createPage, Filter, - isUuid, Page, processLanguages, } from "~/models/utils"; import { desc } from "~/models/utils/descriptions"; import type { WatchlistStatus } from "~/models/watchlist"; +import { traverse } from "~/utils"; import { entryFilters, entryProgressQ, @@ -27,6 +44,220 @@ import { } from "../entries"; import { getOrCreateProfile } from "./profile"; +export async function updateHistory( + dbTx: Transaction, + userPk: number, + progress: SeedHistory[], +) { + return dbTx.transaction(async (tx) => { + const existing = ( + await tx + .select({ videoId: videos.id }) + .from(history) + .for("update") + .leftJoin(videos, eq(videos.pk, history.videoPk)) + .where( + and( + eq(history.profilePk, userPk), + lte(history.playedDate, sql`interval '1 day'`), + ), + ) + ).map((x) => x.videoId); + + const toUpdate = traverse( + progress.filter((x) => existing.includes(x.videoId)), + ); + const newEntries = traverse( + progress.filter((x) => !existing.includes(x.videoId)), + ); + + const updated = await tx + .update(history) + .set({ + time: sql`hist.ts`, + percent: sql`hist.percent`, + playedDate: coalesce(sql`hist.played_date`, sql`now()`) + }) + .from(sql`unnest( + ${toUpdate.videoId}::uuid[], + ${toUpdate.time}::integer[], + ${toUpdate.percent}::integer[], + ${toUpdate.playedDate}::timestamp[] + ) as hist(video_id, ts, percent, played_date)`) + .innerJoin(videos, eq(videos.id, sql`hist.video_id`)) + .where(and(eq(history.profilePk, userPk), eq(history.videoPk, videos.pk))) + .returning({ + entryPk: history.entryPk, + percent: history.percent, + playedDate: history.playedDate, + }); + + const ret = await tx + .insert(history) + .select( + db + .select({ + profilePk: sql`${userPk}`.as("profilePk"), + entryPk: entries.pk, + videoPk: videos.pk, + percent: sql`hist.percent`.as("percent"), + time: sql`hist.ts`.as("time"), + playedDate: coalesce(sql`hist.played_date`, sql`now()`).as( + "playedDate", + ), + }) + .from(sql`unnest( + ${newEntries.videoId}::uuid[], + ${newEntries.time}::integer[], + ${newEntries.percent}::integer[], + ${newEntries.playedDate}::timestamptz[] + ) as hist(video_id, ts, percent, played_date)`) + .innerJoin(videos, eq(videos.id, sql`hist.videoId`)) + .leftJoin(entryVideoJoin, eq(entryVideoJoin.videoPk, videos.pk)) + .leftJoin(entries, eq(entries.pk, entryVideoJoin.entryPk)), + ) + .returning({ + entryPk: history.entryPk, + percent: history.percent, + playedDate: history.playedDate, + }); + + // only return new and entries whose status has changed. + // we don't need to update the watchlist every 10s when watching a video. + return [...ret, ...updated.filter((x) => x.percent >= 95)]; + }); +} + +export async function updateWatchlist( + tx: Transaction, + userPk: number, + histArr: Awaited>, +) { + const nextEntry = alias(entries, "next_entry"); + const nextEntryQ = tx + .select({ + pk: nextEntry.pk, + }) + .from(nextEntry) + .where( + and( + eq(nextEntry.showPk, entries.showPk), + ne(nextEntry.kind, "extra"), + gt(nextEntry.order, entries.order), + ), + ) + .orderBy(nextEntry.order) + .limit(1) + .as("nextEntryQ"); + + const seenCountQ = tx + .select({ c: count() }) + .from(entries) + .where( + and( + eq(entries.showPk, sql`excluded.show_pk`), + exists( + db + .select() + .from(history) + .where( + and( + eq(history.profilePk, userPk), + eq(history.entryPk, entries.pk), + ), + ), + ), + ), + ); + + const showKindQ = tx + .select({ k: shows.kind }) + .from(shows) + .where(eq(shows.pk, sql`excluded.show_pk`)); + + const hist = traverse(histArr); + await tx + .insert(watchlist) + .select( + db + .selectDistinctOn([entries.showPk], { + profilePk: sql`${userPk}`.as("profilePk"), + showPk: entries.showPk, + status: sql` + case + when + hist.percent >= 95 + and ${nextEntryQ.pk} is null + then 'completed'::watchlist_status + else 'watching'::watchlist_status + end + `.as("status"), + seenCount: sql` + case + when ${entries.kind} = 'movie' then hist.percent + when hist.percent >= 95 then 1 + else 0 + end + `.as("seen_count"), + nextEntry: sql` + case + when hist.percent >= 95 then ${nextEntryQ.pk} + else ${entries.pk} + end + `.as("next_entry"), + score: sql`null`.as("score"), + startedAt: sql`hist.played_date`.as("startedAt"), + lastPlayedAt: sql`hist.played_date`.as("lastPlayedAt"), + completedAt: sql` + case + when ${nextEntryQ.pk} is null then hist.played_date + else null + end + `.as("completedAt"), + // see https://github.com/drizzle-team/drizzle-orm/issues/3608 + updatedAt: sql`now()`.as("updatedAt"), + }) + .from(sql`unnest( + ${hist.entryPk}::integer[], + ${hist.percent}::integer[], + ${hist.playedDate}::timestamptz[] + ) as hist(entry_pk, percent, played_date)`) + .leftJoin(entries, eq(entries.pk, sql`hist.entry_pk`)) + .leftJoinLateral(nextEntryQ, sql`true`), + ) + .onConflictDoUpdate({ + target: [watchlist.profilePk, watchlist.showPk], + set: { + status: sql` + case + when excluded.status = 'completed' then excluded.status + when + ${watchlist.status} != 'completed' + and ${watchlist.status} != 'rewatching' + then excluded.status + else ${watchlist.status} + end + `, + seenCount: sql` + case + when ${showKindQ} = 'movie' then excluded.seen_count + else ${seenCountQ} + end`, + nextEntry: sql` + case + when ${watchlist.status} = 'completed' then null + else excluded.next_entry + end + `, + lastPlayedAt: sql`excluded.last_played_at`, + completedAt: coalesce( + watchlist.completedAt, + sql`excluded.completed_at`, + ), + }, + }); +} + const historyProgressQ: typeof entryProgressQ = db .select({ percent: history.percent, @@ -170,162 +401,11 @@ export const historyH = new Elysia({ tags: ["profiles"] }) async ({ body, jwt: { sub }, status }) => { const profilePk = await getOrCreateProfile(sub); - const hist = values( - body.map((x) => ({ ...x, entryUseId: isUuid(x.entry) })), - { - percent: "integer", - time: "integer", - playedDate: "timestamptz", - videoId: "uuid", - }, - ).as("hist"); - const valEqEntries = sql` - case - when hist.entryUseId::boolean then ${entries.id} = hist.entry::uuid - else ${entries.slug} = hist.entry - end - `; - - const rows = await db - .insert(history) - .select( - db - .select({ - profilePk: sql`${profilePk}`.as("profilePk"), - entryPk: entries.pk, - videoPk: videos.pk, - percent: sql`hist.percent`.as("percent"), - time: sql`hist.time`.as("time"), - playedDate: sql`hist.playedDate`.as("playedDate"), - }) - .from(hist) - .innerJoin(entries, valEqEntries) - .leftJoin(videos, eq(videos.id, sql`hist.videoId`)), - ) - .returning({ pk: history.pk }); - - // automatically update watchlist with this new info - - const nextEntry = alias(entries, "next_entry"); - const nextEntryQ = db - .select({ - pk: nextEntry.pk, - }) - .from(nextEntry) - .where( - and( - eq(nextEntry.showPk, entries.showPk), - ne(nextEntry.kind, "extra"), - gt(nextEntry.order, entries.order), - ), - ) - .orderBy(nextEntry.order) - .limit(1) - .as("nextEntryQ"); - - const seenCountQ = db - .select({ c: count() }) - .from(entries) - .where( - and( - eq(entries.showPk, sql`excluded.show_pk`), - exists( - db - .select() - .from(history) - .where( - and( - eq(history.profilePk, profilePk), - eq(history.entryPk, entries.pk), - ), - ), - ), - ), - ); - - const showKindQ = db - .select({ k: shows.kind }) - .from(shows) - .where(eq(shows.pk, sql`excluded.show_pk`)); - - await db - .insert(watchlist) - .select( - db - .select({ - profilePk: sql`${profilePk}`.as("profilePk"), - showPk: entries.showPk, - status: sql` - case - when - hist.percent >= 95 - and ${nextEntryQ.pk} is null - then 'completed'::watchlist_status - else 'watching'::watchlist_status - end - `.as("status"), - seenCount: sql` - case - when ${entries.kind} = 'movie' then hist.percent - when hist.percent >= 95 then 1 - else 0 - end - `.as("seen_count"), - nextEntry: sql` - case - when hist.percent >= 95 then ${nextEntryQ.pk} - else ${entries.pk} - end - `.as("next_entry"), - score: sql`null`.as("score"), - startedAt: sql`hist.playedDate`.as("startedAt"), - lastPlayedAt: sql`hist.playedDate`.as("lastPlayedAt"), - completedAt: sql` - case - when ${nextEntryQ.pk} is null then hist.playedDate - else null - end - `.as("completedAt"), - // see https://github.com/drizzle-team/drizzle-orm/issues/3608 - updatedAt: sql`now()`.as("updatedAt"), - }) - .from(hist) - .leftJoin(entries, valEqEntries) - .leftJoinLateral(nextEntryQ, sql`true`), - ) - .onConflictDoUpdate({ - target: [watchlist.profilePk, watchlist.showPk], - set: { - status: sql` - case - when excluded.status = 'completed' then excluded.status - when - ${watchlist.status} != 'completed' - and ${watchlist.status} != 'rewatching' - then excluded.status - else ${watchlist.status} - end - `, - seenCount: sql` - case - when ${showKindQ} = 'movie' then excluded.seen_count - else ${seenCountQ} - end`, - nextEntry: sql` - case - when ${watchlist.status} = 'completed' then null - else excluded.next_entry - end - `, - lastPlayedAt: sql`excluded.last_played_at`, - completedAt: coalesce( - watchlist.completedAt, - sql`excluded.completed_at`, - ), - }, - }); - - return status(201, { status: 201, inserted: rows.length }); + return db.transaction(async (tx) => { + const hist = await updateHistory(tx, profilePk, body); + await updateWatchlist(tx, profilePk, hist); + return status(201, { status: 201, inserted: hist.length }); + }); }, { detail: { description: "Bulk add entries/movies to your watch history." }, diff --git a/api/src/db/utils.ts b/api/src/db/utils.ts index 12c2e43c..c6fd3fdd 100644 --- a/api/src/db/utils.ts +++ b/api/src/db/utils.ts @@ -92,36 +92,6 @@ export function sqlarr(array: unknown[]): string { .join(", ")}}`; } -// See https://github.com/drizzle-team/drizzle-orm/issues/4044 -export function values( - items: Record[], - typeInfo: Partial> = {}, -) { - if (items[0] === undefined) - throw new Error("Invalid values, expecting at least one items"); - const [firstProp, ...props] = Object.keys(items[0]) as K[]; - const values = items - .map((x, i) => { - let ret = sql`(${x[firstProp]}`; - if (i === 0 && typeInfo[firstProp]) - ret = sql`${ret}::${sql.raw(typeInfo[firstProp])}`; - for (const val of props) { - ret = sql`${ret}, ${x[val]}`; - if (i === 0 && typeInfo[val]) - ret = sql`${ret}::${sql.raw(typeInfo[val])}`; - } - return sql`${ret})`; - }) - .reduce((acc, x) => sql`${acc}, ${x}`); - const valueNames = [firstProp, ...props].join(", "); - - return { - as: (name: string) => { - return sql`(values ${values}) as ${sql.raw(name)}(${sql.raw(valueNames)})`; - }, - }; -} - /* goal: * unnestValues([{a: 1, b: 2}, {a: 3, b: 4}], tbl) * diff --git a/api/src/models/history.ts b/api/src/models/history.ts index 541153fd..76ec51a9 100644 --- a/api/src/models/history.ts +++ b/api/src/models/history.ts @@ -27,12 +27,10 @@ export const Progress = t.Object({ }); export type Progress = typeof Progress.static; -export const SeedHistory = t.Intersect([ - t.Object({ - entry: t.String({ - description: "Id or slug of the entry/movie you watched", - }), - }), - Progress, -]); +export const SeedHistory = t.Object({ + percent: Progress.properties.percent, + time: Progress.properties.time, + playedDate: t.Optional(Progress.properties.playedDate), + videoId: Progress.properties.videoId.anyOf[0], +}); export type SeedHistory = typeof SeedHistory.static; diff --git a/api/src/utils.ts b/api/src/utils.ts index c74bd4a9..bd52e73c 100644 --- a/api/src/utils.ts +++ b/api/src/utils.ts @@ -38,3 +38,20 @@ export function uniqBy(a: T[], key: (val: T) => string): T[] { return true; }); } + +export function traverse>( + arr: T[], +): { [K in keyof T]: T[K][] } { + const result = {} as { [K in keyof T]: T[K][] }; + + arr.forEach((obj) => { + for (const key in obj) { + if (!result[key]) { + result[key] = []; + } + result[key].push(obj[key]); + } + }); + + return result; +}