feat(mattermost): block streaming edit-in-place (v3.12 rebase)

Rebased onto v3.12 main. Upstream extracted deliver logic to reply-delivery.ts,
so streaming now wraps deliverMattermostReplyPayload() instead of replacing
the inline deliver body.

Changes:
- client.ts: add patchMattermostPost() + deleteMattermostPost() API helpers
- monitor.ts: inject streaming state (schedulePatch, flushPendingPatch, setInterval)
  before main inbound createReplyDispatcherWithTyping only (3 dispatch paths exist,
  only main handler gets streaming via unique humanDelay+typingCallbacks anchor)
- monitor.ts: upgrade deliver signature to (payload, info) for isFinal detection
- monitor.ts: wrap deliverMattermostReplyPayload() with isFinal streaming logic
  (final+streaming: patch in-place or fallback; non-streaming: delegate to helper)
- monitor.ts: add onPartialReply + disableBlockStreaming override in replyOptions

pnpm check: no new errors introduced (pre-existing errors on main unrelated to this PR)

Fixes: https://github.com/openclaw/openclaw/issues/XXXX
PR: https://github.com/openclaw/openclaw/pull/33506
This commit is contained in:
teconomix 2026-03-14 03:27:43 +00:00
parent 13f396b395
commit 9f29865224
2 changed files with 219 additions and 5 deletions

View File

@ -533,3 +533,48 @@ export async function uploadMattermostFile(
}
return info;
}
/**
* Update an existing Mattermost post (partial patch).
* Requires edit_post (own) or edit_others_posts permission.
*/
export async function patchMattermostPost(
client: MattermostClient,
params: { postId: string; message: string },
): Promise<void> {
const res = await fetch(`${client.apiBaseUrl}/posts/${params.postId}/patch`, {
method: "PUT",
headers: {
Authorization: `Bearer ${client.token}`,
"Content-Type": "application/json",
},
body: JSON.stringify({ message: params.message }),
});
if (!res.ok) {
const detail = await readMattermostError(res);
throw new Error(
`Mattermost patch post ${res.status} ${res.statusText}: ${detail || "unknown error"}`,
);
}
}
/**
* Delete a Mattermost post (soft delete).
*/
export async function deleteMattermostPost(
client: MattermostClient,
postId: string,
): Promise<void> {
const res = await fetch(`${client.apiBaseUrl}/posts/${postId}`, {
method: "DELETE",
headers: {
Authorization: `Bearer ${client.token}`,
},
});
if (!res.ok) {
const detail = await readMattermostError(res);
throw new Error(
`Mattermost delete post ${res.status} ${res.statusText}: ${detail || "unknown error"}`,
);
}
}

View File

