Fixes two related issues:

- #12768: Gateway restart notification
- #18612: Agent does not resume after self-triggered gateway restart

## Root cause

In ab4a08a82 ("fix: defer gateway restart until all replies are sent"), agentCommand() was replaced with deliverOutboundPayloads() in scheduleRestartSentinelWake(). This correctly fixed a pre-restart race condition, but accidentally made delivery one-way: the user is notified, but the agent never sees the restart message and does not resume.

A compounding bug meant the sentinel was also being written with stale routing data. extractDeliveryInfo() reads the persisted session store, which heartbeat runs frequently overwrite to { channel: 'webchat', to: 'heartbeat' } — an internal sink. So even restoring agentCommand() alone would fail: the sentinel's deliveryContext was pointing nowhere.

## Fix (four parts)

**Part 1 — src/agents/openclaw-tools.ts**

Forward the live delivery fields (agentChannel, agentTo, agentThreadId, agentAccountId) from createOpenClawTools() into createGatewayTool(). These values are captured from the current inbound message context and are accurate; they were available at the call site but not being passed.

**Part 2 — src/agents/tools/gateway-tool.ts**

Prefer liveDeliveryContext (built from opts.agentChannel / agentTo / agentAccountId) over extractDeliveryInfo() when writing the sentinel. Pass deliveryContext in the config.apply, config.patch, and update.run RPC calls so the server-side handlers receive it.

**Part 3 — src/gateway/server-restart-sentinel.ts**

Restore agentCommand() in place of deliverOutboundPayloads(). The function runs in the new process post-restart, where there are zero in-flight replies, so the pre-restart race condition does not apply. agentCommand() creates a full agent turn: the restart message is delivered to the user AND the agent sees it in its conversation history, allowing it to resume without waiting for the user to send a new message.
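The precedence rule at the heart of Parts 1 and 2 can be sketched in a few lines. This is a simplified illustration with hypothetical helper names (readStoredContext, resolveSentinelContext), not the actual implementation:

```typescript
// Simplified sketch: prefer the live delivery context captured from the
// inbound message over the persisted session store, which heartbeat runs
// overwrite with an internal sink target.
type DeliveryContext = { channel?: string; to?: string; accountId?: string };

// Session store as it looks after a heartbeat run has contaminated it.
const sessionStore: DeliveryContext = { channel: "webchat", to: "heartbeat" };

// Stand-in for extractDeliveryInfo(): reads the (possibly stale) store.
function readStoredContext(): DeliveryContext {
  return sessionStore;
}

// The fix: the live context wins whenever the current run captured one;
// the store is only a fallback.
function resolveSentinelContext(live?: DeliveryContext): DeliveryContext {
  return live ?? readStoredContext();
}

const live = resolveSentinelContext({ channel: "whatsapp", to: "+15550002" });
const stale = resolveSentinelContext(undefined);
```

With a live context the sentinel routes to the real channel; without one it falls back to the contaminated store, which is exactly the pre-fix failure mode.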
**Part 4 — src/gateway/protocol/schema/config.ts**

Add deliveryContext as an optional field to ConfigApplyLikeParamsSchema (shared by config.apply and config.patch) and UpdateRunParamsSchema. The additionalProperties: false constraint was silently dropping the field before it reached the server-side handlers. Also update resolveConfigRestartRequest() in config.ts and the update.run handler in update.ts to prefer params.deliveryContext over extractDeliveryInfo().

## Why the heartbeat approach fails

An alternative approach (requestHeartbeatNow + enqueueSystemEvent) was tested and rejected: the heartbeat does fire, but its delivery target comes from the session store (lastChannel/lastTo), which reads 'webchat/heartbeat' due to heartbeat contamination. Responses route to an internal sink and are silently dropped. agentCommand() is the correct tool because it creates a turn with explicit delivery context attached.
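The Part 4 failure mode can be shown with a minimal whitelist filter standing in for the additionalProperties: false behavior. The stripUnknownKeys function below is illustrative only, not the TypeBox validator the gateway actually uses:

```typescript
// Minimal stand-in for additionalProperties: false in strip mode: any key
// not declared in the schema is removed before the params reach the handler.
function stripUnknownKeys(
  params: Record<string, unknown>,
  declaredKeys: string[],
): Record<string, unknown> {
  const out: Record<string, unknown> = {};
  for (const key of declaredKeys) {
    if (key in params) out[key] = params[key];
  }
  return out;
}

const rpcParams = {
  raw: "agents: {}",
  deliveryContext: { channel: "whatsapp", to: "+15550002" },
};

// Before the fix: deliveryContext is not declared, so it never arrives.
const before = stripUnknownKeys(rpcParams, [
  "raw", "baseHash", "sessionKey", "note", "restartDelayMs",
]);

// After the fix: declaring the field lets it through to the handler.
const after = stripUnknownKeys(rpcParams, [
  "raw", "baseHash", "sessionKey", "note", "restartDelayMs", "deliveryContext",
]);
```

This is why restoring the client-side plumbing alone was not enough: without the schema change, the field was dropped server-side and the sentinel still fell back to stale session store data.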
This commit: ade14241ba (parent 598f1826d8)
**src/agents/openclaw-tools.ts**

```diff
@@ -175,6 +175,10 @@ export function createOpenClawTools(
     ...(imageGenerateTool ? [imageGenerateTool] : []),
     createGatewayTool({
       agentSessionKey: options?.agentSessionKey,
+      agentChannel: options?.agentChannel != null ? String(options.agentChannel) : undefined,
+      agentTo: options?.agentTo,
+      agentThreadId: options?.agentThreadId,
+      agentAccountId: options?.agentAccountId,
       config: options?.config,
     }),
     createAgentsListTool({
```
**src/agents/tools/gateway-tool.ts**

```diff
@@ -69,6 +69,10 @@ const GatewayToolSchema = Type.Object({
 
 export function createGatewayTool(opts?: {
   agentSessionKey?: string;
+  agentChannel?: string;
+  agentTo?: string;
+  agentThreadId?: string | number;
+  agentAccountId?: string;
   config?: OpenClawConfig;
 }): AnyAgentTool {
   return {
@@ -99,9 +103,24 @@ export function createGatewayTool(opts?: {
       : undefined;
     const note =
       typeof params.note === "string" && params.note.trim() ? params.note.trim() : undefined;
-    // Extract channel + threadId for routing after restart
-    // Supports both :thread: (most channels) and :topic: (Telegram)
-    const { deliveryContext, threadId } = extractDeliveryInfo(sessionKey);
+    // Prefer the live delivery context captured during the current agent
+    // run over extractDeliveryInfo() (which reads the persisted session
+    // store). The session store is frequently overwritten by heartbeat
+    // runs to { channel: "webchat", to: "heartbeat" }, causing the
+    // sentinel to write stale routing data that fails post-restart.
+    // See #18612.
+    const liveContext =
+      opts?.agentChannel != null && String(opts.agentChannel).trim()
+        ? {
+            channel: String(opts.agentChannel).trim(),
+            to: opts?.agentTo ?? undefined,
+            accountId: opts?.agentAccountId ?? undefined,
+          }
+        : undefined;
+    const extracted = extractDeliveryInfo(sessionKey);
+    const deliveryContext = liveContext ?? extracted.deliveryContext;
+    const threadId =
+      opts?.agentThreadId != null ? String(opts.agentThreadId) : extracted.threadId;
     const payload: RestartSentinelPayload = {
       kind: "restart",
       status: "ok",
@@ -133,10 +152,25 @@ export function createGatewayTool(opts?: {
 
     const gatewayOpts = readGatewayCallOptions(params);
 
+    // Build the live delivery context from the current agent run's routing
+    // fields. This is passed to server-side handlers so they can write an
+    // accurate sentinel without reading the (potentially stale) session
+    // store. The store is frequently overwritten by heartbeat runs to
+    // { channel: "webchat", to: "heartbeat" }. See #18612.
+    const liveDeliveryContextForRpc =
+      opts?.agentChannel != null && String(opts.agentChannel).trim()
+        ? {
+            channel: String(opts.agentChannel).trim(),
+            to: opts?.agentTo ?? undefined,
+            accountId: opts?.agentAccountId ?? undefined,
+          }
+        : undefined;
+
     const resolveGatewayWriteMeta = (): {
       sessionKey: string | undefined;
       note: string | undefined;
       restartDelayMs: number | undefined;
+      deliveryContext: typeof liveDeliveryContextForRpc;
     } => {
       const sessionKey =
         typeof params.sessionKey === "string" && params.sessionKey.trim()
@@ -148,7 +182,7 @@ export function createGatewayTool(opts?: {
         typeof params.restartDelayMs === "number" && Number.isFinite(params.restartDelayMs)
           ? Math.floor(params.restartDelayMs)
           : undefined;
-      return { sessionKey, note, restartDelayMs };
+      return { sessionKey, note, restartDelayMs, deliveryContext: liveDeliveryContextForRpc };
     };
 
     const resolveConfigWriteParams = async (): Promise<{
@@ -157,6 +191,7 @@ export function createGatewayTool(opts?: {
       sessionKey: string | undefined;
       note: string | undefined;
       restartDelayMs: number | undefined;
+      deliveryContext: typeof liveDeliveryContextForRpc;
     }> => {
       const raw = readStringParam(params, "raw", { required: true });
       let baseHash = readStringParam(params, "baseHash");
@@ -183,7 +218,7 @@ export function createGatewayTool(opts?: {
       return jsonResult({ ok: true, result });
     }
     if (action === "config.apply") {
-      const { raw, baseHash, sessionKey, note, restartDelayMs } =
+      const { raw, baseHash, sessionKey, note, restartDelayMs, deliveryContext } =
         await resolveConfigWriteParams();
       const result = await callGatewayTool("config.apply", gatewayOpts, {
         raw,
@@ -191,11 +226,12 @@ export function createGatewayTool(opts?: {
         sessionKey,
         note,
         restartDelayMs,
+        deliveryContext,
       });
       return jsonResult({ ok: true, result });
     }
     if (action === "config.patch") {
-      const { raw, baseHash, sessionKey, note, restartDelayMs } =
+      const { raw, baseHash, sessionKey, note, restartDelayMs, deliveryContext } =
         await resolveConfigWriteParams();
       const result = await callGatewayTool("config.patch", gatewayOpts, {
         raw,
@@ -203,11 +239,12 @@ export function createGatewayTool(opts?: {
         sessionKey,
         note,
         restartDelayMs,
+        deliveryContext,
       });
       return jsonResult({ ok: true, result });
     }
     if (action === "update.run") {
-      const { sessionKey, note, restartDelayMs } = resolveGatewayWriteMeta();
+      const { sessionKey, note, restartDelayMs, deliveryContext } = resolveGatewayWriteMeta();
       const updateTimeoutMs = gatewayOpts.timeoutMs ?? DEFAULT_UPDATE_TIMEOUT_MS;
       const updateGatewayOpts = {
         ...gatewayOpts,
@@ -217,6 +254,7 @@ export function createGatewayTool(opts?: {
         sessionKey,
         note,
         restartDelayMs,
+        deliveryContext,
         timeoutMs: updateTimeoutMs,
       });
       return jsonResult({ ok: true, result });
```
**src/gateway/protocol/schema/config.ts**

```diff
@@ -17,6 +17,22 @@ export const ConfigSetParamsSchema = Type.Object(
   { additionalProperties: false },
 );
 
+// deliveryContext carries the live channel/to/accountId from the agent run so
+// that the restart sentinel has accurate routing data even after heartbeats
+// have overwritten the session store with { channel: "webchat", to: "heartbeat" }.
+// Without this field, the additionalProperties: false constraint silently drops
+// it and the sentinel falls back to stale session store data. See #18612.
+const DeliveryContextSchema = Type.Optional(
+  Type.Object(
+    {
+      channel: Type.Optional(Type.String()),
+      to: Type.Optional(Type.String()),
+      accountId: Type.Optional(Type.String()),
+    },
+    { additionalProperties: false },
+  ),
+);
+
 const ConfigApplyLikeParamsSchema = Type.Object(
   {
     raw: NonEmptyString,
@@ -24,6 +40,7 @@ const ConfigApplyLikeParamsSchema = Type.Object(
     sessionKey: Type.Optional(Type.String()),
     note: Type.Optional(Type.String()),
     restartDelayMs: Type.Optional(Type.Integer({ minimum: 0 })),
+    deliveryContext: DeliveryContextSchema,
   },
   { additionalProperties: false },
 );
@@ -46,6 +63,7 @@ export const UpdateRunParamsSchema = Type.Object(
     note: Type.Optional(Type.String()),
     restartDelayMs: Type.Optional(Type.Integer({ minimum: 0 })),
     timeoutMs: Type.Optional(Type.Integer({ minimum: 1 })),
+    deliveryContext: DeliveryContextSchema,
   },
   { additionalProperties: false },
 );
```
**config.ts (gateway handlers)**

```diff
@@ -194,9 +194,14 @@ function resolveConfigRestartRequest(params: unknown): {
 } {
   const { sessionKey, note, restartDelayMs } = parseRestartRequestParams(params);
 
-  // Extract deliveryContext + threadId for routing after restart
-  // Supports both :thread: (most channels) and :topic: (Telegram)
-  const { deliveryContext, threadId } = extractDeliveryInfo(sessionKey);
+  // Extract threadId from the session key (reliable — derived from key, not store).
+  // For deliveryContext, prefer the live context passed by the client over
+  // extractDeliveryInfo(), which reads the persisted session store. Heartbeat
+  // runs overwrite the store to { channel: "webchat", to: "heartbeat" }, so
+  // reading it here would produce stale routing data. See #18612.
+  const { deliveryContext: extractedDeliveryContext, threadId } = extractDeliveryInfo(sessionKey);
+  const paramsDeliveryContext = parseDeliveryContextFromParams(params);
+  const deliveryContext = paramsDeliveryContext ?? extractedDeliveryContext;
 
   return {
     sessionKey,
@@ -207,6 +212,31 @@ function resolveConfigRestartRequest(params: unknown): {
   };
 }
 
+function parseDeliveryContextFromParams(
+  params: unknown,
+): { channel?: string; to?: string; accountId?: string } | undefined {
+  const raw = (params as { deliveryContext?: unknown }).deliveryContext;
+  if (!raw || typeof raw !== "object") {
+    return undefined;
+  }
+  const channel =
+    typeof (raw as { channel?: unknown }).channel === "string"
+      ? (raw as { channel: string }).channel.trim() || undefined
+      : undefined;
+  const to =
+    typeof (raw as { to?: unknown }).to === "string"
+      ? (raw as { to: string }).to.trim() || undefined
+      : undefined;
+  const accountId =
+    typeof (raw as { accountId?: unknown }).accountId === "string"
+      ? (raw as { accountId: string }).accountId.trim() || undefined
+      : undefined;
+  if (!channel && !to) {
+    return undefined;
+  }
+  return { channel, to, accountId };
+}
+
 function buildConfigRestartSentinelPayload(params: {
   kind: RestartSentinelPayload["kind"];
   mode: string;
```
**update.ts (gateway handlers)**

```diff
@@ -22,7 +22,22 @@ export const updateHandlers: GatewayRequestHandlers = {
     }
     const actor = resolveControlPlaneActor(client);
     const { sessionKey, note, restartDelayMs } = parseRestartRequestParams(params);
-    const { deliveryContext, threadId } = extractDeliveryInfo(sessionKey);
+    // Prefer live deliveryContext from params over extractDeliveryInfo() (see #18612).
+    const paramsDeliveryContextRaw = (params as { deliveryContext?: unknown }).deliveryContext;
+    const paramsDeliveryContext =
+      paramsDeliveryContextRaw && typeof paramsDeliveryContextRaw === "object"
+        ? (() => {
+            const dc = paramsDeliveryContextRaw as Record<string, unknown>;
+            const channel =
+              typeof dc.channel === "string" ? dc.channel.trim() || undefined : undefined;
+            const to = typeof dc.to === "string" ? dc.to.trim() || undefined : undefined;
+            const accountId =
+              typeof dc.accountId === "string" ? dc.accountId.trim() || undefined : undefined;
+            return channel || to ? { channel, to, accountId } : undefined;
+          })()
+        : undefined;
+    const { deliveryContext: extractedDeliveryContext, threadId } = extractDeliveryInfo(sessionKey);
+    const deliveryContext = paramsDeliveryContext ?? extractedDeliveryContext;
     const timeoutMsRaw = (params as { timeoutMs?: unknown }).timeoutMs;
     const timeoutMs =
       typeof timeoutMsRaw === "number" && Number.isFinite(timeoutMsRaw)
```
**Tests for scheduleRestartSentinelWake**

```diff
@@ -25,8 +25,9 @@ const mocks = vi.hoisted(() => ({
   })),
   normalizeChannelId: vi.fn((channel: string) => channel),
   resolveOutboundTarget: vi.fn(() => ({ ok: true as const, to: "+15550002" })),
-  deliverOutboundPayloads: vi.fn(async () => []),
+  agentCommand: vi.fn(async () => undefined),
   enqueueSystemEvent: vi.fn(),
+  defaultRuntime: {},
 }));
 
 vi.mock("../agents/agent-scope.js", () => ({
@@ -68,8 +69,12 @@ vi.mock("../infra/outbound/targets.js", () => ({
   resolveOutboundTarget: mocks.resolveOutboundTarget,
 }));
 
-vi.mock("../infra/outbound/deliver.js", () => ({
-  deliverOutboundPayloads: mocks.deliverOutboundPayloads,
+vi.mock("../commands/agent.js", () => ({
+  agentCommand: mocks.agentCommand,
 }));
 
+vi.mock("../runtime.js", () => ({
+  defaultRuntime: mocks.defaultRuntime,
+}));
+
 vi.mock("../infra/system-events.js", () => ({
@@ -79,16 +84,62 @@ vi.mock("../infra/system-events.js", () => ({
 
 const { scheduleRestartSentinelWake } = await import("./server-restart-sentinel.js");
 
 describe("scheduleRestartSentinelWake", () => {
-  it("forwards session context to outbound delivery", async () => {
+  it("calls agentCommand with resolved channel, to, and sessionKey after restart", async () => {
     await scheduleRestartSentinelWake({ deps: {} as never });
 
-    expect(mocks.deliverOutboundPayloads).toHaveBeenCalledWith(
+    expect(mocks.agentCommand).toHaveBeenCalledWith(
       expect.objectContaining({
-        channel: "whatsapp",
+        message: "restart message",
+        sessionKey: "agent:main:main",
         to: "+15550002",
-        session: { key: "agent:main:main", agentId: "agent-from-key" },
+        channel: "whatsapp",
+        deliver: true,
+        bestEffortDeliver: true,
+        messageChannel: "whatsapp",
+        accountId: "acct-2",
       }),
+      mocks.defaultRuntime,
+      {},
     );
+    expect(mocks.enqueueSystemEvent).not.toHaveBeenCalled();
   });
+
+  it("falls back to enqueueSystemEvent when agentCommand throws", async () => {
+    mocks.agentCommand.mockRejectedValueOnce(new Error("agent failed"));
+    mocks.enqueueSystemEvent.mockClear();
+
+    await scheduleRestartSentinelWake({ deps: {} as never });
+
+    expect(mocks.enqueueSystemEvent).toHaveBeenCalledWith(
+      expect.stringContaining("restart summary"),
+      { sessionKey: "agent:main:main" },
+    );
+  });
+
+  it("falls back to enqueueSystemEvent when channel cannot be resolved (no channel in origin)", async () => {
+    mocks.resolveOutboundTarget.mockReturnValueOnce({
+      ok: false,
+      error: new Error("no-target"),
+    } as never);
+    mocks.agentCommand.mockClear();
+    mocks.enqueueSystemEvent.mockClear();
+
+    await scheduleRestartSentinelWake({ deps: {} as never });
+
+    expect(mocks.agentCommand).not.toHaveBeenCalled();
+    expect(mocks.enqueueSystemEvent).toHaveBeenCalledWith("restart message", {
+      sessionKey: "agent:main:main",
+    });
+  });
+
+  it("falls back to enqueueSystemEvent on main session key when sentinel has no sessionKey", async () => {
+    mocks.consumeRestartSentinel.mockResolvedValueOnce({ payload: { sessionKey: "" } } as never);
+    mocks.enqueueSystemEvent.mockClear();
+
+    await scheduleRestartSentinelWake({ deps: {} as never });
+
+    expect(mocks.enqueueSystemEvent).toHaveBeenCalledWith("restart message", {
+      sessionKey: "agent:main:main",
+    });
+  });
 });
```
**src/gateway/server-restart-sentinel.ts**

```diff
@@ -1,10 +1,9 @@
 import { resolveAnnounceTargetFromKey } from "../agents/tools/sessions-send-helpers.js";
 import { normalizeChannelId } from "../channels/plugins/index.js";
 import type { CliDeps } from "../cli/deps.js";
+import { agentCommand } from "../commands/agent.js";
 import { resolveMainSessionKeyFromConfig } from "../config/sessions.js";
 import { parseSessionThreadInfo } from "../config/sessions/delivery-info.js";
-import { deliverOutboundPayloads } from "../infra/outbound/deliver.js";
-import { buildOutboundSessionContext } from "../infra/outbound/session-context.js";
 import { resolveOutboundTarget } from "../infra/outbound/targets.js";
 import {
   consumeRestartSentinel,
@@ -12,10 +11,11 @@ import {
   summarizeRestartSentinel,
 } from "../infra/restart-sentinel.js";
 import { enqueueSystemEvent } from "../infra/system-events.js";
+import { defaultRuntime } from "../runtime.js";
 import { deliveryContextFromSession, mergeDeliveryContext } from "../utils/delivery-context.js";
 import { loadSessionEntry } from "./session-utils.js";
 
-export async function scheduleRestartSentinelWake(_params: { deps: CliDeps }) {
+export async function scheduleRestartSentinelWake(params: { deps: CliDeps }) {
   const sentinel = await consumeRestartSentinel();
   if (!sentinel) {
     return;
@@ -76,30 +76,29 @@ export async function scheduleRestartSentinelWake(params: { deps: CliDeps }) {
       sessionThreadId ??
       (origin?.threadId != null ? String(origin.threadId) : undefined);
 
-    // Slack uses replyToId (thread_ts) for threading, not threadId.
-    // The reply path does this mapping but deliverOutboundPayloads does not,
-    // so we must convert here to ensure post-restart notifications land in
-    // the originating Slack thread. See #17716.
-    const isSlack = channel === "slack";
-    const replyToId = isSlack && threadId != null && threadId !== "" ? String(threadId) : undefined;
-    const resolvedThreadId = isSlack ? undefined : threadId;
-    const outboundSession = buildOutboundSessionContext({
-      cfg,
-      sessionKey,
-    });
-
     try {
-      await deliverOutboundPayloads({
-        cfg,
-        channel,
-        to: resolved.to,
-        accountId: origin?.accountId,
-        replyToId,
-        threadId: resolvedThreadId,
-        payloads: [{ text: message }],
-        session: outboundSession,
-        bestEffort: true,
-      });
+      // Use agentCommand() rather than deliverOutboundPayloads() so the restart
+      // message is a proper agent turn: the user is notified AND the agent sees
+      // the message in its conversation history and can resume autonomously.
+      //
+      // This is safe post-restart because scheduleRestartSentinelWake() runs in
+      // the new process, where there are zero in-flight replies. The pre-restart
+      // race condition fixed in ab4a08a82 does not apply here.
+      await agentCommand(
+        {
+          message,
+          sessionKey,
+          to: resolved.to,
+          channel,
+          deliver: true,
+          bestEffortDeliver: true,
+          messageChannel: channel,
+          threadId,
+          accountId: origin?.accountId,
+        },
+        defaultRuntime,
+        params.deps,
+      );
     } catch (err) {
       enqueueSystemEvent(`${summary}\n${String(err)}`, { sessionKey });
     }
```