galaxis-agent/agent/server.py

390 lines
14 KiB
Python

"""Main entry point and CLI loop for galaxis-agent."""
# ruff: noqa: E402
# Suppress deprecation warnings from langchain_core (e.g., Pydantic V1 on Python 3.14+)
# ruff: noqa: E402
import logging
import shlex
import warnings
logger = logging.getLogger(__name__)
from langgraph.config import get_config
from langgraph.graph.state import RunnableConfig
from langgraph.pregel import Pregel
from langgraph_sdk import get_client
warnings.filterwarnings("ignore", module="langchain_core._api.deprecation")
import asyncio
# Suppress Pydantic v1 compatibility warnings from langchain on Python 3.14+
warnings.filterwarnings("ignore", message=".*Pydantic V1.*", category=UserWarning)
# Now safe to import agent (which imports LangChain modules)
from deepagents import create_deep_agent
from deepagents.backends.protocol import SandboxBackendProtocol
from .middleware import (
ToolErrorMiddleware,
check_message_queue_before_model,
ensure_no_empty_msg,
open_pr_if_needed,
)
from .prompt import construct_system_prompt
from .tools import (
commit_and_open_pr,
discord_reply,
fetch_url,
gitea_comment,
http_request,
)
from .utils.auth import get_gitea_token
from .utils.model import make_model
from .utils.sandbox import create_sandbox
# LangGraph SDK client; this module uses it to update thread metadata.
client = get_client()
# Placeholder stored under the thread's "sandbox_id" metadata key while a
# sandbox is being created, so concurrent runs wait instead of creating another.
SANDBOX_CREATING = "__creating__"
# Upper bound, in seconds, that _wait_for_sandbox_id polls before raising TimeoutError.
SANDBOX_CREATION_TIMEOUT = 180
# Pause, in seconds, between successive metadata polls in _wait_for_sandbox_id.
SANDBOX_POLL_INTERVAL = 1.0
from .utils.agents_md import read_agents_md_in_sandbox
from .utils.git_utils import (
_CRED_FILE_PATH,
cleanup_git_credentials,
git_has_uncommitted_changes,
is_valid_git_repo,
remove_directory,
setup_git_credentials,
)
from .utils.sandbox_paths import aresolve_repo_dir, aresolve_sandbox_work_dir
from .utils.sandbox_state import SANDBOX_BACKENDS, get_sandbox_id_from_metadata
class SandboxConnectionError(Exception):
    """Signal that a sandbox can no longer be reached.

    ``get_agent`` catches this error around repo sync and responds by
    recreating the sandbox for the thread.
    """
