Compare commits
14 Commits
main
...
fix/slack-
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
b0c77a4f67 | ||
|
|
7bc216db03 | ||
|
|
9c8b283f77 | ||
|
|
bef0b8c8bb | ||
|
|
f2e0997ede | ||
|
|
3c5f308487 | ||
|
|
a298fa22f6 | ||
|
|
7b9fc01fba | ||
|
|
d1653b7750 | ||
|
|
d8e1aff7ee | ||
|
|
c9fbb445a7 | ||
|
|
0eb89b16e1 | ||
|
|
85018c4b56 | ||
|
|
8bd3281652 |
@ -24,6 +24,7 @@ Docs: https://docs.openclaw.ai
|
||||
- iMessage/self-chat echo dedupe: drop reflected duplicate copies only when a matching `is_from_me` event was just seen for the same chat, text, and `created_at`, preventing self-chat loops without broad text-only suppression. Related to #32166. (#38440) Thanks @vincentkoc.
|
||||
- Mattermost/block streaming: fix duplicate message delivery (one threaded, one top-level) when block streaming is active by excluding `replyToId` from the block reply dedup key and adding an explicit `threading` dock to the Mattermost plugin. (#41362) Thanks @mathiasnagler and @vincentkoc.
|
||||
- BlueBubbles/self-chat echo dedupe: drop reflected duplicate webhook copies only when a matching `fromMe` event was just seen for the same chat, body, and timestamp, preventing self-chat loops without broad webhook suppression. Related to #32166. (#38442) Thanks @vincentkoc.
|
||||
- Slack/native streaming fallback cleanup: replay full streamed text only after orphan cleanup succeeds, finalize active streams before plain/media payloads to preserve reply order, and keep ghost stream messages from duplicating fallback replies on mobile Slack. (#41525) Thanks @raulvidis.
|
||||
- Models/Kimi Coding: send `anthropic-messages` tools in native Anthropic format again so `kimi-coding` stops degrading tool calls into XML/plain-text pseudo invocations instead of real `tool_use` blocks. (#38669, #39907, #40552) Thanks @opriz.
|
||||
|
||||
## 2026.3.11
|
||||
|
||||
@ -1,5 +1,11 @@
|
||||
import { describe, expect, it } from "vitest";
|
||||
import { isSlackStreamingEnabled, resolveSlackStreamingThreadHint } from "./dispatch.js";
|
||||
import type { ReplyPayload } from "../../../auto-reply/types.js";
|
||||
import {
|
||||
buildSlackStreamFallbackText,
|
||||
isSlackStreamingEnabled,
|
||||
resolveSlackStreamingThreadHint,
|
||||
shouldFinalizeSlackStreamBeforePlainPayload,
|
||||
} from "./dispatch.js";
|
||||
|
||||
describe("slack native streaming defaults", () => {
|
||||
it("is enabled for partial mode when native streaming is on", () => {
|
||||
@ -45,3 +51,46 @@ describe("slack native streaming thread hint", () => {
|
||||
).toBe("2000.1");
|
||||
});
|
||||
});
|
||||
|
||||
describe("slack native streaming fallback helpers", () => {
|
||||
it("replays accumulated streamed text before the failing chunk", () => {
|
||||
expect(buildSlackStreamFallbackText("First chunk", "Second chunk")).toBe(
|
||||
"First chunk\nSecond chunk",
|
||||
);
|
||||
expect(buildSlackStreamFallbackText("", "Only chunk")).toBe("Only chunk");
|
||||
});
|
||||
|
||||
it("finalizes an active stream before sending plain payloads", () => {
|
||||
const mediaPayload: ReplyPayload = {
|
||||
text: "Image caption",
|
||||
mediaUrl: "file:///tmp/example.png",
|
||||
};
|
||||
const emptyTextPayload: ReplyPayload = { text: " " };
|
||||
const normalTextPayload: ReplyPayload = { text: "Continue streaming" };
|
||||
|
||||
expect(
|
||||
shouldFinalizeSlackStreamBeforePlainPayload({
|
||||
hasActiveStream: true,
|
||||
payload: mediaPayload,
|
||||
}),
|
||||
).toBe(true);
|
||||
expect(
|
||||
shouldFinalizeSlackStreamBeforePlainPayload({
|
||||
hasActiveStream: true,
|
||||
payload: emptyTextPayload,
|
||||
}),
|
||||
).toBe(true);
|
||||
expect(
|
||||
shouldFinalizeSlackStreamBeforePlainPayload({
|
||||
hasActiveStream: true,
|
||||
payload: normalTextPayload,
|
||||
}),
|
||||
).toBe(false);
|
||||
expect(
|
||||
shouldFinalizeSlackStreamBeforePlainPayload({
|
||||
hasActiveStream: false,
|
||||
payload: mediaPayload,
|
||||
}),
|
||||
).toBe(false);
|
||||
});
|
||||
});
|
||||
|
||||
@ -31,6 +31,20 @@ function hasMedia(payload: ReplyPayload): boolean {
|
||||
return Boolean(payload.mediaUrl) || (payload.mediaUrls?.length ?? 0) > 0;
|
||||
}
|
||||
|
||||
export function buildSlackStreamFallbackText(streamedText: string, nextText: string): string {
|
||||
return streamedText ? `${streamedText}\n${nextText}` : nextText;
|
||||
}
|
||||
|
||||
export function shouldFinalizeSlackStreamBeforePlainPayload(params: {
|
||||
hasActiveStream: boolean;
|
||||
payload: ReplyPayload;
|
||||
}): boolean {
|
||||
if (!params.hasActiveStream) {
|
||||
return false;
|
||||
}
|
||||
return hasMedia(params.payload) || !params.payload.text?.trim();
|
||||
}
|
||||
|
||||
export function isSlackStreamingEnabled(params: {
|
||||
mode: "off" | "partial" | "block" | "progress";
|
||||
nativeStreaming: boolean;
|
||||
@ -222,6 +236,12 @@ export async function dispatchPreparedSlackMessage(prepared: PreparedSlackMessag
|
||||
});
|
||||
let streamSession: SlackStreamSession | null = null;
|
||||
let streamFailed = false;
|
||||
// Accumulates all text that has been successfully flushed into the Slack
|
||||
// stream message. Used to reconstruct a complete fallback reply if the
|
||||
// stream fails mid-turn or stopSlackStream fails at finalization — so the
|
||||
// user always receives the full answer, not just the most recent chunk.
|
||||
let streamedText = "";
|
||||
let lastStreamPayload: ReplyPayload | null = null;
|
||||
let usedReplyThreadTs: string | undefined;
|
||||
|
||||
const deliverNormally = async (payload: ReplyPayload, forcedThreadTs?: string): Promise<void> => {
|
||||
@ -244,13 +264,72 @@ export async function dispatchPreparedSlackMessage(prepared: PreparedSlackMessag
|
||||
replyPlan.markSent();
|
||||
};
|
||||
|
||||
const deliverWithStreaming = async (payload: ReplyPayload): Promise<void> => {
|
||||
if (streamFailed || hasMedia(payload) || !payload.text?.trim()) {
|
||||
await deliverNormally(payload, streamSession?.threadTs);
|
||||
const deleteOrphanedStreamMessage = async (streamMessageTs?: string): Promise<boolean> => {
|
||||
if (!streamMessageTs) {
|
||||
return true;
|
||||
}
|
||||
try {
|
||||
await ctx.app.client.chat.delete({
|
||||
token: ctx.botToken,
|
||||
channel: message.channel,
|
||||
ts: streamMessageTs,
|
||||
});
|
||||
logVerbose(`slack-stream: deleted orphaned stream message ${streamMessageTs}`);
|
||||
return true;
|
||||
} catch (deleteErr) {
|
||||
logVerbose(
|
||||
`slack-stream: failed to delete orphaned stream message ${streamMessageTs}: ${String(deleteErr)}`,
|
||||
);
|
||||
return false;
|
||||
}
|
||||
};
|
||||
|
||||
const replayAccumulatedStreamText = async (threadTs?: string): Promise<void> => {
|
||||
if (!lastStreamPayload || !streamedText) {
|
||||
return;
|
||||
}
|
||||
const fallback: ReplyPayload = Object.assign({}, lastStreamPayload, { text: streamedText });
|
||||
await deliverNormally(fallback, threadTs);
|
||||
};
|
||||
|
||||
const finalizeActiveStreamBeforePlainPayload = async (): Promise<string | undefined> => {
|
||||
if (!streamSession || streamFailed) {
|
||||
return streamSession?.threadTs;
|
||||
}
|
||||
const activeStream = streamSession;
|
||||
const threadTs = activeStream.threadTs;
|
||||
try {
|
||||
await stopSlackStream({ session: activeStream });
|
||||
} catch (err) {
|
||||
runtime.error?.(danger(`slack-stream: failed to stop stream: ${String(err)}`));
|
||||
if (await deleteOrphanedStreamMessage(activeStream.streamMessageTs)) {
|
||||
await replayAccumulatedStreamText(threadTs);
|
||||
}
|
||||
} finally {
|
||||
streamSession = null;
|
||||
streamedText = "";
|
||||
lastStreamPayload = null;
|
||||
}
|
||||
return threadTs;
|
||||
};
|
||||
|
||||
const deliverWithStreaming = async (payload: ReplyPayload): Promise<void> => {
|
||||
const text = payload.text?.trim() ?? "";
|
||||
const forcedThreadTs = shouldFinalizeSlackStreamBeforePlainPayload({
|
||||
hasActiveStream: Boolean(streamSession),
|
||||
payload,
|
||||
})
|
||||
? await finalizeActiveStreamBeforePlainPayload()
|
||||
: streamSession?.threadTs;
|
||||
if (streamFailed || hasMedia(payload) || !text) {
|
||||
await deliverNormally(payload, forcedThreadTs);
|
||||
return;
|
||||
}
|
||||
// Track the last payload for metadata (thread ts, media, etc.) and
|
||||
// accumulate its text so a mid-stream failure can re-deliver the complete
|
||||
// answer rather than only the failing chunk.
|
||||
lastStreamPayload = payload;
|
||||
|
||||
const text = payload.text.trim();
|
||||
let plannedThreadTs: string | undefined;
|
||||
try {
|
||||
if (!streamSession) {
|
||||
@ -273,6 +352,8 @@ export async function dispatchPreparedSlackMessage(prepared: PreparedSlackMessag
|
||||
teamId: ctx.teamId,
|
||||
userId: message.user,
|
||||
});
|
||||
// Record text that is now live in the Slack stream message.
|
||||
streamedText = text;
|
||||
usedReplyThreadTs ??= streamThreadTs;
|
||||
replyPlan.markSent();
|
||||
return;
|
||||
@ -282,12 +363,34 @@ export async function dispatchPreparedSlackMessage(prepared: PreparedSlackMessag
|
||||
session: streamSession,
|
||||
text: "\n" + text,
|
||||
});
|
||||
// Record text that was successfully appended to the stream message.
|
||||
streamedText += "\n" + text;
|
||||
} catch (err) {
|
||||
runtime.error?.(
|
||||
danger(`slack-stream: streaming API call failed: ${String(err)}, falling back`),
|
||||
);
|
||||
streamFailed = true;
|
||||
await deliverNormally(payload, streamSession?.threadTs ?? plannedThreadTs);
|
||||
|
||||
// Mark the stream as stopped so the end-of-dispatch finalizer does not
|
||||
// call stopSlackStream on the orphaned message (which would finalize it
|
||||
// and leave a duplicate visible on mobile Slack).
|
||||
if (streamSession) {
|
||||
streamSession.stopped = true;
|
||||
}
|
||||
|
||||
// If startStream already created a Slack message, delete it to prevent
|
||||
// the orphaned stream message from persisting alongside the fallback.
|
||||
const orphanDeleted = await deleteOrphanedStreamMessage(streamSession?.streamMessageTs);
|
||||
|
||||
// Re-deliver the full content: everything already in the stream message
|
||||
// plus the current payload that failed to append. Using only `payload`
|
||||
// here would drop all previously-streamed text.
|
||||
if (orphanDeleted) {
|
||||
await deliverNormally(
|
||||
{ ...payload, text: buildSlackStreamFallbackText(streamedText, text) },
|
||||
streamSession?.threadTs ?? plannedThreadTs,
|
||||
);
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
@ -456,15 +559,15 @@ export async function dispatchPreparedSlackMessage(prepared: PreparedSlackMessag
|
||||
draftStream.stop();
|
||||
markDispatchIdle();
|
||||
|
||||
// -----------------------------------------------------------------------
|
||||
// Finalize the stream if one was started
|
||||
// -----------------------------------------------------------------------
|
||||
const finalStream = streamSession as SlackStreamSession | null;
|
||||
if (finalStream && !finalStream.stopped) {
|
||||
try {
|
||||
await stopSlackStream({ session: finalStream });
|
||||
} catch (err) {
|
||||
runtime.error?.(danger(`slack-stream: failed to stop stream: ${String(err)}`));
|
||||
if (await deleteOrphanedStreamMessage(finalStream.streamMessageTs)) {
|
||||
await replayAccumulatedStreamText(finalStream.threadTs);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -28,6 +28,16 @@ export type SlackStreamSession = {
|
||||
threadTs: string;
|
||||
/** True once stop() has been called. */
|
||||
stopped: boolean;
|
||||
/**
|
||||
* The Slack message timestamp of the stream message. Populated from the
|
||||
* first non-null response returned by `streamer.append()` (which is the
|
||||
* `chat.startStream` response). May be undefined if all appends were
|
||||
* buffered and the stream was never flushed to Slack — in which case there
|
||||
* is no orphaned message to clean up.
|
||||
*
|
||||
* Use this instead of accessing `streamer.streamTs` (private in the SDK).
|
||||
*/
|
||||
streamMessageTs?: string;
|
||||
};
|
||||
|
||||
export type StartSlackStreamParams = {
|
||||
@ -98,8 +108,14 @@ export async function startSlackStream(
|
||||
|
||||
// If initial text is provided, send it as the first append which will
|
||||
// trigger the ChatStreamer to call chat.startStream under the hood.
|
||||
// The first non-null response is a ChatStartStreamResponse and carries the
|
||||
// stream message ts — capture it so callers never need to touch the private
|
||||
// streamer.streamTs field.
|
||||
if (text) {
|
||||
await streamer.append({ markdown_text: text });
|
||||
const response = await streamer.append({ markdown_text: text });
|
||||
if (response?.ts) {
|
||||
session.streamMessageTs = response.ts;
|
||||
}
|
||||
logVerbose(`slack-stream: appended initial text (${text.length} chars)`);
|
||||
}
|
||||
|
||||
@ -121,7 +137,15 @@ export async function appendSlackStream(params: AppendSlackStreamParams): Promis
|
||||
return;
|
||||
}
|
||||
|
||||
await session.streamer.append({ markdown_text: text });
|
||||
// Capture the stream message ts from the first non-null response (the
|
||||
// ChatStartStreamResponse returned when the SDK flushes and calls
|
||||
// chat.startStream). Subsequent appends return ChatAppendStreamResponse or
|
||||
// null (buffered). Once captured we stop checking.
|
||||
const response = await session.streamer.append({ markdown_text: text });
|
||||
if (!session.streamMessageTs && response?.ts) {
|
||||
session.streamMessageTs = response.ts;
|
||||
}
|
||||
|
||||
logVerbose(`slack-stream: appended ${text.length} chars`);
|
||||
}
|
||||
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user