bulk dedupe server endpoints

This commit is contained in:
mertalev 2025-05-19 15:19:55 -04:00
parent adb17c4d58
commit f16bdb2a01
No known key found for this signature in database
GPG Key ID: DF6ABC77AAD98C95
6 changed files with 158 additions and 82 deletions

View File

@ -1,7 +1,8 @@
import { Controller, Get } from '@nestjs/common'; import { Body, Controller, Get, Post } from '@nestjs/common';
import { ApiTags } from '@nestjs/swagger'; import { ApiTags } from '@nestjs/swagger';
import { AuthDto } from 'src/dtos/auth.dto'; import { AuthDto } from 'src/dtos/auth.dto';
import { DuplicateResponseDto } from 'src/dtos/duplicate.dto'; import { DeduplicateAllDto, DuplicateResponseDto } from 'src/dtos/duplicate.dto';
import { Permission } from 'src/enum';
import { Auth, Authenticated } from 'src/middleware/auth.guard'; import { Auth, Authenticated } from 'src/middleware/auth.guard';
import { DuplicateService } from 'src/services/duplicate.service'; import { DuplicateService } from 'src/services/duplicate.service';
@ -15,4 +16,16 @@ export class DuplicateController {
getAssetDuplicates(@Auth() auth: AuthDto): Promise<DuplicateResponseDto[]> { getAssetDuplicates(@Auth() auth: AuthDto): Promise<DuplicateResponseDto[]> {
return this.service.getDuplicates(auth); return this.service.getDuplicates(auth);
} }
@Post('/bulk/keep')
@Authenticated({ permission: Permission.ASSET_UPDATE })
async keepAll(@Auth() auth: AuthDto) {
await this.service.keepAll(auth);
}
@Post('/bulk/deduplicate')
@Authenticated({ permission: Permission.ASSET_DELETE })
async deduplicateAll(@Auth() auth: AuthDto, @Body() dto: DeduplicateAllDto) {
await this.service.deduplicateAll(auth, dto);
}
} }

View File

@ -12,3 +12,9 @@ export class ResolveDuplicatesDto {
@ValidateUUID({ each: true }) @ValidateUUID({ each: true })
assetIds!: string[]; assetIds!: string[];
} }
export class DeduplicateAllDto {
@IsNotEmpty()
@ValidateUUID({ each: true })
assetIdsToKeep!: string[];
}

View File

