Compare commits: main...fix/sigusr (6 commits)

| SHA1 |
|---|
| 745b01d775 |
| 9a1611f30b |
| f109f3d4c2 |
| 7a5f2b128c |
| f5e32be558 |
| 7b098ea79f |
@@ -58,6 +58,7 @@ Docs: https://docs.openclaw.ai

- Agents/openai-compatible tool calls: deduplicate repeated tool call ids across live assistant messages and replayed history so OpenAI-compatible backends no longer reject duplicate `tool_call_id` values with HTTP 400; a rough sketch of the idea follows this list. (#40996) Thanks @xaeon2026.
- Security/device pairing: harden `device.token.rotate` deny handling by keeping public failures generic while logging internal deny reasons and preserving approved-baseline enforcement. (`GHSA-7jrw-x62h-64p8`)
- Slack/interactive replies: preserve `channelData.slack.blocks` through live DM delivery and preview-finalized edits so Block Kit button and select directives render instead of falling back to raw text. (#45890) Thanks @vincentkoc.
- Gateway/restart: defer externally signaled unmanaged restarts through the in-process idle drain, and preserve the restored subagent run as remap fallback during orphan recovery so resumed sessions do not duplicate work. (#47719) Thanks @joeykrug.
- Zalo/plugin runtime: export `resolveClientIp` from `openclaw/plugin-sdk/zalo` so installed builds no longer crash on startup when the webhook monitor loads from the packaged extension instead of the monorepo source tree. (#46549) Thanks @No898.
- CI/channel test routing: move the built-in channel suites into `test:channels` and keep them out of `test:extensions`, so extension CI no longer fails after the channel migration while targeted test routing still sends Slack, Signal, and iMessage suites to the right lane. (#46066) Thanks @scoootscooob.
- Browser/profiles: drop the auto-created `chrome-relay` browser profile; users who need the Chrome extension relay must now create their own profile via `openclaw browser create-profile`. (#45777) Thanks @odysseus0.
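The first entry above is the only one whose code does not appear in the diffs below, so here is a minimal sketch of the dedup idea under stated assumptions: the `ToolCall` and `AssistantMessage` shapes and the `dedupeToolCallIds` helper are hypothetical illustrations, not OpenClaw or OpenAI-SDK types, and the real patch may resolve duplicates differently.

```ts
// Hypothetical shapes for illustration only; OpenClaw's real message types differ.
type ToolCall = { id: string; name: string; args: string };
type AssistantMessage = { role: "assistant"; content?: string; toolCalls?: ToolCall[] };

// One plausible dedup pass: walk replayed history plus live messages in order and
// re-suffix any tool call id that has already been seen, so the backend never
// receives the same `tool_call_id` twice (which it would reject with HTTP 400).
function dedupeToolCallIds(messages: AssistantMessage[]): AssistantMessage[] {
  const seen = new Set<string>();
  return messages.map((msg) => {
    if (!msg.toolCalls?.length) {
      return msg;
    }
    const toolCalls = msg.toolCalls.map((call) => {
      let id = call.id;
      for (let n = 1; seen.has(id); n++) {
        id = `${call.id}#${n}`;
      }
      seen.add(id);
      return { ...call, id };
    });
    return { ...msg, toolCalls };
  });
}
```

A real fix would also have to remap the `tool_call_id` on the matching tool-result messages, which this sketch leaves out.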
src/agents/subagent-orphan-recovery.test.ts (new file, 437 lines)
@@ -0,0 +1,437 @@
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
import type { SubagentRunRecord } from "./subagent-registry.types.js";

// Mock dependencies before importing the module under test
vi.mock("../config/config.js", () => ({
  loadConfig: vi.fn(() => ({
    session: { store: undefined },
  })),
}));

vi.mock("../config/sessions.js", () => ({
  loadSessionStore: vi.fn(() => ({})),
  resolveAgentIdFromSessionKey: vi.fn(() => "main"),
  resolveStorePath: vi.fn(() => "/tmp/test-sessions.json"),
  updateSessionStore: vi.fn(async () => {}),
}));

vi.mock("../gateway/call.js", () => ({
  callGateway: vi.fn(async () => ({ runId: "test-run-id" })),
}));

vi.mock("../gateway/session-utils.fs.js", () => ({
  readSessionMessages: vi.fn(() => []),
}));

vi.mock("./subagent-registry.js", () => ({
  replaceSubagentRunAfterSteer: vi.fn(() => true),
}));

function createTestRunRecord(overrides: Partial<SubagentRunRecord> = {}): SubagentRunRecord {
  return {
    runId: "run-1",
    childSessionKey: "agent:main:subagent:test-session-1",
    requesterSessionKey: "agent:main:signal:direct:+1234567890",
    requesterDisplayKey: "main",
    task: "Test task: implement feature X",
    cleanup: "delete",
    createdAt: Date.now() - 60_000,
    startedAt: Date.now() - 55_000,
    ...overrides,
  };
}

describe("subagent-orphan-recovery", () => {
  beforeEach(() => {
    vi.clearAllMocks();
  });

  afterEach(() => {
    vi.restoreAllMocks();
  });

  it("recovers orphaned sessions with abortedLastRun=true", async () => {
    const sessions = await import("../config/sessions.js");
    const gateway = await import("../gateway/call.js");
    const subagentRegistry = await import("./subagent-registry.js");

    const sessionEntry = {
      sessionId: "session-abc",
      updatedAt: Date.now(),
      abortedLastRun: true,
    };

    vi.mocked(sessions.loadSessionStore).mockReturnValue({
      "agent:main:subagent:test-session-1": sessionEntry,
    });

    const run = createTestRunRecord();
    const activeRuns = new Map<string, SubagentRunRecord>();
    activeRuns.set("run-1", run);

    const { recoverOrphanedSubagentSessions } = await import("./subagent-orphan-recovery.js");

    const result = await recoverOrphanedSubagentSessions({
      getActiveRuns: () => activeRuns,
    });

    expect(result.recovered).toBe(1);
    expect(result.failed).toBe(0);
    expect(result.skipped).toBe(0);

    // Should have called callGateway to resume the session
    expect(gateway.callGateway).toHaveBeenCalledOnce();
    const callArgs = vi.mocked(gateway.callGateway).mock.calls[0];
    const opts = callArgs[0];
    expect(opts.method).toBe("agent");
    const params = opts.params as Record<string, unknown>;
    expect(params.sessionKey).toBe("agent:main:subagent:test-session-1");
    expect(params.message).toContain("gateway reload");
    expect(params.message).toContain("Test task: implement feature X");
    expect(subagentRegistry.replaceSubagentRunAfterSteer).toHaveBeenCalledWith(
      expect.objectContaining({
        previousRunId: "run-1",
        nextRunId: "test-run-id",
        fallback: run,
      }),
    );
  });

  it("skips sessions that are not aborted", async () => {
    const sessions = await import("../config/sessions.js");
    const gateway = await import("../gateway/call.js");

    vi.mocked(sessions.loadSessionStore).mockReturnValue({
      "agent:main:subagent:test-session-1": {
        sessionId: "session-abc",
        updatedAt: Date.now(),
        abortedLastRun: false,
      },
    });

    const activeRuns = new Map<string, SubagentRunRecord>();
    activeRuns.set("run-1", createTestRunRecord());

    const { recoverOrphanedSubagentSessions } = await import("./subagent-orphan-recovery.js");

    const result = await recoverOrphanedSubagentSessions({
      getActiveRuns: () => activeRuns,
    });

    expect(result.recovered).toBe(0);
    expect(result.skipped).toBe(1);
    expect(gateway.callGateway).not.toHaveBeenCalled();
  });

  it("skips runs that have already ended", async () => {
    const gateway = await import("../gateway/call.js");

    const activeRuns = new Map<string, SubagentRunRecord>();
    activeRuns.set(
      "run-1",
      createTestRunRecord({
        endedAt: Date.now() - 1000,
      }),
    );

    const { recoverOrphanedSubagentSessions } = await import("./subagent-orphan-recovery.js");

    const result = await recoverOrphanedSubagentSessions({
      getActiveRuns: () => activeRuns,
    });

    expect(result.recovered).toBe(0);
    expect(gateway.callGateway).not.toHaveBeenCalled();
  });

  it("handles multiple orphaned sessions", async () => {
    const sessions = await import("../config/sessions.js");
    const gateway = await import("../gateway/call.js");

    vi.mocked(sessions.loadSessionStore).mockReturnValue({
      "agent:main:subagent:session-a": {
        sessionId: "id-a",
        updatedAt: Date.now(),
        abortedLastRun: true,
      },
      "agent:main:subagent:session-b": {
        sessionId: "id-b",
        updatedAt: Date.now(),
        abortedLastRun: true,
      },
      "agent:main:subagent:session-c": {
        sessionId: "id-c",
        updatedAt: Date.now(),
        abortedLastRun: false,
      },
    });

    const activeRuns = new Map<string, SubagentRunRecord>();
    activeRuns.set(
      "run-a",
      createTestRunRecord({
        runId: "run-a",
        childSessionKey: "agent:main:subagent:session-a",
        task: "Task A",
      }),
    );
    activeRuns.set(
      "run-b",
      createTestRunRecord({
        runId: "run-b",
        childSessionKey: "agent:main:subagent:session-b",
        task: "Task B",
      }),
    );
    activeRuns.set(
      "run-c",
      createTestRunRecord({
        runId: "run-c",
        childSessionKey: "agent:main:subagent:session-c",
        task: "Task C",
      }),
    );

    const { recoverOrphanedSubagentSessions } = await import("./subagent-orphan-recovery.js");

    const result = await recoverOrphanedSubagentSessions({
      getActiveRuns: () => activeRuns,
    });

    expect(result.recovered).toBe(2);
    expect(result.skipped).toBe(1);
    expect(gateway.callGateway).toHaveBeenCalledTimes(2);
  });

  it("handles callGateway failure gracefully and preserves abortedLastRun flag", async () => {
    const sessions = await import("../config/sessions.js");
    const gateway = await import("../gateway/call.js");

    vi.mocked(sessions.loadSessionStore).mockReturnValue({
      "agent:main:subagent:test-session-1": {
        sessionId: "session-abc",
        updatedAt: Date.now(),
        abortedLastRun: true,
      },
    });

    vi.mocked(gateway.callGateway).mockRejectedValue(new Error("gateway unavailable"));

    const activeRuns = new Map<string, SubagentRunRecord>();
    activeRuns.set("run-1", createTestRunRecord());

    const { recoverOrphanedSubagentSessions } = await import("./subagent-orphan-recovery.js");

    const result = await recoverOrphanedSubagentSessions({
      getActiveRuns: () => activeRuns,
    });

    expect(result.recovered).toBe(0);
    expect(result.failed).toBe(1);

    // abortedLastRun flag should NOT be cleared on failure,
    // so the next restart can retry the recovery
    expect(sessions.updateSessionStore).not.toHaveBeenCalled();
  });

  it("returns empty results when no active runs exist", async () => {
    const { recoverOrphanedSubagentSessions } = await import("./subagent-orphan-recovery.js");

    const result = await recoverOrphanedSubagentSessions({
      getActiveRuns: () => new Map(),
    });

    expect(result.recovered).toBe(0);
    expect(result.failed).toBe(0);
    expect(result.skipped).toBe(0);
  });

  it("skips sessions with missing session entry in store", async () => {
    const sessions = await import("../config/sessions.js");
    const gateway = await import("../gateway/call.js");

    // Store has no matching entry
    vi.mocked(sessions.loadSessionStore).mockReturnValue({});

    const activeRuns = new Map<string, SubagentRunRecord>();
    activeRuns.set("run-1", createTestRunRecord());

    const { recoverOrphanedSubagentSessions } = await import("./subagent-orphan-recovery.js");

    const result = await recoverOrphanedSubagentSessions({
      getActiveRuns: () => activeRuns,
    });

    expect(result.recovered).toBe(0);
    expect(result.skipped).toBe(1);
    expect(gateway.callGateway).not.toHaveBeenCalled();
  });

  it("clears abortedLastRun flag after successful resume", async () => {
    const sessions = await import("../config/sessions.js");
    const gateway = await import("../gateway/call.js");

    // Ensure callGateway succeeds for this test
    vi.mocked(gateway.callGateway).mockResolvedValue({ runId: "resumed-run" } as never);

    vi.mocked(sessions.loadSessionStore).mockReturnValue({
      "agent:main:subagent:test-session-1": {
        sessionId: "session-abc",
        updatedAt: Date.now(),
        abortedLastRun: true,
      },
    });

    const activeRuns = new Map<string, SubagentRunRecord>();
    activeRuns.set("run-1", createTestRunRecord());

    const { recoverOrphanedSubagentSessions } = await import("./subagent-orphan-recovery.js");

    await recoverOrphanedSubagentSessions({
      getActiveRuns: () => activeRuns,
    });

    // updateSessionStore should have been called AFTER successful resume to clear the flag
    expect(sessions.updateSessionStore).toHaveBeenCalledOnce();
    const calls = vi.mocked(sessions.updateSessionStore).mock.calls;
    const [storePath, updater] = calls[0];
    expect(storePath).toBe("/tmp/test-sessions.json");

    // Simulate the updater to verify it clears abortedLastRun
    const mockStore: Record<string, { abortedLastRun?: boolean; updatedAt?: number }> = {
      "agent:main:subagent:test-session-1": {
        abortedLastRun: true,
        updatedAt: 0,
      },
    };
    (updater as (store: Record<string, unknown>) => void)(mockStore);
    expect(mockStore["agent:main:subagent:test-session-1"]?.abortedLastRun).toBe(false);
  });

  it("truncates long task descriptions in resume message", async () => {
    const sessions = await import("../config/sessions.js");
    const gateway = await import("../gateway/call.js");

    vi.mocked(sessions.loadSessionStore).mockReturnValue({
      "agent:main:subagent:test-session-1": {
        sessionId: "session-abc",
        updatedAt: Date.now(),
        abortedLastRun: true,
      },
    });

    const longTask = "x".repeat(5000);
    const activeRuns = new Map<string, SubagentRunRecord>();
    activeRuns.set("run-1", createTestRunRecord({ task: longTask }));

    const { recoverOrphanedSubagentSessions } = await import("./subagent-orphan-recovery.js");

    await recoverOrphanedSubagentSessions({
      getActiveRuns: () => activeRuns,
    });

    const callArgs = vi.mocked(gateway.callGateway).mock.calls[0];
    const opts = callArgs[0];
    const params = opts.params as Record<string, unknown>;
    const message = params.message as string;
    // Message should contain truncated task (2000 chars + "...")
    expect(message.length).toBeLessThan(5000);
    expect(message).toContain("...");
  });

  it("includes last human message in resume when available", async () => {
    const sessions = await import("../config/sessions.js");
    const gateway = await import("../gateway/call.js");
    const sessionUtils = await import("../gateway/session-utils.fs.js");

    vi.mocked(sessions.loadSessionStore).mockReturnValue({
      "agent:main:subagent:test-session-1": {
        sessionId: "session-abc",
        updatedAt: Date.now(),
        abortedLastRun: true,
        sessionFile: "session-abc.jsonl",
      },
    });

    vi.mocked(sessionUtils.readSessionMessages).mockReturnValue([
      { role: "user", content: [{ type: "text", text: "Please build feature Y" }] },
      { role: "assistant", content: [{ type: "text", text: "Working on it..." }] },
      { role: "user", content: [{ type: "text", text: "Also add tests for it" }] },
      { role: "assistant", content: [{ type: "text", text: "Sure, adding tests now." }] },
    ]);

    const activeRuns = new Map<string, SubagentRunRecord>();
    activeRuns.set("run-1", createTestRunRecord());

    const { recoverOrphanedSubagentSessions } = await import("./subagent-orphan-recovery.js");
    await recoverOrphanedSubagentSessions({ getActiveRuns: () => activeRuns });

    const callArgs = vi.mocked(gateway.callGateway).mock.calls[0];
    const params = callArgs[0].params as Record<string, unknown>;
    const message = params.message as string;
    expect(message).toContain("Also add tests for it");
    expect(message).toContain("last message from the user");
  });

  it("adds config change hint when assistant messages reference config modifications", async () => {
    const sessions = await import("../config/sessions.js");
    const gateway = await import("../gateway/call.js");
    const sessionUtils = await import("../gateway/session-utils.fs.js");

    vi.mocked(sessions.loadSessionStore).mockReturnValue({
      "agent:main:subagent:test-session-1": {
        sessionId: "session-abc",
        updatedAt: Date.now(),
        abortedLastRun: true,
      },
    });

    vi.mocked(sessionUtils.readSessionMessages).mockReturnValue([
      { role: "user", content: "Update the config" },
      { role: "assistant", content: "I've modified openclaw.json to add the new setting." },
    ]);

    const activeRuns = new Map<string, SubagentRunRecord>();
    activeRuns.set("run-1", createTestRunRecord());

    const { recoverOrphanedSubagentSessions } = await import("./subagent-orphan-recovery.js");
    await recoverOrphanedSubagentSessions({ getActiveRuns: () => activeRuns });

    const callArgs = vi.mocked(gateway.callGateway).mock.calls[0];
    const params = callArgs[0].params as Record<string, unknown>;
    const message = params.message as string;
    expect(message).toContain("config changes from your previous run were already applied");
  });

  it("prevents duplicate resume when updateSessionStore fails", async () => {
    const sessions = await import("../config/sessions.js");
    const gateway = await import("../gateway/call.js");

    vi.mocked(gateway.callGateway).mockResolvedValue({ runId: "new-run" } as never);
    vi.mocked(sessions.updateSessionStore).mockRejectedValue(new Error("write failed"));

    vi.mocked(sessions.loadSessionStore).mockReturnValue({
      "agent:main:subagent:test-session-1": {
        sessionId: "session-abc",
        updatedAt: Date.now(),
        abortedLastRun: true,
      },
    });

    const activeRuns = new Map<string, SubagentRunRecord>();
    activeRuns.set("run-1", createTestRunRecord());
    activeRuns.set(
      "run-2",
      createTestRunRecord({
        runId: "run-2",
      }),
    );

    const { recoverOrphanedSubagentSessions } = await import("./subagent-orphan-recovery.js");
    const result = await recoverOrphanedSubagentSessions({ getActiveRuns: () => activeRuns });

    expect(result.recovered).toBe(1);
    expect(result.skipped).toBe(1);
    expect(gateway.callGateway).toHaveBeenCalledOnce();
  });
});
src/agents/subagent-orphan-recovery.ts (new file, 314 lines)
@@ -0,0 +1,314 @@
/**
 * Post-restart orphan recovery for subagent sessions.
 *
 * After a SIGUSR1 gateway reload aborts in-flight subagent LLM calls,
 * this module scans for orphaned sessions (those with `abortedLastRun: true`
 * that are still tracked as active in the subagent registry) and sends a
 * synthetic resume message to restart their work.
 *
 * @see https://github.com/openclaw/openclaw/issues/47711
 */

import crypto from "node:crypto";
import { loadConfig } from "../config/config.js";
import {
  loadSessionStore,
  resolveAgentIdFromSessionKey,
  resolveStorePath,
  updateSessionStore,
  type SessionEntry,
} from "../config/sessions.js";
import { callGateway } from "../gateway/call.js";
import { readSessionMessages } from "../gateway/session-utils.fs.js";
import { createSubsystemLogger } from "../logging/subsystem.js";
import { replaceSubagentRunAfterSteer } from "./subagent-registry.js";
import type { SubagentRunRecord } from "./subagent-registry.types.js";

const log = createSubsystemLogger("subagent-orphan-recovery");

/** Delay before attempting recovery to let the gateway finish bootstrapping. */
const DEFAULT_RECOVERY_DELAY_MS = 5_000;

/**
 * Build the resume message for an orphaned subagent.
 */
function buildResumeMessage(task: string, lastHumanMessage?: string): string {
  const maxTaskLen = 2000;
  const truncatedTask = task.length > maxTaskLen ? `${task.slice(0, maxTaskLen)}...` : task;

  let message =
    `[System] Your previous turn was interrupted by a gateway reload. ` +
    `Your original task was:\n\n${truncatedTask}\n\n`;

  if (lastHumanMessage) {
    message += `The last message from the user before the interruption was:\n\n${lastHumanMessage}\n\n`;
  }

  message += `Please continue where you left off.`;
  return message;
}

function extractMessageText(msg: unknown): string | undefined {
  if (!msg || typeof msg !== "object") {
    return undefined;
  }
  const m = msg as Record<string, unknown>;
  if (typeof m.content === "string") {
    return m.content;
  }
  if (Array.isArray(m.content)) {
    const text = m.content
      .filter(
        (c: unknown) =>
          typeof c === "object" &&
          c !== null &&
          (c as Record<string, unknown>).type === "text" &&
          typeof (c as Record<string, unknown>).text === "string",
      )
      .map((c: unknown) => (c as Record<string, string>).text)
      .filter(Boolean)
      .join("\n");
    return text || undefined;
  }
  return undefined;
}

/**
 * Send a resume message to an orphaned subagent session via the gateway agent method.
 */
async function resumeOrphanedSession(params: {
  sessionKey: string;
  task: string;
  lastHumanMessage?: string;
  configChangeHint?: string;
  originalRunId: string;
  originalRun: SubagentRunRecord;
}): Promise<boolean> {
  let resumeMessage = buildResumeMessage(params.task, params.lastHumanMessage);
  if (params.configChangeHint) {
    resumeMessage += params.configChangeHint;
  }

  try {
    const result = await callGateway<{ runId: string }>({
      method: "agent",
      params: {
        message: resumeMessage,
        sessionKey: params.sessionKey,
        idempotencyKey: crypto.randomUUID(),
        deliver: false,
        lane: "subagent",
      },
      timeoutMs: 10_000,
    });
    const remapped = replaceSubagentRunAfterSteer({
      previousRunId: params.originalRunId,
      nextRunId: result.runId,
      fallback: params.originalRun,
    });
    if (!remapped) {
      log.warn(
        `resumed orphaned session ${params.sessionKey} but remap failed (old run already removed); treating as failure`,
      );
      return false;
    }
    log.info(`resumed orphaned session: ${params.sessionKey}`);
    return true;
  } catch (err) {
    log.warn(`failed to resume orphaned session ${params.sessionKey}: ${String(err)}`);
    return false;
  }
}

/**
 * Scan for and resume orphaned subagent sessions after a gateway restart.
 *
 * An orphaned session is one where:
 * 1. It has an active (not ended) entry in the subagent run registry
 * 2. Its session store entry has `abortedLastRun: true`
 *
 * For each orphaned session found, we:
 * 1. Send a synthetic resume message to trigger a new LLM turn
 * 2. Clear the `abortedLastRun` flag only after that resume succeeds
 */
export async function recoverOrphanedSubagentSessions(params: {
  getActiveRuns: () => Map<string, SubagentRunRecord>;
  /** Persisted across retries so already-resumed sessions are not resumed again. */
  resumedSessionKeys?: Set<string>;
}): Promise<{ recovered: number; failed: number; skipped: number }> {
  const result = { recovered: 0, failed: 0, skipped: 0 };
  const resumedSessionKeys = params.resumedSessionKeys ?? new Set<string>();
  const configChangePattern = /openclaw\.json|openclaw gateway restart|config\.patch/i;

  try {
    const activeRuns = params.getActiveRuns();
    if (activeRuns.size === 0) {
      return result;
    }

    const cfg = loadConfig();
    const storeCache = new Map<string, Record<string, SessionEntry>>();

    for (const [runId, runRecord] of activeRuns.entries()) {
      // Only consider runs that haven't ended yet
      if (typeof runRecord.endedAt === "number" && runRecord.endedAt > 0) {
        continue;
      }

      const childSessionKey = runRecord.childSessionKey?.trim();
      if (!childSessionKey) {
        continue;
      }
      if (resumedSessionKeys.has(childSessionKey)) {
        result.skipped++;
        continue;
      }

      try {
        const agentId = resolveAgentIdFromSessionKey(childSessionKey);
        const storePath = resolveStorePath(cfg.session?.store, { agentId });

        let store = storeCache.get(storePath);
        if (!store) {
          store = loadSessionStore(storePath);
          storeCache.set(storePath, store);
        }

        const entry = store[childSessionKey];
        if (!entry) {
          result.skipped++;
          continue;
        }

        // Check if this session was aborted by the restart
        if (!entry.abortedLastRun) {
          result.skipped++;
          continue;
        }

        log.info(`found orphaned subagent session: ${childSessionKey} (run=${runId})`);

        const messages = readSessionMessages(entry.sessionId, storePath, entry.sessionFile);
        const lastHumanMessage = [...messages]
          .toReversed()
          .find((msg) => (msg as { role?: unknown } | null)?.role === "user");
        const configChangeDetected = messages.some((msg) => {
          if ((msg as { role?: unknown } | null)?.role !== "assistant") {
            return false;
          }
          const text = extractMessageText(msg);
          return typeof text === "string" && configChangePattern.test(text);
        });

        // Resume the session with the original task context.
        // We intentionally do NOT clear abortedLastRun before attempting
        // the resume — if callGateway fails (e.g. gateway still booting),
        // the flag stays true so the next restart can retry.
        const resumed = await resumeOrphanedSession({
          sessionKey: childSessionKey,
          task: runRecord.task,
          lastHumanMessage: extractMessageText(lastHumanMessage),
          configChangeHint: configChangeDetected
            ? "\n\n[config changes from your previous run were already applied — do not re-modify openclaw.json or restart the gateway]"
            : undefined,
          originalRunId: runId,
          originalRun: runRecord,
        });

        if (resumed) {
          resumedSessionKeys.add(childSessionKey);
          // Only clear the aborted flag after confirmed successful resume.
          try {
            await updateSessionStore(storePath, (currentStore) => {
              const current = currentStore[childSessionKey];
              if (current) {
                current.abortedLastRun = false;
                current.updatedAt = Date.now();
                currentStore[childSessionKey] = current;
              }
            });
          } catch (err) {
            log.warn(
              `resume succeeded but failed to update session store for ${childSessionKey}: ${String(err)}`,
            );
          }
          result.recovered++;
        } else {
          // Flag stays as abortedLastRun=true so next restart can retry
          log.warn(
            `resume failed for ${childSessionKey}; abortedLastRun flag preserved for retry on next restart`,
          );
          result.failed++;
        }
      } catch (err) {
        log.warn(`error processing orphaned session ${childSessionKey}: ${String(err)}`);
        result.failed++;
      }
    }
  } catch (err) {
    log.warn(`orphan recovery scan failed: ${String(err)}`);
    // Ensure retry logic fires for scan-level exceptions.
    if (result.failed === 0) {
      result.failed = 1;
    }
  }

  if (result.recovered > 0 || result.failed > 0) {
    log.info(
      `orphan recovery complete: recovered=${result.recovered} failed=${result.failed} skipped=${result.skipped}`,
    );
  }

  return result;
}

/** Maximum number of retry attempts for orphan recovery. */
const MAX_RECOVERY_RETRIES = 3;
/** Backoff multiplier between retries (exponential). */
const RETRY_BACKOFF_MULTIPLIER = 2;

/**
 * Schedule orphan recovery after a delay, with retry logic.
 * The delay gives the gateway time to fully bootstrap after restart.
 * If recovery fails (e.g. gateway not yet ready), retries with exponential backoff.
 */
export function scheduleOrphanRecovery(params: {
  getActiveRuns: () => Map<string, SubagentRunRecord>;
  delayMs?: number;
  maxRetries?: number;
}): void {
  const initialDelay = params.delayMs ?? DEFAULT_RECOVERY_DELAY_MS;
  const maxRetries = params.maxRetries ?? MAX_RECOVERY_RETRIES;

  const resumedSessionKeys = new Set<string>();

  const attemptRecovery = (attempt: number, delay: number) => {
    setTimeout(() => {
      void recoverOrphanedSubagentSessions({ ...params, resumedSessionKeys })
        .then((result) => {
          if (result.failed > 0 && attempt < maxRetries) {
            const nextDelay = delay * RETRY_BACKOFF_MULTIPLIER;
            log.info(
              `orphan recovery had ${result.failed} failure(s); retrying in ${nextDelay}ms (attempt ${attempt + 1}/${maxRetries})`,
            );
            attemptRecovery(attempt + 1, nextDelay);
          }
        })
        .catch((err) => {
          if (attempt < maxRetries) {
            const nextDelay = delay * RETRY_BACKOFF_MULTIPLIER;
            log.warn(
              `scheduled orphan recovery failed: ${String(err)}; retrying in ${nextDelay}ms (attempt ${attempt + 1}/${maxRetries})`,
            );
            attemptRecovery(attempt + 1, nextDelay);
          } else {
            log.warn(
              `scheduled orphan recovery failed after ${maxRetries} retries: ${String(err)}`,
            );
          }
        });
    }, delay).unref?.();
  };

  attemptRecovery(0, initialDelay);
}
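For orientation, the defaults above (`DEFAULT_RECOVERY_DELAY_MS` = 5_000 ms, `RETRY_BACKOFF_MULTIPLIER` = 2, `MAX_RECOVERY_RETRIES` = 3) imply the retry cadence sketched below; the helper is illustrative only and is not part of the module.

```ts
// Sketch: reproduce the delays scheduleOrphanRecovery would use with its defaults.
function recoveryDelays(initialMs = 5_000, multiplier = 2, maxRetries = 3): number[] {
  const delays: number[] = [];
  let delay = initialMs;
  for (let attempt = 0; attempt <= maxRetries; attempt++) {
    delays.push(delay);
    delay *= multiplier;
  }
  return delays;
}

// [5000, 10000, 20000, 40000] between attempts, i.e. roughly 5 s, 15 s, 35 s and
// 75 s after the gateway restarts. Later attempts only fire when the previous
// scan reported failures, and resumedSessionKeys keeps already-recovered
// sessions from being resumed twice.
console.log(recoveryDelays());
```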
@@ -684,6 +684,19 @@ function restoreSubagentRunsOnce() {
    for (const runId of subagentRuns.keys()) {
      resumeSubagentRun(runId);
    }

    // Schedule orphan recovery for subagent sessions that were aborted
    // by a SIGUSR1 reload. This runs after a short delay to let the
    // gateway fully bootstrap first. Dynamic import to avoid increasing
    // startup memory footprint. (#47711)
    void import("./subagent-orphan-recovery.js").then(
      ({ scheduleOrphanRecovery }) => {
        scheduleOrphanRecovery({ getActiveRuns: () => subagentRuns });
      },
      () => {
        // Ignore import failures — orphan recovery is best-effort.
      },
    );
  } catch {
    // ignore restore failures
  }
@@ -50,8 +50,11 @@ function resolveGatewayPortFallback(): Promise<number> {
}

async function assertUnmanagedGatewayRestartEnabled(port: number): Promise<void> {
  const cfg = await readBestEffortConfig().catch(() => undefined);
  const tlsEnabled = !!cfg?.gateway?.tls?.enabled;
  const scheme = tlsEnabled ? "wss" : "ws";
  const probe = await probeGateway({
    url: `ws://127.0.0.1:${port}`,
    url: `${scheme}://127.0.0.1:${port}`,
    auth: {
      token: process.env.OPENCLAW_GATEWAY_TOKEN?.trim() || undefined,
      password: process.env.OPENCLAW_GATEWAY_PASSWORD?.trim() || undefined,

@@ -8,6 +8,15 @@ const acquireGatewayLock = vi.fn(async (_opts?: { port?: number }) => ({
const consumeGatewaySigusr1RestartAuthorization = vi.fn(() => true);
const isGatewaySigusr1RestartExternallyAllowed = vi.fn(() => false);
const markGatewaySigusr1RestartHandled = vi.fn();
const scheduleGatewaySigusr1Restart = vi.fn((_opts?: { delayMs?: number; reason?: string }) => ({
  ok: true,
  pid: process.pid,
  signal: "SIGUSR1" as const,
  delayMs: 0,
  mode: "emit" as const,
  coalesced: false,
  cooldownMsApplied: 0,
}));
const getActiveTaskCount = vi.fn(() => 0);
const markGatewayDraining = vi.fn();
const waitForActiveTasks = vi.fn(async (_timeoutMs: number) => ({ drained: true }));

@@ -35,6 +44,8 @@ vi.mock("../../infra/restart.js", () => ({
  consumeGatewaySigusr1RestartAuthorization: () => consumeGatewaySigusr1RestartAuthorization(),
  isGatewaySigusr1RestartExternallyAllowed: () => isGatewaySigusr1RestartExternallyAllowed(),
  markGatewaySigusr1RestartHandled: () => markGatewaySigusr1RestartHandled(),
  scheduleGatewaySigusr1Restart: (opts?: { delayMs?: number; reason?: string }) =>
    scheduleGatewaySigusr1Restart(opts),
}));

vi.mock("../../infra/process-respawn.js", () => ({

@@ -292,6 +303,28 @@ describe("runGatewayLoop", () => {
    });
  });

  it("routes external SIGUSR1 through the restart scheduler before draining", async () => {
    vi.clearAllMocks();
    consumeGatewaySigusr1RestartAuthorization.mockReturnValueOnce(false);
    isGatewaySigusr1RestartExternallyAllowed.mockReturnValueOnce(true);

    await withIsolatedSignals(async ({ captureSignal }) => {
      const { close, start } = await createSignaledLoopHarness();
      const sigusr1 = captureSignal("SIGUSR1");

      sigusr1();
      await new Promise<void>((resolve) => setImmediate(resolve));

      expect(scheduleGatewaySigusr1Restart).toHaveBeenCalledWith({
        delayMs: 0,
        reason: "SIGUSR1",
      });
      expect(close).not.toHaveBeenCalled();
      expect(start).toHaveBeenCalledTimes(1);
      expect(markGatewaySigusr1RestartHandled).not.toHaveBeenCalled();
    });
  });

  it("releases the lock before exiting on spawned restart", async () => {
    vi.clearAllMocks();

@@ -10,6 +10,7 @@ import {
  consumeGatewaySigusr1RestartAuthorization,
  isGatewaySigusr1RestartExternallyAllowed,
  markGatewaySigusr1RestartHandled,
  scheduleGatewaySigusr1Restart,
} from "../../infra/restart.js";
import { createSubsystemLogger } from "../../logging/subsystem.js";
import {
@@ -186,10 +187,20 @@ export async function runGatewayLoop(params: {
  const onSigusr1 = () => {
    gatewayLog.info("signal SIGUSR1 received");
    const authorized = consumeGatewaySigusr1RestartAuthorization();
    if (!authorized && !isGatewaySigusr1RestartExternallyAllowed()) {
      gatewayLog.warn(
        "SIGUSR1 restart ignored (not authorized; commands.restart=false or use gateway tool).",
      );
    if (!authorized) {
      if (!isGatewaySigusr1RestartExternallyAllowed()) {
        gatewayLog.warn(
          "SIGUSR1 restart ignored (not authorized; commands.restart=false or use gateway tool).",
        );
        return;
      }
      if (shuttingDown) {
        gatewayLog.info("received SIGUSR1 during shutdown; ignoring");
        return;
      }
      // External SIGUSR1 requests should still reuse the in-process restart
      // scheduler so idle drain and restart coalescing stay consistent.
      scheduleGatewaySigusr1Restart({ delayMs: 0, reason: "SIGUSR1" });
      return;
    }
    markGatewaySigusr1RestartHandled();
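With the handler above, an externally allowed SIGUSR1 no longer tears the loop down directly; it is routed through `scheduleGatewaySigusr1Restart` and its idle drain. A supervisor or operator script could trigger that path roughly as sketched here; how the gateway pid is obtained is an assumption, not something this diff defines.

```ts
// Sketch: ask a running gateway to restart from a supervisor script.
// Obtaining gatewayPid (pid file, process manager, etc.) is outside this diff.
function requestGatewayRestart(gatewayPid: number): void {
  // When external SIGUSR1 restarts are allowed, the handler above routes this
  // through scheduleGatewaySigusr1Restart, so in-flight work drains first.
  process.kill(gatewayPid, "SIGUSR1");
}
```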
@@ -427,6 +427,8 @@ export const FIELD_HELP: Record<string, string> = {
  "gateway.reload.mode":
    'Controls how config edits are applied: "off" ignores live edits, "restart" always restarts, "hot" applies in-process, and "hybrid" tries hot then restarts if required. Keep "hybrid" for safest routine updates.',
  "gateway.reload.debounceMs": "Debounce window (ms) before applying config changes.",
  "gateway.reload.deferralTimeoutMs":
    "Maximum time (ms) to wait for in-flight operations to complete before forcing a SIGUSR1 restart. Default: 300000 (5 minutes). Lower values risk aborting active subagent LLM calls.",
  "gateway.nodes.browser.mode":
    'Node browser routing ("auto" = pick single connected browser node, "manual" = require node param, "off" = disable).',
  "gateway.nodes.browser.node": "Pin browser routing to a specific node id or name (optional).",

@@ -279,6 +279,7 @@ export const FIELD_LABELS: Record<string, string> = {
    "OpenAI Chat Completions Image Timeout (ms)",
  "gateway.reload.mode": "Config Reload Mode",
  "gateway.reload.debounceMs": "Config Reload Debounce (ms)",
  "gateway.reload.deferralTimeoutMs": "Restart Deferral Timeout (ms)",
  "gateway.nodes.browser.mode": "Gateway Node Browser Mode",
  "gateway.nodes.browser.node": "Gateway Node Browser Pin",
  "gateway.nodes.allowCommands": "Gateway Node Allowlist (Extra Commands)",

@@ -211,6 +211,13 @@ export type GatewayReloadConfig = {
  mode?: GatewayReloadMode;
  /** Debounce window for config reloads (ms). Default: 300. */
  debounceMs?: number;
  /**
   * Maximum time (ms) to wait for in-flight operations to complete before
   * forcing a SIGUSR1 restart. Default: 300000 (5 minutes).
   * Lower values risk aborting active subagent LLM calls.
   * @see https://github.com/openclaw/openclaw/issues/47711
   */
  deferralTimeoutMs?: number;
};

export type GatewayHttpChatCompletionsConfig = {
@@ -728,6 +728,7 @@ export const OpenClawSchema = z
            ])
            .optional(),
          debounceMs: z.number().int().min(0).optional(),
          deferralTimeoutMs: z.number().int().min(0).optional(),
        })
          .strict()
          .optional(),
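Putting the FIELD_HELP text, labels, type, and schema together, a `gateway.reload` section that lengthens the deferral might look roughly like this sketch; the local type aliases mirror the shapes shown above, the mode union is inferred from the help text, and the values are illustrative, not recommendations.

```ts
// Local aliases mirroring the GatewayReloadConfig shape from the diff above;
// the mode union is inferred from the FIELD_HELP text and may not be exhaustive.
type GatewayReloadMode = "off" | "restart" | "hot" | "hybrid";
type GatewayReloadConfig = {
  mode?: GatewayReloadMode;
  debounceMs?: number;
  deferralTimeoutMs?: number;
};

const reload: GatewayReloadConfig = {
  mode: "hybrid", // try hot apply first, restart only when required
  debounceMs: 300, // wait 300 ms after a config edit before applying it
  deferralTimeoutMs: 600_000, // allow up to 10 minutes for in-flight subagent calls
};
```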
@@ -219,6 +219,7 @@ export function createGatewayReloadHandlers(params: {

        deferGatewayRestartUntilIdle({
          getPendingCount: () => getActiveCounts().totalActive,
          maxWaitMs: nextConfig.gateway?.reload?.deferralTimeoutMs,
          hooks: {
            onReady: () => {
              restartPending = false;

@@ -190,8 +190,8 @@ describe("infra runtime", () => {
      await vi.advanceTimersByTimeAsync(0);
      expect(emitSpy).not.toHaveBeenCalledWith("SIGUSR1");

      // Advance past the 90s max deferral wait
      await vi.advanceTimersByTimeAsync(90_000);
      // Advance past the 5-minute max deferral wait
      await vi.advanceTimersByTimeAsync(300_000);
      expect(emitSpy).toHaveBeenCalledWith("SIGUSR1");
    } finally {
      process.removeListener("SIGUSR1", handler);
src/infra/restart.deferral-timeout.test.ts (new file, 119 lines)
@@ -0,0 +1,119 @@
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
import { __testing, deferGatewayRestartUntilIdle, type RestartDeferralHooks } from "./restart.js";

describe("deferGatewayRestartUntilIdle timeout", () => {
  beforeEach(() => {
    vi.useFakeTimers();
    __testing.resetSigusr1State();
    // Add a listener so emitGatewayRestart uses process.emit instead of process.kill
    process.on("SIGUSR1", () => {});
  });

  afterEach(() => {
    vi.useRealTimers();
    vi.restoreAllMocks();
    __testing.resetSigusr1State();
    process.removeAllListeners("SIGUSR1");
  });

  it("uses default 5-minute timeout when maxWaitMs is not specified", () => {
    const hooks: RestartDeferralHooks = {
      onTimeout: vi.fn(),
      onReady: vi.fn(),
    };

    // Always return 1 pending item to prevent draining
    deferGatewayRestartUntilIdle({
      getPendingCount: () => 1,
      hooks,
    });

    // Advance to just before 5 minutes — should NOT have timed out yet
    vi.advanceTimersByTime(299_999);
    expect(hooks.onTimeout).not.toHaveBeenCalled();

    // Advance past 5 minutes — should time out
    vi.advanceTimersByTime(1);
    expect(hooks.onTimeout).toHaveBeenCalledOnce();
    expect(hooks.onReady).not.toHaveBeenCalled();
  });

  it("respects custom maxWaitMs configuration", () => {
    const hooks: RestartDeferralHooks = {
      onTimeout: vi.fn(),
      onReady: vi.fn(),
    };

    const customTimeoutMs = 120_000; // 2 minutes

    deferGatewayRestartUntilIdle({
      getPendingCount: () => 1,
      maxWaitMs: customTimeoutMs,
      hooks,
    });

    // Advance to just before 2 minutes
    vi.advanceTimersByTime(119_999);
    expect(hooks.onTimeout).not.toHaveBeenCalled();

    // Advance past 2 minutes
    vi.advanceTimersByTime(1);
    expect(hooks.onTimeout).toHaveBeenCalledOnce();
  });

  it("calls onReady and does not timeout when pending count drops to 0", () => {
    const hooks: RestartDeferralHooks = {
      onTimeout: vi.fn(),
      onReady: vi.fn(),
    };

    let pending = 3;

    deferGatewayRestartUntilIdle({
      getPendingCount: () => pending,
      hooks,
    });

    // Advance a few poll intervals, then drain
    vi.advanceTimersByTime(1000);
    expect(hooks.onReady).not.toHaveBeenCalled();

    pending = 0;
    vi.advanceTimersByTime(500); // Next poll interval
    expect(hooks.onReady).toHaveBeenCalledOnce();
    expect(hooks.onTimeout).not.toHaveBeenCalled();
  });

  it("immediately restarts when pending count is 0", () => {
    const hooks: RestartDeferralHooks = {
      onReady: vi.fn(),
      onTimeout: vi.fn(),
    };

    deferGatewayRestartUntilIdle({
      getPendingCount: () => 0,
      hooks,
    });

    // onReady should be called synchronously
    expect(hooks.onReady).toHaveBeenCalledOnce();
    expect(hooks.onTimeout).not.toHaveBeenCalled();
  });

  it("handles getPendingCount error by restarting immediately", () => {
    const hooks: RestartDeferralHooks = {
      onCheckError: vi.fn(),
      onReady: vi.fn(),
    };

    deferGatewayRestartUntilIdle({
      getPendingCount: () => {
        throw new Error("store corrupted");
      },
      hooks,
    });

    expect(hooks.onCheckError).toHaveBeenCalledOnce();
    expect(hooks.onReady).not.toHaveBeenCalled();
  });
});
@@ -1,6 +1,7 @@
import { spawnSync } from "node:child_process";
import os from "node:os";
import path from "node:path";
import { loadConfig } from "../config/config.js";
import {
  resolveGatewayLaunchAgentLabel,
  resolveGatewaySystemdServiceName,

@@ -19,8 +20,9 @@ export type RestartAttempt = {
const SPAWN_TIMEOUT_MS = 2000;
const SIGUSR1_AUTH_GRACE_MS = 5000;
const DEFAULT_DEFERRAL_POLL_MS = 500;
// Cover slow in-flight embedded compaction work before forcing restart.
const DEFAULT_DEFERRAL_MAX_WAIT_MS = 90_000;
// Default to 5 minutes to avoid aborting in-flight subagent LLM calls.
// Configurable via gateway.reload.deferralTimeoutMs.
const DEFAULT_DEFERRAL_MAX_WAIT_MS = 300_000;
const RESTART_COOLDOWN_MS = 30_000;

const restartLog = createSubsystemLogger("restart");

@@ -475,7 +477,11 @@ export function scheduleGatewaySigusr1Restart(opts?: {
        emitGatewayRestart();
        return;
      }
      deferGatewayRestartUntilIdle({ getPendingCount: pendingCheck });
      const cfg = loadConfig();
      deferGatewayRestartUntilIdle({
        getPendingCount: pendingCheck,
        maxWaitMs: cfg.gateway?.reload?.deferralTimeoutMs,
      });
    },
    Math.max(0, requestedDueAt - nowMs),
  );