feat(ra2): implement Context Sovereignty Layer (Phase 1)

Add deterministic context control layer that intercepts prompt
construction without modifying existing architecture:

- context_engine.py: single choke point (build_context) that assembles
  structured prompts from ledger + sigil + live window, with token
  budget enforcement and automatic window shrinking
- ledger.py: bounded per-stream JSON state (orientation, blockers,
  open questions, delta) with hard field/list limits
- sigil.py: FIFO shorthand memory (max 15 entries) with deterministic
  rule-based generation from message patterns
- token_gate.py: fast token estimation (~4 chars/token) and hard cap
  enforcement with configurable MAX_TOKENS/LIVE_WINDOW
- redact.py: secret pattern detection (Discord, OpenAI, Anthropic,
  AWS, Slack, GitHub, Telegram, Bearer, generic key=value) replaced
  with [REDACTED_SECRET] before any output path

All 64 tests passing. No modifications to existing agent spawning,
model routing, tool system, or Discord relay architecture.

https://claude.ai/code/session_01K7BWJY2gUoJi6dq91Yc7nx
This commit is contained in:
Claude 2026-02-19 22:41:23 +00:00
parent 6cdcb5904d
commit 56d19a0130
No known key found for this signature in database
12 changed files with 1096 additions and 0 deletions

21
ra2/__init__.py Normal file
View File

@ -0,0 +1,21 @@
"""
ra2 Context Sovereignty Layer (Phase 1)
Deterministic thin wrapper that:
- Prevents full markdown history injection into prompts
- Introduces structured ledger memory
- Introduces sigil shorthand memory
- Enforces hard token caps before provider calls
- Redacts secrets before logs and model calls
Usage:
from ra2.context_engine import build_context
result = build_context(stream_id="my-stream", new_messages=[...])
prompt = result["prompt"]
tokens = result["token_estimate"]
"""
from ra2.context_engine import build_context
__all__ = ["build_context"]

202
ra2/context_engine.py Normal file
View File

@ -0,0 +1,202 @@
"""
ra2.context_engine The single choke point for all model calls.
All prompts must pass through build_context() before reaching any provider.
Internal flow:
1. Load ledger state for stream
2. Load sigil state
3. Load last N live messages (default LIVE_WINDOW)
4. Run rule-based compression pass
5. Assemble structured prompt
6. Estimate token count
7. If > MAX_TOKENS: shrink live window, reassemble
8. If still > MAX_TOKENS: raise controlled exception
Never reads full .md history.
"""
import re
from typing import List, Optional
from ra2 import ledger, sigil, token_gate, redact
# ── Compression rule patterns ───────────────────────────────────────
# Deterministic regexes driving the compression pass. Each captures a
# bounded tail (≤120 chars) after a trigger phrase; all case-insensitive.

# Commitments/decisions, e.g. "we will use X", "decided to Y".
_DECISION_RE = re.compile(
    r"(?:we\s+will|we\s+chose|decided\s+to|going\s+to|let'?s)\s+(.{10,120})",
    re.IGNORECASE,
)
# Architecture-level shifts (refactor / redesign / restructure / migrate …).
_ARCHITECTURE_RE = re.compile(
    r"(?:architect(?:ure)?|refactor|redesign|restructur|migrat)\w*\s+(.{10,120})",
    re.IGNORECASE,
)
# Cost/budget constraints; tolerates "key: value", "key = value", "key → value".
_COST_RE = re.compile(
    r"(?:budget|cost|spend|rate[_\s]*limit|token[_\s]*cap|pricing)\s*[:=→]?\s*(.{5,120})",
    re.IGNORECASE,
)
# Blockers ("blocked on …", "stuck", "can't proceed", "waiting on …").
_BLOCKER_RE = re.compile(
    r"(?:block(?:er|ed|ing)|stuck|cannot|can'?t\s+proceed|waiting\s+on)\s+(.{5,120})",
    re.IGNORECASE,
)
# Open questions ("should we …", "how do …", "need to decide …").
_QUESTION_RE = re.compile(
    r"(?:should\s+we|do\s+we|how\s+(?:do|should)|what\s+(?:if|about)|need\s+to\s+decide)\s+(.{5,120})",
    re.IGNORECASE,
)
def _extract_content(msg: dict) -> str:
"""Get text content from a message dict."""
content = msg.get("content", "")
if isinstance(content, str):
return content
if isinstance(content, list):
# Handle structured content blocks
parts = []
for block in content:
if isinstance(block, dict) and block.get("type") == "text":
parts.append(block.get("text", ""))
elif isinstance(block, str):
parts.append(block)
return " ".join(parts)
return str(content)
def _run_compression(messages: list, stream_id: str) -> None:
    """Rule-based compression pass over recent messages.

    Extracts decisions, architecture shifts, cost constraints, blockers,
    and open questions, then updates the ledger accordingly. Each message
    is also fed through the deterministic sigil generator.

    Args:
        messages: Message dicts ('role'/'content') to scan.
        stream_id: Stream whose ledger and sigil state are updated.
    """
    decisions: list[str] = []
    blockers: list[str] = []
    open_questions: list[str] = []
    latest_summary_parts: list[str] = []
    for msg in messages:
        text = _extract_content(msg)
        if not text:
            continue
        # Decisions
        for m in _DECISION_RE.finditer(text):
            decisions.append(m.group(1).strip())
        # Architecture shifts
        for m in _ARCHITECTURE_RE.finditer(text):
            latest_summary_parts.append(f"arch: {m.group(1).strip()}")
        # Cost/budget
        for m in _COST_RE.finditer(text):
            latest_summary_parts.append(f"cost: {m.group(1).strip()}")
        # Blockers
        for m in _BLOCKER_RE.finditer(text):
            blockers.append(m.group(1).strip())
        # Open questions
        for m in _QUESTION_RE.finditer(text):
            open_questions.append(m.group(1).strip())
        # Sigil generation (deterministic rules; may return None)
        sigil_body = sigil.generate_from_message(text)
        if sigil_body:
            sigil.append(stream_id, sigil_body)
    # Keep only the most recent few items of each category.
    delta = "; ".join(decisions[-5:]) if decisions else ""
    latest = "; ".join(latest_summary_parts[-5:]) if latest_summary_parts else ""
    # Update ledger (only non-empty fields)
    updates = {}
    if delta:
        updates["delta"] = delta
    if latest:
        updates["latest"] = latest
    if blockers:
        # BUG FIX: was bounded by token_gate.MAX_TOKENS (6000), which kept
        # everything; ledger save-time limits then truncated to the FIRST
        # 10 entries, silently dropping the newest blockers. Bound by the
        # ledger's own cap so the most recent ones survive.
        updates["blockers"] = blockers[-ledger.MAX_BLOCKERS:]
    if open_questions:
        updates["open"] = open_questions[-10:]
    if updates:
        ledger.update(stream_id, **updates)
