diff --git a/agent/recovery.py b/agent/recovery.py new file mode 100644 index 0000000..2748044 --- /dev/null +++ b/agent/recovery.py @@ -0,0 +1,90 @@ +"""서버 시작 시 복구 + 좀비 컨테이너 정리.""" +from __future__ import annotations + +import asyncio +import logging +from datetime import datetime, timezone + +from agent.task_queue import PersistentTaskQueue + +logger = logging.getLogger(__name__) + + +async def recover_on_startup(task_queue: PersistentTaskQueue) -> None: + reset_count = await task_queue.reset_running_to_pending() + if reset_count: + logger.info("Recovery: reset %d running task(s) to pending", reset_count) + await _cleanup_zombie_containers() + + +async def _cleanup_zombie_containers() -> int: + try: + import docker + client = docker.from_env() + containers = client.containers.list( + filters={"label": "galaxis-agent-sandbox"}, all=True, + ) + cleaned = 0 + for container in containers: + try: + container.stop(timeout=10) + container.remove(force=True) + cleaned += 1 + logger.info("Recovery: removed zombie container %s", container.name) + except Exception: + logger.warning("Recovery: failed to remove container %s", container.name) + return cleaned + except Exception: + logger.debug("Recovery: Docker not available, skipping container cleanup") + return 0 + + +class ContainerCleaner: + def __init__(self, docker_client=None, max_age_seconds: int = 1200, interval_seconds: int = 1800): + self._docker = docker_client + self._max_age = max_age_seconds + self._interval = interval_seconds + self._running = False + self._task: asyncio.Task | None = None + + async def start(self) -> None: + self._running = True + self._task = asyncio.create_task(self._loop()) + + async def stop(self) -> None: + self._running = False + if self._task: + self._task.cancel() + try: + await self._task + except asyncio.CancelledError: + pass + + async def _loop(self) -> None: + while self._running: + try: + await self.cleanup_once() + except Exception: + logger.exception("ContainerCleaner error") + await asyncio.sleep(self._interval) + + async def cleanup_once(self) -> int: + if not self._docker: + return 0 + now = datetime.now(timezone.utc) + containers = self._docker.containers.list( + filters={"label": "galaxis-agent-sandbox"}, all=True, + ) + removed = 0 + for container in containers: + created_str = container.attrs.get("Created", "") + try: + created = datetime.fromisoformat(created_str.replace("Z", "+00:00")) + age = (now - created).total_seconds() + if age > self._max_age: + container.stop(timeout=10) + container.remove(force=True) + removed += 1 + except Exception: + logger.debug("Failed to check/remove container %s", getattr(container, "name", "unknown")) + return removed diff --git a/agent/task_queue.py b/agent/task_queue.py index 59c8446..a96fa57 100644 --- a/agent/task_queue.py +++ b/agent/task_queue.py @@ -133,6 +133,24 @@ class PersistentTaskQueue: row = await cursor.fetchone() return row["cnt"] > 0 + async def reset_running_to_pending(self) -> int: + """running 상태 작업을 pending으로 리셋한다 (복구용). + + Returns: + 리셋된 작업 수. + """ + cursor = await self._db.execute( + "SELECT COUNT(*) as cnt FROM tasks WHERE status = 'running'" + ) + row = await cursor.fetchone() + count = row["cnt"] + if count: + await self._db.execute( + "UPDATE tasks SET status = 'pending', started_at = NULL WHERE status = 'running'" + ) + await self._db.commit() + return count + # 지연 초기화 싱글턴 _queue: PersistentTaskQueue | None = None diff --git a/tests/test_recovery.py b/tests/test_recovery.py new file mode 100644 index 0000000..e4a2e7b --- /dev/null +++ b/tests/test_recovery.py @@ -0,0 +1,79 @@ +import pytest +import os +import tempfile +from unittest.mock import AsyncMock, MagicMock, patch + +from agent.task_queue import PersistentTaskQueue +from agent.recovery import recover_on_startup, ContainerCleaner + + +@pytest.fixture +async def task_queue(): + fd, db_path = tempfile.mkstemp(suffix=".db") + os.close(fd) + queue = PersistentTaskQueue(db_path=db_path) + await queue.initialize() + yield queue + await queue.close() + os.unlink(db_path) + + +@pytest.mark.asyncio +async def test_recover_resets_running_to_pending(task_queue): + await task_queue.enqueue("thread-1", "gitea", {"msg": "interrupted"}) + await task_queue.dequeue() # → running + assert await task_queue.has_running_task("thread-1") is True + + with patch("agent.recovery._cleanup_zombie_containers", new_callable=AsyncMock): + await recover_on_startup(task_queue) + + assert await task_queue.has_running_task("thread-1") is False + pending = await task_queue.get_pending() + assert len(pending) == 1 + + +@pytest.mark.asyncio +async def test_recover_no_running_tasks(task_queue): + with patch("agent.recovery._cleanup_zombie_containers", new_callable=AsyncMock): + await recover_on_startup(task_queue) + pending = await task_queue.get_pending() + assert len(pending) == 0 + + +@pytest.mark.asyncio +async def test_container_cleaner_removes_old(): + mock_container = MagicMock() + mock_container.name = "galaxis-sandbox-old" + mock_container.labels = {"galaxis-agent-sandbox": "true"} + mock_container.attrs = {"Created": "2026-03-19T00:00:00Z"} + mock_container.stop = MagicMock() + mock_container.remove = MagicMock() + + mock_docker = MagicMock() + mock_docker.containers.list.return_value = [mock_container] + + cleaner = ContainerCleaner(docker_client=mock_docker, max_age_seconds=600) + removed = await cleaner.cleanup_once() + + assert removed == 1 + mock_container.stop.assert_called_once() + mock_container.remove.assert_called_once() + + +@pytest.mark.asyncio +async def test_container_cleaner_keeps_recent(): + from datetime import datetime, timezone + now = datetime.now(timezone.utc).isoformat() + + mock_container = MagicMock() + mock_container.labels = {"galaxis-agent-sandbox": "true"} + mock_container.attrs = {"Created": now} + + mock_docker = MagicMock() + mock_docker.containers.list.return_value = [mock_container] + + cleaner = ContainerCleaner(docker_client=mock_docker, max_age_seconds=3600) + removed = await cleaner.cleanup_once() + + assert removed == 0 + mock_container.stop.assert_not_called()