👌 IMPROVE: chain of thought and live stream

This commit is contained in:
kumarabhirup 2026-02-12 13:37:40 -08:00
parent 5329d265f3
commit c2a302b582
No known key found for this signature in database
GPG Key ID: DB7CA2289CAB0167
18 changed files with 3680 additions and 525 deletions

View File

@ -0,0 +1,214 @@
---
name: Web Cron Dashboard
overview: Add a "Cron" virtual folder to the web app sidebar that reads `~/.openclaw/cron/` data, showing all cron jobs, their heartbeat/scheduling status, run history, and full session transcripts with thinking for each run.
todos:
- id: types
content: Create shared cron types in apps/web/app/types/cron.ts
status: pending
- id: api-jobs
content: Create GET /api/cron/jobs route (reads jobs.json + heartbeat info)
status: pending
- id: api-runs
content: Create GET /api/cron/jobs/[jobId]/runs route (reads run log JSONL)
status: pending
- id: api-session
content: Create GET /api/cron/runs/[sessionId] route (reads session transcript JSONL with full thinking/tools)
status: pending
- id: cron-dashboard
content: Build CronDashboard component (heartbeat status, job list, timeline)
status: pending
- id: cron-job-detail
content: Build CronJobDetail component (job config, next run countdown, run history)
status: pending
- id: cron-run-chat
content: Build CronRunChat component (full session transcript with thinking/tools, reusing ChainOfThought patterns)
status: pending
- id: sidebar
content: Add ~cron virtual folder to workspace sidebar in page.tsx (fetch + tree + handleNodeSelect)
status: pending
- id: content-routing
content: Extend content state and main area renderer in workspace/page.tsx for cron-dashboard and cron-job kinds
status: pending
isProject: false
---
# Web Cron Dashboard
## Architecture
The cron system stores its data at:
- **Job definitions**: `~/.openclaw/cron/jobs.json` (`CronStoreFile` with `CronJob[]`)
- **Run logs per job**: `~/.openclaw/cron/runs/{jobId}.jsonl` (`CronRunLogEntry` per line)
- **Run session transcripts**: `~/.openclaw/agents/{agentId}/sessions/{sessionId}.jsonl`
The web app does NOT connect to the gateway directly -- it reads files from disk via Next.js API routes (same pattern as existing `/api/sessions`, `/api/web-sessions`). No gateway WebSocket integration needed.
```mermaid
flowchart LR
subgraph disk [Disk Storage]
JobsJSON["~/.openclaw/cron/jobs.json"]
RunLogs["~/.openclaw/cron/runs/{jobId}.jsonl"]
Sessions["~/.openclaw/agents/{agentId}/sessions/{sessionId}.jsonl"]
end
subgraph api [Next.js API Routes]
CronJobsAPI["/api/cron/jobs"]
CronRunsAPI["/api/cron/jobs/{jobId}/runs"]
CronRunSessionAPI["/api/cron/runs/{sessionId}"]
end
subgraph ui [UI Components]
Sidebar["~cron virtual folder"]
Dashboard["CronDashboard"]
JobDetail["CronJobDetail"]
RunChat["CronRunChat"]
end
JobsJSON --> CronJobsAPI --> Dashboard
RunLogs --> CronRunsAPI --> JobDetail
Sessions --> CronRunSessionAPI --> RunChat
Dashboard --> Sidebar
JobDetail --> Sidebar
RunChat --> Sidebar
```
## 1. API Routes
### `GET /api/cron/jobs` -- [apps/web/app/api/cron/jobs/route.ts](apps/web/app/api/cron/jobs/route.ts)
Read `~/.openclaw/cron/jobs.json` and return all jobs with computed status fields. Also scan `~/.openclaw/cron/runs/` to get latest run info. Include heartbeat info by reading agent config from `~/.openclaw/config.yaml` (heartbeat interval, next due estimate).
Returns:
```typescript
{
jobs: CronJob[],
heartbeat: {
intervalMs: number,
nextDueEstimateMs: number | null
},
cronStatus: {
enabled: boolean,
nextWakeAtMs: number | null
}
}
```
### `GET /api/cron/jobs/[jobId]/runs` -- [apps/web/app/api/cron/jobs/[jobId]/runs/route.ts](apps/web/app/api/cron/jobs/[jobId]/runs/route.ts)
Read `~/.openclaw/cron/runs/{jobId}.jsonl` using the same logic as `readCronRunLogEntries` in [src/cron/run-log.ts](src/cron/run-log.ts). Return entries with `?limit=N` support.
Returns:
```typescript
{ entries: CronRunLogEntry[] }
```
### `GET /api/cron/runs/[sessionId]` -- [apps/web/app/api/cron/runs/[sessionId]/route.ts](apps/web/app/api/cron/runs/[sessionId]/route.ts)
Find and read `~/.openclaw/agents/*/sessions/{sessionId}.jsonl` (same search pattern as existing [apps/web/app/api/sessions/[sessionId]/route.ts](apps/web/app/api/sessions/[sessionId]/route.ts)). Parse JSONL into messages including `thinking` blocks, tool calls, and text -- return the full conversation with all parts preserved (not truncated like the existing session API).
Returns:
```typescript
{
sessionId: string,
messages: Array<{
id: string,
role: "user" | "assistant",
parts: Array<
| { type: "text", text: string }
| { type: "thinking", thinking: string }
| { type: "tool-call", toolName: string, args: unknown, output?: string }
>,
timestamp: string
}>
}
```
## 2. Sidebar Integration
In [apps/web/app/workspace/page.tsx](apps/web/app/workspace/page.tsx), add a `~cron` virtual folder alongside the existing `~chats` folder:
- Fetch cron jobs from `/api/cron/jobs` on mount (same pattern as `fetchSessions`).
- Build a `~cron` `TreeNode` folder with children:
- Each job becomes a folder: `~cron/{jobId}` (name = job name, with status icon in the name string)
- Clicking `~cron` opens the cron dashboard view
- Clicking `~cron/{jobId}` opens the job detail view
- Handle `~cron` paths in `handleNodeSelect` (same pattern as `~chats`).
```typescript
const cronFolder: TreeNode = {
name: "Cron",
path: "~cron",
type: "folder",
virtual: true,
children: cronJobs.map((j) => ({
name: `${statusEmoji(j)} ${j.name}`,
path: `~cron/${j.id}`,
type: "folder",
virtual: true,
})),
};
return [...tree, chatsFolder, cronFolder];
```
## 3. UI Components
### CronDashboard ([apps/web/app/components/cron/cron-dashboard.tsx](apps/web/app/components/cron/cron-dashboard.tsx))
Shown when user clicks `~cron` in sidebar. Displays:
- **Heartbeat status card**: interval (e.g. "every 30m"), estimated next heartbeat as a live countdown, explanation of how heartbeat decides which cron jobs to run
- **Cron scheduler status**: enabled/disabled, next wake time as countdown
- **Jobs table/list**: all jobs with columns:
- Name, Schedule (human-readable), Status (running/ok/error/disabled/idle), Next run (countdown), Last run (time-ago + duration), Session target (main/isolated)
- Click a job row to navigate to its detail view
- **Timeline view**: visual timeline showing upcoming scheduled runs across all jobs (next 24h)
### CronJobDetail ([apps/web/app/components/cron/cron-job-detail.tsx](apps/web/app/components/cron/cron-job-detail.tsx))
Shown when user clicks `~cron/{jobId}`. Displays:
- **Job config header**: name, description, schedule (formatted), enabled status, session target, wake mode, payload summary
- **Next run countdown**: large countdown timer to next execution
- **Run history list**: each `CronRunLogEntry` as a card with:
- Timestamp, status badge (ok/error/skipped), duration, summary text
- Click to expand into full run chat (loads session transcript inline or navigates to run view)
- **Error details**: if `consecutiveErrors > 0`, show error streak and last error message
### CronRunChat ([apps/web/app/components/cron/cron-run-chat.tsx](apps/web/app/components/cron/cron-run-chat.tsx))
Displayed inline when expanding a run in `CronJobDetail`. Renders the full session transcript:
- Reuse `ChatMessage` and `ChainOfThought` components from [apps/web/app/components/chat-message.tsx](apps/web/app/components/chat-message.tsx) and [apps/web/app/components/chain-of-thought.tsx](apps/web/app/components/chain-of-thought.tsx) (or at minimum their rendering patterns)
- Show all thinking/reasoning blocks fully expanded (not collapsed, since these are historical runs)
- Show all tool calls with their inputs and outputs
- Show the system event / prompt that triggered the run
- Display run metadata at the top: model used, tokens consumed, duration
## 4. Content Routing in Workspace Page
Extend the `content` state in [apps/web/app/workspace/page.tsx](apps/web/app/workspace/page.tsx) with new kinds:
```typescript
type ContentState =
| { kind: "none" }
| { kind: "loading" }
| { kind: "document"; ... }
| { kind: "object"; ... }
| { kind: "cron-dashboard"; jobs: CronJob[]; heartbeat: HeartbeatInfo }
| { kind: "cron-job"; jobId: string; job: CronJob; runs: CronRunLogEntry[] }
```
In the main content area renderer, add cases for `cron-dashboard` and `cron-job` that render the corresponding components.
## 5. Data Types (shared)
Create [apps/web/app/types/cron.ts](apps/web/app/types/cron.ts) with client-side type definitions mirroring `CronJob`, `CronRunLogEntry`, `CronSchedule`, `CronJobState` etc. (copy from [src/cron/types.ts](src/cron/types.ts) and [src/cron/run-log.ts](src/cron/run-log.ts), stripped of server-only imports).
## Key Design Decisions
- **Read-only**: The cron UI is read-only (view jobs, runs, transcripts). No add/edit/delete/enable/disable from the web UI -- that stays in the CLI.
- **No gateway connection**: All data read directly from disk via API routes, consistent with the existing web app pattern.
- **Reuse existing components**: Chat rendering reuses `ChatMessage`/`ChainOfThought` patterns rather than building from scratch.
- **Live countdowns**: Use `setInterval` for countdown timers (next heartbeat, next cron run) with periodic refetch of job data to stay current.
- **Auto-refresh**: Poll `/api/cron/jobs` every ~30s to catch state changes from the running gateway.

View File

