From f109f3d4c2d90f9578dd3ea23a3414ab2218cd67 Mon Sep 17 00:00:00 2001 From: Joey Krug Date: Sun, 15 Mar 2026 23:05:08 -0400 Subject: [PATCH] fix: address all review comments on PR #47719 + implement resume context and config idempotency guard --- src/agents/subagent-orphan-recovery.test.ts | 109 ++++++++++++++++++++ src/agents/subagent-orphan-recovery.ts | 101 +++++++++++++++--- src/cli/daemon-cli/lifecycle.ts | 6 +- 3 files changed, 201 insertions(+), 15 deletions(-) diff --git a/src/agents/subagent-orphan-recovery.test.ts b/src/agents/subagent-orphan-recovery.test.ts index 351e1079dc7..56b652b3b42 100644 --- a/src/agents/subagent-orphan-recovery.test.ts +++ b/src/agents/subagent-orphan-recovery.test.ts @@ -19,6 +19,14 @@ vi.mock("../gateway/call.js", () => ({ callGateway: vi.fn(async () => ({ runId: "test-run-id" })), })); +vi.mock("../gateway/session-utils.fs.js", () => ({ + readSessionMessages: vi.fn(() => []), +})); + +vi.mock("./subagent-registry.js", () => ({ + replaceSubagentRunAfterSteer: vi.fn(() => true), +})); + function createTestRunRecord(overrides: Partial = {}): SubagentRunRecord { return { runId: "run-1", @@ -45,6 +53,7 @@ describe("subagent-orphan-recovery", () => { it("recovers orphaned sessions with abortedLastRun=true", async () => { const sessions = await import("../config/sessions.js"); const gateway = await import("../gateway/call.js"); + const subagentRegistry = await import("./subagent-registry.js"); const sessionEntry = { sessionId: "session-abc", @@ -78,6 +87,10 @@ describe("subagent-orphan-recovery", () => { expect(params.sessionKey).toBe("agent:main:subagent:test-session-1"); expect(params.message).toContain("gateway reload"); expect(params.message).toContain("Test task: implement feature X"); + expect(subagentRegistry.replaceSubagentRunAfterSteer).toHaveBeenCalledWith({ + previousRunId: "run-1", + nextRunId: "test-run-id", + }); }); it("skips sessions that are not aborted", async () => { @@ -321,4 +334,100 @@ describe("subagent-orphan-recovery", () => { expect(message.length).toBeLessThan(5000); expect(message).toContain("..."); }); + + it("includes last human message in resume when available", async () => { + const sessions = await import("../config/sessions.js"); + const gateway = await import("../gateway/call.js"); + const sessionUtils = await import("../gateway/session-utils.fs.js"); + + vi.mocked(sessions.loadSessionStore).mockReturnValue({ + "agent:main:subagent:test-session-1": { + sessionId: "session-abc", + updatedAt: Date.now(), + abortedLastRun: true, + sessionFile: "session-abc.jsonl", + }, + }); + + vi.mocked(sessionUtils.readSessionMessages).mockReturnValue([ + { role: "user", content: [{ type: "text", text: "Please build feature Y" }] }, + { role: "assistant", content: [{ type: "text", text: "Working on it..." }] }, + { role: "user", content: [{ type: "text", text: "Also add tests for it" }] }, + { role: "assistant", content: [{ type: "text", text: "Sure, adding tests now." }] }, + ]); + + const activeRuns = new Map(); + activeRuns.set("run-1", createTestRunRecord()); + + const { recoverOrphanedSubagentSessions } = await import("./subagent-orphan-recovery.js"); + await recoverOrphanedSubagentSessions({ getActiveRuns: () => activeRuns }); + + const callArgs = vi.mocked(gateway.callGateway).mock.calls[0]; + const params = callArgs[0].params as Record; + const message = params.message as string; + expect(message).toContain("Also add tests for it"); + expect(message).toContain("last message from the user"); + }); + + it("adds config change hint when assistant messages reference config modifications", async () => { + const sessions = await import("../config/sessions.js"); + const gateway = await import("../gateway/call.js"); + const sessionUtils = await import("../gateway/session-utils.fs.js"); + + vi.mocked(sessions.loadSessionStore).mockReturnValue({ + "agent:main:subagent:test-session-1": { + sessionId: "session-abc", + updatedAt: Date.now(), + abortedLastRun: true, + }, + }); + + vi.mocked(sessionUtils.readSessionMessages).mockReturnValue([ + { role: "user", content: "Update the config" }, + { role: "assistant", content: "I've modified openclaw.json to add the new setting." }, + ]); + + const activeRuns = new Map(); + activeRuns.set("run-1", createTestRunRecord()); + + const { recoverOrphanedSubagentSessions } = await import("./subagent-orphan-recovery.js"); + await recoverOrphanedSubagentSessions({ getActiveRuns: () => activeRuns }); + + const callArgs = vi.mocked(gateway.callGateway).mock.calls[0]; + const params = callArgs[0].params as Record; + const message = params.message as string; + expect(message).toContain("config changes from your previous run were already applied"); + }); + + it("prevents duplicate resume when updateSessionStore fails", async () => { + const sessions = await import("../config/sessions.js"); + const gateway = await import("../gateway/call.js"); + + vi.mocked(gateway.callGateway).mockResolvedValue({ runId: "new-run" } as never); + vi.mocked(sessions.updateSessionStore).mockRejectedValue(new Error("write failed")); + + vi.mocked(sessions.loadSessionStore).mockReturnValue({ + "agent:main:subagent:test-session-1": { + sessionId: "session-abc", + updatedAt: Date.now(), + abortedLastRun: true, + }, + }); + + const activeRuns = new Map(); + activeRuns.set("run-1", createTestRunRecord()); + activeRuns.set( + "run-2", + createTestRunRecord({ + runId: "run-2", + }), + ); + + const { recoverOrphanedSubagentSessions } = await import("./subagent-orphan-recovery.js"); + const result = await recoverOrphanedSubagentSessions({ getActiveRuns: () => activeRuns }); + + expect(result.recovered).toBe(1); + expect(result.skipped).toBe(1); + expect(gateway.callGateway).toHaveBeenCalledOnce(); + }); }); diff --git a/src/agents/subagent-orphan-recovery.ts b/src/agents/subagent-orphan-recovery.ts index 02dfa1528be..320d96f3727 100644 --- a/src/agents/subagent-orphan-recovery.ts +++ b/src/agents/subagent-orphan-recovery.ts @@ -19,7 +19,9 @@ import { type SessionEntry, } from "../config/sessions.js"; import { callGateway } from "../gateway/call.js"; +import { readSessionMessages } from "../gateway/session-utils.fs.js"; import { createSubsystemLogger } from "../logging/subsystem.js"; +import { replaceSubagentRunAfterSteer } from "./subagent-registry.js"; import type { SubagentRunRecord } from "./subagent-registry.types.js"; const log = createSubsystemLogger("subagent-orphan-recovery"); @@ -30,14 +32,45 @@ const DEFAULT_RECOVERY_DELAY_MS = 5_000; /** * Build the resume message for an orphaned subagent. */ -function buildResumeMessage(task: string): string { +function buildResumeMessage(task: string, lastHumanMessage?: string): string { const maxTaskLen = 2000; const truncatedTask = task.length > maxTaskLen ? `${task.slice(0, maxTaskLen)}...` : task; - return ( + let message = `[System] Your previous turn was interrupted by a gateway reload. ` + - `Your task was:\n\n${truncatedTask}\n\nPlease continue where you left off.` - ); + `Your original task was:\n\n${truncatedTask}\n\n`; + + if (lastHumanMessage) { + message += `The last message from the user before the interruption was:\n\n${lastHumanMessage}\n\n`; + } + + message += `Please continue where you left off.`; + return message; +} + +function extractMessageText(msg: unknown): string | undefined { + if (!msg || typeof msg !== "object") { + return undefined; + } + const m = msg as Record; + if (typeof m.content === "string") { + return m.content; + } + if (Array.isArray(m.content)) { + const text = m.content + .filter( + (c: unknown) => + typeof c === "object" && + c !== null && + (c as Record).type === "text" && + typeof (c as Record).text === "string", + ) + .map((c: unknown) => (c as Record).text) + .filter(Boolean) + .join("\n"); + return text || undefined; + } + return undefined; } /** @@ -46,11 +79,17 @@ function buildResumeMessage(task: string): string { async function resumeOrphanedSession(params: { sessionKey: string; task: string; + lastHumanMessage?: string; + configChangeHint?: string; + originalRunId: string; }): Promise { - const resumeMessage = buildResumeMessage(params.task); + let resumeMessage = buildResumeMessage(params.task, params.lastHumanMessage); + if (params.configChangeHint) { + resumeMessage += params.configChangeHint; + } try { - await callGateway<{ runId: string }>({ + const result = await callGateway<{ runId: string }>({ method: "agent", params: { message: resumeMessage, @@ -61,6 +100,10 @@ async function resumeOrphanedSession(params: { }, timeoutMs: 10_000, }); + replaceSubagentRunAfterSteer({ + previousRunId: params.originalRunId, + nextRunId: result.runId, + }); log.info(`resumed orphaned session: ${params.sessionKey}`); return true; } catch (err) { @@ -84,6 +127,8 @@ export async function recoverOrphanedSubagentSessions(params: { getActiveRuns: () => Map; }): Promise<{ recovered: number; failed: number; skipped: number }> { const result = { recovered: 0, failed: 0, skipped: 0 }; + const resumedSessionKeys = new Set(); + const configChangePattern = /openclaw\.json|openclaw gateway restart|config\.patch/i; try { const activeRuns = params.getActiveRuns(); @@ -104,6 +149,10 @@ export async function recoverOrphanedSubagentSessions(params: { if (!childSessionKey) { continue; } + if (resumedSessionKeys.has(childSessionKey)) { + result.skipped++; + continue; + } try { const agentId = resolveAgentIdFromSessionKey(childSessionKey); @@ -129,6 +178,18 @@ export async function recoverOrphanedSubagentSessions(params: { log.info(`found orphaned subagent session: ${childSessionKey} (run=${runId})`); + const messages = readSessionMessages(entry.sessionId, storePath, entry.sessionFile); + const lastHumanMessage = [...messages] + .toReversed() + .find((msg) => (msg as { role?: unknown } | null)?.role === "user"); + const configChangeDetected = messages.some((msg) => { + if ((msg as { role?: unknown } | null)?.role !== "assistant") { + return false; + } + const text = extractMessageText(msg); + return typeof text === "string" && configChangePattern.test(text); + }); + // Resume the session with the original task context. // We intentionally do NOT clear abortedLastRun before attempting // the resume — if callGateway fails (e.g. gateway still booting), @@ -136,18 +197,30 @@ export async function recoverOrphanedSubagentSessions(params: { const resumed = await resumeOrphanedSession({ sessionKey: childSessionKey, task: runRecord.task, + lastHumanMessage: extractMessageText(lastHumanMessage), + configChangeHint: configChangeDetected + ? "\n\n[config changes from your previous run were already applied — do not re-modify openclaw.json or restart the gateway]" + : undefined, + originalRunId: runId, }); if (resumed) { + resumedSessionKeys.add(childSessionKey); // Only clear the aborted flag after confirmed successful resume. - await updateSessionStore(storePath, (currentStore) => { - const current = currentStore[childSessionKey]; - if (current) { - current.abortedLastRun = false; - current.updatedAt = Date.now(); - currentStore[childSessionKey] = current; - } - }); + try { + await updateSessionStore(storePath, (currentStore) => { + const current = currentStore[childSessionKey]; + if (current) { + current.abortedLastRun = false; + current.updatedAt = Date.now(); + currentStore[childSessionKey] = current; + } + }); + } catch (err) { + log.warn( + `resume succeeded but failed to update session store for ${childSessionKey}: ${String(err)}`, + ); + } result.recovered++; } else { // Flag stays as abortedLastRun=true so next restart can retry diff --git a/src/cli/daemon-cli/lifecycle.ts b/src/cli/daemon-cli/lifecycle.ts index 53efaff9495..76099fe956c 100644 --- a/src/cli/daemon-cli/lifecycle.ts +++ b/src/cli/daemon-cli/lifecycle.ts @@ -50,8 +50,12 @@ function resolveGatewayPortFallback(): Promise { } async function assertUnmanagedGatewayRestartEnabled(port: number): Promise { + const cfg = await readBestEffortConfig().catch(() => undefined); + const tlsEnabled = !!(cfg as { gateway?: { tls?: { enabled?: unknown } } } | undefined)?.gateway + ?.tls?.enabled; + const scheme = tlsEnabled ? "wss" : "ws"; const probe = await probeGateway({ - url: `ws://127.0.0.1:${port}`, + url: `${scheme}://127.0.0.1:${port}`, auth: { token: process.env.OPENCLAW_GATEWAY_TOKEN?.trim() || undefined, password: process.env.OPENCLAW_GATEWAY_PASSWORD?.trim() || undefined,