From 56d19a01304d5db2103b5b3f4c7ac2f2af82e2f4 Mon Sep 17 00:00:00 2001 From: Claude Date: Thu, 19 Feb 2026 22:41:23 +0000 Subject: [PATCH] feat(ra2): implement Context Sovereignty Layer (Phase 1) Add deterministic context control layer that intercepts prompt construction without modifying existing architecture: - context_engine.py: single choke point (build_context) that assembles structured prompts from ledger + sigil + live window, with token budget enforcement and automatic window shrinking - ledger.py: bounded per-stream JSON state (orientation, blockers, open questions, delta) with hard field/list limits - sigil.py: FIFO shorthand memory (max 15 entries) with deterministic rule-based generation from message patterns - token_gate.py: fast token estimation (~4 chars/token) and hard cap enforcement with configurable MAX_TOKENS/LIVE_WINDOW - redact.py: secret pattern detection (Discord, OpenAI, Anthropic, Google, AWS, Slack, GitHub, Telegram, Bearer, generic key=value) replaced with [REDACTED_SECRET] before any output path All 64 tests passing. No modifications to existing agent spawning, model routing, tool system, or Discord relay architecture. 
https://claude.ai/code/session_01K7BWJY2gUoJi6dq91Yc7nx --- ra2/__init__.py | 21 ++++ ra2/context_engine.py | 202 +++++++++++++++++++++++++++++++ ra2/ledger.py | 111 +++++++++++++++++ ra2/redact.py | 88 ++++++++++++++ ra2/sigil.py | 106 ++++++++++++++++ ra2/tests/__init__.py | 0 ra2/tests/test_context_engine.py | 138 +++++++++++++++++++++ ra2/tests/test_ledger.py | 100 +++++++++++++++ ra2/tests/test_redact.py | 114 +++++++++++++++++ ra2/tests/test_sigil.py | 91 ++++++++++++++ ra2/tests/test_token_gate.py | 69 +++++++++++ ra2/token_gate.py | 56 +++++++++ 12 files changed, 1096 insertions(+) create mode 100644 ra2/__init__.py create mode 100644 ra2/context_engine.py create mode 100644 ra2/ledger.py create mode 100644 ra2/redact.py create mode 100644 ra2/sigil.py create mode 100644 ra2/tests/__init__.py create mode 100644 ra2/tests/test_context_engine.py create mode 100644 ra2/tests/test_ledger.py create mode 100644 ra2/tests/test_redact.py create mode 100644 ra2/tests/test_sigil.py create mode 100644 ra2/tests/test_token_gate.py create mode 100644 ra2/token_gate.py diff --git a/ra2/__init__.py b/ra2/__init__.py new file mode 100644 index 00000000000..0b1260348f6 --- /dev/null +++ b/ra2/__init__.py @@ -0,0 +1,21 @@ +""" +ra2 — Context Sovereignty Layer (Phase 1) + +Deterministic thin wrapper that: + - Prevents full markdown history injection into prompts + - Introduces structured ledger memory + - Introduces sigil shorthand memory + - Enforces hard token caps before provider calls + - Redacts secrets before logs and model calls + +Usage: + from ra2.context_engine import build_context + + result = build_context(stream_id="my-stream", new_messages=[...]) + prompt = result["prompt"] + tokens = result["token_estimate"] +""" + +from ra2.context_engine import build_context + +__all__ = ["build_context"] diff --git a/ra2/context_engine.py b/ra2/context_engine.py new file mode 100644 index 00000000000..ee7c611d1d0 --- /dev/null +++ b/ra2/context_engine.py @@ -0,0 +1,202 @@ 
# File: ra2/context_engine.py — module header, compression rules, compression pass.
"""
ra2.context_engine — The single choke point for all model calls.

All prompts must pass through build_context() before reaching any provider.

Internal flow:
    1. Load ledger state for stream
    2. Load sigil state
    3. Load last N live messages (default LIVE_WINDOW)
    4. Run rule-based compression pass
    5. Assemble structured prompt
    6. Estimate token count
    7. If > MAX_TOKENS: shrink live window, reassemble
    8. If still > MAX_TOKENS: raise controlled exception

Never reads full .md history.
"""

import re
from typing import List, Optional

from ra2 import ledger, sigil, token_gate, redact

# ── Compression rule patterns ───────────────────────────────────────
# Each pattern captures the text that follows a trigger phrase (group 1).

_DECISION_RE = re.compile(
    r"(?:we\s+will|we\s+chose|decided\s+to|going\s+to|let'?s)\s+(.{10,120})",
    re.IGNORECASE,
)
_ARCHITECTURE_RE = re.compile(
    r"(?:architect(?:ure)?|refactor|redesign|restructur|migrat)\w*\s+(.{10,120})",
    re.IGNORECASE,
)
_COST_RE = re.compile(
    r"(?:budget|cost|spend|rate[_\s]*limit|token[_\s]*cap|pricing)\s*[:=→]?\s*(.{5,120})",
    re.IGNORECASE,
)
_BLOCKER_RE = re.compile(
    r"(?:block(?:er|ed|ing)|stuck|cannot|can'?t\s+proceed|waiting\s+on)\s+(.{5,120})",
    re.IGNORECASE,
)
_QUESTION_RE = re.compile(
    r"(?:should\s+we|do\s+we|how\s+(?:do|should)|what\s+(?:if|about)|need\s+to\s+decide)\s+(.{5,120})",
    re.IGNORECASE,
)


