* fix: make cleanup "keep" persist subagent sessions indefinitely * feat: expose subagent session metadata in sessions list * fix: include status and timing in sessions_list tool * fix: hide injected timestamp prefixes in chat ui * feat: push session list updates over websocket * feat: expose child subagent sessions in subagents list * feat: add admin http endpoint to kill sessions * Emit session.message websocket events for transcript updates * Estimate session costs in sessions list * Add direct session history HTTP and SSE endpoints * Harden dashboard session events and history APIs * Add session lifecycle gateway methods * Add dashboard session API improvements * Add dashboard session model and parent linkage support * fix: tighten dashboard session API metadata * Fix dashboard session cost metadata * Persist accumulated session cost * fix: stop followup queue drain cfg crash * Fix dashboard session create and model metadata * fix: stop guessing session model costs * Gateway: cache OpenRouter pricing for configured models * Gateway: add timeout session status * Fix subagent spawn test config loading * Gateway: preserve operator scopes without device identity * Emit user message transcript events and deduplicate plugin warnings * feat: emit sessions.changed lifecycle event on subagent spawn Adds a session-lifecycle-events module (similar to transcript-events) that emits create events when subagents are spawned. The gateway server.impl.ts listens for these events and broadcasts sessions.changed with reason=create to SSE subscribers, so dashboards can pick up new subagent sessions without polling. * Gateway: allow persistent dashboard orchestrator sessions * fix: preserve operator scopes for token-authenticated backend clients Backend clients (like agent-dashboard) that authenticate with a valid gateway token but don't present a device identity were getting their scopes stripped. 
The scope-clearing logic ran before checking the device identity decision, so even when evaluateMissingDeviceIdentity returned 'allow' (because roleCanSkipDeviceIdentity passed for token-authed operators), scopes were already cleared. Fix: also check decision.kind before clearing scopes, so token-authenticated operators keep their requested scopes. * Gateway: allow operator-token session kills * Fix stale active subagent status after follow-up runs * Fix dashboard image attachments in sessions send * Fix completed session follow-up status updates * feat: stream session tool events to operator UIs * Add sessions.steer gateway coverage * Persist subagent timing in session store * Fix subagent session transcript event keys * Fix active subagent session status in gateway * bump session label max to 512 * Fix gateway send session reactivation * fix: publish terminal session lifecycle state * feat: change default session reset to effectively never - Change DEFAULT_RESET_MODE from "daily" to "idle" - Change DEFAULT_IDLE_MINUTES from 60 to 0 (0 = disabled/never) - Allow idleMinutes=0 through normalization (don't clamp to 1) - Treat idleMinutes=0 as "no idle expiry" in evaluateSessionFreshness - Default behavior: mode "idle" + idleMinutes 0 = sessions never auto-reset - Update test assertion for new default mode * fix: prep session management followups (#50101) (thanks @clay-datacurve) --------- Co-authored-by: Tyler Yust <TYTYYUST@YAHOO.COM>
711 lines
21 KiB
TypeScript
711 lines
21 KiB
TypeScript
import fs from "node:fs/promises";
|
|
import { tmpdir } from "node:os";
|
|
import path from "node:path";
|
|
import { beforeEach, describe, expect, it, vi } from "vitest";
|
|
import { loadSessionStore, saveSessionStore, type SessionEntry } from "../../config/sessions.js";
|
|
import type { FollowupRun } from "./queue.js";
|
|
import * as sessionRunAccounting from "./session-run-accounting.js";
|
|
import { createMockFollowupRun, createMockTypingController } from "./test-helpers.js";
|
|
|
|
// Stand-in for `runEmbeddedPiAgent`: the `../../agents/pi-embedded.js` module mock in this file forwards its argument here.
const runEmbeddedPiAgentMock = vi.fn();
|
|
// Stand-in for `routeReply`: the `./route-reply.js` module mock in this file forwards all arguments here.
const routeReplyMock = vi.fn();
|
|
// Stand-in for `isRoutableChannel`: the `./route-reply.js` module mock in this file forwards all arguments here.
const isRoutableChannelMock = vi.fn();
|
|
|
|
// Replace the model-fallback module with the shared test-utils mock module.
vi.mock("../../agents/model-fallback.js", async () => {
  const replacement = await import("../../test-utils/model-fallback.mock.js");
  return replacement;
});
|
|
|
|
// Replace the embedded agent entry point with a thin wrapper around the file-level mock.
vi.mock("../../agents/pi-embedded.js", () => {
  // The wrapper resolves `runEmbeddedPiAgentMock` when it is called, not when the factory runs.
  const runEmbeddedPiAgent = (params: unknown) => runEmbeddedPiAgentMock(params);
  return { runEmbeddedPiAgent };
});
|
|
|
|
// Keep the real route-reply module, overriding only `isRoutableChannel` and `routeReply`.
vi.mock("./route-reply.js", async (importOriginal) => {
  const realModule = await importOriginal<typeof import("./route-reply.js")>();

  const isRoutableChannel = (...args: unknown[]) => isRoutableChannelMock(...args);
  const routeReply = (...args: unknown[]) => routeReplyMock(...args);

  // Spread first so the two overrides win over the real exports.
  return { ...realModule, isRoutableChannel, routeReply };
});
|
|
|
|
import { createFollowupRunner } from "./followup-runner.js";
|
|
|
|
// Lower-case channel names that the `isRoutableChannel` mock reports as routable.
const ROUTABLE_TEST_CHANNELS = new Set<string>(
  ["telegram", "slack", "discord", "signal", "imessage", "whatsapp", "feishu"],
);
|
|
|
|
/**
 * Restores every file-level mock to a known baseline before each test:
 * - `runEmbeddedPiAgentMock` is reset so call history and queued one-shot
 *   results from a previous test cannot leak into the next one (several tests
 *   read `mock.calls.at(-1)` or assert an exact call count);
 * - `routeReplyMock` resolves `{ ok: true }` by default;
 * - `isRoutableChannelMock` accepts a channel when its trimmed, lower-cased
 *   name is in `ROUTABLE_TEST_CHANNELS`.
 */
beforeEach(() => {
  runEmbeddedPiAgentMock.mockReset();

  routeReplyMock.mockReset();
  routeReplyMock.mockResolvedValue({ ok: true });

  isRoutableChannelMock.mockReset();
  isRoutableChannelMock.mockImplementation((ch: string | undefined) =>
    Boolean(ch?.trim() && ROUTABLE_TEST_CHANNELS.has(ch.trim().toLowerCase())),
  );
});
|
|
|
|
/** Builds a mock followup run whose `run.messageProvider` is the given provider ("whatsapp" by default). */
const baseQueuedRun = (messageProvider = "whatsapp"): FollowupRun => {
  const runOverrides = { messageProvider };
  return createMockFollowupRun({ run: runOverrides });
};
|
|
|
|
/**
 * Builds a mock followup run from optional overrides.
 *
 * @param overrides Top-level followup fields plus a partial `run` object; all are
 *   passed straight to `createMockFollowupRun`.
 * @returns The followup run produced by `createMockFollowupRun`.
 */
function createQueuedRun(
  overrides: Partial<Omit<FollowupRun, "run">> & { run?: Partial<FollowupRun["run"]> } = {},
): FollowupRun {
  const queuedRun = createMockFollowupRun(overrides);
  return queuedRun;
}
|
|
|
|
/**
 * Queues a one-shot embedded-agent implementation that reports a completed
 * compaction "end" event through `onAgentEvent` (when provided) and then
 * resolves with `params.result`.
 *
 * @param params.willRetry Value forwarded as `willRetry` in the emitted event data.
 * @param params.result Value the mocked agent run resolves with.
 */
function mockCompactionRun(params: {
  willRetry: boolean;
  result: {
    payloads: Array<{ text: string }>;
    meta: Record<string, unknown>;
  };
}) {
  type CompactionRunArgs = {
    onAgentEvent?: (evt: { stream: string; data: Record<string, unknown> }) => void;
  };

  const emitCompactionEndThenResolve = async (args: CompactionRunArgs) => {
    const compactionEnd = {
      stream: "compaction",
      data: { phase: "end", willRetry: params.willRetry, completed: true },
    };
    args.onAgentEvent?.(compactionEnd);
    return params.result;
  };

  runEmbeddedPiAgentMock.mockImplementationOnce(emitCompactionEndThenResolve);
}
|
|
|
|
/** Creates a vitest spy whose implementation is an async no-op, for use as an `onBlockReply` callback. */
function createAsyncReplySpy() {
  const replySpy = vi.fn(async () => {});
  return replySpy;
}
|
|
|
|
describe("createFollowupRunner compaction", () => {
  /**
   * Shared arrangement for the compaction cases: a `sessions.json` path inside a
   * fresh temp directory, an in-memory store holding a single "main" entry, a
   * reply spy, and a runner wired to all of them.
   */
  async function arrangeCompactionCase(tmpPrefix: string) {
    const tmpRoot = await fs.mkdtemp(path.join(tmpdir(), tmpPrefix));
    const storePath = path.join(tmpRoot, "sessions.json");
    const sessionEntry: SessionEntry = { sessionId: "session", updatedAt: Date.now() };
    const sessionStore: Record<string, SessionEntry> = { main: sessionEntry };
    const onBlockReply = vi.fn(async () => {});

    const runner = createFollowupRunner({
      opts: { onBlockReply },
      typing: createMockTypingController(),
      typingMode: "instant",
      sessionEntry,
      sessionStore,
      sessionKey: "main",
      storePath,
      defaultModel: "anthropic/claude-opus-4-5",
    });

    return { sessionStore, onBlockReply, runner };
  }

  /** Queued run with verbose output switched on, as used by every case in this suite. */
  const verboseQueuedRun = (): FollowupRun => createQueuedRun({ run: { verboseLevel: "on" } });

  /** Text of the first payload passed to the reply spy, or undefined when there was no call. */
  const firstReplyText = (calls: unknown): string | undefined =>
    (calls as Array<Array<{ text?: string }>>)[0]?.[0]?.text;

  it("adds verbose auto-compaction notice and tracks count", async () => {
    mockCompactionRun({
      willRetry: true,
      result: { payloads: [{ text: "final" }], meta: {} },
    });
    const { sessionStore, onBlockReply, runner } = await arrangeCompactionCase(
      "openclaw-compaction-",
    );

    await runner(verboseQueuedRun());

    expect(onBlockReply).toHaveBeenCalled();
    expect(firstReplyText(onBlockReply.mock.calls)).toContain("Auto-compaction complete");
    expect(sessionStore.main.compactionCount).toBe(1);
  });

  it("tracks auto-compaction from embedded result metadata even when no compaction event is emitted", async () => {
    // No agent event is emitted here; the count is only present in the result metadata.
    runEmbeddedPiAgentMock.mockResolvedValueOnce({
      payloads: [{ text: "final" }],
      meta: {
        agentMeta: {
          compactionCount: 2,
          lastCallUsage: { input: 10_000, output: 3_000, total: 13_000 },
        },
      },
    });
    const { sessionStore, onBlockReply, runner } = await arrangeCompactionCase(
      "openclaw-compaction-meta-",
    );

    await runner(verboseQueuedRun());

    expect(onBlockReply).toHaveBeenCalled();
    expect(firstReplyText(onBlockReply.mock.calls)).toContain("Auto-compaction complete");
    expect(sessionStore.main.compactionCount).toBe(2);
  });

  it("does not count failed compaction end events in followup runs", async () => {
    const { sessionStore, onBlockReply, runner } = await arrangeCompactionCase(
      "openclaw-compaction-failed-",
    );
    const queued = verboseQueuedRun();

    runEmbeddedPiAgentMock.mockImplementationOnce(async (args) => {
      // Compaction "end" event flagged as not completed.
      args.onAgentEvent?.({
        stream: "compaction",
        data: { phase: "end", willRetry: false, completed: false },
      });
      return {
        payloads: [{ text: "final" }],
        meta: {
          agentMeta: {
            compactionCount: 0,
            lastCallUsage: { input: 10_000, output: 3_000, total: 13_000 },
          },
        },
      };
    });

    await runner(queued);

    // Only the final payload is delivered, and no count is recorded.
    expect(onBlockReply).toHaveBeenCalledTimes(1);
    expect(firstReplyText(onBlockReply.mock.calls)).toBe("final");
    expect(sessionStore.main.compactionCount).toBeUndefined();
  });
});
|
|
|
|
describe("createFollowupRunner bootstrap warning dedupe", () => {
  /** The subset of embedded-run parameters this suite inspects. */
  type ForwardedBootstrapParams = {
    allowGatewaySubagentBinding?: boolean;
    bootstrapPromptWarningSignaturesSeen?: string[];
    bootstrapPromptWarningSignature?: string;
  };

  it("passes stored warning signature history to embedded followup runs", async () => {
    runEmbeddedPiAgentMock.mockResolvedValueOnce({ payloads: [], meta: {} });

    // Session entry whose stored report already carries bootstrap warning history.
    const sessionEntry: SessionEntry = {
      sessionId: "session",
      updatedAt: Date.now(),
      systemPromptReport: {
        source: "run",
        generatedAt: Date.now(),
        systemPrompt: { chars: 1, projectContextChars: 0, nonProjectContextChars: 1 },
        injectedWorkspaceFiles: [],
        skills: { promptChars: 0, entries: [] },
        tools: { listChars: 0, schemaChars: 0, entries: [] },
        bootstrapTruncation: {
          warningMode: "once",
          warningShown: true,
          promptWarningSignature: "sig-b",
          warningSignaturesSeen: ["sig-a", "sig-b"],
          truncatedFiles: 1,
          nearLimitFiles: 0,
          totalNearLimit: false,
        },
      },
    };
    const sessionStore: Record<string, SessionEntry> = { main: sessionEntry };

    const onBlockReply = vi.fn(async () => {});
    const runner = createFollowupRunner({
      opts: { onBlockReply },
      typing: createMockTypingController(),
      typingMode: "instant",
      sessionEntry,
      sessionStore,
      sessionKey: "main",
      defaultModel: "anthropic/claude-opus-4-5",
    });

    await runner(baseQueuedRun());

    // Inspect the parameters of the most recent embedded-agent invocation.
    const lastInvocation = runEmbeddedPiAgentMock.mock.calls.at(-1);
    const call = lastInvocation?.[0] as ForwardedBootstrapParams | undefined;

    expect(call?.allowGatewaySubagentBinding).toBe(true);
    expect(call?.bootstrapPromptWarningSignaturesSeen).toEqual(["sig-a", "sig-b"]);
    expect(call?.bootstrapPromptWarningSignature).toBe("sig-b");
  });
});
|
|
|
|
describe("createFollowupRunner messaging tool dedupe", () => {
  /** Optional session wiring forwarded to `createFollowupRunner` by the helpers in this suite. */
  type RunnerOverrides = Partial<{
    sessionEntry: SessionEntry;
    sessionStore: Record<string, SessionEntry>;
    sessionKey: string;
    storePath: string;
  }>;

  /**
   * Creates a followup runner with the suite's fixed typing/model settings.
   *
   * @param onBlockReply Callback handed to the runner as `opts.onBlockReply`.
   * @param overrides Optional session entry/store/key/store path to wire in.
   */
  function createMessagingDedupeRunner(
    onBlockReply: (payload: unknown) => Promise<void>,
    overrides: RunnerOverrides = {},
  ) {
    return createFollowupRunner({
      opts: { onBlockReply },
      typing: createMockTypingController(),
      typingMode: "instant",
      defaultModel: "anthropic/claude-opus-4-5",
      sessionEntry: overrides.sessionEntry,
      sessionStore: overrides.sessionStore,
      sessionKey: overrides.sessionKey,
      storePath: overrides.storePath,
    });
  }

  /**
   * Queues one embedded-agent result, runs a single followup, and returns the
   * reply spy for assertions.
   *
   * @param params.agentResult Fields merged over `{ meta: {} }` to form the mocked agent result.
   * @param params.queued Followup run to execute; defaults to `baseQueuedRun()`.
   * @param params.runnerOverrides Optional session wiring for the runner.
   */
  async function runMessagingCase(params: {
    agentResult: Record<string, unknown>;
    queued?: FollowupRun;
    runnerOverrides?: RunnerOverrides;
  }) {
    const onBlockReply = createAsyncReplySpy();
    runEmbeddedPiAgentMock.mockResolvedValueOnce({
      meta: {},
      ...params.agentResult,
    });
    const runner = createMessagingDedupeRunner(onBlockReply, params.runnerOverrides);
    await runner(params.queued ?? baseQueuedRun());
    return { onBlockReply };
  }

  /** Agent result whose reply text differs from the text recorded as sent by a messaging tool. */
  function makeTextReplyDedupeResult(overrides?: Record<string, unknown>) {
    return {
      payloads: [{ text: "hello world!" }],
      messagingToolSentTexts: ["different message"],
      ...overrides,
    };
  }

  it("drops payloads already sent via messaging tool", async () => {
    const { onBlockReply } = await runMessagingCase({
      agentResult: {
        payloads: [{ text: "hello world!" }],
        messagingToolSentTexts: ["hello world!"],
      },
    });

    expect(onBlockReply).not.toHaveBeenCalled();
  });

  it("delivers payloads when not duplicates", async () => {
    const { onBlockReply } = await runMessagingCase({
      agentResult: makeTextReplyDedupeResult(),
    });

    expect(onBlockReply).toHaveBeenCalledTimes(1);
  });

  it("suppresses replies when a messaging tool sent via the same provider + target", async () => {
    const { onBlockReply } = await runMessagingCase({
      agentResult: {
        ...makeTextReplyDedupeResult(),
        messagingToolSentTargets: [{ tool: "slack", provider: "slack", to: "channel:C1" }],
      },
      queued: baseQueuedRun("slack"),
    });

    expect(onBlockReply).not.toHaveBeenCalled();
  });

  it("suppresses replies when provider is synthetic but originating channel matches", async () => {
    // The queued provider is "heartbeat"; only the originating channel/target match the tool target.
    const { onBlockReply } = await runMessagingCase({
      agentResult: {
        ...makeTextReplyDedupeResult(),
        messagingToolSentTargets: [{ tool: "telegram", provider: "telegram", to: "268300329" }],
      },
      queued: {
        ...baseQueuedRun("heartbeat"),
        originatingChannel: "telegram",
        originatingTo: "268300329",
      } as FollowupRun,
    });

    expect(onBlockReply).not.toHaveBeenCalled();
  });

  it("does not suppress replies for same target when account differs", async () => {
    // Tool target account is "work"; the originating account is "personal".
    const { onBlockReply } = await runMessagingCase({
      agentResult: {
        ...makeTextReplyDedupeResult(),
        messagingToolSentTargets: [
          { tool: "telegram", provider: "telegram", to: "268300329", accountId: "work" },
        ],
      },
      queued: {
        ...baseQueuedRun("heartbeat"),
        originatingChannel: "telegram",
        originatingTo: "268300329",
        originatingAccountId: "personal",
      } as FollowupRun,
    });

    // The reply goes through routeReply for the originating account rather than onBlockReply.
    expect(routeReplyMock).toHaveBeenCalledWith(
      expect.objectContaining({
        channel: "telegram",
        to: "268300329",
        accountId: "personal",
      }),
    );
    expect(onBlockReply).not.toHaveBeenCalled();
  });

  it("drops media URL from payload when messaging tool already sent it", async () => {
    const { onBlockReply } = await runMessagingCase({
      agentResult: {
        payloads: [{ mediaUrl: "/tmp/img.png" }],
        messagingToolSentMediaUrls: ["/tmp/img.png"],
      },
    });

    // Media stripped → payload becomes non-renderable → not delivered.
    expect(onBlockReply).not.toHaveBeenCalled();
  });

  it("delivers media payload when not a duplicate", async () => {
    const { onBlockReply } = await runMessagingCase({
      agentResult: {
        payloads: [{ mediaUrl: "/tmp/img.png" }],
        messagingToolSentMediaUrls: ["/tmp/other.png"],
      },
    });

    expect(onBlockReply).toHaveBeenCalledTimes(1);
  });

  it("persists usage even when replies are suppressed", async () => {
    const storePath = path.join(
      await fs.mkdtemp(path.join(tmpdir(), "openclaw-followup-usage-")),
      "sessions.json",
    );
    const sessionKey = "main";
    const sessionEntry: SessionEntry = { sessionId: "session", updatedAt: Date.now() };
    const sessionStore: Record<string, SessionEntry> = { [sessionKey]: sessionEntry };
    await saveSessionStore(storePath, sessionStore);

    const { onBlockReply } = await runMessagingCase({
      agentResult: {
        ...makeTextReplyDedupeResult(),
        messagingToolSentTargets: [{ tool: "slack", provider: "slack", to: "channel:C1" }],
        meta: {
          agentMeta: {
            usage: { input: 1_000, output: 50 },
            lastCallUsage: { input: 400, output: 20 },
            model: "claude-opus-4-5",
            provider: "anthropic",
          },
        },
      },
      runnerOverrides: {
        sessionEntry,
        sessionStore,
        sessionKey,
        storePath,
      },
      queued: baseQueuedRun("slack"),
    });

    expect(onBlockReply).not.toHaveBeenCalled();
    // Re-read from disk, bypassing the cache, to verify what was actually persisted.
    const store = loadSessionStore(storePath, { skipCache: true });
    // totalTokens should reflect the last call usage snapshot, not the accumulated input.
    expect(store[sessionKey]?.totalTokens).toBe(400);
    expect(store[sessionKey]?.model).toBe("claude-opus-4-5");
    // Accumulated usage is still stored for usage/cost tracking.
    expect(store[sessionKey]?.inputTokens).toBe(1_000);
    expect(store[sessionKey]?.outputTokens).toBe(50);
  });

  it("passes queued config into usage persistence during drained followups", async () => {
    const storePath = path.join(
      await fs.mkdtemp(path.join(tmpdir(), "openclaw-followup-usage-cfg-")),
      "sessions.json",
    );
    const sessionKey = "main";
    const sessionEntry: SessionEntry = { sessionId: "session", updatedAt: Date.now() };
    const sessionStore: Record<string, SessionEntry> = { [sessionKey]: sessionEntry };
    await saveSessionStore(storePath, sessionStore);

    const cfg = {
      messages: {
        responsePrefix: "agent",
      },
    };
    const persistSpy = vi.spyOn(sessionRunAccounting, "persistRunSessionUsage");
    try {
      runEmbeddedPiAgentMock.mockResolvedValueOnce({
        payloads: [{ text: "hello world!" }],
        meta: {
          agentMeta: {
            usage: { input: 10, output: 5 },
            lastCallUsage: { input: 6, output: 3 },
            model: "claude-opus-4-5",
          },
        },
      });

      const runner = createFollowupRunner({
        opts: { onBlockReply: createAsyncReplySpy() },
        typing: createMockTypingController(),
        typingMode: "instant",
        defaultModel: "anthropic/claude-opus-4-5",
        sessionEntry,
        sessionStore,
        sessionKey,
        storePath,
      });

      await expect(
        runner(
          createQueuedRun({
            run: {
              config: cfg,
            },
          }),
        ),
      ).resolves.toBeUndefined();

      expect(persistSpy).toHaveBeenCalledWith(
        expect.objectContaining({
          storePath,
          sessionKey,
          cfg,
        }),
      );
    } finally {
      // Restore even when an assertion above throws, so the spy never outlives this test.
      persistSpy.mockRestore();
    }
  });

  it("does not fall back to dispatcher when cross-channel origin routing fails", async () => {
    routeReplyMock.mockResolvedValueOnce({
      ok: false,
      error: "forced route failure",
    });
    // Queued provider ("webchat") differs from the originating channel ("discord").
    const { onBlockReply } = await runMessagingCase({
      agentResult: { payloads: [{ text: "hello world!" }] },
      queued: {
        ...baseQueuedRun("webchat"),
        originatingChannel: "discord",
        originatingTo: "channel:C1",
      } as FollowupRun,
    });

    expect(routeReplyMock).toHaveBeenCalled();
    expect(onBlockReply).not.toHaveBeenCalled();
  });

  it("falls back to dispatcher when same-channel origin routing fails", async () => {
    routeReplyMock.mockResolvedValueOnce({
      ok: false,
      error: "outbound adapter unavailable",
    });
    // Provider and originating channel name the same channel with different case and padding.
    const { onBlockReply } = await runMessagingCase({
      agentResult: { payloads: [{ text: "hello world!" }] },
      queued: {
        ...baseQueuedRun(" Feishu "),
        originatingChannel: "FEISHU",
        originatingTo: "ou_abc123",
      } as FollowupRun,
    });

    expect(routeReplyMock).toHaveBeenCalled();
    expect(onBlockReply).toHaveBeenCalledTimes(1);
    expect(onBlockReply).toHaveBeenCalledWith(expect.objectContaining({ text: "hello world!" }));
  });

  it("routes followups with originating account/thread metadata", async () => {
    const { onBlockReply } = await runMessagingCase({
      agentResult: { payloads: [{ text: "hello world!" }] },
      queued: {
        ...baseQueuedRun("webchat"),
        originatingChannel: "discord",
        originatingTo: "channel:C1",
        originatingAccountId: "work",
        originatingThreadId: "1739142736.000100",
      } as FollowupRun,
    });

    expect(routeReplyMock).toHaveBeenCalledWith(
      expect.objectContaining({
        channel: "discord",
        to: "channel:C1",
        accountId: "work",
        threadId: "1739142736.000100",
      }),
    );
    expect(onBlockReply).not.toHaveBeenCalled();
  });
});
|
|
|
|
describe("createFollowupRunner typing cleanup", () => {
  type TypingController = ReturnType<typeof createMockTypingController>;

  /** Creates a runner bound to the given typing controller and reply callback, then runs the base followup. */
  async function runBaseFollowup(
    typing: TypingController,
    onBlockReply: ReturnType<typeof createAsyncReplySpy>,
  ) {
    const runner = createFollowupRunner({
      opts: { onBlockReply },
      typing,
      typingMode: "instant",
      defaultModel: "anthropic/claude-opus-4-5",
    });
    await runner(baseQueuedRun());
  }

  /**
   * Runs one followup whose mocked agent result is `{ meta: {}, ...agentResult }`
   * and returns the typing controller that was used.
   */
  async function runTypingCase(agentResult: Record<string, unknown>) {
    const typing = createMockTypingController();
    const resolvedResult = { meta: {}, ...agentResult };
    runEmbeddedPiAgentMock.mockResolvedValueOnce(resolvedResult);

    await runBaseFollowup(typing, createAsyncReplySpy());
    return typing;
  }

  /** Asserts that both typing cleanup hooks were invoked. */
  function expectTypingCleanup(typing: ReturnType<typeof createMockTypingController>) {
    const { markRunComplete, markDispatchIdle } = typing;
    expect(markRunComplete).toHaveBeenCalled();
    expect(markDispatchIdle).toHaveBeenCalled();
  }

  it("calls both markRunComplete and markDispatchIdle on NO_REPLY", async () => {
    const agentResult = { payloads: [{ text: "NO_REPLY" }] };
    expectTypingCleanup(await runTypingCase(agentResult));
  });

  it("calls both markRunComplete and markDispatchIdle on empty payloads", async () => {
    const agentResult = { payloads: [] };
    expectTypingCleanup(await runTypingCase(agentResult));
  });

  it("calls both markRunComplete and markDispatchIdle on agent error", async () => {
    const typing = createMockTypingController();
    // The embedded agent rejects; the runner call itself is still awaited without a catch.
    runEmbeddedPiAgentMock.mockRejectedValueOnce(new Error("agent exploded"));

    await runBaseFollowup(
      typing,
      vi.fn(async () => {}),
    );

    expectTypingCleanup(typing);
  });

  it("calls both markRunComplete and markDispatchIdle on successful delivery", async () => {
    const typing = createMockTypingController();
    const onBlockReply = vi.fn(async () => {});
    runEmbeddedPiAgentMock.mockResolvedValueOnce({
      payloads: [{ text: "hello world!" }],
      meta: {},
    });

    await runBaseFollowup(typing, onBlockReply);

    expect(onBlockReply).toHaveBeenCalled();
    expectTypingCleanup(typing);
  });
});
|
|
|
|
describe("createFollowupRunner agentDir forwarding", () => {
  it("passes queued run agentDir to runEmbeddedPiAgent", async () => {
    // Start from an empty call history so the exact call count below is meaningful.
    runEmbeddedPiAgentMock.mockClear();

    const onBlockReply = vi.fn(async () => {});
    const agentOutcome = {
      payloads: [{ text: "hello world!" }],
      messagingToolSentTexts: ["different message"],
      meta: {},
    };
    runEmbeddedPiAgentMock.mockResolvedValueOnce(agentOutcome);

    const runner = createFollowupRunner({
      opts: { onBlockReply },
      typing: createMockTypingController(),
      typingMode: "instant",
      defaultModel: "anthropic/claude-opus-4-5",
    });

    const agentDir = path.join("/tmp", "agent-dir");
    const queued = createQueuedRun();
    // Copy the default queued run and set only `run.agentDir`.
    const queuedWithAgentDir: FollowupRun = {
      ...queued,
      run: { ...queued.run, agentDir },
    };

    await runner(queuedWithAgentDir);

    expect(runEmbeddedPiAgentMock).toHaveBeenCalledTimes(1);
    const lastInvocation = runEmbeddedPiAgentMock.mock.calls.at(-1);
    const call = lastInvocation?.[0] as { agentDir?: string };
    expect(call?.agentDir).toBe(agentDir);
  });
});
|