feat: workflows & plugins (#26727)

feat: plugins

chore: better types

feat: plugins
This commit is contained in:
Jason Rasmussen
2026-05-18 11:09:33 -04:00
committed by GitHub
parent 7384799f19
commit 3d075f2bf8
144 changed files with 6099 additions and 7419 deletions
+61
View File
@@ -58,6 +58,7 @@ import { ViewRepository } from 'src/repositories/view-repository';
import { WebsocketRepository } from 'src/repositories/websocket.repository';
import { WorkflowRepository } from 'src/repositories/workflow.repository';
import { UserTable } from 'src/schema/tables/user.table';
import { ClassConstructor } from 'src/types';
import { AccessRequest, checkAccess, requireAccess } from 'src/utils/access';
import { getConfig, updateConfig } from 'src/utils/config';
@@ -187,6 +188,66 @@ export class BaseService {
);
}
static create<T extends BaseService>(Service: ClassConstructor<T>, ctx: BaseService) {
const service = new Service(
LoggingRepository.create(),
ctx.accessRepository,
ctx.activityRepository,
ctx.albumRepository,
ctx.albumUserRepository,
ctx.apiKeyRepository,
ctx.appRepository,
ctx.assetRepository,
ctx.assetEditRepository,
ctx.assetJobRepository,
ctx.configRepository,
ctx.cronRepository,
ctx.cryptoRepository,
ctx.databaseRepository,
ctx.downloadRepository,
ctx.duplicateRepository,
ctx.emailRepository,
ctx.eventRepository,
ctx.jobRepository,
ctx.libraryRepository,
ctx.machineLearningRepository,
ctx.mapRepository,
ctx.mediaRepository,
ctx.memoryRepository,
ctx.metadataRepository,
ctx.moveRepository,
ctx.notificationRepository,
ctx.oauthRepository,
ctx.ocrRepository,
ctx.partnerRepository,
ctx.personRepository,
ctx.pluginRepository,
ctx.processRepository,
ctx.searchRepository,
ctx.serverInfoRepository,
ctx.sessionRepository,
ctx.sharedLinkRepository,
ctx.sharedLinkAssetRepository,
ctx.stackRepository,
ctx.storageRepository,
ctx.syncRepository,
ctx.syncCheckpointRepository,
ctx.systemMetadataRepository,
ctx.tagRepository,
ctx.telemetryRepository,
ctx.trashRepository,
ctx.userRepository,
ctx.versionRepository,
ctx.viewRepository,
ctx.websocketRepository,
ctx.workflowRepository,
);
service.logger.setContext(this.name);
return service as T;
}
get worker() {
return this.configRepository.getWorker();
}
+2
View File
@@ -44,6 +44,7 @@ import { UserAdminService } from 'src/services/user-admin.service';
import { UserService } from 'src/services/user.service';
import { VersionService } from 'src/services/version.service';
import { ViewService } from 'src/services/view.service';
import { WorkflowExecutionService } from 'src/services/workflow-execution.service';
import { WorkflowService } from 'src/services/workflow.service';
export const services = [
@@ -93,5 +94,6 @@ export const services = [
UserService,
VersionService,
ViewService,
WorkflowExecutionService,
WorkflowService,
];
@@ -1,120 +0,0 @@
import { CurrentPlugin } from '@extism/extism';
import { UnauthorizedException } from '@nestjs/common';
import { Updateable } from 'kysely';
import { Permission } from 'src/enum';
import { AccessRepository } from 'src/repositories/access.repository';
import { AlbumRepository } from 'src/repositories/album.repository';
import { AssetRepository } from 'src/repositories/asset.repository';
import { CryptoRepository } from 'src/repositories/crypto.repository';
import { LoggingRepository } from 'src/repositories/logging.repository';
import { AssetTable } from 'src/schema/tables/asset.table';
import { requireAccess } from 'src/utils/access';
/**
* Plugin host functions that are exposed to WASM plugins via Extism.
* These functions allow plugins to interact with the Immich system.
*/
export class PluginHostFunctions {
constructor(
private assetRepository: AssetRepository,
private albumRepository: AlbumRepository,
private accessRepository: AccessRepository,
private cryptoRepository: CryptoRepository,
private logger: LoggingRepository,
private pluginJwtSecret: string,
) {}
/**
* Creates Extism host function bindings for the plugin.
* These are the functions that WASM plugins can call.
*/
getHostFunctions() {
return {
'extism:host/user': {
updateAsset: (cp: CurrentPlugin, offs: bigint) => this.handleUpdateAsset(cp, offs),
addAssetToAlbum: (cp: CurrentPlugin, offs: bigint) => this.handleAddAssetToAlbum(cp, offs),
},
};
}
/**
* Host function wrapper for updateAsset.
* Reads the input from the plugin, parses it, and calls the actual update function.
*/
private async handleUpdateAsset(cp: CurrentPlugin, offs: bigint) {
const input = JSON.parse(cp.read(offs)!.text());
await this.updateAsset(input);
}
/**
* Host function wrapper for addAssetToAlbum.
* Reads the input from the plugin, parses it, and calls the actual add function.
*/
private async handleAddAssetToAlbum(cp: CurrentPlugin, offs: bigint) {
const input = JSON.parse(cp.read(offs)!.text());
await this.addAssetToAlbum(input);
}
/**
* Validates the JWT token and returns the auth context.
*/
private validateToken(authToken: string): { userId: string } {
try {
const auth = this.cryptoRepository.verifyJwt<{ userId: string }>(authToken, this.pluginJwtSecret);
if (!auth.userId) {
throw new UnauthorizedException('Invalid token: missing userId');
}
return auth;
} catch (error) {
this.logger.error('Token validation failed:', error);
throw new UnauthorizedException('Invalid token');
}
}
/**
* Updates an asset with the given properties.
*/
async updateAsset(input: { authToken: string } & Updateable<AssetTable> & { id: string }) {
const { authToken, id, ...assetData } = input;
// Validate token
const auth = this.validateToken(authToken);
// Check access to the asset
await requireAccess(this.accessRepository, {
auth: { user: { id: auth.userId } } as any,
permission: Permission.AssetUpdate,
ids: [id],
});
this.logger.log(`Updating asset ${id} -- ${JSON.stringify(assetData)}`);
await this.assetRepository.update({ id, ...assetData });
}
/**
* Adds an asset to an album.
*/
async addAssetToAlbum(input: { authToken: string; assetId: string; albumId: string }) {
const { authToken, assetId, albumId } = input;
// Validate token
const auth = this.validateToken(authToken);
// Check access to both the asset and the album
await requireAccess(this.accessRepository, {
auth: { user: { id: auth.userId } } as any,
permission: Permission.AssetRead,
ids: [assetId],
});
await requireAccess(this.accessRepository, {
auth: { user: { id: auth.userId } } as any,
permission: Permission.AlbumUpdate,
ids: [albumId],
});
this.logger.log(`Adding asset ${assetId} to album ${albumId}`);
await this.albumRepository.addAssetIds(albumId, [assetId]);
return 0;
}
}
+17 -296
View File
@@ -1,313 +1,34 @@
import { Plugin as ExtismPlugin, newPlugin } from '@extism/extism';
import { BadRequestException, Injectable } from '@nestjs/common';
import { join } from 'node:path';
import { Asset, WorkflowAction, WorkflowFilter } from 'src/database';
import { OnEvent, OnJob } from 'src/decorators';
import { PluginManifestDto, PluginManifestSchema } from 'src/dtos/plugin-manifest.dto';
import { mapPlugin, PluginResponseDto, PluginTriggerResponseDto } from 'src/dtos/plugin.dto';
import { JobName, JobStatus, PluginTriggerType, QueueName } from 'src/enum';
import { pluginTriggers } from 'src/plugins';
import { ArgOf } from 'src/repositories/event.repository';
import {
mapMethod,
mapPlugin,
PluginMethodResponseDto,
PluginMethodSearchDto,
PluginResponseDto,
PluginSearchDto,
} from 'src/dtos/plugin.dto';
import { BaseService } from 'src/services/base.service';
import { PluginHostFunctions } from 'src/services/plugin-host.functions';
import { IWorkflowJob, JobItem, JobOf, WorkflowData } from 'src/types';
interface WorkflowContext {
authToken: string;
asset: Asset;
}
interface PluginInput<T = unknown> {
authToken: string;
config: T;
data: {
asset: Asset;
};
}
import { isMethodCompatible } from 'src/utils/workflow';
@Injectable()
export class PluginService extends BaseService {
private pluginJwtSecret!: string;
private loadedPlugins: Map<string, ExtismPlugin> = new Map();
private hostFunctions!: PluginHostFunctions;
@OnEvent({ name: 'AppBootstrap' })
async onBootstrap() {
this.pluginJwtSecret = this.cryptoRepository.randomBytesAsText(32);
await this.loadPluginsFromManifests();
this.hostFunctions = new PluginHostFunctions(
this.assetRepository,
this.albumRepository,
this.accessRepository,
this.cryptoRepository,
this.logger,
this.pluginJwtSecret,
);
await this.loadPlugins();
}
getTriggers(): PluginTriggerResponseDto[] {
return pluginTriggers;
}
//
// CRUD operations for plugins
//
async getAll(): Promise<PluginResponseDto[]> {
const plugins = await this.pluginRepository.getAllPlugins();
async search(dto: PluginSearchDto): Promise<PluginResponseDto[]> {
const plugins = await this.pluginRepository.search(dto);
return plugins.map((plugin) => mapPlugin(plugin));
}
async get(id: string): Promise<PluginResponseDto> {
const plugin = await this.pluginRepository.getPlugin(id);
const plugin = await this.pluginRepository.get(id);
if (!plugin) {
throw new BadRequestException('Plugin not found');
}
return mapPlugin(plugin);
}
///////////////////////////////////////////
// Plugin Loader
//////////////////////////////////////////
async loadPluginsFromManifests(): Promise<void> {
// Load core plugin
const { resourcePaths, plugins } = this.configRepository.getEnv();
const coreManifestPath = `${resourcePaths.corePlugin}/manifest.json`;
const coreManifest = await this.readAndValidateManifest(coreManifestPath);
await this.loadPluginToDatabase(coreManifest, resourcePaths.corePlugin);
this.logger.log(`Successfully processed core plugin: ${coreManifest.name} (version ${coreManifest.version})`);
// Load external plugins
if (plugins.external.allow && plugins.external.installFolder) {
await this.loadExternalPlugins(plugins.external.installFolder);
}
}
private async loadExternalPlugins(installFolder: string): Promise<void> {
try {
const entries = await this.pluginRepository.readDirectory(installFolder);
for (const entry of entries) {
if (!entry.isDirectory()) {
continue;
}
const pluginFolder = join(installFolder, entry.name);
const manifestPath = join(pluginFolder, 'manifest.json');
try {
const manifest = await this.readAndValidateManifest(manifestPath);
await this.loadPluginToDatabase(manifest, pluginFolder);
this.logger.log(`Successfully processed external plugin: ${manifest.name} (version ${manifest.version})`);
} catch (error) {
this.logger.warn(`Failed to load external plugin from ${manifestPath}:`, error);
}
}
} catch (error) {
this.logger.error(`Failed to scan external plugins folder ${installFolder}:`, error);
}
}
private async loadPluginToDatabase(manifest: PluginManifestDto, basePath: string): Promise<void> {
const currentPlugin = await this.pluginRepository.getPluginByName(manifest.name);
if (currentPlugin != null && currentPlugin.version === manifest.version) {
this.logger.log(`Plugin ${manifest.name} is up to date (version ${manifest.version}). Skipping`);
return;
}
const { plugin, filters, actions } = await this.pluginRepository.loadPlugin(manifest, basePath);
this.logger.log(`Upserted plugin: ${plugin.name} (ID: ${plugin.id}, version: ${plugin.version})`);
for (const filter of filters) {
this.logger.log(`Upserted plugin filter: ${filter.methodName} (ID: ${filter.id})`);
}
for (const action of actions) {
this.logger.log(`Upserted plugin action: ${action.methodName} (ID: ${action.id})`);
}
}
private async readAndValidateManifest(manifestPath: string): Promise<PluginManifestDto> {
const content = await this.storageRepository.readTextFile(manifestPath);
const manifestData = JSON.parse(content);
return PluginManifestSchema.parse(manifestData);
}
///////////////////////////////////////////
// Plugin Execution
///////////////////////////////////////////
private async loadPlugins() {
const plugins = await this.pluginRepository.getAllPlugins();
for (const plugin of plugins) {
try {
this.logger.debug(`Loading plugin: ${plugin.name} from ${plugin.wasmPath}`);
const extismPlugin = await newPlugin(plugin.wasmPath, {
useWasi: true,
functions: this.hostFunctions.getHostFunctions(),
});
this.loadedPlugins.set(plugin.id, extismPlugin);
this.logger.log(`Successfully loaded plugin: ${plugin.name}`);
} catch (error) {
this.logger.error(`Failed to load plugin ${plugin.name}:`, error);
}
}
}
@OnEvent({ name: 'AssetCreate' })
async handleAssetCreate({ asset }: ArgOf<'AssetCreate'>) {
await this.handleTrigger(PluginTriggerType.AssetCreate, {
ownerId: asset.ownerId,
event: { userId: asset.ownerId, asset },
});
}
private async handleTrigger<T extends PluginTriggerType>(
triggerType: T,
params: { ownerId: string; event: WorkflowData[T] },
): Promise<void> {
const workflows = await this.workflowRepository.getWorkflowByOwnerAndTrigger(params.ownerId, triggerType);
if (workflows.length === 0) {
return;
}
const jobs: JobItem[] = workflows.map((workflow) => ({
name: JobName.WorkflowRun,
data: {
id: workflow.id,
type: triggerType,
event: params.event,
} as IWorkflowJob<T>,
}));
await this.jobRepository.queueAll(jobs);
this.logger.debug(`Queued ${jobs.length} workflow execution jobs for trigger ${triggerType}`);
}
@OnJob({ name: JobName.WorkflowRun, queue: QueueName.Workflow })
async handleWorkflowRun({ id: workflowId, type, event }: JobOf<JobName.WorkflowRun>): Promise<JobStatus> {
try {
const workflow = await this.workflowRepository.getWorkflow(workflowId);
if (!workflow) {
this.logger.error(`Workflow ${workflowId} not found`);
return JobStatus.Failed;
}
const workflowFilters = await this.workflowRepository.getFilters(workflowId);
const workflowActions = await this.workflowRepository.getActions(workflowId);
switch (type) {
case PluginTriggerType.AssetCreate: {
const data = event as WorkflowData[PluginTriggerType.AssetCreate];
const asset = data.asset;
const authToken = this.cryptoRepository.signJwt({ userId: data.userId }, this.pluginJwtSecret);
const context = {
authToken,
asset,
};
const filtersPassed = await this.executeFilters(workflowFilters, context);
if (!filtersPassed) {
return JobStatus.Skipped;
}
await this.executeActions(workflowActions, context);
this.logger.debug(`Workflow ${workflowId} executed successfully`);
return JobStatus.Success;
}
case PluginTriggerType.PersonRecognized: {
this.logger.error('unimplemented');
return JobStatus.Skipped;
}
default: {
this.logger.error(`Unknown workflow trigger type: ${type}`);
return JobStatus.Failed;
}
}
} catch (error) {
this.logger.error(`Error executing workflow ${workflowId}:`, error);
return JobStatus.Failed;
}
}
private async executeFilters(workflowFilters: WorkflowFilter[], context: WorkflowContext): Promise<boolean> {
for (const workflowFilter of workflowFilters) {
const filter = await this.pluginRepository.getFilter(workflowFilter.pluginFilterId);
if (!filter) {
this.logger.error(`Filter ${workflowFilter.pluginFilterId} not found`);
return false;
}
const pluginInstance = this.loadedPlugins.get(filter.pluginId);
if (!pluginInstance) {
this.logger.error(`Plugin ${filter.pluginId} not loaded`);
return false;
}
const filterInput: PluginInput = {
authToken: context.authToken,
config: workflowFilter.filterConfig,
data: {
asset: context.asset,
},
};
this.logger.debug(`Calling filter ${filter.methodName} with input: ${JSON.stringify(filterInput)}`);
const filterResult = await pluginInstance.call(
filter.methodName,
new TextEncoder().encode(JSON.stringify(filterInput)),
);
if (!filterResult) {
this.logger.error(`Filter ${filter.methodName} returned null`);
return false;
}
const result = JSON.parse(filterResult.text());
if (result.passed === false) {
this.logger.debug(`Filter ${filter.methodName} returned false, stopping workflow execution`);
return false;
}
}
return true;
}
private async executeActions(workflowActions: WorkflowAction[], context: WorkflowContext): Promise<void> {
for (const workflowAction of workflowActions) {
const action = await this.pluginRepository.getAction(workflowAction.pluginActionId);
if (!action) {
throw new Error(`Action ${workflowAction.pluginActionId} not found`);
}
const pluginInstance = this.loadedPlugins.get(action.pluginId);
if (!pluginInstance) {
throw new Error(`Plugin ${action.pluginId} not loaded`);
}
const actionInput: PluginInput = {
authToken: context.authToken,
config: workflowAction.actionConfig,
data: {
asset: context.asset,
},
};
this.logger.debug(`Calling action ${action.methodName} with input: ${JSON.stringify(actionInput)}`);
await pluginInstance.call(action.methodName, JSON.stringify(actionInput));
}
async searchMethods(dto: PluginMethodSearchDto): Promise<PluginMethodResponseDto[]> {
const methods = await this.pluginRepository.searchMethods(dto);
return methods
.filter((method) => !dto.trigger || isMethodCompatible(method, dto.trigger))
.map((method) => mapMethod(method));
}
}
@@ -0,0 +1,344 @@
import { CurrentPlugin } from '@extism/extism';
import { WorkflowChanges, WorkflowEventData, WorkflowEventPayload, WorkflowResponse } from '@immich/plugin-sdk';
import { HttpException, UnauthorizedException } from '@nestjs/common';
import _ from 'lodash';
import { join } from 'node:path';
import { OnEvent, OnJob } from 'src/decorators';
import { AlbumsAddAssetsDto } from 'src/dtos/album.dto';
import { BulkIdsDto } from 'src/dtos/asset-ids.response.dto';
import { AuthDto } from 'src/dtos/auth.dto';
import { PluginManifestDto } from 'src/dtos/plugin-manifest.dto';
import {
BootstrapEventPriority,
DatabaseLock,
ImmichWorker,
JobName,
JobStatus,
QueueName,
WorkflowTrigger,
WorkflowType,
} from 'src/enum';
import { ArgOf } from 'src/repositories/event.repository';
import { AlbumService } from 'src/services/album.service';
import { BaseService } from 'src/services/base.service';
import { JobOf } from 'src/types';
const dummy = () => {
throw new Error(
`Calling host functions is not allowed without setting methods[].hostFunctions=true in the plugin manifest`,
);
};
type ExecuteOptions<T extends WorkflowType> = {
read: (type: T) => Promise<{ authUserId: string; data: WorkflowEventData<T> }>;
write: (changes: WorkflowChanges<T>) => Promise<void>;
};
export class WorkflowExecutionService extends BaseService {
private jwtSecret!: string;
@OnEvent({ name: 'AppBootstrap', priority: BootstrapEventPriority.PluginSync, workers: [ImmichWorker.Microservices] })
async onPluginSync() {
await this.databaseRepository.withLock(DatabaseLock.PluginImport, async () => {
// TODO avoid importing plugins in each worker
// Can this use system metadata similar to geocoding?
const { resourcePaths, plugins } = this.configRepository.getEnv();
await this.importFolder(resourcePaths.corePlugin, { force: true });
if (plugins.external.allow && plugins.external.installFolder) {
await this.importFolders(plugins.external.installFolder);
}
});
}
@OnEvent({ name: 'AppBootstrap', priority: BootstrapEventPriority.PluginLoad, workers: [ImmichWorker.Microservices] })
async onPluginLoad() {
this.jwtSecret = this.cryptoRepository.randomBytesAsText(32);
const albumService = BaseService.create(AlbumService, this);
const albumAddAssets = this.wrap<[id: string, dto: BulkIdsDto]>((authDto, args) =>
albumService.addAssets(authDto, ...args),
);
const addAssetsToAlbums = this.wrap<[dto: AlbumsAddAssetsDto]>((authDto, args) =>
albumService.addAssetsToAlbums(authDto, ...args),
);
const functions = {
albumAddAssets,
addAssetsToAlbums,
};
const stubs = {
albumAddAssets: dummy,
addAssetsToAlbums: dummy,
};
const plugins = await this.pluginRepository.getForLoad();
for (const { id, name, version, wasmBytes, methods } of plugins) {
const method = methods.some(({ hostFunctions }) => !hostFunctions);
if (method) {
const label = `${name}@${version}`;
const key = this.getPluginKey({ id, hostFunctions: false });
try {
await this.pluginRepository.load({ key, label, wasmBytes }, { runInWorker: false, functions: stubs });
this.logger.log(`Loaded plugin: ${label}`);
} catch (error) {
this.logger.error(`Unable to load plugin ${label} (${id})`, error);
}
}
const methodWithFunction = methods.some(({ hostFunctions }) => hostFunctions);
if (methodWithFunction) {
const label = `${name}@${version}/worker`;
const key = this.getPluginKey({ id, hostFunctions: true });
try {
await this.pluginRepository.load({ key, label, wasmBytes }, { runInWorker: true, functions });
this.logger.log(`Loaded plugin with host functions: ${label}`);
} catch (error) {
this.logger.error(`Unable to load plugin with host functions ${label} (${id})`, error);
}
}
}
}
private getPluginKey({ id, hostFunctions }: { id: string; hostFunctions: boolean }) {
return id + (hostFunctions ? '/worker' : '');
}
private wrap<T>(fn: (authDto: AuthDto, args: T) => Promise<unknown>) {
return async (plugin: CurrentPlugin, offset: bigint) => {
try {
const handle = plugin.read(offset);
if (!handle) {
return plugin.store(
JSON.stringify({ success: false, status: 400, message: 'Called host function without input' }),
);
}
const { authToken, args } = handle.json() as { authToken: string; args: T };
if (!authToken) {
throw new Error('authToken is required');
}
const authDto = this.validate(authToken);
const response = await fn(authDto, args);
return plugin.store(JSON.stringify({ success: true, response }));
} catch (error: Error | any) {
if (error instanceof HttpException) {
this.logger.error(`Plugin host exception: ${error}`);
return plugin.store(
JSON.stringify({ success: false, status: error.getStatus(), message: error.getResponse() }),
);
}
this.logger.error(`Plugin host exception: ${error}`, error?.stack);
return plugin.store(
JSON.stringify({
success: false,
status: 500,
message: `Internal server error: ${error}`,
}),
);
}
};
}
private async importFolders(installFolder: string): Promise<void> {
try {
const entries = await this.storageRepository.readdirWithTypes(installFolder);
for (const entry of entries) {
if (!entry.isDirectory()) {
continue;
}
await this.importFolder(join(installFolder, entry.name));
}
} catch (error) {
this.logger.error(`Failed to import plugins folder ${installFolder}:`, error);
}
}
private async importFolder(folder: string, options?: { force?: boolean }) {
try {
const manifestPath = join(folder, 'manifest.json');
const dto = await this.storageRepository.readJsonFile(manifestPath);
const result = PluginManifestDto.schema.safeParse(dto);
if (!result.success) {
const issues = result.error.issues.map((issue) => ` - [${issue.path.join('.')}] ${issue.message}`).join('\n');
this.logger.warn(`Invalid plugin manifest at ${manifestPath}:\n${issues}`);
return;
}
const manifest = result.data;
const existing = await this.pluginRepository.getByName(manifest.name);
if (existing && existing.version === manifest.version && options?.force !== true) {
return;
}
const wasmPath = `${folder}/${manifest.wasmPath}`;
const wasmBytes = await this.storageRepository.readFile(wasmPath);
const plugin = await this.pluginRepository.upsert(
{
enabled: true,
name: manifest.name,
title: manifest.title,
description: manifest.description,
author: manifest.author,
version: manifest.version,
wasmBytes,
},
manifest.methods,
);
if (existing) {
this.logger.log(
`Upgraded plugin ${manifest.name} (${plugin.methods.length} methods) from ${existing.version} to ${manifest.version} `,
);
} else {
this.logger.log(
`Imported plugin ${manifest.name}@${manifest.version} (${plugin.methods.length} methods) from ${folder}`,
);
}
return manifest;
} catch {
this.logger.warn(`Failed to import plugin from ${folder}:`);
}
}
private validate(authToken: string): AuthDto {
try {
const jwt = this.cryptoRepository.verifyJwt<{ userId: string }>(authToken, this.jwtSecret);
if (!jwt.userId) {
throw new UnauthorizedException('Invalid token: missing userId');
}
return {
user: {
id: jwt.userId,
},
} as AuthDto;
} catch (error) {
this.logger.error('Token validation failed:', error);
throw new UnauthorizedException('Invalid token');
}
}
private sign(userId: string) {
return this.cryptoRepository.signJwt({ userId }, this.jwtSecret);
}
@OnEvent({ name: 'AssetCreate' })
async onAssetCreate({ asset }: ArgOf<'AssetCreate'>) {
const dto = { ownerId: asset.ownerId, trigger: WorkflowTrigger.AssetCreate };
const items = await this.workflowRepository.search(dto);
await this.jobRepository.queueAll(
items.map((workflow) => ({
name: JobName.WorkflowAssetCreate,
data: { workflowId: workflow.id, assetId: asset.id },
})),
);
}
@OnJob({ name: JobName.WorkflowAssetCreate, queue: QueueName.Workflow })
handleAssetCreate({ workflowId, assetId }: JobOf<JobName.WorkflowAssetCreate>) {
return this.execute(workflowId, (type) => {
switch (type) {
case WorkflowType.AssetV1: {
return {
read: async () => {
const asset = await this.workflowRepository.getForAssetV1(assetId);
return {
data: { asset } as any,
authUserId: asset.ownerId,
};
},
write: async (changes) => {
if (changes.asset) {
await this.assetRepository.update({
id: assetId,
..._.omitBy(
{
isFavorite: changes.asset?.isFavorite,
visibility: changes.asset?.visibility,
},
_.isUndefined,
),
});
}
},
} satisfies ExecuteOptions<typeof type>;
}
}
});
}
private async execute<T extends WorkflowType>(
workflowId: string,
getHandler: (type: T) => ExecuteOptions<T> | undefined,
) {
const workflow = await this.workflowRepository.getForWorkflowRun(workflowId);
if (!workflow) {
return;
}
// TODO infer from steps
const type = 'AssetV1' as T;
const handler = getHandler(type);
if (!handler) {
this.logger.error(`Misconfigured workflow ${workflowId}: no handler for type ${type}`);
return;
}
try {
const { read, write } = handler;
const readResult = await read(type);
let data = readResult.data;
for (const step of workflow.steps) {
const payload: WorkflowEventPayload = {
trigger: workflow.trigger,
type,
config: step.config ?? {},
workflow: {
id: workflowId,
authToken: this.sign(readResult.authUserId),
stepId: step.id,
},
data,
};
if (step.methodName.startsWith('noop')) {
continue;
}
const result = await this.pluginRepository.callMethod<WorkflowResponse<T>>(
{
pluginKey: this.getPluginKey({ id: step.pluginId, hostFunctions: step.hostFunctions }),
methodName: step.methodName,
},
payload,
);
if (result?.changes) {
await write(result.changes);
({ data } = await read(type));
}
const shouldContinue = result?.workflow?.continue ?? true;
if (!shouldContinue) {
break;
}
}
this.logger.debug(`Workflow ${workflowId} executed successfully`);
} catch (error) {
this.logger.error(`Error executing workflow ${workflowId}:`, error);
return JobStatus.Failed;
}
}
}
+67 -112
View File
@@ -1,159 +1,114 @@
import { WorkflowStepConfig } from '@immich/plugin-sdk';
import { BadRequestException, Injectable } from '@nestjs/common';
import { Workflow } from 'src/database';
import { AuthDto } from 'src/dtos/auth.dto';
import {
mapWorkflowAction,
mapWorkflowFilter,
mapWorkflow,
mapWorkflowShare,
WorkflowCreateDto,
WorkflowResponseDto,
WorkflowSearchDto,
WorkflowShareResponseDto,
WorkflowTriggerResponseDto,
WorkflowUpdateDto,
} from 'src/dtos/workflow.dto';
import { Permission, PluginContext, PluginTriggerType } from 'src/enum';
import { pluginTriggers } from 'src/plugins';
import { Permission, WorkflowTrigger } from 'src/enum';
import { PluginMethodSearchResponse } from 'src/repositories/plugin.repository';
import { BaseService } from 'src/services/base.service';
import { getWorkflowTriggers, isMethodCompatible, resolveMethod } from 'src/utils/workflow';
@Injectable()
export class WorkflowService extends BaseService {
async create(auth: AuthDto, dto: WorkflowCreateDto): Promise<WorkflowResponseDto> {
const context = this.getContextForTrigger(dto.triggerType);
const filterInserts = await this.validateAndMapFilters(dto.filters, context);
const actionInserts = await this.validateAndMapActions(dto.actions, context);
const workflow = await this.workflowRepository.createWorkflow(
{
ownerId: auth.user.id,
triggerType: dto.triggerType,
name: dto.name,
description: dto.description || '',
enabled: dto.enabled ?? true,
},
filterInserts,
actionInserts,
);
return this.mapWorkflow(workflow);
getTriggers(): WorkflowTriggerResponseDto[] {
return getWorkflowTriggers();
}
async getAll(auth: AuthDto): Promise<WorkflowResponseDto[]> {
const workflows = await this.workflowRepository.getWorkflowsByOwner(auth.user.id);
return Promise.all(workflows.map((workflow) => this.mapWorkflow(workflow)));
async search(auth: AuthDto, dto: WorkflowSearchDto): Promise<WorkflowResponseDto[]> {
const workflows = await this.workflowRepository.search({ ...dto, ownerId: auth.user.id });
return workflows.map((workflow) => mapWorkflow(workflow));
}
async get(auth: AuthDto, id: string): Promise<WorkflowResponseDto> {
await this.requireAccess({ auth, permission: Permission.WorkflowRead, ids: [id] });
const workflow = await this.findOrFail(id);
return this.mapWorkflow(workflow);
return mapWorkflow(workflow);
}
async share(auth: AuthDto, id: string): Promise<WorkflowShareResponseDto> {
await this.requireAccess({ auth, permission: Permission.WorkflowRead, ids: [id] });
const workflow = await this.findOrFail(id);
return mapWorkflowShare(workflow);
}
async create(auth: AuthDto, dto: WorkflowCreateDto): Promise<WorkflowResponseDto> {
const { steps: stepsDto, ...workflowDto } = dto;
const steps = await this.resolveAndValidateSteps(stepsDto ?? [], workflowDto.trigger);
const workflow = await this.workflowRepository.create(
{
...workflowDto,
ownerId: auth.user.id,
},
steps.map((step) => ({
enabled: step.enabled ?? true,
config: step.config as WorkflowStepConfig,
pluginMethodId: step.pluginMethod.id,
})),
);
return mapWorkflow({ ...workflow, steps: [] });
}
async update(auth: AuthDto, id: string, dto: WorkflowUpdateDto): Promise<WorkflowResponseDto> {
await this.requireAccess({ auth, permission: Permission.WorkflowUpdate, ids: [id] });
if (Object.values(dto).filter((prop) => prop !== undefined).length === 0) {
throw new BadRequestException('No fields to update');
}
const workflow = await this.findOrFail(id);
const context = this.getContextForTrigger(dto.triggerType ?? workflow.triggerType);
const { filters, actions, ...workflowUpdate } = dto;
const filterInserts = filters && (await this.validateAndMapFilters(filters, context));
const actionInserts = actions && (await this.validateAndMapActions(actions, context));
const updatedWorkflow = await this.workflowRepository.updateWorkflow(
const { steps: stepsDto, ...workflowDto } = dto;
const current = await this.findOrFail(id);
const steps = stepsDto ? await this.resolveAndValidateSteps(stepsDto, dto.trigger ?? current.trigger) : undefined;
const workflow = await this.workflowRepository.update(
id,
workflowUpdate,
filterInserts,
actionInserts,
workflowDto,
steps?.map((step) => ({
enabled: step.enabled ?? true,
config: step.config as WorkflowStepConfig,
pluginMethodId: step.pluginMethod.id,
})),
);
return this.mapWorkflow(updatedWorkflow);
return mapWorkflow(workflow);
}
async delete(auth: AuthDto, id: string): Promise<void> {
await this.requireAccess({ auth, permission: Permission.WorkflowDelete, ids: [id] });
await this.workflowRepository.deleteWorkflow(id);
await this.workflowRepository.delete(id);
}
private async validateAndMapFilters(
filters: Array<{ pluginFilterId: string; filterConfig?: any }>,
requiredContext: PluginContext,
) {
for (const dto of filters) {
const filter = await this.pluginRepository.getFilter(dto.pluginFilterId);
if (!filter) {
throw new BadRequestException(`Invalid filter ID: ${dto.pluginFilterId}`);
private async resolveAndValidateSteps<T extends { method: string }>(steps: T[], trigger: WorkflowTrigger) {
const methods = await this.pluginRepository.getForValidation();
const results: Array<T & { pluginMethod: PluginMethodSearchResponse }> = [];
for (const step of steps) {
const pluginMethod = resolveMethod(methods, step.method);
if (!pluginMethod) {
throw new BadRequestException(`Unknown method ${step.method}`);
}
if (!filter.supportedContexts.includes(requiredContext)) {
throw new BadRequestException(
`Filter "${filter.title}" does not support ${requiredContext} context. Supported contexts: ${filter.supportedContexts.join(', ')}`,
);
if (!isMethodCompatible(pluginMethod, trigger)) {
throw new BadRequestException(`Method "${step.method}" is incompatible with workflow trigger: "${trigger}"`);
}
results.push({ ...step, pluginMethod });
}
return filters.map((dto, index) => ({
pluginFilterId: dto.pluginFilterId,
filterConfig: dto.filterConfig || null,
order: index,
}));
}
// TODO make sure all steps can use a common WorkflowType
private async validateAndMapActions(
actions: Array<{ pluginActionId: string; actionConfig?: any }>,
requiredContext: PluginContext,
) {
for (const dto of actions) {
const action = await this.pluginRepository.getAction(dto.pluginActionId);
if (!action) {
throw new BadRequestException(`Invalid action ID: ${dto.pluginActionId}`);
}
if (!action.supportedContexts.includes(requiredContext)) {
throw new BadRequestException(
`Action "${action.title}" does not support ${requiredContext} context. Supported contexts: ${action.supportedContexts.join(', ')}`,
);
}
}
return actions.map((dto, index) => ({
pluginActionId: dto.pluginActionId,
actionConfig: dto.actionConfig || null,
order: index,
}));
}
private getContextForTrigger(type: PluginTriggerType) {
const trigger = pluginTriggers.find((t) => t.type === type);
if (!trigger) {
throw new BadRequestException(`Invalid trigger type: ${type}`);
}
return trigger.contextType;
return results;
}
private async findOrFail(id: string) {
const workflow = await this.workflowRepository.getWorkflow(id);
const workflow = await this.workflowRepository.get(id);
if (!workflow) {
throw new BadRequestException('Workflow not found');
}
return workflow;
}
private async mapWorkflow(workflow: Workflow): Promise<WorkflowResponseDto> {
const filters = await this.workflowRepository.getFilters(workflow.id);
const actions = await this.workflowRepository.getActions(workflow.id);
return {
id: workflow.id,
ownerId: workflow.ownerId,
triggerType: workflow.triggerType,
name: workflow.name,
description: workflow.description,
createdAt: workflow.createdAt.toISOString(),
enabled: workflow.enabled,
filters: filters.map((f) => mapWorkflowFilter(f)),
actions: actions.map((a) => mapWorkflowAction(a)),
};
}
}