openclaw/src/auto-reply/reply/followup-runner.test.ts
codexGW 6fb082e131
fix(typing): call markDispatchIdle in followup runner to prevent stuck indicator (#26881)
The followup runner (used for queued messages, inter-agent sends,
heartbeat followups, etc.) only called typing.markRunComplete() in
its finally block.  The typing controller requires BOTH markRunComplete
AND markDispatchIdle to trigger cleanup — but markDispatchIdle was
only wired through the buffered dispatcher path, which followup turns
bypass entirely.

This caused the typing indicator to persist indefinitely on channels
like Telegram when the agent replied with NO_REPLY or produced empty
payloads, because the keepalive loop was never stopped.

Adds markDispatchIdle() alongside markRunComplete() in the followup
runner's finally block, and four test cases covering NO_REPLY, empty
payloads, agent errors, and successful delivery.

Complements #26295 which addressed the channel-level callback layer.

Fixes #26595

Co-authored-by: Samantha <samantha@Samanthas-Mac-mini.local>
2026-02-26 00:53:38 +00:00

542 lines
16 KiB
TypeScript

import fs from "node:fs/promises";
import { tmpdir } from "node:os";
import path from "node:path";
import { beforeEach, describe, expect, it, vi } from "vitest";
import { loadSessionStore, saveSessionStore, type SessionEntry } from "../../config/sessions.js";
import type { FollowupRun } from "./queue.js";
import { createMockTypingController } from "./test-helpers.js";
// Module-scope spies standing in for the embedded agent and the reply-routing
// helpers, so every test in this file can program and inspect them.
const runEmbeddedPiAgentMock = vi.fn();
const routeReplyMock = vi.fn();
const isRoutableChannelMock = vi.fn();
// Swap the model-fallback module for the shared test double.
vi.mock(
"../../agents/model-fallback.js",
async () => await import("../../test-utils/model-fallback.mock.js"),
);
// Replace the embedded agent with a thin wrapper that forwards to the spy.
// NOTE(review): Vitest hoists vi.mock() above the imports, so the factory
// presumably must not read the spy eagerly; the arrow wrapper defers the lookup
// until the mocked function is actually called — confirm against the Vitest docs.
vi.mock("../../agents/pi-embedded.js", () => ({
runEmbeddedPiAgent: (params: unknown) => runEmbeddedPiAgentMock(params),
}));
// Keep every real route-reply export, overriding only the two functions the
// tests need to control.
vi.mock("./route-reply.js", async (importOriginal) => {
const actual = await importOriginal<typeof import("./route-reply.js")>();
return {
...actual,
isRoutableChannel: (...args: unknown[]) => isRoutableChannelMock(...args),
routeReply: (...args: unknown[]) => routeReplyMock(...args),
};
});
import { createFollowupRunner } from "./followup-runner.js";
/**
 * Lower-case channel ids that the stubbed `isRoutableChannel` reports as
 * routable; any other value is treated as non-routable by the tests.
 */
const ROUTABLE_TEST_CHANNELS = new Set<string>([
  "telegram", "slack", "discord", "signal",
  "imessage", "whatsapp", "feishu",
]);
/**
 * Restore every shared spy to a known baseline before each test:
 * - the embedded-agent spy is wiped so call counts and unconsumed one-shot
 *   results cannot leak from one test into the next;
 * - `routeReply` succeeds by default;
 * - `isRoutableChannel` accepts exactly the ids in `ROUTABLE_TEST_CHANNELS`,
 *   ignoring surrounding whitespace and letter case.
 */
beforeEach(() => {
  runEmbeddedPiAgentMock.mockReset();

  routeReplyMock.mockReset();
  routeReplyMock.mockResolvedValue({ ok: true });

  isRoutableChannelMock.mockReset();
  isRoutableChannelMock.mockImplementation((ch: string | undefined) =>
    Boolean(ch?.trim() && ROUTABLE_TEST_CHANNELS.has(ch.trim().toLowerCase())),
  );
});
/**
 * Produce a minimal queued followup fixture.
 *
 * @param messageProvider - value stored in `run.messageProvider`; defaults to "whatsapp".
 * @returns a fresh object on every call, asserted to `FollowupRun` because only
 *   the fields these tests touch are populated.
 */
function baseQueuedRun(messageProvider = "whatsapp"): FollowupRun {
  return {
    prompt: "hello",
    summaryLine: "hello",
    enqueuedAt: Date.now(),
    originatingTo: "channel:C1",
    run: {
      sessionId: "session",
      sessionKey: "main",
      messageProvider,
      agentAccountId: "primary",
      sessionFile: "/tmp/session.jsonl",
      workspaceDir: "/tmp",
      config: {},
      skillsSnapshot: {},
      provider: "anthropic",
      model: "claude",
      thinkLevel: "low",
      verboseLevel: "off",
      elevatedLevel: "off",
      bashElevated: { enabled: false, allowed: false, defaultLevel: "off" },
      timeoutMs: 1_000,
      blockReplyBreak: "message_end",
    },
  } as FollowupRun;
}
/**
 * Build a queued followup from the base fixture with selective overrides.
 *
 * Top-level fields in `overrides` replace the base values wholesale, while
 * `overrides.run` is merged field-by-field on top of the base `run` object.
 */
function createQueuedRun(
  overrides: Partial<Omit<FollowupRun, "run">> & { run?: Partial<FollowupRun["run"]> } = {},
): FollowupRun {
  const { run: runOverrides, ...topLevelOverrides } = overrides;
  const defaults = baseQueuedRun();
  const mergedRun = { ...defaults.run, ...runOverrides };
  return { ...defaults, ...topLevelOverrides, run: mergedRun };
}
/**
 * Program the next embedded-agent invocation to report a finished compaction
 * pass and then resolve with `params.result`.
 *
 * The stub sends one `compaction` stream event with `phase: "end"` and the
 * requested `willRetry` flag through the caller's `onAgentEvent` hook (when
 * one is supplied) before returning.
 */
function mockCompactionRun(params: {
  willRetry: boolean;
  result: {
    payloads: Array<{ text: string }>;
    meta: Record<string, unknown>;
  };
}) {
  type AgentEvent = { stream: string; data: Record<string, unknown> };
  type AgentRunArgs = { onAgentEvent?: (evt: AgentEvent) => void };

  const simulateRun = async (args: AgentRunArgs) => {
    const compactionEnded: AgentEvent = {
      stream: "compaction",
      data: { phase: "end", willRetry: params.willRetry },
    };
    args.onAgentEvent?.(compactionEnded);
    return params.result;
  };

  runEmbeddedPiAgentMock.mockImplementationOnce(simulateRun);
}
// Verifies that a followup turn which triggers auto-compaction both notifies
// the user (in verbose mode) and bumps the session's compaction counter.
describe("createFollowupRunner compaction", () => {
  it("adds verbose auto-compaction notice and tracks count", async () => {
    // Unique scratch directory for the on-disk session store; removed in the
    // `finally` below so repeated runs do not pile up temp directories.
    const tempDir = await fs.mkdtemp(path.join(tmpdir(), "openclaw-compaction-"));
    try {
      const storePath = path.join(tempDir, "sessions.json");
      const sessionEntry: SessionEntry = {
        sessionId: "session",
        updatedAt: Date.now(),
      };
      const sessionStore: Record<string, SessionEntry> = {
        main: sessionEntry,
      };
      const onBlockReply = vi.fn(async () => {});

      mockCompactionRun({
        willRetry: true,
        result: { payloads: [{ text: "final" }], meta: {} },
      });

      const runner = createFollowupRunner({
        opts: { onBlockReply },
        typing: createMockTypingController(),
        typingMode: "instant",
        sessionEntry,
        sessionStore,
        sessionKey: "main",
        storePath,
        defaultModel: "anthropic/claude-opus-4-5",
      });

      // Verbose mode is what makes the runner surface the compaction notice.
      const queued = createQueuedRun({
        run: {
          verboseLevel: "on",
        },
      });
      await runner(queued);

      expect(onBlockReply).toHaveBeenCalled();
      // The notice is expected to be the first block delivered.
      const firstCall = (onBlockReply.mock.calls as unknown as Array<Array<{ text?: string }>>)[0];
      expect(firstCall?.[0]?.text).toContain("Auto-compaction complete");
      expect(sessionStore.main.compactionCount).toBe(1);
    } finally {
      await fs.rm(tempDir, { recursive: true, force: true });
    }
  });
});
// Covers how a followup turn decides whether to deliver, suppress, or re-route
// the agent's payloads when a messaging tool may already have sent them.
describe("createFollowupRunner messaging tool dedupe", () => {
  /**
   * Build a followup runner whose dispatcher sink is `onBlockReply`.
   * Session persistence stays off unless the matching `overrides` are given.
   */
  function createMessagingDedupeRunner(
    onBlockReply: (payload: unknown) => Promise<void>,
    overrides: Partial<{
      sessionEntry: SessionEntry;
      sessionStore: Record<string, SessionEntry>;
      sessionKey: string;
      storePath: string;
    }> = {},
  ) {
    return createFollowupRunner({
      opts: { onBlockReply },
      typing: createMockTypingController(),
      typingMode: "instant",
      defaultModel: "anthropic/claude-opus-4-5",
      sessionEntry: overrides.sessionEntry,
      sessionStore: overrides.sessionStore,
      sessionKey: overrides.sessionKey,
      storePath: overrides.storePath,
    });
  }

  /**
   * Queue `agentResult` as the next embedded-agent outcome, execute a single
   * followup turn for `queued`, and hand back the `onBlockReply` spy so the
   * caller can assert on dispatcher deliveries.
   */
  async function runFollowup(
    agentResult: Record<string, unknown>,
    queued: FollowupRun = baseQueuedRun(),
    overrides?: Parameters<typeof createMessagingDedupeRunner>[1],
  ) {
    const onBlockReply = vi.fn(async () => {});
    runEmbeddedPiAgentMock.mockResolvedValueOnce(agentResult);
    const runner = createMessagingDedupeRunner(onBlockReply, overrides);
    await runner(queued);
    return onBlockReply;
  }

  it("drops payloads already sent via messaging tool", async () => {
    const onBlockReply = await runFollowup({
      payloads: [{ text: "hello world!" }],
      messagingToolSentTexts: ["hello world!"],
      meta: {},
    });

    expect(onBlockReply).not.toHaveBeenCalled();
  });

  it("delivers payloads when not duplicates", async () => {
    const onBlockReply = await runFollowup({
      payloads: [{ text: "hello world!" }],
      messagingToolSentTexts: ["different message"],
      meta: {},
    });

    expect(onBlockReply).toHaveBeenCalledTimes(1);
  });

  it("suppresses replies when a messaging tool sent via the same provider + target", async () => {
    const onBlockReply = await runFollowup(
      {
        payloads: [{ text: "hello world!" }],
        messagingToolSentTexts: ["different message"],
        messagingToolSentTargets: [{ tool: "slack", provider: "slack", to: "channel:C1" }],
        meta: {},
      },
      baseQueuedRun("slack"),
    );

    expect(onBlockReply).not.toHaveBeenCalled();
  });

  it("suppresses replies when provider is synthetic but originating channel matches", async () => {
    const onBlockReply = await runFollowup(
      {
        payloads: [{ text: "hello world!" }],
        messagingToolSentTexts: ["different message"],
        messagingToolSentTargets: [{ tool: "telegram", provider: "telegram", to: "268300329" }],
        meta: {},
      },
      {
        ...baseQueuedRun("heartbeat"),
        originatingChannel: "telegram",
        originatingTo: "268300329",
      } as FollowupRun,
    );

    expect(onBlockReply).not.toHaveBeenCalled();
  });

  it("does not suppress replies for same target when account differs", async () => {
    const onBlockReply = await runFollowup(
      {
        payloads: [{ text: "hello world!" }],
        messagingToolSentTexts: ["different message"],
        messagingToolSentTargets: [
          { tool: "telegram", provider: "telegram", to: "268300329", accountId: "work" },
        ],
        meta: {},
      },
      {
        ...baseQueuedRun("heartbeat"),
        originatingChannel: "telegram",
        originatingTo: "268300329",
        originatingAccountId: "personal",
      } as FollowupRun,
    );

    // The reply goes out through origin routing, not through the dispatcher.
    expect(routeReplyMock).toHaveBeenCalledWith(
      expect.objectContaining({
        channel: "telegram",
        to: "268300329",
        accountId: "personal",
      }),
    );
    expect(onBlockReply).not.toHaveBeenCalled();
  });

  it("drops media URL from payload when messaging tool already sent it", async () => {
    const onBlockReply = await runFollowup({
      payloads: [{ mediaUrl: "/tmp/img.png" }],
      messagingToolSentMediaUrls: ["/tmp/img.png"],
      meta: {},
    });

    // Media stripped → payload becomes non-renderable → not delivered.
    expect(onBlockReply).not.toHaveBeenCalled();
  });

  it("delivers media payload when not a duplicate", async () => {
    const onBlockReply = await runFollowup({
      payloads: [{ mediaUrl: "/tmp/img.png" }],
      messagingToolSentMediaUrls: ["/tmp/other.png"],
      meta: {},
    });

    expect(onBlockReply).toHaveBeenCalledTimes(1);
  });

  it("persists usage even when replies are suppressed", async () => {
    // Unique scratch directory for the on-disk session store; removed in the
    // `finally` below so repeated runs do not pile up temp directories.
    const tempDir = await fs.mkdtemp(path.join(tmpdir(), "openclaw-followup-usage-"));
    try {
      const storePath = path.join(tempDir, "sessions.json");
      const sessionKey = "main";
      const sessionEntry: SessionEntry = { sessionId: "session", updatedAt: Date.now() };
      const sessionStore: Record<string, SessionEntry> = { [sessionKey]: sessionEntry };
      await saveSessionStore(storePath, sessionStore);

      const onBlockReply = await runFollowup(
        {
          payloads: [{ text: "hello world!" }],
          messagingToolSentTexts: ["different message"],
          messagingToolSentTargets: [{ tool: "slack", provider: "slack", to: "channel:C1" }],
          meta: {
            agentMeta: {
              usage: { input: 1_000, output: 50 },
              lastCallUsage: { input: 400, output: 20 },
              model: "claude-opus-4-5",
              provider: "anthropic",
            },
          },
        },
        baseQueuedRun("slack"),
        { sessionEntry, sessionStore, sessionKey, storePath },
      );

      expect(onBlockReply).not.toHaveBeenCalled();
      const store = loadSessionStore(storePath, { skipCache: true });
      // totalTokens should reflect the last call usage snapshot, not the accumulated input.
      expect(store[sessionKey]?.totalTokens).toBe(400);
      expect(store[sessionKey]?.model).toBe("claude-opus-4-5");
      // Accumulated usage is still stored for usage/cost tracking.
      expect(store[sessionKey]?.inputTokens).toBe(1_000);
      expect(store[sessionKey]?.outputTokens).toBe(50);
    } finally {
      await fs.rm(tempDir, { recursive: true, force: true });
    }
  });

  it("does not fall back to dispatcher when cross-channel origin routing fails", async () => {
    routeReplyMock.mockResolvedValueOnce({
      ok: false,
      error: "forced route failure",
    });

    const onBlockReply = await runFollowup(
      {
        payloads: [{ text: "hello world!" }],
        meta: {},
      },
      {
        ...baseQueuedRun("webchat"),
        originatingChannel: "discord",
        originatingTo: "channel:C1",
      } as FollowupRun,
    );

    expect(routeReplyMock).toHaveBeenCalled();
    expect(onBlockReply).not.toHaveBeenCalled();
  });

  it("falls back to dispatcher when same-channel origin routing fails", async () => {
    routeReplyMock.mockResolvedValueOnce({
      ok: false,
      error: "outbound adapter unavailable",
    });

    // Provider and origin differ only in case/whitespace, so they count as the same channel.
    const onBlockReply = await runFollowup(
      {
        payloads: [{ text: "hello world!" }],
        meta: {},
      },
      {
        ...baseQueuedRun(" Feishu "),
        originatingChannel: "FEISHU",
        originatingTo: "ou_abc123",
      } as FollowupRun,
    );

    expect(routeReplyMock).toHaveBeenCalled();
    expect(onBlockReply).toHaveBeenCalledTimes(1);
    expect(onBlockReply).toHaveBeenCalledWith(expect.objectContaining({ text: "hello world!" }));
  });

  it("routes followups with originating account/thread metadata", async () => {
    const onBlockReply = await runFollowup(
      {
        payloads: [{ text: "hello world!" }],
        meta: {},
      },
      {
        ...baseQueuedRun("webchat"),
        originatingChannel: "discord",
        originatingTo: "channel:C1",
        originatingAccountId: "work",
        originatingThreadId: "1739142736.000100",
      } as FollowupRun,
    );

    expect(routeReplyMock).toHaveBeenCalledWith(
      expect.objectContaining({
        channel: "discord",
        to: "channel:C1",
        accountId: "work",
        threadId: "1739142736.000100",
      }),
    );
    expect(onBlockReply).not.toHaveBeenCalled();
  });
});
// Regression coverage: whatever the agent outcome, a followup turn must signal
// both "run complete" and "dispatch idle" on the typing controller.
describe("createFollowupRunner typing cleanup", () => {
  type MockTyping = ReturnType<typeof createMockTypingController>;

  /** Create a runner bound to `typing`; the reply sink defaults to a no-op spy. */
  const buildRunner = (typing: MockTyping, onBlockReply = vi.fn(async () => {})) =>
    createFollowupRunner({
      opts: { onBlockReply },
      typing,
      typingMode: "instant",
      defaultModel: "anthropic/claude-opus-4-5",
    });

  /** Assert that both typing-release signals were sent. */
  const expectTypingReleased = (typing: MockTyping) => {
    expect(typing.markRunComplete).toHaveBeenCalled();
    expect(typing.markDispatchIdle).toHaveBeenCalled();
  };

  it("calls both markRunComplete and markDispatchIdle on NO_REPLY", async () => {
    const typing = createMockTypingController();
    runEmbeddedPiAgentMock.mockResolvedValueOnce({
      payloads: [{ text: "NO_REPLY" }],
      meta: {},
    });

    const runner = buildRunner(typing);
    await runner(baseQueuedRun());

    expectTypingReleased(typing);
  });

  it("calls both markRunComplete and markDispatchIdle on empty payloads", async () => {
    const typing = createMockTypingController();
    runEmbeddedPiAgentMock.mockResolvedValueOnce({
      payloads: [],
      meta: {},
    });

    const runner = buildRunner(typing);
    await runner(baseQueuedRun());

    expectTypingReleased(typing);
  });

  it("calls both markRunComplete and markDispatchIdle on agent error", async () => {
    const typing = createMockTypingController();
    runEmbeddedPiAgentMock.mockRejectedValueOnce(new Error("agent exploded"));

    const runner = buildRunner(typing);
    await runner(baseQueuedRun());

    expectTypingReleased(typing);
  });

  it("calls both markRunComplete and markDispatchIdle on successful delivery", async () => {
    const typing = createMockTypingController();
    const onBlockReply = vi.fn(async () => {});
    runEmbeddedPiAgentMock.mockResolvedValueOnce({
      payloads: [{ text: "hello world!" }],
      meta: {},
    });

    const runner = buildRunner(typing, onBlockReply);
    await runner(baseQueuedRun());

    expect(onBlockReply).toHaveBeenCalled();
    expectTypingReleased(typing);
  });
});
// Checks that the agent directory carried on the queued run reaches the
// embedded agent invocation unchanged.
describe("createFollowupRunner agentDir forwarding", () => {
  it("passes queued run agentDir to runEmbeddedPiAgent", async () => {
    // Start from a clean call log so the call-count assertion below is exact.
    runEmbeddedPiAgentMock.mockClear();
    runEmbeddedPiAgentMock.mockResolvedValueOnce({
      payloads: [{ text: "hello world!" }],
      messagingToolSentTexts: ["different message"],
      meta: {},
    });

    const onBlockReply = vi.fn(async () => {});
    const runner = createFollowupRunner({
      opts: { onBlockReply },
      typing: createMockTypingController(),
      typingMode: "instant",
      defaultModel: "anthropic/claude-opus-4-5",
    });

    const agentDir = path.join("/tmp", "agent-dir");
    const template = createQueuedRun();
    const queuedWithAgentDir = { ...template, run: { ...template.run, agentDir } };
    await runner(queuedWithAgentDir);

    expect(runEmbeddedPiAgentMock).toHaveBeenCalledTimes(1);
    const lastCallArgs = runEmbeddedPiAgentMock.mock.calls.at(-1);
    const forwarded = lastCallArgs?.[0] as { agentDir?: string };
    expect(forwarded?.agentDir).toBe(agentDir);
  });
});