返回列表

operation-trigger

影刀RPA生态中间件,解决社区版影刀不支持编排式/批量式/传参式触发应用的问题,实现任务队列调度与自动化执行。

PythonFastAPIVue.jsSQLiteAPSchedulerPyWebviewRPA

项目简介

operation-trigger(简称 OT)是影刀RPA生态中的中间件软件,专门解决社区版影刀不支持编排式/批量式/传参式触发应用的问题。项目采用 Python + FastAPI 后端 + Vue3 前端架构,实现了完整的任务调度系统,包括触发器管理、编排集配置、任务队列调度、定时执行等核心功能。

主要特性

  • 🎯 应用触发器:创建触发器绑定影刀应用,支持一对一触发、参数传递和结果接收
  • 📦 编排集:将多个触发器编排为集,支持批量有序执行和定时执行
  • ⚡ 任务执行引擎:内存队列消费,支持重试机制和优先级插队
  • ⏰ Cron 定时调度:支持 5 字段和 6 字段 Cron 表达式定时执行编排集
  • 📊 执行监控:实时查看队列状态和执行日志
  • 📝 历史记录:查询历史任务执行详情和响应数据
  • 🔔 异常告警:影刀应用执行异常时弹窗告警

技术栈

类别技术
后端框架FastAPI + Uvicorn
前端框架Vue 3 + Vue Router
UI 组件库Element Plus
数据库SQLite
定时调度APScheduler
桌面 GUIPyWebview
打包工具PyInstaller
语言Python 3.x

技术亮点

亮点一:线程安全的优先级任务队列

项目实现了一个高性能的内存任务队列,具有以下技术特点:

  • 单例模式:使用双重检查锁定确保全局只有一个队列实例,避免资源竞争
  • 优先级排序:任务按 run_priority 字段降序排列,优先值越大越先执行
  • FIFO 保证:相同优先值的任务按入队顺序执行,通过入队计数器实现
  • 线程安全:使用 threading.Lock 保护所有共享数据操作
  • 当前任务追踪_current_task 字段确保同一时刻只有一个任务在执行
# 优先级插入算法
def _insert_by_priority(self, task: Dict, priority: int):
    insert_index = len(self._pending_list)
    for i, pending_task in enumerate(self._pending_list):
        pending_priority = pending_task.get('run_priority', 0)
        if pending_priority < priority:
            insert_index = i
            break
    self._pending_list.insert(insert_index, task)

亮点二:任务执行引擎与影刀回调机制

任务执行引擎采用生产者-消费者模式,与影刀RPA应用形成完整的闭环:

  • 独立守护线程:执行器在独立线程中运行,不阻塞主线程
  • 文件通信机制:通过 request.json 文件向影刀应用传递任务参数
  • 回调接口设计
    • /api/pre_callback:影刀应用开始执行前调用,更新任务状态
    • /api/callback:影刀应用执行完成后调用,处理结果和重试逻辑
    • /api/alert:影刀应用异常时调用,触发告警弹窗
  • 自动重试机制:任务失败时自动重新入队,支持配置最大重试次数
# 回调处理逻辑
if request.response_code == 200:
    task_repo.update_status(request.task_id, 4)
    task_queue.clear_current_task()
elif request.response_code == 500:
    retry_count = sum(1 for r in response_list if r.get('response_code') == 500)
    if retry_count < max_retry_times:
        # 重新写入 request.json 触发重试
        with open(request_file_path, 'w', encoding='utf-8') as f:
            f.write(request_json_info)

亮点三:Cron 定时调度系统

基于 APScheduler 实现的定时调度系统,支持灵活的时间配置:

  • 双格式支持:同时支持 5 字段(分 时 日 月 周)和 6 字段(秒 分 时 日 月 周)Cron 表达式
  • 任务持久化:定时任务配置存储在数据库中,应用重启后自动恢复
  • 临时编排机制:定时触发时创建临时编排记录,不影响原始编排集配置
  • 开关控制:支持全局开启/关闭定时任务,便于维护和调试
# Cron 表达式解析
def _parse_cron_expression(self, cron_expression: str) -> CronTrigger:
    parts = cron_expression.strip().split()
    if len(parts) == 6:
        return CronTrigger(
            second=parts[0], minute=parts[1], hour=parts[2],
            day=parts[3], month=parts[4], day_of_week=parts[5]
        )
    elif len(parts) == 5:
        return CronTrigger(
            minute=parts[0], hour=parts[1], day=parts[2],
            month=parts[3], day_of_week=parts[4]
        )

亮点四:SQLite 数据库版本控制与自动迁移

项目实现了完整的数据库版本管理机制:

  • 版本号管理db_version 表存储当前数据库结构版本
  • 迁移脚本_get_migrations() 方法定义版本升级的 SQL 脚本
  • 自动迁移:应用启动时检测版本差异,自动执行迁移脚本
  • 兼容性检查:版本回退时发出警告,提示可能的兼容性问题
# 数据库迁移机制
migrations = {
    1: [
        "ALTER TABLE orchestrations ADD COLUMN emoji_good TEXT DEFAULT NULL"
    ]
}
for version in range(from_version, to_version):
    if version in migrations:
        for sql in migrations[version]:
            cursor.execute(sql)
        cursor.execute(
            "UPDATE db_version SET version = ? WHERE id = 1",
            (version + 1,)
        )

亮点五:PKL 框架指令生成器

实现了影刀触发器框架指令文件的自动生成:

  • 模板机制:基于元模板 OT-ShadowBot-APP-Framework.pkl 生成触发器专用文件
  • 变量注入:自动修改模板中的 trigger_idtrigger_file_pathapi_port 变量
  • 多格式支持:同时处理 ShadowBot.Flow.BlocksHTML Format 两种剪贴板格式
  • Base64 编解码:对嵌入的 JSON 数据进行 Base64 编解码处理

亮点六:PyWebview 桌面应用架构

采用 Python 后端 + Web 前端的混合架构:

  • 无边框窗口:自定义标题栏,支持最小化、最大化、关闭操作
  • JS API 桥接:通过 js_api 参数暴露 Python API 给前端调用
  • 多线程服务:FastAPI 服务在独立守护线程中运行
  • 开发/生产环境适配:自动检测运行环境,正确加载前端资源路径

项目截图

1 1 1 1 1

相关链接