项目名称: 企业级智能工作流自动化平台
项目目标: 旨在构建一个功能强大、易于扩展的自动化工作流平台,赋能企业用户通过可视化的方式设计、部署和执行各类业务流程。该平台将提供类似n8n、Zapier、Integromat等产品的核心能力,但更注重企业内部集成与自定义扩展性。
核心理念: 低代码/无代码、可扩展、高性能、易用。
技术栈概览:
// 定义节点类型
type NodeType = 'trigger' | 'http' | 'database' | 'condition' | 'loop' | 'customFunction' | string;
// 工作流节点基础配置
interface BaseNodeConfig {
name: string;
description?: string;
// 其他通用配置
}
// HTTP节点配置
interface HttpNodeConfig extends BaseNodeConfig {
method: 'GET' | 'POST' | 'PUT' | 'DELETE';
url: string;
headers?: Record<string, string>;
body?: Record<string, any> | string;
timeout?: number; // 毫秒
}
// 数据库节点配置
interface DatabaseNodeConfig extends BaseNodeConfig {
connectionId: string; // 数据库连接ID
operation: 'query' | 'insert' | 'update' | 'delete';
table?: string;
query?: string; // SQL查询语句
parameters?: Record<string, any>;
data?: Record<string, any>; // 插入或更新的数据
where?: Record<string, any>; // 更新或删除的条件
}
// 条件节点配置
interface ConditionNodeConfig extends BaseNodeConfig {
expression: string; // 条件表达式,支持JavaScript语法或自定义DSL
branches: Array<{
name: string;
value: any; // 表达式的预期结果
nextNodeId: string;
}>;
defaultNextNodeId?: string; // 默认分支
}
// 工作流节点
interface WorkflowNode {
id: string;
type: NodeType;
name: string;
position: { x: number; y: number };
config: HttpNodeConfig | DatabaseNodeConfig | ConditionNodeConfig | BaseNodeConfig;
inputs: Array<{ portId: string; type: string; label: string }>; // 输入端口
outputs: Array<{ portId: string; type: string; label: string }>; // 输出端口
metadata?: Record<string, any>;
}
// 节点之间的连接
interface Connection {
id: string;
source: {
nodeId: string;
port: string;
};
target: {
nodeId: string;
port: string;
};
}
// 整个工作流定义
interface Workflow {
id: string;
name: string;
description?: string;
nodes: WorkflowNode[];
connections: Connection[];
version: number;
createdAt: Date;
updatedAt: Date;
createdBy: string;
updatedBy: string;
status: 'draft' | 'published' | 'archived';
variables: Record<string, any>; // 工作流级别的变量
tags?: string[];
}
// 工作流执行上下文 (运行时数据)
interface ExecutionContext {
workflowId: string;
executionId: string;
startTime: Date;
status: 'running' | 'completed' | 'failed' | 'canceled';
variables: Record<string, any>; // 运行时变量
executionPath: Array<{ nodeId: string; startTime: Date; endTime?: Date; status?: string }>;
results: Map<string, NodeExecutionResult>; // 每个节点的执行结果
// ... 其他运行时数据
}
// 节点执行结果
interface NodeExecutionResult {
data: any; // 节点输出数据
status: 'success' | 'failed';
error?: string;
executionTime: number; // 毫秒
metadata?: Record<string, any>;
}
// 验证结果
interface ValidationResult {
isValid: boolean;
errors: string[];
}
import { ref, reactive, computed } from 'vue';
import type { Workflow, WorkflowNode, Connection, NodeType, ValidationResult } from '@/types/workflow';
// 假设有一个存储所有节点配置的映射
const nodeConfigs = {
trigger: {
name: '触发器',
inputs: [],
outputs: [{ portId: 'output', type: 'flow', label: '输出' }],
config: { /* 默认配置 */ }
},
http: {
name: 'HTTP请求',
inputs: [{ portId: 'input', type: 'flow', label: '输入' }],
outputs: [{ portId: 'success', type: 'flow', label: '成功' }, { portId: 'fail', type: 'flow', label: '失败' }],
config: { method: 'GET', url: '', headers: {}, body: {} }
},
database: {
name: '数据库操作',
inputs: [{ portId: 'input', type: 'flow', label: '输入' }],
outputs: [{ portId: 'success', type: 'flow', label: '成功' }, { portId: 'fail', type: 'flow', label: '失败' }],
config: { connectionId: '', operation: 'query' }
},
condition: {
name: '条件分支',
inputs: [{ portId: 'input', type: 'flow', label: '输入' }],
outputs: [{ portId: 'true', type: 'flow', label: '真' }, { portId: 'false', type: 'flow', label: '假' }], // 示例
config: { expression: '' }
},
// ... 更多节点类型
};
export function useWorkflowEditor() {
// 响应式的工作流数据
const workflow = reactive<Workflow>({
id: 'new_workflow_' + Date.now(),
name: '未命名工作流',
description: '',
nodes: [],
connections: [],
version: 1,
createdAt: new Date(),
updatedAt: new Date(),
createdBy: 'admin',
updatedBy: 'admin',
status: 'draft',
variables: {}
});
const selectedNode = ref<WorkflowNode | null>(null);
// 获取默认节点配置
const getDefaultNodeConfig = (nodeType: NodeType) => {
const config = (nodeConfigs as any)[nodeType];
return config ? JSON.parse(JSON.stringify(config)) : {}; // 深拷贝
};
// 添加新节点
const addNode = (nodeType: NodeType, position: { x: number; y: number }): WorkflowNode => {
const defaultNodeProps = getDefaultNodeConfig(nodeType);
const newNode: WorkflowNode = {
id: `node_${nodeType}_${Date.now()}`,
type: nodeType,
name: defaultNodeProps.name || `新${nodeType}节点`,
position,
inputs: defaultNodeProps.inputs || [],
outputs: defaultNodeProps.outputs || [],
config: defaultNodeProps.config || {} // 使用默认配置
// ... 可能还有其他默认属性,例如 ports
};
workflow.nodes.push(newNode);
return newNode;
};
// 连接节点
const connectNodes = (sourceId: string, sourcePort: string,
targetId: string, targetPort: string) => {
const connection: Connection = {
id: `conn_${Date.now()}`,
source: { nodeId: sourceId, port: sourcePort },
target: { nodeId: targetId, port: targetPort }
};
workflow.connections.push(connection);
};
// 根据ID删除节点
const deleteNodeById = (nodeId: string) => {
workflow.nodes = workflow.nodes.filter(node => node.id !== nodeId);
workflow.connections = workflow.connections.filter(
conn => conn.source.nodeId !== nodeId && conn.target.nodeId !== nodeId
);
};
// 更新节点位置
const updateNodePosition = (nodeId: string, newPosition: { x: number; y: number }) => {
const node = workflow.nodes.find(n => n.id === nodeId);
if (node) {
node.position = newPosition;
}
};
// 辅助函数:检测工作流中是否存在循环
function hasCycle(workflow: Workflow): boolean {
const graph: Map<string, string[]> = new Map();
workflow.nodes.forEach(node => graph.set(node.id, []));
workflow.connections.forEach(conn => {
const sourceOutputs = workflow.nodes.find(n => n.id === conn.source.nodeId)?.outputs;
// 只有数据流转的连接才构成图的边
if (sourceOutputs?.some(output => output.portId === conn.source.port && output.type === 'flow')) {
graph.get(conn.source.nodeId)?.push(conn.target.nodeId);
}
});
const visited: Set<string> = new Set();
const recursionStack: Set<string> = new Set();
function dfs(nodeId: string): boolean {
visited.add(nodeId);
recursionStack.add(nodeId);
for (const neighborId of graph.get(nodeId) || []) {
if (!visited.has(neighborId)) {
if (dfs(neighborId)) {
return true;
}
} else if (recursionStack.has(neighborId)) {
return true; // 发现循环
}
}
recursionStack.delete(nodeId);
return false;
}
for (const node of workflow.nodes) {
if (!visited.has(node.id)) {
if (dfs(node.id)) {
return true;
}
}
}
return false;
}
// 验证工作流
const validateWorkflow = computed<ValidationResult>(() => {
const errors: string[] = [];
// 检查是否有孤立节点 (除了触发器节点)
const connectedNodeIds = new Set<string>();
workflow.connections.forEach(conn => {
connectedNodeIds.add(conn.source.nodeId);
connectedNodeIds.add(conn.target.nodeId);
});
workflow.nodes.forEach(node => {
// 触发器节点可以没有入边,但一般要有出边
if (node.type !== 'trigger' && !connectedNodeIds.has(node.id) && workflow.connections.every(c => c.target.nodeId !== node.id)) {
errors.push(`节点 "${node.name}" (${node.id}) 是孤立节点或没有入边`);
}
// 触发器节点检查:必须有出边
if (node.type === 'trigger' && !workflow.connections.some(c => c.source.nodeId === node.id)) {
errors.push(`触发器节点 "${node.name}" (${node.id}) 没有出边`);
}
// 检查必填配置
// 这里可以根据 node.type 和 nodeConfigs 定义更详细的验证
// 例如:if (node.type === 'http' && !(node.config as HttpNodeConfig).url) { errors.push(...) }
});
// 检查是否有环
if (hasCycle(workflow)) {
errors.push('工作流中存在循环依赖,请检查连线');
}
// 检查是否有重复的节点ID (理论上在 addNode 时已避免,但作为防御性编程)
const nodeIds = new Set();
workflow.nodes.forEach(node => {
if (nodeIds.has(node.id)) {
errors.push(`存在重复的节点ID: ${node.id}`);
}
nodeIds.add(node.id);
});
// 检查连线的源和目标节点/端口是否存在
workflow.connections.forEach(conn => {
const sourceNode = workflow.nodes.find(n => n.id === conn.source.nodeId);
const targetNode = workflow.nodes.find(n => n.id === conn.target.nodeId);
if (!sourceNode) {
errors.push(`连线 ${conn.id} 的源节点 ${conn.source.nodeId} 不存在`);
} else if (!sourceNode.outputs.some(p => p.portId === conn.source.port)) {
errors.push(`连线 ${conn.id} 的源节点 ${sourceNode.name} 没有输出端口 ${conn.source.port}`);
}
if (!targetNode) {
errors.push(`连线 ${conn.id} 的目标节点 ${conn.target.nodeId} 不存在`);
} else if (!targetNode.inputs.some(p => p.portId === conn.target.port)) {
errors.push(`连线 ${conn.id} 的目标节点 ${targetNode.name} 没有输入端口 ${conn.target.port}`);
}
});
return {
isValid: errors.length === 0,
errors
};
});
// 简单的历史记录功能
const history = (() => {
const states: Workflow[] = [];
let currentIndex = -1;
const maxHistory = 20; // 最大历史记录数
const saveState = () => {
if (currentIndex < states.length - 1) {
states.splice(currentIndex + 1); // 截断重做历史
}
states.push(JSON.parse(JSON.stringify(workflow)));
if (states.length > maxHistory) {
states.shift(); // 移除最旧的
}
currentIndex = states.length - 1;
console.log('State saved. Current index:', currentIndex, 'Total states:', states.length);
};
const undo = () => {
if (currentIndex > 0) {
currentIndex--;
// 使用 Object.assign 确保 Vue 的响应式系统能正确追踪变化
Object.assign(workflow, states[currentIndex]);
console.log('Undo. Current index:', currentIndex);
}
};
const redo = () => {
if (currentIndex < states.length - 1) {
currentIndex++;
Object.assign(workflow, states[currentIndex]);
console.log('Redo. Current index:', currentIndex);
}
};
const canUndo = computed(() => currentIndex > 0);
const canRedo = computed(() => currentIndex < states.length - 1);
// 监听工作流变化并保存状态
// 这里需要更细粒度的控制,避免每次小变动都保存
// 例如,可以通过 debounce 或在特定操作后手动 saveState
// watch(workflow, saveState, { deep: true }); // 这个会过于频繁
return { saveState, undo, redo, canUndo, canRedo };
})();
// 初始化时保存一次状态,作为初始可撤销点
history.saveState();
return {
workflow,
selectedNode,
addNode,
connectNodes,
deleteNodeById,
updateNodePosition,
validateWorkflow,
history
};
}
import { EventEmitter } from 'events'; // 或自定义事件总线
import type { Workflow, WorkflowNode, ExecutionContext, NodeExecutionResult, NodeType, ValidationResult } from '@/types/workflow';
// 节点执行器接口
interface NodeExecutor {
type: NodeType;
execute: (node: WorkflowNode, context: ExecutionContext, inputData?: any)
=> Promise<NodeExecutionResult>;
validateConfig?: (config: any) => ValidationResult;
}
// 示例:模拟的HTTP节点配置
interface HttpNodeConfig {
method: 'GET' | 'POST' | 'PUT' | 'DELETE';
url: string;
headers?: Record<string, string>;
body?: Record<string, any> | string;
timeout?: number;
}
// 示例:模拟的数据库节点配置
interface DatabaseNodeConfig {
connectionId: string;
operation: 'query' | 'insert' | 'update' | 'delete';
table?: string;
query?: string;
parameters?: Record<string, any>;
data?: Record<string, any>;
where?: Record<string, any>;
}
// 模拟的Fetch API和sleep函数
async function fetch(url: string, options: any): Promise<any> {
console.log(`模拟请求: ${options.method} ${url} with body: ${options.body}`);
await new Promise(resolve => setTimeout(resolve, 100 + Math.random() * 500)); // 模拟网络延迟
return {
json: () => Promise.resolve({ success: true, message: 'Simulated response' }),
status: 200,
headers: { entries: () => [] }
};
}
function sleep(ms: number) { return new Promise(resolve => setTimeout(resolve, ms)); }
// 模拟的数据库连接
class MockDBConnection {
constructor(private id: string) {}
async query(sql: string, params: any) {
console.log(`DB[${this.id}] Query: ${sql}, Params: ${JSON.stringify(params)}`);
await sleep(50);
return [{ id: 1, name: 'Test' }];
}
async insert(table: string, data: any) {
console.log(`DB[${this.id}] Insert into ${table}: ${JSON.stringify(data)}`);
await sleep(50);
return { rowCount: 1, insertId: 101 };
}
async update(table: string, data: any, where: any) {
console.log(`DB[${this.id}] Update ${table} set ${JSON.stringify(data)} where ${JSON.stringify(where)}`);
await sleep(50);
return { rowCount: 1 };
}
async delete(table: string, where: any) {
console.log(`DB[${this.id}] Delete from ${table} where ${JSON.stringify(where)}`);
await sleep(50);
return { rowCount: 1 };
}
async release() { console.log(`DB[${this.id}] Connection released.`); }
}
// 工作流执行引擎
class WorkflowExecutionEngine {
private executionQueue: Array<{ node: WorkflowNode; inputData: any; context: ExecutionContext }> = [];
private nodeExecutors: Map<NodeType, NodeExecutor> = new Map();
private eventBus: EventEmitter; // 事件总线,用于跨模块通信,例如日志、监控
private workflowData: Map<string, Workflow> = new Map(); // 存储已加载的工作流定义
constructor() {
this.eventBus = new EventEmitter();
this.registerDefaultExecutors();
}
// 注册默认节点执行器
private registerDefaultExecutors() {
this.registerNodeExecutor('http', new HttpNodeExecutor());
this.registerNodeExecutor('database', new DatabaseNodeExecutor());
// ... 注册其他默认执行器
}
// 加载工作流定义 (这里简化为从内存Map加载)
private async loadWorkflow(workflowId: string): Promise<Workflow> {
// 实际应用中会从数据库或缓存加载
const workflow = this.workflowData.get(workflowId);
if (!workflow) {
throw new Error(`工作流 ${workflowId} 未找到`);
}
return workflow;
}
// 存储工作流定义 (用于测试)
public addWorkflowDefinition(workflow: Workflow) {
this.workflowData.set(workflow.id, workflow);
}
private generateExecutionId(): string {
return `exec_${Date.now()}_${Math.random().toString(36).substr(2, 9)}`;
}
// 注册节点执行器
registerNodeExecutor(type: NodeType, executor: NodeExecutor) {
this.nodeExecutors.set(type, executor);
}
// 执行工作流
async executeWorkflow(workflowId: string, triggerData?: any): Promise<{ executionId: string; status: string; startTime: Date }> {
const workflow = await this.loadWorkflow(workflowId);
const executionId = this.generateExecutionId();
// 创建执行上下文
const context: ExecutionContext = {
workflowId,
executionId,
startTime: new Date(),
status: 'running',
variables: { ...workflow.variables, ...triggerData }, // 复制工作流变量并合并触发数据
executionPath: [],
results: new Map()
};
// 获取触发器节点
const triggerNode = workflow.nodes.find(n => n.type === 'trigger');
if (!triggerNode) {
throw new Error('工作流没有触发器节点');
}
// 从触发器开始执行
await this.executeNode(triggerNode, context, triggerData);
// 异步执行后续节点
// 实际生产中,这里会使用消息队列或工作池来异步处理
this.processExecutionQueue(context);
return {
executionId,
status: context.status,
startTime: context.startTime
};
}
// 处理执行队列
private async processExecutionQueue(context: ExecutionContext) {
// 简单的循环处理,实际中会是事件驱动或消费者模式
while (this.executionQueue.length > 0) {
const { node, inputData, context: currentContext } = this.executionQueue.shift()!;
await this.executeNode(node, currentContext, inputData);
}
// 所有节点执行完毕,更新工作流状态
context.status = 'completed';
this.eventBus.emit('workflowCompleted', context.executionId, context.results);
}
// 执行单个节点
async executeNode(node: WorkflowNode, context: ExecutionContext, inputData?: any) {
const executor = this.nodeExecutors.get(node.type);
if (!executor) {
throw new Error(`未找到节点类型 ${node.type} 的执行器`);
}
const startTime = Date.now();
const nodeExecutionEntry = {
nodeId: node.id,
startTime: new Date(startTime),
status: 'running' as 'running' | 'completed' | 'failed'
};
context.executionPath.push(nodeExecutionEntry);
this.eventBus.emit('nodeStarted', context.executionId, node.id);
try {
// 执行节点逻辑
const result = await executor.execute(node, context, inputData);
// 更新节点执行记录
nodeExecutionEntry.endTime = new Date();
nodeExecutionEntry.status = 'completed';
context.results.set(node.id, {
data: result.data,
status: 'success',
executionTime: Date.now() - startTime,
metadata: result.metadata
});
this.eventBus.emit('nodeCompleted', context.executionId, node.id, result.data);
// 将节点输出数据添加到上下文变量,供后续节点使用
// 命名约定,例如 output_nodeId
context.variables[`output_${node.id}`] = result.data;
// 查找后续节点
const nextNodes = this.findNextNodes(node, context.workflowId);
// 调度后续节点
for (const nextConn of nextNodes) {
const nextNode = (await this.loadWorkflow(context.workflowId)).nodes.find(n => n.id === nextConn.target.nodeId);
if (nextNode && this.shouldExecuteNode(nextNode, result, context, nextConn.source.port)) {
// 将任务加入队列
this.executionQueue.push({
node: nextNode,
inputData: result.data, // 传递当前节点的输出作为下一个节点的输入
context // 传递当前的执行上下文
});
}
}
} catch (error: any) {
nodeExecutionEntry.endTime = new Date();
nodeExecutionEntry.status = 'failed';
context.results.set(node.id, {
data: null,
status: 'failed',
error: error.message,
executionTime: Date.now() - startTime
});
context.status = 'failed'; // 工作流状态也变为失败
this.eventBus.emit('nodeFailed', context.executionId, node.id, error.message);
// 错误处理策略 (例如重试、通知等)
await this.handleExecutionError(node, context, error);
}
}
// 查找下一个节点(根据连线)
private findNextNodes(currentNode: WorkflowNode, workflowId: string): Connection[] {
const workflow = this.workflowData.get(workflowId);
if (!workflow) return [];
// 找到从当前节点发出的所有连线
return workflow.connections.filter(
conn => conn.source.nodeId === currentNode.id
);
}
// 判断是否应该执行下一个节点 (例如条件分支)
private shouldExecuteNode(nextNode: WorkflowNode, prevResult: NodeExecutionResult, context: ExecutionContext, sourcePort: string): boolean {
// 示例:条件节点根据 prevResult 和 sourcePort 判断
if (nextNode.type === 'condition' && prevResult.data !== undefined) {
// 假设条件节点有一个 config.expression
// 并且输出端口 'true' 和 'false'
// prevResult.data 就是上一个节点传过来的数据
// sourcePort 则是上一个节点哪一个输出端口被连接到这个条件节点
// 这里需要更复杂的逻辑来评估条件
// 简化的例子:如果上一个节点是条件节点,它会通过不同的端口输出,这里根据端口判断
const isTrueBranch = sourcePort === 'true'; // 假设上一个节点是条件节点,且结果为真
const isFalseBranch = sourcePort === 'false'; // 假设结果为假
// 如果当前节点是条件节点,它将自己判断,而不是被上一个节点判断是否执行
// 因此,这里应该是根据条件节点自身的配置和输入数据来决定哪个分支被激活
// 这里的 `shouldExecuteNode` 更适用于判断上游节点输出是否满足触发下游节点执行的条件
return true; // 暂时简化为总是执行
}
return true; // 默认总是执行
}
private async handleExecutionError(node: WorkflowNode, context: ExecutionContext, error: any) {
console.error(`节点 ${node.name} (${node.id}) 执行失败:`, error.message);
// 实现重试逻辑
// 实现错误通知
// 实现回滚操作等
}
// 辅助函数:变量替换
private replaceVariables(text: string, variables: Record<string, any>): string {
if (!text) return text;
return text.replace(/\{\{([^}]+)\}\}/g, (match, variableName) => {
// 简单支持点语法访问对象属性,例如 'output_node1.data.id'
const value = variableName.split('.').reduce((obj: any, key: string) => obj && obj[key], variables);
return value ?? '';
});
}
private replaceVariablesInObject(obj: Record<string, any>, variables: Record<string, any>): Record<string, any> {
if (!obj) return {};
const newObj: Record<string, any> = {};
for (const key in obj) {
if (typeof obj[key] === 'string') {
newObj[key] = this.replaceVariables(obj[key], variables);
} else if (typeof obj[key] === 'object' && obj[key] !== null) {
newObj[key] = this.replaceVariablesInObject(obj[key], variables);
} else {
newObj[key] = obj[key];
}
}
return newObj;
}
}
// HTTP节点执行器实现
class HttpNodeExecutor implements NodeExecutor {
type: NodeType = 'http';
async execute(node: WorkflowNode, context: ExecutionContext, inputData?: any) {
const config = node.config as HttpNodeConfig;
// 构建请求参数,支持变量替换
const url = this.replaceVariables(config.url, context.variables);
const headers = this.replaceVariablesInObject(config.headers || {}, context.variables);
let body: string | undefined;
if (config.method !== 'GET' && config.body) {
if (typeof config.body === 'string') {
body = this.replaceVariables(config.body, context.variables);
} else {
body = JSON.stringify(this.replaceVariablesInObject(config.body, context.variables));
}
}
const requestOptions = {
method: config.method,
headers,
body: body,
// timeout: config.timeout // fetch API不支持直接的timeout,需要使用AbortController
};
const response = await fetch(url, requestOptions);
const data = await response.json();
return {
data,
metadata: {
statusCode: response.status,
// headers: Object.fromEntries(response.headers.entries()), // fetch HeadersIterator
responseTime: Date.now()
}
};
}
validateConfig(config: any): ValidationResult {
const errors: string[] = [];
if (!config.url) {
errors.push('URL不能为空');
}
if (!['GET', 'POST', 'PUT', 'DELETE'].includes(config.method)) {
errors.push('请求方法无效');
}
try {
if (config.body && typeof config.body !== 'string') {
JSON.parse(JSON.stringify(config.body)); // 确保是有效的JSON对象
}
} catch {
errors.push('请求体必须是有效的JSON对象或字符串');
}
return {
isValid: errors.length === 0,
errors
};
}
private replaceVariables(text: string, variables: Record<string, any>): string {
if (!text) return text;
return text.replace(/\{\{([^}]+)\}\}/g, (match, variableName) => {
const value = variableName.split('.').reduce((obj: any, key: string) => obj && obj[key], variables);
return value ?? '';
});
}
private replaceVariablesInObject(obj: Record<string, any>, variables: Record<string, any>): Record<string, any> {
if (!obj) return {};
const newObj: Record<string, any> = {};
for (const key in obj) {
if (typeof obj[key] === 'string') {
newObj[key] = this.replaceVariables(obj[key], variables);
} else if (typeof obj[key] === 'object' && obj[key] !== null) {
newObj[key] = this.replaceVariablesInObject(obj[key], variables);
} else {
newObj[key] = obj[key];
}
}
return newObj;
}
}
// 数据库节点执行器实现
class DatabaseNodeExecutor implements NodeExecutor {
type: NodeType = 'database';
private async getConnection(connectionId: string): Promise<MockDBConnection> {
// 实际中会从连接池获取
console.log(`获取数据库连接: ${connectionId}`);
await sleep(20);
return new MockDBConnection(connectionId);
}
async execute(node: WorkflowNode, context: ExecutionContext, inputData?: any) {
const config = node.config as DatabaseNodeConfig;
const connection = await this.getConnection(config.connectionId);
try {
let result: any;
// 替换查询参数中的变量
const parameters = config.parameters ? this.replaceVariablesInObject(config.parameters, context.variables) : undefined;
const data = config.data ? this.replaceVariablesInObject(config.data, context.variables) : undefined;
const where = config.where ? this.replaceVariablesInObject(config.where, context.variables) : undefined;
switch (config.operation) {
case 'query':
result = await connection.query(config.query!, parameters);
break;
case 'insert':
result = await connection.insert(config.table!, inputData || data);
break;
case 'update':
result = await connection.update(
config.table!,
inputData || data,
where
);
break;
case 'delete':
result = await connection.delete(config.table!, where);
break;
default:
throw new Error(`不支持的数据库操作: ${config.operation}`);
}
return {
data: result,
metadata: {
rowCount: result.rowCount || result.length || 0,
operation: config.operation
}
};
} finally {
await connection.release();
}
}
validateConfig(config: any): ValidationResult {
const errors: string[] = [];
if (!config.connectionId) errors.push('数据库连接ID不能为空');
if (!config.operation) errors.push('数据库操作不能为空');
switch (config.operation) {
case 'query':
if (!config.query) errors.push('查询操作需要SQL查询语句');
break;
case 'insert':
if (!config.table) errors.push('插入操作需要表名');
if (!config.data && !config.inputDataPlaceholder) errors.push('插入操作需要数据或指定输入数据');
break;
case 'update':
if (!config.table) errors.push('更新操作需要表名');
if (!config.data && !config.inputDataPlaceholder) errors.push('更新操作需要数据或指定输入数据');
if (!config.where) errors.push('更新操作需要条件');
break;
case 'delete':
if (!config.table) errors.push('删除操作需要表名');
if (!config.where) errors.push('删除操作需要条件');
break;
}
return { isValid: errors.length === 0, errors };
}
private replaceVariables(text: string, variables: Record<string, any>): string {
if (!text) return text;
return text.replace(/\{\{([^}]+)\}\}/g, (match, variableName) => {
const value = variableName.split('.').reduce((obj: any, key: string) => obj && obj[key], variables);
return value ?? '';
});
}
private replaceVariablesInObject(obj: Record<string, any>, variables: Record<string, any>): Record<string, any> {
if (!obj) return {};
const newObj: Record<string, any> = {};
for (const key in obj) {
if (typeof obj[key] === 'string') {
newObj[key] = this.replaceVariables(obj[key], variables);
} else if (typeof obj[key] === 'object' && obj[key] !== null) {
newObj[key] = this.replaceVariablesInObject(obj[key], variables);
} else {
newObj[key] = obj[key];
}
}
return newObj;
}
}
// 节点执行器接口 (已在4.1中定义,这里重复是为了上下文完整性)
// interface NodeExecutor {
// type: NodeType;
// execute: (node: WorkflowNode, context: ExecutionContext, inputData?: any)
// => Promise<NodeExecutionResult>;
// validateConfig?: (config: any) => ValidationResult;
// }
// HTTP节点执行器 (已在4.1中实现)
// class HttpNodeExecutor implements NodeExecutor { ... }
// 数据库节点执行器 (已在4.1中实现)
// class DatabaseNodeExecutor implements NodeExecutor { ... }
// 其他节点执行器示例 (伪代码)
// 条件节点执行器
class ConditionNodeExecutor implements NodeExecutor {
type: NodeType = 'condition';
async execute(node: WorkflowNode, context: ExecutionContext, inputData?: any) {
const config = node.config as ConditionNodeConfig;
// 假设 config.expression 是一个可以被执行的JavaScript表达式字符串
// 或者是一个自定义DSL,需要解析器进行解析
// 这里简化处理,直接评估一个布尔值
let conditionResult = false;
try {
// 在真实环境中,需要一个安全的沙箱环境来执行用户定义的表达式
// eval('return ' + this.replaceVariables(config.expression, context.variables))
// 简单示例:判断 inputData 是否为真
conditionResult = !!inputData; // 非常简化的逻辑
console.log(`条件节点 ${node.name} 评估结果: ${conditionResult}`);
} catch (e: any) {
throw new Error(`条件表达式评估失败: ${e.message}`);
}
// 条件节点通常不直接输出数据,而是通过不同端口引导流程
// 返回一个指示条件结果的元数据
return {
data: inputData, // 原封不动地传递输入数据
metadata: {
conditionMet: conditionResult,
outputPort: conditionResult ? 'true' : 'false' // 指示流程应从哪个端口继续
}
};
}
validateConfig(config: any): ValidationResult {
const errors: string[] = [];
if (!config.expression) {
errors.push('条件节点必须包含表达式');
}
// 更多表达式语法验证
return { isValid: errors.length === 0, errors };
}
}
// 循环节点执行器 (for-each 循环)
class LoopNodeExecutor implements NodeExecutor {
type: NodeType = 'loop';
async execute(node: WorkflowNode, context: ExecutionContext, inputData?: any) {
// 假设配置包含一个 `iterateOn` 字段,指定要循环的数组
// 例如:`iterateOn: 'inputData.items'`
const config = node.config as { iterateOn: string; nextNodeIdAfterLoop: string };
const itemsToIterate = config.iterateOn.split('.').reduce((obj: any, key: string) => obj && obj[key], inputData || context.variables);
if (!Array.isArray(itemsToIterate)) {
throw new Error(`循环节点 "${node.name}" 需要一个数组进行迭代,但得到的是: ${typeof itemsToIterate}`);
}
const loopResults: any[] = [];
for (const item of itemsToIterate) {
// 为每次循环创建新的子上下文或更新当前上下文
const loopContext = { ...context, variables: { ...context.variables, currentLoopItem: item } };
// 触发内部子工作流或直接执行连接到循环体入口的节点
// 这通常涉及到更复杂的子工作流调度逻辑
console.log(`Loop node "${node.name}" iterating on item:`, item);
// 实际中这里会调度一个 "子执行"
// await this.executeChildWorkflow(node.id + '_loop_body', loopContext, item);
loopResults.push({ item, status: 'processed' }); // 模拟结果
}
return {
data: loopResults,
metadata: {
totalIterations: itemsToIterate.length,
loopCompleted: true
}
};
}
validateConfig(config: any): ValidationResult {
const errors: string[] = [];
if (!config.iterateOn) {
errors.push('循环节点必须指定迭代的数组路径');
}
return { isValid: errors.length === 0, errors };
}
}
// 使用Git-like的版本控制
interface WorkflowVersion {
workflowId: string;
version: number;
createdAt: Date;
createdBy: string;
nodes: WorkflowNode[];
connections: Connection[];
variables: Record<string, any>;
metadata?: Record<string, any>;
}
// 版本管理服务
class WorkflowVersioningService {
private versions: Map<string, WorkflowVersion[]> = new Map(); // 模拟存储
async createVersion(workflow: Workflow, createdBy: string = 'system'): Promise<WorkflowVersion> {
const workflowVersions = this.versions.get(workflow.id) || [];
const latestVersion = workflowVersions.length > 0 ? workflowVersions[workflowVersions.length - 1] : null;
const newVersionNumber = (latestVersion?.version || 0) + 1;
const version: WorkflowVersion = {
workflowId: workflow.id,
version: newVersionNumber,
createdAt: new Date(),
createdBy: createdBy,
// 深拷贝当前工作流的定义
nodes: JSON.parse(JSON.stringify(workflow.nodes)),
connections: JSON.parse(JSON.stringify(workflow.connections)),
variables: JSON.parse(JSON.stringify(workflow.variables))
};
workflowVersions.push(version);
this.versions.set(workflow.id, workflowVersions); // 更新存储
console.log(`工作流 ${workflow.name} 创建新版本: ${newVersionNumber}`);
return version;
}
async getLatestVersion(workflowId: string): Promise<WorkflowVersion | null> {
const workflowVersions = this.versions.get(workflowId);
return workflowVersions && workflowVersions.length > 0 ? workflowVersions[workflowVersions.length - 1] : null;
}
async getVersion(workflowId: string, versionNumber: number): Promise<WorkflowVersion | null> {
const workflowVersions = this.versions.get(workflowId);
return workflowVersions?.find(v => v.version === versionNumber) || null;
}
async diffVersions(workflowId: string, sourceVersionNum: number, targetVersionNum: number): Promise<VersionDiff> {
const sourceVersion = await this.getVersion(workflowId, sourceVersionNum);
const targetVersion = await this.getVersion(workflowId, targetVersionNum);
if (!sourceVersion || !targetVersion) {
throw new Error('版本不存在');
}
// 实际的diff算法会比较 nodes, connections, variables 等的变化
// 这里仅为示例,返回模拟差异
const addedNodes = targetVersion.nodes.filter(tn => !sourceVersion.nodes.some(sn => sn.id === tn.id));
const removedNodes = sourceVersion.nodes.filter(sn => !targetVersion.nodes.some(tn => tn.id === sn.id));
const modifiedNodes = targetVersion.nodes.filter(tn =>
sourceVersion.nodes.some(sn => sn.id === tn.id && JSON.stringify(sn) !== JSON.stringify(tn))
);
return {
added: addedNodes.map(n => ({ type: 'node', id: n.id, name: n.name })),
removed: removedNodes.map(n => ({ type: 'node', id: n.id, name: n.name })),
modified: modifiedNodes.map(n => ({ type: 'node', id: n.id, name: n.name })),
conflicts: [] // 实际需要复杂的冲突检测
};
}
}
interface VersionDiff {
added: Array<{ type: string; id: string; name: string }>;
removed: Array<{ type: string; id: string; name: string }>;
modified: Array<{ type: string; id: string; name: string }>;
conflicts: Array<{ type: string; id: string; description: string }>;
}
import { createClient, RedisClientType } from 'redis'; // 假设使用redis npm包
import { Worker, WorkerPool } from 'workerpool'; // 模拟一个工作进程池库
class DistributedExecutionManager {
private redisClient: RedisClientType; // Redis客户端实例
private workerPool: any; // 模拟的WorkerPool实例
constructor() {
// 实际应用中需要配置Redis连接
this.redisClient = createClient({
url: process.env.REDIS_URL || 'redis://localhost:6379'
});
this.redisClient.connect().catch(console.error); // 连接Redis
// 模拟一个工作进程池
this.workerPool = {
maxWorkers: process.env.MAX_WORKERS ? parseInt(process.env.MAX_WORKERS) : 10,
workerScript: './execution-worker.js',
notify: () => console.log('Worker pool notified for new jobs.'),
// 模拟 run 方法
run: async (job: any) => {
console.log(`Worker processing job: ${job.jobId}`);
await sleep(Math.random() * 1000 + 500); // 模拟工作
if (Math.random() > 0.9) throw new Error('Simulated worker error');
return { status: 'completed', result: 'Worker processed successfully' };
}
};
}
// 调度工作流到队列
async scheduleWorkflow(workflowId: string, triggerData: any, priority: number = 0) {
const jobId = `job_${workflowId}_${Date.now()}`;
const jobPayload = {
jobId,
workflowId,
triggerData,
scheduledAt: new Date().toISOString(),
priority,
retryCount: 0
};
// 推送到优先级队列 (Redis Sorted Set)
await this.redisClient.zAdd(
'workflow_jobs',
{ score: priority, value: JSON.stringify(jobPayload) }
);
console.log(`工作流 ${workflowId} (Job: ${jobId}) 已调度,优先级: ${priority}`);
this.workerPool.notify(); // 通知工作进程有新任务
return jobId;
}
// 工作进程处理逻辑 (在单独的进程或Web Worker中运行)
async workerProcess() {
console.log('工作进程启动...');
while (true) {
try {
// 从优先级队列获取分数最低 (优先级最高) 的一个作业
const jobData = await this.redisClient.zPopMin('workflow_jobs', 1);
if (!jobData || jobData.length === 0) {
await sleep(1000); // 如果队列为空,等待1秒
continue;
}
const job = JSON.parse(jobData[0].value);
console.log(`工作进程获取到作业: ${job.jobId}`);
try {
// 模拟执行工作流
// 在实际中,这里会调用 WorkflowExecutionEngine.executeWorkflow
const executionResult = await this.workerPool.run(job); // 模拟执行
console.log(`作业 ${job.jobId} 执行完成:`, executionResult);
await this.recordExecutionResult(job.jobId, executionResult);
} catch (error: any) {
console.error(`作业 ${job.jobId} 执行失败: ${error.message}`);
await this.handleJobError(job.jobId, error);
// 根据重试策略决定是否重试
if (this.shouldRetry(job)) {
job.retryCount = (job.retryCount || 0) + 1;
const newPriority = job.priority + job.retryCount * 10; // 降低优先级
console.log(`作业 ${job.jobId} 重试 (第 ${job.retryCount} 次),新优先级: ${newPriority}`);
await this.redisClient.zAdd(
'workflow_jobs',
{ score: newPriority, value: JSON.stringify(job) }
);
} else {
console.log(`作业 ${job.jobId} 达到最大重试次数,不再重试。`);
}
}
} catch (e: any) {
console.error('工作进程发生未捕获错误:', e.message);
await sleep(5000); // 错误时等待更长时间
}
}
}
private async recordExecutionResult(jobId: string, result: any) {
// 存储执行结果到MongoDB或PostgreSQL
console.log(`记录作业 ${jobId} 的执行结果...`);
// 实际:await db.saveExecutionResult({ jobId, ...result });
}
private async handleJobError(jobId: string, error: any) {
// 记录错误日志,发送告警等
console.error(`处理作业 ${jobId} 的错误: ${error.message}`);
}
private shouldRetry(job: any): boolean {
const maxRetries = 3; // 最大重试次数
return (job.retryCount || 0) < maxRetries;
}
}
shallowRef和shallowReactive避免不必要的深度响应,特别是在处理大型、不频繁变化的图表数据时。computed缓存计算结果,避免重复计算。vm模块或Docker容器,防止恶意代码对服务器造成破坏。作为全栈架构师的技术建议:
| 角色 | 人数 | 主要职责 | 技能要求 |
|---|---|---|---|
| 前端开发工程师 | 2-3人 | Vue3开发、可视化组件、用户体验 | Vue3、TypeScript、D3/X6/GoJS、Element Plus |
| 后端开发工程师 | 2-3人 | API开发、工作流引擎、数据库设计、微服务 | Node.js/Go、PostgreSQL/MongoDB、Redis、ORM/GORM |
| DevOps工程师 | 1人 | 部署、监控、CI/CD、容器化 | Docker、Kubernetes、AWS/Aliyun、Terraform |
| 测试工程师 | 1人 | 自动化测试、性能测试、E2E测试 | Vitest、Playwright/Cypress、JMeter/K6 |
<template>
<div class="workflow-editor">
<div class="toolbar">
<el-button type="primary" @click="saveWorkflow">保存</el-button>
<el-button type="success" @click="runWorkflow">运行</el-button>
<el-button @click="undo" :disabled="!history.canUndo.value">撤销</el-button>
<el-button @click="redo" :disabled="!history.canRedo.value">重做</el-button>
<el-button @click="debugDialogVisible = true">调试面板</el-button>
</div>
<div class="editor-container">
<div class="node-palette">
<h3>节点库</h3>
<div
v-for="nodeType in availableNodeTypes"
:key="nodeType.type"
class="node-item"
draggable="true"
@dragstart="onDragStart($event, nodeType.type)"
>
<i :class="nodeType.icon"></i> {{ nodeType.label }}
</div>
</div>
<div
class="canvas-container"
ref="canvasRef"
@drop="onDrop"
@dragover.prevent
>
<!-- X6Graph 组件, 负责渲染流程图 -->
<X6Graph
:data="graphData"
@node:click="onNodeClick"
@edge:connected="onEdgeAdded"
@blank:click="onBlankClick"
/>
</div>
<div class="properties-panel" v-if="selectedNode">
<h3>节点属性 - {{ selectedNode.name }}</h3>
<el-form label-width="100px">
<el-form-item label="节点名称">
<el-input v-model="selectedNode.name" @input="history.saveState()"></el-input>
</el-form-item>
<!-- 动态加载节点配置组件 -->
<component
:is="getConfigComponent(selectedNode.type)"
v-model:config="selectedNode.config"
:node-id="selectedNode.id"
@config-change="history.saveState()" <!-- 配置变化时保存状态 -->
/>
<el-button type="danger" size="small" @click="deleteSelectedNode">删除节点</el-button>
</el-form>
</div>
</div>
<!-- 调试面板 Dialog -->
<el-dialog v-model="debugDialogVisible" title="工作流调试面板" width="80%">
<WorkflowDebugger :workflow-id="workflow.id" />
</el-dialog>
</div>
</template>
<script setup lang="ts">
import { ref, reactive, computed, onMounted, watch } from 'vue';
import { useWorkflowStore } from '@/stores/workflow';
import { ElMessage, ElMessageBox } from 'element-plus';
import type { Workflow, WorkflowNode, Connection } from '@/types/workflow';
import { useWorkflowEditor } from '@/composables/useWorkflowEditor'; // 导入核心逻辑
// 假设这些组件和类型已经定义
import X6Graph from '@/components/workflow/X6Graph.vue';
import WorkflowDebugger from '@/components/workflow/WorkflowDebugger.vue';
import HttpNodeConfigEditor from '@/components/node-config/HttpNodeConfigEditor.vue';
import DatabaseNodeConfigEditor from '@/components/node-config/DatabaseNodeConfigEditor.vue';
import ConditionNodeConfigEditor from '@/components/node-config/ConditionNodeConfigEditor.vue';
// ... 导入其他节点配置编辑器
// 节点类型列表及其显示信息
const availableNodeTypes = reactive([
{ type: 'trigger', label: '触发器', icon: 'el-icon-timer' },
{ type: 'http', label: 'HTTP请求', icon: 'el-icon-link' },
{ type: 'database', label: '数据库', icon: 'el-icon-coin' },
{ type: 'condition', label: '条件判断', icon: 'el-icon-question' },
{ type: 'loop', label: '循环', icon: 'el-icon-refresh-left' },
{ type: 'customFunction', label: '自定义函数', icon: 'el-icon-s-tools' },
]);
// 动态映射节点类型到对应的配置组件
const nodeConfigComponents: Record<string, any> = {
http: HttpNodeConfigEditor,
database: DatabaseNodeConfigEditor,
condition: ConditionNodeConfigEditor,
// ... 更多节点配置组件
};
const getConfigComponent = (nodeType: string) => {
return nodeConfigComponents[nodeType] || null; // 返回对应的Vue组件
};
// 组件逻辑
const workflowStore = useWorkflowStore(); // 假设存在一个Pinia Store
const canvasRef = ref<HTMLElement>();
const selectedNode = ref<WorkflowNode | null>(null);
const debugDialogVisible = ref(false);
// 使用Composition API管理工作流编辑状态和逻辑
const {
workflow,
addNode,
connectNodes,
validateWorkflow,
history,
deleteNodeById, // 假设 useWorkflowEditor 提供了删除节点的方法
updateNodePosition, // 假设 useWorkflowEditor 提供了更新节点位置的方法
} = useWorkflowEditor();
// 将工作流数据转换为 X6 或 GoJS 图表所需的数据格式
const graphData = computed(() => ({
nodes: workflow.nodes.map(node => ({
id: node.id,
shape: 'workflow-node', // 自定义节点形状
x: node.position.x,
y: node.position.y,
data: node, // 存储原始节点数据
ports: node.inputs.map(input => ({ id: input.portId, group: 'in', ...input }))
.concat(node.outputs.map(output => ({ id: output.portId, group: 'out', ...output }))),
// 更多 X6/GoJS 特有属性
})),
edges: workflow.connections.map(conn => ({
source: {
cell: conn.source.nodeId,
port: conn.source.port
},
target: {
cell: conn.target.nodeId,
port: conn.target.port
},
id: conn.id,
// 更多 X6/GoJS 特有属性
}))
}));
// 拖拽处理:当开始拖拽节点库中的节点时
const onDragStart = (event: DragEvent, nodeType: string) => {
event.dataTransfer!.setData('nodeType', nodeType);
event.dataTransfer!.effectAllowed = 'copy';
};
// 拖拽处理:当拖拽的节点放置到画布上时
const onDrop = (event: DragEvent) => {
event.preventDefault();
const nodeType = event.dataTransfer!.getData('nodeType');
if (nodeType && canvasRef.value) {
const rect = canvasRef.value.getBoundingClientRect();
const position = {
x: event.clientX - rect.left,
y: event.clientY - rect.top
};
const newNode = addNode(nodeType, position);
workflowStore.addNode(newNode); // 更新 Pinia store (如果需要)
history.saveState(); // 保存操作历史
}
};
// 节点点击处理
const onNodeClick = (nodeId: string) => {
const node = workflow.nodes.find(n => n.id === nodeId);
if (node) {
selectedNode.value = node;
}
};
// 画布空白处点击,取消节点选择
const onBlankClick = () => {
selectedNode.value = null;
};
// 连线添加处理 (由 X6Graph 组件发出)
const onEdgeAdded = (source: { cell: string; port: string }, target: { cell: string; port: string }) => {
connectNodes(source.cell, source.port, target.cell, target.port);
history.saveState(); // 保存操作历史
};
// 删除选中节点
const deleteSelectedNode = () => {
if (selectedNode.value) {
ElMessageBox.confirm(`确定要删除节点 "${selectedNode.value.name}" 吗?`, '警告', {
confirmButtonText: '确定',
cancelButtonText: '取消',
type: 'warning',
}).then(() => {
deleteNodeById(selectedNode.value!.id); // 调用 useWorkflowEditor 中的方法
selectedNode.value = null; // 清除选中状态
history.saveState(); // 保存操作历史
ElMessage.success('节点删除成功');
}).catch(() => {
// 用户取消
});
}
};
// 工作流操作:保存
const saveWorkflow = async () => {
const validation = validateWorkflow.value;
if (!validation.isValid) {
await ElMessageBox.alert(
`工作流存在以下问题:\n${validation.errors.join('\n')}`,
'验证失败',
{ type: 'error' }
);
return;
}
try {
// 实际的保存操作会调用后端API
await workflowStore.saveWorkflow(workflow);
history.saveState(); // 保存保存后的状态
ElMessage.success('工作流保存成功');
} catch (error: any) {
ElMessage.error('保存失败: ' + error.message);
}
};
// 工作流操作:运行
const runWorkflow = async () => {
const validation = validateWorkflow.value;
if (!validation.isValid) {
await ElMessageBox.alert(
`工作流存在问题,无法运行:\n${validation.errors.join('\n')}`,
'运行失败',
{ type: 'error' }
);
return;
}
try {
ElMessage.info('工作流开始执行...');
// 调用后端API执行工作流,并获取执行ID
const result = await workflowStore.executeWorkflow(workflow.id, {
// 传递触发数据,例如:当前时间、用户信息
triggerTime: new Date().toISOString()
});
if (result.status === 'running' || result.status === 'completed') {
ElMessage.success('工作流已成功调度执行,请查看调试面板');
debugDialogVisible.value = true; // 显示调试面板
} else {
ElMessage.warning('工作流执行可能存在异常');
}
} catch (error: any) {
ElMessage.error('执行失败: ' + error.message);
}
};
// 撤销/重做
const undo = () => {
history.undo();
selectedNode.value = null; // 撤销后取消节点选中
ElMessage.info('已撤销');
};
const redo = () => {
history.redo();
selectedNode.value = null; // 重做后取消节点选中
ElMessage.info('已重做');
};
// 生命周期钩子
onMounted(() => {
// 可以在这里初始化图表库的配置,如果 X6Graph 组件没有自行处理
// 例如:Graph.registerNode('workflow-node', { ... });
// 加载初始工作流 (如果存在)
workflowStore.loadInitialWorkflow('some-workflow-id').then((loadedWorkflow) => {
if (loadedWorkflow) {
Object.assign(workflow, loadedWorkflow);
history.saveState(); // 加载后保存初始状态
}
});
});
// 监听 Pinia Store 中的工作流变化,同步到本地响应式对象
watch(
() => workflowStore.currentWorkflow,
(newWorkflow) => {
if (newWorkflow && newWorkflow.id === workflow.id) { // 确保是当前编辑的工作流
// 避免无限循环更新,只在 store 中的数据与本地不同时才更新
if (JSON.stringify(newWorkflow) !== JSON.stringify(workflow)) {
Object.assign(workflow, newWorkflow);
}
}
},
{ deep: true }
);
// 监听本地工作流的每次“大”变化(例如节点增删、连线增删、节点配置变化),自动保存历史
// 这是一个简化的 debounce 示例,实际应使用专门的 debounce 工具函数
let saveHistoryTimeout: any = null;
watch(workflow, () => {
clearTimeout(saveHistoryTimeout);
saveHistoryTimeout = setTimeout(() => {
history.saveState();
console.log('Workflow state saved to history.');
}, 500); // 500ms 内没有新的变化才保存
}, { deep: true });
</script>
<style scoped>
.workflow-editor {
height: calc(100vh - 80px); /* 减去 Header 的高度 */
display: flex;
flex-direction: column;
background-color: var(--bg-light);
border-radius: 12px;
overflow: hidden;
box-shadow: 0 4px 20px rgba(0,0,0,0.08);
}
.toolbar {
padding: 15px 20px;
background: var(--card-bg);
border-bottom: 1px solid var(--border-color-base);
display: flex;
gap: 10px;
flex-wrap: wrap;
align-items: center;
box-shadow: 0 1px 4px rgba(0,0,0,0.04);
}
.editor-container {
flex: 1;
display: flex;
overflow: hidden;
}
.node-palette {
width: 250px;
background: var(--bg-darker);
border-right: 1px solid var(--border-color-base);
padding: 20px;
overflow-y: auto;
flex-shrink: 0;
}
.node-palette h3 {
margin-top: 0;
color: var(--primary-color);
font-size: 1.2rem;
padding-bottom: 10px;
border-bottom: 1px dashed var(--border-color-base);
margin-bottom: 20px;
}
.node-item {
padding: 12px 15px;
margin: 8px 0;
background: var(--card-bg);
border: 1px solid var(--border-color-base);
border-radius: 6px;
cursor: grab;
user-select: none;
display: flex;
align-items: center;
gap: 10px;
font-weight: 500;
color: var(--text-color-primary);
transition: all 0.2s ease;
}
.node-item i {
font-size: 1.1rem;
color: var(--primary-color);
}
.node-item:hover {
border-color: var(--primary-color);
background: var(--highlight-bg-info);
color: var(--primary-color);
transform: translateY(-2px);
box-shadow: 0 2px 8px rgba(0, 0, 0, 0.08);
}
.canvas-container {
flex: 1;
position: relative;
overflow: hidden;
background-color: var(--bg-light);
background-image: radial-gradient(var(--border-color-base) 1px, transparent 1px);
background-size: 20px 20px; /* 网格背景 */
}
.properties-panel {
width: 350px;
background: var(--card-bg);
border-left: 1px solid var(--border-color-base);
padding: 25px;
overflow-y: auto;
flex-shrink: 0;
box-shadow: -2px 0 8px rgba(0,0,0,0.03);
}
.properties-panel h3 {
margin-top: 0;
color: var(--primary-color);
font-size: 1.3rem;
padding-bottom: 15px;
border-bottom: 1px solid var(--border-color-base);
margin-bottom: 20px;
}
/* 覆盖 Element Plus 样式 */
:deep(.el-button) {
border-radius: 6px;
}
:deep(.el-input__inner), :deep(.el-textarea__inner) {
border-radius: 6px;
}
/* Debugger Dialog 样式 */
:deep(.el-dialog__header) {
background-color: var(--primary-color);
color: white;
padding: 15px 20px;
border-top-left-radius: 7px;
border-top-right-radius: 7px;
}
:deep(.el-dialog__title) {
color: white;
}
:deep(.el-dialog__headerbtn .el-dialog__close) {
color: white;
}
</style>
<template>
<div class="node-config-form">
<el-form
:model="formData"
label-width="100px"
ref="nodeConfigFormRef"
@validate="onValidate"
>
<!-- 动态渲染表单字段 -->
<template v-for="field in formFields" :key="field.name">
<!-- 文本输入 -->
<el-form-item
v-if="field.type === 'text' || field.type === 'textarea' || field.type === 'number'"
:label="field.label"
:prop="field.name"
:rules="field.required ? [{ required: true, message: field.label + '不能为空', trigger: 'blur' }] : []"
>
<el-input
v-model="formData[field.name]"
:type="field.type === 'textarea' ? 'textarea' : 'text'"
:placeholder="field.placeholder"
:rows="field.rows || 3"
@input="onConfigChange"
/>
</el-form-item>
<!-- 布尔开关 -->
<el-form-item
v-else-if="field.type === 'boolean'"
:label="field.label"
:prop="field.name"
>
<el-switch v-model="formData[field.name]" @change="onConfigChange"></el-switch>
</el-form-item>
<!-- 下拉选择 -->
<el-form-item
v-else-if="field.type === 'select'"
:label="field.label"
:prop="field.name"
:rules="field.required ? [{ required: true, message: field.label + '不能为空', trigger: 'change' }] : []"
>
<el-select
v-model="formData[field.name]"
style="width: 100%"
:placeholder="field.placeholder"
@change="onConfigChange"
>
<el-option
v-for="option in field.options"
:key="option.value"
:label="option.label"
:value="option.value"
/>
</el-select>
</el-form-item>
<!-- 代码编辑器 (需要单独引入 MonacoEditor 组件) -->
<el-form-item
v-else-if="field.type === 'code'"
:label="field.label"
:prop="field.name"
>
<MonacoEditor
v-model="formData[field.name]"
:language="field.language || 'javascript'"
:height="field.height || 200"
@update:modelValue="onConfigChange"
/>
</el-form-item>
<!-- 键值对编辑器 (需要单独引入 KeyValueEditor 组件) -->
<el-form-item
v-else-if="field.type === 'keyValue'"
:label="field.label"
:prop="field.name"
>
<KeyValueEditor
v-model="formData[field.name]"
:key-placeholder="field.keyPlaceholder || 'Key'"
:value-placeholder="field.valuePlaceholder || 'Value'"
@update:modelValue="onConfigChange"
/>
</el-form-item>
<!-- 条件表达式编辑器 (需要单独引入 ConditionBuilder 组件) -->
<el-form-item
v-else-if="field.type === 'conditionBuilder'"
:label="field.label"
:prop="field.name"
>
<ConditionBuilder
v-model="formData[field.name]"
:variables="availableVariables"
@update:modelValue="onConfigChange"
/>
</el-form-item>
</template>
<!-- 自定义插槽 -->
<slot name="custom-fields" :formData="formData"></slot>
</el-form>
</div>
</template>
<script setup lang="ts">
import { ref, watch, computed, onMounted } from 'vue';
import { ElMessage, ElForm } from 'element-plus'; // 导入 ElForm 类型
// 导入自定义组件 (需要实际创建这些组件)
import MonacoEditor from '@/components/editor/MonacoEditor.vue';
import KeyValueEditor from '@/components/editor/KeyValueEditor.vue';
import ConditionBuilder from '@/components/editor/ConditionBuilder.vue';
interface FormField {
name: string;
label: string;
type: 'text' | 'textarea' | 'select' | 'code' |
'keyValue' | 'conditionBuilder' | 'number' | 'boolean';
placeholder?: string;
defaultValue?: any;
required?: boolean;
rules?: any[]; // Element Plus Form Rules
options?: Array<{ label: string; value: any }>;
language?: string; // for 'code' type
height?: number; // for 'code' type
rows?: number; // for 'textarea' type
keyPlaceholder?: string; // for 'keyValue' type
valuePlaceholder?: string; // for 'keyValue' type
}
const props = defineProps<{
nodeId: string; // 节点ID,用于区分不同节点的配置
config: Record<string, any>; // 传入的节点配置数据
fields: FormField[]; // 动态生成的表单字段定义
}>();
const emit = defineEmits<{
'update:config': [value: Record<string, any>];
'config-change': [value: Record<string, any>]; // 每次配置变化时触发
'validate': [isValid: boolean, errors: string[]];
}>();
const formData = ref<Record<string, any>>({});
const nodeConfigFormRef = ref<InstanceType<typeof ElForm> | null>(null); // 表单实例引用
// 初始化表单数据
const initFormData = () => {
const data: Record<string, any> = {};
props.fields.forEach(field => {
// 优先使用传入的config中的值,其次使用field定义的defaultValue
if (props.config && props.config[field.name] !== undefined) {
data[field.name] = props.config[field.name];
} else if (field.defaultValue !== undefined) {
data[field.name] = field.defaultValue;
} else {
// 根据类型设置默认空值
if (field.type === 'number') data[field.name] = null;
else if (field.type === 'boolean') data[field.name] = false;
else if (field.type === 'keyValue') data[field.name] = [];
else if (field.type === 'code') data[field.name] = '';
else if (field.type === 'conditionBuilder') data[field.name] = { operator: 'and', conditions: [] };
else data[field.name] = '';
}
});
formData.value = reactive(data); // 使用 reactive 确保深度响应
};
// 监听 props.config 变化,更新 formData
watch(() => props.config, (newConfig) => {
// 仅当外部传入的config与当前 formData 不同时才更新,避免无限循环
if (JSON.stringify(newConfig) !== JSON.stringify(formData.value)) {
initFormData();
}
}, { deep: true, immediate: true }); // immediate: true 确保组件加载时立即执行
// 当 formData 变化时,通知父组件更新 config
const onConfigChange = () => {
emit('update:config', formData.value);
emit('config-change', formData.value); // 额外触发一个 change 事件
// 可以在这里触发一次验证,但通常在保存工作流时统一验证
};
// 验证函数 (提供给父组件调用)
const validateForm = async (): Promise<boolean> => {
if (!nodeConfigFormRef.value) return false;
try {
const isValid = await nodeConfigFormRef.value.validate();
// 额外处理自定义组件内部的验证,例如 MonacoEditor, KeyValueEditor, ConditionBuilder
// 这些组件需要自行暴露 validate 方法,并在父组件中调用
// let customComponentValid = true;
// if (someCustomComponentRef.value && typeof someCustomComponentRef.value.validate === 'function') {
// customComponentValid = await someCustomComponentRef.value.validate();
// }
// const finalValid = isValid && customComponentValid;
// emit('validate', finalValid, []); // 实际错误信息需要从子组件汇总
return isValid;
} catch (error) {
console.error('表单验证失败:', error);
// emit('validate', false, [error.message]);
return false;
}
};
// 当 Element Plus 表单触发验证时
const onValidate = (prop: string, isValid: boolean, message: string) => {
// 这里可以处理单个字段的验证状态,但我们通常在 `validateForm` 统一处理
};
// 获取可用变量(用于条件表达式编辑器等)
const availableVariables = computed(() => {
// 从全局 Store 或工作流上下文获取变量
// 示例:可以从 Pinia Store 中获取当前工作流的变量或上游节点的输出
// 假设有一个 `useWorkflowStore` 提供了获取当前工作流变量的方法
// const workflowStore = useWorkflowStore();
// const currentWorkflowVariables = workflowStore.currentWorkflow?.variables || {};
// const upstreamNodeOutputs = getUpstreamNodeOutputs(props.nodeId); // 需要实现
return [
{ name: 'workflow.id', label: '工作流ID', type: 'string' },
{ name: 'execution.id', label: '执行ID', type: 'string' },
{ name: 'trigger.data', label: '触发数据', type: 'object' },
{ name: 'output_previousNode.data', label: '上个节点输出', type: 'object' },
// 动态添加的用户变量或前置节点输出变量
// ...Object.keys(currentWorkflowVariables).map(key => ({ name: `workflow.variables.${key}`, label: key, type: 'any' }))
];
});
// 暴露方法给父组件
defineExpose({
validate: validateForm,
getData: () => formData.value
});
</script>
<style scoped>
.node-config-form {
padding: 10px 0;
}
:deep(.el-form-item) {
margin-bottom: 20px;
}
:deep(.el-form-item__label) {
font-weight: 500;
color: var(--text-color-primary);
}
:deep(.el-input), :deep(.el-textarea), :deep(.el-select) {
width: 100%;
}
</style>
作为全栈架构师,我建议:
在开发类似n8n的工作流平台时,关键成功因素不是技术复杂度,而是用户体验和生态系统的构建。
建议前期投入更多资源在:
Vue3的Composition API为这类复杂应用提供了极好的开发体验,配合TypeScript可以大幅提高代码质量和开发效率。
📋 开发检查清单:
1. ✅ 已完成技术架构设计
2. ✅ 已完成功能需求分析
3. ✅ 已完成技术栈选型
4. ✅ 已完成项目里程碑规划
5. ⏳ 需要开始团队组建和任务分配
6. ⏳ 需要开始原型开发和POC验证
7. ⏳ 需要制定详细API设计规范
8. ⏳ 需要建立开发环境和CI/CD流程
# 1. 创建Vue3项目
npm create vue@latest workflow-platform -- --typescript --router --pinia
# 2. 安装核心依赖
cd workflow-platform
npm install @antv/x6 @antv/x6-vue-shape # 或 gojs
npm install pinia vue-router
npm install element-plus @element-plus/icons-vue # 确保安装图标库
npm install dayjs axios lodash-es
# 3. 安装开发依赖
npm install -D @types/node vite-plugin-monaco-editor unplugin-auto-import @types/mermaid
# 4. 启动前端开发服务器
npm run dev
# 5. 后端启动(Node.js/Go) - 示例命令
# Node.js:
# mkdir backend && cd backend
# npm init -y
# npm install @nestjs/core @nestjs/platform-express typeorm pg redis ioredis @nestjs/mongoose mongoose
# nest start
# Go:
# mkdir backend-go && cd backend-go
# go mod init workflow-backend
# go get github.com/gin-gonic/gin github.com/go-redis/redis/v8 github.com/lib/pq github.com/jmoiron/sqlx github.com/joho/godotenv
# go run main.go