- APScheduler for daily snapshots (18:30 weekdays) - ReturnsCalculator with CAGR, TWR, MDD, volatility - Portfolio history page with snapshots and returns tabs - FastAPI lifespan integration for scheduler Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
141 lines
4.4 KiB
Python
141 lines
4.4 KiB
Python
"""
|
|
Snapshot batch job for automatic daily snapshots.
|
|
"""
|
|
import logging
|
|
from datetime import date, datetime
|
|
from decimal import Decimal
|
|
|
|
from sqlalchemy.orm import Session
|
|
|
|
from app.core.database import SessionLocal
|
|
from app.models.portfolio import Portfolio, PortfolioSnapshot, SnapshotHolding
|
|
from app.models.stock import JobLog
|
|
from app.services.price_service import PriceService
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
def create_daily_snapshots():
|
|
"""
|
|
Create snapshots for all portfolios with holdings.
|
|
|
|
This job runs daily after market close to record portfolio values.
|
|
"""
|
|
db: Session = SessionLocal()
|
|
job_log = None
|
|
|
|
try:
|
|
# Create job log
|
|
job_log = JobLog(
|
|
job_name="daily_snapshots",
|
|
status="running",
|
|
started_at=datetime.utcnow(),
|
|
)
|
|
db.add(job_log)
|
|
db.commit()
|
|
|
|
# Get all portfolios with holdings
|
|
portfolios = (
|
|
db.query(Portfolio)
|
|
.filter(Portfolio.holdings.any())
|
|
.all()
|
|
)
|
|
|
|
logger.info(f"Creating snapshots for {len(portfolios)} portfolios")
|
|
|
|
price_service = PriceService(db)
|
|
records_count = 0
|
|
today = date.today()
|
|
|
|
for portfolio in portfolios:
|
|
try:
|
|
# Check if snapshot already exists for today
|
|
existing = (
|
|
db.query(PortfolioSnapshot)
|
|
.filter(
|
|
PortfolioSnapshot.portfolio_id == portfolio.id,
|
|
PortfolioSnapshot.snapshot_date == today,
|
|
)
|
|
.first()
|
|
)
|
|
|
|
if existing:
|
|
logger.debug(f"Snapshot already exists for portfolio {portfolio.id}")
|
|
continue
|
|
|
|
# Get current prices
|
|
tickers = [h.ticker for h in portfolio.holdings]
|
|
prices = price_service.get_current_prices(tickers)
|
|
|
|
# Calculate total value
|
|
total_value = Decimal("0")
|
|
holding_values = []
|
|
|
|
for holding in portfolio.holdings:
|
|
price = prices.get(holding.ticker, Decimal("0"))
|
|
value = price * holding.quantity
|
|
total_value += value
|
|
holding_values.append({
|
|
"ticker": holding.ticker,
|
|
"quantity": holding.quantity,
|
|
"price": price,
|
|
"value": value,
|
|
})
|
|
|
|
# Skip if no value
|
|
if total_value <= 0:
|
|
logger.warning(f"Portfolio {portfolio.id} has zero value, skipping")
|
|
continue
|
|
|
|
# Create snapshot
|
|
snapshot = PortfolioSnapshot(
|
|
portfolio_id=portfolio.id,
|
|
total_value=total_value,
|
|
snapshot_date=today,
|
|
)
|
|
db.add(snapshot)
|
|
db.flush()
|
|
|
|
# Create snapshot holdings
|
|
for hv in holding_values:
|
|
ratio = (hv["value"] / total_value * 100).quantize(Decimal("0.01"))
|
|
snapshot_holding = SnapshotHolding(
|
|
snapshot_id=snapshot.id,
|
|
ticker=hv["ticker"],
|
|
quantity=hv["quantity"],
|
|
price=hv["price"],
|
|
value=hv["value"],
|
|
current_ratio=ratio,
|
|
)
|
|
db.add(snapshot_holding)
|
|
|
|
records_count += 1
|
|
logger.info(f"Created snapshot for portfolio {portfolio.id}: {total_value}")
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error creating snapshot for portfolio {portfolio.id}: {e}")
|
|
continue
|
|
|
|
db.commit()
|
|
|
|
# Update job log
|
|
if job_log:
|
|
job_log.status = "success"
|
|
job_log.finished_at = datetime.utcnow()
|
|
job_log.records_count = records_count
|
|
db.commit()
|
|
|
|
logger.info(f"Daily snapshots completed: {records_count} snapshots created")
|
|
|
|
except Exception as e:
|
|
logger.error(f"Daily snapshots job failed: {e}")
|
|
if job_log:
|
|
job_log.status = "failed"
|
|
job_log.finished_at = datetime.utcnow()
|
|
job_log.error_msg = str(e)
|
|
db.commit()
|
|
raise
|
|
|
|
finally:
|
|
db.close()
|