diff --git a/extensions/mattermost/src/mattermost/client.ts b/extensions/mattermost/src/mattermost/client.ts index c514160590f..9bbf4629468 100644 --- a/extensions/mattermost/src/mattermost/client.ts +++ b/extensions/mattermost/src/mattermost/client.ts @@ -533,3 +533,48 @@ export async function uploadMattermostFile( } return info; } + +/** + * Update an existing Mattermost post (partial patch). + * Requires edit_post (own) or edit_others_posts permission. + */ +export async function patchMattermostPost( + client: MattermostClient, + params: { postId: string; message: string }, +): Promise { + const res = await fetch(`${client.apiBaseUrl}/posts/${params.postId}/patch`, { + method: "PUT", + headers: { + Authorization: `Bearer ${client.token}`, + "Content-Type": "application/json", + }, + body: JSON.stringify({ message: params.message }), + }); + if (!res.ok) { + const detail = await readMattermostError(res); + throw new Error( + `Mattermost patch post ${res.status} ${res.statusText}: ${detail || "unknown error"}`, + ); + } +} + +/** + * Delete a Mattermost post (soft delete). + */ +export async function deleteMattermostPost( + client: MattermostClient, + postId: string, +): Promise { + const res = await fetch(`${client.apiBaseUrl}/posts/${postId}`, { + method: "DELETE", + headers: { + Authorization: `Bearer ${client.token}`, + }, + }); + if (!res.ok) { + const detail = await readMattermostError(res); + throw new Error( + `Mattermost delete post ${res.status} ${res.statusText}: ${detail || "unknown error"}`, + ); + } +} diff --git a/extensions/mattermost/src/mattermost/monitor.ts b/extensions/mattermost/src/mattermost/monitor.ts index 4cd74216811..b187e0cd44b 100644 --- a/extensions/mattermost/src/mattermost/monitor.ts +++ b/extensions/mattermost/src/mattermost/monitor.ts @@ -36,7 +36,10 @@ import { fetchMattermostChannel, fetchMattermostMe, fetchMattermostUser, + fetchMattermostUserTeams, + deleteMattermostPost, normalizeMattermostBaseUrl, + patchMattermostPost, sendMattermostTyping, updateMattermostPost, type MattermostChannel, @@ -515,7 +518,7 @@ export async function monitorMattermostProvider(opts: MonitorMattermostOpts = {} replyOptions: { ...replyOptions, disableBlockStreaming: - typeof account.blockStreaming === "boolean" ? !account.blockStreaming : undefined, + typeof account.blockStreaming === "boolean" ? !account.blockStreaming : false, onModelSelected, }, }); @@ -728,7 +731,7 @@ export async function monitorMattermostProvider(opts: MonitorMattermostOpts = {} replyOptions: { ...replyOptions, disableBlockStreaming: - typeof account.blockStreaming === "boolean" ? !account.blockStreaming : undefined, + typeof account.blockStreaming === "boolean" ? !account.blockStreaming : false, onModelSelected, }, }), @@ -1398,12 +1401,164 @@ export async function monitorMattermostProvider(opts: MonitorMattermostOpts = {} }); }, }); + // ── P4: Edit-in-place streaming state ────────────────────────────────────── + // Uses onPartialReply (full cumulative text each tick) + 200ms throttled + // setInterval to progressively edit a single Mattermost post in-place. + let streamMessageId: string | null = null; + let pendingPatchText = ""; + let lastSentText = ""; + let patchInterval: ReturnType | null = null; + let patchSending = false; // prevents concurrent network calls + const STREAM_PATCH_INTERVAL_MS = 200; + + const streamingEnabled = account.blockStreaming !== false; + const blockStreamingClient = + streamingEnabled && baseUrl && botToken + ? createMattermostClient({ baseUrl, botToken }) + : null; + + const stopPatchInterval = () => { + if (patchInterval) { + clearInterval(patchInterval); + patchInterval = null; + } + }; + + const flushPendingPatch = async () => { + stopPatchInterval(); + if (!blockStreamingClient) return; + const text = pendingPatchText; + if (!text || text === lastSentText) return; + lastSentText = text; + if (!streamMessageId) { + try { + const result = await sendMessageMattermost(to, text, { + accountId: account.accountId, + replyToId: effectiveReplyToId, + }); + streamMessageId = result.messageId; + runtime.log?.(`stream-patch started ${streamMessageId}`); + } catch (err) { + logVerboseMessage(`mattermost stream-patch flush send failed: ${String(err)}`); + } + } else { + try { + await patchMattermostPost(blockStreamingClient, { + postId: streamMessageId, + message: text, + }); + runtime.log?.(`stream-patch flushed ${streamMessageId}`); + } catch (err) { + logVerboseMessage(`mattermost stream-patch flush failed: ${String(err)}`); + } + } + }; + + const schedulePatch = (fullText: string) => { + if (!blockStreamingClient) return; + pendingPatchText = fullText; + if (patchInterval) return; + patchInterval = setInterval(() => { + const text = pendingPatchText; + if (!text || text === lastSentText || patchSending) return; + lastSentText = text; + patchSending = true; + void (async () => { + try { + if (!streamMessageId) { + try { + const result = await sendMessageMattermost(to, text, { + accountId: account.accountId, + replyToId: effectiveReplyToId, + }); + streamMessageId = result.messageId; + runtime.log?.(`stream-patch started ${streamMessageId}`); + } catch (err) { + logVerboseMessage(`mattermost stream-patch send failed: ${String(err)}`); + } + } else { + try { + await patchMattermostPost(blockStreamingClient, { + postId: streamMessageId, + message: text, + }); + runtime.log?.(`stream-patch edited ${streamMessageId}`); + } catch (err) { + logVerboseMessage(`mattermost stream-patch edit failed: ${String(err)}`); + } + } + } finally { + patchSending = false; + } + })(); + }, STREAM_PATCH_INTERVAL_MS); + }; + // ── End P4 streaming state ──────────────────────────────────────────────── + const { dispatcher, replyOptions, markDispatchIdle } = core.channel.reply.createReplyDispatcherWithTyping({ ...prefixOptions, humanDelay: core.channel.reply.resolveHumanDelayConfig(cfg, route.agentId), typingCallbacks, - deliver: async (payload: ReplyPayload) => { + deliver: async (payload: ReplyPayload, info) => { + const isFinal = info.kind === "final"; + + // Flush any pending partial-reply patch before final delivery. + if (isFinal && blockStreamingClient) { + await flushPendingPatch(); + } + + // Final + streaming active: patch the streamed message with authoritative + // complete text, or fall back to a new message (with orphan cleanup). + if (isFinal && streamMessageId && payload.text) { + const text = core.channel.text.convertMarkdownTables(payload.text, tableMode); + try { + await patchMattermostPost(blockStreamingClient!, { + postId: streamMessageId, + message: text, + }); + runtime.log?.(`stream-patch final edit ${streamMessageId}`); + } catch (err) { + logVerboseMessage( + `mattermost stream-patch final edit failed: ${String(err)}, sending new message`, + ); + const orphanId = streamMessageId; + streamMessageId = null; + try { + await deleteMattermostPost(blockStreamingClient!, orphanId); + } catch { + // Ignore delete failure — delivering the complete message takes priority + } + await deliverMattermostReplyPayload({ + core, + cfg, + payload, + to, + accountId: account.accountId, + agentId: route.agentId, + replyToId: resolveMattermostReplyRootId({ + threadRootId: effectiveReplyToId, + replyToId: payload.replyToId, + }), + textLimit, + tableMode, + sendMessage: sendMessageMattermost, + }); + return; + } + streamMessageId = null; + return; + } + + if (isFinal) { + stopPatchInterval(); + streamMessageId = null; + pendingPatchText = ""; + lastSentText = ""; + patchSending = false; + } + + // Normal delivery — streaming not active or non-final partial. await deliverMattermostReplyPayload({ core, cfg, @@ -1438,8 +1593,22 @@ export async function monitorMattermostProvider(opts: MonitorMattermostOpts = {} dispatcher, replyOptions: { ...replyOptions, - disableBlockStreaming: - typeof account.blockStreaming === "boolean" ? !account.blockStreaming : undefined, + // Use onPartialReply (full cumulative text) for edit-in-place streaming. + onPartialReply: blockStreamingClient + ? (payload: ReplyPayload) => { + const rawText = payload.text ?? ""; + const fullText = core.channel.text.convertMarkdownTables(rawText, tableMode); + if (fullText) { + schedulePatch(fullText); + } + } + : undefined, + // Disable core block streaming since we handle streaming via onPartialReply. + disableBlockStreaming: blockStreamingClient + ? true + : typeof account.blockStreaming === "boolean" + ? !account.blockStreaming + : false, onModelSelected, }, }),