全面解析基于Vue3的企业级工作流自动化平台开发方案
| 功能模块 | 功能描述 | 优先级 | 技术实现要点 |
|---|---|---|---|
| 工作流编辑器 | — | 高优先级 | X6/GoJS + Vue3 Custom Renderer |
| 节点库管理 | — | 高优先级 | 插件化架构 + Vue动态组件 |
| 触发器系统 | — | 高优先级 | 事件驱动架构 + Job Scheduler |
/** Base shape shared by every node placed in a workflow. */
interface WorkflowNode {
/** Identifier of the node; connections refer to nodes by this id. */
id: string;
/** Node kind; also used as the key when looking up the node's executor. */
type: NodeType;
/** Human-readable name, shown in validation messages. */
name: string;
/** Position of the node on the editor canvas. */
position: { x: number; y: number };
/** Input ports of the node. */
inputs: NodePort[];
/** Output ports of the node. */
outputs: NodePort[];
/** Type-specific configuration; executors cast it to their own config type. */
config: Record<string, any>;
}
// 使用Vue3 Composition API管理工作流状态
import { ref, reactive, computed } from 'vue';
import type { Workflow } from '@/types/workflow';
/**
 * Composable that owns the editable state of a single workflow.
 *
 * Returns the reactive `workflow`, the `selectedNode` ref, the mutators
 * `addNode` / `connectNodes`, the computed `validateWorkflow` result and a
 * `history` object (`canUndo`, `canRedo`, `undo()`, `redo()`).
 *
 * `history` only tracks structural changes made through `addNode` and
 * `connectNodes`. Snapshots are JSON strings, so values inside node configs
 * that do not survive a JSON round-trip (Date, functions, ...) are not
 * restored faithfully.
 */
export function useWorkflowEditor() {
  const workflow = reactive<Workflow>({
    id: '',
    name: '',
    nodes: [],
    connections: [],
    variables: {},
    triggers: []
  });
  const selectedNode = ref<WorkflowNode | null>(null);

  // Nine random base-36 characters; keeps ids unique when several are
  // generated within the same millisecond.
  const randomSuffix = () => Math.random().toString(36).slice(2, 11);

  // ---- Undo / redo -------------------------------------------------------
  const maxHistoryEntries = 100;
  const undoStack = ref<string[]>([]);
  const redoStack = ref<string[]>([]);

  // JSON is used (rather than structuredClone) because the arrays are
  // reactive proxies, which structuredClone cannot clone.
  const serializeGraph = () =>
    JSON.stringify({ nodes: workflow.nodes, connections: workflow.connections });

  const restoreGraph = (serialized: string) => {
    const snapshot = JSON.parse(serialized);
    // Mutate in place so existing references to the arrays stay valid.
    workflow.nodes.splice(0, workflow.nodes.length, ...snapshot.nodes);
    workflow.connections.splice(0, workflow.connections.length, ...snapshot.connections);
    // Restored nodes are new objects: re-point the selection at the node
    // with the same id, or clear it when that node no longer exists.
    const selectedId = selectedNode.value?.id;
    selectedNode.value =
      selectedId === undefined
        ? null
        : (workflow.nodes.find(node => node.id === selectedId) ?? null);
  };

  // Call BEFORE a structural mutation so the pre-change state can be restored.
  const recordHistory = () => {
    undoStack.value.push(serializeGraph());
    if (undoStack.value.length > maxHistoryEntries) {
      undoStack.value.shift();
    }
    redoStack.value = [];
  };

  // Computed refs nested in a reactive object are unwrapped on access, so
  // callers read `history.canUndo` as a plain boolean.
  const history = reactive({
    canUndo: computed(() => undoStack.value.length > 0),
    canRedo: computed(() => redoStack.value.length > 0),
    undo() {
      const previous = undoStack.value.pop();
      if (previous === undefined) return;
      redoStack.value.push(serializeGraph());
      restoreGraph(previous);
    },
    redo() {
      const next = redoStack.value.pop();
      if (next === undefined) return;
      undoStack.value.push(serializeGraph());
      restoreGraph(next);
    }
  });

  /**
   * Creates a node of `nodeType` at `position`, appends it to the workflow
   * and returns it.
   */
  const addNode = (nodeType: NodeType, position: Position) => {
    const newNode: WorkflowNode = {
      id: `node_${Date.now()}_${randomSuffix()}`,
      type: nodeType,
      name: getDefaultNodeName(nodeType),
      position,
      inputs: getNodeInputs(nodeType),
      outputs: getNodeOutputs(nodeType),
      config: getDefaultNodeConfig(nodeType)
    };
    recordHistory();
    workflow.nodes.push(newNode);
    return newNode;
  };

  /**
   * Adds a connection from `sourceId:sourcePort` to `targetId:targetPort`
   * and returns it. The endpoints are not checked here; `validateWorkflow`
   * reports structural problems.
   */
  const connectNodes = (sourceId: string, sourcePort: string,
    targetId: string, targetPort: string) => {
    const connection = {
      // The timestamp alone collides when two edges are created in the same ms.
      id: `conn_${Date.now()}_${randomSuffix()}`,
      source: { nodeId: sourceId, port: sourcePort },
      target: { nodeId: targetId, port: targetPort }
    };
    recordHistory();
    workflow.connections.push(connection);
    return connection;
  };

  /**
   * Computed validation result: `{ isValid, errors }`.
   * Reports non-trigger nodes that take part in no connection, and cycles
   * (as decided by `hasCycle`).
   */
  const validateWorkflow = computed(() => {
    const errors: string[] = [];

    // Every node id that appears at either end of a connection.
    const connectedNodeIds = new Set<string>();
    for (const conn of workflow.connections) {
      connectedNodeIds.add(conn.source.nodeId);
      connectedNodeIds.add(conn.target.nodeId);
    }
    for (const node of workflow.nodes) {
      if (!connectedNodeIds.has(node.id) && node.type !== 'trigger') {
        errors.push(`${node.name} (${node.id}) 是孤立节点`);
      }
    }

    if (hasCycle(workflow)) {
      errors.push('工作流中存在循环依赖');
    }

    return {
      isValid: errors.length === 0,
      errors
    };
  });

  return {
    workflow,
    selectedNode,
    addNode,
    connectNodes,
    validateWorkflow,
    history
  };
}
/**
 * Workflow execution engine.
 *
 * Keeps a registry of node executors keyed by node type, runs the trigger
 * node of a workflow and queues follow-up nodes on the execution context.
 *
 * NOTE(review): `loadWorkflow`, `generateExecutionId`, `registerDefaultExecutors`,
 * `processExecutionQueue`, `findNextNodes`, `shouldExecuteNode` and
 * `handleExecutionError` are called below but are not defined in this
 * snippet — they must be provided for the class to work.
 */
