fix(slack-stream): clean up orphaned messages + track streamMessageTs on session

Prevents ghost/duplicate messages on mobile Slack when streaming fails.

## Problem

When a streaming API call fails mid-stream, the partially-created stream
message (sent to Slack via chat.startStream) would persist alongside the
fallback normal reply, causing a duplicate on mobile clients.

Two issues also existed in the original cleanup approach:
1. streamTs is declared private on ChatStreamer in @slack/web-api — accessing
   it directly fails TypeScript strict-mode / pnpm check at compile time.
2. When stopSlackStream fails in the finalizer, the orphaned message was
   deleted but no fallback reply was sent — user gets silence.

## Fix

### src/slack/streaming.ts
- Add streamMessageTs?: string to SlackStreamSession. Populated lazily from
  the first non-null response returned by streamer.append() — which is the
  ChatStartStreamResponse carrying the stream message ts. Never undefined if
  a message was actually sent to Slack; undefined means nothing to clean up.
- Capture ts in startSlackStream (from the initial append response).
- Also backfill in appendSlackStream in case the first append was buffered
  (text < SDK buffer_size of 256 chars → returns null).

### src/slack/monitor/message-handler/dispatch.ts
- On streaming failure: mark stream stopped, delete orphaned message via
  streamMessageTs (not private streamer.streamTs), then fall back to normal
  delivery.
- On finalizer stopSlackStream failure: delete orphaned message + call
  deliverNormally(lastStreamPayload) so the user gets a response.
- Track lastStreamPayload in outer scope across deliverWithStreaming calls.
This commit is contained in:
Nora 2026-03-09 23:33:09 +00:00 committed by Vincent Koc
parent 9aeaa19e9e
commit 8bd3281652
2 changed files with 80 additions and 2 deletions

View File

@ -222,6 +222,7 @@ export async function dispatchPreparedSlackMessage(prepared: PreparedSlackMessag
});
let streamSession: SlackStreamSession | null = null;
let streamFailed = false;
let lastStreamPayload: ReplyPayload | null = null;
let usedReplyThreadTs: string | undefined;
const deliverNormally = async (payload: ReplyPayload, forcedThreadTs?: string): Promise<void> => {
@ -249,6 +250,9 @@ export async function dispatchPreparedSlackMessage(prepared: PreparedSlackMessag
await deliverNormally(payload, streamSession?.threadTs);
return;
}
// Track the last payload so the stream finalizer can fall back to normal
// delivery if stopSlackStream fails after all content has been streamed.
lastStreamPayload = payload;
const text = payload.text.trim();
let plannedThreadTs: string | undefined;
@ -287,6 +291,32 @@ export async function dispatchPreparedSlackMessage(prepared: PreparedSlackMessag
danger(`slack-stream: streaming API call failed: ${String(err)}, falling back`),
);
streamFailed = true;
// Mark the stream as stopped so the end-of-dispatch finalizer does not
// call stopSlackStream on the orphaned message (which would finalize it
// and leave a duplicate visible on mobile Slack).
if (streamSession) {
streamSession.stopped = true;
}
// If startStream already created a Slack message, delete it to prevent
// the orphaned stream message from persisting alongside the fallback.
const orphanedTs = streamSession?.streamMessageTs;
if (orphanedTs) {
try {
await ctx.app.client.chat.delete({
token: ctx.botToken,
channel: message.channel,
ts: orphanedTs,
});
logVerbose(`slack-stream: deleted orphaned stream message ${orphanedTs}`);
} catch (deleteErr) {
logVerbose(
`slack-stream: failed to delete orphaned stream message ${orphanedTs}: ${String(deleteErr)}`,
);
}
}
await deliverNormally(payload, streamSession?.threadTs ?? plannedThreadTs);
}
};
@ -461,10 +491,34 @@ export async function dispatchPreparedSlackMessage(prepared: PreparedSlackMessag
// -----------------------------------------------------------------------
const finalStream = streamSession as SlackStreamSession | null;
if (finalStream && !finalStream.stopped) {
// Capture the stream message timestamp before stopping so we can clean up
// if stopSlackStream fails (prevents ghost/duplicate on mobile Slack).
const streamMsgTs = finalStream.streamMessageTs;
try {
await stopSlackStream({ session: finalStream });
} catch (err) {
runtime.error?.(danger(`slack-stream: failed to stop stream: ${String(err)}`));
// If stop failed and a stream message exists, try to delete it so it
// does not persist as a ghost message alongside any fallback delivery.
if (streamMsgTs) {
try {
await ctx.app.client.chat.delete({
token: ctx.botToken,
channel: message.channel,
ts: streamMsgTs,
});
logVerbose(`slack-stream: deleted orphaned stream message ${streamMsgTs} after stop failure`);
} catch (deleteErr) {
logVerbose(
`slack-stream: failed to delete orphaned stream message ${streamMsgTs}: ${String(deleteErr)}`,
);
}
}
// Fall back to normal delivery so the user gets a response even when
// the stream could not be finalized.
if (lastStreamPayload) {
await deliverNormally(lastStreamPayload, finalStream.threadTs);
}
}
}

View File

@ -28,6 +28,16 @@ export type SlackStreamSession = {
threadTs: string;
/** True once stop() has been called. */
stopped: boolean;
/**
* The Slack message timestamp of the stream message. Populated from the
* first non-null response returned by `streamer.append()` (which is the
* `chat.startStream` response). May be undefined if all appends were
* buffered and the stream was never flushed to Slack in which case there
* is no orphaned message to clean up.
*
* Use this instead of accessing `streamer.streamTs` (private in the SDK).
*/
streamMessageTs?: string;
};
export type StartSlackStreamParams = {
@ -98,8 +108,14 @@ export async function startSlackStream(
// If initial text is provided, send it as the first append which will
// trigger the ChatStreamer to call chat.startStream under the hood.
// The first non-null response is a ChatStartStreamResponse and carries the
// stream message ts — capture it so callers never need to touch the private
// streamer.streamTs field.
if (text) {
await streamer.append({ markdown_text: text });
const response = await streamer.append({ markdown_text: text });
if (response?.ts) {
session.streamMessageTs = response.ts;
}
logVerbose(`slack-stream: appended initial text (${text.length} chars)`);
}
@ -121,7 +137,15 @@ export async function appendSlackStream(params: AppendSlackStreamParams): Promis
return;
}
await session.streamer.append({ markdown_text: text });
// Capture the stream message ts from the first non-null response (the
// ChatStartStreamResponse returned when the SDK flushes and calls
// chat.startStream). Subsequent appends return ChatAppendStreamResponse or
// null (buffered). Once captured we stop checking.
const response = await session.streamer.append({ markdown_text: text });
if (!session.streamMessageTs && response?.ts) {
session.streamMessageTs = response.ts;
}
logVerbose(`slack-stream: appended ${text.length} chars`);
}