Gateway: fix hook dispatch and Discord provider teardown
This commit is contained in:
parent
6b4c24c2e5
commit
ed6e5652cd
@ -20,6 +20,39 @@ const { listAccountIds, resolveDefaultAccountId } = createAccountListHelpers("di
|
||||
export const listDiscordAccountIds = listAccountIds;
|
||||
export const resolveDefaultDiscordAccountId = resolveDefaultAccountId;
|
||||
|
||||
const managedDiscordAccountIdByBotUserId = new Map<string, string>();
|
||||
|
||||
export function rememberDiscordManagedBotIdentity(params: {
|
||||
botUserId: string;
|
||||
accountId: string;
|
||||
}): void {
|
||||
managedDiscordAccountIdByBotUserId.set(params.botUserId, params.accountId);
|
||||
}
|
||||
|
||||
export function forgetDiscordManagedBotIdentity(params: {
|
||||
botUserId?: string | null;
|
||||
accountId?: string | null;
|
||||
}): void {
|
||||
if (!params.botUserId) {
|
||||
return;
|
||||
}
|
||||
if (
|
||||
!params.accountId ||
|
||||
managedDiscordAccountIdByBotUserId.get(params.botUserId) === params.accountId
|
||||
) {
|
||||
managedDiscordAccountIdByBotUserId.delete(params.botUserId);
|
||||
}
|
||||
}
|
||||
|
||||
export function resolveDiscordManagedAccountIdByBotUserId(
|
||||
botUserId?: string | null,
|
||||
): string | undefined {
|
||||
if (!botUserId) {
|
||||
return undefined;
|
||||
}
|
||||
return managedDiscordAccountIdByBotUserId.get(botUserId);
|
||||
}
|
||||
|
||||
export function resolveDiscordAccountConfig(
|
||||
cfg: OpenClawConfig,
|
||||
accountId: string,
|
||||
|
||||
@ -18,6 +18,7 @@ type DiscordInboundWorkerParams = {
|
||||
export type DiscordInboundWorker = {
|
||||
enqueue: (job: DiscordInboundJob) => void;
|
||||
deactivate: () => void;
|
||||
waitForIdle: () => Promise<void>;
|
||||
};
|
||||
|
||||
function formatDiscordRunContextSuffix(job: DiscordInboundJob): string {
|
||||
@ -101,5 +102,8 @@ export function createDiscordInboundWorker(
|
||||
});
|
||||
},
|
||||
deactivate: runState.deactivate,
|
||||
waitForIdle: async () => {
|
||||
await runQueue.waitForIdle();
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
@ -36,7 +36,10 @@ import { danger, logVerbose, shouldLogVerbose } from "openclaw/plugin-sdk/runtim
|
||||
import { convertMarkdownTables } from "openclaw/plugin-sdk/text-runtime";
|
||||
import { stripReasoningTagsFromText } from "openclaw/plugin-sdk/text-runtime";
|
||||
import { truncateUtf16Safe } from "openclaw/plugin-sdk/text-runtime";
|
||||
import { resolveDiscordMaxLinesPerMessage } from "../accounts.js";
|
||||
import {
|
||||
resolveDiscordManagedAccountIdByBotUserId,
|
||||
resolveDiscordMaxLinesPerMessage,
|
||||
} from "../accounts.js";
|
||||
import { chunkDiscordTextWithMode } from "../chunk.js";
|
||||
import { resolveDiscordDraftStreamingChunking } from "../draft-chunking.js";
|
||||
import { createDiscordDraftStream } from "../draft-stream.js";
|
||||
@ -224,6 +227,7 @@ export async function processDiscordMessage(ctx: DiscordMessagePreflightContext)
|
||||
? (sender.tag ?? sender.name ?? author.username)
|
||||
: author.username;
|
||||
const senderTag = sender.tag;
|
||||
const senderManagedAccountId = resolveDiscordManagedAccountIdByBotUserId(sender.id);
|
||||
const { groupSystemPrompt, ownerAllowFrom, untrustedContext } = buildDiscordInboundAccessContext({
|
||||
channelConfig,
|
||||
guildInfo,
|
||||
@ -364,6 +368,7 @@ export async function processDiscordMessage(ctx: DiscordMessagePreflightContext)
|
||||
ChatType: isDirectMessage ? "direct" : "channel",
|
||||
ConversationLabel: fromLabel,
|
||||
SenderName: senderName,
|
||||
SenderManagedAccountId: senderManagedAccountId,
|
||||
SenderId: sender.id,
|
||||
SenderUsername: senderUsername,
|
||||
SenderTag: senderTag,
|
||||
|
||||
@ -28,6 +28,7 @@ type DiscordMessageHandlerParams = Omit<
|
||||
|
||||
export type DiscordMessageHandlerWithLifecycle = DiscordMessageHandler & {
|
||||
deactivate: () => void;
|
||||
waitForIdle: () => Promise<void>;
|
||||
};
|
||||
|
||||
export function createDiscordMessageHandler(
|
||||
@ -181,6 +182,7 @@ export function createDiscordMessageHandler(
|
||||
};
|
||||
|
||||
handler.deactivate = inboundWorker.deactivate;
|
||||
handler.waitForIdle = inboundWorker.waitForIdle;
|
||||
|
||||
return handler;
|
||||
}
|
||||
|
||||
@ -7,6 +7,7 @@ import {
|
||||
baseRuntime,
|
||||
getFirstDiscordMessageHandlerParams,
|
||||
getProviderMonitorTestMocks,
|
||||
mockResolvedDiscordAccountConfig,
|
||||
resetDiscordProviderMonitorMocks,
|
||||
} from "../../../../test/helpers/extensions/discord-provider.test-support.js";
|
||||
|
||||
@ -28,29 +29,16 @@ const {
|
||||
listSkillCommandsForAgentsMock,
|
||||
monitorLifecycleMock,
|
||||
reconcileAcpThreadBindingsOnStartupMock,
|
||||
rememberDiscordManagedBotIdentityMock,
|
||||
resolveDiscordAllowlistConfigMock,
|
||||
resolveDiscordAccountMock,
|
||||
resolveNativeCommandsEnabledMock,
|
||||
resolveNativeSkillsEnabledMock,
|
||||
shouldLogVerboseMock,
|
||||
forgetDiscordManagedBotIdentityMock,
|
||||
voiceRuntimeModuleLoadedMock,
|
||||
} = getProviderMonitorTestMocks();
|
||||
|
||||
function createConfigWithDiscordAccount(overrides: Record<string, unknown> = {}): OpenClawConfig {
|
||||
return {
|
||||
channels: {
|
||||
discord: {
|
||||
accounts: {
|
||||
default: {
|
||||
token: "MTIz.abc.def",
|
||||
...overrides,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
} as OpenClawConfig;
|
||||
}
|
||||
|
||||
vi.mock("openclaw/plugin-sdk/plugin-runtime", async () => {
|
||||
const actual = await vi.importActual<typeof import("openclaw/plugin-sdk/plugin-runtime")>(
|
||||
"openclaw/plugin-sdk/plugin-runtime",
|
||||
@ -61,6 +49,23 @@ vi.mock("openclaw/plugin-sdk/plugin-runtime", async () => {
|
||||
};
|
||||
});
|
||||
|
||||
vi.mock("../accounts.js", () => ({
|
||||
forgetDiscordManagedBotIdentity: forgetDiscordManagedBotIdentityMock,
|
||||
rememberDiscordManagedBotIdentity: rememberDiscordManagedBotIdentityMock,
|
||||
resolveDiscordAccount: resolveDiscordAccountMock,
|
||||
}));
|
||||
|
||||
vi.mock("../probe.js", () => ({
|
||||
fetchDiscordApplicationId: async () => "app-1",
|
||||
}));
|
||||
|
||||
vi.mock("../token.js", () => ({
|
||||
normalizeDiscordToken: (value?: string) => value,
|
||||
}));
|
||||
|
||||
vi.mock("../voice/command.js", () => ({
|
||||
createDiscordVoiceCommand: () => ({ name: "voice-command" }),
|
||||
}));
|
||||
vi.mock("../voice/manager.runtime.js", () => {
|
||||
voiceRuntimeModuleLoadedMock();
|
||||
return {
|
||||
@ -115,18 +120,7 @@ describe("monitorDiscordProvider", () => {
|
||||
};
|
||||
|
||||
beforeEach(() => {
|
||||
vi.resetModules();
|
||||
resetDiscordProviderMonitorMocks();
|
||||
vi.doMock("../accounts.js", () => ({
|
||||
resolveDiscordAccount: (...args: Parameters<typeof resolveDiscordAccountMock>) =>
|
||||
resolveDiscordAccountMock(...args),
|
||||
}));
|
||||
vi.doMock("../probe.js", () => ({
|
||||
fetchDiscordApplicationId: async () => "app-1",
|
||||
}));
|
||||
vi.doMock("../token.js", () => ({
|
||||
normalizeDiscordToken: (value?: string) => value,
|
||||
}));
|
||||
});
|
||||
|
||||
it("stops thread bindings when startup fails before lifecycle begins", async () => {
|
||||
@ -175,7 +169,7 @@ describe("monitorDiscordProvider", () => {
|
||||
it("loads the Discord voice runtime only when voice is enabled", async () => {
|
||||
resolveDiscordAccountMock.mockReturnValue({
|
||||
accountId: "default",
|
||||
token: "MTIz.abc.def",
|
||||
token: "cfg-token",
|
||||
config: {
|
||||
commands: { native: true, nativeSkills: false },
|
||||
voice: { enabled: true },
|
||||
@ -392,19 +386,12 @@ describe("monitorDiscordProvider", () => {
|
||||
});
|
||||
|
||||
it("forwards custom eventQueue config from discord config to Carbon Client", async () => {
|
||||
resolveDiscordAccountMock.mockReturnValue({
|
||||
accountId: "default",
|
||||
token: "MTIz.abc.def",
|
||||
config: {
|
||||
commands: { native: true, nativeSkills: false },
|
||||
voice: { enabled: false },
|
||||
agentComponents: { enabled: false },
|
||||
execApprovals: { enabled: false },
|
||||
eventQueue: { listenerTimeout: 300_000 },
|
||||
},
|
||||
});
|
||||
const { monitorDiscordProvider } = await import("./provider.js");
|
||||
|
||||
mockResolvedDiscordAccountConfig({
|
||||
eventQueue: { listenerTimeout: 300_000 },
|
||||
});
|
||||
|
||||
await monitorDiscordProvider({
|
||||
config: baseConfig(),
|
||||
runtime: baseRuntime(),
|
||||
@ -417,10 +404,12 @@ describe("monitorDiscordProvider", () => {
|
||||
it("does not reuse eventQueue.listenerTimeout as the queued inbound worker timeout", async () => {
|
||||
const { monitorDiscordProvider } = await import("./provider.js");
|
||||
|
||||
mockResolvedDiscordAccountConfig({
|
||||
eventQueue: { listenerTimeout: 50_000 },
|
||||
});
|
||||
|
||||
await monitorDiscordProvider({
|
||||
config: createConfigWithDiscordAccount({
|
||||
eventQueue: { listenerTimeout: 50_000 },
|
||||
}),
|
||||
config: baseConfig(),
|
||||
runtime: baseRuntime(),
|
||||
});
|
||||
|
||||
@ -433,19 +422,12 @@ describe("monitorDiscordProvider", () => {
|
||||
});
|
||||
|
||||
it("forwards inbound worker timeout config to the Discord message handler", async () => {
|
||||
resolveDiscordAccountMock.mockReturnValue({
|
||||
accountId: "default",
|
||||
token: "MTIz.abc.def",
|
||||
config: {
|
||||
commands: { native: true, nativeSkills: false },
|
||||
voice: { enabled: false },
|
||||
agentComponents: { enabled: false },
|
||||
execApprovals: { enabled: false },
|
||||
inboundWorker: { runTimeoutMs: 300_000 },
|
||||
},
|
||||
});
|
||||
const { monitorDiscordProvider } = await import("./provider.js");
|
||||
|
||||
mockResolvedDiscordAccountConfig({
|
||||
inboundWorker: { runTimeoutMs: 300_000 },
|
||||
});
|
||||
|
||||
await monitorDiscordProvider({
|
||||
config: baseConfig(),
|
||||
runtime: baseRuntime(),
|
||||
@ -479,43 +461,6 @@ describe("monitorDiscordProvider", () => {
|
||||
expect(commandNames).toContain("cron_jobs");
|
||||
});
|
||||
|
||||
it("registers plugin commands from the real registry as native Discord commands", async () => {
|
||||
const { clearPluginCommands, getPluginCommandSpecs, registerPluginCommand } =
|
||||
await import("../../../../src/plugins/commands.js");
|
||||
clearPluginCommands();
|
||||
const { monitorDiscordProvider } = await import("./provider.js");
|
||||
listNativeCommandSpecsForConfigMock.mockReturnValue([
|
||||
{ name: "status", description: "Status", acceptsArgs: false },
|
||||
]);
|
||||
getPluginCommandSpecsMock.mockImplementation((provider?: string) =>
|
||||
getPluginCommandSpecs(provider),
|
||||
);
|
||||
|
||||
expect(
|
||||
registerPluginCommand("demo-plugin", {
|
||||
name: "pair",
|
||||
description: "Pair device",
|
||||
acceptsArgs: true,
|
||||
requireAuth: false,
|
||||
handler: async ({ args }) => ({ text: `paired:${args ?? ""}` }),
|
||||
}),
|
||||
).toEqual({ ok: true });
|
||||
|
||||
await monitorDiscordProvider({
|
||||
config: baseConfig(),
|
||||
runtime: baseRuntime(),
|
||||
});
|
||||
|
||||
const commandNames = (createDiscordNativeCommandMock.mock.calls as Array<unknown[]>)
|
||||
.map((call) => (call[0] as { command?: { name?: string } } | undefined)?.command?.name)
|
||||
.filter((value): value is string => typeof value === "string");
|
||||
|
||||
expect(commandNames).toContain("status");
|
||||
expect(commandNames).toContain("pair");
|
||||
expect(clientHandleDeployRequestMock).toHaveBeenCalledTimes(1);
|
||||
expect(monitorLifecycleMock).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
|
||||
it("continues startup when Discord daily slash-command create quota is exhausted", async () => {
|
||||
const { RateLimitError } = await import("@buape/carbon");
|
||||
const { monitorDiscordProvider } = await import("./provider.js");
|
||||
@ -550,6 +495,85 @@ describe("monitorDiscordProvider", () => {
|
||||
);
|
||||
});
|
||||
|
||||
it("waits for inbound handler idle before forgetting managed bot identity", async () => {
|
||||
const { monitorDiscordProvider } = await import("./provider.js");
|
||||
const deactivate = vi.fn();
|
||||
const waitForIdle = vi.fn(async () => undefined);
|
||||
createDiscordMessageHandlerMock.mockImplementation(() =>
|
||||
Object.assign(
|
||||
vi.fn(async () => undefined),
|
||||
{
|
||||
deactivate,
|
||||
waitForIdle,
|
||||
},
|
||||
),
|
||||
);
|
||||
|
||||
await monitorDiscordProvider({
|
||||
config: baseConfig(),
|
||||
runtime: baseRuntime(),
|
||||
});
|
||||
|
||||
expect(rememberDiscordManagedBotIdentityMock).toHaveBeenCalledWith({
|
||||
botUserId: "bot-1",
|
||||
accountId: "default",
|
||||
});
|
||||
expect(deactivate).toHaveBeenCalledTimes(1);
|
||||
expect(waitForIdle).toHaveBeenCalledTimes(1);
|
||||
expect(forgetDiscordManagedBotIdentityMock).toHaveBeenCalledWith({
|
||||
botUserId: "bot-1",
|
||||
accountId: "default",
|
||||
});
|
||||
expect(deactivate.mock.invocationCallOrder[0]).toBeLessThan(
|
||||
waitForIdle.mock.invocationCallOrder[0],
|
||||
);
|
||||
expect(waitForIdle.mock.invocationCallOrder[0]).toBeLessThan(
|
||||
forgetDiscordManagedBotIdentityMock.mock.invocationCallOrder[0],
|
||||
);
|
||||
});
|
||||
|
||||
it("bounds inbound handler idle wait during teardown", async () => {
|
||||
vi.useFakeTimers();
|
||||
try {
|
||||
const { monitorDiscordProvider } = await import("./provider.js");
|
||||
const deactivate = vi.fn();
|
||||
const waitForIdle = vi.fn(() => new Promise<undefined>(() => undefined));
|
||||
createDiscordMessageHandlerMock.mockImplementation(() =>
|
||||
Object.assign(
|
||||
vi.fn(async () => undefined),
|
||||
{
|
||||
deactivate,
|
||||
waitForIdle,
|
||||
},
|
||||
),
|
||||
);
|
||||
mockResolvedDiscordAccountConfig({
|
||||
inboundWorker: { runTimeoutMs: 5_000 },
|
||||
});
|
||||
const runtime = baseRuntime();
|
||||
|
||||
const monitorPromise = monitorDiscordProvider({
|
||||
config: baseConfig(),
|
||||
runtime,
|
||||
});
|
||||
|
||||
await vi.advanceTimersByTimeAsync(5_100);
|
||||
await expect(monitorPromise).resolves.toBeUndefined();
|
||||
|
||||
expect(deactivate).toHaveBeenCalledTimes(1);
|
||||
expect(waitForIdle).toHaveBeenCalledTimes(1);
|
||||
expect(forgetDiscordManagedBotIdentityMock).toHaveBeenCalledWith({
|
||||
botUserId: "bot-1",
|
||||
accountId: "default",
|
||||
});
|
||||
expect(runtime.log).toHaveBeenCalledWith(
|
||||
expect.stringContaining("inbound handler did not drain within 5000ms during teardown"),
|
||||
);
|
||||
} finally {
|
||||
vi.useRealTimers();
|
||||
}
|
||||
});
|
||||
|
||||
it("configures Carbon native deploy by default", async () => {
|
||||
const { monitorDiscordProvider } = await import("./provider.js");
|
||||
|
||||
@ -581,50 +605,4 @@ describe("monitorDiscordProvider", () => {
|
||||
expect(connectedTrue).toBeDefined();
|
||||
expect(connectedFalse).toBeDefined();
|
||||
});
|
||||
|
||||
it("logs Discord startup phases and early gateway debug events", async () => {
|
||||
const { monitorDiscordProvider } = await import("./provider.js");
|
||||
const runtime = baseRuntime();
|
||||
const emitter = new EventEmitter();
|
||||
const gateway = { emitter, isConnected: true, reconnectAttempts: 0 };
|
||||
clientGetPluginMock.mockImplementation((name: string) =>
|
||||
name === "gateway" ? gateway : undefined,
|
||||
);
|
||||
clientFetchUserMock.mockImplementationOnce(async () => {
|
||||
emitter.emit("debug", "WebSocket connection opened");
|
||||
return { id: "bot-1", username: "Molty" };
|
||||
});
|
||||
isVerboseMock.mockReturnValue(true);
|
||||
|
||||
await monitorDiscordProvider({
|
||||
config: baseConfig(),
|
||||
runtime,
|
||||
});
|
||||
|
||||
const messages = vi.mocked(runtime.log).mock.calls.map((call) => String(call[0]));
|
||||
expect(messages.some((msg) => msg.includes("fetch-application-id:start"))).toBe(true);
|
||||
expect(messages.some((msg) => msg.includes("fetch-application-id:done"))).toBe(true);
|
||||
expect(messages.some((msg) => msg.includes("deploy-commands:start"))).toBe(true);
|
||||
expect(messages.some((msg) => msg.includes("deploy-commands:done"))).toBe(true);
|
||||
expect(messages.some((msg) => msg.includes("fetch-bot-identity:start"))).toBe(true);
|
||||
expect(messages.some((msg) => msg.includes("fetch-bot-identity:done"))).toBe(true);
|
||||
expect(
|
||||
messages.some(
|
||||
(msg) => msg.includes("gateway-debug") && msg.includes("WebSocket connection opened"),
|
||||
),
|
||||
).toBe(true);
|
||||
});
|
||||
|
||||
it("keeps Discord startup chatter quiet by default", async () => {
|
||||
const { monitorDiscordProvider } = await import("./provider.js");
|
||||
const runtime = baseRuntime();
|
||||
|
||||
await monitorDiscordProvider({
|
||||
config: baseConfig(),
|
||||
runtime,
|
||||
});
|
||||
|
||||
const messages = vi.mocked(runtime.log).mock.calls.map((call) => String(call[0]));
|
||||
expect(messages.some((msg) => msg.includes("discord startup ["))).toBe(false);
|
||||
});
|
||||
});
|
||||
|
||||
@ -52,7 +52,11 @@ import {
|
||||
import { createSubsystemLogger } from "openclaw/plugin-sdk/runtime-env";
|
||||
import { createNonExitingRuntime, type RuntimeEnv } from "openclaw/plugin-sdk/runtime-env";
|
||||
import { summarizeStringEntries } from "openclaw/plugin-sdk/text-runtime";
|
||||
import { resolveDiscordAccount } from "../accounts.js";
|
||||
import {
|
||||
forgetDiscordManagedBotIdentity,
|
||||
rememberDiscordManagedBotIdentity,
|
||||
resolveDiscordAccount,
|
||||
} from "../accounts.js";
|
||||
import { getDiscordGatewayEmitter } from "../monitor.gateway.js";
|
||||
import { fetchDiscordApplicationId } from "../probe.js";
|
||||
import { normalizeDiscordToken } from "../token.js";
|
||||
@ -99,6 +103,7 @@ import {
|
||||
reconcileAcpThreadBindingsOnStartup,
|
||||
} from "./thread-bindings.js";
|
||||
import { formatThreadBindingDurationLabel } from "./thread-bindings.messages.js";
|
||||
import { normalizeDiscordInboundWorkerTimeoutMs } from "./timeouts.js";
|
||||
|
||||
export type MonitorDiscordOpts = {
|
||||
token?: string;
|
||||
@ -161,6 +166,30 @@ function appendPluginCommandSpecs(params: {
|
||||
|
||||
const DISCORD_ACP_STATUS_PROBE_TIMEOUT_MS = 8_000;
|
||||
const DISCORD_ACP_STALE_RUNNING_ACTIVITY_MS = 2 * 60 * 1000;
|
||||
function withTimeout<T>(promise: Promise<T>, timeoutMs: number): Promise<T> {
|
||||
if (!timeoutMs || timeoutMs <= 0) {
|
||||
return promise;
|
||||
}
|
||||
let timer: NodeJS.Timeout | null = null;
|
||||
const timeout = new Promise<T>((_, reject) => {
|
||||
timer = setTimeout(() => reject(new Error("timeout")), timeoutMs);
|
||||
});
|
||||
return Promise.race([promise, timeout]).finally(() => {
|
||||
if (timer) {
|
||||
clearTimeout(timer);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
const DISCORD_MESSAGE_HANDLER_IDLE_TIMEOUT_CAP_MS = 15_000;
|
||||
|
||||
function resolveDiscordMessageHandlerIdleTimeoutMs(raw: number | undefined): number {
|
||||
const normalized = normalizeDiscordInboundWorkerTimeoutMs(raw);
|
||||
if (!normalized) {
|
||||
return DISCORD_MESSAGE_HANDLER_IDLE_TIMEOUT_CAP_MS;
|
||||
}
|
||||
return Math.min(normalized, DISCORD_MESSAGE_HANDLER_IDLE_TIMEOUT_CAP_MS);
|
||||
}
|
||||
|
||||
function isLegacyMissingSessionError(message: string): boolean {
|
||||
return (
|
||||
@ -631,9 +660,11 @@ export async function monitorDiscordProvider(opts: MonitorDiscordOpts = {}) {
|
||||
let lifecycleStarted = false;
|
||||
let releaseEarlyGatewayErrorGuard = () => {};
|
||||
let deactivateMessageHandler: (() => void) | undefined;
|
||||
let waitForMessageHandlerIdle: (() => Promise<void>) | undefined;
|
||||
let autoPresenceController: ReturnType<typeof createDiscordAutoPresenceController> | null = null;
|
||||
let earlyGatewayEmitter: ReturnType<typeof getDiscordGatewayEmitter> | undefined;
|
||||
let onEarlyGatewayDebug: ((msg: unknown) => void) | undefined;
|
||||
let botUserId: string | undefined;
|
||||
try {
|
||||
const commands: BaseCommand[] = commandSpecs.map((spec) =>
|
||||
createDiscordNativeCommand({
|
||||
@ -827,7 +858,6 @@ export async function monitorDiscordProvider(opts: MonitorDiscordOpts = {}) {
|
||||
|
||||
const logger = createSubsystemLogger("discord/monitor");
|
||||
const guildHistories = new Map<string, HistoryEntry[]>();
|
||||
let botUserId: string | undefined;
|
||||
let botUserName: string | undefined;
|
||||
let voiceManager: DiscordVoiceManager | null = null;
|
||||
|
||||
@ -864,6 +894,12 @@ export async function monitorDiscordProvider(opts: MonitorDiscordOpts = {}) {
|
||||
const botUser = await client.fetchUser("@me");
|
||||
botUserId = botUser?.id;
|
||||
botUserName = botUser?.username?.trim() || botUser?.globalName?.trim() || undefined;
|
||||
if (botUserId) {
|
||||
rememberDiscordManagedBotIdentity({
|
||||
botUserId,
|
||||
accountId: account.accountId,
|
||||
});
|
||||
}
|
||||
logDiscordStartupPhase({
|
||||
runtime,
|
||||
accountId: account.accountId,
|
||||
@ -922,6 +958,7 @@ export async function monitorDiscordProvider(opts: MonitorDiscordOpts = {}) {
|
||||
discordRestFetch,
|
||||
});
|
||||
deactivateMessageHandler = messageHandler.deactivate;
|
||||
waitForMessageHandlerIdle = messageHandler.waitForIdle;
|
||||
const trackInboundEvent = opts.setStatus
|
||||
? () => {
|
||||
const at = Date.now();
|
||||
@ -996,6 +1033,24 @@ export async function monitorDiscordProvider(opts: MonitorDiscordOpts = {}) {
|
||||
});
|
||||
} finally {
|
||||
deactivateMessageHandler?.();
|
||||
if (waitForMessageHandlerIdle) {
|
||||
const idleTimeoutMs = resolveDiscordMessageHandlerIdleTimeoutMs(
|
||||
discordCfg.inboundWorker?.runTimeoutMs,
|
||||
);
|
||||
try {
|
||||
await withTimeout(waitForMessageHandlerIdle(), idleTimeoutMs);
|
||||
} catch (error) {
|
||||
const message =
|
||||
error instanceof Error && error.message === "timeout"
|
||||
? `discord: inbound handler did not drain within ${idleTimeoutMs}ms during teardown; continuing shutdown`
|
||||
: `discord: inbound handler idle wait failed during teardown: ${formatErrorMessage(error)}`;
|
||||
runtime.log?.(warn(message));
|
||||
}
|
||||
}
|
||||
forgetDiscordManagedBotIdentity({
|
||||
botUserId,
|
||||
accountId: account.accountId,
|
||||
});
|
||||
autoPresenceController?.stop();
|
||||
opts.setStatus?.({ connected: false });
|
||||
if (onEarlyGatewayDebug) {
|
||||
|
||||
@ -123,6 +123,8 @@ export type MsgContext = {
|
||||
/** Explicit owner allowlist overrides (trusted, configuration-derived). */
|
||||
OwnerAllowFrom?: Array<string | number>;
|
||||
SenderName?: string;
|
||||
/** Provider-managed account id when the inbound sender is one of our configured bot accounts. */
|
||||
SenderManagedAccountId?: string;
|
||||
SenderId?: string;
|
||||
SenderUsername?: string;
|
||||
SenderTag?: string;
|
||||
|
||||
@ -252,6 +252,7 @@ function loadSchemaWithPlugins(): ConfigSchemaResponse {
|
||||
runtimeOptions: {
|
||||
allowGatewaySubagentBinding: true,
|
||||
},
|
||||
inheritSharedRuntimeOptions: true,
|
||||
logger: {
|
||||
info: () => {},
|
||||
warn: () => {},
|
||||
|
||||
@ -171,6 +171,9 @@ describe("message hook mappers", () => {
|
||||
channelId: "telegram",
|
||||
accountId: "acc-1",
|
||||
messageId: "out-1",
|
||||
threadId: "thread-1",
|
||||
sessionKey: "agent:agent-1:telegram:chat:456",
|
||||
agentId: "agent-1",
|
||||
isGroup: true,
|
||||
groupId: "telegram:chat:456",
|
||||
});
|
||||
@ -185,6 +188,17 @@ describe("message hook mappers", () => {
|
||||
content: "reply",
|
||||
success: false,
|
||||
error: "network error",
|
||||
metadata: {
|
||||
channel: "telegram",
|
||||
accountId: "acc-1",
|
||||
conversationId: "telegram:chat:456",
|
||||
messageId: "out-1",
|
||||
threadId: "thread-1",
|
||||
sessionKey: "agent:agent-1:telegram:chat:456",
|
||||
agentId: "agent-1",
|
||||
isGroup: true,
|
||||
groupId: "telegram:chat:456",
|
||||
},
|
||||
});
|
||||
expect(toInternalMessageSentContext(canonical)).toEqual({
|
||||
to: "telegram:chat:456",
|
||||
|
||||
@ -28,6 +28,7 @@ export type CanonicalInboundMessageHookContext = {
|
||||
messageId?: string;
|
||||
senderId?: string;
|
||||
senderName?: string;
|
||||
senderManagedAccountId?: string;
|
||||
senderUsername?: string;
|
||||
senderE164?: string;
|
||||
provider?: string;
|
||||
@ -52,6 +53,9 @@ export type CanonicalSentMessageHookContext = {
|
||||
accountId?: string;
|
||||
conversationId?: string;
|
||||
messageId?: string;
|
||||
threadId?: string | number;
|
||||
sessionKey?: string;
|
||||
agentId?: string;
|
||||
isGroup?: boolean;
|
||||
groupId?: string;
|
||||
};
|
||||
@ -97,6 +101,7 @@ export function deriveInboundMessageHookContext(
|
||||
ctx.MessageSidLast,
|
||||
senderId: ctx.SenderId,
|
||||
senderName: ctx.SenderName,
|
||||
senderManagedAccountId: ctx.SenderManagedAccountId,
|
||||
senderUsername: ctx.SenderUsername,
|
||||
senderE164: ctx.SenderE164,
|
||||
provider: ctx.Provider,
|
||||
@ -122,6 +127,9 @@ export function buildCanonicalSentMessageHookContext(params: {
|
||||
accountId?: string;
|
||||
conversationId?: string;
|
||||
messageId?: string;
|
||||
threadId?: string | number;
|
||||
sessionKey?: string;
|
||||
agentId?: string;
|
||||
isGroup?: boolean;
|
||||
groupId?: string;
|
||||
}): CanonicalSentMessageHookContext {
|
||||
@ -134,6 +142,9 @@ export function buildCanonicalSentMessageHookContext(params: {
|
||||
accountId: params.accountId,
|
||||
conversationId: params.conversationId ?? params.to,
|
||||
messageId: params.messageId,
|
||||
threadId: params.threadId,
|
||||
sessionKey: params.sessionKey,
|
||||
agentId: params.agentId,
|
||||
isGroup: params.isGroup,
|
||||
groupId: params.groupId,
|
||||
};
|
||||
@ -296,6 +307,7 @@ export function toPluginMessageReceivedEvent(
|
||||
messageId: canonical.messageId,
|
||||
senderId: canonical.senderId,
|
||||
senderName: canonical.senderName,
|
||||
senderManagedAccountId: canonical.senderManagedAccountId,
|
||||
senderUsername: canonical.senderUsername,
|
||||
senderE164: canonical.senderE164,
|
||||
guildId: canonical.guildId,
|
||||
@ -312,6 +324,17 @@ export function toPluginMessageSentEvent(
|
||||
content: canonical.content,
|
||||
success: canonical.success,
|
||||
...(canonical.error ? { error: canonical.error } : {}),
|
||||
metadata: {
|
||||
channel: canonical.channelId,
|
||||
accountId: canonical.accountId,
|
||||
conversationId: canonical.conversationId,
|
||||
messageId: canonical.messageId,
|
||||
threadId: canonical.threadId,
|
||||
sessionKey: canonical.sessionKey,
|
||||
agentId: canonical.agentId,
|
||||
isGroup: canonical.isGroup,
|
||||
groupId: canonical.groupId,
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
@ -333,6 +356,7 @@ export function toInternalMessageReceivedContext(
|
||||
threadId: canonical.threadId,
|
||||
senderId: canonical.senderId,
|
||||
senderName: canonical.senderName,
|
||||
senderManagedAccountId: canonical.senderManagedAccountId,
|
||||
senderUsername: canonical.senderUsername,
|
||||
senderE164: canonical.senderE164,
|
||||
guildId: canonical.guildId,
|
||||
|
||||
@ -126,6 +126,7 @@ describe("outbound channel resolution", () => {
|
||||
runtimeOptions: {
|
||||
allowGatewaySubagentBinding: true,
|
||||
},
|
||||
inheritSharedRuntimeOptions: true,
|
||||
});
|
||||
|
||||
getChannelPluginMock.mockReturnValue(undefined);
|
||||
@ -140,6 +141,7 @@ describe("outbound channel resolution", () => {
|
||||
runtimeOptions: {
|
||||
allowGatewaySubagentBinding: true,
|
||||
},
|
||||
inheritSharedRuntimeOptions: true,
|
||||
});
|
||||
});
|
||||
|
||||
|
||||
@ -57,6 +57,7 @@ function maybeBootstrapChannelPlugin(params: {
|
||||
runtimeOptions: {
|
||||
allowGatewaySubagentBinding: true,
|
||||
},
|
||||
inheritSharedRuntimeOptions: true,
|
||||
});
|
||||
} catch {
|
||||
// Allow a follow-up resolution attempt if bootstrap failed transiently.
|
||||
|
||||
@ -20,7 +20,8 @@ const mocks = vi.hoisted(() => ({
|
||||
}));
|
||||
const hookMocks = vi.hoisted(() => ({
|
||||
runner: {
|
||||
hasHooks: vi.fn(() => false),
|
||||
hasHooks: vi.fn<(name: string) => boolean>((_name) => false),
|
||||
runMessageSending: vi.fn(async () => undefined),
|
||||
runMessageSent: vi.fn(async () => {}),
|
||||
},
|
||||
}));
|
||||
@ -210,6 +211,8 @@ describe("deliverOutboundPayloads", () => {
|
||||
mocks.appendAssistantMessageToSessionTranscript.mockClear();
|
||||
hookMocks.runner.hasHooks.mockClear();
|
||||
hookMocks.runner.hasHooks.mockReturnValue(false);
|
||||
hookMocks.runner.runMessageSending.mockClear();
|
||||
hookMocks.runner.runMessageSending.mockResolvedValue(undefined);
|
||||
hookMocks.runner.runMessageSent.mockClear();
|
||||
hookMocks.runner.runMessageSent.mockResolvedValue(undefined);
|
||||
internalHookMocks.createInternalHookEvent.mockClear();
|
||||
@ -983,6 +986,167 @@ describe("deliverOutboundPayloads", () => {
|
||||
);
|
||||
});
|
||||
|
||||
it("uses the same resolved sessionKey in message_sending and message_sent", async () => {
|
||||
hookMocks.runner.hasHooks.mockImplementation(
|
||||
(name: string) => name === "message_sending" || name === "message_sent",
|
||||
);
|
||||
const sendWhatsApp = vi.fn().mockResolvedValue({ messageId: "w1", toJid: "jid" });
|
||||
|
||||
await deliverOutboundPayloads({
|
||||
cfg: {},
|
||||
channel: "whatsapp",
|
||||
to: "+1555",
|
||||
payloads: [{ text: "hello" }],
|
||||
deps: { sendWhatsApp },
|
||||
session: { key: "agent:sender:session", agentId: "sender-agent" },
|
||||
mirror: { sessionKey: "agent:mirror:session", agentId: "mirror-agent" },
|
||||
});
|
||||
|
||||
expect(hookMocks.runner.runMessageSending).toHaveBeenCalledWith(
|
||||
expect.objectContaining({
|
||||
metadata: expect.objectContaining({
|
||||
sessionKey: "agent:mirror:session",
|
||||
agentId: "mirror-agent",
|
||||
}),
|
||||
}),
|
||||
expect.anything(),
|
||||
);
|
||||
expect(hookMocks.runner.runMessageSent).toHaveBeenCalledWith(
|
||||
expect.objectContaining({
|
||||
metadata: expect.objectContaining({
|
||||
sessionKey: "agent:mirror:session",
|
||||
agentId: "mirror-agent",
|
||||
}),
|
||||
}),
|
||||
expect.anything(),
|
||||
);
|
||||
});
|
||||
|
||||
it("uses replyToId as hook threadId for Slack threaded sends", async () => {
|
||||
hookMocks.runner.hasHooks.mockImplementation(
|
||||
(name: string) => name === "message_sending" || name === "message_sent",
|
||||
);
|
||||
const sendText = vi.fn().mockResolvedValue({ channel: "slack", messageId: "slack-1" });
|
||||
setActivePluginRegistry(
|
||||
createTestRegistry([
|
||||
{
|
||||
pluginId: "slack",
|
||||
source: "test",
|
||||
plugin: createOutboundTestPlugin({
|
||||
id: "slack",
|
||||
outbound: { deliveryMode: "direct", sendText },
|
||||
}),
|
||||
},
|
||||
]),
|
||||
);
|
||||
|
||||
await deliverOutboundPayloads({
|
||||
cfg: {},
|
||||
channel: "slack",
|
||||
to: "C123",
|
||||
payloads: [{ text: "hello" }],
|
||||
replyToId: "1741719181.247349",
|
||||
threadId: null,
|
||||
session: { key: "agent:sender:session", agentId: "sender-agent" },
|
||||
});
|
||||
|
||||
expect(hookMocks.runner.runMessageSending).toHaveBeenCalledWith(
|
||||
expect.objectContaining({
|
||||
metadata: expect.objectContaining({
|
||||
threadId: "1741719181.247349",
|
||||
sessionKey: "agent:sender:session",
|
||||
agentId: "sender-agent",
|
||||
}),
|
||||
}),
|
||||
expect.anything(),
|
||||
);
|
||||
expect(hookMocks.runner.runMessageSent).toHaveBeenCalledWith(
|
||||
expect.objectContaining({
|
||||
metadata: expect.objectContaining({
|
||||
threadId: "1741719181.247349",
|
||||
sessionKey: "agent:sender:session",
|
||||
agentId: "sender-agent",
|
||||
}),
|
||||
}),
|
||||
expect.anything(),
|
||||
);
|
||||
});
|
||||
|
||||
it("uses payload-level replyToId as hook threadId for Slack threaded sends", async () => {
|
||||
hookMocks.runner.hasHooks.mockImplementation(
|
||||
(name: string) => name === "message_sending" || name === "message_sent",
|
||||
);
|
||||
const sendText = vi.fn().mockResolvedValue({ channel: "slack", messageId: "slack-2" });
|
||||
setActivePluginRegistry(
|
||||
createTestRegistry([
|
||||
{
|
||||
pluginId: "slack",
|
||||
source: "test",
|
||||
plugin: createOutboundTestPlugin({
|
||||
id: "slack",
|
||||
outbound: { deliveryMode: "direct", sendText },
|
||||
}),
|
||||
},
|
||||
]),
|
||||
);
|
||||
|
||||
await deliverOutboundPayloads({
|
||||
cfg: {},
|
||||
channel: "slack",
|
||||
to: "C123",
|
||||
payloads: [{ text: "hello", replyToId: "1741719181.999999" }],
|
||||
replyToId: undefined,
|
||||
threadId: null,
|
||||
session: { key: "agent:sender:session", agentId: "sender-agent" },
|
||||
});
|
||||
|
||||
expect(hookMocks.runner.runMessageSending).toHaveBeenCalledWith(
|
||||
expect.objectContaining({
|
||||
metadata: expect.objectContaining({
|
||||
threadId: "1741719181.999999",
|
||||
sessionKey: "agent:sender:session",
|
||||
agentId: "sender-agent",
|
||||
}),
|
||||
}),
|
||||
expect.anything(),
|
||||
);
|
||||
expect(hookMocks.runner.runMessageSent).toHaveBeenCalledWith(
|
||||
expect.objectContaining({
|
||||
metadata: expect.objectContaining({
|
||||
threadId: "1741719181.999999",
|
||||
sessionKey: "agent:sender:session",
|
||||
agentId: "sender-agent",
|
||||
}),
|
||||
}),
|
||||
expect.anything(),
|
||||
);
|
||||
});
|
||||
|
||||
it("falls back to mirror metadata for message_sending when session is absent", async () => {
|
||||
hookMocks.runner.hasHooks.mockImplementation((name: string) => name === "message_sending");
|
||||
const sendWhatsApp = vi.fn().mockResolvedValue({ messageId: "w1", toJid: "jid" });
|
||||
|
||||
await deliverOutboundPayloads({
|
||||
cfg: {},
|
||||
channel: "whatsapp",
|
||||
to: "+1555",
|
||||
payloads: [{ text: "hello" }],
|
||||
deps: { sendWhatsApp },
|
||||
mirror: { sessionKey: "agent:mirror:session", agentId: "mirror-agent" },
|
||||
skipQueue: true,
|
||||
});
|
||||
|
||||
expect(hookMocks.runner.runMessageSending).toHaveBeenCalledWith(
|
||||
expect.objectContaining({
|
||||
metadata: expect.objectContaining({
|
||||
sessionKey: "agent:mirror:session",
|
||||
agentId: "mirror-agent",
|
||||
}),
|
||||
}),
|
||||
expect.anything(),
|
||||
);
|
||||
});
|
||||
|
||||
it("emits message_sent success for sendPayload deliveries", async () => {
|
||||
hookMocks.runner.hasHooks.mockReturnValue(true);
|
||||
const sendPayload = vi.fn().mockResolvedValue({ channel: "matrix", messageId: "mx-1" });
|
||||
@ -1199,6 +1363,47 @@ describe("deliverOutboundPayloads", () => {
|
||||
expect.objectContaining({ channelId: "whatsapp" }),
|
||||
);
|
||||
});
|
||||
|
||||
it("emits payload-level threadId in message_sent failure metadata for Slack threaded sends", async () => {
|
||||
hookMocks.runner.hasHooks.mockReturnValue(true);
|
||||
const sendText = vi.fn().mockRejectedValue(new Error("downstream failed"));
|
||||
setActivePluginRegistry(
|
||||
createTestRegistry([
|
||||
{
|
||||
pluginId: "slack",
|
||||
source: "test",
|
||||
plugin: createOutboundTestPlugin({
|
||||
id: "slack",
|
||||
outbound: { deliveryMode: "direct", sendText },
|
||||
}),
|
||||
},
|
||||
]),
|
||||
);
|
||||
|
||||
await expect(
|
||||
deliverOutboundPayloads({
|
||||
cfg: {},
|
||||
channel: "slack",
|
||||
to: "C123",
|
||||
payloads: [{ text: "hi", replyToId: "1741719181.424242" }],
|
||||
threadId: null,
|
||||
session: { key: "agent:sender:session", agentId: "sender-agent" },
|
||||
}),
|
||||
).rejects.toThrow("downstream failed");
|
||||
|
||||
expect(hookMocks.runner.runMessageSent).toHaveBeenCalledWith(
|
||||
expect.objectContaining({
|
||||
metadata: expect.objectContaining({
|
||||
threadId: "1741719181.424242",
|
||||
sessionKey: "agent:sender:session",
|
||||
agentId: "sender-agent",
|
||||
}),
|
||||
success: false,
|
||||
error: "downstream failed",
|
||||
}),
|
||||
expect.anything(),
|
||||
);
|
||||
});
|
||||
});
|
||||
|
||||
const emptyRegistry = createTestRegistry([]);
|
||||
|
||||
@ -345,13 +345,23 @@ function createMessageSentEmitter(params: {
|
||||
channel: Exclude<OutboundChannel, "none">;
|
||||
to: string;
|
||||
accountId?: string;
|
||||
conversationId?: string;
|
||||
agentId?: string;
|
||||
sessionKey?: string;
|
||||
sessionKeyForInternalHooks?: string;
|
||||
mirrorIsGroup?: boolean;
|
||||
mirrorGroupId?: string;
|
||||
}): { emitMessageSent: (event: MessageSentEvent) => void; hasMessageSentHooks: boolean } {
|
||||
}): {
|
||||
emitMessageSent: (
|
||||
event: MessageSentEvent & { threadId?: string | number; conversationId?: string },
|
||||
) => void;
|
||||
hasMessageSentHooks: boolean;
|
||||
} {
|
||||
const hasMessageSentHooks = params.hookRunner?.hasHooks("message_sent") ?? false;
|
||||
const canEmitInternalHook = Boolean(params.sessionKeyForInternalHooks);
|
||||
const emitMessageSent = (event: MessageSentEvent) => {
|
||||
const emitMessageSent = (
|
||||
event: MessageSentEvent & { threadId?: string | number; conversationId?: string },
|
||||
) => {
|
||||
if (!hasMessageSentHooks && !canEmitInternalHook) {
|
||||
return;
|
||||
}
|
||||
@ -362,8 +372,11 @@ function createMessageSentEmitter(params: {
|
||||
error: event.error,
|
||||
channelId: params.channel,
|
||||
accountId: params.accountId ?? undefined,
|
||||
conversationId: params.to,
|
||||
conversationId: event.conversationId ?? params.conversationId ?? params.to,
|
||||
messageId: event.messageId,
|
||||
threadId: event.threadId,
|
||||
sessionKey: params.sessionKey,
|
||||
agentId: params.agentId,
|
||||
isGroup: params.mirrorIsGroup,
|
||||
groupId: params.mirrorGroupId,
|
||||
});
|
||||
@ -400,6 +413,31 @@ function createMessageSentEmitter(params: {
|
||||
return { emitMessageSent, hasMessageSentHooks };
|
||||
}
|
||||
|
||||
function resolveOutboundHookMetadata(params: {
|
||||
channel: Exclude<OutboundChannel, "none">;
|
||||
to: string;
|
||||
conversationId?: string;
|
||||
replyToId?: string | null;
|
||||
threadId?: string | number | null;
|
||||
session?: OutboundSessionContext;
|
||||
mirror?: DeliverOutboundPayloadsCoreParams["mirror"];
|
||||
}): {
|
||||
conversationId: string;
|
||||
threadId?: string | number;
|
||||
sessionKey?: string;
|
||||
agentId?: string;
|
||||
} {
|
||||
const resolvedThreadId =
|
||||
params.threadId ??
|
||||
(params.channel === "slack" && params.replyToId ? params.replyToId : undefined);
|
||||
return {
|
||||
conversationId: params.conversationId ?? params.to,
|
||||
threadId: resolvedThreadId,
|
||||
sessionKey: params.mirror?.sessionKey ?? params.session?.key,
|
||||
agentId: params.mirror?.agentId ?? params.session?.agentId,
|
||||
};
|
||||
}
|
||||
|
||||
async function applyMessageSendingHook(params: {
|
||||
hookRunner: ReturnType<typeof getGlobalHookRunner>;
|
||||
enabled: boolean;
|
||||
@ -408,6 +446,10 @@ async function applyMessageSendingHook(params: {
|
||||
to: string;
|
||||
channel: Exclude<OutboundChannel, "none">;
|
||||
accountId?: string;
|
||||
conversationId?: string;
|
||||
threadId?: string | number | null;
|
||||
sessionKey?: string;
|
||||
agentId?: string;
|
||||
}): Promise<{
|
||||
cancelled: boolean;
|
||||
payload: ReplyPayload;
|
||||
@ -428,12 +470,17 @@ async function applyMessageSendingHook(params: {
|
||||
metadata: {
|
||||
channel: params.channel,
|
||||
accountId: params.accountId,
|
||||
conversationId: params.conversationId ?? params.to,
|
||||
threadId: params.threadId ?? undefined,
|
||||
sessionKey: params.sessionKey,
|
||||
agentId: params.agentId,
|
||||
mediaUrls: params.payloadSummary.mediaUrls,
|
||||
},
|
||||
},
|
||||
{
|
||||
channelId: params.channel,
|
||||
accountId: params.accountId ?? undefined,
|
||||
conversationId: params.conversationId ?? params.to,
|
||||
},
|
||||
);
|
||||
if (sendingResult?.cancel) {
|
||||
@ -609,7 +656,16 @@ async function deliverOutboundPayloadsCore(
|
||||
};
|
||||
const normalizedPayloads = normalizePayloadsForChannelDelivery(payloads, channel, handler);
|
||||
const hookRunner = getGlobalHookRunner();
|
||||
const sessionKeyForInternalHooks = params.mirror?.sessionKey ?? params.session?.key;
|
||||
const hookMetadata = resolveOutboundHookMetadata({
|
||||
channel,
|
||||
to,
|
||||
conversationId: to,
|
||||
replyToId: params.replyToId,
|
||||
threadId: params.threadId,
|
||||
session: params.session,
|
||||
mirror: params.mirror,
|
||||
});
|
||||
const sessionKeyForInternalHooks = hookMetadata.sessionKey;
|
||||
const mirrorIsGroup = params.mirror?.isGroup;
|
||||
const mirrorGroupId = params.mirror?.groupId;
|
||||
const { emitMessageSent, hasMessageSentHooks } = createMessageSentEmitter({
|
||||
@ -617,23 +673,35 @@ async function deliverOutboundPayloadsCore(
|
||||
channel,
|
||||
to,
|
||||
accountId,
|
||||
conversationId: hookMetadata.conversationId,
|
||||
agentId: hookMetadata.agentId,
|
||||
sessionKey: hookMetadata.sessionKey,
|
||||
sessionKeyForInternalHooks,
|
||||
mirrorIsGroup,
|
||||
mirrorGroupId,
|
||||
});
|
||||
const hasMessageSendingHooks = hookRunner?.hasHooks("message_sending") ?? false;
|
||||
if (hasMessageSentHooks && params.session?.agentId && !sessionKeyForInternalHooks) {
|
||||
if (hasMessageSentHooks && hookMetadata.agentId && !sessionKeyForInternalHooks) {
|
||||
log.warn(
|
||||
"deliverOutboundPayloads: session.agentId present without session key; internal message:sent hook will be skipped",
|
||||
{
|
||||
channel,
|
||||
to,
|
||||
agentId: params.session.agentId,
|
||||
agentId: hookMetadata.agentId,
|
||||
},
|
||||
);
|
||||
}
|
||||
for (const payload of normalizedPayloads) {
|
||||
let payloadSummary = buildPayloadSummary(payload);
|
||||
let payloadHookMetadata = resolveOutboundHookMetadata({
|
||||
channel,
|
||||
to,
|
||||
conversationId: to,
|
||||
replyToId: payload.replyToId ?? params.replyToId,
|
||||
threadId: params.threadId,
|
||||
session: params.session,
|
||||
mirror: params.mirror,
|
||||
});
|
||||
try {
|
||||
throwIfAborted(abortSignal);
|
||||
|
||||
@ -646,6 +714,10 @@ async function deliverOutboundPayloadsCore(
|
||||
to,
|
||||
channel,
|
||||
accountId,
|
||||
conversationId: payloadHookMetadata.conversationId,
|
||||
threadId: payloadHookMetadata.threadId,
|
||||
sessionKey: payloadHookMetadata.sessionKey,
|
||||
agentId: payloadHookMetadata.agentId,
|
||||
});
|
||||
if (hookResult.cancelled) {
|
||||
continue;
|
||||
@ -659,6 +731,15 @@ async function deliverOutboundPayloadsCore(
|
||||
threadId: params.threadId ?? undefined,
|
||||
forceDocument: params.forceDocument,
|
||||
};
|
||||
payloadHookMetadata = resolveOutboundHookMetadata({
|
||||
channel,
|
||||
to,
|
||||
conversationId: to,
|
||||
replyToId: sendOverrides.replyToId,
|
||||
threadId: sendOverrides.threadId,
|
||||
session: params.session,
|
||||
mirror: params.mirror,
|
||||
});
|
||||
if (
|
||||
handler.sendPayload &&
|
||||
hasReplyPayloadContent({
|
||||
@ -672,6 +753,8 @@ async function deliverOutboundPayloadsCore(
|
||||
success: true,
|
||||
content: payloadSummary.text,
|
||||
messageId: delivery.messageId,
|
||||
conversationId: payloadHookMetadata.conversationId,
|
||||
threadId: payloadHookMetadata.threadId,
|
||||
});
|
||||
continue;
|
||||
}
|
||||
@ -687,6 +770,8 @@ async function deliverOutboundPayloadsCore(
|
||||
success: results.length > beforeCount,
|
||||
content: payloadSummary.text,
|
||||
messageId,
|
||||
conversationId: payloadHookMetadata.conversationId,
|
||||
threadId: payloadHookMetadata.threadId,
|
||||
});
|
||||
continue;
|
||||
}
|
||||
@ -713,6 +798,8 @@ async function deliverOutboundPayloadsCore(
|
||||
success: results.length > beforeCount,
|
||||
content: payloadSummary.text,
|
||||
messageId,
|
||||
conversationId: payloadHookMetadata.conversationId,
|
||||
threadId: payloadHookMetadata.threadId,
|
||||
});
|
||||
continue;
|
||||
}
|
||||
@ -742,12 +829,16 @@ async function deliverOutboundPayloadsCore(
|
||||
success: true,
|
||||
content: payloadSummary.text,
|
||||
messageId: lastMessageId,
|
||||
conversationId: payloadHookMetadata.conversationId,
|
||||
threadId: payloadHookMetadata.threadId,
|
||||
});
|
||||
} catch (err) {
|
||||
emitMessageSent({
|
||||
success: false,
|
||||
content: payloadSummary.text,
|
||||
error: err instanceof Error ? err.message : String(err),
|
||||
conversationId: payloadHookMetadata.conversationId,
|
||||
threadId: payloadHookMetadata.threadId,
|
||||
});
|
||||
if (!params.bestEffort) {
|
||||
throw err;
|
||||
|
||||
@ -34,7 +34,12 @@ async function writeRuntimePostBuildScaffold(tmp: string): Promise<void> {
|
||||
await fs.utimes(pluginSdkAliasPath, baselineTime, baselineTime);
|
||||
}
|
||||
|
||||
function expectedBuildSpawn() {
|
||||
async function readJsonFile(filePath: string): Promise<unknown> {
|
||||
return JSON.parse(await fs.readFile(filePath, "utf-8"));
|
||||
}
|
||||
|
||||
function expectedBuildSpawn(platform: NodeJS.Platform = process.platform) {
|
||||
void platform;
|
||||
return [process.execPath, "scripts/tsdown-build.mjs", "--no-clean"];
|
||||
}
|
||||
|
||||
@ -153,10 +158,11 @@ describe("run-node script", () => {
|
||||
fs.readFile(path.join(tmp, "dist", "plugin-sdk", "root-alias.cjs"), "utf-8"),
|
||||
).resolves.toContain("module.exports = {};");
|
||||
await expect(
|
||||
fs
|
||||
.readFile(path.join(tmp, "dist", "extensions", "demo", "openclaw.plugin.json"), "utf-8")
|
||||
.then((raw) => JSON.parse(raw)),
|
||||
).resolves.toMatchObject({ id: "demo" });
|
||||
readJsonFile(path.join(tmp, "dist", "extensions", "demo", "openclaw.plugin.json")),
|
||||
).resolves.toMatchObject({
|
||||
id: "demo",
|
||||
configSchema: { type: "object" },
|
||||
});
|
||||
await expect(
|
||||
fs.readFile(path.join(tmp, "dist", "extensions", "demo", "package.json"), "utf-8"),
|
||||
).resolves.toContain(
|
||||
@ -499,10 +505,9 @@ describe("run-node script", () => {
|
||||
|
||||
expect(exitCode).toBe(0);
|
||||
expect(spawnCalls).toEqual([[process.execPath, "openclaw.mjs", "status"]]);
|
||||
await expect(
|
||||
fs.readFile(distManifestPath, "utf-8").then((raw) => JSON.parse(raw)),
|
||||
).resolves.toMatchObject({
|
||||
await expect(readJsonFile(distManifestPath)).resolves.toMatchObject({
|
||||
id: "demo",
|
||||
configSchema: { type: "object" },
|
||||
});
|
||||
});
|
||||
});
|
||||
@ -568,10 +573,9 @@ describe("run-node script", () => {
|
||||
|
||||
expect(exitCode).toBe(0);
|
||||
expect(spawnCalls).toEqual([[process.execPath, "openclaw.mjs", "status"]]);
|
||||
await expect(
|
||||
fs.readFile(distManifestPath, "utf-8").then((raw) => JSON.parse(raw)),
|
||||
).resolves.toMatchObject({
|
||||
await expect(readJsonFile(distManifestPath)).resolves.toMatchObject({
|
||||
id: "demo",
|
||||
configSchema: { type: "object" },
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
@ -105,4 +105,42 @@ describe("KeyedAsyncQueue", () => {
|
||||
await Promise.resolve();
|
||||
expect(queue.getTailMapForTesting().has("actor")).toBe(false);
|
||||
});
|
||||
|
||||
it("waits for tasks enqueued while draining", async () => {
|
||||
const queue = new KeyedAsyncQueue();
|
||||
const firstGate = deferred<void>();
|
||||
const secondGate = deferred<void>();
|
||||
let secondStarted = false;
|
||||
|
||||
void queue.enqueue("actor", async () => {
|
||||
await firstGate.promise;
|
||||
});
|
||||
|
||||
const idlePromise = queue.waitForIdle();
|
||||
|
||||
await vi.waitFor(() => {
|
||||
expect(queue.getTailMapForTesting().has("actor")).toBe(true);
|
||||
});
|
||||
|
||||
void queue.enqueue("actor", async () => {
|
||||
secondStarted = true;
|
||||
await secondGate.promise;
|
||||
});
|
||||
|
||||
firstGate.resolve();
|
||||
await vi.waitFor(() => {
|
||||
expect(secondStarted).toBe(true);
|
||||
});
|
||||
|
||||
let idleResolved = false;
|
||||
void idlePromise.then(() => {
|
||||
idleResolved = true;
|
||||
});
|
||||
await Promise.resolve();
|
||||
expect(idleResolved).toBe(false);
|
||||
|
||||
secondGate.resolve();
|
||||
await idlePromise;
|
||||
expect(queue.getTailMapForTesting().size).toBe(0);
|
||||
});
|
||||
});
|
||||
|
||||
@ -46,4 +46,10 @@ export class KeyedAsyncQueue {
|
||||
...(hooks ? { hooks } : {}),
|
||||
});
|
||||
}
|
||||
|
||||
async waitForIdle(): Promise<void> {
|
||||
while (this.tails.size > 0) {
|
||||
await Promise.allSettled(this.tails.values());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -61,6 +61,7 @@ export type PluginLoadOptions = {
|
||||
logger?: PluginLogger;
|
||||
coreGatewayHandlers?: Record<string, GatewayRequestHandler>;
|
||||
runtimeOptions?: CreatePluginRuntimeOptions;
|
||||
inheritSharedRuntimeOptions?: boolean;
|
||||
cache?: boolean;
|
||||
mode?: "full" | "validate";
|
||||
onlyPluginIds?: string[];
|
||||
|
||||
@ -1670,6 +1670,7 @@ export type PluginHookMessageSentEvent = {
|
||||
content: string;
|
||||
success: boolean;
|
||||
error?: string;
|
||||
metadata?: Record<string, unknown>;
|
||||
};
|
||||
|
||||
// Tool context
|
||||
|
||||
@ -38,8 +38,10 @@ type ProviderMonitorTestMocks = {
|
||||
listNativeCommandSpecsForConfigMock: Mock<() => NativeCommandSpecMock[]>;
|
||||
listSkillCommandsForAgentsMock: Mock<() => unknown[]>;
|
||||
monitorLifecycleMock: Mock<(params: { threadBindings: { stop: () => void } }) => Promise<void>>;
|
||||
rememberDiscordManagedBotIdentityMock: Mock<() => void>;
|
||||
resolveDiscordAccountMock: Mock<() => unknown>;
|
||||
resolveDiscordAllowlistConfigMock: Mock<() => Promise<unknown>>;
|
||||
forgetDiscordManagedBotIdentityMock: Mock<() => void>;
|
||||
resolveNativeCommandsEnabledMock: Mock<() => boolean>;
|
||||
resolveNativeSkillsEnabledMock: Mock<() => boolean>;
|
||||
isVerboseMock: Mock<() => boolean>;
|
||||
@ -81,6 +83,7 @@ const providerMonitorTestMocks: ProviderMonitorTestMocks = vi.hoisted(() => {
|
||||
vi.fn(async () => undefined),
|
||||
{
|
||||
deactivate: vi.fn(),
|
||||
waitForIdle: vi.fn(async () => undefined),
|
||||
},
|
||||
),
|
||||
),
|
||||
@ -113,6 +116,7 @@ const providerMonitorTestMocks: ProviderMonitorTestMocks = vi.hoisted(() => {
|
||||
monitorLifecycleMock: vi.fn(async (params: { threadBindings: { stop: () => void } }) => {
|
||||
params.threadBindings.stop();
|
||||
}),
|
||||
rememberDiscordManagedBotIdentityMock: vi.fn(),
|
||||
resolveDiscordAccountMock: vi.fn(() => ({
|
||||
accountId: "default",
|
||||
token: "cfg-token",
|
||||
@ -122,6 +126,7 @@ const providerMonitorTestMocks: ProviderMonitorTestMocks = vi.hoisted(() => {
|
||||
guildEntries: undefined,
|
||||
allowFrom: undefined,
|
||||
})),
|
||||
forgetDiscordManagedBotIdentityMock: vi.fn(),
|
||||
resolveNativeCommandsEnabledMock: vi.fn(() => true),
|
||||
resolveNativeSkillsEnabledMock: vi.fn(() => false),
|
||||
isVerboseMock,
|
||||
@ -147,8 +152,10 @@ const {
|
||||
listNativeCommandSpecsForConfigMock,
|
||||
listSkillCommandsForAgentsMock,
|
||||
monitorLifecycleMock,
|
||||
rememberDiscordManagedBotIdentityMock,
|
||||
resolveDiscordAccountMock,
|
||||
resolveDiscordAllowlistConfigMock,
|
||||
forgetDiscordManagedBotIdentityMock,
|
||||
resolveNativeCommandsEnabledMock,
|
||||
resolveNativeSkillsEnabledMock,
|
||||
isVerboseMock,
|
||||
@ -199,6 +206,7 @@ export function resetDiscordProviderMonitorMocks(params?: {
|
||||
vi.fn(async () => undefined),
|
||||
{
|
||||
deactivate: vi.fn(),
|
||||
waitForIdle: vi.fn(async () => undefined),
|
||||
},
|
||||
),
|
||||
);
|
||||
@ -221,6 +229,7 @@ export function resetDiscordProviderMonitorMocks(params?: {
|
||||
monitorLifecycleMock.mockClear().mockImplementation(async (monitorParams) => {
|
||||
monitorParams.threadBindings.stop();
|
||||
});
|
||||
rememberDiscordManagedBotIdentityMock.mockClear();
|
||||
resolveDiscordAccountMock.mockClear().mockReturnValue({
|
||||
accountId: "default",
|
||||
token: "cfg-token",
|
||||
@ -230,6 +239,7 @@ export function resetDiscordProviderMonitorMocks(params?: {
|
||||
guildEntries: undefined,
|
||||
allowFrom: undefined,
|
||||
});
|
||||
forgetDiscordManagedBotIdentityMock.mockClear();
|
||||
resolveNativeCommandsEnabledMock.mockClear().mockReturnValue(true);
|
||||
resolveNativeSkillsEnabledMock.mockClear().mockReturnValue(false);
|
||||
isVerboseMock.mockClear().mockReturnValue(false);
|
||||
@ -248,16 +258,34 @@ export const baseConfig = (): OpenClawConfig =>
|
||||
channels: {
|
||||
discord: {
|
||||
accounts: {
|
||||
default: {
|
||||
token: "MTIz.abc.def",
|
||||
},
|
||||
default: {},
|
||||
},
|
||||
},
|
||||
},
|
||||
}) as OpenClawConfig;
|
||||
|
||||
vi.mock("@buape/carbon", async (importOriginal) => {
|
||||
const actual = await importOriginal<typeof import("@buape/carbon")>();
|
||||
vi.mock("@buape/carbon", () => {
|
||||
class Button {}
|
||||
class ChannelSelectMenu {}
|
||||
class Command {}
|
||||
class CommandWithSubcommands {}
|
||||
class Container {
|
||||
constructor(
|
||||
_components?: unknown,
|
||||
_options?: {
|
||||
accentColor?: string;
|
||||
spoiler?: boolean;
|
||||
},
|
||||
) {}
|
||||
}
|
||||
class MentionableSelectMenu {}
|
||||
class Message {}
|
||||
class MessageCreateListener {}
|
||||
class MessageReactionAddListener {}
|
||||
class MessageReactionRemoveListener {}
|
||||
class Modal {}
|
||||
class PresenceUpdateListener {}
|
||||
class ReadyListener {}
|
||||
class RateLimitError extends Error {
|
||||
status = 429;
|
||||
discordCode?: number;
|
||||
@ -294,16 +322,91 @@ vi.mock("@buape/carbon", async (importOriginal) => {
|
||||
return clientGetPluginMock(name);
|
||||
}
|
||||
}
|
||||
return { ...actual, Client, RateLimitError };
|
||||
class RequestClient {}
|
||||
class Row {}
|
||||
class RoleSelectMenu {}
|
||||
class Separator {}
|
||||
class StringSelectMenu {}
|
||||
class TextDisplay {}
|
||||
class ThreadUpdateListener {}
|
||||
class UserSelectMenu {}
|
||||
class Embed {}
|
||||
const ChannelType = {
|
||||
GuildText: 0,
|
||||
DM: 1,
|
||||
GuildVoice: 2,
|
||||
GroupDM: 3,
|
||||
GuildCategory: 4,
|
||||
GuildAnnouncement: 5,
|
||||
AnnouncementThread: 10,
|
||||
PublicThread: 11,
|
||||
PrivateThread: 12,
|
||||
};
|
||||
const MessageType = {
|
||||
Default: 0,
|
||||
Reply: 19,
|
||||
};
|
||||
return {
|
||||
Client,
|
||||
Button,
|
||||
ChannelSelectMenu,
|
||||
ChannelType,
|
||||
Command,
|
||||
CommandWithSubcommands,
|
||||
Container,
|
||||
Embed,
|
||||
MentionableSelectMenu,
|
||||
Message,
|
||||
MessageCreateListener,
|
||||
MessageType,
|
||||
MessageReactionAddListener,
|
||||
MessageReactionRemoveListener,
|
||||
Modal,
|
||||
PresenceUpdateListener,
|
||||
RateLimitError,
|
||||
ReadyListener,
|
||||
RequestClient,
|
||||
Row,
|
||||
RoleSelectMenu,
|
||||
Separator,
|
||||
serializePayload: (payload: unknown) => payload,
|
||||
StringSelectMenu,
|
||||
TextDisplay,
|
||||
ThreadUpdateListener,
|
||||
UserSelectMenu,
|
||||
};
|
||||
});
|
||||
|
||||
vi.mock("@buape/carbon/gateway", () => ({
|
||||
GatewayCloseCodes: { DisallowedIntents: 4014 },
|
||||
}));
|
||||
vi.mock("@buape/carbon/gateway", () => {
|
||||
class GatewayPlugin {
|
||||
gatewayInfo?: unknown;
|
||||
constructor(_options?: unknown) {}
|
||||
async registerClient(_client: unknown) {
|
||||
return undefined;
|
||||
}
|
||||
}
|
||||
return {
|
||||
GatewayCloseCodes: { DisallowedIntents: 4014 },
|
||||
GatewayIntents: {
|
||||
Guilds: 1 << 0,
|
||||
GuildMessages: 1 << 9,
|
||||
MessageContent: 1 << 15,
|
||||
DirectMessages: 1 << 12,
|
||||
GuildMessageReactions: 1 << 10,
|
||||
DirectMessageReactions: 1 << 13,
|
||||
GuildVoiceStates: 1 << 7,
|
||||
GuildPresences: 1 << 8,
|
||||
GuildMembers: 1 << 1,
|
||||
},
|
||||
GatewayPlugin,
|
||||
};
|
||||
});
|
||||
|
||||
vi.mock("@buape/carbon/voice", () => ({
|
||||
VoicePlugin: class VoicePlugin {},
|
||||
}));
|
||||
vi.mock("@buape/carbon/voice", () => {
|
||||
return {
|
||||
VoicePlugin: class VoicePlugin {},
|
||||
};
|
||||
});
|
||||
|
||||
vi.mock("openclaw/plugin-sdk/acp-runtime", async () => {
|
||||
const actual = await vi.importActual<typeof import("openclaw/plugin-sdk/acp-runtime")>(
|
||||
@ -388,6 +491,8 @@ vi.mock("openclaw/plugin-sdk/infra-runtime", async () => {
|
||||
});
|
||||
|
||||
vi.mock("../../../extensions/discord/src/accounts.js", () => ({
|
||||
forgetDiscordManagedBotIdentity: forgetDiscordManagedBotIdentityMock,
|
||||
rememberDiscordManagedBotIdentity: rememberDiscordManagedBotIdentityMock,
|
||||
resolveDiscordAccount: resolveDiscordAccountMock,
|
||||
}));
|
||||
|
||||
@ -472,9 +577,7 @@ vi.mock("../../../extensions/discord/src/monitor/provider.lifecycle.js", () => (
|
||||
}));
|
||||
|
||||
vi.mock("../../../extensions/discord/src/monitor/rest-fetch.js", () => ({
|
||||
resolveDiscordRestFetch: () => async () => {
|
||||
throw new Error("offline");
|
||||
},
|
||||
resolveDiscordRestFetch: () => async () => undefined,
|
||||
}));
|
||||
|
||||
vi.mock("../../../extensions/discord/src/monitor/thread-bindings.js", () => ({
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user