def _extract_content(msg: dict) -> str:
    """Return the text content of a message dict.

    Accepts a plain string, or a list of content blocks in which
    ``{"type": "text", "text": ...}`` dicts and bare strings are joined with
    single spaces (other block kinds are skipped). A missing or ``None``
    content yields ``""``; any other value is converted with ``str()``.
    """
    content = msg.get("content", "")
    if content is None:
        # An explicit None would otherwise be rendered as the word "None".
        return ""
    if isinstance(content, str):
        return content
    if isinstance(content, list):
        parts = []
        for block in content:
            if isinstance(block, dict) and block.get("type") == "text":
                parts.append(block.get("text", ""))
            elif isinstance(block, str):
                parts.append(block)
        return " ".join(parts)
    return str(content)


def _run_compression(messages: list, stream_id: str) -> None:
    """Rule-based compression pass over recent messages.

    Extracts decisions, architecture shifts, cost constraints, blockers and
    open questions with the module-level regexes, appends a sigil for every
    message that matches a sigil rule, and writes the non-empty results to
    the ledger of *stream_id*.

    Args:
        messages: Message dicts; text is obtained via ``_extract_content``.
        stream_id: Stream whose ledger and sigil files are updated.
    """
    decisions: list[str] = []
    blockers: list[str] = []
    open_questions: list[str] = []
    latest_summary_parts: list[str] = []

    for msg in messages:
        # Redact before extracting: everything captured below is persisted
        # to the ledger / sigil files, which are output paths too.
        text = redact.redact(_extract_content(msg))
        if not text:
            continue

        decisions.extend(m.group(1).strip() for m in _DECISION_RE.finditer(text))
        latest_summary_parts.extend(
            f"arch: {m.group(1).strip()}" for m in _ARCHITECTURE_RE.finditer(text)
        )
        latest_summary_parts.extend(
            f"cost: {m.group(1).strip()}" for m in _COST_RE.finditer(text)
        )
        blockers.extend(m.group(1).strip() for m in _BLOCKER_RE.finditer(text))
        open_questions.extend(m.group(1).strip() for m in _QUESTION_RE.finditer(text))

        sigil_body = sigil.generate_from_message(text)
        if sigil_body:
            sigil.append(stream_id, sigil_body)

    # Only the five most recent decisions / summary parts are kept.
    delta = "; ".join(decisions[-5:]) if decisions else ""
    latest = "; ".join(latest_summary_parts[-5:]) if latest_summary_parts else ""

    # Update ledger (only non-empty fields).
    updates = {}
    if delta:
        updates["delta"] = delta
    if latest:
        updates["latest"] = latest
    if blockers:
        # Keep the newest entries. The ledger truncates with [:MAX_BLOCKERS],
        # so a larger bound here would make it retain the oldest ones.
        updates["blockers"] = blockers[-ledger.MAX_BLOCKERS:]
    if open_questions:
        updates["open"] = open_questions[-ledger.MAX_OPEN:]

    if updates:
        ledger.update(stream_id, **updates)
# File: ra2/context_engine.py — prompt assembly and the public entry point.
def _assemble_prompt(stream_id: str, live_messages: list) -> str:
    """Build the structured prompt from ledger + sigil + live window.

    The result has four sections separated by blank lines: the ledger
    snapshot, the sigil snapshot, one ``[role] content`` line per live
    message, and a fixed closing directive.
    """
    live_lines = [
        f"[{msg.get('role', 'unknown')}] {_extract_content(msg)}"
        for msg in live_messages
    ]
    sections = [
        f"=== LEDGER ===\n{ledger.snapshot(stream_id)}",
        f"=== SIGIL ===\n{sigil.snapshot(stream_id)}",
        "=== LIVE WINDOW ===\n" + "\n".join(live_lines),
        "Respond concisely and aligned with orientation.",
    ]
    return "\n\n".join(sections)


def build_context(stream_id: str, new_messages: list) -> dict:
    """Main entry point — the single choke point for all model calls.

    Args:
        stream_id: Unique identifier for the conversation stream.
        new_messages: List of message dicts with at minimum 'role' and 'content'.

    Returns:
        {
            "prompt": str,          # The assembled, redacted prompt
            "token_estimate": int   # Estimated token count
        }

    Raises:
        token_gate.TokenBudgetExceeded: If prompt exceeds MAX_TOKENS
            even after shrinking the live window to minimum.
    """
    # 1. Compression pass on new messages → updates ledger + sigils.
    _run_compression(new_messages, stream_id)

    def _render(window: int):
        """Assemble, redact and measure the prompt for a given window size."""
        # new_messages[-0:] is the WHOLE list, so a non-positive window must
        # be handled explicitly to mean "no live messages".
        live = new_messages[-window:] if window > 0 else []
        text = redact.redact(_assemble_prompt(stream_id, live))
        return text, token_gate.estimate_tokens(text)

    # 2-5. Initial window, assembly, redaction, estimate.
    window_size = token_gate.LIVE_WINDOW
    prompt, estimated = _render(window_size)

    # 6. Shrink loop if over budget.
    while not token_gate.check_budget(estimated):
        try:
            window_size = token_gate.shrink_window(window_size)
        except token_gate.TokenBudgetExceeded:
            # Already at minimum window — hard fail with the real estimate.
            # "from None" hides the placeholder exception raised by
            # shrink_window, whose estimate is not the real one.
            raise token_gate.TokenBudgetExceeded(
                estimated=estimated,
                limit=token_gate.MAX_TOKENS,
            ) from None
        prompt, estimated = _render(window_size)

    return {
        "prompt": prompt,
        "token_estimate": estimated,
    }
