""" Snapshot batch job for automatic daily snapshots. """ import logging from datetime import date, datetime from decimal import Decimal from sqlalchemy.orm import Session from app.core.database import SessionLocal from app.models.portfolio import Portfolio, PortfolioSnapshot, SnapshotHolding from app.models.stock import JobLog from app.services.price_service import PriceService logger = logging.getLogger(__name__) def create_daily_snapshots(): """ Create snapshots for all portfolios with holdings. This job runs daily after market close to record portfolio values. """ db: Session = SessionLocal() job_log = None try: # Create job log job_log = JobLog( job_name="daily_snapshots", status="running", started_at=datetime.utcnow(), ) db.add(job_log) db.commit() # Get all portfolios with holdings portfolios = ( db.query(Portfolio) .filter(Portfolio.holdings.any()) .all() ) logger.info(f"Creating snapshots for {len(portfolios)} portfolios") price_service = PriceService(db) records_count = 0 today = date.today() for portfolio in portfolios: try: # Check if snapshot already exists for today existing = ( db.query(PortfolioSnapshot) .filter( PortfolioSnapshot.portfolio_id == portfolio.id, PortfolioSnapshot.snapshot_date == today, ) .first() ) if existing: logger.debug(f"Snapshot already exists for portfolio {portfolio.id}") continue # Get current prices tickers = [h.ticker for h in portfolio.holdings] prices = price_service.get_current_prices(tickers) # Calculate total value total_value = Decimal("0") holding_values = [] for holding in portfolio.holdings: price = prices.get(holding.ticker, Decimal("0")) value = price * holding.quantity total_value += value holding_values.append({ "ticker": holding.ticker, "quantity": holding.quantity, "price": price, "value": value, }) # Skip if no value if total_value <= 0: logger.warning(f"Portfolio {portfolio.id} has zero value, skipping") continue # Create snapshot snapshot = PortfolioSnapshot( portfolio_id=portfolio.id, total_value=total_value, snapshot_date=today, ) db.add(snapshot) db.flush() # Create snapshot holdings for hv in holding_values: ratio = (hv["value"] / total_value * 100).quantize(Decimal("0.01")) snapshot_holding = SnapshotHolding( snapshot_id=snapshot.id, ticker=hv["ticker"], quantity=hv["quantity"], price=hv["price"], value=hv["value"], current_ratio=ratio, ) db.add(snapshot_holding) records_count += 1 logger.info(f"Created snapshot for portfolio {portfolio.id}: {total_value}") except Exception as e: logger.error(f"Error creating snapshot for portfolio {portfolio.id}: {e}") continue db.commit() # Update job log if job_log: job_log.status = "success" job_log.finished_at = datetime.utcnow() job_log.records_count = records_count db.commit() logger.info(f"Daily snapshots completed: {records_count} snapshots created") except Exception as e: logger.error(f"Daily snapshots job failed: {e}") if job_log: job_log.status = "failed" job_log.finished_at = datetime.utcnow() job_log.error_msg = str(e) db.commit() raise finally: db.close()