All checks were successful
Deploy to Production / deploy (push) Successful in 1m12s
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
178 lines
5.9 KiB
Python
178 lines
5.9 KiB
Python
"""
|
|
Admin API for data collection management.

All collection tasks run in background threads to avoid blocking the
async event loop (uvicorn single-worker). Each background thread
creates its own DB session so the request can return immediately.
|
|
"""
|
|
import logging
|
|
import threading
|
|
from typing import List, Optional
|
|
from datetime import datetime
|
|
|
|
from fastapi import APIRouter, Depends, HTTPException, Query
|
|
from sqlalchemy.orm import Session
|
|
from pydantic import BaseModel
|
|
|
|
from app.core.database import get_db, SessionLocal
|
|
from app.api.deps import CurrentUser
|
|
from app.models.stock import JobLog
|
|
from app.services.collectors import (
|
|
StockCollector,
|
|
SectorCollector,
|
|
PriceCollector,
|
|
ValuationCollector,
|
|
ETFCollector,
|
|
ETFPriceCollector,
|
|
)
|
|
from jobs.collection_job import run_backfill
|
|
|
|
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
|
|
|
|
# Router that the endpoints in this module register on; every route is
# mounted under the /api/admin prefix and carries the "admin" tag.
router = APIRouter(prefix="/api/admin", tags=["admin"])
|
|
|
|
|
|
class JobLogResponse(BaseModel):
    # Response schema for a single job-log entry. It is the item type of the
    # list returned by GET /api/admin/collect/status, which hands back JobLog
    # rows directly.
    # NOTE: documented with comments rather than a docstring on purpose, so
    # the generated OpenAPI schema description stays unchanged.
    id: int
    job_name: str
    status: str
    started_at: datetime
    # The fields below accept None but have no default, so they must still be
    # present on the source object.
    finished_at: datetime | None
    records_count: int | None
    error_msg: str | None

    # Pydantic v2 configuration. ``from_attributes`` lets the model be
    # populated from attribute-bearing objects (such as ORM rows) instead of
    # dicts. ``model_config`` replaces the nested ``class Config`` form, which
    # Pydantic v2 still accepts but reports as deprecated.
    model_config = {"from_attributes": True}
|
|
|
|
|
|
class CollectResponse(BaseModel):
    # Acknowledgement body returned by the POST /collect/* endpoints.
    # NOTE: documented with comments rather than a docstring on purpose, so
    # the generated OpenAPI schema description stays unchanged.

    # Human-readable confirmation text.
    message: str
    # Optional job identifier; the endpoints in this module leave it unset.
    job_id: Optional[int] = None
|
|
|
|
|
|
def _run_collector_background(collector_cls, collector_kwargs: dict):
    """Run a collector to completion using a dedicated DB session.

    Used as the ``threading.Thread`` target for background collection, so it
    opens its own session instead of borrowing a request-scoped one.

    Args:
        collector_cls: Collector class. It is instantiated as
            ``collector_cls(db, **collector_kwargs)`` and driven via ``run()``.
        collector_kwargs: Keyword arguments forwarded to the collector's
            constructor.

    Returns:
        None. Any ``Exception`` raised by the collector is logged together
        with its traceback and not re-raised; the session is always closed.
    """
    db = SessionLocal()
    try:
        collector_cls(db, **collector_kwargs).run()
    except Exception as e:
        # This is the outermost frame of the worker thread, so nothing above
        # can handle the error. logger.exception records the full traceback,
        # which a plain logger.error would drop.
        logger.exception(
            "Background collection %s failed: %s", collector_cls.__name__, e
        )
    finally:
        db.close()
|
|
|
|
|
|
def _start_background_collection(collector_cls, **kwargs):
    """Launch ``collector_cls`` on a daemon thread without waiting for it.

    Args:
        collector_cls: Collector class passed to the background runner.
        **kwargs: Forwarded to the runner as the collector's constructor
            keyword arguments.
    """
    # Fire-and-forget: the thread object is not kept, and daemon=True means
    # the thread will not keep the interpreter alive at shutdown.
    threading.Thread(
        target=_run_collector_background,
        args=(collector_cls, kwargs),
        daemon=True,
    ).start()
