feat: add factor calculator service
Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
parent
d671befb90
commit
9eebc73390
351
backend/app/services/factor_calculator.py
Normal file
351
backend/app/services/factor_calculator.py
Normal file
@ -0,0 +1,351 @@
|
||||
"""
|
||||
Factor calculation service for quant strategies.
|
||||
"""
|
||||
from decimal import Decimal
|
||||
from typing import Dict, List, Optional
|
||||
from datetime import date, timedelta
|
||||
|
||||
import pandas as pd
|
||||
from sqlalchemy.orm import Session
|
||||
from sqlalchemy import func
|
||||
|
||||
from app.models.stock import Stock, Valuation, Price, Financial, Sector
|
||||
|
||||
|
||||
class FactorCalculator:
    """Calculates factor scores for stocks.

    All database access goes through the SQLAlchemy session passed to the
    constructor. Score dictionaries are keyed by ticker.
    """

    # Market-cap limits are given in units of 100 million won (eok-won),
    # while Stock.market_cap is stored in won.
    _WON_PER_EOK = 100_000_000

    def __init__(self, db: "Session"):
        self.db = db

    def get_universe(
        self,
        markets: Optional[List[str]] = None,
        min_market_cap: Optional[int] = None,
        max_market_cap: Optional[int] = None,
        exclude_stock_types: Optional[List[str]] = None,
        exclude_sectors: Optional[List[str]] = None,
    ) -> List["Stock"]:
        """Get the filtered stock universe.

        Args:
            markets: Keep only stocks whose market is in this list.
            min_market_cap: Lower market-cap bound in units of 100 million won.
            max_market_cap: Upper market-cap bound in units of 100 million won.
            exclude_stock_types: Stock types to drop.
            exclude_sectors: Sector names to drop.

        Returns:
            The matching Stock rows. A filter whose argument is falsy
            (None, 0 or an empty list) is not applied.
        """
        query = self.db.query(Stock)

        if markets:
            query = query.filter(Stock.market.in_(markets))
        if min_market_cap:
            query = query.filter(
                Stock.market_cap >= min_market_cap * self._WON_PER_EOK
            )
        if max_market_cap:
            query = query.filter(
                Stock.market_cap <= max_market_cap * self._WON_PER_EOK
            )
        if exclude_stock_types:
            query = query.filter(~Stock.stock_type.in_(exclude_stock_types))

        stocks = query.all()
        if not exclude_sectors:
            return stocks

        # Sector membership lives in a separate table, so excluded tickers are
        # resolved with a second query and removed in Python.
        excluded = {
            row[0]
            for row in self.db.query(Sector.ticker)
            .filter(Sector.sector_name.in_(exclude_sectors))
            .all()
        }
        return [stock for stock in stocks if stock.ticker not in excluded]

    def get_valuations(
        self,
        tickers: List[str],
        base_date: Optional[date] = None,
    ) -> Dict[str, "Valuation"]:
        """Get the latest valuation row for each ticker.

        Args:
            tickers: Tickers to look up.
            base_date: If given, only rows dated on or before it are considered.

        Returns:
            Mapping of ticker to its most recent Valuation row. Tickers with
            no matching row are absent.
        """
        query = self.db.query(Valuation).filter(Valuation.ticker.in_(tickers))
        if base_date:
            query = query.filter(Valuation.base_date <= base_date)

        # Always sort newest-first so that the first row seen per ticker is
        # really the latest one, with or without a base_date cut-off.
        latest: Dict[str, "Valuation"] = {}
        for row in query.order_by(Valuation.base_date.desc()).all():
            latest.setdefault(row.ticker, row)
        return latest

    def get_sectors(self, tickers: List[str]) -> Dict[str, str]:
        """Get sector names for tickers.

        Returns:
            Mapping of ticker to sector name for every ticker that has a
            Sector row.
        """
        rows = self.db.query(Sector).filter(Sector.ticker.in_(tickers)).all()
        return {row.ticker: row.sector_name for row in rows}

    def calculate_momentum(
        self,
        tickers: List[str],
        base_date: Optional[date] = None,
        months: int = 12,
        skip_recent: int = 1,
    ) -> Dict[str, Decimal]:
        """Calculate price momentum as a percentage return.

        The return is measured from the earliest close inside the window
        ``[base_date - months * 30 days, base_date]`` to the latest close on
        or before ``base_date - skip_recent * 30 days``.

        Args:
            tickers: Tickers to evaluate.
            base_date: End of the look-back window; defaults to today.
            months: Look-back length, using 30-day months.
            skip_recent: Number of most recent 30-day months to ignore.

        Returns:
            Mapping of ticker to percentage return. A ticker is omitted when
            it has fewer than two usable closes, a non-positive start price,
            or no later close on or before the skip date.
        """
        if base_date is None:
            base_date = date.today()

        start_date = base_date - timedelta(days=months * 30)
        skip_date = base_date - timedelta(days=skip_recent * 30)

        rows = (
            self.db.query(Price)
            .filter(Price.ticker.in_(tickers))
            .filter(Price.date >= start_date)
            .filter(Price.date <= base_date)
            .all()
        )

        # Group (date, close) observations by ticker. Rows without a close
        # cannot contribute to a return and would break float().
        series = {}
        for row in rows:
            if row.close is None:
                continue
            series.setdefault(row.ticker, []).append((row.date, float(row.close)))

        momentum: Dict[str, Decimal] = {}
        for ticker, observations in series.items():
            if len(observations) < 2:
                continue

            observations.sort(key=lambda obs: obs[0])
            start_price = observations[0][1]
            if start_price <= 0:
                continue

            # Latest close on or before the skip date. The first observation is
            # excluded so the start price is never compared with itself, which
            # would report a meaningless 0% return.
            end_price = next(
                (
                    price
                    for day, price in reversed(observations[1:])
                    if day <= skip_date
                ),
                None,
            )
            if not end_price:
                continue

            change_pct = (end_price - start_price) / start_price * 100
            momentum[ticker] = Decimal(str(change_pct))

        return momentum

    def calculate_value_scores(
        self,
        valuations: Dict[str, "Valuation"],
    ) -> Dict[str, Decimal]:
        """Calculate value factor scores (higher is cheaper/better).

        Each stock is scored on the inverses of PER, PBR, PSR and PCR plus its
        dividend yield. Every metric is z-scored across the given stocks and
        the five z-scores are averaged with equal weight.

        Args:
            valuations: Mapping of ticker to a valuation object exposing
                ``per``, ``pbr``, ``psr``, ``pcr`` and ``dividend_yield``.

        Returns:
            Mapping of ticker to composite value score; empty if no valuations
            were given. A missing or non-positive ratio counts as 0.
        """
        records = [
            {
                'ticker': ticker,
                'per_inv': self._inverse_or_zero(v.per),
                'pbr_inv': self._inverse_or_zero(v.pbr),
                'psr_inv': self._inverse_or_zero(v.psr),
                'pcr_inv': self._inverse_or_zero(v.pcr),
                'div_yield': float(v.dividend_yield) if v.dividend_yield else 0.0,
            }
            for ticker, v in valuations.items()
        ]
        return self._composite_zscore(
            records,
            ['per_inv', 'pbr_inv', 'psr_inv', 'pcr_inv', 'div_yield'],
        )

    def calculate_quality_scores(
        self,
        tickers: List[str],
        base_date: Optional[date] = None,
    ) -> Dict[str, Decimal]:
        """Calculate quality factor scores.

        Uses ROE, gross profit / assets, operating cash flow / assets and the
        inverse debt-to-equity ratio from each ticker's latest annual report.
        Every metric is z-scored across tickers and the four z-scores are
        averaged with equal weight.

        Args:
            tickers: Tickers to evaluate.
            base_date: If given, only reports dated on or before it are used.

        Returns:
            Mapping of ticker to composite quality score for every ticker
            with at least one annual financial row. A ratio whose denominator
            is missing or non-positive counts as 0.
        """
        query = (
            self.db.query(Financial)
            .filter(Financial.ticker.in_(tickers))
            .filter(Financial.report_type == 'annual')
        )
        if base_date is not None:
            query = query.filter(Financial.base_date <= base_date)

        reports = self._group_reports(query.all())

        records = []
        for ticker, by_date in reports.items():
            # Use only the most recent report so figures from different
            # fiscal years are never mixed in one ratio.
            # NOTE(review): assumes Financial.base_date is never NULL - confirm.
            fin = by_date[max(by_date)]

            total_equity = fin.get('total_equity', 0)
            total_assets = fin.get('total_assets', 0)
            total_liabilities = fin.get('total_liabilities', 0)
            net_income = fin.get('net_income', 0)
            gross_profit = fin.get('gross_profit', 0)
            operating_cf = fin.get('operating_cash_flow', 0)

            has_assets = total_assets > 0
            has_equity = total_equity > 0

            records.append({
                'ticker': ticker,
                'roe': net_income / total_equity if has_equity else 0,
                'gpa': gross_profit / total_assets if has_assets else 0,
                'cfo_a': operating_cf / total_assets if has_assets else 0,
                'debt_ratio_inv': (
                    1 / (total_liabilities / total_equity)
                    if has_equity and total_liabilities > 0
                    else 0
                ),
            })

        return self._composite_zscore(
            records,
            ['roe', 'gpa', 'cfo_a', 'debt_ratio_inv'],
        )

    def calculate_fscore(
        self,
        tickers: List[str],
        base_date: Optional[date] = None,
    ) -> Dict[str, int]:
        """Calculate the Piotroski F-Score (0-9) for each ticker.

        The two most recent annual reports of a ticker are compared.

        Args:
            tickers: Tickers to evaluate.
            base_date: If given, only reports dated on or before it are used.

        Returns:
            Mapping with one entry per requested ticker. Tickers with fewer
            than two annual reports score 0.
        """
        query = (
            self.db.query(Financial)
            .filter(Financial.ticker.in_(tickers))
            .filter(Financial.report_type == 'annual')
        )
        if base_date is not None:
            query = query.filter(Financial.base_date <= base_date)

        # Group once, then look each ticker up directly instead of rescanning
        # every report for every ticker.
        reports = self._group_reports(query.all())

        fscores: Dict[str, int] = {}
        for ticker in tickers:
            by_date = reports.get(ticker, {})
            if len(by_date) < 2:
                fscores[ticker] = 0
                continue

            latest, previous = sorted(by_date, reverse=True)[:2]
            fscores[ticker] = self._piotroski_score(
                by_date[latest], by_date[previous]
            )

        return fscores

    @staticmethod
    def _inverse_or_zero(value) -> float:
        """Return 1/value for a positive value, otherwise 0.0."""
        if not value:
            return 0.0
        number = float(value)
        return 1 / number if number > 0 else 0.0

    @staticmethod
    def _group_reports(rows) -> dict:
        """Group financial rows as ticker -> report date -> account -> value."""
        grouped = {}
        for row in rows:
            accounts = grouped.setdefault(row.ticker, {}).setdefault(
                row.base_date, {}
            )
            accounts[row.account] = float(row.value) if row.value else 0
        return grouped

    @staticmethod
    def _composite_zscore(
        records: List[dict],
        metrics: List[str],
    ) -> Dict[str, Decimal]:
        """Average the cross-sectional z-scores of ``metrics`` per ticker.

        A metric whose sample standard deviation is zero or undefined (for
        example with a single record) contributes 0 for every ticker.
        """
        if not records:
            return {}

        df = pd.DataFrame(records)
        total = pd.Series(0.0, index=df.index)
        for metric in metrics:
            spread = df[metric].std()
            if spread > 0:
                total = total + (df[metric] - df[metric].mean()) / spread

        composite = total / len(metrics)
        return {
            ticker: Decimal(str(score))
            for ticker, score in zip(df['ticker'], composite)
        }

    @staticmethod
    def _piotroski_score(curr: Dict[str, float], prev: Dict[str, float]) -> int:
        """Count the Piotroski signals satisfied by two consecutive reports.

        Missing accounts are read as 0, so a ratio signal whose denominator
        is absent is skipped instead of being computed against a made-up value.
        """
        ta = curr.get('total_assets', 0)
        prev_ta = prev.get('total_assets', 0)
        ni = curr.get('net_income', 0)
        prev_ni = prev.get('net_income', 0)
        cfo = curr.get('operating_cash_flow', 0)
        tl = curr.get('total_liabilities', 0)
        prev_tl = prev.get('total_liabilities', 0)
        ca = curr.get('current_assets', 0)
        prev_ca = prev.get('current_assets', 0)
        cl = curr.get('current_liabilities', 0)
        prev_cl = prev.get('current_liabilities', 0)
        te = curr.get('total_equity', 0)
        prev_te = prev.get('total_equity', 0)
        rev = curr.get('revenue', 0)
        prev_rev = prev.get('revenue', 0)
        gp = curr.get('gross_profit', 0)
        prev_gp = prev.get('gross_profit', 0)

        both_assets = ta > 0 and prev_ta > 0

        signals = [
            # Profitability (4 points)
            ta > 0 and ni / ta > 0,                          # 1. ROA > 0
            cfo > 0,                                         # 2. CFO > 0
            both_assets and ni / ta > prev_ni / prev_ta,     # 3. ROA increased
            cfo > ni,                                        # 4. CFO > net income
            # Leverage (3 points)
            both_assets and tl / ta < prev_tl / prev_ta,     # 5. Leverage decreased
            cl > 0 and prev_cl > 0 and ca / cl > prev_ca / prev_cl,  # 6. Liquidity up
            te - prev_te <= ni,      # 7. No new equity (equity increase <= net income)
            # Operating efficiency (2 points)
            rev > 0 and prev_rev > 0 and gp / rev > prev_gp / prev_rev,  # 8. Margin up
            both_assets and rev / ta > prev_rev / prev_ta,   # 9. Asset turnover up
        ]
        return sum(1 for signal in signals if signal)
|
||||
Loading…
x
Reference in New Issue
Block a user