diff --git a/ra2/context_engine.py b/ra2/context_engine.py index ee7c611d1d0..6e0404f7fea 100644 --- a/ra2/context_engine.py +++ b/ra2/context_engine.py @@ -98,10 +98,11 @@ def _run_compression(messages: list, stream_id: str) -> None: for m in _QUESTION_RE.finditer(text): open_questions.append(m.group(1).strip()) - # Sigil generation - sigil_body = sigil.generate_from_message(text) - if sigil_body: - sigil.append(stream_id, sigil_body) + # Sigil event generation + sigil_triple = sigil.generate_from_message(text) + if sigil_triple: + op, constraint, decision = sigil_triple + sigil.append_event(stream_id, op, constraint, decision) # Build delta from decisions delta = "; ".join(decisions[-5:]) if decisions else "" @@ -123,17 +124,21 @@ def _run_compression(messages: list, stream_id: str) -> None: def _assemble_prompt(stream_id: str, live_messages: list) -> str: - """Build the structured prompt from ledger + sigil + live window.""" + """Build the structured prompt from ledger + (optional sigil) + live window.""" sections = [] + # Sigil section — only when DEBUG_SIGIL is enabled + if sigil.DEBUG_SIGIL: + sigil_snap = sigil.snapshot(stream_id) + if sigil_snap != "(no sigils)": + sections.append( + f"=== INTERNAL SIGIL SNAPSHOT ===\n{sigil_snap}" + ) + # Ledger section ledger_snap = ledger.snapshot(stream_id) sections.append(f"=== LEDGER ===\n{ledger_snap}") - # Sigil section - sigil_snap = sigil.snapshot(stream_id) - sections.append(f"=== SIGIL ===\n{sigil_snap}") - # Live window section live_lines = [] for msg in live_messages: diff --git a/ra2/sigil.py b/ra2/sigil.py index ae09bf225ff..850c4fece58 100644 --- a/ra2/sigil.py +++ b/ra2/sigil.py @@ -1,106 +1,245 @@ """ -ra2.sigil — Sigil shorthand memory (one plain-text file per stream). +ra2.sigil — Layered internal state map stored as JSON (one file per stream). -Format per line: σN: key→value -Max 15 entries, FIFO replacement. -Deterministic rule-based generation only — no AI involvement. +Two layers: + EVENT — decision causality log [{operator, constraint, decision, timestamp}] + STATE — authoritative snapshot {arch, risk, mode} + +Deterministic. Bounded. Internal-only (hidden unless DEBUG_SIGIL=true). +No AI generation. No semantic expansion. No prose. """ +import json import os import re -from typing import List, Optional, Tuple +from datetime import datetime, timezone +from typing import Dict, List, Optional, Tuple SIGIL_DIR: str = os.environ.get( "RA2_SIGIL_DIR", os.path.join(os.path.expanduser("~"), ".ra2", "sigils"), ) -MAX_ENTRIES = 15 -_LINE_RE = re.compile(r"^σ(\d+):\s*(.+)$") +DEBUG_SIGIL: bool = os.environ.get("DEBUG_SIGIL", "false").lower() == "true" +MAX_EVENT_ENTRIES = 15 +MAX_FIELD_CHARS = 64 +MAX_FILE_BYTES = int(os.environ.get("RA2_SIGIL_MAX_BYTES", "8192")) + +_SNAKE_RE = re.compile(r"^[a-z][a-z0-9_]*$") + + +# ── Schema ────────────────────────────────────────────────────────── + +def _empty_state() -> dict: + """Return the canonical empty sigil document.""" + return { + "event": [], + "state": { + "arch": { + "wrapper": "", + "compression": "", + "agents": "", + "router": "", + }, + "risk": { + "token_pressure": "", + "cooldown": "", + "scope_creep": "", + }, + "mode": { + "determinism": "", + "rewrite_mode": "", + "debug": False, + }, + }, + } + + +def _validate_snake(value: str) -> str: + """Validate and truncate a snake_case string field.""" + value = value.strip()[:MAX_FIELD_CHARS] + return value + + +def _validate_event(event: dict) -> bool: + """Return True if an event dict has all required keys with valid values.""" + for key in ("operator", "constraint", "decision"): + val = event.get(key) + if not isinstance(val, str) or not val: + return False + if len(val) > MAX_FIELD_CHARS: + return False + return "timestamp" in event + + +# ── File I/O ──────────────────────────────────────────────────────── def _sigil_path(stream_id: str) -> str: - return os.path.join(SIGIL_DIR, f"{stream_id}.sigil") + return os.path.join(SIGIL_DIR, f"{stream_id}.json") -def load(stream_id: str) -> List[Tuple[int, str]]: - """Load sigil entries as list of (index, body) tuples.""" +def load(stream_id: str) -> dict: + """Load the JSON sigil state for a stream.""" path = _sigil_path(stream_id) if not os.path.exists(path): - return [] - entries: List[Tuple[int, str]] = [] + return _empty_state() with open(path, "r", encoding="utf-8") as f: - for line in f: - line = line.strip() - m = _LINE_RE.match(line) - if m: - entries.append((int(m.group(1)), m.group(2))) - return entries + try: + data = json.load(f) + except (json.JSONDecodeError, ValueError): + return _empty_state() + + # Ensure structural integrity — fill missing keys from template + template = _empty_state() + if not isinstance(data.get("event"), list): + data["event"] = template["event"] + if not isinstance(data.get("state"), dict): + data["state"] = template["state"] + for section in ("arch", "risk", "mode"): + if not isinstance(data["state"].get(section), dict): + data["state"][section] = template["state"][section] + return data -def save(stream_id: str, entries: List[Tuple[int, str]]) -> None: - """Persist sigil entries, enforcing MAX_ENTRIES via FIFO.""" - # FIFO: keep only the last MAX_ENTRIES - entries = entries[-MAX_ENTRIES:] +def save(stream_id: str, state: dict) -> None: + """Atomically persist the JSON sigil state to disk. + + Enforces EVENT cap, field lengths, and total file size. + """ + # FIFO trim events + events = state.get("event", [])[-MAX_EVENT_ENTRIES:] + state["event"] = events + os.makedirs(SIGIL_DIR, exist_ok=True) path = _sigil_path(stream_id) - with open(path, "w", encoding="utf-8") as f: - for idx, body in entries: - f.write(f"\u03c3{idx}: {body}\n") + + content = json.dumps(state, indent=2, ensure_ascii=False) + + # Enforce total file size — trim oldest events until it fits + while len(content.encode("utf-8")) > MAX_FILE_BYTES and state["event"]: + state["event"].pop(0) + content = json.dumps(state, indent=2, ensure_ascii=False) + + # Atomic write: write to temp then rename + tmp_path = path + ".tmp" + with open(tmp_path, "w", encoding="utf-8") as f: + f.write(content) + os.replace(tmp_path, path) -def append(stream_id: str, body: str) -> List[Tuple[int, str]]: - """Add a new sigil entry. Auto-numbers and FIFO-evicts if at capacity.""" - entries = load(stream_id) - next_idx = (entries[-1][0] + 1) if entries else 1 - entries.append((next_idx, body)) - # FIFO eviction - if len(entries) > MAX_ENTRIES: - entries = entries[-MAX_ENTRIES:] - save(stream_id, entries) - return entries +# ── Mutation helpers ──────────────────────────────────────────────── +def _now_iso() -> str: + return datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ") + + +def append_event(stream_id: str, operator: str, constraint: str, + decision: str) -> dict: + """Add an event triple. Deduplicates and FIFO-trims. + + Rejects fields longer than MAX_FIELD_CHARS. + """ + operator = _validate_snake(operator) + constraint = _validate_snake(constraint) + decision = _validate_snake(decision) + + if not operator or not constraint or not decision: + return load(stream_id) + + state = load(stream_id) + + # Dedup on (operator, constraint, decision) + triple = (operator, constraint, decision) + for existing in state["event"]: + if (existing["operator"], existing["constraint"], + existing["decision"]) == triple: + return state + + event = { + "operator": operator, + "constraint": constraint, + "decision": decision, + "timestamp": _now_iso(), + } + + state["event"].append(event) + state["event"] = state["event"][-MAX_EVENT_ENTRIES:] + + save(stream_id, state) + return state + + +def update_state(stream_id: str, + arch: Optional[Dict[str, str]] = None, + risk: Optional[Dict[str, str]] = None, + mode: Optional[dict] = None) -> dict: + """Overwrite STATE sections. STATE is authoritative snapshot.""" + state = load(stream_id) + if arch is not None: + state["state"]["arch"] = arch + if risk is not None: + state["state"]["risk"] = risk + if mode is not None: + state["state"]["mode"] = mode + save(stream_id, state) + return state + + +# ── Snapshot ──────────────────────────────────────────────────────── def snapshot(stream_id: str) -> str: - """Return sigil state as plain text for prompt injection.""" - entries = load(stream_id) - if not entries: + """Return compacted JSON string for debug prompt injection. + + Only meaningful when DEBUG_SIGIL is true. + """ + state = load(stream_id) + if not state["event"] and not any( + v for v in state["state"]["arch"].values() if v + ): return "(no sigils)" - return "\n".join(f"\u03c3{idx}: {body}" for idx, body in entries) + return json.dumps(state, indent=2, ensure_ascii=False) -# ── Deterministic sigil generators ────────────────────────────────── +# ── Deterministic event generators ───────────────────────────────── -# Rule-based patterns that detect sigil-worthy events from messages. -# Each rule: (regex_on_content, sigil_body_template) -_SIGIL_RULES: List[Tuple[re.Pattern, str]] = [ - (re.compile(r"fork(?:ed|ing)?\s*(?:to|into|→)\s*(\S+)", re.I), - "fork\u2192{0}"), +# Each rule: (regex, (operator, constraint, decision)) +# The decision field may use {0} for first capture group. +_EVENT_RULES: List[Tuple[re.Pattern, Tuple[str, str, str]]] = [ + (re.compile(r"fork(?:ed|ing)?\s*(?:to|into|\u2192)\s*(\S+)", re.I), + ("fork", "architectural_scope", "{0}")), (re.compile(r"token[_\s]*burn", re.I), - "token_burn\u2192compress"), + ("token_burn", "context_overflow", "compress_first")), (re.compile(r"rewrite[_\s]*impulse", re.I), - "rewrite_impulse\u2192layer"), + ("rewrite_impulse", "determinism_requirement", "layering_not_rewrite")), (re.compile(r"context[_\s]*sov(?:ereignty)?", re.I), - "context_sov\u2192active"), + ("context_sov", "sovereignty_active", "enforce")), (re.compile(r"budget[_\s]*cap(?:ped)?", re.I), - "budget\u2192capped"), + ("budget_cap", "cost_constraint", "enforce_limit")), (re.compile(r"rate[_\s]*limit", re.I), - "rate_limit\u2192detected"), + ("rate_limit", "cooldown_active", "fallback_model")), (re.compile(r"provider[_\s]*switch(?:ed)?", re.I), - "provider\u2192switched"), + ("provider_switch", "availability", "route_alternate")), (re.compile(r"compaction[_\s]*trigger", re.I), - "compaction\u2192triggered"), + ("compaction", "history_overflow", "compact_now")), + (re.compile(r"thin[_\s]*wrapper", re.I), + ("fork", "architectural_scope", "thin_wrapper")), + (re.compile(r"rule[_\s]*based[_\s]*compress", re.I), + ("compression", "method_selection", "rule_based_v1")), ] -def generate_from_message(content: str) -> Optional[str]: - """Apply deterministic rules to a message. Returns sigil body or None.""" - for pattern, template in _SIGIL_RULES: +def generate_from_message(content: str) -> Optional[Tuple[str, str, str]]: + """Apply deterministic rules to message content. + + Returns (operator, constraint, decision) triple or None. + """ + for pattern, (op, constraint, decision) in _EVENT_RULES: m = pattern.search(content) if m: - # Fill template with captured groups if any try: - return template.format(*m.groups()) + filled_decision = decision.format(*m.groups()) except (IndexError, KeyError): - return template + filled_decision = decision + return (op, constraint, filled_decision) return None diff --git a/ra2/tests/test_context_engine.py b/ra2/tests/test_context_engine.py index 0ead1e2274d..8417d01aecd 100644 --- a/ra2/tests/test_context_engine.py +++ b/ra2/tests/test_context_engine.py @@ -1,5 +1,6 @@ """Tests for ra2.context_engine""" +import json import pytest from ra2 import ledger, sigil, token_gate from ra2.context_engine import build_context @@ -10,6 +11,8 @@ def tmp_storage(monkeypatch, tmp_path): """Redirect all storage to temp directories.""" monkeypatch.setattr(ledger, "LEDGER_DIR", str(tmp_path / "ledgers")) monkeypatch.setattr(sigil, "SIGIL_DIR", str(tmp_path / "sigils")) + # Default: sigil hidden from prompt + monkeypatch.setattr(sigil, "DEBUG_SIGIL", False) class TestBuildContext: @@ -24,16 +27,35 @@ class TestBuildContext: assert isinstance(result["prompt"], str) assert isinstance(result["token_estimate"], int) - def test_prompt_structure(self): + def test_prompt_structure_default(self): messages = [ {"role": "user", "content": "Let's build a context engine"}, ] result = build_context("s1", messages) prompt = result["prompt"] assert "=== LEDGER ===" in prompt - assert "=== SIGIL ===" in prompt assert "=== LIVE WINDOW ===" in prompt assert "Respond concisely" in prompt + # Sigil should NOT appear by default + assert "INTERNAL SIGIL SNAPSHOT" not in prompt + + def test_sigil_hidden_by_default(self): + messages = [ + {"role": "user", "content": "We forked to context_sov"}, + ] + result = build_context("s1", messages) + # Event should be recorded in JSON but not in prompt + state = sigil.load("s1") + assert len(state["event"]) > 0 + assert "INTERNAL SIGIL SNAPSHOT" not in result["prompt"] + + def test_sigil_shown_when_debug(self, monkeypatch): + monkeypatch.setattr(sigil, "DEBUG_SIGIL", True) + messages = [ + {"role": "user", "content": "We forked to context_sov"}, + ] + result = build_context("s1", messages) + assert "=== INTERNAL SIGIL SNAPSHOT ===" in result["prompt"] def test_live_window_content(self): messages = [ @@ -59,7 +81,6 @@ class TestBuildContext: ] build_context("s1", messages) data = ledger.load("s1") - # Compression should have extracted decisions into delta assert data["delta"] != "" def test_compression_detects_blockers(self): @@ -78,13 +99,24 @@ class TestBuildContext: data = ledger.load("s1") assert len(data["open"]) > 0 - def test_sigil_generation(self): + def test_sigil_event_generation(self): messages = [ {"role": "user", "content": "We forked to context_sov"}, ] build_context("s1", messages) - entries = sigil.load("s1") - assert len(entries) > 0 + state = sigil.load("s1") + assert len(state["event"]) > 0 + assert state["event"][0]["operator"] == "fork" + + def test_sigil_dedup_across_calls(self): + messages = [ + {"role": "user", "content": "We forked to context_sov"}, + ] + build_context("s1", messages) + build_context("s1", messages) + state = sigil.load("s1") + # Same triple should not be duplicated + assert len(state["event"]) == 1 def test_token_estimate_positive(self): messages = [{"role": "user", "content": "hello"}] @@ -92,24 +124,18 @@ class TestBuildContext: assert result["token_estimate"] > 0 def test_window_shrinks_on_large_input(self, monkeypatch): - # Set a very low token cap monkeypatch.setattr(token_gate, "MAX_TOKENS", 200) monkeypatch.setattr(token_gate, "LIVE_WINDOW", 16) - - # Create many messages to exceed budget messages = [ {"role": "user", "content": f"This is message number {i} with some content"} for i in range(20) ] result = build_context("s1", messages) - # Should succeed with a smaller window assert result["token_estimate"] <= 200 def test_hard_fail_on_impossible_budget(self, monkeypatch): - # Set impossibly low token cap monkeypatch.setattr(token_gate, "MAX_TOKENS", 5) monkeypatch.setattr(token_gate, "LIVE_WINDOW", 4) - messages = [ {"role": "user", "content": "x" * 1000}, ] @@ -129,10 +155,24 @@ class TestBuildContext: assert "Hello from structured content" in result["prompt"] def test_no_md_history_injection(self): - """Verify that build_context only uses provided messages, never reads .md files.""" messages = [{"role": "user", "content": "just this"}] result = build_context("s1", messages) - # The prompt should contain only our message content plus ledger/sigil structure assert "just this" in result["prompt"] - # No markdown file references should appear assert ".md" not in result["prompt"] + + def test_debug_sigil_snapshot_is_valid_json(self, monkeypatch): + monkeypatch.setattr(sigil, "DEBUG_SIGIL", True) + messages = [ + {"role": "user", "content": "We forked to context_sov"}, + ] + result = build_context("s1", messages) + # Extract the sigil JSON from the prompt + prompt = result["prompt"] + marker = "=== INTERNAL SIGIL SNAPSHOT ===" + assert marker in prompt + start = prompt.index(marker) + len(marker) + end = prompt.index("=== LEDGER ===") + sigil_json = prompt[start:end].strip() + data = json.loads(sigil_json) + assert "event" in data + assert "state" in data diff --git a/ra2/tests/test_sigil.py b/ra2/tests/test_sigil.py index af904adcd74..1a8038327c4 100644 --- a/ra2/tests/test_sigil.py +++ b/ra2/tests/test_sigil.py @@ -1,5 +1,6 @@ -"""Tests for ra2.sigil""" +"""Tests for ra2.sigil (JSON layered format)""" +import json import pytest from ra2 import sigil @@ -12,80 +13,244 @@ def tmp_sigil_dir(monkeypatch, tmp_path): return d +# ── Load / Save ───────────────────────────────────────────────────── + class TestLoadSave: def test_load_empty(self): - entries = sigil.load("test-stream") - assert entries == [] + state = sigil.load("test-stream") + assert state["event"] == [] + assert state["state"]["arch"]["wrapper"] == "" + assert state["state"]["risk"]["token_pressure"] == "" + assert state["state"]["mode"]["debug"] is False - def test_save_and_load(self): - entries = [(1, "fork\u2192context_sov"), (2, "token_burn\u2192compress")] - sigil.save("s1", entries) + def test_save_and_load_roundtrip(self): + state = sigil._empty_state() + state["event"].append({ + "operator": "fork", + "constraint": "architectural_scope", + "decision": "thin_wrapper", + "timestamp": "2026-02-19T04:00:00Z", + }) + state["state"]["arch"]["wrapper"] = "thin" + sigil.save("s1", state) loaded = sigil.load("s1") - assert loaded == entries + assert len(loaded["event"]) == 1 + assert loaded["event"][0]["operator"] == "fork" + assert loaded["state"]["arch"]["wrapper"] == "thin" + + def test_save_creates_json_file(self, tmp_sigil_dir): + state = sigil._empty_state() + sigil.save("s1", state) + import os + path = os.path.join(tmp_sigil_dir, "s1.json") + assert os.path.exists(path) + with open(path) as f: + data = json.load(f) + assert "event" in data + assert "state" in data def test_fifo_on_save(self): - entries = [(i, f"entry-{i}") for i in range(1, 25)] - sigil.save("s1", entries) + state = sigil._empty_state() + for i in range(20): + state["event"].append({ + "operator": f"op_{i}", + "constraint": "c", + "decision": "d", + "timestamp": "2026-01-01T00:00:00Z", + }) + sigil.save("s1", state) loaded = sigil.load("s1") - assert len(loaded) == sigil.MAX_ENTRIES + assert len(loaded["event"]) == sigil.MAX_EVENT_ENTRIES # Should keep the last 15 - assert loaded[0][0] == 10 - assert loaded[-1][0] == 24 + assert loaded["event"][0]["operator"] == "op_5" + assert loaded["event"][-1]["operator"] == "op_19" + + def test_corrupt_file_returns_empty(self, tmp_sigil_dir): + import os + os.makedirs(tmp_sigil_dir, exist_ok=True) + path = os.path.join(tmp_sigil_dir, "bad.json") + with open(path, "w") as f: + f.write("not valid json{{{") + state = sigil.load("bad") + assert state["event"] == [] + assert "arch" in state["state"] + + def test_missing_sections_filled(self, tmp_sigil_dir): + import os + os.makedirs(tmp_sigil_dir, exist_ok=True) + path = os.path.join(tmp_sigil_dir, "partial.json") + with open(path, "w") as f: + json.dump({"event": [], "state": {}}, f) + state = sigil.load("partial") + assert "arch" in state["state"] + assert "risk" in state["state"] + assert "mode" in state["state"] -class TestAppend: +# ── append_event ──────────────────────────────────────────────────── + +class TestAppendEvent: def test_append_single(self): - entries = sigil.append("s1", "fork\u2192ctx") - assert len(entries) == 1 - assert entries[0] == (1, "fork\u2192ctx") + state = sigil.append_event("s1", "fork", "arch_scope", "thin_wrapper") + assert len(state["event"]) == 1 + assert state["event"][0]["operator"] == "fork" + assert state["event"][0]["constraint"] == "arch_scope" + assert state["event"][0]["decision"] == "thin_wrapper" + assert "timestamp" in state["event"][0] def test_append_multiple(self): - sigil.append("s1", "entry-a") - entries = sigil.append("s1", "entry-b") - assert len(entries) == 2 - assert entries[0][1] == "entry-a" - assert entries[1][1] == "entry-b" + sigil.append_event("s1", "fork", "scope", "wrapper") + state = sigil.append_event("s1", "token_burn", "overflow", "compress") + assert len(state["event"]) == 2 + + def test_deduplication(self): + sigil.append_event("s1", "fork", "scope", "wrapper") + state = sigil.append_event("s1", "fork", "scope", "wrapper") + assert len(state["event"]) == 1 def test_fifo_eviction(self): for i in range(20): - entries = sigil.append("s1", f"e-{i}") - assert len(entries) == sigil.MAX_ENTRIES - # Oldest entries should be gone - bodies = [e[1] for e in entries] - assert "e-0" not in bodies - assert "e-19" in bodies + state = sigil.append_event("s1", f"op_{i}", "c", "d") + assert len(state["event"]) == sigil.MAX_EVENT_ENTRIES + operators = [e["operator"] for e in state["event"]] + assert "op_0" not in operators + assert "op_19" in operators + def test_rejects_empty_fields(self): + state = sigil.append_event("s1", "", "c", "d") + assert len(state["event"]) == 0 + + def test_truncates_long_fields(self): + long_val = "a" * 100 + state = sigil.append_event("s1", long_val, "c", "d") + assert len(state["event"]) == 1 + assert len(state["event"][0]["operator"]) <= sigil.MAX_FIELD_CHARS + + +# ── update_state ──────────────────────────────────────────────────── + +class TestUpdateState: + def test_update_arch(self): + state = sigil.update_state("s1", arch={ + "wrapper": "thin", + "compression": "rule_based_v1", + "agents": "disabled", + "router": "legacy", + }) + assert state["state"]["arch"]["wrapper"] == "thin" + assert state["state"]["arch"]["compression"] == "rule_based_v1" + + def test_update_risk(self): + state = sigil.update_state("s1", risk={ + "token_pressure": "controlled", + "cooldown": "monitored", + "scope_creep": "constrained", + }) + assert state["state"]["risk"]["token_pressure"] == "controlled" + + def test_update_mode(self): + state = sigil.update_state("s1", mode={ + "determinism": "prioritized", + "rewrite_mode": "disabled", + "debug": False, + }) + assert state["state"]["mode"]["determinism"] == "prioritized" + + def test_update_overwrites(self): + sigil.update_state("s1", arch={"wrapper": "thin"}) + state = sigil.update_state("s1", arch={"wrapper": "fat"}) + assert state["state"]["arch"]["wrapper"] == "fat" + + def test_update_preserves_events(self): + sigil.append_event("s1", "fork", "scope", "wrapper") + state = sigil.update_state("s1", arch={"wrapper": "thin"}) + assert len(state["event"]) == 1 + assert state["state"]["arch"]["wrapper"] == "thin" + + def test_partial_update(self): + sigil.update_state("s1", arch={"wrapper": "thin"}) + state = sigil.update_state("s1", risk={"token_pressure": "high"}) + # arch should still be there + assert state["state"]["arch"]["wrapper"] == "thin" + assert state["state"]["risk"]["token_pressure"] == "high" + + +# ── snapshot ──────────────────────────────────────────────────────── class TestSnapshot: def test_snapshot_empty(self): snap = sigil.snapshot("empty") assert snap == "(no sigils)" - def test_snapshot_with_entries(self): - sigil.append("s1", "fork\u2192context_sov") - sigil.append("s1", "token_burn\u2192compress") + def test_snapshot_with_events(self): + sigil.append_event("s1", "fork", "scope", "wrapper") snap = sigil.snapshot("s1") - assert "\u03c31:" in snap - assert "fork\u2192context_sov" in snap - assert "\u03c32:" in snap - assert "token_burn\u2192compress" in snap + data = json.loads(snap) + assert len(data["event"]) == 1 + assert data["event"][0]["operator"] == "fork" + def test_snapshot_with_state(self): + sigil.update_state("s1", arch={"wrapper": "thin"}) + snap = sigil.snapshot("s1") + data = json.loads(snap) + assert data["state"]["arch"]["wrapper"] == "thin" + + def test_snapshot_is_valid_json(self): + sigil.append_event("s1", "fork", "scope", "wrapper") + sigil.update_state("s1", arch={"wrapper": "thin"}) + snap = sigil.snapshot("s1") + data = json.loads(snap) # Should not raise + assert "event" in data + assert "state" in data + + +# ── generate_from_message ─────────────────────────────────────────── class TestGenerateFromMessage: def test_fork_detection(self): - body = sigil.generate_from_message("We forked to context_sov branch") - assert body is not None - assert "fork" in body - assert "context_sov" in body + result = sigil.generate_from_message("We forked to context_sov branch") + assert result is not None + op, constraint, decision = result + assert op == "fork" + assert constraint == "architectural_scope" + assert "context_sov" in decision def test_token_burn_detection(self): - body = sigil.generate_from_message("Seeing token burn on this stream") - assert body == "token_burn\u2192compress" + result = sigil.generate_from_message("Seeing token burn on this stream") + assert result == ("token_burn", "context_overflow", "compress_first") def test_rate_limit_detection(self): - body = sigil.generate_from_message("Hit a rate limit again") - assert body == "rate_limit\u2192detected" + result = sigil.generate_from_message("Hit a rate limit again") + assert result == ("rate_limit", "cooldown_active", "fallback_model") + + def test_thin_wrapper_detection(self): + result = sigil.generate_from_message("Use a thin wrapper approach") + assert result is not None + assert result[2] == "thin_wrapper" def test_no_match(self): - body = sigil.generate_from_message("Hello, how are you?") - assert body is None + result = sigil.generate_from_message("Hello, how are you?") + assert result is None + + def test_returns_triple(self): + result = sigil.generate_from_message("compaction trigger needed") + assert result is not None + assert len(result) == 3 + op, constraint, decision = result + assert op == "compaction" + assert constraint == "history_overflow" + assert decision == "compact_now" + + +# ── File size cap ─────────────────────────────────────────────────── + +class TestFileSizeCap: + def test_file_respects_size_cap(self, monkeypatch, tmp_sigil_dir): + import os + # Set a small cap + monkeypatch.setattr(sigil, "MAX_FILE_BYTES", 512) + for i in range(20): + sigil.append_event("s1", f"operator_{i}", "constraint", "decision") + path = os.path.join(tmp_sigil_dir, "s1.json") + size = os.path.getsize(path) + assert size <= 512