zephyrdark 3c969fc53c feat: wire KJB into backtest worker, add Signal API, add scheduler job
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-19 15:16:13 +09:00

57 lines
1.5 KiB
Python

"""
KJB Signal API endpoints.
"""
from datetime import date
from typing import List, Optional
from fastapi import APIRouter, Depends, Query
from sqlalchemy.orm import Session
from app.core.database import get_db
from app.api.deps import CurrentUser
from app.models.signal import Signal
from app.schemas.signal import SignalResponse
# Router for the signal endpoints; routes registered on it are served under
# the "/api/signal" prefix and grouped under the "signal" tag.
router = APIRouter(prefix="/api/signal", tags=["signal"])
@router.get("/kjb/today", response_model=List[SignalResponse])
async def get_today_signals(
    current_user: CurrentUser,
    db: Session = Depends(get_db),
):
    """Return all signal rows dated today.

    "Today" is ``date.today()``, i.e. the local calendar date of the server
    process. Matching rows are sorted by ``signal_type`` and then ``ticker``.

    Args:
        current_user: Injected dependency; not read in the body. Presumably
            it enforces authentication -- confirm in ``app.api.deps``.
        db: SQLAlchemy session supplied by ``get_db``.

    Returns:
        The list of matching ``Signal`` rows (serialized via
        ``SignalResponse``); empty when no row carries today's date.
    """
    # NOTE(review): the query has no strategy filter; presumably the table
    # only holds KJB signals -- confirm against the Signal model.
    # NOTE(review): ``db`` is a synchronous Session, so ``.all()`` executes
    # inside this async handler -- confirm that is intended.
    todays_rows = db.query(Signal).filter(Signal.date == date.today())
    ordered_rows = todays_rows.order_by(Signal.signal_type, Signal.ticker)
    return ordered_rows.all()
@router.get("/kjb/history", response_model=List[SignalResponse])
async def get_signal_history(
    current_user: CurrentUser,
    db: Session = Depends(get_db),
    start_date: Optional[date] = Query(None),
    end_date: Optional[date] = Query(None),
    ticker: Optional[str] = Query(None),
    limit: int = Query(100, ge=1, le=1000),
):
    """Return stored signals, newest first, narrowed by optional filters.

    Args:
        current_user: Injected dependency; not read in the body. Presumably
            it enforces authentication -- confirm in ``app.api.deps``.
        db: SQLAlchemy session supplied by ``get_db``.
        start_date: When given, keep rows with ``Signal.date >= start_date``.
        end_date: When given, keep rows with ``Signal.date <= end_date``.
        ticker: When given and non-empty, keep rows whose ticker equals it
            exactly. An empty string is treated the same as omitting it.
        limit: Maximum number of rows returned; validated to 1..1000,
            defaulting to 100.

    Returns:
        Up to ``limit`` ``Signal`` rows ordered by date descending, then by
        ticker ascending (serialized via ``SignalResponse``).
    """
    # Collect only the clauses whose parameter was actually supplied.
    clauses = []
    if start_date:
        clauses.append(Signal.date >= start_date)
    if end_date:
        clauses.append(Signal.date <= end_date)
    if ticker:
        clauses.append(Signal.ticker == ticker)

    matching = db.query(Signal)
    for clause in clauses:
        matching = matching.filter(clause)

    newest_first = matching.order_by(Signal.date.desc(), Signal.ticker)
    return newest_first.limit(limit).all()