Add deterministic context control layer that intercepts prompt construction without modifying existing architecture:

- context_engine.py: single choke point (build_context) that assembles structured prompts from ledger + sigil + live window, with token budget enforcement and automatic window shrinking
- ledger.py: bounded per-stream JSON state (orientation, blockers, open questions, delta) with hard field/list limits
- sigil.py: FIFO shorthand memory (max 15 entries) with deterministic rule-based generation from message patterns
- token_gate.py: fast token estimation (~4 chars/token) and hard cap enforcement with configurable MAX_TOKENS/LIVE_WINDOW
- redact.py: secret pattern detection (Discord, OpenAI, Anthropic, AWS, Slack, GitHub, Telegram, Bearer, generic key=value), with matches replaced by [REDACTED_SECRET] before any output path

All 64 tests passing. No modifications to existing agent spawning, model routing, tool system, or Discord relay architecture.

https://claude.ai/code/session_01K7BWJY2gUoJi6dq91Yc7nx
101 lines
3.0 KiB
Python
101 lines
3.0 KiB
Python
"""Tests for ra2.ledger"""
|
|
|
|
import json
|
|
import os
|
|
import tempfile
|
|
import pytest
|
|
from ra2 import ledger
|
|
|
|
|
|
@pytest.fixture(autouse=True)
def tmp_ledger_dir(monkeypatch, tmp_path):
    """Point ``ledger.LEDGER_DIR`` at a per-test path under ``tmp_path``.

    Applied automatically to every test in this module. The path is only
    computed and patched in here; this fixture does not create it on disk.

    Returns:
        The patched directory path as a string.
    """
    ledger_root = tmp_path / "ledgers"
    ledger_root_str = str(ledger_root)
    monkeypatch.setattr(ledger, "LEDGER_DIR", ledger_root_str)
    return ledger_root_str
|
|
|
|
|
|
class TestLoadSave:
    """Checks for ``ledger.load`` and ``ledger.save`` on a single stream."""

    @staticmethod
    def _record(**fields):
        """Build a ledger record for stream ``s1`` with blank defaults.

        Keyword arguments overwrite the matching default in place, so the
        key order of the returned dict is always the same.
        """
        record = {
            "stream": "s1",
            "orientation": "",
            "latest": "",
            "blockers": [],
            "open": [],
            "delta": "",
        }
        record.update(fields)
        return record

    def test_load_empty(self):
        """Loading a stream that was never saved yields blank values."""
        fresh = ledger.load("test-stream")
        expected_defaults = {
            "stream": "test-stream",
            "orientation": "",
            "blockers": [],
            "open": [],
        }
        for field, default in expected_defaults.items():
            assert fresh[field] == default

    def test_save_and_load(self):
        """A saved record is expected back from ``load`` equal to the input."""
        written = self._record(
            orientation="build context engine",
            latest="implemented ledger",
            blockers=["rate limits"],
            open=["how to compress?"],
            delta="added ledger module",
        )
        ledger.save("s1", written)
        assert ledger.load("s1") == written

    def test_save_enforces_field_length(self):
        """A 1000-char ``orientation`` is expected back at ``MAX_FIELD_CHARS``."""
        oversized = self._record(orientation="x" * 1000)
        ledger.save("s1", oversized)
        reloaded = ledger.load("s1")
        stored_length = len(reloaded["orientation"])
        assert stored_length == ledger.MAX_FIELD_CHARS

    def test_save_enforces_list_length(self):
        """20-item lists are expected back at ``MAX_BLOCKERS`` / ``MAX_OPEN``."""
        too_many_blockers = [f"blocker-{idx}" for idx in range(20)]
        too_many_questions = [f"question-{idx}" for idx in range(20)]
        overfull = self._record(blockers=too_many_blockers, open=too_many_questions)
        ledger.save("s1", overfull)
        reloaded = ledger.load("s1")
        assert len(reloaded["blockers"]) == ledger.MAX_BLOCKERS
        assert len(reloaded["open"]) == ledger.MAX_OPEN
|
|
|
|
|
|
class TestUpdate:
    """Checks for ``ledger.update`` on stream ``s1``."""

    def test_update_fields(self):
        """Fields passed as keywords show up in the returned record."""
        changes = {"orientation": "test orientation", "delta": "did stuff"}
        updated = ledger.update("s1", **changes)
        for field, value in changes.items():
            assert updated[field] == value
        assert updated["stream"] == "s1"

    def test_update_ignores_unknown_keys(self):
        """A keyword that is not a ledger field is absent from the result."""
        updated = ledger.update("s1", unknown_key="value")
        assert "unknown_key" not in updated

    def test_update_persists(self):
        """A value set through ``update`` is visible to a later ``load``."""
        ledger.update("s1", orientation="phase 1")
        reloaded = ledger.load("s1")
        assert reloaded["orientation"] == "phase 1"
|
|
|
|
|
|
class TestSnapshot:
    """Checks on the text returned by ``ledger.snapshot``."""

    def test_snapshot_empty(self):
        """A never-written stream still renders its name and an orientation label."""
        rendered = ledger.snapshot("empty-stream")
        for fragment in ("stream: empty-stream", "orientation:"):
            assert fragment in rendered

    def test_snapshot_with_data(self):
        """Values stored via ``update`` appear in the snapshot text."""
        stored = {
            "orientation": "context sovereignty",
            "blockers": ["rate limits"],
            "open": ["compression strategy?"],
        }
        ledger.update("s1", **stored)
        rendered = ledger.snapshot("s1")
        expected_fragments = (
            "context sovereignty",
            "rate limits",
            "compression strategy?",
        )
        for fragment in expected_fragments:
            assert fragment in rendered
|