galaxis-po/backend/app/services/collectors/etf_price_collector.py
zephyrdark ecb3dca571 feat: add ETF data collectors and admin API endpoints
Add ETFCollector (KRX master data) and ETFPriceCollector (pykrx OHLCV)
with corresponding admin API endpoints and frontend collection UI buttons.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-08 23:00:27 +09:00

117 lines
3.9 KiB
Python

"""
ETF price data collector using pykrx.
"""
import logging
from datetime import datetime, timedelta
import pandas as pd
from pykrx import stock as pykrx_stock
from sqlalchemy.orm import Session
from sqlalchemy.dialects.postgresql import insert
from app.services.collectors.base import BaseCollector
from app.models.stock import ETFPrice, ETF
logger = logging.getLogger(__name__)
class ETFPriceCollector(BaseCollector):
    """Collect daily ETF prices via pykrx and upsert them into ``ETFPrice``.

    Tickers are read from the ``ETF`` table. For each ticker the OHLCV frame
    for ``[start_date, end_date]`` is fetched and the close, NAV and volume
    columns are written with an ``INSERT ... ON CONFLICT (ticker, date) DO
    UPDATE`` statement (PostgreSQL dialect).
    """

    DATE_FORMAT = "%Y%m%d"
    # Size of the default collection window, counted back from ``end_date``.
    DEFAULT_LOOKBACK_DAYS = 7

    def __init__(
        self,
        db: Session,
        start_date: str | None = None,
        end_date: str | None = None,
    ):
        """Set up the collection window.

        Args:
            db: SQLAlchemy session used for reading tickers and writing prices.
            start_date: First day to collect, ``YYYYMMDD``. Defaults to
                ``DEFAULT_LOOKBACK_DAYS`` days before ``end_date``.
            end_date: Last day to collect, ``YYYYMMDD``. Defaults to today.

        Raises:
            ValueError: If a date is not a valid ``YYYYMMDD`` string or if
                ``start_date`` is after ``end_date``.
        """
        super().__init__(db)
        self.end_date = end_date or datetime.now().strftime(self.DATE_FORMAT)
        if start_date:
            self.start_date = start_date
        else:
            # Anchor the default window to end_date rather than to "now";
            # otherwise a caller passing only a past end_date would get a
            # start that lies after the end and silently collect nothing.
            end_dt = self._parse_date(self.end_date, "end_date")
            self.start_date = (
                end_dt - timedelta(days=self.DEFAULT_LOOKBACK_DAYS)
            ).strftime(self.DATE_FORMAT)
        self._validate_dates()

    @classmethod
    def _parse_date(cls, date_str, name: str) -> datetime:
        """Parse a strict ``YYYYMMDD`` string, raising ``ValueError`` otherwise."""
        message = f"Invalid {name} format. Expected YYYYMMDD, got: {date_str}"
        # strptime alone accepts non-zero-padded input such as "2024011",
        # so require exactly eight ASCII digits before parsing.
        if not (
            isinstance(date_str, str)
            and len(date_str) == 8
            and date_str.isascii()
            and date_str.isdigit()
        ):
            raise ValueError(message)
        try:
            return datetime.strptime(date_str, cls.DATE_FORMAT)
        except ValueError as exc:
            raise ValueError(message) from exc

    def _validate_dates(self) -> None:
        """Validate the format and ordering of ``start_date`` and ``end_date``.

        Raises:
            ValueError: If either date is malformed or the range is inverted.
        """
        start_dt = self._parse_date(self.start_date, "start_date")
        end_dt = self._parse_date(self.end_date, "end_date")
        if start_dt > end_dt:
            raise ValueError(
                f"start_date ({self.start_date}) must not be after "
                f"end_date ({self.end_date})"
            )

    def _safe_float(self, value) -> float | None:
        """Return ``value`` as a float, or ``None`` if it is missing or unparseable."""
        if pd.isna(value):
            return None
        try:
            return float(value)
        except (ValueError, TypeError):
            return None

    def _safe_int(self, value) -> int | None:
        """Return ``value`` as an int, or ``None`` if it is missing or unparseable.

        The value goes through ``float`` first so that inputs like ``"12.0"``
        convert; any fractional part is truncated.
        """
        if pd.isna(value):
            return None
        try:
            return int(float(value))
        except (ValueError, TypeError, OverflowError):
            # OverflowError: int(float("inf")) - treat as missing instead of
            # letting it abort the whole ticker.
            return None

    def _build_records(self, ticker: str, df: pd.DataFrame) -> list[dict]:
        """Convert a pykrx OHLCV frame (index already reset) into row dicts.

        Rows without a usable close price ("종가") are dropped.
        """
        records = []
        for _, row in df.iterrows():
            close_val = self._safe_float(row.get("종가"))  # close
            if close_val is None:
                continue
            raw_date = row["날짜"]  # date column produced by reset_index()
            # Store a plain date when the value exposes .date() (e.g. Timestamp).
            date_value = raw_date.date() if hasattr(raw_date, "date") else raw_date
            records.append({
                "ticker": ticker,
                "date": date_value,
                "close": close_val,
                "nav": self._safe_float(row.get("NAV")),
                "volume": self._safe_int(row.get("거래량")),  # volume
            })
        return records

    def _upsert_records(self, records: list[dict]) -> None:
        """Insert ``records`` into ``ETFPrice``, updating rows that already exist."""
        stmt = insert(ETFPrice).values(records)
        stmt = stmt.on_conflict_do_update(
            index_elements=["ticker", "date"],
            set_={
                "close": stmt.excluded.close,
                "nav": stmt.excluded.nav,
                "volume": stmt.excluded.volume,
            },
        )
        self.db.execute(stmt)
        self.db.commit()

    def collect(self) -> int:
        """Collect price data for every ticker in the ``ETF`` table.

        A failure for one ticker is logged and rolled back, then collection
        continues with the next ticker.

        Returns:
            The number of price records written.
        """
        ticker_list = [row[0] for row in self.db.query(ETF.ticker).all()]
        if not ticker_list:
            logger.warning("No ETFs found in database. Run ETFCollector first.")
            return 0

        total_records = 0
        logger.info(
            "Collecting ETF prices for %d ETFs from %s to %s",
            len(ticker_list), self.start_date, self.end_date,
        )
        for ticker in ticker_list:
            try:
                df = pykrx_stock.get_etf_ohlcv_by_date(
                    self.start_date, self.end_date, ticker
                )
                if df.empty:
                    continue
                records = self._build_records(ticker, df.reset_index())
                if records:
                    self._upsert_records(records)
                    total_records += len(records)
            except Exception as e:
                # Best effort per ticker: discard the failed transaction and
                # keep going so one bad ticker does not stop the whole run.
                self.db.rollback()
                logger.warning("Failed to fetch ETF prices for %s: %s", ticker, e)
                continue

        logger.info("Collected %d ETF price records", total_records)
        return total_records