diff --git a/src/agents/openclaw-tools.ts b/src/agents/openclaw-tools.ts index de5e91fdf0c..631d59d8a2a 100644 --- a/src/agents/openclaw-tools.ts +++ b/src/agents/openclaw-tools.ts @@ -175,6 +175,10 @@ export function createOpenClawTools( ...(imageGenerateTool ? [imageGenerateTool] : []), createGatewayTool({ agentSessionKey: options?.agentSessionKey, + agentChannel: options?.agentChannel != null ? String(options.agentChannel) : undefined, + agentTo: options?.agentTo, + agentThreadId: options?.agentThreadId, + agentAccountId: options?.agentAccountId, config: options?.config, }), createAgentsListTool({ diff --git a/src/agents/tools/gateway-tool.ts b/src/agents/tools/gateway-tool.ts index 33b8d86adcf..d4b7b707dd6 100644 --- a/src/agents/tools/gateway-tool.ts +++ b/src/agents/tools/gateway-tool.ts @@ -69,6 +69,10 @@ const GatewayToolSchema = Type.Object({ export function createGatewayTool(opts?: { agentSessionKey?: string; + agentChannel?: string; + agentTo?: string; + agentThreadId?: string | number; + agentAccountId?: string; config?: OpenClawConfig; }): AnyAgentTool { return { @@ -99,9 +103,24 @@ export function createGatewayTool(opts?: { : undefined; const note = typeof params.note === "string" && params.note.trim() ? params.note.trim() : undefined; - // Extract channel + threadId for routing after restart - // Supports both :thread: (most channels) and :topic: (Telegram) - const { deliveryContext, threadId } = extractDeliveryInfo(sessionKey); + // Prefer the live delivery context captured during the current agent + // run over extractDeliveryInfo() (which reads the persisted session + // store). The session store is frequently overwritten by heartbeat + // runs to { channel: "webchat", to: "heartbeat" }, causing the + // sentinel to write stale routing data that fails post-restart. + // See #18612. + const liveContext = + opts?.agentChannel != null && String(opts.agentChannel).trim() + ? { + channel: String(opts.agentChannel).trim(), + to: opts?.agentTo ?? undefined, + accountId: opts?.agentAccountId ?? undefined, + } + : undefined; + const extracted = extractDeliveryInfo(sessionKey); + const deliveryContext = liveContext ?? extracted.deliveryContext; + const threadId = + opts?.agentThreadId != null ? String(opts.agentThreadId) : extracted.threadId; const payload: RestartSentinelPayload = { kind: "restart", status: "ok", @@ -133,10 +152,25 @@ export function createGatewayTool(opts?: { const gatewayOpts = readGatewayCallOptions(params); + // Build the live delivery context from the current agent run's routing + // fields. This is passed to server-side handlers so they can write an + // accurate sentinel without reading the (potentially stale) session + // store. The store is frequently overwritten by heartbeat runs to + // { channel: "webchat", to: "heartbeat" }. See #18612. + const liveDeliveryContextForRpc = + opts?.agentChannel != null && String(opts.agentChannel).trim() + ? { + channel: String(opts.agentChannel).trim(), + to: opts?.agentTo ?? undefined, + accountId: opts?.agentAccountId ?? undefined, + } + : undefined; + const resolveGatewayWriteMeta = (): { sessionKey: string | undefined; note: string | undefined; restartDelayMs: number | undefined; + deliveryContext: typeof liveDeliveryContextForRpc; } => { const sessionKey = typeof params.sessionKey === "string" && params.sessionKey.trim() @@ -148,7 +182,7 @@ export function createGatewayTool(opts?: { typeof params.restartDelayMs === "number" && Number.isFinite(params.restartDelayMs) ? Math.floor(params.restartDelayMs) : undefined; - return { sessionKey, note, restartDelayMs }; + return { sessionKey, note, restartDelayMs, deliveryContext: liveDeliveryContextForRpc }; }; const resolveConfigWriteParams = async (): Promise<{ @@ -157,6 +191,7 @@ export function createGatewayTool(opts?: { sessionKey: string | undefined; note: string | undefined; restartDelayMs: number | undefined; + deliveryContext: typeof liveDeliveryContextForRpc; }> => { const raw = readStringParam(params, "raw", { required: true }); let baseHash = readStringParam(params, "baseHash"); @@ -183,7 +218,7 @@ export function createGatewayTool(opts?: { return jsonResult({ ok: true, result }); } if (action === "config.apply") { - const { raw, baseHash, sessionKey, note, restartDelayMs } = + const { raw, baseHash, sessionKey, note, restartDelayMs, deliveryContext } = await resolveConfigWriteParams(); const result = await callGatewayTool("config.apply", gatewayOpts, { raw, @@ -191,11 +226,12 @@ export function createGatewayTool(opts?: { sessionKey, note, restartDelayMs, + deliveryContext, }); return jsonResult({ ok: true, result }); } if (action === "config.patch") { - const { raw, baseHash, sessionKey, note, restartDelayMs } = + const { raw, baseHash, sessionKey, note, restartDelayMs, deliveryContext } = await resolveConfigWriteParams(); const result = await callGatewayTool("config.patch", gatewayOpts, { raw, @@ -203,11 +239,12 @@ export function createGatewayTool(opts?: { sessionKey, note, restartDelayMs, + deliveryContext, }); return jsonResult({ ok: true, result }); } if (action === "update.run") { - const { sessionKey, note, restartDelayMs } = resolveGatewayWriteMeta(); + const { sessionKey, note, restartDelayMs, deliveryContext } = resolveGatewayWriteMeta(); const updateTimeoutMs = gatewayOpts.timeoutMs ?? DEFAULT_UPDATE_TIMEOUT_MS; const updateGatewayOpts = { ...gatewayOpts, @@ -217,6 +254,7 @@ export function createGatewayTool(opts?: { sessionKey, note, restartDelayMs, + deliveryContext, timeoutMs: updateTimeoutMs, }); return jsonResult({ ok: true, result }); diff --git a/src/gateway/protocol/schema/config.ts b/src/gateway/protocol/schema/config.ts index 9d0ec876668..7b97b6864e1 100644 --- a/src/gateway/protocol/schema/config.ts +++ b/src/gateway/protocol/schema/config.ts @@ -17,6 +17,22 @@ export const ConfigSetParamsSchema = Type.Object( { additionalProperties: false }, ); +// deliveryContext carries the live channel/to/accountId from the agent run so +// that the restart sentinel has accurate routing data even after heartbeats +// have overwritten the session store with { channel: "webchat", to: "heartbeat" }. +// Without this field, the additionalProperties: false constraint silently drops +// it and the sentinel falls back to stale session store data. See #18612. +const DeliveryContextSchema = Type.Optional( + Type.Object( + { + channel: Type.Optional(Type.String()), + to: Type.Optional(Type.String()), + accountId: Type.Optional(Type.String()), + }, + { additionalProperties: false }, + ), +); + const ConfigApplyLikeParamsSchema = Type.Object( { raw: NonEmptyString, @@ -24,6 +40,7 @@ const ConfigApplyLikeParamsSchema = Type.Object( sessionKey: Type.Optional(Type.String()), note: Type.Optional(Type.String()), restartDelayMs: Type.Optional(Type.Integer({ minimum: 0 })), + deliveryContext: DeliveryContextSchema, }, { additionalProperties: false }, ); @@ -46,6 +63,7 @@ export const UpdateRunParamsSchema = Type.Object( note: Type.Optional(Type.String()), restartDelayMs: Type.Optional(Type.Integer({ minimum: 0 })), timeoutMs: Type.Optional(Type.Integer({ minimum: 1 })), + deliveryContext: DeliveryContextSchema, }, { additionalProperties: false }, ); diff --git a/src/gateway/server-methods/config.ts b/src/gateway/server-methods/config.ts index 977a59f00b5..c6a55a87315 100644 --- a/src/gateway/server-methods/config.ts +++ b/src/gateway/server-methods/config.ts @@ -194,9 +194,14 @@ function resolveConfigRestartRequest(params: unknown): { } { const { sessionKey, note, restartDelayMs } = parseRestartRequestParams(params); - // Extract deliveryContext + threadId for routing after restart - // Supports both :thread: (most channels) and :topic: (Telegram) - const { deliveryContext, threadId } = extractDeliveryInfo(sessionKey); + // Extract threadId from the session key (reliable — derived from key, not store). + // For deliveryContext, prefer the live context passed by the client over + // extractDeliveryInfo(), which reads the persisted session store. Heartbeat + // runs overwrite the store to { channel: "webchat", to: "heartbeat" }, so + // reading it here would produce stale routing data. See #18612. + const { deliveryContext: extractedDeliveryContext, threadId } = extractDeliveryInfo(sessionKey); + const paramsDeliveryContext = parseDeliveryContextFromParams(params); + const deliveryContext = paramsDeliveryContext ?? extractedDeliveryContext; return { sessionKey, @@ -207,6 +212,31 @@ function resolveConfigRestartRequest(params: unknown): { }; } +function parseDeliveryContextFromParams( + params: unknown, +): { channel?: string; to?: string; accountId?: string } | undefined { + const raw = (params as { deliveryContext?: unknown }).deliveryContext; + if (!raw || typeof raw !== "object") { + return undefined; + } + const channel = + typeof (raw as { channel?: unknown }).channel === "string" + ? (raw as { channel: string }).channel.trim() || undefined + : undefined; + const to = + typeof (raw as { to?: unknown }).to === "string" + ? (raw as { to: string }).to.trim() || undefined + : undefined; + const accountId = + typeof (raw as { accountId?: unknown }).accountId === "string" + ? (raw as { accountId: string }).accountId.trim() || undefined + : undefined; + if (!channel && !to) { + return undefined; + } + return { channel, to, accountId }; +} + function buildConfigRestartSentinelPayload(params: { kind: RestartSentinelPayload["kind"]; mode: string; diff --git a/src/gateway/server-methods/update.ts b/src/gateway/server-methods/update.ts index bf53e2a0227..148b80af1e2 100644 --- a/src/gateway/server-methods/update.ts +++ b/src/gateway/server-methods/update.ts @@ -22,7 +22,22 @@ export const updateHandlers: GatewayRequestHandlers = { } const actor = resolveControlPlaneActor(client); const { sessionKey, note, restartDelayMs } = parseRestartRequestParams(params); - const { deliveryContext, threadId } = extractDeliveryInfo(sessionKey); + // Prefer live deliveryContext from params over extractDeliveryInfo() (see #18612). + const paramsDeliveryContextRaw = (params as { deliveryContext?: unknown }).deliveryContext; + const paramsDeliveryContext = + paramsDeliveryContextRaw && typeof paramsDeliveryContextRaw === "object" + ? (() => { + const dc = paramsDeliveryContextRaw as Record; + const channel = + typeof dc.channel === "string" ? dc.channel.trim() || undefined : undefined; + const to = typeof dc.to === "string" ? dc.to.trim() || undefined : undefined; + const accountId = + typeof dc.accountId === "string" ? dc.accountId.trim() || undefined : undefined; + return channel || to ? { channel, to, accountId } : undefined; + })() + : undefined; + const { deliveryContext: extractedDeliveryContext, threadId } = extractDeliveryInfo(sessionKey); + const deliveryContext = paramsDeliveryContext ?? extractedDeliveryContext; const timeoutMsRaw = (params as { timeoutMs?: unknown }).timeoutMs; const timeoutMs = typeof timeoutMsRaw === "number" && Number.isFinite(timeoutMsRaw) diff --git a/src/gateway/server-restart-sentinel.test.ts b/src/gateway/server-restart-sentinel.test.ts index 187698b06ed..bcb7a57ebd7 100644 --- a/src/gateway/server-restart-sentinel.test.ts +++ b/src/gateway/server-restart-sentinel.test.ts @@ -25,8 +25,9 @@ const mocks = vi.hoisted(() => ({ })), normalizeChannelId: vi.fn((channel: string) => channel), resolveOutboundTarget: vi.fn(() => ({ ok: true as const, to: "+15550002" })), - deliverOutboundPayloads: vi.fn(async () => []), + agentCommand: vi.fn(async () => undefined), enqueueSystemEvent: vi.fn(), + defaultRuntime: {}, })); vi.mock("../agents/agent-scope.js", () => ({ @@ -68,8 +69,12 @@ vi.mock("../infra/outbound/targets.js", () => ({ resolveOutboundTarget: mocks.resolveOutboundTarget, })); -vi.mock("../infra/outbound/deliver.js", () => ({ - deliverOutboundPayloads: mocks.deliverOutboundPayloads, +vi.mock("../commands/agent.js", () => ({ + agentCommand: mocks.agentCommand, +})); + +vi.mock("../runtime.js", () => ({ + defaultRuntime: mocks.defaultRuntime, })); vi.mock("../infra/system-events.js", () => ({ @@ -79,16 +84,62 @@ vi.mock("../infra/system-events.js", () => ({ const { scheduleRestartSentinelWake } = await import("./server-restart-sentinel.js"); describe("scheduleRestartSentinelWake", () => { - it("forwards session context to outbound delivery", async () => { + it("calls agentCommand with resolved channel, to, and sessionKey after restart", async () => { await scheduleRestartSentinelWake({ deps: {} as never }); - expect(mocks.deliverOutboundPayloads).toHaveBeenCalledWith( + expect(mocks.agentCommand).toHaveBeenCalledWith( expect.objectContaining({ - channel: "whatsapp", + message: "restart message", + sessionKey: "agent:main:main", to: "+15550002", - session: { key: "agent:main:main", agentId: "agent-from-key" }, + channel: "whatsapp", + deliver: true, + bestEffortDeliver: true, + messageChannel: "whatsapp", + accountId: "acct-2", }), + mocks.defaultRuntime, + {}, ); expect(mocks.enqueueSystemEvent).not.toHaveBeenCalled(); }); + + it("falls back to enqueueSystemEvent when agentCommand throws", async () => { + mocks.agentCommand.mockRejectedValueOnce(new Error("agent failed")); + mocks.enqueueSystemEvent.mockClear(); + + await scheduleRestartSentinelWake({ deps: {} as never }); + + expect(mocks.enqueueSystemEvent).toHaveBeenCalledWith( + expect.stringContaining("restart summary"), + { sessionKey: "agent:main:main" }, + ); + }); + + it("falls back to enqueueSystemEvent when channel cannot be resolved (no channel in origin)", async () => { + mocks.resolveOutboundTarget.mockReturnValueOnce({ + ok: false, + error: new Error("no-target"), + } as never); + mocks.agentCommand.mockClear(); + mocks.enqueueSystemEvent.mockClear(); + + await scheduleRestartSentinelWake({ deps: {} as never }); + + expect(mocks.agentCommand).not.toHaveBeenCalled(); + expect(mocks.enqueueSystemEvent).toHaveBeenCalledWith("restart message", { + sessionKey: "agent:main:main", + }); + }); + + it("falls back to enqueueSystemEvent on main session key when sentinel has no sessionKey", async () => { + mocks.consumeRestartSentinel.mockResolvedValueOnce({ payload: { sessionKey: "" } } as never); + mocks.enqueueSystemEvent.mockClear(); + + await scheduleRestartSentinelWake({ deps: {} as never }); + + expect(mocks.enqueueSystemEvent).toHaveBeenCalledWith("restart message", { + sessionKey: "agent:main:main", + }); + }); }); diff --git a/src/gateway/server-restart-sentinel.ts b/src/gateway/server-restart-sentinel.ts index e6191942dba..dadf596d6ad 100644 --- a/src/gateway/server-restart-sentinel.ts +++ b/src/gateway/server-restart-sentinel.ts @@ -1,10 +1,9 @@ import { resolveAnnounceTargetFromKey } from "../agents/tools/sessions-send-helpers.js"; import { normalizeChannelId } from "../channels/plugins/index.js"; import type { CliDeps } from "../cli/deps.js"; +import { agentCommand } from "../commands/agent.js"; import { resolveMainSessionKeyFromConfig } from "../config/sessions.js"; import { parseSessionThreadInfo } from "../config/sessions/delivery-info.js"; -import { deliverOutboundPayloads } from "../infra/outbound/deliver.js"; -import { buildOutboundSessionContext } from "../infra/outbound/session-context.js"; import { resolveOutboundTarget } from "../infra/outbound/targets.js"; import { consumeRestartSentinel, @@ -12,10 +11,11 @@ import { summarizeRestartSentinel, } from "../infra/restart-sentinel.js"; import { enqueueSystemEvent } from "../infra/system-events.js"; +import { defaultRuntime } from "../runtime.js"; import { deliveryContextFromSession, mergeDeliveryContext } from "../utils/delivery-context.js"; import { loadSessionEntry } from "./session-utils.js"; -export async function scheduleRestartSentinelWake(_params: { deps: CliDeps }) { +export async function scheduleRestartSentinelWake(params: { deps: CliDeps }) { const sentinel = await consumeRestartSentinel(); if (!sentinel) { return; @@ -76,30 +76,29 @@ export async function scheduleRestartSentinelWake(_params: { deps: CliDeps }) { sessionThreadId ?? (origin?.threadId != null ? String(origin.threadId) : undefined); - // Slack uses replyToId (thread_ts) for threading, not threadId. - // The reply path does this mapping but deliverOutboundPayloads does not, - // so we must convert here to ensure post-restart notifications land in - // the originating Slack thread. See #17716. - const isSlack = channel === "slack"; - const replyToId = isSlack && threadId != null && threadId !== "" ? String(threadId) : undefined; - const resolvedThreadId = isSlack ? undefined : threadId; - const outboundSession = buildOutboundSessionContext({ - cfg, - sessionKey, - }); - try { - await deliverOutboundPayloads({ - cfg, - channel, - to: resolved.to, - accountId: origin?.accountId, - replyToId, - threadId: resolvedThreadId, - payloads: [{ text: message }], - session: outboundSession, - bestEffort: true, - }); + // Use agentCommand() rather than deliverOutboundPayloads() so the restart + // message is a proper agent turn: the user is notified AND the agent sees + // the message in its conversation history and can resume autonomously. + // + // This is safe post-restart because scheduleRestartSentinelWake() runs in + // the new process, where there are zero in-flight replies. The pre-restart + // race condition fixed in ab4a08a82 does not apply here. + await agentCommand( + { + message, + sessionKey, + to: resolved.to, + channel, + deliver: true, + bestEffortDeliver: true, + messageChannel: channel, + threadId, + accountId: origin?.accountId, + }, + defaultRuntime, + params.deps, + ); } catch (err) { enqueueSystemEvent(`${summary}\n${String(err)}`, { sessionKey }); }