refactor(server): filter on-event handlers by worker (#14085)

This commit is contained in:
Jason Rasmussen 2024-11-20 14:08:42 -05:00 committed by GitHub
parent c6e1dbec5c
commit 876893c823
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
10 changed files with 24 additions and 70 deletions

View File

@ -2,7 +2,7 @@ import { SetMetadata, applyDecorators } from '@nestjs/common';
import { ApiExtension, ApiOperation, ApiProperty, ApiTags } from '@nestjs/swagger'; import { ApiExtension, ApiOperation, ApiProperty, ApiTags } from '@nestjs/swagger';
import _ from 'lodash'; import _ from 'lodash';
import { ADDED_IN_PREFIX, DEPRECATED_IN_PREFIX, LIFECYCLE_EXTENSION } from 'src/constants'; import { ADDED_IN_PREFIX, DEPRECATED_IN_PREFIX, LIFECYCLE_EXTENSION } from 'src/constants';
import { MetadataKey } from 'src/enum'; import { ImmichWorker, MetadataKey } from 'src/enum';
import { EmitEvent } from 'src/interfaces/event.interface'; import { EmitEvent } from 'src/interfaces/event.interface';
import { JobName, QueueName } from 'src/interfaces/job.interface'; import { JobName, QueueName } from 'src/interfaces/job.interface';
import { setUnion } from 'src/utils/set'; import { setUnion } from 'src/utils/set';
@ -120,6 +120,8 @@ export type EventConfig = {
server?: boolean; server?: boolean;
/** lower value has higher priority, defaults to 0 */ /** lower value has higher priority, defaults to 0 */
priority?: number; priority?: number;
/** register events for these workers, defaults to all workers */
workers?: ImmichWorker[];
}; };
export const OnEvent = (config: EventConfig) => SetMetadata(MetadataKey.EVENT_CONFIG, config); export const OnEvent = (config: EventConfig) => SetMetadata(MetadataKey.EVENT_CONFIG, config);

View File