def _assemble_prompt(stream_id: str, live_messages: list) -> str:
    """Compose the structured prompt: ledger + sigil + live window + directive."""
    # Live window lines are rendered as "[role] content".
    window = "\n".join(
        f"[{msg.get('role', 'unknown')}] {_extract_content(msg)}"
        for msg in live_messages
    )
    sections = [
        f"=== LEDGER ===\n{ledger.snapshot(stream_id)}",
        f"=== SIGIL ===\n{sigil.snapshot(stream_id)}",
        "=== LIVE WINDOW ===\n" + window,
        "Respond concisely and aligned with orientation.",
    ]
    return "\n\n".join(sections)
def build_context(stream_id: str, new_messages: list) -> dict:
    """Main entry point — the single choke point for all model calls.

    Runs the compression pass (side effects: ledger + sigil updates),
    assembles the structured prompt, redacts secrets, and enforces the
    token budget by repeatedly halving the live window.

    Args:
        stream_id: Unique identifier for the conversation stream.
        new_messages: List of message dicts with at minimum 'role' and 'content'.

    Returns:
        {
            "prompt": str,        # The assembled, redacted prompt
            "token_estimate": int # Estimated token count
        }

    Raises:
        token_gate.TokenBudgetExceeded: If prompt exceeds MAX_TOKENS
            even after shrinking the live window to minimum.
    """
    # 1. Run compression pass on new messages → updates ledger + sigils
    _run_compression(new_messages, stream_id)
    # 2. Determine live window (read at call time so overrides/patches apply)
    window_size = token_gate.LIVE_WINDOW
    live_messages = new_messages[-window_size:]
    # 3. Assemble prompt
    prompt = _assemble_prompt(stream_id, live_messages)
    # 4. Redact secrets — must happen before any estimate/log/provider path
    prompt = redact.redact(prompt)
    # 5. Estimate tokens
    estimated = token_gate.estimate_tokens(prompt)
    # 6. Shrink loop if over budget: halve window, rebuild, re-redact, re-estimate
    while not token_gate.check_budget(estimated):
        try:
            window_size = token_gate.shrink_window(window_size)
        except token_gate.TokenBudgetExceeded:
            # Already at minimum window — hard fail. Re-raise with the real
            # numbers (shrink_window raises with a placeholder estimate of 0).
            raise token_gate.TokenBudgetExceeded(
                estimated=estimated,
                limit=token_gate.MAX_TOKENS,
            )
        live_messages = new_messages[-window_size:]
        prompt = _assemble_prompt(stream_id, live_messages)
        prompt = redact.redact(prompt)
        estimated = token_gate.estimate_tokens(prompt)
    return {
        "prompt": prompt,
        "token_estimate": estimated,
    }

111
ra2/ledger.py Normal file
View File

@ -0,0 +1,111 @@
"""
ra2.ledger Structured ledger memory (one per stream).
Each stream gets a JSON ledger file with bounded fields.
Fields are overwritten (never appended unbounded).
Only updated via the compression pass.
"""
import json
import os
from typing import Optional
# Configurable storage root (override via RA2_LEDGER_DIR env var)
LEDGER_DIR: str = os.environ.get(
    "RA2_LEDGER_DIR",
    os.path.join(os.path.expanduser("~"), ".ra2", "ledgers"),
)
# Hard limits enforced on every save()
MAX_BLOCKERS = 10       # max entries kept in "blockers"
MAX_OPEN = 10           # max entries kept in "open"
MAX_FIELD_CHARS = 500  # per string field
# Template for a fresh ledger; load() copies this for unknown streams.
_EMPTY_LEDGER = {
    "stream": "",
    "orientation": "",
    "latest": "",
    "blockers": [],
    "open": [],
    "delta": "",
}
def _ledger_path(stream_id: str) -> str:
    """Filesystem path of the JSON ledger file for *stream_id*."""
    filename = f"{stream_id}.json"
    return os.path.join(LEDGER_DIR, filename)
