From dbccc73d7a1b6681c96c1f513ca42646755a0cae Mon Sep 17 00:00:00 2001 From: Tak Hoffman <781889+Takhoffman@users.noreply.github.com> Date: Tue, 3 Mar 2026 00:21:15 -0600 Subject: [PATCH] security(line): synthesize strict LINE auth boundary hardening LINE auth boundary hardening synthesis for inbound webhook authn/z/authz: - account-scoped pairing-store access - strict DM/group allowlist boundary separation - fail-closed webhook auth/runtime behavior - replay and duplicate handling with in-flight continuity for concurrent redeliveries Source PRs: #26701, #26683, #25978, #17593, #16619, #31990, #26047, #30584, #18777 Related continuity context: #21955 Co-authored-by: bmendonca3 <208517100+bmendonca3@users.noreply.github.com> Co-authored-by: davidahmann <46606159+davidahmann@users.noreply.github.com> Co-authored-by: harshang03 <58983401+harshang03@users.noreply.github.com> Co-authored-by: haosenwang1018 <167664334+haosenwang1018@users.noreply.github.com> Co-authored-by: liuxiaopai-ai <73659136+liuxiaopai-ai@users.noreply.github.com> Co-authored-by: coygeek <65363919+coygeek@users.noreply.github.com> Co-authored-by: lailoo <20536249+lailoo@users.noreply.github.com> --- CHANGELOG.md | 1 + src/line/bot-handlers.test.ts | 428 ++++++++++++++++++++++---- src/line/bot-handlers.ts | 195 ++++++++++-- src/line/bot-message-context.test.ts | 20 ++ src/line/bot.ts | 4 +- src/line/webhook-node.test.ts | 7 +- src/line/webhook-node.ts | 12 +- src/line/webhook.test.ts | 45 ++- src/line/webhook.ts | 18 +- src/media-understanding/apply.test.ts | 2 + 10 files changed, 619 insertions(+), 113 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index e0cb53962a8..3bb10f71d9c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,7 @@ Docs: https://docs.openclaw.ai ### Fixes +- LINE/auth boundary hardening synthesis: enforce strict LINE webhook authn/z boundary semantics across pairing-store account scoping, DM/group allowlist separation, fail-closed webhook auth/runtime behavior, and replay/duplication controls (including in-flight replay reservation and post-success dedupe marking). (from #26701, #26683, #25978, #17593, #16619, #31990, #26047, #30584, #18777) Thanks @bmendonca3, @davidahmann, @harshang03, @haosenwang1018, @liuxiaopai-ai, @coygeek, and @Takhoffman. - LINE/media download synthesis: fix file-media download handling and M4A audio classification across overlapping LINE regressions. (from #26386, #27761, #27787, #29509, #29755, #29776, #29785, #32240) Thanks @kevinWangSheng, @loiie45e, @carrotRakko, @Sid-Qin, @codeafridi, and @bmendonca3. - LINE/context and routing synthesis: fix group/room peer routing and command-authorization context propagation, and keep processing later events in mixed-success webhook batches. (from #21955, #24475, #27035, #28286) Thanks @lailoo, @mcaxtr, @jervyclaw, @Glucksberg, and @Takhoffman. - LINE/status/config/webhook synthesis: fix status false positives from snapshot/config state and accept LINE webhook HEAD probes for compatibility. (from #10487, #25726, #27537, #27908, #31387) Thanks @BlueBirdBack, @stakeswky, @loiie45e, @puritysb, and @mcaxtr. diff --git a/src/line/bot-handlers.test.ts b/src/line/bot-handlers.test.ts index 85c8b273161..39bfdf939e0 100644 --- a/src/line/bot-handlers.test.ts +++ b/src/line/bot-handlers.test.ts @@ -1,4 +1,4 @@ -import type { MessageEvent } from "@line/bot-sdk"; +import type { MessageEvent, PostbackEvent } from "@line/bot-sdk"; import { beforeAll, beforeEach, describe, expect, it, vi } from "vitest"; // Avoid pulling in globals/pairing/media dependencies; this suite only asserts @@ -16,15 +16,10 @@ vi.mock("../pairing/pairing-messages.js", () => ({ buildPairingReply: () => "pairing-reply", })); -const { downloadLineMediaMock } = vi.hoisted(() => ({ - downloadLineMediaMock: vi.fn(async () => ({ - path: "/tmp/line-media-file.pdf", - contentType: "application/pdf", - })), -})); - vi.mock("./download.js", () => ({ - downloadLineMedia: downloadLineMediaMock, + downloadLineMedia: async () => { + throw new Error("downloadLineMedia should not be called from bot-handlers tests"); + }, })); vi.mock("./send.js", () => ({ @@ -44,7 +39,7 @@ const { buildLineMessageContextMock, buildLinePostbackContextMock } = vi.hoisted isGroup: true, accountId: "default", })), - buildLinePostbackContextMock: vi.fn(async () => null), + buildLinePostbackContextMock: vi.fn(async () => null as unknown), })); vi.mock("./bot-message-context.js", () => ({ @@ -69,6 +64,7 @@ const { readAllowFromStoreMock, upsertPairingRequestMock } = vi.hoisted(() => ({ })); let handleLineWebhookEvents: typeof import("./bot-handlers.js").handleLineWebhookEvents; +let createLineWebhookReplayCache: typeof import("./bot-handlers.js").createLineWebhookReplayCache; const createRuntime = () => ({ log: vi.fn(), error: vi.fn(), exit: vi.fn() }); @@ -79,13 +75,12 @@ vi.mock("../pairing/pairing-store.js", () => ({ describe("handleLineWebhookEvents", () => { beforeAll(async () => { - ({ handleLineWebhookEvents } = await import("./bot-handlers.js")); + ({ handleLineWebhookEvents, createLineWebhookReplayCache } = await import("./bot-handlers.js")); }); beforeEach(() => { buildLineMessageContextMock.mockClear(); buildLinePostbackContextMock.mockClear(); - downloadLineMediaMock.mockClear(); readAllowFromStoreMock.mockClear(); upsertPairingRequestMock.mockClear(); }); @@ -188,23 +183,23 @@ describe("handleLineWebhookEvents", () => { expect(processMessage).toHaveBeenCalledTimes(1); }); - it("blocks group sender that is only present in pairing-store allowlist", async () => { + it("blocks group sender not in groupAllowFrom even when sender is paired in DM store", async () => { + readAllowFromStoreMock.mockResolvedValueOnce(["user-store"]); const processMessage = vi.fn(); - readAllowFromStoreMock.mockResolvedValueOnce(["user-paired"]); const event = { type: "message", - message: { id: "m3b", type: "text", text: "hi" }, + message: { id: "m5", type: "text", text: "hi" }, replyToken: "reply-token", timestamp: Date.now(), - source: { type: "group", groupId: "group-1", userId: "user-paired" }, + source: { type: "group", groupId: "group-1", userId: "user-store" }, mode: "active", - webhookEventId: "evt-3b", + webhookEventId: "evt-5", deliveryContext: { isRedelivery: false }, } as MessageEvent; await handleLineWebhookEvents([event], { cfg: { - channels: { line: { groupPolicy: "allowlist", groupAllowFrom: ["user-owner"] } }, + channels: { line: { groupPolicy: "allowlist", groupAllowFrom: ["user-group"] } }, }, account: { accountId: "default", @@ -212,15 +207,54 @@ describe("handleLineWebhookEvents", () => { channelAccessToken: "token", channelSecret: "secret", tokenSource: "config", - config: { groupPolicy: "allowlist", groupAllowFrom: ["user-owner"] }, + config: { groupPolicy: "allowlist", groupAllowFrom: ["user-group"] }, }, runtime: createRuntime(), mediaMaxBytes: 1, processMessage, }); - expect(buildLineMessageContextMock).not.toHaveBeenCalled(); expect(processMessage).not.toHaveBeenCalled(); + expect(buildLineMessageContextMock).not.toHaveBeenCalled(); + expect(readAllowFromStoreMock).toHaveBeenCalledWith("line", undefined, "default"); + }); + + it("does not authorize group messages from DM pairing-store entries when group allowlist is empty", async () => { + readAllowFromStoreMock.mockResolvedValueOnce(["user-5"]); + const processMessage = vi.fn(); + const event = { + type: "message", + message: { id: "m5b", type: "text", text: "hi" }, + replyToken: "reply-token", + timestamp: Date.now(), + source: { type: "group", groupId: "group-1", userId: "user-5" }, + mode: "active", + webhookEventId: "evt-5b", + deliveryContext: { isRedelivery: false }, + } as MessageEvent; + + await handleLineWebhookEvents([event], { + cfg: { channels: { line: { groupPolicy: "allowlist" } } }, + account: { + accountId: "default", + enabled: true, + channelAccessToken: "token", + channelSecret: "secret", + tokenSource: "config", + config: { + dmPolicy: "pairing", + allowFrom: [], + groupPolicy: "allowlist", + groupAllowFrom: [], + }, + }, + runtime: createRuntime(), + mediaMaxBytes: 1, + processMessage, + }); + + expect(processMessage).not.toHaveBeenCalled(); + expect(buildLineMessageContextMock).not.toHaveBeenCalled(); }); it("blocks group messages when wildcard group config disables groups", async () => { @@ -255,21 +289,286 @@ describe("handleLineWebhookEvents", () => { expect(buildLineMessageContextMock).not.toHaveBeenCalled(); }); - it("downloads file attachments and forwards media refs to message context", async () => { + it("scopes DM pairing requests to accountId", async () => { const processMessage = vi.fn(); const event = { type: "message", - message: { id: "mf-1", type: "file", fileName: "doc.pdf", fileSize: "42" }, + message: { id: "m5", type: "text", text: "hi" }, replyToken: "reply-token", timestamp: Date.now(), - source: { type: "user", userId: "user-file" }, + source: { type: "user", userId: "user-5" }, mode: "active", - webhookEventId: "evt-file-1", + webhookEventId: "evt-5", deliveryContext: { isRedelivery: false }, } as MessageEvent; await handleLineWebhookEvents([event], { - cfg: { channels: { line: {} } }, + cfg: { channels: { line: { dmPolicy: "pairing" } } }, + account: { + accountId: "default", + enabled: true, + channelAccessToken: "token", + channelSecret: "secret", + tokenSource: "config", + config: { dmPolicy: "pairing", allowFrom: ["user-owner"] }, + }, + runtime: createRuntime(), + mediaMaxBytes: 1, + processMessage, + }); + + expect(processMessage).not.toHaveBeenCalled(); + expect(upsertPairingRequestMock).toHaveBeenCalledWith( + expect.objectContaining({ + channel: "line", + id: "user-5", + accountId: "default", + }), + ); + }); + + it("does not authorize DM senders from another account's pairing-store entries", async () => { + const processMessage = vi.fn(); + readAllowFromStoreMock.mockImplementation(async (...args: unknown[]) => { + const accountId = args[2] as string | undefined; + if (accountId === "work") { + return []; + } + return ["cross-account-user"]; + }); + upsertPairingRequestMock.mockResolvedValue({ code: "CODE", created: false }); + + const event = { + type: "message", + message: { id: "m6", type: "text", text: "hi" }, + replyToken: "reply-token", + timestamp: Date.now(), + source: { type: "user", userId: "cross-account-user" }, + mode: "active", + webhookEventId: "evt-6", + deliveryContext: { isRedelivery: false }, + } as MessageEvent; + + await handleLineWebhookEvents([event], { + cfg: { channels: { line: { dmPolicy: "pairing" } } }, + account: { + accountId: "work", + enabled: true, + channelAccessToken: "token-work", + channelSecret: "secret-work", + tokenSource: "config", + config: { dmPolicy: "pairing" }, + }, + runtime: createRuntime(), + mediaMaxBytes: 1, + processMessage, + }); + + expect(readAllowFromStoreMock).toHaveBeenCalledWith("line", undefined, "work"); + expect(processMessage).not.toHaveBeenCalled(); + expect(upsertPairingRequestMock).toHaveBeenCalledWith( + expect.objectContaining({ + channel: "line", + id: "cross-account-user", + accountId: "work", + }), + ); + }); + + it("deduplicates replayed webhook events by webhookEventId before processing", async () => { + const processMessage = vi.fn(); + const event = { + type: "message", + message: { id: "m-replay", type: "text", text: "hello" }, + replyToken: "reply-token", + timestamp: Date.now(), + source: { type: "group", groupId: "group-replay", userId: "user-replay" }, + mode: "active", + webhookEventId: "evt-replay-1", + deliveryContext: { isRedelivery: true }, + } as MessageEvent; + + const context: Parameters[1] = { + cfg: { channels: { line: { groupPolicy: "open" } } }, + account: { + accountId: "default", + enabled: true, + channelAccessToken: "token", + channelSecret: "secret", + tokenSource: "config", + config: { groupPolicy: "open" }, + }, + runtime: createRuntime(), + mediaMaxBytes: 1, + processMessage, + replayCache: createLineWebhookReplayCache(), + }; + + await handleLineWebhookEvents([event], context); + await handleLineWebhookEvents([event], context); + + expect(buildLineMessageContextMock).toHaveBeenCalledTimes(1); + expect(processMessage).toHaveBeenCalledTimes(1); + }); + + it("skips concurrent redeliveries while the first event is still processing", async () => { + let resolveFirst: (() => void) | undefined; + const firstDone = new Promise((resolve) => { + resolveFirst = resolve; + }); + const processMessage = vi.fn(async () => { + await firstDone; + }); + const event = { + type: "message", + message: { id: "m-inflight", type: "text", text: "hello" }, + replyToken: "reply-token", + timestamp: Date.now(), + source: { type: "group", groupId: "group-inflight", userId: "user-inflight" }, + mode: "active", + webhookEventId: "evt-inflight-1", + deliveryContext: { isRedelivery: true }, + } as MessageEvent; + + const context: Parameters[1] = { + cfg: { channels: { line: { groupPolicy: "open" } } }, + account: { + accountId: "default", + enabled: true, + channelAccessToken: "token", + channelSecret: "secret", + tokenSource: "config", + config: { groupPolicy: "open" }, + }, + runtime: createRuntime(), + mediaMaxBytes: 1, + processMessage, + replayCache: createLineWebhookReplayCache(), + }; + + const firstRun = handleLineWebhookEvents([event], context); + await Promise.resolve(); + const secondRun = handleLineWebhookEvents([event], context); + resolveFirst?.(); + await Promise.all([firstRun, secondRun]); + + expect(buildLineMessageContextMock).toHaveBeenCalledTimes(1); + expect(processMessage).toHaveBeenCalledTimes(1); + }); + + it("mirrors in-flight replay failures so concurrent duplicates also fail", async () => { + let rejectFirst: ((err: Error) => void) | undefined; + const firstDone = new Promise((_, reject) => { + rejectFirst = reject; + }); + const processMessage = vi.fn(async () => { + await firstDone; + }); + const event = { + type: "message", + message: { id: "m-inflight-fail", type: "text", text: "hello" }, + replyToken: "reply-token", + timestamp: Date.now(), + source: { type: "group", groupId: "group-inflight", userId: "user-inflight" }, + mode: "active", + webhookEventId: "evt-inflight-fail-1", + deliveryContext: { isRedelivery: true }, + } as MessageEvent; + + const context: Parameters[1] = { + cfg: { channels: { line: { groupPolicy: "open" } } }, + account: { + accountId: "default", + enabled: true, + channelAccessToken: "token", + channelSecret: "secret", + tokenSource: "config", + config: { groupPolicy: "open" }, + }, + runtime: createRuntime(), + mediaMaxBytes: 1, + processMessage, + replayCache: createLineWebhookReplayCache(), + }; + + const firstRun = handleLineWebhookEvents([event], context); + await Promise.resolve(); + const secondRun = handleLineWebhookEvents([event], context); + rejectFirst?.(new Error("transient inflight failure")); + + await expect(firstRun).rejects.toThrow("transient inflight failure"); + await expect(secondRun).rejects.toThrow("transient inflight failure"); + expect(processMessage).toHaveBeenCalledTimes(1); + }); + + it("deduplicates redeliveries by LINE message id when webhookEventId changes", async () => { + const processMessage = vi.fn(); + const event = { + type: "message", + message: { id: "m-dup-1", type: "text", text: "hello" }, + replyToken: "reply-token", + timestamp: Date.now(), + source: { type: "group", groupId: "group-dup", userId: "user-dup" }, + mode: "active", + webhookEventId: "evt-dup-1", + deliveryContext: { isRedelivery: false }, + } as MessageEvent; + + const context: Parameters[1] = { + cfg: { + channels: { line: { groupPolicy: "allowlist", groupAllowFrom: ["user-dup"] } }, + }, + account: { + accountId: "default", + enabled: true, + channelAccessToken: "token", + channelSecret: "secret", + tokenSource: "config", + config: { groupPolicy: "allowlist", groupAllowFrom: ["user-dup"] }, + }, + runtime: createRuntime(), + mediaMaxBytes: 1, + processMessage, + replayCache: createLineWebhookReplayCache(), + }; + + await handleLineWebhookEvents([event], context); + await handleLineWebhookEvents( + [ + { + ...event, + webhookEventId: "evt-dup-redelivery", + deliveryContext: { isRedelivery: true }, + } as MessageEvent, + ], + context, + ); + + expect(buildLineMessageContextMock).toHaveBeenCalledTimes(1); + expect(processMessage).toHaveBeenCalledTimes(1); + }); + + it("deduplicates postback redeliveries by webhookEventId when replyToken changes", async () => { + const processMessage = vi.fn(); + buildLinePostbackContextMock.mockResolvedValue({ + ctxPayload: { From: "line:user:user-postback" }, + route: { agentId: "default" }, + isGroup: false, + accountId: "default", + }); + const event = { + type: "postback", + postback: { data: "action=confirm" }, + replyToken: "reply-token-1", + timestamp: Date.now(), + source: { type: "user", userId: "user-postback" }, + mode: "active", + webhookEventId: "evt-postback-1", + deliveryContext: { isRedelivery: false }, + } as PostbackEvent; + + const context: Parameters[1] = { + cfg: { channels: { line: { dmPolicy: "open" } } }, account: { accountId: "default", enabled: true, @@ -279,69 +578,66 @@ describe("handleLineWebhookEvents", () => { config: { dmPolicy: "open" }, }, runtime: createRuntime(), - mediaMaxBytes: 1234, + mediaMaxBytes: 1, processMessage, - }); + replayCache: createLineWebhookReplayCache(), + }; - expect(downloadLineMediaMock).toHaveBeenCalledTimes(1); - expect(downloadLineMediaMock).toHaveBeenCalledWith("mf-1", "token", 1234); - expect(buildLineMessageContextMock).toHaveBeenCalledTimes(1); - expect(buildLineMessageContextMock).toHaveBeenCalledWith( - expect.objectContaining({ - commandAuthorized: false, - allMedia: [ - { - path: "/tmp/line-media-file.pdf", - contentType: "application/pdf", - }, - ], - }), + await handleLineWebhookEvents([event], context); + await handleLineWebhookEvents( + [ + { + ...event, + replyToken: "reply-token-2", + deliveryContext: { isRedelivery: true }, + } as PostbackEvent, + ], + context, ); + + expect(buildLinePostbackContextMock).toHaveBeenCalledTimes(1); expect(processMessage).toHaveBeenCalledTimes(1); }); - it("continues processing later events when one event handler fails", async () => { - const failingEvent = { + it("does not mark replay cache when event processing fails", async () => { + const processMessage = vi + .fn() + .mockRejectedValueOnce(new Error("transient failure")) + .mockResolvedValueOnce(undefined); + const event = { type: "message", - message: { id: "m-err", type: "text", text: "hi" }, + message: { id: "m-fail-then-retry", type: "text", text: "hello" }, replyToken: "reply-token", timestamp: Date.now(), - source: { type: "user", userId: "user-err" }, + source: { type: "group", groupId: "group-retry", userId: "user-retry" }, mode: "active", - webhookEventId: "evt-err", + webhookEventId: "evt-fail-then-retry", deliveryContext: { isRedelivery: false }, } as MessageEvent; - const laterEvent = { - ...failingEvent, - message: { id: "m-later", type: "text", text: "hello" }, - webhookEventId: "evt-later", - } as MessageEvent; - const runtime = createRuntime(); - let invocation = 0; - const processMessage = vi.fn(async () => { - if (invocation === 0) { - invocation += 1; - throw new Error("boom"); - } - invocation += 1; - }); - await handleLineWebhookEvents([failingEvent, laterEvent], { - cfg: { channels: { line: {} } }, + const context: Parameters[1] = { + cfg: { channels: { line: { groupPolicy: "open" } } }, account: { accountId: "default", enabled: true, channelAccessToken: "token", channelSecret: "secret", tokenSource: "config", - config: { dmPolicy: "open" }, + config: { groupPolicy: "open" }, }, - runtime, - mediaMaxBytes: 1234, + runtime: createRuntime(), + mediaMaxBytes: 1, processMessage, - }); + replayCache: createLineWebhookReplayCache(), + }; + await expect(handleLineWebhookEvents([event], context)).rejects.toThrow("transient failure"); + await handleLineWebhookEvents([event], context); + + expect(buildLineMessageContextMock).toHaveBeenCalledTimes(2); expect(processMessage).toHaveBeenCalledTimes(2); - expect(runtime.error).toHaveBeenCalledTimes(1); + expect(context.runtime.error).toHaveBeenCalledWith( + expect.stringContaining("line: event handler failed: Error: transient failure"), + ); }); }); diff --git a/src/line/bot-handlers.ts b/src/line/bot-handlers.ts index 52b0cd76095..f28d41e66cf 100644 --- a/src/line/bot-handlers.ts +++ b/src/line/bot-handlers.ts @@ -63,6 +63,148 @@ export interface LineHandlerContext { runtime: RuntimeEnv; mediaMaxBytes: number; processMessage: (ctx: LineInboundContext) => Promise; + replayCache?: LineWebhookReplayCache; +} + +const LINE_WEBHOOK_REPLAY_WINDOW_MS = 10 * 60 * 1000; +const LINE_WEBHOOK_REPLAY_MAX_ENTRIES = 4096; +const LINE_WEBHOOK_REPLAY_PRUNE_INTERVAL_MS = 1000; +export type LineWebhookReplayCache = { + seenEvents: Map; + inFlightEvents: Map>; + lastPruneAtMs: number; +}; + +export function createLineWebhookReplayCache(): LineWebhookReplayCache { + return { + seenEvents: new Map(), + inFlightEvents: new Map>(), + lastPruneAtMs: 0, + }; +} + +function pruneLineWebhookReplayCache(cache: LineWebhookReplayCache, nowMs: number): void { + const minSeenAt = nowMs - LINE_WEBHOOK_REPLAY_WINDOW_MS; + for (const [key, seenAt] of cache.seenEvents) { + if (seenAt < minSeenAt) { + cache.seenEvents.delete(key); + } + } + + if (cache.seenEvents.size > LINE_WEBHOOK_REPLAY_MAX_ENTRIES) { + const deleteCount = cache.seenEvents.size - LINE_WEBHOOK_REPLAY_MAX_ENTRIES; + let deleted = 0; + for (const key of cache.seenEvents.keys()) { + if (deleted >= deleteCount) { + break; + } + cache.seenEvents.delete(key); + deleted += 1; + } + } +} + +function buildLineWebhookReplayKey( + event: WebhookEvent, + accountId: string, +): { key: string; eventId: string } | null { + if (event.type === "message") { + const messageId = event.message?.id?.trim(); + if (messageId) { + return { + key: `${accountId}|message:${messageId}`, + eventId: `message:${messageId}`, + }; + } + } + const eventId = (event as { webhookEventId?: string }).webhookEventId?.trim(); + if (!eventId) { + return null; + } + + const source = ( + event as { + source?: { type?: string; userId?: string; groupId?: string; roomId?: string }; + } + ).source; + const sourceId = + source?.type === "group" + ? `group:${source.groupId ?? ""}` + : source?.type === "room" + ? `room:${source.roomId ?? ""}` + : `user:${source?.userId ?? ""}`; + return { key: `${accountId}|${event.type}|${sourceId}|${eventId}`, eventId: `event:${eventId}` }; +} + +type LineReplayCandidate = { + key: string; + eventId: string; + seenAtMs: number; + cache: LineWebhookReplayCache; +}; + +type LineInFlightReplayResult = { + promise: Promise; + resolve: () => void; + reject: (err: unknown) => void; +}; + +function getLineReplayCandidate( + event: WebhookEvent, + context: LineHandlerContext, +): LineReplayCandidate | null { + const replay = buildLineWebhookReplayKey(event, context.account.accountId); + const cache = context.replayCache; + if (!replay || !cache) { + return null; + } + + const nowMs = Date.now(); + if ( + nowMs - cache.lastPruneAtMs >= LINE_WEBHOOK_REPLAY_PRUNE_INTERVAL_MS || + cache.seenEvents.size >= LINE_WEBHOOK_REPLAY_MAX_ENTRIES + ) { + pruneLineWebhookReplayCache(cache, nowMs); + cache.lastPruneAtMs = nowMs; + } + return { key: replay.key, eventId: replay.eventId, seenAtMs: nowMs, cache }; +} + +function shouldSkipLineReplayEvent( + candidate: LineReplayCandidate, +): { skip: true; inFlightResult?: Promise } | { skip: false } { + const inFlightResult = candidate.cache.inFlightEvents.get(candidate.key); + if (inFlightResult) { + logVerbose(`line: skipped in-flight replayed webhook event ${candidate.eventId}`); + return { skip: true, inFlightResult }; + } + if (candidate.cache.seenEvents.has(candidate.key)) { + logVerbose(`line: skipped replayed webhook event ${candidate.eventId}`); + return { skip: true }; + } + return { skip: false }; +} + +function markLineReplayEventInFlight(candidate: LineReplayCandidate): LineInFlightReplayResult { + let resolve!: () => void; + let reject!: (err: unknown) => void; + const promise = new Promise((resolvePromise, rejectPromise) => { + resolve = resolvePromise; + reject = rejectPromise; + }); + // Prevent unhandled rejection warnings when no concurrent duplicate awaits + // this in-flight reservation. + void promise.catch(() => {}); + candidate.cache.inFlightEvents.set(candidate.key, promise); + return { promise, resolve, reject }; +} + +function clearLineReplayEventInFlight(candidate: LineReplayCandidate): void { + candidate.cache.inFlightEvents.delete(candidate.key); +} + +function rememberLineReplayEvent(candidate: LineReplayCandidate): void { + candidate.cache.seenEvents.set(candidate.key, candidate.seenAtMs); } function resolveLineGroupConfig(params: { @@ -128,15 +270,11 @@ async function sendLinePairingReply(params: { } } -type LineAccessDecision = { - allowed: boolean; - commandAuthorized: boolean; -}; - async function shouldProcessLineEvent( event: MessageEvent | PostbackEvent, context: LineHandlerContext, -): Promise { +): Promise<{ allowed: boolean; commandAuthorized: boolean }> { + const denied = { allowed: false, commandAuthorized: false }; const { cfg, account } = context; const { userId, groupId, roomId, isGroup } = getLineSourceInfo(event.source); const senderId = userId ?? ""; @@ -144,7 +282,7 @@ async function shouldProcessLineEvent( const storeAllowFrom = await readChannelAllowFromStore( "line", - process.env, + undefined, account.accountId, ).catch(() => []); const effectiveDmAllow = normalizeDmAllowFromWithStore({ @@ -162,8 +300,8 @@ async function shouldProcessLineEvent( account.config.groupAllowFrom, fallbackGroupAllowFrom, ); - // Group authorization stays explicit to group allowlists and must not - // inherit DM pairing-store identities. + // Group sender policy must be derived from explicit group config only. + // Pairing store entries are DM-oriented and must not expand group allowlists. const effectiveGroupAllow = normalizeAllowFrom(groupAllowFrom); const defaultGroupPolicy = resolveDefaultGroupPolicy(cfg); const { groupPolicy, providerMissingFallbackApplied } = @@ -179,8 +317,6 @@ async function shouldProcessLineEvent( log: (message) => logVerbose(message), }); - const denied = { allowed: false, commandAuthorized: false }; - if (isGroup) { if (groupConfig?.enabled === false) { logVerbose(`Blocked line group ${groupId ?? roomId ?? "unknown"} (group disabled)`); @@ -214,8 +350,6 @@ async function shouldProcessLineEvent( return denied; } } - - // Resolve command authorization using the same pattern as Telegram/Discord/Slack. const allowForCommands = effectiveGroupAllow; const senderAllowedForCommands = isSenderAllowed({ allow: allowForCommands, senderId }); const useAccessGroups = cfg.commands?.useAccessGroups !== false; @@ -252,7 +386,6 @@ async function shouldProcessLineEvent( return denied; } - // Resolve command authorization for DMs. const allowForCommands = effectiveDmAllow; const senderAllowedForCommands = isSenderAllowed({ allow: allowForCommands, senderId }); const useAccessGroups = cfg.commands?.useAccessGroups !== false; @@ -266,7 +399,6 @@ async function shouldProcessLineEvent( return { allowed: true, commandAuthorized: commandGate.commandAuthorized }; } -/** Extract raw text from a LINE message or postback event for command detection. */ function resolveEventRawText(event: MessageEvent | PostbackEvent): string { if (event.type === "message") { const msg = event.message; @@ -382,7 +514,24 @@ export async function handleLineWebhookEvents( events: WebhookEvent[], context: LineHandlerContext, ): Promise { + let firstError: unknown; for (const event of events) { + const replayCandidate = getLineReplayCandidate(event, context); + const replaySkip = replayCandidate ? shouldSkipLineReplayEvent(replayCandidate) : null; + if (replaySkip?.skip) { + if (replaySkip.inFlightResult) { + try { + await replaySkip.inFlightResult; + } catch (err) { + context.runtime.error?.(danger(`line: replayed in-flight event failed: ${String(err)}`)); + firstError ??= err; + } + } + continue; + } + const inFlightReservation = replayCandidate + ? markLineReplayEventInFlight(replayCandidate) + : null; try { switch (event.type) { case "message": @@ -406,11 +555,21 @@ export async function handleLineWebhookEvents( default: logVerbose(`line: unhandled event type: ${(event as WebhookEvent).type}`); } + if (replayCandidate) { + rememberLineReplayEvent(replayCandidate); + inFlightReservation?.resolve(); + clearLineReplayEventInFlight(replayCandidate); + } } catch (err) { + if (replayCandidate) { + inFlightReservation?.reject(err); + clearLineReplayEventInFlight(replayCandidate); + } context.runtime.error?.(danger(`line: event handler failed: ${String(err)}`)); - // Continue processing remaining events in this batch. Webhook ACK is sent - // before processing, so dropping later events here would make them unrecoverable. - continue; + firstError ??= err; } } + if (firstError) { + throw firstError; + } } diff --git a/src/line/bot-message-context.test.ts b/src/line/bot-message-context.test.ts index 98d1dfdebac..f6d6583a60b 100644 --- a/src/line/bot-message-context.test.ts +++ b/src/line/bot-message-context.test.ts @@ -114,6 +114,26 @@ describe("buildLineMessageContext", () => { expect(context?.ctxPayload.To).toBe("line:room:room-1"); }); + it("keeps non-text message contexts fail-closed for command auth", async () => { + const event = createMessageEvent( + { type: "user", userId: "user-audio" }, + { + message: { id: "audio-1", type: "audio", duration: 1000 } as MessageEvent["message"], + }, + ); + + const context = await buildLineMessageContext({ + event, + allMedia: [], + cfg, + account, + commandAuthorized: false, + }); + + expect(context).not.toBeNull(); + expect(context?.ctxPayload.CommandAuthorized).toBe(false); + }); + it("sets CommandAuthorized=true when authorized", async () => { const event = createMessageEvent({ type: "user", userId: "user-auth" }); diff --git a/src/line/bot.ts b/src/line/bot.ts index b008cd94fbf..c7a6f508035 100644 --- a/src/line/bot.ts +++ b/src/line/bot.ts @@ -5,7 +5,7 @@ import { loadConfig } from "../config/config.js"; import { logVerbose } from "../globals.js"; import { createNonExitingRuntime, type RuntimeEnv } from "../runtime.js"; import { resolveLineAccount } from "./accounts.js"; -import { handleLineWebhookEvents } from "./bot-handlers.js"; +import { createLineWebhookReplayCache, handleLineWebhookEvents } from "./bot-handlers.js"; import type { LineInboundContext } from "./bot-message-context.js"; import type { ResolvedLineAccount } from "./types.js"; import { startLineWebhook } from "./webhook.js"; @@ -41,6 +41,7 @@ export function createLineBot(opts: LineBotOptions): LineBot { (async () => { logVerbose("line: no message handler configured"); }); + const replayCache = createLineWebhookReplayCache(); const handleWebhook = async (body: WebhookRequestBody): Promise => { if (!body.events || body.events.length === 0) { @@ -53,6 +54,7 @@ export function createLineBot(opts: LineBotOptions): LineBot { runtime, mediaMaxBytes, processMessage, + replayCache, }); }; diff --git a/src/line/webhook-node.test.ts b/src/line/webhook-node.test.ts index 105a20060d8..82cc8d1f1f0 100644 --- a/src/line/webhook-node.test.ts +++ b/src/line/webhook-node.test.ts @@ -195,7 +195,7 @@ describe("createLineNodeWebhookHandler", () => { ); }); - it("returns 200 immediately and logs when event processing fails", async () => { + it("returns 500 when event processing fails and does not acknowledge with 200", async () => { const rawBody = JSON.stringify({ events: [{ type: "message" }] }); const { secret } = createPostWebhookTestHarness(rawBody); const failingBot = { @@ -213,10 +213,9 @@ describe("createLineNodeWebhookHandler", () => { const { res } = createRes(); await runSignedPost({ handler: failingHandler, rawBody, secret, res }); - await Promise.resolve(); - expect(res.statusCode).toBe(200); - expect(res.body).toBe(JSON.stringify({ status: "ok" })); + expect(res.statusCode).toBe(500); + expect(res.body).toBe(JSON.stringify({ error: "Internal server error" })); expect(failingBot.handleWebhook).toHaveBeenCalledTimes(1); expect(runtime.error).toHaveBeenCalledTimes(1); }); diff --git a/src/line/webhook-node.ts b/src/line/webhook-node.ts index de6c08a1cd6..7d531cbed55 100644 --- a/src/line/webhook-node.ts +++ b/src/line/webhook-node.ts @@ -111,16 +111,14 @@ export function createLineNodeWebhookHandler(params: { return; } + if (body.events && body.events.length > 0) { + logVerbose(`line: received ${body.events.length} webhook events`); + await params.bot.handleWebhook(body); + } + res.statusCode = 200; res.setHeader("Content-Type", "application/json"); res.end(JSON.stringify({ status: "ok" })); - - if (body.events && body.events.length > 0) { - logVerbose(`line: received ${body.events.length} webhook events`); - void params.bot.handleWebhook(body).catch((err) => { - params.runtime.error?.(danger(`line webhook handler failed: ${String(err)}`)); - }); - } } catch (err) { if (isRequestBodyLimitError(err, "PAYLOAD_TOO_LARGE")) { res.statusCode = 413; diff --git a/src/line/webhook.test.ts b/src/line/webhook.test.ts index 6943d2aa995..19640fd3114 100644 --- a/src/line/webhook.test.ts +++ b/src/line/webhook.test.ts @@ -1,7 +1,7 @@ import crypto from "node:crypto"; import type { WebhookRequestBody } from "@line/bot-sdk"; import { describe, expect, it, vi } from "vitest"; -import { createLineWebhookMiddleware } from "./webhook.js"; +import { createLineWebhookMiddleware, startLineWebhook } from "./webhook.js"; const sign = (body: string, secret: string) => crypto.createHmac("SHA256", secret).update(body).digest("base64"); @@ -54,6 +54,15 @@ async function invokeWebhook(params: { } describe("createLineWebhookMiddleware", () => { + it("rejects startup when channel secret is missing", () => { + expect(() => + startLineWebhook({ + channelSecret: " ", + onEvents: async () => {}, + }), + ).toThrow(/requires a non-empty channel secret/i); + }); + it.each([ ["raw string body", JSON.stringify({ events: [{ type: "message" }] })], ["raw buffer body", Buffer.from(JSON.stringify({ events: [{ type: "follow" }] }), "utf-8")], @@ -112,17 +121,31 @@ describe("createLineWebhookMiddleware", () => { expect(onEvents).not.toHaveBeenCalled(); }); - it("returns 200 immediately when onEvents fails", async () => { - const { res, onEvents } = await invokeWebhook({ - body: JSON.stringify({ events: [{ type: "message" }] }), - onEvents: vi.fn(async () => { - throw new Error("transient failure"); - }), + it("returns 500 when event processing fails and does not acknowledge with 200", async () => { + const onEvents = vi.fn(async () => { + throw new Error("boom"); + }); + const runtime = { log: vi.fn(), error: vi.fn(), exit: vi.fn() }; + const rawBody = JSON.stringify({ events: [{ type: "message" }] }); + const middleware = createLineWebhookMiddleware({ + channelSecret: SECRET, + onEvents, + runtime, }); - await Promise.resolve(); - expect(onEvents).toHaveBeenCalledTimes(1); - expect(res.status).toHaveBeenCalledWith(200); - expect(res.json).toHaveBeenCalledWith({ status: "ok" }); + const req = { + headers: { "x-line-signature": sign(rawBody, SECRET) }, + body: rawBody, + // oxlint-disable-next-line typescript/no-explicit-any + } as any; + const res = createRes(); + + // oxlint-disable-next-line typescript/no-explicit-any + await middleware(req, res, {} as any); + + expect(res.status).toHaveBeenCalledWith(500); + expect(res.status).not.toHaveBeenCalledWith(200); + expect(res.json).toHaveBeenCalledWith({ error: "Internal server error" }); + expect(runtime.error).toHaveBeenCalled(); }); }); diff --git a/src/line/webhook.ts b/src/line/webhook.ts index f2452decc47..d16ee4aa7c9 100644 --- a/src/line/webhook.ts +++ b/src/line/webhook.ts @@ -71,14 +71,12 @@ export function createLineWebhookMiddleware( return; } - res.status(200).json({ status: "ok" }); - if (body.events && body.events.length > 0) { logVerbose(`line: received ${body.events.length} webhook events`); - void onEvents(body).catch((err) => { - runtime?.error?.(danger(`line webhook handler failed: ${String(err)}`)); - }); + await onEvents(body); } + + res.status(200).json({ status: "ok" }); } catch (err) { runtime?.error?.(danger(`line webhook error: ${String(err)}`)); if (!res.headersSent) { @@ -99,9 +97,17 @@ export function startLineWebhook(options: StartLineWebhookOptions): { path: string; handler: (req: Request, res: Response, _next: NextFunction) => Promise; } { + const channelSecret = + typeof options.channelSecret === "string" ? options.channelSecret.trim() : ""; + if (!channelSecret) { + throw new Error( + "LINE webhook mode requires a non-empty channel secret. " + + "Set channels.line.channelSecret in your config.", + ); + } const path = options.path ?? "/line/webhook"; const middleware = createLineWebhookMiddleware({ - channelSecret: options.channelSecret, + channelSecret, onEvents: options.onEvents, runtime: options.runtime, }); diff --git a/src/media-understanding/apply.test.ts b/src/media-understanding/apply.test.ts index 2b17720c143..f49bd859e31 100644 --- a/src/media-understanding/apply.test.ts +++ b/src/media-understanding/apply.test.ts @@ -307,6 +307,7 @@ describe("applyMediaUnderstanding", () => { const ctx = await createAudioCtx({ body: " /capture status", }); + ctx.CommandAuthorized = false; const result = await applyMediaUnderstanding({ ctx, cfg: createGroqAudioConfig(), @@ -320,6 +321,7 @@ describe("applyMediaUnderstanding", () => { body: "[Audio]\nUser text:\n/capture status\nTranscript:\ntranscribed text", commandBody: "/capture status", }); + expect(ctx.CommandAuthorized).toBe(false); }); it("handles URL-only attachments for audio transcription", async () => {