feat(ra2/sigil): rewrite as JSON layered internal state system
Replace plain-text σN format with structured JSON per stream: - EVENT layer: decision causality log with operator→constraint→decision triples and ISO8601 timestamps. Max 15 entries, FIFO trim, dedup. - STATE layer: authoritative snapshot with arch, risk, mode sections. Overwritten (not appended) on each update cycle. - Fields capped at 64 chars, file size capped at 8KB (configurable). - Atomic writes via tmp+rename. - Corrupt/partial JSON gracefully falls back to empty template. Sigil is internal-only by default: - Not included in model prompts unless DEBUG_SIGIL=true - When debug enabled, injected as === INTERNAL SIGIL SNAPSHOT === - Never exposed to Discord users unless debug flag active context_engine.py updated: - Compression pass emits (operator, constraint, decision) triples - Sigil section gated behind DEBUG_SIGIL flag 85 tests passing (up from 64). https://claude.ai/code/session_01K7BWJY2gUoJi6dq91Yc7nx
This commit is contained in:
parent
56d19a0130
commit
218358da18
@ -98,10 +98,11 @@ def _run_compression(messages: list, stream_id: str) -> None:
|
||||
for m in _QUESTION_RE.finditer(text):
|
||||
open_questions.append(m.group(1).strip())
|
||||
|
||||
# Sigil generation
|
||||
sigil_body = sigil.generate_from_message(text)
|
||||
if sigil_body:
|
||||
sigil.append(stream_id, sigil_body)
|
||||
# Sigil event generation
|
||||
sigil_triple = sigil.generate_from_message(text)
|
||||
if sigil_triple:
|
||||
op, constraint, decision = sigil_triple
|
||||
sigil.append_event(stream_id, op, constraint, decision)
|
||||
|
||||
# Build delta from decisions
|
||||
delta = "; ".join(decisions[-5:]) if decisions else ""
|
||||
@ -123,17 +124,21 @@ def _run_compression(messages: list, stream_id: str) -> None:
|
||||
|
||||
|
||||
def _assemble_prompt(stream_id: str, live_messages: list) -> str:
|
||||
"""Build the structured prompt from ledger + sigil + live window."""
|
||||
"""Build the structured prompt from ledger + (optional sigil) + live window."""
|
||||
sections = []
|
||||
|
||||
# Sigil section — only when DEBUG_SIGIL is enabled
|
||||
if sigil.DEBUG_SIGIL:
|
||||
sigil_snap = sigil.snapshot(stream_id)
|
||||
if sigil_snap != "(no sigils)":
|
||||
sections.append(
|
||||
f"=== INTERNAL SIGIL SNAPSHOT ===\n{sigil_snap}"
|
||||
)
|
||||
|
||||
# Ledger section
|
||||
ledger_snap = ledger.snapshot(stream_id)
|
||||
sections.append(f"=== LEDGER ===\n{ledger_snap}")
|
||||
|
||||
# Sigil section
|
||||
sigil_snap = sigil.snapshot(stream_id)
|
||||
sections.append(f"=== SIGIL ===\n{sigil_snap}")
|
||||
|
||||
# Live window section
|
||||
live_lines = []
|
||||
for msg in live_messages:
|
||||
|
||||
255
ra2/sigil.py
255
ra2/sigil.py
@ -1,106 +1,245 @@
|
||||
"""
|
||||
ra2.sigil — Sigil shorthand memory (one plain-text file per stream).
|
||||
ra2.sigil — Layered internal state map stored as JSON (one file per stream).
|
||||
|
||||
Format per line: σN: key→value
|
||||
Max 15 entries, FIFO replacement.
|
||||
Deterministic rule-based generation only — no AI involvement.
|
||||
Two layers:
|
||||
EVENT — decision causality log [{operator, constraint, decision, timestamp}]
|
||||
STATE — authoritative snapshot {arch, risk, mode}
|
||||
|
||||
Deterministic. Bounded. Internal-only (hidden unless DEBUG_SIGIL=true).
|
||||
No AI generation. No semantic expansion. No prose.
|
||||
"""
|
||||
|
||||
import json
|
||||
import os
|
||||
import re
|
||||
from typing import List, Optional, Tuple
|
||||
from datetime import datetime, timezone
|
||||
from typing import Dict, List, Optional, Tuple
|
||||
|
||||
SIGIL_DIR: str = os.environ.get(
|
||||
"RA2_SIGIL_DIR",
|
||||
os.path.join(os.path.expanduser("~"), ".ra2", "sigils"),
|
||||
)
|
||||
|
||||
MAX_ENTRIES = 15
|
||||
_LINE_RE = re.compile(r"^σ(\d+):\s*(.+)$")
|
||||
DEBUG_SIGIL: bool = os.environ.get("DEBUG_SIGIL", "false").lower() == "true"
|
||||
|
||||
MAX_EVENT_ENTRIES = 15
|
||||
MAX_FIELD_CHARS = 64
|
||||
MAX_FILE_BYTES = int(os.environ.get("RA2_SIGIL_MAX_BYTES", "8192"))
|
||||
|
||||
_SNAKE_RE = re.compile(r"^[a-z][a-z0-9_]*$")
|
||||
|
||||
|
||||
# ── Schema ──────────────────────────────────────────────────────────
|
||||
|
||||
def _empty_state() -> dict:
|
||||
"""Return the canonical empty sigil document."""
|
||||
return {
|
||||
"event": [],
|
||||
"state": {
|
||||
"arch": {
|
||||
"wrapper": "",
|
||||
"compression": "",
|
||||
"agents": "",
|
||||
"router": "",
|
||||
},
|
||||
"risk": {
|
||||
"token_pressure": "",
|
||||
"cooldown": "",
|
||||
"scope_creep": "",
|
||||
},
|
||||
"mode": {
|
||||
"determinism": "",
|
||||
"rewrite_mode": "",
|
||||
"debug": False,
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
|
||||
def _validate_snake(value: str) -> str:
|
||||
"""Validate and truncate a snake_case string field."""
|
||||
value = value.strip()[:MAX_FIELD_CHARS]
|
||||
return value
|
||||
|
||||
|
||||
def _validate_event(event: dict) -> bool:
|
||||
"""Return True if an event dict has all required keys with valid values."""
|
||||
for key in ("operator", "constraint", "decision"):
|
||||
val = event.get(key)
|
||||
if not isinstance(val, str) or not val:
|
||||
return False
|
||||
if len(val) > MAX_FIELD_CHARS:
|
||||
return False
|
||||
return "timestamp" in event
|
||||
|
||||
|
||||
# ── File I/O ────────────────────────────────────────────────────────
|
||||
|
||||
def _sigil_path(stream_id: str) -> str:
|
||||
return os.path.join(SIGIL_DIR, f"{stream_id}.sigil")
|
||||
return os.path.join(SIGIL_DIR, f"{stream_id}.json")
|
||||
|
||||
|
||||
def load(stream_id: str) -> List[Tuple[int, str]]:
|
||||
"""Load sigil entries as list of (index, body) tuples."""
|
||||
def load(stream_id: str) -> dict:
|
||||
"""Load the JSON sigil state for a stream."""
|
||||
path = _sigil_path(stream_id)
|
||||
if not os.path.exists(path):
|
||||
return []
|
||||
entries: List[Tuple[int, str]] = []
|
||||
return _empty_state()
|
||||
with open(path, "r", encoding="utf-8") as f:
|
||||
for line in f:
|
||||
line = line.strip()
|
||||
m = _LINE_RE.match(line)
|
||||
if m:
|
||||
entries.append((int(m.group(1)), m.group(2)))
|
||||
return entries
|
||||
try:
|
||||
data = json.load(f)
|
||||
except (json.JSONDecodeError, ValueError):
|
||||
return _empty_state()
|
||||
|
||||
# Ensure structural integrity — fill missing keys from template
|
||||
template = _empty_state()
|
||||
if not isinstance(data.get("event"), list):
|
||||
data["event"] = template["event"]
|
||||
if not isinstance(data.get("state"), dict):
|
||||
data["state"] = template["state"]
|
||||
for section in ("arch", "risk", "mode"):
|
||||
if not isinstance(data["state"].get(section), dict):
|
||||
data["state"][section] = template["state"][section]
|
||||
return data
|
||||
|
||||
|
||||
def save(stream_id: str, entries: List[Tuple[int, str]]) -> None:
|
||||
"""Persist sigil entries, enforcing MAX_ENTRIES via FIFO."""
|
||||
# FIFO: keep only the last MAX_ENTRIES
|
||||
entries = entries[-MAX_ENTRIES:]
|
||||
def save(stream_id: str, state: dict) -> None:
|
||||
"""Atomically persist the JSON sigil state to disk.
|
||||
|
||||
Enforces EVENT cap, field lengths, and total file size.
|
||||
"""
|
||||
# FIFO trim events
|
||||
events = state.get("event", [])[-MAX_EVENT_ENTRIES:]
|
||||
state["event"] = events
|
||||
|
||||
os.makedirs(SIGIL_DIR, exist_ok=True)
|
||||
path = _sigil_path(stream_id)
|
||||
with open(path, "w", encoding="utf-8") as f:
|
||||
for idx, body in entries:
|
||||
f.write(f"\u03c3{idx}: {body}\n")
|
||||
|
||||
content = json.dumps(state, indent=2, ensure_ascii=False)
|
||||
|
||||
# Enforce total file size — trim oldest events until it fits
|
||||
while len(content.encode("utf-8")) > MAX_FILE_BYTES and state["event"]:
|
||||
state["event"].pop(0)
|
||||
content = json.dumps(state, indent=2, ensure_ascii=False)
|
||||
|
||||
# Atomic write: write to temp then rename
|
||||
tmp_path = path + ".tmp"
|
||||
with open(tmp_path, "w", encoding="utf-8") as f:
|
||||
f.write(content)
|
||||
os.replace(tmp_path, path)
|
||||
|
||||
|
||||
def append(stream_id: str, body: str) -> List[Tuple[int, str]]:
|
||||
"""Add a new sigil entry. Auto-numbers and FIFO-evicts if at capacity."""
|
||||
entries = load(stream_id)
|
||||
next_idx = (entries[-1][0] + 1) if entries else 1
|
||||
entries.append((next_idx, body))
|
||||
# FIFO eviction
|
||||
if len(entries) > MAX_ENTRIES:
|
||||
entries = entries[-MAX_ENTRIES:]
|
||||
save(stream_id, entries)
|
||||
return entries
|
||||
# ── Mutation helpers ────────────────────────────────────────────────
|
||||
|
||||
def _now_iso() -> str:
|
||||
return datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
|
||||
|
||||
|
||||
def append_event(stream_id: str, operator: str, constraint: str,
|
||||
decision: str) -> dict:
|
||||
"""Add an event triple. Deduplicates and FIFO-trims.
|
||||
|
||||
Rejects fields longer than MAX_FIELD_CHARS.
|
||||
"""
|
||||
operator = _validate_snake(operator)
|
||||
constraint = _validate_snake(constraint)
|
||||
decision = _validate_snake(decision)
|
||||
|
||||
if not operator or not constraint or not decision:
|
||||
return load(stream_id)
|
||||
|
||||
state = load(stream_id)
|
||||
|
||||
# Dedup on (operator, constraint, decision)
|
||||
triple = (operator, constraint, decision)
|
||||
for existing in state["event"]:
|
||||
if (existing["operator"], existing["constraint"],
|
||||
existing["decision"]) == triple:
|
||||
return state
|
||||
|
||||
event = {
|
||||
"operator": operator,
|
||||
"constraint": constraint,
|
||||
"decision": decision,
|
||||
"timestamp": _now_iso(),
|
||||
}
|
||||
|
||||
state["event"].append(event)
|
||||
state["event"] = state["event"][-MAX_EVENT_ENTRIES:]
|
||||
|
||||
save(stream_id, state)
|
||||
return state
|
||||
|
||||
|
||||
def update_state(stream_id: str,
|
||||
arch: Optional[Dict[str, str]] = None,
|
||||
risk: Optional[Dict[str, str]] = None,
|
||||
mode: Optional[dict] = None) -> dict:
|
||||
"""Overwrite STATE sections. STATE is authoritative snapshot."""
|
||||
state = load(stream_id)
|
||||
if arch is not None:
|
||||
state["state"]["arch"] = arch
|
||||
if risk is not None:
|
||||
state["state"]["risk"] = risk
|
||||
if mode is not None:
|
||||
state["state"]["mode"] = mode
|
||||
save(stream_id, state)
|
||||
return state
|
||||
|
||||
|
||||
# ── Snapshot ────────────────────────────────────────────────────────
|
||||
|
||||
def snapshot(stream_id: str) -> str:
|
||||
"""Return sigil state as plain text for prompt injection."""
|
||||
entries = load(stream_id)
|
||||
if not entries:
|
||||
"""Return compacted JSON string for debug prompt injection.
|
||||
|
||||
Only meaningful when DEBUG_SIGIL is true.
|
||||
"""
|
||||
state = load(stream_id)
|
||||
if not state["event"] and not any(
|
||||
v for v in state["state"]["arch"].values() if v
|
||||
):
|
||||
return "(no sigils)"
|
||||
return "\n".join(f"\u03c3{idx}: {body}" for idx, body in entries)
|
||||
return json.dumps(state, indent=2, ensure_ascii=False)
|
||||
|
||||
|
||||
# ── Deterministic sigil generators ──────────────────────────────────
|
||||
# ── Deterministic event generators ─────────────────────────────────
|
||||
|
||||
# Rule-based patterns that detect sigil-worthy events from messages.
|
||||
# Each rule: (regex_on_content, sigil_body_template)
|
||||
_SIGIL_RULES: List[Tuple[re.Pattern, str]] = [
|
||||
(re.compile(r"fork(?:ed|ing)?\s*(?:to|into|→)\s*(\S+)", re.I),
|
||||
"fork\u2192{0}"),
|
||||
# Each rule: (regex, (operator, constraint, decision))
|
||||
# The decision field may use {0} for first capture group.
|
||||
_EVENT_RULES: List[Tuple[re.Pattern, Tuple[str, str, str]]] = [
|
||||
(re.compile(r"fork(?:ed|ing)?\s*(?:to|into|\u2192)\s*(\S+)", re.I),
|
||||
("fork", "architectural_scope", "{0}")),
|
||||
(re.compile(r"token[_\s]*burn", re.I),
|
||||
"token_burn\u2192compress"),
|
||||
("token_burn", "context_overflow", "compress_first")),
|
||||
(re.compile(r"rewrite[_\s]*impulse", re.I),
|
||||
"rewrite_impulse\u2192layer"),
|
||||
("rewrite_impulse", "determinism_requirement", "layering_not_rewrite")),
|
||||
(re.compile(r"context[_\s]*sov(?:ereignty)?", re.I),
|
||||
"context_sov\u2192active"),
|
||||
("context_sov", "sovereignty_active", "enforce")),
|
||||
(re.compile(r"budget[_\s]*cap(?:ped)?", re.I),
|
||||
"budget\u2192capped"),
|
||||
("budget_cap", "cost_constraint", "enforce_limit")),
|
||||
(re.compile(r"rate[_\s]*limit", re.I),
|
||||
"rate_limit\u2192detected"),
|
||||
("rate_limit", "cooldown_active", "fallback_model")),
|
||||
(re.compile(r"provider[_\s]*switch(?:ed)?", re.I),
|
||||
"provider\u2192switched"),
|
||||
("provider_switch", "availability", "route_alternate")),
|
||||
(re.compile(r"compaction[_\s]*trigger", re.I),
|
||||
"compaction\u2192triggered"),
|
||||
("compaction", "history_overflow", "compact_now")),
|
||||
(re.compile(r"thin[_\s]*wrapper", re.I),
|
||||
("fork", "architectural_scope", "thin_wrapper")),
|
||||
(re.compile(r"rule[_\s]*based[_\s]*compress", re.I),
|
||||
("compression", "method_selection", "rule_based_v1")),
|
||||
]
|
||||
|
||||
|
||||
def generate_from_message(content: str) -> Optional[str]:
|
||||
"""Apply deterministic rules to a message. Returns sigil body or None."""
|
||||
for pattern, template in _SIGIL_RULES:
|
||||
def generate_from_message(content: str) -> Optional[Tuple[str, str, str]]:
|
||||
"""Apply deterministic rules to message content.
|
||||
|
||||
Returns (operator, constraint, decision) triple or None.
|
||||
"""
|
||||
for pattern, (op, constraint, decision) in _EVENT_RULES:
|
||||
m = pattern.search(content)
|
||||
if m:
|
||||
# Fill template with captured groups if any
|
||||
try:
|
||||
return template.format(*m.groups())
|
||||
filled_decision = decision.format(*m.groups())
|
||||
except (IndexError, KeyError):
|
||||
return template
|
||||
filled_decision = decision
|
||||
return (op, constraint, filled_decision)
|
||||
return None
|
||||
|
||||
@ -1,5 +1,6 @@
|
||||
"""Tests for ra2.context_engine"""
|
||||
|
||||
import json
|
||||
import pytest
|
||||
from ra2 import ledger, sigil, token_gate
|
||||
from ra2.context_engine import build_context
|
||||
@ -10,6 +11,8 @@ def tmp_storage(monkeypatch, tmp_path):
|
||||
"""Redirect all storage to temp directories."""
|
||||
monkeypatch.setattr(ledger, "LEDGER_DIR", str(tmp_path / "ledgers"))
|
||||
monkeypatch.setattr(sigil, "SIGIL_DIR", str(tmp_path / "sigils"))
|
||||
# Default: sigil hidden from prompt
|
||||
monkeypatch.setattr(sigil, "DEBUG_SIGIL", False)
|
||||
|
||||
|
||||
class TestBuildContext:
|
||||
@ -24,16 +27,35 @@ class TestBuildContext:
|
||||
assert isinstance(result["prompt"], str)
|
||||
assert isinstance(result["token_estimate"], int)
|
||||
|
||||
def test_prompt_structure(self):
|
||||
def test_prompt_structure_default(self):
|
||||
messages = [
|
||||
{"role": "user", "content": "Let's build a context engine"},
|
||||
]
|
||||
result = build_context("s1", messages)
|
||||
prompt = result["prompt"]
|
||||
assert "=== LEDGER ===" in prompt
|
||||
assert "=== SIGIL ===" in prompt
|
||||
assert "=== LIVE WINDOW ===" in prompt
|
||||
assert "Respond concisely" in prompt
|
||||
# Sigil should NOT appear by default
|
||||
assert "INTERNAL SIGIL SNAPSHOT" not in prompt
|
||||
|
||||
def test_sigil_hidden_by_default(self):
|
||||
messages = [
|
||||
{"role": "user", "content": "We forked to context_sov"},
|
||||
]
|
||||
result = build_context("s1", messages)
|
||||
# Event should be recorded in JSON but not in prompt
|
||||
state = sigil.load("s1")
|
||||
assert len(state["event"]) > 0
|
||||
assert "INTERNAL SIGIL SNAPSHOT" not in result["prompt"]
|
||||
|
||||
def test_sigil_shown_when_debug(self, monkeypatch):
|
||||
monkeypatch.setattr(sigil, "DEBUG_SIGIL", True)
|
||||
messages = [
|
||||
{"role": "user", "content": "We forked to context_sov"},
|
||||
]
|
||||
result = build_context("s1", messages)
|
||||
assert "=== INTERNAL SIGIL SNAPSHOT ===" in result["prompt"]
|
||||
|
||||
def test_live_window_content(self):
|
||||
messages = [
|
||||
@ -59,7 +81,6 @@ class TestBuildContext:
|
||||
]
|
||||
build_context("s1", messages)
|
||||
data = ledger.load("s1")
|
||||
# Compression should have extracted decisions into delta
|
||||
assert data["delta"] != ""
|
||||
|
||||
def test_compression_detects_blockers(self):
|
||||
@ -78,13 +99,24 @@ class TestBuildContext:
|
||||
data = ledger.load("s1")
|
||||
assert len(data["open"]) > 0
|
||||
|
||||
def test_sigil_generation(self):
|
||||
def test_sigil_event_generation(self):
|
||||
messages = [
|
||||
{"role": "user", "content": "We forked to context_sov"},
|
||||
]
|
||||
build_context("s1", messages)
|
||||
entries = sigil.load("s1")
|
||||
assert len(entries) > 0
|
||||
state = sigil.load("s1")
|
||||
assert len(state["event"]) > 0
|
||||
assert state["event"][0]["operator"] == "fork"
|
||||
|
||||
def test_sigil_dedup_across_calls(self):
|
||||
messages = [
|
||||
{"role": "user", "content": "We forked to context_sov"},
|
||||
]
|
||||
build_context("s1", messages)
|
||||
build_context("s1", messages)
|
||||
state = sigil.load("s1")
|
||||
# Same triple should not be duplicated
|
||||
assert len(state["event"]) == 1
|
||||
|
||||
def test_token_estimate_positive(self):
|
||||
messages = [{"role": "user", "content": "hello"}]
|
||||
@ -92,24 +124,18 @@ class TestBuildContext:
|
||||
assert result["token_estimate"] > 0
|
||||
|
||||
def test_window_shrinks_on_large_input(self, monkeypatch):
|
||||
# Set a very low token cap
|
||||
monkeypatch.setattr(token_gate, "MAX_TOKENS", 200)
|
||||
monkeypatch.setattr(token_gate, "LIVE_WINDOW", 16)
|
||||
|
||||
# Create many messages to exceed budget
|
||||
messages = [
|
||||
{"role": "user", "content": f"This is message number {i} with some content"}
|
||||
for i in range(20)
|
||||
]
|
||||
result = build_context("s1", messages)
|
||||
# Should succeed with a smaller window
|
||||
assert result["token_estimate"] <= 200
|
||||
|
||||
def test_hard_fail_on_impossible_budget(self, monkeypatch):
|
||||
# Set impossibly low token cap
|
||||
monkeypatch.setattr(token_gate, "MAX_TOKENS", 5)
|
||||
monkeypatch.setattr(token_gate, "LIVE_WINDOW", 4)
|
||||
|
||||
messages = [
|
||||
{"role": "user", "content": "x" * 1000},
|
||||
]
|
||||
@ -129,10 +155,24 @@ class TestBuildContext:
|
||||
assert "Hello from structured content" in result["prompt"]
|
||||
|
||||
def test_no_md_history_injection(self):
|
||||
"""Verify that build_context only uses provided messages, never reads .md files."""
|
||||
messages = [{"role": "user", "content": "just this"}]
|
||||
result = build_context("s1", messages)
|
||||
# The prompt should contain only our message content plus ledger/sigil structure
|
||||
assert "just this" in result["prompt"]
|
||||
# No markdown file references should appear
|
||||
assert ".md" not in result["prompt"]
|
||||
|
||||
def test_debug_sigil_snapshot_is_valid_json(self, monkeypatch):
|
||||
monkeypatch.setattr(sigil, "DEBUG_SIGIL", True)
|
||||
messages = [
|
||||
{"role": "user", "content": "We forked to context_sov"},
|
||||
]
|
||||
result = build_context("s1", messages)
|
||||
# Extract the sigil JSON from the prompt
|
||||
prompt = result["prompt"]
|
||||
marker = "=== INTERNAL SIGIL SNAPSHOT ==="
|
||||
assert marker in prompt
|
||||
start = prompt.index(marker) + len(marker)
|
||||
end = prompt.index("=== LEDGER ===")
|
||||
sigil_json = prompt[start:end].strip()
|
||||
data = json.loads(sigil_json)
|
||||
assert "event" in data
|
||||
assert "state" in data
|
||||
|
||||
@ -1,5 +1,6 @@
|
||||
"""Tests for ra2.sigil"""
|
||||
"""Tests for ra2.sigil (JSON layered format)"""
|
||||
|
||||
import json
|
||||
import pytest
|
||||
from ra2 import sigil
|
||||
|
||||
@ -12,80 +13,244 @@ def tmp_sigil_dir(monkeypatch, tmp_path):
|
||||
return d
|
||||
|
||||
|
||||
# ── Load / Save ─────────────────────────────────────────────────────
|
||||
|
||||
class TestLoadSave:
|
||||
def test_load_empty(self):
|
||||
entries = sigil.load("test-stream")
|
||||
assert entries == []
|
||||
state = sigil.load("test-stream")
|
||||
assert state["event"] == []
|
||||
assert state["state"]["arch"]["wrapper"] == ""
|
||||
assert state["state"]["risk"]["token_pressure"] == ""
|
||||
assert state["state"]["mode"]["debug"] is False
|
||||
|
||||
def test_save_and_load(self):
|
||||
entries = [(1, "fork\u2192context_sov"), (2, "token_burn\u2192compress")]
|
||||
sigil.save("s1", entries)
|
||||
def test_save_and_load_roundtrip(self):
|
||||
state = sigil._empty_state()
|
||||
state["event"].append({
|
||||
"operator": "fork",
|
||||
"constraint": "architectural_scope",
|
||||
"decision": "thin_wrapper",
|
||||
"timestamp": "2026-02-19T04:00:00Z",
|
||||
})
|
||||
state["state"]["arch"]["wrapper"] = "thin"
|
||||
sigil.save("s1", state)
|
||||
loaded = sigil.load("s1")
|
||||
assert loaded == entries
|
||||
assert len(loaded["event"]) == 1
|
||||
assert loaded["event"][0]["operator"] == "fork"
|
||||
assert loaded["state"]["arch"]["wrapper"] == "thin"
|
||||
|
||||
def test_save_creates_json_file(self, tmp_sigil_dir):
|
||||
state = sigil._empty_state()
|
||||
sigil.save("s1", state)
|
||||
import os
|
||||
path = os.path.join(tmp_sigil_dir, "s1.json")
|
||||
assert os.path.exists(path)
|
||||
with open(path) as f:
|
||||
data = json.load(f)
|
||||
assert "event" in data
|
||||
assert "state" in data
|
||||
|
||||
def test_fifo_on_save(self):
|
||||
entries = [(i, f"entry-{i}") for i in range(1, 25)]
|
||||
sigil.save("s1", entries)
|
||||
state = sigil._empty_state()
|
||||
for i in range(20):
|
||||
state["event"].append({
|
||||
"operator": f"op_{i}",
|
||||
"constraint": "c",
|
||||
"decision": "d",
|
||||
"timestamp": "2026-01-01T00:00:00Z",
|
||||
})
|
||||
sigil.save("s1", state)
|
||||
loaded = sigil.load("s1")
|
||||
assert len(loaded) == sigil.MAX_ENTRIES
|
||||
assert len(loaded["event"]) == sigil.MAX_EVENT_ENTRIES
|
||||
# Should keep the last 15
|
||||
assert loaded[0][0] == 10
|
||||
assert loaded[-1][0] == 24
|
||||
assert loaded["event"][0]["operator"] == "op_5"
|
||||
assert loaded["event"][-1]["operator"] == "op_19"
|
||||
|
||||
def test_corrupt_file_returns_empty(self, tmp_sigil_dir):
|
||||
import os
|
||||
os.makedirs(tmp_sigil_dir, exist_ok=True)
|
||||
path = os.path.join(tmp_sigil_dir, "bad.json")
|
||||
with open(path, "w") as f:
|
||||
f.write("not valid json{{{")
|
||||
state = sigil.load("bad")
|
||||
assert state["event"] == []
|
||||
assert "arch" in state["state"]
|
||||
|
||||
def test_missing_sections_filled(self, tmp_sigil_dir):
|
||||
import os
|
||||
os.makedirs(tmp_sigil_dir, exist_ok=True)
|
||||
path = os.path.join(tmp_sigil_dir, "partial.json")
|
||||
with open(path, "w") as f:
|
||||
json.dump({"event": [], "state": {}}, f)
|
||||
state = sigil.load("partial")
|
||||
assert "arch" in state["state"]
|
||||
assert "risk" in state["state"]
|
||||
assert "mode" in state["state"]
|
||||
|
||||
|
||||
class TestAppend:
|
||||
# ── append_event ────────────────────────────────────────────────────
|
||||
|
||||
class TestAppendEvent:
|
||||
def test_append_single(self):
|
||||
entries = sigil.append("s1", "fork\u2192ctx")
|
||||
assert len(entries) == 1
|
||||
assert entries[0] == (1, "fork\u2192ctx")
|
||||
state = sigil.append_event("s1", "fork", "arch_scope", "thin_wrapper")
|
||||
assert len(state["event"]) == 1
|
||||
assert state["event"][0]["operator"] == "fork"
|
||||
assert state["event"][0]["constraint"] == "arch_scope"
|
||||
assert state["event"][0]["decision"] == "thin_wrapper"
|
||||
assert "timestamp" in state["event"][0]
|
||||
|
||||
def test_append_multiple(self):
|
||||
sigil.append("s1", "entry-a")
|
||||
entries = sigil.append("s1", "entry-b")
|
||||
assert len(entries) == 2
|
||||
assert entries[0][1] == "entry-a"
|
||||
assert entries[1][1] == "entry-b"
|
||||
sigil.append_event("s1", "fork", "scope", "wrapper")
|
||||
state = sigil.append_event("s1", "token_burn", "overflow", "compress")
|
||||
assert len(state["event"]) == 2
|
||||
|
||||
def test_deduplication(self):
|
||||
sigil.append_event("s1", "fork", "scope", "wrapper")
|
||||
state = sigil.append_event("s1", "fork", "scope", "wrapper")
|
||||
assert len(state["event"]) == 1
|
||||
|
||||
def test_fifo_eviction(self):
|
||||
for i in range(20):
|
||||
entries = sigil.append("s1", f"e-{i}")
|
||||
assert len(entries) == sigil.MAX_ENTRIES
|
||||
# Oldest entries should be gone
|
||||
bodies = [e[1] for e in entries]
|
||||
assert "e-0" not in bodies
|
||||
assert "e-19" in bodies
|
||||
state = sigil.append_event("s1", f"op_{i}", "c", "d")
|
||||
assert len(state["event"]) == sigil.MAX_EVENT_ENTRIES
|
||||
operators = [e["operator"] for e in state["event"]]
|
||||
assert "op_0" not in operators
|
||||
assert "op_19" in operators
|
||||
|
||||
def test_rejects_empty_fields(self):
|
||||
state = sigil.append_event("s1", "", "c", "d")
|
||||
assert len(state["event"]) == 0
|
||||
|
||||
def test_truncates_long_fields(self):
|
||||
long_val = "a" * 100
|
||||
state = sigil.append_event("s1", long_val, "c", "d")
|
||||
assert len(state["event"]) == 1
|
||||
assert len(state["event"][0]["operator"]) <= sigil.MAX_FIELD_CHARS
|
||||
|
||||
|
||||
# ── update_state ────────────────────────────────────────────────────
|
||||
|
||||
class TestUpdateState:
|
||||
def test_update_arch(self):
|
||||
state = sigil.update_state("s1", arch={
|
||||
"wrapper": "thin",
|
||||
"compression": "rule_based_v1",
|
||||
"agents": "disabled",
|
||||
"router": "legacy",
|
||||
})
|
||||
assert state["state"]["arch"]["wrapper"] == "thin"
|
||||
assert state["state"]["arch"]["compression"] == "rule_based_v1"
|
||||
|
||||
def test_update_risk(self):
|
||||
state = sigil.update_state("s1", risk={
|
||||
"token_pressure": "controlled",
|
||||
"cooldown": "monitored",
|
||||
"scope_creep": "constrained",
|
||||
})
|
||||
assert state["state"]["risk"]["token_pressure"] == "controlled"
|
||||
|
||||
def test_update_mode(self):
|
||||
state = sigil.update_state("s1", mode={
|
||||
"determinism": "prioritized",
|
||||
"rewrite_mode": "disabled",
|
||||
"debug": False,
|
||||
})
|
||||
assert state["state"]["mode"]["determinism"] == "prioritized"
|
||||
|
||||
def test_update_overwrites(self):
|
||||
sigil.update_state("s1", arch={"wrapper": "thin"})
|
||||
state = sigil.update_state("s1", arch={"wrapper": "fat"})
|
||||
assert state["state"]["arch"]["wrapper"] == "fat"
|
||||
|
||||
def test_update_preserves_events(self):
|
||||
sigil.append_event("s1", "fork", "scope", "wrapper")
|
||||
state = sigil.update_state("s1", arch={"wrapper": "thin"})
|
||||
assert len(state["event"]) == 1
|
||||
assert state["state"]["arch"]["wrapper"] == "thin"
|
||||
|
||||
def test_partial_update(self):
|
||||
sigil.update_state("s1", arch={"wrapper": "thin"})
|
||||
state = sigil.update_state("s1", risk={"token_pressure": "high"})
|
||||
# arch should still be there
|
||||
assert state["state"]["arch"]["wrapper"] == "thin"
|
||||
assert state["state"]["risk"]["token_pressure"] == "high"
|
||||
|
||||
|
||||
# ── snapshot ────────────────────────────────────────────────────────
|
||||
|
||||
class TestSnapshot:
|
||||
def test_snapshot_empty(self):
|
||||
snap = sigil.snapshot("empty")
|
||||
assert snap == "(no sigils)"
|
||||
|
||||
def test_snapshot_with_entries(self):
|
||||
sigil.append("s1", "fork\u2192context_sov")
|
||||
sigil.append("s1", "token_burn\u2192compress")
|
||||
def test_snapshot_with_events(self):
|
||||
sigil.append_event("s1", "fork", "scope", "wrapper")
|
||||
snap = sigil.snapshot("s1")
|
||||
assert "\u03c31:" in snap
|
||||
assert "fork\u2192context_sov" in snap
|
||||
assert "\u03c32:" in snap
|
||||
assert "token_burn\u2192compress" in snap
|
||||
data = json.loads(snap)
|
||||
assert len(data["event"]) == 1
|
||||
assert data["event"][0]["operator"] == "fork"
|
||||
|
||||
def test_snapshot_with_state(self):
|
||||
sigil.update_state("s1", arch={"wrapper": "thin"})
|
||||
snap = sigil.snapshot("s1")
|
||||
data = json.loads(snap)
|
||||
assert data["state"]["arch"]["wrapper"] == "thin"
|
||||
|
||||
def test_snapshot_is_valid_json(self):
|
||||
sigil.append_event("s1", "fork", "scope", "wrapper")
|
||||
sigil.update_state("s1", arch={"wrapper": "thin"})
|
||||
snap = sigil.snapshot("s1")
|
||||
data = json.loads(snap) # Should not raise
|
||||
assert "event" in data
|
||||
assert "state" in data
|
||||
|
||||
|
||||
# ── generate_from_message ───────────────────────────────────────────
|
||||
|
||||
class TestGenerateFromMessage:
|
||||
def test_fork_detection(self):
|
||||
body = sigil.generate_from_message("We forked to context_sov branch")
|
||||
assert body is not None
|
||||
assert "fork" in body
|
||||
assert "context_sov" in body
|
||||
result = sigil.generate_from_message("We forked to context_sov branch")
|
||||
assert result is not None
|
||||
op, constraint, decision = result
|
||||
assert op == "fork"
|
||||
assert constraint == "architectural_scope"
|
||||
assert "context_sov" in decision
|
||||
|
||||
def test_token_burn_detection(self):
|
||||
body = sigil.generate_from_message("Seeing token burn on this stream")
|
||||
assert body == "token_burn\u2192compress"
|
||||
result = sigil.generate_from_message("Seeing token burn on this stream")
|
||||
assert result == ("token_burn", "context_overflow", "compress_first")
|
||||
|
||||
def test_rate_limit_detection(self):
|
||||
body = sigil.generate_from_message("Hit a rate limit again")
|
||||
assert body == "rate_limit\u2192detected"
|
||||
result = sigil.generate_from_message("Hit a rate limit again")
|
||||
assert result == ("rate_limit", "cooldown_active", "fallback_model")
|
||||
|
||||
def test_thin_wrapper_detection(self):
|
||||
result = sigil.generate_from_message("Use a thin wrapper approach")
|
||||
assert result is not None
|
||||
assert result[2] == "thin_wrapper"
|
||||
|
||||
def test_no_match(self):
|
||||
body = sigil.generate_from_message("Hello, how are you?")
|
||||
assert body is None
|
||||
result = sigil.generate_from_message("Hello, how are you?")
|
||||
assert result is None
|
||||
|
||||
def test_returns_triple(self):
|
||||
result = sigil.generate_from_message("compaction trigger needed")
|
||||
assert result is not None
|
||||
assert len(result) == 3
|
||||
op, constraint, decision = result
|
||||
assert op == "compaction"
|
||||
assert constraint == "history_overflow"
|
||||
assert decision == "compact_now"
|
||||
|
||||
|
||||
# ── File size cap ───────────────────────────────────────────────────
|
||||
|
||||
class TestFileSizeCap:
|
||||
def test_file_respects_size_cap(self, monkeypatch, tmp_sigil_dir):
|
||||
import os
|
||||
# Set a small cap
|
||||
monkeypatch.setattr(sigil, "MAX_FILE_BYTES", 512)
|
||||
for i in range(20):
|
||||
sigil.append_event("s1", f"operator_{i}", "constraint", "decision")
|
||||
path = os.path.join(tmp_sigil_dir, "s1.json")
|
||||
size = os.path.getsize(path)
|
||||
assert size <= 512
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user