머니페니 25115b33dd fix: resolve 5 site inspection issues (2026-05-26)
ISSUE-1: ETFPriceCollector 좀비 프로세스 재발 방지
- etf_price_collector: 루프마다 heartbeat() 호출 추가
- admin API: POST /api/admin/collect/reset-stuck 엔드포인트 추가

ISSUE-3: 헤더 제목 "대시보드" 고정 버그
- new-header.tsx pageTitles에 누락된 7개 경로 추가

ISSUE-4: 대시보드 파이 차트 미렌더링
- DonutChart Legend를 Recharts 외부로 분리하여 파이 공간 확보

ISSUE-5: daily_snapshots records_count 항상 0
- PriceService에 ETFPrice 테이블 fallback 추가
2026-05-26 22:35:02 +09:00

216 lines
7.1 KiB
Python

"""
Admin API for data collection management.
All collection tasks run in background threads to avoid blocking the
async event loop (uvicorn single-worker). Each background thread
creates its own DB session so the request can return immediately.
"""
import logging
import threading
from datetime import datetime, timedelta, timezone
from typing import List, Optional

from fastapi import APIRouter, Depends, HTTPException, Query
from pydantic import BaseModel, ConfigDict
from sqlalchemy.orm import Session

from app.core.database import get_db, SessionLocal
from app.api.deps import CurrentUser
from app.models.stock import JobLog
from app.services.collectors.base import HEARTBEAT_STALE_MINUTES
from app.services.collectors import (
    StockCollector,
    SectorCollector,
    PriceCollector,
    ValuationCollector,
    ETFCollector,
    ETFPriceCollector,
)
from jobs.collection_job import run_backfill
logger = logging.getLogger(__name__)
# Every route in this module is mounted under the /api/admin prefix.
# NOTE(review): each endpoint below only declares `CurrentUser` as its auth
# dependency; confirm that this enforces an admin role, otherwise any
# authenticated user can trigger collections and reset jobs.
router = APIRouter(prefix="/api/admin", tags=["admin"])
class JobLogResponse(BaseModel):
    """Serialized view of a JobLog row, returned by the collection-status endpoint.

    Instances are validated directly from ORM objects (``from_attributes``).
    """

    # Pydantic v2 style configuration; the nested ``class Config`` form is
    # deprecated in v2 (``from_attributes`` is a v2-only key).
    model_config = ConfigDict(from_attributes=True)

    id: int
    job_name: str
    status: str
    started_at: datetime
    finished_at: datetime | None
    records_count: int | None
    error_msg: str | None
    # Exposed so an operator can see whether a 'running' job is still making
    # progress before calling /collect/reset-stuck. Optional with a default so
    # rows that never recorded a heartbeat still serialize.
    last_heartbeat: datetime | None = None
class CollectResponse(BaseModel):
    """Acknowledgement sent back once a collection job has been scheduled."""

    message: str
    # None of the endpoints shown here populate this; it stays None by default.
    job_id: Optional[int] = None
def _run_collector_background(collector_cls, collector_kwargs: dict):
    """Run a collector to completion on the current (background) thread.

    Opens a dedicated DB session, builds ``collector_cls(db, **collector_kwargs)``
    and calls ``run()``. Nothing waits on this thread, so exceptions are logged
    with their full traceback instead of being propagated. The session is
    always closed afterwards.
    """
    name = collector_cls.__name__
    try:
        db = SessionLocal()
    except Exception as e:
        # Otherwise a failure to open the session would escape the thread
        # (going to threading.excepthook / stderr) and bypass this logger.
        logger.exception("Background collection %s failed: %s", name, e)
        return
    try:
        collector = collector_cls(db, **collector_kwargs)
        collector.run()
    except Exception as e:
        # logger.exception keeps the traceback that logger.error discarded.
        logger.exception("Background collection %s failed: %s", name, e)
    finally:
        db.close()
def _start_background_collection(collector_cls, **kwargs):
    """Fire-and-forget: run *collector_cls* on a daemon thread.

    ``kwargs`` are forwarded to the collector constructor after the DB session.
    This function returns as soon as the thread is started; the outcome of the
    collection is only reported through the worker's logging.
    """
    worker = threading.Thread(
        target=_run_collector_background,
        args=(collector_cls, kwargs),
    )
    worker.daemon = True
    worker.start()
