feat: push session list updates over websocket
This commit is contained in:
parent
c0e5e8db22
commit
761e5ce5f8
@ -644,5 +644,16 @@ export function createAgentEventHandler({
|
||||
agentRunSeq.delete(evt.runId);
|
||||
agentRunSeq.delete(clientRunId);
|
||||
}
|
||||
|
||||
if (
|
||||
sessionKey &&
|
||||
(lifecyclePhase === "start" || lifecyclePhase === "end" || lifecyclePhase === "error")
|
||||
) {
|
||||
broadcast(
|
||||
"sessions.changed",
|
||||
{ sessionKey, phase: lifecyclePhase, runId: evt.runId, ts: evt.ts },
|
||||
{ dropIfSlow: true },
|
||||
);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
@ -54,6 +54,8 @@ const BASE_METHODS = [
|
||||
"secrets.reload",
|
||||
"secrets.resolve",
|
||||
"sessions.list",
|
||||
"sessions.subscribe",
|
||||
"sessions.unsubscribe",
|
||||
"sessions.preview",
|
||||
"sessions.patch",
|
||||
"sessions.reset",
|
||||
@ -114,6 +116,8 @@ export const GATEWAY_EVENTS = [
|
||||
"connect.challenge",
|
||||
"agent",
|
||||
"chat",
|
||||
"session.message",
|
||||
"sessions.changed",
|
||||
"presence",
|
||||
"tick",
|
||||
"talk.mode",
|
||||
|
||||
@ -43,7 +43,12 @@ import {
|
||||
} from "../session-utils.js";
|
||||
import { applySessionsPatchToStore } from "../sessions-patch.js";
|
||||
import { resolveSessionKeyFromResolveParams } from "../sessions-resolve.js";
|
||||
import type { GatewayClient, GatewayRequestHandlers, RespondFn } from "./types.js";
|
||||
import type {
|
||||
GatewayClient,
|
||||
GatewayRequestContext,
|
||||
GatewayRequestHandlers,
|
||||
RespondFn,
|
||||
} from "./types.js";
|
||||
import { assertValidParams } from "./validation.js";
|
||||
|
||||
function requireSessionKey(key: unknown, respond: RespondFn): string | null {
|
||||
@ -69,6 +74,20 @@ function resolveGatewaySessionTargetFromKey(key: string) {
|
||||
return { cfg, target, storePath: target.storePath };
|
||||
}
|
||||
|
||||
function emitSessionsChanged(
|
||||
broadcast: GatewayRequestContext["broadcast"],
|
||||
payload: { sessionKey?: string; reason: string; compacted?: boolean },
|
||||
) {
|
||||
broadcast(
|
||||
"sessions.changed",
|
||||
{
|
||||
...payload,
|
||||
ts: Date.now(),
|
||||
},
|
||||
{ dropIfSlow: true },
|
||||
);
|
||||
}
|
||||
|
||||
function rejectWebchatSessionMutation(params: {
|
||||
action: "patch" | "delete";
|
||||
client: GatewayClient | null;
|
||||
@ -133,6 +152,12 @@ export const sessionsHandlers: GatewayRequestHandlers = {
|
||||
});
|
||||
respond(true, result, undefined);
|
||||
},
|
||||
"sessions.subscribe": ({ respond }) => {
|
||||
respond(true, { subscribed: true }, undefined);
|
||||
},
|
||||
"sessions.unsubscribe": ({ respond }) => {
|
||||
respond(true, { subscribed: false }, undefined);
|
||||
},
|
||||
"sessions.preview": ({ params, respond }) => {
|
||||
if (!assertValidParams(params, validateSessionsPreviewParams, "sessions.preview", respond)) {
|
||||
return;
|
||||
@ -251,8 +276,12 @@ export const sessionsHandlers: GatewayRequestHandlers = {
|
||||
},
|
||||
};
|
||||
respond(true, result, undefined);
|
||||
emitSessionsChanged(context.broadcast, {
|
||||
sessionKey: target.canonicalKey,
|
||||
reason: "patch",
|
||||
});
|
||||
},
|
||||
"sessions.reset": async ({ params, respond }) => {
|
||||
"sessions.reset": async ({ params, respond, context }) => {
|
||||
if (!assertValidParams(params, validateSessionsResetParams, "sessions.reset", respond)) {
|
||||
return;
|
||||
}
|
||||
@ -273,8 +302,12 @@ export const sessionsHandlers: GatewayRequestHandlers = {
|
||||
return;
|
||||
}
|
||||
respond(true, { ok: true, key: result.key, entry: result.entry }, undefined);
|
||||
emitSessionsChanged(context.broadcast, {
|
||||
sessionKey: result.key,
|
||||
reason,
|
||||
});
|
||||
},
|
||||
"sessions.delete": async ({ params, respond, client, isWebchatConnect }) => {
|
||||
"sessions.delete": async ({ params, respond, client, isWebchatConnect, context }) => {
|
||||
if (!assertValidParams(params, validateSessionsDeleteParams, "sessions.delete", respond)) {
|
||||
return;
|
||||
}
|
||||
@ -344,6 +377,12 @@ export const sessionsHandlers: GatewayRequestHandlers = {
|
||||
}
|
||||
|
||||
respond(true, { ok: true, key: target.canonicalKey, deleted, archived }, undefined);
|
||||
if (deleted) {
|
||||
emitSessionsChanged(context.broadcast, {
|
||||
sessionKey: target.canonicalKey,
|
||||
reason: "delete",
|
||||
});
|
||||
}
|
||||
},
|
||||
"sessions.get": ({ params, respond }) => {
|
||||
const p = params;
|
||||
@ -367,7 +406,7 @@ export const sessionsHandlers: GatewayRequestHandlers = {
|
||||
const messages = limit < allMessages.length ? allMessages.slice(-limit) : allMessages;
|
||||
respond(true, { messages }, undefined);
|
||||
},
|
||||
"sessions.compact": async ({ params, respond }) => {
|
||||
"sessions.compact": async ({ params, respond, context }) => {
|
||||
if (!assertValidParams(params, validateSessionsCompactParams, "sessions.compact", respond)) {
|
||||
return;
|
||||
}
|
||||
@ -468,5 +507,10 @@ export const sessionsHandlers: GatewayRequestHandlers = {
|
||||
},
|
||||
undefined,
|
||||
);
|
||||
emitSessionsChanged(context.broadcast, {
|
||||
sessionKey: target.canonicalKey,
|
||||
reason: "compact",
|
||||
compacted: true,
|
||||
});
|
||||
},
|
||||
};
|
||||
|
||||
114
ui/src/ui/app-gateway.sessions.node.test.ts
Normal file
114
ui/src/ui/app-gateway.sessions.node.test.ts
Normal file
@ -0,0 +1,114 @@
|
||||
import { describe, expect, it, vi } from "vitest";
|
||||
|
||||
const loadSessionsMock = vi.fn();
|
||||
|
||||
vi.mock("./app-chat.ts", () => ({
|
||||
CHAT_SESSIONS_ACTIVE_MINUTES: 10,
|
||||
flushChatQueueForEvent: vi.fn(),
|
||||
}));
|
||||
vi.mock("./app-settings.ts", () => ({
|
||||
applySettings: vi.fn(),
|
||||
loadCron: vi.fn(),
|
||||
refreshActiveTab: vi.fn(),
|
||||
setLastActiveSessionKey: vi.fn(),
|
||||
}));
|
||||
vi.mock("./app-tool-stream.ts", () => ({
|
||||
handleAgentEvent: vi.fn(),
|
||||
resetToolStream: vi.fn(),
|
||||
}));
|
||||
vi.mock("./controllers/agents.ts", () => ({
|
||||
loadAgents: vi.fn(),
|
||||
loadToolsCatalog: vi.fn(),
|
||||
}));
|
||||
vi.mock("./controllers/assistant-identity.ts", () => ({
|
||||
loadAssistantIdentity: vi.fn(),
|
||||
}));
|
||||
vi.mock("./controllers/chat.ts", () => ({
|
||||
loadChatHistory: vi.fn(),
|
||||
handleChatEvent: vi.fn(() => "idle"),
|
||||
}));
|
||||
vi.mock("./controllers/devices.ts", () => ({
|
||||
loadDevices: vi.fn(),
|
||||
}));
|
||||
vi.mock("./controllers/exec-approval.ts", () => ({
|
||||
addExecApproval: vi.fn(),
|
||||
parseExecApprovalRequested: vi.fn(() => null),
|
||||
parseExecApprovalResolved: vi.fn(() => null),
|
||||
removeExecApproval: vi.fn(),
|
||||
}));
|
||||
vi.mock("./controllers/nodes.ts", () => ({
|
||||
loadNodes: vi.fn(),
|
||||
}));
|
||||
vi.mock("./controllers/sessions.ts", () => ({
|
||||
loadSessions: loadSessionsMock,
|
||||
subscribeSessions: vi.fn(),
|
||||
}));
|
||||
vi.mock("./gateway.ts", () => ({
|
||||
GatewayBrowserClient: class {},
|
||||
resolveGatewayErrorDetailCode: () => null,
|
||||
}));
|
||||
|
||||
const { handleGatewayEvent } = await import("./app-gateway.ts");
|
||||
|
||||
function createHost() {
|
||||
return {
|
||||
settings: {
|
||||
gatewayUrl: "ws://127.0.0.1:18789",
|
||||
token: "",
|
||||
sessionKey: "main",
|
||||
lastActiveSessionKey: "main",
|
||||
theme: "system",
|
||||
chatFocusMode: false,
|
||||
chatShowThinking: true,
|
||||
splitRatio: 0.6,
|
||||
navCollapsed: false,
|
||||
navGroupsCollapsed: {},
|
||||
},
|
||||
password: "",
|
||||
clientInstanceId: "instance-test",
|
||||
client: null,
|
||||
connected: true,
|
||||
hello: null,
|
||||
lastError: null,
|
||||
lastErrorCode: null,
|
||||
eventLogBuffer: [],
|
||||
eventLog: [],
|
||||
tab: "overview",
|
||||
presenceEntries: [],
|
||||
presenceError: null,
|
||||
presenceStatus: null,
|
||||
agentsLoading: false,
|
||||
agentsList: null,
|
||||
agentsError: null,
|
||||
toolsCatalogLoading: false,
|
||||
toolsCatalogError: null,
|
||||
toolsCatalogResult: null,
|
||||
debugHealth: null,
|
||||
assistantName: "OpenClaw",
|
||||
assistantAvatar: null,
|
||||
assistantAgentId: null,
|
||||
serverVersion: null,
|
||||
sessionKey: "main",
|
||||
chatRunId: null,
|
||||
refreshSessionsAfterChat: new Set<string>(),
|
||||
execApprovalQueue: [],
|
||||
execApprovalError: null,
|
||||
updateAvailable: null,
|
||||
} as Parameters<typeof handleGatewayEvent>[0];
|
||||
}
|
||||
|
||||
describe("handleGatewayEvent sessions.changed", () => {
|
||||
it("reloads sessions when the gateway pushes a sessions.changed event", () => {
|
||||
loadSessionsMock.mockReset();
|
||||
const host = createHost();
|
||||
|
||||
handleGatewayEvent(host, {
|
||||
event: "sessions.changed",
|
||||
payload: { sessionKey: "agent:main:main", reason: "patch" },
|
||||
seq: 1,
|
||||
});
|
||||
|
||||
expect(loadSessionsMock).toHaveBeenCalledTimes(1);
|
||||
expect(loadSessionsMock).toHaveBeenCalledWith(host);
|
||||
});
|
||||
});
|
||||
@ -28,7 +28,7 @@ import {
|
||||
} from "./controllers/exec-approval.ts";
|
||||
import { loadHealthState } from "./controllers/health.ts";
|
||||
import { loadNodes } from "./controllers/nodes.ts";
|
||||
import { loadSessions } from "./controllers/sessions.ts";
|
||||
import { loadSessions, subscribeSessions } from "./controllers/sessions.ts";
|
||||
import {
|
||||
resolveGatewayErrorDetailCode,
|
||||
type GatewayEventFrame,
|
||||
@ -220,6 +220,7 @@ export function connectGateway(host: GatewayHost) {
|
||||
(host as unknown as { chatStream: string | null }).chatStream = null;
|
||||
(host as unknown as { chatStreamStartedAt: number | null }).chatStreamStartedAt = null;
|
||||
resetToolStream(host as unknown as Parameters<typeof resetToolStream>[0]);
|
||||
void subscribeSessions(host as unknown as OpenClawApp);
|
||||
void loadAssistantIdentity(host as unknown as OpenClawApp);
|
||||
void loadAgents(host as unknown as OpenClawApp);
|
||||
void loadHealthState(host as unknown as OpenClawApp);
|
||||
@ -368,6 +369,11 @@ function handleGatewayEventUnsafe(host: GatewayHost, evt: GatewayEventFrame) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (evt.event === "sessions.changed") {
|
||||
void loadSessions(host as unknown as OpenClawApp);
|
||||
return;
|
||||
}
|
||||
|
||||
if (evt.event === "cron" && host.tab === "cron") {
|
||||
void loadCron(host as unknown as Parameters<typeof loadCron>[0]);
|
||||
}
|
||||
|
||||
@ -1,8 +1,21 @@
|
||||
import { afterEach, describe, expect, it, vi } from "vitest";
|
||||
import { deleteSession, deleteSessionAndRefresh, type SessionsState } from "./sessions.ts";
|
||||
import {
|
||||
deleteSession,
|
||||
deleteSessionAndRefresh,
|
||||
subscribeSessions,
|
||||
type SessionsState,
|
||||
} from "./sessions.ts";
|
||||
|
||||
type RequestFn = (method: string, params?: unknown) => Promise<unknown>;
|
||||
|
||||
if (!("window" in globalThis)) {
|
||||
Object.assign(globalThis, {
|
||||
window: {
|
||||
confirm: () => false,
|
||||
},
|
||||
});
|
||||
}
|
||||
|
||||
function createState(request: RequestFn, overrides: Partial<SessionsState> = {}): SessionsState {
|
||||
return {
|
||||
client: { request } as unknown as SessionsState["client"],
|
||||
@ -22,6 +35,18 @@ afterEach(() => {
|
||||
vi.restoreAllMocks();
|
||||
});
|
||||
|
||||
describe("subscribeSessions", () => {
|
||||
it("registers for session change events", async () => {
|
||||
const request = vi.fn(async () => ({ subscribed: true }));
|
||||
const state = createState(request);
|
||||
|
||||
await subscribeSessions(state);
|
||||
|
||||
expect(request).toHaveBeenCalledWith("sessions.subscribe", {});
|
||||
expect(state.sessionsError).toBeNull();
|
||||
});
|
||||
});
|
||||
|
||||
describe("deleteSessionAndRefresh", () => {
|
||||
it("refreshes sessions after a successful delete", async () => {
|
||||
const request = vi.fn(async (method: string) => {
|
||||
|
||||
@ -14,6 +14,17 @@ export type SessionsState = {
|
||||
sessionsIncludeUnknown: boolean;
|
||||
};
|
||||
|
||||
export async function subscribeSessions(state: SessionsState) {
|
||||
if (!state.client || !state.connected) {
|
||||
return;
|
||||
}
|
||||
try {
|
||||
await state.client.request("sessions.subscribe", {});
|
||||
} catch (err) {
|
||||
state.sessionsError = String(err);
|
||||
}
|
||||
}
|
||||
|
||||
export async function loadSessions(
|
||||
state: SessionsState,
|
||||
overrides?: {
|
||||
|
||||
@ -86,6 +86,8 @@ export default defineConfig({
|
||||
"ui/src/ui/views/usage-render-details.test.ts",
|
||||
"ui/src/ui/controllers/agents.test.ts",
|
||||
"ui/src/ui/controllers/chat.test.ts",
|
||||
"ui/src/ui/controllers/sessions.test.ts",
|
||||
"ui/src/ui/app-gateway.sessions.node.test.ts",
|
||||
],
|
||||
setupFiles: ["test/setup.ts"],
|
||||
exclude: [
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user