From 987e5ab76ccb40d3a49325da1ee9306eb54ec38d Mon Sep 17 00:00:00 2001 From: Thomas <9749173+uhthomas@users.noreply.github.com> Date: Wed, 23 Apr 2025 16:07:32 +0100 Subject: [PATCH] fix(server): start job workers after DB (#17806) Job workers are currently started on app init, which means they are started before the DB is initialised. This can be problematic if jobs which need to use the DB start running before it's ready. It also means that swapping out the queue implementation for something which uses the DB won't work. --- server/src/app.module.ts | 9 +++------ server/src/enum.ts | 2 ++ server/src/repositories/job.repository.ts | 2 +- server/src/services/job.service.ts | 16 ++++++++++++++++ 4 files changed, 22 insertions(+), 7 deletions(-) diff --git a/server/src/app.module.ts b/server/src/app.module.ts index 5720f7af0b..05dbc090fc 100644 --- a/server/src/app.module.ts +++ b/server/src/app.module.ts @@ -17,12 +17,12 @@ import { LoggingInterceptor } from 'src/middleware/logging.interceptor'; import { repositories } from 'src/repositories'; import { ConfigRepository } from 'src/repositories/config.repository'; import { EventRepository } from 'src/repositories/event.repository'; -import { JobRepository } from 'src/repositories/job.repository'; import { LoggingRepository } from 'src/repositories/logging.repository'; import { teardownTelemetry, TelemetryRepository } from 'src/repositories/telemetry.repository'; import { services } from 'src/services'; import { AuthService } from 'src/services/auth.service'; import { CliService } from 'src/services/cli.service'; +import { JobService } from 'src/services/job.service'; import { getKyselyConfig } from 'src/utils/database'; const common = [...repositories, ...services, GlobalExceptionFilter]; @@ -52,7 +52,7 @@ class BaseModule implements OnModuleInit, OnModuleDestroy { @Inject(IWorker) private worker: ImmichWorker, logger: LoggingRepository, private eventRepository: EventRepository, - private jobRepository: JobRepository, + private jobService: JobService, private telemetryRepository: TelemetryRepository, private authService: AuthService, ) { @@ -62,10 +62,7 @@ class BaseModule implements OnModuleInit, OnModuleDestroy { async onModuleInit() { this.telemetryRepository.setup({ repositories }); - this.jobRepository.setup({ services }); - if (this.worker === ImmichWorker.MICROSERVICES) { - this.jobRepository.startWorkers(); - } + this.jobService.setServices(services); this.eventRepository.setAuthFn(async (client) => this.authService.authenticate({ diff --git a/server/src/enum.ts b/server/src/enum.ts index baf864aa49..b9a914671a 100644 --- a/server/src/enum.ts +++ b/server/src/enum.ts @@ -407,6 +407,8 @@ export enum DatabaseExtension { export enum BootstrapEventPriority { // Database service should be initialized before anything else, most other services need database access DatabaseService = -200, + // Other services may need to queue jobs on bootstrap. + JobService = -190, // Initialise config after other bootstrap services, stop other services from using config on bootstrap SystemConfig = 100, } diff --git a/server/src/repositories/job.repository.ts b/server/src/repositories/job.repository.ts index fd9f4c5363..0912759d1c 100644 --- a/server/src/repositories/job.repository.ts +++ b/server/src/repositories/job.repository.ts @@ -33,7 +33,7 @@ export class JobRepository { this.logger.setContext(JobRepository.name); } - setup({ services }: { services: ClassConstructor[] }) { + setup(services: ClassConstructor[]) { const reflector = this.moduleRef.get(Reflector, { strict: false }); // discovery diff --git a/server/src/services/job.service.ts b/server/src/services/job.service.ts index f8298336a8..b81256de81 100644 --- a/server/src/services/job.service.ts +++ b/server/src/services/job.service.ts @@ -1,10 +1,12 @@ import { BadRequestException, Injectable } from '@nestjs/common'; +import { ClassConstructor } from 'class-transformer'; import { snakeCase } from 'lodash'; import { OnEvent } from 'src/decorators'; import { mapAsset } from 'src/dtos/asset-response.dto'; import { AllJobStatusResponseDto, JobCommandDto, JobCreateDto, JobStatusDto } from 'src/dtos/job.dto'; import { AssetType, + BootstrapEventPriority, ImmichWorker, JobCommand, JobName, @@ -51,6 +53,8 @@ const asJobItem = (dto: JobCreateDto): JobItem => { @Injectable() export class JobService extends BaseService { + private services: ClassConstructor[] = []; + @OnEvent({ name: 'config.init', workers: [ImmichWorker.MICROSERVICES] }) onConfigInit({ newConfig: config }: ArgOf<'config.init'>) { this.logger.debug(`Updating queue concurrency settings`); @@ -69,6 +73,18 @@ export class JobService extends BaseService { this.onConfigInit({ newConfig: config }); } + @OnEvent({ name: 'app.bootstrap', priority: BootstrapEventPriority.JobService }) + onBootstrap() { + this.jobRepository.setup(this.services); + if (this.worker === ImmichWorker.MICROSERVICES) { + this.jobRepository.startWorkers(); + } + } + + setServices(services: ClassConstructor[]) { + this.services = services; + } + async create(dto: JobCreateDto): Promise { await this.jobRepository.queue(asJobItem(dto)); }