# galaxis-agent/agent/integrations/docker_sandbox.py

"""Docker container sandbox backend.
execute() is synchronous. server.py calls it via loop.run_in_executor().
"""
from __future__ import annotations
import logging
import os
import docker
from deepagents.backends.protocol import ExecuteResponse, FileDownloadResponse, FileUploadResponse
from deepagents.backends.sandbox import BaseSandbox
logger = logging.getLogger(__name__)
class DockerSandbox(BaseSandbox):
    """Docker container-based sandbox implementation.

    Extends BaseSandbox, which auto-implements file I/O (read/write/ls/grep)
    by delegating to execute(). Only need to implement: id property, execute(),
    upload_files(), download_files(), and container lifecycle.
    """

    def __init__(
        self,
        container_id: str | None = None,
        *,
        image: str | None = None,
        network: str = "galaxis-net",
        mem_limit: str = "4g",
        cpu_count: int = 2,
        pids_limit: int = 256,
        environment: dict | None = None,
        default_timeout: int = 300,
    ):
        """Attach to an existing container or create a fresh sandbox container.

        Args:
            container_id: If given, connect to this existing container; all
                creation kwargs (image, network, limits, environment) are
                then ignored.
            image: Image for a new container. Falls back to $SANDBOX_IMAGE,
                then "galaxis-sandbox:latest".
            network: Docker network the new container joins.
            mem_limit: Memory cap for the new container (docker syntax, e.g. "4g").
            cpu_count: CPU limit for the new container.
            pids_limit: Max processes inside the new container.
            environment: Extra environment variables for the new container.
            default_timeout: Default per-command timeout in seconds used by
                execute() when no explicit timeout is passed.
        """
        self._docker = docker.DockerClient(
            base_url=os.environ.get("DOCKER_HOST", "unix:///var/run/docker.sock")
        )
        self._default_timeout = default_timeout
        if container_id:
            # Reconnect to an already-running sandbox.
            self._container = self._docker.containers.get(container_id)
        else:
            resolved_image = image or os.environ.get("SANDBOX_IMAGE", "galaxis-sandbox:latest")
            self._container = self._docker.containers.run(
                image=resolved_image,
                detach=True,
                network=network,
                mem_limit=mem_limit,
                cpu_count=cpu_count,
                pids_limit=pids_limit,
                environment=environment or {},
                # Label lets orphaned sandbox containers be found and reaped.
                labels={"galaxis-agent-sandbox": "true"},
                working_dir="/workspace",
            )
        self._id = self._container.id

    @property
    def id(self) -> str:
        """Id of the Docker container backing this sandbox."""
        return self._id

    def execute(
        self,
        command: str,
        *,
        timeout: int | None = None,
    ) -> ExecuteResponse:
        """Execute a shell command in the container.

        Synchronous method - server.py calls via loop.run_in_executor().

        Args:
            command: Shell command string, run via ``sh -c``.
            timeout: Seconds before the command is killed; defaults to the
                instance's default_timeout. A value <= 0 disables the limit.

        Returns:
            ExecuteResponse with combined stdout+stderr and the exit code
            (124 means the command was killed by ``timeout``).
        """
        effective_timeout = timeout if timeout is not None else self._default_timeout
        # Enforce the limit with coreutils `timeout` inside the container
        # (assumes the sandbox image ships it — TODO confirm); exit code 124
        # is its timed-out convention.
        if effective_timeout and effective_timeout > 0:
            cmd = ["timeout", str(effective_timeout), "sh", "-c", command]
        else:
            cmd = ["sh", "-c", command]
        # demux=True makes exec_run return output as a (stdout, stderr)
        # tuple; either element may be None when that stream was empty.
        result = self._container.exec_run(cmd=cmd, demux=True, workdir="/workspace")
        stdout = (result.output[0] or b"").decode("utf-8", errors="replace")
        stderr = (result.output[1] or b"").decode("utf-8", errors="replace")
        output = stdout + stderr
        exit_code = result.exit_code
        if exit_code == 124:
            output += f"\n[TIMEOUT] Command timed out after {effective_timeout}s"
        return ExecuteResponse(output=output, exit_code=exit_code, truncated=False)

    def upload_files(self, files: list[tuple[str, bytes]]) -> list[FileUploadResponse]:
        """Upload multiple files to the sandbox.

        Supports partial success - returns errors per-file rather than raising.
        """
        # Hoisted out of the per-file loop (were re-imported every iteration).
        import io
        import tarfile

        responses: list[FileUploadResponse] = []
        for path, content in files:
            try:
                dest_dir = os.path.dirname(path) or "/workspace"
                # put_archive fails if the destination directory does not
                # exist, so create it (and any parents) first.
                self._container.exec_run(["mkdir", "-p", dest_dir])
                # The Docker API only accepts tar archives, so wrap the
                # single file in an in-memory tar.
                tar_stream = io.BytesIO()
                with tarfile.open(fileobj=tar_stream, mode="w") as tar:
                    tarinfo = tarfile.TarInfo(name=os.path.basename(path))
                    tarinfo.size = len(content)
                    tar.addfile(tarinfo, io.BytesIO(content))
                tar_stream.seek(0)
                self._container.put_archive(dest_dir, tar_stream)
                responses.append(FileUploadResponse(path=path, error=None))
            except Exception as e:
                logger.exception("Failed to upload file: %s", path)
                responses.append(FileUploadResponse(path=path, error=str(e)))
        return responses

    def download_files(self, paths: list[str]) -> list[FileDownloadResponse]:
        """Download multiple files from the sandbox.

        Supports partial success - returns errors per-file rather than raising.
        """
        # Hoisted out of the per-file loop (were re-imported every iteration).
        import io
        import tarfile

        responses: list[FileDownloadResponse] = []
        for path in paths:
            try:
                # get_archive streams the file wrapped in a tar archive.
                bits, _stat = self._container.get_archive(path)
                tar_stream = io.BytesIO(b"".join(bits))
                # Context managers ensure the tar and extracted-file handles
                # are closed even on the error branches (the original leaked
                # them when an exception fired mid-loop).
                with tarfile.open(fileobj=tar_stream) as tar:
                    member = tar.next()
                    if member is None:
                        responses.append(
                            FileDownloadResponse(path=path, content=None, error="Empty archive")
                        )
                        continue
                    extracted = tar.extractfile(member)
                    if extracted is None:
                        # Directories and links have no extractable content.
                        responses.append(
                            FileDownloadResponse(path=path, content=None, error="Not a file")
                        )
                        continue
                    with extracted:
                        content = extracted.read()
                responses.append(FileDownloadResponse(path=path, content=content, error=None))
            except Exception as e:
                logger.exception("Failed to download file: %s", path)
                responses.append(FileDownloadResponse(path=path, content=None, error=str(e)))
        return responses

    def close(self):
        """Stop and remove the container.

        Best-effort: a missing container is ignored; any other failure is
        logged and swallowed so shutdown never raises.
        """
        try:
            self._container.stop(timeout=10)
            self._container.remove(force=True)
        except docker.errors.NotFound:
            pass
        except Exception:
            logger.exception("Failed to remove container: %s", self._id)