From afec3305d0def1602ebbeefe917e30b863819dbe Mon Sep 17 00:00:00 2001 From: Alexander Davydov Date: Wed, 18 Mar 2026 14:51:41 +0300 Subject: [PATCH] GigaChat: stream-decode SSE UTF-8 bytes --- src/agents/gigachat-stream.tool-calls.test.ts | 61 ++++++++++++ src/agents/gigachat-stream.ts | 94 +++++++++++-------- 2 files changed, 114 insertions(+), 41 deletions(-) diff --git a/src/agents/gigachat-stream.tool-calls.test.ts b/src/agents/gigachat-stream.tool-calls.test.ts index d458e6e40f1..7697aef205e 100644 --- a/src/agents/gigachat-stream.tool-calls.test.ts +++ b/src/agents/gigachat-stream.tool-calls.test.ts @@ -26,6 +26,10 @@ function createSseStream(lines: string[]): Readable { return Readable.from(lines.map((line) => `${line}\n`)); } +function createSseByteStream(chunks: Buffer[]): Readable { + return Readable.from(chunks); +} + describe("createGigachatStreamFn tool calling", () => { beforeEach(() => { vi.clearAllMocks(); @@ -92,6 +96,63 @@ describe("createGigachatStreamFn tool calling", () => { ]); }); + it("preserves multibyte UTF-8 tool arguments split across stream chunks", async () => { + const toolNameLine = Buffer.from( + 'data: {"choices":[{"delta":{"function_call":{"name":"llm_task"}}}]}\n', + "utf8", + ); + const argsLine = Buffer.from( + 'data: {"choices":[{"delta":{"function_call":{"arguments":"{\\"prompt\\":\\"привет\\"}"}}}]}\n', + "utf8", + ); + const splitAt = argsLine.indexOf("привет", 0, "utf8") + 1; + + request.mockResolvedValueOnce({ + status: 200, + data: createSseByteStream([ + toolNameLine, + argsLine.subarray(0, splitAt), + argsLine.subarray(splitAt), + Buffer.from("data: [DONE]\n", "utf8"), + ]), + }); + + const streamFn = createGigachatStreamFn({ + baseUrl: "https://gigachat.devices.sberbank.ru/api/v1", + authMode: "oauth", + }); + + const stream = await streamFn( + { api: "gigachat", provider: "gigachat", id: "GigaChat-2-Max" } as never, + { + messages: [], + tools: [ + { + name: "llm-task", + description: "Run a task", + parameters: { + type: "object", + properties: { + prompt: { type: "string" }, + }, + }, + }, + ], + } as never, + { apiKey: "token" } as never, + ); + + const event = await stream.result(); + + expect(event.content).toEqual([ + expect.objectContaining({ + type: "toolCall", + name: "llm-task", + arguments: { prompt: "привет" }, + }), + ]); + }); + it("sanitizes historical assistant/tool result names in the outbound request", async () => { request.mockResolvedValueOnce({ status: 200, diff --git a/src/agents/gigachat-stream.ts b/src/agents/gigachat-stream.ts index 03ca8fba97c..27fb5339dd6 100644 --- a/src/agents/gigachat-stream.ts +++ b/src/agents/gigachat-stream.ts @@ -691,54 +691,66 @@ export function createGigachatStreamFn(opts: GigachatStreamOptions): StreamFn { let promptTokens = 0; let completionTokens = 0; - // Our own SSE parsing that handles `: ` in JSON correctly + // Our own SSE parsing that handles `: ` in JSON correctly and keeps + // UTF-8 code points intact when TCP chunks split multibyte characters. let sseBuffer = ""; - for await (const chunk of response.data) { - sseBuffer += chunk.toString(); + const sseDecoder = new TextDecoder(); + const consumeSseLine = (line: string) => { + const trimmed = line.trim(); + if (!trimmed || trimmed.startsWith(":")) { + return; + } + if (trimmed === "data: [DONE]") { + return; + } + if (trimmed.startsWith("data: ")) { + // Fix: only split on first `: ` occurrence + const jsonStr = trimmed.slice(6); // Remove "data: " prefix + try { + const parsed = JSON.parse(jsonStr) as ExtendedChatCompletionChunk; + const choice = parsed.choices?.[0]; + + if (choice?.delta?.content) { + accumulatedContent += choice.delta.content; + } + if (choice?.delta?.function_call) { + if (!functionCallBuffer) { + functionCallBuffer = { name: "", arguments: "" }; + } + if (choice.delta.function_call.name) { + functionCallBuffer.name += choice.delta.function_call.name; + } + if (choice.delta.function_call.arguments) { + const args = choice.delta.function_call.arguments; + functionCallBuffer.arguments += + typeof args === "string" ? args : JSON.stringify(args); + } + } + if (parsed.usage) { + promptTokens = parsed.usage.prompt_tokens ?? 0; + completionTokens = parsed.usage.completion_tokens ?? 0; + } + } catch (e) { + log.warn(`Failed to parse SSE chunk: ${String(e)}`); + } + } + }; + for await (const chunk of response.data as AsyncIterable) { + sseBuffer += + typeof chunk === "string" + ? `${sseDecoder.decode()}${chunk}` + : sseDecoder.decode(chunk, { stream: true }); const lines = sseBuffer.split("\n"); sseBuffer = lines.pop() ?? ""; for (const line of lines) { - const trimmed = line.trim(); - if (!trimmed || trimmed.startsWith(":")) { - continue; - } - if (trimmed === "data: [DONE]") { - continue; - } - if (trimmed.startsWith("data: ")) { - // Fix: only split on first `: ` occurrence - const jsonStr = trimmed.slice(6); // Remove "data: " prefix - try { - const parsed = JSON.parse(jsonStr) as ExtendedChatCompletionChunk; - const choice = parsed.choices?.[0]; - - if (choice?.delta?.content) { - accumulatedContent += choice.delta.content; - } - if (choice?.delta?.function_call) { - if (!functionCallBuffer) { - functionCallBuffer = { name: "", arguments: "" }; - } - if (choice.delta.function_call.name) { - functionCallBuffer.name += choice.delta.function_call.name; - } - if (choice.delta.function_call.arguments) { - const args = choice.delta.function_call.arguments; - functionCallBuffer.arguments += - typeof args === "string" ? args : JSON.stringify(args); - } - } - if (parsed.usage) { - promptTokens = parsed.usage.prompt_tokens ?? 0; - completionTokens = parsed.usage.completion_tokens ?? 0; - } - } catch (e) { - log.warn(`Failed to parse SSE chunk: ${String(e)}`); - } - } + consumeSseLine(line); } } + sseBuffer += sseDecoder.decode(); + if (sseBuffer.trim()) { + consumeSseLine(sseBuffer); + } if (functionCallBuffer && functionCallBuffer.name) { accumulatedContent = stripLeakedFunctionCallPrelude(accumulatedContent);