Slack streaming: serialize replay cleanup decisions

This commit is contained in:
Vincent Koc 2026-03-12 03:59:40 -04:00
parent bef0b8c8bb
commit 9c8b283f77

View File

@ -31,6 +31,20 @@ function hasMedia(payload: ReplyPayload): boolean {
return Boolean(payload.mediaUrl) || (payload.mediaUrls?.length ?? 0) > 0;
}
export function buildSlackStreamFallbackText(streamedText: string, nextText: string): string {
return streamedText ? `${streamedText}\n${nextText}` : nextText;
}
export function shouldFinalizeSlackStreamBeforePlainPayload(params: {
hasActiveStream: boolean;
payload: ReplyPayload;
}): boolean {
if (!params.hasActiveStream) {
return false;
}
return hasMedia(params.payload) || !params.payload.text?.trim();
}
export function isSlackStreamingEnabled(params: {
mode: "off" | "partial" | "block" | "progress";
nativeStreaming: boolean;
@ -250,9 +264,65 @@ export async function dispatchPreparedSlackMessage(prepared: PreparedSlackMessag
replyPlan.markSent();
};
const deleteOrphanedStreamMessage = async (streamMessageTs?: string): Promise<boolean> => {
if (!streamMessageTs) {
return true;
}
try {
await ctx.app.client.chat.delete({
token: ctx.botToken,
channel: message.channel,
ts: streamMessageTs,
});
logVerbose(`slack-stream: deleted orphaned stream message ${streamMessageTs}`);
return true;
} catch (deleteErr) {
logVerbose(
`slack-stream: failed to delete orphaned stream message ${streamMessageTs}: ${String(deleteErr)}`,
);
return false;
}
};
const replayAccumulatedStreamText = async (threadTs?: string): Promise<void> => {
if (!lastStreamPayload || !streamedText) {
return;
}
const fallback: ReplyPayload = Object.assign({}, lastStreamPayload, { text: streamedText });
await deliverNormally(fallback, threadTs);
};
const finalizeActiveStreamBeforePlainPayload = async (): Promise<string | undefined> => {
if (!streamSession || streamFailed) {
return streamSession?.threadTs;
}
const activeStream = streamSession;
const threadTs = activeStream.threadTs;
try {
await stopSlackStream({ session: activeStream });
} catch (err) {
runtime.error?.(danger(`slack-stream: failed to stop stream: ${String(err)}`));
if (await deleteOrphanedStreamMessage(activeStream.streamMessageTs)) {
await replayAccumulatedStreamText(threadTs);
}
} finally {
streamSession = null;
streamedText = "";
lastStreamPayload = null;
}
return threadTs;
};
const deliverWithStreaming = async (payload: ReplyPayload): Promise<void> => {
if (streamFailed || hasMedia(payload) || !payload.text?.trim()) {
await deliverNormally(payload, streamSession?.threadTs);
const text = payload.text?.trim() ?? "";
const forcedThreadTs = shouldFinalizeSlackStreamBeforePlainPayload({
hasActiveStream: Boolean(streamSession),
payload,
})
? await finalizeActiveStreamBeforePlainPayload()
: streamSession?.threadTs;
if (streamFailed || hasMedia(payload) || !text) {
await deliverNormally(payload, forcedThreadTs);
return;
}
// Track the last payload for metadata (thread ts, media, etc.) and
@ -260,7 +330,6 @@ export async function dispatchPreparedSlackMessage(prepared: PreparedSlackMessag
// answer rather than only the failing chunk.
lastStreamPayload = payload;
const text = payload.text.trim();
let plannedThreadTs: string | undefined;
try {
if (!streamSession) {
@ -311,37 +380,17 @@ export async function dispatchPreparedSlackMessage(prepared: PreparedSlackMessag
// If startStream already created a Slack message, delete it to prevent
// the orphaned stream message from persisting alongside the fallback.
const orphanedTs = streamSession?.streamMessageTs;
if (orphanedTs) {
try {
await ctx.app.client.chat.delete({
token: ctx.botToken,
channel: message.channel,
ts: orphanedTs,
});
logVerbose(`slack-stream: deleted orphaned stream message ${orphanedTs}`);
} catch (deleteErr) {
logVerbose(
`slack-stream: failed to delete orphaned stream message ${orphanedTs}: ${String(deleteErr)}`,
);
}
}
const orphanDeleted = await deleteOrphanedStreamMessage(streamSession?.streamMessageTs);
// Re-deliver the full content: everything already in the stream message
// plus the current payload that failed to append. Using only `payload`
// here would drop all previously-streamed text.
//
// Note: we deliver even when orphan deletion failed. The stream message
// is stuck in "streaming" state (never finalized via chat.stopStream)
// and may not render on mobile Slack — skipping deliverNormally here
// would silently drop content with no later recovery path (the
// finalizer is skipped because streamSession.stopped is already true).
// A cosmetic duplicate on desktop is preferable to a truncated answer.
const fallbackText = streamedText ? `${streamedText}\n${text}` : text;
await deliverNormally(
{ ...payload, text: fallbackText },
streamSession?.threadTs ?? plannedThreadTs,
);
if (orphanDeleted) {
await deliverNormally(
{ ...payload, text: buildSlackStreamFallbackText(streamedText, text) },
streamSession?.threadTs ?? plannedThreadTs,
);
}
}
};
@ -510,47 +559,14 @@ export async function dispatchPreparedSlackMessage(prepared: PreparedSlackMessag
draftStream.stop();
markDispatchIdle();
// -----------------------------------------------------------------------
// Finalize the stream if one was started
// -----------------------------------------------------------------------
const finalStream = streamSession as SlackStreamSession | null;
if (finalStream && !finalStream.stopped) {
// Capture the stream message timestamp before stopping so we can clean up
// if stopSlackStream fails (prevents ghost/duplicate on mobile Slack).
const streamMsgTs = finalStream.streamMessageTs;
try {
await stopSlackStream({ session: finalStream });
} catch (err) {
runtime.error?.(danger(`slack-stream: failed to stop stream: ${String(err)}`));
// If stop failed and a stream message exists, try to delete it so it
// does not persist as a ghost message alongside any fallback delivery.
// Only deliver the fallback when deletion succeeds (or no orphan
// existed) — if deletion fails the stream message is still visible with
// its full content, so sending a normal reply would recreate the
// duplicate this PR prevents.
let orphanDeleted = !streamMsgTs; // trivially "deleted" if nothing to delete
if (streamMsgTs) {
try {
await ctx.app.client.chat.delete({
token: ctx.botToken,
channel: message.channel,
ts: streamMsgTs,
});
logVerbose(
`slack-stream: deleted orphaned stream message ${streamMsgTs} after stop failure`,
);
orphanDeleted = true;
} catch (deleteErr) {
logVerbose(
`slack-stream: failed to delete orphaned stream message ${streamMsgTs}: ${String(deleteErr)}`,
);
}
}
// Fall back to normal delivery with the full accumulated streamed text
// so the user receives the complete answer even when stop() fails.
if (orphanDeleted && lastStreamPayload && streamedText) {
const fallback: ReplyPayload = Object.assign({}, lastStreamPayload, { text: streamedText });
await deliverNormally(fallback, finalStream.threadTs);
if (await deleteOrphanedStreamMessage(finalStream.streamMessageTs)) {
await replayAccumulatedStreamText(finalStream.threadTs);
}
}
}