fix: debounce inbound messages across channels (#971) (thanks @juanpablodlc)
This commit is contained in:
parent
57d3c8572f
commit
1561b1c491
@ -8,6 +8,7 @@
|
||||
- Agents: add Current Date & Time system prompt section with configurable time format (auto/12/24).
|
||||
- Tools: normalize Slack/Discord message timestamps with `timestampMs`/`timestampUtc` while keeping raw provider fields.
|
||||
- Docs: add Date & Time guide and update prompt/timezone configuration docs.
|
||||
- Messages: debounce rapid inbound messages across channels with per-connector overrides. (#971) — thanks @juanpablodlc.
|
||||
- Fix: guard model fallback against undefined provider/model values. (#954) — thanks @roshanasingh4.
|
||||
- Memory: make `node-llama-cpp` an optional dependency (avoid Node 25 install failures) and improve local-embeddings fallback/errors.
|
||||
- Browser: add `snapshot refs=aria` (Playwright aria-ref ids) for self-resolving refs across `snapshot` → `act`.
|
||||
|
||||
@ -54,11 +54,7 @@ import { buildEmbeddedSystemPrompt, createSystemPromptOverride } from "./system-
|
||||
import { splitSdkTools } from "./tool-split.js";
|
||||
import type { EmbeddedPiCompactResult } from "./types.js";
|
||||
import { formatUserTime, resolveUserTimeFormat, resolveUserTimezone } from "../date-time.js";
|
||||
import {
|
||||
describeUnknownError,
|
||||
mapThinkingLevel,
|
||||
resolveExecToolDefaults,
|
||||
} from "./utils.js";
|
||||
import { describeUnknownError, mapThinkingLevel, resolveExecToolDefaults } from "./utils.js";
|
||||
|
||||
export async function compactEmbeddedPiSession(params: {
|
||||
sessionId: string;
|
||||
@ -227,9 +223,7 @@ export async function compactEmbeddedPiSession(params: {
|
||||
const sandboxInfo = buildEmbeddedSandboxInfo(sandbox, params.bashElevated);
|
||||
const reasoningTagHint = isReasoningTagProvider(provider);
|
||||
const userTimezone = resolveUserTimezone(params.config?.agents?.defaults?.userTimezone);
|
||||
const userTimeFormat = resolveUserTimeFormat(
|
||||
params.config?.agents?.defaults?.timeFormat,
|
||||
);
|
||||
const userTimeFormat = resolveUserTimeFormat(params.config?.agents?.defaults?.timeFormat);
|
||||
const userTime = formatUserTime(new Date(), userTimezone, userTimeFormat);
|
||||
const { defaultAgentId, sessionAgentId } = resolveSessionAgentIds({
|
||||
sessionKey: params.sessionKey,
|
||||
|
||||
@ -138,16 +138,16 @@ describe("handleDiscordMessagingAction", () => {
|
||||
});
|
||||
|
||||
it("adds normalized timestamps to readMessages payloads", async () => {
|
||||
readMessagesDiscord.mockResolvedValueOnce([
|
||||
{ id: "1", timestamp: "2026-01-15T10:00:00.000Z" },
|
||||
]);
|
||||
readMessagesDiscord.mockResolvedValueOnce([{ id: "1", timestamp: "2026-01-15T10:00:00.000Z" }]);
|
||||
|
||||
const result = await handleDiscordMessagingAction(
|
||||
"readMessages",
|
||||
{ channelId: "C1" },
|
||||
enableAllActions,
|
||||
);
|
||||
const payload = result.details as { messages: Array<{ timestampMs?: number; timestampUtc?: string }> };
|
||||
const payload = result.details as {
|
||||
messages: Array<{ timestampMs?: number; timestampUtc?: string }>;
|
||||
};
|
||||
|
||||
const expectedMs = Date.parse("2026-01-15T10:00:00.000Z");
|
||||
expect(payload.messages[0].timestampMs).toBe(expectedMs);
|
||||
@ -173,16 +173,16 @@ describe("handleDiscordMessagingAction", () => {
|
||||
});
|
||||
|
||||
it("adds normalized timestamps to listPins payloads", async () => {
|
||||
listPinsDiscord.mockResolvedValueOnce([
|
||||
{ id: "1", timestamp: "2026-01-15T12:00:00.000Z" },
|
||||
]);
|
||||
listPinsDiscord.mockResolvedValueOnce([{ id: "1", timestamp: "2026-01-15T12:00:00.000Z" }]);
|
||||
|
||||
const result = await handleDiscordMessagingAction(
|
||||
"listPins",
|
||||
{ channelId: "C1" },
|
||||
enableAllActions,
|
||||
);
|
||||
const payload = result.details as { pins: Array<{ timestampMs?: number; timestampUtc?: string }> };
|
||||
const payload = result.details as {
|
||||
pins: Array<{ timestampMs?: number; timestampUtc?: string }>;
|
||||
};
|
||||
|
||||
const expectedMs = Date.parse("2026-01-15T12:00:00.000Z");
|
||||
expect(payload.pins[0].timestampMs).toBe(expectedMs);
|
||||
|
||||
@ -334,7 +334,9 @@ describe("handleSlackAction", () => {
|
||||
});
|
||||
|
||||
const result = await handleSlackAction({ action: "readMessages", channelId: "C1" }, cfg);
|
||||
const payload = result.details as { messages: Array<{ timestampMs?: number; timestampUtc?: string }> };
|
||||
const payload = result.details as {
|
||||
messages: Array<{ timestampMs?: number; timestampUtc?: string }>;
|
||||
};
|
||||
|
||||
const expectedMs = Math.round(1735689600.456 * 1000);
|
||||
expect(payload.messages[0].timestampMs).toBe(expectedMs);
|
||||
|
||||
@ -263,9 +263,7 @@ describe("trigger handling", () => {
|
||||
|
||||
const text = Array.isArray(res) ? res[0]?.text : res?.text;
|
||||
// Selecting the default model shows "reset to default" instead of "set to"
|
||||
expect(normalizeTestText(text ?? "")).toContain(
|
||||
"anthropic/claude-opus-4-5",
|
||||
);
|
||||
expect(normalizeTestText(text ?? "")).toContain("anthropic/claude-opus-4-5");
|
||||
|
||||
const store = loadSessionStore(cfg.session.store);
|
||||
// When selecting the default, overrides are cleared
|
||||
|
||||
@ -63,24 +63,30 @@ export function createDiscordMessageHandler(params: {
|
||||
onFlush: async (entries) => {
|
||||
const last = entries.at(-1);
|
||||
if (!last) return;
|
||||
const combinedBaseText =
|
||||
entries.length === 1
|
||||
? resolveDiscordMessageText(last.data.message, { includeForwarded: false })
|
||||
: entries
|
||||
.map((entry) =>
|
||||
resolveDiscordMessageText(entry.data.message, { includeForwarded: false }),
|
||||
)
|
||||
.filter(Boolean)
|
||||
.join("\n");
|
||||
if (entries.length === 1) {
|
||||
const ctx = await preflightDiscordMessage({
|
||||
...params,
|
||||
ackReactionScope,
|
||||
groupPolicy,
|
||||
data: last.data,
|
||||
client: last.client,
|
||||
});
|
||||
if (!ctx) return;
|
||||
await processDiscordMessage(ctx);
|
||||
return;
|
||||
}
|
||||
const combinedBaseText = entries
|
||||
.map((entry) => resolveDiscordMessageText(entry.data.message, { includeForwarded: false }))
|
||||
.filter(Boolean)
|
||||
.join("\n");
|
||||
const syntheticMessage = {
|
||||
...last.data.message,
|
||||
content: combinedBaseText,
|
||||
attachments: [],
|
||||
message_snapshots: [],
|
||||
messageSnapshots: [],
|
||||
message_snapshots: (last.data.message as { message_snapshots?: unknown }).message_snapshots,
|
||||
messageSnapshots: (last.data.message as { messageSnapshots?: unknown }).messageSnapshots,
|
||||
rawData: {
|
||||
...(last.data.message as { rawData?: Record<string, unknown> }).rawData,
|
||||
message_snapshots: [],
|
||||
},
|
||||
};
|
||||
const syntheticData: DiscordMessageEvent = {
|
||||
@ -96,9 +102,7 @@ export function createDiscordMessageHandler(params: {
|
||||
});
|
||||
if (!ctx) return;
|
||||
if (entries.length > 1) {
|
||||
const ids = entries
|
||||
.map((entry) => entry.data.message?.id)
|
||||
.filter(Boolean) as string[];
|
||||
const ids = entries.map((entry) => entry.data.message?.id).filter(Boolean) as string[];
|
||||
if (ids.length > 0) {
|
||||
const ctxBatch = ctx as typeof ctx & {
|
||||
MessageSids?: string[];
|
||||
|
||||
@ -86,7 +86,7 @@ export async function monitorIMessageProvider(opts: MonitorIMessageOpts = {}): P
|
||||
const conversationId =
|
||||
entry.message.chat_id != null
|
||||
? `chat:${entry.message.chat_id}`
|
||||
: entry.message.chat_guid ?? entry.message.chat_identifier ?? "unknown";
|
||||
: (entry.message.chat_guid ?? entry.message.chat_identifier ?? "unknown");
|
||||
return `imessage:${accountInfo.accountId}:${conversationId}:${sender}`;
|
||||
},
|
||||
shouldDebounce: (entry) => {
|
||||
@ -119,7 +119,6 @@ export async function monitorIMessageProvider(opts: MonitorIMessageOpts = {}): P
|
||||
});
|
||||
|
||||
async function handleMessageNow(message: IMessagePayload) {
|
||||
|
||||
const senderRaw = message.sender ?? "";
|
||||
const sender = senderRaw.trim();
|
||||
if (!sender) return;
|
||||
|
||||
@ -109,7 +109,9 @@ export function createSignalEventHandler(deps: SignalEventHandlerDeps) {
|
||||
Body: combinedBody,
|
||||
RawBody: entry.bodyText,
|
||||
CommandBody: entry.bodyText,
|
||||
From: entry.isGroup ? `group:${entry.groupId ?? "unknown"}` : `signal:${entry.senderRecipient}`,
|
||||
From: entry.isGroup
|
||||
? `group:${entry.groupId ?? "unknown"}`
|
||||
: `signal:${entry.senderRecipient}`,
|
||||
To: signalTo,
|
||||
SessionKey: route.sessionKey,
|
||||
AccountId: route.accountId,
|
||||
@ -207,7 +209,7 @@ export function createSignalEventHandler(deps: SignalEventHandlerDeps) {
|
||||
const inboundDebouncer = createInboundDebouncer<SignalInboundEntry>({
|
||||
debounceMs: inboundDebounceMs,
|
||||
buildKey: (entry) => {
|
||||
const conversationId = entry.isGroup ? entry.groupId ?? "unknown" : entry.senderPeerId;
|
||||
const conversationId = entry.isGroup ? (entry.groupId ?? "unknown") : entry.senderPeerId;
|
||||
if (!conversationId || !entry.senderPeerId) return null;
|
||||
return `signal:${deps.accountId}:${conversationId}:${entry.senderPeerId}`;
|
||||
},
|
||||
|
||||
@ -45,7 +45,7 @@ export function createSlackMessageHandler(params: {
|
||||
if (!last) return;
|
||||
const combinedText =
|
||||
entries.length === 1
|
||||
? last.message.text ?? ""
|
||||
? (last.message.text ?? "")
|
||||
: entries
|
||||
.map((entry) => entry.message.text ?? "")
|
||||
.filter(Boolean)
|
||||
|
||||
@ -66,7 +66,7 @@ export async function monitorWebInbox(options: {
|
||||
buildKey: (msg) => {
|
||||
const senderKey =
|
||||
msg.chatType === "group"
|
||||
? msg.senderJid ?? msg.senderE164 ?? msg.senderName ?? msg.from
|
||||
? (msg.senderJid ?? msg.senderE164 ?? msg.senderName ?? msg.from)
|
||||
: msg.from;
|
||||
if (!senderKey) return null;
|
||||
const conversationKey = msg.chatType === "group" ? msg.chatId : msg.from;
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user