fix: resume orphaned subagent sessions after SIGUSR1 reload

Closes #47711

After a SIGUSR1 gateway reload aborts in-flight subagent LLM calls, the gateway now scans for orphaned sessions and sends a synthetic resume message to restart their work. Also makes the deferral timeout configurable via gateway.reload.deferralTimeoutMs (default: 5 minutes, up from 90s).
This commit is contained in:
Joey Krug 2026-03-15 21:04:39 -04:00 committed by Peter Steinberger
parent abe7ea4373
commit 7b098ea79f
9 changed files with 642 additions and 2 deletions

View File

@ -0,0 +1,316 @@
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
import type { SubagentRunRecord } from "./subagent-registry.types.js";
// Mock dependencies before importing the module under test.
// Every test below loads "./subagent-orphan-recovery.js" through a dynamic
// import(), so the module under test only ever sees these stubs.
// Config stub: `session.store` is left undefined.
vi.mock("../config/config.js", () => ({
loadConfig: vi.fn(() => ({
session: { store: undefined },
})),
}));
// Session-store stubs: an empty store by default (individual tests override
// loadSessionStore), a fixed agent id and store path, and a no-op updater.
vi.mock("../config/sessions.js", () => ({
loadSessionStore: vi.fn(() => ({})),
resolveAgentIdFromSessionKey: vi.fn(() => "main"),
resolveStorePath: vi.fn(() => "/tmp/test-sessions.json"),
updateSessionStore: vi.fn(async () => {}),
}));
// Gateway stub: resolves with a fixed runId unless a test overrides it.
vi.mock("../gateway/call.js", () => ({
callGateway: vi.fn(async () => ({ runId: "test-run-id" })),
}));
/**
 * Build a SubagentRunRecord fixture for the tests in this file.
 *
 * The defaults describe a run created 60s ago and started 55s ago with no
 * `endedAt`; any field supplied in `overrides` replaces the default.
 */
function createTestRunRecord(overrides: Partial<SubagentRunRecord> = {}): SubagentRunRecord {
  const defaults: SubagentRunRecord = {
    runId: "run-1",
    childSessionKey: "agent:main:subagent:test-session-1",
    requesterSessionKey: "agent:main:signal:direct:+1234567890",
    requesterDisplayKey: "main",
    task: "Test task: implement feature X",
    cleanup: "delete",
    createdAt: Date.now() - 60_000,
    startedAt: Date.now() - 55_000,
  };
  return { ...defaults, ...overrides };
}
/**
 * Unit tests for recoverOrphanedSubagentSessions.
 *
 * All collaborators (config, session store, gateway) are module mocks; each
 * test overrides the session store contents it needs and passes its own
 * active-run map through `getActiveRuns`.
 */
