# galaxis-po/backend/app/api/screening.py
# 2026-05-24 21:08:03 +09:00
#
# 580 lines
# 20 KiB
# Python

from datetime import date, timedelta
from decimal import Decimal
from typing import Any, List, Optional, cast
from fastapi import APIRouter, Depends, HTTPException, Query
from sqlalchemy.orm import Session
from app.core.database import get_db
from app.api.deps import CurrentUser
from app.models.screening import ScreeningSignal, AutoOrder
from app.models.stock import MarketIndex, Price, Sector, Stock
from app.schemas.screening import (
ScreeningSignalResponse, AutoOrderResponse, WatchlistItem, ScreeningSummary,
SectorStrongSignalResponse, RecentSectorCandidateResponse,
)
# Router that collects every endpoint defined in this module under the
# "screening" tag.
router = APIRouter(tags=["screening"])
# Normalisation base for trading value: signal strength and candidate scores
# divide a raw trading value by this amount (2e11).
# NOTE(review): presumably denominated in KRW -- confirm against the data source.
TRADING_VALUE_THRESHOLD = Decimal("200000000000")
# MarketIndex.code of the benchmark used for relative-strength calculations.
# NOTE(review): presumably the KOSPI composite index code -- confirm against
# whatever populates MarketIndex.
KOSPI_INDEX_CODE = "1001"
def _calculate_signal_strength(signal: ScreeningSignal) -> Decimal:
    """Return a unitless strength score for a single screening signal.

    The score is the signal's trading value expressed as a multiple of
    ``TRADING_VALUE_THRESHOLD``, doubled when the signal is flagged as
    limit-up, and scaled by ``1 + daily_return`` when the daily return is
    positive (negative returns are clamped to zero, so they never reduce the
    score). Missing trading value or daily return is treated as zero.
    """
    raw_value = cast(Optional[int], signal.trading_value)
    raw_return = cast(Optional[Decimal], signal.daily_return)

    value_multiple = Decimal(raw_value or 0) / TRADING_VALUE_THRESHOLD
    # Round-trip through str so non-Decimal numerics convert exactly.
    day_return = Decimal(str(raw_return or 0))

    if cast(bool, signal.is_limit_up):
        limit_up_factor = Decimal("2")
    else:
        limit_up_factor = Decimal("1")

    return_factor = Decimal("1") + max(day_return, Decimal("0"))
    return value_multiple * limit_up_factor * return_factor
def _price_trading_value(row: Price) -> int:
    """Return the trading value of a price row.

    Uses the stored ``trading_value`` when it is present; otherwise falls
    back to ``close * volume`` truncated to an integer.
    """
    stored_value = cast(Optional[int], row.trading_value)
    if stored_value is None:
        # No recorded value: approximate it from the close and the volume.
        estimated = cast(Decimal, row.close) * cast(int, row.volume)
        return int(estimated)
    return stored_value
def _kospi_return(db: Session, start_date: date, end_date: date) -> Decimal:
    """Return the benchmark index return between two dates (inclusive).

    Loads the ``MarketIndex`` rows whose code is ``KOSPI_INDEX_CODE`` inside
    the window, ordered by date, and returns ``last_close / first_close - 1``.
    Returns ``Decimal("0")`` when fewer than two rows exist or when the first
    close is not positive.
    """
    window_filters = (
        MarketIndex.code == KOSPI_INDEX_CODE,
        MarketIndex.date >= start_date,
        MarketIndex.date <= end_date,
    )
    index_rows = (
        db.query(MarketIndex)
        .filter(*window_filters)
        .order_by(MarketIndex.date)
        .all()
    )

    flat = Decimal("0")
    if len(index_rows) < 2:
        return flat

    opening_close = cast(Decimal, index_rows[0].close)
    if opening_close <= 0:
        # Guard against division by zero / meaningless ratios.
        return flat

    closing_close = cast(Decimal, index_rows[-1].close)
    return (closing_close / opening_close) - Decimal("1")
def _score_recent_candidate(
    *,
    ticker: str,
    name: Optional[str],
    sector: str,
    rows: list[Price],
    kospi_return: Decimal,
) -> Optional[dict[str, Any]]:
    """Score one ticker from its price rows over the lookback window.

    Args:
        ticker: Ticker being scored (copied into the result).
        name: Display name (copied into the result, may be ``None``).
        sector: Sector label (copied into the result).
        rows: Price rows for the ticker; they are sorted by date here.
        kospi_return: Benchmark return over the same window, subtracted from
            the ticker's window return to obtain relative strength.

    Returns:
        ``None`` when fewer than two rows are supplied or when the first or
        the previous close is not positive. Otherwise a dict of metrics whose
        ``score`` is the sum of:

        * positive relative strength x 100
        * positive daily return x 50
        * trading-value ratio versus the recent average (capped at 5) x 10
        * latest trading value / ``TRADING_VALUE_THRESHOLD`` x 20
        * 15 when the latest close is above the previous row's high
        * 10 when the latest close is at or above the average of the last
          (up to) five closes

        ``is_stronger_than_source`` is always ``False`` here; callers set it.
    """
    if len(rows) < 2:
        return None

    history = sorted(rows, key=lambda price_row: cast(date, price_row.date))
    oldest, prior, newest = history[0], history[-2], history[-1]

    base_close = cast(Decimal, oldest.close)
    last_close = cast(Decimal, newest.close)
    prior_close = cast(Decimal, prior.close)
    if base_close <= 0 or prior_close <= 0:
        return None

    zero = Decimal("0")
    one = Decimal("1")

    day_change = (last_close / prior_close) - one
    # Named "one month" in the response, but it spans whatever rows were given.
    window_change = (last_close / base_close) - one
    excess_return = window_change - kospi_return

    latest_value = _price_trading_value(newest)
    window_values = [_price_trading_value(price_row) for price_row in history[-20:]]
    average_value = int(sum(window_values) / len(window_values)) if window_values else 0
    if average_value > 0:
        value_ratio = Decimal(latest_value) / Decimal(average_value)
    else:
        value_ratio = zero

    last_five_closes = [cast(Decimal, price_row.close) for price_row in history[-5:]]
    short_average = sum(last_five_closes) / Decimal(len(last_five_closes))
    holds_short_average = last_close >= short_average
    broke_prior_high = last_close > cast(Decimal, prior.high)

    # Accumulate in a fixed order so Decimal rounding matches term-by-term.
    total = max(excess_return, zero) * Decimal("100")
    total = total + max(day_change, zero) * Decimal("50")
    total = total + min(value_ratio, Decimal("5")) * Decimal("10")
    total = total + (Decimal(latest_value) / TRADING_VALUE_THRESHOLD) * Decimal("20")
    total = total + (Decimal("15") if broke_prior_high else zero)
    total = total + (Decimal("10") if holds_short_average else zero)

    return {
        "ticker": ticker,
        "name": name,
        "sector": sector,
        "latest_date": cast(date, newest.date),
        "close_price": last_close,
        "daily_return": day_change,
        "one_month_return": window_change,
        "relative_strength": excess_return,
        "trading_value": latest_value,
        "avg_trading_value_20": average_value,
        "trading_value_ratio": value_ratio,
        "ma5_support": holds_short_average,
        "breakout": broke_prior_high,
        "score": total,
        "is_stronger_than_source": False,
    }
@router.get("/api/screening/today", response_model=List[ScreeningSignalResponse])
async def get_today_screening(
    current_user: CurrentUser,
    db: Session = Depends(get_db),
):
    """Return the screening signals dated today, ordered by ticker.

    "Today" is ``date.today()``, i.e. the server's local date.
    """
    todays_signals = db.query(ScreeningSignal).filter(
        ScreeningSignal.screen_date == date.today()
    )
    return todays_signals.order_by(ScreeningSignal.ticker).all()
