CLI: add subscribe mode e2e tests

This commit is contained in:
kumarabhirup 2026-02-21 11:03:37 -08:00
parent da9f5ee992
commit d20b67c370
No known key found for this signature in database
GPG Key ID: DB7CA2289CAB0167

View File

@ -6,10 +6,63 @@ import { beforeEach, describe, expect, it, vi } from "vitest";
vi.mock("../gateway/call.js", () => ({
callGateway: vi.fn(),
randomIdempotencyKey: () => "idem-1",
buildGatewayConnectionDetails: vi.fn(() => ({
url: "ws://127.0.0.1:18789",
urlSource: "test",
message: "Gateway target: ws://127.0.0.1:18789",
})),
}));
vi.mock("./agent.js", () => ({
agentCommand: vi.fn(),
}));
vi.mock("../gateway/client.js", () => {
class MockGatewayClient {
static instances: MockGatewayClient[] = [];
private opts: Record<string, unknown>;
constructor(opts: Record<string, unknown>) {
this.opts = opts;
MockGatewayClient.instances.push(this);
}
start() {
setTimeout(async () => {
const onHelloOk = this.opts.onHelloOk as (() => Promise<void>) | undefined;
await onHelloOk?.();
}, 0);
}
async request(method: string, params?: Record<string, unknown>) {
if (method === "agent.subscribe") {
const sessionKey = String(params?.sessionKey ?? "");
const onEvent = this.opts.onEvent as ((evt: Record<string, unknown>) => void) | undefined;
onEvent?.({
event: "agent",
payload: {
sessionKey,
stream: "assistant",
data: { delta: "match" },
globalSeq: 11,
},
});
onEvent?.({
event: "agent",
payload: {
sessionKey: "agent:main:web:other",
stream: "assistant",
data: { delta: "ignore" },
globalSeq: 12,
},
});
}
return {};
}
stop() {}
}
return { GatewayClient: MockGatewayClient };
});
import type { OpenClawConfig } from "../config/config.js";
import type { RuntimeEnv } from "../runtime.js";
@ -222,6 +275,49 @@ describe("agentCliCommand", () => {
fs.rmSync(dir, { recursive: true, force: true });
}
});
it("keeps subscribe mode alive until signaled and filters to target session", async () => {
const dir = fs.mkdtempSync(path.join(os.tmpdir(), "openclaw-agent-cli-"));
const store = path.join(dir, "sessions.json");
mockConfig(store);
const stdoutSpy = vi.spyOn(process.stdout, "write").mockImplementation(() => true);
try {
const promise = agentCliCommand(
{
message: "unused",
streamJson: true,
subscribeSessionKey: "agent:main:web:target",
afterSeq: "10",
} as unknown as Parameters<typeof agentCliCommand>[0],
runtime,
);
// Subscribe mode should not resolve immediately.
let settled = false;
void promise.then(() => {
settled = true;
});
await new Promise((r) => setTimeout(r, 20));
expect(settled).toBe(false);
// Trigger signal-driven shutdown.
(process as unknown as { emit: (event: string) => boolean }).emit("SIGTERM");
await promise;
expect(callGateway).not.toHaveBeenCalled();
const writes = stdoutSpy.mock.calls.map(([data]) => String(data));
const parsed = writes.map((line) => JSON.parse(line));
const agentEvents = parsed.filter((evt) => evt.event === "agent");
expect(agentEvents).toHaveLength(1);
expect(agentEvents[0].sessionKey).toBe("agent:main:web:target");
expect(parsed[parsed.length - 1]).toMatchObject({ event: "aborted", reason: "signal" });
} finally {
stdoutSpy.mockRestore();
fs.rmSync(dir, { recursive: true, force: true });
}
});
});
describe("emitNdjsonLine", () => {