diff --git a/CHANGELOG.md b/CHANGELOG.md index abe57c8108b..bc8631d80cb 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,10 +8,12 @@ Docs: https://docs.openclaw.ai - Commands/btw: add `/btw` side questions for quick tool-less answers about the current session without changing future session context, with dismissible in-session TUI answers and explicit BTW replies on external channels. (#45444) Thanks @ngutman. - Refactor/channels: remove the legacy channel shim directories and point channel-specific imports directly at the extension-owned implementations. (#45967) thanks @scoootscooob. +- Feishu/streaming: add `onReasoningStream` and `onReasoningEnd` support to streaming cards, so `/reasoning stream` renders thinking tokens as markdown blockquotes in the same card — matching the Telegram channel's reasoning lane behavior. ### Fixes - Z.AI/onboarding: detect a working default model even for explicit `zai-coding-*` endpoint choices, so Coding Plan setup can keep the selected endpoint while defaulting to `glm-5` when available or `glm-4.7` as fallback. (#45969) +- Zalo/plugin runtime: export `resolveClientIp` from `openclaw/plugin-sdk/zalo` so installed builds no longer crash on startup when the webhook monitor loads from the packaged extension instead of the monorepo source tree. (#46549) Thanks @No898. - Control UI/chat sessions: show human-readable labels in the grouped session dropdown again, keep unique scoped fallbacks when metadata is missing, and disambiguate duplicate labels only when needed. (#45130) thanks @luzhidong. - Configure/startup: move outbound send-deps resolution into a lightweight helper so `openclaw configure` no longer stalls after the banner while eagerly loading channel plugins. (#46301) thanks @scoootscooob. 
@@ -19,7 +21,6 @@ Docs: https://docs.openclaw.ai - Slack/interactive replies: preserve `channelData.slack.blocks` through live DM delivery and preview-finalized edits so Block Kit button and select directives render instead of falling back to raw text. Thanks @vincentkoc. - CI/channel test routing: move the built-in channel suites into `test:channels` and keep them out of `test:extensions`, so extension CI no longer fails after the channel migration while targeted test routing still sends Slack, Signal, and iMessage suites to the right lane. (#46066) Thanks @scoootscooob. -- Agents/usage tracking: stop forcing `supportsUsageInStreaming: false` on non-native openai-completions endpoints so providers like DashScope, DeepSeek, and other OpenAI-compatible backends report token usage and cost instead of showing all zeros. (#46142) - Node/startup: remove leftover debug `console.log("node host PATH: ...")` that printed the resolved PATH on every `openclaw node run` invocation. (#46411) - Control UI/dashboard: preserve structured gateway shutdown reasons across restart disconnects so config-triggered restarts no longer fall back to `disconnected (1006): no reason`. (#46532) Thanks @vincentkoc. - Feishu/topic threads: fetch full thread context, including prior bot replies, when starting a topic-thread session so follow-up turns in Feishu topics keep the right conversation state. Thanks @Coobiw. @@ -99,6 +100,8 @@ Docs: https://docs.openclaw.ai - Mattermost/thread routing: non-inbound reply paths (TUI/WebUI turns, tool-call callbacks, subagent responses) now correctly route to the originating Mattermost thread when `replyToMode: "all"` is active; also prevents stale `origin.threadId` metadata from resurrecting cleared thread routes. 
(#44283) thanks @teconomix - Gateway/websocket pairing bypass for disabled auth: skip device-pairing enforcement when `gateway.auth.mode=none` so Control UI connections behind reverse proxies no longer get stuck on `pairing required` (code 1008) despite auth being explicitly disabled. (#42931) - Auth/login lockout recovery: clear stale `auth_permanent` and `billing` disabled state for all profiles matching the target provider when `openclaw models auth login` is invoked, so users locked out by expired or revoked OAuth tokens can recover by re-authenticating instead of waiting for the cooldown timer to expire. (#43057) +- Auto-reply/context-engine compaction: persist the exact embedded-run metadata compaction count for main and followup runner session accounting, so metadata-only auto-compactions no longer undercount multi-compaction runs. (#42629) thanks @uf-hy. +- Auth/Codex CLI reuse: sync reused Codex CLI credentials into the supported `openai-codex:default` OAuth profile instead of reviving the deprecated `openai-codex:codex-cli` slot, so doctor cleanup no longer loops. (#45353) thanks @Gugu-sugar. ## 2026.3.12 diff --git a/docs/.generated/config-baseline.json b/docs/.generated/config-baseline.json index ed851997bac..cf872fcd62d 100644 --- a/docs/.generated/config-baseline.json +++ b/docs/.generated/config-baseline.json @@ -1484,6 +1484,16 @@ "tags": [], "hasChildren": false }, + { + "path": "agents.defaults.heartbeat.isolatedSession", + "kind": "core", + "type": "boolean", + "required": false, + "deprecated": false, + "sensitive": false, + "tags": [], + "hasChildren": false + }, { "path": "agents.defaults.heartbeat.lightContext", "kind": "core", @@ -1544,7 +1554,7 @@ "deprecated": false, "sensitive": false, "tags": ["automation"], - "help": "Delivery target (\"last\", \"none\", or a channel id). 
Known channels: telegram, whatsapp, discord, irc, googlechat, slack, signal, imessage, line, bluebubbles, feishu, matrix, mattermost, msteams, nextcloud-talk, nostr, synology-chat, tlon, twitch, zalo, zalouser.", + "help": "Delivery target (\"last\", \"none\", or a channel id). Known channels: telegram, whatsapp, discord, irc, googlechat, slack, signal, imessage, line, zalouser, zalo, tlon, feishu, nextcloud-talk, msteams, bluebubbles, synology-chat, mattermost, twitch, matrix, nostr.", "hasChildren": false }, { @@ -3647,6 +3657,16 @@ "tags": [], "hasChildren": false }, + { + "path": "agents.list.*.heartbeat.isolatedSession", + "kind": "core", + "type": "boolean", + "required": false, + "deprecated": false, + "sensitive": false, + "tags": [], + "hasChildren": false + }, { "path": "agents.list.*.heartbeat.lightContext", "kind": "core", @@ -3707,7 +3727,7 @@ "deprecated": false, "sensitive": false, "tags": ["automation"], - "help": "Delivery target (\"last\", \"none\", or a channel id). Known channels: telegram, whatsapp, discord, irc, googlechat, slack, signal, imessage, line, bluebubbles, feishu, matrix, mattermost, msteams, nextcloud-talk, nostr, synology-chat, tlon, twitch, zalo, zalouser.", + "help": "Delivery target (\"last\", \"none\", or a channel id). 
Known channels: telegram, whatsapp, discord, irc, googlechat, slack, signal, imessage, line, zalouser, zalo, tlon, feishu, nextcloud-talk, msteams, bluebubbles, synology-chat, mattermost, twitch, matrix, nostr.", "hasChildren": false }, { diff --git a/docs/.generated/config-baseline.jsonl b/docs/.generated/config-baseline.jsonl index 4ea706c3ad3..be2c579b614 100644 --- a/docs/.generated/config-baseline.jsonl +++ b/docs/.generated/config-baseline.jsonl @@ -1,4 +1,4 @@ -{"generatedBy":"scripts/generate-config-doc-baseline.ts","recordType":"meta","totalPaths":4731} +{"generatedBy":"scripts/generate-config-doc-baseline.ts","recordType":"meta","totalPaths":4733} {"recordType":"path","path":"acp","kind":"core","type":"object","required":false,"deprecated":false,"sensitive":false,"tags":["advanced"],"label":"ACP","help":"ACP runtime controls for enabling dispatch, selecting backends, constraining allowed agent targets, and tuning streamed turn projection behavior.","hasChildren":true} {"recordType":"path","path":"acp.allowedAgents","kind":"core","type":"array","required":false,"deprecated":false,"sensitive":false,"tags":["access"],"label":"ACP Allowed Agents","help":"Allowlist of ACP target agent ids permitted for ACP runtime sessions. 
Empty means no additional allowlist restriction.","hasChildren":true} {"recordType":"path","path":"acp.allowedAgents.*","kind":"core","type":"string","required":false,"deprecated":false,"sensitive":false,"tags":[],"hasChildren":false} @@ -137,12 +137,13 @@ {"recordType":"path","path":"agents.defaults.heartbeat.directPolicy","kind":"core","type":"string","required":false,"deprecated":false,"sensitive":false,"tags":["access","automation","storage"],"label":"Heartbeat Direct Policy","help":"Controls whether heartbeat delivery may target direct/DM chats: \"allow\" (default) permits DM delivery and \"block\" suppresses direct-target sends.","hasChildren":false} {"recordType":"path","path":"agents.defaults.heartbeat.every","kind":"core","type":"string","required":false,"deprecated":false,"sensitive":false,"tags":[],"hasChildren":false} {"recordType":"path","path":"agents.defaults.heartbeat.includeReasoning","kind":"core","type":"boolean","required":false,"deprecated":false,"sensitive":false,"tags":[],"hasChildren":false} +{"recordType":"path","path":"agents.defaults.heartbeat.isolatedSession","kind":"core","type":"boolean","required":false,"deprecated":false,"sensitive":false,"tags":[],"hasChildren":false} {"recordType":"path","path":"agents.defaults.heartbeat.lightContext","kind":"core","type":"boolean","required":false,"deprecated":false,"sensitive":false,"tags":[],"hasChildren":false} {"recordType":"path","path":"agents.defaults.heartbeat.model","kind":"core","type":"string","required":false,"deprecated":false,"sensitive":false,"tags":[],"hasChildren":false} {"recordType":"path","path":"agents.defaults.heartbeat.prompt","kind":"core","type":"string","required":false,"deprecated":false,"sensitive":false,"tags":[],"hasChildren":false} {"recordType":"path","path":"agents.defaults.heartbeat.session","kind":"core","type":"string","required":false,"deprecated":false,"sensitive":false,"tags":[],"hasChildren":false} 
{"recordType":"path","path":"agents.defaults.heartbeat.suppressToolErrorWarnings","kind":"core","type":"boolean","required":false,"deprecated":false,"sensitive":false,"tags":["automation"],"label":"Heartbeat Suppress Tool Error Warnings","help":"Suppress tool error warning payloads during heartbeat runs.","hasChildren":false} -{"recordType":"path","path":"agents.defaults.heartbeat.target","kind":"core","type":"string","required":false,"deprecated":false,"sensitive":false,"tags":["automation"],"help":"Delivery target (\"last\", \"none\", or a channel id). Known channels: telegram, whatsapp, discord, irc, googlechat, slack, signal, imessage, line, bluebubbles, feishu, matrix, mattermost, msteams, nextcloud-talk, nostr, synology-chat, tlon, twitch, zalo, zalouser.","hasChildren":false} +{"recordType":"path","path":"agents.defaults.heartbeat.target","kind":"core","type":"string","required":false,"deprecated":false,"sensitive":false,"tags":["automation"],"help":"Delivery target (\"last\", \"none\", or a channel id). 
Known channels: telegram, whatsapp, discord, irc, googlechat, slack, signal, imessage, line, zalouser, zalo, tlon, feishu, nextcloud-talk, msteams, bluebubbles, synology-chat, mattermost, twitch, matrix, nostr.","hasChildren":false} {"recordType":"path","path":"agents.defaults.heartbeat.to","kind":"core","type":"string","required":false,"deprecated":false,"sensitive":false,"tags":[],"hasChildren":false} {"recordType":"path","path":"agents.defaults.humanDelay","kind":"core","type":"object","required":false,"deprecated":false,"sensitive":false,"tags":[],"hasChildren":true} {"recordType":"path","path":"agents.defaults.humanDelay.maxMs","kind":"core","type":"integer","required":false,"deprecated":false,"sensitive":false,"tags":["performance"],"label":"Human Delay Max (ms)","help":"Maximum delay in ms for custom humanDelay (default: 2500).","hasChildren":false} @@ -340,12 +341,13 @@ {"recordType":"path","path":"agents.list.*.heartbeat.directPolicy","kind":"core","type":"string","required":false,"deprecated":false,"sensitive":false,"tags":["access","automation","storage"],"label":"Heartbeat Direct Policy","help":"Per-agent override for heartbeat direct/DM delivery policy; use \"block\" for agents that should only send heartbeat alerts to non-DM destinations.","hasChildren":false} {"recordType":"path","path":"agents.list.*.heartbeat.every","kind":"core","type":"string","required":false,"deprecated":false,"sensitive":false,"tags":[],"hasChildren":false} {"recordType":"path","path":"agents.list.*.heartbeat.includeReasoning","kind":"core","type":"boolean","required":false,"deprecated":false,"sensitive":false,"tags":[],"hasChildren":false} +{"recordType":"path","path":"agents.list.*.heartbeat.isolatedSession","kind":"core","type":"boolean","required":false,"deprecated":false,"sensitive":false,"tags":[],"hasChildren":false} 
{"recordType":"path","path":"agents.list.*.heartbeat.lightContext","kind":"core","type":"boolean","required":false,"deprecated":false,"sensitive":false,"tags":[],"hasChildren":false} {"recordType":"path","path":"agents.list.*.heartbeat.model","kind":"core","type":"string","required":false,"deprecated":false,"sensitive":false,"tags":[],"hasChildren":false} {"recordType":"path","path":"agents.list.*.heartbeat.prompt","kind":"core","type":"string","required":false,"deprecated":false,"sensitive":false,"tags":[],"hasChildren":false} {"recordType":"path","path":"agents.list.*.heartbeat.session","kind":"core","type":"string","required":false,"deprecated":false,"sensitive":false,"tags":[],"hasChildren":false} {"recordType":"path","path":"agents.list.*.heartbeat.suppressToolErrorWarnings","kind":"core","type":"boolean","required":false,"deprecated":false,"sensitive":false,"tags":["automation"],"label":"Agent Heartbeat Suppress Tool Error Warnings","help":"Suppress tool error warning payloads during heartbeat runs.","hasChildren":false} -{"recordType":"path","path":"agents.list.*.heartbeat.target","kind":"core","type":"string","required":false,"deprecated":false,"sensitive":false,"tags":["automation"],"help":"Delivery target (\"last\", \"none\", or a channel id). Known channels: telegram, whatsapp, discord, irc, googlechat, slack, signal, imessage, line, bluebubbles, feishu, matrix, mattermost, msteams, nextcloud-talk, nostr, synology-chat, tlon, twitch, zalo, zalouser.","hasChildren":false} +{"recordType":"path","path":"agents.list.*.heartbeat.target","kind":"core","type":"string","required":false,"deprecated":false,"sensitive":false,"tags":["automation"],"help":"Delivery target (\"last\", \"none\", or a channel id). 
Known channels: telegram, whatsapp, discord, irc, googlechat, slack, signal, imessage, line, zalouser, zalo, tlon, feishu, nextcloud-talk, msteams, bluebubbles, synology-chat, mattermost, twitch, matrix, nostr.","hasChildren":false} {"recordType":"path","path":"agents.list.*.heartbeat.to","kind":"core","type":"string","required":false,"deprecated":false,"sensitive":false,"tags":[],"hasChildren":false} {"recordType":"path","path":"agents.list.*.humanDelay","kind":"core","type":"object","required":false,"deprecated":false,"sensitive":false,"tags":[],"hasChildren":true} {"recordType":"path","path":"agents.list.*.humanDelay.maxMs","kind":"core","type":"integer","required":false,"deprecated":false,"sensitive":false,"tags":[],"hasChildren":false} diff --git a/docs/gateway/configuration-reference.md b/docs/gateway/configuration-reference.md index 658a3084437..badfe4ee891 100644 --- a/docs/gateway/configuration-reference.md +++ b/docs/gateway/configuration-reference.md @@ -975,6 +975,7 @@ Periodic heartbeat runs. model: "openai/gpt-5.2-mini", includeReasoning: false, lightContext: false, // default: false; true keeps only HEARTBEAT.md from workspace bootstrap files + isolatedSession: false, // default: false; true runs each heartbeat in a fresh session (no conversation history) session: "main", to: "+15555550123", directPolicy: "allow", // allow (default) | block @@ -992,6 +993,7 @@ Periodic heartbeat runs. - `suppressToolErrorWarnings`: when true, suppresses tool error warning payloads during heartbeat runs. - `directPolicy`: direct/DM delivery policy. `allow` (default) permits direct-target delivery. `block` suppresses direct-target delivery and emits `reason=dm-blocked`. - `lightContext`: when true, heartbeat runs use lightweight bootstrap context and keep only `HEARTBEAT.md` from workspace bootstrap files. +- `isolatedSession`: when true, each heartbeat runs in a fresh session with no prior conversation history. 
Uses the same isolation pattern as cron `sessionTarget: "isolated"`. Reduces per-heartbeat token cost from ~100K to ~2-5K tokens. - Per-agent: set `agents.list[].heartbeat`. When any agent defines `heartbeat`, **only those agents** run heartbeats. - Heartbeats run full agent turns — shorter intervals burn more tokens. diff --git a/docs/gateway/heartbeat.md index 90c5d9d3c75..e0de2294cfa 100644 --- a/docs/gateway/heartbeat.md +++ b/docs/gateway/heartbeat.md @@ -22,7 +22,8 @@ Troubleshooting: [/automation/troubleshooting](/automation/troubleshooting) 3. Decide where heartbeat messages should go (`target: "none"` is the default; set `target: "last"` to route to the last contact). 4. Optional: enable heartbeat reasoning delivery for transparency. 5. Optional: use lightweight bootstrap context if heartbeat runs only need `HEARTBEAT.md`. -6. Optional: restrict heartbeats to active hours (local time). +6. Optional: enable isolated sessions to avoid sending full conversation history each heartbeat. +7. Optional: restrict heartbeats to active hours (local time). Example config: @@ -35,6 +36,7 @@ Example config: target: "last", // explicit delivery to last contact (default is "none") directPolicy: "allow", // default: allow direct/DM targets; set "block" to suppress lightContext: true, // optional: only inject HEARTBEAT.md from bootstrap files + isolatedSession: true, // optional: fresh session each run (no conversation history) // activeHours: { start: "08:00", end: "24:00" }, // includeReasoning: true, // optional: send separate `Reasoning:` message too }, @@ -91,6 +93,7 @@ and logged; a message that is only `HEARTBEAT_OK` is dropped.
model: "anthropic/claude-opus-4-6", includeReasoning: false, // default: false (deliver separate Reasoning: message when available) lightContext: false, // default: false; true keeps only HEARTBEAT.md from workspace bootstrap files + isolatedSession: false, // default: false; true runs each heartbeat in a fresh session (no conversation history) target: "last", // default: none | options: last | none | (core or plugin, e.g. "bluebubbles") to: "+15551234567", // optional channel-specific override accountId: "ops-bot", // optional multi-account channel id @@ -212,6 +215,7 @@ Use `accountId` to target a specific account on multi-account channels like Tele - `model`: optional model override for heartbeat runs (`provider/model`). - `includeReasoning`: when enabled, also deliver the separate `Reasoning:` message when available (same shape as `/reasoning on`). - `lightContext`: when true, heartbeat runs use lightweight bootstrap context and keep only `HEARTBEAT.md` from workspace bootstrap files. +- `isolatedSession`: when true, each heartbeat runs in a fresh session with no prior conversation history. Uses the same isolation pattern as cron `sessionTarget: "isolated"`. Dramatically reduces per-heartbeat token cost. Combine with `lightContext: true` for maximum savings. Delivery routing still uses the main session context. - `session`: optional session key for heartbeat runs. - `main` (default): agent main session. - Explicit session key (copy from `openclaw sessions --json` or the [sessions CLI](/cli/sessions)). @@ -380,6 +384,10 @@ off in group chats. ## Cost awareness -Heartbeats run full agent turns. Shorter intervals burn more tokens. Keep -`HEARTBEAT.md` small and consider a cheaper `model` or `target: "none"` if you -only want internal state updates. +Heartbeats run full agent turns. Shorter intervals burn more tokens. To reduce cost: + +- Use `isolatedSession: true` to avoid sending full conversation history (~100K tokens down to ~2-5K per run). 
+- Use `lightContext: true` to limit bootstrap files to just `HEARTBEAT.md`. +- Set a cheaper `model` (e.g. `ollama/llama3.2:1b`). +- Keep `HEARTBEAT.md` small. +- Use `target: "none"` if you only want internal state updates. diff --git a/extensions/feishu/src/reply-dispatcher.test.ts b/extensions/feishu/src/reply-dispatcher.test.ts index 338953a7d6d..3f20a594e25 100644 --- a/extensions/feishu/src/reply-dispatcher.test.ts +++ b/extensions/feishu/src/reply-dispatcher.test.ts @@ -462,6 +462,126 @@ describe("createFeishuReplyDispatcher streaming behavior", () => { ); }); + it("streams reasoning content as blockquote before answer", async () => { + const { result, options } = createDispatcherHarness({ + runtime: createRuntimeLogger(), + }); + + await options.onReplyStart?.(); + // Core agent sends pre-formatted text from formatReasoningMessage + result.replyOptions.onReasoningStream?.({ text: "Reasoning:\n_thinking step 1_" }); + result.replyOptions.onReasoningStream?.({ + text: "Reasoning:\n_thinking step 1_\n_step 2_", + }); + result.replyOptions.onPartialReply?.({ text: "answer part" }); + result.replyOptions.onReasoningEnd?.(); + await options.deliver({ text: "answer part final" }, { kind: "final" }); + + expect(streamingInstances).toHaveLength(1); + const updateCalls = streamingInstances[0].update.mock.calls.map((c: unknown[]) => c[0]); + const reasoningUpdate = updateCalls.find((c: string) => c.includes("Thinking")); + expect(reasoningUpdate).toContain("> 💭 **Thinking**"); + // formatReasoningPrefix strips "Reasoning:" prefix and italic markers + expect(reasoningUpdate).toContain("> thinking step"); + expect(reasoningUpdate).not.toContain("Reasoning:"); + expect(reasoningUpdate).not.toMatch(/> _.*_/); + + const combinedUpdate = updateCalls.find( + (c: string) => c.includes("Thinking") && c.includes("---"), + ); + expect(combinedUpdate).toBeDefined(); + + expect(streamingInstances[0].close).toHaveBeenCalledTimes(1); + const closeArg = 
streamingInstances[0].close.mock.calls[0][0] as string; + expect(closeArg).toContain("> 💭 **Thinking**"); + expect(closeArg).toContain("---"); + expect(closeArg).toContain("answer part final"); + }); + + it("provides onReasoningStream and onReasoningEnd when streaming is enabled", () => { + const { result } = createDispatcherHarness({ + runtime: createRuntimeLogger(), + }); + + expect(result.replyOptions.onReasoningStream).toBeTypeOf("function"); + expect(result.replyOptions.onReasoningEnd).toBeTypeOf("function"); + }); + + it("omits reasoning callbacks when streaming is disabled", () => { + resolveFeishuAccountMock.mockReturnValue({ + accountId: "main", + appId: "app_id", + appSecret: "app_secret", + domain: "feishu", + config: { + renderMode: "auto", + streaming: false, + }, + }); + + const { result } = createDispatcherHarness({ + runtime: createRuntimeLogger(), + }); + + expect(result.replyOptions.onReasoningStream).toBeUndefined(); + expect(result.replyOptions.onReasoningEnd).toBeUndefined(); + }); + + it("renders reasoning-only card when no answer text arrives", async () => { + const { result, options } = createDispatcherHarness({ + runtime: createRuntimeLogger(), + }); + + await options.onReplyStart?.(); + result.replyOptions.onReasoningStream?.({ text: "Reasoning:\n_deep thought_" }); + result.replyOptions.onReasoningEnd?.(); + await options.onIdle?.(); + + expect(streamingInstances).toHaveLength(1); + expect(streamingInstances[0].close).toHaveBeenCalledTimes(1); + const closeArg = streamingInstances[0].close.mock.calls[0][0] as string; + expect(closeArg).toContain("> 💭 **Thinking**"); + expect(closeArg).toContain("> deep thought"); + expect(closeArg).not.toContain("Reasoning:"); + expect(closeArg).not.toContain("---"); + }); + + it("ignores empty reasoning payloads", async () => { + const { result, options } = createDispatcherHarness({ + runtime: createRuntimeLogger(), + }); + + await options.onReplyStart?.(); + result.replyOptions.onReasoningStream?.({ 
text: "" }); + result.replyOptions.onPartialReply?.({ text: "```ts\ncode\n```" }); + await options.deliver({ text: "```ts\ncode\n```" }, { kind: "final" }); + + expect(streamingInstances).toHaveLength(1); + const closeArg = streamingInstances[0].close.mock.calls[0][0] as string; + expect(closeArg).not.toContain("Thinking"); + expect(closeArg).toBe("```ts\ncode\n```"); + }); + + it("deduplicates final text by raw answer payload, not combined card text", async () => { + const { result, options } = createDispatcherHarness({ + runtime: createRuntimeLogger(), + }); + + await options.onReplyStart?.(); + result.replyOptions.onReasoningStream?.({ text: "Reasoning:\n_thought_" }); + result.replyOptions.onReasoningEnd?.(); + await options.deliver({ text: "```ts\nfinal answer\n```" }, { kind: "final" }); + + expect(streamingInstances).toHaveLength(1); + expect(streamingInstances[0].close).toHaveBeenCalledTimes(1); + + // Deliver the same raw answer text again — should be deduped + await options.deliver({ text: "```ts\nfinal answer\n```" }, { kind: "final" }); + + // No second streaming session since the raw answer text matches + expect(streamingInstances).toHaveLength(1); + }); + it("passes replyToMessageId and replyInThread to streaming.start()", async () => { const { options } = createDispatcherHarness({ runtime: createRuntimeLogger(), diff --git a/extensions/feishu/src/reply-dispatcher.ts b/extensions/feishu/src/reply-dispatcher.ts index 5ebf712ca8b..68f0a2c2a0f 100644 --- a/extensions/feishu/src/reply-dispatcher.ts +++ b/extensions/feishu/src/reply-dispatcher.ts @@ -143,11 +143,39 @@ export function createFeishuReplyDispatcher(params: CreateFeishuReplyDispatcherP let streaming: FeishuStreamingSession | null = null; let streamText = ""; let lastPartial = ""; + let reasoningText = ""; const deliveredFinalTexts = new Set(); let partialUpdateQueue: Promise = Promise.resolve(); let streamingStartPromise: Promise | null = null; type StreamTextUpdateMode = "snapshot" | "delta"; 
+ const formatReasoningPrefix = (thinking: string): string => { + if (!thinking) return ""; + const withoutLabel = thinking.replace(/^Reasoning:\n/, ""); + const plain = withoutLabel.replace(/^_(.*)_$/gm, "$1"); + const lines = plain.split("\n").map((line) => `> ${line}`); + return `> 💭 **Thinking**\n${lines.join("\n")}`; + }; + + const buildCombinedStreamText = (thinking: string, answer: string): string => { + const parts: string[] = []; + if (thinking) parts.push(formatReasoningPrefix(thinking)); + if (thinking && answer) parts.push("\n\n---\n\n"); + if (answer) parts.push(answer); + return parts.join(""); + }; + + const flushStreamingCardUpdate = (combined: string) => { + partialUpdateQueue = partialUpdateQueue.then(async () => { + if (streamingStartPromise) { + await streamingStartPromise; + } + if (streaming?.isActive()) { + await streaming.update(combined); + } + }); + }; + const queueStreamingUpdate = ( nextText: string, options?: { @@ -167,14 +195,13 @@ export function createFeishuReplyDispatcher(params: CreateFeishuReplyDispatcherP const mode = options?.mode ?? "snapshot"; streamText = mode === "delta" ? 
`${streamText}${nextText}` : mergeStreamingText(streamText, nextText); - partialUpdateQueue = partialUpdateQueue.then(async () => { - if (streamingStartPromise) { - await streamingStartPromise; - } - if (streaming?.isActive()) { - await streaming.update(streamText); - } - }); + flushStreamingCardUpdate(buildCombinedStreamText(reasoningText, streamText)); + }; + + const queueReasoningUpdate = (nextThinking: string) => { + if (!nextThinking) return; + reasoningText = nextThinking; + flushStreamingCardUpdate(buildCombinedStreamText(reasoningText, streamText)); }; const startStreaming = () => { @@ -213,7 +240,7 @@ export function createFeishuReplyDispatcher(params: CreateFeishuReplyDispatcherP } await partialUpdateQueue; if (streaming?.isActive()) { - let text = streamText; + let text = buildCombinedStreamText(reasoningText, streamText); if (mentionTargets?.length) { text = buildMentionedCardContent(mentionTargets, text); } @@ -223,6 +250,7 @@ export function createFeishuReplyDispatcher(params: CreateFeishuReplyDispatcherP streamingStartPromise = null; streamText = ""; lastPartial = ""; + reasoningText = ""; }; const sendChunkedTextReply = async (params: { @@ -392,6 +420,16 @@ export function createFeishuReplyDispatcher(params: CreateFeishuReplyDispatcherP }); } : undefined, + onReasoningStream: streamingEnabled + ? (payload: ReplyPayload) => { + if (!payload.text) { + return; + } + startStreaming(); + queueReasoningUpdate(payload.text); + } + : undefined, + onReasoningEnd: streamingEnabled ? 
() => {} : undefined, }, markDispatchIdle, }; diff --git a/extensions/zalo/src/monitor.webhook.ts b/extensions/zalo/src/monitor.webhook.ts index ef10d3a9a0e..ab218dbd7a6 100644 --- a/extensions/zalo/src/monitor.webhook.ts +++ b/extensions/zalo/src/monitor.webhook.ts @@ -15,8 +15,8 @@ import { withResolvedWebhookRequestPipeline, WEBHOOK_ANOMALY_COUNTER_DEFAULTS, WEBHOOK_RATE_LIMIT_DEFAULTS, + resolveClientIp, } from "openclaw/plugin-sdk/zalo"; -import { resolveClientIp } from "../../../src/gateway/net.js"; import type { ResolvedZaloAccount } from "./accounts.js"; import type { ZaloFetch, ZaloUpdate } from "./api.js"; import type { ZaloRuntimeEnv } from "./monitor.js"; diff --git a/src/agents/auth-profiles.external-cli-sync.test.ts b/src/agents/auth-profiles.external-cli-sync.test.ts new file mode 100644 index 00000000000..303b85b72d2 --- /dev/null +++ b/src/agents/auth-profiles.external-cli-sync.test.ts @@ -0,0 +1,54 @@ +import { describe, expect, it, vi } from "vitest"; +import type { AuthProfileStore } from "./auth-profiles/types.js"; + +const mocks = vi.hoisted(() => ({ + readCodexCliCredentialsCached: vi.fn(), + readQwenCliCredentialsCached: vi.fn(() => null), + readMiniMaxCliCredentialsCached: vi.fn(() => null), +})); + +vi.mock("./cli-credentials.js", () => ({ + readCodexCliCredentialsCached: mocks.readCodexCliCredentialsCached, + readQwenCliCredentialsCached: mocks.readQwenCliCredentialsCached, + readMiniMaxCliCredentialsCached: mocks.readMiniMaxCliCredentialsCached, +})); + +const { syncExternalCliCredentials } = await import("./auth-profiles/external-cli-sync.js"); +const { CODEX_CLI_PROFILE_ID } = await import("./auth-profiles/constants.js"); + +const OPENAI_CODEX_DEFAULT_PROFILE_ID = "openai-codex:default"; + +describe("syncExternalCliCredentials", () => { + it("syncs Codex CLI credentials into the supported default auth profile", () => { + const expires = Date.now() + 60_000; + mocks.readCodexCliCredentialsCached.mockReturnValue({ + type: "oauth", + 
provider: "openai-codex", + access: "access-token", + refresh: "refresh-token", + expires, + accountId: "acct_123", + }); + + const store: AuthProfileStore = { + version: 1, + profiles: {}, + }; + + const mutated = syncExternalCliCredentials(store); + + expect(mutated).toBe(true); + expect(mocks.readCodexCliCredentialsCached).toHaveBeenCalledWith( + expect.objectContaining({ ttlMs: expect.any(Number) }), + ); + expect(store.profiles[OPENAI_CODEX_DEFAULT_PROFILE_ID]).toMatchObject({ + type: "oauth", + provider: "openai-codex", + access: "access-token", + refresh: "refresh-token", + expires, + accountId: "acct_123", + }); + expect(store.profiles[CODEX_CLI_PROFILE_ID]).toBeUndefined(); + }); +}); diff --git a/src/agents/auth-profiles/external-cli-sync.ts b/src/agents/auth-profiles/external-cli-sync.ts index 56ca400cf16..2627845ed40 100644 --- a/src/agents/auth-profiles/external-cli-sync.ts +++ b/src/agents/auth-profiles/external-cli-sync.ts @@ -1,4 +1,5 @@ import { + readCodexCliCredentialsCached, readQwenCliCredentialsCached, readMiniMaxCliCredentialsCached, } from "../cli-credentials.js"; @@ -11,6 +12,8 @@ import { } from "./constants.js"; import type { AuthProfileCredential, AuthProfileStore, OAuthCredential } from "./types.js"; +const OPENAI_CODEX_DEFAULT_PROFILE_ID = "openai-codex:default"; + function shallowEqualOAuthCredentials(a: OAuthCredential | undefined, b: OAuthCredential): boolean { if (!a) { return false; @@ -37,7 +40,11 @@ function isExternalProfileFresh(cred: AuthProfileCredential | undefined, now: nu if (cred.type !== "oauth" && cred.type !== "token") { return false; } - if (cred.provider !== "qwen-portal" && cred.provider !== "minimax-portal") { + if ( + cred.provider !== "qwen-portal" && + cred.provider !== "minimax-portal" && + cred.provider !== "openai-codex" + ) { return false; } if (typeof cred.expires !== "number") { @@ -82,7 +89,8 @@ function syncExternalCliCredentialsForProvider( } /** - * Sync OAuth credentials from external CLI tools (Qwen 
Code CLI, MiniMax CLI) into the store. + * Sync OAuth credentials from external CLI tools (Qwen Code CLI, MiniMax CLI, Codex CLI) + * into the store. * * Returns true if any credentials were updated. */ @@ -130,6 +138,17 @@ export function syncExternalCliCredentials(store: AuthProfileStore): boolean { ) { mutated = true; } + if ( + syncExternalCliCredentialsForProvider( + store, + OPENAI_CODEX_DEFAULT_PROFILE_ID, + "openai-codex", + () => readCodexCliCredentialsCached({ ttlMs: EXTERNAL_CLI_SYNC_TTL_MS }), + now, + ) + ) { + mutated = true; + } return mutated; } diff --git a/src/agents/model-compat.test.ts b/src/agents/model-compat.test.ts index 3ae2e1b99fe..56b9c16203c 100644 --- a/src/agents/model-compat.test.ts +++ b/src/agents/model-compat.test.ts @@ -86,6 +86,14 @@ function expectSupportsDeveloperRoleForcedOff(overrides?: Partial>): const normalized = normalizeModelCompat(model as Model); expect(supportsDeveloperRole(normalized)).toBe(false); } + +function expectSupportsUsageInStreamingForcedOff(overrides?: Partial>): void { + const model = { ...baseModel(), ...overrides }; + delete (model as { compat?: unknown }).compat; + const normalized = normalizeModelCompat(model as Model); + expect(supportsUsageInStreaming(normalized)).toBe(false); +} + function expectResolvedForwardCompat( model: Model | undefined, expected: { provider: string; id: string }, @@ -211,16 +219,11 @@ describe("normalizeModelCompat", () => { }); }); - it("leaves supportsUsageInStreaming at default for generic custom openai-completions provider", () => { - const model = { - ...baseModel(), + it("forces supportsUsageInStreaming off for generic custom openai-completions provider", () => { + expectSupportsUsageInStreamingForcedOff({ provider: "custom-cpa", baseUrl: "https://cpa.example.com/v1", - }; - delete (model as { compat?: unknown }).compat; - const normalized = normalizeModelCompat(model as Model); - // supportsUsageInStreaming is no longer forced off — pi-ai's default (true) applies - 
expect(supportsUsageInStreaming(normalized)).toBeUndefined(); + }); }); it("forces supportsDeveloperRole off for Qwen proxy via openai-completions", () => { @@ -270,7 +273,7 @@ describe("normalizeModelCompat", () => { expect(supportsUsageInStreaming(normalized)).toBe(true); }); - it("forces supportsDeveloperRole off but leaves supportsUsageInStreaming unset for non-native endpoints", () => { + it("still forces flags off when not explicitly set by user", () => { const model = { ...baseModel(), provider: "custom-cpa", @@ -279,8 +282,7 @@ describe("normalizeModelCompat", () => { delete (model as { compat?: unknown }).compat; const normalized = normalizeModelCompat(model); expect(supportsDeveloperRole(normalized)).toBe(false); - // supportsUsageInStreaming is no longer forced off — pi-ai default applies - expect(supportsUsageInStreaming(normalized)).toBeUndefined(); + expect(supportsUsageInStreaming(normalized)).toBe(false); }); it("does not mutate caller model when forcing supportsDeveloperRole off", () => { @@ -295,8 +297,7 @@ describe("normalizeModelCompat", () => { expect(supportsDeveloperRole(model)).toBeUndefined(); expect(supportsUsageInStreaming(model)).toBeUndefined(); expect(supportsDeveloperRole(normalized)).toBe(false); - // supportsUsageInStreaming is not set by normalizeModelCompat — pi-ai default applies - expect(supportsUsageInStreaming(normalized)).toBeUndefined(); + expect(supportsUsageInStreaming(normalized)).toBe(false); }); it("does not override explicit compat false", () => { diff --git a/src/agents/model-compat.ts b/src/agents/model-compat.ts index c2837f6b83d..72deb0c655f 100644 --- a/src/agents/model-compat.ts +++ b/src/agents/model-compat.ts @@ -52,16 +52,11 @@ export function normalizeModelCompat(model: Model): Model { return model; } - // The `developer` role is an OpenAI-native behavior that most compatible - // backends reject. Force it off for non-native endpoints unless the user - // has explicitly opted in via their model config. 
- // - // `supportsUsageInStreaming` is NOT forced off — most OpenAI-compatible - // backends (DashScope, DeepSeek, Groq, Together, etc.) handle - // `stream_options: { include_usage: true }` correctly, and disabling it - // silently breaks usage/cost tracking for all non-native providers. - // Users can still opt out with `compat.supportsUsageInStreaming: false` - // if their backend rejects the parameter. + // The `developer` role and stream usage chunks are OpenAI-native behaviors. + // Many OpenAI-compatible backends reject `developer` and/or emit usage-only + // chunks that break strict parsers expecting choices[0]. For non-native + // openai-completions endpoints, force both compat flags off — unless the + // user has explicitly opted in via their model config. const compat = model.compat ?? undefined; // When baseUrl is empty the pi-ai library defaults to api.openai.com, so // leave compat unchanged and let default native behavior apply. @@ -70,22 +65,24 @@ export function normalizeModelCompat(model: Model): Model { return model; } - // Respect explicit user overrides. + // Respect explicit user overrides: if the user has set a compat flag to + // true in their model definition, they know their endpoint supports it. const forcedDeveloperRole = compat?.supportsDeveloperRole === true; + const forcedUsageStreaming = compat?.supportsUsageInStreaming === true; - if (forcedDeveloperRole) { + if (forcedDeveloperRole && forcedUsageStreaming) { return model; } - // Only force supportsDeveloperRole off. Leave supportsUsageInStreaming - // at whatever the user set or pi-ai's default (true). + // Return a new object — do not mutate the caller's model reference. return { ...model, compat: compat ? 
{ ...compat, - supportsDeveloperRole: false, + supportsDeveloperRole: forcedDeveloperRole || false, + supportsUsageInStreaming: forcedUsageStreaming || false, } - : { supportsDeveloperRole: false }, + : { supportsDeveloperRole: false, supportsUsageInStreaming: false }, } as typeof model; } diff --git a/src/agents/pi-embedded-subscribe.handlers.compaction.ts b/src/agents/pi-embedded-subscribe.handlers.compaction.ts index 705ffb7cf89..7b9c4499eff 100644 --- a/src/agents/pi-embedded-subscribe.handlers.compaction.ts +++ b/src/agents/pi-embedded-subscribe.handlers.compaction.ts @@ -64,11 +64,11 @@ export function handleAutoCompactionEnd( emitAgentEvent({ runId: ctx.params.runId, stream: "compaction", - data: { phase: "end", willRetry }, + data: { phase: "end", willRetry, completed: hasResult && !wasAborted }, }); void ctx.params.onAgentEvent?.({ stream: "compaction", - data: { phase: "end", willRetry }, + data: { phase: "end", willRetry, completed: hasResult && !wasAborted }, }); // Run after_compaction plugin hook (fire-and-forget) diff --git a/src/auto-reply/reply/agent-runner-execution.ts b/src/auto-reply/reply/agent-runner-execution.ts index 27a31c2387a..9ebc239f7ff 100644 --- a/src/auto-reply/reply/agent-runner-execution.ts +++ b/src/auto-reply/reply/agent-runner-execution.ts @@ -67,7 +67,7 @@ export type AgentRunLoopResult = fallbackModel?: string; fallbackAttempts: RuntimeFallbackAttempt[]; didLogHeartbeatStrip: boolean; - autoCompactionCompleted: boolean; + autoCompactionCount: number; /** Payload keys sent directly (not via pipeline) during tool flush. */ directlySentBlockKeys?: Set; } @@ -103,7 +103,7 @@ export async function runAgentTurnWithFallback(params: { }): Promise { const TRANSIENT_HTTP_RETRY_DELAY_MS = 2_500; let didLogHeartbeatStrip = false; - let autoCompactionCompleted = false; + let autoCompactionCount = 0; // Track payloads sent directly (not via pipeline) during tool flush to avoid duplicates. 
const directlySentBlockKeys = new Set(); @@ -319,154 +319,165 @@ export async function runAgentTurnWithFallback(params: { }, ); return (async () => { - const result = await runEmbeddedPiAgent({ - ...embeddedContext, - trigger: params.isHeartbeat ? "heartbeat" : "user", - groupId: resolveGroupSessionKey(params.sessionCtx)?.id, - groupChannel: - params.sessionCtx.GroupChannel?.trim() ?? params.sessionCtx.GroupSubject?.trim(), - groupSpace: params.sessionCtx.GroupSpace?.trim() ?? undefined, - ...senderContext, - ...runBaseParams, - prompt: params.commandBody, - extraSystemPrompt: params.followupRun.run.extraSystemPrompt, - toolResultFormat: (() => { - const channel = resolveMessageChannel( - params.sessionCtx.Surface, - params.sessionCtx.Provider, - ); - if (!channel) { - return "markdown"; - } - return isMarkdownCapableMessageChannel(channel) ? "markdown" : "plain"; - })(), - suppressToolErrorWarnings: params.opts?.suppressToolErrorWarnings, - bootstrapContextMode: params.opts?.bootstrapContextMode, - bootstrapContextRunKind: params.opts?.isHeartbeat ? "heartbeat" : "default", - images: params.opts?.images, - abortSignal: params.opts?.abortSignal, - blockReplyBreak: params.resolvedBlockStreamingBreak, - blockReplyChunking: params.blockReplyChunking, - onPartialReply: async (payload) => { - const textForTyping = await handlePartialForTyping(payload); - if (!params.opts?.onPartialReply || textForTyping === undefined) { - return; - } - await params.opts.onPartialReply({ - text: textForTyping, - mediaUrls: payload.mediaUrls, - }); - }, - onAssistantMessageStart: async () => { - await params.typingSignals.signalMessageStart(); - await params.opts?.onAssistantMessageStart?.(); - }, - onReasoningStream: - params.typingSignals.shouldStartOnReasoning || params.opts?.onReasoningStream - ? 
async (payload) => { - await params.typingSignals.signalReasoningDelta(); - await params.opts?.onReasoningStream?.({ - text: payload.text, - mediaUrls: payload.mediaUrls, - }); - } - : undefined, - onReasoningEnd: params.opts?.onReasoningEnd, - onAgentEvent: async (evt) => { - // Signal run start only after the embedded agent emits real activity. - const hasLifecyclePhase = - evt.stream === "lifecycle" && typeof evt.data.phase === "string"; - if (evt.stream !== "lifecycle" || hasLifecyclePhase) { - notifyAgentRunStart(); - } - // Trigger typing when tools start executing. - // Must await to ensure typing indicator starts before tool summaries are emitted. - if (evt.stream === "tool") { - const phase = typeof evt.data.phase === "string" ? evt.data.phase : ""; - const name = typeof evt.data.name === "string" ? evt.data.name : undefined; - if (phase === "start" || phase === "update") { - await params.typingSignals.signalToolStart(); - await params.opts?.onToolStart?.({ name, phase }); + let attemptCompactionCount = 0; + try { + const result = await runEmbeddedPiAgent({ + ...embeddedContext, + trigger: params.isHeartbeat ? "heartbeat" : "user", + groupId: resolveGroupSessionKey(params.sessionCtx)?.id, + groupChannel: + params.sessionCtx.GroupChannel?.trim() ?? params.sessionCtx.GroupSubject?.trim(), + groupSpace: params.sessionCtx.GroupSpace?.trim() ?? undefined, + ...senderContext, + ...runBaseParams, + prompt: params.commandBody, + extraSystemPrompt: params.followupRun.run.extraSystemPrompt, + toolResultFormat: (() => { + const channel = resolveMessageChannel( + params.sessionCtx.Surface, + params.sessionCtx.Provider, + ); + if (!channel) { + return "markdown"; } - } - // Track auto-compaction completion and notify UI layer - if (evt.stream === "compaction") { - const phase = typeof evt.data.phase === "string" ? evt.data.phase : ""; - if (phase === "start") { - await params.opts?.onCompactionStart?.(); + return isMarkdownCapableMessageChannel(channel) ? 
"markdown" : "plain"; + })(), + suppressToolErrorWarnings: params.opts?.suppressToolErrorWarnings, + bootstrapContextMode: params.opts?.bootstrapContextMode, + bootstrapContextRunKind: params.opts?.isHeartbeat ? "heartbeat" : "default", + images: params.opts?.images, + abortSignal: params.opts?.abortSignal, + blockReplyBreak: params.resolvedBlockStreamingBreak, + blockReplyChunking: params.blockReplyChunking, + onPartialReply: async (payload) => { + const textForTyping = await handlePartialForTyping(payload); + if (!params.opts?.onPartialReply || textForTyping === undefined) { + return; } - if (phase === "end") { - autoCompactionCompleted = true; - await params.opts?.onCompactionEnd?.(); - } - } - }, - // Always pass onBlockReply so flushBlockReplyBuffer works before tool execution, - // even when regular block streaming is disabled. The handler sends directly - // via opts.onBlockReply when the pipeline isn't available. - onBlockReply: params.opts?.onBlockReply - ? createBlockReplyDeliveryHandler({ - onBlockReply: params.opts.onBlockReply, - currentMessageId: - params.sessionCtx.MessageSidFull ?? params.sessionCtx.MessageSid, - normalizeStreamingText, - applyReplyToMode: params.applyReplyToMode, - normalizeMediaPaths: normalizeReplyMediaPaths, - typingSignals: params.typingSignals, - blockStreamingEnabled: params.blockStreamingEnabled, - blockReplyPipeline, - directlySentBlockKeys, - }) - : undefined, - onBlockReplyFlush: - params.blockStreamingEnabled && blockReplyPipeline - ? async () => { - await blockReplyPipeline.flush({ force: true }); - } - : undefined, - shouldEmitToolResult: params.shouldEmitToolResult, - shouldEmitToolOutput: params.shouldEmitToolOutput, - bootstrapPromptWarningSignaturesSeen, - bootstrapPromptWarningSignature: - bootstrapPromptWarningSignaturesSeen[ - bootstrapPromptWarningSignaturesSeen.length - 1 - ], - onToolResult: onToolResult - ? (() => { - // Serialize tool result delivery to preserve message ordering. 
- // Without this, concurrent tool callbacks race through typing signals - // and message sends, causing out-of-order delivery to the user. - // See: https://github.com/openclaw/openclaw/issues/11044 - let toolResultChain: Promise = Promise.resolve(); - return (payload: ReplyPayload) => { - toolResultChain = toolResultChain - .then(async () => { - const { text, skip } = normalizeStreamingText(payload); - if (skip) { - return; - } - await params.typingSignals.signalTextDelta(text); - await onToolResult({ - ...payload, - text, - }); - }) - .catch((err) => { - // Keep chain healthy after an error so later tool results still deliver. - logVerbose(`tool result delivery failed: ${String(err)}`); + await params.opts.onPartialReply({ + text: textForTyping, + mediaUrls: payload.mediaUrls, + }); + }, + onAssistantMessageStart: async () => { + await params.typingSignals.signalMessageStart(); + await params.opts?.onAssistantMessageStart?.(); + }, + onReasoningStream: + params.typingSignals.shouldStartOnReasoning || params.opts?.onReasoningStream + ? async (payload) => { + await params.typingSignals.signalReasoningDelta(); + await params.opts?.onReasoningStream?.({ + text: payload.text, + mediaUrls: payload.mediaUrls, }); - const task = toolResultChain.finally(() => { - params.pendingToolTasks.delete(task); - }); - params.pendingToolTasks.add(task); - }; - })() - : undefined, - }); - bootstrapPromptWarningSignaturesSeen = resolveBootstrapWarningSignaturesSeen( - result.meta?.systemPromptReport, - ); - return result; + } + : undefined, + onReasoningEnd: params.opts?.onReasoningEnd, + onAgentEvent: async (evt) => { + // Signal run start only after the embedded agent emits real activity. + const hasLifecyclePhase = + evt.stream === "lifecycle" && typeof evt.data.phase === "string"; + if (evt.stream !== "lifecycle" || hasLifecyclePhase) { + notifyAgentRunStart(); + } + // Trigger typing when tools start executing. 
+ // Must await to ensure typing indicator starts before tool summaries are emitted. + if (evt.stream === "tool") { + const phase = typeof evt.data.phase === "string" ? evt.data.phase : ""; + const name = typeof evt.data.name === "string" ? evt.data.name : undefined; + if (phase === "start" || phase === "update") { + await params.typingSignals.signalToolStart(); + await params.opts?.onToolStart?.({ name, phase }); + } + } + // Track auto-compaction completion and notify UI layer. + if (evt.stream === "compaction") { + const phase = typeof evt.data.phase === "string" ? evt.data.phase : ""; + if (phase === "start") { + await params.opts?.onCompactionStart?.(); + } + const completed = evt.data?.completed === true; + if (phase === "end" && completed) { + attemptCompactionCount += 1; + await params.opts?.onCompactionEnd?.(); + } + } + }, + // Always pass onBlockReply so flushBlockReplyBuffer works before tool execution, + // even when regular block streaming is disabled. The handler sends directly + // via opts.onBlockReply when the pipeline isn't available. + onBlockReply: params.opts?.onBlockReply + ? createBlockReplyDeliveryHandler({ + onBlockReply: params.opts.onBlockReply, + currentMessageId: + params.sessionCtx.MessageSidFull ?? params.sessionCtx.MessageSid, + normalizeStreamingText, + applyReplyToMode: params.applyReplyToMode, + normalizeMediaPaths: normalizeReplyMediaPaths, + typingSignals: params.typingSignals, + blockStreamingEnabled: params.blockStreamingEnabled, + blockReplyPipeline, + directlySentBlockKeys, + }) + : undefined, + onBlockReplyFlush: + params.blockStreamingEnabled && blockReplyPipeline + ? 
async () => { + await blockReplyPipeline.flush({ force: true }); + } + : undefined, + shouldEmitToolResult: params.shouldEmitToolResult, + shouldEmitToolOutput: params.shouldEmitToolOutput, + bootstrapPromptWarningSignaturesSeen, + bootstrapPromptWarningSignature: + bootstrapPromptWarningSignaturesSeen[ + bootstrapPromptWarningSignaturesSeen.length - 1 + ], + onToolResult: onToolResult + ? (() => { + // Serialize tool result delivery to preserve message ordering. + // Without this, concurrent tool callbacks race through typing signals + // and message sends, causing out-of-order delivery to the user. + // See: https://github.com/openclaw/openclaw/issues/11044 + let toolResultChain: Promise = Promise.resolve(); + return (payload: ReplyPayload) => { + toolResultChain = toolResultChain + .then(async () => { + const { text, skip } = normalizeStreamingText(payload); + if (skip) { + return; + } + await params.typingSignals.signalTextDelta(text); + await onToolResult({ + ...payload, + text, + }); + }) + .catch((err) => { + // Keep chain healthy after an error so later tool results still deliver. + logVerbose(`tool result delivery failed: ${String(err)}`); + }); + const task = toolResultChain.finally(() => { + params.pendingToolTasks.delete(task); + }); + params.pendingToolTasks.add(task); + }; + })() + : undefined, + }); + bootstrapPromptWarningSignaturesSeen = resolveBootstrapWarningSignaturesSeen( + result.meta?.systemPromptReport, + ); + const resultCompactionCount = Math.max( + 0, + result.meta?.agentMeta?.compactionCount ?? 0, + ); + attemptCompactionCount = Math.max(attemptCompactionCount, resultCompactionCount); + return result; + } finally { + autoCompactionCount += attemptCompactionCount; + } })(); }, }); @@ -654,7 +665,7 @@ export async function runAgentTurnWithFallback(params: { fallbackModel, fallbackAttempts, didLogHeartbeatStrip, - autoCompactionCompleted, + autoCompactionCount, directlySentBlockKeys: directlySentBlockKeys.size > 0 ? 
directlySentBlockKeys : undefined, }; } diff --git a/src/auto-reply/reply/agent-runner.misc.runreplyagent.test.ts b/src/auto-reply/reply/agent-runner.misc.runreplyagent.test.ts index 14731dbb0ff..90535e69fb9 100644 --- a/src/auto-reply/reply/agent-runner.misc.runreplyagent.test.ts +++ b/src/auto-reply/reply/agent-runner.misc.runreplyagent.test.ts @@ -322,7 +322,7 @@ describe("runReplyAgent auto-compaction token update", () => { extraSystemPrompt?: string; onAgentEvent?: (evt: { stream?: string; - data?: { phase?: string; willRetry?: boolean }; + data?: { phase?: string; willRetry?: boolean; completed?: boolean }; }) => void; }; @@ -397,7 +397,10 @@ describe("runReplyAgent auto-compaction token update", () => { runEmbeddedPiAgentMock.mockImplementation(async (params: EmbeddedRunParams) => { // Simulate auto-compaction during agent run params.onAgentEvent?.({ stream: "compaction", data: { phase: "start" } }); - params.onAgentEvent?.({ stream: "compaction", data: { phase: "end", willRetry: false } }); + params.onAgentEvent?.({ + stream: "compaction", + data: { phase: "end", willRetry: false, completed: true }, + }); return { payloads: [{ text: "done" }], meta: { @@ -455,6 +458,238 @@ describe("runReplyAgent auto-compaction token update", () => { expect(stored[sessionKey].compactionCount).toBe(1); }); + it("tracks auto-compaction from embedded result metadata even when no compaction event is emitted", async () => { + const tmp = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-compact-meta-")); + const storePath = path.join(tmp, "sessions.json"); + const sessionKey = "main"; + const sessionEntry = { + sessionId: "session", + updatedAt: Date.now(), + totalTokens: 181_000, + compactionCount: 0, + }; + + await seedSessionStore({ storePath, sessionKey, entry: sessionEntry }); + + runEmbeddedPiAgentMock.mockResolvedValue({ + payloads: [{ text: "done" }], + meta: { + agentMeta: { + usage: { input: 190_000, output: 8_000, total: 198_000 }, + lastCallUsage: { input: 10_000, 
output: 3_000, total: 13_000 }, + compactionCount: 2, + }, + }, + }); + + const config = { + agents: { defaults: { compaction: { memoryFlush: { enabled: false } } } }, + }; + const { typing, sessionCtx, resolvedQueue, followupRun } = createBaseRun({ + storePath, + sessionEntry, + config, + }); + + await runReplyAgent({ + commandBody: "hello", + followupRun, + queueKey: "main", + resolvedQueue, + shouldSteer: false, + shouldFollowup: false, + isActive: false, + isStreaming: false, + typing, + sessionCtx, + sessionEntry, + sessionStore: { [sessionKey]: sessionEntry }, + sessionKey, + storePath, + defaultModel: "anthropic/claude-opus-4-5", + agentCfgContextTokens: 200_000, + resolvedVerboseLevel: "off", + isNewSession: false, + blockStreamingEnabled: false, + resolvedBlockStreamingBreak: "message_end", + shouldInjectGroupIntro: false, + typingMode: "instant", + }); + + const stored = JSON.parse(await fs.readFile(storePath, "utf-8")); + expect(stored[sessionKey].totalTokens).toBe(10_000); + expect(stored[sessionKey].compactionCount).toBe(2); + }); + + it("accumulates compactions across fallback attempts without double-counting a single attempt", async () => { + const tmp = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-compact-fallback-")); + const storePath = path.join(tmp, "sessions.json"); + const sessionKey = "main"; + const sessionEntry = { + sessionId: "session", + updatedAt: Date.now(), + totalTokens: 181_000, + compactionCount: 0, + }; + + await seedSessionStore({ storePath, sessionKey, entry: sessionEntry }); + + runWithModelFallbackMock.mockImplementationOnce(async ({ run }: RunWithModelFallbackParams) => { + try { + await run("anthropic", "claude"); + } catch { + // Expected first-attempt failure. 
+ } + return { + result: await run("openai", "gpt-5.2"), + provider: "openai", + model: "gpt-5.2", + attempts: [{ provider: "anthropic", model: "claude", error: "attempt failed" }], + }; + }); + + runEmbeddedPiAgentMock + .mockImplementationOnce(async (params: EmbeddedRunParams) => { + params.onAgentEvent?.({ + stream: "compaction", + data: { phase: "end", willRetry: true, completed: true }, + }); + throw new Error("attempt failed"); + }) + .mockResolvedValueOnce({ + payloads: [{ text: "done" }], + meta: { + agentMeta: { + usage: { input: 190_000, output: 8_000, total: 198_000 }, + lastCallUsage: { input: 10_000, output: 3_000, total: 13_000 }, + compactionCount: 2, + }, + }, + }); + + const config = { + agents: { defaults: { compaction: { memoryFlush: { enabled: false } } } }, + }; + const { typing, sessionCtx, resolvedQueue, followupRun } = createBaseRun({ + storePath, + sessionEntry, + config, + }); + + await runReplyAgent({ + commandBody: "hello", + followupRun, + queueKey: "main", + resolvedQueue, + shouldSteer: false, + shouldFollowup: false, + isActive: false, + isStreaming: false, + typing, + sessionCtx, + sessionEntry, + sessionStore: { [sessionKey]: sessionEntry }, + sessionKey, + storePath, + defaultModel: "anthropic/claude-opus-4-5", + agentCfgContextTokens: 200_000, + resolvedVerboseLevel: "off", + isNewSession: false, + blockStreamingEnabled: false, + resolvedBlockStreamingBreak: "message_end", + shouldInjectGroupIntro: false, + typingMode: "instant", + }); + + const stored = JSON.parse(await fs.readFile(storePath, "utf-8")); + expect(stored[sessionKey].totalTokens).toBe(10_000); + expect(stored[sessionKey].compactionCount).toBe(3); + }); + + it("does not count failed compaction end events from earlier fallback attempts", async () => { + const tmp = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-compact-fallback-failed-")); + const storePath = path.join(tmp, "sessions.json"); + const sessionKey = "main"; + const sessionEntry = { + sessionId: 
"session", + updatedAt: Date.now(), + totalTokens: 181_000, + compactionCount: 0, + }; + + await seedSessionStore({ storePath, sessionKey, entry: sessionEntry }); + + runWithModelFallbackMock.mockImplementationOnce(async ({ run }: RunWithModelFallbackParams) => { + try { + await run("anthropic", "claude"); + } catch { + // Expected first-attempt failure. + } + return { + result: await run("openai", "gpt-5.2"), + provider: "openai", + model: "gpt-5.2", + attempts: [{ provider: "anthropic", model: "claude", error: "attempt failed" }], + }; + }); + + runEmbeddedPiAgentMock + .mockImplementationOnce(async (params: EmbeddedRunParams) => { + params.onAgentEvent?.({ + stream: "compaction", + data: { phase: "end", willRetry: true, completed: false }, + }); + throw new Error("attempt failed"); + }) + .mockResolvedValueOnce({ + payloads: [{ text: "done" }], + meta: { + agentMeta: { + usage: { input: 190_000, output: 8_000, total: 198_000 }, + lastCallUsage: { input: 10_000, output: 3_000, total: 13_000 }, + compactionCount: 2, + }, + }, + }); + + const config = { + agents: { defaults: { compaction: { memoryFlush: { enabled: false } } } }, + }; + const { typing, sessionCtx, resolvedQueue, followupRun } = createBaseRun({ + storePath, + sessionEntry, + config, + }); + + await runReplyAgent({ + commandBody: "hello", + followupRun, + queueKey: "main", + resolvedQueue, + shouldSteer: false, + shouldFollowup: false, + isActive: false, + isStreaming: false, + typing, + sessionCtx, + sessionEntry, + sessionStore: { [sessionKey]: sessionEntry }, + sessionKey, + storePath, + defaultModel: "anthropic/claude-opus-4-5", + agentCfgContextTokens: 200_000, + resolvedVerboseLevel: "off", + isNewSession: false, + blockStreamingEnabled: false, + resolvedBlockStreamingBreak: "message_end", + shouldInjectGroupIntro: false, + typingMode: "instant", + }); + + const stored = JSON.parse(await fs.readFile(storePath, "utf-8")); + expect(stored[sessionKey].totalTokens).toBe(10_000); + 
expect(stored[sessionKey].compactionCount).toBe(2); + }); it("updates totalTokens from lastCallUsage even without compaction", async () => { const tmp = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-usage-last-")); const storePath = path.join(tmp, "sessions.json"); @@ -537,7 +772,10 @@ describe("runReplyAgent auto-compaction token update", () => { runEmbeddedPiAgentMock.mockImplementation(async (params: EmbeddedRunParams) => { params.onAgentEvent?.({ stream: "compaction", data: { phase: "start" } }); - params.onAgentEvent?.({ stream: "compaction", data: { phase: "end", willRetry: false } }); + params.onAgentEvent?.({ + stream: "compaction", + data: { phase: "end", willRetry: false, completed: true }, + }); return { payloads: [{ text: "done" }], meta: { diff --git a/src/auto-reply/reply/agent-runner.ts b/src/auto-reply/reply/agent-runner.ts index edc441a2552..76d86c45b05 100644 --- a/src/auto-reply/reply/agent-runner.ts +++ b/src/auto-reply/reply/agent-runner.ts @@ -380,7 +380,7 @@ export async function runReplyAgent(params: { fallbackAttempts, directlySentBlockKeys, } = runOutcome; - let { didLogHeartbeatStrip, autoCompactionCompleted } = runOutcome; + let { didLogHeartbeatStrip, autoCompactionCount } = runOutcome; if ( shouldInjectGroupIntro && @@ -664,12 +664,13 @@ export async function runReplyAgent(params: { } } - if (autoCompactionCompleted) { + if (autoCompactionCount > 0) { const count = await incrementRunCompactionCount({ sessionEntry: activeSessionEntry, sessionStore: activeSessionStore, sessionKey, storePath, + amount: autoCompactionCount, lastCallUsage: runResult.meta?.agentMeta?.lastCallUsage, contextTokensUsed, }); diff --git a/src/auto-reply/reply/followup-runner.test.ts b/src/auto-reply/reply/followup-runner.test.ts index 8d12e815685..c8e33397a2a 100644 --- a/src/auto-reply/reply/followup-runner.test.ts +++ b/src/auto-reply/reply/followup-runner.test.ts @@ -71,7 +71,7 @@ function mockCompactionRun(params: { }) => { args.onAgentEvent?.({ stream: 
"compaction", - data: { phase: "end", willRetry: params.willRetry }, + data: { phase: "end", willRetry: params.willRetry, completed: true }, }); return params.result; }, @@ -126,6 +126,110 @@ describe("createFollowupRunner compaction", () => { expect(firstCall?.[0]?.text).toContain("Auto-compaction complete"); expect(sessionStore.main.compactionCount).toBe(1); }); + + it("tracks auto-compaction from embedded result metadata even when no compaction event is emitted", async () => { + const storePath = path.join( + await fs.mkdtemp(path.join(tmpdir(), "openclaw-compaction-meta-")), + "sessions.json", + ); + const sessionEntry: SessionEntry = { + sessionId: "session", + updatedAt: Date.now(), + }; + const sessionStore: Record = { + main: sessionEntry, + }; + const onBlockReply = vi.fn(async () => {}); + + runEmbeddedPiAgentMock.mockResolvedValueOnce({ + payloads: [{ text: "final" }], + meta: { + agentMeta: { + compactionCount: 2, + lastCallUsage: { input: 10_000, output: 3_000, total: 13_000 }, + }, + }, + }); + + const runner = createFollowupRunner({ + opts: { onBlockReply }, + typing: createMockTypingController(), + typingMode: "instant", + sessionEntry, + sessionStore, + sessionKey: "main", + storePath, + defaultModel: "anthropic/claude-opus-4-5", + }); + + const queued = createQueuedRun({ + run: { + verboseLevel: "on", + }, + }); + + await runner(queued); + + expect(onBlockReply).toHaveBeenCalled(); + const firstCall = (onBlockReply.mock.calls as unknown as Array>)[0]; + expect(firstCall?.[0]?.text).toContain("Auto-compaction complete"); + expect(sessionStore.main.compactionCount).toBe(2); + }); + + it("does not count failed compaction end events in followup runs", async () => { + const storePath = path.join( + await fs.mkdtemp(path.join(tmpdir(), "openclaw-compaction-failed-")), + "sessions.json", + ); + const sessionEntry: SessionEntry = { + sessionId: "session", + updatedAt: Date.now(), + }; + const sessionStore: Record = { + main: sessionEntry, + }; + const 
onBlockReply = vi.fn(async () => {}); + + const runner = createFollowupRunner({ + opts: { onBlockReply }, + typing: createMockTypingController(), + typingMode: "instant", + sessionEntry, + sessionStore, + sessionKey: "main", + storePath, + defaultModel: "anthropic/claude-opus-4-5", + }); + + const queued = createQueuedRun({ + run: { + verboseLevel: "on", + }, + }); + + runEmbeddedPiAgentMock.mockImplementationOnce(async (args) => { + args.onAgentEvent?.({ + stream: "compaction", + data: { phase: "end", willRetry: false, completed: false }, + }); + return { + payloads: [{ text: "final" }], + meta: { + agentMeta: { + compactionCount: 0, + lastCallUsage: { input: 10_000, output: 3_000, total: 13_000 }, + }, + }, + }; + }); + + await runner(queued); + + expect(onBlockReply).toHaveBeenCalledTimes(1); + const firstCall = (onBlockReply.mock.calls as unknown as Array>)[0]; + expect(firstCall?.[0]?.text).toBe("final"); + expect(sessionStore.main.compactionCount).toBeUndefined(); + }); }); describe("createFollowupRunner bootstrap warning dedupe", () => { diff --git a/src/auto-reply/reply/followup-runner.ts b/src/auto-reply/reply/followup-runner.ts index 8c7eccb5f02..fe90d56433c 100644 --- a/src/auto-reply/reply/followup-runner.ts +++ b/src/auto-reply/reply/followup-runner.ts @@ -145,7 +145,7 @@ export function createFollowupRunner(params: { isControlUiVisible: shouldSurfaceToControlUi, }); } - let autoCompactionCompleted = false; + let autoCompactionCount = 0; let runResult: Awaited>; let fallbackProvider = queued.run.provider; let fallbackModel = queued.run.model; @@ -168,68 +168,81 @@ export function createFollowupRunner(params: { }), run: async (provider, model, runOptions) => { const authProfile = resolveRunAuthProfile(queued.run, provider); - const result = await runEmbeddedPiAgent({ - sessionId: queued.run.sessionId, - sessionKey: queued.run.sessionKey, - agentId: queued.run.agentId, - trigger: "user", - messageChannel: queued.originatingChannel ?? 
undefined, - messageProvider: queued.run.messageProvider, - agentAccountId: queued.run.agentAccountId, - messageTo: queued.originatingTo, - messageThreadId: queued.originatingThreadId, - currentChannelId: queued.originatingTo, - currentThreadTs: - queued.originatingThreadId != null ? String(queued.originatingThreadId) : undefined, - groupId: queued.run.groupId, - groupChannel: queued.run.groupChannel, - groupSpace: queued.run.groupSpace, - senderId: queued.run.senderId, - senderName: queued.run.senderName, - senderUsername: queued.run.senderUsername, - senderE164: queued.run.senderE164, - senderIsOwner: queued.run.senderIsOwner, - sessionFile: queued.run.sessionFile, - agentDir: queued.run.agentDir, - workspaceDir: queued.run.workspaceDir, - config: queued.run.config, - skillsSnapshot: queued.run.skillsSnapshot, - prompt: queued.prompt, - extraSystemPrompt: queued.run.extraSystemPrompt, - ownerNumbers: queued.run.ownerNumbers, - enforceFinalTag: queued.run.enforceFinalTag, - provider, - model, - ...authProfile, - thinkLevel: queued.run.thinkLevel, - verboseLevel: queued.run.verboseLevel, - reasoningLevel: queued.run.reasoningLevel, - suppressToolErrorWarnings: opts?.suppressToolErrorWarnings, - execOverrides: queued.run.execOverrides, - bashElevated: queued.run.bashElevated, - timeoutMs: queued.run.timeoutMs, - runId, - allowTransientCooldownProbe: runOptions?.allowTransientCooldownProbe, - blockReplyBreak: queued.run.blockReplyBreak, - bootstrapPromptWarningSignaturesSeen, - bootstrapPromptWarningSignature: - bootstrapPromptWarningSignaturesSeen[ - bootstrapPromptWarningSignaturesSeen.length - 1 - ], - onAgentEvent: (evt) => { - if (evt.stream !== "compaction") { - return; - } - const phase = typeof evt.data.phase === "string" ? 
evt.data.phase : ""; - if (phase === "end") { - autoCompactionCompleted = true; - } - }, - }); - bootstrapPromptWarningSignaturesSeen = resolveBootstrapWarningSignaturesSeen( - result.meta?.systemPromptReport, - ); - return result; + let attemptCompactionCount = 0; + try { + const result = await runEmbeddedPiAgent({ + sessionId: queued.run.sessionId, + sessionKey: queued.run.sessionKey, + agentId: queued.run.agentId, + trigger: "user", + messageChannel: queued.originatingChannel ?? undefined, + messageProvider: queued.run.messageProvider, + agentAccountId: queued.run.agentAccountId, + messageTo: queued.originatingTo, + messageThreadId: queued.originatingThreadId, + currentChannelId: queued.originatingTo, + currentThreadTs: + queued.originatingThreadId != null + ? String(queued.originatingThreadId) + : undefined, + groupId: queued.run.groupId, + groupChannel: queued.run.groupChannel, + groupSpace: queued.run.groupSpace, + senderId: queued.run.senderId, + senderName: queued.run.senderName, + senderUsername: queued.run.senderUsername, + senderE164: queued.run.senderE164, + senderIsOwner: queued.run.senderIsOwner, + sessionFile: queued.run.sessionFile, + agentDir: queued.run.agentDir, + workspaceDir: queued.run.workspaceDir, + config: queued.run.config, + skillsSnapshot: queued.run.skillsSnapshot, + prompt: queued.prompt, + extraSystemPrompt: queued.run.extraSystemPrompt, + ownerNumbers: queued.run.ownerNumbers, + enforceFinalTag: queued.run.enforceFinalTag, + provider, + model, + ...authProfile, + thinkLevel: queued.run.thinkLevel, + verboseLevel: queued.run.verboseLevel, + reasoningLevel: queued.run.reasoningLevel, + suppressToolErrorWarnings: opts?.suppressToolErrorWarnings, + execOverrides: queued.run.execOverrides, + bashElevated: queued.run.bashElevated, + timeoutMs: queued.run.timeoutMs, + runId, + allowTransientCooldownProbe: runOptions?.allowTransientCooldownProbe, + blockReplyBreak: queued.run.blockReplyBreak, + bootstrapPromptWarningSignaturesSeen, + 
bootstrapPromptWarningSignature: + bootstrapPromptWarningSignaturesSeen[ + bootstrapPromptWarningSignaturesSeen.length - 1 + ], + onAgentEvent: (evt) => { + if (evt.stream !== "compaction") { + return; + } + const phase = typeof evt.data.phase === "string" ? evt.data.phase : ""; + const completed = evt.data?.completed === true; + if (phase === "end" && completed) { + attemptCompactionCount += 1; + } + }, + }); + bootstrapPromptWarningSignaturesSeen = resolveBootstrapWarningSignaturesSeen( + result.meta?.systemPromptReport, + ); + const resultCompactionCount = Math.max( + 0, + result.meta?.agentMeta?.compactionCount ?? 0, + ); + attemptCompactionCount = Math.max(attemptCompactionCount, resultCompactionCount); + return result; + } finally { + autoCompactionCount += attemptCompactionCount; + } }, }); runResult = fallbackResult.result; @@ -326,12 +339,13 @@ export function createFollowupRunner(params: { return; } - if (autoCompactionCompleted) { + if (autoCompactionCount > 0) { const count = await incrementRunCompactionCount({ sessionEntry, sessionStore, sessionKey, storePath, + amount: autoCompactionCount, lastCallUsage: runResult.meta?.agentMeta?.lastCallUsage, contextTokensUsed, }); diff --git a/src/auto-reply/reply/reply-state.test.ts b/src/auto-reply/reply/reply-state.test.ts index 69dbad531e7..f83d313e2d3 100644 --- a/src/auto-reply/reply/reply-state.test.ts +++ b/src/auto-reply/reply/reply-state.test.ts @@ -445,6 +445,23 @@ describe("incrementCompactionCount", () => { expect(stored[sessionKey].outputTokens).toBeUndefined(); }); + it("increments compaction count by an explicit amount", async () => { + const entry = { sessionId: "s1", updatedAt: Date.now(), compactionCount: 2 } as SessionEntry; + const { storePath, sessionKey, sessionStore } = await createCompactionSessionFixture(entry); + + const count = await incrementCompactionCount({ + sessionEntry: entry, + sessionStore, + sessionKey, + storePath, + amount: 2, + }); + expect(count).toBe(4); + + const stored = 
JSON.parse(await fs.readFile(storePath, "utf-8")); + expect(stored[sessionKey].compactionCount).toBe(4); + }); + it("does not update totalTokens when tokensAfter is not provided", async () => { const entry = { sessionId: "s1", diff --git a/src/auto-reply/reply/session-run-accounting.ts b/src/auto-reply/reply/session-run-accounting.ts index fe4b91a7cdc..1a8a0d83640 100644 --- a/src/auto-reply/reply/session-run-accounting.ts +++ b/src/auto-reply/reply/session-run-accounting.ts @@ -8,6 +8,7 @@ type IncrementRunCompactionCountParams = Omit< Parameters[0], "tokensAfter" > & { + amount?: number; lastCallUsage?: NormalizedUsage; contextTokensUsed?: number; }; @@ -30,6 +31,7 @@ export async function incrementRunCompactionCount( sessionStore: params.sessionStore, sessionKey: params.sessionKey, storePath: params.storePath, + amount: params.amount, tokensAfter: tokensAfterCompaction, }); } diff --git a/src/auto-reply/reply/session-updates.ts b/src/auto-reply/reply/session-updates.ts index 55b4d4eb15b..bea6cd326e0 100644 --- a/src/auto-reply/reply/session-updates.ts +++ b/src/auto-reply/reply/session-updates.ts @@ -255,6 +255,7 @@ export async function incrementCompactionCount(params: { sessionKey?: string; storePath?: string; now?: number; + amount?: number; /** Token count after compaction - if provided, updates session token counts */ tokensAfter?: number; }): Promise { @@ -264,6 +265,7 @@ export async function incrementCompactionCount(params: { sessionKey, storePath, now = Date.now(), + amount = 1, tokensAfter, } = params; if (!sessionStore || !sessionKey) { @@ -273,7 +275,8 @@ export async function incrementCompactionCount(params: { if (!entry) { return undefined; } - const nextCount = (entry.compactionCount ?? 0) + 1; + const incrementBy = Math.max(0, amount); + const nextCount = (entry.compactionCount ?? 
0) + incrementBy; // Build update payload with compaction count and optionally updated token counts const updates: Partial = { compactionCount: nextCount, diff --git a/src/commands/doctor-auth.deprecated-cli-profiles.test.ts b/src/commands/doctor-auth.deprecated-cli-profiles.test.ts index d6436d7027a..ec7e824cd9e 100644 --- a/src/commands/doctor-auth.deprecated-cli-profiles.test.ts +++ b/src/commands/doctor-auth.deprecated-cli-profiles.test.ts @@ -63,6 +63,13 @@ describe("maybeRemoveDeprecatedCliAuthProfiles", () => { refresh: "token-r2", expires: Date.now() + 60_000, }, + "openai-codex:default": { + type: "oauth", + provider: "openai-codex", + access: "token-c", + refresh: "token-r3", + expires: Date.now() + 60_000, + }, }, }, null, @@ -76,10 +83,11 @@ describe("maybeRemoveDeprecatedCliAuthProfiles", () => { profiles: { "anthropic:claude-cli": { provider: "anthropic", mode: "oauth" }, "openai-codex:codex-cli": { provider: "openai-codex", mode: "oauth" }, + "openai-codex:default": { provider: "openai-codex", mode: "oauth" }, }, order: { anthropic: ["anthropic:claude-cli"], - "openai-codex": ["openai-codex:codex-cli"], + "openai-codex": ["openai-codex:codex-cli", "openai-codex:default"], }, }, } as const; @@ -94,10 +102,12 @@ describe("maybeRemoveDeprecatedCliAuthProfiles", () => { }; expect(raw.profiles?.["anthropic:claude-cli"]).toBeUndefined(); expect(raw.profiles?.["openai-codex:codex-cli"]).toBeUndefined(); + expect(raw.profiles?.["openai-codex:default"]).toBeDefined(); expect(next.auth?.profiles?.["anthropic:claude-cli"]).toBeUndefined(); expect(next.auth?.profiles?.["openai-codex:codex-cli"]).toBeUndefined(); + expect(next.auth?.profiles?.["openai-codex:default"]).toBeDefined(); expect(next.auth?.order?.anthropic).toBeUndefined(); - expect(next.auth?.order?.["openai-codex"]).toBeUndefined(); + expect(next.auth?.order?.["openai-codex"]).toEqual(["openai-codex:default"]); }); }); diff --git a/src/config/legacy.migrations.part-3.ts 
b/src/config/legacy.migrations.part-3.ts index ccc07b4b99f..5035930dadb 100644 --- a/src/config/legacy.migrations.part-3.ts +++ b/src/config/legacy.migrations.part-3.ts @@ -31,6 +31,7 @@ const AGENT_HEARTBEAT_KEYS = new Set([ "ackMaxChars", "suppressToolErrorWarnings", "lightContext", + "isolatedSession", ]); const CHANNEL_HEARTBEAT_KEYS = new Set(["showOk", "showAlerts", "useIndicator"]); diff --git a/src/config/types.agent-defaults.ts b/src/config/types.agent-defaults.ts index c81cf0edbed..d2bdbb096ff 100644 --- a/src/config/types.agent-defaults.ts +++ b/src/config/types.agent-defaults.ts @@ -253,6 +253,13 @@ export type AgentDefaultsConfig = { * Lightweight mode keeps only HEARTBEAT.md from workspace bootstrap files. */ lightContext?: boolean; + /** + * If true, run heartbeat turns in an isolated session with no prior + * conversation history. The heartbeat only sees its bootstrap context + * (HEARTBEAT.md when lightContext is also enabled). Dramatically reduces + * per-heartbeat token cost by avoiding the full session transcript. + */ + isolatedSession?: boolean; /** * When enabled, deliver the model's reasoning payload for heartbeat runs (when available) * as a separate message prefixed with `Reasoning:` (same as `/reasoning on`). 
diff --git a/src/config/zod-schema.agent-runtime.ts b/src/config/zod-schema.agent-runtime.ts index 7a87440a768..d7b1dd393e7 100644 --- a/src/config/zod-schema.agent-runtime.ts +++ b/src/config/zod-schema.agent-runtime.ts @@ -34,6 +34,7 @@ export const HeartbeatSchema = z ackMaxChars: z.number().int().nonnegative().optional(), suppressToolErrorWarnings: z.boolean().optional(), lightContext: z.boolean().optional(), + isolatedSession: z.boolean().optional(), }) .strict() .superRefine((val, ctx) => { diff --git a/src/infra/heartbeat-runner.model-override.test.ts b/src/infra/heartbeat-runner.model-override.test.ts index 6c7862fb84c..f33e5e9fbd0 100644 --- a/src/infra/heartbeat-runner.model-override.test.ts +++ b/src/infra/heartbeat-runner.model-override.test.ts @@ -65,6 +65,7 @@ describe("runHeartbeatOnce – heartbeat model override", () => { model?: string; suppressToolErrorWarnings?: boolean; lightContext?: boolean; + isolatedSession?: boolean; }) { return withHeartbeatFixture(async ({ tmpDir, storePath, seedSession }) => { const cfg: OpenClawConfig = { @@ -77,6 +78,7 @@ describe("runHeartbeatOnce – heartbeat model override", () => { model: params.model, suppressToolErrorWarnings: params.suppressToolErrorWarnings, lightContext: params.lightContext, + isolatedSession: params.isolatedSession, }, }, }, @@ -133,6 +135,72 @@ describe("runHeartbeatOnce – heartbeat model override", () => { ); }); + it("uses isolated session key when isolatedSession is enabled", async () => { + await withHeartbeatFixture(async ({ tmpDir, storePath, seedSession }) => { + const cfg: OpenClawConfig = { + agents: { + defaults: { + workspace: tmpDir, + heartbeat: { + every: "5m", + target: "whatsapp", + isolatedSession: true, + }, + }, + }, + channels: { whatsapp: { allowFrom: ["*"] } }, + session: { store: storePath }, + }; + const sessionKey = resolveMainSessionKey(cfg); + await seedSession(sessionKey, { lastChannel: "whatsapp", lastTo: "+1555" }); + + const replySpy = vi.spyOn(replyModule, 
"getReplyFromConfig"); + replySpy.mockResolvedValue({ text: "HEARTBEAT_OK" }); + + await runHeartbeatOnce({ + cfg, + deps: { getQueueSize: () => 0, nowMs: () => 0 }, + }); + + expect(replySpy).toHaveBeenCalledTimes(1); + const ctx = replySpy.mock.calls[0]?.[0]; + // Isolated heartbeat runs use a dedicated session key with :heartbeat suffix + expect(ctx.SessionKey).toBe(`${sessionKey}:heartbeat`); + }); + }); + + it("uses main session key when isolatedSession is not set", async () => { + await withHeartbeatFixture(async ({ tmpDir, storePath, seedSession }) => { + const cfg: OpenClawConfig = { + agents: { + defaults: { + workspace: tmpDir, + heartbeat: { + every: "5m", + target: "whatsapp", + }, + }, + }, + channels: { whatsapp: { allowFrom: ["*"] } }, + session: { store: storePath }, + }; + const sessionKey = resolveMainSessionKey(cfg); + await seedSession(sessionKey, { lastChannel: "whatsapp", lastTo: "+1555" }); + + const replySpy = vi.spyOn(replyModule, "getReplyFromConfig"); + replySpy.mockResolvedValue({ text: "HEARTBEAT_OK" }); + + await runHeartbeatOnce({ + cfg, + deps: { getQueueSize: () => 0, nowMs: () => 0 }, + }); + + expect(replySpy).toHaveBeenCalledTimes(1); + const ctx = replySpy.mock.calls[0]?.[0]; + expect(ctx.SessionKey).toBe(sessionKey); + }); + }); + it("passes per-agent heartbeat model override (merged with defaults)", async () => { await withHeartbeatFixture(async ({ tmpDir, storePath, seedSession }) => { const cfg: OpenClawConfig = { diff --git a/src/infra/heartbeat-runner.ts b/src/infra/heartbeat-runner.ts index 344fd22d8fc..1f6ae8767e9 100644 --- a/src/infra/heartbeat-runner.ts +++ b/src/infra/heartbeat-runner.ts @@ -35,6 +35,7 @@ import { updateSessionStore, } from "../config/sessions.js"; import type { AgentDefaultsConfig } from "../config/types.agent-defaults.js"; +import { resolveCronSession } from "../cron/isolated-agent/session.js"; import { createSubsystemLogger } from "../logging/subsystem.js"; import { getQueueSize } from 
"../process/command-queue.js"; import { CommandLane } from "../process/lanes.js"; @@ -659,6 +660,30 @@ export async function runHeartbeatOnce(opts: { } const { entry, sessionKey, storePath } = preflight.session; const previousUpdatedAt = entry?.updatedAt; + + // When isolatedSession is enabled, create a fresh session via the same + // pattern as cron sessionTarget: "isolated". This gives the heartbeat + // a new session ID (empty transcript) each run, avoiding the cost of + // sending the full conversation history (~100K tokens) to the LLM. + // Delivery routing still uses the main session entry (lastChannel, lastTo). + const useIsolatedSession = heartbeat?.isolatedSession === true; + let runSessionKey = sessionKey; + let runStorePath = storePath; + if (useIsolatedSession) { + const isolatedKey = `${sessionKey}:heartbeat`; + const cronSession = resolveCronSession({ + cfg, + sessionKey: isolatedKey, + agentId, + nowMs: startedAt, + forceNew: true, + }); + cronSession.store[isolatedKey] = cronSession.sessionEntry; + await saveSessionStore(cronSession.storePath, cronSession.store); + runSessionKey = isolatedKey; + runStorePath = cronSession.storePath; + } + const delivery = resolveHeartbeatDeliveryTarget({ cfg, entry, heartbeat }); const heartbeatAccountId = heartbeat?.accountId?.trim(); if (delivery.reason === "unknown-account") { @@ -707,7 +732,7 @@ export async function runHeartbeatOnce(opts: { AccountId: delivery.accountId, MessageThreadId: delivery.threadId, Provider: hasExecCompletion ? "exec-event" : hasCronEvents ? "cron-event" : "heartbeat", - SessionKey: sessionKey, + SessionKey: runSessionKey, }; if (!visibility.showAlerts && !visibility.showOk && !visibility.useIndicator) { emitHeartbeatEvent({ @@ -758,10 +783,11 @@ export async function runHeartbeatOnce(opts: { }; try { - // Capture transcript state before the heartbeat run so we can prune if HEARTBEAT_OK + // Capture transcript state before the heartbeat run so we can prune if HEARTBEAT_OK. 
+ // For isolated sessions, capture the isolated transcript (not the main session's). const transcriptState = await captureTranscriptState({ - storePath, - sessionKey, + storePath: runStorePath, + sessionKey: runSessionKey, agentId, }); diff --git a/src/plugin-sdk/subpaths.test.ts b/src/plugin-sdk/subpaths.test.ts index ce66f789857..592b6de73cf 100644 --- a/src/plugin-sdk/subpaths.test.ts +++ b/src/plugin-sdk/subpaths.test.ts @@ -126,5 +126,8 @@ describe("plugin-sdk subpath exports", () => { const twitch = await import("openclaw/plugin-sdk/twitch"); expect(typeof twitch.DEFAULT_ACCOUNT_ID).toBe("string"); expect(typeof twitch.normalizeAccountId).toBe("function"); + + const zalo = await import("openclaw/plugin-sdk/zalo"); + expect(typeof zalo.resolveClientIp).toBe("function"); }); }); diff --git a/src/plugin-sdk/zalo.ts b/src/plugin-sdk/zalo.ts index b5c69486f60..e13529f8c42 100644 --- a/src/plugin-sdk/zalo.ts +++ b/src/plugin-sdk/zalo.ts @@ -61,6 +61,7 @@ export { buildSecretInputSchema } from "./secret-input-schema.js"; export { MarkdownConfigSchema } from "../config/zod-schema.core.js"; export { waitForAbortSignal } from "../infra/abort-signal.js"; export { createDedupeCache } from "../infra/dedupe.js"; +export { resolveClientIp } from "../gateway/net.js"; export { emptyPluginConfigSchema } from "../plugins/config-schema.js"; export type { PluginRuntime } from "../plugins/runtime/types.js"; export type { OpenClawPluginApi } from "../plugins/types.js"; diff --git a/src/plugins/wired-hooks-compaction.test.ts b/src/plugins/wired-hooks-compaction.test.ts index 1e3f0021e29..694f4a1f4b4 100644 --- a/src/plugins/wired-hooks-compaction.test.ts +++ b/src/plugins/wired-hooks-compaction.test.ts @@ -138,7 +138,7 @@ describe("compaction hook wiring", () => { expect(emitAgentEvent).toHaveBeenCalledWith({ runId: "r2", stream: "compaction", - data: { phase: "end", willRetry: false }, + data: { phase: "end", willRetry: false, completed: true }, }); }); @@ -169,7 +169,7 @@ 
describe("compaction hook wiring", () => { expect(emitAgentEvent).toHaveBeenCalledWith({ runId: "r3", stream: "compaction", - data: { phase: "end", willRetry: true }, + data: { phase: "end", willRetry: true, completed: true }, }); });