@router.get("/api/screening/history", response_model=List[ScreeningSignalResponse])
async def get_screening_history(
    current_user: CurrentUser,
    db: Session = Depends(get_db),
    start_date: Optional[date] = Query(None),
    end_date: Optional[date] = Query(None),
    ticker: Optional[str] = Query(None),
    status: Optional[str] = Query(None),
    limit: int = Query(100, ge=1, le=1000),
):
    """Return past screening signals, newest first.

    Every filter is optional and only applied when a truthy value is given:
    an inclusive ``start_date``/``end_date`` range on the screen date, an
    exact ``ticker`` match and an exact ``status`` match. Results are ordered
    by screen date (descending) then ticker, and capped at ``limit`` rows.
    """
    criteria = []
    if start_date:
        criteria.append(ScreeningSignal.screen_date >= start_date)
    if end_date:
        criteria.append(ScreeningSignal.screen_date <= end_date)
    if ticker:
        criteria.append(ScreeningSignal.ticker == ticker)
    if status:
        criteria.append(ScreeningSignal.status == status)

    history_query = db.query(ScreeningSignal)
    for criterion in criteria:
        history_query = history_query.filter(criterion)

    newest_first = history_query.order_by(
        ScreeningSignal.screen_date.desc(),
        ScreeningSignal.ticker,
    )
    return newest_first.limit(limit).all()
@router.get("/api/screening/watchlist", response_model=List[WatchlistItem])
async def get_watchlist(
    current_user: CurrentUser,
    db: Session = Depends(get_db),
):
    """Return the signals still awaiting entry.

    Selects signals whose status is ``pending`` or ``watching``, ordered by
    screen date (newest first) and then by ticker.
    """
    open_statuses = ["pending", "watching"]
    open_signals = db.query(ScreeningSignal).filter(
        ScreeningSignal.status.in_(open_statuses)
    )
    newest_first = open_signals.order_by(
        ScreeningSignal.screen_date.desc(),
        ScreeningSignal.ticker,
    )
    return newest_first.all()
@router.get("/api/screening/sector-strongest", response_model=List[SectorStrongSignalResponse])
async def get_sector_strongest_signals(
    current_user: CurrentUser,
    db: Session = Depends(get_db),
    target_date: Optional[date] = Query(None),
):
    """Return, per sector, the strongest buy candidate among the detected screening signals.

    Signals for ``target_date`` (default: today) are grouped by sector, with
    signals lacking a sector placed in a shared fallback bucket. Within each
    sector the leader is the signal with the highest signal strength; ties
    fall back to daily return, trading value and finally ticker. The result
    is sorted by leader strength, then sector name, both descending.
    """
    screen_date = target_date or date.today()
    day_signals = db.query(ScreeningSignal).filter(
        ScreeningSignal.screen_date == screen_date
    ).all()

    by_sector: dict[str, list[ScreeningSignal]] = {}
    for entry in day_signals:
        sector_label = cast(Optional[str], entry.sector) or "미분류"
        if sector_label not in by_sector:
            by_sector[sector_label] = []
        by_sector[sector_label].append(entry)

    def ranking_key(candidate: ScreeningSignal):
        # Strength first; the remaining fields only break ties.
        return (
            _calculate_signal_strength(candidate),
            Decimal(str(cast(Optional[Decimal], candidate.daily_return) or 0)),
            Decimal(cast(Optional[int], candidate.trading_value) or 0),
            cast(str, candidate.ticker),
        )

    leaders: list[dict[str, Any]] = []
    for sector_label, members in by_sector.items():
        top = max(members, key=ranking_key)
        leaders.append({
            "sector": sector_label,
            "signal_count": len(members),
            "id": cast(int, top.id),
            "screen_date": cast(date, top.screen_date),
            "ticker": cast(str, top.ticker),
            "name": cast(Optional[str], top.name),
            "trading_value": cast(Optional[int], top.trading_value),
            "is_limit_up": cast(bool, top.is_limit_up),
            "daily_return": cast(Optional[Decimal], top.daily_return),
            "signal_strength": _calculate_signal_strength(top),
            "status": cast(str, top.status),
        })

    return sorted(
        leaders,
        key=lambda leader: (leader["signal_strength"], leader["sector"]),
        reverse=True,
    )
