# NOTE: this class relies on the module-level imports of factor_calculator.py:
# Decimal, date/timedelta, typing (Dict, List, Optional), pandas as pd, and the
# Stock, Valuation, Price, Financial and Sector ORM models. Project/ORM types
# are written as quoted forward references in the annotations below.
class FactorCalculator:
    """Calculates factor scores for stocks.

    All database access goes through the session passed to the constructor.
    Scores are returned as plain dicts keyed by ticker.
    """

    # Stock.market_cap is stored in won while the min/max market-cap bounds
    # are given in units of 100 million won (eok-won).
    _WON_PER_EOK = 100_000_000

    _VALUE_COLUMNS = ('per_inv', 'pbr_inv', 'psr_inv', 'pcr_inv', 'div_yield')
    _QUALITY_COLUMNS = ('roe', 'gpa', 'cfo_a', 'debt_ratio_inv')

    def __init__(self, db: "Session"):
        self.db = db

    # ------------------------------------------------------------------
    # Data access
    # ------------------------------------------------------------------
    def get_universe(
        self,
        markets: Optional[List[str]] = None,
        min_market_cap: Optional[int] = None,
        max_market_cap: Optional[int] = None,
        exclude_stock_types: Optional[List[str]] = None,
        exclude_sectors: Optional[List[str]] = None,
    ) -> "List[Stock]":
        """Get the filtered stock universe.

        Args:
            markets: Keep only stocks whose ``market`` is in this list.
            min_market_cap: Lower market-cap bound in units of 100 million
                won. ``None`` or ``0`` disables the bound.
            max_market_cap: Upper market-cap bound in units of 100 million
                won. ``None`` or ``0`` disables the bound.
            exclude_stock_types: Stock types to drop.
            exclude_sectors: Sector names whose tickers are dropped.

        Returns:
            The matching ``Stock`` rows.
        """
        query = self.db.query(Stock)

        if markets:
            query = query.filter(Stock.market.in_(markets))
        if min_market_cap:
            query = query.filter(
                Stock.market_cap >= min_market_cap * self._WON_PER_EOK
            )
        if max_market_cap:
            query = query.filter(
                Stock.market_cap <= max_market_cap * self._WON_PER_EOK
            )
        if exclude_stock_types:
            query = query.filter(~Stock.stock_type.in_(exclude_stock_types))

        stocks = query.all()

        # Sector membership lives in a separate table, so it is resolved with
        # a second query and applied in Python.
        if exclude_sectors:
            sector_rows = (
                self.db.query(Sector.ticker)
                .filter(Sector.sector_name.in_(exclude_sectors))
                .all()
            )
            excluded = {row[0] for row in sector_rows}
            stocks = [s for s in stocks if s.ticker not in excluded]

        return stocks

    def get_valuations(
        self,
        tickers: List[str],
        base_date: Optional[date] = None,
    ) -> "Dict[str, Valuation]":
        """Get the latest valuation row for each ticker.

        Args:
            tickers: Tickers to look up.
            base_date: When given, only valuations dated on or before this
                date are considered.

        Returns:
            Mapping of ticker to its most recent ``Valuation``; tickers with
            no matching row are absent.
        """
        if not tickers:
            return {}

        query = self.db.query(Valuation).filter(Valuation.ticker.in_(tickers))
        if base_date is not None:
            query = query.filter(Valuation.base_date <= base_date)

        # Always sort newest-first: the first row seen per ticker is then the
        # latest one, with or without a base_date cut-off.
        latest = {}
        for row in query.order_by(Valuation.base_date.desc()).all():
            latest.setdefault(row.ticker, row)
        return latest

    def get_sectors(self, tickers: List[str]) -> Dict[str, str]:
        """Get a ticker -> sector name mapping for the given tickers."""
        if not tickers:
            return {}

        rows = (
            self.db.query(Sector)
            .filter(Sector.ticker.in_(tickers))
            .all()
        )
        return {row.ticker: row.sector_name for row in rows}

    # ------------------------------------------------------------------
    # Momentum
    # ------------------------------------------------------------------
    def calculate_momentum(
        self,
        tickers: List[str],
        base_date: Optional[date] = None,
        months: int = 12,
        skip_recent: int = 1,
    ) -> Dict[str, Decimal]:
        """Calculate price momentum as a percentage return.

        The look-back window is ``months * 30`` days ending at ``base_date``
        and the most recent ``skip_recent * 30`` days are excluded from the
        end price.

        Args:
            tickers: Tickers to evaluate.
            base_date: End of the window; defaults to today.
            months: Window length in 30-day months.
            skip_recent: Number of most recent 30-day months to skip.

        Returns:
            Mapping of ticker to percentage return. Tickers without enough
            usable prices are omitted.
        """
        if not tickers:
            return {}
        if base_date is None:
            base_date = date.today()

        start_date = base_date - timedelta(days=months * 30)
        skip_date = base_date - timedelta(days=skip_recent * 30)

        rows = (
            self.db.query(Price)
            .filter(Price.ticker.in_(tickers))
            .filter(Price.date >= start_date)
            .filter(Price.date <= base_date)
            .all()
        )

        series = {}
        for row in rows:
            if row.close is None:
                # A missing close cannot be converted to float; skip the row.
                continue
            series.setdefault(row.ticker, []).append(
                (row.date, float(row.close))
            )

        momentum = {}
        for ticker, points in series.items():
            pct = self._momentum_pct(points, skip_date)
            if pct is not None:
                momentum[ticker] = Decimal(str(pct))
        return momentum

    @staticmethod
    def _momentum_pct(points: list, skip_date: date) -> Optional[float]:
        """Return the percentage return for one ``(date, close)`` series.

        The start price is the earliest point; the end price is the latest
        point dated on or before ``skip_date``. Returns ``None`` when there
        are fewer than two points, no usable end price, or a non-positive
        start price.
        """
        if len(points) < 2:
            return None

        ordered = sorted(points, key=lambda item: item[0])
        start_price = ordered[0][1]
        end_price = next(
            (price for day, price in reversed(ordered) if day <= skip_date),
            None,
        )

        if not end_price or start_price <= 0:
            return None
        return (end_price - start_price) / start_price * 100

    # ------------------------------------------------------------------
    # Value / quality composites
    # ------------------------------------------------------------------
    def calculate_value_scores(
        self,
        valuations: "Dict[str, Valuation]",
    ) -> Dict[str, Decimal]:
        """Calculate value factor scores (higher is cheaper/better).

        Each of PER, PBR, PSR and PCR is inverted (lower ratio = higher
        score); dividend yield is used as-is. A missing or non-positive
        ratio, or a missing yield, contributes 0. The five metrics are
        z-scored across the given valuations and averaged with equal weight.

        Returns:
            Mapping of ticker to composite score; empty for empty input.
        """
        rows = [
            {
                'ticker': ticker,
                'per_inv': self._inverse(v.per),
                'pbr_inv': self._inverse(v.pbr),
                'psr_inv': self._inverse(v.psr),
                'pcr_inv': self._inverse(v.pcr),
                'div_yield': float(v.dividend_yield) if v.dividend_yield else 0,
            }
            for ticker, v in valuations.items()
        ]
        return self._composite_scores(rows, self._VALUE_COLUMNS)

    def calculate_quality_scores(
        self,
        tickers: List[str],
        base_date: Optional[date] = None,
    ) -> Dict[str, Decimal]:
        """Calculate quality factor scores from ROE, GP/A, CFO/A and leverage.

        Only each ticker's most recent annual period is used, so accounts
        from different years are never mixed.

        Args:
            tickers: Tickers to evaluate.
            base_date: When given, only periods dated on or before this date
                are considered.

        Returns:
            Mapping of ticker to composite score (equal-weight average of the
            four z-scored metrics); tickers without annual data are omitted.
        """
        if not tickers:
            return {}

        query = (
            self.db.query(Financial)
            .filter(Financial.ticker.in_(tickers))
            .filter(Financial.report_type == 'annual')
        )
        if base_date is not None:
            query = query.filter(Financial.base_date <= base_date)

        rows = []
        for ticker, by_date in self._group_by_period(query.all()).items():
            latest_accounts = by_date[max(by_date)]
            rows.append(self._quality_ratios(ticker, latest_accounts))

        return self._composite_scores(rows, self._QUALITY_COLUMNS)

    @staticmethod
    def _inverse(value) -> float:
        """Return ``1 / value`` for a positive value, otherwise 0."""
        return 1 / float(value) if value and float(value) > 0 else 0

    @staticmethod
    def _quality_ratios(ticker: str, fin: Dict[str, float]) -> dict:
        """Build the quality metric row for one ticker's account dict.

        A ratio whose denominator is missing or non-positive is set to 0.
        """
        total_equity = fin.get('total_equity', 0)
        total_assets = fin.get('total_assets', 0)
        net_income = fin.get('net_income', 0)
        gross_profit = fin.get('gross_profit', 0)
        operating_cf = fin.get('operating_cash_flow', 0)
        total_liabilities = fin.get('total_liabilities', 0)

        has_assets = total_assets > 0
        has_equity = total_equity > 0
        return {
            'ticker': ticker,
            'roe': net_income / total_equity if has_equity else 0,
            'gpa': gross_profit / total_assets if has_assets else 0,
            'cfo_a': operating_cf / total_assets if has_assets else 0,
            # Inverse of liabilities/equity, so lower leverage scores higher.
            'debt_ratio_inv': (
                1 / (total_liabilities / total_equity)
                if has_equity and total_liabilities > 0
                else 0
            ),
        }

    @staticmethod
    def _composite_scores(rows: List[dict], columns) -> Dict[str, Decimal]:
        """Z-score each column across rows and average them per ticker.

        A column whose standard deviation is zero or undefined (for example
        a single row) contributes 0 to every ticker.
        """
        if not rows:
            return {}

        df = pd.DataFrame(rows)
        total = pd.Series(0.0, index=df.index)
        for col in columns:
            std = df[col].std()
            # NaN > 0 is False, so a single-row frame falls through here too.
            if std > 0:
                total = total + (df[col] - df[col].mean()) / std
        composite = total / len(columns)

        return {
            ticker: Decimal(str(float(score)))
            for ticker, score in zip(df['ticker'], composite)
        }

    # ------------------------------------------------------------------
    # Piotroski F-Score
    # ------------------------------------------------------------------
    def calculate_fscore(
        self,
        tickers: List[str],
    ) -> Dict[str, int]:
        """Calculate the Piotroski F-Score (0-9) for each ticker.

        The two most recent annual periods are compared. A ticker with fewer
        than two annual periods gets a score of 0.

        Returns:
            Mapping with one entry per requested ticker.
        """
        if not tickers:
            return {}

        records = (
            self.db.query(Financial)
            .filter(Financial.ticker.in_(tickers))
            .filter(Financial.report_type == 'annual')
            .all()
        )
        # Group once so each ticker lookup is a dict access instead of a scan
        # over every (ticker, date) pair.
        periods = self._group_by_period(records)

        fscores = {}
        for ticker in tickers:
            by_date = periods.get(ticker, {})
            if len(by_date) < 2:
                fscores[ticker] = 0
                continue

            curr_key, prev_key = sorted(by_date, reverse=True)[:2]
            fscores[ticker] = self._piotroski_score(
                by_date[curr_key], by_date[prev_key]
            )
        return fscores

    @staticmethod
    def _group_by_period(records) -> Dict[str, dict]:
        """Group financial rows as ``{ticker: {base_date: {account: value}}}``.

        Values are converted to float; a missing/falsy value becomes 0.
        """
        grouped = {}
        for rec in records:
            accounts = (
                grouped.setdefault(rec.ticker, {})
                .setdefault(rec.base_date, {})
            )
            accounts[rec.account] = float(rec.value) if rec.value else 0
        return grouped

    @staticmethod
    def _piotroski_score(curr: Dict[str, float], prev: Dict[str, float]) -> int:
        """Score one ticker from its current and previous period accounts.

        Missing accounts are treated as 0. A criterion whose ratio cannot be
        computed (non-positive denominator) earns no point.
        """
        ta = curr.get('total_assets', 0)
        ni = curr.get('net_income', 0)
        cfo = curr.get('operating_cash_flow', 0)
        tl = curr.get('total_liabilities', 0)
        ca = curr.get('current_assets', 0)
        cl = curr.get('current_liabilities', 0)
        te = curr.get('total_equity', 0)
        rev = curr.get('revenue', 0)
        gp = curr.get('gross_profit', 0)

        prev_ta = prev.get('total_assets', 0)
        prev_ni = prev.get('net_income', 0)
        prev_tl = prev.get('total_liabilities', 0)
        prev_ca = prev.get('current_assets', 0)
        prev_cl = prev.get('current_liabilities', 0)
        prev_te = prev.get('total_equity', 0)
        prev_rev = prev.get('revenue', 0)
        prev_gp = prev.get('gross_profit', 0)

        have_assets = ta > 0 and prev_ta > 0
        have_liabilities = cl > 0 and prev_cl > 0
        have_revenue = rev > 0 and prev_rev > 0

        checks = [
            # Profitability (4 points)
            ta > 0 and ni / ta > 0,                            # 1. ROA > 0
            cfo > 0,                                           # 2. CFO > 0
            have_assets and ni / ta > prev_ni / prev_ta,       # 3. ROA increased
            cfo > ni,                                          # 4. CFO > net income (accrual)
            # Leverage / liquidity / dilution (3 points)
            have_assets and tl / ta < prev_tl / prev_ta,       # 5. Leverage decreased
            have_liabilities and ca / cl > prev_ca / prev_cl,  # 6. Liquidity increased
            # 7. No new equity issued (simplified: equity increase <= net income)
            te - prev_te <= ni,
            # Operating efficiency (2 points)
            have_revenue and gp / rev > prev_gp / prev_rev,    # 8. Gross margin improved
            have_assets and rev / ta > prev_rev / prev_ta,     # 9. Asset turnover improved
        ]
        return sum(1 for passed in checks if passed)