galaxis-po/backend/app/services/collectors/valuation_collector.py
zephyrdark 4261e9c777
Some checks failed
Deploy to Production / deploy (push) Failing after 1m37s
fix: switch StockCollector and ValuationCollector from KRX CSV to pykrx
KRX CSV download endpoint blocks requests from cloud/server IPs,
causing "No columns to parse from file" errors. Replaced with pykrx's
JSON-based API (get_market_ticker_list, get_market_cap_by_ticker,
get_market_fundamental_by_ticker) which is more reliable.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-08 23:08:50 +09:00

97 lines
3.1 KiB
Python

"""
Valuation data collector using pykrx.
"""
import logging
from datetime import datetime
import pandas as pd
from pykrx import stock as pykrx_stock
from sqlalchemy.orm import Session
from sqlalchemy.dialects.postgresql import insert
from app.services.collectors.base import BaseCollector
from app.models.stock import Valuation
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
class ValuationCollector(BaseCollector):
    """Collects valuation metrics (PER, PBR, etc.) using pykrx.

    For a single business day, reads the ``PER``, ``PBR`` and ``DIV`` columns
    of the frame returned by pykrx's ``get_market_fundamental_by_ticker`` and
    upserts one ``Valuation`` row per ticker, keyed on
    ``(ticker, base_date)``. ``psr`` and ``pcr`` are always written as
    ``None`` by this collector.
    """

    def __init__(self, db: Session, biz_day: str | None = None):
        """Bind the collector to a session and a business day.

        Args:
            db: SQLAlchemy session used to execute and commit the upsert.
            biz_day: Business day as a ``YYYYMMDD`` string. When omitted (or
                empty), ``self._get_latest_biz_day()`` supplies the value;
                that method is not defined in this class and is presumably
                inherited from ``BaseCollector``.

        Raises:
            ValueError: If the resolved business day is not ``YYYYMMDD``.
        """
        super().__init__(db)
        self.biz_day = biz_day or self._get_latest_biz_day()
        self._validate_biz_day()

    def _validate_biz_day(self) -> None:
        """Validate that ``self.biz_day`` is a real date written as YYYYMMDD.

        Raises:
            ValueError: If the value is not an eight-digit string or does not
                name a valid calendar date.
        """
        day = self.biz_day
        message = f"Invalid biz_day format. Expected YYYYMMDD, got: {self.biz_day}"

        # strptime treats leading zeros as optional when parsing, so a string
        # such as "202411" would be accepted as 2024-01-01. Require exactly
        # eight ASCII digits first, so the string handed to pykrx is always
        # in the advertised format.
        if not (
            isinstance(day, str)
            and len(day) == 8
            and day.isascii()
            and day.isdigit()
        ):
            raise ValueError(message)

        try:
            datetime.strptime(day, "%Y%m%d")
        except ValueError as err:
            raise ValueError(message) from err

    def _safe_float(self, value) -> float | None:
        """Safely convert value to float.

        Args:
            value: Any object; typically a cell taken from a DataFrame.

        Returns:
            The value as a ``float``, or ``None`` when it cannot be converted
            or converts to NaN. This method never raises.
        """
        # Convert first and test for NaN afterwards: calling pd.isna() on a
        # non-scalar returns an array whose truth value is ambiguous (which
        # would raise), and a string such as "nan" is only recognisable as
        # missing once it has been turned into a float.
        try:
            number = float(value)
        except (ValueError, TypeError):
            return None
        if pd.isna(number):
            return None
        return number

    def collect(self) -> int:
        """Collect valuation data for ``self.biz_day`` and upsert it.

        Rows whose PER, PBR and dividend yield are all missing are skipped.
        The remaining rows are written in one ``INSERT ... ON CONFLICT DO
        UPDATE`` statement and committed.

        Returns:
            The number of records written (``0`` when pykrx returned no data
            or every row was skipped).

        Raises:
            RuntimeError: If executing or committing the upsert fails. The
                session is rolled back first and the original exception is
                attached as the cause.
        """
        fund_df = pykrx_stock.get_market_fundamental_by_ticker(self.biz_day, market="ALL")
        if fund_df is None or fund_df.empty:
            logger.warning(f"No fundamental data returned for {self.biz_day}")
            return 0

        base_date = datetime.strptime(self.biz_day, "%Y%m%d").date()
        logger.info(f"Processing {len(fund_df)} valuation records for {self.biz_day}")

        records = []
        # Walk the three columns in lockstep with the index rather than doing
        # a label lookup per cell on every iteration.
        rows = zip(fund_df.index, fund_df["PER"], fund_df["PBR"], fund_df["DIV"])
        for ticker, raw_per, raw_pbr, raw_div in rows:
            per = self._safe_float(raw_per)
            pbr = self._safe_float(raw_pbr)
            div_yield = self._safe_float(raw_div)

            # Skip records where all metrics are None
            if per is None and pbr is None and div_yield is None:
                continue

            records.append({
                "ticker": ticker,
                "base_date": base_date,
                "per": per,
                "pbr": pbr,
                "psr": None,
                "pcr": None,
                "dividend_yield": div_yield,
            })

        if records:
            try:
                stmt = insert(Valuation).values(records)
                # On a (ticker, base_date) conflict, overwrite the stored
                # metrics with the incoming row's values.
                stmt = stmt.on_conflict_do_update(
                    index_elements=["ticker", "base_date"],
                    set_={
                        "per": stmt.excluded.per,
                        "pbr": stmt.excluded.pbr,
                        "psr": stmt.excluded.psr,
                        "pcr": stmt.excluded.pcr,
                        "dividend_yield": stmt.excluded.dividend_yield,
                    },
                )
                self.db.execute(stmt)
                self.db.commit()
            except Exception as e:
                self.db.rollback()
                raise RuntimeError(f"Failed to insert valuation data: {e}") from e

        logger.info(f"Collected {len(records)} valuation records")
        return len(records)