From d78101ce0b7ee950f9f02a6c8da818ebde5f9d42 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9D=A8=E8=89=BA=E9=9F=AC=28yangyitao=29?= Date: Fri, 13 Mar 2026 07:34:24 +0000 Subject: [PATCH] fix(cron): pass heartbeat target=last for main-session next-heartbeat jobs systemEvent cron jobs with sessionTarget="main" and default wakeMode ("next-heartbeat") were silently swallowed: the heartbeat fired but delivered to target="none" (the config default), so no response reached any channel. The wakeMode="now" path already applied heartbeat: { target: "last" } (see #28508). Apply the same override for the next-heartbeat path by threading heartbeat through requestHeartbeatNow all the way from the cron timer through the wake queue to the heartbeat runner. Fixes #40261 #40266 --- ...n-job-passes-heartbeat-target-last.test.ts | 9 +++++++- src/cron/service/state.ts | 7 +++++- src/cron/service/timer.ts | 6 +++++ src/gateway/server-cron.ts | 1 + src/infra/heartbeat-runner.ts | 13 +++++++++-- src/infra/heartbeat-wake.ts | 23 +++++++++++++++---- 6 files changed, 51 insertions(+), 8 deletions(-) diff --git a/src/cron/service.main-job-passes-heartbeat-target-last.test.ts b/src/cron/service.main-job-passes-heartbeat-target-last.test.ts index 39959f63207..2de501d00dc 100644 --- a/src/cron/service.main-job-passes-heartbeat-target-last.test.ts +++ b/src/cron/service.main-job-passes-heartbeat-target-last.test.ts @@ -88,7 +88,7 @@ describe("cron main job passes heartbeat target=last", () => { expect(callArgs?.heartbeat?.target).toBe("last"); }); - it("should not pass heartbeat target for wakeMode=next-heartbeat main jobs", async () => { + it("should pass heartbeat.target=last to requestHeartbeatNow for wakeMode=next-heartbeat main jobs", async () => { const { storePath } = await makeStorePath(); const now = Date.now(); @@ -116,5 +116,12 @@ describe("cron main job passes heartbeat target=last", () => { expect(requestHeartbeatNow).toHaveBeenCalled(); // runHeartbeatOnce should NOT have been called for next-heartbeat mode expect(runHeartbeatOnce).not.toHaveBeenCalled(); + + // The heartbeat override should include target: "last" so the heartbeat + // runner delivers the response to the last active channel. + const callArgs = requestHeartbeatNow.mock.calls[0]?.[0]; + expect(callArgs).toBeDefined(); + expect(callArgs?.heartbeat).toBeDefined(); + expect(callArgs?.heartbeat?.target).toBe("last"); }); }); diff --git a/src/cron/service/state.ts b/src/cron/service/state.ts index 073efd8f459..1a179149a87 100644 --- a/src/cron/service/state.ts +++ b/src/cron/service/state.ts @@ -64,7 +64,12 @@ export type CronServiceDeps = { text: string, opts?: { agentId?: string; sessionKey?: string; contextKey?: string }, ) => void; - requestHeartbeatNow: (opts?: { reason?: string; agentId?: string; sessionKey?: string }) => void; + requestHeartbeatNow: (opts?: { + reason?: string; + agentId?: string; + sessionKey?: string; + heartbeat?: { target?: string }; + }) => void; runHeartbeatOnce?: (opts?: { reason?: string; agentId?: string; diff --git a/src/cron/service/timer.ts b/src/cron/service/timer.ts index e12c4ae38e7..71f75f39088 100644 --- a/src/cron/service/timer.ts +++ b/src/cron/service/timer.ts @@ -1097,6 +1097,7 @@ export async function executeJobCore( reason, agentId: job.agentId, sessionKey: targetMainSessionKey, + heartbeat: { target: "last" }, }); return { status: "ok", summary: text }; } @@ -1118,6 +1119,11 @@ export async function executeJobCore( reason: `cron:${job.id}`, agentId: job.agentId, sessionKey: targetMainSessionKey, + // Cron-triggered heartbeats should deliver to the last active channel. + // Without this override, heartbeat target defaults to "none" and cron + // main-session responses are silently swallowed (same fix as wakeMode=now). + // See: https://github.com/openclaw/openclaw/issues/28508 + heartbeat: { target: "last" }, }); return { status: "ok", summary: text }; } diff --git a/src/gateway/server-cron.ts b/src/gateway/server-cron.ts index 8a288866721..9d014ae9966 100644 --- a/src/gateway/server-cron.ts +++ b/src/gateway/server-cron.ts @@ -250,6 +250,7 @@ export function buildGatewayCronService(params: { reason: opts?.reason, agentId, sessionKey, + ...(opts?.heartbeat ? { heartbeat: opts.heartbeat } : {}), }); }, runHeartbeatOnce: async (opts) => { diff --git a/src/infra/heartbeat-runner.ts b/src/infra/heartbeat-runner.ts index 5e6ddcf07cf..040b0e61abe 100644 --- a/src/infra/heartbeat-runner.ts +++ b/src/infra/heartbeat-runner.ts @@ -1051,7 +1051,12 @@ export function startHeartbeatRunner(opts: { scheduleNext(); }; - const run: HeartbeatWakeHandler = async (params) => { + const run: HeartbeatWakeHandler = async (params: { + reason?: string; + agentId?: string; + sessionKey?: string; + heartbeat?: { target?: string }; + }) => { if (state.stopped) { return { status: "skipped", @@ -1087,10 +1092,13 @@ export function startHeartbeatRunner(opts: { return { status: "skipped", reason: "disabled" }; } try { + const mergedHeartbeat = params.heartbeat + ? { ...targetAgent.heartbeat, ...params.heartbeat } + : targetAgent.heartbeat; const res = await runOnce({ cfg: state.cfg, agentId: targetAgent.agentId, - heartbeat: targetAgent.heartbeat, + heartbeat: mergedHeartbeat, reason, sessionKey: requestedSessionKey, deps: { runtime: state.runtime }, @@ -1160,6 +1168,7 @@ export function startHeartbeatRunner(opts: { reason: params.reason, agentId: params.agentId, sessionKey: params.sessionKey, + heartbeat: params.heartbeat, }); const disposeWakeHandler = setHeartbeatWakeHandler(wakeHandler); updateConfig(state.cfg); diff --git a/src/infra/heartbeat-wake.ts b/src/infra/heartbeat-wake.ts index 3aaaca5ed96..6faed2844e6 100644 --- a/src/infra/heartbeat-wake.ts +++ b/src/infra/heartbeat-wake.ts @@ -13,6 +13,7 @@ export type HeartbeatWakeHandler = (opts: { reason?: string; agentId?: string; sessionKey?: string; + heartbeat?: { target?: string }; }) => Promise; let heartbeatsEnabled = true; @@ -32,6 +33,7 @@ type PendingWakeReason = { requestedAt: number; agentId?: string; sessionKey?: string; + heartbeat?: { target?: string }; }; let handler: HeartbeatWakeHandler | null = null; @@ -86,6 +88,7 @@ function queuePendingWakeReason(params?: { requestedAt?: number; agentId?: string; sessionKey?: string; + heartbeat?: { target?: string }; }) { const requestedAt = params?.requestedAt ?? Date.now(); const normalizedReason = normalizeWakeReason(params?.reason); @@ -101,18 +104,23 @@ function queuePendingWakeReason(params?: { requestedAt, agentId: normalizedAgentId, sessionKey: normalizedSessionKey, + ...(params?.heartbeat ? { heartbeat: params.heartbeat } : {}), }; const previous = pendingWakes.get(wakeTargetKey); if (!previous) { pendingWakes.set(wakeTargetKey, next); return; } - if (next.priority > previous.priority) { - pendingWakes.set(wakeTargetKey, next); + // Preserve the previous heartbeat override unless the replacing wake explicitly sets a new one. + // This prevents a subsequent unscoped wake from silently dropping a cron target="last" override + // that was queued in the same coalescing window. + const mergedNext = next.heartbeat ? next : { ...next, heartbeat: previous.heartbeat }; + if (mergedNext.priority > previous.priority) { + pendingWakes.set(wakeTargetKey, mergedNext); return; } - if (next.priority === previous.priority && next.requestedAt >= previous.requestedAt) { - pendingWakes.set(wakeTargetKey, next); + if (mergedNext.priority === previous.priority && mergedNext.requestedAt >= previous.requestedAt) { + pendingWakes.set(wakeTargetKey, mergedNext); } } @@ -161,25 +169,30 @@ function schedule(coalesceMs: number, kind: WakeTimerKind = "normal") { reason: pendingWake.reason ?? undefined, ...(pendingWake.agentId ? { agentId: pendingWake.agentId } : {}), ...(pendingWake.sessionKey ? { sessionKey: pendingWake.sessionKey } : {}), + ...(pendingWake.heartbeat ? { heartbeat: pendingWake.heartbeat } : {}), }; const res = await active(wakeOpts); if (res.status === "skipped" && res.reason === "requests-in-flight") { // The main lane is busy; retry this wake target soon. + // Preserve the heartbeat override so cron target=last is not lost on retry. queuePendingWakeReason({ reason: pendingWake.reason ?? "retry", agentId: pendingWake.agentId, sessionKey: pendingWake.sessionKey, + heartbeat: pendingWake.heartbeat, }); schedule(DEFAULT_RETRY_MS, "retry"); } } } catch { // Error is already logged by the heartbeat runner; schedule a retry. + // Preserve the heartbeat override so cron target=last is not lost on retry. for (const pendingWake of pendingBatch) { queuePendingWakeReason({ reason: pendingWake.reason ?? "retry", agentId: pendingWake.agentId, sessionKey: pendingWake.sessionKey, + heartbeat: pendingWake.heartbeat, }); } schedule(DEFAULT_RETRY_MS, "retry"); @@ -240,11 +253,13 @@ export function requestHeartbeatNow(opts?: { coalesceMs?: number; agentId?: string; sessionKey?: string; + heartbeat?: { target?: string }; }) { queuePendingWakeReason({ reason: opts?.reason, agentId: opts?.agentId, sessionKey: opts?.sessionKey, + ...(opts?.heartbeat ? { heartbeat: opts.heartbeat } : {}), }); schedule(opts?.coalesceMs ?? DEFAULT_COALESCE_MS, "normal"); }