async def _clone_or_pull_repo_in_sandbox(  # noqa: PLR0915
    sandbox_backend: SandboxBackendProtocol,
    owner: str,
    repo: str,
    gitea_token: str | None = None,
) -> str:
    """Clone a Gitea repo into the sandbox, or pull if it already exists.

    The Gitea token is written to a git credential-store file inside the
    sandbox for the duration of the call and removed again on every exit
    path, including the early return taken when the working tree is dirty.

    Args:
        sandbox_backend: The sandbox backend to execute commands in
        owner: Gitea repo owner
        repo: Gitea repo name
        gitea_token: Gitea access token

    Returns:
        Path to the cloned/updated repo directory

    Raises:
        ValueError: If no Gitea token is provided.
        RuntimeError: If an invalid repo directory cannot be removed, or if
            ``git clone`` exits with a non-zero status.
    """
    logger.info("_clone_or_pull_repo_in_sandbox called for %s/%s", owner, repo)
    loop = asyncio.get_running_loop()

    token = gitea_token
    if not token:
        msg = "No Gitea token provided"
        logger.error(msg)
        raise ValueError(msg)

    work_dir = await aresolve_sandbox_work_dir(sandbox_backend)
    repo_dir = await aresolve_repo_dir(sandbox_backend, repo)
    clean_url = f"http://gitea:3000/{owner}/{repo}.git"
    cred_helper_arg = f"-c credential.helper='store --file={_CRED_FILE_PATH}'"
    safe_repo_dir = shlex.quote(repo_dir)
    safe_clean_url = shlex.quote(clean_url)
    logger.info("Resolved sandbox work dir to %s", work_dir)

    async def run_blocking(func, *args):
        """Run a blocking sandbox call in the default executor."""
        return await loop.run_in_executor(None, func, *args)

    # Quote the credential line so shell metacharacters in the token can
    # neither break the command nor be interpreted by the sandbox shell.
    safe_credential = shlex.quote(f"http://agent:{token}@gitea:3000")

    try:
        # Set up git credentials using store file.
        # NOTE(review): the path is hard-coded here while the per-command
        # helper above uses _CRED_FILE_PATH; confirm both name the same file,
        # otherwise cleanup_git_credentials may not remove this one.
        await run_blocking(
            sandbox_backend.execute,
            f"echo {safe_credential} > /tmp/.git-credentials",
        )
        await run_blocking(
            sandbox_backend.execute,
            "git config --global credential.helper 'store --file=/tmp/.git-credentials'",
        )

        is_git_repo = await run_blocking(is_valid_git_repo, sandbox_backend, repo_dir)

        if is_git_repo:
            logger.info("Repo exists at %s, checking for uncommitted changes", repo_dir)
            has_changes = await run_blocking(
                git_has_uncommitted_changes, sandbox_backend, repo_dir
            )
            if has_changes:
                # Pulling over local edits could conflict; leave the tree alone.
                logger.warning("Repo has uncommitted changes at %s, skipping pull", repo_dir)
                return repo_dir

            logger.info("Repo is clean, pulling latest changes from %s/%s", owner, repo)
            try:
                pull_result = await run_blocking(
                    sandbox_backend.execute,
                    f"cd {safe_repo_dir} && git {cred_helper_arg} pull origin $(git rev-parse --abbrev-ref HEAD)",
                )
                logger.debug("Git pull result: exit_code=%s", pull_result.exit_code)
                if pull_result.exit_code != 0:
                    # A failed pull is tolerated: the existing checkout is still usable.
                    logger.warning(
                        "Git pull failed with exit code %s: %s",
                        pull_result.exit_code,
                        pull_result.output[:200] if pull_result.output else "",
                    )
            except Exception:
                logger.exception("Failed to execute git pull")
                raise
            logger.info("Repo updated at %s", repo_dir)
            return repo_dir

        logger.warning("Repo directory missing or not a valid git repo at %s, removing", repo_dir)
        try:
            removed = await run_blocking(remove_directory, sandbox_backend, repo_dir)
            if not removed:
                msg = f"Failed to remove invalid directory at {repo_dir}"
                logger.error(msg)
                raise RuntimeError(msg)
            logger.info("Removed invalid directory, will clone fresh repo")
        except Exception:
            logger.exception("Failed to remove invalid directory")
            raise

        logger.info("Cloning repo %s/%s to %s", owner, repo, repo_dir)
        try:
            result = await run_blocking(
                sandbox_backend.execute,
                f"git {cred_helper_arg} clone {safe_clean_url} {safe_repo_dir}",
            )
            logger.debug("Git clone result: exit_code=%s", result.exit_code)
        except Exception:
            logger.exception("Failed to execute git clone")
            raise

        if result.exit_code != 0:
            msg = f"Failed to clone repo {owner}/{repo}: {result.output}"
            logger.error(msg)
            raise RuntimeError(msg)

        logger.info("Repo cloned successfully at %s", repo_dir)
        return repo_dir
    finally:
        # Single cleanup point so the token never outlives this call,
        # whichever branch returned or raised.
        await run_blocking(cleanup_git_credentials, sandbox_backend)