class WorkflowExecutionEngine {
  private executionQueue: ExecutionContext[] = [];
  private nodeExecutors: Map<NodeType, NodeExecutor> = new Map();
  private eventBus: EventEmitter;

  constructor() {
    this.eventBus = new EventEmitter();
    this.registerDefaultExecutors();
  }

  /** Registers (or replaces) the executor used for nodes of `type`. */
  registerNodeExecutor(type: NodeType, executor: NodeExecutor) {
    this.nodeExecutors.set(type, executor);
  }

  /**
   * Starts an execution of the workflow identified by `workflowId`.
   *
   * Awaits the trigger node, then hands the remaining work to
   * `processExecutionQueue` without awaiting it, so the returned status is
   * the status at that moment.
   *
   * @param workflowId  Id passed to `loadWorkflow`.
   * @param triggerData Optional payload passed to the trigger node as input.
   * @returns Execution id, current status and start time.
   * @throws Error when the workflow has no node of type `trigger`.
   */
  async executeWorkflow(workflowId: string, triggerData?: any): Promise<ExecutionResult> {
    const workflow = await this.loadWorkflow(workflowId);
    const executionId = this.generateExecutionId();

    const context: ExecutionContext = {
      workflowId,
      executionId,
      // executeNode reads context.workflow to find successor nodes.
      workflow,
      startTime: new Date(),
      status: 'running',
      variables: {},
      executionPath: [],
      // executeNode pushes scheduled successors here; without this array the
      // push throws and a successful node is recorded as failed.
      executionQueue: [],
      results: new Map()
    };

    const triggerNode = workflow.nodes.find(n => n.type === 'trigger');
    if (!triggerNode) {
      throw new Error('工作流没有触发器节点');
    }

    await this.executeNode(triggerNode, context, triggerData);

    // Deliberately not awaited: follow-up nodes run in the background.
    // NOTE(review): a rejection from processExecutionQueue is not observed
    // here — confirm it handles its own errors.
    void this.processExecutionQueue(context);

    return {
      executionId,
      status: context.status,
      startTime: context.startTime
    };
  }

  /**
   * Executes one node and schedules its successors.
   *
   * On success the result is stored in `context.results` and each successor
   * accepted by `shouldExecuteNode` is queued with this node's output as its
   * input. On failure a `failed` result is stored and the error is passed to
   * `handleExecutionError`.
   *
   * @throws Error when no executor is registered for `node.type`.
   */
  async executeNode(node: WorkflowNode, context: ExecutionContext, inputData?: any) {
    const executor = this.nodeExecutors.get(node.type);
    if (!executor) {
      throw new Error(`未找到节点类型 ${node.type} 的执行器`);
    }

    // Declared outside `try` because the catch branch needs it too.
    const startTime = Date.now();
    try {
      context.executionPath.push({
        nodeId: node.id,
        startTime: new Date(startTime)
      });

      const result = await executor.execute(node, context, inputData);

      context.results.set(node.id, {
        data: result.data,
        status: 'success',
        executionTime: Date.now() - startTime,
        metadata: result.metadata
      });

      const nextNodes = this.findNextNodes(node, context.workflow);
      for (const nextNode of nextNodes) {
        if (this.shouldExecuteNode(nextNode, result, context)) {
          context.executionQueue.push({
            node: nextNode,
            inputData: result.data,
            context
          });
        }
      }
    } catch (error: unknown) {
      // Anything can be thrown, so narrow before reading `.message`.
      const message = error instanceof Error ? error.message : String(error);
      context.results.set(node.id, {
        data: null,
        status: 'failed',
        error: message,
        executionTime: Date.now() - startTime
      });
      await this.handleExecutionError(node, context, error);
    }
  }
}
/** Contract implemented by every node executor. */
interface NodeExecutor {
/** Node type this executor handles. */
type: NodeType;
/**
 * Runs the node.
 * Receives the node, the current execution context and optional input data,
 * and resolves to the node's execution result.
 */
execute: (node: WorkflowNode, context: ExecutionContext, inputData?: any)
=> Promise<NodeExecutionResult>;
/** Optional check of a node configuration before it is executed. */
validateConfig?: (config: any) => ValidationResult;
}
/**
 * Executor for `http` nodes: performs one HTTP request described by the
 * node's config, after substituting `{{variable}}` placeholders in the URL,
 * header values and serialized body with values from `context.variables`.
 */
