galaxis-po/backend/app/services/collectors/valuation_collector.py
zephyrdark 72c72994b2
All checks were successful
Deploy to Production / deploy (push) Successful in 1m8s
fix: collector error
2026-02-08 22:48:35 +09:00

154 lines
5.1 KiB
Python

"""
Valuation data collector from KRX.
"""
import logging
import time
from io import BytesIO
from datetime import datetime
import pandas as pd
import requests
from sqlalchemy.orm import Session
from sqlalchemy.dialects.postgresql import insert
from app.services.collectors.base import BaseCollector
from app.models.stock import Valuation
# Module-level logger, named after this module via ``__name__``.
logger = logging.getLogger(__name__)
class ValuationCollector(BaseCollector):
    """Collect valuation metrics (PER, PBR, dividend yield) from KRX.

    The download is a two-step exchange: a form POST to ``GEN_OTP_URL``
    returns a one-time code, and POSTing that code to ``DOWN_URL`` returns an
    EUC-KR encoded CSV. Parsed rows are upserted into ``Valuation`` using
    ``(ticker, base_date)`` as the conflict target.
    """

    GEN_OTP_URL = "http://data.krx.co.kr/comm/fileDn/GenerateOTP/generate.cmd"
    DOWN_URL = "http://data.krx.co.kr/comm/fileDn/download_csv/download.cmd"
    # Browser-like headers sent with both requests.
    # NOTE(review): presumably the portal rejects requests without a Referer
    # and User-Agent -- confirm before changing them.
    HEADERS = {
        "Referer": "http://data.krx.co.kr/contents/MDC/MDI/mdiLoader/index.cmd?menuId=MDC0201050201",
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36",
    }
    REQUEST_TIMEOUT = 10  # passed as ``timeout=`` to each requests.post call
    RATE_LIMIT_DELAY = 1  # seconds slept between the OTP and download requests
    # CSV columns (after spaces are stripped from the header) that must exist.
    REQUIRED_COLUMNS = ("종목코드", "PER", "PBR", "배당수익률")

    def __init__(self, db: Session, biz_day: str | None = None):
        """Create a collector for one business day.

        Args:
            db: SQLAlchemy session handed to the base collector and used for
                the upsert.
            biz_day: Target day as ``YYYYMMDD``. When falsy, the value of
                ``self._get_latest_biz_day()`` (defined outside this class)
                is used instead.

        Raises:
            ValueError: If the resolved business day is not ``YYYYMMDD``.
        """
        super().__init__(db)
        self.biz_day = biz_day or self._get_latest_biz_day()
        self._validate_biz_day()

    def _validate_biz_day(self) -> None:
        """Validate that ``self.biz_day`` parses as ``YYYYMMDD``.

        Raises:
            ValueError: If the value cannot be parsed, including when it is
                not a string at all.
        """
        try:
            datetime.strptime(self.biz_day, "%Y%m%d")
        except (ValueError, TypeError) as exc:
            # TypeError covers a non-string biz_day so callers only ever see
            # the documented ValueError.
            raise ValueError(
                f"Invalid biz_day format. Expected YYYYMMDD, got: {self.biz_day}"
            ) from exc

    def _safe_float(self, value) -> float | None:
        """Convert ``value`` to ``float``; return ``None`` if missing or unparsable."""
        if pd.isna(value):
            return None
        try:
            return float(value)
        except (ValueError, TypeError):
            return None

    @staticmethod
    def _normalize_ticker(value) -> str | None:
        """Return the ticker as a string, or ``None`` when the cell is empty.

        ``pd.read_csv`` infers dtypes, so a code column containing only
        digits can come back numeric and lose its leading zeros. Numeric
        values are therefore re-padded to six digits.
        NOTE(review): assumes KRX short codes are six characters -- confirm.
        """
        if pd.isna(value) or not value:
            return None
        if isinstance(value, str):
            return value.strip() or None
        try:
            return f"{int(value):06d}"
        except (TypeError, ValueError):
            return str(value)

    def _fetch_csv(self) -> bytes:
        """Request an OTP from KRX and download the CSV it unlocks.

        Returns:
            The raw (still encoded) CSV response body.

        Raises:
            RuntimeError: If either HTTP request fails or the OTP is empty.
        """
        otp_form = {
            "locale": "ko_KR",
            "searchType": "1",
            "mktId": "ALL",
            "trdDd": self.biz_day,
            "csvxls_isNo": "false",
            "name": "fileDown",
            "url": "dbms/MDC/STAT/standard/MDCSTAT03501",
        }
        try:
            otp_response = requests.post(
                self.GEN_OTP_URL,
                data=otp_form,
                headers=self.HEADERS,
                timeout=self.REQUEST_TIMEOUT,
            )
            otp_response.raise_for_status()
            otp_code = otp_response.text.strip()
            if not otp_code:
                raise RuntimeError("Received empty OTP from KRX API")
            time.sleep(self.RATE_LIMIT_DELAY)
            download = requests.post(
                self.DOWN_URL,
                data={"code": otp_code},
                headers=self.HEADERS,
                timeout=self.REQUEST_TIMEOUT,
            )
            download.raise_for_status()
        except requests.RequestException as exc:
            raise RuntimeError(f"Failed to fetch valuation data: {exc}") from exc
        return download.content

    def _build_records(self, df: pd.DataFrame, base_date) -> list[dict]:
        """Turn CSV rows into ``Valuation`` value dicts.

        Rows without a ticker, and rows where PER, PBR and dividend yield are
        all missing, are skipped.

        Args:
            df: Parsed CSV whose header already has spaces removed.
            base_date: ``datetime.date`` stored on every record.
        """
        records = []
        # itertuples keeps each column's own dtype (iterrows upcasts per row).
        columns = df[list(self.REQUIRED_COLUMNS)]
        for code, raw_per, raw_pbr, raw_yield in columns.itertuples(index=False, name=None):
            ticker = self._normalize_ticker(code)
            if ticker is None:
                continue
            per = self._safe_float(raw_per)
            pbr = self._safe_float(raw_pbr)
            dividend_yield = self._safe_float(raw_yield)
            # Skip records where all metrics are None
            if per is None and pbr is None and dividend_yield is None:
                continue
            records.append({
                "ticker": ticker,
                "base_date": base_date,
                "per": per,
                "pbr": pbr,
                "psr": None,  # Not available from this endpoint
                "pcr": None,  # Not available from this endpoint
                "dividend_yield": dividend_yield,
            })
        return records

    def _upsert(self, records: list[dict]) -> None:
        """Insert ``records``, updating rows that collide on (ticker, base_date).

        Raises:
            RuntimeError: If the statement or commit fails; the session is
                rolled back first.
        """
        try:
            stmt = insert(Valuation).values(records)
            stmt = stmt.on_conflict_do_update(
                index_elements=["ticker", "base_date"],
                set_={
                    "per": stmt.excluded.per,
                    "pbr": stmt.excluded.pbr,
                    "psr": stmt.excluded.psr,
                    "pcr": stmt.excluded.pcr,
                    "dividend_yield": stmt.excluded.dividend_yield,
                },
            )
            self.db.execute(stmt)
            self.db.commit()
        except Exception as exc:
            self.db.rollback()
            raise RuntimeError(f"Failed to insert valuation data: {exc}") from exc

    def collect(self) -> int:
        """Download, parse and store valuation data for ``self.biz_day``.

        Returns:
            The number of records built from the CSV (0 when the response is
            empty or no row carries usable data).

        Raises:
            RuntimeError: If the download or the database write fails.
            ValueError: If the CSV lacks a required column.
        """
        content = self._fetch_csv()
        try:
            df = pd.read_csv(BytesIO(content), encoding="EUC-KR")
        except pd.errors.EmptyDataError:
            # A zero-byte body has no header to parse; treat it like an
            # empty table instead of crashing the collector.
            logger.warning(f"Empty CSV response for {self.biz_day}")
            return 0
        if df.empty:
            logger.warning(f"Empty CSV response for {self.biz_day}")
            return 0

        df.columns = df.columns.str.replace(" ", "")
        missing_cols = [col for col in self.REQUIRED_COLUMNS if col not in df.columns]
        if missing_cols:
            raise ValueError(f"Required columns missing from CSV: {missing_cols}")

        base_date = datetime.strptime(self.biz_day, "%Y%m%d").date()
        logger.info(f"Processing {len(df)} valuation records for {self.biz_day}")

        records = self._build_records(df, base_date)
        if records:
            self._upsert(records)
        logger.info(f"Collected {len(records)} valuation records")
        return len(records)