refactor: queue asset deletes via stream (#16706)

This commit is contained in:
Jason Rasmussen 2025-03-07 17:22:57 -05:00 committed by GitHub
parent 086d8a448a
commit 3f06a494a9
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 42 additions and 23 deletions

View File

@ -637,6 +637,14 @@ export class AssetRepository {
return this.storageTemplateAssetQuery().stream() as AsyncIterableIterator<StorageAsset>;
}
streamDeletedAssets(trashedBefore: Date) {
return this.db
.selectFrom('assets')
.select(['id', 'isOffline'])
.where('assets.deletedAt', '<=', trashedBefore)
.stream();
}
@GenerateSql(
...Object.values(WithProperty).map((property) => ({
name: property,

View File

@ -11,7 +11,8 @@ import { authStub } from 'test/fixtures/auth.stub';
import { faceStub } from 'test/fixtures/face.stub';
import { partnerStub } from 'test/fixtures/partner.stub';
import { userStub } from 'test/fixtures/user.stub';
import { newTestService, ServiceMocks } from 'test/utils';
import { factory } from 'test/small.factory';
import { makeStream, newTestService, ServiceMocks } from 'test/utils';
import { vitest } from 'vitest';
const stats: AssetStats = {
@ -473,28 +474,30 @@ describe(AssetService.name, () => {
});
it('should immediately queue assets for deletion if trash is disabled', async () => {
mocks.asset.getAll.mockResolvedValue({ hasNextPage: false, items: [assetStub.image] });
const asset = factory.asset({ isOffline: false });
mocks.asset.streamDeletedAssets.mockReturnValue(makeStream([asset]));
mocks.systemMetadata.get.mockResolvedValue({ trash: { enabled: false } });
await expect(sut.handleAssetDeletionCheck()).resolves.toBe(JobStatus.SUCCESS);
expect(mocks.asset.getAll).toHaveBeenCalledWith(expect.anything(), { trashedBefore: new Date() });
expect(mocks.asset.streamDeletedAssets).toHaveBeenCalledWith(new Date());
expect(mocks.job.queueAll).toHaveBeenCalledWith([
{ name: JobName.ASSET_DELETION, data: { id: assetStub.image.id, deleteOnDisk: true } },
{ name: JobName.ASSET_DELETION, data: { id: asset.id, deleteOnDisk: true } },
]);
});
it('should queue assets for deletion after trash duration', async () => {
mocks.asset.getAll.mockResolvedValue({ hasNextPage: false, items: [assetStub.image] });
const asset = factory.asset({ isOffline: false });
mocks.asset.streamDeletedAssets.mockReturnValue(makeStream([asset]));
mocks.systemMetadata.get.mockResolvedValue({ trash: { enabled: true, days: 7 } });
await expect(sut.handleAssetDeletionCheck()).resolves.toBe(JobStatus.SUCCESS);
expect(mocks.asset.getAll).toHaveBeenCalledWith(expect.anything(), {
trashedBefore: DateTime.now().minus({ days: 7 }).toJSDate(),
});
expect(mocks.asset.streamDeletedAssets).toHaveBeenCalledWith(DateTime.now().minus({ days: 7 }).toJSDate());
expect(mocks.job.queueAll).toHaveBeenCalledWith([
{ name: JobName.ASSET_DELETION, data: { id: assetStub.image.id, deleteOnDisk: true } },
{ name: JobName.ASSET_DELETION, data: { id: asset.id, deleteOnDisk: true } },
]);
});
});

View File

@ -25,7 +25,6 @@ import { AssetStatus, JobName, JobStatus, Permission, QueueName } from 'src/enum
import { BaseService } from 'src/services/base.service';
import { ISidecarWriteJob, JobItem, JobOf } from 'src/types';
import { getAssetFiles, getMyPartnerIds, onAfterUnlink, onBeforeLink, onBeforeUnlink } from 'src/utils/asset.util';
import { usePagination } from 'src/utils/pagination';
@Injectable()
export class AssetService extends BaseService {
@ -156,22 +155,30 @@ export class AssetService extends BaseService {
const trashedBefore = DateTime.now()
.minus(Duration.fromObject({ days: trashedDays }))
.toJSDate();
const assetPagination = usePagination(JOBS_ASSET_PAGINATION_SIZE, (pagination) =>
this.assetRepository.getAll(pagination, { trashedBefore }),
);
for await (const assets of assetPagination) {
await this.jobRepository.queueAll(
assets.map((asset) => ({
name: JobName.ASSET_DELETION,
data: {
id: asset.id,
deleteOnDisk: !asset.isOffline,
},
})),
);
let chunk: Array<{ id: string; isOffline: boolean }> = [];
const queueChunk = async () => {
if (chunk.length > 0) {
await this.jobRepository.queueAll(
chunk.map(({ id, isOffline }) => ({
name: JobName.ASSET_DELETION,
data: { id, deleteOnDisk: !isOffline },
})),
);
chunk = [];
}
};
const assets = this.assetRepository.streamDeletedAssets(trashedBefore);
for await (const asset of assets) {
chunk.push(asset);
if (chunk.length >= JOBS_ASSET_PAGINATION_SIZE) {
await queueChunk();
}
}
await queueChunk();
return JobStatus.SUCCESS;
}

View File

@ -46,5 +46,6 @@ export const newAssetRepositoryMock = (): Mocked<RepositoryInterface<AssetReposi
updateByLibraryId: vitest.fn(),
streamStorageTemplateAssets: vitest.fn(),
getStorageTemplateAsset: vitest.fn(),
streamDeletedAssets: vitest.fn(),
};
};