class HttpNodeExecutor implements NodeExecutor {
  type: NodeType = 'http';

  /**
   * Sends the request and resolves to `{ data, metadata }`.
   *
   * - `data` is the parsed JSON body, the raw text when the body is not
   *   JSON, or `null` when the body is empty.
   * - `metadata.responseTime` is the elapsed time in milliseconds.
   * - A positive `config.timeout` (ms) aborts the request; `fetch` itself
   *   has no `timeout` option, so an AbortController is used.
   *
   * Non-2xx responses do not throw; the status is reported in
   * `metadata.statusCode`.
   */
  async execute(node: WorkflowNode, context: ExecutionContext, inputData?: any) {
    const config = node.config as HttpNodeConfig;
    const variables = context.variables;

    const url = this.replaceVariables(config.url, variables);
    const headers = this.replaceVariablesInObject(config.headers, variables);

    // Normalise so that 'get' / undefined are treated like 'GET'.
    const method = String(config.method ?? 'GET').toUpperCase();
    const canHaveBody = method !== 'GET' && method !== 'HEAD';
    const body =
      canHaveBody && config.body !== undefined
        ? this.replaceVariables(JSON.stringify(config.body), variables)
        : undefined;

    const controller = new AbortController();
    const timeoutMs = Number(config.timeout);
    const timer =
      Number.isFinite(timeoutMs) && timeoutMs > 0
        ? setTimeout(() => controller.abort(), timeoutMs)
        : undefined;

    const startedAt = Date.now();
    try {
      const response = await fetch(url, {
        method,
        headers,
        body,
        signal: controller.signal
      });
      // Read as text first: response.json() throws on empty (e.g. 204) or
      // non-JSON bodies.
      const raw = await response.text();
      return {
        data: this.parseResponseBody(raw),
        metadata: {
          statusCode: response.status,
          headers: Object.fromEntries(response.headers.entries()),
          responseTime: Date.now() - startedAt
        }
      };
    } finally {
      if (timer !== undefined) {
        clearTimeout(timer);
      }
    }
  }

  /**
   * Validates an HTTP node config: `url` must be set and `body`, when
   * present, must be serializable to JSON.
   */
  validateConfig(config: any): ValidationResult {
    const errors: string[] = [];
    if (!config?.url) {
      errors.push('URL不能为空');
    }
    if (config?.body) {
      try {
        // stringify throws on circular structures and yields undefined for
        // values JSON cannot represent (e.g. functions).
        if (JSON.stringify(config.body) === undefined) {
          errors.push('请求体必须是有效的JSON');
        }
      } catch {
        errors.push('请求体必须是有效的JSON');
      }
    }
    return {
      isValid: errors.length === 0,
      errors
    };
  }

  /**
   * Replaces every `{{name}}` placeholder in `text` with the matching
   * variable value. Unknown variables become an empty string; objects are
   * inserted as JSON.
   */
  private replaceVariables(text: string, variables: Record<string, any>): string {
    if (!text) return text;
    return text.replace(/\{\{([^}]+)\}\}/g, (_match, variableName: string) => {
      const value = this.getVariableValue(variableName, variables);
      if (value === undefined || value === null) return '';
      return typeof value === 'object' ? JSON.stringify(value) : String(value);
    });
  }

  /**
   * Returns a copy of `source` with placeholders replaced in every string
   * value. Null/undefined entries are dropped; other values are stringified.
   */
  private replaceVariablesInObject(
    source: Record<string, any> | undefined,
    variables: Record<string, any>
  ): Record<string, string> {
    const replaced: Record<string, string> = {};
    for (const [key, value] of Object.entries(source ?? {})) {
      if (value === undefined || value === null) continue;
      replaced[key] =
        typeof value === 'string' ? this.replaceVariables(value, variables) : String(value);
    }
    return replaced;
  }

  /**
   * Resolves a placeholder name against `variables`. An exact key match
   * wins; otherwise the name is treated as a dot-separated path. Only own
   * properties are followed, so `{{constructor}}` and similar never resolve.
   */
  private getVariableValue(variableName: string, variables: Record<string, any>) {
    const path = variableName.trim();
    if (path === '' || variables === undefined || variables === null) return undefined;
    const hasOwn = (target: object, key: string) =>
      Object.prototype.hasOwnProperty.call(target, key);
    if (hasOwn(variables, path)) return variables[path];

    let current: any = variables;
    for (const segment of path.split('.')) {
      if (current === null || typeof current !== 'object' || !hasOwn(current, segment)) {
        return undefined;
      }
      current = current[segment];
    }
    return current;
  }

  /** Parses a response body: '' -> null, JSON -> value, anything else -> raw text. */
  private parseResponseBody(raw: string) {
    if (raw === '') return null;
    try {
      return JSON.parse(raw);
    } catch {
      return raw;
    }
  }
}
/**
 * Executor for `database` nodes: runs one query / insert / update / delete
 * on a connection obtained from `getConnection` and always releases the
 * connection afterwards.
 *
 * NOTE(review): `getConnection` is called but not defined in this snippet.
 */