@ -146,10 +146,17 @@ export class AssetJobRepository {
@GenerateSql({ params: [], stream: true }) @GenerateSql({ params: [], stream: true })
streamForSearchDuplicates(force?: boolean) { streamForSearchDuplicates(force?: boolean) {
return this.assetsWithPreviews() return this.db
.where((eb) => eb.not((eb) => eb.exists(eb.selectFrom('smart_search').whereRef('assetId', '=', 'assets.id')))) .selectFrom('assets')
.$if(!force, (qb) => qb.where('job_status.duplicatesDetectedAt', 'is', null))
.select(['assets.id']) .select(['assets.id'])
.where('assets.visibility', '!=', AssetVisibility.HIDDEN)
.where('assets.deletedAt', 'is', null)
.innerJoin('smart_search', 'assets.id', 'smart_search.assetId')
.$if(!force, (qb) =>
qb
.innerJoin('asset_job_status as job_status', 'assetId', 'assets.id')
.where('job_status.duplicatesDetectedAt', 'is', null),
)
.stream(); .stream();
} }

View File

@ -632,57 +632,100 @@ export class AssetRepository {
@GenerateSql({ params: [DummyValue.UUID] }) @GenerateSql({ params: [DummyValue.UUID] })
getDuplicates(userId: string) { getDuplicates(userId: string) {
return ( return this.db
this.db .with('duplicates', (qb) =>
.with('duplicates', (qb) => qb
.selectFrom('assets')
.innerJoin('exif', 'assets.id', 'exif.assetId')
.leftJoinLateral(
(qb) =>
qb
.selectFrom(sql`(select 1)`.as('dummy'))
.selectAll('assets')
.select((eb) => eb.table('exif').as('exifInfo'))
.as('asset'),
(join) => join.onTrue(),
)
.select('assets.duplicateId')
.select((eb) => eb.fn.jsonAgg('asset').$castTo<MapAsset[]>().as('assets'))
.where('assets.ownerId', '=', asUuid(userId))
.where('assets.duplicateId', 'is not', null)
.$narrowType<{ duplicateId: NotNull }>()
.where('assets.deletedAt', 'is', null)
.where('assets.visibility', '!=', AssetVisibility.HIDDEN)
.where('assets.stackId', 'is', null)
.groupBy('assets.duplicateId'),
)
.with('unique', (qb) =>
qb
.selectFrom('duplicates')
.select('duplicateId')
.where((eb) => eb(eb.fn('json_array_length', ['assets']), '=', 1)),
)
.with('removed_unique', (qb) =>
qb
.updateTable('assets')
.set({ duplicateId: null })
.from('unique')
.whereRef('assets.duplicateId', '=', 'unique.duplicateId'),
)
.selectFrom('duplicates')
.selectAll()
.where(({ not, exists }) =>
not(exists((eb) => eb.selectFrom('unique').whereRef('unique.duplicateId', '=', 'duplicates.duplicateId'))),
)
.execute();
}
@GenerateSql({ params: [DummyValue.UUID] })
streamDuplicates(userId: string) {
return this.db
.selectFrom('assets')
.innerJoin('exif', 'assets.id', 'exif.assetId')
.innerJoinLateral(
(qb) =>
qb qb
.selectFrom('assets') .selectFrom(sql`(select 1)`.as('dummy'))
.leftJoinLateral( .selectAll('assets')
(qb) => .select((eb) => eb.table('exif').as('exifInfo'))
qb .as('asset'),
.selectFrom('exif') (join) => join.onTrue(),
.selectAll('assets') )
.select((eb) => eb.table('exif').as('exifInfo')) .select('assets.duplicateId')
.whereRef('exif.assetId', '=', 'assets.id') .select((eb) => eb.fn.jsonAgg('asset').as('assets'))
.as('asset'), .where('assets.ownerId', '=', asUuid(userId))
(join) => join.onTrue(), .where('assets.duplicateId', 'is not', null)
) .$narrowType<{ duplicateId: NotNull }>()
.select('assets.duplicateId') .where('assets.deletedAt', 'is', null)
.select((eb) => .where('assets.visibility', '!=', AssetVisibility.HIDDEN)
eb .where('assets.stackId', 'is', null)
.fn('jsonb_agg', [eb.table('asset')]) .groupBy('assets.duplicateId')
.$castTo<MapAsset[]>() .stream();
.as('assets'), }
)
.where('assets.ownerId', '=', asUuid(userId)) @GenerateSql({ params: [DummyValue.UUID] })
.where('assets.duplicateId', 'is not', null) keepAllDuplicates(userId: string) {
.$narrowType<{ duplicateId: NotNull }>() return this.db
.where('assets.deletedAt', 'is', null) .updateTable('assets')
.where('assets.visibility', '!=', AssetVisibility.HIDDEN) .set({ duplicateId: null })
.where('assets.stackId', 'is', null) .where('duplicateId', 'is not', null)
.groupBy('assets.duplicateId'), .where('ownerId', '=', userId)
) .execute();
.with('unique', (qb) => }
qb
.selectFrom('duplicates') deduplicateAll(userId: string, keptAssetIds: string[], deduplicatedStatus: AssetStatus) {
.select('duplicateId') return this.db
.where((eb) => eb(eb.fn('jsonb_array_length', ['assets']), '=', 1)), .with('kept', (qb) =>
) // anyUuid ensures the array is passed as a single parameter, so no need to chunk
.with('removed_unique', (qb) => qb.updateTable('assets').set({ duplicateId: null }).where('id', '=', anyUuid(keptAssetIds)).returning('id'),
qb )
.updateTable('assets') .updateTable('assets')
.set({ duplicateId: null }) .from('kept')
.from('unique') .set({ duplicateId: null, status: deduplicatedStatus })
.whereRef('assets.duplicateId', '=', 'unique.duplicateId'), .whereRef('id', '!=', 'kept.id')
) .where('duplicateId', 'is not', null)
.selectFrom('duplicates') .where('ownerId', '=', userId)
.selectAll() .execute();
// TODO: compare with filtering by jsonb_array_length > 1
.where(({ not, exists }) =>
not(exists((eb) => eb.selectFrom('unique').whereRef('unique.duplicateId', '=', 'duplicates.duplicateId'))),
)
.execute()
);
} }
@GenerateSql({ params: [DummyValue.UUID, { minAssetsPerField: 5, maxFields: 12 }] }) @GenerateSql({ params: [DummyValue.UUID, { minAssetsPerField: 5, maxFields: 12 }] })

View File

@ -3,8 +3,8 @@ import { JOBS_ASSET_PAGINATION_SIZE } from 'src/constants';
import { OnJob } from 'src/decorators'; import { OnJob } from 'src/decorators';
import { mapAsset } from 'src/dtos/asset-response.dto'; import { mapAsset } from 'src/dtos/asset-response.dto';
import { AuthDto } from 'src/dtos/auth.dto'; import { AuthDto } from 'src/dtos/auth.dto';
import { DuplicateResponseDto } from 'src/dtos/duplicate.dto'; import { DeduplicateAllDto, DuplicateResponseDto } from 'src/dtos/duplicate.dto';
import { AssetFileType, AssetVisibility, JobName, JobStatus, QueueName } from 'src/enum'; import { AssetFileType, AssetStatus, AssetVisibility, JobName, JobStatus, QueueName } from 'src/enum';
import { AssetDuplicateResult } from 'src/repositories/search.repository'; import { AssetDuplicateResult } from 'src/repositories/search.repository';
import { BaseService } from 'src/services/base.service'; import { BaseService } from 'src/services/base.service';
import { JobItem, JobOf } from 'src/types'; import { JobItem, JobOf } from 'src/types';
@ -21,6 +21,20 @@ export class DuplicateService extends BaseService {
})); }));
} }
keepAll(auth: AuthDto) {
return this.assetRepository.keepAllDuplicates(auth.user.id);
}
async deduplicateAll(auth: AuthDto, dto: DeduplicateAllDto) {
if (dto.assetIdsToKeep.length === 0) {
return;
}
const { trash } = await this.getConfig({ withCache: false });
const deduplicatedStatus = trash.enabled ? AssetStatus.TRASHED : AssetStatus.DELETED;
return this.assetRepository.deduplicateAll(auth.user.id, dto.assetIdsToKeep, deduplicatedStatus);
}
@OnJob({ name: JobName.QUEUE_DUPLICATE_DETECTION, queue: QueueName.DUPLICATE_DETECTION }) @OnJob({ name: JobName.QUEUE_DUPLICATE_DETECTION, queue: QueueName.DUPLICATE_DETECTION })
async handleQueueSearchDuplicates({ force }: JobOf<JobName.QUEUE_DUPLICATE_DETECTION>): Promise<JobStatus> { async handleQueueSearchDuplicates({ force }: JobOf<JobName.QUEUE_DUPLICATE_DETECTION>): Promise<JobStatus> {
const { machineLearning } = await this.getConfig({ withCache: false }); const { machineLearning } = await this.getConfig({ withCache: false });
@ -29,20 +43,16 @@ export class DuplicateService extends BaseService {
} }
let jobs: JobItem[] = []; let jobs: JobItem[] = [];
const queueAll = async () => {
await this.jobRepository.queueAll(jobs);
jobs = [];
};
const assets = this.assetJobRepository.streamForSearchDuplicates(force); const assets = this.assetJobRepository.streamForSearchDuplicates(force);
for await (const asset of assets) { for await (const asset of assets) {
jobs.push({ name: JobName.DUPLICATE_DETECTION, data: { id: asset.id } }); jobs.push({ name: JobName.DUPLICATE_DETECTION, data: { id: asset.id } });
if (jobs.length >= JOBS_ASSET_PAGINATION_SIZE) { if (jobs.length >= JOBS_ASSET_PAGINATION_SIZE) {
await queueAll(); await this.jobRepository.queueAll(jobs);
jobs = [];
} }
} }
await queueAll(); await this.jobRepository.queueAll(jobs);
return JobStatus.SUCCESS; return JobStatus.SUCCESS;
} }