@ -1,6 +1,12 @@
import type { UIMessage } from "ai";
import { runAgent, type ToolResult } from "@/lib/agent-runner";
import { resolveAgentWorkspacePrefix } from "@/lib/workspace";
import {
startRun,
hasActiveRun,
subscribeToRun,
persistUserMessage,
type SseEvent,
} from "@/lib/active-runs";
// Force Node.js runtime (required for child_process)
export const runtime = "nodejs";
@ -8,43 +14,19 @@ export const runtime = "nodejs";
// Allow streaming responses up to 10 minutes
export const maxDuration = 600;
/**
* Build a flat output object from the agent's tool result so the frontend
* can render tool output text, exit codes, etc.
*/
function buildToolOutput(
result?: ToolResult,
): Record<string, unknown> {
if (!result) {return {};}
const out: Record<string, unknown> = {};
if (result.text) {out.text = result.text;}
if (result.details) {
// Forward useful details (exit code, duration, status, cwd)
for (const key of [
"exitCode",
"status",
"durationMs",
"cwd",
"error",
"reason",
]) {
if (result.details[key] !== undefined)
{out[key] = result.details[key];}
}
}
return out;
}
export async function POST(req: Request) {
const { messages, sessionId }: { messages: UIMessage[]; sessionId?: string } =
await req.json();
const {
messages,
sessionId,
}: { messages: UIMessage[]; sessionId?: string } = await req.json();
// Extract the latest user message text
const lastUserMessage = messages.filter((m) => m.role === "user").pop();
const userText =
lastUserMessage?.parts
?.filter(
(p): p is { type: "text"; text: string } => p.type === "text",
(p): p is { type: "text"; text: string } =>
p.type === "text",
)
.map((p) => p.text)
.join("\n") ?? "";
@ -53,9 +35,12 @@ export async function POST(req: Request) {
return new Response("No message provided", { status: 400 });
}
// Reject if a run is already active for this session.
if (sessionId && hasActiveRun(sessionId)) {
return new Response("Active run in progress", { status: 409 });
}
// Resolve workspace file paths to be agent-cwd-relative.
// Tree paths are workspace-root-relative (e.g. "knowledge/leads/foo.md"),
// but the agent runs from the repo root and needs "dench/knowledge/leads/foo.md".
let agentMessage = userText;
const wsPrefix = resolveAgentWorkspacePrefix();
if (wsPrefix) {
@ -65,295 +50,84 @@ export async function POST(req: Request) {
);
}
// Create a custom SSE stream using the AI SDK v6 data stream wire format.
// DefaultChatTransport parses these events into UIMessage parts automatically.
// Persist the user message server-side so it survives a page reload
// even if the client never gets a chance to save.
if (sessionId && lastUserMessage) {
persistUserMessage(sessionId, {
id: lastUserMessage.id,
content: userText,
parts: lastUserMessage.parts as unknown[],
});
}
// Start the agent run (decoupled from this HTTP connection).
// The child process will keep running even if this response is cancelled.
if (sessionId) {
try {
startRun({
sessionId,
message: agentMessage,
agentSessionId: sessionId,
});
} catch (err) {
return new Response(
err instanceof Error ? err.message : String(err),
{ status: 500 },
);
}
}
// Stream SSE events to the client using the AI SDK v6 wire format.
const encoder = new TextEncoder();
let closed = false;
const abortController = new AbortController();
let unsubscribe: (() => void) | null = null;
const stream = new ReadableStream({
async start(controller) {
// Use incrementing IDs so multi-round reasoning/text cycles get
// unique part IDs (avoids conflicts in the AI SDK transport).
let idCounter = 0;
const nextId = (prefix: string) =>
`${prefix}-${Date.now()}-${++idCounter}`;
start(controller) {
if (!sessionId) {
// No session — shouldn't happen but close gracefully.
controller.close();
return;
}
let currentTextId = "";
let currentReasoningId = "";
let textStarted = false;
let reasoningStarted = false;
// Track whether ANY text was ever sent across the full run.
// onLifecycleEnd closes the text part (textStarted→false), so
// onClose can't rely on textStarted alone to detect "no output".
let everSentText = false;
// Track whether the status reasoning block is the one currently open
// so we can close it cleanly when real content arrives.
let statusReasoningActive = false;
/** Write an SSE event; silently no-ops if the stream was already cancelled. */
const writeEvent = (data: unknown) => {
if (closed) {return;}
const json = JSON.stringify(data);
controller.enqueue(encoder.encode(`data: ${json}\n\n`));
};
/** Close the reasoning part if open. */
const closeReasoning = () => {
if (reasoningStarted) {
writeEvent({
type: "reasoning-end",
id: currentReasoningId,
});
reasoningStarted = false;
statusReasoningActive = false;
}
};
/** Close the text part if open. */
const closeText = () => {
if (textStarted) {
writeEvent({ type: "text-end", id: currentTextId });
textStarted = false;
}
};
/** Open a status reasoning block (auto-closes any existing one). */
const openStatusReasoning = (label: string) => {
closeReasoning();
closeText();
currentReasoningId = nextId("status");
writeEvent({
type: "reasoning-start",
id: currentReasoningId,
});
writeEvent({
type: "reasoning-delta",
id: currentReasoningId,
delta: label,
});
reasoningStarted = true;
statusReasoningActive = true;
};
try {
await runAgent(agentMessage, abortController.signal, {
onLifecycleStart: () => {
// Show immediate feedback — the agent has started working.
// This eliminates the "Streaming... (silence)" gap.
openStatusReasoning("Preparing response...");
},
onThinkingDelta: (delta) => {
// Close the status block if it's still the active one;
// real reasoning content is now arriving.
if (statusReasoningActive) {
closeReasoning();
unsubscribe = subscribeToRun(
sessionId,
(event: SseEvent | null) => {
if (closed) {return;}
if (event === null) {
// Run completed — close the SSE stream.
closed = true;
try {
controller.close();
} catch {
/* already closed */
}
if (!reasoningStarted) {
currentReasoningId = nextId("reasoning");
writeEvent({
type: "reasoning-start",
id: currentReasoningId,
});
reasoningStarted = true;
}
writeEvent({
type: "reasoning-delta",
id: currentReasoningId,
delta,
});
},
return;
}
try {
const json = JSON.stringify(event);
controller.enqueue(
encoder.encode(`data: ${json}\n\n`),
);
} catch {
/* ignore enqueue errors on closed stream */
}
},
// Don't replay — we just created the run, the buffer is empty.
{ replay: false },
);
onTextDelta: (delta) => {
// Close reasoning once text starts streaming
closeReasoning();
if (!textStarted) {
currentTextId = nextId("text");
writeEvent({
type: "text-start",
id: currentTextId,
});
textStarted = true;
}
everSentText = true;
writeEvent({
type: "text-delta",
id: currentTextId,
delta,
});
},
onToolStart: (toolCallId, toolName, args) => {
// Close open reasoning/text parts before tool events
closeReasoning();
closeText();
writeEvent({
type: "tool-input-start",
toolCallId,
toolName,
});
// Include actual tool arguments so the frontend can
// display what the tool is doing (command, path, etc.)
writeEvent({
type: "tool-input-available",
toolCallId,
toolName,
input: args ?? {},
});
},
onToolEnd: (
toolCallId,
_toolName,
isError,
result,
) => {
if (isError) {
const errorText =
result?.text ||
(result?.details?.error as
| string
| undefined) ||
"Tool execution failed";
writeEvent({
type: "tool-output-error",
toolCallId,
errorText,
});
} else {
// Include the actual tool output (text, exit code, etc.)
writeEvent({
type: "tool-output-available",
toolCallId,
output: buildToolOutput(result),
});
}
},
onCompactionStart: () => {
// Show compaction status while the gateway is
// optimizing the session context (can take 10-30s).
openStatusReasoning("Optimizing session context...");
},
onCompactionEnd: (willRetry) => {
// Close the compaction status block. If the gateway
// will retry the prompt, leave the reasoning area open
// so the next status/thinking block follows smoothly.
if (statusReasoningActive) {
if (willRetry) {
// Append a note, keep block open for retry
writeEvent({
type: "reasoning-delta",
id: currentReasoningId,
delta: "\nRetrying with compacted context...",
});
} else {
closeReasoning();
}
}
},
onLifecycleEnd: () => {
closeReasoning();
closeText();
},
onAgentError: (message) => {
// Surface agent-level errors (API 402, rate limits, etc.)
// as visible text in the chat so the user sees what happened.
closeReasoning();
closeText();
currentTextId = nextId("text");
writeEvent({
type: "text-start",
id: currentTextId,
});
writeEvent({
type: "text-delta",
id: currentTextId,
delta: `[error] ${message}`,
});
writeEvent({
type: "text-end",
id: currentTextId,
});
textStarted = false;
everSentText = true;
},
onError: (err) => {
console.error("[chat] Agent error:", err);
closeReasoning();
closeText();
currentTextId = nextId("text");
writeEvent({
type: "text-start",
id: currentTextId,
});
textStarted = true;
everSentText = true;
writeEvent({
type: "text-delta",
id: currentTextId,
delta: `[error] Failed to start agent: ${err.message}`,
});
writeEvent({ type: "text-end", id: currentTextId });
textStarted = false;
},
onClose: (_code) => {
closeReasoning();
if (!everSentText) {
// No text was ever sent during the entire run
currentTextId = nextId("text");
writeEvent({
type: "text-start",
id: currentTextId,
});
const msg =
_code !== null && _code !== 0
? `[error] Agent exited with code ${_code}. Check server logs for details.`
: "[error] No response from agent.";
writeEvent({
type: "text-delta",
id: currentTextId,
delta: msg,
});
writeEvent({
type: "text-end",
id: currentTextId,
});
} else {
// Ensure any still-open text part is closed
closeText();
}
},
}, sessionId ? { sessionId } : undefined);
} catch (error) {
console.error("[chat] Stream error:", error);
writeEvent({
type: "error",
errorText:
error instanceof Error
? error.message
: String(error),
});
} finally {
if (!closed) {
closed = true;
controller.close();
}
if (!unsubscribe) {
// Race: run was cleaned up between startRun and subscribe.
closed = true;
controller.close();
}
},
cancel() {
// Client disconnected (e.g. user hit stop) — tear down gracefully.
// Client disconnected — unsubscribe but keep the run alive.
// The ActiveRunManager continues buffering + persisting in the background.
closed = true;
abortController.abort();
unsubscribe?.();
},
});

View File

@ -0,0 +1,22 @@
/**
* POST /api/chat/stop
*
* Abort an active agent run. Called by the Stop button.
* The child process is sent SIGTERM and the run transitions to "error" state.
*/
import { abortRun } from "@/lib/active-runs";
export const runtime = "nodejs";
export async function POST(req: Request) {
const body: { sessionId?: string } = await req
.json()
.catch(() => ({}));
if (!body.sessionId) {
return new Response("sessionId required", { status: 400 });
}
const aborted = abortRun(body.sessionId);
return Response.json({ aborted });
}

View File

@ -0,0 +1,87 @@
/**
* GET /api/chat/stream?sessionId=xxx
*
* Reconnect to an active (or recently-completed) agent run.
* Replays all buffered SSE events from the start of the run, then
* streams live events until the run finishes.
*
* Returns 404 if no run exists for the given session.
*/
import {
getActiveRun,
subscribeToRun,
type SseEvent,
} from "@/lib/active-runs";
export const runtime = "nodejs";
export const maxDuration = 600;
export async function GET(req: Request) {
const url = new URL(req.url);
const sessionId = url.searchParams.get("sessionId");
if (!sessionId) {
return new Response("sessionId required", { status: 400 });
}
const run = getActiveRun(sessionId);
if (!run) {
return Response.json({ active: false }, { status: 404 });
}
const encoder = new TextEncoder();
let closed = false;
let unsubscribe: (() => void) | null = null;
const stream = new ReadableStream({
start(controller) {
// subscribeToRun with replay=true replays the full event buffer
// synchronously, then subscribes for live events.
unsubscribe = subscribeToRun(
sessionId,
(event: SseEvent | null) => {
if (closed) {return;}
if (event === null) {
// Run completed — close the SSE stream.
closed = true;
try {
controller.close();
} catch {
/* already closed */
}
return;
}
try {
const json = JSON.stringify(event);
controller.enqueue(
encoder.encode(`data: ${json}\n\n`),
);
} catch {
/* ignore enqueue errors on closed stream */
}
},
{ replay: true },
);
if (!unsubscribe) {
// Run was cleaned up between getActiveRun and subscribe.
closed = true;
controller.close();
}
},
cancel() {
// Client disconnected — unsubscribe only (don't kill the run).
closed = true;
unsubscribe?.();
},
});
return new Response(stream, {
headers: {
"Content-Type": "text/event-stream",
"Cache-Control": "no-cache, no-transform",
Connection: "keep-alive",
"X-Run-Active": run.status === "running" ? "true" : "false",
},
});
}

View File

@ -0,0 +1,86 @@
import { readFileSync, existsSync } from "node:fs";
import { join } from "node:path";
import { homedir } from "node:os";
export const dynamic = "force-dynamic";
const CRON_DIR = join(homedir(), ".openclaw", "cron");
type CronRunLogEntry = {
ts: number;
jobId: string;
action: "finished";
status?: string;
error?: string;
summary?: string;
sessionId?: string;
sessionKey?: string;
runAtMs?: number;
durationMs?: number;
nextRunAtMs?: number;
};
/** Read run log entries from a JSONL file, returning most recent first (then reversed). */
function readRunLog(filePath: string, limit: number): CronRunLogEntry[] {
if (!existsSync(filePath)) {return [];}
try {
const raw = readFileSync(filePath, "utf-8");
if (!raw.trim()) {return [];}
const lines = raw.split("\n");
const parsed: CronRunLogEntry[] = [];
for (let i = lines.length - 1; i >= 0 && parsed.length < limit; i--) {
const line = lines[i]?.trim();
if (!line) {continue;}
try {
const obj = JSON.parse(line) as Partial<CronRunLogEntry>;
if (!obj || typeof obj !== "object") {continue;}
if (obj.action !== "finished") {continue;}
if (typeof obj.jobId !== "string" || !obj.jobId.trim()) {continue;}
if (typeof obj.ts !== "number" || !Number.isFinite(obj.ts)) {continue;}
const entry: CronRunLogEntry = {
ts: obj.ts,
jobId: obj.jobId,
action: "finished",
status: obj.status,
error: obj.error,
summary: obj.summary,
runAtMs: obj.runAtMs,
durationMs: obj.durationMs,
nextRunAtMs: obj.nextRunAtMs,
};
if (typeof obj.sessionId === "string" && obj.sessionId.trim()) {
entry.sessionId = obj.sessionId;
}
if (typeof obj.sessionKey === "string" && obj.sessionKey.trim()) {
entry.sessionKey = obj.sessionKey;
}
parsed.push(entry);
} catch {
// skip malformed lines
}
}
return parsed.toReversed();
} catch {
return [];
}
}
/** GET /api/cron/jobs/[jobId]/runs -- list run log entries for a cron job */
export async function GET(
request: Request,
{ params }: { params: Promise<{ jobId: string }> },
) {
const { jobId } = await params;
if (!jobId) {
return Response.json({ error: "Job ID required" }, { status: 400 });
}
const url = new URL(request.url);
const limitParam = url.searchParams.get("limit");
const limit = Math.max(1, Math.min(500, Number(limitParam) || 100));
const logPath = join(CRON_DIR, "runs", `${jobId}.jsonl`);
const entries = readRunLog(logPath, limit);
return Response.json({ entries });
}

View File

@ -0,0 +1,99 @@
import { readFileSync, existsSync, readdirSync } from "node:fs";
import { join } from "node:path";
import { homedir } from "node:os";
export const dynamic = "force-dynamic";
const CRON_DIR = join(homedir(), ".openclaw", "cron");
const JOBS_FILE = join(CRON_DIR, "jobs.json");
type CronStoreFile = {
version: 1;
jobs: Array<Record<string, unknown>>;
};
/** Read cron jobs.json, returning empty array if missing or invalid. */
function readJobsFile(): Array<Record<string, unknown>> {
if (!existsSync(JOBS_FILE)) {return [];}
try {
const raw = readFileSync(JOBS_FILE, "utf-8");
const parsed = JSON.parse(raw) as CronStoreFile;
if (parsed && Array.isArray(parsed.jobs)) {return parsed.jobs;}
return [];
} catch {
return [];
}
}
/** Compute next wake time from job states (minimum nextRunAtMs among enabled jobs). */
function computeNextWakeAtMs(jobs: Array<Record<string, unknown>>): number | null {
let min: number | null = null;
for (const job of jobs) {
if (job.enabled !== true) {continue;}
const state = job.state as Record<string, unknown> | undefined;
if (!state) {continue;}
const next = state.nextRunAtMs;
if (typeof next === "number" && Number.isFinite(next)) {
if (min === null || next < min) {min = next;}
}
}
return min;
}
/** Read heartbeat config from ~/.openclaw/config.yaml (best-effort). */
function readHeartbeatInfo(): { intervalMs: number; nextDueEstimateMs: number | null } {
const defaults = { intervalMs: 30 * 60_000, nextDueEstimateMs: null as number | null };
// Try to read agent session stores to estimate next heartbeat from lastRunMs
try {
const agentsDir = join(homedir(), ".openclaw", "agents");
if (!existsSync(agentsDir)) {return defaults;}
const agentDirs = readdirSync(agentsDir, { withFileTypes: true });
let latestHeartbeat: number | null = null;
for (const d of agentDirs) {
if (!d.isDirectory()) {continue;}
const storePath = join(agentsDir, d.name, "sessions", "sessions.json");
if (!existsSync(storePath)) {continue;}
try {
const raw = readFileSync(storePath, "utf-8");
const store = JSON.parse(raw) as Record<string, { updatedAt?: number }>;
// Look for the main agent session (shortest key, most recently updated)
for (const [key, entry] of Object.entries(store)) {
if (key.startsWith("agent:") && !key.includes(":cron:") && entry.updatedAt) {
if (latestHeartbeat === null || entry.updatedAt > latestHeartbeat) {
latestHeartbeat = entry.updatedAt;
}
}
}
} catch {
// skip
}
}
if (latestHeartbeat) {
defaults.nextDueEstimateMs = latestHeartbeat + defaults.intervalMs;
}
} catch {
// ignore
}
return defaults;
}
/** GET /api/cron/jobs -- list all cron jobs with heartbeat & status info */
export async function GET() {
const jobs = readJobsFile();
const heartbeat = readHeartbeatInfo();
const nextWakeAtMs = computeNextWakeAtMs(jobs);
return Response.json({
jobs,
heartbeat,
cronStatus: {
enabled: jobs.length > 0,
nextWakeAtMs,
},
});
}

View File

@ -0,0 +1,154 @@
import { readFileSync, readdirSync, existsSync } from "node:fs";
import { join } from "node:path";
import { homedir } from "node:os";
export const dynamic = "force-dynamic";
type MessagePart =
| { type: "text"; text: string }
| { type: "thinking"; thinking: string }
| { type: "tool-call"; toolName: string; toolCallId: string; args?: unknown; output?: string };
type ParsedMessage = {
id: string;
role: "user" | "assistant" | "system";
parts: MessagePart[];
timestamp: string;
};
/** Search agent session directories for a session file by ID. */
function findSessionFile(sessionId: string): string | null {
const agentsDir = join(homedir(), ".openclaw", "agents");
if (!existsSync(agentsDir)) {return null;}
try {
const agentDirs = readdirSync(agentsDir, { withFileTypes: true });
for (const agentDir of agentDirs) {
if (!agentDir.isDirectory()) {continue;}
const sessionFile = join(agentsDir, agentDir.name, "sessions", `${sessionId}.jsonl`);
if (existsSync(sessionFile)) {return sessionFile;}
}
} catch {
// ignore
}
return null;
}
/** Parse a JSONL session transcript into structured messages with thinking and tool calls. */
function parseSessionTranscript(content: string): ParsedMessage[] {
const lines = content.trim().split("\n").filter((l) => l.trim());
const messages: ParsedMessage[] = [];
// Track tool calls for linking invocations with results
const pendingToolCalls = new Map<string, { toolName: string; args?: unknown }>();
for (const line of lines) {
try {
const entry = JSON.parse(line);
if (entry.type === "message" && entry.message) {
const msg = entry.message;
const role = msg.role as "user" | "assistant" | "system";
const parts: MessagePart[] = [];
if (Array.isArray(msg.content)) {
for (const part of msg.content) {
if (part.type === "text" && typeof part.text === "string" && part.text.trim()) {
parts.push({ type: "text", text: part.text });
} else if (part.type === "thinking" && typeof part.thinking === "string" && part.thinking.trim()) {
parts.push({ type: "thinking", thinking: part.thinking });
} else if (part.type === "tool_use" || part.type === "tool-call") {
const toolName = part.name ?? part.toolName ?? "unknown";
const toolCallId = part.id ?? part.toolCallId ?? `tool-${Date.now()}`;
pendingToolCalls.set(toolCallId, { toolName, args: part.input ?? part.args });
parts.push({
type: "tool-call",
toolName,
toolCallId,
args: part.input ?? part.args,
});
} else if (part.type === "tool_result" || part.type === "tool-result") {
const toolCallId = part.tool_use_id ?? part.toolCallId ?? "";
const pending = pendingToolCalls.get(toolCallId);
const outputText = typeof part.content === "string"
? part.content
: Array.isArray(part.content)
? part.content.filter((c: { type: string }) => c.type === "text").map((c: { text: string }) => c.text).join("\n")
: typeof part.output === "string"
? part.output
: JSON.stringify(part.output ?? part.content ?? "");
if (pending) {
// Merge output into existing tool-call part
const existingMsg = messages[messages.length - 1];
if (existingMsg) {
const tc = existingMsg.parts.find(
(p) => p.type === "tool-call" && (p as { toolCallId: string }).toolCallId === toolCallId,
);
if (tc && tc.type === "tool-call") {
(tc as { output?: string }).output = outputText.slice(0, 5000);
continue;
}
}
parts.push({
type: "tool-call",
toolName: pending.toolName,
toolCallId,
args: pending.args,
output: outputText.slice(0, 5000),
});
} else {
parts.push({
type: "tool-call",
toolName: "tool",
toolCallId,
output: outputText.slice(0, 5000),
});
}
}
}
} else if (typeof msg.content === "string" && msg.content.trim()) {
parts.push({ type: "text", text: msg.content });
}
if (parts.length > 0) {
messages.push({
id: entry.id ?? `msg-${messages.length}`,
role,
parts,
timestamp: entry.timestamp ?? new Date(entry.ts ?? Date.now()).toISOString(),
});
}
}
} catch {
// skip malformed lines
}
}
return messages;
}
/** GET /api/cron/runs/[sessionId] -- get full session transcript for a cron run */
export async function GET(
_request: Request,
{ params }: { params: Promise<{ sessionId: string }> },
) {
const { sessionId } = await params;
if (!sessionId) {
return Response.json({ error: "Session ID required" }, { status: 400 });
}
const sessionFile = findSessionFile(sessionId);
if (!sessionFile) {
return Response.json({ error: "Session not found" }, { status: 404 });
}
try {
const content = readFileSync(sessionFile, "utf-8");
const messages = parseSessionTranscript(content);
return Response.json({ sessionId, messages });
} catch (error) {
console.error("Error reading cron session:", error);
return Response.json({ error: "Failed to read session" }, { status: 500 });
}
}

View File

@ -1,5 +1,4 @@
import {
appendFileSync,
readFileSync,
writeFileSync,
existsSync,
@ -21,7 +20,14 @@ type IndexEntry = {
messageCount: number;
};
/** POST /api/web-sessions/[id]/messages — append messages to a session */
/**
* POST /api/web-sessions/[id]/messages append or upsert messages.
*
* Uses upsert semantics: if a message with the same `id` already exists
* in the session JSONL, it is replaced in-place. Otherwise the message
* is appended. This supports both the client's post-stream save and the
* server-side incremental persistence from the ActiveRunManager.
*/
export async function POST(
request: Request,
{ params }: { params: Promise<{ id: string }> },
@ -43,11 +49,39 @@ export async function POST(
return Response.json({ error: "messages array required" }, { status: 400 });
}
// Append each message as a JSONL line
// Read existing lines for upsert checks.
const existing = readFileSync(filePath, "utf-8");
const lines = existing.split("\n").filter((l) => l.trim());
let newCount = 0;
for (const msg of messages) {
appendFileSync(filePath, JSON.stringify(msg) + "\n");
const msgId = typeof msg.id === "string" ? msg.id : undefined;
let found = false;
if (msgId) {
for (let i = 0; i < lines.length; i++) {
try {
const parsed = JSON.parse(lines[i]);
if (parsed.id === msgId) {
// Replace the existing line in-place.
lines[i] = JSON.stringify(msg);
found = true;
break;
}
} catch {
/* keep malformed lines as-is */
}
}
}
if (!found) {
lines.push(JSON.stringify(msg));
newCount++;
}
}
writeFileSync(filePath, lines.join("\n") + "\n");
// Update index metadata
try {
if (existsSync(INDEX_FILE)) {
@ -57,8 +91,8 @@ export async function POST(
const session = index.find((s) => s.id === id);
if (session) {
session.updatedAt = Date.now();
session.messageCount += messages.length;
if (title) session.title = title;
if (newCount > 0) {session.messageCount += newCount;}
if (title) {session.title = title;}
writeFileSync(INDEX_FILE, JSON.stringify(index, null, 2));
}
}

View File

@ -178,7 +178,7 @@ function extractDomains(text: string): string[] {
}
function faviconUrl(domain: string): string {
return `https://www.google.com/s2/favicons?domain=${encodeURIComponent(domain)}&sz=32`;
return `https://www.google.com/s2/favicons?sz=64&domain_url=${encodeURIComponent(domain)}`;
}
/* ─── Classify tool steps ─── */
@ -192,7 +192,10 @@ type StepKind =
| "image"
| "generic";
function classifyTool(name: string): StepKind {
function classifyTool(
name: string,
args?: Record<string, unknown>,
): StepKind {
const n = name.toLowerCase().replace(/[_-]/g, "");
if (
[
@ -205,6 +208,18 @@ function classifyTool(name: string): StepKind {
].some((k) => n.includes(k))
)
{return "search";}
// Browser tool — classify based on the action being performed
if (n === "browser") {
const action =
typeof args?.action === "string"
? args.action.toLowerCase()
: "";
if (action === "open" || action === "navigate") {return "fetch";}
if (action === "screenshot") {return "image";}
return "fetch"; // default: most browser actions involve a page
}
if (
["fetchurl", "fetch", "browse", "browseurl", "webfetch"].some(
(k) => n.includes(k),
@ -278,7 +293,10 @@ function buildStepLabel(
}
case "fetch": {
const u =
strVal("url") ?? strVal("path") ?? strVal("src");
strVal("url") ??
strVal("targetUrl") ??
strVal("path") ??
strVal("src");
if (u) {
try {
return `Fetching ${new URL(u).hostname}`;
@ -286,6 +304,17 @@ function buildStepLabel(
return `Fetching ${u.length > 50 ? u.slice(0, 50) + "..." : u}`;
}
}
// Fallback: check output for the URL (web_fetch results include url/finalUrl)
const outUrl =
(typeof output?.finalUrl === "string" && output.finalUrl) ||
(typeof output?.url === "string" && output.url);
if (outUrl) {
try {
return `Fetched ${new URL(outUrl).hostname}`;
} catch {
return `Fetched ${outUrl.length > 50 ? outUrl.slice(0, 50) + "..." : outUrl}`;
}
}
return "Fetching page";
}
case "read": {
@ -334,10 +363,13 @@ function getSearchDomains(
if (!output) {return [];}
const text = typeof output.text === "string" ? output.text : "";
const results = output.results;
const citations = output.citations;
let combined = text;
if (Array.isArray(results)) {
for (const r of results) {
if (typeof r === "object" && r !== null) {
if (typeof r === "string") {
combined += ` ${r}`;
} else if (typeof r === "object" && r !== null) {
const obj = r as Record<string, unknown>;
if (typeof obj.url === "string")
{combined += ` ${obj.url}`;}
@ -346,9 +378,64 @@ function getSearchDomains(
}
}
}
if (Array.isArray(citations)) {
for (const c of citations) {
// Citations can be plain URL strings or objects with a url field
if (typeof c === "string") {
combined += ` ${c}`;
} else if (typeof c === "object" && c !== null) {
const obj = c as Record<string, unknown>;
if (typeof obj.url === "string")
{combined += ` ${obj.url}`;}
}
}
}
// Scan all remaining string values in the output for URLs we may have missed
for (const val of Object.values(output)) {
if (typeof val === "string" && val !== text && val.includes("http")) {
combined += ` ${val}`;
}
}
return extractDomains(combined);
}
/** Extract domain(s) from fetch/browser tool args and/or output */
function getFetchDomains(
args?: Record<string, unknown>,
output?: Record<string, unknown>,
): string[] {
const domains = new Set<string>();
// Check args for URL (web_fetch uses "url", browser tool uses "targetUrl")
for (const key of ["url", "targetUrl", "path", "src"]) {
const v = args?.[key];
if (typeof v === "string" && v.startsWith("http")) {
try {
const hostname = new URL(v).hostname;
if (hostname && !hostname.includes("localhost")) {
domains.add(hostname);
}
} catch {
/* skip */
}
}
}
// Check output for URL / finalUrl
for (const key of ["url", "finalUrl", "targetUrl"]) {
const v = output?.[key];
if (typeof v === "string" && v.startsWith("http")) {
try {
const hostname = new URL(v).hostname;
if (hostname && !hostname.includes("localhost")) {
domains.add(hostname);
}
} catch {
/* skip */
}
}
}
return [...domains].slice(0, 4);
}
/* ─── Group consecutive media reads ─── */
type ToolPart = Extract<ChainPart, { kind: "tool" }>;
@ -366,7 +453,7 @@ function groupToolSteps(tools: ToolPart[]): VisualItem[] {
let i = 0;
while (i < tools.length) {
const tool = tools[i];
const kind = classifyTool(tool.toolName);
const kind = classifyTool(tool.toolName, tool.args);
// Check both args AND output for the file path
const filePath = getFilePath(tool.args, tool.output);
const media = filePath ? detectMedia(filePath) : null;
@ -379,7 +466,7 @@ function groupToolSteps(tools: ToolPart[]): VisualItem[] {
let j = i + 1;
while (j < tools.length) {
const next = tools[j];
const nextKind = classifyTool(next.toolName);
const nextKind = classifyTool(next.toolName, next.args);
const nextPath = getFilePath(next.args, next.output);
const nextMedia = nextPath ? detectMedia(nextPath) : null;
if (nextKind === "read" && nextMedia === media && nextPath) {
@ -972,12 +1059,14 @@ function ToolStep({
errorText?: string;
}) {
const [showOutput, setShowOutput] = useState(false);
const kind = classifyTool(toolName);
const kind = classifyTool(toolName, args);
const label = buildStepLabel(kind, toolName, args, output);
const domains =
(kind === "search" || kind === "fetch") && status === "done"
kind === "search"
? getSearchDomains(output)
: [];
: kind === "fetch"
? getFetchDomains(args, output)
: [];
const outputText =
typeof output?.text === "string" ? output.text : undefined;
@ -1067,8 +1156,8 @@ function ToolStep({
/>
)}
{/* Search domain badges */}
{domains.length > 0 && (
{/* Domain badges (search results / fetched page) — skip when running, the running section handles its own */}
{domains.length > 0 && status !== "running" && (
<div className="flex items-center gap-1.5 flex-wrap mt-1.5">
{domains.map((domain) => (
<DomainBadge
@ -1083,26 +1172,65 @@ function ToolStep({
status === "running" &&
args && (
<div className="flex items-center gap-1.5 flex-wrap mt-1.5">
<span
className="inline-flex items-center gap-1 h-6 px-2 rounded-full text-[11px]"
style={{
background:
"var(--color-surface-hover)",
color: "var(--color-text-muted)",
border: "1px solid var(--color-border)",
}}
>
<span
className="w-2 h-2 rounded-full animate-pulse"
style={{
background:
"var(--color-accent)",
}}
/>
{kind === "fetch"
? "Fetching..."
: "Searching..."}
</span>
{/* Show favicon badges for known domains while running */}
{domains.length > 0
? domains.map((domain) => (
<span
key={domain}
className="inline-flex items-center gap-1.5 h-7 px-2.5 rounded-full text-[12px]"
style={{
background:
"var(--color-surface-hover)",
color: "var(--color-text-secondary)",
border: "1px solid var(--color-border)",
}}
>
{/* eslint-disable-next-line @next/next/no-img-element */}
<img
src={faviconUrl(
domain,
)}
alt=""
width={14}
height={14}
className="rounded-sm flex-shrink-0"
loading="lazy"
/>
{domain.replace(
/^www\./,
"",
)}
<span
className="w-2 h-2 rounded-full animate-pulse"
style={{
background:
"var(--color-accent)",
}}
/>
</span>
))
: (
<span
className="inline-flex items-center gap-1 h-6 px-2 rounded-full text-[11px]"
style={{
background:
"var(--color-surface-hover)",
color: "var(--color-text-muted)",
border: "1px solid var(--color-border)",
}}
>
<span
className="w-2 h-2 rounded-full animate-pulse"
style={{
background:
"var(--color-accent)",
}}
/>
{kind === "fetch"
? "Fetching..."
: "Searching..."}
</span>
)}
</div>
)}
@ -1177,8 +1305,8 @@ function DomainBadge({ domain }: { domain: string }) {
<img
src={faviconUrl(domain)}
alt=""
width={14}
height={14}
width={16}
height={16}
className="rounded-sm flex-shrink-0"
loading="lazy"
/>

View File

@ -13,6 +13,139 @@ import {
} from "react";
import { ChatMessage } from "./chat-message";
// ── SSE stream parser for reconnection ──
// Converts raw SSE events (AI SDK v6 wire format) into UIMessage parts.
type ParsedPart =
| { type: "text"; text: string }
| { type: "reasoning"; text: string; state?: string }
| {
type: "dynamic-tool";
toolName: string;
toolCallId: string;
state: string;
input?: Record<string, unknown>;
output?: Record<string, unknown>;
};
function createStreamParser() {
const parts: ParsedPart[] = [];
let currentTextIdx = -1;
let currentReasoningIdx = -1;
function processEvent(event: Record<string, unknown>) {
const t = event.type as string;
switch (t) {
case "reasoning-start":
parts.push({
type: "reasoning",
text: "",
state: "streaming",
});
currentReasoningIdx = parts.length - 1;
break;
case "reasoning-delta": {
if (currentReasoningIdx >= 0) {
const p = parts[currentReasoningIdx] as {
type: "reasoning";
text: string;
};
p.text += event.delta as string;
}
break;
}
case "reasoning-end":
if (currentReasoningIdx >= 0) {
const p = parts[currentReasoningIdx] as {
type: "reasoning";
state?: string;
};
delete p.state;
}
currentReasoningIdx = -1;
break;
case "text-start":
parts.push({ type: "text", text: "" });
currentTextIdx = parts.length - 1;
break;
case "text-delta": {
if (currentTextIdx >= 0) {
const p = parts[currentTextIdx] as {
type: "text";
text: string;
};
p.text += event.delta as string;
}
break;
}
case "text-end":
currentTextIdx = -1;
break;
case "tool-input-start":
parts.push({
type: "dynamic-tool",
toolCallId: event.toolCallId as string,
toolName: event.toolName as string,
state: "input-available",
input: {},
});
break;
case "tool-input-available":
for (let i = parts.length - 1; i >= 0; i--) {
const p = parts[i];
if (
p.type === "dynamic-tool" &&
p.toolCallId === event.toolCallId
) {
p.input =
(event.input as Record<string, unknown>) ??
{};
break;
}
}
break;
case "tool-output-available":
for (let i = parts.length - 1; i >= 0; i--) {
const p = parts[i];
if (
p.type === "dynamic-tool" &&
p.toolCallId === event.toolCallId
) {
p.state = "output-available";
p.output =
(event.output as Record<
string,
unknown
>) ?? {};
break;
}
}
break;
case "tool-output-error":
for (let i = parts.length - 1; i >= 0; i--) {
const p = parts[i];
if (
p.type === "dynamic-tool" &&
p.toolCallId === event.toolCallId
) {
p.state = "error";
p.output = {
error: event.errorText as string,
};
break;
}
}
break;
}
}
return {
processEvent,
getParts: (): ParsedPart[] => parts.map((p) => ({ ...p })),
};
}
/** Imperative handle for parent-driven session control (main page). */
export type ChatPanelHandle = {
loadSession: (sessionId: string) => Promise<void>;
@ -64,6 +197,10 @@ export const ChatPanel = forwardRef<ChatPanelHandle, ChatPanelProps>(
const [startingNewSession, setStartingNewSession] = useState(false);
const messagesEndRef = useRef<HTMLDivElement>(null);
// ── Reconnection state ──
const [isReconnecting, setIsReconnecting] = useState(false);
const reconnectAbortRef = useRef<AbortController | null>(null);
// Track persisted messages to avoid double-saves
const savedMessageIdsRef = useRef<Set<string>>(new Set());
// Set when /new or + triggers a new session
@ -101,7 +238,9 @@ export const ChatPanel = forwardRef<ChatPanelHandle, ChatPanelProps>(
useChat({ transport });
const isStreaming =
status === "streaming" || status === "submitted";
status === "streaming" ||
status === "submitted" ||
isReconnecting;
// Auto-scroll to bottom on new messages
useEffect(() => {
@ -129,67 +268,119 @@ export const ChatPanel = forwardRef<ChatPanelHandle, ChatPanelProps>(
[filePath],
);
const saveMessages = useCallback(
// ── Stream reconnection ──
// Attempts to reconnect to an active agent run for the given session.
// Replays buffered SSE events and streams live updates.
const attemptReconnect = useCallback(
async (
sessionId: string,
msgs: Array<{
baseMessages: Array<{
id: string;
role: string;
content: string;
parts?: unknown[];
role: "user" | "assistant" | "system";
parts: UIMessage["parts"];
}>,
title?: string,
) => {
const toSave = msgs.map((m) => ({
id: m.id,
role: m.role,
content: m.content,
...(m.parts ? { parts: m.parts } : {}),
timestamp: new Date().toISOString(),
}));
): Promise<boolean> => {
const abort = new AbortController();
reconnectAbortRef.current = abort;
try {
await fetch(
`/api/web-sessions/${sessionId}/messages`,
{
method: "POST",
headers: {
"Content-Type": "application/json",
},
body: JSON.stringify({
messages: toSave,
title,
}),
},
const res = await fetch(
`/api/chat/stream?sessionId=${encodeURIComponent(sessionId)}`,
{ signal: abort.signal },
);
for (const m of msgs) {
if (!res.ok || !res.body) {
return false; // No active run
}
setIsReconnecting(true);
const parser = createStreamParser();
const reader = res.body.getReader();
const decoder = new TextDecoder();
const reconnectMsgId = `reconnect-${sessionId}`;
let buffer = "";
let frameRequested = false;
const updateUI = () => {
const assistantMsg = {
id: reconnectMsgId,
role: "assistant" as const,
parts: parser.getParts() as UIMessage["parts"],
};
setMessages([
...baseMessages,
assistantMsg,
]);
};
// Read the SSE stream
// eslint-disable-next-line @typescript-eslint/no-unnecessary-condition -- loop reads until done
while (true) {
const { done, value } =
await reader.read();
if (done) {break;}
buffer += decoder.decode(value, {
stream: true,
});
// Parse SSE events (data: <json>\n\n)
let idx;
while (
(idx = buffer.indexOf("\n\n")) !== -1
) {
const chunk = buffer.slice(0, idx);
buffer = buffer.slice(idx + 2);
if (chunk.startsWith("data: ")) {
try {
const event = JSON.parse(
chunk.slice(6),
);
parser.processEvent(event);
} catch {
/* skip malformed events */
}
}
}
// Batch UI updates to animation frames
if (!frameRequested) {
frameRequested = true;
requestAnimationFrame(() => {
frameRequested = false;
updateUI();
});
}
}
// Final update after stream ends
updateUI();
// Mark all messages as saved (server persisted them)
for (const m of baseMessages) {
savedMessageIdsRef.current.add(m.id);
}
onSessionsChange?.();
savedMessageIdsRef.current.add(reconnectMsgId);
setIsReconnecting(false);
reconnectAbortRef.current = null;
return true;
} catch (err) {
console.error("Failed to save messages:", err);
if (
(err as Error).name !== "AbortError"
) {
console.error(
"Reconnection error:",
err,
);
}
setIsReconnecting(false);
reconnectAbortRef.current = null;
return false;
}
},
[onSessionsChange],
);
/** Extract plain text from a UIMessage */
const getMessageText = useCallback(
(msg: (typeof messages)[number]): string => {
return (
msg.parts
?.filter(
(
p,
): p is {
type: "text";
text: string;
} => p.type === "text",
)
.map((p) => p.text)
.join("\n") ?? ""
);
},
[],
[setMessages],
);
// ── File-scoped session initialization ──
@ -253,9 +444,21 @@ export const ChatPanel = forwardRef<ChatPanelHandle, ChatPanelProps>(
role: "user" | "assistant";
content: string;
parts?: Array<Record<string, unknown>>;
_streaming?: boolean;
}> = msgData.messages || [];
const uiMessages = sessionMessages.map(
// Filter out in-progress streaming messages
// (will be rebuilt from the live SSE stream)
const hasStreaming = sessionMessages.some(
(m) => m._streaming,
);
const completedMessages = hasStreaming
? sessionMessages.filter(
(m) => !m._streaming,
)
: sessionMessages;
const uiMessages = completedMessages.map(
(msg) => {
savedMessageIdsRef.current.add(msg.id);
return {
@ -273,6 +476,15 @@ export const ChatPanel = forwardRef<ChatPanelHandle, ChatPanelProps>(
if (!cancelled) {
setMessages(uiMessages);
}
// If there was a streaming message, try to
// reconnect to the active agent run.
if (hasStreaming && !cancelled) {
await attemptReconnect(
latest.id,
uiMessages,
);
}
} catch {
// ignore
}
@ -283,9 +495,12 @@ export const ChatPanel = forwardRef<ChatPanelHandle, ChatPanelProps>(
cancelled = true;
};
// eslint-disable-next-line react-hooks/exhaustive-deps -- stable setters
}, [filePath]);
}, [filePath, attemptReconnect]);
// ── Persist unsaved messages + live-reload after streaming ──
// ── Post-stream side-effects (file-reload, session refresh) ──
// Message persistence is handled server-side by ActiveRunManager,
// so we only refresh the file sessions list and notify the parent
// when the file content may have changed.
const prevStatusRef = useRef(status);
useEffect(() => {
const wasStreaming =
@ -294,17 +509,10 @@ export const ChatPanel = forwardRef<ChatPanelHandle, ChatPanelProps>(
const isNowReady = status === "ready";
if (wasStreaming && isNowReady && currentSessionId) {
const unsaved = messages.filter(
(m) => !savedMessageIdsRef.current.has(m.id),
);
if (unsaved.length > 0) {
const toSave = unsaved.map((m) => ({
id: m.id,
role: m.role,
content: getMessageText(m),
parts: m.parts,
}));
saveMessages(currentSessionId, toSave);
// Mark all current messages as saved — the server
// already persisted them via ActiveRunManager.
for (const m of messages) {
savedMessageIdsRef.current.add(m.id);
}
if (filePath) {
@ -327,16 +535,17 @@ export const ChatPanel = forwardRef<ChatPanelHandle, ChatPanelProps>(
})
.catch(() => {});
}
onSessionsChange?.();
}
prevStatusRef.current = status;
}, [
status,
messages,
currentSessionId,
saveMessages,
getMessageText,
filePath,
onFileChanged,
onSessionsChange,
]);
// ── Actions ──
@ -391,7 +600,10 @@ export const ChatPanel = forwardRef<ChatPanelHandle, ChatPanelProps>(
return;
}
// Stop any active stream/reconnection for the old session.
reconnectAbortRef.current?.abort();
stop();
setLoadingSession(true);
setCurrentSessionId(sessionId);
sessionIdRef.current = sessionId;
@ -413,23 +625,43 @@ export const ChatPanel = forwardRef<ChatPanelHandle, ChatPanelProps>(
role: "user" | "assistant";
content: string;
parts?: Array<Record<string, unknown>>;
_streaming?: boolean;
}> = data.messages || [];
const uiMessages = sessionMessages.map((msg) => {
savedMessageIdsRef.current.add(msg.id);
return {
id: msg.id,
role: msg.role,
parts: (msg.parts ?? [
{
type: "text" as const,
text: msg.content,
},
]) as UIMessage["parts"],
};
});
const hasStreaming = sessionMessages.some(
(m) => m._streaming,
);
const completedMessages = hasStreaming
? sessionMessages.filter(
(m) => !m._streaming,
)
: sessionMessages;
const uiMessages = completedMessages.map(
(msg) => {
savedMessageIdsRef.current.add(msg.id);
return {
id: msg.id,
role: msg.role,
parts: (msg.parts ?? [
{
type: "text" as const,
text: msg.content,
},
]) as UIMessage["parts"],
};
},
);
setMessages(uiMessages);
// Reconnect to active stream if one exists.
if (hasStreaming) {
await attemptReconnect(
sessionId,
uiMessages,
);
}
} catch (err) {
console.error("Error loading session:", err);
} finally {
@ -441,11 +673,14 @@ export const ChatPanel = forwardRef<ChatPanelHandle, ChatPanelProps>(
setMessages,
onActiveSessionChange,
stop,
attemptReconnect,
],
);
const handleNewSession = useCallback(async () => {
reconnectAbortRef.current?.abort();
stop();
setIsReconnecting(false);
setCurrentSessionId(null);
sessionIdRef.current = null;
onActiveSessionChange?.(null);
@ -477,21 +712,46 @@ export const ChatPanel = forwardRef<ChatPanelHandle, ChatPanelProps>(
[handleSessionSelect, handleNewSession],
);
// ── Stop handler (aborts server-side run + client-side stream) ──
const handleStop = useCallback(async () => {
// Abort the server-side agent run
if (currentSessionId) {
fetch("/api/chat/stop", {
method: "POST",
headers: {
"Content-Type": "application/json",
},
body: JSON.stringify({
sessionId: currentSessionId,
}),
}).catch(() => {});
}
// Abort reconnection stream if active
reconnectAbortRef.current?.abort();
setIsReconnecting(false);
// Stop the useChat transport stream
stop();
}, [currentSessionId, stop]);
// ── Status label ──
const statusLabel = startingNewSession
? "Starting new session..."
: loadingSession
? "Loading session..."
: status === "ready"
? "Ready"
: status === "submitted"
? "Thinking..."
: status === "streaming"
? "Streaming..."
: status === "error"
? "Error"
: status;
: isReconnecting
? "Resuming stream..."
: status === "ready"
? "Ready"
: status === "submitted"
? "Thinking..."
: status === "streaming"
? "Streaming..."
: status === "error"
? "Error"
: status;
// ── Render ──
@ -577,7 +837,7 @@ export const ChatPanel = forwardRef<ChatPanelHandle, ChatPanelProps>(
{isStreaming && (
<button
type="button"
onClick={() => stop()}
onClick={() => handleStop()}
className={`${compact ? "px-2 py-0.5 text-[10px]" : "px-3 py-1 text-xs"} rounded-full font-medium`}
style={{
background:

View File

@ -0,0 +1,444 @@
"use client";
import { useEffect, useState, useCallback } from "react";
import type {
CronJob,
HeartbeatInfo,
CronStatusInfo,
CronJobsResponse,
} from "../../types/cron";
/* ─── Helpers ─── */
function formatSchedule(schedule: CronJob["schedule"]): string {
switch (schedule.kind) {
case "cron":
return `cron: ${schedule.expr}${schedule.tz ? ` (${schedule.tz})` : ""}`;
case "every": {
const ms = schedule.everyMs;
if (ms >= 86_400_000) {return `every ${Math.round(ms / 86_400_000)}d`;}
if (ms >= 3_600_000) {return `every ${Math.round(ms / 3_600_000)}h`;}
if (ms >= 60_000) {return `every ${Math.round(ms / 60_000)}m`;}
return `every ${Math.round(ms / 1000)}s`;
}
case "at":
return `at ${schedule.at}`;
default:
return "unknown";
}
}
function formatCountdown(ms: number): string {
if (ms <= 0) {return "now";}
const totalSec = Math.ceil(ms / 1000);
if (totalSec < 60) {return `${totalSec}s`;}
const min = Math.floor(totalSec / 60);
const sec = totalSec % 60;
if (min < 60) {return sec > 0 ? `${min}m ${sec}s` : `${min}m`;}
const hr = Math.floor(min / 60);
const remMin = min % 60;
return remMin > 0 ? `${hr}h ${remMin}m` : `${hr}h`;
}
function formatTimeAgo(ms: number): string {
const ago = Date.now() - ms;
if (ago < 60_000) {return "just now";}
if (ago < 3_600_000) {return `${Math.floor(ago / 60_000)}m ago`;}
if (ago < 86_400_000) {return `${Math.floor(ago / 3_600_000)}h ago`;}
return `${Math.floor(ago / 86_400_000)}d ago`;
}
function formatDuration(ms: number): string {
if (ms < 1000) {return `${ms}ms`;}
if (ms < 60_000) {return `${(ms / 1000).toFixed(1)}s`;}
return `${Math.floor(ms / 60_000)}m ${Math.floor((ms % 60_000) / 1000)}s`;
}
function jobStatusLabel(job: CronJob): string {
if (!job.enabled) {return "disabled";}
if (job.state.runningAtMs) {return "running";}
return job.state.lastStatus ?? "idle";
}
function jobStatusColor(status: string): string {
switch (status) {
case "ok": return "var(--color-success, #22c55e)";
case "running": return "var(--color-accent)";
case "error": return "var(--color-error, #ef4444)";
case "disabled": return "var(--color-text-muted)";
case "skipped": return "var(--color-warning, #f59e0b)";
default: return "var(--color-text-muted)";
}
}
/* ─── Countdown hook ─── */
function useCountdown(targetMs: number | null | undefined): string | null {
const [now, setNow] = useState(Date.now());
useEffect(() => {
if (!targetMs) {return;}
const id = setInterval(() => setNow(Date.now()), 1000);
return () => clearInterval(id);
}, [targetMs]);
if (!targetMs) {return null;}
return formatCountdown(targetMs - now);
}
/* ─── Main component ─── */
export function CronDashboard({
onSelectJob,
}: {
onSelectJob: (jobId: string) => void;
}) {
const [jobs, setJobs] = useState<CronJob[]>([]);
const [heartbeat, setHeartbeat] = useState<HeartbeatInfo>({ intervalMs: 30 * 60_000, nextDueEstimateMs: null });
const [cronStatus, setCronStatus] = useState<CronStatusInfo>({ enabled: false, nextWakeAtMs: null });
const [loading, setLoading] = useState(true);
const fetchData = useCallback(async () => {
try {
const res = await fetch("/api/cron/jobs");
const data: CronJobsResponse = await res.json();
setJobs(data.jobs ?? []);
setHeartbeat(data.heartbeat ?? { intervalMs: 30 * 60_000, nextDueEstimateMs: null });
setCronStatus(data.cronStatus ?? { enabled: false, nextWakeAtMs: null });
} catch {
// ignore
} finally {
setLoading(false);
}
}, []);
useEffect(() => {
fetchData();
const id = setInterval(fetchData, 30_000);
return () => clearInterval(id);
}, [fetchData]);
const heartbeatCountdown = useCountdown(heartbeat.nextDueEstimateMs);
const cronWakeCountdown = useCountdown(cronStatus.nextWakeAtMs);
if (loading) {
return (
<div className="flex items-center justify-center h-full p-8">
<div
className="w-6 h-6 border-2 rounded-full animate-spin"
style={{ borderColor: "var(--color-border)", borderTopColor: "var(--color-accent)" }}
/>
</div>
);
}
const enabledJobs = jobs.filter((j) => j.enabled);
const disabledJobs = jobs.filter((j) => !j.enabled);
return (
<div className="p-6 max-w-5xl mx-auto">
{/* Header */}
<h1
className="font-instrument text-3xl tracking-tight mb-1"
style={{ color: "var(--color-text)" }}
>
Cron
</h1>
<p className="text-sm mb-6" style={{ color: "var(--color-text-muted)" }}>
Scheduled jobs and heartbeat status
</p>
{/* Status cards */}
<div className="grid grid-cols-1 sm:grid-cols-2 lg:grid-cols-3 gap-4 mb-8">
{/* Heartbeat card */}
<StatusCard
title="Heartbeat"
icon={<HeartbeatIcon />}
value={heartbeatCountdown ? `in ${heartbeatCountdown}` : "unknown"}
subtitle={`Interval: ${formatCountdown(heartbeat.intervalMs)}`}
description="The heartbeat wakes the agent periodically. Cron jobs with wakeMode=next-heartbeat piggyback on this loop."
/>
{/* Cron scheduler card */}
<StatusCard
title="Cron Scheduler"
icon={<ClockIcon />}
value={cronWakeCountdown ? `next in ${cronWakeCountdown}` : jobs.length === 0 ? "no jobs" : "idle"}
subtitle={`${enabledJobs.length} active / ${jobs.length} total jobs`}
description="The cron timer fires every ~60s, checking for due jobs. Isolated jobs run independently; main-session jobs wake the heartbeat."
/>
{/* Running card */}
<StatusCard
title="Active Runs"
icon={<RunningIcon />}
value={`${jobs.filter((j) => j.state.runningAtMs).length}`}
subtitle={`${jobs.filter((j) => j.state.lastStatus === "error").length} errors`}
description="Jobs currently executing. Errors show consecutive failures."
/>
</div>
{/* Timeline - upcoming runs in next 24h */}
<TimelineSection jobs={enabledJobs} />
{/* Jobs table */}
<div className="mb-6">
<h2
className="text-sm font-medium uppercase tracking-wider mb-3"
style={{ color: "var(--color-text-muted)" }}
>
Jobs
</h2>
{jobs.length === 0 ? (
<div
className="p-8 text-center rounded-2xl"
style={{
background: "var(--color-surface)",
border: "1px solid var(--color-border)",
}}
>
<p className="text-sm" style={{ color: "var(--color-text-muted)" }}>
No cron jobs configured. Use <code className="px-1.5 py-0.5 rounded text-xs" style={{ background: "var(--color-surface-hover)" }}>openclaw cron add</code> to create one.
</p>
</div>
) : (
<div
className="rounded-2xl overflow-hidden"
style={{
border: "1px solid var(--color-border)",
background: "var(--color-surface)",
}}
>
<table className="w-full text-sm">
<thead>
<tr
style={{
borderBottom: "1px solid var(--color-border)",
color: "var(--color-text-muted)",
}}
>
<th className="text-left px-4 py-2.5 font-medium text-xs uppercase tracking-wider">Name</th>
<th className="text-left px-4 py-2.5 font-medium text-xs uppercase tracking-wider">Schedule</th>
<th className="text-left px-4 py-2.5 font-medium text-xs uppercase tracking-wider">Status</th>
<th className="text-left px-4 py-2.5 font-medium text-xs uppercase tracking-wider">Next Run</th>
<th className="text-left px-4 py-2.5 font-medium text-xs uppercase tracking-wider">Last Run</th>
<th className="text-left px-4 py-2.5 font-medium text-xs uppercase tracking-wider">Target</th>
</tr>
</thead>
<tbody>
{[...enabledJobs, ...disabledJobs].map((job) => (
<JobRow key={job.id} job={job} onClick={() => onSelectJob(job.id)} />
))}
</tbody>
</table>
</div>
)}
</div>
</div>
);
}
/* ─── Status card ─── */
function StatusCard({
title,
icon,
value,
subtitle,
description,
}: {
title: string;
icon: React.ReactNode;
value: string;
subtitle: string;
description: string;
}) {
return (
<div
className="rounded-2xl p-4"
style={{
background: "var(--color-surface)",
border: "1px solid var(--color-border)",
boxShadow: "var(--shadow-sm)",
}}
>
<div className="flex items-center gap-2 mb-2">
<span style={{ color: "var(--color-accent)" }}>{icon}</span>
<span className="text-xs font-medium uppercase tracking-wider" style={{ color: "var(--color-text-muted)" }}>
{title}
</span>
</div>
<div className="text-lg font-semibold mb-0.5" style={{ color: "var(--color-text)" }}>
{value}
</div>
<div className="text-xs mb-2" style={{ color: "var(--color-text-muted)" }}>
{subtitle}
</div>
<div className="text-[11px] leading-relaxed" style={{ color: "var(--color-text-muted)", opacity: 0.7 }}>
{description}
</div>
</div>
);
}
/* ─── Timeline ─── */
function TimelineSection({ jobs }: { jobs: CronJob[] }) {
const [now, setNow] = useState(Date.now());
useEffect(() => {
const id = setInterval(() => setNow(Date.now()), 10_000);
return () => clearInterval(id);
}, []);
const horizon = 24 * 60 * 60 * 1000; // 24h
const upcoming = jobs
.filter((j) => j.state.nextRunAtMs && j.state.nextRunAtMs > now && j.state.nextRunAtMs < now + horizon)
.toSorted((a, b) => (a.state.nextRunAtMs ?? 0) - (b.state.nextRunAtMs ?? 0));
if (upcoming.length === 0) {return null;}
return (
<div className="mb-8">
<h2
className="text-sm font-medium uppercase tracking-wider mb-3"
style={{ color: "var(--color-text-muted)" }}
>
Upcoming (next 24h)
</h2>
<div
className="rounded-2xl p-4"
style={{
background: "var(--color-surface)",
border: "1px solid var(--color-border)",
}}
>
<div className="relative">
{/* Timeline bar */}
<div
className="absolute top-0 left-3 bottom-0 w-px"
style={{ background: "var(--color-border)" }}
/>
<div className="space-y-3">
{upcoming.map((job) => {
const timeUntil = (job.state.nextRunAtMs ?? 0) - now;
return (
<div key={job.id} className="flex items-center gap-3 pl-1">
<div
className="relative z-10 w-5 h-5 rounded-full flex items-center justify-center flex-shrink-0"
style={{ background: "var(--color-accent)", opacity: 0.8 }}
>
<div className="w-2 h-2 rounded-full" style={{ background: "var(--color-bg)" }} />
</div>
<div className="flex-1 min-w-0">
<span className="text-sm font-medium" style={{ color: "var(--color-text)" }}>
{job.name}
</span>
<span className="text-xs ml-2" style={{ color: "var(--color-text-muted)" }}>
in {formatCountdown(timeUntil)}
</span>
</div>
<span className="text-[11px] flex-shrink-0" style={{ color: "var(--color-text-muted)" }}>
{new Date(job.state.nextRunAtMs!).toLocaleTimeString([], { hour: "2-digit", minute: "2-digit" })}
</span>
</div>
);
})}
</div>
</div>
</div>
</div>
);
}
/* ─── Job row ─── */
function JobRow({ job, onClick }: { job: CronJob; onClick: () => void }) {
const status = jobStatusLabel(job);
const statusColor = jobStatusColor(status);
const [now, setNow] = useState(Date.now());
useEffect(() => {
const id = setInterval(() => setNow(Date.now()), 5000);
return () => clearInterval(id);
}, []);
const nextRunStr = job.state.nextRunAtMs
? job.state.nextRunAtMs > now
? `in ${formatCountdown(job.state.nextRunAtMs - now)}`
: "overdue"
: "-";
const lastRunStr = job.state.lastRunAtMs
? `${formatTimeAgo(job.state.lastRunAtMs)}${job.state.lastDurationMs ? ` (${formatDuration(job.state.lastDurationMs)})` : ""}`
: "-";
return (
<tr
className="cursor-pointer transition-colors"
style={{ borderBottom: "1px solid var(--color-border)" }}
onClick={onClick}
onMouseEnter={(e) => { (e.currentTarget as HTMLElement).style.background = "var(--color-surface-hover)"; }}
onMouseLeave={(e) => { (e.currentTarget as HTMLElement).style.background = "transparent"; }}
>
<td className="px-4 py-3">
<div className="font-medium" style={{ color: "var(--color-text)" }}>{job.name}</div>
{job.description && (
<div className="text-xs truncate max-w-[200px]" style={{ color: "var(--color-text-muted)" }}>
{job.description}
</div>
)}
</td>
<td className="px-4 py-3 text-xs" style={{ color: "var(--color-text-muted)" }}>
{formatSchedule(job.schedule)}
</td>
<td className="px-4 py-3">
<span
className="inline-flex items-center gap-1.5 text-xs px-2 py-0.5 rounded-full"
style={{
background: `color-mix(in srgb, ${statusColor} 12%, transparent)`,
color: statusColor,
}}
>
{status === "running" && (
<span className="w-1.5 h-1.5 rounded-full animate-pulse" style={{ background: statusColor }} />
)}
{status}
</span>
</td>
<td className="px-4 py-3 text-xs" style={{ color: "var(--color-text-muted)" }}>
{nextRunStr}
</td>
<td className="px-4 py-3 text-xs" style={{ color: "var(--color-text-muted)" }}>
{lastRunStr}
</td>
<td className="px-4 py-3 text-xs" style={{ color: "var(--color-text-muted)" }}>
{job.sessionTarget}
</td>
</tr>
);
}
/* ─── Icons ─── */
function HeartbeatIcon() {
return (
<svg width="18" height="18" viewBox="0 0 24 24" fill="none" stroke="currentColor" strokeWidth="2" strokeLinecap="round" strokeLinejoin="round">
<path d="M19.5 12.572l-7.5 7.428l-7.5 -7.428a5 5 0 1 1 7.5 -6.566a5 5 0 1 1 7.5 6.572" />
</svg>
);
}
function ClockIcon() {
return (
<svg width="18" height="18" viewBox="0 0 24 24" fill="none" stroke="currentColor" strokeWidth="2" strokeLinecap="round" strokeLinejoin="round">
<circle cx="12" cy="12" r="10" />
<polyline points="12 6 12 12 16 14" />
</svg>
);
}
function RunningIcon() {
return (
<svg width="18" height="18" viewBox="0 0 24 24" fill="none" stroke="currentColor" strokeWidth="2" strokeLinecap="round" strokeLinejoin="round">
<polygon points="6 3 20 12 6 21 6 3" />
</svg>
);
}

View File

@ -0,0 +1,441 @@
"use client";
import { useEffect, useState, useCallback } from "react";
import type { CronJob, CronRunLogEntry, CronRunsResponse } from "../../types/cron";
import { CronRunChat } from "./cron-run-chat";
/* ─── Helpers ─── */
function formatSchedule(schedule: CronJob["schedule"]): string {
switch (schedule.kind) {
case "cron":
return schedule.expr + (schedule.tz ? ` (${schedule.tz})` : "");
case "every": {
const ms = schedule.everyMs;
if (ms >= 86_400_000) {return `every ${Math.round(ms / 86_400_000)} day(s)`;}
if (ms >= 3_600_000) {return `every ${Math.round(ms / 3_600_000)} hour(s)`;}
if (ms >= 60_000) {return `every ${Math.round(ms / 60_000)} minute(s)`;}
return `every ${Math.round(ms / 1000)} second(s)`;
}
case "at":
return new Date(schedule.at).toLocaleString();
default:
return "unknown";
}
}
function formatCountdown(ms: number): string {
if (ms <= 0) {return "now";}
const totalSec = Math.ceil(ms / 1000);
if (totalSec < 60) {return `${totalSec}s`;}
const min = Math.floor(totalSec / 60);
const sec = totalSec % 60;
if (min < 60) {return sec > 0 ? `${min}m ${sec}s` : `${min}m`;}
const hr = Math.floor(min / 60);
const remMin = min % 60;
return remMin > 0 ? `${hr}h ${remMin}m` : `${hr}h`;
}
function formatDuration(ms: number): string {
if (ms < 1000) {return `${ms}ms`;}
if (ms < 60_000) {return `${(ms / 1000).toFixed(1)}s`;}
return `${Math.floor(ms / 60_000)}m ${Math.floor((ms % 60_000) / 1000)}s`;
}
function payloadSummary(payload: CronJob["payload"]): string {
if (payload.kind === "systemEvent") {return payload.text.slice(0, 120);}
return payload.message.slice(0, 120);
}
/* ─── Main component ─── */
export function CronJobDetail({
job,
onBack,
}: {
job: CronJob;
onBack: () => void;
}) {
const [runs, setRuns] = useState<CronRunLogEntry[]>([]);
const [loadingRuns, setLoadingRuns] = useState(true);
const [expandedRunTs, setExpandedRunTs] = useState<number | null>(null);
const fetchRuns = useCallback(async () => {
try {
const res = await fetch(`/api/cron/jobs/${encodeURIComponent(job.id)}/runs?limit=50`);
const data: CronRunsResponse = await res.json();
setRuns(data.entries ?? []);
} catch {
// ignore
} finally {
setLoadingRuns(false);
}
}, [job.id]);
useEffect(() => {
fetchRuns();
const id = setInterval(fetchRuns, 15_000);
return () => clearInterval(id);
}, [fetchRuns]);
const status = !job.enabled
? "disabled"
: job.state.runningAtMs
? "running"
: (job.state.lastStatus ?? "idle");
return (
<div className="p-6 max-w-4xl mx-auto">
{/* Back button + header */}
<button
type="button"
onClick={onBack}
className="flex items-center gap-1 text-sm mb-4 cursor-pointer"
style={{ color: "var(--color-text-muted)" }}
>
<svg width="16" height="16" viewBox="0 0 24 24" fill="none" stroke="currentColor" strokeWidth="2" strokeLinecap="round" strokeLinejoin="round">
<path d="m12 19-7-7 7-7" /><path d="M19 12H5" />
</svg>
Back to Cron
</button>
{/* Job header */}
<div className="mb-6">
<div className="flex items-center gap-3 mb-1">
<h1
className="font-instrument text-3xl tracking-tight"
style={{ color: "var(--color-text)" }}
>
{job.name}
</h1>
<StatusBadge status={status} />
</div>
{job.description && (
<p className="text-sm" style={{ color: "var(--color-text-muted)" }}>
{job.description}
</p>
)}
</div>
{/* Config + countdown grid */}
<div className="grid grid-cols-1 md:grid-cols-2 gap-4 mb-8">
{/* Next run countdown */}
<NextRunCard job={job} />
{/* Job config */}
<div
className="rounded-2xl p-4"
style={{
background: "var(--color-surface)",
border: "1px solid var(--color-border)",
}}
>
<h3 className="text-xs font-medium uppercase tracking-wider mb-3" style={{ color: "var(--color-text-muted)" }}>
Configuration
</h3>
<div className="space-y-2 text-sm">
<ConfigRow label="Schedule" value={formatSchedule(job.schedule)} />
<ConfigRow label="Target" value={job.sessionTarget} />
<ConfigRow label="Wake mode" value={job.wakeMode} />
<ConfigRow label="Payload" value={`${job.payload.kind}: ${payloadSummary(job.payload)}`} />
{job.agentId && <ConfigRow label="Agent" value={job.agentId} />}
{job.delivery && <ConfigRow label="Delivery" value={job.delivery.mode} />}
<ConfigRow label="Created" value={new Date(job.createdAtMs).toLocaleString()} />
</div>
</div>
</div>
{/* Error streak */}
{job.state.consecutiveErrors && job.state.consecutiveErrors > 0 && (
<div
className="rounded-2xl p-4 mb-6"
style={{
background: "color-mix(in srgb, var(--color-error, #ef4444) 6%, var(--color-surface))",
border: "1px solid color-mix(in srgb, var(--color-error, #ef4444) 18%, transparent)",
}}
>
<div className="flex items-center gap-2 mb-1">
<svg width="16" height="16" viewBox="0 0 24 24" fill="none" stroke="var(--color-error, #ef4444)" strokeWidth="2" strokeLinecap="round" strokeLinejoin="round">
<circle cx="12" cy="12" r="10" /><line x1="12" y1="8" x2="12" y2="12" /><line x1="12" y1="16" x2="12.01" y2="16" />
</svg>
<span className="text-sm font-medium" style={{ color: "var(--color-error, #ef4444)" }}>
{job.state.consecutiveErrors} consecutive error{job.state.consecutiveErrors > 1 ? "s" : ""}
</span>
</div>
{job.state.lastError && (
<p className="text-xs font-mono mt-1" style={{ color: "var(--color-error, #ef4444)", opacity: 0.8 }}>
{job.state.lastError}
</p>
)}
</div>
)}
{/* Run history */}
<div>
<h2
className="text-sm font-medium uppercase tracking-wider mb-3"
style={{ color: "var(--color-text-muted)" }}
>
Run History
</h2>
{loadingRuns ? (
<div className="flex items-center justify-center p-8">
<div
className="w-5 h-5 border-2 rounded-full animate-spin"
style={{ borderColor: "var(--color-border)", borderTopColor: "var(--color-accent)" }}
/>
</div>
) : runs.length === 0 ? (
<div
className="p-8 text-center rounded-2xl"
style={{
background: "var(--color-surface)",
border: "1px solid var(--color-border)",
}}
>
<p className="text-sm" style={{ color: "var(--color-text-muted)" }}>
No runs recorded yet.
</p>
</div>
) : (
<div className="space-y-2">
{runs.toReversed().map((run) => (
<RunCard
key={`${run.ts}-${run.jobId}`}
run={run}
isExpanded={expandedRunTs === run.ts}
onToggle={() => setExpandedRunTs(expandedRunTs === run.ts ? null : run.ts)}
/>
))}
</div>
)}
</div>
</div>
);
}
/* ─── Next run countdown card ─── */
function NextRunCard({ job }: { job: CronJob }) {
const [now, setNow] = useState(Date.now());
useEffect(() => {
const id = setInterval(() => setNow(Date.now()), 1000);
return () => clearInterval(id);
}, []);
const nextMs = job.state.nextRunAtMs;
const isRunning = !!job.state.runningAtMs;
return (
<div
className="rounded-2xl p-4 flex flex-col justify-center"
style={{
background: "var(--color-surface)",
border: "1px solid var(--color-border)",
}}
>
<h3 className="text-xs font-medium uppercase tracking-wider mb-2" style={{ color: "var(--color-text-muted)" }}>
{isRunning ? "Currently Running" : "Next Run"}
</h3>
{isRunning ? (
<div className="flex items-center gap-2">
<span
className="w-2 h-2 rounded-full animate-pulse"
style={{ background: "var(--color-accent)" }}
/>
<span className="text-2xl font-semibold" style={{ color: "var(--color-accent)" }}>
Running now
</span>
</div>
) : nextMs ? (
<>
<div className="text-3xl font-semibold mb-1" style={{ color: "var(--color-text)" }}>
{nextMs > now ? formatCountdown(nextMs - now) : "overdue"}
</div>
<div className="text-xs" style={{ color: "var(--color-text-muted)" }}>
{new Date(nextMs).toLocaleString()}
</div>
</>
) : (
<div className="text-lg" style={{ color: "var(--color-text-muted)" }}>
{job.enabled ? "Not scheduled" : "Disabled"}
</div>
)}
</div>
);
}
/* ─── Run card ─── */
function RunCard({
run,
isExpanded,
onToggle,
}: {
run: CronRunLogEntry;
isExpanded: boolean;
onToggle: () => void;
}) {
const statusColor = run.status === "ok"
? "var(--color-success, #22c55e)"
: run.status === "error"
? "var(--color-error, #ef4444)"
: "var(--color-warning, #f59e0b)";
return (
<div
className="rounded-xl overflow-hidden"
style={{
background: "var(--color-surface)",
border: "1px solid var(--color-border)",
}}
>
{/* Run header - clickable */}
<button
type="button"
onClick={onToggle}
className="w-full flex items-center gap-3 px-4 py-3 text-left cursor-pointer transition-colors"
onMouseEnter={(e) => { (e.currentTarget as HTMLElement).style.background = "var(--color-surface-hover)"; }}
onMouseLeave={(e) => { (e.currentTarget as HTMLElement).style.background = "transparent"; }}
>
{/* Status dot */}
<span
className="w-2.5 h-2.5 rounded-full flex-shrink-0"
style={{ background: statusColor }}
/>
{/* Timestamp */}
<span className="text-sm" style={{ color: "var(--color-text)" }}>
{new Date(run.ts).toLocaleString()}
</span>
{/* Status badge */}
<span
className="text-xs px-2 py-0.5 rounded-full"
style={{
background: `color-mix(in srgb, ${statusColor} 12%, transparent)`,
color: statusColor,
}}
>
{run.status ?? "unknown"}
</span>
{/* Duration */}
{run.durationMs != null && (
<span className="text-xs" style={{ color: "var(--color-text-muted)" }}>
{formatDuration(run.durationMs)}
</span>
)}
{/* Summary */}
{run.summary && (
<span className="text-xs truncate flex-1 min-w-0" style={{ color: "var(--color-text-muted)" }}>
{run.summary}
</span>
)}
{/* Has session indicator */}
{run.sessionId && (
<span className="text-[10px] px-1.5 py-0.5 rounded" style={{ background: "var(--color-surface-hover)", color: "var(--color-text-muted)" }}>
chat
</span>
)}
{/* Chevron */}
<svg
width="14"
height="14"
viewBox="0 0 12 12"
fill="none"
stroke="var(--color-text-muted)"
strokeWidth="1.5"
strokeLinecap="round"
strokeLinejoin="round"
className={`flex-shrink-0 transition-transform duration-200 ${isExpanded ? "" : "-rotate-90"}`}
>
<path d="M3 4.5L6 7.5L9 4.5" />
</svg>
</button>
{/* Expanded content */}
{isExpanded && (
<div
className="px-4 pb-4"
style={{ borderTop: "1px solid var(--color-border)" }}
>
{/* Error message */}
{run.error && (
<div
className="mt-3 text-xs font-mono rounded-lg px-3 py-2"
style={{
color: "var(--color-error, #ef4444)",
background: "color-mix(in srgb, var(--color-error, #ef4444) 6%, var(--color-surface))",
}}
>
{run.error}
</div>
)}
{/* Summary */}
{run.summary && (
<div className="mt-3 text-sm" style={{ color: "var(--color-text)" }}>
{run.summary}
</div>
)}
{/* Session transcript */}
{run.sessionId ? (
<div className="mt-4">
<CronRunChat sessionId={run.sessionId} />
</div>
) : (
<div className="mt-3 text-xs" style={{ color: "var(--color-text-muted)" }}>
No session transcript available for this run.
</div>
)}
</div>
)}
</div>
);
}
/* ─── Subcomponents ─── */
function StatusBadge({ status }: { status: string }) {
const color = status === "ok"
? "var(--color-success, #22c55e)"
: status === "running"
? "var(--color-accent)"
: status === "error"
? "var(--color-error, #ef4444)"
: "var(--color-text-muted)";
return (
<span
className="inline-flex items-center gap-1.5 text-xs px-2.5 py-1 rounded-full"
style={{
background: `color-mix(in srgb, ${color} 12%, transparent)`,
color,
}}
>
{status === "running" && (
<span className="w-1.5 h-1.5 rounded-full animate-pulse" style={{ background: color }} />
)}
{status}
</span>
);
}
function ConfigRow({ label, value }: { label: string; value: string }) {
return (
<div className="flex items-start gap-2">
<span className="text-xs font-medium w-20 flex-shrink-0" style={{ color: "var(--color-text-muted)" }}>
{label}
</span>
<span className="text-xs break-all" style={{ color: "var(--color-text)" }}>
{value}
</span>
</div>
);
}

View File

@ -0,0 +1,366 @@
"use client";
import { useEffect, useState, useCallback } from "react";
import ReactMarkdown from "react-markdown";
import remarkGfm from "remark-gfm";
import type { SessionMessage, SessionMessagePart, CronRunSessionResponse } from "../../types/cron";
/* ─── Main component ─── */
export function CronRunChat({ sessionId }: { sessionId: string }) {
const [messages, setMessages] = useState<SessionMessage[]>([]);
const [loading, setLoading] = useState(true);
const [error, setError] = useState<string | null>(null);
const fetchSession = useCallback(async () => {
try {
const res = await fetch(`/api/cron/runs/${encodeURIComponent(sessionId)}`);
if (!res.ok) {
setError(res.status === 404 ? "Session transcript not found" : "Failed to load session");
return;
}
const data: CronRunSessionResponse = await res.json();
setMessages(data.messages ?? []);
} catch {
setError("Failed to load session");
} finally {
setLoading(false);
}
}, [sessionId]);
useEffect(() => {
fetchSession();
}, [fetchSession]);
if (loading) {
return (
<div className="flex items-center gap-2 py-4">
<div
className="w-4 h-4 border-[1.5px] rounded-full animate-spin"
style={{ borderColor: "var(--color-border)", borderTopColor: "var(--color-accent)" }}
/>
<span className="text-xs" style={{ color: "var(--color-text-muted)" }}>Loading session transcript...</span>
</div>
);
}
if (error) {
return (
<div
className="text-xs rounded-lg px-3 py-2"
style={{ color: "var(--color-text-muted)", background: "var(--color-surface-hover)" }}
>
{error}
</div>
);
}
if (messages.length === 0) {
return (
<div className="text-xs py-2" style={{ color: "var(--color-text-muted)" }}>
Empty session transcript.
</div>
);
}
return (
<div className="space-y-3">
<div className="text-[11px] uppercase tracking-wider font-medium mb-2" style={{ color: "var(--color-text-muted)" }}>
Session Transcript
</div>
{messages.map((msg) => (
<CronChatMessage key={msg.id} message={msg} />
))}
</div>
);
}
/* ─── Message rendering ─── */
function CronChatMessage({ message }: { message: SessionMessage }) {
const isUser = message.role === "user";
const isSystem = message.role === "system";
// Group parts into segments
const segments = groupPartsIntoSegments(message.parts);
if (isSystem) {
const textContent = message.parts
.filter((p): p is { type: "text"; text: string } => p.type === "text")
.map((p) => p.text)
.join("\n");
return (
<div
className="text-xs rounded-lg px-3 py-2 font-mono"
style={{
background: "var(--color-surface-hover)",
color: "var(--color-text-muted)",
border: "1px dashed var(--color-border)",
}}
>
<span className="font-medium">system:</span> {textContent.slice(0, 500)}
</div>
);
}
if (isUser) {
const textContent = message.parts
.filter((p): p is { type: "text"; text: string } => p.type === "text")
.map((p) => p.text)
.join("\n");
return (
<div className="flex justify-end py-1">
<div
className="max-w-[80%] rounded-2xl rounded-br-sm px-4 py-2.5 text-sm"
style={{
background: "var(--color-user-bubble)",
color: "var(--color-user-bubble-text)",
}}
>
<p className="whitespace-pre-wrap">{textContent}</p>
</div>
</div>
);
}
// Assistant message
return (
<div className="py-2 space-y-2">
{segments.map((segment, idx) => {
if (segment.type === "text") {
return (
<div
key={idx}
className="chat-prose text-sm"
style={{ color: "var(--color-text)" }}
>
<ReactMarkdown remarkPlugins={[remarkGfm]}>
{segment.text}
</ReactMarkdown>
</div>
);
}
if (segment.type === "thinking") {
return <ThinkingBlock key={idx} text={segment.thinking} />;
}
if (segment.type === "tool-group") {
return <ToolGroup key={idx} tools={segment.tools} />;
}
return null;
})}
</div>
);
}
/* ─── Part grouping ─── */
type ChatSegment =
| { type: "text"; text: string }
| { type: "thinking"; thinking: string }
| { type: "tool-group"; tools: Array<SessionMessagePart & { type: "tool-call" }> };
function groupPartsIntoSegments(parts: SessionMessagePart[]): ChatSegment[] {
const segments: ChatSegment[] = [];
let toolBuffer: Array<SessionMessagePart & { type: "tool-call" }> = [];
const flushTools = () => {
if (toolBuffer.length > 0) {
segments.push({ type: "tool-group", tools: [...toolBuffer] });
toolBuffer = [];
}
};
for (const part of parts) {
if (part.type === "text") {
flushTools();
segments.push({ type: "text", text: part.text });
} else if (part.type === "thinking") {
flushTools();
segments.push({ type: "thinking", thinking: part.thinking });
} else if (part.type === "tool-call") {
toolBuffer.push(part as SessionMessagePart & { type: "tool-call" });
}
}
flushTools();
return segments;
}
/* ─── Thinking block (always expanded for historical runs) ─── */
function ThinkingBlock({ text }: { text: string }) {
const [expanded, setExpanded] = useState(true);
const isLong = text.length > 600;
return (
<div className="my-2">
<button
type="button"
onClick={() => setExpanded((v) => !v)}
className="flex items-center gap-2 py-1 text-[13px] cursor-pointer"
style={{ color: "var(--color-text-muted)" }}
>
<svg
width="16" height="16" viewBox="0 0 24 24" fill="none"
stroke="currentColor" strokeWidth="2" strokeLinecap="round" strokeLinejoin="round"
className="opacity-60"
>
<path d="M12 2a7 7 0 0 0-7 7c0 2.38 1.19 4.47 3 5.74V17a2 2 0 0 0 2 2h4a2 2 0 0 0 2-2v-2.26c1.81-1.27 3-3.36 3-5.74a7 7 0 0 0-7-7z" />
<path d="M10 21h4" />
</svg>
<span className="font-medium">Thinking</span>
<svg
width="12" height="12" viewBox="0 0 12 12" fill="none"
stroke="currentColor" strokeWidth="1.5" strokeLinecap="round" strokeLinejoin="round"
className={`transition-transform duration-200 ${expanded ? "" : "-rotate-90"}`}
>
<path d="M3 4.5L6 7.5L9 4.5" />
</svg>
</button>
{expanded && (
<div
className={`text-[13px] whitespace-pre-wrap leading-relaxed pl-6 ${isLong && !expanded ? "max-h-24 overflow-hidden" : ""}`}
style={{ color: "var(--color-text-secondary)" }}
>
{text}
</div>
)}
</div>
);
}
/* ─── Tool group ─── */
function ToolGroup({ tools }: { tools: Array<SessionMessagePart & { type: "tool-call" }> }) {
return (
<div className="my-2 relative">
{/* Timeline connector */}
<div
className="absolute w-px"
style={{ left: 9, top: 8, bottom: 8, background: "var(--color-border)" }}
/>
<div className="space-y-1">
{tools.map((tool) => (
<ToolCallStep key={tool.toolCallId} tool={tool} />
))}
</div>
</div>
);
}
/* ─── Tool call step ─── */
function ToolCallStep({ tool }: { tool: SessionMessagePart & { type: "tool-call" } }) {
const [showOutput, setShowOutput] = useState(false);
const label = buildToolLabel(tool.toolName, tool.args);
return (
<div className="flex items-start gap-2.5 py-1">
<div
className="relative z-10 flex-shrink-0 w-5 h-5 mt-0.5 flex items-center justify-center rounded-full"
style={{ background: "var(--color-bg)" }}
>
<ToolIcon toolName={tool.toolName} />
</div>
<div className="flex-1 min-w-0">
<div className="text-[13px] leading-snug" style={{ color: "var(--color-text-secondary)" }}>
{label}
</div>
{tool.output && (
<div className="mt-0.5">
<button
type="button"
onClick={() => setShowOutput((v) => !v)}
className="text-[11px] hover:underline cursor-pointer"
style={{ color: "var(--color-accent)" }}
>
{showOutput ? "Hide output" : "Show output"}
</button>
{showOutput && (
<pre
className="mt-1 text-[11px] font-mono rounded-lg px-2.5 py-2 overflow-x-auto whitespace-pre-wrap break-all max-h-48 overflow-y-auto leading-relaxed"
style={{ color: "var(--color-text-muted)", background: "var(--color-bg)" }}
>
{tool.output.length > 3000 ? tool.output.slice(0, 3000) + "\n..." : tool.output}
</pre>
)}
</div>
)}
</div>
</div>
);
}
/* ─── Tool label builder ─── */
function buildToolLabel(toolName: string, args?: unknown): string {
const a = args as Record<string, unknown> | undefined;
const strVal = (key: string) => {
const v = a?.[key];
return typeof v === "string" && v.length > 0 ? v : null;
};
const n = toolName.toLowerCase().replace(/[_-]/g, "");
if (["websearch", "search", "googlesearch"].some((k) => n.includes(k))) {
const q = strVal("query") ?? strVal("search_query") ?? strVal("q");
return q ? `Searching: ${q.slice(0, 80)}` : "Searching...";
}
if (["fetchurl", "fetch", "webfetch"].some((k) => n.includes(k))) {
const u = strVal("url") ?? strVal("path");
return u ? `Fetching: ${u.slice(0, 60)}` : "Fetching page";
}
if (["read", "readfile", "getfile"].some((k) => n.includes(k))) {
const p = strVal("path") ?? strVal("file");
return p ? `Reading: ${p.split("/").pop()}` : "Reading file";
}
if (["bash", "shell", "execute", "exec", "terminal"].some((k) => n.includes(k))) {
const cmd = strVal("command") ?? strVal("cmd");
return cmd ? `Running: ${cmd.slice(0, 60)}` : "Running command";
}
if (["write", "create", "edit", "str_replace", "save"].some((k) => n.includes(k))) {
const p = strVal("path") ?? strVal("file");
return p ? `Editing: ${p.split("/").pop()}` : "Editing file";
}
return toolName.replace(/_/g, " ").replace(/\b\w/g, (c) => c.toUpperCase());
}
/* ─── Tool icon ─── */
function ToolIcon({ toolName }: { toolName: string }) {
const color = "var(--color-text-muted)";
const n = toolName.toLowerCase().replace(/[_-]/g, "");
if (["search", "websearch"].some((k) => n.includes(k))) {
return (
<svg width="14" height="14" viewBox="0 0 24 24" fill="none" stroke={color} strokeWidth="2" strokeLinecap="round" strokeLinejoin="round">
<circle cx="11" cy="11" r="8" /><path d="m21 21-4.3-4.3" />
</svg>
);
}
if (["bash", "shell", "exec", "terminal"].some((k) => n.includes(k))) {
return (
<svg width="14" height="14" viewBox="0 0 24 24" fill="none" stroke={color} strokeWidth="2" strokeLinecap="round" strokeLinejoin="round">
<polyline points="4 17 10 11 4 5" /><line x1="12" x2="20" y1="19" y2="19" />
</svg>
);
}
if (["write", "edit", "create", "save"].some((k) => n.includes(k))) {
return (
<svg width="14" height="14" viewBox="0 0 24 24" fill="none" stroke={color} strokeWidth="2" strokeLinecap="round" strokeLinejoin="round">
<path d="M12 3H5a2 2 0 0 0-2 2v14a2 2 0 0 0 2 2h14a2 2 0 0 0 2-2v-7" />
<path d="M18.375 2.625a1 1 0 0 1 3 3l-9.013 9.014a2 2 0 0 1-.853.505l-2.873.84a.5.5 0 0 1-.62-.62l.84-2.873a2 2 0 0 1 .506-.852z" />
</svg>
);
}
// Default: file/read icon
return (
<svg width="14" height="14" viewBox="0 0 24 24" fill="none" stroke={color} strokeWidth="2" strokeLinecap="round" strokeLinejoin="round">
<path d="M15 2H6a2 2 0 0 0-2 2v16a2 2 0 0 0 2 2h12a2 2 0 0 0 2-2V7Z" />
<path d="M14 2v4a2 2 0 0 0 2 2h4" />
</svg>
);
}

116
apps/web/app/types/cron.ts Normal file
View File

@ -0,0 +1,116 @@
// Client-side types mirroring src/cron/types.ts and src/cron/run-log.ts
// Stripped of server-only imports for use in the web UI.
export type CronSchedule =
| { kind: "at"; at: string }
| { kind: "every"; everyMs: number; anchorMs?: number }
| { kind: "cron"; expr: string; tz?: string };
export type CronSessionTarget = "main" | "isolated";
export type CronWakeMode = "next-heartbeat" | "now";
export type CronDeliveryMode = "none" | "announce";
export type CronDelivery = {
mode: CronDeliveryMode;
channel?: string;
to?: string;
bestEffort?: boolean;
};
export type CronPayload =
| { kind: "systemEvent"; text: string }
| {
kind: "agentTurn";
message: string;
model?: string;
thinking?: string;
timeoutSeconds?: number;
deliver?: boolean;
channel?: string;
to?: string;
};
export type CronJobState = {
nextRunAtMs?: number;
runningAtMs?: number;
lastRunAtMs?: number;
lastStatus?: "ok" | "error" | "skipped";
lastError?: string;
lastDurationMs?: number;
consecutiveErrors?: number;
scheduleErrorCount?: number;
};
export type CronJob = {
id: string;
agentId?: string;
name: string;
description?: string;
enabled: boolean;
deleteAfterRun?: boolean;
createdAtMs: number;
updatedAtMs: number;
schedule: CronSchedule;
sessionTarget: CronSessionTarget;
wakeMode: CronWakeMode;
payload: CronPayload;
delivery?: CronDelivery;
state: CronJobState;
};
export type CronStoreFile = {
version: 1;
jobs: CronJob[];
};
export type CronRunLogEntry = {
ts: number;
jobId: string;
action: "finished";
status?: "ok" | "error" | "skipped";
error?: string;
summary?: string;
sessionId?: string;
sessionKey?: string;
runAtMs?: number;
durationMs?: number;
nextRunAtMs?: number;
};
export type HeartbeatInfo = {
intervalMs: number;
nextDueEstimateMs: number | null;
};
export type CronStatusInfo = {
enabled: boolean;
nextWakeAtMs: number | null;
};
export type CronJobsResponse = {
jobs: CronJob[];
heartbeat: HeartbeatInfo;
cronStatus: CronStatusInfo;
};
export type CronRunsResponse = {
entries: CronRunLogEntry[];
};
export type SessionMessagePart =
| { type: "text"; text: string }
| { type: "thinking"; thinking: string }
| { type: "tool-call"; toolName: string; toolCallId: string; args?: unknown; output?: string };
export type SessionMessage = {
id: string;
role: "user" | "assistant" | "system";
parts: SessionMessagePart[];
timestamp: string;
};
export type CronRunSessionResponse = {
sessionId: string;
messages: SessionMessage[];
};

View File

@ -18,6 +18,9 @@ import { ChatPanel, type ChatPanelHandle } from "../components/chat-panel";
import { EntryDetailModal } from "../components/workspace/entry-detail-modal";
import { useSearchIndex } from "@/lib/search-index";
import { parseWorkspaceLink, isWorkspaceLink } from "@/lib/workspace-links";
import { CronDashboard } from "../components/cron/cron-dashboard";
import { CronJobDetail } from "../components/cron/cron-job-detail";
import type { CronJob, CronJobsResponse } from "../types/cron";
// --- Types ---
@ -82,7 +85,9 @@ type ContentState =
| { kind: "media"; url: string; mediaType: MediaType; filename: string; filePath: string }
| { kind: "database"; dbPath: string; filename: string }
| { kind: "report"; reportPath: string; filename: string }
| { kind: "directory"; node: TreeNode };
| { kind: "directory"; node: TreeNode }
| { kind: "cron-dashboard" }
| { kind: "cron-job"; jobId: string; job: CronJob };
type WebSession = {
id: string;
@ -208,15 +213,20 @@ function WorkspacePageInner() {
const [sessions, setSessions] = useState<WebSession[]>([]);
const [sidebarRefreshKey, setSidebarRefreshKey] = useState(0);
// Cron jobs state
const [cronJobs, setCronJobs] = useState<CronJob[]>([]);
// Entry detail modal state
const [entryModal, setEntryModal] = useState<{
objectName: string;
entryId: string;
} | null>(null);
// Derive file context for chat sidebar directly from activePath (stable across loading)
// Derive file context for chat sidebar directly from activePath (stable across loading).
// Exclude reserved virtual paths (~chats, ~cron, etc.) where file-scoped chat is irrelevant.
const fileContext = useMemo(() => {
if (!activePath) {return undefined;}
if (isVirtualPath(activePath)) {return undefined;}
const filename = activePath.split("/").pop() || activePath;
return { path: activePath, filename };
}, [activePath]);
@ -269,6 +279,23 @@ function WorkspacePageInner() {
setSidebarRefreshKey((k) => k + 1);
}, []);
// Fetch cron jobs for sidebar
const fetchCronJobs = useCallback(async () => {
try {
const res = await fetch("/api/cron/jobs");
const data: CronJobsResponse = await res.json();
setCronJobs(data.jobs ?? []);
} catch {
// ignore - cron might not be configured
}
}, []);
useEffect(() => {
fetchCronJobs();
const id = setInterval(fetchCronJobs, 30_000);
return () => clearInterval(id);
}, [fetchCronJobs]);
// Load content when path changes
const loadContent = useCallback(
async (node: TreeNode) => {
@ -337,7 +364,7 @@ function WorkspacePageInner() {
setContent({ kind: "none" });
setActiveSessionId(sessionId);
chatRef.current?.loadSession(sessionId);
router.replace("/workspace", { scroll: false });
// URL is synced by the activeSessionId effect
return;
}
// Clicking the Chats folder itself opens a new chat
@ -348,12 +375,30 @@ function WorkspacePageInner() {
router.replace("/workspace", { scroll: false });
return;
}
// Intercept cron job item clicks
if (node.path.startsWith("~cron/")) {
const jobId = node.path.slice("~cron/".length);
const job = cronJobs.find((j) => j.id === jobId);
if (job) {
setActivePath(node.path);
setContent({ kind: "cron-job", jobId, job });
router.replace("/workspace", { scroll: false });
return;
}
}
// Clicking the Cron folder itself opens the dashboard
if (node.path === "~cron") {
setActivePath(node.path);
setContent({ kind: "cron-dashboard" });
router.replace("/workspace", { scroll: false });
return;
}
loadContent(node);
},
[loadContent, router],
[loadContent, router, cronJobs],
);
// Build the enhanced tree: real tree + Chats virtual folder at the bottom
// Build the enhanced tree: real tree + Chats + Cron virtual folders at the bottom
const enhancedTree = useMemo(() => {
const chatChildren: TreeNode[] = sessions.map((s) => ({
name: s.title || "Untitled chat",
@ -370,21 +415,56 @@ function WorkspacePageInner() {
children: chatChildren.length > 0 ? chatChildren : undefined,
};
return [...tree, chatsFolder];
}, [tree, sessions]);
const cronStatusIcon = (job: CronJob) => {
if (!job.enabled) {return "\u25CB";} // circle outline
if (job.state.runningAtMs) {return "\u25CF";} // filled circle
if (job.state.lastStatus === "error") {return "\u25C6";} // diamond
if (job.state.lastStatus === "ok") {return "\u2713";} // check
return "\u25CB";
};
// Sync URL bar when activePath changes
const cronChildren: TreeNode[] = cronJobs.map((j) => ({
name: `${cronStatusIcon(j)} ${j.name}`,
path: `~cron/${j.id}`,
type: "file" as const,
virtual: true,
}));
const cronFolder: TreeNode = {
name: "Cron",
path: "~cron",
type: "folder",
virtual: true,
children: cronChildren.length > 0 ? cronChildren : undefined,
};
return [...tree, chatsFolder, cronFolder];
}, [tree, sessions, cronJobs]);
// Sync URL bar with active content / chat state.
// Uses window.location instead of searchParams in the comparison to
// avoid a circular dependency (searchParams updates → effect fires →
// router.replace → searchParams updates → …).
useEffect(() => {
const currentPath = searchParams.get("path");
const currentEntry = searchParams.get("entry");
const current = new URLSearchParams(window.location.search);
if (activePath && activePath !== currentPath) {
const params = new URLSearchParams();
params.set("path", activePath);
if (currentEntry) {params.set("entry", currentEntry);}
router.replace(`/workspace?${params.toString()}`, { scroll: false });
if (activePath) {
// File / content mode — path takes priority over chat.
if (current.get("path") !== activePath || current.has("chat")) {
const params = new URLSearchParams();
params.set("path", activePath);
const entry = current.get("entry");
if (entry) {params.set("entry", entry);}
router.replace(`/workspace?${params.toString()}`, { scroll: false });
}
} else if (activeSessionId) {
// Chat mode — no file selected.
if (current.get("chat") !== activeSessionId || current.has("path")) {
router.replace(`/workspace?chat=${encodeURIComponent(activeSessionId)}`, { scroll: false });
}
}
}, [activePath, searchParams, router]);
// eslint-disable-next-line react-hooks/exhaustive-deps -- intentionally excludes searchParams to avoid infinite loop
}, [activePath, activeSessionId, router]);
// Open entry modal handler
const handleOpenEntry = useCallback(
@ -406,12 +486,13 @@ function WorkspacePageInner() {
router.replace(qs ? `/workspace?${qs}` : "/workspace", { scroll: false });
}, [searchParams, router]);
// Auto-navigate to path from URL query param after tree loads
// Auto-navigate to path/chat from URL query params after tree loads
useEffect(() => {
if (initialPathHandled.current || treeLoading || tree.length === 0) {return;}
const pathParam = searchParams.get("path");
const entryParam = searchParams.get("entry");
const chatParam = searchParams.get("chat");
if (pathParam) {
const node = resolveNode(tree, pathParam);
@ -419,6 +500,13 @@ function WorkspacePageInner() {
initialPathHandled.current = true;
loadContent(node);
}
} else if (chatParam) {
// Restore the active chat session from URL
initialPathHandled.current = true;
setActiveSessionId(chatParam);
setActivePath(null);
setContent({ kind: "none" });
chatRef.current?.loadSession(chatParam);
}
// Also open entry modal from URL if present
@ -529,6 +617,22 @@ function WorkspacePageInner() {
[handleEditorNavigate],
);
// Cron navigation handlers
const handleSelectCronJob = useCallback((jobId: string) => {
const job = cronJobs.find((j) => j.id === jobId);
if (job) {
setActivePath(`~cron/${jobId}`);
setContent({ kind: "cron-job", jobId, job });
router.replace("/workspace", { scroll: false });
}
}, [cronJobs, router]);
const handleBackToCronDashboard = useCallback(() => {
setActivePath("~cron");
setContent({ kind: "cron-dashboard" });
router.replace("/workspace", { scroll: false });
}, [router]);
// Whether to show the main ChatPanel (no file/content selected)
const showMainChat = !activePath || content.kind === "none";
@ -574,21 +678,23 @@ function WorkspacePageInner() {
<path d="m12 19-7-7 7-7" /><path d="M19 12H5" />
</svg>
</button>
{/* Chat sidebar toggle */}
<button
type="button"
onClick={() => setShowChatSidebar((v) => !v)}
className="p-1.5 rounded-lg flex-shrink-0"
style={{
color: showChatSidebar ? "var(--color-accent)" : "var(--color-text-muted)",
background: showChatSidebar ? "var(--color-accent-light)" : "transparent",
}}
title={showChatSidebar ? "Hide chat" : "Chat about this file"}
>
<svg width="16" height="16" viewBox="0 0 24 24" fill="none" stroke="currentColor" strokeWidth="2" strokeLinecap="round" strokeLinejoin="round">
<path d="M21 15a2 2 0 0 1-2 2H7l-4 4V5a2 2 0 0 1 2-2h14a2 2 0 0 1 2 2z" />
</svg>
</button>
{/* Chat sidebar toggle (hidden for reserved/virtual paths and directories) */}
{fileContext && content.kind !== "directory" && (
<button
type="button"
onClick={() => setShowChatSidebar((v) => !v)}
className="p-1.5 rounded-lg flex-shrink-0"
style={{
color: showChatSidebar ? "var(--color-accent)" : "var(--color-text-muted)",
background: showChatSidebar ? "var(--color-accent-light)" : "transparent",
}}
title={showChatSidebar ? "Hide chat" : "Chat about this file"}
>
<svg width="16" height="16" viewBox="0 0 24 24" fill="none" stroke="currentColor" strokeWidth="2" strokeLinecap="round" strokeLinejoin="round">
<path d="M21 15a2 2 0 0 1-2 2H7l-4 4V5a2 2 0 0 1 2-2h14a2 2 0 0 1 2 2z" />
</svg>
</button>
)}
</div>
</div>
)}
@ -623,11 +729,13 @@ function WorkspacePageInner() {
onNavigate={handleEditorNavigate}
onOpenEntry={handleOpenEntry}
searchFn={searchIndex}
onSelectCronJob={handleSelectCronJob}
onBackToCronDashboard={handleBackToCronDashboard}
/>
</div>
{/* Chat sidebar (file-scoped) */}
{fileContext && showChatSidebar && (
{/* Chat sidebar (file-scoped) — hidden for directories and reserved paths */}
{fileContext && showChatSidebar && content.kind !== "directory" && (
<aside
className="flex-shrink-0 border-l"
style={{
@ -682,6 +790,8 @@ function ContentRenderer({
onNavigate,
onOpenEntry,
searchFn,
onSelectCronJob,
onBackToCronDashboard,
}: {
content: ContentState;
workspaceExists: boolean;
@ -695,6 +805,8 @@ function ContentRenderer({
onNavigate: (href: string) => void;
onOpenEntry: (objectName: string, entryId: string) => void;
searchFn: (query: string, limit?: number) => import("@/lib/search-index").SearchIndexItem[];
onSelectCronJob: (jobId: string) => void;
onBackToCronDashboard: () => void;
}) {
switch (content.kind) {
case "loading":
@ -776,6 +888,21 @@ function ContentRenderer({
/>
);
case "cron-dashboard":
return (
<CronDashboard
onSelectJob={onSelectCronJob}
/>
);
case "cron-job":
return (
<CronJobDetail
job={content.job}
onBack={onBackToCronDashboard}
/>
);
case "none":
default:
if (tree.length === 0) {

749
apps/web/lib/active-runs.ts Normal file
View File

@ -0,0 +1,749 @@
/**
* Server-side singleton that manages agent child processes independently of
* HTTP connections. Buffers SSE events, fans out to subscribers, and
* persists assistant messages incrementally to disk.
*
* This decouples agent lifecycles from request lifecycles so:
* - Streams survive page reloads (process keeps running).
* - Messages are written to persistent sessions as they arrive.
* - New HTTP connections can re-attach to a running stream.
*/
import { type ChildProcess } from "node:child_process";
import { createInterface } from "node:readline";
import { join } from "node:path";
import {
readFileSync,
writeFileSync,
existsSync,
mkdirSync,
} from "node:fs";
import { homedir } from "node:os";
import {
type AgentEvent,
spawnAgentProcess,
extractToolResult,
buildToolOutput,
parseAgentErrorMessage,
parseErrorBody,
parseErrorFromStderr,
} from "./agent-runner";
// ── Types ──
/** An SSE event object in the AI SDK v6 data stream wire format. */
export type SseEvent = Record<string, unknown> & { type: string };
/** Subscriber callback. Receives SSE events, or `null` when the run completes. */
export type RunSubscriber = (event: SseEvent | null) => void;
type AccumulatedMessage = {
id: string;
role: "assistant";
textParts: string[];
reasoningParts: string[];
toolCalls: Map<
string,
{
toolName: string;
args: Record<string, unknown>;
output?: Record<string, unknown>;
errorText?: string;
}
>;
};
export type ActiveRun = {
sessionId: string;
childProcess: ChildProcess;
eventBuffer: SseEvent[];
subscribers: Set<RunSubscriber>;
accumulated: AccumulatedMessage;
status: "running" | "completed" | "error";
startedAt: number;
exitCode: number | null;
abortController: AbortController;
/** @internal debounced persistence timer */
_persistTimer: ReturnType<typeof setTimeout> | null;
/** @internal last time persistence was flushed */
_lastPersistedAt: number;
};
// ── Constants ──
const PERSIST_INTERVAL_MS = 2_000;
const CLEANUP_GRACE_MS = 30_000;
const WEB_CHAT_DIR = join(homedir(), ".openclaw", "web-chat");
const INDEX_FILE = join(WEB_CHAT_DIR, "index.json");
// ── Singleton registry ──
const activeRuns = new Map<string, ActiveRun>();
// ── Public API ──
/** Retrieve an active or recently-completed run (within the grace period). */
export function getActiveRun(sessionId: string): ActiveRun | undefined {
return activeRuns.get(sessionId);
}
/** Check whether a *running* (not just completed) run exists for a session. */
export function hasActiveRun(sessionId: string): boolean {
const run = activeRuns.get(sessionId);
return run !== undefined && run.status === "running";
}
/**
* Subscribe to an active run's SSE events.
*
* When `replay` is true (default), all buffered events are replayed first
* (synchronously), then live events follow. If the run already finished,
* the subscriber is called with `null` after the replay.
*
* Returns an unsubscribe function, or `null` if no run exists.
*/
export function subscribeToRun(
sessionId: string,
callback: RunSubscriber,
options?: { replay?: boolean },
): (() => void) | null {
const run = activeRuns.get(sessionId);
if (!run) {return null;}
const replay = options?.replay ?? true;
// Replay buffered events synchronously (safe — no event-loop yield).
if (replay) {
for (const event of run.eventBuffer) {
callback(event);
}
}
// If the run already finished, signal completion immediately.
if (run.status !== "running") {
callback(null);
return () => {};
}
run.subscribers.add(callback);
return () => {
run.subscribers.delete(callback);
};
}
/** Abort a running agent. Returns true if a run was actually aborted. */
export function abortRun(sessionId: string): boolean {
const run = activeRuns.get(sessionId);
if (!run || run.status !== "running") {return false;}
run.abortController.abort();
run.childProcess.kill("SIGTERM");
return true;
}
/**
* Start a new agent run for the given session.
* Throws if a run is already active for this session.
*/
export function startRun(params: {
sessionId: string;
message: string;
agentSessionId?: string;
}): ActiveRun {
const { sessionId, message, agentSessionId } = params;
const existing = activeRuns.get(sessionId);
if (existing?.status === "running") {
throw new Error("Active run already exists for this session");
}
// Clean up a finished run that's still in the grace period.
if (existing) {cleanupRun(sessionId);}
const abortController = new AbortController();
const child = spawnAgentProcess(message, agentSessionId);
const run: ActiveRun = {
sessionId,
childProcess: child,
eventBuffer: [],
subscribers: new Set(),
accumulated: {
id: `assistant-${sessionId}-${Date.now()}`,
role: "assistant",
textParts: [],
reasoningParts: [],
toolCalls: new Map(),
},
status: "running",
startedAt: Date.now(),
exitCode: null,
abortController,
_persistTimer: null,
_lastPersistedAt: 0,
};
activeRuns.set(sessionId, run);
// Wire abort signal → child process kill.
const onAbort = () => child.kill("SIGTERM");
if (abortController.signal.aborted) {
child.kill("SIGTERM");
} else {
abortController.signal.addEventListener("abort", onAbort, {
once: true,
});
child.on("close", () =>
abortController.signal.removeEventListener("abort", onAbort),
);
}
wireChildProcess(run);
return run;
}
// ── Persistence helpers (called from route to persist user messages) ──
/** Save a user message to the session JSONL (called once at run start). */
export function persistUserMessage(
sessionId: string,
msg: { id: string; content: string; parts?: unknown[] },
): void {
ensureDir();
const filePath = join(WEB_CHAT_DIR, `${sessionId}.jsonl`);
if (!existsSync(filePath)) {writeFileSync(filePath, "");}
const line = JSON.stringify({
id: msg.id,
role: "user",
content: msg.content,
...(msg.parts ? { parts: msg.parts } : {}),
timestamp: new Date().toISOString(),
});
// Avoid duplicates (e.g. retry).
const existing = readFileSync(filePath, "utf-8");
const lines = existing.split("\n").filter((l) => l.trim());
const alreadySaved = lines.some((l) => {
try {
return JSON.parse(l).id === msg.id;
} catch {
return false;
}
});
if (!alreadySaved) {
writeFileSync(filePath, [...lines, line].join("\n") + "\n");
updateIndex(sessionId, { incrementCount: 1 });
}
}
// ── Internals ──
function ensureDir() {
if (!existsSync(WEB_CHAT_DIR)) {
mkdirSync(WEB_CHAT_DIR, { recursive: true });
}
}
function updateIndex(
sessionId: string,
opts: { incrementCount?: number; title?: string },
) {
try {
if (!existsSync(INDEX_FILE)) {return;}
const index = JSON.parse(
readFileSync(INDEX_FILE, "utf-8"),
) as Array<Record<string, unknown>>;
const session = index.find((s) => s.id === sessionId);
if (!session) {return;}
session.updatedAt = Date.now();
if (opts.incrementCount) {
session.messageCount =
((session.messageCount as number) || 0) + opts.incrementCount;
}
if (opts.title) {session.title = opts.title;}
writeFileSync(INDEX_FILE, JSON.stringify(index, null, 2));
} catch {
/* best-effort */
}
}
// ── SSE event generation from child-process JSON lines ──
function wireChildProcess(run: ActiveRun): void {
const child = run.childProcess;
let idCounter = 0;
const nextId = (prefix: string) =>
`${prefix}-${Date.now()}-${++idCounter}`;
let currentTextId = "";
let currentReasoningId = "";
let textStarted = false;
let reasoningStarted = false;
let everSentText = false;
let statusReasoningActive = false;
let agentErrorReported = false;
const stderrChunks: string[] = [];
/** Emit an SSE event: push to buffer + notify all subscribers. */
const emit = (event: SseEvent) => {
run.eventBuffer.push(event);
for (const sub of run.subscribers) {
try {
sub(event);
} catch {
/* ignore subscriber errors */
}
}
schedulePersist(run);
};
const closeReasoning = () => {
if (reasoningStarted) {
emit({ type: "reasoning-end", id: currentReasoningId });
reasoningStarted = false;
statusReasoningActive = false;
}
};
const closeText = () => {
if (textStarted) {
emit({ type: "text-end", id: currentTextId });
textStarted = false;
}
};
const openStatusReasoning = (label: string) => {
closeReasoning();
closeText();
currentReasoningId = nextId("status");
emit({ type: "reasoning-start", id: currentReasoningId });
emit({
type: "reasoning-delta",
id: currentReasoningId,
delta: label,
});
reasoningStarted = true;
statusReasoningActive = true;
};
const emitError = (message: string) => {
closeReasoning();
closeText();
const tid = nextId("text");
emit({ type: "text-start", id: tid });
emit({ type: "text-delta", id: tid, delta: `[error] ${message}` });
emit({ type: "text-end", id: tid });
run.accumulated.textParts.push(`[error] ${message}`);
everSentText = true;
};
// ── Parse stdout JSON lines ──
const rl = createInterface({ input: child.stdout! });
rl.on("line", (line: string) => {
if (!line.trim()) {return;}
let ev: AgentEvent;
try {
ev = JSON.parse(line) as AgentEvent;
} catch {
return;
}
// Lifecycle start
if (
ev.event === "agent" &&
ev.stream === "lifecycle" &&
ev.data?.phase === "start"
) {
openStatusReasoning("Preparing response...");
}
// Thinking / reasoning
if (ev.event === "agent" && ev.stream === "thinking") {
const delta =
typeof ev.data?.delta === "string"
? ev.data.delta
: undefined;
if (delta) {
if (statusReasoningActive) {closeReasoning();}
if (!reasoningStarted) {
currentReasoningId = nextId("reasoning");
emit({
type: "reasoning-start",
id: currentReasoningId,
});
reasoningStarted = true;
}
emit({
type: "reasoning-delta",
id: currentReasoningId,
delta,
});
run.accumulated.reasoningParts.push(delta);
}
}
// Assistant text
if (ev.event === "agent" && ev.stream === "assistant") {
const delta =
typeof ev.data?.delta === "string"
? ev.data.delta
: undefined;
if (delta) {
closeReasoning();
if (!textStarted) {
currentTextId = nextId("text");
emit({ type: "text-start", id: currentTextId });
textStarted = true;
}
everSentText = true;
emit({ type: "text-delta", id: currentTextId, delta });
run.accumulated.textParts.push(delta);
}
// Media URLs
const mediaUrls = ev.data?.mediaUrls;
if (Array.isArray(mediaUrls)) {
for (const url of mediaUrls) {
if (typeof url === "string" && url.trim()) {
closeReasoning();
if (!textStarted) {
currentTextId = nextId("text");
emit({
type: "text-start",
id: currentTextId,
});
textStarted = true;
}
everSentText = true;
const md = `\n![media](${url.trim()})\n`;
emit({
type: "text-delta",
id: currentTextId,
delta: md,
});
run.accumulated.textParts.push(md);
}
}
}
// Agent error inline (stopReason=error)
if (
typeof ev.data?.stopReason === "string" &&
ev.data.stopReason === "error" &&
typeof ev.data?.errorMessage === "string" &&
!agentErrorReported
) {
agentErrorReported = true;
emitError(parseErrorBody(ev.data.errorMessage));
}
}
// Tool events
if (ev.event === "agent" && ev.stream === "tool") {
const phase =
typeof ev.data?.phase === "string"
? ev.data.phase
: undefined;
const toolCallId =
typeof ev.data?.toolCallId === "string"
? ev.data.toolCallId
: "";
const toolName =
typeof ev.data?.name === "string" ? ev.data.name : "";
if (phase === "start") {
closeReasoning();
closeText();
const args =
ev.data?.args && typeof ev.data.args === "object"
? (ev.data.args as Record<string, unknown>)
: {};
emit({ type: "tool-input-start", toolCallId, toolName });
emit({
type: "tool-input-available",
toolCallId,
toolName,
input: args,
});
run.accumulated.toolCalls.set(toolCallId, {
toolName,
args,
});
} else if (phase === "result") {
const isError = ev.data?.isError === true;
const result = extractToolResult(ev.data?.result);
if (isError) {
const errorText =
result?.text ||
(result?.details?.error as string | undefined) ||
"Tool execution failed";
emit({
type: "tool-output-error",
toolCallId,
errorText,
});
const tc = run.accumulated.toolCalls.get(toolCallId);
if (tc) {tc.errorText = errorText;}
} else {
const output = buildToolOutput(result);
emit({
type: "tool-output-available",
toolCallId,
output,
});
const tc = run.accumulated.toolCalls.get(toolCallId);
if (tc) {tc.output = output;}
}
}
}
// Compaction
if (ev.event === "agent" && ev.stream === "compaction") {
const phase =
typeof ev.data?.phase === "string"
? ev.data.phase
: undefined;
if (phase === "start") {
openStatusReasoning("Optimizing session context...");
} else if (phase === "end") {
if (statusReasoningActive) {
if (ev.data?.willRetry === true) {
emit({
type: "reasoning-delta",
id: currentReasoningId,
delta: "\nRetrying with compacted context...",
});
} else {
closeReasoning();
}
}
}
}
// Lifecycle end
if (
ev.event === "agent" &&
ev.stream === "lifecycle" &&
ev.data?.phase === "end"
) {
closeReasoning();
closeText();
}
// Lifecycle error
if (
ev.event === "agent" &&
ev.stream === "lifecycle" &&
ev.data?.phase === "error" &&
!agentErrorReported
) {
const msg = parseAgentErrorMessage(ev.data);
if (msg) {
agentErrorReported = true;
emitError(msg);
}
}
// Top-level error event
if (ev.event === "error" && !agentErrorReported) {
const msg = parseAgentErrorMessage(
ev.data ??
(ev as unknown as Record<string, unknown>),
);
if (msg) {
agentErrorReported = true;
emitError(msg);
}
}
});
// ── Child process exit ──
child.on("close", (code) => {
if (!agentErrorReported && stderrChunks.length > 0) {
const stderr = stderrChunks.join("").trim();
const msg = parseErrorFromStderr(stderr);
if (msg) {
agentErrorReported = true;
emitError(msg);
}
}
closeReasoning();
if (!everSentText) {
const tid = nextId("text");
emit({ type: "text-start", id: tid });
const errMsg =
code !== null && code !== 0
? `[error] Agent exited with code ${code}. Check server logs for details.`
: "[error] No response from agent.";
emit({ type: "text-delta", id: tid, delta: errMsg });
emit({ type: "text-end", id: tid });
run.accumulated.textParts.push(errMsg);
} else {
closeText();
}
run.status = code === 0 || code === null ? "completed" : "error";
run.exitCode = code;
// Final persistence flush (removes _streaming flag).
flushPersistence(run);
// Signal completion to all subscribers.
for (const sub of run.subscribers) {
try {
sub(null);
} catch {
/* ignore */
}
}
run.subscribers.clear();
// Clean up run state after a grace period so reconnections
// within that window still get the buffered events.
setTimeout(() => cleanupRun(run.sessionId), CLEANUP_GRACE_MS);
});
child.on("error", (err) => {
console.error("[active-runs] Child process error:", err);
emitError(`Failed to start agent: ${err.message}`);
run.status = "error";
flushPersistence(run);
for (const sub of run.subscribers) {
try {
sub(null);
} catch {
/* ignore */
}
}
run.subscribers.clear();
setTimeout(() => cleanupRun(run.sessionId), CLEANUP_GRACE_MS);
});
child.stderr?.on("data", (chunk: Buffer) => {
const text = chunk.toString();
stderrChunks.push(text);
console.error("[active-runs stderr]", text);
});
}
// ── Debounced persistence ──
function schedulePersist(run: ActiveRun) {
if (run._persistTimer) {return;}
const elapsed = Date.now() - run._lastPersistedAt;
const delay = Math.max(0, PERSIST_INTERVAL_MS - elapsed);
run._persistTimer = setTimeout(() => {
run._persistTimer = null;
flushPersistence(run);
}, delay);
}
function flushPersistence(run: ActiveRun) {
if (run._persistTimer) {
clearTimeout(run._persistTimer);
run._persistTimer = null;
}
run._lastPersistedAt = Date.now();
const text = run.accumulated.textParts.join("");
if (
!text &&
run.accumulated.toolCalls.size === 0 &&
run.accumulated.reasoningParts.length === 0
) {
return; // Nothing to persist yet.
}
// Build parts array matching the UIMessage format the frontend expects.
const parts: Array<Record<string, unknown>> = [];
if (run.accumulated.reasoningParts.length > 0) {
parts.push({
type: "reasoning",
text: run.accumulated.reasoningParts.join(""),
});
}
for (const [toolCallId, tc] of run.accumulated.toolCalls) {
parts.push({
type: "tool-invocation",
toolCallId,
toolName: tc.toolName,
args: tc.args,
...(tc.output ? { result: tc.output } : {}),
...(tc.errorText ? { errorText: tc.errorText } : {}),
});
}
if (text) {
parts.push({ type: "text", text });
}
const isStillRunning = run.status === "running";
const message: Record<string, unknown> = {
id: run.accumulated.id,
role: "assistant",
content: text,
parts,
timestamp: new Date().toISOString(),
};
if (isStillRunning) {
message._streaming = true;
}
try {
upsertMessage(run.sessionId, message);
} catch (err) {
console.error("[active-runs] Persistence error:", err);
}
}
/**
* Upsert a single message into the session JSONL.
* If a line with the same `id` already exists it is replaced; otherwise appended.
*/
function upsertMessage(
sessionId: string,
message: Record<string, unknown>,
) {
ensureDir();
const fp = join(WEB_CHAT_DIR, `${sessionId}.jsonl`);
if (!existsSync(fp)) {writeFileSync(fp, "");}
const msgId = message.id as string;
const content = readFileSync(fp, "utf-8");
const lines = content.split("\n").filter((l) => l.trim());
let found = false;
const updated = lines.map((line) => {
try {
const parsed = JSON.parse(line);
if (parsed.id === msgId) {
found = true;
return JSON.stringify(message);
}
} catch {
/* keep as-is */
}
return line;
});
if (!found) {
updated.push(JSON.stringify(message));
updateIndex(sessionId, { incrementCount: 1 });
} else {
updateIndex(sessionId, {});
}
writeFileSync(fp, updated.join("\n") + "\n");
}
function cleanupRun(sessionId: string) {
const run = activeRuns.get(sessionId);
if (!run) {return;}
if (run._persistTimer) {clearTimeout(run._persistTimer);}
activeRuns.delete(sessionId);
}

View File

@ -59,11 +59,18 @@ export type AgentCallback = {
* Extract text content from the agent's tool result object.
* The result has `content: Array<{ type: "text", text: string } | ...>` and
* optional `details` (exit codes, file paths, etc.).
*
* Falls back gracefully when the result doesn't follow the standard wrapper:
* - If no `content` array, tries to use the raw object as details directly.
* - If the raw value is a string, treats it as text.
*/
function extractToolResult(
export function extractToolResult(
raw: unknown,
): ToolResult | undefined {
if (!raw || typeof raw !== "object") {return undefined;}
if (!raw) {return undefined;}
// String result — treat the whole thing as text
if (typeof raw === "string") {return { text: raw, details: undefined };}
if (typeof raw !== "object") {return undefined;}
const r = raw as Record<string, unknown>;
// Extract text from content blocks
@ -86,6 +93,13 @@ function extractToolResult(
? (r.details as Record<string, unknown>)
: undefined;
// Fallback: if neither content nor details were found, the raw object
// might BE the tool payload itself (e.g. { query, results, url, ... }).
// Use it as details so buildToolOutput can extract web tool fields.
if (!text && !details && !Array.isArray(r.content)) {
return { text: undefined, details: r };
}
return { text, details };
}
@ -94,6 +108,88 @@ export type RunAgentOptions = {
sessionId?: string;
};
/**
* Spawn an agent child process and return the ChildProcess handle.
* Shared between `runAgent` (legacy callback API) and the ActiveRunManager.
*/
export function spawnAgentProcess(
message: string,
agentSessionId?: string,
): ReturnType<typeof spawn> {
const cwd = process.cwd();
const root = cwd.endsWith(join("apps", "web"))
? join(cwd, "..", "..")
: cwd;
const scriptPath = join(root, "scripts", "run-node.mjs");
const args = [
scriptPath,
"agent",
"--agent",
"main",
"--message",
message,
"--stream-json",
];
if (agentSessionId) {
const sessionKey = `agent:main:subagent:${agentSessionId}`;
args.push("--session-key", sessionKey, "--lane", "subagent");
}
return spawn("node", args, {
cwd: root,
env: { ...process.env },
stdio: ["ignore", "pipe", "pipe"],
});
}
/**
* Build a flat output object from the agent's tool result so the frontend
* can render tool output text, exit codes, etc.
*/
export function buildToolOutput(
result?: ToolResult,
): Record<string, unknown> {
if (!result) {return {};}
const out: Record<string, unknown> = {};
if (result.text) {out.text = result.text;}
if (result.details) {
for (const key of [
"exitCode",
"status",
"durationMs",
"cwd",
"error",
"reason",
// Web tool fields — pass through so the UI can show favicons / domains
"url",
"finalUrl",
"targetUrl",
"query",
"results",
"citations",
]) {
if (result.details[key] !== undefined)
{out[key] = result.details[key];}
}
}
// If we have details but no text, synthesize a text field from the JSON so
// domain-extraction regex in the frontend can find URLs from search results.
if (!out.text && result.details) {
try {
const json = JSON.stringify(result.details);
if (json.length <= 12000) {
out.text = json;
}
} catch {
/* ignore */
}
}
return out;
}
/**
* Spawn the openclaw agent and stream its output.
* Pass an AbortSignal to kill the child process when the caller cancels.
@ -108,50 +204,8 @@ export async function runAgent(
callback: AgentCallback,
options?: RunAgentOptions,
): Promise<void> {
// Get repo root - construct path dynamically at runtime
const cwd = process.cwd();
const root = cwd.endsWith(join("apps", "web"))
? join(cwd, "..", "..")
: cwd;
// Construct script path at runtime to avoid static analysis
const pathParts = ["scripts", "run-node.mjs"];
const scriptPath = join(root, ...pathParts);
return new Promise<void>((resolve) => {
const args = [
scriptPath,
"agent",
"--agent",
"main",
"--message",
message,
"--stream-json",
// Route through the gateway daemon (not --local) so all concurrent
// agent runs share the gateway's lane-based concurrency system.
// The gateway serialises writes per session-key and avoids the
// cross-process file-lock contention that --local causes when
// multiple chat threads run in parallel child processes.
];
// Isolated session for file-scoped subagent chats.
// Uses a proper subagent session key (agent:main:subagent:<id>) so the
// agent runs in the Subagent concurrency lane with its own session
// context, completely independent of the main agent session.
if (options?.sessionId) {
const sessionKey = `agent:main:subagent:${options.sessionId}`;
args.push("--session-key", sessionKey, "--lane", "subagent");
}
const child = spawn(
"node",
args,
{
cwd: root,
env: { ...process.env },
stdio: ["ignore", "pipe", "pipe"],
},
);
const child = spawnAgentProcess(message, options?.sessionId);
// Kill the child process if the caller aborts (e.g. user hit stop).
if (signal) {
@ -170,7 +224,7 @@ export async function runAgent(
const stderrChunks: string[] = [];
let agentErrorReported = false;
const rl = createInterface({ input: child.stdout });
const rl = createInterface({ input: child.stdout! });
rl.on("line", (line: string) => {
if (!line.trim()) {return;}
@ -353,7 +407,7 @@ export async function runAgent(
* Handles various shapes: `{ error: "..." }`, `{ message: "..." }`,
* `{ errorMessage: "402 {...}" }`, etc.
*/
function parseAgentErrorMessage(
export function parseAgentErrorMessage(
data: Record<string, unknown> | undefined,
): string | undefined {
if (!data) {return undefined;}
@ -374,7 +428,7 @@ function parseAgentErrorMessage(
* e.g. `402 {"error":{"message":"Insufficient funds..."}}`.
* Returns a clean, user-readable message.
*/
function parseErrorBody(raw: string): string {
export function parseErrorBody(raw: string): string {
// Try to extract JSON body from "STATUS {json}" pattern
const jsonIdx = raw.indexOf("{");
if (jsonIdx >= 0) {
@ -394,7 +448,7 @@ function parseErrorBody(raw: string): string {
* Extract a meaningful error message from raw stderr output.
* Strips ANSI codes and looks for common error patterns.
*/
function parseErrorFromStderr(stderr: string): string | undefined {
export function parseErrorFromStderr(stderr: string): string | undefined {
if (!stderr) {return undefined;}
// Strip ANSI escape codes

File diff suppressed because one or more lines are too long