class DatabaseNodeExecutor implements NodeExecutor {
  type: NodeType = 'database';

  /**
   * Runs the configured operation.
   *
   * For `insert` and `update` the incoming `inputData` is written when it is
   * truthy, otherwise `config.data`.
   *
   * @returns `{ data, metadata: { rowCount, operation } }`; `rowCount` is the
   *          result's `rowCount`, else its `length`, else 0.
   * @throws Error when `config.operation` is not one of the four supported
   *         operations.
   */
  async execute(node: WorkflowNode, context: ExecutionContext, inputData?: any) {
    const config = node.config as DatabaseNodeConfig;
    const connection = await this.getConnection(config.connectionId);
    try {
      let result: any;
      switch (config.operation) {
        case 'query':
          result = await connection.query(config.query, config.parameters);
          break;
        case 'insert':
          result = await connection.insert(config.table, inputData || config.data);
          break;
        case 'update':
          // NOTE(review): an absent `where` presumably affects every row —
          // confirm how the connection treats an undefined filter.
          result = await connection.update(
            config.table,
            inputData || config.data,
            config.where
          );
          break;
        case 'delete':
          // NOTE(review): same concern as `update` for an absent `where`.
          result = await connection.delete(config.table, config.where);
          break;
        default:
          // Without this branch `result` stays undefined and the metadata
          // lookup below fails with an unrelated TypeError.
          throw new Error(`不支持的数据库操作: ${String(config.operation)}`);
      }
      return {
        data: result,
        metadata: {
          // Optional chaining: a driver may resolve to null/undefined.
          rowCount: result?.rowCount || result?.length || 0,
          operation: config.operation
        }
      };
    } finally {
      await connection.release();
    }
  }
}
/** A stored version of a workflow (Git-like versioning). */
interface WorkflowVersion {
/** Id of the workflow this version belongs to. */
workflowId: string;
/** Version number. */
version: number;
/** When the version was created. */
createdAt: Date;
/** Who created the version. */
createdBy: string;
/** Nodes of the workflow in this version. */
nodes: WorkflowNode[];
/** Connections of the workflow in this version. */
connections: Connection[];
/** Workflow variables in this version. */
variables: Record<string, any>;
/** Optional free-form metadata. */
metadata?: Record<string, any>;
}
/**
 * Version management service: creates numbered snapshots of a workflow and
 * reports the differences between two versions.
 */
class WorkflowVersioningService {
  /**
   * Stores a new version of `workflow`, numbered one above the latest stored
   * version (or 1 when none exists), and returns it. Nodes, connections and
   * variables are deep-copied through a JSON round-trip.
   */
  async createVersion(workflow: Workflow): Promise<WorkflowVersion> {
    const jsonCopy = <T>(value: T): T => JSON.parse(JSON.stringify(value));

    const previous = await this.getLatestVersion(workflow.id);
    const snapshot: WorkflowVersion = {
      workflowId: workflow.id,
      version: (previous?.version || 0) + 1,
      createdAt: new Date(),
      createdBy: workflow.updatedBy,
      nodes: jsonCopy(workflow.nodes),
      connections: jsonCopy(workflow.connections),
      variables: jsonCopy(workflow.variables)
    };

    await this.saveVersion(snapshot);
    return snapshot;
  }

  /**
   * Computes the difference between two versions via `calculateDiff` and
   * returns it with every category defaulting to an empty list.
   */
  async diffVersions(sourceVersion: number, targetVersion: number): Promise<VersionDiff> {
    const { added, removed, modified, conflicts } =
      await this.calculateDiff(sourceVersion, targetVersion);
    return {
      added: added || [],
      removed: removed || [],
      modified: modified || [],
      conflicts: conflicts || []
    };
  }
}
/**
 * Schedules workflow executions through a Redis sorted set used as a
 * priority queue, and processes them in a worker loop.
 *
 * Jobs are popped with ZPOPMIN, so a LOWER score means a HIGHER priority.
 *
 * NOTE(review): `executeWorkflow`, `recordExecutionResult`, `handleJobError`
 * and `shouldRetry` are called but not defined in this snippet.
 */
class DistributedExecutionManager {
  private redisClient: Redis;
  private workerPool: WorkerPool;

  constructor() {
    this.redisClient = new Redis(process.env.REDIS_URL);
    // Environment variables are strings; parse and fall back to 10 when the
    // value is missing or not a positive integer.
    const configuredWorkers = Number.parseInt(process.env.MAX_WORKERS ?? '', 10);
    this.workerPool = new WorkerPool({
      maxWorkers:
        Number.isInteger(configuredWorkers) && configuredWorkers > 0 ? configuredWorkers : 10,
      workerScript: './execution-worker.js'
    });
  }

  /**
   * Enqueues a job for `workflowId` and wakes the worker pool.
   *
   * @param workflowId Workflow to execute.
   * @param priority   Sorted-set score; lower values are processed first.
   * @param attempt    How many times this workflow run has already been
   *                   tried; stored in the job payload so a retry policy can
   *                   bound retries. Defaults to 0.
   * @returns The generated job id.
   */
  async scheduleWorkflow(workflowId: string, priority: number = 0, attempt: number = 0) {
    const jobId = `job_${workflowId}_${Date.now()}`;
    await this.redisClient.zadd(
      'workflow_jobs',
      priority,
      JSON.stringify({
        jobId,
        workflowId,
        scheduledAt: new Date(),
        priority,
        attempt
      })
    );
    this.workerPool.notify();
    return jobId;
  }

