diff --git a/src/gateway/server-http.ts b/src/gateway/server-http.ts index e9837cff1b3..7c2ade78a5e 100644 --- a/src/gateway/server-http.ts +++ b/src/gateway/server-http.ts @@ -72,6 +72,7 @@ import { } from "./server/plugins-http.js"; import type { ReadinessChecker } from "./server/readiness.js"; import type { GatewayWsClient } from "./server/ws-types.js"; +import { handleSessionHistoryHttpRequest } from "./sessions-history-http.js"; import { handleToolsInvokeHttpRequest } from "./tools-invoke-http.js"; type SubsystemLogger = ReturnType; @@ -811,6 +812,16 @@ export function createGatewayHttpServer(opts: { rateLimiter, }), }, + { + name: "sessions-history", + run: () => + handleSessionHistoryHttpRequest(req, res, { + auth: resolvedAuth, + trustedProxies, + allowRealIpFallback, + rateLimiter, + }), + }, { name: "slack", run: () => handleSlackHttpRequest(req, res), diff --git a/src/gateway/sessions-history-http.test.ts b/src/gateway/sessions-history-http.test.ts new file mode 100644 index 00000000000..c8cab95e328 --- /dev/null +++ b/src/gateway/sessions-history-http.test.ts @@ -0,0 +1,167 @@ +import fs from "node:fs/promises"; +import os from "node:os"; +import path from "node:path"; +import { afterEach, describe, expect, test } from "vitest"; +import { appendAssistantMessageToSessionTranscript } from "../config/sessions/transcript.js"; +import { testState } from "./test-helpers.mocks.js"; +import { + createGatewaySuiteHarness, + installGatewayTestHooks, + writeSessionStore, +} from "./test-helpers.server.js"; + +installGatewayTestHooks(); + +const AUTH_HEADER = { Authorization: "Bearer test-gateway-token-1234567890" }; +const cleanupDirs: string[] = []; + +afterEach(async () => { + await Promise.all( + cleanupDirs.splice(0).map((dir) => fs.rm(dir, { recursive: true, force: true })), + ); +}); + +async function createSessionStoreFile(): Promise { + const dir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-session-history-")); + cleanupDirs.push(dir); + const storePath = path.join(dir, "sessions.json"); + testState.sessionStorePath = storePath; + return storePath; +} + +async function seedSession(params?: { text?: string }) { + const storePath = await createSessionStoreFile(); + await writeSessionStore({ + entries: { + main: { + sessionId: "sess-main", + updatedAt: Date.now(), + }, + }, + storePath, + }); + if (params?.text) { + const appended = await appendAssistantMessageToSessionTranscript({ + sessionKey: "agent:main:main", + text: params.text, + storePath, + }); + expect(appended.ok).toBe(true); + } + return { storePath }; +} + +async function readSseEvent( + reader: ReadableStreamDefaultReader, + state: { buffer: string }, +): Promise<{ event: string; data: unknown }> { + const decoder = new TextDecoder(); + while (true) { + const boundary = state.buffer.indexOf("\n\n"); + if (boundary >= 0) { + const rawEvent = state.buffer.slice(0, boundary); + state.buffer = state.buffer.slice(boundary + 2); + const lines = rawEvent.split("\n"); + const event = + lines + .find((line) => line.startsWith("event:")) + ?.slice("event:".length) + .trim() ?? "message"; + const data = lines + .filter((line) => line.startsWith("data:")) + .map((line) => line.slice("data:".length).trim()) + .join("\n"); + if (!data) { + continue; + } + return { event, data: JSON.parse(data) }; + } + const chunk = await reader.read(); + if (chunk.done) { + throw new Error("SSE stream ended before next event"); + } + state.buffer += decoder.decode(chunk.value, { stream: true }); + } +} + +describe("session history HTTP endpoints", () => { + test("returns session history over direct REST", async () => { + await seedSession({ text: "hello from history" }); + + const harness = await createGatewaySuiteHarness(); + try { + const res = await fetch( + `http://127.0.0.1:${harness.port}/sessions/${encodeURIComponent("agent:main:main")}/history`, + { + headers: AUTH_HEADER, + }, + ); + + expect(res.status).toBe(200); + const body = (await res.json()) as { + sessionKey?: string; + messages?: Array<{ content?: Array<{ text?: string }> }>; + }; + expect(body.sessionKey).toBe("agent:main:main"); + expect(body.messages).toHaveLength(1); + expect(body.messages?.[0]?.content?.[0]?.text).toBe("hello from history"); + } finally { + await harness.close(); + } + }); + + test("streams session history updates over SSE", async () => { + const { storePath } = await seedSession({ text: "first message" }); + + const harness = await createGatewaySuiteHarness(); + try { + const res = await fetch( + `http://127.0.0.1:${harness.port}/sessions/${encodeURIComponent("agent:main:main")}/history`, + { + headers: { + ...AUTH_HEADER, + Accept: "text/event-stream", + }, + }, + ); + + expect(res.status).toBe(200); + expect(res.headers.get("content-type") ?? "").toContain("text/event-stream"); + const reader = res.body?.getReader(); + expect(reader).toBeTruthy(); + const streamState = { buffer: "" }; + const historyEvent = await readSseEvent(reader!, streamState); + expect(historyEvent.event).toBe("history"); + expect( + (historyEvent.data as { messages?: Array<{ content?: Array<{ text?: string }> }> }) + .messages?.[0]?.content?.[0]?.text, + ).toBe("first message"); + + const appended = await appendAssistantMessageToSessionTranscript({ + sessionKey: "agent:main:main", + text: "second message", + storePath, + }); + expect(appended.ok).toBe(true); + + const messageEvent = await readSseEvent(reader!, streamState); + expect(messageEvent.event).toBe("message"); + expect( + ( + messageEvent.data as { + sessionKey?: string; + message?: { content?: Array<{ text?: string }> }; + } + ).sessionKey, + ).toBe("agent:main:main"); + expect( + (messageEvent.data as { message?: { content?: Array<{ text?: string }> } }).message + ?.content?.[0]?.text, + ).toBe("second message"); + + await reader?.cancel(); + } finally { + await harness.close(); + } + }); +}); diff --git a/src/gateway/sessions-history-http.ts b/src/gateway/sessions-history-http.ts new file mode 100644 index 00000000000..44e6494dbce --- /dev/null +++ b/src/gateway/sessions-history-http.ts @@ -0,0 +1,204 @@ +import fs from "node:fs"; +import type { IncomingMessage, ServerResponse } from "node:http"; +import path from "node:path"; +import { loadConfig } from "../config/config.js"; +import { loadSessionStore } from "../config/sessions.js"; +import { onSessionTranscriptUpdate } from "../sessions/transcript-events.js"; +import type { AuthRateLimiter } from "./auth-rate-limit.js"; +import { authorizeHttpGatewayConnect, type ResolvedGatewayAuth } from "./auth.js"; +import { + sendGatewayAuthFailure, + sendInvalidRequest, + sendJson, + sendMethodNotAllowed, + setSseHeaders, +} from "./http-common.js"; +import { getBearerToken, getHeader } from "./http-utils.js"; +import { + readSessionMessages, + resolveGatewaySessionStoreTarget, + resolveSessionTranscriptCandidates, +} from "./session-utils.js"; + +function resolveSessionHistoryPath(req: IncomingMessage): string | null { + const url = new URL(req.url ?? "/", `http://${req.headers.host ?? "localhost"}`); + const match = url.pathname.match(/^\/sessions\/([^/]+)\/history$/); + if (!match) { + return null; + } + try { + return decodeURIComponent(match[1] ?? "").trim() || null; + } catch { + return ""; + } +} + +function shouldStreamSse(req: IncomingMessage): boolean { + const accept = getHeader(req, "accept")?.toLowerCase() ?? ""; + return accept.includes("text/event-stream"); +} + +function resolveLimit(req: IncomingMessage): number | undefined { + const url = new URL(req.url ?? "/", `http://${req.headers.host ?? "localhost"}`); + const raw = url.searchParams.get("limit"); + if (raw == null || raw.trim() === "") { + return undefined; + } + const value = Number.parseInt(raw, 10); + if (!Number.isFinite(value) || value < 1) { + return 1; + } + return Math.max(1, value); +} + +function maybeLimitMessages(messages: unknown[], limit: number | undefined): unknown[] { + if (limit === undefined || limit >= messages.length) { + return messages; + } + return messages.slice(-limit); +} + +function canonicalizePath(value: string | undefined): string | undefined { + const trimmed = value?.trim(); + if (!trimmed) { + return undefined; + } + const resolved = path.resolve(trimmed); + try { + return fs.realpathSync(resolved); + } catch { + return resolved; + } +} + +function sseWrite(res: ServerResponse, event: string, payload: unknown): void { + res.write(`event: ${event}\n`); + res.write(`data: ${JSON.stringify(payload)}\n\n`); +} + +export async function handleSessionHistoryHttpRequest( + req: IncomingMessage, + res: ServerResponse, + opts: { + auth: ResolvedGatewayAuth; + trustedProxies?: string[]; + allowRealIpFallback?: boolean; + rateLimiter?: AuthRateLimiter; + }, +): Promise { + const sessionKey = resolveSessionHistoryPath(req); + if (sessionKey === null) { + return false; + } + if (!sessionKey) { + sendInvalidRequest(res, "invalid session key"); + return true; + } + if (req.method !== "GET") { + sendMethodNotAllowed(res, "GET"); + return true; + } + + const cfg = loadConfig(); + const token = getBearerToken(req); + const authResult = await authorizeHttpGatewayConnect({ + auth: opts.auth, + connectAuth: token ? { token, password: token } : null, + req, + trustedProxies: opts.trustedProxies ?? cfg.gateway?.trustedProxies, + allowRealIpFallback: opts.allowRealIpFallback ?? cfg.gateway?.allowRealIpFallback, + rateLimiter: opts.rateLimiter, + }); + if (!authResult.ok) { + sendGatewayAuthFailure(res, authResult); + return true; + } + + const target = resolveGatewaySessionStoreTarget({ cfg, key: sessionKey }); + const store = loadSessionStore(target.storePath); + const entry = target.storeKeys.map((key) => store[key]).find(Boolean); + const limit = resolveLimit(req); + const messages = entry?.sessionId + ? maybeLimitMessages( + readSessionMessages(entry.sessionId, target.storePath, entry.sessionFile), + limit, + ) + : []; + + if (!shouldStreamSse(req)) { + sendJson(res, 200, { + sessionKey: target.canonicalKey, + messages, + }); + return true; + } + + const transcriptCandidates = entry?.sessionId + ? new Set( + resolveSessionTranscriptCandidates( + entry.sessionId, + target.storePath, + entry.sessionFile, + target.agentId, + ) + .map((candidate) => canonicalizePath(candidate)) + .filter((candidate): candidate is string => typeof candidate === "string"), + ) + : new Set(); + + let sentMessages = messages; + setSseHeaders(res); + res.write("retry: 1000\n\n"); + sseWrite(res, "history", { + sessionKey: target.canonicalKey, + messages: sentMessages, + }); + + const heartbeat = setInterval(() => { + if (!res.writableEnded) { + res.write(": keepalive\n\n"); + } + }, 15_000); + + const unsubscribe = onSessionTranscriptUpdate((update) => { + if (res.writableEnded || !entry?.sessionId) { + return; + } + const updatePath = canonicalizePath(update.sessionFile); + if (!updatePath || !transcriptCandidates.has(updatePath)) { + return; + } + const nextMessages = maybeLimitMessages( + readSessionMessages(entry.sessionId, target.storePath, entry.sessionFile), + limit, + ); + if (nextMessages.length < sentMessages.length) { + sentMessages = nextMessages; + sseWrite(res, "history", { + sessionKey: target.canonicalKey, + messages: sentMessages, + }); + return; + } + if (nextMessages.length === sentMessages.length) { + return; + } + const appended = nextMessages.slice(sentMessages.length); + sentMessages = nextMessages; + for (const message of appended) { + sseWrite(res, "message", { + sessionKey: target.canonicalKey, + message, + }); + } + }); + + const cleanup = () => { + clearInterval(heartbeat); + unsubscribe(); + }; + req.on("close", cleanup); + res.on("close", cleanup); + res.on("finish", cleanup); + return true; +}