handleToolExecutionStart() flushed pending block replies and then called onBlockReplyFlush() as fire-and-forget (`void`). This created a race where fast tool results (especially media on Telegram) could be delivered before the text block that preceded the tool call. Await onBlockReplyFlush() so the block pipeline finishes before tool execution continues, preserving delivery order. Fixes #25267 Co-authored-by: Cursor <cursoragent@cursor.com>
335 lines
10 KiB
TypeScript
335 lines
10 KiB
TypeScript
import type { AgentEvent } from "@mariozechner/pi-agent-core";
|
|
import { describe, expect, it, vi } from "vitest";
|
|
import type { MessagingToolSend } from "./pi-embedded-messaging.js";
|
|
import {
|
|
handleToolExecutionEnd,
|
|
handleToolExecutionStart,
|
|
} from "./pi-embedded-subscribe.handlers.tools.js";
|
|
import type {
|
|
ToolCallSummary,
|
|
ToolHandlerContext,
|
|
} from "./pi-embedded-subscribe.handlers.types.js";
|
|
|
|
type ToolExecutionStartEvent = Extract<AgentEvent, { type: "tool_execution_start" }>;
|
|
type ToolExecutionEndEvent = Extract<AgentEvent, { type: "tool_execution_end" }>;
|
|
|
|
function createTestContext(): {
|
|
ctx: ToolHandlerContext;
|
|
warn: ReturnType<typeof vi.fn>;
|
|
onBlockReplyFlush: ReturnType<typeof vi.fn>;
|
|
} {
|
|
const onBlockReplyFlush = vi.fn();
|
|
const warn = vi.fn();
|
|
const ctx: ToolHandlerContext = {
|
|
params: {
|
|
runId: "run-test",
|
|
onBlockReplyFlush,
|
|
onAgentEvent: undefined,
|
|
onToolResult: undefined,
|
|
},
|
|
flushBlockReplyBuffer: vi.fn(),
|
|
hookRunner: undefined,
|
|
log: {
|
|
debug: vi.fn(),
|
|
warn,
|
|
},
|
|
state: {
|
|
toolMetaById: new Map<string, ToolCallSummary>(),
|
|
toolMetas: [],
|
|
toolSummaryById: new Set<string>(),
|
|
pendingMessagingTargets: new Map<string, MessagingToolSend>(),
|
|
pendingMessagingTexts: new Map<string, string>(),
|
|
pendingMessagingMediaUrls: new Map<string, string[]>(),
|
|
messagingToolSentTexts: [],
|
|
messagingToolSentTextsNormalized: [],
|
|
messagingToolSentMediaUrls: [],
|
|
messagingToolSentTargets: [],
|
|
successfulCronAdds: 0,
|
|
},
|
|
shouldEmitToolResult: () => false,
|
|
shouldEmitToolOutput: () => false,
|
|
emitToolSummary: vi.fn(),
|
|
emitToolOutput: vi.fn(),
|
|
trimMessagingToolSent: vi.fn(),
|
|
};
|
|
|
|
return { ctx, warn, onBlockReplyFlush };
|
|
}
|
|
|
|
describe("handleToolExecutionStart read path checks", () => {
|
|
it("does not warn when read tool uses file_path alias", async () => {
|
|
const { ctx, warn, onBlockReplyFlush } = createTestContext();
|
|
|
|
const evt: ToolExecutionStartEvent = {
|
|
type: "tool_execution_start",
|
|
toolName: "read",
|
|
toolCallId: "tool-1",
|
|
args: { file_path: "/tmp/example.txt" },
|
|
};
|
|
|
|
await handleToolExecutionStart(ctx, evt);
|
|
|
|
expect(onBlockReplyFlush).toHaveBeenCalledTimes(1);
|
|
expect(warn).not.toHaveBeenCalled();
|
|
});
|
|
|
|
it("warns when read tool has neither path nor file_path", async () => {
|
|
const { ctx, warn } = createTestContext();
|
|
|
|
const evt: ToolExecutionStartEvent = {
|
|
type: "tool_execution_start",
|
|
toolName: "read",
|
|
toolCallId: "tool-2",
|
|
args: {},
|
|
};
|
|
|
|
await handleToolExecutionStart(ctx, evt);
|
|
|
|
expect(warn).toHaveBeenCalledTimes(1);
|
|
expect(String(warn.mock.calls[0]?.[0] ?? "")).toContain("read tool called without path");
|
|
});
|
|
|
|
it("awaits onBlockReplyFlush before continuing tool start processing", async () => {
|
|
const { ctx, onBlockReplyFlush } = createTestContext();
|
|
let releaseFlush: (() => void) | undefined;
|
|
onBlockReplyFlush.mockImplementation(
|
|
() =>
|
|
new Promise<void>((resolve) => {
|
|
releaseFlush = resolve;
|
|
}),
|
|
);
|
|
|
|
const evt: ToolExecutionStartEvent = {
|
|
type: "tool_execution_start",
|
|
toolName: "exec",
|
|
toolCallId: "tool-await-flush",
|
|
args: { command: "echo hi" },
|
|
};
|
|
|
|
const pending = handleToolExecutionStart(ctx, evt);
|
|
// Let the async function reach the awaited flush Promise.
|
|
await Promise.resolve();
|
|
|
|
// If flush isn't awaited, tool metadata would already be recorded here.
|
|
expect(ctx.state.toolMetaById.has("tool-await-flush")).toBe(false);
|
|
expect(releaseFlush).toBeTypeOf("function");
|
|
|
|
releaseFlush?.();
|
|
await pending;
|
|
|
|
expect(ctx.state.toolMetaById.has("tool-await-flush")).toBe(true);
|
|
});
|
|
});
|
|
|
|
describe("handleToolExecutionEnd cron.add commitment tracking", () => {
|
|
it("increments successfulCronAdds when cron add succeeds", async () => {
|
|
const { ctx } = createTestContext();
|
|
await handleToolExecutionStart(
|
|
ctx as never,
|
|
{
|
|
type: "tool_execution_start",
|
|
toolName: "cron",
|
|
toolCallId: "tool-cron-1",
|
|
args: { action: "add", job: { name: "reminder" } },
|
|
} as never,
|
|
);
|
|
|
|
await handleToolExecutionEnd(
|
|
ctx as never,
|
|
{
|
|
type: "tool_execution_end",
|
|
toolName: "cron",
|
|
toolCallId: "tool-cron-1",
|
|
isError: false,
|
|
result: { details: { status: "ok" } },
|
|
} as never,
|
|
);
|
|
|
|
expect(ctx.state.successfulCronAdds).toBe(1);
|
|
});
|
|
|
|
it("does not increment successfulCronAdds when cron add fails", async () => {
|
|
const { ctx } = createTestContext();
|
|
await handleToolExecutionStart(
|
|
ctx as never,
|
|
{
|
|
type: "tool_execution_start",
|
|
toolName: "cron",
|
|
toolCallId: "tool-cron-2",
|
|
args: { action: "add", job: { name: "reminder" } },
|
|
} as never,
|
|
);
|
|
|
|
await handleToolExecutionEnd(
|
|
ctx as never,
|
|
{
|
|
type: "tool_execution_end",
|
|
toolName: "cron",
|
|
toolCallId: "tool-cron-2",
|
|
isError: true,
|
|
result: { details: { status: "error" } },
|
|
} as never,
|
|
);
|
|
|
|
expect(ctx.state.successfulCronAdds).toBe(0);
|
|
});
|
|
});
|
|
|
|
describe("messaging tool media URL tracking", () => {
|
|
it("tracks media arg from messaging tool as pending", async () => {
|
|
const { ctx } = createTestContext();
|
|
|
|
const evt: ToolExecutionStartEvent = {
|
|
type: "tool_execution_start",
|
|
toolName: "message",
|
|
toolCallId: "tool-m1",
|
|
args: { action: "send", to: "channel:123", content: "hi", media: "file:///img.jpg" },
|
|
};
|
|
|
|
await handleToolExecutionStart(ctx, evt);
|
|
|
|
expect(ctx.state.pendingMessagingMediaUrls.get("tool-m1")).toEqual(["file:///img.jpg"]);
|
|
});
|
|
|
|
it("commits pending media URL on tool success", async () => {
|
|
const { ctx } = createTestContext();
|
|
|
|
// Simulate start
|
|
const startEvt: ToolExecutionStartEvent = {
|
|
type: "tool_execution_start",
|
|
toolName: "message",
|
|
toolCallId: "tool-m2",
|
|
args: { action: "send", to: "channel:123", content: "hi", media: "file:///img.jpg" },
|
|
};
|
|
|
|
await handleToolExecutionStart(ctx, startEvt);
|
|
|
|
// Simulate successful end
|
|
const endEvt: ToolExecutionEndEvent = {
|
|
type: "tool_execution_end",
|
|
toolName: "message",
|
|
toolCallId: "tool-m2",
|
|
isError: false,
|
|
result: { ok: true },
|
|
};
|
|
|
|
await handleToolExecutionEnd(ctx, endEvt);
|
|
|
|
expect(ctx.state.messagingToolSentMediaUrls).toContain("file:///img.jpg");
|
|
expect(ctx.state.pendingMessagingMediaUrls.has("tool-m2")).toBe(false);
|
|
});
|
|
|
|
it("commits mediaUrls from tool result payload", async () => {
|
|
const { ctx } = createTestContext();
|
|
|
|
const startEvt: ToolExecutionStartEvent = {
|
|
type: "tool_execution_start",
|
|
toolName: "message",
|
|
toolCallId: "tool-m2b",
|
|
args: { action: "send", to: "channel:123", content: "hi" },
|
|
};
|
|
await handleToolExecutionStart(ctx, startEvt);
|
|
|
|
const endEvt: ToolExecutionEndEvent = {
|
|
type: "tool_execution_end",
|
|
toolName: "message",
|
|
toolCallId: "tool-m2b",
|
|
isError: false,
|
|
result: {
|
|
content: [
|
|
{
|
|
type: "text",
|
|
text: JSON.stringify({
|
|
mediaUrls: ["file:///img-a.jpg", "file:///img-b.jpg"],
|
|
}),
|
|
},
|
|
],
|
|
},
|
|
};
|
|
await handleToolExecutionEnd(ctx, endEvt);
|
|
|
|
expect(ctx.state.messagingToolSentMediaUrls).toEqual([
|
|
"file:///img-a.jpg",
|
|
"file:///img-b.jpg",
|
|
]);
|
|
});
|
|
|
|
it("trims messagingToolSentMediaUrls to 200 on commit (FIFO)", async () => {
|
|
const { ctx } = createTestContext();
|
|
|
|
// Replace mock with a real trim that replicates production cap logic.
|
|
const MAX = 200;
|
|
ctx.trimMessagingToolSent = () => {
|
|
if (ctx.state.messagingToolSentTexts.length > MAX) {
|
|
const overflow = ctx.state.messagingToolSentTexts.length - MAX;
|
|
ctx.state.messagingToolSentTexts.splice(0, overflow);
|
|
ctx.state.messagingToolSentTextsNormalized.splice(0, overflow);
|
|
}
|
|
if (ctx.state.messagingToolSentTargets.length > MAX) {
|
|
const overflow = ctx.state.messagingToolSentTargets.length - MAX;
|
|
ctx.state.messagingToolSentTargets.splice(0, overflow);
|
|
}
|
|
if (ctx.state.messagingToolSentMediaUrls.length > MAX) {
|
|
const overflow = ctx.state.messagingToolSentMediaUrls.length - MAX;
|
|
ctx.state.messagingToolSentMediaUrls.splice(0, overflow);
|
|
}
|
|
};
|
|
|
|
// Pre-fill with 200 URLs (url-0 .. url-199)
|
|
for (let i = 0; i < 200; i++) {
|
|
ctx.state.messagingToolSentMediaUrls.push(`file:///img-${i}.jpg`);
|
|
}
|
|
expect(ctx.state.messagingToolSentMediaUrls).toHaveLength(200);
|
|
|
|
// Commit one more via start → end
|
|
const startEvt: ToolExecutionStartEvent = {
|
|
type: "tool_execution_start",
|
|
toolName: "message",
|
|
toolCallId: "tool-cap",
|
|
args: { action: "send", to: "channel:123", content: "hi", media: "file:///img-new.jpg" },
|
|
};
|
|
await handleToolExecutionStart(ctx, startEvt);
|
|
|
|
const endEvt: ToolExecutionEndEvent = {
|
|
type: "tool_execution_end",
|
|
toolName: "message",
|
|
toolCallId: "tool-cap",
|
|
isError: false,
|
|
result: { ok: true },
|
|
};
|
|
await handleToolExecutionEnd(ctx, endEvt);
|
|
|
|
// Should be capped at 200, oldest removed, newest appended.
|
|
expect(ctx.state.messagingToolSentMediaUrls).toHaveLength(200);
|
|
expect(ctx.state.messagingToolSentMediaUrls[0]).toBe("file:///img-1.jpg");
|
|
expect(ctx.state.messagingToolSentMediaUrls[199]).toBe("file:///img-new.jpg");
|
|
expect(ctx.state.messagingToolSentMediaUrls).not.toContain("file:///img-0.jpg");
|
|
});
|
|
|
|
it("discards pending media URL on tool error", async () => {
|
|
const { ctx } = createTestContext();
|
|
|
|
const startEvt: ToolExecutionStartEvent = {
|
|
type: "tool_execution_start",
|
|
toolName: "message",
|
|
toolCallId: "tool-m3",
|
|
args: { action: "send", to: "channel:123", content: "hi", media: "file:///img.jpg" },
|
|
};
|
|
|
|
await handleToolExecutionStart(ctx, startEvt);
|
|
|
|
const endEvt: ToolExecutionEndEvent = {
|
|
type: "tool_execution_end",
|
|
toolName: "message",
|
|
toolCallId: "tool-m3",
|
|
isError: true,
|
|
result: "Error: failed",
|
|
};
|
|
|
|
await handleToolExecutionEnd(ctx, endEvt);
|
|
|
|
expect(ctx.state.messagingToolSentMediaUrls).toHaveLength(0);
|
|
expect(ctx.state.pendingMessagingMediaUrls.has("tool-m3")).toBe(false);
|
|
});
|
|
});
|