From 82cb185881c5b345d2890e204f068ca7e2d7ec1d Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Wed, 18 Feb 2026 16:47:03 +0000 Subject: [PATCH] refactor(core): unify bounded concurrency runner --- src/media-understanding/concurrency.ts | 31 +++------- src/memory/internal.ts | 34 ++--------- src/utils/run-with-concurrency.test.ts | 81 ++++++++++++++++++++++++++ src/utils/run-with-concurrency.ts | 48 +++++++++++++++ 4 files changed, 143 insertions(+), 51 deletions(-) create mode 100644 src/utils/run-with-concurrency.test.ts create mode 100644 src/utils/run-with-concurrency.ts diff --git a/src/media-understanding/concurrency.ts b/src/media-understanding/concurrency.ts index b70be5afed4..8449358d58b 100644 --- a/src/media-understanding/concurrency.ts +++ b/src/media-understanding/concurrency.ts @@ -1,33 +1,18 @@ import { logVerbose, shouldLogVerbose } from "../globals.js"; +import { runTasksWithConcurrency } from "../utils/run-with-concurrency.js"; export async function runWithConcurrency( tasks: Array<() => Promise>, limit: number, ): Promise { - if (tasks.length === 0) { - return []; - } - const resolvedLimit = Math.max(1, Math.min(limit, tasks.length)); - const results: T[] = Array.from({ length: tasks.length }); - let next = 0; - - const workers = Array.from({ length: resolvedLimit }, async () => { - while (true) { - const index = next; - next += 1; - if (index >= tasks.length) { - return; + const { results } = await runTasksWithConcurrency({ + tasks, + limit, + onTaskError(err) { + if (shouldLogVerbose()) { + logVerbose(`Media understanding task failed: ${String(err)}`); } - try { - results[index] = await tasks[index](); - } catch (err) { - if (shouldLogVerbose()) { - logVerbose(`Media understanding task failed: ${String(err)}`); - } - } - } + }, }); - - await Promise.allSettled(workers); return results; } diff --git a/src/memory/internal.ts b/src/memory/internal.ts index 73fd2b63697..04afeb8c8a8 100644 --- a/src/memory/internal.ts +++ b/src/memory/internal.ts @@ -2,6 +2,7 @@ import crypto from "node:crypto"; import fsSync from "node:fs"; import fs from "node:fs/promises"; import path from "node:path"; +import { runTasksWithConcurrency } from "../utils/run-with-concurrency.js"; export type MemoryFileEntry = { path: string; @@ -301,35 +302,12 @@ export async function runWithConcurrency( tasks: Array<() => Promise>, limit: number, ): Promise { - if (tasks.length === 0) { - return []; - } - const resolvedLimit = Math.max(1, Math.min(limit, tasks.length)); - const results: T[] = Array.from({ length: tasks.length }); - let next = 0; - let firstError: unknown = null; - - const workers = Array.from({ length: resolvedLimit }, async () => { - while (true) { - if (firstError) { - return; - } - const index = next; - next += 1; - if (index >= tasks.length) { - return; - } - try { - results[index] = await tasks[index](); - } catch (err) { - firstError = err; - return; - } - } + const { results, firstError, hasError } = await runTasksWithConcurrency({ + tasks, + limit, + errorMode: "stop", }); - - await Promise.allSettled(workers); - if (firstError) { + if (hasError) { throw firstError; } return results; diff --git a/src/utils/run-with-concurrency.test.ts b/src/utils/run-with-concurrency.test.ts new file mode 100644 index 00000000000..91725b686e1 --- /dev/null +++ b/src/utils/run-with-concurrency.test.ts @@ -0,0 +1,81 @@ +import { describe, expect, it, vi } from "vitest"; +import { runTasksWithConcurrency } from "./run-with-concurrency.js"; + +describe("runTasksWithConcurrency", () => { + it("preserves task order with bounded worker count", async () => { + let running = 0; + let peak = 0; + const tasks = [25, 10, 5, 15].map((delayMs, index) => async (): Promise => { + running += 1; + peak = Math.max(peak, running); + await new Promise((resolve) => setTimeout(resolve, delayMs)); + running -= 1; + return index + 1; + }); + + const result = await runTasksWithConcurrency({ tasks, limit: 2 }); + expect(result.hasError).toBe(false); + expect(result.firstError).toBeUndefined(); + expect(result.results).toEqual([1, 2, 3, 4]); + expect(peak).toBeLessThanOrEqual(2); + }); + + it("stops scheduling after first failure in stop mode", async () => { + const err = new Error("boom"); + const seen: number[] = []; + const tasks = [ + async () => { + seen.push(0); + return 10; + }, + async () => { + seen.push(1); + throw err; + }, + async () => { + seen.push(2); + return 30; + }, + ]; + + const result = await runTasksWithConcurrency({ + tasks, + limit: 1, + errorMode: "stop", + }); + expect(result.hasError).toBe(true); + expect(result.firstError).toBe(err); + expect(result.results[0]).toBe(10); + expect(result.results[2]).toBeUndefined(); + expect(seen).toEqual([0, 1]); + }); + + it("continues after failures and reports the first one", async () => { + const firstErr = new Error("first"); + const onTaskError = vi.fn(); + const tasks = [ + async () => { + throw firstErr; + }, + async () => 20, + async () => { + throw new Error("second"); + }, + async () => 40, + ]; + + const result = await runTasksWithConcurrency({ + tasks, + limit: 1, + errorMode: "continue", + onTaskError, + }); + expect(result.hasError).toBe(true); + expect(result.firstError).toBe(firstErr); + expect(result.results[1]).toBe(20); + expect(result.results[3]).toBe(40); + expect(onTaskError).toHaveBeenCalledTimes(2); + expect(onTaskError).toHaveBeenNthCalledWith(1, firstErr, 0); + expect(onTaskError).toHaveBeenNthCalledWith(2, expect.any(Error), 2); + }); +}); diff --git a/src/utils/run-with-concurrency.ts b/src/utils/run-with-concurrency.ts new file mode 100644 index 00000000000..7f1c500c67f --- /dev/null +++ b/src/utils/run-with-concurrency.ts @@ -0,0 +1,48 @@ +export type ConcurrencyErrorMode = "continue" | "stop"; + +export async function runTasksWithConcurrency(params: { + tasks: Array<() => Promise>; + limit: number; + errorMode?: ConcurrencyErrorMode; + onTaskError?: (error: unknown, index: number) => void; +}): Promise<{ results: T[]; firstError: unknown; hasError: boolean }> { + const { tasks, limit, onTaskError } = params; + const errorMode = params.errorMode ?? "continue"; + if (tasks.length === 0) { + return { results: [], firstError: undefined, hasError: false }; + } + + const resolvedLimit = Math.max(1, Math.min(limit, tasks.length)); + const results: T[] = Array.from({ length: tasks.length }); + let next = 0; + let firstError: unknown = undefined; + let hasError = false; + + const workers = Array.from({ length: resolvedLimit }, async () => { + while (true) { + if (errorMode === "stop" && hasError) { + return; + } + const index = next; + next += 1; + if (index >= tasks.length) { + return; + } + try { + results[index] = await tasks[index](); + } catch (error) { + if (!hasError) { + firstError = error; + hasError = true; + } + onTaskError?.(error, index); + if (errorMode === "stop") { + return; + } + } + } + }); + + await Promise.allSettled(workers); + return { results, firstError, hasError }; +}