galaxis-po/backend/app/services/strategy/kospi_screener.py
머니페니 34d09d9d34
Some checks failed
Deploy to Production / deploy (push) Failing after 6m46s
feat: 김종봉식 KOSPI 종목발굴 전략 구현
- KOSPIMarketStateDetector: KOSPI MA 기반 시장 상태 판단 (bull/neutral/bear/crash)
- VolumeScreener: 거래대금 2000억+ 스크리닝 (상한가 우선, 희소성 체크, 대형주 예외)
- SectorPortfolioManager: 섹터 기반 비중 배분
- KJBScreeningSignalGenerator: 눌림목 진입, 5MA 손절, 단계적 익절
- KISTradeExecutor: KIS API 자동 매수/매도 (기본값 모의투자)
- ScreeningSignal / AutoOrder DB 모델 추가
- screening API 엔드포인트 추가
- 스케줄러 잡 3종 추가 (08:30/5분/15:35)
- Price.trading_value 컬럼 추가
- MarketIndex 테이블 추가 (KOSPI/KOSDAQ 지수 일봉)
- IndexCollector 추가 (일일 수집 잡 등록)
- intraday_exit_check 시간 필터 추가 (09:05~15:20 KST)
- 드라이런 스크립트 추가 (scripts/screening_dryrun.py)
2026-05-05 23:03:53 +09:00

361 lines
10 KiB
Python

