openclaw/src/auto-reply/reply/abort.test.ts

669 lines
20 KiB
TypeScript
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import fs from "node:fs/promises";
import os from "node:os";
import path from "node:path";
import { afterEach, describe, expect, it, vi } from "vitest";
import type { SubagentRunRecord } from "../../agents/subagent-registry.js";
import type { OpenClawConfig } from "../../config/config.js";
import {
getAbortMemory,
getAbortMemorySizeForTest,
isAbortRequestText,
isAbortTrigger,
resetAbortMemoryForTest,
resolveAbortCutoffFromContext,
resolveSessionEntryForKey,
setAbortMemory,
shouldSkipMessageByAbortCutoff,
tryFastAbortFromMessage,
} from "./abort.js";
import { enqueueFollowupRun, getFollowupQueueDepth, type FollowupRun } from "./queue.js";
import { initSessionState } from "./session.js";
import { buildTestCtx } from "./test-ctx.js";
vi.mock("../../agents/pi-embedded.js", () => ({
abortEmbeddedPiRun: vi.fn().mockReturnValue(true),
resolveEmbeddedSessionLane: (key: string) => `session:${key.trim() || "main"}`,
}));
const commandQueueMocks = vi.hoisted(() => ({
clearCommandLane: vi.fn(),
}));
vi.mock("../../process/command-queue.js", () => commandQueueMocks);
const subagentRegistryMocks = vi.hoisted(() => ({
listSubagentRunsForRequester: vi.fn<(requesterSessionKey: string) => SubagentRunRecord[]>(
() => [],
),
markSubagentRunTerminated: vi.fn(() => 1),
}));
vi.mock("../../agents/subagent-registry.js", () => ({
listSubagentRunsForRequester: subagentRegistryMocks.listSubagentRunsForRequester,
markSubagentRunTerminated: subagentRegistryMocks.markSubagentRunTerminated,
}));
const acpManagerMocks = vi.hoisted(() => ({
resolveSession: vi.fn<
() =>
| { kind: "none" }
| {
kind: "ready";
sessionKey: string;
meta: unknown;
}
>(() => ({ kind: "none" })),
cancelSession: vi.fn(async () => {}),
}));
vi.mock("../../acp/control-plane/manager.js", () => ({
getAcpSessionManager: () => ({
resolveSession: acpManagerMocks.resolveSession,
cancelSession: acpManagerMocks.cancelSession,
}),
}));
describe("abort detection", () => {
async function writeSessionStore(
storePath: string,
sessionIdsByKey: Record<string, string>,
nowMs = Date.now(),
) {
const storeEntries = Object.fromEntries(
Object.entries(sessionIdsByKey).map(([key, sessionId]) => [
key,
{ sessionId, updatedAt: nowMs },
]),
);
await fs.writeFile(storePath, JSON.stringify(storeEntries, null, 2));
}
async function createAbortConfig(params?: {
commandsTextEnabled?: boolean;
sessionIdsByKey?: Record<string, string>;
nowMs?: number;
}) {
const root = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-abort-"));
const storePath = path.join(root, "sessions.json");
const cfg = {
session: { store: storePath },
...(typeof params?.commandsTextEnabled === "boolean"
? { commands: { text: params.commandsTextEnabled } }
: {}),
} as OpenClawConfig;
if (params?.sessionIdsByKey) {
await writeSessionStore(storePath, params.sessionIdsByKey, params.nowMs);
}
return { root, storePath, cfg };
}
async function runStopCommand(params: {
cfg: OpenClawConfig;
sessionKey: string;
from: string;
to: string;
targetSessionKey?: string;
messageSid?: string;
timestamp?: number;
}) {
return tryFastAbortFromMessage({
ctx: buildTestCtx({
CommandBody: "/stop",
RawBody: "/stop",
CommandAuthorized: true,
SessionKey: params.sessionKey,
Provider: "telegram",
Surface: "telegram",
From: params.from,
To: params.to,
...(params.targetSessionKey ? { CommandTargetSessionKey: params.targetSessionKey } : {}),
...(params.messageSid ? { MessageSid: params.messageSid } : {}),
...(typeof params.timestamp === "number" ? { Timestamp: params.timestamp } : {}),
}),
cfg: params.cfg,
});
}
afterEach(() => {
resetAbortMemoryForTest();
acpManagerMocks.resolveSession.mockReset().mockReturnValue({ kind: "none" });
acpManagerMocks.cancelSession.mockReset().mockResolvedValue(undefined);
});
it("triggerBodyNormalized extracts /stop from RawBody for abort detection", async () => {
const root = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-abort-"));
const storePath = path.join(root, "sessions.json");
const cfg = { session: { store: storePath } } as OpenClawConfig;
const groupMessageCtx = {
Body: `[Context]\nJake: /stop\n[from: Jake]`,
RawBody: "/stop",
ChatType: "group",
SessionKey: "agent:main:whatsapp:group:g1",
};
const result = await initSessionState({
ctx: groupMessageCtx,
cfg,
commandAuthorized: true,
});
// /stop is detected via exact match in handleAbort, not isAbortTrigger
expect(result.triggerBodyNormalized).toBe("/stop");
});
it("isAbortTrigger matches standalone abort trigger phrases", () => {
const positives = [
"stop",
"esc",
"abort",
"wait",
"exit",
"interrupt",
"stop openclaw",
"openclaw stop",
"stop action",
"stop current action",
"stop run",
"stop current run",
"stop agent",
"stop the agent",
"stop don't do anything",
"stop dont do anything",
"stop do not do anything",
"stop doing anything",
"do not do that",
"please stop",
"stop please",
"STOP OPENCLAW",
"stop openclaw!!!",
"stop dont do anything",
"detente",
"detén",
"arrête",
"停止",
"やめて",
"止めて",
"रुको",
"توقف",
"стоп",
"остановись",
"останови",
"остановить",
"прекрати",
"halt",
"anhalten",
"aufhören",
"hoer auf",
"stopp",
"pare",
];
for (const candidate of positives) {
expect(isAbortTrigger(candidate)).toBe(true);
}
expect(isAbortTrigger("hello")).toBe(false);
expect(isAbortTrigger("please do not do that")).toBe(false);
// /stop is NOT matched by isAbortTrigger - it's handled separately.
expect(isAbortTrigger("/stop")).toBe(false);
});
it("isAbortRequestText aligns abort command semantics", () => {
expect(isAbortRequestText("/stop")).toBe(true);
expect(isAbortRequestText("/STOP")).toBe(true);
expect(isAbortRequestText("/stop!!!")).toBe(true);
expect(isAbortRequestText("/Stop!!!")).toBe(true);
expect(isAbortRequestText("stop")).toBe(true);
expect(isAbortRequestText("Stop")).toBe(true);
expect(isAbortRequestText("STOP")).toBe(true);
expect(isAbortRequestText("stop action")).toBe(true);
expect(isAbortRequestText("stop openclaw!!!")).toBe(true);
expect(isAbortRequestText("やめて")).toBe(true);
expect(isAbortRequestText("остановись")).toBe(true);
expect(isAbortRequestText("halt")).toBe(true);
expect(isAbortRequestText("stopp")).toBe(true);
expect(isAbortRequestText("pare")).toBe(true);
expect(isAbortRequestText(" توقف ")).toBe(true);
expect(isAbortRequestText("/stop@openclaw_bot", { botUsername: "openclaw_bot" })).toBe(true);
expect(isAbortRequestText("/Stop@openclaw_bot", { botUsername: "openclaw_bot" })).toBe(true);
expect(isAbortRequestText("/status")).toBe(false);
expect(isAbortRequestText("do not do that")).toBe(true);
expect(isAbortRequestText("please do not do that")).toBe(false);
expect(isAbortRequestText("/abort")).toBe(false);
});
it("removes abort memory entry when flag is reset", () => {
setAbortMemory("session-1", true);
expect(getAbortMemory("session-1")).toBe(true);
setAbortMemory("session-1", false);
expect(getAbortMemory("session-1")).toBeUndefined();
expect(getAbortMemorySizeForTest()).toBe(0);
});
it("caps abort memory tracking to a bounded max size", () => {
for (let i = 0; i < 2105; i += 1) {
setAbortMemory(`session-${i}`, true);
}
expect(getAbortMemorySizeForTest()).toBe(2000);
expect(getAbortMemory("session-0")).toBeUndefined();
expect(getAbortMemory("session-2104")).toBe(true);
});
it("extracts abort cutoff metadata from context", () => {
expect(
resolveAbortCutoffFromContext(
buildTestCtx({
MessageSid: "42",
Timestamp: 123,
}),
),
).toEqual({
messageSid: "42",
timestamp: 123,
});
});
it("treats numeric message IDs at or before cutoff as stale", () => {
expect(
shouldSkipMessageByAbortCutoff({
cutoffMessageSid: "200",
messageSid: "199",
}),
).toBe(true);
expect(
shouldSkipMessageByAbortCutoff({
cutoffMessageSid: "200",
messageSid: "200",
}),
).toBe(true);
expect(
shouldSkipMessageByAbortCutoff({
cutoffMessageSid: "200",
messageSid: "201",
}),
).toBe(false);
});
it("falls back to timestamp cutoff when message IDs are unavailable", () => {
expect(
shouldSkipMessageByAbortCutoff({
cutoffTimestamp: 2000,
timestamp: 1999,
}),
).toBe(true);
expect(
shouldSkipMessageByAbortCutoff({
cutoffTimestamp: 2000,
timestamp: 2000,
}),
).toBe(true);
expect(
shouldSkipMessageByAbortCutoff({
cutoffTimestamp: 2000,
timestamp: 2001,
}),
).toBe(false);
});
it("resolves session entry when key exists in store", () => {
const store = {
"session-1": { sessionId: "abc", updatedAt: 0 },
} as const;
expect(resolveSessionEntryForKey(store, "session-1")).toEqual({
entry: store["session-1"],
key: "session-1",
});
expect(resolveSessionEntryForKey(store, "session-2")).toEqual({});
expect(resolveSessionEntryForKey(undefined, "session-1")).toEqual({});
});
it("fast-aborts even when text commands are disabled", async () => {
const { cfg } = await createAbortConfig({ commandsTextEnabled: false });
const result = await runStopCommand({
cfg,
sessionKey: "telegram:123",
from: "telegram:123",
to: "telegram:123",
});
expect(result.handled).toBe(true);
});
it("fast-abort clears queued followups and session lane", async () => {
const sessionKey = "telegram:123";
const sessionId = "session-123";
const { root, cfg } = await createAbortConfig({
sessionIdsByKey: { [sessionKey]: sessionId },
});
const followupRun: FollowupRun = {
prompt: "queued",
enqueuedAt: Date.now(),
run: {
agentId: "main",
agentDir: path.join(root, "agent"),
sessionId,
sessionKey,
messageProvider: "telegram",
agentAccountId: "acct",
sessionFile: path.join(root, "session.jsonl"),
workspaceDir: path.join(root, "workspace"),
config: cfg,
provider: "anthropic",
model: "claude-opus-4-5",
timeoutMs: 1000,
blockReplyBreak: "text_end",
},
};
enqueueFollowupRun(
sessionKey,
followupRun,
{ mode: "collect", debounceMs: 0, cap: 20, dropPolicy: "summarize" },
"none",
);
expect(getFollowupQueueDepth(sessionKey)).toBe(1);
const result = await runStopCommand({
cfg,
sessionKey,
from: "telegram:123",
to: "telegram:123",
});
expect(result.handled).toBe(true);
expect(getFollowupQueueDepth(sessionKey)).toBe(0);
expect(commandQueueMocks.clearCommandLane).toHaveBeenCalledWith(`session:${sessionKey}`);
});
it("plain-language stop on ACP-bound session triggers ACP cancel", async () => {
const sessionKey = "agent:codex:acp:test-1";
const sessionId = "session-123";
const { cfg } = await createAbortConfig({
sessionIdsByKey: { [sessionKey]: sessionId },
});
acpManagerMocks.resolveSession.mockReturnValue({
kind: "ready",
sessionKey,
meta: {} as never,
});
const result = await runStopCommand({
cfg,
sessionKey,
from: "telegram:123",
to: "telegram:123",
targetSessionKey: sessionKey,
});
expect(result.handled).toBe(true);
expect(acpManagerMocks.cancelSession).toHaveBeenCalledWith({
cfg,
sessionKey,
reason: "fast-abort",
});
});
it("ACP cancel failures do not skip queue and lane cleanup", async () => {
const sessionKey = "agent:codex:acp:test-2";
const sessionId = "session-456";
const { root, cfg } = await createAbortConfig({
sessionIdsByKey: { [sessionKey]: sessionId },
});
const followupRun: FollowupRun = {
prompt: "queued",
enqueuedAt: Date.now(),
run: {
agentId: "main",
agentDir: path.join(root, "agent"),
sessionId,
sessionKey,
messageProvider: "telegram",
agentAccountId: "acct",
sessionFile: path.join(root, "session.jsonl"),
workspaceDir: path.join(root, "workspace"),
config: cfg,
provider: "anthropic",
model: "claude-opus-4-5",
timeoutMs: 1000,
blockReplyBreak: "text_end",
},
};
enqueueFollowupRun(
sessionKey,
followupRun,
{ mode: "collect", debounceMs: 0, cap: 20, dropPolicy: "summarize" },
"none",
);
acpManagerMocks.resolveSession.mockReturnValue({
kind: "ready",
sessionKey,
meta: {} as never,
});
acpManagerMocks.cancelSession.mockRejectedValueOnce(new Error("cancel failed"));
const result = await runStopCommand({
cfg,
sessionKey,
from: "telegram:123",
to: "telegram:123",
targetSessionKey: sessionKey,
});
expect(result.handled).toBe(true);
expect(getFollowupQueueDepth(sessionKey)).toBe(0);
expect(commandQueueMocks.clearCommandLane).toHaveBeenCalledWith(`session:${sessionKey}`);
});
it("persists abort cutoff metadata on /stop when command and target session match", async () => {
const sessionKey = "telegram:123";
const sessionId = "session-123";
const { storePath, cfg } = await createAbortConfig({
sessionIdsByKey: { [sessionKey]: sessionId },
});
const result = await runStopCommand({
cfg,
sessionKey,
from: "telegram:123",
to: "telegram:123",
messageSid: "55",
timestamp: 1234567890000,
});
expect(result.handled).toBe(true);
const store = JSON.parse(await fs.readFile(storePath, "utf8")) as Record<string, unknown>;
const entry = store[sessionKey] as {
abortedLastRun?: boolean;
abortCutoffMessageSid?: string;
abortCutoffTimestamp?: number;
};
expect(entry.abortedLastRun).toBe(true);
expect(entry.abortCutoffMessageSid).toBe("55");
expect(entry.abortCutoffTimestamp).toBe(1234567890000);
});
it("does not persist cutoff metadata when native /stop targets a different session", async () => {
const slashSessionKey = "telegram:slash:123";
const targetSessionKey = "agent:main:telegram:group:123";
const targetSessionId = "session-target";
const { storePath, cfg } = await createAbortConfig({
sessionIdsByKey: { [targetSessionKey]: targetSessionId },
});
const result = await runStopCommand({
cfg,
sessionKey: slashSessionKey,
from: "telegram:123",
to: "telegram:123",
targetSessionKey,
messageSid: "999",
timestamp: 1234567890000,
});
expect(result.handled).toBe(true);
const store = JSON.parse(await fs.readFile(storePath, "utf8")) as Record<string, unknown>;
const entry = store[targetSessionKey] as {
abortedLastRun?: boolean;
abortCutoffMessageSid?: string;
abortCutoffTimestamp?: number;
};
expect(entry.abortedLastRun).toBe(true);
expect(entry.abortCutoffMessageSid).toBeUndefined();
expect(entry.abortCutoffTimestamp).toBeUndefined();
});
it("fast-abort stops active subagent runs for requester session", async () => {
const sessionKey = "telegram:parent";
const childKey = "agent:main:subagent:child-1";
const sessionId = "session-parent";
const childSessionId = "session-child";
const { cfg } = await createAbortConfig({
sessionIdsByKey: {
[sessionKey]: sessionId,
[childKey]: childSessionId,
},
});
subagentRegistryMocks.listSubagentRunsForRequester.mockReturnValueOnce([
{
runId: "run-1",
childSessionKey: childKey,
requesterSessionKey: sessionKey,
requesterDisplayKey: "telegram:parent",
task: "do work",
cleanup: "keep",
createdAt: Date.now(),
},
]);
const result = await runStopCommand({
cfg,
sessionKey,
from: "telegram:parent",
to: "telegram:parent",
});
expect(result.stoppedSubagents).toBe(1);
expect(commandQueueMocks.clearCommandLane).toHaveBeenCalledWith(`session:${childKey}`);
});
it("cascade stop kills depth-2 children when stopping depth-1 agent", async () => {
const sessionKey = "telegram:parent";
const depth1Key = "agent:main:subagent:child-1";
const depth2Key = "agent:main:subagent:child-1:subagent:grandchild-1";
const sessionId = "session-parent";
const depth1SessionId = "session-child";
const depth2SessionId = "session-grandchild";
const { cfg } = await createAbortConfig({
sessionIdsByKey: {
[sessionKey]: sessionId,
[depth1Key]: depth1SessionId,
[depth2Key]: depth2SessionId,
},
});
// First call: main session lists depth-1 children
// Second call (cascade): depth-1 session lists depth-2 children
// Third call (cascade from depth-2): no further children
subagentRegistryMocks.listSubagentRunsForRequester
.mockReturnValueOnce([
{
runId: "run-1",
childSessionKey: depth1Key,
requesterSessionKey: sessionKey,
requesterDisplayKey: "telegram:parent",
task: "orchestrator",
cleanup: "keep",
createdAt: Date.now(),
},
])
.mockReturnValueOnce([
{
runId: "run-2",
childSessionKey: depth2Key,
requesterSessionKey: depth1Key,
requesterDisplayKey: depth1Key,
task: "leaf worker",
cleanup: "keep",
createdAt: Date.now(),
},
])
.mockReturnValueOnce([]);
const result = await runStopCommand({
cfg,
sessionKey,
from: "telegram:parent",
to: "telegram:parent",
});
// Should stop both depth-1 and depth-2 agents (cascade)
expect(result.stoppedSubagents).toBe(2);
expect(commandQueueMocks.clearCommandLane).toHaveBeenCalledWith(`session:${depth1Key}`);
expect(commandQueueMocks.clearCommandLane).toHaveBeenCalledWith(`session:${depth2Key}`);
});
it("cascade stop traverses ended depth-1 parents to stop active depth-2 children", async () => {
subagentRegistryMocks.listSubagentRunsForRequester.mockClear();
subagentRegistryMocks.markSubagentRunTerminated.mockClear();
const sessionKey = "telegram:parent";
const depth1Key = "agent:main:subagent:child-ended";
const depth2Key = "agent:main:subagent:child-ended:subagent:grandchild-active";
const now = Date.now();
const { cfg } = await createAbortConfig({
nowMs: now,
sessionIdsByKey: {
[sessionKey]: "session-parent",
[depth1Key]: "session-child-ended",
[depth2Key]: "session-grandchild-active",
},
});
// main -> ended depth-1 parent
// depth-1 parent -> active depth-2 child
// depth-2 child -> none
subagentRegistryMocks.listSubagentRunsForRequester
.mockReturnValueOnce([
{
runId: "run-1",
childSessionKey: depth1Key,
requesterSessionKey: sessionKey,
requesterDisplayKey: "telegram:parent",
task: "orchestrator",
cleanup: "keep",
createdAt: now - 1_000,
endedAt: now - 500,
outcome: { status: "ok" },
},
])
.mockReturnValueOnce([
{
runId: "run-2",
childSessionKey: depth2Key,
requesterSessionKey: depth1Key,
requesterDisplayKey: depth1Key,
task: "leaf worker",
cleanup: "keep",
createdAt: now - 500,
},
])
.mockReturnValueOnce([]);
const result = await runStopCommand({
cfg,
sessionKey,
from: "telegram:parent",
to: "telegram:parent",
});
// Should skip killing the ended depth-1 run itself, but still kill depth-2.
expect(result.stoppedSubagents).toBe(1);
expect(commandQueueMocks.clearCommandLane).toHaveBeenCalledWith(`session:${depth2Key}`);
expect(subagentRegistryMocks.markSubagentRunTerminated).toHaveBeenCalledWith(
expect.objectContaining({ runId: "run-2", childSessionKey: depth2Key }),
);
});
});