  /**
   * Worker loop: pops the best-priority job, executes it and records the
   * result. On failure the error is reported and, if `shouldRetry` agrees,
   * the workflow is re-queued with a lower priority. Never returns.
   */
  async workerProcess() {
    while (true) {
      const popped = await this.redisClient.zpopmin('workflow_jobs');
      // ZPOPMIN replies with [member, score], or an EMPTY array when the
      // set is empty — and an empty array is truthy.
      if (!popped || popped.length === 0) {
        await new Promise<void>(resolve => setTimeout(resolve, 1000)); // idle for 1s
        continue;
      }

      let job: any;
      try {
        job = JSON.parse(popped[0]);
      } catch (error: unknown) {
        // A malformed payload must not kill the worker loop. The entry has
        // already been removed from the queue by ZPOPMIN.
        console.error('Discarding malformed workflow job payload', error);
        continue;
      }

      try {
        const result = await this.executeWorkflow(job.workflowId);
        await this.recordExecutionResult(job.jobId, result);
      } catch (error: unknown) {
        await this.handleJobError(job.jobId, error);
        if (this.shouldRetry(job)) {
          await this.scheduleWorkflow(
            job.workflowId,
            job.priority + 1, // higher score = lower priority
            (job.attempt ?? 0) + 1
          );
        }
      }
    }
  }
}
- 使用 `shallowRef` 和 `shallowReactive` 避免不必要的深度响应
- 使用 `computed` 缓存计算结果

作为全栈架构师的技术建议:
| 角色 | 人数 | 主要职责 | 技能要求 |
|---|---|---|---|
| 前端开发工程师 | 2-3人 | Vue3开发、可视化组件、用户体验 | Vue3、TypeScript、D3/X6 |
| 后端开发工程师 | 2-3人 | API开发、工作流引擎、数据库设计 | Node.js/Go、数据库、Redis |
| DevOps工程师 | 1人 | 部署、监控、CI/CD | Docker、Kubernetes、AWS/Aliyun |
| 测试工程师 | 1人 | 自动化测试、性能测试 | 测试框架、性能监控 |
<template>
  <div class="workflow-editor">
    <!-- Toolbar: save / run / undo / redo -->
    <div class="toolbar">
      <el-button @click="saveWorkflow">保存</el-button>
      <el-button @click="runWorkflow">运行</el-button>
      <el-button @click="undo">撤销</el-button>
      <el-button @click="redo">重做</el-button>
    </div>
    <div class="editor-container">
      <!-- Node palette: each entry can be dragged onto the canvas -->
      <div class="node-palette">
        <h3>节点库</h3>
        <div
          v-for="nodeType in availableNodeTypes"
          :key="nodeType"
          class="node-item"
          draggable="true"
          @dragstart="onDragStart($event, nodeType)"
        >
          {{ getNodeTypeLabel(nodeType) }}
        </div>
      </div>
      <!-- Canvas: drop target hosting the graph -->
      <div
        class="canvas-container"
        ref="canvasRef"
        @drop="onDrop"
        @dragover.prevent
      >
        <x6-graph
          :data="graphData"
          @node:click="onNodeClick"
          @edge:added="onEdgeAdded"
        />
      </div>
      <!-- Properties panel: only rendered while a node is selected -->
      <div class="properties-panel" v-if="selectedNode">
        <h3>节点属性</h3>
        <!-- Keyed by node id so selecting another node of the same type
             creates a fresh config component instead of reusing the old one. -->
        <component
          :is="getConfigComponent(selectedNode.type)"
          :key="selectedNode.id"
          v-model="selectedNode.config"
        />
      </div>
    </div>
    <!-- Debug panel -->
    <el-dialog v-model="debugDialogVisible" title="工作流调试">
      <WorkflowDebugger :workflow="workflow" />
    </el-dialog>
  </div>
</template>
<script setup lang="ts">
import { ref, reactive, computed, onMounted, watch } from 'vue';
import { useWorkflowStore } from '@/stores/workflow';
import { ElMessage, ElMessageBox } from 'element-plus';
import type { Workflow, WorkflowNode, Connection } from '@/types/workflow';
// Component state
const workflowStore = useWorkflowStore();
// Template ref bound to the canvas container (ref="canvasRef").
const canvasRef = ref<HTMLElement>();
// Node currently shown in the properties panel; null hides the panel.
const selectedNode = ref<WorkflowNode | null>(null);
// Controls visibility of the debug dialog.
const debugDialogVisible = ref(false);
// Editor state and mutators come from the composable.
// NOTE(review): useWorkflowEditor is not among the imports of this script —
// confirm it is auto-imported or add an import.
// `history` is expected to expose canUndo / canRedo / undo() / redo(),
// which is how the undo/redo handlers in this script use it.
const {
workflow,
addNode,
connectNodes,
validateWorkflow,
history
} = useWorkflowEditor();
/**
 * Graph payload for <x6-graph>, derived from the workflow: one
 * `workflow-node` shape per node (carrying the node as `data`) and one edge
 * per connection, addressed by cell id and port.
 */
