From 4261e9c7772f13ff8ca276e861d315a1a5060b68 Mon Sep 17 00:00:00 2001 From: zephyrdark Date: Sun, 8 Feb 2026 23:08:50 +0900 Subject: [PATCH] fix: switch StockCollector and ValuationCollector from KRX CSV to pykrx KRX CSV download endpoint blocks requests from cloud/server IPs, causing "No columns to parse from file" errors. Replaced with pykrx's JSON-based API (get_market_ticker_list, get_market_cap_by_ticker, get_market_fundamental_by_ticker) which is more reliable. Co-Authored-By: Claude Opus 4.6 --- .../services/collectors/stock_collector.py | 177 ++++++------------ .../collectors/valuation_collector.py | 87 ++------- 2 files changed, 77 insertions(+), 187 deletions(-) diff --git a/backend/app/services/collectors/stock_collector.py b/backend/app/services/collectors/stock_collector.py index 028ae0e..8c905dd 100644 --- a/backend/app/services/collectors/stock_collector.py +++ b/backend/app/services/collectors/stock_collector.py @@ -1,13 +1,11 @@ """ -Stock data collector from KRX. +Stock data collector using pykrx. """ import logging -import time -from io import BytesIO from datetime import datetime import pandas as pd -import requests +from pykrx import stock as pykrx_stock from sqlalchemy.orm import Session from sqlalchemy.dialects.postgresql import insert @@ -19,16 +17,7 @@ logger = logging.getLogger(__name__) class StockCollector(BaseCollector): - """Collects stock master data from KRX.""" - - GEN_OTP_URL = "http://data.krx.co.kr/comm/fileDn/GenerateOTP/generate.cmd" - DOWN_URL = "http://data.krx.co.kr/comm/fileDn/download_csv/download.cmd" - HEADERS = { - "Referer": "http://data.krx.co.kr/contents/MDC/MDI/mdiLoader/index.cmd?menuId=MDC0201050201", - "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36", - } - REQUEST_TIMEOUT = 10 - RATE_LIMIT_DELAY = 1 + """Collects stock master data using pykrx.""" def __init__(self, db: Session, biz_day: str = None): super().__init__(db) @@ -42,125 +31,82 @@ class StockCollector(BaseCollector): except ValueError: raise ValueError(f"Invalid biz_day format. Expected YYYYMMDD, got: {self.biz_day}") - def _fetch_stock_data(self, mkt_id: str) -> pd.DataFrame: - """Fetch stock data for a specific market.""" - gen_otp_data = { - "locale": "ko_KR", - "mktId": mkt_id, - "trdDd": self.biz_day, - "money": "1", - "csvxls_isNo": "false", - "name": "fileDown", - "url": "dbms/MDC/STAT/standard/MDCSTAT03901", - } - try: - otp = requests.post( - self.GEN_OTP_URL, - data=gen_otp_data, - headers=self.HEADERS, - timeout=self.REQUEST_TIMEOUT - ) - otp.raise_for_status() - response = requests.post( - self.DOWN_URL, - data={"code": otp.text}, - headers=self.HEADERS, - timeout=self.REQUEST_TIMEOUT - ) - response.raise_for_status() - time.sleep(self.RATE_LIMIT_DELAY) - return pd.read_csv(BytesIO(response.content), encoding="EUC-KR") - except requests.RequestException as e: - raise RuntimeError(f"Failed to fetch stock data for market {mkt_id}: {e}") - - def _fetch_ind_data(self) -> pd.DataFrame: - """Fetch individual stock indicators.""" - gen_otp_data = { - "locale": "ko_KR", - "searchType": "1", - "mktId": "ALL", - "trdDd": self.biz_day, - "csvxls_isNo": "false", - "name": "fileDown", - "url": "dbms/MDC/STAT/standard/MDCSTAT03501", - } - try: - otp = requests.post( - self.GEN_OTP_URL, - data=gen_otp_data, - headers=self.HEADERS, - timeout=self.REQUEST_TIMEOUT - ) - otp.raise_for_status() - response = requests.post( - self.DOWN_URL, - data={"code": otp.text}, - headers=self.HEADERS, - timeout=self.REQUEST_TIMEOUT - ) - response.raise_for_status() - time.sleep(self.RATE_LIMIT_DELAY) - return pd.read_csv(BytesIO(response.content), encoding="EUC-KR") - except requests.RequestException as e: - raise RuntimeError(f"Failed to fetch indicator data: {e}") - - def _classify_stock_type(self, row: pd.Series) -> str: + def _classify_stock_type(self, name: str, ticker: str) -> str: """Classify stock type based on name and code.""" - name = row.get("종목명", "") - code = row.get("종목코드", "") - - if "스팩" in name or "제" in name and "호" in name: + if "스팩" in name or ("제" in name and "호" in name): return StockType.SPAC.value - elif code and code[-1] != "0": + elif ticker and ticker[-1] != "0": return StockType.PREFERRED.value elif name.endswith("리츠"): return StockType.REIT.value else: return StockType.COMMON.value + def _safe_value(self, value): + """Safely extract a value, returning None for NaN.""" + if pd.isna(value): + return None + try: + return float(value) + except (ValueError, TypeError): + return None + def collect(self) -> int: """Collect stock master data.""" - # Fetch data from both markets - kospi = self._fetch_stock_data("STK") - kosdaq = self._fetch_stock_data("KSQ") - krx_sector = pd.concat([kospi, kosdaq]).reset_index(drop=True) - krx_sector["종목명"] = krx_sector["종목명"].str.strip() + # Get tickers per market (also caches ticker-name mappings internally) + kospi_tickers = pykrx_stock.get_market_ticker_list(self.biz_day, market="KOSPI") + kosdaq_tickers = pykrx_stock.get_market_ticker_list(self.biz_day, market="KOSDAQ") - # Fetch individual indicators - krx_ind = self._fetch_ind_data() - krx_ind["종목명"] = krx_ind["종목명"].str.strip() + ticker_market = {} + for t in kospi_tickers: + ticker_market[t] = "KOSPI" + for t in kosdaq_tickers: + ticker_market[t] = "KOSDAQ" - # Merge data - merged = pd.merge( - krx_sector, - krx_ind, - on=krx_sector.columns.intersection(krx_ind.columns).tolist(), - how="outer", - ) - merged.columns = merged.columns.str.replace(" ", "") + if not ticker_market: + logger.warning("No tickers found from pykrx.") + return 0 + + # Fetch bulk data + cap_df = pykrx_stock.get_market_cap_by_ticker(self.biz_day) + fund_df = pykrx_stock.get_market_fundamental_by_ticker(self.biz_day, market="ALL") + + base_date = datetime.strptime(self.biz_day, "%Y%m%d").date() - # Process and insert records = [] - for _, row in merged.iterrows(): - # Skip rows with missing required fields - ticker = row.get("종목코드") - name = row.get("종목명") - if not ticker or pd.isna(ticker) or not name or pd.isna(name): + for ticker, market in ticker_market.items(): + name = pykrx_stock.get_market_ticker_name(ticker) + if not name: continue - stock_type = self._classify_stock_type(row) + close_price = None + market_cap = None + if ticker in cap_df.index: + close_price = self._safe_value(cap_df.at[ticker, "종가"]) + market_cap = self._safe_value(cap_df.at[ticker, "시가총액"]) + + eps = None + bps = None + dps = None + if ticker in fund_df.index: + eps = self._safe_value(fund_df.at[ticker, "EPS"]) + bps = self._safe_value(fund_df.at[ticker, "BPS"]) + dps = self._safe_value(fund_df.at[ticker, "DPS"]) + + stock_type = self._classify_stock_type(name, ticker) + records.append({ - "ticker": row.get("종목코드"), - "name": row.get("종목명"), - "market": row.get("시장구분", "KOSPI"), - "close_price": row.get("종가") if pd.notna(row.get("종가")) else None, - "market_cap": row.get("시가총액") if pd.notna(row.get("시가총액")) else None, - "eps": row.get("EPS") if pd.notna(row.get("EPS")) else None, - "forward_eps": row.get("선행EPS") if pd.notna(row.get("선행EPS")) else None, - "bps": row.get("BPS") if pd.notna(row.get("BPS")) else None, - "dividend_per_share": row.get("주당배당금") if pd.notna(row.get("주당배당금")) else None, + "ticker": ticker, + "name": name, + "market": market, + "close_price": close_price, + "market_cap": market_cap, + "eps": eps, + "forward_eps": None, + "bps": bps, + "dividend_per_share": dps, "stock_type": stock_type, - "base_date": datetime.strptime(self.biz_day, "%Y%m%d").date(), + "base_date": base_date, }) # Upsert using PostgreSQL INSERT ON CONFLICT @@ -184,4 +130,5 @@ class StockCollector(BaseCollector): self.db.execute(stmt) self.db.commit() + logger.info(f"Collected {len(records)} stock records") return len(records) diff --git a/backend/app/services/collectors/valuation_collector.py b/backend/app/services/collectors/valuation_collector.py index 8a953cc..6d59922 100644 --- a/backend/app/services/collectors/valuation_collector.py +++ b/backend/app/services/collectors/valuation_collector.py @@ -1,13 +1,11 @@ """ -Valuation data collector from KRX. +Valuation data collector using pykrx. """ import logging -import time -from io import BytesIO from datetime import datetime import pandas as pd -import requests +from pykrx import stock as pykrx_stock from sqlalchemy.orm import Session from sqlalchemy.dialects.postgresql import insert @@ -20,16 +18,7 @@ logger = logging.getLogger(__name__) class ValuationCollector(BaseCollector): - """Collects valuation metrics (PER, PBR, etc.) from KRX.""" - - GEN_OTP_URL = "http://data.krx.co.kr/comm/fileDn/GenerateOTP/generate.cmd" - DOWN_URL = "http://data.krx.co.kr/comm/fileDn/download_csv/download.cmd" - HEADERS = { - "Referer": "http://data.krx.co.kr/contents/MDC/MDI/mdiLoader/index.cmd?menuId=MDC0201050201", - "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36", - } - REQUEST_TIMEOUT = 10 - RATE_LIMIT_DELAY = 1 + """Collects valuation metrics (PER, PBR, etc.) using pykrx.""" def __init__(self, db: Session, biz_day: str = None): super().__init__(db) @@ -54,70 +43,24 @@ class ValuationCollector(BaseCollector): def collect(self) -> int: """Collect valuation data.""" - gen_otp_data = { - "locale": "ko_KR", - "searchType": "1", - "mktId": "ALL", - "trdDd": self.biz_day, - "csvxls_isNo": "false", - "name": "fileDown", - "url": "dbms/MDC/STAT/standard/MDCSTAT03501", - } + fund_df = pykrx_stock.get_market_fundamental_by_ticker(self.biz_day, market="ALL") - try: - otp = requests.post( - self.GEN_OTP_URL, - data=gen_otp_data, - headers=self.HEADERS, - timeout=self.REQUEST_TIMEOUT - ) - otp.raise_for_status() - - otp_code = otp.text.strip() - if not otp_code: - raise RuntimeError("Received empty OTP from KRX API") - - time.sleep(self.RATE_LIMIT_DELAY) - - response = requests.post( - self.DOWN_URL, - data={"code": otp_code}, - headers=self.HEADERS, - timeout=self.REQUEST_TIMEOUT - ) - response.raise_for_status() - except requests.RequestException as e: - raise RuntimeError(f"Failed to fetch valuation data: {e}") - - df = pd.read_csv(BytesIO(response.content), encoding="EUC-KR") - - if df.empty: - logger.warning(f"Empty CSV response for {self.biz_day}") + if fund_df.empty: + logger.warning(f"No fundamental data returned for {self.biz_day}") return 0 - df.columns = df.columns.str.replace(" ", "") - - required_cols = ["종목코드", "PER", "PBR", "배당수익률"] - missing_cols = [col for col in required_cols if col not in df.columns] - if missing_cols: - raise ValueError(f"Required columns missing from CSV: {missing_cols}") - base_date = datetime.strptime(self.biz_day, "%Y%m%d").date() - logger.info(f"Processing {len(df)} valuation records for {self.biz_day}") + logger.info(f"Processing {len(fund_df)} valuation records for {self.biz_day}") records = [] - for _, row in df.iterrows(): - ticker = row.get("종목코드") - if not ticker or pd.isna(ticker): - continue - - per = self._safe_float(row.get("PER")) - pbr = self._safe_float(row.get("PBR")) - dividend_yield = self._safe_float(row.get("배당수익률")) + for ticker in fund_df.index: + per = self._safe_float(fund_df.at[ticker, "PER"]) + pbr = self._safe_float(fund_df.at[ticker, "PBR"]) + div_yield = self._safe_float(fund_df.at[ticker, "DIV"]) # Skip records where all metrics are None - if all(v is None for v in [per, pbr, dividend_yield]): + if all(v is None for v in [per, pbr, div_yield]): continue records.append({ @@ -125,9 +68,9 @@ class ValuationCollector(BaseCollector): "base_date": base_date, "per": per, "pbr": pbr, - "psr": None, # Not available from this endpoint - "pcr": None, # Not available from this endpoint - "dividend_yield": dividend_yield, + "psr": None, + "pcr": None, + "dividend_yield": div_yield, }) if records: