penti/backend/app/utils/data_helpers.py

329 lines
10 KiB
Python

"""Data query helper functions."""
from typing import List, Dict
from decimal import Decimal
from datetime import datetime, timedelta
from sqlalchemy.orm import Session
from sqlalchemy import and_, func
import pandas as pd
import numpy as np
from app.models import Asset, PriceData, FinancialStatement
def get_ticker_list(db_session: Session) -> pd.DataFrame:
"""
종목 리스트 조회.
Args:
db_session: 데이터베이스 세션
Returns:
종목 리스트 DataFrame
"""
assets = db_session.query(Asset).filter(Asset.is_active == True).all()
data = [{
'종목코드': asset.ticker,
'종목명': asset.name,
'시장': asset.market,
'섹터': asset.sector
} for asset in assets]
return pd.DataFrame(data)
def get_price_data(
db_session: Session,
tickers: List[str],
start_date: datetime,
end_date: datetime
) -> pd.DataFrame:
"""
가격 데이터 조회.
Args:
db_session: 데이터베이스 세션
tickers: 종목 코드 리스트
start_date: 시작일
end_date: 종료일
Returns:
가격 데이터 DataFrame
"""
prices = db_session.query(PriceData).filter(
and_(
PriceData.ticker.in_(tickers),
PriceData.timestamp >= start_date,
PriceData.timestamp <= end_date
)
).all()
data = [{
'종목코드': p.ticker,
'날짜': p.timestamp,
'시가': float(p.open) if p.open else None,
'고가': float(p.high) if p.high else None,
'저가': float(p.low) if p.low else None,
'종가': float(p.close),
'거래량': p.volume
} for p in prices]
return pd.DataFrame(data)
def get_latest_price(
db_session: Session,
ticker: str,
date: datetime
) -> Decimal:
"""
특정 날짜의 최신 가격 조회 (해당 날짜 또는 이전 가장 가까운 날짜).
Args:
db_session: 데이터베이스 세션
ticker: 종목 코드
date: 조회 날짜
Returns:
가격
"""
price = db_session.query(PriceData).filter(
and_(
PriceData.ticker == ticker,
PriceData.timestamp <= date
)
).order_by(PriceData.timestamp.desc()).first()
if price:
return price.close
return Decimal("0")
def get_prices_on_date(
db_session: Session,
tickers: List[str],
date: datetime
) -> Dict[str, Decimal]:
"""
특정 날짜의 종목들 가격 조회.
Args:
db_session: 데이터베이스 세션
tickers: 종목 코드 리스트
date: 조회 날짜
Returns:
{ticker: price} 딕셔너리
"""
prices = {}
for ticker in tickers:
price = get_latest_price(db_session, ticker, date)
if price > 0:
prices[ticker] = price
return prices
def get_financial_statements(
db_session: Session,
tickers: List[str],
base_date: datetime = None
) -> pd.DataFrame:
"""
재무제표 데이터 조회.
Args:
db_session: 데이터베이스 세션
tickers: 종목 코드 리스트
base_date: 기준일 (None이면 최신 데이터)
Returns:
재무제표 DataFrame
"""
query = db_session.query(FinancialStatement).filter(
FinancialStatement.ticker.in_(tickers)
)
if base_date:
query = query.filter(FinancialStatement.base_date <= base_date)
fs_data = query.all()
data = [{
'종목코드': fs.ticker,
'계정': fs.account,
'기준일': fs.base_date,
'': float(fs.value) if fs.value else None,
'공시구분': fs.disclosure_type
} for fs in fs_data]
return pd.DataFrame(data)
def get_value_indicators(
db_session: Session,
tickers: List[str],
base_date: datetime = None,
include_psr_pcr: bool = False
) -> pd.DataFrame:
"""
밸류 지표 조회 (PER, PBR, DY, 옵션으로 PSR, PCR).
Args:
db_session: 데이터베이스 세션
tickers: 종목 코드 리스트
base_date: 기준일 (PSR, PCR 계산용, None이면 최신)
include_psr_pcr: PSR, PCR 포함 여부
Returns:
밸류 지표 DataFrame
"""
assets = db_session.query(Asset).filter(
Asset.ticker.in_(tickers)
).all()
data = []
# PSR, PCR 계산을 위한 재무제표 데이터 (필요시)
psr_pcr_data = {}
if include_psr_pcr:
fs_list = get_financial_statements(db_session, tickers, base_date)
if not fs_list.empty:
# TTM 계산
fs_list = fs_list.sort_values(['종목코드', '계정', '기준일'])
fs_list['ttm'] = fs_list.groupby(['종목코드', '계정'], as_index=False)[''].rolling(
window=4, min_periods=4
).sum()['']
fs_list_clean = fs_list.copy()
# 자산과 자본은 평균, 나머지는 합
fs_list_clean['ttm'] = np.where(
fs_list_clean['계정'].isin(['자산', '자본']),
fs_list_clean['ttm'] / 4,
fs_list_clean['ttm']
)
fs_list_clean = fs_list_clean.groupby(['종목코드', '계정']).tail(1)
# Pivot
fs_pivot = fs_list_clean.pivot(index='종목코드', columns='계정', values='ttm')
for ticker in fs_pivot.index:
psr_pcr_data[ticker] = {
'매출액': fs_pivot.loc[ticker, '매출액'] if '매출액' in fs_pivot.columns else None,
'영업활동으로인한현금흐름': fs_pivot.loc[ticker, '영업활동으로인한현금흐름'] if '영업활동으로인한현금흐름' in fs_pivot.columns else None
}
for asset in assets:
# PER 계산
per = float(asset.last_price / asset.eps) if asset.eps and asset.eps > 0 else None
# PBR 계산
pbr = float(asset.last_price / asset.bps) if asset.bps and asset.bps > 0 else None
# DY 계산 (배당수익률)
dy = float(asset.dividend_per_share / asset.last_price * 100) if asset.last_price and asset.last_price > 0 else None
# 종목별 지표 추가
if per:
data.append({'종목코드': asset.ticker, '지표': 'PER', '': per})
if pbr:
data.append({'종목코드': asset.ticker, '지표': 'PBR', '': pbr})
if dy:
data.append({'종목코드': asset.ticker, '지표': 'DY', '': dy})
# PSR, PCR 계산 (옵션)
if include_psr_pcr and asset.ticker in psr_pcr_data:
ticker_fs = psr_pcr_data[asset.ticker]
market_cap = float(asset.market_cap) if asset.market_cap else None
# PSR = 시가총액 / 매출액
if market_cap and ticker_fs['매출액'] and ticker_fs['매출액'] > 0:
psr = market_cap / float(ticker_fs['매출액'])
data.append({'종목코드': asset.ticker, '지표': 'PSR', '': psr})
# PCR = 시가총액 / 영업활동현금흐름
if market_cap and ticker_fs['영업활동으로인한현금흐름'] and ticker_fs['영업활동으로인한현금흐름'] > 0:
pcr = market_cap / float(ticker_fs['영업활동으로인한현금흐름'])
data.append({'종목코드': asset.ticker, '지표': 'PCR', '': pcr})
return pd.DataFrame(data)
def calculate_value_rank(value_df: pd.DataFrame, indicators: List[str]) -> pd.Series:
"""
밸류 지표 순위 계산.
Args:
value_df: 밸류 지표 DataFrame (pivot된 형태, index=종목코드)
indicators: 순위를 계산할 지표 리스트 (예: ['PER', 'PBR'])
Returns:
종목별 최종 순위 Series
"""
# 지표가 0 이하인 경우 nan으로 변경
value_clean = value_df[indicators].copy()
value_clean[value_clean <= 0] = np.nan
# DY는 높을수록 좋은 지표이므로 역수 처리
if 'DY' in indicators:
value_clean['DY'] = 1 / value_clean['DY']
# 각 지표별 순위 계산
value_rank = value_clean.rank(axis=0)
# 순위 합산 후 재순위
value_sum = value_rank.sum(axis=1, skipna=False).rank()
return value_sum
def calculate_quality_factors(fs_list: pd.DataFrame) -> pd.DataFrame:
"""
퀄리티 팩터 계산 (ROE, GPA, CFO).
Args:
fs_list: 재무제표 DataFrame
Returns:
퀄리티 팩터 DataFrame (종목코드, ROE, GPA, CFO)
"""
if fs_list.empty:
return pd.DataFrame()
# TTM (Trailing Twelve Months) 계산
fs_list = fs_list.sort_values(['종목코드', '계정', '기준일'])
fs_list['ttm'] = fs_list.groupby(['종목코드', '계정'], as_index=False)[''].rolling(
window=4, min_periods=4
).sum()['']
fs_list_clean = fs_list.copy()
# 자산과 자본은 재무상태표 항목이므로 평균, 나머지는 합
fs_list_clean['ttm'] = np.where(
fs_list_clean['계정'].isin(['자산', '자본']),
fs_list_clean['ttm'] / 4,
fs_list_clean['ttm']
)
# 최근 데이터만 선택
fs_list_clean = fs_list_clean.groupby(['종목코드', '계정']).tail(1)
# Pivot
fs_list_pivot = fs_list_clean.pivot(index='종목코드', columns='계정', values='ttm')
# 퀄리티 지표 계산
quality_df = pd.DataFrame()
quality_df['종목코드'] = fs_list_pivot.index
# ROE = 당기순이익 / 자본
if '당기순이익' in fs_list_pivot.columns and '자본' in fs_list_pivot.columns:
quality_df['ROE'] = fs_list_pivot['당기순이익'] / fs_list_pivot['자본']
# GPA = 매출총이익 / 자산
if '매출총이익' in fs_list_pivot.columns and '자산' in fs_list_pivot.columns:
quality_df['GPA'] = fs_list_pivot['매출총이익'] / fs_list_pivot['자산']
# CFO = 영업활동현금흐름 / 자산
if '영업활동으로인한현금흐름' in fs_list_pivot.columns and '자산' in fs_list_pivot.columns:
quality_df['CFO'] = fs_list_pivot['영업활동으로인한현금흐름'] / fs_list_pivot['자산']
return quality_df