From 8bd328165268a34f9185f3958ca79027d412f6f0 Mon Sep 17 00:00:00 2001 From: Nora Date: Mon, 9 Mar 2026 23:33:09 +0000 Subject: [PATCH] fix(slack-stream): clean up orphaned messages + track streamMessageTs on session MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Prevents ghost/duplicate messages on mobile Slack when streaming fails. ## Problem When a streaming API call fails mid-stream, the partially-created stream message (sent to Slack via chat.startStream) would persist alongside the fallback normal reply, causing a duplicate on mobile clients. Two issues also existed in the original cleanup approach: 1. streamTs is declared private on ChatStreamer in @slack/web-api — accessing it directly fails TypeScript strict-mode / pnpm check at compile time. 2. When stopSlackStream fails in the finalizer, the orphaned message was deleted but no fallback reply was sent — user gets silence. ## Fix ### src/slack/streaming.ts - Add streamMessageTs?: string to SlackStreamSession. Populated lazily from the first non-null response returned by streamer.append() — which is the ChatStartStreamResponse carrying the stream message ts. Never undefined if a message was actually sent to Slack; undefined means nothing to clean up. - Capture ts in startSlackStream (from the initial append response). - Also backfill in appendSlackStream in case the first append was buffered (text < SDK buffer_size of 256 chars → returns null). ### src/slack/monitor/message-handler/dispatch.ts - On streaming failure: mark stream stopped, delete orphaned message via streamMessageTs (not private streamer.streamTs), then fall back to normal delivery. - On finalizer stopSlackStream failure: delete orphaned message + call deliverNormally(lastStreamPayload) so the user gets a response. - Track lastStreamPayload in outer scope across deliverWithStreaming calls. --- src/slack/monitor/message-handler/dispatch.ts | 54 +++++++++++++++++++ src/slack/streaming.ts | 28 +++++++++- 2 files changed, 80 insertions(+), 2 deletions(-) diff --git a/src/slack/monitor/message-handler/dispatch.ts b/src/slack/monitor/message-handler/dispatch.ts index 029d110f0b9..336f0548c98 100644 --- a/src/slack/monitor/message-handler/dispatch.ts +++ b/src/slack/monitor/message-handler/dispatch.ts @@ -222,6 +222,7 @@ export async function dispatchPreparedSlackMessage(prepared: PreparedSlackMessag }); let streamSession: SlackStreamSession | null = null; let streamFailed = false; + let lastStreamPayload: ReplyPayload | null = null; let usedReplyThreadTs: string | undefined; const deliverNormally = async (payload: ReplyPayload, forcedThreadTs?: string): Promise => { @@ -249,6 +250,9 @@ export async function dispatchPreparedSlackMessage(prepared: PreparedSlackMessag await deliverNormally(payload, streamSession?.threadTs); return; } + // Track the last payload so the stream finalizer can fall back to normal + // delivery if stopSlackStream fails after all content has been streamed. + lastStreamPayload = payload; const text = payload.text.trim(); let plannedThreadTs: string | undefined; @@ -287,6 +291,32 @@ export async function dispatchPreparedSlackMessage(prepared: PreparedSlackMessag danger(`slack-stream: streaming API call failed: ${String(err)}, falling back`), ); streamFailed = true; + + // Mark the stream as stopped so the end-of-dispatch finalizer does not + // call stopSlackStream on the orphaned message (which would finalize it + // and leave a duplicate visible on mobile Slack). + if (streamSession) { + streamSession.stopped = true; + } + + // If startStream already created a Slack message, delete it to prevent + // the orphaned stream message from persisting alongside the fallback. + const orphanedTs = streamSession?.streamMessageTs; + if (orphanedTs) { + try { + await ctx.app.client.chat.delete({ + token: ctx.botToken, + channel: message.channel, + ts: orphanedTs, + }); + logVerbose(`slack-stream: deleted orphaned stream message ${orphanedTs}`); + } catch (deleteErr) { + logVerbose( + `slack-stream: failed to delete orphaned stream message ${orphanedTs}: ${String(deleteErr)}`, + ); + } + } + await deliverNormally(payload, streamSession?.threadTs ?? plannedThreadTs); } }; @@ -461,10 +491,34 @@ export async function dispatchPreparedSlackMessage(prepared: PreparedSlackMessag // ----------------------------------------------------------------------- const finalStream = streamSession as SlackStreamSession | null; if (finalStream && !finalStream.stopped) { + // Capture the stream message timestamp before stopping so we can clean up + // if stopSlackStream fails (prevents ghost/duplicate on mobile Slack). + const streamMsgTs = finalStream.streamMessageTs; try { await stopSlackStream({ session: finalStream }); } catch (err) { runtime.error?.(danger(`slack-stream: failed to stop stream: ${String(err)}`)); + // If stop failed and a stream message exists, try to delete it so it + // does not persist as a ghost message alongside any fallback delivery. + if (streamMsgTs) { + try { + await ctx.app.client.chat.delete({ + token: ctx.botToken, + channel: message.channel, + ts: streamMsgTs, + }); + logVerbose(`slack-stream: deleted orphaned stream message ${streamMsgTs} after stop failure`); + } catch (deleteErr) { + logVerbose( + `slack-stream: failed to delete orphaned stream message ${streamMsgTs}: ${String(deleteErr)}`, + ); + } + } + // Fall back to normal delivery so the user gets a response even when + // the stream could not be finalized. + if (lastStreamPayload) { + await deliverNormally(lastStreamPayload, finalStream.threadTs); + } } } diff --git a/src/slack/streaming.ts b/src/slack/streaming.ts index 936fba79feb..40f16cfd545 100644 --- a/src/slack/streaming.ts +++ b/src/slack/streaming.ts @@ -28,6 +28,16 @@ export type SlackStreamSession = { threadTs: string; /** True once stop() has been called. */ stopped: boolean; + /** + * The Slack message timestamp of the stream message. Populated from the + * first non-null response returned by `streamer.append()` (which is the + * `chat.startStream` response). May be undefined if all appends were + * buffered and the stream was never flushed to Slack — in which case there + * is no orphaned message to clean up. + * + * Use this instead of accessing `streamer.streamTs` (private in the SDK). + */ + streamMessageTs?: string; }; export type StartSlackStreamParams = { @@ -98,8 +108,14 @@ export async function startSlackStream( // If initial text is provided, send it as the first append which will // trigger the ChatStreamer to call chat.startStream under the hood. + // The first non-null response is a ChatStartStreamResponse and carries the + // stream message ts — capture it so callers never need to touch the private + // streamer.streamTs field. if (text) { - await streamer.append({ markdown_text: text }); + const response = await streamer.append({ markdown_text: text }); + if (response?.ts) { + session.streamMessageTs = response.ts; + } logVerbose(`slack-stream: appended initial text (${text.length} chars)`); } @@ -121,7 +137,15 @@ export async function appendSlackStream(params: AppendSlackStreamParams): Promis return; } - await session.streamer.append({ markdown_text: text }); + // Capture the stream message ts from the first non-null response (the + // ChatStartStreamResponse returned when the SDK flushes and calls + // chat.startStream). Subsequent appends return ChatAppendStreamResponse or + // null (buffered). Once captured we stop checking. + const response = await session.streamer.append({ markdown_text: text }); + if (!session.streamMessageTs && response?.ts) { + session.streamMessageTs = response.ts; + } + logVerbose(`slack-stream: appended ${text.length} chars`); }