def load(stream_id: str) -> dict:
    """Load ledger for *stream_id*, returning a fresh template if none exists."""
    path = _ledger_path(stream_id)
    if not os.path.exists(path):
        fresh = dict(_EMPTY_LEDGER)
        fresh["stream"] = stream_id
        return fresh
    with open(path, "r", encoding="utf-8") as f:
        data = json.load(f)
    # Backfill any missing keys; copy list defaults so they are never shared.
    for key, default in _EMPTY_LEDGER.items():
        if key not in data:
            data[key] = list(default) if isinstance(default, list) else default
    return data
def save(stream_id: str, ledger: dict) -> None:
    """Persist *ledger* to disk, enforcing size limits first."""
    bounded = _enforce_limits(ledger)
    # Create the storage root lazily so imports stay side-effect free.
    os.makedirs(LEDGER_DIR, exist_ok=True)
    with open(_ledger_path(stream_id), "w", encoding="utf-8") as f:
        json.dump(bounded, f, indent=2, ensure_ascii=False)
def update(stream_id: str, **fields) -> dict:
    """Load, merge *fields*, save, and return the updated ledger.

    Only known ledger keys are accepted; unknown keys are silently dropped.
    """
    current = load(stream_id)
    accepted = {key: value for key, value in fields.items() if key in _EMPTY_LEDGER}
    current.update(accepted)
    save(stream_id, current)
    return current
def snapshot(stream_id: str) -> str:
    """Render a human-readable snapshot string for prompt injection."""
    data = load(stream_id)
    lines = [
        f"stream: {data['stream']}",
        f"orientation: {data['orientation']}",
        f"latest: {data['latest']}",
    ]
    # List sections and delta are emitted only when non-empty.
    if data["blockers"]:
        lines.append("blockers:")
        lines.extend(f" - {item}" for item in data["blockers"])
    if data["open"]:
        lines.append("open:")
        lines.extend(f" - {item}" for item in data["open"])
    if data["delta"]:
        lines.append(f"delta: {data['delta']}")
    return "\n".join(lines)
def _enforce_limits(ledger: dict) -> dict:
    """Truncate string fields and lists to the module's hard limits."""
    def _clip(value):
        # Truncate strings to MAX_FIELD_CHARS; leave non-strings untouched.
        return value[:MAX_FIELD_CHARS] if isinstance(value, str) else value

    for key in ("orientation", "latest", "delta", "stream"):
        value = ledger.get(key)
        if isinstance(value, str) and len(value) > MAX_FIELD_CHARS:
            ledger[key] = value[:MAX_FIELD_CHARS]
    if isinstance(ledger.get("blockers"), list):
        ledger["blockers"] = [_clip(b) for b in ledger["blockers"][:MAX_BLOCKERS]]
    if isinstance(ledger.get("open"), list):
        ledger["open"] = [_clip(o) for o in ledger["open"][:MAX_OPEN]]
    return ledger

88
ra2/redact.py Normal file
View File

@ -0,0 +1,88 @@
"""
ra2.redact Secret redaction before logging, .md writes, and model calls.
Detects common API key patterns and replaces them with [REDACTED_SECRET].
Must be applied before any external output path.
"""
import re
from typing import List, Tuple
REDACTED = "[REDACTED_SECRET]"

# Each entry: (label, compiled regex). Order matters: more specific
# patterns must precede more general ones that would also match — e.g.
# "sk-ant-..." must be tried before the generic "sk-..." OpenAI pattern
# so the match is attributed to the correct label.
_PATTERNS: List[Tuple[str, re.Pattern]] = [
    # Discord bot tokens (base64-ish, three dot-separated segments)
    ("discord_token", re.compile(
        r"[MN][A-Za-z0-9]{23,}\.[A-Za-z0-9_-]{6}\.[A-Za-z0-9_-]{27,}"
    )),
    # Anthropic keys (before OpenAI, whose "sk-..." pattern would shadow them)
    ("anthropic_key", re.compile(r"sk-ant-[A-Za-z0-9_-]{20,}")),
    # OpenAI keys
    ("openai_key", re.compile(r"sk-[A-Za-z0-9_-]{20,}")),
    # Google / GCP API keys
    ("google_key", re.compile(r"AIza[A-Za-z0-9_-]{35}")),
    # AWS access key IDs
    ("aws_access_key", re.compile(r"AKIA[A-Z0-9]{16}")),
    # Generic key=value secrets (32+ chars after api_key/secret/token/
    # password/credential); only the captured value is replaced.
    ("generic_secret", re.compile(
        r"(?:api[_-]?key|secret|token|password|credential)"
        r"[\s]*[:=][\s]*['\"]?([A-Za-z0-9_/+=-]{32,})['\"]?",
        re.IGNORECASE,
    )),
    # Bearer tokens in auth headers
    ("bearer_token", re.compile(
        r"Bearer\s+[A-Za-z0-9_.+/=-]{20,}", re.IGNORECASE
    )),
    # Slack tokens
    ("slack_token", re.compile(r"xox[bpas]-[A-Za-z0-9-]{10,}")),
    # GitHub tokens
    ("github_token", re.compile(r"gh[ps]_[A-Za-z0-9]{36,}")),
    # Telegram bot tokens
    ("telegram_token", re.compile(r"\d{8,10}:[A-Za-z0-9_-]{35}")),
]
def redact(text: str) -> str:
    """Replace all detected secret patterns in *text* with [REDACTED_SECRET]."""
    result = text
    for label, pattern in _PATTERNS:
        # generic_secret captures the value of a key=value pair; a callable
        # replacement swaps only that captured value, keeping the key visible.
        replacement = _replace_generic if label == "generic_secret" else REDACTED
        result = pattern.sub(replacement, result)
    return result
