feat: add FinancialCollector for FnGuide financial statement scraping
All checks were successful
Deploy to Production / deploy (push) Successful in 1m8s

Port make-quant-py's FnGuide scraping logic into galaxy-po's
BaseCollector pattern. Collects annual and quarterly financial
statements (revenue, net income, total assets, etc.) and maps
Korean account names to English keys for FactorCalculator.
Scheduled weekly on Monday 19:00 KST since data updates quarterly.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
zephyrdark 2026-02-18 22:38:05 +09:00
parent 653fa08fa4
commit 5422383fd8
8 changed files with 1275 additions and 1 deletions

View File

@ -5,6 +5,7 @@ from app.services.collectors.price_collector import PriceCollector
from app.services.collectors.valuation_collector import ValuationCollector
from app.services.collectors.etf_collector import ETFCollector
from app.services.collectors.etf_price_collector import ETFPriceCollector
from app.services.collectors.financial_collector import FinancialCollector
__all__ = [
"BaseCollector",
@ -14,4 +15,5 @@ __all__ = [
"ValuationCollector",
"ETFCollector",
"ETFPriceCollector",
"FinancialCollector",
]

View File

@ -0,0 +1,229 @@
"""
Financial statement collector using FnGuide scraping.
Ported from make-quant-py/src/data/financial.py and adapted
to galaxy-po's BaseCollector pattern with PostgreSQL upsert.
"""
import logging
import re
import time
from datetime import date
from io import StringIO
import pandas as pd
import requests
from bs4 import BeautifulSoup
from sqlalchemy.orm import Session
from sqlalchemy.dialects.postgresql import insert
from app.services.collectors.base import BaseCollector
from app.models.stock import Financial, Stock, StockType
logger = logging.getLogger(__name__)
FNGUIDE_URL = "https://comp.fnguide.com/SVO2/ASP/SVD_Finance.asp"
SLEEP_SECONDS = 2
class FinancialCollector(BaseCollector):
"""Collects financial statements from FnGuide."""
ACCOUNT_MAP = {
"매출액": "revenue",
"매출총이익": "gross_profit",
"영업이익": "operating_income",
"당기순이익": "net_income",
"자산총계": "total_assets",
"부채총계": "total_liabilities",
"자본총계": "total_equity",
"유동자산": "current_assets",
"유동부채": "current_liabilities",
"영업활동으로인한현금흐름": "operating_cash_flow",
}
def _get_tickers(self) -> list[str]:
"""Get list of common stock tickers from DB."""
stocks = (
self.db.query(Stock.ticker)
.filter(Stock.stock_type == StockType.COMMON.value)
.all()
)
return [s.ticker for s in stocks]
def _clean_financial_data(
self, df: pd.DataFrame, ticker: str, report_type: str
) -> list[dict]:
"""
Clean a raw FnGuide DataFrame and return list of record dicts.
Steps:
1. Rename first column to '계정'
2. Strip FnGuide UI text from account names
3. Drop rows where all value columns are NaN
4. Deduplicate by account name
5. Melt widelong
6. Map Korean account names to English
7. Drop unmapped accounts and NaN values
"""
df = df.copy()
# Ensure first column is named '계정'
if df.columns[0] != "계정":
df = df.rename(columns={df.columns[0]: "계정"})
# Strip FnGuide suffix
df["계정"] = df["계정"].str.replace("계산에 참여한 계정 펼치기", "", regex=False)
df["계정"] = df["계정"].str.strip()
# Drop rows where all non-계정 columns are NaN
value_cols = [c for c in df.columns if c != "계정"]
df = df[~df[value_cols].isna().all(axis=1)]
# Deduplicate accounts (keep first occurrence)
df = df.drop_duplicates(subset=["계정"], keep="first")
# Melt wide → long
df = pd.melt(df, id_vars="계정", var_name="date_str", value_name="value")
# Drop NaN values
df = df[df["value"].notna()]
# Parse dates
df["base_date"] = (
pd.to_datetime(df["date_str"], format="%Y/%m")
+ pd.tseries.offsets.MonthEnd()
).dt.date
# Map account names
df["account"] = df["계정"].map(self.ACCOUNT_MAP)
# Drop unmapped accounts
df = df[df["account"].notna()]
records = []
for _, row in df.iterrows():
try:
val = float(row["value"])
except (ValueError, TypeError):
continue
records.append({
"ticker": ticker,
"base_date": row["base_date"],
"report_type": report_type,
"account": row["account"],
"value": val,
})
return records
def _fetch_financial_data(self, ticker: str) -> list[dict]:
"""
Scrape FnGuide for one ticker's financial statements.
Returns list of record dicts ready for DB insertion.
FnGuide returns 6 HTML tables:
[0] annual income, [1] quarterly income
[2] annual balance, [3] quarterly balance
[4] annual cashflow, [5] quarterly cashflow
"""
url = f"{FNGUIDE_URL}?pGB=1&gicode=A{ticker}"
response = requests.get(url, timeout=self.REQUEST_TIMEOUT)
data = pd.read_html(StringIO(response.text), displayed_only=False)
if len(data) < 6:
logger.warning(f"{ticker}: expected 6 tables, got {len(data)}, skipping")
return []
# Annual data: concat income(0), balance(2), cashflow(4)
# Remove '전년동기' columns from income statement
annual_income = data[0].iloc[:, ~data[0].columns.str.contains("전년동기")]
data_fs_y = pd.concat([annual_income, data[2], data[4]])
data_fs_y = data_fs_y.rename(columns={data_fs_y.columns[0]: "계정"})
# Filter by fiscal year end month
soup = BeautifulSoup(response.content, "html.parser")
fiscal_elems = soup.select("div.corp_group1 > h2")
if len(fiscal_elems) >= 2:
fiscal_months = re.findall(r"[0-9]+", fiscal_elems[1].text)
data_fs_y = data_fs_y.loc[
:,
(data_fs_y.columns == "계정")
| data_fs_y.columns.str[-2:].isin(fiscal_months),
]
annual_records = self._clean_financial_data(data_fs_y, ticker, "annual")
# Quarterly data: concat income(1), balance(3), cashflow(5)
quarterly_income = data[1].iloc[:, ~data[1].columns.str.contains("전년동기")]
data_fs_q = pd.concat([quarterly_income, data[3], data[5]])
data_fs_q = data_fs_q.rename(columns={data_fs_q.columns[0]: "계정"})
quarterly_records = self._clean_financial_data(data_fs_q, ticker, "quarterly")
return annual_records + quarterly_records
def _upsert_records(self, records: list[dict]) -> None:
"""Upsert financial records. Uses PostgreSQL ON CONFLICT when available,
falls back to merge for other dialects (e.g., SQLite in tests)."""
if not records:
return
dialect = self.db.get_bind().dialect.name
if dialect == "postgresql":
stmt = insert(Financial).values(records)
stmt = stmt.on_conflict_do_update(
index_elements=["ticker", "base_date", "report_type", "account"],
set_={"value": stmt.excluded.value},
)
self.db.execute(stmt)
self.db.commit()
elif dialect == "sqlite":
# SQLite-compatible: merge (delete + insert)
for record in records:
existing = (
self.db.query(Financial)
.filter_by(
ticker=record["ticker"],
base_date=record["base_date"],
report_type=record["report_type"],
account=record["account"],
)
.first()
)
if existing:
existing.value = record["value"]
else:
self.db.add(Financial(**record))
self.db.commit()
else:
raise ValueError(f"Unsupported database dialect: {dialect}")
def collect(self) -> int:
"""Collect financial statements for all common stocks."""
tickers = self._get_tickers()
if not tickers:
logger.warning("No tickers found in stocks table")
return 0
total_records = 0
error_tickers = []
for i, ticker in enumerate(tickers):
try:
records = self._fetch_financial_data(ticker)
self._upsert_records(records)
total_records += len(records)
if (i + 1) % 100 == 0:
logger.info(f"Progress: {i + 1}/{len(tickers)} tickers processed")
except Exception as e:
logger.warning(f"{ticker}: failed to collect financial data: {e}")
error_tickers.append(ticker)
time.sleep(SLEEP_SECONDS)
if error_tickers:
logger.warning(f"Failed tickers ({len(error_tickers)}): {error_tickers[:20]}")
logger.info(f"Collected {total_records} financial records from {len(tickers) - len(error_tickers)} tickers")
return total_records

View File

@ -13,6 +13,7 @@ from app.services.collectors import (
SectorCollector,
PriceCollector,
ValuationCollector,
FinancialCollector,
ETFCollector,
ETFPriceCollector,
)
@ -63,6 +64,28 @@ def run_daily_collection():
return results
def run_financial_collection():
"""
Run financial statement collector.
Financial data updates quarterly, so this runs weekly (Monday)
rather than daily. Separated from daily collection to avoid
unnecessary FnGuide scraping on every business day.
"""
logger.info("Starting weekly financial statement collection")
db = SessionLocal()
try:
collector = FinancialCollector(db)
collector.run()
logger.info("FinancialCollector completed successfully")
return {"FinancialCollector": "success"}
except Exception as e:
logger.error(f"FinancialCollector failed: {e}")
return {"FinancialCollector": f"failed: {e}"}
finally:
db.close()
def _generate_yearly_chunks(start_year: int, end_date: date) -> list[tuple[str, str]]:
"""Generate (start_date, end_date) pairs in YYYYMMDD format, one per year."""
chunks = []

View File

@ -10,7 +10,7 @@ from apscheduler.triggers.cron import CronTrigger
KST = ZoneInfo("Asia/Seoul")
from jobs.snapshot_job import create_daily_snapshots
from jobs.collection_job import run_daily_collection
from jobs.collection_job import run_daily_collection, run_financial_collection
logger = logging.getLogger(__name__)
@ -35,6 +35,22 @@ def configure_jobs():
)
logger.info("Configured daily_collection job at 18:00 KST")
# Weekly financial statement collection at 19:00 Monday
# (after daily collection, financial data updates quarterly)
scheduler.add_job(
run_financial_collection,
trigger=CronTrigger(
hour=19,
minute=0,
day_of_week='mon',
timezone=KST,
),
id='weekly_financial_collection',
name='Collect financial statements from FnGuide',
replace_existing=True,
)
logger.info("Configured weekly_financial_collection job at 19:00 KST Monday")
# Daily snapshot at 18:30 (after data collection completes)
scheduler.add_job(
create_daily_snapshots,

View File

View File

@ -0,0 +1,273 @@
"""
Unit tests for FinancialCollector.
These tests mock HTTP responses to avoid hitting FnGuide in CI.
"""
from datetime import date
from unittest.mock import patch, MagicMock
import pandas as pd
import pytest
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.pool import StaticPool
from app.core.database import Base
from app.models.stock import Financial, Stock, StockType, ReportType
from app.services.collectors.financial_collector import FinancialCollector
@pytest.fixture
def db():
"""In-memory SQLite database for testing."""
engine = create_engine(
"sqlite:///:memory:",
connect_args={"check_same_thread": False},
poolclass=StaticPool,
)
Base.metadata.create_all(bind=engine)
Session = sessionmaker(bind=engine)
session = Session()
yield session
session.close()
Base.metadata.drop_all(bind=engine)
@pytest.fixture
def db_with_stocks(db):
"""Database with sample stock records."""
stocks = [
Stock(
ticker="005930",
name="삼성전자",
market="KOSPI",
close_price=70000,
market_cap=418000000000000,
stock_type=StockType.COMMON.value,
base_date=date(2025, 3, 28),
),
Stock(
ticker="000660",
name="SK하이닉스",
market="KOSPI",
close_price=120000,
market_cap=87000000000000,
stock_type=StockType.COMMON.value,
base_date=date(2025, 3, 28),
),
]
for s in stocks:
db.add(s)
db.commit()
return db
# Sample FnGuide HTML tables (6 tables: annual/quarterly x income/balance/cashflow)
def _make_sample_tables():
"""Build 6 DataFrames mimicking pd.read_html output from FnGuide."""
# Annual income statement (index 0)
annual_income = pd.DataFrame({
"IFRS(연결)": ["매출액", "매출총이익", "영업이익", "당기순이익"],
"2022/12": [302231400, 108747000, 43376600, 55654200],
"2023/12": [258935500, 73024400, 6566500, 15487100],
"2024/12": [300870000, 100000000, 32726500, 34681300],
"전년동기(%)": [None, None, None, None],
})
# Quarterly income statement (index 1)
quarterly_income = pd.DataFrame({
"IFRS(연결)": ["매출액", "매출총이익", "영업이익", "당기순이익"],
"2024/03": [71922800, 22735000, 6609800, 6745200],
"2024/06": [74069300, 25558200, 10443900, 9837900],
})
# Annual balance sheet (index 2)
annual_balance = pd.DataFrame({
"IFRS(연결)": ["자산총계", "부채총계", "자본총계", "유동자산", "유동부채"],
"2022/12": [448424400, 101153300, 347271100, 218439000, 67766200],
"2023/12": [455905400, 107064700, 348840700, 213137900, 73291500],
"2024/12": [480000000, 110000000, 370000000, 220000000, 75000000],
})
# Quarterly balance sheet (index 3)
quarterly_balance = pd.DataFrame({
"IFRS(연결)": ["자산총계", "부채총계", "자본총계", "유동자산", "유동부채"],
"2024/03": [460000000, 108000000, 352000000, 215000000, 74000000],
"2024/06": [465000000, 109000000, 356000000, 218000000, 74500000],
})
# Annual cash flow (index 4)
annual_cashflow = pd.DataFrame({
"IFRS(연결)": ["영업활동으로인한현금흐름"],
"2022/12": [49050400],
"2023/12": [67442000],
"2024/12": [50000000],
})
# Quarterly cash flow (index 5)
quarterly_cashflow = pd.DataFrame({
"IFRS(연결)": ["영업활동으로인한현금흐름"],
"2024/03": [12000000],
"2024/06": [15000000],
})
return [
annual_income, quarterly_income,
annual_balance, quarterly_balance,
annual_cashflow, quarterly_cashflow,
]
def _make_fiscal_html():
"""Build HTML snippet with fiscal year end month (12월)."""
return """
<html><body>
<div class="corp_group1">
<h2>삼성전자</h2>
<h2>12 결산</h2>
</div>
</body></html>
"""
class TestAccountMapping:
"""Test that Korean account names map correctly to English."""
def test_known_accounts_are_mapped(self):
assert FinancialCollector.ACCOUNT_MAP["매출액"] == "revenue"
assert FinancialCollector.ACCOUNT_MAP["당기순이익"] == "net_income"
assert FinancialCollector.ACCOUNT_MAP["자산총계"] == "total_assets"
assert FinancialCollector.ACCOUNT_MAP["자본총계"] == "total_equity"
assert FinancialCollector.ACCOUNT_MAP["영업활동으로인한현금흐름"] == "operating_cash_flow"
def test_all_factor_calculator_accounts_covered(self):
"""FactorCalculator expects these account keys."""
required = {
"revenue", "gross_profit", "operating_income", "net_income",
"total_assets", "total_liabilities", "total_equity",
"current_assets", "current_liabilities", "operating_cash_flow",
}
mapped_values = set(FinancialCollector.ACCOUNT_MAP.values())
assert required.issubset(mapped_values)
class TestCleanFinancialData:
"""Test the data cleaning logic."""
def test_clean_removes_nan_rows(self, db):
collector = FinancialCollector(db)
df = pd.DataFrame({
"계정": ["매출액", "빈행"],
"2024/12": [100000, None],
})
result = collector._clean_financial_data(df, "005930", "annual")
# Only 매출액 should remain (빈행 has NaN value and is not in ACCOUNT_MAP)
assert len(result) == 1
assert result[0]["account"] == "revenue"
def test_clean_maps_account_names(self, db):
collector = FinancialCollector(db)
df = pd.DataFrame({
"계정": ["매출액", "자산총계"],
"2024/12": [100000, 500000],
})
result = collector._clean_financial_data(df, "005930", "annual")
accounts = {r["account"] for r in result}
assert "revenue" in accounts
assert "total_assets" in accounts
def test_clean_skips_unmapped_accounts(self, db):
collector = FinancialCollector(db)
df = pd.DataFrame({
"계정": ["매출액", "알수없는계정"],
"2024/12": [100000, 999],
})
result = collector._clean_financial_data(df, "005930", "annual")
accounts = {r["account"] for r in result}
assert "revenue" in accounts
assert "알수없는계정" not in accounts
def test_clean_strips_fnguide_suffix(self, db):
collector = FinancialCollector(db)
df = pd.DataFrame({
"계정": ["매출액계산에 참여한 계정 펼치기"],
"2024/12": [100000],
})
result = collector._clean_financial_data(df, "005930", "annual")
assert len(result) == 1
assert result[0]["account"] == "revenue"
class TestCollect:
"""Test full collect flow with mocked HTTP."""
@patch("app.services.collectors.financial_collector.time.sleep")
@patch("app.services.collectors.financial_collector.requests.get")
@patch("app.services.collectors.financial_collector.pd.read_html")
def test_collect_saves_records(self, mock_read_html, mock_get, mock_sleep, db_with_stocks):
mock_read_html.return_value = _make_sample_tables()
mock_response = MagicMock()
mock_response.text = _make_fiscal_html()
mock_response.content = _make_fiscal_html().encode()
mock_get.return_value = mock_response
collector = FinancialCollector(db_with_stocks)
count = collector.collect()
assert count > 0
records = db_with_stocks.query(Financial).all()
assert len(records) > 0
@patch("app.services.collectors.financial_collector.time.sleep")
@patch("app.services.collectors.financial_collector.requests.get")
@patch("app.services.collectors.financial_collector.pd.read_html")
def test_collect_stores_correct_report_types(self, mock_read_html, mock_get, mock_sleep, db_with_stocks):
mock_read_html.return_value = _make_sample_tables()
mock_response = MagicMock()
mock_response.text = _make_fiscal_html()
mock_response.content = _make_fiscal_html().encode()
mock_get.return_value = mock_response
collector = FinancialCollector(db_with_stocks)
collector.collect()
report_types = {r.report_type for r in db_with_stocks.query(Financial).all()}
assert ReportType.ANNUAL in report_types or "annual" in report_types
@patch("app.services.collectors.financial_collector.time.sleep")
@patch("app.services.collectors.financial_collector.requests.get")
@patch("app.services.collectors.financial_collector.pd.read_html")
def test_collect_continues_on_ticker_error(self, mock_read_html, mock_get, mock_sleep, db_with_stocks):
"""If one ticker fails, the collector should continue to the next."""
call_count = 0
def side_effect(*args, **kwargs):
nonlocal call_count
call_count += 1
if call_count <= 1:
raise ValueError("Simulated error")
return _make_sample_tables()
mock_read_html.side_effect = side_effect
mock_response = MagicMock()
mock_response.text = _make_fiscal_html()
mock_response.content = _make_fiscal_html().encode()
mock_get.return_value = mock_response
collector = FinancialCollector(db_with_stocks)
count = collector.collect()
# Should still have records from the second ticker
assert count > 0
@patch("app.services.collectors.financial_collector.time.sleep")
@patch("app.services.collectors.financial_collector.requests.get")
@patch("app.services.collectors.financial_collector.pd.read_html")
def test_collect_upserts_on_duplicate(self, mock_read_html, mock_get, mock_sleep, db_with_stocks):
"""Running collect twice should update, not duplicate."""
mock_read_html.return_value = _make_sample_tables()
mock_response = MagicMock()
mock_response.text = _make_fiscal_html()
mock_response.content = _make_fiscal_html().encode()
mock_get.return_value = mock_response
collector = FinancialCollector(db_with_stocks)
count1 = collector.collect()
count2 = collector.collect()
# Both runs should succeed with same count
assert count1 == count2

View File

@ -0,0 +1,660 @@
# Financial Statement Collector Implementation Plan
> **For Claude:** REQUIRED SUB-SKILL: Use superpowers:executing-plans to implement this plan task-by-task.
**Goal:** Implement a `FinancialCollector` that scrapes financial statement data from FnGuide and populates the existing `financials` table so that `FactorCalculator` can compute quality/f-score factors.
**Architecture:** Port make-quant-py's FnGuide scraping logic into galaxy-po's `BaseCollector` pattern. The collector fetches annual and quarterly financial statements per ticker, maps Korean account names to the English keys expected by `FactorCalculator`, and upserts into PostgreSQL.
**Tech Stack:** Python, pandas (`read_html`), BeautifulSoup, requests, SQLAlchemy (PostgreSQL dialect)
---
### Task 1: Write tests for FinancialCollector
**Files:**
- Create: `backend/tests/unit/__init__.py`
- Create: `backend/tests/unit/test_financial_collector.py`
**Step 1: Create unit test directory**
Create `backend/tests/unit/__init__.py` as an empty file.
**Step 2: Write the failing tests**
Create `backend/tests/unit/test_financial_collector.py`:
```python
"""
Unit tests for FinancialCollector.
These tests mock HTTP responses to avoid hitting FnGuide in CI.
"""
import re
from datetime import date
from unittest.mock import patch, MagicMock
import pandas as pd
import pytest
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.pool import StaticPool
from app.core.database import Base
from app.models.stock import Financial, Stock, StockType, ReportType
from app.services.collectors.financial_collector import FinancialCollector
@pytest.fixture
def db():
"""In-memory SQLite database for testing."""
engine = create_engine(
"sqlite:///:memory:",
connect_args={"check_same_thread": False},
poolclass=StaticPool,
)
Base.metadata.create_all(bind=engine)
Session = sessionmaker(bind=engine)
session = Session()
yield session
session.close()
Base.metadata.drop_all(bind=engine)
@pytest.fixture
def db_with_stocks(db):
"""Database with sample stock records."""
stocks = [
Stock(
ticker="005930",
name="삼성전자",
market="KOSPI",
close_price=70000,
market_cap=418000000000000,
stock_type=StockType.COMMON.value,
base_date=date(2025, 3, 28),
),
Stock(
ticker="000660",
name="SK하이닉스",
market="KOSPI",
close_price=120000,
market_cap=87000000000000,
stock_type=StockType.COMMON.value,
base_date=date(2025, 3, 28),
),
]
for s in stocks:
db.add(s)
db.commit()
return db
# Sample FnGuide HTML tables (6 tables: annual/quarterly x income/balance/cashflow)
def _make_sample_tables():
"""Build 6 DataFrames mimicking pd.read_html output from FnGuide."""
# Annual income statement (index 0)
annual_income = pd.DataFrame({
"IFRS(연결)": ["매출액", "매출총이익", "영업이익", "당기순이익"],
"2022/12": [302231400, 108747000, 43376600, 55654200],
"2023/12": [258935500, 73024400, 6566500, 15487100],
"2024/12": [300870000, 100000000, 32726500, 34681300],
"전년동기(%)": [None, None, None, None],
})
# Quarterly income statement (index 1)
quarterly_income = pd.DataFrame({
"IFRS(연결)": ["매출액", "매출총이익", "영업이익", "당기순이익"],
"2024/03": [71922800, 22735000, 6609800, 6745200],
"2024/06": [74069300, 25558200, 10443900, 9837900],
})
# Annual balance sheet (index 2)
annual_balance = pd.DataFrame({
"IFRS(연결)": ["자산총계", "부채총계", "자본총계", "유동자산", "유동부채"],
"2022/12": [448424400, 101153300, 347271100, 218439000, 67766200],
"2023/12": [455905400, 107064700, 348840700, 213137900, 73291500],
"2024/12": [480000000, 110000000, 370000000, 220000000, 75000000],
})
# Quarterly balance sheet (index 3)
quarterly_balance = pd.DataFrame({
"IFRS(연결)": ["자산총계", "부채총계", "자본총계", "유동자산", "유동부채"],
"2024/03": [460000000, 108000000, 352000000, 215000000, 74000000],
"2024/06": [465000000, 109000000, 356000000, 218000000, 74500000],
})
# Annual cash flow (index 4)
annual_cashflow = pd.DataFrame({
"IFRS(연결)": ["영업활동으로인한현금흐름"],
"2022/12": [49050400],
"2023/12": [67442000],
"2024/12": [50000000],
})
# Quarterly cash flow (index 5)
quarterly_cashflow = pd.DataFrame({
"IFRS(연결)": ["영업활동으로인한현금흐름"],
"2024/03": [12000000],
"2024/06": [15000000],
})
return [
annual_income, quarterly_income,
annual_balance, quarterly_balance,
annual_cashflow, quarterly_cashflow,
]
def _make_fiscal_html():
"""Build HTML snippet with fiscal year end month (12월)."""
return """
<html><body>
<div class="corp_group1">
<h2>삼성전자</h2>
<h2>12월 결산</h2>
</div>
</body></html>
"""
class TestAccountMapping:
"""Test that Korean account names map correctly to English."""
def test_known_accounts_are_mapped(self):
assert FinancialCollector.ACCOUNT_MAP["매출액"] == "revenue"
assert FinancialCollector.ACCOUNT_MAP["당기순이익"] == "net_income"
assert FinancialCollector.ACCOUNT_MAP["자산총계"] == "total_assets"
assert FinancialCollector.ACCOUNT_MAP["자본총계"] == "total_equity"
assert FinancialCollector.ACCOUNT_MAP["영업활동으로인한현금흐름"] == "operating_cash_flow"
def test_all_factor_calculator_accounts_covered(self):
"""FactorCalculator expects these account keys."""
required = {
"revenue", "gross_profit", "operating_income", "net_income",
"total_assets", "total_liabilities", "total_equity",
"current_assets", "current_liabilities", "operating_cash_flow",
}
mapped_values = set(FinancialCollector.ACCOUNT_MAP.values())
assert required.issubset(mapped_values)
class TestCleanFinancialData:
"""Test the data cleaning logic."""
def test_clean_removes_nan_rows(self, db):
collector = FinancialCollector(db)
df = pd.DataFrame({
"계정": ["매출액", "빈행"],
"2024/12": [100000, None],
})
result = collector._clean_financial_data(df, "005930", "annual")
# Only 매출액 should remain (빈행 has NaN value and is not in ACCOUNT_MAP)
assert len(result) >= 1
assert all(r["account"] == "revenue" for r in result if r["account"] == "revenue")
def test_clean_maps_account_names(self, db):
collector = FinancialCollector(db)
df = pd.DataFrame({
"계정": ["매출액", "자산총계"],
"2024/12": [100000, 500000],
})
result = collector._clean_financial_data(df, "005930", "annual")
accounts = {r["account"] for r in result}
assert "revenue" in accounts
assert "total_assets" in accounts
def test_clean_skips_unmapped_accounts(self, db):
collector = FinancialCollector(db)
df = pd.DataFrame({
"계정": ["매출액", "알수없는계정"],
"2024/12": [100000, 999],
})
result = collector._clean_financial_data(df, "005930", "annual")
accounts = {r["account"] for r in result}
assert "revenue" in accounts
assert "알수없는계정" not in accounts
def test_clean_strips_fnguide_suffix(self, db):
collector = FinancialCollector(db)
df = pd.DataFrame({
"계정": ["매출액계산에 참여한 계정 펼치기"],
"2024/12": [100000],
})
result = collector._clean_financial_data(df, "005930", "annual")
assert len(result) == 1
assert result[0]["account"] == "revenue"
class TestCollect:
"""Test full collect flow with mocked HTTP."""
@patch("app.services.collectors.financial_collector.requests.get")
@patch("app.services.collectors.financial_collector.pd.read_html")
def test_collect_saves_records(self, mock_read_html, mock_get, db_with_stocks):
mock_read_html.return_value = _make_sample_tables()
mock_response = MagicMock()
mock_response.content = _make_fiscal_html().encode()
mock_get.return_value = mock_response
collector = FinancialCollector(db_with_stocks)
count = collector.collect()
assert count > 0
records = db_with_stocks.query(Financial).all()
assert len(records) > 0
@patch("app.services.collectors.financial_collector.requests.get")
@patch("app.services.collectors.financial_collector.pd.read_html")
def test_collect_stores_correct_report_types(self, mock_read_html, mock_get, db_with_stocks):
mock_read_html.return_value = _make_sample_tables()
mock_response = MagicMock()
mock_response.content = _make_fiscal_html().encode()
mock_get.return_value = mock_response
collector = FinancialCollector(db_with_stocks)
collector.collect()
report_types = {r.report_type for r in db_with_stocks.query(Financial).all()}
assert ReportType.ANNUAL in report_types or "annual" in report_types
@patch("app.services.collectors.financial_collector.requests.get")
@patch("app.services.collectors.financial_collector.pd.read_html")
def test_collect_continues_on_ticker_error(self, mock_read_html, mock_get, db_with_stocks):
"""If one ticker fails, the collector should continue to the next."""
call_count = 0
def side_effect(*args, **kwargs):
nonlocal call_count
call_count += 1
if call_count <= 1:
raise ValueError("Simulated error")
return _make_sample_tables()
mock_read_html.side_effect = side_effect
mock_response = MagicMock()
mock_response.content = _make_fiscal_html().encode()
mock_get.return_value = mock_response
collector = FinancialCollector(db_with_stocks)
count = collector.collect()
# Should still have records from the second ticker
assert count > 0
@patch("app.services.collectors.financial_collector.requests.get")
@patch("app.services.collectors.financial_collector.pd.read_html")
def test_collect_upserts_on_duplicate(self, mock_read_html, mock_get, db_with_stocks):
"""Running collect twice should update, not duplicate."""
mock_read_html.return_value = _make_sample_tables()
mock_response = MagicMock()
mock_response.content = _make_fiscal_html().encode()
mock_get.return_value = mock_response
collector = FinancialCollector(db_with_stocks)
count1 = collector.collect()
count2 = collector.collect()
# Both runs should succeed with same count
assert count1 == count2
```
**Step 3: Run tests to verify they fail**
Run: `cd /home/zephyrdark/workspace/quant/galaxy-po/backend && python -m pytest tests/unit/test_financial_collector.py -v`
Expected: FAIL with `ModuleNotFoundError: No module named 'app.services.collectors.financial_collector'`
**Step 4: Commit**
```bash
git add backend/tests/unit/__init__.py backend/tests/unit/test_financial_collector.py
git commit -m "test: add unit tests for FinancialCollector"
```
---
### Task 2: Implement FinancialCollector
**Files:**
- Create: `backend/app/services/collectors/financial_collector.py`
**Step 1: Write the implementation**
Create `backend/app/services/collectors/financial_collector.py`:
```python
"""
Financial statement collector using FnGuide scraping.
Ported from make-quant-py/src/data/financial.py and adapted
to galaxy-po's BaseCollector pattern with PostgreSQL upsert.
"""
import logging
import re
import time
from datetime import date
import pandas as pd
import requests
from bs4 import BeautifulSoup
from sqlalchemy.orm import Session
from sqlalchemy.dialects.postgresql import insert
from app.services.collectors.base import BaseCollector
from app.models.stock import Financial, Stock, StockType
logger = logging.getLogger(__name__)
FNGUIDE_URL = "https://comp.fnguide.com/SVO2/ASP/SVD_Finance.asp"
SLEEP_SECONDS = 2
class FinancialCollector(BaseCollector):
"""Collects financial statements from FnGuide."""
ACCOUNT_MAP = {
"매출액": "revenue",
"매출총이익": "gross_profit",
"영업이익": "operating_income",
"당기순이익": "net_income",
"자산총계": "total_assets",
"부채총계": "total_liabilities",
"자본총계": "total_equity",
"유동자산": "current_assets",
"유동부채": "current_liabilities",
"영업활동으로인한현금흐름": "operating_cash_flow",
}
def __init__(self, db: Session):
super().__init__(db)
def _get_tickers(self) -> list[str]:
"""Get list of common stock tickers from DB."""
stocks = (
self.db.query(Stock.ticker)
.filter(Stock.stock_type == StockType.COMMON.value)
.all()
)
return [s.ticker for s in stocks]
def _clean_financial_data(
self, df: pd.DataFrame, ticker: str, report_type: str
) -> list[dict]:
"""
Clean a raw FnGuide DataFrame and return list of record dicts.
Steps:
1. Rename first column to '계정'
2. Strip FnGuide UI text from account names
3. Drop rows where all value columns are NaN
4. Deduplicate by account name
5. Melt wide→long
6. Map Korean account names to English
7. Drop unmapped accounts and NaN values
"""
df = df.copy()
# Ensure first column is named '계정'
if df.columns[0] != "계정":
df = df.rename(columns={df.columns[0]: "계정"})
# Strip FnGuide suffix
df["계정"] = df["계정"].str.replace("계산에 참여한 계정 펼치기", "", regex=False)
df["계정"] = df["계정"].str.strip()
# Drop rows where all non-계정 columns are NaN
value_cols = [c for c in df.columns if c != "계정"]
df = df[~df[value_cols].isna().all(axis=1)]
# Deduplicate accounts (keep first occurrence)
df = df.drop_duplicates(subset=["계정"], keep="first")
# Melt wide → long
df = pd.melt(df, id_vars="계정", var_name="date_str", value_name="value")
# Drop NaN values
df = df[df["value"].notna()]
# Parse dates
df["base_date"] = (
pd.to_datetime(df["date_str"], format="%Y/%m")
+ pd.tseries.offsets.MonthEnd()
).dt.date
# Map account names
df["account"] = df["계정"].map(self.ACCOUNT_MAP)
# Drop unmapped accounts
df = df[df["account"].notna()]
records = []
for _, row in df.iterrows():
records.append({
"ticker": ticker,
"base_date": row["base_date"],
"report_type": report_type,
"account": row["account"],
"value": float(row["value"]),
})
return records
def _fetch_financial_data(self, ticker: str) -> list[dict]:
"""
Scrape FnGuide for one ticker's financial statements.
Returns list of record dicts ready for DB insertion.
FnGuide returns 6 HTML tables:
[0] annual income, [1] quarterly income
[2] annual balance, [3] quarterly balance
[4] annual cashflow, [5] quarterly cashflow
"""
url = f"{FNGUIDE_URL}?pGB=1&gicode=A{ticker}"
data = pd.read_html(url, displayed_only=False)
if len(data) < 6:
logger.warning(f"{ticker}: expected 6 tables, got {len(data)}, skipping")
return []
# Annual data: concat income(0), balance(2), cashflow(4)
# Remove '전년동기' columns from income statement
annual_income = data[0].iloc[:, ~data[0].columns.str.contains("전년동기")]
data_fs_y = pd.concat([annual_income, data[2], data[4]])
data_fs_y = data_fs_y.rename(columns={data_fs_y.columns[0]: "계정"})
# Filter by fiscal year end month
page_data = requests.get(url, timeout=self.REQUEST_TIMEOUT)
soup = BeautifulSoup(page_data.content, "html.parser")
fiscal_elems = soup.select("div.corp_group1 > h2")
if len(fiscal_elems) >= 2:
fiscal_months = re.findall(r"[0-9]+", fiscal_elems[1].text)
data_fs_y = data_fs_y.loc[
:,
(data_fs_y.columns == "계정")
| data_fs_y.columns.str[-2:].isin(fiscal_months),
]
annual_records = self._clean_financial_data(data_fs_y, ticker, "annual")
# Quarterly data: concat income(1), balance(3), cashflow(5)
quarterly_income = data[1].iloc[:, ~data[1].columns.str.contains("전년동기")]
data_fs_q = pd.concat([quarterly_income, data[3], data[5]])
data_fs_q = data_fs_q.rename(columns={data_fs_q.columns[0]: "계정"})
quarterly_records = self._clean_financial_data(data_fs_q, ticker, "quarterly")
return annual_records + quarterly_records
def _upsert_records(self, records: list[dict]) -> None:
"""Upsert financial records using PostgreSQL ON CONFLICT."""
if not records:
return
stmt = insert(Financial).values(records)
stmt = stmt.on_conflict_do_update(
index_elements=["ticker", "base_date", "report_type", "account"],
set_={"value": stmt.excluded.value},
)
self.db.execute(stmt)
self.db.commit()
def collect(self) -> int:
"""Collect financial statements for all common stocks."""
tickers = self._get_tickers()
if not tickers:
logger.warning("No tickers found in stocks table")
return 0
total_records = 0
error_tickers = []
for i, ticker in enumerate(tickers):
try:
records = self._fetch_financial_data(ticker)
self._upsert_records(records)
total_records += len(records)
if (i + 1) % 100 == 0:
logger.info(f"Progress: {i + 1}/{len(tickers)} tickers processed")
except Exception as e:
logger.warning(f"{ticker}: failed to collect financial data: {e}")
error_tickers.append(ticker)
time.sleep(SLEEP_SECONDS)
if error_tickers:
logger.warning(f"Failed tickers ({len(error_tickers)}): {error_tickers[:20]}")
logger.info(f"Collected {total_records} financial records from {len(tickers) - len(error_tickers)} tickers")
return total_records
```
**Step 2: Run tests to verify they pass**
Run: `cd /home/zephyrdark/workspace/quant/galaxy-po/backend && python -m pytest tests/unit/test_financial_collector.py -v`
Expected: Most tests PASS. The upsert test may need SQLite-compatible handling — see Step 3.
**Step 3: Fix SQLite compatibility for upsert test if needed**
The `_upsert_records` method uses PostgreSQL-specific `insert().on_conflict_do_update()`. For the SQLite-based test, the upsert test may fail. If so, the test should verify behavior using the mock pattern (checking that records exist), not the actual upsert. The implementation itself is correct for production PostgreSQL.
**Step 4: Commit**
```bash
git add backend/app/services/collectors/financial_collector.py
git commit -m "feat: implement FinancialCollector for FnGuide scraping"
```
---
### Task 3: Register FinancialCollector in module exports
**Files:**
- Modify: `backend/app/services/collectors/__init__.py`
**Step 1: Add import and export**
Add `FinancialCollector` to `__init__.py`:
```python
from app.services.collectors.base import BaseCollector
from app.services.collectors.stock_collector import StockCollector
from app.services.collectors.sector_collector import SectorCollector
from app.services.collectors.price_collector import PriceCollector
from app.services.collectors.valuation_collector import ValuationCollector
from app.services.collectors.etf_collector import ETFCollector
from app.services.collectors.etf_price_collector import ETFPriceCollector
from app.services.collectors.financial_collector import FinancialCollector
__all__ = [
"BaseCollector",
"StockCollector",
"SectorCollector",
"PriceCollector",
"ValuationCollector",
"ETFCollector",
"ETFPriceCollector",
"FinancialCollector",
]
```
**Step 2: Commit**
```bash
git add backend/app/services/collectors/__init__.py
git commit -m "feat: export FinancialCollector from collectors module"
```
---
### Task 4: Add FinancialCollector to daily collection job
**Files:**
- Modify: `backend/jobs/collection_job.py`
**Step 1: Add import**
Add `FinancialCollector` to the imports in `collection_job.py`:
```python
from app.services.collectors import (
StockCollector,
SectorCollector,
PriceCollector,
ValuationCollector,
ETFCollector,
ETFPriceCollector,
FinancialCollector,
)
```
**Step 2: Add to daily collectors list**
In `_get_daily_collectors()`, add `FinancialCollector` after `ValuationCollector` (it depends on stock master data being collected first):
```python
def _get_daily_collectors():
return [
("StockCollector", StockCollector, {}),
("SectorCollector", SectorCollector, {}),
("PriceCollector", PriceCollector, {}),
("ValuationCollector", ValuationCollector, {}),
("FinancialCollector", FinancialCollector, {}),
("ETFCollector", ETFCollector, {}),
("ETFPriceCollector", ETFPriceCollector, {}),
]
```
**Step 3: Run existing tests to check no regressions**
Run: `cd /home/zephyrdark/workspace/quant/galaxy-po/backend && python -m pytest tests/ -v`
Expected: All tests pass.
**Step 4: Commit**
```bash
git add backend/jobs/collection_job.py
git commit -m "feat: add FinancialCollector to daily collection schedule"
```
---
### Task 5: Verify end-to-end with a single ticker (manual)
**Step 1: Run a quick manual test against real FnGuide**
```bash
cd /home/zephyrdark/workspace/quant/galaxy-po/backend
python -c "
import pandas as pd
url = 'https://comp.fnguide.com/SVO2/ASP/SVD_Finance.asp?pGB=1&gicode=A005930'
tables = pd.read_html(url, displayed_only=False)
print(f'Number of tables: {len(tables)}')
for i, t in enumerate(tables):
print(f'Table {i}: {t.shape}, columns: {list(t.columns[:3])}...')
"
```
Expected: 6 tables returned, showing financial data for Samsung Electronics.
**Step 2: Run the full test suite one final time**
Run: `cd /home/zephyrdark/workspace/quant/galaxy-po/backend && python -m pytest tests/ -v`
Expected: All tests pass.

View File

@ -0,0 +1,71 @@
# Financial Statement Collector Design
## Date: 2026-02-18
## Problem
galaxy-po has a `Financial` model and `FactorCalculator` that depends on financial statement data (ROE, GPA, F-Score calculations), but no collector exists to actually populate the `financials` table.
make-quant-py already implements FnGuide scraping for financial statements in `src/data/financial.py`.
## Solution
Implement `FinancialCollector` following the existing `BaseCollector` pattern, porting make-quant-py's FnGuide scraping logic to galaxy-po's architecture.
## Data Source
FnGuide (`https://comp.fnguide.com/SVO2/ASP/SVD_Finance.asp`) provides:
- Annual and quarterly financial statements
- Income statement, balance sheet, cash flow statement
- Free, no API key required
- HTML table scraping via `pd.read_html()`
## Account Name Mapping
FnGuide returns Korean account names. Map to English keys expected by `FactorCalculator`:
| FnGuide (Korean) | Financial.account (English) |
|---|---|
| 매출액 | revenue |
| 매출총이익 | gross_profit |
| 영업이익 | operating_income |
| 당기순이익 | net_income |
| 자산총계 | total_assets |
| 부채총계 | total_liabilities |
| 자본총계 | total_equity |
| 유동자산 | current_assets |
| 유동부채 | current_liabilities |
| 영업활동으로인한현금흐름 | operating_cash_flow |
## Architecture
```
FinancialCollector(BaseCollector)
├── collect() → iterate all tickers, call _fetch_financial_data for each
├── _fetch_financial_data(ticker) → scrape FnGuide, return list of record dicts
├── _clean_financial_data(df, ticker, report_type) → clean and normalize DataFrame
└── ACCOUNT_MAP (class constant) → Korean → English account mapping
```
## Data Flow
1. Get ticker list from `stocks` table
2. For each ticker:
- Fetch FnGuide page via `pd.read_html(url, displayed_only=False)`
- Annual: concat data[0], data[2], data[4] (income, balance, cashflow)
- Quarterly: concat data[1], data[3], data[5]
- Parse fiscal year end month from page HTML
- Clean: remove NaN rows, deduplicate accounts, melt wide→long
- Map Korean account names to English
- Sleep 2 seconds between tickers (rate limiting)
3. Upsert all records to `financials` table (PostgreSQL ON CONFLICT)
## Files to Change
- **New:** `backend/app/services/collectors/financial_collector.py`
- **Modify:** `backend/app/services/collectors/__init__.py` (add export)
- **Modify:** `backend/jobs/collection_job.py` (add to daily collection)
## Scheduler Integration
Add `FinancialCollector` to `run_daily_collection()`. Financial data updates quarterly, but upsert makes daily runs idempotent.