From 5479c369859759f258e109f54194564ba24d5a0b Mon Sep 17 00:00:00 2001 From: zephyrdark Date: Mon, 2 Feb 2026 23:38:45 +0900 Subject: [PATCH] feat: add stock and sector data collectors Implement StockCollector to fetch stock master data from KRX (Korea Exchange) including market cap, EPS, BPS, and dividend info. Implement SectorCollector to fetch WICS sector classification from WISEindex. Both collectors use PostgreSQL upsert for efficient updates. Co-Authored-By: Claude Opus 4.5 --- backend/app/services/collectors/__init__.py | 4 +- .../services/collectors/sector_collector.py | 74 +++++++++ .../services/collectors/stock_collector.py | 150 ++++++++++++++++++ 3 files changed, 227 insertions(+), 1 deletion(-) create mode 100644 backend/app/services/collectors/sector_collector.py create mode 100644 backend/app/services/collectors/stock_collector.py diff --git a/backend/app/services/collectors/__init__.py b/backend/app/services/collectors/__init__.py index 5e58481..c332ee9 100644 --- a/backend/app/services/collectors/__init__.py +++ b/backend/app/services/collectors/__init__.py @@ -1,3 +1,5 @@ from app.services.collectors.base import BaseCollector +from app.services.collectors.stock_collector import StockCollector +from app.services.collectors.sector_collector import SectorCollector -__all__ = ["BaseCollector"] +__all__ = ["BaseCollector", "StockCollector", "SectorCollector"] diff --git a/backend/app/services/collectors/sector_collector.py b/backend/app/services/collectors/sector_collector.py new file mode 100644 index 0000000..7bf63d7 --- /dev/null +++ b/backend/app/services/collectors/sector_collector.py @@ -0,0 +1,74 @@ +""" +Sector data collector from WISEindex. +""" +import time +from datetime import datetime + +import pandas as pd +import requests + +from sqlalchemy.orm import Session +from sqlalchemy.dialects.postgresql import insert + +from app.services.collectors.base import BaseCollector +from app.models.stock import Sector + + +class SectorCollector(BaseCollector): + """Collects WICS sector classification data.""" + + SECTOR_CODES = ["G25", "G35", "G50", "G40", "G10", "G20", "G55", "G30", "G15", "G45"] + + def __init__(self, db: Session, biz_day: str = None): + super().__init__(db) + self.biz_day = biz_day or datetime.now().strftime("%Y%m%d") + + def collect(self) -> int: + """Collect sector classification data.""" + all_data = [] + + for sector_code in self.SECTOR_CODES: + url = f"http://www.wiseindex.com/Index/GetIndexComponets?ceil_yn=0&dt={self.biz_day}&sec_cd={sector_code}" + try: + response = requests.get(url, timeout=10) + data = response.json() + if "list" in data: + df = pd.json_normalize(data["list"]) + all_data.append(df) + except Exception: + continue + time.sleep(1) # Rate limiting + + if not all_data: + return 0 + + sectors = pd.concat(all_data, axis=0) + sectors = sectors[["IDX_CD", "CMP_CD", "CMP_KOR", "SEC_NM_KOR"]] + + records = [] + base_date = datetime.strptime(self.biz_day, "%Y%m%d").date() + + for _, row in sectors.iterrows(): + records.append({ + "ticker": row["CMP_CD"], + "sector_code": row["IDX_CD"], + "company_name": row["CMP_KOR"], + "sector_name": row["SEC_NM_KOR"], + "base_date": base_date, + }) + + if records: + stmt = insert(Sector).values(records) + stmt = stmt.on_conflict_do_update( + index_elements=["ticker"], + set_={ + "sector_code": stmt.excluded.sector_code, + "company_name": stmt.excluded.company_name, + "sector_name": stmt.excluded.sector_name, + "base_date": stmt.excluded.base_date, + }, + ) + self.db.execute(stmt) + self.db.commit() + + return len(records) diff --git a/backend/app/services/collectors/stock_collector.py b/backend/app/services/collectors/stock_collector.py new file mode 100644 index 0000000..5af86a8 --- /dev/null +++ b/backend/app/services/collectors/stock_collector.py @@ -0,0 +1,150 @@ +""" +Stock data collector from KRX. +""" +from io import BytesIO +from datetime import datetime + +import pandas as pd +import numpy as np +import requests + +from sqlalchemy.orm import Session +from sqlalchemy.dialects.postgresql import insert + +from app.services.collectors.base import BaseCollector +from app.models.stock import Stock, StockType + + +class StockCollector(BaseCollector): + """Collects stock master data from KRX.""" + + GEN_OTP_URL = "http://data.krx.co.kr/comm/fileDn/GenerateOTP/generate.cmd" + DOWN_URL = "http://data.krx.co.kr/comm/fileDn/download_csv/download.cmd" + HEADERS = { + "Referer": "http://data.krx.co.kr/contents/MDC/MDI/mdiLoader", + "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36", + } + + def __init__(self, db: Session, biz_day: str = None): + super().__init__(db) + self.biz_day = biz_day or self._get_latest_biz_day() + + def _get_latest_biz_day(self) -> str: + """Get the latest business day from Naver Finance.""" + from bs4 import BeautifulSoup + import re + + url = "https://finance.naver.com/sise/sise_index.naver?code=KOSPI" + response = requests.get(url) + soup = BeautifulSoup(response.content, "lxml") + time_elem = soup.select_one("div.ly_realtime > span#time") + if time_elem: + date_str = re.sub(r"[^0-9]", "", time_elem.text) + return date_str[:8] + return datetime.now().strftime("%Y%m%d") + + def _fetch_stock_data(self, mkt_id: str) -> pd.DataFrame: + """Fetch stock data for a specific market.""" + gen_otp_data = { + "locale": "ko_KR", + "mktId": mkt_id, + "trdDd": self.biz_day, + "money": "1", + "csvxls_isNo": "false", + "name": "fileDown", + "url": "dbms/MDC/STAT/standard/MDCSTAT03901", + } + otp = requests.post(self.GEN_OTP_URL, data=gen_otp_data, headers=self.HEADERS) + response = requests.post(self.DOWN_URL, data={"code": otp.text}, headers=self.HEADERS) + return pd.read_csv(BytesIO(response.content), encoding="EUC-KR") + + def _fetch_ind_data(self) -> pd.DataFrame: + """Fetch individual stock indicators.""" + gen_otp_data = { + "locale": "ko_KR", + "searchType": "1", + "mktId": "ALL", + "trdDd": self.biz_day, + "csvxls_isNo": "false", + "name": "fileDown", + "url": "dbms/MDC/STAT/standard/MDCSTAT03501", + } + otp = requests.post(self.GEN_OTP_URL, data=gen_otp_data, headers=self.HEADERS) + response = requests.post(self.DOWN_URL, data={"code": otp.text}, headers=self.HEADERS) + return pd.read_csv(BytesIO(response.content), encoding="EUC-KR") + + def _classify_stock_type(self, row: pd.Series) -> str: + """Classify stock type based on name and code.""" + name = row.get("종목명", "") + code = row.get("종목코드", "") + + if "스팩" in name or "제" in name and "호" in name: + return StockType.SPAC.value + elif code and code[-1] != "0": + return StockType.PREFERRED.value + elif name.endswith("리츠"): + return StockType.REIT.value + else: + return StockType.COMMON.value + + def collect(self) -> int: + """Collect stock master data.""" + # Fetch data from both markets + kospi = self._fetch_stock_data("STK") + kosdaq = self._fetch_stock_data("KSQ") + krx_sector = pd.concat([kospi, kosdaq]).reset_index(drop=True) + krx_sector["종목명"] = krx_sector["종목명"].str.strip() + + # Fetch individual indicators + krx_ind = self._fetch_ind_data() + krx_ind["종목명"] = krx_ind["종목명"].str.strip() + + # Merge data + merged = pd.merge( + krx_sector, + krx_ind, + on=krx_sector.columns.intersection(krx_ind.columns).tolist(), + how="outer", + ) + merged.columns = merged.columns.str.replace(" ", "") + + # Process and insert + records = [] + for _, row in merged.iterrows(): + stock_type = self._classify_stock_type(row) + records.append({ + "ticker": row.get("종목코드"), + "name": row.get("종목명"), + "market": row.get("시장구분", "KOSPI"), + "close_price": row.get("종가") if pd.notna(row.get("종가")) else None, + "market_cap": row.get("시가총액") if pd.notna(row.get("시가총액")) else None, + "eps": row.get("EPS") if pd.notna(row.get("EPS")) else None, + "forward_eps": row.get("선행EPS") if pd.notna(row.get("선행EPS")) else None, + "bps": row.get("BPS") if pd.notna(row.get("BPS")) else None, + "dividend_per_share": row.get("주당배당금") if pd.notna(row.get("주당배당금")) else None, + "stock_type": stock_type, + "base_date": datetime.strptime(self.biz_day, "%Y%m%d").date(), + }) + + # Upsert using PostgreSQL INSERT ON CONFLICT + if records: + stmt = insert(Stock).values(records) + stmt = stmt.on_conflict_do_update( + index_elements=["ticker"], + set_={ + "name": stmt.excluded.name, + "market": stmt.excluded.market, + "close_price": stmt.excluded.close_price, + "market_cap": stmt.excluded.market_cap, + "eps": stmt.excluded.eps, + "forward_eps": stmt.excluded.forward_eps, + "bps": stmt.excluded.bps, + "dividend_per_share": stmt.excluded.dividend_per_share, + "stock_type": stmt.excluded.stock_type, + "base_date": stmt.excluded.base_date, + }, + ) + self.db.execute(stmt) + self.db.commit() + + return len(records)