diff --git a/src/cron/delivery.test.ts b/src/cron/delivery.test.ts index 970589f228e..1d2ffd0ac8d 100644 --- a/src/cron/delivery.test.ts +++ b/src/cron/delivery.test.ts @@ -166,6 +166,33 @@ describe("resolveFailureDestination", () => { expect(plan).toBeNull(); }); + it("keeps failure destinations for topic-routed announce jobs", () => { + const plan = resolveFailureDestination( + makeJob({ + delivery: { + mode: "announce", + channel: "telegram", + to: "-1003700845925", + threadId: "15", + accountId: "bot-a", + failureDestination: { + mode: "announce", + channel: "telegram", + to: "-1003700845925", + accountId: "bot-a", + }, + }, + }), + undefined, + ); + expect(plan).toEqual({ + mode: "announce", + channel: "telegram", + to: "-1003700845925", + accountId: "bot-a", + }); + }); + it("returns null when webhook failure destination matches the primary webhook target", () => { const plan = resolveFailureDestination( makeJob({ diff --git a/src/cron/delivery.ts b/src/cron/delivery.ts index b71589d10ce..6347bd87b82 100644 --- a/src/cron/delivery.ts +++ b/src/cron/delivery.ts @@ -237,6 +237,7 @@ function isSameDeliveryTarget( const primaryChannel = delivery.channel; const primaryTo = delivery.to; const primaryAccountId = delivery.accountId; + const primaryThreadId = normalizeThreadId(delivery.threadId); if (failurePlan.mode === "webhook") { return primaryMode === "webhook" && primaryTo === failurePlan.to; @@ -246,6 +247,7 @@ function isSameDeliveryTarget( const failureChannelNormalized = failurePlan.channel ?? "last"; return ( + primaryThreadId === undefined && failureChannelNormalized === primaryChannelNormalized && failurePlan.to === primaryTo && failurePlan.accountId === primaryAccountId diff --git a/src/gateway/protocol/schema/cron.ts b/src/gateway/protocol/schema/cron.ts index 3cba5a65781..a3e76ed0cb3 100644 --- a/src/gateway/protocol/schema/cron.ts +++ b/src/gateway/protocol/schema/cron.ts @@ -171,6 +171,7 @@ export const CronFailureDestinationSchema = Type.Object( const CronDeliverySharedProperties = { channel: Type.Optional(Type.Union([Type.Literal("last"), NonEmptyString])), + threadId: Type.Optional(Type.Union([Type.String(), Type.Number()])), accountId: Type.Optional(NonEmptyString), bestEffort: Type.Optional(Type.Boolean()), failureDestination: Type.Optional(CronFailureDestinationSchema), diff --git a/src/gateway/server.cron.test.ts b/src/gateway/server.cron.test.ts index 2590f63c23d..55908758acd 100644 --- a/src/gateway/server.cron.test.ts +++ b/src/gateway/server.cron.test.ts @@ -359,6 +359,69 @@ describe("gateway server cron", () => { expect(merged?.delivery?.channel).toBe("telegram"); expect(merged?.delivery?.to).toBe("19098680"); + const threadIdRes = await rpcReq(ws, "cron.add", { + name: "thread id roundtrip", + enabled: true, + schedule: { kind: "every", everyMs: 60_000 }, + sessionTarget: "isolated", + wakeMode: "next-heartbeat", + payload: { kind: "agentTurn", message: "hello" }, + delivery: { + mode: "announce", + channel: "telegram", + to: "-1003700845925", + threadId: 15, + }, + }); + expect(threadIdRes.ok).toBe(true); + const threadIdAdded = threadIdRes.payload as + | { + id?: unknown; + delivery?: { threadId?: unknown; mode?: unknown; channel?: unknown; to?: unknown }; + } + | undefined; + const threadIdJobId = typeof threadIdAdded?.id === "string" ? threadIdAdded.id : ""; + expect(threadIdJobId.length > 0).toBe(true); + expect(threadIdAdded?.delivery?.mode).toBe("announce"); + expect(threadIdAdded?.delivery?.channel).toBe("telegram"); + expect(threadIdAdded?.delivery?.to).toBe("-1003700845925"); + expect(threadIdAdded?.delivery?.threadId).toBe(15); + + const threadIdUpdateRes = await rpcReq(ws, "cron.update", { + id: threadIdJobId, + patch: { + delivery: { threadId: "16" }, + }, + }); + expect(threadIdUpdateRes.ok).toBe(true); + const threadIdUpdated = threadIdUpdateRes.payload as + | { + delivery?: { mode?: unknown; channel?: unknown; to?: unknown; threadId?: unknown }; + } + | undefined; + expect(threadIdUpdated?.delivery?.mode).toBe("announce"); + expect(threadIdUpdated?.delivery?.channel).toBe("telegram"); + expect(threadIdUpdated?.delivery?.to).toBe("-1003700845925"); + expect(threadIdUpdated?.delivery?.threadId).toBe("16"); + + const threadIdListRes = await rpcReq(ws, "cron.list", { + includeDisabled: true, + query: "thread id roundtrip", + }); + expect(threadIdListRes.ok).toBe(true); + const threadIdJobs = (threadIdListRes.payload as { jobs?: unknown } | null)?.jobs; + expect(Array.isArray(threadIdJobs)).toBe(true); + const threadIdListed = ( + threadIdJobs as Array<{ + id?: unknown; + delivery?: { threadId?: unknown; mode?: unknown; channel?: unknown; to?: unknown }; + }> + ).find((job) => job.id === threadIdJobId); + expect(threadIdListed?.delivery?.mode).toBe("announce"); + expect(threadIdListed?.delivery?.channel).toBe("telegram"); + expect(threadIdListed?.delivery?.to).toBe("-1003700845925"); + expect(threadIdListed?.delivery?.threadId).toBe("16"); + const modelOnlyPatchRes = await rpcReq(ws, "cron.update", { id: mergeJobId, patch: {