mirror of
				https://github.com/immich-app/immich.git
				synced 2025-11-03 19:17:11 -05:00 
			
		
		
		
	refactor(server): event emits (#10648)
* refactor(server): event emits * refactor: change default priority to 0
This commit is contained in:
		
							parent
							
								
									7e99394c70
								
							
						
					
					
						commit
						72bf9439b0
					
				@ -1,7 +1,7 @@
 | 
			
		||||
import { BullModule } from '@nestjs/bullmq';
 | 
			
		||||
import { Module, OnModuleInit, ValidationPipe } from '@nestjs/common';
 | 
			
		||||
import { Inject, Module, OnModuleDestroy, OnModuleInit, ValidationPipe } from '@nestjs/common';
 | 
			
		||||
import { ConfigModule } from '@nestjs/config';
 | 
			
		||||
import { APP_FILTER, APP_GUARD, APP_INTERCEPTOR, APP_PIPE } from '@nestjs/core';
 | 
			
		||||
import { APP_FILTER, APP_GUARD, APP_INTERCEPTOR, APP_PIPE, ModuleRef } from '@nestjs/core';
 | 
			
		||||
import { EventEmitterModule } from '@nestjs/event-emitter';
 | 
			
		||||
import { ScheduleModule, SchedulerRegistry } from '@nestjs/schedule';
 | 
			
		||||
import { TypeOrmModule } from '@nestjs/typeorm';
 | 
			
		||||
@ -12,6 +12,7 @@ import { bullConfig, bullQueues, clsConfig, immichAppConfig } from 'src/config';
 | 
			
		||||
import { controllers } from 'src/controllers';
 | 
			
		||||
import { databaseConfig } from 'src/database.config';
 | 
			
		||||
import { entities } from 'src/entities';
 | 
			
		||||
import { IEventRepository } from 'src/interfaces/event.interface';
 | 
			
		||||
import { AuthGuard } from 'src/middleware/auth.guard';
 | 
			
		||||
import { ErrorInterceptor } from 'src/middleware/error.interceptor';
 | 
			
		||||
import { FileUploadInterceptor } from 'src/middleware/file-upload.interceptor';
 | 
			
		||||
@ -19,8 +20,7 @@ import { HttpExceptionFilter } from 'src/middleware/http-exception.filter';
 | 
			
		||||
import { LoggingInterceptor } from 'src/middleware/logging.interceptor';
 | 
			
		||||
import { repositories } from 'src/repositories';
 | 
			
		||||
import { services } from 'src/services';
 | 
			
		||||
import { ApiService } from 'src/services/api.service';
 | 
			
		||||
import { MicroservicesService } from 'src/services/microservices.service';
 | 
			
		||||
import { setupEventHandlers } from 'src/utils/events';
 | 
			
		||||
import { otelConfig } from 'src/utils/instrumentation';
 | 
			
		||||
 | 
			
		||||
const common = [...services, ...repositories];
 | 
			
		||||
@ -50,11 +50,19 @@ const imports = [
 | 
			
		||||
  controllers: [...controllers],
 | 
			
		||||
  providers: [...common, ...middleware],
 | 
			
		||||
})
 | 
			
		||||
export class ApiModule implements OnModuleInit {
 | 
			
		||||
  constructor(private service: ApiService) {}
 | 
			
		||||
export class ApiModule implements OnModuleInit, OnModuleDestroy {
 | 
			
		||||
  constructor(
 | 
			
		||||
    private moduleRef: ModuleRef,
 | 
			
		||||
    @Inject(IEventRepository) private eventRepository: IEventRepository,
 | 
			
		||||
  ) {}
 | 
			
		||||
 | 
			
		||||
  async onModuleInit() {
 | 
			
		||||
    await this.service.init();
 | 
			
		||||
    setupEventHandlers(this.moduleRef);
 | 
			
		||||
    await this.eventRepository.emit('onBootstrapEvent', 'api');
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  async onModuleDestroy() {
 | 
			
		||||
    await this.eventRepository.emit('onShutdownEvent');
 | 
			
		||||
  }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -62,11 +70,19 @@ export class ApiModule implements OnModuleInit {
 | 
			
		||||
  imports: [...imports],
 | 
			
		||||
  providers: [...common, SchedulerRegistry],
 | 
			
		||||
})
 | 
			
		||||
export class MicroservicesModule implements OnModuleInit {
 | 
			
		||||
  constructor(private service: MicroservicesService) {}
 | 
			
		||||
export class MicroservicesModule implements OnModuleInit, OnModuleDestroy {
 | 
			
		||||
  constructor(
 | 
			
		||||
    private moduleRef: ModuleRef,
 | 
			
		||||
    @Inject(IEventRepository) private eventRepository: IEventRepository,
 | 
			
		||||
  ) {}
 | 
			
		||||
 | 
			