def _run_backfill_background(start_year: int):
    """Run the historical backfill on the current (background) thread.

    Nothing waits on this thread, so exceptions are logged with their full
    traceback and then swallowed.
    """
    try:
        run_backfill(start_year=start_year)
    except Exception as e:
        # logger.exception keeps the traceback that logger.error discarded.
        logger.exception("Background backfill failed: %s", e)
def run_backfill_background(start_year: int):
    """Launch the backfill for *start_year* on a daemon thread and return at once."""
    threading.Thread(
        target=_run_backfill_background,
        args=(start_year,),
        daemon=True,
    ).start()
@router.post("/collect/stocks", response_model=CollectResponse)
async def collect_stocks(
    current_user: CurrentUser,
    biz_day: Optional[str] = Query(
        None,
        pattern=r"^\d{8}$",
        description="Business day in YYYYMMDD format",
    ),
):
    """Schedule a KRX stock-master collection on a background thread.

    ``biz_day`` (YYYYMMDD) is optional and is forwarded unchanged to
    StockCollector. The response is sent before the collection finishes.
    """
    _start_background_collection(StockCollector, biz_day=biz_day)
    ack = CollectResponse(message="Stock collection started")
    return ack
@router.post("/collect/sectors", response_model=CollectResponse)
async def collect_sectors(
    current_user: CurrentUser,
    biz_day: Optional[str] = Query(
        None,
        pattern=r"^\d{8}$",
        description="Business day in YYYYMMDD format",
    ),
):
    """Schedule a WISEindex sector-classification collection in the background.

    ``biz_day`` (YYYYMMDD) is optional and is passed straight to SectorCollector.
    """
    collector_args = {"biz_day": biz_day}
    _start_background_collection(SectorCollector, **collector_args)
    return CollectResponse(message="Sector collection started")
@router.post("/collect/prices", response_model=CollectResponse)
async def collect_prices(
    current_user: CurrentUser,
    start_date: Optional[str] = Query(
        None, pattern=r"^\d{8}$", description="Start date in YYYYMMDD format"
    ),
    end_date: Optional[str] = Query(
        None, pattern=r"^\d{8}$", description="End date in YYYYMMDD format"
    ),
):
    """Schedule a pykrx stock-price collection for an optional date window.

    Both bounds (YYYYMMDD) are forwarded unchanged to PriceCollector. The
    request returns as soon as the worker thread is started.
    """
    window = {"start_date": start_date, "end_date": end_date}
    _start_background_collection(PriceCollector, **window)
    return CollectResponse(message="Price collection started")
@router.post("/collect/valuations", response_model=CollectResponse)
async def collect_valuations(
    current_user: CurrentUser,
    biz_day: Optional[str] = Query(
        None,
        pattern=r"^\d{8}$",
        description="Business day in YYYYMMDD format",
    ),
):
    """Schedule a KRX valuation collection on a background thread.

    ``biz_day`` (YYYYMMDD) is optional and is handed to ValuationCollector as-is.
    """
    _start_background_collection(ValuationCollector, biz_day=biz_day)
    response = CollectResponse(message="Valuation collection started")
    return response
@router.post("/collect/etfs", response_model=CollectResponse)
async def collect_etfs(current_user: CurrentUser):
    """Schedule a KRX ETF-master collection; returns before it completes."""
    _start_background_collection(ETFCollector)
    return CollectResponse(message="ETF collection started")
@router.post("/collect/etf-prices", response_model=CollectResponse)
async def collect_etf_prices(
    current_user: CurrentUser,
    start_date: Optional[str] = Query(
        None, pattern=r"^\d{8}$", description="Start date in YYYYMMDD format"
    ),
    end_date: Optional[str] = Query(
        None, pattern=r"^\d{8}$", description="End date in YYYYMMDD format"
    ),
):
    """Schedule a pykrx ETF-price collection for an optional date window.

    The YYYYMMDD bounds are forwarded unchanged to ETFPriceCollector, and the
    request does not wait for the collection to finish.
    """
    date_range = {"start_date": start_date, "end_date": end_date}
    _start_background_collection(ETFPriceCollector, **date_range)
    ack = CollectResponse(message="ETF price collection started")
    return ack
