diff --git a/server/package-lock.json b/server/package-lock.json index 33d32abc02..62921afc94 100644 --- a/server/package-lock.json +++ b/server/package-lock.json @@ -38,7 +38,7 @@ "fast-glob": "^3.3.2", "fluent-ffmpeg": "^2.1.2", "geo-tz": "^8.0.0", - "graphile-worker": "^0.16.6", + "graphile-worker": "^0.17.0-canary.1fcb2a0", "handlebars": "^4.7.8", "i18n-iso-countries": "^7.6.0", "joi": "^17.10.0", @@ -10085,16 +10085,16 @@ "license": "MIT" }, "node_modules/graphile-worker": { - "version": "0.16.6", - "resolved": "https://registry.npmjs.org/graphile-worker/-/graphile-worker-0.16.6.tgz", - "integrity": "sha512-e7gGYDmGqzju2l83MpzX8vNG/lOtVJiSzI3eZpAFubSxh/cxs7sRrRGBGjzBP1kNG0H+c95etPpNRNlH65PYhw==", + "version": "0.17.0-canary.1fcb2a0", + "resolved": "https://registry.npmjs.org/graphile-worker/-/graphile-worker-0.17.0-canary.1fcb2a0.tgz", + "integrity": "sha512-eG02GZ0U1eSMBdfHlQg9+jaNXpr9gs1cwqfFeney3BHpEMSvG3jw+7SdQJPVUgF8wnt8dRRfhkbpzaXGSOr+MQ==", "license": "MIT", "dependencies": { "@graphile/logger": "^0.2.0", "@types/debug": "^4.1.10", "@types/pg": "^8.10.5", "cosmiconfig": "^8.3.6", - "graphile-config": "^0.0.1-beta.4", + "graphile-config": "^0.0.1-beta.14", "json5": "^2.2.3", "pg": "^8.11.3", "tslib": "^2.6.2", @@ -10104,7 +10104,8 @@ "graphile-worker": "dist/cli.js" }, "engines": { - "node": ">=14.0.0" + "node": ">=14.0.0", + "yarn": "^1.22.22" } }, "node_modules/handlebars": { diff --git a/server/package.json b/server/package.json index 76bc069876..2f699233ee 100644 --- a/server/package.json +++ b/server/package.json @@ -63,7 +63,7 @@ "fast-glob": "^3.3.2", "fluent-ffmpeg": "^2.1.2", "geo-tz": "^8.0.0", - "graphile-worker": "^0.16.6", + "graphile-worker": "^0.17.0-canary.1fcb2a0", "handlebars": "^4.7.8", "i18n-iso-countries": "^7.6.0", "joi": "^17.10.0", diff --git a/server/src/repositories/job.repository.ts b/server/src/repositories/job.repository.ts index f5458ee941..1eb0a83f75 100644 --- a/server/src/repositories/job.repository.ts +++ b/server/src/repositories/job.repository.ts @@ -1,7 +1,7 @@ import { Injectable } from '@nestjs/common'; import { ModuleRef, Reflector } from '@nestjs/core'; import { ClassConstructor } from 'class-transformer'; -import { makeWorkerUtils, run, Runner, TaskSpec, WorkerUtils } from 'graphile-worker'; +import { AddJobsJobSpec, makeWorkerUtils, run, Runner, TaskSpec, WorkerUtils } from 'graphile-worker'; import { Kysely } from 'kysely'; import { DateTime, Duration } from 'luxon'; import { InjectKysely } from 'nestjs-kysely'; @@ -178,24 +178,21 @@ export class JobRepository { return (this.handlers[name] as JobMapItem).queueName; } - async run({ name, data }: JobItem): Promise { + run({ name, data }: JobItem): Promise { const item = this.handlers[name as JobName]; if (!item) { this.logger.warn(`Skipping unknown job: "${name}"`); - return JobStatus.SKIPPED; + return Promise.resolve(JobStatus.SKIPPED); } return item.handler(data); } - async queue(item: JobItem): Promise { - // this.logger.log(`Queueing job: ${this.getQueueName(item.name)}, data: ${JSON.stringify(item)}`); - await this.workerUtils!.addJob(this.getQueueName(item.name), item, this.getJobOptions(item)); + queue(item: JobItem): Promise { + return this.workerUtils!.addJob(this.getQueueName(item.name), item, this.getJobOptions(item)); } - async queueAll(items: JobItem[]): Promise { - for (const item of items) { - await this.queue(item); - } + queueAll(items: JobItem[]): Promise { + return this.workerUtils!.addJobs(items.map((item) => this.getJobSpec(item))); } // todo: are we actually generating sql @@ -280,6 +277,31 @@ export class JobRepository { return { paused: state?.[queueName]?.paused ?? false }; } + private getJobSpec(item: JobItem): AddJobsJobSpec { + switch (item.name) { + case JobName.NOTIFY_ALBUM_UPDATE: { + return { + identifier: item.name, + payload: item.data, + jobKey: item.data.id, + runAt: item.data?.delay ? new Date(Date.now() + item.data.delay) : undefined, + }; + } + case JobName.STORAGE_TEMPLATE_MIGRATION_SINGLE: { + return { identifier: item.name, payload: item.data, jobKey: QueueName.STORAGE_TEMPLATE_MIGRATION }; + } + case JobName.GENERATE_PERSON_THUMBNAIL: { + return { identifier: item.name, payload: item.data, priority: 1 }; + } + case JobName.QUEUE_FACIAL_RECOGNITION: { + return { identifier: item.name, payload: item.data, jobKey: JobName.QUEUE_FACIAL_RECOGNITION }; + } + default: { + return { identifier: item.name, payload: item.data }; + } + } + } + private getJobOptions(item: JobItem): TaskSpec | undefined { switch (item.name) { case JobName.NOTIFY_ALBUM_UPDATE: { diff --git a/server/src/services/job.service.ts b/server/src/services/job.service.ts index bd4c3237cd..e75dfd85c8 100644 --- a/server/src/services/job.service.ts +++ b/server/src/services/job.service.ts @@ -130,7 +130,7 @@ export class JobService extends BaseService { return response; } - private async start(name: QueueName, { force }: JobCommandDto): Promise { + private async start(name: QueueName, { force }: JobCommandDto): Promise { const { active } = await this.jobRepository.getJobCounts(name); if (active > 0) { throw new BadRequestException(`Jobs are already running`);