diff --git a/src/agents/subagent-orphan-recovery.test.ts b/src/agents/subagent-orphan-recovery.test.ts
new file mode 100644
index 00000000000..7792140e3db
--- /dev/null
+++ b/src/agents/subagent-orphan-recovery.test.ts
@@ -0,0 +1,345 @@
+import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
+import type { SubagentRunRecord } from "./subagent-registry.types.js";
+
+// Mock dependencies before importing the module under test
+vi.mock("../config/config.js", () => ({
+  loadConfig: vi.fn(() => ({
+    session: { store: undefined },
+  })),
+}));
+
+vi.mock("../config/sessions.js", () => ({
+  loadSessionStore: vi.fn(() => ({})),
+  resolveAgentIdFromSessionKey: vi.fn(() => "main"),
+  resolveStorePath: vi.fn(() => "/tmp/test-sessions.json"),
+  updateSessionStore: vi.fn(async () => {}),
+}));
+
+vi.mock("../gateway/call.js", () => ({
+  callGateway: vi.fn(async () => ({ runId: "test-run-id" })),
+}));
+
+function createTestRunRecord(overrides: Partial<SubagentRunRecord> = {}): SubagentRunRecord {
+  return {
+    runId: "run-1",
+    childSessionKey: "agent:main:subagent:test-session-1",
+    requesterSessionKey: "agent:main:signal:direct:+1234567890",
+    requesterDisplayKey: "main",
+    task: "Test task: implement feature X",
+    cleanup: "delete",
+    createdAt: Date.now() - 60_000,
+    startedAt: Date.now() - 55_000,
+    ...overrides,
+  };
+}
+
+describe("subagent-orphan-recovery", () => {
+  beforeEach(() => {
+    vi.clearAllMocks();
+  });
+
+  afterEach(() => {
+    vi.restoreAllMocks();
+  });
+
+  it("recovers orphaned sessions with abortedLastRun=true", async () => {
+    const sessions = await import("../config/sessions.js");
+    const gateway = await import("../gateway/call.js");
+
+    const sessionEntry = {
+      sessionId: "session-abc",
+      updatedAt: Date.now(),
+      abortedLastRun: true,
+    };
+
+    vi.mocked(sessions.loadSessionStore).mockReturnValue({
+      "agent:main:subagent:test-session-1": sessionEntry,
+    });
+
+    const activeRuns = new Map<string, SubagentRunRecord>();
+    activeRuns.set("run-1", createTestRunRecord());
+
+    const { recoverOrphanedSubagentSessions } = await import("./subagent-orphan-recovery.js");
+
+    const result = await recoverOrphanedSubagentSessions({
+      getActiveRuns: () => activeRuns,
+    });
+
+    expect(result.recovered).toBe(1);
+    expect(result.failed).toBe(0);
+    expect(result.skipped).toBe(0);
+
+    // Should have called callGateway to resume the session
+    expect(gateway.callGateway).toHaveBeenCalledOnce();
+    const callArgs = vi.mocked(gateway.callGateway).mock.calls[0];
+    const opts = callArgs[0];
+    expect(opts.method).toBe("agent");
+    const params = opts.params as Record<string, unknown>;
+    expect(params.sessionKey).toBe("agent:main:subagent:test-session-1");
+    expect(params.message).toContain("gateway reload");
+    expect(params.message).toContain("Test task: implement feature X");
+  });
+
+  it("skips sessions that are not aborted", async () => {
+    const sessions = await import("../config/sessions.js");
+    const gateway = await import("../gateway/call.js");
+
+    vi.mocked(sessions.loadSessionStore).mockReturnValue({
+      "agent:main:subagent:test-session-1": {
+        sessionId: "session-abc",
+        updatedAt: Date.now(),
+        abortedLastRun: false,
+      },
+    });
+
+    const activeRuns = new Map<string, SubagentRunRecord>();
+    activeRuns.set("run-1", createTestRunRecord());
+
+    const { recoverOrphanedSubagentSessions } = await import("./subagent-orphan-recovery.js");
+
+    const result = await recoverOrphanedSubagentSessions({
+      getActiveRuns: () => activeRuns,
+    });
+
+    expect(result.recovered).toBe(0);
+    expect(result.skipped).toBe(1);
+    expect(gateway.callGateway).not.toHaveBeenCalled();
+  });
+
+  it("skips runs that have already ended", async () => {
+    const gateway = await import("../gateway/call.js");
+
+    const activeRuns = new Map<string, SubagentRunRecord>();
+    activeRuns.set(
+      "run-1",
+      createTestRunRecord({
+        endedAt: Date.now() - 1000,
+      }),
+    );
+
+    const { recoverOrphanedSubagentSessions } = await import("./subagent-orphan-recovery.js");
+
+    const result = await recoverOrphanedSubagentSessions({
+      getActiveRuns: () => activeRuns,
+    });
+
+    expect(result.recovered).toBe(0);
+    expect(gateway.callGateway).not.toHaveBeenCalled();
+  });
+
+  it("handles multiple orphaned sessions", async () => {
+    const sessions = await import("../config/sessions.js");
+    const gateway = await import("../gateway/call.js");
+
+    vi.mocked(sessions.loadSessionStore).mockReturnValue({
+      "agent:main:subagent:session-a": {
+        sessionId: "id-a",
+        updatedAt: Date.now(),
+        abortedLastRun: true,
+      },
+      "agent:main:subagent:session-b": {
+        sessionId: "id-b",
+        updatedAt: Date.now(),
+        abortedLastRun: true,
+      },
+      "agent:main:subagent:session-c": {
+        sessionId: "id-c",
+        updatedAt: Date.now(),
+        abortedLastRun: false,
+      },
+    });
+
+    const activeRuns = new Map<string, SubagentRunRecord>();
+    activeRuns.set(
+      "run-a",
+      createTestRunRecord({
+        runId: "run-a",
+        childSessionKey: "agent:main:subagent:session-a",
+        task: "Task A",
+      }),
+    );
+    activeRuns.set(
+      "run-b",
+      createTestRunRecord({
+        runId: "run-b",
+        childSessionKey: "agent:main:subagent:session-b",
+        task: "Task B",
+      }),
+    );
+    activeRuns.set(
+      "run-c",
+      createTestRunRecord({
+        runId: "run-c",
+        childSessionKey: "agent:main:subagent:session-c",
+        task: "Task C",
+      }),
+    );
+
+    const { recoverOrphanedSubagentSessions } = await import("./subagent-orphan-recovery.js");
+
+    const result = await recoverOrphanedSubagentSessions({
+      getActiveRuns: () => activeRuns,
+    });
+
+    expect(result.recovered).toBe(2);
+    expect(result.skipped).toBe(1);
+    expect(gateway.callGateway).toHaveBeenCalledTimes(2);
+  });
+
+  it("handles callGateway failure gracefully", async () => {
+    const sessions = await import("../config/sessions.js");
+    const gateway = await import("../gateway/call.js");
+
+    vi.mocked(sessions.loadSessionStore).mockReturnValue({
+      "agent:main:subagent:test-session-1": {
+        sessionId: "session-abc",
+        updatedAt: Date.now(),
+        abortedLastRun: true,
+      },
+    });
+
+    vi.mocked(gateway.callGateway).mockRejectedValueOnce(new Error("gateway unavailable"));
+
+    const activeRuns = new Map<string, SubagentRunRecord>();
+    activeRuns.set("run-1", createTestRunRecord());
+
+    const { recoverOrphanedSubagentSessions } = await import("./subagent-orphan-recovery.js");
+
+    const result = await recoverOrphanedSubagentSessions({
+      getActiveRuns: () => activeRuns,
+    });
+
+    expect(result.recovered).toBe(0);
+    expect(result.failed).toBe(1);
+    // A failed resume must leave the abortedLastRun flag untouched
+    expect(sessions.updateSessionStore).not.toHaveBeenCalled();
+  });
+
+  it("resumes a session only once when several runs share it", async () => {
+    const sessions = await import("../config/sessions.js");
+    const gateway = await import("../gateway/call.js");
+
+    vi.mocked(sessions.loadSessionStore).mockReturnValue({
+      "agent:main:subagent:test-session-1": {
+        sessionId: "session-abc",
+        updatedAt: Date.now(),
+        abortedLastRun: true,
+      },
+    });
+
+    const activeRuns = new Map<string, SubagentRunRecord>();
+    activeRuns.set("run-1", createTestRunRecord());
+    activeRuns.set("run-2", createTestRunRecord({ runId: "run-2" }));
+
+    const { recoverOrphanedSubagentSessions } = await import("./subagent-orphan-recovery.js");
+
+    const result = await recoverOrphanedSubagentSessions({
+      getActiveRuns: () => activeRuns,
+    });
+
+    expect(result.recovered).toBe(1);
+    expect(result.skipped).toBe(1);
+    expect(gateway.callGateway).toHaveBeenCalledOnce();
+  });
+
+  it("returns empty results when no active runs exist", async () => {
+    const { recoverOrphanedSubagentSessions } = await import("./subagent-orphan-recovery.js");
+
+    const result = await recoverOrphanedSubagentSessions({
+      getActiveRuns: () => new Map(),
+    });
+
+    expect(result.recovered).toBe(0);
+    expect(result.failed).toBe(0);
+    expect(result.skipped).toBe(0);
+  });
+
+  it("skips sessions with missing session entry in store", async () => {
+    const sessions = await import("../config/sessions.js");
+    const gateway = await import("../gateway/call.js");
+
+    // Store has no matching entry
+    vi.mocked(sessions.loadSessionStore).mockReturnValue({});
+
+    const activeRuns = new Map<string, SubagentRunRecord>();
+    activeRuns.set("run-1", createTestRunRecord());
+
+    const { recoverOrphanedSubagentSessions } = await import("./subagent-orphan-recovery.js");
+
+    const result = await recoverOrphanedSubagentSessions({
+      getActiveRuns: () => activeRuns,
+    });
+
+    expect(result.recovered).toBe(0);
+    expect(result.skipped).toBe(1);
+    expect(gateway.callGateway).not.toHaveBeenCalled();
+  });
+
+  it("clears abortedLastRun flag after a successful resume", async () => {
+    const sessions = await import("../config/sessions.js");
+
+    vi.mocked(sessions.loadSessionStore).mockReturnValue({
+      "agent:main:subagent:test-session-1": {
+        sessionId: "session-abc",
+        updatedAt: Date.now(),
+        abortedLastRun: true,
+      },
+    });
+
+    const activeRuns = new Map<string, SubagentRunRecord>();
+    activeRuns.set("run-1", createTestRunRecord());
+
+    const { recoverOrphanedSubagentSessions } = await import("./subagent-orphan-recovery.js");
+
+    await recoverOrphanedSubagentSessions({
+      getActiveRuns: () => activeRuns,
+    });
+
+    // updateSessionStore should have been called once the resume was accepted
+    expect(sessions.updateSessionStore).toHaveBeenCalledOnce();
+    const calls = vi.mocked(sessions.updateSessionStore).mock.calls;
+    const [storePath, updater] = calls[0];
+    expect(storePath).toBe("/tmp/test-sessions.json");
+
+    // Simulate the updater to verify it clears abortedLastRun
+    const mockStore: Record<string, { abortedLastRun?: boolean; updatedAt?: number }> = {
+      "agent:main:subagent:test-session-1": {
+        abortedLastRun: true,
+        updatedAt: 0,
+      },
+    };
+    (updater as (store: Record<string, unknown>) => void)(mockStore);
+    expect(mockStore["agent:main:subagent:test-session-1"]?.abortedLastRun).toBe(false);
+  });
+
+  it("truncates long task descriptions in resume message", async () => {
+    const sessions = await import("../config/sessions.js");
+    const gateway = await import("../gateway/call.js");
+
+    vi.mocked(sessions.loadSessionStore).mockReturnValue({
+      "agent:main:subagent:test-session-1": {
+        sessionId: "session-abc",
+        updatedAt: Date.now(),
+        abortedLastRun: true,
+      },
+    });
+
+    const longTask = "x".repeat(5000);
+    const activeRuns = new Map<string, SubagentRunRecord>();
+    activeRuns.set("run-1", createTestRunRecord({ task: longTask }));
+
+    const { recoverOrphanedSubagentSessions } = await import("./subagent-orphan-recovery.js");
+
+    await recoverOrphanedSubagentSessions({
+      getActiveRuns: () => activeRuns,
+    });
+
+    const callArgs = vi.mocked(gateway.callGateway).mock.calls[0];
+    const opts = callArgs[0];
+    const params = opts.params as Record<string, unknown>;
+    const message = params.message as string;
+    // Message should contain truncated task (2000 chars + "...")
+    expect(message.length).toBeLessThan(5000);
+    expect(message).toContain("...");
+  });
+});
diff --git a/src/agents/subagent-orphan-recovery.ts b/src/agents/subagent-orphan-recovery.ts
new file mode 100644
index 00000000000..fc30af26038
--- /dev/null
+++ b/src/agents/subagent-orphan-recovery.ts
@@ -0,0 +1,219 @@
+/**
+ * Post-restart orphan recovery for subagent sessions.
+ *
+ * After a SIGUSR1 gateway reload aborts in-flight subagent LLM calls,
+ * this module scans for orphaned sessions (those with `abortedLastRun: true`
+ * that are still tracked as active in the subagent registry) and sends a
+ * synthetic resume message to restart their work.
+ *
+ * @see https://github.com/openclaw/openclaw/issues/47711
+ */
+
+import crypto from "node:crypto";
+import { loadConfig } from "../config/config.js";
+import {
+  loadSessionStore,
+  resolveAgentIdFromSessionKey,
+  resolveStorePath,
+  updateSessionStore,
+  type SessionEntry,
+} from "../config/sessions.js";
+import { callGateway } from "../gateway/call.js";
+import { createSubsystemLogger } from "../logging/subsystem.js";
+import type { SubagentRunRecord } from "./subagent-registry.types.js";
+
+const log = createSubsystemLogger("subagent-orphan-recovery");
+
+/** Delay before attempting recovery to let the gateway finish bootstrapping. */
+const DEFAULT_RECOVERY_DELAY_MS = 5_000;
+
+/**
+ * Build the resume message for an orphaned subagent.
+ *
+ * Tasks longer than 2000 characters are cut and suffixed with "..." so the
+ * synthetic message stays bounded.
+ */
+function buildResumeMessage(task: string): string {
+  const maxTaskLen = 2000;
+  const truncatedTask = task.length > maxTaskLen ? `${task.slice(0, maxTaskLen)}...` : task;
+
+  return (
+    `[System] Your previous turn was interrupted by a gateway reload. ` +
+    `Your task was:\n\n${truncatedTask}\n\nPlease continue where you left off.`
+  );
+}
+
+/**
+ * Send a resume message to an orphaned subagent session via the gateway agent method.
+ *
+ * @returns `true` when the gateway call resolved, `false` when it rejected
+ *   (the error is logged, never rethrown).
+ */
+async function resumeOrphanedSession(params: {
+  sessionKey: string;
+  task: string;
+}): Promise<boolean> {
+  const resumeMessage = buildResumeMessage(params.task);
+
+  try {
+    await callGateway<{ runId: string }>({
+      method: "agent",
+      params: {
+        message: resumeMessage,
+        sessionKey: params.sessionKey,
+        idempotencyKey: crypto.randomUUID(),
+        deliver: false,
+        lane: "subagent",
+      },
+      timeoutMs: 10_000,
+    });
+    log.info(`resumed orphaned session: ${params.sessionKey}`);
+    return true;
+  } catch (err) {
+    log.warn(`failed to resume orphaned session ${params.sessionKey}: ${String(err)}`);
+    return false;
+  }
+}
+
+/**
+ * Scan for and resume orphaned subagent sessions after a gateway restart.
+ *
+ * An orphaned session is one where:
+ * 1. It has an active (not ended) entry in the subagent run registry
+ * 2. Its session store entry has `abortedLastRun: true`
+ *
+ * For each orphaned session found, we:
+ * 1. Send a synthetic resume message to trigger a new LLM turn
+ * 2. Clear the `abortedLastRun` flag, but only once the resume was accepted,
+ *    so a failed resume leaves the session marked for a later attempt
+ *
+ * A session is resumed at most once per scan, even when several active runs
+ * reference the same child session key.
+ *
+ * @param params.getActiveRuns Returns the registry's run records keyed by run id.
+ * @returns Counts of sessions recovered, failed, and skipped. Never rejects;
+ *   scan-level errors are logged and the counts gathered so far are returned.
+ */
+export async function recoverOrphanedSubagentSessions(params: {
+  getActiveRuns: () => Map<string, SubagentRunRecord>;
+}): Promise<{ recovered: number; failed: number; skipped: number }> {
+  const result = { recovered: 0, failed: 0, skipped: 0 };
+
+  try {
+    const activeRuns = params.getActiveRuns();
+    if (activeRuns.size === 0) {
+      return result;
+    }
+
+    const cfg = loadConfig();
+    const storeCache = new Map<string, Record<string, SessionEntry>>();
+    // Session keys already resumed in this scan. The cached store snapshot is
+    // not refreshed after the flag is cleared, so it cannot be used for this.
+    const resumedSessionKeys = new Set<string>();
+
+    for (const [runId, runRecord] of activeRuns.entries()) {
+      // Only consider runs that haven't ended yet
+      if (typeof runRecord.endedAt === "number" && runRecord.endedAt > 0) {
+        continue;
+      }
+
+      const childSessionKey = runRecord.childSessionKey?.trim();
+      if (!childSessionKey) {
+        continue;
+      }
+
+      // Never send a second resume message to the same session.
+      if (resumedSessionKeys.has(childSessionKey)) {
+        result.skipped++;
+        continue;
+      }
+
+      try {
+        const agentId = resolveAgentIdFromSessionKey(childSessionKey);
+        const storePath = resolveStorePath(cfg.session?.store, { agentId });
+
+        let store = storeCache.get(storePath);
+        if (!store) {
+          store = loadSessionStore(storePath);
+          storeCache.set(storePath, store);
+        }
+
+        const entry = store[childSessionKey];
+        if (!entry) {
+          result.skipped++;
+          continue;
+        }
+
+        // Check if this session was aborted by the restart
+        if (!entry.abortedLastRun) {
+          result.skipped++;
+          continue;
+        }
+
+        log.info(`found orphaned subagent session: ${childSessionKey} (run=${runId})`);
+
+        // Resume the session with the original task context
+        const resumed = await resumeOrphanedSession({
+          sessionKey: childSessionKey,
+          task: runRecord.task,
+        });
+
+        if (!resumed) {
+          // Leave abortedLastRun set so the session still reads as orphaned.
+          result.failed++;
+          continue;
+        }
+
+        resumedSessionKeys.add(childSessionKey);
+        result.recovered++;
+
+        // Clear the aborted flag now that the resume was accepted. A failure
+        // here must not turn a successful resume into a reported failure.
+        try {
+          await updateSessionStore(storePath, (currentStore) => {
+            const current = currentStore[childSessionKey];
+            if (current) {
+              current.abortedLastRun = false;
+              current.updatedAt = Date.now();
+              currentStore[childSessionKey] = current;
+            }
+          });
+        } catch (err) {
+          log.warn(`failed to clear abortedLastRun for ${childSessionKey}: ${String(err)}`);
+        }
+      } catch (err) {
+        log.warn(`error processing orphaned session ${childSessionKey}: ${String(err)}`);
+        result.failed++;
+      }
+    }
+  } catch (err) {
+    log.warn(`orphan recovery scan failed: ${String(err)}`);
+  }
+
+  if (result.recovered > 0 || result.failed > 0) {
+    log.info(
+      `orphan recovery complete: recovered=${result.recovered} failed=${result.failed} skipped=${result.skipped}`,
+    );
+  }
+
+  return result;
+}
+
+/**
+ * Schedule orphan recovery after a delay.
+ * The delay gives the gateway time to fully bootstrap after restart.
+ *
+ * The timer is unref'd where supported, so it does not by itself keep the
+ * process alive.
+ */
+export function scheduleOrphanRecovery(params: {
+  getActiveRuns: () => Map<string, SubagentRunRecord>;
+  delayMs?: number;
+}): void {
+  const delay = params.delayMs ?? DEFAULT_RECOVERY_DELAY_MS;
+  setTimeout(() => {
+    void recoverOrphanedSubagentSessions(params).catch((err) => {
+      log.warn(`scheduled orphan recovery failed: ${String(err)}`);
+    });
+  }, delay).unref?.();
+}
diff --git a/src/agents/subagent-registry.ts b/src/agents/subagent-registry.ts
index d9c593c3e84..68fd499cbee 100644
--- a/src/agents/subagent-registry.ts
+++ b/src/agents/subagent-registry.ts
@@ -30,6 +30,7 @@ import {
   SUBAGENT_ENDED_REASON_KILLED,
   type SubagentLifecycleEndedReason,
 } from "./subagent-lifecycle-events.js";
