galaxis-po/backend/tests/unit/test_kospi_screener.py
머니페니 0b66eae847
All checks were successful
Deploy to Production / deploy (push) Successful in 3m5s
feat: improve KJB screening workflow
2026-05-14 07:52:28 +09:00

431 lines
16 KiB
Python

import pytest
import pandas as pd
import numpy as np
from datetime import date, timedelta
from app.services.strategy.kospi_screener import (
MarketState, KOSPIMarketStateDetector, VolumeScreener, StockInfo,
ScreeningResult, SectorPortfolioManager, KJBScreeningSignalGenerator,
)
def _make_daily_index(start_date, n_days):
    """Return a business-day ``DatetimeIndex`` with ``n_days`` entries.

    The index begins at ``start_date`` and is produced by ``pd.bdate_range``,
    so weekends are skipped; no holiday calendar is passed.
    """
    business_days = pd.bdate_range(start_date, periods=n_days)
    return business_days
def _make_kospi_df(n_days=100, base_price=2500, trend="flat", crash_last=False, seed=0):
    """Generate synthetic KOSPI index OHLCV data.

    Args:
        n_days: Number of business-day rows, starting at 2025-01-01.
        base_price: Level the close series starts from.
        trend: ``"up"`` adds a linear +300 drift over the window, ``"down"``
            subtracts it, anything else keeps the series flat.
        crash_last: When true (and at least two rows exist) the final close
            is set to 97% of the previous close, i.e. a -3% day.
        seed: Seed for the local random generator. A fixed default keeps the
            fixture identical between runs so the tests using it are
            reproducible.

    Returns:
        DataFrame indexed by ``date`` with ``open``, ``high``, ``low``,
        ``close`` and ``volume`` columns.
    """
    # Local generator: does not depend on, or disturb, the global numpy state.
    rng = np.random.default_rng(seed)
    idx = _make_daily_index("2025-01-01", n_days)

    noise = rng.standard_normal(n_days) * 5
    if trend == "up":
        drift = np.linspace(0, 300, n_days)
    elif trend == "down":
        drift = -np.linspace(0, 300, n_days)
    else:
        drift = np.zeros(n_days)
    prices = base_price + drift + noise

    if crash_last and len(prices) >= 2:
        prices[-1] = prices[-2] * 0.97  # -3% crash

    df = pd.DataFrame({
        "open": prices * 0.999,
        "high": prices * 1.005,
        "low": prices * 0.995,
        "close": prices,
        "volume": rng.integers(1_000_000, 10_000_000, n_days),
    }, index=idx)
    df.index.name = "date"
    return df
def _make_stock_df(target_date, n_days=260, base_price=50000, limit_up_on_target=False,
                   big_volume_on_target=False, seed=0):
    """Generate synthetic daily price data for a single stock.

    Args:
        target_date: Day the optional events are applied to. May be a
            ``datetime.date``, ``datetime`` or ``pd.Timestamp``.
        n_days: Number of business days generated before any extension.
        base_price: Centre of the noisy close series.
        limit_up_on_target: Set the target day's close and high to +30% of
            the previous close.
        big_volume_on_target: Set the target day's volume to 5,000,000 and
            recompute its trading value from the (possibly updated) close.
        seed: Seed for the local random generator, so the fixture is
            reproducible between runs.

    Returns:
        DataFrame indexed by ``date`` with ``open``, ``high``, ``low``,
        ``close``, ``volume`` and ``trading_value`` columns.
    """
    rng = np.random.default_rng(seed)

    # A plain ``datetime.date`` is not matched by DatetimeIndex membership or
    # ``get_loc``, which would silently skip the target-day adjustments below.
    # Normalise to a midnight Timestamp once and use that everywhere.
    target_ts = pd.Timestamp(target_date).normalize()

    idx = _make_daily_index(target_ts - timedelta(days=int(n_days * 1.5)), n_days)
    # Make sure target_date is in the index by extending if needed.
    if target_ts not in idx:
        extra = pd.bdate_range(start=idx[-1] + timedelta(days=1), periods=20)
        idx = idx.append(extra)

    prices = base_price + rng.standard_normal(len(idx)) * 500
    volumes = rng.integers(100_000, 500_000, len(idx))
    df = pd.DataFrame({
        "open": prices * 0.999,
        "high": prices * 1.005,
        "low": prices * 0.995,
        "close": prices,
        "volume": volumes,
        "trading_value": prices * volumes,
    }, index=idx)
    df.index.name = "date"

    if target_ts in df.index:
        loc = df.index.get_loc(target_ts)
        close_col = df.columns.get_loc("close")
        if limit_up_on_target and loc > 0:
            limit_price = df.iloc[loc - 1, close_col] * 1.30  # +30% limit up
            df.iloc[loc, close_col] = limit_price
            df.iloc[loc, df.columns.get_loc("high")] = limit_price
        if big_volume_on_target:
            df.iloc[loc, df.columns.get_loc("volume")] = 5_000_000
            # Uses the close after the limit-up adjustment, when both are set.
            df.iloc[loc, df.columns.get_loc("trading_value")] = df.iloc[loc, close_col] * 5_000_000
    return df
