penti/backend/app/services/rebalancing_service.py

320 lines
9.3 KiB
Python
Raw Permalink Normal View History

2026-01-31 23:30:51 +09:00
"""Rebalancing service."""
from typing import Dict, List
from decimal import Decimal
from sqlalchemy.orm import Session
from uuid import UUID
from app.models.portfolio import Portfolio, PortfolioAsset
from app.models.asset import Asset
from app.utils.data_helpers import get_latest_price
from datetime import datetime
class RebalancingService:
"""리밸런싱 서비스."""
@staticmethod
def calculate_rebalancing(
portfolio_id: UUID,
current_holdings: Dict[str, float],
cash: float,
db_session: Session
) -> Dict:
"""
리밸런싱 계산.
Args:
portfolio_id: 포트폴리오 ID
current_holdings: 현재 보유 수량 {ticker: quantity}
cash: 현금
db_session: 데이터베이스 세션
Returns:
리밸런싱 추천 딕셔너리
"""
# 1. 포트폴리오 조회
portfolio = db_session.query(Portfolio).filter(
Portfolio.id == portfolio_id
).first()
if not portfolio:
raise ValueError("포트폴리오를 찾을 수 없습니다")
# 2. 목표 비율 가져오기
target_ratios = {
asset.ticker: float(asset.target_ratio) / 100.0
for asset in portfolio.assets
}
# 3. 현재 가격 조회
all_tickers = set(target_ratios.keys()) | set(current_holdings.keys())
current_prices = {}
for ticker in all_tickers:
asset = db_session.query(Asset).filter(Asset.ticker == ticker).first()
if asset and asset.last_price:
current_prices[ticker] = float(asset.last_price)
else:
# 최신 가격 조회 시도
price = get_latest_price(db_session, ticker, datetime.now())
if price > 0:
current_prices[ticker] = float(price)
else:
current_prices[ticker] = 0
# 4. 현재 자산 가치 계산
current_values = {}
for ticker, quantity in current_holdings.items():
price = current_prices.get(ticker, 0)
current_values[ticker] = quantity * price
# 5. 총 자산 계산
total_holdings_value = sum(current_values.values())
total_value = total_holdings_value + cash
# 6. 목표 금액 계산
target_values = {
ticker: total_value * ratio
for ticker, ratio in target_ratios.items()
}
# 7. 리밸런싱 추천 생성
recommendations = []
for ticker in all_tickers:
# 종목명 조회
asset = db_session.query(Asset).filter(Asset.ticker == ticker).first()
name = asset.name if asset else ticker
current_quantity = current_holdings.get(ticker, 0)
current_value = current_values.get(ticker, 0)
current_price = current_prices.get(ticker, 0)
target_ratio = target_ratios.get(ticker, 0)
target_value = target_values.get(ticker, 0)
current_ratio = (current_value / total_value * 100) if total_value > 0 else 0
delta_value = target_value - current_value
# 매수/매도 수량 계산
if current_price > 0:
delta_quantity = delta_value / current_price
# 정수 주로 변환
delta_quantity = int(delta_quantity)
else:
delta_quantity = 0
# 액션 결정
if delta_quantity > 0:
action = 'buy'
elif delta_quantity < 0:
action = 'sell'
delta_quantity = abs(delta_quantity)
# 보유 수량을 초과하지 않도록
delta_quantity = min(delta_quantity, current_quantity)
else:
action = 'hold'
recommendations.append({
'ticker': ticker,
'name': name,
'current_quantity': current_quantity,
'current_value': round(current_value, 2),
'current_ratio': round(current_ratio, 2),
'target_ratio': round(target_ratio * 100, 2),
'target_value': round(target_value, 2),
'delta_value': round(delta_value, 2),
'delta_quantity': abs(delta_quantity),
'action': action,
'current_price': round(current_price, 2)
})
# 8. 요약 통계
summary = {
'buy': sum(1 for r in recommendations if r['action'] == 'buy'),
'sell': sum(1 for r in recommendations if r['action'] == 'sell'),
'hold': sum(1 for r in recommendations if r['action'] == 'hold')
}
return {
'total_value': round(total_value, 2),
'cash': round(cash, 2),
'recommendations': recommendations,
'summary': summary
}
class PortfolioService:
"""포트폴리오 서비스."""
@staticmethod
def create_portfolio(
name: str,
description: str,
assets: List[Dict],
user_id: str,
db_session: Session
) -> Portfolio:
"""
포트폴리오 생성.
Args:
name: 포트폴리오 이름
description: 설명
assets: 자산 리스트 [{'ticker': ..., 'target_ratio': ...}]
user_id: 사용자 ID
db_session: 데이터베이스 세션
Returns:
생성된 포트폴리오
"""
# 포트폴리오 생성
portfolio = Portfolio(
name=name,
description=description,
user_id=user_id
)
db_session.add(portfolio)
db_session.flush()
# 자산 추가
for asset_data in assets:
asset = PortfolioAsset(
portfolio_id=portfolio.id,
ticker=asset_data['ticker'],
target_ratio=asset_data['target_ratio']
)
db_session.add(asset)
db_session.commit()
db_session.refresh(portfolio)
return portfolio
@staticmethod
def get_portfolio(portfolio_id: UUID, db_session: Session) -> Portfolio:
"""
포트폴리오 조회.
Args:
portfolio_id: 포트폴리오 ID
db_session: 데이터베이스 세션
Returns:
포트폴리오
"""
portfolio = db_session.query(Portfolio).filter(
Portfolio.id == portfolio_id
).first()
return portfolio
@staticmethod
def list_portfolios(
db_session: Session,
user_id: str = None,
skip: int = 0,
limit: int = 100
) -> Dict:
"""
포트폴리오 목록 조회.
Args:
db_session: 데이터베이스 세션
user_id: 사용자 ID (필터)
skip: 건너뛸 레코드
limit: 최대 레코드
Returns:
포트폴리오 목록
"""
query = db_session.query(Portfolio)
if user_id:
query = query.filter(Portfolio.user_id == user_id)
total = query.count()
items = query.order_by(Portfolio.created_at.desc()).offset(skip).limit(limit).all()
return {
'items': items,
'total': total
}
@staticmethod
def update_portfolio(
portfolio_id: UUID,
name: str = None,
description: str = None,
assets: List[Dict] = None,
db_session: Session = None
) -> Portfolio:
"""
포트폴리오 수정.
Args:
portfolio_id: 포트폴리오 ID
name: 이름
description: 설명
assets: 자산 리스트
db_session: 데이터베이스 세션
Returns:
수정된 포트폴리오
"""
portfolio = db_session.query(Portfolio).filter(
Portfolio.id == portfolio_id
).first()
if not portfolio:
raise ValueError("포트폴리오를 찾을 수 없습니다")
if name:
portfolio.name = name
if description is not None:
portfolio.description = description
if assets is not None:
# 기존 자산 삭제
db_session.query(PortfolioAsset).filter(
PortfolioAsset.portfolio_id == portfolio_id
).delete()
# 새 자산 추가
for asset_data in assets:
asset = PortfolioAsset(
portfolio_id=portfolio.id,
ticker=asset_data['ticker'],
target_ratio=asset_data['target_ratio']
)
db_session.add(asset)
db_session.commit()
db_session.refresh(portfolio)
return portfolio
@staticmethod
def delete_portfolio(portfolio_id: UUID, db_session: Session) -> bool:
"""
포트폴리오 삭제.
Args:
portfolio_id: 포트폴리오 ID
db_session: 데이터베이스 세션
Returns:
삭제 성공 여부
"""
portfolio = db_session.query(Portfolio).filter(
Portfolio.id == portfolio_id
).first()
if not portfolio:
return False
db_session.delete(portfolio)
db_session.commit()
return True