fix(ios): scope chat streaming to the active session
This commit is contained in:
parent
dc86b6d72a
commit
cb56c29fd3
@ -275,7 +275,24 @@ public struct OpenClawAgentEventPayload: Codable, Sendable, Identifiable {
|
||||
public let seq: Int?
|
||||
public let stream: String
|
||||
public let ts: Int?
|
||||
public let sessionKey: String?
|
||||
public let data: [String: AnyCodable]
|
||||
|
||||
public init(
|
||||
runId: String,
|
||||
seq: Int?,
|
||||
stream: String,
|
||||
ts: Int?,
|
||||
sessionKey: String? = nil,
|
||||
data: [String: AnyCodable])
|
||||
{
|
||||
self.runId = runId
|
||||
self.seq = seq
|
||||
self.stream = stream
|
||||
self.ts = ts
|
||||
self.sessionKey = sessionKey
|
||||
self.data = data
|
||||
}
|
||||
}
|
||||
|
||||
public struct OpenClawChatPendingToolCall: Identifiable, Hashable, Sendable {
|
||||
|
||||
@ -200,7 +200,7 @@ public struct OpenClawChatView: View {
|
||||
alignment: msg.role.lowercased() == "user" ? .trailing : .leading)
|
||||
}
|
||||
|
||||
if self.viewModel.pendingRunCount > 0 {
|
||||
if self.viewModel.pendingRunCount > 0 && !self.hasVisibleStreamingAssistantText {
|
||||
HStack {
|
||||
ChatTypingIndicatorBubble(style: self.style)
|
||||
.equatable()
|
||||
@ -291,13 +291,16 @@ public struct OpenClawChatView: View {
|
||||
return text
|
||||
}
|
||||
|
||||
private var hasVisibleStreamingAssistantText: Bool {
|
||||
guard let text = self.viewModel.streamingAssistantText else { return false }
|
||||
return AssistantTextParser.hasVisibleContent(in: text, includeThinking: self.showsAssistantTrace)
|
||||
}
|
||||
|
||||
private var hasVisibleMessageListContent: Bool {
|
||||
if !self.visibleMessages.isEmpty {
|
||||
return true
|
||||
}
|
||||
if let text = self.viewModel.streamingAssistantText,
|
||||
AssistantTextParser.hasVisibleContent(in: text, includeThinking: self.showsAssistantTrace)
|
||||
{
|
||||
if self.hasVisibleStreamingAssistantText {
|
||||
return true
|
||||
}
|
||||
if self.viewModel.pendingRunCount > 0 {
|
||||
|
||||
@ -841,12 +841,26 @@ public final class OpenClawChatViewModel {
|
||||
return
|
||||
}
|
||||
if !isOurRun {
|
||||
// Keep multiple clients in sync: if another client finishes a run for our session, refresh history.
|
||||
// Keep multiple clients in sync: if another client finishes a run for our session,
|
||||
// surface the final message immediately when possible and reconcile with history in background.
|
||||
let shouldResetExternalLiveState = self.pendingRuns.isEmpty
|
||||
switch chat.state {
|
||||
case "final", "aborted", "error":
|
||||
self.streamingAssistantText = nil
|
||||
self.pendingToolCallsById = [:]
|
||||
Task { await self.refreshHistoryAfterRun() }
|
||||
case "final", "aborted":
|
||||
if shouldResetExternalLiveState {
|
||||
self.streamingAssistantText = nil
|
||||
self.pendingToolCallsById = [:]
|
||||
}
|
||||
if let message = self.decodedAssistantMessage(from: chat.message) {
|
||||
self.messages.append(message)
|
||||
} else if shouldResetExternalLiveState {
|
||||
Task { await self.refreshHistoryAfterRun() }
|
||||
}
|
||||
case "error":
|
||||
if shouldResetExternalLiveState {
|
||||
self.streamingAssistantText = nil
|
||||
self.pendingToolCallsById = [:]
|
||||
Task { await self.refreshHistoryAfterRun() }
|
||||
}
|
||||
default:
|
||||
break
|
||||
}
|
||||
@ -863,9 +877,13 @@ public final class OpenClawChatViewModel {
|
||||
} else if self.pendingRuns.count <= 1 {
|
||||
self.clearPendingRuns(reason: nil)
|
||||
}
|
||||
|
||||
let appendedFinalMessage = self.appendFinalAssistantMessage(from: chat)
|
||||
self.pendingToolCallsById = [:]
|
||||
self.streamingAssistantText = nil
|
||||
Task { await self.refreshHistoryAfterRun() }
|
||||
if !appendedFinalMessage {
|
||||
Task { await self.refreshHistoryAfterRun() }
|
||||
}
|
||||
default:
|
||||
break
|
||||
}
|
||||
@ -886,10 +904,69 @@ public final class OpenClawChatViewModel {
|
||||
return false
|
||||
}
|
||||
|
||||
private func handleAgentEvent(_ evt: OpenClawAgentEventPayload) {
|
||||
if let sessionId, evt.runId != sessionId {
|
||||
return
|
||||
private func shouldAcceptAgentEvent(_ evt: OpenClawAgentEventPayload) -> Bool {
|
||||
if let sessionKey = evt.sessionKey {
|
||||
return Self.matchesCurrentSessionKey(incoming: sessionKey, current: self.sessionKey)
|
||||
}
|
||||
if self.pendingRuns.contains(evt.runId) {
|
||||
return true
|
||||
}
|
||||
if let sessionId {
|
||||
return evt.runId == sessionId
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
private func decodedAssistantMessage(from raw: AnyCodable?) -> OpenClawChatMessage? {
|
||||
guard let raw else { return nil }
|
||||
guard let decoded = try? ChatPayloadDecoding.decode(raw, as: OpenClawChatMessage.self) else {
|
||||
return nil
|
||||
}
|
||||
let sanitized = Self.stripInboundMetadata(from: decoded)
|
||||
guard sanitized.role.lowercased() == "assistant" else { return nil }
|
||||
return sanitized
|
||||
}
|
||||
|
||||
private func streamedAssistantMessage() -> OpenClawChatMessage? {
|
||||
guard let text = self.streamingAssistantText,
|
||||
AssistantTextParser.hasVisibleContent(in: text, includeThinking: false)
|
||||
else {
|
||||
return nil
|
||||
}
|
||||
return OpenClawChatMessage(
|
||||
id: UUID(),
|
||||
role: "assistant",
|
||||
content: [
|
||||
OpenClawChatMessageContent(
|
||||
type: "text",
|
||||
text: text,
|
||||
thinking: nil,
|
||||
thinkingSignature: nil,
|
||||
mimeType: nil,
|
||||
fileName: nil,
|
||||
content: nil,
|
||||
id: nil,
|
||||
name: nil,
|
||||
arguments: nil),
|
||||
],
|
||||
timestamp: Date().timeIntervalSince1970 * 1000)
|
||||
}
|
||||
|
||||
@discardableResult
|
||||
private func appendFinalAssistantMessage(from chat: OpenClawChatEventPayload) -> Bool {
|
||||
if let message = self.decodedAssistantMessage(from: chat.message) {
|
||||
self.messages.append(message)
|
||||
return true
|
||||
}
|
||||
if let streamed = self.streamedAssistantMessage() {
|
||||
self.messages.append(streamed)
|
||||
return true
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
private func handleAgentEvent(_ evt: OpenClawAgentEventPayload) {
|
||||
guard self.shouldAcceptAgentEvent(evt) else { return }
|
||||
|
||||
switch evt.stream {
|
||||
case "assistant":
|
||||
|
||||
@ -130,6 +130,7 @@ private func emitAssistantText(
|
||||
transport: TestChatTransport,
|
||||
runId: String,
|
||||
text: String,
|
||||
sessionKey: String? = nil,
|
||||
seq: Int = 1)
|
||||
{
|
||||
transport.emit(
|
||||
@ -139,12 +140,14 @@ private func emitAssistantText(
|
||||
seq: seq,
|
||||
stream: "assistant",
|
||||
ts: Int(Date().timeIntervalSince1970 * 1000),
|
||||
sessionKey: sessionKey,
|
||||
data: ["text": AnyCodable(text)])))
|
||||
}
|
||||
|
||||
private func emitToolStart(
|
||||
transport: TestChatTransport,
|
||||
runId: String,
|
||||
sessionKey: String? = nil,
|
||||
seq: Int = 2)
|
||||
{
|
||||
transport.emit(
|
||||
@ -154,6 +157,7 @@ private func emitToolStart(
|
||||
seq: seq,
|
||||
stream: "tool",
|
||||
ts: Int(Date().timeIntervalSince1970 * 1000),
|
||||
sessionKey: sessionKey,
|
||||
data: [
|
||||
"phase": AnyCodable("start"),
|
||||
"name": AnyCodable("demo"),
|
||||
@ -470,6 +474,46 @@ extension TestChatTransportState {
|
||||
}
|
||||
}
|
||||
|
||||
@Test func acceptsAgentEventsForCanonicalSessionKeyEvenWhenRunIDDiffersFromSessionID() async throws {
|
||||
let sessionId = "sess-main"
|
||||
let history = historyPayload(sessionId: sessionId)
|
||||
let (transport, vm) = await makeViewModel(historyResponses: [history])
|
||||
try await loadAndWaitBootstrap(vm: vm, sessionId: sessionId)
|
||||
|
||||
emitAssistantText(
|
||||
transport: transport,
|
||||
runId: "server-run-1",
|
||||
text: "stream via session key",
|
||||
sessionKey: "agent:main:main")
|
||||
emitToolStart(
|
||||
transport: transport,
|
||||
runId: "server-run-1",
|
||||
sessionKey: "agent:main:main")
|
||||
|
||||
try await waitUntil("assistant stream visible via sessionKey") {
|
||||
await MainActor.run { vm.streamingAssistantText == "stream via session key" }
|
||||
}
|
||||
try await waitUntil("tool call pending via sessionKey") {
|
||||
await MainActor.run { vm.pendingToolCalls.count == 1 }
|
||||
}
|
||||
}
|
||||
|
||||
@Test func ignoresAgentEventsForOtherSessionKeys() async throws {
|
||||
let sessionId = "sess-main"
|
||||
let history = historyPayload(sessionId: sessionId)
|
||||
let (transport, vm) = await makeViewModel(historyResponses: [history])
|
||||
try await loadAndWaitBootstrap(vm: vm, sessionId: sessionId)
|
||||
|
||||
emitAssistantText(
|
||||
transport: transport,
|
||||
runId: "server-run-2",
|
||||
text: "wrong session",
|
||||
sessionKey: "agent:main:other")
|
||||
|
||||
try? await Task.sleep(nanoseconds: 150_000_000)
|
||||
#expect(await MainActor.run { vm.streamingAssistantText } == nil)
|
||||
}
|
||||
|
||||
@Test func acceptsCanonicalSessionKeyEventsForExternalRuns() async throws {
|
||||
let now = Date().timeIntervalSince1970 * 1000
|
||||
let history1 = historyPayload(messages: [chatTextMessage(role: "user", text: "first", timestamp: now)])
|
||||
@ -498,6 +542,43 @@ extension TestChatTransportState {
|
||||
}
|
||||
}
|
||||
|
||||
@Test func appendsFinalAssistantMessageImmediatelyWithoutHistoryRefresh() async throws {
|
||||
let history = historyPayload(messages: [])
|
||||
let (transport, vm) = await makeViewModel(historyResponses: [history])
|
||||
try await loadAndWaitBootstrap(vm: vm)
|
||||
await sendUserMessage(vm)
|
||||
try await waitUntil("pending run starts") { await MainActor.run { vm.pendingRunCount == 1 } }
|
||||
|
||||
emitAssistantText(transport: transport, runId: "sess-main", text: "partial")
|
||||
try await waitUntil("assistant stream visible") {
|
||||
await MainActor.run { vm.streamingAssistantText == "partial" }
|
||||
}
|
||||
|
||||
let runId = try #require(await transport.lastSentRunId())
|
||||
let finalMessage = AnyCodable([
|
||||
"role": "assistant",
|
||||
"content": [["type": "text", "text": "final from event"]],
|
||||
"timestamp": Date().timeIntervalSince1970 * 1000,
|
||||
])
|
||||
transport.emit(
|
||||
.chat(
|
||||
OpenClawChatEventPayload(
|
||||
runId: runId,
|
||||
sessionKey: "main",
|
||||
state: "final",
|
||||
message: finalMessage,
|
||||
errorMessage: nil)))
|
||||
|
||||
try await waitUntil("final message appended immediately") {
|
||||
await MainActor.run {
|
||||
vm.messages.contains(where: { message in
|
||||
message.role == "assistant" && message.content.contains(where: { $0.text == "final from event" })
|
||||
})
|
||||
}
|
||||
}
|
||||
#expect(await MainActor.run { vm.streamingAssistantText } == nil)
|
||||
}
|
||||
|
||||
@Test func preservesMessageIDsAcrossHistoryRefreshes() async throws {
|
||||
let now = Date().timeIntervalSince1970 * 1000
|
||||
let history1 = historyPayload(messages: [chatTextMessage(role: "user", text: "hello", timestamp: now)])
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user