@router.get("/api/screening/recent-sector-candidates", response_model=List[RecentSectorCandidateResponse])
async def get_recent_sector_candidates(
    current_user: CurrentUser,
    db: Session = Depends(get_db),
    as_of: Optional[date] = Query(None),
    window_days: int = Query(30, ge=5, le=60),
    limit_per_signal: int = Query(5, ge=1, le=20),
):
    """Find stronger candidate stocks in the sectors of recent KJB buy signals.

    For every screening signal inside the lookback window that has a sector,
    the other KOSPI stocks of that sector are scored with
    ``_score_recent_candidate`` and those scoring strictly higher than the
    signal's own ticker are reported.

    Args:
        as_of: Window end date. Defaults to the most recent ``Price.date`` in
            the database, or today when there are no prices.
        window_days: Length of the lookback window in calendar days.
        limit_per_signal: Maximum number of candidates listed per signal
            (``stronger_count`` still reports the uncapped total).

    Returns:
        One dict per signal, sorted by ``stronger_count`` and then
        ``screen_date``, both descending.
    """
    if as_of is not None:
        end_date = as_of
    else:
        # Only hit the price table when the caller did not pin the end date.
        latest_price = (
            db.query(Price)
            .order_by(Price.date.desc())
            .first()
        )
        end_date = cast(date, latest_price.date) if latest_price else date.today()
    start_date = end_date - timedelta(days=window_days)
    kospi_return = _kospi_return(db, start_date, end_date)
    signals = (
        db.query(ScreeningSignal)
        .filter(
            ScreeningSignal.screen_date >= start_date,
            ScreeningSignal.screen_date <= end_date,
        )
        .order_by(ScreeningSignal.screen_date.desc(), ScreeningSignal.ticker)
        .all()
    )

    # Per-request memoisation: several signals usually share a sector, and the
    # sector/stock/price lookups depend only on the ticker universe, so each
    # distinct universe is queried once instead of once per signal.
    sector_members: dict[str, list[str]] = {}
    names_by_universe: dict[tuple[str, ...], dict[str, str]] = {}
    prices_by_universe: dict[tuple[str, ...], dict[str, list[Price]]] = {}

    responses: list[dict[str, Any]] = []
    for signal in signals:
        signal_id = cast(int, signal.id)
        source_ticker = cast(str, signal.ticker)
        sector = cast(Optional[str], signal.sector)
        if not sector:
            continue

        if sector not in sector_members:
            sector_rows = (
                db.query(Sector)
                .filter(Sector.sector_name == sector)
                .all()
            )
            sector_members[sector] = sorted({cast(str, row.ticker) for row in sector_rows})
        # Copy before appending so the cached list stays source-independent.
        sector_tickers = list(sector_members[sector])
        if source_ticker not in sector_tickers:
            sector_tickers.append(source_ticker)

        universe_key = tuple(sector_tickers)
        if universe_key not in names_by_universe:
            stock_rows = (
                db.query(Stock)
                .filter(Stock.ticker.in_(sector_tickers), Stock.market == "KOSPI")
                .all()
            )
            names_by_universe[universe_key] = {
                cast(str, stock.ticker): cast(str, stock.name) for stock in stock_rows
            }
        # Copy: the source ticker's name may be injected below.
        stock_names = dict(names_by_universe[universe_key])
        candidate_tickers = sorted(stock_names)
        if source_ticker not in candidate_tickers:
            # Keep the signal's own ticker even when it is not a listed KOSPI
            # stock, so it can still be scored as the comparison baseline.
            candidate_tickers.append(source_ticker)
            stock_names[source_ticker] = cast(Optional[str], signal.name) or source_ticker

        price_key = tuple(candidate_tickers)
        if price_key not in prices_by_universe:
            price_rows = (
                db.query(Price)
                .filter(
                    Price.ticker.in_(candidate_tickers),
                    Price.date >= start_date,
                    Price.date <= end_date,
                )
                .order_by(Price.ticker, Price.date)
                .all()
            )
            grouped: dict[str, list[Price]] = {}
            for row in price_rows:
                grouped.setdefault(cast(str, row.ticker), []).append(row)
            prices_by_universe[price_key] = grouped
        rows_by_ticker = prices_by_universe[price_key]

        source_metrics = _score_recent_candidate(
            ticker=source_ticker,
            name=stock_names.get(source_ticker, cast(Optional[str], signal.name)),
            sector=sector,
            rows=rows_by_ticker.get(source_ticker, []),
            kospi_return=kospi_return,
        )
        if source_metrics is None:
            # Not enough price history for the source: fall back to the
            # signal's own strength as the baseline score.
            # NOTE(review): this is computed by a different formula than the
            # candidate score -- confirm the two are meant to be compared.
            source_metrics = {
                "score": _calculate_signal_strength(signal),
                "one_month_return": Decimal("0"),
                "relative_strength": Decimal("0"),
            }
        source_score = cast(Decimal, source_metrics["score"])

        candidates: list[dict[str, Any]] = []
        for ticker in candidate_tickers:
            if ticker == source_ticker:
                continue
            metrics = _score_recent_candidate(
                ticker=ticker,
                name=stock_names.get(ticker),
                sector=sector,
                rows=rows_by_ticker.get(ticker, []),
                kospi_return=kospi_return,
            )
            if metrics is None:
                continue
            # Only strictly stronger peers are reported.
            if not cast(Decimal, metrics["score"]) > source_score:
                continue
            metrics["is_stronger_than_source"] = True
            candidates.append(metrics)
        candidates.sort(key=lambda item: cast(Decimal, item["score"]), reverse=True)
        limited_candidates = candidates[:limit_per_signal]

        responses.append({
            "signal_id": signal_id,
            "screen_date": cast(date, signal.screen_date),
            "ticker": source_ticker,
            "name": cast(Optional[str], signal.name),
            "sector": sector,
            "source_score": source_score,
            "source_one_month_return": cast(Decimal, source_metrics["one_month_return"]),
            "source_relative_strength": cast(Decimal, source_metrics["relative_strength"]),
            "stronger_count": len(candidates),
            "candidates": limited_candidates,
        })
    responses.sort(key=lambda item: (item["stronger_count"], item["screen_date"]), reverse=True)
    return responses
