Compare commits

...

14 Commits

Author SHA1 Message Date
Vincent Koc
b0c77a4f67 Changelog: add Slack streaming fallback note 2026-03-12 04:00:52 -04:00
Vincent Koc
7bc216db03 Tests: cover Slack streaming fallback helpers 2026-03-12 04:00:12 -04:00
Vincent Koc
9c8b283f77 Slack streaming: serialize replay cleanup decisions 2026-03-12 03:59:40 -04:00
Raul
bef0b8c8bb Fix unused orphanDeleted variable in mid-stream catch block
The variable was assigned but never read since fallback delivery is
intentionally unconditional in this path. Fixes lint error.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-12 03:56:28 -04:00
Nora
f2e0997ede fix(slack-stream): use Object.assign instead of spread for nullable payload
tsc strict mode refuses to narrow mutable let variables through spreads even
with const binding, !== null guards, and non-null assertions. Object.assign
avoids the TS2698 issue entirely since it accepts any source arguments.
2026-03-12 03:56:28 -04:00
Nora
3c5f308487 fix(slack-stream): capture lastStreamPayload into const before null-check + spread
Both tsc and tsgo fail to narrow a mutable let variable through a spread
expression regardless of guards or non-null assertions. The fix: assign the
outer let to a block-scoped const, then check the const !== null in a
separate if. TypeScript always narrows a const through a distinct null check
so the spread is accepted by all compiler variants.
2026-03-12 03:56:28 -04:00
Nora
a298fa22f6 fix(slack-stream): non-null assertion for lastStreamPayload spread
tsc does not narrow mutable let variables through spread type expressions
even when an explicit !== null guard precedes the spread. The assertion is
safe — lastStreamPayload !== null is checked in the same condition.
2026-03-12 03:56:28 -04:00
Nora
7b9fc01fba fix(slack-stream): use !== null narrowing for lastStreamPayload spread
tsgo (native TypeScript compiler) does not narrow a let variable through a
truthy check in a compound && condition when it appears in a spread
expression. Replace the intermediate const + truthy check with an explicit
!== null guard which tsgo reliably narrows.
2026-03-12 03:56:28 -04:00
Nora
d1653b7750 style(slack-stream): apply oxfmt formatting to dispatch.ts
One log line exceeded the print width and one deliverNormally call was
split across lines where oxfmt prefers a single line.
2026-03-12 03:56:28 -04:00
Nora
d8e1aff7ee fix(slack-stream): always deliver fallback on streaming failure regardless of orphan deletion
When a streaming append/start call fails and chat.delete also fails, the
stream message is left in 'streaming' state — never finalized via
chat.stopStream, which may render as invisible or broken on mobile Slack.
streamSession.stopped is already set to true so the end-of-dispatch
finalizer also skips the stream, leaving the payload with no recovery path.

Remove the orphanDeleted guard from the deliverWithStreaming catch block:
always call deliverNormally here even if deletion failed, to ensure the user
receives the complete answer. A cosmetic duplicate on desktop clients is
preferable to a silently truncated answer.

The guard is intentionally kept in the finalizer catch: there the stream
message has already been fully finalized (all content visible), so skipping
deliverNormally on deletion failure avoids a true content duplicate.
2026-03-12 03:56:28 -04:00
Nora
c9fbb445a7 fix(slack-stream): bind lastStreamPayload to local const before spread
TypeScript cannot narrow a mutable let variable through a null-check guard
when it is used in a spread expression (TS2698: Spread types may only be
created from object types). Binding to a local const lets the compiler
narrow the type correctly.
2026-03-12 03:56:28 -04:00
Nora
0eb89b16e1 fix(slack-stream): guard fallback delivery behind orphan-deletion success
If chat.delete throws after a stream failure, deliverNormally was called
unconditionally — leaving both the unfinalizable stream message and the
fallback reply visible, recreating the exact duplicate this PR prevents.

Fix: introduce orphanDeleted flag in both failure paths (deliverWithStreaming
catch and the finalizer catch). deliverNormally is now only called when:
  - there was no orphaned stream message to begin with (streamMessageTs
    undefined = stream never flushed to Slack), OR
  - chat.delete succeeded

If deletion fails, the stream message is still visible with its full content,
so skipping the fallback is the correct behaviour — the user sees the content
without a duplicate.
2026-03-12 03:56:28 -04:00
Nora
85018c4b56 fix(slack-stream): re-deliver full accumulated text on mid-stream failure
When appendSlackStream throws for a later payload, the fallback was calling
deliverNormally(payload, ...) with only the current chunk — dropping all text
from earlier payloads that was already live in the stream message.

