Merge fe8d0d5a9b8f7a8980947c36ab5e549ef78dc7da into 8a05c05596ca9ba0735dafd8e359885de4c2c969

This commit is contained in:
Teconomix 2026-03-21 06:49:50 +01:00 committed by GitHub
commit ef3645a5ae
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 376 additions and 3 deletions

View File

@ -533,3 +533,29 @@ export async function uploadMattermostFile(
}
return info;
}
/**
 * Partially update an existing Mattermost post, replacing only its
 * message text; all other post fields are left untouched.
 * The caller's token must hold edit_post (own posts) or
 * edit_others_posts.
 */
export async function patchMattermostPost(
  client: MattermostClient,
  params: { postId: string; message: string },
): Promise<void> {
  const { postId, message } = params;
  const payload = JSON.stringify({ message });
  await client.request<void>(`/posts/${postId}/patch`, {
    method: "PUT",
    body: payload,
  });
}
/**
 * Soft-delete a Mattermost post by ID: the server marks the post as
 * deleted rather than removing it outright.
 */
export async function deleteMattermostPost(
  client: MattermostClient,
  postId: string,
): Promise<void> {
  const endpoint = `/posts/${postId}`;
  await client.request<void>(endpoint, { method: "DELETE" });
}

View File

@ -35,7 +35,10 @@ import {
fetchMattermostChannel,
fetchMattermostMe,
fetchMattermostUser,
fetchMattermostUserTeams,
deleteMattermostPost,
normalizeMattermostBaseUrl,
patchMattermostPost,
sendMattermostTyping,
updateMattermostPost,
type MattermostChannel,
@ -1395,12 +1398,312 @@ export async function monitorMattermostProvider(opts: MonitorMattermostOpts = {}
},
},
});
// ── P4: Edit-in-place streaming state ──────────────────────────────────────
// Uses onPartialReply (full cumulative text each tick) + 200ms throttled
// setInterval to progressively edit a single Mattermost post in-place.
let streamMessageId: string | null = null;
let pendingPatchText = "";
let lastSentText = "";
let patchInterval: ReturnType<typeof setInterval> | null = null;
let patchSending = false; // prevents concurrent network calls
// Tracks the currently in-flight sendMessageMattermost / patchMattermostPost
// promise so that onSettled can await it directly rather than busy-waiting.
let patchInflight: Promise<unknown> | null = null;
// Latches true after the first send/edit failure to prevent the interval
// from being re-armed by a later onPartialReply call (ID=2964357928).
let previewSendFailed = false;
const STREAM_PATCH_INTERVAL_MS = 200;
// Edit-in-place streaming is opt-in: only activate when blockStreaming is
// explicitly true. When the config key is absent (undefined) we leave the
// agent's blockStreamingDefault in place and do not inject onPartialReply.
const streamingEnabled = account.blockStreaming === true;
const blockStreamingClient =
streamingEnabled && baseUrl && botToken
? createMattermostClient({ baseUrl, botToken })
: null;
// Disarm the throttled patch timer if it is running. Idempotent: calling it
// with no timer armed is a no-op, and it leaves the rest of the streaming
// state (pendingPatchText, streamMessageId, patchSending) untouched.
const stopPatchInterval = () => {
  if (patchInterval) {
    clearInterval(patchInterval);
    patchInterval = null;
  }
};
// Drain the streaming preview ahead of final delivery: stop the timer, wait
// for any in-flight send/patch to settle, then push the latest
// pendingPatchText (truncated to textLimit) as either the first preview post
// or an in-place edit of the existing one. All network failures here are
// logged and swallowed so final delivery can still proceed.
const flushPendingPatch = async () => {
  stopPatchInterval();
  if (!blockStreamingClient) return;
  // Await the in-flight promise directly so we never miss a late-resolving
  // POST/PATCH — the busy-wait on patchSending had a race where patchSending
  // could clear before the network request settled (ID=2965256849).
  if (patchInflight) {
    await patchInflight.catch(() => {});
  }
  const rawText = pendingPatchText;
  if (!rawText) return;
  // Truncate to textLimit so intermediate patches never exceed the server limit.
  // Final delivery applies full chunking; streaming posts only need the first chunk.
  const text = rawText.length > textLimit ? rawText.slice(0, textLimit) : rawText;
  // Guard on the truncated text so long replies (past textLimit) do not keep
  // re-patching with the same truncated content every 200 ms and hit rate limits.
  if (text === lastSentText) return;
  if (!streamMessageId) {
    // No preview post exists yet — create it now so the pending text lands somewhere.
    try {
      const result = await sendMessageMattermost(to, text, {
        accountId: account.accountId,
        replyToId: effectiveReplyToId,
      });
      streamMessageId = result.messageId;
      lastSentText = text;
      runtime.log?.(`stream-patch started ${streamMessageId}`);
    } catch (err) {
      logVerboseMessage(`mattermost stream-patch flush send failed: ${String(err)}`);
    }
  } else {
    // Preview post exists — edit it in place with the latest cumulative text.
    try {
      await patchMattermostPost(blockStreamingClient, {
        postId: streamMessageId,
        message: text,
      });
      lastSentText = text;
      runtime.log?.(`stream-patch flushed ${streamMessageId}`);
    } catch (err) {
      logVerboseMessage(`mattermost stream-patch flush failed: ${String(err)}`);
    }
  }
};
// Record the newest cumulative reply text and, if necessary, arm the 200 ms
// interval that mirrors it into a single Mattermost post. Called from
// onPartialReply with the full text so far (not a delta); calls made while
// the interval is already armed only refresh pendingPatchText.
const schedulePatch = (fullText: string) => {
  if (!blockStreamingClient) return;
  // Do not re-arm if a permanent send/edit failure has been latched.
  if (previewSendFailed) return;
  pendingPatchText = fullText;
  if (patchInterval) return;
  patchInterval = setInterval(() => {
    const rawText = pendingPatchText;
    // Skip this tick while a previous network call is still in flight.
    if (!rawText || patchSending) return;
    // Truncate to textLimit so intermediate patches never exceed the server limit.
    const text = rawText.length > textLimit ? rawText.slice(0, textLimit) : rawText;
    // Guard on the truncated text so long replies (past textLimit) do not keep
    // re-patching with the same truncated content every 200 ms and hit rate limits.
    if (text === lastSentText) return;
    patchSending = true;
    const runTick = async () => {
      try {
        if (!streamMessageId) {
          // First successful tick: create the preview post and remember its ID.
          try {
            const result = await sendMessageMattermost(to, text, {
              accountId: account.accountId,
              replyToId: effectiveReplyToId,
            });
            streamMessageId = result.messageId;
            lastSentText = text;
            runtime.log?.(`stream-patch started ${streamMessageId}`);
          } catch (err) {
            logVerboseMessage(`mattermost stream-patch send failed: ${String(err)}`);
            // Latch the failure so schedulePatch() does not re-arm the interval
            // on subsequent onPartialReply calls (which would retry indefinitely).
            previewSendFailed = true;
            stopPatchInterval();
          }
        } else {
          // Subsequent ticks: edit the existing preview post in place.
          try {
            await patchMattermostPost(blockStreamingClient, {
              postId: streamMessageId,
              message: text,
            });
            lastSentText = text;
            runtime.log?.(`stream-patch edited ${streamMessageId}`);
          } catch (err) {
            logVerboseMessage(`mattermost stream-patch edit failed: ${String(err)}`);
            // Latch the failure so schedulePatch() does not re-arm the interval
            // on subsequent onPartialReply calls.
            previewSendFailed = true;
            stopPatchInterval();
          }
        }
      } finally {
        patchSending = false;
      }
    };
    const inflightRun = runTick();
    // Publish the promise so flushPendingPatch/onSettled can await it directly.
    patchInflight = inflightRun;
    inflightRun.finally(() => {
      // Clear the slot only if a newer tick has not already replaced it.
      if (patchInflight === inflightRun) patchInflight = null;
    });
    // Fire-and-forget is intentional: runTick handles its own errors in both
    // branches, so nothing here should reject.
    void inflightRun;
  }, STREAM_PATCH_INTERVAL_MS);
};
// ── End P4 streaming state ────────────────────────────────────────────────
const { dispatcher, replyOptions, markDispatchIdle } =
core.channel.reply.createReplyDispatcherWithTyping({
...replyPipeline,
humanDelay: core.channel.reply.resolveHumanDelayConfig(cfg, route.agentId),
typingCallbacks,
deliver: async (payload: ReplyPayload) => {
deliver: async (payload: ReplyPayload, info) => {
const isFinal = info.kind === "final";
// Compute reply target divergence before flushing, so we don't
// accidentally create a preview post in the wrong thread on flush.
// Compute the reply target for this payload. When payload.replyToId resolves
// to a different Mattermost thread root than effectiveReplyToId, we must not
// patch the preview in-place (different thread).
//
// Both sides go through resolveMattermostReplyRootId so that child-post IDs
// (e.g. from [[reply_to_current]] inside a thread) are normalized to the same
// thread root as effectiveReplyToId before comparing. Raw payload.replyToId
// comparison was wrong: [[reply_to_current]] sets a child-post id that differs
// from the thread root but resolves to the same thread (ID=2965488514).
const finalReplyToId = resolveMattermostReplyRootId({
threadRootId: effectiveReplyToId,
replyToId: payload.replyToId,
});
const baseReplyToId = resolveMattermostReplyRootId({
threadRootId: effectiveReplyToId,
});
const replyTargetDiverged =
payload.replyToId != null &&
payload.replyToId.trim() !== "" &&
finalReplyToId !== baseReplyToId;
if (isFinal && blockStreamingClient) {
if (replyTargetDiverged) {
// Divergent target: don't flush (we don't want to create a preview in
// the wrong thread), but do stop the interval and wait for any in-flight
// tick/send to settle. This ensures streamMessageId is populated if the
// first sendMessageMattermost resolves during this window, so the orphan
// cleanup below can capture and delete it.
stopPatchInterval();
if (patchInflight) {
await patchInflight.catch(() => {});
}
} else {
// Same thread: flush pending patches normally.
await flushPendingPatch();
}
}
// Final + streaming active: patch the streamed message with authoritative
// complete text, or fall back to a new message (with orphan cleanup).
// (When replyTargetDiverged the preview is cleaned up further below.)
if (isFinal && streamMessageId && payload.text && !replyTargetDiverged) {
const hasMedia = payload.mediaUrls?.length || payload.mediaUrl;
// When the payload also has media, skip the in-place patch: patching
// the preview with only the text and then posting captionless media
// separately splits a single captioned-file reply into two posts.
// Fall through to deliverMattermostReplyPayload which sends
// text+media together in the correct Mattermost format (ID=2965096969).
if (!hasMedia) {
const text = core.channel.text.convertMarkdownTables(payload.text, tableMode);
try {
await patchMattermostPost(blockStreamingClient!, {
postId: streamMessageId,
message: text,
});
runtime.log?.(`stream-patch final edit ${streamMessageId}`);
// Successful text-only patch. Reset streaming state and return.
streamMessageId = null;
pendingPatchText = "";
lastSentText = "";
patchSending = false;
return;
} catch (err) {
logVerboseMessage(
`mattermost stream-patch final edit failed: ${String(err)}, sending new message`,
);
// Fall through to deliverMattermostReplyPayload below.
}
}
// Media payload or patch failure: deliver the full payload via the
// normal path (handles text+media together, chunking, etc.).
// Delete the preview only after successful delivery.
const orphanId = streamMessageId;
streamMessageId = null;
pendingPatchText = "";
lastSentText = "";
patchSending = false;
await deliverMattermostReplyPayload({
core,
cfg,
payload,
to,
accountId: account.accountId,
agentId: route.agentId,
replyToId: resolveMattermostReplyRootId({
threadRootId: effectiveReplyToId,
replyToId: payload.replyToId,
}),
textLimit,
tableMode,
sendMessage: sendMessageMattermost,
});
// Delivery succeeded — clean up the orphaned preview.
if (orphanId) {
void deleteMattermostPost(blockStreamingClient!, orphanId).catch(() => {});
}
return;
}
if (isFinal) {
stopPatchInterval();
// Capture and clear the stream ID so normal delivery below can proceed.
const orphanedStreamId = streamMessageId;
streamMessageId = null;
pendingPatchText = "";
lastSentText = "";
patchSending = false;
if (replyTargetDiverged && orphanedStreamId) {
// Divergent target: deliver to the correct thread first, then clean
// up the orphan. If delivery fails the user keeps the partial preview.
await deliverMattermostReplyPayload({
core,
cfg,
payload,
to,
accountId: account.accountId,
agentId: route.agentId,
replyToId: finalReplyToId,
textLimit,
tableMode,
sendMessage: sendMessageMattermost,
});
try {
await deleteMattermostPost(blockStreamingClient!, orphanedStreamId);
} catch {
// Ignore — the complete message was already delivered.
}
return;
}
if (orphanedStreamId && !payload.text) {
// Media-only final with a streamed text-preview: deliver media first,
// then delete the orphaned preview so users don't see stale partial
// text alongside the attachment. If delivery fails, the preview stays.
await deliverMattermostReplyPayload({
core,
cfg,
payload,
to,
accountId: account.accountId,
agentId: route.agentId,
replyToId: finalReplyToId,
textLimit,
tableMode,
sendMessage: sendMessageMattermost,
});
try {
await deleteMattermostPost(blockStreamingClient!, orphanedStreamId);
} catch {
// Ignore — media was already delivered.
}
return;
}
// Otherwise fall through to normal delivery.
}
// Normal delivery — streaming not active or non-final partial.
await deliverMattermostReplyPayload({
core,
cfg,
@ -1427,6 +1730,36 @@ export async function monitorMattermostProvider(opts: MonitorMattermostOpts = {}
dispatcher,
onSettled: () => {
markDispatchIdle();
// Clean up any streaming preview that was never finalized — this happens when
// the reply pipeline produces no final payload (e.g. messaging-tool sends that
// are suppressed, or empty/heartbeat responses). Without this, onPartialReply
// can create a Mattermost post that is never deleted or patched with final text.
//
// We must also handle the race where the first preview POST is still in flight
// (patchSending=true, streamMessageId=null): stopPatchInterval() prevents new
// ticks, and the async cleanup waits for patchSending to clear so it can capture
// the messageId that the in-flight send will set.
if ((streamMessageId || patchSending || patchInterval) && blockStreamingClient) {
stopPatchInterval();
pendingPatchText = "";
lastSentText = "";
const client = blockStreamingClient;
void (async () => {
// Await the in-flight promise directly so we never miss a late-resolving
// POST — a 3s timeout would race on slow Mattermost links (ID=2964616785).
if (patchInflight) {
await patchInflight.catch(() => {});
}
patchSending = false;
const orphanId = streamMessageId;
streamMessageId = null;
if (orphanId) {
await deleteMattermostPost(client, orphanId).catch(() => {
// Best-effort — the run is already complete.
});
}
})();
}
},
run: () =>
core.channel.reply.dispatchReplyFromConfig({
@ -1435,8 +1768,22 @@ export async function monitorMattermostProvider(opts: MonitorMattermostOpts = {}
dispatcher,
replyOptions: {
...replyOptions,
disableBlockStreaming:
typeof account.blockStreaming === "boolean" ? !account.blockStreaming : undefined,
// Use onPartialReply (full cumulative text) for edit-in-place streaming.
onPartialReply: blockStreamingClient
? (payload: ReplyPayload) => {
const rawText = payload.text ?? "";
const fullText = core.channel.text.convertMarkdownTables(rawText, tableMode);
if (fullText) {
schedulePatch(fullText);
}
}
: undefined,
// Disable core block streaming since we handle streaming via onPartialReply.
disableBlockStreaming: blockStreamingClient
? true
: typeof account.blockStreaming === "boolean"
? !account.blockStreaming
: undefined,
onModelSelected,
},
}),