@router.post("/api/screening/execute", response_model=dict)
async def execute_screening(
    current_user: CurrentUser,
    db: Session = Depends(get_db),
    execute_orders: bool = Query(False, description="true일 때만 KIS 주문 전송"),
):
    """Evaluate pending/watching signals and optionally place buy orders.

    For each signal whose status is ``pending`` or ``watching`` and whose
    ticker is not already held, the entry condition is checked against recent
    prices, a position size is derived from account capital and the configured
    risk percentage, and the quantity is capped by the configured maximum
    order amount.

    With ``execute_orders`` false (the default) nothing is sent to the broker:
    the planned entry is stored on the signal and reported as ``planned``.
    With ``execute_orders`` true a buy order is placed through the KIS
    executor and recorded as an ``AutoOrder``.

    Returns:
        A dict with a summary ``message``, the ``execute_orders`` flag and the
        per-signal ``orders`` results.

    Raises:
        HTTPException: 400 when KIS credentials are not configured or when
            the account balance is not positive.
    """
    # Imported lazily, as in the rest of this module's trading endpoints.
    from app.core.config import get_settings
    from app.services.trading.kis_executor import KISTradeExecutor
    from app.services.strategy.kospi_screener import KJBScreeningSignalGenerator
    from jobs.screening_job import _latest_price_date, _price_rows_to_frame
    from datetime import datetime

    settings = get_settings()
    if not settings.kis_app_key or not settings.kis_app_secret:
        raise HTTPException(status_code=400, detail="KIS API credentials not configured")

    watching = (
        db.query(ScreeningSignal)
        .filter(ScreeningSignal.status.in_(["pending", "watching"]))
        .all()
    )
    if not watching:
        return {"message": "No watching signals to execute", "orders": [], "execute_orders": execute_orders}

    executor = KISTradeExecutor(
        app_key=settings.kis_app_key,
        app_secret=settings.kis_app_secret,
        account_no=settings.kis_account_no,
        paper_trade=settings.kis_paper_trade,
    )
    balance = executor.get_account_balance()
    capital = balance.total_amount or balance.available_amount
    if capital <= 0:
        raise HTTPException(status_code=400, detail="KIS account balance unavailable")

    signal_gen = KJBScreeningSignalGenerator()
    latest_date = _latest_price_date(db, market="KOSPI") or date.today()
    held_tickers = {position.ticker for position in executor.get_positions()}

    results = []
    for signal in watching:
        signal_id = cast(int, signal.id)
        signal_ticker = cast(str, signal.ticker)
        signal_screen_date = cast(date, signal.screen_date)

        if signal_ticker in held_tickers:
            results.append({
                "ticker": signal_ticker,
                "success": False,
                "status": "skipped",
                "message": "Already held",
            })
            continue

        # Price history from 20 calendar days before the signal up to the
        # latest available price date.
        rows = (
            db.query(Price)
            .filter(
                Price.ticker == signal_ticker,
                Price.date >= (signal_screen_date - timedelta(days=20)),
                Price.date <= latest_date,
            )
            .order_by(Price.date)
            .all()
        )
        price_df = _price_rows_to_frame(rows)
        entry = signal_gen.check_entry(signal_ticker, price_df, signal_screen_date)
        if not entry:
            results.append({
                "ticker": signal_ticker,
                "success": False,
                "status": "waiting",
                "message": "Entry condition not met",
            })
            continue

        entry_price = float(entry["entry_price"])
        stop_price = float(entry["stop_price"])
        if entry_price > 0:
            qty = signal_gen.calculate_position_size(
                entry_price=entry_price,
                stop_price=stop_price,
                capital=capital,
                risk_pct=settings.screening_risk_pct,
            )
            max_qty_by_order = int(settings.screening_max_order_amount / entry_price)
            qty = min(qty, max_qty_by_order)
        else:
            # A non-positive entry price cannot be sized (and would divide by
            # zero above); treat it like any other zero-quantity result.
            qty = 0
        if qty <= 0:
            results.append({
                "ticker": signal_ticker,
                "success": False,
                "status": "skipped",
                "entry_price": entry_price,
                "stop_price": stop_price,
                "qty": 0,
                "message": "Calculated quantity is zero",
            })
            continue

        # entry_date may be a timestamp-like object or already a date.
        entry_date = entry["entry_date"].date() if hasattr(entry["entry_date"], "date") else entry["entry_date"]
        setattr(signal, "entry_date", entry_date)
        setattr(signal, "entry_price", Decimal(str(entry_price)))

        if not execute_orders:
            setattr(signal, "status", "watching")
            results.append({
                "ticker": signal_ticker,
                "success": True,
                "status": "planned",
                "entry_price": entry_price,
                "stop_price": stop_price,
                "qty": qty,
                "risk_amount": (entry_price - stop_price) * qty,
                "message": "Dry-run only. Add execute_orders=true to place KIS order.",
            })
            continue

        order = executor.place_buy_order(ticker=signal_ticker, qty=qty, price=int(entry_price))
        # NOTE(review): a successful submission is recorded as "filled";
        # confirm whether the executor reports acceptance or an actual fill.
        auto_order = AutoOrder(
            order_date=datetime.now(),
            ticker=signal_ticker,
            order_type="buy",
            qty=qty,
            price=entry_price,
            order_no=order.order_no,
            status="filled" if order.success else "rejected",
            screening_signal_id=signal_id,
        )
        db.add(auto_order)
        if order.success:
            setattr(signal, "status", "entered")
            held_tickers.add(signal_ticker)
        # Persist immediately: the order has already reached the broker, so
        # its record must survive a failure while processing a later signal.
        db.commit()
        results.append({
            "ticker": signal_ticker,
            "success": order.success,
            "status": "ordered" if order.success else "rejected",
            "order_no": order.order_no,
            "entry_price": entry_price,
            "stop_price": stop_price,
            "qty": qty,
            "message": order.message,
        })

    # Flush the remaining signal updates (e.g. dry-run plans).
    db.commit()
    ordered_count = sum(1 for item in results if item.get("status") == "ordered")
    planned_count = sum(1 for item in results if item.get("status") == "planned")
    return {
        "message": f"ordered={ordered_count}, planned={planned_count}, total={len(results)}",
        "execute_orders": execute_orders,
        "orders": results,
    }