from enum import Enum
from dataclasses import dataclass
from typing import Dict, List, Optional
from datetime import date
import pandas as pd
import numpy as np
class MarketState(str, Enum):
bull = "bull"
neutral = "neutral"
bear = "bear"
crash = "crash"
class KOSPIMarketStateDetector:
def detect(self, kospi_df: pd.DataFrame) -> MarketState:
df = kospi_df.copy()
if "date" in df.columns and df.index.name != "date":
df = df.set_index("date")
df = df.sort_index()
latest_close = df["close"].iloc[-1]
daily_return = (latest_close / df["close"].iloc[-2]) - 1 if len(df) >= 2 else 0
if daily_return <= -0.02:
return MarketState.crash
ma_20 = df["close"].rolling(window=20).mean().iloc[-1]
weekly_df = df.resample("W-FRI").agg({
"open": "first",
"high": "max",
"low": "min",
"close": "last",
"volume": "sum"
}).dropna()
weekly_ma_5 = weekly_df["close"].rolling(window=5).mean().iloc[-1]
weekly_ma_20 = weekly_df["close"].rolling(window=20).mean().iloc[-1]
if latest_close > ma_20 and weekly_ma_5 > weekly_ma_20:
return MarketState.bull
elif latest_close < ma_20 and weekly_ma_5 < weekly_ma_20:
return MarketState.bear
else:
return MarketState.neutral
@dataclass
class ScreeningResult:
ticker: str
name: str
market_cap: int
close_price: float
daily_return: float
trading_value: float
is_limit_up: bool
frequency_count: int
sector: str
signal_strength: float
@dataclass
class StockInfo:
ticker: str
name: str
market_cap: int
sector: str
class VolumeScreener:
TRADING_VALUE_THRESHOLD = 200_000_000_000
LARGE_CAP_THRESHOLD = 10_000_000_000_000
def screen(
self,
target_date: date,
price_data: Dict[str, pd.DataFrame],
stock_info: Dict[str, StockInfo]
) -> List[ScreeningResult]:
results = []
for ticker, df in price_data.items():
if ticker not in stock_info:
continue
info = stock_info[ticker]
df = df.copy()
if "date" in df.columns and df.index.name != "date":
df = df.set_index("date")
df = df.sort_index()
df.index = pd.to_datetime(df.index)
target_ts = pd.Timestamp(target_date)
if target_ts not in df.index:
continue
target_idx = df.index.get_loc(target_ts)
if target_idx == 0:
continue
current_row = df.iloc[target_idx]
prev_row = df.iloc[target_idx - 1]
close_price = current_row["close"]
prev_close = prev_row["close"]
daily_return = (close_price / prev_close) - 1
if "trading_value" in df.columns:
trading_value = current_row["trading_value"]
else:
trading_value = close_price * current_row["volume"]
if trading_value < self.TRADING_VALUE_THRESHOLD:
continue
is_limit_up = daily_return >= 0.299
lookback_start = max(0, target_idx - 251)
lookback_df = df.iloc[lookback_start:target_idx + 1]
if "trading_value" in lookback_df.columns:
frequency_count = (lookback_df["trading_value"] >= self.TRADING_VALUE_THRESHOLD).sum()
else:
lookback_trading_values = lookback_df["close"] * lookback_df["volume"]
frequency_count = (lookback_trading_values >= self.TRADING_VALUE_THRESHOLD).sum()
if frequency_count > 2:
continue
if info.market_cap >= self.LARGE_CAP_THRESHOLD:
if daily_return < 0.08:
continue
signal_strength = (trading_value / self.TRADING_VALUE_THRESHOLD) * (2.0 if is_limit_up else 1.0)
results.append(ScreeningResult(
ticker=ticker,
name=info.name,
market_cap=info.market_cap,
close_price=close_price,
daily_return=daily_return,
trading_value=trading_value,
is_limit_up=is_limit_up,
frequency_count=frequency_count,
sector=info.sector,
signal_strength=signal_strength
))
results.sort(key=lambda x: (not x.is_limit_up, -x.daily_return if x.daily_return > 0 else 0, -x.trading_value))
sector_groups: Dict[str, List[ScreeningResult]] = {}
for result in results:
if result.sector not in sector_groups:
sector_groups[result.sector] = []
sector_groups[result.sector].append(result)
filtered_results = []
for sector, sector_results in sector_groups.items():
sector_results.sort(key=lambda x: -x.trading_value)
filtered_results.extend(sector_results[:2])
return filtered_results
@dataclass
class SectorAllocation:
sector: str
tickers: List[str]
weight: float
amount: float
class SectorPortfolioManager:
SECTOR_WEIGHTS = {1: 0.40, 2: 0.30, 3: 0.20}
CASH_RESERVE = 0.10
MAX_SECTORS = {"bull": 5, "neutral": 3, "bear": 3, "crash": 0}
def allocate(
self,
screening_results: List[ScreeningResult],
market_state: MarketState,
capital: float
) -> List[SectorAllocation]:
max_sectors = self.MAX_SECTORS[market_state.value]
if max_sectors == 0:
return []
sector_groups: Dict[str, List[ScreeningResult]] = {}
for result in screening_results:
if result.sector not in sector_groups:
sector_groups[result.sector] = []
sector_groups[result.sector].append(result)
sector_rankings = []
for sector, results in sector_groups.items():
total_trading_value = sum(r.trading_value for r in results)
sector_rankings.append((sector, total_trading_value, results))
sector_rankings.sort(key=lambda x: -x[1])
top_sectors = sector_rankings[:max_sectors]
allocations = []
investable_capital = capital * (1 - self.CASH_RESERVE)
for idx, (sector, _, results) in enumerate(top_sectors, start=1):
if idx in self.SECTOR_WEIGHTS:
weight = self.SECTOR_WEIGHTS[idx]
else:
remaining_weight = 1.0 - sum(self.SECTOR_WEIGHTS.values())
remaining_sectors = max_sectors - len(self.SECTOR_WEIGHTS)
weight = remaining_weight / remaining_sectors if remaining_sectors > 0 else 0
amount = investable_capital * weight
tickers = [r.ticker for r in results]
allocations.append(SectorAllocation(
sector=sector,
tickers=tickers,
weight=weight,
amount=amount
))
return allocations
class KJBScreeningSignalGenerator:
def check_entry(
self,
ticker: str,
price_df: pd.DataFrame,
screen_date: date
) -> Optional[dict]:
df = price_df.copy()
if "date" in df.columns and df.index.name != "date":
df = df.set_index("date")
df = df.sort_index()
df.index = pd.to_datetime(df.index)
screen_ts = pd.Timestamp(screen_date)
if screen_ts not in df.index:
return None
screen_idx = df.index.get_loc(screen_ts)
screen_row = df.iloc[screen_idx]
stop_price = screen_row["low"]
df["ma_5"] = df["close"].rolling(window=5).mean()
for i in range(screen_idx + 1, min(screen_idx + 6, len(df))):
current_row = df.iloc[i]
prev_row = df.iloc[i - 1]
ma_5 = current_row["ma_5"]
if pd.isna(ma_5):
continue
close = current_row["close"]
prev_close = prev_row["close"]
prev_high = prev_row["high"]
prev_ma_5 = prev_row["ma_5"]
condition_1 = close >= ma_5
condition_2 = (prev_close < prev_ma_5 if not pd.isna(prev_ma_5) else False) or (close > prev_high)
if condition_1 and condition_2:
return {
"entry_date": df.index[i],
"entry_price": close,
"stop_price": stop_price
}
return None
def check_exit(
self,
entry_price: float,
stop_price: float,
current_price: float,
price_df: pd.DataFrame,
partial_sold: bool = False
) -> Optional[dict]:
df = price_df.copy()
if "date" in df.columns and df.index.name != "date":
df = df.set_index("date")
df = df.sort_index()
if current_price <= stop_price:
return {
"exit_type": "stop_type_1",
"exit_price": current_price,
"sell_ratio": 1.0
}
if current_price <= entry_price * 0.93:
return {
"exit_type": "stop_type_3",
"exit_price": current_price,
"sell_ratio": 1.0
}
df["ma_5"] = df["close"].rolling(window=5).mean()
df["volume_ma_20"] = df["volume"].rolling(window=20).mean()
current_row = df.iloc[-1]
ma_5 = current_row["ma_5"]
volume_ma_20 = current_row["volume_ma_20"]
if not pd.isna(ma_5) and not pd.isna(volume_ma_20):
if current_price < ma_5 and current_row["volume"] >= volume_ma_20 * 1.3:
return {
"exit_type": "stop_type_2",
"exit_price": current_price,
"sell_ratio": 1.0
}
if current_price >= entry_price * 1.10 and not partial_sold:
return {
"exit_type": "tp1",
"exit_price": current_price,
"sell_ratio": 0.5
}
if partial_sold and not pd.isna(ma_5) and current_price < ma_5:
return {
"exit_type": "tp2_trailing",
"exit_price": current_price,
"sell_ratio": 1.0
}
return None
def calculate_position_size(
self,
entry_price: float,
stop_price: float,
capital: float,
risk_pct: float = 0.02
) -> int:
risk_per_share = entry_price - stop_price
if risk_per_share <= 0:
return 0
allowed_risk = capital * risk_pct
qty = int(allowed_risk / risk_per_share)
return qty