151 lines
5.6 KiB
Python
151 lines
5.6 KiB
Python
"""Git utilities for repository operations."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import shlex
|
|
|
|
from deepagents.backends.protocol import ExecuteResponse, SandboxBackendProtocol
|
|
|
|
|
|
def _run_git(
|
|
sandbox_backend: SandboxBackendProtocol, repo_dir: str, command: str
|
|
) -> ExecuteResponse:
|
|
"""Run a git command in the sandbox repo directory."""
|
|
return sandbox_backend.execute(f"cd {repo_dir} && {command}")
|
|
|
|
|
|
def is_valid_git_repo(sandbox_backend: SandboxBackendProtocol, repo_dir: str) -> bool:
|
|
"""Check if directory is a valid git repository."""
|
|
git_dir = f"{repo_dir}/.git"
|
|
safe_git_dir = shlex.quote(git_dir)
|
|
result = sandbox_backend.execute(f"test -d {safe_git_dir} && echo exists")
|
|
return result.exit_code == 0 and "exists" in result.output
|
|
|
|
|
|
def remove_directory(sandbox_backend: SandboxBackendProtocol, repo_dir: str) -> bool:
|
|
"""Remove a directory and all its contents."""
|
|
safe_repo_dir = shlex.quote(repo_dir)
|
|
result = sandbox_backend.execute(f"rm -rf {safe_repo_dir}")
|
|
return result.exit_code == 0
|
|
|
|
|
|
def git_has_uncommitted_changes(sandbox_backend: SandboxBackendProtocol, repo_dir: str) -> bool:
|
|
"""Check whether the repo has uncommitted changes."""
|
|
result = _run_git(sandbox_backend, repo_dir, "git status --porcelain")
|
|
return result.exit_code == 0 and bool(result.output.strip())
|
|
|
|
|
|
def git_fetch_origin(sandbox_backend: SandboxBackendProtocol, repo_dir: str) -> ExecuteResponse:
|
|
"""Fetch latest from origin (best-effort)."""
|
|
return _run_git(sandbox_backend, repo_dir, "git fetch origin 2>/dev/null || true")
|
|
|
|
|
|
def git_has_unpushed_commits(sandbox_backend: SandboxBackendProtocol, repo_dir: str) -> bool:
|
|
"""Check whether there are commits not pushed to upstream."""
|
|
git_log_cmd = (
|
|
"git log --oneline @{upstream}..HEAD 2>/dev/null "
|
|
"|| git log --oneline origin/HEAD..HEAD 2>/dev/null || echo ''"
|
|
)
|
|
result = _run_git(sandbox_backend, repo_dir, git_log_cmd)
|
|
return result.exit_code == 0 and bool(result.output.strip())
|
|
|
|
|
|
def git_current_branch(sandbox_backend: SandboxBackendProtocol, repo_dir: str) -> str:
|
|
"""Get the current git branch name."""
|
|
result = _run_git(sandbox_backend, repo_dir, "git rev-parse --abbrev-ref HEAD")
|
|
return result.output.strip() if result.exit_code == 0 else ""
|
|
|
|
|
|
def git_checkout_branch(
|
|
sandbox_backend: SandboxBackendProtocol, repo_dir: str, branch: str
|
|
) -> bool:
|
|
"""Checkout branch, creating it if needed."""
|
|
safe_branch = shlex.quote(branch)
|
|
checkout_result = _run_git(sandbox_backend, repo_dir, f"git checkout -B {safe_branch}")
|
|
if checkout_result.exit_code == 0:
|
|
return True
|
|
fallback_create = _run_git(sandbox_backend, repo_dir, f"git checkout -b {safe_branch}")
|
|
if fallback_create.exit_code == 0:
|
|
return True
|
|
fallback = _run_git(sandbox_backend, repo_dir, f"git checkout {safe_branch}")
|
|
return fallback.exit_code == 0
|
|
|
|
|
|
def git_config_user(
|
|
sandbox_backend: SandboxBackendProtocol,
|
|
repo_dir: str,
|
|
name: str,
|
|
email: str,
|
|
) -> None:
|
|
"""Configure git user name and email."""
|
|
safe_name = shlex.quote(name)
|
|
safe_email = shlex.quote(email)
|
|
_run_git(sandbox_backend, repo_dir, f"git config user.name {safe_name}")
|
|
_run_git(sandbox_backend, repo_dir, f"git config user.email {safe_email}")
|
|
|
|
|
|
def git_add_all(sandbox_backend: SandboxBackendProtocol, repo_dir: str) -> ExecuteResponse:
|
|
"""Stage all changes."""
|
|
return _run_git(sandbox_backend, repo_dir, "git add -A")
|
|
|
|
|
|
def git_commit(
|
|
sandbox_backend: SandboxBackendProtocol, repo_dir: str, message: str
|
|
) -> ExecuteResponse:
|
|
"""Commit staged changes with the given message."""
|
|
safe_message = shlex.quote(message)
|
|
return _run_git(sandbox_backend, repo_dir, f"git commit -m {safe_message}")
|
|
|
|
|
|
def git_get_remote_url(sandbox_backend: SandboxBackendProtocol, repo_dir: str) -> str | None:
|
|
"""Get the origin remote URL."""
|
|
result = _run_git(sandbox_backend, repo_dir, "git remote get-url origin")
|
|
if result.exit_code != 0:
|
|
return None
|
|
return result.output.strip()
|
|
|
|
|
|
_CRED_FILE_PATH = "/tmp/.git-credentials"
|
|
|
|
|
|
def setup_git_credentials(sandbox_backend: SandboxBackendProtocol, github_token: str) -> None:
|
|
"""Write GitHub credentials to a temporary file using the sandbox write API.
|
|
|
|
The write API sends content in the HTTP body (not via a shell command),
|
|
so the token never appears in shell history or process listings.
|
|
"""
|
|
sandbox_backend.write(_CRED_FILE_PATH, f"https://git:{github_token}@github.com\n")
|
|
sandbox_backend.execute(f"chmod 600 {_CRED_FILE_PATH}")
|
|
|
|
|
|
def cleanup_git_credentials(sandbox_backend: SandboxBackendProtocol) -> None:
|
|
"""Remove the temporary credentials file."""
|
|
sandbox_backend.execute(f"rm -f {_CRED_FILE_PATH}")
|
|
|
|
|
|
def _git_with_credentials(
|
|
sandbox_backend: SandboxBackendProtocol,
|
|
repo_dir: str,
|
|
command: str,
|
|
) -> ExecuteResponse:
|
|
"""Run a git command using the temporary credential file."""
|
|
cred_helper = shlex.quote(f"store --file={_CRED_FILE_PATH}")
|
|
return _run_git(sandbox_backend, repo_dir, f"git -c credential.helper={cred_helper} {command}")
|
|
|
|
|
|
def git_push(
|
|
sandbox_backend: SandboxBackendProtocol,
|
|
repo_dir: str,
|
|
branch: str,
|
|
github_token: str | None = None,
|
|
) -> ExecuteResponse:
|
|
"""Push the branch to origin, using a token if needed."""
|
|
safe_branch = shlex.quote(branch)
|
|
if not github_token:
|
|
return _run_git(sandbox_backend, repo_dir, f"git push origin {safe_branch}")
|
|
setup_git_credentials(sandbox_backend, github_token)
|
|
try:
|
|
return _git_with_credentials(sandbox_backend, repo_dir, f"push origin {safe_branch}")
|
|
finally:
|
|
cleanup_git_credentials(sandbox_backend)
|