|
|
|
|
|
|
def _run_backfill_background(start_year: int):
    """Run the historical backfill; used as a background-thread target.

    Args:
        start_year: First year to backfill, passed through to
            ``run_backfill``.

    Returns:
        None. Any ``Exception`` raised by ``run_backfill`` is logged together
        with its traceback and not re-raised.
    """
    try:
        run_backfill(start_year=start_year)
    except Exception as e:
        # This is the outermost frame of the worker thread, so nothing above
        # can handle the error. logger.exception records the full traceback,
        # which a plain logger.error would drop.
        logger.exception("Background backfill failed: %s", e)
|
|
|
|
|
|
def run_backfill_background(start_year: int):
    """Kick off the backfill on a daemon thread and return immediately.

    Args:
        start_year: First year to backfill; handed to the thread target.
    """
    # Fire-and-forget: the thread object is not kept, and daemon=True means
    # the thread will not keep the interpreter alive at shutdown.
    threading.Thread(
        target=_run_backfill_background,
        args=(start_year,),
        daemon=True,
    ).start()
|
|
|
|
|
|
@router.post("/collect/stocks", response_model=CollectResponse)
async def collect_stocks(
    current_user: CurrentUser,
    biz_day: Optional[str] = Query(
        None,
        pattern=r"^\d{8}$",
        description="Business day in YYYYMMDD format",
    ),
):
    """Collect stock master data from KRX (runs in background)."""
    # ``current_user`` is never read in the body; it is declared so the
    # CurrentUser dependency is resolved for the request (presumably an auth
    # check -- confirm in app.api.deps).
    # The collector runs on a daemon thread, so the response only acknowledges
    # that the job was started, not that it succeeded.
    collector_kwargs = {"biz_day": biz_day}
    _start_background_collection(StockCollector, **collector_kwargs)
    return CollectResponse(message="Stock collection started")
|
|
|
|
|
|
@router.post("/collect/sectors", response_model=CollectResponse)
async def collect_sectors(
    current_user: CurrentUser,
    biz_day: Optional[str] = Query(
        None,
        pattern=r"^\d{8}$",
        description="Business day in YYYYMMDD format",
    ),
):
    """Collect sector classification data from WISEindex (runs in background)."""
    # ``current_user`` is never read in the body; it is declared so the
    # CurrentUser dependency is resolved for the request (presumably an auth
    # check -- confirm in app.api.deps).
    # The collector runs on a daemon thread, so the response only acknowledges
    # that the job was started, not that it succeeded.
    collector_kwargs = {"biz_day": biz_day}
    _start_background_collection(SectorCollector, **collector_kwargs)
    return CollectResponse(message="Sector collection started")
|
|
|
|
|
|
@router.post("/collect/prices", response_model=CollectResponse)
async def collect_prices(
    current_user: CurrentUser,
    start_date: Optional[str] = Query(
        None,
        pattern=r"^\d{8}$",
        description="Start date in YYYYMMDD format",
    ),
    end_date: Optional[str] = Query(
        None,
        pattern=r"^\d{8}$",
        description="End date in YYYYMMDD format",
    ),
):
    """Collect price data using pykrx (runs in background)."""
    # ``current_user`` is never read in the body; it is declared so the
    # CurrentUser dependency is resolved for the request (presumably an auth
    # check -- confirm in app.api.deps).
    # Both dates are passed through as given; only their 8-digit shape is
    # validated here, not their ordering or calendar validity.
    date_range = {"start_date": start_date, "end_date": end_date}
    _start_background_collection(PriceCollector, **date_range)
    return CollectResponse(message="Price collection started")
