Gateway: add agent abort support with session ownership and lane cleanup
This commit is contained in:
parent
6b4c24c2e5
commit
1f9d487e7a
@ -630,6 +630,24 @@ public struct AgentParams: Codable, Sendable {
|
||||
}
|
||||
}
|
||||
|
||||
public struct AgentAbortParams: Codable, Sendable {
|
||||
public let runid: String
|
||||
public let sessionkey: String?
|
||||
|
||||
public init(
|
||||
runid: String,
|
||||
sessionkey: String?)
|
||||
{
|
||||
self.runid = runid
|
||||
self.sessionkey = sessionkey
|
||||
}
|
||||
|
||||
private enum CodingKeys: String, CodingKey {
|
||||
case runid = "runId"
|
||||
case sessionkey = "sessionKey"
|
||||
}
|
||||
}
|
||||
|
||||
public struct AgentIdentityParams: Codable, Sendable {
|
||||
public let agentid: String?
|
||||
public let sessionkey: String?
|
||||
|
||||
@ -630,6 +630,24 @@ public struct AgentParams: Codable, Sendable {
|
||||
}
|
||||
}
|
||||
|
||||
public struct AgentAbortParams: Codable, Sendable {
|
||||
public let runid: String
|
||||
public let sessionkey: String?
|
||||
|
||||
public init(
|
||||
runid: String,
|
||||
sessionkey: String?)
|
||||
{
|
||||
self.runid = runid
|
||||
self.sessionkey = sessionkey
|
||||
}
|
||||
|
||||
private enum CodingKeys: String, CodingKey {
|
||||
case runid = "runId"
|
||||
case sessionkey = "sessionKey"
|
||||
}
|
||||
}
|
||||
|
||||
public struct AgentIdentityParams: Codable, Sendable {
|
||||
public let agentid: String?
|
||||
public let sessionkey: String?
|
||||
|
||||
68
src/gateway/agent-abort.ts
Normal file
68
src/gateway/agent-abort.ts
Normal file
@ -0,0 +1,68 @@
|
||||
export type AgentAbortControllerEntry = {
|
||||
controller: AbortController;
|
||||
sessionKey?: string;
|
||||
startedAtMs: number;
|
||||
expiresAtMs: number;
|
||||
clearableLanes?: string[];
|
||||
};
|
||||
|
||||
export function resolveAgentRunExpiresAtMs(params: {
|
||||
now: number;
|
||||
timeoutMs?: number;
|
||||
graceMs?: number;
|
||||
minMs?: number;
|
||||
maxMs?: number;
|
||||
}): number {
|
||||
const {
|
||||
now,
|
||||
timeoutMs = 0,
|
||||
graceMs = 60_000,
|
||||
minMs = 2 * 60_000,
|
||||
maxMs = 24 * 60 * 60_000,
|
||||
} = params;
|
||||
const boundedTimeoutMs = Math.max(0, timeoutMs);
|
||||
// Preserve no-timeout semantics: if the caller explicitly requested a timeout
|
||||
// longer than the default max (including timeout=0 → MAX_SAFE_TIMEOUT_MS),
|
||||
// skip the 24h cap to avoid force-aborting long-running workflows.
|
||||
if (boundedTimeoutMs > maxMs) {
|
||||
return now + boundedTimeoutMs + graceMs;
|
||||
}
|
||||
const target = now + boundedTimeoutMs + graceMs;
|
||||
const min = now + minMs;
|
||||
const max = now + maxMs;
|
||||
return Math.min(max, Math.max(min, target));
|
||||
}
|
||||
|
||||
export function abortAgentRunById(
|
||||
params: {
|
||||
agentAbortControllers: Map<string, AgentAbortControllerEntry>;
|
||||
runId: string;
|
||||
sessionKey?: string;
|
||||
reason?: unknown;
|
||||
},
|
||||
laneCleaner?: {
|
||||
clearSessionLane: (sessionKey: string) => void;
|
||||
clearLane: (lane: string) => void;
|
||||
},
|
||||
): { aborted: boolean } {
|
||||
const active = params.agentAbortControllers.get(params.runId);
|
||||
if (!active) {
|
||||
return { aborted: false };
|
||||
}
|
||||
if (active.sessionKey && params.sessionKey !== active.sessionKey) {
|
||||
return { aborted: false };
|
||||
}
|
||||
active.controller.abort(params.reason);
|
||||
params.agentAbortControllers.delete(params.runId);
|
||||
if (laneCleaner) {
|
||||
if (active.sessionKey) {
|
||||
laneCleaner.clearSessionLane(active.sessionKey);
|
||||
}
|
||||
if (active.clearableLanes) {
|
||||
for (const lane of active.clearableLanes) {
|
||||
laneCleaner.clearLane(lane);
|
||||
}
|
||||
}
|
||||
}
|
||||
return { aborted: true };
|
||||
}
|
||||
@ -50,6 +50,16 @@ describe("operator scope authorization", () => {
|
||||
});
|
||||
});
|
||||
|
||||
it("requires admin for agent.abort", () => {
|
||||
expect(authorizeOperatorScopesForMethod("agent.abort", ["operator.write"])).toEqual({
|
||||
allowed: false,
|
||||
missingScope: "operator.admin",
|
||||
});
|
||||
expect(authorizeOperatorScopesForMethod("agent.abort", ["operator.admin"])).toEqual({
|
||||
allowed: true,
|
||||
});
|
||||
});
|
||||
|
||||
it("requires approvals scope for approval methods", () => {
|
||||
expect(authorizeOperatorScopesForMethod("exec.approval.resolve", ["operator.write"])).toEqual({
|
||||
allowed: false,
|
||||
|
||||
@ -95,6 +95,7 @@ const METHOD_SCOPE_GROUPS: Record<OperatorScope, readonly string[]> = {
|
||||
"send",
|
||||
"poll",
|
||||
"agent",
|
||||
"agent.enqueue",
|
||||
"agent.wait",
|
||||
"wake",
|
||||
"talk.mode",
|
||||
@ -115,6 +116,7 @@ const METHOD_SCOPE_GROUPS: Record<OperatorScope, readonly string[]> = {
|
||||
"node.pending.enqueue",
|
||||
],
|
||||
[ADMIN_SCOPE]: [
|
||||
"agent.abort",
|
||||
"channels.logout",
|
||||
"agents.create",
|
||||
"agents.update",
|
||||
|
||||
@ -1,6 +1,8 @@
|
||||
import AjvPkg, { type ErrorObject } from "ajv";
|
||||
import type { SessionsPatchResult } from "../session-utils.types.js";
|
||||
import {
|
||||
type AgentAbortParams,
|
||||
AgentAbortParamsSchema,
|
||||
type AgentEvent,
|
||||
AgentEventSchema,
|
||||
type AgentIdentityParams,
|
||||
@ -277,6 +279,7 @@ export const validateEventFrame = ajv.compile<EventFrame>(EventFrameSchema);
|
||||
export const validateSendParams = ajv.compile(SendParamsSchema);
|
||||
export const validatePollParams = ajv.compile<PollParams>(PollParamsSchema);
|
||||
export const validateAgentParams = ajv.compile(AgentParamsSchema);
|
||||
export const validateAgentAbortParams = ajv.compile<AgentAbortParams>(AgentAbortParamsSchema);
|
||||
export const validateAgentIdentityParams =
|
||||
ajv.compile<AgentIdentityParams>(AgentIdentityParamsSchema);
|
||||
export const validateAgentWaitParams = ajv.compile<AgentWaitParams>(AgentWaitParamsSchema);
|
||||
@ -496,6 +499,7 @@ export {
|
||||
ErrorShapeSchema,
|
||||
StateVersionSchema,
|
||||
AgentEventSchema,
|
||||
AgentAbortParamsSchema,
|
||||
ChatEventSchema,
|
||||
SendParamsSchema,
|
||||
PollParamsSchema,
|
||||
@ -608,6 +612,7 @@ export type {
|
||||
ErrorShape,
|
||||
StateVersion,
|
||||
AgentEvent,
|
||||
AgentAbortParams,
|
||||
AgentIdentityParams,
|
||||
AgentIdentityResult,
|
||||
AgentWaitParams,
|
||||
|
||||
@ -130,6 +130,14 @@ export const AgentWaitParamsSchema = Type.Object(
|
||||
{ additionalProperties: false },
|
||||
);
|
||||
|
||||
export const AgentAbortParamsSchema = Type.Object(
|
||||
{
|
||||
runId: NonEmptyString,
|
||||
sessionKey: Type.Optional(Type.String()),
|
||||
},
|
||||
{ additionalProperties: false },
|
||||
);
|
||||
|
||||
export const WakeParamsSchema = Type.Object(
|
||||
{
|
||||
mode: Type.Union([Type.Literal("now"), Type.Literal("next-heartbeat")]),
|
||||
|
||||
@ -1,5 +1,6 @@
|
||||
import type { TSchema } from "@sinclair/typebox";
|
||||
import {
|
||||
AgentAbortParamsSchema,
|
||||
AgentEventSchema,
|
||||
AgentIdentityParamsSchema,
|
||||
AgentIdentityResultSchema,
|
||||
@ -181,6 +182,7 @@ export const ProtocolSchemas = {
|
||||
SendParams: SendParamsSchema,
|
||||
PollParams: PollParamsSchema,
|
||||
AgentParams: AgentParamsSchema,
|
||||
AgentAbortParams: AgentAbortParamsSchema,
|
||||
AgentIdentityParams: AgentIdentityParamsSchema,
|
||||
AgentIdentityResult: AgentIdentityResultSchema,
|
||||
AgentWaitParams: AgentWaitParamsSchema,
|
||||
|
||||
@ -18,6 +18,7 @@ export type AgentEvent = SchemaType<"AgentEvent">;
|
||||
export type AgentIdentityParams = SchemaType<"AgentIdentityParams">;
|
||||
export type AgentIdentityResult = SchemaType<"AgentIdentityResult">;
|
||||
export type PollParams = SchemaType<"PollParams">;
|
||||
export type AgentAbortParams = SchemaType<"AgentAbortParams">;
|
||||
export type AgentWaitParams = SchemaType<"AgentWaitParams">;
|
||||
export type WakeParams = SchemaType<"WakeParams">;
|
||||
export type NodePairRequestParams = SchemaType<"NodePairRequestParams">;
|
||||
|
||||
@ -22,6 +22,7 @@ function createMaintenanceTimerDeps() {
|
||||
refreshGatewayHealthSnapshot: async () => ({ ok: true }) as HealthSummary,
|
||||
logHealth: { error: () => {} },
|
||||
dedupe: new Map(),
|
||||
agentAbortControllers: new Map(),
|
||||
chatAbortControllers: new Map(),
|
||||
chatRunState: { abortedRuns: new Map() },
|
||||
chatRunBuffers: new Map(),
|
||||
|
||||
@ -1,5 +1,8 @@
|
||||
import { resolveSessionLane } from "../agents/pi-embedded-runner/lanes.js";
|
||||
import type { HealthSummary } from "../commands/health.js";
|
||||
import { cleanOldMedia } from "../media/store.js";
|
||||
import { clearCommandLane } from "../process/command-queue.js";
|
||||
import { abortAgentRunById, type AgentAbortControllerEntry } from "./agent-abort.js";
|
||||
import { abortChatRunById, type ChatAbortControllerEntry } from "./chat-abort.js";
|
||||
import type { ChatRunEntry } from "./server-chat.js";
|
||||
import {
|
||||
@ -27,6 +30,7 @@ export function startGatewayMaintenanceTimers(params: {
|
||||
refreshGatewayHealthSnapshot: (opts?: { probe?: boolean }) => Promise<HealthSummary>;
|
||||
logHealth: { error: (msg: string) => void };
|
||||
dedupe: Map<string, DedupeEntry>;
|
||||
agentAbortControllers: Map<string, AgentAbortControllerEntry>;
|
||||
chatAbortControllers: Map<string, ChatAbortControllerEntry>;
|
||||
chatRunState: { abortedRuns: Map<string, number> };
|
||||
chatRunBuffers: Map<string, string>;
|
||||
@ -102,6 +106,24 @@ export function startGatewayMaintenanceTimers(params: {
|
||||
}
|
||||
}
|
||||
|
||||
for (const [runId, entry] of params.agentAbortControllers) {
|
||||
if (now <= entry.expiresAtMs) {
|
||||
continue;
|
||||
}
|
||||
abortAgentRunById(
|
||||
{
|
||||
agentAbortControllers: params.agentAbortControllers,
|
||||
runId,
|
||||
sessionKey: entry.sessionKey,
|
||||
reason: "timeout",
|
||||
},
|
||||
{
|
||||
clearSessionLane: (key) => clearCommandLane(resolveSessionLane(key), { force: true }),
|
||||
clearLane: (lane) => clearCommandLane(lane, { force: true }),
|
||||
},
|
||||
);
|
||||
}
|
||||
|
||||
for (const [runId, entry] of params.chatAbortControllers) {
|
||||
if (now <= entry.expiresAtMs) {
|
||||
continue;
|
||||
|
||||
@ -104,6 +104,8 @@ const BASE_METHODS = [
|
||||
"system-event",
|
||||
"send",
|
||||
"agent",
|
||||
"agent.enqueue",
|
||||
"agent.abort",
|
||||
"agent.identity.get",
|
||||
"agent.wait",
|
||||
"browser.request",
|
||||
|
||||
@ -76,6 +76,13 @@ vi.mock("../session-reset-service.js", () => ({
|
||||
(mocks.performGatewaySessionReset as (...args: unknown[]) => unknown)(...args),
|
||||
}));
|
||||
|
||||
vi.mock("../../process/command-queue.js", () => ({
|
||||
clearCommandLane: vi.fn(),
|
||||
enqueueCommandInLane: vi.fn((_lane: unknown, task: () => unknown) => task()),
|
||||
CommandLaneClearedError: class extends Error { name = "CommandLaneClearedError"; },
|
||||
GatewayDrainingError: class extends Error { name = "GatewayDrainingError"; },
|
||||
}));
|
||||
|
||||
vi.mock("../../sessions/send-policy.js", () => ({
|
||||
resolveSendPolicy: () => "allow",
|
||||
}));
|
||||
@ -92,6 +99,8 @@ vi.mock("../../utils/delivery-context.js", async () => {
|
||||
|
||||
const makeContext = (): GatewayRequestContext =>
|
||||
({
|
||||
agentAbortControllers: new Map(),
|
||||
chatAbortControllers: new Map(),
|
||||
dedupe: new Map(),
|
||||
addChatRun: vi.fn(),
|
||||
logGateway: { info: vi.fn(), error: vi.fn() },
|
||||
@ -508,6 +517,8 @@ describe("gateway agent handler", () => {
|
||||
{
|
||||
respond,
|
||||
context: {
|
||||
agentAbortControllers: new Map(),
|
||||
chatAbortControllers: new Map(),
|
||||
dedupe: new Map(),
|
||||
addChatRun: vi.fn(),
|
||||
logGateway: { info: vi.fn(), error: vi.fn() },
|
||||
@ -747,6 +758,179 @@ describe("gateway agent handler", () => {
|
||||
expect(capturedEntry?.claudeCliSessionId).toBeUndefined();
|
||||
});
|
||||
|
||||
it("aborts active agent runs by run id", () => {
|
||||
const respond = vi.fn();
|
||||
const controller = new AbortController();
|
||||
const context = makeContext();
|
||||
context.agentAbortControllers.set("run-1", {
|
||||
controller,
|
||||
sessionKey: "agent:main:main",
|
||||
startedAtMs: Date.now(),
|
||||
expiresAtMs: Date.now() + 60_000,
|
||||
});
|
||||
|
||||
void agentHandlers["agent.abort"]({
|
||||
params: { runId: "run-1", sessionKey: "agent:main:main" },
|
||||
respond: respond as never,
|
||||
context,
|
||||
req: { type: "req", id: "agent-abort-1", method: "agent.abort" },
|
||||
client: null,
|
||||
isWebchatConnect: () => false,
|
||||
});
|
||||
|
||||
expect(controller.signal.aborted).toBe(true);
|
||||
expect(context.agentAbortControllers.has("run-1")).toBe(false);
|
||||
expect(respond).toHaveBeenCalledWith(true, { ok: true, aborted: true, runId: "run-1" });
|
||||
mocks.loadConfigReturn = {};
|
||||
});
|
||||
|
||||
it("does not abort a session-bound run when sessionKey is omitted", async () => {
|
||||
const respond = vi.fn();
|
||||
const controller = new AbortController();
|
||||
const context = makeContext();
|
||||
context.agentAbortControllers.set("run-1", {
|
||||
controller,
|
||||
sessionKey: "agent:main:main",
|
||||
startedAtMs: Date.now(),
|
||||
expiresAtMs: Date.now() + 60_000,
|
||||
});
|
||||
|
||||
await agentHandlers["agent.abort"]({
|
||||
params: { runId: "run-1" },
|
||||
respond: respond as never,
|
||||
context,
|
||||
req: { type: "req", id: "agent-abort-missing-session", method: "agent.abort" },
|
||||
client: null,
|
||||
isWebchatConnect: () => false,
|
||||
});
|
||||
|
||||
expect(controller.signal.aborted).toBe(false);
|
||||
expect(context.agentAbortControllers.has("run-1")).toBe(true);
|
||||
expect(respond).toHaveBeenCalledWith(true, { ok: true, aborted: false, runId: "run-1" });
|
||||
});
|
||||
|
||||
it("canonicalizes alias session keys before aborting agent runs", async () => {
|
||||
mocks.loadConfigReturn = { session: { mainKey: "work" } };
|
||||
const respond = vi.fn();
|
||||
const controller = new AbortController();
|
||||
const context = makeContext();
|
||||
context.agentAbortControllers.set("run-1", {
|
||||
controller,
|
||||
sessionKey: "agent:main:work",
|
||||
startedAtMs: Date.now(),
|
||||
expiresAtMs: Date.now() + 60_000,
|
||||
});
|
||||
|
||||
await agentHandlers["agent.abort"]({
|
||||
params: { runId: "run-1", sessionKey: "main" },
|
||||
respond: respond as never,
|
||||
context,
|
||||
req: { type: "req", id: "agent-abort-alias-session", method: "agent.abort" },
|
||||
client: null,
|
||||
isWebchatConnect: () => false,
|
||||
});
|
||||
|
||||
expect(controller.signal.aborted).toBe(true);
|
||||
expect(context.agentAbortControllers.has("run-1")).toBe(false);
|
||||
expect(respond).toHaveBeenCalledWith(true, { ok: true, aborted: true, runId: "run-1" });
|
||||
mocks.loadConfigReturn = {};
|
||||
});
|
||||
|
||||
it("clears session lane and custom clearable lanes on successful abort", async () => {
|
||||
const { clearCommandLane } = await import("../../process/command-queue.js");
|
||||
(clearCommandLane as ReturnType<typeof vi.fn>).mockClear();
|
||||
const respond = vi.fn();
|
||||
const controller = new AbortController();
|
||||
const context = makeContext();
|
||||
context.agentAbortControllers.set("run-lane", {
|
||||
controller,
|
||||
sessionKey: "agent:main:main",
|
||||
sessionId: "test-session",
|
||||
startedAtMs: Date.now(),
|
||||
expiresAtMs: Date.now() + 60_000,
|
||||
clearableLanes: ["custom:test-lane"],
|
||||
});
|
||||
|
||||
await agentHandlers["agent.abort"]({
|
||||
params: { runId: "run-lane", sessionKey: "agent:main:main" },
|
||||
respond: respond as never,
|
||||
context,
|
||||
req: { type: "req", id: "agent-abort-lane", method: "agent.abort" },
|
||||
client: null,
|
||||
isWebchatConnect: () => false,
|
||||
});
|
||||
|
||||
expect(controller.signal.aborted).toBe(true);
|
||||
expect(respond).toHaveBeenCalledWith(true, { ok: true, aborted: true, runId: "run-lane" });
|
||||
const clearCalls = (clearCommandLane as ReturnType<typeof vi.fn>).mock.calls;
|
||||
expect(clearCalls.some((c: unknown[]) => c[0] === "session:agent:main:main")).toBe(true);
|
||||
expect(clearCalls.some((c: unknown[]) => c[0] === "custom:test-lane")).toBe(true);
|
||||
});
|
||||
|
||||
it("keeps a newer abort controller entry when an older run with the same id finishes", async () => {
|
||||
mockMainSessionEntry({ sessionId: "existing-session-id" });
|
||||
mocks.updateSessionStore.mockResolvedValue(undefined);
|
||||
|
||||
let resolveOlderRun:
|
||||
| ((value: { payloads: Array<{ text: string }>; meta: { durationMs: number } }) => void)
|
||||
| undefined;
|
||||
mocks.agentCommand.mockImplementationOnce(
|
||||
() =>
|
||||
new Promise((resolve) => {
|
||||
resolveOlderRun = resolve;
|
||||
}),
|
||||
);
|
||||
|
||||
const context = makeContext();
|
||||
const respond = vi.fn();
|
||||
await invokeAgent(
|
||||
{
|
||||
message: "older run",
|
||||
agentId: "main",
|
||||
sessionKey: "agent:main:main",
|
||||
idempotencyKey: "shared-run-id",
|
||||
},
|
||||
{ respond, reqId: "shared-run-id", context },
|
||||
);
|
||||
|
||||
const olderEntry = context.agentAbortControllers.get("shared-run-id");
|
||||
expect(olderEntry).toBeDefined();
|
||||
|
||||
const newerController = new AbortController();
|
||||
context.agentAbortControllers.set("shared-run-id", {
|
||||
controller: newerController,
|
||||
sessionKey: "agent:main:main",
|
||||
startedAtMs: Date.now(),
|
||||
expiresAtMs: Date.now() + 60_000,
|
||||
});
|
||||
|
||||
resolveOlderRun?.({
|
||||
payloads: [{ text: "ok" }],
|
||||
meta: { durationMs: 100 },
|
||||
});
|
||||
await vi.waitFor(() => expect(respond).toHaveBeenCalledTimes(2));
|
||||
|
||||
expect(context.agentAbortControllers.get("shared-run-id")?.controller).toBe(newerController);
|
||||
|
||||
const abortRespond = vi.fn();
|
||||
await agentHandlers["agent.abort"]({
|
||||
params: { runId: "shared-run-id", sessionKey: "agent:main:main" },
|
||||
respond: abortRespond as never,
|
||||
context,
|
||||
req: { type: "req", id: "agent-abort-shared", method: "agent.abort" },
|
||||
client: null,
|
||||
isWebchatConnect: () => false,
|
||||
});
|
||||
|
||||
expect(newerController.signal.aborted).toBe(true);
|
||||
expect(context.agentAbortControllers.has("shared-run-id")).toBe(false);
|
||||
expect(abortRespond).toHaveBeenCalledWith(true, {
|
||||
ok: true,
|
||||
aborted: true,
|
||||
runId: "shared-run-id",
|
||||
});
|
||||
});
|
||||
|
||||
it("prunes legacy main alias keys when writing a canonical session entry", async () => {
|
||||
mocks.loadSessionEntry.mockReturnValue({
|
||||
cfg: {
|
||||
|
||||
@ -5,6 +5,9 @@ import {
|
||||
normalizeSpawnedRunMetadata,
|
||||
resolveIngressWorkspaceOverrideForSpawnedRun,
|
||||
} from "../../agents/spawned-context.js";
|
||||
import { resolveGlobalLane, resolveSessionLane } from "../../agents/pi-embedded-runner/lanes.js";
|
||||
import { resolveAgentTimeoutMs } from "../../agents/timeout.js";
|
||||
import { clearCommandLane } from "../../process/command-queue.js";
|
||||
import { buildBareSessionResetPrompt } from "../../auto-reply/reply/session-reset-prompt.js";
|
||||
import { agentCommandFromIngress } from "../../commands/agent.js";
|
||||
import { loadConfig } from "../../config/config.js";
|
||||
@ -26,6 +29,7 @@ import { classifySessionKeyShape, normalizeAgentId } from "../../routing/session
|
||||
import { defaultRuntime } from "../../runtime.js";
|
||||
import { normalizeInputProvenance, type InputProvenance } from "../../sessions/input-provenance.js";
|
||||
import { resolveSendPolicy } from "../../sessions/send-policy.js";
|
||||
import { parseAgentSessionKey } from "../../sessions/session-key-utils.js";
|
||||
import { normalizeSessionDeliveryFields } from "../../utils/delivery-context.js";
|
||||
import {
|
||||
INTERNAL_MESSAGE_CHANNEL,
|
||||
@ -33,6 +37,7 @@ import {
|
||||
isGatewayMessageChannel,
|
||||
normalizeMessageChannel,
|
||||
} from "../../utils/message-channel.js";
|
||||
import { abortAgentRunById, resolveAgentRunExpiresAtMs } from "../agent-abort.js";
|
||||
import { resolveAssistantIdentity } from "../assistant-identity.js";
|
||||
import { parseMessageWithAttachments } from "../chat-attachments.js";
|
||||
import { resolveAssistantAvatarUrl } from "../control-ui-shared.js";
|
||||
@ -42,6 +47,7 @@ import {
|
||||
ErrorCodes,
|
||||
errorShape,
|
||||
formatValidationErrors,
|
||||
validateAgentAbortParams,
|
||||
validateAgentIdentityParams,
|
||||
validateAgentParams,
|
||||
validateAgentWaitParams,
|
||||
@ -101,6 +107,50 @@ async function runSessionResetFromAgent(params: {
|
||||
};
|
||||
}
|
||||
|
||||
function canonicalizeAbortSessionKey(params: {
|
||||
cfg: ReturnType<typeof loadConfig>;
|
||||
activeSessionKey?: string;
|
||||
requestedSessionKey?: string;
|
||||
}): string | undefined {
|
||||
const raw = params.requestedSessionKey?.trim();
|
||||
if (!raw) {
|
||||
return undefined;
|
||||
}
|
||||
const lowered = raw.toLowerCase();
|
||||
if (lowered === "global" || lowered === "unknown") {
|
||||
return lowered;
|
||||
}
|
||||
if (lowered.startsWith("agent:")) {
|
||||
const parsed = parseAgentSessionKey(lowered);
|
||||
if (parsed) {
|
||||
const canonicalMainKey = resolveAgentMainSessionKey({
|
||||
cfg: params.cfg,
|
||||
agentId: parsed.agentId,
|
||||
}).toLowerCase();
|
||||
const configuredMainAlias = canonicalMainKey.split(":").slice(2).join(":");
|
||||
if (parsed.rest === "main" || parsed.rest === configuredMainAlias) {
|
||||
return canonicalMainKey;
|
||||
}
|
||||
}
|
||||
return lowered;
|
||||
}
|
||||
const activeAgentId =
|
||||
(params.activeSessionKey ? resolveAgentIdFromSessionKey(params.activeSessionKey) : undefined) ??
|
||||
resolveAgentIdFromSessionKey(lowered);
|
||||
if (!activeAgentId) {
|
||||
return lowered;
|
||||
}
|
||||
const canonicalMainKey = resolveAgentMainSessionKey({
|
||||
cfg: params.cfg,
|
||||
agentId: activeAgentId,
|
||||
}).toLowerCase();
|
||||
const configuredMainAlias = canonicalMainKey.split(":").slice(2).join(":");
|
||||
if (lowered === "main" || lowered === configuredMainAlias) {
|
||||
return canonicalMainKey;
|
||||
}
|
||||
return `agent:${normalizeAgentId(activeAgentId)}:${lowered}`;
|
||||
}
|
||||
|
||||
function emitSessionsChanged(
|
||||
context: Pick<
|
||||
GatewayRequestHandlerOptions["context"],
|
||||
@ -142,6 +192,7 @@ function dispatchAgentRunFromGateway(params: {
|
||||
ingressOpts: Parameters<typeof agentCommandFromIngress>[0];
|
||||
runId: string;
|
||||
idempotencyKey: string;
|
||||
abortController: AbortController;
|
||||
respond: GatewayRequestHandlerOptions["respond"];
|
||||
context: GatewayRequestHandlerOptions["context"];
|
||||
}) {
|
||||
@ -187,6 +238,12 @@ function dispatchAgentRunFromGateway(params: {
|
||||
runId: params.runId,
|
||||
error: formatForLog(err),
|
||||
});
|
||||
})
|
||||
.finally(() => {
|
||||
const active = params.context.agentAbortControllers.get(params.runId);
|
||||
if (active?.controller === params.abortController) {
|
||||
params.context.agentAbortControllers.delete(params.runId);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@ -625,11 +682,33 @@ export const agentHandlers: GatewayRequestHandlers = {
|
||||
|
||||
const deliver = request.deliver === true && resolvedChannel !== INTERNAL_MESSAGE_CHANNEL;
|
||||
|
||||
const acceptedAt = Date.now();
|
||||
const accepted = {
|
||||
runId,
|
||||
status: "accepted" as const,
|
||||
acceptedAt: Date.now(),
|
||||
acceptedAt,
|
||||
};
|
||||
const agentAbortController = new AbortController();
|
||||
const requestLane = typeof request.lane === "string" ? request.lane.trim() : "";
|
||||
const resolvedLane = requestLane ? resolveGlobalLane(requestLane) : "";
|
||||
// Record the resolved lane for abort cleanup, but only if it's a custom lane
|
||||
// (i.e. the caller explicitly named it and it wasn't remapped to a builtin).
|
||||
const isClearableLane = Boolean(resolvedLane && resolvedLane === requestLane);
|
||||
context.agentAbortControllers.set(runId, {
|
||||
controller: agentAbortController,
|
||||
sessionKey: resolvedSessionKey,
|
||||
startedAtMs: acceptedAt,
|
||||
expiresAtMs: resolveAgentRunExpiresAtMs({
|
||||
now: acceptedAt,
|
||||
timeoutMs: resolveAgentTimeoutMs({
|
||||
cfg: cfgForAgent ?? cfg,
|
||||
overrideSeconds: request.timeout,
|
||||
// Subagent-lane runs default to no timeout (matching agent-command.ts behavior).
|
||||
...(request.lane === "subagent" && request.timeout == null ? { overrideMs: 0 } : {}),
|
||||
}),
|
||||
}),
|
||||
...(isClearableLane ? { clearableLanes: [resolvedLane] } : {}),
|
||||
});
|
||||
// Store an in-flight ack so retries do not spawn a second run.
|
||||
setGatewayDedupeEntry({
|
||||
dedupe: context.dedupe,
|
||||
@ -699,6 +778,7 @@ export const agentHandlers: GatewayRequestHandlers = {
|
||||
extraSystemPrompt: request.extraSystemPrompt,
|
||||
internalEvents: request.internalEvents,
|
||||
inputProvenance,
|
||||
abortSignal: agentAbortController.signal,
|
||||
// Internal-only: allow workspace override for spawned subagent runs.
|
||||
workspaceDir: resolveIngressWorkspaceOverrideForSpawnedRun({
|
||||
spawnedBy: spawnedByValue,
|
||||
@ -709,10 +789,51 @@ export const agentHandlers: GatewayRequestHandlers = {
|
||||
},
|
||||
runId,
|
||||
idempotencyKey: idem,
|
||||
abortController: agentAbortController,
|
||||
respond,
|
||||
context,
|
||||
});
|
||||
},
|
||||
"agent.enqueue": async ({ req, params, respond, context, client, isWebchatConnect }) => {
|
||||
await agentHandlers.agent({ req, params, respond, context, client, isWebchatConnect });
|
||||
},
|
||||
"agent.abort": ({ params, respond, context }) => {
|
||||
if (!validateAgentAbortParams(params)) {
|
||||
respond(
|
||||
false,
|
||||
undefined,
|
||||
errorShape(
|
||||
ErrorCodes.INVALID_REQUEST,
|
||||
`invalid agent.abort params: ${formatValidationErrors(validateAgentAbortParams.errors)}`,
|
||||
),
|
||||
);
|
||||
return;
|
||||
}
|
||||
const request = params as { runId: string; sessionKey?: string };
|
||||
const runId = request.runId;
|
||||
const activeSessionKey = context.agentAbortControllers.get(runId)?.sessionKey;
|
||||
const sessionKey = canonicalizeAbortSessionKey({
|
||||
cfg: loadConfig(),
|
||||
activeSessionKey,
|
||||
requestedSessionKey:
|
||||
typeof request.sessionKey === "string" && request.sessionKey.trim()
|
||||
? request.sessionKey.trim()
|
||||
: undefined,
|
||||
});
|
||||
const result = abortAgentRunById(
|
||||
{
|
||||
agentAbortControllers: context.agentAbortControllers,
|
||||
runId,
|
||||
sessionKey,
|
||||
reason: "rpc",
|
||||
},
|
||||
{
|
||||
clearSessionLane: (key) => clearCommandLane(resolveSessionLane(key), { force: true }),
|
||||
clearLane: (lane) => clearCommandLane(lane, { force: true }),
|
||||
},
|
||||
);
|
||||
respond(true, { ok: true, aborted: result.aborted, runId });
|
||||
},
|
||||
"agent.identity.get": ({ params, respond }) => {
|
||||
if (!validateAgentIdentityParams(params)) {
|
||||
respond(
|
||||
|
||||
@ -4,6 +4,7 @@ import type { HealthSummary } from "../../commands/health.js";
|
||||
import type { CronService } from "../../cron/service.js";
|
||||
import type { createSubsystemLogger } from "../../logging/subsystem.js";
|
||||
import type { WizardSession } from "../../wizard/session.js";
|
||||
import type { AgentAbortControllerEntry } from "../agent-abort.js";
|
||||
import type { ChatAbortControllerEntry } from "../chat-abort.js";
|
||||
import type { ExecApprovalManager } from "../exec-approval-manager.js";
|
||||
import type { NodeRegistry } from "../node-registry.js";
|
||||
@ -57,6 +58,7 @@ export type GatewayRequestContext = {
|
||||
hasExecApprovalClients?: () => boolean;
|
||||
nodeRegistry: NodeRegistry;
|
||||
agentRunSeq: Map<string, number>;
|
||||
agentAbortControllers: Map<string, AgentAbortControllerEntry>;
|
||||
chatAbortControllers: Map<string, ChatAbortControllerEntry>;
|
||||
chatAbortedRuns: Map<string, number>;
|
||||
chatRunBuffers: Map<string, string>;
|
||||
|
||||
@ -11,6 +11,7 @@ import {
|
||||
resolveActivePluginHttpRouteRegistry,
|
||||
} from "../plugins/runtime.js";
|
||||
import type { RuntimeEnv } from "../runtime.js";
|
||||
import type { AgentAbortControllerEntry } from "./agent-abort.js";
|
||||
import type { AuthRateLimiter } from "./auth-rate-limit.js";
|
||||
import type { ResolvedGatewayAuth } from "./auth.js";
|
||||
import type { ChatAbortControllerEntry } from "./chat-abort.js";
|
||||
@ -84,6 +85,7 @@ export async function createGatewayRuntimeState(params: {
|
||||
broadcast: GatewayBroadcastFn;
|
||||
broadcastToConnIds: GatewayBroadcastToConnIdsFn;
|
||||
agentRunSeq: Map<string, number>;
|
||||
agentAbortControllers: Map<string, AgentAbortControllerEntry>;
|
||||
dedupe: Map<string, DedupeEntry>;
|
||||
chatRunState: ReturnType<typeof createChatRunState>;
|
||||
chatRunBuffers: Map<string, string>;
|
||||
@ -216,6 +218,7 @@ export async function createGatewayRuntimeState(params: {
|
||||
}
|
||||
|
||||
const agentRunSeq = new Map<string, number>();
|
||||
const agentAbortControllers = new Map<string, AgentAbortControllerEntry>();
|
||||
const dedupe = new Map<string, DedupeEntry>();
|
||||
const chatRunState = createChatRunState();
|
||||
const chatRunRegistry = chatRunState.registry;
|
||||
@ -237,6 +240,7 @@ export async function createGatewayRuntimeState(params: {
|
||||
broadcast,
|
||||
broadcastToConnIds,
|
||||
agentRunSeq,
|
||||
agentAbortControllers,
|
||||
dedupe,
|
||||
chatRunState,
|
||||
chatRunBuffers,
|
||||
|
||||
@ -685,6 +685,7 @@ export async function startGatewayServer(
|
||||
broadcast,
|
||||
broadcastToConnIds,
|
||||
agentRunSeq,
|
||||
agentAbortControllers,
|
||||
dedupe,
|
||||
chatRunState,
|
||||
chatRunBuffers,
|
||||
@ -810,6 +811,7 @@ export async function startGatewayServer(
|
||||
refreshGatewayHealthSnapshot,
|
||||
logHealth,
|
||||
dedupe,
|
||||
agentAbortControllers,
|
||||
chatAbortControllers,
|
||||
chatRunState,
|
||||
chatRunBuffers,
|
||||
@ -1095,6 +1097,7 @@ export async function startGatewayServer(
|
||||
},
|
||||
nodeRegistry,
|
||||
agentRunSeq,
|
||||
agentAbortControllers,
|
||||
chatAbortControllers,
|
||||
chatAbortedRuns: chatRunState.abortedRuns,
|
||||
chatRunBuffers: chatRunState.buffers,
|
||||
|
||||
@ -220,7 +220,7 @@ export function getTotalQueueSize() {
|
||||
return total;
|
||||
}
|
||||
|
||||
export function clearCommandLane(lane: string = CommandLane.Main) {
|
||||
export function clearCommandLane(lane: string = CommandLane.Main, opts?: { force?: boolean }) {
|
||||
const cleaned = lane.trim() || CommandLane.Main;
|
||||
const state = queueState.lanes.get(cleaned);
|
||||
if (!state) {
|
||||
@ -231,6 +231,11 @@ export function clearCommandLane(lane: string = CommandLane.Main) {
|
||||
for (const entry of pending) {
|
||||
entry.reject(new CommandLaneClearedError(cleaned));
|
||||
}
|
||||
if (opts?.force) {
|
||||
state.generation += 1;
|
||||
state.activeTaskIds.clear();
|
||||
state.draining = false;
|
||||
}
|
||||
return removed;
|
||||
}
|
||||
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user