命令队列

命令队列

命令队列(2026-01-16)

我们通过一个小型进程内队列序列化入站自动回复运行(所有渠道),以防止多个智能体运行发生冲突,同时仍允许跨会话的安全并行。

为什么需要

  • 自动回复运行可能开销很大(LLM 调用),当多条入站消息接近同时到达时可能发生冲突。
  • 序列化可以避免竞争共享资源(会话文件、日志、CLI stdin),并降低上游速率限制的可能性。

工作原理

  • 一个支持通道感知的 FIFO 队列以可配置的并发上限排空每个通道(未配置的通道默认为 1;main 默认为 4,subagent 为 8)。
  • runEmbeddedPiAgent会话键入队(通道 session:<key>),以保证每个会话只有一个活动运行。
  • 然后每个会话运行被排入全局通道(默认为 main),因此整体并行度受 agents.defaults.maxConcurrent 限制。
  • 启用详细日志时,如果排队运行在开始前等待超过约 2 秒,会发出简短通知。
  • 输入指示器仍在入队时立即触发(当渠道支持时),因此在等待轮次时用户体验不受影响。

队列模式(按渠道)

入站消息可以引导当前运行、等待后续轮次,或两者兼顾:

  • steer:立即注入当前运行(在下一个工具边界后取消待处理的工具调用)。如果未在流式传输,则回退到 followup。
  • followup:在当前运行结束后为下一个智能体轮次入队。
  • collect:将所有排队消息合并为单个后续轮次(默认)。如果消息针对不同的渠道/线程,它们会单独排空以保留路由。
  • steer-backlog(又名 steer+backlog):现在引导保留消息用于后续轮次。
  • interrupt(旧版):中止该会话的活动运行,然后运行最新消息。
  • queue(旧版别名):与 steer 相同。

steer-backlog 意味着你可以在被引导的运行之后获得后续响应,因此流式传输界面可能看起来像重复。如果你希望每条入站消息只有一个响应,请优先使用 collect/steer。 发送 /queue collect 作为独立命令(按会话)或设置 messages.queue.byChannel.discord: "collect"

默认值(配置中未设置时):

  • 所有界面 → collect

通过 messages.queue 全局或按渠道配置:

{
  messages: {
    queue: {
      mode: "collect",
      debounceMs: 1000,
      cap: 20,
      drop: "summarize",
      byChannel: { discord: "collect" },
    },
  },
}

队列选项

选项适用于 followupcollectsteer-backlog(以及当 steer 回退到 followup 时):

  • debounceMs:在开始后续轮次前等待静默(防止"继续,继续")。
  • cap:每个会话的最大排队消息数。
  • drop:溢出策略(oldnewsummarize)。

summarize 保留被丢弃消息的简短要点列表,并将其作为合成的后续提示注入。 默认值:debounceMs: 1000cap: 20drop: summarize

按会话覆盖

  • 发送 /queue <mode> 作为独立命令,为当前会话存储该模式。
  • 选项可以组合:/queue collect debounce:2s cap:25 drop:summarize
  • /queue default/queue reset 清除会话覆盖。

范围和保证

  • 适用于所有使用 Gateway 网关回复管道的入站渠道的自动回复智能体运行(WhatsApp 网页版、Telegram、Slack、Discord、Signal、iMessage、网页聊天等)。
  • 默认通道(main)对入站 + 主心跳是进程范围的;设置 agents.defaults.maxConcurrent 以允许多个会话并行。
  • 可能存在额外的通道(例如 cronsubagent),以便后台任务可以并行运行而不阻塞入站回复。
  • 按会话通道保证一次只有一个智能体运行触及给定会话。
  • 无外部依赖或后台工作线程;纯 TypeScript + promises。

故障排除

  • 如果命令似乎卡住,启用详细日志并查找"queued for …ms"行以确认队列正在排空。
  • 如果你需要查看队列深度,启用详细日志并观察队列计时行。

一致性与可追溯性

队列模式(steer / followup / collect / steer-backlog)解决的是并发下的可解释性:同一会话不并发写、不同会话可并行、以及“新消息如何进入运行”有明确规则。它不承诺“不同模式下语义等价”(同样一组消息在不同进入方式下,模型最终回答可能不同)。

1) steer:一致性保证与边界

  • steer 只在“可安全注入”的窗口生效:必须正在流式阶段,并且不会在压缩(compaction)期间注入;否则会拒绝注入并回退到 followup(即排队下一轮)。
  • 注入并不等价于“回滚当前输出”。如果你启用了分块流式传输(把部分回复提前发到外部渠道),先前已经发出去的块不会被撤回;steer 只会影响后续推理与后续输出。这也是 steer-backlog 可能让“流式界面看起来像重复”的原因之一。

相关概念入口:

2) collect:如何保证可追溯

collect 的核心是把“忙时积压”变成一个结构化、可回放的 followup prompt:它会显式标记“这些内容是在 agent 忙时排队的”,并保留每条排队消息的边界(不会把多条消息直接拼成一坨难以复盘的自然语言)。

实现上,合并后的 followup prompt 会包含类似的可识别标记(用于你在转录/日志中定位):

  • 标题:[Queued messages while agent was busy]
  • 分隔块:Queued #1 / Queued #2 / …

当积压消息跨不同渠道/线程目标时,系统会放弃合并并改为“逐条排空”,以避免把不同路由目标的消息合并后造成回复串线。

3) drop: summarize:会不会“语义偏移”?

会。summarize 明确是一种有损背压:当积压超过 cap,系统会丢弃一部分排队消息,并生成一个“溢出摘要”作为合成提示注入下一轮,用来告诉模型“有内容被丢弃了,大概是什么”。这能提升可解释性与连续性,但不会保证完全保真。

实现上,这个溢出摘要同样有可识别标记(便于审计与复盘):

  • 标题:[Queue overflow] Dropped N messages due to cap.
  • 以及一个 Summary: 的项目符号列表(对被丢弃消息做压缩/截断)。

如果你的场景对“事实不丢”和“语义精确”更敏感,优先考虑调大 cap、缩短 debounceMs、或改用 followup(逐条进入下一轮),并谨慎使用 drop: summarize

4) 从“证据链”角度排障:看哪里

  • 会话转录是最终事实:会话状态与 JSONL 转录由 Gateway 网关持有,复盘时以网关侧转录为准。参见 会话
  • 排队与等待的可观测性:启用详细日志后,你能看到 lane 入队/出队与等待时长等信号。参见 日志