Cron: preserve delivery.threadId for topic delivery

This commit is contained in:
Alex Alaniz 2026-03-12 03:31:47 -04:00
parent 2d42588a18
commit ad88dd8c7e
12 changed files with 164 additions and 24 deletions

View File

@ -1,6 +1,6 @@
import { describe, expect, it } from "vitest";
import { resolveCronDeliveryPlan, resolveFailureDestination } from "./delivery.js";
import type { CronJob } from "./types.js";
import { resolveCronDeliveryPlan, resolveFailureDestination } from "./delivery.js";
function makeJob(overrides: Partial<CronJob>): CronJob {
const now = Date.now();
@ -84,6 +84,24 @@ describe("resolveCronDeliveryPlan", () => {
expect(plan.to).toBe("123");
expect(plan.accountId).toBe("bot-a");
});
it("threads delivery.threadId when explicitly configured", () => {
const plan = resolveCronDeliveryPlan(
makeJob({
delivery: {
mode: "announce",
channel: "telegram",
to: "-1003700845925",
threadId: " 15 ",
},
}),
);
expect(plan.mode).toBe("announce");
expect(plan.requested).toBe(true);
expect(plan.channel).toBe("telegram");
expect(plan.to).toBe("-1003700845925");
expect(plan.threadId).toBe("15");
});
});
describe("resolveFailureDestination", () => {

View File

@ -1,19 +1,21 @@
import type { CliDeps } from "../cli/deps.js";
import { createOutboundSendDeps } from "../cli/outbound-send-deps.js";
import type { CronFailureDestinationConfig } from "../config/types.cron.js";
import type { OpenClawConfig } from "../config/types.js";
import type { CronDelivery, CronDeliveryMode, CronJob, CronMessageChannel } from "./types.js";
import { createOutboundSendDeps } from "../cli/outbound-send-deps.js";
import { formatErrorMessage } from "../infra/errors.js";
import { deliverOutboundPayloads } from "../infra/outbound/deliver.js";
import { resolveAgentOutboundIdentity } from "../infra/outbound/identity.js";
import { buildOutboundSessionContext } from "../infra/outbound/session-context.js";
import { getChildLogger } from "../logging.js";
import { resolveDeliveryTarget } from "./isolated-agent/delivery-target.js";
import type { CronDelivery, CronDeliveryMode, CronJob, CronMessageChannel } from "./types.js";
export type CronDeliveryPlan = {
mode: CronDeliveryMode;
channel?: CronMessageChannel;
to?: string;
/** Explicit thread/topic target from the delivery config, if set. */
threadId?: string | number;
/** Explicit channel account id from the delivery config, if set. */
accountId?: string;
source: "delivery" | "payload";
@ -47,6 +49,17 @@ function normalizeAccountId(value: unknown): string | undefined {
return trimmed ? trimmed : undefined;
}
function normalizeThreadId(value: unknown): string | number | undefined {
if (typeof value === "number" && Number.isFinite(value)) {
return value;
}
if (typeof value !== "string") {
return undefined;
}
const trimmed = value.trim();
return trimmed ? trimmed : undefined;
}
export function resolveCronDeliveryPlan(job: CronJob): CronDeliveryPlan {
const payload = job.payload.kind === "agentTurn" ? job.payload : null;
const delivery = job.delivery;
@ -70,6 +83,9 @@ export function resolveCronDeliveryPlan(job: CronJob): CronDeliveryPlan {
(delivery as { channel?: unknown } | undefined)?.channel,
);
const deliveryTo = normalizeTo((delivery as { to?: unknown } | undefined)?.to);
const deliveryThreadId = normalizeThreadId(
(delivery as { threadId?: unknown } | undefined)?.threadId,
);
const channel = deliveryChannel ?? payloadChannel ?? "last";
const to = deliveryTo ?? payloadTo;
const deliveryAccountId = normalizeAccountId(
@ -81,6 +97,7 @@ export function resolveCronDeliveryPlan(job: CronJob): CronDeliveryPlan {
mode: resolvedMode,
channel: resolvedMode === "announce" ? channel : undefined,
to,
threadId: deliveryThreadId,
accountId: deliveryAccountId,
source: "delivery",
requested: resolvedMode === "announce",

View File

@ -149,4 +149,18 @@ describe("resolveDeliveryTarget thread session lookup", () => {
expect(result.to).toBe("63448508");
expect(result.threadId).toBe(1008013);
});
it("preserves explicit delivery.threadId when the target chat is plain telegram", async () => {
mockStore["/mock/store.json"] = {};
const result = await resolveDeliveryTarget(cfg, "main", {
channel: "telegram",
to: "63448508",
threadId: "15",
});
expect(result.to).toBe("63448508");
expect(result.threadId).toBe("15");
expect(result.channel).toBe("telegram");
});
});

View File

@ -1,6 +1,6 @@
import { expect, vi } from "vitest";
import { runEmbeddedPiAgent } from "../agents/pi-embedded.js";
import type { CliDeps } from "../cli/deps.js";
import { runEmbeddedPiAgent } from "../agents/pi-embedded.js";
import { runCronIsolatedAgentTurn } from "./isolated-agent.js";
import { makeCfg, makeJob } from "./isolated-agent.test-harness.js";
@ -52,6 +52,7 @@ export async function runTelegramAnnounceTurn(params: {
mode: "announce";
channel: string;
to?: string;
threadId?: string | number;
bestEffort?: boolean;
};
deliveryContract?: "cron-owned" | "shared";

View File

@ -12,7 +12,7 @@ import { setupIsolatedAgentTurnMocks } from "./isolated-agent.test-setup.js";
describe("runCronIsolatedAgentTurn forum topic delivery", () => {
beforeEach(() => {
setupIsolatedAgentTurnMocks();
setupIsolatedAgentTurnMocks({ fast: true });
});
it("routes forum-topic and plain telegram targets through the correct delivery path", async () => {
@ -54,6 +54,30 @@ describe("runCronIsolatedAgentTurn forum topic delivery", () => {
chatId: "123",
text: "plain message",
});
vi.clearAllMocks();
mockAgentPayloads([{ text: "explicit thread message" }]);
const explicitThreadRes = await runTelegramAnnounceTurn({
home,
storePath,
deps,
delivery: {
mode: "announce",
channel: "telegram",
to: "123",
threadId: "15",
},
});
expect(explicitThreadRes.status).toBe("ok");
expect(explicitThreadRes.delivered).toBe(true);
expect(runSubagentAnnounceFlow).not.toHaveBeenCalled();
expectDirectTelegramDelivery(deps, {
chatId: "123",
text: "explicit thread message",
messageThreadId: 15,
});
});
});
});

View File

@ -1,5 +1,6 @@
import type { ChannelId } from "../../channels/plugins/types.js";
import type { OpenClawConfig } from "../../config/config.js";
import type { OutboundChannel } from "../../infra/outbound/targets.js";
import {
loadSessionStore,
resolveAgentMainSessionKey,
@ -7,7 +8,6 @@ import {
} from "../../config/sessions.js";
import { resolveMessageChannelSelection } from "../../infra/outbound/channel-selection.js";
import { maybeResolveIdLikeTarget } from "../../infra/outbound/target-resolver.js";
import type { OutboundChannel } from "../../infra/outbound/targets.js";
import {
resolveOutboundTarget,
resolveSessionDeliveryTarget,
@ -43,6 +43,7 @@ export async function resolveDeliveryTarget(
jobPayload: {
channel?: "last" | ChannelId;
to?: string;
threadId?: string | number;
/** Explicit accountId from job.delivery — overrides session-derived and binding-derived values. */
accountId?: string;
sessionKey?: string;
@ -50,6 +51,8 @@ export async function resolveDeliveryTarget(
): Promise<DeliveryTargetResolution> {
const requestedChannel = typeof jobPayload.channel === "string" ? jobPayload.channel : "last";
const explicitTo = typeof jobPayload.to === "string" ? jobPayload.to : undefined;
const explicitThreadId =
jobPayload.threadId != null && jobPayload.threadId !== "" ? jobPayload.threadId : undefined;
const allowMismatchedLastTo = requestedChannel === "last";
const sessionCfg = cfg.session;
@ -67,6 +70,7 @@ export async function resolveDeliveryTarget(
entry: main,
requestedChannel,
explicitTo,
explicitThreadId,
allowMismatchedLastTo,
});
@ -93,6 +97,7 @@ export async function resolveDeliveryTarget(
entry: main,
requestedChannel,
explicitTo,
explicitThreadId,
fallbackChannel,
allowMismatchedLastTo,
mode: preliminary.mode,

View File

@ -1,3 +1,7 @@
import type { CliDeps } from "../../cli/outbound-send-deps.js";
import type { OpenClawConfig } from "../../config/config.js";
import type { AgentDefaultsConfig } from "../../config/types.js";
import type { CronJob, CronRunOutcome, CronRunTelemetry } from "../types.js";
import {
resolveAgentConfig,
resolveAgentDir,
@ -37,14 +41,11 @@ import {
normalizeVerboseLevel,
supportsXHighThinking,
} from "../../auto-reply/thinking.js";
import type { CliDeps } from "../../cli/outbound-send-deps.js";
import type { OpenClawConfig } from "../../config/config.js";
import {
resolveSessionTranscriptPath,
setSessionRuntimeModel,
updateSessionStore,
} from "../../config/sessions.js";
import type { AgentDefaultsConfig } from "../../config/types.js";
import { registerAgentRunContext } from "../../infra/agent-events.js";
import { logWarn } from "../../logger.js";
import { normalizeAgentId } from "../../routing/session-key.js";
@ -55,7 +56,6 @@ import {
isExternalHookSession,
} from "../../security/external-content.js";
import { resolveCronDeliveryPlan } from "../delivery.js";
import type { CronJob, CronRunOutcome, CronRunTelemetry } from "../types.js";
import {
dispatchCronDelivery,
matchesMessagingToolDeliveryTarget,
@ -173,6 +173,7 @@ async function resolveCronDeliveryContext(params: {
const resolvedDelivery = await resolveDeliveryTarget(params.cfg, params.agentId, {
channel: deliveryPlan.channel ?? "last",
to: deliveryPlan.to,
threadId: deliveryPlan.threadId,
accountId: deliveryPlan.accountId,
sessionKey: params.job.sessionKey,
});

View File

@ -248,6 +248,21 @@ describe("normalizeCronJobCreate", () => {
expect(delivery.accountId).toBe("coordinator");
});
it("normalizes delivery threadId and strips blanks", () => {
const normalized = normalizeIsolatedAgentTurnCreateJob({
name: "delivery thread",
delivery: {
mode: "announce",
channel: "telegram",
to: "-1003816714067",
threadId: " 15 ",
},
});
const delivery = normalized.delivery as Record<string, unknown>;
expect(delivery.threadId).toBe("15");
});
it("strips empty accountId from delivery", () => {
const normalized = normalizeIsolatedAgentTurnCreateJob({
name: "empty account",

View File

@ -1,3 +1,4 @@
import type { CronJobCreate, CronJobPatch } from "./types.js";
import { sanitizeAgentId } from "../routing/session-key.js";
import { isRecord } from "../utils.js";
import { normalizeLegacyDeliveryInput } from "./legacy-delivery.js";
@ -5,7 +6,6 @@ import { parseAbsoluteTimeMs } from "./parse.js";
import { migrateLegacyCronPayload } from "./payload-migration.js";
import { inferLegacyName } from "./service/normalize.js";
import { normalizeCronStaggerMs, resolveDefaultCronStaggerMs } from "./stagger.js";
import type { CronJobCreate, CronJobPatch } from "./types.js";
type UnknownRecord = Record<string, unknown>;
@ -191,6 +191,18 @@ function coerceDelivery(delivery: UnknownRecord) {
delete next.to;
}
}
if (typeof delivery.threadId === "number" && Number.isFinite(delivery.threadId)) {
next.threadId = delivery.threadId;
} else if (typeof delivery.threadId === "string") {
const trimmed = delivery.threadId.trim();
if (trimmed) {
next.threadId = trimmed;
} else {
delete next.threadId;
}
} else if ("threadId" in next) {
delete next.threadId;
}
if (typeof delivery.accountId === "string") {
const trimmed = delivery.accountId.trim();
if (trimmed) {

View File

@ -1,8 +1,8 @@
import { describe, expect, it } from "vitest";
import { applyJobPatch, createJob } from "./service/jobs.js";
import type { CronServiceState } from "./service/state.js";
import { DEFAULT_TOP_OF_HOUR_STAGGER_MS } from "./stagger.js";
import type { CronJob, CronJobPatch } from "./types.js";
import { applyJobPatch, createJob } from "./service/jobs.js";
import { DEFAULT_TOP_OF_HOUR_STAGGER_MS } from "./stagger.js";
function expectCronStaggerMs(job: CronJob, expected: number): void {
expect(job.schedule.kind).toBe("cron");
@ -144,6 +144,26 @@ describe("applyJobPatch", () => {
expect(job.delivery?.accountId).toBeUndefined();
});
it("merges delivery.threadId from patch and preserves existing", () => {
const job = createIsolatedAgentTurnJob("job-thread", {
mode: "announce",
channel: "telegram",
to: "-100123",
});
applyJobPatch(job, { delivery: { mode: "announce", threadId: " 15 " } });
expect(job.delivery?.threadId).toBe("15");
expect(job.delivery?.mode).toBe("announce");
expect(job.delivery?.to).toBe("-100123");
applyJobPatch(job, { delivery: { mode: "announce", to: "-100999" } });
expect(job.delivery?.threadId).toBe("15");
expect(job.delivery?.to).toBe("-100999");
applyJobPatch(job, { delivery: { mode: "announce", threadId: "" } });
expect(job.delivery?.threadId).toBeUndefined();
});
it("persists agentTurn payload.lightContext updates when editing existing jobs", () => {
const job = createIsolatedAgentTurnJob("job-light-context", {
mode: "announce",

View File

@ -1,4 +1,15 @@
import crypto from "node:crypto";
import type {
CronDelivery,
CronDeliveryPatch,
CronFailureAlert,
CronJob,
CronJobCreate,
CronJobPatch,
CronPayload,
CronPayloadPatch,
} from "../types.js";
import type { CronServiceState } from "./state.js";
import { normalizeAgentId } from "../../routing/session-key.js";
import { parseAbsoluteTimeMs } from "../parse.js";
import {
@ -11,16 +22,6 @@ import {
resolveCronStaggerMs,
resolveDefaultCronStaggerMs,
} from "../stagger.js";
import type {
CronDelivery,
CronDeliveryPatch,
CronFailureAlert,
CronJob,
CronJobCreate,
CronJobPatch,
CronPayload,
CronPayloadPatch,
} from "../types.js";
import { normalizeHttpWebhookUrl } from "../webhook-url.js";
import { resolveInitialCronDelivery } from "./initial-delivery.js";
import {
@ -30,7 +31,6 @@ import {
normalizePayloadToSystemText,
normalizeRequiredName,
} from "./normalize.js";
import type { CronServiceState } from "./state.js";
const STUCK_RUN_MS = 2 * 60 * 60 * 1000;
const STAGGER_OFFSET_CACHE_MAX = 4096;
@ -771,6 +771,13 @@ function normalizeOptionalTrimmedString(value: unknown): string | undefined {
return trimmed ? trimmed : undefined;
}
function normalizeOptionalThreadId(value: unknown): string | number | undefined {
if (typeof value === "number" && Number.isFinite(value)) {
return value;
}
return normalizeOptionalTrimmedString(value);
}
function mergeCronDelivery(
existing: CronDelivery | undefined,
patch: CronDeliveryPatch,
@ -779,6 +786,7 @@ function mergeCronDelivery(
mode: existing?.mode ?? "none",
channel: existing?.channel,
to: existing?.to,
threadId: existing?.threadId,
accountId: existing?.accountId,
bestEffort: existing?.bestEffort,
failureDestination: existing?.failureDestination,
@ -793,6 +801,9 @@ function mergeCronDelivery(
if ("to" in patch) {
next.to = normalizeOptionalTrimmedString(patch.to);
}
if ("threadId" in patch) {
next.threadId = normalizeOptionalThreadId(patch.threadId);
}
if ("accountId" in patch) {
next.accountId = normalizeOptionalTrimmedString(patch.accountId);
}

View File

@ -24,6 +24,8 @@ export type CronDelivery = {
mode: CronDeliveryMode;
channel?: CronMessageChannel;
to?: string;
/** Explicit thread or topic target for threaded channels (for example Telegram forum topics). */
threadId?: string | number;
/** Explicit channel account id for multi-account setups (e.g. multiple Telegram bots). */
accountId?: string;
bestEffort?: boolean;