Compare commits
3 Commits
main
...
gateway/no
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
f2c544f057 | ||
|
|
c49af9ea7c | ||
|
|
54536e48f5 |
@ -18,6 +18,10 @@ describe("method scope resolution", () => {
|
|||||||
expect(resolveLeastPrivilegeOperatorScopesForMethod("poll")).toEqual(["operator.write"]);
|
expect(resolveLeastPrivilegeOperatorScopesForMethod("poll")).toEqual(["operator.write"]);
|
||||||
});
|
});
|
||||||
|
|
||||||
|
it("leaves node-only pending drain outside operator scopes", () => {
|
||||||
|
expect(resolveLeastPrivilegeOperatorScopesForMethod("node.pending.drain")).toEqual([]);
|
||||||
|
});
|
||||||
|
|
||||||
it("returns empty scopes for unknown methods", () => {
|
it("returns empty scopes for unknown methods", () => {
|
||||||
expect(resolveLeastPrivilegeOperatorScopesForMethod("totally.unknown.method")).toEqual([]);
|
expect(resolveLeastPrivilegeOperatorScopesForMethod("totally.unknown.method")).toEqual([]);
|
||||||
});
|
});
|
||||||
|
|||||||
@ -22,6 +22,7 @@ export const CLI_DEFAULT_OPERATOR_SCOPES: OperatorScope[] = [
|
|||||||
const NODE_ROLE_METHODS = new Set([
|
const NODE_ROLE_METHODS = new Set([
|
||||||
"node.invoke.result",
|
"node.invoke.result",
|
||||||
"node.event",
|
"node.event",
|
||||||
|
"node.pending.drain",
|
||||||
"node.canvas.capability.refresh",
|
"node.canvas.capability.refresh",
|
||||||
"node.pending.pull",
|
"node.pending.pull",
|
||||||
"node.pending.ack",
|
"node.pending.ack",
|
||||||
@ -102,6 +103,7 @@ const METHOD_SCOPE_GROUPS: Record<OperatorScope, readonly string[]> = {
|
|||||||
"chat.abort",
|
"chat.abort",
|
||||||
"browser.request",
|
"browser.request",
|
||||||
"push.test",
|
"push.test",
|
||||||
|
"node.pending.enqueue",
|
||||||
],
|
],
|
||||||
[ADMIN_SCOPE]: [
|
[ADMIN_SCOPE]: [
|
||||||
"channels.logout",
|
"channels.logout",
|
||||||
|
|||||||
67
src/gateway/node-pending-work.test.ts
Normal file
67
src/gateway/node-pending-work.test.ts
Normal file
@ -0,0 +1,67 @@
|
|||||||
|
import { describe, expect, it, beforeEach } from "vitest";
|
||||||
|
import {
|
||||||
|
acknowledgeNodePendingWork,
|
||||||
|
drainNodePendingWork,
|
||||||
|
enqueueNodePendingWork,
|
||||||
|
getNodePendingWorkStateCountForTests,
|
||||||
|
resetNodePendingWorkForTests,
|
||||||
|
} from "./node-pending-work.js";
|
||||||
|
|
||||||
|
describe("node pending work", () => {
|
||||||
|
beforeEach(() => {
|
||||||
|
resetNodePendingWorkForTests();
|
||||||
|
});
|
||||||
|
|
||||||
|
it("returns a baseline status request even when no explicit work is queued", () => {
|
||||||
|
const drained = drainNodePendingWork("node-1");
|
||||||
|
expect(drained.items).toEqual([
|
||||||
|
expect.objectContaining({
|
||||||
|
id: "baseline-status",
|
||||||
|
type: "status.request",
|
||||||
|
priority: "default",
|
||||||
|
}),
|
||||||
|
]);
|
||||||
|
expect(drained.hasMore).toBe(false);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("dedupes explicit work by type and removes acknowledged items", () => {
|
||||||
|
const first = enqueueNodePendingWork({ nodeId: "node-2", type: "location.request" });
|
||||||
|
const second = enqueueNodePendingWork({ nodeId: "node-2", type: "location.request" });
|
||||||
|
|
||||||
|
expect(first.deduped).toBe(false);
|
||||||
|
expect(second.deduped).toBe(true);
|
||||||
|
expect(second.item.id).toBe(first.item.id);
|
||||||
|
|
||||||
|
const drained = drainNodePendingWork("node-2");
|
||||||
|
expect(drained.items.map((item) => item.type)).toEqual(["location.request", "status.request"]);
|
||||||
|
|
||||||
|
const acked = acknowledgeNodePendingWork({
|
||||||
|
nodeId: "node-2",
|
||||||
|
itemIds: [first.item.id, "baseline-status"],
|
||||||
|
});
|
||||||
|
expect(acked.removedItemIds).toEqual([first.item.id]);
|
||||||
|
|
||||||
|
const afterAck = drainNodePendingWork("node-2");
|
||||||
|
expect(afterAck.items.map((item) => item.id)).toEqual(["baseline-status"]);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("keeps hasMore true when the baseline status item is deferred by maxItems", () => {
|
||||||
|
enqueueNodePendingWork({ nodeId: "node-3", type: "location.request" });
|
||||||
|
|
||||||
|
const drained = drainNodePendingWork("node-3", { maxItems: 1 });
|
||||||
|
|
||||||
|
expect(drained.items.map((item) => item.type)).toEqual(["location.request"]);
|
||||||
|
expect(drained.hasMore).toBe(true);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("does not allocate state for drain-only nodes with no queued work", () => {
|
||||||
|
expect(getNodePendingWorkStateCountForTests()).toBe(0);
|
||||||
|
|
||||||
|
const drained = drainNodePendingWork("node-4");
|
||||||
|
const acked = acknowledgeNodePendingWork({ nodeId: "node-4", itemIds: ["baseline-status"] });
|
||||||
|
|
||||||
|
expect(drained.items.map((item) => item.id)).toEqual(["baseline-status"]);
|
||||||
|
expect(acked).toEqual({ revision: 0, removedItemIds: [] });
|
||||||
|
expect(getNodePendingWorkStateCountForTests()).toBe(0);
|
||||||
|
});
|
||||||
|
});
|
||||||
193
src/gateway/node-pending-work.ts
Normal file
193
src/gateway/node-pending-work.ts
Normal file
@ -0,0 +1,193 @@
|
|||||||
|
import { randomUUID } from "node:crypto";
|
||||||
|
|
||||||
|
export const NODE_PENDING_WORK_TYPES = ["status.request", "location.request"] as const;
|
||||||
|
export type NodePendingWorkType = (typeof NODE_PENDING_WORK_TYPES)[number];
|
||||||
|
|
||||||
|
export const NODE_PENDING_WORK_PRIORITIES = ["default", "normal", "high"] as const;
|
||||||
|
export type NodePendingWorkPriority = (typeof NODE_PENDING_WORK_PRIORITIES)[number];
|
||||||
|
|
||||||
|
export type NodePendingWorkItem = {
|
||||||
|
id: string;
|
||||||
|
type: NodePendingWorkType;
|
||||||
|
priority: NodePendingWorkPriority;
|
||||||
|
createdAtMs: number;
|
||||||
|
expiresAtMs: number | null;
|
||||||
|
payload?: Record<string, unknown>;
|
||||||
|
};
|
||||||
|
|
||||||
|
type NodePendingWorkState = {
|
||||||
|
revision: number;
|
||||||
|
itemsById: Map<string, NodePendingWorkItem>;
|
||||||
|
};
|
||||||
|
|
||||||
|
type DrainOptions = {
|
||||||
|
maxItems?: number;
|
||||||
|
includeDefaultStatus?: boolean;
|
||||||
|
nowMs?: number;
|
||||||
|
};
|
||||||
|
|
||||||
|
type DrainResult = {
|
||||||
|
revision: number;
|
||||||
|
items: NodePendingWorkItem[];
|
||||||
|
hasMore: boolean;
|
||||||
|
};
|
||||||
|
|
||||||
|
const DEFAULT_STATUS_ITEM_ID = "baseline-status";
|
||||||
|
const DEFAULT_STATUS_PRIORITY: NodePendingWorkPriority = "default";
|
||||||
|
const DEFAULT_PRIORITY: NodePendingWorkPriority = "normal";
|
||||||
|
const DEFAULT_MAX_ITEMS = 4;
|
||||||
|
const MAX_ITEMS = 10;
|
||||||
|
const PRIORITY_RANK: Record<NodePendingWorkPriority, number> = {
|
||||||
|
high: 3,
|
||||||
|
normal: 2,
|
||||||
|
default: 1,
|
||||||
|
};
|
||||||
|
|
||||||
|
const stateByNodeId = new Map<string, NodePendingWorkState>();
|
||||||
|
|
||||||
|
function getOrCreateState(nodeId: string): NodePendingWorkState {
|
||||||
|
let state = stateByNodeId.get(nodeId);
|
||||||
|
if (!state) {
|
||||||
|
state = {
|
||||||
|
revision: 0,
|
||||||
|
itemsById: new Map(),
|
||||||
|
};
|
||||||
|
stateByNodeId.set(nodeId, state);
|
||||||
|
}
|
||||||
|
return state;
|
||||||
|
}
|
||||||
|
|
||||||
|
function pruneExpired(state: NodePendingWorkState, nowMs: number): boolean {
|
||||||
|
let changed = false;
|
||||||
|
for (const [id, item] of state.itemsById) {
|
||||||
|
if (item.expiresAtMs !== null && item.expiresAtMs <= nowMs) {
|
||||||
|
state.itemsById.delete(id);
|
||||||
|
changed = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (changed) {
|
||||||
|
state.revision += 1;
|
||||||
|
}
|
||||||
|
return changed;
|
||||||
|
}
|
||||||
|
|
||||||
|
function sortedItems(state: NodePendingWorkState): NodePendingWorkItem[] {
|
||||||
|
return [...state.itemsById.values()].toSorted((a, b) => {
|
||||||
|
const priorityDelta = PRIORITY_RANK[b.priority] - PRIORITY_RANK[a.priority];
|
||||||
|
if (priorityDelta !== 0) {
|
||||||
|
return priorityDelta;
|
||||||
|
}
|
||||||
|
if (a.createdAtMs !== b.createdAtMs) {
|
||||||
|
return a.createdAtMs - b.createdAtMs;
|
||||||
|
}
|
||||||
|
return a.id.localeCompare(b.id);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
function makeBaselineStatusItem(nowMs: number): NodePendingWorkItem {
|
||||||
|
return {
|
||||||
|
id: DEFAULT_STATUS_ITEM_ID,
|
||||||
|
type: "status.request",
|
||||||
|
priority: DEFAULT_STATUS_PRIORITY,
|
||||||
|
createdAtMs: nowMs,
|
||||||
|
expiresAtMs: null,
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
export function enqueueNodePendingWork(params: {
|
||||||
|
nodeId: string;
|
||||||
|
type: NodePendingWorkType;
|
||||||
|
priority?: NodePendingWorkPriority;
|
||||||
|
expiresInMs?: number;
|
||||||
|
payload?: Record<string, unknown>;
|
||||||
|
}): { revision: number; item: NodePendingWorkItem; deduped: boolean } {
|
||||||
|
const nodeId = params.nodeId.trim();
|
||||||
|
if (!nodeId) {
|
||||||
|
throw new Error("nodeId required");
|
||||||
|
}
|
||||||
|
const nowMs = Date.now();
|
||||||
|
const state = getOrCreateState(nodeId);
|
||||||
|
pruneExpired(state, nowMs);
|
||||||
|
const existing = [...state.itemsById.values()].find((item) => item.type === params.type);
|
||||||
|
if (existing) {
|
||||||
|
return { revision: state.revision, item: existing, deduped: true };
|
||||||
|
}
|
||||||
|
const item: NodePendingWorkItem = {
|
||||||
|
id: randomUUID(),
|
||||||
|
type: params.type,
|
||||||
|
priority: params.priority ?? DEFAULT_PRIORITY,
|
||||||
|
createdAtMs: nowMs,
|
||||||
|
expiresAtMs:
|
||||||
|
typeof params.expiresInMs === "number" && Number.isFinite(params.expiresInMs)
|
||||||
|
? nowMs + Math.max(1_000, Math.trunc(params.expiresInMs))
|
||||||
|
: null,
|
||||||
|
...(params.payload ? { payload: params.payload } : {}),
|
||||||
|
};
|
||||||
|
state.itemsById.set(item.id, item);
|
||||||
|
state.revision += 1;
|
||||||
|
return { revision: state.revision, item, deduped: false };
|
||||||
|
}
|
||||||
|
|
||||||
|
export function drainNodePendingWork(nodeId: string, opts: DrainOptions = {}): DrainResult {
|
||||||
|
const normalizedNodeId = nodeId.trim();
|
||||||
|
if (!normalizedNodeId) {
|
||||||
|
return { revision: 0, items: [], hasMore: false };
|
||||||
|
}
|
||||||
|
const nowMs = opts.nowMs ?? Date.now();
|
||||||
|
const state = stateByNodeId.get(normalizedNodeId);
|
||||||
|
const revision = state?.revision ?? 0;
|
||||||
|
if (state) {
|
||||||
|
pruneExpired(state, nowMs);
|
||||||
|
}
|
||||||
|
const maxItems = Math.min(MAX_ITEMS, Math.max(1, Math.trunc(opts.maxItems ?? DEFAULT_MAX_ITEMS)));
|
||||||
|
const explicitItems = state ? sortedItems(state) : [];
|
||||||
|
const items = explicitItems.slice(0, maxItems);
|
||||||
|
const hasExplicitStatus = explicitItems.some((item) => item.type === "status.request");
|
||||||
|
const includeBaseline = opts.includeDefaultStatus !== false && !hasExplicitStatus;
|
||||||
|
if (includeBaseline && items.length < maxItems) {
|
||||||
|
items.push(makeBaselineStatusItem(nowMs));
|
||||||
|
}
|
||||||
|
const explicitReturnedCount = items.filter((item) => item.id !== DEFAULT_STATUS_ITEM_ID).length;
|
||||||
|
const baselineIncluded = items.some((item) => item.id === DEFAULT_STATUS_ITEM_ID);
|
||||||
|
return {
|
||||||
|
revision,
|
||||||
|
items,
|
||||||
|
hasMore: explicitItems.length > explicitReturnedCount || (includeBaseline && !baselineIncluded),
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
export function acknowledgeNodePendingWork(params: { nodeId: string; itemIds: string[] }): {
|
||||||
|
revision: number;
|
||||||
|
removedItemIds: string[];
|
||||||
|
} {
|
||||||
|
const nodeId = params.nodeId.trim();
|
||||||
|
if (!nodeId) {
|
||||||
|
return { revision: 0, removedItemIds: [] };
|
||||||
|
}
|
||||||
|
const state = stateByNodeId.get(nodeId);
|
||||||
|
if (!state) {
|
||||||
|
return { revision: 0, removedItemIds: [] };
|
||||||
|
}
|
||||||
|
const removedItemIds: string[] = [];
|
||||||
|
for (const itemId of params.itemIds) {
|
||||||
|
const trimmedId = itemId.trim();
|
||||||
|
if (!trimmedId || trimmedId === DEFAULT_STATUS_ITEM_ID) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
if (state.itemsById.delete(trimmedId)) {
|
||||||
|
removedItemIds.push(trimmedId);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (removedItemIds.length > 0) {
|
||||||
|
state.revision += 1;
|
||||||
|
}
|
||||||
|
return { revision: state.revision, removedItemIds };
|
||||||
|
}
|
||||||
|
|
||||||
|
export function resetNodePendingWorkForTests() {
|
||||||
|
stateByNodeId.clear();
|
||||||
|
}
|
||||||
|
|
||||||
|
export function getNodePendingWorkStateCountForTests(): number {
|
||||||
|
return stateByNodeId.size;
|
||||||
|
}
|
||||||
@ -140,6 +140,14 @@ import {
|
|||||||
NodeDescribeParamsSchema,
|
NodeDescribeParamsSchema,
|
||||||
type NodeEventParams,
|
type NodeEventParams,
|
||||||
NodeEventParamsSchema,
|
NodeEventParamsSchema,
|
||||||
|
type NodePendingDrainParams,
|
||||||
|
NodePendingDrainParamsSchema,
|
||||||
|
type NodePendingDrainResult,
|
||||||
|
NodePendingDrainResultSchema,
|
||||||
|
type NodePendingEnqueueParams,
|
||||||
|
NodePendingEnqueueParamsSchema,
|
||||||
|
type NodePendingEnqueueResult,
|
||||||
|
NodePendingEnqueueResultSchema,
|
||||||
type NodeInvokeParams,
|
type NodeInvokeParams,
|
||||||
NodeInvokeParamsSchema,
|
NodeInvokeParamsSchema,
|
||||||
type NodeInvokeResultParams,
|
type NodeInvokeResultParams,
|
||||||
@ -296,6 +304,12 @@ export const validateNodeInvokeResultParams = ajv.compile<NodeInvokeResultParams
|
|||||||
NodeInvokeResultParamsSchema,
|
NodeInvokeResultParamsSchema,
|
||||||
);
|
);
|
||||||
export const validateNodeEventParams = ajv.compile<NodeEventParams>(NodeEventParamsSchema);
|
export const validateNodeEventParams = ajv.compile<NodeEventParams>(NodeEventParamsSchema);
|
||||||
|
export const validateNodePendingDrainParams = ajv.compile<NodePendingDrainParams>(
|
||||||
|
NodePendingDrainParamsSchema,
|
||||||
|
);
|
||||||
|
export const validateNodePendingEnqueueParams = ajv.compile<NodePendingEnqueueParams>(
|
||||||
|
NodePendingEnqueueParamsSchema,
|
||||||
|
);
|
||||||
export const validatePushTestParams = ajv.compile<PushTestParams>(PushTestParamsSchema);
|
export const validatePushTestParams = ajv.compile<PushTestParams>(PushTestParamsSchema);
|
||||||
export const validateSecretsResolveParams = ajv.compile<SecretsResolveParams>(
|
export const validateSecretsResolveParams = ajv.compile<SecretsResolveParams>(
|
||||||
SecretsResolveParamsSchema,
|
SecretsResolveParamsSchema,
|
||||||
@ -472,6 +486,10 @@ export {
|
|||||||
NodeListParamsSchema,
|
NodeListParamsSchema,
|
||||||
NodePendingAckParamsSchema,
|
NodePendingAckParamsSchema,
|
||||||
NodeInvokeParamsSchema,
|
NodeInvokeParamsSchema,
|
||||||
|
NodePendingDrainParamsSchema,
|
||||||
|
NodePendingDrainResultSchema,
|
||||||
|
NodePendingEnqueueParamsSchema,
|
||||||
|
NodePendingEnqueueResultSchema,
|
||||||
SessionsListParamsSchema,
|
SessionsListParamsSchema,
|
||||||
SessionsPreviewParamsSchema,
|
SessionsPreviewParamsSchema,
|
||||||
SessionsPatchParamsSchema,
|
SessionsPatchParamsSchema,
|
||||||
@ -621,6 +639,10 @@ export type {
|
|||||||
NodeInvokeParams,
|
NodeInvokeParams,
|
||||||
NodeInvokeResultParams,
|
NodeInvokeResultParams,
|
||||||
NodeEventParams,
|
NodeEventParams,
|
||||||
|
NodePendingDrainParams,
|
||||||
|
NodePendingDrainResult,
|
||||||
|
NodePendingEnqueueParams,
|
||||||
|
NodePendingEnqueueResult,
|
||||||
SessionsListParams,
|
SessionsListParams,
|
||||||
SessionsPreviewParams,
|
SessionsPreviewParams,
|
||||||
SessionsResolveParams,
|
SessionsResolveParams,
|
||||||
|
|||||||
@ -1,6 +1,14 @@
|
|||||||
import { Type } from "@sinclair/typebox";
|
import { Type } from "@sinclair/typebox";
|
||||||
import { NonEmptyString } from "./primitives.js";
|
import { NonEmptyString } from "./primitives.js";
|
||||||
|
|
||||||
|
const NodePendingWorkTypeSchema = Type.String({
|
||||||
|
enum: ["status.request", "location.request"],
|
||||||
|
});
|
||||||
|
|
||||||
|
const NodePendingWorkPrioritySchema = Type.String({
|
||||||
|
enum: ["normal", "high"],
|
||||||
|
});
|
||||||
|
|
||||||
export const NodePairRequestParamsSchema = Type.Object(
|
export const NodePairRequestParamsSchema = Type.Object(
|
||||||
{
|
{
|
||||||
nodeId: NonEmptyString,
|
nodeId: NonEmptyString,
|
||||||
@ -95,6 +103,56 @@ export const NodeEventParamsSchema = Type.Object(
|
|||||||
{ additionalProperties: false },
|
{ additionalProperties: false },
|
||||||
);
|
);
|
||||||
|
|
||||||
|
export const NodePendingDrainParamsSchema = Type.Object(
|
||||||
|
{
|
||||||
|
maxItems: Type.Optional(Type.Integer({ minimum: 1, maximum: 10 })),
|
||||||
|
},
|
||||||
|
{ additionalProperties: false },
|
||||||
|
);
|
||||||
|
|
||||||
|
export const NodePendingDrainItemSchema = Type.Object(
|
||||||
|
{
|
||||||
|
id: NonEmptyString,
|
||||||
|
type: NodePendingWorkTypeSchema,
|
||||||
|
priority: Type.String({ enum: ["default", "normal", "high"] }),
|
||||||
|
createdAtMs: Type.Integer({ minimum: 0 }),
|
||||||
|
expiresAtMs: Type.Optional(Type.Union([Type.Integer({ minimum: 0 }), Type.Null()])),
|
||||||
|
payload: Type.Optional(Type.Record(Type.String(), Type.Unknown())),
|
||||||
|
},
|
||||||
|
{ additionalProperties: false },
|
||||||
|
);
|
||||||
|
|
||||||
|
export const NodePendingDrainResultSchema = Type.Object(
|
||||||
|
{
|
||||||
|
nodeId: NonEmptyString,
|
||||||
|
revision: Type.Integer({ minimum: 0 }),
|
||||||
|
items: Type.Array(NodePendingDrainItemSchema),
|
||||||
|
hasMore: Type.Boolean(),
|
||||||
|
},
|
||||||
|
{ additionalProperties: false },
|
||||||
|
);
|
||||||
|
|
||||||
|
export const NodePendingEnqueueParamsSchema = Type.Object(
|
||||||
|
{
|
||||||
|
nodeId: NonEmptyString,
|
||||||
|
type: NodePendingWorkTypeSchema,
|
||||||
|
priority: Type.Optional(NodePendingWorkPrioritySchema),
|
||||||
|
expiresInMs: Type.Optional(Type.Integer({ minimum: 1_000, maximum: 86_400_000 })),
|
||||||
|
wake: Type.Optional(Type.Boolean()),
|
||||||
|
},
|
||||||
|
{ additionalProperties: false },
|
||||||
|
);
|
||||||
|
|
||||||
|
export const NodePendingEnqueueResultSchema = Type.Object(
|
||||||
|
{
|
||||||
|
nodeId: NonEmptyString,
|
||||||
|
revision: Type.Integer({ minimum: 0 }),
|
||||||
|
queued: NodePendingDrainItemSchema,
|
||||||
|
wakeTriggered: Type.Boolean(),
|
||||||
|
},
|
||||||
|
{ additionalProperties: false },
|
||||||
|
);
|
||||||
|
|
||||||
export const NodeInvokeRequestEventSchema = Type.Object(
|
export const NodeInvokeRequestEventSchema = Type.Object(
|
||||||
{
|
{
|
||||||
id: NonEmptyString,
|
id: NonEmptyString,
|
||||||
|
|||||||
@ -114,6 +114,10 @@ import {
|
|||||||
import {
|
import {
|
||||||
NodeDescribeParamsSchema,
|
NodeDescribeParamsSchema,
|
||||||
NodeEventParamsSchema,
|
NodeEventParamsSchema,
|
||||||
|
NodePendingDrainParamsSchema,
|
||||||
|
NodePendingDrainResultSchema,
|
||||||
|
NodePendingEnqueueParamsSchema,
|
||||||
|
NodePendingEnqueueResultSchema,
|
||||||
NodeInvokeParamsSchema,
|
NodeInvokeParamsSchema,
|
||||||
NodeInvokeResultParamsSchema,
|
NodeInvokeResultParamsSchema,
|
||||||
NodeInvokeRequestEventSchema,
|
NodeInvokeRequestEventSchema,
|
||||||
@ -186,6 +190,10 @@ export const ProtocolSchemas = {
|
|||||||
NodeInvokeParams: NodeInvokeParamsSchema,
|
NodeInvokeParams: NodeInvokeParamsSchema,
|
||||||
NodeInvokeResultParams: NodeInvokeResultParamsSchema,
|
NodeInvokeResultParams: NodeInvokeResultParamsSchema,
|
||||||
NodeEventParams: NodeEventParamsSchema,
|
NodeEventParams: NodeEventParamsSchema,
|
||||||
|
NodePendingDrainParams: NodePendingDrainParamsSchema,
|
||||||
|
NodePendingDrainResult: NodePendingDrainResultSchema,
|
||||||
|
NodePendingEnqueueParams: NodePendingEnqueueParamsSchema,
|
||||||
|
NodePendingEnqueueResult: NodePendingEnqueueResultSchema,
|
||||||
NodeInvokeRequestEvent: NodeInvokeRequestEventSchema,
|
NodeInvokeRequestEvent: NodeInvokeRequestEventSchema,
|
||||||
PushTestParams: PushTestParamsSchema,
|
PushTestParams: PushTestParamsSchema,
|
||||||
PushTestResult: PushTestResultSchema,
|
PushTestResult: PushTestResultSchema,
|
||||||
|
|||||||
@ -32,6 +32,10 @@ export type NodeDescribeParams = SchemaType<"NodeDescribeParams">;
|
|||||||
export type NodeInvokeParams = SchemaType<"NodeInvokeParams">;
|
export type NodeInvokeParams = SchemaType<"NodeInvokeParams">;
|
||||||
export type NodeInvokeResultParams = SchemaType<"NodeInvokeResultParams">;
|
export type NodeInvokeResultParams = SchemaType<"NodeInvokeResultParams">;
|
||||||
export type NodeEventParams = SchemaType<"NodeEventParams">;
|
export type NodeEventParams = SchemaType<"NodeEventParams">;
|
||||||
|
export type NodePendingDrainParams = SchemaType<"NodePendingDrainParams">;
|
||||||
|
export type NodePendingDrainResult = SchemaType<"NodePendingDrainResult">;
|
||||||
|
export type NodePendingEnqueueParams = SchemaType<"NodePendingEnqueueParams">;
|
||||||
|
export type NodePendingEnqueueResult = SchemaType<"NodePendingEnqueueResult">;
|
||||||
export type PushTestParams = SchemaType<"PushTestParams">;
|
export type PushTestParams = SchemaType<"PushTestParams">;
|
||||||
export type PushTestResult = SchemaType<"PushTestResult">;
|
export type PushTestResult = SchemaType<"PushTestResult">;
|
||||||
export type SessionsListParams = SchemaType<"SessionsListParams">;
|
export type SessionsListParams = SchemaType<"SessionsListParams">;
|
||||||
|
|||||||
@ -21,8 +21,10 @@ describe("gateway role policy", () => {
|
|||||||
|
|
||||||
test("authorizes roles against node vs operator methods", () => {
|
test("authorizes roles against node vs operator methods", () => {
|
||||||
expect(isRoleAuthorizedForMethod("node", "node.event")).toBe(true);
|
expect(isRoleAuthorizedForMethod("node", "node.event")).toBe(true);
|
||||||
|
expect(isRoleAuthorizedForMethod("node", "node.pending.drain")).toBe(true);
|
||||||
expect(isRoleAuthorizedForMethod("node", "status")).toBe(false);
|
expect(isRoleAuthorizedForMethod("node", "status")).toBe(false);
|
||||||
expect(isRoleAuthorizedForMethod("operator", "status")).toBe(true);
|
expect(isRoleAuthorizedForMethod("operator", "status")).toBe(true);
|
||||||
|
expect(isRoleAuthorizedForMethod("operator", "node.pending.drain")).toBe(false);
|
||||||
expect(isRoleAuthorizedForMethod("operator", "node.event")).toBe(false);
|
expect(isRoleAuthorizedForMethod("operator", "node.event")).toBe(false);
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
|||||||
@ -76,6 +76,8 @@ const BASE_METHODS = [
|
|||||||
"node.rename",
|
"node.rename",
|
||||||
"node.list",
|
"node.list",
|
||||||
"node.describe",
|
"node.describe",
|
||||||
|
"node.pending.drain",
|
||||||
|
"node.pending.enqueue",
|
||||||
"node.invoke",
|
"node.invoke",
|
||||||
"node.pending.pull",
|
"node.pending.pull",
|
||||||
"node.pending.ack",
|
"node.pending.ack",
|
||||||
|
|||||||
@ -18,6 +18,7 @@ import { execApprovalsHandlers } from "./server-methods/exec-approvals.js";
|
|||||||
import { healthHandlers } from "./server-methods/health.js";
|
import { healthHandlers } from "./server-methods/health.js";
|
||||||
import { logsHandlers } from "./server-methods/logs.js";
|
import { logsHandlers } from "./server-methods/logs.js";
|
||||||
import { modelsHandlers } from "./server-methods/models.js";
|
import { modelsHandlers } from "./server-methods/models.js";
|
||||||
|
import { nodePendingHandlers } from "./server-methods/nodes-pending.js";
|
||||||
import { nodeHandlers } from "./server-methods/nodes.js";
|
import { nodeHandlers } from "./server-methods/nodes.js";
|
||||||
import { pushHandlers } from "./server-methods/push.js";
|
import { pushHandlers } from "./server-methods/push.js";
|
||||||
import { sendHandlers } from "./server-methods/send.js";
|
import { sendHandlers } from "./server-methods/send.js";
|
||||||
@ -87,6 +88,7 @@ export const coreGatewayHandlers: GatewayRequestHandlers = {
|
|||||||
...systemHandlers,
|
...systemHandlers,
|
||||||
...updateHandlers,
|
...updateHandlers,
|
||||||
...nodeHandlers,
|
...nodeHandlers,
|
||||||
|
...nodePendingHandlers,
|
||||||
...pushHandlers,
|
...pushHandlers,
|
||||||
...sendHandlers,
|
...sendHandlers,
|
||||||
...usageHandlers,
|
...usageHandlers,
|
||||||
|
|||||||
177
src/gateway/server-methods/nodes-pending.test.ts
Normal file
177
src/gateway/server-methods/nodes-pending.test.ts
Normal file
@ -0,0 +1,177 @@
|
|||||||
|
import { beforeEach, describe, expect, it, vi } from "vitest";
|
||||||
|
import { nodePendingHandlers } from "./nodes-pending.js";
|
||||||
|
|
||||||
|
const mocks = vi.hoisted(() => ({
|
||||||
|
drainNodePendingWork: vi.fn(),
|
||||||
|
enqueueNodePendingWork: vi.fn(),
|
||||||
|
maybeWakeNodeWithApns: vi.fn(),
|
||||||
|
maybeSendNodeWakeNudge: vi.fn(),
|
||||||
|
waitForNodeReconnect: vi.fn(),
|
||||||
|
}));
|
||||||
|
|
||||||
|
vi.mock("../node-pending-work.js", () => ({
|
||||||
|
drainNodePendingWork: mocks.drainNodePendingWork,
|
||||||
|
enqueueNodePendingWork: mocks.enqueueNodePendingWork,
|
||||||
|
}));
|
||||||
|
|
||||||
|
vi.mock("./nodes.js", () => ({
|
||||||
|
NODE_WAKE_RECONNECT_WAIT_MS: 3_000,
|
||||||
|
NODE_WAKE_RECONNECT_RETRY_WAIT_MS: 12_000,
|
||||||
|
maybeWakeNodeWithApns: mocks.maybeWakeNodeWithApns,
|
||||||
|
maybeSendNodeWakeNudge: mocks.maybeSendNodeWakeNudge,
|
||||||
|
waitForNodeReconnect: mocks.waitForNodeReconnect,
|
||||||
|
}));
|
||||||
|
|
||||||
|
type RespondCall = [
|
||||||
|
boolean,
|
||||||
|
unknown?,
|
||||||
|
{
|
||||||
|
code?: number;
|
||||||
|
message?: string;
|
||||||
|
details?: unknown;
|
||||||
|
}?,
|
||||||
|
];
|
||||||
|
|
||||||
|
function makeContext(overrides?: Partial<Record<string, unknown>>) {
|
||||||
|
return {
|
||||||
|
nodeRegistry: {
|
||||||
|
get: vi.fn(() => undefined),
|
||||||
|
},
|
||||||
|
logGateway: {
|
||||||
|
info: vi.fn(),
|
||||||
|
warn: vi.fn(),
|
||||||
|
},
|
||||||
|
...overrides,
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
describe("node.pending handlers", () => {
|
||||||
|
beforeEach(() => {
|
||||||
|
mocks.drainNodePendingWork.mockReset();
|
||||||
|
mocks.enqueueNodePendingWork.mockReset();
|
||||||
|
mocks.maybeWakeNodeWithApns.mockReset();
|
||||||
|
mocks.maybeSendNodeWakeNudge.mockReset();
|
||||||
|
mocks.waitForNodeReconnect.mockReset();
|
||||||
|
});
|
||||||
|
|
||||||
|
it("drains pending work for the connected node identity", async () => {
|
||||||
|
mocks.drainNodePendingWork.mockReturnValue({
|
||||||
|
revision: 2,
|
||||||
|
items: [{ id: "baseline-status", type: "status.request", priority: "default" }],
|
||||||
|
hasMore: false,
|
||||||
|
});
|
||||||
|
const respond = vi.fn();
|
||||||
|
|
||||||
|
await nodePendingHandlers["node.pending.drain"]({
|
||||||
|
params: { maxItems: 3 },
|
||||||
|
respond: respond as never,
|
||||||
|
client: { connect: { device: { id: "ios-node-1" } } } as never,
|
||||||
|
context: makeContext() as never,
|
||||||
|
req: { type: "req", id: "req-node-pending-drain", method: "node.pending.drain" },
|
||||||
|
isWebchatConnect: () => false,
|
||||||
|
});
|
||||||
|
|
||||||
|
expect(mocks.drainNodePendingWork).toHaveBeenCalledWith("ios-node-1", {
|
||||||
|
maxItems: 3,
|
||||||
|
includeDefaultStatus: true,
|
||||||
|
});
|
||||||
|
expect(respond).toHaveBeenCalledWith(
|
||||||
|
true,
|
||||||
|
{
|
||||||
|
nodeId: "ios-node-1",
|
||||||
|
revision: 2,
|
||||||
|
items: [{ id: "baseline-status", type: "status.request", priority: "default" }],
|
||||||
|
hasMore: false,
|
||||||
|
},
|
||||||
|
undefined,
|
||||||
|
);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("rejects node.pending.drain without a connected device identity", async () => {
|
||||||
|
const respond = vi.fn();
|
||||||
|
|
||||||
|
await nodePendingHandlers["node.pending.drain"]({
|
||||||
|
params: {},
|
||||||
|
respond: respond as never,
|
||||||
|
client: null,
|
||||||
|
context: makeContext() as never,
|
||||||
|
req: { type: "req", id: "req-node-pending-drain-missing", method: "node.pending.drain" },
|
||||||
|
isWebchatConnect: () => false,
|
||||||
|
});
|
||||||
|
|
||||||
|
const call = respond.mock.calls[0] as RespondCall | undefined;
|
||||||
|
expect(call?.[0]).toBe(false);
|
||||||
|
expect(call?.[2]?.message).toContain("connected device identity");
|
||||||
|
});
|
||||||
|
|
||||||
|
it("enqueues pending work and wakes a disconnected node once", async () => {
|
||||||
|
mocks.enqueueNodePendingWork.mockReturnValue({
|
||||||
|
revision: 4,
|
||||||
|
deduped: false,
|
||||||
|
item: {
|
||||||
|
id: "pending-1",
|
||||||
|
type: "location.request",
|
||||||
|
priority: "high",
|
||||||
|
createdAtMs: 100,
|
||||||
|
expiresAtMs: null,
|
||||||
|
},
|
||||||
|
});
|
||||||
|
mocks.maybeWakeNodeWithApns.mockResolvedValue({
|
||||||
|
available: true,
|
||||||
|
throttled: false,
|
||||||
|
path: "apns",
|
||||||
|
durationMs: 12,
|
||||||
|
apnsStatus: 200,
|
||||||
|
apnsReason: null,
|
||||||
|
});
|
||||||
|
let connected = false;
|
||||||
|
mocks.waitForNodeReconnect.mockImplementation(async () => {
|
||||||
|
connected = true;
|
||||||
|
return true;
|
||||||
|
});
|
||||||
|
const context = makeContext({
|
||||||
|
nodeRegistry: {
|
||||||
|
get: vi.fn(() => (connected ? { nodeId: "ios-node-2" } : undefined)),
|
||||||
|
},
|
||||||
|
});
|
||||||
|
const respond = vi.fn();
|
||||||
|
|
||||||
|
await nodePendingHandlers["node.pending.enqueue"]({
|
||||||
|
params: {
|
||||||
|
nodeId: "ios-node-2",
|
||||||
|
type: "location.request",
|
||||||
|
priority: "high",
|
||||||
|
},
|
||||||
|
respond: respond as never,
|
||||||
|
client: null,
|
||||||
|
context: context as never,
|
||||||
|
req: { type: "req", id: "req-node-pending-enqueue", method: "node.pending.enqueue" },
|
||||||
|
isWebchatConnect: () => false,
|
||||||
|
});
|
||||||
|
|
||||||
|
expect(mocks.enqueueNodePendingWork).toHaveBeenCalledWith({
|
||||||
|
nodeId: "ios-node-2",
|
||||||
|
type: "location.request",
|
||||||
|
priority: "high",
|
||||||
|
expiresInMs: undefined,
|
||||||
|
});
|
||||||
|
expect(mocks.maybeWakeNodeWithApns).toHaveBeenCalledWith("ios-node-2", {
|
||||||
|
wakeReason: "node.pending",
|
||||||
|
});
|
||||||
|
expect(mocks.waitForNodeReconnect).toHaveBeenCalledWith({
|
||||||
|
nodeId: "ios-node-2",
|
||||||
|
context,
|
||||||
|
timeoutMs: 3_000,
|
||||||
|
});
|
||||||
|
expect(mocks.maybeSendNodeWakeNudge).not.toHaveBeenCalled();
|
||||||
|
expect(respond).toHaveBeenCalledWith(
|
||||||
|
true,
|
||||||
|
expect.objectContaining({
|
||||||
|
nodeId: "ios-node-2",
|
||||||
|
revision: 4,
|
||||||
|
wakeTriggered: true,
|
||||||
|
}),
|
||||||
|
undefined,
|
||||||
|
);
|
||||||
|
});
|
||||||
|
});
|
||||||
159
src/gateway/server-methods/nodes-pending.ts
Normal file
159
src/gateway/server-methods/nodes-pending.ts
Normal file
@ -0,0 +1,159 @@
|
|||||||
|
import {
|
||||||
|
drainNodePendingWork,
|
||||||
|
enqueueNodePendingWork,
|
||||||
|
type NodePendingWorkPriority,
|
||||||
|
type NodePendingWorkType,
|
||||||
|
} from "../node-pending-work.js";
|
||||||
|
import {
|
||||||
|
ErrorCodes,
|
||||||
|
errorShape,
|
||||||
|
validateNodePendingDrainParams,
|
||||||
|
validateNodePendingEnqueueParams,
|
||||||
|
} from "../protocol/index.js";
|
||||||
|
import { respondInvalidParams, respondUnavailableOnThrow } from "./nodes.helpers.js";
|
||||||
|
import {
|
||||||
|
maybeSendNodeWakeNudge,
|
||||||
|
maybeWakeNodeWithApns,
|
||||||
|
NODE_WAKE_RECONNECT_RETRY_WAIT_MS,
|
||||||
|
NODE_WAKE_RECONNECT_WAIT_MS,
|
||||||
|
waitForNodeReconnect,
|
||||||
|
} from "./nodes.js";
|
||||||
|
import type { GatewayRequestHandlers } from "./types.js";
|
||||||
|
|
||||||
|
function resolveClientNodeId(
|
||||||
|
client: { connect?: { device?: { id?: string }; client?: { id?: string } } } | null,
|
||||||
|
): string | null {
|
||||||
|
const nodeId = client?.connect?.device?.id ?? client?.connect?.client?.id ?? "";
|
||||||
|
const trimmed = nodeId.trim();
|
||||||
|
return trimmed.length > 0 ? trimmed : null;
|
||||||
|
}
|
||||||
|
|
||||||
|
export const nodePendingHandlers: GatewayRequestHandlers = {
|
||||||
|
"node.pending.drain": async ({ params, respond, client }) => {
|
||||||
|
if (!validateNodePendingDrainParams(params)) {
|
||||||
|
respondInvalidParams({
|
||||||
|
respond,
|
||||||
|
method: "node.pending.drain",
|
||||||
|
validator: validateNodePendingDrainParams,
|
||||||
|
});
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
const nodeId = resolveClientNodeId(client);
|
||||||
|
if (!nodeId) {
|
||||||
|
respond(
|
||||||
|
false,
|
||||||
|
undefined,
|
||||||
|
errorShape(
|
||||||
|
ErrorCodes.INVALID_REQUEST,
|
||||||
|
"node.pending.drain requires a connected device identity",
|
||||||
|
),
|
||||||
|
);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
const p = params as { maxItems?: number };
|
||||||
|
const drained = drainNodePendingWork(nodeId, {
|
||||||
|
maxItems: p.maxItems,
|
||||||
|
includeDefaultStatus: true,
|
||||||
|
});
|
||||||
|
respond(true, { nodeId, ...drained }, undefined);
|
||||||
|
},
|
||||||
|
"node.pending.enqueue": async ({ params, respond, context }) => {
|
||||||
|
if (!validateNodePendingEnqueueParams(params)) {
|
||||||
|
respondInvalidParams({
|
||||||
|
respond,
|
||||||
|
method: "node.pending.enqueue",
|
||||||
|
validator: validateNodePendingEnqueueParams,
|
||||||
|
});
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
const p = params as {
|
||||||
|
nodeId: string;
|
||||||
|
type: NodePendingWorkType;
|
||||||
|
priority?: NodePendingWorkPriority;
|
||||||
|
expiresInMs?: number;
|
||||||
|
wake?: boolean;
|
||||||
|
};
|
||||||
|
await respondUnavailableOnThrow(respond, async () => {
|
||||||
|
const queued = enqueueNodePendingWork({
|
||||||
|
nodeId: p.nodeId,
|
||||||
|
type: p.type,
|
||||||
|
priority: p.priority,
|
||||||
|
expiresInMs: p.expiresInMs,
|
||||||
|
});
|
||||||
|
let wakeTriggered = false;
|
||||||
|
if (p.wake !== false && !queued.deduped && !context.nodeRegistry.get(p.nodeId)) {
|
||||||
|
const wakeReqId = queued.item.id;
|
||||||
|
context.logGateway.info(
|
||||||
|
`node pending wake start node=${p.nodeId} req=${wakeReqId} type=${queued.item.type}`,
|
||||||
|
);
|
||||||
|
const wake = await maybeWakeNodeWithApns(p.nodeId, { wakeReason: "node.pending" });
|
||||||
|
context.logGateway.info(
|
||||||
|
`node pending wake stage=wake1 node=${p.nodeId} req=${wakeReqId} ` +
|
||||||
|
`available=${wake.available} throttled=${wake.throttled} ` +
|
||||||
|
`path=${wake.path} durationMs=${wake.durationMs} ` +
|
||||||
|
`apnsStatus=${wake.apnsStatus ?? -1} apnsReason=${wake.apnsReason ?? "-"}`,
|
||||||
|
);
|
||||||
|
wakeTriggered = wake.available;
|
||||||
|
if (wake.available) {
|
||||||
|
const reconnected = await waitForNodeReconnect({
|
||||||
|
nodeId: p.nodeId,
|
||||||
|
context,
|
||||||
|
timeoutMs: NODE_WAKE_RECONNECT_WAIT_MS,
|
||||||
|
});
|
||||||
|
context.logGateway.info(
|
||||||
|
`node pending wake stage=wait1 node=${p.nodeId} req=${wakeReqId} ` +
|
||||||
|
`reconnected=${reconnected} timeoutMs=${NODE_WAKE_RECONNECT_WAIT_MS}`,
|
||||||
|
);
|
||||||
|
}
|
||||||
|
if (!context.nodeRegistry.get(p.nodeId) && wake.available) {
|
||||||
|
const retryWake = await maybeWakeNodeWithApns(p.nodeId, {
|
||||||
|
force: true,
|
||||||
|
wakeReason: "node.pending",
|
||||||
|
});
|
||||||
|
context.logGateway.info(
|
||||||
|
`node pending wake stage=wake2 node=${p.nodeId} req=${wakeReqId} force=true ` +
|
||||||
|
`available=${retryWake.available} throttled=${retryWake.throttled} ` +
|
||||||
|
`path=${retryWake.path} durationMs=${retryWake.durationMs} ` +
|
||||||
|
`apnsStatus=${retryWake.apnsStatus ?? -1} apnsReason=${retryWake.apnsReason ?? "-"}`,
|
||||||
|
);
|
||||||
|
if (retryWake.available) {
|
||||||
|
const reconnected = await waitForNodeReconnect({
|
||||||
|
nodeId: p.nodeId,
|
||||||
|
context,
|
||||||
|
timeoutMs: NODE_WAKE_RECONNECT_RETRY_WAIT_MS,
|
||||||
|
});
|
||||||
|
context.logGateway.info(
|
||||||
|
`node pending wake stage=wait2 node=${p.nodeId} req=${wakeReqId} ` +
|
||||||
|
`reconnected=${reconnected} timeoutMs=${NODE_WAKE_RECONNECT_RETRY_WAIT_MS}`,
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (!context.nodeRegistry.get(p.nodeId)) {
|
||||||
|
const nudge = await maybeSendNodeWakeNudge(p.nodeId);
|
||||||
|
context.logGateway.info(
|
||||||
|
`node pending wake nudge node=${p.nodeId} req=${wakeReqId} sent=${nudge.sent} ` +
|
||||||
|
`throttled=${nudge.throttled} reason=${nudge.reason} durationMs=${nudge.durationMs} ` +
|
||||||
|
`apnsStatus=${nudge.apnsStatus ?? -1} apnsReason=${nudge.apnsReason ?? "-"}`,
|
||||||
|
);
|
||||||
|
context.logGateway.warn(
|
||||||
|
`node pending wake done node=${p.nodeId} req=${wakeReqId} connected=false reason=not_connected`,
|
||||||
|
);
|
||||||
|
} else {
|
||||||
|
context.logGateway.info(
|
||||||
|
`node pending wake done node=${p.nodeId} req=${wakeReqId} connected=true`,
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
respond(
|
||||||
|
true,
|
||||||
|
{
|
||||||
|
nodeId: p.nodeId,
|
||||||
|
revision: queued.revision,
|
||||||
|
queued: queued.item,
|
||||||
|
wakeTriggered,
|
||||||
|
},
|
||||||
|
undefined,
|
||||||
|
);
|
||||||
|
});
|
||||||
|
},
|
||||||
|
};
|
||||||
@ -47,9 +47,9 @@ import {
|
|||||||
} from "./nodes.helpers.js";
|
} from "./nodes.helpers.js";
|
||||||
import type { GatewayRequestHandlers } from "./types.js";
|
import type { GatewayRequestHandlers } from "./types.js";
|
||||||
|
|
||||||
const NODE_WAKE_RECONNECT_WAIT_MS = 3_000;
|
export const NODE_WAKE_RECONNECT_WAIT_MS = 3_000;
|
||||||
const NODE_WAKE_RECONNECT_RETRY_WAIT_MS = 12_000;
|
export const NODE_WAKE_RECONNECT_RETRY_WAIT_MS = 12_000;
|
||||||
const NODE_WAKE_RECONNECT_POLL_MS = 150;
|
export const NODE_WAKE_RECONNECT_POLL_MS = 150;
|
||||||
const NODE_WAKE_THROTTLE_MS = 15_000;
|
const NODE_WAKE_THROTTLE_MS = 15_000;
|
||||||
const NODE_WAKE_NUDGE_THROTTLE_MS = 10 * 60_000;
|
const NODE_WAKE_NUDGE_THROTTLE_MS = 10 * 60_000;
|
||||||
const NODE_PENDING_ACTION_TTL_MS = 10 * 60_000;
|
const NODE_PENDING_ACTION_TTL_MS = 10 * 60_000;
|
||||||
@ -208,9 +208,9 @@ function toPendingParamsJSON(params: unknown): string | undefined {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async function maybeWakeNodeWithApns(
|
export async function maybeWakeNodeWithApns(
|
||||||
nodeId: string,
|
nodeId: string,
|
||||||
opts?: { force?: boolean },
|
opts?: { force?: boolean; wakeReason?: string },
|
||||||
): Promise<NodeWakeAttempt> {
|
): Promise<NodeWakeAttempt> {
|
||||||
const state = nodeWakeById.get(nodeId) ?? { lastWakeAtMs: 0 };
|
const state = nodeWakeById.get(nodeId) ?? { lastWakeAtMs: 0 };
|
||||||
nodeWakeById.set(nodeId, state);
|
nodeWakeById.set(nodeId, state);
|
||||||
@ -253,7 +253,7 @@ async function maybeWakeNodeWithApns(
|
|||||||
auth: auth.value,
|
auth: auth.value,
|
||||||
registration,
|
registration,
|
||||||
nodeId,
|
nodeId,
|
||||||
wakeReason: "node.invoke",
|
wakeReason: opts?.wakeReason ?? "node.invoke",
|
||||||
});
|
});
|
||||||
if (!wakeResult.ok) {
|
if (!wakeResult.ok) {
|
||||||
return withDuration({
|
return withDuration({
|
||||||
@ -298,7 +298,7 @@ async function maybeWakeNodeWithApns(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async function maybeSendNodeWakeNudge(nodeId: string): Promise<NodeWakeNudgeAttempt> {
|
export async function maybeSendNodeWakeNudge(nodeId: string): Promise<NodeWakeNudgeAttempt> {
|
||||||
const startedAtMs = Date.now();
|
const startedAtMs = Date.now();
|
||||||
const withDuration = (
|
const withDuration = (
|
||||||
attempt: Omit<NodeWakeNudgeAttempt, "durationMs">,
|
attempt: Omit<NodeWakeNudgeAttempt, "durationMs">,
|
||||||
@ -362,7 +362,7 @@ async function maybeSendNodeWakeNudge(nodeId: string): Promise<NodeWakeNudgeAtte
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async function waitForNodeReconnect(params: {
|
export async function waitForNodeReconnect(params: {
|
||||||
nodeId: string;
|
nodeId: string;
|
||||||
context: { nodeRegistry: { get: (nodeId: string) => unknown } };
|
context: { nodeRegistry: { get: (nodeId: string) => unknown } };
|
||||||
timeoutMs?: number;
|
timeoutMs?: number;
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user