# File: ra2/ledger.py
"""
ra2.ledger — Structured ledger memory (one per stream).

Each stream gets a JSON ledger file with bounded fields.
Fields are overwritten (never appended unbounded).
Only updated via the compression pass.
"""

import json
import os
from typing import Optional

# Configurable storage root
LEDGER_DIR: str = os.environ.get(
    "RA2_LEDGER_DIR",
    os.path.join(os.path.expanduser("~"), ".ra2", "ledgers"),
)

# Hard limits
MAX_BLOCKERS = 10
MAX_OPEN = 10
MAX_FIELD_CHARS = 500  # per string field

# Template of known keys and their defaults. Never hand this object (or its
# lists) to callers; use _blank_ledger() to get an independent copy.
_EMPTY_LEDGER = {
    "stream": "",
    "orientation": "",
    "latest": "",
    "blockers": [],
    "open": [],
    "delta": "",
}

# Characters that must not reach the filesystem as part of a file name.
_PATH_UNSAFE = str.maketrans({"/": "_", "\\": "_", "\0": "_"})


def _blank_ledger(stream_id: str) -> dict:
    """Return a fresh ledger for *stream_id* whose lists are not shared."""
    ledger = {
        key: list(default) if isinstance(default, list) else default
        for key, default in _EMPTY_LEDGER.items()
    }
    ledger["stream"] = stream_id
    return ledger


def _ledger_path(stream_id: str) -> str:
    """Return the JSON file path for *stream_id* inside LEDGER_DIR.

    Path separators and NUL in the id are replaced with ``_`` so an id such
    as ``"../x"`` cannot address a file outside the storage directory.
    """
    return os.path.join(LEDGER_DIR, f"{stream_id.translate(_PATH_UNSAFE)}.json")


def load(stream_id: str) -> dict:
    """Load ledger for *stream_id*, returning empty template if none exists.

    Keys missing from a stored ledger are filled in with their defaults.
    """
    try:
        with open(_ledger_path(stream_id), "r", encoding="utf-8") as f:
            data = json.load(f)
    except FileNotFoundError:
        return _blank_ledger(stream_id)
    # Ensure all expected keys exist
    for key, default in _EMPTY_LEDGER.items():
        if key not in data:
            data[key] = list(default) if isinstance(default, list) else default
    return data


def save(stream_id: str, ledger: dict) -> None:
    """Persist ledger to disk, enforcing size limits.

    The dict passed in is not modified; a truncated copy is written.
    """
    bounded = _enforce_limits(ledger)
    os.makedirs(LEDGER_DIR, exist_ok=True)
    with open(_ledger_path(stream_id), "w", encoding="utf-8") as f:
        json.dump(bounded, f, indent=2, ensure_ascii=False)


def update(stream_id: str, **fields) -> dict:
    """Load, merge fields, save, and return the updated ledger.

    Only known keys are accepted. Unknown keys are silently dropped.
    The returned dict has the size limits applied, i.e. it matches what
    was written to disk.
    """
    ledger = load(stream_id)
    for key, value in fields.items():
        if key in _EMPTY_LEDGER:
            ledger[key] = value
    ledger = _enforce_limits(ledger)
    save(stream_id, ledger)
    return ledger


def snapshot(stream_id: str) -> str:
    """Return a human-readable snapshot string for prompt injection.

    ``stream``, ``orientation`` and ``latest`` are always present; the
    ``blockers``, ``open`` and ``delta`` parts appear only when non-empty.
    """
    ledger = load(stream_id)
    lines = [
        f"stream: {ledger['stream']}",
        f"orientation: {ledger['orientation']}",
        f"latest: {ledger['latest']}",
    ]
    if ledger["blockers"]:
        lines.append("blockers:")
        lines.extend(f"  - {b}" for b in ledger["blockers"])
    if ledger["open"]:
        lines.append("open:")
        lines.extend(f"  - {o}" for o in ledger["open"])
    if ledger["delta"]:
        lines.append(f"delta: {ledger['delta']}")
    return "\n".join(lines)


def _enforce_limits(ledger: dict) -> dict:
    """Return a copy of *ledger* with fields and lists cut to hard limits.

    String fields are cut to MAX_FIELD_CHARS; ``blockers`` / ``open`` keep
    their first MAX_BLOCKERS / MAX_OPEN items, each string item also cut to
    MAX_FIELD_CHARS. Values of unexpected types pass through unchanged.
    """
    bounded = dict(ledger)
    for key in ("orientation", "latest", "delta", "stream"):
        value = bounded.get(key)
        if isinstance(value, str) and len(value) > MAX_FIELD_CHARS:
            bounded[key] = value[:MAX_FIELD_CHARS]
    for key, limit in (("blockers", MAX_BLOCKERS), ("open", MAX_OPEN)):
        items = bounded.get(key)
        if isinstance(items, list):
            bounded[key] = [
                item[:MAX_FIELD_CHARS] if isinstance(item, str) else item
                for item in items[:limit]
            ]
    return bounded
# File: ra2/redact.py
"""
ra2.redact — Secret redaction before logging, .md writes, and model calls.

Detects common API key patterns and replaces them with [REDACTED_SECRET].
Must be applied before any external output path.
"""

import re
from typing import List, Tuple

REDACTED = "[REDACTED_SECRET]"

