352 lines
11 KiB
Python
352 lines
11 KiB
Python
"""
Factor calculation service for quant strategies.
"""
|
|
from decimal import Decimal
|
|
from typing import Dict, List, Optional
|
|
from datetime import date, timedelta
|
|
|
|
import pandas as pd
|
|
from sqlalchemy.orm import Session
|
|
from sqlalchemy import func
|
|
|
|
from app.models.stock import Stock, Valuation, Price, Financial, Sector
|
|
|
|
|
|
class FactorCalculator:
    """Calculates factor scores for stocks.

    Every query goes through the SQLAlchemy session handed to the
    constructor. The arithmetic lives in static helpers that only look at
    the rows already loaded.
    """

    # Market-cap bounds are given in units of 100 million won, while
    # Stock.market_cap is stored in won.
    _WON_PER_UNIT = 100_000_000

    def __init__(self, db: "Session"):
        """Keep the session used by every query of this calculator."""
        self.db = db

    def get_universe(
        self,
        markets: Optional[List[str]] = None,
        min_market_cap: Optional[int] = None,
        max_market_cap: Optional[int] = None,
        exclude_stock_types: Optional[List[str]] = None,
        exclude_sectors: Optional[List[str]] = None,
    ) -> "List[Stock]":
        """Get the filtered stock universe.

        Args:
            markets: Keep only stocks whose ``market`` is in this list.
            min_market_cap: Lower market-cap bound, in units of 100 million
                won. ``None`` or ``0`` means no lower bound.
            max_market_cap: Upper market-cap bound, same unit. ``None`` or
                ``0`` means no upper bound.
            exclude_stock_types: ``stock_type`` values to drop.
            exclude_sectors: Sector names whose tickers are dropped.

        Returns:
            The ``Stock`` rows that pass every supplied filter.
        """
        query = self.db.query(Stock)

        if markets:
            query = query.filter(Stock.market.in_(markets))
        if min_market_cap:
            query = query.filter(
                Stock.market_cap >= min_market_cap * self._WON_PER_UNIT
            )
        if max_market_cap:
            query = query.filter(
                Stock.market_cap <= max_market_cap * self._WON_PER_UNIT
            )
        if exclude_stock_types:
            query = query.filter(~Stock.stock_type.in_(exclude_stock_types))

        stocks = query.all()

        # Sector membership lives in a separate table, so it is resolved
        # with a second query and applied to the loaded rows.
        if exclude_sectors:
            sector_rows = (
                self.db.query(Sector.ticker)
                .filter(Sector.sector_name.in_(exclude_sectors))
                .all()
            )
            excluded = {row[0] for row in sector_rows}
            stocks = [s for s in stocks if s.ticker not in excluded]

        return stocks

    def get_valuations(
        self,
        tickers: List[str],
        base_date: Optional[date] = None,
    ) -> "Dict[str, Valuation]":
        """Get the latest valuation row for each ticker.

        Args:
            tickers: Tickers to look up.
            base_date: When given, rows dated after it are ignored.

        Returns:
            Mapping of ticker to its most recent ``Valuation``. Tickers
            without any matching row are absent.
        """
        if not tickers:
            return {}

        query = self.db.query(Valuation).filter(Valuation.ticker.in_(tickers))
        if base_date is not None:
            query = query.filter(Valuation.base_date <= base_date)

        # Always sort newest-first: the loop below keeps the first row seen
        # per ticker, which is only "the latest" when the rows are ordered.
        rows = query.order_by(Valuation.base_date.desc()).all()

        latest = {}
        for row in rows:
            latest.setdefault(row.ticker, row)
        return latest

    def get_sectors(self, tickers: List[str]) -> Dict[str, str]:
        """Get the sector name for each ticker that has a ``Sector`` row."""
        if not tickers:
            return {}

        sectors = (
            self.db.query(Sector)
            .filter(Sector.ticker.in_(tickers))
            .all()
        )
        return {s.ticker: s.sector_name for s in sectors}

    def calculate_momentum(
        self,
        tickers: List[str],
        base_date: Optional[date] = None,
        months: int = 12,
        skip_recent: int = 1,
    ) -> Dict[str, Decimal]:
        """Calculate price momentum as a percentage return.

        The return is measured from the earliest close in the look-back
        window to the latest close on or before the skip date.

        Args:
            tickers: Tickers to score.
            base_date: End of the window; defaults to today.
            months: Look-back length, approximated as 30-day months.
            skip_recent: Most recent months to leave out, also 30-day months.

        Returns:
            Mapping of ticker to percentage return. Tickers with fewer than
            two usable prices, no price on or before the skip date, or a
            non-positive start price are omitted.
        """
        if not tickers:
            return {}
        if base_date is None:
            base_date = date.today()

        start_date = base_date - timedelta(days=months * 30)
        skip_date = base_date - timedelta(days=skip_recent * 30)

        prices = (
            self.db.query(Price)
            .filter(Price.ticker.in_(tickers))
            .filter(Price.date >= start_date)
            .filter(Price.date <= base_date)
            .all()
        )

        series = {}
        for p in prices:
            # A missing close cannot take part in a return calculation.
            if p.close is None:
                continue
            series.setdefault(p.ticker, []).append((p.date, float(p.close)))

        momentum = {}
        for ticker, price_list in series.items():
            value = self._momentum_pct(price_list, skip_date)
            if value is not None:
                momentum[ticker] = value
        return momentum

    @staticmethod
    def _momentum_pct(price_list: List[tuple], skip_date: date) -> Optional[Decimal]:
        """Return the percentage change of one ``(date, close)`` series, or None."""
        if len(price_list) < 2:
            return None

        ordered = sorted(price_list, key=lambda item: item[0])
        start_price = ordered[0][1]
        # Latest close that is not inside the skipped recent period.
        end_price = next(
            (close for day, close in reversed(ordered) if day <= skip_date),
            None,
        )

        # A zero end price is treated as missing, like an absent one.
        if not end_price or start_price <= 0:
            return None
        return Decimal(str((end_price - start_price) / start_price * 100))

    @staticmethod
    def _inverse_if_positive(value) -> float:
        """Return ``1 / value`` for a positive value, otherwise 0."""
        if not value:
            return 0.0
        number = float(value)
        return 1 / number if number > 0 else 0.0

    @staticmethod
    def _mean_zscore(df: pd.DataFrame, columns: List[str]) -> pd.Series:
        """Return the equal-weight mean of the z-scores of ``columns``."""
        total = pd.Series(0.0, index=df.index)
        for col in columns:
            std = df[col].std()
            # std is NaN for a single row and 0 for a constant column; such
            # a column carries no ranking information and contributes 0.
            if std > 0:
                total = total + (df[col] - df[col].mean()) / std
        return total / len(columns)

    @staticmethod
    def _group_financials(financials) -> Dict[str, Dict]:
        """Group financial rows as ``{ticker: {base_date: {account: value}}}``.

        Empty or zero values are stored as 0.
        """
        grouped = {}
        for f in financials:
            accounts = grouped.setdefault(f.ticker, {}).setdefault(f.base_date, {})
            accounts[f.account] = float(f.value) if f.value else 0
        return grouped

    def calculate_value_scores(
        self,
        valuations: "Dict[str, Valuation]",
    ) -> Dict[str, Decimal]:
        """Calculate value factor scores (higher is cheaper/better).

        Each ticker gets the mean z-score of 1/PER, 1/PBR, 1/PSR, 1/PCR and
        dividend yield. A missing or non-positive ratio counts as 0 before
        normalization.

        Args:
            valuations: Mapping of ticker to its valuation row.

        Returns:
            Mapping of ticker to composite score; empty for empty input.
        """
        data = [
            {
                'ticker': ticker,
                'per_inv': self._inverse_if_positive(v.per),
                'pbr_inv': self._inverse_if_positive(v.pbr),
                'psr_inv': self._inverse_if_positive(v.psr),
                'pcr_inv': self._inverse_if_positive(v.pcr),
                'div_yield': float(v.dividend_yield) if v.dividend_yield else 0,
            }
            for ticker, v in valuations.items()
        ]
        if not data:
            return {}

        df = pd.DataFrame(data)
        composite = self._mean_zscore(
            df, ['per_inv', 'pbr_inv', 'psr_inv', 'pcr_inv', 'div_yield']
        )
        return {
            ticker: Decimal(str(score))
            for ticker, score in zip(df['ticker'].tolist(), composite.tolist())
        }

    def calculate_quality_scores(
        self,
        tickers: List[str],
        base_date: Optional[date] = None,
    ) -> Dict[str, Decimal]:
        """Calculate quality factor scores from the latest annual report.

        Each ticker gets the mean z-score of ROE, gross profit / assets,
        operating cash flow / assets and equity / liabilities.

        Args:
            tickers: Tickers to score.
            base_date: When given, reports dated after it are ignored.

        Returns:
            Mapping of ticker to composite score for every ticker that has
            at least one annual report.
        """
        if not tickers:
            return {}

        query = (
            self.db.query(Financial)
            .filter(Financial.ticker.in_(tickers))
            .filter(Financial.report_type == 'annual')
        )
        if base_date is not None:
            query = query.filter(Financial.base_date <= base_date)

        data = []
        for ticker, by_date in self._group_financials(query.all()).items():
            # Use one reporting period only; merging accounts across years
            # would combine numbers that do not belong together.
            fin = by_date[max(by_date)]

            total_equity = fin.get('total_equity', 0)
            total_assets = fin.get('total_assets', 0)
            net_income = fin.get('net_income', 0)
            gross_profit = fin.get('gross_profit', 0)
            operating_cf = fin.get('operating_cash_flow', 0)
            total_liabilities = fin.get('total_liabilities', 0)

            has_assets = total_assets > 0
            has_equity = total_equity > 0
            data.append({
                'ticker': ticker,
                'roe': net_income / total_equity if has_equity else 0,
                'gpa': gross_profit / total_assets if has_assets else 0,
                'cfo_a': operating_cf / total_assets if has_assets else 0,
                'debt_ratio_inv': (
                    1 / (total_liabilities / total_equity)
                    if has_equity and total_liabilities > 0
                    else 0
                ),
            })

        if not data:
            return {}

        df = pd.DataFrame(data)
        composite = self._mean_zscore(df, ['roe', 'gpa', 'cfo_a', 'debt_ratio_inv'])
        return {
            ticker: Decimal(str(score))
            for ticker, score in zip(df['ticker'].tolist(), composite.tolist())
        }

    def calculate_fscore(
        self,
        tickers: List[str],
        base_date: Optional[date] = None,
    ) -> Dict[str, int]:
        """Calculate Piotroski F-Score (0-9).

        Args:
            tickers: Tickers to score.
            base_date: When given, reports dated after it are ignored.

        Returns:
            Mapping of every requested ticker to its score. A ticker with
            fewer than two annual periods scores 0.
        """
        if not tickers:
            return {}

        query = (
            self.db.query(Financial)
            .filter(Financial.ticker.in_(tickers))
            .filter(Financial.report_type == 'annual')
        )
        if base_date is not None:
            query = query.filter(Financial.base_date <= base_date)

        # Grouped once per ticker so each lookup below is a dict access.
        grouped = self._group_financials(query.all())

        fscores = {}
        for ticker in tickers:
            by_date = grouped.get(ticker, {})
            if len(by_date) < 2:
                fscores[ticker] = 0
                continue

            latest, previous = sorted(by_date, reverse=True)[:2]
            fscores[ticker] = self._piotroski_score(by_date[latest], by_date[previous])
        return fscores

    @staticmethod
    def _piotroski_score(curr: Dict[str, float], prev: Dict[str, float]) -> int:
        """Count how many of the nine F-Score tests ``curr`` passes against ``prev``.

        Missing accounts default to 0, so any test whose denominator is
        unavailable is skipped instead of being scored on made-up data.
        """
        ta = curr.get('total_assets', 0)
        ni = curr.get('net_income', 0)
        cfo = curr.get('operating_cash_flow', 0)
        tl = curr.get('total_liabilities', 0)
        ca = curr.get('current_assets', 0)
        cl = curr.get('current_liabilities', 0)
        te = curr.get('total_equity', 0)
        rev = curr.get('revenue', 0)
        gp = curr.get('gross_profit', 0)

        prev_ta = prev.get('total_assets', 0)
        prev_ni = prev.get('net_income', 0)
        prev_tl = prev.get('total_liabilities', 0)
        prev_ca = prev.get('current_assets', 0)
        prev_cl = prev.get('current_liabilities', 0)
        prev_te = prev.get('total_equity', 0)
        prev_rev = prev.get('revenue', 0)
        prev_gp = prev.get('gross_profit', 0)

        has_assets = ta > 0 and prev_ta > 0

        # Each entry short-circuits on its guard, so no division by zero.
        checks = [
            # Profitability (4 points)
            ta > 0 and ni / ta > 0,                      # 1. ROA > 0
            cfo > 0,                                     # 2. CFO > 0
            has_assets and ni / ta > prev_ni / prev_ta,  # 3. ROA increased
            cfo > ni,                                    # 4. CFO > net income
            # Leverage / liquidity / dilution (3 points)
            has_assets and tl / ta < prev_tl / prev_ta,  # 5. liabilities/assets fell
            cl > 0 and prev_cl > 0 and ca / cl > prev_ca / prev_cl,  # 6. current ratio rose
            te - prev_te <= ni,                          # 7. equity growth <= net income
            # Operating efficiency (2 points)
            rev > 0 and prev_rev > 0 and gp / rev > prev_gp / prev_rev,  # 8. gross margin rose
            has_assets and rev / ta > prev_rev / prev_ta,  # 9. asset turnover rose
        ]
        return sum(1 for passed in checks if passed)
|