* fix(acp): implicit streamToParent for mode=run without thread

  When spawning ACP sessions with mode=run and no thread binding, automatically route output to the parent session instead of Discord. This enables agent-to-agent supervision patterns where the spawning agent wants results returned programmatically, not posted as chat.

  The change makes sessions_spawn with runtime=acp and thread=false behave like a direct acpx invocation: output goes to the spawning session, not to Discord.

  Fixes the issue where mode=run without thread still posted to Discord because hasDeliveryTarget was true when called from a Discord context.

* fix: use resolved spawnMode instead of params.mode

  Move the implicit streamToParent check to after resolveSpawnMode so that both explicit mode="run" and omitted mode (which defaults to "run" when thread is false) correctly trigger parent routing. This fixes the issue where callers that rely on default mode selection would not get the intended parent streaming behavior.

* fix: tighten implicit ACP parent relay gating (#42404) (thanks @davidguttman)

---------

Co-authored-by: Onur Solmaz <2453968+osolmaz@users.noreply.github.com>
1045 lines
32 KiB
TypeScript
1045 lines
32 KiB
TypeScript
import { beforeEach, describe, expect, it, vi } from "vitest";
|
|
import type { OpenClawConfig } from "../config/config.js";
|
|
import type { SessionBindingRecord } from "../infra/outbound/session-binding-service.js";
|
|
|
|
/**
 * Builds the baseline config each test starts from: ACP enabled on the "acpx"
 * backend with "codex" as the only allowed agent, a per-sender session scope
 * keyed on "main", and Discord thread bindings permitted to spawn ACP sessions.
 *
 * A new object is returned on every call.
 */
function createDefaultSpawnConfig(): OpenClawConfig {
  const baseline: OpenClawConfig = {
    acp: {
      enabled: true,
      backend: "acpx",
      allowedAgents: ["codex"],
    },
    session: {
      mainKey: "main",
      scope: "per-sender",
    },
    channels: {
      discord: {
        threadBindings: {
          enabled: true,
          spawnAcpSessions: true,
        },
      },
    },
  };
  return baseline;
}
|
|
|
|
/**
 * Shared mock functions plus the mutable per-test config holder.
 * Created through `vi.hoisted` so the `vi.mock` factories in this file can
 * reference them.
 */
const hoisted = vi.hoisted(() => {
  // One bare vi.fn() per mocked collaborator; behavior is configured per test.
  const mocks = {
    callGatewayMock: vi.fn(),
    sessionBindingCapabilitiesMock: vi.fn(),
    sessionBindingBindMock: vi.fn(),
    sessionBindingUnbindMock: vi.fn(),
    sessionBindingResolveByConversationMock: vi.fn(),
    sessionBindingListBySessionMock: vi.fn(),
    closeSessionMock: vi.fn(),
    initializeSessionMock: vi.fn(),
    startAcpSpawnParentStreamRelayMock: vi.fn(),
    resolveAcpSpawnStreamLogPathMock: vi.fn(),
    loadSessionStoreMock: vi.fn(),
    resolveStorePathMock: vi.fn(),
    resolveSessionTranscriptFileMock: vi.fn(),
    areHeartbeatsEnabledMock: vi.fn(),
  };

  return {
    ...mocks,
    // `cfg` is what the mocked loadConfig() hands back; tests reassign it.
    state: { cfg: createDefaultSpawnConfig() },
  };
});
|
|
|
|
/**
 * Creates a stand-in session binding service whose operations forward their
 * single argument to the matching hoisted mock and return its result.
 * `touch` is a fresh, unconfigured mock on every call.
 */
function buildSessionBindingServiceMock() {
  return {
    touch: vi.fn(),
    bind: (input: unknown) => hoisted.sessionBindingBindMock(input),
    unbind: (input: unknown) => hoisted.sessionBindingUnbindMock(input),
    getCapabilities: (params: unknown) => hoisted.sessionBindingCapabilitiesMock(params),
    resolveByConversation: (ref: unknown) => hoisted.sessionBindingResolveByConversationMock(ref),
    listBySession: (targetSessionKey: string) =>
      hoisted.sessionBindingListBySessionMock(targetSessionKey),
  };
}
|
|
|
|
// loadConfig() always returns the config currently held in hoisted.state.
vi.mock("../config/config.js", async (importOriginal) => ({
  ...(await importOriginal<typeof import("../config/config.js")>()),
  loadConfig: () => hoisted.state.cfg,
}));

// Gateway calls are routed to the hoisted mock.
vi.mock("../gateway/call.js", () => {
  return {
    callGateway: (opts: unknown) => hoisted.callGatewayMock(opts),
  };
});

// Session store lookups are mocked; every other export stays real.
vi.mock("../config/sessions.js", async (importOriginal) => ({
  ...(await importOriginal<typeof import("../config/sessions.js")>()),
  loadSessionStore: (storePath: string) => hoisted.loadSessionStoreMock(storePath),
  resolveStorePath: (store: unknown, opts: unknown) => hoisted.resolveStorePathMock(store, opts),
}));

// Transcript file resolution is mocked; every other export stays real.
vi.mock("../config/sessions/transcript.js", async (importOriginal) => ({
  ...(await importOriginal<typeof import("../config/sessions/transcript.js")>()),
  resolveSessionTranscriptFile: (params: unknown) =>
    hoisted.resolveSessionTranscriptFileMock(params),
}));

// The ACP session manager exposes only the two operations mocked here.
vi.mock("../acp/control-plane/manager.js", () => ({
  getAcpSessionManager: () => ({
    initializeSession: (params: unknown) => hoisted.initializeSessionMock(params),
    closeSession: (params: unknown) => hoisted.closeSessionMock(params),
  }),
}));

// The session binding service is replaced by the forwarding mock.
vi.mock("../infra/outbound/session-binding-service.js", async (importOriginal) => ({
  ...(await importOriginal<typeof import("../infra/outbound/session-binding-service.js")>()),
  getSessionBindingService: () => buildSessionBindingServiceMock(),
}));

// Heartbeat enablement is controlled per test through the hoisted mock.
vi.mock("../infra/heartbeat-wake.js", async (importOriginal) => ({
  ...(await importOriginal<typeof import("../infra/heartbeat-wake.js")>()),
  areHeartbeatsEnabled: () => hoisted.areHeartbeatsEnabledMock(),
}));

// Parent stream relay helpers are fully mocked.
vi.mock("./acp-spawn-parent-stream.js", () => {
  return {
    startAcpSpawnParentStreamRelay: (...args: unknown[]) =>
      hoisted.startAcpSpawnParentStreamRelayMock(...args),
    resolveAcpSpawnStreamLogPath: (...args: unknown[]) =>
      hoisted.resolveAcpSpawnStreamLogPathMock(...args),
  };
});

// Load the module under test after the mock registrations above.
const { spawnAcpDirect } = await import("./acp-spawn.js");
|
|
|
|
/**
 * Returns the capability set reported by the mocked binding service by
 * default: adapter available, bind and unbind supported, and both the
 * "current" and "child" placements offered.
 */
function createSessionBindingCapabilities() {
  const placements = ["current", "child"] as const;
  return {
    adapterAvailable: true,
    bindSupported: true,
    unbindSupported: true,
    placements,
  };
}
|
|
|
|
/**
 * Builds a session binding record for a Discord child thread bound to the
 * "agent:codex:acp:s1" session.
 *
 * @param overrides Top-level fields that replace the defaults. The merge is
 *   shallow, so an overriding `conversation` or `metadata` replaces the whole
 *   nested object.
 * @returns The defaults with `overrides` applied on top.
 */
function createSessionBinding(overrides?: Partial<SessionBindingRecord>): SessionBindingRecord {
  const defaults: SessionBindingRecord = {
    bindingId: "default:child-thread",
    targetSessionKey: "agent:codex:acp:s1",
    targetKind: "session",
    conversation: {
      channel: "discord",
      accountId: "default",
      conversationId: "child-thread",
      parentConversationId: "parent-channel",
    },
    status: "active",
    boundAt: Date.now(),
    metadata: {
      agentId: "codex",
      boundBy: "system",
    },
  };
  return { ...defaults, ...overrides };
}
|
|
|
|
/**
 * Creates a relay handle double with `dispose` and `notifyStarted` mocks.
 *
 * @param overrides Optional pre-built mocks to use in place of fresh ones.
 * @returns An object exposing the two handle functions.
 */
function createRelayHandle(overrides?: {
  dispose?: ReturnType<typeof vi.fn>;
  notifyStarted?: ReturnType<typeof vi.fn>;
}) {
  const dispose = overrides?.dispose ?? vi.fn();
  const notifyStarted = overrides?.notifyStarted ?? vi.fn();
  return { dispose, notifyStarted };
}
|
|
|
|
/**
 * Asserts that the binding was created with a resolved intro banner: the
 * first bind call carrying a string `metadata.introText` must not contain the
 * "session ids: pending" placeholder.
 *
 * Also asserts that such a bind call exists. Without that guard the check
 * would fall back to an empty string and pass even when no intro text was
 * ever supplied.
 */
function expectResolvedIntroTextInBindMetadata(): void {
  type BindInput = { metadata?: { introText?: unknown } } | undefined;

  const introTexts = hoisted.sessionBindingBindMock.mock.calls
    .map((call: unknown[]) => (call[0] as BindInput)?.metadata?.introText)
    .filter((value: unknown): value is string => typeof value === "string");

  // Fail loudly instead of passing vacuously when bind never received intro text.
  expect(introTexts.length).toBeGreaterThan(0);
  expect(introTexts[0]).not.toContain("session ids: pending (available after the first reply)");
}
|
|
|
|
describe("spawnAcpDirect", () => {
|
|
// Restores the default config and re-arms every hoisted mock with its
// happy-path behavior before each test.
beforeEach(() => {
  hoisted.state.cfg = createDefaultSpawnConfig();
  hoisted.areHeartbeatsEnabledMock.mockReset().mockReturnValue(true);

  // Gateway: patch and delete acknowledge, "agent" dispatch yields a run id.
  hoisted.callGatewayMock.mockReset().mockImplementation(async (argsUnknown: unknown) => {
    const { method } = argsUnknown as { method?: string };
    switch (method) {
      case "sessions.patch":
        return { ok: true };
      case "agent":
        return { runId: "run-1" };
      case "sessions.delete":
        return { ok: true };
      default:
        return {};
    }
  });

  hoisted.closeSessionMock.mockReset().mockResolvedValue({
    runtimeClosed: true,
    metaCleared: false,
  });

  // Session initialization echoes the request back as handle and meta records.
  hoisted.initializeSessionMock.mockReset().mockImplementation(async (argsUnknown: unknown) => {
    const request = argsUnknown as {
      sessionKey: string;
      agent: string;
      mode: "persistent" | "oneshot";
      cwd?: string;
    };
    const runtimeSessionName = `${request.sessionKey}:runtime`;
    // cwd is only propagated when it was supplied as a string.
    const cwd = typeof request.cwd === "string" ? request.cwd : undefined;
    return {
      runtime: {
        close: vi.fn().mockResolvedValue(undefined),
      },
      handle: {
        sessionKey: request.sessionKey,
        backend: "acpx",
        runtimeSessionName,
        ...(cwd ? { cwd } : {}),
        agentSessionId: "codex-inner-1",
        backendSessionId: "acpx-1",
      },
      meta: {
        backend: "acpx",
        agent: request.agent,
        runtimeSessionName,
        ...(cwd ? { runtimeOptions: { cwd }, cwd } : {}),
        identity: {
          state: "pending",
          source: "ensure",
          acpxSessionId: "acpx-1",
          agentSessionId: "codex-inner-1",
          lastUpdatedAt: Date.now(),
        },
        mode: request.mode,
        state: "idle",
        lastActivityAt: Date.now(),
      },
    };
  });

  hoisted.sessionBindingCapabilitiesMock
    .mockReset()
    .mockReturnValue(createSessionBindingCapabilities());

  // bind() resolves to a Discord child-thread binding for the requested session.
  hoisted.sessionBindingBindMock.mockReset().mockImplementation(
    async (input: {
      targetSessionKey: string;
      conversation: { accountId: string };
      metadata?: Record<string, unknown>;
    }) => {
      const boundBy =
        typeof input.metadata?.boundBy === "string" ? input.metadata.boundBy : "system";
      return createSessionBinding({
        targetSessionKey: input.targetSessionKey,
        conversation: {
          channel: "discord",
          accountId: input.conversation.accountId,
          conversationId: "child-thread",
          parentConversationId: "parent-channel",
        },
        metadata: {
          boundBy,
          agentId: "codex",
          webhookId: "wh-1",
        },
      });
    },
  );
  hoisted.sessionBindingResolveByConversationMock.mockReset().mockReturnValue(null);
  hoisted.sessionBindingListBySessionMock.mockReset().mockReturnValue([]);
  hoisted.sessionBindingUnbindMock.mockReset().mockResolvedValue([]);

  hoisted.startAcpSpawnParentStreamRelayMock
    .mockReset()
    .mockImplementation(() => createRelayHandle());
  hoisted.resolveAcpSpawnStreamLogPathMock
    .mockReset()
    .mockReturnValue("/tmp/sess-main.acp-stream.jsonl");

  hoisted.resolveStorePathMock.mockReset().mockReturnValue("/tmp/codex-sessions.json");
  // The store answers any "agent:codex:acp:*" key with a synthetic entry and
  // reports every other key as missing.
  hoisted.loadSessionStoreMock.mockReset().mockImplementation(() => {
    const emptyStore: Record<string, { sessionId: string; updatedAt: number }> = {};
    return new Proxy(emptyStore, {
      get(_target, prop) {
        const isAcpChildKey = typeof prop === "string" && prop.startsWith("agent:codex:acp:");
        return isAcpChildKey ? { sessionId: "sess-123", updatedAt: Date.now() } : undefined;
      },
    });
  });

  // Transcript path gains a "-topic-<threadId>" suffix when a thread id is given.
  hoisted.resolveSessionTranscriptFileMock
    .mockReset()
    .mockImplementation(async (params: unknown) => {
      const { threadId } = params as { threadId?: string };
      const sessionFile = threadId
        ? `/tmp/agents/codex/sessions/sess-123-topic-${threadId}.jsonl`
        : "/tmp/agents/codex/sessions/sess-123.jsonl";
      return {
        sessionFile,
        sessionEntry: {
          sessionId: "sess-123",
          updatedAt: Date.now(),
          sessionFile,
        },
      };
    });
});
|
|
|
|
// Thread-bound session spawn: checks the session patch, the child binding,
// the agent dispatch target and the two transcript resolutions.
it("spawns ACP session, binds a new thread, and dispatches initial task", async () => {
  type GatewayRequest = { method?: string; params?: Record<string, unknown> };

  const spawned = await spawnAcpDirect(
    {
      task: "Investigate flaky tests",
      agentId: "codex",
      mode: "session",
      thread: true,
    },
    {
      agentSessionKey: "agent:main:main",
      agentChannel: "discord",
      agentAccountId: "default",
      agentTo: "channel:parent-channel",
      agentThreadId: "requester-thread",
    },
  );

  expect(spawned.status).toBe("accepted");
  expect(spawned.childSessionKey).toMatch(/^agent:codex:acp:/);
  expect(spawned.runId).toBe("run-1");
  expect(spawned.mode).toBe("session");

  const gatewayRequests = hoisted.callGatewayMock.mock.calls.map(
    (call: unknown[]) => call[0] as GatewayRequest,
  );
  const [firstPatch] = gatewayRequests.filter((request) => request.method === "sessions.patch");
  expect(firstPatch?.params).toMatchObject({
    key: spawned.childSessionKey,
    spawnedBy: "agent:main:main",
  });
  expect(hoisted.sessionBindingBindMock).toHaveBeenCalledWith(
    expect.objectContaining({
      targetKind: "session",
      placement: "child",
    }),
  );
  expectResolvedIntroTextInBindMetadata();

  // The initial task is delivered into the newly bound child thread.
  const agentParams = gatewayRequests.find((request) => request.method === "agent")?.params;
  expect(agentParams?.sessionKey).toMatch(/^agent:codex:acp:/);
  expect(agentParams?.to).toBe("channel:child-thread");
  expect(agentParams?.threadId).toBe("child-thread");
  expect(agentParams?.deliver).toBe(true);
  expect(hoisted.initializeSessionMock).toHaveBeenCalledWith(
    expect.objectContaining({
      sessionKey: expect.stringMatching(/^agent:codex:acp:/),
      agent: "codex",
      mode: "persistent",
    }),
  );

  // Transcript is resolved once without a thread id and once with the child thread.
  const transcriptRequests = hoisted.resolveSessionTranscriptFileMock.mock.calls.map(
    (call: unknown[]) => call[0] as { threadId?: string },
  );
  expect(transcriptRequests).toHaveLength(2);
  const [initialTranscript, threadTranscript] = transcriptRequests;
  expect(initialTranscript?.threadId).toBeUndefined();
  expect(threadTranscript?.threadId).toBe("child-thread");
});
|
|
|
|
// mode="run" from a Telegram requester: no parent relay and no delivery
// target on the agent dispatch.
it("does not inline delivery for fresh oneshot ACP runs", async () => {
  type GatewayRequest = { method?: string; params?: Record<string, unknown> };

  const outcome = await spawnAcpDirect(
    {
      task: "Investigate flaky tests",
      agentId: "codex",
      mode: "run",
    },
    {
      agentSessionKey: "agent:main:telegram:direct:6098642967",
      agentChannel: "telegram",
      agentAccountId: "default",
      agentTo: "telegram:6098642967",
      agentThreadId: "1",
    },
  );

  expect(outcome.status).toBe("accepted");
  expect(outcome.mode).toBe("run");
  expect(outcome.streamLogPath).toBeUndefined();
  expect(hoisted.startAcpSpawnParentStreamRelayMock).not.toHaveBeenCalled();
  expect(hoisted.resolveSessionTranscriptFileMock).toHaveBeenCalledWith(
    expect.objectContaining({
      sessionId: "sess-123",
      storePath: "/tmp/codex-sessions.json",
      agentId: "codex",
    }),
  );

  const agentParams = hoisted.callGatewayMock.mock.calls
    .map((call: unknown[]) => call[0] as GatewayRequest)
    .find((request) => request.method === "agent")?.params;
  expect(agentParams?.deliver).toBe(false);
  expect(agentParams?.channel).toBeUndefined();
  expect(agentParams?.to).toBeUndefined();
  expect(agentParams?.threadId).toBeUndefined();
});
|
|
|
|
// A rejected transcript resolution must not stop the spawn from being
// accepted and dispatched.
it("keeps ACP spawn running when session-file persistence fails", async () => {
  type GatewayRequest = { method?: string; params?: Record<string, unknown> };

  hoisted.resolveSessionTranscriptFileMock.mockRejectedValueOnce(new Error("disk full"));

  const outcome = await spawnAcpDirect(
    {
      task: "Investigate flaky tests",
      agentId: "codex",
      mode: "run",
    },
    {
      agentSessionKey: "agent:main:main",
      agentChannel: "telegram",
      agentAccountId: "default",
      agentTo: "telegram:6098642967",
      agentThreadId: "1",
    },
  );

  expect(outcome.status).toBe("accepted");
  expect(outcome.childSessionKey).toMatch(/^agent:codex:acp:/);

  const dispatched = hoisted.callGatewayMock.mock.calls
    .map((call: unknown[]) => call[0] as GatewayRequest)
    .find((request) => request.method === "agent");
  expect(dispatched?.params?.sessionKey).toBe(outcome.childSessionKey);
});
|
|
|
|
// The cwd passed at spawn time must appear in the intro text given to bind().
it("includes cwd in ACP thread intro banner when provided at spawn time", async () => {
  const { status } = await spawnAcpDirect(
    {
      task: "Check workspace",
      agentId: "codex",
      cwd: "/home/bob/clawd",
      mode: "session",
      thread: true,
    },
    {
      agentSessionKey: "agent:main:main",
      agentChannel: "discord",
      agentAccountId: "default",
      agentTo: "channel:parent-channel",
    },
  );

  expect(status).toBe("accepted");

  const expectedMetadata = expect.objectContaining({
    introText: expect.stringContaining("cwd: /home/bob/clawd"),
  });
  expect(hoisted.sessionBindingBindMock).toHaveBeenCalledWith(
    expect.objectContaining({ metadata: expectedMetadata }),
  );
});
|
|
|
|
// "codex" is requested while only "claudecode" is on the allow-list.
it("rejects disallowed ACP agents", async () => {
  const baseConfig = hoisted.state.cfg;
  hoisted.state.cfg = {
    ...baseConfig,
    acp: {
      enabled: true,
      backend: "acpx",
      allowedAgents: ["claudecode"],
    },
  };

  const outcome = await spawnAcpDirect(
    {
      task: "hello",
      agentId: "codex",
    },
    {
      agentSessionKey: "agent:main:main",
    },
  );

  expect(outcome).toMatchObject({
    status: "forbidden",
  });
});
|
|
|
|
// No agentId in the request and no default agent in the baseline config.
it("requires an explicit ACP agent when no config default exists", async () => {
  const { status, error } = await spawnAcpDirect(
    {
      task: "hello",
    },
    {
      agentSessionKey: "agent:main:main",
    },
  );

  expect(status).toBe("error");
  expect(error).toContain("set `acp.defaultAgent`");
});
|
|
|
|
// Thread spawn is requested while spawnAcpSessions is switched off for Discord.
it("fails fast when Discord ACP thread spawn is disabled", async () => {
  const baseConfig = hoisted.state.cfg;
  hoisted.state.cfg = {
    ...baseConfig,
    channels: {
      discord: {
        threadBindings: {
          enabled: true,
          spawnAcpSessions: false,
        },
      },
    },
  };

  const { status, error } = await spawnAcpDirect(
    {
      task: "hello",
      agentId: "codex",
      thread: true,
      mode: "session",
    },
    {
      agentChannel: "discord",
      agentAccountId: "default",
      agentTo: "channel:parent-channel",
    },
  );

  expect(status).toBe("error");
  expect(error).toContain("spawnAcpSessions=true");
});
|
|
|
|
// With sandbox mode "all", a subagent requester is refused before any
// gateway call or session initialization.
it("forbids ACP spawn from sandboxed requester sessions", async () => {
  const baseConfig = hoisted.state.cfg;
  hoisted.state.cfg = {
    ...baseConfig,
    agents: {
      defaults: {
        sandbox: { mode: "all" },
      },
    },
  };

  const { status, error } = await spawnAcpDirect(
    {
      task: "hello",
      agentId: "codex",
    },
    {
      agentSessionKey: "agent:main:subagent:parent",
    },
  );

  expect(status).toBe("forbidden");
  expect(error).toContain("Sandboxed sessions cannot spawn ACP sessions");
  expect(hoisted.callGatewayMock).not.toHaveBeenCalled();
  expect(hoisted.initializeSessionMock).not.toHaveBeenCalled();
});
|
|
|
|
// An explicit sandbox="require" request is refused before any gateway call
// or session initialization.
it('forbids sandbox="require" for runtime=acp', async () => {
  const { status, error } = await spawnAcpDirect(
    {
      task: "hello",
      agentId: "codex",
      sandbox: "require",
    },
    {
      agentSessionKey: "agent:main:main",
    },
  );

  expect(status).toBe("forbidden");
  expect(error).toContain('sandbox="require"');
  expect(hoisted.callGatewayMock).not.toHaveBeenCalled();
  expect(hoisted.initializeSessionMock).not.toHaveBeenCalled();
});
|
|
|
|
// Explicit streamTo="parent": the relay must be registered before the agent
// dispatch, dispatch must not deliver inline, and of the two relay handles
// only the second is notified while the first is disposed.
it('streams ACP progress to parent when streamTo="parent"', async () => {
  type GatewayRequest = { method?: string; params?: Record<string, unknown> };

  const firstHandle = createRelayHandle();
  const secondHandle = createRelayHandle();
  hoisted.startAcpSpawnParentStreamRelayMock
    .mockReset()
    .mockReturnValueOnce(firstHandle)
    .mockReturnValueOnce(secondHandle);

  const result = await spawnAcpDirect(
    {
      task: "Investigate flaky tests",
      agentId: "codex",
      streamTo: "parent",
    },
    {
      agentSessionKey: "agent:main:main",
      agentChannel: "discord",
      agentAccountId: "default",
      agentTo: "channel:parent-channel",
    },
  );

  expect(result.status).toBe("accepted");
  expect(result.streamLogPath).toBe("/tmp/sess-main.acp-stream.jsonl");

  const agentCall = hoisted.callGatewayMock.mock.calls
    .map((call: unknown[]) => call[0] as GatewayRequest)
    .find((request) => request.method === "agent");
  const agentCallIndex = hoisted.callGatewayMock.mock.calls.findIndex(
    (call: unknown[]) => (call[0] as { method?: string }).method === "agent",
  );
  const relayCallOrder = hoisted.startAcpSpawnParentStreamRelayMock.mock.invocationCallOrder[0];
  const agentCallOrder = hoisted.callGatewayMock.mock.invocationCallOrder[agentCallIndex];

  expect(agentCall?.params?.deliver).toBe(false);
  expect(typeof relayCallOrder).toBe("number");
  expect(typeof agentCallOrder).toBe("number");
  // Numeric matcher so a failure reports both call orders instead of "false".
  expect(relayCallOrder).toBeLessThan(agentCallOrder);

  expect(hoisted.startAcpSpawnParentStreamRelayMock).toHaveBeenCalledWith(
    expect.objectContaining({
      parentSessionKey: "agent:main:main",
      agentId: "codex",
      logPath: "/tmp/sess-main.acp-stream.jsonl",
      emitStartNotice: false,
    }),
  );

  // Relay registrations cover both the idempotency key and the returned run id.
  const relayRuns = hoisted.startAcpSpawnParentStreamRelayMock.mock.calls.map(
    (call: unknown[]) => (call[0] as { runId?: string }).runId,
  );
  expect(relayRuns).toContain(agentCall?.params?.idempotencyKey);
  expect(relayRuns).toContain(result.runId);
  expect(hoisted.resolveAcpSpawnStreamLogPathMock).toHaveBeenCalledWith({
    childSessionKey: expect.stringMatching(/^agent:codex:acp:/),
  });

  expect(firstHandle.dispose).toHaveBeenCalledTimes(1);
  expect(firstHandle.notifyStarted).not.toHaveBeenCalled();
  expect(secondHandle.notifyStarted).toHaveBeenCalledTimes(1);
});
|
|
|
|
// No streamTo is given: a subagent requester with a "last"-target heartbeat
// and a stored Discord delivery context still gets the parent relay.
it("implicitly streams mode=run ACP spawns for subagent requester sessions", async () => {
  type GatewayRequest = { method?: string; params?: Record<string, unknown> };
  type StoreEntry = { sessionId: string; updatedAt: number; deliveryContext?: unknown };

  const baseConfig = hoisted.state.cfg;
  hoisted.state.cfg = {
    ...baseConfig,
    agents: {
      defaults: {
        heartbeat: {
          every: "30m",
          target: "last",
        },
      },
    },
  };

  const firstHandle = createRelayHandle();
  const secondHandle = createRelayHandle();
  hoisted.startAcpSpawnParentStreamRelayMock
    .mockReset()
    .mockReturnValueOnce(firstHandle)
    .mockReturnValueOnce(secondHandle);

  // The store knows the parent session; ACP child keys get a synthetic entry.
  hoisted.loadSessionStoreMock.mockReset().mockImplementation(() => {
    const store: Record<string, StoreEntry> = {
      "agent:main:subagent:parent": {
        sessionId: "parent-sess-1",
        updatedAt: Date.now(),
        deliveryContext: {
          channel: "discord",
          to: "channel:parent-channel",
          accountId: "default",
        },
      },
    };
    return new Proxy(store, {
      get(target, prop) {
        const isAcpChildKey = typeof prop === "string" && prop.startsWith("agent:codex:acp:");
        return isAcpChildKey
          ? { sessionId: "sess-123", updatedAt: Date.now() }
          : target[prop as keyof typeof target];
      },
    });
  });

  const outcome = await spawnAcpDirect(
    {
      task: "Investigate flaky tests",
      agentId: "codex",
    },
    {
      agentSessionKey: "agent:main:subagent:parent",
      agentChannel: "discord",
      agentAccountId: "default",
      agentTo: "channel:parent-channel",
    },
  );

  expect(outcome.status).toBe("accepted");
  expect(outcome.mode).toBe("run");
  expect(outcome.streamLogPath).toBe("/tmp/sess-main.acp-stream.jsonl");

  const agentParams = hoisted.callGatewayMock.mock.calls
    .map((call: unknown[]) => call[0] as GatewayRequest)
    .find((request) => request.method === "agent")?.params;
  expect(agentParams?.deliver).toBe(false);
  expect(agentParams?.channel).toBeUndefined();
  expect(agentParams?.to).toBeUndefined();
  expect(agentParams?.threadId).toBeUndefined();

  expect(hoisted.startAcpSpawnParentStreamRelayMock).toHaveBeenCalledWith(
    expect.objectContaining({
      parentSessionKey: "agent:main:subagent:parent",
      agentId: "codex",
      logPath: "/tmp/sess-main.acp-stream.jsonl",
      emitStartNotice: false,
    }),
  );
  expect(firstHandle.dispose).toHaveBeenCalledTimes(1);
  expect(secondHandle.notifyStarted).toHaveBeenCalledTimes(1);
});
|
|
|
|
// The heartbeat targets a fixed Discord channel instead of the session.
it("does not implicitly stream when heartbeat target is not session-local", async () => {
  const baseConfig = hoisted.state.cfg;
  hoisted.state.cfg = {
    ...baseConfig,
    agents: {
      defaults: {
        heartbeat: {
          every: "30m",
          target: "discord",
          to: "channel:ops-room",
        },
      },
    },
  };

  const { status, mode, streamLogPath } = await spawnAcpDirect(
    {
      task: "Investigate flaky tests",
      agentId: "codex",
    },
    {
      agentSessionKey: "agent:main:subagent:fixed-target",
    },
  );

  expect(status).toBe("accepted");
  expect(mode).toBe("run");
  expect(streamLogPath).toBeUndefined();
  expect(hoisted.startAcpSpawnParentStreamRelayMock).not.toHaveBeenCalled();
});
|
|
|
|
// Same heartbeat setup as the implicit-stream case, but with a global session scope.
it("does not implicitly stream when session scope is global", async () => {
  const baseConfig = hoisted.state.cfg;
  hoisted.state.cfg = {
    ...baseConfig,
    session: {
      ...baseConfig.session,
      scope: "global",
    },
    agents: {
      defaults: {
        heartbeat: {
          every: "30m",
          target: "last",
        },
      },
    },
  };

  const { status, mode, streamLogPath } = await spawnAcpDirect(
    {
      task: "Investigate flaky tests",
      agentId: "codex",
    },
    {
      agentSessionKey: "agent:main:subagent:global-scope",
    },
  );

  expect(status).toBe("accepted");
  expect(mode).toBe("run");
  expect(streamLogPath).toBeUndefined();
  expect(hoisted.startAcpSpawnParentStreamRelayMock).not.toHaveBeenCalled();
});
|
|
|
|
// Only the "main" agent has a heartbeat; the requester belongs to "research".
it("does not implicitly stream for subagent requester sessions when heartbeat is disabled", async () => {
  const baseConfig = hoisted.state.cfg;
  hoisted.state.cfg = {
    ...baseConfig,
    agents: {
      list: [{ id: "main", heartbeat: { every: "30m" } }, { id: "research" }],
    },
  };

  const { status, mode, streamLogPath } = await spawnAcpDirect(
    {
      task: "Investigate flaky tests",
      agentId: "codex",
    },
    {
      agentSessionKey: "agent:research:subagent:orchestrator",
    },
  );

  expect(status).toBe("accepted");
  expect(mode).toBe("run");
  expect(streamLogPath).toBeUndefined();
  expect(hoisted.startAcpSpawnParentStreamRelayMock).not.toHaveBeenCalled();
});
|
|
|
|
// The requester's agent has a heartbeat configured with a "0m" interval.
it("does not implicitly stream for subagent requester sessions when heartbeat cadence is invalid", async () => {
  const baseConfig = hoisted.state.cfg;
  hoisted.state.cfg = {
    ...baseConfig,
    agents: {
      list: [
        {
          id: "research",
          heartbeat: { every: "0m" },
        },
      ],
    },
  };

  const { status, mode, streamLogPath } = await spawnAcpDirect(
    {
      task: "Investigate flaky tests",
      agentId: "codex",
    },
    {
      agentSessionKey: "agent:research:subagent:invalid-heartbeat",
    },
  );

  expect(status).toBe("accepted");
  expect(mode).toBe("run");
  expect(streamLogPath).toBeUndefined();
  expect(hoisted.startAcpSpawnParentStreamRelayMock).not.toHaveBeenCalled();
});
|
|
|
|
// areHeartbeatsEnabled() reports false for this test.
it("does not implicitly stream when heartbeats are runtime-disabled", async () => {
  hoisted.areHeartbeatsEnabledMock.mockReturnValue(false);

  const { status, mode, streamLogPath } = await spawnAcpDirect(
    {
      task: "Investigate flaky tests",
      agentId: "codex",
    },
    {
      agentSessionKey: "agent:main:subagent:runtime-disabled",
    },
  );

  expect(status).toBe("accepted");
  expect(mode).toBe("run");
  expect(streamLogPath).toBeUndefined();
  expect(hoisted.startAcpSpawnParentStreamRelayMock).not.toHaveBeenCalled();
});
|
|
|
|
// The requester key uses the "subagent:<name>" form without an "agent:" prefix.
it("does not implicitly stream for legacy subagent requester session keys", async () => {
  const { status, mode, streamLogPath } = await spawnAcpDirect(
    {
      task: "Investigate flaky tests",
      agentId: "codex",
    },
    {
      agentSessionKey: "subagent:legacy-worker",
    },
  );

  expect(status).toBe("accepted");
  expect(mode).toBe("run");
  expect(streamLogPath).toBeUndefined();
  expect(hoisted.startAcpSpawnParentStreamRelayMock).not.toHaveBeenCalled();
});
|
|
|
|
// The requester context carries an agentThreadId.
it("does not implicitly stream for subagent requester sessions with thread context", async () => {
  const { status, mode, streamLogPath } = await spawnAcpDirect(
    {
      task: "Investigate flaky tests",
      agentId: "codex",
    },
    {
      agentSessionKey: "agent:main:subagent:thread-context",
      agentChannel: "discord",
      agentAccountId: "default",
      agentTo: "channel:parent-channel",
      agentThreadId: "requester-thread",
    },
  );

  expect(status).toBe("accepted");
  expect(mode).toBe("run");
  expect(streamLogPath).toBeUndefined();
  expect(hoisted.startAcpSpawnParentStreamRelayMock).not.toHaveBeenCalled();
});
|
|
|
|
// The binding service reports an active binding for the requester session.
it("does not implicitly stream for thread-bound subagent requester sessions", async () => {
  hoisted.sessionBindingListBySessionMock.mockImplementation((targetSessionKey: string) =>
    targetSessionKey === "agent:main:subagent:thread-bound"
      ? [
          createSessionBinding({
            targetSessionKey,
            targetKind: "subagent",
            status: "active",
          }),
        ]
      : [],
  );

  const { status, mode, streamLogPath } = await spawnAcpDirect(
    {
      task: "Investigate flaky tests",
      agentId: "codex",
    },
    {
      agentSessionKey: "agent:main:subagent:thread-bound",
      agentChannel: "discord",
      agentAccountId: "default",
      agentTo: "channel:parent-channel",
    },
  );

  expect(status).toBe("accepted");
  expect(mode).toBe("run");
  expect(streamLogPath).toBeUndefined();
  expect(hoisted.startAcpSpawnParentStreamRelayMock).not.toHaveBeenCalled();
});
|
|
|
|
// The second relay handle's notifyStarted() must fire after the gateway
// "agent" dispatch, and the first handle must never be notified.
it("announces parent relay start only after successful child dispatch", async () => {
  const firstHandle = createRelayHandle();
  const secondHandle = createRelayHandle();
  hoisted.startAcpSpawnParentStreamRelayMock
    .mockReset()
    .mockReturnValueOnce(firstHandle)
    .mockReturnValueOnce(secondHandle);

  const result = await spawnAcpDirect(
    {
      task: "Investigate flaky tests",
      agentId: "codex",
      streamTo: "parent",
    },
    {
      agentSessionKey: "agent:main:main",
    },
  );

  expect(result.status).toBe("accepted");
  expect(firstHandle.notifyStarted).not.toHaveBeenCalled();
  expect(secondHandle.notifyStarted).toHaveBeenCalledTimes(1);

  const notifyOrder = secondHandle.notifyStarted.mock.invocationCallOrder;
  const agentCallIndex = hoisted.callGatewayMock.mock.calls.findIndex(
    (call: unknown[]) => (call[0] as { method?: string }).method === "agent",
  );
  const agentCallOrder = hoisted.callGatewayMock.mock.invocationCallOrder[agentCallIndex];

  expect(typeof agentCallOrder).toBe("number");
  expect(typeof notifyOrder[0]).toBe("number");
  // Numeric matcher so a failure reports both call orders instead of "false".
  expect(notifyOrder[0]).toBeGreaterThan(agentCallOrder);
});
|
|
|
|
// Thread-bound session mode from Telegram: dispatch delivers inline on Telegram.
it("keeps inline delivery for thread-bound ACP session mode", async () => {
  type GatewayRequest = { method?: string; params?: Record<string, unknown> };

  const outcome = await spawnAcpDirect(
    {
      task: "Investigate flaky tests",
      agentId: "codex",
      mode: "session",
      thread: true,
    },
    {
      agentSessionKey: "agent:main:telegram:group:-1003342490704:topic:2",
      agentChannel: "telegram",
      agentAccountId: "default",
      agentTo: "telegram:-1003342490704",
      agentThreadId: "2",
    },
  );

  expect(outcome.status).toBe("accepted");
  expect(outcome.mode).toBe("session");

  const agentParams = hoisted.callGatewayMock.mock.calls
    .map((call: unknown[]) => call[0] as GatewayRequest)
    .find((request) => request.method === "agent")?.params;
  expect(agentParams?.deliver).toBe(true);
  expect(agentParams?.channel).toBe("telegram");
});
|
|
|
|
// The gateway "agent" dispatch throws: the spawn reports the error and the
// relay handle is disposed without ever being notified.
it("disposes pre-registered parent relay when initial ACP dispatch fails", async () => {
  const relayHandle = createRelayHandle();
  hoisted.startAcpSpawnParentStreamRelayMock.mockReturnValueOnce(relayHandle);
  hoisted.callGatewayMock.mockImplementation(async (argsUnknown: unknown) => {
    const { method } = argsUnknown as { method?: string };
    switch (method) {
      case "sessions.patch":
        return { ok: true };
      case "agent":
        throw new Error("agent dispatch failed");
      case "sessions.delete":
        return { ok: true };
      default:
        return {};
    }
  });

  const { status, error } = await spawnAcpDirect(
    {
      task: "Investigate flaky tests",
      agentId: "codex",
      streamTo: "parent",
    },
    {
      agentSessionKey: "agent:main:main",
    },
  );

  expect(status).toBe("error");
  expect(error).toContain("agent dispatch failed");
  expect(relayHandle.dispose).toHaveBeenCalledTimes(1);
  expect(relayHandle.notifyStarted).not.toHaveBeenCalled();
});
|
|
|
|
// streamTo="parent" without an agentSessionKey is rejected before any
// gateway call or relay registration.
it('rejects streamTo="parent" without requester session context', async () => {
  const { status, error } = await spawnAcpDirect(
    {
      task: "Investigate flaky tests",
      agentId: "codex",
      streamTo: "parent",
    },
    {
      agentChannel: "discord",
      agentAccountId: "default",
      agentTo: "channel:parent-channel",
    },
  );

  expect(status).toBe("error");
  expect(error).toContain('streamTo="parent"');
  expect(hoisted.callGatewayMock).not.toHaveBeenCalled();
  expect(hoisted.startAcpSpawnParentStreamRelayMock).not.toHaveBeenCalled();
});
|
|
});
|