diff --git a/.github/workflows/auto-response.yml b/.github/workflows/auto-response.yml index 60e1707cf35..d9d810bffa7 100644 --- a/.github/workflows/auto-response.yml +++ b/.github/workflows/auto-response.yml @@ -393,6 +393,7 @@ jobs: } const invalidLabel = "invalid"; + const spamLabel = "r: spam"; const dirtyLabel = "dirty"; const noisyPrMessage = "Closing this PR because it looks dirty (too many unrelated or unexpected changes). This usually happens when a branch picks up unrelated commits or a merge went sideways. Please recreate the PR from a clean branch."; @@ -429,6 +430,21 @@ jobs: }); return; } + if (labelSet.has(spamLabel)) { + await github.rest.issues.update({ + owner: context.repo.owner, + repo: context.repo.repo, + issue_number: pullRequest.number, + state: "closed", + }); + await github.rest.issues.lock({ + owner: context.repo.owner, + repo: context.repo.repo, + issue_number: pullRequest.number, + lock_reason: "spam", + }); + return; + } if (labelSet.has(invalidLabel)) { await github.rest.issues.update({ owner: context.repo.owner, @@ -440,6 +456,23 @@ jobs: } } + if (issue && labelSet.has(spamLabel)) { + await github.rest.issues.update({ + owner: context.repo.owner, + repo: context.repo.repo, + issue_number: issue.number, + state: "closed", + state_reason: "not_planned", + }); + await github.rest.issues.lock({ + owner: context.repo.owner, + repo: context.repo.repo, + issue_number: issue.number, + lock_reason: "spam", + }); + return; + } + if (issue && labelSet.has(invalidLabel)) { await github.rest.issues.update({ owner: context.repo.owner, diff --git a/AGENTS.md b/AGENTS.md index 80443603c87..69b0df68faa 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -24,6 +24,7 @@ - `r: testflight`: close requests asking for TestFlight access/builds. OpenClaw does not provide TestFlight distribution yet, so use the standard response (“Not available, build from source.”) instead of ad-hoc replies. - `r: third-party-extension`: close with guidance to ship as third-party plugin. - `r: moltbook`: close + lock as off-topic (not affiliated). +- `r: spam`: close + lock as spam (`lock_reason: spam`). - `invalid`: close invalid items (issues are closed as `not_planned`; PRs are closed). - `dirty`: close PRs with too many unrelated/unexpected changes (PR-only label). diff --git a/CHANGELOG.md b/CHANGELOG.md index 6f6f3c5cd46..60df48c6357 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -75,6 +75,8 @@ Docs: https://docs.openclaw.ai - Secrets/SecretRef: reject exec SecretRef traversal ids across schema, runtime, and gateway. (#42370) Thanks @joshavant. - Telegram/docs: clarify that `channels.telegram.groups` allowlists chats while `groupAllowFrom` allowlists users inside those chats, and point invalid negative chat IDs at the right config key. (#42451) Thanks @altaywtf. - Models/Alibaba Cloud Model Studio: wire `MODELSTUDIO_API_KEY` through shared env auth, implicit provider discovery, and shell-env fallback so onboarding works outside the wizard too. (#40634) Thanks @pomelo-nwu. +- ACP/sessions_spawn: implicitly stream `mode="run"` ACP spawns to parent only for eligible subagent orchestrator sessions (heartbeat `target: "last"` with a usable session-local route), restoring parent progress relays without thread binding. (#42404) Thanks @davidguttman. +- Sessions/reset model recompute: clear stale runtime model, context-token, and system-prompt metadata before session resets recompute the replacement session, so resets pick up current defaults and explicit overrides instead of reusing old runtime model state. (#41173) thanks @PonyX-lab. ## 2026.3.8 diff --git a/extensions/zalo/src/channel.sendpayload.test.ts b/extensions/zalo/src/channel.sendpayload.test.ts index 6cc072ac6dd..27acb737f9f 100644 --- a/extensions/zalo/src/channel.sendpayload.test.ts +++ b/extensions/zalo/src/channel.sendpayload.test.ts @@ -1,5 +1,9 @@ import type { ReplyPayload } from "openclaw/plugin-sdk/zalo"; import { beforeEach, describe, expect, it, vi } from "vitest"; +import { + installSendPayloadContractSuite, + primeSendMock, +} from "../../../src/test-utils/send-payload-contract.js"; import { zaloPlugin } from "./channel.js"; vi.mock("./send.js", () => ({ @@ -25,78 +29,16 @@ describe("zaloPlugin outbound sendPayload", () => { mockedSend.mockResolvedValue({ ok: true, messageId: "zl-1" }); }); - it("text-only delegates to sendText", async () => { - mockedSend.mockResolvedValue({ ok: true, messageId: "zl-t1" }); - - const result = await zaloPlugin.outbound!.sendPayload!(baseCtx({ text: "hello" })); - - expect(mockedSend).toHaveBeenCalledWith("123456789", "hello", expect.any(Object)); - expect(result).toMatchObject({ channel: "zalo", messageId: "zl-t1" }); - }); - - it("single media delegates to sendMedia", async () => { - mockedSend.mockResolvedValue({ ok: true, messageId: "zl-m1" }); - - const result = await zaloPlugin.outbound!.sendPayload!( - baseCtx({ text: "cap", mediaUrl: "https://example.com/a.jpg" }), - ); - - expect(mockedSend).toHaveBeenCalledWith( - "123456789", - "cap", - expect.objectContaining({ mediaUrl: "https://example.com/a.jpg" }), - ); - expect(result).toMatchObject({ channel: "zalo" }); - }); - - it("multi-media iterates URLs with caption on first", async () => { - mockedSend - .mockResolvedValueOnce({ ok: true, messageId: "zl-1" }) - .mockResolvedValueOnce({ ok: true, messageId: "zl-2" }); - - const result = await zaloPlugin.outbound!.sendPayload!( - baseCtx({ - text: "caption", - mediaUrls: ["https://example.com/1.jpg", "https://example.com/2.jpg"], - }), - ); - - expect(mockedSend).toHaveBeenCalledTimes(2); - expect(mockedSend).toHaveBeenNthCalledWith( - 1, - "123456789", - "caption", - expect.objectContaining({ mediaUrl: "https://example.com/1.jpg" }), - ); - expect(mockedSend).toHaveBeenNthCalledWith( - 2, - "123456789", - "", - expect.objectContaining({ mediaUrl: "https://example.com/2.jpg" }), - ); - expect(result).toMatchObject({ channel: "zalo", messageId: "zl-2" }); - }); - - it("empty payload returns no-op", async () => { - const result = await zaloPlugin.outbound!.sendPayload!(baseCtx({})); - - expect(mockedSend).not.toHaveBeenCalled(); - expect(result).toEqual({ channel: "zalo", messageId: "" }); - }); - - it("chunking splits long text", async () => { - mockedSend - .mockResolvedValueOnce({ ok: true, messageId: "zl-c1" }) - .mockResolvedValueOnce({ ok: true, messageId: "zl-c2" }); - - const longText = "a".repeat(3000); - const result = await zaloPlugin.outbound!.sendPayload!(baseCtx({ text: longText })); - - // textChunkLimit is 2000 with chunkTextForOutbound, so it should split - expect(mockedSend.mock.calls.length).toBeGreaterThanOrEqual(2); - for (const call of mockedSend.mock.calls) { - expect((call[1] as string).length).toBeLessThanOrEqual(2000); - } - expect(result).toMatchObject({ channel: "zalo" }); + installSendPayloadContractSuite({ + channel: "zalo", + chunking: { mode: "split", longTextLength: 3000, maxChunkLength: 2000 }, + createHarness: ({ payload, sendResults }) => { + primeSendMock(mockedSend, { ok: true, messageId: "zl-1" }, sendResults); + return { + run: async () => await zaloPlugin.outbound!.sendPayload!(baseCtx(payload)), + sendMock: mockedSend, + to: "123456789", + }; + }, }); }); diff --git a/extensions/zalouser/src/channel.sendpayload.test.ts b/extensions/zalouser/src/channel.sendpayload.test.ts index 534f9c39b95..0cef65f8c05 100644 --- a/extensions/zalouser/src/channel.sendpayload.test.ts +++ b/extensions/zalouser/src/channel.sendpayload.test.ts @@ -1,5 +1,9 @@ import type { ReplyPayload } from "openclaw/plugin-sdk/zalouser"; import { beforeEach, describe, expect, it, vi } from "vitest"; +import { + installSendPayloadContractSuite, + primeSendMock, +} from "../../../src/test-utils/send-payload-contract.js"; import { zalouserPlugin } from "./channel.js"; vi.mock("./send.js", () => ({ @@ -40,15 +44,6 @@ describe("zalouserPlugin outbound sendPayload", () => { mockedSend.mockResolvedValue({ ok: true, messageId: "zlu-1" }); }); - it("text-only delegates to sendText", async () => { - mockedSend.mockResolvedValue({ ok: true, messageId: "zlu-t1" }); - - const result = await zalouserPlugin.outbound!.sendPayload!(baseCtx({ text: "hello" })); - - expect(mockedSend).toHaveBeenCalledWith("987654321", "hello", expect.any(Object)); - expect(result).toMatchObject({ channel: "zalouser", messageId: "zlu-t1" }); - }); - it("group target delegates with isGroup=true and stripped threadId", async () => { mockedSend.mockResolvedValue({ ok: true, messageId: "zlu-g1" }); @@ -65,21 +60,6 @@ describe("zalouserPlugin outbound sendPayload", () => { expect(result).toMatchObject({ channel: "zalouser", messageId: "zlu-g1" }); }); - it("single media delegates to sendMedia", async () => { - mockedSend.mockResolvedValue({ ok: true, messageId: "zlu-m1" }); - - const result = await zalouserPlugin.outbound!.sendPayload!( - baseCtx({ text: "cap", mediaUrl: "https://example.com/a.jpg" }), - ); - - expect(mockedSend).toHaveBeenCalledWith( - "987654321", - "cap", - expect.objectContaining({ mediaUrl: "https://example.com/a.jpg" }), - ); - expect(result).toMatchObject({ channel: "zalouser" }); - }); - it("treats bare numeric targets as direct chats for backward compatibility", async () => { mockedSend.mockResolvedValue({ ok: true, messageId: "zlu-d1" }); @@ -112,55 +92,17 @@ describe("zalouserPlugin outbound sendPayload", () => { expect(result).toMatchObject({ channel: "zalouser", messageId: "zlu-g-native" }); }); - it("multi-media iterates URLs with caption on first", async () => { - mockedSend - .mockResolvedValueOnce({ ok: true, messageId: "zlu-1" }) - .mockResolvedValueOnce({ ok: true, messageId: "zlu-2" }); - - const result = await zalouserPlugin.outbound!.sendPayload!( - baseCtx({ - text: "caption", - mediaUrls: ["https://example.com/1.jpg", "https://example.com/2.jpg"], - }), - ); - - expect(mockedSend).toHaveBeenCalledTimes(2); - expect(mockedSend).toHaveBeenNthCalledWith( - 1, - "987654321", - "caption", - expect.objectContaining({ mediaUrl: "https://example.com/1.jpg" }), - ); - expect(mockedSend).toHaveBeenNthCalledWith( - 2, - "987654321", - "", - expect.objectContaining({ mediaUrl: "https://example.com/2.jpg" }), - ); - expect(result).toMatchObject({ channel: "zalouser", messageId: "zlu-2" }); - }); - - it("empty payload returns no-op", async () => { - const result = await zalouserPlugin.outbound!.sendPayload!(baseCtx({})); - - expect(mockedSend).not.toHaveBeenCalled(); - expect(result).toEqual({ channel: "zalouser", messageId: "" }); - }); - - it("chunking splits long text", async () => { - mockedSend - .mockResolvedValueOnce({ ok: true, messageId: "zlu-c1" }) - .mockResolvedValueOnce({ ok: true, messageId: "zlu-c2" }); - - const longText = "a".repeat(3000); - const result = await zalouserPlugin.outbound!.sendPayload!(baseCtx({ text: longText })); - - // textChunkLimit is 2000 with chunkTextForOutbound, so it should split - expect(mockedSend.mock.calls.length).toBeGreaterThanOrEqual(2); - for (const call of mockedSend.mock.calls) { - expect((call[1] as string).length).toBeLessThanOrEqual(2000); - } - expect(result).toMatchObject({ channel: "zalouser" }); + installSendPayloadContractSuite({ + channel: "zalouser", + chunking: { mode: "split", longTextLength: 3000, maxChunkLength: 2000 }, + createHarness: ({ payload, sendResults }) => { + primeSendMock(mockedSend, { ok: true, messageId: "zlu-1" }, sendResults); + return { + run: async () => await zalouserPlugin.outbound!.sendPayload!(baseCtx(payload)), + sendMock: mockedSend, + to: "987654321", + }; + }, }); }); diff --git a/src/agents/acp-spawn-parent-stream.ts b/src/agents/acp-spawn-parent-stream.ts index 94f04ce3940..36b113386c2 100644 --- a/src/agents/acp-spawn-parent-stream.ts +++ b/src/agents/acp-spawn-parent-stream.ts @@ -180,7 +180,9 @@ export function startAcpSpawnParentStreamRelay(params: { }; const wake = () => { requestHeartbeatNow( - scopedHeartbeatWakeOptions(parentSessionKey, { reason: "acp:spawn:stream" }), + scopedHeartbeatWakeOptions(parentSessionKey, { + reason: "acp:spawn:stream", + }), ); }; const emit = (text: string, contextKey: string) => { diff --git a/src/agents/acp-spawn.test.ts b/src/agents/acp-spawn.test.ts index 0f28b709792..c53584cdf55 100644 --- a/src/agents/acp-spawn.test.ts +++ b/src/agents/acp-spawn.test.ts @@ -38,6 +38,7 @@ const hoisted = vi.hoisted(() => { const loadSessionStoreMock = vi.fn(); const resolveStorePathMock = vi.fn(); const resolveSessionTranscriptFileMock = vi.fn(); + const areHeartbeatsEnabledMock = vi.fn(); const state = { cfg: createDefaultSpawnConfig(), }; @@ -55,6 +56,7 @@ const hoisted = vi.hoisted(() => { loadSessionStoreMock, resolveStorePathMock, resolveSessionTranscriptFileMock, + areHeartbeatsEnabledMock, state, }; }); @@ -128,6 +130,14 @@ vi.mock("../infra/outbound/session-binding-service.js", async (importOriginal) = }; }); +vi.mock("../infra/heartbeat-wake.js", async (importOriginal) => { + const actual = await importOriginal(); + return { + ...actual, + areHeartbeatsEnabled: () => hoisted.areHeartbeatsEnabledMock(), + }; +}); + vi.mock("./acp-spawn-parent-stream.js", () => ({ startAcpSpawnParentStreamRelay: (...args: unknown[]) => hoisted.startAcpSpawnParentStreamRelayMock(...args), @@ -192,6 +202,7 @@ function expectResolvedIntroTextInBindMetadata(): void { describe("spawnAcpDirect", () => { beforeEach(() => { hoisted.state.cfg = createDefaultSpawnConfig(); + hoisted.areHeartbeatsEnabledMock.mockReset().mockReturnValue(true); hoisted.callGatewayMock.mockReset().mockImplementation(async (argsUnknown: unknown) => { const args = argsUnknown as { method?: string }; @@ -393,6 +404,8 @@ describe("spawnAcpDirect", () => { expect(result.status).toBe("accepted"); expect(result.mode).toBe("run"); + expect(result.streamLogPath).toBeUndefined(); + expect(hoisted.startAcpSpawnParentStreamRelayMock).not.toHaveBeenCalled(); expect(hoisted.resolveSessionTranscriptFileMock).toHaveBeenCalledWith( expect.objectContaining({ sessionId: "sess-123", @@ -633,6 +646,290 @@ describe("spawnAcpDirect", () => { expect(secondHandle.notifyStarted).toHaveBeenCalledTimes(1); }); + it("implicitly streams mode=run ACP spawns for subagent requester sessions", async () => { + hoisted.state.cfg = { + ...hoisted.state.cfg, + agents: { + defaults: { + heartbeat: { + every: "30m", + target: "last", + }, + }, + }, + }; + const firstHandle = createRelayHandle(); + const secondHandle = createRelayHandle(); + hoisted.startAcpSpawnParentStreamRelayMock + .mockReset() + .mockReturnValueOnce(firstHandle) + .mockReturnValueOnce(secondHandle); + hoisted.loadSessionStoreMock.mockReset().mockImplementation(() => { + const store: Record< + string, + { sessionId: string; updatedAt: number; deliveryContext?: unknown } + > = { + "agent:main:subagent:parent": { + sessionId: "parent-sess-1", + updatedAt: Date.now(), + deliveryContext: { + channel: "discord", + to: "channel:parent-channel", + accountId: "default", + }, + }, + }; + return new Proxy(store, { + get(target, prop) { + if (typeof prop === "string" && prop.startsWith("agent:codex:acp:")) { + return { sessionId: "sess-123", updatedAt: Date.now() }; + } + return target[prop as keyof typeof target]; + }, + }); + }); + + const result = await spawnAcpDirect( + { + task: "Investigate flaky tests", + agentId: "codex", + }, + { + agentSessionKey: "agent:main:subagent:parent", + agentChannel: "discord", + agentAccountId: "default", + agentTo: "channel:parent-channel", + }, + ); + + expect(result.status).toBe("accepted"); + expect(result.mode).toBe("run"); + expect(result.streamLogPath).toBe("/tmp/sess-main.acp-stream.jsonl"); + const agentCall = hoisted.callGatewayMock.mock.calls + .map((call: unknown[]) => call[0] as { method?: string; params?: Record }) + .find((request) => request.method === "agent"); + expect(agentCall?.params?.deliver).toBe(false); + expect(agentCall?.params?.channel).toBeUndefined(); + expect(agentCall?.params?.to).toBeUndefined(); + expect(agentCall?.params?.threadId).toBeUndefined(); + expect(hoisted.startAcpSpawnParentStreamRelayMock).toHaveBeenCalledWith( + expect.objectContaining({ + parentSessionKey: "agent:main:subagent:parent", + agentId: "codex", + logPath: "/tmp/sess-main.acp-stream.jsonl", + emitStartNotice: false, + }), + ); + expect(firstHandle.dispose).toHaveBeenCalledTimes(1); + expect(secondHandle.notifyStarted).toHaveBeenCalledTimes(1); + }); + + it("does not implicitly stream when heartbeat target is not session-local", async () => { + hoisted.state.cfg = { + ...hoisted.state.cfg, + agents: { + defaults: { + heartbeat: { + every: "30m", + target: "discord", + to: "channel:ops-room", + }, + }, + }, + }; + + const result = await spawnAcpDirect( + { + task: "Investigate flaky tests", + agentId: "codex", + }, + { + agentSessionKey: "agent:main:subagent:fixed-target", + }, + ); + + expect(result.status).toBe("accepted"); + expect(result.mode).toBe("run"); + expect(result.streamLogPath).toBeUndefined(); + expect(hoisted.startAcpSpawnParentStreamRelayMock).not.toHaveBeenCalled(); + }); + + it("does not implicitly stream when session scope is global", async () => { + hoisted.state.cfg = { + ...hoisted.state.cfg, + session: { + ...hoisted.state.cfg.session, + scope: "global", + }, + agents: { + defaults: { + heartbeat: { + every: "30m", + target: "last", + }, + }, + }, + }; + + const result = await spawnAcpDirect( + { + task: "Investigate flaky tests", + agentId: "codex", + }, + { + agentSessionKey: "agent:main:subagent:global-scope", + }, + ); + + expect(result.status).toBe("accepted"); + expect(result.mode).toBe("run"); + expect(result.streamLogPath).toBeUndefined(); + expect(hoisted.startAcpSpawnParentStreamRelayMock).not.toHaveBeenCalled(); + }); + + it("does not implicitly stream for subagent requester sessions when heartbeat is disabled", async () => { + hoisted.state.cfg = { + ...hoisted.state.cfg, + agents: { + list: [{ id: "main", heartbeat: { every: "30m" } }, { id: "research" }], + }, + }; + + const result = await spawnAcpDirect( + { + task: "Investigate flaky tests", + agentId: "codex", + }, + { + agentSessionKey: "agent:research:subagent:orchestrator", + }, + ); + + expect(result.status).toBe("accepted"); + expect(result.mode).toBe("run"); + expect(result.streamLogPath).toBeUndefined(); + expect(hoisted.startAcpSpawnParentStreamRelayMock).not.toHaveBeenCalled(); + }); + + it("does not implicitly stream for subagent requester sessions when heartbeat cadence is invalid", async () => { + hoisted.state.cfg = { + ...hoisted.state.cfg, + agents: { + list: [ + { + id: "research", + heartbeat: { every: "0m" }, + }, + ], + }, + }; + + const result = await spawnAcpDirect( + { + task: "Investigate flaky tests", + agentId: "codex", + }, + { + agentSessionKey: "agent:research:subagent:invalid-heartbeat", + }, + ); + + expect(result.status).toBe("accepted"); + expect(result.mode).toBe("run"); + expect(result.streamLogPath).toBeUndefined(); + expect(hoisted.startAcpSpawnParentStreamRelayMock).not.toHaveBeenCalled(); + }); + + it("does not implicitly stream when heartbeats are runtime-disabled", async () => { + hoisted.areHeartbeatsEnabledMock.mockReturnValue(false); + + const result = await spawnAcpDirect( + { + task: "Investigate flaky tests", + agentId: "codex", + }, + { + agentSessionKey: "agent:main:subagent:runtime-disabled", + }, + ); + + expect(result.status).toBe("accepted"); + expect(result.mode).toBe("run"); + expect(result.streamLogPath).toBeUndefined(); + expect(hoisted.startAcpSpawnParentStreamRelayMock).not.toHaveBeenCalled(); + }); + + it("does not implicitly stream for legacy subagent requester session keys", async () => { + const result = await spawnAcpDirect( + { + task: "Investigate flaky tests", + agentId: "codex", + }, + { + agentSessionKey: "subagent:legacy-worker", + }, + ); + + expect(result.status).toBe("accepted"); + expect(result.mode).toBe("run"); + expect(result.streamLogPath).toBeUndefined(); + expect(hoisted.startAcpSpawnParentStreamRelayMock).not.toHaveBeenCalled(); + }); + + it("does not implicitly stream for subagent requester sessions with thread context", async () => { + const result = await spawnAcpDirect( + { + task: "Investigate flaky tests", + agentId: "codex", + }, + { + agentSessionKey: "agent:main:subagent:thread-context", + agentChannel: "discord", + agentAccountId: "default", + agentTo: "channel:parent-channel", + agentThreadId: "requester-thread", + }, + ); + + expect(result.status).toBe("accepted"); + expect(result.mode).toBe("run"); + expect(result.streamLogPath).toBeUndefined(); + expect(hoisted.startAcpSpawnParentStreamRelayMock).not.toHaveBeenCalled(); + }); + + it("does not implicitly stream for thread-bound subagent requester sessions", async () => { + hoisted.sessionBindingListBySessionMock.mockImplementation((targetSessionKey: string) => { + if (targetSessionKey === "agent:main:subagent:thread-bound") { + return [ + createSessionBinding({ + targetSessionKey, + targetKind: "subagent", + status: "active", + }), + ]; + } + return []; + }); + + const result = await spawnAcpDirect( + { + task: "Investigate flaky tests", + agentId: "codex", + }, + { + agentSessionKey: "agent:main:subagent:thread-bound", + agentChannel: "discord", + agentAccountId: "default", + agentTo: "channel:parent-channel", + }, + ); + + expect(result.status).toBe("accepted"); + expect(result.mode).toBe("run"); + expect(result.streamLogPath).toBeUndefined(); + expect(hoisted.startAcpSpawnParentStreamRelayMock).not.toHaveBeenCalled(); + }); + it("announces parent relay start only after successful child dispatch", async () => { const firstHandle = createRelayHandle(); const secondHandle = createRelayHandle(); diff --git a/src/agents/acp-spawn.ts b/src/agents/acp-spawn.ts index 5d305b25f27..9d68a234aea 100644 --- a/src/agents/acp-spawn.ts +++ b/src/agents/acp-spawn.ts @@ -10,6 +10,7 @@ import { resolveAcpThreadSessionDetailLines, } from "../acp/runtime/session-identifiers.js"; import type { AcpRuntimeSessionMode } from "../acp/runtime/types.js"; +import { DEFAULT_HEARTBEAT_EVERY } from "../auto-reply/heartbeat.js"; import { resolveThreadBindingIntroText, resolveThreadBindingThreadName, @@ -21,11 +22,13 @@ import { resolveThreadBindingMaxAgeMsForChannel, resolveThreadBindingSpawnPolicy, } from "../channels/thread-bindings-policy.js"; +import { parseDurationMs } from "../cli/parse-duration.js"; import { loadConfig } from "../config/config.js"; import type { OpenClawConfig } from "../config/config.js"; import { loadSessionStore, resolveStorePath, type SessionEntry } from "../config/sessions.js"; import { resolveSessionTranscriptFile } from "../config/sessions/transcript.js"; import { callGateway } from "../gateway/call.js"; +import { areHeartbeatsEnabled } from "../infra/heartbeat-wake.js"; import { resolveConversationIdFromTargets } from "../infra/outbound/conversation-id.js"; import { getSessionBindingService, @@ -33,13 +36,18 @@ import { type SessionBindingRecord, } from "../infra/outbound/session-binding-service.js"; import { createSubsystemLogger } from "../logging/subsystem.js"; -import { normalizeAgentId } from "../routing/session-key.js"; -import { normalizeDeliveryContext } from "../utils/delivery-context.js"; +import { + isSubagentSessionKey, + normalizeAgentId, + parseAgentSessionKey, +} from "../routing/session-key.js"; +import { deliveryContextFromSession, normalizeDeliveryContext } from "../utils/delivery-context.js"; import { type AcpSpawnParentRelayHandle, resolveAcpSpawnStreamLogPath, startAcpSpawnParentStreamRelay, } from "./acp-spawn-parent-stream.js"; +import { resolveAgentConfig, resolveDefaultAgentId } from "./agent-scope.js"; import { resolveSandboxRuntimeStatus } from "./sandbox/runtime-status.js"; import { resolveInternalSessionKey, resolveMainSessionAlias } from "./tools/sessions-helpers.js"; @@ -130,6 +138,95 @@ function resolveAcpSessionMode(mode: SpawnAcpMode): AcpRuntimeSessionMode { return mode === "session" ? "persistent" : "oneshot"; } +function isHeartbeatEnabledForSessionAgent(params: { + cfg: OpenClawConfig; + sessionKey?: string; +}): boolean { + if (!areHeartbeatsEnabled()) { + return false; + } + const requesterAgentId = parseAgentSessionKey(params.sessionKey)?.agentId; + if (!requesterAgentId) { + return true; + } + + const agentEntries = params.cfg.agents?.list ?? []; + const hasExplicitHeartbeatAgents = agentEntries.some((entry) => Boolean(entry?.heartbeat)); + const enabledByPolicy = hasExplicitHeartbeatAgents + ? agentEntries.some( + (entry) => Boolean(entry?.heartbeat) && normalizeAgentId(entry?.id) === requesterAgentId, + ) + : requesterAgentId === resolveDefaultAgentId(params.cfg); + if (!enabledByPolicy) { + return false; + } + + const heartbeatEvery = + resolveAgentConfig(params.cfg, requesterAgentId)?.heartbeat?.every ?? + params.cfg.agents?.defaults?.heartbeat?.every ?? + DEFAULT_HEARTBEAT_EVERY; + const trimmedEvery = typeof heartbeatEvery === "string" ? heartbeatEvery.trim() : ""; + if (!trimmedEvery) { + return false; + } + try { + return parseDurationMs(trimmedEvery, { defaultUnit: "m" }) > 0; + } catch { + return false; + } +} + +function resolveHeartbeatConfigForAgent(params: { + cfg: OpenClawConfig; + agentId: string; +}): NonNullable["defaults"]>["heartbeat"] { + const defaults = params.cfg.agents?.defaults?.heartbeat; + const overrides = resolveAgentConfig(params.cfg, params.agentId)?.heartbeat; + if (!defaults && !overrides) { + return undefined; + } + return { + ...defaults, + ...overrides, + }; +} + +function hasSessionLocalHeartbeatRelayRoute(params: { + cfg: OpenClawConfig; + parentSessionKey: string; + requesterAgentId: string; +}): boolean { + const scope = params.cfg.session?.scope ?? "per-sender"; + if (scope === "global") { + return false; + } + + const heartbeat = resolveHeartbeatConfigForAgent({ + cfg: params.cfg, + agentId: params.requesterAgentId, + }); + if ((heartbeat?.target ?? "none") !== "last") { + return false; + } + + // Explicit delivery overrides are not session-local and can route updates + // to unrelated destinations (for example a pinned ops channel). + if (typeof heartbeat?.to === "string" && heartbeat.to.trim().length > 0) { + return false; + } + if (typeof heartbeat?.accountId === "string" && heartbeat.accountId.trim().length > 0) { + return false; + } + + const storePath = resolveStorePath(params.cfg.session?.store, { + agentId: params.requesterAgentId, + }); + const sessionStore = loadSessionStore(storePath); + const parentEntry = sessionStore[params.parentSessionKey]; + const parentDeliveryContext = deliveryContextFromSession(parentEntry); + return Boolean(parentDeliveryContext?.channel && parentDeliveryContext.to); +} + function resolveTargetAcpAgentId(params: { requestedAgentId?: string; cfg: OpenClawConfig; @@ -326,6 +423,8 @@ export async function spawnAcpDirect( error: 'sessions_spawn streamTo="parent" requires an active requester session context.', }; } + + const requestThreadBinding = params.thread === true; const runtimePolicyError = resolveAcpSpawnRuntimePolicyError({ cfg, requesterSessionKey: ctx.agentSessionKey, @@ -339,7 +438,6 @@ export async function spawnAcpDirect( }; } - const requestThreadBinding = params.thread === true; const spawnMode = resolveSpawnMode({ requestedMode: params.mode, threadRequested: requestThreadBinding, @@ -351,6 +449,52 @@ export async function spawnAcpDirect( }; } + const bindingService = getSessionBindingService(); + const requesterParsedSession = parseAgentSessionKey(parentSessionKey); + const requesterIsSubagentSession = + Boolean(requesterParsedSession) && isSubagentSessionKey(parentSessionKey); + const requesterHasActiveSubagentBinding = + requesterIsSubagentSession && parentSessionKey + ? bindingService + .listBySession(parentSessionKey) + .some((record) => record.targetKind === "subagent" && record.status !== "ended") + : false; + const requesterHasThreadContext = + typeof ctx.agentThreadId === "string" + ? ctx.agentThreadId.trim().length > 0 + : ctx.agentThreadId != null; + const requesterHeartbeatEnabled = isHeartbeatEnabledForSessionAgent({ + cfg, + sessionKey: parentSessionKey, + }); + const requesterAgentId = requesterParsedSession?.agentId; + const requesterHeartbeatRelayRouteUsable = + parentSessionKey && requesterAgentId + ? hasSessionLocalHeartbeatRelayRoute({ + cfg, + parentSessionKey, + requesterAgentId, + }) + : false; + + // For mode=run without thread binding, implicitly route output to parent + // only for spawned subagent orchestrator sessions with heartbeat enabled + // AND a session-local heartbeat delivery route (target=last + usable last route). + // Skip requester sessions that are thread-bound (or carrying thread context) + // so user-facing threads do not receive unsolicited ACP progress chatter + // unless streamTo="parent" is explicitly requested. Use resolved spawnMode + // (not params.mode) so default mode selection works. + const implicitStreamToParent = + !streamToParentRequested && + spawnMode === "run" && + !requestThreadBinding && + requesterIsSubagentSession && + !requesterHasActiveSubagentBinding && + !requesterHasThreadContext && + requesterHeartbeatEnabled && + requesterHeartbeatRelayRouteUsable; + const effectiveStreamToParent = streamToParentRequested || implicitStreamToParent; + const targetAgentResult = resolveTargetAcpAgentId({ requestedAgentId: params.agentId, cfg, @@ -392,7 +536,6 @@ export async function spawnAcpDirect( } const acpManager = getAcpSessionManager(); - const bindingService = getSessionBindingService(); let binding: SessionBindingRecord | null = null; let sessionCreated = false; let initializedRuntime: AcpSpawnRuntimeCloseHandle | undefined; @@ -530,17 +673,17 @@ export async function spawnAcpDirect( // Fresh one-shot ACP runs should bootstrap the worker first, then let higher layers // decide how to relay status. Inline delivery is reserved for thread-bound sessions. const useInlineDelivery = - hasDeliveryTarget && spawnMode === "session" && !streamToParentRequested; + hasDeliveryTarget && spawnMode === "session" && !effectiveStreamToParent; const childIdem = crypto.randomUUID(); let childRunId: string = childIdem; const streamLogPath = - streamToParentRequested && parentSessionKey + effectiveStreamToParent && parentSessionKey ? resolveAcpSpawnStreamLogPath({ childSessionKey: sessionKey, }) : undefined; let parentRelay: AcpSpawnParentRelayHandle | undefined; - if (streamToParentRequested && parentSessionKey) { + if (effectiveStreamToParent && parentSessionKey) { // Register relay before dispatch so fast lifecycle failures are not missed. parentRelay = startAcpSpawnParentStreamRelay({ runId: childIdem, @@ -585,7 +728,7 @@ export async function spawnAcpDirect( }; } - if (streamToParentRequested && parentSessionKey) { + if (effectiveStreamToParent && parentSessionKey) { if (parentRelay && childRunId !== childIdem) { parentRelay.dispose(); // Defensive fallback if gateway returns a runId that differs from idempotency key. diff --git a/src/auto-reply/reply/agent-runner.runreplyagent.e2e.test.ts b/src/auto-reply/reply/agent-runner.runreplyagent.e2e.test.ts index 599a8fd6a48..6bebdc6a390 100644 --- a/src/auto-reply/reply/agent-runner.runreplyagent.e2e.test.ts +++ b/src/auto-reply/reply/agent-runner.runreplyagent.e2e.test.ts @@ -1255,6 +1255,79 @@ describe("runReplyAgent typing (heartbeat)", () => { }); }); + it("clears stale runtime model fields when resetSession retries after compaction failure", async () => { + await withTempStateDir(async (stateDir) => { + const sessionId = "session-stale-model"; + const storePath = path.join(stateDir, "sessions", "sessions.json"); + const transcriptPath = sessions.resolveSessionTranscriptPath(sessionId); + const sessionEntry: SessionEntry = { + sessionId, + updatedAt: Date.now(), + sessionFile: transcriptPath, + modelProvider: "qwencode", + model: "qwen3.5-plus-2026-02-15", + contextTokens: 123456, + systemPromptReport: { + source: "run", + generatedAt: Date.now(), + sessionId, + sessionKey: "main", + provider: "qwencode", + model: "qwen3.5-plus-2026-02-15", + workspaceDir: stateDir, + bootstrapMaxChars: 1000, + bootstrapTotalMaxChars: 2000, + systemPrompt: { + chars: 10, + projectContextChars: 5, + nonProjectContextChars: 5, + }, + injectedWorkspaceFiles: [], + skills: { + promptChars: 0, + entries: [], + }, + tools: { + listChars: 0, + schemaChars: 0, + entries: [], + }, + }, + }; + const sessionStore = { main: sessionEntry }; + + await fs.mkdir(path.dirname(storePath), { recursive: true }); + await fs.writeFile(storePath, JSON.stringify(sessionStore), "utf-8"); + await fs.mkdir(path.dirname(transcriptPath), { recursive: true }); + await fs.writeFile(transcriptPath, "ok", "utf-8"); + + state.runEmbeddedPiAgentMock.mockImplementationOnce(async () => { + throw new Error( + 'Context overflow: Summarization failed: 400 {"message":"prompt is too long"}', + ); + }); + + const { run } = createMinimalRun({ + sessionEntry, + sessionStore, + sessionKey: "main", + storePath, + }); + await run(); + + expect(sessionStore.main.modelProvider).toBeUndefined(); + expect(sessionStore.main.model).toBeUndefined(); + expect(sessionStore.main.contextTokens).toBeUndefined(); + expect(sessionStore.main.systemPromptReport).toBeUndefined(); + + const persisted = JSON.parse(await fs.readFile(storePath, "utf-8")); + expect(persisted.main.modelProvider).toBeUndefined(); + expect(persisted.main.model).toBeUndefined(); + expect(persisted.main.contextTokens).toBeUndefined(); + expect(persisted.main.systemPromptReport).toBeUndefined(); + }); + }); + it("surfaces overflow fallback when embedded run returns empty payloads", async () => { state.runEmbeddedPiAgentMock.mockImplementationOnce(async () => ({ payloads: [], diff --git a/src/auto-reply/reply/agent-runner.ts b/src/auto-reply/reply/agent-runner.ts index b6dcd7dcd91..edc441a2552 100644 --- a/src/auto-reply/reply/agent-runner.ts +++ b/src/auto-reply/reply/agent-runner.ts @@ -278,6 +278,10 @@ export async function runReplyAgent(params: { updatedAt: Date.now(), systemSent: false, abortedLastRun: false, + modelProvider: undefined, + model: undefined, + contextTokens: undefined, + systemPromptReport: undefined, fallbackNoticeSelectedModel: undefined, fallbackNoticeActiveModel: undefined, fallbackNoticeReason: undefined, diff --git a/src/channels/plugins/outbound/direct-text-media.sendpayload.test.ts b/src/channels/plugins/outbound/direct-text-media.sendpayload.test.ts index 0e5c2ba01db..42971f1e89c 100644 --- a/src/channels/plugins/outbound/direct-text-media.sendpayload.test.ts +++ b/src/channels/plugins/outbound/direct-text-media.sendpayload.test.ts @@ -1,9 +1,17 @@ -import { describe, expect, it, vi } from "vitest"; +import { describe, vi } from "vitest"; import type { ReplyPayload } from "../../../auto-reply/types.js"; +import { + installSendPayloadContractSuite, + primeSendMock, +} from "../../../test-utils/send-payload-contract.js"; import { createDirectTextMediaOutbound } from "./direct-text-media.js"; -function makeOutbound() { - const sendFn = vi.fn().mockResolvedValue({ messageId: "m1" }); +function createDirectHarness(params: { + payload: ReplyPayload; + sendResults?: Array<{ messageId: string }>; +}) { + const sendFn = vi.fn(); + primeSendMock(sendFn, { messageId: "m1" }, params.sendResults); const outbound = createDirectTextMediaOutbound({ channel: "imessage", resolveSender: () => sendFn, @@ -24,94 +32,16 @@ function baseCtx(payload: ReplyPayload) { } describe("createDirectTextMediaOutbound sendPayload", () => { - it("text-only delegates to sendText", async () => { - const { outbound, sendFn } = makeOutbound(); - const result = await outbound.sendPayload!(baseCtx({ text: "hello" })); - - expect(sendFn).toHaveBeenCalledTimes(1); - expect(sendFn).toHaveBeenCalledWith("user1", "hello", expect.any(Object)); - expect(result).toMatchObject({ channel: "imessage", messageId: "m1" }); - }); - - it("single media delegates to sendMedia", async () => { - const { outbound, sendFn } = makeOutbound(); - const result = await outbound.sendPayload!( - baseCtx({ text: "cap", mediaUrl: "https://example.com/a.jpg" }), - ); - - expect(sendFn).toHaveBeenCalledTimes(1); - expect(sendFn).toHaveBeenCalledWith( - "user1", - "cap", - expect.objectContaining({ mediaUrl: "https://example.com/a.jpg" }), - ); - expect(result).toMatchObject({ channel: "imessage", messageId: "m1" }); - }); - - it("multi-media iterates URLs with caption on first", async () => { - const sendFn = vi - .fn() - .mockResolvedValueOnce({ messageId: "m1" }) - .mockResolvedValueOnce({ messageId: "m2" }); - const outbound = createDirectTextMediaOutbound({ - channel: "imessage", - resolveSender: () => sendFn, - resolveMaxBytes: () => undefined, - buildTextOptions: (opts) => opts as never, - buildMediaOptions: (opts) => opts as never, - }); - const result = await outbound.sendPayload!( - baseCtx({ - text: "caption", - mediaUrls: ["https://example.com/1.jpg", "https://example.com/2.jpg"], - }), - ); - - expect(sendFn).toHaveBeenCalledTimes(2); - expect(sendFn).toHaveBeenNthCalledWith( - 1, - "user1", - "caption", - expect.objectContaining({ mediaUrl: "https://example.com/1.jpg" }), - ); - expect(sendFn).toHaveBeenNthCalledWith( - 2, - "user1", - "", - expect.objectContaining({ mediaUrl: "https://example.com/2.jpg" }), - ); - expect(result).toMatchObject({ channel: "imessage", messageId: "m2" }); - }); - - it("empty payload returns no-op", async () => { - const { outbound, sendFn } = makeOutbound(); - const result = await outbound.sendPayload!(baseCtx({})); - - expect(sendFn).not.toHaveBeenCalled(); - expect(result).toEqual({ channel: "imessage", messageId: "" }); - }); - - it("chunking splits long text", async () => { - const sendFn = vi - .fn() - .mockResolvedValueOnce({ messageId: "c1" }) - .mockResolvedValueOnce({ messageId: "c2" }); - const outbound = createDirectTextMediaOutbound({ - channel: "signal", - resolveSender: () => sendFn, - resolveMaxBytes: () => undefined, - buildTextOptions: (opts) => opts as never, - buildMediaOptions: (opts) => opts as never, - }); - // textChunkLimit is 4000; generate text exceeding that - const longText = "a".repeat(5000); - const result = await outbound.sendPayload!(baseCtx({ text: longText })); - - expect(sendFn.mock.calls.length).toBeGreaterThanOrEqual(2); - // Each chunk should be within the limit - for (const call of sendFn.mock.calls) { - expect((call[1] as string).length).toBeLessThanOrEqual(4000); - } - expect(result).toMatchObject({ channel: "signal" }); + installSendPayloadContractSuite({ + channel: "imessage", + chunking: { mode: "split", longTextLength: 5000, maxChunkLength: 4000 }, + createHarness: ({ payload, sendResults }) => { + const { outbound, sendFn } = createDirectHarness({ payload, sendResults }); + return { + run: async () => await outbound.sendPayload!(baseCtx(payload)), + sendMock: sendFn, + to: "user1", + }; + }, }); }); diff --git a/src/channels/plugins/outbound/discord.sendpayload.test.ts b/src/channels/plugins/outbound/discord.sendpayload.test.ts index 07c821d6e79..168f8d8d927 100644 --- a/src/channels/plugins/outbound/discord.sendpayload.test.ts +++ b/src/channels/plugins/outbound/discord.sendpayload.test.ts @@ -1,98 +1,37 @@ -import { describe, expect, it, vi } from "vitest"; +import { describe, vi } from "vitest"; import type { ReplyPayload } from "../../../auto-reply/types.js"; +import { + installSendPayloadContractSuite, + primeSendMock, +} from "../../../test-utils/send-payload-contract.js"; import { discordOutbound } from "./discord.js"; -function baseCtx(payload: ReplyPayload) { - return { +function createHarness(params: { + payload: ReplyPayload; + sendResults?: Array<{ messageId: string }>; +}) { + const sendDiscord = vi.fn(); + primeSendMock(sendDiscord, { messageId: "dc-1", channelId: "123456" }, params.sendResults); + const ctx = { cfg: {}, to: "channel:123456", text: "", - payload, + payload: params.payload, deps: { - sendDiscord: vi.fn().mockResolvedValue({ messageId: "dc-1", channelId: "123456" }), + sendDiscord, }, }; + return { + run: async () => await discordOutbound.sendPayload!(ctx), + sendMock: sendDiscord, + to: ctx.to, + }; } describe("discordOutbound sendPayload", () => { - it("text-only delegates to sendText", async () => { - const ctx = baseCtx({ text: "hello" }); - const result = await discordOutbound.sendPayload!(ctx); - - expect(ctx.deps.sendDiscord).toHaveBeenCalledTimes(1); - expect(ctx.deps.sendDiscord).toHaveBeenCalledWith( - "channel:123456", - "hello", - expect.any(Object), - ); - expect(result).toMatchObject({ channel: "discord" }); - }); - - it("single media delegates to sendMedia", async () => { - const ctx = baseCtx({ text: "cap", mediaUrl: "https://example.com/a.jpg" }); - const result = await discordOutbound.sendPayload!(ctx); - - expect(ctx.deps.sendDiscord).toHaveBeenCalledTimes(1); - expect(ctx.deps.sendDiscord).toHaveBeenCalledWith( - "channel:123456", - "cap", - expect.objectContaining({ mediaUrl: "https://example.com/a.jpg" }), - ); - expect(result).toMatchObject({ channel: "discord" }); - }); - - it("multi-media iterates URLs with caption on first", async () => { - const sendDiscord = vi - .fn() - .mockResolvedValueOnce({ messageId: "dc-1", channelId: "123456" }) - .mockResolvedValueOnce({ messageId: "dc-2", channelId: "123456" }); - const ctx = { - cfg: {}, - to: "channel:123456", - text: "", - payload: { - text: "caption", - mediaUrls: ["https://example.com/1.jpg", "https://example.com/2.jpg"], - } as ReplyPayload, - deps: { sendDiscord }, - }; - const result = await discordOutbound.sendPayload!(ctx); - - expect(sendDiscord).toHaveBeenCalledTimes(2); - expect(sendDiscord).toHaveBeenNthCalledWith( - 1, - "channel:123456", - "caption", - expect.objectContaining({ mediaUrl: "https://example.com/1.jpg" }), - ); - expect(sendDiscord).toHaveBeenNthCalledWith( - 2, - "channel:123456", - "", - expect.objectContaining({ mediaUrl: "https://example.com/2.jpg" }), - ); - expect(result).toMatchObject({ channel: "discord", messageId: "dc-2" }); - }); - - it("empty payload returns no-op", async () => { - const ctx = baseCtx({}); - const result = await discordOutbound.sendPayload!(ctx); - - expect(ctx.deps.sendDiscord).not.toHaveBeenCalled(); - expect(result).toEqual({ channel: "discord", messageId: "" }); - }); - - it("text exceeding chunk limit is sent as-is when chunker is null", async () => { - // Discord has chunker: null, so long text should be sent as a single message - const ctx = baseCtx({ text: "a".repeat(3000) }); - const result = await discordOutbound.sendPayload!(ctx); - - expect(ctx.deps.sendDiscord).toHaveBeenCalledTimes(1); - expect(ctx.deps.sendDiscord).toHaveBeenCalledWith( - "channel:123456", - "a".repeat(3000), - expect.any(Object), - ); - expect(result).toMatchObject({ channel: "discord" }); + installSendPayloadContractSuite({ + channel: "discord", + chunking: { mode: "passthrough", longTextLength: 3000 }, + createHarness, }); }); diff --git a/src/channels/plugins/outbound/slack.sendpayload.test.ts b/src/channels/plugins/outbound/slack.sendpayload.test.ts index c6df264df12..374c9881a73 100644 --- a/src/channels/plugins/outbound/slack.sendpayload.test.ts +++ b/src/channels/plugins/outbound/slack.sendpayload.test.ts @@ -1,92 +1,41 @@ -import { describe, expect, it, vi } from "vitest"; +import { describe, vi } from "vitest"; import type { ReplyPayload } from "../../../auto-reply/types.js"; +import { + installSendPayloadContractSuite, + primeSendMock, +} from "../../../test-utils/send-payload-contract.js"; import { slackOutbound } from "./slack.js"; -function baseCtx(payload: ReplyPayload) { - return { +function createHarness(params: { + payload: ReplyPayload; + sendResults?: Array<{ messageId: string }>; +}) { + const sendSlack = vi.fn(); + primeSendMock( + sendSlack, + { messageId: "sl-1", channelId: "C12345", ts: "1234.5678" }, + params.sendResults, + ); + const ctx = { cfg: {}, to: "C12345", text: "", - payload, + payload: params.payload, deps: { - sendSlack: vi - .fn() - .mockResolvedValue({ messageId: "sl-1", channelId: "C12345", ts: "1234.5678" }), + sendSlack, }, }; + return { + run: async () => await slackOutbound.sendPayload!(ctx), + sendMock: sendSlack, + to: ctx.to, + }; } describe("slackOutbound sendPayload", () => { - it("text-only delegates to sendText", async () => { - const ctx = baseCtx({ text: "hello" }); - const result = await slackOutbound.sendPayload!(ctx); - - expect(ctx.deps.sendSlack).toHaveBeenCalledTimes(1); - expect(ctx.deps.sendSlack).toHaveBeenCalledWith("C12345", "hello", expect.any(Object)); - expect(result).toMatchObject({ channel: "slack" }); - }); - - it("single media delegates to sendMedia", async () => { - const ctx = baseCtx({ text: "cap", mediaUrl: "https://example.com/a.jpg" }); - const result = await slackOutbound.sendPayload!(ctx); - - expect(ctx.deps.sendSlack).toHaveBeenCalledTimes(1); - expect(ctx.deps.sendSlack).toHaveBeenCalledWith( - "C12345", - "cap", - expect.objectContaining({ mediaUrl: "https://example.com/a.jpg" }), - ); - expect(result).toMatchObject({ channel: "slack" }); - }); - - it("multi-media iterates URLs with caption on first", async () => { - const sendSlack = vi - .fn() - .mockResolvedValueOnce({ messageId: "sl-1", channelId: "C12345" }) - .mockResolvedValueOnce({ messageId: "sl-2", channelId: "C12345" }); - const ctx = { - cfg: {}, - to: "C12345", - text: "", - payload: { - text: "caption", - mediaUrls: ["https://example.com/1.jpg", "https://example.com/2.jpg"], - } as ReplyPayload, - deps: { sendSlack }, - }; - const result = await slackOutbound.sendPayload!(ctx); - - expect(sendSlack).toHaveBeenCalledTimes(2); - expect(sendSlack).toHaveBeenNthCalledWith( - 1, - "C12345", - "caption", - expect.objectContaining({ mediaUrl: "https://example.com/1.jpg" }), - ); - expect(sendSlack).toHaveBeenNthCalledWith( - 2, - "C12345", - "", - expect.objectContaining({ mediaUrl: "https://example.com/2.jpg" }), - ); - expect(result).toMatchObject({ channel: "slack", messageId: "sl-2" }); - }); - - it("empty payload returns no-op", async () => { - const ctx = baseCtx({}); - const result = await slackOutbound.sendPayload!(ctx); - - expect(ctx.deps.sendSlack).not.toHaveBeenCalled(); - expect(result).toEqual({ channel: "slack", messageId: "" }); - }); - - it("text exceeding chunk limit is sent as-is when chunker is null", async () => { - // Slack has chunker: null, so long text should be sent as a single message - const ctx = baseCtx({ text: "a".repeat(5000) }); - const result = await slackOutbound.sendPayload!(ctx); - - expect(ctx.deps.sendSlack).toHaveBeenCalledTimes(1); - expect(ctx.deps.sendSlack).toHaveBeenCalledWith("C12345", "a".repeat(5000), expect.any(Object)); - expect(result).toMatchObject({ channel: "slack" }); + installSendPayloadContractSuite({ + channel: "slack", + chunking: { mode: "passthrough", longTextLength: 5000 }, + createHarness, }); }); diff --git a/src/channels/plugins/outbound/whatsapp.sendpayload.test.ts b/src/channels/plugins/outbound/whatsapp.sendpayload.test.ts index 3eb6f7467dc..e98351cfa61 100644 --- a/src/channels/plugins/outbound/whatsapp.sendpayload.test.ts +++ b/src/channels/plugins/outbound/whatsapp.sendpayload.test.ts @@ -1,106 +1,37 @@ -import { describe, expect, it, vi } from "vitest"; +import { describe, vi } from "vitest"; import type { ReplyPayload } from "../../../auto-reply/types.js"; +import { + installSendPayloadContractSuite, + primeSendMock, +} from "../../../test-utils/send-payload-contract.js"; import { whatsappOutbound } from "./whatsapp.js"; -function baseCtx(payload: ReplyPayload) { - return { +function createHarness(params: { + payload: ReplyPayload; + sendResults?: Array<{ messageId: string }>; +}) { + const sendWhatsApp = vi.fn(); + primeSendMock(sendWhatsApp, { messageId: "wa-1" }, params.sendResults); + const ctx = { cfg: {}, to: "5511999999999@c.us", text: "", - payload, + payload: params.payload, deps: { - sendWhatsApp: vi.fn().mockResolvedValue({ messageId: "wa-1" }), + sendWhatsApp, }, }; + return { + run: async () => await whatsappOutbound.sendPayload!(ctx), + sendMock: sendWhatsApp, + to: ctx.to, + }; } describe("whatsappOutbound sendPayload", () => { - it("text-only delegates to sendText", async () => { - const ctx = baseCtx({ text: "hello" }); - const result = await whatsappOutbound.sendPayload!(ctx); - - expect(ctx.deps.sendWhatsApp).toHaveBeenCalledTimes(1); - expect(ctx.deps.sendWhatsApp).toHaveBeenCalledWith( - "5511999999999@c.us", - "hello", - expect.any(Object), - ); - expect(result).toMatchObject({ channel: "whatsapp", messageId: "wa-1" }); - }); - - it("single media delegates to sendMedia", async () => { - const ctx = baseCtx({ text: "cap", mediaUrl: "https://example.com/a.jpg" }); - const result = await whatsappOutbound.sendPayload!(ctx); - - expect(ctx.deps.sendWhatsApp).toHaveBeenCalledTimes(1); - expect(ctx.deps.sendWhatsApp).toHaveBeenCalledWith( - "5511999999999@c.us", - "cap", - expect.objectContaining({ mediaUrl: "https://example.com/a.jpg" }), - ); - expect(result).toMatchObject({ channel: "whatsapp" }); - }); - - it("multi-media iterates URLs with caption on first", async () => { - const sendWhatsApp = vi - .fn() - .mockResolvedValueOnce({ messageId: "wa-1" }) - .mockResolvedValueOnce({ messageId: "wa-2" }); - const ctx = { - cfg: {}, - to: "5511999999999@c.us", - text: "", - payload: { - text: "caption", - mediaUrls: ["https://example.com/1.jpg", "https://example.com/2.jpg"], - } as ReplyPayload, - deps: { sendWhatsApp }, - }; - const result = await whatsappOutbound.sendPayload!(ctx); - - expect(sendWhatsApp).toHaveBeenCalledTimes(2); - expect(sendWhatsApp).toHaveBeenNthCalledWith( - 1, - "5511999999999@c.us", - "caption", - expect.objectContaining({ mediaUrl: "https://example.com/1.jpg" }), - ); - expect(sendWhatsApp).toHaveBeenNthCalledWith( - 2, - "5511999999999@c.us", - "", - expect.objectContaining({ mediaUrl: "https://example.com/2.jpg" }), - ); - expect(result).toMatchObject({ channel: "whatsapp", messageId: "wa-2" }); - }); - - it("empty payload returns no-op", async () => { - const ctx = baseCtx({}); - const result = await whatsappOutbound.sendPayload!(ctx); - - expect(ctx.deps.sendWhatsApp).not.toHaveBeenCalled(); - expect(result).toEqual({ channel: "whatsapp", messageId: "" }); - }); - - it("chunking splits long text", async () => { - const sendWhatsApp = vi - .fn() - .mockResolvedValueOnce({ messageId: "wa-c1" }) - .mockResolvedValueOnce({ messageId: "wa-c2" }); - const longText = "a".repeat(5000); - const ctx = { - cfg: {}, - to: "5511999999999@c.us", - text: "", - payload: { text: longText } as ReplyPayload, - deps: { sendWhatsApp }, - }; - const result = await whatsappOutbound.sendPayload!(ctx); - - expect(sendWhatsApp.mock.calls.length).toBeGreaterThanOrEqual(2); - for (const call of sendWhatsApp.mock.calls) { - expect((call[1] as string).length).toBeLessThanOrEqual(4000); - } - expect(result).toMatchObject({ channel: "whatsapp" }); + installSendPayloadContractSuite({ + channel: "whatsapp", + chunking: { mode: "split", longTextLength: 5000, maxChunkLength: 4000 }, + createHarness, }); }); diff --git a/src/cli/acp-cli.option-collisions.test.ts b/src/cli/acp-cli.option-collisions.test.ts index 131db6a67cb..068f415de79 100644 --- a/src/cli/acp-cli.option-collisions.test.ts +++ b/src/cli/acp-cli.option-collisions.test.ts @@ -1,9 +1,7 @@ -import fs from "node:fs/promises"; -import os from "node:os"; -import path from "node:path"; import { Command } from "commander"; import { beforeAll, beforeEach, describe, expect, it, vi } from "vitest"; import { runRegisteredCli } from "../test-utils/command-runner.js"; +import { withTempSecretFiles } from "../test-utils/secret-file-fixture.js"; const runAcpClientInteractive = vi.fn(async (_opts: unknown) => {}); const serveAcpGateway = vi.fn(async (_opts: unknown) => {}); @@ -30,27 +28,6 @@ vi.mock("../runtime.js", () => ({ describe("acp cli option collisions", () => { let registerAcpCli: typeof import("./acp-cli.js").registerAcpCli; - async function withSecretFiles( - secrets: { token?: string; password?: string }, - run: (files: { tokenFile?: string; passwordFile?: string }) => Promise, - ): Promise { - const dir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-acp-cli-")); - try { - const files: { tokenFile?: string; passwordFile?: string } = {}; - if (secrets.token !== undefined) { - files.tokenFile = path.join(dir, "token.txt"); - await fs.writeFile(files.tokenFile, secrets.token, "utf8"); - } - if (secrets.password !== undefined) { - files.passwordFile = path.join(dir, "password.txt"); - await fs.writeFile(files.passwordFile, secrets.password, "utf8"); - } - return await run(files); - } finally { - await fs.rm(dir, { recursive: true, force: true }); - } - } - function createAcpProgram() { const program = new Command(); registerAcpCli(program); @@ -93,15 +70,19 @@ describe("acp cli option collisions", () => { }); it("loads gateway token/password from files", async () => { - await withSecretFiles({ token: "tok_file\n", [passwordKey()]: "pw_file\n" }, async (files) => { - // pragma: allowlist secret - await parseAcp([ - "--token-file", - files.tokenFile ?? "", - "--password-file", - files.passwordFile ?? "", - ]); - }); + await withTempSecretFiles( + "openclaw-acp-cli-", + { token: "tok_file\n", [passwordKey()]: "pw_file\n" }, + async (files) => { + // pragma: allowlist secret + await parseAcp([ + "--token-file", + files.tokenFile ?? "", + "--password-file", + files.passwordFile ?? "", + ]); + }, + ); expect(serveAcpGateway).toHaveBeenCalledWith( expect.objectContaining({ @@ -111,21 +92,30 @@ describe("acp cli option collisions", () => { ); }); - it("rejects mixed secret flags and file flags", async () => { - await withSecretFiles({ token: "tok_file\n" }, async (files) => { - await parseAcp(["--token", "tok_inline", "--token-file", files.tokenFile ?? ""]); + it.each([ + { + name: "rejects mixed secret flags and file flags", + files: { token: "tok_file\n" }, + args: (tokenFile: string) => ["--token", "tok_inline", "--token-file", tokenFile], + expected: /Use either --token or --token-file/, + }, + { + name: "rejects mixed password flags and file flags", + files: { password: "pw_file\n" }, // pragma: allowlist secret + args: (_tokenFile: string, passwordFile: string) => [ + "--password", + "pw_inline", + "--password-file", + passwordFile, + ], + expected: /Use either --password or --password-file/, + }, + ])("$name", async ({ files, args, expected }) => { + await withTempSecretFiles("openclaw-acp-cli-", files, async ({ tokenFile, passwordFile }) => { + await parseAcp(args(tokenFile ?? "", passwordFile ?? "")); }); - expectCliError(/Use either --token or --token-file/); - }); - - it("rejects mixed password flags and file flags", async () => { - const passwordFileValue = "pw_file\n"; // pragma: allowlist secret - await withSecretFiles({ password: passwordFileValue }, async (files) => { - await parseAcp(["--password", "pw_inline", "--password-file", files.passwordFile ?? ""]); - }); - - expectCliError(/Use either --password or --password-file/); + expectCliError(expected); }); it("warns when inline secret flags are used", async () => { @@ -140,7 +130,7 @@ describe("acp cli option collisions", () => { }); it("trims token file path before reading", async () => { - await withSecretFiles({ token: "tok_file\n" }, async (files) => { + await withTempSecretFiles("openclaw-acp-cli-", { token: "tok_file\n" }, async (files) => { await parseAcp(["--token-file", ` ${files.tokenFile ?? ""} `]); }); diff --git a/src/cli/daemon-cli/register-service-commands.test.ts b/src/cli/daemon-cli/register-service-commands.test.ts index cec45d62769..e249b00c835 100644 --- a/src/cli/daemon-cli/register-service-commands.test.ts +++ b/src/cli/daemon-cli/register-service-commands.test.ts @@ -39,34 +39,37 @@ describe("addGatewayServiceCommands", () => { runDaemonUninstall.mockClear(); }); - it("forwards install option collisions from parent gateway command", async () => { + it.each([ + { + name: "forwards install option collisions from parent gateway command", + argv: ["install", "--force", "--port", "19000", "--token", "tok_test"], + assert: () => { + expect(runDaemonInstall).toHaveBeenCalledWith( + expect.objectContaining({ + force: true, + port: "19000", + token: "tok_test", + }), + ); + }, + }, + { + name: "forwards status auth collisions from parent gateway command", + argv: ["status", "--token", "tok_status", "--password", "pw_status"], + assert: () => { + expect(runDaemonStatus).toHaveBeenCalledWith( + expect.objectContaining({ + rpc: expect.objectContaining({ + token: "tok_status", + password: "pw_status", // pragma: allowlist secret + }), + }), + ); + }, + }, + ])("$name", async ({ argv, assert }) => { const gateway = createGatewayParentLikeCommand(); - await gateway.parseAsync(["install", "--force", "--port", "19000", "--token", "tok_test"], { - from: "user", - }); - - expect(runDaemonInstall).toHaveBeenCalledWith( - expect.objectContaining({ - force: true, - port: "19000", - token: "tok_test", - }), - ); - }); - - it("forwards status auth collisions from parent gateway command", async () => { - const gateway = createGatewayParentLikeCommand(); - await gateway.parseAsync(["status", "--token", "tok_status", "--password", "pw_status"], { - from: "user", - }); - - expect(runDaemonStatus).toHaveBeenCalledWith( - expect.objectContaining({ - rpc: expect.objectContaining({ - token: "tok_status", - password: "pw_status", // pragma: allowlist secret - }), - }), - ); + await gateway.parseAsync(argv, { from: "user" }); + assert(); }); }); diff --git a/src/cli/gateway-cli/register.option-collisions.test.ts b/src/cli/gateway-cli/register.option-collisions.test.ts index 1ef5ba2c238..665886c76eb 100644 --- a/src/cli/gateway-cli/register.option-collisions.test.ts +++ b/src/cli/gateway-cli/register.option-collisions.test.ts @@ -128,30 +128,34 @@ describe("gateway register option collisions", () => { gatewayStatusCommand.mockClear(); }); - it("forwards --token to gateway call when parent and child option names collide", async () => { - await sharedProgram.parseAsync(["gateway", "call", "health", "--token", "tok_call", "--json"], { - from: "user", - }); - - expect(callGatewayCli).toHaveBeenCalledWith( - "health", - expect.objectContaining({ - token: "tok_call", - }), - {}, - ); - }); - - it("forwards --token to gateway probe when parent and child option names collide", async () => { - await sharedProgram.parseAsync(["gateway", "probe", "--token", "tok_probe", "--json"], { - from: "user", - }); - - expect(gatewayStatusCommand).toHaveBeenCalledWith( - expect.objectContaining({ - token: "tok_probe", - }), - defaultRuntime, - ); + it.each([ + { + name: "forwards --token to gateway call when parent and child option names collide", + argv: ["gateway", "call", "health", "--token", "tok_call", "--json"], + assert: () => { + expect(callGatewayCli).toHaveBeenCalledWith( + "health", + expect.objectContaining({ + token: "tok_call", + }), + {}, + ); + }, + }, + { + name: "forwards --token to gateway probe when parent and child option names collide", + argv: ["gateway", "probe", "--token", "tok_probe", "--json"], + assert: () => { + expect(gatewayStatusCommand).toHaveBeenCalledWith( + expect.objectContaining({ + token: "tok_probe", + }), + defaultRuntime, + ); + }, + }, + ])("$name", async ({ argv, assert }) => { + await sharedProgram.parseAsync(argv, { from: "user" }); + assert(); }); }); diff --git a/src/cli/gateway-cli/run.option-collisions.test.ts b/src/cli/gateway-cli/run.option-collisions.test.ts index 3a1f8bf57c7..a896a7a3f76 100644 --- a/src/cli/gateway-cli/run.option-collisions.test.ts +++ b/src/cli/gateway-cli/run.option-collisions.test.ts @@ -1,8 +1,6 @@ -import fs from "node:fs/promises"; -import os from "node:os"; -import path from "node:path"; import { Command } from "commander"; import { beforeAll, beforeEach, describe, expect, it, vi } from "vitest"; +import { withTempSecretFiles } from "../../test-utils/secret-file-fixture.js"; import { createCliRuntimeCapture } from "../test-runtime-capture.js"; const startGatewayServer = vi.fn(async (_port: number, _opts?: unknown) => ({ @@ -195,16 +193,10 @@ describe("gateway run option collisions", () => { ); }); - it("accepts --auth none override", async () => { - await runGatewayCli(["gateway", "run", "--auth", "none", "--allow-unconfigured"]); + it.each(["none", "trusted-proxy"] as const)("accepts --auth %s override", async (mode) => { + await runGatewayCli(["gateway", "run", "--auth", mode, "--allow-unconfigured"]); - expectAuthOverrideMode("none"); - }); - - it("accepts --auth trusted-proxy override", async () => { - await runGatewayCli(["gateway", "run", "--auth", "trusted-proxy", "--allow-unconfigured"]); - - expectAuthOverrideMode("trusted-proxy"); + expectAuthOverrideMode(mode); }); it("prints all supported modes on invalid --auth value", async () => { @@ -244,36 +236,34 @@ describe("gateway run option collisions", () => { }); it("reads gateway password from --password-file", async () => { - const tempDir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-gateway-run-")); - try { - const passwordFile = path.join(tempDir, "gateway-password.txt"); - await fs.writeFile(passwordFile, "pw_from_file\n", "utf8"); + await withTempSecretFiles( + "openclaw-gateway-run-", + { password: "pw_from_file\n" }, + async ({ passwordFile }) => { + await runGatewayCli([ + "gateway", + "run", + "--auth", + "password", + "--password-file", + passwordFile ?? "", + "--allow-unconfigured", + ]); + }, + ); - await runGatewayCli([ - "gateway", - "run", - "--auth", - "password", - "--password-file", - passwordFile, - "--allow-unconfigured", - ]); - - expect(startGatewayServer).toHaveBeenCalledWith( - 18789, - expect.objectContaining({ - auth: expect.objectContaining({ - mode: "password", - password: "pw_from_file", // pragma: allowlist secret - }), + expect(startGatewayServer).toHaveBeenCalledWith( + 18789, + expect.objectContaining({ + auth: expect.objectContaining({ + mode: "password", + password: "pw_from_file", // pragma: allowlist secret }), - ); - expect(runtimeErrors).not.toContain( - "Warning: --password can be exposed via process listings. Prefer --password-file or OPENCLAW_GATEWAY_PASSWORD.", - ); - } finally { - await fs.rm(tempDir, { recursive: true, force: true }); - } + }), + ); + expect(runtimeErrors).not.toContain( + "Warning: --password can be exposed via process listings. Prefer --password-file or OPENCLAW_GATEWAY_PASSWORD.", + ); }); it("warns when gateway password is passed inline", async () => { @@ -293,26 +283,24 @@ describe("gateway run option collisions", () => { }); it("rejects using both --password and --password-file", async () => { - const tempDir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-gateway-run-")); - try { - const passwordFile = path.join(tempDir, "gateway-password.txt"); - await fs.writeFile(passwordFile, "pw_from_file\n", "utf8"); + await withTempSecretFiles( + "openclaw-gateway-run-", + { password: "pw_from_file\n" }, + async ({ passwordFile }) => { + await expect( + runGatewayCli([ + "gateway", + "run", + "--password", + "pw_inline", + "--password-file", + passwordFile ?? "", + "--allow-unconfigured", + ]), + ).rejects.toThrow("__exit__:1"); + }, + ); - await expect( - runGatewayCli([ - "gateway", - "run", - "--password", - "pw_inline", - "--password-file", - passwordFile, - "--allow-unconfigured", - ]), - ).rejects.toThrow("__exit__:1"); - - expect(runtimeErrors).toContain("Use either --password or --password-file."); - } finally { - await fs.rm(tempDir, { recursive: true, force: true }); - } + expect(runtimeErrors).toContain("Use either --password or --password-file."); }); }); diff --git a/src/cli/update-cli.option-collisions.test.ts b/src/cli/update-cli.option-collisions.test.ts index c0dd2d88404..6db4cfdd260 100644 --- a/src/cli/update-cli.option-collisions.test.ts +++ b/src/cli/update-cli.option-collisions.test.ts @@ -44,30 +44,36 @@ describe("update cli option collisions", () => { defaultRuntime.exit.mockClear(); }); - it("forwards parent-captured --json/--timeout to `update status`", async () => { - await runRegisteredCli({ - register: registerUpdateCli as (program: Command) => void, + it.each([ + { + name: "forwards parent-captured --json/--timeout to `update status`", argv: ["update", "status", "--json", "--timeout", "9"], - }); - - expect(updateStatusCommand).toHaveBeenCalledWith( - expect.objectContaining({ - json: true, - timeout: "9", - }), - ); - }); - - it("forwards parent-captured --timeout to `update wizard`", async () => { + assert: () => { + expect(updateStatusCommand).toHaveBeenCalledWith( + expect.objectContaining({ + json: true, + timeout: "9", + }), + ); + }, + }, + { + name: "forwards parent-captured --timeout to `update wizard`", + argv: ["update", "wizard", "--timeout", "13"], + assert: () => { + expect(updateWizardCommand).toHaveBeenCalledWith( + expect.objectContaining({ + timeout: "13", + }), + ); + }, + }, + ])("$name", async ({ argv, assert }) => { await runRegisteredCli({ register: registerUpdateCli as (program: Command) => void, - argv: ["update", "wizard", "--timeout", "13"], + argv, }); - expect(updateWizardCommand).toHaveBeenCalledWith( - expect.objectContaining({ - timeout: "13", - }), - ); + assert(); }); }); diff --git a/src/gateway/server-methods/sessions.ts b/src/gateway/server-methods/sessions.ts index bd8f6b57ac2..83bf3057278 100644 --- a/src/gateway/server-methods/sessions.ts +++ b/src/gateway/server-methods/sessions.ts @@ -128,6 +128,19 @@ function migrateAndPruneSessionStoreKey(params: { return { target, primaryKey, entry: params.store[primaryKey] }; } +function stripRuntimeModelState(entry?: SessionEntry): SessionEntry | undefined { + if (!entry) { + return entry; + } + return { + ...entry, + model: undefined, + modelProvider: undefined, + contextTokens: undefined, + systemPromptReport: undefined, + }; +} + function archiveSessionTranscriptsForSession(params: { sessionId: string | undefined; storePath: string; @@ -507,9 +520,10 @@ export const sessionsHandlers: GatewayRequestHandlers = { const next = await updateSessionStore(storePath, (store) => { const { primaryKey } = migrateAndPruneSessionStoreKey({ cfg, key, store }); const entry = store[primaryKey]; + const resetEntry = stripRuntimeModelState(entry); const parsed = parseAgentSessionKey(primaryKey); const sessionAgentId = normalizeAgentId(parsed?.agentId ?? resolveDefaultAgentId(cfg)); - const resolvedModel = resolveSessionModelRef(cfg, entry, sessionAgentId); + const resolvedModel = resolveSessionModelRef(cfg, resetEntry, sessionAgentId); oldSessionId = entry?.sessionId; oldSessionFile = entry?.sessionFile; const now = Date.now(); @@ -524,7 +538,7 @@ export const sessionsHandlers: GatewayRequestHandlers = { responseUsage: entry?.responseUsage, model: resolvedModel.model, modelProvider: resolvedModel.provider, - contextTokens: entry?.contextTokens, + contextTokens: resetEntry?.contextTokens, sendPolicy: entry?.sendPolicy, label: entry?.label, origin: snapshotSessionOrigin(entry), diff --git a/src/gateway/server.sessions.gateway-server-sessions-a.test.ts b/src/gateway/server.sessions.gateway-server-sessions-a.test.ts index f986d49c648..1decc4b9178 100644 --- a/src/gateway/server.sessions.gateway-server-sessions-a.test.ts +++ b/src/gateway/server.sessions.gateway-server-sessions-a.test.ts @@ -591,6 +591,43 @@ describe("gateway server sessions", () => { ws.close(); }); + test("sessions.reset recomputes model from defaults instead of stale runtime model", async () => { + await createSessionStoreDir(); + testState.agentConfig = { + model: { + primary: "openai/gpt-test-a", + }, + }; + + await writeSessionStore({ + entries: { + main: { + sessionId: "sess-stale-model", + updatedAt: Date.now(), + modelProvider: "qwencode", + model: "qwen3.5-plus-2026-02-15", + contextTokens: 123456, + }, + }, + }); + + const { ws } = await openClient(); + const reset = await rpcReq<{ + ok: true; + key: string; + entry: { sessionId: string; modelProvider?: string; model?: string; contextTokens?: number }; + }>(ws, "sessions.reset", { key: "main" }); + + expect(reset.ok).toBe(true); + expect(reset.payload?.key).toBe("agent:main:main"); + expect(reset.payload?.entry.sessionId).not.toBe("sess-stale-model"); + expect(reset.payload?.entry.modelProvider).toBe("openai"); + expect(reset.payload?.entry.model).toBe("gpt-test-a"); + expect(reset.payload?.entry.contextTokens).toBeUndefined(); + + ws.close(); + }); + test("sessions.preview resolves legacy mixed-case main alias with custom mainKey", async () => { const { dir, storePath } = await createSessionStoreDir(); testState.agentsConfig = { list: [{ id: "ops", default: true }] }; diff --git a/src/infra/heartbeat-reason.test.ts b/src/infra/heartbeat-reason.test.ts index 6c2fdc68f97..69d23e3219d 100644 --- a/src/infra/heartbeat-reason.test.ts +++ b/src/infra/heartbeat-reason.test.ts @@ -19,6 +19,7 @@ describe("heartbeat-reason", () => { expect(resolveHeartbeatReasonKind("manual")).toBe("manual"); expect(resolveHeartbeatReasonKind("exec-event")).toBe("exec-event"); expect(resolveHeartbeatReasonKind("wake")).toBe("wake"); + expect(resolveHeartbeatReasonKind("acp:spawn:stream")).toBe("wake"); expect(resolveHeartbeatReasonKind("cron:job-1")).toBe("cron"); expect(resolveHeartbeatReasonKind("hook:wake")).toBe("hook"); expect(resolveHeartbeatReasonKind(" hook:wake ")).toBe("hook"); @@ -35,6 +36,7 @@ describe("heartbeat-reason", () => { expect(isHeartbeatEventDrivenReason("exec-event")).toBe(true); expect(isHeartbeatEventDrivenReason("cron:job-1")).toBe(true); expect(isHeartbeatEventDrivenReason("wake")).toBe(true); + expect(isHeartbeatEventDrivenReason("acp:spawn:stream")).toBe(true); expect(isHeartbeatEventDrivenReason("hook:gmail:sync")).toBe(true); expect(isHeartbeatEventDrivenReason("interval")).toBe(false); expect(isHeartbeatEventDrivenReason("manual")).toBe(false); diff --git a/src/infra/heartbeat-reason.ts b/src/infra/heartbeat-reason.ts index 968b1e24062..447ca733e53 100644 --- a/src/infra/heartbeat-reason.ts +++ b/src/infra/heartbeat-reason.ts @@ -34,6 +34,9 @@ export function resolveHeartbeatReasonKind(reason?: string): HeartbeatReasonKind if (trimmed === "wake") { return "wake"; } + if (trimmed.startsWith("acp:spawn:")) { + return "wake"; + } if (trimmed.startsWith("cron:")) { return "cron"; } diff --git a/src/infra/heartbeat-runner.ts b/src/infra/heartbeat-runner.ts index c3c58d34c1e..344fd22d8fc 100644 --- a/src/infra/heartbeat-runner.ts +++ b/src/infra/heartbeat-runner.ts @@ -38,7 +38,11 @@ import type { AgentDefaultsConfig } from "../config/types.agent-defaults.js"; import { createSubsystemLogger } from "../logging/subsystem.js"; import { getQueueSize } from "../process/command-queue.js"; import { CommandLane } from "../process/lanes.js"; -import { normalizeAgentId, toAgentStoreSessionKey } from "../routing/session-key.js"; +import { + normalizeAgentId, + parseAgentSessionKey, + toAgentStoreSessionKey, +} from "../routing/session-key.js"; import { defaultRuntime, type RuntimeEnv } from "../runtime.js"; import { escapeRegExp } from "../utils.js"; import { formatErrorMessage, hasErrnoCode } from "./errors.js"; @@ -53,9 +57,11 @@ import { emitHeartbeatEvent, resolveIndicatorType } from "./heartbeat-events.js" import { resolveHeartbeatReasonKind } from "./heartbeat-reason.js"; import { resolveHeartbeatVisibility } from "./heartbeat-visibility.js"; import { + areHeartbeatsEnabled, type HeartbeatRunResult, type HeartbeatWakeHandler, requestHeartbeatNow, + setHeartbeatsEnabled, setHeartbeatWakeHandler, } from "./heartbeat-wake.js"; import type { OutboundSendDeps } from "./outbound/deliver.js"; @@ -75,11 +81,8 @@ export type HeartbeatDeps = OutboundSendDeps & }; const log = createSubsystemLogger("gateway/heartbeat"); -let heartbeatsEnabled = true; -export function setHeartbeatsEnabled(enabled: boolean) { - heartbeatsEnabled = enabled; -} +export { areHeartbeatsEnabled, setHeartbeatsEnabled }; type HeartbeatConfig = AgentDefaultsConfig["heartbeat"]; type HeartbeatAgent = { @@ -611,9 +614,14 @@ export async function runHeartbeatOnce(opts: { deps?: HeartbeatDeps; }): Promise { const cfg = opts.cfg ?? loadConfig(); - const agentId = normalizeAgentId(opts.agentId ?? resolveDefaultAgentId(cfg)); + const explicitAgentId = typeof opts.agentId === "string" ? opts.agentId.trim() : ""; + const forcedSessionAgentId = + explicitAgentId.length > 0 ? undefined : parseAgentSessionKey(opts.sessionKey)?.agentId; + const agentId = normalizeAgentId( + explicitAgentId || forcedSessionAgentId || resolveDefaultAgentId(cfg), + ); const heartbeat = opts.heartbeat ?? resolveHeartbeatConfig(cfg, agentId); - if (!heartbeatsEnabled) { + if (!areHeartbeatsEnabled()) { return { status: "skipped", reason: "disabled" }; } if (!isHeartbeatEnabledForAgent(cfg, agentId)) { @@ -1114,7 +1122,7 @@ export function startHeartbeatRunner(opts: { reason: "disabled", } satisfies HeartbeatRunResult; } - if (!heartbeatsEnabled) { + if (!areHeartbeatsEnabled()) { return { status: "skipped", reason: "disabled", diff --git a/src/infra/heartbeat-wake.ts b/src/infra/heartbeat-wake.ts index bccfdfe9829..3aaaca5ed96 100644 --- a/src/infra/heartbeat-wake.ts +++ b/src/infra/heartbeat-wake.ts @@ -15,6 +15,16 @@ export type HeartbeatWakeHandler = (opts: { sessionKey?: string; }) => Promise; +let heartbeatsEnabled = true; + +export function setHeartbeatsEnabled(enabled: boolean) { + heartbeatsEnabled = enabled; +} + +export function areHeartbeatsEnabled(): boolean { + return heartbeatsEnabled; +} + type WakeTimerKind = "normal" | "retry"; type PendingWakeReason = { reason: string; diff --git a/src/test-utils/secret-file-fixture.ts b/src/test-utils/secret-file-fixture.ts new file mode 100644 index 00000000000..8e780929f94 --- /dev/null +++ b/src/test-utils/secret-file-fixture.ts @@ -0,0 +1,30 @@ +import fs from "node:fs/promises"; +import os from "node:os"; +import path from "node:path"; + +export type SecretFiles = { + passwordFile?: string; + tokenFile?: string; +}; + +export async function withTempSecretFiles( + prefix: string, + secrets: { password?: string; token?: string }, + run: (files: SecretFiles) => Promise, +): Promise { + const dir = await fs.mkdtemp(path.join(os.tmpdir(), prefix)); + try { + const files: SecretFiles = {}; + if (secrets.token !== undefined) { + files.tokenFile = path.join(dir, "token.txt"); + await fs.writeFile(files.tokenFile, secrets.token, "utf8"); + } + if (secrets.password !== undefined) { + files.passwordFile = path.join(dir, "password.txt"); + await fs.writeFile(files.passwordFile, secrets.password, "utf8"); + } + return await run(files); + } finally { + await fs.rm(dir, { recursive: true, force: true }); + } +} diff --git a/src/test-utils/send-payload-contract.ts b/src/test-utils/send-payload-contract.ts new file mode 100644 index 00000000000..5e78e406a74 --- /dev/null +++ b/src/test-utils/send-payload-contract.ts @@ -0,0 +1,138 @@ +import { expect, it, type Mock } from "vitest"; + +type PayloadLike = { + mediaUrl?: string; + mediaUrls?: string[]; + text?: string; +}; + +type SendResultLike = { + messageId: string; + [key: string]: unknown; +}; + +type ChunkingMode = + | { + longTextLength: number; + maxChunkLength: number; + mode: "split"; + } + | { + longTextLength: number; + mode: "passthrough"; + }; + +export function installSendPayloadContractSuite(params: { + channel: string; + chunking: ChunkingMode; + createHarness: (params: { payload: PayloadLike; sendResults?: SendResultLike[] }) => { + run: () => Promise>; + sendMock: Mock; + to: string; + }; +}) { + it("text-only delegates to sendText", async () => { + const { run, sendMock, to } = params.createHarness({ + payload: { text: "hello" }, + }); + const result = await run(); + + expect(sendMock).toHaveBeenCalledTimes(1); + expect(sendMock).toHaveBeenCalledWith(to, "hello", expect.any(Object)); + expect(result).toMatchObject({ channel: params.channel }); + }); + + it("single media delegates to sendMedia", async () => { + const { run, sendMock, to } = params.createHarness({ + payload: { text: "cap", mediaUrl: "https://example.com/a.jpg" }, + }); + const result = await run(); + + expect(sendMock).toHaveBeenCalledTimes(1); + expect(sendMock).toHaveBeenCalledWith( + to, + "cap", + expect.objectContaining({ mediaUrl: "https://example.com/a.jpg" }), + ); + expect(result).toMatchObject({ channel: params.channel }); + }); + + it("multi-media iterates URLs with caption on first", async () => { + const { run, sendMock, to } = params.createHarness({ + payload: { + text: "caption", + mediaUrls: ["https://example.com/1.jpg", "https://example.com/2.jpg"], + }, + sendResults: [{ messageId: "m-1" }, { messageId: "m-2" }], + }); + const result = await run(); + + expect(sendMock).toHaveBeenCalledTimes(2); + expect(sendMock).toHaveBeenNthCalledWith( + 1, + to, + "caption", + expect.objectContaining({ mediaUrl: "https://example.com/1.jpg" }), + ); + expect(sendMock).toHaveBeenNthCalledWith( + 2, + to, + "", + expect.objectContaining({ mediaUrl: "https://example.com/2.jpg" }), + ); + expect(result).toMatchObject({ channel: params.channel, messageId: "m-2" }); + }); + + it("empty payload returns no-op", async () => { + const { run, sendMock } = params.createHarness({ payload: {} }); + const result = await run(); + + expect(sendMock).not.toHaveBeenCalled(); + expect(result).toEqual({ channel: params.channel, messageId: "" }); + }); + + if (params.chunking.mode === "passthrough") { + it("text exceeding chunk limit is sent as-is when chunker is null", async () => { + const text = "a".repeat(params.chunking.longTextLength); + const { run, sendMock, to } = params.createHarness({ payload: { text } }); + const result = await run(); + + expect(sendMock).toHaveBeenCalledTimes(1); + expect(sendMock).toHaveBeenCalledWith(to, text, expect.any(Object)); + expect(result).toMatchObject({ channel: params.channel }); + }); + return; + } + + const chunking = params.chunking; + + it("chunking splits long text", async () => { + const text = "a".repeat(chunking.longTextLength); + const { run, sendMock } = params.createHarness({ + payload: { text }, + sendResults: [{ messageId: "c-1" }, { messageId: "c-2" }], + }); + const result = await run(); + + expect(sendMock.mock.calls.length).toBeGreaterThanOrEqual(2); + for (const call of sendMock.mock.calls) { + expect((call[1] as string).length).toBeLessThanOrEqual(chunking.maxChunkLength); + } + expect(result).toMatchObject({ channel: params.channel }); + }); +} + +export function primeSendMock( + sendMock: Mock, + fallbackResult: Record, + sendResults: SendResultLike[] = [], +) { + sendMock.mockReset(); + if (sendResults.length === 0) { + sendMock.mockResolvedValue(fallbackResult); + return; + } + for (const result of sendResults) { + sendMock.mockResolvedValueOnce(result); + } +}