describe("subagent-orphan-recovery", () => {
  beforeEach(() => {
    // Drop recorded calls from the previous test so call-count assertions are per-test.
    vi.clearAllMocks();
  });

  afterEach(() => {
    vi.restoreAllMocks();
  });

  it("recovers orphaned sessions with abortedLastRun=true", async () => {
    const sessions = await import("../config/sessions.js");
    const gateway = await import("../gateway/call.js");
    const sessionEntry = {
      sessionId: "session-abc",
      updatedAt: Date.now(),
      abortedLastRun: true,
    };
    vi.mocked(sessions.loadSessionStore).mockReturnValue({
      "agent:main:subagent:test-session-1": sessionEntry,
    });
    const activeRuns = new Map<string, SubagentRunRecord>();
    activeRuns.set("run-1", createTestRunRecord());
    const { recoverOrphanedSubagentSessions } = await import("./subagent-orphan-recovery.js");
    const result = await recoverOrphanedSubagentSessions({
      getActiveRuns: () => activeRuns,
    });
    expect(result.recovered).toBe(1);
    expect(result.failed).toBe(0);
    expect(result.skipped).toBe(0);
    // Should have called callGateway to resume the session
    expect(gateway.callGateway).toHaveBeenCalledOnce();
    const callArgs = vi.mocked(gateway.callGateway).mock.calls[0];
    const opts = callArgs[0];
    expect(opts.method).toBe("agent");
    const params = opts.params as Record<string, unknown>;
    expect(params.sessionKey).toBe("agent:main:subagent:test-session-1");
    expect(params.message).toContain("gateway reload");
    expect(params.message).toContain("Test task: implement feature X");
  });

  it("skips sessions that are not aborted", async () => {
    const sessions = await import("../config/sessions.js");
    const gateway = await import("../gateway/call.js");
    vi.mocked(sessions.loadSessionStore).mockReturnValue({
      "agent:main:subagent:test-session-1": {
        sessionId: "session-abc",
        updatedAt: Date.now(),
        abortedLastRun: false,
      },
    });
    const activeRuns = new Map<string, SubagentRunRecord>();
    activeRuns.set("run-1", createTestRunRecord());
    const { recoverOrphanedSubagentSessions } = await import("./subagent-orphan-recovery.js");
    const result = await recoverOrphanedSubagentSessions({
      getActiveRuns: () => activeRuns,
    });
    expect(result.recovered).toBe(0);
    expect(result.skipped).toBe(1);
    expect(gateway.callGateway).not.toHaveBeenCalled();
  });

  it("skips runs that have already ended", async () => {
    const gateway = await import("../gateway/call.js");
    const activeRuns = new Map<string, SubagentRunRecord>();
    activeRuns.set(
      "run-1",
      createTestRunRecord({
        endedAt: Date.now() - 1000,
      }),
    );
    const { recoverOrphanedSubagentSessions } = await import("./subagent-orphan-recovery.js");
    const result = await recoverOrphanedSubagentSessions({
      getActiveRuns: () => activeRuns,
    });
    expect(result.recovered).toBe(0);
    expect(gateway.callGateway).not.toHaveBeenCalled();
  });

  it("handles multiple orphaned sessions", async () => {
    const sessions = await import("../config/sessions.js");
    const gateway = await import("../gateway/call.js");
    // Sessions a and b were aborted; session c was not and must be skipped.
    vi.mocked(sessions.loadSessionStore).mockReturnValue({
      "agent:main:subagent:session-a": {
        sessionId: "id-a",
        updatedAt: Date.now(),
        abortedLastRun: true,
      },
      "agent:main:subagent:session-b": {
        sessionId: "id-b",
        updatedAt: Date.now(),
        abortedLastRun: true,
      },
      "agent:main:subagent:session-c": {
        sessionId: "id-c",
        updatedAt: Date.now(),
        abortedLastRun: false,
      },
    });
    const activeRuns = new Map<string, SubagentRunRecord>();
    activeRuns.set(
      "run-a",
      createTestRunRecord({
        runId: "run-a",
        childSessionKey: "agent:main:subagent:session-a",
        task: "Task A",
      }),
    );
    activeRuns.set(
      "run-b",
      createTestRunRecord({
        runId: "run-b",
        childSessionKey: "agent:main:subagent:session-b",
        task: "Task B",
      }),
    );
    activeRuns.set(
      "run-c",
      createTestRunRecord({
        runId: "run-c",
        childSessionKey: "agent:main:subagent:session-c",
        task: "Task C",
      }),
    );
    const { recoverOrphanedSubagentSessions } = await import("./subagent-orphan-recovery.js");
    const result = await recoverOrphanedSubagentSessions({
      getActiveRuns: () => activeRuns,
    });
    expect(result.recovered).toBe(2);
    expect(result.skipped).toBe(1);
    expect(gateway.callGateway).toHaveBeenCalledTimes(2);
  });

  it("handles callGateway failure gracefully", async () => {
    const sessions = await import("../config/sessions.js");
    const gateway = await import("../gateway/call.js");
    vi.mocked(sessions.loadSessionStore).mockReturnValue({
      "agent:main:subagent:test-session-1": {
        sessionId: "session-abc",
        updatedAt: Date.now(),
        abortedLastRun: true,
      },
    });
    // Reject only the single call made by this test. A persistent
    // mockRejectedValue would stay installed on the shared module mock and
    // could silently turn later tests into failure-path tests.
    vi.mocked(gateway.callGateway).mockRejectedValueOnce(new Error("gateway unavailable"));
    const activeRuns = new Map<string, SubagentRunRecord>();
    activeRuns.set("run-1", createTestRunRecord());
    const { recoverOrphanedSubagentSessions } = await import("./subagent-orphan-recovery.js");
    const result = await recoverOrphanedSubagentSessions({
      getActiveRuns: () => activeRuns,
    });
    expect(result.recovered).toBe(0);
    expect(result.failed).toBe(1);
  });

  it("returns empty results when no active runs exist", async () => {
    const { recoverOrphanedSubagentSessions } = await import("./subagent-orphan-recovery.js");
    const result = await recoverOrphanedSubagentSessions({
      getActiveRuns: () => new Map(),
    });
    expect(result.recovered).toBe(0);
    expect(result.failed).toBe(0);
    expect(result.skipped).toBe(0);
  });

  it("skips sessions with missing session entry in store", async () => {
    const sessions = await import("../config/sessions.js");
    const gateway = await import("../gateway/call.js");
    // Store has no matching entry
    vi.mocked(sessions.loadSessionStore).mockReturnValue({});
    const activeRuns = new Map<string, SubagentRunRecord>();
    activeRuns.set("run-1", createTestRunRecord());
    const { recoverOrphanedSubagentSessions } = await import("./subagent-orphan-recovery.js");
    const result = await recoverOrphanedSubagentSessions({
      getActiveRuns: () => activeRuns,
    });
    expect(result.recovered).toBe(0);
    expect(result.skipped).toBe(1);
    expect(gateway.callGateway).not.toHaveBeenCalled();
  });

  it("clears abortedLastRun flag before resuming", async () => {
    const sessions = await import("../config/sessions.js");
    vi.mocked(sessions.loadSessionStore).mockReturnValue({
      "agent:main:subagent:test-session-1": {
        sessionId: "session-abc",
        updatedAt: Date.now(),
        abortedLastRun: true,
      },
    });
    const activeRuns = new Map<string, SubagentRunRecord>();
    activeRuns.set("run-1", createTestRunRecord());
    const { recoverOrphanedSubagentSessions } = await import("./subagent-orphan-recovery.js");
    await recoverOrphanedSubagentSessions({
      getActiveRuns: () => activeRuns,
    });
    // updateSessionStore should have been called to clear the flag
    expect(sessions.updateSessionStore).toHaveBeenCalledOnce();
    const calls = vi.mocked(sessions.updateSessionStore).mock.calls;
    const [storePath, updater] = calls[0];
    expect(storePath).toBe("/tmp/test-sessions.json");
    // Simulate the updater to verify it clears abortedLastRun
    const mockStore: Record<string, { abortedLastRun?: boolean; updatedAt?: number }> = {
      "agent:main:subagent:test-session-1": {
        abortedLastRun: true,
        updatedAt: 0,
      },
    };
    (updater as (store: Record<string, unknown>) => void)(mockStore);
    expect(mockStore["agent:main:subagent:test-session-1"]?.abortedLastRun).toBe(false);
  });

  it("truncates long task descriptions in resume message", async () => {
    const sessions = await import("../config/sessions.js");
    const gateway = await import("../gateway/call.js");
    vi.mocked(sessions.loadSessionStore).mockReturnValue({
      "agent:main:subagent:test-session-1": {
        sessionId: "session-abc",
        updatedAt: Date.now(),
        abortedLastRun: true,
      },
    });
    const longTask = "x".repeat(5000);
    const activeRuns = new Map<string, SubagentRunRecord>();
    activeRuns.set("run-1", createTestRunRecord({ task: longTask }));
    const { recoverOrphanedSubagentSessions } = await import("./subagent-orphan-recovery.js");
    await recoverOrphanedSubagentSessions({
      getActiveRuns: () => activeRuns,
    });
    const callArgs = vi.mocked(gateway.callGateway).mock.calls[0];
    const opts = callArgs[0];
    const params = opts.params as Record<string, unknown>;
    const message = params.message as string;
    // Message should contain truncated task (2000 chars + "...")
    expect(message.length).toBeLessThan(5000);
    expect(message).toContain("...");
    // Pin the cut point: exactly the first 2000 characters survive.
    expect(message).toContain(`${"x".repeat(2000)}...`);
    expect(message).not.toContain("x".repeat(2001));
  });
});

