zephyrdark 8d1a2f7937 feat: add DailyBacktestEngine for KJB signal-based backtesting
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-19 15:14:15 +09:00

246 lines
9.2 KiB
Python

"""
Daily simulation backtest engine for signal-based strategies (KJB).
"""
import logging
from datetime import date
from decimal import Decimal
from typing import Dict, List
import pandas as pd
from sqlalchemy.orm import Session
from app.models.backtest import (
Backtest, BacktestResult, BacktestEquityCurve,
BacktestTransaction,
)
from app.models.stock import Stock, Price
from app.services.backtest.trading_portfolio import TradingPortfolio, TradingTransaction
from app.services.backtest.metrics import MetricsCalculator
from app.services.strategy.kjb import KJBSignalGenerator
logger = logging.getLogger(__name__)
class DailyBacktestEngine:
"""
Backtest engine for KJB signal-based strategy.
Runs daily simulation with individual position management.
"""
def __init__(self, db: Session):
self.db = db
self.signal_gen = KJBSignalGenerator()
def run(self, backtest_id: int) -> None:
backtest = self.db.query(Backtest).get(backtest_id)
if not backtest:
raise ValueError(f"Backtest {backtest_id} not found")
params = backtest.strategy_params or {}
portfolio = TradingPortfolio(
initial_capital=backtest.initial_capital,
max_positions=params.get("max_positions", 10),
cash_reserve_ratio=Decimal(str(params.get("cash_reserve_ratio", 0.3))),
stop_loss_pct=Decimal(str(params.get("stop_loss_pct", 0.03))),
target1_pct=Decimal(str(params.get("target1_pct", 0.05))),
target2_pct=Decimal(str(params.get("target2_pct", 0.10))),
)
trading_days = self._get_trading_days(backtest.start_date, backtest.end_date)
if not trading_days:
raise ValueError("No trading days found")
universe_tickers = self._get_universe_tickers()
# Load all data upfront for performance
all_prices = self._load_all_prices(universe_tickers, backtest.start_date, backtest.end_date)
stock_dfs = self._build_stock_dfs(all_prices, universe_tickers)
kospi_df = self._load_kospi_df(backtest.start_date, backtest.end_date)
benchmark_prices = self._load_benchmark_prices(backtest.benchmark, backtest.start_date, backtest.end_date)
# Build day -> prices lookup
day_prices_map: Dict[date, Dict[str, Decimal]] = {}
for p in all_prices:
if p.date not in day_prices_map:
day_prices_map[p.date] = {}
day_prices_map[p.date][p.ticker] = p.close
equity_curve_data: List[Dict] = []
all_transactions: List[tuple] = []
initial_benchmark = benchmark_prices.get(trading_days[0], Decimal("1"))
if initial_benchmark == 0:
initial_benchmark = Decimal("1")
for trading_date in trading_days:
day_prices = day_prices_map.get(trading_date, {})
# 1. Check exits first
exit_txns = portfolio.check_exits(
date=trading_date,
prices=day_prices,
commission_rate=backtest.commission_rate,
slippage_rate=backtest.slippage_rate,
)
for txn in exit_txns:
all_transactions.append((trading_date, txn))
# 2. Check entry signals
for ticker in universe_tickers:
if ticker in portfolio.positions:
continue
if ticker not in stock_dfs or ticker not in day_prices:
continue
stock_df = stock_dfs[ticker]
if trading_date not in stock_df.index:
continue
hist = stock_df.loc[stock_df.index <= trading_date]
if len(hist) < 21:
continue
kospi_hist = kospi_df.loc[kospi_df.index <= trading_date]
if len(kospi_hist) < 11:
continue
signals = self.signal_gen.generate_signals(hist, kospi_hist)
if trading_date in signals.index and signals.loc[trading_date, "buy"]:
txn = portfolio.enter_position(
ticker=ticker,
price=day_prices[ticker],
date=trading_date,
commission_rate=backtest.commission_rate,
slippage_rate=backtest.slippage_rate,
)
if txn:
all_transactions.append((trading_date, txn))
# 3. Record daily value
portfolio_value = portfolio.get_value(day_prices)
benchmark_value = benchmark_prices.get(trading_date, initial_benchmark)
normalized_benchmark = benchmark_value / initial_benchmark * backtest.initial_capital
equity_curve_data.append({
"date": trading_date,
"portfolio_value": portfolio_value,
"benchmark_value": normalized_benchmark,
})
# Calculate and save
portfolio_values = [Decimal(str(e["portfolio_value"])) for e in equity_curve_data]
benchmark_values = [Decimal(str(e["benchmark_value"])) for e in equity_curve_data]
metrics = MetricsCalculator.calculate_all(portfolio_values, benchmark_values)
drawdowns = MetricsCalculator.calculate_drawdown_series(portfolio_values)
self._save_results(backtest_id, metrics, equity_curve_data, drawdowns, all_transactions)
def _get_trading_days(self, start_date: date, end_date: date) -> List[date]:
prices = (
self.db.query(Price.date)
.filter(Price.date >= start_date, Price.date <= end_date)
.distinct()
.order_by(Price.date)
.all()
)
return [p[0] for p in prices]
def _get_universe_tickers(self) -> List[str]:
stocks = (
self.db.query(Stock)
.filter(Stock.market_cap.isnot(None))
.order_by(Stock.market_cap.desc())
.limit(30)
.all()
)
return [s.ticker for s in stocks]
def _load_all_prices(self, tickers: List[str], start_date: date, end_date: date) -> List:
return (
self.db.query(Price)
.filter(Price.ticker.in_(tickers))
.filter(Price.date >= start_date, Price.date <= end_date)
.all()
)
def _load_kospi_df(self, start_date: date, end_date: date) -> pd.DataFrame:
prices = (
self.db.query(Price)
.filter(Price.ticker == "069500")
.filter(Price.date >= start_date, Price.date <= end_date)
.order_by(Price.date)
.all()
)
if not prices:
return pd.DataFrame(columns=["close"])
data = [{"date": p.date, "close": float(p.close)} for p in prices]
return pd.DataFrame(data).set_index("date")
def _load_benchmark_prices(self, benchmark: str, start_date: date, end_date: date) -> Dict[date, Decimal]:
prices = (
self.db.query(Price)
.filter(Price.ticker == "069500")
.filter(Price.date >= start_date, Price.date <= end_date)
.all()
)
return {p.date: p.close for p in prices}
def _build_stock_dfs(self, price_data: List, tickers: List[str]) -> Dict[str, pd.DataFrame]:
ticker_rows: Dict[str, list] = {t: [] for t in tickers}
for p in price_data:
if p.ticker in ticker_rows:
ticker_rows[p.ticker].append({
"date": p.date,
"open": float(p.open),
"high": float(p.high),
"low": float(p.low),
"close": float(p.close),
"volume": int(p.volume),
})
result = {}
for ticker, rows in ticker_rows.items():
if rows:
df = pd.DataFrame(rows).set_index("date").sort_index()
result[ticker] = df
return result
def _save_results(self, backtest_id, metrics, equity_curve_data, drawdowns, transactions):
result = BacktestResult(
backtest_id=backtest_id,
total_return=metrics.total_return,
cagr=metrics.cagr,
mdd=metrics.mdd,
sharpe_ratio=metrics.sharpe_ratio,
volatility=metrics.volatility,
benchmark_return=metrics.benchmark_return,
excess_return=metrics.excess_return,
)
self.db.add(result)
for i, point in enumerate(equity_curve_data):
curve_point = BacktestEquityCurve(
backtest_id=backtest_id,
date=point["date"],
portfolio_value=point["portfolio_value"],
benchmark_value=point["benchmark_value"],
drawdown=drawdowns[i] if i < len(drawdowns) else Decimal("0"),
)
self.db.add(curve_point)
for trading_date, txn in transactions:
t = BacktestTransaction(
backtest_id=backtest_id,
date=trading_date,
ticker=txn.ticker,
action=txn.action,
shares=txn.shares,
price=txn.price,
commission=txn.commission,
)
self.db.add(t)
self.db.commit()