news 2026/4/23 11:29:32

新手进阶Python:办公看板集成跨系统联动+可视化任务编排+故障自愈

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
新手进阶Python:办公看板集成跨系统联动+可视化任务编排+故障自愈

大家好!我是CSDN的Python新手博主~ 上一篇我们完成了看板的AI异常预警与全流程审计,解决了安全合规与风险防控需求,但甲方客户反馈两大核心痛点:① 多系统数据孤立,ERP的订单数据、OA的审批数据、CRM的客户数据无法自动联动,需手动下载上传,效率低下且易出错;② 定时任务(如每日9点同步ERP数据、每周五推送报表)配置复杂,需开发人员修改代码,非技术人员无法操作,且任务失败后无自动重试、兜底机制,需人工排查修复;③ 任务流程无可视化编排能力,多步骤任务(如“同步数据→清洗处理→生成报表→邮件推送”)无法灵活配置执行顺序与依赖关系。今天就带来超落地的新手实战项目——办公看板集成跨系统联动+可视化任务编排+故障自愈!

本次基于之前的“风控审计看板”代码,新增3大核心功能:① 跨系统智能联动(适配ERP/OA/CRM等主流系统API,支持密钥/OAuth2.0鉴权,实现数据自动同步与双向交互);② 可视化任务编排(拖拽式配置任务流程,支持单任务定时执行、多任务串行/并行依赖,非技术人员也能操作);③ 故障自愈机制(任务失败自动重试、异常告警、兜底方案执行,减少人工干预)。全程基于现有技术栈(Flask+MySQL+Celery+ECharts),新增跨系统适配模块、任务编排引擎、故障处理工具,代码注释详细,新手只需配置系统API信息与任务规则,跟着步骤复制操作就能成功,让看板成为企业全链路自动化办公的核心中枢~

一、本次学习目标

  1. 掌握跨系统API集成技巧,适配不同鉴权方式(API密钥、OAuth2.0),实现多系统数据自动同步、双向交互与格式转换;

  2. 学会可视化任务编排设计,基于拖拽组件搭建任务流程,支持定时触发、依赖执行、条件分支,实现任务灵活配置;

  3. 理解故障自愈机制,实现任务失败自动重试(按策略调整间隔)、异常告警(邮件/看板通知)、兜底方案执行,提升系统稳定性;

  4. 实现联动、编排、故障处理功能闭环,跨系统数据联动触发任务流程,任务异常自动告警并自愈,全程留痕可追溯;

  5. 适配企业实际场景,支持任务权限管控、流程导出导入、执行日志分析,满足多角色协同与自动化办公需求。

二、前期准备

  1. 安装核心依赖库

安装核心依赖(任务调度、跨系统鉴权、前端拖拽)

pip3 install celery redis requests-oauthlib python-dotenv flask-dragdrop -i https://pypi.tuna.tsinghua.edu.cn/simple

确保已有依赖正常(Flask、ECharts、报表导出等)

pip3 install --upgrade flask flask-login flask-sqlalchemy apscheduler openpyxl reportlab pymysql -i https://pypi.tuna.tsinghua.edu.cn/simple

说明:跨系统联动用requests/requests-oauthlib处理API请求与鉴权;任务编排用Celery+Redis实现分布式调度(比APScheduler更适配复杂任务依赖);前端可视化拖拽基于flask-dragdrop扩展,无需额外开发复杂组件。

  1. 第三方服务与配置准备
  • 跨系统API配置:梳理需联动的系统(如ERP、OA、CRM),获取API地址、鉴权方式(API密钥/OAuth2.0的client_id/client_secret)、请求参数与返回格式,测试API连通性,将配置信息存入.env文件(避免硬编码);

  • 任务规则配置:定义任务类型(单任务/流程任务)、触发方式(定时触发/Cron表达式/数据联动触发)、执行策略(串行/并行)、依赖关系(如“报表生成”依赖“数据同步”完成);

  • 故障处理配置:设置重试策略(重试次数、间隔递增规则,如1分钟→3分钟→5分钟)、告警方式(邮件/看板弹窗)、兜底方案(如任务失败后执行备用接口、生成异常报表);

  • 环境准备:启动Redis服务(作为Celery消息队列,用于任务调度),确保云服务器开放对应端口,允许跨系统API访问(配置防火墙规则)。

  1. 数据库表优化与创建