+import { scheduleOrphanRecovery } from "./subagent-orphan-recovery.js";
 import {
   resolveCleanupCompletionReason,
   resolveDeferredCleanupDecision,
@@ -684,6 +685,13 @@ function restoreSubagentRunsOnce() {
     for (const runId of subagentRuns.keys()) {
       resumeSubagentRun(runId);
     }
+
+    // Schedule orphan recovery for subagent sessions that were aborted
+    // by a SIGUSR1 reload. This runs after a short delay to let the
+    // gateway fully bootstrap first. (#47711)
+    scheduleOrphanRecovery({
+      getActiveRuns: () => subagentRuns,
+    });
   } catch {
     // ignore restore failures
   }
diff --git a/src/config/schema.help.ts b/src/config/schema.help.ts
index d591ba53533..8a71c0e9035 100644
--- a/src/config/schema.help.ts
+++ b/src/config/schema.help.ts
@@ -427,6 +427,8 @@ export const FIELD_HELP: Record<string, string> = {
   "gateway.reload.mode":
     'Controls how config edits are applied: "off" ignores live edits, "restart" always restarts, "hot" applies in-process, and "hybrid" tries hot then restarts if required. Keep "hybrid" for safest routine updates.',
   "gateway.reload.debounceMs": "Debounce window (ms) before applying config changes.",
+  "gateway.reload.deferralTimeoutMs":
+    "Maximum time (ms) to wait for in-flight operations to complete before forcing a SIGUSR1 restart. Default: 300000 (5 minutes). Lower values risk aborting active subagent LLM calls.",
   "gateway.nodes.browser.mode":
     'Node browser routing ("auto" = pick single connected browser node, "manual" = require node param, "off" = disable).',
   "gateway.nodes.browser.node": "Pin browser routing to a specific node id or name (optional).",
diff --git a/src/config/types.gateway.ts b/src/config/types.gateway.ts
index 88a5350ab1d..385ece27aad 100644
--- a/src/config/types.gateway.ts
+++ b/src/config/types.gateway.ts
@@ -211,6 +211,13 @@ export type GatewayReloadConfig = {
   mode?: GatewayReloadMode;
   /** Debounce window for config reloads (ms). Default: 300. */
   debounceMs?: number;
+  /**
+   * Maximum time (ms) to wait for in-flight operations to complete before
+   * forcing a SIGUSR1 restart. Default: 300000 (5 minutes).
+   * Lower values risk aborting active subagent LLM calls.
+   * @see https://github.com/openclaw/openclaw/issues/47711
+   */
+  deferralTimeoutMs?: number;
 };
 
 export type GatewayHttpChatCompletionsConfig = {
diff --git a/src/config/zod-schema.ts b/src/config/zod-schema.ts
index 20b8b232157..7c9b510080f 100644
--- a/src/config/zod-schema.ts
+++ b/src/config/zod-schema.ts
@@ -728,6 +728,7 @@ export const OpenClawSchema = z
               ])
               .optional(),
             debounceMs: z.number().int().min(0).optional(),
+            deferralTimeoutMs: z.number().int().min(0).optional(),
           })
           .strict()
           .optional(),
