feat: implement DockerSandbox with docker-py container management

Implement DockerSandbox extending deepagents' BaseSandbox to manage Docker
containers via docker-py. This completes Task 1 of Phase 2.

Key implementation details:
- Extends BaseSandbox which auto-implements file I/O (read/write/ls/grep)
  by delegating to execute()
- Synchronous execute() method called via loop.run_in_executor() by server.py
- Container lifecycle management (create/connect/close)
- Upload/download file support via tar archives
- Configurable resource limits (memory, CPU, PIDs)
- Timeout support with proper exit code handling
- Environment variable configuration via create_sandbox() factory

Tests:
- 6 new tests covering container creation, command execution, and cleanup
- All 46 tests passing (40 existing + 6 new)
This commit is contained in:
머니페니 2026-03-20 16:07:27 +09:00
parent bb2a47157e
commit 5d44c2e7e2
3 changed files with 283 additions and 11 deletions

View File

@ -1,15 +1,176 @@
"""Docker container-based sandbox backend. Phase 2 implementation.""" """Docker container sandbox backend.
execute() is synchronous. server.py calls it via loop.run_in_executor().
"""
from __future__ import annotations
import logging
import os
import docker
from deepagents.backends.protocol import ExecuteResponse, FileDownloadResponse, FileUploadResponse
from deepagents.backends.sandbox import BaseSandbox
logger = logging.getLogger(__name__)
class DockerSandbox: class DockerSandbox(BaseSandbox):
async def execute(self, command: str, timeout: int = 300): """Docker container-based sandbox implementation.
raise NotImplementedError("Phase 2")
async def read_file(self, path: str) -> str: Extends BaseSandbox, which auto-implements file I/O (read/write/ls/grep)
raise NotImplementedError("Phase 2") by delegating to execute(). Only need to implement: id property, execute(),
upload_files(), download_files(), and container lifecycle.
"""
async def write_file(self, path: str, content: str) -> None: def __init__(
raise NotImplementedError("Phase 2") self,
container_id: str | None = None,
*,
image: str | None = None,
network: str = "galaxis-net",
mem_limit: str = "4g",
cpu_count: int = 2,
pids_limit: int = 256,
environment: dict | None = None,
default_timeout: int = 300,
):
self._docker = docker.DockerClient(
base_url=os.environ.get("DOCKER_HOST", "unix:///var/run/docker.sock")
)
self._default_timeout = default_timeout
async def close(self) -> None: if container_id:
raise NotImplementedError("Phase 2") # Connect to existing container
self._container = self._docker.containers.get(container_id)
else:
# Create new container
resolved_image = image or os.environ.get("SANDBOX_IMAGE", "galaxis-sandbox:latest")
self._container = self._docker.containers.run(
image=resolved_image,
detach=True,
network=network,
mem_limit=mem_limit,
cpu_count=cpu_count,
pids_limit=pids_limit,
environment=environment or {},
labels={"galaxis-agent-sandbox": "true"},
working_dir="/workspace",
)
self._id = self._container.id
@property
def id(self) -> str:
return self._id
def execute(
self,
command: str,
*,
timeout: int | None = None,
) -> ExecuteResponse:
"""Execute a shell command in the container.
Synchronous method - server.py calls via loop.run_in_executor().
"""
effective_timeout = timeout if timeout is not None else self._default_timeout
# Wrap command with timeout if specified
if effective_timeout and effective_timeout > 0:
cmd = ["timeout", str(effective_timeout), "sh", "-c", command]
else:
cmd = ["sh", "-c", command]
# Execute command in container with demux=True to separate stdout/stderr
result = self._container.exec_run(cmd=cmd, demux=True, workdir="/workspace")
# Decode output
stdout = (result.output[0] or b"").decode("utf-8", errors="replace")
stderr = (result.output[1] or b"").decode("utf-8", errors="replace")
output = stdout + stderr
exit_code = result.exit_code
# Handle timeout exit code
if exit_code == 124:
output += f"\n[TIMEOUT] Command timed out after {effective_timeout}s"
return ExecuteResponse(output=output, exit_code=exit_code, truncated=False)
def upload_files(self, files: list[tuple[str, bytes]]) -> list[FileUploadResponse]:
"""Upload multiple files to the sandbox.
Supports partial success - returns errors per-file rather than raising.
"""
responses = []
for path, content in files:
try:
# Use tar to upload file to container
import io
import tarfile
# Create tar archive in memory
tar_stream = io.BytesIO()
tar = tarfile.open(fileobj=tar_stream, mode="w")
# Add file to archive
tarinfo = tarfile.TarInfo(name=os.path.basename(path))
tarinfo.size = len(content)
tar.addfile(tarinfo, io.BytesIO(content))
tar.close()
# Upload to container
tar_stream.seek(0)
self._container.put_archive(os.path.dirname(path) or "/workspace", tar_stream)
responses.append(FileUploadResponse(path=path, error=None))
except Exception as e:
logger.exception("Failed to upload file: %s", path)
responses.append(FileUploadResponse(path=path, error=str(e)))
return responses
def download_files(self, paths: list[str]) -> list[FileDownloadResponse]:
"""Download multiple files from the sandbox.
Supports partial success - returns errors per-file rather than raising.
"""
responses = []
for path in paths:
try:
# Get file from container as tar archive
bits, stat = self._container.get_archive(path)
# Extract content from tar
import io
import tarfile
tar_stream = io.BytesIO(b"".join(bits))
tar = tarfile.open(fileobj=tar_stream)
# Get first file from archive
member = tar.next()
if member:
f = tar.extractfile(member)
if f:
content = f.read()
responses.append(FileDownloadResponse(path=path, content=content, error=None))
else:
responses.append(FileDownloadResponse(path=path, content=None, error="Not a file"))
else:
responses.append(FileDownloadResponse(path=path, content=None, error="Empty archive"))
tar.close()
except Exception as e:
logger.exception("Failed to download file: %s", path)
responses.append(FileDownloadResponse(path=path, content=None, error=str(e)))
return responses
def close(self):
"""Stop and remove the container."""
try:
self._container.stop(timeout=10)
self._container.remove(force=True)
except docker.errors.NotFound:
pass
except Exception:
logger.exception("Failed to remove container: %s", self._id)

