Compare commits
14 Commits
main
...
fix/slack-
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
b0c77a4f67 | ||
|
|
7bc216db03 | ||
|
|
9c8b283f77 | ||
|
|
bef0b8c8bb | ||
|
|
f2e0997ede | ||
|
|
3c5f308487 | ||
|
|
a298fa22f6 | ||
|
|
7b9fc01fba | ||
|
|
d1653b7750 | ||
|
|
d8e1aff7ee | ||
|
|
c9fbb445a7 | ||
|
|
0eb89b16e1 | ||
|
|
85018c4b56 | ||
|
|
8bd3281652 |
@ -24,6 +24,7 @@ Docs: https://docs.openclaw.ai
|
|||||||
- iMessage/self-chat echo dedupe: drop reflected duplicate copies only when a matching `is_from_me` event was just seen for the same chat, text, and `created_at`, preventing self-chat loops without broad text-only suppression. Related to #32166. (#38440) Thanks @vincentkoc.
|
- iMessage/self-chat echo dedupe: drop reflected duplicate copies only when a matching `is_from_me` event was just seen for the same chat, text, and `created_at`, preventing self-chat loops without broad text-only suppression. Related to #32166. (#38440) Thanks @vincentkoc.
|
||||||
- Mattermost/block streaming: fix duplicate message delivery (one threaded, one top-level) when block streaming is active by excluding `replyToId` from the block reply dedup key and adding an explicit `threading` dock to the Mattermost plugin. (#41362) Thanks @mathiasnagler and @vincentkoc.
|
- Mattermost/block streaming: fix duplicate message delivery (one threaded, one top-level) when block streaming is active by excluding `replyToId` from the block reply dedup key and adding an explicit `threading` dock to the Mattermost plugin. (#41362) Thanks @mathiasnagler and @vincentkoc.
|
||||||
- BlueBubbles/self-chat echo dedupe: drop reflected duplicate webhook copies only when a matching `fromMe` event was just seen for the same chat, body, and timestamp, preventing self-chat loops without broad webhook suppression. Related to #32166. (#38442) Thanks @vincentkoc.
|
- BlueBubbles/self-chat echo dedupe: drop reflected duplicate webhook copies only when a matching `fromMe` event was just seen for the same chat, body, and timestamp, preventing self-chat loops without broad webhook suppression. Related to #32166. (#38442) Thanks @vincentkoc.
|
||||||
|
- Slack/native streaming fallback cleanup: replay full streamed text only after orphan cleanup succeeds, finalize active streams before plain/media payloads to preserve reply order, and keep ghost stream messages from duplicating fallback replies on mobile Slack. (#41525) Thanks @raulvidis.
|
||||||
- Models/Kimi Coding: send `anthropic-messages` tools in native Anthropic format again so `kimi-coding` stops degrading tool calls into XML/plain-text pseudo invocations instead of real `tool_use` blocks. (#38669, #39907, #40552) Thanks @opriz.
|
- Models/Kimi Coding: send `anthropic-messages` tools in native Anthropic format again so `kimi-coding` stops degrading tool calls into XML/plain-text pseudo invocations instead of real `tool_use` blocks. (#38669, #39907, #40552) Thanks @opriz.
|
||||||
|
|
||||||
## 2026.3.11
|
## 2026.3.11
|
||||||
|
|||||||
@ -1,5 +1,11 @@
|
|||||||
import { describe, expect, it } from "vitest";
|
import { describe, expect, it } from "vitest";
|
||||||
import { isSlackStreamingEnabled, resolveSlackStreamingThreadHint } from "./dispatch.js";
|
import type { ReplyPayload } from "../../../auto-reply/types.js";
|
||||||
|
import {
|
||||||
|
buildSlackStreamFallbackText,
|
||||||
|
isSlackStreamingEnabled,
|
||||||
|
resolveSlackStreamingThreadHint,
|
||||||
|
shouldFinalizeSlackStreamBeforePlainPayload,
|
||||||
|
} from "./dispatch.js";
|
||||||
|
|
||||||
describe("slack native streaming defaults", () => {
|
describe("slack native streaming defaults", () => {
|
||||||
it("is enabled for partial mode when native streaming is on", () => {
|
it("is enabled for partial mode when native streaming is on", () => {
|
||||||
@ -45,3 +51,46 @@ describe("slack native streaming thread hint", () => {
|
|||||||
).toBe("2000.1");
|
).toBe("2000.1");
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
|
describe("slack native streaming fallback helpers", () => {
|
||||||
|
it("replays accumulated streamed text before the failing chunk", () => {
|
||||||
|
expect(buildSlackStreamFallbackText("First chunk", "Second chunk")).toBe(
|
||||||
|
"First chunk\nSecond chunk",
|
||||||
|
);
|
||||||
|
expect(buildSlackStreamFallbackText("", "Only chunk")).toBe("Only chunk");
|
||||||
|
});
|
||||||
|
|
||||||
|
it("finalizes an active stream before sending plain payloads", () => {
|
||||||
|
const mediaPayload: ReplyPayload = {
|
||||||
|
text: "Image caption",
|
||||||
|
mediaUrl: "file:///tmp/example.png",
|
||||||
|
};
|
||||||
|
const emptyTextPayload: ReplyPayload = { text: " " };
|
||||||
|
const normalTextPayload: ReplyPayload = { text: "Continue streaming" };
|
||||||
|
|
||||||
|
expect(
|
||||||
|
shouldFinalizeSlackStreamBeforePlainPayload({
|
||||||
|
hasActiveStream: true,
|
||||||
|
payload: mediaPayload,
|
||||||
|
}),
|
||||||
|
).toBe(true);
|
||||||
|
expect(
|
||||||
|
shouldFinalizeSlackStreamBeforePlainPayload({
|
||||||
|
hasActiveStream: true,
|
||||||
|
payload: emptyTextPayload,
|
||||||
|
}),
|
||||||
|
).toBe(true);
|
||||||
|
expect(
|
||||||
|
shouldFinalizeSlackStreamBeforePlainPayload({
|
||||||
|
hasActiveStream: true,
|
||||||
|
payload: normalTextPayload,
|
||||||
|
}),
|
||||||
|
).toBe(false);
|
||||||
|
expect(
|
||||||
|
shouldFinalizeSlackStreamBeforePlainPayload({
|
||||||
|
hasActiveStream: false,
|
||||||
|
payload: mediaPayload,
|
||||||
|
}),
|
||||||
|
).toBe(false);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|||||||
@ -31,6 +31,20 @@ function hasMedia(payload: ReplyPayload): boolean {
|
|||||||
return Boolean(payload.mediaUrl) || (payload.mediaUrls?.length ?? 0) > 0;
|
return Boolean(payload.mediaUrl) || (payload.mediaUrls?.length ?? 0) > 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
export function buildSlackStreamFallbackText(streamedText: string, nextText: string): string {
|
||||||
|
return streamedText ? `${streamedText}\n${nextText}` : nextText;
|
||||||
|
}
|
||||||
|
|
||||||
|
export function shouldFinalizeSlackStreamBeforePlainPayload(params: {
|
||||||
|
hasActiveStream: boolean;
|
||||||
|
payload: ReplyPayload;
|
||||||
|
}): boolean {
|
||||||
|
if (!params.hasActiveStream) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
return hasMedia(params.payload) || !params.payload.text?.trim();
|
||||||
|
}
|
||||||
|
|
||||||
export function isSlackStreamingEnabled(params: {
|
export function isSlackStreamingEnabled(params: {
|
||||||
mode: "off" | "partial" | "block" | "progress";
|
mode: "off" | "partial" | "block" | "progress";
|
||||||
nativeStreaming: boolean;
|
nativeStreaming: boolean;
|
||||||
@ -222,6 +236,12 @@ export async function dispatchPreparedSlackMessage(prepared: PreparedSlackMessag
|
|||||||
});
|
});
|
||||||
let streamSession: SlackStreamSession | null = null;
|
let streamSession: SlackStreamSession | null = null;
|
||||||
let streamFailed = false;
|
let streamFailed = false;
|
||||||
|
// Accumulates all text that has been successfully flushed into the Slack
|
||||||
|
// stream message. Used to reconstruct a complete fallback reply if the
|
||||||
|
// stream fails mid-turn or stopSlackStream fails at finalization — so the
|
||||||
|
// user always receives the full answer, not just the most recent chunk.
|
||||||
|
let streamedText = "";
|
||||||
|
let lastStreamPayload: ReplyPayload | null = null;
|
||||||
let usedReplyThreadTs: string | undefined;
|
let usedReplyThreadTs: string | undefined;
|
||||||
|
|
||||||
const deliverNormally = async (payload: ReplyPayload, forcedThreadTs?: string): Promise<void> => {
|
const deliverNormally = async (payload: ReplyPayload, forcedThreadTs?: string): Promise<void> => {
|
||||||
@ -244,13 +264,72 @@ export async function dispatchPreparedSlackMessage(prepared: PreparedSlackMessag
|
|||||||
replyPlan.markSent();
|
replyPlan.markSent();
|
||||||
};
|
};
|
||||||
|
|
||||||
const deliverWithStreaming = async (payload: ReplyPayload): Promise<void> => {
|
const deleteOrphanedStreamMessage = async (streamMessageTs?: string): Promise<boolean> => {
|
||||||
if (streamFailed || hasMedia(payload) || !payload.text?.trim()) {
|
if (!streamMessageTs) {
|
||||||
await deliverNormally(payload, streamSession?.threadTs);
|
return true;
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
await ctx.app.client.chat.delete({
|
||||||
|
token: ctx.botToken,
|
||||||
|
channel: message.channel,
|
||||||
|
ts: streamMessageTs,
|
||||||
|
});
|
||||||
|
logVerbose(`slack-stream: deleted orphaned stream message ${streamMessageTs}`);
|
||||||
|
return true;
|
||||||
|
} catch (deleteErr) {
|
||||||
|
logVerbose(
|
||||||
|
`slack-stream: failed to delete orphaned stream message ${streamMessageTs}: ${String(deleteErr)}`,
|
||||||
|
);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
const replayAccumulatedStreamText = async (threadTs?: string): Promise<void> => {
|
||||||
|
if (!lastStreamPayload || !streamedText) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
const fallback: ReplyPayload = Object.assign({}, lastStreamPayload, { text: streamedText });
|
||||||
|
await deliverNormally(fallback, threadTs);
|
||||||
|
};
|
||||||
|
|
||||||
|
const finalizeActiveStreamBeforePlainPayload = async (): Promise<string | undefined> => {
|
||||||
|
if (!streamSession || streamFailed) {
|
||||||
|
return streamSession?.threadTs;
|
||||||
|
}
|
||||||
|
const activeStream = streamSession;
|
||||||
|
const threadTs = activeStream.threadTs;
|
||||||
|
try {
|
||||||
|
await stopSlackStream({ session: activeStream });
|
||||||
|
} catch (err) {
|
||||||
|
runtime.error?.(danger(`slack-stream: failed to stop stream: ${String(err)}`));
|
||||||
|
if (await deleteOrphanedStreamMessage(activeStream.streamMessageTs)) {
|
||||||
|
await replayAccumulatedStreamText(threadTs);
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
streamSession = null;
|
||||||
|
streamedText = "";
|
||||||
|
lastStreamPayload = null;
|
||||||
|
}
|
||||||
|
return threadTs;
|
||||||
|
};
|
||||||
|
|
||||||
|
const deliverWithStreaming = async (payload: ReplyPayload): Promise<void> => {
|
||||||
|
const text = payload.text?.trim() ?? "";
|
||||||
|
const forcedThreadTs = shouldFinalizeSlackStreamBeforePlainPayload({
|
||||||
|
hasActiveStream: Boolean(streamSession),
|
||||||
|
payload,
|
||||||
|
})
|
||||||
|
? await finalizeActiveStreamBeforePlainPayload()
|
||||||
|
: streamSession?.threadTs;
|
||||||
|
if (streamFailed || hasMedia(payload) || !text) {
|
||||||
|
await deliverNormally(payload, forcedThreadTs);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
// Track the last payload for metadata (thread ts, media, etc.) and
|
||||||
|
// accumulate its text so a mid-stream failure can re-deliver the complete
|
||||||
|
// answer rather than only the failing chunk.
|
||||||
|
lastStreamPayload = payload;
|
||||||
|
|
||||||
const text = payload.text.trim();
|
|
||||||
let plannedThreadTs: string | undefined;
|
let plannedThreadTs: string | undefined;
|
||||||
try {
|
try {
|
||||||
if (!streamSession) {
|
if (!streamSession) {
|
||||||
@ -273,6 +352,8 @@ export async function dispatchPreparedSlackMessage(prepared: PreparedSlackMessag
|
|||||||
teamId: ctx.teamId,
|
teamId: ctx.teamId,
|
||||||
userId: message.user,
|
userId: message.user,
|
||||||
});
|
});
|
||||||
|
// Record text that is now live in the Slack stream message.
|
||||||
|
streamedText = text;
|
||||||
usedReplyThreadTs ??= streamThreadTs;
|
usedReplyThreadTs ??= streamThreadTs;
|
||||||
replyPlan.markSent();
|
replyPlan.markSent();
|
||||||
return;
|
return;
|
||||||
@ -282,12 +363,34 @@ export async function dispatchPreparedSlackMessage(prepared: PreparedSlackMessag
|
|||||||
session: streamSession,
|
session: streamSession,
|
||||||
text: "\n" + text,
|
text: "\n" + text,
|
||||||
});
|
});
|
||||||
|
// Record text that was successfully appended to the stream message.
|
||||||
|
streamedText += "\n" + text;
|
||||||
} catch (err) {
|
} catch (err) {
|
||||||
runtime.error?.(
|
runtime.error?.(
|
||||||
danger(`slack-stream: streaming API call failed: ${String(err)}, falling back`),
|
danger(`slack-stream: streaming API call failed: ${String(err)}, falling back`),
|
||||||
);
|
);
|
||||||
streamFailed = true;
|
streamFailed = true;
|
||||||
await deliverNormally(payload, streamSession?.threadTs ?? plannedThreadTs);
|
|
||||||
|
// Mark the stream as stopped so the end-of-dispatch finalizer does not
|
||||||
|
// call stopSlackStream on the orphaned message (which would finalize it
|
||||||
|
// and leave a duplicate visible on mobile Slack).
|
||||||
|
if (streamSession) {
|
||||||
|
streamSession.stopped = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
// If startStream already created a Slack message, delete it to prevent
|
||||||
|
// the orphaned stream message from persisting alongside the fallback.
|
||||||
|
const orphanDeleted = await deleteOrphanedStreamMessage(streamSession?.streamMessageTs);
|
||||||
|
|
||||||
|
// Re-deliver the full content: everything already in the stream message
|
||||||
|
// plus the current payload that failed to append. Using only `payload`
|
||||||
|
// here would drop all previously-streamed text.
|
||||||
|
if (orphanDeleted) {
|
||||||
|
await deliverNormally(
|
||||||
|
{ ...payload, text: buildSlackStreamFallbackText(streamedText, text) },
|
||||||
|
streamSession?.threadTs ?? plannedThreadTs,
|
||||||
|
);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
@ -456,15 +559,15 @@ export async function dispatchPreparedSlackMessage(prepared: PreparedSlackMessag
|
|||||||
draftStream.stop();
|
draftStream.stop();
|
||||||
markDispatchIdle();
|
markDispatchIdle();
|
||||||
|
|
||||||
// -----------------------------------------------------------------------
|
|
||||||
// Finalize the stream if one was started
|
|
||||||
// -----------------------------------------------------------------------
|
|
||||||
const finalStream = streamSession as SlackStreamSession | null;
|
const finalStream = streamSession as SlackStreamSession | null;
|
||||||
if (finalStream && !finalStream.stopped) {
|
if (finalStream && !finalStream.stopped) {
|
||||||
try {
|
try {
|
||||||
await stopSlackStream({ session: finalStream });
|
await stopSlackStream({ session: finalStream });
|
||||||
} catch (err) {
|
} catch (err) {
|
||||||
runtime.error?.(danger(`slack-stream: failed to stop stream: ${String(err)}`));
|
runtime.error?.(danger(`slack-stream: failed to stop stream: ${String(err)}`));
|
||||||
|
if (await deleteOrphanedStreamMessage(finalStream.streamMessageTs)) {
|
||||||
|
await replayAccumulatedStreamText(finalStream.threadTs);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -28,6 +28,16 @@ export type SlackStreamSession = {
|
|||||||
threadTs: string;
|
threadTs: string;
|
||||||
/** True once stop() has been called. */
|
/** True once stop() has been called. */
|
||||||
stopped: boolean;
|
stopped: boolean;
|
||||||
|
/**
|
||||||
|
* The Slack message timestamp of the stream message. Populated from the
|
||||||
|
* first non-null response returned by `streamer.append()` (which is the
|
||||||
|
* `chat.startStream` response). May be undefined if all appends were
|
||||||
|
* buffered and the stream was never flushed to Slack — in which case there
|
||||||
|
* is no orphaned message to clean up.
|
||||||
|
*
|
||||||
|
* Use this instead of accessing `streamer.streamTs` (private in the SDK).
|
||||||
|
*/
|
||||||
|
streamMessageTs?: string;
|
||||||
};
|
};
|
||||||
|
|
||||||
export type StartSlackStreamParams = {
|
export type StartSlackStreamParams = {
|
||||||
@ -98,8 +108,14 @@ export async function startSlackStream(
|
|||||||
|
|
||||||
// If initial text is provided, send it as the first append which will
|
// If initial text is provided, send it as the first append which will
|
||||||
// trigger the ChatStreamer to call chat.startStream under the hood.
|
// trigger the ChatStreamer to call chat.startStream under the hood.
|
||||||
|
// The first non-null response is a ChatStartStreamResponse and carries the
|
||||||
|
// stream message ts — capture it so callers never need to touch the private
|
||||||
|
// streamer.streamTs field.
|
||||||
if (text) {
|
if (text) {
|
||||||
await streamer.append({ markdown_text: text });
|
const response = await streamer.append({ markdown_text: text });
|
||||||
|
if (response?.ts) {
|
||||||
|
session.streamMessageTs = response.ts;
|
||||||
|
}
|
||||||
logVerbose(`slack-stream: appended initial text (${text.length} chars)`);
|
logVerbose(`slack-stream: appended initial text (${text.length} chars)`);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -121,7 +137,15 @@ export async function appendSlackStream(params: AppendSlackStreamParams): Promis
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
await session.streamer.append({ markdown_text: text });
|
// Capture the stream message ts from the first non-null response (the
|
||||||
|
// ChatStartStreamResponse returned when the SDK flushes and calls
|
||||||
|
// chat.startStream). Subsequent appends return ChatAppendStreamResponse or
|
||||||
|
// null (buffered). Once captured we stop checking.
|
||||||
|
const response = await session.streamer.append({ markdown_text: text });
|
||||||
|
if (!session.streamMessageTs && response?.ts) {
|
||||||
|
session.streamMessageTs = response.ts;
|
||||||
|
}
|
||||||
|
|
||||||
logVerbose(`slack-stream: appended ${text.length} chars`);
|
logVerbose(`slack-stream: appended ${text.length} chars`);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user