From 0bcf5bbf23a8ab4ea4675e952b86fc9f8c7ec89b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=EB=A8=B8=EB=8B=88=ED=8E=98=EB=8B=88?= Date: Thu, 14 May 2026 23:40:55 +0900 Subject: [PATCH] fix: bulk fallback price collection --- .../services/collectors/price_collector.py | 138 +++++++++--------- 1 file changed, 72 insertions(+), 66 deletions(-) diff --git a/backend/app/services/collectors/price_collector.py b/backend/app/services/collectors/price_collector.py index 2ee7dd0..12e66cd 100644 --- a/backend/app/services/collectors/price_collector.py +++ b/backend/app/services/collectors/price_collector.py @@ -147,88 +147,94 @@ class PriceCollector(BaseCollector): return total_records def _collect_pykrx(self) -> int: - """Collect stock prices via pykrx scraping (ticker-based loop).""" + """Collect stock prices via pykrx scraping (date/market bulk fetch).""" from pykrx import stock as pykrx_stock - tickers = self.db.query(Stock.ticker).all() - ticker_list = [t[0] for t in tickers] + tickers = self.db.query(Stock.ticker, Stock.market).all() + ticker_market = {ticker: market for ticker, market in tickers} - if not ticker_list: + if not ticker_market: logger.warning("No stocks found in database. Run StockCollector first.") return 0 total_records = 0 - logger.info(f"Collecting prices for {len(ticker_list)} stocks from {self.start_date} to {self.end_date}") + logger.info( + "Collecting prices for %d stocks from %s to %s", + len(ticker_market), self.start_date, self.end_date, + ) - for ticker in ticker_list: - try: - df = pykrx_stock.get_market_ohlcv( - self.start_date, self.end_date, ticker - ) - if df.empty: - continue + start = datetime.strptime(self.start_date, "%Y%m%d") + end = datetime.strptime(self.end_date, "%Y%m%d") + current = start - df = df.reset_index() - df.columns = ["date", "open", "high", "low", "close", "volume", - "value"] + def get_value(row, *names): + for name in names: + if name in row: + return row[name] + return None - expected_cols = 7 - if len(df.columns) < expected_cols: - logger.warning(f"Unexpected column count for {ticker}: {len(df.columns)}") - continue + while current <= end: + date_str = current.strftime("%Y%m%d") + date_value = current.date() - records = [] - for _, row in df.iterrows(): - open_val = self._safe_float(row["open"]) - high_val = self._safe_float(row["high"]) - low_val = self._safe_float(row["low"]) - close_val = self._safe_float(row["close"]) - volume_val = self._safe_int(row["volume"]) - - if close_val is None: - logger.debug(f"Skipping record for {ticker}: missing close price") + for market in ("KOSPI", "KOSDAQ"): + try: + df = pykrx_stock.get_market_ohlcv(date_str, market=market) + if df is None or df.empty: continue - date_value = row["date"].date() if hasattr(row["date"], "date") else row["date"] - records.append({ - "ticker": ticker, - "date": date_value, - "open": open_val, - "high": high_val, - "low": low_val, - "close": close_val, - "volume": volume_val, - "trading_value": self._safe_int(row["value"]), - }) + records = [] + for ticker, row in df.iterrows(): + ticker = str(ticker).zfill(6) + if ticker_market.get(ticker) != market: + continue - if records: - stmt = insert(Price).values(records) - stmt = stmt.on_conflict_do_update( - index_elements=["ticker", "date"], - set_={ - "open": stmt.excluded.open, - "high": stmt.excluded.high, - "low": stmt.excluded.low, - "close": stmt.excluded.close, - "volume": stmt.excluded.volume, - "trading_value": stmt.excluded.trading_value, - }, + close_val = self._safe_float(get_value(row, "종가", "close")) + if close_val is None or close_val == 0: + continue + + records.append({ + "ticker": ticker, + "date": date_value, + "open": self._safe_float(get_value(row, "시가", "open")), + "high": self._safe_float(get_value(row, "고가", "high")), + "low": self._safe_float(get_value(row, "저가", "low")), + "close": close_val, + "volume": self._safe_int(get_value(row, "거래량", "volume")), + "trading_value": self._safe_int(get_value(row, "거래대금", "value", "trading_value")), + }) + + if records: + stmt = insert(Price).values(records) + stmt = stmt.on_conflict_do_update( + index_elements=["ticker", "date"], + set_={ + "open": stmt.excluded.open, + "high": stmt.excluded.high, + "low": stmt.excluded.low, + "close": stmt.excluded.close, + "volume": stmt.excluded.volume, + "trading_value": stmt.excluded.trading_value, + }, + ) + self.db.execute(stmt) + self.db.commit() + total_records += len(records) + + except JSONDecodeError as e: + self.db.rollback() + logger.warning( + "Price fetch for %s %s: JSON decode error (%s). " + "KRX may require login — set KRX_ID/KRX_PW env vars.", + market, date_str, e, ) - self.db.execute(stmt) - self.db.commit() - total_records += len(records) + continue + except Exception as e: + self.db.rollback() + logger.warning("Failed to fetch prices for %s %s: %s", market, date_str, e) + continue - except JSONDecodeError as e: - self.db.rollback() - logger.warning( - f"Price fetch for {ticker}: JSON decode error ({e}). " - "KRX may require login — set KRX_ID/KRX_PW env vars." - ) - continue - except Exception as e: - self.db.rollback() - logger.warning(f"Failed to fetch prices for {ticker}: {e}") - continue + current += timedelta(days=1) return total_records