class TestMarketState:
    """Checks ``KOSPIMarketStateDetector.detect`` on synthetic index series."""

    def test_market_state_bull(self):
        """Uptrend: close > 20MA, weekly 5MA > 20MA."""
        rising_index = _make_kospi_df(n_days=120, base_price=2500, trend="up")
        state = KOSPIMarketStateDetector().detect(rising_index)
        assert state == MarketState.bull

    def test_market_state_bear(self):
        """Downtrend: close < 20MA, weekly 5MA < 20MA."""
        falling_index = _make_kospi_df(n_days=120, base_price=2800, trend="down")
        state = KOSPIMarketStateDetector().detect(falling_index)
        assert state == MarketState.bear

    def test_market_state_crash(self):
        """Crash: daily return <= -2% (the fixture ends on a -3% day)."""
        crashing_index = _make_kospi_df(n_days=100, base_price=2500, crash_last=True)
        state = KOSPIMarketStateDetector().detect(crashing_index)
        assert state == MarketState.crash
class TestVolumeScreener:
    """Tests for ``VolumeScreener.screen`` using hand-built price frames."""

    def _make_test_data(self, target_date):
        """Create test price data and stock info for screening.

        Returns a ``(price_data, stock_info)`` pair keyed by ticker. This
        helper is not referenced by the tests in this class.
        """
        two_days = pd.to_datetime([target_date - timedelta(days=1), target_date])

        # Stock A: limit up with big trading value (should pass)
        stock_a_df = pd.DataFrame({
            "open": [49000, 50000],
            "high": [50000, 65000],
            "low": [48500, 50000],
            "close": [50000, 65000],  # +30%
            "volume": [100_000, 5_000_000],
            "trading_value": [5_000_000_000, 300_000_000_000],  # 300 billion KRW
        }, index=two_days)
        stock_a_df.index.name = "date"

        # Stock B: small trading value (should fail)
        stock_b_df = pd.DataFrame({
            "open": [10000, 10200],
            "high": [10300, 10400],
            "low": [9900, 10100],
            "close": [10000, 10300],  # +3%
            "volume": [50_000, 60_000],
            "trading_value": [500_000_000, 618_000_000],  # 618 million KRW
        }, index=two_days)
        stock_b_df.index.name = "date"

        price_data = {"005930": stock_a_df, "000660": stock_b_df}
        stock_info = {
            "005930": StockInfo(ticker="005930", name="삼성전자", market_cap=500_000_000_000_000, sector="반도체"),
            "000660": StockInfo(ticker="000660", name="SK하이닉스", market_cap=100_000_000_000_000, sector="반도체"),
        }
        return price_data, stock_info

    def test_volume_screener_limit_up(self):
        """Limit up + big trading value should be detected."""
        target_date = date(2025, 6, 2)  # Monday
        # Previous row is the Friday before (three calendar days earlier).
        stock_df = pd.DataFrame({
            "open": [49000, 50000],
            "high": [50000, 65000],
            "low": [48500, 50000],
            "close": [50000, 65000],
            "volume": [100_000, 5_000_000],
            "trading_value": [5_000_000_000, 300_000_000_000],
        }, index=pd.to_datetime([target_date - timedelta(days=3), target_date]))
        stock_df.index.name = "date"

        price_data = {"005930": stock_df}
        stock_info = {"005930": StockInfo("005930", "삼성전자", 5_000_000_000_000, "반도체")}

        screener = VolumeScreener()
        results = screener.screen(target_date, price_data, stock_info)

        assert len(results) >= 1
        # Truthiness check instead of ``== True``.
        assert results[0].is_limit_up

    def test_volume_screener_frequency_filter(self):
        """Stock with trading_value >= threshold more than 2 times should be filtered out."""
        target_date = date(2025, 6, 2)
        # DatetimeIndex lookups need a Timestamp; a plain ``date`` never matches.
        target_ts = pd.Timestamp(target_date)
        n_days = 260
        idx = _make_daily_index("2024-06-01", n_days)

        prices = np.full(n_days, 50000.0)
        volumes = np.full(n_days, 100_000)
        trading_values = prices * volumes  # 5B = well below threshold

        # Make 5 days have huge trading value (> threshold) -> frequency = 5 > 2, should be filtered
        spike_days = [i for i in (50, 100, 150, 200, 250) if i < n_days]
        trading_values[spike_days] = 300_000_000_000  # 300 billion KRW

        # Also make the target day have huge volume, whether or not the
        # generated index already contains it.
        if target_ts in idx:
            loc = idx.get_loc(target_ts)
            volumes[loc] = 5_000_000
            trading_values[loc] = 300_000_000_000
        else:
            idx = idx.append(pd.DatetimeIndex([target_ts]))
            prices = np.append(prices, 50000)
            volumes = np.append(volumes, 5_000_000)
            trading_values = np.append(trading_values, 300_000_000_000)

        df = pd.DataFrame({
            "open": prices * 0.999,
            "high": prices * 1.005,
            "low": prices * 0.995,
            "close": prices,
            "volume": volumes,
            "trading_value": trading_values,
        }, index=idx)
        df.index.name = "date"

        price_data = {"005930": df}
        stock_info = {"005930": StockInfo("005930", "삼성전자", 5_000_000_000_000, "반도체")}

        screener = VolumeScreener()
        results = screener.screen(target_date, price_data, stock_info)

        assert len(results) == 0

    def test_large_cap_exception(self):
        """Large cap (>10T) with less than 8% return should be filtered."""
        target_date = date(2025, 6, 2)
        stock_df = pd.DataFrame({
            "open": [49000, 50000],
            "high": [50500, 52000],
            "low": [48500, 49800],
            "close": [50000, 52000],  # +4%, below 8% threshold
            "volume": [100_000, 5_000_000],
            "trading_value": [5_000_000_000, 300_000_000_000],
        }, index=pd.to_datetime([target_date - timedelta(days=3), target_date]))
        stock_df.index.name = "date"

        price_data = {"005930": stock_df}
        # market_cap = 15T > LARGE_CAP_THRESHOLD (10T)
        stock_info = {"005930": StockInfo("005930", "삼성전자", 15_000_000_000_000, "반도체")}

        screener = VolumeScreener()
        results = screener.screen(target_date, price_data, stock_info)

        assert len(results) == 0