View File

@ -0,0 +1,185 @@
/**
* Post-restart orphan recovery for subagent sessions.
*
* After a SIGUSR1 gateway reload aborts in-flight subagent LLM calls,
* this module scans for orphaned sessions (those with `abortedLastRun: true`
* that are still tracked as active in the subagent registry) and sends a
* synthetic resume message to restart their work.
*
* @see https://github.com/openclaw/openclaw/issues/47711
*/
import crypto from "node:crypto";
import { loadConfig } from "../config/config.js";
import {
loadSessionStore,
resolveAgentIdFromSessionKey,
resolveStorePath,
updateSessionStore,
type SessionEntry,
} from "../config/sessions.js";
import { callGateway } from "../gateway/call.js";
import { createSubsystemLogger } from "../logging/subsystem.js";
import type { SubagentRunRecord } from "./subagent-registry.types.js";
// Logger created for the "subagent-orphan-recovery" subsystem; all messages in this module go through it.
const log = createSubsystemLogger("subagent-orphan-recovery");
/**
 * Delay (ms) before attempting recovery to let the gateway finish bootstrapping.
 * Used by scheduleOrphanRecovery when the caller does not pass `delayMs`.
 */
const DEFAULT_RECOVERY_DELAY_MS = 5_000;
/**
 * Build the resume message for an orphaned subagent.
 *
 * The original task text is embedded so the subagent can pick its work back
 * up. Tasks longer than 2000 UTF-16 code units are cut and suffixed with
 * "..."; the cut never lands between the two halves of a surrogate pair, so
 * the message does not end up carrying a lone surrogate.
 *
 * @param task - Task description recorded for the interrupted run.
 * @returns The message text to send to the subagent session.
 */