# Each entry: (label, compiled regex)
_PATTERNS: List[Tuple[str, re.Pattern]] = [
    # Discord bot tokens (base64-ish, three dot-separated segments)
    ("discord_token", re.compile(
        r"[MN][A-Za-z0-9]{23,}\.[A-Za-z0-9_-]{6}\.[A-Za-z0-9_-]{27,}"
    )),
    # OpenAI keys (this also matches the "sk-ant-" form below)
    ("openai_key", re.compile(r"sk-[A-Za-z0-9_-]{20,}")),
    # Anthropic keys
    ("anthropic_key", re.compile(r"sk-ant-[A-Za-z0-9_-]{20,}")),
    # Google / GCP API keys
    ("google_key", re.compile(r"AIza[A-Za-z0-9_-]{35}")),
    # AWS access key IDs
    ("aws_access_key", re.compile(r"AKIA[A-Z0-9]{16}")),
    # Generic "label = value" secrets: a secret-ish label followed by a
    # value of 32+ key-like characters. Only the value (group 1) is redacted.
    ("generic_secret", re.compile(
        r"(?:api[_-]?key|secret|token|password|credential)"
        r"[\s]*[:=][\s]*['\"]?([A-Za-z0-9_/+=-]{32,})['\"]?",
        re.IGNORECASE,
    )),
    # Bearer tokens in auth headers
    ("bearer_token", re.compile(
        r"Bearer\s+[A-Za-z0-9_.+/=-]{20,}", re.IGNORECASE
    )),
    # Slack tokens
    ("slack_token", re.compile(r"xox[bpas]-[A-Za-z0-9-]{10,}")),
    # GitHub tokens: ghp_ (personal), gho_ (OAuth), ghu_ (user-to-server),
    # ghs_ (server-to-server), ghr_ (refresh)
    ("github_token", re.compile(r"gh[pousr]_[A-Za-z0-9]{36,}")),
    # Telegram bot tokens
    ("telegram_token", re.compile(r"\d{8,10}:[A-Za-z0-9_-]{35}")),
]


def redact(text: str) -> str:
    """Replace all detected secret patterns in *text* with [REDACTED_SECRET].

    Patterns are applied in ``_PATTERNS`` order. Every pattern replaces its
    whole match except ``generic_secret``, which keeps the label and
    replaces only the captured value.
    """
    for label, pattern in _PATTERNS:
        replacement = _replace_generic if label == "generic_secret" else REDACTED
        text = pattern.sub(replacement, text)
    return text


def _replace_generic(match: re.Match) -> str:
    """Replace only the secret value (group 1) inside a key=value match."""
    full = match.group(0)
    base = match.start()
    value_start, value_end = match.span(1)
    # Splice by position so only the captured value is touched.
    return full[: value_start - base] + REDACTED + full[value_end - base:]


def _redact_value(value):
    """Redact strings found anywhere inside nested dicts and lists.

    Returns new containers; values of any other type are returned as-is.
    """
    if isinstance(value, str):
        return redact(value)
    if isinstance(value, dict):
        return {key: _redact_value(item) for key, item in value.items()}
    if isinstance(value, list):
        return [_redact_value(item) for item in value]
    return value


def redact_dict(d: dict) -> dict:
    """Recursively redact all string values in a dict.

    Recurses through nested dicts and lists (including dicts inside lists).
    Keys are left untouched and the input is not modified.
    """
    return {key: _redact_value(value) for key, value in d.items()}


def redact_messages(messages: list) -> list:
    """Redact secrets from a list of message dicts (content field).

    Handles both plain-string content and structured content (lists of
    blocks). Returns new message dicts; the input messages are not modified.
    """
    result = []
    for msg in messages:
        copy = dict(msg)
        if "content" in copy:
            copy["content"] = _redact_value(copy["content"])
        result.append(copy)
    return result
# File: ra2/sigil.py — storage half (load / save / append / snapshot).
"""
ra2.sigil — Sigil shorthand memory (one plain-text file per stream).

Format per line:  σN: key→value
Max 15 entries, FIFO replacement.
Deterministic rule-based generation only — no AI involvement.
"""

import os
import re
from typing import List, Optional, Tuple

SIGIL_DIR: str = os.environ.get(
    "RA2_SIGIL_DIR",
    os.path.join(os.path.expanduser("~"), ".ra2", "sigils"),
)

MAX_ENTRIES = 15
_LINE_RE = re.compile(r"^σ(\d+):\s*(.+)$")

# Characters that must not reach the filesystem as part of a file name.
_PATH_UNSAFE = str.maketrans({"/": "_", "\\": "_", "\0": "_"})


def _one_line(body: str) -> str:
    """Collapse all whitespace runs (including newlines) to single spaces.

    The file format is one entry per line, so a body containing a newline
    would otherwise be split and partly lost when the file is read back.
    """
    return " ".join(body.split())


def _sigil_path(stream_id: str) -> str:
    """Return the sigil file path for *stream_id* inside SIGIL_DIR.

    Path separators and NUL in the id are replaced with ``_`` so the id
    cannot address a file outside the storage directory.
    """
    return os.path.join(SIGIL_DIR, f"{stream_id.translate(_PATH_UNSAFE)}.sigil")


def load(stream_id: str) -> List[Tuple[int, str]]:
    """Load sigil entries as list of (index, body) tuples.

    Returns an empty list when the stream has no file. Lines that do not
    match the ``σN: body`` format are skipped.
    """
    entries: List[Tuple[int, str]] = []
    try:
        with open(_sigil_path(stream_id), "r", encoding="utf-8") as f:
            for line in f:
                m = _LINE_RE.match(line.strip())
                if m:
                    entries.append((int(m.group(1)), m.group(2)))
    except FileNotFoundError:
        return []
    return entries