@router.post("/collect/backfill", response_model=CollectResponse)
async def collect_backfill(
    current_user: CurrentUser,
    start_year: int = Query(2000, ge=1990, description="Start year for backfill"),
):
    """Backfill historical price data from start_year to today (runs in background).

    The upper bound on ``start_year`` is checked at request time against the
    current (UTC) year instead of a hard-coded constant. The endpoint therefore
    keeps accepting the current year after a calendar rollover.

    Raises:
        HTTPException: 422 when ``start_year`` is later than the current year.
    """
    current_year = datetime.now(timezone.utc).year
    if start_year > current_year:
        # 422 matches the status FastAPI uses for its own Query validation.
        raise HTTPException(
            status_code=422,
            detail=f"start_year must be <= {current_year}",
        )
    run_backfill_background(start_year)
    return CollectResponse(message=f"Backfill started from {start_year}")
class ResetStuckResponse(BaseModel):
    """Outcome of POST /collect/reset-stuck."""

    reset_count: int  # number of job-log rows moved from 'running' to 'failed'
    message: str
@router.post("/collect/reset-stuck", response_model=ResetStuckResponse)
def reset_stuck_jobs(
    current_user: CurrentUser,
    db: Session = Depends(get_db),
    stale_minutes: int = Query(
        HEARTBEAT_STALE_MINUTES,
        gt=0,
        le=1440,
        description="Mark as failed if heartbeat is older than this many minutes",
    ),
):
    """Reset jobs stuck in 'running' state with a stale heartbeat.

    A 'running' job is treated as stuck when either:

    * its ``last_heartbeat`` is older than ``stale_minutes``, or
    * it never recorded a heartbeat and its ``started_at`` is older than
      ``stale_minutes``.

    The second rule prevents a freshly started job from being reset before it
    reaches its first heartbeat.

    Matching rows get status 'failed', ``finished_at`` set to now, and an
    explanatory ``error_msg``. The reset is a single conditional UPDATE that
    re-checks ``status == 'running'``, so a job that completes concurrently is
    not overwritten.

    Declared as a plain ``def`` so FastAPI runs the blocking SQLAlchemy calls
    in its threadpool instead of on the event loop.
    """
    now = datetime.now(timezone.utc)
    cutoff = now - timedelta(minutes=stale_minutes)
    is_stale = (JobLog.last_heartbeat < cutoff) | (
        JobLog.last_heartbeat.is_(None) & (JobLog.started_at < cutoff)
    )
    reset_count = (
        db.query(JobLog)
        .filter(JobLog.status == "running", is_stale)
        .update(
            {
                JobLog.status: "failed",
                JobLog.finished_at: now,
                JobLog.error_msg: f"Reset by admin: no heartbeat for >{stale_minutes}min",
            },
            # The updated rows are not used afterwards, so there is no need to
            # sync in-session objects.
            synchronize_session=False,
        )
    )
    db.commit()
    if reset_count:
        logger.warning(
            "Reset %d stuck job(s) (stale_minutes=%d)", reset_count, stale_minutes
        )
    return ResetStuckResponse(
        reset_count=reset_count,
        message=f"Reset {reset_count} stuck job(s)",
    )
@router.get("/collect/status", response_model=List[JobLogResponse])
def get_collection_status(
    current_user: CurrentUser,
    db: Session = Depends(get_db),
    limit: int = Query(20, gt=0, le=100, description="Number of records to return"),
):
    """Get recent job execution status, newest first.

    Declared as a plain ``def`` (not ``async def``) so FastAPI executes the
    blocking SQLAlchemy query in its threadpool rather than on the event loop.
    ``id`` breaks ties between rows sharing a ``started_at``, which keeps the
    order stable across requests.
    """
    return (
        db.query(JobLog)
        .order_by(JobLog.started_at.desc(), JobLog.id.desc())
        .limit(limit)
        .all()
    )