diff --git a/apps/macos/Sources/OpenClawProtocol/GatewayModels.swift b/apps/macos/Sources/OpenClawProtocol/GatewayModels.swift index 0b1d7b13e01..727921eacf8 100644 --- a/apps/macos/Sources/OpenClawProtocol/GatewayModels.swift +++ b/apps/macos/Sources/OpenClawProtocol/GatewayModels.swift @@ -630,6 +630,24 @@ public struct AgentParams: Codable, Sendable { } } +public struct AgentAbortParams: Codable, Sendable { + public let runid: String + public let sessionkey: String? + + public init( + runid: String, + sessionkey: String?) + { + self.runid = runid + self.sessionkey = sessionkey + } + + private enum CodingKeys: String, CodingKey { + case runid = "runId" + case sessionkey = "sessionKey" + } +} + public struct AgentIdentityParams: Codable, Sendable { public let agentid: String? public let sessionkey: String? diff --git a/apps/shared/OpenClawKit/Sources/OpenClawProtocol/GatewayModels.swift b/apps/shared/OpenClawKit/Sources/OpenClawProtocol/GatewayModels.swift index 0b1d7b13e01..727921eacf8 100644 --- a/apps/shared/OpenClawKit/Sources/OpenClawProtocol/GatewayModels.swift +++ b/apps/shared/OpenClawKit/Sources/OpenClawProtocol/GatewayModels.swift @@ -630,6 +630,24 @@ public struct AgentParams: Codable, Sendable { } } +public struct AgentAbortParams: Codable, Sendable { + public let runid: String + public let sessionkey: String? + + public init( + runid: String, + sessionkey: String?) + { + self.runid = runid + self.sessionkey = sessionkey + } + + private enum CodingKeys: String, CodingKey { + case runid = "runId" + case sessionkey = "sessionKey" + } +} + public struct AgentIdentityParams: Codable, Sendable { public let agentid: String? public let sessionkey: String? diff --git a/src/gateway/agent-abort.ts b/src/gateway/agent-abort.ts new file mode 100644 index 00000000000..948eb509c1a --- /dev/null +++ b/src/gateway/agent-abort.ts @@ -0,0 +1,68 @@ +export type AgentAbortControllerEntry = { + controller: AbortController; + sessionKey?: string; + startedAtMs: number; + expiresAtMs: number; + clearableLanes?: string[]; +}; + +export function resolveAgentRunExpiresAtMs(params: { + now: number; + timeoutMs?: number; + graceMs?: number; + minMs?: number; + maxMs?: number; +}): number { + const { + now, + timeoutMs = 0, + graceMs = 60_000, + minMs = 2 * 60_000, + maxMs = 24 * 60 * 60_000, + } = params; + const boundedTimeoutMs = Math.max(0, timeoutMs); + // Preserve no-timeout semantics: if the caller explicitly requested a timeout + // longer than the default max (including timeout=0 → MAX_SAFE_TIMEOUT_MS), + // skip the 24h cap to avoid force-aborting long-running workflows. + if (boundedTimeoutMs > maxMs) { + return now + boundedTimeoutMs + graceMs; + } + const target = now + boundedTimeoutMs + graceMs; + const min = now + minMs; + const max = now + maxMs; + return Math.min(max, Math.max(min, target)); +} + +export function abortAgentRunById( + params: { + agentAbortControllers: Map; + runId: string; + sessionKey?: string; + reason?: unknown; + }, + laneCleaner?: { + clearSessionLane: (sessionKey: string) => void; + clearLane: (lane: string) => void; + }, +): { aborted: boolean } { + const active = params.agentAbortControllers.get(params.runId); + if (!active) { + return { aborted: false }; + } + if (active.sessionKey && params.sessionKey !== active.sessionKey) { + return { aborted: false }; + } + active.controller.abort(params.reason); + params.agentAbortControllers.delete(params.runId); + if (laneCleaner) { + if (active.sessionKey) { + laneCleaner.clearSessionLane(active.sessionKey); + } + if (active.clearableLanes) { + for (const lane of active.clearableLanes) { + laneCleaner.clearLane(lane); + } + } + } + return { aborted: true }; +} diff --git a/src/gateway/method-scopes.test.ts b/src/gateway/method-scopes.test.ts index 2edac06885f..3dd19e04274 100644 --- a/src/gateway/method-scopes.test.ts +++ b/src/gateway/method-scopes.test.ts @@ -50,6 +50,16 @@ describe("operator scope authorization", () => { }); }); + it("requires admin for agent.abort", () => { + expect(authorizeOperatorScopesForMethod("agent.abort", ["operator.write"])).toEqual({ + allowed: false, + missingScope: "operator.admin", + }); + expect(authorizeOperatorScopesForMethod("agent.abort", ["operator.admin"])).toEqual({ + allowed: true, + }); + }); + it("requires approvals scope for approval methods", () => { expect(authorizeOperatorScopesForMethod("exec.approval.resolve", ["operator.write"])).toEqual({ allowed: false, diff --git a/src/gateway/method-scopes.ts b/src/gateway/method-scopes.ts index f3a969301bf..7afa58ac299 100644 --- a/src/gateway/method-scopes.ts +++ b/src/gateway/method-scopes.ts @@ -95,6 +95,7 @@ const METHOD_SCOPE_GROUPS: Record = { "send", "poll", "agent", + "agent.enqueue", "agent.wait", "wake", "talk.mode", @@ -115,6 +116,7 @@ const METHOD_SCOPE_GROUPS: Record = { "node.pending.enqueue", ], [ADMIN_SCOPE]: [ + "agent.abort", "channels.logout", "agents.create", "agents.update", diff --git a/src/gateway/protocol/index.ts b/src/gateway/protocol/index.ts index 408074d44e4..801cb3c1635 100644 --- a/src/gateway/protocol/index.ts +++ b/src/gateway/protocol/index.ts @@ -1,6 +1,8 @@ import AjvPkg, { type ErrorObject } from "ajv"; import type { SessionsPatchResult } from "../session-utils.types.js"; import { + type AgentAbortParams, + AgentAbortParamsSchema, type AgentEvent, AgentEventSchema, type AgentIdentityParams, @@ -277,6 +279,7 @@ export const validateEventFrame = ajv.compile(EventFrameSchema); export const validateSendParams = ajv.compile(SendParamsSchema); export const validatePollParams = ajv.compile(PollParamsSchema); export const validateAgentParams = ajv.compile(AgentParamsSchema); +export const validateAgentAbortParams = ajv.compile(AgentAbortParamsSchema); export const validateAgentIdentityParams = ajv.compile(AgentIdentityParamsSchema); export const validateAgentWaitParams = ajv.compile(AgentWaitParamsSchema); @@ -496,6 +499,7 @@ export { ErrorShapeSchema, StateVersionSchema, AgentEventSchema, + AgentAbortParamsSchema, ChatEventSchema, SendParamsSchema, PollParamsSchema, @@ -608,6 +612,7 @@ export type { ErrorShape, StateVersion, AgentEvent, + AgentAbortParams, AgentIdentityParams, AgentIdentityResult, AgentWaitParams, diff --git a/src/gateway/protocol/schema/agent.ts b/src/gateway/protocol/schema/agent.ts index b9c844b135b..18279b81121 100644 --- a/src/gateway/protocol/schema/agent.ts +++ b/src/gateway/protocol/schema/agent.ts @@ -130,6 +130,14 @@ export const AgentWaitParamsSchema = Type.Object( { additionalProperties: false }, ); +export const AgentAbortParamsSchema = Type.Object( + { + runId: NonEmptyString, + sessionKey: Type.Optional(Type.String()), + }, + { additionalProperties: false }, +); + export const WakeParamsSchema = Type.Object( { mode: Type.Union([Type.Literal("now"), Type.Literal("next-heartbeat")]), diff --git a/src/gateway/protocol/schema/protocol-schemas.ts b/src/gateway/protocol/schema/protocol-schemas.ts index cf14fc44610..8eb98617b04 100644 --- a/src/gateway/protocol/schema/protocol-schemas.ts +++ b/src/gateway/protocol/schema/protocol-schemas.ts @@ -1,5 +1,6 @@ import type { TSchema } from "@sinclair/typebox"; import { + AgentAbortParamsSchema, AgentEventSchema, AgentIdentityParamsSchema, AgentIdentityResultSchema, @@ -181,6 +182,7 @@ export const ProtocolSchemas = { SendParams: SendParamsSchema, PollParams: PollParamsSchema, AgentParams: AgentParamsSchema, + AgentAbortParams: AgentAbortParamsSchema, AgentIdentityParams: AgentIdentityParamsSchema, AgentIdentityResult: AgentIdentityResultSchema, AgentWaitParams: AgentWaitParamsSchema, diff --git a/src/gateway/protocol/schema/types.ts b/src/gateway/protocol/schema/types.ts index d74c08ad10b..2cfc154cd0b 100644 --- a/src/gateway/protocol/schema/types.ts +++ b/src/gateway/protocol/schema/types.ts @@ -18,6 +18,7 @@ export type AgentEvent = SchemaType<"AgentEvent">; export type AgentIdentityParams = SchemaType<"AgentIdentityParams">; export type AgentIdentityResult = SchemaType<"AgentIdentityResult">; export type PollParams = SchemaType<"PollParams">; +export type AgentAbortParams = SchemaType<"AgentAbortParams">; export type AgentWaitParams = SchemaType<"AgentWaitParams">; export type WakeParams = SchemaType<"WakeParams">; export type NodePairRequestParams = SchemaType<"NodePairRequestParams">; diff --git a/src/gateway/server-maintenance.test.ts b/src/gateway/server-maintenance.test.ts index 045f73d802a..ccea522f7cf 100644 --- a/src/gateway/server-maintenance.test.ts +++ b/src/gateway/server-maintenance.test.ts @@ -22,6 +22,7 @@ function createMaintenanceTimerDeps() { refreshGatewayHealthSnapshot: async () => ({ ok: true }) as HealthSummary, logHealth: { error: () => {} }, dedupe: new Map(), + agentAbortControllers: new Map(), chatAbortControllers: new Map(), chatRunState: { abortedRuns: new Map() }, chatRunBuffers: new Map(), diff --git a/src/gateway/server-maintenance.ts b/src/gateway/server-maintenance.ts index 581e0d43ec3..2e29e98c603 100644 --- a/src/gateway/server-maintenance.ts +++ b/src/gateway/server-maintenance.ts @@ -1,5 +1,8 @@ +import { resolveSessionLane } from "../agents/pi-embedded-runner/lanes.js"; import type { HealthSummary } from "../commands/health.js"; import { cleanOldMedia } from "../media/store.js"; +import { clearCommandLane } from "../process/command-queue.js"; +import { abortAgentRunById, type AgentAbortControllerEntry } from "./agent-abort.js"; import { abortChatRunById, type ChatAbortControllerEntry } from "./chat-abort.js"; import type { ChatRunEntry } from "./server-chat.js"; import { @@ -27,6 +30,7 @@ export function startGatewayMaintenanceTimers(params: { refreshGatewayHealthSnapshot: (opts?: { probe?: boolean }) => Promise; logHealth: { error: (msg: string) => void }; dedupe: Map; + agentAbortControllers: Map; chatAbortControllers: Map; chatRunState: { abortedRuns: Map }; chatRunBuffers: Map; @@ -102,6 +106,24 @@ export function startGatewayMaintenanceTimers(params: { } } + for (const [runId, entry] of params.agentAbortControllers) { + if (now <= entry.expiresAtMs) { + continue; + } + abortAgentRunById( + { + agentAbortControllers: params.agentAbortControllers, + runId, + sessionKey: entry.sessionKey, + reason: "timeout", + }, + { + clearSessionLane: (key) => clearCommandLane(resolveSessionLane(key), { force: true }), + clearLane: (lane) => clearCommandLane(lane, { force: true }), + }, + ); + } + for (const [runId, entry] of params.chatAbortControllers) { if (now <= entry.expiresAtMs) { continue; diff --git a/src/gateway/server-methods-list.ts b/src/gateway/server-methods-list.ts index e930f8b0517..74858282b01 100644 --- a/src/gateway/server-methods-list.ts +++ b/src/gateway/server-methods-list.ts @@ -104,6 +104,8 @@ const BASE_METHODS = [ "system-event", "send", "agent", + "agent.enqueue", + "agent.abort", "agent.identity.get", "agent.wait", "browser.request", diff --git a/src/gateway/server-methods/agent.test.ts b/src/gateway/server-methods/agent.test.ts index f29a9a4c85d..39e4ebda788 100644 --- a/src/gateway/server-methods/agent.test.ts +++ b/src/gateway/server-methods/agent.test.ts @@ -76,6 +76,13 @@ vi.mock("../session-reset-service.js", () => ({ (mocks.performGatewaySessionReset as (...args: unknown[]) => unknown)(...args), })); +vi.mock("../../process/command-queue.js", () => ({ + clearCommandLane: vi.fn(), + enqueueCommandInLane: vi.fn((_lane: unknown, task: () => unknown) => task()), + CommandLaneClearedError: class extends Error { name = "CommandLaneClearedError"; }, + GatewayDrainingError: class extends Error { name = "GatewayDrainingError"; }, +})); + vi.mock("../../sessions/send-policy.js", () => ({ resolveSendPolicy: () => "allow", })); @@ -92,6 +99,8 @@ vi.mock("../../utils/delivery-context.js", async () => { const makeContext = (): GatewayRequestContext => ({ + agentAbortControllers: new Map(), + chatAbortControllers: new Map(), dedupe: new Map(), addChatRun: vi.fn(), logGateway: { info: vi.fn(), error: vi.fn() }, @@ -508,6 +517,8 @@ describe("gateway agent handler", () => { { respond, context: { + agentAbortControllers: new Map(), + chatAbortControllers: new Map(), dedupe: new Map(), addChatRun: vi.fn(), logGateway: { info: vi.fn(), error: vi.fn() }, @@ -747,6 +758,179 @@ describe("gateway agent handler", () => { expect(capturedEntry?.claudeCliSessionId).toBeUndefined(); }); + it("aborts active agent runs by run id", () => { + const respond = vi.fn(); + const controller = new AbortController(); + const context = makeContext(); + context.agentAbortControllers.set("run-1", { + controller, + sessionKey: "agent:main:main", + startedAtMs: Date.now(), + expiresAtMs: Date.now() + 60_000, + }); + + void agentHandlers["agent.abort"]({ + params: { runId: "run-1", sessionKey: "agent:main:main" }, + respond: respond as never, + context, + req: { type: "req", id: "agent-abort-1", method: "agent.abort" }, + client: null, + isWebchatConnect: () => false, + }); + + expect(controller.signal.aborted).toBe(true); + expect(context.agentAbortControllers.has("run-1")).toBe(false); + expect(respond).toHaveBeenCalledWith(true, { ok: true, aborted: true, runId: "run-1" }); + mocks.loadConfigReturn = {}; + }); + + it("does not abort a session-bound run when sessionKey is omitted", async () => { + const respond = vi.fn(); + const controller = new AbortController(); + const context = makeContext(); + context.agentAbortControllers.set("run-1", { + controller, + sessionKey: "agent:main:main", + startedAtMs: Date.now(), + expiresAtMs: Date.now() + 60_000, + }); + + await agentHandlers["agent.abort"]({ + params: { runId: "run-1" }, + respond: respond as never, + context, + req: { type: "req", id: "agent-abort-missing-session", method: "agent.abort" }, + client: null, + isWebchatConnect: () => false, + }); + + expect(controller.signal.aborted).toBe(false); + expect(context.agentAbortControllers.has("run-1")).toBe(true); + expect(respond).toHaveBeenCalledWith(true, { ok: true, aborted: false, runId: "run-1" }); + }); + + it("canonicalizes alias session keys before aborting agent runs", async () => { + mocks.loadConfigReturn = { session: { mainKey: "work" } }; + const respond = vi.fn(); + const controller = new AbortController(); + const context = makeContext(); + context.agentAbortControllers.set("run-1", { + controller, + sessionKey: "agent:main:work", + startedAtMs: Date.now(), + expiresAtMs: Date.now() + 60_000, + }); + + await agentHandlers["agent.abort"]({ + params: { runId: "run-1", sessionKey: "main" }, + respond: respond as never, + context, + req: { type: "req", id: "agent-abort-alias-session", method: "agent.abort" }, + client: null, + isWebchatConnect: () => false, + }); + + expect(controller.signal.aborted).toBe(true); + expect(context.agentAbortControllers.has("run-1")).toBe(false); + expect(respond).toHaveBeenCalledWith(true, { ok: true, aborted: true, runId: "run-1" }); + mocks.loadConfigReturn = {}; + }); + + it("clears session lane and custom clearable lanes on successful abort", async () => { + const { clearCommandLane } = await import("../../process/command-queue.js"); + (clearCommandLane as ReturnType).mockClear(); + const respond = vi.fn(); + const controller = new AbortController(); + const context = makeContext(); + context.agentAbortControllers.set("run-lane", { + controller, + sessionKey: "agent:main:main", + sessionId: "test-session", + startedAtMs: Date.now(), + expiresAtMs: Date.now() + 60_000, + clearableLanes: ["custom:test-lane"], + }); + + await agentHandlers["agent.abort"]({ + params: { runId: "run-lane", sessionKey: "agent:main:main" }, + respond: respond as never, + context, + req: { type: "req", id: "agent-abort-lane", method: "agent.abort" }, + client: null, + isWebchatConnect: () => false, + }); + + expect(controller.signal.aborted).toBe(true); + expect(respond).toHaveBeenCalledWith(true, { ok: true, aborted: true, runId: "run-lane" }); + const clearCalls = (clearCommandLane as ReturnType).mock.calls; + expect(clearCalls.some((c: unknown[]) => c[0] === "session:agent:main:main")).toBe(true); + expect(clearCalls.some((c: unknown[]) => c[0] === "custom:test-lane")).toBe(true); + }); + + it("keeps a newer abort controller entry when an older run with the same id finishes", async () => { + mockMainSessionEntry({ sessionId: "existing-session-id" }); + mocks.updateSessionStore.mockResolvedValue(undefined); + + let resolveOlderRun: + | ((value: { payloads: Array<{ text: string }>; meta: { durationMs: number } }) => void) + | undefined; + mocks.agentCommand.mockImplementationOnce( + () => + new Promise((resolve) => { + resolveOlderRun = resolve; + }), + ); + + const context = makeContext(); + const respond = vi.fn(); + await invokeAgent( + { + message: "older run", + agentId: "main", + sessionKey: "agent:main:main", + idempotencyKey: "shared-run-id", + }, + { respond, reqId: "shared-run-id", context }, + ); + + const olderEntry = context.agentAbortControllers.get("shared-run-id"); + expect(olderEntry).toBeDefined(); + + const newerController = new AbortController(); + context.agentAbortControllers.set("shared-run-id", { + controller: newerController, + sessionKey: "agent:main:main", + startedAtMs: Date.now(), + expiresAtMs: Date.now() + 60_000, + }); + + resolveOlderRun?.({ + payloads: [{ text: "ok" }], + meta: { durationMs: 100 }, + }); + await vi.waitFor(() => expect(respond).toHaveBeenCalledTimes(2)); + + expect(context.agentAbortControllers.get("shared-run-id")?.controller).toBe(newerController); + + const abortRespond = vi.fn(); + await agentHandlers["agent.abort"]({ + params: { runId: "shared-run-id", sessionKey: "agent:main:main" }, + respond: abortRespond as never, + context, + req: { type: "req", id: "agent-abort-shared", method: "agent.abort" }, + client: null, + isWebchatConnect: () => false, + }); + + expect(newerController.signal.aborted).toBe(true); + expect(context.agentAbortControllers.has("shared-run-id")).toBe(false); + expect(abortRespond).toHaveBeenCalledWith(true, { + ok: true, + aborted: true, + runId: "shared-run-id", + }); + }); + it("prunes legacy main alias keys when writing a canonical session entry", async () => { mocks.loadSessionEntry.mockReturnValue({ cfg: { diff --git a/src/gateway/server-methods/agent.ts b/src/gateway/server-methods/agent.ts index bd5637fa78f..b60f0beaad5 100644 --- a/src/gateway/server-methods/agent.ts +++ b/src/gateway/server-methods/agent.ts @@ -5,6 +5,9 @@ import { normalizeSpawnedRunMetadata, resolveIngressWorkspaceOverrideForSpawnedRun, } from "../../agents/spawned-context.js"; +import { resolveGlobalLane, resolveSessionLane } from "../../agents/pi-embedded-runner/lanes.js"; +import { resolveAgentTimeoutMs } from "../../agents/timeout.js"; +import { clearCommandLane } from "../../process/command-queue.js"; import { buildBareSessionResetPrompt } from "../../auto-reply/reply/session-reset-prompt.js"; import { agentCommandFromIngress } from "../../commands/agent.js"; import { loadConfig } from "../../config/config.js"; @@ -26,6 +29,7 @@ import { classifySessionKeyShape, normalizeAgentId } from "../../routing/session import { defaultRuntime } from "../../runtime.js"; import { normalizeInputProvenance, type InputProvenance } from "../../sessions/input-provenance.js"; import { resolveSendPolicy } from "../../sessions/send-policy.js"; +import { parseAgentSessionKey } from "../../sessions/session-key-utils.js"; import { normalizeSessionDeliveryFields } from "../../utils/delivery-context.js"; import { INTERNAL_MESSAGE_CHANNEL, @@ -33,6 +37,7 @@ import { isGatewayMessageChannel, normalizeMessageChannel, } from "../../utils/message-channel.js"; +import { abortAgentRunById, resolveAgentRunExpiresAtMs } from "../agent-abort.js"; import { resolveAssistantIdentity } from "../assistant-identity.js"; import { parseMessageWithAttachments } from "../chat-attachments.js"; import { resolveAssistantAvatarUrl } from "../control-ui-shared.js"; @@ -42,6 +47,7 @@ import { ErrorCodes, errorShape, formatValidationErrors, + validateAgentAbortParams, validateAgentIdentityParams, validateAgentParams, validateAgentWaitParams, @@ -101,6 +107,50 @@ async function runSessionResetFromAgent(params: { }; } +function canonicalizeAbortSessionKey(params: { + cfg: ReturnType; + activeSessionKey?: string; + requestedSessionKey?: string; +}): string | undefined { + const raw = params.requestedSessionKey?.trim(); + if (!raw) { + return undefined; + } + const lowered = raw.toLowerCase(); + if (lowered === "global" || lowered === "unknown") { + return lowered; + } + if (lowered.startsWith("agent:")) { + const parsed = parseAgentSessionKey(lowered); + if (parsed) { + const canonicalMainKey = resolveAgentMainSessionKey({ + cfg: params.cfg, + agentId: parsed.agentId, + }).toLowerCase(); + const configuredMainAlias = canonicalMainKey.split(":").slice(2).join(":"); + if (parsed.rest === "main" || parsed.rest === configuredMainAlias) { + return canonicalMainKey; + } + } + return lowered; + } + const activeAgentId = + (params.activeSessionKey ? resolveAgentIdFromSessionKey(params.activeSessionKey) : undefined) ?? + resolveAgentIdFromSessionKey(lowered); + if (!activeAgentId) { + return lowered; + } + const canonicalMainKey = resolveAgentMainSessionKey({ + cfg: params.cfg, + agentId: activeAgentId, + }).toLowerCase(); + const configuredMainAlias = canonicalMainKey.split(":").slice(2).join(":"); + if (lowered === "main" || lowered === configuredMainAlias) { + return canonicalMainKey; + } + return `agent:${normalizeAgentId(activeAgentId)}:${lowered}`; +} + function emitSessionsChanged( context: Pick< GatewayRequestHandlerOptions["context"], @@ -142,6 +192,7 @@ function dispatchAgentRunFromGateway(params: { ingressOpts: Parameters[0]; runId: string; idempotencyKey: string; + abortController: AbortController; respond: GatewayRequestHandlerOptions["respond"]; context: GatewayRequestHandlerOptions["context"]; }) { @@ -187,6 +238,12 @@ function dispatchAgentRunFromGateway(params: { runId: params.runId, error: formatForLog(err), }); + }) + .finally(() => { + const active = params.context.agentAbortControllers.get(params.runId); + if (active?.controller === params.abortController) { + params.context.agentAbortControllers.delete(params.runId); + } }); } @@ -625,11 +682,33 @@ export const agentHandlers: GatewayRequestHandlers = { const deliver = request.deliver === true && resolvedChannel !== INTERNAL_MESSAGE_CHANNEL; + const acceptedAt = Date.now(); const accepted = { runId, status: "accepted" as const, - acceptedAt: Date.now(), + acceptedAt, }; + const agentAbortController = new AbortController(); + const requestLane = typeof request.lane === "string" ? request.lane.trim() : ""; + const resolvedLane = requestLane ? resolveGlobalLane(requestLane) : ""; + // Record the resolved lane for abort cleanup, but only if it's a custom lane + // (i.e. the caller explicitly named it and it wasn't remapped to a builtin). + const isClearableLane = Boolean(resolvedLane && resolvedLane === requestLane); + context.agentAbortControllers.set(runId, { + controller: agentAbortController, + sessionKey: resolvedSessionKey, + startedAtMs: acceptedAt, + expiresAtMs: resolveAgentRunExpiresAtMs({ + now: acceptedAt, + timeoutMs: resolveAgentTimeoutMs({ + cfg: cfgForAgent ?? cfg, + overrideSeconds: request.timeout, + // Subagent-lane runs default to no timeout (matching agent-command.ts behavior). + ...(request.lane === "subagent" && request.timeout == null ? { overrideMs: 0 } : {}), + }), + }), + ...(isClearableLane ? { clearableLanes: [resolvedLane] } : {}), + }); // Store an in-flight ack so retries do not spawn a second run. setGatewayDedupeEntry({ dedupe: context.dedupe, @@ -699,6 +778,7 @@ export const agentHandlers: GatewayRequestHandlers = { extraSystemPrompt: request.extraSystemPrompt, internalEvents: request.internalEvents, inputProvenance, + abortSignal: agentAbortController.signal, // Internal-only: allow workspace override for spawned subagent runs. workspaceDir: resolveIngressWorkspaceOverrideForSpawnedRun({ spawnedBy: spawnedByValue, @@ -709,10 +789,51 @@ export const agentHandlers: GatewayRequestHandlers = { }, runId, idempotencyKey: idem, + abortController: agentAbortController, respond, context, }); }, + "agent.enqueue": async ({ req, params, respond, context, client, isWebchatConnect }) => { + await agentHandlers.agent({ req, params, respond, context, client, isWebchatConnect }); + }, + "agent.abort": ({ params, respond, context }) => { + if (!validateAgentAbortParams(params)) { + respond( + false, + undefined, + errorShape( + ErrorCodes.INVALID_REQUEST, + `invalid agent.abort params: ${formatValidationErrors(validateAgentAbortParams.errors)}`, + ), + ); + return; + } + const request = params as { runId: string; sessionKey?: string }; + const runId = request.runId; + const activeSessionKey = context.agentAbortControllers.get(runId)?.sessionKey; + const sessionKey = canonicalizeAbortSessionKey({ + cfg: loadConfig(), + activeSessionKey, + requestedSessionKey: + typeof request.sessionKey === "string" && request.sessionKey.trim() + ? request.sessionKey.trim() + : undefined, + }); + const result = abortAgentRunById( + { + agentAbortControllers: context.agentAbortControllers, + runId, + sessionKey, + reason: "rpc", + }, + { + clearSessionLane: (key) => clearCommandLane(resolveSessionLane(key), { force: true }), + clearLane: (lane) => clearCommandLane(lane, { force: true }), + }, + ); + respond(true, { ok: true, aborted: result.aborted, runId }); + }, "agent.identity.get": ({ params, respond }) => { if (!validateAgentIdentityParams(params)) { respond( diff --git a/src/gateway/server-methods/types.ts b/src/gateway/server-methods/types.ts index 39a6f458a5f..d0a8276568b 100644 --- a/src/gateway/server-methods/types.ts +++ b/src/gateway/server-methods/types.ts @@ -4,6 +4,7 @@ import type { HealthSummary } from "../../commands/health.js"; import type { CronService } from "../../cron/service.js"; import type { createSubsystemLogger } from "../../logging/subsystem.js"; import type { WizardSession } from "../../wizard/session.js"; +import type { AgentAbortControllerEntry } from "../agent-abort.js"; import type { ChatAbortControllerEntry } from "../chat-abort.js"; import type { ExecApprovalManager } from "../exec-approval-manager.js"; import type { NodeRegistry } from "../node-registry.js"; @@ -57,6 +58,7 @@ export type GatewayRequestContext = { hasExecApprovalClients?: () => boolean; nodeRegistry: NodeRegistry; agentRunSeq: Map; + agentAbortControllers: Map; chatAbortControllers: Map; chatAbortedRuns: Map; chatRunBuffers: Map; diff --git a/src/gateway/server-runtime-state.ts b/src/gateway/server-runtime-state.ts index 50bc491f421..3ea22290103 100644 --- a/src/gateway/server-runtime-state.ts +++ b/src/gateway/server-runtime-state.ts @@ -11,6 +11,7 @@ import { resolveActivePluginHttpRouteRegistry, } from "../plugins/runtime.js"; import type { RuntimeEnv } from "../runtime.js"; +import type { AgentAbortControllerEntry } from "./agent-abort.js"; import type { AuthRateLimiter } from "./auth-rate-limit.js"; import type { ResolvedGatewayAuth } from "./auth.js"; import type { ChatAbortControllerEntry } from "./chat-abort.js"; @@ -84,6 +85,7 @@ export async function createGatewayRuntimeState(params: { broadcast: GatewayBroadcastFn; broadcastToConnIds: GatewayBroadcastToConnIdsFn; agentRunSeq: Map; + agentAbortControllers: Map; dedupe: Map; chatRunState: ReturnType; chatRunBuffers: Map; @@ -216,6 +218,7 @@ export async function createGatewayRuntimeState(params: { } const agentRunSeq = new Map(); + const agentAbortControllers = new Map(); const dedupe = new Map(); const chatRunState = createChatRunState(); const chatRunRegistry = chatRunState.registry; @@ -237,6 +240,7 @@ export async function createGatewayRuntimeState(params: { broadcast, broadcastToConnIds, agentRunSeq, + agentAbortControllers, dedupe, chatRunState, chatRunBuffers, diff --git a/src/gateway/server.impl.ts b/src/gateway/server.impl.ts index 7a4c18b6593..5400c099c29 100644 --- a/src/gateway/server.impl.ts +++ b/src/gateway/server.impl.ts @@ -685,6 +685,7 @@ export async function startGatewayServer( broadcast, broadcastToConnIds, agentRunSeq, + agentAbortControllers, dedupe, chatRunState, chatRunBuffers, @@ -810,6 +811,7 @@ export async function startGatewayServer( refreshGatewayHealthSnapshot, logHealth, dedupe, + agentAbortControllers, chatAbortControllers, chatRunState, chatRunBuffers, @@ -1095,6 +1097,7 @@ export async function startGatewayServer( }, nodeRegistry, agentRunSeq, + agentAbortControllers, chatAbortControllers, chatAbortedRuns: chatRunState.abortedRuns, chatRunBuffers: chatRunState.buffers, diff --git a/src/process/command-queue.ts b/src/process/command-queue.ts index 956b386a6bf..8a3b45e12d9 100644 --- a/src/process/command-queue.ts +++ b/src/process/command-queue.ts @@ -220,7 +220,7 @@ export function getTotalQueueSize() { return total; } -export function clearCommandLane(lane: string = CommandLane.Main) { +export function clearCommandLane(lane: string = CommandLane.Main, opts?: { force?: boolean }) { const cleaned = lane.trim() || CommandLane.Main; const state = queueState.lanes.get(cleaned); if (!state) { @@ -231,6 +231,11 @@ export function clearCommandLane(lane: string = CommandLane.Main) { for (const entry of pending) { entry.reject(new CommandLaneClearedError(cleaned)); } + if (opts?.force) { + state.generation += 1; + state.activeTaskIds.clear(); + state.draining = false; + } return removed; }