- APScheduler for daily snapshots (18:30 weekdays) - ReturnsCalculator with CAGR, TWR, MDD, volatility - Portfolio history page with snapshots and returns tabs - FastAPI lifespan integration for scheduler Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
46 lines
1.1 KiB
Python
46 lines
1.1 KiB
Python
"""
|
|
APScheduler configuration for background jobs.
|
|
"""
|
|
import logging
|
|
from apscheduler.schedulers.background import BackgroundScheduler
|
|
from apscheduler.triggers.cron import CronTrigger
|
|
|
|
from jobs.snapshot_job import create_daily_snapshots
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# Create scheduler instance
|
|
scheduler = BackgroundScheduler()
|
|
|
|
|
|
def configure_jobs():
|
|
"""Configure scheduled jobs."""
|
|
# Daily snapshot at 18:30 (after market close)
|
|
scheduler.add_job(
|
|
create_daily_snapshots,
|
|
trigger=CronTrigger(
|
|
hour=18,
|
|
minute=30,
|
|
day_of_week='mon-fri',
|
|
),
|
|
id='daily_snapshots',
|
|
name='Create daily portfolio snapshots',
|
|
replace_existing=True,
|
|
)
|
|
logger.info("Configured daily_snapshots job")
|
|
|
|
|
|
def start_scheduler():
|
|
"""Start the scheduler."""
|
|
if not scheduler.running:
|
|
configure_jobs()
|
|
scheduler.start()
|
|
logger.info("Scheduler started")
|
|
|
|
|
|
def stop_scheduler():
|
|
"""Stop the scheduler."""
|
|
if scheduler.running:
|
|
scheduler.shutdown()
|
|
logger.info("Scheduler stopped")
|