Cron: fix threadId failure fallback

This commit is contained in:
Alex Alaniz 2026-03-12 14:13:44 -04:00
parent 33dbdd724b
commit 656eb1ddb6
4 changed files with 93 additions and 0 deletions

View File

@ -166,6 +166,33 @@ describe("resolveFailureDestination", () => {
expect(plan).toBeNull();
});
it("keeps failure destinations for topic-routed announce jobs", () => {
const plan = resolveFailureDestination(
makeJob({
delivery: {
mode: "announce",
channel: "telegram",
to: "-1003700845925",
threadId: "15",
accountId: "bot-a",
failureDestination: {
mode: "announce",
channel: "telegram",
to: "-1003700845925",
accountId: "bot-a",
},
},
}),
undefined,
);
expect(plan).toEqual({
mode: "announce",
channel: "telegram",
to: "-1003700845925",
accountId: "bot-a",
});
});
it("returns null when webhook failure destination matches the primary webhook target", () => {
const plan = resolveFailureDestination(
makeJob({

View File

@ -237,6 +237,7 @@ function isSameDeliveryTarget(
const primaryChannel = delivery.channel;
const primaryTo = delivery.to;
const primaryAccountId = delivery.accountId;
const primaryThreadId = normalizeThreadId(delivery.threadId);
if (failurePlan.mode === "webhook") {
return primaryMode === "webhook" && primaryTo === failurePlan.to;
@ -246,6 +247,7 @@ function isSameDeliveryTarget(
const failureChannelNormalized = failurePlan.channel ?? "last";
return (
primaryThreadId === undefined &&
failureChannelNormalized === primaryChannelNormalized &&
failurePlan.to === primaryTo &&
failurePlan.accountId === primaryAccountId

View File

@ -171,6 +171,7 @@ export const CronFailureDestinationSchema = Type.Object(
const CronDeliverySharedProperties = {
channel: Type.Optional(Type.Union([Type.Literal("last"), NonEmptyString])),
threadId: Type.Optional(Type.Union([Type.String(), Type.Number()])),
accountId: Type.Optional(NonEmptyString),
bestEffort: Type.Optional(Type.Boolean()),
failureDestination: Type.Optional(CronFailureDestinationSchema),

View File

@ -359,6 +359,69 @@ describe("gateway server cron", () => {
expect(merged?.delivery?.channel).toBe("telegram");
expect(merged?.delivery?.to).toBe("19098680");
const threadIdRes = await rpcReq(ws, "cron.add", {
name: "thread id roundtrip",
enabled: true,
schedule: { kind: "every", everyMs: 60_000 },
sessionTarget: "isolated",
wakeMode: "next-heartbeat",
payload: { kind: "agentTurn", message: "hello" },
delivery: {
mode: "announce",
channel: "telegram",
to: "-1003700845925",
threadId: 15,
},
});
expect(threadIdRes.ok).toBe(true);
const threadIdAdded = threadIdRes.payload as
| {
id?: unknown;
delivery?: { threadId?: unknown; mode?: unknown; channel?: unknown; to?: unknown };
}
| undefined;
const threadIdJobId = typeof threadIdAdded?.id === "string" ? threadIdAdded.id : "";
expect(threadIdJobId.length > 0).toBe(true);
expect(threadIdAdded?.delivery?.mode).toBe("announce");
expect(threadIdAdded?.delivery?.channel).toBe("telegram");
expect(threadIdAdded?.delivery?.to).toBe("-1003700845925");
expect(threadIdAdded?.delivery?.threadId).toBe(15);
const threadIdUpdateRes = await rpcReq(ws, "cron.update", {
id: threadIdJobId,
patch: {
delivery: { threadId: "16" },
},
});
expect(threadIdUpdateRes.ok).toBe(true);
const threadIdUpdated = threadIdUpdateRes.payload as
| {
delivery?: { mode?: unknown; channel?: unknown; to?: unknown; threadId?: unknown };
}
| undefined;
expect(threadIdUpdated?.delivery?.mode).toBe("announce");
expect(threadIdUpdated?.delivery?.channel).toBe("telegram");
expect(threadIdUpdated?.delivery?.to).toBe("-1003700845925");
expect(threadIdUpdated?.delivery?.threadId).toBe("16");
const threadIdListRes = await rpcReq(ws, "cron.list", {
includeDisabled: true,
query: "thread id roundtrip",
});
expect(threadIdListRes.ok).toBe(true);
const threadIdJobs = (threadIdListRes.payload as { jobs?: unknown } | null)?.jobs;
expect(Array.isArray(threadIdJobs)).toBe(true);
const threadIdListed = (
threadIdJobs as Array<{
id?: unknown;
delivery?: { threadId?: unknown; mode?: unknown; channel?: unknown; to?: unknown };
}>
).find((job) => job.id === threadIdJobId);
expect(threadIdListed?.delivery?.mode).toBe("announce");
expect(threadIdListed?.delivery?.channel).toBe("telegram");
expect(threadIdListed?.delivery?.to).toBe("-1003700845925");
expect(threadIdListed?.delivery?.threadId).toBe("16");
const modelOnlyPatchRes = await rpcReq(ws, "cron.update", {
id: mergeJobId,
patch: {