"""Main entry point and CLI loop for galaxis-agent.""" # ruff: noqa: E402 # Suppress deprecation warnings from langchain_core (e.g., Pydantic V1 on Python 3.14+) # ruff: noqa: E402 import logging import shlex import warnings logger = logging.getLogger(__name__) from langgraph.config import get_config from langgraph.graph.state import RunnableConfig from langgraph.pregel import Pregel from langgraph_sdk import get_client warnings.filterwarnings("ignore", module="langchain_core._api.deprecation") import asyncio # Suppress Pydantic v1 compatibility warnings from langchain on Python 3.14+ warnings.filterwarnings("ignore", message=".*Pydantic V1.*", category=UserWarning) # Now safe to import agent (which imports LangChain modules) from deepagents import create_deep_agent from deepagents.backends.protocol import SandboxBackendProtocol from .middleware import ( ToolErrorMiddleware, check_message_queue_before_model, ensure_no_empty_msg, open_pr_if_needed, ) from .prompt import construct_system_prompt from .tools import ( commit_and_open_pr, discord_reply, fetch_url, gitea_comment, http_request, ) from .utils.auth import get_gitea_token from .utils.model import make_model from .utils.sandbox import create_sandbox client = get_client() SANDBOX_CREATING = "__creating__" SANDBOX_CREATION_TIMEOUT = 180 SANDBOX_POLL_INTERVAL = 1.0 from .utils.agents_md import read_agents_md_in_sandbox from .utils.git_utils import ( _CRED_FILE_PATH, cleanup_git_credentials, git_has_uncommitted_changes, is_valid_git_repo, remove_directory, setup_git_credentials, ) from .utils.sandbox_paths import aresolve_repo_dir, aresolve_sandbox_work_dir from .utils.sandbox_state import SANDBOX_BACKENDS, get_sandbox_id_from_metadata class SandboxConnectionError(Exception): """Raised when the sandbox connection is lost or unreachable.""" async def _clone_or_pull_repo_in_sandbox( # noqa: PLR0915 sandbox_backend: SandboxBackendProtocol, owner: str, repo: str, gitea_token: str | None = None, ) -> str: """Clone a Gitea repo into the sandbox, or pull if it already exists. Args: sandbox_backend: The sandbox backend to execute commands in owner: Gitea repo owner repo: Gitea repo name gitea_token: Gitea access token Returns: Path to the cloned/updated repo directory """ logger.info("_clone_or_pull_repo_in_sandbox called for %s/%s", owner, repo) loop = asyncio.get_event_loop() token = gitea_token if not token: msg = "No Gitea token provided" logger.error(msg) raise ValueError(msg) work_dir = await aresolve_sandbox_work_dir(sandbox_backend) repo_dir = await aresolve_repo_dir(sandbox_backend, repo) clean_url = f"http://gitea:3000/{owner}/{repo}.git" cred_helper_arg = f"-c credential.helper='store --file={_CRED_FILE_PATH}'" safe_repo_dir = shlex.quote(repo_dir) safe_clean_url = shlex.quote(clean_url) logger.info("Resolved sandbox work dir to %s", work_dir) # Set up git credentials using store file await loop.run_in_executor( None, sandbox_backend.execute, f'echo "http://agent:{token}@gitea:3000" > /tmp/.git-credentials', ) await loop.run_in_executor( None, sandbox_backend.execute, "git config --global credential.helper 'store --file=/tmp/.git-credentials'", ) is_git_repo = await loop.run_in_executor(None, is_valid_git_repo, sandbox_backend, repo_dir) if not is_git_repo: logger.warning("Repo directory missing or not a valid git repo at %s, removing", repo_dir) try: removed = await loop.run_in_executor(None, remove_directory, sandbox_backend, repo_dir) if not removed: msg = f"Failed to remove invalid directory at {repo_dir}" logger.error(msg) raise RuntimeError(msg) logger.info("Removed invalid directory, will clone fresh repo") except Exception: logger.exception("Failed to remove invalid directory") raise else: logger.info("Repo exists at %s, checking for uncommitted changes", repo_dir) has_changes = await loop.run_in_executor( None, git_has_uncommitted_changes, sandbox_backend, repo_dir ) if has_changes: logger.warning("Repo has uncommitted changes at %s, skipping pull", repo_dir) return repo_dir logger.info("Repo is clean, pulling latest changes from %s/%s", owner, repo) try: pull_result = await loop.run_in_executor( None, sandbox_backend.execute, f"cd {repo_dir} && git {cred_helper_arg} pull origin $(git rev-parse --abbrev-ref HEAD)", ) logger.debug("Git pull result: exit_code=%s", pull_result.exit_code) if pull_result.exit_code != 0: logger.warning( "Git pull failed with exit code %s: %s", pull_result.exit_code, pull_result.output[:200] if pull_result.output else "", ) except Exception: logger.exception("Failed to execute git pull") raise finally: await loop.run_in_executor(None, cleanup_git_credentials, sandbox_backend) logger.info("Repo updated at %s", repo_dir) return repo_dir logger.info("Cloning repo %s/%s to %s", owner, repo, repo_dir) try: result = await loop.run_in_executor( None, sandbox_backend.execute, f"git {cred_helper_arg} clone {safe_clean_url} {safe_repo_dir}", ) logger.debug("Git clone result: exit_code=%s", result.exit_code) except Exception: logger.exception("Failed to execute git clone") raise finally: await loop.run_in_executor(None, cleanup_git_credentials, sandbox_backend) if result.exit_code != 0: msg = f"Failed to clone repo {owner}/{repo}: {result.output}" logger.error(msg) raise RuntimeError(msg) logger.info("Repo cloned successfully at %s", repo_dir) return repo_dir async def _recreate_sandbox( thread_id: str, repo_owner: str, repo_name: str, *, gitea_token: str | None, ) -> tuple[SandboxBackendProtocol, str]: """Recreate a sandbox and clone the repo after a connection failure.""" SANDBOX_BACKENDS.pop(thread_id, None) await client.threads.update( thread_id=thread_id, metadata={"sandbox_id": SANDBOX_CREATING}, ) try: sandbox_backend = await asyncio.to_thread(create_sandbox) repo_dir = await _clone_or_pull_repo_in_sandbox( sandbox_backend, repo_owner, repo_name, gitea_token ) except Exception: logger.exception("Failed to recreate sandbox after connection failure") await client.threads.update(thread_id=thread_id, metadata={"sandbox_id": None}) raise return sandbox_backend, repo_dir async def _wait_for_sandbox_id(thread_id: str) -> str: """Wait for sandbox_id to be set in thread metadata.""" elapsed = 0.0 while elapsed < SANDBOX_CREATION_TIMEOUT: sandbox_id = await get_sandbox_id_from_metadata(thread_id) if sandbox_id is not None and sandbox_id != SANDBOX_CREATING: return sandbox_id await asyncio.sleep(SANDBOX_POLL_INTERVAL) elapsed += SANDBOX_POLL_INTERVAL msg = f"Timeout waiting for sandbox creation for thread {thread_id}" raise TimeoutError(msg) def graph_loaded_for_execution(config: RunnableConfig) -> bool: """Check if the graph is loaded for actual execution vs introspection.""" return ( config["configurable"].get("__is_for_execution__", False) if "configurable" in config else False ) DEFAULT_RECURSION_LIMIT = 1_000 async def get_agent(config: RunnableConfig) -> Pregel: # noqa: PLR0915 """Get or create an agent with a sandbox for the given thread.""" thread_id = config["configurable"].get("thread_id", None) config["recursion_limit"] = DEFAULT_RECURSION_LIMIT repo_config = config["configurable"].get("repo", {}) repo_owner = repo_config.get("owner") repo_name = repo_config.get("name") if thread_id is None or not graph_loaded_for_execution(config): logger.info("No thread_id or not for execution, returning agent without sandbox") return create_deep_agent( system_prompt="", tools=[], ).with_config(config) gitea_token = await get_gitea_token() config["metadata"]["gitea_token"] = gitea_token sandbox_backend = SANDBOX_BACKENDS.get(thread_id) sandbox_id = await get_sandbox_id_from_metadata(thread_id) if sandbox_id == SANDBOX_CREATING and not sandbox_backend: logger.info("Sandbox creation in progress, waiting...") sandbox_id = await _wait_for_sandbox_id(thread_id) if sandbox_backend: logger.info("Using cached sandbox backend for thread %s", thread_id) metadata = get_config().get("metadata", {}) repo_dir = metadata.get("repo_dir") if repo_owner and repo_name: logger.info("Pulling latest changes for repo %s/%s", repo_owner, repo_name) try: repo_dir = await _clone_or_pull_repo_in_sandbox( sandbox_backend, repo_owner, repo_name, gitea_token ) except SandboxConnectionError: logger.warning( "Cached sandbox is no longer reachable for thread %s, recreating sandbox", thread_id, ) sandbox_backend, repo_dir = await _recreate_sandbox( thread_id, repo_owner, repo_name, gitea_token=gitea_token ) except Exception: logger.exception("Failed to pull repo in cached sandbox") raise elif sandbox_id is None: logger.info("Creating new sandbox for thread %s", thread_id) await client.threads.update(thread_id=thread_id, metadata={"sandbox_id": SANDBOX_CREATING}) try: # Create sandbox without context manager cleanup (sandbox persists) sandbox_backend = await asyncio.to_thread(create_sandbox) logger.info("Sandbox created: %s", sandbox_backend.id) repo_dir = None if repo_owner and repo_name: logger.info("Cloning repo %s/%s into sandbox", repo_owner, repo_name) repo_dir = await _clone_or_pull_repo_in_sandbox( sandbox_backend, repo_owner, repo_name, gitea_token ) logger.info("Repo cloned to %s", repo_dir) await client.threads.update( thread_id=thread_id, metadata={"repo_dir": repo_dir}, ) except Exception: logger.exception("Failed to create sandbox or clone repo") try: await client.threads.update(thread_id=thread_id, metadata={"sandbox_id": None}) logger.info("Reset sandbox_id to None for thread %s", thread_id) except Exception: logger.exception("Failed to reset sandbox_id metadata") raise else: logger.info("Connecting to existing sandbox %s", sandbox_id) try: # Connect to existing sandbox without context manager cleanup sandbox_backend = await asyncio.to_thread(create_sandbox, sandbox_id) logger.info("Connected to existing sandbox %s", sandbox_id) except Exception: logger.warning("Failed to connect to existing sandbox %s, creating new one", sandbox_id) # Reset sandbox_id and create a new sandbox await client.threads.update( thread_id=thread_id, metadata={"sandbox_id": SANDBOX_CREATING}, ) try: sandbox_backend = await asyncio.to_thread(create_sandbox) logger.info("New sandbox created: %s", sandbox_backend.id) except Exception: logger.exception("Failed to create replacement sandbox") await client.threads.update(thread_id=thread_id, metadata={"sandbox_id": None}) raise metadata = get_config().get("metadata", {}) repo_dir = metadata.get("repo_dir") if repo_owner and repo_name: logger.info("Pulling latest changes for repo %s/%s", repo_owner, repo_name) try: repo_dir = await _clone_or_pull_repo_in_sandbox( sandbox_backend, repo_owner, repo_name, gitea_token ) except SandboxConnectionError: logger.warning( "Existing sandbox is no longer reachable for thread %s, recreating sandbox", thread_id, ) sandbox_backend, repo_dir = await _recreate_sandbox( thread_id, repo_owner, repo_name, gitea_token=gitea_token ) except Exception: logger.exception("Failed to pull repo in existing sandbox") raise SANDBOX_BACKENDS[thread_id] = sandbox_backend if not repo_dir: msg = "Cannot proceed: no repo was cloned. Set 'repo.owner' and 'repo.name' in the configurable config" raise RuntimeError(msg) agents_md = await read_agents_md_in_sandbox(sandbox_backend, repo_dir) logger.info("Returning agent with sandbox for thread %s", thread_id) return create_deep_agent( model=make_model("anthropic:claude-opus-4-6", temperature=0, max_tokens=20_000), system_prompt=construct_system_prompt( repo_dir, agents_md=agents_md, ), tools=[ http_request, fetch_url, commit_and_open_pr, gitea_comment, discord_reply, ], backend=sandbox_backend, middleware=[ ToolErrorMiddleware(), check_message_queue_before_model, ensure_no_empty_msg, open_pr_if_needed, ], ).with_config(config)