From ad88dd8c7e66c209c149afe68bcc389450f5037f Mon Sep 17 00:00:00 2001 From: Alex Alaniz Date: Thu, 12 Mar 2026 03:31:47 -0400 Subject: [PATCH 1/5] Cron: preserve delivery.threadId for topic delivery --- src/cron/delivery.test.ts | 20 ++++++++++- src/cron/delivery.ts | 21 ++++++++++-- ...ent.delivery-target-thread-session.test.ts | 14 ++++++++ .../isolated-agent.delivery.test-helpers.ts | 3 +- ...agent.direct-delivery-forum-topics.test.ts | 26 ++++++++++++++- src/cron/isolated-agent/delivery-target.ts | 7 +++- src/cron/isolated-agent/run.ts | 9 ++--- src/cron/normalize.test.ts | 15 +++++++++ src/cron/normalize.ts | 14 +++++++- src/cron/service.jobs.test.ts | 24 ++++++++++++-- src/cron/service/jobs.ts | 33 ++++++++++++------- src/cron/types.ts | 2 ++ 12 files changed, 164 insertions(+), 24 deletions(-) diff --git a/src/cron/delivery.test.ts b/src/cron/delivery.test.ts index 43eaa215114..56497476adc 100644 --- a/src/cron/delivery.test.ts +++ b/src/cron/delivery.test.ts @@ -1,6 +1,6 @@ import { describe, expect, it } from "vitest"; -import { resolveCronDeliveryPlan, resolveFailureDestination } from "./delivery.js"; import type { CronJob } from "./types.js"; +import { resolveCronDeliveryPlan, resolveFailureDestination } from "./delivery.js"; function makeJob(overrides: Partial): CronJob { const now = Date.now(); @@ -84,6 +84,24 @@ describe("resolveCronDeliveryPlan", () => { expect(plan.to).toBe("123"); expect(plan.accountId).toBe("bot-a"); }); + + it("threads delivery.threadId when explicitly configured", () => { + const plan = resolveCronDeliveryPlan( + makeJob({ + delivery: { + mode: "announce", + channel: "telegram", + to: "-1003700845925", + threadId: " 15 ", + }, + }), + ); + expect(plan.mode).toBe("announce"); + expect(plan.requested).toBe(true); + expect(plan.channel).toBe("telegram"); + expect(plan.to).toBe("-1003700845925"); + expect(plan.threadId).toBe("15"); + }); }); describe("resolveFailureDestination", () => { diff --git a/src/cron/delivery.ts b/src/cron/delivery.ts index 9d502a74fcb..a65173e7254 100644 --- a/src/cron/delivery.ts +++ b/src/cron/delivery.ts @@ -1,19 +1,21 @@ import type { CliDeps } from "../cli/deps.js"; -import { createOutboundSendDeps } from "../cli/outbound-send-deps.js"; import type { CronFailureDestinationConfig } from "../config/types.cron.js"; import type { OpenClawConfig } from "../config/types.js"; +import type { CronDelivery, CronDeliveryMode, CronJob, CronMessageChannel } from "./types.js"; +import { createOutboundSendDeps } from "../cli/outbound-send-deps.js"; import { formatErrorMessage } from "../infra/errors.js"; import { deliverOutboundPayloads } from "../infra/outbound/deliver.js"; import { resolveAgentOutboundIdentity } from "../infra/outbound/identity.js"; import { buildOutboundSessionContext } from "../infra/outbound/session-context.js"; import { getChildLogger } from "../logging.js"; import { resolveDeliveryTarget } from "./isolated-agent/delivery-target.js"; -import type { CronDelivery, CronDeliveryMode, CronJob, CronMessageChannel } from "./types.js"; export type CronDeliveryPlan = { mode: CronDeliveryMode; channel?: CronMessageChannel; to?: string; + /** Explicit thread/topic target from the delivery config, if set. */ + threadId?: string | number; /** Explicit channel account id from the delivery config, if set. */ accountId?: string; source: "delivery" | "payload"; @@ -47,6 +49,17 @@ function normalizeAccountId(value: unknown): string | undefined { return trimmed ? trimmed : undefined; } +function normalizeThreadId(value: unknown): string | number | undefined { + if (typeof value === "number" && Number.isFinite(value)) { + return value; + } + if (typeof value !== "string") { + return undefined; + } + const trimmed = value.trim(); + return trimmed ? trimmed : undefined; +} + export function resolveCronDeliveryPlan(job: CronJob): CronDeliveryPlan { const payload = job.payload.kind === "agentTurn" ? job.payload : null; const delivery = job.delivery; @@ -70,6 +83,9 @@ export function resolveCronDeliveryPlan(job: CronJob): CronDeliveryPlan { (delivery as { channel?: unknown } | undefined)?.channel, ); const deliveryTo = normalizeTo((delivery as { to?: unknown } | undefined)?.to); + const deliveryThreadId = normalizeThreadId( + (delivery as { threadId?: unknown } | undefined)?.threadId, + ); const channel = deliveryChannel ?? payloadChannel ?? "last"; const to = deliveryTo ?? payloadTo; const deliveryAccountId = normalizeAccountId( @@ -81,6 +97,7 @@ export function resolveCronDeliveryPlan(job: CronJob): CronDeliveryPlan { mode: resolvedMode, channel: resolvedMode === "announce" ? channel : undefined, to, + threadId: deliveryThreadId, accountId: deliveryAccountId, source: "delivery", requested: resolvedMode === "announce", diff --git a/src/cron/isolated-agent.delivery-target-thread-session.test.ts b/src/cron/isolated-agent.delivery-target-thread-session.test.ts index a034d7ab924..54d16711ce4 100644 --- a/src/cron/isolated-agent.delivery-target-thread-session.test.ts +++ b/src/cron/isolated-agent.delivery-target-thread-session.test.ts @@ -149,4 +149,18 @@ describe("resolveDeliveryTarget thread session lookup", () => { expect(result.to).toBe("63448508"); expect(result.threadId).toBe(1008013); }); + + it("preserves explicit delivery.threadId when the target chat is plain telegram", async () => { + mockStore["/mock/store.json"] = {}; + + const result = await resolveDeliveryTarget(cfg, "main", { + channel: "telegram", + to: "63448508", + threadId: "15", + }); + + expect(result.to).toBe("63448508"); + expect(result.threadId).toBe("15"); + expect(result.channel).toBe("telegram"); + }); }); diff --git a/src/cron/isolated-agent.delivery.test-helpers.ts b/src/cron/isolated-agent.delivery.test-helpers.ts index de4caee3a3c..5e9ac4516b2 100644 --- a/src/cron/isolated-agent.delivery.test-helpers.ts +++ b/src/cron/isolated-agent.delivery.test-helpers.ts @@ -1,6 +1,6 @@ import { expect, vi } from "vitest"; -import { runEmbeddedPiAgent } from "../agents/pi-embedded.js"; import type { CliDeps } from "../cli/deps.js"; +import { runEmbeddedPiAgent } from "../agents/pi-embedded.js"; import { runCronIsolatedAgentTurn } from "./isolated-agent.js"; import { makeCfg, makeJob } from "./isolated-agent.test-harness.js"; @@ -52,6 +52,7 @@ export async function runTelegramAnnounceTurn(params: { mode: "announce"; channel: string; to?: string; + threadId?: string | number; bestEffort?: boolean; }; deliveryContract?: "cron-owned" | "shared"; diff --git a/src/cron/isolated-agent.direct-delivery-forum-topics.test.ts b/src/cron/isolated-agent.direct-delivery-forum-topics.test.ts index 836369fedb6..e429a07fe00 100644 --- a/src/cron/isolated-agent.direct-delivery-forum-topics.test.ts +++ b/src/cron/isolated-agent.direct-delivery-forum-topics.test.ts @@ -12,7 +12,7 @@ import { setupIsolatedAgentTurnMocks } from "./isolated-agent.test-setup.js"; describe("runCronIsolatedAgentTurn forum topic delivery", () => { beforeEach(() => { - setupIsolatedAgentTurnMocks(); + setupIsolatedAgentTurnMocks({ fast: true }); }); it("routes forum-topic and plain telegram targets through the correct delivery path", async () => { @@ -54,6 +54,30 @@ describe("runCronIsolatedAgentTurn forum topic delivery", () => { chatId: "123", text: "plain message", }); + + vi.clearAllMocks(); + mockAgentPayloads([{ text: "explicit thread message" }]); + + const explicitThreadRes = await runTelegramAnnounceTurn({ + home, + storePath, + deps, + delivery: { + mode: "announce", + channel: "telegram", + to: "123", + threadId: "15", + }, + }); + + expect(explicitThreadRes.status).toBe("ok"); + expect(explicitThreadRes.delivered).toBe(true); + expect(runSubagentAnnounceFlow).not.toHaveBeenCalled(); + expectDirectTelegramDelivery(deps, { + chatId: "123", + text: "explicit thread message", + messageThreadId: 15, + }); }); }); }); diff --git a/src/cron/isolated-agent/delivery-target.ts b/src/cron/isolated-agent/delivery-target.ts index 33bd80d4118..fb558371687 100644 --- a/src/cron/isolated-agent/delivery-target.ts +++ b/src/cron/isolated-agent/delivery-target.ts @@ -1,5 +1,6 @@ import type { ChannelId } from "../../channels/plugins/types.js"; import type { OpenClawConfig } from "../../config/config.js"; +import type { OutboundChannel } from "../../infra/outbound/targets.js"; import { loadSessionStore, resolveAgentMainSessionKey, @@ -7,7 +8,6 @@ import { } from "../../config/sessions.js"; import { resolveMessageChannelSelection } from "../../infra/outbound/channel-selection.js"; import { maybeResolveIdLikeTarget } from "../../infra/outbound/target-resolver.js"; -import type { OutboundChannel } from "../../infra/outbound/targets.js"; import { resolveOutboundTarget, resolveSessionDeliveryTarget, @@ -43,6 +43,7 @@ export async function resolveDeliveryTarget( jobPayload: { channel?: "last" | ChannelId; to?: string; + threadId?: string | number; /** Explicit accountId from job.delivery — overrides session-derived and binding-derived values. */ accountId?: string; sessionKey?: string; @@ -50,6 +51,8 @@ export async function resolveDeliveryTarget( ): Promise { const requestedChannel = typeof jobPayload.channel === "string" ? jobPayload.channel : "last"; const explicitTo = typeof jobPayload.to === "string" ? jobPayload.to : undefined; + const explicitThreadId = + jobPayload.threadId != null && jobPayload.threadId !== "" ? jobPayload.threadId : undefined; const allowMismatchedLastTo = requestedChannel === "last"; const sessionCfg = cfg.session; @@ -67,6 +70,7 @@ export async function resolveDeliveryTarget( entry: main, requestedChannel, explicitTo, + explicitThreadId, allowMismatchedLastTo, }); @@ -93,6 +97,7 @@ export async function resolveDeliveryTarget( entry: main, requestedChannel, explicitTo, + explicitThreadId, fallbackChannel, allowMismatchedLastTo, mode: preliminary.mode, diff --git a/src/cron/isolated-agent/run.ts b/src/cron/isolated-agent/run.ts index 4c7a5c87fe2..e3e50aaed95 100644 --- a/src/cron/isolated-agent/run.ts +++ b/src/cron/isolated-agent/run.ts @@ -1,3 +1,7 @@ +import type { CliDeps } from "../../cli/outbound-send-deps.js"; +import type { OpenClawConfig } from "../../config/config.js"; +import type { AgentDefaultsConfig } from "../../config/types.js"; +import type { CronJob, CronRunOutcome, CronRunTelemetry } from "../types.js"; import { resolveAgentConfig, resolveAgentDir, @@ -37,14 +41,11 @@ import { normalizeVerboseLevel, supportsXHighThinking, } from "../../auto-reply/thinking.js"; -import type { CliDeps } from "../../cli/outbound-send-deps.js"; -import type { OpenClawConfig } from "../../config/config.js"; import { resolveSessionTranscriptPath, setSessionRuntimeModel, updateSessionStore, } from "../../config/sessions.js"; -import type { AgentDefaultsConfig } from "../../config/types.js"; import { registerAgentRunContext } from "../../infra/agent-events.js"; import { logWarn } from "../../logger.js"; import { normalizeAgentId } from "../../routing/session-key.js"; @@ -55,7 +56,6 @@ import { isExternalHookSession, } from "../../security/external-content.js"; import { resolveCronDeliveryPlan } from "../delivery.js"; -import type { CronJob, CronRunOutcome, CronRunTelemetry } from "../types.js"; import { dispatchCronDelivery, matchesMessagingToolDeliveryTarget, @@ -173,6 +173,7 @@ async function resolveCronDeliveryContext(params: { const resolvedDelivery = await resolveDeliveryTarget(params.cfg, params.agentId, { channel: deliveryPlan.channel ?? "last", to: deliveryPlan.to, + threadId: deliveryPlan.threadId, accountId: deliveryPlan.accountId, sessionKey: params.job.sessionKey, }); diff --git a/src/cron/normalize.test.ts b/src/cron/normalize.test.ts index 6f34c85ebed..ab0c4332d36 100644 --- a/src/cron/normalize.test.ts +++ b/src/cron/normalize.test.ts @@ -248,6 +248,21 @@ describe("normalizeCronJobCreate", () => { expect(delivery.accountId).toBe("coordinator"); }); + it("normalizes delivery threadId and strips blanks", () => { + const normalized = normalizeIsolatedAgentTurnCreateJob({ + name: "delivery thread", + delivery: { + mode: "announce", + channel: "telegram", + to: "-1003816714067", + threadId: " 15 ", + }, + }); + + const delivery = normalized.delivery as Record; + expect(delivery.threadId).toBe("15"); + }); + it("strips empty accountId from delivery", () => { const normalized = normalizeIsolatedAgentTurnCreateJob({ name: "empty account", diff --git a/src/cron/normalize.ts b/src/cron/normalize.ts index 5a6c66ff356..ecf707f7e02 100644 --- a/src/cron/normalize.ts +++ b/src/cron/normalize.ts @@ -1,3 +1,4 @@ +import type { CronJobCreate, CronJobPatch } from "./types.js"; import { sanitizeAgentId } from "../routing/session-key.js"; import { isRecord } from "../utils.js"; import { normalizeLegacyDeliveryInput } from "./legacy-delivery.js"; @@ -5,7 +6,6 @@ import { parseAbsoluteTimeMs } from "./parse.js"; import { migrateLegacyCronPayload } from "./payload-migration.js"; import { inferLegacyName } from "./service/normalize.js"; import { normalizeCronStaggerMs, resolveDefaultCronStaggerMs } from "./stagger.js"; -import type { CronJobCreate, CronJobPatch } from "./types.js"; type UnknownRecord = Record; @@ -191,6 +191,18 @@ function coerceDelivery(delivery: UnknownRecord) { delete next.to; } } + if (typeof delivery.threadId === "number" && Number.isFinite(delivery.threadId)) { + next.threadId = delivery.threadId; + } else if (typeof delivery.threadId === "string") { + const trimmed = delivery.threadId.trim(); + if (trimmed) { + next.threadId = trimmed; + } else { + delete next.threadId; + } + } else if ("threadId" in next) { + delete next.threadId; + } if (typeof delivery.accountId === "string") { const trimmed = delivery.accountId.trim(); if (trimmed) { diff --git a/src/cron/service.jobs.test.ts b/src/cron/service.jobs.test.ts index 053ea8764de..df1d0e3081a 100644 --- a/src/cron/service.jobs.test.ts +++ b/src/cron/service.jobs.test.ts @@ -1,8 +1,8 @@ import { describe, expect, it } from "vitest"; -import { applyJobPatch, createJob } from "./service/jobs.js"; import type { CronServiceState } from "./service/state.js"; -import { DEFAULT_TOP_OF_HOUR_STAGGER_MS } from "./stagger.js"; import type { CronJob, CronJobPatch } from "./types.js"; +import { applyJobPatch, createJob } from "./service/jobs.js"; +import { DEFAULT_TOP_OF_HOUR_STAGGER_MS } from "./stagger.js"; function expectCronStaggerMs(job: CronJob, expected: number): void { expect(job.schedule.kind).toBe("cron"); @@ -144,6 +144,26 @@ describe("applyJobPatch", () => { expect(job.delivery?.accountId).toBeUndefined(); }); + it("merges delivery.threadId from patch and preserves existing", () => { + const job = createIsolatedAgentTurnJob("job-thread", { + mode: "announce", + channel: "telegram", + to: "-100123", + }); + + applyJobPatch(job, { delivery: { mode: "announce", threadId: " 15 " } }); + expect(job.delivery?.threadId).toBe("15"); + expect(job.delivery?.mode).toBe("announce"); + expect(job.delivery?.to).toBe("-100123"); + + applyJobPatch(job, { delivery: { mode: "announce", to: "-100999" } }); + expect(job.delivery?.threadId).toBe("15"); + expect(job.delivery?.to).toBe("-100999"); + + applyJobPatch(job, { delivery: { mode: "announce", threadId: "" } }); + expect(job.delivery?.threadId).toBeUndefined(); + }); + it("persists agentTurn payload.lightContext updates when editing existing jobs", () => { const job = createIsolatedAgentTurnJob("job-light-context", { mode: "announce", diff --git a/src/cron/service/jobs.ts b/src/cron/service/jobs.ts index 5579e5430f0..b7c8b47b60b 100644 --- a/src/cron/service/jobs.ts +++ b/src/cron/service/jobs.ts @@ -1,4 +1,15 @@ import crypto from "node:crypto"; +import type { + CronDelivery, + CronDeliveryPatch, + CronFailureAlert, + CronJob, + CronJobCreate, + CronJobPatch, + CronPayload, + CronPayloadPatch, +} from "../types.js"; +import type { CronServiceState } from "./state.js"; import { normalizeAgentId } from "../../routing/session-key.js"; import { parseAbsoluteTimeMs } from "../parse.js"; import { @@ -11,16 +22,6 @@ import { resolveCronStaggerMs, resolveDefaultCronStaggerMs, } from "../stagger.js"; -import type { - CronDelivery, - CronDeliveryPatch, - CronFailureAlert, - CronJob, - CronJobCreate, - CronJobPatch, - CronPayload, - CronPayloadPatch, -} from "../types.js"; import { normalizeHttpWebhookUrl } from "../webhook-url.js"; import { resolveInitialCronDelivery } from "./initial-delivery.js"; import { @@ -30,7 +31,6 @@ import { normalizePayloadToSystemText, normalizeRequiredName, } from "./normalize.js"; -import type { CronServiceState } from "./state.js"; const STUCK_RUN_MS = 2 * 60 * 60 * 1000; const STAGGER_OFFSET_CACHE_MAX = 4096; @@ -771,6 +771,13 @@ function normalizeOptionalTrimmedString(value: unknown): string | undefined { return trimmed ? trimmed : undefined; } +function normalizeOptionalThreadId(value: unknown): string | number | undefined { + if (typeof value === "number" && Number.isFinite(value)) { + return value; + } + return normalizeOptionalTrimmedString(value); +} + function mergeCronDelivery( existing: CronDelivery | undefined, patch: CronDeliveryPatch, @@ -779,6 +786,7 @@ function mergeCronDelivery( mode: existing?.mode ?? "none", channel: existing?.channel, to: existing?.to, + threadId: existing?.threadId, accountId: existing?.accountId, bestEffort: existing?.bestEffort, failureDestination: existing?.failureDestination, @@ -793,6 +801,9 @@ function mergeCronDelivery( if ("to" in patch) { next.to = normalizeOptionalTrimmedString(patch.to); } + if ("threadId" in patch) { + next.threadId = normalizeOptionalThreadId(patch.threadId); + } if ("accountId" in patch) { next.accountId = normalizeOptionalTrimmedString(patch.accountId); } diff --git a/src/cron/types.ts b/src/cron/types.ts index 2a93bc30311..615307cff4a 100644 --- a/src/cron/types.ts +++ b/src/cron/types.ts @@ -24,6 +24,8 @@ export type CronDelivery = { mode: CronDeliveryMode; channel?: CronMessageChannel; to?: string; + /** Explicit thread or topic target for threaded channels (for example Telegram forum topics). */ + threadId?: string | number; /** Explicit channel account id for multi-account setups (e.g. multiple Telegram bots). */ accountId?: string; bestEffort?: boolean; From 476a8d856d91d1813b0de149e098c1200bfcd92a Mon Sep 17 00:00:00 2001 From: Alex Alaniz Date: Thu, 12 Mar 2026 10:55:30 -0400 Subject: [PATCH 2/5] Cron: fix formatter ordering From 61e9499cc473b953dfe4ffe237b5fe3a0834f3bf Mon Sep 17 00:00:00 2001 From: Alex Alaniz Date: Thu, 12 Mar 2026 11:10:46 -0400 Subject: [PATCH 3/5] Cron: apply CI formatter output From 33dbdd724bea6b58e61e998b3a9f358ec085fd35 Mon Sep 17 00:00:00 2001 From: Alex Alaniz Date: Thu, 12 Mar 2026 11:17:51 -0400 Subject: [PATCH 4/5] Cron: align with CI formatter --- src/cron/delivery.test.ts | 2 +- src/cron/delivery.ts | 4 ++-- .../isolated-agent.delivery.test-helpers.ts | 2 +- src/cron/isolated-agent/delivery-target.ts | 2 +- src/cron/isolated-agent/run.ts | 8 +++---- src/cron/normalize.ts | 2 +- src/cron/service.jobs.test.ts | 4 ++-- src/cron/service/jobs.ts | 22 +++++++++---------- 8 files changed, 23 insertions(+), 23 deletions(-) diff --git a/src/cron/delivery.test.ts b/src/cron/delivery.test.ts index 56497476adc..970589f228e 100644 --- a/src/cron/delivery.test.ts +++ b/src/cron/delivery.test.ts @@ -1,6 +1,6 @@ import { describe, expect, it } from "vitest"; -import type { CronJob } from "./types.js"; import { resolveCronDeliveryPlan, resolveFailureDestination } from "./delivery.js"; +import type { CronJob } from "./types.js"; function makeJob(overrides: Partial): CronJob { const now = Date.now(); diff --git a/src/cron/delivery.ts b/src/cron/delivery.ts index a65173e7254..b71589d10ce 100644 --- a/src/cron/delivery.ts +++ b/src/cron/delivery.ts @@ -1,14 +1,14 @@ import type { CliDeps } from "../cli/deps.js"; +import { createOutboundSendDeps } from "../cli/outbound-send-deps.js"; import type { CronFailureDestinationConfig } from "../config/types.cron.js"; import type { OpenClawConfig } from "../config/types.js"; -import type { CronDelivery, CronDeliveryMode, CronJob, CronMessageChannel } from "./types.js"; -import { createOutboundSendDeps } from "../cli/outbound-send-deps.js"; import { formatErrorMessage } from "../infra/errors.js"; import { deliverOutboundPayloads } from "../infra/outbound/deliver.js"; import { resolveAgentOutboundIdentity } from "../infra/outbound/identity.js"; import { buildOutboundSessionContext } from "../infra/outbound/session-context.js"; import { getChildLogger } from "../logging.js"; import { resolveDeliveryTarget } from "./isolated-agent/delivery-target.js"; +import type { CronDelivery, CronDeliveryMode, CronJob, CronMessageChannel } from "./types.js"; export type CronDeliveryPlan = { mode: CronDeliveryMode; diff --git a/src/cron/isolated-agent.delivery.test-helpers.ts b/src/cron/isolated-agent.delivery.test-helpers.ts index 5e9ac4516b2..861d80ac7a9 100644 --- a/src/cron/isolated-agent.delivery.test-helpers.ts +++ b/src/cron/isolated-agent.delivery.test-helpers.ts @@ -1,6 +1,6 @@ import { expect, vi } from "vitest"; -import type { CliDeps } from "../cli/deps.js"; import { runEmbeddedPiAgent } from "../agents/pi-embedded.js"; +import type { CliDeps } from "../cli/deps.js"; import { runCronIsolatedAgentTurn } from "./isolated-agent.js"; import { makeCfg, makeJob } from "./isolated-agent.test-harness.js"; diff --git a/src/cron/isolated-agent/delivery-target.ts b/src/cron/isolated-agent/delivery-target.ts index fb558371687..ffc0f3dae5f 100644 --- a/src/cron/isolated-agent/delivery-target.ts +++ b/src/cron/isolated-agent/delivery-target.ts @@ -1,6 +1,5 @@ import type { ChannelId } from "../../channels/plugins/types.js"; import type { OpenClawConfig } from "../../config/config.js"; -import type { OutboundChannel } from "../../infra/outbound/targets.js"; import { loadSessionStore, resolveAgentMainSessionKey, @@ -8,6 +7,7 @@ import { } from "../../config/sessions.js"; import { resolveMessageChannelSelection } from "../../infra/outbound/channel-selection.js"; import { maybeResolveIdLikeTarget } from "../../infra/outbound/target-resolver.js"; +import type { OutboundChannel } from "../../infra/outbound/targets.js"; import { resolveOutboundTarget, resolveSessionDeliveryTarget, diff --git a/src/cron/isolated-agent/run.ts b/src/cron/isolated-agent/run.ts index e3e50aaed95..bca434b11ca 100644 --- a/src/cron/isolated-agent/run.ts +++ b/src/cron/isolated-agent/run.ts @@ -1,7 +1,3 @@ -import type { CliDeps } from "../../cli/outbound-send-deps.js"; -import type { OpenClawConfig } from "../../config/config.js"; -import type { AgentDefaultsConfig } from "../../config/types.js"; -import type { CronJob, CronRunOutcome, CronRunTelemetry } from "../types.js"; import { resolveAgentConfig, resolveAgentDir, @@ -41,11 +37,14 @@ import { normalizeVerboseLevel, supportsXHighThinking, } from "../../auto-reply/thinking.js"; +import type { CliDeps } from "../../cli/outbound-send-deps.js"; +import type { OpenClawConfig } from "../../config/config.js"; import { resolveSessionTranscriptPath, setSessionRuntimeModel, updateSessionStore, } from "../../config/sessions.js"; +import type { AgentDefaultsConfig } from "../../config/types.js"; import { registerAgentRunContext } from "../../infra/agent-events.js"; import { logWarn } from "../../logger.js"; import { normalizeAgentId } from "../../routing/session-key.js"; @@ -56,6 +55,7 @@ import { isExternalHookSession, } from "../../security/external-content.js"; import { resolveCronDeliveryPlan } from "../delivery.js"; +import type { CronJob, CronRunOutcome, CronRunTelemetry } from "../types.js"; import { dispatchCronDelivery, matchesMessagingToolDeliveryTarget, diff --git a/src/cron/normalize.ts b/src/cron/normalize.ts index ecf707f7e02..80cf11d5d76 100644 --- a/src/cron/normalize.ts +++ b/src/cron/normalize.ts @@ -1,4 +1,3 @@ -import type { CronJobCreate, CronJobPatch } from "./types.js"; import { sanitizeAgentId } from "../routing/session-key.js"; import { isRecord } from "../utils.js"; import { normalizeLegacyDeliveryInput } from "./legacy-delivery.js"; @@ -6,6 +5,7 @@ import { parseAbsoluteTimeMs } from "./parse.js"; import { migrateLegacyCronPayload } from "./payload-migration.js"; import { inferLegacyName } from "./service/normalize.js"; import { normalizeCronStaggerMs, resolveDefaultCronStaggerMs } from "./stagger.js"; +import type { CronJobCreate, CronJobPatch } from "./types.js"; type UnknownRecord = Record; diff --git a/src/cron/service.jobs.test.ts b/src/cron/service.jobs.test.ts index df1d0e3081a..aa15ddb5543 100644 --- a/src/cron/service.jobs.test.ts +++ b/src/cron/service.jobs.test.ts @@ -1,8 +1,8 @@ import { describe, expect, it } from "vitest"; -import type { CronServiceState } from "./service/state.js"; -import type { CronJob, CronJobPatch } from "./types.js"; import { applyJobPatch, createJob } from "./service/jobs.js"; +import type { CronServiceState } from "./service/state.js"; import { DEFAULT_TOP_OF_HOUR_STAGGER_MS } from "./stagger.js"; +import type { CronJob, CronJobPatch } from "./types.js"; function expectCronStaggerMs(job: CronJob, expected: number): void { expect(job.schedule.kind).toBe("cron"); diff --git a/src/cron/service/jobs.ts b/src/cron/service/jobs.ts index b7c8b47b60b..8d8128f7c12 100644 --- a/src/cron/service/jobs.ts +++ b/src/cron/service/jobs.ts @@ -1,15 +1,4 @@ import crypto from "node:crypto"; -import type { - CronDelivery, - CronDeliveryPatch, - CronFailureAlert, - CronJob, - CronJobCreate, - CronJobPatch, - CronPayload, - CronPayloadPatch, -} from "../types.js"; -import type { CronServiceState } from "./state.js"; import { normalizeAgentId } from "../../routing/session-key.js"; import { parseAbsoluteTimeMs } from "../parse.js"; import { @@ -22,6 +11,16 @@ import { resolveCronStaggerMs, resolveDefaultCronStaggerMs, } from "../stagger.js"; +import type { + CronDelivery, + CronDeliveryPatch, + CronFailureAlert, + CronJob, + CronJobCreate, + CronJobPatch, + CronPayload, + CronPayloadPatch, +} from "../types.js"; import { normalizeHttpWebhookUrl } from "../webhook-url.js"; import { resolveInitialCronDelivery } from "./initial-delivery.js"; import { @@ -31,6 +30,7 @@ import { normalizePayloadToSystemText, normalizeRequiredName, } from "./normalize.js"; +import type { CronServiceState } from "./state.js"; const STUCK_RUN_MS = 2 * 60 * 60 * 1000; const STAGGER_OFFSET_CACHE_MAX = 4096; From 656eb1ddb60cb9b22ba8cec6469747ea86851a3b Mon Sep 17 00:00:00 2001 From: Alex Alaniz Date: Thu, 12 Mar 2026 14:13:44 -0400 Subject: [PATCH 5/5] Cron: fix threadId failure fallback --- src/cron/delivery.test.ts | 27 +++++++++++++ src/cron/delivery.ts | 2 + src/gateway/protocol/schema/cron.ts | 1 + src/gateway/server.cron.test.ts | 63 +++++++++++++++++++++++++++++ 4 files changed, 93 insertions(+) diff --git a/src/cron/delivery.test.ts b/src/cron/delivery.test.ts index 970589f228e..1d2ffd0ac8d 100644 --- a/src/cron/delivery.test.ts +++ b/src/cron/delivery.test.ts @@ -166,6 +166,33 @@ describe("resolveFailureDestination", () => { expect(plan).toBeNull(); }); + it("keeps failure destinations for topic-routed announce jobs", () => { + const plan = resolveFailureDestination( + makeJob({ + delivery: { + mode: "announce", + channel: "telegram", + to: "-1003700845925", + threadId: "15", + accountId: "bot-a", + failureDestination: { + mode: "announce", + channel: "telegram", + to: "-1003700845925", + accountId: "bot-a", + }, + }, + }), + undefined, + ); + expect(plan).toEqual({ + mode: "announce", + channel: "telegram", + to: "-1003700845925", + accountId: "bot-a", + }); + }); + it("returns null when webhook failure destination matches the primary webhook target", () => { const plan = resolveFailureDestination( makeJob({ diff --git a/src/cron/delivery.ts b/src/cron/delivery.ts index b71589d10ce..6347bd87b82 100644 --- a/src/cron/delivery.ts +++ b/src/cron/delivery.ts @@ -237,6 +237,7 @@ function isSameDeliveryTarget( const primaryChannel = delivery.channel; const primaryTo = delivery.to; const primaryAccountId = delivery.accountId; + const primaryThreadId = normalizeThreadId(delivery.threadId); if (failurePlan.mode === "webhook") { return primaryMode === "webhook" && primaryTo === failurePlan.to; @@ -246,6 +247,7 @@ function isSameDeliveryTarget( const failureChannelNormalized = failurePlan.channel ?? "last"; return ( + primaryThreadId === undefined && failureChannelNormalized === primaryChannelNormalized && failurePlan.to === primaryTo && failurePlan.accountId === primaryAccountId diff --git a/src/gateway/protocol/schema/cron.ts b/src/gateway/protocol/schema/cron.ts index 3cba5a65781..a3e76ed0cb3 100644 --- a/src/gateway/protocol/schema/cron.ts +++ b/src/gateway/protocol/schema/cron.ts @@ -171,6 +171,7 @@ export const CronFailureDestinationSchema = Type.Object( const CronDeliverySharedProperties = { channel: Type.Optional(Type.Union([Type.Literal("last"), NonEmptyString])), + threadId: Type.Optional(Type.Union([Type.String(), Type.Number()])), accountId: Type.Optional(NonEmptyString), bestEffort: Type.Optional(Type.Boolean()), failureDestination: Type.Optional(CronFailureDestinationSchema), diff --git a/src/gateway/server.cron.test.ts b/src/gateway/server.cron.test.ts index 2590f63c23d..55908758acd 100644 --- a/src/gateway/server.cron.test.ts +++ b/src/gateway/server.cron.test.ts @@ -359,6 +359,69 @@ describe("gateway server cron", () => { expect(merged?.delivery?.channel).toBe("telegram"); expect(merged?.delivery?.to).toBe("19098680"); + const threadIdRes = await rpcReq(ws, "cron.add", { + name: "thread id roundtrip", + enabled: true, + schedule: { kind: "every", everyMs: 60_000 }, + sessionTarget: "isolated", + wakeMode: "next-heartbeat", + payload: { kind: "agentTurn", message: "hello" }, + delivery: { + mode: "announce", + channel: "telegram", + to: "-1003700845925", + threadId: 15, + }, + }); + expect(threadIdRes.ok).toBe(true); + const threadIdAdded = threadIdRes.payload as + | { + id?: unknown; + delivery?: { threadId?: unknown; mode?: unknown; channel?: unknown; to?: unknown }; + } + | undefined; + const threadIdJobId = typeof threadIdAdded?.id === "string" ? threadIdAdded.id : ""; + expect(threadIdJobId.length > 0).toBe(true); + expect(threadIdAdded?.delivery?.mode).toBe("announce"); + expect(threadIdAdded?.delivery?.channel).toBe("telegram"); + expect(threadIdAdded?.delivery?.to).toBe("-1003700845925"); + expect(threadIdAdded?.delivery?.threadId).toBe(15); + + const threadIdUpdateRes = await rpcReq(ws, "cron.update", { + id: threadIdJobId, + patch: { + delivery: { threadId: "16" }, + }, + }); + expect(threadIdUpdateRes.ok).toBe(true); + const threadIdUpdated = threadIdUpdateRes.payload as + | { + delivery?: { mode?: unknown; channel?: unknown; to?: unknown; threadId?: unknown }; + } + | undefined; + expect(threadIdUpdated?.delivery?.mode).toBe("announce"); + expect(threadIdUpdated?.delivery?.channel).toBe("telegram"); + expect(threadIdUpdated?.delivery?.to).toBe("-1003700845925"); + expect(threadIdUpdated?.delivery?.threadId).toBe("16"); + + const threadIdListRes = await rpcReq(ws, "cron.list", { + includeDisabled: true, + query: "thread id roundtrip", + }); + expect(threadIdListRes.ok).toBe(true); + const threadIdJobs = (threadIdListRes.payload as { jobs?: unknown } | null)?.jobs; + expect(Array.isArray(threadIdJobs)).toBe(true); + const threadIdListed = ( + threadIdJobs as Array<{ + id?: unknown; + delivery?: { threadId?: unknown; mode?: unknown; channel?: unknown; to?: unknown }; + }> + ).find((job) => job.id === threadIdJobId); + expect(threadIdListed?.delivery?.mode).toBe("announce"); + expect(threadIdListed?.delivery?.channel).toBe("telegram"); + expect(threadIdListed?.delivery?.to).toBe("-1003700845925"); + expect(threadIdListed?.delivery?.threadId).toBe("16"); + const modelOnlyPatchRes = await rpcReq(ws, "cron.update", { id: mergeJobId, patch: {