def save(stream_id: str, entries: List[Tuple[int, str]]) -> None:
    """Persist sigil entries, enforcing MAX_ENTRIES via FIFO.

    Bodies are written on a single line each; entries whose body is empty
    after whitespace normalisation are dropped because ``load`` could not
    read them back. Only the last MAX_ENTRIES entries are kept.
    """
    cleaned = [(idx, _one_line(body)) for idx, body in entries]
    kept = [(idx, body) for idx, body in cleaned if body][-MAX_ENTRIES:]
    os.makedirs(SIGIL_DIR, exist_ok=True)
    with open(_sigil_path(stream_id), "w", encoding="utf-8") as f:
        f.writelines(f"\u03c3{idx}: {body}\n" for idx, body in kept)


def append(stream_id: str, body: str) -> List[Tuple[int, str]]:
    """Add a new sigil entry. Auto-numbers and FIFO-evicts if at capacity.

    The new index is one more than the last stored index (1 for an empty
    stream). A body that is empty after whitespace normalisation is ignored
    and the current entries are returned unchanged.

    Returns:
        The entries as persisted, after eviction.
    """
    entries = load(stream_id)
    text = _one_line(body)
    if not text:
        return entries
    next_idx = (entries[-1][0] + 1) if entries else 1
    entries.append((next_idx, text))
    entries = entries[-MAX_ENTRIES:]
    save(stream_id, entries)
    return entries


def snapshot(stream_id: str) -> str:
    """Return sigil state as plain text for prompt injection.

    One ``σN: body`` line per entry, or ``(no sigils)`` for an empty stream.
    """
    entries = load(stream_id)
    if not entries:
        return "(no sigils)"
    return "\n".join(f"\u03c3{idx}: {body}" for idx, body in entries)


# ── Deterministic sigil generators ──────────────────────────────────

