Merge ad77e1340ef83377ff5575c3ec1365140ee679e1 into 9fb78453e088cd7b553d7779faa0de5c83708e70

This commit is contained in:
Eulices 2026-03-20 22:19:39 -07:00 committed by GitHub
commit f45dd853f4
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 374 additions and 18 deletions

View File

@ -1,12 +1,19 @@
import Foundation
struct AssistantTextSegment: Identifiable {
enum Kind {
struct AssistantTextSegment: Identifiable, Equatable {
enum Kind: Equatable {
case thinking
case response
var stableIDPrefix: String {
switch self {
case .thinking: "t"
case .response: "r"
}
}
}
let id = UUID()
let id: String
let kind: Kind
let text: String
}
@ -16,7 +23,7 @@ enum AssistantTextParser {
let trimmed = raw.trimmingCharacters(in: .whitespacesAndNewlines)
guard !trimmed.isEmpty else { return [] }
guard raw.contains("<") else {
return [AssistantTextSegment(kind: .response, text: trimmed)]
return [AssistantTextSegment(id: "r0", kind: .response, text: trimmed)]
}
var segments: [AssistantTextSegment] = []
@ -51,7 +58,7 @@ enum AssistantTextParser {
}
guard matchedTag else {
return [AssistantTextSegment(kind: .response, text: trimmed)]
return [AssistantTextSegment(id: "r0", kind: .response, text: trimmed)]
}
if includeThinking {
@ -146,6 +153,11 @@ enum AssistantTextParser {
{
let trimmed = text.trimmingCharacters(in: .whitespacesAndNewlines)
guard !trimmed.isEmpty else { return }
segments.append(AssistantTextSegment(kind: kind, text: trimmed))
let nextIndex = segments.count
segments.append(
AssistantTextSegment(
id: "\(kind.stableIDPrefix)\(nextIndex)",
kind: kind,
text: trimmed))
}
}

View File

@ -275,7 +275,24 @@ public struct OpenClawAgentEventPayload: Codable, Sendable, Identifiable {
public let seq: Int?
public let stream: String
public let ts: Int?
public let sessionKey: String?
public let data: [String: AnyCodable]
public init(
runId: String,
seq: Int?,
stream: String,
ts: Int?,
sessionKey: String? = nil,
data: [String: AnyCodable])
{
self.runId = runId
self.seq = seq
self.stream = stream
self.ts = ts
self.sessionKey = sessionKey
self.data = data
}
}
public struct OpenClawChatPendingToolCall: Identifiable, Hashable, Sendable {

View File

@ -17,6 +17,7 @@ public struct OpenClawChatView: View {
@State private var hasPerformedInitialScroll = false
@State private var isPinnedToBottom = true
@State private var lastUserMessageID: UUID?
@State private var pendingStreamingScrollTask: Task<Void, Never>?
private let showsSessionSwitcher: Bool
private let style: Style
private let markdownVariant: ChatMarkdownVariant
@ -83,6 +84,10 @@ public struct OpenClawChatView: View {
}
.frame(maxWidth: .infinity, maxHeight: .infinity, alignment: .top)
.onAppear { self.viewModel.load() }
.onDisappear {
self.pendingStreamingScrollTask?.cancel()
self.pendingStreamingScrollTask = nil
}
.sheet(isPresented: self.$showSessions) {
if self.showsSessionSwitcher {
ChatSessionsSheet(viewModel: self.viewModel)
@ -180,6 +185,16 @@ public struct OpenClawChatView: View {
}
.onChange(of: self.viewModel.streamingAssistantText) { _, _ in
guard self.hasPerformedInitialScroll, self.isPinnedToBottom else { return }
self.scheduleStreamingScrollToBottom()
}
}
private func scheduleStreamingScrollToBottom() {
guard self.pendingStreamingScrollTask == nil else { return }
self.pendingStreamingScrollTask = Task { @MainActor in
defer { self.pendingStreamingScrollTask = nil }
try? await Task.sleep(nanoseconds: 100_000_000)
guard !Task.isCancelled, self.hasPerformedInitialScroll, self.isPinnedToBottom else { return }
withAnimation(.snappy(duration: 0.22)) {
self.scrollPosition = self.scrollerBottomID
}
@ -200,7 +215,7 @@ public struct OpenClawChatView: View {
alignment: msg.role.lowercased() == "user" ? .trailing : .leading)
}
if self.viewModel.pendingRunCount > 0 {
if self.viewModel.pendingRunCount > 0 && !self.hasVisibleStreamingAssistantText {
HStack {
ChatTypingIndicatorBubble(style: self.style)
.equatable()
@ -291,13 +306,16 @@ public struct OpenClawChatView: View {
return text
}
private var hasVisibleStreamingAssistantText: Bool {
guard let text = self.viewModel.streamingAssistantText else { return false }
return AssistantTextParser.hasVisibleContent(in: text, includeThinking: self.showsAssistantTrace)
}
private var hasVisibleMessageListContent: Bool {
if !self.visibleMessages.isEmpty {
return true
}
if let text = self.viewModel.streamingAssistantText,
AssistantTextParser.hasVisibleContent(in: text, includeThinking: self.showsAssistantTrace)
{
if self.hasVisibleStreamingAssistantText {
return true
}
if self.viewModel.pendingRunCount > 0 {

View File

@ -42,6 +42,7 @@ public final class OpenClawChatViewModel {
@ObservationIgnored
private nonisolated(unsafe) var eventTask: Task<Void, Never>?
private var hasLoadedInitialState = false
private var pendingRuns = Set<String>() {
didSet { self.pendingRunCount = self.pendingRuns.count }
}
@ -103,6 +104,8 @@ public final class OpenClawChatViewModel {
}
public func load() {
guard !self.hasLoadedInitialState else { return }
self.hasLoadedInitialState = true
Task { await self.bootstrap() }
}
@ -841,12 +844,25 @@ public final class OpenClawChatViewModel {
return
}
if !isOurRun {
// Keep multiple clients in sync: if another client finishes a run for our session, refresh history.
// Keep multiple clients in sync: if another client finishes a run for our session,
// surface the final message immediately when possible and reconcile with history in background.
let shouldResetExternalLiveState = self.pendingRuns.isEmpty
switch chat.state {
case "final", "aborted", "error":
self.streamingAssistantText = nil
self.pendingToolCallsById = [:]
case "final", "aborted":
if shouldResetExternalLiveState {
self.streamingAssistantText = nil
self.pendingToolCallsById = [:]
}
if let message = self.decodedAssistantMessage(from: chat.message) {
self.messages.append(message)
}
Task { await self.refreshHistoryAfterRun() }
case "error":
if shouldResetExternalLiveState {
self.streamingAssistantText = nil
self.pendingToolCallsById = [:]
Task { await self.refreshHistoryAfterRun() }
}
default:
break
}
@ -863,9 +879,13 @@ public final class OpenClawChatViewModel {
} else if self.pendingRuns.count <= 1 {
self.clearPendingRuns(reason: nil)
}
_ = self.appendFinalAssistantMessage(from: chat)
self.pendingToolCallsById = [:]
self.streamingAssistantText = nil
Task { await self.refreshHistoryAfterRun() }
if chat.state != "error" {
Task { await self.refreshHistoryAfterRun() }
}
default:
break
}
@ -886,10 +906,69 @@ public final class OpenClawChatViewModel {
return false
}
private func handleAgentEvent(_ evt: OpenClawAgentEventPayload) {
if let sessionId, evt.runId != sessionId {
return
/// Decides whether an incoming agent event belongs to this view model's session.
///
/// Acceptance precedence:
/// 1. Events for runs this client started (`pendingRuns`) are always accepted,
///    even if the event's `sessionKey` does not match ours.
/// 2. Otherwise, when the event carries a `sessionKey`, it must canonically
///    match the current session key (`matchesCurrentSessionKey`).
/// 3. Otherwise, fall back to comparing the event's `runId` to `sessionId`.
/// 4. With no pending run, no session key, and no session id: reject.
private func shouldAcceptAgentEvent(_ evt: OpenClawAgentEventPayload) -> Bool {
if self.pendingRuns.contains(evt.runId) {
return true
}
if let sessionKey = evt.sessionKey {
return Self.matchesCurrentSessionKey(incoming: sessionKey, current: self.sessionKey)
}
if let sessionId {
return evt.runId == sessionId
}
return false
}
/// Decodes an assistant chat message from a raw event payload.
///
/// Returns `nil` when `raw` is absent, cannot be decoded as
/// `OpenClawChatMessage`, or — after stripping inbound metadata — is not an
/// assistant-role message (role compared case-insensitively).
private func decodedAssistantMessage(from raw: AnyCodable?) -> OpenClawChatMessage? {
guard let raw else { return nil }
guard let decoded = try? ChatPayloadDecoding.decode(raw, as: OpenClawChatMessage.self) else {
return nil
}
let sanitized = Self.stripInboundMetadata(from: decoded)
guard sanitized.role.lowercased() == "assistant" else { return nil }
return sanitized
}
/// Builds a synthetic assistant message from the in-flight streamed text.
///
/// Returns `nil` when nothing is streaming or when the streamed text has no
/// visible (non-thinking) content. Used as a fallback when a terminal chat
/// event arrives without a decodable message payload.
/// NOTE(review): the synthesized message gets a fresh `UUID` and a client-side
/// timestamp — presumably the later history refresh reconciles/dedupes it;
/// confirm against `refreshHistoryAfterRun`.
private func streamedAssistantMessage() -> OpenClawChatMessage? {
guard let text = self.streamingAssistantText,
AssistantTextParser.hasVisibleContent(in: text, includeThinking: false)
else {
return nil
}
return OpenClawChatMessage(
id: UUID(),
role: "assistant",
content: [
OpenClawChatMessageContent(
type: "text",
text: text,
thinking: nil,
thinkingSignature: nil,
mimeType: nil,
fileName: nil,
content: nil,
id: nil,
name: nil,
arguments: nil),
],
timestamp: Date().timeIntervalSince1970 * 1000)
}
/// Appends the run's final assistant message to `messages`.
///
/// Prefers the decoded message carried by the chat event payload; falls back
/// to a message synthesized from the streamed text. Returns `true` when a
/// message was appended, `false` when neither source yielded one.
@discardableResult
private func appendFinalAssistantMessage(from chat: OpenClawChatEventPayload) -> Bool {
if let message = self.decodedAssistantMessage(from: chat.message) {
self.messages.append(message)
return true
}
if let streamed = self.streamedAssistantMessage() {
self.messages.append(streamed)
return true
}
return false
}
private func handleAgentEvent(_ evt: OpenClawAgentEventPayload) {
guard self.shouldAcceptAgentEvent(evt) else { return }
switch evt.stream {
case "assistant":

View File

@ -48,4 +48,13 @@ import Testing
#expect(AssistantTextParser.hasVisibleContent(in: "<think>internal</think>") == false)
#expect(AssistantTextParser.hasVisibleContent(in: "<think>internal</think>", includeThinking: true))
}
// Parsing identical raw text twice must yield identical, position-stable IDs
// (kind prefix "t"/"r" + segment index), so repeated re-parses during
// streaming do not churn Identifiable-keyed views.
@Test func usesStableSegmentIDsAcrossRepeatedStreamingParses() {
let raw = "<think>internal</think>\n\n<final>Hello there</final>"
let first = AssistantTextParser.segments(from: raw, includeThinking: true)
let second = AssistantTextParser.segments(from: raw, includeThinking: true)
#expect(first.map(\.id) == ["t0", "r1"])
#expect(first == second)
}
}

View File

@ -130,6 +130,7 @@ private func emitAssistantText(
transport: TestChatTransport,
runId: String,
text: String,
sessionKey: String? = nil,
seq: Int = 1)
{
transport.emit(
@ -139,12 +140,14 @@ private func emitAssistantText(
seq: seq,
stream: "assistant",
ts: Int(Date().timeIntervalSince1970 * 1000),
sessionKey: sessionKey,
data: ["text": AnyCodable(text)])))
}
private func emitToolStart(
transport: TestChatTransport,
runId: String,
sessionKey: String? = nil,
seq: Int = 2)
{
transport.emit(
@ -154,6 +157,7 @@ private func emitToolStart(
seq: seq,
stream: "tool",
ts: Int(Date().timeIntervalSince1970 * 1000),
sessionKey: sessionKey,
data: [
"phase": AnyCodable("start"),
"name": AnyCodable("demo"),
@ -334,6 +338,10 @@ private final class TestChatTransport: @unchecked Sendable, OpenClawChatTranspor
return ids.last
}
// Number of history fetches issued against this transport so far.
func historyRequestCount() async -> Int {
await self.state.historyCallCount
}
// Run ids for which an abort was requested, in order of receipt.
func abortedRunIds() async -> [String] {
await self.state.abortedRunIds
}
@ -470,6 +478,86 @@ extension TestChatTransportState {
}
}
// An event whose runId differs from our session id must still be accepted when
// its sessionKey canonically matches the current session.
@Test func acceptsAgentEventsForCanonicalSessionKeyEvenWhenRunIDDiffersFromSessionID() async throws {
let sessionId = "sess-main"
let history = historyPayload(sessionId: sessionId)
let (transport, vm) = await makeViewModel(historyResponses: [history])
try await loadAndWaitBootstrap(vm: vm, sessionId: sessionId)
// "server-run-1" is neither a pending run nor equal to sessionId; only the
// sessionKey can make these events acceptable.
emitAssistantText(
transport: transport,
runId: "server-run-1",
text: "stream via session key",
sessionKey: "agent:main:main")
emitToolStart(
transport: transport,
runId: "server-run-1",
sessionKey: "agent:main:main")
try await waitUntil("assistant stream visible via sessionKey") {
await MainActor.run { vm.streamingAssistantText == "stream via session key" }
}
try await waitUntil("tool call pending via sessionKey") {
await MainActor.run { vm.pendingToolCalls.count == 1 }
}
}
// Events tagged with a different session's key must be dropped entirely.
@Test func ignoresAgentEventsForOtherSessionKeys() async throws {
let sessionId = "sess-main"
let history = historyPayload(sessionId: sessionId)
let (transport, vm) = await makeViewModel(historyResponses: [history])
try await loadAndWaitBootstrap(vm: vm, sessionId: sessionId)
emitAssistantText(
transport: transport,
runId: "server-run-2",
text: "wrong session",
sessionKey: "agent:main:other")
// Negative check: give the event pipeline time to (incorrectly) apply the
// event before asserting nothing streamed.
try? await Task.sleep(nanoseconds: 150_000_000)
#expect(await MainActor.run { vm.streamingAssistantText } == nil)
}
// Membership in pendingRuns takes precedence over sessionKey matching: a run
// this client started keeps streaming even when the server tags its events
// with a mismatched key.
@Test func acceptsPendingRunAgentEventsEvenWhenSessionKeyMismatches() async throws {
let history = historyPayload(sessionId: "sess-main")
let (transport, vm) = await makeViewModel(historyResponses: [history])
try await loadAndWaitBootstrap(vm: vm, sessionId: "sess-main")
await sendUserMessage(vm)
try await waitUntil("pending run starts") { await transport.lastSentRunId() != nil }
let runId = try #require(await transport.lastSentRunId())
emitAssistantText(
transport: transport,
runId: runId,
text: "own run still streams",
sessionKey: "agent:main:other")
try await waitUntil("assistant stream visible for own run despite key mismatch") {
await MainActor.run { vm.streamingAssistantText == "own run still streams" }
}
}
// load() must be idempotent: a second call during an active stream issues no
// extra history request and leaves streaming / pending-run state untouched.
@Test func loadIsIdempotentDuringActiveStream() async throws {
let history = historyPayload(messages: [chatTextMessage(role: "user", text: "hello", timestamp: 1)])
let (transport, vm) = await makeViewModel(historyResponses: [history])
try await loadAndWaitBootstrap(vm: vm)
await sendUserMessage(vm)
try await waitUntil("pending run starts") { await transport.lastSentRunId() != nil }
let runId = try #require(await transport.lastSentRunId())
emitAssistantText(transport: transport, runId: runId, text: "stream survives load")
try await waitUntil("assistant stream visible") {
await MainActor.run { vm.streamingAssistantText == "stream survives load" }
}
await MainActor.run { vm.load() }
// Negative check: allow any (incorrect) re-bootstrap to run before asserting.
try? await Task.sleep(nanoseconds: 150_000_000)
#expect(await transport.historyRequestCount() == 1)
#expect(await MainActor.run { vm.streamingAssistantText } == "stream survives load")
#expect(await MainActor.run { vm.pendingRunCount } == 1)
}
@Test func acceptsCanonicalSessionKeyEventsForExternalRuns() async throws {
let now = Date().timeIntervalSince1970 * 1000
let history1 = historyPayload(messages: [chatTextMessage(role: "user", text: "first", timestamp: now)])
@ -498,6 +586,139 @@ extension TestChatTransportState {
}
}
// Our own run's "final" chat event must append the assistant message
// immediately, clear the streaming text, and kick off a background history
// refresh (second transport fetch).
@Test func appendsFinalAssistantMessageImmediatelyAndRefreshesHistory() async throws {
let now = Date().timeIntervalSince1970 * 1000
let history1 = historyPayload(messages: [])
let history2 = historyPayload(
messages: [
chatTextMessage(role: "assistant", text: "final from event", timestamp: now),
])
let (transport, vm) = await makeViewModel(historyResponses: [history1, history2])
try await loadAndWaitBootstrap(vm: vm)
await sendUserMessage(vm)
try await waitUntil("pending run starts") { await MainActor.run { vm.pendingRunCount == 1 } }
// Resolve the run id the transport actually recorded and stream against it,
// instead of a hard-coded "sess-main" literal, so the assistant text is
// attributed to the pending run itself rather than relying on the
// runId-equals-sessionId acceptance fallback.
let runId = try #require(await transport.lastSentRunId())
emitAssistantText(transport: transport, runId: runId, text: "partial")
try await waitUntil("assistant stream visible") {
await MainActor.run { vm.streamingAssistantText == "partial" }
}
let finalMessage = AnyCodable([
"role": "assistant",
"content": [["type": "text", "text": "final from event"]],
"timestamp": now,
])
transport.emit(
.chat(
OpenClawChatEventPayload(
runId: runId,
sessionKey: "main",
state: "final",
message: finalMessage,
errorMessage: nil)))
try await waitUntil("final message appended immediately") {
await MainActor.run {
vm.messages.contains(where: { message in
message.role == "assistant" && message.content.contains(where: { $0.text == "final from event" })
})
}
}
try await waitUntil("history refreshed after local final message") {
await transport.historyRequestCount() == 2
}
#expect(await MainActor.run { vm.streamingAssistantText } == nil)
}
// A "final" chat event from another client (not one of our pending runs) must
// append the assistant message immediately and then reconcile via a background
// history refresh that also pulls in the other client's user prompt.
@Test func externalFinalMessageStillRefreshesHistory() async throws {
let now = Date().timeIntervalSince1970 * 1000
let history1 = historyPayload(messages: [])
let history2 = historyPayload(
messages: [
chatTextMessage(role: "user", text: "prompt from another client", timestamp: now),
chatTextMessage(role: "assistant", text: "final from external event", timestamp: now + 1),
])
let (transport, vm) = await makeViewModel(historyResponses: [history1, history2])
await MainActor.run { vm.load() }
// Wait on the bootstrap history request itself. The previous condition,
// `vm.messages.isEmpty`, is trivially true before load() completes (history1
// is empty), so it never actually confirmed bootstrap and left the history
// request count assertion racy.
try await waitUntil("bootstrap history loaded") { await transport.historyRequestCount() == 1 }
#expect(await MainActor.run { vm.messages.isEmpty })
let finalMessage = AnyCodable([
"role": "assistant",
"content": [["type": "text", "text": "final from external event"]],
"timestamp": now + 1,
])
transport.emit(
.chat(
OpenClawChatEventPayload(
runId: "external-run",
sessionKey: "agent:main:main",
state: "final",
message: finalMessage,
errorMessage: nil)))
try await waitUntil("final message appended immediately") {
await MainActor.run {
vm.messages.contains(where: { message in
message.role == "assistant" && message.content.contains(where: { $0.text == "final from external event" })
})
}
}
try await waitUntil("history refreshed after external final message") {
await transport.historyRequestCount() == 2
}
try await waitUntil("user prompt synced from history") {
await MainActor.run {
vm.messages.contains(where: { message in
message.role == "user" && message.content.contains(where: { $0.text == "prompt from another client" })
})
}
}
}
// A "final" chat event from another client must still trigger a history
// refresh even while this client has its own pending run in flight.
@Test func externalFinalMessageRefreshesHistoryEvenDuringLocalPendingRun() async throws {
let now = Date().timeIntervalSince1970 * 1000
let history1 = historyPayload(messages: [])
let history2 = historyPayload(
messages: [
chatTextMessage(role: "user", text: "prompt from another client", timestamp: now),
chatTextMessage(role: "assistant", text: "external final while local run pending", timestamp: now + 1),
])
let (transport, vm) = await makeViewModel(historyResponses: [history1, history2])
try await loadAndWaitBootstrap(vm: vm)
await sendUserMessage(vm, text: "local pending run")
try await waitUntil("local pending run starts") { await MainActor.run { vm.pendingRunCount == 1 } }
#expect(await transport.historyRequestCount() == 1)
let finalMessage = AnyCodable([
"role": "assistant",
"content": [["type": "text", "text": "external final while local run pending"]],
"timestamp": now + 1,
])
// "external-run" is not in pendingRuns, so acceptance flows through the
// canonical sessionKey match.
transport.emit(
.chat(
OpenClawChatEventPayload(
runId: "external-run",
sessionKey: "agent:main:main",
state: "final",
message: finalMessage,
errorMessage: nil)))
try await waitUntil("history refreshed after external final with local pending run") {
await transport.historyRequestCount() == 2
}
try await waitUntil("external prompt synced from history during local pending run") {
await MainActor.run {
vm.messages.contains(where: { message in
message.role == "user" && message.content.contains(where: { $0.text == "prompt from another client" })
})
}
}
}
@Test func preservesMessageIDsAcrossHistoryRefreshes() async throws {
let now = Date().timeIntervalSince1970 * 1000
let history1 = historyPayload(messages: [chatTextMessage(role: "user", text: "hello", timestamp: now)])