– 连接MySQL数据库(替换为你的数据库信息)
mysql -u office_user -p -h 47.108.xxx.xxx office_data

– 创建跨系统配置表(system_config)
CREATE TABLE system_config (
id INT AUTO_INCREMENT PRIMARY KEY,
system_name VARCHAR(50) NOT NULL COMMENT ‘系统名称(如ERP、OA、CRM)’,
api_base_url VARCHAR(255) NOT NULL COMMENT ‘API基础地址’,
auth_type ENUM(‘api_key’, ‘oauth2’) NOT NULL COMMENT ‘鉴权方式’,
auth_config TEXT NOT NULL COMMENT ‘鉴权配置(JSON格式,含密钥/OAuth2参数)’,
sync_fields TEXT NOT NULL COMMENT ‘同步字段映射(JSON格式,本地字段→系统字段)’,
is_enable TINYINT(1) DEFAULT 1 COMMENT ‘是否启用(1-启用,0-禁用)’,
remark TEXT NULL COMMENT ‘备注(如系统版本、API限制)’,
create_time DATETIME DEFAULT CURRENT_TIMESTAMP COMMENT ‘创建时间’,
update_time DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT ‘更新时间’,
UNIQUE KEY uk_system_name (system_name)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT=‘跨系统联动配置表’;

– 创建任务表(task_info)
CREATE TABLE task_info (
id INT AUTO_INCREMENT PRIMARY KEY,
task_name VARCHAR(100) NOT NULL COMMENT ‘任务名称’,
task_type ENUM(‘single’, ‘flow’) NOT NULL COMMENT ‘任务类型(单任务/流程任务)’,
trigger_type ENUM(‘timed’, ‘cron’, ‘data_link’) NOT NULL COMMENT ‘触发方式(定时/ Cron/数据联动)’,
trigger_config TEXT NULL COMMENT ‘触发配置(JSON格式,定时时间/Cron表达式/联动条件)’,
execute_config TEXT NOT NULL COMMENT ‘执行配置(JSON格式,API地址、参数、执行策略)’,
system_id INT NOT NULL COMMENT ‘关联系统ID(对应system_config表)’,
retry_strategy TEXT NOT NULL COMMENT ‘重试策略(JSON格式,次数、间隔)’,
fail_handler TEXT NULL COMMENT ‘失败处理(JSON格式,告警方式、兜底方案)’,
creator_id INT NOT NULL COMMENT ‘创建人ID’,
is_enable TINYINT(1) DEFAULT 1 COMMENT ‘是否启用’,
create_time DATETIME DEFAULT CURRENT_TIMESTAMP COMMENT ‘创建时间’,
update_time DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT ‘更新时间’,
UNIQUE KEY uk_task_name (task_name),
FOREIGN KEY (system_id) REFERENCES system_config(id) ON DELETE CASCADE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT=‘任务信息表’;

– 创建任务流程表(task_flow)
CREATE TABLE task_flow (
id INT AUTO_INCREMENT PRIMARY KEY,
flow_name VARCHAR(100) NOT NULL COMMENT ‘流程名称’,
task_ids TEXT NOT NULL COMMENT ‘包含的任务ID(JSON格式,按执行顺序排列)’,
dependencies TEXT NULL COMMENT ‘任务依赖关系(JSON格式,如{“2”:[“1”]}表示任务2依赖任务1)’,
execute_mode ENUM(‘serial’, ‘parallel’) DEFAULT ‘serial’ COMMENT ‘执行模式(串行/并行)’,
creator_id INT NOT NULL COMMENT ‘创建人ID’,
is_enable TINYINT(1) DEFAULT 1 COMMENT ‘是否启用’,
create_time DATETIME DEFAULT CURRENT_TIMESTAMP COMMENT ‘创建时间’,
update_time DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT ‘更新时间’,
UNIQUE KEY uk_flow_name (flow_name)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT=‘任务流程编排表’;

– 创建任务执行日志表(task_execute_log)
CREATE TABLE task_execute_log (
id BIGINT AUTO_INCREMENT PRIMARY KEY,
task_id INT NULL COMMENT ‘关联任务ID(单任务)’,
flow_id INT NULL COMMENT ‘关联流程ID(流程任务)’,
execute_status ENUM(‘pending’, ‘running’, ‘success’, ‘fail’, ‘retrying’, ‘skipped’) DEFAULT ‘pending’ COMMENT ‘执行状态’,
execute_time DATETIME DEFAULT CURRENT_TIMESTAMP COMMENT ‘执行时间’,
finish_time DATETIME NULL COMMENT ‘结束时间’,
execute_result TEXT NULL COMMENT ‘执行结果(JSON格式,返回数据/错误信息)’,
retry_count INT DEFAULT 0 COMMENT ‘重试次数’,
handler_info TEXT NULL COMMENT ‘故障处理信息(JSON格式,告警记录/兜底执行结果)’,
audit_id BIGINT NULL COMMENT ‘关联审计ID’,
KEY idx_task_id (task_id),
KEY idx_flow_id (flow_id),
KEY idx_execute_status (execute_status),
KEY idx_execute_time (execute_time),
FOREIGN KEY (audit_id) REFERENCES operation_audit(id) ON DELETE SET NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT=‘任务执行日志表’;

– 初始化跨系统配置示例(ERP系统)
INSERT INTO system_config (system_name, api_base_url, auth_type, auth_config, sync_fields, remark)
VALUES (
‘ERP系统’,
‘https://erp.example.com/api/v1’,
‘api_key’,
‘{“api_key”:“ERP_API_KEY_123456”,“api_secret”:“ERP_SECRET_654321”}’,
‘{“local_order_id”:“erp_order_code”,“local_amount”:“erp_total_amount”,“local_status”:“erp_order_status”}’,
‘企业ERP系统,用于同步订单数据’
);

– 初始化测试任务
INSERT INTO task_info (task_name, task_type, trigger_type, trigger_config, execute_config, system_id, retry_strategy, fail_handler, creator_id)
VALUES (
‘ERP订单数据同步’,
‘single’,
‘cron’,
‘{“cron”:“0 9 * * *”}’, – 每日9点执行
‘{“api_path”:“/order/list”,“method”:“GET”,“params”:{“page_size”:100},“data_format”:“json”}’,
1,
‘{“retry_count”:3,“retry_intervals”:[60, 180, 300]}’, – 重试3次,间隔1/3/5分钟
‘{“alert_type”:“email”,“fallback_task_id”:null}’,
1 – 假设创建人ID为1(管理员)
);

三、实战:跨系统联动+任务编排+故障自愈集成

  1. 第一步:搭建跨系统API联动引擎,实现多系统数据互通

-- coding: utf-8 --

cross_system_engine.py 跨系统API联动引擎

import requests
import json
import time
from requests_oauthlib import OAuth2Session
from models import SystemConfig, db
from dotenv import load_dotenv
import os

加载环境变量

load_dotenv()

通用请求头

DEFAULT_HEADERS = {“Content-Type”: “application/json; charset=utf-8”}

class CrossSystemClient:
“”“跨系统API客户端,适配不同鉴权方式与数据格式”“”
definit(self, system_id):
“”“初始化客户端,加载系统配置”“”
self.system_config = SystemConfig.query.get(system_id)
if not self.system_config or not self.system_config.is_enable:
raise Exception(f"系统配置不存在或已禁用(ID:{system_id})")
self.api_base_url = self.system_config.api_base_url
self.auth_type = self.system_config.auth_type
self.auth_config = json.loads(self.system_config.auth_config)
self.sync_fields = json.loads(self.system_config.sync_fields)
# 初始化鉴权信息
self.auth_headers = self._init_auth_headers()

def _init_auth_headers(self): """初始化鉴权请求头,适配api_key/oauth2""" if self.auth_type == "api_key": # API密钥鉴权(通常在请求头或参数中携带) api_key = self.auth_config.get("api_key") api_secret = self.auth_config.get("api_secret") if not api_key: raise Exception("API密钥配置缺失") return {**DEFAULT_HEADERS, "X-API-Key": api_key, "X-API-Secret": api_secret} elif self.auth_type == "oauth2": # OAuth2.0鉴权(获取access_token) return self._get_oauth2_headers() else: raise Exception(f"不支持的鉴权方式:{self.auth_type}") def _get_oauth2_headers(self): """获取OAuth2.0的access_token,生成鉴权头""" oauth_config = self.auth_config required_params = ["client_id", "client_secret", "token_url"] if not all(param in oauth_config for param in required_params): raise Exception("OAuth2.0配置缺失关键参数") oauth_session = OAuth2Session(client_id=oauth_config["client_id"]) token = oauth_session.fetch_token( token_url=oauth_config["token_url"], client_secret=oauth_config["client_secret"], grant_type=oauth_config.get("grant_type", "client_credentials") ) if not token.get("access_token"): raise Exception("OAuth2.0获取access_token失败") return {**DEFAULT_HEADERS, "Authorization": f"Bearer {token['access_token']}"} def _format_response_data(self, response_data): """格式化返回数据,按字段映射转换为本地格式""" if not isinstance(response_data, dict) and not isinstance(response_data, list): return response_data # 单条数据处理 def format_single_data(data): formatted_data = {} for local_field, system_field in self.sync_fields.items(): # 支持嵌套字段(如erp_data.order_code) if "." in system_field: keys = system_field.split(".") value = data for key in keys: value = value.get(key) if isinstance(value, dict) else None if value is None: break formatted_data[local_field] = value else: formatted_data[local_field] = data.get(system_field) return formatted_data # 多条数据处理 if isinstance(response_data, list): return [format_single_data(item) for item in response_data] return format_single_data(response_data) def request(self, api_path, method="GET", params=None, data=None, retry_count=0): """ 通用API请求方法 :param api_path: API路径(相对于基础地址) :param method: 请求方法(GET/POST/PUT/DELETE) :param params: URL参数(GET请求用) :param data: 请求体数据(POST/PUT请求用) :param retry_count: 当前重试次数 :return: 格式化后的返回数据 """ url = f"{self.api_base_url}{api_path}" method = method.upper() params = params or {} data = json.dumps(data) if data and method in ["POST", "PUT"] else None try: print(f"发起跨系统请求:{method} {url},参数:{params},数据:{data}") response = requests.request( method=method, url=url, headers=self.auth_headers, params=params, data=data, timeout=30 ) response.raise_for_status() # 触发HTTP错误(状态码≥400) response_data = response.json() # 格式化数据并返回 return self._format_response_data(response_data) except Exception as e: # 重试逻辑(若未超过最大重试次数) max_retry = self.auth_config.get("max_retry", 1) if retry_count < max_retry: retry_interval = self.auth_config.get("retry_interval", 5) print(f"请求失败({str(e)}),{retry_interval}秒后重试(第{retry_count+1}次)") time.sleep(retry_interval) return self.request(api_path, method, params, data, retry_count+1) # 重试失败,抛出异常 raise Exception(f"跨系统请求失败(重试{max_retry}次后仍失败):{str(e)}") def sync_data(self, api_path, method="GET", params=None, data=None): """数据同步方法(封装请求,返回格式化数据,供任务调用)""" return self.request(api_path, method, params, data)

====================== 接口:跨系统配置管理 ======================

from flask import Blueprint, request, jsonify
from flask_login import login_required, current_user
from audit_engine import audit_log

cross_system_bp = Blueprint(“cross_system”,name)

@cross_system_bp.route(“/system/config/add”, methods=[“POST”])
@login_required
@audit_log(“add”)
def add_system_config():
“”“新增跨系统配置”“”
data = request.get_json()
required_fields = [“system_name”, “api_base_url”, “auth_type”, “auth_config”, “sync_fields”]
for field in required_fields:
if not data.get(field):
return jsonify({“success”: False, “error”: f"缺少必填字段:{field}"})

# 检查系统名称是否已存在 if SystemConfig.query.filter_by(system_name=data["system_name"]).first(): return jsonify({"success": False, "error": "该系统配置已存在"}) try: # 验证JSON格式字段 json.loads(data["auth_config"]) json.loads(data["sync_fields"]) except json.JSONDecodeError: return jsonify({"success": False, "error": "auth_config或sync_fields格式错误(需JSON)"}) system_config = SystemConfig( system_name=data["system_name"], api_base_url=data["api_base_url"], auth_type=data["auth_type"], auth_config=data["auth_config"], sync_fields=data["sync_fields"], remark=data.get("remark", ""), is_enable=data.get("is_enable", 1) ) db.session.add(system_config) db.session.commit() return jsonify({"success": True, "msg": "系统配置新增成功", "system_id": system_config.id})

@cross_system_bp.route(“/system/data/sync/int:system_id”, methods=[“POST”])
@login_required
@audit_log(“update”)
def sync_system_data(system_id):
“”“手动触发跨系统数据同步”“”
data = request.get_json()
api_path = data.get(“api_path”)
method = data.get(“method”, “GET”)
params = data.get(“params”, {})
request_data = data.get(“data”, {})

if not api_path: return jsonify({"success": False, "error": "缺少API路径"}) try: client = CrossSystemClient(system_id) sync_result = client.sync_data(api_path, method, params, request_data) return jsonify({"success": True, "msg": "数据同步成功", "data": sync_result}) except Exception as e: return jsonify({"success": False, "error": f"数据同步失败:{str(e)}"})

在app.py中新增以下内容

from cross_system_engine import cross_system_bp

注册跨系统联动蓝图

app.register_blueprint(cross_system_bp)

示例:测试ERP数据同步接口

@app.route(“/test/erp/sync”, methods=[“GET”])
@login_required
def test_erp_sync():
“”“测试ERP订单数据同步”“”
from cross_system_engine import CrossSystemClient
try:
client = CrossSystemClient(system_id=1) # 1为ERP系统配置ID
sync_data = client.sync_data(
api_path=“/order/list”,
method=“GET”,
params={“start_date”: “2026-01-01”, “end_date”: “2026-01-26”}
)
return jsonify({“success”: True, “data”: sync_data, “count”: len(sync_data)})
except Exception as e:
return jsonify({“success”: False, “error”: str(e)})

  1. 第二步:实现可视化任务编排,拖拽配置自动化流程

-- coding: utf-8 --

task_orchestration.py 可视化任务编排模块

import json
from flask import Blueprint, request, jsonify, g
from flask_login import login_required, current_user
from models import db, TaskInfo, TaskFlow, SystemConfig
from audit_engine import audit_log
from datetime import datetime

task_bp = Blueprint(“task”,name)

====================== 核心功能:解析任务流程,生成执行序列 ======================

def parse_task_flow(flow_id):
“”"
解析任务流程,生成执行序列(处理依赖关系)
:param flow_id: 流程ID
:return: 执行序列(列表,每个元素包含任务ID、执行顺序、依赖任务)
“”"
flow = TaskFlow.query.get(flow_id)
if not flow or not flow.is_enable:
raise Exception(f"流程不存在或已禁用(ID:{flow_id})")

task_ids = json.loads(flow.task_ids) dependencies = json.loads(flow.dependencies) if flow.dependencies else {} execute_mode = flow.execute_mode # 串行执行:按顺序解析,确保依赖任务在前 if execute_mode == "serial": execute_sequence = [] processed_tasks = set() # 递归处理依赖 def add_task(task_id): if task_id in processed_tasks: return # 先添加依赖任务 for dep_task_id in dependencies.get(str(task_id), []): add_task(dep_task_id) # 添加当前任务 task = TaskInfo.query.get(task_id) if not task or not task.is_enable: raise Exception(f"任务不存在或已禁用(ID:{task_id})") execute_sequence.append({ "task_id": task_id, "task_name": task.task_name, "dependencies": dependencies.get(str(task_id), []) }) processed_tasks.add(task_id) for task_id in task_ids: add_task(task_id) return execute_sequence # 并行执行:不考虑顺序(依赖任务需单独处理) else: return [ { "task_id": task_id, "task_name": TaskInfo.query.get(task_id).task_name, "dependencies": dependencies.get(str(task_id), []) } for task_id in task_ids if TaskInfo.query.get(task_id).is_enable ]

====================== 接口:任务管理(新增/编辑/删除) ======================

@task_bp.route(“/task/add”, methods=[“POST”])
@login_required
@audit_log(“add”)
def add_task():
“”“新增任务(单任务)”“”
data = request.get_json()
required_fields = [“task_name”, “trigger_type”, “execute_config”, “system_id”, “retry_strategy”]
for field in required_fields:
if not data.get(field):
return jsonify({“success”: False, “error”: f"缺少必填字段:{field}"})

# 检查任务名称是否重复 if TaskInfo.query.filter_by(task_name=data["task_name"]).first(): return jsonify({"success": False, "error": "该任务名称已存在"}) try: # 验证JSON格式字段 json.loads(data["execute_config"]) json.loads(data["retry_strategy"]) if data.get("trigger_config"): json.loads(data["trigger_config"]) if data.get("fail_handler"): json.loads(data["fail_handler"]) except json.JSONDecodeError: return jsonify({"success": False, "error": "配置字段格式错误(需JSON)"}) task = TaskInfo( task_name=data["task_name"], task_type="single", trigger_type=data["trigger_type"], trigger_config=data.get("trigger_config", "{}"), execute_config=data["execute_config"], system_id=data["system_id"], retry_strategy=data["retry_strategy"], fail_handler=data.get("fail_handler", "{}"), creator_id=current_user.id, is_enable=data.get("is_enable", 1) ) db.session.add(task) db.session.commit() return jsonify({"success": True, "msg": "任务新增成功", "task_id": task.id})

@task_bp.route(“/task/flow/add”, methods=[“POST”])
@login_required
@audit_log(“add”)
def add_task_flow():
“”“新增任务流程(可视化编排)”“”
data = request.get_json()
required_fields = [“flow_name”, “task_ids”, “execute_mode”]
for field in required_fields:
if not data.get(field):
return jsonify({“success”: False, “error”: f"缺少必填字段:{field}"})

# 检查流程名称是否重复 if TaskFlow.query.filter_by(flow_name=data["flow_name"]).first(): return jsonify({"success": False, "error": "该流程名称已存在"}) try: # 验证任务ID与依赖关系格式 task_ids = json.loads(data["task_ids"]) if not isinstance(task_ids, list) or len(task_ids) == 0: return jsonify({"success": False, "error": "task_ids需为非空列表"}) # 检查任务是否存在 for task_id in task_ids: if not TaskInfo.query.get(task_id): return jsonify({"success": False, "error": f"任务ID不存在:{task_id}"}) # 验证依赖关系 dependencies = json.loads(data.get("dependencies", "{}")) if data.get("dependencies") else {} except json.JSONDecodeError: return jsonify({"success": False, "error": "task_ids或dependencies格式错误(需JSON)"}) task_flow = TaskFlow( flow_name=data["flow_name"], task_ids=data["task_ids"], dependencies=json.dumps(dependencies, ensure_ascii=False), execute_mode=data["execute_mode"], creator_id=current_user.id, is_enable=data.get("is_enable", 1) ) db.session.add(task_flow) db.session.commit() # 解析流程,验证合法性 try: parse_task_flow(task_flow.id) except Exception as e: db.session.rollback() return jsonify({"success": False, "error": f"流程解析失败:{str(e)}"}) return jsonify({"success": True, "msg": "任务流程新增成功", "flow_id": task_flow.id})

====================== 接口:任务流程解析与预览 ======================

@task_bp.route(“/task/flow/parse/int:flow_id”, methods=[“GET”])
@login_required
def parse_flow():
“”“解析任务流程,返回执行序列(供前端预览)”“”
try:
execute_sequence = parse_task_flow(flow_id)
return jsonify({
“success”: True,
“execute_sequence”: execute_sequence,
“count”: len(execute_sequence)
})
except Exception as e:
return jsonify({“success”: False, “error”: str(e)})

====================== 前端拖拽编排页面(Jinja2模板) ======================

@task_bp.route(“/task/orchestration”, methods=[“GET”])
@login_required
def task_orchestration_page():
“”“任务编排可视化页面”“”
# 获取所有可用任务与系统配置
tasks = TaskInfo.query.filter_by(is_enable=1, task_type=“single”).all()
systems = SystemConfig.query.filter_by(is_enable=1).all()
return render_template(“task_orchestration.html”, tasks=tasks, systems=systems)

{% extends “base.html” %}
{% block content %}

任务流程可视化编排

可用任务库
{% for task in tasks %}
{{ task.task_name }}系统:{{ task.system.system_name }}触发方式:{{ {"timed":"定时","cron":"Cron","data_link":"数据联动"}[task.trigger_type] }}
{% else %}
暂无可用任务,请先新增单任务
{% endfor %}
<!-- 中间编排区域 --> <div class="col-6"> <div class="card h-100"> <div class="card-header bg-info text-white"> <div class="d-flex justify-content-between align-items-center"> <h5 class="card-title mb-0">流程编排区(拖拽任务至此)</h5> <div> <select class="form-select form-select-sm d-inline-block w-auto me-2" id="executeMode"> <option value="serial" selected>串行执行</option> <option value="parallel">并行执行</option> </select> <button class="btn btn-sm btn-white" onclick="clearFlow()">清空</button> </div> </div> </div> <div class="card-body"> <div id="orchestrationArea" class="border border-dashed rounded p-3 min-vh-50"> <div class="text-center text-muted py-5" id="emptyTip"> <i class="bi bi-arrows-move fs-3 mb-2"></i> <p>拖拽左侧任务至此处编排流程,可调整顺序、设置依赖</p> </div> <div id="flowTasks" class="d-flex flex-column gap-2"></div> </div> <!-- 依赖设置区域 --> <div class="mt-4"> <h6>依赖关系设置</h6> <div class="input-group mb-2"> <select class="form-select" id="targetTask" disabled> <option value="">选择目标任务</option> </select> <span class="input-group-text">依赖于</span> <select class="form-select" id="depTask" disabled> <option value="">选择依赖任务</option> </select> <button class="btn btn-primary" id="addDepBtn" disabled>添加依赖</button> </div> <div id="depList" class="text-sm text-muted">暂无依赖关系</div> </div> </div> </div> </div> <!-- 右侧流程配置与提交 --> <div class="col-3"> <div class="card h-100"> <div class="card-header bg-success text-white"> <h5 class="card-title mb-0">流程配置</h5> </div> <div class="card-body"> <form id="flowForm"> <div class="mb-3"> <label class="form-label">流程名称</label> <input type="text" class="form-control" id="flowName" placeholder="请输入流程名称" required> </div> <div class="mb-3"> <label class="form-label">流程状态</label> <select class="form-select" id="flowEnable"> <option value="1" selected>启用</option> <option value="0">禁用</option> </select> </div> <div class="mb-3"> <label class="form-label">流程描述</label> <textarea class="form-control" id="flowRemark" rows="3" placeholder="可选,描述流程用途"></textarea> </div> <div class="d-grid gap-2"> <button type="button" class="btn btn-primary" id="previewFlowBtn" disabled>预览流程</button> <button type="button" class="btn btn-success" id="saveFlowBtn" disabled>保存流程</button> </div> </form> <!-- 预览结果区域 --> <div class="mt-4" id="previewResult" style="display: none;"> <h6>流程执行序列预览</h6> <div class="border rounded p-2 max-height-200 overflow-auto" id="previewList"></div> </div> </div> </div> </div>

{% endblock %}

  1. 第三步:集成任务调度与故障自愈,确保自动化稳定运行
版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/4/22 17:37:23

三步打造AI视频剪辑工具:效率提升300%的本地部署方案

三步打造AI视频剪辑工具&#xff1a;效率提升300%的本地部署方案 【免费下载链接】FunClip Open-source, accurate and easy-to-use video clipping tool, LLM based AI clipping intergrated || 开源、精准、方便的视频切片工具&#xff0c;集成了大语言模型AI智能剪辑功能 …

作者头像 李华
网站建设 2026/4/23 11:29:29

黑苹果配置破局者:OpCore-Simplify零门槛EFI生成工具全攻略

黑苹果配置破局者&#xff1a;OpCore-Simplify零门槛EFI生成工具全攻略 【免费下载链接】OpCore-Simplify A tool designed to simplify the creation of OpenCore EFI 项目地址: https://gitcode.com/GitHub_Trending/op/OpCore-Simplify 技术民主化&#xff1a;打破黑…

作者头像 李华
网站建设 2026/4/23 11:28:58

医学影像分析实战:基于PyTorch通用镜像快速建模

医学影像分析实战&#xff1a;基于PyTorch通用镜像快速建模 医学影像分析是AI在医疗领域最具落地价值的方向之一。从肺部CT结节检测到眼底图像糖网筛查&#xff0c;从MRI脑肿瘤分割到超声心动图功能评估&#xff0c;高质量的模型开发离不开稳定、高效、开箱即用的开发环境。但…

作者头像 李华
网站建设 2026/4/16 13:53:36

Z-Image-Turbo效率翻倍:8步高质量出图秘诀

Z-Image-Turbo效率翻倍&#xff1a;8步高质量出图秘诀 你有没有试过输入一句精心打磨的提示词&#xff0c;点击生成&#xff0c;然后盯着进度条数到第30步——结果画面却模糊、失真、甚至人物长出三只手&#xff1f;更别提等上五六秒才看到结果&#xff0c;在需要快速迭代创意…

作者头像 李华
网站建设 2026/4/23 13:02:09

避免踩坑!proteus8.17下载及安装注意事项汇总

以下是对您提供的博文内容进行 深度润色与工程化重构后的技术文章 。整体风格已全面转向 真实技术博主口吻 教学级实战逻辑 无AI痕迹的专业表达 &#xff0c;删减冗余结构、强化因果链条、融入一线调试经验&#xff0c;并严格遵循您提出的全部优化要求&#xff08;如&…

作者头像 李华
网站建设 2026/4/23 11:08:43

Parakeet-TDT-0.6B-V2:如何做到1.69%超低词错率语音识别?

Parakeet-TDT-0.6B-V2&#xff1a;如何做到1.69%超低词错率语音识别&#xff1f; 【免费下载链接】parakeet-tdt-0.6b-v2 项目地址: https://ai.gitcode.com/hf_mirrors/nvidia/parakeet-tdt-0.6b-v2 导语&#xff1a;NVIDIA最新发布的Parakeet-TDT-0.6B-V2语音识别模型…

作者头像 李华