118 lines
3.4 KiB
Python
118 lines
3.4 KiB
Python
|
|
import os
|
||
|
|
import tempfile
|
||
|
|
|
||
|
|
import pytest
|
||
|
|
|
||
|
|
from agent.task_queue import PersistentTaskQueue
|
||
|
|
|
||
|
|
|
||
|
|
@pytest.fixture
async def task_queue():
    """Yield a PersistentTaskQueue backed by a temporary SQLite DB file.

    The queue is initialized before being handed to the test. On teardown
    the queue is closed and the temporary database file is removed, even if
    initialization or closing raises.
    """
    fd, db_path = tempfile.mkstemp(suffix=".db")
    # mkstemp returns an open descriptor; only the path is needed here.
    os.close(fd)
    try:
        queue = PersistentTaskQueue(db_path=db_path)
        await queue.initialize()
        try:
            yield queue
        finally:
            # Close before unlinking so the file is no longer held open.
            await queue.close()
    finally:
        # Always remove the temp file so failed runs do not leak .db files.
        os.unlink(db_path)
|
||
|
|
|
||
|
|
|
||
|
|
@pytest.mark.asyncio
async def test_enqueue_and_dequeue(task_queue):
    """Enqueue one task, then dequeue it and check the returned record."""
    issue_payload = {"issue_number": 42, "body": "Fix bug"}
    new_id = await task_queue.enqueue(
        thread_id="thread-1",
        source="gitea",
        payload=issue_payload,
    )
    assert new_id is not None

    dequeued = await task_queue.dequeue()
    assert dequeued is not None
    # The dequeued record keeps its thread and is reported as running.
    assert dequeued["thread_id"] == "thread-1"
    assert dequeued["status"] == "running"
|
||
|
|
|
||
|
|
|
||
|
|
@pytest.mark.asyncio
async def test_dequeue_empty_returns_none(task_queue):
    """Dequeuing from a queue with no tasks yields None."""
    assert await task_queue.dequeue() is None
|
||
|
|
|
||
|
|
|
||
|
|
@pytest.mark.asyncio
async def test_fifo_order(task_queue):
    """Tasks are dequeued in the order they were enqueued (FIFO)."""
    await task_queue.enqueue("thread-1", "gitea", {"order": 1})
    await task_queue.enqueue("thread-2", "discord", {"order": 2})

    task1 = await task_queue.dequeue()
    # Guard before subscripting so an empty dequeue fails as a clear
    # assertion rather than a TypeError on None.
    assert task1 is not None
    assert task1["payload"]["order"] == 1

    # Complete the first task before dequeuing the second.
    await task_queue.mark_completed(task1["id"])

    task2 = await task_queue.dequeue()
    assert task2 is not None
    assert task2["payload"]["order"] == 2
|
||
|
|
|
||
|
|
|
||
|
|
@pytest.mark.asyncio
async def test_concurrency_limit(task_queue):
    """While one task is running, a further dequeue returns nothing."""
    for thread_id, message in (("thread-1", "first"), ("thread-2", "second")):
        await task_queue.enqueue(thread_id, "gitea", {"msg": message})

    running = await task_queue.dequeue()
    assert running is not None

    # The first task has not been completed, so nothing else is handed out.
    blocked = await task_queue.dequeue()
    assert blocked is None
|
||
|
|
|
||
|
|
|
||
|
|
@pytest.mark.asyncio
async def test_mark_completed(task_queue):
    """After a task is marked completed, a newly enqueued task can be dequeued."""
    await task_queue.enqueue("thread-1", "gitea", {})
    first = await task_queue.dequeue()
    assert first is not None

    outcome = {"pr_url": "http://..."}
    await task_queue.mark_completed(first["id"], result=outcome)

    # With the first task completed, the next one must be available.
    await task_queue.enqueue("thread-2", "gitea", {})
    follow_up = await task_queue.dequeue()
    assert follow_up is not None
|
||
|
|
|
||
|
|
|
||
|
|
@pytest.mark.asyncio
async def test_mark_failed(task_queue):
    """After a task is marked failed, a newly enqueued task can be dequeued."""
    await task_queue.enqueue("thread-1", "gitea", {})
    task = await task_queue.dequeue()
    # Guard before subscripting so an empty dequeue fails as a clear
    # assertion rather than a TypeError on None.
    assert task is not None

    await task_queue.mark_failed(task["id"], error="Something broke")

    # With the first task failed, the next one must be available.
    await task_queue.enqueue("thread-2", "gitea", {})
    task2 = await task_queue.dequeue()
    assert task2 is not None
|
||
|
|
|
||
|
|
|
||
|
|
@pytest.mark.asyncio
async def test_get_pending(task_queue):
    """get_pending() lists every task that has been enqueued but not dequeued."""
    for thread_id, source in (("thread-1", "gitea"), ("thread-2", "discord")):
        await task_queue.enqueue(thread_id, source, {})

    waiting = await task_queue.get_pending()
    assert len(waiting) == 2
|
||
|
|
|
||
|
|
|
||
|
|
@pytest.mark.asyncio
async def test_has_running_task_for_thread(task_queue):
    """has_running_task() reports whether a given thread has a running task."""
    await task_queue.enqueue("thread-1", "gitea", {})
    # Dequeuing is what moves the task into the running state; make sure a
    # task was actually handed out before checking the per-thread flag.
    task = await task_queue.dequeue()
    assert task is not None

    assert await task_queue.has_running_task("thread-1") is True
    assert await task_queue.has_running_task("thread-2") is False
|