# galaxis-po/backend/jobs/snapshot_job.py

"""
Snapshot batch job for automatic daily snapshots.
"""
import logging
from datetime import date, datetime
from decimal import Decimal
from sqlalchemy.orm import Session
from app.core.database import SessionLocal
from app.models.portfolio import Portfolio, PortfolioSnapshot, SnapshotHolding
from app.models.stock import JobLog
from app.services.price_service import PriceService
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
def _snapshot_portfolio(
    db: "Session",
    price_service: "PriceService",
    portfolio: "Portfolio",
    today: date,
) -> bool:
    """
    Stage a snapshot of one portfolio in the session.

    Adds a PortfolioSnapshot and one SnapshotHolding per holding to ``db``
    without committing; the caller owns the transaction.

    Returns:
        True if a snapshot was staged, False if the portfolio was skipped
        (a snapshot for ``today`` already exists, or its total value is
        not positive).
    """
    # Skip portfolios that already have a snapshot dated today.
    existing = (
        db.query(PortfolioSnapshot)
        .filter(
            PortfolioSnapshot.portfolio_id == portfolio.id,
            PortfolioSnapshot.snapshot_date == today,
        )
        .first()
    )
    if existing:
        logger.debug(f"Snapshot already exists for portfolio {portfolio.id}")
        return False

    # Get current prices
    holdings = list(portfolio.holdings)
    prices = price_service.get_current_prices([h.ticker for h in holdings])

    # Calculate total value
    total_value = Decimal("0")
    holding_values = []
    for holding in holdings:
        if holding.ticker not in prices:
            # A missing price is still valued at zero (as before), but it is
            # no longer silent: the snapshot total will be understated.
            logger.warning(
                f"No price for {holding.ticker} in portfolio {portfolio.id}, "
                "valuing at 0"
            )
        price = prices.get(holding.ticker, Decimal("0"))
        value = price * holding.quantity
        total_value += value
        holding_values.append({
            "ticker": holding.ticker,
            "quantity": holding.quantity,
            "price": price,
            "value": value,
        })

    # Skip if no value (this also protects the ratio division below).
    if total_value <= 0:
        logger.warning(f"Portfolio {portfolio.id} has zero value, skipping")
        return False

    # Create snapshot; flush so snapshot.id is available to the holdings.
    snapshot = PortfolioSnapshot(
        portfolio_id=portfolio.id,
        total_value=total_value,
        snapshot_date=today,
    )
    db.add(snapshot)
    db.flush()

    # Create snapshot holdings, each with its percentage share of the total
    # rounded to two decimal places.
    for hv in holding_values:
        ratio = (hv["value"] / total_value * 100).quantize(Decimal("0.01"))
        db.add(
            SnapshotHolding(
                snapshot_id=snapshot.id,
                ticker=hv["ticker"],
                quantity=hv["quantity"],
                price=hv["price"],
                value=hv["value"],
                current_ratio=ratio,
            )
        )

    logger.info(f"Created snapshot for portfolio {portfolio.id}: {total_value}")
    return True


def create_daily_snapshots():
    """
    Create snapshots for all portfolios with holdings.

    This job runs daily after market close to record portfolio values.

    A JobLog row named "daily_snapshots" is written with status "running",
    then updated to "success" (with the number of snapshots created) or
    "failed" (with the error message). A failure in one portfolio is logged
    and does not stop the remaining portfolios.

    Raises:
        Exception: any error outside the per-portfolio handling is re-raised
            after the job log has been marked as failed.
    """
    db: Session = SessionLocal()
    job_log = None

    try:
        # Create job log
        job_log = JobLog(
            job_name="daily_snapshots",
            status="running",
            started_at=datetime.utcnow(),
        )
        db.add(job_log)
        db.commit()

        # Get all portfolios with holdings
        portfolios = (
            db.query(Portfolio)
            .filter(Portfolio.holdings.any())
            .all()
        )
        logger.info(f"Creating snapshots for {len(portfolios)} portfolios")

        price_service = PriceService(db)
        records_count = 0
        today = date.today()

        for portfolio in portfolios:
            # Read the id up front so the error log below never has to touch
            # the ORM object after a failure.
            portfolio_id = portfolio.id
            try:
                # SAVEPOINT per portfolio: on failure only this portfolio's
                # rows are rolled back and the session stays usable, instead
                # of leaving half-added rows pending or a broken transaction.
                with db.begin_nested():
                    created = _snapshot_portfolio(
                        db, price_service, portfolio, today
                    )
            except Exception as e:
                logger.error(
                    f"Error creating snapshot for portfolio {portfolio_id}: {e}"
                )
                continue
            if created:
                records_count += 1

        db.commit()

        # Update job log
        job_log.status = "success"
        job_log.finished_at = datetime.utcnow()
        job_log.records_count = records_count
        db.commit()

        logger.info(f"Daily snapshots completed: {records_count} snapshots created")

    except Exception as e:
        logger.error(f"Daily snapshots job failed: {e}")
        try:
            # Discard the failed/partial transaction first; committing on a
            # session whose flush failed would raise and mask the real error.
            db.rollback()
            if job_log is not None:
                job_log.status = "failed"
                job_log.finished_at = datetime.utcnow()
                job_log.error_msg = str(e)
                db.commit()
        except Exception:
            # Best effort: never let job-log bookkeeping hide the original
            # exception, which is re-raised below.
            logger.exception("Could not record daily_snapshots job failure")
        raise
    finally:
        db.close()