def _replace_generic(match: re.Match) -> str:
    """Replace only the captured secret value inside a key=value match."""
    whole = match.group(0)
    secret_value = match.group(1)
    return whole.replace(secret_value, REDACTED)
def redact_dict(d: dict) -> dict:
    """Recursively redact all string values in a dict.

    Walks arbitrarily nested dicts and lists. (The previous implementation
    only handled strings directly inside a list — dicts or lists nested
    within lists passed through unredacted, contradicting the docstring.)
    Non-string leaves are returned unchanged.
    """
    def _walk(value):
        # Depth-first redaction over the JSON-like structure.
        if isinstance(value, str):
            return redact(value)
        if isinstance(value, dict):
            return {k: _walk(v) for k, v in value.items()}
        if isinstance(value, list):
            return [_walk(item) for item in value]
        return value

    return {k: _walk(v) for k, v in d.items()}
def redact_messages(messages: list) -> list:
    """Redact secrets from a list of message dicts (content field).

    Handles plain-string content as before, and additionally structured
    content-block lists (strings and {"text": ...} dicts), which the
    previous version passed through unredacted even though the rest of
    the layer accepts that shape. Any other content type is preserved
    as-is. Input messages are not mutated.
    """
    result = []
    for msg in messages:
        copy = dict(msg)
        content = copy.get("content")
        if isinstance(content, str):
            copy["content"] = redact(content)
        elif isinstance(content, list):
            cleaned = []
            for block in content:
                if isinstance(block, str):
                    cleaned.append(redact(block))
                elif isinstance(block, dict) and isinstance(block.get("text"), str):
                    # Copy the block so the caller's structure is untouched.
                    block = dict(block)
                    block["text"] = redact(block["text"])
                    cleaned.append(block)
                else:
                    cleaned.append(block)
            copy["content"] = cleaned
        result.append(copy)
    return result

106
ra2/sigil.py Normal file
View File

@ -0,0 +1,106 @@
"""
ra2.sigil Sigil shorthand memory (one plain-text file per stream).
Format per line: σN: keyvalue
Max 15 entries, FIFO replacement.
Deterministic rule-based generation only no AI involvement.
"""
import os
import re
from typing import List, Optional, Tuple
# Configurable storage root (override via RA2_SIGIL_DIR env var)
SIGIL_DIR: str = os.environ.get(
    "RA2_SIGIL_DIR",
    os.path.join(os.path.expanduser("~"), ".ra2", "sigils"),
)
# Hard FIFO cap on entries per stream
MAX_ENTRIES = 15
# One entry per file line: "σN: body"
_LINE_RE = re.compile(r"^σ(\d+):\s*(.+)$")
def _sigil_path(stream_id: str) -> str:
    """Filesystem path of the sigil file for *stream_id*."""
    return os.path.join(SIGIL_DIR, stream_id + ".sigil")
def load(stream_id: str) -> List[Tuple[int, str]]:
    """Load sigil entries as a list of (index, body) tuples."""
    path = _sigil_path(stream_id)
    if not os.path.exists(path):
        return []
    entries: List[Tuple[int, str]] = []
    with open(path, "r", encoding="utf-8") as f:
        for raw in f:
            # Lines not matching the "σN: body" shape are silently skipped.
            parsed = _LINE_RE.match(raw.strip())
            if parsed:
                entries.append((int(parsed.group(1)), parsed.group(2)))
    return entries
def save(stream_id: str, entries: List[Tuple[int, str]]) -> None:
    """Persist sigil entries, enforcing MAX_ENTRIES via FIFO."""
    # FIFO: only the newest MAX_ENTRIES survive.
    kept = entries[-MAX_ENTRIES:]
    os.makedirs(SIGIL_DIR, exist_ok=True)
    lines = [f"σ{idx}: {body}\n" for idx, body in kept]
    with open(_sigil_path(stream_id), "w", encoding="utf-8") as f:
        f.writelines(lines)
def append(stream_id: str, body: str) -> List[Tuple[int, str]]:
    """Add a new sigil entry. Auto-numbers and FIFO-evicts if at capacity."""
    entries = load(stream_id)
    # Indices keep increasing monotonically; they are never reused.
    next_idx = entries[-1][0] + 1 if entries else 1
    entries = (entries + [(next_idx, body)])[-MAX_ENTRIES:]
    save(stream_id, entries)
    return entries
def snapshot(stream_id: str) -> str:
    """Return sigil state as plain text for prompt injection."""
    entries = load(stream_id)
    if not entries:
        return "(no sigils)"
    lines = [f"σ{idx}: {body}" for idx, body in entries]
    return "\n".join(lines)
