From f1e012e0fc41418ca193c4067beca7b0c966ca3d Mon Sep 17 00:00:00 2001 From: Vincent Koc Date: Fri, 20 Mar 2026 00:28:52 -0700 Subject: [PATCH] fix(telegram): serialize thread binding persists --- .../telegram/src/thread-bindings.test.ts | 54 ++++++++- extensions/telegram/src/thread-bindings.ts | 113 +++++++++++++++--- .../session-binding.contract.test.ts | 4 +- 3 files changed, 144 insertions(+), 27 deletions(-) diff --git a/extensions/telegram/src/thread-bindings.test.ts b/extensions/telegram/src/thread-bindings.test.ts index 39b9c63338b..cc9bd2a1209 100644 --- a/extensions/telegram/src/thread-bindings.test.ts +++ b/extensions/telegram/src/thread-bindings.test.ts @@ -15,12 +15,13 @@ import { describe("telegram thread bindings", () => { let stateDirOverride: string | undefined; - beforeEach(() => { - __testing.resetTelegramThreadBindingsForTests(); + beforeEach(async () => { + await __testing.resetTelegramThreadBindingsForTests(); }); - afterEach(() => { + afterEach(async () => { vi.useRealTimers(); + await __testing.resetTelegramThreadBindingsForTests(); if (stateDirOverride) { delete process.env.OPENCLAW_STATE_DIR; fs.rmSync(stateDirOverride, { recursive: true, force: true }); @@ -90,7 +91,7 @@ describe("telegram thread bindings", () => { "./thread-bindings.js?scope=shared-b", ); - bindingsA.__testing.resetTelegramThreadBindingsForTests(); + await bindingsA.__testing.resetTelegramThreadBindingsForTests(); try { const managerA = bindingsA.createTelegramThreadBindingManager({ @@ -123,7 +124,7 @@ describe("telegram thread bindings", () => { ?.getByConversationId("-100200300:topic:44")?.targetSessionKey, ).toBe("agent:main:subagent:child-shared"); } finally { - bindingsA.__testing.resetTelegramThreadBindingsForTests(); + await bindingsA.__testing.resetTelegramThreadBindingsForTests(); } }); @@ -237,7 +238,7 @@ describe("telegram thread bindings", () => { reason: "test-detach", }); - __testing.resetTelegramThreadBindingsForTests(); + await __testing.resetTelegramThreadBindingsForTests(); const reloaded = createTelegramThreadBindingManager({ accountId: "default", @@ -247,4 +248,45 @@ describe("telegram thread bindings", () => { expect(reloaded.getByConversationId("8460800771")).toBeUndefined(); }); + + it("flushes pending lifecycle update persists before test reset", async () => { + stateDirOverride = fs.mkdtempSync(path.join(os.tmpdir(), "openclaw-telegram-bindings-")); + process.env.OPENCLAW_STATE_DIR = stateDirOverride; + vi.useFakeTimers(); + vi.setSystemTime(new Date("2026-03-06T10:00:00.000Z")); + + createTelegramThreadBindingManager({ + accountId: "persist-reset", + persist: true, + enableSweeper: false, + }); + + await getSessionBindingService().bind({ + targetSessionKey: "agent:main:subagent:child-3", + targetKind: "subagent", + conversation: { + channel: "telegram", + accountId: "persist-reset", + conversationId: "-100200300:topic:99", + }, + }); + + setTelegramThreadBindingIdleTimeoutBySessionKey({ + accountId: "persist-reset", + targetSessionKey: "agent:main:subagent:child-3", + idleTimeoutMs: 90_000, + }); + + await __testing.resetTelegramThreadBindingsForTests(); + + const statePath = path.join( + resolveStateDir(process.env, os.homedir), + "telegram", + "thread-bindings-persist-reset.json", + ); + const persisted = JSON.parse(fs.readFileSync(statePath, "utf8")) as { + bindings?: Array<{ idleTimeoutMs?: number }>; + }; + expect(persisted.bindings?.[0]?.idleTimeoutMs).toBe(90_000); + }); }); diff --git a/extensions/telegram/src/thread-bindings.ts b/extensions/telegram/src/thread-bindings.ts index aaf13e15561..8b7be041197 100644 --- a/extensions/telegram/src/thread-bindings.ts +++ b/extensions/telegram/src/thread-bindings.ts @@ -67,6 +67,7 @@ export type TelegramThreadBindingManager = { type TelegramThreadBindingsState = { managersByAccountId: Map; bindingsByAccountConversation: Map; + persistQueueByAccountId: Map>; }; /** @@ -80,10 +81,12 @@ const threadBindingsState = resolveGlobalSingleton( () => ({ managersByAccountId: new Map(), bindingsByAccountConversation: new Map(), + persistQueueByAccountId: new Map>(), }), ); const MANAGERS_BY_ACCOUNT_ID = threadBindingsState.managersByAccountId; const BINDINGS_BY_ACCOUNT_CONVERSATION = threadBindingsState.bindingsByAccountConversation; +const PERSIST_QUEUE_BY_ACCOUNT_ID = threadBindingsState.persistQueueByAccountId; function normalizeDurationMs(raw: unknown, fallback: number): number { if (typeof raw !== "number" || !Number.isFinite(raw)) { @@ -323,16 +326,18 @@ function loadBindingsFromDisk(accountId: string): TelegramThreadBindingRecord[] async function persistBindingsToDisk(params: { accountId: string; persist: boolean; + bindings?: TelegramThreadBindingRecord[]; }): Promise { if (!params.persist) { return; } - const bindings = [...BINDINGS_BY_ACCOUNT_CONVERSATION.values()].filter( - (entry) => entry.accountId === params.accountId, - ); const payload: StoredTelegramBindingState = { version: STORE_VERSION, - bindings, + bindings: + params.bindings ?? + [...BINDINGS_BY_ACCOUNT_CONVERSATION.values()].filter( + (entry) => entry.accountId === params.accountId, + ), }; await writeJsonAtomic(resolveBindingsPath(params.accountId), payload, { mode: 0o600, @@ -341,6 +346,48 @@ async function persistBindingsToDisk(params: { }); } +function listBindingsForAccount(accountId: string): TelegramThreadBindingRecord[] { + return [...BINDINGS_BY_ACCOUNT_CONVERSATION.values()].filter( + (entry) => entry.accountId === accountId, + ); +} + +function enqueuePersistBindings(params: { + accountId: string; + persist: boolean; + bindings?: TelegramThreadBindingRecord[]; +}): Promise { + if (!params.persist) { + return Promise.resolve(); + } + const previous = PERSIST_QUEUE_BY_ACCOUNT_ID.get(params.accountId) ?? Promise.resolve(); + const next = previous + .catch(() => undefined) + .then(async () => { + await persistBindingsToDisk(params); + }); + PERSIST_QUEUE_BY_ACCOUNT_ID.set(params.accountId, next); + void next.finally(() => { + if (PERSIST_QUEUE_BY_ACCOUNT_ID.get(params.accountId) === next) { + PERSIST_QUEUE_BY_ACCOUNT_ID.delete(params.accountId); + } + }); + return next; +} + +function persistBindingsSafely(params: { + accountId: string; + persist: boolean; + bindings?: TelegramThreadBindingRecord[]; + reason: string; +}): void { + void enqueuePersistBindings(params).catch((err) => { + logVerbose( + `telegram thread bindings persist failed (${params.accountId}, ${params.reason}): ${String(err)}`, + ); + }); +} + function normalizeTimestampMs(raw: unknown): number { if (typeof raw !== "number" || !Number.isFinite(raw)) { return Date.now(); @@ -414,9 +461,6 @@ export function createTelegramThreadBindingManager( }); } - const listBindingsForAccount = () => - [...BINDINGS_BY_ACCOUNT_CONVERSATION.values()].filter((entry) => entry.accountId === accountId); - let sweepTimer: NodeJS.Timeout | null = null; const manager: TelegramThreadBindingManager = { @@ -441,11 +485,11 @@ export function createTelegramThreadBindingManager( if (!targetSessionKey) { return []; } - return listBindingsForAccount().filter( + return listBindingsForAccount(accountId).filter( (entry) => entry.targetSessionKey === targetSessionKey, ); }, - listBindings: () => listBindingsForAccount(), + listBindings: () => listBindingsForAccount(accountId), touchConversation: (conversationIdRaw, at) => { const conversationId = normalizeConversationId(conversationIdRaw); if (!conversationId) { @@ -461,7 +505,12 @@ export function createTelegramThreadBindingManager( lastActivityAt: normalizeTimestampMs(at ?? Date.now()), }; BINDINGS_BY_ACCOUNT_CONVERSATION.set(key, nextRecord); - void persistBindingsToDisk({ accountId, persist: manager.shouldPersistMutations() }); + persistBindingsSafely({ + accountId, + persist: manager.shouldPersistMutations(), + bindings: listBindingsForAccount(accountId), + reason: "touch", + }); return nextRecord; }, unbindConversation: (unbindParams) => { @@ -475,7 +524,12 @@ export function createTelegramThreadBindingManager( return null; } BINDINGS_BY_ACCOUNT_CONVERSATION.delete(key); - void persistBindingsToDisk({ accountId, persist: manager.shouldPersistMutations() }); + persistBindingsSafely({ + accountId, + persist: manager.shouldPersistMutations(), + bindings: listBindingsForAccount(accountId), + reason: "unbind-conversation", + }); return removed; }, unbindBySessionKey: (unbindParams) => { @@ -484,7 +538,7 @@ export function createTelegramThreadBindingManager( return []; } const removed: TelegramThreadBindingRecord[] = []; - for (const entry of listBindingsForAccount()) { + for (const entry of listBindingsForAccount(accountId)) { if (entry.targetSessionKey !== targetSessionKey) { continue; } @@ -496,7 +550,12 @@ export function createTelegramThreadBindingManager( removed.push(entry); } if (removed.length > 0) { - void persistBindingsToDisk({ accountId, persist: manager.shouldPersistMutations() }); + persistBindingsSafely({ + accountId, + persist: manager.shouldPersistMutations(), + bindings: listBindingsForAccount(accountId), + reason: "unbind-session", + }); } return removed; }, @@ -544,7 +603,11 @@ export function createTelegramThreadBindingManager( resolveBindingKey({ accountId, conversationId }), record, ); - await persistBindingsToDisk({ accountId, persist: manager.shouldPersistMutations() }); + await enqueuePersistBindings({ + accountId, + persist: manager.shouldPersistMutations(), + bindings: listBindingsForAccount(accountId), + }); logVerbose( `telegram: bound conversation ${conversationId} -> ${targetSessionKey} (${summarizeLifecycleForLog( record, @@ -605,7 +668,11 @@ export function createTelegramThreadBindingManager( sendFarewell: false, }); if (removed.length > 0) { - await persistBindingsToDisk({ accountId, persist: manager.shouldPersistMutations() }); + await enqueuePersistBindings({ + accountId, + persist: manager.shouldPersistMutations(), + bindings: listBindingsForAccount(accountId), + }); } return removed.map((entry) => toSessionBindingRecord(entry, { @@ -627,7 +694,11 @@ export function createTelegramThreadBindingManager( sendFarewell: false, }); if (removed) { - await persistBindingsToDisk({ accountId, persist: manager.shouldPersistMutations() }); + await enqueuePersistBindings({ + accountId, + persist: manager.shouldPersistMutations(), + bindings: listBindingsForAccount(accountId), + }); } return removed ? [ @@ -644,7 +715,7 @@ export function createTelegramThreadBindingManager( if (sweeperEnabled) { sweepTimer = setInterval(() => { const now = Date.now(); - for (const record of listBindingsForAccount()) { + for (const record of listBindingsForAccount(accountId)) { const idleExpired = shouldExpireByIdle({ now, record, @@ -699,9 +770,11 @@ function updateTelegramBindingsBySessionKey(params: { updated.push(next); } if (updated.length > 0) { - void persistBindingsToDisk({ + persistBindingsSafely({ accountId: params.manager.accountId, persist: params.manager.shouldPersistMutations(), + bindings: listBindingsForAccount(params.manager.accountId), + reason: "session-lifecycle-update", }); } return updated; @@ -750,10 +823,12 @@ export function setTelegramThreadBindingMaxAgeBySessionKey(params: { } export const __testing = { - resetTelegramThreadBindingsForTests() { + async resetTelegramThreadBindingsForTests() { for (const manager of MANAGERS_BY_ACCOUNT_ID.values()) { manager.stop(); } + await Promise.allSettled(PERSIST_QUEUE_BY_ACCOUNT_ID.values()); + PERSIST_QUEUE_BY_ACCOUNT_ID.clear(); MANAGERS_BY_ACCOUNT_ID.clear(); BINDINGS_BY_ACCOUNT_CONVERSATION.clear(); }, diff --git a/src/channels/plugins/contracts/session-binding.contract.test.ts b/src/channels/plugins/contracts/session-binding.contract.test.ts index efc85cb74b4..87f7922c3e4 100644 --- a/src/channels/plugins/contracts/session-binding.contract.test.ts +++ b/src/channels/plugins/contracts/session-binding.contract.test.ts @@ -22,12 +22,12 @@ vi.mock("../../../../extensions/matrix/src/matrix/send.js", async () => { }; }); -beforeEach(() => { +beforeEach(async () => { sessionBindingTesting.resetSessionBindingAdaptersForTests(); discordThreadBindingTesting.resetThreadBindingsForTests(); feishuThreadBindingTesting.resetFeishuThreadBindingsForTests(); resetMatrixThreadBindingsForTests(); - telegramThreadBindingTesting.resetTelegramThreadBindingsForTests(); + await telegramThreadBindingTesting.resetTelegramThreadBindingsForTests(); }); for (const entry of sessionBindingContractRegistry) {