penti/backend/app/utils/data_helpers.py

329 lines
10 KiB
Python
Raw Normal View History

2026-01-31 23:30:51 +09:00
"""Data query helper functions."""
from typing import List, Dict
from decimal import Decimal
from datetime import datetime, timedelta
from sqlalchemy.orm import Session
from sqlalchemy import and_, func
import pandas as pd
import numpy as np
from app.models import Asset, PriceData, FinancialStatement
def get_ticker_list(db_session: Session) -> pd.DataFrame:
"""
종목 리스트 조회.
Args:
db_session: 데이터베이스 세션
Returns:
종목 리스트 DataFrame
"""
assets = db_session.query(Asset).filter(Asset.is_active == True).all()
data = [{
'종목코드': asset.ticker,
'종목명': asset.name,
'시장': asset.market,
'섹터': asset.sector
} for asset in assets]
return pd.DataFrame(data)
def get_price_data(
db_session: Session,
tickers: List[str],
start_date: datetime,
end_date: datetime
) -> pd.DataFrame:
"""
가격 데이터 조회.
Args:
db_session: 데이터베이스 세션
tickers: 종목 코드 리스트
start_date: 시작일
end_date: 종료일
Returns:
가격 데이터 DataFrame
"""
prices = db_session.query(PriceData).filter(
and_(
PriceData.ticker.in_(tickers),
PriceData.timestamp >= start_date,
PriceData.timestamp <= end_date
)
).all()
data = [{
'종목코드': p.ticker,
'날짜': p.timestamp,
'시가': float(p.open) if p.open else None,
'고가': float(p.high) if p.high else None,
'저가': float(p.low) if p.low else None,
'종가': float(p.close),
'거래량': p.volume
} for p in prices]
return pd.DataFrame(data)
def get_latest_price(
db_session: Session,
ticker: str,
date: datetime
) -> Decimal:
"""
특정 날짜의 최신 가격 조회 (해당 날짜 또는 이전 가장 가까운 날짜).
Args:
db_session: 데이터베이스 세션
ticker: 종목 코드
date: 조회 날짜
Returns:
가격
"""
price = db_session.query(PriceData).filter(
and_(
PriceData.ticker == ticker,
PriceData.timestamp <= date
)
).order_by(PriceData.timestamp.desc()).first()
if price:
return price.close
return Decimal("0")
def get_prices_on_date(
db_session: Session,
tickers: List[str],
date: datetime
) -> Dict[str, Decimal]:
"""
특정 날짜의 종목들 가격 조회.
Args:
db_session: 데이터베이스 세션
tickers: 종목 코드 리스트
date: 조회 날짜
Returns:
{ticker: price} 딕셔너리
"""
prices = {}
for ticker in tickers:
price = get_latest_price(db_session, ticker, date)
if price > 0:
prices[ticker] = price
return prices
def get_financial_statements(
db_session: Session,
tickers: List[str],
base_date: datetime = None
) -> pd.DataFrame:
"""
재무제표 데이터 조회.
Args:
db_session: 데이터베이스 세션
tickers: 종목 코드 리스트
base_date: 기준일 (None이면 최신 데이터)
Returns:
재무제표 DataFrame
"""
query = db_session.query(FinancialStatement).filter(
FinancialStatement.ticker.in_(tickers)
)
if base_date:
query = query.filter(FinancialStatement.base_date <= base_date)
fs_data = query.all()
data = [{
'종목코드': fs.ticker,
'계정': fs.account,
'기준일': fs.base_date,
'': float(fs.value) if fs.value else None,
'공시구분': fs.disclosure_type
} for fs in fs_data]
return pd.DataFrame(data)
def get_value_indicators(
db_session: Session,
tickers: List[str],
base_date: datetime = None,
include_psr_pcr: bool = False
) -> pd.DataFrame:
"""
밸류 지표 조회 (PER, PBR, DY, 옵션으로 PSR, PCR).
Args:
db_session: 데이터베이스 세션
tickers: 종목 코드 리스트
base_date: 기준일 (PSR, PCR 계산용, None이면 최신)
include_psr_pcr: PSR, PCR 포함 여부
Returns:
밸류 지표 DataFrame
"""
assets = db_session.query(Asset).filter(
Asset.ticker.in_(tickers)
).all()
data = []
# PSR, PCR 계산을 위한 재무제표 데이터 (필요시)
psr_pcr_data = {}
if include_psr_pcr:
fs_list = get_financial_statements(db_session, tickers, base_date)
if not fs_list.empty:
# TTM 계산
fs_list = fs_list.sort_values(['종목코드', '계정', '기준일'])
fs_list['ttm'] = fs_list.groupby(['종목코드', '계정'], as_index=False)[''].rolling(
window=4, min_periods=4
).sum()['']
fs_list_clean = fs_list.copy()
# 자산과 자본은 평균, 나머지는 합
fs_list_clean['ttm'] = np.where(
fs_list_clean['계정'].isin(['자산', '자본']),
fs_list_clean['ttm'] / 4,
fs_list_clean['ttm']
)
fs_list_clean = fs_list_clean.groupby(['종목코드', '계정']).tail(1)
# Pivot
fs_pivot = fs_list_clean.pivot(index='종목코드', columns='계정', values='ttm')
for ticker in fs_pivot.index:
psr_pcr_data[ticker] = {
'매출액': fs_pivot.loc[ticker, '매출액'] if '매출액' in fs_pivot.columns else None,
'영업활동으로인한현금흐름': fs_pivot.loc[ticker, '영업활동으로인한현금흐름'] if '영업활동으로인한현금흐름' in fs_pivot.columns else None
}
for asset in assets:
# PER 계산
per = float(asset.last_price / asset.eps) if asset.eps and asset.eps > 0 else None
# PBR 계산
pbr = float(asset.last_price / asset.bps) if asset.bps and asset.bps > 0 else None
# DY 계산 (배당수익률)
dy = float(asset.dividend_per_share / asset.last_price * 100) if asset.last_price and asset.last_price > 0 else None
# 종목별 지표 추가
if per:
data.append({'종목코드': asset.ticker, '지표': 'PER', '': per})
if pbr:
data.append({'종목코드': asset.ticker, '지표': 'PBR', '': pbr})
if dy:
data.append({'종목코드': asset.ticker, '지표': 'DY', '': dy})
# PSR, PCR 계산 (옵션)
if include_psr_pcr and asset.ticker in psr_pcr_data:
ticker_fs = psr_pcr_data[asset.ticker]
market_cap = float(asset.market_cap) if asset.market_cap else None
# PSR = 시가총액 / 매출액
if market_cap and ticker_fs['매출액'] and ticker_fs['매출액'] > 0:
psr = market_cap / float(ticker_fs['매출액'])
data.append({'종목코드': asset.ticker, '지표': 'PSR', '': psr})
# PCR = 시가총액 / 영업활동현금흐름
if market_cap and ticker_fs['영업활동으로인한현금흐름'] and ticker_fs['영업활동으로인한현금흐름'] > 0:
pcr = market_cap / float(ticker_fs['영업활동으로인한현금흐름'])
data.append({'종목코드': asset.ticker, '지표': 'PCR', '': pcr})
return pd.DataFrame(data)
def calculate_value_rank(value_df: pd.DataFrame, indicators: List[str]) -> pd.Series:
"""
밸류 지표 순위 계산.
Args:
value_df: 밸류 지표 DataFrame (pivot된 형태, index=종목코드)
indicators: 순위를 계산할 지표 리스트 (: ['PER', 'PBR'])
Returns:
종목별 최종 순위 Series
"""
# 지표가 0 이하인 경우 nan으로 변경
value_clean = value_df[indicators].copy()
value_clean[value_clean <= 0] = np.nan
# DY는 높을수록 좋은 지표이므로 역수 처리
if 'DY' in indicators:
value_clean['DY'] = 1 / value_clean['DY']
# 각 지표별 순위 계산
value_rank = value_clean.rank(axis=0)
# 순위 합산 후 재순위
value_sum = value_rank.sum(axis=1, skipna=False).rank()
return value_sum
def calculate_quality_factors(fs_list: pd.DataFrame) -> pd.DataFrame:
"""
퀄리티 팩터 계산 (ROE, GPA, CFO).
Args:
fs_list: 재무제표 DataFrame
Returns:
퀄리티 팩터 DataFrame (종목코드, ROE, GPA, CFO)
"""
if fs_list.empty:
return pd.DataFrame()
# TTM (Trailing Twelve Months) 계산
fs_list = fs_list.sort_values(['종목코드', '계정', '기준일'])
fs_list['ttm'] = fs_list.groupby(['종목코드', '계정'], as_index=False)[''].rolling(
window=4, min_periods=4
).sum()['']
fs_list_clean = fs_list.copy()
# 자산과 자본은 재무상태표 항목이므로 평균, 나머지는 합
fs_list_clean['ttm'] = np.where(
fs_list_clean['계정'].isin(['자산', '자본']),
fs_list_clean['ttm'] / 4,
fs_list_clean['ttm']
)
# 최근 데이터만 선택
fs_list_clean = fs_list_clean.groupby(['종목코드', '계정']).tail(1)
# Pivot
fs_list_pivot = fs_list_clean.pivot(index='종목코드', columns='계정', values='ttm')
# 퀄리티 지표 계산
quality_df = pd.DataFrame()
quality_df['종목코드'] = fs_list_pivot.index
# ROE = 당기순이익 / 자본
if '당기순이익' in fs_list_pivot.columns and '자본' in fs_list_pivot.columns:
quality_df['ROE'] = fs_list_pivot['당기순이익'] / fs_list_pivot['자본']
# GPA = 매출총이익 / 자산
if '매출총이익' in fs_list_pivot.columns and '자산' in fs_list_pivot.columns:
quality_df['GPA'] = fs_list_pivot['매출총이익'] / fs_list_pivot['자산']
# CFO = 영업활동현금흐름 / 자산
if '영업활동으로인한현금흐름' in fs_list_pivot.columns and '자산' in fs_list_pivot.columns:
quality_df['CFO'] = fs_list_pivot['영업활동으로인한현금흐름'] / fs_list_pivot['자산']
return quality_df