diff --git a/server/src/domain/metadata/metadata.service.spec.ts b/server/src/domain/metadata/metadata.service.spec.ts index cfabc1149a..70589746f8 100644 --- a/server/src/domain/metadata/metadata.service.spec.ts +++ b/server/src/domain/metadata/metadata.service.spec.ts @@ -5,6 +5,7 @@ import { newAssetRepositoryMock, newCommunicationRepositoryMock, newCryptoRepositoryMock, + newDatabaseRepositoryMock, newJobRepositoryMock, newMediaRepositoryMock, newMetadataRepositoryMock, @@ -25,6 +26,7 @@ import { IAssetRepository, ICommunicationRepository, ICryptoRepository, + IDatabaseRepository, IJobRepository, IMediaRepository, IMetadataRepository, @@ -50,6 +52,7 @@ describe(MetadataService.name, () => { let personMock: jest.Mocked; let storageMock: jest.Mocked; let communicationMock: jest.Mocked; + let databaseMock: jest.Mocked; let sut: MetadataService; beforeEach(async () => { @@ -64,19 +67,21 @@ describe(MetadataService.name, () => { communicationMock = newCommunicationRepositoryMock(); storageMock = newStorageRepositoryMock(); mediaMock = newMediaRepositoryMock(); + databaseMock = newDatabaseRepositoryMock(); sut = new MetadataService( albumMock, assetMock, + communicationMock, cryptoRepository, + databaseMock, jobMock, + mediaMock, metadataMock, + moveMock, + personMock, storageMock, configMock, - mediaMock, - moveMock, - communicationMock, - personMock, ); }); diff --git a/server/src/domain/metadata/metadata.service.ts b/server/src/domain/metadata/metadata.service.ts index 13e6110af8..cebb040231 100644 --- a/server/src/domain/metadata/metadata.service.ts +++ b/server/src/domain/metadata/metadata.service.ts @@ -11,11 +11,13 @@ import { usePagination } from '../domain.util'; import { IBaseJob, IEntityJob, ISidecarWriteJob, JOBS_ASSET_PAGINATION_SIZE, JobName, QueueName } from '../job'; import { ClientEvent, + DatabaseLock, ExifDuration, IAlbumRepository, IAssetRepository, ICommunicationRepository, ICryptoRepository, + IDatabaseRepository, IJobRepository, IMediaRepository, IMetadataRepository, @@ -100,15 +102,16 @@ export class MetadataService { constructor( @Inject(IAlbumRepository) private albumRepository: IAlbumRepository, @Inject(IAssetRepository) private assetRepository: IAssetRepository, + @Inject(ICommunicationRepository) private communicationRepository: ICommunicationRepository, @Inject(ICryptoRepository) private cryptoRepository: ICryptoRepository, + @Inject(IDatabaseRepository) private databaseRepository: IDatabaseRepository, @Inject(IJobRepository) private jobRepository: IJobRepository, + @Inject(IMediaRepository) private mediaRepository: IMediaRepository, @Inject(IMetadataRepository) private repository: IMetadataRepository, + @Inject(IMoveRepository) moveRepository: IMoveRepository, + @Inject(IPersonRepository) personRepository: IPersonRepository, @Inject(IStorageRepository) private storageRepository: IStorageRepository, @Inject(ISystemConfigRepository) configRepository: ISystemConfigRepository, - @Inject(IMediaRepository) private mediaRepository: IMediaRepository, - @Inject(IMoveRepository) moveRepository: IMoveRepository, - @Inject(ICommunicationRepository) private communicationRepository: ICommunicationRepository, - @Inject(IPersonRepository) personRepository: IPersonRepository, ) { this.configCore = SystemConfigCore.create(configRepository); this.storageCore = StorageCore.create(assetRepository, moveRepository, personRepository, storageRepository); @@ -128,7 +131,7 @@ export class MetadataService { try { await this.jobRepository.pause(QueueName.METADATA_EXTRACTION); - await this.repository.init(); + await this.databaseRepository.withLock(DatabaseLock.GeodataImport, () => this.repository.init()); await this.jobRepository.resume(QueueName.METADATA_EXTRACTION); this.logger.log(`Initialized local reverse geocoder`); diff --git a/server/src/domain/repositories/database.repository.ts b/server/src/domain/repositories/database.repository.ts index 9075a0a9c7..e69b3fc2fb 100644 --- a/server/src/domain/repositories/database.repository.ts +++ b/server/src/domain/repositories/database.repository.ts @@ -6,6 +6,11 @@ export enum DatabaseExtension { VECTORS = 'vectors', } +export enum DatabaseLock { + GeodataImport = 100, + CLIPDimSize = 512, +} + export const IDatabaseRepository = 'IDatabaseRepository'; export interface IDatabaseRepository { @@ -13,4 +18,7 @@ export interface IDatabaseRepository { getPostgresVersion(): Promise; createExtension(extension: DatabaseExtension): Promise; runMigrations(options?: { transaction?: 'all' | 'none' | 'each' }): Promise; + withLock(lock: DatabaseLock, callback: () => Promise): Promise; + isBusy(lock: DatabaseLock): boolean; + wait(lock: DatabaseLock): Promise; } diff --git a/server/src/domain/smart-info/smart-info.service.spec.ts b/server/src/domain/smart-info/smart-info.service.spec.ts index 661e5fd80f..9686e6ac63 100644 --- a/server/src/domain/smart-info/smart-info.service.spec.ts +++ b/server/src/domain/smart-info/smart-info.service.spec.ts @@ -2,6 +2,7 @@ import { AssetEntity, SystemConfigKey } from '@app/infra/entities'; import { assetStub, newAssetRepositoryMock, + newDatabaseRepositoryMock, newJobRepositoryMock, newMachineLearningRepositoryMock, newSmartInfoRepositoryMock, @@ -10,6 +11,7 @@ import { import { JobName } from '../job'; import { IAssetRepository, + IDatabaseRepository, IJobRepository, IMachineLearningRepository, ISmartInfoRepository, @@ -31,6 +33,7 @@ describe(SmartInfoService.name, () => { let jobMock: jest.Mocked; let smartMock: jest.Mocked; let machineMock: jest.Mocked; + let databaseMock: jest.Mocked; beforeEach(async () => { assetMock = newAssetRepositoryMock(); @@ -38,7 +41,8 @@ describe(SmartInfoService.name, () => { smartMock = newSmartInfoRepositoryMock(); jobMock = newJobRepositoryMock(); machineMock = newMachineLearningRepositoryMock(); - sut = new SmartInfoService(assetMock, configMock, jobMock, smartMock, machineMock); + databaseMock = newDatabaseRepositoryMock(); + sut = new SmartInfoService(assetMock, databaseMock, jobMock, machineMock, smartMock, configMock); assetMock.getByIds.mockResolvedValue([asset]); }); diff --git a/server/src/domain/smart-info/smart-info.service.ts b/server/src/domain/smart-info/smart-info.service.ts index 303c741bc5..5f6cf9b40f 100644 --- a/server/src/domain/smart-info/smart-info.service.ts +++ b/server/src/domain/smart-info/smart-info.service.ts @@ -4,7 +4,9 @@ import { setTimeout } from 'timers/promises'; import { usePagination } from '../domain.util'; import { IBaseJob, IEntityJob, JOBS_ASSET_PAGINATION_SIZE, JobName, QueueName } from '../job'; import { + DatabaseLock, IAssetRepository, + IDatabaseRepository, IJobRepository, IMachineLearningRepository, ISmartInfoRepository, @@ -20,10 +22,11 @@ export class SmartInfoService { constructor( @Inject(IAssetRepository) private assetRepository: IAssetRepository, - @Inject(ISystemConfigRepository) configRepository: ISystemConfigRepository, + @Inject(IDatabaseRepository) private databaseRepository: IDatabaseRepository, @Inject(IJobRepository) private jobRepository: IJobRepository, - @Inject(ISmartInfoRepository) private repository: ISmartInfoRepository, @Inject(IMachineLearningRepository) private machineLearning: IMachineLearningRepository, + @Inject(ISmartInfoRepository) private repository: ISmartInfoRepository, + @Inject(ISystemConfigRepository) configRepository: ISystemConfigRepository, ) { this.configCore = SystemConfigCore.create(configRepository); } @@ -41,7 +44,9 @@ export class SmartInfoService { const { machineLearning } = await this.configCore.getConfig(); - await this.repository.init(machineLearning.clip.modelName); + await this.databaseRepository.withLock(DatabaseLock.CLIPDimSize, () => + this.repository.init(machineLearning.clip.modelName), + ); await this.jobRepository.resume(QueueName.SMART_SEARCH); } @@ -84,6 +89,11 @@ export class SmartInfoService { machineLearning.clip, ); + if (this.databaseRepository.isBusy(DatabaseLock.CLIPDimSize)) { + this.logger.verbose(`Waiting for CLIP dimension size to be updated`); + await this.databaseRepository.wait(DatabaseLock.CLIPDimSize); + } + await this.repository.upsert({ assetId: asset.id }, clipEmbedding); return true; diff --git a/server/src/infra/database-locks.ts b/server/src/infra/database-locks.ts deleted file mode 100644 index f46027316f..0000000000 --- a/server/src/infra/database-locks.ts +++ /dev/null @@ -1,41 +0,0 @@ -import { dataSource } from '@app/infra'; -import AsyncLock from 'async-lock'; -export enum DatabaseLock { - GeodataImport = 100, - CLIPDimSize = 512, -} - -export async function acquireLock(lock: DatabaseLock): Promise { - return dataSource.query('SELECT pg_advisory_lock($1)', [lock]); -} - -export async function releaseLock(lock: DatabaseLock): Promise { - return dataSource.query('SELECT pg_advisory_unlock($1)', [lock]); -} - -export const asyncLock = new AsyncLock(); - -export function RequireLock( - lock: DatabaseLock, -): (target: any, propertyKey: string, descriptor: PropertyDescriptor) => void { - return function (target: any, propertyKey: string, descriptor: PropertyDescriptor): void { - const originalMethod = descriptor.value; - descriptor.value = async function (...args: any[]): Promise { - if (!dataSource.isInitialized) { - await dataSource.initialize(); - } - - let res; - await asyncLock.acquire(DatabaseLock[lock], async () => { - try { - await acquireLock(lock); - res = await originalMethod.apply(this, args); - } finally { - await releaseLock(lock); - } - }); - - return res as any; - }; - }; -} diff --git a/server/src/infra/index.ts b/server/src/infra/index.ts index 44d74d9dba..6a218d81c8 100644 --- a/server/src/infra/index.ts +++ b/server/src/infra/index.ts @@ -1,4 +1,3 @@ -export * from './database-locks'; export * from './database.config'; export * from './infra.config'; export * from './infra.module'; diff --git a/server/src/infra/repositories/database.repository.ts b/server/src/infra/repositories/database.repository.ts index 71875342ef..778d61bd44 100644 --- a/server/src/infra/repositories/database.repository.ts +++ b/server/src/infra/repositories/database.repository.ts @@ -1,10 +1,13 @@ -import { DatabaseExtension, IDatabaseRepository, Version } from '@app/domain'; +import { DatabaseExtension, DatabaseLock, IDatabaseRepository, Version } from '@app/domain'; import { Injectable } from '@nestjs/common'; import { InjectDataSource } from '@nestjs/typeorm'; +import AsyncLock from 'async-lock'; import { DataSource } from 'typeorm'; @Injectable() export class DatabaseRepository implements IDatabaseRepository { + readonly asyncLock = new AsyncLock(); + constructor(@InjectDataSource() private dataSource: DataSource) {} async getExtensionVersion(extension: DatabaseExtension): Promise { @@ -25,4 +28,34 @@ export class DatabaseRepository implements IDatabaseRepository { async runMigrations(options?: { transaction?: 'all' | 'none' | 'each' }): Promise { await this.dataSource.runMigrations(options); } + + async withLock(lock: DatabaseLock, callback: () => Promise): Promise { + let res; + await this.asyncLock.acquire(DatabaseLock[lock], async () => { + try { + await this.acquireLock(lock); + res = await callback(); + } finally { + await this.releaseLock(lock); + } + }); + + return res as R; + } + + isBusy(lock: DatabaseLock): boolean { + return this.asyncLock.isBusy(DatabaseLock[lock]); + } + + async wait(lock: DatabaseLock): Promise { + await this.asyncLock.acquire(DatabaseLock[lock], () => {}); + } + + private async acquireLock(lock: DatabaseLock): Promise { + return this.dataSource.query('SELECT pg_advisory_lock($1)', [lock]); + } + + private async releaseLock(lock: DatabaseLock): Promise { + return this.dataSource.query('SELECT pg_advisory_unlock($1)', [lock]); + } } diff --git a/server/src/infra/repositories/metadata.repository.ts b/server/src/infra/repositories/metadata.repository.ts index f573eb456a..61ef0b5594 100644 --- a/server/src/infra/repositories/metadata.repository.ts +++ b/server/src/infra/repositories/metadata.repository.ts @@ -5,7 +5,6 @@ import { ISystemMetadataRepository, ReverseGeocodeResult, } from '@app/domain'; -import { DatabaseLock, RequireLock } from '@app/infra'; import { GeodataAdmin1Entity, GeodataAdmin2Entity, GeodataPlacesEntity, SystemMetadataKey } from '@app/infra/entities'; import { ImmichLogger } from '@app/infra/logger'; import { Inject } from '@nestjs/common'; @@ -34,7 +33,6 @@ export class MetadataRepository implements IMetadataRepository { private logger = new ImmichLogger(MetadataRepository.name); - @RequireLock(DatabaseLock.GeodataImport) async init(): Promise { this.logger.log('Initializing metadata repository'); const geodataDate = await readFile('/usr/src/resources/geodata-date.txt', 'utf8'); @@ -46,7 +44,17 @@ export class MetadataRepository implements IMetadataRepository { } this.logger.log('Importing geodata to database from file'); + await this.importGeodata(); + await this.systemMetadataRepository.set(SystemMetadataKey.REVERSE_GEOCODING_STATE, { + lastUpdate: geodataDate, + lastImportFileName: CITIES_FILE, + }); + + this.logger.log('Geodata import completed'); + } + + private async importGeodata() { const queryRunner = this.dataSource.createQueryRunner(); await queryRunner.connect(); @@ -65,13 +73,6 @@ export class MetadataRepository implements IMetadataRepository { } finally { await queryRunner.release(); } - - await this.systemMetadataRepository.set(SystemMetadataKey.REVERSE_GEOCODING_STATE, { - lastUpdate: geodataDate, - lastImportFileName: CITIES_FILE, - }); - - this.logger.log('Geodata import completed'); } private async loadGeodataToTableFromFile( diff --git a/server/src/infra/repositories/smart-info.repository.ts b/server/src/infra/repositories/smart-info.repository.ts index 73679af110..dc7d5a2db6 100644 --- a/server/src/infra/repositories/smart-info.repository.ts +++ b/server/src/infra/repositories/smart-info.repository.ts @@ -1,6 +1,5 @@ import { Embedding, EmbeddingSearch, ISmartInfoRepository } from '@app/domain'; import { getCLIPModelInfo } from '@app/domain/smart-info/smart-info.constant'; -import { DatabaseLock, RequireLock, asyncLock } from '@app/infra'; import { AssetEntity, AssetFaceEntity, SmartInfoEntity, SmartSearchEntity } from '@app/infra/entities'; import { ImmichLogger } from '@app/infra/logger'; import { Injectable } from '@nestjs/common'; @@ -121,18 +120,12 @@ export class SmartInfoRepository implements ISmartInfoRepository { } private async upsertEmbedding(assetId: string, embedding: number[]): Promise { - if (asyncLock.isBusy(DatabaseLock[DatabaseLock.CLIPDimSize])) { - this.logger.verbose(`Waiting for CLIP dimension size to be updated`); - await asyncLock.acquire(DatabaseLock[DatabaseLock.CLIPDimSize], () => {}); - } - await this.smartSearchRepository.upsert( { assetId, embedding: () => asVector(embedding, true) }, { conflictPaths: ['assetId'] }, ); } - @RequireLock(DatabaseLock.CLIPDimSize) private async updateDimSize(dimSize: number): Promise { if (!isValidInteger(dimSize, { min: 1, max: 2 ** 16 })) { throw new Error(`Invalid CLIP dimension size: ${dimSize}`); diff --git a/server/test/repositories/database.repository.mock.ts b/server/test/repositories/database.repository.mock.ts index b68a6c277d..d37a4af6e0 100644 --- a/server/test/repositories/database.repository.mock.ts +++ b/server/test/repositories/database.repository.mock.ts @@ -6,5 +6,8 @@ export const newDatabaseRepositoryMock = (): jest.Mocked => getPostgresVersion: jest.fn().mockResolvedValue(new Version(14, 0, 0)), createExtension: jest.fn().mockImplementation(() => Promise.resolve()), runMigrations: jest.fn(), + withLock: jest.fn().mockImplementation((_, func: () => Promise) => func()), + isBusy: jest.fn(), + wait: jest.fn(), }; };