diff --git a/src/slack/monitor/message-handler/dispatch.ts b/src/slack/monitor/message-handler/dispatch.ts index 09121002a41..1f8015c4bb0 100644 --- a/src/slack/monitor/message-handler/dispatch.ts +++ b/src/slack/monitor/message-handler/dispatch.ts @@ -31,6 +31,20 @@ function hasMedia(payload: ReplyPayload): boolean { return Boolean(payload.mediaUrl) || (payload.mediaUrls?.length ?? 0) > 0; } +export function buildSlackStreamFallbackText(streamedText: string, nextText: string): string { + return streamedText ? `${streamedText}\n${nextText}` : nextText; +} + +export function shouldFinalizeSlackStreamBeforePlainPayload(params: { + hasActiveStream: boolean; + payload: ReplyPayload; +}): boolean { + if (!params.hasActiveStream) { + return false; + } + return hasMedia(params.payload) || !params.payload.text?.trim(); +} + export function isSlackStreamingEnabled(params: { mode: "off" | "partial" | "block" | "progress"; nativeStreaming: boolean; @@ -250,9 +264,65 @@ export async function dispatchPreparedSlackMessage(prepared: PreparedSlackMessag replyPlan.markSent(); }; + const deleteOrphanedStreamMessage = async (streamMessageTs?: string): Promise => { + if (!streamMessageTs) { + return true; + } + try { + await ctx.app.client.chat.delete({ + token: ctx.botToken, + channel: message.channel, + ts: streamMessageTs, + }); + logVerbose(`slack-stream: deleted orphaned stream message ${streamMessageTs}`); + return true; + } catch (deleteErr) { + logVerbose( + `slack-stream: failed to delete orphaned stream message ${streamMessageTs}: ${String(deleteErr)}`, + ); + return false; + } + }; + + const replayAccumulatedStreamText = async (threadTs?: string): Promise => { + if (!lastStreamPayload || !streamedText) { + return; + } + const fallback: ReplyPayload = Object.assign({}, lastStreamPayload, { text: streamedText }); + await deliverNormally(fallback, threadTs); + }; + + const finalizeActiveStreamBeforePlainPayload = async (): Promise => { + if (!streamSession || streamFailed) { + return streamSession?.threadTs; + } + const activeStream = streamSession; + const threadTs = activeStream.threadTs; + try { + await stopSlackStream({ session: activeStream }); + } catch (err) { + runtime.error?.(danger(`slack-stream: failed to stop stream: ${String(err)}`)); + if (await deleteOrphanedStreamMessage(activeStream.streamMessageTs)) { + await replayAccumulatedStreamText(threadTs); + } + } finally { + streamSession = null; + streamedText = ""; + lastStreamPayload = null; + } + return threadTs; + }; + const deliverWithStreaming = async (payload: ReplyPayload): Promise => { - if (streamFailed || hasMedia(payload) || !payload.text?.trim()) { - await deliverNormally(payload, streamSession?.threadTs); + const text = payload.text?.trim() ?? ""; + const forcedThreadTs = shouldFinalizeSlackStreamBeforePlainPayload({ + hasActiveStream: Boolean(streamSession), + payload, + }) + ? await finalizeActiveStreamBeforePlainPayload() + : streamSession?.threadTs; + if (streamFailed || hasMedia(payload) || !text) { + await deliverNormally(payload, forcedThreadTs); return; } // Track the last payload for metadata (thread ts, media, etc.) and @@ -260,7 +330,6 @@ export async function dispatchPreparedSlackMessage(prepared: PreparedSlackMessag // answer rather than only the failing chunk. lastStreamPayload = payload; - const text = payload.text.trim(); let plannedThreadTs: string | undefined; try { if (!streamSession) { @@ -311,37 +380,17 @@ export async function dispatchPreparedSlackMessage(prepared: PreparedSlackMessag // If startStream already created a Slack message, delete it to prevent // the orphaned stream message from persisting alongside the fallback. - const orphanedTs = streamSession?.streamMessageTs; - if (orphanedTs) { - try { - await ctx.app.client.chat.delete({ - token: ctx.botToken, - channel: message.channel, - ts: orphanedTs, - }); - logVerbose(`slack-stream: deleted orphaned stream message ${orphanedTs}`); - } catch (deleteErr) { - logVerbose( - `slack-stream: failed to delete orphaned stream message ${orphanedTs}: ${String(deleteErr)}`, - ); - } - } + const orphanDeleted = await deleteOrphanedStreamMessage(streamSession?.streamMessageTs); // Re-deliver the full content: everything already in the stream message // plus the current payload that failed to append. Using only `payload` // here would drop all previously-streamed text. - // - // Note: we deliver even when orphan deletion failed. The stream message - // is stuck in "streaming" state (never finalized via chat.stopStream) - // and may not render on mobile Slack — skipping deliverNormally here - // would silently drop content with no later recovery path (the - // finalizer is skipped because streamSession.stopped is already true). - // A cosmetic duplicate on desktop is preferable to a truncated answer. - const fallbackText = streamedText ? `${streamedText}\n${text}` : text; - await deliverNormally( - { ...payload, text: fallbackText }, - streamSession?.threadTs ?? plannedThreadTs, - ); + if (orphanDeleted) { + await deliverNormally( + { ...payload, text: buildSlackStreamFallbackText(streamedText, text) }, + streamSession?.threadTs ?? plannedThreadTs, + ); + } } }; @@ -510,47 +559,14 @@ export async function dispatchPreparedSlackMessage(prepared: PreparedSlackMessag draftStream.stop(); markDispatchIdle(); - // ----------------------------------------------------------------------- - // Finalize the stream if one was started - // ----------------------------------------------------------------------- const finalStream = streamSession as SlackStreamSession | null; if (finalStream && !finalStream.stopped) { - // Capture the stream message timestamp before stopping so we can clean up - // if stopSlackStream fails (prevents ghost/duplicate on mobile Slack). - const streamMsgTs = finalStream.streamMessageTs; try { await stopSlackStream({ session: finalStream }); } catch (err) { runtime.error?.(danger(`slack-stream: failed to stop stream: ${String(err)}`)); - // If stop failed and a stream message exists, try to delete it so it - // does not persist as a ghost message alongside any fallback delivery. - // Only deliver the fallback when deletion succeeds (or no orphan - // existed) — if deletion fails the stream message is still visible with - // its full content, so sending a normal reply would recreate the - // duplicate this PR prevents. - let orphanDeleted = !streamMsgTs; // trivially "deleted" if nothing to delete - if (streamMsgTs) { - try { - await ctx.app.client.chat.delete({ - token: ctx.botToken, - channel: message.channel, - ts: streamMsgTs, - }); - logVerbose( - `slack-stream: deleted orphaned stream message ${streamMsgTs} after stop failure`, - ); - orphanDeleted = true; - } catch (deleteErr) { - logVerbose( - `slack-stream: failed to delete orphaned stream message ${streamMsgTs}: ${String(deleteErr)}`, - ); - } - } - // Fall back to normal delivery with the full accumulated streamed text - // so the user receives the complete answer even when stop() fails. - if (orphanDeleted && lastStreamPayload && streamedText) { - const fallback: ReplyPayload = Object.assign({}, lastStreamPayload, { text: streamedText }); - await deliverNormally(fallback, finalStream.threadTs); + if (await deleteOrphanedStreamMessage(finalStream.streamMessageTs)) { + await replayAccumulatedStreamText(finalStream.threadTs); } } }