"""Multi-Factor Strategy (Quality + Value + Momentum).""" from typing import List, Dict from decimal import Decimal from datetime import datetime, timedelta from sqlalchemy.orm import Session import pandas as pd import numpy as np from scipy.stats import zscore import statsmodels.api as sm from app.strategies.base import BaseStrategy from app.utils.data_helpers import ( get_ticker_list, get_price_data, get_financial_statements, get_value_indicators, get_prices_on_date, calculate_quality_factors ) def col_clean(df, cutoff=0.01, asc=False): """ 각 섹터별 아웃라이어를 제거한 후 순위와 z-score를 구하는 함수. Args: df: 데이터프레임 cutoff: 제거할 이상치 비율 asc: 오름차순 여부 Returns: z-score DataFrame """ q_low = df.quantile(cutoff) q_hi = df.quantile(1 - cutoff) # 이상치 데이터 제거 df_trim = df[(df > q_low) & (df < q_hi)] df_z_score = df_trim.rank(axis=0, ascending=asc).apply( zscore, nan_policy='omit') return df_z_score class MultiFactorStrategy(BaseStrategy): """ 멀티 팩터 전략. - 퀄리티: ROE, GPA, CFO - 밸류: PER, PBR, PSR, PCR, DY - 모멘텀: 12개월 수익률, K-Ratio """ def __init__(self, config: Dict = None): """ 초기화. Args: config: 전략 설정 - count: 선정 종목 수 (기본 20) - quality_weight: 퀄리티 가중치 (기본 0.3) - value_weight: 밸류 가중치 (기본 0.3) - momentum_weight: 모멘텀 가중치 (기본 0.4) """ super().__init__(config) self.count = config.get('count', 20) self.quality_weight = config.get('quality_weight', 0.3) self.value_weight = config.get('value_weight', 0.3) self.momentum_weight = config.get('momentum_weight', 0.4) def select_stocks(self, rebal_date: datetime, db_session: Session) -> List[str]: """ 종목 선정. Args: rebal_date: 리밸런싱 날짜 db_session: 데이터베이스 세션 Returns: 선정된 종목 코드 리스트 """ try: # 1. 종목 리스트 조회 ticker_list = get_ticker_list(db_session) if ticker_list.empty: return [] tickers = ticker_list['종목코드'].tolist() # 2. 재무제표 데이터 조회 fs_list = get_financial_statements(db_session, tickers, rebal_date) if fs_list.empty: return [] # 3. 퀄리티 지표 계산 quality_df = calculate_quality_factors(fs_list) # 4. 밸류 지표 조회 value_list = get_value_indicators(db_session, tickers) # 5. 모멘텀 지표 계산 momentum_df = self._calculate_momentum_factors( db_session, tickers, rebal_date ) # 6. 모든 지표 병합 data_bind = ticker_list[['종목코드', '종목명', '섹터']].copy() data_bind.loc[data_bind['섹터'].isnull(), '섹터'] = '기타' # 퀄리티 병합 if not quality_df.empty: data_bind = data_bind.merge(quality_df, on='종목코드', how='left') # 밸류 병합 if not value_list.empty: value_pivot = value_list.pivot(index='종목코드', columns='지표', values='값') data_bind = data_bind.merge(value_pivot, on='종목코드', how='left') # 모멘텀 병합 if not momentum_df.empty: data_bind = data_bind.merge(momentum_df, on='종목코드', how='left') # 7. 섹터별 z-score 계산 data_bind_group = data_bind.set_index(['종목코드', '섹터']).groupby('섹터', as_index=False) # 퀄리티 z-score z_quality = data_bind_group[['ROE', 'GPA', 'CFO']].apply( lambda x: col_clean(x, 0.01, False) ).sum(axis=1, skipna=False).to_frame('z_quality') data_bind = data_bind.merge(z_quality, how='left', on=['종목코드', '섹터']) # 밸류 z-score value_cols = [col for col in ['PER', 'PBR', 'DY'] if col in data_bind.columns] if value_cols: value_1 = data_bind_group[value_cols].apply(lambda x: col_clean(x, 0.01, True)) value_2 = data_bind_group[['DY']].apply(lambda x: col_clean(x, 0.01, False)) if 'DY' in data_bind.columns else None if value_2 is not None: z_value = value_1.merge(value_2, on=['종목코드', '섹터']).sum(axis=1, skipna=False).to_frame('z_value') else: z_value = value_1.sum(axis=1, skipna=False).to_frame('z_value') data_bind = data_bind.merge(z_value, how='left', on=['종목코드', '섹터']) # 모멘텀 z-score momentum_cols = [col for col in ['12M', 'K_ratio'] if col in data_bind.columns] if momentum_cols: z_momentum = data_bind_group[momentum_cols].apply( lambda x: col_clean(x, 0.01, False) ).sum(axis=1, skipna=False).to_frame('z_momentum') data_bind = data_bind.merge(z_momentum, how='left', on=['종목코드', '섹터']) # 8. 최종 z-score 정규화 및 가중치 적용 factor_cols = [col for col in ['z_quality', 'z_value', 'z_momentum'] if col in data_bind.columns] if not factor_cols: return [] data_bind_final = data_bind[['종목코드'] + factor_cols].set_index('종목코드').apply( zscore, nan_policy='omit' ) data_bind_final.columns = ['quality', 'value', 'momentum'][:len(factor_cols)] # 가중치 적용 weights = [self.quality_weight, self.value_weight, self.momentum_weight][:len(factor_cols)] data_bind_final_sum = (data_bind_final * weights).sum(axis=1, skipna=False).to_frame('qvm') # 최종 병합 port_qvm = data_bind.merge(data_bind_final_sum, on='종목코드') # 상위 N개 종목 선정 port_qvm = port_qvm.dropna(subset=['qvm']) port_qvm = port_qvm.nlargest(self.count, 'qvm') return port_qvm['종목코드'].tolist() except Exception as e: print(f"Multi-Factor 종목 선정 오류: {e}") return [] def _calculate_momentum_factors( self, db_session: Session, tickers: List[str], rebal_date: datetime ) -> pd.DataFrame: """ 모멘텀 지표 계산 (12개월 수익률, K-Ratio). Args: db_session: 데이터베이스 세션 tickers: 종목 코드 리스트 rebal_date: 리밸런싱 날짜 Returns: 모멘텀 지표 DataFrame """ # 12개월 전 날짜 start_date = rebal_date - timedelta(days=365) # 가격 데이터 조회 price_list = get_price_data(db_session, tickers, start_date, rebal_date) if price_list.empty: return pd.DataFrame() price_pivot = price_list.pivot(index='날짜', columns='종목코드', values='종가') # 12개월 수익률 ret_list = pd.DataFrame( data=(price_pivot.iloc[-1] / price_pivot.iloc[0]) - 1, columns=['12M'] ) # K-Ratio 계산 ret = price_pivot.pct_change().iloc[1:] ret_cum = np.log(1 + ret).cumsum() x = np.array(range(len(ret))) k_ratio = {} for ticker in tickers: try: if ticker in price_pivot.columns: y = ret_cum[ticker] reg = sm.OLS(y, x).fit() res = float(reg.params / reg.bse) k_ratio[ticker] = res except: k_ratio[ticker] = np.nan k_ratio_bind = pd.DataFrame.from_dict(k_ratio, orient='index').reset_index() k_ratio_bind.columns = ['종목코드', 'K_ratio'] # 병합 momentum_df = ret_list.merge(k_ratio_bind, on='종목코드', how='outer') return momentum_df def get_prices( self, tickers: List[str], date: datetime, db_session: Session ) -> Dict[str, Decimal]: """ 종목 가격 조회. Args: tickers: 종목 코드 리스트 date: 조회 날짜 db_session: 데이터베이스 세션 Returns: {ticker: price} 딕셔너리 """ return get_prices_on_date(db_session, tickers, date)