- Add read_repo_instructions() to read both AGENTS.md and CLAUDE.md - Add path_validator.validate_paths() for writable/blocked path enforcement - Add 10 passing tests (test_prompt_loading.py, test_path_validator.py) - All 71 tests pass
73 lines
2.1 KiB
Python
73 lines
2.1 KiB
Python
"""Helpers for reading agent instructions from AGENTS.md."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import logging
|
|
import shlex
|
|
|
|
from deepagents.backends.protocol import SandboxBackendProtocol
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
async def read_agents_md_in_sandbox(
|
|
sandbox_backend: SandboxBackendProtocol,
|
|
repo_dir: str | None,
|
|
) -> str | None:
|
|
"""Read AGENTS.md from the repo root if it exists."""
|
|
if not repo_dir:
|
|
return None
|
|
|
|
safe_agents_path = shlex.quote(f"{repo_dir}/AGENTS.md")
|
|
loop = asyncio.get_event_loop()
|
|
result = await loop.run_in_executor(
|
|
None,
|
|
sandbox_backend.execute,
|
|
f"test -f {safe_agents_path} && cat {safe_agents_path}",
|
|
)
|
|
if result.exit_code != 0:
|
|
logger.debug("AGENTS.md not found at %s", safe_agents_path)
|
|
return None
|
|
content = result.output or ""
|
|
content = content.strip()
|
|
return content or None
|
|
|
|
|
|
async def read_repo_instructions(
|
|
sandbox_backend: SandboxBackendProtocol,
|
|
repo_dir: str,
|
|
) -> str:
|
|
"""AGENTS.md와 CLAUDE.md를 모두 읽어서 프롬프트에 주입할 문자열을 반환한다."""
|
|
sections = []
|
|
|
|
agents_md = await _read_file_if_exists(sandbox_backend, f"{repo_dir}/AGENTS.md")
|
|
if agents_md:
|
|
sections.append(f"## Repository Agent Rules\n{agents_md}")
|
|
|
|
claude_md = await _read_file_if_exists(sandbox_backend, f"{repo_dir}/CLAUDE.md")
|
|
if claude_md:
|
|
sections.append(f"## Project Conventions\n{claude_md}")
|
|
|
|
return "\n\n".join(sections)
|
|
|
|
|
|
async def _read_file_if_exists(
|
|
sandbox_backend: SandboxBackendProtocol,
|
|
file_path: str,
|
|
) -> str | None:
|
|
"""파일이 존재하면 내용을 읽고, 없으면 None을 반환한다."""
|
|
safe_path = shlex.quote(file_path)
|
|
try:
|
|
loop = asyncio.get_event_loop()
|
|
result = await loop.run_in_executor(
|
|
None,
|
|
sandbox_backend.execute,
|
|
f"test -f {safe_path} && cat {safe_path}",
|
|
)
|
|
if result.exit_code == 0 and result.output.strip():
|
|
return result.output.strip()
|
|
except Exception:
|
|
logger.debug("Failed to read %s", file_path)
|
|
return None
|