dispatchReplyFromConfig can emit multiple final payloads per turn (it
iterates the replies array), so a mid-stream Slack API error could silently
truncate the visible answer.

Fix: accumulate all successfully-streamed text in streamedText (updated after
each successful startSlackStream / appendSlackStream). On failure, re-deliver
{ ...payload, text: streamedText + current chunk } so the user always gets
the complete content. The finalizer fallback (stopSlackStream failure) also
uses streamedText for the same reason.
2026-03-12 03:56:28 -04:00
Nora
8bd3281652 fix(slack-stream): clean up orphaned messages + track streamMessageTs on session
Prevents ghost/duplicate messages on mobile Slack when streaming fails.

## Problem

When a streaming API call fails mid-stream, the partially-created stream
message (sent to Slack via chat.startStream) would persist alongside the
fallback normal reply, causing a duplicate on mobile clients.

Two issues also existed in the original cleanup approach:
1. streamTs is declared private on ChatStreamer in @slack/web-api — accessing
   it directly fails TypeScript strict-mode / pnpm check at compile time.
2. When stopSlackStream fails in the finalizer, the orphaned message was
   deleted but no fallback reply was sent — user gets silence.

## Fix

### src/slack/streaming.ts
- Add streamMessageTs?: string to SlackStreamSession. Populated lazily from
  the first non-null response returned by streamer.append() — which is the
  ChatStartStreamResponse carrying the stream message ts. Never undefined if
  a message was actually sent to Slack; undefined means nothing to clean up.
- Capture ts in startSlackStream (from the initial append response).
- Also backfill in appendSlackStream in case the first append was buffered
  (text < SDK buffer_size of 256 chars → returns null).

### src/slack/monitor/message-handler/dispatch.ts
- On streaming failure: mark stream stopped, delete orphaned message via
  streamMessageTs (not private streamer.streamTs), then fall back to normal
  delivery.
- On finalizer stopSlackStream failure: delete orphaned message + call
  deliverNormally(lastStreamPayload) so the user gets a response.
- Track lastStreamPayload in outer scope across deliverWithStreaming calls.
2026-03-12 03:56:28 -04:00
4 changed files with 188 additions and 11 deletions

View File

