From 9f29865224005801cadd061b3eae02099f3eaa20 Mon Sep 17 00:00:00 2001 From: teconomix Date: Sat, 14 Mar 2026 03:27:43 +0000 Subject: [PATCH 01/18] feat(mattermost): block streaming edit-in-place (v3.12 rebase) Rebased onto v3.12 main. Upstream extracted deliver logic to reply-delivery.ts, so streaming now wraps deliverMattermostReplyPayload() instead of replacing the inline deliver body. Changes: - client.ts: add patchMattermostPost() + deleteMattermostPost() API helpers - monitor.ts: inject streaming state (schedulePatch, flushPendingPatch, setInterval) before main inbound createReplyDispatcherWithTyping only (3 dispatch paths exist, only main handler gets streaming via unique humanDelay+typingCallbacks anchor) - monitor.ts: upgrade deliver signature to (payload, info) for isFinal detection - monitor.ts: wrap deliverMattermostReplyPayload() with isFinal streaming logic (final+streaming: patch in-place or fallback; non-streaming: delegate to helper) - monitor.ts: add onPartialReply + disableBlockStreaming override in replyOptions pnpm check: no new errors introduced (pre-existing errors on main unrelated to this PR) Fixes: https://github.com/openclaw/openclaw/issues/XXXX PR: https://github.com/openclaw/openclaw/pull/33506 --- .../mattermost/src/mattermost/client.ts | 45 +++++ .../mattermost/src/mattermost/monitor.ts | 179 +++++++++++++++++- 2 files changed, 219 insertions(+), 5 deletions(-) diff --git a/extensions/mattermost/src/mattermost/client.ts b/extensions/mattermost/src/mattermost/client.ts index c514160590f..9bbf4629468 100644 --- a/extensions/mattermost/src/mattermost/client.ts +++ b/extensions/mattermost/src/mattermost/client.ts @@ -533,3 +533,48 @@ export async function uploadMattermostFile( } return info; } + +/** + * Update an existing Mattermost post (partial patch). + * Requires edit_post (own) or edit_others_posts permission. + */ +export async function patchMattermostPost( + client: MattermostClient, + params: { postId: string; message: string }, +): Promise { + const res = await fetch(`${client.apiBaseUrl}/posts/${params.postId}/patch`, { + method: "PUT", + headers: { + Authorization: `Bearer ${client.token}`, + "Content-Type": "application/json", + }, + body: JSON.stringify({ message: params.message }), + }); + if (!res.ok) { + const detail = await readMattermostError(res); + throw new Error( + `Mattermost patch post ${res.status} ${res.statusText}: ${detail || "unknown error"}`, + ); + } +} + +/** + * Delete a Mattermost post (soft delete). + */ +export async function deleteMattermostPost( + client: MattermostClient, + postId: string, +): Promise { + const res = await fetch(`${client.apiBaseUrl}/posts/${postId}`, { + method: "DELETE", + headers: { + Authorization: `Bearer ${client.token}`, + }, + }); + if (!res.ok) { + const detail = await readMattermostError(res); + throw new Error( + `Mattermost delete post ${res.status} ${res.statusText}: ${detail || "unknown error"}`, + ); + } +} diff --git a/extensions/mattermost/src/mattermost/monitor.ts b/extensions/mattermost/src/mattermost/monitor.ts index 4cd74216811..b187e0cd44b 100644 --- a/extensions/mattermost/src/mattermost/monitor.ts +++ b/extensions/mattermost/src/mattermost/monitor.ts @@ -36,7 +36,10 @@ import { fetchMattermostChannel, fetchMattermostMe, fetchMattermostUser, + fetchMattermostUserTeams, + deleteMattermostPost, normalizeMattermostBaseUrl, + patchMattermostPost, sendMattermostTyping, updateMattermostPost, type MattermostChannel, @@ -515,7 +518,7 @@ export async function monitorMattermostProvider(opts: MonitorMattermostOpts = {} replyOptions: { ...replyOptions, disableBlockStreaming: - typeof account.blockStreaming === "boolean" ? !account.blockStreaming : undefined, + typeof account.blockStreaming === "boolean" ? !account.blockStreaming : false, onModelSelected, }, }); @@ -728,7 +731,7 @@ export async function monitorMattermostProvider(opts: MonitorMattermostOpts = {} replyOptions: { ...replyOptions, disableBlockStreaming: - typeof account.blockStreaming === "boolean" ? !account.blockStreaming : undefined, + typeof account.blockStreaming === "boolean" ? !account.blockStreaming : false, onModelSelected, }, }), @@ -1398,12 +1401,164 @@ export async function monitorMattermostProvider(opts: MonitorMattermostOpts = {} }); }, }); + // ── P4: Edit-in-place streaming state ────────────────────────────────────── + // Uses onPartialReply (full cumulative text each tick) + 200ms throttled + // setInterval to progressively edit a single Mattermost post in-place. + let streamMessageId: string | null = null; + let pendingPatchText = ""; + let lastSentText = ""; + let patchInterval: ReturnType | null = null; + let patchSending = false; // prevents concurrent network calls + const STREAM_PATCH_INTERVAL_MS = 200; + + const streamingEnabled = account.blockStreaming !== false; + const blockStreamingClient = + streamingEnabled && baseUrl && botToken + ? createMattermostClient({ baseUrl, botToken }) + : null; + + const stopPatchInterval = () => { + if (patchInterval) { + clearInterval(patchInterval); + patchInterval = null; + } + }; + + const flushPendingPatch = async () => { + stopPatchInterval(); + if (!blockStreamingClient) return; + const text = pendingPatchText; + if (!text || text === lastSentText) return; + lastSentText = text; + if (!streamMessageId) { + try { + const result = await sendMessageMattermost(to, text, { + accountId: account.accountId, + replyToId: effectiveReplyToId, + }); + streamMessageId = result.messageId; + runtime.log?.(`stream-patch started ${streamMessageId}`); + } catch (err) { + logVerboseMessage(`mattermost stream-patch flush send failed: ${String(err)}`); + } + } else { + try { + await patchMattermostPost(blockStreamingClient, { + postId: streamMessageId, + message: text, + }); + runtime.log?.(`stream-patch flushed ${streamMessageId}`); + } catch (err) { + logVerboseMessage(`mattermost stream-patch flush failed: ${String(err)}`); + } + } + }; + + const schedulePatch = (fullText: string) => { + if (!blockStreamingClient) return; + pendingPatchText = fullText; + if (patchInterval) return; + patchInterval = setInterval(() => { + const text = pendingPatchText; + if (!text || text === lastSentText || patchSending) return; + lastSentText = text; + patchSending = true; + void (async () => { + try { + if (!streamMessageId) { + try { + const result = await sendMessageMattermost(to, text, { + accountId: account.accountId, + replyToId: effectiveReplyToId, + }); + streamMessageId = result.messageId; + runtime.log?.(`stream-patch started ${streamMessageId}`); + } catch (err) { + logVerboseMessage(`mattermost stream-patch send failed: ${String(err)}`); + } + } else { + try { + await patchMattermostPost(blockStreamingClient, { + postId: streamMessageId, + message: text, + }); + runtime.log?.(`stream-patch edited ${streamMessageId}`); + } catch (err) { + logVerboseMessage(`mattermost stream-patch edit failed: ${String(err)}`); + } + } + } finally { + patchSending = false; + } + })(); + }, STREAM_PATCH_INTERVAL_MS); + }; + // ── End P4 streaming state ──────────────────────────────────────────────── + const { dispatcher, replyOptions, markDispatchIdle } = core.channel.reply.createReplyDispatcherWithTyping({ ...prefixOptions, humanDelay: core.channel.reply.resolveHumanDelayConfig(cfg, route.agentId), typingCallbacks, - deliver: async (payload: ReplyPayload) => { + deliver: async (payload: ReplyPayload, info) => { + const isFinal = info.kind === "final"; + + // Flush any pending partial-reply patch before final delivery. + if (isFinal && blockStreamingClient) { + await flushPendingPatch(); + } + + // Final + streaming active: patch the streamed message with authoritative + // complete text, or fall back to a new message (with orphan cleanup). + if (isFinal && streamMessageId && payload.text) { + const text = core.channel.text.convertMarkdownTables(payload.text, tableMode); + try { + await patchMattermostPost(blockStreamingClient!, { + postId: streamMessageId, + message: text, + }); + runtime.log?.(`stream-patch final edit ${streamMessageId}`); + } catch (err) { + logVerboseMessage( + `mattermost stream-patch final edit failed: ${String(err)}, sending new message`, + ); + const orphanId = streamMessageId; + streamMessageId = null; + try { + await deleteMattermostPost(blockStreamingClient!, orphanId); + } catch { + // Ignore delete failure — delivering the complete message takes priority + } + await deliverMattermostReplyPayload({ + core, + cfg, + payload, + to, + accountId: account.accountId, + agentId: route.agentId, + replyToId: resolveMattermostReplyRootId({ + threadRootId: effectiveReplyToId, + replyToId: payload.replyToId, + }), + textLimit, + tableMode, + sendMessage: sendMessageMattermost, + }); + return; + } + streamMessageId = null; + return; + } + + if (isFinal) { + stopPatchInterval(); + streamMessageId = null; + pendingPatchText = ""; + lastSentText = ""; + patchSending = false; + } + + // Normal delivery — streaming not active or non-final partial. await deliverMattermostReplyPayload({ core, cfg, @@ -1438,8 +1593,22 @@ export async function monitorMattermostProvider(opts: MonitorMattermostOpts = {} dispatcher, replyOptions: { ...replyOptions, - disableBlockStreaming: - typeof account.blockStreaming === "boolean" ? !account.blockStreaming : undefined, + // Use onPartialReply (full cumulative text) for edit-in-place streaming. + onPartialReply: blockStreamingClient + ? (payload: ReplyPayload) => { + const rawText = payload.text ?? ""; + const fullText = core.channel.text.convertMarkdownTables(rawText, tableMode); + if (fullText) { + schedulePatch(fullText); + } + } + : undefined, + // Disable core block streaming since we handle streaming via onPartialReply. + disableBlockStreaming: blockStreamingClient + ? true + : typeof account.blockStreaming === "boolean" + ? !account.blockStreaming + : false, onModelSelected, }, }), From ec10de7361e99d32eed200c551f59b975dde2166 Mon Sep 17 00:00:00 2001 From: teconomix Date: Sat, 14 Mar 2026 06:04:44 +0000 Subject: [PATCH 02/18] fix(mattermost): inherit agent block-streaming default in button/model-picker paths When account.blockStreaming is unset, pass undefined instead of false for disableBlockStreaming so downstream get-reply-directives inherits the agent-level default rather than forcing block streaming on. Affected paths: button-click interactions (handleInteractiveMenuInteraction) and model picker confirmations (handleModelPickerInteraction). slash-http.ts already used undefined correctly; this brings monitor.ts into alignment. Addresses Codex P2 review: 'Preserve inherited block-streaming default for model picker replies' and 'Preserve default block-streaming behavior for button replies'. --- extensions/mattermost/src/mattermost/monitor.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/extensions/mattermost/src/mattermost/monitor.ts b/extensions/mattermost/src/mattermost/monitor.ts index b187e0cd44b..d374ba60dc8 100644 --- a/extensions/mattermost/src/mattermost/monitor.ts +++ b/extensions/mattermost/src/mattermost/monitor.ts @@ -518,7 +518,7 @@ export async function monitorMattermostProvider(opts: MonitorMattermostOpts = {} replyOptions: { ...replyOptions, disableBlockStreaming: - typeof account.blockStreaming === "boolean" ? !account.blockStreaming : false, + typeof account.blockStreaming === "boolean" ? !account.blockStreaming : undefined, onModelSelected, }, }); @@ -731,7 +731,7 @@ export async function monitorMattermostProvider(opts: MonitorMattermostOpts = {} replyOptions: { ...replyOptions, disableBlockStreaming: - typeof account.blockStreaming === "boolean" ? !account.blockStreaming : false, + typeof account.blockStreaming === "boolean" ? !account.blockStreaming : undefined, onModelSelected, }, }), From ee6d9849503c811f5f92407d584286cf2caa367f Mon Sep 17 00:00:00 2001 From: teconomix Date: Sat, 14 Mar 2026 06:13:08 +0000 Subject: [PATCH 03/18] fix(mattermost): use client.request in patchMattermostPost and deleteMattermostPost Both functions called the global fetch directly, bypassing the fetchImpl stored in the client closure. This silently ignored any custom fetch implementation passed to createMattermostClient (test mocks, proxy-aware fetchers, SSRF guards). Switch both to client.request() which uses fetchImpl, auto-injects the Authorization header, handles Content-Type for JSON bodies, and propagates errors consistently with every other client function. uploadMattermostFile retains its direct fetch call (multipart/form-data conflicts with request's automatic Content-Type injection). Addresses Greptile review: 'New functions bypass client.request, ignoring custom fetchImpl'. --- .../mattermost/src/mattermost/client.ts | 23 ++----------------- 1 file changed, 2 insertions(+), 21 deletions(-) diff --git a/extensions/mattermost/src/mattermost/client.ts b/extensions/mattermost/src/mattermost/client.ts index 9bbf4629468..acb489cd03c 100644 --- a/extensions/mattermost/src/mattermost/client.ts +++ b/extensions/mattermost/src/mattermost/client.ts @@ -542,20 +542,10 @@ export async function patchMattermostPost( client: MattermostClient, params: { postId: string; message: string }, ): Promise { - const res = await fetch(`${client.apiBaseUrl}/posts/${params.postId}/patch`, { + await client.request(`/posts/${params.postId}/patch`, { method: "PUT", - headers: { - Authorization: `Bearer ${client.token}`, - "Content-Type": "application/json", - }, body: JSON.stringify({ message: params.message }), }); - if (!res.ok) { - const detail = await readMattermostError(res); - throw new Error( - `Mattermost patch post ${res.status} ${res.statusText}: ${detail || "unknown error"}`, - ); - } } /** @@ -565,16 +555,7 @@ export async function deleteMattermostPost( client: MattermostClient, postId: string, ): Promise { - const res = await fetch(`${client.apiBaseUrl}/posts/${postId}`, { + await client.request(`/posts/${postId}`, { method: "DELETE", - headers: { - Authorization: `Bearer ${client.token}`, - }, }); - if (!res.ok) { - const detail = await readMattermostError(res); - throw new Error( - `Mattermost delete post ${res.status} ${res.statusText}: ${detail || "unknown error"}`, - ); - } } From 33678bb973c6c8db6c6f3dab121f0a0a58300004 Mon Sep 17 00:00:00 2001 From: teconomix Date: Wed, 18 Mar 2026 07:55:37 +0000 Subject: [PATCH 04/18] fix(mattermost): set lastSentText after network success; wait for in-flight tick in flush Race condition: lastSentText was set synchronously before the async send/patch completed, so a failed request was treated as delivered and subsequent ticks skipped retrying. flushPendingPatch also didn't wait for in-flight interval ticks, causing it to exit early (text === lastSentText guard) when a tick had just fired but hadn't resolved yet, leaving streamMessageId null and forcing final delivery to send a new post instead of patching the streamed one. Fixes: - schedulePatch interval: set lastSentText only after successful send/patch - flushPendingPatch: wait up to 2s for in-flight patchSending before proceeding - flushPendingPatch: set lastSentText after network success, not before --- .../mattermost/src/mattermost/monitor.ts | 19 +++++++++++++++++-- 1 file changed, 17 insertions(+), 2 deletions(-) diff --git a/extensions/mattermost/src/mattermost/monitor.ts b/extensions/mattermost/src/mattermost/monitor.ts index d374ba60dc8..a4c1f43d7a5 100644 --- a/extensions/mattermost/src/mattermost/monitor.ts +++ b/extensions/mattermost/src/mattermost/monitor.ts @@ -1427,9 +1427,17 @@ export async function monitorMattermostProvider(opts: MonitorMattermostOpts = {} const flushPendingPatch = async () => { stopPatchInterval(); if (!blockStreamingClient) return; + // Wait for any in-flight interval tick to settle before flushing. + // Without this, an interval tick that set lastSentText synchronously but hasn't + // completed the async send yet would cause flushPendingPatch to exit early + // (text === lastSentText guard), leaving streamMessageId null and causing + // final delivery to fall through to a new post instead of patching in place. + const deadline = Date.now() + 2000; + while (patchSending && Date.now() < deadline) { + await new Promise((r) => setTimeout(r, 20)); + } const text = pendingPatchText; if (!text || text === lastSentText) return; - lastSentText = text; if (!streamMessageId) { try { const result = await sendMessageMattermost(to, text, { @@ -1437,6 +1445,7 @@ export async function monitorMattermostProvider(opts: MonitorMattermostOpts = {} replyToId: effectiveReplyToId, }); streamMessageId = result.messageId; + lastSentText = text; runtime.log?.(`stream-patch started ${streamMessageId}`); } catch (err) { logVerboseMessage(`mattermost stream-patch flush send failed: ${String(err)}`); @@ -1447,6 +1456,7 @@ export async function monitorMattermostProvider(opts: MonitorMattermostOpts = {} postId: streamMessageId, message: text, }); + lastSentText = text; runtime.log?.(`stream-patch flushed ${streamMessageId}`); } catch (err) { logVerboseMessage(`mattermost stream-patch flush failed: ${String(err)}`); @@ -1461,7 +1471,6 @@ export async function monitorMattermostProvider(opts: MonitorMattermostOpts = {} patchInterval = setInterval(() => { const text = pendingPatchText; if (!text || text === lastSentText || patchSending) return; - lastSentText = text; patchSending = true; void (async () => { try { @@ -1472,6 +1481,7 @@ export async function monitorMattermostProvider(opts: MonitorMattermostOpts = {} replyToId: effectiveReplyToId, }); streamMessageId = result.messageId; + lastSentText = text; runtime.log?.(`stream-patch started ${streamMessageId}`); } catch (err) { logVerboseMessage(`mattermost stream-patch send failed: ${String(err)}`); @@ -1482,6 +1492,7 @@ export async function monitorMattermostProvider(opts: MonitorMattermostOpts = {} postId: streamMessageId, message: text, }); + lastSentText = text; runtime.log?.(`stream-patch edited ${streamMessageId}`); } catch (err) { logVerboseMessage(`mattermost stream-patch edit failed: ${String(err)}`); @@ -1546,7 +1557,11 @@ export async function monitorMattermostProvider(opts: MonitorMattermostOpts = {} }); return; } + // Successful final patch: reset all streaming state. streamMessageId = null; + pendingPatchText = ""; + lastSentText = ""; + patchSending = false; return; } From 7ed3db579f2191b252c0d366761498e430156c20 Mon Sep 17 00:00:00 2001 From: teconomix Date: Wed, 18 Mar 2026 12:02:31 +0000 Subject: [PATCH 05/18] fix(mattermost): make streaming opt-in; apply textLimit to patches; handle reply target divergence Three fixes addressing Codex review feedback: 1. Streaming opt-in: change streamingEnabled from (blockStreaming !== false) to (blockStreaming === true) so accounts without an explicit blockStreaming setting preserve their agent blockStreamingDefault instead of having edit-in-place streaming silently enabled. 2. Text limit: apply textLimit truncation in schedulePatch and flushPendingPatch before sending/patching. Intermediate preview posts only need the first chunk; final delivery goes through deliverMattermostReplyPayload which applies full chunking. This prevents oversize patch loops when responses exceed the limit. 3. Reply target divergence: when the final payload carries an explicit replyToId that resolves to a different root than the streaming post was created under (e.g. a [[reply_to_current]] directive), skip the in-place patch and fall through to normal delivery so the reply lands in the correct thread. Any orphaned stream post is deleted before the correct reply is sent. --- .../mattermost/src/mattermost/monitor.ts | 52 +++++++++++++++---- 1 file changed, 41 insertions(+), 11 deletions(-) diff --git a/extensions/mattermost/src/mattermost/monitor.ts b/extensions/mattermost/src/mattermost/monitor.ts index a4c1f43d7a5..ad2aa5bdb69 100644 --- a/extensions/mattermost/src/mattermost/monitor.ts +++ b/extensions/mattermost/src/mattermost/monitor.ts @@ -1411,7 +1411,10 @@ export async function monitorMattermostProvider(opts: MonitorMattermostOpts = {} let patchSending = false; // prevents concurrent network calls const STREAM_PATCH_INTERVAL_MS = 200; - const streamingEnabled = account.blockStreaming !== false; + // Edit-in-place streaming is opt-in: only activate when blockStreaming is + // explicitly true. When the config key is absent (undefined) we leave the + // agent's blockStreamingDefault in place and do not inject onPartialReply. + const streamingEnabled = account.blockStreaming === true; const blockStreamingClient = streamingEnabled && baseUrl && botToken ? createMattermostClient({ baseUrl, botToken }) @@ -1436,8 +1439,11 @@ export async function monitorMattermostProvider(opts: MonitorMattermostOpts = {} while (patchSending && Date.now() < deadline) { await new Promise((r) => setTimeout(r, 20)); } - const text = pendingPatchText; - if (!text || text === lastSentText) return; + const rawText = pendingPatchText; + if (!rawText || rawText === lastSentText) return; + // Truncate to textLimit so intermediate patches never exceed the server limit. + // Final delivery applies full chunking; streaming posts only need the first chunk. + const text = rawText.length > textLimit ? rawText.slice(0, textLimit) : rawText; if (!streamMessageId) { try { const result = await sendMessageMattermost(to, text, { @@ -1445,7 +1451,7 @@ export async function monitorMattermostProvider(opts: MonitorMattermostOpts = {} replyToId: effectiveReplyToId, }); streamMessageId = result.messageId; - lastSentText = text; + lastSentText = rawText; runtime.log?.(`stream-patch started ${streamMessageId}`); } catch (err) { logVerboseMessage(`mattermost stream-patch flush send failed: ${String(err)}`); @@ -1456,7 +1462,7 @@ export async function monitorMattermostProvider(opts: MonitorMattermostOpts = {} postId: streamMessageId, message: text, }); - lastSentText = text; + lastSentText = rawText; runtime.log?.(`stream-patch flushed ${streamMessageId}`); } catch (err) { logVerboseMessage(`mattermost stream-patch flush failed: ${String(err)}`); @@ -1469,8 +1475,10 @@ export async function monitorMattermostProvider(opts: MonitorMattermostOpts = {} pendingPatchText = fullText; if (patchInterval) return; patchInterval = setInterval(() => { - const text = pendingPatchText; - if (!text || text === lastSentText || patchSending) return; + const rawText = pendingPatchText; + if (!rawText || rawText === lastSentText || patchSending) return; + // Truncate to textLimit so intermediate patches never exceed the server limit. + const text = rawText.length > textLimit ? rawText.slice(0, textLimit) : rawText; patchSending = true; void (async () => { try { @@ -1481,7 +1489,7 @@ export async function monitorMattermostProvider(opts: MonitorMattermostOpts = {} replyToId: effectiveReplyToId, }); streamMessageId = result.messageId; - lastSentText = text; + lastSentText = rawText; runtime.log?.(`stream-patch started ${streamMessageId}`); } catch (err) { logVerboseMessage(`mattermost stream-patch send failed: ${String(err)}`); @@ -1492,7 +1500,7 @@ export async function monitorMattermostProvider(opts: MonitorMattermostOpts = {} postId: streamMessageId, message: text, }); - lastSentText = text; + lastSentText = rawText; runtime.log?.(`stream-patch edited ${streamMessageId}`); } catch (err) { logVerboseMessage(`mattermost stream-patch edit failed: ${String(err)}`); @@ -1521,7 +1529,17 @@ export async function monitorMattermostProvider(opts: MonitorMattermostOpts = {} // Final + streaming active: patch the streamed message with authoritative // complete text, or fall back to a new message (with orphan cleanup). - if (isFinal && streamMessageId && payload.text) { + // If the final payload carries an explicit replyToId that differs from + // the one the streaming post was created under, skip the in-place patch + // and fall through to normal delivery so the reply lands in the right thread. + const finalReplyToId = resolveMattermostReplyRootId({ + threadRootId: effectiveReplyToId, + replyToId: payload.replyToId, + }); + const streamReplyToId = effectiveReplyToId; + const replyTargetDiverged = + finalReplyToId !== streamReplyToId && payload.replyToId != null; + if (isFinal && streamMessageId && payload.text && !replyTargetDiverged) { const text = core.channel.text.convertMarkdownTables(payload.text, tableMode); try { await patchMattermostPost(blockStreamingClient!, { @@ -1567,7 +1585,19 @@ export async function monitorMattermostProvider(opts: MonitorMattermostOpts = {} if (isFinal) { stopPatchInterval(); - streamMessageId = null; + // If the reply target diverged and we have an orphaned stream post, + // attempt to delete it before normal delivery creates the correct post. + if (replyTargetDiverged && streamMessageId) { + const orphanId = streamMessageId; + streamMessageId = null; + try { + await deleteMattermostPost(blockStreamingClient!, orphanId); + } catch { + // Ignore — delivering to the correct thread takes priority. + } + } else { + streamMessageId = null; + } pendingPatchText = ""; lastSentText = ""; patchSending = false; From 276e5d735b93df3fd6a31f3ea94cdd8aa4ea9b9b Mon Sep 17 00:00:00 2001 From: teconomix Date: Wed, 18 Mar 2026 12:37:22 +0000 Subject: [PATCH 06/18] fix(mattermost): deduplicate truncated patch edits; delete orphan only after fallback succeeds MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Two fixes from latest Codex review: 1. Truncation dedup: compare lastSentText against the truncated text (not the full rawText) in both schedulePatch and flushPendingPatch. Previously, once a reply grew past textLimit the guard compared the growing rawText against the stored rawText, so the post would be patched every 200 ms with the same truncated body — running into avoidable Mattermost rate limiting on long responses. 2. Orphan cleanup order: in the final-edit fallback path, deliver the replacement message first and only delete the orphaned stream post afterward. If the fallback send also fails, the user keeps the partial preview instead of losing all visible output. --- .../mattermost/src/mattermost/monitor.ts | 33 ++++++++++++------- 1 file changed, 22 insertions(+), 11 deletions(-) diff --git a/extensions/mattermost/src/mattermost/monitor.ts b/extensions/mattermost/src/mattermost/monitor.ts index ad2aa5bdb69..1948a0ab700 100644 --- a/extensions/mattermost/src/mattermost/monitor.ts +++ b/extensions/mattermost/src/mattermost/monitor.ts @@ -1440,10 +1440,13 @@ export async function monitorMattermostProvider(opts: MonitorMattermostOpts = {} await new Promise((r) => setTimeout(r, 20)); } const rawText = pendingPatchText; - if (!rawText || rawText === lastSentText) return; + if (!rawText) return; // Truncate to textLimit so intermediate patches never exceed the server limit. // Final delivery applies full chunking; streaming posts only need the first chunk. const text = rawText.length > textLimit ? rawText.slice(0, textLimit) : rawText; + // Guard on the truncated text so long replies (past textLimit) do not keep + // re-patching with the same truncated content every 200 ms and hit rate limits. + if (text === lastSentText) return; if (!streamMessageId) { try { const result = await sendMessageMattermost(to, text, { @@ -1451,7 +1454,7 @@ export async function monitorMattermostProvider(opts: MonitorMattermostOpts = {} replyToId: effectiveReplyToId, }); streamMessageId = result.messageId; - lastSentText = rawText; + lastSentText = text; runtime.log?.(`stream-patch started ${streamMessageId}`); } catch (err) { logVerboseMessage(`mattermost stream-patch flush send failed: ${String(err)}`); @@ -1462,7 +1465,7 @@ export async function monitorMattermostProvider(opts: MonitorMattermostOpts = {} postId: streamMessageId, message: text, }); - lastSentText = rawText; + lastSentText = text; runtime.log?.(`stream-patch flushed ${streamMessageId}`); } catch (err) { logVerboseMessage(`mattermost stream-patch flush failed: ${String(err)}`); @@ -1476,9 +1479,12 @@ export async function monitorMattermostProvider(opts: MonitorMattermostOpts = {} if (patchInterval) return; patchInterval = setInterval(() => { const rawText = pendingPatchText; - if (!rawText || rawText === lastSentText || patchSending) return; + if (!rawText || patchSending) return; // Truncate to textLimit so intermediate patches never exceed the server limit. const text = rawText.length > textLimit ? rawText.slice(0, textLimit) : rawText; + // Guard on the truncated text so long replies (past textLimit) do not keep + // re-patching with the same truncated content every 200 ms and hit rate limits. + if (text === lastSentText) return; patchSending = true; void (async () => { try { @@ -1489,7 +1495,7 @@ export async function monitorMattermostProvider(opts: MonitorMattermostOpts = {} replyToId: effectiveReplyToId, }); streamMessageId = result.messageId; - lastSentText = rawText; + lastSentText = text; runtime.log?.(`stream-patch started ${streamMessageId}`); } catch (err) { logVerboseMessage(`mattermost stream-patch send failed: ${String(err)}`); @@ -1500,7 +1506,7 @@ export async function monitorMattermostProvider(opts: MonitorMattermostOpts = {} postId: streamMessageId, message: text, }); - lastSentText = rawText; + lastSentText = text; runtime.log?.(`stream-patch edited ${streamMessageId}`); } catch (err) { logVerboseMessage(`mattermost stream-patch edit failed: ${String(err)}`); @@ -1553,11 +1559,10 @@ export async function monitorMattermostProvider(opts: MonitorMattermostOpts = {} ); const orphanId = streamMessageId; streamMessageId = null; - try { - await deleteMattermostPost(blockStreamingClient!, orphanId); - } catch { - // Ignore delete failure — delivering the complete message takes priority - } + // Deliver the fallback message first. Only delete the orphaned + // stream post after we know the replacement was successfully sent — + // if delivery also fails the user keeps the partial preview rather + // than losing all visible output. await deliverMattermostReplyPayload({ core, cfg, @@ -1573,6 +1578,12 @@ export async function monitorMattermostProvider(opts: MonitorMattermostOpts = {} tableMode, sendMessage: sendMessageMattermost, }); + // Fallback succeeded — now clean up the orphaned partial. + try { + await deleteMattermostPost(blockStreamingClient!, orphanId); + } catch { + // Ignore — the complete message was already delivered. + } return; } // Successful final patch: reset all streaming state. From e1e572b9ca4698f81f8cf79870c2d3420e1bce5f Mon Sep 17 00:00:00 2001 From: teconomix Date: Wed, 18 Mar 2026 14:43:22 +0000 Subject: [PATCH 07/18] fix(mattermost): compute replyTargetDiverged before flush; deliver-before-delete for divergent thread; fix disableBlockStreaming undefined MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Three fixes from latest Codex review: 1. Compute replyTargetDiverged before flushPendingPatch: previously the flush always ran first, potentially creating a preview post under effectiveReplyToId even when the final payload would land in a different thread. Now flush is skipped when the target diverges, avoiding a transient post in the wrong thread. 2. Divergent-thread cleanup order: when replyTargetDiverged, deliver the correct message first and delete the orphaned preview only afterward. This matches the same pattern as the fallback path — if delivery fails, the user keeps the partial preview rather than losing all visible output. 3. disableBlockStreaming: changed fallback from false to undefined so accounts without an explicit blockStreaming setting preserve the agent blockStreamingDefault instead of having block streaming forced on. --- .../mattermost/src/mattermost/monitor.ts | 70 ++++++++++++------- 1 file changed, 44 insertions(+), 26 deletions(-) diff --git a/extensions/mattermost/src/mattermost/monitor.ts b/extensions/mattermost/src/mattermost/monitor.ts index 1948a0ab700..f801e680c3c 100644 --- a/extensions/mattermost/src/mattermost/monitor.ts +++ b/extensions/mattermost/src/mattermost/monitor.ts @@ -1528,23 +1528,24 @@ export async function monitorMattermostProvider(opts: MonitorMattermostOpts = {} deliver: async (payload: ReplyPayload, info) => { const isFinal = info.kind === "final"; - // Flush any pending partial-reply patch before final delivery. - if (isFinal && blockStreamingClient) { + // Compute reply target divergence before flushing, so we don't + // accidentally create a preview post in the wrong thread on flush. + const finalReplyToId = resolveMattermostReplyRootId({ + threadRootId: effectiveReplyToId, + replyToId: payload.replyToId, + }); + const replyTargetDiverged = + finalReplyToId !== effectiveReplyToId && payload.replyToId != null; + + // Flush any pending partial-reply patch before final delivery — + // but only when the reply stays in the same thread as the preview post. + if (isFinal && blockStreamingClient && !replyTargetDiverged) { await flushPendingPatch(); } // Final + streaming active: patch the streamed message with authoritative // complete text, or fall back to a new message (with orphan cleanup). - // If the final payload carries an explicit replyToId that differs from - // the one the streaming post was created under, skip the in-place patch - // and fall through to normal delivery so the reply lands in the right thread. - const finalReplyToId = resolveMattermostReplyRootId({ - threadRootId: effectiveReplyToId, - replyToId: payload.replyToId, - }); - const streamReplyToId = effectiveReplyToId; - const replyTargetDiverged = - finalReplyToId !== streamReplyToId && payload.replyToId != null; + // (When replyTargetDiverged the preview is cleaned up further below.) if (isFinal && streamMessageId && payload.text && !replyTargetDiverged) { const text = core.channel.text.convertMarkdownTables(payload.text, tableMode); try { @@ -1596,22 +1597,39 @@ export async function monitorMattermostProvider(opts: MonitorMattermostOpts = {} if (isFinal) { stopPatchInterval(); - // If the reply target diverged and we have an orphaned stream post, - // attempt to delete it before normal delivery creates the correct post. - if (replyTargetDiverged && streamMessageId) { - const orphanId = streamMessageId; - streamMessageId = null; - try { - await deleteMattermostPost(blockStreamingClient!, orphanId); - } catch { - // Ignore — delivering to the correct thread takes priority. - } - } else { - streamMessageId = null; - } + // Capture and clear the stream ID so normal delivery below can proceed. + // If the reply target diverged we hold the orphan ID and delete it + // *after* the replacement message is successfully sent (see below). + const orphanedStreamId = replyTargetDiverged ? streamMessageId : null; + streamMessageId = null; pendingPatchText = ""; lastSentText = ""; patchSending = false; + + if (!orphanedStreamId) { + // No divergence — fall through to normal delivery. + } else { + // Divergent target: deliver to the correct thread first, then clean + // up the orphan. If delivery fails the user keeps the partial preview. + await deliverMattermostReplyPayload({ + core, + cfg, + payload, + to, + accountId: account.accountId, + agentId: route.agentId, + replyToId: finalReplyToId, + textLimit, + tableMode, + sendMessage: sendMessageMattermost, + }); + try { + await deleteMattermostPost(blockStreamingClient!, orphanedStreamId); + } catch { + // Ignore — the complete message was already delivered. + } + return; + } } // Normal delivery — streaming not active or non-final partial. @@ -1664,7 +1682,7 @@ export async function monitorMattermostProvider(opts: MonitorMattermostOpts = {} ? true : typeof account.blockStreaming === "boolean" ? !account.blockStreaming - : false, + : undefined, onModelSelected, }, }), From 59db1c2b67ced7bac1507a2d0020d558c6b190f4 Mon Sep 17 00:00:00 2001 From: teconomix Date: Thu, 19 Mar 2026 20:17:51 +0000 Subject: [PATCH 08/18] fix(mattermost): deliver media-only final before deleting streamed preview When the final payload has no text (media-only), the in-place text patch branch is skipped, but the streamed preview post was previously left in the channel alongside the attachment. The orphanedStreamId capture now always holds the stream post ID (not only on divergent-target paths), and a new branch delivers the media payload first and deletes the stale preview only after delivery succeeds. If delivery fails, the preview stays visible as a fallback. --- .../mattermost/src/mattermost/monitor.ts | 33 +++++++++++++++---- 1 file changed, 27 insertions(+), 6 deletions(-) diff --git a/extensions/mattermost/src/mattermost/monitor.ts b/extensions/mattermost/src/mattermost/monitor.ts index f801e680c3c..03f11673d7c 100644 --- a/extensions/mattermost/src/mattermost/monitor.ts +++ b/extensions/mattermost/src/mattermost/monitor.ts @@ -1598,17 +1598,13 @@ export async function monitorMattermostProvider(opts: MonitorMattermostOpts = {} if (isFinal) { stopPatchInterval(); // Capture and clear the stream ID so normal delivery below can proceed. - // If the reply target diverged we hold the orphan ID and delete it - // *after* the replacement message is successfully sent (see below). - const orphanedStreamId = replyTargetDiverged ? streamMessageId : null; + const orphanedStreamId = streamMessageId; streamMessageId = null; pendingPatchText = ""; lastSentText = ""; patchSending = false; - if (!orphanedStreamId) { - // No divergence — fall through to normal delivery. - } else { + if (replyTargetDiverged && orphanedStreamId) { // Divergent target: deliver to the correct thread first, then clean // up the orphan. If delivery fails the user keeps the partial preview. await deliverMattermostReplyPayload({ @@ -1630,6 +1626,31 @@ export async function monitorMattermostProvider(opts: MonitorMattermostOpts = {} } return; } + + if (orphanedStreamId && !payload.text) { + // Media-only final with a streamed text-preview: deliver media first, + // then delete the orphaned preview so users don't see stale partial + // text alongside the attachment. If delivery fails, the preview stays. + await deliverMattermostReplyPayload({ + core, + cfg, + payload, + to, + accountId: account.accountId, + agentId: route.agentId, + replyToId: finalReplyToId, + textLimit, + tableMode, + sendMessage: sendMessageMattermost, + }); + try { + await deleteMattermostPost(blockStreamingClient!, orphanedStreamId); + } catch { + // Ignore — media was already delivered. + } + return; + } + // Otherwise fall through to normal delivery. } // Normal delivery — streaming not active or non-final partial. From 38f3e76f979a060b2d27777c3ccee9602715ee38 Mon Sep 17 00:00:00 2001 From: teconomix Date: Fri, 20 Mar 2026 03:53:16 +0000 Subject: [PATCH 09/18] fix(mattermost): wait for in-flight preview send before divergent delivery; stop retrying failed patches Two fixes: 1. Divergent-target flush (Codex ID=2962544342): when replyTargetDiverged is true the flush is skipped to avoid creating a preview in the wrong thread, but the patch interval was not stopped and any in-flight first sendMessageMattermost was not awaited. If that send resolved after the divergent delivery returned, it created a stray preview post with no cleanup path. Fix: always stop the interval and wait for patchSending to settle (up to 2s) even on the divergent path, so streamMessageId is populated if the send resolves during this window and the orphan cleanup below can capture and delete it. 2. Patch-failure retry storm (Codex ID=2962544347): after a patchMattermostPost failure in the schedulePatch interval, streamMessageId remained set and every subsequent 200ms tick retried the same failing request, spamming the API until final delivery. Fix: call stopPatchInterval() in the catch block so retries stop immediately. The preview stays frozen at its last successful text; deliver() will patch or replace it when the final reply arrives. --- .../mattermost/src/mattermost/monitor.ts | 24 +++++++++++++++---- 1 file changed, 20 insertions(+), 4 deletions(-) diff --git a/extensions/mattermost/src/mattermost/monitor.ts b/extensions/mattermost/src/mattermost/monitor.ts index 03f11673d7c..f22bcd7adf1 100644 --- a/extensions/mattermost/src/mattermost/monitor.ts +++ b/extensions/mattermost/src/mattermost/monitor.ts @@ -1510,6 +1510,10 @@ export async function monitorMattermostProvider(opts: MonitorMattermostOpts = {} runtime.log?.(`stream-patch edited ${streamMessageId}`); } catch (err) { logVerboseMessage(`mattermost stream-patch edit failed: ${String(err)}`); + // Stop retrying on patch failure — the post may not be editable + // (missing edit_post permission, deleted, etc.). The preview stays + // frozen; final delivery will patch or replace it via deliver(). + stopPatchInterval(); } } } finally { @@ -1537,10 +1541,22 @@ export async function monitorMattermostProvider(opts: MonitorMattermostOpts = {} const replyTargetDiverged = finalReplyToId !== effectiveReplyToId && payload.replyToId != null; - // Flush any pending partial-reply patch before final delivery — - // but only when the reply stays in the same thread as the preview post. - if (isFinal && blockStreamingClient && !replyTargetDiverged) { - await flushPendingPatch(); + if (isFinal && blockStreamingClient) { + if (replyTargetDiverged) { + // Divergent target: don't flush (we don't want to create a preview in + // the wrong thread), but do stop the interval and wait for any in-flight + // tick/send to settle. This ensures streamMessageId is populated if the + // first sendMessageMattermost resolves during this window, so the orphan + // cleanup below can capture and delete it. + stopPatchInterval(); + const deadline = Date.now() + 2000; + while (patchSending && Date.now() < deadline) { + await new Promise((r) => setTimeout(r, 20)); + } + } else { + // Same thread: flush pending patches normally. + await flushPendingPatch(); + } } // Final + streaming active: patch the streamed message with authoritative From e2c4d8bd45bea1a3be3fd12fa9ca0cf29f353f9e Mon Sep 17 00:00:00 2001 From: teconomix Date: Fri, 20 Mar 2026 06:03:16 +0000 Subject: [PATCH 10/18] fix(mattermost): clean up streaming preview when no final payload arrives; stop retrying failed initial sends MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Two fixes: 1. onSettled orphan cleanup (Codex ID=2963834802): add cleanup in the streaming dispatcher's onSettled callback for cases where the reply pipeline produces no final payload — e.g. messaging-tool sends suppressed by agent-runner-payloads.ts, or empty/heartbeat responses. Without this, onPartialReply could create a preview post that is never finalized or deleted. The cleanup mirrors the existing logic in #43041 (P5). 2. Initial-send retry storm (Codex ID=2963834806): call stopPatchInterval() in the sendMessageMattermost catch block, mirroring the existing fix for patchMattermostPost failures. Without this, a failed initial post attempt (missing post permission, DM creation failure, etc.) causes the 200ms interval to retry indefinitely for the rest of the response, flooding the API and gateway logs. --- .../mattermost/src/mattermost/monitor.ts | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/extensions/mattermost/src/mattermost/monitor.ts b/extensions/mattermost/src/mattermost/monitor.ts index f22bcd7adf1..391ccb890e9 100644 --- a/extensions/mattermost/src/mattermost/monitor.ts +++ b/extensions/mattermost/src/mattermost/monitor.ts @@ -1499,6 +1499,10 @@ export async function monitorMattermostProvider(opts: MonitorMattermostOpts = {} runtime.log?.(`stream-patch started ${streamMessageId}`); } catch (err) { logVerboseMessage(`mattermost stream-patch send failed: ${String(err)}`); + // Stop retrying on initial-send failure (e.g. missing post permission, + // DM-channel creation failure). Without this the interval keeps firing + // every 200 ms and flooding the API/logs for the rest of the response. + stopPatchInterval(); } } else { try { @@ -1696,6 +1700,21 @@ export async function monitorMattermostProvider(opts: MonitorMattermostOpts = {} dispatcher, onSettled: () => { markDispatchIdle(); + // Clean up any streaming preview that was never finalized — this happens when + // the reply pipeline produces no final payload (e.g. messaging-tool sends that + // are suppressed, or empty/heartbeat responses). Without this, onPartialReply + // can create a Mattermost post that is never deleted or patched with final text. + if (streamMessageId && blockStreamingClient) { + stopPatchInterval(); + const orphanId = streamMessageId; + streamMessageId = null; + pendingPatchText = ""; + lastSentText = ""; + patchSending = false; + void deleteMattermostPost(blockStreamingClient, orphanId).catch(() => { + // Best-effort — the run is already complete. + }); + } }, run: () => core.channel.reply.dispatchReplyFromConfig({ From 0ecdda1433e0694f33a5907e8289629ac650ba63 Mon Sep 17 00:00:00 2001 From: teconomix Date: Fri, 20 Mar 2026 07:15:22 +0000 Subject: [PATCH 11/18] fix(mattermost): wait for in-flight preview send in onSettled cleanup MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When the first preview POST is still in flight (patchSending=true, streamMessageId=null), the previous onSettled check was skipped entirely — the POST would resolve after cleanup and leave an orphaned preview post with no interval to clear it. Fix: trigger cleanup when either streamMessageId is set OR patchSending is true. Stop the interval immediately, clear pending state, then wait up to 3s for patchSending to clear before capturing the final streamMessageId and deleting the post. --- .../mattermost/src/mattermost/monitor.ts | 29 ++++++++++++++----- 1 file changed, 22 insertions(+), 7 deletions(-) diff --git a/extensions/mattermost/src/mattermost/monitor.ts b/extensions/mattermost/src/mattermost/monitor.ts index 391ccb890e9..299470bff1f 100644 --- a/extensions/mattermost/src/mattermost/monitor.ts +++ b/extensions/mattermost/src/mattermost/monitor.ts @@ -1704,16 +1704,31 @@ export async function monitorMattermostProvider(opts: MonitorMattermostOpts = {} // the reply pipeline produces no final payload (e.g. messaging-tool sends that // are suppressed, or empty/heartbeat responses). Without this, onPartialReply // can create a Mattermost post that is never deleted or patched with final text. - if (streamMessageId && blockStreamingClient) { + // + // We must also handle the race where the first preview POST is still in flight + // (patchSending=true, streamMessageId=null): stopPatchInterval() prevents new + // ticks, and the async cleanup waits for patchSending to clear so it can capture + // the messageId that the in-flight send will set. + if ((streamMessageId || patchSending) && blockStreamingClient) { stopPatchInterval(); - const orphanId = streamMessageId; - streamMessageId = null; pendingPatchText = ""; lastSentText = ""; - patchSending = false; - void deleteMattermostPost(blockStreamingClient, orphanId).catch(() => { - // Best-effort — the run is already complete. - }); + const client = blockStreamingClient; + void (async () => { + // Wait for any in-flight send/patch to settle so we get the final messageId. + const deadline = Date.now() + 3000; + while (patchSending && Date.now() < deadline) { + await new Promise((r) => setTimeout(r, 50)); + } + patchSending = false; + const orphanId = streamMessageId; + streamMessageId = null; + if (orphanId) { + await deleteMattermostPost(client, orphanId).catch(() => { + // Best-effort — the run is already complete. + }); + } + })(); } }, run: () => From 374c92947afb66623682168ef31aea0f2a07ebbd Mon Sep 17 00:00:00 2001 From: teconomix Date: Fri, 20 Mar 2026 08:21:53 +0000 Subject: [PATCH 12/18] fix(mattermost): latch send failures; include patchInterval in onSettled cleanup MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Two fixes: 1. Failure latch (Codex ID=2964357928): add previewSendFailed boolean that is set true in both the initial-send and patch-edit catch blocks (alongside stopPatchInterval). schedulePatch() checks it before re-arming the interval, so subsequent onPartialReply calls during a run with a permanent failure (missing permission, DM-creation error) no longer recreate the timer and retry indefinitely. 2. patchInterval in onSettled guard (Codex ID=2964357925): onSettled now triggers cleanup when patchInterval is non-null, even if streamMessageId and patchSending are still falsy. This covers the window between schedulePatch arming the interval and the first 200ms tick flipping patchSending — if the run ends in that window (same-target messaging-tool sends, empty/heartbeat replies), the interval is now stopped and the pending state is cleared. --- .../mattermost/src/mattermost/monitor.ts | 19 ++++++++++++------- 1 file changed, 12 insertions(+), 7 deletions(-) diff --git a/extensions/mattermost/src/mattermost/monitor.ts b/extensions/mattermost/src/mattermost/monitor.ts index 299470bff1f..cdddf595c1e 100644 --- a/extensions/mattermost/src/mattermost/monitor.ts +++ b/extensions/mattermost/src/mattermost/monitor.ts @@ -1409,6 +1409,9 @@ export async function monitorMattermostProvider(opts: MonitorMattermostOpts = {} let lastSentText = ""; let patchInterval: ReturnType | null = null; let patchSending = false; // prevents concurrent network calls + // Latches true after the first send/edit failure to prevent the interval + // from being re-armed by a later onPartialReply call (ID=2964357928). + let previewSendFailed = false; const STREAM_PATCH_INTERVAL_MS = 200; // Edit-in-place streaming is opt-in: only activate when blockStreaming is @@ -1475,6 +1478,8 @@ export async function monitorMattermostProvider(opts: MonitorMattermostOpts = {} const schedulePatch = (fullText: string) => { if (!blockStreamingClient) return; + // Do not re-arm if a permanent send/edit failure has been latched. + if (previewSendFailed) return; pendingPatchText = fullText; if (patchInterval) return; patchInterval = setInterval(() => { @@ -1499,9 +1504,9 @@ export async function monitorMattermostProvider(opts: MonitorMattermostOpts = {} runtime.log?.(`stream-patch started ${streamMessageId}`); } catch (err) { logVerboseMessage(`mattermost stream-patch send failed: ${String(err)}`); - // Stop retrying on initial-send failure (e.g. missing post permission, - // DM-channel creation failure). Without this the interval keeps firing - // every 200 ms and flooding the API/logs for the rest of the response. + // Latch the failure so schedulePatch() does not re-arm the interval + // on subsequent onPartialReply calls (which would retry indefinitely). + previewSendFailed = true; stopPatchInterval(); } } else { @@ -1514,9 +1519,9 @@ export async function monitorMattermostProvider(opts: MonitorMattermostOpts = {} runtime.log?.(`stream-patch edited ${streamMessageId}`); } catch (err) { logVerboseMessage(`mattermost stream-patch edit failed: ${String(err)}`); - // Stop retrying on patch failure — the post may not be editable - // (missing edit_post permission, deleted, etc.). The preview stays - // frozen; final delivery will patch or replace it via deliver(). + // Latch the failure so schedulePatch() does not re-arm the interval + // on subsequent onPartialReply calls. + previewSendFailed = true; stopPatchInterval(); } } @@ -1709,7 +1714,7 @@ export async function monitorMattermostProvider(opts: MonitorMattermostOpts = {} // (patchSending=true, streamMessageId=null): stopPatchInterval() prevents new // ticks, and the async cleanup waits for patchSending to clear so it can capture // the messageId that the in-flight send will set. - if ((streamMessageId || patchSending) && blockStreamingClient) { + if ((streamMessageId || patchSending || patchInterval) && blockStreamingClient) { stopPatchInterval(); pendingPatchText = ""; lastSentText = ""; From dff71545a617e27f37fbac47a42cf11e4716f07e Mon Sep 17 00:00:00 2001 From: teconomix Date: Fri, 20 Mar 2026 10:21:19 +0000 Subject: [PATCH 13/18] fix(mattermost): track patchInflight in P4; use direct await in onSettled cleanup Add patchInflight Promise tracking to P4 (feat/mattermost-block-streaming-rebased), mirroring the existing P5 approach. The onSettled cleanup previously used a 3-second busy-wait on patchSending, which would race on slow Mattermost links: if the first preview POST takes longer than 3s the cleanup exits early, patchSending is forced false, and when the POST later resolves it creates an orphan post that is never deleted. Fix: track the interval tick's async function as patchInflight. onSettled awaits it directly so the cleanup always captures the final streamMessageId, regardless of how long the POST takes. (Codex ID=2964616785) --- .../mattermost/src/mattermost/monitor.ts | 21 +++++++++++++------ 1 file changed, 15 insertions(+), 6 deletions(-) diff --git a/extensions/mattermost/src/mattermost/monitor.ts b/extensions/mattermost/src/mattermost/monitor.ts index cdddf595c1e..ef629a61f60 100644 --- a/extensions/mattermost/src/mattermost/monitor.ts +++ b/extensions/mattermost/src/mattermost/monitor.ts @@ -1409,6 +1409,9 @@ export async function monitorMattermostProvider(opts: MonitorMattermostOpts = {} let lastSentText = ""; let patchInterval: ReturnType | null = null; let patchSending = false; // prevents concurrent network calls + // Tracks the currently in-flight sendMessageMattermost / patchMattermostPost + // promise so that onSettled can await it directly rather than busy-waiting. + let patchInflight: Promise | null = null; // Latches true after the first send/edit failure to prevent the interval // from being re-armed by a later onPartialReply call (ID=2964357928). let previewSendFailed = false; @@ -1491,7 +1494,7 @@ export async function monitorMattermostProvider(opts: MonitorMattermostOpts = {} // re-patching with the same truncated content every 200 ms and hit rate limits. if (text === lastSentText) return; patchSending = true; - void (async () => { + const runTick = async () => { try { if (!streamMessageId) { try { @@ -1528,7 +1531,13 @@ export async function monitorMattermostProvider(opts: MonitorMattermostOpts = {} } finally { patchSending = false; } - })(); + }; + const inflightRun = runTick(); + patchInflight = inflightRun; + inflightRun.finally(() => { + if (patchInflight === inflightRun) patchInflight = null; + }); + void inflightRun; }, STREAM_PATCH_INTERVAL_MS); }; // ── End P4 streaming state ──────────────────────────────────────────────── @@ -1720,10 +1729,10 @@ export async function monitorMattermostProvider(opts: MonitorMattermostOpts = {} lastSentText = ""; const client = blockStreamingClient; void (async () => { - // Wait for any in-flight send/patch to settle so we get the final messageId. - const deadline = Date.now() + 3000; - while (patchSending && Date.now() < deadline) { - await new Promise((r) => setTimeout(r, 50)); + // Await the in-flight promise directly so we never miss a late-resolving + // POST — a 3s timeout would race on slow Mattermost links (ID=2964616785). + if (patchInflight) { + await patchInflight.catch(() => {}); } patchSending = false; const orphanId = streamMessageId; From f49c6a402c9305fd8ce5bc9232823b7e144b6ab2 Mon Sep 17 00:00:00 2001 From: teconomix Date: Fri, 20 Mar 2026 10:38:24 +0000 Subject: [PATCH 14/18] fix(mattermost): deliver media attachments after successful final patch (ID=2965023940) After a successful patchMattermostPost in the isFinal branch, the code returned immediately without delivering any media attachments. deliverMattermostReplyPayload is the only path that uploads/sends media, so caption+image/file/audio payloads were silently dropping the attachment whenever streaming was active and the patch succeeded. Fix: after a successful patch, check whether the payload has mediaUrls/mediaUrl. If so, call deliverMattermostReplyPayload with text=undefined to deliver only the media through the normal attachment path. --- .../mattermost/src/mattermost/monitor.ts | 20 +++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/extensions/mattermost/src/mattermost/monitor.ts b/extensions/mattermost/src/mattermost/monitor.ts index ef629a61f60..bea27a5d27c 100644 --- a/extensions/mattermost/src/mattermost/monitor.ts +++ b/extensions/mattermost/src/mattermost/monitor.ts @@ -1626,6 +1626,26 @@ export async function monitorMattermostProvider(opts: MonitorMattermostOpts = {} pendingPatchText = ""; lastSentText = ""; patchSending = false; + // If the payload also has media attachments, deliver them now via the + // normal path. The patch only updates text; deliverMattermostReplyPayload + // is the only code that actually uploads/sends media (ID=2965023940). + if (payload.mediaUrls?.length || payload.mediaUrl) { + await deliverMattermostReplyPayload({ + core, + cfg, + payload: { ...payload, text: undefined }, + to, + accountId: account.accountId, + agentId: route.agentId, + replyToId: resolveMattermostReplyRootId({ + threadRootId: effectiveReplyToId, + replyToId: payload.replyToId, + }), + textLimit, + tableMode, + sendMessage: sendMessageMattermost, + }); + } return; } From 6bea6df33fbe3f6183c9d83ee4285e6521b40fcc Mon Sep 17 00:00:00 2001 From: teconomix Date: Fri, 20 Mar 2026 11:15:46 +0000 Subject: [PATCH 15/18] fix(mattermost): keep text+media together; avoid split captioned posts (ID=2965096969) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Patching preview text then delivering media separately splits a captioned-file reply into two posts: a text-only preview + captionless file attachment. New logic in the isFinal branch: - Text-only payload: patch in place as before (no change for common case) - Media payload: skip the patch, reset state, deliver full payload via deliverMattermostReplyPayload (text+media together), then delete preview. - Patch failure: same fallback as media payload — full delivery + delete. --- .../mattermost/src/mattermost/monitor.ts | 103 ++++++++---------- 1 file changed, 46 insertions(+), 57 deletions(-) diff --git a/extensions/mattermost/src/mattermost/monitor.ts b/extensions/mattermost/src/mattermost/monitor.ts index bea27a5d27c..20d3552bb05 100644 --- a/extensions/mattermost/src/mattermost/monitor.ts +++ b/extensions/mattermost/src/mattermost/monitor.ts @@ -1581,70 +1581,59 @@ export async function monitorMattermostProvider(opts: MonitorMattermostOpts = {} // complete text, or fall back to a new message (with orphan cleanup). // (When replyTargetDiverged the preview is cleaned up further below.) if (isFinal && streamMessageId && payload.text && !replyTargetDiverged) { - const text = core.channel.text.convertMarkdownTables(payload.text, tableMode); - try { - await patchMattermostPost(blockStreamingClient!, { - postId: streamMessageId, - message: text, - }); - runtime.log?.(`stream-patch final edit ${streamMessageId}`); - } catch (err) { - logVerboseMessage( - `mattermost stream-patch final edit failed: ${String(err)}, sending new message`, - ); - const orphanId = streamMessageId; - streamMessageId = null; - // Deliver the fallback message first. Only delete the orphaned - // stream post after we know the replacement was successfully sent — - // if delivery also fails the user keeps the partial preview rather - // than losing all visible output. - await deliverMattermostReplyPayload({ - core, - cfg, - payload, - to, - accountId: account.accountId, - agentId: route.agentId, - replyToId: resolveMattermostReplyRootId({ - threadRootId: effectiveReplyToId, - replyToId: payload.replyToId, - }), - textLimit, - tableMode, - sendMessage: sendMessageMattermost, - }); - // Fallback succeeded — now clean up the orphaned partial. + const hasMedia = payload.mediaUrls?.length || payload.mediaUrl; + // When the payload also has media, skip the in-place patch: patching + // the preview with only the text and then posting captionless media + // separately splits a single captioned-file reply into two posts. + // Fall through to deliverMattermostReplyPayload which sends + // text+media together in the correct Mattermost format (ID=2965096969). + if (!hasMedia) { + const text = core.channel.text.convertMarkdownTables(payload.text, tableMode); try { - await deleteMattermostPost(blockStreamingClient!, orphanId); - } catch { - // Ignore — the complete message was already delivered. + await patchMattermostPost(blockStreamingClient!, { + postId: streamMessageId, + message: text, + }); + runtime.log?.(`stream-patch final edit ${streamMessageId}`); + // Successful text-only patch. Reset streaming state and return. + streamMessageId = null; + pendingPatchText = ""; + lastSentText = ""; + patchSending = false; + return; + } catch (err) { + logVerboseMessage( + `mattermost stream-patch final edit failed: ${String(err)}, sending new message`, + ); + // Fall through to deliverMattermostReplyPayload below. } - return; } - // Successful final patch: reset all streaming state. + // Media payload or patch failure: deliver the full payload via the + // normal path (handles text+media together, chunking, etc.). + // Delete the preview only after successful delivery. + const orphanId = streamMessageId; streamMessageId = null; pendingPatchText = ""; lastSentText = ""; patchSending = false; - // If the payload also has media attachments, deliver them now via the - // normal path. The patch only updates text; deliverMattermostReplyPayload - // is the only code that actually uploads/sends media (ID=2965023940). - if (payload.mediaUrls?.length || payload.mediaUrl) { - await deliverMattermostReplyPayload({ - core, - cfg, - payload: { ...payload, text: undefined }, - to, - accountId: account.accountId, - agentId: route.agentId, - replyToId: resolveMattermostReplyRootId({ - threadRootId: effectiveReplyToId, - replyToId: payload.replyToId, - }), - textLimit, - tableMode, - sendMessage: sendMessageMattermost, - }); + await deliverMattermostReplyPayload({ + core, + cfg, + payload, + to, + accountId: account.accountId, + agentId: route.agentId, + replyToId: resolveMattermostReplyRootId({ + threadRootId: effectiveReplyToId, + replyToId: payload.replyToId, + }), + textLimit, + tableMode, + sendMessage: sendMessageMattermost, + }); + // Delivery succeeded — clean up the orphaned preview. + if (orphanId) { + void deleteMattermostPost(blockStreamingClient!, orphanId).catch(() => {}); } return; } From 75b79f6f59e919657e0da70094fa6843e10937a0 Mon Sep 17 00:00:00 2001 From: teconomix Date: Fri, 20 Mar 2026 11:39:22 +0000 Subject: [PATCH 16/18] fix(mattermost): use patchInflight in flushPendingPatch and divergent-target wait flushPendingPatch and the divergent-target branch were still using a 2-second busy-wait on patchSending, which has the same race as the onSettled wait that was already fixed: patchSending clears in the finally block before the network request actually settles. Both paths now await patchInflight directly. (ID=2965256849) --- .../mattermost/src/mattermost/monitor.ts | 18 +++++++----------- 1 file changed, 7 insertions(+), 11 deletions(-) diff --git a/extensions/mattermost/src/mattermost/monitor.ts b/extensions/mattermost/src/mattermost/monitor.ts index 20d3552bb05..44bb6bce94e 100644 --- a/extensions/mattermost/src/mattermost/monitor.ts +++ b/extensions/mattermost/src/mattermost/monitor.ts @@ -1436,14 +1436,11 @@ export async function monitorMattermostProvider(opts: MonitorMattermostOpts = {} const flushPendingPatch = async () => { stopPatchInterval(); if (!blockStreamingClient) return; - // Wait for any in-flight interval tick to settle before flushing. - // Without this, an interval tick that set lastSentText synchronously but hasn't - // completed the async send yet would cause flushPendingPatch to exit early - // (text === lastSentText guard), leaving streamMessageId null and causing - // final delivery to fall through to a new post instead of patching in place. - const deadline = Date.now() + 2000; - while (patchSending && Date.now() < deadline) { - await new Promise((r) => setTimeout(r, 20)); + // Await the in-flight promise directly so we never miss a late-resolving + // POST/PATCH — the busy-wait on patchSending had a race where patchSending + // could clear before the network request settled (ID=2965256849). + if (patchInflight) { + await patchInflight.catch(() => {}); } const rawText = pendingPatchText; if (!rawText) return; @@ -1567,9 +1564,8 @@ export async function monitorMattermostProvider(opts: MonitorMattermostOpts = {} // first sendMessageMattermost resolves during this window, so the orphan // cleanup below can capture and delete it. stopPatchInterval(); - const deadline = Date.now() + 2000; - while (patchSending && Date.now() < deadline) { - await new Promise((r) => setTimeout(r, 20)); + if (patchInflight) { + await patchInflight.catch(() => {}); } } else { // Same thread: flush pending patches normally. From 5aa16edfbb8fe96539dc92b1bd766ed2456c67a2 Mon Sep 17 00:00:00 2001 From: teconomix Date: Fri, 20 Mar 2026 12:16:26 +0000 Subject: [PATCH 17/18] fix(mattermost): detect reply target divergence correctly (ID=2965349638) resolveMattermostReplyRootId always returns threadRootId when it is set, so comparing finalReplyToId against effectiveReplyToId was always false when the streaming preview was created in a thread. Explicit reply directives like [[reply_to:...]] could therefore never trigger the divergent-target path. Fix: compare payload.replyToId directly against effectiveReplyToId instead of going through the resolver. The resolver is still used to compute finalReplyToId for the actual delivery call. --- extensions/mattermost/src/mattermost/monitor.ts | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/extensions/mattermost/src/mattermost/monitor.ts b/extensions/mattermost/src/mattermost/monitor.ts index 44bb6bce94e..a14d6d1ebba 100644 --- a/extensions/mattermost/src/mattermost/monitor.ts +++ b/extensions/mattermost/src/mattermost/monitor.ts @@ -1549,12 +1549,21 @@ export async function monitorMattermostProvider(opts: MonitorMattermostOpts = {} // Compute reply target divergence before flushing, so we don't // accidentally create a preview post in the wrong thread on flush. + // Compute the reply target for this payload. When payload.replyToId is set + // and resolves to a different thread than the one the streaming preview was + // created in (effectiveReplyToId), we must not patch the preview in-place. + // We compare the raw payload.replyToId against effectiveReplyToId directly + // instead of going through resolveMattermostReplyRootId(), because that + // helper always returns threadRootId when set, making the comparison always + // false when effectiveReplyToId is non-empty (ID=2965349638). const finalReplyToId = resolveMattermostReplyRootId({ threadRootId: effectiveReplyToId, replyToId: payload.replyToId, }); const replyTargetDiverged = - finalReplyToId !== effectiveReplyToId && payload.replyToId != null; + payload.replyToId != null && + payload.replyToId.trim() !== "" && + payload.replyToId.trim() !== effectiveReplyToId; if (isFinal && blockStreamingClient) { if (replyTargetDiverged) { From fe8d0d5a9b8f7a8980947c36ab5e549ef78dc7da Mon Sep 17 00:00:00 2001 From: teconomix Date: Fri, 20 Mar 2026 16:39:27 +0000 Subject: [PATCH 18/18] fix(mattermost): normalize both sides before divergence check (ID=2965488514) [[reply_to_current]] sets payload.replyToId to the child-post id inside the thread, not the thread root. The raw !== comparison triggered false divergence: child-post != thread-root, even though they both resolve to the same Mattermost thread. Fix: compute baseReplyToId via resolveMattermostReplyRootId without the payload, then compare finalReplyToId against baseReplyToId. Both paths normalize child-post ids to the thread root before comparison, so [[reply_to_current]] and explicit child-post targets no longer falsely trigger the divergent-target path. --- .../mattermost/src/mattermost/monitor.ts | 21 ++++++++++++------- 1 file changed, 13 insertions(+), 8 deletions(-) diff --git a/extensions/mattermost/src/mattermost/monitor.ts b/extensions/mattermost/src/mattermost/monitor.ts index a14d6d1ebba..853021b2c36 100644 --- a/extensions/mattermost/src/mattermost/monitor.ts +++ b/extensions/mattermost/src/mattermost/monitor.ts @@ -1549,21 +1549,26 @@ export async function monitorMattermostProvider(opts: MonitorMattermostOpts = {} // Compute reply target divergence before flushing, so we don't // accidentally create a preview post in the wrong thread on flush. - // Compute the reply target for this payload. When payload.replyToId is set - // and resolves to a different thread than the one the streaming preview was - // created in (effectiveReplyToId), we must not patch the preview in-place. - // We compare the raw payload.replyToId against effectiveReplyToId directly - // instead of going through resolveMattermostReplyRootId(), because that - // helper always returns threadRootId when set, making the comparison always - // false when effectiveReplyToId is non-empty (ID=2965349638). + // Compute the reply target for this payload. When payload.replyToId resolves + // to a different Mattermost thread root than effectiveReplyToId, we must not + // patch the preview in-place (different thread). + // + // Both sides go through resolveMattermostReplyRootId so that child-post IDs + // (e.g. from [[reply_to_current]] inside a thread) are normalized to the same + // thread root as effectiveReplyToId before comparing. Raw payload.replyToId + // comparison was wrong: [[reply_to_current]] sets a child-post id that differs + // from the thread root but resolves to the same thread (ID=2965488514). const finalReplyToId = resolveMattermostReplyRootId({ threadRootId: effectiveReplyToId, replyToId: payload.replyToId, }); + const baseReplyToId = resolveMattermostReplyRootId({ + threadRootId: effectiveReplyToId, + }); const replyTargetDiverged = payload.replyToId != null && payload.replyToId.trim() !== "" && - payload.replyToId.trim() !== effectiveReplyToId; + finalReplyToId !== baseReplyToId; if (isFinal && blockStreamingClient) { if (replyTargetDiverged) {