View File

@ -1,5 +1,35 @@
import os
from agent.integrations.docker_sandbox import DockerSandbox from agent.integrations.docker_sandbox import DockerSandbox
def create_sandbox(sandbox_id: str | None = None) -> DockerSandbox: def create_sandbox(sandbox_id: str | None = None) -> DockerSandbox:
return DockerSandbox() # Phase 2 implementation """Factory function for creating DockerSandbox instances.
Args:
sandbox_id: Optional container ID to connect to existing container.
If None, creates a new container.
Returns:
DockerSandbox instance configured from environment variables.
"""
# Build environment variables for the container
env = {}
test_db_url = os.environ.get("TEST_DATABASE_URL", "")
if test_db_url:
env["DATABASE_URL"] = test_db_url
# Connect to existing container if ID provided
if sandbox_id:
return DockerSandbox(container_id=sandbox_id)
# Create new container with environment configuration
return DockerSandbox(
image=os.environ.get("SANDBOX_IMAGE", "galaxis-sandbox:latest"),
network=os.environ.get("SANDBOX_NETWORK", "galaxis-net"),
mem_limit=os.environ.get("SANDBOX_MEM_LIMIT", "4g"),
cpu_count=int(os.environ.get("SANDBOX_CPU_COUNT", "2")),
pids_limit=int(os.environ.get("SANDBOX_PIDS_LIMIT", "256")),
environment=env,
default_timeout=int(os.environ.get("SANDBOX_TIMEOUT", "300")),
)

View File

@ -0,0 +1,81 @@
import pytest
from unittest.mock import MagicMock, patch
@pytest.fixture
def mock_docker_client():
client = MagicMock()
container = MagicMock()
container.id = "abc123def456"
container.status = "running"
client.containers.run.return_value = container
client.containers.get.return_value = container
return client, container
def test_sandbox_create_new_container(mock_docker_client):
client, container = mock_docker_client
with patch("agent.integrations.docker_sandbox.docker.DockerClient", return_value=client):
from agent.integrations.docker_sandbox import DockerSandbox
sandbox = DockerSandbox()
assert sandbox.id == "abc123def456"
client.containers.run.assert_called_once()
call_kwargs = client.containers.run.call_args.kwargs
assert call_kwargs["detach"] is True
assert call_kwargs["labels"] == {"galaxis-agent-sandbox": "true"}
def test_sandbox_connect_existing_container(mock_docker_client):
client, container = mock_docker_client
with patch("agent.integrations.docker_sandbox.docker.DockerClient", return_value=client):
from agent.integrations.docker_sandbox import DockerSandbox
sandbox = DockerSandbox(container_id="abc123def456")
assert sandbox.id == "abc123def456"
client.containers.get.assert_called_once_with("abc123def456")
client.containers.run.assert_not_called()
def test_sandbox_execute_success(mock_docker_client):
client, container = mock_docker_client
container.exec_run.return_value = MagicMock(exit_code=0, output=(b"hello world\n", None))
with patch("agent.integrations.docker_sandbox.docker.DockerClient", return_value=client):
from agent.integrations.docker_sandbox import DockerSandbox
sandbox = DockerSandbox(container_id="abc123def456")
result = sandbox.execute("echo hello world")
assert "hello world" in result.output
assert result.exit_code == 0
assert result.truncated is False
def test_sandbox_execute_with_stderr(mock_docker_client):
client, container = mock_docker_client
container.exec_run.return_value = MagicMock(exit_code=1, output=(b"", b"error: not found\n"))
with patch("agent.integrations.docker_sandbox.docker.DockerClient", return_value=client):
from agent.integrations.docker_sandbox import DockerSandbox
sandbox = DockerSandbox(container_id="abc123def456")
result = sandbox.execute("cat missing.txt")
assert "error: not found" in result.output
assert result.exit_code == 1
def test_sandbox_execute_with_timeout(mock_docker_client):
client, container = mock_docker_client
container.exec_run.return_value = MagicMock(exit_code=0, output=(b"ok\n", None))
with patch("agent.integrations.docker_sandbox.docker.DockerClient", return_value=client):
from agent.integrations.docker_sandbox import DockerSandbox
sandbox = DockerSandbox(container_id="abc123def456")
sandbox.execute("sleep 1", timeout=30)
call_args = container.exec_run.call_args
cmd = call_args.kwargs.get("cmd", call_args.args[0] if call_args.args else [])
assert cmd[0] == "timeout"
assert cmd[1] == "30"
def test_sandbox_close(mock_docker_client):
client, container = mock_docker_client
with patch("agent.integrations.docker_sandbox.docker.DockerClient", return_value=client):
from agent.integrations.docker_sandbox import DockerSandbox
sandbox = DockerSandbox(container_id="abc123def456")
sandbox.close()
container.stop.assert_called_once()
container.remove.assert_called_once()