galaxis-po/backend/app/services/factor_calculator.py
zephyrdark 9eebc73390 feat: add factor calculator service
Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-02-03 08:58:00 +09:00

352 lines
11 KiB
Python

"""
Factor calculation service for quant strategies.
"""
from decimal import Decimal
from typing import Dict, List, Optional
from datetime import date, timedelta
import pandas as pd
from sqlalchemy.orm import Session
from sqlalchemy import func
from app.models.stock import Stock, Valuation, Price, Financial, Sector
class FactorCalculator:
    """Calculates factor scores for stocks.

    Wraps a SQLAlchemy session and exposes universe selection plus momentum,
    value, quality and Piotroski F-Score calculations. Every score dictionary
    returned by this class is keyed by ticker.
    """

    # Market-cap bounds are passed in 억원 while Stock.market_cap is stored in won.
    _WON_PER_EOK = 100_000_000

    def __init__(self, db: "Session"):
        """Store the database session used by all query methods."""
        self.db = db

    def get_universe(
        self,
        markets: Optional[List[str]] = None,
        min_market_cap: Optional[int] = None,
        max_market_cap: Optional[int] = None,
        exclude_stock_types: Optional[List[str]] = None,
        exclude_sectors: Optional[List[str]] = None,
    ) -> List["Stock"]:
        """Get filtered stock universe.

        Args:
            markets: Keep only stocks whose market is in this list.
            min_market_cap: Lower market-cap bound in 억원 (None or 0 disables it).
            max_market_cap: Upper market-cap bound in 억원 (None or 0 disables it).
            exclude_stock_types: Stock types to drop.
            exclude_sectors: Sector names whose tickers are dropped.

        Returns:
            The Stock rows that pass every supplied filter.
        """
        query = self.db.query(Stock)
        if markets:
            query = query.filter(Stock.market.in_(markets))
        if min_market_cap:
            query = query.filter(
                Stock.market_cap >= min_market_cap * self._WON_PER_EOK
            )
        if max_market_cap:
            query = query.filter(
                Stock.market_cap <= max_market_cap * self._WON_PER_EOK
            )
        if exclude_stock_types:
            query = query.filter(~Stock.stock_type.in_(exclude_stock_types))
        stocks = query.all()

        # Sector membership lives in a separate table, so it is applied after
        # the stock query using a set of excluded tickers.
        if exclude_sectors:
            sector_rows = (
                self.db.query(Sector.ticker)
                .filter(Sector.sector_name.in_(exclude_sectors))
                .all()
            )
            excluded = {row[0] for row in sector_rows}
            stocks = [stock for stock in stocks if stock.ticker not in excluded]
        return stocks

    def get_valuations(
        self,
        tickers: List[str],
        base_date: Optional[date] = None,
    ) -> Dict[str, "Valuation"]:
        """Get latest valuations for tickers.

        Args:
            tickers: Tickers to look up.
            base_date: When given, only valuations dated on or before it are
                considered.

        Returns:
            Mapping of ticker to its most recent Valuation row. Tickers
            without any matching row are absent.
        """
        if not tickers:
            return {}
        query = self.db.query(Valuation).filter(Valuation.ticker.in_(tickers))
        if base_date is not None:
            query = query.filter(Valuation.base_date <= base_date)

        # Always sort newest-first: without an ordering the "first row seen
        # per ticker" below would be an arbitrary row, not the latest one.
        latest: Dict[str, "Valuation"] = {}
        for valuation in query.order_by(Valuation.base_date.desc()).all():
            latest.setdefault(valuation.ticker, valuation)
        return latest

    def get_sectors(self, tickers: List[str]) -> Dict[str, str]:
        """Get sector names for tickers.

        Returns:
            Mapping of ticker to sector name for every ticker found in the
            Sector table.
        """
        if not tickers:
            return {}
        sectors = self.db.query(Sector).filter(Sector.ticker.in_(tickers)).all()
        return {sector.ticker: sector.sector_name for sector in sectors}

    def calculate_momentum(
        self,
        tickers: List[str],
        base_date: Optional[date] = None,
        months: int = 12,
        skip_recent: int = 1,
    ) -> Dict[str, Decimal]:
        """Calculate price momentum.

        Momentum is the percentage return from the first price in the
        look-back window to the last price on or before the skip date.

        Args:
            tickers: Tickers to evaluate.
            base_date: End of the window; defaults to today.
            months: Look-back length, approximated as 30 days per month.
            skip_recent: Most recent months to ignore (30 days each).

        Returns:
            Mapping of ticker to momentum in percent. Tickers with fewer than
            two prices, a non-positive start price, or no usable end price
            are omitted.
        """
        if not tickers:
            return {}
        if base_date is None:
            base_date = date.today()
        start_date = base_date - timedelta(days=months * 30)
        skip_date = base_date - timedelta(days=skip_recent * 30)

        prices = (
            self.db.query(Price)
            .filter(Price.ticker.in_(tickers))
            .filter(Price.date >= start_date)
            .filter(Price.date <= base_date)
            .all()
        )

        series: Dict[str, list] = {}
        for price in prices:
            if price.close is None:
                # A missing close cannot take part in a return calculation.
                continue
            series.setdefault(price.ticker, []).append(
                (price.date, float(price.close))
            )
        return self._momentum_from_series(series, skip_date)

    @staticmethod
    def _momentum_from_series(
        series: Dict[str, list],
        skip_date: date,
    ) -> Dict[str, Decimal]:
        """Turn per-ticker (date, close) lists into percentage returns."""
        momentum: Dict[str, Decimal] = {}
        for ticker, points in series.items():
            if len(points) < 2:
                continue
            ordered = sorted(points, key=lambda point: point[0])
            start_price = ordered[0][1]
            # Latest observation that is not inside the skipped recent period.
            end_price = next(
                (close for day, close in reversed(ordered) if day <= skip_date),
                None,
            )
            # A missing or zero end price is treated as unusable data.
            if not end_price or start_price <= 0:
                continue
            change = (end_price - start_price) / start_price * 100
            momentum[ticker] = Decimal(str(change))
        return momentum

    @staticmethod
    def _inverse_if_positive(value) -> float:
        """Return 1/value for a positive value, otherwise 0."""
        if not value:
            return 0.0
        number = float(value)
        return 1 / number if number > 0 else 0.0

    @staticmethod
    def _composite_zscore(rows: List[dict], columns: List[str]) -> Dict[str, Decimal]:
        """Average the z-scores of ``columns`` for each row.

        Each row is a dict holding a 'ticker' key plus one numeric entry per
        column. A column whose sample standard deviation is not positive
        (constant values, or a single row where it is NaN) contributes 0.

        Returns:
            Mapping of ticker to the equal-weighted mean z-score.
        """
        if not rows:
            return {}
        frame = pd.DataFrame(rows)
        total = pd.Series(0.0, index=frame.index)
        for column in columns:
            values = frame[column].astype(float)
            spread = values.std()
            if spread > 0:
                total = total + (values - values.mean()) / spread
        composite = total / len(columns)
        return {
            ticker: Decimal(str(float(score)))
            for ticker, score in zip(frame['ticker'], composite)
        }

    def calculate_value_scores(
        self,
        valuations: Dict[str, "Valuation"],
    ) -> Dict[str, Decimal]:
        """Calculate value factor scores (higher is cheaper/better).

        Uses the inverses of PER, PBR, PSR and PCR plus the dividend yield,
        z-scored across the supplied valuations and averaged with equal
        weight. Missing or non-positive ratios count as 0 before z-scoring.

        Args:
            valuations: Mapping of ticker to an object exposing per, pbr,
                psr, pcr and dividend_yield attributes.

        Returns:
            Mapping of ticker to composite value score; empty when no
            valuations are given.
        """
        rows = []
        for ticker, valuation in valuations.items():
            dividend = valuation.dividend_yield
            rows.append({
                'ticker': ticker,
                # Lower price multiple = higher inverse = better score.
                'per_inv': self._inverse_if_positive(valuation.per),
                'pbr_inv': self._inverse_if_positive(valuation.pbr),
                'psr_inv': self._inverse_if_positive(valuation.psr),
                'pcr_inv': self._inverse_if_positive(valuation.pcr),
                'div_yield': float(dividend) if dividend else 0,
            })
        return self._composite_zscore(
            rows,
            ['per_inv', 'pbr_inv', 'psr_inv', 'pcr_inv', 'div_yield'],
        )

    def _annual_reports(
        self,
        tickers: List[str],
        base_date: Optional[date] = None,
    ) -> Dict[str, List[Dict[str, float]]]:
        """Load annual financials grouped per ticker and reporting date.

        Args:
            tickers: Tickers to load.
            base_date: When given, only reports dated on or before it.

        Returns:
            Mapping of ticker to a list of {account: value} dicts, one per
            reporting date, ordered newest first. Empty or NULL values are
            stored as 0.
        """
        if not tickers:
            return {}
        query = (
            self.db.query(Financial)
            .filter(Financial.ticker.in_(tickers))
            .filter(Financial.report_type == 'annual')
        )
        if base_date is not None:
            query = query.filter(Financial.base_date <= base_date)

        by_ticker: Dict[str, dict] = {}
        for item in query.all():
            accounts = by_ticker.setdefault(item.ticker, {}).setdefault(
                item.base_date, {}
            )
            accounts[item.account] = float(item.value) if item.value else 0
        return {
            ticker: [periods[day] for day in sorted(periods, reverse=True)]
            for ticker, periods in by_ticker.items()
        }

    def calculate_quality_scores(
        self,
        tickers: List[str],
        base_date: Optional[date] = None,
    ) -> Dict[str, Decimal]:
        """Calculate quality factor scores based on ROE, GP/A, etc.

        Ratios are computed from the most recent annual report of each
        ticker (on or before ``base_date`` when given), so figures from
        different fiscal years are never mixed. The four ratios are z-scored
        across tickers and averaged with equal weight.

        Returns:
            Mapping of ticker to composite quality score. Tickers without
            annual financial rows are omitted.
        """
        rows = []
        for ticker, periods in self._annual_reports(tickers, base_date).items():
            latest = periods[0]
            total_equity = latest.get('total_equity', 0)
            total_assets = latest.get('total_assets', 0)
            total_liabilities = latest.get('total_liabilities', 0)
            net_income = latest.get('net_income', 0)
            gross_profit = latest.get('gross_profit', 0)
            operating_cf = latest.get('operating_cash_flow', 0)

            has_assets = total_assets > 0
            has_equity = total_equity > 0
            # NOTE(review): a ticker with zero liabilities scores 0 here, the
            # same as one with no equity - confirm this is intended.
            if has_equity and total_liabilities > 0:
                debt_ratio_inv = 1 / (total_liabilities / total_equity)
            else:
                debt_ratio_inv = 0
            rows.append({
                'ticker': ticker,
                'roe': net_income / total_equity if has_equity else 0,
                'gpa': gross_profit / total_assets if has_assets else 0,
                'cfo_a': operating_cf / total_assets if has_assets else 0,
                'debt_ratio_inv': debt_ratio_inv,
            })
        return self._composite_zscore(
            rows,
            ['roe', 'gpa', 'cfo_a', 'debt_ratio_inv'],
        )

    def calculate_fscore(
        self,
        tickers: List[str],
    ) -> Dict[str, int]:
        """Calculate Piotroski F-Score (0-9).

        Compares the two most recent annual reports of each ticker.

        Returns:
            Mapping of every requested ticker to its score. Tickers with
            fewer than two annual reports get 0.
        """
        reports = self._annual_reports(tickers)
        fscores: Dict[str, int] = {}
        for ticker in tickers:
            periods = reports.get(ticker, [])
            if len(periods) < 2:
                fscores[ticker] = 0
                continue
            fscores[ticker] = self._piotroski_score(periods[0], periods[1])
        return fscores

    @staticmethod
    def _piotroski_score(curr: Dict[str, float], prev: Dict[str, float]) -> int:
        """Score one ticker from its current and previous account dicts.

        Missing accounts default to 0. A criterion whose denominator is not
        positive is skipped, so absent data never earns a point through a
        placeholder denominator.
        """
        ta = curr.get('total_assets', 0)
        ni = curr.get('net_income', 0)
        cfo = curr.get('operating_cash_flow', 0)
        tl = curr.get('total_liabilities', 0)
        ca = curr.get('current_assets', 0)
        cl = curr.get('current_liabilities', 0)
        te = curr.get('total_equity', 0)
        rev = curr.get('revenue', 0)
        gp = curr.get('gross_profit', 0)

        prev_ta = prev.get('total_assets', 0)
        prev_ni = prev.get('net_income', 0)
        prev_tl = prev.get('total_liabilities', 0)
        prev_ca = prev.get('current_assets', 0)
        prev_cl = prev.get('current_liabilities', 0)
        prev_te = prev.get('total_equity', 0)
        prev_rev = prev.get('revenue', 0)
        prev_gp = prev.get('gross_profit', 0)

        both_assets = ta > 0 and prev_ta > 0
        both_current_liabilities = cl > 0 and prev_cl > 0
        both_revenue = rev > 0 and prev_rev > 0

        criteria = [
            # Profitability (4 points)
            # 1. ROA > 0
            ta > 0 and ni / ta > 0,
            # 2. CFO > 0
            cfo > 0,
            # 3. ROA increased
            both_assets and ni / ta > prev_ni / prev_ta,
            # 4. CFO > Net Income (Accrual)
            cfo > ni,
            # Leverage (3 points)
            # 5. Leverage decreased
            both_assets and tl / ta < prev_tl / prev_ta,
            # 6. Liquidity increased
            both_current_liabilities and ca / cl > prev_ca / prev_cl,
            # 7. No new equity issued (simplified: equity increase <= net income)
            te - prev_te <= ni,
            # Operating Efficiency (2 points)
            # 8. Gross margin improved
            both_revenue and gp / rev > prev_gp / prev_rev,
            # 9. Asset turnover improved
            both_assets and rev / ta > prev_rev / prev_ta,
        ]
        return sum(1 for passed in criteria if passed)