85 lines
2.8 KiB
Python
85 lines
2.8 KiB
Python
|
"""Completed-task history database.

Records each task's cost, duration, and token usage in SQLite.
"""
|
||
|
|
from __future__ import annotations
|
||
|
|
|
||
|
|
import logging
|
||
|
|
import os
|
||
|
|
|
||
|
|
import aiosqlite
|
||
|
|
|
||
|
|
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# DDL for the ``task_history`` table, executed by ``TaskHistory.initialize``.
# ``IF NOT EXISTS`` makes it safe to run on every start-up.  ``task_id`` is
# UNIQUE, which is what lets ``INSERT OR REPLACE`` in ``TaskHistory.record``
# overwrite an earlier row for the same task.  Timestamps are stored as TEXT.
_CREATE_TABLE = """
CREATE TABLE IF NOT EXISTS task_history (
id INTEGER PRIMARY KEY AUTOINCREMENT,
task_id TEXT UNIQUE NOT NULL,
thread_id TEXT NOT NULL,
issue_number INTEGER NOT NULL DEFAULT 0,
repo_name TEXT NOT NULL DEFAULT '',
source TEXT NOT NULL DEFAULT '',
status TEXT NOT NULL,
created_at TEXT NOT NULL,
completed_at TEXT NOT NULL,
duration_seconds REAL NOT NULL DEFAULT 0,
tokens_input INTEGER NOT NULL DEFAULT 0,
tokens_output INTEGER NOT NULL DEFAULT 0,
cost_usd REAL NOT NULL DEFAULT 0,
error_message TEXT NOT NULL DEFAULT ''
)
"""
|
||
|
|
|
||
|
|
|
||
|
|
class TaskHistory:
    """Async SQLite-backed store of completed-task records.

    Each row of the ``task_history`` table holds one task's status,
    timestamps, duration, token counts, cost and error message.  Call
    :meth:`initialize` before :meth:`record` or :meth:`get_recent`, and
    :meth:`close` when finished.
    """

    def __init__(self, db_path: str = "/data/task_history.db"):
        """Store *db_path*; no connection is opened until ``initialize``."""
        self._db_path = db_path
        self._db: aiosqlite.Connection | None = None

    def _require_db(self) -> "aiosqlite.Connection":
        """Return the open connection or raise if there is none.

        Raises:
            RuntimeError: ``initialize`` has not been awaited, or ``close``
                has already been called.
        """
        if self._db is None:
            raise RuntimeError(
                "TaskHistory is not connected; await initialize() first"
            )
        return self._db

    async def initialize(self) -> None:
        """Open the database and create the ``task_history`` table if absent.

        Calling this again while a connection is already open is a no-op, so
        the existing connection is never overwritten and leaked.
        """
        if self._db is not None:
            return
        db = await aiosqlite.connect(self._db_path)
        try:
            db.row_factory = aiosqlite.Row
            await db.execute(_CREATE_TABLE)
            await db.commit()
        except BaseException:
            # Do not leave a half-initialised connection open behind us.
            await db.close()
            raise
        # Publish the connection only once the schema is in place.
        self._db = db

    async def close(self) -> None:
        """Close the connection if one is open; safe to call repeatedly."""
        # Detach first so the instance never keeps a closed connection.
        db, self._db = self._db, None
        if db is not None:
            await db.close()

    async def record(
        self, task_id: str, thread_id: str, issue_number: int, repo_name: str,
        source: str, status: str, created_at: str, completed_at: str,
        duration_seconds: float, tokens_input: int, tokens_output: int,
        cost_usd: float, error_message: str = "",
    ) -> None:
        """Insert one task row and commit, replacing any row with the same id.

        ``INSERT OR REPLACE`` is used, so recording a ``task_id`` that already
        exists overwrites the earlier row instead of failing.

        Raises:
            RuntimeError: the store has not been initialised.
        """
        db = self._require_db()
        await db.execute(
            "INSERT OR REPLACE INTO task_history "
            "(task_id, thread_id, issue_number, repo_name, source, status, "
            "created_at, completed_at, duration_seconds, tokens_input, tokens_output, "
            "cost_usd, error_message) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
            (task_id, thread_id, issue_number, repo_name, source, status,
             created_at, completed_at, duration_seconds, tokens_input, tokens_output,
             cost_usd, error_message),
        )
        await db.commit()
        logger.info("Recorded history: task=%s status=%s cost=$%.4f", task_id, status, cost_usd)

    async def get_recent(self, limit: int = 20) -> list[dict]:
        """Return up to *limit* rows, newest ``completed_at`` first, as dicts.

        ``completed_at`` is a TEXT column, so the ordering is a string
        comparison of the stored timestamps.

        Raises:
            RuntimeError: the store has not been initialised.
        """
        db = self._require_db()
        cursor = await db.execute(
            "SELECT * FROM task_history ORDER BY completed_at DESC LIMIT ?", (limit,),
        )
        rows = await cursor.fetchall()
        return [dict(row) for row in rows]
|
||
|
|
|
||
|
|
|
||
|
|
# Process-wide TaskHistory instance, created lazily by get_task_history().
_history: TaskHistory | None = None


async def get_task_history() -> TaskHistory:
    """Return the shared ``TaskHistory``, creating it on first use.

    The database path is read from the ``TASK_HISTORY_DB`` environment
    variable, falling back to ``/data/task_history.db``.

    The instance is stored in the module global only after ``initialize``
    has succeeded.  A failed initialisation therefore leaves nothing cached
    (the next call retries), and no caller is handed an unconnected instance.
    """
    global _history
    if _history is None:
        db_path = os.environ.get("TASK_HISTORY_DB", "/data/task_history.db")
        candidate = TaskHistory(db_path=db_path)
        await candidate.initialize()
        if _history is None:
            _history = candidate
        else:
            # Another caller finished initialising while we were awaiting;
            # keep theirs and release our redundant connection.
            await candidate.close()
    return _history
|