@ -11,7 +11,8 @@ import { ClassConstructor } from 'class-transformer';
import _ from 'lodash'; import _ from 'lodash';
import { Server, Socket } from 'socket.io'; import { Server, Socket } from 'socket.io';
import { EventConfig } from 'src/decorators'; import { EventConfig } from 'src/decorators';
import { MetadataKey } from 'src/enum'; import { ImmichWorker, MetadataKey } from 'src/enum';
import { IConfigRepository } from 'src/interfaces/config.interface';
import { import {
ArgsOf, ArgsOf,
ClientEventMap, ClientEventMap,
@ -23,6 +24,7 @@ import {
ServerEvents, ServerEvents,
} from 'src/interfaces/event.interface'; } from 'src/interfaces/event.interface';
import { ILoggerRepository } from 'src/interfaces/logger.interface'; import { ILoggerRepository } from 'src/interfaces/logger.interface';
import { ConfigRepository } from 'src/repositories/config.repository';
import { AuthService } from 'src/services/auth.service'; import { AuthService } from 'src/services/auth.service';
import { handlePromiseError } from 'src/utils/misc'; import { handlePromiseError } from 'src/utils/misc';
@ -50,6 +52,7 @@ export class EventRepository implements OnGatewayConnection, OnGatewayDisconnect
constructor( constructor(
private moduleRef: ModuleRef, private moduleRef: ModuleRef,
@Inject(IConfigRepository) private configRepository: ConfigRepository,
@Inject(ILoggerRepository) private logger: ILoggerRepository, @Inject(ILoggerRepository) private logger: ILoggerRepository,
) { ) {
this.logger.setContext(EventRepository.name); this.logger.setContext(EventRepository.name);
@ -58,6 +61,10 @@ export class EventRepository implements OnGatewayConnection, OnGatewayDisconnect
setup({ services }: { services: ClassConstructor<unknown>[] }) { setup({ services }: { services: ClassConstructor<unknown>[] }) {
const reflector = this.moduleRef.get(Reflector, { strict: false }); const reflector = this.moduleRef.get(Reflector, { strict: false });
const items: Item<EmitEvent>[] = []; const items: Item<EmitEvent>[] = [];
const worker = this.configRepository.getWorker();
if (!worker) {
throw new Error('Unable to determine worker type');
}
// discovery // discovery
for (const Service of services) { for (const Service of services) {
@ -79,6 +86,11 @@ export class EventRepository implements OnGatewayConnection, OnGatewayDisconnect
continue; continue;
} }
const workers = event.workers ?? Object.values(ImmichWorker);
if (!workers.includes(worker)) {
continue;
}
items.push({ items.push({
event: event.name, event: event.name,
priority: event.priority || 0, priority: event.priority || 0,
@ -133,7 +145,7 @@ export class EventRepository implements OnGatewayConnection, OnGatewayDisconnect
await client.leave(client.nsp.name); await client.leave(client.nsp.name);
} }
private addHandler<T extends EmitEvent>(item: EventItem<T>): void { private addHandler<T extends EmitEvent>(item: Item<T>): void {
const event = item.event; const event = item.event;
if (!this.emitHandlers[event]) { if (!this.emitHandlers[event]) {

View File

@ -14,16 +14,12 @@ import { handlePromiseError } from 'src/utils/misc';
export class BackupService extends BaseService { export class BackupService extends BaseService {
private backupLock = false; private backupLock = false;
@OnEvent({ name: 'config.init' }) @OnEvent({ name: 'config.init', workers: [ImmichWorker.MICROSERVICES] })
async onConfigInit({ async onConfigInit({
newConfig: { newConfig: {
backup: { database }, backup: { database },
}, },
}: ArgOf<'config.init'>) { }: ArgOf<'config.init'>) {
if (this.worker !== ImmichWorker.API) {
return;
}
this.backupLock = await this.databaseRepository.tryLock(DatabaseLock.BackupDatabase); this.backupLock = await this.databaseRepository.tryLock(DatabaseLock.BackupDatabase);
if (this.backupLock) { if (this.backupLock) {

View File

@ -38,12 +38,8 @@ const asJobItem = (dto: JobCreateDto): JobItem => {
@Injectable() @Injectable()
export class JobService extends BaseService { export class JobService extends BaseService {
@OnEvent({ name: 'config.init' }) @OnEvent({ name: 'config.init', workers: [ImmichWorker.MICROSERVICES] })
onConfigInit({ newConfig: config }: ArgOf<'config.init'>) { onConfigInit({ newConfig: config }: ArgOf<'config.init'>) {
if (this.worker !== ImmichWorker.MICROSERVICES) {
return;
}
this.logger.debug(`Updating queue concurrency settings`); this.logger.debug(`Updating queue concurrency settings`);
for (const queueName of Object.values(QueueName)) { for (const queueName of Object.values(QueueName)) {
let concurrency = 1; let concurrency = 1;

View File

@ -122,13 +122,6 @@ describe(LibraryService.name, () => {
expect(cronMock.create).not.toHaveBeenCalled(); expect(cronMock.create).not.toHaveBeenCalled();
}); });
it('should not initialize watcher or library scan job when running on api', async () => {
configMock.getWorker.mockReturnValue(ImmichWorker.API);
await sut.onConfigInit({ newConfig: systemConfigStub.libraryScan as SystemConfig });
expect(cronMock.create).not.toHaveBeenCalled();
});
}); });
describe('onConfigUpdateEvent', () => { describe('onConfigUpdateEvent', () => {

View File

@ -31,16 +31,12 @@ export class LibraryService extends BaseService {
private lock = false; private lock = false;
private watchers: Record<string, () => Promise<void>> = {}; private watchers: Record<string, () => Promise<void>> = {};
@OnEvent({ name: 'config.init' }) @OnEvent({ name: 'config.init', workers: [ImmichWorker.MICROSERVICES] })
async onConfigInit({ async onConfigInit({
newConfig: { newConfig: {
library: { watch, scan }, library: { watch, scan },
}, },
}: ArgOf<'config.init'>) { }: ArgOf<'config.init'>) {
if (this.worker !== ImmichWorker.MICROSERVICES) {
return;
}
// This ensures that library watching only occurs in one microservice // This ensures that library watching only occurs in one microservice
this.lock = await this.databaseRepository.tryLock(DatabaseLock.Library); this.lock = await this.databaseRepository.tryLock(DatabaseLock.Library);

View File

@ -94,15 +94,6 @@ describe(MetadataService.name, () => {
expect(mapMock.init).toHaveBeenCalledTimes(1); expect(mapMock.init).toHaveBeenCalledTimes(1);
expect(jobMock.resume).toHaveBeenCalledTimes(1); expect(jobMock.resume).toHaveBeenCalledTimes(1);
}); });
it('should return if running on api', async () => {
configMock.getWorker.mockReturnValue(ImmichWorker.API);
await sut.onBootstrap();
expect(jobMock.pause).not.toHaveBeenCalled();
expect(mapMock.init).not.toHaveBeenCalled();
expect(jobMock.resume).not.toHaveBeenCalled();
});
}); });
describe('handleLivePhotoLinking', () => { describe('handleLivePhotoLinking', () => {

View File

@ -68,11 +68,8 @@ const validateRange = (value: number | undefined, min: number, max: number): Non
@Injectable() @Injectable()
export class MetadataService extends BaseService { export class MetadataService extends BaseService {
@OnEvent({ name: 'app.bootstrap' }) @OnEvent({ name: 'app.bootstrap', workers: [ImmichWorker.MICROSERVICES] })
async onBootstrap() { async onBootstrap() {
if (this.worker !== ImmichWorker.MICROSERVICES) {
return;
}
this.logger.log('Bootstrapping metadata service'); this.logger.log('Bootstrapping metadata service');
await this.init(); await this.init();
} }

View File

@ -67,19 +67,6 @@ describe(SmartInfoService.name, () => {
}); });
describe('onConfigInit', () => { describe('onConfigInit', () => {
it('should return if not microservices', async () => {
configMock.getWorker.mockReturnValue(ImmichWorker.API);
await sut.onConfigInit({ newConfig: systemConfigStub.machineLearningEnabled as SystemConfig });
expect(searchMock.getDimensionSize).not.toHaveBeenCalled();
expect(searchMock.setDimensionSize).not.toHaveBeenCalled();
expect(searchMock.deleteAllSearchEmbeddings).not.toHaveBeenCalled();
expect(jobMock.getQueueStatus).not.toHaveBeenCalled();
expect(jobMock.pause).not.toHaveBeenCalled();
expect(jobMock.waitForQueueCompletion).not.toHaveBeenCalled();
expect(jobMock.resume).not.toHaveBeenCalled();
});
it('should return if machine learning is disabled', async () => { it('should return if machine learning is disabled', async () => {
await sut.onConfigInit({ newConfig: systemConfigStub.machineLearningDisabled as SystemConfig }); await sut.onConfigInit({ newConfig: systemConfigStub.machineLearningDisabled as SystemConfig });
@ -136,22 +123,6 @@ describe(SmartInfoService.name, () => {
}); });
describe('onConfigUpdateEvent', () => { describe('onConfigUpdateEvent', () => {
it('should return if not microservices', async () => {
configMock.getWorker.mockReturnValue(ImmichWorker.API);
await sut.onConfigUpdate({
newConfig: systemConfigStub.machineLearningEnabled as SystemConfig,
oldConfig: systemConfigStub.machineLearningEnabled as SystemConfig,
});
expect(searchMock.getDimensionSize).not.toHaveBeenCalled();
expect(searchMock.setDimensionSize).not.toHaveBeenCalled();
expect(searchMock.deleteAllSearchEmbeddings).not.toHaveBeenCalled();
expect(jobMock.getQueueStatus).not.toHaveBeenCalled();
expect(jobMock.pause).not.toHaveBeenCalled();
expect(jobMock.waitForQueueCompletion).not.toHaveBeenCalled();
expect(jobMock.resume).not.toHaveBeenCalled();
});
it('should return if machine learning is disabled', async () => { it('should return if machine learning is disabled', async () => {
systemMock.get.mockResolvedValue(systemConfigStub.machineLearningDisabled); systemMock.get.mockResolvedValue(systemConfigStub.machineLearningDisabled);

View File

@ -13,12 +13,12 @@ import { usePagination } from 'src/utils/pagination';
@Injectable() @Injectable()
export class SmartInfoService extends BaseService { export class SmartInfoService extends BaseService {
@OnEvent({ name: 'config.init' }) @OnEvent({ name: 'config.init', workers: [ImmichWorker.MICROSERVICES] })
async onConfigInit({ newConfig }: ArgOf<'config.init'>) { async onConfigInit({ newConfig }: ArgOf<'config.init'>) {
await this.init(newConfig); await this.init(newConfig);
} }
@OnEvent({ name: 'config.update', server: true }) @OnEvent({ name: 'config.update', workers: [ImmichWorker.MICROSERVICES], server: true })
async onConfigUpdate({ oldConfig, newConfig }: ArgOf<'config.update'>) { async onConfigUpdate({ oldConfig, newConfig }: ArgOf<'config.update'>) {
await this.init(newConfig, oldConfig); await this.init(newConfig, oldConfig);
} }
@ -35,7 +35,7 @@ export class SmartInfoService extends BaseService {
} }
private async init(newConfig: SystemConfig, oldConfig?: SystemConfig) { private async init(newConfig: SystemConfig, oldConfig?: SystemConfig) {
if (this.worker !== ImmichWorker.MICROSERVICES || !isSmartSearchEnabled(newConfig.machineLearning)) { if (!isSmartSearchEnabled(newConfig.machineLearning)) {
return; return;
} }