function buildResumeMessage(task: string): string {
  const maxTaskLen = 2000;
  let truncatedTask = task;
  if (task.length > maxTaskLen) {
    let cut = maxTaskLen;
    const lastKept = task.charCodeAt(cut - 1);
    const firstDropped = task.charCodeAt(cut);
    const splitsSurrogatePair =
      lastKept >= 0xd800 &&
      lastKept <= 0xdbff &&
      firstDropped >= 0xdc00 &&
      firstDropped <= 0xdfff;
    if (splitsSurrogatePair) {
      // Drop the dangling high surrogate rather than emit half a character.
      cut -= 1;
    }
    truncatedTask = `${task.slice(0, cut)}...`;
  }
  return (
    `[System] Your previous turn was interrupted by a gateway reload. ` +
    `Your task was:\n\n${truncatedTask}\n\nPlease continue where you left off.`
  );
}
/**
 * Ask the gateway's "agent" method to start a new turn on an orphaned
 * subagent session, using a synthetic resume message built from the task.
 *
 * Failures are logged and reported through the return value; this function
 * does not rethrow gateway errors.
 *
 * @param params.sessionKey - Session key of the subagent to resume.
 * @param params.task - Task text embedded in the resume message.
 * @returns `true` when the gateway call succeeded, `false` otherwise.
 */
async function resumeOrphanedSession(params: {
  sessionKey: string;
  task: string;
}): Promise<boolean> {
  const { sessionKey, task } = params;
  const message = buildResumeMessage(task);
  try {
    await callGateway<{ runId: string }>({
      method: "agent",
      params: {
        message,
        sessionKey,
        // Fresh key per attempt.
        idempotencyKey: crypto.randomUUID(),
        deliver: false,
        lane: "subagent",
      },
      timeoutMs: 10_000,
    });
    log.info(`resumed orphaned session: ${sessionKey}`);
  } catch (failure) {
    log.warn(`failed to resume orphaned session ${sessionKey}: ${String(failure)}`);
    return false;
  }
  return true;
}
/**
 * Scan for and resume orphaned subagent sessions after a gateway restart.
 *
 * An orphaned session is one where:
 * 1. It has an active (not ended) entry in the subagent run registry
 * 2. Its session store entry has `abortedLastRun: true`
 *
 * For each orphaned session found, we:
 * 1. Clear the `abortedLastRun` flag
 * 2. Send a synthetic resume message to trigger a new LLM turn
 *
 * A session is resumed at most once per scan, even when several active run
 * records point at the same child session key.
 *
 * @param params.getActiveRuns - Returns the run registry to scan.
 * @returns Counters for this scan: `recovered` (resume sent), `failed`
 *   (resume or store update failed), `skipped` (no store entry, not aborted,
 *   or already handled in this scan). Ended runs and runs without a child
 *   session key are ignored and not counted. Never rejects; errors are logged.
 */
