Merge 33cdd8e32b1d107e8d6ede8bf4c8aeb7f299ce5e into 6b4c24c2e55b5b4013277bd799525086f6a0c40f
This commit is contained in: commit 0daf6b8eb2
@@ -105,15 +105,177 @@ export function readSessionMessages(
     return [];
   }
 
-  const lines = fs.readFileSync(filePath, "utf-8").split(/\r?\n/);
+
+  // NOTE: This is on the Gateway hot path (chat.history). Reading + splitting an entire transcript
+  // file can freeze the UI when a session grows large (or when a single JSONL record is huge).
+  // We therefore tail-read large files and apply a per-line size guard.
+  //
+  // Important: the downstream chat.history handler applies a 6 MB response budget *after*
+  // sanitization/stripping, so the raw transcript can be substantially larger on disk while still
+  // fitting in the response. To avoid silently dropping history that would fit post-sanitization,
+  // we grow the tail window until we either (a) start at byte 0, (b) have enough line coverage, or
+  // (c) hit a hard cap.
+  const INITIAL_TAIL_BYTES = 18 * 1024 * 1024; // 3× the 6 MB chat.history response budget
+  const MAX_TAIL_BYTES_CAP = 128 * 1024 * 1024; // hard cap to avoid huge reads on the RPC hot path
+  const MIN_TAIL_LINES_TARGET = 1500; // heuristic: enough for ~1000 recent messages + headroom
+
+  // 200KB per line (UTF-8 bytes): a normal assistant reply is well under 50KB. Anything larger is a runaway
+  // prompt/response that would only stall JSON.parse and bloat the UI.
+  const MAX_LINE_BYTES = 200 * 1024;
+
+  let content = "";
+  try {
+    const stat = fs.statSync(filePath);
+
+    if (stat.size <= INITIAL_TAIL_BYTES) {
+      content = fs.readFileSync(filePath, "utf-8");
+    } else {
+      const fd = fs.openSync(filePath, "r");
+      try {
+        let tailBytes = Math.min(stat.size, INITIAL_TAIL_BYTES);
+        while (true) {
+          const start = Math.max(0, stat.size - tailBytes);
+          const buf = Buffer.allocUnsafe(stat.size - start);
+
+          // Capture bytesRead: if the file shrank between statSync and readSync (TOCTOU),
+          // readSync returns fewer bytes than buf.length — slice to avoid feeding uninitialized
+          // memory into the UTF-8 / JSON pipeline.
+          const bytesRead = fs.readSync(fd, buf, 0, buf.length, start);
+          content = buf.toString("utf-8", 0, bytesRead);
+
+          // If we started mid-line, drop the partial first line.
+          // Note: messages before the tail boundary are intentionally omitted to keep this RPC fast;
+          // the UI will show the most recent history only.
+          if (start > 0) {
+            // Only drop the first line if we actually started in the middle of a record.
+            // If the byte before `start` is a newline, `content` begins at a record boundary.
+            const prev = Buffer.allocUnsafe(1);
+            const prevRead = fs.readSync(fd, prev, 0, 1, start - 1);
+            const prevChar = prevRead === 1 ? prev.toString("utf-8", 0, 1) : "";
+            if (prevChar !== "\n") {
+              const firstNewline = content.indexOf("\n");
+              if (firstNewline >= 0) {
+                content = content.slice(firstNewline + 1);
+              }
+            }
+          }
+
+          const newlineCount = (content.match(/\n/g) || []).length;
+          const lineCount = newlineCount + 1;
+
+          if (
+            start === 0 ||
+            lineCount >= MIN_TAIL_LINES_TARGET ||
+            tailBytes >= MAX_TAIL_BYTES_CAP
+          ) {
+            if (tailBytes >= MAX_TAIL_BYTES_CAP && start > 0 && lineCount < MIN_TAIL_LINES_TARGET) {
+              console.warn(
+                `[session-utils] transcript tail cap hit for session ${sessionId}: read ${tailBytes} bytes (cap ${MAX_TAIL_BYTES_CAP}) but only recovered ~${lineCount} lines; history may be truncated`,
+              );
+            }
+            break;
+          }
+
+          tailBytes = Math.min(MAX_TAIL_BYTES_CAP, tailBytes * 2);
+        }
+      } finally {
+        fs.closeSync(fd);
+      }
+    }
+  } catch {
+    return [];
+  }
+
+  const lines = content.split(/\r?\n/);
   const messages: unknown[] = [];
   let messageSeq = 0;
   for (const line of lines) {
-    if (!line.trim()) {
+    const trimmed = line.trim();
+    if (!trimmed) {
       continue;
     }
+    const trimmedBytes = Buffer.byteLength(trimmed, "utf-8");
+    if (trimmedBytes > MAX_LINE_BYTES) {
+      // Preserve history semantics: emit a placeholder entry instead of dropping the line entirely.
+      // (chat.history will still apply response-byte caps downstream.)
+      console.warn(
+        `[session-utils] oversized transcript line in session ${sessionId}: ${trimmedBytes} bytes (max ${MAX_LINE_BYTES}); emitting placeholder`,
+      );
+      // Best-effort: preserve original role/timestamp without JSON.parse (which would stall).
+      // Regex scans are limited to a prefix so we don't do O(n) work over tens/hundreds of MB.
+      const scan = trimmed.slice(0, 1_000_000);
+
+      const roleMatch = scan.match(/"role"\s*:\s*"([^"]+)"/);
+      const roleCandidate = roleMatch?.[1];
+      const role =
+        roleCandidate === "user" ||
+        roleCandidate === "assistant" ||
+        roleCandidate === "tool" ||
+        roleCandidate === "system"
+          ? roleCandidate
+          : "assistant";
+
+      let timestamp = Date.now();
+      const tsNum = scan.match(/"timestamp"\s*:\s*(\d{10,13})/);
+      if (tsNum?.[1]) {
+        const n = Number(tsNum[1]);
+        if (Number.isFinite(n)) {
+          timestamp = n;
+        }
+      } else {
+        const tsIso = scan.match(/"timestamp"\s*:\s*"([^"]+)"/);
+        const d = tsIso?.[1] ? Date.parse(tsIso[1]) : Number.NaN;
+        if (Number.isFinite(d)) {
+          timestamp = d;
+        }
+      }
+
+      // If the record is oversized mainly due to inline media (e.g. image blocks with `data:`),
+      // downstream sanitization would normally strip that payload. Preserve some semantics here
+      // without parsing the full JSON.
+      const looksLikeInlineMedia = /"type"\s*:\s*"image"/.test(scan) || /"data"\s*:\s*"data:/.test(scan);
+
+      let placeholderText = "[chat.history omitted: message too large]";
+      const textSnips: string[] = [];
+      if (looksLikeInlineMedia) {
+        placeholderText = "[chat.history omitted: inline media too large]";
+
+        // Best-effort: pull out a few text snippets so the user still sees *something* meaningful
+        // even if the record is oversized due to inline media payload.
+        const re = /"text"\s*:\s*"((?:\\.|[^"\\]){0,3000})"/g;
+        let match: RegExpExecArray | null;
+        while (textSnips.length < 4 && (match = re.exec(scan))) {
+          const raw = match[1];
+          const unescaped = raw.replace(/\\n/g, "\n").replace(/\\t/g, "\t");
+          const flat = unescaped.replace(/\s+/g, " ").trim();
+          if (flat) {
+            textSnips.push(flat);
+          }
+        }
+      }
+
+      const blocks: Array<{ type: string; text: string }> = [{ type: "text", text: placeholderText }];
+      if (textSnips.length) {
+        blocks.push({ type: "text", text: `Context (best-effort): ${textSnips.join(" | ")}` });
+      }
+
+      messages.push({
+        role,
+        content: blocks,
+        timestamp,
+        __openclaw: {
+          kind: "oversized_transcript_line",
+          sizeBytes: trimmedBytes,
+          sizeChars: trimmed.length,
+          guessed: true,
+          looksLikeInlineMedia,
+        },
+      });
+      continue;
+    }
     try {
-      const parsed = JSON.parse(line);
+      const parsed = JSON.parse(trimmed);
       if (parsed?.message) {
         messageSeq += 1;
         messages.push(
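
The diff view is truncated above. As a reviewer aid, here is a minimal sketch (not part of the commit) of the tail-window growth the comments describe, assuming only the constants defined in the diff; it reproduces the size progression without any I/O, and ignores the early exit the real loop takes once MIN_TAIL_LINES_TARGET lines are covered. The helper name tailWindowSchedule is hypothetical.

// Hypothetical illustration only: how the tail window grows for a given transcript size.
const INITIAL_TAIL_BYTES = 18 * 1024 * 1024;
const MAX_TAIL_BYTES_CAP = 128 * 1024 * 1024;

function tailWindowSchedule(fileSize: number): number[] {
  const windows: number[] = [];
  let tailBytes = Math.min(fileSize, INITIAL_TAIL_BYTES);
  while (true) {
    windows.push(tailBytes);
    const start = Math.max(0, fileSize - tailBytes);
    // The real loop also breaks once enough lines are covered; this sketch only shows sizes.
    if (start === 0 || tailBytes >= MAX_TAIL_BYTES_CAP) break;
    tailBytes = Math.min(MAX_TAIL_BYTES_CAP, tailBytes * 2);
  }
  return windows;
}

// e.g. a 300 MB transcript yields windows of 18, 36, 72, then 128 MB: at most four readSync
// passes before the cap warning can fire.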
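
And a hedged example (also not from the commit) of what the prefix-limited regex recovery in the oversized-line branch would extract from a runaway record; the sample record shape is assumed, only the regexes mirror the diff.

// Hypothetical oversized record prefix (real lines exceed 200 KB; only the first 1 MB is scanned).
const scan =
  '{"role":"assistant","timestamp":1712345678901,"content":[{"type":"image","data":"data:image/png;base64,iVBOR...';

const role = scan.match(/"role"\s*:\s*"([^"]+)"/)?.[1];                      // "assistant"
const timestamp = Number(scan.match(/"timestamp"\s*:\s*(\d{10,13})/)?.[1]);  // 1712345678901
const looksLikeInlineMedia =
  /"type"\s*:\s*"image"/.test(scan) || /"data"\s*:\s*"data:/.test(scan);     // true
// → the placeholder entry would carry "[chat.history omitted: inline media too large]" plus any
//   short "text" snippets found in the prefix, tagged __openclaw.kind = "oversized_transcript_line".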