async def _recreate_sandbox(
    thread_id: str,
    repo_owner: str,
    repo_name: str,
    *,
    gitea_token: str | None,
) -> tuple[SandboxBackendProtocol, str]:
    """Recreate a sandbox and clone the repo after a connection failure.

    Drops the cached backend for the thread, marks the thread metadata as
    "creating", then builds a fresh sandbox and clones the repo into it.

    Args:
        thread_id: Thread whose sandbox is being replaced.
        repo_owner: Gitea repo owner.
        repo_name: Gitea repo name.
        gitea_token: Gitea access token used for the clone.

    Returns:
        The new sandbox backend and the repo directory inside it.

    Raises:
        Exception: Whatever sandbox creation or cloning raised; the thread's
            ``sandbox_id`` metadata is reset to ``None`` before re-raising.
    """
    # Forget the unreachable backend so later calls do not reuse it.
    SANDBOX_BACKENDS.pop(thread_id, None)
    await client.threads.update(
        thread_id=thread_id,
        metadata={"sandbox_id": SANDBOX_CREATING},
    )
    try:
        sandbox_backend = await asyncio.to_thread(create_sandbox)
        repo_dir = await _clone_or_pull_repo_in_sandbox(
            sandbox_backend, repo_owner, repo_name, gitea_token
        )
    except Exception:
        logger.exception("Failed to recreate sandbox after connection failure")
        # Best-effort reset: a failing metadata update must not mask the
        # original error, which the bare ``raise`` below re-raises.
        try:
            await client.threads.update(thread_id=thread_id, metadata={"sandbox_id": None})
        except Exception:
            logger.exception("Failed to reset sandbox_id metadata")
        raise
    # NOTE(review): on success the metadata still holds SANDBOX_CREATING here;
    # presumably the real sandbox id is recorded elsewhere — confirm.
    return sandbox_backend, repo_dir
async def _wait_for_sandbox_id(thread_id: str) -> str:
    """Wait for sandbox_id to be set in thread metadata.

    Polls the thread metadata every ``SANDBOX_POLL_INTERVAL`` seconds until
    the stored id is a real value, i.e. neither ``None`` nor the
    ``SANDBOX_CREATING`` placeholder.

    Args:
        thread_id: Thread whose metadata is polled.

    Returns:
        The sandbox id found in the thread metadata.

    Raises:
        TimeoutError: If no real sandbox id appears within
            ``SANDBOX_CREATION_TIMEOUT`` seconds.
    """
    loop = asyncio.get_running_loop()
    # Use a monotonic deadline so time spent awaiting the metadata fetch
    # counts toward the timeout, not just the sleeps between polls.
    deadline = loop.time() + SANDBOX_CREATION_TIMEOUT
    while loop.time() < deadline:
        sandbox_id = await get_sandbox_id_from_metadata(thread_id)
        if sandbox_id is not None and sandbox_id != SANDBOX_CREATING:
            return sandbox_id
        await asyncio.sleep(SANDBOX_POLL_INTERVAL)
    msg = f"Timeout waiting for sandbox creation for thread {thread_id}"
    raise TimeoutError(msg)
def graph_loaded_for_execution(config: RunnableConfig) -> bool:
    """Tell whether the graph was loaded to run, as opposed to being introspected.

    Args:
        config: Runnable config passed to the graph factory.

    Returns:
        The ``__is_for_execution__`` entry of ``config["configurable"]``, or
        ``False`` when either the section or the entry is absent.
    """
    if "configurable" not in config:
        return False
    configurable = config["configurable"]
    return configurable.get("__is_for_execution__", False)