@router.get("/api/trading/orders", response_model=List[AutoOrderResponse])
async def get_orders(
    current_user: CurrentUser,
    db: Session = Depends(get_db),
    limit: int = Query(100, ge=1, le=1000),
):
    """Return the most recent automatic orders, newest first, capped at ``limit``."""
    newest_first = db.query(AutoOrder).order_by(AutoOrder.order_date.desc())
    return newest_first.limit(limit).all()
@router.get("/api/trading/positions", response_model=dict)
async def get_trading_positions(
    current_user: CurrentUser,
):
    """Return the positions reported by the KIS executor.

    Raises:
        HTTPException: 400 when the KIS app key or secret is not configured.
    """
    from app.core.config import get_settings
    from app.services.trading.kis_executor import KISTradeExecutor

    settings = get_settings()
    credentials_present = settings.kis_app_key and settings.kis_app_secret
    if not credentials_present:
        raise HTTPException(status_code=400, detail="KIS API credentials not configured")

    executor = KISTradeExecutor(
        app_key=settings.kis_app_key,
        app_secret=settings.kis_app_secret,
        account_no=settings.kis_account_no,
        paper_trade=settings.kis_paper_trade,
    )

    # Fields copied from each position object into the response, in order.
    exported_fields = (
        "ticker",
        "name",
        "qty",
        "avg_price",
        "current_price",
        "pnl_amount",
        "pnl_rate",
    )
    serialized = []
    for holding in executor.get_positions():
        serialized.append({field: getattr(holding, field) for field in exported_fields})
    return {"positions": serialized}
@router.get("/api/trading/balance", response_model=dict)
async def get_trading_balance(
    current_user: CurrentUser,
):
    """Return the account balance figures reported by the KIS executor.

    Raises:
        HTTPException: 400 when the KIS app key or secret is not configured.
    """
    from app.core.config import get_settings
    from app.services.trading.kis_executor import KISTradeExecutor

    settings = get_settings()
    credentials_present = settings.kis_app_key and settings.kis_app_secret
    if not credentials_present:
        raise HTTPException(status_code=400, detail="KIS API credentials not configured")

    executor = KISTradeExecutor(
        app_key=settings.kis_app_key,
        app_secret=settings.kis_app_secret,
        account_no=settings.kis_account_no,
        paper_trade=settings.kis_paper_trade,
    )
    account = executor.get_account_balance()

    # Fields copied from the balance object into the response, in order.
    exported_fields = (
        "total_amount",
        "available_amount",
        "stock_amount",
        "pnl_amount",
    )
    return {field: getattr(account, field) for field in exported_fields}