1769 lines
54 KiB
Markdown
1769 lines
54 KiB
Markdown
|
|
# Kim Jong-bong (KJB) Strategy Implementation Plan
|
||
|
|
|
||
|
|
> **For Claude:** REQUIRED SUB-SKILL: Use superpowers:executing-plans to implement this plan task-by-task.
|
||
|
|
|
||
|
|
**Goal:** Implement the Kim Jong-bong short-term trading strategy as a full system: signal generation, daily backtesting, live signal alerts, and frontend dashboard.
|
||
|
|
|
||
|
|
**Architecture:** New strategy class (`KJBStrategy`) for ranking + separate `KJBSignalGenerator` for daily buy/sell signals. New `DailyBacktestEngine` runs daily simulations (not rebalance-based). New `TradingPortfolio` handles individual position management with stop-loss/trailing-stop. Signal model stores daily signals for API/frontend consumption.
|
||
|
|
|
||
|
|
**Tech Stack:** Python, FastAPI, SQLAlchemy, PostgreSQL, Alembic, Next.js/React, APScheduler
|
||
|
|
|
||
|
|
---
|
||
|
|
|
||
|
|
### Task 1: Signal DB Model
|
||
|
|
|
||
|
|
**Files:**
|
||
|
|
- Create: `backend/app/models/signal.py`
|
||
|
|
- Modify: `backend/app/models/__init__.py`
|
||
|
|
|
||
|
|
**Step 1: Create Signal model**
|
||
|
|
|
||
|
|
```python
|
||
|
|
# backend/app/models/signal.py
|
||
|
|
"""
|
||
|
|
Trading signal models.
|
||
|
|
"""
|
||
|
|
import enum
|
||
|
|
from datetime import datetime
|
||
|
|
|
||
|
|
from sqlalchemy import (
|
||
|
|
Column, Integer, String, Numeric, DateTime, Date,
|
||
|
|
Text, Enum as SQLEnum,
|
||
|
|
)
|
||
|
|
|
||
|
|
from app.core.database import Base
|
||
|
|
|
||
|
|
|
||
|
|
class SignalType(str, enum.Enum):
|
||
|
|
BUY = "buy"
|
||
|
|
SELL = "sell"
|
||
|
|
PARTIAL_SELL = "partial_sell"
|
||
|
|
|
||
|
|
|
||
|
|
class SignalStatus(str, enum.Enum):
|
||
|
|
ACTIVE = "active"
|
||
|
|
EXECUTED = "executed"
|
||
|
|
EXPIRED = "expired"
|
||
|
|
|
||
|
|
|
||
|
|
class Signal(Base):
|
||
|
|
__tablename__ = "signals"
|
||
|
|
|
||
|
|
id = Column(Integer, primary_key=True, index=True)
|
||
|
|
date = Column(Date, nullable=False, index=True)
|
||
|
|
ticker = Column(String(20), nullable=False, index=True)
|
||
|
|
name = Column(String(100))
|
||
|
|
signal_type = Column(SQLEnum(SignalType), nullable=False)
|
||
|
|
entry_price = Column(Numeric(12, 2))
|
||
|
|
target_price = Column(Numeric(12, 2))
|
||
|
|
stop_loss_price = Column(Numeric(12, 2))
|
||
|
|
reason = Column(String(200))
|
||
|
|
status = Column(SQLEnum(SignalStatus), default=SignalStatus.ACTIVE)
|
||
|
|
created_at = Column(DateTime, default=datetime.utcnow)
|
||
|
|
```
|
||
|
|
|
||
|
|
**Step 2: Register in models/__init__.py**
|
||
|
|
|
||
|
|
Add to `backend/app/models/__init__.py`:
|
||
|
|
```python
|
||
|
|
from app.models.signal import Signal, SignalType, SignalStatus
|
||
|
|
```
|
||
|
|
And add `"Signal", "SignalType", "SignalStatus"` to `__all__`.
|
||
|
|
|
||
|
|
**Step 3: Create Alembic migration**
|
||
|
|
|
||
|
|
Run: `cd /home/zephyrdark/workspace/quant/galaxy-po/backend && alembic revision --autogenerate -m "add signals table"`
|
||
|
|
|
||
|
|
**Step 4: Apply migration**
|
||
|
|
|
||
|
|
Run: `cd /home/zephyrdark/workspace/quant/galaxy-po/backend && alembic upgrade head`
|
||
|
|
|
||
|
|
**Step 5: Commit**
|
||
|
|
|
||
|
|
```bash
|
||
|
|
git add backend/app/models/signal.py backend/app/models/__init__.py backend/alembic/versions/
|
||
|
|
git commit -m "feat: add Signal model for KJB trading signals"
|
||
|
|
```
|
||
|
|
|
||
|
|
---
|
||
|
|
|
||
|
|
### Task 2: Signal Schemas
|
||
|
|
|
||
|
|
**Files:**
|
||
|
|
- Create: `backend/app/schemas/signal.py`
|
||
|
|
|
||
|
|
**Step 1: Create signal schemas**
|
||
|
|
|
||
|
|
```python
|
||
|
|
# backend/app/schemas/signal.py
|
||
|
|
"""
|
||
|
|
Signal related Pydantic schemas.
|
||
|
|
"""
|
||
|
|
from datetime import date, datetime
|
||
|
|
from decimal import Decimal
|
||
|
|
from typing import Optional, List
|
||
|
|
from enum import Enum
|
||
|
|
|
||
|
|
from pydantic import BaseModel
|
||
|
|
|
||
|
|
from app.schemas.portfolio import FloatDecimal
|
||
|
|
|
||
|
|
|
||
|
|
class SignalType(str, Enum):
|
||
|
|
BUY = "buy"
|
||
|
|
SELL = "sell"
|
||
|
|
PARTIAL_SELL = "partial_sell"
|
||
|
|
|
||
|
|
|
||
|
|
class SignalStatus(str, Enum):
|
||
|
|
ACTIVE = "active"
|
||
|
|
EXECUTED = "executed"
|
||
|
|
EXPIRED = "expired"
|
||
|
|
|
||
|
|
|
||
|
|
class SignalResponse(BaseModel):
|
||
|
|
"""Single signal response."""
|
||
|
|
id: int
|
||
|
|
date: date
|
||
|
|
ticker: str
|
||
|
|
name: Optional[str] = None
|
||
|
|
signal_type: str
|
||
|
|
entry_price: Optional[FloatDecimal] = None
|
||
|
|
target_price: Optional[FloatDecimal] = None
|
||
|
|
stop_loss_price: Optional[FloatDecimal] = None
|
||
|
|
reason: Optional[str] = None
|
||
|
|
status: str
|
||
|
|
created_at: datetime
|
||
|
|
|
||
|
|
class Config:
|
||
|
|
from_attributes = True
|
||
|
|
|
||
|
|
|
||
|
|
class ActivePosition(BaseModel):
|
||
|
|
"""Active trading position with P&L."""
|
||
|
|
ticker: str
|
||
|
|
name: Optional[str] = None
|
||
|
|
entry_date: date
|
||
|
|
entry_price: FloatDecimal
|
||
|
|
current_price: FloatDecimal
|
||
|
|
shares: int
|
||
|
|
stop_loss_price: FloatDecimal
|
||
|
|
target_price: FloatDecimal
|
||
|
|
pnl_percent: FloatDecimal
|
||
|
|
pnl_amount: FloatDecimal
|
||
|
|
```
|
||
|
|
|
||
|
|
**Step 2: Commit**
|
||
|
|
|
||
|
|
```bash
|
||
|
|
git add backend/app/schemas/signal.py
|
||
|
|
git commit -m "feat: add Signal Pydantic schemas"
|
||
|
|
```
|
||
|
|
|
||
|
|
---
|
||
|
|
|
||
|
|
### Task 3: TradingPortfolio
|
||
|
|
|
||
|
|
This is the core position management class for KJB strategy. Separate from `VirtualPortfolio`.
|
||
|
|
|
||
|
|
**Files:**
|
||
|
|
- Create: `backend/app/services/backtest/trading_portfolio.py`
|
||
|
|
- Create: `backend/tests/unit/test_trading_portfolio.py`
|
||
|
|
|
||
|
|
**Step 1: Write failing tests**
|
||
|
|
|
||
|
|
```python
|
||
|
|
# backend/tests/unit/test_trading_portfolio.py
|
||
|
|
"""
|
||
|
|
Unit tests for TradingPortfolio.
|
||
|
|
"""
|
||
|
|
from decimal import Decimal
|
||
|
|
from datetime import date
|
||
|
|
|
||
|
|
from app.services.backtest.trading_portfolio import TradingPortfolio
|
||
|
|
|
||
|
|
|
||
|
|
def test_initial_state():
|
||
|
|
tp = TradingPortfolio(Decimal("10000000"))
|
||
|
|
assert tp.cash == Decimal("10000000")
|
||
|
|
assert tp.investable_capital == Decimal("7000000") # 70%
|
||
|
|
assert len(tp.positions) == 0
|
||
|
|
|
||
|
|
|
||
|
|
def test_enter_position():
|
||
|
|
tp = TradingPortfolio(Decimal("10000000"))
|
||
|
|
txn = tp.enter_position(
|
||
|
|
ticker="005930",
|
||
|
|
price=Decimal("70000"),
|
||
|
|
date=date(2024, 1, 2),
|
||
|
|
commission_rate=Decimal("0.00015"),
|
||
|
|
slippage_rate=Decimal("0.001"),
|
||
|
|
)
|
||
|
|
assert txn is not None
|
||
|
|
assert txn.action == "buy"
|
||
|
|
assert "005930" in tp.positions
|
||
|
|
pos = tp.positions["005930"]
|
||
|
|
assert pos.entry_price == Decimal("70000")
|
||
|
|
assert pos.stop_loss == Decimal("67900") # -3%
|
||
|
|
assert pos.target1 == Decimal("73500") # +5%
|
||
|
|
assert pos.target2 == Decimal("77000") # +10%
|
||
|
|
|
||
|
|
|
||
|
|
def test_max_positions():
|
||
|
|
tp = TradingPortfolio(Decimal("10000000"), max_positions=2)
|
||
|
|
tp.enter_position("A", Decimal("1000"), date(2024, 1, 2), Decimal("0"), Decimal("0"))
|
||
|
|
tp.enter_position("B", Decimal("1000"), date(2024, 1, 2), Decimal("0"), Decimal("0"))
|
||
|
|
txn = tp.enter_position("C", Decimal("1000"), date(2024, 1, 2), Decimal("0"), Decimal("0"))
|
||
|
|
assert txn is None # rejected
|
||
|
|
assert len(tp.positions) == 2
|
||
|
|
|
||
|
|
|
||
|
|
def test_stop_loss_exit():
|
||
|
|
tp = TradingPortfolio(Decimal("10000000"))
|
||
|
|
tp.enter_position("005930", Decimal("70000"), date(2024, 1, 2), Decimal("0"), Decimal("0"))
|
||
|
|
exits = tp.check_exits(
|
||
|
|
date=date(2024, 1, 3),
|
||
|
|
prices={"005930": Decimal("67000")}, # below -3%
|
||
|
|
commission_rate=Decimal("0"),
|
||
|
|
slippage_rate=Decimal("0"),
|
||
|
|
)
|
||
|
|
assert len(exits) == 1
|
||
|
|
assert exits[0].action == "sell"
|
||
|
|
assert "005930" not in tp.positions
|
||
|
|
|
||
|
|
|
||
|
|
def test_partial_take_profit():
|
||
|
|
tp = TradingPortfolio(Decimal("10000000"))
|
||
|
|
tp.enter_position("005930", Decimal("70000"), date(2024, 1, 2), Decimal("0"), Decimal("0"))
|
||
|
|
pos = tp.positions["005930"]
|
||
|
|
initial_shares = pos.shares
|
||
|
|
|
||
|
|
exits = tp.check_exits(
|
||
|
|
date=date(2024, 1, 10),
|
||
|
|
prices={"005930": Decimal("73500")}, # exactly +5%
|
||
|
|
commission_rate=Decimal("0"),
|
||
|
|
slippage_rate=Decimal("0"),
|
||
|
|
)
|
||
|
|
assert len(exits) == 1
|
||
|
|
assert exits[0].action == "partial_sell"
|
||
|
|
# Should sell 50% of shares
|
||
|
|
assert exits[0].shares == initial_shares // 2
|
||
|
|
# Position should still exist with remaining shares
|
||
|
|
assert "005930" in tp.positions
|
||
|
|
# Stop loss should be updated to entry price (trailing)
|
||
|
|
assert tp.positions["005930"].stop_loss == Decimal("70000")
|
||
|
|
|
||
|
|
|
||
|
|
def test_full_take_profit():
|
||
|
|
tp = TradingPortfolio(Decimal("10000000"))
|
||
|
|
tp.enter_position("005930", Decimal("70000"), date(2024, 1, 2), Decimal("0"), Decimal("0"))
|
||
|
|
# First trigger partial at +5%
|
||
|
|
tp.check_exits(
|
||
|
|
date=date(2024, 1, 10),
|
||
|
|
prices={"005930": Decimal("73500")},
|
||
|
|
commission_rate=Decimal("0"),
|
||
|
|
slippage_rate=Decimal("0"),
|
||
|
|
)
|
||
|
|
# Then trigger full exit at +10%
|
||
|
|
exits = tp.check_exits(
|
||
|
|
date=date(2024, 1, 15),
|
||
|
|
prices={"005930": Decimal("77000")},
|
||
|
|
commission_rate=Decimal("0"),
|
||
|
|
slippage_rate=Decimal("0"),
|
||
|
|
)
|
||
|
|
assert len(exits) == 1
|
||
|
|
assert exits[0].action == "sell"
|
||
|
|
assert "005930" not in tp.positions
|
||
|
|
|
||
|
|
|
||
|
|
def test_trailing_stop_after_10pct():
|
||
|
|
tp = TradingPortfolio(Decimal("10000000"))
|
||
|
|
tp.enter_position("005930", Decimal("70000"), date(2024, 1, 2), Decimal("0"), Decimal("0"))
|
||
|
|
# Trigger partial at +5%
|
||
|
|
tp.check_exits(
|
||
|
|
date=date(2024, 1, 10),
|
||
|
|
prices={"005930": Decimal("73500")},
|
||
|
|
commission_rate=Decimal("0"),
|
||
|
|
slippage_rate=Decimal("0"),
|
||
|
|
)
|
||
|
|
# Price goes to +10% but doesn't trigger target2 (already partially sold)
|
||
|
|
# Actually +10% should trigger full exit of remaining
|
||
|
|
# Let's test trailing stop: price goes up to +8%, trailing stop at entry
|
||
|
|
tp.check_exits(
|
||
|
|
date=date(2024, 1, 12),
|
||
|
|
prices={"005930": Decimal("75600")}, # +8%
|
||
|
|
commission_rate=Decimal("0"),
|
||
|
|
slippage_rate=Decimal("0"),
|
||
|
|
)
|
||
|
|
# Stop should still be at entry (70000) since we're between 5-10%
|
||
|
|
assert tp.positions["005930"].stop_loss == Decimal("70000")
|
||
|
|
|
||
|
|
|
||
|
|
def test_cash_reserve():
|
||
|
|
tp = TradingPortfolio(Decimal("10000000"), cash_reserve_ratio=Decimal("0.3"))
|
||
|
|
# investable = 7,000,000
|
||
|
|
# position_size = 7,000,000 / 10 = 700,000
|
||
|
|
# Total cash should never go below 3,000,000 (30%)
|
||
|
|
assert tp.investable_capital == Decimal("7000000")
|
||
|
|
|
||
|
|
|
||
|
|
def test_get_portfolio_value():
|
||
|
|
tp = TradingPortfolio(Decimal("10000000"))
|
||
|
|
tp.enter_position("005930", Decimal("70000"), date(2024, 1, 2), Decimal("0"), Decimal("0"))
|
||
|
|
value = tp.get_value({"005930": Decimal("75000")})
|
||
|
|
assert value > Decimal("10000000") # should be worth more at higher price
|
||
|
|
```
|
||
|
|
|
||
|
|
**Step 2: Run tests to verify they fail**
|
||
|
|
|
||
|
|
Run: `cd /home/zephyrdark/workspace/quant/galaxy-po/backend && python -m pytest tests/unit/test_trading_portfolio.py -v`
|
||
|
|
Expected: FAIL (module not found)
|
||
|
|
|
||
|
|
**Step 3: Implement TradingPortfolio**
|
||
|
|
|
||
|
|
```python
|
||
|
|
# backend/app/services/backtest/trading_portfolio.py
|
||
|
|
"""
|
||
|
|
Trading portfolio for signal-based strategies (KJB).
|
||
|
|
Supports individual position management with stop-loss and trailing stops.
|
||
|
|
"""
|
||
|
|
from decimal import Decimal, ROUND_DOWN
|
||
|
|
from typing import Dict, List, Optional
|
||
|
|
from dataclasses import dataclass, field
|
||
|
|
from datetime import date
|
||
|
|
|
||
|
|
|
||
|
|
@dataclass
|
||
|
|
class Position:
|
||
|
|
"""An active trading position."""
|
||
|
|
ticker: str
|
||
|
|
shares: int
|
||
|
|
entry_price: Decimal
|
||
|
|
entry_date: date
|
||
|
|
stop_loss: Decimal
|
||
|
|
target1: Decimal # +5% partial sell
|
||
|
|
target2: Decimal # +10% full sell
|
||
|
|
partial_sold: bool = False # whether 50% has been sold
|
||
|
|
|
||
|
|
|
||
|
|
@dataclass
|
||
|
|
class TradingTransaction:
|
||
|
|
"""A single trading transaction."""
|
||
|
|
ticker: str
|
||
|
|
action: str # 'buy', 'sell', 'partial_sell'
|
||
|
|
shares: int
|
||
|
|
price: Decimal
|
||
|
|
commission: Decimal
|
||
|
|
reason: str = ""
|
||
|
|
|
||
|
|
|
||
|
|
class TradingPortfolio:
|
||
|
|
"""
|
||
|
|
Portfolio for signal-based daily trading.
|
||
|
|
|
||
|
|
Key differences from VirtualPortfolio:
|
||
|
|
- Individual position entry/exit (not rebalancing)
|
||
|
|
- Per-position stop-loss, take-profit, trailing stop
|
||
|
|
- Cash reserve enforcement (30%)
|
||
|
|
- Partial position exits (50% at +5%)
|
||
|
|
"""
|
||
|
|
|
||
|
|
def __init__(
|
||
|
|
self,
|
||
|
|
initial_capital: Decimal,
|
||
|
|
max_positions: int = 10,
|
||
|
|
cash_reserve_ratio: Decimal = Decimal("0.3"),
|
||
|
|
stop_loss_pct: Decimal = Decimal("0.03"),
|
||
|
|
target1_pct: Decimal = Decimal("0.05"),
|
||
|
|
target2_pct: Decimal = Decimal("0.10"),
|
||
|
|
):
|
||
|
|
self.initial_capital = initial_capital
|
||
|
|
self.cash = initial_capital
|
||
|
|
self.max_positions = max_positions
|
||
|
|
self.cash_reserve_ratio = cash_reserve_ratio
|
||
|
|
self.stop_loss_pct = stop_loss_pct
|
||
|
|
self.target1_pct = target1_pct
|
||
|
|
self.target2_pct = target2_pct
|
||
|
|
self.positions: Dict[str, Position] = {}
|
||
|
|
|
||
|
|
@property
|
||
|
|
def investable_capital(self) -> Decimal:
|
||
|
|
"""Capital available for investment (excluding cash reserve)."""
|
||
|
|
return self.initial_capital * (1 - self.cash_reserve_ratio)
|
||
|
|
|
||
|
|
@property
|
||
|
|
def position_size(self) -> Decimal:
|
||
|
|
"""Target size per position."""
|
||
|
|
return self.investable_capital / Decimal(str(self.max_positions))
|
||
|
|
|
||
|
|
def get_value(self, prices: Dict[str, Decimal]) -> Decimal:
|
||
|
|
"""Calculate total portfolio value."""
|
||
|
|
holdings_value = sum(
|
||
|
|
Decimal(str(pos.shares)) * prices.get(pos.ticker, Decimal("0"))
|
||
|
|
for pos in self.positions.values()
|
||
|
|
)
|
||
|
|
return self.cash + holdings_value
|
||
|
|
|
||
|
|
def enter_position(
|
||
|
|
self,
|
||
|
|
ticker: str,
|
||
|
|
price: Decimal,
|
||
|
|
date: date,
|
||
|
|
commission_rate: Decimal,
|
||
|
|
slippage_rate: Decimal,
|
||
|
|
) -> Optional[TradingTransaction]:
|
||
|
|
"""Enter a new position."""
|
||
|
|
# Check limits
|
||
|
|
if len(self.positions) >= self.max_positions:
|
||
|
|
return None
|
||
|
|
if ticker in self.positions:
|
||
|
|
return None
|
||
|
|
|
||
|
|
# Calculate shares
|
||
|
|
buy_price = price * (1 + slippage_rate)
|
||
|
|
max_cost = self.position_size
|
||
|
|
available = self.cash - (self.initial_capital * self.cash_reserve_ratio)
|
||
|
|
if available <= 0:
|
||
|
|
return None
|
||
|
|
actual_cost = min(max_cost, available)
|
||
|
|
shares = int((actual_cost / (buy_price * (1 + commission_rate))).to_integral_value(rounding=ROUND_DOWN))
|
||
|
|
|
||
|
|
if shares <= 0:
|
||
|
|
return None
|
||
|
|
|
||
|
|
cost = Decimal(str(shares)) * buy_price
|
||
|
|
commission = cost * commission_rate
|
||
|
|
total_cost = cost + commission
|
||
|
|
|
||
|
|
self.cash -= total_cost
|
||
|
|
|
||
|
|
self.positions[ticker] = Position(
|
||
|
|
ticker=ticker,
|
||
|
|
shares=shares,
|
||
|
|
entry_price=price,
|
||
|
|
entry_date=date,
|
||
|
|
stop_loss=price * (1 - self.stop_loss_pct),
|
||
|
|
target1=price * (1 + self.target1_pct),
|
||
|
|
target2=price * (1 + self.target2_pct),
|
||
|
|
)
|
||
|
|
|
||
|
|
return TradingTransaction(
|
||
|
|
ticker=ticker,
|
||
|
|
action="buy",
|
||
|
|
shares=shares,
|
||
|
|
price=buy_price,
|
||
|
|
commission=commission,
|
||
|
|
reason="entry_signal",
|
||
|
|
)
|
||
|
|
|
||
|
|
def check_exits(
|
||
|
|
self,
|
||
|
|
date: date,
|
||
|
|
prices: Dict[str, Decimal],
|
||
|
|
commission_rate: Decimal,
|
||
|
|
slippage_rate: Decimal,
|
||
|
|
) -> List[TradingTransaction]:
|
||
|
|
"""Check all positions for exit conditions. Returns transactions."""
|
||
|
|
transactions = []
|
||
|
|
|
||
|
|
for ticker in list(self.positions.keys()):
|
||
|
|
pos = self.positions[ticker]
|
||
|
|
current_price = prices.get(ticker)
|
||
|
|
if current_price is None:
|
||
|
|
continue
|
||
|
|
|
||
|
|
sell_price = current_price * (1 - slippage_rate)
|
||
|
|
|
||
|
|
# 1. Stop-loss check
|
||
|
|
if current_price <= pos.stop_loss:
|
||
|
|
txn = self._exit_full(ticker, sell_price, commission_rate, "stop_loss")
|
||
|
|
if txn:
|
||
|
|
transactions.append(txn)
|
||
|
|
continue
|
||
|
|
|
||
|
|
# 2. Take-profit 2: +10% (full exit of remaining)
|
||
|
|
if current_price >= pos.target2 and pos.partial_sold:
|
||
|
|
txn = self._exit_full(ticker, sell_price, commission_rate, "take_profit_2")
|
||
|
|
if txn:
|
||
|
|
transactions.append(txn)
|
||
|
|
continue
|
||
|
|
|
||
|
|
# 3. Take-profit 1: +5% (partial exit 50%)
|
||
|
|
if current_price >= pos.target1 and not pos.partial_sold:
|
||
|
|
txn = self._exit_partial(ticker, sell_price, commission_rate)
|
||
|
|
if txn:
|
||
|
|
transactions.append(txn)
|
||
|
|
# Update trailing stop to entry price
|
||
|
|
pos.stop_loss = pos.entry_price
|
||
|
|
continue
|
||
|
|
|
||
|
|
# 4. Trailing stop update (between 5-10%: stop at entry, above 10%: stop at +5%)
|
||
|
|
if pos.partial_sold:
|
||
|
|
gain_pct = (current_price - pos.entry_price) / pos.entry_price
|
||
|
|
if gain_pct >= self.target2_pct:
|
||
|
|
new_stop = pos.entry_price * (1 + self.target1_pct)
|
||
|
|
if new_stop > pos.stop_loss:
|
||
|
|
pos.stop_loss = new_stop
|
||
|
|
|
||
|
|
return transactions
|
||
|
|
|
||
|
|
def _exit_full(
|
||
|
|
self,
|
||
|
|
ticker: str,
|
||
|
|
sell_price: Decimal,
|
||
|
|
commission_rate: Decimal,
|
||
|
|
reason: str,
|
||
|
|
) -> Optional[TradingTransaction]:
|
||
|
|
"""Exit a position fully."""
|
||
|
|
pos = self.positions.get(ticker)
|
||
|
|
if not pos or pos.shares <= 0:
|
||
|
|
return None
|
||
|
|
|
||
|
|
proceeds = Decimal(str(pos.shares)) * sell_price
|
||
|
|
commission = proceeds * commission_rate
|
||
|
|
self.cash += proceeds - commission
|
||
|
|
|
||
|
|
shares = pos.shares
|
||
|
|
del self.positions[ticker]
|
||
|
|
|
||
|
|
return TradingTransaction(
|
||
|
|
ticker=ticker,
|
||
|
|
action="sell",
|
||
|
|
shares=shares,
|
||
|
|
price=sell_price,
|
||
|
|
commission=commission,
|
||
|
|
reason=reason,
|
||
|
|
)
|
||
|
|
|
||
|
|
def _exit_partial(
|
||
|
|
self,
|
||
|
|
ticker: str,
|
||
|
|
sell_price: Decimal,
|
||
|
|
commission_rate: Decimal,
|
||
|
|
) -> Optional[TradingTransaction]:
|
||
|
|
"""Exit 50% of a position."""
|
||
|
|
pos = self.positions.get(ticker)
|
||
|
|
if not pos or pos.shares <= 0:
|
||
|
|
return None
|
||
|
|
|
||
|
|
sell_shares = pos.shares // 2
|
||
|
|
if sell_shares <= 0:
|
||
|
|
return None
|
||
|
|
|
||
|
|
proceeds = Decimal(str(sell_shares)) * sell_price
|
||
|
|
commission = proceeds * commission_rate
|
||
|
|
self.cash += proceeds - commission
|
||
|
|
|
||
|
|
pos.shares -= sell_shares
|
||
|
|
pos.partial_sold = True
|
||
|
|
|
||
|
|
return TradingTransaction(
|
||
|
|
ticker=ticker,
|
||
|
|
action="partial_sell",
|
||
|
|
shares=sell_shares,
|
||
|
|
price=sell_price,
|
||
|
|
commission=commission,
|
||
|
|
reason="take_profit_1",
|
||
|
|
)
|
||
|
|
```
|
||
|
|
|
||
|
|
**Step 4: Run tests to verify they pass**
|
||
|
|
|
||
|
|
Run: `cd /home/zephyrdark/workspace/quant/galaxy-po/backend && python -m pytest tests/unit/test_trading_portfolio.py -v`
|
||
|
|
Expected: All PASS
|
||
|
|
|
||
|
|
**Step 5: Commit**
|
||
|
|
|
||
|
|
```bash
|
||
|
|
git add backend/app/services/backtest/trading_portfolio.py backend/tests/unit/test_trading_portfolio.py
|
||
|
|
git commit -m "feat: add TradingPortfolio for signal-based position management"
|
||
|
|
```
|
||
|
|
|
||
|
|
---
|
||
|
|
|
||
|
|
### Task 4: KJB Signal Generator
|
||
|
|
|
||
|
|
Core signal generation logic for detecting buy/sell signals.
|
||
|
|
|
||
|
|
**Files:**
|
||
|
|
- Create: `backend/app/services/strategy/kjb.py`
|
||
|
|
- Create: `backend/tests/unit/test_kjb_signal.py`
|
||
|
|
|
||
|
|
**Step 1: Write failing tests for signal generator**
|
||
|
|
|
||
|
|
```python
|
||
|
|
# backend/tests/unit/test_kjb_signal.py
|
||
|
|
"""
|
||
|
|
Unit tests for KJB signal generator.
|
||
|
|
"""
|
||
|
|
import pandas as pd
|
||
|
|
import numpy as np
|
||
|
|
from decimal import Decimal
|
||
|
|
from datetime import date, timedelta
|
||
|
|
|
||
|
|
from app.services.strategy.kjb import KJBSignalGenerator
|
||
|
|
|
||
|
|
|
||
|
|
def _make_price_df(closes, volumes=None, start_date=date(2024, 1, 2)):
|
||
|
|
"""Helper to create price DataFrame."""
|
||
|
|
dates = [start_date + timedelta(days=i) for i in range(len(closes))]
|
||
|
|
if volumes is None:
|
||
|
|
volumes = [1000000] * len(closes)
|
||
|
|
highs = [c * 1.01 for c in closes]
|
||
|
|
lows = [c * 0.99 for c in closes]
|
||
|
|
opens = closes.copy()
|
||
|
|
return pd.DataFrame({
|
||
|
|
"date": dates,
|
||
|
|
"open": opens,
|
||
|
|
"high": highs,
|
||
|
|
"low": lows,
|
||
|
|
"close": closes,
|
||
|
|
"volume": volumes,
|
||
|
|
}).set_index("date")
|
||
|
|
|
||
|
|
|
||
|
|
def _make_kospi_df(closes, start_date=date(2024, 1, 2)):
|
||
|
|
dates = [start_date + timedelta(days=i) for i in range(len(closes))]
|
||
|
|
return pd.DataFrame({
|
||
|
|
"date": dates,
|
||
|
|
"close": closes,
|
||
|
|
}).set_index("date")
|
||
|
|
|
||
|
|
|
||
|
|
def test_relative_strength_above_market():
|
||
|
|
gen = KJBSignalGenerator()
|
||
|
|
# Stock up 10% over 10 days, market up 5%
|
||
|
|
stock_closes = [100 + i for i in range(25)] # steady rise
|
||
|
|
kospi_closes = [100 + i * 0.5 for i in range(25)] # slower rise
|
||
|
|
stock_df = _make_price_df(stock_closes)
|
||
|
|
kospi_df = _make_kospi_df(kospi_closes)
|
||
|
|
rs = gen.calculate_relative_strength(stock_df, kospi_df, lookback=10)
|
||
|
|
# Last value should be > 100 (stock outperforming)
|
||
|
|
assert rs.iloc[-1] > 100
|
||
|
|
|
||
|
|
|
||
|
|
def test_relative_strength_below_market():
|
||
|
|
gen = KJBSignalGenerator()
|
||
|
|
stock_closes = [100 + i * 0.3 for i in range(25)]
|
||
|
|
kospi_closes = [100 + i for i in range(25)]
|
||
|
|
stock_df = _make_price_df(stock_closes)
|
||
|
|
kospi_df = _make_kospi_df(kospi_closes)
|
||
|
|
rs = gen.calculate_relative_strength(stock_df, kospi_df, lookback=10)
|
||
|
|
assert rs.iloc[-1] < 100
|
||
|
|
|
||
|
|
|
||
|
|
def test_detect_breakout():
|
||
|
|
gen = KJBSignalGenerator()
|
||
|
|
# Flat for 20 days, then breakout
|
||
|
|
closes = [100.0] * 20 + [105.0]
|
||
|
|
stock_df = _make_price_df(closes)
|
||
|
|
breakouts = gen.detect_breakout(stock_df, lookback=20)
|
||
|
|
assert breakouts.iloc[-1] == True
|
||
|
|
assert breakouts.iloc[-2] == False
|
||
|
|
|
||
|
|
|
||
|
|
def test_detect_large_candle():
|
||
|
|
gen = KJBSignalGenerator()
|
||
|
|
# Normal days, then a 6% jump with 2x volume
|
||
|
|
closes = [100.0] * 21 + [106.0]
|
||
|
|
volumes = [1000000] * 21 + [3000000]
|
||
|
|
stock_df = _make_price_df(closes, volumes)
|
||
|
|
large = gen.detect_large_candle(stock_df, pct_threshold=0.05, vol_multiplier=1.5)
|
||
|
|
assert large.iloc[-1] == True
|
||
|
|
assert large.iloc[-2] == False
|
||
|
|
|
||
|
|
|
||
|
|
def test_no_large_candle_low_volume():
|
||
|
|
gen = KJBSignalGenerator()
|
||
|
|
# 6% jump but normal volume
|
||
|
|
closes = [100.0] * 21 + [106.0]
|
||
|
|
volumes = [1000000] * 22 # no volume spike
|
||
|
|
stock_df = _make_price_df(closes, volumes)
|
||
|
|
large = gen.detect_large_candle(stock_df)
|
||
|
|
assert large.iloc[-1] == False
|
||
|
|
|
||
|
|
|
||
|
|
def test_generate_buy_signal():
|
||
|
|
gen = KJBSignalGenerator()
|
||
|
|
# Create scenario: RS > 100, breakout, large candle
|
||
|
|
# Stock rises faster than market with a breakout candle
|
||
|
|
closes = [100.0] * 20 + [106.0] # breakout + large candle
|
||
|
|
volumes = [1000000] * 20 + [3000000]
|
||
|
|
kospi_closes = [100.0 + i * 0.1 for i in range(21)] # market barely moves
|
||
|
|
|
||
|
|
stock_df = _make_price_df(closes, volumes)
|
||
|
|
kospi_df = _make_kospi_df(kospi_closes)
|
||
|
|
|
||
|
|
signals = gen.generate_signals(stock_df, kospi_df)
|
||
|
|
# Last day should have a buy signal
|
||
|
|
assert signals["buy"].iloc[-1] == True
|
||
|
|
```
|
||
|
|
|
||
|
|
**Step 2: Run tests to verify they fail**
|
||
|
|
|
||
|
|
Run: `cd /home/zephyrdark/workspace/quant/galaxy-po/backend && python -m pytest tests/unit/test_kjb_signal.py -v`
|
||
|
|
Expected: FAIL
|
||
|
|
|
||
|
|
**Step 3: Implement KJBSignalGenerator**
|
||
|
|
|
||
|
|
```python
|
||
|
|
# backend/app/services/strategy/kjb.py
|
||
|
|
"""
|
||
|
|
Kim Jong-bong (KJB) strategy implementation.
|
||
|
|
|
||
|
|
Signal-based short-term trading strategy:
|
||
|
|
- Universe: market cap top 30, daily trading value >= 200B KRW
|
||
|
|
- Entry: relative strength > KOSPI + breakout or large candle
|
||
|
|
- Exit: stop-loss -3%, take-profit +5%/+10%, trailing stop
|
||
|
|
"""
|
||
|
|
from datetime import date
|
||
|
|
from decimal import Decimal
|
||
|
|
from typing import Dict, List, Optional
|
||
|
|
|
||
|
|
import pandas as pd
|
||
|
|
from sqlalchemy.orm import Session
|
||
|
|
|
||
|
|
from app.services.strategy.base import BaseStrategy
|
||
|
|
from app.schemas.strategy import StockFactor, StrategyResult, UniverseFilter
|
||
|
|
from app.services.factor_calculator import FactorCalculator
|
||
|
|
|
||
|
|
|
||
|
|
class KJBSignalGenerator:
|
||
|
|
"""
|
||
|
|
Generates daily buy/sell signals based on KJB rules.
|
||
|
|
Pure computation - no DB access. Takes DataFrames as input.
|
||
|
|
"""
|
||
|
|
|
||
|
|
def calculate_relative_strength(
|
||
|
|
self,
|
||
|
|
stock_df: pd.DataFrame,
|
||
|
|
kospi_df: pd.DataFrame,
|
||
|
|
lookback: int = 10,
|
||
|
|
) -> pd.Series:
|
||
|
|
"""
|
||
|
|
Calculate relative strength vs KOSPI.
|
||
|
|
RS = (stock return / market return) * 100
|
||
|
|
RS > 100 means stock outperforms market.
|
||
|
|
"""
|
||
|
|
stock_ret = stock_df["close"].pct_change(lookback)
|
||
|
|
kospi_ret = kospi_df["close"].pct_change(lookback)
|
||
|
|
|
||
|
|
# Align indices
|
||
|
|
aligned = pd.DataFrame({
|
||
|
|
"stock_ret": stock_ret,
|
||
|
|
"kospi_ret": kospi_ret,
|
||
|
|
}).dropna()
|
||
|
|
|
||
|
|
# Avoid division by zero
|
||
|
|
rs = pd.Series(index=stock_df.index, dtype=float)
|
||
|
|
for idx in aligned.index:
|
||
|
|
market_ret = aligned.loc[idx, "kospi_ret"]
|
||
|
|
stock_r = aligned.loc[idx, "stock_ret"]
|
||
|
|
if abs(market_ret) < 1e-10:
|
||
|
|
rs[idx] = 100.0 if abs(stock_r) < 1e-10 else (200.0 if stock_r > 0 else 0.0)
|
||
|
|
else:
|
||
|
|
rs[idx] = (stock_r / market_ret) * 100
|
||
|
|
|
||
|
|
return rs
|
||
|
|
|
||
|
|
def detect_breakout(
|
||
|
|
self,
|
||
|
|
stock_df: pd.DataFrame,
|
||
|
|
lookback: int = 20,
|
||
|
|
) -> pd.Series:
|
||
|
|
"""
|
||
|
|
Detect box breakout: close > highest high of previous lookback days.
|
||
|
|
"""
|
||
|
|
prev_high = stock_df["high"].rolling(lookback).max().shift(1)
|
||
|
|
return stock_df["close"] > prev_high
|
||
|
|
|
||
|
|
def detect_large_candle(
|
||
|
|
self,
|
||
|
|
stock_df: pd.DataFrame,
|
||
|
|
pct_threshold: float = 0.05,
|
||
|
|
vol_multiplier: float = 1.5,
|
||
|
|
) -> pd.Series:
|
||
|
|
"""
|
||
|
|
Detect large bullish candle:
|
||
|
|
- Daily return >= pct_threshold (5%)
|
||
|
|
- Volume >= vol_multiplier * 20-day average volume
|
||
|
|
"""
|
||
|
|
daily_return = stock_df["close"].pct_change()
|
||
|
|
avg_volume = stock_df["volume"].rolling(20).mean()
|
||
|
|
volume_ratio = stock_df["volume"] / avg_volume
|
||
|
|
|
||
|
|
return (daily_return >= pct_threshold) & (volume_ratio >= vol_multiplier)
|
||
|
|
|
||
|
|
def generate_signals(
|
||
|
|
self,
|
||
|
|
stock_df: pd.DataFrame,
|
||
|
|
kospi_df: pd.DataFrame,
|
||
|
|
rs_lookback: int = 10,
|
||
|
|
breakout_lookback: int = 20,
|
||
|
|
) -> pd.DataFrame:
|
||
|
|
"""
|
||
|
|
Generate buy signals combining all filters.
|
||
|
|
Buy when: RS > 100 AND (breakout OR large candle)
|
||
|
|
"""
|
||
|
|
rs = self.calculate_relative_strength(stock_df, kospi_df, rs_lookback)
|
||
|
|
breakout = self.detect_breakout(stock_df, breakout_lookback)
|
||
|
|
large_candle = self.detect_large_candle(stock_df)
|
||
|
|
|
||
|
|
signals = pd.DataFrame(index=stock_df.index)
|
||
|
|
signals["rs"] = rs
|
||
|
|
signals["breakout"] = breakout
|
||
|
|
signals["large_candle"] = large_candle
|
||
|
|
signals["buy"] = (rs > 100) & (breakout | large_candle)
|
||
|
|
|
||
|
|
return signals
|
||
|
|
```
|
||
|
|
|
||
|
|
**Step 4: Run tests**
|
||
|
|
|
||
|
|
Run: `cd /home/zephyrdark/workspace/quant/galaxy-po/backend && python -m pytest tests/unit/test_kjb_signal.py -v`
|
||
|
|
Expected: All PASS
|
||
|
|
|
||
|
|
**Step 5: Commit**
|
||
|
|
|
||
|
|
```bash
|
||
|
|
git add backend/app/services/strategy/kjb.py backend/tests/unit/test_kjb_signal.py
|
||
|
|
git commit -m "feat: add KJBSignalGenerator for daily buy/sell signal detection"
|
||
|
|
```
|
||
|
|
|
||
|
|
---
|
||
|
|
|
||
|
|
### Task 5: KJB Strategy Class
|
||
|
|
|
||
|
|
Ranking strategy for compatibility with existing strategy pattern.
|
||
|
|
|
||
|
|
**Files:**
|
||
|
|
- Modify: `backend/app/services/strategy/kjb.py` (add KJBStrategy class)
|
||
|
|
- Modify: `backend/app/services/strategy/__init__.py`
|
||
|
|
- Modify: `backend/app/schemas/strategy.py` (add KJBRequest)
|
||
|
|
- Modify: `backend/app/api/strategy.py` (add endpoint)
|
||
|
|
|
||
|
|
**Step 1: Add KJBStrategy class to kjb.py**
|
||
|
|
|
||
|
|
Append to `backend/app/services/strategy/kjb.py`:
|
||
|
|
|
||
|
|
```python
|
||
|
|
class KJBStrategy(BaseStrategy):
|
||
|
|
"""
|
||
|
|
KJB strategy for stock ranking.
|
||
|
|
Ranks stocks by relative strength, breakout proximity, and volume trend.
|
||
|
|
Compatible with existing strategy pattern (returns StrategyResult).
|
||
|
|
"""
|
||
|
|
|
||
|
|
strategy_name = "kjb"
|
||
|
|
|
||
|
|
def run(
|
||
|
|
self,
|
||
|
|
universe_filter: UniverseFilter,
|
||
|
|
top_n: int,
|
||
|
|
base_date: date = None,
|
||
|
|
**kwargs,
|
||
|
|
) -> StrategyResult:
|
||
|
|
if base_date is None:
|
||
|
|
base_date = date.today()
|
||
|
|
|
||
|
|
# Get universe - filter to top 30 by market cap
|
||
|
|
stocks = self.get_universe(universe_filter)
|
||
|
|
stocks.sort(key=lambda s: s.market_cap or 0, reverse=True)
|
||
|
|
stocks = stocks[:30]
|
||
|
|
|
||
|
|
tickers = [s.ticker for s in stocks]
|
||
|
|
stock_map = {s.ticker: s for s in stocks}
|
||
|
|
|
||
|
|
if not tickers:
|
||
|
|
return StrategyResult(
|
||
|
|
strategy_name=self.strategy_name,
|
||
|
|
base_date=base_date,
|
||
|
|
universe_count=0,
|
||
|
|
result_count=0,
|
||
|
|
stocks=[],
|
||
|
|
)
|
||
|
|
|
||
|
|
# Get valuations and sectors
|
||
|
|
valuations = self.factor_calc.get_valuations(tickers, base_date)
|
||
|
|
sectors = self.factor_calc.get_sectors(tickers)
|
||
|
|
|
||
|
|
# Calculate momentum as proxy for relative strength ranking
|
||
|
|
momentum = self.factor_calc.calculate_momentum(
|
||
|
|
tickers, base_date, months=1, skip_recent=0,
|
||
|
|
)
|
||
|
|
|
||
|
|
# Build results
|
||
|
|
results = []
|
||
|
|
for ticker in tickers:
|
||
|
|
stock = stock_map[ticker]
|
||
|
|
val = valuations.get(ticker)
|
||
|
|
mom = momentum.get(ticker, Decimal("0"))
|
||
|
|
|
||
|
|
results.append(StockFactor(
|
||
|
|
ticker=ticker,
|
||
|
|
name=stock.name,
|
||
|
|
market=stock.market,
|
||
|
|
sector_name=sectors.get(ticker),
|
||
|
|
market_cap=int(stock.market_cap / 100_000_000) if stock.market_cap else None,
|
||
|
|
close_price=Decimal(str(stock.close_price)) if stock.close_price else None,
|
||
|
|
per=Decimal(str(val.per)) if val and val.per else None,
|
||
|
|
pbr=Decimal(str(val.pbr)) if val and val.pbr else None,
|
||
|
|
dividend_yield=Decimal(str(val.dividend_yield)) if val and val.dividend_yield else None,
|
||
|
|
momentum_score=mom,
|
||
|
|
total_score=mom,
|
||
|
|
))
|
||
|
|
|
||
|
|
results.sort(key=lambda x: x.total_score or Decimal("0"), reverse=True)
|
||
|
|
for i, r in enumerate(results[:top_n], 1):
|
||
|
|
r.rank = i
|
||
|
|
|
||
|
|
return StrategyResult(
|
||
|
|
strategy_name=self.strategy_name,
|
||
|
|
base_date=base_date,
|
||
|
|
universe_count=len(stocks),
|
||
|
|
result_count=min(top_n, len(results)),
|
||
|
|
stocks=results[:top_n],
|
||
|
|
)
|
||
|
|
```
|
||
|
|
|
||
|
|
**Step 2: Register in strategy __init__.py**
|
||
|
|
|
||
|
|
Add to `backend/app/services/strategy/__init__.py`:
|
||
|
|
```python
|
||
|
|
from app.services.strategy.kjb import KJBStrategy, KJBSignalGenerator
|
||
|
|
```
|
||
|
|
Add `"KJBStrategy", "KJBSignalGenerator"` to `__all__`.
|
||
|
|
|
||
|
|
**Step 3: Add KJBRequest schema**
|
||
|
|
|
||
|
|
Add to `backend/app/schemas/strategy.py`:
|
||
|
|
```python
|
||
|
|
class KJBRequest(StrategyRequest):
|
||
|
|
"""KJB strategy request."""
|
||
|
|
pass # uses default StrategyRequest params
|
||
|
|
```
|
||
|
|
|
||
|
|
**Step 4: Add API endpoint**
|
||
|
|
|
||
|
|
Add to `backend/app/api/strategy.py`:
|
||
|
|
```python
|
||
|
|
from app.schemas.strategy import KJBRequest
|
||
|
|
from app.services.strategy import KJBStrategy
|
||
|
|
|
||
|
|
@router.post("/kjb", response_model=StrategyResult)
|
||
|
|
async def run_kjb(
|
||
|
|
request: KJBRequest,
|
||
|
|
current_user: CurrentUser,
|
||
|
|
db: Session = Depends(get_db),
|
||
|
|
):
|
||
|
|
"""Run KJB strategy."""
|
||
|
|
strategy = KJBStrategy(db)
|
||
|
|
return strategy.run(
|
||
|
|
universe_filter=request.universe,
|
||
|
|
top_n=request.top_n,
|
||
|
|
base_date=request.base_date,
|
||
|
|
)
|
||
|
|
```
|
||
|
|
|
||
|
|
**Step 5: Commit**
|
||
|
|
|
||
|
|
```bash
|
||
|
|
git add backend/app/services/strategy/kjb.py backend/app/services/strategy/__init__.py \
|
||
|
|
backend/app/schemas/strategy.py backend/app/api/strategy.py
|
||
|
|
git commit -m "feat: add KJBStrategy ranking class and API endpoint"
|
||
|
|
```
|
||
|
|
|
||
|
|
---
|
||
|
|
|
||
|
|
### Task 6: Daily Backtest Engine
|
||
|
|
|
||
|
|
**Files:**
|
||
|
|
- Create: `backend/app/services/backtest/daily_engine.py`
|
||
|
|
- Modify: `backend/app/services/backtest/__init__.py`
|
||
|
|
|
||
|
|
**Step 1: Implement DailyBacktestEngine**
|
||
|
|
|
||
|
|
```python
|
||
|
|
# backend/app/services/backtest/daily_engine.py
|
||
|
|
"""
|
||
|
|
Daily simulation backtest engine for signal-based strategies (KJB).
|
||
|
|
Unlike the rebalance-based BacktestEngine, this engine:
|
||
|
|
- Checks entry/exit signals every trading day
|
||
|
|
- Manages individual positions with stop-loss and trailing stops
|
||
|
|
- Supports partial exits
|
||
|
|
"""
|
||
|
|
import logging
|
||
|
|
from datetime import date
|
||
|
|
from decimal import Decimal
|
||
|
|
from typing import Dict, List
|
||
|
|
|
||
|
|
import pandas as pd
|
||
|
|
from sqlalchemy.orm import Session
|
||
|
|
from sqlalchemy import func
|
||
|
|
|
||
|
|
from app.models.backtest import (
|
||
|
|
Backtest, BacktestResult, BacktestEquityCurve,
|
||
|
|
BacktestHolding, BacktestTransaction,
|
||
|
|
)
|
||
|
|
from app.models.stock import Stock, Price
|
||
|
|
from app.services.backtest.trading_portfolio import TradingPortfolio, TradingTransaction
|
||
|
|
from app.services.backtest.metrics import MetricsCalculator
|
||
|
|
from app.services.strategy.kjb import KJBSignalGenerator
|
||
|
|
|
||
|
|
logger = logging.getLogger(__name__)
|
||
|
|
|
||
|
|
|
||
|
|
class DailyBacktestEngine:
|
||
|
|
"""
|
||
|
|
Backtest engine for KJB signal-based strategy.
|
||
|
|
Runs daily simulation with individual position management.
|
||
|
|
"""
|
||
|
|
|
||
|
|
def __init__(self, db: Session):
|
||
|
|
self.db = db
|
||
|
|
self.signal_gen = KJBSignalGenerator()
|
||
|
|
|
||
|
|
def run(self, backtest_id: int) -> None:
|
||
|
|
"""Execute daily backtest."""
|
||
|
|
backtest = self.db.query(Backtest).get(backtest_id)
|
||
|
|
if not backtest:
|
||
|
|
raise ValueError(f"Backtest {backtest_id} not found")
|
||
|
|
|
||
|
|
params = backtest.strategy_params or {}
|
||
|
|
|
||
|
|
# Initialize trading portfolio
|
||
|
|
portfolio = TradingPortfolio(
|
||
|
|
initial_capital=backtest.initial_capital,
|
||
|
|
max_positions=params.get("max_positions", 10),
|
||
|
|
cash_reserve_ratio=Decimal(str(params.get("cash_reserve_ratio", 0.3))),
|
||
|
|
stop_loss_pct=Decimal(str(params.get("stop_loss_pct", 0.03))),
|
||
|
|
target1_pct=Decimal(str(params.get("target1_pct", 0.05))),
|
||
|
|
target2_pct=Decimal(str(params.get("target2_pct", 0.10))),
|
||
|
|
)
|
||
|
|
|
||
|
|
# Get trading days
|
||
|
|
trading_days = self._get_trading_days(backtest.start_date, backtest.end_date)
|
||
|
|
if not trading_days:
|
||
|
|
raise ValueError("No trading days found")
|
||
|
|
|
||
|
|
# Get universe (top 30 by market cap)
|
||
|
|
universe_tickers = self._get_universe_tickers()
|
||
|
|
|
||
|
|
# Load all price data upfront
|
||
|
|
price_data = self._load_price_data(universe_tickers, backtest.start_date, backtest.end_date)
|
||
|
|
|
||
|
|
# Load KOSPI proxy data
|
||
|
|
kospi_data = self._load_kospi_data(backtest.start_date, backtest.end_date)
|
||
|
|
|
||
|
|
# Load benchmark for metrics
|
||
|
|
benchmark_prices = self._load_benchmark_prices(
|
||
|
|
backtest.benchmark, backtest.start_date, backtest.end_date,
|
||
|
|
)
|
||
|
|
|
||
|
|
# Prepare per-stock DataFrames for signal generation
|
||
|
|
stock_dfs = self._build_stock_dfs(price_data, universe_tickers)
|
||
|
|
|
||
|
|
# Simulation
|
||
|
|
equity_curve_data: List[Dict] = []
|
||
|
|
all_transactions: List[tuple] = []
|
||
|
|
holdings_snapshots: List[Dict] = []
|
||
|
|
names = self._get_stock_names()
|
||
|
|
|
||
|
|
initial_benchmark = benchmark_prices.get(trading_days[0], Decimal("1"))
|
||
|
|
if initial_benchmark == 0:
|
||
|
|
initial_benchmark = Decimal("1")
|
||
|
|
|
||
|
|
for trading_date in trading_days:
|
||
|
|
day_prices = self._get_day_prices(price_data, trading_date)
|
||
|
|
|
||
|
|
# 1. Check exits first
|
||
|
|
exit_txns = portfolio.check_exits(
|
||
|
|
date=trading_date,
|
||
|
|
prices=day_prices,
|
||
|
|
commission_rate=backtest.commission_rate,
|
||
|
|
slippage_rate=backtest.slippage_rate,
|
||
|
|
)
|
||
|
|
for txn in exit_txns:
|
||
|
|
all_transactions.append((trading_date, txn))
|
||
|
|
|
||
|
|
# 2. Check entry signals for stocks not in portfolio
|
||
|
|
for ticker in universe_tickers:
|
||
|
|
if ticker in portfolio.positions:
|
||
|
|
continue
|
||
|
|
if ticker not in stock_dfs or ticker not in day_prices:
|
||
|
|
continue
|
||
|
|
|
||
|
|
stock_df = stock_dfs[ticker]
|
||
|
|
if trading_date not in stock_df.index:
|
||
|
|
continue
|
||
|
|
|
||
|
|
# Check if we have enough history
|
||
|
|
mask = stock_df.index <= trading_date
|
||
|
|
if mask.sum() < 21: # need at least 21 days for indicators
|
||
|
|
continue
|
||
|
|
|
||
|
|
hist = stock_df.loc[mask]
|
||
|
|
kospi_hist = kospi_data.loc[kospi_data.index <= trading_date]
|
||
|
|
|
||
|
|
if len(kospi_hist) < 11:
|
||
|
|
continue
|
||
|
|
|
||
|
|
signals = self.signal_gen.generate_signals(hist, kospi_hist)
|
||
|
|
|
||
|
|
if trading_date in signals.index and signals.loc[trading_date, "buy"]:
|
||
|
|
txn = portfolio.enter_position(
|
||
|
|
ticker=ticker,
|
||
|
|
price=day_prices[ticker],
|
||
|
|
date=trading_date,
|
||
|
|
commission_rate=backtest.commission_rate,
|
||
|
|
slippage_rate=backtest.slippage_rate,
|
||
|
|
)
|
||
|
|
if txn:
|
||
|
|
all_transactions.append((trading_date, txn))
|
||
|
|
|
||
|
|
# 3. Record daily portfolio value
|
||
|
|
portfolio_value = portfolio.get_value(day_prices)
|
||
|
|
benchmark_value = benchmark_prices.get(trading_date, initial_benchmark)
|
||
|
|
normalized_benchmark = (
|
||
|
|
benchmark_value / initial_benchmark * backtest.initial_capital
|
||
|
|
)
|
||
|
|
|
||
|
|
equity_curve_data.append({
|
||
|
|
"date": trading_date,
|
||
|
|
"portfolio_value": portfolio_value,
|
||
|
|
"benchmark_value": normalized_benchmark,
|
||
|
|
})
|
||
|
|
|
||
|
|
# Calculate and save results
|
||
|
|
portfolio_values = [Decimal(str(e["portfolio_value"])) for e in equity_curve_data]
|
||
|
|
benchmark_values = [Decimal(str(e["benchmark_value"])) for e in equity_curve_data]
|
||
|
|
|
||
|
|
metrics = MetricsCalculator.calculate_all(portfolio_values, benchmark_values)
|
||
|
|
drawdowns = MetricsCalculator.calculate_drawdown_series(portfolio_values)
|
||
|
|
|
||
|
|
self._save_results(
|
||
|
|
backtest_id=backtest_id,
|
||
|
|
metrics=metrics,
|
||
|
|
equity_curve_data=equity_curve_data,
|
||
|
|
drawdowns=drawdowns,
|
||
|
|
transactions=all_transactions,
|
||
|
|
)
|
||
|
|
|
||
|
|
def _get_trading_days(self, start_date: date, end_date: date) -> List[date]:
|
||
|
|
prices = (
|
||
|
|
self.db.query(Price.date)
|
||
|
|
.filter(Price.date >= start_date, Price.date <= end_date)
|
||
|
|
.distinct()
|
||
|
|
.order_by(Price.date)
|
||
|
|
.all()
|
||
|
|
)
|
||
|
|
return [p[0] for p in prices]
|
||
|
|
|
||
|
|
def _get_universe_tickers(self) -> List[str]:
|
||
|
|
"""Get top 30 stocks by market cap."""
|
||
|
|
stocks = (
|
||
|
|
self.db.query(Stock)
|
||
|
|
.filter(Stock.market_cap.isnot(None))
|
||
|
|
.order_by(Stock.market_cap.desc())
|
||
|
|
.limit(30)
|
||
|
|
.all()
|
||
|
|
)
|
||
|
|
return [s.ticker for s in stocks]
|
||
|
|
|
||
|
|
def _load_price_data(
|
||
|
|
self, tickers: List[str], start_date: date, end_date: date,
|
||
|
|
) -> List:
|
||
|
|
"""Load all price data for universe."""
|
||
|
|
return (
|
||
|
|
self.db.query(Price)
|
||
|
|
.filter(Price.ticker.in_(tickers))
|
||
|
|
.filter(Price.date >= start_date, Price.date <= end_date)
|
||
|
|
.all()
|
||
|
|
)
|
||
|
|
|
||
|
|
def _load_kospi_data(self, start_date: date, end_date: date) -> pd.DataFrame:
|
||
|
|
"""Load KOSPI proxy (KODEX 200) as DataFrame."""
|
||
|
|
prices = (
|
||
|
|
self.db.query(Price)
|
||
|
|
.filter(Price.ticker == "069500")
|
||
|
|
.filter(Price.date >= start_date, Price.date <= end_date)
|
||
|
|
.order_by(Price.date)
|
||
|
|
.all()
|
||
|
|
)
|
||
|
|
if not prices:
|
||
|
|
return pd.DataFrame(columns=["close"])
|
||
|
|
|
||
|
|
data = [{"date": p.date, "close": float(p.close)} for p in prices]
|
||
|
|
df = pd.DataFrame(data).set_index("date")
|
||
|
|
return df
|
||
|
|
|
||
|
|
def _load_benchmark_prices(
|
||
|
|
self, benchmark: str, start_date: date, end_date: date,
|
||
|
|
) -> Dict[date, Decimal]:
|
||
|
|
benchmark_ticker = "069500"
|
||
|
|
prices = (
|
||
|
|
self.db.query(Price)
|
||
|
|
.filter(Price.ticker == benchmark_ticker)
|
||
|
|
.filter(Price.date >= start_date, Price.date <= end_date)
|
||
|
|
.all()
|
||
|
|
)
|
||
|
|
return {p.date: p.close for p in prices}
|
||
|
|
|
||
|
|
def _build_stock_dfs(
|
||
|
|
self, price_data: List, tickers: List[str],
|
||
|
|
) -> Dict[str, pd.DataFrame]:
|
||
|
|
"""Build per-stock DataFrames from price data."""
|
||
|
|
ticker_rows: Dict[str, list] = {t: [] for t in tickers}
|
||
|
|
for p in price_data:
|
||
|
|
if p.ticker in ticker_rows:
|
||
|
|
ticker_rows[p.ticker].append({
|
||
|
|
"date": p.date,
|
||
|
|
"open": float(p.open),
|
||
|
|
"high": float(p.high),
|
||
|
|
"low": float(p.low),
|
||
|
|
"close": float(p.close),
|
||
|
|
"volume": int(p.volume),
|
||
|
|
})
|
||
|
|
|
||
|
|
result = {}
|
||
|
|
for ticker, rows in ticker_rows.items():
|
||
|
|
if rows:
|
||
|
|
df = pd.DataFrame(rows).set_index("date").sort_index()
|
||
|
|
result[ticker] = df
|
||
|
|
return result
|
||
|
|
|
||
|
|
def _get_day_prices(
|
||
|
|
self, price_data: List, trading_date: date,
|
||
|
|
) -> Dict[str, Decimal]:
|
||
|
|
"""Get prices for a specific day from preloaded data."""
|
||
|
|
return {
|
||
|
|
p.ticker: p.close
|
||
|
|
for p in price_data
|
||
|
|
if p.date == trading_date
|
||
|
|
}
|
||
|
|
|
||
|
|
def _get_stock_names(self) -> Dict[str, str]:
|
||
|
|
stocks = self.db.query(Stock).all()
|
||
|
|
return {s.ticker: s.name for s in stocks}
|
||
|
|
|
||
|
|
def _save_results(
|
||
|
|
self,
|
||
|
|
backtest_id: int,
|
||
|
|
metrics,
|
||
|
|
equity_curve_data: List[Dict],
|
||
|
|
drawdowns: List[Decimal],
|
||
|
|
transactions: List,
|
||
|
|
) -> None:
|
||
|
|
"""Save results to DB (same format as BacktestEngine)."""
|
||
|
|
result = BacktestResult(
|
||
|
|
backtest_id=backtest_id,
|
||
|
|
total_return=metrics.total_return,
|
||
|
|
cagr=metrics.cagr,
|
||
|
|
mdd=metrics.mdd,
|
||
|
|
sharpe_ratio=metrics.sharpe_ratio,
|
||
|
|
volatility=metrics.volatility,
|
||
|
|
benchmark_return=metrics.benchmark_return,
|
||
|
|
excess_return=metrics.excess_return,
|
||
|
|
)
|
||
|
|
self.db.add(result)
|
||
|
|
|
||
|
|
for i, point in enumerate(equity_curve_data):
|
||
|
|
curve_point = BacktestEquityCurve(
|
||
|
|
backtest_id=backtest_id,
|
||
|
|
date=point["date"],
|
||
|
|
portfolio_value=point["portfolio_value"],
|
||
|
|
benchmark_value=point["benchmark_value"],
|
||
|
|
drawdown=drawdowns[i] if i < len(drawdowns) else Decimal("0"),
|
||
|
|
)
|
||
|
|
self.db.add(curve_point)
|
||
|
|
|
||
|
|
for trading_date, txn in transactions:
|
||
|
|
t = BacktestTransaction(
|
||
|
|
backtest_id=backtest_id,
|
||
|
|
date=trading_date,
|
||
|
|
ticker=txn.ticker,
|
||
|
|
action=txn.action,
|
||
|
|
shares=txn.shares,
|
||
|
|
price=txn.price,
|
||
|
|
commission=txn.commission,
|
||
|
|
)
|
||
|
|
self.db.add(t)
|
||
|
|
|
||
|
|
self.db.commit()
|
||
|
|
```
|
||
|
|
|
||
|
|
**Step 2: Register in backtest __init__.py**
|
||
|
|
|
||
|
|
Add to `backend/app/services/backtest/__init__.py`:
|
||
|
|
```python
|
||
|
|
from app.services.backtest.daily_engine import DailyBacktestEngine
|
||
|
|
from app.services.backtest.trading_portfolio import TradingPortfolio, TradingTransaction
|
||
|
|
```
|
||
|
|
Add to `__all__`: `"DailyBacktestEngine", "TradingPortfolio", "TradingTransaction"`.
|
||
|
|
|
||
|
|
**Step 3: Commit**
|
||
|
|
|
||
|
|
```bash
|
||
|
|
git add backend/app/services/backtest/daily_engine.py backend/app/services/backtest/__init__.py
|
||
|
|
git commit -m "feat: add DailyBacktestEngine for KJB signal-based backtesting"
|
||
|
|
```
|
||
|
|
|
||
|
|
---
|
||
|
|
|
||
|
|
### Task 7: Wire KJB into Backtest System
|
||
|
|
|
||
|
|
Connect KJB strategy to the existing backtest worker so it can be triggered via the backtest API.
|
||
|
|
|
||
|
|
**Files:**
|
||
|
|
- Modify: `backend/app/services/backtest/worker.py`
|
||
|
|
- Modify: `backend/app/schemas/backtest.py`
|
||
|
|
|
||
|
|
**Step 1: Update worker to route KJB to DailyBacktestEngine**
|
||
|
|
|
||
|
|
In `backend/app/services/backtest/worker.py`, modify `_run_backtest_job`:
|
||
|
|
|
||
|
|
Replace the engine instantiation section. After `engine = BacktestEngine(db)` / `engine.run(backtest_id)`, change to:
|
||
|
|
|
||
|
|
```python
|
||
|
|
# In _run_backtest_job, replace:
|
||
|
|
# engine = BacktestEngine(db)
|
||
|
|
# engine.run(backtest_id)
|
||
|
|
# With:
|
||
|
|
backtest = db.query(Backtest).get(backtest_id)
|
||
|
|
if backtest.strategy_type == "kjb":
|
||
|
|
from app.services.backtest.daily_engine import DailyBacktestEngine
|
||
|
|
engine = DailyBacktestEngine(db)
|
||
|
|
else:
|
||
|
|
engine = BacktestEngine(db)
|
||
|
|
engine.run(backtest_id)
|
||
|
|
```
|
||
|
|
|
||
|
|
**Step 2: Update BacktestCreate schema**
|
||
|
|
|
||
|
|
In `backend/app/schemas/backtest.py`, update `BacktestCreate.strategy_type` description:
|
||
|
|
```python
|
||
|
|
strategy_type: str = Field(..., description="multi_factor, quality, value_momentum, or kjb")
|
||
|
|
```
|
||
|
|
|
||
|
|
**Step 3: Commit**
|
||
|
|
|
||
|
|
```bash
|
||
|
|
git add backend/app/services/backtest/worker.py backend/app/schemas/backtest.py
|
||
|
|
git commit -m "feat: wire KJB strategy into backtest worker with DailyBacktestEngine"
|
||
|
|
```
|
||
|
|
|
||
|
|
---
|
||
|
|
|
||
|
|
### Task 8: Signal API Endpoints
|
||
|
|
|
||
|
|
**Files:**
|
||
|
|
- Create: `backend/app/api/signal.py`
|
||
|
|
- Modify: `backend/app/api/__init__.py`
|
||
|
|
|
||
|
|
**Step 1: Create signal API**
|
||
|
|
|
||
|
|
```python
|
||
|
|
# backend/app/api/signal.py
|
||
|
|
"""
|
||
|
|
KJB Signal API endpoints.
|
||
|
|
"""
|
||
|
|
from datetime import date
|
||
|
|
from typing import List, Optional
|
||
|
|
|
||
|
|
from fastapi import APIRouter, Depends, Query
|
||
|
|
from sqlalchemy.orm import Session
|
||
|
|
|
||
|
|
from app.core.database import get_db
|
||
|
|
from app.api.deps import CurrentUser
|
||
|
|
from app.models.signal import Signal, SignalStatus
|
||
|
|
from app.schemas.signal import SignalResponse
|
||
|
|
|
||
|
|
router = APIRouter(prefix="/api/signal", tags=["signal"])
|
||
|
|
|
||
|
|
|
||
|
|
@router.get("/kjb/today", response_model=List[SignalResponse])
|
||
|
|
async def get_today_signals(
|
||
|
|
current_user: CurrentUser,
|
||
|
|
db: Session = Depends(get_db),
|
||
|
|
):
|
||
|
|
"""Get today's KJB trading signals."""
|
||
|
|
today = date.today()
|
||
|
|
signals = (
|
||
|
|
db.query(Signal)
|
||
|
|
.filter(Signal.date == today)
|
||
|
|
.order_by(Signal.signal_type, Signal.ticker)
|
||
|
|
.all()
|
||
|
|
)
|
||
|
|
return signals
|
||
|
|
|
||
|
|
|
||
|
|
@router.get("/kjb/history", response_model=List[SignalResponse])
|
||
|
|
async def get_signal_history(
|
||
|
|
current_user: CurrentUser,
|
||
|
|
db: Session = Depends(get_db),
|
||
|
|
start_date: Optional[date] = Query(None),
|
||
|
|
end_date: Optional[date] = Query(None),
|
||
|
|
ticker: Optional[str] = Query(None),
|
||
|
|
limit: int = Query(100, ge=1, le=1000),
|
||
|
|
):
|
||
|
|
"""Get historical KJB signals."""
|
||
|
|
query = db.query(Signal)
|
||
|
|
|
||
|
|
if start_date:
|
||
|
|
query = query.filter(Signal.date >= start_date)
|
||
|
|
if end_date:
|
||
|
|
query = query.filter(Signal.date <= end_date)
|
||
|
|
if ticker:
|
||
|
|
query = query.filter(Signal.ticker == ticker)
|
||
|
|
|
||
|
|
signals = (
|
||
|
|
query.order_by(Signal.date.desc(), Signal.ticker)
|
||
|
|
.limit(limit)
|
||
|
|
.all()
|
||
|
|
)
|
||
|
|
return signals
|
||
|
|
```
|
||
|
|
|
||
|
|
**Step 2: Register router in api/__init__.py**
|
||
|
|
|
||
|
|
Add to `backend/app/api/__init__.py`:
|
||
|
|
```python
|
||
|
|
from app.api.signal import router as signal_router
|
||
|
|
```
|
||
|
|
Add `"signal_router"` to `__all__`.
|
||
|
|
|
||
|
|
**Step 3: Register in main app**
|
||
|
|
|
||
|
|
Check `backend/app/main.py` for how routers are included and add `signal_router` following the same pattern.
|
||
|
|
|
||
|
|
**Step 4: Commit**
|
||
|
|
|
||
|
|
```bash
|
||
|
|
git add backend/app/api/signal.py backend/app/api/__init__.py backend/app/main.py
|
||
|
|
git commit -m "feat: add Signal API endpoints for KJB daily signals"
|
||
|
|
```
|
||
|
|
|
||
|
|
---
|
||
|
|
|
||
|
|
### Task 9: KJB Signal Scheduler Job
|
||
|
|
|
||
|
|
**Files:**
|
||
|
|
- Create: `backend/jobs/kjb_signal_job.py`
|
||
|
|
- Modify: `backend/jobs/scheduler.py`
|
||
|
|
- Modify: `backend/jobs/__init__.py`
|
||
|
|
|
||
|
|
**Step 1: Create signal job**
|
||
|
|
|
||
|
|
```python
|
||
|
|
# backend/jobs/kjb_signal_job.py
|
||
|
|
"""
|
||
|
|
Daily KJB signal generation job.
|
||
|
|
Runs after market data collection to generate buy/sell signals.
|
||
|
|
"""
|
||
|
|
import logging
|
||
|
|
from datetime import date, timedelta
|
||
|
|
|
||
|
|
import pandas as pd
|
||
|
|
from sqlalchemy.orm import Session
|
||
|
|
|
||
|
|
from app.core.database import SessionLocal
|
||
|
|
from app.models.stock import Stock, Price
|
||
|
|
from app.models.signal import Signal, SignalType, SignalStatus
|
||
|
|
from app.services.strategy.kjb import KJBSignalGenerator
|
||
|
|
|
||
|
|
logger = logging.getLogger(__name__)
|
||
|
|
|
||
|
|
|
||
|
|
def run_kjb_signals():
|
||
|
|
"""
|
||
|
|
Generate KJB trading signals for today.
|
||
|
|
Called by scheduler at 18:15 KST (after price collection).
|
||
|
|
"""
|
||
|
|
logger.info("Starting KJB signal generation")
|
||
|
|
db: Session = SessionLocal()
|
||
|
|
|
||
|
|
try:
|
||
|
|
today = date.today()
|
||
|
|
signal_gen = KJBSignalGenerator()
|
||
|
|
|
||
|
|
# Get universe: top 30 by market cap
|
||
|
|
stocks = (
|
||
|
|
db.query(Stock)
|
||
|
|
.filter(Stock.market_cap.isnot(None))
|
||
|
|
.order_by(Stock.market_cap.desc())
|
||
|
|
.limit(30)
|
||
|
|
.all()
|
||
|
|
)
|
||
|
|
tickers = [s.ticker for s in stocks]
|
||
|
|
name_map = {s.ticker: s.name for s in stocks}
|
||
|
|
|
||
|
|
# Load KOSPI proxy data (last 60 days for indicator calculation)
|
||
|
|
lookback_start = today - timedelta(days=90)
|
||
|
|
kospi_prices = (
|
||
|
|
db.query(Price)
|
||
|
|
.filter(Price.ticker == "069500")
|
||
|
|
.filter(Price.date >= lookback_start, Price.date <= today)
|
||
|
|
.order_by(Price.date)
|
||
|
|
.all()
|
||
|
|
)
|
||
|
|
if not kospi_prices:
|
||
|
|
logger.warning("No KOSPI data available for signal generation")
|
||
|
|
return
|
||
|
|
|
||
|
|
kospi_df = pd.DataFrame([
|
||
|
|
{"date": p.date, "close": float(p.close)}
|
||
|
|
for p in kospi_prices
|
||
|
|
]).set_index("date")
|
||
|
|
|
||
|
|
signals_created = 0
|
||
|
|
|
||
|
|
for ticker in tickers:
|
||
|
|
# Load stock price data
|
||
|
|
stock_prices = (
|
||
|
|
db.query(Price)
|
||
|
|
.filter(Price.ticker == ticker)
|
||
|
|
.filter(Price.date >= lookback_start, Price.date <= today)
|
||
|
|
.order_by(Price.date)
|
||
|
|
.all()
|
||
|
|
)
|
||
|
|
|
||
|
|
if len(stock_prices) < 21:
|
||
|
|
continue
|
||
|
|
|
||
|
|
stock_df = pd.DataFrame([{
|
||
|
|
"date": p.date,
|
||
|
|
"open": float(p.open),
|
||
|
|
"high": float(p.high),
|
||
|
|
"low": float(p.low),
|
||
|
|
"close": float(p.close),
|
||
|
|
"volume": int(p.volume),
|
||
|
|
} for p in stock_prices]).set_index("date")
|
||
|
|
|
||
|
|
# Generate signals
|
||
|
|
signals = signal_gen.generate_signals(stock_df, kospi_df)
|
||
|
|
|
||
|
|
if today in signals.index and signals.loc[today, "buy"]:
|
||
|
|
close_price = stock_df.loc[today, "close"]
|
||
|
|
reason_parts = []
|
||
|
|
if signals.loc[today, "breakout"]:
|
||
|
|
reason_parts.append("breakout")
|
||
|
|
if signals.loc[today, "large_candle"]:
|
||
|
|
reason_parts.append("large_candle")
|
||
|
|
|
||
|
|
signal = Signal(
|
||
|
|
date=today,
|
||
|
|
ticker=ticker,
|
||
|
|
name=name_map.get(ticker),
|
||
|
|
signal_type=SignalType.BUY,
|
||
|
|
entry_price=close_price,
|
||
|
|
target_price=round(close_price * 1.05, 2),
|
||
|
|
stop_loss_price=round(close_price * 0.97, 2),
|
||
|
|
reason=", ".join(reason_parts),
|
||
|
|
status=SignalStatus.ACTIVE,
|
||
|
|
)
|
||
|
|
db.add(signal)
|
||
|
|
signals_created += 1
|
||
|
|
|
||
|
|
db.commit()
|
||
|
|
logger.info(f"KJB signal generation complete: {signals_created} buy signals")
|
||
|
|
|
||
|
|
except Exception as e:
|
||
|
|
logger.exception(f"KJB signal generation failed: {e}")
|
||
|
|
finally:
|
||
|
|
db.close()
|
||
|
|
```
|
||
|
|
|
||
|
|
**Step 2: Register in scheduler**
|
||
|
|
|
||
|
|
Add to `backend/jobs/scheduler.py` after existing job configurations:
|
||
|
|
|
||
|
|
```python
|
||
|
|
from jobs.kjb_signal_job import run_kjb_signals
|
||
|
|
|
||
|
|
# In configure_jobs(), add:
|
||
|
|
scheduler.add_job(
|
||
|
|
run_kjb_signals,
|
||
|
|
trigger=CronTrigger(
|
||
|
|
hour=18,
|
||
|
|
minute=15,
|
||
|
|
day_of_week='mon-fri',
|
||
|
|
timezone=KST,
|
||
|
|
),
|
||
|
|
id='kjb_daily_signals',
|
||
|
|
name='Generate KJB trading signals',
|
||
|
|
replace_existing=True,
|
||
|
|
)
|
||
|
|
logger.info("Configured kjb_daily_signals job at 18:15 KST")
|
||
|
|
```
|
||
|
|
|
||
|
|
**Step 3: Update jobs/__init__.py**
|
||
|
|
|
||
|
|
Add to `backend/jobs/__init__.py`:
|
||
|
|
```python
|
||
|
|
from jobs.kjb_signal_job import run_kjb_signals
|
||
|
|
```
|
||
|
|
Add `"run_kjb_signals"` to `__all__`.
|
||
|
|
|
||
|
|
**Step 4: Commit**
|
||
|
|
|
||
|
|
```bash
|
||
|
|
git add backend/jobs/kjb_signal_job.py backend/jobs/scheduler.py backend/jobs/__init__.py
|
||
|
|
git commit -m "feat: add KJB daily signal generation scheduler job"
|
||
|
|
```
|
||
|
|
|
||
|
|
---
|
||
|
|
|
||
|
|
### Task 10: E2E Tests
|
||
|
|
|
||
|
|
**Files:**
|
||
|
|
- Create: `backend/tests/e2e/test_kjb_flow.py`
|
||
|
|
|
||
|
|
**Step 1: Write E2E tests**
|
||
|
|
|
||
|
|
```python
|
||
|
|
# backend/tests/e2e/test_kjb_flow.py
|
||
|
|
"""
|
||
|
|
E2E tests for KJB strategy flow.
|
||
|
|
"""
|
||
|
|
from fastapi.testclient import TestClient
|
||
|
|
|
||
|
|
|
||
|
|
def test_kjb_strategy_endpoint(client: TestClient, auth_headers):
|
||
|
|
"""Test KJB strategy ranking endpoint."""
|
||
|
|
response = client.post(
|
||
|
|
"/api/strategy/kjb",
|
||
|
|
json={
|
||
|
|
"universe": {"markets": ["KOSPI"]},
|
||
|
|
"top_n": 10,
|
||
|
|
},
|
||
|
|
headers=auth_headers,
|
||
|
|
)
|
||
|
|
assert response.status_code in [200, 400, 500]
|
||
|
|
if response.status_code == 200:
|
||
|
|
data = response.json()
|
||
|
|
assert data["strategy_name"] == "kjb"
|
||
|
|
assert "stocks" in data
|
||
|
|
|
||
|
|
|
||
|
|
def test_kjb_backtest_creation(client: TestClient, auth_headers):
|
||
|
|
"""Test creating a KJB backtest."""
|
||
|
|
response = client.post(
|
||
|
|
"/api/backtest",
|
||
|
|
json={
|
||
|
|
"strategy_type": "kjb",
|
||
|
|
"strategy_params": {
|
||
|
|
"max_positions": 10,
|
||
|
|
"cash_reserve_ratio": 0.3,
|
||
|
|
"stop_loss_pct": 0.03,
|
||
|
|
"target1_pct": 0.05,
|
||
|
|
"target2_pct": 0.10,
|
||
|
|
},
|
||
|
|
"start_date": "2023-01-01",
|
||
|
|
"end_date": "2023-12-31",
|
||
|
|
"initial_capital": 10000000,
|
||
|
|
"commission_rate": 0.00015,
|
||
|
|
"slippage_rate": 0.001,
|
||
|
|
"benchmark": "KOSPI",
|
||
|
|
"top_n": 30,
|
||
|
|
},
|
||
|
|
headers=auth_headers,
|
||
|
|
)
|
||
|
|
assert response.status_code == 200
|
||
|
|
data = response.json()
|
||
|
|
assert "id" in data
|
||
|
|
assert data["status"] == "pending"
|
||
|
|
|
||
|
|
|
||
|
|
def test_signal_today_endpoint(client: TestClient, auth_headers):
|
||
|
|
"""Test today's signals endpoint."""
|
||
|
|
response = client.get("/api/signal/kjb/today", headers=auth_headers)
|
||
|
|
assert response.status_code == 200
|
||
|
|
assert isinstance(response.json(), list)
|
||
|
|
|
||
|
|
|
||
|
|
def test_signal_history_endpoint(client: TestClient, auth_headers):
|
||
|
|
"""Test signal history endpoint."""
|
||
|
|
response = client.get(
|
||
|
|
"/api/signal/kjb/history",
|
||
|
|
params={"limit": 10},
|
||
|
|
headers=auth_headers,
|
||
|
|
)
|
||
|
|
assert response.status_code == 200
|
||
|
|
assert isinstance(response.json(), list)
|
||
|
|
|
||
|
|
|
||
|
|
def test_signal_requires_auth(client: TestClient):
|
||
|
|
"""Test that signal endpoints require authentication."""
|
||
|
|
response = client.get("/api/signal/kjb/today")
|
||
|
|
assert response.status_code == 401
|
||
|
|
```
|
||
|
|
|
||
|
|
**Step 2: Run tests**
|
||
|
|
|
||
|
|
Run: `cd /home/zephyrdark/workspace/quant/galaxy-po/backend && python -m pytest tests/e2e/test_kjb_flow.py -v`
|
||
|
|
Expected: All PASS
|
||
|
|
|
||
|
|
**Step 3: Commit**
|
||
|
|
|
||
|
|
```bash
|
||
|
|
git add backend/tests/e2e/test_kjb_flow.py
|
||
|
|
git commit -m "test: add E2E tests for KJB strategy, backtest, and signal endpoints"
|
||
|
|
```
|
||
|
|
|
||
|
|
---
|
||
|
|
|
||
|
|
### Task 11: Frontend - KJB Signal Dashboard
|
||
|
|
|
||
|
|
**Files:**
|
||
|
|
- Create: `frontend/src/app/signals/page.tsx`
|
||
|
|
|
||
|
|
**Step 1: Explore existing frontend page patterns**
|
||
|
|
|
||
|
|
Read an existing page (e.g., `frontend/src/app/backtest/page.tsx`) to understand the component patterns, API client usage, and layout structure used in this project.
|
||
|
|
|
||
|
|
**Step 2: Create signals page**
|
||
|
|
|
||
|
|
Build the KJB signals dashboard page following the existing frontend patterns:
|
||
|
|
- Fetch today's signals from `GET /api/signal/kjb/today`
|
||
|
|
- Display buy signals as cards showing ticker, name, entry price, target, stop-loss
|
||
|
|
- Add a history table fetching from `GET /api/signal/kjb/history`
|
||
|
|
- Use the same layout, styling, and API client patterns as existing pages
|
||
|
|
|
||
|
|
**Step 3: Add navigation link**
|
||
|
|
|
||
|
|
Add "Signals" or "KJB 신호" to the navigation/sidebar following the existing navigation pattern.
|
||
|
|
|
||
|
|
**Step 4: Commit**
|
||
|
|
|
||
|
|
```bash
|
||
|
|
git add frontend/src/app/signals/
|
||
|
|
git commit -m "feat: add KJB signal dashboard frontend page"
|
||
|
|
```
|
||
|
|
|
||
|
|
---
|
||
|
|
|
||
|
|
### Task 12: Final Integration & Smoke Test
|
||
|
|
|
||
|
|
**Step 1: Run all tests**
|
||
|
|
|
||
|
|
Run: `cd /home/zephyrdark/workspace/quant/galaxy-po/backend && python -m pytest tests/ -v`
|
||
|
|
Expected: All PASS
|
||
|
|
|
||
|
|
**Step 2: Start the application and verify**
|
||
|
|
|
||
|
|
Run: `cd /home/zephyrdark/workspace/quant/galaxy-po && docker compose up -d`
|
||
|
|
|
||
|
|
Verify:
|
||
|
|
- `POST /api/strategy/kjb` returns ranked stocks
|
||
|
|
- `POST /api/backtest` with `strategy_type: "kjb"` creates and starts a backtest
|
||
|
|
- `GET /api/signal/kjb/today` returns (possibly empty) signal list
|
||
|
|
- `GET /api/signal/kjb/history` returns (possibly empty) signal list
|
||
|
|
- Frontend signals page loads correctly
|
||
|
|
|
||
|
|
**Step 3: Final commit**
|
||
|
|
|
||
|
|
```bash
|
||
|
|
git add -A
|
||
|
|
git commit -m "feat: complete KJB strategy full system implementation"
|
||
|
|
```
|