"""Docker container sandbox backend. execute() is synchronous. server.py calls it via loop.run_in_executor(). """ from __future__ import annotations import logging import os import docker from deepagents.backends.protocol import ExecuteResponse, FileDownloadResponse, FileUploadResponse from deepagents.backends.sandbox import BaseSandbox logger = logging.getLogger(__name__) class DockerSandbox(BaseSandbox): """Docker container-based sandbox implementation. Extends BaseSandbox, which auto-implements file I/O (read/write/ls/grep) by delegating to execute(). Only need to implement: id property, execute(), upload_files(), download_files(), and container lifecycle. """ def __init__( self, container_id: str | None = None, *, image: str | None = None, network: str = "galaxis-net", mem_limit: str = "4g", cpu_count: int = 2, pids_limit: int = 256, environment: dict | None = None, default_timeout: int = 300, ): self._docker = docker.DockerClient( base_url=os.environ.get("DOCKER_HOST", "unix:///var/run/docker.sock") ) self._default_timeout = default_timeout if container_id: # Connect to existing container self._container = self._docker.containers.get(container_id) else: # Create new container resolved_image = image or os.environ.get("SANDBOX_IMAGE", "galaxis-sandbox:latest") self._container = self._docker.containers.run( image=resolved_image, detach=True, network=network, mem_limit=mem_limit, cpu_count=cpu_count, pids_limit=pids_limit, environment=environment or {}, labels={"galaxis-agent-sandbox": "true"}, working_dir="/workspace", ) self._id = self._container.id @property def id(self) -> str: return self._id def execute( self, command: str, *, timeout: int | None = None, ) -> ExecuteResponse: """Execute a shell command in the container. Synchronous method - server.py calls via loop.run_in_executor(). """ effective_timeout = timeout if timeout is not None else self._default_timeout # Wrap command with timeout if specified if effective_timeout and effective_timeout > 0: cmd = ["timeout", str(effective_timeout), "sh", "-c", command] else: cmd = ["sh", "-c", command] # Execute command in container with demux=True to separate stdout/stderr result = self._container.exec_run(cmd=cmd, demux=True, workdir="/workspace") # Decode output stdout = (result.output[0] or b"").decode("utf-8", errors="replace") stderr = (result.output[1] or b"").decode("utf-8", errors="replace") output = stdout + stderr exit_code = result.exit_code # Handle timeout exit code if exit_code == 124: output += f"\n[TIMEOUT] Command timed out after {effective_timeout}s" return ExecuteResponse(output=output, exit_code=exit_code, truncated=False) def upload_files(self, files: list[tuple[str, bytes]]) -> list[FileUploadResponse]: """Upload multiple files to the sandbox. Supports partial success - returns errors per-file rather than raising. """ responses = [] for path, content in files: try: # Use tar to upload file to container import io import tarfile # Create tar archive in memory tar_stream = io.BytesIO() tar = tarfile.open(fileobj=tar_stream, mode="w") # Add file to archive tarinfo = tarfile.TarInfo(name=os.path.basename(path)) tarinfo.size = len(content) tar.addfile(tarinfo, io.BytesIO(content)) tar.close() # Upload to container tar_stream.seek(0) self._container.put_archive(os.path.dirname(path) or "/workspace", tar_stream) responses.append(FileUploadResponse(path=path, error=None)) except Exception as e: logger.exception("Failed to upload file: %s", path) responses.append(FileUploadResponse(path=path, error=str(e))) return responses def download_files(self, paths: list[str]) -> list[FileDownloadResponse]: """Download multiple files from the sandbox. Supports partial success - returns errors per-file rather than raising. """ responses = [] for path in paths: try: # Get file from container as tar archive bits, stat = self._container.get_archive(path) # Extract content from tar import io import tarfile tar_stream = io.BytesIO(b"".join(bits)) tar = tarfile.open(fileobj=tar_stream) # Get first file from archive member = tar.next() if member: f = tar.extractfile(member) if f: content = f.read() responses.append(FileDownloadResponse(path=path, content=content, error=None)) else: responses.append(FileDownloadResponse(path=path, content=None, error="Not a file")) else: responses.append(FileDownloadResponse(path=path, content=None, error="Empty archive")) tar.close() except Exception as e: logger.exception("Failed to download file: %s", path) responses.append(FileDownloadResponse(path=path, content=None, error=str(e))) return responses def close(self): """Stop and remove the container.""" try: self._container.stop(timeout=10) self._container.remove(force=True) except docker.errors.NotFound: pass except Exception: logger.exception("Failed to remove container: %s", self._id)