penti/backend/app/services/rebalancing_service.py

320 lines
9.3 KiB
Python

"""Rebalancing service."""
from typing import Dict, List
from decimal import Decimal
from sqlalchemy.orm import Session
from uuid import UUID
from app.models.portfolio import Portfolio, PortfolioAsset
from app.models.asset import Asset
from app.utils.data_helpers import get_latest_price
from datetime import datetime
class RebalancingService:
"""리밸런싱 서비스."""
@staticmethod
def calculate_rebalancing(
portfolio_id: UUID,
current_holdings: Dict[str, float],
cash: float,
db_session: Session
) -> Dict:
"""
리밸런싱 계산.
Args:
portfolio_id: 포트폴리오 ID
current_holdings: 현재 보유 수량 {ticker: quantity}
cash: 현금
db_session: 데이터베이스 세션
Returns:
리밸런싱 추천 딕셔너리
"""
# 1. 포트폴리오 조회
portfolio = db_session.query(Portfolio).filter(
Portfolio.id == portfolio_id
).first()
if not portfolio:
raise ValueError("포트폴리오를 찾을 수 없습니다")
# 2. 목표 비율 가져오기
target_ratios = {
asset.ticker: float(asset.target_ratio) / 100.0
for asset in portfolio.assets
}
# 3. 현재 가격 조회
all_tickers = set(target_ratios.keys()) | set(current_holdings.keys())
current_prices = {}
for ticker in all_tickers:
asset = db_session.query(Asset).filter(Asset.ticker == ticker).first()
if asset and asset.last_price:
current_prices[ticker] = float(asset.last_price)
else:
# 최신 가격 조회 시도
price = get_latest_price(db_session, ticker, datetime.now())
if price > 0:
current_prices[ticker] = float(price)
else:
current_prices[ticker] = 0
# 4. 현재 자산 가치 계산
current_values = {}
for ticker, quantity in current_holdings.items():
price = current_prices.get(ticker, 0)
current_values[ticker] = quantity * price
# 5. 총 자산 계산
total_holdings_value = sum(current_values.values())
total_value = total_holdings_value + cash
# 6. 목표 금액 계산
target_values = {
ticker: total_value * ratio
for ticker, ratio in target_ratios.items()
}
# 7. 리밸런싱 추천 생성
recommendations = []
for ticker in all_tickers:
# 종목명 조회
asset = db_session.query(Asset).filter(Asset.ticker == ticker).first()
name = asset.name if asset else ticker
current_quantity = current_holdings.get(ticker, 0)
current_value = current_values.get(ticker, 0)
current_price = current_prices.get(ticker, 0)
target_ratio = target_ratios.get(ticker, 0)
target_value = target_values.get(ticker, 0)
current_ratio = (current_value / total_value * 100) if total_value > 0 else 0
delta_value = target_value - current_value
# 매수/매도 수량 계산
if current_price > 0:
delta_quantity = delta_value / current_price
# 정수 주로 변환
delta_quantity = int(delta_quantity)
else:
delta_quantity = 0
# 액션 결정
if delta_quantity > 0:
action = 'buy'
elif delta_quantity < 0:
action = 'sell'
delta_quantity = abs(delta_quantity)
# 보유 수량을 초과하지 않도록
delta_quantity = min(delta_quantity, current_quantity)
else:
action = 'hold'
recommendations.append({
'ticker': ticker,
'name': name,
'current_quantity': current_quantity,
'current_value': round(current_value, 2),
'current_ratio': round(current_ratio, 2),
'target_ratio': round(target_ratio * 100, 2),
'target_value': round(target_value, 2),
'delta_value': round(delta_value, 2),
'delta_quantity': abs(delta_quantity),
'action': action,
'current_price': round(current_price, 2)
})
# 8. 요약 통계
summary = {
'buy': sum(1 for r in recommendations if r['action'] == 'buy'),
'sell': sum(1 for r in recommendations if r['action'] == 'sell'),
'hold': sum(1 for r in recommendations if r['action'] == 'hold')
}
return {
'total_value': round(total_value, 2),
'cash': round(cash, 2),
'recommendations': recommendations,
'summary': summary
}
class PortfolioService:
"""포트폴리오 서비스."""
@staticmethod
def create_portfolio(
name: str,
description: str,
assets: List[Dict],
user_id: str,
db_session: Session
) -> Portfolio:
"""
포트폴리오 생성.
Args:
name: 포트폴리오 이름
description: 설명
assets: 자산 리스트 [{'ticker': ..., 'target_ratio': ...}]
user_id: 사용자 ID
db_session: 데이터베이스 세션
Returns:
생성된 포트폴리오
"""
# 포트폴리오 생성
portfolio = Portfolio(
name=name,
description=description,
user_id=user_id
)
db_session.add(portfolio)
db_session.flush()
# 자산 추가
for asset_data in assets:
asset = PortfolioAsset(
portfolio_id=portfolio.id,
ticker=asset_data['ticker'],
target_ratio=asset_data['target_ratio']
)
db_session.add(asset)
db_session.commit()
db_session.refresh(portfolio)
return portfolio
@staticmethod
def get_portfolio(portfolio_id: UUID, db_session: Session) -> Portfolio:
"""
포트폴리오 조회.
Args:
portfolio_id: 포트폴리오 ID
db_session: 데이터베이스 세션
Returns:
포트폴리오
"""
portfolio = db_session.query(Portfolio).filter(
Portfolio.id == portfolio_id
).first()
return portfolio
@staticmethod
def list_portfolios(
db_session: Session,
user_id: str = None,
skip: int = 0,
limit: int = 100
) -> Dict:
"""
포트폴리오 목록 조회.
Args:
db_session: 데이터베이스 세션
user_id: 사용자 ID (필터)
skip: 건너뛸 레코드 수
limit: 최대 레코드 수
Returns:
포트폴리오 목록
"""
query = db_session.query(Portfolio)
if user_id:
query = query.filter(Portfolio.user_id == user_id)
total = query.count()
items = query.order_by(Portfolio.created_at.desc()).offset(skip).limit(limit).all()
return {
'items': items,
'total': total
}
@staticmethod
def update_portfolio(
portfolio_id: UUID,
name: str = None,
description: str = None,
assets: List[Dict] = None,
db_session: Session = None
) -> Portfolio:
"""
포트폴리오 수정.
Args:
portfolio_id: 포트폴리오 ID
name: 새 이름
description: 새 설명
assets: 새 자산 리스트
db_session: 데이터베이스 세션
Returns:
수정된 포트폴리오
"""
portfolio = db_session.query(Portfolio).filter(
Portfolio.id == portfolio_id
).first()
if not portfolio:
raise ValueError("포트폴리오를 찾을 수 없습니다")
if name:
portfolio.name = name
if description is not None:
portfolio.description = description
if assets is not None:
# 기존 자산 삭제
db_session.query(PortfolioAsset).filter(
PortfolioAsset.portfolio_id == portfolio_id
).delete()
# 새 자산 추가
for asset_data in assets:
asset = PortfolioAsset(
portfolio_id=portfolio.id,
ticker=asset_data['ticker'],
target_ratio=asset_data['target_ratio']
)
db_session.add(asset)
db_session.commit()
db_session.refresh(portfolio)
return portfolio
@staticmethod
def delete_portfolio(portfolio_id: UUID, db_session: Session) -> bool:
"""
포트폴리오 삭제.
Args:
portfolio_id: 포트폴리오 ID
db_session: 데이터베이스 세션
Returns:
삭제 성공 여부
"""
portfolio = db_session.query(Portfolio).filter(
Portfolio.id == portfolio_id
).first()
if not portfolio:
return False
db_session.delete(portfolio)
db_session.commit()
return True