Merge 656eb1ddb60cb9b22ba8cec6469747ea86851a3b into 5e417b44e1540f528d2ae63e3e20229a902d1db2

This commit is contained in:
Alex Alaniz 2026-03-20 22:01:41 -04:00 committed by GitHub
commit 2502fb676e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
14 changed files with 234 additions and 1 deletions

View File

@ -84,6 +84,24 @@ describe("resolveCronDeliveryPlan", () => {
expect(plan.to).toBe("123");
expect(plan.accountId).toBe("bot-a");
});
it("threads delivery.threadId when explicitly configured", () => {
const plan = resolveCronDeliveryPlan(
makeJob({
delivery: {
mode: "announce",
channel: "telegram",
to: "-1003700845925",
threadId: " 15 ",
},
}),
);
expect(plan.mode).toBe("announce");
expect(plan.requested).toBe(true);
expect(plan.channel).toBe("telegram");
expect(plan.to).toBe("-1003700845925");
expect(plan.threadId).toBe("15");
});
});
describe("resolveFailureDestination", () => {
@ -148,6 +166,33 @@ describe("resolveFailureDestination", () => {
expect(plan).toBeNull();
});
it("keeps failure destinations for topic-routed announce jobs", () => {
const plan = resolveFailureDestination(
makeJob({
delivery: {
mode: "announce",
channel: "telegram",
to: "-1003700845925",
threadId: "15",
accountId: "bot-a",
failureDestination: {
mode: "announce",
channel: "telegram",
to: "-1003700845925",
accountId: "bot-a",
},
},
}),
undefined,
);
expect(plan).toEqual({
mode: "announce",
channel: "telegram",
to: "-1003700845925",
accountId: "bot-a",
});
});
it("returns null when webhook failure destination matches the primary webhook target", () => {
const plan = resolveFailureDestination(
makeJob({

View File

@ -14,6 +14,8 @@ export type CronDeliveryPlan = {
mode: CronDeliveryMode;
channel?: CronMessageChannel;
to?: string;
/** Explicit thread/topic target from the delivery config, if set. */
threadId?: string | number;
/** Explicit channel account id from the delivery config, if set. */
accountId?: string;
source: "delivery" | "payload";
@ -47,6 +49,17 @@ function normalizeAccountId(value: unknown): string | undefined {
return trimmed ? trimmed : undefined;
}
function normalizeThreadId(value: unknown): string | number | undefined {
if (typeof value === "number" && Number.isFinite(value)) {
return value;
}
if (typeof value !== "string") {
return undefined;
}
const trimmed = value.trim();
return trimmed ? trimmed : undefined;
}
export function resolveCronDeliveryPlan(job: CronJob): CronDeliveryPlan {
const payload = job.payload.kind === "agentTurn" ? job.payload : null;
const delivery = job.delivery;
@ -70,6 +83,9 @@ export function resolveCronDeliveryPlan(job: CronJob): CronDeliveryPlan {
(delivery as { channel?: unknown } | undefined)?.channel,
);
const deliveryTo = normalizeTo((delivery as { to?: unknown } | undefined)?.to);
const deliveryThreadId = normalizeThreadId(
(delivery as { threadId?: unknown } | undefined)?.threadId,
);
const channel = deliveryChannel ?? payloadChannel ?? "last";
const to = deliveryTo ?? payloadTo;
const deliveryAccountId = normalizeAccountId(
@ -81,6 +97,7 @@ export function resolveCronDeliveryPlan(job: CronJob): CronDeliveryPlan {
mode: resolvedMode,
channel: resolvedMode === "announce" ? channel : undefined,
to,
threadId: deliveryThreadId,
accountId: deliveryAccountId,
source: "delivery",
requested: resolvedMode === "announce",
@ -220,6 +237,7 @@ function isSameDeliveryTarget(
const primaryChannel = delivery.channel;
const primaryTo = delivery.to;
const primaryAccountId = delivery.accountId;
const primaryThreadId = normalizeThreadId(delivery.threadId);
if (failurePlan.mode === "webhook") {
return primaryMode === "webhook" && primaryTo === failurePlan.to;
@ -229,6 +247,7 @@ function isSameDeliveryTarget(
const failureChannelNormalized = failurePlan.channel ?? "last";
return (
primaryThreadId === undefined &&
failureChannelNormalized === primaryChannelNormalized &&
failurePlan.to === primaryTo &&
failurePlan.accountId === primaryAccountId

View File

@ -170,4 +170,18 @@ describe("resolveDeliveryTarget thread session lookup", () => {
expect(result.to).toBe("63448508");
expect(result.threadId).toBe(1008013);
});
it("preserves explicit delivery.threadId when the target chat is plain telegram", async () => {
mockStore["/mock/store.json"] = {};
const result = await resolveDeliveryTarget(cfg, "main", {
channel: "telegram",
to: "63448508",
threadId: "15",
});
expect(result.to).toBe("63448508");
expect(result.threadId).toBe("15");
expect(result.channel).toBe("telegram");
});
});

View File

@ -54,6 +54,7 @@ export async function runTelegramAnnounceTurn(params: {
mode: "announce";
channel: string;
to?: string;
threadId?: string | number;
bestEffort?: boolean;
};
deliveryContract?: "cron-owned" | "shared";

View File

@ -12,7 +12,7 @@ import { setupIsolatedAgentTurnMocks } from "./isolated-agent.test-setup.js";
describe("runCronIsolatedAgentTurn forum topic delivery", () => {
beforeEach(() => {
setupIsolatedAgentTurnMocks();
setupIsolatedAgentTurnMocks({ fast: true });
});
it("routes forum-topic telegram targets through the correct delivery path", async () => {
@ -59,6 +59,30 @@ describe("runCronIsolatedAgentTurn forum topic delivery", () => {
chatId: "123",
text: "plain message",
});
vi.clearAllMocks();
mockAgentPayloads([{ text: "explicit thread message" }]);
const explicitThreadRes = await runTelegramAnnounceTurn({
home,
storePath,
deps,
delivery: {
mode: "announce",
channel: "telegram",
to: "123",
threadId: "15",
},
});
expect(explicitThreadRes.status).toBe("ok");
expect(explicitThreadRes.delivered).toBe(true);
expect(runSubagentAnnounceFlow).not.toHaveBeenCalled();
expectDirectTelegramDelivery(deps, {
chatId: "123",
text: "explicit thread message",
messageThreadId: 15,
});
});
});
});

View File

@ -43,6 +43,7 @@ export async function resolveDeliveryTarget(
jobPayload: {
channel?: "last" | ChannelId;
to?: string;
threadId?: string | number;
/** Explicit accountId from job.delivery — overrides session-derived and binding-derived values. */
accountId?: string;
sessionKey?: string;
@ -50,6 +51,8 @@ export async function resolveDeliveryTarget(
): Promise<DeliveryTargetResolution> {
const requestedChannel = typeof jobPayload.channel === "string" ? jobPayload.channel : "last";
const explicitTo = typeof jobPayload.to === "string" ? jobPayload.to : undefined;
const explicitThreadId =
jobPayload.threadId != null && jobPayload.threadId !== "" ? jobPayload.threadId : undefined;
const allowMismatchedLastTo = requestedChannel === "last";
const sessionCfg = cfg.session;
@ -67,6 +70,7 @@ export async function resolveDeliveryTarget(
entry: main,
requestedChannel,
explicitTo,
explicitThreadId,
allowMismatchedLastTo,
});
@ -93,6 +97,7 @@ export async function resolveDeliveryTarget(
entry: main,
requestedChannel,
explicitTo,
explicitThreadId,
fallbackChannel,
allowMismatchedLastTo,
mode: preliminary.mode,

View File

@ -201,6 +201,7 @@ async function resolveCronDeliveryContext(params: {
const resolvedDelivery = await resolveDeliveryTarget(params.cfg, params.agentId, {
channel: deliveryPlan.channel ?? "last",
to: deliveryPlan.to,
threadId: deliveryPlan.threadId,
accountId: deliveryPlan.accountId,
sessionKey: params.job.sessionKey,
});

View File

@ -248,6 +248,21 @@ describe("normalizeCronJobCreate", () => {
expect(delivery.accountId).toBe("coordinator");
});
it("normalizes delivery threadId and strips blanks", () => {
const normalized = normalizeIsolatedAgentTurnCreateJob({
name: "delivery thread",
delivery: {
mode: "announce",
channel: "telegram",
to: "-1003816714067",
threadId: " 15 ",
},
});
const delivery = normalized.delivery as Record<string, unknown>;
expect(delivery.threadId).toBe("15");
});
it("strips empty accountId from delivery", () => {
const normalized = normalizeIsolatedAgentTurnCreateJob({
name: "empty account",

View File

@ -193,6 +193,18 @@ function coerceDelivery(delivery: UnknownRecord) {
delete next.to;
}
}
if (typeof delivery.threadId === "number" && Number.isFinite(delivery.threadId)) {
next.threadId = delivery.threadId;
} else if (typeof delivery.threadId === "string") {
const trimmed = delivery.threadId.trim();
if (trimmed) {
next.threadId = trimmed;
} else {
delete next.threadId;
}
} else if ("threadId" in next) {
delete next.threadId;
}
if (typeof delivery.accountId === "string") {
const trimmed = delivery.accountId.trim();
if (trimmed) {

View File

@ -167,6 +167,26 @@ describe("applyJobPatch", () => {
expect(job.delivery?.accountId).toBeUndefined();
});
it("merges delivery.threadId from patch and preserves existing", () => {
const job = createIsolatedAgentTurnJob("job-thread", {
mode: "announce",
channel: "telegram",
to: "-100123",
});
applyJobPatch(job, { delivery: { mode: "announce", threadId: " 15 " } });
expect(job.delivery?.threadId).toBe("15");
expect(job.delivery?.mode).toBe("announce");
expect(job.delivery?.to).toBe("-100123");
applyJobPatch(job, { delivery: { mode: "announce", to: "-100999" } });
expect(job.delivery?.threadId).toBe("15");
expect(job.delivery?.to).toBe("-100999");
applyJobPatch(job, { delivery: { mode: "announce", threadId: "" } });
expect(job.delivery?.threadId).toBeUndefined();
});
it("persists agentTurn payload.lightContext updates when editing existing jobs", () => {
const job = createIsolatedAgentTurnJob("job-light-context", {
mode: "announce",

View File

@ -780,6 +780,13 @@ function normalizeOptionalTrimmedString(value: unknown): string | undefined {
return trimmed ? trimmed : undefined;
}
function normalizeOptionalThreadId(value: unknown): string | number | undefined {
if (typeof value === "number" && Number.isFinite(value)) {
return value;
}
return normalizeOptionalTrimmedString(value);
}
function mergeCronDelivery(
existing: CronDelivery | undefined,
patch: CronDeliveryPatch,
@ -788,6 +795,7 @@ function mergeCronDelivery(
mode: existing?.mode ?? "none",
channel: existing?.channel,
to: existing?.to,
threadId: existing?.threadId,
accountId: existing?.accountId,
bestEffort: existing?.bestEffort,
failureDestination: existing?.failureDestination,
@ -802,6 +810,9 @@ function mergeCronDelivery(
if ("to" in patch) {
next.to = normalizeOptionalTrimmedString(patch.to);
}
if ("threadId" in patch) {
next.threadId = normalizeOptionalThreadId(patch.threadId);
}
if ("accountId" in patch) {
next.accountId = normalizeOptionalTrimmedString(patch.accountId);
}

View File

@ -24,6 +24,8 @@ export type CronDelivery = {
mode: CronDeliveryMode;
channel?: CronMessageChannel;
to?: string;
/** Explicit thread or topic target for threaded channels (for example Telegram forum topics). */
threadId?: string | number;
/** Explicit channel account id for multi-account setups (e.g. multiple Telegram bots). */
accountId?: string;
bestEffort?: boolean;

View File

@ -176,6 +176,7 @@ export const CronFailureDestinationSchema = Type.Object(
const CronDeliverySharedProperties = {
channel: Type.Optional(Type.Union([Type.Literal("last"), NonEmptyString])),
threadId: Type.Optional(Type.Union([Type.String(), Type.Number()])),
accountId: Type.Optional(NonEmptyString),
bestEffort: Type.Optional(Type.Boolean()),
failureDestination: Type.Optional(CronFailureDestinationSchema),

View File

@ -375,6 +375,69 @@ describe("gateway server cron", () => {
expect(merged?.delivery?.channel).toBe("telegram");
expect(merged?.delivery?.to).toBe("19098680");
const threadIdRes = await rpcReq(ws, "cron.add", {
name: "thread id roundtrip",
enabled: true,
schedule: { kind: "every", everyMs: 60_000 },
sessionTarget: "isolated",
wakeMode: "next-heartbeat",
payload: { kind: "agentTurn", message: "hello" },
delivery: {
mode: "announce",
channel: "telegram",
to: "-1003700845925",
threadId: 15,
},
});
expect(threadIdRes.ok).toBe(true);
const threadIdAdded = threadIdRes.payload as
| {
id?: unknown;
delivery?: { threadId?: unknown; mode?: unknown; channel?: unknown; to?: unknown };
}
| undefined;
const threadIdJobId = typeof threadIdAdded?.id === "string" ? threadIdAdded.id : "";
expect(threadIdJobId.length > 0).toBe(true);
expect(threadIdAdded?.delivery?.mode).toBe("announce");
expect(threadIdAdded?.delivery?.channel).toBe("telegram");
expect(threadIdAdded?.delivery?.to).toBe("-1003700845925");
expect(threadIdAdded?.delivery?.threadId).toBe(15);
const threadIdUpdateRes = await rpcReq(ws, "cron.update", {
id: threadIdJobId,
patch: {
delivery: { threadId: "16" },
},
});
expect(threadIdUpdateRes.ok).toBe(true);
const threadIdUpdated = threadIdUpdateRes.payload as
| {
delivery?: { mode?: unknown; channel?: unknown; to?: unknown; threadId?: unknown };
}
| undefined;
expect(threadIdUpdated?.delivery?.mode).toBe("announce");
expect(threadIdUpdated?.delivery?.channel).toBe("telegram");
expect(threadIdUpdated?.delivery?.to).toBe("-1003700845925");
expect(threadIdUpdated?.delivery?.threadId).toBe("16");
const threadIdListRes = await rpcReq(ws, "cron.list", {
includeDisabled: true,
query: "thread id roundtrip",
});
expect(threadIdListRes.ok).toBe(true);
const threadIdJobs = (threadIdListRes.payload as { jobs?: unknown } | null)?.jobs;
expect(Array.isArray(threadIdJobs)).toBe(true);
const threadIdListed = (
threadIdJobs as Array<{
id?: unknown;
delivery?: { threadId?: unknown; mode?: unknown; channel?: unknown; to?: unknown };
}>
).find((job) => job.id === threadIdJobId);
expect(threadIdListed?.delivery?.mode).toBe("announce");
expect(threadIdListed?.delivery?.channel).toBe("telegram");
expect(threadIdListed?.delivery?.to).toBe("-1003700845925");
expect(threadIdListed?.delivery?.threadId).toBe("16");
const modelOnlyPatchRes = await rpcReq(ws, "cron.update", {
id: mergeJobId,
patch: {