zephyrdark 8b9fe7064c
All checks were successful
Deploy to Production / deploy (push) Successful in 1m42s
fix: correct pykrx ETF module import path and method call
The pykrx library uses 'etx' not 'etf' as the module directory name,
and fetch() is the proper method that returns a DataFrame.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-10 21:27:41 +09:00

93 lines
3.0 KiB
Python

"""
ETF master data collector from KRX.
"""
import logging
import pandas as pd
from pykrx.website.krx.etx.core import ETF_전종목기본종목
from sqlalchemy.orm import Session
from sqlalchemy.dialects.postgresql import insert
from app.services.collectors.base import BaseCollector
from app.models.stock import ETF, AssetClass
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
class ETFCollector(BaseCollector):
    """Collects ETF master data from KRX and upserts it into the ETF table."""

    # Substrings that mark a commodity ETF as gold-backed. Matching is done on
    # the lower-cased ETF name: "금" is Korean for gold, "골드" is the Korean
    # transliteration of "gold".
    # NOTE(review): "금" also occurs inside words such as "금속" (metal);
    # confirm whether such names need to be excluded.
    _GOLD_KEYWORDS = ("금", "골드", "gold")

    def __init__(self, db: Session):
        super().__init__(db)

    def _classify_asset_class(self, asset_class_str: str, name: str) -> str:
        """Map a KRX underlying-asset label to an ``AssetClass`` value.

        Args:
            asset_class_str: Value of the ``IDX_ASST_CLSS_NM`` column. May be
                empty, ``None`` or NaN.
            name: ETF short name; only consulted for commodity ETFs, to tell
                gold products apart from other commodities.

        Returns:
            The ``.value`` of the matching ``AssetClass`` member. Anything
            missing or unrecognised falls back to ``AssetClass.MIXED``.
        """
        # Covers None, NaN (a float), "" and any other non-string cell value,
        # so the substring tests below never raise TypeError.
        if not isinstance(asset_class_str, str) or not asset_class_str:
            return AssetClass.MIXED.value

        if "주식" in asset_class_str:  # equities
            return AssetClass.EQUITY.value
        if "채권" in asset_class_str:  # bonds
            return AssetClass.BOND.value
        if "부동산" in asset_class_str:  # real estate is grouped with equity
            return AssetClass.EQUITY.value
        if "원자재" in asset_class_str:  # commodities
            name_lower = str(name).lower() if name else ""
            # The keyword tuple must never contain "": the empty string is a
            # substring of every string and would classify all commodity ETFs
            # as gold.
            if any(keyword in name_lower for keyword in self._GOLD_KEYWORDS):
                return AssetClass.GOLD.value
            return AssetClass.MIXED.value
        return AssetClass.MIXED.value

    @staticmethod
    def _parse_expense_ratio(raw):
        """Convert a raw ``ETF_TOT_FEE`` cell to ``float``.

        Returns ``None`` when the value is missing (``None``/NaN) or cannot
        be parsed as a number. A legitimate zero fee is kept as ``0.0``.
        """
        if raw is None or pd.isna(raw):
            return None
        try:
            value = float(raw)
        except (ValueError, TypeError):
            return None
        # float("nan") parses successfully but is still a missing value.
        return None if pd.isna(value) else value

    def collect(self) -> int:
        """Fetch the ETF master list from KRX and upsert it by ticker.

        Rows without a ticker or a name are skipped. Existing rows (matched
        on ``ticker``) have their name, asset class, market and expense ratio
        overwritten.

        Returns:
            Number of records sent to the database; 0 when KRX returned no
            data.
        """
        df = ETF_전종목기본종목().fetch()
        if df is None or df.empty:
            logger.warning("No ETF data returned from KRX.")
            return 0

        records = []
        for _, row in df.iterrows():
            ticker = row.get("ISU_SRT_CD")
            name = row.get("ISU_ABBRV")
            # Both fields are required to build a usable ETF row.
            if not ticker or pd.isna(ticker) or not name or pd.isna(name):
                continue

            asset_class_str = row.get("IDX_ASST_CLSS_NM", "")
            market = row.get("IDX_MKT_CLSS_NM", "")

            records.append({
                "ticker": ticker,
                "name": name,
                "asset_class": self._classify_asset_class(asset_class_str, name),
                "market": market if market and not pd.isna(market) else "",
                "expense_ratio": self._parse_expense_ratio(row.get("ETF_TOT_FEE")),
            })

        if records:
            # Single multi-row INSERT ... ON CONFLICT (ticker) DO UPDATE.
            stmt = insert(ETF).values(records)
            stmt = stmt.on_conflict_do_update(
                index_elements=["ticker"],
                set_={
                    "name": stmt.excluded.name,
                    "asset_class": stmt.excluded.asset_class,
                    "market": stmt.excluded.market,
                    "expense_ratio": stmt.excluded.expense_ratio,
                },
            )
            self.db.execute(stmt)
            self.db.commit()

        logger.info("Collected %d ETF records", len(records))
        return len(records)