Lanes + Queues (Implementation): session/global two-level queuing

Lanes + Queues (Implementation): session/global two-level queuing

This page turns concurrency control into a replicable state machine: per-session ordering, global limits, and post-restart recovery.

Entry point (concept):

Code entry points (optional)

  • src/process/command-queue.ts
  • src/agents/pi-embedded-runner/lanes.ts
  • src/gateway/server-lanes.ts
  • src/agents/pi-embedded-runner/runs.ts

Core data structures (keep them explicit)

  • LaneState: queue, activeTaskIds, maxConcurrent, draining, generation
  • QueueEntry: enqueuedAt, warnAfterMs, onWait (warn-only, no cancel)

Critical flow (enqueue → drain → finalize)

  1. drain immediately on enqueue (don’t wait for the next event).
  2. draining prevents re-entrant pumps.
  3. pump on failures too (avoid starving the queue).

reset/generation (the key to “no dead queue after restart”)

If tasks are interrupted, their finally blocks may not run; generation guards prevent stale callbacks from writing into new state.

Failure modes and troubleshooting

  • Out-of-order in the same session: confirm session lane serializes and maxConcurrent wasn’t raised accidentally.
  • Dead lanes: verify failure paths still pump and draining isn’t stuck.
  • Probe lane log spam: treat probe failures as expected and keep them quiet.

Acceptance checks

  1. stable ordering for a single session under burst.
  2. two sessions can run concurrently.
  3. after reset, backlog continues draining (no deadlocks).