From 7ed3db579f2191b252c0d366761498e430156c20 Mon Sep 17 00:00:00 2001 From: teconomix Date: Wed, 18 Mar 2026 12:02:31 +0000 Subject: [PATCH] fix(mattermost): make streaming opt-in; apply textLimit to patches; handle reply target divergence Three fixes addressing Codex review feedback: 1. Streaming opt-in: change streamingEnabled from (blockStreaming !== false) to (blockStreaming === true) so accounts without an explicit blockStreaming setting preserve their agent blockStreamingDefault instead of having edit-in-place streaming silently enabled. 2. Text limit: apply textLimit truncation in schedulePatch and flushPendingPatch before sending/patching. Intermediate preview posts only need the first chunk; final delivery goes through deliverMattermostReplyPayload which applies full chunking. This prevents oversize patch loops when responses exceed the limit. 3. Reply target divergence: when the final payload carries an explicit replyToId that resolves to a different root than the streaming post was created under (e.g. a [[reply_to_current]] directive), skip the in-place patch and fall through to normal delivery so the reply lands in the correct thread. Any orphaned stream post is deleted before the correct reply is sent. --- .../mattermost/src/mattermost/monitor.ts | 52 +++++++++++++++---- 1 file changed, 41 insertions(+), 11 deletions(-) diff --git a/extensions/mattermost/src/mattermost/monitor.ts b/extensions/mattermost/src/mattermost/monitor.ts index a4c1f43d7a5..ad2aa5bdb69 100644 --- a/extensions/mattermost/src/mattermost/monitor.ts +++ b/extensions/mattermost/src/mattermost/monitor.ts @@ -1411,7 +1411,10 @@ export async function monitorMattermostProvider(opts: MonitorMattermostOpts = {} let patchSending = false; // prevents concurrent network calls const STREAM_PATCH_INTERVAL_MS = 200; - const streamingEnabled = account.blockStreaming !== false; + // Edit-in-place streaming is opt-in: only activate when blockStreaming is + // explicitly true. When the config key is absent (undefined) we leave the + // agent's blockStreamingDefault in place and do not inject onPartialReply. + const streamingEnabled = account.blockStreaming === true; const blockStreamingClient = streamingEnabled && baseUrl && botToken ? createMattermostClient({ baseUrl, botToken }) @@ -1436,8 +1439,11 @@ export async function monitorMattermostProvider(opts: MonitorMattermostOpts = {} while (patchSending && Date.now() < deadline) { await new Promise((r) => setTimeout(r, 20)); } - const text = pendingPatchText; - if (!text || text === lastSentText) return; + const rawText = pendingPatchText; + if (!rawText || rawText === lastSentText) return; + // Truncate to textLimit so intermediate patches never exceed the server limit. + // Final delivery applies full chunking; streaming posts only need the first chunk. + const text = rawText.length > textLimit ? rawText.slice(0, textLimit) : rawText; if (!streamMessageId) { try { const result = await sendMessageMattermost(to, text, { @@ -1445,7 +1451,7 @@ export async function monitorMattermostProvider(opts: MonitorMattermostOpts = {} replyToId: effectiveReplyToId, }); streamMessageId = result.messageId; - lastSentText = text; + lastSentText = rawText; runtime.log?.(`stream-patch started ${streamMessageId}`); } catch (err) { logVerboseMessage(`mattermost stream-patch flush send failed: ${String(err)}`); @@ -1456,7 +1462,7 @@ export async function monitorMattermostProvider(opts: MonitorMattermostOpts = {} postId: streamMessageId, message: text, }); - lastSentText = text; + lastSentText = rawText; runtime.log?.(`stream-patch flushed ${streamMessageId}`); } catch (err) { logVerboseMessage(`mattermost stream-patch flush failed: ${String(err)}`); @@ -1469,8 +1475,10 @@ export async function monitorMattermostProvider(opts: MonitorMattermostOpts = {} pendingPatchText = fullText; if (patchInterval) return; patchInterval = setInterval(() => { - const text = pendingPatchText; - if (!text || text === lastSentText || patchSending) return; + const rawText = pendingPatchText; + if (!rawText || rawText === lastSentText || patchSending) return; + // Truncate to textLimit so intermediate patches never exceed the server limit. + const text = rawText.length > textLimit ? rawText.slice(0, textLimit) : rawText; patchSending = true; void (async () => { try { @@ -1481,7 +1489,7 @@ export async function monitorMattermostProvider(opts: MonitorMattermostOpts = {} replyToId: effectiveReplyToId, }); streamMessageId = result.messageId; - lastSentText = text; + lastSentText = rawText; runtime.log?.(`stream-patch started ${streamMessageId}`); } catch (err) { logVerboseMessage(`mattermost stream-patch send failed: ${String(err)}`); @@ -1492,7 +1500,7 @@ export async function monitorMattermostProvider(opts: MonitorMattermostOpts = {} postId: streamMessageId, message: text, }); - lastSentText = text; + lastSentText = rawText; runtime.log?.(`stream-patch edited ${streamMessageId}`); } catch (err) { logVerboseMessage(`mattermost stream-patch edit failed: ${String(err)}`); @@ -1521,7 +1529,17 @@ export async function monitorMattermostProvider(opts: MonitorMattermostOpts = {} // Final + streaming active: patch the streamed message with authoritative // complete text, or fall back to a new message (with orphan cleanup). - if (isFinal && streamMessageId && payload.text) { + // If the final payload carries an explicit replyToId that differs from + // the one the streaming post was created under, skip the in-place patch + // and fall through to normal delivery so the reply lands in the right thread. + const finalReplyToId = resolveMattermostReplyRootId({ + threadRootId: effectiveReplyToId, + replyToId: payload.replyToId, + }); + const streamReplyToId = effectiveReplyToId; + const replyTargetDiverged = + finalReplyToId !== streamReplyToId && payload.replyToId != null; + if (isFinal && streamMessageId && payload.text && !replyTargetDiverged) { const text = core.channel.text.convertMarkdownTables(payload.text, tableMode); try { await patchMattermostPost(blockStreamingClient!, { @@ -1567,7 +1585,19 @@ export async function monitorMattermostProvider(opts: MonitorMattermostOpts = {} if (isFinal) { stopPatchInterval(); - streamMessageId = null; + // If the reply target diverged and we have an orphaned stream post, + // attempt to delete it before normal delivery creates the correct post. + if (replyTargetDiverged && streamMessageId) { + const orphanId = streamMessageId; + streamMessageId = null; + try { + await deleteMattermostPost(blockStreamingClient!, orphanId); + } catch { + // Ignore — delivering to the correct thread takes priority. + } + } else { + streamMessageId = null; + } pendingPatchText = ""; lastSentText = ""; patchSending = false;