class TestSectorAllocation:
    """Checks ``SectorPortfolioManager.allocate`` weights and amounts."""

    def test_sector_allocation(self):
        """Three screened sectors in a bull market receive 40% / 30% / 20%."""
        total_capital = 100_000_000
        screened = [
            ScreeningResult("A", "주식A", 1_000_000_000_000, 50000, 0.1, 500_000_000_000, False, 1, "반도체", 2.5),
            ScreeningResult("B", "주식B", 2_000_000_000_000, 30000, 0.05, 400_000_000_000, False, 1, "자동차", 2.0),
            ScreeningResult("C", "주식C", 3_000_000_000_000, 70000, 0.08, 300_000_000_000, False, 1, "바이오", 1.5),
        ]

        allocations = SectorPortfolioManager().allocate(screened, MarketState.bull, total_capital)

        assert len(allocations) == 3
        # Expected weights in order: first 40%, second 30%, third 20%.
        for position, expected_weight in enumerate((0.40, 0.30, 0.20)):
            assert allocations[position].weight == pytest.approx(expected_weight)

        # Amounts should be weight * investable_capital (90% of total)
        investable = total_capital * 0.90
        assert allocations[0].amount == pytest.approx(investable * 0.40)

    def test_crash_no_allocation(self):
        """Crash state should return empty allocation."""
        screened = [
            ScreeningResult("A", "주식A", 1_000_000_000_000, 50000, 0.1, 500_000_000_000, False, 1, "반도체", 2.5),
        ]

        allocations = SectorPortfolioManager().allocate(screened, MarketState.crash, 100_000_000)

        assert len(allocations) == 0
class TestEntrySignal:
    """Smoke test for ``KJBScreeningSignalGenerator.check_entry``."""

    def test_entry_signal(self):
        """Entry signal when price bounces off 5MA after screen date."""
        screen_date = date(2025, 6, 2)
        idx = _make_daily_index("2025-05-20", 20)

        def close_for(position, day):
            # Pattern: steady climb, big move on the screen date, a short
            # pullback, then a bounce.
            if day < screen_date:
                return 50000 + position * 100
            if day == screen_date:
                return 55000  # big move day
            if (day - screen_date).days <= 2:
                return 53000  # pullback below 5MA
            return 55500  # bounce above 5MA

        closes = [close_for(i, ts.date()) for i, ts in enumerate(idx)]

        df = pd.DataFrame({
            "open": [c * 0.999 for c in closes],
            "high": [c * 1.005 for c in closes],
            "low": [c * 0.995 for c in closes],
            "close": closes,
            "volume": [1_000_000] * len(idx),
        }, index=idx)
        df.index.name = "date"

        signal = KJBScreeningSignalGenerator().check_entry("005930", df, screen_date)

        # Should find an entry within 2-5 days after screen_date.
        # The result may be None if the specific MA conditions aren't met with
        # this data; the call itself must at least not raise.
        if signal is None:
            return
        for key in ("entry_date", "entry_price", "stop_price"):
            assert key in signal
        assert signal["stop_price"] == df.loc[pd.Timestamp(screen_date), "low"]
