After a drain loop empties the queue, it deletes the key from FOLLOWUP_QUEUES. If a new message arrives at that moment, enqueueFollowupRun creates a fresh queue object with draining: false but never starts a drain, leaving the message stranded until the next run completes and calls finalizeWithFollowup. Fix: persist the most recent runFollowup callback per queue key in FOLLOWUP_RUN_CALLBACKS (drain.ts). enqueueFollowupRun now calls kickFollowupDrainIfIdle after a successful push; if a cached callback exists and no drain is running, it calls scheduleFollowupDrain to restart the drain immediately. clearSessionQueues cleans up the callback cache alongside the queue state.
32 lines
996 B
TypeScript
32 lines
996 B
TypeScript
import { resolveEmbeddedSessionLane } from "../../../agents/pi-embedded.js";
|
|
import { clearCommandLane } from "../../../process/command-queue.js";
|
|
import { clearFollowupDrainCallback } from "./drain.js";
|
|
import { clearFollowupQueue } from "./state.js";
|
|
|
|
/**
 * Summary returned by `clearSessionQueues` describing what a single call
 * cleared.
 */
export type ClearSessionQueueResult = {
  /**
   * Sum of the values returned by `clearFollowupQueue` across all processed
   * keys (presumably the number of queued follow-up items dropped — confirm
   * against `state.ts`).
   */
  followupCleared: number;
  /**
   * Sum of the values returned by `clearCommandLane` across all processed
   * keys (presumably the number of lane entries dropped — confirm against
   * `command-queue.ts`).
   */
  laneCleared: number;
  /** Trimmed, de-duplicated keys that were processed, in first-seen order. */
  keys: string[];
};
|
|
|
|
/**
 * Clears the follow-up queue, the cached follow-up drain callback, and the
 * embedded-session command lane for every distinct key supplied.
 *
 * Each key is trimmed before use. `undefined`, empty, whitespace-only, and
 * repeated keys are skipped, so every key is cleared at most once per call.
 *
 * @param keys - Candidate queue keys; entries may be `undefined`.
 * @returns The totals accumulated from `clearFollowupQueue` and
 *   `clearCommandLane`, plus the trimmed keys that were processed in
 *   first-seen order.
 */
export function clearSessionQueues(keys: Array<string | undefined>): ClearSessionQueueResult {
  const seen = new Set<string>();
  let followupCleared = 0;
  let laneCleared = 0;
  const clearedKeys: string[] = [];

  for (const key of keys) {
    const cleaned = key?.trim();
    // Skip missing/blank keys and keys already handled earlier in this call.
    if (!cleaned || seen.has(cleaned)) {
      continue;
    }
    seen.add(cleaned);
    clearedKeys.push(cleaned);

    followupCleared += clearFollowupQueue(cleaned);
    // Keep the drain-callback cache in step with the queue state so no cached
    // callback outlives the queue that was just cleared.
    clearFollowupDrainCallback(cleaned);
    // The command lane is addressed by the lane name derived from the key,
    // not by the key itself.
    laneCleared += clearCommandLane(resolveEmbeddedSessionLane(cleaned));
  }

  return { followupCleared, laneCleared, keys: clearedKeys };
}
|