# ── Deterministic sigil generators ──────────────────────────────────
# Rule-based patterns that detect sigil-worthy events from messages.
# Each rule: (regex_on_content, sigil_body_template). Templates may use
# "{0}"-style placeholders filled from the regex capture groups; rules
# are tried in order and the first match wins.
_SIGIL_RULES: List[Tuple[re.Pattern, str]] = [
    # "forked to X" / "forking into X" → record the fork target
    (re.compile(r"fork(?:ed|ing)?\s*(?:to|into|→)\s*(\S+)", re.I),
     "fork→{0}"),
    (re.compile(r"token[_\s]*burn", re.I),
     "token_burn→compress"),
    (re.compile(r"rewrite[_\s]*impulse", re.I),
     "rewrite_impulse→layer"),
    (re.compile(r"context[_\s]*sov(?:ereignty)?", re.I),
     "context_sov→active"),
    (re.compile(r"budget[_\s]*cap(?:ped)?", re.I),
     "budget→capped"),
    (re.compile(r"rate[_\s]*limit", re.I),
     "rate_limit→detected"),
    (re.compile(r"provider[_\s]*switch(?:ed)?", re.I),
     "provider→switched"),
    (re.compile(r"compaction[_\s]*trigger", re.I),
     "compaction→triggered"),
]
def generate_from_message(content: str) -> Optional[str]:
    """Apply deterministic rules to a message. Returns sigil body or None."""
    for pattern, template in _SIGIL_RULES:
        found = pattern.search(content)
        if not found:
            continue
        # First matching rule wins; fill template with captured groups.
        try:
            return template.format(*found.groups())
        except (IndexError, KeyError):
            # Placeholders didn't line up with groups — emit template as-is.
            return template
    return None

0
ra2/tests/__init__.py Normal file
View File

View File

@ -0,0 +1,138 @@
"""Tests for ra2.context_engine"""
import pytest
from ra2 import ledger, sigil, token_gate
from ra2.context_engine import build_context
@pytest.fixture(autouse=True)
def tmp_storage(monkeypatch, tmp_path):
    """Redirect all storage to temp directories (autouse: isolates every test)."""
    monkeypatch.setattr(ledger, "LEDGER_DIR", str(tmp_path / "ledgers"))
    monkeypatch.setattr(sigil, "SIGIL_DIR", str(tmp_path / "sigils"))
class TestBuildContext:
    """End-to-end behavior of build_context() with storage redirected to tmp."""

    def test_basic_output_shape(self):
        # Contract: dict with a string "prompt" and an int "token_estimate".
        messages = [
            {"role": "user", "content": "Hello"},
            {"role": "assistant", "content": "Hi there"},
        ]
        result = build_context("test-stream", messages)
        assert "prompt" in result
        assert "token_estimate" in result
        assert isinstance(result["prompt"], str)
        assert isinstance(result["token_estimate"], int)

    def test_prompt_structure(self):
        # Prompt carries all three structured sections plus the directive.
        messages = [
            {"role": "user", "content": "Let's build a context engine"},
        ]
        result = build_context("s1", messages)
        prompt = result["prompt"]
        assert "=== LEDGER ===" in prompt
        assert "=== SIGIL ===" in prompt
        assert "=== LIVE WINDOW ===" in prompt
        assert "Respond concisely" in prompt

    def test_live_window_content(self):
        # Live window lines are formatted as "[role] content".
        messages = [
            {"role": "user", "content": "message one"},
            {"role": "assistant", "content": "response one"},
        ]
        result = build_context("s1", messages)
        assert "[user] message one" in result["prompt"]
        assert "[assistant] response one" in result["prompt"]

    def test_redaction_applied(self):
        # Secrets in message content must never survive into the prompt.
        messages = [
            {"role": "user", "content": "my key is sk-abc123def456ghi789jklmnopqrs"},
        ]
        result = build_context("s1", messages)
        assert "sk-abc" not in result["prompt"]
        assert "[REDACTED_SECRET]" in result["prompt"]

    def test_compression_updates_ledger(self):
        messages = [
            {"role": "user", "content": "we will use deterministic compression"},
            {"role": "assistant", "content": "decided to skip AI summarization"},
        ]
        build_context("s1", messages)
        data = ledger.load("s1")
        # Compression should have extracted decisions into delta
        assert data["delta"] != ""

    def test_compression_detects_blockers(self):
        messages = [
            {"role": "user", "content": "I'm blocked on rate limit issues"},
        ]
        build_context("s1", messages)
        data = ledger.load("s1")
        assert len(data["blockers"]) > 0

    def test_compression_detects_open_questions(self):
        messages = [
            {"role": "user", "content": "should we use tiktoken for counting?"},
        ]
        build_context("s1", messages)
        data = ledger.load("s1")
        assert len(data["open"]) > 0

    def test_sigil_generation(self):
        # "forked to X" matches a deterministic sigil rule.
        messages = [
            {"role": "user", "content": "We forked to context_sov"},
        ]
        build_context("s1", messages)
        entries = sigil.load("s1")
        assert len(entries) > 0

    def test_token_estimate_positive(self):
        messages = [{"role": "user", "content": "hello"}]
        result = build_context("s1", messages)
        assert result["token_estimate"] > 0

    def test_window_shrinks_on_large_input(self, monkeypatch):
        # Set a very low token cap
        monkeypatch.setattr(token_gate, "MAX_TOKENS", 200)
        monkeypatch.setattr(token_gate, "LIVE_WINDOW", 16)
        # Create many messages to exceed budget
        messages = [
            {"role": "user", "content": f"This is message number {i} with some content"}
            for i in range(20)
        ]
        result = build_context("s1", messages)
        # Should succeed with a smaller window
        assert result["token_estimate"] <= 200

    def test_hard_fail_on_impossible_budget(self, monkeypatch):
        # Set impossibly low token cap
        monkeypatch.setattr(token_gate, "MAX_TOKENS", 5)
        monkeypatch.setattr(token_gate, "LIVE_WINDOW", 4)
        messages = [
            {"role": "user", "content": "x" * 1000},
        ]
        with pytest.raises(token_gate.TokenBudgetExceeded):
            build_context("s1", messages)

    def test_structured_content_blocks(self):
        # List-of-blocks content must be flattened into the live window text.
        messages = [
            {
                "role": "user",
                "content": [
                    {"type": "text", "text": "Hello from structured content"},
                ],
            },
        ]
        result = build_context("s1", messages)
        assert "Hello from structured content" in result["prompt"]

    def test_no_md_history_injection(self):
        """Verify that build_context only uses provided messages, never reads .md files."""
        messages = [{"role": "user", "content": "just this"}]
        result = build_context("s1", messages)
        # The prompt should contain only our message content plus ledger/sigil structure
        assert "just this" in result["prompt"]
        # No markdown file references should appear
        assert ".md" not in result["prompt"]

