diff --git a/extensions/mattermost/src/mattermost/client.ts b/extensions/mattermost/src/mattermost/client.ts
index c514160590f..acb489cd03c 100644
--- a/extensions/mattermost/src/mattermost/client.ts
+++ b/extensions/mattermost/src/mattermost/client.ts
@@ -533,3 +533,29 @@ export async function uploadMattermostFile(
   }
   return info;
 }
+
+/**
+ * Update an existing Mattermost post (partial patch).
+ * Only the `message` field is patched; other post fields are untouched.
+ * Requires edit_post (own) or edit_others_posts permission.
+ */
+export async function patchMattermostPost(
+  client: MattermostClient,
+  params: { postId: string; message: string },
+): Promise<void> {
+  await client.request(`/posts/${params.postId}/patch`, {
+    method: "PUT",
+    body: JSON.stringify({ message: params.message }),
+  });
+}
+
+/**
+ * Delete a Mattermost post (soft delete).
+ */
+export async function deleteMattermostPost(
+  client: MattermostClient,
+  postId: string,
+): Promise<void> {
+  await client.request(`/posts/${postId}`, {
+    method: "DELETE",
+  });
+}
diff --git a/extensions/mattermost/src/mattermost/monitor.ts b/extensions/mattermost/src/mattermost/monitor.ts
index 958a40de705..9e1422ab3f5 100644
--- a/extensions/mattermost/src/mattermost/monitor.ts
+++ b/extensions/mattermost/src/mattermost/monitor.ts
@@ -35,7 +35,10 @@ import {
+  deleteMattermostPost,
   fetchMattermostChannel,
   fetchMattermostMe,
   fetchMattermostUser,
+  fetchMattermostUserTeams,
   normalizeMattermostBaseUrl,
+  patchMattermostPost,
   sendMattermostTyping,
   updateMattermostPost,
   type MattermostChannel,
@@ -1395,12 +1398,312 @@ export async function monitorMattermostProvider(opts: MonitorMattermostOpts = {}
     },
   },
 });
