feat: add valuation data collector

Add ValuationCollector class that fetches PER, PBR, and dividend yield
data from KRX for all listed stocks. Includes business day validation,
safe float conversion, and upsert logic for the valuations table.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
zephyrdark 2026-02-02 23:48:08 +09:00
parent 29f727970d
commit 3e723b6146
2 changed files with 134 additions and 1 deletions

View File

@ -2,5 +2,12 @@ from app.services.collectors.base import BaseCollector
from app.services.collectors.stock_collector import StockCollector
from app.services.collectors.sector_collector import SectorCollector
from app.services.collectors.price_collector import PriceCollector
from app.services.collectors.valuation_collector import ValuationCollector
__all__ = ["BaseCollector", "StockCollector", "SectorCollector", "PriceCollector"]
__all__ = [
"BaseCollector",
"StockCollector",
"SectorCollector",
"PriceCollector",
"ValuationCollector",
]

View File

@ -0,0 +1,126 @@
"""
Valuation data collector from KRX.
"""
import logging
from io import BytesIO
from datetime import datetime
import pandas as pd
import requests
from sqlalchemy.orm import Session
from sqlalchemy.dialects.postgresql import insert
from app.services.collectors.base import BaseCollector
from app.models.stock import Valuation
logger = logging.getLogger(__name__)
class ValuationCollector(BaseCollector):
"""Collects valuation metrics (PER, PBR, etc.) from KRX."""
GEN_OTP_URL = "http://data.krx.co.kr/comm/fileDn/GenerateOTP/generate.cmd"
DOWN_URL = "http://data.krx.co.kr/comm/fileDn/download_csv/download.cmd"
HEADERS = {
"Referer": "http://data.krx.co.kr/contents/MDC/MDI/mdiLoader",
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36",
}
REQUEST_TIMEOUT = 10
def __init__(self, db: Session, biz_day: str = None):
super().__init__(db)
self.biz_day = biz_day or datetime.now().strftime("%Y%m%d")
self._validate_biz_day()
def _validate_biz_day(self) -> None:
"""Validate business day format."""
try:
datetime.strptime(self.biz_day, "%Y%m%d")
except ValueError:
raise ValueError(f"Invalid biz_day format. Expected YYYYMMDD, got: {self.biz_day}")
def _safe_float(self, value) -> float | None:
"""Safely convert value to float."""
if pd.isna(value):
return None
try:
return float(value)
except (ValueError, TypeError):
return None
def collect(self) -> int:
"""Collect valuation data."""
gen_otp_data = {
"locale": "ko_KR",
"searchType": "1",
"mktId": "ALL",
"trdDd": self.biz_day,
"csvxls_isNo": "false",
"name": "fileDown",
"url": "dbms/MDC/STAT/standard/MDCSTAT03501",
}
try:
otp = requests.post(
self.GEN_OTP_URL,
data=gen_otp_data,
headers=self.HEADERS,
timeout=self.REQUEST_TIMEOUT
)
otp.raise_for_status()
response = requests.post(
self.DOWN_URL,
data={"code": otp.text},
headers=self.HEADERS,
timeout=self.REQUEST_TIMEOUT
)
response.raise_for_status()
except requests.RequestException as e:
raise RuntimeError(f"Failed to fetch valuation data: {e}")
df = pd.read_csv(BytesIO(response.content), encoding="EUC-KR")
df.columns = df.columns.str.replace(" ", "")
base_date = datetime.strptime(self.biz_day, "%Y%m%d").date()
logger.info(f"Processing {len(df)} valuation records for {self.biz_day}")
records = []
for _, row in df.iterrows():
ticker = row.get("종목코드")
if not ticker or pd.isna(ticker):
continue
records.append({
"ticker": ticker,
"base_date": base_date,
"per": self._safe_float(row.get("PER")),
"pbr": self._safe_float(row.get("PBR")),
"psr": None, # Not available from this endpoint
"pcr": None, # Not available from this endpoint
"dividend_yield": self._safe_float(row.get("배당수익률")),
})
if records:
try:
stmt = insert(Valuation).values(records)
stmt = stmt.on_conflict_do_update(
index_elements=["ticker", "base_date"],
set_={
"per": stmt.excluded.per,
"pbr": stmt.excluded.pbr,
"psr": stmt.excluded.psr,
"pcr": stmt.excluded.pcr,
"dividend_yield": stmt.excluded.dividend_yield,
},
)
self.db.execute(stmt)
self.db.commit()
except Exception as e:
self.db.rollback()
raise RuntimeError(f"Failed to insert valuation data: {e}")
logger.info(f"Collected {len(records)} valuation records")
return len(records)