100
ra2/tests/test_ledger.py Normal file
View File

@ -0,0 +1,100 @@
"""Tests for ra2.ledger"""
import json
import os
import tempfile
import pytest
from ra2 import ledger
@pytest.fixture(autouse=True)
def tmp_ledger_dir(monkeypatch, tmp_path):
    """Redirect ledger storage to a temp directory for each test."""
    d = str(tmp_path / "ledgers")
    monkeypatch.setattr(ledger, "LEDGER_DIR", d)
    # Returned so tests could inspect the directory directly if needed.
    return d
class TestLoadSave:
    """ledger.load()/save() round-trips and save-time limit enforcement."""

    def test_load_empty(self):
        # Unknown stream yields the empty template with the stream id filled in.
        data = ledger.load("test-stream")
        assert data["stream"] == "test-stream"
        assert data["orientation"] == ""
        assert data["blockers"] == []
        assert data["open"] == []

    def test_save_and_load(self):
        data = {
            "stream": "s1",
            "orientation": "build context engine",
            "latest": "implemented ledger",
            "blockers": ["rate limits"],
            "open": ["how to compress?"],
            "delta": "added ledger module",
        }
        ledger.save("s1", data)
        loaded = ledger.load("s1")
        assert loaded == data

    def test_save_enforces_field_length(self):
        # Oversized string fields are truncated to MAX_FIELD_CHARS on save.
        data = {
            "stream": "s1",
            "orientation": "x" * 1000,
            "latest": "",
            "blockers": [],
            "open": [],
            "delta": "",
        }
        ledger.save("s1", data)
        loaded = ledger.load("s1")
        assert len(loaded["orientation"]) == ledger.MAX_FIELD_CHARS

    def test_save_enforces_list_length(self):
        # Oversized lists are truncated to MAX_BLOCKERS / MAX_OPEN on save.
        data = {
            "stream": "s1",
            "orientation": "",
            "latest": "",
            "blockers": [f"blocker-{i}" for i in range(20)],
            "open": [f"question-{i}" for i in range(20)],
            "delta": "",
        }
        ledger.save("s1", data)
        loaded = ledger.load("s1")
        assert len(loaded["blockers"]) == ledger.MAX_BLOCKERS
        assert len(loaded["open"]) == ledger.MAX_OPEN
class TestUpdate:
    """ledger.update() merge-and-persist semantics."""

    def test_update_fields(self):
        result = ledger.update("s1", orientation="test orientation", delta="did stuff")
        assert result["orientation"] == "test orientation"
        assert result["delta"] == "did stuff"
        assert result["stream"] == "s1"

    def test_update_ignores_unknown_keys(self):
        # Keys outside the ledger schema are silently dropped.
        result = ledger.update("s1", unknown_key="value")
        assert "unknown_key" not in result

    def test_update_persists(self):
        ledger.update("s1", orientation="phase 1")
        loaded = ledger.load("s1")
        assert loaded["orientation"] == "phase 1"
class TestSnapshot:
    """ledger.snapshot() human-readable rendering."""

    def test_snapshot_empty(self):
        snap = ledger.snapshot("empty-stream")
        assert "stream: empty-stream" in snap
        assert "orientation:" in snap

    def test_snapshot_with_data(self):
        ledger.update(
            "s1",
            orientation="context sovereignty",
            blockers=["rate limits"],
            open=["compression strategy?"],
        )
        snap = ledger.snapshot("s1")
        assert "context sovereignty" in snap
        assert "rate limits" in snap
        assert "compression strategy?" in snap

114
ra2/tests/test_redact.py Normal file
View File

