galaxis-agent/tests/test_integration_webhook_flow.py

68 lines
2.1 KiB
Python
Raw Normal View History

"""Integration tests for webhook-to-dispatcher flow."""
import pytest
import os
import tempfile
from unittest.mock import AsyncMock, patch
from agent.task_queue import PersistentTaskQueue
from agent.dispatcher import Dispatcher
@pytest.fixture
async def task_queue():
"""Create a temporary task queue for testing."""
fd, db_path = tempfile.mkstemp(suffix=".db")
os.close(fd)
queue = PersistentTaskQueue(db_path=db_path)
await queue.initialize()
yield queue
await queue.close()
os.unlink(db_path)
@pytest.mark.asyncio
async def test_webhook_to_dispatcher_flow(task_queue):
"""Gitea webhook → TaskQueue → Dispatcher 전체 흐름."""
from agent.webapp import parse_gitea_event, generate_thread_id
# 1. Webhook 이벤트 파싱
payload = {
"action": "created",
"comment": {"body": "@agent factor_calculator에 듀얼 모멘텀 추가해줘"},
"issue": {"number": 42, "title": "Feature request", "body": "desc"},
"repository": {"full_name": "quant/galaxis-po", "name": "galaxis-po"},
}
event = parse_gitea_event("issue_comment", payload)
assert event["should_process"] is True
# 2. TaskQueue에 enqueue
thread_id = generate_thread_id("galaxis-po", 42)
task_id = await task_queue.enqueue(
thread_id=thread_id,
source="gitea",
payload={
"issue_number": event["issue_number"],
"repo_owner": event["repo_owner"],
"repo_name": event["repo_name"],
"message": event["message"],
},
)
# 3. Dispatcher가 처리
mock_run_agent = AsyncMock(return_value={"status": "completed"})
dispatcher = Dispatcher(task_queue=task_queue)
dispatcher._run_agent_for_task = mock_run_agent
await dispatcher._poll_once()
# 4. 에이전트가 호출되었는지 확인
mock_run_agent.assert_called_once()
call_task = mock_run_agent.call_args[0][0]
assert call_task["thread_id"] == thread_id
assert call_task["payload"]["issue_number"] == 42
# 5. 작업이 완료 처리되었는지 확인
pending = await task_queue.get_pending()
assert len(pending) == 0