Merge 84bf1d73ea29f7616eec13a41af9683cba25da6b into 9fb78453e088cd7b553d7779faa0de5c83708e70

This commit is contained in:
Kim 2026-03-21 05:14:10 +00:00 committed by GitHub
commit 906e712e98
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 519 additions and 171 deletions

View File

@ -103,16 +103,23 @@ function escapeRegex(str: string): string {
return str.replace(/[.*+?^${}()|[\]\\]/g, "\\$&");
}
const FILE_EXTENSIONS_PATTERN = Array.from(FILE_REF_EXTENSIONS_WITH_TLD).map(escapeRegex).join("|");
const FILE_EXTENSIONS_PATTERN =
FILE_REF_EXTENSIONS_WITH_TLD && Symbol.iterator in FILE_REF_EXTENSIONS_WITH_TLD
? Array.from(FILE_REF_EXTENSIONS_WITH_TLD).map(escapeRegex).join("|")
: "";
const AUTO_LINKED_ANCHOR_PATTERN = /<a\s+href="https?:\/\/([^"]+)"[^>]*>\1<\/a>/gi;
const FILE_REFERENCE_PATTERN = new RegExp(
`(^|[^a-zA-Z0-9_\\-/])([a-zA-Z0-9_.\\-./]+\\.(?:${FILE_EXTENSIONS_PATTERN}))(?=$|[^a-zA-Z0-9_\\-/])`,
"gi",
);
const ORPHANED_TLD_PATTERN = new RegExp(
`([^a-zA-Z0-9]|^)([A-Za-z]\\.(?:${FILE_EXTENSIONS_PATTERN}))(?=[^a-zA-Z0-9/]|$)`,
"g",
);
const FILE_REFERENCE_PATTERN = FILE_EXTENSIONS_PATTERN
? new RegExp(
`(^|[^a-zA-Z0-9_\\-/])([a-zA-Z0-9_.\\-./]+\\.(?:${FILE_EXTENSIONS_PATTERN}))(?=$|[^a-zA-Z0-9_\\-/])`,
"gi",
)
: null;
const ORPHANED_TLD_PATTERN = FILE_EXTENSIONS_PATTERN
? new RegExp(
`([^a-zA-Z0-9]|^)([A-Za-z]\\.(?:${FILE_EXTENSIONS_PATTERN}))(?=[^a-zA-Z0-9/]|$)`,
"g",
)
: null;
const HTML_TAG_PATTERN = /(<\/?)([a-zA-Z][a-zA-Z0-9-]*)\b[^>]*?>/gi;
function wrapStandaloneFileRef(match: string, prefix: string, filename: string): string {
@ -131,7 +138,14 @@ function wrapSegmentFileRefs(
preDepth: number,
anchorDepth: number,
): string {
if (!text || codeDepth > 0 || preDepth > 0 || anchorDepth > 0) {
if (
!text ||
!FILE_REFERENCE_PATTERN ||
!ORPHANED_TLD_PATTERN ||
codeDepth > 0 ||
preDepth > 0 ||
anchorDepth > 0
) {
return text;
}
const wrappedStandalone = text.replace(FILE_REFERENCE_PATTERN, wrapStandaloneFileRef);

View File

@ -58,6 +58,8 @@ import { incrementRunCompactionCount, persistRunSessionUsage } from "./session-r
import { createTypingSignaler } from "./typing-mode.js";
import type { TypingController } from "./typing.js";
const RECENT_MESSAGING_TOOL_DEDUPE_WINDOW_MS = 2 * 60 * 1000;
const BLOCK_REPLY_SEND_TIMEOUT_MS = 15_000;
export async function runReplyAgent(params: {
@ -292,6 +294,11 @@ export async function runReplyAgent(params: {
fallbackNoticeSelectedModel: undefined,
fallbackNoticeActiveModel: undefined,
fallbackNoticeReason: undefined,
lastMessagingToolSessionId: undefined,
lastMessagingToolSentAt: undefined,
lastMessagingToolSentTexts: undefined,
lastMessagingToolSentMediaUrls: undefined,
lastMessagingToolSentTargets: undefined,
};
const agentId = resolveAgentIdFromSessionKey(sessionKey);
const nextSessionFile = resolveSessionTranscriptPath(
@ -414,6 +421,54 @@ export async function runReplyAgent(params: {
const payloadArray = runResult.payloads ?? [];
const sentTexts = runResult.messagingToolSentTexts ?? [];
const sentMediaUrls = runResult.messagingToolSentMediaUrls ?? [];
const sentTargets = runResult.messagingToolSentTargets ?? [];
const sessionDedupeEntry = activeSessionEntry;
if (sessionDedupeEntry) {
const now = Date.now();
if (sentTexts.length || sentMediaUrls.length || sentTargets.length) {
sessionDedupeEntry.lastMessagingToolSessionId = followupRun.run.sessionId;
sessionDedupeEntry.lastMessagingToolSentAt = now;
sessionDedupeEntry.lastMessagingToolSentTexts = sentTexts;
sessionDedupeEntry.lastMessagingToolSentMediaUrls = sentMediaUrls;
sessionDedupeEntry.lastMessagingToolSentTargets = sentTargets;
} else if (
typeof sessionDedupeEntry.lastMessagingToolSentAt === "number" &&
now - sessionDedupeEntry.lastMessagingToolSentAt > RECENT_MESSAGING_TOOL_DEDUPE_WINDOW_MS
) {
delete sessionDedupeEntry.lastMessagingToolSessionId;
delete sessionDedupeEntry.lastMessagingToolSentAt;
delete sessionDedupeEntry.lastMessagingToolSentTexts;
delete sessionDedupeEntry.lastMessagingToolSentMediaUrls;
delete sessionDedupeEntry.lastMessagingToolSentTargets;
}
if (sessionKey && activeSessionStore) {
activeSessionStore[sessionKey] = sessionDedupeEntry;
}
if (sessionKey && storePath) {
try {
await updateSessionStoreEntry({
storePath,
sessionKey,
update: async () => ({
lastMessagingToolSessionId: sessionDedupeEntry.lastMessagingToolSessionId,
lastMessagingToolSentAt: sessionDedupeEntry.lastMessagingToolSentAt,
lastMessagingToolSentTexts: sessionDedupeEntry.lastMessagingToolSentTexts,
lastMessagingToolSentMediaUrls: sessionDedupeEntry.lastMessagingToolSentMediaUrls,
lastMessagingToolSentTargets: sessionDedupeEntry.lastMessagingToolSentTargets,
}),
});
} catch (error) {
console.warn(
"Failed to persist messaging-tool dedupe state for session",
sessionKey,
error,
);
}
}
}
if (blockReplyPipeline) {
await blockReplyPipeline.flush({ force: true });
blockReplyPipeline.stop();

View File

@ -79,10 +79,6 @@ function mockCompactionRun(params: {
);
}
function createAsyncReplySpy() {
return vi.fn(async () => {});
}
describe("createFollowupRunner compaction", () => {
it("adds verbose auto-compaction notice and tracks count", async () => {
const storePath = path.join(
@ -321,97 +317,95 @@ describe("createFollowupRunner messaging tool dedupe", () => {
});
}
async function runMessagingCase(params: {
agentResult: Record<string, unknown>;
queued?: FollowupRun;
runnerOverrides?: Partial<{
sessionEntry: SessionEntry;
sessionStore: Record<string, SessionEntry>;
sessionKey: string;
storePath: string;
}>;
}) {
const onBlockReply = createAsyncReplySpy();
runEmbeddedPiAgentMock.mockResolvedValueOnce({
meta: {},
...params.agentResult,
});
const runner = createMessagingDedupeRunner(onBlockReply, params.runnerOverrides);
await runner(params.queued ?? baseQueuedRun());
return { onBlockReply };
}
function makeTextReplyDedupeResult(overrides?: Record<string, unknown>) {
return {
payloads: [{ text: "hello world!" }],
messagingToolSentTexts: ["different message"],
...overrides,
};
}
it("drops payloads already sent via messaging tool", async () => {
const { onBlockReply } = await runMessagingCase({
agentResult: {
payloads: [{ text: "hello world!" }],
messagingToolSentTexts: ["hello world!"],
},
const onBlockReply = vi.fn(async () => {});
runEmbeddedPiAgentMock.mockResolvedValueOnce({
payloads: [{ text: "hello world!" }],
messagingToolSentTexts: ["hello world!"],
meta: {},
});
const runner = createMessagingDedupeRunner(onBlockReply);
await runner({
...baseQueuedRun("telegram"),
originatingTo: "123",
});
expect(onBlockReply).not.toHaveBeenCalled();
});
it("delivers payloads when not duplicates", async () => {
const { onBlockReply } = await runMessagingCase({
agentResult: makeTextReplyDedupeResult(),
const onBlockReply = vi.fn(async () => {});
runEmbeddedPiAgentMock.mockResolvedValueOnce({
payloads: [{ text: "hello world!" }],
messagingToolSentTexts: ["different message"],
meta: {},
});
const runner = createMessagingDedupeRunner(onBlockReply);
await runner(baseQueuedRun());
expect(onBlockReply).toHaveBeenCalledTimes(1);
});
it("suppresses replies when a messaging tool sent via the same provider + target", async () => {
const { onBlockReply } = await runMessagingCase({
agentResult: {
...makeTextReplyDedupeResult(),
messagingToolSentTargets: [{ tool: "slack", provider: "slack", to: "channel:C1" }],
},
queued: baseQueuedRun("slack"),
const onBlockReply = vi.fn(async () => {});
runEmbeddedPiAgentMock.mockResolvedValueOnce({
payloads: [{ text: "hello world!" }],
messagingToolSentTexts: ["different message"],
messagingToolSentTargets: [{ tool: "slack", provider: "slack", to: "channel:C1" }],
meta: {},
});
const runner = createMessagingDedupeRunner(onBlockReply);
await runner(baseQueuedRun("slack"));
expect(onBlockReply).not.toHaveBeenCalled();
});
it("suppresses replies when provider is synthetic but originating channel matches", async () => {
const { onBlockReply } = await runMessagingCase({
agentResult: {
...makeTextReplyDedupeResult(),
messagingToolSentTargets: [{ tool: "telegram", provider: "telegram", to: "268300329" }],
},
queued: {
...baseQueuedRun("heartbeat"),
originatingChannel: "telegram",
originatingTo: "268300329",
} as FollowupRun,
const onBlockReply = vi.fn(async () => {});
runEmbeddedPiAgentMock.mockResolvedValueOnce({
payloads: [{ text: "hello world!" }],
messagingToolSentTexts: ["different message"],
messagingToolSentTargets: [{ tool: "telegram", provider: "telegram", to: "268300329" }],
meta: {},
});
const runner = createMessagingDedupeRunner(onBlockReply);
await runner({
...baseQueuedRun("heartbeat"),
originatingChannel: "telegram",
originatingTo: "268300329",
} as FollowupRun);
expect(onBlockReply).not.toHaveBeenCalled();
});
it("does not suppress replies for same target when account differs", async () => {
const { onBlockReply } = await runMessagingCase({
agentResult: {
...makeTextReplyDedupeResult(),
messagingToolSentTargets: [
{ tool: "telegram", provider: "telegram", to: "268300329", accountId: "work" },
],
},
queued: {
...baseQueuedRun("heartbeat"),
originatingChannel: "telegram",
originatingTo: "268300329",
originatingAccountId: "personal",
} as FollowupRun,
const onBlockReply = vi.fn(async () => {});
runEmbeddedPiAgentMock.mockResolvedValueOnce({
payloads: [{ text: "hello world!" }],
messagingToolSentTexts: ["different message"],
messagingToolSentTargets: [
{ tool: "telegram", provider: "telegram", to: "268300329", accountId: "work" },
],
meta: {},
});
const runner = createMessagingDedupeRunner(onBlockReply);
await runner({
...baseQueuedRun("heartbeat"),
originatingChannel: "telegram",
originatingTo: "268300329",
originatingAccountId: "personal",
} as FollowupRun);
expect(routeReplyMock).toHaveBeenCalledWith(
expect.objectContaining({
channel: "telegram",
@ -422,26 +416,224 @@ describe("createFollowupRunner messaging tool dedupe", () => {
expect(onBlockReply).not.toHaveBeenCalled();
});
it("drops media URL from payload when messaging tool already sent it", async () => {
const { onBlockReply } = await runMessagingCase({
agentResult: {
payloads: [{ mediaUrl: "/tmp/img.png" }],
messagingToolSentMediaUrls: ["/tmp/img.png"],
},
it("suppresses replies using recent session-level messaging-tool dedupe state", async () => {
const onBlockReply = vi.fn(async () => {});
const sessionEntry: SessionEntry = {
sessionId: "session",
updatedAt: Date.now(),
lastMessagingToolSessionId: "session",
lastMessagingToolSentAt: Date.now(),
lastMessagingToolSentTexts: ["hello world!"],
lastMessagingToolSentTargets: [{ tool: "message", provider: "telegram", to: "123" }],
};
const sessionStore: Record<string, SessionEntry> = { main: sessionEntry };
runEmbeddedPiAgentMock.mockResolvedValueOnce({
payloads: [{ text: "hello world!" }],
meta: {},
});
const runner = createMessagingDedupeRunner(onBlockReply, {
sessionEntry,
sessionStore,
sessionKey: "main",
});
await runner({
...baseQueuedRun("telegram"),
originatingTo: "123",
});
expect(onBlockReply).not.toHaveBeenCalled();
});
it("does not reuse session-level dedupe fingerprints for queued user turns", async () => {
const onBlockReply = vi.fn(async () => {});
const sessionEntry: SessionEntry = {
sessionId: "session",
updatedAt: Date.now(),
lastMessagingToolSessionId: "session",
lastMessagingToolSentAt: Date.now(),
lastMessagingToolSentTexts: ["hello world!"],
lastMessagingToolSentTargets: [{ tool: "message", provider: "telegram", to: "123" }],
};
const sessionStore: Record<string, SessionEntry> = { main: sessionEntry };
runEmbeddedPiAgentMock.mockResolvedValueOnce({
payloads: [{ text: "hello world!" }],
meta: {},
});
const runner = createMessagingDedupeRunner(onBlockReply, {
sessionEntry,
sessionStore,
sessionKey: "main",
});
await runner({
...baseQueuedRun("telegram"),
messageId: "user-msg-123",
originatingTo: "123",
});
expect(onBlockReply).toHaveBeenCalledTimes(1);
});
it("does not use session-level dedupe from a previous session id", async () => {
const onBlockReply = vi.fn(async () => {});
const sessionEntry: SessionEntry = {
sessionId: "current-session",
updatedAt: Date.now(),
lastMessagingToolSessionId: "old-session",
lastMessagingToolSentAt: Date.now(),
lastMessagingToolSentTexts: ["hello world!"],
lastMessagingToolSentTargets: [{ tool: "message", provider: "telegram", to: "123" }],
};
const sessionStore: Record<string, SessionEntry> = { main: sessionEntry };
runEmbeddedPiAgentMock.mockResolvedValueOnce({
payloads: [{ text: "hello world!" }],
meta: {},
});
const runner = createMessagingDedupeRunner(onBlockReply, {
sessionEntry,
sessionStore,
sessionKey: "main",
});
await runner({
...baseQueuedRun("telegram"),
originatingTo: "123",
run: { ...baseQueuedRun("telegram").run, sessionId: "current-session" },
});
expect(onBlockReply).toHaveBeenCalled();
});
it("does not use session-level text dedupe when recent target does not match", async () => {
const onBlockReply = vi.fn(async () => {});
const sessionEntry: SessionEntry = {
sessionId: "session",
updatedAt: Date.now(),
lastMessagingToolSentAt: Date.now(),
lastMessagingToolSentTexts: ["hello world!"],
lastMessagingToolSentTargets: [{ tool: "message", provider: "telegram", to: "999" }],
};
const sessionStore: Record<string, SessionEntry> = { main: sessionEntry };
runEmbeddedPiAgentMock.mockResolvedValueOnce({
payloads: [{ text: "hello world!" }],
meta: {},
});
const runner = createMessagingDedupeRunner(onBlockReply, {
sessionEntry,
sessionStore,
sessionKey: "main",
});
await runner(baseQueuedRun("telegram"));
expect(onBlockReply).toHaveBeenCalled();
});
it("does not reuse session-level text dedupe when prior run had multiple messaging targets", async () => {
const onBlockReply = vi.fn(async () => {});
const sessionEntry: SessionEntry = {
sessionId: "session",
updatedAt: Date.now(),
lastMessagingToolSessionId: "session",
lastMessagingToolSentAt: Date.now(),
lastMessagingToolSentTexts: ["hello world!"],
lastMessagingToolSentTargets: [
{ tool: "message", provider: "telegram", to: "123" },
{ tool: "message", provider: "telegram", to: "999" },
],
};
const sessionStore: Record<string, SessionEntry> = { main: sessionEntry };
runEmbeddedPiAgentMock.mockResolvedValueOnce({
payloads: [{ text: "hello world!" }],
meta: {},
});
const runner = createMessagingDedupeRunner(onBlockReply, {
sessionEntry,
sessionStore,
sessionKey: "main",
});
await runner({
...baseQueuedRun("telegram"),
originatingTo: "123",
});
expect(onBlockReply).toHaveBeenCalled();
});
it("reuses session-level text dedupe when prior run had repeated sends to one target", async () => {
const onBlockReply = vi.fn(async () => {});
const sessionEntry: SessionEntry = {
sessionId: "session",
updatedAt: Date.now(),
lastMessagingToolSessionId: "session",
lastMessagingToolSentAt: Date.now(),
lastMessagingToolSentTexts: ["hello world!"],
lastMessagingToolSentTargets: [
{ tool: "message", provider: "telegram", to: "123" },
{ tool: "message", provider: "telegram", to: "123" },
],
};
const sessionStore: Record<string, SessionEntry> = { main: sessionEntry };
runEmbeddedPiAgentMock.mockResolvedValueOnce({
payloads: [{ text: "hello world!" }],
meta: {},
});
const runner = createMessagingDedupeRunner(onBlockReply, {
sessionEntry,
sessionStore,
sessionKey: "main",
});
await runner({
...baseQueuedRun("telegram"),
originatingTo: "123",
});
expect(onBlockReply).not.toHaveBeenCalled();
});
it("drops media URL from payload when messaging tool already sent it", async () => {
const onBlockReply = vi.fn(async () => {});
runEmbeddedPiAgentMock.mockResolvedValueOnce({
payloads: [{ mediaUrl: "/tmp/img.png" }],
messagingToolSentMediaUrls: ["/tmp/img.png"],
meta: {},
});
const runner = createMessagingDedupeRunner(onBlockReply);
await runner(baseQueuedRun());
// Media stripped → payload becomes non-renderable → not delivered.
expect(onBlockReply).not.toHaveBeenCalled();
});
it("delivers media payload when not a duplicate", async () => {
const { onBlockReply } = await runMessagingCase({
agentResult: {
payloads: [{ mediaUrl: "/tmp/img.png" }],
messagingToolSentMediaUrls: ["/tmp/other.png"],
},
const onBlockReply = vi.fn(async () => {});
runEmbeddedPiAgentMock.mockResolvedValueOnce({
payloads: [{ mediaUrl: "/tmp/img.png" }],
messagingToolSentMediaUrls: ["/tmp/other.png"],
meta: {},
});
const runner = createMessagingDedupeRunner(onBlockReply);
await runner(baseQueuedRun());
expect(onBlockReply).toHaveBeenCalledTimes(1);
});
@ -455,28 +647,30 @@ describe("createFollowupRunner messaging tool dedupe", () => {
const sessionStore: Record<string, SessionEntry> = { [sessionKey]: sessionEntry };
await saveSessionStore(storePath, sessionStore);
const { onBlockReply } = await runMessagingCase({
agentResult: {
...makeTextReplyDedupeResult(),
messagingToolSentTargets: [{ tool: "slack", provider: "slack", to: "channel:C1" }],
meta: {
agentMeta: {
usage: { input: 1_000, output: 50 },
lastCallUsage: { input: 400, output: 20 },
model: "claude-opus-4-5",
provider: "anthropic",
},
const onBlockReply = vi.fn(async () => {});
runEmbeddedPiAgentMock.mockResolvedValueOnce({
payloads: [{ text: "hello world!" }],
messagingToolSentTexts: ["different message"],
messagingToolSentTargets: [{ tool: "slack", provider: "slack", to: "channel:C1" }],
meta: {
agentMeta: {
usage: { input: 1_000, output: 50 },
lastCallUsage: { input: 400, output: 20 },
model: "claude-opus-4-5",
provider: "anthropic",
},
},
runnerOverrides: {
sessionEntry,
sessionStore,
sessionKey,
storePath,
},
queued: baseQueuedRun("slack"),
});
const runner = createMessagingDedupeRunner(onBlockReply, {
sessionEntry,
sessionStore,
sessionKey,
storePath,
});
await runner(baseQueuedRun("slack"));
expect(onBlockReply).not.toHaveBeenCalled();
const store = loadSessionStore(storePath, { skipCache: true });
// totalTokens should reflect the last call usage snapshot, not the accumulated input.
@ -546,36 +740,46 @@ describe("createFollowupRunner messaging tool dedupe", () => {
});
it("does not fall back to dispatcher when cross-channel origin routing fails", async () => {
const onBlockReply = vi.fn(async () => {});
runEmbeddedPiAgentMock.mockResolvedValueOnce({
payloads: [{ text: "hello world!" }],
meta: {},
});
routeReplyMock.mockResolvedValueOnce({
ok: false,
error: "forced route failure",
});
const { onBlockReply } = await runMessagingCase({
agentResult: { payloads: [{ text: "hello world!" }] },
queued: {
...baseQueuedRun("webchat"),
originatingChannel: "discord",
originatingTo: "channel:C1",
} as FollowupRun,
});
const runner = createMessagingDedupeRunner(onBlockReply);
await runner({
...baseQueuedRun("webchat"),
originatingChannel: "discord",
originatingTo: "channel:C1",
} as FollowupRun);
expect(routeReplyMock).toHaveBeenCalled();
expect(onBlockReply).not.toHaveBeenCalled();
});
it("falls back to dispatcher when same-channel origin routing fails", async () => {
const onBlockReply = vi.fn(async () => {});
runEmbeddedPiAgentMock.mockResolvedValueOnce({
payloads: [{ text: "hello world!" }],
meta: {},
});
routeReplyMock.mockResolvedValueOnce({
ok: false,
error: "outbound adapter unavailable",
});
const { onBlockReply } = await runMessagingCase({
agentResult: { payloads: [{ text: "hello world!" }] },
queued: {
...baseQueuedRun(" Feishu "),
originatingChannel: "FEISHU",
originatingTo: "ou_abc123",
} as FollowupRun,
});
const runner = createMessagingDedupeRunner(onBlockReply);
await runner({
...baseQueuedRun(" Feishu "),
originatingChannel: "FEISHU",
originatingTo: "ou_abc123",
} as FollowupRun);
expect(routeReplyMock).toHaveBeenCalled();
expect(onBlockReply).toHaveBeenCalledTimes(1);
@ -583,17 +787,22 @@ describe("createFollowupRunner messaging tool dedupe", () => {
});
it("routes followups with originating account/thread metadata", async () => {
const { onBlockReply } = await runMessagingCase({
agentResult: { payloads: [{ text: "hello world!" }] },
queued: {
...baseQueuedRun("webchat"),
originatingChannel: "discord",
originatingTo: "channel:C1",
originatingAccountId: "work",
originatingThreadId: "1739142736.000100",
} as FollowupRun,
const onBlockReply = vi.fn(async () => {});
runEmbeddedPiAgentMock.mockResolvedValueOnce({
payloads: [{ text: "hello world!" }],
meta: {},
});
const runner = createMessagingDedupeRunner(onBlockReply);
await runner({
...baseQueuedRun("webchat"),
originatingChannel: "discord",
originatingTo: "channel:C1",
originatingAccountId: "work",
originatingThreadId: "1739142736.000100",
} as FollowupRun);
expect(routeReplyMock).toHaveBeenCalledWith(
expect.objectContaining({
channel: "discord",
@ -607,37 +816,44 @@ describe("createFollowupRunner messaging tool dedupe", () => {
});
describe("createFollowupRunner typing cleanup", () => {
async function runTypingCase(agentResult: Record<string, unknown>) {
it("calls both markRunComplete and markDispatchIdle on NO_REPLY", async () => {
const typing = createMockTypingController();
runEmbeddedPiAgentMock.mockResolvedValueOnce({
payloads: [{ text: "NO_REPLY" }],
meta: {},
...agentResult,
});
const runner = createFollowupRunner({
opts: { onBlockReply: createAsyncReplySpy() },
opts: { onBlockReply: vi.fn(async () => {}) },
typing,
typingMode: "instant",
defaultModel: "anthropic/claude-opus-4-5",
});
await runner(baseQueuedRun());
return typing;
}
function expectTypingCleanup(typing: ReturnType<typeof createMockTypingController>) {
expect(typing.markRunComplete).toHaveBeenCalled();
expect(typing.markDispatchIdle).toHaveBeenCalled();
}
it("calls both markRunComplete and markDispatchIdle on NO_REPLY", async () => {
const typing = await runTypingCase({ payloads: [{ text: "NO_REPLY" }] });
expectTypingCleanup(typing);
});
it("calls both markRunComplete and markDispatchIdle on empty payloads", async () => {
const typing = await runTypingCase({ payloads: [] });
expectTypingCleanup(typing);
const typing = createMockTypingController();
runEmbeddedPiAgentMock.mockResolvedValueOnce({
payloads: [],
meta: {},
});
const runner = createFollowupRunner({
opts: { onBlockReply: vi.fn(async () => {}) },
typing,
typingMode: "instant",
defaultModel: "anthropic/claude-opus-4-5",
});
await runner(baseQueuedRun());
expect(typing.markRunComplete).toHaveBeenCalled();
expect(typing.markDispatchIdle).toHaveBeenCalled();
});
it("calls both markRunComplete and markDispatchIdle on agent error", async () => {
@ -653,7 +869,8 @@ describe("createFollowupRunner typing cleanup", () => {
await runner(baseQueuedRun());
expectTypingCleanup(typing);
expect(typing.markRunComplete).toHaveBeenCalled();
expect(typing.markDispatchIdle).toHaveBeenCalled();
});
it("calls both markRunComplete and markDispatchIdle on successful delivery", async () => {
@ -674,7 +891,8 @@ describe("createFollowupRunner typing cleanup", () => {
await runner(baseQueuedRun());
expect(onBlockReply).toHaveBeenCalled();
expectTypingCleanup(typing);
expect(typing.markRunComplete).toHaveBeenCalled();
expect(typing.markDispatchIdle).toHaveBeenCalled();
});
});

View File

@ -38,6 +38,8 @@ import { incrementRunCompactionCount, persistRunSessionUsage } from "./session-r
import { createTypingSignaler } from "./typing-mode.js";
import type { TypingController } from "./typing.js";
const RECENT_MESSAGING_TOOL_DEDUPE_WINDOW_MS = 2 * 60 * 1000;
export function createFollowupRunner(params: {
opts?: GetReplyOptions;
typing: TypingController;
@ -316,27 +318,74 @@ export function createFollowupRunner(params: {
replyToChannel,
});
const messageProvider = resolveOriginMessageProvider({
originatingChannel: queued.originatingChannel,
provider: queued.run.messageProvider,
});
const originatingTo = resolveOriginMessageTo({
originatingTo: queued.originatingTo,
});
const originAccountId = resolveOriginAccountId({
originatingAccountId: queued.originatingAccountId,
accountId: queued.run.agentAccountId,
});
const now = Date.now();
const recentWindowActive =
typeof sessionEntry?.lastMessagingToolSentAt === "number" &&
sessionEntry?.lastMessagingToolSessionId === queued.run.sessionId &&
now - sessionEntry.lastMessagingToolSentAt <= RECENT_MESSAGING_TOOL_DEDUPE_WINDOW_MS;
const previousSentTargets = sessionEntry?.lastMessagingToolSentTargets ?? [];
const recentTargetMatch =
recentWindowActive &&
shouldSuppressMessagingToolReplies({
messageProvider,
messagingToolSentTargets: previousSentTargets,
originatingTo,
accountId: originAccountId,
});
const uniquePreviousTargets = new Set(
previousSentTargets.map((target) =>
JSON.stringify({
provider: target.provider,
to: target.to ?? "",
accountId: target.accountId ?? "",
}),
),
);
const isSystemGeneratedFollowup = !queued.messageId;
const canReuseSessionDedupeFingerprints =
isSystemGeneratedFollowup && recentTargetMatch && uniquePreviousTargets.size <= 1;
const sentTexts = [
...(runResult.messagingToolSentTexts ?? []),
...(canReuseSessionDedupeFingerprints
? (sessionEntry?.lastMessagingToolSentTexts ?? [])
: []),
];
const sentMediaUrls = [
...(runResult.messagingToolSentMediaUrls ?? []),
...(canReuseSessionDedupeFingerprints
? (sessionEntry?.lastMessagingToolSentMediaUrls ?? [])
: []),
];
// Keep target-based suppression scoped to the current run only.
// Session-level dedupe state is used for text/media duplicate filtering when target matches.
const sentTargets = runResult.messagingToolSentTargets ?? [];
const dedupedPayloads = filterMessagingToolDuplicates({
payloads: replyTaggedPayloads,
sentTexts: runResult.messagingToolSentTexts ?? [],
sentTexts,
});
const mediaFilteredPayloads = filterMessagingToolMediaDuplicates({
payloads: dedupedPayloads,
sentMediaUrls: runResult.messagingToolSentMediaUrls ?? [],
sentMediaUrls,
});
const suppressMessagingToolReplies = shouldSuppressMessagingToolReplies({
messageProvider: resolveOriginMessageProvider({
originatingChannel: queued.originatingChannel,
provider: queued.run.messageProvider,
}),
messagingToolSentTargets: runResult.messagingToolSentTargets,
originatingTo: resolveOriginMessageTo({
originatingTo: queued.originatingTo,
}),
accountId: resolveOriginAccountId({
originatingAccountId: queued.originatingAccountId,
accountId: queued.run.agentAccountId,
}),
messageProvider,
messagingToolSentTargets: sentTargets,
originatingTo,
accountId: originAccountId,
});
const finalPayloads = suppressMessagingToolReplies ? [] : mediaFilteredPayloads;

View File

@ -35,16 +35,17 @@ const resolveGatewayPort = vi.fn(() => 18789);
const findVerifiedGatewayListenerPidsOnPortSync = vi.fn<(port: number) => number[]>(() => []);
const signalVerifiedGatewayPidSync = vi.fn<(pid: number, signal: "SIGTERM" | "SIGUSR1") => void>();
const formatGatewayPidList = vi.fn<(pids: number[]) => string>((pids) => pids.join(", "));
const probeGateway = vi.fn<
(opts: {
url: string;
auth?: { token?: string; password?: string };
timeoutMs: number;
}) => Promise<{
ok: boolean;
configSnapshot: unknown;
}>
>();
const probeGateway =
vi.fn<
(opts: {
url: string;
auth?: { token?: string; password?: string };
timeoutMs: number;
}) => Promise<{
ok: boolean;
configSnapshot: unknown;
}>
>();
const isRestartEnabled = vi.fn<(config?: { commands?: unknown }) => boolean>(() => true);
const loadConfig = vi.fn(() => ({}));

View File

@ -1,5 +1,6 @@
import crypto from "node:crypto";
import type { Skill } from "@mariozechner/pi-coding-agent";
import type { MessagingToolSend } from "../../agents/pi-embedded-runner.js";
import type { ChatType } from "../../channels/chat-type.js";
import type { ChannelId } from "../../channels/plugins/types.js";
import type { DeliveryContext } from "../../utils/delivery-context.js";
@ -73,6 +74,16 @@ export type SessionEntry = {
lastHeartbeatText?: string;
/** Timestamp (ms) when lastHeartbeatText was delivered. */
lastHeartbeatSentAt?: number;
/** Timestamp (ms) for the most recent message-tool send fingerprint (cross-run dedupe). */
lastMessagingToolSentAt?: number;
/** Session id that produced the most recent message-tool dedupe fingerprint. */
lastMessagingToolSessionId?: string;
/** Recently sent message-tool text payloads for short-window cross-run dedupe. */
lastMessagingToolSentTexts?: string[];
/** Recently sent message-tool media urls for short-window cross-run dedupe. */
lastMessagingToolSentMediaUrls?: string[];
/** Recently sent message-tool routing targets for short-window cross-run dedupe. */
lastMessagingToolSentTargets?: MessagingToolSend[];
sessionId: string;
updatedAt: number;
sessionFile?: string;