diff --git a/extensions/telegram/src/outbound-adapter.ts b/extensions/telegram/src/outbound-adapter.ts index b500fb870cf..f5430eda679 100644 --- a/extensions/telegram/src/outbound-adapter.ts +++ b/extensions/telegram/src/outbound-adapter.ts @@ -10,6 +10,7 @@ import { sendPayloadMediaSequenceOrFallback, } from "openclaw/plugin-sdk/reply-payload"; import type { ReplyPayload } from "openclaw/plugin-sdk/reply-runtime"; +import { markReplyApplied } from "../../../src/infra/outbound/reply-applied.js"; import type { TelegramInlineButtons } from "./button-types.js"; import { resolveTelegramInlineButtons } from "./button-types.js"; import { markdownToTelegramHtmlChunks } from "./format.js"; @@ -21,6 +22,13 @@ export const TELEGRAM_TEXT_CHUNK_LIMIT = 4000; type TelegramSendFn = typeof sendMessageTelegram; type TelegramSendOpts = Parameters[2]; +function attachReplyAppliedMarker( + result: T, + baseOpts: { replyToMessageId?: number }, +) { + return markReplyApplied(result, baseOpts.replyToMessageId !== undefined); +} + function resolveTelegramSendContext(params: { cfg: NonNullable["cfg"]; deps?: OutboundSendDeps; @@ -116,9 +124,10 @@ export const telegramOutbound: ChannelOutboundAdapter = { replyToId, threadId, }); - return await send(to, text, { + const result = await send(to, text, { ...baseOpts, }); + return attachReplyAppliedMarker(result, baseOpts); }, sendMedia: async ({ cfg, @@ -145,6 +154,7 @@ export const telegramOutbound: ChannelOutboundAdapter = { mediaLocalRoots, forceDocument: forceDocument ?? false, }); + return attachReplyAppliedMarker(result, baseOpts); }, }), sendPayload: async ({ @@ -175,6 +185,6 @@ export const telegramOutbound: ChannelOutboundAdapter = { forceDocument: forceDocument ?? false, }, }); - return attachChannelToResult("telegram", result); + return attachReplyAppliedMarker(attachChannelToResult("telegram", result), baseOpts); }, }; diff --git a/src/infra/outbound/deliver.greptile-fixes.test.ts b/src/infra/outbound/deliver.greptile-fixes.test.ts index 9b169570ffb..1fec54cfd8d 100644 --- a/src/infra/outbound/deliver.greptile-fixes.test.ts +++ b/src/infra/outbound/deliver.greptile-fixes.test.ts @@ -356,6 +356,78 @@ describe("deliverOutboundPayloads Greptile fixes", () => { expect(results).toHaveLength(2); }); + it("does not consume inherited telegram reply state after an invalid payload-level text override", async () => { + const sendTelegram = vi + .fn() + .mockResolvedValueOnce({ messageId: "tg-1", chatId: "chat-1" }) + .mockResolvedValueOnce({ messageId: "tg-2", chatId: "chat-1" }); + const cfg: OpenClawConfig = { + channels: { telegram: { botToken: "tok-1", textChunkLimit: 4000 } }, + }; + + const results = await deliverOutboundPayloads({ + cfg, + channel: "telegram", + to: "123", + payloads: [{ text: "first", replyToId: "not-a-number" }, { text: "second" }], + replyToId: "777", + deps: { sendTelegram }, + skipQueue: true, + }); + + const firstOpts = sendTelegram.mock.calls[0]?.[2] as { replyToMessageId?: number } | undefined; + const secondOpts = sendTelegram.mock.calls[1]?.[2] as { replyToMessageId?: number } | undefined; + + expect(sendTelegram).toHaveBeenCalledTimes(2); + expect(firstOpts?.replyToMessageId).toBeUndefined(); + expect(secondOpts?.replyToMessageId).toBe(777); + expect(results).toEqual([ + { channel: "telegram", messageId: "tg-1", chatId: "chat-1" }, + { channel: "telegram", messageId: "tg-2", chatId: "chat-1" }, + ]); + }); + + it("does not consume inherited telegram reply state after an invalid payload-level sendPayload override", async () => { + const sendTelegram = vi + .fn() + .mockResolvedValueOnce({ messageId: "tg-1", chatId: "chat-1" }) + .mockResolvedValueOnce({ messageId: "tg-2", chatId: "chat-1" }); + const cfg: OpenClawConfig = { + channels: { telegram: { botToken: "tok-1", textChunkLimit: 4000 } }, + }; + + const results = await deliverOutboundPayloads({ + cfg, + channel: "telegram", + to: "123", + payloads: [ + { + text: "first", + replyToId: "not-a-number", + channelData: { telegram: { buttons: [] } }, + }, + { + text: "second", + channelData: { telegram: { buttons: [] } }, + }, + ], + replyToId: "777", + deps: { sendTelegram }, + skipQueue: true, + }); + + const firstOpts = sendTelegram.mock.calls[0]?.[2] as { replyToMessageId?: number } | undefined; + const secondOpts = sendTelegram.mock.calls[1]?.[2] as { replyToMessageId?: number } | undefined; + + expect(sendTelegram).toHaveBeenCalledTimes(2); + expect(firstOpts?.replyToMessageId).toBeUndefined(); + expect(secondOpts?.replyToMessageId).toBe(777); + expect(results).toEqual([ + { channel: "telegram", messageId: "tg-1", chatId: "chat-1" }, + { channel: "telegram", messageId: "tg-2", chatId: "chat-1" }, + ]); + }); + it("retries replyToId on later non-signal media payloads after a best-effort failure", async () => { const sendText = vi.fn(); const sendMedia = vi diff --git a/src/infra/outbound/deliver.ts b/src/infra/outbound/deliver.ts index 817680e21ac..e648ac70b92 100644 --- a/src/infra/outbound/deliver.ts +++ b/src/infra/outbound/deliver.ts @@ -49,6 +49,7 @@ import type { OutboundIdentity } from "./identity.js"; import type { DeliveryMirror } from "./mirror.js"; import type { NormalizedOutboundPayload } from "./payloads.js"; import { normalizeReplyPayloadsForDelivery } from "./payloads.js"; +import { readReplyApplied } from "./reply-applied.js"; import { isPlainTextSurface, sanitizeForPlainText } from "./sanitize-text.js"; import { resolveOutboundSendDep, type OutboundSendDeps } from "./send-deps.js"; import type { OutboundSessionContext } from "./session-context.js"; @@ -770,11 +771,29 @@ async function deliverOutboundPayloadsCore( }).quoteTimestamp !== undefined ); }; + const didSendApplyReply = ( + resultCountBeforeSend: number, + value?: T, + sent?: (value: T) => boolean, + ) => { + const valueFlag = value === undefined ? undefined : readReplyApplied(value); + if (valueFlag !== undefined) { + return valueFlag; + } + const resultFlags = results + .slice(resultCountBeforeSend) + .map((result) => readReplyApplied(result)) + .filter((flag): flag is boolean => flag !== undefined); + if (resultFlags.length > 0) { + return resultFlags.some(Boolean); + } + return value !== undefined && sent ? sent(value) : results.length > resultCountBeforeSend; + }; const markReplyConsumedIfSendSucceeded = ( replyTo: string | undefined, resultCountBeforeSend: number, ) => { - if (shouldConsumeReplyAfterSend(replyTo) && results.length > resultCountBeforeSend) { + if (shouldConsumeReplyAfterSend(replyTo) && didSendApplyReply(resultCountBeforeSend)) { replyConsumed = true; } }; @@ -788,7 +807,7 @@ async function deliverOutboundPayloadsCore( const value = await send(); if ( shouldConsumeReplyAfterSend(replyTo) && - (sent?.(value) ?? results.length > resultCountBeforeSend) + didSendApplyReply(resultCountBeforeSend, value, sent) ) { replyConsumed = true; } diff --git a/src/infra/outbound/reply-applied.ts b/src/infra/outbound/reply-applied.ts new file mode 100644 index 00000000000..83193716482 --- /dev/null +++ b/src/infra/outbound/reply-applied.ts @@ -0,0 +1,13 @@ +const replyAppliedMarkers = new WeakMap(); + +export function markReplyApplied(value: T, applied: boolean): T { + replyAppliedMarkers.set(value, applied); + return value; +} + +export function readReplyApplied(value: unknown): boolean | undefined { + if (!value || typeof value !== "object") { + return undefined; + } + return replyAppliedMarkers.get(value); +}