use addJobs

This commit is contained in:
mertalev 2025-04-30 20:13:40 -04:00
parent f0c013f844
commit e773a7b7a1
No known key found for this signature in database
GPG Key ID: DF6ABC77AAD98C95
4 changed files with 41 additions and 18 deletions

View File

@ -38,7 +38,7 @@
"fast-glob": "^3.3.2", "fast-glob": "^3.3.2",
"fluent-ffmpeg": "^2.1.2", "fluent-ffmpeg": "^2.1.2",
"geo-tz": "^8.0.0", "geo-tz": "^8.0.0",
"graphile-worker": "^0.16.6", "graphile-worker": "^0.17.0-canary.1fcb2a0",
"handlebars": "^4.7.8", "handlebars": "^4.7.8",
"i18n-iso-countries": "^7.6.0", "i18n-iso-countries": "^7.6.0",
"joi": "^17.10.0", "joi": "^17.10.0",
@ -10085,16 +10085,16 @@
"license": "MIT" "license": "MIT"
}, },
"node_modules/graphile-worker": { "node_modules/graphile-worker": {
"version": "0.16.6", "version": "0.17.0-canary.1fcb2a0",
"resolved": "https://registry.npmjs.org/graphile-worker/-/graphile-worker-0.16.6.tgz", "resolved": "https://registry.npmjs.org/graphile-worker/-/graphile-worker-0.17.0-canary.1fcb2a0.tgz",
"integrity": "sha512-e7gGYDmGqzju2l83MpzX8vNG/lOtVJiSzI3eZpAFubSxh/cxs7sRrRGBGjzBP1kNG0H+c95etPpNRNlH65PYhw==", "integrity": "sha512-eG02GZ0U1eSMBdfHlQg9+jaNXpr9gs1cwqfFeney3BHpEMSvG3jw+7SdQJPVUgF8wnt8dRRfhkbpzaXGSOr+MQ==",
"license": "MIT", "license": "MIT",
"dependencies": { "dependencies": {
"@graphile/logger": "^0.2.0", "@graphile/logger": "^0.2.0",
"@types/debug": "^4.1.10", "@types/debug": "^4.1.10",
"@types/pg": "^8.10.5", "@types/pg": "^8.10.5",
"cosmiconfig": "^8.3.6", "cosmiconfig": "^8.3.6",
"graphile-config": "^0.0.1-beta.4", "graphile-config": "^0.0.1-beta.14",
"json5": "^2.2.3", "json5": "^2.2.3",
"pg": "^8.11.3", "pg": "^8.11.3",
"tslib": "^2.6.2", "tslib": "^2.6.2",
@ -10104,7 +10104,8 @@
"graphile-worker": "dist/cli.js" "graphile-worker": "dist/cli.js"
}, },
"engines": { "engines": {
"node": ">=14.0.0" "node": ">=14.0.0",
"yarn": "^1.22.22"
} }
}, },
"node_modules/handlebars": { "node_modules/handlebars": {

View File

@ -63,7 +63,7 @@
"fast-glob": "^3.3.2", "fast-glob": "^3.3.2",
"fluent-ffmpeg": "^2.1.2", "fluent-ffmpeg": "^2.1.2",
"geo-tz": "^8.0.0", "geo-tz": "^8.0.0",
"graphile-worker": "^0.16.6", "graphile-worker": "^0.17.0-canary.1fcb2a0",
"handlebars": "^4.7.8", "handlebars": "^4.7.8",
"i18n-iso-countries": "^7.6.0", "i18n-iso-countries": "^7.6.0",
"joi": "^17.10.0", "joi": "^17.10.0",

View File

@ -1,7 +1,7 @@
import { Injectable } from '@nestjs/common'; import { Injectable } from '@nestjs/common';
import { ModuleRef, Reflector } from '@nestjs/core'; import { ModuleRef, Reflector } from '@nestjs/core';
import { ClassConstructor } from 'class-transformer'; import { ClassConstructor } from 'class-transformer';
import { makeWorkerUtils, run, Runner, TaskSpec, WorkerUtils } from 'graphile-worker'; import { AddJobsJobSpec, makeWorkerUtils, run, Runner, TaskSpec, WorkerUtils } from 'graphile-worker';
import { Kysely } from 'kysely'; import { Kysely } from 'kysely';
import { DateTime, Duration } from 'luxon'; import { DateTime, Duration } from 'luxon';
import { InjectKysely } from 'nestjs-kysely'; import { InjectKysely } from 'nestjs-kysely';
@ -178,24 +178,21 @@ export class JobRepository {
return (this.handlers[name] as JobMapItem).queueName; return (this.handlers[name] as JobMapItem).queueName;
} }
async run({ name, data }: JobItem): Promise<JobStatus> { run({ name, data }: JobItem): Promise<JobStatus> {
const item = this.handlers[name as JobName]; const item = this.handlers[name as JobName];
if (!item) { if (!item) {
this.logger.warn(`Skipping unknown job: "${name}"`); this.logger.warn(`Skipping unknown job: "${name}"`);
return JobStatus.SKIPPED; return Promise.resolve(JobStatus.SKIPPED);
} }
return item.handler(data); return item.handler(data);
} }
async queue(item: JobItem): Promise<void> { queue(item: JobItem): Promise<unknown> {
// this.logger.log(`Queueing job: ${this.getQueueName(item.name)}, data: ${JSON.stringify(item)}`); return this.workerUtils!.addJob(this.getQueueName(item.name), item, this.getJobOptions(item));
await this.workerUtils!.addJob(this.getQueueName(item.name), item, this.getJobOptions(item));
} }
async queueAll(items: JobItem[]): Promise<void> { queueAll(items: JobItem[]): Promise<unknown> {
for (const item of items) { return this.workerUtils!.addJobs(items.map((item) => this.getJobSpec(item)));
await this.queue(item);
}
} }
// todo: are we actually generating sql // todo: are we actually generating sql
@ -280,6 +277,31 @@ export class JobRepository {
return { paused: state?.[queueName]?.paused ?? false }; return { paused: state?.[queueName]?.paused ?? false };
} }
private getJobSpec(item: JobItem): AddJobsJobSpec {
switch (item.name) {
case JobName.NOTIFY_ALBUM_UPDATE: {
return {
identifier: item.name,
payload: item.data,
jobKey: item.data.id,
runAt: item.data?.delay ? new Date(Date.now() + item.data.delay) : undefined,
};
}
case JobName.STORAGE_TEMPLATE_MIGRATION_SINGLE: {
return { identifier: item.name, payload: item.data, jobKey: QueueName.STORAGE_TEMPLATE_MIGRATION };
}
case JobName.GENERATE_PERSON_THUMBNAIL: {
return { identifier: item.name, payload: item.data, priority: 1 };
}
case JobName.QUEUE_FACIAL_RECOGNITION: {
return { identifier: item.name, payload: item.data, jobKey: JobName.QUEUE_FACIAL_RECOGNITION };
}
default: {
return { identifier: item.name, payload: item.data };
}
}
}
private getJobOptions(item: JobItem): TaskSpec | undefined { private getJobOptions(item: JobItem): TaskSpec | undefined {
switch (item.name) { switch (item.name) {
case JobName.NOTIFY_ALBUM_UPDATE: { case JobName.NOTIFY_ALBUM_UPDATE: {

View File

@ -130,7 +130,7 @@ export class JobService extends BaseService {
return response; return response;
} }
private async start(name: QueueName, { force }: JobCommandDto): Promise<void> { private async start(name: QueueName, { force }: JobCommandDto): Promise<unknown> {
const { active } = await this.jobRepository.getJobCounts(name); const { active } = await this.jobRepository.getJobCounts(name);
if (active > 0) { if (active > 0) {
throw new BadRequestException(`Jobs are already running`); throw new BadRequestException(`Jobs are already running`);