galaxis-po/backend/app/services/price_service.py
zephyrdark 8842928363 feat: add PriceService and snapshot API endpoints
- PriceService: Mock implementation using DB prices
- Snapshot schemas: SnapshotListItem, ReturnsResponse, ReturnDataPoint
- Snapshot API: list, create, get, delete snapshots
- Returns API: portfolio returns calculation with CAGR

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-02-03 12:23:56 +09:00

184 lines
4.8 KiB
Python

"""
Price service for fetching stock prices.
This is a mock implementation that uses DB data.
It can be replaced with a real OpenAPI-backed implementation later.
"""
from datetime import date, timedelta
from decimal import Decimal
from typing import Dict, List, Optional
from sqlalchemy.orm import Session
from sqlalchemy import func
from app.models.stock import Price
class PriceData:
    """Price data point.

    Plain value holder: the constructor stores each argument unchanged on
    the attribute of the same name.

    Attributes:
        ticker: Ticker the data point belongs to.
        date: Date of the data point.
        open: Opening price.
        high: High price.
        low: Low price.
        close: Closing price.
        volume: Volume figure for that date.
    """

    def __init__(
        self,
        ticker: str,
        date: date,
        open: Decimal,
        high: Decimal,
        low: Decimal,
        close: Decimal,
        volume: int,
    ):
        # NOTE: the parameter names `date` and `open` shadow the imported
        # `date` class and the builtin `open` inside this method. They are
        # kept because callers pass them as keyword arguments.
        self.ticker = ticker
        self.date = date
        self.open = open
        self.high = high
        self.low = low
        self.close = close
        self.volume = volume

    def __repr__(self) -> str:
        """Return a constructor-style representation listing every field."""
        field_names = (
            "ticker", "date", "open", "high", "low", "close", "volume",
        )
        rendered = ", ".join(
            f"{name}={getattr(self, name)!r}" for name in field_names
        )
        return f"{type(self).__name__}({rendered})"
class PriceService:
    """
    Service for fetching stock prices.

    Current implementation uses DB data (prices table).
    Can be extended to use real-time OpenAPI in the future.

    All lookups go through the SQLAlchemy session passed to the constructor
    and read the ``Price`` model. Values read from the database are
    converted to ``Decimal`` before being returned.
    """

    def __init__(self, db: Session):
        """Keep a reference to the session used for every query."""
        self.db = db

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _to_decimal(value) -> Decimal:
        """Convert a value read from the DB to ``Decimal`` via its string form."""
        # Going through str() means a float is converted from its printed
        # representation rather than its exact binary expansion.
        return Decimal(str(value))

    def _latest_close(self, *criteria) -> Optional[Decimal]:
        """Return the close of the newest ``Price`` row matching ``criteria``.

        ``criteria`` are SQLAlchemy filter expressions; they are combined
        with AND. Returns ``None`` when no row matches.
        """
        row = (
            self.db.query(Price.close)
            .filter(*criteria)
            .order_by(Price.date.desc())
            .first()
        )
        return self._to_decimal(row[0]) if row else None

    def _latest_closes(self, *criteria) -> Dict[str, Decimal]:
        """Return ``{ticker: close}`` using each ticker's newest matching row.

        ``criteria`` restrict which rows are considered when picking the
        newest date per ticker. Tickers with no matching row are absent
        from the result.
        """
        # Step 1: newest date per ticker among the rows matching `criteria`.
        latest = (
            self.db.query(
                Price.ticker,
                func.max(Price.date).label('max_date'),
            )
            .filter(*criteria)
            .group_by(Price.ticker)
            .subquery()
        )
        # Step 2: join back to the price rows to read the close on that date.
        rows = (
            self.db.query(Price.ticker, Price.close)
            .join(
                latest,
                (Price.ticker == latest.c.ticker)
                & (Price.date == latest.c.max_date),
            )
            .all()
        )
        return {ticker: self._to_decimal(close) for ticker, close in rows}

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------

    def get_current_price(self, ticker: str) -> Optional[Decimal]:
        """
        Get current price for a single ticker.

        Returns the most recent closing price from DB, or ``None`` when the
        ticker has no price rows.
        """
        return self._latest_close(Price.ticker == ticker)

    def get_current_prices(self, tickers: List[str]) -> Dict[str, Decimal]:
        """
        Get current prices for multiple tickers.

        Returns a dict mapping ticker to most recent closing price. Tickers
        without any price rows are omitted. An empty ``tickers`` returns an
        empty dict without querying the database.
        """
        if not tickers:
            return {}
        return self._latest_closes(Price.ticker.in_(tickers))

    def get_price_history(
        self,
        ticker: str,
        start_date: date,
        end_date: date,
    ) -> List[PriceData]:
        """
        Get price history for a ticker within date range.

        Both ``start_date`` and ``end_date`` are inclusive. The result is
        ordered by date, one ``PriceData`` per row found.
        """
        rows = (
            self.db.query(Price)
            .filter(
                Price.ticker == ticker,
                Price.date >= start_date,
                Price.date <= end_date,
            )
            .order_by(Price.date)
            .all()
        )
        to_decimal = self._to_decimal
        return [
            PriceData(
                ticker=row.ticker,
                date=row.date,
                open=to_decimal(row.open),
                high=to_decimal(row.high),
                low=to_decimal(row.low),
                close=to_decimal(row.close),
                volume=row.volume,
            )
            for row in rows
        ]

    def get_price_at_date(self, ticker: str, target_date: date) -> Optional[Decimal]:
        """
        Get closing price at specific date.

        If no price on exact date, returns most recent price before that
        date. Returns ``None`` when there is no row on or before
        ``target_date``.
        """
        return self._latest_close(
            Price.ticker == ticker,
            Price.date <= target_date,
        )

    def get_prices_at_date(
        self,
        tickers: List[str],
        target_date: date,
    ) -> Dict[str, Decimal]:
        """
        Get closing prices for multiple tickers at specific date.

        For each ticker the newest row dated on or before ``target_date`` is
        used. Tickers with no such row are omitted. An empty ``tickers``
        returns an empty dict without querying the database.
        """
        if not tickers:
            return {}
        return self._latest_closes(
            Price.ticker.in_(tickers),
            Price.date <= target_date,
        )