fix: preserve responses phase over websocket (#43475) (thanks @by-openai)

This commit is contained in:
Peter Steinberger 2026-03-11 23:12:55 +00:00
parent 4eccea9f7f
commit 55a7eab7df
5 changed files with 329 additions and 1 deletion

View File

@ -110,6 +110,7 @@ Docs: https://docs.openclaw.ai
- Agents/error rendering: ignore stale assistant `errorMessage` fields on successful turns so background/tool-side failures no longer prepend synthetic billing errors over valid replies. (#40616) Thanks @ingyukoh.
- Agents/fallback: recognize Venice `402 Insufficient USD or Diem balance` billing errors so configured model fallbacks trigger instead of surfacing the raw provider error. (#43205) Thanks @Squabble9.
- Dependencies: refresh workspace dependencies except the pinned Carbon package, and harden ACP session-config writes against non-string SDK values so newer ACP clients fail fast instead of tripping type/runtime mismatches.
- Agents/OpenAI WebSocket Responses: preserve assistant `phase` metadata on replayed follow-up requests and completed WS responses, with unit plus live regression coverage for the custom OpenAI WS transport path. (#43475) Thanks @by-openai.
## 2026.3.8

View File

@ -37,12 +37,15 @@ export interface UsageInfo {
total_tokens: number;
}
export type OpenAIResponsesAssistantPhase = "commentary" | "final_answer";
export type OutputItem =
| {
type: "message";
id: string;
role: "assistant";
content: Array<{ type: "output_text"; text: string }>;
phase?: OpenAIResponsesAssistantPhase;
status?: "in_progress" | "completed";
}
| {
@ -190,6 +193,7 @@ export type InputItem =
type: "message";
role: "system" | "developer" | "user" | "assistant";
content: string | ContentPart[];
phase?: OpenAIResponsesAssistantPhase;
}
| { type: "function_call"; id?: string; call_id?: string; name: string; arguments: string }
| { type: "function_call_output"; call_id: string; output: string }

View File

@ -0,0 +1,148 @@
import type { AssistantMessage } from "@mariozechner/pi-ai";
import { afterEach, describe, expect, it } from "vitest";
import { isTruthyEnvValue } from "../infra/env.js";
import { createOpenAIWebSocketStreamFn, releaseWsSession } from "./openai-ws-stream.js";
// Live-test gating: these tests hit the real OpenAI API, so they only run when
// an API key is present AND an explicit LIVE flag is set; otherwise the whole
// suite is skipped via describe.skip.
const API_KEY = process.env.OPENAI_API_KEY ?? "";
const LIVE = isTruthyEnvValue(process.env.LIVE) || isTruthyEnvValue(process.env.OPENCLAW_LIVE_TEST);
const describeLive = LIVE && API_KEY ? describe : describe.skip;
// Local mirror of the transport's phase union; used to widen AssistantMessage
// with the optional `phase` field under test.
type AssistantPhase = "commentary" | "final_answer";
type AssistantMessageWithPhase = AssistantMessage & { phase?: AssistantPhase };
// Derive the stream function's parameter types instead of importing them,
// so the test tracks the factory's signature automatically.
type StreamFn = ReturnType<typeof createOpenAIWebSocketStreamFn>;
type StreamModel = Parameters<StreamFn>[0];
type StreamContext = Parameters<StreamFn>[1];
type StreamOptions = Parameters<StreamFn>[2];
// Minimal model descriptor for the live request; zero costs because billing
// accounting is irrelevant here. Cast via unknown since only a subset of the
// real StreamModel shape is provided.
const model = {
  api: "openai-responses" as const,
  provider: "openai",
  id: "gpt-5.2",
  name: "gpt-5.2",
  baseUrl: "https://api.openai.com/v1",
  reasoning: true,
  input: ["text"],
  cost: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0 },
  contextWindow: 128_000,
  maxTokens: 16_384,
} as unknown as StreamModel;
// Session ids created during a test; released in afterEach to avoid leaking
// pooled WS sessions across tests.
const sessions: string[] = [];
function freshSession(name: string): string {
const id = `live-ws-${name}-${Date.now()}`;
sessions.push(id);
return id;
}
/** Wrap plain text as a user-role chat message stamped with the current time. */
function makeUserMessage(text: string) {
  const message = {
    role: "user" as const,
    content: text,
    timestamp: Date.now(),
  };
  return message;
}
/**
 * Assemble a minimal stream context around the given message history, with a
 * fixed system prompt that forces literal replies and no tools.
 */
function makeContext(messages: StreamContext["messages"]): StreamContext {
  const context = {
    systemPrompt: "Reply with the exact requested text and nothing else.",
    messages,
    tools: [],
  };
  return context as StreamContext;
}
/** Concatenate every text block of an assistant message into one string. */
function extractText(message: AssistantMessage): string {
  const pieces: string[] = [];
  for (const block of message.content) {
    if (block.type === "text") {
      pieces.push(block.text);
    }
  }
  return pieces.join("");
}
/**
 * Drain a stream until its terminal event arrives.
 *
 * Returns the `done` event; throws on an `error` event (surfacing the
 * provider's message when present) or if the stream ends with no terminal
 * event at all.
 */
async function collectDoneEvent(
  stream: ReturnType<StreamFn>,
): Promise<{ reason: string; message: AssistantMessageWithPhase }> {
  const iterable = stream as AsyncIterable<unknown>;
  for await (const event of iterable) {
    if (!event || typeof event !== "object") {
      continue;
    }
    const tagged = event as { type?: string };
    if (tagged.type === "done") {
      return event as { reason: string; message: AssistantMessageWithPhase };
    }
    if (tagged.type === "error") {
      const failure = event as { error?: { errorMessage?: string } };
      throw new Error(failure.error?.errorMessage ?? "OpenAI WS live test failed");
    }
  }
  throw new Error("stream ended without a terminal done event");
}
// Live regression suite for the custom OpenAI WS transport: verifies that an
// assistant `phase` seeded into the replayed history survives into the actual
// wire payload of a follow-up request. Only runs with LIVE + OPENAI_API_KEY.
describeLive("openai-ws-stream (live)", () => {
  // Release every WS session opened by a test so pooled connections do not
  // leak across tests.
  afterEach(() => {
    for (const id of sessions) {
      releaseWsSession(id);
    }
    sessions.length = 0;
  });
  it("replays seeded assistant phase on a second full-context websocket request", async () => {
    const sessionId = freshSession("phase-replay");
    const streamFn = createOpenAIWebSocketStreamFn(API_KEY, sessionId);
    // Turn 1: a real round-trip, just to establish the WS session.
    const firstTurn = await collectDoneEvent(
      streamFn(model, makeContext([makeUserMessage("Reply with exactly FIRST-PHASE-OK.")]), {
        reasoningEffort: "low",
        maxTokens: 64,
      } as StreamOptions),
    );
    expect(extractText(firstTurn.message)).toContain("FIRST-PHASE-OK");
    // Hand-built assistant message carrying the `phase` field under test; the
    // usage/cost numbers are zeroed because only the phase replay matters.
    const seededAssistant: AssistantMessageWithPhase = {
      role: "assistant",
      api: "openai-responses",
      provider: "openai",
      model: "gpt-5.2",
      usage: {
        input: 0,
        output: 0,
        cacheRead: 0,
        cacheWrite: 0,
        totalTokens: 0,
        cost: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0, total: 0 },
      },
      stopReason: "stop",
      timestamp: Date.now(),
      phase: "final_answer",
      content: [{ type: "text", text: "FIRST-PHASE-OK" }],
    };
    // Capture the raw request payload of the second turn via onPayload.
    let secondPayload: Record<string, unknown> | undefined;
    const secondTurn = await collectDoneEvent(
      streamFn(
        model,
        makeContext([
          makeUserMessage("Reply with exactly FIRST-PHASE-OK."),
          seededAssistant,
          makeUserMessage("Reply with exactly SECOND-PHASE-OK."),
        ] as StreamContext["messages"]),
        {
          reasoningEffort: "low",
          maxTokens: 64,
          onPayload: (payload) => {
            secondPayload = payload as Record<string, unknown>;
          },
        } as StreamOptions,
      ),
    );
    expect(extractText(secondTurn.message)).toContain("SECOND-PHASE-OK");
    // Inspect the replayed input items: the seeded assistant message must keep
    // its phase when resent over the wire.
    const input = Array.isArray(secondPayload?.input) ? secondPayload.input : [];
    const replayedAssistant = input.find(
      (item): item is Record<string, unknown> =>
        !!item &&
        typeof item === "object" &&
        (item as Record<string, unknown>).type === "message" &&
        (item as Record<string, unknown>).role === "assistant",
    );
    expect(replayedAssistant?.phase).toBe("final_answer");
  }, 60_000); // generous timeout: two live API round-trips
});

View File

@ -224,6 +224,7 @@ type FakeMessage =
| {
role: "assistant";
content: unknown[];
phase?: "commentary" | "final_answer";
stopReason: string;
api: string;
provider: string;
@ -247,6 +248,7 @@ function userMsg(text: string): FakeMessage {
function assistantMsg(
textBlocks: string[],
toolCalls: Array<{ id: string; name: string; args: Record<string, unknown> }> = [],
phase?: "commentary" | "final_answer",
): FakeMessage {
const content: unknown[] = [];
for (const t of textBlocks) {
@ -258,6 +260,7 @@ function assistantMsg(
return {
role: "assistant",
content,
phase,
stopReason: toolCalls.length > 0 ? "toolUse" : "stop",
api: "openai-responses",
provider: "openai",
@ -302,6 +305,7 @@ function makeResponseObject(
id: string,
outputText?: string,
toolCallName?: string,
phase?: "commentary" | "final_answer",
): ResponseObject {
const output: ResponseObject["output"] = [];
if (outputText) {
@ -310,6 +314,7 @@ function makeResponseObject(
id: "item_1",
role: "assistant",
content: [{ type: "output_text", text: outputText }],
phase,
});
}
if (toolCallName) {
@ -391,6 +396,19 @@ describe("convertMessagesToInputItems", () => {
expect(items[0]).toMatchObject({ type: "message", role: "assistant", content: "Hi there." });
});
// A replayed assistant message carrying phase "commentary" must keep that
// phase on the converted input item.
it("preserves assistant phase on replayed assistant messages", () => {
  const items = convertMessagesToInputItems([
    assistantMsg(["Working on it."], [], "commentary"),
  ] as Parameters<typeof convertMessagesToInputItems>[0]);
  expect(items).toHaveLength(1);
  expect(items[0]).toMatchObject({
    type: "message",
    role: "assistant",
    content: "Working on it.",
    phase: "commentary",
  });
});
it("converts an assistant message with a tool call", () => {
const msg = assistantMsg(
["Let me run that."],
@ -408,10 +426,29 @@ describe("convertMessagesToInputItems", () => {
call_id: "call_1",
name: "exec",
});
expect(textItem).not.toHaveProperty("phase");
const fc = fcItem as { arguments: string };
expect(JSON.parse(fc.arguments)).toEqual({ cmd: "ls" });
});
// When an assistant turn mixes text with tool calls, the text item emitted
// before the function_call must still carry the original phase.
it("preserves assistant phase on commentary text before tool calls", () => {
  const msg = assistantMsg(
    ["Let me run that."],
    [{ id: "call_1", name: "exec", args: { cmd: "ls" } }],
    "commentary",
  );
  const items = convertMessagesToInputItems([msg] as Parameters<
    typeof convertMessagesToInputItems
  >[0]);
  const textItem = items.find((i) => i.type === "message");
  expect(textItem).toMatchObject({
    type: "message",
    role: "assistant",
    content: "Let me run that.",
    phase: "commentary",
  });
});
it("converts a tool result message", () => {
const items = convertMessagesToInputItems([toolResultMsg("call_1", "file.txt")] as Parameters<
typeof convertMessagesToInputItems
@ -594,6 +631,16 @@ describe("buildAssistantMessageFromResponse", () => {
expect(msg.content).toEqual([]);
expect(msg.stopReason).toBe("stop");
});
// The phase found on a response's assistant output item must be copied onto
// the built AssistantMessage.
it("preserves phase from assistant message output items", () => {
  const response = makeResponseObject("resp_8", "Final answer", undefined, "final_answer");
  const msg = buildAssistantMessageFromResponse(response, modelInfo) as {
    phase?: string;
    content: Array<{ type: string; text?: string }>;
  };
  expect(msg.phase).toBe("final_answer");
  expect(msg.content[0]?.text).toBe("Final answer");
});
});
// ─────────────────────────────────────────────────────────────────────────────
@ -633,6 +680,8 @@ describe("createOpenAIWebSocketStreamFn", () => {
releaseWsSession("sess-fallback");
releaseWsSession("sess-incremental");
releaseWsSession("sess-full");
releaseWsSession("sess-phase");
releaseWsSession("sess-phase-replay");
releaseWsSession("sess-tools");
releaseWsSession("sess-store-default");
releaseWsSession("sess-store-compat");
@ -795,6 +844,40 @@ describe("createOpenAIWebSocketStreamFn", () => {
expect(doneEvent?.message.content[0]?.text).toBe("Hello back!");
});
// End-to-end over the mocked WS manager: a response.completed event whose
// output item has phase "commentary" must surface that phase on the stream's
// terminal done event, alongside the toolUse stop reason from the tool call.
it("keeps assistant phase on completed WebSocket responses", async () => {
  const streamFn = createOpenAIWebSocketStreamFn("sk-test", "sess-phase");
  const stream = streamFn(
    modelStub as Parameters<typeof streamFn>[0],
    contextStub as Parameters<typeof streamFn>[1],
  );
  const events: unknown[] = [];
  // Consume the stream concurrently while we inject mock WS events.
  const done = (async () => {
    for await (const ev of await resolveStream(stream)) {
      events.push(ev);
    }
  })();
  // Yield so the stream attaches to the mock manager before simulating.
  await new Promise((r) => setImmediate(r));
  const manager = MockManager.lastInstance!;
  manager.simulateEvent({
    type: "response.completed",
    response: makeResponseObject("resp_phase", "Working...", "exec", "commentary"),
  });
  await done;
  const doneEvent = events.find((e) => (e as { type?: string }).type === "done") as
    | {
        type: string;
        reason: string;
        message: { phase?: string; stopReason: string };
      }
    | undefined;
  expect(doneEvent?.message.phase).toBe("commentary");
  expect(doneEvent?.message.stopReason).toBe("toolUse");
});
it("falls back to HTTP when WebSocket connect fails (session pre-broken via flag)", async () => {
// Set the class-level flag BEFORE calling streamFn so the new instance
// fails on connect(). We patch the static default via MockManager directly.
@ -898,6 +981,79 @@ describe("createOpenAIWebSocketStreamFn", () => {
expect(inputTypes).toHaveLength(1);
});
// Two-turn mocked conversation: after turn 1 completes, the second request's
// full-context resend must include the replayed assistant message with its
// phase intact, chained via previous_response_id.
it("replays assistant phase on a second turn full-context resend", async () => {
  const sessionId = "sess-phase-replay";
  const streamFn = createOpenAIWebSocketStreamFn("sk-test", sessionId);
  // Turn 1: single user message.
  const ctx1 = {
    systemPrompt: "You are helpful.",
    messages: [userMsg("Say hi")] as Parameters<typeof convertMessagesToInputItems>[0],
    tools: [],
  };
  const stream1 = streamFn(
    modelStub as Parameters<typeof streamFn>[0],
    ctx1 as Parameters<typeof streamFn>[1],
  );
  const done1 = (async () => {
    for await (const _ of await resolveStream(stream1)) {
      // consume
    }
  })();
  // Let the stream attach before simulating the server's completion.
  await new Promise((r) => setImmediate(r));
  const manager = MockManager.lastInstance!;
  manager.setPreviousResponseId("resp_phase_turn1");
  manager.simulateEvent({
    type: "response.completed",
    response: makeResponseObject("resp_phase_turn1", "Hi there", undefined, "final_answer"),
  });
  await done1;
  // Turn 2: full history including the phase-tagged assistant reply.
  const ctx2 = {
    systemPrompt: "You are helpful.",
    messages: [
      userMsg("Say hi"),
      assistantMsg(["Hi there"], [], "final_answer"),
      userMsg("Follow up"),
    ] as Parameters<typeof convertMessagesToInputItems>[0],
    tools: [],
  };
  const stream2 = streamFn(
    modelStub as Parameters<typeof streamFn>[0],
    ctx2 as Parameters<typeof streamFn>[1],
  );
  const done2 = (async () => {
    for await (const _ of await resolveStream(stream2)) {
      // consume
    }
  })();
  await new Promise((r) => setImmediate(r));
  manager.simulateEvent({
    type: "response.completed",
    response: makeResponseObject("resp_phase_turn2", "All done", undefined, "final_answer"),
  });
  await done2;
  // Inspect the second wire payload: chained id plus the phase-preserving
  // replayed assistant item.
  expect(manager.sentEvents).toHaveLength(2);
  const sent2 = manager.sentEvents[1] as {
    previous_response_id?: string;
    input?: Array<Record<string, unknown>>;
  };
  expect(sent2.previous_response_id).toBe("resp_phase_turn1");
  const assistantInput = (sent2.input ?? []).find(
    (item) => item.type === "message" && item.role === "assistant",
  );
  expect(assistantInput).toMatchObject({
    type: "message",
    role: "assistant",
    content: "Hi there",
    phase: "final_answer",
  });
});
it("sends instructions (system prompt) in each request", async () => {
const streamFn = createOpenAIWebSocketStreamFn("sk-test", "sess-tools");
const ctx = {

View File

@ -37,6 +37,7 @@ import {
type ContentPart,
type FunctionToolDefinition,
type InputItem,
type OpenAIResponsesAssistantPhase,
type OpenAIWebSocketManagerOptions,
type ResponseObject,
} from "./openai-ws-connection.js";
@ -100,6 +101,7 @@ export function hasWsSession(sessionId: string): boolean {
// ─────────────────────────────────────────────────────────────────────────────
type AnyMessage = Message & { role: string; content: unknown };
type AssistantMessageWithPhase = AssistantMessage & { phase?: OpenAIResponsesAssistantPhase };
function toNonEmptyString(value: unknown): string | null {
if (typeof value !== "string") {
@ -109,6 +111,10 @@ function toNonEmptyString(value: unknown): string | null {
return trimmed.length > 0 ? trimmed : null;
}
/**
 * Narrow an untrusted value to a known assistant phase.
 * Returns the value when it is one of the two recognized phases, otherwise
 * undefined so callers can simply omit the field.
 */
function normalizeAssistantPhase(value: unknown): OpenAIResponsesAssistantPhase | undefined {
  switch (value) {
    case "commentary":
    case "final_answer":
      return value;
    default:
      return undefined;
  }
}
/** Convert pi-ai content (string | ContentPart[]) to plain text. */
function contentToText(content: unknown): string {
if (typeof content === "string") {
@ -193,6 +199,7 @@ export function convertMessagesToInputItems(messages: Message[]): InputItem[] {
}
if (m.role === "assistant") {
const assistantPhase = normalizeAssistantPhase((m as { phase?: unknown }).phase);
const content = m.content;
if (Array.isArray(content)) {
// Collect text blocks and tool calls separately
@ -216,6 +223,7 @@ export function convertMessagesToInputItems(messages: Message[]): InputItem[] {
type: "message",
role: "assistant",
content: textParts.join(""),
...(assistantPhase ? { phase: assistantPhase } : {}),
});
textParts.length = 0;
}
@ -241,6 +249,7 @@ export function convertMessagesToInputItems(messages: Message[]): InputItem[] {
type: "message",
role: "assistant",
content: textParts.join(""),
...(assistantPhase ? { phase: assistantPhase } : {}),
});
}
} else {
@ -250,6 +259,7 @@ export function convertMessagesToInputItems(messages: Message[]): InputItem[] {
type: "message",
role: "assistant",
content: text,
...(assistantPhase ? { phase: assistantPhase } : {}),
});
}
}
@ -289,9 +299,14 @@ export function buildAssistantMessageFromResponse(
modelInfo: { api: string; provider: string; id: string },
): AssistantMessage {
const content: (TextContent | ToolCall)[] = [];
let assistantPhase: OpenAIResponsesAssistantPhase | undefined;
for (const item of response.output ?? []) {
if (item.type === "message") {
const itemPhase = normalizeAssistantPhase(item.phase);
if (itemPhase) {
assistantPhase = itemPhase;
}
for (const part of item.content ?? []) {
if (part.type === "output_text" && part.text) {
content.push({ type: "text", text: part.text });
@ -321,7 +336,7 @@ export function buildAssistantMessageFromResponse(
const hasToolCalls = content.some((c) => c.type === "toolCall");
const stopReason: StopReason = hasToolCalls ? "toolUse" : "stop";
return buildAssistantMessage({
const message = buildAssistantMessage({
model: modelInfo,
content,
stopReason,
@ -331,6 +346,10 @@ export function buildAssistantMessageFromResponse(
totalTokens: response.usage?.total_tokens ?? 0,
}),
});
return assistantPhase
? ({ ...message, phase: assistantPhase } as AssistantMessageWithPhase)
: message;
}
// ─────────────────────────────────────────────────────────────────────────────