diff --git a/.cursor/plans/web_cron_dashboard_d0829ca4.plan.md b/.cursor/plans/web_cron_dashboard_d0829ca4.plan.md new file mode 100644 index 00000000000..3fc19e09145 --- /dev/null +++ b/.cursor/plans/web_cron_dashboard_d0829ca4.plan.md @@ -0,0 +1,214 @@ +--- +name: Web Cron Dashboard +overview: Add a "Cron" virtual folder to the web app sidebar that reads `~/.openclaw/cron/` data, showing all cron jobs, their heartbeat/scheduling status, run history, and full session transcripts with thinking for each run. +todos: + - id: types + content: Create shared cron types in apps/web/app/types/cron.ts + status: pending + - id: api-jobs + content: Create GET /api/cron/jobs route (reads jobs.json + heartbeat info) + status: pending + - id: api-runs + content: Create GET /api/cron/jobs/[jobId]/runs route (reads run log JSONL) + status: pending + - id: api-session + content: Create GET /api/cron/runs/[sessionId] route (reads session transcript JSONL with full thinking/tools) + status: pending + - id: cron-dashboard + content: Build CronDashboard component (heartbeat status, job list, timeline) + status: pending + - id: cron-job-detail + content: Build CronJobDetail component (job config, next run countdown, run history) + status: pending + - id: cron-run-chat + content: Build CronRunChat component (full session transcript with thinking/tools, reusing ChainOfThought patterns) + status: pending + - id: sidebar + content: Add ~cron virtual folder to workspace sidebar in page.tsx (fetch + tree + handleNodeSelect) + status: pending + - id: content-routing + content: Extend content state and main area renderer in workspace/page.tsx for cron-dashboard and cron-job kinds + status: pending +isProject: false +--- + +# Web Cron Dashboard + +## Architecture + +The cron system stores its data at: + +- **Job definitions**: `~/.openclaw/cron/jobs.json` (`CronStoreFile` with `CronJob[]`) +- **Run logs per job**: `~/.openclaw/cron/runs/{jobId}.jsonl` (`CronRunLogEntry` per line) +- **Run session transcripts**: `~/.openclaw/agents/{agentId}/sessions/{sessionId}.jsonl` + +The web app does NOT connect to the gateway directly -- it reads files from disk via Next.js API routes (same pattern as existing `/api/sessions`, `/api/web-sessions`). No gateway WebSocket integration needed. + +```mermaid +flowchart LR + subgraph disk [Disk Storage] + JobsJSON["~/.openclaw/cron/jobs.json"] + RunLogs["~/.openclaw/cron/runs/{jobId}.jsonl"] + Sessions["~/.openclaw/agents/{agentId}/sessions/{sessionId}.jsonl"] + end + subgraph api [Next.js API Routes] + CronJobsAPI["/api/cron/jobs"] + CronRunsAPI["/api/cron/jobs/{jobId}/runs"] + CronRunSessionAPI["/api/cron/runs/{sessionId}"] + end + subgraph ui [UI Components] + Sidebar["~cron virtual folder"] + Dashboard["CronDashboard"] + JobDetail["CronJobDetail"] + RunChat["CronRunChat"] + end + JobsJSON --> CronJobsAPI --> Dashboard + RunLogs --> CronRunsAPI --> JobDetail + Sessions --> CronRunSessionAPI --> RunChat + Dashboard --> Sidebar + JobDetail --> Sidebar + RunChat --> Sidebar +``` + +## 1. API Routes + +### `GET /api/cron/jobs` -- [apps/web/app/api/cron/jobs/route.ts](apps/web/app/api/cron/jobs/route.ts) + +Read `~/.openclaw/cron/jobs.json` and return all jobs with computed status fields. Also scan `~/.openclaw/cron/runs/` to get latest run info. Include heartbeat info by reading agent config from `~/.openclaw/config.yaml` (heartbeat interval, next due estimate). + +Returns: + +```typescript +{ + jobs: CronJob[], + heartbeat: { + intervalMs: number, + nextDueEstimateMs: number | null + }, + cronStatus: { + enabled: boolean, + nextWakeAtMs: number | null + } +} +``` + +### `GET /api/cron/jobs/[jobId]/runs` -- [apps/web/app/api/cron/jobs/[jobId]/runs/route.ts](apps/web/app/api/cron/jobs/[jobId]/runs/route.ts) + +Read `~/.openclaw/cron/runs/{jobId}.jsonl` using the same logic as `readCronRunLogEntries` in [src/cron/run-log.ts](src/cron/run-log.ts). Return entries with `?limit=N` support. + +Returns: + +```typescript +{ entries: CronRunLogEntry[] } +``` + +### `GET /api/cron/runs/[sessionId]` -- [apps/web/app/api/cron/runs/[sessionId]/route.ts](apps/web/app/api/cron/runs/[sessionId]/route.ts) + +Find and read `~/.openclaw/agents/*/sessions/{sessionId}.jsonl` (same search pattern as existing [apps/web/app/api/sessions/[sessionId]/route.ts](apps/web/app/api/sessions/[sessionId]/route.ts)). Parse JSONL into messages including `thinking` blocks, tool calls, and text -- return the full conversation with all parts preserved (not truncated like the existing session API). + +Returns: + +```typescript +{ + sessionId: string, + messages: Array<{ + id: string, + role: "user" | "assistant", + parts: Array< + | { type: "text", text: string } + | { type: "thinking", thinking: string } + | { type: "tool-call", toolName: string, args: unknown, output?: string } + >, + timestamp: string + }> +} +``` + +## 2. Sidebar Integration + +In [apps/web/app/workspace/page.tsx](apps/web/app/workspace/page.tsx), add a `~cron` virtual folder alongside the existing `~chats` folder: + +- Fetch cron jobs from `/api/cron/jobs` on mount (same pattern as `fetchSessions`). +- Build a `~cron` `TreeNode` folder with children: + - Each job becomes a folder: `~cron/{jobId}` (name = job name, with status icon in the name string) + - Clicking `~cron` opens the cron dashboard view + - Clicking `~cron/{jobId}` opens the job detail view +- Handle `~cron` paths in `handleNodeSelect` (same pattern as `~chats`). + +```typescript +const cronFolder: TreeNode = { + name: "Cron", + path: "~cron", + type: "folder", + virtual: true, + children: cronJobs.map((j) => ({ + name: `${statusEmoji(j)} ${j.name}`, + path: `~cron/${j.id}`, + type: "folder", + virtual: true, + })), +}; +return [...tree, chatsFolder, cronFolder]; +``` + +## 3. UI Components + +### CronDashboard ([apps/web/app/components/cron/cron-dashboard.tsx](apps/web/app/components/cron/cron-dashboard.tsx)) + +Shown when user clicks `~cron` in sidebar. Displays: + +- **Heartbeat status card**: interval (e.g. "every 30m"), estimated next heartbeat as a live countdown, explanation of how heartbeat decides which cron jobs to run +- **Cron scheduler status**: enabled/disabled, next wake time as countdown +- **Jobs table/list**: all jobs with columns: + - Name, Schedule (human-readable), Status (running/ok/error/disabled/idle), Next run (countdown), Last run (time-ago + duration), Session target (main/isolated) + - Click a job row to navigate to its detail view +- **Timeline view**: visual timeline showing upcoming scheduled runs across all jobs (next 24h) + +### CronJobDetail ([apps/web/app/components/cron/cron-job-detail.tsx](apps/web/app/components/cron/cron-job-detail.tsx)) + +Shown when user clicks `~cron/{jobId}`. Displays: + +- **Job config header**: name, description, schedule (formatted), enabled status, session target, wake mode, payload summary +- **Next run countdown**: large countdown timer to next execution +- **Run history list**: each `CronRunLogEntry` as a card with: + - Timestamp, status badge (ok/error/skipped), duration, summary text + - Click to expand into full run chat (loads session transcript inline or navigates to run view) +- **Error details**: if `consecutiveErrors > 0`, show error streak and last error message + +### CronRunChat ([apps/web/app/components/cron/cron-run-chat.tsx](apps/web/app/components/cron/cron-run-chat.tsx)) + +Displayed inline when expanding a run in `CronJobDetail`. Renders the full session transcript: + +- Reuse `ChatMessage` and `ChainOfThought` components from [apps/web/app/components/chat-message.tsx](apps/web/app/components/chat-message.tsx) and [apps/web/app/components/chain-of-thought.tsx](apps/web/app/components/chain-of-thought.tsx) (or at minimum their rendering patterns) +- Show all thinking/reasoning blocks fully expanded (not collapsed, since these are historical runs) +- Show all tool calls with their inputs and outputs +- Show the system event / prompt that triggered the run +- Display run metadata at the top: model used, tokens consumed, duration + +## 4. Content Routing in Workspace Page + +Extend the `content` state in [apps/web/app/workspace/page.tsx](apps/web/app/workspace/page.tsx) with new kinds: + +```typescript +type ContentState = + | { kind: "none" } + | { kind: "loading" } + | { kind: "document"; ... } + | { kind: "object"; ... } + | { kind: "cron-dashboard"; jobs: CronJob[]; heartbeat: HeartbeatInfo } + | { kind: "cron-job"; jobId: string; job: CronJob; runs: CronRunLogEntry[] } +``` + +In the main content area renderer, add cases for `cron-dashboard` and `cron-job` that render the corresponding components. + +## 5. Data Types (shared) + +Create [apps/web/app/types/cron.ts](apps/web/app/types/cron.ts) with client-side type definitions mirroring `CronJob`, `CronRunLogEntry`, `CronSchedule`, `CronJobState` etc. (copy from [src/cron/types.ts](src/cron/types.ts) and [src/cron/run-log.ts](src/cron/run-log.ts), stripped of server-only imports). + +## Key Design Decisions + +- **Read-only**: The cron UI is read-only (view jobs, runs, transcripts). No add/edit/delete/enable/disable from the web UI -- that stays in the CLI. +- **No gateway connection**: All data read directly from disk via API routes, consistent with the existing web app pattern. +- **Reuse existing components**: Chat rendering reuses `ChatMessage`/`ChainOfThought` patterns rather than building from scratch. +- **Live countdowns**: Use `setInterval` for countdown timers (next heartbeat, next cron run) with periodic refetch of job data to stay current. +- **Auto-refresh**: Poll `/api/cron/jobs` every ~30s to catch state changes from the running gateway. diff --git a/apps/web/app/api/chat/route.ts b/apps/web/app/api/chat/route.ts index a1a142d9f43..43d465796a7 100644 --- a/apps/web/app/api/chat/route.ts +++ b/apps/web/app/api/chat/route.ts @@ -1,6 +1,12 @@ import type { UIMessage } from "ai"; -import { runAgent, type ToolResult } from "@/lib/agent-runner"; import { resolveAgentWorkspacePrefix } from "@/lib/workspace"; +import { + startRun, + hasActiveRun, + subscribeToRun, + persistUserMessage, + type SseEvent, +} from "@/lib/active-runs"; // Force Node.js runtime (required for child_process) export const runtime = "nodejs"; @@ -8,43 +14,19 @@ export const runtime = "nodejs"; // Allow streaming responses up to 10 minutes export const maxDuration = 600; -/** - * Build a flat output object from the agent's tool result so the frontend - * can render tool output text, exit codes, etc. - */ -function buildToolOutput( - result?: ToolResult, -): Record { - if (!result) {return {};} - const out: Record = {}; - if (result.text) {out.text = result.text;} - if (result.details) { - // Forward useful details (exit code, duration, status, cwd) - for (const key of [ - "exitCode", - "status", - "durationMs", - "cwd", - "error", - "reason", - ]) { - if (result.details[key] !== undefined) - {out[key] = result.details[key];} - } - } - return out; -} - export async function POST(req: Request) { - const { messages, sessionId }: { messages: UIMessage[]; sessionId?: string } = - await req.json(); + const { + messages, + sessionId, + }: { messages: UIMessage[]; sessionId?: string } = await req.json(); // Extract the latest user message text const lastUserMessage = messages.filter((m) => m.role === "user").pop(); const userText = lastUserMessage?.parts ?.filter( - (p): p is { type: "text"; text: string } => p.type === "text", + (p): p is { type: "text"; text: string } => + p.type === "text", ) .map((p) => p.text) .join("\n") ?? ""; @@ -53,9 +35,12 @@ export async function POST(req: Request) { return new Response("No message provided", { status: 400 }); } + // Reject if a run is already active for this session. + if (sessionId && hasActiveRun(sessionId)) { + return new Response("Active run in progress", { status: 409 }); + } + // Resolve workspace file paths to be agent-cwd-relative. - // Tree paths are workspace-root-relative (e.g. "knowledge/leads/foo.md"), - // but the agent runs from the repo root and needs "dench/knowledge/leads/foo.md". let agentMessage = userText; const wsPrefix = resolveAgentWorkspacePrefix(); if (wsPrefix) { @@ -65,295 +50,84 @@ export async function POST(req: Request) { ); } - // Create a custom SSE stream using the AI SDK v6 data stream wire format. - // DefaultChatTransport parses these events into UIMessage parts automatically. + // Persist the user message server-side so it survives a page reload + // even if the client never gets a chance to save. + if (sessionId && lastUserMessage) { + persistUserMessage(sessionId, { + id: lastUserMessage.id, + content: userText, + parts: lastUserMessage.parts as unknown[], + }); + } + + // Start the agent run (decoupled from this HTTP connection). + // The child process will keep running even if this response is cancelled. + if (sessionId) { + try { + startRun({ + sessionId, + message: agentMessage, + agentSessionId: sessionId, + }); + } catch (err) { + return new Response( + err instanceof Error ? err.message : String(err), + { status: 500 }, + ); + } + } + + // Stream SSE events to the client using the AI SDK v6 wire format. const encoder = new TextEncoder(); let closed = false; - const abortController = new AbortController(); + let unsubscribe: (() => void) | null = null; + const stream = new ReadableStream({ - async start(controller) { - // Use incrementing IDs so multi-round reasoning/text cycles get - // unique part IDs (avoids conflicts in the AI SDK transport). - let idCounter = 0; - const nextId = (prefix: string) => - `${prefix}-${Date.now()}-${++idCounter}`; + start(controller) { + if (!sessionId) { + // No session — shouldn't happen but close gracefully. + controller.close(); + return; + } - let currentTextId = ""; - let currentReasoningId = ""; - let textStarted = false; - let reasoningStarted = false; - // Track whether ANY text was ever sent across the full run. - // onLifecycleEnd closes the text part (textStarted→false), so - // onClose can't rely on textStarted alone to detect "no output". - let everSentText = false; - // Track whether the status reasoning block is the one currently open - // so we can close it cleanly when real content arrives. - let statusReasoningActive = false; - - /** Write an SSE event; silently no-ops if the stream was already cancelled. */ - const writeEvent = (data: unknown) => { - if (closed) {return;} - const json = JSON.stringify(data); - controller.enqueue(encoder.encode(`data: ${json}\n\n`)); - }; - - /** Close the reasoning part if open. */ - const closeReasoning = () => { - if (reasoningStarted) { - writeEvent({ - type: "reasoning-end", - id: currentReasoningId, - }); - reasoningStarted = false; - statusReasoningActive = false; - } - }; - - /** Close the text part if open. */ - const closeText = () => { - if (textStarted) { - writeEvent({ type: "text-end", id: currentTextId }); - textStarted = false; - } - }; - - /** Open a status reasoning block (auto-closes any existing one). */ - const openStatusReasoning = (label: string) => { - closeReasoning(); - closeText(); - currentReasoningId = nextId("status"); - writeEvent({ - type: "reasoning-start", - id: currentReasoningId, - }); - writeEvent({ - type: "reasoning-delta", - id: currentReasoningId, - delta: label, - }); - reasoningStarted = true; - statusReasoningActive = true; - }; - - try { - await runAgent(agentMessage, abortController.signal, { - onLifecycleStart: () => { - // Show immediate feedback — the agent has started working. - // This eliminates the "Streaming... (silence)" gap. - openStatusReasoning("Preparing response..."); - }, - - onThinkingDelta: (delta) => { - // Close the status block if it's still the active one; - // real reasoning content is now arriving. - if (statusReasoningActive) { - closeReasoning(); + unsubscribe = subscribeToRun( + sessionId, + (event: SseEvent | null) => { + if (closed) {return;} + if (event === null) { + // Run completed — close the SSE stream. + closed = true; + try { + controller.close(); + } catch { + /* already closed */ } - if (!reasoningStarted) { - currentReasoningId = nextId("reasoning"); - writeEvent({ - type: "reasoning-start", - id: currentReasoningId, - }); - reasoningStarted = true; - } - writeEvent({ - type: "reasoning-delta", - id: currentReasoningId, - delta, - }); - }, + return; + } + try { + const json = JSON.stringify(event); + controller.enqueue( + encoder.encode(`data: ${json}\n\n`), + ); + } catch { + /* ignore enqueue errors on closed stream */ + } + }, + // Don't replay — we just created the run, the buffer is empty. + { replay: false }, + ); - onTextDelta: (delta) => { - // Close reasoning once text starts streaming - closeReasoning(); - - if (!textStarted) { - currentTextId = nextId("text"); - writeEvent({ - type: "text-start", - id: currentTextId, - }); - textStarted = true; - } - everSentText = true; - writeEvent({ - type: "text-delta", - id: currentTextId, - delta, - }); - }, - - onToolStart: (toolCallId, toolName, args) => { - // Close open reasoning/text parts before tool events - closeReasoning(); - closeText(); - - writeEvent({ - type: "tool-input-start", - toolCallId, - toolName, - }); - // Include actual tool arguments so the frontend can - // display what the tool is doing (command, path, etc.) - writeEvent({ - type: "tool-input-available", - toolCallId, - toolName, - input: args ?? {}, - }); - }, - - onToolEnd: ( - toolCallId, - _toolName, - isError, - result, - ) => { - if (isError) { - const errorText = - result?.text || - (result?.details?.error as - | string - | undefined) || - "Tool execution failed"; - writeEvent({ - type: "tool-output-error", - toolCallId, - errorText, - }); - } else { - // Include the actual tool output (text, exit code, etc.) - writeEvent({ - type: "tool-output-available", - toolCallId, - output: buildToolOutput(result), - }); - } - }, - - onCompactionStart: () => { - // Show compaction status while the gateway is - // optimizing the session context (can take 10-30s). - openStatusReasoning("Optimizing session context..."); - }, - - onCompactionEnd: (willRetry) => { - // Close the compaction status block. If the gateway - // will retry the prompt, leave the reasoning area open - // so the next status/thinking block follows smoothly. - if (statusReasoningActive) { - if (willRetry) { - // Append a note, keep block open for retry - writeEvent({ - type: "reasoning-delta", - id: currentReasoningId, - delta: "\nRetrying with compacted context...", - }); - } else { - closeReasoning(); - } - } - }, - - onLifecycleEnd: () => { - closeReasoning(); - closeText(); - }, - - onAgentError: (message) => { - // Surface agent-level errors (API 402, rate limits, etc.) - // as visible text in the chat so the user sees what happened. - closeReasoning(); - closeText(); - - currentTextId = nextId("text"); - writeEvent({ - type: "text-start", - id: currentTextId, - }); - writeEvent({ - type: "text-delta", - id: currentTextId, - delta: `[error] ${message}`, - }); - writeEvent({ - type: "text-end", - id: currentTextId, - }); - textStarted = false; - everSentText = true; - }, - - onError: (err) => { - console.error("[chat] Agent error:", err); - closeReasoning(); - closeText(); - - currentTextId = nextId("text"); - writeEvent({ - type: "text-start", - id: currentTextId, - }); - textStarted = true; - everSentText = true; - writeEvent({ - type: "text-delta", - id: currentTextId, - delta: `[error] Failed to start agent: ${err.message}`, - }); - writeEvent({ type: "text-end", id: currentTextId }); - textStarted = false; - }, - - onClose: (_code) => { - closeReasoning(); - if (!everSentText) { - // No text was ever sent during the entire run - currentTextId = nextId("text"); - writeEvent({ - type: "text-start", - id: currentTextId, - }); - const msg = - _code !== null && _code !== 0 - ? `[error] Agent exited with code ${_code}. Check server logs for details.` - : "[error] No response from agent."; - writeEvent({ - type: "text-delta", - id: currentTextId, - delta: msg, - }); - writeEvent({ - type: "text-end", - id: currentTextId, - }); - } else { - // Ensure any still-open text part is closed - closeText(); - } - }, - }, sessionId ? { sessionId } : undefined); - } catch (error) { - console.error("[chat] Stream error:", error); - writeEvent({ - type: "error", - errorText: - error instanceof Error - ? error.message - : String(error), - }); - } finally { - if (!closed) { - closed = true; - controller.close(); - } + if (!unsubscribe) { + // Race: run was cleaned up between startRun and subscribe. + closed = true; + controller.close(); } }, cancel() { - // Client disconnected (e.g. user hit stop) — tear down gracefully. + // Client disconnected — unsubscribe but keep the run alive. + // The ActiveRunManager continues buffering + persisting in the background. closed = true; - abortController.abort(); + unsubscribe?.(); }, }); diff --git a/apps/web/app/api/chat/stop/route.ts b/apps/web/app/api/chat/stop/route.ts new file mode 100644 index 00000000000..1642915773c --- /dev/null +++ b/apps/web/app/api/chat/stop/route.ts @@ -0,0 +1,22 @@ +/** + * POST /api/chat/stop + * + * Abort an active agent run. Called by the Stop button. + * The child process is sent SIGTERM and the run transitions to "error" state. + */ +import { abortRun } from "@/lib/active-runs"; + +export const runtime = "nodejs"; + +export async function POST(req: Request) { + const body: { sessionId?: string } = await req + .json() + .catch(() => ({})); + + if (!body.sessionId) { + return new Response("sessionId required", { status: 400 }); + } + + const aborted = abortRun(body.sessionId); + return Response.json({ aborted }); +} diff --git a/apps/web/app/api/chat/stream/route.ts b/apps/web/app/api/chat/stream/route.ts new file mode 100644 index 00000000000..0a23ee7f8b8 --- /dev/null +++ b/apps/web/app/api/chat/stream/route.ts @@ -0,0 +1,87 @@ +/** + * GET /api/chat/stream?sessionId=xxx + * + * Reconnect to an active (or recently-completed) agent run. + * Replays all buffered SSE events from the start of the run, then + * streams live events until the run finishes. + * + * Returns 404 if no run exists for the given session. + */ +import { + getActiveRun, + subscribeToRun, + type SseEvent, +} from "@/lib/active-runs"; + +export const runtime = "nodejs"; +export const maxDuration = 600; + +export async function GET(req: Request) { + const url = new URL(req.url); + const sessionId = url.searchParams.get("sessionId"); + + if (!sessionId) { + return new Response("sessionId required", { status: 400 }); + } + + const run = getActiveRun(sessionId); + if (!run) { + return Response.json({ active: false }, { status: 404 }); + } + + const encoder = new TextEncoder(); + let closed = false; + let unsubscribe: (() => void) | null = null; + + const stream = new ReadableStream({ + start(controller) { + // subscribeToRun with replay=true replays the full event buffer + // synchronously, then subscribes for live events. + unsubscribe = subscribeToRun( + sessionId, + (event: SseEvent | null) => { + if (closed) {return;} + if (event === null) { + // Run completed — close the SSE stream. + closed = true; + try { + controller.close(); + } catch { + /* already closed */ + } + return; + } + try { + const json = JSON.stringify(event); + controller.enqueue( + encoder.encode(`data: ${json}\n\n`), + ); + } catch { + /* ignore enqueue errors on closed stream */ + } + }, + { replay: true }, + ); + + if (!unsubscribe) { + // Run was cleaned up between getActiveRun and subscribe. + closed = true; + controller.close(); + } + }, + cancel() { + // Client disconnected — unsubscribe only (don't kill the run). + closed = true; + unsubscribe?.(); + }, + }); + + return new Response(stream, { + headers: { + "Content-Type": "text/event-stream", + "Cache-Control": "no-cache, no-transform", + Connection: "keep-alive", + "X-Run-Active": run.status === "running" ? "true" : "false", + }, + }); +} diff --git a/apps/web/app/api/cron/jobs/[jobId]/runs/route.ts b/apps/web/app/api/cron/jobs/[jobId]/runs/route.ts new file mode 100644 index 00000000000..4f27a81b8e5 --- /dev/null +++ b/apps/web/app/api/cron/jobs/[jobId]/runs/route.ts @@ -0,0 +1,86 @@ +import { readFileSync, existsSync } from "node:fs"; +import { join } from "node:path"; +import { homedir } from "node:os"; + +export const dynamic = "force-dynamic"; + +const CRON_DIR = join(homedir(), ".openclaw", "cron"); + +type CronRunLogEntry = { + ts: number; + jobId: string; + action: "finished"; + status?: string; + error?: string; + summary?: string; + sessionId?: string; + sessionKey?: string; + runAtMs?: number; + durationMs?: number; + nextRunAtMs?: number; +}; + +/** Read run log entries from a JSONL file, returning most recent first (then reversed). */ +function readRunLog(filePath: string, limit: number): CronRunLogEntry[] { + if (!existsSync(filePath)) {return [];} + try { + const raw = readFileSync(filePath, "utf-8"); + if (!raw.trim()) {return [];} + const lines = raw.split("\n"); + const parsed: CronRunLogEntry[] = []; + for (let i = lines.length - 1; i >= 0 && parsed.length < limit; i--) { + const line = lines[i]?.trim(); + if (!line) {continue;} + try { + const obj = JSON.parse(line) as Partial; + if (!obj || typeof obj !== "object") {continue;} + if (obj.action !== "finished") {continue;} + if (typeof obj.jobId !== "string" || !obj.jobId.trim()) {continue;} + if (typeof obj.ts !== "number" || !Number.isFinite(obj.ts)) {continue;} + const entry: CronRunLogEntry = { + ts: obj.ts, + jobId: obj.jobId, + action: "finished", + status: obj.status, + error: obj.error, + summary: obj.summary, + runAtMs: obj.runAtMs, + durationMs: obj.durationMs, + nextRunAtMs: obj.nextRunAtMs, + }; + if (typeof obj.sessionId === "string" && obj.sessionId.trim()) { + entry.sessionId = obj.sessionId; + } + if (typeof obj.sessionKey === "string" && obj.sessionKey.trim()) { + entry.sessionKey = obj.sessionKey; + } + parsed.push(entry); + } catch { + // skip malformed lines + } + } + return parsed.toReversed(); + } catch { + return []; + } +} + +/** GET /api/cron/jobs/[jobId]/runs -- list run log entries for a cron job */ +export async function GET( + request: Request, + { params }: { params: Promise<{ jobId: string }> }, +) { + const { jobId } = await params; + if (!jobId) { + return Response.json({ error: "Job ID required" }, { status: 400 }); + } + + const url = new URL(request.url); + const limitParam = url.searchParams.get("limit"); + const limit = Math.max(1, Math.min(500, Number(limitParam) || 100)); + + const logPath = join(CRON_DIR, "runs", `${jobId}.jsonl`); + const entries = readRunLog(logPath, limit); + + return Response.json({ entries }); +} diff --git a/apps/web/app/api/cron/jobs/route.ts b/apps/web/app/api/cron/jobs/route.ts new file mode 100644 index 00000000000..7d95010dd3c --- /dev/null +++ b/apps/web/app/api/cron/jobs/route.ts @@ -0,0 +1,99 @@ +import { readFileSync, existsSync, readdirSync } from "node:fs"; +import { join } from "node:path"; +import { homedir } from "node:os"; + +export const dynamic = "force-dynamic"; + +const CRON_DIR = join(homedir(), ".openclaw", "cron"); +const JOBS_FILE = join(CRON_DIR, "jobs.json"); + +type CronStoreFile = { + version: 1; + jobs: Array>; +}; + +/** Read cron jobs.json, returning empty array if missing or invalid. */ +function readJobsFile(): Array> { + if (!existsSync(JOBS_FILE)) {return [];} + try { + const raw = readFileSync(JOBS_FILE, "utf-8"); + const parsed = JSON.parse(raw) as CronStoreFile; + if (parsed && Array.isArray(parsed.jobs)) {return parsed.jobs;} + return []; + } catch { + return []; + } +} + +/** Compute next wake time from job states (minimum nextRunAtMs among enabled jobs). */ +function computeNextWakeAtMs(jobs: Array>): number | null { + let min: number | null = null; + for (const job of jobs) { + if (job.enabled !== true) {continue;} + const state = job.state as Record | undefined; + if (!state) {continue;} + const next = state.nextRunAtMs; + if (typeof next === "number" && Number.isFinite(next)) { + if (min === null || next < min) {min = next;} + } + } + return min; +} + +/** Read heartbeat config from ~/.openclaw/config.yaml (best-effort). */ +function readHeartbeatInfo(): { intervalMs: number; nextDueEstimateMs: number | null } { + const defaults = { intervalMs: 30 * 60_000, nextDueEstimateMs: null as number | null }; + + // Try to read agent session stores to estimate next heartbeat from lastRunMs + try { + const agentsDir = join(homedir(), ".openclaw", "agents"); + if (!existsSync(agentsDir)) {return defaults;} + + const agentDirs = readdirSync(agentsDir, { withFileTypes: true }); + let latestHeartbeat: number | null = null; + + for (const d of agentDirs) { + if (!d.isDirectory()) {continue;} + const storePath = join(agentsDir, d.name, "sessions", "sessions.json"); + if (!existsSync(storePath)) {continue;} + try { + const raw = readFileSync(storePath, "utf-8"); + const store = JSON.parse(raw) as Record; + // Look for the main agent session (shortest key, most recently updated) + for (const [key, entry] of Object.entries(store)) { + if (key.startsWith("agent:") && !key.includes(":cron:") && entry.updatedAt) { + if (latestHeartbeat === null || entry.updatedAt > latestHeartbeat) { + latestHeartbeat = entry.updatedAt; + } + } + } + } catch { + // skip + } + } + + if (latestHeartbeat) { + defaults.nextDueEstimateMs = latestHeartbeat + defaults.intervalMs; + } + } catch { + // ignore + } + + return defaults; +} + +/** GET /api/cron/jobs -- list all cron jobs with heartbeat & status info */ +export async function GET() { + const jobs = readJobsFile(); + const heartbeat = readHeartbeatInfo(); + const nextWakeAtMs = computeNextWakeAtMs(jobs); + + return Response.json({ + jobs, + heartbeat, + cronStatus: { + enabled: jobs.length > 0, + nextWakeAtMs, + }, + }); +} diff --git a/apps/web/app/api/cron/runs/[sessionId]/route.ts b/apps/web/app/api/cron/runs/[sessionId]/route.ts new file mode 100644 index 00000000000..36a420a1140 --- /dev/null +++ b/apps/web/app/api/cron/runs/[sessionId]/route.ts @@ -0,0 +1,154 @@ +import { readFileSync, readdirSync, existsSync } from "node:fs"; +import { join } from "node:path"; +import { homedir } from "node:os"; + +export const dynamic = "force-dynamic"; + +type MessagePart = + | { type: "text"; text: string } + | { type: "thinking"; thinking: string } + | { type: "tool-call"; toolName: string; toolCallId: string; args?: unknown; output?: string }; + +type ParsedMessage = { + id: string; + role: "user" | "assistant" | "system"; + parts: MessagePart[]; + timestamp: string; +}; + +/** Search agent session directories for a session file by ID. */ +function findSessionFile(sessionId: string): string | null { + const agentsDir = join(homedir(), ".openclaw", "agents"); + if (!existsSync(agentsDir)) {return null;} + + try { + const agentDirs = readdirSync(agentsDir, { withFileTypes: true }); + for (const agentDir of agentDirs) { + if (!agentDir.isDirectory()) {continue;} + const sessionFile = join(agentsDir, agentDir.name, "sessions", `${sessionId}.jsonl`); + if (existsSync(sessionFile)) {return sessionFile;} + } + } catch { + // ignore + } + return null; +} + +/** Parse a JSONL session transcript into structured messages with thinking and tool calls. */ +function parseSessionTranscript(content: string): ParsedMessage[] { + const lines = content.trim().split("\n").filter((l) => l.trim()); + const messages: ParsedMessage[] = []; + + // Track tool calls for linking invocations with results + const pendingToolCalls = new Map(); + + for (const line of lines) { + try { + const entry = JSON.parse(line); + + if (entry.type === "message" && entry.message) { + const msg = entry.message; + const role = msg.role as "user" | "assistant" | "system"; + const parts: MessagePart[] = []; + + if (Array.isArray(msg.content)) { + for (const part of msg.content) { + if (part.type === "text" && typeof part.text === "string" && part.text.trim()) { + parts.push({ type: "text", text: part.text }); + } else if (part.type === "thinking" && typeof part.thinking === "string" && part.thinking.trim()) { + parts.push({ type: "thinking", thinking: part.thinking }); + } else if (part.type === "tool_use" || part.type === "tool-call") { + const toolName = part.name ?? part.toolName ?? "unknown"; + const toolCallId = part.id ?? part.toolCallId ?? `tool-${Date.now()}`; + pendingToolCalls.set(toolCallId, { toolName, args: part.input ?? part.args }); + parts.push({ + type: "tool-call", + toolName, + toolCallId, + args: part.input ?? part.args, + }); + } else if (part.type === "tool_result" || part.type === "tool-result") { + const toolCallId = part.tool_use_id ?? part.toolCallId ?? ""; + const pending = pendingToolCalls.get(toolCallId); + const outputText = typeof part.content === "string" + ? part.content + : Array.isArray(part.content) + ? part.content.filter((c: { type: string }) => c.type === "text").map((c: { text: string }) => c.text).join("\n") + : typeof part.output === "string" + ? part.output + : JSON.stringify(part.output ?? part.content ?? ""); + + if (pending) { + // Merge output into existing tool-call part + const existingMsg = messages[messages.length - 1]; + if (existingMsg) { + const tc = existingMsg.parts.find( + (p) => p.type === "tool-call" && (p as { toolCallId: string }).toolCallId === toolCallId, + ); + if (tc && tc.type === "tool-call") { + (tc as { output?: string }).output = outputText.slice(0, 5000); + continue; + } + } + parts.push({ + type: "tool-call", + toolName: pending.toolName, + toolCallId, + args: pending.args, + output: outputText.slice(0, 5000), + }); + } else { + parts.push({ + type: "tool-call", + toolName: "tool", + toolCallId, + output: outputText.slice(0, 5000), + }); + } + } + } + } else if (typeof msg.content === "string" && msg.content.trim()) { + parts.push({ type: "text", text: msg.content }); + } + + if (parts.length > 0) { + messages.push({ + id: entry.id ?? `msg-${messages.length}`, + role, + parts, + timestamp: entry.timestamp ?? new Date(entry.ts ?? Date.now()).toISOString(), + }); + } + } + } catch { + // skip malformed lines + } + } + + return messages; +} + +/** GET /api/cron/runs/[sessionId] -- get full session transcript for a cron run */ +export async function GET( + _request: Request, + { params }: { params: Promise<{ sessionId: string }> }, +) { + const { sessionId } = await params; + if (!sessionId) { + return Response.json({ error: "Session ID required" }, { status: 400 }); + } + + const sessionFile = findSessionFile(sessionId); + if (!sessionFile) { + return Response.json({ error: "Session not found" }, { status: 404 }); + } + + try { + const content = readFileSync(sessionFile, "utf-8"); + const messages = parseSessionTranscript(content); + return Response.json({ sessionId, messages }); + } catch (error) { + console.error("Error reading cron session:", error); + return Response.json({ error: "Failed to read session" }, { status: 500 }); + } +} diff --git a/apps/web/app/api/web-sessions/[id]/messages/route.ts b/apps/web/app/api/web-sessions/[id]/messages/route.ts index b2c171a8ff5..2e7f08cae24 100644 --- a/apps/web/app/api/web-sessions/[id]/messages/route.ts +++ b/apps/web/app/api/web-sessions/[id]/messages/route.ts @@ -1,5 +1,4 @@ import { - appendFileSync, readFileSync, writeFileSync, existsSync, @@ -21,7 +20,14 @@ type IndexEntry = { messageCount: number; }; -/** POST /api/web-sessions/[id]/messages — append messages to a session */ +/** + * POST /api/web-sessions/[id]/messages — append or upsert messages. + * + * Uses upsert semantics: if a message with the same `id` already exists + * in the session JSONL, it is replaced in-place. Otherwise the message + * is appended. This supports both the client's post-stream save and the + * server-side incremental persistence from the ActiveRunManager. + */ export async function POST( request: Request, { params }: { params: Promise<{ id: string }> }, @@ -43,11 +49,39 @@ export async function POST( return Response.json({ error: "messages array required" }, { status: 400 }); } - // Append each message as a JSONL line + // Read existing lines for upsert checks. + const existing = readFileSync(filePath, "utf-8"); + const lines = existing.split("\n").filter((l) => l.trim()); + let newCount = 0; + for (const msg of messages) { - appendFileSync(filePath, JSON.stringify(msg) + "\n"); + const msgId = typeof msg.id === "string" ? msg.id : undefined; + let found = false; + + if (msgId) { + for (let i = 0; i < lines.length; i++) { + try { + const parsed = JSON.parse(lines[i]); + if (parsed.id === msgId) { + // Replace the existing line in-place. + lines[i] = JSON.stringify(msg); + found = true; + break; + } + } catch { + /* keep malformed lines as-is */ + } + } + } + + if (!found) { + lines.push(JSON.stringify(msg)); + newCount++; + } } + writeFileSync(filePath, lines.join("\n") + "\n"); + // Update index metadata try { if (existsSync(INDEX_FILE)) { @@ -57,8 +91,8 @@ export async function POST( const session = index.find((s) => s.id === id); if (session) { session.updatedAt = Date.now(); - session.messageCount += messages.length; - if (title) session.title = title; + if (newCount > 0) {session.messageCount += newCount;} + if (title) {session.title = title;} writeFileSync(INDEX_FILE, JSON.stringify(index, null, 2)); } } diff --git a/apps/web/app/components/chain-of-thought.tsx b/apps/web/app/components/chain-of-thought.tsx index c7fc3c13552..64e845dab19 100644 --- a/apps/web/app/components/chain-of-thought.tsx +++ b/apps/web/app/components/chain-of-thought.tsx @@ -178,7 +178,7 @@ function extractDomains(text: string): string[] { } function faviconUrl(domain: string): string { - return `https://www.google.com/s2/favicons?domain=${encodeURIComponent(domain)}&sz=32`; + return `https://www.google.com/s2/favicons?sz=64&domain_url=${encodeURIComponent(domain)}`; } /* ─── Classify tool steps ─── */ @@ -192,7 +192,10 @@ type StepKind = | "image" | "generic"; -function classifyTool(name: string): StepKind { +function classifyTool( + name: string, + args?: Record, +): StepKind { const n = name.toLowerCase().replace(/[_-]/g, ""); if ( [ @@ -205,6 +208,18 @@ function classifyTool(name: string): StepKind { ].some((k) => n.includes(k)) ) {return "search";} + + // Browser tool — classify based on the action being performed + if (n === "browser") { + const action = + typeof args?.action === "string" + ? args.action.toLowerCase() + : ""; + if (action === "open" || action === "navigate") {return "fetch";} + if (action === "screenshot") {return "image";} + return "fetch"; // default: most browser actions involve a page + } + if ( ["fetchurl", "fetch", "browse", "browseurl", "webfetch"].some( (k) => n.includes(k), @@ -278,7 +293,10 @@ function buildStepLabel( } case "fetch": { const u = - strVal("url") ?? strVal("path") ?? strVal("src"); + strVal("url") ?? + strVal("targetUrl") ?? + strVal("path") ?? + strVal("src"); if (u) { try { return `Fetching ${new URL(u).hostname}`; @@ -286,6 +304,17 @@ function buildStepLabel( return `Fetching ${u.length > 50 ? u.slice(0, 50) + "..." : u}`; } } + // Fallback: check output for the URL (web_fetch results include url/finalUrl) + const outUrl = + (typeof output?.finalUrl === "string" && output.finalUrl) || + (typeof output?.url === "string" && output.url); + if (outUrl) { + try { + return `Fetched ${new URL(outUrl).hostname}`; + } catch { + return `Fetched ${outUrl.length > 50 ? outUrl.slice(0, 50) + "..." : outUrl}`; + } + } return "Fetching page"; } case "read": { @@ -334,10 +363,13 @@ function getSearchDomains( if (!output) {return [];} const text = typeof output.text === "string" ? output.text : ""; const results = output.results; + const citations = output.citations; let combined = text; if (Array.isArray(results)) { for (const r of results) { - if (typeof r === "object" && r !== null) { + if (typeof r === "string") { + combined += ` ${r}`; + } else if (typeof r === "object" && r !== null) { const obj = r as Record; if (typeof obj.url === "string") {combined += ` ${obj.url}`;} @@ -346,9 +378,64 @@ function getSearchDomains( } } } + if (Array.isArray(citations)) { + for (const c of citations) { + // Citations can be plain URL strings or objects with a url field + if (typeof c === "string") { + combined += ` ${c}`; + } else if (typeof c === "object" && c !== null) { + const obj = c as Record; + if (typeof obj.url === "string") + {combined += ` ${obj.url}`;} + } + } + } + // Scan all remaining string values in the output for URLs we may have missed + for (const val of Object.values(output)) { + if (typeof val === "string" && val !== text && val.includes("http")) { + combined += ` ${val}`; + } + } return extractDomains(combined); } +/** Extract domain(s) from fetch/browser tool args and/or output */ +function getFetchDomains( + args?: Record, + output?: Record, +): string[] { + const domains = new Set(); + // Check args for URL (web_fetch uses "url", browser tool uses "targetUrl") + for (const key of ["url", "targetUrl", "path", "src"]) { + const v = args?.[key]; + if (typeof v === "string" && v.startsWith("http")) { + try { + const hostname = new URL(v).hostname; + if (hostname && !hostname.includes("localhost")) { + domains.add(hostname); + } + } catch { + /* skip */ + } + } + } + // Check output for URL / finalUrl + for (const key of ["url", "finalUrl", "targetUrl"]) { + const v = output?.[key]; + if (typeof v === "string" && v.startsWith("http")) { + try { + const hostname = new URL(v).hostname; + if (hostname && !hostname.includes("localhost")) { + domains.add(hostname); + } + } catch { + /* skip */ + } + } + } + return [...domains].slice(0, 4); +} + /* ─── Group consecutive media reads ─── */ type ToolPart = Extract; @@ -366,7 +453,7 @@ function groupToolSteps(tools: ToolPart[]): VisualItem[] { let i = 0; while (i < tools.length) { const tool = tools[i]; - const kind = classifyTool(tool.toolName); + const kind = classifyTool(tool.toolName, tool.args); // Check both args AND output for the file path const filePath = getFilePath(tool.args, tool.output); const media = filePath ? detectMedia(filePath) : null; @@ -379,7 +466,7 @@ function groupToolSteps(tools: ToolPart[]): VisualItem[] { let j = i + 1; while (j < tools.length) { const next = tools[j]; - const nextKind = classifyTool(next.toolName); + const nextKind = classifyTool(next.toolName, next.args); const nextPath = getFilePath(next.args, next.output); const nextMedia = nextPath ? detectMedia(nextPath) : null; if (nextKind === "read" && nextMedia === media && nextPath) { @@ -972,12 +1059,14 @@ function ToolStep({ errorText?: string; }) { const [showOutput, setShowOutput] = useState(false); - const kind = classifyTool(toolName); + const kind = classifyTool(toolName, args); const label = buildStepLabel(kind, toolName, args, output); const domains = - (kind === "search" || kind === "fetch") && status === "done" + kind === "search" ? getSearchDomains(output) - : []; + : kind === "fetch" + ? getFetchDomains(args, output) + : []; const outputText = typeof output?.text === "string" ? output.text : undefined; @@ -1067,8 +1156,8 @@ function ToolStep({ /> )} - {/* Search domain badges */} - {domains.length > 0 && ( + {/* Domain badges (search results / fetched page) — skip when running, the running section handles its own */} + {domains.length > 0 && status !== "running" && (
{domains.map((domain) => ( - - - {kind === "fetch" - ? "Fetching..." - : "Searching..."} - + {/* Show favicon badges for known domains while running */} + {domains.length > 0 + ? domains.map((domain) => ( + + {/* eslint-disable-next-line @next/next/no-img-element */} + + {domain.replace( + /^www\./, + "", + )} + + + )) + : ( + + + {kind === "fetch" + ? "Fetching..." + : "Searching..."} + + )}
)} @@ -1177,8 +1305,8 @@ function DomainBadge({ domain }: { domain: string }) { diff --git a/apps/web/app/components/chat-panel.tsx b/apps/web/app/components/chat-panel.tsx index 6d8e7ed3e24..20951763a65 100644 --- a/apps/web/app/components/chat-panel.tsx +++ b/apps/web/app/components/chat-panel.tsx @@ -13,6 +13,139 @@ import { } from "react"; import { ChatMessage } from "./chat-message"; +// ── SSE stream parser for reconnection ── +// Converts raw SSE events (AI SDK v6 wire format) into UIMessage parts. + +type ParsedPart = + | { type: "text"; text: string } + | { type: "reasoning"; text: string; state?: string } + | { + type: "dynamic-tool"; + toolName: string; + toolCallId: string; + state: string; + input?: Record; + output?: Record; + }; + +function createStreamParser() { + const parts: ParsedPart[] = []; + let currentTextIdx = -1; + let currentReasoningIdx = -1; + + function processEvent(event: Record) { + const t = event.type as string; + + switch (t) { + case "reasoning-start": + parts.push({ + type: "reasoning", + text: "", + state: "streaming", + }); + currentReasoningIdx = parts.length - 1; + break; + case "reasoning-delta": { + if (currentReasoningIdx >= 0) { + const p = parts[currentReasoningIdx] as { + type: "reasoning"; + text: string; + }; + p.text += event.delta as string; + } + break; + } + case "reasoning-end": + if (currentReasoningIdx >= 0) { + const p = parts[currentReasoningIdx] as { + type: "reasoning"; + state?: string; + }; + delete p.state; + } + currentReasoningIdx = -1; + break; + case "text-start": + parts.push({ type: "text", text: "" }); + currentTextIdx = parts.length - 1; + break; + case "text-delta": { + if (currentTextIdx >= 0) { + const p = parts[currentTextIdx] as { + type: "text"; + text: string; + }; + p.text += event.delta as string; + } + break; + } + case "text-end": + currentTextIdx = -1; + break; + case "tool-input-start": + parts.push({ + type: "dynamic-tool", + toolCallId: event.toolCallId as string, + toolName: event.toolName as string, + state: "input-available", + input: {}, + }); + break; + case "tool-input-available": + for (let i = parts.length - 1; i >= 0; i--) { + const p = parts[i]; + if ( + p.type === "dynamic-tool" && + p.toolCallId === event.toolCallId + ) { + p.input = + (event.input as Record) ?? + {}; + break; + } + } + break; + case "tool-output-available": + for (let i = parts.length - 1; i >= 0; i--) { + const p = parts[i]; + if ( + p.type === "dynamic-tool" && + p.toolCallId === event.toolCallId + ) { + p.state = "output-available"; + p.output = + (event.output as Record< + string, + unknown + >) ?? {}; + break; + } + } + break; + case "tool-output-error": + for (let i = parts.length - 1; i >= 0; i--) { + const p = parts[i]; + if ( + p.type === "dynamic-tool" && + p.toolCallId === event.toolCallId + ) { + p.state = "error"; + p.output = { + error: event.errorText as string, + }; + break; + } + } + break; + } + } + + return { + processEvent, + getParts: (): ParsedPart[] => parts.map((p) => ({ ...p })), + }; +} + /** Imperative handle for parent-driven session control (main page). */ export type ChatPanelHandle = { loadSession: (sessionId: string) => Promise; @@ -64,6 +197,10 @@ export const ChatPanel = forwardRef( const [startingNewSession, setStartingNewSession] = useState(false); const messagesEndRef = useRef(null); + // ── Reconnection state ── + const [isReconnecting, setIsReconnecting] = useState(false); + const reconnectAbortRef = useRef(null); + // Track persisted messages to avoid double-saves const savedMessageIdsRef = useRef>(new Set()); // Set when /new or + triggers a new session @@ -101,7 +238,9 @@ export const ChatPanel = forwardRef( useChat({ transport }); const isStreaming = - status === "streaming" || status === "submitted"; + status === "streaming" || + status === "submitted" || + isReconnecting; // Auto-scroll to bottom on new messages useEffect(() => { @@ -129,67 +268,119 @@ export const ChatPanel = forwardRef( [filePath], ); - const saveMessages = useCallback( + // ── Stream reconnection ── + // Attempts to reconnect to an active agent run for the given session. + // Replays buffered SSE events and streams live updates. + const attemptReconnect = useCallback( async ( sessionId: string, - msgs: Array<{ + baseMessages: Array<{ id: string; - role: string; - content: string; - parts?: unknown[]; + role: "user" | "assistant" | "system"; + parts: UIMessage["parts"]; }>, - title?: string, - ) => { - const toSave = msgs.map((m) => ({ - id: m.id, - role: m.role, - content: m.content, - ...(m.parts ? { parts: m.parts } : {}), - timestamp: new Date().toISOString(), - })); + ): Promise => { + const abort = new AbortController(); + reconnectAbortRef.current = abort; + try { - await fetch( - `/api/web-sessions/${sessionId}/messages`, - { - method: "POST", - headers: { - "Content-Type": "application/json", - }, - body: JSON.stringify({ - messages: toSave, - title, - }), - }, + const res = await fetch( + `/api/chat/stream?sessionId=${encodeURIComponent(sessionId)}`, + { signal: abort.signal }, ); - for (const m of msgs) { + if (!res.ok || !res.body) { + return false; // No active run + } + + setIsReconnecting(true); + + const parser = createStreamParser(); + const reader = res.body.getReader(); + const decoder = new TextDecoder(); + const reconnectMsgId = `reconnect-${sessionId}`; + let buffer = ""; + let frameRequested = false; + + const updateUI = () => { + const assistantMsg = { + id: reconnectMsgId, + role: "assistant" as const, + parts: parser.getParts() as UIMessage["parts"], + }; + setMessages([ + ...baseMessages, + assistantMsg, + ]); + }; + + // Read the SSE stream + // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition -- loop reads until done + while (true) { + const { done, value } = + await reader.read(); + if (done) {break;} + + buffer += decoder.decode(value, { + stream: true, + }); + + // Parse SSE events (data: \n\n) + let idx; + while ( + (idx = buffer.indexOf("\n\n")) !== -1 + ) { + const chunk = buffer.slice(0, idx); + buffer = buffer.slice(idx + 2); + + if (chunk.startsWith("data: ")) { + try { + const event = JSON.parse( + chunk.slice(6), + ); + parser.processEvent(event); + } catch { + /* skip malformed events */ + } + } + } + + // Batch UI updates to animation frames + if (!frameRequested) { + frameRequested = true; + requestAnimationFrame(() => { + frameRequested = false; + updateUI(); + }); + } + } + + // Final update after stream ends + updateUI(); + + // Mark all messages as saved (server persisted them) + for (const m of baseMessages) { savedMessageIdsRef.current.add(m.id); } - onSessionsChange?.(); + savedMessageIdsRef.current.add(reconnectMsgId); + + setIsReconnecting(false); + reconnectAbortRef.current = null; + return true; } catch (err) { - console.error("Failed to save messages:", err); + if ( + (err as Error).name !== "AbortError" + ) { + console.error( + "Reconnection error:", + err, + ); + } + setIsReconnecting(false); + reconnectAbortRef.current = null; + return false; } }, - [onSessionsChange], - ); - - /** Extract plain text from a UIMessage */ - const getMessageText = useCallback( - (msg: (typeof messages)[number]): string => { - return ( - msg.parts - ?.filter( - ( - p, - ): p is { - type: "text"; - text: string; - } => p.type === "text", - ) - .map((p) => p.text) - .join("\n") ?? "" - ); - }, - [], + [setMessages], ); // ── File-scoped session initialization ── @@ -253,9 +444,21 @@ export const ChatPanel = forwardRef( role: "user" | "assistant"; content: string; parts?: Array>; + _streaming?: boolean; }> = msgData.messages || []; - const uiMessages = sessionMessages.map( + // Filter out in-progress streaming messages + // (will be rebuilt from the live SSE stream) + const hasStreaming = sessionMessages.some( + (m) => m._streaming, + ); + const completedMessages = hasStreaming + ? sessionMessages.filter( + (m) => !m._streaming, + ) + : sessionMessages; + + const uiMessages = completedMessages.map( (msg) => { savedMessageIdsRef.current.add(msg.id); return { @@ -273,6 +476,15 @@ export const ChatPanel = forwardRef( if (!cancelled) { setMessages(uiMessages); } + + // If there was a streaming message, try to + // reconnect to the active agent run. + if (hasStreaming && !cancelled) { + await attemptReconnect( + latest.id, + uiMessages, + ); + } } catch { // ignore } @@ -283,9 +495,12 @@ export const ChatPanel = forwardRef( cancelled = true; }; // eslint-disable-next-line react-hooks/exhaustive-deps -- stable setters - }, [filePath]); + }, [filePath, attemptReconnect]); - // ── Persist unsaved messages + live-reload after streaming ── + // ── Post-stream side-effects (file-reload, session refresh) ── + // Message persistence is handled server-side by ActiveRunManager, + // so we only refresh the file sessions list and notify the parent + // when the file content may have changed. const prevStatusRef = useRef(status); useEffect(() => { const wasStreaming = @@ -294,17 +509,10 @@ export const ChatPanel = forwardRef( const isNowReady = status === "ready"; if (wasStreaming && isNowReady && currentSessionId) { - const unsaved = messages.filter( - (m) => !savedMessageIdsRef.current.has(m.id), - ); - if (unsaved.length > 0) { - const toSave = unsaved.map((m) => ({ - id: m.id, - role: m.role, - content: getMessageText(m), - parts: m.parts, - })); - saveMessages(currentSessionId, toSave); + // Mark all current messages as saved — the server + // already persisted them via ActiveRunManager. + for (const m of messages) { + savedMessageIdsRef.current.add(m.id); } if (filePath) { @@ -327,16 +535,17 @@ export const ChatPanel = forwardRef( }) .catch(() => {}); } + + onSessionsChange?.(); } prevStatusRef.current = status; }, [ status, messages, currentSessionId, - saveMessages, - getMessageText, filePath, onFileChanged, + onSessionsChange, ]); // ── Actions ── @@ -391,7 +600,10 @@ export const ChatPanel = forwardRef( return; } + // Stop any active stream/reconnection for the old session. + reconnectAbortRef.current?.abort(); stop(); + setLoadingSession(true); setCurrentSessionId(sessionId); sessionIdRef.current = sessionId; @@ -413,23 +625,43 @@ export const ChatPanel = forwardRef( role: "user" | "assistant"; content: string; parts?: Array>; + _streaming?: boolean; }> = data.messages || []; - const uiMessages = sessionMessages.map((msg) => { - savedMessageIdsRef.current.add(msg.id); - return { - id: msg.id, - role: msg.role, - parts: (msg.parts ?? [ - { - type: "text" as const, - text: msg.content, - }, - ]) as UIMessage["parts"], - }; - }); + const hasStreaming = sessionMessages.some( + (m) => m._streaming, + ); + const completedMessages = hasStreaming + ? sessionMessages.filter( + (m) => !m._streaming, + ) + : sessionMessages; + + const uiMessages = completedMessages.map( + (msg) => { + savedMessageIdsRef.current.add(msg.id); + return { + id: msg.id, + role: msg.role, + parts: (msg.parts ?? [ + { + type: "text" as const, + text: msg.content, + }, + ]) as UIMessage["parts"], + }; + }, + ); setMessages(uiMessages); + + // Reconnect to active stream if one exists. + if (hasStreaming) { + await attemptReconnect( + sessionId, + uiMessages, + ); + } } catch (err) { console.error("Error loading session:", err); } finally { @@ -441,11 +673,14 @@ export const ChatPanel = forwardRef( setMessages, onActiveSessionChange, stop, + attemptReconnect, ], ); const handleNewSession = useCallback(async () => { + reconnectAbortRef.current?.abort(); stop(); + setIsReconnecting(false); setCurrentSessionId(null); sessionIdRef.current = null; onActiveSessionChange?.(null); @@ -477,21 +712,46 @@ export const ChatPanel = forwardRef( [handleSessionSelect, handleNewSession], ); + // ── Stop handler (aborts server-side run + client-side stream) ── + const handleStop = useCallback(async () => { + // Abort the server-side agent run + if (currentSessionId) { + fetch("/api/chat/stop", { + method: "POST", + headers: { + "Content-Type": "application/json", + }, + body: JSON.stringify({ + sessionId: currentSessionId, + }), + }).catch(() => {}); + } + + // Abort reconnection stream if active + reconnectAbortRef.current?.abort(); + setIsReconnecting(false); + + // Stop the useChat transport stream + stop(); + }, [currentSessionId, stop]); + // ── Status label ── const statusLabel = startingNewSession ? "Starting new session..." : loadingSession ? "Loading session..." - : status === "ready" - ? "Ready" - : status === "submitted" - ? "Thinking..." - : status === "streaming" - ? "Streaming..." - : status === "error" - ? "Error" - : status; + : isReconnecting + ? "Resuming stream..." + : status === "ready" + ? "Ready" + : status === "submitted" + ? "Thinking..." + : status === "streaming" + ? "Streaming..." + : status === "error" + ? "Error" + : status; // ── Render ── @@ -577,7 +837,7 @@ export const ChatPanel = forwardRef( {isStreaming && ( + + {/* Job header */} +
+
+

+ {job.name} +

+ +
+ {job.description && ( +

+ {job.description} +

+ )} +
+ + {/* Config + countdown grid */} +
+ {/* Next run countdown */} + + + {/* Job config */} +
+

+ Configuration +

+
+ + + + + {job.agentId && } + {job.delivery && } + +
+
+
+ + {/* Error streak */} + {job.state.consecutiveErrors && job.state.consecutiveErrors > 0 && ( +
+
+ + + + + {job.state.consecutiveErrors} consecutive error{job.state.consecutiveErrors > 1 ? "s" : ""} + +
+ {job.state.lastError && ( +

+ {job.state.lastError} +

+ )} +
+ )} + + {/* Run history */} +
+

+ Run History +

+ + {loadingRuns ? ( +
+
+
+ ) : runs.length === 0 ? ( +
+

+ No runs recorded yet. +

+
+ ) : ( +
+ {runs.toReversed().map((run) => ( + setExpandedRunTs(expandedRunTs === run.ts ? null : run.ts)} + /> + ))} +
+ )} +
+
+ ); +} + +/* ─── Next run countdown card ─── */ + +function NextRunCard({ job }: { job: CronJob }) { + const [now, setNow] = useState(Date.now()); + useEffect(() => { + const id = setInterval(() => setNow(Date.now()), 1000); + return () => clearInterval(id); + }, []); + + const nextMs = job.state.nextRunAtMs; + const isRunning = !!job.state.runningAtMs; + + return ( +
+

+ {isRunning ? "Currently Running" : "Next Run"} +

+ {isRunning ? ( +
+ + + Running now + +
+ ) : nextMs ? ( + <> +
+ {nextMs > now ? formatCountdown(nextMs - now) : "overdue"} +
+
+ {new Date(nextMs).toLocaleString()} +
+ + ) : ( +
+ {job.enabled ? "Not scheduled" : "Disabled"} +
+ )} +
+ ); +} + +/* ─── Run card ─── */ + +function RunCard({ + run, + isExpanded, + onToggle, +}: { + run: CronRunLogEntry; + isExpanded: boolean; + onToggle: () => void; +}) { + const statusColor = run.status === "ok" + ? "var(--color-success, #22c55e)" + : run.status === "error" + ? "var(--color-error, #ef4444)" + : "var(--color-warning, #f59e0b)"; + + return ( +
+ {/* Run header - clickable */} + + + {/* Expanded content */} + {isExpanded && ( +
+ {/* Error message */} + {run.error && ( +
+ {run.error} +
+ )} + + {/* Summary */} + {run.summary && ( +
+ {run.summary} +
+ )} + + {/* Session transcript */} + {run.sessionId ? ( +
+ +
+ ) : ( +
+ No session transcript available for this run. +
+ )} +
+ )} +
+ ); +} + +/* ─── Subcomponents ─── */ + +function StatusBadge({ status }: { status: string }) { + const color = status === "ok" + ? "var(--color-success, #22c55e)" + : status === "running" + ? "var(--color-accent)" + : status === "error" + ? "var(--color-error, #ef4444)" + : "var(--color-text-muted)"; + + return ( + + {status === "running" && ( + + )} + {status} + + ); +} + +function ConfigRow({ label, value }: { label: string; value: string }) { + return ( +
+ + {label} + + + {value} + +
+ ); +} diff --git a/apps/web/app/components/cron/cron-run-chat.tsx b/apps/web/app/components/cron/cron-run-chat.tsx new file mode 100644 index 00000000000..8ae4db0fcc8 --- /dev/null +++ b/apps/web/app/components/cron/cron-run-chat.tsx @@ -0,0 +1,366 @@ +"use client"; + +import { useEffect, useState, useCallback } from "react"; +import ReactMarkdown from "react-markdown"; +import remarkGfm from "remark-gfm"; +import type { SessionMessage, SessionMessagePart, CronRunSessionResponse } from "../../types/cron"; + +/* ─── Main component ─── */ + +export function CronRunChat({ sessionId }: { sessionId: string }) { + const [messages, setMessages] = useState([]); + const [loading, setLoading] = useState(true); + const [error, setError] = useState(null); + + const fetchSession = useCallback(async () => { + try { + const res = await fetch(`/api/cron/runs/${encodeURIComponent(sessionId)}`); + if (!res.ok) { + setError(res.status === 404 ? "Session transcript not found" : "Failed to load session"); + return; + } + const data: CronRunSessionResponse = await res.json(); + setMessages(data.messages ?? []); + } catch { + setError("Failed to load session"); + } finally { + setLoading(false); + } + }, [sessionId]); + + useEffect(() => { + fetchSession(); + }, [fetchSession]); + + if (loading) { + return ( +
+
+ Loading session transcript... +
+ ); + } + + if (error) { + return ( +
+ {error} +
+ ); + } + + if (messages.length === 0) { + return ( +
+ Empty session transcript. +
+ ); + } + + return ( +
+
+ Session Transcript +
+ {messages.map((msg) => ( + + ))} +
+ ); +} + +/* ─── Message rendering ─── */ + +function CronChatMessage({ message }: { message: SessionMessage }) { + const isUser = message.role === "user"; + const isSystem = message.role === "system"; + + // Group parts into segments + const segments = groupPartsIntoSegments(message.parts); + + if (isSystem) { + const textContent = message.parts + .filter((p): p is { type: "text"; text: string } => p.type === "text") + .map((p) => p.text) + .join("\n"); + return ( +
+ system: {textContent.slice(0, 500)} +
+ ); + } + + if (isUser) { + const textContent = message.parts + .filter((p): p is { type: "text"; text: string } => p.type === "text") + .map((p) => p.text) + .join("\n"); + return ( +
+
+

{textContent}

+
+
+ ); + } + + // Assistant message + return ( +
+ {segments.map((segment, idx) => { + if (segment.type === "text") { + return ( +
+ + {segment.text} + +
+ ); + } + + if (segment.type === "thinking") { + return ; + } + + if (segment.type === "tool-group") { + return ; + } + + return null; + })} +
+ ); +} + +/* ─── Part grouping ─── */ + +type ChatSegment = + | { type: "text"; text: string } + | { type: "thinking"; thinking: string } + | { type: "tool-group"; tools: Array }; + +function groupPartsIntoSegments(parts: SessionMessagePart[]): ChatSegment[] { + const segments: ChatSegment[] = []; + let toolBuffer: Array = []; + + const flushTools = () => { + if (toolBuffer.length > 0) { + segments.push({ type: "tool-group", tools: [...toolBuffer] }); + toolBuffer = []; + } + }; + + for (const part of parts) { + if (part.type === "text") { + flushTools(); + segments.push({ type: "text", text: part.text }); + } else if (part.type === "thinking") { + flushTools(); + segments.push({ type: "thinking", thinking: part.thinking }); + } else if (part.type === "tool-call") { + toolBuffer.push(part as SessionMessagePart & { type: "tool-call" }); + } + } + flushTools(); + return segments; +} + +/* ─── Thinking block (always expanded for historical runs) ─── */ + +function ThinkingBlock({ text }: { text: string }) { + const [expanded, setExpanded] = useState(true); + const isLong = text.length > 600; + + return ( +
+ + {expanded && ( +
+ {text} +
+ )} +
+ ); +} + +/* ─── Tool group ─── */ + +function ToolGroup({ tools }: { tools: Array }) { + return ( +
+ {/* Timeline connector */} +
+
+ {tools.map((tool) => ( + + ))} +
+
+ ); +} + +/* ─── Tool call step ─── */ + +function ToolCallStep({ tool }: { tool: SessionMessagePart & { type: "tool-call" } }) { + const [showOutput, setShowOutput] = useState(false); + const label = buildToolLabel(tool.toolName, tool.args); + + return ( +
+
+ +
+
+
+ {label} +
+ {tool.output && ( +
+ + {showOutput && ( +
+                {tool.output.length > 3000 ? tool.output.slice(0, 3000) + "\n..." : tool.output}
+              
+ )} +
+ )} +
+
+ ); +} + +/* ─── Tool label builder ─── */ + +function buildToolLabel(toolName: string, args?: unknown): string { + const a = args as Record | undefined; + const strVal = (key: string) => { + const v = a?.[key]; + return typeof v === "string" && v.length > 0 ? v : null; + }; + + const n = toolName.toLowerCase().replace(/[_-]/g, ""); + + if (["websearch", "search", "googlesearch"].some((k) => n.includes(k))) { + const q = strVal("query") ?? strVal("search_query") ?? strVal("q"); + return q ? `Searching: ${q.slice(0, 80)}` : "Searching..."; + } + if (["fetchurl", "fetch", "webfetch"].some((k) => n.includes(k))) { + const u = strVal("url") ?? strVal("path"); + return u ? `Fetching: ${u.slice(0, 60)}` : "Fetching page"; + } + if (["read", "readfile", "getfile"].some((k) => n.includes(k))) { + const p = strVal("path") ?? strVal("file"); + return p ? `Reading: ${p.split("/").pop()}` : "Reading file"; + } + if (["bash", "shell", "execute", "exec", "terminal"].some((k) => n.includes(k))) { + const cmd = strVal("command") ?? strVal("cmd"); + return cmd ? `Running: ${cmd.slice(0, 60)}` : "Running command"; + } + if (["write", "create", "edit", "str_replace", "save"].some((k) => n.includes(k))) { + const p = strVal("path") ?? strVal("file"); + return p ? `Editing: ${p.split("/").pop()}` : "Editing file"; + } + + return toolName.replace(/_/g, " ").replace(/\b\w/g, (c) => c.toUpperCase()); +} + +/* ─── Tool icon ─── */ + +function ToolIcon({ toolName }: { toolName: string }) { + const color = "var(--color-text-muted)"; + const n = toolName.toLowerCase().replace(/[_-]/g, ""); + + if (["search", "websearch"].some((k) => n.includes(k))) { + return ( + + + + ); + } + if (["bash", "shell", "exec", "terminal"].some((k) => n.includes(k))) { + return ( + + + + ); + } + if (["write", "edit", "create", "save"].some((k) => n.includes(k))) { + return ( + + + + + ); + } + // Default: file/read icon + return ( + + + + + ); +} diff --git a/apps/web/app/types/cron.ts b/apps/web/app/types/cron.ts new file mode 100644 index 00000000000..39f5d23db44 --- /dev/null +++ b/apps/web/app/types/cron.ts @@ -0,0 +1,116 @@ +// Client-side types mirroring src/cron/types.ts and src/cron/run-log.ts +// Stripped of server-only imports for use in the web UI. + +export type CronSchedule = + | { kind: "at"; at: string } + | { kind: "every"; everyMs: number; anchorMs?: number } + | { kind: "cron"; expr: string; tz?: string }; + +export type CronSessionTarget = "main" | "isolated"; +export type CronWakeMode = "next-heartbeat" | "now"; + +export type CronDeliveryMode = "none" | "announce"; + +export type CronDelivery = { + mode: CronDeliveryMode; + channel?: string; + to?: string; + bestEffort?: boolean; +}; + +export type CronPayload = + | { kind: "systemEvent"; text: string } + | { + kind: "agentTurn"; + message: string; + model?: string; + thinking?: string; + timeoutSeconds?: number; + deliver?: boolean; + channel?: string; + to?: string; + }; + +export type CronJobState = { + nextRunAtMs?: number; + runningAtMs?: number; + lastRunAtMs?: number; + lastStatus?: "ok" | "error" | "skipped"; + lastError?: string; + lastDurationMs?: number; + consecutiveErrors?: number; + scheduleErrorCount?: number; +}; + +export type CronJob = { + id: string; + agentId?: string; + name: string; + description?: string; + enabled: boolean; + deleteAfterRun?: boolean; + createdAtMs: number; + updatedAtMs: number; + schedule: CronSchedule; + sessionTarget: CronSessionTarget; + wakeMode: CronWakeMode; + payload: CronPayload; + delivery?: CronDelivery; + state: CronJobState; +}; + +export type CronStoreFile = { + version: 1; + jobs: CronJob[]; +}; + +export type CronRunLogEntry = { + ts: number; + jobId: string; + action: "finished"; + status?: "ok" | "error" | "skipped"; + error?: string; + summary?: string; + sessionId?: string; + sessionKey?: string; + runAtMs?: number; + durationMs?: number; + nextRunAtMs?: number; +}; + +export type HeartbeatInfo = { + intervalMs: number; + nextDueEstimateMs: number | null; +}; + +export type CronStatusInfo = { + enabled: boolean; + nextWakeAtMs: number | null; +}; + +export type CronJobsResponse = { + jobs: CronJob[]; + heartbeat: HeartbeatInfo; + cronStatus: CronStatusInfo; +}; + +export type CronRunsResponse = { + entries: CronRunLogEntry[]; +}; + +export type SessionMessagePart = + | { type: "text"; text: string } + | { type: "thinking"; thinking: string } + | { type: "tool-call"; toolName: string; toolCallId: string; args?: unknown; output?: string }; + +export type SessionMessage = { + id: string; + role: "user" | "assistant" | "system"; + parts: SessionMessagePart[]; + timestamp: string; +}; + +export type CronRunSessionResponse = { + sessionId: string; + messages: SessionMessage[]; +}; diff --git a/apps/web/app/workspace/page.tsx b/apps/web/app/workspace/page.tsx index c56555e27ec..d2c482f9904 100644 --- a/apps/web/app/workspace/page.tsx +++ b/apps/web/app/workspace/page.tsx @@ -18,6 +18,9 @@ import { ChatPanel, type ChatPanelHandle } from "../components/chat-panel"; import { EntryDetailModal } from "../components/workspace/entry-detail-modal"; import { useSearchIndex } from "@/lib/search-index"; import { parseWorkspaceLink, isWorkspaceLink } from "@/lib/workspace-links"; +import { CronDashboard } from "../components/cron/cron-dashboard"; +import { CronJobDetail } from "../components/cron/cron-job-detail"; +import type { CronJob, CronJobsResponse } from "../types/cron"; // --- Types --- @@ -82,7 +85,9 @@ type ContentState = | { kind: "media"; url: string; mediaType: MediaType; filename: string; filePath: string } | { kind: "database"; dbPath: string; filename: string } | { kind: "report"; reportPath: string; filename: string } - | { kind: "directory"; node: TreeNode }; + | { kind: "directory"; node: TreeNode } + | { kind: "cron-dashboard" } + | { kind: "cron-job"; jobId: string; job: CronJob }; type WebSession = { id: string; @@ -208,15 +213,20 @@ function WorkspacePageInner() { const [sessions, setSessions] = useState([]); const [sidebarRefreshKey, setSidebarRefreshKey] = useState(0); + // Cron jobs state + const [cronJobs, setCronJobs] = useState([]); + // Entry detail modal state const [entryModal, setEntryModal] = useState<{ objectName: string; entryId: string; } | null>(null); - // Derive file context for chat sidebar directly from activePath (stable across loading) + // Derive file context for chat sidebar directly from activePath (stable across loading). + // Exclude reserved virtual paths (~chats, ~cron, etc.) where file-scoped chat is irrelevant. const fileContext = useMemo(() => { if (!activePath) {return undefined;} + if (isVirtualPath(activePath)) {return undefined;} const filename = activePath.split("/").pop() || activePath; return { path: activePath, filename }; }, [activePath]); @@ -269,6 +279,23 @@ function WorkspacePageInner() { setSidebarRefreshKey((k) => k + 1); }, []); + // Fetch cron jobs for sidebar + const fetchCronJobs = useCallback(async () => { + try { + const res = await fetch("/api/cron/jobs"); + const data: CronJobsResponse = await res.json(); + setCronJobs(data.jobs ?? []); + } catch { + // ignore - cron might not be configured + } + }, []); + + useEffect(() => { + fetchCronJobs(); + const id = setInterval(fetchCronJobs, 30_000); + return () => clearInterval(id); + }, [fetchCronJobs]); + // Load content when path changes const loadContent = useCallback( async (node: TreeNode) => { @@ -337,7 +364,7 @@ function WorkspacePageInner() { setContent({ kind: "none" }); setActiveSessionId(sessionId); chatRef.current?.loadSession(sessionId); - router.replace("/workspace", { scroll: false }); + // URL is synced by the activeSessionId effect return; } // Clicking the Chats folder itself opens a new chat @@ -348,12 +375,30 @@ function WorkspacePageInner() { router.replace("/workspace", { scroll: false }); return; } + // Intercept cron job item clicks + if (node.path.startsWith("~cron/")) { + const jobId = node.path.slice("~cron/".length); + const job = cronJobs.find((j) => j.id === jobId); + if (job) { + setActivePath(node.path); + setContent({ kind: "cron-job", jobId, job }); + router.replace("/workspace", { scroll: false }); + return; + } + } + // Clicking the Cron folder itself opens the dashboard + if (node.path === "~cron") { + setActivePath(node.path); + setContent({ kind: "cron-dashboard" }); + router.replace("/workspace", { scroll: false }); + return; + } loadContent(node); }, - [loadContent, router], + [loadContent, router, cronJobs], ); - // Build the enhanced tree: real tree + Chats virtual folder at the bottom + // Build the enhanced tree: real tree + Chats + Cron virtual folders at the bottom const enhancedTree = useMemo(() => { const chatChildren: TreeNode[] = sessions.map((s) => ({ name: s.title || "Untitled chat", @@ -370,21 +415,56 @@ function WorkspacePageInner() { children: chatChildren.length > 0 ? chatChildren : undefined, }; - return [...tree, chatsFolder]; - }, [tree, sessions]); + const cronStatusIcon = (job: CronJob) => { + if (!job.enabled) {return "\u25CB";} // circle outline + if (job.state.runningAtMs) {return "\u25CF";} // filled circle + if (job.state.lastStatus === "error") {return "\u25C6";} // diamond + if (job.state.lastStatus === "ok") {return "\u2713";} // check + return "\u25CB"; + }; - // Sync URL bar when activePath changes + const cronChildren: TreeNode[] = cronJobs.map((j) => ({ + name: `${cronStatusIcon(j)} ${j.name}`, + path: `~cron/${j.id}`, + type: "file" as const, + virtual: true, + })); + + const cronFolder: TreeNode = { + name: "Cron", + path: "~cron", + type: "folder", + virtual: true, + children: cronChildren.length > 0 ? cronChildren : undefined, + }; + + return [...tree, chatsFolder, cronFolder]; + }, [tree, sessions, cronJobs]); + + // Sync URL bar with active content / chat state. + // Uses window.location instead of searchParams in the comparison to + // avoid a circular dependency (searchParams updates → effect fires → + // router.replace → searchParams updates → …). useEffect(() => { - const currentPath = searchParams.get("path"); - const currentEntry = searchParams.get("entry"); + const current = new URLSearchParams(window.location.search); - if (activePath && activePath !== currentPath) { - const params = new URLSearchParams(); - params.set("path", activePath); - if (currentEntry) {params.set("entry", currentEntry);} - router.replace(`/workspace?${params.toString()}`, { scroll: false }); + if (activePath) { + // File / content mode — path takes priority over chat. + if (current.get("path") !== activePath || current.has("chat")) { + const params = new URLSearchParams(); + params.set("path", activePath); + const entry = current.get("entry"); + if (entry) {params.set("entry", entry);} + router.replace(`/workspace?${params.toString()}`, { scroll: false }); + } + } else if (activeSessionId) { + // Chat mode — no file selected. + if (current.get("chat") !== activeSessionId || current.has("path")) { + router.replace(`/workspace?chat=${encodeURIComponent(activeSessionId)}`, { scroll: false }); + } } - }, [activePath, searchParams, router]); + // eslint-disable-next-line react-hooks/exhaustive-deps -- intentionally excludes searchParams to avoid infinite loop + }, [activePath, activeSessionId, router]); // Open entry modal handler const handleOpenEntry = useCallback( @@ -406,12 +486,13 @@ function WorkspacePageInner() { router.replace(qs ? `/workspace?${qs}` : "/workspace", { scroll: false }); }, [searchParams, router]); - // Auto-navigate to path from URL query param after tree loads + // Auto-navigate to path/chat from URL query params after tree loads useEffect(() => { if (initialPathHandled.current || treeLoading || tree.length === 0) {return;} const pathParam = searchParams.get("path"); const entryParam = searchParams.get("entry"); + const chatParam = searchParams.get("chat"); if (pathParam) { const node = resolveNode(tree, pathParam); @@ -419,6 +500,13 @@ function WorkspacePageInner() { initialPathHandled.current = true; loadContent(node); } + } else if (chatParam) { + // Restore the active chat session from URL + initialPathHandled.current = true; + setActiveSessionId(chatParam); + setActivePath(null); + setContent({ kind: "none" }); + chatRef.current?.loadSession(chatParam); } // Also open entry modal from URL if present @@ -529,6 +617,22 @@ function WorkspacePageInner() { [handleEditorNavigate], ); + // Cron navigation handlers + const handleSelectCronJob = useCallback((jobId: string) => { + const job = cronJobs.find((j) => j.id === jobId); + if (job) { + setActivePath(`~cron/${jobId}`); + setContent({ kind: "cron-job", jobId, job }); + router.replace("/workspace", { scroll: false }); + } + }, [cronJobs, router]); + + const handleBackToCronDashboard = useCallback(() => { + setActivePath("~cron"); + setContent({ kind: "cron-dashboard" }); + router.replace("/workspace", { scroll: false }); + }, [router]); + // Whether to show the main ChatPanel (no file/content selected) const showMainChat = !activePath || content.kind === "none"; @@ -574,21 +678,23 @@ function WorkspacePageInner() { - {/* Chat sidebar toggle */} - + {/* Chat sidebar toggle (hidden for reserved/virtual paths and directories) */} + {fileContext && content.kind !== "directory" && ( + + )}
)} @@ -623,11 +729,13 @@ function WorkspacePageInner() { onNavigate={handleEditorNavigate} onOpenEntry={handleOpenEntry} searchFn={searchIndex} + onSelectCronJob={handleSelectCronJob} + onBackToCronDashboard={handleBackToCronDashboard} /> - {/* Chat sidebar (file-scoped) */} - {fileContext && showChatSidebar && ( + {/* Chat sidebar (file-scoped) — hidden for directories and reserved paths */} + {fileContext && showChatSidebar && content.kind !== "directory" && (