diff --git a/CHANGELOG.md b/CHANGELOG.md index 25f8d76aa82..928e99871dd 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -110,6 +110,7 @@ Docs: https://docs.openclaw.ai - Agents/error rendering: ignore stale assistant `errorMessage` fields on successful turns so background/tool-side failures no longer prepend synthetic billing errors over valid replies. (#40616) Thanks @ingyukoh. - Agents/fallback: recognize Venice `402 Insufficient USD or Diem balance` billing errors so configured model fallbacks trigger instead of surfacing the raw provider error. (#43205) Thanks @Squabble9. - Dependencies: refresh workspace dependencies except the pinned Carbon package, and harden ACP session-config writes against non-string SDK values so newer ACP clients fail fast instead of tripping type/runtime mismatches. +- Agents/OpenAI WebSocket Responses: preserve assistant `phase` metadata on replayed follow-up requests and completed WS responses, with unit plus live regression coverage for the custom OpenAI WS transport path. (#43475) Thanks @by-openai. ## 2026.3.8 diff --git a/src/agents/openai-ws-connection.ts b/src/agents/openai-ws-connection.ts index a765c0f3780..489d8f39962 100644 --- a/src/agents/openai-ws-connection.ts +++ b/src/agents/openai-ws-connection.ts @@ -37,12 +37,15 @@ export interface UsageInfo { total_tokens: number; } +export type OpenAIResponsesAssistantPhase = "commentary" | "final_answer"; + export type OutputItem = | { type: "message"; id: string; role: "assistant"; content: Array<{ type: "output_text"; text: string }>; + phase?: OpenAIResponsesAssistantPhase; status?: "in_progress" | "completed"; } | { @@ -190,6 +193,7 @@ export type InputItem = type: "message"; role: "system" | "developer" | "user" | "assistant"; content: string | ContentPart[]; + phase?: OpenAIResponsesAssistantPhase; } | { type: "function_call"; id?: string; call_id?: string; name: string; arguments: string } | { type: "function_call_output"; call_id: string; output: string } diff --git a/src/agents/openai-ws-stream.live.test.ts b/src/agents/openai-ws-stream.live.test.ts new file mode 100644 index 00000000000..0a160e41ff7 --- /dev/null +++ b/src/agents/openai-ws-stream.live.test.ts @@ -0,0 +1,148 @@ +import type { AssistantMessage } from "@mariozechner/pi-ai"; +import { afterEach, describe, expect, it } from "vitest"; +import { isTruthyEnvValue } from "../infra/env.js"; +import { createOpenAIWebSocketStreamFn, releaseWsSession } from "./openai-ws-stream.js"; + +const API_KEY = process.env.OPENAI_API_KEY ?? ""; +const LIVE = isTruthyEnvValue(process.env.LIVE) || isTruthyEnvValue(process.env.OPENCLAW_LIVE_TEST); +const describeLive = LIVE && API_KEY ? describe : describe.skip; + +type AssistantPhase = "commentary" | "final_answer"; +type AssistantMessageWithPhase = AssistantMessage & { phase?: AssistantPhase }; +type StreamFn = ReturnType; +type StreamModel = Parameters[0]; +type StreamContext = Parameters[1]; +type StreamOptions = Parameters[2]; + +const model = { + api: "openai-responses" as const, + provider: "openai", + id: "gpt-5.2", + name: "gpt-5.2", + baseUrl: "https://api.openai.com/v1", + reasoning: true, + input: ["text"], + cost: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0 }, + contextWindow: 128_000, + maxTokens: 16_384, +} as unknown as StreamModel; + +const sessions: string[] = []; + +function freshSession(name: string): string { + const id = `live-ws-${name}-${Date.now()}`; + sessions.push(id); + return id; +} + +function makeUserMessage(text: string) { + return { + role: "user" as const, + content: text, + timestamp: Date.now(), + }; +} + +function makeContext(messages: StreamContext["messages"]): StreamContext { + return { + systemPrompt: "Reply with the exact requested text and nothing else.", + messages, + tools: [], + } as StreamContext; +} + +function extractText(message: AssistantMessage): string { + return message.content + .filter( + (block): block is Extract<(typeof message.content)[number], { type: "text" }> => + block.type === "text", + ) + .map((block) => block.text) + .join(""); +} + +async function collectDoneEvent( + stream: ReturnType, +): Promise<{ reason: string; message: AssistantMessageWithPhase }> { + for await (const event of stream as AsyncIterable) { + if (event && typeof event === "object" && (event as { type?: string }).type === "done") { + return event as { reason: string; message: AssistantMessageWithPhase }; + } + if (event && typeof event === "object" && (event as { type?: string }).type === "error") { + const error = event as { error?: { errorMessage?: string } }; + throw new Error(error.error?.errorMessage ?? "OpenAI WS live test failed"); + } + } + throw new Error("stream ended without a terminal done event"); +} + +describeLive("openai-ws-stream (live)", () => { + afterEach(() => { + for (const id of sessions) { + releaseWsSession(id); + } + sessions.length = 0; + }); + + it("replays seeded assistant phase on a second full-context websocket request", async () => { + const sessionId = freshSession("phase-replay"); + const streamFn = createOpenAIWebSocketStreamFn(API_KEY, sessionId); + + const firstTurn = await collectDoneEvent( + streamFn(model, makeContext([makeUserMessage("Reply with exactly FIRST-PHASE-OK.")]), { + reasoningEffort: "low", + maxTokens: 64, + } as StreamOptions), + ); + expect(extractText(firstTurn.message)).toContain("FIRST-PHASE-OK"); + + const seededAssistant: AssistantMessageWithPhase = { + role: "assistant", + api: "openai-responses", + provider: "openai", + model: "gpt-5.2", + usage: { + input: 0, + output: 0, + cacheRead: 0, + cacheWrite: 0, + totalTokens: 0, + cost: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0, total: 0 }, + }, + stopReason: "stop", + timestamp: Date.now(), + phase: "final_answer", + content: [{ type: "text", text: "FIRST-PHASE-OK" }], + }; + + let secondPayload: Record | undefined; + const secondTurn = await collectDoneEvent( + streamFn( + model, + makeContext([ + makeUserMessage("Reply with exactly FIRST-PHASE-OK."), + seededAssistant, + makeUserMessage("Reply with exactly SECOND-PHASE-OK."), + ] as StreamContext["messages"]), + { + reasoningEffort: "low", + maxTokens: 64, + onPayload: (payload) => { + secondPayload = payload as Record; + }, + } as StreamOptions, + ), + ); + expect(extractText(secondTurn.message)).toContain("SECOND-PHASE-OK"); + + const input = Array.isArray(secondPayload?.input) ? secondPayload.input : []; + const replayedAssistant = input.find( + (item): item is Record => + !!item && + typeof item === "object" && + (item as Record).type === "message" && + (item as Record).role === "assistant", + ); + expect(replayedAssistant?.phase).toBe("final_answer"); + }, 60_000); +}); diff --git a/src/agents/openai-ws-stream.test.ts b/src/agents/openai-ws-stream.test.ts index a9c3679f561..635724b2b95 100644 --- a/src/agents/openai-ws-stream.test.ts +++ b/src/agents/openai-ws-stream.test.ts @@ -224,6 +224,7 @@ type FakeMessage = | { role: "assistant"; content: unknown[]; + phase?: "commentary" | "final_answer"; stopReason: string; api: string; provider: string; @@ -247,6 +248,7 @@ function userMsg(text: string): FakeMessage { function assistantMsg( textBlocks: string[], toolCalls: Array<{ id: string; name: string; args: Record }> = [], + phase?: "commentary" | "final_answer", ): FakeMessage { const content: unknown[] = []; for (const t of textBlocks) { @@ -258,6 +260,7 @@ function assistantMsg( return { role: "assistant", content, + phase, stopReason: toolCalls.length > 0 ? "toolUse" : "stop", api: "openai-responses", provider: "openai", @@ -302,6 +305,7 @@ function makeResponseObject( id: string, outputText?: string, toolCallName?: string, + phase?: "commentary" | "final_answer", ): ResponseObject { const output: ResponseObject["output"] = []; if (outputText) { @@ -310,6 +314,7 @@ function makeResponseObject( id: "item_1", role: "assistant", content: [{ type: "output_text", text: outputText }], + phase, }); } if (toolCallName) { @@ -391,6 +396,19 @@ describe("convertMessagesToInputItems", () => { expect(items[0]).toMatchObject({ type: "message", role: "assistant", content: "Hi there." }); }); + it("preserves assistant phase on replayed assistant messages", () => { + const items = convertMessagesToInputItems([ + assistantMsg(["Working on it."], [], "commentary"), + ] as Parameters[0]); + expect(items).toHaveLength(1); + expect(items[0]).toMatchObject({ + type: "message", + role: "assistant", + content: "Working on it.", + phase: "commentary", + }); + }); + it("converts an assistant message with a tool call", () => { const msg = assistantMsg( ["Let me run that."], @@ -408,10 +426,29 @@ describe("convertMessagesToInputItems", () => { call_id: "call_1", name: "exec", }); + expect(textItem).not.toHaveProperty("phase"); const fc = fcItem as { arguments: string }; expect(JSON.parse(fc.arguments)).toEqual({ cmd: "ls" }); }); + it("preserves assistant phase on commentary text before tool calls", () => { + const msg = assistantMsg( + ["Let me run that."], + [{ id: "call_1", name: "exec", args: { cmd: "ls" } }], + "commentary", + ); + const items = convertMessagesToInputItems([msg] as Parameters< + typeof convertMessagesToInputItems + >[0]); + const textItem = items.find((i) => i.type === "message"); + expect(textItem).toMatchObject({ + type: "message", + role: "assistant", + content: "Let me run that.", + phase: "commentary", + }); + }); + it("converts a tool result message", () => { const items = convertMessagesToInputItems([toolResultMsg("call_1", "file.txt")] as Parameters< typeof convertMessagesToInputItems @@ -594,6 +631,16 @@ describe("buildAssistantMessageFromResponse", () => { expect(msg.content).toEqual([]); expect(msg.stopReason).toBe("stop"); }); + + it("preserves phase from assistant message output items", () => { + const response = makeResponseObject("resp_8", "Final answer", undefined, "final_answer"); + const msg = buildAssistantMessageFromResponse(response, modelInfo) as { + phase?: string; + content: Array<{ type: string; text?: string }>; + }; + expect(msg.phase).toBe("final_answer"); + expect(msg.content[0]?.text).toBe("Final answer"); + }); }); // ───────────────────────────────────────────────────────────────────────────── @@ -633,6 +680,8 @@ describe("createOpenAIWebSocketStreamFn", () => { releaseWsSession("sess-fallback"); releaseWsSession("sess-incremental"); releaseWsSession("sess-full"); + releaseWsSession("sess-phase"); + releaseWsSession("sess-phase-replay"); releaseWsSession("sess-tools"); releaseWsSession("sess-store-default"); releaseWsSession("sess-store-compat"); @@ -795,6 +844,40 @@ describe("createOpenAIWebSocketStreamFn", () => { expect(doneEvent?.message.content[0]?.text).toBe("Hello back!"); }); + it("keeps assistant phase on completed WebSocket responses", async () => { + const streamFn = createOpenAIWebSocketStreamFn("sk-test", "sess-phase"); + const stream = streamFn( + modelStub as Parameters[0], + contextStub as Parameters[1], + ); + + const events: unknown[] = []; + const done = (async () => { + for await (const ev of await resolveStream(stream)) { + events.push(ev); + } + })(); + + await new Promise((r) => setImmediate(r)); + const manager = MockManager.lastInstance!; + manager.simulateEvent({ + type: "response.completed", + response: makeResponseObject("resp_phase", "Working...", "exec", "commentary"), + }); + + await done; + + const doneEvent = events.find((e) => (e as { type?: string }).type === "done") as + | { + type: string; + reason: string; + message: { phase?: string; stopReason: string }; + } + | undefined; + expect(doneEvent?.message.phase).toBe("commentary"); + expect(doneEvent?.message.stopReason).toBe("toolUse"); + }); + it("falls back to HTTP when WebSocket connect fails (session pre-broken via flag)", async () => { // Set the class-level flag BEFORE calling streamFn so the new instance // fails on connect(). We patch the static default via MockManager directly. @@ -898,6 +981,79 @@ describe("createOpenAIWebSocketStreamFn", () => { expect(inputTypes).toHaveLength(1); }); + it("replays assistant phase on a second turn full-context resend", async () => { + const sessionId = "sess-phase-replay"; + const streamFn = createOpenAIWebSocketStreamFn("sk-test", sessionId); + + const ctx1 = { + systemPrompt: "You are helpful.", + messages: [userMsg("Say hi")] as Parameters[0], + tools: [], + }; + + const stream1 = streamFn( + modelStub as Parameters[0], + ctx1 as Parameters[1], + ); + const done1 = (async () => { + for await (const _ of await resolveStream(stream1)) { + // consume + } + })(); + + await new Promise((r) => setImmediate(r)); + const manager = MockManager.lastInstance!; + manager.setPreviousResponseId("resp_phase_turn1"); + manager.simulateEvent({ + type: "response.completed", + response: makeResponseObject("resp_phase_turn1", "Hi there", undefined, "final_answer"), + }); + await done1; + + const ctx2 = { + systemPrompt: "You are helpful.", + messages: [ + userMsg("Say hi"), + assistantMsg(["Hi there"], [], "final_answer"), + userMsg("Follow up"), + ] as Parameters[0], + tools: [], + }; + + const stream2 = streamFn( + modelStub as Parameters[0], + ctx2 as Parameters[1], + ); + const done2 = (async () => { + for await (const _ of await resolveStream(stream2)) { + // consume + } + })(); + + await new Promise((r) => setImmediate(r)); + manager.simulateEvent({ + type: "response.completed", + response: makeResponseObject("resp_phase_turn2", "All done", undefined, "final_answer"), + }); + await done2; + + expect(manager.sentEvents).toHaveLength(2); + const sent2 = manager.sentEvents[1] as { + previous_response_id?: string; + input?: Array>; + }; + expect(sent2.previous_response_id).toBe("resp_phase_turn1"); + const assistantInput = (sent2.input ?? []).find( + (item) => item.type === "message" && item.role === "assistant", + ); + expect(assistantInput).toMatchObject({ + type: "message", + role: "assistant", + content: "Hi there", + phase: "final_answer", + }); + }); + it("sends instructions (system prompt) in each request", async () => { const streamFn = createOpenAIWebSocketStreamFn("sk-test", "sess-tools"); const ctx = { diff --git a/src/agents/openai-ws-stream.ts b/src/agents/openai-ws-stream.ts index 9591143d880..be01985f28a 100644 --- a/src/agents/openai-ws-stream.ts +++ b/src/agents/openai-ws-stream.ts @@ -37,6 +37,7 @@ import { type ContentPart, type FunctionToolDefinition, type InputItem, + type OpenAIResponsesAssistantPhase, type OpenAIWebSocketManagerOptions, type ResponseObject, } from "./openai-ws-connection.js"; @@ -100,6 +101,7 @@ export function hasWsSession(sessionId: string): boolean { // ───────────────────────────────────────────────────────────────────────────── type AnyMessage = Message & { role: string; content: unknown }; +type AssistantMessageWithPhase = AssistantMessage & { phase?: OpenAIResponsesAssistantPhase }; function toNonEmptyString(value: unknown): string | null { if (typeof value !== "string") { @@ -109,6 +111,10 @@ function toNonEmptyString(value: unknown): string | null { return trimmed.length > 0 ? trimmed : null; } +function normalizeAssistantPhase(value: unknown): OpenAIResponsesAssistantPhase | undefined { + return value === "commentary" || value === "final_answer" ? value : undefined; +} + /** Convert pi-ai content (string | ContentPart[]) to plain text. */ function contentToText(content: unknown): string { if (typeof content === "string") { @@ -193,6 +199,7 @@ export function convertMessagesToInputItems(messages: Message[]): InputItem[] { } if (m.role === "assistant") { + const assistantPhase = normalizeAssistantPhase((m as { phase?: unknown }).phase); const content = m.content; if (Array.isArray(content)) { // Collect text blocks and tool calls separately @@ -216,6 +223,7 @@ export function convertMessagesToInputItems(messages: Message[]): InputItem[] { type: "message", role: "assistant", content: textParts.join(""), + ...(assistantPhase ? { phase: assistantPhase } : {}), }); textParts.length = 0; } @@ -241,6 +249,7 @@ export function convertMessagesToInputItems(messages: Message[]): InputItem[] { type: "message", role: "assistant", content: textParts.join(""), + ...(assistantPhase ? { phase: assistantPhase } : {}), }); } } else { @@ -250,6 +259,7 @@ export function convertMessagesToInputItems(messages: Message[]): InputItem[] { type: "message", role: "assistant", content: text, + ...(assistantPhase ? { phase: assistantPhase } : {}), }); } } @@ -289,9 +299,14 @@ export function buildAssistantMessageFromResponse( modelInfo: { api: string; provider: string; id: string }, ): AssistantMessage { const content: (TextContent | ToolCall)[] = []; + let assistantPhase: OpenAIResponsesAssistantPhase | undefined; for (const item of response.output ?? []) { if (item.type === "message") { + const itemPhase = normalizeAssistantPhase(item.phase); + if (itemPhase) { + assistantPhase = itemPhase; + } for (const part of item.content ?? []) { if (part.type === "output_text" && part.text) { content.push({ type: "text", text: part.text }); @@ -321,7 +336,7 @@ export function buildAssistantMessageFromResponse( const hasToolCalls = content.some((c) => c.type === "toolCall"); const stopReason: StopReason = hasToolCalls ? "toolUse" : "stop"; - return buildAssistantMessage({ + const message = buildAssistantMessage({ model: modelInfo, content, stopReason, @@ -331,6 +346,10 @@ export function buildAssistantMessageFromResponse( totalTokens: response.usage?.total_tokens ?? 0, }), }); + + return assistantPhase + ? ({ ...message, phase: assistantPhase } as AssistantMessageWithPhase) + : message; } // ─────────────────────────────────────────────────────────────────────────────