feat: add stock and sector data collectors

Implement StockCollector to fetch stock master data from KRX
(Korea Exchange) including market cap, EPS, BPS, and dividend info.
Implement SectorCollector to fetch WICS sector classification from
WISEindex. Both collectors use PostgreSQL upsert for efficient updates.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
zephyrdark 2026-02-02 23:38:45 +09:00
parent 52d9fdf1f7
commit 5479c36985
3 changed files with 227 additions and 1 deletions

View File

@ -1,3 +1,5 @@
from app.services.collectors.base import BaseCollector
from app.services.collectors.stock_collector import StockCollector
from app.services.collectors.sector_collector import SectorCollector
__all__ = ["BaseCollector"]
__all__ = ["BaseCollector", "StockCollector", "SectorCollector"]

View File

@ -0,0 +1,74 @@
"""
Sector data collector from WISEindex.
"""
import time
from datetime import datetime
import pandas as pd
import requests
from sqlalchemy.orm import Session
from sqlalchemy.dialects.postgresql import insert
from app.services.collectors.base import BaseCollector
from app.models.stock import Sector
class SectorCollector(BaseCollector):
"""Collects WICS sector classification data."""
SECTOR_CODES = ["G25", "G35", "G50", "G40", "G10", "G20", "G55", "G30", "G15", "G45"]
def __init__(self, db: Session, biz_day: str = None):
super().__init__(db)
self.biz_day = biz_day or datetime.now().strftime("%Y%m%d")
def collect(self) -> int:
"""Collect sector classification data."""
all_data = []
for sector_code in self.SECTOR_CODES:
url = f"http://www.wiseindex.com/Index/GetIndexComponets?ceil_yn=0&dt={self.biz_day}&sec_cd={sector_code}"
try:
response = requests.get(url, timeout=10)
data = response.json()
if "list" in data:
df = pd.json_normalize(data["list"])
all_data.append(df)
except Exception:
continue
time.sleep(1) # Rate limiting
if not all_data:
return 0
sectors = pd.concat(all_data, axis=0)
sectors = sectors[["IDX_CD", "CMP_CD", "CMP_KOR", "SEC_NM_KOR"]]
records = []
base_date = datetime.strptime(self.biz_day, "%Y%m%d").date()
for _, row in sectors.iterrows():
records.append({
"ticker": row["CMP_CD"],
"sector_code": row["IDX_CD"],
"company_name": row["CMP_KOR"],
"sector_name": row["SEC_NM_KOR"],
"base_date": base_date,
})
if records:
stmt = insert(Sector).values(records)
stmt = stmt.on_conflict_do_update(
index_elements=["ticker"],
set_={
"sector_code": stmt.excluded.sector_code,
"company_name": stmt.excluded.company_name,
"sector_name": stmt.excluded.sector_name,
"base_date": stmt.excluded.base_date,
},
)
self.db.execute(stmt)
self.db.commit()
return len(records)

View File

