fix(server): start job workers after DB (#17806)

Job workers are currently started on app init, which means they are started
before the DB is initialised. This can be problematic if jobs which need to use
the DB start running before it's ready. It also means that swapping out the
queue implementation for something which uses the DB won't work.
This commit is contained in:
Thomas 2025-04-23 16:07:32 +01:00 committed by GitHub
parent 1b5e981a45
commit 987e5ab76c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 22 additions and 7 deletions

View File

@ -17,12 +17,12 @@ import { LoggingInterceptor } from 'src/middleware/logging.interceptor';
import { repositories } from 'src/repositories';
import { ConfigRepository } from 'src/repositories/config.repository';
import { EventRepository } from 'src/repositories/event.repository';
import { JobRepository } from 'src/repositories/job.repository';
import { LoggingRepository } from 'src/repositories/logging.repository';
import { teardownTelemetry, TelemetryRepository } from 'src/repositories/telemetry.repository';
import { services } from 'src/services';
import { AuthService } from 'src/services/auth.service';
import { CliService } from 'src/services/cli.service';
import { JobService } from 'src/services/job.service';
import { getKyselyConfig } from 'src/utils/database';
const common = [...repositories, ...services, GlobalExceptionFilter];
@ -52,7 +52,7 @@ class BaseModule implements OnModuleInit, OnModuleDestroy {
@Inject(IWorker) private worker: ImmichWorker,
logger: LoggingRepository,
private eventRepository: EventRepository,
private jobRepository: JobRepository,
private jobService: JobService,
private telemetryRepository: TelemetryRepository,
private authService: AuthService,
) {
@ -62,10 +62,7 @@ class BaseModule implements OnModuleInit, OnModuleDestroy {
async onModuleInit() {
this.telemetryRepository.setup({ repositories });
this.jobRepository.setup({ services });
if (this.worker === ImmichWorker.MICROSERVICES) {
this.jobRepository.startWorkers();
}
this.jobService.setServices(services);
this.eventRepository.setAuthFn(async (client) =>
this.authService.authenticate({

View File

@ -407,6 +407,8 @@ export enum DatabaseExtension {
export enum BootstrapEventPriority {
// Database service should be initialized before anything else, most other services need database access
DatabaseService = -200,
// Other services may need to queue jobs on bootstrap.
JobService = -190,
// Initialise config after other bootstrap services, stop other services from using config on bootstrap
SystemConfig = 100,
}

View File

@ -33,7 +33,7 @@ export class JobRepository {
this.logger.setContext(JobRepository.name);
}
setup({ services }: { services: ClassConstructor<unknown>[] }) {
setup(services: ClassConstructor<unknown>[]) {
const reflector = this.moduleRef.get(Reflector, { strict: false });
// discovery

View File

@ -1,10 +1,12 @@
import { BadRequestException, Injectable } from '@nestjs/common';
import { ClassConstructor } from 'class-transformer';
import { snakeCase } from 'lodash';
import { OnEvent } from 'src/decorators';
import { mapAsset } from 'src/dtos/asset-response.dto';
import { AllJobStatusResponseDto, JobCommandDto, JobCreateDto, JobStatusDto } from 'src/dtos/job.dto';
import {
AssetType,
BootstrapEventPriority,
ImmichWorker,
JobCommand,
JobName,
@ -51,6 +53,8 @@ const asJobItem = (dto: JobCreateDto): JobItem => {
@Injectable()
export class JobService extends BaseService {
private services: ClassConstructor<unknown>[] = [];
@OnEvent({ name: 'config.init', workers: [ImmichWorker.MICROSERVICES] })
onConfigInit({ newConfig: config }: ArgOf<'config.init'>) {
this.logger.debug(`Updating queue concurrency settings`);
@ -69,6 +73,18 @@ export class JobService extends BaseService {
this.onConfigInit({ newConfig: config });
}
@OnEvent({ name: 'app.bootstrap', priority: BootstrapEventPriority.JobService })
onBootstrap() {
this.jobRepository.setup(this.services);
if (this.worker === ImmichWorker.MICROSERVICES) {
this.jobRepository.startWorkers();
}
}
setServices(services: ClassConstructor<unknown>[]) {
this.services = services;
}
async create(dto: JobCreateDto): Promise<void> {
await this.jobRepository.queue(asJobItem(dto));
}