galaxis-agent/agent/task_history.py
머니페니 db6e9b4a41 feat: add TaskHistory for completed task audit logging
Implements SQLite-based task history tracking with metrics (cost, duration, tokens).
- TaskHistory class with record() and get_recent() methods
- Tracks task_id, thread_id, issue_number, repo_name, source, status
- Records duration_seconds, tokens_input, tokens_output, cost_usd, error_message
- 4 passing tests covering completed/failed recording, ordering, empty state
2026-03-20 18:40:47 +09:00

85 lines
2.8 KiB
Python

"""완료 작업 이력 DB.
작업의 비용, 소요시간, 토큰 사용량을 SQLite에 기록한다.
"""
from __future__ import annotations
import logging
import os
import aiosqlite
logger = logging.getLogger(__name__)
_CREATE_TABLE = """
CREATE TABLE IF NOT EXISTS task_history (
id INTEGER PRIMARY KEY AUTOINCREMENT,
task_id TEXT UNIQUE NOT NULL,
thread_id TEXT NOT NULL,
issue_number INTEGER NOT NULL DEFAULT 0,
repo_name TEXT NOT NULL DEFAULT '',
source TEXT NOT NULL DEFAULT '',
status TEXT NOT NULL,
created_at TEXT NOT NULL,
completed_at TEXT NOT NULL,
duration_seconds REAL NOT NULL DEFAULT 0,
tokens_input INTEGER NOT NULL DEFAULT 0,
tokens_output INTEGER NOT NULL DEFAULT 0,
cost_usd REAL NOT NULL DEFAULT 0,
error_message TEXT NOT NULL DEFAULT ''
)
"""
class TaskHistory:
def __init__(self, db_path: str = "/data/task_history.db"):
self._db_path = db_path
self._db: aiosqlite.Connection | None = None
async def initialize(self) -> None:
self._db = await aiosqlite.connect(self._db_path)
self._db.row_factory = aiosqlite.Row
await self._db.execute(_CREATE_TABLE)
await self._db.commit()
async def close(self) -> None:
if self._db:
await self._db.close()
async def record(
self, task_id: str, thread_id: str, issue_number: int, repo_name: str,
source: str, status: str, created_at: str, completed_at: str,
duration_seconds: float, tokens_input: int, tokens_output: int,
cost_usd: float, error_message: str = "",
) -> None:
await self._db.execute(
"INSERT OR REPLACE INTO task_history "
"(task_id, thread_id, issue_number, repo_name, source, status, "
"created_at, completed_at, duration_seconds, tokens_input, tokens_output, "
"cost_usd, error_message) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
(task_id, thread_id, issue_number, repo_name, source, status,
created_at, completed_at, duration_seconds, tokens_input, tokens_output,
cost_usd, error_message),
)
await self._db.commit()
logger.info("Recorded history: task=%s status=%s cost=$%.4f", task_id, status, cost_usd)
async def get_recent(self, limit: int = 20) -> list[dict]:
cursor = await self._db.execute(
"SELECT * FROM task_history ORDER BY completed_at DESC LIMIT ?", (limit,),
)
rows = await cursor.fetchall()
return [dict(row) for row in rows]
_history: TaskHistory | None = None
async def get_task_history() -> TaskHistory:
global _history
if _history is None:
db_path = os.environ.get("TASK_HISTORY_DB", "/data/task_history.db")
_history = TaskHistory(db_path=db_path)
await _history.initialize()
return _history