const graphData = computed(() => {
  const nodes = workflow.nodes.map(node => {
    const { x, y } = node.position;
    return { id: node.id, shape: 'workflow-node', x, y, data: node };
  });
  const edges = workflow.connections.map(({ source, target }) => ({
    source: { cell: source.nodeId, port: source.port },
    target: { cell: target.nodeId, port: target.port }
  }));
  return { nodes, edges };
});
/**
 * Drag handling: stores the dragged palette entry's node type on the drag
 * event so the drop handler can read it back.
 */
const onDragStart = (event: DragEvent, nodeType: string) => {
  const transfer = event.dataTransfer;
  // dataTransfer is nullable on DragEvent; do nothing rather than throw.
  if (!transfer) return;
  transfer.setData('nodeType', nodeType);
  transfer.effectAllowed = 'copy';
};
/**
 * Drop handler for the canvas: creates a node of the dragged type at the
 * drop position (relative to the canvas element's top-left corner) and
 * passes the new node to the workflow store.
 */
const onDrop = (event: DragEvent) => {
  event.preventDefault();
  // dataTransfer is nullable on DragEvent; treat a missing payload like an
  // unrelated drop and ignore it.
  const nodeType = event.dataTransfer?.getData('nodeType');
  const canvas = canvasRef.value;
  if (!nodeType || !canvas) return;

  const rect = canvas.getBoundingClientRect();
  const position = {
    x: event.clientX - rect.left,
    y: event.clientY - rect.top
  };
  const newNode = addNode(nodeType, position);
  workflowStore.addNode(newNode);
};
/** Node click handling: selects the workflow node with the given id, if it exists. */
const onNodeClick = (nodeId: string) => {
  const clicked = workflow.nodes.find(candidate => candidate.id === nodeId);
  if (!clicked) return;
  selectedNode.value = clicked;
};
/** Edge-added handling: records a connection between the two graph endpoints. */
const onEdgeAdded = (source: any, target: any) => {
  const { cell: sourceCell, port: sourcePort } = source;
  const { cell: targetCell, port: targetPort } = target;
  connectNodes(sourceCell, sourcePort, targetCell, targetPort);
};
/**
 * Workflow actions — save.
 * Validates the workflow first; on failure the problems are shown in an
 * alert and nothing is saved. Otherwise the workflow is saved through the
 * store and the outcome is reported with a toast.
 */
const saveWorkflow = async () => {
  const { isValid, errors } = validateWorkflow.value;
  if (!isValid) {
    // NOTE(review): the alert promise presumably rejects when the box is
    // dismissed without confirming — swallow that so it does not surface as
    // an unhandled rejection.
    await ElMessageBox.alert(
      `工作流存在以下问题:\n${errors.join('\n')}`,
      '验证失败'
    ).catch(() => undefined);
    return;
  }
  try {
    await workflowStore.saveWorkflow(workflow);
    ElMessage.success('保存成功');
  } catch (error: unknown) {
    // Anything can be thrown; narrow before reading `.message`.
    const reason = error instanceof Error ? error.message : String(error);
    ElMessage.error('保存失败: ' + reason);
  }
};
/**
 * Runs the current workflow through the store. A `completed` result shows a
 * success toast and opens the debug dialog; any other status shows a
 * warning; a thrown error shows an error toast.
 */
