# Phase 2: Data Collection Implementation Plan > **For Claude:** REQUIRED SUB-SKILL: Use superpowers:executing-plans to implement this plan task-by-task. **Goal:** Implement a batch data collection system for Korean stock market data (stocks, sectors, prices, valuations, financials) with scheduler and admin UI. **Architecture:** Data collectors fetch from KRX, WISEindex, and pykrx. APScheduler runs jobs on schedule. Admin API allows manual triggers. Job logs track execution status. **Scope note:** Tasks 1–7 below cover the base collector, the stock/sector/price/valuation collectors, the manual-trigger Admin API, and the admin UI page; the APScheduler job wiring and a financials collector are not part of these tasks. **Tech Stack:** pykrx, requests, beautifulsoup4, APScheduler, pandas --- ## Task 1: Base Collector Infrastructure **Files:** - Create: `backend/app/services/__init__.py` - Create: `backend/app/services/collectors/__init__.py` - Create: `backend/app/services/collectors/base.py` **Step 1: Create backend/app/services/collectors/__init__.py** ```python from app.services.collectors.base import BaseCollector __all__ = ["BaseCollector"] ``` **Step 2: Create backend/app/services/collectors/base.py** ```python """ Base collector class for data collection jobs. 
""" from abc import ABC, abstractmethod from datetime import datetime from typing import Optional from sqlalchemy.orm import Session from app.models.stock import JobLog class BaseCollector(ABC): """Base class for all data collectors.""" def __init__(self, db: Session): self.db = db self.job_name = self.__class__.__name__ self.job_log: Optional[JobLog] = None def start_job(self) -> JobLog: """Create a job log entry when starting.""" self.job_log = JobLog( job_name=self.job_name, status="running", started_at=datetime.utcnow(), ) self.db.add(self.job_log) self.db.commit() return self.job_log def complete_job(self, records_count: int): """Mark job as completed.""" if self.job_log: self.job_log.status = "success" self.job_log.finished_at = datetime.utcnow() self.job_log.records_count = records_count self.db.commit() def fail_job(self, error_msg: str): """Mark job as failed.""" if self.job_log: self.job_log.status = "failed" self.job_log.finished_at = datetime.utcnow() self.job_log.error_msg = error_msg self.db.commit() @abstractmethod def collect(self) -> int: """ Perform the data collection. Returns the number of records collected. 
""" pass def run(self) -> JobLog: """Execute the collection job with logging.""" self.start_job() try: records = self.collect() self.complete_job(records) except Exception as e: self.fail_job(str(e)) raise return self.job_log ``` **Step 3: Update backend/app/services/__init__.py** ```python from app.services.collectors import BaseCollector __all__ = ["BaseCollector"] ``` **Step 4: Commit** ```bash git add backend/app/services/ git commit -m "feat: add base collector infrastructure for data collection jobs" ``` --- ## Task 2: Stock and Sector Collectors **Files:** - Create: `backend/app/services/collectors/stock_collector.py` - Create: `backend/app/services/collectors/sector_collector.py` - Update: `backend/app/services/collectors/__init__.py` **Step 1: Create backend/app/services/collectors/stock_collector.py** ```python """ Stock data collector from KRX. """ from io import BytesIO from datetime import datetime import pandas as pd import numpy as np import requests from sqlalchemy.orm import Session from sqlalchemy.dialects.postgresql import insert from app.services.collectors.base import BaseCollector from app.models.stock import Stock, StockType class StockCollector(BaseCollector): """Collects stock master data from KRX.""" GEN_OTP_URL = "http://data.krx.co.kr/comm/fileDn/GenerateOTP/generate.cmd" DOWN_URL = "http://data.krx.co.kr/comm/fileDn/download_csv/download.cmd" HEADERS = { "Referer": "http://data.krx.co.kr/contents/MDC/MDI/mdiLoader", "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36", } def __init__(self, db: Session, biz_day: str = None): super().__init__(db) self.biz_day = biz_day or self._get_latest_biz_day() def _get_latest_biz_day(self) -> str: """Get the latest business day from Naver Finance.""" from bs4 import BeautifulSoup import re url = "https://finance.naver.com/sise/sise_index.naver?code=KOSPI" response = requests.get(url) soup = BeautifulSoup(response.content, "lxml") time_elem = soup.select_one("div.ly_realtime > 
span#time") if time_elem: date_str = re.sub(r"[^0-9]", "", time_elem.text) return date_str[:8] return datetime.now().strftime("%Y%m%d") def _fetch_stock_data(self, mkt_id: str) -> pd.DataFrame: """Fetch stock data for a specific market.""" gen_otp_data = { "locale": "ko_KR", "mktId": mkt_id, "trdDd": self.biz_day, "money": "1", "csvxls_isNo": "false", "name": "fileDown", "url": "dbms/MDC/STAT/standard/MDCSTAT03901", } otp = requests.post(self.GEN_OTP_URL, data=gen_otp_data, headers=self.HEADERS) response = requests.post(self.DOWN_URL, data={"code": otp.text}, headers=self.HEADERS) return pd.read_csv(BytesIO(response.content), encoding="EUC-KR") def _fetch_ind_data(self) -> pd.DataFrame: """Fetch individual stock indicators.""" gen_otp_data = { "locale": "ko_KR", "searchType": "1", "mktId": "ALL", "trdDd": self.biz_day, "csvxls_isNo": "false", "name": "fileDown", "url": "dbms/MDC/STAT/standard/MDCSTAT03501", } otp = requests.post(self.GEN_OTP_URL, data=gen_otp_data, headers=self.HEADERS) response = requests.post(self.DOWN_URL, data={"code": otp.text}, headers=self.HEADERS) return pd.read_csv(BytesIO(response.content), encoding="EUC-KR") def _classify_stock_type(self, row: pd.Series) -> str: """Classify stock type based on name and code.""" name = row.get("종목명", "") code = row.get("종목코드", "") if "스팩" in name or "제" in name and "호" in name: return StockType.SPAC.value elif code and code[-1] != "0": return StockType.PREFERRED.value elif name.endswith("리츠"): return StockType.REIT.value else: return StockType.COMMON.value def collect(self) -> int: """Collect stock master data.""" # Fetch data from both markets kospi = self._fetch_stock_data("STK") kosdaq = self._fetch_stock_data("KSQ") krx_sector = pd.concat([kospi, kosdaq]).reset_index(drop=True) krx_sector["종목명"] = krx_sector["종목명"].str.strip() # Fetch individual indicators krx_ind = self._fetch_ind_data() krx_ind["종목명"] = krx_ind["종목명"].str.strip() # Merge data merged = pd.merge( krx_sector, krx_ind, 
on=krx_sector.columns.intersection(krx_ind.columns).tolist(), how="outer", ) merged.columns = merged.columns.str.replace(" ", "") # Process and insert records = [] for _, row in merged.iterrows(): stock_type = self._classify_stock_type(row) records.append({ "ticker": row.get("종목코드"), "name": row.get("종목명"), "market": row.get("시장구분", "KOSPI"), "close_price": row.get("종가") if pd.notna(row.get("종가")) else None, "market_cap": row.get("시가총액") if pd.notna(row.get("시가총액")) else None, "eps": row.get("EPS") if pd.notna(row.get("EPS")) else None, "forward_eps": row.get("선행EPS") if pd.notna(row.get("선행EPS")) else None, "bps": row.get("BPS") if pd.notna(row.get("BPS")) else None, "dividend_per_share": row.get("주당배당금") if pd.notna(row.get("주당배당금")) else None, "stock_type": stock_type, "base_date": datetime.strptime(self.biz_day, "%Y%m%d").date(), }) # Upsert using PostgreSQL INSERT ON CONFLICT if records: stmt = insert(Stock).values(records) stmt = stmt.on_conflict_do_update( index_elements=["ticker"], set_={ "name": stmt.excluded.name, "market": stmt.excluded.market, "close_price": stmt.excluded.close_price, "market_cap": stmt.excluded.market_cap, "eps": stmt.excluded.eps, "forward_eps": stmt.excluded.forward_eps, "bps": stmt.excluded.bps, "dividend_per_share": stmt.excluded.dividend_per_share, "stock_type": stmt.excluded.stock_type, "base_date": stmt.excluded.base_date, }, ) self.db.execute(stmt) self.db.commit() return len(records) ``` **Step 2: Create backend/app/services/collectors/sector_collector.py** ```python """ Sector data collector from WISEindex. 
""" import time from datetime import datetime import pandas as pd import requests from sqlalchemy.orm import Session from sqlalchemy.dialects.postgresql import insert from app.services.collectors.base import BaseCollector from app.models.stock import Sector class SectorCollector(BaseCollector): """Collects WICS sector classification data.""" SECTOR_CODES = ["G25", "G35", "G50", "G40", "G10", "G20", "G55", "G30", "G15", "G45"] def __init__(self, db: Session, biz_day: str = None): super().__init__(db) self.biz_day = biz_day or datetime.now().strftime("%Y%m%d") def collect(self) -> int: """Collect sector classification data.""" all_data = [] for sector_code in self.SECTOR_CODES: url = f"http://www.wiseindex.com/Index/GetIndexComponets?ceil_yn=0&dt={self.biz_day}&sec_cd={sector_code}" try: response = requests.get(url, timeout=10) data = response.json() if "list" in data: df = pd.json_normalize(data["list"]) all_data.append(df) except Exception: continue time.sleep(1) # Rate limiting if not all_data: return 0 sectors = pd.concat(all_data, axis=0) sectors = sectors[["IDX_CD", "CMP_CD", "CMP_KOR", "SEC_NM_KOR"]] records = [] base_date = datetime.strptime(self.biz_day, "%Y%m%d").date() for _, row in sectors.iterrows(): records.append({ "ticker": row["CMP_CD"], "sector_code": row["IDX_CD"], "company_name": row["CMP_KOR"], "sector_name": row["SEC_NM_KOR"], "base_date": base_date, }) if records: stmt = insert(Sector).values(records) stmt = stmt.on_conflict_do_update( index_elements=["ticker"], set_={ "sector_code": stmt.excluded.sector_code, "company_name": stmt.excluded.company_name, "sector_name": stmt.excluded.sector_name, "base_date": stmt.excluded.base_date, }, ) self.db.execute(stmt) self.db.commit() return len(records) ``` **Step 3: Update backend/app/services/collectors/__init__.py** ```python from app.services.collectors.base import BaseCollector from app.services.collectors.stock_collector import StockCollector from app.services.collectors.sector_collector import 
SectorCollector __all__ = ["BaseCollector", "StockCollector", "SectorCollector"] ``` **Step 4: Commit** ```bash git add backend/app/services/collectors/ git commit -m "feat: add stock and sector data collectors" ``` --- ## Task 3: Price Collector **Files:** - Create: `backend/app/services/collectors/price_collector.py` - Update: `backend/app/services/collectors/__init__.py` **Step 1: Create backend/app/services/collectors/price_collector.py** ```python """ Price data collector using pykrx. """ from datetime import datetime, timedelta import pandas as pd from pykrx import stock as pykrx_stock from sqlalchemy.orm import Session from sqlalchemy.dialects.postgresql import insert from app.services.collectors.base import BaseCollector from app.models.stock import Price, Stock class PriceCollector(BaseCollector): """Collects daily OHLCV price data.""" def __init__(self, db: Session, start_date: str = None, end_date: str = None): super().__init__(db) self.end_date = end_date or datetime.now().strftime("%Y%m%d") self.start_date = start_date or ( datetime.now() - timedelta(days=7) ).strftime("%Y%m%d") def collect(self) -> int: """Collect price data for all stocks.""" # Get list of tickers from database tickers = self.db.query(Stock.ticker).all() ticker_list = [t[0] for t in tickers] if not ticker_list: return 0 total_records = 0 # Fetch prices in batches for ticker in ticker_list: try: df = pykrx_stock.get_market_ohlcv( self.start_date, self.end_date, ticker ) if df.empty: continue df = df.reset_index() df.columns = ["date", "open", "high", "low", "close", "volume", "value", "change"] records = [] for _, row in df.iterrows(): records.append({ "ticker": ticker, "date": row["date"].date() if hasattr(row["date"], "date") else row["date"], "open": float(row["open"]), "high": float(row["high"]), "low": float(row["low"]), "close": float(row["close"]), "volume": int(row["volume"]), }) if records: stmt = insert(Price).values(records) stmt = stmt.on_conflict_do_update( 
index_elements=["ticker", "date"], set_={ "open": stmt.excluded.open, "high": stmt.excluded.high, "low": stmt.excluded.low, "close": stmt.excluded.close, "volume": stmt.excluded.volume, }, ) self.db.execute(stmt) total_records += len(records) except Exception: continue self.db.commit() return total_records ``` **Step 2: Update backend/app/services/collectors/__init__.py** ```python from app.services.collectors.base import BaseCollector from app.services.collectors.stock_collector import StockCollector from app.services.collectors.sector_collector import SectorCollector from app.services.collectors.price_collector import PriceCollector __all__ = ["BaseCollector", "StockCollector", "SectorCollector", "PriceCollector"] ``` **Step 3: Commit** ```bash git add backend/app/services/collectors/ git commit -m "feat: add price data collector using pykrx" ``` --- ## Task 4: Valuation Collector **Files:** - Create: `backend/app/services/collectors/valuation_collector.py` - Update: `backend/app/services/collectors/__init__.py` **Step 1: Create backend/app/services/collectors/valuation_collector.py** ```python """ Valuation data collector from KRX. """ from io import BytesIO from datetime import datetime import pandas as pd import requests from sqlalchemy.orm import Session from sqlalchemy.dialects.postgresql import insert from app.services.collectors.base import BaseCollector from app.models.stock import Valuation class ValuationCollector(BaseCollector): """Collects valuation metrics (PER, PBR, etc.) 
from KRX.""" GEN_OTP_URL = "http://data.krx.co.kr/comm/fileDn/GenerateOTP/generate.cmd" DOWN_URL = "http://data.krx.co.kr/comm/fileDn/download_csv/download.cmd" HEADERS = { "Referer": "http://data.krx.co.kr/contents/MDC/MDI/mdiLoader", "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36", } def __init__(self, db: Session, biz_day: str = None): super().__init__(db) self.biz_day = biz_day or datetime.now().strftime("%Y%m%d") def collect(self) -> int: """Collect valuation data.""" gen_otp_data = { "locale": "ko_KR", "searchType": "1", "mktId": "ALL", "trdDd": self.biz_day, "csvxls_isNo": "false", "name": "fileDown", "url": "dbms/MDC/STAT/standard/MDCSTAT03501", } otp = requests.post(self.GEN_OTP_URL, data=gen_otp_data, headers=self.HEADERS) response = requests.post(self.DOWN_URL, data={"code": otp.text}, headers=self.HEADERS) df = pd.read_csv(BytesIO(response.content), encoding="EUC-KR") df.columns = df.columns.str.replace(" ", "") base_date = datetime.strptime(self.biz_day, "%Y%m%d").date() records = [] for _, row in df.iterrows(): ticker = row.get("종목코드") if not ticker: continue records.append({ "ticker": ticker, "base_date": base_date, "per": float(row["PER"]) if pd.notna(row.get("PER")) else None, "pbr": float(row["PBR"]) if pd.notna(row.get("PBR")) else None, "psr": None, # Not available from this endpoint "pcr": None, # Not available from this endpoint "dividend_yield": float(row["배당수익률"]) if pd.notna(row.get("배당수익률")) else None, }) if records: stmt = insert(Valuation).values(records) stmt = stmt.on_conflict_do_update( index_elements=["ticker", "base_date"], set_={ "per": stmt.excluded.per, "pbr": stmt.excluded.pbr, "psr": stmt.excluded.psr, "pcr": stmt.excluded.pcr, "dividend_yield": stmt.excluded.dividend_yield, }, ) self.db.execute(stmt) self.db.commit() return len(records) ``` **Step 2: Update backend/app/services/collectors/__init__.py** ```python from app.services.collectors.base import BaseCollector from 
app.services.collectors.stock_collector import StockCollector from app.services.collectors.sector_collector import SectorCollector from app.services.collectors.price_collector import PriceCollector from app.services.collectors.valuation_collector import ValuationCollector __all__ = [ "BaseCollector", "StockCollector", "SectorCollector", "PriceCollector", "ValuationCollector", ] ``` **Step 3: Commit** ```bash git add backend/app/services/collectors/ git commit -m "feat: add valuation data collector" ``` --- ## Task 5: Admin Data Collection API **Files:** - Create: `backend/app/api/admin.py` - Update: `backend/app/api/__init__.py` - Update: `backend/app/main.py` **Step 1: Create backend/app/api/admin.py** ```python """ Admin API for data collection management. """ from typing import List from fastapi import APIRouter, Depends, HTTPException, BackgroundTasks from sqlalchemy.orm import Session from pydantic import BaseModel from app.core.database import get_db from app.api.deps import CurrentUser from app.models.stock import JobLog from app.services.collectors import ( StockCollector, SectorCollector, PriceCollector, ValuationCollector, ) router = APIRouter(prefix="/api/admin", tags=["admin"]) class JobLogResponse(BaseModel): id: int job_name: str status: str started_at: str finished_at: str | None records_count: int | None error_msg: str | None class Config: from_attributes = True class CollectResponse(BaseModel): message: str job_id: int def run_collector(collector_class, db: Session, **kwargs): """Run a collector and return job log.""" collector = collector_class(db, **kwargs) return collector.run() @router.post("/collect/stocks", response_model=CollectResponse) async def collect_stocks( current_user: CurrentUser, db: Session = Depends(get_db), biz_day: str = None, ): """Collect stock master data from KRX.""" collector = StockCollector(db, biz_day=biz_day) job_log = collector.run() return CollectResponse( message=f"Stock collection completed: {job_log.records_count} 
records", job_id=job_log.id, ) @router.post("/collect/sectors", response_model=CollectResponse) async def collect_sectors( current_user: CurrentUser, db: Session = Depends(get_db), biz_day: str = None, ): """Collect sector classification data from WISEindex.""" collector = SectorCollector(db, biz_day=biz_day) job_log = collector.run() return CollectResponse( message=f"Sector collection completed: {job_log.records_count} records", job_id=job_log.id, ) @router.post("/collect/prices", response_model=CollectResponse) async def collect_prices( current_user: CurrentUser, db: Session = Depends(get_db), start_date: str = None, end_date: str = None, ): """Collect price data using pykrx.""" collector = PriceCollector(db, start_date=start_date, end_date=end_date) job_log = collector.run() return CollectResponse( message=f"Price collection completed: {job_log.records_count} records", job_id=job_log.id, ) @router.post("/collect/valuations", response_model=CollectResponse) async def collect_valuations( current_user: CurrentUser, db: Session = Depends(get_db), biz_day: str = None, ): """Collect valuation data from KRX.""" collector = ValuationCollector(db, biz_day=biz_day) job_log = collector.run() return CollectResponse( message=f"Valuation collection completed: {job_log.records_count} records", job_id=job_log.id, ) @router.get("/collect/status", response_model=List[JobLogResponse]) async def get_collection_status( current_user: CurrentUser, db: Session = Depends(get_db), limit: int = 20, ): """Get recent job execution status.""" jobs = ( db.query(JobLog) .order_by(JobLog.started_at.desc()) .limit(limit) .all() ) return jobs ``` **Step 2: Update backend/app/api/__init__.py** ```python from app.api.auth import router as auth_router from app.api.admin import router as admin_router __all__ = ["auth_router", "admin_router"] ``` **Step 3: Update backend/app/main.py** ```python """ Galaxy-PO Backend API """ from fastapi import FastAPI from fastapi.middleware.cors import CORSMiddleware 
from app.api import auth_router, admin_router app = FastAPI( title="Galaxy-PO API", description="Quant Portfolio Management API", version="0.1.0", ) app.add_middleware( CORSMiddleware, allow_origins=["http://localhost:3000"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) # Include routers app.include_router(auth_router) app.include_router(admin_router) @app.get("/health") async def health_check(): return {"status": "healthy"} ``` **Step 4: Commit** ```bash git add backend/app/api/ backend/app/main.py git commit -m "feat: add admin API for data collection management" ``` --- ## Task 6: Frontend Data Management Page **Files:** - Create: `frontend/src/app/admin/data/page.tsx` **Step 1: Create frontend/src/app/admin/data/page.tsx** ```typescript 'use client'; import { useEffect, useState } from 'react'; import { useRouter } from 'next/navigation'; import Sidebar from '@/components/layout/Sidebar'; import Header from '@/components/layout/Header'; import { api } from '@/lib/api'; interface JobLog { id: number; job_name: string; status: string; started_at: string; finished_at: string | null; records_count: number | null; error_msg: string | null; } interface User { id: number; username: string; email: string; } const collectors = [ { key: 'stocks', label: '종목 마스터', description: 'KRX에서 종목 정보 수집' }, { key: 'sectors', label: '섹터 정보', description: 'WISEindex에서 섹터 분류 수집' }, { key: 'prices', label: '가격 데이터', description: 'pykrx로 OHLCV 데이터 수집' }, { key: 'valuations', label: '밸류 지표', description: 'KRX에서 PER/PBR 등 수집' }, ]; export default function DataManagementPage() { const router = useRouter(); const [user, setUser] = useState<User | null>(null); const [loading, setLoading] = useState(true); const [jobs, setJobs] = useState<JobLog[]>([]); const [collecting, setCollecting] = useState<string | null>(null); useEffect(() => { const init = async () => { try { const userData = await api.getCurrentUser() as User; setUser(userData); await fetchJobs(); } catch { router.push('/login'); } finally { 
setLoading(false); } }; init(); }, [router]); const fetchJobs = async () => { try { const data = await api.get<JobLog[]>('/api/admin/collect/status'); setJobs(data); } catch (err) { console.error('Failed to fetch jobs:', err); } }; const runCollector = async (key: string) => { setCollecting(key); try { await api.post(`/api/admin/collect/${key}`); await fetchJobs(); } catch (err) { console.error('Collection failed:', err); } finally { setCollecting(null); } }; const getStatusBadge = (status: string) => { const colors: Record<string, string> = { success: 'bg-green-100 text-green-800', failed: 'bg-red-100 text-red-800', running: 'bg-yellow-100 text-yellow-800', }; return colors[status] || 'bg-gray-100 text-gray-800'; }; if (loading) { return (
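      <div>Loading...</div>
    );
  }

  return (
    <div>
      {/* NOTE(review): the original JSX tags were lost in extraction. The element
          structure below is reconstructed from the surviving text and the handlers
          defined above; confirm Sidebar/Header props, layout classes and the
          button label against the intended design. */}
      <Sidebar />
      <div>
        <Header />
        <main>
          <h1>데이터 수집 관리</h1>

          <section>
            <h2>수집 작업</h2>
            <div>
              {collectors.map((col) => (
                <div key={col.key}>
                  <h3>{col.label}</h3>
                  <p>{col.description}</p>
                  <button
                    onClick={() => runCollector(col.key)}
                    disabled={collecting !== null}
                  >
                    {collecting === col.key ? '수집 중...' : '수집 실행'}
                  </button>
                </div>
              ))}
            </div>
          </section>

          <section>
            <h2>최근 작업 이력</h2>
            <table>
              <thead>
                <tr>
                  <th>작업명</th>
                  <th>상태</th>
                  <th>시작 시간</th>
                  <th>건수</th>
                  <th>에러</th>
                </tr>
              </thead>
              <tbody>
                {jobs.map((job) => (
                  <tr key={job.id}>
                    <td>{job.job_name}</td>
                    <td>
                      <span className={getStatusBadge(job.status)}>{job.status}</span>
                    </td>
                    <td>{new Date(job.started_at).toLocaleString('ko-KR')}</td>
                    <td>{job.records_count ?? '-'}</td>
                    <td>{job.error_msg || '-'}</td>
                  </tr>
                ))}
                {jobs.length === 0 && (
                  <tr>
                    <td colSpan={5}>아직 수집 이력이 없습니다.</td>
                  </tr>
                )}
              </tbody>
            </table>
          </section>
        </main>
      </div>
    </div>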
); } ``` **Step 2: Commit** ```bash git add frontend/src/app/admin/ git commit -m "feat: add data management admin page" ``` --- ## Task 7: Verify Phase 2 Integration **Step 1: Verify all files exist** Check: - backend/app/services/collectors/ (base, stock, sector, price, valuation) - backend/app/api/admin.py - frontend/src/app/admin/data/page.tsx **Step 2: Run frontend build** ```bash cd /home/zephyrdark/workspace/quant/galaxy-po/frontend npm run build ``` **Step 3: Review commit history** ```bash git log --oneline -10 ``` --- ## Summary Phase 2 완료 시 구현된 기능: - BaseCollector 클래스 (작업 로깅 포함) - StockCollector (KRX 종목 마스터) - SectorCollector (WICS 섹터 분류) - PriceCollector (pykrx OHLCV 데이터) - ValuationCollector (PER, PBR 등) - Admin API (/api/admin/collect/*) - Data Management UI 페이지 다음 Phase: 포트폴리오 관리 기능 구현