diff --git a/src/gateway/server-reload-handlers.ts b/src/gateway/server-reload-handlers.ts
index 008f0977d37..b92d71889bc 100644
--- a/src/gateway/server-reload-handlers.ts
+++ b/src/gateway/server-reload-handlers.ts
@@ -219,6 +219,7 @@ export function createGatewayReloadHandlers(params: {
 
     deferGatewayRestartUntilIdle({
       getPendingCount: () => getActiveCounts().totalActive,
+      maxWaitMs: nextConfig.gateway?.reload?.deferralTimeoutMs,
       hooks: {
         onReady: () => {
           restartPending = false;
diff --git a/src/infra/restart.deferral-timeout.test.ts b/src/infra/restart.deferral-timeout.test.ts
new file mode 100644
index 00000000000..167fe95ccdc
--- /dev/null
+++ b/src/infra/restart.deferral-timeout.test.ts
@@ -0,0 +1,119 @@
+import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
+import { __testing, deferGatewayRestartUntilIdle, type RestartDeferralHooks } from "./restart.js";
+
+describe("deferGatewayRestartUntilIdle timeout", () => {
+  beforeEach(() => {
+    vi.useFakeTimers();
+    __testing.resetSigusr1State();
+    // Add a listener so emitGatewayRestart uses process.emit instead of process.kill
+    process.on("SIGUSR1", () => {});
+  });
+
+  afterEach(() => {
+    vi.useRealTimers();
+    vi.restoreAllMocks();
+    __testing.resetSigusr1State();
+    process.removeAllListeners("SIGUSR1");
+  });
+
+  it("uses default 5-minute timeout when maxWaitMs is not specified", () => {
+    const hooks: RestartDeferralHooks = {
+      onTimeout: vi.fn(),
+      onReady: vi.fn(),
+    };
+
+    // Always return 1 pending item to prevent draining
+    deferGatewayRestartUntilIdle({
+      getPendingCount: () => 1,
+      hooks,
+    });
+
+    // Advance to just before 5 minutes — should NOT have timed out yet
+    vi.advanceTimersByTime(299_999);
+    expect(hooks.onTimeout).not.toHaveBeenCalled();
+
+    // Advance past 5 minutes — should time out
+    vi.advanceTimersByTime(1);
+    expect(hooks.onTimeout).toHaveBeenCalledOnce();
+    expect(hooks.onReady).not.toHaveBeenCalled();
+  });
+
+  it("respects custom maxWaitMs configuration", () => {
+    const hooks: RestartDeferralHooks = {
+      onTimeout: vi.fn(),
+      onReady: vi.fn(),
+    };
+
+    const customTimeoutMs = 120_000; // 2 minutes
+
+    deferGatewayRestartUntilIdle({
+      getPendingCount: () => 1,
+      maxWaitMs: customTimeoutMs,
+      hooks,
+    });
+
+    // Advance to just before 2 minutes
+    vi.advanceTimersByTime(119_999);
+    expect(hooks.onTimeout).not.toHaveBeenCalled();
+
+    // Advance past 2 minutes
+    vi.advanceTimersByTime(1);
+    expect(hooks.onTimeout).toHaveBeenCalledOnce();
+  });
+
+  it("calls onReady and does not timeout when pending count drops to 0", () => {
+    const hooks: RestartDeferralHooks = {
+      onTimeout: vi.fn(),
+      onReady: vi.fn(),
+    };
+
+    let pending = 3;
+
+    deferGatewayRestartUntilIdle({
+      getPendingCount: () => pending,
+      hooks,
+    });
+
+    // Advance a few poll intervals, then drain
+    vi.advanceTimersByTime(1000);
+    expect(hooks.onReady).not.toHaveBeenCalled();
+
+    pending = 0;
+    vi.advanceTimersByTime(500); // Next poll interval
+    expect(hooks.onReady).toHaveBeenCalledOnce();
+    expect(hooks.onTimeout).not.toHaveBeenCalled();
+  });
+
+  it("immediately restarts when pending count is 0", () => {
+    const hooks: RestartDeferralHooks = {
+      onReady: vi.fn(),
+      onTimeout: vi.fn(),
+    };
+
+    deferGatewayRestartUntilIdle({
+      getPendingCount: () => 0,
+      hooks,
+    });
+
+    // onReady should be called synchronously
+    expect(hooks.onReady).toHaveBeenCalledOnce();
+    expect(hooks.onTimeout).not.toHaveBeenCalled();
+  });
+
+  it("handles getPendingCount error by restarting immediately", () => {
+    const hooks: RestartDeferralHooks = {
+      onCheckError: vi.fn(),
+      onReady: vi.fn(),
+    };
+
+    deferGatewayRestartUntilIdle({
+      getPendingCount: () => {
+        throw new Error("store corrupted");
+      },
+      hooks,
+    });
+
+    expect(hooks.onCheckError).toHaveBeenCalledOnce();
+    expect(hooks.onReady).not.toHaveBeenCalled();
+  });
+});
diff --git a/src/infra/restart.ts b/src/infra/restart.ts
index 3e0379f25f2..1238b4954d0 100644
--- a/src/infra/restart.ts
+++ b/src/infra/restart.ts
@@ -19,8 +19,9 @@ export type RestartAttempt = {
 const SPAWN_TIMEOUT_MS = 2000;
 const SIGUSR1_AUTH_GRACE_MS = 5000;
 const DEFAULT_DEFERRAL_POLL_MS = 500;
-// Cover slow in-flight embedded compaction work before forcing restart.
-const DEFAULT_DEFERRAL_MAX_WAIT_MS = 90_000;
+// Default to 5 minutes to avoid aborting in-flight subagent LLM calls.
+// Configurable via gateway.reload.deferralTimeoutMs.
+const DEFAULT_DEFERRAL_MAX_WAIT_MS = 300_000;
 const RESTART_COOLDOWN_MS = 30_000;
 
 const restartLog = createSubsystemLogger("restart");