@ -0,0 +1,150 @@
"""
Stock data collector from KRX.
"""
from io import BytesIO
from datetime import datetime
import pandas as pd
import numpy as np
import requests
from sqlalchemy.orm import Session
from sqlalchemy.dialects.postgresql import insert
from app.services.collectors.base import BaseCollector
from app.models.stock import Stock, StockType
class StockCollector(BaseCollector):
"""Collects stock master data from KRX."""
GEN_OTP_URL = "http://data.krx.co.kr/comm/fileDn/GenerateOTP/generate.cmd"
DOWN_URL = "http://data.krx.co.kr/comm/fileDn/download_csv/download.cmd"
HEADERS = {
"Referer": "http://data.krx.co.kr/contents/MDC/MDI/mdiLoader",
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36",
}
def __init__(self, db: Session, biz_day: str = None):
super().__init__(db)
self.biz_day = biz_day or self._get_latest_biz_day()
def _get_latest_biz_day(self) -> str:
"""Get the latest business day from Naver Finance."""
from bs4 import BeautifulSoup
import re
url = "https://finance.naver.com/sise/sise_index.naver?code=KOSPI"
response = requests.get(url)
soup = BeautifulSoup(response.content, "lxml")
time_elem = soup.select_one("div.ly_realtime > span#time")
if time_elem:
date_str = re.sub(r"[^0-9]", "", time_elem.text)
return date_str[:8]
return datetime.now().strftime("%Y%m%d")
def _fetch_stock_data(self, mkt_id: str) -> pd.DataFrame:
"""Fetch stock data for a specific market."""
gen_otp_data = {
"locale": "ko_KR",
"mktId": mkt_id,
"trdDd": self.biz_day,
"money": "1",
"csvxls_isNo": "false",
"name": "fileDown",
"url": "dbms/MDC/STAT/standard/MDCSTAT03901",
}
otp = requests.post(self.GEN_OTP_URL, data=gen_otp_data, headers=self.HEADERS)
response = requests.post(self.DOWN_URL, data={"code": otp.text}, headers=self.HEADERS)
return pd.read_csv(BytesIO(response.content), encoding="EUC-KR")
def _fetch_ind_data(self) -> pd.DataFrame:
"""Fetch individual stock indicators."""
gen_otp_data = {
"locale": "ko_KR",
"searchType": "1",
"mktId": "ALL",
"trdDd": self.biz_day,
"csvxls_isNo": "false",
"name": "fileDown",
"url": "dbms/MDC/STAT/standard/MDCSTAT03501",
}
otp = requests.post(self.GEN_OTP_URL, data=gen_otp_data, headers=self.HEADERS)
response = requests.post(self.DOWN_URL, data={"code": otp.text}, headers=self.HEADERS)
return pd.read_csv(BytesIO(response.content), encoding="EUC-KR")
def _classify_stock_type(self, row: pd.Series) -> str:
"""Classify stock type based on name and code."""
name = row.get("종목명", "")
code = row.get("종목코드", "")
if "스팩" in name or "" in name and "" in name:
return StockType.SPAC.value
elif code and code[-1] != "0":
return StockType.PREFERRED.value
elif name.endswith("리츠"):
return StockType.REIT.value
else:
return StockType.COMMON.value
def collect(self) -> int:
"""Collect stock master data."""
# Fetch data from both markets
kospi = self._fetch_stock_data("STK")
kosdaq = self._fetch_stock_data("KSQ")
krx_sector = pd.concat([kospi, kosdaq]).reset_index(drop=True)
krx_sector["종목명"] = krx_sector["종목명"].str.strip()
# Fetch individual indicators
krx_ind = self._fetch_ind_data()
krx_ind["종목명"] = krx_ind["종목명"].str.strip()
# Merge data
merged = pd.merge(
krx_sector,
krx_ind,
on=krx_sector.columns.intersection(krx_ind.columns).tolist(),
how="outer",
)
merged.columns = merged.columns.str.replace(" ", "")
# Process and insert
records = []
for _, row in merged.iterrows():
stock_type = self._classify_stock_type(row)
records.append({
"ticker": row.get("종목코드"),
"name": row.get("종목명"),
"market": row.get("시장구분", "KOSPI"),
"close_price": row.get("종가") if pd.notna(row.get("종가")) else None,
"market_cap": row.get("시가총액") if pd.notna(row.get("시가총액")) else None,
"eps": row.get("EPS") if pd.notna(row.get("EPS")) else None,
"forward_eps": row.get("선행EPS") if pd.notna(row.get("선행EPS")) else None,
"bps": row.get("BPS") if pd.notna(row.get("BPS")) else None,
"dividend_per_share": row.get("주당배당금") if pd.notna(row.get("주당배당금")) else None,
"stock_type": stock_type,
"base_date": datetime.strptime(self.biz_day, "%Y%m%d").date(),
})
# Upsert using PostgreSQL INSERT ON CONFLICT
if records:
stmt = insert(Stock).values(records)
stmt = stmt.on_conflict_do_update(
index_elements=["ticker"],
set_={
"name": stmt.excluded.name,
"market": stmt.excluded.market,
"close_price": stmt.excluded.close_price,
"market_cap": stmt.excluded.market_cap,
"eps": stmt.excluded.eps,
"forward_eps": stmt.excluded.forward_eps,
"bps": stmt.excluded.bps,
"dividend_per_share": stmt.excluded.dividend_per_share,
"stock_type": stmt.excluded.stock_type,
"base_date": stmt.excluded.base_date,
},
)
self.db.execute(stmt)
self.db.commit()
return len(records)