fix(cli): quiet cron status checks and retry transient gateway transport

This commit is contained in:
MaxxxDong 2026-03-17 23:19:57 +08:00
parent 7cd0acf8af
commit eb52408112
4 changed files with 159 additions and 14 deletions

View File

@ -21,7 +21,7 @@ vi.mock("./gateway-rpc.js", async () => {
return {
...actual,
callGatewayFromCli: (method: string, opts: unknown, params?: unknown, extra?: unknown) =>
callGatewayFromCli(method, opts, params, extra as number | undefined),
callGatewayFromCli(method, opts, params, extra),
};
});
@ -266,6 +266,43 @@ describe("cron cli", () => {
expect(params?.delivery?.mode).toBe("announce");
});
it("skips cron.status helper in json mode", async () => {
await runCronCommand([
"cron",
"add",
"--name",
"Json add",
"--cron",
"* * * * *",
"--session",
"isolated",
"--message",
"hello",
"--json",
]);
const statusCalls = callGatewayFromCli.mock.calls.filter((call) => call[0] === "cron.status");
expect(statusCalls).toHaveLength(0);
});
it("runs cron.status helper quietly outside json mode", async () => {
await runCronCommand([
"cron",
"add",
"--name",
"Quiet helper",
"--cron",
"* * * * *",
"--session",
"isolated",
"--message",
"hello",
]);
const statusCall = callGatewayFromCli.mock.calls.find((call) => call[0] === "cron.status");
expect(statusCall?.[3]).toEqual({ progress: false, quiet: true });
});
it("infers sessionTarget from payload when --session is omitted", async () => {
await runCronCommand([
"cron",

View File

@ -22,8 +22,19 @@ export function handleCronCliError(err: unknown) {
}
export async function warnIfCronSchedulerDisabled(opts: GatewayRpcOpts) {
if (opts?.json === true) {
return;
}
try {
const res = (await callGatewayFromCli("cron.status", opts, {})) as {
const res = (await callGatewayFromCli(
"cron.status",
opts,
{},
{
progress: false,
quiet: true,
},
)) as {
enabled?: boolean;
storePath?: string;
};

View File

@ -0,0 +1,75 @@
import { beforeEach, describe, expect, it, vi } from "vitest";
const callGateway = vi.fn();
const withProgress = vi.fn(async (_opts: unknown, fn: () => Promise<unknown>) => await fn());
vi.mock("../gateway/call.js", () => ({
callGateway,
}));
vi.mock("./progress.js", () => ({
withProgress,
}));
const { callGatewayFromCli } = await import("./gateway-rpc.js");
describe("callGatewayFromCli", () => {
beforeEach(() => {
callGateway.mockReset();
withProgress.mockClear();
});
it("uses probe mode for quiet calls", async () => {
callGateway.mockResolvedValueOnce({ ok: true });
await callGatewayFromCli("cron.status", { timeout: "30000" }, {}, { quiet: true });
expect(callGateway).toHaveBeenCalledTimes(1);
expect(callGateway).toHaveBeenCalledWith(
expect.objectContaining({
method: "cron.status",
mode: "probe",
clientName: "cli",
}),
);
});
it("retries transient transport errors with probe mode after the first CLI attempt", async () => {
callGateway
.mockRejectedValueOnce(new Error("gateway closed (1000 normal closure): no close reason"))
.mockResolvedValueOnce({ ok: true });
await callGatewayFromCli("cron.add", { timeout: "30000" }, { name: "job" });
expect(callGateway).toHaveBeenCalledTimes(2);
expect(callGateway.mock.calls[0]?.[0]).toEqual(
expect.objectContaining({ method: "cron.add", mode: "cli" }),
);
expect(callGateway.mock.calls[1]?.[0]).toEqual(
expect.objectContaining({ method: "cron.add", mode: "probe" }),
);
});
it("does not retry non-transport errors", async () => {
callGateway.mockRejectedValueOnce(new Error("active gateway does not support required method"));
await expect(
callGatewayFromCli("cron.add", { timeout: "30000" }, { name: "job" }),
).rejects.toThrow("active gateway does not support required method");
expect(callGateway).toHaveBeenCalledTimes(1);
});
it("stops after three transient failures", async () => {
callGateway.mockRejectedValue(
new Error("gateway closed (1006 abnormal closure (no close frame)): no close reason"),
);
await expect(
callGatewayFromCli("cron.add", { timeout: "30000" }, { name: "job" }),
).rejects.toThrow("gateway closed (1006 abnormal closure (no close frame)): no close reason");
expect(callGateway).toHaveBeenCalledTimes(3);
expect(callGateway.mock.calls.map((call) => call[0]?.mode)).toEqual(["cli", "probe", "probe"]);
});
});

View File

@ -19,29 +19,51 @@ export function addGatewayClientOptions(cmd: Command) {
.option("--expect-final", "Wait for final response (agent)", false);
}
function isRetryableCliTransportError(err: unknown): boolean {
const message = (err instanceof Error ? err.message : String(err)).toLowerCase();
return (
message.includes("gateway closed (1000") ||
message.includes("gateway closed (1006") ||
message.includes("gateway timeout") ||
message.includes("connect challenge timeout")
);
}
export async function callGatewayFromCli(
method: string,
opts: GatewayRpcOpts,
params?: unknown,
extra?: { expectFinal?: boolean; progress?: boolean },
extra?: { expectFinal?: boolean; progress?: boolean; quiet?: boolean },
) {
const showProgress = extra?.progress ?? opts.json !== true;
const quiet = extra?.quiet === true;
const baseMode = quiet ? GATEWAY_CLIENT_MODES.PROBE : GATEWAY_CLIENT_MODES.CLI;
return await withProgress(
{
label: `Gateway ${method}`,
indeterminate: true,
enabled: showProgress,
},
async () =>
await callGateway({
url: opts.url,
token: opts.token,
method,
params,
expectFinal: extra?.expectFinal ?? Boolean(opts.expectFinal),
timeoutMs: Number(opts.timeout ?? 10_000),
clientName: GATEWAY_CLIENT_NAMES.CLI,
mode: GATEWAY_CLIENT_MODES.CLI,
}),
async () => {
for (let attempt = 0; attempt < 3; attempt += 1) {
try {
return await callGateway({
url: opts.url,
token: opts.token,
method,
params,
expectFinal: extra?.expectFinal ?? Boolean(opts.expectFinal),
timeoutMs: Number(opts.timeout ?? 10_000),
clientName: GATEWAY_CLIENT_NAMES.CLI,
mode: attempt === 0 ? baseMode : GATEWAY_CLIENT_MODES.PROBE,
});
} catch (err) {
if (attempt === 2 || !isRetryableCliTransportError(err)) {
throw err;
}
}
}
throw new Error(`gateway retries exhausted for ${method}`);
},
);
}