GigaChat: stream-decode SSE UTF-8 bytes

This commit is contained in:
Alexander Davydov 2026-03-18 14:51:41 +03:00
parent 79a525c70d
commit afec3305d0
2 changed files with 114 additions and 41 deletions

View File

@ -26,6 +26,10 @@ function createSseStream(lines: string[]): Readable {
return Readable.from(lines.map((line) => `${line}\n`));
}
/**
 * Builds a Readable from pre-cut byte chunks so a test decides exactly where
 * the chunk boundaries of the simulated SSE response fall.
 */
function createSseByteStream(chunks: Buffer[]): Readable {
  const byteStream = Readable.from(chunks);
  return byteStream;
}
describe("createGigachatStreamFn tool calling", () => {
beforeEach(() => {
vi.clearAllMocks();
@ -92,6 +96,63 @@ describe("createGigachatStreamFn tool calling", () => {
]);
});
// Regression test: a Cyrillic tool argument whose first character is cut in
// half by a chunk boundary must still arrive intact in the parsed tool call.
it("preserves multibyte UTF-8 tool arguments split across stream chunks", async () => {
  // The stream reports the function name as "llm_task"; the registered tool
  // and the expected result below use "llm-task".
  const toolNameLine = Buffer.from(
    'data: {"choices":[{"delta":{"function_call":{"name":"llm_task"}}}]}\n',
    "utf8",
  );
  const argsLine = Buffer.from(
    'data: {"choices":[{"delta":{"function_call":{"arguments":"{\\"prompt\\":\\"привет\\"}"}}}]}\n',
    "utf8",
  );

  // Cut one byte into the first Cyrillic character so the boundary falls in
  // the middle of a code point.
  const splitAt = argsLine.indexOf("привет", 0, "utf8") + 1;
  // Sanity check: the byte at the cut must be a UTF-8 continuation byte
  // (0b10xxxxxx). This also fails if the marker was not found (splitAt === 0),
  // so the test cannot pass without exercising the split-character path.
  expect(argsLine.readUInt8(splitAt) & 0xc0).toBe(0x80);

  request.mockResolvedValueOnce({
    status: 200,
    data: createSseByteStream([
      toolNameLine,
      argsLine.subarray(0, splitAt),
      argsLine.subarray(splitAt),
      Buffer.from("data: [DONE]\n", "utf8"),
    ]),
  });

  const streamFn = createGigachatStreamFn({
    baseUrl: "https://gigachat.devices.sberbank.ru/api/v1",
    authMode: "oauth",
  });

  const stream = await streamFn(
    { api: "gigachat", provider: "gigachat", id: "GigaChat-2-Max" } as never,
    {
      messages: [],
      tools: [
        {
          name: "llm-task",
          description: "Run a task",
          parameters: {
            type: "object",
            properties: {
              prompt: { type: "string" },
            },
          },
        },
      ],
    } as never,
    { apiKey: "token" } as never,
  );

  const event = await stream.result();

  // The argument string must decode to the original text, not to replacement
  // characters around the chunk boundary.
  expect(event.content).toEqual([
    expect.objectContaining({
      type: "toolCall",
      name: "llm-task",
      arguments: { prompt: "привет" },
    }),
  ]);
});
it("sanitizes historical assistant/tool result names in the outbound request", async () => {
request.mockResolvedValueOnce({
status: 200,

View File

@ -691,54 +691,66 @@ export function createGigachatStreamFn(opts: GigachatStreamOptions): StreamFn {
let promptTokens = 0;
let completionTokens = 0;
// Our own SSE parsing that handles `: ` in JSON correctly
// Our own SSE parsing that handles `: ` in JSON correctly and keeps
// UTF-8 code points intact when TCP chunks split multibyte characters.
let sseBuffer = "";
for await (const chunk of response.data) {
sseBuffer += chunk.toString();
const sseDecoder = new TextDecoder();
// Processes one complete SSE line and folds its payload into the running
// stream state (accumulatedContent, functionCallBuffer, token counters).
//
// - Blank lines and comment lines (starting with ":") are ignored.
// - Only `data` fields are consumed; any other field is ignored.
// - The space after `data:` is optional in the SSE wire format, so both
//   `data: {...}` and `data:{...}` are accepted.
// - The `[DONE]` sentinel and empty payloads carry no JSON and are skipped.
// - A payload that is not valid JSON is logged and dropped; it never throws.
const consumeSseLine = (line: string) => {
  const trimmed = line.trim();
  if (!trimmed || trimmed.startsWith(":")) {
    return;
  }
  if (!trimmed.startsWith("data:")) {
    return;
  }

  // Strip only the field prefix; the JSON payload may itself contain `: `,
  // so the line must not be split on later colons.
  const jsonStr = trimmed.slice("data:".length).trimStart();
  if (!jsonStr || jsonStr === "[DONE]") {
    return;
  }

  try {
    const parsed = JSON.parse(jsonStr) as ExtendedChatCompletionChunk;
    const choice = parsed.choices?.[0];

    if (choice?.delta?.content) {
      accumulatedContent += choice.delta.content;
    }

    if (choice?.delta?.function_call) {
      // Created lazily on the first function_call delta; name and arguments
      // are concatenated across deltas.
      if (!functionCallBuffer) {
        functionCallBuffer = { name: "", arguments: "" };
      }
      if (choice.delta.function_call.name) {
        functionCallBuffer.name += choice.delta.function_call.name;
      }
      if (choice.delta.function_call.arguments) {
        const args = choice.delta.function_call.arguments;
        // Arguments may arrive as a string fragment or as an already-parsed
        // value; the buffer always stores text.
        functionCallBuffer.arguments +=
          typeof args === "string" ? args : JSON.stringify(args);
      }
    }

    if (parsed.usage) {
      // Each usage report overwrites the previous counters.
      promptTokens = parsed.usage.prompt_tokens ?? 0;
      completionTokens = parsed.usage.completion_tokens ?? 0;
    }
  } catch (e) {
    log.warn(`Failed to parse SSE chunk: ${String(e)}`);
  }
};
for await (const chunk of response.data as AsyncIterable<string | Uint8Array>) {
sseBuffer +=
typeof chunk === "string"
? `${sseDecoder.decode()}${chunk}`
: sseDecoder.decode(chunk, { stream: true });
const lines = sseBuffer.split("\n");
sseBuffer = lines.pop() ?? "";
for (const line of lines) {
const trimmed = line.trim();
if (!trimmed || trimmed.startsWith(":")) {
continue;
}
if (trimmed === "data: [DONE]") {
continue;
}
if (trimmed.startsWith("data: ")) {
// Fix: only split on first `: ` occurrence
const jsonStr = trimmed.slice(6); // Remove "data: " prefix
try {
const parsed = JSON.parse(jsonStr) as ExtendedChatCompletionChunk;
const choice = parsed.choices?.[0];
if (choice?.delta?.content) {
accumulatedContent += choice.delta.content;
}
if (choice?.delta?.function_call) {
if (!functionCallBuffer) {
functionCallBuffer = { name: "", arguments: "" };
}
if (choice.delta.function_call.name) {
functionCallBuffer.name += choice.delta.function_call.name;
}
if (choice.delta.function_call.arguments) {
const args = choice.delta.function_call.arguments;
functionCallBuffer.arguments +=
typeof args === "string" ? args : JSON.stringify(args);
}
}
if (parsed.usage) {
promptTokens = parsed.usage.prompt_tokens ?? 0;
completionTokens = parsed.usage.completion_tokens ?? 0;
}
} catch (e) {
log.warn(`Failed to parse SSE chunk: ${String(e)}`);
}
}
consumeSseLine(line);
}
}
sseBuffer += sseDecoder.decode();
if (sseBuffer.trim()) {
consumeSseLine(sseBuffer);
}
if (functionCallBuffer && functionCallBuffer.name) {
accumulatedContent = stripLeakedFunctionCallPrelude(accumulatedContent);