		||||
  async onModuleInit() {
 | 
			
		||||
    await this.service.init();
 | 
			
		||||
    setupEventHandlers(this.moduleRef);
 | 
			
		||||
    await this.eventRepository.emit('onBootstrapEvent', 'microservices');
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  async onModuleDestroy() {
 | 
			
		||||
    await this.eventRepository.emit('onShutdownEvent');
 | 
			
		||||
  }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@ -4,7 +4,8 @@ import { OnEventOptions } from '@nestjs/event-emitter/dist/interfaces';
 | 
			
		||||
import { ApiExtension, ApiOperation, ApiProperty, ApiTags } from '@nestjs/swagger';
 | 
			
		||||
import _ from 'lodash';
 | 
			
		||||
import { ADDED_IN_PREFIX, DEPRECATED_IN_PREFIX, LIFECYCLE_EXTENSION } from 'src/constants';
 | 
			
		||||
import { ServerAsyncEvent, ServerEvent } from 'src/interfaces/event.interface';
 | 
			
		||||
import { ServerEvent } from 'src/interfaces/event.interface';
 | 
			
		||||
import { Metadata } from 'src/middleware/auth.guard';
 | 
			
		||||
import { setUnion } from 'src/utils/set';
 | 
			
		||||
 | 
			
		||||
// PostgreSQL uses a 16-bit integer to indicate the number of bound parameters. This means that the
 | 
			
		||||
@ -129,9 +130,15 @@ export interface GenerateSqlQueries {
 | 
			
		||||
/** Decorator to enable versioning/tracking of generated Sql */
 | 
			
		||||
export const GenerateSql = (...options: GenerateSqlQueries[]) => SetMetadata(GENERATE_SQL_KEY, options);
 | 
			
		||||
 | 
			
		||||
export const OnServerEvent = (event: ServerEvent | ServerAsyncEvent, options?: OnEventOptions) =>
 | 
			
		||||
export const OnServerEvent = (event: ServerEvent, options?: OnEventOptions) =>
 | 
			
		||||
  OnEvent(event, { suppressErrors: false, ...options });
 | 
			
		||||
 | 
			
		||||
export type HandlerOptions = {
 | 
			
		||||
  /** lower value has higher priority, defaults to 0 */
 | 
			
		||||
  priority: number;
 | 
			
		||||
};
 | 
			
		||||
export const EventHandlerOptions = (options: HandlerOptions) => SetMetadata(Metadata.EVENT_HANDLER_OPTIONS, options);
 | 
			
		||||
 | 
			
		||||
type LifecycleRelease = 'NEXT_RELEASE' | string;
 | 
			
		||||
type LifecycleMetadata = {
 | 
			
		||||
  addedAt?: LifecycleRelease;
 | 
			
		||||
 | 
			
		||||
@ -4,6 +4,23 @@ import { ReleaseNotification, ServerVersionResponseDto } from 'src/dtos/server-i
 | 
			
		||||
 | 
			
		||||
export const IEventRepository = 'IEventRepository';
 | 
			
		||||
 | 
			
		||||
type MaybePromise<T> = Promise<T> | T;
 | 
			
		||||
 | 
			
		||||
const noop = () => {};
 | 
			
		||||
const dummyHandlers = {
 | 
			
		||||
  onBootstrapEvent: noop as (app: 'api' | 'microservices') => MaybePromise<void>,
 | 
			
		||||
  onShutdownEvent: noop as () => MaybePromise<void>,
 | 
			
		||||
  onConfigUpdateEvent: noop as (update: SystemConfigUpdate) => MaybePromise<void>,
 | 
			
		||||
  onConfigValidateEvent: noop as (update: SystemConfigUpdate) => MaybePromise<void>,
 | 
			
		||||
};
 | 
			
		||||
 | 
			
		||||
export type SystemConfigUpdate = { newConfig: SystemConfig; oldConfig: SystemConfig };
 | 
			
		||||
export type EventHandlers = typeof dummyHandlers;
 | 
			
		||||
export type EmitEvent = keyof EventHandlers;
 | 
			
		||||
export type EmitEventHandler<T extends EmitEvent> = (...args: Parameters<EventHandlers[T]>) => MaybePromise<void>;
 | 
			
		||||
export const events = Object.keys(dummyHandlers) as EmitEvent[];
 | 
			
		||||
export type OnEvents = Partial<EventHandlers>;
 | 
			
		||||
 | 
			
		||||
export enum ClientEvent {
 | 
			
		||||
  UPLOAD_SUCCESS = 'on_upload_success',
 | 
			
		||||
  USER_DELETE = 'on_user_delete',
 | 
			
		||||
@ -44,15 +61,10 @@ export interface ServerEventMap {
 | 
			
		||||
  [ServerEvent.WEBSOCKET_CONNECT]: { userId: string };
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
export enum ServerAsyncEvent {
 | 
			
		||||
  CONFIG_VALIDATE = 'config.validate',
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
export interface ServerAsyncEventMap {
 | 
			
		||||
  [ServerAsyncEvent.CONFIG_VALIDATE]: { newConfig: SystemConfig; oldConfig: SystemConfig };
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
export interface IEventRepository {
 | 
			
		||||
  on<T extends EmitEvent>(event: T, handler: EmitEventHandler<T>): void;
 | 
			
		||||
  emit<T extends EmitEvent>(event: T, ...args: Parameters<EmitEventHandler<T>>): Promise<void>;
 | 
			
		||||
 | 
			
		||||
  /**
 | 
			
		||||
   * Send to connected clients for a specific user
 | 
			
		||||
   */
 | 
			
		||||
@ -65,8 +77,4 @@ export interface IEventRepository {
 | 
			
		||||
   * Notify listeners in this and connected processes. Subscribe to an event with `@OnServerEvent`
 | 
			
		||||
   */
 | 
			
		||||
  serverSend<E extends keyof ServerEventMap>(event: E, data: ServerEventMap[E]): boolean;
 | 
			
		||||
  /**
 | 
			
		||||
   * Notify and wait for responses from listeners in this process. Subscribe to an event with `@OnServerEvent`
 | 
			
		||||
   */
 | 
			
		||||
  serverSendAsync<E extends keyof ServerAsyncEventMap>(event: E, data: ServerAsyncEventMap[E]): Promise<any>;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -20,6 +20,7 @@ export enum Metadata {
 | 
			
		||||
  ADMIN_ROUTE = 'admin_route',
 | 
			
		||||
  SHARED_ROUTE = 'shared_route',
 | 
			
		||||
  API_KEY_SECURITY = 'api_key',
 | 
			
		||||
  EVENT_HANDLER_OPTIONS = 'event_handler_options',
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
type AdminRoute = { admin?: true };
 | 
			
		||||
 | 
			
		||||
@ -10,8 +10,9 @@ import {
 | 
			
		||||
import { Server, Socket } from 'socket.io';
 | 
			
		||||
import {
 | 
			
		||||
  ClientEventMap,
 | 
			
		||||
  EmitEvent,
 | 
			
		||||
  EmitEventHandler,
 | 
			
		||||
  IEventRepository,
 | 
			
		||||
  ServerAsyncEventMap,
 | 
			
		||||
  ServerEvent,
 | 
			
		||||
  ServerEventMap,
 | 
			
		||||
} from 'src/interfaces/event.interface';
 | 
			
		||||
@ -27,6 +28,8 @@ import { Instrumentation } from 'src/utils/instrumentation';
 | 
			
		||||
})
 | 
			
		||||
@Injectable()
 | 
			
		||||
export class EventRepository implements OnGatewayConnection, OnGatewayDisconnect, OnGatewayInit, IEventRepository {
 | 
			
		||||
  private emitHandlers: Partial<Record<EmitEvent, EmitEventHandler<EmitEvent>[]>> = {};
 | 
			
		||||
 | 
			
		||||
  @WebSocketServer()
 | 
			
		||||
  private server?: Server;
 | 
			
		||||
 | 
			
		||||
@ -71,6 +74,18 @@ export class EventRepository implements OnGatewayConnection, OnGatewayDisconnect
 | 
			
		||||
    await client.leave(client.nsp.name);
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  on<T extends EmitEvent>(event: T, handler: EmitEventHandler<T>): void {
 | 
			
		||||
    const handlers: EmitEventHandler<EmitEvent>[] = this.emitHandlers[event] || [];
 | 
			
		||||
    this.emitHandlers[event] = [...handlers, handler];
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  async emit<T extends EmitEvent>(event: T, ...args: Parameters<EmitEventHandler<T>>): Promise<void> {
 | 
			
		||||
    const handlers = this.emitHandlers[event] || [];
 | 
			
		||||
    for (const handler of handlers) {
 | 
			
		||||
      await handler(...args);
 | 
			
		||||
    }
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  clientSend<E extends keyof ClientEventMap>(event: E, userId: string, data: ClientEventMap[E]) {
 | 
			
		||||
    this.server?.to(userId).emit(event, data);
 | 
			
		||||
  }
 | 
			
		||||
@ -84,8 +99,4 @@ export class EventRepository implements OnGatewayConnection, OnGatewayDisconnect
 | 
			
		||||
    this.server?.serverSideEmit(event, data);
 | 
			
		||||
    return this.eventEmitter.emit(event, data);
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  serverSendAsync<E extends keyof ServerAsyncEventMap, R = any[]>(event: E, data: ServerAsyncEventMap[E]): Promise<R> {
 | 
			
		||||
    return this.eventEmitter.emitAsync(event, data) as Promise<R>;
 | 
			
		||||
  }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -6,12 +6,8 @@ import { join } from 'node:path';
 | 
			
		||||
import { ONE_HOUR, WEB_ROOT } from 'src/constants';
 | 
			
		||||
import { ILoggerRepository } from 'src/interfaces/logger.interface';
 | 
			
		||||
import { AuthService } from 'src/services/auth.service';
 | 
			
		||||
import { DatabaseService } from 'src/services/database.service';
 | 
			
		||||
import { JobService } from 'src/services/job.service';
 | 
			
		||||
import { ServerInfoService } from 'src/services/server-info.service';
 | 
			
		||||
import { SharedLinkService } from 'src/services/shared-link.service';
 | 
			
		||||
import { StorageService } from 'src/services/storage.service';
 | 
			
		||||
import { SystemConfigService } from 'src/services/system-config.service';
 | 
			
		||||
import { VersionService } from 'src/services/version.service';
 | 
			
		||||
import { OpenGraphTags } from 'src/utils/misc';
 | 
			
		||||
 | 
			
		||||
@ -39,12 +35,8 @@ const render = (index: string, meta: OpenGraphTags) => {
 | 
			
		||||
export class ApiService {
 | 
			
		||||
  constructor(
 | 
			
		||||
    private authService: AuthService,
 | 
			
		||||
    private configService: SystemConfigService,
 | 
			
		||||
    private jobService: JobService,
 | 
			
		||||
    private serverService: ServerInfoService,
 | 
			
		||||
    private sharedLinkService: SharedLinkService,
 | 
			
		||||
    private storageService: StorageService,
 | 
			
		||||
    private databaseService: DatabaseService,
 | 
			
		||||
    private versionService: VersionService,
 | 
			
		||||
    @Inject(ILoggerRepository) private logger: ILoggerRepository,
 | 
			
		||||
  ) {
 | 
			
		||||
@ -61,15 +53,6 @@ export class ApiService {
 | 
			
		||||
    await this.jobService.handleNightlyJobs();
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  async init() {
 | 
			
		||||
    await this.databaseService.init();
 | 
			
		||||
    await this.configService.init();
 | 
			
		||||
    this.storageService.init();
 | 
			
		||||
    await this.serverService.init();
 | 
			
		||||
    await this.versionService.init();
 | 
			
		||||
    this.logger.log(`Feature Flags: ${JSON.stringify(await this.serverService.getFeatures(), null, 2)}`);
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  ssr(excludePaths: string[]) {
 | 
			
		||||
    let index = '';
 | 
			
		||||
    try {
 | 
			
		||||
 | 
			
		||||
@ -27,7 +27,7 @@ describe(DatabaseService.name, () => {
 | 
			
		||||
  it('should throw an error if PostgreSQL version is below minimum supported version', async () => {
 | 
			
		||||
    databaseMock.getPostgresVersion.mockResolvedValueOnce('13.10.0');
 | 
			
		||||
 | 
			
		||||
    await expect(sut.init()).rejects.toThrow('Invalid PostgreSQL version. Found 13.10.0');
 | 
			
		||||
    await expect(sut.onBootstrapEvent()).rejects.toThrow('Invalid PostgreSQL version. Found 13.10.0');
 | 
			
		||||
 | 
			
		||||
    expect(databaseMock.getPostgresVersion).toHaveBeenCalledTimes(1);
 | 
			
		||||
  });
 | 
			
		||||
@ -35,7 +35,7 @@ describe(DatabaseService.name, () => {
 | 
			
		||||
  it(`should start up successfully with pgvectors`, async () => {
 | 
			
		||||
    databaseMock.getPostgresVersion.mockResolvedValue('14.0.0');
 | 
			
		||||
 | 
			
		||||
    await expect(sut.init()).resolves.toBeUndefined();
 | 
			
		||||
    await expect(sut.onBootstrapEvent()).resolves.toBeUndefined();
 | 
			
		||||
 | 
			
		||||
    expect(databaseMock.getPostgresVersion).toHaveBeenCalled();
 | 
			
		||||
    expect(databaseMock.createExtension).toHaveBeenCalledWith(DatabaseExtension.VECTORS);
 | 
			
		||||
@ -50,7 +50,7 @@ describe(DatabaseService.name, () => {
 | 
			
		||||
    databaseMock.getPostgresVersion.mockResolvedValue('14.0.0');
 | 
			
		||||
    databaseMock.getExtensionVersion.mockResolvedValue('0.5.0');
 | 
			
		||||
 | 
			
		||||
    await expect(sut.init()).resolves.toBeUndefined();
 | 
			
		||||
    await expect(sut.onBootstrapEvent()).resolves.toBeUndefined();
 | 
			
		||||
 | 
			
		||||
    expect(databaseMock.createExtension).toHaveBeenCalledWith(DatabaseExtension.VECTOR);
 | 
			
		||||
    expect(databaseMock.createExtension).toHaveBeenCalledTimes(1);
 | 
			
		||||
@ -60,7 +60,7 @@ describe(DatabaseService.name, () => {
 | 
			
		||||
 | 
			
		||||
  it(`should throw an error if the pgvecto.rs extension is not installed`, async () => {
 | 
			
		||||
    databaseMock.getExtensionVersion.mockResolvedValue('');
 | 
			
		||||
    await expect(sut.init()).rejects.toThrow(`Unexpected: The pgvecto.rs extension is not installed.`);
 | 
			
		||||
    await expect(sut.onBootstrapEvent()).rejects.toThrow(`Unexpected: The pgvecto.rs extension is not installed.`);
 | 
			
		||||
 | 
			
		||||
    expect(databaseMock.createExtension).toHaveBeenCalledTimes(1);
 | 
			
		||||
    expect(databaseMock.runMigrations).not.toHaveBeenCalled();
 | 
			
		||||
@ -69,7 +69,7 @@ describe(DatabaseService.name, () => {
 | 
			
		||||
  it(`should throw an error if the pgvector extension is not installed`, async () => {
 | 
			
		||||
    process.env.DB_VECTOR_EXTENSION = 'pgvector';
 | 
			
		||||
    databaseMock.getExtensionVersion.mockResolvedValue('');
 | 
			
		||||
    await expect(sut.init()).rejects.toThrow(`Unexpected: The pgvector extension is not installed.`);
 | 
			
		||||
    await expect(sut.onBootstrapEvent()).rejects.toThrow(`Unexpected: The pgvector extension is not installed.`);
 | 
			
		||||
 | 
			
		||||
    expect(databaseMock.createExtension).toHaveBeenCalledTimes(1);
 | 
			
		||||
    expect(databaseMock.runMigrations).not.toHaveBeenCalled();
 | 
			
		||||
@ -78,7 +78,7 @@ describe(DatabaseService.name, () => {
 | 
			
		||||
  it(`should throw an error if the pgvecto.rs extension version is below minimum supported version`, async () => {
 | 
			
		||||
    databaseMock.getExtensionVersion.mockResolvedValue('0.1.0');
 | 
			
		||||
 | 
			
		||||
    await expect(sut.init()).rejects.toThrow(
 | 
			
		||||
    await expect(sut.onBootstrapEvent()).rejects.toThrow(
 | 
			
		||||
      'The pgvecto.rs extension version is 0.1.0, but Immich only supports 0.2.x.',
 | 
			
		||||
    );
 | 
			
		||||
 | 
			
		||||
@ -89,7 +89,7 @@ describe(DatabaseService.name, () => {
 | 
			
		||||
    process.env.DB_VECTOR_EXTENSION = 'pgvector';
 | 
			
		||||
    databaseMock.getExtensionVersion.mockResolvedValue('0.1.0');
 | 
			
		||||
 | 
			
		||||
    await expect(sut.init()).rejects.toThrow(
 | 
			
		||||
    await expect(sut.onBootstrapEvent()).rejects.toThrow(
 | 
			
		||||
      'The pgvector extension version is 0.1.0, but Immich only supports >=0.5 <1',
 | 
			
		||||
    );
 | 
			
		||||
 | 
			
		||||
@ -99,7 +99,7 @@ describe(DatabaseService.name, () => {
 | 
			
		||||
  it(`should throw an error if pgvecto.rs extension version is a nightly`, async () => {
 | 
			
		||||
    databaseMock.getExtensionVersion.mockResolvedValue('0.0.0');
 | 
			
		||||
 | 
			
		||||
    await expect(sut.init()).rejects.toThrow(
 | 
			
		||||
    await expect(sut.onBootstrapEvent()).rejects.toThrow(
 | 
			
		||||
      'The pgvecto.rs extension version is 0.0.0, which means it is a nightly release.',
 | 
			
		||||
    );
 | 
			
		||||
 | 
			
		||||
@ -111,7 +111,7 @@ describe(DatabaseService.name, () => {
 | 
			
		||||
    process.env.DB_VECTOR_EXTENSION = 'pgvector';
 | 
			
		||||
    databaseMock.getExtensionVersion.mockResolvedValue('0.0.0');
 | 
			
		||||
 | 
			
		||||
    await expect(sut.init()).rejects.toThrow(
 | 
			
		||||
    await expect(sut.onBootstrapEvent()).rejects.toThrow(
 | 
			
		||||
      'The pgvector extension version is 0.0.0, which means it is a nightly release.',
 | 
			
		||||
    );
 | 
			
		||||
 | 
			
		||||
@ -122,7 +122,7 @@ describe(DatabaseService.name, () => {
 | 
			
		||||
  it(`should throw error if pgvecto.rs extension could not be created`, async () => {
 | 
			
		||||
    databaseMock.createExtension.mockRejectedValue(new Error('Failed to create extension'));
 | 
			
		||||
 | 
			
		||||
    await expect(sut.init()).rejects.toThrow('Failed to create extension');
 | 
			
		||||
    await expect(sut.onBootstrapEvent()).rejects.toThrow('Failed to create extension');
 | 
			
		||||
 | 
			
		||||
    expect(loggerMock.fatal).toHaveBeenCalledTimes(1);
 | 
			
		||||
    expect(loggerMock.fatal.mock.calls[0][0]).toContain(
 | 
			
		||||
@ -137,7 +137,7 @@ describe(DatabaseService.name, () => {
 | 
			
		||||
    databaseMock.getExtensionVersion.mockResolvedValue('0.0.0');
 | 
			
		||||
    databaseMock.createExtension.mockRejectedValue(new Error('Failed to create extension'));
 | 
			
		||||
 | 
			
		||||
    await expect(sut.init()).rejects.toThrow('Failed to create extension');
 | 
			
		||||
    await expect(sut.onBootstrapEvent()).rejects.toThrow('Failed to create extension');
 | 
			
		||||
 | 
			
		||||
    expect(loggerMock.fatal).toHaveBeenCalledTimes(1);
 | 
			
		||||
    expect(loggerMock.fatal.mock.calls[0][0]).toContain(
 | 
			
		||||
@ -152,7 +152,7 @@ describe(DatabaseService.name, () => {
 | 
			
		||||
      databaseMock.getAvailableExtensionVersion.mockResolvedValue(version);
 | 
			
		||||
      databaseMock.getExtensionVersion.mockResolvedValue(version);
 | 
			
		||||
 | 
			
		||||
      await expect(sut.init()).resolves.toBeUndefined();
 | 
			
		||||
      await expect(sut.onBootstrapEvent()).resolves.toBeUndefined();
 | 
			
		||||
 | 
			
		||||
      expect(databaseMock.updateVectorExtension).toHaveBeenCalledWith('vectors', version);
 | 
			
		||||
      expect(databaseMock.updateVectorExtension).toHaveBeenCalledTimes(1);
 | 
			
		||||
@ -168,7 +168,7 @@ describe(DatabaseService.name, () => {
 | 
			
		||||
      databaseMock.getAvailableExtensionVersion.mockResolvedValue(version);
 | 
			
		||||
      databaseMock.getExtensionVersion.mockResolvedValue(version);
 | 
			
		||||
 | 
			
		||||
      await expect(sut.init()).resolves.toBeUndefined();
 | 
			
		||||
      await expect(sut.onBootstrapEvent()).resolves.toBeUndefined();
 | 
			
		||||
 | 
			
		||||
      expect(databaseMock.updateVectorExtension).toHaveBeenCalledWith('vector', version);
 | 
			
		||||
      expect(databaseMock.updateVectorExtension).toHaveBeenCalledTimes(1);
 | 
			
		||||
@ -182,7 +182,7 @@ describe(DatabaseService.name, () => {
 | 
			
		||||
    it(`should not upgrade pgvecto.rs to ${version}`, async () => {
 | 
			
		||||
      databaseMock.getAvailableExtensionVersion.mockResolvedValue(version);
 | 
			
		||||
 | 
			
		||||
      await expect(sut.init()).resolves.toBeUndefined();
 | 
			
		||||
      await expect(sut.onBootstrapEvent()).resolves.toBeUndefined();
 | 
			
		||||
 | 
			
		||||
      expect(databaseMock.updateVectorExtension).not.toHaveBeenCalled();
 | 
			
		||||
      expect(databaseMock.runMigrations).toHaveBeenCalledTimes(1);
 | 
			
		||||
@ -196,7 +196,7 @@ describe(DatabaseService.name, () => {
 | 
			
		||||
      databaseMock.getExtensionVersion.mockResolvedValue('0.5.0');
 | 
			
		||||
      databaseMock.getAvailableExtensionVersion.mockResolvedValue(version);
 | 
			
		||||
 | 
			
		||||
      await expect(sut.init()).resolves.toBeUndefined();
 | 
			
		||||
      await expect(sut.onBootstrapEvent()).resolves.toBeUndefined();
 | 
			
		||||
 | 
			
		||||
      expect(databaseMock.updateVectorExtension).not.toHaveBeenCalled();
 | 
			
		||||
      expect(databaseMock.runMigrations).toHaveBeenCalledTimes(1);
 | 
			
		||||
@ -210,7 +210,7 @@ describe(DatabaseService.name, () => {
 | 
			
		||||
    databaseMock.getAvailableExtensionVersion.mockResolvedValue('0.5.2');
 | 
			
		||||
    databaseMock.updateVectorExtension.mockRejectedValue(new Error('Failed to update extension'));
 | 
			
		||||
 | 
			
		||||
    await expect(sut.init()).resolves.toBeUndefined();
 | 
			
		||||
    await expect(sut.onBootstrapEvent()).resolves.toBeUndefined();
 | 
			
		||||
 | 
			
		||||
    expect(loggerMock.warn.mock.calls[0][0]).toContain('The pgvector extension can be updated to 0.5.2.');
 | 
			
		||||
    expect(loggerMock.error).toHaveBeenCalledTimes(1);
 | 
			
		||||
@ -223,7 +223,7 @@ describe(DatabaseService.name, () => {
 | 
			
		||||
    databaseMock.getAvailableExtensionVersion.mockResolvedValue('0.2.1');
 | 
			
		||||
    databaseMock.updateVectorExtension.mockRejectedValue(new Error('Failed to update extension'));
 | 
			
		||||
 | 
			
		||||
    await expect(sut.init()).resolves.toBeUndefined();
 | 
			
		||||
    await expect(sut.onBootstrapEvent()).resolves.toBeUndefined();
 | 
			
		||||
 | 
			
		||||
    expect(loggerMock.warn.mock.calls[0][0]).toContain('The pgvecto.rs extension can be updated to 0.2.1.');
 | 
			
		||||
    expect(loggerMock.error).toHaveBeenCalledTimes(1);
 | 
			
		||||
@ -236,7 +236,7 @@ describe(DatabaseService.name, () => {
 | 
			
		||||
    databaseMock.getAvailableExtensionVersion.mockResolvedValue('0.2.1');
 | 
			
		||||
    databaseMock.updateVectorExtension.mockResolvedValue({ restartRequired: true });
 | 
			
		||||
 | 
			
		||||
    await expect(sut.init()).resolves.toBeUndefined();
 | 
			
		||||
    await expect(sut.onBootstrapEvent()).resolves.toBeUndefined();
 | 
			
		||||
 | 
			
		||||
    expect(loggerMock.warn).toHaveBeenCalledTimes(1);
 | 
			
		||||
    expect(loggerMock.warn.mock.calls[0][0]).toContain('pgvecto.rs');
 | 
			
		||||
@ -251,7 +251,7 @@ describe(DatabaseService.name, () => {
 | 
			
		||||
    databaseMock.getAvailableExtensionVersion.mockResolvedValue('0.5.1');
 | 
			
		||||
    databaseMock.updateVectorExtension.mockResolvedValue({ restartRequired: true });
 | 
			
		||||
 | 
			
		||||
    await expect(sut.init()).resolves.toBeUndefined();
 | 
			
		||||
    await expect(sut.onBootstrapEvent()).resolves.toBeUndefined();
 | 
			
		||||
 | 
			
		||||
    expect(loggerMock.warn).toHaveBeenCalledTimes(1);
 | 
			
		||||
    expect(loggerMock.warn.mock.calls[0][0]).toContain('pgvector');
 | 
			
		||||
@ -263,7 +263,7 @@ describe(DatabaseService.name, () => {
 | 
			
		||||
  it('should reindex if needed', async () => {
 | 
			
		||||
    databaseMock.shouldReindex.mockResolvedValue(true);
 | 
			
		||||
 | 
			
		||||
    await expect(sut.init()).resolves.toBeUndefined();
 | 
			
		||||
    await expect(sut.onBootstrapEvent()).resolves.toBeUndefined();
 | 
			
		||||
 | 
			
		||||
    expect(databaseMock.shouldReindex).toHaveBeenCalledTimes(2);
 | 
			
		||||
    expect(databaseMock.reindex).toHaveBeenCalledTimes(2);
 | 
			
		||||
@ -274,7 +274,7 @@ describe(DatabaseService.name, () => {
 | 
			
		||||
  it('should not reindex if not needed', async () => {
 | 
			
		||||
    databaseMock.shouldReindex.mockResolvedValue(false);
 | 
			
		||||
 | 
			
		||||
    await expect(sut.init()).resolves.toBeUndefined();
 | 
			
		||||
    await expect(sut.onBootstrapEvent()).resolves.toBeUndefined();
 | 
			
		||||
 | 
			
		||||
    expect(databaseMock.shouldReindex).toHaveBeenCalledTimes(2);
 | 
			
		||||
    expect(databaseMock.reindex).toHaveBeenCalledTimes(0);
 | 
			
		||||
@ -286,7 +286,7 @@ describe(DatabaseService.name, () => {
 | 
			
		||||
    process.env.DB_SKIP_MIGRATIONS = 'true';
 | 
			
		||||
    databaseMock.getExtensionVersion.mockResolvedValue('0.2.0');
 | 
			
		||||
 | 
			
		||||
    await expect(sut.init()).resolves.toBeUndefined();
 | 
			
		||||
    await expect(sut.onBootstrapEvent()).resolves.toBeUndefined();
 | 
			
		||||
 | 
			
		||||
    expect(databaseMock.runMigrations).not.toHaveBeenCalled();
 | 
			
		||||
  });
 | 
			
		||||
 | 
			
		||||
@ -2,6 +2,7 @@ import { Inject, Injectable } from '@nestjs/common';
 | 
			
		||||
import semver from 'semver';
 | 
			
		||||
import { POSTGRES_VERSION_RANGE, VECTORS_VERSION_RANGE, VECTOR_VERSION_RANGE } from 'src/constants';
 | 
			
		||||
import { getVectorExtension } from 'src/database.config';
 | 
			
		||||
import { EventHandlerOptions } from 'src/decorators';
 | 
			
		||||
import {
 | 
			
		||||
  DatabaseExtension,
 | 
			
		||||
  DatabaseLock,
 | 
			
		||||
@ -9,6 +10,7 @@ import {
 | 
			
		||||
  IDatabaseRepository,
 | 
			
		||||
  VectorIndex,
 | 
			
		||||
} from 'src/interfaces/database.interface';
 | 
			
		||||
import { OnEvents } from 'src/interfaces/event.interface';
 | 
			
		||||
import { ILoggerRepository } from 'src/interfaces/logger.interface';
 | 
			
		||||
 | 
			
		||||
type CreateFailedArgs = { name: string; extension: string; otherName: string };
 | 
			
		||||
@ -63,7 +65,7 @@ const messages = {
 | 
			
		||||
};
 | 
			
		||||
 | 
			
		||||
@Injectable()
 | 
			
		||||
export class DatabaseService {
 | 
			
		||||
export class DatabaseService implements OnEvents {
 | 
			
		||||
  constructor(
 | 
			
		||||
    @Inject(IDatabaseRepository) private databaseRepository: IDatabaseRepository,
 | 
			
		||||
    @Inject(ILoggerRepository) private logger: ILoggerRepository,
 | 
			
		||||
@ -71,7 +73,8 @@ export class DatabaseService {
 | 
			
		||||
    this.logger.setContext(DatabaseService.name);
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  async init() {
 | 
			
		||||
  @EventHandlerOptions({ priority: -200 })
 | 
			
		||||
  async onBootstrapEvent() {
 | 
			
		||||
    const version = await this.databaseRepository.getPostgresVersion();
 | 
			
		||||
    const current = semver.coerce(version);
 | 
			
		||||
    if (!current || !semver.satisfies(current, POSTGRES_VERSION_RANGE)) {
 | 
			
		||||
 | 
			
		||||
@ -69,11 +69,11 @@ describe(LibraryService.name, () => {
 | 
			
		||||
    expect(sut).toBeDefined();
 | 
			
		||||
  });
 | 
			
		||||
 | 
			
		||||
  describe('init', () => {
 | 
			
		||||
  describe('onBootstrapEvent', () => {
 | 
			
		||||
    it('should init cron job and subscribe to config changes', async () => {
 | 
			
		||||
      systemMock.get.mockResolvedValue(systemConfigStub.libraryScan);
 | 
			
		||||
 | 
			
		||||
      await sut.init();
 | 
			
		||||
      await sut.onBootstrapEvent();
 | 
			
		||||
      expect(systemMock.get).toHaveBeenCalled();
 | 
			
		||||
      expect(jobMock.addCronJob).toHaveBeenCalled();
 | 
			
		||||
 | 
			
		||||
@ -105,7 +105,7 @@ describe(LibraryService.name, () => {
 | 
			
		||||
        ),
 | 
			
		||||
      );
 | 
			
		||||
 | 
			
		||||
      await sut.init();
 | 
			
		||||
      await sut.onBootstrapEvent();
 | 
			
		||||
 | 
			
		||||
      expect(storageMock.watch.mock.calls).toEqual(
 | 
			
		||||
        expect.arrayContaining([
 | 
			
		||||
@ -118,7 +118,7 @@ describe(LibraryService.name, () => {
 | 
			
		||||
    it('should not initialize watcher when watching is disabled', async () => {
 | 
			
		||||
      systemMock.get.mockResolvedValue(systemConfigStub.libraryWatchDisabled);
 | 
			
		||||
 | 
			
		||||
      await sut.init();
 | 
			
		||||
      await sut.onBootstrapEvent();
 | 
			
		||||
 | 
			
		||||
      expect(storageMock.watch).not.toHaveBeenCalled();
 | 
			
		||||
    });
 | 
			
		||||
@ -127,16 +127,16 @@ describe(LibraryService.name, () => {
 | 
			
		||||
      systemMock.get.mockResolvedValue(systemConfigStub.libraryWatchEnabled);
 | 
			
		||||
      databaseMock.tryLock.mockResolvedValue(false);
 | 
			
		||||
 | 
			
		||||
      await sut.init();
 | 
			
		||||
      await sut.onBootstrapEvent();
 | 
			
		||||
 | 
			
		||||
      expect(storageMock.watch).not.toHaveBeenCalled();
 | 
			
		||||
    });
 | 
			
		||||
  });
 | 
			
		||||
 | 
			
		||||
  describe('onValidateConfig', () => {
 | 
			
		||||
  describe('onConfigValidateEvent', () => {
 | 
			
		||||
    it('should allow a valid cron expression', () => {
 | 
			
		||||
      expect(() =>
 | 
			
		||||
        sut.onValidateConfig({
 | 
			
		||||
        sut.onConfigValidateEvent({
 | 
			
		||||
          newConfig: { library: { scan: { cronExpression: '0 0 * * *' } } } as SystemConfig,
 | 
			
		||||
          oldConfig: {} as SystemConfig,
 | 
			
		||||
        }),
 | 
			
		||||
@ -145,7 +145,7 @@ describe(LibraryService.name, () => {
 | 
			
		||||
 | 
			
		||||
    it('should fail for an invalid cron expression', () => {
 | 
			
		||||
      expect(() =>
 | 
			
		||||
        sut.onValidateConfig({
 | 
			
		||||
        sut.onConfigValidateEvent({
 | 
			
		||||
          newConfig: { library: { scan: { cronExpression: 'foo' } } } as SystemConfig,
 | 
			
		||||
          oldConfig: {} as SystemConfig,
 | 
			
		||||
        }),
 | 
			
		||||
@ -730,7 +730,7 @@ describe(LibraryService.name, () => {
 | 
			
		||||
      const mockClose = vitest.fn();
 | 
			
		||||
      storageMock.watch.mockImplementation(makeMockWatcher({ close: mockClose }));
 | 
			
		||||
 | 
			
		||||
      await sut.init();
 | 
			
		||||
      await sut.onBootstrapEvent();
 | 
			
		||||
      await sut.delete(libraryStub.externalLibraryWithImportPaths1.id);
 | 
			
		||||
 | 
			
		||||
      expect(mockClose).toHaveBeenCalled();
 | 
			
		||||
@ -861,7 +861,7 @@ describe(LibraryService.name, () => {
 | 
			
		||||
        libraryMock.get.mockResolvedValue(libraryStub.externalLibraryWithImportPaths1);
 | 
			
		||||
        libraryMock.getAll.mockResolvedValue([]);
 | 
			
		||||
 | 
			
		||||
        await sut.init();
 | 
			
		||||
        await sut.onBootstrapEvent();
 | 
			
		||||
        await sut.create({
 | 
			
		||||
          ownerId: authStub.admin.user.id,
 | 
			
		||||
          importPaths: libraryStub.externalLibraryWithImportPaths1.importPaths,
 | 
			
		||||
@ -917,7 +917,7 @@ describe(LibraryService.name, () => {
 | 
			
		||||
      systemMock.get.mockResolvedValue(systemConfigStub.libraryWatchEnabled);
 | 
			
		||||
      libraryMock.getAll.mockResolvedValue([]);
 | 
			
		||||
 | 
			
		||||
      await sut.init();
 | 
			
		||||
      await sut.onBootstrapEvent();
 | 
			
		||||
    });
 | 
			
		||||
 | 
			
		||||
    it('should update library', async () => {
 | 
			
		||||
@ -933,7 +933,7 @@ describe(LibraryService.name, () => {
 | 
			
		||||
      beforeEach(async () => {
 | 
			
		||||
        systemMock.get.mockResolvedValue(systemConfigStub.libraryWatchDisabled);
 | 
			
		||||
 | 
			
		||||
        await sut.init();
 | 
			
		||||
        await sut.onBootstrapEvent();
 | 
			
		||||
      });
 | 
			
		||||
 | 
			
		||||
      it('should not watch library', async () => {
 | 
			
		||||
@ -949,7 +949,7 @@ describe(LibraryService.name, () => {
 | 
			
		||||
      beforeEach(async () => {
 | 
			
		||||
        systemMock.get.mockResolvedValue(systemConfigStub.libraryWatchEnabled);
 | 
			
		||||
        libraryMock.getAll.mockResolvedValue([]);
 | 
			
		||||
        await sut.init();
 | 
			
		||||
        await sut.onBootstrapEvent();
 | 
			
		||||
      });
 | 
			
		||||
 | 
			
		||||
      it('should watch library', async () => {
 | 
			
		||||
@ -1107,8 +1107,8 @@ describe(LibraryService.name, () => {
 | 
			
		||||
      const mockClose = vitest.fn();
 | 
			
		||||
      storageMock.watch.mockImplementation(makeMockWatcher({ close: mockClose }));
 | 
			
		||||
 | 
			
		||||
      await sut.init();
 | 
			
		||||
      await sut.teardown();
 | 
			
		||||
      await sut.onBootstrapEvent();
 | 
			
		||||
      await sut.onShutdownEvent();
 | 
			
		||||
 | 
			
		||||
      expect(mockClose).toHaveBeenCalledTimes(2);
 | 
			
		||||
    });
 | 
			
		||||
 | 
			
		||||
@ -6,7 +6,6 @@ import path, { basename, parse } from 'node:path';
 | 
			
		||||
import picomatch from 'picomatch';
 | 
			
		||||
import { StorageCore } from 'src/cores/storage.core';
 | 
			
		||||
import { SystemConfigCore } from 'src/cores/system-config.core';
 | 
			
		||||
import { OnServerEvent } from 'src/decorators';
 | 
			
		||||
import {
 | 
			
		||||
  CreateLibraryDto,
 | 
			
		||||
  LibraryResponseDto,
 | 
			
		||||
@ -23,7 +22,7 @@ import { LibraryEntity } from 'src/entities/library.entity';
 | 
			
		||||
import { IAssetRepository, WithProperty } from 'src/interfaces/asset.interface';
 | 
			
		||||
import { ICryptoRepository } from 'src/interfaces/crypto.interface';
 | 
			
		||||
import { DatabaseLock, IDatabaseRepository } from 'src/interfaces/database.interface';
 | 
			
		||||
import { ServerAsyncEvent, ServerAsyncEventMap } from 'src/interfaces/event.interface';
 | 
			
		||||
import { OnEvents, SystemConfigUpdate } from 'src/interfaces/event.interface';
 | 
			
		||||
import {
 | 
			
		||||
  IBaseJob,
 | 
			
		||||
  IEntityJob,
 | 
			
		||||
@ -46,7 +45,7 @@ import { validateCronExpression } from 'src/validation';
 | 
			
		||||
const LIBRARY_SCAN_BATCH_SIZE = 5000;
 | 
			
		||||
 | 
			
		||||
@Injectable()
 | 
			
		||||
export class LibraryService {
 | 
			
		||||
export class LibraryService implements OnEvents {
 | 
			
		||||
  private configCore: SystemConfigCore;
 | 
			
		||||
  private watchLibraries = false;
 | 
			
		||||
  private watchLock = false;
 | 
			
		||||
@ -66,7 +65,7 @@ export class LibraryService {
 | 
			
		||||
    this.configCore = SystemConfigCore.create(systemMetadataRepository, this.logger);
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  async init() {
 | 
			
		||||
  async onBootstrapEvent() {
 | 
			
		||||
    const config = await this.configCore.getConfig({ withCache: false });
 | 
			
		||||
 | 
			
		||||
    const { watch, scan } = config.library;
 | 
			
		||||
@ -103,8 +102,7 @@ export class LibraryService {
 | 
			
		||||
    });
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  @OnServerEvent(ServerAsyncEvent.CONFIG_VALIDATE)
 | 
			
		||||
  onValidateConfig({ newConfig }: ServerAsyncEventMap[ServerAsyncEvent.CONFIG_VALIDATE]) {
 | 
			
		||||
  onConfigValidateEvent({ newConfig }: SystemConfigUpdate) {
 | 
			
		||||
    const { scan } = newConfig.library;
 | 
			
		||||
    if (!validateCronExpression(scan.cronExpression)) {
 | 
			
		||||
      throw new Error(`Invalid cron expression ${scan.cronExpression}`);
 | 
			
		||||
@ -189,7 +187,7 @@ export class LibraryService {
 | 
			
		||||
    }
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  async teardown() {
 | 
			
		||||
  async onShutdownEvent() {
 | 
			
		||||
    await this.unwatchAll();
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@ -95,16 +95,16 @@ describe(MetadataService.name, () => {
 | 
			
		||||
  });
 | 
			
		||||
 | 
			
		||||
  afterEach(async () => {
 | 
			
		||||
    await sut.teardown();
 | 
			
		||||
    await sut.onShutdownEvent();
 | 
			
		||||
  });
 | 
			
		||||
 | 
			
		||||
  it('should be defined', () => {
 | 
			
		||||
    expect(sut).toBeDefined();
 | 
			
		||||
  });
 | 
			
		||||
 | 
			
		||||
  describe('init', () => {
 | 
			
		||||
  describe('onBootstrapEvent', () => {
 | 
			
		||||
    it('should pause and resume queue during init', async () => {
 | 
			
		||||
      await sut.init();
 | 
			
		||||
      await sut.onBootstrapEvent('microservices');
 | 
			
		||||
 | 
			
		||||
      expect(jobMock.pause).toHaveBeenCalledTimes(1);
 | 
			
		||||
      expect(mapMock.init).toHaveBeenCalledTimes(1);
 | 
			
		||||
@ -114,7 +114,7 @@ describe(MetadataService.name, () => {
 | 
			
		||||
    it('should return if reverse geocoding is disabled', async () => {
 | 
			
		||||
      systemMock.get.mockResolvedValue({ reverseGeocoding: { enabled: false } });
 | 
			
		||||
 | 
			
		||||
      await sut.init();
 | 
			
		||||
      await sut.onBootstrapEvent('microservices');
 | 
			
		||||
 | 
			
		||||
      expect(jobMock.pause).not.toHaveBeenCalled();
 | 
			
		||||
      expect(mapMock.init).not.toHaveBeenCalled();
 | 
			
		||||
 | 
			
		||||
@ -5,7 +5,7 @@ import _ from 'lodash';
 | 
			
		||||
import { Duration } from 'luxon';
 | 
			
		||||
import { constants } from 'node:fs/promises';
 | 
			
		||||
import path from 'node:path';
 | 
			
		||||
import { Subscription } from 'rxjs';
 | 
			
		||||
import { SystemConfig } from 'src/config';
 | 
			
		||||
import { StorageCore } from 'src/cores/storage.core';
 | 
			
		||||
import { SystemConfigCore } from 'src/cores/system-config.core';
 | 
			
		||||
import { AssetEntity, AssetType } from 'src/entities/asset.entity';
 | 
			
		||||
@ -14,7 +14,7 @@ import { IAlbumRepository } from 'src/interfaces/album.interface';
 | 
			
		||||
import { IAssetRepository, WithoutProperty } from 'src/interfaces/asset.interface';
 | 
			
		||||
import { ICryptoRepository } from 'src/interfaces/crypto.interface';
 | 
			
		||||
import { DatabaseLock, IDatabaseRepository } from 'src/interfaces/database.interface';
 | 
			
		||||
import { ClientEvent, IEventRepository } from 'src/interfaces/event.interface';
 | 
			
		||||
import { ClientEvent, IEventRepository, OnEvents } from 'src/interfaces/event.interface';
 | 
			
		||||
import {
 | 
			
		||||
  IBaseJob,
 | 
			
		||||
  IEntityJob,
 | 
			
		||||
@ -34,7 +34,6 @@ import { IPersonRepository } from 'src/interfaces/person.interface';
 | 
			
		||||
import { IStorageRepository } from 'src/interfaces/storage.interface';
 | 
			
		||||
import { ISystemMetadataRepository } from 'src/interfaces/system-metadata.interface';
 | 
			
		||||
import { IUserRepository } from 'src/interfaces/user.interface';
 | 
			
		||||
import { handlePromiseError } from 'src/utils/misc';
 | 
			
		||||
import { usePagination } from 'src/utils/pagination';
 | 
			
		||||
 | 
			
		||||
/** look for a date from these tags (in order) */
 | 
			
		||||
@ -97,10 +96,9 @@ const validate = <T>(value: T): NonNullable<T> | null => {
 | 
			
		||||
};
 | 
			
		||||
 | 
			
		||||
@Injectable()
 | 
			
		||||
export class MetadataService {
 | 
			
		||||
export class MetadataService implements OnEvents {
 | 
			
		||||
  private storageCore: StorageCore;
 | 
			
		||||
  private configCore: SystemConfigCore;
 | 
			
		||||
  private subscription: Subscription | null = null;
 | 
			
		||||
 | 
			
		||||
  constructor(
 | 
			
		||||
    @Inject(IAlbumRepository) private albumRepository: IAlbumRepository,
 | 
			
		||||
@ -132,12 +130,19 @@ export class MetadataService {
 | 
			
		||||
    );
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  async init() {
 | 
			
		||||
    if (!this.subscription) {
 | 
			
		||||
      this.subscription = this.configCore.config$.subscribe(() => handlePromiseError(this.init(), this.logger));
 | 
			
		||||
  async onBootstrapEvent(app: 'api' | 'microservices') {
 | 
			
		||||
    if (app !== 'microservices') {
 | 
			
		||||
      return;
 | 
			
		||||
    }
 | 
			
		||||
    const config = await this.configCore.getConfig({ withCache: false });
 | 
			
		||||
    await this.init(config);
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
    const { reverseGeocoding } = await this.configCore.getConfig({ withCache: false });
 | 
			
		||||
  async onConfigUpdateEvent({ newConfig }: { newConfig: SystemConfig }) {
 | 
			
		||||
    await this.init(newConfig);
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  private async init({ reverseGeocoding }: SystemConfig) {
 | 
			
		||||
    const { enabled } = reverseGeocoding;
 | 
			
		||||
 | 
			
		||||
    if (!enabled) {
 | 
			
		||||
@ -155,8 +160,7 @@ export class MetadataService {
 | 
			
		||||
    }
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  async teardown() {
 | 
			
		||||
    this.subscription?.unsubscribe();
 | 
			
		||||
  async onShutdownEvent() {
 | 
			
		||||
    await this.repository.teardown();
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@ -1,8 +1,8 @@
 | 
			
		||||
import { Injectable } from '@nestjs/common';
 | 
			
		||||
import { OnEvents } from 'src/interfaces/event.interface';
 | 
			
		||||
import { IDeleteFilesJob, JobName } from 'src/interfaces/job.interface';
 | 
			
		||||
import { AssetService } from 'src/services/asset.service';
 | 
			
		||||
import { AuditService } from 'src/services/audit.service';
 | 
			
		||||
import { DatabaseService } from 'src/services/database.service';
 | 
			
		||||
import { DuplicateService } from 'src/services/duplicate.service';
 | 
			
		||||
import { JobService } from 'src/services/job.service';
 | 
			
		||||
import { LibraryService } from 'src/services/library.service';
 | 
			
		||||
@ -14,18 +14,15 @@ import { SessionService } from 'src/services/session.service';
 | 
			
		||||
import { SmartInfoService } from 'src/services/smart-info.service';
 | 
			
		||||
import { StorageTemplateService } from 'src/services/storage-template.service';
 | 
			
		||||
import { StorageService } from 'src/services/storage.service';
 | 
			
		||||
import { SystemConfigService } from 'src/services/system-config.service';
 | 
			
		||||
import { UserService } from 'src/services/user.service';
 | 
			
		||||
import { VersionService } from 'src/services/version.service';
 | 
			
		||||
import { otelShutdown } from 'src/utils/instrumentation';
 | 
			
		||||
 | 
			
		||||
@Injectable()
 | 
			
		||||
export class MicroservicesService {
 | 
			
		||||
export class MicroservicesService implements OnEvents {
 | 
			
		||||
  constructor(
 | 
			
		||||
    private auditService: AuditService,
 | 
			
		||||
    private assetService: AssetService,
 | 
			
		||||
    private configService: SystemConfigService,
 | 
			
		||||
    private databaseService: DatabaseService,
 | 
			
		||||
    private jobService: JobService,
 | 
			
		||||
    private libraryService: LibraryService,
 | 
			
		||||
    private mediaService: MediaService,
 | 
			
		||||
@ -41,11 +38,11 @@ export class MicroservicesService {
 | 
			
		||||
    private versionService: VersionService,
 | 
			
		||||
  ) {}
 | 
			
		||||
 | 
			
		||||
  async init() {
 | 
			
		||||
    await this.databaseService.init();
 | 
			
		||||
    await this.configService.init();
 | 
			
		||||
    await this.libraryService.init();
 | 
			
		||||
    await this.notificationService.init();
 | 
			
		||||
  async onBootstrapEvent(app: 'api' | 'microservices') {
 | 
			
		||||
    if (app !== 'microservices') {
 | 
			
		||||
      return;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    await this.jobService.init({
 | 
			
		||||
      [JobName.ASSET_DELETION]: (data) => this.assetService.handleAssetDeletion(data),
 | 
			
		||||
      [JobName.ASSET_DELETION_CHECK]: () => this.assetService.handleAssetDeletionCheck(),
 | 
			
		||||
@ -95,13 +92,9 @@ export class MicroservicesService {
 | 
			
		||||
      [JobName.NOTIFY_SIGNUP]: (data) => this.notificationService.handleUserSignup(data),
 | 
			
		||||
      [JobName.VERSION_CHECK]: () => this.versionService.handleVersionCheck(),
 | 
			
		||||
    });
 | 
			
		||||
 | 
			
		||||
    await this.metadataService.init();
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  async teardown() {
 | 
			
		||||
    await this.libraryService.teardown();
 | 
			
		||||
    await this.metadataService.teardown();
 | 
			
		||||
  async onShutdown() {
 | 
			
		||||
    await otelShutdown();
 | 
			
		||||
  }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -1,12 +1,11 @@
 | 
			
		||||
import { HttpException, HttpStatus, Inject, Injectable } from '@nestjs/common';
 | 
			
		||||
import { DEFAULT_EXTERNAL_DOMAIN } from 'src/constants';
 | 
			
		||||
import { SystemConfigCore } from 'src/cores/system-config.core';
 | 
			
		||||
import { OnServerEvent } from 'src/decorators';
 | 
			
		||||
import { SystemConfigSmtpDto } from 'src/dtos/system-config.dto';
 | 
			
		||||
import { AlbumEntity } from 'src/entities/album.entity';
 | 
			
		||||
import { IAlbumRepository } from 'src/interfaces/album.interface';
 | 
			
		||||
import { IAssetRepository } from 'src/interfaces/asset.interface';
 | 
			
		||||
import { ServerAsyncEvent, ServerAsyncEventMap } from 'src/interfaces/event.interface';
 | 
			
		||||
import { OnEvents, SystemConfigUpdate } from 'src/interfaces/event.interface';
 | 
			
		||||
import {
 | 
			
		||||
  IEmailJob,
 | 
			
		||||
  IJobRepository,
 | 
			
		||||
@ -23,7 +22,7 @@ import { IUserRepository } from 'src/interfaces/user.interface';
 | 
			
		||||
import { getPreferences } from 'src/utils/preferences';
 | 
			
		||||
 | 
			
		||||
@Injectable()
 | 
			
		||||
export class NotificationService {
 | 
			
		||||
export class NotificationService implements OnEvents {
 | 
			
		||||
  private configCore: SystemConfigCore;
 | 
			
		||||
 | 
			
		||||
  constructor(
 | 
			
		||||
@ -39,13 +38,7 @@ export class NotificationService {
 | 
			
		||||
    this.configCore = SystemConfigCore.create(systemMetadataRepository, logger);
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  init() {
 | 
			
		||||
    // TODO
 | 
			
		||||
    return Promise.resolve();
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  @OnServerEvent(ServerAsyncEvent.CONFIG_VALIDATE)
 | 
			
		||||
  async onValidateConfig({ newConfig }: ServerAsyncEventMap[ServerAsyncEvent.CONFIG_VALIDATE]) {
 | 
			
		||||
  async onConfigValidateEvent({ newConfig }: SystemConfigUpdate) {
 | 
			
		||||
    try {
 | 
			
		||||
      if (newConfig.notifications.smtp.enabled) {
 | 
			
		||||
        await this.notificationRepository.verifySmtp(newConfig.notifications.smtp.transport);
 | 
			
		||||
 | 
			
		||||
@ -14,6 +14,7 @@ import {
 | 
			
		||||
  UsageByUserDto,
 | 
			
		||||
} from 'src/dtos/server-info.dto';
 | 
			
		||||
import { SystemMetadataKey } from 'src/entities/system-metadata.entity';
 | 
			
		||||
import { OnEvents } from 'src/interfaces/event.interface';
 | 
			
		||||
import { ILoggerRepository } from 'src/interfaces/logger.interface';
 | 
			
		||||
import { IServerInfoRepository } from 'src/interfaces/server-info.interface';
 | 
			
		||||
import { IStorageRepository } from 'src/interfaces/storage.interface';
 | 
			
		||||
@ -24,7 +25,7 @@ import { mimeTypes } from 'src/utils/mime-types';
 | 
			
		||||
import { isDuplicateDetectionEnabled, isFacialRecognitionEnabled, isSmartSearchEnabled } from 'src/utils/misc';
 | 
			
		||||
 | 
			
		||||
@Injectable()
 | 
			
		||||
export class ServerInfoService {
 | 
			
		||||
export class ServerInfoService implements OnEvents {
 | 
			
		||||
  private configCore: SystemConfigCore;
 | 
			
		||||
 | 
			
		||||
  constructor(
 | 
			
		||||
@ -38,13 +39,14 @@ export class ServerInfoService {
 | 
			
		||||
    this.configCore = SystemConfigCore.create(systemMetadataRepository, this.logger);
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  async init(): Promise<void> {
 | 
			
		||||
  async onBootstrapEvent(): Promise<void> {
 | 
			
		||||
    const featureFlags = await this.getFeatures();
 | 
			
		||||
    if (featureFlags.configFile) {
 | 
			
		||||
      await this.systemMetadataRepository.set(SystemMetadataKey.ADMIN_ONBOARDING, {
 | 
			
		||||
        isOnboarded: true,
 | 
			
		||||
      });
 | 
			
		||||
    }
 | 
			
		||||
    this.logger.log(`Feature Flags: ${JSON.stringify(await this.getFeatures(), null, 2)}`);
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  async getAboutInfo(): Promise<ServerAboutResponseDto> {
 | 
			
		||||
 | 
			
		||||
@ -2,6 +2,7 @@ import { Inject, Injectable } from '@nestjs/common';
 | 
			
		||||
import { SystemConfigCore } from 'src/cores/system-config.core';
 | 
			
		||||
import { IAssetRepository, WithoutProperty } from 'src/interfaces/asset.interface';
 | 
			
		||||
import { DatabaseLock, IDatabaseRepository } from 'src/interfaces/database.interface';
 | 
			
		||||
import { OnEvents, SystemConfigUpdate } from 'src/interfaces/event.interface';
 | 
			
		||||
import {
 | 
			
		||||
  IBaseJob,
 | 
			
		||||
  IEntityJob,
 | 
			
		||||
@ -19,7 +20,7 @@ import { isSmartSearchEnabled } from 'src/utils/misc';
 | 
			
		||||
import { usePagination } from 'src/utils/pagination';
 | 
			
		||||
 | 
			
		||||
@Injectable()
 | 
			
		||||
export class SmartInfoService {
 | 
			
		||||
export class SmartInfoService implements OnEvents {
 | 
			
		||||
  private configCore: SystemConfigCore;
 | 
			
		||||
 | 
			
		||||
  constructor(
 | 
			
		||||
@ -49,6 +50,12 @@ export class SmartInfoService {
 | 
			
		||||
    await this.jobRepository.resume(QueueName.SMART_SEARCH);
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  async onConfigUpdateEvent({ oldConfig, newConfig }: SystemConfigUpdate) {
 | 
			
		||||
    if (oldConfig.machineLearning.clip.modelName !== newConfig.machineLearning.clip.modelName) {
 | 
			
		||||
      await this.repository.init(newConfig.machineLearning.clip.modelName);
 | 
			
		||||
    }
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  async handleQueueEncodeClip({ force }: IBaseJob): Promise<JobStatus> {
 | 
			
		||||
    const { machineLearning } = await this.configCore.getConfig({ withCache: false });
 | 
			
		||||
    if (!isSmartSearchEnabled(machineLearning)) {
 | 
			
		||||
 | 
			
		||||
@ -76,10 +76,10 @@ describe(StorageTemplateService.name, () => {
 | 
			
		||||
    SystemConfigCore.create(systemMock, loggerMock).config$.next(defaults);
 | 
			
		||||
  });
 | 
			
		||||
 | 
			
		||||
  describe('onValidateConfig', () => {
 | 
			
		||||
  describe('onConfigValidateEvent', () => {
 | 
			
		||||
    it('should allow valid templates', () => {
 | 
			
		||||
      expect(() =>
 | 
			
		||||
        sut.onValidateConfig({
 | 
			
		||||
        sut.onConfigValidateEvent({
 | 
			
		||||
          newConfig: {
 | 
			
		||||
            storageTemplate: {
 | 
			
		||||
              template:
 | 
			
		||||
@ -93,7 +93,7 @@ describe(StorageTemplateService.name, () => {
 | 
			
		||||
 | 
			
		||||
    it('should fail for an invalid template', () => {
 | 
			
		||||
      expect(() =>
 | 
			
		||||
        sut.onValidateConfig({
 | 
			
		||||
        sut.onConfigValidateEvent({
 | 
			
		||||
          newConfig: {
 | 
			
		||||
            storageTemplate: {
 | 
			
		||||
              template: '{{foo}}',
 | 
			
		||||
 | 
			
		||||
@ -15,14 +15,13 @@ import {
 | 
			
		||||
} from 'src/constants';
 | 
			
		||||
import { StorageCore, StorageFolder } from 'src/cores/storage.core';
 | 
			
		||||
import { SystemConfigCore } from 'src/cores/system-config.core';
 | 
			
		||||
import { OnServerEvent } from 'src/decorators';
 | 
			
		||||
import { AssetEntity, AssetType } from 'src/entities/asset.entity';
 | 
			
		||||
import { AssetPathType } from 'src/entities/move.entity';
 | 
			
		||||
import { IAlbumRepository } from 'src/interfaces/album.interface';
 | 
			
		||||
import { IAssetRepository } from 'src/interfaces/asset.interface';
 | 
			
		||||
import { ICryptoRepository } from 'src/interfaces/crypto.interface';
 | 
			
		||||
import { DatabaseLock, IDatabaseRepository } from 'src/interfaces/database.interface';
 | 
			
		||||
import { ServerAsyncEvent, ServerAsyncEventMap } from 'src/interfaces/event.interface';
 | 
			
		||||
import { OnEvents, SystemConfigUpdate } from 'src/interfaces/event.interface';
 | 
			
		||||
import { IEntityJob, JOBS_ASSET_PAGINATION_SIZE, JobStatus } from 'src/interfaces/job.interface';
 | 
			
		||||
import { ILoggerRepository } from 'src/interfaces/logger.interface';
 | 
			
		||||
import { IMoveRepository } from 'src/interfaces/move.interface';
 | 
			
		||||
@ -46,7 +45,7 @@ interface RenderMetadata {
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@Injectable()
 | 
			
		||||
export class StorageTemplateService {
 | 
			
		||||
export class StorageTemplateService implements OnEvents {
 | 
			
		||||
  private configCore: SystemConfigCore;
 | 
			
		||||
  private storageCore: StorageCore;
 | 
			
		||||
  private _template: {
 | 
			
		||||
@ -88,8 +87,7 @@ export class StorageTemplateService {
 | 
			
		||||
    );
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  @OnServerEvent(ServerAsyncEvent.CONFIG_VALIDATE)
 | 
			
		||||
  onValidateConfig({ newConfig }: ServerAsyncEventMap[ServerAsyncEvent.CONFIG_VALIDATE]) {
 | 
			
		||||
  onConfigValidateEvent({ newConfig }: SystemConfigUpdate) {
 | 
			
		||||
    try {
 | 
			
		||||
      const { compiled } = this.compile(newConfig.storageTemplate.template);
 | 
			
		||||
      this.render(compiled, {
 | 
			
		||||
 | 
			
		||||
@ -20,9 +20,9 @@ describe(StorageService.name, () => {
 | 
			
		||||
    expect(sut).toBeDefined();
 | 
			
		||||
  });
 | 
			
		||||
 | 
			
		||||
  describe('init', () => {
 | 
			
		||||
  describe('onBootstrapEvent', () => {
 | 
			
		||||
    it('should create the library folder on initialization', () => {
 | 
			
		||||
      sut.init();
 | 
			
		||||
      sut.onBootstrapEvent();
 | 
			
		||||
      expect(storageMock.mkdirSync).toHaveBeenCalledWith('upload/library');
 | 
			
		||||
    });
 | 
			
		||||
  });
 | 
			
		||||
 | 
			
		||||
@ -1,11 +1,12 @@
 | 
			
		||||
import { Inject, Injectable } from '@nestjs/common';
 | 
			
		||||
import { StorageCore, StorageFolder } from 'src/cores/storage.core';
 | 
			
		||||
import { OnEvents } from 'src/interfaces/event.interface';
 | 
			
		||||
import { IDeleteFilesJob, JobStatus } from 'src/interfaces/job.interface';
 | 
			
		||||
import { ILoggerRepository } from 'src/interfaces/logger.interface';
 | 
			
		||||
import { IStorageRepository } from 'src/interfaces/storage.interface';
 | 
			
		||||
 | 
			
		||||
@Injectable()
 | 
			
		||||
export class StorageService {
 | 
			
		||||
export class StorageService implements OnEvents {
 | 
			
		||||
  constructor(
 | 
			
		||||
    @Inject(IStorageRepository) private storageRepository: IStorageRepository,
 | 
			
		||||
    @Inject(ILoggerRepository) private logger: ILoggerRepository,
 | 
			
		||||
@ -13,7 +14,7 @@ export class StorageService {
 | 
			
		||||
    this.logger.setContext(StorageService.name);
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  init() {
 | 
			
		||||
  onBootstrapEvent() {
 | 
			
		||||
    const libraryBase = StorageCore.getBaseFolder(StorageFolder.LIBRARY);
 | 
			
		||||
    this.storageRepository.mkdirSync(libraryBase);
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
@ -16,7 +16,6 @@ import { SystemMetadataKey } from 'src/entities/system-metadata.entity';
 | 
			
		||||
import { IEventRepository, ServerEvent } from 'src/interfaces/event.interface';
 | 
			
		||||
import { QueueName } from 'src/interfaces/job.interface';
 | 
			
		||||
import { ILoggerRepository } from 'src/interfaces/logger.interface';
 | 
			
		||||
import { ISearchRepository } from 'src/interfaces/search.interface';
 | 
			
		||||
import { ISystemMetadataRepository } from 'src/interfaces/system-metadata.interface';
 | 
			
		||||
import { SystemConfigService } from 'src/services/system-config.service';
 | 
			
		||||
import { newEventRepositoryMock } from 'test/repositories/event.repository.mock';
 | 
			
		||||
@ -180,14 +179,13 @@ describe(SystemConfigService.name, () => {
 | 
			
		||||
  let systemMock: Mocked<ISystemMetadataRepository>;
 | 
			
		||||
  let eventMock: Mocked<IEventRepository>;
 | 
			
		||||
  let loggerMock: Mocked<ILoggerRepository>;
 | 
			
		||||
  let smartInfoMock: Mocked<ISearchRepository>;
 | 
			
		||||
 | 
			
		||||
  beforeEach(() => {
 | 
			
		||||
    delete process.env.IMMICH_CONFIG_FILE;
 | 
			
		||||
    systemMock = newSystemMetadataRepositoryMock();
 | 
			
		||||
    eventMock = newEventRepositoryMock();
 | 
			
		||||
    loggerMock = newLoggerRepositoryMock();
 | 
			
		||||
    sut = new SystemConfigService(systemMock, eventMock, loggerMock, smartInfoMock);
 | 
			
		||||
    sut = new SystemConfigService(systemMock, eventMock, loggerMock);
 | 
			
		||||
  });
 | 
			
		||||
 | 
			
		||||
  it('should work', () => {
 | 
			
		||||
 | 
			
		||||
@ -13,35 +13,34 @@ import {
 | 
			
		||||
  supportedYearTokens,
 | 
			
		||||
} from 'src/constants';
 | 
			
		||||
import { SystemConfigCore } from 'src/cores/system-config.core';
 | 
			
		||||
import { OnServerEvent } from 'src/decorators';
 | 
			
		||||
import { EventHandlerOptions, OnServerEvent } from 'src/decorators';
 | 
			
		||||
import { SystemConfigDto, SystemConfigTemplateStorageOptionDto, mapConfig } from 'src/dtos/system-config.dto';
 | 
			
		||||
import {
 | 
			
		||||
  ClientEvent,
 | 
			
		||||
  IEventRepository,
 | 
			
		||||
  ServerAsyncEvent,
 | 
			
		||||
  ServerAsyncEventMap,
 | 
			
		||||
  OnEvents,
 | 
			
		||||
  ServerEvent,
 | 
			
		||||
  SystemConfigUpdate,
 | 
			
		||||
} from 'src/interfaces/event.interface';
 | 
			
		||||
import { ILoggerRepository } from 'src/interfaces/logger.interface';
 | 
			
		||||
import { ISearchRepository } from 'src/interfaces/search.interface';
 | 
			
		||||
import { ISystemMetadataRepository } from 'src/interfaces/system-metadata.interface';
 | 
			
		||||
 | 
			
		||||
@Injectable()
 | 
			
		||||
export class SystemConfigService {
 | 
			
		||||
export class SystemConfigService implements OnEvents {
 | 
			
		||||
  private core: SystemConfigCore;
 | 
			
		||||
 | 
			
		||||
  constructor(
 | 
			
		||||
    @Inject(ISystemMetadataRepository) repository: ISystemMetadataRepository,
 | 
			
		||||
    @Inject(IEventRepository) private eventRepository: IEventRepository,
 | 
			
		||||
    @Inject(ILoggerRepository) private logger: ILoggerRepository,
 | 
			
		||||
    @Inject(ISearchRepository) private smartInfoRepository: ISearchRepository,
 | 
			
		||||
  ) {
 | 
			
		||||
    this.logger.setContext(SystemConfigService.name);
 | 
			
		||||
    this.core = SystemConfigCore.create(repository, this.logger);
 | 
			
		||||
    this.core.config$.subscribe((config) => this.setLogLevel(config));
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  async init() {
 | 
			
		||||
  @EventHandlerOptions({ priority: -100 })
 | 
			
		||||
  async onBootstrapEvent() {
 | 
			
		||||
    const config = await this.core.getConfig({ withCache: false });
 | 
			
		||||
    this.config$.next(config);
 | 
			
		||||
  }
 | 
			
		||||
@ -59,8 +58,7 @@ export class SystemConfigService {
 | 
			
		||||
    return mapConfig(defaults);
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  @OnServerEvent(ServerAsyncEvent.CONFIG_VALIDATE)
 | 
			
		||||
  onValidateConfig({ newConfig, oldConfig }: ServerAsyncEventMap[ServerAsyncEvent.CONFIG_VALIDATE]) {
 | 
			
		||||
  onConfigValidateEvent({ newConfig, oldConfig }: SystemConfigUpdate) {
 | 
			
		||||
    if (!_.isEqual(instanceToPlain(newConfig.logging), oldConfig.logging) && this.getEnvLogLevel()) {
 | 
			
		||||
      throw new Error('Logging cannot be changed while the environment variable IMMICH_LOG_LEVEL is set.');
 | 
			
		||||
    }
 | 
			
		||||
@ -74,10 +72,7 @@ export class SystemConfigService {
 | 
			
		||||
    const oldConfig = await this.core.getConfig({ withCache: false });
 | 
			
		||||
 | 
			
		||||
    try {
 | 
			
		||||
      await this.eventRepository.serverSendAsync(ServerAsyncEvent.CONFIG_VALIDATE, {
 | 
			
		||||
        newConfig: dto,
 | 
			
		||||
        oldConfig,
 | 
			
		||||
      });
 | 
			
		||||
      await this.eventRepository.emit('onConfigValidateEvent', { newConfig: dto, oldConfig });
 | 
			
		||||
    } catch (error) {
 | 
			
		||||
      this.logger.warn(`Unable to save system config due to a validation error: ${error}`);
 | 
			
		||||
      throw new BadRequestException(error instanceof Error ? error.message : error);
 | 
			
		||||
@ -85,12 +80,11 @@ export class SystemConfigService {
 | 
			
		||||
 | 
			
		||||
    const newConfig = await this.core.updateConfig(dto);
 | 
			
		||||
 | 
			
		||||
    // TODO probably move web socket emits to a separate service
 | 
			
		||||
    this.eventRepository.clientBroadcast(ClientEvent.CONFIG_UPDATE, {});
 | 
			
		||||
    this.eventRepository.serverSend(ServerEvent.CONFIG_UPDATE, null);
 | 
			
		||||
    await this.eventRepository.emit('onConfigUpdateEvent', { newConfig, oldConfig });
 | 
			
		||||
 | 
			
		||||
    if (oldConfig.machineLearning.clip.modelName !== newConfig.machineLearning.clip.modelName) {
 | 
			
		||||
      await this.smartInfoRepository.init(newConfig.machineLearning.clip.modelName);
 | 
			
		||||
    }
 | 
			
		||||
    return mapConfig(newConfig);
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
@ -115,7 +109,7 @@ export class SystemConfigService {
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  @OnServerEvent(ServerEvent.CONFIG_UPDATE)
 | 
			
		||||
  async onConfigUpdate() {
 | 
			
		||||
  async onConfigUpdateEvent() {
 | 
			
		||||
    await this.core.refreshConfig();
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@ -6,7 +6,7 @@ import { SystemConfigCore } from 'src/cores/system-config.core';
 | 
			
		||||
import { OnServerEvent } from 'src/decorators';
 | 
			
		||||
import { ReleaseNotification, ServerVersionResponseDto } from 'src/dtos/server-info.dto';
 | 
			
		||||
import { SystemMetadataKey, VersionCheckMetadata } from 'src/entities/system-metadata.entity';
 | 
			
		||||
import { ClientEvent, IEventRepository, ServerEvent, ServerEventMap } from 'src/interfaces/event.interface';
 | 
			
		||||
import { ClientEvent, IEventRepository, OnEvents, ServerEvent, ServerEventMap } from 'src/interfaces/event.interface';
 | 
			
		||||
import { IJobRepository, JobName, JobStatus } from 'src/interfaces/job.interface';
 | 
			
		||||
import { ILoggerRepository } from 'src/interfaces/logger.interface';
 | 
			
		||||
import { IServerInfoRepository } from 'src/interfaces/server-info.interface';
 | 
			
		||||
@ -22,7 +22,7 @@ const asNotification = ({ checkedAt, releaseVersion }: VersionCheckMetadata): Re
 | 
			
		||||
};
 | 
			
		||||
 | 
			
		||||
@Injectable()
 | 
			
		||||
export class VersionService {
 | 
			
		||||
export class VersionService implements OnEvents {
 | 
			
		||||
  private configCore: SystemConfigCore;
 | 
			
		||||
 | 
			
		||||
  constructor(
 | 
			
		||||
@ -36,7 +36,7 @@ export class VersionService {
 | 
			
		||||
    this.configCore = SystemConfigCore.create(systemMetadataRepository, this.logger);
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  async init(): Promise<void> {
 | 
			
		||||
  async onBootstrapEvent(): Promise<void> {
 | 
			
		||||
    await this.handleVersionCheck();
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
							
								
								
									
										33
									
								
								server/src/utils/events.ts
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										33
									
								
								server/src/utils/events.ts
									
									
									
									
									
										Normal file
									
								
							@ -0,0 +1,33 @@
 | 
			
		||||
import { ModuleRef, Reflector } from '@nestjs/core';
 | 
			
		||||
import _ from 'lodash';
 | 
			
		||||
import { HandlerOptions } from 'src/decorators';
 | 
			
		||||
import { EmitEvent, EmitEventHandler, IEventRepository, OnEvents, events } from 'src/interfaces/event.interface';
 | 
			
		||||
import { Metadata } from 'src/middleware/auth.guard';
 | 
			
		||||
import { services } from 'src/services';
 | 
			
		||||
 | 
			
		||||
export const setupEventHandlers = (moduleRef: ModuleRef) => {
 | 
			
		||||
  const reflector = moduleRef.get(Reflector, { strict: false });
 | 
			
		||||
  const repository = moduleRef.get<IEventRepository>(IEventRepository);
 | 
			
		||||
  const handlers: Array<{ event: EmitEvent; handler: EmitEventHandler<EmitEvent>; priority: number }> = [];
 | 
			
		||||
 | 
			
		||||
  // discovery
 | 
			
		||||
  for (const Service of services) {
 | 
			
		||||
    const instance = moduleRef.get<OnEvents>(Service);
 | 
			
		||||
    for (const event of events) {
 | 
			
		||||
      const handler = instance[event] as EmitEventHandler<typeof event>;
 | 
			
		||||
      if (typeof handler !== 'function') {
 | 
			
		||||
        continue;
 | 
			
		||||
      }
 | 
			
		||||
 | 
			
		||||
      const options = reflector.get<HandlerOptions>(Metadata.EVENT_HANDLER_OPTIONS, handler);
 | 
			
		||||
      const priority = options?.priority || 0;
 | 
			
		||||
 | 
			
		||||
      handlers.push({ event, handler: handler.bind(instance), priority });
 | 
			
		||||
    }
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  // register by priority
 | 
			
		||||
  for (const { event, handler } of _.orderBy(handlers, ['priority'], ['asc'])) {
 | 
			
		||||
    repository.on(event, handler);
 | 
			
		||||
  }
 | 
			
		||||
};
 | 
			
		||||
@ -3,9 +3,10 @@ import { Mocked, vitest } from 'vitest';
 | 
			
		||||
 | 
			
		||||
export const newEventRepositoryMock = (): Mocked<IEventRepository> => {
 | 
			
		||||
  return {
 | 
			
		||||
    on: vitest.fn(),
 | 
			
		||||
    emit: vitest.fn() as any,
 | 
			
		||||
    clientSend: vitest.fn(),
 | 
			
		||||
    clientBroadcast: vitest.fn(),
 | 
			
		||||
    serverSend: vitest.fn(),
 | 
			
		||||
    serverSendAsync: vitest.fn(),
 | 
			
		||||
  };
 | 
			
		||||
};
 | 
			
		||||
 | 
			
		||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user