const runWorkflow = async () => {
  try {
    const result = await workflowStore.executeWorkflow(workflow.id);
    if (result.status === 'completed') {
      ElMessage.success('工作流执行成功');
      debugDialogVisible.value = true;
    } else {
      ElMessage.warning('工作流执行异常');
    }
  } catch (error: unknown) {
    // Anything can be thrown; narrow before reading `.message`.
    const reason = error instanceof Error ? error.message : String(error);
    ElMessage.error('执行失败: ' + reason);
  }
};
// Undo / redo. Optional chaining keeps the toolbar buttons harmless no-ops
// when the editor composable provides no `history` object, instead of
// throwing a TypeError on click.
const undo = () => {
  if (history?.canUndo) {
    history.undo();
  }
};
const redo = () => {
  if (history?.canRedo) {
    history.redo();
  }
};
// Lifecycle
onMounted(() => {
// Initialise the graph library once the component is mounted.
// NOTE(review): initGraphLibrary is neither defined nor imported in this
// script — confirm where it comes from.
initGraphLibrary();
});
// Keep the local editor state in sync with the store: whenever the store's
// current workflow changes (deep watch), copy its top-level properties onto
// the local reactive `workflow`.
// No `immediate` option is passed, so the callback does not run for the
// value present when the watcher is created.
watch(
() => workflowStore.currentWorkflow,
(newWorkflow) => {
if (newWorkflow) {
// Shallow copy: nested arrays/objects are shared with the store's object.
Object.assign(workflow, newWorkflow);
}
},
{ deep: true }
);
</script>
<style scoped>
/* Root: fills the viewport height and stacks its children vertically. */
.workflow-editor {
height: 100vh;
display: flex;
flex-direction: column;
}
/* Toolbar row with evenly spaced buttons. */
.toolbar {
padding: 10px 20px;
background: #fff;
border-bottom: 1px solid #e4e7ed;
display: flex;
gap: 10px;
}
/* Main area: takes the remaining height and lays out its panels in a row. */
.editor-container {
flex: 1;
display: flex;
overflow: hidden;
}
/* Fixed-width, vertically scrollable node palette. */
.node-palette {
width: 200px;
background: #f5f7fa;
border-right: 1px solid #e4e7ed;
padding: 20px;
overflow-y: auto;
}
/* One draggable palette entry. */
.node-item {
padding: 10px;
margin: 5px 0;
background: white;
border: 1px solid #dcdfe6;
border-radius: 4px;
cursor: move;
user-select: none;
}
/* Hover highlight for palette entries. */
.node-item:hover {
border-color: #409eff;
background: #ecf5ff;
}
/* Canvas: takes the remaining width; relative so children can be positioned inside it. */
.canvas-container {
flex: 1;
position: relative;
overflow: hidden;
}
/* Fixed-width, vertically scrollable properties panel. */
.properties-panel {
width: 300px;
background: white;
border-left: 1px solid #e4e7ed;
padding: 20px;
overflow-y: auto;
}
</style>
<template>
  <div class="node-config-form">
    <!-- NOTE(review): `onValidate` is referenced here but is not defined in
         this component's script — define it or remove the listener. -->
    <el-form
      :model="formData"
      label-width="100px"
      @validate="onValidate"
    >
      <!-- Render one form item per field, chosen by field.type -->
      <template v-for="field in formFields" :key="field.name">
        <!-- Text input (single line or textarea) -->
        <el-form-item
          v-if="field.type === 'text' || field.type === 'textarea'"
          :label="field.label"
          :prop="field.name"
          :rules="field.rules"
        >
          <el-input
            v-model="formData[field.name]"
            :type="field.type"
            :placeholder="field.placeholder"
            :rows="field.rows"
          />
        </el-form-item>
        <!-- Numeric input: the field type is declared, so it must render -->
        <el-form-item
          v-else-if="field.type === 'number'"
          :label="field.label"
          :prop="field.name"
        >
          <el-input-number
            v-model="formData[field.name]"
            style="width: 100%"
          />
        </el-form-item>
        <!-- Boolean switch: the field type is declared, so it must render -->
        <el-form-item
          v-else-if="field.type === 'boolean'"
          :label="field.label"
          :prop="field.name"
        >
          <el-switch v-model="formData[field.name]" />
        </el-form-item>
        <!-- Dropdown select -->
        <el-form-item
          v-else-if="field.type === 'select'"
          :label="field.label"
          :prop="field.name"
        >
          <el-select
            v-model="formData[field.name]"
            style="width: 100%"
          >
            <el-option
              v-for="option in field.options"
              :key="option.value"
              :label="option.label"
              :value="option.value"
            />
          </el-select>
        </el-form-item>
        <!-- Code editor -->
        <el-form-item
          v-else-if="field.type === 'code'"
          :label="field.label"
          :prop="field.name"
        >
          <MonacoEditor
            v-model="formData[field.name]"
            :language="field.language || 'javascript'"
            :height="field.height || 200"
          />
        </el-form-item>
        <!-- Key/value pair editor -->
        <el-form-item
          v-else-if="field.type === 'keyValue'"
          :label="field.label"
        >
          <KeyValueEditor
            v-model="formData[field.name]"
            :key-placeholder="field.keyPlaceholder"
            :value-placeholder="field.valuePlaceholder"
          />
        </el-form-item>
        <!-- Condition expression editor -->
        <el-form-item
          v-else-if="field.type === 'condition'"
          :label="field.label"
        >
          <ConditionBuilder
            v-model="formData[field.name]"
            :variables="availableVariables"
          />
        </el-form-item>
      </template>
      <!-- Slot for caller-supplied extra fields -->
      <slot name="custom-fields"></slot>
    </el-form>
  </div>
</template>
<script setup lang="ts">
import { computed, onMounted, ref, watch } from 'vue';
import MonacoEditor from '@/components/editor/MonacoEditor.vue';
import KeyValueEditor from '@/components/editor/KeyValueEditor.vue';
import ConditionBuilder from '@/components/editor/ConditionBuilder.vue';
/** Description of one field rendered by the node configuration form. */
interface FormField {
  /** Key under which the field's value is stored in the form data. */
  name: string;
  /** Label shown next to the field and used in validation messages. */
  label: string;
  /** Selects which editor the template renders for the field. */
  type: 'text' | 'textarea' | 'select' | 'code' |
    'keyValue' | 'condition' | 'number' | 'boolean';
  placeholder?: string;
  /** Value used when the incoming model has no value for this field. */
  defaultValue?: any;
  required?: boolean;
  rules?: any[];
  /** Choices for `select` fields. */
  options?: Array<{ label: string; value: any }>;
  /** Language for `code` fields; the template falls back to 'javascript'. */
  language?: string;
  /** Editor height for `code` fields; the template falls back to 200. */
  height?: number;
  /** Row count bound to the text input; the template reads `field.rows`. */
  rows?: number;
  keyPlaceholder?: string;
  valuePlaceholder?: string;
}
// Props: the node type, the v-model value (field name -> value) and the
// field definitions to render.
// NOTE(review): the template iterates `formFields`, while the prop declared
// here is `fields` — confirm which name is intended.
const props = defineProps<{
nodeType: string;
modelValue: Record<string, any>;
fields: FormField[];
}>();
// Events: v-model updates and the outcome of validateForm().
const emit = defineEmits<{
'update:modelValue': [value: Record<string, any>];
'validate': [isValid: boolean, errors: string[]];
}>();
// Local working copy of the form values, keyed by field name.
const formData = ref<Record<string, any>>({});
// Messages produced by the most recent validateForm() call.
const validationErrors = ref<string[]>([]);
/**
 * Initialises the form data: for each declared field, take the value from
 * the incoming model when it is defined, otherwise the field's default.
 */