@ -0,0 +1,114 @@
"""Tests for ra2.redact"""
import pytest
from ra2.redact import redact, redact_dict, redact_messages, REDACTED
class TestRedact:
    """redact() coverage for each supported secret pattern."""

    def test_openai_key(self):
        text = "my key is sk-abc123def456ghi789jklmnopqrs"
        result = redact(text)
        assert "sk-abc" not in result
        assert REDACTED in result

    def test_anthropic_key(self):
        text = "key: sk-ant-abc123def456ghi789jklmnopqrs"
        result = redact(text)
        assert "sk-ant-" not in result
        assert REDACTED in result

    def test_discord_token(self):
        # Build a fake Discord-shaped token dynamically to avoid push protection.
        # Pattern: [MN][A-Za-z0-9]{23,}.[A-Za-z0-9_-]{6}.[A-Za-z0-9_-]{27,}
        prefix = "M" + "T" * 23  # 24 chars, starts with M
        mid = "G" + "a" * 5  # 6 chars
        suffix = "x" * 27  # 27 chars
        token = f"{prefix}.{mid}.{suffix}"
        text = f"token is {token}"
        result = redact(text)
        assert token not in result
        assert REDACTED in result

    def test_google_key(self):
        text = "key=AIzaSyD-abcdefghijklmnopqrstuvwxyz12345"
        result = redact(text)
        assert "AIza" not in result
        assert REDACTED in result

    def test_aws_key(self):
        text = "aws key: AKIAIOSFODNN7EXAMPLE"
        result = redact(text)
        assert "AKIA" not in result

    def test_slack_token(self):
        text = "token: xoxb-123456789012-abcdefghij"
        result = redact(text)
        assert "xoxb-" not in result

    def test_github_token(self):
        text = "auth: ghp_ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijkl"
        result = redact(text)
        assert "ghp_" not in result

    def test_telegram_token(self):
        text = "bot: 123456789:ABCDefGHIJKlMNOpQRSTuvWXYz0123456789a"
        result = redact(text)
        assert "ABCDef" not in result

    def test_bearer_token(self):
        text = "Authorization: Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.abc"
        result = redact(text)
        assert "eyJh" not in result

    def test_generic_secret_key_value(self):
        text = 'api_key = "abcdefghijklmnopqrstuvwxyz1234567890ABCD"'
        result = redact(text)
        assert "abcdefghij" not in result
        # The label should still be there
        assert "api_key" in result

    def test_no_false_positive_normal_text(self):
        # Plain prose must pass through untouched.
        text = "Hello, this is a normal message with no secrets."
        assert redact(text) == text

    def test_multiple_secrets(self):
        text = "keys: sk-abc123def456ghi789jklmnopqrs and sk-ant-xyz123abc456def789ghi"
        result = redact(text)
        assert "sk-abc" not in result
        assert "sk-ant-" not in result
        assert result.count(REDACTED) == 2
class TestRedactDict:
    """redact_dict() recursive redaction of string values."""

    def test_flat_dict(self):
        d = {"key": "sk-abc123def456ghi789jklmnopqrs", "name": "test"}
        result = redact_dict(d)
        assert REDACTED in result["key"]
        assert result["name"] == "test"

    def test_nested_dict(self):
        d = {"outer": {"inner": "sk-abc123def456ghi789jklmnopqrs"}}
        result = redact_dict(d)
        assert REDACTED in result["outer"]["inner"]

    def test_list_values(self):
        d = {"tokens": ["sk-abc123def456ghi789jklmnopqrs", "normal"]}
        result = redact_dict(d)
        assert REDACTED in result["tokens"][0]
        assert result["tokens"][1] == "normal"
class TestRedactMessages:
    """redact_messages() per-message content redaction."""

    def test_redacts_content(self):
        msgs = [
            {"role": "user", "content": "my key is sk-abc123def456ghi789jklmnopqrs"},
            {"role": "assistant", "content": "I see a key"},
        ]
        result = redact_messages(msgs)
        assert REDACTED in result[0]["content"]
        assert result[1]["content"] == "I see a key"

    def test_preserves_non_string_content(self):
        # Non-string content must pass through unchanged.
        msgs = [{"role": "user", "content": 42}]
        result = redact_messages(msgs)
        assert result[0]["content"] == 42

91
ra2/tests/test_sigil.py Normal file
View File

@ -0,0 +1,91 @@
"""Tests for ra2.sigil"""
import pytest
from ra2 import sigil
@pytest.fixture(autouse=True)
def tmp_sigil_dir(monkeypatch, tmp_path):
    """Redirect sigil storage to a temp directory for each test."""
    d = str(tmp_path / "sigils")
    monkeypatch.setattr(sigil, "SIGIL_DIR", d)
    # Returned so tests could inspect the directory directly if needed.
    return d
class TestLoadSave:
    """sigil.load()/save() round-trips and FIFO truncation on save."""

    def test_load_empty(self):
        entries = sigil.load("test-stream")
        assert entries == []

    def test_save_and_load(self):
        entries = [(1, "fork→context_sov"), (2, "token_burn→compress")]
        sigil.save("s1", entries)
        loaded = sigil.load("s1")
        assert loaded == entries

    def test_fifo_on_save(self):
        entries = [(i, f"entry-{i}") for i in range(1, 25)]
        sigil.save("s1", entries)
        loaded = sigil.load("s1")
        assert len(loaded) == sigil.MAX_ENTRIES
        # Should keep the last 15
        assert loaded[0][0] == 10
        assert loaded[-1][0] == 24
class TestAppend:
    """sigil.append() auto-numbering and FIFO eviction."""

    def test_append_single(self):
        entries = sigil.append("s1", "fork→ctx")
        assert len(entries) == 1
        assert entries[0] == (1, "fork→ctx")

    def test_append_multiple(self):
        sigil.append("s1", "entry-a")
        entries = sigil.append("s1", "entry-b")
        assert len(entries) == 2
        assert entries[0][1] == "entry-a"
        assert entries[1][1] == "entry-b"

    def test_fifo_eviction(self):
        for i in range(20):
            entries = sigil.append("s1", f"e-{i}")
        assert len(entries) == sigil.MAX_ENTRIES
        # Oldest entries should be gone
        bodies = [e[1] for e in entries]
        assert "e-0" not in bodies
        assert "e-19" in bodies
class TestSnapshot:
    """sigil.snapshot() plain-text rendering for prompt injection."""

    def test_snapshot_empty(self):
        snap = sigil.snapshot("empty")
        assert snap == "(no sigils)"

    def test_snapshot_with_entries(self):
        sigil.append("s1", "fork→context_sov")
        sigil.append("s1", "token_burn→compress")
        snap = sigil.snapshot("s1")
        assert "σ1:" in snap
        assert "fork→context_sov" in snap
        assert "σ2:" in snap
        assert "token_burn→compress" in snap
