fix(cron): pass heartbeat target=last for main-session next-heartbeat jobs
systemEvent cron jobs with sessionTarget="main" and default wakeMode
("next-heartbeat") were silently swallowed: the heartbeat fired but
delivered to target="none" (the config default), so no response reached
any channel.
The wakeMode="now" path already applied heartbeat: { target: "last" }
(see #28508). Apply the same override for the next-heartbeat path by
threading heartbeat through requestHeartbeatNow all the way from the
cron timer through the wake queue to the heartbeat runner.
Fixes #40261 #40266
This commit is contained in:
parent
5e417b44e1
commit
d78101ce0b
@ -88,7 +88,7 @@ describe("cron main job passes heartbeat target=last", () => {
|
||||
expect(callArgs?.heartbeat?.target).toBe("last");
|
||||
});
|
||||
|
||||
it("should not pass heartbeat target for wakeMode=next-heartbeat main jobs", async () => {
|
||||
it("should pass heartbeat.target=last to requestHeartbeatNow for wakeMode=next-heartbeat main jobs", async () => {
|
||||
const { storePath } = await makeStorePath();
|
||||
const now = Date.now();
|
||||
|
||||
@ -116,5 +116,12 @@ describe("cron main job passes heartbeat target=last", () => {
|
||||
expect(requestHeartbeatNow).toHaveBeenCalled();
|
||||
// runHeartbeatOnce should NOT have been called for next-heartbeat mode
|
||||
expect(runHeartbeatOnce).not.toHaveBeenCalled();
|
||||
|
||||
// The heartbeat override should include target: "last" so the heartbeat
|
||||
// runner delivers the response to the last active channel.
|
||||
const callArgs = requestHeartbeatNow.mock.calls[0]?.[0];
|
||||
expect(callArgs).toBeDefined();
|
||||
expect(callArgs?.heartbeat).toBeDefined();
|
||||
expect(callArgs?.heartbeat?.target).toBe("last");
|
||||
});
|
||||
});
|
||||
|
||||
@ -64,7 +64,12 @@ export type CronServiceDeps = {
|
||||
text: string,
|
||||
opts?: { agentId?: string; sessionKey?: string; contextKey?: string },
|
||||
) => void;
|
||||
requestHeartbeatNow: (opts?: { reason?: string; agentId?: string; sessionKey?: string }) => void;
|
||||
requestHeartbeatNow: (opts?: {
|
||||
reason?: string;
|
||||
agentId?: string;
|
||||
sessionKey?: string;
|
||||
heartbeat?: { target?: string };
|
||||
}) => void;
|
||||
runHeartbeatOnce?: (opts?: {
|
||||
reason?: string;
|
||||
agentId?: string;
|
||||
|
||||
@ -1097,6 +1097,7 @@ export async function executeJobCore(
|
||||
reason,
|
||||
agentId: job.agentId,
|
||||
sessionKey: targetMainSessionKey,
|
||||
heartbeat: { target: "last" },
|
||||
});
|
||||
return { status: "ok", summary: text };
|
||||
}
|
||||
@ -1118,6 +1119,11 @@ export async function executeJobCore(
|
||||
reason: `cron:${job.id}`,
|
||||
agentId: job.agentId,
|
||||
sessionKey: targetMainSessionKey,
|
||||
// Cron-triggered heartbeats should deliver to the last active channel.
|
||||
// Without this override, heartbeat target defaults to "none" and cron
|
||||
// main-session responses are silently swallowed (same fix as wakeMode=now).
|
||||
// See: https://github.com/openclaw/openclaw/issues/28508
|
||||
heartbeat: { target: "last" },
|
||||
});
|
||||
return { status: "ok", summary: text };
|
||||
}
|
||||
|
||||
@ -250,6 +250,7 @@ export function buildGatewayCronService(params: {
|
||||
reason: opts?.reason,
|
||||
agentId,
|
||||
sessionKey,
|
||||
...(opts?.heartbeat ? { heartbeat: opts.heartbeat } : {}),
|
||||
});
|
||||
},
|
||||
runHeartbeatOnce: async (opts) => {
|
||||
|
||||
@ -1051,7 +1051,12 @@ export function startHeartbeatRunner(opts: {
|
||||
scheduleNext();
|
||||
};
|
||||
|
||||
const run: HeartbeatWakeHandler = async (params) => {
|
||||
const run: HeartbeatWakeHandler = async (params: {
|
||||
reason?: string;
|
||||
agentId?: string;
|
||||
sessionKey?: string;
|
||||
heartbeat?: { target?: string };
|
||||
}) => {
|
||||
if (state.stopped) {
|
||||
return {
|
||||
status: "skipped",
|
||||
@ -1087,10 +1092,13 @@ export function startHeartbeatRunner(opts: {
|
||||
return { status: "skipped", reason: "disabled" };
|
||||
}
|
||||
try {
|
||||
const mergedHeartbeat = params.heartbeat
|
||||
? { ...targetAgent.heartbeat, ...params.heartbeat }
|
||||
: targetAgent.heartbeat;
|
||||
const res = await runOnce({
|
||||
cfg: state.cfg,
|
||||
agentId: targetAgent.agentId,
|
||||
heartbeat: targetAgent.heartbeat,
|
||||
heartbeat: mergedHeartbeat,
|
||||
reason,
|
||||
sessionKey: requestedSessionKey,
|
||||
deps: { runtime: state.runtime },
|
||||
@ -1160,6 +1168,7 @@ export function startHeartbeatRunner(opts: {
|
||||
reason: params.reason,
|
||||
agentId: params.agentId,
|
||||
sessionKey: params.sessionKey,
|
||||
heartbeat: params.heartbeat,
|
||||
});
|
||||
const disposeWakeHandler = setHeartbeatWakeHandler(wakeHandler);
|
||||
updateConfig(state.cfg);
|
||||
|
||||
@ -13,6 +13,7 @@ export type HeartbeatWakeHandler = (opts: {
|
||||
reason?: string;
|
||||
agentId?: string;
|
||||
sessionKey?: string;
|
||||
heartbeat?: { target?: string };
|
||||
}) => Promise<HeartbeatRunResult>;
|
||||
|
||||
let heartbeatsEnabled = true;
|
||||
@ -32,6 +33,7 @@ type PendingWakeReason = {
|
||||
requestedAt: number;
|
||||
agentId?: string;
|
||||
sessionKey?: string;
|
||||
heartbeat?: { target?: string };
|
||||
};
|
||||
|
||||
let handler: HeartbeatWakeHandler | null = null;
|
||||
@ -86,6 +88,7 @@ function queuePendingWakeReason(params?: {
|
||||
requestedAt?: number;
|
||||
agentId?: string;
|
||||
sessionKey?: string;
|
||||
heartbeat?: { target?: string };
|
||||
}) {
|
||||
const requestedAt = params?.requestedAt ?? Date.now();
|
||||
const normalizedReason = normalizeWakeReason(params?.reason);
|
||||
@ -101,18 +104,23 @@ function queuePendingWakeReason(params?: {
|
||||
requestedAt,
|
||||
agentId: normalizedAgentId,
|
||||
sessionKey: normalizedSessionKey,
|
||||
...(params?.heartbeat ? { heartbeat: params.heartbeat } : {}),
|
||||
};
|
||||
const previous = pendingWakes.get(wakeTargetKey);
|
||||
if (!previous) {
|
||||
pendingWakes.set(wakeTargetKey, next);
|
||||
return;
|
||||
}
|
||||
if (next.priority > previous.priority) {
|
||||
pendingWakes.set(wakeTargetKey, next);
|
||||
// Preserve the previous heartbeat override unless the replacing wake explicitly sets a new one.
|
||||
// This prevents a subsequent unscoped wake from silently dropping a cron target="last" override
|
||||
// that was queued in the same coalescing window.
|
||||
const mergedNext = next.heartbeat ? next : { ...next, heartbeat: previous.heartbeat };
|
||||
if (mergedNext.priority > previous.priority) {
|
||||
pendingWakes.set(wakeTargetKey, mergedNext);
|
||||
return;
|
||||
}
|
||||
if (next.priority === previous.priority && next.requestedAt >= previous.requestedAt) {
|
||||
pendingWakes.set(wakeTargetKey, next);
|
||||
if (mergedNext.priority === previous.priority && mergedNext.requestedAt >= previous.requestedAt) {
|
||||
pendingWakes.set(wakeTargetKey, mergedNext);
|
||||
}
|
||||
}
|
||||
|
||||
@ -161,25 +169,30 @@ function schedule(coalesceMs: number, kind: WakeTimerKind = "normal") {
|
||||
reason: pendingWake.reason ?? undefined,
|
||||
...(pendingWake.agentId ? { agentId: pendingWake.agentId } : {}),
|
||||
...(pendingWake.sessionKey ? { sessionKey: pendingWake.sessionKey } : {}),
|
||||
...(pendingWake.heartbeat ? { heartbeat: pendingWake.heartbeat } : {}),
|
||||
};
|
||||
const res = await active(wakeOpts);
|
||||
if (res.status === "skipped" && res.reason === "requests-in-flight") {
|
||||
// The main lane is busy; retry this wake target soon.
|
||||
// Preserve the heartbeat override so cron target=last is not lost on retry.
|
||||
queuePendingWakeReason({
|
||||
reason: pendingWake.reason ?? "retry",
|
||||
agentId: pendingWake.agentId,
|
||||
sessionKey: pendingWake.sessionKey,
|
||||
heartbeat: pendingWake.heartbeat,
|
||||
});
|
||||
schedule(DEFAULT_RETRY_MS, "retry");
|
||||
}
|
||||
}
|
||||
} catch {
|
||||
// Error is already logged by the heartbeat runner; schedule a retry.
|
||||
// Preserve the heartbeat override so cron target=last is not lost on retry.
|
||||
for (const pendingWake of pendingBatch) {
|
||||
queuePendingWakeReason({
|
||||
reason: pendingWake.reason ?? "retry",
|
||||
agentId: pendingWake.agentId,
|
||||
sessionKey: pendingWake.sessionKey,
|
||||
heartbeat: pendingWake.heartbeat,
|
||||
});
|
||||
}
|
||||
schedule(DEFAULT_RETRY_MS, "retry");
|
||||
@ -240,11 +253,13 @@ export function requestHeartbeatNow(opts?: {
|
||||
coalesceMs?: number;
|
||||
agentId?: string;
|
||||
sessionKey?: string;
|
||||
heartbeat?: { target?: string };
|
||||
}) {
|
||||
queuePendingWakeReason({
|
||||
reason: opts?.reason,
|
||||
agentId: opts?.agentId,
|
||||
sessionKey: opts?.sessionKey,
|
||||
...(opts?.heartbeat ? { heartbeat: opts.heartbeat } : {}),
|
||||
});
|
||||
schedule(opts?.coalesceMs ?? DEFAULT_COALESCE_MS, "normal");
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user