class TestExitSignals:
    """Exit-rule and position-sizing checks for ``KJBScreeningSignalGenerator``."""

    def test_exit_stop_loss(self):
        """A price below the stop price yields a full ``stop_type_1`` exit."""
        entry, stop, last = 55000.0, 50000.0, 49000.0  # last is below stop
        history = pd.DataFrame({
            "close": [55000, 54000, 52000, 49000],
            "volume": [1_000_000, 1_000_000, 1_000_000, 1_000_000],
        })
        history.index.name = "date"

        signal = KJBScreeningSignalGenerator().check_exit(entry, stop, last, history)

        assert signal is not None
        assert signal["exit_type"] == "stop_type_1"
        assert signal["sell_ratio"] == 1.0

    def test_exit_max_loss(self):
        """Max loss -7% stop."""
        entry, stop = 55000.0, 50000.0
        last = 55000 * 0.92  # -8%, below -7% threshold
        history = pd.DataFrame({
            "close": [55000, 54000, 52000, last],
            "volume": [1_000_000] * 4,
        })
        history.index.name = "date"

        signal = KJBScreeningSignalGenerator().check_exit(entry, stop, last, history)

        assert signal is not None
        assert signal["exit_type"] == "stop_type_3"
        assert signal["sell_ratio"] == 1.0

    def test_exit_take_profit(self):
        """Take profit at +10%: sell 50%."""
        entry, stop, last = 50000.0, 45000.0, 56000.0  # last is +12%
        history = pd.DataFrame({
            "close": [50000, 52000, 54000, 56000],
            "volume": [1_000_000] * 4,
        })
        history.index.name = "date"

        signal = KJBScreeningSignalGenerator().check_exit(entry, stop, last, history)

        assert signal is not None
        assert signal["exit_type"] == "tp1"
        assert signal["sell_ratio"] == 0.5

    def test_position_size(self):
        """Position size based on 2% risk."""
        sizer = KJBScreeningSignalGenerator()
        # risk_per_share = 3000, allowed_risk = 2_000_000
        # qty = 2_000_000 / 3000 = 666
        shares = sizer.calculate_position_size(
            entry_price=50000,
            stop_price=47000,
            capital=100_000_000,
            risk_pct=0.02,
        )
        assert shares == 666

    def test_position_size_zero_risk(self):
        """Zero or negative risk should return 0."""
        sizer = KJBScreeningSignalGenerator()
        # Stop equal to the entry price, then stop above the entry price.
        assert sizer.calculate_position_size(50000, 50000, 100_000_000) == 0
        assert sizer.calculate_position_size(50000, 55000, 100_000_000) == 0
class TestScreeningJobInputs:
    """Checks the DB loader that feeds the KOSPI screening job."""

    def test_load_kospi_screening_inputs_uses_latest_sector(self, db):
        """DB-backed production screening inputs reflect the latest sector in StockInfo."""
        from app.models.stock import Price, Sector, Stock
        from jobs.screening_job import _load_kospi_screening_inputs

        target_date = date(2025, 6, 30)

        # One KOSPI stock and one KOSDAQ stock; the assertions below expect
        # only the KOSPI ticker to come back.
        stock_rows = (
            ("005930", "삼성전자", "KOSPI", 500_000_000_000_000),
            ("000660", "SK하이닉스", "KOSDAQ", 100_000_000_000_000),
        )
        for ticker, name, market, market_cap in stock_rows:
            db.add(Stock(
                ticker=ticker,
                name=name,
                market=market,
                market_cap=market_cap,
                base_date=target_date,
            ))

        db.add(Sector(
            ticker="005930",
            sector_code="G45",
            company_name="삼성전자",
            sector_name="반도체",
            base_date=target_date,
        ))

        # (ticker, base price, base trading value) for the 25 business days
        # ending on target_date; every value drifts by the day offset.
        price_specs = (
            ("005930", 50_000, 50_500_000_000),
            ("000660", 100_000, 100_500_000_000),
        )
        for offset, ts in enumerate(pd.bdate_range(end=target_date, periods=25)):
            for ticker, base, base_value in price_specs:
                db.add(Price(
                    ticker=ticker,
                    date=ts.date(),
                    open=base + offset,
                    high=base + 1_000 + offset,
                    low=base - 1_000 + offset,
                    close=base + 500 + offset,
                    volume=1_000_000,
                    trading_value=base_value + offset,
                ))
        db.commit()

        price_data, stock_info = _load_kospi_screening_inputs(db, target_date)

        assert set(price_data.keys()) == {"005930"}
        assert stock_info["005930"].sector == "반도체"
        assert pd.Timestamp(target_date) in price_data["005930"].index