const initFormData = () => {
  const initial: Record<string, any> = {};
  for (const field of props.fields) {
    const incoming = props.modelValue[field.name];
    initial[field.name] = incoming === undefined ? field.defaultValue : incoming;
  }
  formData.value = initial;
};
// Propagate every change of the form data (deep watch) to the parent via
// the v-model update event.
watch(formData, (newValue) => {
emit('update:modelValue', newValue);
}, { deep: true });
/**
 * True when a required field should count as "not filled in".
 * `0` and `false` are valid values for number/boolean fields, so only
 * undefined, null, the empty string and NaN are treated as empty.
 */
const isEmptyValue = (value: unknown): boolean =>
  value === undefined ||
  value === null ||
  value === '' ||
  (typeof value === 'number' && Number.isNaN(value));

/**
 * Collects validation messages for `fields` against `values`.
 * A required field with an empty value yields "<label>不能为空"; every rule
 * that has a `validate` function returning a falsy result yields
 * "<label>: <rule.message>". Rules without a `validate` function are skipped.
 */
const collectFieldErrors = (
  fields: ReadonlyArray<{ name: string; label: string; required?: boolean; rules?: any[] }>,
  values: Record<string, any>
): string[] => {
  const errors: string[] = [];
  for (const field of fields) {
    const value = values[field.name];
    if (field.required && isEmptyValue(value)) {
      errors.push(`${field.label}不能为空`);
    }
    for (const rule of field.rules ?? []) {
      // The same rules array is bound to el-form-item in the template, so
      // entries are not guaranteed to carry a `validate` function.
      if (typeof rule?.validate !== 'function') continue;
      if (!rule.validate(value)) {
        errors.push(`${field.label}: ${rule.message}`);
      }
    }
  }
  return errors;
};

/**
 * Validates the current form data, stores the messages in
 * `validationErrors`, emits `validate` with the outcome and returns whether
 * the form is valid.
 */
const validateForm = () => {
  const errors = collectFieldErrors(props.fields, formData.value);
  validationErrors.value = errors;
  emit('validate', errors.length === 0, errors);
  return errors.length === 0;
};
// Variables offered to the condition expression editor.
const availableVariables = computed(() => {
// Currently a fixed list; the original note says these should come from the
// store or the surrounding context.
return [
{ name: 'workflow.id', label: '工作流ID', type: 'string' },
{ name: 'execution.id', label: '执行ID', type: 'string' },
{ name: 'timestamp', label: '时间戳', type: 'number' },
// User-defined variables would be appended here; none are added in this snippet.
];
});
// Lifecycle: build the initial form data once the component is mounted.
onMounted(() => {
initFormData();
});
// Methods exposed to the parent through a template ref:
// validate() runs validateForm; getData() returns the current form data.
defineExpose({
validate: validateForm,
getData: () => formData.value
});
</script>
<style scoped>
/* Vertical breathing room around the form. */
.node-config-form {
padding: 10px 0;
}
/* :deep() lets these scoped rules reach elements rendered inside child components. */
:deep(.el-form-item) {
margin-bottom: 20px;
}
/* Slightly bolder form labels. */
:deep(.el-form-item__label) {
font-weight: 500;
}
</style>
作为全栈架构师,我建议:
在开发类似n8n的工作流平台时,关键成功因素不是技术复杂度,而是用户体验和生态系统的构建。
建议前期投入更多资源在:
Vue3的Composition API为这类复杂应用提供了极好的开发体验,配合TypeScript可以大幅提高代码质量和开发效率。
📋 开发检查清单:
1. ✅ 已完成技术架构设计
2. ✅ 已完成功能需求分析
3. ✅ 已完成技术栈选型
4. ✅ 已完成项目里程碑规划
5. ⏳ 需要开始团队组建和任务分配
6. ⏳ 需要开始原型开发和POC验证
7. ⏳ 需要制定详细API设计规范
8. ⏳ 需要建立开发环境和CI/CD流程
# 1. Create the Vue 3 project (TypeScript, router and Pinia enabled)
npm create vue@latest workflow-platform -- --typescript --router --pinia
# 2. Install the core dependencies
cd workflow-platform
# Graph editor (X6) and its Vue shape integration
npm install @antv/x6 @antv/x6-vue-shape
# NOTE(review): presumably already added by the --router/--pinia flags above
npm install pinia vue-router
# UI component library and icons
npm install element-plus @element-plus/icons-vue
# Date, HTTP and utility helpers
npm install dayjs axios lodash-es
# 3. Install the development dependencies
npm install -D @types/node vite-plugin-monaco-editor unplugin-auto-import
# 4. Start the development server
npm run dev
# 5. Backend bootstrap (Node.js or Go)
# NOTE(review): NestJS is published under the @nestjs scope, not as a bare `nestjs` package.
# Node.js: npm init -y && npm install @nestjs/core @nestjs/common @nestjs/platform-express @nestjs/typeorm typeorm reflect-metadata rxjs
# Go: go mod init workflow-backend && go get github.com/gin-gonic/gin
架构设计文档 | 版权所有 © 2024 | 资深全栈架构师提供
本技术方案基于Vue3生态,结合n8n最佳实践,为企业级工作流自动化平台提供完整实现路径。 建议实施周期为3-4个月,团队规模为5-8人。