export async function recoverOrphanedSubagentSessions(params: {
  getActiveRuns: () => Map<string, SubagentRunRecord>;
}): Promise<{ recovered: number; failed: number; skipped: number }> {
  const result = { recovered: 0, failed: 0, skipped: 0 };
  try {
    const activeRuns = params.getActiveRuns();
    if (activeRuns.size === 0) {
      return result;
    }
    const cfg = loadConfig();
    // Each store file is loaded once per scan. The cached copy is NOT refreshed
    // after updateSessionStore, so it keeps showing abortedLastRun=true.
    const storeCache = new Map<string, Record<string, SessionEntry>>();
    // Session keys whose flag was already cleared in this scan. Without this,
    // the stale cache above would cause a second run record for the same
    // session to trigger a duplicate resume message.
    const handledSessionKeys = new Set<string>();
    for (const [runId, runRecord] of activeRuns.entries()) {
      // Only consider runs that haven't ended yet
      if (typeof runRecord.endedAt === "number" && runRecord.endedAt > 0) {
        continue;
      }
      const childSessionKey = runRecord.childSessionKey?.trim();
      if (!childSessionKey) {
        continue;
      }
      if (handledSessionKeys.has(childSessionKey)) {
        // Same outcome a fresh store read would give: the flag is already cleared.
        result.skipped++;
        continue;
      }
      try {
        const agentId = resolveAgentIdFromSessionKey(childSessionKey);
        const storePath = resolveStorePath(cfg.session?.store, { agentId });
        let store = storeCache.get(storePath);
        if (!store) {
          store = loadSessionStore(storePath);
          storeCache.set(storePath, store);
        }
        const entry = store[childSessionKey];
        if (!entry) {
          result.skipped++;
          continue;
        }
        // Check if this session was aborted by the restart
        if (!entry.abortedLastRun) {
          result.skipped++;
          continue;
        }
        log.info(`found orphaned subagent session: ${childSessionKey} (run=${runId})`);
        // Clear the aborted flag before resuming.
        // NOTE(review): the flag is not restored when the resume below fails,
        // so a failed session will not be picked up by a later scan — confirm
        // this is intended.
        await updateSessionStore(storePath, (currentStore) => {
          const current = currentStore[childSessionKey];
          if (current) {
            current.abortedLastRun = false;
            current.updatedAt = Date.now();
            currentStore[childSessionKey] = current;
          }
        });
        handledSessionKeys.add(childSessionKey);
        // Resume the session with the original task context
        const resumed = await resumeOrphanedSession({
          sessionKey: childSessionKey,
          task: runRecord.task,
        });
        if (resumed) {
          result.recovered++;
        } else {
          result.failed++;
        }
      } catch (err) {
        // One bad session must not stop the scan of the remaining runs.
        log.warn(`error processing orphaned session ${childSessionKey}: ${String(err)}`);
        result.failed++;
      }
    }
  } catch (err) {
    log.warn(`orphan recovery scan failed: ${String(err)}`);
  }
  // Stay quiet when the scan found nothing to act on.
  if (result.recovered > 0 || result.failed > 0) {
    log.info(
      `orphan recovery complete: recovered=${result.recovered} failed=${result.failed} skipped=${result.skipped}`,
    );
  }
  return result;
}
/**
 * Queue a single orphan-recovery scan to run after a delay, giving the
 * gateway time to finish bootstrapping after a restart.
 *
 * The timer is unref'd when the runtime supports it, and any rejection from
 * the scan is logged instead of propagating.
 *
 * @param params.getActiveRuns - Forwarded to recoverOrphanedSubagentSessions.
 * @param params.delayMs - Delay in ms; falls back to DEFAULT_RECOVERY_DELAY_MS.
 */
