Merge 83500dac80657db807418dd75a8da547d0237297 into 4d20b11f256c40e3894c229ed638d7ea04ebdc44

Authored by Lena Tauchner on 2024-10-01 14:44:22 -04:00, committed by GitHub
commit 541a50fb50
2 changed files with 45 additions and 18 deletions


@@ -1,5 +1,6 @@
 import {
   Action,
+  AssetBulkUploadCheckItem,
   AssetBulkUploadCheckResult,
   AssetMediaResponseDto,
   AssetMediaStatus,
@ -11,7 +12,7 @@ import {
getSupportedMediaTypes, getSupportedMediaTypes,
} from '@immich/sdk'; } from '@immich/sdk';
import byteSize from 'byte-size'; import byteSize from 'byte-size';
import { Presets, SingleBar } from 'cli-progress'; import { MultiBar, Presets, SingleBar } from 'cli-progress';
import { chunk } from 'lodash-es'; import { chunk } from 'lodash-es';
import { Stats, createReadStream } from 'node:fs'; import { Stats, createReadStream } from 'node:fs';
import { stat, unlink } from 'node:fs/promises'; import { stat, unlink } from 'node:fs/promises';
@@ -90,23 +91,23 @@ export const checkForDuplicates = async (files: string[], { concurrency, skipHas
     return { newFiles: files, duplicates: [] };
   }
 
-  const progressBar = new SingleBar(
-    { format: 'Checking files | {bar} | {percentage}% | ETA: {eta}s | {value}/{total} assets' },
+  const multiBar = new MultiBar(
+    { format: '{message} | {bar} | {percentage}% | ETA: {eta}s | {value}/{total} assets' },
     Presets.shades_classic,
   );
 
-  progressBar.start(files.length, 0);
+  const hashProgressBar = multiBar.create(files.length, 0, { message: 'Hashing files          ' });
+  const checkProgressBar = multiBar.create(files.length, 0, { message: 'Checking for duplicates' });
 
   const newFiles: string[] = [];
   const duplicates: Asset[] = [];
 
-  const queue = new Queue<string[], AssetBulkUploadCheckResults>(
-    async (filepaths: string[]) => {
-      const dto = await Promise.all(
-        filepaths.map(async (filepath) => ({ id: filepath, checksum: await sha1(filepath) })),
-      );
-      const response = await checkBulkUpload({ assetBulkUploadCheckDto: { assets: dto } });
+  const checkBulkUploadQueue = new Queue<AssetBulkUploadCheckItem[], void>(
+    async (assets: AssetBulkUploadCheckItem[]) => {
+      const response = await checkBulkUpload({ assetBulkUploadCheckDto: { assets } });
       const results = response.results as AssetBulkUploadCheckResults;
       for (const { id: filepath, assetId, action } of results) {
         if (action === Action.Accept) {
           newFiles.push(filepath);
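
Note: the single "Checking files" bar becomes two bars on one MultiBar, told apart by a per-bar {message} payload. A minimal standalone sketch of this cli-progress pattern (totals and messages here are illustrative):

import { MultiBar, Presets } from 'cli-progress';

// One MultiBar renders several bars at once; each bar fills the {message}
// placeholder in the shared format string from its own payload.
const multiBar = new MultiBar(
  { format: '{message} | {bar} | {percentage}% | {value}/{total}' },
  Presets.shades_classic,
);

const hashing = multiBar.create(100, 0, { message: 'Hashing files' });
const checking = multiBar.create(100, 0, { message: 'Checking for duplicates' });

hashing.increment(); // advance the hashing bar by one
checking.increment(5); // advance the checking bar by five

multiBar.stop(); // restore normal terminal output once all bars are done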
@@ -115,19 +116,45 @@ export const checkForDuplicates = async (files: string[], { concurrency, skipHas
           duplicates.push({ id: assetId as string, filepath });
         }
       }
 
-      progressBar.increment(filepaths.length);
-
-      return results;
+      checkProgressBar.increment(assets.length);
     },
     { concurrency, retry: 3 },
   );
 
+  const results: { id: string; checksum: string }[] = [];
+  let checkBulkUploadRequests: AssetBulkUploadCheckItem[] = [];
+
+  const queue = new Queue<string, AssetBulkUploadCheckItem[]>(
+    async (filepath: string): Promise<AssetBulkUploadCheckItem[]> => {
+      const dto = { id: filepath, checksum: await sha1(filepath) };
+
+      results.push(dto);
+      checkBulkUploadRequests.push(dto);
+      if (checkBulkUploadRequests.length > 5000) {
+        void checkBulkUploadQueue.push([...checkBulkUploadRequests]);
+        checkBulkUploadRequests = [];
+      }
+
+      hashProgressBar.increment();
+      return results;
+    },
+    { concurrency, retry: 3 },
+  );
+
-  for (const items of chunk(files, concurrency)) {
-    await queue.push(items);
+  for (const item of files) {
+    void queue.push(item);
   }
 
   await queue.drained();
 
-  progressBar.stop();
+  if (checkBulkUploadRequests.length > 0) {
+    void checkBulkUploadQueue.push([...checkBulkUploadRequests]);
+  }
+
+  await checkBulkUploadQueue.drained();
+
+  multiBar.stop();
 
   console.log(`Found ${newFiles.length} new files and ${duplicates.length} duplicate${s(duplicates.length)}`);
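
The new flow splits the work in two: the hash queue above produces { id, checksum } items one file at a time, flushes a copy of the pending batch to checkBulkUploadQueue once more than 5000 items accumulate, and a final flush after the hash queue drains picks up the remainder. A minimal sketch of that accumulate-and-flush pattern in isolation (makeBatcher is a hypothetical helper, not part of this change):

function makeBatcher<T>(limit: number, flush: (batch: T[]) => void) {
  let pending: T[] = [];
  return {
    add(item: T) {
      pending.push(item);
      if (pending.length > limit) {
        flush([...pending]); // hand off a snapshot before resetting
        pending = [];
      }
    },
    finish() {
      if (pending.length > 0) {
        flush([...pending]); // flush whatever is left at the end
      }
    },
  };
}

// Usage: with limit 2, batches of at most three items reach the consumer.
const batcher = makeBatcher<number>(2, (batch) => console.log(batch));
for (const n of [1, 2, 3, 4]) {
  batcher.add(n); // logs [1, 2, 3] on the third add
}
batcher.finish(); // logs [4]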
@@ -201,8 +228,8 @@ export const uploadFiles = async (files: string[], { dryRun, concurrency }: Uplo
     { concurrency, retry: 3 },
   );
 
-  for (const filepath of files) {
-    await queue.push(filepath);
+  for (const item of files) {
+    void queue.push(item);
   }
 
   await queue.drained();
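
Both loops switch from awaiting fixed-size chunks to fire-and-forget pushes throttled by the queue itself. The practical difference: with chunked awaits, a chunk finishes only when its slowest file does, and no new work starts until then; with a pool, an idle worker immediately pulls the next item. A self-contained sketch of the two strategies (timings are illustrative):

const sleep = (ms: number) => new Promise<void>((resolve) => setTimeout(resolve, ms));
const costs = [30, 5, 5, 5]; // one slow item dominates its batch

// Lockstep batches (the old pattern): total time is roughly the sum of
// each batch's slowest member, here 30 + 5 = 35ms at concurrency 2.
async function lockstep(concurrency: number) {
  for (let i = 0; i < costs.length; i += concurrency) {
    await Promise.all(costs.slice(i, i + concurrency).map((ms) => sleep(ms)));
  }
}

// Worker pool (the new pattern): a slow item occupies one worker while the
// others keep draining the backlog, here max(30, 5 + 5 + 5) = 30ms.
async function pool(concurrency: number) {
  const pending = [...costs];
  const worker = async () => {
    let ms: number | undefined;
    while ((ms = pending.shift()) !== undefined) {
      await sleep(ms);
    }
  };
  await Promise.all(Array.from({ length: concurrency }, () => worker()));
}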


@@ -72,8 +72,8 @@ export class Queue<T, R> {
   * @returns Promise<void> - The returned Promise will be resolved when all tasks in the queue have been processed by a worker.
   * This promise could be ignored as it will not lead to a `unhandledRejection`.
   */
-  async drained(): Promise<void> {
-    await this.queue.drain();
+  drained(): Promise<void> {
+    return this.queue.drained();
   }
 
   /**
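
This one-line fix matters for the drained() calls in the first file. Assuming Queue wraps fastq's promise interface (the drained() name matches fastq's API, but that is an assumption here), drain is a plain callback property, so the old await this.queue.drain() invoked the callback and resolved immediately; drained() instead returns a promise that settles only once the queue is empty and all workers are idle. A minimal sketch:

import * as fastq from 'fastq';
import type { queueAsPromised } from 'fastq';

// Sketch assuming fastq's promise API: each push() returns a per-task
// promise, and drained() resolves once the queue has fully emptied.
const q: queueAsPromised<number, void> = fastq.promise(async (n: number) => {
  await new Promise((resolve) => setTimeout(resolve, 10 * n));
}, 2);

async function run() {
  for (const n of [1, 2, 3]) {
    void q.push(n); // safe to ignore: fastq's push promise never triggers an unhandledRejection
  }
  await q.drained(); // resolves only after all three tasks have completed
}

void run();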