From 625ffadcab612185a276217aba8d439c8bf7e048 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=EB=A8=B8=EB=8B=88=ED=8E=98=EB=8B=88?= Date: Wed, 13 May 2026 22:15:22 +0900 Subject: [PATCH] perf: bulk-fetch prices in generate_snapshots to reduce API calls MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit OpenAPI: date당 1회 호출 (기존 ticker×date회) pykrx: ticker당 전체 기간 조회 1회 (기존 date×ticker회) date별 sleep(1) 제거 --- backend/scripts/generate_snapshots.py | 166 ++++++++++++++------------ 1 file changed, 91 insertions(+), 75 deletions(-) diff --git a/backend/scripts/generate_snapshots.py b/backend/scripts/generate_snapshots.py index c1b8626..e5c67df 100644 --- a/backend/scripts/generate_snapshots.py +++ b/backend/scripts/generate_snapshots.py @@ -17,7 +17,7 @@ import sys import os import time import logging -from datetime import date, datetime, timedelta +from datetime import date, timedelta from decimal import Decimal, ROUND_HALF_UP from collections import defaultdict from json import JSONDecodeError @@ -118,82 +118,101 @@ def _generate_snapshot_dates() -> list[date]: return dates -def _fetch_price_openapi(ticker: str, date_str: str) -> Decimal | None: - """Fetch closing price via KRX Open API.""" +def _bulk_fetch_openapi(dates: list[date]) -> dict[date, dict[str, Decimal]]: + """Fetch all ticker prices for each snapshot date — one API call per date. + + get_etf_daily returns the full ETF universe for a given date, so a single + call covers all tickers at once. We try up to 5 prior calendar days to + handle weekends / public holidays. + """ client = get_krx_client() if not client: - return None + return {} - target = datetime.strptime(date_str, "%Y%m%d").date() + tickers = set(ETF_MAP.values()) + result: dict[date, dict[str, Decimal]] = {} - for day_offset in range(5): - try_date = target - timedelta(days=day_offset) - try_date_str = try_date.strftime("%Y%m%d") + for snap_date in dates: + for day_offset in range(5): + try_date = snap_date - timedelta(days=day_offset) + try: + df = client.get_etf_daily(try_date.strftime("%Y%m%d")) + if df is None or df.empty: + continue + prices: dict[str, Decimal] = {} + for ticker in tickers: + match = df[df["ISU_SRT_CD"] == ticker] + if not match.empty: + close = match.iloc[0].get("TDD_CLSPRC") + if close and float(close) > 0: + prices[ticker] = Decimal(str(int(float(close)))) + if prices: + result[snap_date] = prices + break + except Exception as e: + logger.warning(f"Open API {try_date}: {e}") - try: - df = client.get_etf_daily(try_date_str) - if df is not None and not df.empty: - match = df[df["ISU_SRT_CD"] == ticker] - if not match.empty: - close = match.iloc[0].get("TDD_CLSPRC") - if close and float(close) > 0: - return Decimal(str(int(float(close)))) - except Exception as e: - logger.warning(f"Open API fetch for {ticker} on {try_date_str}: {e}") - continue - - return None + return result -def _fetch_price_pykrx(ticker: str, date_str: str, max_retries: int = 3) -> Decimal | None: - """Fetch closing price via pykrx scraping.""" +def _bulk_fetch_pykrx(dates: list[date]) -> dict[date, dict[str, Decimal]]: + """Fetch each ticker's full price series in one range query — one API call + per ticker — then extract the needed snapshot dates from the cached result. + """ + import pandas as pd from pykrx import stock as pykrx_stock - target = datetime.strptime(date_str, "%Y%m%d").date() + if not dates: + return {} - for day_offset in range(5): - try_date = target - timedelta(days=day_offset) - try_date_str = try_date.strftime("%Y%m%d") + # Extra buffer so the 5-day fallback window is always covered + start_str = (min(dates) - timedelta(days=7)).strftime("%Y%m%d") + end_str = max(dates).strftime("%Y%m%d") - for attempt in range(max_retries): + series: dict[str, pd.DataFrame] = {} + for ticker in ETF_MAP.values(): + for attempt in range(3): try: - df = pykrx_stock.get_etf_ohlcv_by_date(try_date_str, try_date_str, ticker) + df = pykrx_stock.get_etf_ohlcv_by_date(start_str, end_str, ticker) if df is not None and not df.empty: - close = df.iloc[0]["종가"] - if close and float(close) > 0: - return Decimal(str(int(close))) + series[ticker] = df + break except (JSONDecodeError, ConnectionError, KeyError, ValueError) as e: - if attempt < max_retries - 1: - logger.warning(f"Retry {attempt+1}/{max_retries} for {ticker} on {try_date_str}: {e}") + if attempt < 2: + logger.warning(f"pykrx retry {attempt+1}/3 for {ticker}: {e}") time.sleep(2) - continue + else: + logger.warning(f"pykrx failed for {ticker}: {e}") - # Fallback: try stock API (for non-ETF tickers) - for day_offset in range(5): - try_date = target - timedelta(days=day_offset) - try_date_str = try_date.strftime("%Y%m%d") - try: - df = pykrx_stock.get_market_ohlcv(try_date_str, try_date_str, ticker) - if df is not None and not df.empty: - close = df.iloc[0]["종가"] - if close and float(close) > 0: - return Decimal(str(int(close))) - except Exception: - continue + result: dict[date, dict[str, Decimal]] = {} + for snap_date in dates: + prices: dict[str, Decimal] = {} + for ticker, df in series.items(): + for day_offset in range(5): + ts = pd.Timestamp(snap_date - timedelta(days=day_offset)) + if ts in df.index: + close = df.loc[ts, "종가"] + if close and float(close) > 0: + prices[ticker] = Decimal(str(int(close))) + break + if prices: + result[snap_date] = prices - return None + return result -def _fetch_price_with_retry(ticker: str, date_str: str, max_retries: int = 3) -> Decimal | None: - """Fetch closing price, preferring Open API with pykrx fallback.""" +def _bulk_fetch_prices(dates: list[date]) -> dict[date, dict[str, Decimal]]: + """Fetch prices for all snapshot dates, preferring Open API with pykrx fallback.""" client = get_krx_client() if client: - price = _fetch_price_openapi(ticker, date_str) - if price: - return price - logger.warning(f"Open API failed for {ticker} on {date_str}, trying pykrx") - - return _fetch_price_pykrx(ticker, date_str, max_retries) + result = _bulk_fetch_openapi(dates) + missing = [d for d in dates if d not in result] + if missing: + logger.warning(f"Open API missing {len(missing)} dates, falling back to pykrx") + pykrx_result = _bulk_fetch_pykrx(missing) + result.update(pykrx_result) + return result + return _bulk_fetch_pykrx(dates) def generate_snapshots(db: Session): @@ -215,7 +234,11 @@ def generate_snapshots(db: Session): snapshot_dates = _generate_snapshot_dates() logger.info(f"Generating {len(snapshot_dates)} snapshots from {snapshot_dates[0]} to {snapshot_dates[-1]}") - all_tickers = list(ETF_MAP.values()) + # Bulk-fetch all prices upfront — minimises total API calls + logger.info("Fetching prices in bulk...") + all_prices = _bulk_fetch_prices(snapshot_dates) + logger.info(f"Prices fetched for {len(all_prices)}/{len(snapshot_dates)} dates") + created = 0 for snap_date in snapshot_dates: @@ -223,33 +246,28 @@ def generate_snapshots(db: Session): if not holdings: continue - date_str = snap_date.strftime("%Y%m%d") - logger.info(f"Processing {snap_date} ({len(holdings)} tickers)...") - - prices: dict[str, Decimal] = {} - for ticker in holdings: - price = _fetch_price_with_retry(ticker, date_str) - if price: - prices[ticker] = price - else: - logger.warning(f" Could not fetch price for {TICKER_NAMES.get(ticker, ticker)} on {snap_date}") - - if not prices: - logger.warning(f" Skipping {snap_date}: no prices available") + date_prices = all_prices.get(snap_date, {}) + if not date_prices: + logger.warning(f"Skipping {snap_date}: no prices available") continue + for ticker in holdings: + if ticker not in date_prices: + logger.warning(f" Missing price for {TICKER_NAMES.get(ticker, ticker)} on {snap_date}") + total_value = Decimal("0") snapshot_holdings = [] for ticker, qty in holdings.items(): - if ticker not in prices: + price = date_prices.get(ticker) + if not price: continue - value = qty * prices[ticker] + value = qty * price total_value += value snapshot_holdings.append({ "ticker": ticker, "quantity": qty, - "price": prices[ticker], + "price": price, "value": value, }) @@ -278,8 +296,6 @@ def generate_snapshots(db: Session): created += 1 logger.info(f" Snapshot {snap_date}: total={total_value:,.0f}") - time.sleep(1) - db.commit() logger.info(f"Done! Created {created} snapshots.")