From cb56c29fd3fa124eed908c01314941816ef8bff5 Mon Sep 17 00:00:00 2001 From: Eulices Lopez <105620565+eulicesl@users.noreply.github.com> Date: Tue, 17 Mar 2026 15:24:39 -0400 Subject: [PATCH] fix(ios): scope chat streaming to the active session --- .../Sources/OpenClawChatUI/ChatModels.swift | 17 ++++ .../Sources/OpenClawChatUI/ChatView.swift | 11 ++- .../OpenClawChatUI/ChatViewModel.swift | 95 +++++++++++++++++-- .../OpenClawKitTests/ChatViewModelTests.swift | 81 ++++++++++++++++ 4 files changed, 191 insertions(+), 13 deletions(-) diff --git a/apps/shared/OpenClawKit/Sources/OpenClawChatUI/ChatModels.swift b/apps/shared/OpenClawKit/Sources/OpenClawChatUI/ChatModels.swift index c58f2d702e4..3f7b0ea83f6 100644 --- a/apps/shared/OpenClawKit/Sources/OpenClawChatUI/ChatModels.swift +++ b/apps/shared/OpenClawKit/Sources/OpenClawChatUI/ChatModels.swift @@ -275,7 +275,24 @@ public struct OpenClawAgentEventPayload: Codable, Sendable, Identifiable { public let seq: Int? public let stream: String public let ts: Int? + public let sessionKey: String? public let data: [String: AnyCodable] + + public init( + runId: String, + seq: Int?, + stream: String, + ts: Int?, + sessionKey: String? = nil, + data: [String: AnyCodable]) + { + self.runId = runId + self.seq = seq + self.stream = stream + self.ts = ts + self.sessionKey = sessionKey + self.data = data + } } public struct OpenClawChatPendingToolCall: Identifiable, Hashable, Sendable { diff --git a/apps/shared/OpenClawKit/Sources/OpenClawChatUI/ChatView.swift b/apps/shared/OpenClawKit/Sources/OpenClawChatUI/ChatView.swift index c760fad30d5..c3ca0a344be 100644 --- a/apps/shared/OpenClawKit/Sources/OpenClawChatUI/ChatView.swift +++ b/apps/shared/OpenClawKit/Sources/OpenClawChatUI/ChatView.swift @@ -200,7 +200,7 @@ public struct OpenClawChatView: View { alignment: msg.role.lowercased() == "user" ? .trailing : .leading) } - if self.viewModel.pendingRunCount > 0 { + if self.viewModel.pendingRunCount > 0 && !self.hasVisibleStreamingAssistantText { HStack { ChatTypingIndicatorBubble(style: self.style) .equatable() @@ -291,13 +291,16 @@ public struct OpenClawChatView: View { return text } + private var hasVisibleStreamingAssistantText: Bool { + guard let text = self.viewModel.streamingAssistantText else { return false } + return AssistantTextParser.hasVisibleContent(in: text, includeThinking: self.showsAssistantTrace) + } + private var hasVisibleMessageListContent: Bool { if !self.visibleMessages.isEmpty { return true } - if let text = self.viewModel.streamingAssistantText, - AssistantTextParser.hasVisibleContent(in: text, includeThinking: self.showsAssistantTrace) - { + if self.hasVisibleStreamingAssistantText { return true } if self.viewModel.pendingRunCount > 0 { diff --git a/apps/shared/OpenClawKit/Sources/OpenClawChatUI/ChatViewModel.swift b/apps/shared/OpenClawKit/Sources/OpenClawChatUI/ChatViewModel.swift index 92413aefe64..21ab6d6f0aa 100644 --- a/apps/shared/OpenClawKit/Sources/OpenClawChatUI/ChatViewModel.swift +++ b/apps/shared/OpenClawKit/Sources/OpenClawChatUI/ChatViewModel.swift @@ -841,12 +841,26 @@ public final class OpenClawChatViewModel { return } if !isOurRun { - // Keep multiple clients in sync: if another client finishes a run for our session, refresh history. + // Keep multiple clients in sync: if another client finishes a run for our session, + // surface the final message immediately when possible and reconcile with history in background. + let shouldResetExternalLiveState = self.pendingRuns.isEmpty switch chat.state { - case "final", "aborted", "error": - self.streamingAssistantText = nil - self.pendingToolCallsById = [:] - Task { await self.refreshHistoryAfterRun() } + case "final", "aborted": + if shouldResetExternalLiveState { + self.streamingAssistantText = nil + self.pendingToolCallsById = [:] + } + if let message = self.decodedAssistantMessage(from: chat.message) { + self.messages.append(message) + } else if shouldResetExternalLiveState { + Task { await self.refreshHistoryAfterRun() } + } + case "error": + if shouldResetExternalLiveState { + self.streamingAssistantText = nil + self.pendingToolCallsById = [:] + Task { await self.refreshHistoryAfterRun() } + } default: break } @@ -863,9 +877,13 @@ public final class OpenClawChatViewModel { } else if self.pendingRuns.count <= 1 { self.clearPendingRuns(reason: nil) } + + let appendedFinalMessage = self.appendFinalAssistantMessage(from: chat) self.pendingToolCallsById = [:] self.streamingAssistantText = nil - Task { await self.refreshHistoryAfterRun() } + if !appendedFinalMessage { + Task { await self.refreshHistoryAfterRun() } + } default: break } @@ -886,10 +904,69 @@ public final class OpenClawChatViewModel { return false } - private func handleAgentEvent(_ evt: OpenClawAgentEventPayload) { - if let sessionId, evt.runId != sessionId { - return + private func shouldAcceptAgentEvent(_ evt: OpenClawAgentEventPayload) -> Bool { + if let sessionKey = evt.sessionKey { + return Self.matchesCurrentSessionKey(incoming: sessionKey, current: self.sessionKey) } + if self.pendingRuns.contains(evt.runId) { + return true + } + if let sessionId { + return evt.runId == sessionId + } + return false + } + + private func decodedAssistantMessage(from raw: AnyCodable?) -> OpenClawChatMessage? { + guard let raw else { return nil } + guard let decoded = try? ChatPayloadDecoding.decode(raw, as: OpenClawChatMessage.self) else { + return nil + } + let sanitized = Self.stripInboundMetadata(from: decoded) + guard sanitized.role.lowercased() == "assistant" else { return nil } + return sanitized + } + + private func streamedAssistantMessage() -> OpenClawChatMessage? { + guard let text = self.streamingAssistantText, + AssistantTextParser.hasVisibleContent(in: text, includeThinking: false) + else { + return nil + } + return OpenClawChatMessage( + id: UUID(), + role: "assistant", + content: [ + OpenClawChatMessageContent( + type: "text", + text: text, + thinking: nil, + thinkingSignature: nil, + mimeType: nil, + fileName: nil, + content: nil, + id: nil, + name: nil, + arguments: nil), + ], + timestamp: Date().timeIntervalSince1970 * 1000) + } + + @discardableResult + private func appendFinalAssistantMessage(from chat: OpenClawChatEventPayload) -> Bool { + if let message = self.decodedAssistantMessage(from: chat.message) { + self.messages.append(message) + return true + } + if let streamed = self.streamedAssistantMessage() { + self.messages.append(streamed) + return true + } + return false + } + + private func handleAgentEvent(_ evt: OpenClawAgentEventPayload) { + guard self.shouldAcceptAgentEvent(evt) else { return } switch evt.stream { case "assistant": diff --git a/apps/shared/OpenClawKit/Tests/OpenClawKitTests/ChatViewModelTests.swift b/apps/shared/OpenClawKit/Tests/OpenClawKitTests/ChatViewModelTests.swift index 6d1fa88e569..d0e63151f5a 100644 --- a/apps/shared/OpenClawKit/Tests/OpenClawKitTests/ChatViewModelTests.swift +++ b/apps/shared/OpenClawKit/Tests/OpenClawKitTests/ChatViewModelTests.swift @@ -130,6 +130,7 @@ private func emitAssistantText( transport: TestChatTransport, runId: String, text: String, + sessionKey: String? = nil, seq: Int = 1) { transport.emit( @@ -139,12 +140,14 @@ private func emitAssistantText( seq: seq, stream: "assistant", ts: Int(Date().timeIntervalSince1970 * 1000), + sessionKey: sessionKey, data: ["text": AnyCodable(text)]))) } private func emitToolStart( transport: TestChatTransport, runId: String, + sessionKey: String? = nil, seq: Int = 2) { transport.emit( @@ -154,6 +157,7 @@ private func emitToolStart( seq: seq, stream: "tool", ts: Int(Date().timeIntervalSince1970 * 1000), + sessionKey: sessionKey, data: [ "phase": AnyCodable("start"), "name": AnyCodable("demo"), @@ -470,6 +474,46 @@ extension TestChatTransportState { } } + @Test func acceptsAgentEventsForCanonicalSessionKeyEvenWhenRunIDDiffersFromSessionID() async throws { + let sessionId = "sess-main" + let history = historyPayload(sessionId: sessionId) + let (transport, vm) = await makeViewModel(historyResponses: [history]) + try await loadAndWaitBootstrap(vm: vm, sessionId: sessionId) + + emitAssistantText( + transport: transport, + runId: "server-run-1", + text: "stream via session key", + sessionKey: "agent:main:main") + emitToolStart( + transport: transport, + runId: "server-run-1", + sessionKey: "agent:main:main") + + try await waitUntil("assistant stream visible via sessionKey") { + await MainActor.run { vm.streamingAssistantText == "stream via session key" } + } + try await waitUntil("tool call pending via sessionKey") { + await MainActor.run { vm.pendingToolCalls.count == 1 } + } + } + + @Test func ignoresAgentEventsForOtherSessionKeys() async throws { + let sessionId = "sess-main" + let history = historyPayload(sessionId: sessionId) + let (transport, vm) = await makeViewModel(historyResponses: [history]) + try await loadAndWaitBootstrap(vm: vm, sessionId: sessionId) + + emitAssistantText( + transport: transport, + runId: "server-run-2", + text: "wrong session", + sessionKey: "agent:main:other") + + try? await Task.sleep(nanoseconds: 150_000_000) + #expect(await MainActor.run { vm.streamingAssistantText } == nil) + } + @Test func acceptsCanonicalSessionKeyEventsForExternalRuns() async throws { let now = Date().timeIntervalSince1970 * 1000 let history1 = historyPayload(messages: [chatTextMessage(role: "user", text: "first", timestamp: now)]) @@ -498,6 +542,43 @@ extension TestChatTransportState { } } + @Test func appendsFinalAssistantMessageImmediatelyWithoutHistoryRefresh() async throws { + let history = historyPayload(messages: []) + let (transport, vm) = await makeViewModel(historyResponses: [history]) + try await loadAndWaitBootstrap(vm: vm) + await sendUserMessage(vm) + try await waitUntil("pending run starts") { await MainActor.run { vm.pendingRunCount == 1 } } + + emitAssistantText(transport: transport, runId: "sess-main", text: "partial") + try await waitUntil("assistant stream visible") { + await MainActor.run { vm.streamingAssistantText == "partial" } + } + + let runId = try #require(await transport.lastSentRunId()) + let finalMessage = AnyCodable([ + "role": "assistant", + "content": [["type": "text", "text": "final from event"]], + "timestamp": Date().timeIntervalSince1970 * 1000, + ]) + transport.emit( + .chat( + OpenClawChatEventPayload( + runId: runId, + sessionKey: "main", + state: "final", + message: finalMessage, + errorMessage: nil))) + + try await waitUntil("final message appended immediately") { + await MainActor.run { + vm.messages.contains(where: { message in + message.role == "assistant" && message.content.contains(where: { $0.text == "final from event" }) + }) + } + } + #expect(await MainActor.run { vm.streamingAssistantText } == nil) + } + @Test func preservesMessageIDsAcrossHistoryRefreshes() async throws { let now = Date().timeIntervalSince1970 * 1000 let history1 = historyPayload(messages: [chatTextMessage(role: "user", text: "hello", timestamp: now)])