diff --git a/src/cron/delivery.test.ts b/src/cron/delivery.test.ts index 43eaa215114..1d2ffd0ac8d 100644 --- a/src/cron/delivery.test.ts +++ b/src/cron/delivery.test.ts @@ -84,6 +84,24 @@ describe("resolveCronDeliveryPlan", () => { expect(plan.to).toBe("123"); expect(plan.accountId).toBe("bot-a"); }); + + it("threads delivery.threadId when explicitly configured", () => { + const plan = resolveCronDeliveryPlan( + makeJob({ + delivery: { + mode: "announce", + channel: "telegram", + to: "-1003700845925", + threadId: " 15 ", + }, + }), + ); + expect(plan.mode).toBe("announce"); + expect(plan.requested).toBe(true); + expect(plan.channel).toBe("telegram"); + expect(plan.to).toBe("-1003700845925"); + expect(plan.threadId).toBe("15"); + }); }); describe("resolveFailureDestination", () => { @@ -148,6 +166,33 @@ describe("resolveFailureDestination", () => { expect(plan).toBeNull(); }); + it("keeps failure destinations for topic-routed announce jobs", () => { + const plan = resolveFailureDestination( + makeJob({ + delivery: { + mode: "announce", + channel: "telegram", + to: "-1003700845925", + threadId: "15", + accountId: "bot-a", + failureDestination: { + mode: "announce", + channel: "telegram", + to: "-1003700845925", + accountId: "bot-a", + }, + }, + }), + undefined, + ); + expect(plan).toEqual({ + mode: "announce", + channel: "telegram", + to: "-1003700845925", + accountId: "bot-a", + }); + }); + it("returns null when webhook failure destination matches the primary webhook target", () => { const plan = resolveFailureDestination( makeJob({ diff --git a/src/cron/delivery.ts b/src/cron/delivery.ts index 9d502a74fcb..6347bd87b82 100644 --- a/src/cron/delivery.ts +++ b/src/cron/delivery.ts @@ -14,6 +14,8 @@ export type CronDeliveryPlan = { mode: CronDeliveryMode; channel?: CronMessageChannel; to?: string; + /** Explicit thread/topic target from the delivery config, if set. */ + threadId?: string | number; /** Explicit channel account id from the delivery config, if set. */ accountId?: string; source: "delivery" | "payload"; @@ -47,6 +49,17 @@ function normalizeAccountId(value: unknown): string | undefined { return trimmed ? trimmed : undefined; } +function normalizeThreadId(value: unknown): string | number | undefined { + if (typeof value === "number" && Number.isFinite(value)) { + return value; + } + if (typeof value !== "string") { + return undefined; + } + const trimmed = value.trim(); + return trimmed ? trimmed : undefined; +} + export function resolveCronDeliveryPlan(job: CronJob): CronDeliveryPlan { const payload = job.payload.kind === "agentTurn" ? job.payload : null; const delivery = job.delivery; @@ -70,6 +83,9 @@ export function resolveCronDeliveryPlan(job: CronJob): CronDeliveryPlan { (delivery as { channel?: unknown } | undefined)?.channel, ); const deliveryTo = normalizeTo((delivery as { to?: unknown } | undefined)?.to); + const deliveryThreadId = normalizeThreadId( + (delivery as { threadId?: unknown } | undefined)?.threadId, + ); const channel = deliveryChannel ?? payloadChannel ?? "last"; const to = deliveryTo ?? payloadTo; const deliveryAccountId = normalizeAccountId( @@ -81,6 +97,7 @@ export function resolveCronDeliveryPlan(job: CronJob): CronDeliveryPlan { mode: resolvedMode, channel: resolvedMode === "announce" ? channel : undefined, to, + threadId: deliveryThreadId, accountId: deliveryAccountId, source: "delivery", requested: resolvedMode === "announce", @@ -220,6 +237,7 @@ function isSameDeliveryTarget( const primaryChannel = delivery.channel; const primaryTo = delivery.to; const primaryAccountId = delivery.accountId; + const primaryThreadId = normalizeThreadId(delivery.threadId); if (failurePlan.mode === "webhook") { return primaryMode === "webhook" && primaryTo === failurePlan.to; @@ -229,6 +247,7 @@ function isSameDeliveryTarget( const failureChannelNormalized = failurePlan.channel ?? "last"; return ( + primaryThreadId === undefined && failureChannelNormalized === primaryChannelNormalized && failurePlan.to === primaryTo && failurePlan.accountId === primaryAccountId diff --git a/src/cron/isolated-agent.delivery-target-thread-session.test.ts b/src/cron/isolated-agent.delivery-target-thread-session.test.ts index 68413f386b8..73db84e0168 100644 --- a/src/cron/isolated-agent.delivery-target-thread-session.test.ts +++ b/src/cron/isolated-agent.delivery-target-thread-session.test.ts @@ -170,4 +170,18 @@ describe("resolveDeliveryTarget thread session lookup", () => { expect(result.to).toBe("63448508"); expect(result.threadId).toBe(1008013); }); + + it("preserves explicit delivery.threadId when the target chat is plain telegram", async () => { + mockStore["/mock/store.json"] = {}; + + const result = await resolveDeliveryTarget(cfg, "main", { + channel: "telegram", + to: "63448508", + threadId: "15", + }); + + expect(result.to).toBe("63448508"); + expect(result.threadId).toBe("15"); + expect(result.channel).toBe("telegram"); + }); }); diff --git a/src/cron/isolated-agent.delivery.test-helpers.ts b/src/cron/isolated-agent.delivery.test-helpers.ts index 041f5750a95..ba7641d9311 100644 --- a/src/cron/isolated-agent.delivery.test-helpers.ts +++ b/src/cron/isolated-agent.delivery.test-helpers.ts @@ -54,6 +54,7 @@ export async function runTelegramAnnounceTurn(params: { mode: "announce"; channel: string; to?: string; + threadId?: string | number; bestEffort?: boolean; }; deliveryContract?: "cron-owned" | "shared"; diff --git a/src/cron/isolated-agent.direct-delivery-forum-topics.test.ts b/src/cron/isolated-agent.direct-delivery-forum-topics.test.ts index 0ee64e789fc..e6a2b46b2d8 100644 --- a/src/cron/isolated-agent.direct-delivery-forum-topics.test.ts +++ b/src/cron/isolated-agent.direct-delivery-forum-topics.test.ts @@ -12,7 +12,7 @@ import { setupIsolatedAgentTurnMocks } from "./isolated-agent.test-setup.js"; describe("runCronIsolatedAgentTurn forum topic delivery", () => { beforeEach(() => { - setupIsolatedAgentTurnMocks(); + setupIsolatedAgentTurnMocks({ fast: true }); }); it("routes forum-topic telegram targets through the correct delivery path", async () => { @@ -59,6 +59,30 @@ describe("runCronIsolatedAgentTurn forum topic delivery", () => { chatId: "123", text: "plain message", }); + + vi.clearAllMocks(); + mockAgentPayloads([{ text: "explicit thread message" }]); + + const explicitThreadRes = await runTelegramAnnounceTurn({ + home, + storePath, + deps, + delivery: { + mode: "announce", + channel: "telegram", + to: "123", + threadId: "15", + }, + }); + + expect(explicitThreadRes.status).toBe("ok"); + expect(explicitThreadRes.delivered).toBe(true); + expect(runSubagentAnnounceFlow).not.toHaveBeenCalled(); + expectDirectTelegramDelivery(deps, { + chatId: "123", + text: "explicit thread message", + messageThreadId: 15, + }); }); }); }); diff --git a/src/cron/isolated-agent/delivery-target.ts b/src/cron/isolated-agent/delivery-target.ts index 538ebdca273..590169c9e5e 100644 --- a/src/cron/isolated-agent/delivery-target.ts +++ b/src/cron/isolated-agent/delivery-target.ts @@ -43,6 +43,7 @@ export async function resolveDeliveryTarget( jobPayload: { channel?: "last" | ChannelId; to?: string; + threadId?: string | number; /** Explicit accountId from job.delivery — overrides session-derived and binding-derived values. */ accountId?: string; sessionKey?: string; @@ -50,6 +51,8 @@ export async function resolveDeliveryTarget( ): Promise { const requestedChannel = typeof jobPayload.channel === "string" ? jobPayload.channel : "last"; const explicitTo = typeof jobPayload.to === "string" ? jobPayload.to : undefined; + const explicitThreadId = + jobPayload.threadId != null && jobPayload.threadId !== "" ? jobPayload.threadId : undefined; const allowMismatchedLastTo = requestedChannel === "last"; const sessionCfg = cfg.session; @@ -67,6 +70,7 @@ export async function resolveDeliveryTarget( entry: main, requestedChannel, explicitTo, + explicitThreadId, allowMismatchedLastTo, }); @@ -93,6 +97,7 @@ export async function resolveDeliveryTarget( entry: main, requestedChannel, explicitTo, + explicitThreadId, fallbackChannel, allowMismatchedLastTo, mode: preliminary.mode, diff --git a/src/cron/isolated-agent/run.ts b/src/cron/isolated-agent/run.ts index 1a122f56864..96a6b80edeb 100644 --- a/src/cron/isolated-agent/run.ts +++ b/src/cron/isolated-agent/run.ts @@ -201,6 +201,7 @@ async function resolveCronDeliveryContext(params: { const resolvedDelivery = await resolveDeliveryTarget(params.cfg, params.agentId, { channel: deliveryPlan.channel ?? "last", to: deliveryPlan.to, + threadId: deliveryPlan.threadId, accountId: deliveryPlan.accountId, sessionKey: params.job.sessionKey, }); diff --git a/src/cron/normalize.test.ts b/src/cron/normalize.test.ts index 969faa6bb6f..0424cf3d02b 100644 --- a/src/cron/normalize.test.ts +++ b/src/cron/normalize.test.ts @@ -248,6 +248,21 @@ describe("normalizeCronJobCreate", () => { expect(delivery.accountId).toBe("coordinator"); }); + it("normalizes delivery threadId and strips blanks", () => { + const normalized = normalizeIsolatedAgentTurnCreateJob({ + name: "delivery thread", + delivery: { + mode: "announce", + channel: "telegram", + to: "-1003816714067", + threadId: " 15 ", + }, + }); + + const delivery = normalized.delivery as Record; + expect(delivery.threadId).toBe("15"); + }); + it("strips empty accountId from delivery", () => { const normalized = normalizeIsolatedAgentTurnCreateJob({ name: "empty account", diff --git a/src/cron/normalize.ts b/src/cron/normalize.ts index b1afdfaaa12..ebcd65102e3 100644 --- a/src/cron/normalize.ts +++ b/src/cron/normalize.ts @@ -193,6 +193,18 @@ function coerceDelivery(delivery: UnknownRecord) { delete next.to; } } + if (typeof delivery.threadId === "number" && Number.isFinite(delivery.threadId)) { + next.threadId = delivery.threadId; + } else if (typeof delivery.threadId === "string") { + const trimmed = delivery.threadId.trim(); + if (trimmed) { + next.threadId = trimmed; + } else { + delete next.threadId; + } + } else if ("threadId" in next) { + delete next.threadId; + } if (typeof delivery.accountId === "string") { const trimmed = delivery.accountId.trim(); if (trimmed) { diff --git a/src/cron/service.jobs.test.ts b/src/cron/service.jobs.test.ts index c514f7528ba..4688aa15c7a 100644 --- a/src/cron/service.jobs.test.ts +++ b/src/cron/service.jobs.test.ts @@ -167,6 +167,26 @@ describe("applyJobPatch", () => { expect(job.delivery?.accountId).toBeUndefined(); }); + it("merges delivery.threadId from patch and preserves existing", () => { + const job = createIsolatedAgentTurnJob("job-thread", { + mode: "announce", + channel: "telegram", + to: "-100123", + }); + + applyJobPatch(job, { delivery: { mode: "announce", threadId: " 15 " } }); + expect(job.delivery?.threadId).toBe("15"); + expect(job.delivery?.mode).toBe("announce"); + expect(job.delivery?.to).toBe("-100123"); + + applyJobPatch(job, { delivery: { mode: "announce", to: "-100999" } }); + expect(job.delivery?.threadId).toBe("15"); + expect(job.delivery?.to).toBe("-100999"); + + applyJobPatch(job, { delivery: { mode: "announce", threadId: "" } }); + expect(job.delivery?.threadId).toBeUndefined(); + }); + it("persists agentTurn payload.lightContext updates when editing existing jobs", () => { const job = createIsolatedAgentTurnJob("job-light-context", { mode: "announce", diff --git a/src/cron/service/jobs.ts b/src/cron/service/jobs.ts index 542ba81053d..3b56e4c8c26 100644 --- a/src/cron/service/jobs.ts +++ b/src/cron/service/jobs.ts @@ -780,6 +780,13 @@ function normalizeOptionalTrimmedString(value: unknown): string | undefined { return trimmed ? trimmed : undefined; } +function normalizeOptionalThreadId(value: unknown): string | number | undefined { + if (typeof value === "number" && Number.isFinite(value)) { + return value; + } + return normalizeOptionalTrimmedString(value); +} + function mergeCronDelivery( existing: CronDelivery | undefined, patch: CronDeliveryPatch, @@ -788,6 +795,7 @@ function mergeCronDelivery( mode: existing?.mode ?? "none", channel: existing?.channel, to: existing?.to, + threadId: existing?.threadId, accountId: existing?.accountId, bestEffort: existing?.bestEffort, failureDestination: existing?.failureDestination, @@ -802,6 +810,9 @@ function mergeCronDelivery( if ("to" in patch) { next.to = normalizeOptionalTrimmedString(patch.to); } + if ("threadId" in patch) { + next.threadId = normalizeOptionalThreadId(patch.threadId); + } if ("accountId" in patch) { next.accountId = normalizeOptionalTrimmedString(patch.accountId); } diff --git a/src/cron/types.ts b/src/cron/types.ts index 02078d15424..aeee20e0599 100644 --- a/src/cron/types.ts +++ b/src/cron/types.ts @@ -24,6 +24,8 @@ export type CronDelivery = { mode: CronDeliveryMode; channel?: CronMessageChannel; to?: string; + /** Explicit thread or topic target for threaded channels (for example Telegram forum topics). */ + threadId?: string | number; /** Explicit channel account id for multi-account setups (e.g. multiple Telegram bots). */ accountId?: string; bestEffort?: boolean; diff --git a/src/gateway/protocol/schema/cron.ts b/src/gateway/protocol/schema/cron.ts index f61d3e42711..9de5b250022 100644 --- a/src/gateway/protocol/schema/cron.ts +++ b/src/gateway/protocol/schema/cron.ts @@ -176,6 +176,7 @@ export const CronFailureDestinationSchema = Type.Object( const CronDeliverySharedProperties = { channel: Type.Optional(Type.Union([Type.Literal("last"), NonEmptyString])), + threadId: Type.Optional(Type.Union([Type.String(), Type.Number()])), accountId: Type.Optional(NonEmptyString), bestEffort: Type.Optional(Type.Boolean()), failureDestination: Type.Optional(CronFailureDestinationSchema), diff --git a/src/gateway/server.cron.test.ts b/src/gateway/server.cron.test.ts index efdbecba004..ba524697504 100644 --- a/src/gateway/server.cron.test.ts +++ b/src/gateway/server.cron.test.ts @@ -375,6 +375,69 @@ describe("gateway server cron", () => { expect(merged?.delivery?.channel).toBe("telegram"); expect(merged?.delivery?.to).toBe("19098680"); + const threadIdRes = await rpcReq(ws, "cron.add", { + name: "thread id roundtrip", + enabled: true, + schedule: { kind: "every", everyMs: 60_000 }, + sessionTarget: "isolated", + wakeMode: "next-heartbeat", + payload: { kind: "agentTurn", message: "hello" }, + delivery: { + mode: "announce", + channel: "telegram", + to: "-1003700845925", + threadId: 15, + }, + }); + expect(threadIdRes.ok).toBe(true); + const threadIdAdded = threadIdRes.payload as + | { + id?: unknown; + delivery?: { threadId?: unknown; mode?: unknown; channel?: unknown; to?: unknown }; + } + | undefined; + const threadIdJobId = typeof threadIdAdded?.id === "string" ? threadIdAdded.id : ""; + expect(threadIdJobId.length > 0).toBe(true); + expect(threadIdAdded?.delivery?.mode).toBe("announce"); + expect(threadIdAdded?.delivery?.channel).toBe("telegram"); + expect(threadIdAdded?.delivery?.to).toBe("-1003700845925"); + expect(threadIdAdded?.delivery?.threadId).toBe(15); + + const threadIdUpdateRes = await rpcReq(ws, "cron.update", { + id: threadIdJobId, + patch: { + delivery: { threadId: "16" }, + }, + }); + expect(threadIdUpdateRes.ok).toBe(true); + const threadIdUpdated = threadIdUpdateRes.payload as + | { + delivery?: { mode?: unknown; channel?: unknown; to?: unknown; threadId?: unknown }; + } + | undefined; + expect(threadIdUpdated?.delivery?.mode).toBe("announce"); + expect(threadIdUpdated?.delivery?.channel).toBe("telegram"); + expect(threadIdUpdated?.delivery?.to).toBe("-1003700845925"); + expect(threadIdUpdated?.delivery?.threadId).toBe("16"); + + const threadIdListRes = await rpcReq(ws, "cron.list", { + includeDisabled: true, + query: "thread id roundtrip", + }); + expect(threadIdListRes.ok).toBe(true); + const threadIdJobs = (threadIdListRes.payload as { jobs?: unknown } | null)?.jobs; + expect(Array.isArray(threadIdJobs)).toBe(true); + const threadIdListed = ( + threadIdJobs as Array<{ + id?: unknown; + delivery?: { threadId?: unknown; mode?: unknown; channel?: unknown; to?: unknown }; + }> + ).find((job) => job.id === threadIdJobId); + expect(threadIdListed?.delivery?.mode).toBe("announce"); + expect(threadIdListed?.delivery?.channel).toBe("telegram"); + expect(threadIdListed?.delivery?.to).toBe("-1003700845925"); + expect(threadIdListed?.delivery?.threadId).toBe("16"); + const modelOnlyPatchRes = await rpcReq(ws, "cron.update", { id: mergeJobId, patch: {