# Value get_agent writes to config["recursion_limit"] on every call.
DEFAULT_RECURSION_LIMIT = 1_000
async def _pull_repo_or_recreate_sandbox(
    sandbox_backend: SandboxBackendProtocol,
    thread_id: str,
    repo_owner: str,
    repo_name: str,
    *,
    gitea_token: str | None,
    sandbox_kind: str,
) -> tuple[SandboxBackendProtocol, str]:
    """Sync the repo in ``sandbox_backend``, recreating the sandbox if it is unreachable.

    ``sandbox_kind`` ("cached" or "existing") only selects the log wording.
    Returns the backend to use from now on and the repo directory inside it.
    """
    logger.info("Pulling latest changes for repo %s/%s", repo_owner, repo_name)
    try:
        repo_dir = await _clone_or_pull_repo_in_sandbox(
            sandbox_backend, repo_owner, repo_name, gitea_token
        )
    except SandboxConnectionError:
        # NOTE(review): nothing visible in this module raises
        # SandboxConnectionError — confirm the sandbox helpers do, otherwise
        # this recovery path is never taken.
        logger.warning(
            "%s sandbox is no longer reachable for thread %s, recreating sandbox",
            sandbox_kind.capitalize(),
            thread_id,
        )
        return await _recreate_sandbox(
            thread_id, repo_owner, repo_name, gitea_token=gitea_token
        )
    except Exception:
        logger.exception("Failed to pull repo in %s sandbox", sandbox_kind)
        raise
    return sandbox_backend, repo_dir