@ -24,6 +24,7 @@ Docs: https://docs.openclaw.ai
- iMessage/self-chat echo dedupe: drop reflected duplicate copies only when a matching `is_from_me` event was just seen for the same chat, text, and `created_at`, preventing self-chat loops without broad text-only suppression. Related to #32166. (#38440) Thanks @vincentkoc.
- Mattermost/block streaming: fix duplicate message delivery (one threaded, one top-level) when block streaming is active by excluding `replyToId` from the block reply dedup key and adding an explicit `threading` dock to the Mattermost plugin. (#41362) Thanks @mathiasnagler and @vincentkoc.
- BlueBubbles/self-chat echo dedupe: drop reflected duplicate webhook copies only when a matching `fromMe` event was just seen for the same chat, body, and timestamp, preventing self-chat loops without broad webhook suppression. Related to #32166. (#38442) Thanks @vincentkoc.
- Slack/native streaming fallback cleanup: replay full streamed text only after orphan cleanup succeeds, finalize active streams before plain/media payloads to preserve reply order, and keep ghost stream messages from duplicating fallback replies on mobile Slack. (#41525) Thanks @raulvidis.
- Models/Kimi Coding: send `anthropic-messages` tools in native Anthropic format again so `kimi-coding` stops degrading tool calls into XML/plain-text pseudo invocations instead of real `tool_use` blocks. (#38669, #39907, #40552) Thanks @opriz.
## 2026.3.11

View File

@ -1,5 +1,11 @@
import { describe, expect, it } from "vitest";
import { isSlackStreamingEnabled, resolveSlackStreamingThreadHint } from "./dispatch.js";
import type { ReplyPayload } from "../../../auto-reply/types.js";
import {
buildSlackStreamFallbackText,
isSlackStreamingEnabled,
resolveSlackStreamingThreadHint,
shouldFinalizeSlackStreamBeforePlainPayload,
} from "./dispatch.js";
describe("slack native streaming defaults", () => {
it("is enabled for partial mode when native streaming is on", () => {
@ -45,3 +51,46 @@ describe("slack native streaming thread hint", () => {
).toBe("2000.1");
});
});
describe("slack native streaming fallback helpers", () => {
it("replays accumulated streamed text before the failing chunk", () => {
expect(buildSlackStreamFallbackText("First chunk", "Second chunk")).toBe(
"First chunk\nSecond chunk",
);
expect(buildSlackStreamFallbackText("", "Only chunk")).toBe("Only chunk");
});
it("finalizes an active stream before sending plain payloads", () => {
const mediaPayload: ReplyPayload = {
text: "Image caption",
mediaUrl: "file:///tmp/example.png",
};
const emptyTextPayload: ReplyPayload = { text: " " };
const normalTextPayload: ReplyPayload = { text: "Continue streaming" };
expect(
shouldFinalizeSlackStreamBeforePlainPayload({
hasActiveStream: true,
payload: mediaPayload,
}),
).toBe(true);
expect(
shouldFinalizeSlackStreamBeforePlainPayload({
hasActiveStream: true,
payload: emptyTextPayload,
}),
).toBe(true);
expect(
shouldFinalizeSlackStreamBeforePlainPayload({
hasActiveStream: true,
payload: normalTextPayload,
}),
).toBe(false);
expect(
shouldFinalizeSlackStreamBeforePlainPayload({
hasActiveStream: false,
payload: mediaPayload,
}),
).toBe(false);
});
});

View File

@ -31,6 +31,20 @@ function hasMedia(payload: ReplyPayload): boolean {
return Boolean(payload.mediaUrl) || (payload.mediaUrls?.length ?? 0) > 0;
}
export function buildSlackStreamFallbackText(streamedText: string, nextText: string): string {
return streamedText ? `${streamedText}\n${nextText}` : nextText;
}
export function shouldFinalizeSlackStreamBeforePlainPayload(params: {
hasActiveStream: boolean;
payload: ReplyPayload;
}): boolean {
if (!params.hasActiveStream) {
return false;
}
return hasMedia(params.payload) || !params.payload.text?.trim();
}
export function isSlackStreamingEnabled(params: {
mode: "off" | "partial" | "block" | "progress";
nativeStreaming: boolean;
@ -222,6 +236,12 @@ export async function dispatchPreparedSlackMessage(prepared: PreparedSlackMessag
});
let streamSession: SlackStreamSession | null = null;
let streamFailed = false;
// Accumulates all text that has been successfully flushed into the Slack
// stream message. Used to reconstruct a complete fallback reply if the
// stream fails mid-turn or stopSlackStream fails at finalization — so the
// user always receives the full answer, not just the most recent chunk.
let streamedText = "";
let lastStreamPayload: ReplyPayload | null = null;
let usedReplyThreadTs: string | undefined;
const deliverNormally = async (payload: ReplyPayload, forcedThreadTs?: string): Promise<void> => {
@ -244,13 +264,72 @@ export async function dispatchPreparedSlackMessage(prepared: PreparedSlackMessag
replyPlan.markSent();
};
const deliverWithStreaming = async (payload: ReplyPayload): Promise<void> => {
if (streamFailed || hasMedia(payload) || !payload.text?.trim()) {
await deliverNormally(payload, streamSession?.threadTs);
const deleteOrphanedStreamMessage = async (streamMessageTs?: string): Promise<boolean> => {
if (!streamMessageTs) {
return true;
}
try {
await ctx.app.client.chat.delete({
token: ctx.botToken,
channel: message.channel,
ts: streamMessageTs,
});
logVerbose(`slack-stream: deleted orphaned stream message ${streamMessageTs}`);
return true;
} catch (deleteErr) {
logVerbose(
`slack-stream: failed to delete orphaned stream message ${streamMessageTs}: ${String(deleteErr)}`,
);
return false;
}
};
const replayAccumulatedStreamText = async (threadTs?: string): Promise<void> => {
if (!lastStreamPayload || !streamedText) {
return;
}
const fallback: ReplyPayload = Object.assign({}, lastStreamPayload, { text: streamedText });
await deliverNormally(fallback, threadTs);
};
const finalizeActiveStreamBeforePlainPayload = async (): Promise<string | undefined> => {
if (!streamSession || streamFailed) {
return streamSession?.threadTs;
}
const activeStream = streamSession;
const threadTs = activeStream.threadTs;
try {
await stopSlackStream({ session: activeStream });
} catch (err) {
runtime.error?.(danger(`slack-stream: failed to stop stream: ${String(err)}`));
if (await deleteOrphanedStreamMessage(activeStream.streamMessageTs)) {
await replayAccumulatedStreamText(threadTs);
}
} finally {
streamSession = null;
streamedText = "";
lastStreamPayload = null;
}
return threadTs;
};
const deliverWithStreaming = async (payload: ReplyPayload): Promise<void> => {
const text = payload.text?.trim() ?? "";
const forcedThreadTs = shouldFinalizeSlackStreamBeforePlainPayload({
hasActiveStream: Boolean(streamSession),
payload,
})
? await finalizeActiveStreamBeforePlainPayload()
: streamSession?.threadTs;
if (streamFailed || hasMedia(payload) || !text) {
await deliverNormally(payload, forcedThreadTs);
return;
}
// Track the last payload for metadata (thread ts, media, etc.) and
// accumulate its text so a mid-stream failure can re-deliver the complete
// answer rather than only the failing chunk.
lastStreamPayload = payload;
const text = payload.text.trim();
let plannedThreadTs: string | undefined;
try {
if (!streamSession) {
@ -273,6 +352,8 @@ export async function dispatchPreparedSlackMessage(prepared: PreparedSlackMessag
teamId: ctx.teamId,
userId: message.user,
});
// Record text that is now live in the Slack stream message.
streamedText = text;
usedReplyThreadTs ??= streamThreadTs;
replyPlan.markSent();
return;
@ -282,12 +363,34 @@ export async function dispatchPreparedSlackMessage(prepared: PreparedSlackMessag
session: streamSession,
text: "\n" + text,
});
// Record text that was successfully appended to the stream message.
streamedText += "\n" + text;
} catch (err) {
runtime.error?.(
danger(`slack-stream: streaming API call failed: ${String(err)}, falling back`),
);
streamFailed = true;
await deliverNormally(payload, streamSession?.threadTs ?? plannedThreadTs);
// Mark the stream as stopped so the end-of-dispatch finalizer does not
// call stopSlackStream on the orphaned message (which would finalize it
// and leave a duplicate visible on mobile Slack).
if (streamSession) {
streamSession.stopped = true;
}
// If startStream already created a Slack message, delete it to prevent
// the orphaned stream message from persisting alongside the fallback.
const orphanDeleted = await deleteOrphanedStreamMessage(streamSession?.streamMessageTs);
// Re-deliver the full content: everything already in the stream message
// plus the current payload that failed to append. Using only `payload`
// here would drop all previously-streamed text.
if (orphanDeleted) {
await deliverNormally(
{ ...payload, text: buildSlackStreamFallbackText(streamedText, text) },
streamSession?.threadTs ?? plannedThreadTs,
);
}
}
};
@ -456,15 +559,15 @@ export async function dispatchPreparedSlackMessage(prepared: PreparedSlackMessag
draftStream.stop();
markDispatchIdle();
// -----------------------------------------------------------------------
// Finalize the stream if one was started
// -----------------------------------------------------------------------
const finalStream = streamSession as SlackStreamSession | null;
if (finalStream && !finalStream.stopped) {
try {
await stopSlackStream({ session: finalStream });
} catch (err) {
runtime.error?.(danger(`slack-stream: failed to stop stream: ${String(err)}`));
if (await deleteOrphanedStreamMessage(finalStream.streamMessageTs)) {
await replayAccumulatedStreamText(finalStream.threadTs);
}
}
}

View File

@ -28,6 +28,16 @@ export type SlackStreamSession = {
threadTs: string;
/** True once stop() has been called. */
stopped: boolean;
/**
* The Slack message timestamp of the stream message. Populated from the
* first non-null response returned by `streamer.append()` (which is the
* `chat.startStream` response). May be undefined if all appends were
* buffered and the stream was never flushed to Slack in which case there
* is no orphaned message to clean up.
*
* Use this instead of accessing `streamer.streamTs` (private in the SDK).
*/
streamMessageTs?: string;
};
export type StartSlackStreamParams = {
@ -98,8 +108,14 @@ export async function startSlackStream(
// If initial text is provided, send it as the first append which will
// trigger the ChatStreamer to call chat.startStream under the hood.
// The first non-null response is a ChatStartStreamResponse and carries the
// stream message ts — capture it so callers never need to touch the private
// streamer.streamTs field.
if (text) {
await streamer.append({ markdown_text: text });
const response = await streamer.append({ markdown_text: text });
if (response?.ts) {
session.streamMessageTs = response.ts;
}
logVerbose(`slack-stream: appended initial text (${text.length} chars)`);
}
@ -121,7 +137,15 @@ export async function appendSlackStream(params: AppendSlackStreamParams): Promis
return;
}
await session.streamer.append({ markdown_text: text });
// Capture the stream message ts from the first non-null response (the
// ChatStartStreamResponse returned when the SDK flushes and calls
// chat.startStream). Subsequent appends return ChatAppendStreamResponse or
// null (buffered). Once captured we stop checking.
const response = await session.streamer.append({ markdown_text: text });
if (!session.streamMessageTs && response?.ts) {
session.streamMessageTs = response.ts;
}
logVerbose(`slack-stream: appended ${text.length} chars`);
}