feat(agent-runner): per-session lanes, lifecycle error recovery, and slash command support

Use per-session gateway lanes (web:sessionId) so concurrent chat tabs
stream independently. Add a 15s recovery window after lifecycle errors
to accept continuation runIds. Route slash commands through chat.send
RPC and forward chat events to the UI.
This commit is contained in:
kumarabhirup 2026-03-17 12:35:26 -07:00
parent 11478c752e
commit 8fa221554e
No known key found for this signature in database
GPG Key ID: DB7CA2289CAB0167
2 changed files with 273 additions and 29 deletions

View File

@ -285,6 +285,173 @@ describe("agent-runner", () => {
proc.kill("SIGTERM");
});
it("keeps stream open across lifecycle error and accepts continuation runId", async () => {
const MockWs = installMockWsModule();
const { spawnAgentProcess } = await import("./agent-runner.js");
const proc = spawnAgentProcess("hello", "sess-lifeerr");
await waitFor(() => MockWs.instances[0]?.methods.includes("agent"));
const ws = MockWs.instances[0];
let stdout = "";
let closed = false;
proc.stdout?.on("data", (chunk: Buffer | string) => {
stdout += chunk.toString();
});
proc.on("close", () => {
closed = true;
});
ws.emitJson({
type: "event",
event: "agent",
seq: 1,
payload: {
runId: "r-initial",
sessionKey: "agent:main:web:sess-lifeerr",
stream: "lifecycle",
data: { phase: "error" },
globalSeq: 1,
ts: Date.now(),
},
});
await new Promise((resolve) => setTimeout(resolve, 40));
expect(closed).toBe(false);
ws.emitJson({
type: "event",
event: "agent",
seq: 2,
payload: {
runId: "r-continuation",
sessionKey: "agent:main:web:sess-lifeerr",
stream: "assistant",
data: { delta: "continued-output" },
globalSeq: 2,
ts: Date.now(),
},
});
await waitFor(() => stdout.includes("continued-output"), {
attempts: 80,
delayMs: 10,
});
proc.kill("SIGTERM");
});
it("uses chat.send RPC for slash commands instead of agent", async () => {
const MockWs = installMockWsModule();
const { spawnAgentProcess } = await import("./agent-runner.js");
const proc = spawnAgentProcess("/status", "sess-cmd");
await waitFor(() => MockWs.instances[0]?.methods.includes("chat.send"));
const ws = MockWs.instances[0];
expect(ws.methods).toContain("connect");
expect(ws.methods).toContain("chat.send");
expect(ws.methods).not.toContain("agent");
const chatSendFrame = ws.requestFrames.find(
(frame) => frame.method === "chat.send",
);
const params = chatSendFrame?.params as Record<string, unknown>;
expect(params.message).toBe("/status");
expect(params.deliver).toBe(false);
expect(typeof params.idempotencyKey).toBe("string");
expect((params.idempotencyKey as string).length).toBeGreaterThan(0);
proc.kill("SIGTERM");
});
it("uses agent RPC for regular (non-slash) messages", async () => {
const MockWs = installMockWsModule();
const { spawnAgentProcess } = await import("./agent-runner.js");
const proc = spawnAgentProcess("hello world", "sess-reg");
await waitFor(() => MockWs.instances[0]?.methods.includes("agent"));
const ws = MockWs.instances[0];
expect(ws.methods).toContain("agent");
expect(ws.methods).not.toContain("chat.send");
proc.kill("SIGTERM");
});
it("forwards chat final events to stdout for slash commands", async () => {
const MockWs = installMockWsModule();
const { spawnAgentProcess } = await import("./agent-runner.js");
const proc = spawnAgentProcess("/status", "sess-chatfinal");
await waitFor(() => MockWs.instances[0]?.methods.includes("chat.send"));
const ws = MockWs.instances[0];
let stdout = "";
let closed = false;
proc.stdout?.on("data", (chunk: Buffer | string) => {
stdout += chunk.toString();
});
proc.on("close", () => {
closed = true;
});
ws.emitJson({
type: "event",
event: "chat",
seq: 1,
payload: {
state: "final",
message: {
role: "assistant",
content: "Status: all systems go",
},
sessionKey: "agent:main:web:sess-chatfinal",
globalSeq: 1,
},
});
await waitFor(() => stdout.includes("state"), {
attempts: 80,
delayMs: 10,
});
const parsed = JSON.parse(stdout.trim().split("\n").pop()!) as Record<string, unknown>;
expect(parsed.event).toBe("chat");
expect((parsed.data as Record<string, unknown>).state).toBe("final");
await waitFor(() => closed, { attempts: 80, delayMs: 10 });
});
it("does not forward chat events for regular messages in start mode", async () => {
const MockWs = installMockWsModule();
const { spawnAgentProcess } = await import("./agent-runner.js");
const proc = spawnAgentProcess("hello", "sess-nochat");
await waitFor(() => MockWs.instances[0]?.methods.includes("agent"));
const ws = MockWs.instances[0];
let stdout = "";
proc.stdout?.on("data", (chunk: Buffer | string) => {
stdout += chunk.toString();
});
ws.emitJson({
type: "event",
event: "chat",
seq: 1,
payload: {
state: "final",
message: {
role: "assistant",
content: "should be ignored",
},
sessionKey: "agent:main:web:sess-nochat",
globalSeq: 1,
},
});
await new Promise((resolve) => setTimeout(resolve, 40));
expect(stdout).not.toContain("should be ignored");
proc.kill("SIGTERM");
});
});
describe("spawnAgentSubscribeProcess", () => {

View File

@ -162,6 +162,7 @@ const REQUEST_TIMEOUT_MS = 12_000;
const DEFAULT_GATEWAY_CLIENT_CAPS = ["tool-events"];
const SESSIONS_PATCH_RETRY_DELAY_MS = 150;
const SESSIONS_PATCH_MAX_ATTEMPTS = 2;
const LIFECYCLE_ERROR_RECOVERY_MS = 15_000;
const ED25519_SPKI_PREFIX = Buffer.from("302a300506032b6570032100", "hex");
type AgentSubscribeSupport = "unknown" | "supported" | "unsupported";
@ -721,6 +722,10 @@ class GatewayProcessHandle
private closeScheduled = false;
private requestedClose = false;
private runId: string | null = null;
private lifecycleErrorCloseTimer: ReturnType<typeof setTimeout> | null = null;
private lifecycleErrorRecoveryUntil = 0;
private useChatSend = false;
private receivedAgentEvent = false;
constructor(private readonly params: SpawnGatewayProcessParams) {
super();
@ -732,6 +737,7 @@ class GatewayProcessHandle
return false;
}
this.requestedClose = true;
this.clearLifecycleErrorCloseTimer();
this.client?.close();
const closeSignal = typeof signal === "string" ? signal : null;
this.finish(0, closeSignal);
@ -772,16 +778,28 @@ class GatewayProcessHandle
throw new Error(frameErrorMessage(connectRes));
}
if (this.params.mode === "start") {
// Pre-patch verbose for existing sessions (best-effort; new
// sessions don't exist yet so this may fail — we retry below).
if (this.params.sessionKey) {
await this.ensureFullToolVerbose(this.params.sessionKey);
}
if (this.params.mode === "start") {
// Pre-patch verbose for existing sessions (best-effort; new
// sessions don't exist yet so this may fail — we retry below).
if (this.params.sessionKey) {
await this.ensureFullToolVerbose(this.params.sessionKey);
}
const sessionKey = this.params.sessionKey;
const startRes = await this.client.request("agent", {
message: this.params.message ?? "",
const sessionKey = this.params.sessionKey;
const msg = this.params.message ?? "";
this.useChatSend = msg.startsWith("/");
let startRes: GatewayResFrame;
if (this.useChatSend) {
startRes = await this.client.request("chat.send", {
message: msg,
...(sessionKey ? { sessionKey } : {}),
idempotencyKey: randomUUID(),
deliver: false,
});
} else {
startRes = await this.client.request("agent", {
message: msg,
idempotencyKey: randomUUID(),
...(sessionKey ? { sessionKey } : {}),
deliver: false,
@ -789,20 +807,21 @@ class GatewayProcessHandle
lane: this.params.lane ?? "web",
timeout: 0,
});
if (!startRes.ok) {
throw new Error(frameErrorMessage(startRes));
}
const payload = asRecord(startRes.payload);
const runId =
payload && typeof payload.runId === "string" ? payload.runId : null;
this.runId = runId;
}
if (!startRes.ok) {
throw new Error(frameErrorMessage(startRes));
}
const payload = asRecord(startRes.payload);
const runId =
payload && typeof payload.runId === "string" ? payload.runId : null;
this.runId = runId;
// Retry verbose patch now that the agent RPC has created the
// session. This is the critical path for first-message-in-chat
// where the pre-patch above failed.
if (sessionKey) {
await this.ensureFullToolVerbose(sessionKey);
}
// Retry verbose patch now that the RPC has created the
// session. This is the critical path for first-message-in-chat
// where the pre-patch above failed.
if (sessionKey) {
await this.ensureFullToolVerbose(sessionKey);
}
} else {
const sessionKey = this.params.sessionKey;
if (!sessionKey) {
@ -931,6 +950,7 @@ class GatewayProcessHandle
}
if (frame.event === "agent") {
this.receivedAgentEvent = true;
const payload = asRecord(frame.payload);
if (!payload) {
return;
@ -942,7 +962,16 @@ class GatewayProcessHandle
}
const runId = typeof payload.runId === "string" ? payload.runId : undefined;
if (this.runId && runId && runId !== this.runId) {
return;
if (Date.now() <= this.lifecycleErrorRecoveryUntil) {
// The gateway can recover from lifecycle/error by creating
// a continuation run with a new runId under the same session.
// During the recovery window, follow that new run so the UI
// doesn't miss trailing events before final termination.
this.runId = runId;
this.clearLifecycleErrorCloseTimer();
} else {
return;
}
}
const payloadGlobalSeq =
typeof payload.globalSeq === "number" ? payload.globalSeq : undefined;
@ -974,21 +1003,37 @@ class GatewayProcessHandle
const stream = typeof payload.stream === "string" ? payload.stream : "";
const data = asRecord(payload.data);
const phase = data && typeof data.phase === "string" ? data.phase : "";
if (!(stream === "lifecycle" && phase === "error")) {
this.clearLifecycleErrorCloseTimer();
}
if (
this.params.mode === "start" &&
stream === "lifecycle" &&
(phase === "end" || phase === "error")
phase === "end"
) {
this.scheduleClose();
}
if (
this.params.mode === "start" &&
stream === "lifecycle" &&
phase === "error"
) {
this.armLifecycleErrorCloseTimer();
}
return;
}
if (frame.event === "chat") {
// Only forward chat frames in subscribe mode. In start mode
// the agent stream already delivers the full assistant text;
// forwarding chat finals would duplicate the output.
if (this.params.mode !== "subscribe") {
// Forward chat frames in subscribe mode unconditionally.
// In start mode, only forward when using chat.send for
// slash commands — the gateway returns command responses
// as chat events rather than agent events. Skip if we
// already received agent events (agent run started) to
// avoid duplicating the assistant text.
const forwardChat =
this.params.mode === "subscribe" ||
(this.useChatSend && !this.receivedAgentEvent);
if (!forwardChat) {
return;
}
const payload = asRecord(frame.payload) ?? {};
@ -1011,6 +1056,15 @@ class GatewayProcessHandle
...(sessionKey ? { sessionKey } : {}),
};
(this.stdout as PassThrough).write(`${JSON.stringify(event)}\n`);
if (
this.useChatSend &&
this.params.mode === "start" &&
typeof payload.state === "string" &&
payload.state === "final"
) {
this.scheduleClose();
}
return;
}
@ -1036,11 +1090,32 @@ class GatewayProcessHandle
};
(this.stdout as PassThrough).write(`${JSON.stringify(event)}\n`);
if (this.params.mode === "start") {
this.scheduleClose();
this.armLifecycleErrorCloseTimer();
}
}
}
private armLifecycleErrorCloseTimer(): void {
this.lifecycleErrorRecoveryUntil = Date.now() + LIFECYCLE_ERROR_RECOVERY_MS;
this.clearLifecycleErrorCloseTimer();
this.lifecycleErrorCloseTimer = setTimeout(() => {
this.lifecycleErrorCloseTimer = null;
if (this.finished) {
return;
}
this.scheduleClose();
}, LIFECYCLE_ERROR_RECOVERY_MS);
}
private clearLifecycleErrorCloseTimer(): void {
this.lifecycleErrorRecoveryUntil = 0;
if (!this.lifecycleErrorCloseTimer) {
return;
}
clearTimeout(this.lifecycleErrorCloseTimer);
this.lifecycleErrorCloseTimer = null;
}
private scheduleClose(): void {
if (this.closeScheduled || this.finished) {
return;
@ -1073,6 +1148,7 @@ class GatewayProcessHandle
return;
}
this.finished = true;
this.clearLifecycleErrorCloseTimer();
try {
(this.stdout as PassThrough).end();
(this.stderr as PassThrough).end();
@ -1135,6 +1211,7 @@ export function spawnAgentProcess(
message,
sessionKey,
afterSeq: 0,
lane: agentSessionId ? `web:${agentSessionId}` : "web",
});
}