|
|
|
|
|
|
@router.post("/collect/valuations", response_model=CollectResponse)
async def collect_valuations(
    current_user: CurrentUser,
    biz_day: Optional[str] = Query(
        None,
        pattern=r"^\d{8}$",
        description="Business day in YYYYMMDD format",
    ),
):
    """Collect valuation data from KRX (runs in background)."""
    # ``current_user`` is never read in the body; it is declared so the
    # CurrentUser dependency is resolved for the request (presumably an auth
    # check -- confirm in app.api.deps).
    # The collector runs on a daemon thread, so the response only acknowledges
    # that the job was started, not that it succeeded.
    collector_kwargs = {"biz_day": biz_day}
    _start_background_collection(ValuationCollector, **collector_kwargs)
    return CollectResponse(message="Valuation collection started")
|
|
|
|
|
|
@router.post("/collect/etfs", response_model=CollectResponse)
async def collect_etfs(current_user: CurrentUser):
    """Collect ETF master data from KRX (runs in background)."""
    # ``current_user`` is never read in the body; it is declared so the
    # CurrentUser dependency is resolved for the request (presumably an auth
    # check -- confirm in app.api.deps).
    # The ETF collector is started with no extra constructor arguments.
    _start_background_collection(ETFCollector)
    acknowledgement = CollectResponse(message="ETF collection started")
    return acknowledgement
|
|
|
|
|
|
@router.post("/collect/etf-prices", response_model=CollectResponse)
async def collect_etf_prices(
    current_user: CurrentUser,
    start_date: Optional[str] = Query(
        None,
        pattern=r"^\d{8}$",
        description="Start date in YYYYMMDD format",
    ),
    end_date: Optional[str] = Query(
        None,
        pattern=r"^\d{8}$",
        description="End date in YYYYMMDD format",
    ),
):
    """Collect ETF price data using pykrx (runs in background)."""
    # ``current_user`` is never read in the body; it is declared so the
    # CurrentUser dependency is resolved for the request (presumably an auth
    # check -- confirm in app.api.deps).
    # Both dates are passed through as given; only their 8-digit shape is
    # validated here, not their ordering or calendar validity.
    date_range = {"start_date": start_date, "end_date": end_date}
    _start_background_collection(ETFPriceCollector, **date_range)
    return CollectResponse(message="ETF price collection started")
|
|
|
|
|
|
@router.post("/collect/backfill", response_model=CollectResponse)
async def collect_backfill(
    current_user: CurrentUser,
    start_year: int = Query(
        2000,
        ge=1990,
        # Upper bound tracks the calendar instead of a hard-coded year, which
        # would start rejecting valid input once that year has passed. It is
        # evaluated once at import time, so a process that stays up across
        # New Year keeps the old bound until it restarts.
        le=datetime.now().year,
        description="Start year for backfill",
    ),
):
    """Backfill historical price data from start_year to today (runs in background)."""
    # ``current_user`` is never read in the body; it is declared so the
    # CurrentUser dependency is resolved for the request (presumably an auth
    # check -- confirm in app.api.deps).
    # The backfill runs on a daemon thread, so the response only acknowledges
    # that it was started, not that it succeeded.
    run_backfill_background(start_year)
    return CollectResponse(message=f"Backfill started from {start_year}")
|
|
|
|
|
|
@router.get("/collect/status", response_model=List[JobLogResponse])
async def get_collection_status(
    current_user: CurrentUser,
    db: Session = Depends(get_db),
    limit: int = Query(20, gt=0, le=100, description="Number of records to return"),
):
    """Get recent job execution status."""
    # Local import keeps this block self-contained; asyncio is stdlib.
    import asyncio

    # ``current_user`` is never read in the body; it is declared so the
    # CurrentUser dependency is resolved for the request (presumably an auth
    # check -- confirm in app.api.deps).

    # Newest jobs first, capped at ``limit`` rows. Building the query does no
    # I/O; the database is only hit when ``.all()`` is called.
    recent_jobs_query = (
        db.query(JobLog)
        .order_by(JobLog.started_at.desc())
        .limit(limit)
    )

    # ``Session`` is a synchronous API. Calling ``.all()`` directly inside
    # this coroutine would stall the event loop for the duration of the
    # query, so the blocking call is pushed onto a worker thread. The session
    # is only used by one thread at a time.
    return await asyncio.to_thread(recent_jobs_query.all)
|