278 lines
8.8 KiB
Python
Raw Permalink Normal View History

"""
Benchmark comparison service.
Compares portfolio performance against KOSPI index and deposit rate.
"""
import logging
import math
from datetime import date, timedelta
from decimal import Decimal
from typing import List, Optional
from pykrx import stock as pykrx_stock
from sqlalchemy.orm import Session
from app.models.portfolio import Portfolio, PortfolioSnapshot
from app.schemas.benchmark import (
BenchmarkCompareResponse,
PerformanceMetrics,
TimeSeriesPoint,
)
logger = logging.getLogger(__name__)
KOSPI_INDEX_CODE = "1001"
DEPOSIT_ANNUAL_RATE = 3.5
RISK_FREE_RATE = 3.5
class BenchmarkService:
def __init__(self, db: Session):
self.db = db
def get_deposit_rate(self) -> float:
return DEPOSIT_ANNUAL_RATE
def get_benchmark_data(
self, benchmark_type: str, start_date: date, end_date: date
) -> List[dict]:
start_str = start_date.strftime("%Y%m%d")
end_str = end_date.strftime("%Y%m%d")
if benchmark_type == "kospi":
df = pykrx_stock.get_index_ohlcv(start_str, end_str, KOSPI_INDEX_CODE)
else:
return []
if df.empty:
return []
result = []
for idx, row in df.iterrows():
result.append({
"date": idx.date() if hasattr(idx, "date") else idx,
"close": row["종가"],
})
return result
def compare_with_benchmark(
self,
portfolio_id: int,
benchmark_type: str,
period: str,
user_id: int,
) -> BenchmarkCompareResponse:
portfolio = (
self.db.query(Portfolio)
.filter(Portfolio.id == portfolio_id, Portfolio.user_id == user_id)
.first()
)
if not portfolio:
raise ValueError("Portfolio not found")
snapshots = (
self.db.query(PortfolioSnapshot)
.filter(PortfolioSnapshot.portfolio_id == portfolio_id)
.order_by(PortfolioSnapshot.snapshot_date)
.all()
)
if not snapshots:
raise ValueError("스냅샷 데이터가 없습니다")
end_date = snapshots[-1].snapshot_date
start_date = self._calc_start_date(period, snapshots[0].snapshot_date, end_date)
filtered = [s for s in snapshots if s.snapshot_date >= start_date]
if len(filtered) < 2:
filtered = snapshots
start_date = filtered[0].snapshot_date
num_days = (end_date - start_date).days
# Portfolio daily returns
portfolio_values = [float(s.total_value) for s in filtered]
portfolio_returns = self._values_to_returns(portfolio_values)
# Benchmark data
benchmark_data = self.get_benchmark_data(benchmark_type, start_date, end_date)
benchmark_closes = [d["close"] for d in benchmark_data]
benchmark_returns = self._values_to_returns(benchmark_closes)
# Deposit returns (daily)
daily_deposit_rate = (1 + DEPOSIT_ANNUAL_RATE / 100) ** (1 / 365) - 1
deposit_returns = [daily_deposit_rate] * max(num_days, 0)
# Cumulative return time series
time_series = self._build_time_series(
filtered, benchmark_data, start_date, end_date
)
# Metrics
portfolio_metrics = self._calculate_metrics(portfolio_returns, num_days)
benchmark_metrics = self._calculate_metrics(benchmark_returns, num_days)
deposit_metrics = self._calculate_metrics(deposit_returns, num_days)
alpha = portfolio_metrics.cumulative_return - benchmark_metrics.cumulative_return
info_ratio = self._calculate_information_ratio(
portfolio_returns, benchmark_returns
)
return BenchmarkCompareResponse(
portfolio_name=portfolio.name,
benchmark_type=benchmark_type,
period=period,
start_date=start_date,
end_date=end_date,
time_series=time_series,
portfolio_metrics=portfolio_metrics,
benchmark_metrics=benchmark_metrics,
deposit_metrics=deposit_metrics,
alpha=round(alpha, 2),
information_ratio=round(info_ratio, 4) if info_ratio is not None else None,
)
def _calculate_metrics(
self, returns: List[float], num_days: int
) -> PerformanceMetrics:
if not returns:
return PerformanceMetrics(
cumulative_return=0.0,
annualized_return=0.0,
sharpe_ratio=None,
max_drawdown=0.0,
)
# Cumulative return
cum = 1.0
for r in returns:
cum *= 1 + r
cum_return = (cum - 1) * 100
# Annualized return
if num_days > 0:
ann_return = (cum ** (365 / num_days) - 1) * 100
else:
ann_return = 0.0
# Sharpe ratio
if len(returns) >= 2:
mean_r = sum(returns) / len(returns)
variance = sum((r - mean_r) ** 2 for r in returns) / (len(returns) - 1)
std_r = math.sqrt(variance)
if std_r > 1e-10:
daily_rf = (1 + RISK_FREE_RATE / 100) ** (1 / 365) - 1
sharpe = (mean_r - daily_rf) / std_r * math.sqrt(252)
sharpe = round(sharpe, 4)
else:
sharpe = None
else:
sharpe = None
# Max drawdown
peak = 1.0
max_dd = 0.0
cum_val = 1.0
for r in returns:
cum_val *= 1 + r
if cum_val > peak:
peak = cum_val
dd = (cum_val - peak) / peak
if dd < max_dd:
max_dd = dd
max_dd_pct = max_dd * 100
return PerformanceMetrics(
cumulative_return=round(cum_return, 2),
annualized_return=round(ann_return, 2),
sharpe_ratio=sharpe,
max_drawdown=round(max_dd_pct, 2),
)
def _calculate_information_ratio(
self, portfolio_returns: List[float], benchmark_returns: List[float]
) -> Optional[float]:
if not portfolio_returns or not benchmark_returns:
return None
min_len = min(len(portfolio_returns), len(benchmark_returns))
excess = [
portfolio_returns[i] - benchmark_returns[i] for i in range(min_len)
]
if len(excess) < 2:
return None
mean_excess = sum(excess) / len(excess)
variance = sum((e - mean_excess) ** 2 for e in excess) / (len(excess) - 1)
tracking_error = math.sqrt(variance)
if tracking_error < 1e-10:
return None
return (mean_excess / tracking_error) * math.sqrt(252)
def _values_to_returns(self, values: List[float]) -> List[float]:
if len(values) < 2:
return []
return [
(values[i] - values[i - 1]) / values[i - 1]
for i in range(1, len(values))
if values[i - 1] != 0
]
def _calc_start_date(
self, period: str, first_snapshot: date, end_date: date
) -> date:
period_map = {
"1m": timedelta(days=30),
"3m": timedelta(days=90),
"6m": timedelta(days=180),
"1y": timedelta(days=365),
}
if period == "all":
return first_snapshot
delta = period_map.get(period, timedelta(days=365))
return max(end_date - delta, first_snapshot)
def _build_time_series(
self,
snapshots: list,
benchmark_data: List[dict],
start_date: date,
end_date: date,
) -> List[TimeSeriesPoint]:
if not snapshots:
return []
base_portfolio = float(snapshots[0].total_value)
portfolio_map = {}
for s in snapshots:
val = float(s.total_value)
ret = ((val / base_portfolio) - 1) * 100 if base_portfolio else 0
portfolio_map[s.snapshot_date] = ret
benchmark_map = {}
if benchmark_data:
base_bench = benchmark_data[0]["close"]
for d in benchmark_data:
ret = ((d["close"] / base_bench) - 1) * 100 if base_bench else 0
benchmark_map[d["date"]] = ret
daily_deposit_rate = DEPOSIT_ANNUAL_RATE / 100 / 365
all_dates = sorted(set(list(portfolio_map.keys()) + list(benchmark_map.keys())))
result = []
for d in all_dates:
days_elapsed = (d - start_date).days
deposit_ret = ((1 + DEPOSIT_ANNUAL_RATE / 100) ** (days_elapsed / 365) - 1) * 100
result.append(TimeSeriesPoint(
date=d,
portfolio_return=round(portfolio_map[d], 2) if d in portfolio_map else None,
benchmark_return=round(benchmark_map[d], 2) if d in benchmark_map else None,
deposit_return=round(deposit_ret, 2),
))
return result