@ -36,7 +36,10 @@ import {
fetchMattermostChannel,
fetchMattermostMe,
fetchMattermostUser,
fetchMattermostUserTeams,
deleteMattermostPost,
normalizeMattermostBaseUrl,
patchMattermostPost,
sendMattermostTyping,
updateMattermostPost,
type MattermostChannel,
@ -515,7 +518,7 @@ export async function monitorMattermostProvider(opts: MonitorMattermostOpts = {}
replyOptions: {
...replyOptions,
disableBlockStreaming:
typeof account.blockStreaming === "boolean" ? !account.blockStreaming : undefined,
typeof account.blockStreaming === "boolean" ? !account.blockStreaming : false,
onModelSelected,
},
});
@ -728,7 +731,7 @@ export async function monitorMattermostProvider(opts: MonitorMattermostOpts = {}
replyOptions: {
...replyOptions,
disableBlockStreaming:
typeof account.blockStreaming === "boolean" ? !account.blockStreaming : undefined,
typeof account.blockStreaming === "boolean" ? !account.blockStreaming : false,
onModelSelected,
},
}),
@ -1398,12 +1401,164 @@ export async function monitorMattermostProvider(opts: MonitorMattermostOpts = {}
});
},
});
// ── P4: Edit-in-place streaming state ──────────────────────────────────────
// Uses onPartialReply (full cumulative text each tick) + 200ms throttled
// setInterval to progressively edit a single Mattermost post in-place.
let streamMessageId: string | null = null;
let pendingPatchText = "";
let lastSentText = "";
let patchInterval: ReturnType<typeof setInterval> | null = null;
let patchSending = false; // prevents concurrent network calls
const STREAM_PATCH_INTERVAL_MS = 200;
const streamingEnabled = account.blockStreaming !== false;
const blockStreamingClient =
streamingEnabled && baseUrl && botToken
? createMattermostClient({ baseUrl, botToken })
: null;
const stopPatchInterval = () => {
if (patchInterval) {
clearInterval(patchInterval);
patchInterval = null;
}
};
const flushPendingPatch = async () => {
stopPatchInterval();
if (!blockStreamingClient) return;
const text = pendingPatchText;
if (!text || text === lastSentText) return;
lastSentText = text;
if (!streamMessageId) {
try {
const result = await sendMessageMattermost(to, text, {
accountId: account.accountId,
replyToId: effectiveReplyToId,
});
streamMessageId = result.messageId;
runtime.log?.(`stream-patch started ${streamMessageId}`);
} catch (err) {
logVerboseMessage(`mattermost stream-patch flush send failed: ${String(err)}`);
}
} else {
try {
await patchMattermostPost(blockStreamingClient, {
postId: streamMessageId,
message: text,
});
runtime.log?.(`stream-patch flushed ${streamMessageId}`);
} catch (err) {
logVerboseMessage(`mattermost stream-patch flush failed: ${String(err)}`);
}
}
};
const schedulePatch = (fullText: string) => {
if (!blockStreamingClient) return;
pendingPatchText = fullText;
if (patchInterval) return;
patchInterval = setInterval(() => {
const text = pendingPatchText;
if (!text || text === lastSentText || patchSending) return;
lastSentText = text;
patchSending = true;
void (async () => {
try {
if (!streamMessageId) {
try {
const result = await sendMessageMattermost(to, text, {
accountId: account.accountId,
replyToId: effectiveReplyToId,
});
streamMessageId = result.messageId;
runtime.log?.(`stream-patch started ${streamMessageId}`);
} catch (err) {
logVerboseMessage(`mattermost stream-patch send failed: ${String(err)}`);
}
} else {
try {
await patchMattermostPost(blockStreamingClient, {
postId: streamMessageId,
message: text,
});
runtime.log?.(`stream-patch edited ${streamMessageId}`);
} catch (err) {
logVerboseMessage(`mattermost stream-patch edit failed: ${String(err)}`);
}
}
} finally {
patchSending = false;
}
})();
}, STREAM_PATCH_INTERVAL_MS);
};
// ── End P4 streaming state ────────────────────────────────────────────────
const { dispatcher, replyOptions, markDispatchIdle } =
core.channel.reply.createReplyDispatcherWithTyping({
...prefixOptions,
humanDelay: core.channel.reply.resolveHumanDelayConfig(cfg, route.agentId),
typingCallbacks,
deliver: async (payload: ReplyPayload) => {
deliver: async (payload: ReplyPayload, info) => {
const isFinal = info.kind === "final";
// Flush any pending partial-reply patch before final delivery.
if (isFinal && blockStreamingClient) {
await flushPendingPatch();
}
// Final + streaming active: patch the streamed message with authoritative
// complete text, or fall back to a new message (with orphan cleanup).
if (isFinal && streamMessageId && payload.text) {
const text = core.channel.text.convertMarkdownTables(payload.text, tableMode);
try {
await patchMattermostPost(blockStreamingClient!, {
postId: streamMessageId,
message: text,
});
runtime.log?.(`stream-patch final edit ${streamMessageId}`);
} catch (err) {
logVerboseMessage(
`mattermost stream-patch final edit failed: ${String(err)}, sending new message`,
);
const orphanId = streamMessageId;
streamMessageId = null;
try {
await deleteMattermostPost(blockStreamingClient!, orphanId);
} catch {
// Ignore delete failure — delivering the complete message takes priority
}
await deliverMattermostReplyPayload({
core,
cfg,
payload,
to,
accountId: account.accountId,
agentId: route.agentId,
replyToId: resolveMattermostReplyRootId({
threadRootId: effectiveReplyToId,
replyToId: payload.replyToId,
}),
textLimit,
tableMode,
sendMessage: sendMessageMattermost,
});
return;
}
streamMessageId = null;
return;
}
if (isFinal) {
stopPatchInterval();
streamMessageId = null;
pendingPatchText = "";
lastSentText = "";
patchSending = false;
}
// Normal delivery — streaming not active or non-final partial.
await deliverMattermostReplyPayload({
core,
cfg,
@ -1438,8 +1593,22 @@ export async function monitorMattermostProvider(opts: MonitorMattermostOpts = {}
dispatcher,
replyOptions: {
...replyOptions,
disableBlockStreaming:
typeof account.blockStreaming === "boolean" ? !account.blockStreaming : undefined,
// Use onPartialReply (full cumulative text) for edit-in-place streaming.
onPartialReply: blockStreamingClient
? (payload: ReplyPayload) => {
const rawText = payload.text ?? "";
const fullText = core.channel.text.convertMarkdownTables(rawText, tableMode);
if (fullText) {
schedulePatch(fullText);
}
}
: undefined,
// Disable core block streaming since we handle streaming via onPartialReply.
disableBlockStreaming: blockStreamingClient
? true
: typeof account.blockStreaming === "boolean"
? !account.blockStreaming
: false,
onModelSelected,
},
}),