Nimrod Gutman 9aac55d306
Add /btw side questions (#45444)
* feat(agent): add /btw side questions

* fix(agent): gate and log /btw reviews

* feat(btw): isolate side-question delivery

* test(reply): update route reply runtime mocks

* fix(btw): complete side-result delivery across clients

* fix(gateway): handle streamed btw side results

* fix(telegram): unblock btw side questions

* fix(reply): make external btw replies explicit

* fix(chat): keep btw side results ephemeral in internal history

* fix(btw): address remaining review feedback

* fix(chat): preserve btw history on mobile refresh

* fix(acp): keep btw replies out of prompt history

* refactor(btw): narrow side questions to live channels

* fix(btw): preserve channel typing indicators

* fix(btw): keep side questions isolated in chat

* fix(outbound): restore typed channel send deps

* fix(btw): avoid blocking replies on transcript persistence

* fix(btw): keep side questions fast

* docs(commands): document btw slash command

* docs(changelog): add btw side questions entry

* test(outbound): align session transcript mocks
2026-03-14 17:27:54 +02:00

290 lines
8.4 KiB
TypeScript

import {
diagnosticLogger as diag,
logMessageQueued,
logSessionStateChange,
} from "../../logging/diagnostic.js";
import { resolveGlobalSingleton } from "../../shared/global-singleton.js";
type EmbeddedPiQueueHandle = {
queueMessage: (text: string) => Promise<void>;
isStreaming: () => boolean;
isCompacting: () => boolean;
abort: () => void;
};
export type ActiveEmbeddedRunSnapshot = {
transcriptLeafId: string | null;
messages?: unknown[];
inFlightPrompt?: string;
};
type EmbeddedRunWaiter = {
resolve: (ended: boolean) => void;
timer: NodeJS.Timeout;
};
/**
* Use global singleton state so busy/streaming checks stay consistent even
* when the bundler emits multiple copies of this module into separate chunks.
*/
const EMBEDDED_RUN_STATE_KEY = Symbol.for("openclaw.embeddedRunState");
const embeddedRunState = resolveGlobalSingleton(EMBEDDED_RUN_STATE_KEY, () => ({
activeRuns: new Map<string, EmbeddedPiQueueHandle>(),
snapshots: new Map<string, ActiveEmbeddedRunSnapshot>(),
waiters: new Map<string, Set<EmbeddedRunWaiter>>(),
}));
const ACTIVE_EMBEDDED_RUNS = embeddedRunState.activeRuns;
const ACTIVE_EMBEDDED_RUN_SNAPSHOTS = embeddedRunState.snapshots;
const EMBEDDED_RUN_WAITERS = embeddedRunState.waiters;
export function queueEmbeddedPiMessage(sessionId: string, text: string): boolean {
const handle = ACTIVE_EMBEDDED_RUNS.get(sessionId);
if (!handle) {
diag.debug(`queue message failed: sessionId=${sessionId} reason=no_active_run`);
return false;
}
if (!handle.isStreaming()) {
diag.debug(`queue message failed: sessionId=${sessionId} reason=not_streaming`);
return false;
}
if (handle.isCompacting()) {
diag.debug(`queue message failed: sessionId=${sessionId} reason=compacting`);
return false;
}
logMessageQueued({ sessionId, source: "pi-embedded-runner" });
void handle.queueMessage(text);
return true;
}
/**
* Abort embedded PI runs.
*
* - With a sessionId, aborts that single run.
* - With no sessionId, supports targeted abort modes (for example, compacting runs only).
*/
export function abortEmbeddedPiRun(sessionId: string): boolean;
export function abortEmbeddedPiRun(
sessionId: undefined,
opts: { mode: "all" | "compacting" },
): boolean;
export function abortEmbeddedPiRun(
sessionId?: string,
opts?: { mode?: "all" | "compacting" },
): boolean {
if (typeof sessionId === "string" && sessionId.length > 0) {
const handle = ACTIVE_EMBEDDED_RUNS.get(sessionId);
if (!handle) {
diag.debug(`abort failed: sessionId=${sessionId} reason=no_active_run`);
return false;
}
diag.debug(`aborting run: sessionId=${sessionId}`);
try {
handle.abort();
} catch (err) {
diag.warn(`abort failed: sessionId=${sessionId} err=${String(err)}`);
return false;
}
return true;
}
const mode = opts?.mode;
if (mode === "compacting") {
let aborted = false;
for (const [id, handle] of ACTIVE_EMBEDDED_RUNS) {
if (!handle.isCompacting()) {
continue;
}
diag.debug(`aborting compacting run: sessionId=${id}`);
try {
handle.abort();
aborted = true;
} catch (err) {
diag.warn(`abort failed: sessionId=${id} err=${String(err)}`);
}
}
return aborted;
}
if (mode === "all") {
let aborted = false;
for (const [id, handle] of ACTIVE_EMBEDDED_RUNS) {
diag.debug(`aborting run: sessionId=${id}`);
try {
handle.abort();
aborted = true;
} catch (err) {
diag.warn(`abort failed: sessionId=${id} err=${String(err)}`);
}
}
return aborted;
}
return false;
}
export function isEmbeddedPiRunActive(sessionId: string): boolean {
const active = ACTIVE_EMBEDDED_RUNS.has(sessionId);
if (active) {
diag.debug(`run active check: sessionId=${sessionId} active=true`);
}
return active;
}
export function isEmbeddedPiRunStreaming(sessionId: string): boolean {
const handle = ACTIVE_EMBEDDED_RUNS.get(sessionId);
if (!handle) {
return false;
}
return handle.isStreaming();
}
export function getActiveEmbeddedRunCount(): number {
return ACTIVE_EMBEDDED_RUNS.size;
}
export function getActiveEmbeddedRunSnapshot(
sessionId: string,
): ActiveEmbeddedRunSnapshot | undefined {
return ACTIVE_EMBEDDED_RUN_SNAPSHOTS.get(sessionId);
}
/**
* Wait for active embedded runs to drain.
*
* Used during restarts so in-flight compaction runs can release session write
* locks before the next lifecycle starts.
*/
export async function waitForActiveEmbeddedRuns(
timeoutMs = 15_000,
opts?: { pollMs?: number },
): Promise<{ drained: boolean }> {
const pollMsRaw = opts?.pollMs ?? 250;
const pollMs = Math.max(10, Math.floor(pollMsRaw));
const maxWaitMs = Math.max(pollMs, Math.floor(timeoutMs));
const startedAt = Date.now();
while (true) {
if (ACTIVE_EMBEDDED_RUNS.size === 0) {
return { drained: true };
}
const elapsedMs = Date.now() - startedAt;
if (elapsedMs >= maxWaitMs) {
diag.warn(
`wait for active embedded runs timed out: activeRuns=${ACTIVE_EMBEDDED_RUNS.size} timeoutMs=${maxWaitMs}`,
);
return { drained: false };
}
await new Promise<void>((resolve) => setTimeout(resolve, pollMs));
}
}
export function waitForEmbeddedPiRunEnd(sessionId: string, timeoutMs = 15_000): Promise<boolean> {
if (!sessionId || !ACTIVE_EMBEDDED_RUNS.has(sessionId)) {
return Promise.resolve(true);
}
diag.debug(`waiting for run end: sessionId=${sessionId} timeoutMs=${timeoutMs}`);
return new Promise((resolve) => {
const waiters = EMBEDDED_RUN_WAITERS.get(sessionId) ?? new Set();
const waiter: EmbeddedRunWaiter = {
resolve,
timer: setTimeout(
() => {
waiters.delete(waiter);
if (waiters.size === 0) {
EMBEDDED_RUN_WAITERS.delete(sessionId);
}
diag.warn(`wait timeout: sessionId=${sessionId} timeoutMs=${timeoutMs}`);
resolve(false);
},
Math.max(100, timeoutMs),
),
};
waiters.add(waiter);
EMBEDDED_RUN_WAITERS.set(sessionId, waiters);
if (!ACTIVE_EMBEDDED_RUNS.has(sessionId)) {
waiters.delete(waiter);
if (waiters.size === 0) {
EMBEDDED_RUN_WAITERS.delete(sessionId);
}
clearTimeout(waiter.timer);
resolve(true);
}
});
}
function notifyEmbeddedRunEnded(sessionId: string) {
const waiters = EMBEDDED_RUN_WAITERS.get(sessionId);
if (!waiters || waiters.size === 0) {
return;
}
EMBEDDED_RUN_WAITERS.delete(sessionId);
diag.debug(`notifying waiters: sessionId=${sessionId} waiterCount=${waiters.size}`);
for (const waiter of waiters) {
clearTimeout(waiter.timer);
waiter.resolve(true);
}
}
export function setActiveEmbeddedRun(
sessionId: string,
handle: EmbeddedPiQueueHandle,
sessionKey?: string,
) {
const wasActive = ACTIVE_EMBEDDED_RUNS.has(sessionId);
ACTIVE_EMBEDDED_RUNS.set(sessionId, handle);
logSessionStateChange({
sessionId,
sessionKey,
state: "processing",
reason: wasActive ? "run_replaced" : "run_started",
});
if (!sessionId.startsWith("probe-")) {
diag.debug(`run registered: sessionId=${sessionId} totalActive=${ACTIVE_EMBEDDED_RUNS.size}`);
}
}
export function updateActiveEmbeddedRunSnapshot(
sessionId: string,
snapshot: ActiveEmbeddedRunSnapshot,
) {
if (!ACTIVE_EMBEDDED_RUNS.has(sessionId)) {
return;
}
ACTIVE_EMBEDDED_RUN_SNAPSHOTS.set(sessionId, snapshot);
}
export function clearActiveEmbeddedRun(
sessionId: string,
handle: EmbeddedPiQueueHandle,
sessionKey?: string,
) {
if (ACTIVE_EMBEDDED_RUNS.get(sessionId) === handle) {
ACTIVE_EMBEDDED_RUNS.delete(sessionId);
ACTIVE_EMBEDDED_RUN_SNAPSHOTS.delete(sessionId);
logSessionStateChange({ sessionId, sessionKey, state: "idle", reason: "run_completed" });
if (!sessionId.startsWith("probe-")) {
diag.debug(`run cleared: sessionId=${sessionId} totalActive=${ACTIVE_EMBEDDED_RUNS.size}`);
}
notifyEmbeddedRunEnded(sessionId);
} else {
diag.debug(`run clear skipped: sessionId=${sessionId} reason=handle_mismatch`);
}
}
export const __testing = {
resetActiveEmbeddedRuns() {
for (const waiters of EMBEDDED_RUN_WAITERS.values()) {
for (const waiter of waiters) {
clearTimeout(waiter.timer);
waiter.resolve(true);
}
}
EMBEDDED_RUN_WAITERS.clear();
ACTIVE_EMBEDDED_RUNS.clear();
ACTIVE_EMBEDDED_RUN_SNAPSHOTS.clear();
},
};
export type { EmbeddedPiQueueHandle };