import { CurrentPlugin } from '@extism/extism'; import { WorkflowChanges, WorkflowEventData, WorkflowEventPayload, WorkflowResponse } from '@immich/plugin-sdk'; import { HttpException, UnauthorizedException } from '@nestjs/common'; import _ from 'lodash'; import { join } from 'node:path'; import { OnEvent, OnJob } from 'src/decorators'; import { AlbumsAddAssetsDto } from 'src/dtos/album.dto'; import { BulkIdsDto } from 'src/dtos/asset-ids.response.dto'; import { AuthDto } from 'src/dtos/auth.dto'; import { PluginManifestDto } from 'src/dtos/plugin-manifest.dto'; import { BootstrapEventPriority, DatabaseLock, ImmichWorker, JobName, JobStatus, QueueName, WorkflowTrigger, WorkflowType, } from 'src/enum'; import { ArgOf } from 'src/repositories/event.repository'; import { AlbumService } from 'src/services/album.service'; import { BaseService } from 'src/services/base.service'; import { JobOf } from 'src/types'; const dummy = () => { throw new Error( `Calling host functions is not allowed without setting methods[].hostFunctions=true in the plugin manifest`, ); }; type ExecuteOptions = { read: (type: T) => Promise<{ authUserId: string; data: WorkflowEventData }>; write: (changes: WorkflowChanges) => Promise; }; export class WorkflowExecutionService extends BaseService { private jwtSecret!: string; @OnEvent({ name: 'AppBootstrap', priority: BootstrapEventPriority.PluginSync, workers: [ImmichWorker.Microservices] }) async onPluginSync() { await this.databaseRepository.withLock(DatabaseLock.PluginImport, async () => { // TODO avoid importing plugins in each worker // Can this use system metadata similar to geocoding? const { resourcePaths, plugins } = this.configRepository.getEnv(); await this.importFolder(resourcePaths.corePlugin, { force: true }); if (plugins.external.allow && plugins.external.installFolder) { await this.importFolders(plugins.external.installFolder); } }); } @OnEvent({ name: 'AppBootstrap', priority: BootstrapEventPriority.PluginLoad, workers: [ImmichWorker.Microservices] }) async onPluginLoad() { this.jwtSecret = this.cryptoRepository.randomBytesAsText(32); const albumService = BaseService.create(AlbumService, this); const albumAddAssets = this.wrap<[id: string, dto: BulkIdsDto]>((authDto, args) => albumService.addAssets(authDto, ...args), ); const addAssetsToAlbums = this.wrap<[dto: AlbumsAddAssetsDto]>((authDto, args) => albumService.addAssetsToAlbums(authDto, ...args), ); const functions = { albumAddAssets, addAssetsToAlbums, }; const stubs = { albumAddAssets: dummy, addAssetsToAlbums: dummy, }; const plugins = await this.pluginRepository.getForLoad(); for (const { id, name, version, wasmBytes, methods } of plugins) { const method = methods.some(({ hostFunctions }) => !hostFunctions); if (method) { const label = `${name}@${version}`; const key = this.getPluginKey({ id, hostFunctions: false }); try { await this.pluginRepository.load({ key, label, wasmBytes }, { runInWorker: false, functions: stubs }); this.logger.log(`Loaded plugin: ${label}`); } catch (error) { this.logger.error(`Unable to load plugin ${label} (${id})`, error); } } const methodWithFunction = methods.some(({ hostFunctions }) => hostFunctions); if (methodWithFunction) { const label = `${name}@${version}/worker`; const key = this.getPluginKey({ id, hostFunctions: true }); try { await this.pluginRepository.load({ key, label, wasmBytes }, { runInWorker: true, functions }); this.logger.log(`Loaded plugin with host functions: ${label}`); } catch (error) { this.logger.error(`Unable to load plugin with host functions ${label} (${id})`, error); } } } } private getPluginKey({ id, hostFunctions }: { id: string; hostFunctions: boolean }) { return id + (hostFunctions ? '/worker' : ''); } private wrap(fn: (authDto: AuthDto, args: T) => Promise) { return async (plugin: CurrentPlugin, offset: bigint) => { try { const handle = plugin.read(offset); if (!handle) { return plugin.store( JSON.stringify({ success: false, status: 400, message: 'Called host function without input' }), ); } const { authToken, args } = handle.json() as { authToken: string; args: T }; if (!authToken) { throw new Error('authToken is required'); } const authDto = this.validate(authToken); const response = await fn(authDto, args); return plugin.store(JSON.stringify({ success: true, response })); } catch (error: Error | any) { if (error instanceof HttpException) { this.logger.error(`Plugin host exception: ${error}`); return plugin.store( JSON.stringify({ success: false, status: error.getStatus(), message: error.getResponse() }), ); } this.logger.error(`Plugin host exception: ${error}`, error?.stack); return plugin.store( JSON.stringify({ success: false, status: 500, message: `Internal server error: ${error}`, }), ); } }; } private async importFolders(installFolder: string): Promise { try { const entries = await this.storageRepository.readdirWithTypes(installFolder); for (const entry of entries) { if (!entry.isDirectory()) { continue; } await this.importFolder(join(installFolder, entry.name)); } } catch (error) { this.logger.error(`Failed to import plugins folder ${installFolder}:`, error); } } private async importFolder(folder: string, options?: { force?: boolean }) { try { const manifestPath = join(folder, 'manifest.json'); const dto = await this.storageRepository.readJsonFile(manifestPath); const result = PluginManifestDto.schema.safeParse(dto); if (!result.success) { const issues = result.error.issues.map((issue) => ` - [${issue.path.join('.')}] ${issue.message}`).join('\n'); this.logger.warn(`Invalid plugin manifest at ${manifestPath}:\n${issues}`); return; } const manifest = result.data; const existing = await this.pluginRepository.getByName(manifest.name); if (existing && existing.version === manifest.version && options?.force !== true) { return; } const wasmPath = `${folder}/${manifest.wasmPath}`; const wasmBytes = await this.storageRepository.readFile(wasmPath); const plugin = await this.pluginRepository.upsert( { enabled: true, name: manifest.name, title: manifest.title, description: manifest.description, author: manifest.author, version: manifest.version, wasmBytes, }, manifest.methods, ); if (existing) { this.logger.log( `Upgraded plugin ${manifest.name} (${plugin.methods.length} methods) from ${existing.version} to ${manifest.version} `, ); } else { this.logger.log( `Imported plugin ${manifest.name}@${manifest.version} (${plugin.methods.length} methods) from ${folder}`, ); } return manifest; } catch { this.logger.warn(`Failed to import plugin from ${folder}:`); } } private validate(authToken: string): AuthDto { try { const jwt = this.cryptoRepository.verifyJwt<{ userId: string }>(authToken, this.jwtSecret); if (!jwt.userId) { throw new UnauthorizedException('Invalid token: missing userId'); } return { user: { id: jwt.userId, }, } as AuthDto; } catch (error) { this.logger.error('Token validation failed:', error); throw new UnauthorizedException('Invalid token'); } } private sign(userId: string) { return this.cryptoRepository.signJwt({ userId }, this.jwtSecret); } @OnEvent({ name: 'AssetCreate' }) async onAssetCreate({ asset }: ArgOf<'AssetCreate'>) { const dto = { ownerId: asset.ownerId, trigger: WorkflowTrigger.AssetCreate }; const items = await this.workflowRepository.search(dto); await this.jobRepository.queueAll( items.map((workflow) => ({ name: JobName.WorkflowAssetCreate, data: { workflowId: workflow.id, assetId: asset.id }, })), ); } @OnJob({ name: JobName.WorkflowAssetCreate, queue: QueueName.Workflow }) handleAssetCreate({ workflowId, assetId }: JobOf) { return this.execute(workflowId, (type) => { switch (type) { case WorkflowType.AssetV1: { return { read: async () => { const asset = await this.workflowRepository.getForAssetV1(assetId); return { data: { asset } as any, authUserId: asset.ownerId, }; }, write: async (changes) => { if (changes.asset) { await this.assetRepository.update({ id: assetId, ..._.omitBy( { isFavorite: changes.asset?.isFavorite, visibility: changes.asset?.visibility, }, _.isUndefined, ), }); } }, } satisfies ExecuteOptions; } } }); } private async execute( workflowId: string, getHandler: (type: T) => ExecuteOptions | undefined, ) { const workflow = await this.workflowRepository.getForWorkflowRun(workflowId); if (!workflow) { return; } // TODO infer from steps const type = 'AssetV1' as T; const handler = getHandler(type); if (!handler) { this.logger.error(`Misconfigured workflow ${workflowId}: no handler for type ${type}`); return; } try { const { read, write } = handler; const readResult = await read(type); let data = readResult.data; for (const step of workflow.steps) { const payload: WorkflowEventPayload = { trigger: workflow.trigger, type, config: step.config ?? {}, workflow: { id: workflowId, authToken: this.sign(readResult.authUserId), stepId: step.id, }, data, }; if (step.methodName.startsWith('noop')) { continue; } const result = await this.pluginRepository.callMethod>( { pluginKey: this.getPluginKey({ id: step.pluginId, hostFunctions: step.hostFunctions }), methodName: step.methodName, }, payload, ); if (result?.changes) { await write(result.changes); ({ data } = await read(type)); } const shouldContinue = result?.workflow?.continue ?? true; if (!shouldContinue) { break; } } this.logger.debug(`Workflow ${workflowId} executed successfully`); } catch (error) { this.logger.error(`Error executing workflow ${workflowId}:`, error); return JobStatus.Failed; } } }