async def get_agent(config: RunnableConfig) -> Pregel:  # noqa: PLR0915
    """Get or create an agent with a sandbox for the given thread.

    When there is no thread id, or the graph is only being loaded for
    introspection, a bare agent without sandbox or tools is returned.
    Otherwise one of three paths supplies the sandbox:

    * a backend cached in ``SANDBOX_BACKENDS`` for the thread is reused;
    * with no sandbox id in the thread metadata, a new sandbox is created;
    * with a sandbox id in the metadata, that sandbox is reconnected, falling
      back to a fresh sandbox if the connection attempt fails.

    In every path the configured repo is cloned or pulled into the sandbox.

    Args:
        config: Runnable config. ``configurable`` may carry ``thread_id`` and
            ``repo`` (``owner`` / ``name``). ``recursion_limit`` and
            ``metadata["gitea_token"]`` are written into it.

    Returns:
        The configured agent graph.

    Raises:
        RuntimeError: If no repo directory is available after sandbox setup.
        TimeoutError: If another run's sandbox creation does not finish in time.
    """
    # Tolerate a config without "configurable", as graph_loaded_for_execution does.
    configurable = config.get("configurable", {})
    thread_id = configurable.get("thread_id")
    config["recursion_limit"] = DEFAULT_RECURSION_LIMIT

    repo_config = configurable.get("repo") or {}
    repo_owner = repo_config.get("owner")
    repo_name = repo_config.get("name")

    if thread_id is None or not graph_loaded_for_execution(config):
        logger.info("No thread_id or not for execution, returning agent without sandbox")
        return create_deep_agent(
            system_prompt="",
            tools=[],
        ).with_config(config)

    gitea_token = await get_gitea_token()
    # setdefault avoids a KeyError when the config carries no metadata yet.
    config.setdefault("metadata", {})["gitea_token"] = gitea_token

    sandbox_backend = SANDBOX_BACKENDS.get(thread_id)
    sandbox_id = await get_sandbox_id_from_metadata(thread_id)

    if sandbox_id == SANDBOX_CREATING and not sandbox_backend:
        # Another run is creating the sandbox; wait for its id.
        logger.info("Sandbox creation in progress, waiting...")
        sandbox_id = await _wait_for_sandbox_id(thread_id)

    if sandbox_backend:
        logger.info("Using cached sandbox backend for thread %s", thread_id)
        metadata = get_config().get("metadata", {})
        repo_dir = metadata.get("repo_dir")
        if repo_owner and repo_name:
            sandbox_backend, repo_dir = await _pull_repo_or_recreate_sandbox(
                sandbox_backend,
                thread_id,
                repo_owner,
                repo_name,
                gitea_token=gitea_token,
                sandbox_kind="cached",
            )
    elif sandbox_id is None:
        logger.info("Creating new sandbox for thread %s", thread_id)
        await client.threads.update(thread_id=thread_id, metadata={"sandbox_id": SANDBOX_CREATING})
        try:
            # Create sandbox without context manager cleanup (sandbox persists)
            sandbox_backend = await asyncio.to_thread(create_sandbox)
            logger.info("Sandbox created: %s", sandbox_backend.id)
            # NOTE(review): nothing here replaces the SANDBOX_CREATING
            # placeholder with sandbox_backend.id; presumably that is
            # recorded elsewhere — confirm.
            repo_dir = None
            if repo_owner and repo_name:
                logger.info("Cloning repo %s/%s into sandbox", repo_owner, repo_name)
                repo_dir = await _clone_or_pull_repo_in_sandbox(
                    sandbox_backend, repo_owner, repo_name, gitea_token
                )
                logger.info("Repo cloned to %s", repo_dir)
                await client.threads.update(
                    thread_id=thread_id,
                    metadata={"repo_dir": repo_dir},
                )
        except Exception:
            logger.exception("Failed to create sandbox or clone repo")
            # Best-effort reset so a later run can retry creation; the
            # original error is re-raised either way.
            try:
                await client.threads.update(thread_id=thread_id, metadata={"sandbox_id": None})
                logger.info("Reset sandbox_id to None for thread %s", thread_id)
            except Exception:
                logger.exception("Failed to reset sandbox_id metadata")
            raise
    else:
        logger.info("Connecting to existing sandbox %s", sandbox_id)
        try:
            # Connect to existing sandbox without context manager cleanup
            sandbox_backend = await asyncio.to_thread(create_sandbox, sandbox_id)
            logger.info("Connected to existing sandbox %s", sandbox_id)
        except Exception:
            logger.warning("Failed to connect to existing sandbox %s, creating new one", sandbox_id)
            # Reset sandbox_id and create a new sandbox
            await client.threads.update(
                thread_id=thread_id,
                metadata={"sandbox_id": SANDBOX_CREATING},
            )
            try:
                sandbox_backend = await asyncio.to_thread(create_sandbox)
                logger.info("New sandbox created: %s", sandbox_backend.id)
            except Exception:
                logger.exception("Failed to create replacement sandbox")
                await client.threads.update(thread_id=thread_id, metadata={"sandbox_id": None})
                raise

        metadata = get_config().get("metadata", {})
        repo_dir = metadata.get("repo_dir")
        if repo_owner and repo_name:
            sandbox_backend, repo_dir = await _pull_repo_or_recreate_sandbox(
                sandbox_backend,
                thread_id,
                repo_owner,
                repo_name,
                gitea_token=gitea_token,
                sandbox_kind="existing",
            )

    # Cache the backend so later calls for this thread take the first branch.
    SANDBOX_BACKENDS[thread_id] = sandbox_backend

    if not repo_dir:
        msg = "Cannot proceed: no repo was cloned. Set 'repo.owner' and 'repo.name' in the configurable config"
        raise RuntimeError(msg)

    agents_md = await read_agents_md_in_sandbox(sandbox_backend, repo_dir)

    logger.info("Returning agent with sandbox for thread %s", thread_id)
    return create_deep_agent(
        model=make_model("anthropic:claude-opus-4-6", temperature=0, max_tokens=20_000),
        system_prompt=construct_system_prompt(
            repo_dir,
            agents_md=agents_md,
        ),
        tools=[
            http_request,
            fetch_url,
            commit_and_open_pr,
            gitea_comment,
            discord_reply,
        ],
        backend=sandbox_backend,
        middleware=[
            ToolErrorMiddleware(),
            check_message_queue_before_model,
            ensure_no_empty_msg,
            open_pr_if_needed,
        ],
    ).with_config(config)