""" Factor calculation service for quant strategies. """ from decimal import Decimal from typing import Dict, List, Optional from datetime import date, timedelta import pandas as pd from sqlalchemy.orm import Session from sqlalchemy import func from app.models.stock import Stock, Valuation, Price, Financial, Sector class FactorCalculator: """Calculates factor scores for stocks.""" def __init__(self, db: Session): self.db = db def get_universe( self, markets: List[str] = None, min_market_cap: int = None, max_market_cap: int = None, exclude_stock_types: List[str] = None, exclude_sectors: List[str] = None, ) -> List[Stock]: """Get filtered stock universe.""" query = self.db.query(Stock) if markets: query = query.filter(Stock.market.in_(markets)) if min_market_cap: # market_cap is in won, min_market_cap is in 억원 query = query.filter(Stock.market_cap >= min_market_cap * 100_000_000) if max_market_cap: query = query.filter(Stock.market_cap <= max_market_cap * 100_000_000) if exclude_stock_types: query = query.filter(~Stock.stock_type.in_(exclude_stock_types)) stocks = query.all() # Filter by sector if needed if exclude_sectors: sector_tickers = ( self.db.query(Sector.ticker) .filter(Sector.sector_name.in_(exclude_sectors)) .all() ) excluded = {t[0] for t in sector_tickers} stocks = [s for s in stocks if s.ticker not in excluded] return stocks def get_valuations(self, tickers: List[str], base_date: date = None) -> Dict[str, Valuation]: """Get latest valuations for tickers.""" if base_date: valuations = ( self.db.query(Valuation) .filter(Valuation.ticker.in_(tickers)) .filter(Valuation.base_date <= base_date) .order_by(Valuation.base_date.desc()) .all() ) else: valuations = ( self.db.query(Valuation) .filter(Valuation.ticker.in_(tickers)) .all() ) # Get latest per ticker result = {} for v in valuations: if v.ticker not in result: result[v.ticker] = v return result def get_sectors(self, tickers: List[str]) -> Dict[str, str]: """Get sector names for tickers.""" sectors = ( self.db.query(Sector) .filter(Sector.ticker.in_(tickers)) .all() ) return {s.ticker: s.sector_name for s in sectors} def calculate_momentum( self, tickers: List[str], base_date: date = None, months: int = 12, skip_recent: int = 1, ) -> Dict[str, Decimal]: """Calculate price momentum.""" if base_date is None: base_date = date.today() start_date = base_date - timedelta(days=months * 30) skip_date = base_date - timedelta(days=skip_recent * 30) # Get prices prices = ( self.db.query(Price) .filter(Price.ticker.in_(tickers)) .filter(Price.date >= start_date) .filter(Price.date <= base_date) .all() ) # Group by ticker ticker_prices = {} for p in prices: if p.ticker not in ticker_prices: ticker_prices[p.ticker] = [] ticker_prices[p.ticker].append((p.date, float(p.close))) # Calculate returns momentum = {} for ticker, price_list in ticker_prices.items(): if len(price_list) < 2: continue price_list.sort(key=lambda x: x[0]) # Find start price start_price = price_list[0][1] # Find end price (skip recent month if specified) end_price = None for d, p in reversed(price_list): if d <= skip_date: end_price = p break if end_price and start_price > 0: momentum[ticker] = Decimal(str((end_price - start_price) / start_price * 100)) return momentum def calculate_value_scores( self, valuations: Dict[str, Valuation], ) -> Dict[str, Decimal]: """Calculate value factor scores (higher is cheaper/better).""" data = [] for ticker, v in valuations.items(): # Inverse of PER, PBR, etc. (lower ratio = higher score) per_inv = 1 / float(v.per) if v.per and float(v.per) > 0 else 0 pbr_inv = 1 / float(v.pbr) if v.pbr and float(v.pbr) > 0 else 0 psr_inv = 1 / float(v.psr) if v.psr and float(v.psr) > 0 else 0 pcr_inv = 1 / float(v.pcr) if v.pcr and float(v.pcr) > 0 else 0 div_yield = float(v.dividend_yield) if v.dividend_yield else 0 data.append({ 'ticker': ticker, 'per_inv': per_inv, 'pbr_inv': pbr_inv, 'psr_inv': psr_inv, 'pcr_inv': pcr_inv, 'div_yield': div_yield, }) if not data: return {} df = pd.DataFrame(data) # Z-score normalization for each metric for col in ['per_inv', 'pbr_inv', 'psr_inv', 'pcr_inv', 'div_yield']: mean = df[col].mean() std = df[col].std() if std > 0: df[f'{col}_z'] = (df[col] - mean) / std else: df[f'{col}_z'] = 0 # Composite value score (equal weight) df['value_score'] = ( df['per_inv_z'] + df['pbr_inv_z'] + df['psr_inv_z'] + df['pcr_inv_z'] + df['div_yield_z'] ) / 5 return {row['ticker']: Decimal(str(row['value_score'])) for _, row in df.iterrows()} def calculate_quality_scores( self, tickers: List[str], base_date: date = None, ) -> Dict[str, Decimal]: """Calculate quality factor scores based on ROE, GP/A, etc.""" # Get financial data financials = ( self.db.query(Financial) .filter(Financial.ticker.in_(tickers)) .filter(Financial.report_type == 'annual') .all() ) # Group by ticker ticker_financials = {} for f in financials: if f.ticker not in ticker_financials: ticker_financials[f.ticker] = {} ticker_financials[f.ticker][f.account] = float(f.value) if f.value else 0 data = [] for ticker, fin in ticker_financials.items(): total_equity = fin.get('total_equity', 0) total_assets = fin.get('total_assets', 0) net_income = fin.get('net_income', 0) gross_profit = fin.get('gross_profit', 0) operating_cf = fin.get('operating_cash_flow', 0) total_liabilities = fin.get('total_liabilities', 0) roe = net_income / total_equity if total_equity > 0 else 0 gpa = gross_profit / total_assets if total_assets > 0 else 0 cfo_a = operating_cf / total_assets if total_assets > 0 else 0 debt_ratio_inv = 1 / (total_liabilities / total_equity) if total_equity > 0 and total_liabilities > 0 else 0 data.append({ 'ticker': ticker, 'roe': roe, 'gpa': gpa, 'cfo_a': cfo_a, 'debt_ratio_inv': debt_ratio_inv, }) if not data: return {} df = pd.DataFrame(data) # Z-score normalization for col in ['roe', 'gpa', 'cfo_a', 'debt_ratio_inv']: mean = df[col].mean() std = df[col].std() if std > 0: df[f'{col}_z'] = (df[col] - mean) / std else: df[f'{col}_z'] = 0 # Composite quality score df['quality_score'] = (df['roe_z'] + df['gpa_z'] + df['cfo_a_z'] + df['debt_ratio_inv_z']) / 4 return {row['ticker']: Decimal(str(row['quality_score'])) for _, row in df.iterrows()} def calculate_fscore( self, tickers: List[str], ) -> Dict[str, int]: """Calculate Piotroski F-Score (0-9).""" # Get financial data for current and previous year financials = ( self.db.query(Financial) .filter(Financial.ticker.in_(tickers)) .filter(Financial.report_type == 'annual') .all() ) # Group by ticker and date ticker_data = {} for f in financials: key = (f.ticker, f.base_date) if key not in ticker_data: ticker_data[key] = {} ticker_data[key][f.account] = float(f.value) if f.value else 0 fscores = {} for ticker in tickers: # Get latest two years ticker_years = sorted( [(k, v) for k, v in ticker_data.items() if k[0] == ticker], key=lambda x: x[0][1], reverse=True )[:2] if len(ticker_years) < 2: fscores[ticker] = 0 continue curr = ticker_years[0][1] prev = ticker_years[1][1] score = 0 # Profitability (4 points) # 1. ROA > 0 ta = curr.get('total_assets', 1) ni = curr.get('net_income', 0) if ta > 0 and ni / ta > 0: score += 1 # 2. CFO > 0 cfo = curr.get('operating_cash_flow', 0) if cfo > 0: score += 1 # 3. ROA increased prev_ta = prev.get('total_assets', 1) prev_ni = prev.get('net_income', 0) if ta > 0 and prev_ta > 0: if ni / ta > prev_ni / prev_ta: score += 1 # 4. CFO > Net Income (Accrual) if cfo > ni: score += 1 # Leverage (3 points) # 5. Leverage decreased tl = curr.get('total_liabilities', 0) prev_tl = prev.get('total_liabilities', 0) if ta > 0 and prev_ta > 0: if tl / ta < prev_tl / prev_ta: score += 1 # 6. Liquidity increased ca = curr.get('current_assets', 0) cl = curr.get('current_liabilities', 1) prev_ca = prev.get('current_assets', 0) prev_cl = prev.get('current_liabilities', 1) if cl > 0 and prev_cl > 0: if ca / cl > prev_ca / prev_cl: score += 1 # 7. No new equity issued (simplified: equity increase <= net income) te = curr.get('total_equity', 0) prev_te = prev.get('total_equity', 0) if te - prev_te <= ni: score += 1 # Operating Efficiency (2 points) # 8. Gross margin improved rev = curr.get('revenue', 1) gp = curr.get('gross_profit', 0) prev_rev = prev.get('revenue', 1) prev_gp = prev.get('gross_profit', 0) if rev > 0 and prev_rev > 0: if gp / rev > prev_gp / prev_rev: score += 1 # 9. Asset turnover improved if ta > 0 and prev_ta > 0: if rev / ta > prev_rev / prev_ta: score += 1 fscores[ticker] = score return fscores