fix(mattermost): make streaming opt-in; apply textLimit to patches; handle reply target divergence

Three fixes addressing Codex review feedback:

1. Streaming opt-in: change streamingEnabled from (blockStreaming !== false) to
   (blockStreaming === true) so accounts without an explicit blockStreaming setting
   preserve their agent's blockStreamingDefault instead of having edit-in-place
   streaming silently enabled.

2. Text limit: apply textLimit truncation in schedulePatch and flushPendingPatch
   before sending/patching. Intermediate preview posts only need the first chunk;
   final delivery goes through deliverMattermostReplyPayload, which applies full
   chunking. This prevents oversize patch loops when responses exceed the limit.

3. Reply target divergence: when the final payload carries an explicit replyToId
   that resolves to a different root than the streaming post was created under
   (e.g. a [[reply_to_current]] directive), skip the in-place patch and fall
   through to normal delivery so the reply lands in the correct thread. Any
   orphaned stream post is deleted before the correct reply is sent.
This commit is contained in:
teconomix 2026-03-18 12:02:31 +00:00
parent 33678bb973
commit 7ed3db579f

View File

@ -1411,7 +1411,10 @@ export async function monitorMattermostProvider(opts: MonitorMattermostOpts = {}
let patchSending = false; // prevents concurrent network calls
const STREAM_PATCH_INTERVAL_MS = 200;
const streamingEnabled = account.blockStreaming !== false;
// Edit-in-place streaming is opt-in: only activate when blockStreaming is
// explicitly true. When the config key is absent (undefined) we leave the
// agent's blockStreamingDefault in place and do not inject onPartialReply.
const streamingEnabled = account.blockStreaming === true;
const blockStreamingClient =
streamingEnabled && baseUrl && botToken
? createMattermostClient({ baseUrl, botToken })
@ -1436,8 +1439,11 @@ export async function monitorMattermostProvider(opts: MonitorMattermostOpts = {}
while (patchSending && Date.now() < deadline) {
await new Promise<void>((r) => setTimeout(r, 20));
}
const text = pendingPatchText;
if (!text || text === lastSentText) return;
const rawText = pendingPatchText;
if (!rawText || rawText === lastSentText) return;
// Truncate to textLimit so intermediate patches never exceed the server limit.
// Final delivery applies full chunking; streaming posts only need the first chunk.
const text = rawText.length > textLimit ? rawText.slice(0, textLimit) : rawText;
if (!streamMessageId) {
try {
const result = await sendMessageMattermost(to, text, {
@ -1445,7 +1451,7 @@ export async function monitorMattermostProvider(opts: MonitorMattermostOpts = {}
replyToId: effectiveReplyToId,
});
streamMessageId = result.messageId;
lastSentText = text;
lastSentText = rawText;
runtime.log?.(`stream-patch started ${streamMessageId}`);
} catch (err) {
logVerboseMessage(`mattermost stream-patch flush send failed: ${String(err)}`);
@ -1456,7 +1462,7 @@ export async function monitorMattermostProvider(opts: MonitorMattermostOpts = {}
postId: streamMessageId,
message: text,
});
lastSentText = text;
lastSentText = rawText;
runtime.log?.(`stream-patch flushed ${streamMessageId}`);
} catch (err) {
logVerboseMessage(`mattermost stream-patch flush failed: ${String(err)}`);
@ -1469,8 +1475,10 @@ export async function monitorMattermostProvider(opts: MonitorMattermostOpts = {}
pendingPatchText = fullText;
if (patchInterval) return;
patchInterval = setInterval(() => {
const text = pendingPatchText;
if (!text || text === lastSentText || patchSending) return;
const rawText = pendingPatchText;
if (!rawText || rawText === lastSentText || patchSending) return;
// Truncate to textLimit so intermediate patches never exceed the server limit.
const text = rawText.length > textLimit ? rawText.slice(0, textLimit) : rawText;
patchSending = true;
void (async () => {
try {
@ -1481,7 +1489,7 @@ export async function monitorMattermostProvider(opts: MonitorMattermostOpts = {}
replyToId: effectiveReplyToId,
});
streamMessageId = result.messageId;
lastSentText = text;
lastSentText = rawText;
runtime.log?.(`stream-patch started ${streamMessageId}`);
} catch (err) {
logVerboseMessage(`mattermost stream-patch send failed: ${String(err)}`);
@ -1492,7 +1500,7 @@ export async function monitorMattermostProvider(opts: MonitorMattermostOpts = {}
postId: streamMessageId,
message: text,
});
lastSentText = text;
lastSentText = rawText;
runtime.log?.(`stream-patch edited ${streamMessageId}`);
} catch (err) {
logVerboseMessage(`mattermost stream-patch edit failed: ${String(err)}`);
@ -1521,7 +1529,17 @@ export async function monitorMattermostProvider(opts: MonitorMattermostOpts = {}
// Final + streaming active: patch the streamed message with authoritative
// complete text, or fall back to a new message (with orphan cleanup).
if (isFinal && streamMessageId && payload.text) {
// If the final payload carries an explicit replyToId that differs from
// the one the streaming post was created under, skip the in-place patch
// and fall through to normal delivery so the reply lands in the right thread.
const finalReplyToId = resolveMattermostReplyRootId({
threadRootId: effectiveReplyToId,
replyToId: payload.replyToId,
});
const streamReplyToId = effectiveReplyToId;
const replyTargetDiverged =
finalReplyToId !== streamReplyToId && payload.replyToId != null;
if (isFinal && streamMessageId && payload.text && !replyTargetDiverged) {
const text = core.channel.text.convertMarkdownTables(payload.text, tableMode);
try {
await patchMattermostPost(blockStreamingClient!, {
@ -1567,7 +1585,19 @@ export async function monitorMattermostProvider(opts: MonitorMattermostOpts = {}
if (isFinal) {
stopPatchInterval();
streamMessageId = null;
// If the reply target diverged and we have an orphaned stream post,
// attempt to delete it before normal delivery creates the correct post.
if (replyTargetDiverged && streamMessageId) {
const orphanId = streamMessageId;
streamMessageId = null;
try {
await deleteMattermostPost(blockStreamingClient!, orphanId);
} catch {
// Ignore — delivering to the correct thread takes priority.
}
} else {
streamMessageId = null;
}
pendingPatchText = "";
lastSentText = "";
patchSending = false;