fix(outbound): preserve replies after adapter sanitization
This commit is contained in:
parent
b3f0da2a77
commit
9debe07ec2
@ -10,6 +10,7 @@ import {
|
||||
sendPayloadMediaSequenceOrFallback,
|
||||
} from "openclaw/plugin-sdk/reply-payload";
|
||||
import type { ReplyPayload } from "openclaw/plugin-sdk/reply-runtime";
|
||||
import { markReplyApplied } from "../../../src/infra/outbound/reply-applied.js";
|
||||
import type { TelegramInlineButtons } from "./button-types.js";
|
||||
import { resolveTelegramInlineButtons } from "./button-types.js";
|
||||
import { markdownToTelegramHtmlChunks } from "./format.js";
|
||||
@ -21,6 +22,13 @@ export const TELEGRAM_TEXT_CHUNK_LIMIT = 4000;
|
||||
type TelegramSendFn = typeof sendMessageTelegram;
|
||||
type TelegramSendOpts = Parameters<TelegramSendFn>[2];
|
||||
|
||||
function attachReplyAppliedMarker<T extends object>(
|
||||
result: T,
|
||||
baseOpts: { replyToMessageId?: number },
|
||||
) {
|
||||
return markReplyApplied(result, baseOpts.replyToMessageId !== undefined);
|
||||
}
|
||||
|
||||
function resolveTelegramSendContext(params: {
|
||||
cfg: NonNullable<TelegramSendOpts>["cfg"];
|
||||
deps?: OutboundSendDeps;
|
||||
@ -116,9 +124,10 @@ export const telegramOutbound: ChannelOutboundAdapter = {
|
||||
replyToId,
|
||||
threadId,
|
||||
});
|
||||
return await send(to, text, {
|
||||
const result = await send(to, text, {
|
||||
...baseOpts,
|
||||
});
|
||||
return attachReplyAppliedMarker(result, baseOpts);
|
||||
},
|
||||
sendMedia: async ({
|
||||
cfg,
|
||||
@ -145,6 +154,7 @@ export const telegramOutbound: ChannelOutboundAdapter = {
|
||||
mediaLocalRoots,
|
||||
forceDocument: forceDocument ?? false,
|
||||
});
|
||||
return attachReplyAppliedMarker(result, baseOpts);
|
||||
},
|
||||
}),
|
||||
sendPayload: async ({
|
||||
@ -175,6 +185,6 @@ export const telegramOutbound: ChannelOutboundAdapter = {
|
||||
forceDocument: forceDocument ?? false,
|
||||
},
|
||||
});
|
||||
return attachChannelToResult("telegram", result);
|
||||
return attachReplyAppliedMarker(attachChannelToResult("telegram", result), baseOpts);
|
||||
},
|
||||
};
|
||||
|
||||
@ -356,6 +356,78 @@ describe("deliverOutboundPayloads Greptile fixes", () => {
|
||||
expect(results).toHaveLength(2);
|
||||
});
|
||||
|
||||
it("does not consume inherited telegram reply state after an invalid payload-level text override", async () => {
|
||||
const sendTelegram = vi
|
||||
.fn()
|
||||
.mockResolvedValueOnce({ messageId: "tg-1", chatId: "chat-1" })
|
||||
.mockResolvedValueOnce({ messageId: "tg-2", chatId: "chat-1" });
|
||||
const cfg: OpenClawConfig = {
|
||||
channels: { telegram: { botToken: "tok-1", textChunkLimit: 4000 } },
|
||||
};
|
||||
|
||||
const results = await deliverOutboundPayloads({
|
||||
cfg,
|
||||
channel: "telegram",
|
||||
to: "123",
|
||||
payloads: [{ text: "first", replyToId: "not-a-number" }, { text: "second" }],
|
||||
replyToId: "777",
|
||||
deps: { sendTelegram },
|
||||
skipQueue: true,
|
||||
});
|
||||
|
||||
const firstOpts = sendTelegram.mock.calls[0]?.[2] as { replyToMessageId?: number } | undefined;
|
||||
const secondOpts = sendTelegram.mock.calls[1]?.[2] as { replyToMessageId?: number } | undefined;
|
||||
|
||||
expect(sendTelegram).toHaveBeenCalledTimes(2);
|
||||
expect(firstOpts?.replyToMessageId).toBeUndefined();
|
||||
expect(secondOpts?.replyToMessageId).toBe(777);
|
||||
expect(results).toEqual([
|
||||
{ channel: "telegram", messageId: "tg-1", chatId: "chat-1" },
|
||||
{ channel: "telegram", messageId: "tg-2", chatId: "chat-1" },
|
||||
]);
|
||||
});
|
||||
|
||||
it("does not consume inherited telegram reply state after an invalid payload-level sendPayload override", async () => {
|
||||
const sendTelegram = vi
|
||||
.fn()
|
||||
.mockResolvedValueOnce({ messageId: "tg-1", chatId: "chat-1" })
|
||||
.mockResolvedValueOnce({ messageId: "tg-2", chatId: "chat-1" });
|
||||
const cfg: OpenClawConfig = {
|
||||
channels: { telegram: { botToken: "tok-1", textChunkLimit: 4000 } },
|
||||
};
|
||||
|
||||
const results = await deliverOutboundPayloads({
|
||||
cfg,
|
||||
channel: "telegram",
|
||||
to: "123",
|
||||
payloads: [
|
||||
{
|
||||
text: "first",
|
||||
replyToId: "not-a-number",
|
||||
channelData: { telegram: { buttons: [] } },
|
||||
},
|
||||
{
|
||||
text: "second",
|
||||
channelData: { telegram: { buttons: [] } },
|
||||
},
|
||||
],
|
||||
replyToId: "777",
|
||||
deps: { sendTelegram },
|
||||
skipQueue: true,
|
||||
});
|
||||
|
||||
const firstOpts = sendTelegram.mock.calls[0]?.[2] as { replyToMessageId?: number } | undefined;
|
||||
const secondOpts = sendTelegram.mock.calls[1]?.[2] as { replyToMessageId?: number } | undefined;
|
||||
|
||||
expect(sendTelegram).toHaveBeenCalledTimes(2);
|
||||
expect(firstOpts?.replyToMessageId).toBeUndefined();
|
||||
expect(secondOpts?.replyToMessageId).toBe(777);
|
||||
expect(results).toEqual([
|
||||
{ channel: "telegram", messageId: "tg-1", chatId: "chat-1" },
|
||||
{ channel: "telegram", messageId: "tg-2", chatId: "chat-1" },
|
||||
]);
|
||||
});
|
||||
|
||||
it("retries replyToId on later non-signal media payloads after a best-effort failure", async () => {
|
||||
const sendText = vi.fn();
|
||||
const sendMedia = vi
|
||||
|
||||
@ -49,6 +49,7 @@ import type { OutboundIdentity } from "./identity.js";
|
||||
import type { DeliveryMirror } from "./mirror.js";
|
||||
import type { NormalizedOutboundPayload } from "./payloads.js";
|
||||
import { normalizeReplyPayloadsForDelivery } from "./payloads.js";
|
||||
import { readReplyApplied } from "./reply-applied.js";
|
||||
import { isPlainTextSurface, sanitizeForPlainText } from "./sanitize-text.js";
|
||||
import { resolveOutboundSendDep, type OutboundSendDeps } from "./send-deps.js";
|
||||
import type { OutboundSessionContext } from "./session-context.js";
|
||||
@ -770,11 +771,29 @@ async function deliverOutboundPayloadsCore(
|
||||
}).quoteTimestamp !== undefined
|
||||
);
|
||||
};
|
||||
const didSendApplyReply = <T>(
|
||||
resultCountBeforeSend: number,
|
||||
value?: T,
|
||||
sent?: (value: T) => boolean,
|
||||
) => {
|
||||
const valueFlag = value === undefined ? undefined : readReplyApplied(value);
|
||||
if (valueFlag !== undefined) {
|
||||
return valueFlag;
|
||||
}
|
||||
const resultFlags = results
|
||||
.slice(resultCountBeforeSend)
|
||||
.map((result) => readReplyApplied(result))
|
||||
.filter((flag): flag is boolean => flag !== undefined);
|
||||
if (resultFlags.length > 0) {
|
||||
return resultFlags.some(Boolean);
|
||||
}
|
||||
return value !== undefined && sent ? sent(value) : results.length > resultCountBeforeSend;
|
||||
};
|
||||
const markReplyConsumedIfSendSucceeded = (
|
||||
replyTo: string | undefined,
|
||||
resultCountBeforeSend: number,
|
||||
) => {
|
||||
if (shouldConsumeReplyAfterSend(replyTo) && results.length > resultCountBeforeSend) {
|
||||
if (shouldConsumeReplyAfterSend(replyTo) && didSendApplyReply(resultCountBeforeSend)) {
|
||||
replyConsumed = true;
|
||||
}
|
||||
};
|
||||
@ -788,7 +807,7 @@ async function deliverOutboundPayloadsCore(
|
||||
const value = await send();
|
||||
if (
|
||||
shouldConsumeReplyAfterSend(replyTo) &&
|
||||
(sent?.(value) ?? results.length > resultCountBeforeSend)
|
||||
didSendApplyReply(resultCountBeforeSend, value, sent)
|
||||
) {
|
||||
replyConsumed = true;
|
||||
}
|
||||
|
||||
13
src/infra/outbound/reply-applied.ts
Normal file
13
src/infra/outbound/reply-applied.ts
Normal file
@ -0,0 +1,13 @@
|
||||
const replyAppliedMarkers = new WeakMap<object, boolean>();
|
||||
|
||||
export function markReplyApplied<T extends object>(value: T, applied: boolean): T {
|
||||
replyAppliedMarkers.set(value, applied);
|
||||
return value;
|
||||
}
|
||||
|
||||
export function readReplyApplied(value: unknown): boolean | undefined {
|
||||
if (!value || typeof value !== "object") {
|
||||
return undefined;
|
||||
}
|
||||
return replyAppliedMarkers.get(value);
|
||||
}
|
||||
Loading…
x
Reference in New Issue
Block a user