View File

@ -15,7 +15,7 @@
import { suggestDuplicate } from '$lib/utils/duplicate-utils'; import { suggestDuplicate } from '$lib/utils/duplicate-utils';
import { handleError } from '$lib/utils/handle-error'; import { handleError } from '$lib/utils/handle-error';
import type { AssetResponseDto } from '@immich/sdk'; import type { AssetResponseDto } from '@immich/sdk';
import { deleteAssets, updateAssets } from '@immich/sdk'; import { deduplicateAll, deleteAssets, keepAll, updateAssets } from '@immich/sdk';
import { Button, HStack, IconButton, Text } from '@immich/ui'; import { Button, HStack, IconButton, Text } from '@immich/ui';
import { mdiCheckOutline, mdiInformationOutline, mdiKeyboard, mdiTrashCanOutline } from '@mdi/js'; import { mdiCheckOutline, mdiInformationOutline, mdiKeyboard, mdiTrashCanOutline } from '@mdi/js';
import { t } from 'svelte-i18n'; import { t } from 'svelte-i18n';
@ -101,33 +101,30 @@
}; };
const handleDeduplicateAll = async () => { const handleDeduplicateAll = async () => {
const idsToKeep = duplicates.map((group) => suggestDuplicate(group.assets)).map((asset) => asset?.id); let assetCount = 0;
const idsToDelete = duplicates.flatMap((group, i) => const assetIdsToKeep = duplicates.map((group) => suggestDuplicate(group.assets)!.id);
group.assets.map((asset) => asset.id).filter((asset) => asset !== idsToKeep[i]), for (const group of duplicates) {
); assetCount += group.assets.length;
assetIdsToKeep.push(suggestDuplicate(group.assets)!.id);
}
const dedupedAssetCount = assetCount - assetIdsToKeep.length;
let prompt, confirmText; let prompt, confirmText;
if ($featureFlags.trash) { if ($featureFlags.trash) {
prompt = $t('bulk_trash_duplicates_confirmation', { values: { count: idsToDelete.length } }); prompt = $t('bulk_trash_duplicates_confirmation', { values: { count: dedupedAssetCount } });
confirmText = $t('confirm'); confirmText = $t('confirm');
} else { } else {
prompt = $t('bulk_delete_duplicates_confirmation', { values: { count: idsToDelete.length } }); prompt = $t('bulk_delete_duplicates_confirmation', { values: { count: dedupedAssetCount } });
confirmText = $t('permanently_delete'); confirmText = $t('permanently_delete');
} }
return withConfirmation( return withConfirmation(
async () => { async () => {
await deleteAssets({ assetBulkDeleteDto: { ids: idsToDelete, force: !$featureFlags.trash } }); await deduplicateAll({deduplicateAllDto: { assetIdsToKeep } });
await updateAssets({
assetBulkUpdateDto: {
ids: [...idsToDelete, ...idsToKeep.filter((id): id is string => !!id)],
duplicateId: null,
},
});
duplicates = []; duplicates = [];
deletedNotification(idsToDelete.length); deletedNotification(dedupedAssetCount);
}, },
prompt, prompt,
confirmText, confirmText,
@ -135,10 +132,10 @@
}; };
const handleKeepAll = async () => { const handleKeepAll = async () => {
const ids = duplicates.flatMap((group) => group.assets.map((asset) => asset.id)); const assetCount = duplicates.reduce((acc, cur) => acc + cur.assets.length, 0);
return withConfirmation( return withConfirmation(
async () => { async () => {
await updateAssets({ assetBulkUpdateDto: { ids, duplicateId: null } }); await keepAll();
duplicates = []; duplicates = [];
@ -147,7 +144,7 @@
type: NotificationType.Info, type: NotificationType.Info,
}); });
}, },
$t('bulk_keep_duplicates_confirmation', { values: { count: ids.length } }), $t('bulk_keep_duplicates_confirmation', { values: { count: assetCount } }),
$t('confirm'), $t('confirm'),
); );
}; };