# Rule-based patterns that detect sigil-worthy events from messages.
+# Each rule: (regex_on_content, sigil_body_template) +_SIGIL_RULES: List[Tuple[re.Pattern, str]] = [ + (re.compile(r"fork(?:ed|ing)?\s*(?:to|into|→)\s*(\S+)", re.I), + "fork\u2192{0}"), + (re.compile(r"token[_\s]*burn", re.I), + "token_burn\u2192compress"), + (re.compile(r"rewrite[_\s]*impulse", re.I), + "rewrite_impulse\u2192layer"), + (re.compile(r"context[_\s]*sov(?:ereignty)?", re.I), + "context_sov\u2192active"), + (re.compile(r"budget[_\s]*cap(?:ped)?", re.I), + "budget\u2192capped"), + (re.compile(r"rate[_\s]*limit", re.I), + "rate_limit\u2192detected"), + (re.compile(r"provider[_\s]*switch(?:ed)?", re.I), + "provider\u2192switched"), + (re.compile(r"compaction[_\s]*trigger", re.I), + "compaction\u2192triggered"), +] + + +def generate_from_message(content: str) -> Optional[str]: + """Apply deterministic rules to a message. Returns sigil body or None.""" + for pattern, template in _SIGIL_RULES: + m = pattern.search(content) + if m: + # Fill template with captured groups if any + try: + return template.format(*m.groups()) + except (IndexError, KeyError): + return template + return None diff --git a/ra2/tests/__init__.py b/ra2/tests/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/ra2/tests/test_context_engine.py b/ra2/tests/test_context_engine.py new file mode 100644 index 00000000000..0ead1e2274d --- /dev/null +++ b/ra2/tests/test_context_engine.py @@ -0,0 +1,138 @@ +"""Tests for ra2.context_engine""" + +import pytest +from ra2 import ledger, sigil, token_gate +from ra2.context_engine import build_context + + +@pytest.fixture(autouse=True) +def tmp_storage(monkeypatch, tmp_path): + """Redirect all storage to temp directories.""" + monkeypatch.setattr(ledger, "LEDGER_DIR", str(tmp_path / "ledgers")) + monkeypatch.setattr(sigil, "SIGIL_DIR", str(tmp_path / "sigils")) + + +class TestBuildContext: + def test_basic_output_shape(self): + messages = [ + {"role": "user", "content": "Hello"}, + {"role": "assistant", "content": "Hi there"}, 
+ ] + result = build_context("test-stream", messages) + assert "prompt" in result + assert "token_estimate" in result + assert isinstance(result["prompt"], str) + assert isinstance(result["token_estimate"], int) + + def test_prompt_structure(self): + messages = [ + {"role": "user", "content": "Let's build a context engine"}, + ] + result = build_context("s1", messages) + prompt = result["prompt"] + assert "=== LEDGER ===" in prompt + assert "=== SIGIL ===" in prompt + assert "=== LIVE WINDOW ===" in prompt + assert "Respond concisely" in prompt + + def test_live_window_content(self): + messages = [ + {"role": "user", "content": "message one"}, + {"role": "assistant", "content": "response one"}, + ] + result = build_context("s1", messages) + assert "[user] message one" in result["prompt"] + assert "[assistant] response one" in result["prompt"] + + def test_redaction_applied(self): + messages = [ + {"role": "user", "content": "my key is sk-abc123def456ghi789jklmnopqrs"}, + ] + result = build_context("s1", messages) + assert "sk-abc" not in result["prompt"] + assert "[REDACTED_SECRET]" in result["prompt"] + + def test_compression_updates_ledger(self): + messages = [ + {"role": "user", "content": "we will use deterministic compression"}, + {"role": "assistant", "content": "decided to skip AI summarization"}, + ] + build_context("s1", messages) + data = ledger.load("s1") + # Compression should have extracted decisions into delta + assert data["delta"] != "" + + def test_compression_detects_blockers(self): + messages = [ + {"role": "user", "content": "I'm blocked on rate limit issues"}, + ] + build_context("s1", messages) + data = ledger.load("s1") + assert len(data["blockers"]) > 0 + + def test_compression_detects_open_questions(self): + messages = [ + {"role": "user", "content": "should we use tiktoken for counting?"}, + ] + build_context("s1", messages) + data = ledger.load("s1") + assert len(data["open"]) > 0 + + def test_sigil_generation(self): + messages = [ + 
{"role": "user", "content": "We forked to context_sov"}, + ] + build_context("s1", messages) + entries = sigil.load("s1") + assert len(entries) > 0 + + def test_token_estimate_positive(self): + messages = [{"role": "user", "content": "hello"}] + result = build_context("s1", messages) + assert result["token_estimate"] > 0 + + def test_window_shrinks_on_large_input(self, monkeypatch): + # Set a very low token cap + monkeypatch.setattr(token_gate, "MAX_TOKENS", 200) + monkeypatch.setattr(token_gate, "LIVE_WINDOW", 16) + + # Create many messages to exceed budget + messages = [ + {"role": "user", "content": f"This is message number {i} with some content"} + for i in range(20) + ] + result = build_context("s1", messages) + # Should succeed with a smaller window + assert result["token_estimate"] <= 200 + + def test_hard_fail_on_impossible_budget(self, monkeypatch): + # Set impossibly low token cap + monkeypatch.setattr(token_gate, "MAX_TOKENS", 5) + monkeypatch.setattr(token_gate, "LIVE_WINDOW", 4) + + messages = [ + {"role": "user", "content": "x" * 1000}, + ] + with pytest.raises(token_gate.TokenBudgetExceeded): + build_context("s1", messages) + + def test_structured_content_blocks(self): + messages = [ + { + "role": "user", + "content": [ + {"type": "text", "text": "Hello from structured content"}, + ], + }, + ] + result = build_context("s1", messages) + assert "Hello from structured content" in result["prompt"] + + def test_no_md_history_injection(self): + """Verify that build_context only uses provided messages, never reads .md files.""" + messages = [{"role": "user", "content": "just this"}] + result = build_context("s1", messages) + # The prompt should contain only our message content plus ledger/sigil structure + assert "just this" in result["prompt"] + # No markdown file references should appear + assert ".md" not in result["prompt"] diff --git a/ra2/tests/test_ledger.py b/ra2/tests/test_ledger.py new file mode 100644 index 00000000000..ec32af7e128 --- /dev/null 
+++ b/ra2/tests/test_ledger.py @@ -0,0 +1,100 @@ +"""Tests for ra2.ledger""" + +import json +import os +import tempfile +import pytest +from ra2 import ledger + + +@pytest.fixture(autouse=True) +def tmp_ledger_dir(monkeypatch, tmp_path): + """Redirect ledger storage to a temp directory for each test.""" + d = str(tmp_path / "ledgers") + monkeypatch.setattr(ledger, "LEDGER_DIR", d) + return d + + +class TestLoadSave: + def test_load_empty(self): + data = ledger.load("test-stream") + assert data["stream"] == "test-stream" + assert data["orientation"] == "" + assert data["blockers"] == [] + assert data["open"] == [] + + def test_save_and_load(self): + data = { + "stream": "s1", + "orientation": "build context engine", + "latest": "implemented ledger", + "blockers": ["rate limits"], + "open": ["how to compress?"], + "delta": "added ledger module", + } + ledger.save("s1", data) + loaded = ledger.load("s1") + assert loaded == data + + def test_save_enforces_field_length(self): + data = { + "stream": "s1", + "orientation": "x" * 1000, + "latest": "", + "blockers": [], + "open": [], + "delta": "", + } + ledger.save("s1", data) + loaded = ledger.load("s1") + assert len(loaded["orientation"]) == ledger.MAX_FIELD_CHARS + + def test_save_enforces_list_length(self): + data = { + "stream": "s1", + "orientation": "", + "latest": "", + "blockers": [f"blocker-{i}" for i in range(20)], + "open": [f"question-{i}" for i in range(20)], + "delta": "", + } + ledger.save("s1", data) + loaded = ledger.load("s1") + assert len(loaded["blockers"]) == ledger.MAX_BLOCKERS + assert len(loaded["open"]) == ledger.MAX_OPEN + + +class TestUpdate: + def test_update_fields(self): + result = ledger.update("s1", orientation="test orientation", delta="did stuff") + assert result["orientation"] == "test orientation" + assert result["delta"] == "did stuff" + assert result["stream"] == "s1" + + def test_update_ignores_unknown_keys(self): + result = ledger.update("s1", unknown_key="value") + assert 
"unknown_key" not in result + + def test_update_persists(self): + ledger.update("s1", orientation="phase 1") + loaded = ledger.load("s1") + assert loaded["orientation"] == "phase 1" + + +class TestSnapshot: + def test_snapshot_empty(self): + snap = ledger.snapshot("empty-stream") + assert "stream: empty-stream" in snap + assert "orientation:" in snap + + def test_snapshot_with_data(self): + ledger.update( + "s1", + orientation="context sovereignty", + blockers=["rate limits"], + open=["compression strategy?"], + ) + snap = ledger.snapshot("s1") + assert "context sovereignty" in snap + assert "rate limits" in snap + assert "compression strategy?" in snap diff --git a/ra2/tests/test_redact.py b/ra2/tests/test_redact.py new file mode 100644 index 00000000000..ae601acb376 --- /dev/null +++ b/ra2/tests/test_redact.py @@ -0,0 +1,114 @@ +"""Tests for ra2.redact""" + +import pytest +from ra2.redact import redact, redact_dict, redact_messages, REDACTED + + +class TestRedact: + def test_openai_key(self): + text = "my key is sk-abc123def456ghi789jklmnopqrs" + result = redact(text) + assert "sk-abc" not in result + assert REDACTED in result + + def test_anthropic_key(self): + text = "key: sk-ant-abc123def456ghi789jklmnopqrs" + result = redact(text) + assert "sk-ant-" not in result + assert REDACTED in result + + def test_discord_token(self): + # Build a fake Discord-shaped token dynamically to avoid push protection. 
+ # Pattern: [MN][A-Za-z0-9]{23,}.[A-Za-z0-9_-]{6}.[A-Za-z0-9_-]{27,} + prefix = "M" + "T" * 23 # 24 chars, starts with M + mid = "G" + "a" * 5 # 6 chars + suffix = "x" * 27 # 27 chars + token = f"{prefix}.{mid}.{suffix}" + text = f"token is {token}" + result = redact(text) + assert token not in result + assert REDACTED in result + + def test_google_key(self): + text = "key=AIzaSyD-abcdefghijklmnopqrstuvwxyz12345" + result = redact(text) + assert "AIza" not in result + assert REDACTED in result + + def test_aws_key(self): + text = "aws key: AKIAIOSFODNN7EXAMPLE" + result = redact(text) + assert "AKIA" not in result + + def test_slack_token(self): + text = "token: xoxb-123456789012-abcdefghij" + result = redact(text) + assert "xoxb-" not in result + + def test_github_token(self): + text = "auth: ghp_ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijkl" + result = redact(text) + assert "ghp_" not in result + + def test_telegram_token(self): + text = "bot: 123456789:ABCDefGHIJKlMNOpQRSTuvWXYz0123456789a" + result = redact(text) + assert "ABCDef" not in result + + def test_bearer_token(self): + text = "Authorization: Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.abc" + result = redact(text) + assert "eyJh" not in result + + def test_generic_secret_key_value(self): + text = 'api_key = "abcdefghijklmnopqrstuvwxyz1234567890ABCD"' + result = redact(text) + assert "abcdefghij" not in result + # The label should still be there + assert "api_key" in result + + def test_no_false_positive_normal_text(self): + text = "Hello, this is a normal message with no secrets." 
+ assert redact(text) == text + + def test_multiple_secrets(self): + text = "keys: sk-abc123def456ghi789jklmnopqrs and sk-ant-xyz123abc456def789ghi" + result = redact(text) + assert "sk-abc" not in result + assert "sk-ant-" not in result + assert result.count(REDACTED) == 2 + + +class TestRedactDict: + def test_flat_dict(self): + d = {"key": "sk-abc123def456ghi789jklmnopqrs", "name": "test"} + result = redact_dict(d) + assert REDACTED in result["key"] + assert result["name"] == "test" + + def test_nested_dict(self): + d = {"outer": {"inner": "sk-abc123def456ghi789jklmnopqrs"}} + result = redact_dict(d) + assert REDACTED in result["outer"]["inner"] + + def test_list_values(self): + d = {"tokens": ["sk-abc123def456ghi789jklmnopqrs", "normal"]} + result = redact_dict(d) + assert REDACTED in result["tokens"][0] + assert result["tokens"][1] == "normal" + + +class TestRedactMessages: + def test_redacts_content(self): + msgs = [ + {"role": "user", "content": "my key is sk-abc123def456ghi789jklmnopqrs"}, + {"role": "assistant", "content": "I see a key"}, + ] + result = redact_messages(msgs) + assert REDACTED in result[0]["content"] + assert result[1]["content"] == "I see a key" + + def test_preserves_non_string_content(self): + msgs = [{"role": "user", "content": 42}] + result = redact_messages(msgs) + assert result[0]["content"] == 42 diff --git a/ra2/tests/test_sigil.py b/ra2/tests/test_sigil.py new file mode 100644 index 00000000000..af904adcd74 --- /dev/null +++ b/ra2/tests/test_sigil.py @@ -0,0 +1,91 @@ +"""Tests for ra2.sigil""" + +import pytest +from ra2 import sigil + + +@pytest.fixture(autouse=True) +def tmp_sigil_dir(monkeypatch, tmp_path): + """Redirect sigil storage to a temp directory for each test.""" + d = str(tmp_path / "sigils") + monkeypatch.setattr(sigil, "SIGIL_DIR", d) + return d + + +class TestLoadSave: + def test_load_empty(self): + entries = sigil.load("test-stream") + assert entries == [] + + def test_save_and_load(self): + entries = [(1, 
"fork\u2192context_sov"), (2, "token_burn\u2192compress")] + sigil.save("s1", entries) + loaded = sigil.load("s1") + assert loaded == entries + + def test_fifo_on_save(self): + entries = [(i, f"entry-{i}") for i in range(1, 25)] + sigil.save("s1", entries) + loaded = sigil.load("s1") + assert len(loaded) == sigil.MAX_ENTRIES + # Should keep the last 15 + assert loaded[0][0] == 10 + assert loaded[-1][0] == 24 + + +class TestAppend: + def test_append_single(self): + entries = sigil.append("s1", "fork\u2192ctx") + assert len(entries) == 1 + assert entries[0] == (1, "fork\u2192ctx") + + def test_append_multiple(self): + sigil.append("s1", "entry-a") + entries = sigil.append("s1", "entry-b") + assert len(entries) == 2 + assert entries[0][1] == "entry-a" + assert entries[1][1] == "entry-b" + + def test_fifo_eviction(self): + for i in range(20): + entries = sigil.append("s1", f"e-{i}") + assert len(entries) == sigil.MAX_ENTRIES + # Oldest entries should be gone + bodies = [e[1] for e in entries] + assert "e-0" not in bodies + assert "e-19" in bodies + + +class TestSnapshot: + def test_snapshot_empty(self): + snap = sigil.snapshot("empty") + assert snap == "(no sigils)" + + def test_snapshot_with_entries(self): + sigil.append("s1", "fork\u2192context_sov") + sigil.append("s1", "token_burn\u2192compress") + snap = sigil.snapshot("s1") + assert "\u03c31:" in snap + assert "fork\u2192context_sov" in snap + assert "\u03c32:" in snap + assert "token_burn\u2192compress" in snap + + +class TestGenerateFromMessage: + def test_fork_detection(self): + body = sigil.generate_from_message("We forked to context_sov branch") + assert body is not None + assert "fork" in body + assert "context_sov" in body + + def test_token_burn_detection(self): + body = sigil.generate_from_message("Seeing token burn on this stream") + assert body == "token_burn\u2192compress" + + def test_rate_limit_detection(self): + body = sigil.generate_from_message("Hit a rate limit again") + assert body == 
"rate_limit\u2192detected" + + def test_no_match(self): + body = sigil.generate_from_message("Hello, how are you?") + assert body is None diff --git a/ra2/tests/test_token_gate.py b/ra2/tests/test_token_gate.py new file mode 100644 index 00000000000..7f98a73ecbd --- /dev/null +++ b/ra2/tests/test_token_gate.py @@ -0,0 +1,69 @@ +"""Tests for ra2.token_gate""" + +import pytest +from ra2.token_gate import ( + estimate_tokens, + check_budget, + shrink_window, + TokenBudgetExceeded, + LIVE_WINDOW_MIN, +) + + +class TestEstimateTokens: + def test_empty_string(self): + assert estimate_tokens("") == 0 + + def test_short_string(self): + # "ab" = 2 chars, 2//4 = 0 → clamped to 1 + assert estimate_tokens("ab") == 1 + + def test_known_length(self): + text = "a" * 400 + # 400 / 4 = 100 + assert estimate_tokens(text) == 100 + + def test_proportional(self): + short = estimate_tokens("hello world") + long = estimate_tokens("hello world " * 100) + assert long > short + + +class TestCheckBudget: + def test_within_budget(self): + assert check_budget(100, limit=200) is True + + def test_at_budget(self): + assert check_budget(200, limit=200) is True + + def test_over_budget(self): + assert check_budget(201, limit=200) is False + + +class TestShrinkWindow: + def test_halves(self): + assert shrink_window(16) == 8 + + def test_halves_again(self): + assert shrink_window(8) == 4 + + def test_at_minimum_raises(self): + with pytest.raises(TokenBudgetExceeded): + shrink_window(LIVE_WINDOW_MIN) + + def test_below_minimum_raises(self): + with pytest.raises(TokenBudgetExceeded): + shrink_window(2) + + def test_odd_number(self): + # 5 // 2 = 2, but clamped to LIVE_WINDOW_MIN (4) + assert shrink_window(5) == LIVE_WINDOW_MIN + + +class TestTokenBudgetExceeded: + def test_attributes(self): + exc = TokenBudgetExceeded(estimated=7000, limit=6000) + assert exc.estimated == 7000 + assert exc.limit == 6000 + assert "7000" in str(exc) + assert "6000" in str(exc) diff --git a/ra2/token_gate.py 
b/ra2/token_gate.py new file mode 100644 index 00000000000..cc07e50d0b1 --- /dev/null +++ b/ra2/token_gate.py @@ -0,0 +1,56 @@ +""" +ra2.token_gate — Token estimation and hard cap enforcement. + +Provides a fast, deterministic token estimator (no external tokenizer dependency) +and gate logic that prevents any prompt from exceeding MAX_TOKENS. +""" + +import os + +# Configurable via environment or direct override +MAX_TOKENS: int = int(os.environ.get("RA2_MAX_TOKENS", "6000")) +LIVE_WINDOW: int = int(os.environ.get("RA2_LIVE_WINDOW", "16")) +LIVE_WINDOW_MIN: int = 4 # Never shrink below this + + +class TokenBudgetExceeded(Exception): + """Raised when prompt exceeds MAX_TOKENS even after shrinking.""" + + def __init__(self, estimated: int, limit: int): + self.estimated = estimated + self.limit = limit + super().__init__( + f"Token budget exceeded: {estimated} > {limit} after all shrink attempts" + ) + + +def estimate_tokens(text: str) -> int: + """Fast deterministic token estimate. + + Uses the ~4 chars per token heuristic which is a reasonable average + across GPT/Claude tokenizers for English text. No external dependency. + """ + if not text: + return 0 + # Rough estimate: 1 token per 4 characters, minimum 1 + return max(1, len(text) // 4) + + +def check_budget(estimated: int, limit: int | None = None) -> bool: + """Return True if *estimated* is within budget, False otherwise.""" + limit = limit if limit is not None else MAX_TOKENS + return estimated <= limit + + +def shrink_window(current_window: int) -> int: + """Halve the live window, respecting the minimum. + + Returns the new window size, or raises TokenBudgetExceeded if + already at minimum. + """ + if current_window <= LIVE_WINDOW_MIN: + raise TokenBudgetExceeded( + estimated=0, # caller should fill real value + limit=MAX_TOKENS, + ) + return max(LIVE_WINDOW_MIN, current_window // 2)