# File-viewer metadata (not part of the program): 85 lines, 2.6 KiB, Python

"""
Background worker for backtest execution.
"""
from datetime import datetime, timezone
from concurrent.futures import ThreadPoolExecutor
import logging
from sqlalchemy.orm import Session
from app.core.database import SessionLocal
from app.models.backtest import Backtest, BacktestStatus
from app.services.backtest.engine import BacktestEngine
logger = logging.getLogger(__name__)
# Shared thread pool for background backtest execution. With max_workers=2 at
# most two jobs run at the same time; further submissions wait in the
# executor's internal queue until a worker thread is free.
executor = ThreadPoolExecutor(max_workers=2)
def submit_backtest(backtest_id: int) -> None:
    """
    Submit a backtest job for background execution.

    Hands ``_run_backtest_job`` to the module-level thread pool and returns
    immediately; the backtest itself runs in a worker thread.

    Args:
        backtest_id: Identifier of the backtest to execute.

    Raises:
        RuntimeError: Propagated from ``executor.submit`` if the executor has
            already been shut down.
    """
    future = executor.submit(_run_backtest_job, backtest_id)

    def _log_unhandled(done) -> None:
        # The Future is otherwise discarded, so an exception that escapes the
        # job (for example one raised before its own try block is entered)
        # would disappear without a trace. Surface it in the log instead.
        if done.cancelled():
            logger.warning("Backtest %s was cancelled before it could run", backtest_id)
            return
        error = done.exception()
        if error is not None:
            logger.error(
                "Backtest %s background job crashed: %r",
                backtest_id,
                error,
                exc_info=error,
            )

    future.add_done_callback(_log_unhandled)
    logger.info("Backtest %s submitted for background execution", backtest_id)
def _run_backtest_job(backtest_id: int) -> None:
    """
    Execute a backtest in a background thread.

    Creates its own database session, marks the backtest as RUNNING, runs the
    engine chosen from ``strategy_type`` and then records COMPLETED. If
    anything raises along the way, the error is logged, the session is rolled
    back and the backtest is marked FAILED with a truncated error message.
    Exceptions raised inside the job are never re-raised to the caller.

    Args:
        backtest_id: Identifier of the ``Backtest`` row to execute.
    """
    db: Session = SessionLocal()
    try:
        # Update status to running
        backtest = db.get(Backtest, backtest_id)
        if not backtest:
            logger.error("Backtest %s not found", backtest_id)
            return

        backtest.status = BacktestStatus.RUNNING
        db.commit()
        logger.info("Backtest %s started", backtest_id)

        # Run backtest - route KJB to DailyBacktestEngine (imported lazily,
        # only when that strategy type is actually requested)
        if backtest.strategy_type == "kjb":
            from app.services.backtest.daily_engine import DailyBacktestEngine

            engine = DailyBacktestEngine(db)
        else:
            engine = BacktestEngine(db)
        engine.run(backtest_id)

        # Update status to completed
        backtest.status = BacktestStatus.COMPLETED
        backtest.completed_at = datetime.now(timezone.utc)
        db.commit()
        logger.info("Backtest %s completed successfully", backtest_id)
    except Exception as e:
        logger.exception("Backtest %s failed: %s", backtest_id, e)
        # Update status to failed
        try:
            # Reset the session first: after a failed flush/commit the session
            # refuses further work until rollback() is called, and rolling
            # back also keeps half-written state from the aborted run out of
            # the commit that records the failure.
            db.rollback()
            backtest = db.get(Backtest, backtest_id)
            if backtest:
                backtest.status = BacktestStatus.FAILED
                backtest.error_message = str(e)[:1000]  # Limit error message length
                backtest.completed_at = datetime.now(timezone.utc)
                db.commit()
        except Exception as commit_error:
            logger.exception("Failed to update backtest status: %s", commit_error)
            try:
                # Best effort: do not leave a broken transaction open.
                db.rollback()
            except Exception:
                logger.exception(
                    "Rollback after failed status update for backtest %s also failed",
                    backtest_id,
                )
    finally:
        db.close()
def get_executor_status() -> dict:
    """
    Report a small monitoring snapshot of the background executor.

    Returns:
        A dict with ``max_workers`` (the pool's configured worker count) and
        ``pending_tasks`` (the current size of the pool's work queue, or 0
        when the executor exposes no ``_work_queue`` attribute).
    """
    # NOTE: both values come from private ThreadPoolExecutor attributes.
    worker_limit = executor._max_workers

    if hasattr(executor, '_work_queue'):
        queued = executor._work_queue.qsize()
    else:
        queued = 0

    snapshot = {
        "max_workers": worker_limit,
        "pending_tasks": queued,
    }
    return snapshot