export function scheduleOrphanRecovery(params: {
  getActiveRuns: () => Map<string, SubagentRunRecord>;
  delayMs?: number;
}): void {
  const runRecovery = (): void => {
    void recoverOrphanedSubagentSessions(params).catch((err) => {
      log.warn(`scheduled orphan recovery failed: ${String(err)}`);
    });
  };
  const timer = setTimeout(runRecovery, params.delayMs ?? DEFAULT_RECOVERY_DELAY_MS);
  timer.unref?.();
}

View File

@ -30,6 +30,7 @@ import {
SUBAGENT_ENDED_REASON_KILLED,
type SubagentLifecycleEndedReason,
} from "./subagent-lifecycle-events.js";
import { scheduleOrphanRecovery } from "./subagent-orphan-recovery.js";
import {
resolveCleanupCompletionReason,
resolveDeferredCleanupDecision,
@ -684,6 +685,13 @@ function restoreSubagentRunsOnce() {
for (const runId of subagentRuns.keys()) {
resumeSubagentRun(runId);
}
// Schedule orphan recovery for subagent sessions that were aborted
// by a SIGUSR1 reload. This runs after a short delay to let the
// gateway fully bootstrap first. (#47711)
scheduleOrphanRecovery({
getActiveRuns: () => subagentRuns,
});
} catch {
// ignore restore failures
}

View File

@ -427,6 +427,8 @@ export const FIELD_HELP: Record<string, string> = {
"gateway.reload.mode":
'Controls how config edits are applied: "off" ignores live edits, "restart" always restarts, "hot" applies in-process, and "hybrid" tries hot then restarts if required. Keep "hybrid" for safest routine updates.',
"gateway.reload.debounceMs": "Debounce window (ms) before applying config changes.",
"gateway.reload.deferralTimeoutMs":
"Maximum time (ms) to wait for in-flight operations to complete before forcing a SIGUSR1 restart. Default: 300000 (5 minutes). Lower values risk aborting active subagent LLM calls.",
"gateway.nodes.browser.mode":
'Node browser routing ("auto" = pick single connected browser node, "manual" = require node param, "off" = disable).',
"gateway.nodes.browser.node": "Pin browser routing to a specific node id or name (optional).",

View File

@ -211,6 +211,13 @@ export type GatewayReloadConfig = {
mode?: GatewayReloadMode;
/** Debounce window for config reloads (ms). Default: 300. */
debounceMs?: number;
/**
* Maximum time (ms) to wait for in-flight operations to complete before
* forcing a SIGUSR1 restart. Default: 300000 (5 minutes).
* Lower values risk aborting active subagent LLM calls.
* @see https://github.com/openclaw/openclaw/issues/47711
*/
deferralTimeoutMs?: number;
};
export type GatewayHttpChatCompletionsConfig = {

View File

@ -728,6 +728,7 @@ export const OpenClawSchema = z
])
.optional(),
debounceMs: z.number().int().min(0).optional(),
deferralTimeoutMs: z.number().int().min(0).optional(),
})
.strict()
.optional(),

View File

