forked from Cutlery/immich
interweave scans
This commit is contained in:
parent
0070b83d8a
commit
69ce4e883a
@ -74,7 +74,6 @@ export enum JobName {
|
||||
LIBRARY_DELETE = 'library-delete',
|
||||
LIBRARY_QUEUE_SCAN_ALL = 'library-queue-all-refresh',
|
||||
LIBRARY_QUEUE_CLEANUP = 'library-queue-cleanup',
|
||||
LIBRARY_SCAN_DELETED = 'library-scan-deleted',
|
||||
LIBRARY_CHECK_OFFLINE = 'library-check-if-online',
|
||||
LIBRARY_REMOVE_OFFLINE = 'library-remove-offline',
|
||||
|
||||
@ -151,7 +150,6 @@ export const JOBS_TO_QUEUE: Record<JobName, QueueName> = {
|
||||
[JobName.LIBRARY_SCAN_ASSET]: QueueName.LIBRARY,
|
||||
[JobName.LIBRARY_SCAN]: QueueName.LIBRARY,
|
||||
[JobName.LIBRARY_DELETE]: QueueName.LIBRARY,
|
||||
[JobName.LIBRARY_SCAN_DELETED]: QueueName.LIBRARY,
|
||||
[JobName.LIBRARY_REMOVE_OFFLINE]: QueueName.LIBRARY,
|
||||
[JobName.LIBRARY_CHECK_OFFLINE]: QueueName.LIBRARY,
|
||||
[JobName.LIBRARY_QUEUE_SCAN_ALL]: QueueName.LIBRARY,
|
||||
|
@ -316,27 +316,6 @@ describe(LibraryService.name, () => {
|
||||
});
|
||||
});
|
||||
|
||||
describe('handleQueueOfflineCheck', () => {
|
||||
it('should queue a check of each asset', async () => {
|
||||
const mockLibraryJob: IEntityJob = {
|
||||
id: libraryStub.externalLibrary1.id,
|
||||
};
|
||||
|
||||
assetMock.getWith.mockResolvedValue({
|
||||
items: [assetStub.external],
|
||||
hasNextPage: false,
|
||||
});
|
||||
|
||||
const result = await sut.handleQueueOfflineCheck(mockLibraryJob);
|
||||
|
||||
expect(result).toEqual(JobStatus.SUCCESS);
|
||||
|
||||
expect(jobMock.queueAll).toHaveBeenCalledWith([
|
||||
{ name: JobName.LIBRARY_CHECK_OFFLINE, data: { id: assetStub.external.id } },
|
||||
]);
|
||||
});
|
||||
});
|
||||
|
||||
describe('handleAssetRefresh', () => {
|
||||
let mockUser: UserEntity;
|
||||
|
||||
@ -1458,35 +1437,6 @@ describe(LibraryService.name, () => {
|
||||
});
|
||||
});
|
||||
|
||||
describe('queueDeletedScan', () => {
|
||||
it('should not queue a deleted scan of upload library', async () => {
|
||||
libraryMock.get.mockResolvedValue(libraryStub.uploadLibrary1);
|
||||
|
||||
await expect(sut.queueDeletedScan(authStub.admin, libraryStub.uploadLibrary1.id)).rejects.toBeInstanceOf(
|
||||
BadRequestException,
|
||||
);
|
||||
|
||||
expect(jobMock.queue).not.toBeCalled();
|
||||
});
|
||||
|
||||
it('should queue a deleted file scan', async () => {
|
||||
libraryMock.get.mockResolvedValue(libraryStub.externalLibrary1);
|
||||
|
||||
await sut.queueDeletedScan(authStub.admin, libraryStub.externalLibrary1.id);
|
||||
|
||||
expect(jobMock.queue.mock.calls).toEqual([
|
||||
[
|
||||
{
|
||||
name: JobName.LIBRARY_SCAN_DELETED,
|
||||
data: {
|
||||
id: libraryStub.externalLibrary1.id,
|
||||
},
|
||||
},
|
||||
],
|
||||
]);
|
||||
});
|
||||
});
|
||||
|
||||
describe('queueEmptyTrash', () => {
|
||||
it('should queue the trash job', async () => {
|
||||
await sut.queueRemoveOffline(libraryStub.externalLibrary1.id);
|
||||
|
@ -8,6 +8,7 @@ import { Stats } from 'node:fs';
|
||||
import path, { basename, parse } from 'node:path';
|
||||
import picomatch from 'picomatch';
|
||||
import { AccessCore } from '../access';
|
||||
import { AuthDto } from '../auth';
|
||||
import { mimeTypes } from '../domain.constant';
|
||||
import { handlePromiseError, usePagination, validateCronExpression } from '../domain.util';
|
||||
import { IBaseJob, IEntityJob, ILibraryFileJob, ILibraryRefreshJob, JOBS_ASSET_PAGINATION_SIZE, JobName } from '../job';
|
||||
@ -549,15 +550,6 @@ export class LibraryService extends EventEmitter {
|
||||
});
|
||||
}
|
||||
|
||||
async queueDeletedScan(auth: AuthDto, id: string) {
|
||||
const library = await this.repository.get(id);
|
||||
if (!library || library.type !== LibraryType.EXTERNAL) {
|
||||
throw new BadRequestException('Can only scan external libraries');
|
||||
}
|
||||
|
||||
await this.jobRepository.queue({ name: JobName.LIBRARY_SCAN_DELETED, data: { id } });
|
||||
}
|
||||
|
||||
async queueRemoveOffline(id: string) {
|
||||
this.logger.verbose(`Removing offline files from library: ${id}`);
|
||||
await this.jobRepository.queue({ name: JobName.LIBRARY_REMOVE_OFFLINE, data: { id } });
|
||||
@ -584,25 +576,6 @@ export class LibraryService extends EventEmitter {
|
||||
return JobStatus.SUCCESS;
|
||||
}
|
||||
|
||||
async handleQueueOfflineCheck(job: IEntityJob): Promise<JobStatus> {
|
||||
this.logger.log(`Finding offline assets in library: ${job.id}`);
|
||||
const onlineAssets = usePagination(JOBS_ASSET_PAGINATION_SIZE, (pagination) =>
|
||||
this.assetRepository.getWith(pagination, WithProperty.IS_ONLINE, job.id),
|
||||
);
|
||||
|
||||
for await (const assets of onlineAssets) {
|
||||
this.logger.debug(`Checking if ${assets.length} assets are still online`);
|
||||
await this.jobRepository.queueAll(
|
||||
assets.map((asset) => ({
|
||||
name: JobName.LIBRARY_CHECK_OFFLINE,
|
||||
data: { id: asset.id },
|
||||
})),
|
||||
);
|
||||
}
|
||||
|
||||
return JobStatus.SUCCESS;
|
||||
}
|
||||
|
||||
// Check if an asset is has no file, marking it as offline
|
||||
async handleOfflineCheck(job: IEntityJob): Promise<JobStatus> {
|
||||
const asset = await this.assetRepository.getById(job.id);
|
||||
@ -662,29 +635,69 @@ export class LibraryService extends EventEmitter {
|
||||
.filter((validation) => validation.isValid)
|
||||
.map((validation) => validation.importPath);
|
||||
|
||||
const generator = this.storageRepository.walk({
|
||||
const crawledAssets = this.storageRepository.walk({
|
||||
pathsToCrawl: validImportPaths,
|
||||
exclusionPatterns: library.exclusionPatterns,
|
||||
});
|
||||
|
||||
const existingAssets = usePagination(JOBS_ASSET_PAGINATION_SIZE, (pagination) =>
|
||||
this.assetRepository.getWith(pagination, WithProperty.IS_ONLINE, job.id),
|
||||
);
|
||||
|
||||
let crawledAssetPaths: string[] = [];
|
||||
|
||||
let pathCounter = 0;
|
||||
let crawlCounter = 0;
|
||||
let crawlDone = false;
|
||||
let existingAssetsDone = false;
|
||||
let existingAssetCounter = 0;
|
||||
|
||||
for await (const filePath of generator) {
|
||||
crawledAssetPaths.push(filePath);
|
||||
pathCounter++;
|
||||
const scanNextAssetPage = async () => {
|
||||
if (!existingAssetsDone) {
|
||||
const existingAssetPage = await existingAssets.next();
|
||||
existingAssetsDone = existingAssetPage.done ?? true;
|
||||
|
||||
if (crawledAssetPaths.length % LIBRARY_SCAN_BATCH_SIZE === 0) {
|
||||
if (existingAssetPage.value) {
|
||||
existingAssetCounter += existingAssetPage.value.length;
|
||||
this.logger.log(
|
||||
`Queuing online check of ${existingAssetPage.value.length} asset(s) in library ${library.id}...`,
|
||||
);
|
||||
await this.jobRepository.queueAll(
|
||||
existingAssetPage.value.map((asset) => ({
|
||||
name: JobName.LIBRARY_CHECK_OFFLINE,
|
||||
data: { id: asset.id },
|
||||
})),
|
||||
);
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
while (!crawlDone) {
|
||||
const crawlResult = await crawledAssets.next();
|
||||
|
||||
crawlDone = crawlResult.done ?? true;
|
||||
|
||||
crawledAssetPaths.push(crawlResult.value);
|
||||
crawlCounter++;
|
||||
|
||||
if (crawledAssetPaths.length % LIBRARY_SCAN_BATCH_SIZE === 0 || crawlDone) {
|
||||
this.logger.log(`Queueing scan of ${crawledAssetPaths.length} asset path(s) in library ${library.id}...`);
|
||||
// We have reached the batch size or the end of the generator, scan the assets
|
||||
await this.scanAssets(job.id, crawledAssetPaths, library.ownerId, job.refreshAllFiles ?? false);
|
||||
|
||||
crawledAssetPaths = [];
|
||||
|
||||
// Interweave the queuing of offline checks with the asset scanning (if any)
|
||||
await scanNextAssetPage();
|
||||
}
|
||||
}
|
||||
|
||||
await this.scanAssets(job.id, crawledAssetPaths, library.ownerId, job.refreshAllFiles ?? false);
|
||||
// If there are any remaining assets to check for offline status, do so
|
||||
while (!existingAssetsDone) {
|
||||
await scanNextAssetPage();
|
||||
}
|
||||
|
||||
this.logger.log(`Found ${pathCounter} asset(s) when crawling import paths ${library.importPaths}`);
|
||||
this.logger.log(
|
||||
`Finished queuing scan of ${crawlCounter} crawled and ${existingAssetCounter} existing asset(s) in library ${library.id}`,
|
||||
);
|
||||
|
||||
await this.repository.update({ id: job.id, refreshedAt: new Date() });
|
||||
|
||||
|
@ -65,12 +65,6 @@ export class LibraryController {
|
||||
return this.service.queueScan(id, dto);
|
||||
}
|
||||
|
||||
@Post(':id/scanDeleted')
|
||||
@HttpCode(HttpStatus.NO_CONTENT)
|
||||
scanDeletedFiles(@Auth() auth: AuthDto, @Param() { id }: UUIDParamDto) {
|
||||
return this.service.queueDeletedScan(auth, id);
|
||||
}
|
||||
|
||||
@Post(':id/removeOffline')
|
||||
@HttpCode(HttpStatus.NO_CONTENT)
|
||||
removeOfflineFiles(@Param() { id }: UUIDParamDto) {
|
||||
|
@ -77,7 +77,6 @@ export class AppService {
|
||||
[JobName.LIBRARY_SCAN_ASSET]: (data) => this.libraryService.handleAssetRefresh(data),
|
||||
[JobName.LIBRARY_SCAN]: (data) => this.libraryService.handleQueueAssetRefresh(data),
|
||||
[JobName.LIBRARY_DELETE]: (data) => this.libraryService.handleDeleteLibrary(data),
|
||||
[JobName.LIBRARY_SCAN_DELETED]: (data) => this.libraryService.handleQueueOfflineCheck(data),
|
||||
[JobName.LIBRARY_CHECK_OFFLINE]: (data) => this.libraryService.handleOfflineCheck(data),
|
||||
[JobName.LIBRARY_REMOVE_OFFLINE]: (data) => this.libraryService.handleOfflineRemoval(data),
|
||||
[JobName.LIBRARY_QUEUE_SCAN_ALL]: (data) => this.libraryService.handleQueueAllScan(data),
|
||||
|
Loading…
x
Reference in New Issue
Block a user