智能体执行状态机实战:run / attempt / subscribe / active runs

智能体执行状态机实战:run / attempt / subscribe / active runs

这一章面向“复刻实现”:把执行框架从概念拆成状态机与可观测信号。

入口(概念版):

对应源码入口(可选)

  • src/auto-reply/reply/agent-runner.ts
  • src/agents/pi-embedded-runner/run.ts
  • src/agents/pi-embedded-runner/run/attempt.ts
  • src/agents/pi-embedded-subscribe.ts
  • src/agents/pi-embedded-runner/runs.ts

状态机总图(建议你实现时保留同样的边界)

idle → queued(session lane) → queued(global lane) → attempting → streaming → compacting(optional) → completed/failed → idle

补充控制态:

  • aborting:用户中断或超时触发的终止态
  • waiting_compaction_retry:订阅层等待重试稳定(避免半压缩结果)

关键机制(最值得抄的 3 个点)

1) 双层排队:同会话保序 + 全局限流

复刻时不要只做“全局队列”:缺少 session lane 很容易出现乱序/串线。

2) Active run registry:steer/abort 的最小控制面

要点不是“能插消息”,而是竞态可控

  • clearActiveRun 必须做 handle 匹配,避免旧 finally 误删新 run
  • 压缩中(compacting)不接受 steer(避免 transcript 不一致)

3) 订阅层分流:文本流 / 工具流 / 压缩流

不要把工具事件与文本输出混在一起:它会直接影响审计、UI 展示与排障效率。

失败场景与排障入口

  • 用户看到“已发送但没回复”:先确认 ACK 与事件流是否正常(参见 Gateway 协议 / 日志),再确认是否被 lane 排队或被 abort。
  • steer 偶发失败:优先检查 active run 是否处于 streaming 且非 compacting;并确认 handle 清理是否做了匹配校验。

验收清单

  1. 同一 sessionKey 任意时刻只存在一个 active run。
  2. abort 后不会残留 active run / 订阅监听器。
  3. compaction 重试期间不会提前返回“完成”。
  4. steer 的失败是“可解释的 false”,而不是随机抛错。