feat(cli): watch paths for auto uploading daemon (#14923)

* feat(cli): watch paths for auto uploading daemon

* chore: update package-lock

* test(cli): Batcher util class

* feat(cli): expose batcher params from startWatch()

* test(cli): startWatch() for `--watch`

* refactor(cli): more reliable watcher

* feat(cli): disable progress bar on --no-progress or --watch

* fix(cli): extensions match when upload with watch

* feat(cli): basic logs without progress on upload

* feat(cli): hide progress in uploadFiles()

* refactor(cli): use promise-based setTimeout() instead of hand crafted sleep()

* refactor(cli): unexport UPLOAD_WATCH consts

* refactor(cli): rename fsWatchListener() to onFile()

* test(cli): prefix dot to mocked getSupportedMediaTypes()

* test(cli): add tests for ignored patterns / unsupported exts

* refactor(cli): minor changes for code reviews

* feat(cli): disable onFile logs when progress bar is enabled
Eli Gao 2025-03-04 02:05:32 +08:00 committed by GitHub
parent 23b1256592
commit e2c34f17ba
7 changed files with 397 additions and 33 deletions
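
Taken together, these commits add a --watch mode to the CLI's upload command: chokidar watches the given paths, each new or changed file is filtered by extension and ignore pattern, and matching paths are fed through a debouncing Batcher into the existing duplicate-check/upload pipeline. A minimal sketch of the resulting flow, using only names introduced in this diff (the path and option values are illustrative):

import { startWatch } from 'src/commands/asset';

// Watch a folder; upload in batches of up to 100 files, or after 10s of quiet time.
await startWatch(
  ['/photos'],
  { concurrency: 4, progress: false, ignore: 'node_modules' },
  { batchSize: 100, debounceTimeMs: 10_000 },
);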

cli/package-lock.json (generated, 50 lines changed)

@@ -9,9 +9,11 @@
"version": "2.2.52",
"license": "GNU Affero General Public License version 3",
"dependencies": {
"chokidar": "^4.0.3",
"fast-glob": "^3.3.2",
"fastq": "^1.17.1",
"lodash-es": "^4.17.21"
"lodash-es": "^4.17.21",
"micromatch": "^4.0.8"
},
"bin": {
"immich": "dist/index.js"
@@ -23,6 +25,7 @@
"@types/byte-size": "^8.1.0",
"@types/cli-progress": "^3.11.0",
"@types/lodash-es": "^4.17.12",
"@types/micromatch": "^4.0.9",
"@types/mock-fs": "^4.13.1",
"@types/node": "^22.13.4",
"@typescript-eslint/eslint-plugin": "^8.15.0",
@@ -1427,6 +1430,13 @@
"win32"
]
},
"node_modules/@types/braces": {
"version": "3.0.4",
"resolved": "https://registry.npmjs.org/@types/braces/-/braces-3.0.4.tgz",
"integrity": "sha512-0WR3b8eaISjEW7RpZnclONaLFDf7buaowRHdqLp4vLj54AsSAYWfh3DRbfiYJY9XDxMgx1B4sE1Afw2PGpuHOA==",
"dev": true,
"license": "MIT"
},
"node_modules/@types/byte-size": {
"version": "8.1.2",
"resolved": "https://registry.npmjs.org/@types/byte-size/-/byte-size-8.1.2.tgz",
@@ -1472,6 +1482,16 @@
"@types/lodash": "*"
}
},
"node_modules/@types/micromatch": {
"version": "4.0.9",
"resolved": "https://registry.npmjs.org/@types/micromatch/-/micromatch-4.0.9.tgz",
"integrity": "sha512-7V+8ncr22h4UoYRLnLXSpTxjQrNUXtWHGeMPRJt1nULXI57G9bIcpyrHlmrQ7QK24EyyuXvYcSSWAM8GA9nqCg==",
"dev": true,
"license": "MIT",
"dependencies": {
"@types/braces": "*"
}
},
"node_modules/@types/mock-fs": {
"version": "4.13.4",
"resolved": "https://registry.npmjs.org/@types/mock-fs/-/mock-fs-4.13.4.tgz",
@@ -2089,6 +2109,21 @@
"node": ">= 16"
}
},
"node_modules/chokidar": {
"version": "4.0.3",
"resolved": "https://registry.npmjs.org/chokidar/-/chokidar-4.0.3.tgz",
"integrity": "sha512-Qgzu8kfBvo+cA4962jnP1KkS6Dop5NS6g7R5LFYJr4b8Ub94PPQXUksCw9PvXoeXPRRddRNC5C1JQUR2SMGtnA==",
"license": "MIT",
"dependencies": {
"readdirp": "^4.0.1"
},
"engines": {
"node": ">= 14.16.0"
},
"funding": {
"url": "https://paulmillr.com/funding/"
}
},
"node_modules/ci-info": {
"version": "4.0.0",
"resolved": "https://registry.npmjs.org/ci-info/-/ci-info-4.0.0.tgz",
@@ -3756,6 +3791,19 @@
"node": ">=8"
}
},
"node_modules/readdirp": {
"version": "4.0.2",
"resolved": "https://registry.npmjs.org/readdirp/-/readdirp-4.0.2.tgz",
"integrity": "sha512-yDMz9g+VaZkqBYS/ozoBJwaBhTbZo3UNYQHNRw1D3UFQB8oHB4uS/tAODO+ZLjGWmUbKnIlOWO+aaIiAxrUWHA==",
"license": "MIT",
"engines": {
"node": ">= 14.16.0"
},
"funding": {
"type": "individual",
"url": "https://paulmillr.com/funding/"
}
},
"node_modules/regexp-tree": {
"version": "0.1.27",
"resolved": "https://registry.npmjs.org/regexp-tree/-/regexp-tree-0.1.27.tgz",

cli/package.json

@@ -19,6 +19,7 @@
"@types/byte-size": "^8.1.0",
"@types/cli-progress": "^3.11.0",
"@types/lodash-es": "^4.17.12",
"@types/micromatch": "^4.0.9",
"@types/mock-fs": "^4.13.1",
"@types/node": "^22.13.4",
"@typescript-eslint/eslint-plugin": "^8.15.0",
@@ -62,11 +63,13 @@
"node": ">=20.0.0"
},
"dependencies": {
"chokidar": "^4.0.3",
"fast-glob": "^3.3.2",
"fastq": "^1.17.1",
"lodash-es": "^4.17.21"
"lodash-es": "^4.17.21",
"micromatch": "^4.0.8"
},
"volta": {
"node": "22.14.0"
}
}
}

cli/src/commands/asset.spec.ts

@@ -1,12 +1,13 @@
import * as fs from 'node:fs';
import * as os from 'node:os';
import * as path from 'node:path';
import { describe, expect, it, vi } from 'vitest';
import { setTimeout as sleep } from 'node:timers/promises';
import { describe, expect, it, MockedFunction, vi } from 'vitest';
import { Action, checkBulkUpload, defaults, Reason } from '@immich/sdk';
import { Action, checkBulkUpload, defaults, getSupportedMediaTypes, Reason } from '@immich/sdk';
import createFetchMock from 'vitest-fetch-mock';
import { checkForDuplicates, getAlbumName, uploadFiles, UploadOptionsDto } from './asset';
import { checkForDuplicates, getAlbumName, startWatch, uploadFiles, UploadOptionsDto } from 'src/commands/asset';
vi.mock('@immich/sdk');
@@ -199,3 +200,112 @@ describe('checkForDuplicates', () => {
});
});
});
describe('startWatch', () => {
let testFolder: string;
let checkBulkUploadMocked: MockedFunction<typeof checkBulkUpload>;
beforeEach(async () => {
vi.restoreAllMocks();
vi.mocked(getSupportedMediaTypes).mockResolvedValue({
image: ['.jpg'],
sidecar: ['.xmp'],
video: ['.mp4'],
});
testFolder = await fs.promises.mkdtemp(path.join(os.tmpdir(), 'test-startWatch-'));
checkBulkUploadMocked = vi.mocked(checkBulkUpload);
checkBulkUploadMocked.mockResolvedValue({
results: [],
});
});
it('should start watching a directory and upload new files', async () => {
const testFilePath = path.join(testFolder, 'test.jpg');
await startWatch([testFolder], { concurrency: 1 }, { batchSize: 1, debounceTimeMs: 10 });
await sleep(100); // give the watcher time to settle so the new file is not treated as an existing one
await fs.promises.writeFile(testFilePath, 'testjpg');
await vi.waitUntil(() => checkBulkUploadMocked.mock.calls.length > 0, 3000);
expect(checkBulkUpload).toHaveBeenCalledWith({
assetBulkUploadCheckDto: {
assets: [
expect.objectContaining({
id: testFilePath,
}),
],
},
});
});
it('should filter out unsupported files', async () => {
const testFilePath = path.join(testFolder, 'test.jpg');
const unsupportedFilePath = path.join(testFolder, 'test.txt');
await startWatch([testFolder], { concurrency: 1 }, { batchSize: 1, debounceTimeMs: 10 });
await sleep(100); // give the watcher time to settle so the new file is not treated as an existing one
await fs.promises.writeFile(testFilePath, 'testjpg');
await fs.promises.writeFile(unsupportedFilePath, 'testtxt');
await vi.waitUntil(() => checkBulkUploadMocked.mock.calls.length > 0, 3000);
expect(checkBulkUpload).toHaveBeenCalledWith({
assetBulkUploadCheckDto: {
assets: expect.arrayContaining([
expect.objectContaining({
id: testFilePath,
}),
]),
},
});
expect(checkBulkUpload).not.toHaveBeenCalledWith({
assetBulkUploadCheckDto: {
assets: expect.arrayContaining([
expect.objectContaining({
id: unsupportedFilePath,
}),
]),
},
});
});
it('should filter out ignored patterns', async () => {
const testFilePath = path.join(testFolder, 'test.jpg');
const ignoredPattern = 'ignored';
const ignoredFolder = path.join(testFolder, ignoredPattern);
await fs.promises.mkdir(ignoredFolder, { recursive: true });
const ignoredFilePath = path.join(ignoredFolder, 'ignored.jpg');
await startWatch([testFolder], { concurrency: 1, ignore: ignoredPattern }, { batchSize: 1, debounceTimeMs: 10 });
await sleep(100); // give the watcher time to settle so the new file is not treated as an existing one
await fs.promises.writeFile(testFilePath, 'testjpg');
await fs.promises.writeFile(ignoredFilePath, 'ignoredjpg');
await vi.waitUntil(() => checkBulkUploadMocked.mock.calls.length > 0, 3000);
expect(checkBulkUpload).toHaveBeenCalledWith({
assetBulkUploadCheckDto: {
assets: expect.arrayContaining([
expect.objectContaining({
id: testFilePath,
}),
]),
},
});
expect(checkBulkUpload).not.toHaveBeenCalledWith({
assetBulkUploadCheckDto: {
assets: expect.arrayContaining([
expect.objectContaining({
id: ignoredFilePath,
}),
]),
},
});
});
afterEach(async () => {
await fs.promises.rm(testFolder, { recursive: true, force: true });
});
});

cli/src/commands/asset.ts

@@ -12,13 +12,18 @@ import {
getSupportedMediaTypes,
} from '@immich/sdk';
import byteSize from 'byte-size';
import { Matcher, watch as watchFs } from 'chokidar';
import { MultiBar, Presets, SingleBar } from 'cli-progress';
import { chunk } from 'lodash-es';
import micromatch from 'micromatch';
import { Stats, createReadStream } from 'node:fs';
import { stat, unlink } from 'node:fs/promises';
import path, { basename } from 'node:path';
import { Queue } from 'src/queue';
import { BaseOptions, authenticate, crawl, sha1 } from 'src/utils';
import { BaseOptions, Batcher, authenticate, crawl, sha1 } from 'src/utils';
const UPLOAD_WATCH_BATCH_SIZE = 100;
const UPLOAD_WATCH_DEBOUNCE_TIME_MS = 10_000;
const s = (count: number) => (count === 1 ? '' : 's');
@@ -36,6 +41,8 @@ export interface UploadOptionsDto {
albumName?: string;
includeHidden?: boolean;
concurrency: number;
progress?: boolean;
watch?: boolean;
}
class UploadFile extends File {
@@ -55,19 +62,94 @@
}
}
const uploadBatch = async (files: string[], options: UploadOptionsDto) => {
const { newFiles, duplicates } = await checkForDuplicates(files, options);
const newAssets = await uploadFiles(newFiles, options);
await updateAlbums([...newAssets, ...duplicates], options);
await deleteFiles(newFiles, options);
};
export const startWatch = async (
paths: string[],
options: UploadOptionsDto,
{
batchSize = UPLOAD_WATCH_BATCH_SIZE,
debounceTimeMs = UPLOAD_WATCH_DEBOUNCE_TIME_MS,
}: { batchSize?: number; debounceTimeMs?: number } = {},
) => {
const watcherIgnored: Matcher[] = [];
const { image, video } = await getSupportedMediaTypes();
const extensions = new Set([...image, ...video]);
if (options.ignore) {
watcherIgnored.push((path) => micromatch.contains(path, `**/${options.ignore}`));
}
const pathsBatcher = new Batcher<string>({
batchSize,
debounceTimeMs,
onBatch: async (paths: string[]) => {
const uniquePaths = [...new Set(paths)];
await uploadBatch(uniquePaths, options);
},
});
const onFile = async (path: string, stats?: Stats) => {
if (stats?.isDirectory()) {
return;
}
const ext = '.' + path.split('.').pop()?.toLowerCase();
if (!ext || !extensions.has(ext)) {
return;
}
if (!options.progress) {
// only log when the progress bar is disabled, since extra output interferes with its rendering
console.log(`Change detected: ${path}`);
}
pathsBatcher.add(path);
};
const fsWatcher = watchFs(paths, {
ignoreInitial: true,
ignored: watcherIgnored,
alwaysStat: true,
awaitWriteFinish: true,
depth: options.recursive ? undefined : 1,
persistent: true,
})
.on('add', onFile)
.on('change', onFile)
.on('error', (error) => console.error(`Watcher error: ${error}`));
process.on('SIGINT', async () => {
console.log('Exiting...');
await fsWatcher.close();
process.exit();
});
};
export const upload = async (paths: string[], baseOptions: BaseOptions, options: UploadOptionsDto) => {
await authenticate(baseOptions);
const scanFiles = await scan(paths, options);
if (scanFiles.length === 0) {
console.log('No files found, exiting');
return;
if (options.watch) {
console.log('No files found initially.');
} else {
console.log('No files found, exiting');
return;
}
}
const { newFiles, duplicates } = await checkForDuplicates(scanFiles, options);
const newAssets = await uploadFiles(newFiles, options);
await updateAlbums([...newAssets, ...duplicates], options);
await deleteFiles(newFiles, options);
if (options.watch) {
console.log('Watching for changes...');
await startWatch(paths, options);
// the watcher does not handle the initial scan,
// since scan() above is a more efficient quick start with batched results
}
await uploadBatch(scanFiles, options);
};
const scan = async (pathsToCrawl: string[], options: UploadOptionsDto) => {
@@ -85,19 +167,25 @@ const scan = async (pathsToCrawl: string[], options: UploadOptionsDto) => {
return files;
};
export const checkForDuplicates = async (files: string[], { concurrency, skipHash }: UploadOptionsDto) => {
export const checkForDuplicates = async (files: string[], { concurrency, skipHash, progress }: UploadOptionsDto) => {
if (skipHash) {
console.log('Skipping hash check, assuming all files are new');
return { newFiles: files, duplicates: [] };
}
const multiBar = new MultiBar(
{ format: '{message} | {bar} | {percentage}% | ETA: {eta}s | {value}/{total} assets' },
Presets.shades_classic,
);
let multiBar: MultiBar | undefined;
const hashProgressBar = multiBar.create(files.length, 0, { message: 'Hashing files ' });
const checkProgressBar = multiBar.create(files.length, 0, { message: 'Checking for duplicates' });
if (progress) {
multiBar = new MultiBar(
{ format: '{message} | {bar} | {percentage}% | ETA: {eta}s | {value}/{total} assets' },
Presets.shades_classic,
);
} else {
console.log(`Received ${files.length} files, hashing...`);
}
const hashProgressBar = multiBar?.create(files.length, 0, { message: 'Hashing files ' });
const checkProgressBar = multiBar?.create(files.length, 0, { message: 'Checking for duplicates' });
const newFiles: string[] = [];
const duplicates: Asset[] = [];
@@ -117,7 +205,7 @@ export const checkForDuplicates = async (files: string[], { concurrency, skipHas
}
}
checkProgressBar.increment(assets.length);
checkProgressBar?.increment(assets.length);
},
{ concurrency, retry: 3 },
);
@@ -137,7 +225,7 @@ export const checkForDuplicates = async (files: string[], { concurrency, skipHas
void checkBulkUploadQueue.push(batch);
}
hashProgressBar.increment();
hashProgressBar?.increment();
return results;
},
{ concurrency, retry: 3 },
@@ -155,7 +243,7 @@ export const checkForDuplicates = async (files: string[], { concurrency, skipHas
await checkBulkUploadQueue.drained();
multiBar.stop();
multiBar?.stop();
console.log(`Found ${newFiles.length} new files and ${duplicates.length} duplicate${s(duplicates.length)}`);
@@ -171,7 +259,10 @@ export const checkForDuplicates = async (files: string[], { concurrency, skipHas
return { newFiles, duplicates };
};
export const uploadFiles = async (files: string[], { dryRun, concurrency }: UploadOptionsDto): Promise<Asset[]> => {
export const uploadFiles = async (
files: string[],
{ dryRun, concurrency, progress }: UploadOptionsDto,
): Promise<Asset[]> => {
if (files.length === 0) {
console.log('All assets were already uploaded, nothing to do.');
return [];
@@ -191,12 +282,20 @@ export const uploadFiles = async (files: string[], { dryRun, concurrency }: Uplo
return files.map((filepath) => ({ id: '', filepath }));
}
const uploadProgress = new SingleBar(
{ format: 'Uploading assets | {bar} | {percentage}% | ETA: {eta_formatted} | {value_formatted}/{total_formatted}' },
Presets.shades_classic,
);
uploadProgress.start(totalSize, 0);
uploadProgress.update({ value_formatted: 0, total_formatted: byteSize(totalSize) });
let uploadProgress: SingleBar | undefined;
if (progress) {
uploadProgress = new SingleBar(
{
format: 'Uploading assets | {bar} | {percentage}% | ETA: {eta_formatted} | {value_formatted}/{total_formatted}',
},
Presets.shades_classic,
);
} else {
console.log(`Uploading ${files.length} asset${s(files.length)} (${byteSize(totalSize)})`);
}
uploadProgress?.start(totalSize, 0);
uploadProgress?.update({ value_formatted: 0, total_formatted: byteSize(totalSize) });
let duplicateCount = 0;
let duplicateSize = 0;
@@ -222,7 +321,7 @@ export const uploadFiles = async (files: string[], { dryRun, concurrency }: Uplo
successSize += stats.size ?? 0;
}
uploadProgress.update(successSize, { value_formatted: byteSize(successSize + duplicateSize) });
uploadProgress?.update(successSize, { value_formatted: byteSize(successSize + duplicateSize) });
return response;
},
@@ -235,7 +334,7 @@ export const uploadFiles = async (files: string[], { dryRun, concurrency }: Uplo
await queue.drained();
uploadProgress.stop();
uploadProgress?.stop();
console.log(`Successfully uploaded ${successCount} new asset${s(successCount)} (${byteSize(successSize)})`);
if (duplicateCount > 0) {
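
A note on the watcher's ignore handling above: micromatch.contains() returns true when any part of the path matches the pattern, which is why the ignore option is wrapped as `**/${options.ignore}`. A small illustration with hypothetical paths:

import micromatch from 'micromatch';

micromatch.contains('/photos/ignored/file.jpg', '**/ignored'); // true - a path segment matches
micromatch.contains('/photos/test.jpg', '**/ignored');         // false - nothing matches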

cli/src/index.ts

@@ -69,6 +69,13 @@ program
.default(4),
)
.addOption(new Option('--delete', 'Delete local assets after upload').env('IMMICH_DELETE_ASSETS'))
.addOption(new Option('--no-progress', 'Hide progress bars').env('IMMICH_PROGRESS_BAR').default(true))
.addOption(
new Option('--watch', 'Watch for changes and upload automatically')
.env('IMMICH_WATCH_CHANGES')
.default(false)
.implies({ progress: false }),
)
.argument('[paths...]', 'One or more paths to assets to be uploaded')
.action((paths, options) => upload(paths, program.opts(), options));
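
For clarity on how the new options interact: --watch implies { progress: false }, so watch mode also disables the progress bar even though progress defaults to true. A self-contained sketch of that behavior (simplified, without the env bindings), assuming commander's Option.implies() semantics:

import { Command, Option } from 'commander';

const program = new Command()
  .addOption(new Option('--no-progress', 'Hide progress bars').default(true))
  .addOption(new Option('--watch', 'Watch for changes').default(false).implies({ progress: false }));

program.parse(['--watch'], { from: 'user' });
console.log(program.opts()); // { progress: false, watch: true }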

cli/src/utils.spec.ts

@@ -1,6 +1,7 @@
import mockfs from 'mock-fs';
import { readFileSync } from 'node:fs';
import { CrawlOptions, crawl } from 'src/utils';
import { Batcher, CrawlOptions, crawl } from 'src/utils';
import { Mock } from 'vitest';
interface Test {
test: string;
@@ -303,3 +304,38 @@ describe('crawl', () => {
}
});
});
describe('Batcher', () => {
let batcher: Batcher;
let onBatch: Mock;
beforeEach(() => {
onBatch = vi.fn();
batcher = new Batcher({ batchSize: 2, onBatch });
});
it('should trigger onBatch() when the batch size is reached', async () => {
batcher.add('a');
batcher.add('b');
batcher.add('c');
expect(onBatch).toHaveBeenCalledOnce();
expect(onBatch).toHaveBeenCalledWith(['a', 'b']);
});
it('should trigger onBatch() when flush() is called', async () => {
batcher.add('a');
batcher.flush();
expect(onBatch).toHaveBeenCalledOnce();
expect(onBatch).toHaveBeenCalledWith(['a']);
});
it('should trigger onBatch() when the debounce time has passed', async () => {
vi.useFakeTimers();
batcher = new Batcher({ batchSize: 2, debounceTimeMs: 100, onBatch });
batcher.add('a');
expect(onBatch).not.toHaveBeenCalled();
vi.advanceTimersByTime(200);
expect(onBatch).toHaveBeenCalledOnce();
expect(onBatch).toHaveBeenCalledWith(['a']);
vi.useRealTimers();
});
});

cli/src/utils.ts

@@ -172,3 +172,64 @@ export const sha1 = (filepath: string) => {
rs.on('end', () => resolve(hash.digest('hex')));
});
};
/**
* Batches items and calls onBatch to process them
* when the batch size is reached or the debounce time has passed.
*/
export class Batcher<T = unknown> {
private items: T[] = [];
private readonly batchSize: number;
private readonly debounceTimeMs?: number;
private readonly onBatch: (items: T[]) => void;
private debounceTimer?: NodeJS.Timeout;
constructor({
batchSize,
debounceTimeMs,
onBatch,
}: {
batchSize: number;
debounceTimeMs?: number;
onBatch: (items: T[]) => Promise<void>;
}) {
this.batchSize = batchSize;
this.debounceTimeMs = debounceTimeMs;
this.onBatch = onBatch;
}
private setDebounceTimer() {
if (this.debounceTimer) {
clearTimeout(this.debounceTimer);
}
if (this.debounceTimeMs) {
this.debounceTimer = setTimeout(() => this.flush(), this.debounceTimeMs);
}
}
private clearDebounceTimer() {
if (this.debounceTimer) {
clearTimeout(this.debounceTimer);
this.debounceTimer = undefined;
}
}
add(item: T) {
this.items.push(item);
this.setDebounceTimer();
if (this.items.length >= this.batchSize) {
this.flush();
}
}
flush() {
this.clearDebounceTimer();
if (this.items.length === 0) {
return;
}
this.onBatch(this.items);
this.items = [];
}
}
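
For reference, a minimal usage sketch of the Batcher above; the handler and values are illustrative:

const batcher = new Batcher<string>({
  batchSize: 2,
  debounceTimeMs: 100,
  onBatch: async (items) => console.log('batch:', items),
});

batcher.add('a');
batcher.add('b'); // batch size reached -> onBatch(['a', 'b']) fires immediately
batcher.add('c'); // starts a 100ms debounce timer instead
batcher.flush();  // forces the pending ['c'] through right away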