+  // ── P4: Edit-in-place streaming state ──────────────────────────────────────
+  // Uses onPartialReply (full cumulative text each tick) + 200ms throttled
+  // setInterval to progressively edit a single Mattermost post in-place.
+  let streamMessageId: string | null = null;
+  let pendingPatchText = "";
+  let lastSentText = "";
+  let patchInterval: ReturnType<typeof setInterval> | null = null;
+  let patchSending = false; // prevents concurrent network calls
+  // Tracks the currently in-flight sendMessageMattermost / patchMattermostPost
+  // promise so that onSettled can await it directly rather than busy-waiting.
+  let patchInflight: Promise<void> | null = null;
+  // Latches true after the first send/edit failure to prevent the interval
+  // from being re-armed by a later onPartialReply call (ID=2964357928).
+  let previewSendFailed = false;
+  const STREAM_PATCH_INTERVAL_MS = 200;
+
+  // Edit-in-place streaming is opt-in: only activate when blockStreaming is
+  // explicitly true. When the config key is absent (undefined) we leave the
+  // agent's blockStreamingDefault in place and do not inject onPartialReply.
+  const streamingEnabled = account.blockStreaming === true;
+  const blockStreamingClient =
+    streamingEnabled && baseUrl && botToken
+      ? createMattermostClient({ baseUrl, botToken })
+      : null;
+
+  const stopPatchInterval = () => {
+    if (patchInterval) {
+      clearInterval(patchInterval);
+      patchInterval = null;
+    }
+  };
+
+  const flushPendingPatch = async () => {
+    stopPatchInterval();
+    if (!blockStreamingClient) return;
+    // Await the in-flight promise directly so we never miss a late-resolving
+    // POST/PATCH — the busy-wait on patchSending had a race where patchSending
+    // could clear before the network request settled (ID=2965256849).
+    if (patchInflight) {
+      await patchInflight.catch(() => {});
+    }
+    const rawText = pendingPatchText;
+    if (!rawText) return;
+    // Truncate to textLimit so intermediate patches never exceed the server limit.
+    // Final delivery applies full chunking; streaming posts only need the first chunk.
+    const text = rawText.length > textLimit ? rawText.slice(0, textLimit) : rawText;
+    // Guard on the truncated text so long replies (past textLimit) do not keep
+    // re-patching with the same truncated content every 200 ms and hit rate limits.
+    if (text === lastSentText) return;
+    if (!streamMessageId) {
+      try {
+        const result = await sendMessageMattermost(to, text, {
+          accountId: account.accountId,
+          replyToId: effectiveReplyToId,
+        });
+        streamMessageId = result.messageId;
+        lastSentText = text;
+        runtime.log?.(`stream-patch started ${streamMessageId}`);
+      } catch (err) {
+        logVerboseMessage(`mattermost stream-patch flush send failed: ${String(err)}`);
+      }
+    } else {
+      try {
+        await patchMattermostPost(blockStreamingClient, {
+          postId: streamMessageId,
+          message: text,
+        });
+        lastSentText = text;
+        runtime.log?.(`stream-patch flushed ${streamMessageId}`);
+      } catch (err) {
+        logVerboseMessage(`mattermost stream-patch flush failed: ${String(err)}`);
+      }
+    }
+  };
+
+  const schedulePatch = (fullText: string) => {
+    if (!blockStreamingClient) return;
+    // Do not re-arm if a permanent send/edit failure has been latched.
+    if (previewSendFailed) return;
+    pendingPatchText = fullText;
+    if (patchInterval) return;
+    patchInterval = setInterval(() => {
+      const rawText = pendingPatchText;
+      if (!rawText || patchSending) return;
+      // Truncate to textLimit so intermediate patches never exceed the server limit.
+      const text = rawText.length > textLimit ? rawText.slice(0, textLimit) : rawText;
+      // Guard on the truncated text so long replies (past textLimit) do not keep
+      // re-patching with the same truncated content every 200 ms and hit rate limits.
+      if (text === lastSentText) return;
+      patchSending = true;
+      const runTick = async () => {
+        try {
+          if (!streamMessageId) {
+            try {
+              const result = await sendMessageMattermost(to, text, {
+                accountId: account.accountId,
+                replyToId: effectiveReplyToId,
+              });
+              streamMessageId = result.messageId;
+              lastSentText = text;
+              runtime.log?.(`stream-patch started ${streamMessageId}`);
+            } catch (err) {
+              logVerboseMessage(`mattermost stream-patch send failed: ${String(err)}`);
+              // Latch the failure so schedulePatch() does not re-arm the interval
+              // on subsequent onPartialReply calls (which would retry indefinitely).
+              previewSendFailed = true;
+              stopPatchInterval();
+            }
+          } else {
+            try {
+              await patchMattermostPost(blockStreamingClient, {
+                postId: streamMessageId,
+                message: text,
+              });
+              lastSentText = text;
+              runtime.log?.(`stream-patch edited ${streamMessageId}`);
+            } catch (err) {
+              logVerboseMessage(`mattermost stream-patch edit failed: ${String(err)}`);
+              // Latch the failure so schedulePatch() does not re-arm the interval
+              // on subsequent onPartialReply calls.
+              previewSendFailed = true;
+              stopPatchInterval();
+            }
+          }
+        } finally {
+          patchSending = false;
+        }
+      };
+      const inflightRun = runTick();
+      patchInflight = inflightRun;
+      inflightRun.finally(() => {
+        if (patchInflight === inflightRun) patchInflight = null;
+      });
+      void inflightRun;
+    }, STREAM_PATCH_INTERVAL_MS);
+  };
+  // ── End P4 streaming state ────────────────────────────────────────────────
+
   const { dispatcher, replyOptions, markDispatchIdle } = core.channel.reply.createReplyDispatcherWithTyping({
     ...replyPipeline,
     humanDelay: core.channel.reply.resolveHumanDelayConfig(cfg, route.agentId),
     typingCallbacks,
-    deliver: async (payload: ReplyPayload) => {
+    deliver: async (payload: ReplyPayload, info) => {
+      const isFinal = info.kind === "final";
+
+      // Compute reply target divergence before flushing, so we don't
+      // accidentally create a preview post in the wrong thread on flush.
+      // Compute the reply target for this payload. When payload.replyToId resolves
+      // to a different Mattermost thread root than effectiveReplyToId, we must not
+      // patch the preview in-place (different thread).
+      //
+      // Both sides go through resolveMattermostReplyRootId so that child-post IDs
+      // (e.g. from [[reply_to_current]] inside a thread) are normalized to the same
+      // thread root as effectiveReplyToId before comparing. Raw payload.replyToId
+      // comparison was wrong: [[reply_to_current]] sets a child-post id that differs
+      // from the thread root but resolves to the same thread (ID=2965488514).
+      const finalReplyToId = resolveMattermostReplyRootId({
+        threadRootId: effectiveReplyToId,
+        replyToId: payload.replyToId,
+      });
+      const baseReplyToId = resolveMattermostReplyRootId({
+        threadRootId: effectiveReplyToId,
+      });
+      const replyTargetDiverged =
+        payload.replyToId != null &&
+        payload.replyToId.trim() !== "" &&
+        finalReplyToId !== baseReplyToId;
+
+      if (isFinal && blockStreamingClient) {
+        if (replyTargetDiverged) {
+          // Divergent target: don't flush (we don't want to create a preview in
+          // the wrong thread), but do stop the interval and wait for any in-flight
+          // tick/send to settle. This ensures streamMessageId is populated if the
+          // first sendMessageMattermost resolves during this window, so the orphan
+          // cleanup below can capture and delete it.
+          stopPatchInterval();
+          if (patchInflight) {
+            await patchInflight.catch(() => {});
+          }
+        } else {
+          // Same thread: flush pending patches normally.
+          await flushPendingPatch();
+        }
+      }
+
+      // Final + streaming active: patch the streamed message with authoritative
+      // complete text, or fall back to a new message (with orphan cleanup).
+      // (When replyTargetDiverged the preview is cleaned up further below.)
+      if (isFinal && streamMessageId && payload.text && !replyTargetDiverged) {
+        const hasMedia = payload.mediaUrls?.length || payload.mediaUrl;
+        // When the payload also has media, skip the in-place patch: patching
+        // the preview with only the text and then posting captionless media
+        // separately splits a single captioned-file reply into two posts.
+        // Fall through to deliverMattermostReplyPayload which sends
+        // text+media together in the correct Mattermost format (ID=2965096969).
+        if (!hasMedia) {
+          const text = core.channel.text.convertMarkdownTables(payload.text, tableMode);
+          try {
+            await patchMattermostPost(blockStreamingClient!, {
+              postId: streamMessageId,
+              message: text,
+            });
+            runtime.log?.(`stream-patch final edit ${streamMessageId}`);
+            // Successful text-only patch. Reset streaming state and return.
+            streamMessageId = null;
+            pendingPatchText = "";
+            lastSentText = "";
+            patchSending = false;
+            return;
+          } catch (err) {
+            logVerboseMessage(
+              `mattermost stream-patch final edit failed: ${String(err)}, sending new message`,
+            );
+            // Fall through to deliverMattermostReplyPayload below.
+          }
+        }
+        // Media payload or patch failure: deliver the full payload via the
+        // normal path (handles text+media together, chunking, etc.).
+        // Delete the preview only after successful delivery.
+        const orphanId = streamMessageId;
+        streamMessageId = null;
+        pendingPatchText = "";
+        lastSentText = "";
+        patchSending = false;
+        await deliverMattermostReplyPayload({
+          core,
+          cfg,
+          payload,
+          to,
+          accountId: account.accountId,
+          agentId: route.agentId,
+          replyToId: resolveMattermostReplyRootId({
+            threadRootId: effectiveReplyToId,
+            replyToId: payload.replyToId,
+          }),
+          textLimit,
+          tableMode,
+          sendMessage: sendMessageMattermost,
+        });
+        // Delivery succeeded — clean up the orphaned preview.
+        if (orphanId) {
+          void deleteMattermostPost(blockStreamingClient!, orphanId).catch(() => {});
+        }
+        return;
+      }
+
+      if (isFinal) {
+        stopPatchInterval();
+        // Capture and clear the stream ID so normal delivery below can proceed.
+        const orphanedStreamId = streamMessageId;
+        streamMessageId = null;
+        pendingPatchText = "";
+        lastSentText = "";
+        patchSending = false;
+
+        if (replyTargetDiverged && orphanedStreamId) {
+          // Divergent target: deliver to the correct thread first, then clean
+          // up the orphan. If delivery fails the user keeps the partial preview.
+          await deliverMattermostReplyPayload({
+            core,
+            cfg,
+            payload,
+            to,
+            accountId: account.accountId,
+            agentId: route.agentId,
+            replyToId: finalReplyToId,
+            textLimit,
+            tableMode,
+            sendMessage: sendMessageMattermost,
+          });
+          try {
+            await deleteMattermostPost(blockStreamingClient!, orphanedStreamId);
+          } catch {
+            // Ignore — the complete message was already delivered.
+          }
+          return;
+        }
+
+        if (orphanedStreamId && !payload.text) {
+          // Media-only final with a streamed text-preview: deliver media first,
+          // then delete the orphaned preview so users don't see stale partial
+          // text alongside the attachment. If delivery fails, the preview stays.
+          await deliverMattermostReplyPayload({
+            core,
+            cfg,
+            payload,
+            to,
+            accountId: account.accountId,
+            agentId: route.agentId,
+            replyToId: finalReplyToId,
+            textLimit,
+            tableMode,
+            sendMessage: sendMessageMattermost,
+          });
+          try {
+            await deleteMattermostPost(blockStreamingClient!, orphanedStreamId);
+          } catch {
+            // Ignore — media was already delivered.
+          }
+          return;
+        }
+        // Otherwise fall through to normal delivery.
+      }
+
+      // Normal delivery — streaming not active or non-final partial.
await deliverMattermostReplyPayload({ core, cfg, @@ -1427,6 +1730,36 @@ export async function monitorMattermostProvider(opts: MonitorMattermostOpts = {} dispatcher, onSettled: () => { markDispatchIdle(); + // Clean up any streaming preview that was never finalized — this happens when + // the reply pipeline produces no final payload (e.g. messaging-tool sends that + // are suppressed, or empty/heartbeat responses). Without this, onPartialReply + // can create a Mattermost post that is never deleted or patched with final text. + // + // We must also handle the race where the first preview POST is still in flight + // (patchSending=true, streamMessageId=null): stopPatchInterval() prevents new + // ticks, and the async cleanup waits for patchSending to clear so it can capture + // the messageId that the in-flight send will set. + if ((streamMessageId || patchSending || patchInterval) && blockStreamingClient) { + stopPatchInterval(); + pendingPatchText = ""; + lastSentText = ""; + const client = blockStreamingClient; + void (async () => { + // Await the in-flight promise directly so we never miss a late-resolving + // POST — a 3s timeout would race on slow Mattermost links (ID=2964616785). + if (patchInflight) { + await patchInflight.catch(() => {}); + } + patchSending = false; + const orphanId = streamMessageId; + streamMessageId = null; + if (orphanId) { + await deleteMattermostPost(client, orphanId).catch(() => { + // Best-effort — the run is already complete. + }); + } + })(); + } }, run: () => core.channel.reply.dispatchReplyFromConfig({ @@ -1435,8 +1768,22 @@ export async function monitorMattermostProvider(opts: MonitorMattermostOpts = {} dispatcher, replyOptions: { ...replyOptions, - disableBlockStreaming: - typeof account.blockStreaming === "boolean" ? !account.blockStreaming : undefined, + // Use onPartialReply (full cumulative text) for edit-in-place streaming. + onPartialReply: blockStreamingClient + ? 
(payload: ReplyPayload) => { + const rawText = payload.text ?? ""; + const fullText = core.channel.text.convertMarkdownTables(rawText, tableMode); + if (fullText) { + schedulePatch(fullText); + } + } + : undefined, + // Disable core block streaming since we handle streaming via onPartialReply. + disableBlockStreaming: blockStreamingClient + ? true + : typeof account.blockStreaming === "boolean" + ? !account.blockStreaming + : undefined, onModelSelected, }, }),