diff --git a/extensions/slack/src/monitor/slash.test-harness.ts b/extensions/slack/src/monitor/slash.test-harness.ts index f5618dde5be..c8d4fb811b0 100644 --- a/extensions/slack/src/monitor/slash.test-harness.ts +++ b/extensions/slack/src/monitor/slash.test-harness.ts @@ -20,15 +20,6 @@ vi.mock("openclaw/plugin-sdk/reply-runtime", async (importOriginal) => { }; }); -vi.mock("openclaw/plugin-sdk/conversation-runtime", async (importOriginal) => { - const actual = await importOriginal(); - return { - ...actual, - readChannelAllowFromStore: (...args: unknown[]) => mocks.readAllowFromStoreMock(...args), - upsertChannelPairingRequest: (...args: unknown[]) => mocks.upsertPairingRequestMock(...args), - }; -}); - vi.mock("openclaw/plugin-sdk/routing", async (importOriginal) => { const actual = await importOriginal(); return { diff --git a/src/agents/subagent-announce.capture-completion-reply.test.ts b/src/agents/subagent-announce.capture-completion-reply.test.ts index 9511cd9ec8a..a2cbbb1faa5 100644 --- a/src/agents/subagent-announce.capture-completion-reply.test.ts +++ b/src/agents/subagent-announce.capture-completion-reply.test.ts @@ -1,8 +1,5 @@ import { afterAll, beforeAll, beforeEach, describe, expect, it, vi } from "vitest"; -const readLatestAssistantReplyMock = vi.fn<(sessionKey: string) => Promise>( - async (_sessionKey: string) => undefined, -); const chatHistoryMock = vi.fn<(sessionKey: string) => Promise<{ messages?: Array }>>( async (_sessionKey: string) => ({ messages: [] }), ); @@ -17,10 +14,6 @@ vi.mock("../gateway/call.js", () => ({ }), })); -vi.mock("./tools/agent-step.js", () => ({ - readLatestAssistantReply: readLatestAssistantReplyMock, -})); - describe("captureSubagentCompletionReply", () => { let previousFastTestEnv: string | undefined; let captureSubagentCompletionReply: (typeof import("./subagent-announce.js"))["captureSubagentCompletionReply"]; @@ -40,23 +33,27 @@ describe("captureSubagentCompletionReply", () => { }); beforeEach(() => { - readLatestAssistantReplyMock.mockReset().mockResolvedValue(undefined); chatHistoryMock.mockReset().mockResolvedValue({ messages: [] }); }); - it("returns immediate assistant output without polling", async () => { - readLatestAssistantReplyMock.mockResolvedValueOnce("Immediate assistant completion"); + it("returns immediate assistant output from history without polling", async () => { + chatHistoryMock.mockResolvedValueOnce({ + messages: [ + { + role: "assistant", + content: [{ type: "text", text: "Immediate assistant completion" }], + }, + ], + }); const result = await captureSubagentCompletionReply("agent:main:subagent:child"); expect(result).toBe("Immediate assistant completion"); - expect(readLatestAssistantReplyMock).toHaveBeenCalledTimes(1); - expect(chatHistoryMock).not.toHaveBeenCalled(); + expect(chatHistoryMock).toHaveBeenCalledTimes(1); }); it("polls briefly and returns late tool output once available", async () => { vi.useFakeTimers(); - readLatestAssistantReplyMock.mockResolvedValue(undefined); chatHistoryMock.mockResolvedValueOnce({ messages: [] }).mockResolvedValueOnce({ messages: [ { @@ -82,7 +79,6 @@ describe("captureSubagentCompletionReply", () => { it("returns undefined when no completion output arrives before retry window closes", async () => { vi.useFakeTimers(); - readLatestAssistantReplyMock.mockResolvedValue(undefined); chatHistoryMock.mockResolvedValue({ messages: [] }); const pending = captureSubagentCompletionReply("agent:main:subagent:child"); @@ -93,4 +89,26 @@ describe("captureSubagentCompletionReply", () => { expect(chatHistoryMock).toHaveBeenCalled(); vi.useRealTimers(); }); + + it("returns partial assistant progress when the latest assistant turn is tool-only", async () => { + chatHistoryMock.mockResolvedValueOnce({ + messages: [ + { + role: "assistant", + content: [ + { type: "text", text: "Mapped the modules." }, + { type: "toolCall", id: "call-1", name: "read", arguments: {} }, + ], + }, + { + role: "assistant", + content: [{ type: "toolCall", id: "call-2", name: "exec", arguments: {} }], + }, + ], + }); + + const result = await captureSubagentCompletionReply("agent:main:subagent:child"); + + expect(result).toBe("Mapped the modules."); + }); }); diff --git a/src/agents/subagent-announce.timeout.test.ts b/src/agents/subagent-announce.timeout.test.ts index 5fae988fe73..52cde0f69b0 100644 --- a/src/agents/subagent-announce.timeout.test.ts +++ b/src/agents/subagent-announce.timeout.test.ts @@ -29,10 +29,14 @@ let fallbackRequesterResolution: { requesterSessionKey: string; requesterOrigin?: { channel?: string; to?: string; accountId?: string }; } | null = null; +let chatHistoryMessages: Array> = []; vi.mock("../gateway/call.js", () => ({ callGateway: vi.fn(async (request: GatewayCall) => { gatewayCalls.push(request); + if (request.method === "chat.history") { + return { messages: chatHistoryMessages }; + } return await callGatewayImpl(request); }), })); @@ -138,6 +142,7 @@ function setupParentSessionFallback(parentSessionKey: string): void { describe("subagent announce timeout config", () => { beforeEach(() => { gatewayCalls.length = 0; + chatHistoryMessages = []; callGatewayImpl = async (request) => { if (request.method === "chat.history") { return { messages: [] }; @@ -270,7 +275,6 @@ describe("subagent announce timeout config", () => { it("regression, routes child announce to parent session instead of grandparent when parent session still exists", async () => { const parentSessionKey = "agent:main:subagent:parent"; setupParentSessionFallback(parentSessionKey); - // No sessionId on purpose: existence in store should still count as alive. sessionStore[parentSessionKey] = { updatedAt: Date.now() }; await runAnnounceFlowForTest("run-parent-route", { @@ -301,4 +305,147 @@ describe("subagent announce timeout config", () => { expect(directAgentCall?.params?.to).toBe("chan-main"); expect(directAgentCall?.params?.accountId).toBe("acct-main"); }); + + it("uses partial progress on timeout when the child only made tool calls", async () => { + chatHistoryMessages = [ + { role: "user", content: "do a complex task" }, + { + role: "assistant", + content: [{ type: "toolCall", id: "call-1", name: "read", arguments: {} }], + }, + { role: "toolResult", toolCallId: "call-1", content: [{ type: "text", text: "data" }] }, + { + role: "assistant", + content: [{ type: "toolCall", id: "call-2", name: "exec", arguments: {} }], + }, + { + role: "assistant", + content: [{ type: "toolCall", id: "call-3", name: "search", arguments: {} }], + }, + ]; + + await runAnnounceFlowForTest("run-timeout-partial-progress", { + outcome: { status: "timeout" }, + roundOneReply: undefined, + }); + + const directAgentCall = findFinalDirectAgentCall(); + const internalEvents = + (directAgentCall?.params?.internalEvents as Array<{ result?: string }>) ?? []; + expect(internalEvents[0]?.result).toContain("3 tool call(s)"); + expect(internalEvents[0]?.result).not.toContain("data"); + }); + + it("preserves NO_REPLY when timeout history ends with silence after earlier progress", async () => { + chatHistoryMessages = [ + { + role: "assistant", + content: [ + { type: "text", text: "Still working through the files." }, + { type: "toolCall", id: "call-1", name: "read", arguments: {} }, + ], + }, + { + role: "assistant", + content: [{ type: "text", text: "NO_REPLY" }], + }, + { + role: "assistant", + content: [{ type: "toolCall", id: "call-2", name: "exec", arguments: {} }], + }, + ]; + + await runAnnounceFlowForTest("run-timeout-no-reply", { + outcome: { status: "timeout" }, + roundOneReply: undefined, + }); + + expect(findFinalDirectAgentCall()).toBeUndefined(); + }); + + it("prefers visible assistant progress over a later raw tool result", async () => { + chatHistoryMessages = [ + { + role: "assistant", + content: [{ type: "text", text: "Read 12 files. Narrowing the search now." }], + }, + { + role: "toolResult", + content: [{ type: "text", text: "grep output" }], + }, + ]; + + await runAnnounceFlowForTest("run-timeout-visible-assistant", { + outcome: { status: "timeout" }, + roundOneReply: undefined, + }); + + const directAgentCall = findFinalDirectAgentCall(); + const internalEvents = + (directAgentCall?.params?.internalEvents as Array<{ result?: string }>) ?? []; + expect(internalEvents[0]?.result).toContain("Read 12 files"); + expect(internalEvents[0]?.result).not.toContain("grep output"); + }); + + it("preserves NO_REPLY when timeout partial-progress history mixes prior text and later silence", async () => { + chatHistoryMessages = [ + { role: "user", content: "do something" }, + { + role: "assistant", + content: [ + { type: "text", text: "Still working through the files." }, + { type: "toolCall", id: "call1", name: "read", arguments: {} }, + ], + }, + { role: "toolResult", toolCallId: "call1", content: [{ type: "text", text: "data" }] }, + { + role: "assistant", + content: [{ type: "text", text: "NO_REPLY" }], + }, + { + role: "assistant", + content: [{ type: "toolCall", id: "call2", name: "exec", arguments: {} }], + }, + ]; + + await runAnnounceFlowForTest("run-timeout-mixed-no-reply", { + outcome: { status: "timeout" }, + roundOneReply: undefined, + }); + + expect( + findGatewayCall((call) => call.method === "agent" && call.expectFinal === true), + ).toBeUndefined(); + }); + + it("prefers NO_REPLY partial progress over a longer latest assistant reply", async () => { + chatHistoryMessages = [ + { role: "user", content: "do something" }, + { + role: "assistant", + content: [ + { type: "text", text: "Still working through the files." }, + { type: "toolCall", id: "call1", name: "read", arguments: {} }, + ], + }, + { role: "toolResult", toolCallId: "call1", content: [{ type: "text", text: "data" }] }, + { + role: "assistant", + content: [{ type: "text", text: "NO_REPLY" }], + }, + { + role: "assistant", + content: [{ type: "text", text: "A longer partial summary that should stay silent." }], + }, + ]; + + await runAnnounceFlowForTest("run-timeout-no-reply-overrides-latest-text", { + outcome: { status: "timeout" }, + roundOneReply: undefined, + }); + + expect( + findGatewayCall((call) => call.method === "agent" && call.expectFinal === true), + ).toBeUndefined(); + }); }); diff --git a/src/agents/subagent-announce.ts b/src/agents/subagent-announce.ts index eeef9db6b9b..ab2fbb1140e 100644 --- a/src/agents/subagent-announce.ts +++ b/src/agents/subagent-announce.ts @@ -47,7 +47,6 @@ import { import { type AnnounceQueueItem, enqueueAnnounce } from "./subagent-announce-queue.js"; import { getSubagentDepthFromSessionStore } from "./subagent-depth.js"; import type { SpawnSubagentMode } from "./subagent-spawn.js"; -import { readLatestAssistantReply } from "./tools/agent-step.js"; import { sanitizeTextContent, extractAssistantText } from "./tools/sessions-helpers.js"; import { isAnnounceSkip } from "./tools/sessions-send-helpers.js"; @@ -55,7 +54,6 @@ const FAST_TEST_MODE = process.env.OPENCLAW_TEST_FAST === "1"; const FAST_TEST_RETRY_INTERVAL_MS = 8; const DEFAULT_SUBAGENT_ANNOUNCE_TIMEOUT_MS = 90_000; const MAX_TIMER_SAFE_TIMEOUT_MS = 2_147_000_000; -const GATEWAY_TIMEOUT_PATTERN = /gateway timeout/i; let subagentRegistryRuntimePromise: Promise< typeof import("./subagent-registry-runtime.js") > | null = null; @@ -74,6 +72,14 @@ type ToolResultMessage = { content?: unknown; }; +type SubagentOutputSnapshot = { + latestAssistantText?: string; + latestSilentText?: string; + latestRawText?: string; + assistantFragments: string[]; + toolCallCount: number; +}; + function resolveSubagentAnnounceTimeoutMs(cfg: ReturnType): number { const configured = cfg.agents?.defaults?.subagents?.announceTimeoutMs; if (typeof configured !== "number" || !Number.isFinite(configured)) { @@ -110,7 +116,7 @@ const TRANSIENT_ANNOUNCE_DELIVERY_ERROR_PATTERNS: readonly RegExp[] = [ /no active .* listener/i, /gateway not connected/i, /gateway closed \(1006/i, - GATEWAY_TIMEOUT_PATTERN, + /gateway timeout/i, /\b(econnreset|econnrefused|etimedout|enotfound|ehostunreach|network error)\b/i, ]; @@ -136,11 +142,6 @@ function isTransientAnnounceDeliveryError(error: unknown): boolean { return TRANSIENT_ANNOUNCE_DELIVERY_ERROR_PATTERNS.some((re) => re.test(message)); } -function isGatewayTimeoutError(error: unknown): boolean { - const message = summarizeDeliveryError(error); - return Boolean(message) && GATEWAY_TIMEOUT_PATTERN.test(message); -} - async function waitForAnnounceRetryDelay(ms: number, signal?: AbortSignal): Promise { if (ms <= 0) { return; @@ -168,7 +169,6 @@ async function waitForAnnounceRetryDelay(ms: number, signal?: AbortSignal): Prom async function runAnnounceDeliveryWithRetry(params: { operation: string; - noRetryOnGatewayTimeout?: boolean; signal?: AbortSignal; run: () => Promise; }): Promise { @@ -180,9 +180,6 @@ async function runAnnounceDeliveryWithRetry(params: { try { return await params.run(); } catch (err) { - if (params.noRetryOnGatewayTimeout && isGatewayTimeoutError(err)) { - throw err; - } const delayMs = DIRECT_ANNOUNCE_TRANSIENT_RETRY_DELAYS_MS[retryIndex]; if (delayMs == null || !isTransientAnnounceDeliveryError(err) || params.signal?.aborted) { throw err; @@ -287,42 +284,126 @@ function extractSubagentOutputText(message: unknown): string { return ""; } -async function readLatestSubagentOutput(sessionKey: string): Promise { - try { - const latestAssistant = await readLatestAssistantReply({ - sessionKey, - limit: 50, - }); - if (latestAssistant?.trim()) { - return latestAssistant; - } - } catch { - // Best-effort: fall back to richer history parsing below. +function countAssistantToolCalls(content: unknown): number { + if (!Array.isArray(content)) { + return 0; } + let count = 0; + for (const block of content) { + if (!block || typeof block !== "object") { + continue; + } + const type = (block as { type?: unknown }).type; + if ( + type === "toolCall" || + type === "tool_use" || + type === "toolUse" || + type === "functionCall" || + type === "function_call" + ) { + count += 1; + } + } + return count; +} + +function summarizeSubagentOutputHistory(messages: Array): SubagentOutputSnapshot { + const snapshot: SubagentOutputSnapshot = { + assistantFragments: [], + toolCallCount: 0, + }; + for (const message of messages) { + if (!message || typeof message !== "object") { + continue; + } + const role = (message as { role?: unknown }).role; + if (role === "assistant") { + snapshot.toolCallCount += countAssistantToolCalls((message as { content?: unknown }).content); + const text = extractSubagentOutputText(message).trim(); + if (!text) { + continue; + } + if (isAnnounceSkip(text) || isSilentReplyText(text, SILENT_REPLY_TOKEN)) { + snapshot.latestSilentText = text; + snapshot.latestAssistantText = undefined; + snapshot.assistantFragments = []; + continue; + } + snapshot.latestSilentText = undefined; + snapshot.latestAssistantText = text; + snapshot.assistantFragments.push(text); + continue; + } + const text = extractSubagentOutputText(message).trim(); + if (text) { + snapshot.latestRawText = text; + } + } + return snapshot; +} + +function formatSubagentPartialProgress( + snapshot: SubagentOutputSnapshot, + outcome?: SubagentRunOutcome, +): string | undefined { + if (snapshot.latestSilentText) { + return undefined; + } + const timedOut = outcome?.status === "timeout"; + if (snapshot.assistantFragments.length === 0 && (!timedOut || snapshot.toolCallCount === 0)) { + return undefined; + } + const parts: string[] = []; + if (timedOut && snapshot.toolCallCount > 0) { + parts.push( + `[Partial progress: ${snapshot.toolCallCount} tool call(s) executed before timeout]`, + ); + } + if (snapshot.assistantFragments.length > 0) { + parts.push(snapshot.assistantFragments.slice(-3).join("\n\n---\n\n")); + } + return parts.join("\n\n") || undefined; +} + +function selectSubagentOutputText( + snapshot: SubagentOutputSnapshot, + outcome?: SubagentRunOutcome, +): string | undefined { + if (snapshot.latestSilentText) { + return snapshot.latestSilentText; + } + if (snapshot.latestAssistantText) { + return snapshot.latestAssistantText; + } + const partialProgress = formatSubagentPartialProgress(snapshot, outcome); + if (partialProgress) { + return partialProgress; + } + return snapshot.latestRawText; +} + +async function readSubagentOutput( + sessionKey: string, + outcome?: SubagentRunOutcome, +): Promise { const history = await callGateway<{ messages?: Array }>({ method: "chat.history", - params: { sessionKey, limit: 50 }, + params: { sessionKey, limit: 100 }, }); const messages = Array.isArray(history?.messages) ? history.messages : []; - for (let i = messages.length - 1; i >= 0; i -= 1) { - const msg = messages[i]; - const text = extractSubagentOutputText(msg); - if (text) { - return text; - } - } - return undefined; + return selectSubagentOutputText(summarizeSubagentOutputHistory(messages), outcome); } async function readLatestSubagentOutputWithRetry(params: { sessionKey: string; maxWaitMs: number; + outcome?: SubagentRunOutcome; }): Promise { const RETRY_INTERVAL_MS = FAST_TEST_MODE ? FAST_TEST_RETRY_INTERVAL_MS : 100; const deadline = Date.now() + Math.max(0, Math.min(params.maxWaitMs, 15_000)); let result: string | undefined; while (Date.now() < deadline) { - result = await readLatestSubagentOutput(params.sessionKey); + result = await readSubagentOutput(params.sessionKey, params.outcome); if (result?.trim()) { return result; } @@ -334,7 +415,7 @@ async function readLatestSubagentOutputWithRetry(params: { export async function captureSubagentCompletionReply( sessionKey: string, ): Promise { - const immediate = await readLatestSubagentOutput(sessionKey); + const immediate = await readSubagentOutput(sessionKey); if (immediate?.trim()) { return immediate; } @@ -811,7 +892,6 @@ async function sendSubagentAnnounceDirectly(params: { operation: params.expectsCompletionMessage ? "completion direct announce agent call" : "direct announce agent call", - noRetryOnGatewayTimeout: params.expectsCompletionMessage && shouldDeliverExternally, signal: params.signal, run: async () => await callGateway({ @@ -1321,13 +1401,14 @@ export async function runSubagentAnnounceFlow(params: { (isAnnounceSkip(fallbackReply) || isSilentReplyText(fallbackReply, SILENT_REPLY_TOKEN)); if (!reply) { - reply = await readLatestSubagentOutput(params.childSessionKey); + reply = await readSubagentOutput(params.childSessionKey, outcome); } if (!reply?.trim()) { reply = await readLatestSubagentOutputWithRetry({ sessionKey: params.childSessionKey, maxWaitMs: params.timeoutMs, + outcome, }); } diff --git a/src/config/config.web-search-provider.test.ts b/src/config/config.web-search-provider.test.ts index decb5e68e3b..b0319f219eb 100644 --- a/src/config/config.web-search-provider.test.ts +++ b/src/config/config.web-search-provider.test.ts @@ -136,6 +136,35 @@ function pluginWebSearchApiKey( } describe("web search provider config", () => { + it("does not warn for legacy brave config when bundled web search allowlist compat applies", () => { + const res = validateConfigObjectWithPlugins({ + plugins: { + allow: ["bluebubbles", "memory-core"], + }, + tools: { + web: { + search: { + enabled: true, + apiKey: "test-brave-key", // pragma: allowlist secret + }, + }, + }, + }); + + expect(res.ok).toBe(true); + if (!res.ok) { + return; + } + expect(res.warnings).not.toContainEqual( + expect.objectContaining({ + path: "plugins.entries.brave", + message: expect.stringContaining( + "plugin disabled (not in allowlist) but config is present", + ), + }), + ); + }); + it("accepts perplexity provider and config", () => { const res = validateConfigObjectWithPlugins( buildWebSearchProviderConfig({ diff --git a/src/config/validation.ts b/src/config/validation.ts index 0c2bba53aae..98a1fd29fc6 100644 --- a/src/config/validation.ts +++ b/src/config/validation.ts @@ -1,6 +1,8 @@ import path from "node:path"; import { resolveAgentWorkspaceDir, resolveDefaultAgentId } from "../agents/agent-scope.js"; import { CHANNEL_IDS, normalizeChatChannelId } from "../channels/registry.js"; +import { withBundledPluginAllowlistCompat } from "../plugins/bundled-compat.js"; +import { resolveBundledWebSearchPluginIds } from "../plugins/bundled-web-search.js"; import { normalizePluginsConfig, resolveEffectiveEnableState, @@ -351,15 +353,38 @@ function validateConfigObjectWithPluginsBase( }; let registryInfo: RegistryInfo | null = null; + let compatConfig: OpenClawConfig | null | undefined; + + const ensureCompatConfig = (): OpenClawConfig => { + if (compatConfig !== undefined) { + return compatConfig ?? config; + } + + const workspaceDir = resolveAgentWorkspaceDir(config, resolveDefaultAgentId(config)); + const bundledWebSearchPluginIds = resolveBundledWebSearchPluginIds({ + config, + workspaceDir: workspaceDir ?? undefined, + env: opts.env, + }); + compatConfig = withBundledPluginAllowlistCompat({ + config, + pluginIds: bundledWebSearchPluginIds, + }); + return compatConfig ?? config; + }; const ensureRegistry = (): RegistryInfo => { if (registryInfo) { return registryInfo; } - const workspaceDir = resolveAgentWorkspaceDir(config, resolveDefaultAgentId(config)); + const effectiveConfig = ensureCompatConfig(); + const workspaceDir = resolveAgentWorkspaceDir( + effectiveConfig, + resolveDefaultAgentId(effectiveConfig), + ); const registry = loadPluginManifestRegistry({ - config, + config: effectiveConfig, workspaceDir: workspaceDir ?? undefined, env: opts.env, }); @@ -393,7 +418,7 @@ function validateConfigObjectWithPluginsBase( const ensureNormalizedPlugins = (): ReturnType => { const info = ensureRegistry(); if (!info.normalizedPlugins) { - info.normalizedPlugins = normalizePluginsConfig(config.plugins); + info.normalizedPlugins = normalizePluginsConfig(ensureCompatConfig().plugins); } return info.normalizedPlugins; }; diff --git a/src/infra/outbound/message-action-runner.ts b/src/infra/outbound/message-action-runner.ts index 318699c1042..d8d40cbe28c 100644 --- a/src/infra/outbound/message-action-runner.ts +++ b/src/infra/outbound/message-action-runner.ts @@ -318,14 +318,16 @@ async function handleBroadcastAction( throw new Error("Broadcast requires at least one target in --targets."); } const channelHint = readStringParam(params, "channel"); - const configured = await listConfiguredMessageChannels(input.cfg); - if (configured.length === 0) { - throw new Error("Broadcast requires at least one configured channel."); - } const targetChannels = channelHint && channelHint.trim().toLowerCase() !== "all" ? [await resolveChannel(input.cfg, { channel: channelHint }, input.toolContext)] - : configured; + : await (async () => { + const configured = await listConfiguredMessageChannels(input.cfg); + if (configured.length === 0) { + throw new Error("Broadcast requires at least one configured channel."); + } + return configured; + })(); const results: Array<{ channel: ChannelId; to: string; diff --git a/src/plugins/runtime/runtime-matrix-boundary.ts b/src/plugins/runtime/runtime-matrix-boundary.ts new file mode 100644 index 00000000000..a122e613c1f --- /dev/null +++ b/src/plugins/runtime/runtime-matrix-boundary.ts @@ -0,0 +1,129 @@ +import fs from "node:fs"; +import path from "node:path"; +import { createJiti } from "jiti"; +import { loadConfig } from "../../config/config.js"; +import { loadPluginManifestRegistry } from "../manifest-registry.js"; +import { + buildPluginLoaderJitiOptions, + resolvePluginSdkAliasFile, + resolvePluginSdkScopedAliasMap, + shouldPreferNativeJiti, +} from "../sdk-alias.js"; + +const MATRIX_PLUGIN_ID = "matrix"; + +type MatrixModule = typeof import("../../../extensions/matrix/runtime-api.js"); + +type MatrixPluginRecord = { + rootDir?: string; + source: string; +}; + +let cachedModulePath: string | null = null; +let cachedModule: MatrixModule | null = null; + +const jitiLoaders = new Map>(); + +function readConfigSafely() { + try { + return loadConfig(); + } catch { + return {}; + } +} + +function resolveMatrixPluginRecord(): MatrixPluginRecord | null { + const manifestRegistry = loadPluginManifestRegistry({ + config: readConfigSafely(), + cache: true, + }); + const record = manifestRegistry.plugins.find((plugin) => plugin.id === MATRIX_PLUGIN_ID); + if (!record?.source) { + return null; + } + return { + rootDir: record.rootDir, + source: record.source, + }; +} + +function resolveMatrixRuntimeModulePath(record: MatrixPluginRecord): string | null { + const candidates = [ + path.join(path.dirname(record.source), "runtime-api.js"), + path.join(path.dirname(record.source), "runtime-api.ts"), + ...(record.rootDir + ? [path.join(record.rootDir, "runtime-api.js"), path.join(record.rootDir, "runtime-api.ts")] + : []), + ]; + for (const candidate of candidates) { + if (fs.existsSync(candidate)) { + return candidate; + } + } + return null; +} + +function getJiti(modulePath: string) { + const tryNative = shouldPreferNativeJiti(modulePath); + const cached = jitiLoaders.get(tryNative); + if (cached) { + return cached; + } + const pluginSdkAlias = resolvePluginSdkAliasFile({ + srcFile: "root-alias.cjs", + distFile: "root-alias.cjs", + modulePath, + }); + const aliasMap = { + ...(pluginSdkAlias ? { "openclaw/plugin-sdk": pluginSdkAlias } : {}), + ...resolvePluginSdkScopedAliasMap({ modulePath }), + }; + const loader = createJiti(import.meta.url, { + ...buildPluginLoaderJitiOptions(aliasMap), + tryNative, + }); + jitiLoaders.set(tryNative, loader); + return loader; +} + +function loadWithJiti(modulePath: string): TModule { + return getJiti(modulePath)(modulePath) as TModule; +} + +function loadMatrixModule(): MatrixModule | null { + const record = resolveMatrixPluginRecord(); + if (!record) { + return null; + } + const modulePath = resolveMatrixRuntimeModulePath(record); + if (!modulePath) { + return null; + } + if (cachedModule && cachedModulePath === modulePath) { + return cachedModule; + } + const loaded = loadWithJiti(modulePath); + cachedModulePath = modulePath; + cachedModule = loaded; + return loaded; +} + +export function setMatrixThreadBindingIdleTimeoutBySessionKey( + ...args: Parameters +): ReturnType { + const fn = loadMatrixModule()?.setMatrixThreadBindingIdleTimeoutBySessionKey; + if (typeof fn !== "function") { + return []; + } + return fn(...args); +} + +export function setMatrixThreadBindingMaxAgeBySessionKey( + ...args: Parameters +): ReturnType { + const fn = loadMatrixModule()?.setMatrixThreadBindingMaxAgeBySessionKey; + if (typeof fn !== "function") { + return []; + } + return fn(...args); +} diff --git a/src/plugins/runtime/runtime-matrix.ts b/src/plugins/runtime/runtime-matrix.ts index abcb0cdf375..ac72161f69f 100644 --- a/src/plugins/runtime/runtime-matrix.ts +++ b/src/plugins/runtime/runtime-matrix.ts @@ -1,7 +1,7 @@ import { setMatrixThreadBindingIdleTimeoutBySessionKey, setMatrixThreadBindingMaxAgeBySessionKey, -} from "../../../extensions/matrix/runtime-api.js"; +} from "./runtime-matrix-boundary.js"; import type { PluginRuntimeChannel } from "./types-channel.js"; export function createRuntimeMatrix(): PluginRuntimeChannel["matrix"] { diff --git a/test/fixtures/test-parallel.behavior.json b/test/fixtures/test-parallel.behavior.json index 954b5f87557..f1ec0643026 100644 --- a/test/fixtures/test-parallel.behavior.json +++ b/test/fixtures/test-parallel.behavior.json @@ -333,6 +333,10 @@ "file": "src/infra/outbound/message-action-runner.poll.test.ts", "reason": "Terminates cleanly under threads, but not process forks on this host." }, + { + "file": "src/infra/outbound/message-action-runner.context.test.ts", + "reason": "Terminates cleanly under threads, but not process forks on this host." + }, { "file": "src/tts/tts.test.ts", "reason": "Terminates cleanly under threads, but not process forks on this host."