@ -219,6 +219,7 @@ export function createGatewayReloadHandlers(params: {
deferGatewayRestartUntilIdle({
getPendingCount: () => getActiveCounts().totalActive,
maxWaitMs: nextConfig.gateway?.reload?.deferralTimeoutMs,
hooks: {
onReady: () => {
restartPending = false;

View File

@ -0,0 +1,119 @@
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
import { __testing, deferGatewayRestartUntilIdle, type RestartDeferralHooks } from "./restart.js";
/**
 * Timeout behaviour of deferGatewayRestartUntilIdle, driven with fake timers:
 * the default 5-minute cap, a custom `maxWaitMs`, draining before the cap,
 * an already-idle gateway, and a throwing `getPendingCount`.
 */
describe("deferGatewayRestartUntilIdle timeout", () => {
  // Kept as a named reference so teardown removes only this suite's listener
  // instead of wiping every SIGUSR1 listener registered in the process.
  const noopSigusr1Listener = (): void => {};

  beforeEach(() => {
    vi.useFakeTimers();
    __testing.resetSigusr1State();
    // Add a listener so emitGatewayRestart uses process.emit instead of process.kill
    process.on("SIGUSR1", noopSigusr1Listener);
  });

  afterEach(() => {
    vi.useRealTimers();
    vi.restoreAllMocks();
    __testing.resetSigusr1State();
    process.off("SIGUSR1", noopSigusr1Listener);
  });

  it("uses default 5-minute timeout when maxWaitMs is not specified", () => {
    const hooks: RestartDeferralHooks = {
      onTimeout: vi.fn(),
      onReady: vi.fn(),
    };
    // Always return 1 pending item to prevent draining
    deferGatewayRestartUntilIdle({
      getPendingCount: () => 1,
      hooks,
    });
    // Advance to just before 5 minutes — should NOT have timed out yet
    vi.advanceTimersByTime(299_999);
    expect(hooks.onTimeout).not.toHaveBeenCalled();
    // Advance past 5 minutes — should time out
    vi.advanceTimersByTime(1);
    expect(hooks.onTimeout).toHaveBeenCalledOnce();
    expect(hooks.onReady).not.toHaveBeenCalled();
  });

  it("respects custom maxWaitMs configuration", () => {
    const hooks: RestartDeferralHooks = {
      onTimeout: vi.fn(),
      onReady: vi.fn(),
    };
    const customTimeoutMs = 120_000; // 2 minutes
    deferGatewayRestartUntilIdle({
      getPendingCount: () => 1,
      maxWaitMs: customTimeoutMs,
      hooks,
    });
    // Advance to just before 2 minutes
    vi.advanceTimersByTime(119_999);
    expect(hooks.onTimeout).not.toHaveBeenCalled();
    // Advance past 2 minutes
    vi.advanceTimersByTime(1);
    expect(hooks.onTimeout).toHaveBeenCalledOnce();
  });

  it("calls onReady and does not timeout when pending count drops to 0", () => {
    const hooks: RestartDeferralHooks = {
      onTimeout: vi.fn(),
      onReady: vi.fn(),
    };
    let pending = 3;
    deferGatewayRestartUntilIdle({
      getPendingCount: () => pending,
      hooks,
    });
    // Advance a few poll intervals, then drain
    vi.advanceTimersByTime(1000);
    expect(hooks.onReady).not.toHaveBeenCalled();
    pending = 0;
    vi.advanceTimersByTime(500); // Next poll interval
    expect(hooks.onReady).toHaveBeenCalledOnce();
    expect(hooks.onTimeout).not.toHaveBeenCalled();
  });

  it("immediately restarts when pending count is 0", () => {
    const hooks: RestartDeferralHooks = {
      onReady: vi.fn(),
      onTimeout: vi.fn(),
    };
    deferGatewayRestartUntilIdle({
      getPendingCount: () => 0,
      hooks,
    });
    // onReady should be called synchronously
    expect(hooks.onReady).toHaveBeenCalledOnce();
    expect(hooks.onTimeout).not.toHaveBeenCalled();
  });

  it("handles getPendingCount error by restarting immediately", () => {
    const hooks: RestartDeferralHooks = {
      onCheckError: vi.fn(),
      onReady: vi.fn(),
    };
    deferGatewayRestartUntilIdle({
      getPendingCount: () => {
        throw new Error("store corrupted");
      },
      hooks,
    });
    // The error path reports through onCheckError and does not go through onReady.
    expect(hooks.onCheckError).toHaveBeenCalledOnce();
    expect(hooks.onReady).not.toHaveBeenCalled();
  });
});

View File

@ -19,8 +19,9 @@ export type RestartAttempt = {
const SPAWN_TIMEOUT_MS = 2000;
const SIGUSR1_AUTH_GRACE_MS = 5000;
const DEFAULT_DEFERRAL_POLL_MS = 500;
// Cover slow in-flight embedded compaction work before forcing restart.
const DEFAULT_DEFERRAL_MAX_WAIT_MS = 90_000;
// Default to 5 minutes to avoid aborting in-flight subagent LLM calls.
// Configurable via gateway.reload.deferralTimeoutMs.
const DEFAULT_DEFERRAL_MAX_WAIT_MS = 300_000;
const RESTART_COOLDOWN_MS = 30_000;
const restartLog = createSubsystemLogger("restart");