Compare commits

...

2 Commits

Author SHA1 Message Date
bo0tzz 6fec82b772 fix: no lock on queue duplicates 2026-05-14 10:20:50 +02:00
bo0tzz e2e9dd425f fix: locking around concurrency-1 jobs 2026-05-13 15:18:33 +02:00
3 changed files with 167 additions and 158 deletions
+2
View File
@@ -915,6 +915,8 @@ export enum DatabaseLock {
MaintenanceOperation = 621,
MemoryCreation = 777,
VersionCheck = 800,
FacialRecognition = 900,
DuplicateDetection = 1000,
}
export enum MaintenanceAction {
+50 -48
View File
@@ -5,7 +5,7 @@ import { BulkIdErrorReason, BulkIdResponseDto, BulkIdsDto } from 'src/dtos/asset
import { MapAsset, mapAsset } from 'src/dtos/asset-response.dto';
import { AuthDto } from 'src/dtos/auth.dto';
import { DuplicateResolveDto, DuplicateResolveGroupDto, DuplicateResponseDto } from 'src/dtos/duplicate.dto';
import { AssetStatus, AssetVisibility, JobName, JobStatus, Permission, QueueName } from 'src/enum';
import { AssetStatus, AssetVisibility, DatabaseLock, JobName, JobStatus, Permission, QueueName } from 'src/enum';
import { AssetDuplicateResult } from 'src/repositories/search.repository';
import { BaseService } from 'src/services/base.service';
import { JobItem, JobOf } from 'src/types';
@@ -326,60 +326,62 @@ export class DuplicateService extends BaseService {
@OnJob({ name: JobName.AssetDetectDuplicates, queue: QueueName.DuplicateDetection })
async handleSearchDuplicates({ id }: JobOf<JobName.AssetDetectDuplicates>): Promise<JobStatus> {
const { machineLearning } = await this.getConfig({ withCache: true });
if (!isDuplicateDetectionEnabled(machineLearning)) {
return JobStatus.Skipped;
}
return this.databaseRepository.withLock(DatabaseLock.DuplicateDetection, async () => {
const { machineLearning } = await this.getConfig({ withCache: true });
if (!isDuplicateDetectionEnabled(machineLearning)) {
return JobStatus.Skipped;
}
const asset = await this.assetJobRepository.getForSearchDuplicatesJob(id);
if (!asset) {
this.logger.error(`Asset ${id} not found`);
return JobStatus.Failed;
}
const asset = await this.assetJobRepository.getForSearchDuplicatesJob(id);
if (!asset) {
this.logger.error(`Asset ${id} not found`);
return JobStatus.Failed;
}
if (asset.stackId) {
this.logger.debug(`Asset ${id} is part of a stack, skipping`);
return JobStatus.Skipped;
}
if (asset.stackId) {
this.logger.debug(`Asset ${id} is part of a stack, skipping`);
return JobStatus.Skipped;
}
if (asset.visibility === AssetVisibility.Hidden) {
this.logger.debug(`Asset ${id} is not visible, skipping`);
return JobStatus.Skipped;
}
if (asset.visibility === AssetVisibility.Hidden) {
this.logger.debug(`Asset ${id} is not visible, skipping`);
return JobStatus.Skipped;
}
if (asset.visibility === AssetVisibility.Locked) {
this.logger.debug(`Asset ${id} is locked, skipping`);
return JobStatus.Skipped;
}
if (asset.visibility === AssetVisibility.Locked) {
this.logger.debug(`Asset ${id} is locked, skipping`);
return JobStatus.Skipped;
}
if (!asset.embedding) {
this.logger.debug(`Asset ${id} is missing embedding`);
return JobStatus.Failed;
}
if (!asset.embedding) {
this.logger.debug(`Asset ${id} is missing embedding`);
return JobStatus.Failed;
}
const duplicateAssets = await this.duplicateRepository.search({
assetId: asset.id,
embedding: asset.embedding,
maxDistance: machineLearning.duplicateDetection.maxDistance,
type: asset.type,
userIds: [asset.ownerId],
const duplicateAssets = await this.duplicateRepository.search({
assetId: asset.id,
embedding: asset.embedding,
maxDistance: machineLearning.duplicateDetection.maxDistance,
type: asset.type,
userIds: [asset.ownerId],
});
let assetIds = [asset.id];
if (duplicateAssets.length > 0) {
this.logger.debug(
`Found ${duplicateAssets.length} duplicate${duplicateAssets.length === 1 ? '' : 's'} for asset ${asset.id}`,
);
assetIds = await this.updateDuplicates(asset, duplicateAssets);
} else if (asset.duplicateId) {
this.logger.debug(`No duplicates found for asset ${asset.id}, removing duplicateId`);
await this.assetRepository.update({ id: asset.id, duplicateId: null });
}
const duplicatesDetectedAt = new Date();
await this.assetRepository.upsertJobStatus(...assetIds.map((assetId) => ({ assetId, duplicatesDetectedAt })));
return JobStatus.Success;
});
let assetIds = [asset.id];
if (duplicateAssets.length > 0) {
this.logger.debug(
`Found ${duplicateAssets.length} duplicate${duplicateAssets.length === 1 ? '' : 's'} for asset ${asset.id}`,
);
assetIds = await this.updateDuplicates(asset, duplicateAssets);
} else if (asset.duplicateId) {
this.logger.debug(`No duplicates found for asset ${asset.id}, removing duplicateId`);
await this.assetRepository.update({ id: asset.id, duplicateId: null });
}
const duplicatesDetectedAt = new Date();
await this.assetRepository.upsertJobStatus(...assetIds.map((assetId) => ({ assetId, duplicatesDetectedAt })));
return JobStatus.Success;
}
private async updateDuplicates(
+115 -110
View File
@@ -25,6 +25,7 @@ import {
import {
AssetVisibility,
CacheControl,
DatabaseLock,
JobName,
JobStatus,
Permission,
@@ -402,144 +403,148 @@ export class PersonService extends BaseService {
@OnJob({ name: JobName.FacialRecognitionQueueAll, queue: QueueName.FacialRecognition })
async handleQueueRecognizeFaces({ force, nightly }: JobOf<JobName.FacialRecognitionQueueAll>): Promise<JobStatus> {
const { machineLearning } = await this.getConfig({ withCache: false });
if (!isFacialRecognitionEnabled(machineLearning)) {
return JobStatus.Skipped;
}
await this.jobRepository.waitForQueueCompletion(QueueName.ThumbnailGeneration, QueueName.FaceDetection);
if (nightly) {
const [state, latestFaceDate] = await Promise.all([
this.systemMetadataRepository.get(SystemMetadataKey.FacialRecognitionState),
this.personRepository.getLatestFaceDate(),
]);
if (state?.lastRun && latestFaceDate && state.lastRun > latestFaceDate) {
this.logger.debug('Skipping facial recognition nightly since no face has been added since the last run');
return this.databaseRepository.withLock(DatabaseLock.FacialRecognition, async () => {
const { machineLearning } = await this.getConfig({ withCache: false });
if (!isFacialRecognitionEnabled(machineLearning)) {
return JobStatus.Skipped;
}
}
const { waiting } = await this.jobRepository.getJobCounts(QueueName.FacialRecognition);
await this.jobRepository.waitForQueueCompletion(QueueName.ThumbnailGeneration, QueueName.FaceDetection);
if (force) {
await this.personRepository.unassignFaces({ sourceType: SourceType.MachineLearning });
await this.handlePersonCleanup();
await this.personRepository.vacuum({ reindexVectors: false });
} else if (waiting) {
this.logger.debug(
`Skipping facial recognition queueing because ${waiting} job${waiting > 1 ? 's are' : ' is'} already queued`,
);
return JobStatus.Skipped;
}
if (nightly) {
const [state, latestFaceDate] = await Promise.all([
this.systemMetadataRepository.get(SystemMetadataKey.FacialRecognitionState),
this.personRepository.getLatestFaceDate(),
]);
await this.databaseRepository.prewarm(VectorIndex.Face);
const lastRun = new Date().toISOString();
const facePagination = this.personRepository.getAllFaces(
force ? undefined : { personId: null, sourceType: SourceType.MachineLearning },
);
let jobs: { name: JobName.FacialRecognition; data: { id: string; deferred: false } }[] = [];
for await (const face of facePagination) {
jobs.push({ name: JobName.FacialRecognition, data: { id: face.id, deferred: false } });
if (jobs.length === JOBS_ASSET_PAGINATION_SIZE) {
await this.jobRepository.queueAll(jobs);
jobs = [];
if (state?.lastRun && latestFaceDate && state.lastRun > latestFaceDate) {
this.logger.debug('Skipping facial recognition nightly since no face has been added since the last run');
return JobStatus.Skipped;
}
}
}
await this.jobRepository.queueAll(jobs);
const { waiting } = await this.jobRepository.getJobCounts(QueueName.FacialRecognition);
await this.systemMetadataRepository.set(SystemMetadataKey.FacialRecognitionState, { lastRun });
if (force) {
await this.personRepository.unassignFaces({ sourceType: SourceType.MachineLearning });
await this.handlePersonCleanup();
await this.personRepository.vacuum({ reindexVectors: false });
} else if (waiting) {
this.logger.debug(
`Skipping facial recognition queueing because ${waiting} job${waiting > 1 ? 's are' : ' is'} already queued`,
);
return JobStatus.Skipped;
}
return JobStatus.Success;
await this.databaseRepository.prewarm(VectorIndex.Face);
const lastRun = new Date().toISOString();
const facePagination = this.personRepository.getAllFaces(
force ? undefined : { personId: null, sourceType: SourceType.MachineLearning },
);
let jobs: { name: JobName.FacialRecognition; data: { id: string; deferred: false } }[] = [];
for await (const face of facePagination) {
jobs.push({ name: JobName.FacialRecognition, data: { id: face.id, deferred: false } });
if (jobs.length === JOBS_ASSET_PAGINATION_SIZE) {
await this.jobRepository.queueAll(jobs);
jobs = [];
}
}
await this.jobRepository.queueAll(jobs);
await this.systemMetadataRepository.set(SystemMetadataKey.FacialRecognitionState, { lastRun });
return JobStatus.Success;
});
}
@OnJob({ name: JobName.FacialRecognition, queue: QueueName.FacialRecognition })
async handleRecognizeFaces({ id, deferred }: JobOf<JobName.FacialRecognition>): Promise<JobStatus> {
const { machineLearning } = await this.getConfig({ withCache: true });
if (!isFacialRecognitionEnabled(machineLearning)) {
return JobStatus.Skipped;
}
return this.databaseRepository.withLock(DatabaseLock.FacialRecognition, async () => {
const { machineLearning } = await this.getConfig({ withCache: true });
if (!isFacialRecognitionEnabled(machineLearning)) {
return JobStatus.Skipped;
}
const face = await this.personRepository.getFaceForFacialRecognitionJob(id);
if (!face || !face.asset) {
this.logger.warn(`Face ${id} not found`);
return JobStatus.Failed;
}
const face = await this.personRepository.getFaceForFacialRecognitionJob(id);
if (!face || !face.asset) {
this.logger.warn(`Face ${id} not found`);
return JobStatus.Failed;
}
if (face.sourceType !== SourceType.MachineLearning) {
this.logger.warn(`Skipping face ${id} due to source ${face.sourceType}`);
return JobStatus.Skipped;
}
if (face.sourceType !== SourceType.MachineLearning) {
this.logger.warn(`Skipping face ${id} due to source ${face.sourceType}`);
return JobStatus.Skipped;
}
if (!face.faceSearch?.embedding) {
this.logger.warn(`Face ${id} does not have an embedding`);
return JobStatus.Failed;
}
if (!face.faceSearch?.embedding) {
this.logger.warn(`Face ${id} does not have an embedding`);
return JobStatus.Failed;
}
if (face.personId) {
this.logger.debug(`Face ${id} already has a person assigned`);
return JobStatus.Skipped;
}
if (face.personId) {
this.logger.debug(`Face ${id} already has a person assigned`);
return JobStatus.Skipped;
}
const matches = await this.searchRepository.searchFaces({
userIds: [face.asset.ownerId],
embedding: face.faceSearch.embedding,
maxDistance: machineLearning.facialRecognition.maxDistance,
numResults: machineLearning.facialRecognition.minFaces,
minBirthDate: new Date(face.asset.fileCreatedAt),
});
// `matches` also includes the face itself
if (machineLearning.facialRecognition.minFaces > 1 && matches.length <= 1) {
this.logger.debug(`Face ${id} only matched the face itself, skipping`);
return JobStatus.Skipped;
}
this.logger.debug(`Face ${id} has ${matches.length} matches`);
const isCore =
matches.length >= machineLearning.facialRecognition.minFaces &&
face.asset.visibility === AssetVisibility.Timeline;
if (!isCore && !deferred) {
this.logger.debug(`Deferring non-core face ${id} for later processing`);
await this.jobRepository.queue({ name: JobName.FacialRecognition, data: { id, deferred: true } });
return JobStatus.Skipped;
}
let personId = matches.find((match) => match.personId)?.personId;
if (!personId) {
const matchWithPerson = await this.searchRepository.searchFaces({
const matches = await this.searchRepository.searchFaces({
userIds: [face.asset.ownerId],
embedding: face.faceSearch.embedding,
maxDistance: machineLearning.facialRecognition.maxDistance,
numResults: 1,
hasPerson: true,
numResults: machineLearning.facialRecognition.minFaces,
minBirthDate: new Date(face.asset.fileCreatedAt),
});
if (matchWithPerson.length > 0) {
personId = matchWithPerson[0].personId;
// `matches` also includes the face itself
if (machineLearning.facialRecognition.minFaces > 1 && matches.length <= 1) {
this.logger.debug(`Face ${id} only matched the face itself, skipping`);
return JobStatus.Skipped;
}
}
if (isCore && !personId) {
this.logger.log(`Creating new person for face ${id}`);
const newPerson = await this.personRepository.create({ ownerId: face.asset.ownerId, faceAssetId: face.id });
await this.jobRepository.queue({ name: JobName.PersonGenerateThumbnail, data: { id: newPerson.id } });
personId = newPerson.id;
}
this.logger.debug(`Face ${id} has ${matches.length} matches`);
if (personId) {
this.logger.debug(`Assigning face ${id} to person ${personId}`);
await this.personRepository.reassignFaces({ faceIds: [id], newPersonId: personId });
}
const isCore =
matches.length >= machineLearning.facialRecognition.minFaces &&
face.asset.visibility === AssetVisibility.Timeline;
if (!isCore && !deferred) {
this.logger.debug(`Deferring non-core face ${id} for later processing`);
await this.jobRepository.queue({ name: JobName.FacialRecognition, data: { id, deferred: true } });
return JobStatus.Skipped;
}
return JobStatus.Success;
let personId = matches.find((match) => match.personId)?.personId;
if (!personId) {
const matchWithPerson = await this.searchRepository.searchFaces({
userIds: [face.asset.ownerId],
embedding: face.faceSearch.embedding,
maxDistance: machineLearning.facialRecognition.maxDistance,
numResults: 1,
hasPerson: true,
minBirthDate: new Date(face.asset.fileCreatedAt),
});
if (matchWithPerson.length > 0) {
personId = matchWithPerson[0].personId;
}
}
if (isCore && !personId) {
this.logger.log(`Creating new person for face ${id}`);
const newPerson = await this.personRepository.create({ ownerId: face.asset.ownerId, faceAssetId: face.id });
await this.jobRepository.queue({ name: JobName.PersonGenerateThumbnail, data: { id: newPerson.id } });
personId = newPerson.id;
}
if (personId) {
this.logger.debug(`Assigning face ${id} to person ${personId}`);
await this.personRepository.reassignFaces({ faceIds: [id], newPersonId: personId });
}
return JobStatus.Success;
});
}
@OnJob({ name: JobName.PersonFileMigration, queue: QueueName.Migration })