54 KiB
Kim Jong-bong (KJB) Strategy Implementation Plan
For Claude: REQUIRED SUB-SKILL: Use superpowers:executing-plans to implement this plan task-by-task.
Goal: Implement the Kim Jong-bong short-term trading strategy as a full system: signal generation, daily backtesting, live signal alerts, and frontend dashboard.
Architecture: New strategy class (KJBStrategy) for ranking + separate KJBSignalGenerator for daily buy/sell signals. New DailyBacktestEngine runs daily simulations (not rebalance-based). New TradingPortfolio handles individual position management with stop-loss/trailing-stop. Signal model stores daily signals for API/frontend consumption.
Tech Stack: Python, FastAPI, SQLAlchemy, PostgreSQL, Alembic, Next.js/React, APScheduler
Task 1: Signal DB Model
Files:
- Create:
backend/app/models/signal.py - Modify:
backend/app/models/__init__.py
Step 1: Create Signal model
# backend/app/models/signal.py
"""
Trading signal models.
"""
import enum
from datetime import datetime
from sqlalchemy import (
Column, Integer, String, Numeric, DateTime, Date,
Text, Enum as SQLEnum,
)
from app.core.database import Base
class SignalType(str, enum.Enum):
BUY = "buy"
SELL = "sell"
PARTIAL_SELL = "partial_sell"
class SignalStatus(str, enum.Enum):
ACTIVE = "active"
EXECUTED = "executed"
EXPIRED = "expired"
class Signal(Base):
__tablename__ = "signals"
id = Column(Integer, primary_key=True, index=True)
date = Column(Date, nullable=False, index=True)
ticker = Column(String(20), nullable=False, index=True)
name = Column(String(100))
signal_type = Column(SQLEnum(SignalType), nullable=False)
entry_price = Column(Numeric(12, 2))
target_price = Column(Numeric(12, 2))
stop_loss_price = Column(Numeric(12, 2))
reason = Column(String(200))
status = Column(SQLEnum(SignalStatus), default=SignalStatus.ACTIVE)
created_at = Column(DateTime, default=datetime.utcnow)
Step 2: Register in models/init.py
Add to backend/app/models/__init__.py:
from app.models.signal import Signal, SignalType, SignalStatus
And add "Signal", "SignalType", "SignalStatus" to __all__.
Step 3: Create Alembic migration
Run: cd /home/zephyrdark/workspace/quant/galaxy-po/backend && alembic revision --autogenerate -m "add signals table"
Step 4: Apply migration
Run: cd /home/zephyrdark/workspace/quant/galaxy-po/backend && alembic upgrade head
Step 5: Commit
git add backend/app/models/signal.py backend/app/models/__init__.py backend/alembic/versions/
git commit -m "feat: add Signal model for KJB trading signals"
Task 2: Signal Schemas
Files:
- Create:
backend/app/schemas/signal.py
Step 1: Create signal schemas
# backend/app/schemas/signal.py
"""
Signal related Pydantic schemas.
"""
from datetime import date, datetime
from decimal import Decimal
from typing import Optional, List
from enum import Enum
from pydantic import BaseModel
from app.schemas.portfolio import FloatDecimal
class SignalType(str, Enum):
BUY = "buy"
SELL = "sell"
PARTIAL_SELL = "partial_sell"
class SignalStatus(str, Enum):
ACTIVE = "active"
EXECUTED = "executed"
EXPIRED = "expired"
class SignalResponse(BaseModel):
"""Single signal response."""
id: int
date: date
ticker: str
name: Optional[str] = None
signal_type: str
entry_price: Optional[FloatDecimal] = None
target_price: Optional[FloatDecimal] = None
stop_loss_price: Optional[FloatDecimal] = None
reason: Optional[str] = None
status: str
created_at: datetime
class Config:
from_attributes = True
class ActivePosition(BaseModel):
"""Active trading position with P&L."""
ticker: str
name: Optional[str] = None
entry_date: date
entry_price: FloatDecimal
current_price: FloatDecimal
shares: int
stop_loss_price: FloatDecimal
target_price: FloatDecimal
pnl_percent: FloatDecimal
pnl_amount: FloatDecimal
Step 2: Commit
git add backend/app/schemas/signal.py
git commit -m "feat: add Signal Pydantic schemas"
Task 3: TradingPortfolio
This is the core position management class for KJB strategy. Separate from VirtualPortfolio.
Files:
- Create:
backend/app/services/backtest/trading_portfolio.py - Create:
backend/tests/unit/test_trading_portfolio.py
Step 1: Write failing tests
# backend/tests/unit/test_trading_portfolio.py
"""
Unit tests for TradingPortfolio.
"""
from decimal import Decimal
from datetime import date
from app.services.backtest.trading_portfolio import TradingPortfolio
def test_initial_state():
tp = TradingPortfolio(Decimal("10000000"))
assert tp.cash == Decimal("10000000")
assert tp.investable_capital == Decimal("7000000") # 70%
assert len(tp.positions) == 0
def test_enter_position():
tp = TradingPortfolio(Decimal("10000000"))
txn = tp.enter_position(
ticker="005930",
price=Decimal("70000"),
date=date(2024, 1, 2),
commission_rate=Decimal("0.00015"),
slippage_rate=Decimal("0.001"),
)
assert txn is not None
assert txn.action == "buy"
assert "005930" in tp.positions
pos = tp.positions["005930"]
assert pos.entry_price == Decimal("70000")
assert pos.stop_loss == Decimal("67900") # -3%
assert pos.target1 == Decimal("73500") # +5%
assert pos.target2 == Decimal("77000") # +10%
def test_max_positions():
tp = TradingPortfolio(Decimal("10000000"), max_positions=2)
tp.enter_position("A", Decimal("1000"), date(2024, 1, 2), Decimal("0"), Decimal("0"))
tp.enter_position("B", Decimal("1000"), date(2024, 1, 2), Decimal("0"), Decimal("0"))
txn = tp.enter_position("C", Decimal("1000"), date(2024, 1, 2), Decimal("0"), Decimal("0"))
assert txn is None # rejected
assert len(tp.positions) == 2
def test_stop_loss_exit():
tp = TradingPortfolio(Decimal("10000000"))
tp.enter_position("005930", Decimal("70000"), date(2024, 1, 2), Decimal("0"), Decimal("0"))
exits = tp.check_exits(
date=date(2024, 1, 3),
prices={"005930": Decimal("67000")}, # below -3%
commission_rate=Decimal("0"),
slippage_rate=Decimal("0"),
)
assert len(exits) == 1
assert exits[0].action == "sell"
assert "005930" not in tp.positions
def test_partial_take_profit():
tp = TradingPortfolio(Decimal("10000000"))
tp.enter_position("005930", Decimal("70000"), date(2024, 1, 2), Decimal("0"), Decimal("0"))
pos = tp.positions["005930"]
initial_shares = pos.shares
exits = tp.check_exits(
date=date(2024, 1, 10),
prices={"005930": Decimal("73500")}, # exactly +5%
commission_rate=Decimal("0"),
slippage_rate=Decimal("0"),
)
assert len(exits) == 1
assert exits[0].action == "partial_sell"
# Should sell 50% of shares
assert exits[0].shares == initial_shares // 2
# Position should still exist with remaining shares
assert "005930" in tp.positions
# Stop loss should be updated to entry price (trailing)
assert tp.positions["005930"].stop_loss == Decimal("70000")
def test_full_take_profit():
tp = TradingPortfolio(Decimal("10000000"))
tp.enter_position("005930", Decimal("70000"), date(2024, 1, 2), Decimal("0"), Decimal("0"))
# First trigger partial at +5%
tp.check_exits(
date=date(2024, 1, 10),
prices={"005930": Decimal("73500")},
commission_rate=Decimal("0"),
slippage_rate=Decimal("0"),
)
# Then trigger full exit at +10%
exits = tp.check_exits(
date=date(2024, 1, 15),
prices={"005930": Decimal("77000")},
commission_rate=Decimal("0"),
slippage_rate=Decimal("0"),
)
assert len(exits) == 1
assert exits[0].action == "sell"
assert "005930" not in tp.positions
def test_trailing_stop_after_10pct():
tp = TradingPortfolio(Decimal("10000000"))
tp.enter_position("005930", Decimal("70000"), date(2024, 1, 2), Decimal("0"), Decimal("0"))
# Trigger partial at +5%
tp.check_exits(
date=date(2024, 1, 10),
prices={"005930": Decimal("73500")},
commission_rate=Decimal("0"),
slippage_rate=Decimal("0"),
)
# Price goes to +10% but doesn't trigger target2 (already partially sold)
# Actually +10% should trigger full exit of remaining
# Let's test trailing stop: price goes up to +8%, trailing stop at entry
tp.check_exits(
date=date(2024, 1, 12),
prices={"005930": Decimal("75600")}, # +8%
commission_rate=Decimal("0"),
slippage_rate=Decimal("0"),
)
# Stop should still be at entry (70000) since we're between 5-10%
assert tp.positions["005930"].stop_loss == Decimal("70000")
def test_cash_reserve():
tp = TradingPortfolio(Decimal("10000000"), cash_reserve_ratio=Decimal("0.3"))
# investable = 7,000,000
# position_size = 7,000,000 / 10 = 700,000
# Total cash should never go below 3,000,000 (30%)
assert tp.investable_capital == Decimal("7000000")
def test_get_portfolio_value():
tp = TradingPortfolio(Decimal("10000000"))
tp.enter_position("005930", Decimal("70000"), date(2024, 1, 2), Decimal("0"), Decimal("0"))
value = tp.get_value({"005930": Decimal("75000")})
assert value > Decimal("10000000") # should be worth more at higher price
Step 2: Run tests to verify they fail
Run: cd /home/zephyrdark/workspace/quant/galaxy-po/backend && python -m pytest tests/unit/test_trading_portfolio.py -v
Expected: FAIL (module not found)
Step 3: Implement TradingPortfolio
# backend/app/services/backtest/trading_portfolio.py
"""
Trading portfolio for signal-based strategies (KJB).
Supports individual position management with stop-loss and trailing stops.
"""
from decimal import Decimal, ROUND_DOWN
from typing import Dict, List, Optional
from dataclasses import dataclass, field
from datetime import date
@dataclass
class Position:
"""An active trading position."""
ticker: str
shares: int
entry_price: Decimal
entry_date: date
stop_loss: Decimal
target1: Decimal # +5% partial sell
target2: Decimal # +10% full sell
partial_sold: bool = False # whether 50% has been sold
@dataclass
class TradingTransaction:
"""A single trading transaction."""
ticker: str
action: str # 'buy', 'sell', 'partial_sell'
shares: int
price: Decimal
commission: Decimal
reason: str = ""
class TradingPortfolio:
"""
Portfolio for signal-based daily trading.
Key differences from VirtualPortfolio:
- Individual position entry/exit (not rebalancing)
- Per-position stop-loss, take-profit, trailing stop
- Cash reserve enforcement (30%)
- Partial position exits (50% at +5%)
"""
def __init__(
self,
initial_capital: Decimal,
max_positions: int = 10,
cash_reserve_ratio: Decimal = Decimal("0.3"),
stop_loss_pct: Decimal = Decimal("0.03"),
target1_pct: Decimal = Decimal("0.05"),
target2_pct: Decimal = Decimal("0.10"),
):
self.initial_capital = initial_capital
self.cash = initial_capital
self.max_positions = max_positions
self.cash_reserve_ratio = cash_reserve_ratio
self.stop_loss_pct = stop_loss_pct
self.target1_pct = target1_pct
self.target2_pct = target2_pct
self.positions: Dict[str, Position] = {}
@property
def investable_capital(self) -> Decimal:
"""Capital available for investment (excluding cash reserve)."""
return self.initial_capital * (1 - self.cash_reserve_ratio)
@property
def position_size(self) -> Decimal:
"""Target size per position."""
return self.investable_capital / Decimal(str(self.max_positions))
def get_value(self, prices: Dict[str, Decimal]) -> Decimal:
"""Calculate total portfolio value."""
holdings_value = sum(
Decimal(str(pos.shares)) * prices.get(pos.ticker, Decimal("0"))
for pos in self.positions.values()
)
return self.cash + holdings_value
def enter_position(
self,
ticker: str,
price: Decimal,
date: date,
commission_rate: Decimal,
slippage_rate: Decimal,
) -> Optional[TradingTransaction]:
"""Enter a new position."""
# Check limits
if len(self.positions) >= self.max_positions:
return None
if ticker in self.positions:
return None
# Calculate shares
buy_price = price * (1 + slippage_rate)
max_cost = self.position_size
available = self.cash - (self.initial_capital * self.cash_reserve_ratio)
if available <= 0:
return None
actual_cost = min(max_cost, available)
shares = int((actual_cost / (buy_price * (1 + commission_rate))).to_integral_value(rounding=ROUND_DOWN))
if shares <= 0:
return None
cost = Decimal(str(shares)) * buy_price
commission = cost * commission_rate
total_cost = cost + commission
self.cash -= total_cost
self.positions[ticker] = Position(
ticker=ticker,
shares=shares,
entry_price=price,
entry_date=date,
stop_loss=price * (1 - self.stop_loss_pct),
target1=price * (1 + self.target1_pct),
target2=price * (1 + self.target2_pct),
)
return TradingTransaction(
ticker=ticker,
action="buy",
shares=shares,
price=buy_price,
commission=commission,
reason="entry_signal",
)
def check_exits(
self,
date: date,
prices: Dict[str, Decimal],
commission_rate: Decimal,
slippage_rate: Decimal,
) -> List[TradingTransaction]:
"""Check all positions for exit conditions. Returns transactions."""
transactions = []
for ticker in list(self.positions.keys()):
pos = self.positions[ticker]
current_price = prices.get(ticker)
if current_price is None:
continue
sell_price = current_price * (1 - slippage_rate)
# 1. Stop-loss check
if current_price <= pos.stop_loss:
txn = self._exit_full(ticker, sell_price, commission_rate, "stop_loss")
if txn:
transactions.append(txn)
continue
# 2. Take-profit 2: +10% (full exit of remaining)
if current_price >= pos.target2 and pos.partial_sold:
txn = self._exit_full(ticker, sell_price, commission_rate, "take_profit_2")
if txn:
transactions.append(txn)
continue
# 3. Take-profit 1: +5% (partial exit 50%)
if current_price >= pos.target1 and not pos.partial_sold:
txn = self._exit_partial(ticker, sell_price, commission_rate)
if txn:
transactions.append(txn)
# Update trailing stop to entry price
pos.stop_loss = pos.entry_price
continue
# 4. Trailing stop update (between 5-10%: stop at entry, above 10%: stop at +5%)
if pos.partial_sold:
gain_pct = (current_price - pos.entry_price) / pos.entry_price
if gain_pct >= self.target2_pct:
new_stop = pos.entry_price * (1 + self.target1_pct)
if new_stop > pos.stop_loss:
pos.stop_loss = new_stop
return transactions
def _exit_full(
self,
ticker: str,
sell_price: Decimal,
commission_rate: Decimal,
reason: str,
) -> Optional[TradingTransaction]:
"""Exit a position fully."""
pos = self.positions.get(ticker)
if not pos or pos.shares <= 0:
return None
proceeds = Decimal(str(pos.shares)) * sell_price
commission = proceeds * commission_rate
self.cash += proceeds - commission
shares = pos.shares
del self.positions[ticker]
return TradingTransaction(
ticker=ticker,
action="sell",
shares=shares,
price=sell_price,
commission=commission,
reason=reason,
)
def _exit_partial(
self,
ticker: str,
sell_price: Decimal,
commission_rate: Decimal,
) -> Optional[TradingTransaction]:
"""Exit 50% of a position."""
pos = self.positions.get(ticker)
if not pos or pos.shares <= 0:
return None
sell_shares = pos.shares // 2
if sell_shares <= 0:
return None
proceeds = Decimal(str(sell_shares)) * sell_price
commission = proceeds * commission_rate
self.cash += proceeds - commission
pos.shares -= sell_shares
pos.partial_sold = True
return TradingTransaction(
ticker=ticker,
action="partial_sell",
shares=sell_shares,
price=sell_price,
commission=commission,
reason="take_profit_1",
)
Step 4: Run tests to verify they pass
Run: cd /home/zephyrdark/workspace/quant/galaxy-po/backend && python -m pytest tests/unit/test_trading_portfolio.py -v
Expected: All PASS
Step 5: Commit
git add backend/app/services/backtest/trading_portfolio.py backend/tests/unit/test_trading_portfolio.py
git commit -m "feat: add TradingPortfolio for signal-based position management"
Task 4: KJB Signal Generator
Core signal generation logic for detecting buy/sell signals.
Files:
- Create:
backend/app/services/strategy/kjb.py - Create:
backend/tests/unit/test_kjb_signal.py
Step 1: Write failing tests for signal generator
# backend/tests/unit/test_kjb_signal.py
"""
Unit tests for KJB signal generator.
"""
import pandas as pd
import numpy as np
from decimal import Decimal
from datetime import date, timedelta
from app.services.strategy.kjb import KJBSignalGenerator
def _make_price_df(closes, volumes=None, start_date=date(2024, 1, 2)):
"""Helper to create price DataFrame."""
dates = [start_date + timedelta(days=i) for i in range(len(closes))]
if volumes is None:
volumes = [1000000] * len(closes)
highs = [c * 1.01 for c in closes]
lows = [c * 0.99 for c in closes]
opens = closes.copy()
return pd.DataFrame({
"date": dates,
"open": opens,
"high": highs,
"low": lows,
"close": closes,
"volume": volumes,
}).set_index("date")
def _make_kospi_df(closes, start_date=date(2024, 1, 2)):
dates = [start_date + timedelta(days=i) for i in range(len(closes))]
return pd.DataFrame({
"date": dates,
"close": closes,
}).set_index("date")
def test_relative_strength_above_market():
gen = KJBSignalGenerator()
# Stock up 10% over 10 days, market up 5%
stock_closes = [100 + i for i in range(25)] # steady rise
kospi_closes = [100 + i * 0.5 for i in range(25)] # slower rise
stock_df = _make_price_df(stock_closes)
kospi_df = _make_kospi_df(kospi_closes)
rs = gen.calculate_relative_strength(stock_df, kospi_df, lookback=10)
# Last value should be > 100 (stock outperforming)
assert rs.iloc[-1] > 100
def test_relative_strength_below_market():
gen = KJBSignalGenerator()
stock_closes = [100 + i * 0.3 for i in range(25)]
kospi_closes = [100 + i for i in range(25)]
stock_df = _make_price_df(stock_closes)
kospi_df = _make_kospi_df(kospi_closes)
rs = gen.calculate_relative_strength(stock_df, kospi_df, lookback=10)
assert rs.iloc[-1] < 100
def test_detect_breakout():
gen = KJBSignalGenerator()
# Flat for 20 days, then breakout
closes = [100.0] * 20 + [105.0]
stock_df = _make_price_df(closes)
breakouts = gen.detect_breakout(stock_df, lookback=20)
assert breakouts.iloc[-1] == True
assert breakouts.iloc[-2] == False
def test_detect_large_candle():
gen = KJBSignalGenerator()
# Normal days, then a 6% jump with 2x volume
closes = [100.0] * 21 + [106.0]
volumes = [1000000] * 21 + [3000000]
stock_df = _make_price_df(closes, volumes)
large = gen.detect_large_candle(stock_df, pct_threshold=0.05, vol_multiplier=1.5)
assert large.iloc[-1] == True
assert large.iloc[-2] == False
def test_no_large_candle_low_volume():
gen = KJBSignalGenerator()
# 6% jump but normal volume
closes = [100.0] * 21 + [106.0]
volumes = [1000000] * 22 # no volume spike
stock_df = _make_price_df(closes, volumes)
large = gen.detect_large_candle(stock_df)
assert large.iloc[-1] == False
def test_generate_buy_signal():
gen = KJBSignalGenerator()
# Create scenario: RS > 100, breakout, large candle
# Stock rises faster than market with a breakout candle
closes = [100.0] * 20 + [106.0] # breakout + large candle
volumes = [1000000] * 20 + [3000000]
kospi_closes = [100.0 + i * 0.1 for i in range(21)] # market barely moves
stock_df = _make_price_df(closes, volumes)
kospi_df = _make_kospi_df(kospi_closes)
signals = gen.generate_signals(stock_df, kospi_df)
# Last day should have a buy signal
assert signals["buy"].iloc[-1] == True
Step 2: Run tests to verify they fail
Run: cd /home/zephyrdark/workspace/quant/galaxy-po/backend && python -m pytest tests/unit/test_kjb_signal.py -v
Expected: FAIL
Step 3: Implement KJBSignalGenerator
# backend/app/services/strategy/kjb.py
"""
Kim Jong-bong (KJB) strategy implementation.
Signal-based short-term trading strategy:
- Universe: market cap top 30, daily trading value >= 200B KRW
- Entry: relative strength > KOSPI + breakout or large candle
- Exit: stop-loss -3%, take-profit +5%/+10%, trailing stop
"""
from datetime import date
from decimal import Decimal
from typing import Dict, List, Optional
import pandas as pd
from sqlalchemy.orm import Session
from app.services.strategy.base import BaseStrategy
from app.schemas.strategy import StockFactor, StrategyResult, UniverseFilter
from app.services.factor_calculator import FactorCalculator
class KJBSignalGenerator:
"""
Generates daily buy/sell signals based on KJB rules.
Pure computation - no DB access. Takes DataFrames as input.
"""
def calculate_relative_strength(
self,
stock_df: pd.DataFrame,
kospi_df: pd.DataFrame,
lookback: int = 10,
) -> pd.Series:
"""
Calculate relative strength vs KOSPI.
RS = (stock return / market return) * 100
RS > 100 means stock outperforms market.
"""
stock_ret = stock_df["close"].pct_change(lookback)
kospi_ret = kospi_df["close"].pct_change(lookback)
# Align indices
aligned = pd.DataFrame({
"stock_ret": stock_ret,
"kospi_ret": kospi_ret,
}).dropna()
# Avoid division by zero
rs = pd.Series(index=stock_df.index, dtype=float)
for idx in aligned.index:
market_ret = aligned.loc[idx, "kospi_ret"]
stock_r = aligned.loc[idx, "stock_ret"]
if abs(market_ret) < 1e-10:
rs[idx] = 100.0 if abs(stock_r) < 1e-10 else (200.0 if stock_r > 0 else 0.0)
else:
rs[idx] = (stock_r / market_ret) * 100
return rs
def detect_breakout(
self,
stock_df: pd.DataFrame,
lookback: int = 20,
) -> pd.Series:
"""
Detect box breakout: close > highest high of previous lookback days.
"""
prev_high = stock_df["high"].rolling(lookback).max().shift(1)
return stock_df["close"] > prev_high
def detect_large_candle(
self,
stock_df: pd.DataFrame,
pct_threshold: float = 0.05,
vol_multiplier: float = 1.5,
) -> pd.Series:
"""
Detect large bullish candle:
- Daily return >= pct_threshold (5%)
- Volume >= vol_multiplier * 20-day average volume
"""
daily_return = stock_df["close"].pct_change()
avg_volume = stock_df["volume"].rolling(20).mean()
volume_ratio = stock_df["volume"] / avg_volume
return (daily_return >= pct_threshold) & (volume_ratio >= vol_multiplier)
def generate_signals(
self,
stock_df: pd.DataFrame,
kospi_df: pd.DataFrame,
rs_lookback: int = 10,
breakout_lookback: int = 20,
) -> pd.DataFrame:
"""
Generate buy signals combining all filters.
Buy when: RS > 100 AND (breakout OR large candle)
"""
rs = self.calculate_relative_strength(stock_df, kospi_df, rs_lookback)
breakout = self.detect_breakout(stock_df, breakout_lookback)
large_candle = self.detect_large_candle(stock_df)
signals = pd.DataFrame(index=stock_df.index)
signals["rs"] = rs
signals["breakout"] = breakout
signals["large_candle"] = large_candle
signals["buy"] = (rs > 100) & (breakout | large_candle)
return signals
Step 4: Run tests
Run: cd /home/zephyrdark/workspace/quant/galaxy-po/backend && python -m pytest tests/unit/test_kjb_signal.py -v
Expected: All PASS
Step 5: Commit
git add backend/app/services/strategy/kjb.py backend/tests/unit/test_kjb_signal.py
git commit -m "feat: add KJBSignalGenerator for daily buy/sell signal detection"
Task 5: KJB Strategy Class
Ranking strategy for compatibility with existing strategy pattern.
Files:
- Modify:
backend/app/services/strategy/kjb.py(add KJBStrategy class) - Modify:
backend/app/services/strategy/__init__.py - Modify:
backend/app/schemas/strategy.py(add KJBRequest) - Modify:
backend/app/api/strategy.py(add endpoint)
Step 1: Add KJBStrategy class to kjb.py
Append to backend/app/services/strategy/kjb.py:
class KJBStrategy(BaseStrategy):
"""
KJB strategy for stock ranking.
Ranks stocks by relative strength, breakout proximity, and volume trend.
Compatible with existing strategy pattern (returns StrategyResult).
"""
strategy_name = "kjb"
def run(
self,
universe_filter: UniverseFilter,
top_n: int,
base_date: date = None,
**kwargs,
) -> StrategyResult:
if base_date is None:
base_date = date.today()
# Get universe - filter to top 30 by market cap
stocks = self.get_universe(universe_filter)
stocks.sort(key=lambda s: s.market_cap or 0, reverse=True)
stocks = stocks[:30]
tickers = [s.ticker for s in stocks]
stock_map = {s.ticker: s for s in stocks}
if not tickers:
return StrategyResult(
strategy_name=self.strategy_name,
base_date=base_date,
universe_count=0,
result_count=0,
stocks=[],
)
# Get valuations and sectors
valuations = self.factor_calc.get_valuations(tickers, base_date)
sectors = self.factor_calc.get_sectors(tickers)
# Calculate momentum as proxy for relative strength ranking
momentum = self.factor_calc.calculate_momentum(
tickers, base_date, months=1, skip_recent=0,
)
# Build results
results = []
for ticker in tickers:
stock = stock_map[ticker]
val = valuations.get(ticker)
mom = momentum.get(ticker, Decimal("0"))
results.append(StockFactor(
ticker=ticker,
name=stock.name,
market=stock.market,
sector_name=sectors.get(ticker),
market_cap=int(stock.market_cap / 100_000_000) if stock.market_cap else None,
close_price=Decimal(str(stock.close_price)) if stock.close_price else None,
per=Decimal(str(val.per)) if val and val.per else None,
pbr=Decimal(str(val.pbr)) if val and val.pbr else None,
dividend_yield=Decimal(str(val.dividend_yield)) if val and val.dividend_yield else None,
momentum_score=mom,
total_score=mom,
))
results.sort(key=lambda x: x.total_score or Decimal("0"), reverse=True)
for i, r in enumerate(results[:top_n], 1):
r.rank = i
return StrategyResult(
strategy_name=self.strategy_name,
base_date=base_date,
universe_count=len(stocks),
result_count=min(top_n, len(results)),
stocks=results[:top_n],
)
Step 2: Register in strategy init.py
Add to backend/app/services/strategy/__init__.py:
from app.services.strategy.kjb import KJBStrategy, KJBSignalGenerator
Add "KJBStrategy", "KJBSignalGenerator" to __all__.
Step 3: Add KJBRequest schema
Add to backend/app/schemas/strategy.py:
class KJBRequest(StrategyRequest):
"""KJB strategy request."""
pass # uses default StrategyRequest params
Step 4: Add API endpoint
Add to backend/app/api/strategy.py:
from app.schemas.strategy import KJBRequest
from app.services.strategy import KJBStrategy
@router.post("/kjb", response_model=StrategyResult)
async def run_kjb(
request: KJBRequest,
current_user: CurrentUser,
db: Session = Depends(get_db),
):
"""Run KJB strategy."""
strategy = KJBStrategy(db)
return strategy.run(
universe_filter=request.universe,
top_n=request.top_n,
base_date=request.base_date,
)
Step 5: Commit
git add backend/app/services/strategy/kjb.py backend/app/services/strategy/__init__.py \
backend/app/schemas/strategy.py backend/app/api/strategy.py
git commit -m "feat: add KJBStrategy ranking class and API endpoint"
Task 6: Daily Backtest Engine
Files:
- Create:
backend/app/services/backtest/daily_engine.py - Modify:
backend/app/services/backtest/__init__.py
Step 1: Implement DailyBacktestEngine
# backend/app/services/backtest/daily_engine.py
"""
Daily simulation backtest engine for signal-based strategies (KJB).
Unlike the rebalance-based BacktestEngine, this engine:
- Checks entry/exit signals every trading day
- Manages individual positions with stop-loss and trailing stops
- Supports partial exits
"""
import logging
from datetime import date
from decimal import Decimal
from typing import Dict, List
import pandas as pd
from sqlalchemy.orm import Session
from sqlalchemy import func
from app.models.backtest import (
Backtest, BacktestResult, BacktestEquityCurve,
BacktestHolding, BacktestTransaction,
)
from app.models.stock import Stock, Price
from app.services.backtest.trading_portfolio import TradingPortfolio, TradingTransaction
from app.services.backtest.metrics import MetricsCalculator
from app.services.strategy.kjb import KJBSignalGenerator
logger = logging.getLogger(__name__)
class DailyBacktestEngine:
"""
Backtest engine for KJB signal-based strategy.
Runs daily simulation with individual position management.
"""
def __init__(self, db: Session):
self.db = db
self.signal_gen = KJBSignalGenerator()
def run(self, backtest_id: int) -> None:
"""Execute daily backtest."""
backtest = self.db.query(Backtest).get(backtest_id)
if not backtest:
raise ValueError(f"Backtest {backtest_id} not found")
params = backtest.strategy_params or {}
# Initialize trading portfolio
portfolio = TradingPortfolio(
initial_capital=backtest.initial_capital,
max_positions=params.get("max_positions", 10),
cash_reserve_ratio=Decimal(str(params.get("cash_reserve_ratio", 0.3))),
stop_loss_pct=Decimal(str(params.get("stop_loss_pct", 0.03))),
target1_pct=Decimal(str(params.get("target1_pct", 0.05))),
target2_pct=Decimal(str(params.get("target2_pct", 0.10))),
)
# Get trading days
trading_days = self._get_trading_days(backtest.start_date, backtest.end_date)
if not trading_days:
raise ValueError("No trading days found")
# Get universe (top 30 by market cap)
universe_tickers = self._get_universe_tickers()
# Load all price data upfront
price_data = self._load_price_data(universe_tickers, backtest.start_date, backtest.end_date)
# Load KOSPI proxy data
kospi_data = self._load_kospi_data(backtest.start_date, backtest.end_date)
# Load benchmark for metrics
benchmark_prices = self._load_benchmark_prices(
backtest.benchmark, backtest.start_date, backtest.end_date,
)
# Prepare per-stock DataFrames for signal generation
stock_dfs = self._build_stock_dfs(price_data, universe_tickers)
# Simulation
equity_curve_data: List[Dict] = []
all_transactions: List[tuple] = []
holdings_snapshots: List[Dict] = []
names = self._get_stock_names()
initial_benchmark = benchmark_prices.get(trading_days[0], Decimal("1"))
if initial_benchmark == 0:
initial_benchmark = Decimal("1")
for trading_date in trading_days:
day_prices = self._get_day_prices(price_data, trading_date)
# 1. Check exits first
exit_txns = portfolio.check_exits(
date=trading_date,
prices=day_prices,
commission_rate=backtest.commission_rate,
slippage_rate=backtest.slippage_rate,
)
for txn in exit_txns:
all_transactions.append((trading_date, txn))
# 2. Check entry signals for stocks not in portfolio
for ticker in universe_tickers:
if ticker in portfolio.positions:
continue
if ticker not in stock_dfs or ticker not in day_prices:
continue
stock_df = stock_dfs[ticker]
if trading_date not in stock_df.index:
continue
# Check if we have enough history
mask = stock_df.index <= trading_date
if mask.sum() < 21: # need at least 21 days for indicators
continue
hist = stock_df.loc[mask]
kospi_hist = kospi_data.loc[kospi_data.index <= trading_date]
if len(kospi_hist) < 11:
continue
signals = self.signal_gen.generate_signals(hist, kospi_hist)
if trading_date in signals.index and signals.loc[trading_date, "buy"]:
txn = portfolio.enter_position(
ticker=ticker,
price=day_prices[ticker],
date=trading_date,
commission_rate=backtest.commission_rate,
slippage_rate=backtest.slippage_rate,
)
if txn:
all_transactions.append((trading_date, txn))
# 3. Record daily portfolio value
portfolio_value = portfolio.get_value(day_prices)
benchmark_value = benchmark_prices.get(trading_date, initial_benchmark)
normalized_benchmark = (
benchmark_value / initial_benchmark * backtest.initial_capital
)
equity_curve_data.append({
"date": trading_date,
"portfolio_value": portfolio_value,
"benchmark_value": normalized_benchmark,
})
# Calculate and save results
portfolio_values = [Decimal(str(e["portfolio_value"])) for e in equity_curve_data]
benchmark_values = [Decimal(str(e["benchmark_value"])) for e in equity_curve_data]
metrics = MetricsCalculator.calculate_all(portfolio_values, benchmark_values)
drawdowns = MetricsCalculator.calculate_drawdown_series(portfolio_values)
self._save_results(
backtest_id=backtest_id,
metrics=metrics,
equity_curve_data=equity_curve_data,
drawdowns=drawdowns,
transactions=all_transactions,
)
def _get_trading_days(self, start_date: date, end_date: date) -> List[date]:
prices = (
self.db.query(Price.date)
.filter(Price.date >= start_date, Price.date <= end_date)
.distinct()
.order_by(Price.date)
.all()
)
return [p[0] for p in prices]
def _get_universe_tickers(self) -> List[str]:
"""Get top 30 stocks by market cap."""
stocks = (
self.db.query(Stock)
.filter(Stock.market_cap.isnot(None))
.order_by(Stock.market_cap.desc())
.limit(30)
.all()
)
return [s.ticker for s in stocks]
def _load_price_data(
self, tickers: List[str], start_date: date, end_date: date,
) -> List:
"""Load all price data for universe."""
return (
self.db.query(Price)
.filter(Price.ticker.in_(tickers))
.filter(Price.date >= start_date, Price.date <= end_date)
.all()
)
def _load_kospi_data(self, start_date: date, end_date: date) -> pd.DataFrame:
"""Load KOSPI proxy (KODEX 200) as DataFrame."""
prices = (
self.db.query(Price)
.filter(Price.ticker == "069500")
.filter(Price.date >= start_date, Price.date <= end_date)
.order_by(Price.date)
.all()
)
if not prices:
return pd.DataFrame(columns=["close"])
data = [{"date": p.date, "close": float(p.close)} for p in prices]
df = pd.DataFrame(data).set_index("date")
return df
def _load_benchmark_prices(
self, benchmark: str, start_date: date, end_date: date,
) -> Dict[date, Decimal]:
benchmark_ticker = "069500"
prices = (
self.db.query(Price)
.filter(Price.ticker == benchmark_ticker)
.filter(Price.date >= start_date, Price.date <= end_date)
.all()
)
return {p.date: p.close for p in prices}
def _build_stock_dfs(
self, price_data: List, tickers: List[str],
) -> Dict[str, pd.DataFrame]:
"""Build per-stock DataFrames from price data."""
ticker_rows: Dict[str, list] = {t: [] for t in tickers}
for p in price_data:
if p.ticker in ticker_rows:
ticker_rows[p.ticker].append({
"date": p.date,
"open": float(p.open),
"high": float(p.high),
"low": float(p.low),
"close": float(p.close),
"volume": int(p.volume),
})
result = {}
for ticker, rows in ticker_rows.items():
if rows:
df = pd.DataFrame(rows).set_index("date").sort_index()
result[ticker] = df
return result
def _get_day_prices(
self, price_data: List, trading_date: date,
) -> Dict[str, Decimal]:
"""Get prices for a specific day from preloaded data."""
return {
p.ticker: p.close
for p in price_data
if p.date == trading_date
}
def _get_stock_names(self) -> Dict[str, str]:
stocks = self.db.query(Stock).all()
return {s.ticker: s.name for s in stocks}
def _save_results(
self,
backtest_id: int,
metrics,
equity_curve_data: List[Dict],
drawdowns: List[Decimal],
transactions: List,
) -> None:
"""Save results to DB (same format as BacktestEngine)."""
result = BacktestResult(
backtest_id=backtest_id,
total_return=metrics.total_return,
cagr=metrics.cagr,
mdd=metrics.mdd,
sharpe_ratio=metrics.sharpe_ratio,
volatility=metrics.volatility,
benchmark_return=metrics.benchmark_return,
excess_return=metrics.excess_return,
)
self.db.add(result)
for i, point in enumerate(equity_curve_data):
curve_point = BacktestEquityCurve(
backtest_id=backtest_id,
date=point["date"],
portfolio_value=point["portfolio_value"],
benchmark_value=point["benchmark_value"],
drawdown=drawdowns[i] if i < len(drawdowns) else Decimal("0"),
)
self.db.add(curve_point)
for trading_date, txn in transactions:
t = BacktestTransaction(
backtest_id=backtest_id,
date=trading_date,
ticker=txn.ticker,
action=txn.action,
shares=txn.shares,
price=txn.price,
commission=txn.commission,
)
self.db.add(t)
self.db.commit()
Step 2: Register in backtest init.py
Add to backend/app/services/backtest/__init__.py:
from app.services.backtest.daily_engine import DailyBacktestEngine
from app.services.backtest.trading_portfolio import TradingPortfolio, TradingTransaction
Add to __all__: "DailyBacktestEngine", "TradingPortfolio", "TradingTransaction".
Step 3: Commit
git add backend/app/services/backtest/daily_engine.py backend/app/services/backtest/__init__.py
git commit -m "feat: add DailyBacktestEngine for KJB signal-based backtesting"
Task 7: Wire KJB into Backtest System
Connect KJB strategy to the existing backtest worker so it can be triggered via the backtest API.
Files:
- Modify:
backend/app/services/backtest/worker.py - Modify:
backend/app/schemas/backtest.py
Step 1: Update worker to route KJB to DailyBacktestEngine
In backend/app/services/backtest/worker.py, modify _run_backtest_job:
Replace the engine instantiation section. After engine = BacktestEngine(db) / engine.run(backtest_id), change to:
# In _run_backtest_job, replace:
# engine = BacktestEngine(db)
# engine.run(backtest_id)
# With:
backtest = db.query(Backtest).get(backtest_id)
if backtest.strategy_type == "kjb":
from app.services.backtest.daily_engine import DailyBacktestEngine
engine = DailyBacktestEngine(db)
else:
engine = BacktestEngine(db)
engine.run(backtest_id)
Step 2: Update BacktestCreate schema
In backend/app/schemas/backtest.py, update BacktestCreate.strategy_type description:
strategy_type: str = Field(..., description="multi_factor, quality, value_momentum, or kjb")
Step 3: Commit
git add backend/app/services/backtest/worker.py backend/app/schemas/backtest.py
git commit -m "feat: wire KJB strategy into backtest worker with DailyBacktestEngine"
Task 8: Signal API Endpoints
Files:
- Create:
backend/app/api/signal.py - Modify:
backend/app/api/__init__.py
Step 1: Create signal API
# backend/app/api/signal.py
"""
KJB Signal API endpoints.
"""
from datetime import date
from typing import List, Optional
from fastapi import APIRouter, Depends, Query
from sqlalchemy.orm import Session
from app.core.database import get_db
from app.api.deps import CurrentUser
from app.models.signal import Signal, SignalStatus
from app.schemas.signal import SignalResponse
router = APIRouter(prefix="/api/signal", tags=["signal"])
@router.get("/kjb/today", response_model=List[SignalResponse])
async def get_today_signals(
current_user: CurrentUser,
db: Session = Depends(get_db),
):
"""Get today's KJB trading signals."""
today = date.today()
signals = (
db.query(Signal)
.filter(Signal.date == today)
.order_by(Signal.signal_type, Signal.ticker)
.all()
)
return signals
@router.get("/kjb/history", response_model=List[SignalResponse])
async def get_signal_history(
current_user: CurrentUser,
db: Session = Depends(get_db),
start_date: Optional[date] = Query(None),
end_date: Optional[date] = Query(None),
ticker: Optional[str] = Query(None),
limit: int = Query(100, ge=1, le=1000),
):
"""Get historical KJB signals."""
query = db.query(Signal)
if start_date:
query = query.filter(Signal.date >= start_date)
if end_date:
query = query.filter(Signal.date <= end_date)
if ticker:
query = query.filter(Signal.ticker == ticker)
signals = (
query.order_by(Signal.date.desc(), Signal.ticker)
.limit(limit)
.all()
)
return signals
Step 2: Register router in api/init.py
Add to backend/app/api/__init__.py:
from app.api.signal import router as signal_router
Add "signal_router" to __all__.
Step 3: Register in main app
Check backend/app/main.py for how routers are included and add signal_router following the same pattern.
Step 4: Commit
git add backend/app/api/signal.py backend/app/api/__init__.py backend/app/main.py
git commit -m "feat: add Signal API endpoints for KJB daily signals"
Task 9: KJB Signal Scheduler Job
Files:
- Create:
backend/jobs/kjb_signal_job.py - Modify:
backend/jobs/scheduler.py - Modify:
backend/jobs/__init__.py
Step 1: Create signal job
# backend/jobs/kjb_signal_job.py
"""
Daily KJB signal generation job.
Runs after market data collection to generate buy/sell signals.
"""
import logging
from datetime import date, timedelta
import pandas as pd
from sqlalchemy.orm import Session
from app.core.database import SessionLocal
from app.models.stock import Stock, Price
from app.models.signal import Signal, SignalType, SignalStatus
from app.services.strategy.kjb import KJBSignalGenerator
logger = logging.getLogger(__name__)
def run_kjb_signals():
"""
Generate KJB trading signals for today.
Called by scheduler at 18:15 KST (after price collection).
"""
logger.info("Starting KJB signal generation")
db: Session = SessionLocal()
try:
today = date.today()
signal_gen = KJBSignalGenerator()
# Get universe: top 30 by market cap
stocks = (
db.query(Stock)
.filter(Stock.market_cap.isnot(None))
.order_by(Stock.market_cap.desc())
.limit(30)
.all()
)
tickers = [s.ticker for s in stocks]
name_map = {s.ticker: s.name for s in stocks}
# Load KOSPI proxy data (last 60 days for indicator calculation)
lookback_start = today - timedelta(days=90)
kospi_prices = (
db.query(Price)
.filter(Price.ticker == "069500")
.filter(Price.date >= lookback_start, Price.date <= today)
.order_by(Price.date)
.all()
)
if not kospi_prices:
logger.warning("No KOSPI data available for signal generation")
return
kospi_df = pd.DataFrame([
{"date": p.date, "close": float(p.close)}
for p in kospi_prices
]).set_index("date")
signals_created = 0
for ticker in tickers:
# Load stock price data
stock_prices = (
db.query(Price)
.filter(Price.ticker == ticker)
.filter(Price.date >= lookback_start, Price.date <= today)
.order_by(Price.date)
.all()
)
if len(stock_prices) < 21:
continue
stock_df = pd.DataFrame([{
"date": p.date,
"open": float(p.open),
"high": float(p.high),
"low": float(p.low),
"close": float(p.close),
"volume": int(p.volume),
} for p in stock_prices]).set_index("date")
# Generate signals
signals = signal_gen.generate_signals(stock_df, kospi_df)
if today in signals.index and signals.loc[today, "buy"]:
close_price = stock_df.loc[today, "close"]
reason_parts = []
if signals.loc[today, "breakout"]:
reason_parts.append("breakout")
if signals.loc[today, "large_candle"]:
reason_parts.append("large_candle")
signal = Signal(
date=today,
ticker=ticker,
name=name_map.get(ticker),
signal_type=SignalType.BUY,
entry_price=close_price,
target_price=round(close_price * 1.05, 2),
stop_loss_price=round(close_price * 0.97, 2),
reason=", ".join(reason_parts),
status=SignalStatus.ACTIVE,
)
db.add(signal)
signals_created += 1
db.commit()
logger.info(f"KJB signal generation complete: {signals_created} buy signals")
except Exception as e:
logger.exception(f"KJB signal generation failed: {e}")
finally:
db.close()
Step 2: Register in scheduler
Add to backend/jobs/scheduler.py after existing job configurations:
from jobs.kjb_signal_job import run_kjb_signals
# In configure_jobs(), add:
scheduler.add_job(
run_kjb_signals,
trigger=CronTrigger(
hour=18,
minute=15,
day_of_week='mon-fri',
timezone=KST,
),
id='kjb_daily_signals',
name='Generate KJB trading signals',
replace_existing=True,
)
logger.info("Configured kjb_daily_signals job at 18:15 KST")
Step 3: Update jobs/init.py
Add to backend/jobs/__init__.py:
from jobs.kjb_signal_job import run_kjb_signals
Add "run_kjb_signals" to __all__.
Step 4: Commit
git add backend/jobs/kjb_signal_job.py backend/jobs/scheduler.py backend/jobs/__init__.py
git commit -m "feat: add KJB daily signal generation scheduler job"
Task 10: E2E Tests
Files:
- Create:
backend/tests/e2e/test_kjb_flow.py
Step 1: Write E2E tests
# backend/tests/e2e/test_kjb_flow.py
"""
E2E tests for KJB strategy flow.
"""
from fastapi.testclient import TestClient
def test_kjb_strategy_endpoint(client: TestClient, auth_headers):
"""Test KJB strategy ranking endpoint."""
response = client.post(
"/api/strategy/kjb",
json={
"universe": {"markets": ["KOSPI"]},
"top_n": 10,
},
headers=auth_headers,
)
assert response.status_code in [200, 400, 500]
if response.status_code == 200:
data = response.json()
assert data["strategy_name"] == "kjb"
assert "stocks" in data
def test_kjb_backtest_creation(client: TestClient, auth_headers):
"""Test creating a KJB backtest."""
response = client.post(
"/api/backtest",
json={
"strategy_type": "kjb",
"strategy_params": {
"max_positions": 10,
"cash_reserve_ratio": 0.3,
"stop_loss_pct": 0.03,
"target1_pct": 0.05,
"target2_pct": 0.10,
},
"start_date": "2023-01-01",
"end_date": "2023-12-31",
"initial_capital": 10000000,
"commission_rate": 0.00015,
"slippage_rate": 0.001,
"benchmark": "KOSPI",
"top_n": 30,
},
headers=auth_headers,
)
assert response.status_code == 200
data = response.json()
assert "id" in data
assert data["status"] == "pending"
def test_signal_today_endpoint(client: TestClient, auth_headers):
"""Test today's signals endpoint."""
response = client.get("/api/signal/kjb/today", headers=auth_headers)
assert response.status_code == 200
assert isinstance(response.json(), list)
def test_signal_history_endpoint(client: TestClient, auth_headers):
"""Test signal history endpoint."""
response = client.get(
"/api/signal/kjb/history",
params={"limit": 10},
headers=auth_headers,
)
assert response.status_code == 200
assert isinstance(response.json(), list)
def test_signal_requires_auth(client: TestClient):
"""Test that signal endpoints require authentication."""
response = client.get("/api/signal/kjb/today")
assert response.status_code == 401
Step 2: Run tests
Run: cd /home/zephyrdark/workspace/quant/galaxy-po/backend && python -m pytest tests/e2e/test_kjb_flow.py -v
Expected: All PASS
Step 3: Commit
git add backend/tests/e2e/test_kjb_flow.py
git commit -m "test: add E2E tests for KJB strategy, backtest, and signal endpoints"
Task 11: Frontend - KJB Signal Dashboard
Files:
- Create:
frontend/src/app/signals/page.tsx
Step 1: Explore existing frontend page patterns
Read an existing page (e.g., frontend/src/app/backtest/page.tsx) to understand the component patterns, API client usage, and layout structure used in this project.
Step 2: Create signals page
Build the KJB signals dashboard page following the existing frontend patterns:
- Fetch today's signals from
GET /api/signal/kjb/today - Display buy signals as cards showing ticker, name, entry price, target, stop-loss
- Add a history table fetching from
GET /api/signal/kjb/history - Use the same layout, styling, and API client patterns as existing pages
Step 3: Add navigation link
Add "Signals" or "KJB 신호" to the navigation/sidebar following the existing navigation pattern.
Step 4: Commit
git add frontend/src/app/signals/
git commit -m "feat: add KJB signal dashboard frontend page"
Task 12: Final Integration & Smoke Test
Step 1: Run all tests
Run: cd /home/zephyrdark/workspace/quant/galaxy-po/backend && python -m pytest tests/ -v
Expected: All PASS
Step 2: Start the application and verify
Run: cd /home/zephyrdark/workspace/quant/galaxy-po && docker compose up -d
Verify:
POST /api/strategy/kjbreturns ranked stocksPOST /api/backtestwithstrategy_type: "kjb"creates and starts a backtestGET /api/signal/kjb/todayreturns (possibly empty) signal listGET /api/signal/kjb/historyreturns (possibly empty) signal list- Frontend signals page loads correctly
Step 3: Final commit
git add -A
git commit -m "feat: complete KJB strategy full system implementation"