最小增量实现 Responses API
以最小增量复用 Run/Thread/Tool/Message 全链路,构建 Response API 的轻量化执行与流式协议适配。
架构复用的设计理念
-
共性分析(以 Run 为中心)
- 两条路径本质一致:均是“规划 → 模型流式 → 工具并发 → 状态机落库/返回”。
- 共享核心:
RunExecutor.executeRun()+executeLoop()、Planner.nextStep()、ChatService.chat()、ToolExecutor、RunStateManager、ExecutionContext、线程池隔离(TaskExecutor)。 - 差异只在“消息编排与协议映射”层:Assistant 用
MessageExecutor→ Assistants SSE;Responses 用ResponseMessageExecutor→ Responses SSE。
-
核心组件抽象与泛化
ExecutionContext统一线程内状态与同步(队列、锁、条件变量、工具任务/结果、SSE 输出队列、模型能力、文件映射等)。RunExecutor统一调度骨架(模板方法式骨架):启动消息执行器与工具执行器 → 进入计划/执行循环 → 统一收尾。ToolExecutor/ToolOutputChannel统一工具并发与输出串行化机制。RunStateManager统一 Run/Step 状态迁移与持久化。
-
设计模式
- Template Method:
RunExecutor.executeRun()定义通用执行骨架,仅在“选择消息执行器”处分流。 - Adapter:
ResponseService.createResponse()+ResponseUtils.convertToRunRequest()将CreateResponseRequest适配为RunCreateRequest(无 Assistant 场景)。ResponseMessageExecutor将ChatCompletionChunk/工具输出适配为 Responses 的BaseStreamEvent序列。
- Strategy:
Planner决策执行步;ToolHandler多工具实现;ToolChoice与“并行工具”开关来自元数据策略。 - Observer/Producer-Consumer:
ExecutionContext.senderQueue串联 Run 主循环、LLM 流、工具流与 SSE 发送线程。 - Bulkhead(隔离栅):
TaskExecutor多线程池隔离 Runner/Executor/Caller,避免相互干扰(见api/run-executor-design.md)。
- Template Method:
共享组件的设计
-
Thread 管理器的双模式支持(Store/Non-Store)
- 控制面:
ResponseController.createResponses()对store=false激活RepoContext非持久模式;ResponseIdMappingRepo等 Repo 感知并走内存态。 - 数据面:
ResponseService.confirmThreadAndRun()支持三路:显式conversation、基于previous_response_id、或新建thread(ThreadService.createThread())。 - 分支复用:如果
previous_response_id已被使用,checkAndStore()走写锁并调用ThreadService.forkThreadBeforeTargetRun()在目标 run 的 assistant 消息“之前”分叉,且可将该 run 的工具调用消息对(tool_call/tool_result)一并织入,保证对话延续性与顺序性。
- 控制面:
-
Message 处理 pipeline 的复用
- 上游一致:
ChatService.chat(context)将模型流式增量与工具调用增量都投递到ExecutionContext.senderQueue。 - 中游分流:Assistant 用
MessageExecutor,Responses 用ResponseMessageExecutor。两者都调用RunStateManager.finishMessageCreation()、startToolCalls()、finishToolCall()等,共享消息/步骤的最终一致性路径。 - 底层一致:
MessageService承担消息读写;RunService生成message_creation/tool_calls步骤并维护工具相关元数据。
- 上游一致:
-
Run 执行引擎的适配层
RunExecutor.startResponseRun()在通用ExecutionContext上打标context.setResponse(response),从而在executeRun()中选择ResponseMessageExecutor.start(...)。RunExecutor.buildExecutionContext()完全复用 Assistant 路径:装载 Run/Step/Tools/Files/ModelFeatures,并发布首包(Run/Thread或ResumeMessage)。
-
工具系统的统一调用接口
ToolExecutor.start(...)注册服务端可执行工具(ToolFetcher.getToolHandler()),统一把 LLM 的ChatToolCall转ToolCall并并发执行。- 输出串行化:
ToolOutputChannel基于ExecutionContext.currentOutputToolCallId将多工具并行执行的产物按“一个工具一个时间窗口”的方式串行发布;Responses 路径中,ResponseMessageExecutor再把这些输出映射为OutputItemAdded/Done、FunctionCallArgumentsDelta/Done等事件并维持全局sequenceNumber/outputIndex/contentIndex。
差异化处理策略
-
previous_response_id 专属语义
- 解析与继承:
ResponseService.confirmThreadAndRun()通过ResponseIdMappingRepo.findByResponseId()找到上一轮 run,继承其tools(作为“继承工具”打标),并确定threadId/previousRunId。 - 顺序与原子性:
checkFirstMessage(...)会在首条用户输入前补入必要的“上一轮消息边界”(例如未提交的工具调用或上一条 assistant 文本),并用MessageUtils.checkPre()校验user/assistant/tool邻接合法性,避免对话断裂。 - 并发与幂等:
checkAndStore(...)以previous_response_id为写锁键,若该 previous 已被使用则自动分叉新 thread(见上文“分支复用”)。
- 解析与继承:
-
无 Assistant 模式(按请求即时配置)
- 适配:
ResponseUtils.convertToRunRequest()将CreateResponseRequest转为RunCreateRequest,不依赖assistant_id,模型/指令/工具/裁剪/推理等均来自请求;MetaConstants.RESPONSE_ID/STORE/PARALLEL_TOOL_CALLS/BACKGROUND等写入run.metadata控制后续行为。 - 落地:
RunService.doCreateRun()检测RESPONSE_ID即走“无 Assistant”分支,仍复用统一的 Run/Step/Tool 创建逻辑,保证与 Assistant 路径一致的稳定性与可观测性。
- 适配:
-
不同响应协议的处理
- Assistant:
MessageExecutor输出 OpenAI Assistants 系列事件(THREAD_MESSAGE_DELTA等),在[LLM_DONE]/[TOOL_DONE]时统一收尾。 - Responses:
ResponseMessageExecutor输出ResponseCreated/InProgress/Completed/Failed/Incomplete及细粒度内容/工具事件:- 文本:
OutputItemAdded→OutputTextDelta→OutputTextDone→OutputItemDone。 - 推理:
ReasoningSummaryPartAdded→ReasoningSummaryTextDelta/Done。 - 函数调用:
OutputItemAdded(FunctionToolCall)→FunctionCallArgumentsDelta/Done。 - 工具输出:从
ToolOutputChannel接收ToolStreamEvent,据currentOutputToolCallId生成itemId并与outputIndex对齐,完成后context.finishToolCallOutput()释放下一个工具的输出权。
- 文本:
- Assistant:
-
轻量级上下文(Non-Store)
- 开关:
ResponseController.createResponses()对store=false激活RepoContext,对应 Repo(如ResponseIdMappingRepo)走内存实现;ExecutionContext.isStore()也基于run.metadata.store控制消息是否落库。 - 体验:在不持久化的场景下,仍可完整获得 Responses 流式事件与最终聚合结果(
context.blockingGetResult(...))。
- 开关:
时序与协作(Responses 路径)
关键代码索引
- 控制层
api/src/main/java/com/ke/assistant/controller/ResponseController.java
- 业务编排
api/src/main/java/com/ke/assistant/service/ResponseService.javaapi/src/main/java/com/ke/assistant/service/RunService.javaapi/src/main/java/com/ke/assistant/service/ThreadService.javaapi/src/main/java/com/ke/assistant/db/repo/ResponseIdMappingRepo.java
- 执行与消息
api/src/main/java/com/ke/assistant/core/run/RunExecutor.javaapi/src/main/java/com/ke/assistant/core/run/ExecutionContext.javaapi/src/main/java/com/ke/assistant/core/run/ResponseMessageExecutor.javaapi/src/main/java/com/ke/assistant/core/run/MessageExecutor.java(Assistant)
- 工具体系
api/src/main/java/com/ke/assistant/core/tools/ToolExecutor.javaapi/src/main/java/com/ke/assistant/core/tools/ToolOutputChannel.javaapi/src/main/java/com/ke/assistant/core/tools/ToolHandler.java与各handlers/*
- 协议与适配
api/src/main/java/com/ke/assistant/util/ResponseUtils.javaapi/run-executor-design.md、api/thread-concurrent-run-ordering.md
小结
Responses API 以“适配层 + 协议映射”的最小改造复用 Assistant 执行栈:
- Run/Tool/State/Context 保持完全一致,继承其稳定性与并发能力;
- 通过
ResponseMessageExecutor与ResponseUtils实现协议与输入的定制化; - 借助
previous_response_id的分支机制与 Store/Non-Store 双模式,既能延续复杂对话,也能满足轻量与隐私需求。