feat: add MetricsCalculator service

- Total return, CAGR, MDD, Sharpe ratio, volatility
- Benchmark comparison metrics

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
zephyrdark 2026-02-03 09:37:19 +09:00
parent d9d9c8d772
commit a78c00ecbb

View File

@ -0,0 +1,174 @@
"""
Backtest metrics calculation service.
"""
from decimal import Decimal
from typing import List
from dataclasses import dataclass
import math
@dataclass
class BacktestMetrics:
"""Calculated backtest metrics."""
total_return: Decimal
cagr: Decimal
mdd: Decimal
sharpe_ratio: Decimal
volatility: Decimal
benchmark_return: Decimal
excess_return: Decimal
class MetricsCalculator:
"""Calculates performance metrics from equity curve."""
TRADING_DAYS_PER_YEAR = 252
RISK_FREE_RATE = Decimal("0.03") # 3% annual risk-free rate
@classmethod
def calculate_all(
cls,
portfolio_values: List[Decimal],
benchmark_values: List[Decimal],
) -> BacktestMetrics:
"""Calculate all metrics from equity curves."""
if len(portfolio_values) < 2:
return BacktestMetrics(
total_return=Decimal("0"),
cagr=Decimal("0"),
mdd=Decimal("0"),
sharpe_ratio=Decimal("0"),
volatility=Decimal("0"),
benchmark_return=Decimal("0"),
excess_return=Decimal("0"),
)
years = Decimal(str(len(portfolio_values))) / Decimal(str(cls.TRADING_DAYS_PER_YEAR))
total_return = cls.calculate_total_return(portfolio_values)
benchmark_return = cls.calculate_total_return(benchmark_values)
returns = cls._calculate_daily_returns(portfolio_values)
return BacktestMetrics(
total_return=total_return,
cagr=cls.calculate_cagr(portfolio_values, years),
mdd=cls.calculate_mdd(portfolio_values),
sharpe_ratio=cls.calculate_sharpe_ratio(returns),
volatility=cls.calculate_volatility(returns),
benchmark_return=benchmark_return,
excess_return=total_return - benchmark_return,
)
@classmethod
def calculate_total_return(cls, values: List[Decimal]) -> Decimal:
"""Calculate total return percentage."""
if not values or values[0] == 0:
return Decimal("0")
return (values[-1] - values[0]) / values[0] * 100
@classmethod
def calculate_cagr(cls, values: List[Decimal], years: Decimal) -> Decimal:
"""Calculate Compound Annual Growth Rate."""
if not values or values[0] == 0 or years == 0:
return Decimal("0")
total_return = float(values[-1]) / float(values[0])
if total_return <= 0:
return Decimal("0")
cagr = (total_return ** (1 / float(years)) - 1) * 100
return Decimal(str(round(cagr, 4)))
@classmethod
def calculate_mdd(cls, values: List[Decimal]) -> Decimal:
"""Calculate Maximum Drawdown (negative value)."""
if not values:
return Decimal("0")
peak = values[0]
max_dd = Decimal("0")
for value in values:
if value > peak:
peak = value
if peak > 0:
dd = (peak - value) / peak * 100
if dd > max_dd:
max_dd = dd
return -max_dd
@classmethod
def calculate_sharpe_ratio(cls, daily_returns: List[Decimal]) -> Decimal:
"""Calculate Sharpe Ratio."""
if len(daily_returns) < 2:
return Decimal("0")
returns_float = [float(r) for r in daily_returns]
avg_return = sum(returns_float) / len(returns_float)
variance = sum((r - avg_return) ** 2 for r in returns_float) / len(returns_float)
std_return = math.sqrt(variance) if variance > 0 else 0
if std_return == 0:
return Decimal("0")
# Annualize
annual_return = avg_return * cls.TRADING_DAYS_PER_YEAR
annual_std = std_return * math.sqrt(cls.TRADING_DAYS_PER_YEAR)
sharpe = (annual_return - float(cls.RISK_FREE_RATE)) / annual_std
return Decimal(str(round(sharpe, 4)))
@classmethod
def calculate_volatility(cls, daily_returns: List[Decimal]) -> Decimal:
"""Calculate annualized volatility."""
if len(daily_returns) < 2:
return Decimal("0")
returns_float = [float(r) for r in daily_returns]
avg_return = sum(returns_float) / len(returns_float)
variance = sum((r - avg_return) ** 2 for r in returns_float) / len(returns_float)
std_return = math.sqrt(variance) if variance > 0 else 0
# Annualize
annual_volatility = std_return * math.sqrt(cls.TRADING_DAYS_PER_YEAR) * 100
return Decimal(str(round(annual_volatility, 4)))
@classmethod
def calculate_drawdown_series(cls, values: List[Decimal]) -> List[Decimal]:
"""Calculate drawdown for each point."""
if not values:
return []
drawdowns = []
peak = values[0]
for value in values:
if value > peak:
peak = value
if peak > 0:
dd = (peak - value) / peak * 100
drawdowns.append(-dd)
else:
drawdowns.append(Decimal("0"))
return drawdowns
@classmethod
def _calculate_daily_returns(cls, values: List[Decimal]) -> List[Decimal]:
"""Calculate daily returns from value series."""
if len(values) < 2:
return []
returns = []
for i in range(1, len(values)):
if values[i - 1] != 0:
ret = (values[i] - values[i - 1]) / values[i - 1]
returns.append(ret)
else:
returns.append(Decimal("0"))
return returns