diff --git a/src/commands/agent-via-gateway.e2e.test.ts b/src/commands/agent-via-gateway.e2e.test.ts index 4b3e3acac43..f506329380f 100644 --- a/src/commands/agent-via-gateway.e2e.test.ts +++ b/src/commands/agent-via-gateway.e2e.test.ts @@ -6,10 +6,63 @@ import { beforeEach, describe, expect, it, vi } from "vitest"; vi.mock("../gateway/call.js", () => ({ callGateway: vi.fn(), randomIdempotencyKey: () => "idem-1", + buildGatewayConnectionDetails: vi.fn(() => ({ + url: "ws://127.0.0.1:18789", + urlSource: "test", + message: "Gateway target: ws://127.0.0.1:18789", + })), })); vi.mock("./agent.js", () => ({ agentCommand: vi.fn(), })); +vi.mock("../gateway/client.js", () => { + class MockGatewayClient { + static instances: MockGatewayClient[] = []; + private opts: Record; + + constructor(opts: Record) { + this.opts = opts; + MockGatewayClient.instances.push(this); + } + + start() { + setTimeout(async () => { + const onHelloOk = this.opts.onHelloOk as (() => Promise) | undefined; + await onHelloOk?.(); + }, 0); + } + + async request(method: string, params?: Record) { + if (method === "agent.subscribe") { + const sessionKey = String(params?.sessionKey ?? ""); + const onEvent = this.opts.onEvent as ((evt: Record) => void) | undefined; + onEvent?.({ + event: "agent", + payload: { + sessionKey, + stream: "assistant", + data: { delta: "match" }, + globalSeq: 11, + }, + }); + onEvent?.({ + event: "agent", + payload: { + sessionKey: "agent:main:web:other", + stream: "assistant", + data: { delta: "ignore" }, + globalSeq: 12, + }, + }); + } + return {}; + } + + stop() {} + } + + return { GatewayClient: MockGatewayClient }; +}); import type { OpenClawConfig } from "../config/config.js"; import type { RuntimeEnv } from "../runtime.js"; @@ -222,6 +275,49 @@ describe("agentCliCommand", () => { fs.rmSync(dir, { recursive: true, force: true }); } }); + + it("keeps subscribe mode alive until signaled and filters to target session", async () => { + const dir = fs.mkdtempSync(path.join(os.tmpdir(), "openclaw-agent-cli-")); + const store = path.join(dir, "sessions.json"); + mockConfig(store); + + const stdoutSpy = vi.spyOn(process.stdout, "write").mockImplementation(() => true); + + try { + const promise = agentCliCommand( + { + message: "unused", + streamJson: true, + subscribeSessionKey: "agent:main:web:target", + afterSeq: "10", + } as unknown as Parameters[0], + runtime, + ); + + // Subscribe mode should not resolve immediately. + let settled = false; + void promise.then(() => { + settled = true; + }); + await new Promise((r) => setTimeout(r, 20)); + expect(settled).toBe(false); + + // Trigger signal-driven shutdown. + (process as unknown as { emit: (event: string) => boolean }).emit("SIGTERM"); + await promise; + + expect(callGateway).not.toHaveBeenCalled(); + const writes = stdoutSpy.mock.calls.map(([data]) => String(data)); + const parsed = writes.map((line) => JSON.parse(line)); + const agentEvents = parsed.filter((evt) => evt.event === "agent"); + expect(agentEvents).toHaveLength(1); + expect(agentEvents[0].sessionKey).toBe("agent:main:web:target"); + expect(parsed[parsed.length - 1]).toMatchObject({ event: "aborted", reason: "signal" }); + } finally { + stdoutSpy.mockRestore(); + fs.rmSync(dir, { recursive: true, force: true }); + } + }); }); describe("emitNdjsonLine", () => {