diff --git a/backend/alembic/versions/6c09aa4368e5_add_signals_table.py b/backend/alembic/versions/6c09aa4368e5_add_signals_table.py new file mode 100644 index 0000000..260c667 --- /dev/null +++ b/backend/alembic/versions/6c09aa4368e5_add_signals_table.py @@ -0,0 +1,47 @@ +"""add signals table + +Revision ID: 6c09aa4368e5 +Revises: 882512221354 +Create Date: 2026-02-19 12:00:00.000000 + +""" +from typing import Sequence, Union + +from alembic import op +import sqlalchemy as sa + + +# revision identifiers, used by Alembic. +revision: str = '6c09aa4368e5' +down_revision: Union[str, None] = '882512221354' +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + op.create_table('signals', + sa.Column('id', sa.Integer(), nullable=False), + sa.Column('date', sa.Date(), nullable=False), + sa.Column('ticker', sa.String(length=20), nullable=False), + sa.Column('name', sa.String(length=100), nullable=True), + sa.Column('signal_type', sa.Enum('BUY', 'SELL', 'PARTIAL_SELL', name='signaltype'), nullable=False), + sa.Column('entry_price', sa.Numeric(precision=12, scale=2), nullable=True), + sa.Column('target_price', sa.Numeric(precision=12, scale=2), nullable=True), + sa.Column('stop_loss_price', sa.Numeric(precision=12, scale=2), nullable=True), + sa.Column('reason', sa.String(length=200), nullable=True), + sa.Column('status', sa.Enum('ACTIVE', 'EXECUTED', 'EXPIRED', name='signalstatus'), nullable=True), + sa.Column('created_at', sa.DateTime(), nullable=True), + sa.PrimaryKeyConstraint('id'), + ) + op.create_index(op.f('ix_signals_date'), 'signals', ['date'], unique=False) + op.create_index(op.f('ix_signals_id'), 'signals', ['id'], unique=False) + op.create_index(op.f('ix_signals_ticker'), 'signals', ['ticker'], unique=False) + + +def downgrade() -> None: + op.drop_index(op.f('ix_signals_ticker'), table_name='signals') + op.drop_index(op.f('ix_signals_id'), table_name='signals') + op.drop_index(op.f('ix_signals_date'), table_name='signals') + op.drop_table('signals') + sa.Enum('BUY', 'SELL', 'PARTIAL_SELL', name='signaltype').drop(op.get_bind(), checkfirst=True) + sa.Enum('ACTIVE', 'EXECUTED', 'EXPIRED', name='signalstatus').drop(op.get_bind(), checkfirst=True) diff --git a/backend/app/api/__init__.py b/backend/app/api/__init__.py index 2387385..1530c2c 100644 --- a/backend/app/api/__init__.py +++ b/backend/app/api/__init__.py @@ -6,6 +6,7 @@ from app.api.market import router as market_router from app.api.backtest import router as backtest_router from app.api.snapshot import router as snapshot_router from app.api.data_explorer import router as data_explorer_router +from app.api.signal import router as signal_router __all__ = [ "auth_router", @@ -16,4 +17,5 @@ __all__ = [ "backtest_router", "snapshot_router", "data_explorer_router", + "signal_router", ] diff --git a/backend/app/api/signal.py b/backend/app/api/signal.py new file mode 100644 index 0000000..be24382 --- /dev/null +++ b/backend/app/api/signal.py @@ -0,0 +1,56 @@ +""" +KJB Signal API endpoints. +""" +from datetime import date +from typing import List, Optional + +from fastapi import APIRouter, Depends, Query +from sqlalchemy.orm import Session + +from app.core.database import get_db +from app.api.deps import CurrentUser +from app.models.signal import Signal +from app.schemas.signal import SignalResponse + +router = APIRouter(prefix="/api/signal", tags=["signal"]) + + +@router.get("/kjb/today", response_model=List[SignalResponse]) +async def get_today_signals( + current_user: CurrentUser, + db: Session = Depends(get_db), +): + """Get today's KJB trading signals.""" + today = date.today() + signals = ( + db.query(Signal) + .filter(Signal.date == today) + .order_by(Signal.signal_type, Signal.ticker) + .all() + ) + return signals + + +@router.get("/kjb/history", response_model=List[SignalResponse]) +async def get_signal_history( + current_user: CurrentUser, + db: Session = Depends(get_db), + start_date: Optional[date] = Query(None), + end_date: Optional[date] = Query(None), + ticker: Optional[str] = Query(None), + limit: int = Query(100, ge=1, le=1000), +): + """Get historical KJB signals.""" + query = db.query(Signal) + if start_date: + query = query.filter(Signal.date >= start_date) + if end_date: + query = query.filter(Signal.date <= end_date) + if ticker: + query = query.filter(Signal.ticker == ticker) + signals = ( + query.order_by(Signal.date.desc(), Signal.ticker) + .limit(limit) + .all() + ) + return signals diff --git a/backend/app/api/strategy.py b/backend/app/api/strategy.py index f1a6331..3f04bca 100644 --- a/backend/app/api/strategy.py +++ b/backend/app/api/strategy.py @@ -7,9 +7,9 @@ from sqlalchemy.orm import Session from app.core.database import get_db from app.api.deps import CurrentUser from app.schemas.strategy import ( - MultiFactorRequest, QualityRequest, ValueMomentumRequest, StrategyResult, + MultiFactorRequest, QualityRequest, ValueMomentumRequest, KJBRequest, StrategyResult, ) -from app.services.strategy import MultiFactorStrategy, QualityStrategy, ValueMomentumStrategy +from app.services.strategy import MultiFactorStrategy, QualityStrategy, ValueMomentumStrategy, KJBStrategy router = APIRouter(prefix="/api/strategy", tags=["strategy"]) @@ -61,3 +61,18 @@ async def run_value_momentum( value_weight=request.value_weight, momentum_weight=request.momentum_weight, ) + + +@router.post("/kjb", response_model=StrategyResult) +async def run_kjb( + request: KJBRequest, + current_user: CurrentUser, + db: Session = Depends(get_db), +): + """Run KJB strategy.""" + strategy = KJBStrategy(db) + return strategy.run( + universe_filter=request.universe, + top_n=request.top_n, + base_date=request.base_date, + ) diff --git a/backend/app/main.py b/backend/app/main.py index a19a059..31cd38f 100644 --- a/backend/app/main.py +++ b/backend/app/main.py @@ -10,6 +10,7 @@ from fastapi.middleware.cors import CORSMiddleware from app.api import ( auth_router, admin_router, portfolio_router, strategy_router, market_router, backtest_router, snapshot_router, data_explorer_router, + signal_router, ) # Configure logging @@ -112,6 +113,7 @@ app.include_router(market_router) app.include_router(backtest_router) app.include_router(snapshot_router) app.include_router(data_explorer_router) +app.include_router(signal_router) @app.get("/health") diff --git a/backend/app/models/__init__.py b/backend/app/models/__init__.py index 0926640..2d01477 100644 --- a/backend/app/models/__init__.py +++ b/backend/app/models/__init__.py @@ -22,6 +22,7 @@ from app.models.stock import ( AssetClass, JobLog, ) +from app.models.signal import Signal, SignalType, SignalStatus from app.models.backtest import ( Backtest, BacktestStatus, @@ -60,4 +61,7 @@ __all__ = [ "BacktestEquityCurve", "BacktestHolding", "BacktestTransaction", + "Signal", + "SignalType", + "SignalStatus", ] diff --git a/backend/app/models/signal.py b/backend/app/models/signal.py new file mode 100644 index 0000000..fb8e4e7 --- /dev/null +++ b/backend/app/models/signal.py @@ -0,0 +1,40 @@ +""" +Trading signal models. +""" +import enum +from datetime import datetime + +from sqlalchemy import ( + Column, Integer, String, Numeric, DateTime, Date, + Text, Enum as SQLEnum, +) + +from app.core.database import Base + + +class SignalType(str, enum.Enum): + BUY = "buy" + SELL = "sell" + PARTIAL_SELL = "partial_sell" + + +class SignalStatus(str, enum.Enum): + ACTIVE = "active" + EXECUTED = "executed" + EXPIRED = "expired" + + +class Signal(Base): + __tablename__ = "signals" + + id = Column(Integer, primary_key=True, index=True) + date = Column(Date, nullable=False, index=True) + ticker = Column(String(20), nullable=False, index=True) + name = Column(String(100)) + signal_type = Column(SQLEnum(SignalType), nullable=False) + entry_price = Column(Numeric(12, 2)) + target_price = Column(Numeric(12, 2)) + stop_loss_price = Column(Numeric(12, 2)) + reason = Column(String(200)) + status = Column(SQLEnum(SignalStatus), default=SignalStatus.ACTIVE) + created_at = Column(DateTime, default=datetime.utcnow) diff --git a/backend/app/schemas/backtest.py b/backend/app/schemas/backtest.py index 5cc1523..9d47840 100644 --- a/backend/app/schemas/backtest.py +++ b/backend/app/schemas/backtest.py @@ -27,7 +27,7 @@ class BacktestStatus(str, Enum): class BacktestCreate(BaseModel): """Request to create a new backtest.""" - strategy_type: str = Field(..., description="multi_factor, quality, or value_momentum") + strategy_type: str = Field(..., description="multi_factor, quality, value_momentum, or kjb") strategy_params: Dict[str, Any] = Field(default_factory=dict) start_date: date end_date: date diff --git a/backend/app/schemas/signal.py b/backend/app/schemas/signal.py new file mode 100644 index 0000000..1892551 --- /dev/null +++ b/backend/app/schemas/signal.py @@ -0,0 +1,53 @@ +""" +Signal related Pydantic schemas. +""" +from datetime import date, datetime +from decimal import Decimal +from typing import Optional, List +from enum import Enum + +from pydantic import BaseModel + +from app.schemas.portfolio import FloatDecimal + + +class SignalType(str, Enum): + BUY = "buy" + SELL = "sell" + PARTIAL_SELL = "partial_sell" + + +class SignalStatus(str, Enum): + ACTIVE = "active" + EXECUTED = "executed" + EXPIRED = "expired" + + +class SignalResponse(BaseModel): + id: int + date: date + ticker: str + name: Optional[str] = None + signal_type: str + entry_price: Optional[FloatDecimal] = None + target_price: Optional[FloatDecimal] = None + stop_loss_price: Optional[FloatDecimal] = None + reason: Optional[str] = None + status: str + created_at: datetime + + class Config: + from_attributes = True + + +class ActivePosition(BaseModel): + ticker: str + name: Optional[str] = None + entry_date: date + entry_price: FloatDecimal + current_price: FloatDecimal + shares: int + stop_loss_price: FloatDecimal + target_price: FloatDecimal + pnl_percent: FloatDecimal + pnl_amount: FloatDecimal diff --git a/backend/app/schemas/strategy.py b/backend/app/schemas/strategy.py index 1646c57..d208227 100644 --- a/backend/app/schemas/strategy.py +++ b/backend/app/schemas/strategy.py @@ -50,6 +50,11 @@ class ValueMomentumRequest(StrategyRequest): momentum_weight: FloatDecimal = Field(default=Decimal("0.5"), ge=0, le=1) +class KJBRequest(StrategyRequest): + """KJB strategy request.""" + pass + + class StockFactor(BaseModel): """Factor scores for a single stock.""" ticker: str diff --git a/backend/app/services/backtest/__init__.py b/backend/app/services/backtest/__init__.py index ebb9b57..a34d66d 100644 --- a/backend/app/services/backtest/__init__.py +++ b/backend/app/services/backtest/__init__.py @@ -2,6 +2,8 @@ from app.services.backtest.engine import BacktestEngine, DataValidationResult from app.services.backtest.portfolio import VirtualPortfolio, Transaction, HoldingInfo from app.services.backtest.metrics import MetricsCalculator, BacktestMetrics from app.services.backtest.worker import submit_backtest, get_executor_status +from app.services.backtest.daily_engine import DailyBacktestEngine +from app.services.backtest.trading_portfolio import TradingPortfolio, TradingTransaction __all__ = [ "BacktestEngine", @@ -13,4 +15,7 @@ __all__ = [ "BacktestMetrics", "submit_backtest", "get_executor_status", + "DailyBacktestEngine", + "TradingPortfolio", + "TradingTransaction", ] diff --git a/backend/app/services/backtest/daily_engine.py b/backend/app/services/backtest/daily_engine.py new file mode 100644 index 0000000..5879cb3 --- /dev/null +++ b/backend/app/services/backtest/daily_engine.py @@ -0,0 +1,245 @@ +""" +Daily simulation backtest engine for signal-based strategies (KJB). +""" +import logging +from datetime import date +from decimal import Decimal +from typing import Dict, List + +import pandas as pd +from sqlalchemy.orm import Session + +from app.models.backtest import ( + Backtest, BacktestResult, BacktestEquityCurve, + BacktestTransaction, +) +from app.models.stock import Stock, Price +from app.services.backtest.trading_portfolio import TradingPortfolio, TradingTransaction +from app.services.backtest.metrics import MetricsCalculator +from app.services.strategy.kjb import KJBSignalGenerator + +logger = logging.getLogger(__name__) + + +class DailyBacktestEngine: + """ + Backtest engine for KJB signal-based strategy. + Runs daily simulation with individual position management. + """ + + def __init__(self, db: Session): + self.db = db + self.signal_gen = KJBSignalGenerator() + + def run(self, backtest_id: int) -> None: + backtest = self.db.query(Backtest).get(backtest_id) + if not backtest: + raise ValueError(f"Backtest {backtest_id} not found") + + params = backtest.strategy_params or {} + + portfolio = TradingPortfolio( + initial_capital=backtest.initial_capital, + max_positions=params.get("max_positions", 10), + cash_reserve_ratio=Decimal(str(params.get("cash_reserve_ratio", 0.3))), + stop_loss_pct=Decimal(str(params.get("stop_loss_pct", 0.03))), + target1_pct=Decimal(str(params.get("target1_pct", 0.05))), + target2_pct=Decimal(str(params.get("target2_pct", 0.10))), + ) + + trading_days = self._get_trading_days(backtest.start_date, backtest.end_date) + if not trading_days: + raise ValueError("No trading days found") + + universe_tickers = self._get_universe_tickers() + + # Load all data upfront for performance + all_prices = self._load_all_prices(universe_tickers, backtest.start_date, backtest.end_date) + stock_dfs = self._build_stock_dfs(all_prices, universe_tickers) + kospi_df = self._load_kospi_df(backtest.start_date, backtest.end_date) + benchmark_prices = self._load_benchmark_prices(backtest.benchmark, backtest.start_date, backtest.end_date) + + # Build day -> prices lookup + day_prices_map: Dict[date, Dict[str, Decimal]] = {} + for p in all_prices: + if p.date not in day_prices_map: + day_prices_map[p.date] = {} + day_prices_map[p.date][p.ticker] = p.close + + equity_curve_data: List[Dict] = [] + all_transactions: List[tuple] = [] + + initial_benchmark = benchmark_prices.get(trading_days[0], Decimal("1")) + if initial_benchmark == 0: + initial_benchmark = Decimal("1") + + for trading_date in trading_days: + day_prices = day_prices_map.get(trading_date, {}) + + # 1. Check exits first + exit_txns = portfolio.check_exits( + date=trading_date, + prices=day_prices, + commission_rate=backtest.commission_rate, + slippage_rate=backtest.slippage_rate, + ) + for txn in exit_txns: + all_transactions.append((trading_date, txn)) + + # 2. Check entry signals + for ticker in universe_tickers: + if ticker in portfolio.positions: + continue + if ticker not in stock_dfs or ticker not in day_prices: + continue + + stock_df = stock_dfs[ticker] + if trading_date not in stock_df.index: + continue + + hist = stock_df.loc[stock_df.index <= trading_date] + if len(hist) < 21: + continue + + kospi_hist = kospi_df.loc[kospi_df.index <= trading_date] + if len(kospi_hist) < 11: + continue + + signals = self.signal_gen.generate_signals(hist, kospi_hist) + + if trading_date in signals.index and signals.loc[trading_date, "buy"]: + txn = portfolio.enter_position( + ticker=ticker, + price=day_prices[ticker], + date=trading_date, + commission_rate=backtest.commission_rate, + slippage_rate=backtest.slippage_rate, + ) + if txn: + all_transactions.append((trading_date, txn)) + + # 3. Record daily value + portfolio_value = portfolio.get_value(day_prices) + benchmark_value = benchmark_prices.get(trading_date, initial_benchmark) + normalized_benchmark = benchmark_value / initial_benchmark * backtest.initial_capital + + equity_curve_data.append({ + "date": trading_date, + "portfolio_value": portfolio_value, + "benchmark_value": normalized_benchmark, + }) + + # Calculate and save + portfolio_values = [Decimal(str(e["portfolio_value"])) for e in equity_curve_data] + benchmark_values = [Decimal(str(e["benchmark_value"])) for e in equity_curve_data] + + metrics = MetricsCalculator.calculate_all(portfolio_values, benchmark_values) + drawdowns = MetricsCalculator.calculate_drawdown_series(portfolio_values) + + self._save_results(backtest_id, metrics, equity_curve_data, drawdowns, all_transactions) + + def _get_trading_days(self, start_date: date, end_date: date) -> List[date]: + prices = ( + self.db.query(Price.date) + .filter(Price.date >= start_date, Price.date <= end_date) + .distinct() + .order_by(Price.date) + .all() + ) + return [p[0] for p in prices] + + def _get_universe_tickers(self) -> List[str]: + stocks = ( + self.db.query(Stock) + .filter(Stock.market_cap.isnot(None)) + .order_by(Stock.market_cap.desc()) + .limit(30) + .all() + ) + return [s.ticker for s in stocks] + + def _load_all_prices(self, tickers: List[str], start_date: date, end_date: date) -> List: + return ( + self.db.query(Price) + .filter(Price.ticker.in_(tickers)) + .filter(Price.date >= start_date, Price.date <= end_date) + .all() + ) + + def _load_kospi_df(self, start_date: date, end_date: date) -> pd.DataFrame: + prices = ( + self.db.query(Price) + .filter(Price.ticker == "069500") + .filter(Price.date >= start_date, Price.date <= end_date) + .order_by(Price.date) + .all() + ) + if not prices: + return pd.DataFrame(columns=["close"]) + data = [{"date": p.date, "close": float(p.close)} for p in prices] + return pd.DataFrame(data).set_index("date") + + def _load_benchmark_prices(self, benchmark: str, start_date: date, end_date: date) -> Dict[date, Decimal]: + prices = ( + self.db.query(Price) + .filter(Price.ticker == "069500") + .filter(Price.date >= start_date, Price.date <= end_date) + .all() + ) + return {p.date: p.close for p in prices} + + def _build_stock_dfs(self, price_data: List, tickers: List[str]) -> Dict[str, pd.DataFrame]: + ticker_rows: Dict[str, list] = {t: [] for t in tickers} + for p in price_data: + if p.ticker in ticker_rows: + ticker_rows[p.ticker].append({ + "date": p.date, + "open": float(p.open), + "high": float(p.high), + "low": float(p.low), + "close": float(p.close), + "volume": int(p.volume), + }) + result = {} + for ticker, rows in ticker_rows.items(): + if rows: + df = pd.DataFrame(rows).set_index("date").sort_index() + result[ticker] = df + return result + + def _save_results(self, backtest_id, metrics, equity_curve_data, drawdowns, transactions): + result = BacktestResult( + backtest_id=backtest_id, + total_return=metrics.total_return, + cagr=metrics.cagr, + mdd=metrics.mdd, + sharpe_ratio=metrics.sharpe_ratio, + volatility=metrics.volatility, + benchmark_return=metrics.benchmark_return, + excess_return=metrics.excess_return, + ) + self.db.add(result) + + for i, point in enumerate(equity_curve_data): + curve_point = BacktestEquityCurve( + backtest_id=backtest_id, + date=point["date"], + portfolio_value=point["portfolio_value"], + benchmark_value=point["benchmark_value"], + drawdown=drawdowns[i] if i < len(drawdowns) else Decimal("0"), + ) + self.db.add(curve_point) + + for trading_date, txn in transactions: + t = BacktestTransaction( + backtest_id=backtest_id, + date=trading_date, + ticker=txn.ticker, + action=txn.action, + shares=txn.shares, + price=txn.price, + commission=txn.commission, + ) + self.db.add(t) + + self.db.commit() diff --git a/backend/app/services/backtest/trading_portfolio.py b/backend/app/services/backtest/trading_portfolio.py new file mode 100644 index 0000000..671eebd --- /dev/null +++ b/backend/app/services/backtest/trading_portfolio.py @@ -0,0 +1,219 @@ +""" +Trading portfolio for signal-based strategies (KJB). +Supports individual position management with stop-loss and trailing stops. +""" +from decimal import Decimal, ROUND_DOWN +from typing import Dict, List, Optional +from dataclasses import dataclass, field +from datetime import date + + +@dataclass +class Position: + """An active trading position.""" + ticker: str + shares: int + entry_price: Decimal + entry_date: date + stop_loss: Decimal + target1: Decimal + target2: Decimal + partial_sold: bool = False + + +@dataclass +class TradingTransaction: + """A single trading transaction.""" + ticker: str + action: str # 'buy', 'sell', 'partial_sell' + shares: int + price: Decimal + commission: Decimal + reason: str = "" + + +class TradingPortfolio: + """ + Portfolio for signal-based daily trading. + Individual position entry/exit with stop-loss, take-profit, trailing stop. + Cash reserve enforcement (30%). + Partial position exits (50% at +5%). + """ + + def __init__( + self, + initial_capital: Decimal, + max_positions: int = 10, + cash_reserve_ratio: Decimal = Decimal("0.3"), + stop_loss_pct: Decimal = Decimal("0.03"), + target1_pct: Decimal = Decimal("0.05"), + target2_pct: Decimal = Decimal("0.10"), + ): + self.initial_capital = initial_capital + self.cash = initial_capital + self.max_positions = max_positions + self.cash_reserve_ratio = cash_reserve_ratio + self.stop_loss_pct = stop_loss_pct + self.target1_pct = target1_pct + self.target2_pct = target2_pct + self.positions: Dict[str, Position] = {} + + @property + def investable_capital(self) -> Decimal: + return self.initial_capital * (1 - self.cash_reserve_ratio) + + @property + def position_size(self) -> Decimal: + return self.investable_capital / Decimal(str(self.max_positions)) + + def get_value(self, prices: Dict[str, Decimal]) -> Decimal: + holdings_value = sum( + Decimal(str(pos.shares)) * prices.get(pos.ticker, Decimal("0")) + for pos in self.positions.values() + ) + return self.cash + holdings_value + + def enter_position( + self, + ticker: str, + price: Decimal, + date: date, + commission_rate: Decimal, + slippage_rate: Decimal, + ) -> Optional[TradingTransaction]: + if len(self.positions) >= self.max_positions: + return None + if ticker in self.positions: + return None + + buy_price = price * (1 + slippage_rate) + max_cost = self.position_size + available = self.cash - (self.initial_capital * self.cash_reserve_ratio) + if available <= 0: + return None + actual_cost = min(max_cost, available) + + divisor = buy_price * (1 + commission_rate) + if divisor <= 0: + return None + shares = int((actual_cost / divisor).to_integral_value(rounding=ROUND_DOWN)) + + if shares <= 0: + return None + + cost = Decimal(str(shares)) * buy_price + commission = cost * commission_rate + total_cost = cost + commission + + self.cash -= total_cost + + self.positions[ticker] = Position( + ticker=ticker, + shares=shares, + entry_price=price, + entry_date=date, + stop_loss=price * (1 - self.stop_loss_pct), + target1=price * (1 + self.target1_pct), + target2=price * (1 + self.target2_pct), + ) + + return TradingTransaction( + ticker=ticker, + action="buy", + shares=shares, + price=buy_price, + commission=commission, + reason="entry_signal", + ) + + def check_exits( + self, + date: date, + prices: Dict[str, Decimal], + commission_rate: Decimal, + slippage_rate: Decimal, + ) -> List[TradingTransaction]: + transactions = [] + + for ticker in list(self.positions.keys()): + pos = self.positions[ticker] + current_price = prices.get(ticker) + if current_price is None: + continue + + sell_price = current_price * (1 - slippage_rate) + + # 1. Stop-loss + if current_price <= pos.stop_loss: + txn = self._exit_full(ticker, sell_price, commission_rate, "stop_loss") + if txn: + transactions.append(txn) + continue + + # 2. Take-profit 2: +10% (full exit of remaining after partial) + if current_price >= pos.target2 and pos.partial_sold: + txn = self._exit_full(ticker, sell_price, commission_rate, "take_profit_2") + if txn: + transactions.append(txn) + continue + + # 3. Take-profit 1: +5% (partial exit 50%) + if current_price >= pos.target1 and not pos.partial_sold: + txn = self._exit_partial(ticker, sell_price, commission_rate) + if txn: + transactions.append(txn) + pos.stop_loss = pos.entry_price + continue + + # 4. Trailing stop update + if pos.partial_sold: + gain_pct = (current_price - pos.entry_price) / pos.entry_price + if gain_pct >= self.target2_pct: + new_stop = pos.entry_price * (1 + self.target1_pct) + if new_stop > pos.stop_loss: + pos.stop_loss = new_stop + + return transactions + + def _exit_full( + self, ticker: str, sell_price: Decimal, + commission_rate: Decimal, reason: str, + ) -> Optional[TradingTransaction]: + pos = self.positions.get(ticker) + if not pos or pos.shares <= 0: + return None + + proceeds = Decimal(str(pos.shares)) * sell_price + commission = proceeds * commission_rate + self.cash += proceeds - commission + + shares = pos.shares + del self.positions[ticker] + + return TradingTransaction( + ticker=ticker, action="sell", shares=shares, + price=sell_price, commission=commission, reason=reason, + ) + + def _exit_partial( + self, ticker: str, sell_price: Decimal, commission_rate: Decimal, + ) -> Optional[TradingTransaction]: + pos = self.positions.get(ticker) + if not pos or pos.shares <= 0: + return None + + sell_shares = pos.shares // 2 + if sell_shares <= 0: + return None + + proceeds = Decimal(str(sell_shares)) * sell_price + commission = proceeds * commission_rate + self.cash += proceeds - commission + + pos.shares -= sell_shares + pos.partial_sold = True + + return TradingTransaction( + ticker=ticker, action="partial_sell", shares=sell_shares, + price=sell_price, commission=commission, reason="take_profit_1", + ) diff --git a/backend/app/services/backtest/worker.py b/backend/app/services/backtest/worker.py index d8a228e..bdca866 100644 --- a/backend/app/services/backtest/worker.py +++ b/backend/app/services/backtest/worker.py @@ -44,8 +44,12 @@ def _run_backtest_job(backtest_id: int) -> None: db.commit() logger.info(f"Backtest {backtest_id} started") - # Run backtest - engine = BacktestEngine(db) + # Run backtest - route KJB to DailyBacktestEngine + if backtest.strategy_type == "kjb": + from app.services.backtest.daily_engine import DailyBacktestEngine + engine = DailyBacktestEngine(db) + else: + engine = BacktestEngine(db) engine.run(backtest_id) # Update status to completed diff --git a/backend/app/services/strategy/__init__.py b/backend/app/services/strategy/__init__.py index 8c5de1c..d9aacd1 100644 --- a/backend/app/services/strategy/__init__.py +++ b/backend/app/services/strategy/__init__.py @@ -2,5 +2,6 @@ from app.services.strategy.base import BaseStrategy from app.services.strategy.multi_factor import MultiFactorStrategy from app.services.strategy.quality import QualityStrategy from app.services.strategy.value_momentum import ValueMomentumStrategy +from app.services.strategy.kjb import KJBStrategy, KJBSignalGenerator -__all__ = ["BaseStrategy", "MultiFactorStrategy", "QualityStrategy", "ValueMomentumStrategy"] +__all__ = ["BaseStrategy", "MultiFactorStrategy", "QualityStrategy", "ValueMomentumStrategy", "KJBStrategy", "KJBSignalGenerator"] diff --git a/backend/app/services/strategy/kjb.py b/backend/app/services/strategy/kjb.py new file mode 100644 index 0000000..349727a --- /dev/null +++ b/backend/app/services/strategy/kjb.py @@ -0,0 +1,178 @@ +""" +Kim Jong-bong (KJB) strategy implementation. + +Signal-based short-term trading strategy: +- Universe: market cap top 30, daily trading value >= 200B KRW +- Entry: relative strength > KOSPI + breakout or large candle +- Exit: stop-loss -3%, take-profit +5%/+10%, trailing stop +""" +from datetime import date +from decimal import Decimal +from typing import Dict, List, Optional + +import pandas as pd +from sqlalchemy.orm import Session + +from app.services.strategy.base import BaseStrategy +from app.schemas.strategy import StockFactor, StrategyResult, UniverseFilter +from app.services.factor_calculator import FactorCalculator + + +class KJBSignalGenerator: + """ + Generates daily buy/sell signals based on KJB rules. + Pure computation - no DB access. Takes DataFrames as input. + """ + + def calculate_relative_strength( + self, + stock_df: pd.DataFrame, + kospi_df: pd.DataFrame, + lookback: int = 10, + ) -> pd.Series: + """ + RS = (stock return / market return) * 100 + RS > 100 means stock outperforms market. + """ + stock_ret = stock_df["close"].pct_change(lookback) + kospi_ret = kospi_df["close"].pct_change(lookback) + + # Align on common index + aligned = pd.DataFrame({ + "stock_ret": stock_ret, + "kospi_ret": kospi_ret, + }).dropna() + + rs = pd.Series(dtype=float, index=stock_df.index) + for idx in aligned.index: + market_ret = aligned.loc[idx, "kospi_ret"] + stock_r = aligned.loc[idx, "stock_ret"] + if abs(market_ret) < 1e-10: + rs[idx] = 100.0 if abs(stock_r) < 1e-10 else (200.0 if stock_r > 0 else 0.0) + else: + rs[idx] = (stock_r / market_ret) * 100 + return rs + + def detect_breakout( + self, + stock_df: pd.DataFrame, + lookback: int = 20, + ) -> pd.Series: + """Close > highest high of previous lookback days.""" + prev_high = stock_df["high"].rolling(lookback).max().shift(1) + return stock_df["close"] > prev_high + + def detect_large_candle( + self, + stock_df: pd.DataFrame, + pct_threshold: float = 0.05, + vol_multiplier: float = 1.5, + ) -> pd.Series: + """ + Daily return >= 5% AND volume >= 1.5x 20-day average. + """ + daily_return = stock_df["close"].pct_change() + avg_volume = stock_df["volume"].rolling(20).mean() + volume_ratio = stock_df["volume"] / avg_volume + return (daily_return >= pct_threshold) & (volume_ratio >= vol_multiplier) + + def generate_signals( + self, + stock_df: pd.DataFrame, + kospi_df: pd.DataFrame, + rs_lookback: int = 10, + breakout_lookback: int = 20, + ) -> pd.DataFrame: + """ + Buy when: RS > 100 AND (breakout OR large candle) + """ + rs = self.calculate_relative_strength(stock_df, kospi_df, rs_lookback) + breakout = self.detect_breakout(stock_df, breakout_lookback) + large_candle = self.detect_large_candle(stock_df) + + signals = pd.DataFrame(index=stock_df.index) + signals["rs"] = rs + signals["breakout"] = breakout.fillna(False) + signals["large_candle"] = large_candle.fillna(False) + signals["buy"] = (rs > 100) & (breakout.fillna(False) | large_candle.fillna(False)) + signals["buy"] = signals["buy"].fillna(False) + return signals + + +class KJBStrategy(BaseStrategy): + """ + KJB strategy for stock ranking. + Ranks stocks by relative strength and momentum. + Compatible with existing strategy pattern (returns StrategyResult). + """ + + strategy_name = "kjb" + + def run( + self, + universe_filter: UniverseFilter, + top_n: int, + base_date: date = None, + **kwargs, + ) -> StrategyResult: + if base_date is None: + base_date = date.today() + + # Get universe - filter to top 30 by market cap + stocks = self.get_universe(universe_filter) + stocks.sort(key=lambda s: s.market_cap or 0, reverse=True) + stocks = stocks[:30] + + tickers = [s.ticker for s in stocks] + stock_map = {s.ticker: s for s in stocks} + + if not tickers: + return StrategyResult( + strategy_name=self.strategy_name, + base_date=base_date, + universe_count=0, + result_count=0, + stocks=[], + ) + + # Get valuations and sectors + valuations = self.factor_calc.get_valuations(tickers, base_date) + sectors = self.factor_calc.get_sectors(tickers) + + # Calculate 1-month momentum as ranking proxy + momentum = self.factor_calc.calculate_momentum( + tickers, base_date, months=1, skip_recent=0, + ) + + # Build results + results = [] + for ticker in tickers: + stock = stock_map[ticker] + val = valuations.get(ticker) + mom = momentum.get(ticker, Decimal("0")) + + results.append(StockFactor( + ticker=ticker, + name=stock.name, + market=stock.market, + sector_name=sectors.get(ticker), + market_cap=int(stock.market_cap / 100_000_000) if stock.market_cap else None, + close_price=Decimal(str(stock.close_price)) if stock.close_price else None, + per=Decimal(str(val.per)) if val and val.per else None, + pbr=Decimal(str(val.pbr)) if val and val.pbr else None, + dividend_yield=Decimal(str(val.dividend_yield)) if val and val.dividend_yield else None, + momentum_score=mom, + total_score=mom, + )) + + results.sort(key=lambda x: x.total_score or Decimal("0"), reverse=True) + for i, r in enumerate(results[:top_n], 1): + r.rank = i + + return StrategyResult( + strategy_name=self.strategy_name, + base_date=base_date, + universe_count=len(stocks), + result_count=min(top_n, len(results)), + stocks=results[:top_n], + ) diff --git a/backend/jobs/__init__.py b/backend/jobs/__init__.py index b8829b1..020b5f0 100644 --- a/backend/jobs/__init__.py +++ b/backend/jobs/__init__.py @@ -3,8 +3,10 @@ Background jobs module. """ from jobs.scheduler import scheduler, start_scheduler, stop_scheduler from jobs.collection_job import run_daily_collection, run_backfill +from jobs.kjb_signal_job import run_kjb_signals __all__ = [ "scheduler", "start_scheduler", "stop_scheduler", "run_daily_collection", "run_backfill", + "run_kjb_signals", ] diff --git a/backend/jobs/kjb_signal_job.py b/backend/jobs/kjb_signal_job.py new file mode 100644 index 0000000..4d9805c --- /dev/null +++ b/backend/jobs/kjb_signal_job.py @@ -0,0 +1,109 @@ +""" +Daily KJB signal generation job. +""" +import logging +from datetime import date, timedelta + +import pandas as pd + +from app.core.database import SessionLocal +from app.models.stock import Stock, Price +from app.models.signal import Signal, SignalType, SignalStatus +from app.services.strategy.kjb import KJBSignalGenerator + +logger = logging.getLogger(__name__) + + +def run_kjb_signals(): + """ + Generate KJB trading signals for today. + Called by scheduler at 18:15 KST (after price collection). + """ + logger.info("Starting KJB signal generation") + db = SessionLocal() + + try: + today = date.today() + signal_gen = KJBSignalGenerator() + + stocks = ( + db.query(Stock) + .filter(Stock.market_cap.isnot(None)) + .order_by(Stock.market_cap.desc()) + .limit(30) + .all() + ) + tickers = [s.ticker for s in stocks] + name_map = {s.ticker: s.name for s in stocks} + + lookback_start = today - timedelta(days=90) + kospi_prices = ( + db.query(Price) + .filter(Price.ticker == "069500") + .filter(Price.date >= lookback_start, Price.date <= today) + .order_by(Price.date) + .all() + ) + if not kospi_prices: + logger.warning("No KOSPI data available for signal generation") + return + + kospi_df = pd.DataFrame([ + {"date": p.date, "close": float(p.close)} + for p in kospi_prices + ]).set_index("date") + + signals_created = 0 + + for ticker in tickers: + stock_prices = ( + db.query(Price) + .filter(Price.ticker == ticker) + .filter(Price.date >= lookback_start, Price.date <= today) + .order_by(Price.date) + .all() + ) + + if len(stock_prices) < 21: + continue + + stock_df = pd.DataFrame([{ + "date": p.date, + "open": float(p.open), + "high": float(p.high), + "low": float(p.low), + "close": float(p.close), + "volume": int(p.volume), + } for p in stock_prices]).set_index("date") + + signals = signal_gen.generate_signals(stock_df, kospi_df) + + if today in signals.index and signals.loc[today, "buy"]: + close_price = stock_df.loc[today, "close"] + reason_parts = [] + if signals.loc[today, "breakout"]: + reason_parts.append("breakout") + if signals.loc[today, "large_candle"]: + reason_parts.append("large_candle") + + signal = Signal( + date=today, + ticker=ticker, + name=name_map.get(ticker), + signal_type=SignalType.BUY, + entry_price=close_price, + target_price=round(close_price * 1.05, 2), + stop_loss_price=round(close_price * 0.97, 2), + reason=", ".join(reason_parts), + status=SignalStatus.ACTIVE, + ) + db.add(signal) + signals_created += 1 + + db.commit() + logger.info(f"KJB signal generation complete: {signals_created} buy signals") + + except Exception as e: + logger.exception(f"KJB signal generation failed: {e}") + finally: + db.close() diff --git a/backend/jobs/scheduler.py b/backend/jobs/scheduler.py index 2837b07..3be9f4d 100644 --- a/backend/jobs/scheduler.py +++ b/backend/jobs/scheduler.py @@ -11,6 +11,7 @@ KST = ZoneInfo("Asia/Seoul") from jobs.snapshot_job import create_daily_snapshots from jobs.collection_job import run_daily_collection, run_financial_collection +from jobs.kjb_signal_job import run_kjb_signals logger = logging.getLogger(__name__) @@ -66,6 +67,21 @@ def configure_jobs(): ) logger.info("Configured daily_snapshots job at 18:30 KST") + # KJB signal generation at 18:15 (after daily collection, before snapshot) + scheduler.add_job( + run_kjb_signals, + trigger=CronTrigger( + hour=18, + minute=15, + day_of_week='mon-fri', + timezone=KST, + ), + id='kjb_daily_signals', + name='Generate KJB trading signals', + replace_existing=True, + ) + logger.info("Configured kjb_daily_signals job at 18:15 KST") + def start_scheduler(): """Start the scheduler.""" diff --git a/backend/tests/e2e/test_kjb_flow.py b/backend/tests/e2e/test_kjb_flow.py new file mode 100644 index 0000000..0655968 --- /dev/null +++ b/backend/tests/e2e/test_kjb_flow.py @@ -0,0 +1,74 @@ +""" +E2E tests for KJB strategy flow. +""" +from fastapi.testclient import TestClient + + +def test_kjb_strategy_endpoint(client: TestClient, auth_headers): + """Test KJB strategy ranking endpoint.""" + response = client.post( + "/api/strategy/kjb", + json={ + "universe": {"markets": ["KOSPI"]}, + "top_n": 10, + }, + headers=auth_headers, + ) + assert response.status_code in [200, 400, 500] + if response.status_code == 200: + data = response.json() + assert data["strategy_name"] == "kjb" + assert "stocks" in data + + +def test_kjb_backtest_creation(client: TestClient, auth_headers): + """Test creating a KJB backtest.""" + response = client.post( + "/api/backtest", + json={ + "strategy_type": "kjb", + "strategy_params": { + "max_positions": 10, + "cash_reserve_ratio": 0.3, + "stop_loss_pct": 0.03, + "target1_pct": 0.05, + "target2_pct": 0.10, + }, + "start_date": "2023-01-01", + "end_date": "2023-12-31", + "initial_capital": 10000000, + "commission_rate": 0.00015, + "slippage_rate": 0.001, + "benchmark": "KOSPI", + "top_n": 30, + }, + headers=auth_headers, + ) + assert response.status_code == 200 + data = response.json() + assert "id" in data + assert data["status"] == "pending" + + +def test_signal_today_endpoint(client: TestClient, auth_headers): + """Test today's signals endpoint.""" + response = client.get("/api/signal/kjb/today", headers=auth_headers) + assert response.status_code == 200 + assert isinstance(response.json(), list) + + +def test_signal_history_endpoint(client: TestClient, auth_headers): + """Test signal history endpoint.""" + response = client.get( + "/api/signal/kjb/history", + params={"limit": 10}, + headers=auth_headers, + ) + assert response.status_code == 200 + assert isinstance(response.json(), list) + + +def test_signal_requires_auth(client: TestClient): + """Test that signal endpoints require authentication.""" + response = client.get("/api/signal/kjb/today") + assert response.status_code == 401 diff --git a/backend/tests/unit/test_kjb_signal.py b/backend/tests/unit/test_kjb_signal.py new file mode 100644 index 0000000..26d1aed --- /dev/null +++ b/backend/tests/unit/test_kjb_signal.py @@ -0,0 +1,100 @@ +""" +Unit tests for KJB signal generator. +""" +import pandas as pd +from datetime import date, timedelta + +from app.services.strategy.kjb import KJBSignalGenerator + + +def _make_price_df(closes, volumes=None, start_date=date(2024, 1, 2)): + dates = pd.bdate_range(start=start_date, periods=len(closes)) + if volumes is None: + volumes = [1000000] * len(closes) + highs = [c * 1.01 for c in closes] + lows = [c * 0.99 for c in closes] + opens = closes.copy() + return pd.DataFrame({ + "open": opens, + "high": highs, + "low": lows, + "close": closes, + "volume": volumes, + }, index=dates) + + +def _make_kospi_df(closes, start_date=date(2024, 1, 2)): + dates = pd.bdate_range(start=start_date, periods=len(closes)) + return pd.DataFrame({"close": closes}, index=dates) + + +def test_relative_strength_above_market(): + gen = KJBSignalGenerator() + stock_closes = [100 + i for i in range(25)] + kospi_closes = [100 + i * 0.5 for i in range(25)] + stock_df = _make_price_df(stock_closes) + kospi_df = _make_kospi_df(kospi_closes) + rs = gen.calculate_relative_strength(stock_df, kospi_df, lookback=10) + assert rs.dropna().iloc[-1] > 100 + + +def test_relative_strength_below_market(): + gen = KJBSignalGenerator() + stock_closes = [100 + i * 0.3 for i in range(25)] + kospi_closes = [100 + i for i in range(25)] + stock_df = _make_price_df(stock_closes) + kospi_df = _make_kospi_df(kospi_closes) + rs = gen.calculate_relative_strength(stock_df, kospi_df, lookback=10) + assert rs.dropna().iloc[-1] < 100 + + +def test_detect_breakout(): + gen = KJBSignalGenerator() + closes = [100.0] * 20 + [105.0] + stock_df = _make_price_df(closes) + breakouts = gen.detect_breakout(stock_df, lookback=20) + assert breakouts.iloc[-1] == True + assert breakouts.iloc[-2] == False + + +def test_detect_large_candle(): + gen = KJBSignalGenerator() + closes = [100.0] * 21 + [106.0] + volumes = [1000000] * 21 + [3000000] + stock_df = _make_price_df(closes, volumes) + large = gen.detect_large_candle(stock_df, pct_threshold=0.05, vol_multiplier=1.5) + assert large.iloc[-1] == True + assert large.iloc[-2] == False + + +def test_no_large_candle_low_volume(): + gen = KJBSignalGenerator() + closes = [100.0] * 21 + [106.0] + volumes = [1000000] * 22 + stock_df = _make_price_df(closes, volumes) + large = gen.detect_large_candle(stock_df) + assert large.iloc[-1] == False + + +def test_generate_buy_signal(): + gen = KJBSignalGenerator() + closes = [100.0] * 20 + [106.0] + volumes = [1000000] * 20 + [3000000] + kospi_closes = [100.0 + i * 0.1 for i in range(21)] + + stock_df = _make_price_df(closes, volumes) + kospi_df = _make_kospi_df(kospi_closes) + + signals = gen.generate_signals(stock_df, kospi_df) + assert signals["buy"].iloc[-1] == True + + +def test_no_signal_weak_stock(): + gen = KJBSignalGenerator() + # Stock underperforms market + closes = [100.0 + i * 0.1 for i in range(25)] + kospi_closes = [100 + i for i in range(25)] + stock_df = _make_price_df(closes) + kospi_df = _make_kospi_df(kospi_closes) + signals = gen.generate_signals(stock_df, kospi_df) + assert signals["buy"].iloc[-1] == False diff --git a/backend/tests/unit/test_trading_portfolio.py b/backend/tests/unit/test_trading_portfolio.py new file mode 100644 index 0000000..8df90bb --- /dev/null +++ b/backend/tests/unit/test_trading_portfolio.py @@ -0,0 +1,135 @@ +""" +Unit tests for TradingPortfolio. +""" +from decimal import Decimal +from datetime import date + +from app.services.backtest.trading_portfolio import TradingPortfolio + + +def test_initial_state(): + tp = TradingPortfolio(Decimal("10000000")) + assert tp.cash == Decimal("10000000") + assert tp.investable_capital == Decimal("7000000") + assert len(tp.positions) == 0 + + +def test_enter_position(): + tp = TradingPortfolio(Decimal("10000000")) + txn = tp.enter_position( + ticker="005930", + price=Decimal("70000"), + date=date(2024, 1, 2), + commission_rate=Decimal("0.00015"), + slippage_rate=Decimal("0.001"), + ) + assert txn is not None + assert txn.action == "buy" + assert "005930" in tp.positions + pos = tp.positions["005930"] + assert pos.entry_price == Decimal("70000") + assert pos.stop_loss == Decimal("67900") # -3% + assert pos.target1 == Decimal("73500") # +5% + assert pos.target2 == Decimal("77000") # +10% + + +def test_max_positions(): + tp = TradingPortfolio(Decimal("10000000"), max_positions=2) + tp.enter_position("A", Decimal("1000"), date(2024, 1, 2), Decimal("0"), Decimal("0")) + tp.enter_position("B", Decimal("1000"), date(2024, 1, 2), Decimal("0"), Decimal("0")) + txn = tp.enter_position("C", Decimal("1000"), date(2024, 1, 2), Decimal("0"), Decimal("0")) + assert txn is None + assert len(tp.positions) == 2 + + +def test_stop_loss_exit(): + tp = TradingPortfolio(Decimal("10000000")) + tp.enter_position("005930", Decimal("70000"), date(2024, 1, 2), Decimal("0"), Decimal("0")) + exits = tp.check_exits( + date=date(2024, 1, 3), + prices={"005930": Decimal("67000")}, + commission_rate=Decimal("0"), + slippage_rate=Decimal("0"), + ) + assert len(exits) == 1 + assert exits[0].action == "sell" + assert "005930" not in tp.positions + + +def test_partial_take_profit(): + tp = TradingPortfolio(Decimal("10000000")) + tp.enter_position("005930", Decimal("70000"), date(2024, 1, 2), Decimal("0"), Decimal("0")) + pos = tp.positions["005930"] + initial_shares = pos.shares + + exits = tp.check_exits( + date=date(2024, 1, 10), + prices={"005930": Decimal("73500")}, + commission_rate=Decimal("0"), + slippage_rate=Decimal("0"), + ) + assert len(exits) == 1 + assert exits[0].action == "partial_sell" + assert exits[0].shares == initial_shares // 2 + assert "005930" in tp.positions + assert tp.positions["005930"].stop_loss == Decimal("70000") + + +def test_full_take_profit(): + tp = TradingPortfolio(Decimal("10000000")) + tp.enter_position("005930", Decimal("70000"), date(2024, 1, 2), Decimal("0"), Decimal("0")) + tp.check_exits( + date=date(2024, 1, 10), + prices={"005930": Decimal("73500")}, + commission_rate=Decimal("0"), + slippage_rate=Decimal("0"), + ) + exits = tp.check_exits( + date=date(2024, 1, 15), + prices={"005930": Decimal("77000")}, + commission_rate=Decimal("0"), + slippage_rate=Decimal("0"), + ) + assert len(exits) == 1 + assert exits[0].action == "sell" + assert "005930" not in tp.positions + + +def test_trailing_stop_after_partial(): + tp = TradingPortfolio(Decimal("10000000")) + tp.enter_position("005930", Decimal("70000"), date(2024, 1, 2), Decimal("0"), Decimal("0")) + tp.check_exits( + date=date(2024, 1, 10), + prices={"005930": Decimal("73500")}, + commission_rate=Decimal("0"), + slippage_rate=Decimal("0"), + ) + # After partial sell, stop should be at entry (70000) + assert tp.positions["005930"].stop_loss == Decimal("70000") + # Price rises to +8%, stop stays at entry + tp.check_exits( + date=date(2024, 1, 12), + prices={"005930": Decimal("75600")}, + commission_rate=Decimal("0"), + slippage_rate=Decimal("0"), + ) + assert tp.positions["005930"].stop_loss == Decimal("70000") + + +def test_cash_reserve(): + tp = TradingPortfolio(Decimal("10000000"), cash_reserve_ratio=Decimal("0.3")) + assert tp.investable_capital == Decimal("7000000") + + +def test_get_portfolio_value(): + tp = TradingPortfolio(Decimal("10000000")) + tp.enter_position("005930", Decimal("70000"), date(2024, 1, 2), Decimal("0"), Decimal("0")) + value = tp.get_value({"005930": Decimal("75000")}) + assert value > Decimal("10000000") + + +def test_duplicate_entry_rejected(): + tp = TradingPortfolio(Decimal("10000000")) + tp.enter_position("005930", Decimal("70000"), date(2024, 1, 2), Decimal("0"), Decimal("0")) + txn = tp.enter_position("005930", Decimal("70000"), date(2024, 1, 3), Decimal("0"), Decimal("0")) + assert txn is None diff --git a/docs/plans/2026-02-19-kjb-strategy-design.md b/docs/plans/2026-02-19-kjb-strategy-design.md new file mode 100644 index 0000000..e1672b3 --- /dev/null +++ b/docs/plans/2026-02-19-kjb-strategy-design.md @@ -0,0 +1,122 @@ +# Kim Jong-bong (KJB) Strategy - Full System Design + +**Date**: 2026-02-19 +**Status**: Approved + +## Overview + +Implement the Kim Jong-bong short-term trading strategy from `quant.md` as a full system: +backtesting, daily signal generation, portfolio management, and frontend dashboard. + +This strategy is fundamentally different from existing strategies (Quality, MultiFactor, +ValueMomentum) which are periodic-rebalancing based. KJB is signal-based daily trading +with individual position management, stop-losses, and trailing stops. + +## Trading Rules (from quant.md) + +### Universe +- Market cap rank: 1-30 (large caps) +- Daily trading value >= 200 billion KRW + +### Entry Signals (all must be true) +- Relative Strength vs KOSPI > 100 (stock outperforming market over 10 trading days) +- AND at least one of: + - Box breakout (close > 20-day high) + - Large bullish candle (daily return >= 5% AND volume >= 1.5x 20-day average) + +### Exit Rules +- Stop-loss: -3% from entry price +- Take-profit 1: +5% → sell 50% of position +- Take-profit 2: +10% → sell remaining +- Trailing stop: when +5%, move stop to entry price; when +10%, move stop to +5% + +### Portfolio Rules +- Cash reserve: 30% (investable = total capital x 0.7) +- Max positions: 10 +- Position size: equal weight (investable / max_positions) + +## Architecture + +### 1. Strategy Layer + +**`KJBStrategy(BaseStrategy)`** - `services/strategy/kjb.py` +- Compatible with existing strategy pattern (returns `StrategyResult`) +- Used for stock ranking by KJB criteria +- Scores based on: relative strength, breakout proximity, volume trend + +**`KJBSignalGenerator`** - `services/strategy/kjb.py` +- Separate class for daily signal generation +- Input: stock ticker + price data +- Output: buy/sell signals with entry, target, stop-loss prices +- Used by: DailyBacktestEngine, daily scheduler job + +### 2. Daily Backtest Engine + +**`DailyBacktestEngine`** - `services/backtest/daily_engine.py` +- New engine alongside existing `BacktestEngine` +- Daily simulation loop (not rebalance-based) +- Each day: check entry signals for universe, check exit conditions for positions +- Uses `TradingPortfolio` for position management + +**`TradingPortfolio`** - `services/backtest/trading_portfolio.py` +- Separate class from existing `VirtualPortfolio` +- Supports: `enter_position()`, `exit_position()`, `partial_exit()` +- Per-position tracking: stop_loss, target_price, trailing_stop +- Cash reserve enforcement (30%) +- Transaction recording with reasons + +### 3. Signal System + +**DB Model: `Signal`** - `models/signal.py` +- Fields: date, ticker, signal_type (buy/sell/partial_sell), entry_price, + target_price, stop_loss_price, reason, status (active/executed/expired) + +**Scheduler Job: `kjb_daily_signal_job`** - `jobs/kjb_signal_job.py` +- Runs daily at 18:00 KST (after price data collection) +- Generates signals for all universe stocks +- Saves to DB + +### 4. API Endpoints + +**Signal API** - `api/signal.py` +- `GET /api/signal/kjb/today` - today's buy/sell signals +- `GET /api/signal/kjb/history` - historical signals +- `GET /api/signal/kjb/positions` - active positions with P&L + +**Strategy API additions:** +- `POST /api/strategy/kjb` - run KJB ranking + +**Backtest additions:** +- Existing backtest API works with strategy_type="kjb" + +### 5. Frontend + +- KJB signal dashboard page +- Today's signals cards (buy/sell recommendations) +- Active positions table (entry price, current price, P&L, stop-loss level) +- Portfolio value chart + +## Files to Create + +| File | Purpose | +|------|---------| +| `services/strategy/kjb.py` | KJBStrategy + KJBSignalGenerator | +| `services/backtest/daily_engine.py` | DailyBacktestEngine | +| `services/backtest/trading_portfolio.py` | TradingPortfolio | +| `models/signal.py` | Signal DB model | +| `schemas/signal.py` | Signal Pydantic schemas | +| `api/signal.py` | Signal API endpoints | +| `jobs/kjb_signal_job.py` | Daily signal scheduler job | +| Alembic migration | signals table | +| Frontend pages | KJB dashboard | + +## Files to Modify + +| File | Change | +|------|--------| +| `services/backtest/engine.py` | Add "kjb" to `_create_strategy()` | +| `models/__init__.py` | Register Signal model | +| `api/__init__.py` | Register signal router | +| `schemas/backtest.py` | Add "kjb" to strategy_type enum | +| `jobs/scheduler.py` | Register KJB signal job | +| `api/strategy.py` | Add KJB strategy endpoint | diff --git a/docs/plans/2026-02-19-kjb-strategy-plan.md b/docs/plans/2026-02-19-kjb-strategy-plan.md new file mode 100644 index 0000000..bbd5b17 --- /dev/null +++ b/docs/plans/2026-02-19-kjb-strategy-plan.md @@ -0,0 +1,1768 @@ +# Kim Jong-bong (KJB) Strategy Implementation Plan + +> **For Claude:** REQUIRED SUB-SKILL: Use superpowers:executing-plans to implement this plan task-by-task. + +**Goal:** Implement the Kim Jong-bong short-term trading strategy as a full system: signal generation, daily backtesting, live signal alerts, and frontend dashboard. + +**Architecture:** New strategy class (`KJBStrategy`) for ranking + separate `KJBSignalGenerator` for daily buy/sell signals. New `DailyBacktestEngine` runs daily simulations (not rebalance-based). New `TradingPortfolio` handles individual position management with stop-loss/trailing-stop. Signal model stores daily signals for API/frontend consumption. + +**Tech Stack:** Python, FastAPI, SQLAlchemy, PostgreSQL, Alembic, Next.js/React, APScheduler + +--- + +### Task 1: Signal DB Model + +**Files:** +- Create: `backend/app/models/signal.py` +- Modify: `backend/app/models/__init__.py` + +**Step 1: Create Signal model** + +```python +# backend/app/models/signal.py +""" +Trading signal models. +""" +import enum +from datetime import datetime + +from sqlalchemy import ( + Column, Integer, String, Numeric, DateTime, Date, + Text, Enum as SQLEnum, +) + +from app.core.database import Base + + +class SignalType(str, enum.Enum): + BUY = "buy" + SELL = "sell" + PARTIAL_SELL = "partial_sell" + + +class SignalStatus(str, enum.Enum): + ACTIVE = "active" + EXECUTED = "executed" + EXPIRED = "expired" + + +class Signal(Base): + __tablename__ = "signals" + + id = Column(Integer, primary_key=True, index=True) + date = Column(Date, nullable=False, index=True) + ticker = Column(String(20), nullable=False, index=True) + name = Column(String(100)) + signal_type = Column(SQLEnum(SignalType), nullable=False) + entry_price = Column(Numeric(12, 2)) + target_price = Column(Numeric(12, 2)) + stop_loss_price = Column(Numeric(12, 2)) + reason = Column(String(200)) + status = Column(SQLEnum(SignalStatus), default=SignalStatus.ACTIVE) + created_at = Column(DateTime, default=datetime.utcnow) +``` + +**Step 2: Register in models/__init__.py** + +Add to `backend/app/models/__init__.py`: +```python +from app.models.signal import Signal, SignalType, SignalStatus +``` +And add `"Signal", "SignalType", "SignalStatus"` to `__all__`. + +**Step 3: Create Alembic migration** + +Run: `cd /home/zephyrdark/workspace/quant/galaxy-po/backend && alembic revision --autogenerate -m "add signals table"` + +**Step 4: Apply migration** + +Run: `cd /home/zephyrdark/workspace/quant/galaxy-po/backend && alembic upgrade head` + +**Step 5: Commit** + +```bash +git add backend/app/models/signal.py backend/app/models/__init__.py backend/alembic/versions/ +git commit -m "feat: add Signal model for KJB trading signals" +``` + +--- + +### Task 2: Signal Schemas + +**Files:** +- Create: `backend/app/schemas/signal.py` + +**Step 1: Create signal schemas** + +```python +# backend/app/schemas/signal.py +""" +Signal related Pydantic schemas. +""" +from datetime import date, datetime +from decimal import Decimal +from typing import Optional, List +from enum import Enum + +from pydantic import BaseModel + +from app.schemas.portfolio import FloatDecimal + + +class SignalType(str, Enum): + BUY = "buy" + SELL = "sell" + PARTIAL_SELL = "partial_sell" + + +class SignalStatus(str, Enum): + ACTIVE = "active" + EXECUTED = "executed" + EXPIRED = "expired" + + +class SignalResponse(BaseModel): + """Single signal response.""" + id: int + date: date + ticker: str + name: Optional[str] = None + signal_type: str + entry_price: Optional[FloatDecimal] = None + target_price: Optional[FloatDecimal] = None + stop_loss_price: Optional[FloatDecimal] = None + reason: Optional[str] = None + status: str + created_at: datetime + + class Config: + from_attributes = True + + +class ActivePosition(BaseModel): + """Active trading position with P&L.""" + ticker: str + name: Optional[str] = None + entry_date: date + entry_price: FloatDecimal + current_price: FloatDecimal + shares: int + stop_loss_price: FloatDecimal + target_price: FloatDecimal + pnl_percent: FloatDecimal + pnl_amount: FloatDecimal +``` + +**Step 2: Commit** + +```bash +git add backend/app/schemas/signal.py +git commit -m "feat: add Signal Pydantic schemas" +``` + +--- + +### Task 3: TradingPortfolio + +This is the core position management class for KJB strategy. Separate from `VirtualPortfolio`. + +**Files:** +- Create: `backend/app/services/backtest/trading_portfolio.py` +- Create: `backend/tests/unit/test_trading_portfolio.py` + +**Step 1: Write failing tests** + +```python +# backend/tests/unit/test_trading_portfolio.py +""" +Unit tests for TradingPortfolio. +""" +from decimal import Decimal +from datetime import date + +from app.services.backtest.trading_portfolio import TradingPortfolio + + +def test_initial_state(): + tp = TradingPortfolio(Decimal("10000000")) + assert tp.cash == Decimal("10000000") + assert tp.investable_capital == Decimal("7000000") # 70% + assert len(tp.positions) == 0 + + +def test_enter_position(): + tp = TradingPortfolio(Decimal("10000000")) + txn = tp.enter_position( + ticker="005930", + price=Decimal("70000"), + date=date(2024, 1, 2), + commission_rate=Decimal("0.00015"), + slippage_rate=Decimal("0.001"), + ) + assert txn is not None + assert txn.action == "buy" + assert "005930" in tp.positions + pos = tp.positions["005930"] + assert pos.entry_price == Decimal("70000") + assert pos.stop_loss == Decimal("67900") # -3% + assert pos.target1 == Decimal("73500") # +5% + assert pos.target2 == Decimal("77000") # +10% + + +def test_max_positions(): + tp = TradingPortfolio(Decimal("10000000"), max_positions=2) + tp.enter_position("A", Decimal("1000"), date(2024, 1, 2), Decimal("0"), Decimal("0")) + tp.enter_position("B", Decimal("1000"), date(2024, 1, 2), Decimal("0"), Decimal("0")) + txn = tp.enter_position("C", Decimal("1000"), date(2024, 1, 2), Decimal("0"), Decimal("0")) + assert txn is None # rejected + assert len(tp.positions) == 2 + + +def test_stop_loss_exit(): + tp = TradingPortfolio(Decimal("10000000")) + tp.enter_position("005930", Decimal("70000"), date(2024, 1, 2), Decimal("0"), Decimal("0")) + exits = tp.check_exits( + date=date(2024, 1, 3), + prices={"005930": Decimal("67000")}, # below -3% + commission_rate=Decimal("0"), + slippage_rate=Decimal("0"), + ) + assert len(exits) == 1 + assert exits[0].action == "sell" + assert "005930" not in tp.positions + + +def test_partial_take_profit(): + tp = TradingPortfolio(Decimal("10000000")) + tp.enter_position("005930", Decimal("70000"), date(2024, 1, 2), Decimal("0"), Decimal("0")) + pos = tp.positions["005930"] + initial_shares = pos.shares + + exits = tp.check_exits( + date=date(2024, 1, 10), + prices={"005930": Decimal("73500")}, # exactly +5% + commission_rate=Decimal("0"), + slippage_rate=Decimal("0"), + ) + assert len(exits) == 1 + assert exits[0].action == "partial_sell" + # Should sell 50% of shares + assert exits[0].shares == initial_shares // 2 + # Position should still exist with remaining shares + assert "005930" in tp.positions + # Stop loss should be updated to entry price (trailing) + assert tp.positions["005930"].stop_loss == Decimal("70000") + + +def test_full_take_profit(): + tp = TradingPortfolio(Decimal("10000000")) + tp.enter_position("005930", Decimal("70000"), date(2024, 1, 2), Decimal("0"), Decimal("0")) + # First trigger partial at +5% + tp.check_exits( + date=date(2024, 1, 10), + prices={"005930": Decimal("73500")}, + commission_rate=Decimal("0"), + slippage_rate=Decimal("0"), + ) + # Then trigger full exit at +10% + exits = tp.check_exits( + date=date(2024, 1, 15), + prices={"005930": Decimal("77000")}, + commission_rate=Decimal("0"), + slippage_rate=Decimal("0"), + ) + assert len(exits) == 1 + assert exits[0].action == "sell" + assert "005930" not in tp.positions + + +def test_trailing_stop_after_10pct(): + tp = TradingPortfolio(Decimal("10000000")) + tp.enter_position("005930", Decimal("70000"), date(2024, 1, 2), Decimal("0"), Decimal("0")) + # Trigger partial at +5% + tp.check_exits( + date=date(2024, 1, 10), + prices={"005930": Decimal("73500")}, + commission_rate=Decimal("0"), + slippage_rate=Decimal("0"), + ) + # Price goes to +10% but doesn't trigger target2 (already partially sold) + # Actually +10% should trigger full exit of remaining + # Let's test trailing stop: price goes up to +8%, trailing stop at entry + tp.check_exits( + date=date(2024, 1, 12), + prices={"005930": Decimal("75600")}, # +8% + commission_rate=Decimal("0"), + slippage_rate=Decimal("0"), + ) + # Stop should still be at entry (70000) since we're between 5-10% + assert tp.positions["005930"].stop_loss == Decimal("70000") + + +def test_cash_reserve(): + tp = TradingPortfolio(Decimal("10000000"), cash_reserve_ratio=Decimal("0.3")) + # investable = 7,000,000 + # position_size = 7,000,000 / 10 = 700,000 + # Total cash should never go below 3,000,000 (30%) + assert tp.investable_capital == Decimal("7000000") + + +def test_get_portfolio_value(): + tp = TradingPortfolio(Decimal("10000000")) + tp.enter_position("005930", Decimal("70000"), date(2024, 1, 2), Decimal("0"), Decimal("0")) + value = tp.get_value({"005930": Decimal("75000")}) + assert value > Decimal("10000000") # should be worth more at higher price +``` + +**Step 2: Run tests to verify they fail** + +Run: `cd /home/zephyrdark/workspace/quant/galaxy-po/backend && python -m pytest tests/unit/test_trading_portfolio.py -v` +Expected: FAIL (module not found) + +**Step 3: Implement TradingPortfolio** + +```python +# backend/app/services/backtest/trading_portfolio.py +""" +Trading portfolio for signal-based strategies (KJB). +Supports individual position management with stop-loss and trailing stops. +""" +from decimal import Decimal, ROUND_DOWN +from typing import Dict, List, Optional +from dataclasses import dataclass, field +from datetime import date + + +@dataclass +class Position: + """An active trading position.""" + ticker: str + shares: int + entry_price: Decimal + entry_date: date + stop_loss: Decimal + target1: Decimal # +5% partial sell + target2: Decimal # +10% full sell + partial_sold: bool = False # whether 50% has been sold + + +@dataclass +class TradingTransaction: + """A single trading transaction.""" + ticker: str + action: str # 'buy', 'sell', 'partial_sell' + shares: int + price: Decimal + commission: Decimal + reason: str = "" + + +class TradingPortfolio: + """ + Portfolio for signal-based daily trading. + + Key differences from VirtualPortfolio: + - Individual position entry/exit (not rebalancing) + - Per-position stop-loss, take-profit, trailing stop + - Cash reserve enforcement (30%) + - Partial position exits (50% at +5%) + """ + + def __init__( + self, + initial_capital: Decimal, + max_positions: int = 10, + cash_reserve_ratio: Decimal = Decimal("0.3"), + stop_loss_pct: Decimal = Decimal("0.03"), + target1_pct: Decimal = Decimal("0.05"), + target2_pct: Decimal = Decimal("0.10"), + ): + self.initial_capital = initial_capital + self.cash = initial_capital + self.max_positions = max_positions + self.cash_reserve_ratio = cash_reserve_ratio + self.stop_loss_pct = stop_loss_pct + self.target1_pct = target1_pct + self.target2_pct = target2_pct + self.positions: Dict[str, Position] = {} + + @property + def investable_capital(self) -> Decimal: + """Capital available for investment (excluding cash reserve).""" + return self.initial_capital * (1 - self.cash_reserve_ratio) + + @property + def position_size(self) -> Decimal: + """Target size per position.""" + return self.investable_capital / Decimal(str(self.max_positions)) + + def get_value(self, prices: Dict[str, Decimal]) -> Decimal: + """Calculate total portfolio value.""" + holdings_value = sum( + Decimal(str(pos.shares)) * prices.get(pos.ticker, Decimal("0")) + for pos in self.positions.values() + ) + return self.cash + holdings_value + + def enter_position( + self, + ticker: str, + price: Decimal, + date: date, + commission_rate: Decimal, + slippage_rate: Decimal, + ) -> Optional[TradingTransaction]: + """Enter a new position.""" + # Check limits + if len(self.positions) >= self.max_positions: + return None + if ticker in self.positions: + return None + + # Calculate shares + buy_price = price * (1 + slippage_rate) + max_cost = self.position_size + available = self.cash - (self.initial_capital * self.cash_reserve_ratio) + if available <= 0: + return None + actual_cost = min(max_cost, available) + shares = int((actual_cost / (buy_price * (1 + commission_rate))).to_integral_value(rounding=ROUND_DOWN)) + + if shares <= 0: + return None + + cost = Decimal(str(shares)) * buy_price + commission = cost * commission_rate + total_cost = cost + commission + + self.cash -= total_cost + + self.positions[ticker] = Position( + ticker=ticker, + shares=shares, + entry_price=price, + entry_date=date, + stop_loss=price * (1 - self.stop_loss_pct), + target1=price * (1 + self.target1_pct), + target2=price * (1 + self.target2_pct), + ) + + return TradingTransaction( + ticker=ticker, + action="buy", + shares=shares, + price=buy_price, + commission=commission, + reason="entry_signal", + ) + + def check_exits( + self, + date: date, + prices: Dict[str, Decimal], + commission_rate: Decimal, + slippage_rate: Decimal, + ) -> List[TradingTransaction]: + """Check all positions for exit conditions. Returns transactions.""" + transactions = [] + + for ticker in list(self.positions.keys()): + pos = self.positions[ticker] + current_price = prices.get(ticker) + if current_price is None: + continue + + sell_price = current_price * (1 - slippage_rate) + + # 1. Stop-loss check + if current_price <= pos.stop_loss: + txn = self._exit_full(ticker, sell_price, commission_rate, "stop_loss") + if txn: + transactions.append(txn) + continue + + # 2. Take-profit 2: +10% (full exit of remaining) + if current_price >= pos.target2 and pos.partial_sold: + txn = self._exit_full(ticker, sell_price, commission_rate, "take_profit_2") + if txn: + transactions.append(txn) + continue + + # 3. Take-profit 1: +5% (partial exit 50%) + if current_price >= pos.target1 and not pos.partial_sold: + txn = self._exit_partial(ticker, sell_price, commission_rate) + if txn: + transactions.append(txn) + # Update trailing stop to entry price + pos.stop_loss = pos.entry_price + continue + + # 4. Trailing stop update (between 5-10%: stop at entry, above 10%: stop at +5%) + if pos.partial_sold: + gain_pct = (current_price - pos.entry_price) / pos.entry_price + if gain_pct >= self.target2_pct: + new_stop = pos.entry_price * (1 + self.target1_pct) + if new_stop > pos.stop_loss: + pos.stop_loss = new_stop + + return transactions + + def _exit_full( + self, + ticker: str, + sell_price: Decimal, + commission_rate: Decimal, + reason: str, + ) -> Optional[TradingTransaction]: + """Exit a position fully.""" + pos = self.positions.get(ticker) + if not pos or pos.shares <= 0: + return None + + proceeds = Decimal(str(pos.shares)) * sell_price + commission = proceeds * commission_rate + self.cash += proceeds - commission + + shares = pos.shares + del self.positions[ticker] + + return TradingTransaction( + ticker=ticker, + action="sell", + shares=shares, + price=sell_price, + commission=commission, + reason=reason, + ) + + def _exit_partial( + self, + ticker: str, + sell_price: Decimal, + commission_rate: Decimal, + ) -> Optional[TradingTransaction]: + """Exit 50% of a position.""" + pos = self.positions.get(ticker) + if not pos or pos.shares <= 0: + return None + + sell_shares = pos.shares // 2 + if sell_shares <= 0: + return None + + proceeds = Decimal(str(sell_shares)) * sell_price + commission = proceeds * commission_rate + self.cash += proceeds - commission + + pos.shares -= sell_shares + pos.partial_sold = True + + return TradingTransaction( + ticker=ticker, + action="partial_sell", + shares=sell_shares, + price=sell_price, + commission=commission, + reason="take_profit_1", + ) +``` + +**Step 4: Run tests to verify they pass** + +Run: `cd /home/zephyrdark/workspace/quant/galaxy-po/backend && python -m pytest tests/unit/test_trading_portfolio.py -v` +Expected: All PASS + +**Step 5: Commit** + +```bash +git add backend/app/services/backtest/trading_portfolio.py backend/tests/unit/test_trading_portfolio.py +git commit -m "feat: add TradingPortfolio for signal-based position management" +``` + +--- + +### Task 4: KJB Signal Generator + +Core signal generation logic for detecting buy/sell signals. + +**Files:** +- Create: `backend/app/services/strategy/kjb.py` +- Create: `backend/tests/unit/test_kjb_signal.py` + +**Step 1: Write failing tests for signal generator** + +```python +# backend/tests/unit/test_kjb_signal.py +""" +Unit tests for KJB signal generator. +""" +import pandas as pd +import numpy as np +from decimal import Decimal +from datetime import date, timedelta + +from app.services.strategy.kjb import KJBSignalGenerator + + +def _make_price_df(closes, volumes=None, start_date=date(2024, 1, 2)): + """Helper to create price DataFrame.""" + dates = [start_date + timedelta(days=i) for i in range(len(closes))] + if volumes is None: + volumes = [1000000] * len(closes) + highs = [c * 1.01 for c in closes] + lows = [c * 0.99 for c in closes] + opens = closes.copy() + return pd.DataFrame({ + "date": dates, + "open": opens, + "high": highs, + "low": lows, + "close": closes, + "volume": volumes, + }).set_index("date") + + +def _make_kospi_df(closes, start_date=date(2024, 1, 2)): + dates = [start_date + timedelta(days=i) for i in range(len(closes))] + return pd.DataFrame({ + "date": dates, + "close": closes, + }).set_index("date") + + +def test_relative_strength_above_market(): + gen = KJBSignalGenerator() + # Stock up 10% over 10 days, market up 5% + stock_closes = [100 + i for i in range(25)] # steady rise + kospi_closes = [100 + i * 0.5 for i in range(25)] # slower rise + stock_df = _make_price_df(stock_closes) + kospi_df = _make_kospi_df(kospi_closes) + rs = gen.calculate_relative_strength(stock_df, kospi_df, lookback=10) + # Last value should be > 100 (stock outperforming) + assert rs.iloc[-1] > 100 + + +def test_relative_strength_below_market(): + gen = KJBSignalGenerator() + stock_closes = [100 + i * 0.3 for i in range(25)] + kospi_closes = [100 + i for i in range(25)] + stock_df = _make_price_df(stock_closes) + kospi_df = _make_kospi_df(kospi_closes) + rs = gen.calculate_relative_strength(stock_df, kospi_df, lookback=10) + assert rs.iloc[-1] < 100 + + +def test_detect_breakout(): + gen = KJBSignalGenerator() + # Flat for 20 days, then breakout + closes = [100.0] * 20 + [105.0] + stock_df = _make_price_df(closes) + breakouts = gen.detect_breakout(stock_df, lookback=20) + assert breakouts.iloc[-1] == True + assert breakouts.iloc[-2] == False + + +def test_detect_large_candle(): + gen = KJBSignalGenerator() + # Normal days, then a 6% jump with 2x volume + closes = [100.0] * 21 + [106.0] + volumes = [1000000] * 21 + [3000000] + stock_df = _make_price_df(closes, volumes) + large = gen.detect_large_candle(stock_df, pct_threshold=0.05, vol_multiplier=1.5) + assert large.iloc[-1] == True + assert large.iloc[-2] == False + + +def test_no_large_candle_low_volume(): + gen = KJBSignalGenerator() + # 6% jump but normal volume + closes = [100.0] * 21 + [106.0] + volumes = [1000000] * 22 # no volume spike + stock_df = _make_price_df(closes, volumes) + large = gen.detect_large_candle(stock_df) + assert large.iloc[-1] == False + + +def test_generate_buy_signal(): + gen = KJBSignalGenerator() + # Create scenario: RS > 100, breakout, large candle + # Stock rises faster than market with a breakout candle + closes = [100.0] * 20 + [106.0] # breakout + large candle + volumes = [1000000] * 20 + [3000000] + kospi_closes = [100.0 + i * 0.1 for i in range(21)] # market barely moves + + stock_df = _make_price_df(closes, volumes) + kospi_df = _make_kospi_df(kospi_closes) + + signals = gen.generate_signals(stock_df, kospi_df) + # Last day should have a buy signal + assert signals["buy"].iloc[-1] == True +``` + +**Step 2: Run tests to verify they fail** + +Run: `cd /home/zephyrdark/workspace/quant/galaxy-po/backend && python -m pytest tests/unit/test_kjb_signal.py -v` +Expected: FAIL + +**Step 3: Implement KJBSignalGenerator** + +```python +# backend/app/services/strategy/kjb.py +""" +Kim Jong-bong (KJB) strategy implementation. + +Signal-based short-term trading strategy: +- Universe: market cap top 30, daily trading value >= 200B KRW +- Entry: relative strength > KOSPI + breakout or large candle +- Exit: stop-loss -3%, take-profit +5%/+10%, trailing stop +""" +from datetime import date +from decimal import Decimal +from typing import Dict, List, Optional + +import pandas as pd +from sqlalchemy.orm import Session + +from app.services.strategy.base import BaseStrategy +from app.schemas.strategy import StockFactor, StrategyResult, UniverseFilter +from app.services.factor_calculator import FactorCalculator + + +class KJBSignalGenerator: + """ + Generates daily buy/sell signals based on KJB rules. + Pure computation - no DB access. Takes DataFrames as input. + """ + + def calculate_relative_strength( + self, + stock_df: pd.DataFrame, + kospi_df: pd.DataFrame, + lookback: int = 10, + ) -> pd.Series: + """ + Calculate relative strength vs KOSPI. + RS = (stock return / market return) * 100 + RS > 100 means stock outperforms market. + """ + stock_ret = stock_df["close"].pct_change(lookback) + kospi_ret = kospi_df["close"].pct_change(lookback) + + # Align indices + aligned = pd.DataFrame({ + "stock_ret": stock_ret, + "kospi_ret": kospi_ret, + }).dropna() + + # Avoid division by zero + rs = pd.Series(index=stock_df.index, dtype=float) + for idx in aligned.index: + market_ret = aligned.loc[idx, "kospi_ret"] + stock_r = aligned.loc[idx, "stock_ret"] + if abs(market_ret) < 1e-10: + rs[idx] = 100.0 if abs(stock_r) < 1e-10 else (200.0 if stock_r > 0 else 0.0) + else: + rs[idx] = (stock_r / market_ret) * 100 + + return rs + + def detect_breakout( + self, + stock_df: pd.DataFrame, + lookback: int = 20, + ) -> pd.Series: + """ + Detect box breakout: close > highest high of previous lookback days. + """ + prev_high = stock_df["high"].rolling(lookback).max().shift(1) + return stock_df["close"] > prev_high + + def detect_large_candle( + self, + stock_df: pd.DataFrame, + pct_threshold: float = 0.05, + vol_multiplier: float = 1.5, + ) -> pd.Series: + """ + Detect large bullish candle: + - Daily return >= pct_threshold (5%) + - Volume >= vol_multiplier * 20-day average volume + """ + daily_return = stock_df["close"].pct_change() + avg_volume = stock_df["volume"].rolling(20).mean() + volume_ratio = stock_df["volume"] / avg_volume + + return (daily_return >= pct_threshold) & (volume_ratio >= vol_multiplier) + + def generate_signals( + self, + stock_df: pd.DataFrame, + kospi_df: pd.DataFrame, + rs_lookback: int = 10, + breakout_lookback: int = 20, + ) -> pd.DataFrame: + """ + Generate buy signals combining all filters. + Buy when: RS > 100 AND (breakout OR large candle) + """ + rs = self.calculate_relative_strength(stock_df, kospi_df, rs_lookback) + breakout = self.detect_breakout(stock_df, breakout_lookback) + large_candle = self.detect_large_candle(stock_df) + + signals = pd.DataFrame(index=stock_df.index) + signals["rs"] = rs + signals["breakout"] = breakout + signals["large_candle"] = large_candle + signals["buy"] = (rs > 100) & (breakout | large_candle) + + return signals +``` + +**Step 4: Run tests** + +Run: `cd /home/zephyrdark/workspace/quant/galaxy-po/backend && python -m pytest tests/unit/test_kjb_signal.py -v` +Expected: All PASS + +**Step 5: Commit** + +```bash +git add backend/app/services/strategy/kjb.py backend/tests/unit/test_kjb_signal.py +git commit -m "feat: add KJBSignalGenerator for daily buy/sell signal detection" +``` + +--- + +### Task 5: KJB Strategy Class + +Ranking strategy for compatibility with existing strategy pattern. + +**Files:** +- Modify: `backend/app/services/strategy/kjb.py` (add KJBStrategy class) +- Modify: `backend/app/services/strategy/__init__.py` +- Modify: `backend/app/schemas/strategy.py` (add KJBRequest) +- Modify: `backend/app/api/strategy.py` (add endpoint) + +**Step 1: Add KJBStrategy class to kjb.py** + +Append to `backend/app/services/strategy/kjb.py`: + +```python +class KJBStrategy(BaseStrategy): + """ + KJB strategy for stock ranking. + Ranks stocks by relative strength, breakout proximity, and volume trend. + Compatible with existing strategy pattern (returns StrategyResult). + """ + + strategy_name = "kjb" + + def run( + self, + universe_filter: UniverseFilter, + top_n: int, + base_date: date = None, + **kwargs, + ) -> StrategyResult: + if base_date is None: + base_date = date.today() + + # Get universe - filter to top 30 by market cap + stocks = self.get_universe(universe_filter) + stocks.sort(key=lambda s: s.market_cap or 0, reverse=True) + stocks = stocks[:30] + + tickers = [s.ticker for s in stocks] + stock_map = {s.ticker: s for s in stocks} + + if not tickers: + return StrategyResult( + strategy_name=self.strategy_name, + base_date=base_date, + universe_count=0, + result_count=0, + stocks=[], + ) + + # Get valuations and sectors + valuations = self.factor_calc.get_valuations(tickers, base_date) + sectors = self.factor_calc.get_sectors(tickers) + + # Calculate momentum as proxy for relative strength ranking + momentum = self.factor_calc.calculate_momentum( + tickers, base_date, months=1, skip_recent=0, + ) + + # Build results + results = [] + for ticker in tickers: + stock = stock_map[ticker] + val = valuations.get(ticker) + mom = momentum.get(ticker, Decimal("0")) + + results.append(StockFactor( + ticker=ticker, + name=stock.name, + market=stock.market, + sector_name=sectors.get(ticker), + market_cap=int(stock.market_cap / 100_000_000) if stock.market_cap else None, + close_price=Decimal(str(stock.close_price)) if stock.close_price else None, + per=Decimal(str(val.per)) if val and val.per else None, + pbr=Decimal(str(val.pbr)) if val and val.pbr else None, + dividend_yield=Decimal(str(val.dividend_yield)) if val and val.dividend_yield else None, + momentum_score=mom, + total_score=mom, + )) + + results.sort(key=lambda x: x.total_score or Decimal("0"), reverse=True) + for i, r in enumerate(results[:top_n], 1): + r.rank = i + + return StrategyResult( + strategy_name=self.strategy_name, + base_date=base_date, + universe_count=len(stocks), + result_count=min(top_n, len(results)), + stocks=results[:top_n], + ) +``` + +**Step 2: Register in strategy __init__.py** + +Add to `backend/app/services/strategy/__init__.py`: +```python +from app.services.strategy.kjb import KJBStrategy, KJBSignalGenerator +``` +Add `"KJBStrategy", "KJBSignalGenerator"` to `__all__`. + +**Step 3: Add KJBRequest schema** + +Add to `backend/app/schemas/strategy.py`: +```python +class KJBRequest(StrategyRequest): + """KJB strategy request.""" + pass # uses default StrategyRequest params +``` + +**Step 4: Add API endpoint** + +Add to `backend/app/api/strategy.py`: +```python +from app.schemas.strategy import KJBRequest +from app.services.strategy import KJBStrategy + +@router.post("/kjb", response_model=StrategyResult) +async def run_kjb( + request: KJBRequest, + current_user: CurrentUser, + db: Session = Depends(get_db), +): + """Run KJB strategy.""" + strategy = KJBStrategy(db) + return strategy.run( + universe_filter=request.universe, + top_n=request.top_n, + base_date=request.base_date, + ) +``` + +**Step 5: Commit** + +```bash +git add backend/app/services/strategy/kjb.py backend/app/services/strategy/__init__.py \ + backend/app/schemas/strategy.py backend/app/api/strategy.py +git commit -m "feat: add KJBStrategy ranking class and API endpoint" +``` + +--- + +### Task 6: Daily Backtest Engine + +**Files:** +- Create: `backend/app/services/backtest/daily_engine.py` +- Modify: `backend/app/services/backtest/__init__.py` + +**Step 1: Implement DailyBacktestEngine** + +```python +# backend/app/services/backtest/daily_engine.py +""" +Daily simulation backtest engine for signal-based strategies (KJB). +Unlike the rebalance-based BacktestEngine, this engine: +- Checks entry/exit signals every trading day +- Manages individual positions with stop-loss and trailing stops +- Supports partial exits +""" +import logging +from datetime import date +from decimal import Decimal +from typing import Dict, List + +import pandas as pd +from sqlalchemy.orm import Session +from sqlalchemy import func + +from app.models.backtest import ( + Backtest, BacktestResult, BacktestEquityCurve, + BacktestHolding, BacktestTransaction, +) +from app.models.stock import Stock, Price +from app.services.backtest.trading_portfolio import TradingPortfolio, TradingTransaction +from app.services.backtest.metrics import MetricsCalculator +from app.services.strategy.kjb import KJBSignalGenerator + +logger = logging.getLogger(__name__) + + +class DailyBacktestEngine: + """ + Backtest engine for KJB signal-based strategy. + Runs daily simulation with individual position management. + """ + + def __init__(self, db: Session): + self.db = db + self.signal_gen = KJBSignalGenerator() + + def run(self, backtest_id: int) -> None: + """Execute daily backtest.""" + backtest = self.db.query(Backtest).get(backtest_id) + if not backtest: + raise ValueError(f"Backtest {backtest_id} not found") + + params = backtest.strategy_params or {} + + # Initialize trading portfolio + portfolio = TradingPortfolio( + initial_capital=backtest.initial_capital, + max_positions=params.get("max_positions", 10), + cash_reserve_ratio=Decimal(str(params.get("cash_reserve_ratio", 0.3))), + stop_loss_pct=Decimal(str(params.get("stop_loss_pct", 0.03))), + target1_pct=Decimal(str(params.get("target1_pct", 0.05))), + target2_pct=Decimal(str(params.get("target2_pct", 0.10))), + ) + + # Get trading days + trading_days = self._get_trading_days(backtest.start_date, backtest.end_date) + if not trading_days: + raise ValueError("No trading days found") + + # Get universe (top 30 by market cap) + universe_tickers = self._get_universe_tickers() + + # Load all price data upfront + price_data = self._load_price_data(universe_tickers, backtest.start_date, backtest.end_date) + + # Load KOSPI proxy data + kospi_data = self._load_kospi_data(backtest.start_date, backtest.end_date) + + # Load benchmark for metrics + benchmark_prices = self._load_benchmark_prices( + backtest.benchmark, backtest.start_date, backtest.end_date, + ) + + # Prepare per-stock DataFrames for signal generation + stock_dfs = self._build_stock_dfs(price_data, universe_tickers) + + # Simulation + equity_curve_data: List[Dict] = [] + all_transactions: List[tuple] = [] + holdings_snapshots: List[Dict] = [] + names = self._get_stock_names() + + initial_benchmark = benchmark_prices.get(trading_days[0], Decimal("1")) + if initial_benchmark == 0: + initial_benchmark = Decimal("1") + + for trading_date in trading_days: + day_prices = self._get_day_prices(price_data, trading_date) + + # 1. Check exits first + exit_txns = portfolio.check_exits( + date=trading_date, + prices=day_prices, + commission_rate=backtest.commission_rate, + slippage_rate=backtest.slippage_rate, + ) + for txn in exit_txns: + all_transactions.append((trading_date, txn)) + + # 2. Check entry signals for stocks not in portfolio + for ticker in universe_tickers: + if ticker in portfolio.positions: + continue + if ticker not in stock_dfs or ticker not in day_prices: + continue + + stock_df = stock_dfs[ticker] + if trading_date not in stock_df.index: + continue + + # Check if we have enough history + mask = stock_df.index <= trading_date + if mask.sum() < 21: # need at least 21 days for indicators + continue + + hist = stock_df.loc[mask] + kospi_hist = kospi_data.loc[kospi_data.index <= trading_date] + + if len(kospi_hist) < 11: + continue + + signals = self.signal_gen.generate_signals(hist, kospi_hist) + + if trading_date in signals.index and signals.loc[trading_date, "buy"]: + txn = portfolio.enter_position( + ticker=ticker, + price=day_prices[ticker], + date=trading_date, + commission_rate=backtest.commission_rate, + slippage_rate=backtest.slippage_rate, + ) + if txn: + all_transactions.append((trading_date, txn)) + + # 3. Record daily portfolio value + portfolio_value = portfolio.get_value(day_prices) + benchmark_value = benchmark_prices.get(trading_date, initial_benchmark) + normalized_benchmark = ( + benchmark_value / initial_benchmark * backtest.initial_capital + ) + + equity_curve_data.append({ + "date": trading_date, + "portfolio_value": portfolio_value, + "benchmark_value": normalized_benchmark, + }) + + # Calculate and save results + portfolio_values = [Decimal(str(e["portfolio_value"])) for e in equity_curve_data] + benchmark_values = [Decimal(str(e["benchmark_value"])) for e in equity_curve_data] + + metrics = MetricsCalculator.calculate_all(portfolio_values, benchmark_values) + drawdowns = MetricsCalculator.calculate_drawdown_series(portfolio_values) + + self._save_results( + backtest_id=backtest_id, + metrics=metrics, + equity_curve_data=equity_curve_data, + drawdowns=drawdowns, + transactions=all_transactions, + ) + + def _get_trading_days(self, start_date: date, end_date: date) -> List[date]: + prices = ( + self.db.query(Price.date) + .filter(Price.date >= start_date, Price.date <= end_date) + .distinct() + .order_by(Price.date) + .all() + ) + return [p[0] for p in prices] + + def _get_universe_tickers(self) -> List[str]: + """Get top 30 stocks by market cap.""" + stocks = ( + self.db.query(Stock) + .filter(Stock.market_cap.isnot(None)) + .order_by(Stock.market_cap.desc()) + .limit(30) + .all() + ) + return [s.ticker for s in stocks] + + def _load_price_data( + self, tickers: List[str], start_date: date, end_date: date, + ) -> List: + """Load all price data for universe.""" + return ( + self.db.query(Price) + .filter(Price.ticker.in_(tickers)) + .filter(Price.date >= start_date, Price.date <= end_date) + .all() + ) + + def _load_kospi_data(self, start_date: date, end_date: date) -> pd.DataFrame: + """Load KOSPI proxy (KODEX 200) as DataFrame.""" + prices = ( + self.db.query(Price) + .filter(Price.ticker == "069500") + .filter(Price.date >= start_date, Price.date <= end_date) + .order_by(Price.date) + .all() + ) + if not prices: + return pd.DataFrame(columns=["close"]) + + data = [{"date": p.date, "close": float(p.close)} for p in prices] + df = pd.DataFrame(data).set_index("date") + return df + + def _load_benchmark_prices( + self, benchmark: str, start_date: date, end_date: date, + ) -> Dict[date, Decimal]: + benchmark_ticker = "069500" + prices = ( + self.db.query(Price) + .filter(Price.ticker == benchmark_ticker) + .filter(Price.date >= start_date, Price.date <= end_date) + .all() + ) + return {p.date: p.close for p in prices} + + def _build_stock_dfs( + self, price_data: List, tickers: List[str], + ) -> Dict[str, pd.DataFrame]: + """Build per-stock DataFrames from price data.""" + ticker_rows: Dict[str, list] = {t: [] for t in tickers} + for p in price_data: + if p.ticker in ticker_rows: + ticker_rows[p.ticker].append({ + "date": p.date, + "open": float(p.open), + "high": float(p.high), + "low": float(p.low), + "close": float(p.close), + "volume": int(p.volume), + }) + + result = {} + for ticker, rows in ticker_rows.items(): + if rows: + df = pd.DataFrame(rows).set_index("date").sort_index() + result[ticker] = df + return result + + def _get_day_prices( + self, price_data: List, trading_date: date, + ) -> Dict[str, Decimal]: + """Get prices for a specific day from preloaded data.""" + return { + p.ticker: p.close + for p in price_data + if p.date == trading_date + } + + def _get_stock_names(self) -> Dict[str, str]: + stocks = self.db.query(Stock).all() + return {s.ticker: s.name for s in stocks} + + def _save_results( + self, + backtest_id: int, + metrics, + equity_curve_data: List[Dict], + drawdowns: List[Decimal], + transactions: List, + ) -> None: + """Save results to DB (same format as BacktestEngine).""" + result = BacktestResult( + backtest_id=backtest_id, + total_return=metrics.total_return, + cagr=metrics.cagr, + mdd=metrics.mdd, + sharpe_ratio=metrics.sharpe_ratio, + volatility=metrics.volatility, + benchmark_return=metrics.benchmark_return, + excess_return=metrics.excess_return, + ) + self.db.add(result) + + for i, point in enumerate(equity_curve_data): + curve_point = BacktestEquityCurve( + backtest_id=backtest_id, + date=point["date"], + portfolio_value=point["portfolio_value"], + benchmark_value=point["benchmark_value"], + drawdown=drawdowns[i] if i < len(drawdowns) else Decimal("0"), + ) + self.db.add(curve_point) + + for trading_date, txn in transactions: + t = BacktestTransaction( + backtest_id=backtest_id, + date=trading_date, + ticker=txn.ticker, + action=txn.action, + shares=txn.shares, + price=txn.price, + commission=txn.commission, + ) + self.db.add(t) + + self.db.commit() +``` + +**Step 2: Register in backtest __init__.py** + +Add to `backend/app/services/backtest/__init__.py`: +```python +from app.services.backtest.daily_engine import DailyBacktestEngine +from app.services.backtest.trading_portfolio import TradingPortfolio, TradingTransaction +``` +Add to `__all__`: `"DailyBacktestEngine", "TradingPortfolio", "TradingTransaction"`. + +**Step 3: Commit** + +```bash +git add backend/app/services/backtest/daily_engine.py backend/app/services/backtest/__init__.py +git commit -m "feat: add DailyBacktestEngine for KJB signal-based backtesting" +``` + +--- + +### Task 7: Wire KJB into Backtest System + +Connect KJB strategy to the existing backtest worker so it can be triggered via the backtest API. + +**Files:** +- Modify: `backend/app/services/backtest/worker.py` +- Modify: `backend/app/schemas/backtest.py` + +**Step 1: Update worker to route KJB to DailyBacktestEngine** + +In `backend/app/services/backtest/worker.py`, modify `_run_backtest_job`: + +Replace the engine instantiation section. After `engine = BacktestEngine(db)` / `engine.run(backtest_id)`, change to: + +```python +# In _run_backtest_job, replace: +# engine = BacktestEngine(db) +# engine.run(backtest_id) +# With: + backtest = db.query(Backtest).get(backtest_id) + if backtest.strategy_type == "kjb": + from app.services.backtest.daily_engine import DailyBacktestEngine + engine = DailyBacktestEngine(db) + else: + engine = BacktestEngine(db) + engine.run(backtest_id) +``` + +**Step 2: Update BacktestCreate schema** + +In `backend/app/schemas/backtest.py`, update `BacktestCreate.strategy_type` description: +```python +strategy_type: str = Field(..., description="multi_factor, quality, value_momentum, or kjb") +``` + +**Step 3: Commit** + +```bash +git add backend/app/services/backtest/worker.py backend/app/schemas/backtest.py +git commit -m "feat: wire KJB strategy into backtest worker with DailyBacktestEngine" +``` + +--- + +### Task 8: Signal API Endpoints + +**Files:** +- Create: `backend/app/api/signal.py` +- Modify: `backend/app/api/__init__.py` + +**Step 1: Create signal API** + +```python +# backend/app/api/signal.py +""" +KJB Signal API endpoints. +""" +from datetime import date +from typing import List, Optional + +from fastapi import APIRouter, Depends, Query +from sqlalchemy.orm import Session + +from app.core.database import get_db +from app.api.deps import CurrentUser +from app.models.signal import Signal, SignalStatus +from app.schemas.signal import SignalResponse + +router = APIRouter(prefix="/api/signal", tags=["signal"]) + + +@router.get("/kjb/today", response_model=List[SignalResponse]) +async def get_today_signals( + current_user: CurrentUser, + db: Session = Depends(get_db), +): + """Get today's KJB trading signals.""" + today = date.today() + signals = ( + db.query(Signal) + .filter(Signal.date == today) + .order_by(Signal.signal_type, Signal.ticker) + .all() + ) + return signals + + +@router.get("/kjb/history", response_model=List[SignalResponse]) +async def get_signal_history( + current_user: CurrentUser, + db: Session = Depends(get_db), + start_date: Optional[date] = Query(None), + end_date: Optional[date] = Query(None), + ticker: Optional[str] = Query(None), + limit: int = Query(100, ge=1, le=1000), +): + """Get historical KJB signals.""" + query = db.query(Signal) + + if start_date: + query = query.filter(Signal.date >= start_date) + if end_date: + query = query.filter(Signal.date <= end_date) + if ticker: + query = query.filter(Signal.ticker == ticker) + + signals = ( + query.order_by(Signal.date.desc(), Signal.ticker) + .limit(limit) + .all() + ) + return signals +``` + +**Step 2: Register router in api/__init__.py** + +Add to `backend/app/api/__init__.py`: +```python +from app.api.signal import router as signal_router +``` +Add `"signal_router"` to `__all__`. + +**Step 3: Register in main app** + +Check `backend/app/main.py` for how routers are included and add `signal_router` following the same pattern. + +**Step 4: Commit** + +```bash +git add backend/app/api/signal.py backend/app/api/__init__.py backend/app/main.py +git commit -m "feat: add Signal API endpoints for KJB daily signals" +``` + +--- + +### Task 9: KJB Signal Scheduler Job + +**Files:** +- Create: `backend/jobs/kjb_signal_job.py` +- Modify: `backend/jobs/scheduler.py` +- Modify: `backend/jobs/__init__.py` + +**Step 1: Create signal job** + +```python +# backend/jobs/kjb_signal_job.py +""" +Daily KJB signal generation job. +Runs after market data collection to generate buy/sell signals. +""" +import logging +from datetime import date, timedelta + +import pandas as pd +from sqlalchemy.orm import Session + +from app.core.database import SessionLocal +from app.models.stock import Stock, Price +from app.models.signal import Signal, SignalType, SignalStatus +from app.services.strategy.kjb import KJBSignalGenerator + +logger = logging.getLogger(__name__) + + +def run_kjb_signals(): + """ + Generate KJB trading signals for today. + Called by scheduler at 18:15 KST (after price collection). + """ + logger.info("Starting KJB signal generation") + db: Session = SessionLocal() + + try: + today = date.today() + signal_gen = KJBSignalGenerator() + + # Get universe: top 30 by market cap + stocks = ( + db.query(Stock) + .filter(Stock.market_cap.isnot(None)) + .order_by(Stock.market_cap.desc()) + .limit(30) + .all() + ) + tickers = [s.ticker for s in stocks] + name_map = {s.ticker: s.name for s in stocks} + + # Load KOSPI proxy data (last 60 days for indicator calculation) + lookback_start = today - timedelta(days=90) + kospi_prices = ( + db.query(Price) + .filter(Price.ticker == "069500") + .filter(Price.date >= lookback_start, Price.date <= today) + .order_by(Price.date) + .all() + ) + if not kospi_prices: + logger.warning("No KOSPI data available for signal generation") + return + + kospi_df = pd.DataFrame([ + {"date": p.date, "close": float(p.close)} + for p in kospi_prices + ]).set_index("date") + + signals_created = 0 + + for ticker in tickers: + # Load stock price data + stock_prices = ( + db.query(Price) + .filter(Price.ticker == ticker) + .filter(Price.date >= lookback_start, Price.date <= today) + .order_by(Price.date) + .all() + ) + + if len(stock_prices) < 21: + continue + + stock_df = pd.DataFrame([{ + "date": p.date, + "open": float(p.open), + "high": float(p.high), + "low": float(p.low), + "close": float(p.close), + "volume": int(p.volume), + } for p in stock_prices]).set_index("date") + + # Generate signals + signals = signal_gen.generate_signals(stock_df, kospi_df) + + if today in signals.index and signals.loc[today, "buy"]: + close_price = stock_df.loc[today, "close"] + reason_parts = [] + if signals.loc[today, "breakout"]: + reason_parts.append("breakout") + if signals.loc[today, "large_candle"]: + reason_parts.append("large_candle") + + signal = Signal( + date=today, + ticker=ticker, + name=name_map.get(ticker), + signal_type=SignalType.BUY, + entry_price=close_price, + target_price=round(close_price * 1.05, 2), + stop_loss_price=round(close_price * 0.97, 2), + reason=", ".join(reason_parts), + status=SignalStatus.ACTIVE, + ) + db.add(signal) + signals_created += 1 + + db.commit() + logger.info(f"KJB signal generation complete: {signals_created} buy signals") + + except Exception as e: + logger.exception(f"KJB signal generation failed: {e}") + finally: + db.close() +``` + +**Step 2: Register in scheduler** + +Add to `backend/jobs/scheduler.py` after existing job configurations: + +```python +from jobs.kjb_signal_job import run_kjb_signals + +# In configure_jobs(), add: + scheduler.add_job( + run_kjb_signals, + trigger=CronTrigger( + hour=18, + minute=15, + day_of_week='mon-fri', + timezone=KST, + ), + id='kjb_daily_signals', + name='Generate KJB trading signals', + replace_existing=True, + ) + logger.info("Configured kjb_daily_signals job at 18:15 KST") +``` + +**Step 3: Update jobs/__init__.py** + +Add to `backend/jobs/__init__.py`: +```python +from jobs.kjb_signal_job import run_kjb_signals +``` +Add `"run_kjb_signals"` to `__all__`. + +**Step 4: Commit** + +```bash +git add backend/jobs/kjb_signal_job.py backend/jobs/scheduler.py backend/jobs/__init__.py +git commit -m "feat: add KJB daily signal generation scheduler job" +``` + +--- + +### Task 10: E2E Tests + +**Files:** +- Create: `backend/tests/e2e/test_kjb_flow.py` + +**Step 1: Write E2E tests** + +```python +# backend/tests/e2e/test_kjb_flow.py +""" +E2E tests for KJB strategy flow. +""" +from fastapi.testclient import TestClient + + +def test_kjb_strategy_endpoint(client: TestClient, auth_headers): + """Test KJB strategy ranking endpoint.""" + response = client.post( + "/api/strategy/kjb", + json={ + "universe": {"markets": ["KOSPI"]}, + "top_n": 10, + }, + headers=auth_headers, + ) + assert response.status_code in [200, 400, 500] + if response.status_code == 200: + data = response.json() + assert data["strategy_name"] == "kjb" + assert "stocks" in data + + +def test_kjb_backtest_creation(client: TestClient, auth_headers): + """Test creating a KJB backtest.""" + response = client.post( + "/api/backtest", + json={ + "strategy_type": "kjb", + "strategy_params": { + "max_positions": 10, + "cash_reserve_ratio": 0.3, + "stop_loss_pct": 0.03, + "target1_pct": 0.05, + "target2_pct": 0.10, + }, + "start_date": "2023-01-01", + "end_date": "2023-12-31", + "initial_capital": 10000000, + "commission_rate": 0.00015, + "slippage_rate": 0.001, + "benchmark": "KOSPI", + "top_n": 30, + }, + headers=auth_headers, + ) + assert response.status_code == 200 + data = response.json() + assert "id" in data + assert data["status"] == "pending" + + +def test_signal_today_endpoint(client: TestClient, auth_headers): + """Test today's signals endpoint.""" + response = client.get("/api/signal/kjb/today", headers=auth_headers) + assert response.status_code == 200 + assert isinstance(response.json(), list) + + +def test_signal_history_endpoint(client: TestClient, auth_headers): + """Test signal history endpoint.""" + response = client.get( + "/api/signal/kjb/history", + params={"limit": 10}, + headers=auth_headers, + ) + assert response.status_code == 200 + assert isinstance(response.json(), list) + + +def test_signal_requires_auth(client: TestClient): + """Test that signal endpoints require authentication.""" + response = client.get("/api/signal/kjb/today") + assert response.status_code == 401 +``` + +**Step 2: Run tests** + +Run: `cd /home/zephyrdark/workspace/quant/galaxy-po/backend && python -m pytest tests/e2e/test_kjb_flow.py -v` +Expected: All PASS + +**Step 3: Commit** + +```bash +git add backend/tests/e2e/test_kjb_flow.py +git commit -m "test: add E2E tests for KJB strategy, backtest, and signal endpoints" +``` + +--- + +### Task 11: Frontend - KJB Signal Dashboard + +**Files:** +- Create: `frontend/src/app/signals/page.tsx` + +**Step 1: Explore existing frontend page patterns** + +Read an existing page (e.g., `frontend/src/app/backtest/page.tsx`) to understand the component patterns, API client usage, and layout structure used in this project. + +**Step 2: Create signals page** + +Build the KJB signals dashboard page following the existing frontend patterns: +- Fetch today's signals from `GET /api/signal/kjb/today` +- Display buy signals as cards showing ticker, name, entry price, target, stop-loss +- Add a history table fetching from `GET /api/signal/kjb/history` +- Use the same layout, styling, and API client patterns as existing pages + +**Step 3: Add navigation link** + +Add "Signals" or "KJB 신호" to the navigation/sidebar following the existing navigation pattern. + +**Step 4: Commit** + +```bash +git add frontend/src/app/signals/ +git commit -m "feat: add KJB signal dashboard frontend page" +``` + +--- + +### Task 12: Final Integration & Smoke Test + +**Step 1: Run all tests** + +Run: `cd /home/zephyrdark/workspace/quant/galaxy-po/backend && python -m pytest tests/ -v` +Expected: All PASS + +**Step 2: Start the application and verify** + +Run: `cd /home/zephyrdark/workspace/quant/galaxy-po && docker compose up -d` + +Verify: +- `POST /api/strategy/kjb` returns ranked stocks +- `POST /api/backtest` with `strategy_type: "kjb"` creates and starts a backtest +- `GET /api/signal/kjb/today` returns (possibly empty) signal list +- `GET /api/signal/kjb/history` returns (possibly empty) signal list +- Frontend signals page loads correctly + +**Step 3: Final commit** + +```bash +git add -A +git commit -m "feat: complete KJB strategy full system implementation" +``` diff --git a/frontend/src/app/signals/page.tsx b/frontend/src/app/signals/page.tsx new file mode 100644 index 0000000..f91004e --- /dev/null +++ b/frontend/src/app/signals/page.tsx @@ -0,0 +1,360 @@ +'use client'; + +import { useEffect, useState } from 'react'; +import { useRouter } from 'next/navigation'; +import { DashboardLayout } from '@/components/layout/dashboard-layout'; +import { Card, CardContent, CardHeader, CardTitle } from '@/components/ui/card'; +import { Badge } from '@/components/ui/badge'; +import { Skeleton } from '@/components/ui/skeleton'; +import { Button } from '@/components/ui/button'; +import { Input } from '@/components/ui/input'; +import { Label } from '@/components/ui/label'; +import { api } from '@/lib/api'; +import { Radio, History, RefreshCw, ArrowUpCircle, ArrowDownCircle, MinusCircle } from 'lucide-react'; + +interface Signal { + id: number; + date: string; + ticker: string; + name: string | null; + signal_type: string; + entry_price: number | null; + target_price: number | null; + stop_loss_price: number | null; + reason: string | null; + status: string; + created_at: string; +} + +const signalTypeConfig: Record = { + buy: { + label: '매수', + style: 'bg-green-100 text-green-800 dark:bg-green-900 dark:text-green-200', + icon: ArrowUpCircle, + }, + sell: { + label: '매도', + style: 'bg-red-100 text-red-800 dark:bg-red-900 dark:text-red-200', + icon: ArrowDownCircle, + }, + partial_sell: { + label: '부분매도', + style: 'bg-orange-100 text-orange-800 dark:bg-orange-900 dark:text-orange-200', + icon: MinusCircle, + }, +}; + +const statusConfig: Record = { + active: { + label: '활성', + style: 'bg-blue-100 text-blue-800 dark:bg-blue-900 dark:text-blue-200', + }, + executed: { + label: '실행됨', + style: 'bg-green-100 text-green-800 dark:bg-green-900 dark:text-green-200', + }, + expired: { + label: '만료', + style: 'bg-gray-100 text-gray-800 dark:bg-gray-900 dark:text-gray-200', + }, +}; + +const formatPrice = (value: number | null | undefined) => { + if (value === null || value === undefined) return '-'; + return new Intl.NumberFormat('ko-KR').format(value); +}; + +export default function SignalsPage() { + const router = useRouter(); + const [loading, setLoading] = useState(true); + const [todaySignals, setTodaySignals] = useState([]); + const [historySignals, setHistorySignals] = useState([]); + const [showHistory, setShowHistory] = useState(false); + const [refreshing, setRefreshing] = useState(false); + + // History filter state + const [filterTicker, setFilterTicker] = useState(''); + const [filterStartDate, setFilterStartDate] = useState(''); + const [filterEndDate, setFilterEndDate] = useState(''); + + useEffect(() => { + const init = async () => { + try { + await api.getCurrentUser(); + await fetchTodaySignals(); + } catch { + router.push('/login'); + } finally { + setLoading(false); + } + }; + init(); + }, [router]); + + const fetchTodaySignals = async () => { + try { + const data = await api.get('/api/signal/kjb/today'); + setTodaySignals(data); + } catch (err) { + console.error('Failed to fetch today signals:', err); + } + }; + + const fetchHistorySignals = async () => { + try { + const params = new URLSearchParams(); + if (filterStartDate) params.set('start_date', filterStartDate); + if (filterEndDate) params.set('end_date', filterEndDate); + if (filterTicker) params.set('ticker', filterTicker); + const query = params.toString(); + const url = `/api/signal/kjb/history${query ? `?${query}` : ''}`; + const data = await api.get(url); + setHistorySignals(data); + } catch (err) { + console.error('Failed to fetch signal history:', err); + } + }; + + const handleRefresh = async () => { + setRefreshing(true); + try { + if (showHistory) { + await fetchHistorySignals(); + } else { + await fetchTodaySignals(); + } + } finally { + setRefreshing(false); + } + }; + + const handleShowHistory = async () => { + if (!showHistory && historySignals.length === 0) { + await fetchHistorySignals(); + } + setShowHistory(!showHistory); + }; + + const handleFilterSubmit = async (e: React.FormEvent) => { + e.preventDefault(); + await fetchHistorySignals(); + }; + + const renderSignalTable = (signals: Signal[]) => ( +
+ + + + + + + + + + + + + + + + {signals.map((signal) => { + const typeConf = signalTypeConfig[signal.signal_type] || { + label: signal.signal_type, + style: 'bg-muted', + icon: MinusCircle, + }; + const statConf = statusConfig[signal.status] || { + label: signal.status, + style: 'bg-muted', + }; + const TypeIcon = typeConf.icon; + + return ( + + + + + + + + + + + + ); + })} + {signals.length === 0 && ( + + + + )} + +
날짜종목코드종목명신호진입가목표가손절가사유상태
{signal.date}{signal.ticker}{signal.name || '-'} + + + {typeConf.label} + + {formatPrice(signal.entry_price)}{formatPrice(signal.target_price)}{formatPrice(signal.stop_loss_price)} + {signal.reason || '-'} + + {statConf.label} +
+ 신호가 없습니다. +
+
+ ); + + if (loading) { + return ( + +
+ + +
+
+ ); + } + + return ( + +
+
+

KJB 매매 신호

+

+ KJB 전략 기반 매매 신호를 확인하세요 +

+
+
+ + +
+
+ + {/* Summary Cards */} + {!showHistory && ( +
+ + +
+ + 매수 신호 +
+

+ {todaySignals.filter((s) => s.signal_type === 'buy').length} +

+
+
+ + +
+ + 매도 신호 +
+

+ {todaySignals.filter((s) => s.signal_type === 'sell').length} +

+
+
+ + +
+ + 부분매도 신호 +
+

+ {todaySignals.filter((s) => s.signal_type === 'partial_sell').length} +

+
+
+
+ )} + + {showHistory ? ( +
+ {/* History Filters */} + + +
+
+ + setFilterStartDate(e.target.value)} + className="w-40" + /> +
+
+ + setFilterEndDate(e.target.value)} + className="w-40" + /> +
+
+ + setFilterTicker(e.target.value)} + className="w-36" + /> +
+ +
+
+
+ + {/* History Table */} + + + 신호 이력 + + + {renderSignalTable(historySignals)} + + +
+ ) : ( + + + 오늘의 매매 신호 + + + {renderSignalTable(todaySignals)} + + + )} +
+ ); +} diff --git a/frontend/src/components/layout/new-sidebar.tsx b/frontend/src/components/layout/new-sidebar.tsx index e8abf58..6685c1c 100644 --- a/frontend/src/components/layout/new-sidebar.tsx +++ b/frontend/src/components/layout/new-sidebar.tsx @@ -10,6 +10,7 @@ import { FlaskConical, Database, Search, + Radio, ChevronLeft, ChevronRight, } from 'lucide-react'; @@ -28,6 +29,7 @@ const navItems = [ { href: '/portfolio', label: '포트폴리오', icon: Briefcase }, { href: '/strategy', label: '전략', icon: TrendingUp }, { href: '/backtest', label: '백테스트', icon: FlaskConical }, + { href: '/signals', label: '매매 신호', icon: Radio }, { href: '/admin/data', label: '데이터 수집', icon: Database }, { href: '/admin/data/explorer', label: '데이터 탐색', icon: Search }, ]; diff --git a/quant.md b/quant.md new file mode 100755 index 0000000..6d7708f --- /dev/null +++ b/quant.md @@ -0,0 +1,439 @@ +# 김종봉 주식 전략 퀀트 구현 상세 가이드 + +## 📌 전략 개요 + +### 출처 +- **영상 1**: "이 방법만 알고 있으면 종잣돈 천만원으로 금방 1억 법니다" (GZw3wTgMTwE, 2023-08-09) +- **영상 2**: "이 방법만 알고 있다면 종잣돈 천만원으로 금방 1억 법니다" (Ps9tzVhHpu0, 2024-01-30) +- **채널**: 월급쟁이부자들TV +- **전략가**: 김종봉 + +### 핵심 철학 +- **목표 수익률**: 월 10% (연 약 120%) +- **자금 증대 목표**: 1,000만원 → 1억원 (1년 내) +- **투자 기간**: 단기~중기 (보유 기간 수일~수주) +- **핵심 원칙**: "시장(코스피)을 이기는 종목"에만 투자 + +--- + +## 🎯 구체적 매매 규칙 + +### 1. 초기 자금 및 포트폴리오 구성 + +| 항목 | 기준 | +|------|------| +| **시드 머니** | 1,000만원 (초보자는 100만원 연습 추천) | +| **분산 종목 수** | 5~10개 | +| **종목당 투자금** | 100만원 (1,000만원 기준) 또는 5~10만원 (100만원 기준) | +| **현금 비중** | 약 30% (하락장 대비) | +| **리밸런싱 주기** | 3개월 (분기별 성과 검증) | + +### 2. 종목 선정 기준 (필수 필터) + +#### A. 유동성 필터 +``` +✅ 일 거래대금 >= 2,000억원 (2조 아님, 2천억) +``` + +**대상 종목 예시**: +- 삼성전자 +- SK하이닉스 +- LG전자 +- LG이노텍 +- 네이버 +- 카카오 +- LG에너지솔루션 + +#### B. 시가총액 필터 +``` +✅ 시가총액 순위: 1~30위 (대형주 중심) +``` + +#### C. 상대강도 필터 (핵심) +``` +✅ 종목 수익률 > 코스피 수익률 (최근 2주~1개월) +✅ 특히 삼성전자 > 코스피 일 때 시장 강세 신호 +``` + +**구현 방법**: +```python +# 상대강도 지수 (Relative Strength Index vs Market) +RS = (종목 수익률 / 코스피 수익률) * 100 +# RS > 100이면 시장 대비 강함 +``` + +#### D. 차트 패턴 필터 +``` +✅ 장대양봉 출현 (전일 대비 5% 이상 상승 + 거래량 폭증) +✅ 박스권 돌파 (직전 고점 갱신) +✅ 상승 추세 중 조정 후 지지선 터치 +``` + +**장대양봉 정의**: +- 종가 >= 시가 + (고가 - 저가) × 0.7 +- 거래대금 >= 직전 20일 평균 × 1.5 + +### 3. 진입 규칙 + +#### 시나리오 작성 (필수) +매매 전 반드시 기록: +``` +1. 진입가: [예시] 삼성전자 72,000원 +2. 목표가: [예시] 75,600원 (+5%) +3. 손절가: [예시] 69,840원 (-3%) +4. 근거: 코스피 지지선 반등 + 삼성전자 박스권 상단 돌파 +``` + +#### 진입 타이밍 +| 조건 | 설명 | +|------|------| +| **조건 1** | 코스피 3선(지지·저항·추세) 차트에서 지지선 근처 | +| **조건 2** | 박스권 상단 돌파 후 재진입 (돌파 확인 1~2일 후) | +| **조건 3** | 장대양봉 발생 다음날 시초가 근처 | +| **진입 방식** | 지정가 주문 (고점 근처에서 대기) | + +### 4. 청산 규칙 + +#### A. 익절 (Profit Taking) +``` +✅ 1차 익절: +5% 도달 시 50% 물량 청산 +✅ 2차 익절: +10% 도달 시 나머지 50% 청산 +✅ 목표: 주당 5% 수익 (주 1~2회 매매) +``` + +#### B. 손절 (Stop Loss) +``` +❌ 손절 기준: -3% ~ -10% +❌ 원칙: 시나리오 이탈 시 즉시 손절 +❌ 예시: 진입가 72,000원 → 손절가 69,840원 (-3%) +``` + +#### C. 트레일링 스톱 +``` +- 5% 이상 수익 발생 시 손절선을 진입가(본전)로 상향 +- 10% 이상 수익 시 손절선을 +5% 지점으로 상향 +``` + +### 5. 리스크 관리 + +| 항목 | 기준 | +|------|------| +| **종목당 최대 손실** | -3% (초기 자본의 0.3%) | +| **포트폴리오 최대 손실** | -10% (초기 자본 기준) | +| **승률 목표** | 60% 이상 | +| **손익비** | 1:1.5 이상 (손실 3% vs 수익 5%) | +| **월간 목표 수익률** | 10% | +| **연간 목표 수익률** | 120% (복리 고려 시 약 113%) | + +--- + +## 🔧 퀀트 구현 상세 + +### 필요 데이터 + +#### 1. 시장 데이터 +- **코스피 지수**: 일봉 OHLCV (최소 1년 이상) +- **삼성전자**: 일봉 OHLCV (선행지표로 활용) + +#### 2. 종목 데이터 +```python +필수 컬럼: +- 날짜 (Date) +- 시가 (Open) +- 고가 (High) +- 저가 (Low) +- 종가 (Close) +- 거래량 (Volume) +- 거래대금 (Value = Close × Volume) +- 시가총액 (Market Cap) +``` + +#### 3. 데이터 소스 +- **한국거래소 API** (공식, 실시간) +- **FinanceDataReader** (무료, Python 라이브러리) +- **증권사 API** (키움, 이베스트 등, 실전 매매 시) +- **야후 파이낸스** (yfinance, 해외 주식 포함) + +### 구현 단계 + +#### Phase 1: 데이터 수집 및 전처리 +```python +import pandas as pd +import FinanceDataReader as fdr + +# 1. 코스피 지수 +kospi = fdr.DataReader('KS11', '2020-01-01') + +# 2. 시가총액 상위 30개 종목 +krx = fdr.StockListing('KRX') +top30 = krx.nlargest(30, 'Marcap') + +# 3. 개별 종목 데이터 +stocks = {} +for code in top30['Code']: + stocks[code] = fdr.DataReader(code, '2020-01-01') +``` + +#### Phase 2: 필터링 로직 +```python +def filter_liquidity(df, threshold=200_000_000_000): + """거래대금 2000억 이상 필터""" + df['Value'] = df['Close'] * df['Volume'] + return df[df['Value'] >= threshold] + +def relative_strength(stock_return, market_return): + """상대강도 계산""" + return (stock_return / market_return) * 100 + +def detect_breakout(df, lookback=20): + """박스권 돌파 감지""" + df['Highest'] = df['High'].rolling(lookback).max() + df['Breakout'] = df['Close'] > df['Highest'].shift(1) + return df + +def detect_large_candle(df, threshold=0.05): + """장대양봉 감지 (5% 이상)""" + df['Daily_Return'] = df['Close'].pct_change() + df['Volume_Ratio'] = df['Volume'] / df['Volume'].rolling(20).mean() + df['Large_Candle'] = (df['Daily_Return'] >= threshold) & (df['Volume_Ratio'] >= 1.5) + return df +``` + +#### Phase 3: 신호 생성 +```python +def generate_signals(stock_df, kospi_df): + """매수/매도 신호 생성""" + signals = pd.DataFrame(index=stock_df.index) + + # 상대강도 + stock_ret = stock_df['Close'].pct_change(10) # 2주 + market_ret = kospi_df['Close'].pct_change(10) + signals['RS'] = (stock_ret / market_ret) * 100 + + # 매수 신호 + signals['Buy'] = ( + (signals['RS'] > 100) & # 시장 대비 강함 + (stock_df['Breakout']) & # 박스권 돌파 + (stock_df['Large_Candle']) # 장대양봉 + ) + + return signals +``` + +#### Phase 4: 백테스팅 엔진 +```python +class KimJongBongStrategy: + def __init__(self, initial_capital=10_000_000): + self.capital = initial_capital + self.cash = initial_capital * 0.7 # 30% 현금 보유 + self.positions = {} + self.max_stocks = 10 + self.position_size = initial_capital / self.max_stocks + + def enter_position(self, code, price, date): + """진입""" + shares = int(self.position_size / price) + cost = shares * price + if cost <= self.cash: + self.positions[code] = { + 'shares': shares, + 'entry_price': price, + 'entry_date': date, + 'stop_loss': price * 0.97, # -3% + 'target': price * 1.05 # +5% + } + self.cash -= cost + return True + return False + + def exit_position(self, code, price, date, reason): + """청산""" + if code in self.positions: + pos = self.positions[code] + proceeds = pos['shares'] * price + self.cash += proceeds + profit = (price / pos['entry_price'] - 1) * 100 + del self.positions[code] + return profit + return 0 + + def check_exits(self, date, prices): + """익절/손절 체크""" + for code in list(self.positions.keys()): + pos = self.positions[code] + current_price = prices[code] + + # 손절 + if current_price <= pos['stop_loss']: + self.exit_position(code, current_price, date, 'stop_loss') + + # 익절 + elif current_price >= pos['target']: + self.exit_position(code, current_price, date, 'take_profit') +``` + +#### Phase 5: 성과 평가 +```python +def calculate_metrics(returns): + """성과 지표""" + total_return = (returns + 1).prod() - 1 + sharpe = returns.mean() / returns.std() * (252 ** 0.5) + max_dd = (returns.cumsum() - returns.cumsum().cummax()).min() + + return { + 'Total Return': f"{total_return:.2%}", + 'Sharpe Ratio': f"{sharpe:.2f}", + 'Max Drawdown': f"{max_dd:.2%}", + 'Win Rate': f"{(returns > 0).sum() / len(returns):.2%}" + } +``` + +--- + +## 📊 백테스팅 파라미터 + +### 권장 설정 +| 파라미터 | 값 | 설명 | +|----------|-----|------| +| **백테스팅 기간** | 2020-01-01 ~ 2024-12-31 | 최소 3년 이상 | +| **초기 자본** | 10,000,000원 | 영상 기준 | +| **리밸런싱** | 분기별 (3개월) | 성과 검증 및 종목 교체 | +| **수수료** | 0.015% (매수) + 0.25% (매도, 세금 포함) | 실전 반영 | +| **슬리피지** | 0.1% | 체결가 불리함 | +| **벤치마크** | 코스피 지수 (KS11) | 비교 대상 | + +### 최적화 대상 +1. **상대강도 기간**: 10일 (2주) vs 20일 (1개월) vs 60일 (3개월) +2. **손절 비율**: -3% vs -5% vs -10% +3. **익절 비율**: +5% vs +7% vs +10% +4. **종목 수**: 5개 vs 10개 vs 15개 +5. **현금 비중**: 20% vs 30% vs 50% + +--- + +## 🚀 실전 적용 로드맵 + +### Step 1: 백테스팅 (1~2주) +- 과거 데이터로 전략 검증 +- 연평균 수익률, 최대 낙폭, 승률 확인 +- 벤치마크 대비 초과 수익 여부 평가 + +### Step 2: 페이퍼 트레이딩 (1개월) +- 실시간 데이터로 모의 투자 +- 신호 발생 빈도 및 정확도 체크 +- 감정 개입 없이 기계적 실행 연습 + +### Step 3: 소액 실전 (3개월) +- 100만원으로 시작 +- 5~10개 종목, 종목당 10~20만원 +- 매매 일지 기록 (진입/청산 근거, 수익률) + +### Step 4: 본격 운용 (6개월 이후) +- 1,000만원 투입 +- 분기별 성과 리뷰 +- 전략 개선 (필터 추가, 파라미터 최적화) + +--- + +## ⚠️ 주의사항 + +### 전략의 한계 +1. **백테스팅 없음**: 영상은 실제 통계 검증 없이 경험 기반 +2. **생존 편향**: 성공 사례 중심, 실패 사례 미공개 +3. **시장 환경 의존**: 상승장에서 유리, 하락장에서 취약 +4. **거래 비용**: 잦은 매매 시 수수료+세금 누적 + +### 리스크 관리 필수 +- **과최적화 주의**: 과거 데이터에만 맞춘 전략은 미래 실패 +- **레버리지 금지**: 신용/미수 사용 시 손실 확대 +- **분산 투자**: 한 종목에 30% 이상 투자 금지 +- **감정 통제**: 손절/익절 원칙 엄수 + +--- + +## 📦 필요 기술 스택 + +### Python 라이브러리 +```bash +pip install pandas numpy yfinance FinanceDataReader +pip install backtrader vectorbt ta-lib +pip install matplotlib seaborn plotly +``` + +### 개발 환경 +- **로컬**: Jupyter Notebook / VS Code +- **클라우드**: Google Colab (무료) / AWS SageMaker +- **데이터베이스**: SQLite (소규모) / PostgreSQL (대규모) + +### 자동매매 연동 (선택) +- **증권사 API**: 키움증권 Open API, 이베스트투자증권 xingAPI +- **프레임워크**: PyQt5 (키움), gRPC (이베스트) + +--- + +## 📚 참고 자료 + +### 추가 학습 +1. **책**: "퀀트 투자 무작정 따라하기", "파이썬을 이용한 금융 데이터 분석" +2. **강의**: 패스트캠퍼스 퀀트 트레이딩, 인프런 주식 자동매매 +3. **커뮤니티**: 네이버 카페 "시스템트레이딩", GitHub quant 저장소 + +### 관련 전략 +- **모멘텀 전략**: 과거 수익률 상위 종목 매수 +- **브레이크아웃**: 52주 신고가 돌파 +- **평균회귀**: 과매도 구간 매수 + +--- + +## ✅ 체크리스트 + +### 구현 전 확인사항 +- [ ] 데이터 소스 확보 (최소 3년 일봉) +- [ ] 백테스팅 환경 구축 (Python + 라이브러리) +- [ ] 초기 자본 및 목표 수익률 설정 +- [ ] 리스크 허용 범위 결정 (최대 손실 -10%?) +- [ ] 매매 일지 양식 작성 + +### 실전 전 확인사항 +- [ ] 백테스팅 결과 만족 (연 10% 이상, 샤프 1 이상) +- [ ] 페이퍼 트레이딩 1개월 이상 +- [ ] 손절 원칙 테스트 (감정 개입 없이 실행 가능?) +- [ ] 증권사 계좌 개설 (API 사용 가능 여부 확인) +- [ ] 세금 및 수수료 계산 완료 + +--- + +## 🎯 최종 목표 + +| 기간 | 시드 | 목표 | 월 수익률 | 비고 | +|------|------|------|-----------|------| +| **0개월** | 1,000만원 | - | - | 시작 | +| **3개월** | 1,000만원 | 1,300만원 | 10% | 1분기 | +| **6개월** | 1,300만원 | 1,700만원 | 10% | 2분기 | +| **9개월** | 1,700만원 | 2,200만원 | 10% | 3분기 | +| **12개월** | 2,200만원 | **1억원** | 10% | 목표 달성 | + +**복리 효과 시뮬레이션**: +``` +월 10% × 12개월 = 약 213% (단리) +월 10% 복리 = (1.1)^12 - 1 = 약 213.8% +1,000만원 × 3.138 = 3,138만원 (현실적 목표) +``` + +--- + +## 💬 추가 문의사항 + +구현 과정에서 필요한 사항을 알려주시면: +1. **전체 Python 코드** (백테스팅 시스템) +2. **단계별 모듈** (데이터 수집 → 신호 생성 → 백테스팅) +3. **Jupyter Notebook** (인터랙티브 실습) +4. **자동매매 연동** (증권사 API 통합) + +맞춤형으로 제공해 드리겠습니다. + +--- + +**작성일**: 2026-02-19 +**버전**: 1.0 +**출처**: 김종봉 전략 유튜브 영상 (월급쟁이부자들TV)