diff --git a/extensions/matrix/src/matrix/thread-bindings.test.ts b/extensions/matrix/src/matrix/thread-bindings.test.ts index c872f720832..2b447447c81 100644 --- a/extensions/matrix/src/matrix/thread-bindings.test.ts +++ b/extensions/matrix/src/matrix/thread-bindings.test.ts @@ -59,11 +59,12 @@ describe("matrix thread bindings", () => { accessToken: "token", } as const; - function resolveBindingsFilePath() { + function resolveBindingsFilePath(customStateDir?: string) { return path.join( resolveMatrixStoragePaths({ ...auth, env: process.env, + ...(customStateDir ? { stateDir: customStateDir } : {}), }).rootDir, "thread-bindings.json", ); @@ -432,6 +433,98 @@ describe("matrix thread bindings", () => { expect(rotatedBindingsPath).toBe(initialBindingsPath); }); + it("replaces reused account managers when the bindings stateDir changes", async () => { + const initialStateDir = stateDir; + const replacementStateDir = await fs.mkdtemp( + path.join(os.tmpdir(), "matrix-thread-bindings-replacement-"), + ); + + const initialManager = await createMatrixThreadBindingManager({ + accountId: "ops", + auth, + client: {} as never, + stateDir: initialStateDir, + idleTimeoutMs: 24 * 60 * 60 * 1000, + maxAgeMs: 0, + enableSweeper: false, + }); + + await getSessionBindingService().bind({ + targetSessionKey: "agent:ops:subagent:child", + targetKind: "subagent", + conversation: { + channel: "matrix", + accountId: "ops", + conversationId: "$thread", + parentConversationId: "!room:example", + }, + placement: "current", + }); + + const replacementManager = await createMatrixThreadBindingManager({ + accountId: "ops", + auth, + client: {} as never, + stateDir: replacementStateDir, + idleTimeoutMs: 24 * 60 * 60 * 1000, + maxAgeMs: 0, + enableSweeper: false, + }); + + expect(replacementManager).not.toBe(initialManager); + expect(replacementManager.listBindings()).toEqual([]); + expect( + getSessionBindingService().resolveByConversation({ + channel: "matrix", + accountId: "ops", + conversationId: "$thread", + parentConversationId: "!room:example", + }), + ).toBeNull(); + + await getSessionBindingService().bind({ + targetSessionKey: "agent:ops:subagent:replacement", + targetKind: "subagent", + conversation: { + channel: "matrix", + accountId: "ops", + conversationId: "$thread-2", + parentConversationId: "!room:example", + }, + placement: "current", + }); + + await vi.waitFor(async () => { + const replacementRaw = await fs.readFile( + resolveBindingsFilePath(replacementStateDir), + "utf-8", + ); + expect(JSON.parse(replacementRaw)).toMatchObject({ + version: 1, + bindings: [ + expect.objectContaining({ + conversationId: "$thread-2", + parentConversationId: "!room:example", + targetSessionKey: "agent:ops:subagent:replacement", + }), + ], + }); + }); + await vi.waitFor(async () => { + const initialRaw = await fs.readFile(resolveBindingsFilePath(initialStateDir), "utf-8"); + expect(JSON.parse(initialRaw)).toMatchObject({ + version: 1, + bindings: [ + expect.objectContaining({ + conversationId: "$thread", + parentConversationId: "!room:example", + targetSessionKey: "agent:ops:subagent:child", + }), + ], + }); + }); + }); + it("updates lifecycle windows by session key and refreshes activity", async () => { vi.useFakeTimers(); vi.setSystemTime(new Date("2026-03-06T10:00:00.000Z")); diff --git a/extensions/matrix/src/matrix/thread-bindings.ts b/extensions/matrix/src/matrix/thread-bindings.ts index fe3116f3691..6cf8029f9e9 100644 --- a/extensions/matrix/src/matrix/thread-bindings.ts +++ b/extensions/matrix/src/matrix/thread-bindings.ts @@ -62,7 +62,12 @@ export type MatrixThreadBindingManager = { stop: () => void; }; -const MANAGERS_BY_ACCOUNT_ID = new Map(); +type MatrixThreadBindingManagerCacheEntry = { + filePath: string; + manager: MatrixThreadBindingManager; +}; + +const MANAGERS_BY_ACCOUNT_ID = new Map(); const BINDINGS_BY_ACCOUNT_CONVERSATION = new Map(); function normalizeDurationMs(raw: unknown, fallback: number): number { @@ -354,17 +359,19 @@ export async function createMatrixThreadBindingManager(params: { `Matrix thread binding account mismatch: requested ${params.accountId}, auth resolved ${params.auth.accountId}`, ); } - const existing = MANAGERS_BY_ACCOUNT_ID.get(params.accountId); - if (existing) { - return existing; - } - const filePath = resolveBindingsPath({ auth: params.auth, accountId: params.accountId, env: params.env, stateDir: params.stateDir, }); + const existingEntry = MANAGERS_BY_ACCOUNT_ID.get(params.accountId); + if (existingEntry) { + if (existingEntry.filePath === filePath) { + return existingEntry.manager; + } + existingEntry.manager.stop(); + } const loaded = await loadBindingsFromDisk(filePath, params.accountId); for (const record of loaded) { setBindingRecord(record); @@ -499,7 +506,7 @@ export async function createMatrixThreadBindingManager(params: { channel: "matrix", accountId: params.accountId, }); - if (MANAGERS_BY_ACCOUNT_ID.get(params.accountId) === manager) { + if (MANAGERS_BY_ACCOUNT_ID.get(params.accountId)?.manager === manager) { MANAGERS_BY_ACCOUNT_ID.delete(params.accountId); } for (const record of listBindingsForAccount(params.accountId)) { @@ -698,14 +705,17 @@ export async function createMatrixThreadBindingManager(params: { sweepTimer.unref?.(); } - MANAGERS_BY_ACCOUNT_ID.set(params.accountId, manager); + MANAGERS_BY_ACCOUNT_ID.set(params.accountId, { + filePath, + manager, + }); return manager; } export function getMatrixThreadBindingManager( accountId: string, ): MatrixThreadBindingManager | null { - return MANAGERS_BY_ACCOUNT_ID.get(accountId) ?? null; + return MANAGERS_BY_ACCOUNT_ID.get(accountId)?.manager ?? null; } export function setMatrixThreadBindingIdleTimeoutBySessionKey(params: { @@ -713,7 +723,7 @@ export function setMatrixThreadBindingIdleTimeoutBySessionKey(params: { targetSessionKey: string; idleTimeoutMs: number; }): SessionBindingRecord[] { - const manager = MANAGERS_BY_ACCOUNT_ID.get(params.accountId); + const manager = MANAGERS_BY_ACCOUNT_ID.get(params.accountId)?.manager; if (!manager) { return []; } @@ -730,7 +740,7 @@ export function setMatrixThreadBindingMaxAgeBySessionKey(params: { targetSessionKey: string; maxAgeMs: number; }): SessionBindingRecord[] { - const manager = MANAGERS_BY_ACCOUNT_ID.get(params.accountId); + const manager = MANAGERS_BY_ACCOUNT_ID.get(params.accountId)?.manager; if (!manager) { return []; } @@ -743,7 +753,7 @@ export function setMatrixThreadBindingMaxAgeBySessionKey(params: { } export function resetMatrixThreadBindingsForTests(): void { - for (const manager of MANAGERS_BY_ACCOUNT_ID.values()) { + for (const { manager } of MANAGERS_BY_ACCOUNT_ID.values()) { manager.stop(); } MANAGERS_BY_ACCOUNT_ID.clear();