深入执行引擎:Run Executor的设计
执行引擎是整个系统的心脏。bella-assistants项目在兼容OpenAI Assistants API的基础上,设计了一套高度优化的执行引擎架构。本文将深入剖析Run Executor的全链路设计,揭示其如何通过精巧的分层架构、并发控制和性能优化,实现了高效、可靠的AI对话执行能力。
全链路生命周期
一次 Run 的端到端链路自上而下如下所示:
关键代码入口:
- 创建接口:
RunController.createRun()、ThreadController.createThreadAndRun()、ResponseController.createResponses()。 - 执行入口:
RunExecutor.startRun()/startResponseRun()→executeRun()→executeLoop()(文件:api/src/main/java/com/ke/assistant/core/run/RunExecutor.java)。 - 计划器:
Planner.nextStep()(core/plan/Planner.java)。 - 模型流式:
ChatService.chat()(core/ai/ChatService.java)。 - 工具执行:
ToolExecutor.run()/loop()(core/tools/ToolExecutor.java)。 - 消息流式:
MessageExecutor与ResponseMessageExecutor(core/run/*.java)。 - 状态持久化:
RunStateManager(core/run/RunStateManager.java)。
执行状态机设计(queued → in_progress → requires_action → completed)
状态枚举与事件映射:RunStatus(core/run/RunStatus.java)。
QUEUED→IN_PROGRESS→REQUIRES_ACTION/COMPLETED/FAILED/CANCELLING→CANCELLED或EXPIRED。- 对应 SSE 事件:
StreamEvent.THREAD_RUN_*/THREAD_RUN_STEP_*。
状态迁移落地:RunStateManager 负责 DB 加锁/校验与变更:
toInProgress():校验message_creation与run_step均为in_progress,成功后缓存执行中的context,并注册到ServiceMesh(用于跨实例取消)。startToolCalls():创建tool_calls类型的run_step为in_progress,并context.signalToolCall()唤醒工具执行线程。finishMessageCreation()/finishToolCall():落库usage、step_details、错误信息等;当工具全部完成(或失败)后触发context.signalRunner()继续主循环。toRequiresAction()/submitRequiredAction():当存在外部工具(无服务器端 Handler)时,构造RequiredAction并挂到run;外部提交后将run返回到QUEUED以继续后续轮次。- 终止态:
toCompleted()/toFailed()/toCanceled()/toExpired()。
执行器的分层架构与协作
- RunExecutor(主控制器):
RunExecutor.executeRun()+executeLoop()控制一次 Run 的生命周期与循环调度,选择消息执行器(MessageExecutorvsResponseMessageExecutor),以及启动工具执行器(ToolExecutor.start(...))。 - Planner(规划执行器):
Planner.nextStep()负责:- 终止/异常/超时/外部输入等快速分支;
- 构建聊天上下文
buildChatMessages()与工具列表buildChatTools(); - 决策
LLM_CALL或WAIT_FOR_TOOL/COMPLETE。
- AIExecutor(模型调用执行器):由
ChatService.chat()承担,组装ChatCompletionRequest并启用stream=true,将ChatCompletionChunk逐条context.publish(...)给消息执行器消费;内建带指数风格的有限重试逻辑。 - ToolExecutor(工具执行器):独立线程等待
context.toolCallAwait(),被唤醒后并发执行可服务端处理的工具;必要时通过ToolOutputChannel.start(context)开启工具输出串行通道。 - MessageExecutor/ResponseMessageExecutor(消息处理执行器):
- Assistant API 路径:
MessageExecutor将 Token/推理流与工具调用事件转换为 OpenAI Assistants SSE 事件(THREAD_MESSAGE_DELTA、THREAD_RUN_STEP_DELTA等),并负责在[LLM_DONE]/[TOOL_DONE]时调用RunStateManager收尾。 - Response API 路径:
ResponseMessageExecutor将 Token/推理/函数调用/工具输出映射为 Response API 的BaseStreamEvent序列,使用sequenceNumber/outputIndex/itemId明确顺序与边界,并在运行停止时汇总Response返回。
- Assistant API 路径:
- ExecutionContext(协作枢纽):跨执行器共享的状态与同步原语(
ReentrantLock + Condition),包含:- 发送队列
senderQueue、工具任务currentToolTasks、结果currentToolResults、元数据currentMetaData; - 条件等待/唤醒:
runnerAwait()/signalRunner()、toolCallAwait()/signalToolCall(); - 输出串行标识:
currentOutputToolCallId; - 运行参数、文件映射、模型特性与配额统计等。
- 发送队列
- RunStateManager(状态与持久化):以 DB 为真值源,处理
run与run_step的状态迁移、usage、step_details、required_action与message内容的最终一致性更新。
工具并发执行的顺序保证
工具依赖与调度
- 工具产生:LLM 流式返回
ChatToolCall时由MessageExecutor/ResponseMessageExecutor调用ExecutionContext.addToolCallTask()收集,必要时生成currentToolCallStepId并发出THREAD_RUN_STEP_CREATED/DELTA。 - 工具分类:
ToolExecutor将工具分为两类:- 服务器端可执行(存在
ToolHandler,且非definitionHandler)→ 并发执行。 - 不可在服务端执行(
definitionHandler(比如 response api的custom tool和local shell tool) 或无 Handler)→ 汇总到RequiredAction,返回客户端提交。
- 服务器端可执行(存在
并行工具组与异步编排
- 编排核心:
ToolExecutor.loop()将一批ChatToolCall转换为ToolCall,对有 Handler 的任务使用TaskExecutor.supplyCaller(...)提交到caller线程池并返回CompletableFuture;CompletableFuture.allOf(...).join()等待本批内部工具全部完成。 - 结果聚合:每个工具完成后调用
RunStateManager.finishToolCall()将ToolCall附加到当前run_step.step_details.tool_calls