class TestGenerateFromMessage:
    """Deterministic rule matching in sigil.generate_from_message()."""

    def test_fork_detection(self):
        # The fork rule captures the fork target into the template.
        body = sigil.generate_from_message("We forked to context_sov branch")
        assert body is not None
        assert "fork" in body
        assert "context_sov" in body

    def test_token_burn_detection(self):
        body = sigil.generate_from_message("Seeing token burn on this stream")
        assert body == "token_burn→compress"

    def test_rate_limit_detection(self):
        body = sigil.generate_from_message("Hit a rate limit again")
        assert body == "rate_limit→detected"

    def test_no_match(self):
        body = sigil.generate_from_message("Hello, how are you?")
        assert body is None

View File

@ -0,0 +1,69 @@
"""Tests for ra2.token_gate"""
import pytest
from ra2.token_gate import (
estimate_tokens,
check_budget,
shrink_window,
TokenBudgetExceeded,
LIVE_WINDOW_MIN,
)
class TestEstimateTokens:
    """estimate_tokens() ~4 chars/token heuristic."""

    def test_empty_string(self):
        assert estimate_tokens("") == 0

    def test_short_string(self):
        # "ab" = 2 chars, 2//4 = 0 → clamped to 1
        assert estimate_tokens("ab") == 1

    def test_known_length(self):
        text = "a" * 400
        # 400 / 4 = 100
        assert estimate_tokens(text) == 100

    def test_proportional(self):
        short = estimate_tokens("hello world")
        long = estimate_tokens("hello world " * 100)
        assert long > short
class TestCheckBudget:
    """check_budget() inclusive (<=) limit comparison."""

    def test_within_budget(self):
        assert check_budget(100, limit=200) is True

    def test_at_budget(self):
        # Exactly at the limit still passes.
        assert check_budget(200, limit=200) is True

    def test_over_budget(self):
        assert check_budget(201, limit=200) is False
class TestShrinkWindow:
    """shrink_window() halving, clamping, and minimum-window failure."""

    def test_halves(self):
        assert shrink_window(16) == 8

    def test_halves_again(self):
        assert shrink_window(8) == 4

    def test_at_minimum_raises(self):
        with pytest.raises(TokenBudgetExceeded):
            shrink_window(LIVE_WINDOW_MIN)

    def test_below_minimum_raises(self):
        with pytest.raises(TokenBudgetExceeded):
            shrink_window(2)

    def test_odd_number(self):
        # 5 // 2 = 2, but clamped to LIVE_WINDOW_MIN (4)
        assert shrink_window(5) == LIVE_WINDOW_MIN
class TestTokenBudgetExceeded:
    """Exception keeps its numbers as attributes and reports them in str()."""

    def test_attributes(self):
        exc = TokenBudgetExceeded(estimated=7000, limit=6000)
        assert exc.estimated == 7000
        assert exc.limit == 6000
        assert "7000" in str(exc)
        assert "6000" in str(exc)

56
ra2/token_gate.py Normal file
View File

@ -0,0 +1,56 @@
"""
ra2.token_gate Token estimation and hard cap enforcement.
Provides a fast, deterministic token estimator (no external tokenizer dependency)
and gate logic that prevents any prompt from exceeding MAX_TOKENS.
"""
import os
# Configurable via environment or direct override (tests monkeypatch these)
MAX_TOKENS: int = int(os.environ.get("RA2_MAX_TOKENS", "6000"))   # hard prompt cap
LIVE_WINDOW: int = int(os.environ.get("RA2_LIVE_WINDOW", "16"))   # default live-message window
LIVE_WINDOW_MIN: int = 4  # Never shrink below this
class TokenBudgetExceeded(Exception):
    """Raised when prompt exceeds MAX_TOKENS even after shrinking.

    Carries the offending estimate and the active limit as attributes so
    callers can report or re-raise with the real numbers.
    """

    def __init__(self, estimated: int, limit: int):
        self.estimated = estimated
        self.limit = limit
        message = (
            f"Token budget exceeded: {estimated} > {limit} "
            f"after all shrink attempts"
        )
        super().__init__(message)
def estimate_tokens(text: str) -> int:
    """Fast deterministic token estimate.

    Uses the ~4 chars per token heuristic, a reasonable average across
    GPT/Claude tokenizers for English text, so no external tokenizer
    dependency is needed. Empty text estimates to 0; any non-empty text
    estimates to at least 1.
    """
    if not text:
        return 0
    quarter = len(text) // 4
    return quarter if quarter > 0 else 1
def check_budget(estimated: int, limit: int | None = None) -> bool:
"""Return True if *estimated* is within budget, False otherwise."""
limit = limit if limit is not None else MAX_TOKENS
return estimated <= limit
def shrink_window(current_window: int) -> int:
    """Halve the live window, clamped to LIVE_WINDOW_MIN.

    Raises:
        TokenBudgetExceeded: if *current_window* is already at or below
            the minimum — no further shrinking is possible. The raised
            exception carries a placeholder estimate of 0; the caller is
            expected to re-raise with the real value.
    """
    if current_window <= LIVE_WINDOW_MIN:
        raise TokenBudgetExceeded(
            estimated=0,  # caller should fill real value
            limit=MAX_TOKENS,
        )
    halved = current_window // 2
    return halved if halved > LIVE_WINDOW_MIN else LIVE_WINDOW_MIN