From ae244153d668d018e9374a28eb78c91a41ace429 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=EB=A8=B8=EB=8B=88=ED=8E=98=EB=8B=88?= Date: Wed, 15 Apr 2026 23:42:12 +0900 Subject: [PATCH] feat: generate portfolio snapshots from KRX market prices - New script: scripts/generate_snapshots.py - Fetches actual ETF closing prices from KRX for each month-end - Computes portfolio value from cumulative holdings at each date - Generates 12 monthly snapshots (2025-04 ~ 2026-03) - Added to CI/CD deploy pipeline --- .gitea/workflows/deploy.yml | 4 + backend/scripts/generate_snapshots.py | 263 ++++++++++++++++++++++++++ 2 files changed, 267 insertions(+) create mode 100644 backend/scripts/generate_snapshots.py diff --git a/.gitea/workflows/deploy.yml b/.gitea/workflows/deploy.yml index 3f5e755..24ebc56 100644 --- a/.gitea/workflows/deploy.yml +++ b/.gitea/workflows/deploy.yml @@ -74,3 +74,7 @@ jobs: - name: Re-seed portfolio data run: | docker exec galaxis-po-backend python -m scripts.seed_data + + - name: Generate portfolio snapshots + run: | + docker exec galaxis-po-backend python -m scripts.generate_snapshots diff --git a/backend/scripts/generate_snapshots.py b/backend/scripts/generate_snapshots.py new file mode 100644 index 0000000..9596342 --- /dev/null +++ b/backend/scripts/generate_snapshots.py @@ -0,0 +1,263 @@ +""" +Generate portfolio snapshots from trade history using actual market prices. + +Fetches closing prices from KRX (via pykrx) for each snapshot date, +then computes portfolio value based on cumulative holdings at that point. + +Snapshot dates: end of each month where trades occurred, plus latest available. + +Usage: + cd backend && python -m scripts.generate_snapshots + +Requires: + - DATABASE_URL environment variable + - KRX_ID / KRX_PW environment variables (pykrx >= 1.2.5) +""" +import sys +import os +import time +import logging +from datetime import date, datetime, timedelta +from decimal import Decimal, ROUND_HALF_UP +from collections import defaultdict +from json import JSONDecodeError + +sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) + +from pykrx import stock as pykrx_stock + +from sqlalchemy.orm import Session +from app.core.database import SessionLocal +from app.models.portfolio import ( + Portfolio, PortfolioSnapshot, SnapshotHolding, +) + +logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s") +logger = logging.getLogger(__name__) + +# ETF name -> ticker mapping (must match seed_data.py) +ETF_MAP = { + "TIGER 200": "102110", + "KIWOOM 국고채10년": "148070", + "KODEX 200미국채혼합": "284430", + "TIGER 미국S&P500": "360750", + "ACE KRX금현물": "411060", +} + +TICKER_NAMES = {v: k for k, v in ETF_MAP.items()} + +# Trade history (same as seed_data.py) +TRADES = [ + (date(2025, 4, 29), "ACE KRX금현물", 1, Decimal("21620")), + (date(2025, 4, 29), "TIGER 미국S&P500", 329, Decimal("19770")), + (date(2025, 4, 29), "KIWOOM 국고채10년", 1, Decimal("118000")), + (date(2025, 4, 29), "TIGER 200", 16, Decimal("33815")), + (date(2025, 4, 30), "KODEX 200미국채혼합", 355, Decimal("13235")), + (date(2025, 5, 13), "ACE KRX금현물", 260, Decimal("20820")), + (date(2025, 5, 13), "KODEX 200미국채혼합", 14, Decimal("13165")), + (date(2025, 5, 14), "ACE KRX금현물", 45, Decimal("20760")), + (date(2025, 5, 14), "TIGER 미국S&P500", 45, Decimal("20690")), + (date(2025, 5, 14), "KODEX 200미국채혼합", 733, Decimal("13220")), + (date(2025, 5, 14), "KIWOOM 국고채10년", 90, Decimal("116939")), + (date(2025, 5, 16), "KODEX 200미국채혼합", 169, Decimal("13125")), + (date(2025, 6, 12), "ACE KRX금현물", 14, Decimal("20855")), + (date(2025, 6, 12), "TIGER 미국S&P500", 3, Decimal("20355")), + (date(2025, 6, 12), "KODEX 200미국채혼합", 88, Decimal("13570")), + (date(2025, 6, 12), "KIWOOM 국고채10년", 5, Decimal("115945")), + (date(2025, 7, 30), "KIWOOM 국고채10년", 6, Decimal("116760")), + (date(2025, 8, 14), "ACE KRX금현물", 6, Decimal("21095")), + (date(2025, 8, 14), "TIGER 미국S&P500", 3, Decimal("22200")), + (date(2025, 8, 14), "KODEX 200미국채혼합", 27, Decimal("14465")), + (date(2025, 8, 14), "KIWOOM 국고채10년", 1, Decimal("117075")), + (date(2025, 8, 19), "ACE KRX금현물", 2, Decimal("21030")), + (date(2025, 10, 13), "TIGER 미국S&P500", 3, Decimal("23480")), + (date(2025, 10, 13), "KIWOOM 국고채10년", 12, Decimal("116465")), + (date(2025, 12, 5), "KIWOOM 국고채10년", 7, Decimal("112830")), + (date(2026, 1, 7), "TIGER 미국S&P500", 2, Decimal("25015")), + (date(2026, 1, 8), "KIWOOM 국고채10년", 11, Decimal("109527")), + (date(2026, 2, 20), "TIGER 미국S&P500", 20, Decimal("24685")), + (date(2026, 2, 20), "KIWOOM 국고채10년", 9, Decimal("108500")), + (date(2026, 3, 23), "ACE KRX금현물", 41, Decimal("30095")), + (date(2026, 3, 23), "TIGER 미국S&P500", 128, Decimal("24290")), + (date(2026, 3, 23), "KODEX 200미국채혼합", 188, Decimal("19579")), + (date(2026, 3, 23), "KIWOOM 국고채10년", 10, Decimal("106780")), +] + + +def _get_holdings_at_date(target_date: date) -> dict[str, int]: + """Compute cumulative holdings at a given date.""" + holdings: dict[str, int] = defaultdict(int) + for trade_date, name, qty, _ in TRADES: + if trade_date <= target_date: + ticker = ETF_MAP[name] + holdings[ticker] += qty + return dict(holdings) + + +def _generate_snapshot_dates() -> list[date]: + """Generate month-end snapshot dates from first trade to today.""" + if not TRADES: + return [] + + first_date = min(t[0] for t in TRADES) + today = date.today() + + dates = [] + # Month-end dates + current = date(first_date.year, first_date.month, 1) + while current <= today: + # Last day of month + if current.month == 12: + next_month = date(current.year + 1, 1, 1) + else: + next_month = date(current.year, current.month + 1, 1) + last_day = next_month - timedelta(days=1) + + # Only include if we have holdings at this date + if last_day >= first_date and last_day <= today: + dates.append(last_day) + + current = next_month + + return dates + + +def _fetch_price_with_retry(ticker: str, date_str: str, max_retries: int = 3) -> Decimal | None: + """Fetch closing price for a ticker on a date, with fallback to previous days.""" + target = datetime.strptime(date_str, "%Y%m%d").date() + + for day_offset in range(5): # Try up to 5 days back (weekends/holidays) + try_date = target - timedelta(days=day_offset) + try_date_str = try_date.strftime("%Y%m%d") + + for attempt in range(max_retries): + try: + # For ETFs, use get_etf_ohlcv_by_date + df = pykrx_stock.get_etf_ohlcv_by_date(try_date_str, try_date_str, ticker) + if df is not None and not df.empty: + close = df.iloc[0]["종가"] + if close and float(close) > 0: + return Decimal(str(int(close))) + except (JSONDecodeError, ConnectionError, KeyError, ValueError) as e: + if attempt < max_retries - 1: + logger.warning(f"Retry {attempt+1}/{max_retries} for {ticker} on {try_date_str}: {e}") + time.sleep(2) + continue + + # Fallback: try stock API (for non-ETF tickers) + for day_offset in range(5): + try_date = target - timedelta(days=day_offset) + try_date_str = try_date.strftime("%Y%m%d") + try: + df = pykrx_stock.get_market_ohlcv(try_date_str, try_date_str, ticker) + if df is not None and not df.empty: + close = df.iloc[0]["종가"] + if close and float(close) > 0: + return Decimal(str(int(close))) + except Exception: + continue + + return None + + +def generate_snapshots(db: Session): + """Generate portfolio snapshots from trade history with actual market prices.""" + # Find portfolio + portfolio = db.query(Portfolio).filter(Portfolio.name == "연금 포트폴리오").first() + if not portfolio: + logger.error("Portfolio '연금 포트폴리오' not found. Run seed_data.py first.") + return + + # Delete existing snapshots + existing = db.query(PortfolioSnapshot).filter( + PortfolioSnapshot.portfolio_id == portfolio.id + ).all() + if existing: + for snap in existing: + db.delete(snap) + db.flush() + logger.info(f"Deleted {len(existing)} existing snapshots") + + snapshot_dates = _generate_snapshot_dates() + logger.info(f"Generating {len(snapshot_dates)} snapshots from {snapshot_dates[0]} to {snapshot_dates[-1]}") + + all_tickers = list(ETF_MAP.values()) + created = 0 + + for snap_date in snapshot_dates: + holdings = _get_holdings_at_date(snap_date) + if not holdings: + continue + + date_str = snap_date.strftime("%Y%m%d") + logger.info(f"Processing {snap_date} ({len(holdings)} tickers)...") + + # Fetch prices for all held tickers + prices: dict[str, Decimal] = {} + for ticker in holdings: + price = _fetch_price_with_retry(ticker, date_str) + if price: + prices[ticker] = price + else: + logger.warning(f" Could not fetch price for {TICKER_NAMES.get(ticker, ticker)} on {snap_date}") + + if not prices: + logger.warning(f" Skipping {snap_date}: no prices available") + continue + + # Calculate portfolio value + total_value = Decimal("0") + snapshot_holdings = [] + + for ticker, qty in holdings.items(): + if ticker not in prices: + continue + value = qty * prices[ticker] + total_value += value + snapshot_holdings.append({ + "ticker": ticker, + "quantity": qty, + "price": prices[ticker], + "value": value, + }) + + if total_value == 0: + continue + + # Create snapshot + snapshot = PortfolioSnapshot( + portfolio_id=portfolio.id, + total_value=total_value, + snapshot_date=snap_date, + ) + db.add(snapshot) + db.flush() + + # Create snapshot holdings with ratios + for h in snapshot_holdings: + ratio = (h["value"] / total_value * 100).quantize(Decimal("0.01"), rounding=ROUND_HALF_UP) + db.add(SnapshotHolding( + snapshot_id=snapshot.id, + ticker=h["ticker"], + quantity=h["quantity"], + price=h["price"], + value=h["value"], + current_ratio=ratio, + )) + + created += 1 + logger.info(f" Snapshot {snap_date}: total={total_value:,.0f}") + + # Rate limit: be gentle with KRX + time.sleep(1) + + db.commit() + logger.info(f"Done! Created {created} snapshots.") + + +if __name__ == "__main__": + db = SessionLocal() + try: + generate_snapshots(db) + finally: + db.close()