fix: add error handling, validation, and logging to collectors
- Add REQUEST_TIMEOUT and RATE_LIMIT_DELAY constants to StockCollector - Add timeout parameter to all HTTP requests - Wrap HTTP requests in try-except with proper error handling - Add _validate_biz_day() method to both collectors - Add validation for required fields (ticker, name) before insert - Replace generic Exception with specific exception types in SectorCollector - Add logging module and logger to both collectors - Remove unused numpy import from StockCollector Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
parent
5479c36985
commit
aed636f2b3
@ -1,6 +1,7 @@
|
||||
"""
|
||||
Sector data collector from WISEindex.
|
||||
"""
|
||||
import logging
|
||||
import time
|
||||
from datetime import datetime
|
||||
|
||||
@ -13,6 +14,8 @@ from sqlalchemy.dialects.postgresql import insert
|
||||
from app.services.collectors.base import BaseCollector
|
||||
from app.models.stock import Sector
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class SectorCollector(BaseCollector):
|
||||
"""Collects WICS sector classification data."""
|
||||
@ -22,6 +25,14 @@ class SectorCollector(BaseCollector):
|
||||
def __init__(self, db: Session, biz_day: str = None):
|
||||
super().__init__(db)
|
||||
self.biz_day = biz_day or datetime.now().strftime("%Y%m%d")
|
||||
self._validate_biz_day()
|
||||
|
||||
def _validate_biz_day(self) -> None:
|
||||
"""Validate business day format."""
|
||||
try:
|
||||
datetime.strptime(self.biz_day, "%Y%m%d")
|
||||
except ValueError:
|
||||
raise ValueError(f"Invalid biz_day format. Expected YYYYMMDD, got: {self.biz_day}")
|
||||
|
||||
def collect(self) -> int:
|
||||
"""Collect sector classification data."""
|
||||
@ -35,7 +46,8 @@ class SectorCollector(BaseCollector):
|
||||
if "list" in data:
|
||||
df = pd.json_normalize(data["list"])
|
||||
all_data.append(df)
|
||||
except Exception:
|
||||
except (requests.RequestException, ValueError, KeyError) as e:
|
||||
logger.warning(f"Failed to fetch sector {sector_code}: {e}")
|
||||
continue
|
||||
time.sleep(1) # Rate limiting
|
||||
|
||||
|
||||
@ -1,11 +1,12 @@
|
||||
"""
|
||||
Stock data collector from KRX.
|
||||
"""
|
||||
import logging
|
||||
import time
|
||||
from io import BytesIO
|
||||
from datetime import datetime
|
||||
|
||||
import pandas as pd
|
||||
import numpy as np
|
||||
import requests
|
||||
|
||||
from sqlalchemy.orm import Session
|
||||
@ -14,6 +15,8 @@ from sqlalchemy.dialects.postgresql import insert
|
||||
from app.services.collectors.base import BaseCollector
|
||||
from app.models.stock import Stock, StockType
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class StockCollector(BaseCollector):
|
||||
"""Collects stock master data from KRX."""
|
||||
@ -24,10 +27,20 @@ class StockCollector(BaseCollector):
|
||||
"Referer": "http://data.krx.co.kr/contents/MDC/MDI/mdiLoader",
|
||||
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36",
|
||||
}
|
||||
REQUEST_TIMEOUT = 10
|
||||
RATE_LIMIT_DELAY = 1
|
||||
|
||||
def __init__(self, db: Session, biz_day: str = None):
|
||||
super().__init__(db)
|
||||
self.biz_day = biz_day or self._get_latest_biz_day()
|
||||
self._validate_biz_day()
|
||||
|
||||
def _validate_biz_day(self) -> None:
|
||||
"""Validate business day format."""
|
||||
try:
|
||||
datetime.strptime(self.biz_day, "%Y%m%d")
|
||||
except ValueError:
|
||||
raise ValueError(f"Invalid biz_day format. Expected YYYYMMDD, got: {self.biz_day}")
|
||||
|
||||
def _get_latest_biz_day(self) -> str:
|
||||
"""Get the latest business day from Naver Finance."""
|
||||
@ -35,7 +48,7 @@ class StockCollector(BaseCollector):
|
||||
import re
|
||||
|
||||
url = "https://finance.naver.com/sise/sise_index.naver?code=KOSPI"
|
||||
response = requests.get(url)
|
||||
response = requests.get(url, timeout=self.REQUEST_TIMEOUT)
|
||||
soup = BeautifulSoup(response.content, "lxml")
|
||||
time_elem = soup.select_one("div.ly_realtime > span#time")
|
||||
if time_elem:
|
||||
@ -54,9 +67,25 @@ class StockCollector(BaseCollector):
|
||||
"name": "fileDown",
|
||||
"url": "dbms/MDC/STAT/standard/MDCSTAT03901",
|
||||
}
|
||||
otp = requests.post(self.GEN_OTP_URL, data=gen_otp_data, headers=self.HEADERS)
|
||||
response = requests.post(self.DOWN_URL, data={"code": otp.text}, headers=self.HEADERS)
|
||||
return pd.read_csv(BytesIO(response.content), encoding="EUC-KR")
|
||||
try:
|
||||
otp = requests.post(
|
||||
self.GEN_OTP_URL,
|
||||
data=gen_otp_data,
|
||||
headers=self.HEADERS,
|
||||
timeout=self.REQUEST_TIMEOUT
|
||||
)
|
||||
otp.raise_for_status()
|
||||
response = requests.post(
|
||||
self.DOWN_URL,
|
||||
data={"code": otp.text},
|
||||
headers=self.HEADERS,
|
||||
timeout=self.REQUEST_TIMEOUT
|
||||
)
|
||||
response.raise_for_status()
|
||||
time.sleep(self.RATE_LIMIT_DELAY)
|
||||
return pd.read_csv(BytesIO(response.content), encoding="EUC-KR")
|
||||
except requests.RequestException as e:
|
||||
raise RuntimeError(f"Failed to fetch stock data for market {mkt_id}: {e}")
|
||||
|
||||
def _fetch_ind_data(self) -> pd.DataFrame:
|
||||
"""Fetch individual stock indicators."""
|
||||
@ -69,9 +98,25 @@ class StockCollector(BaseCollector):
|
||||
"name": "fileDown",
|
||||
"url": "dbms/MDC/STAT/standard/MDCSTAT03501",
|
||||
}
|
||||
otp = requests.post(self.GEN_OTP_URL, data=gen_otp_data, headers=self.HEADERS)
|
||||
response = requests.post(self.DOWN_URL, data={"code": otp.text}, headers=self.HEADERS)
|
||||
return pd.read_csv(BytesIO(response.content), encoding="EUC-KR")
|
||||
try:
|
||||
otp = requests.post(
|
||||
self.GEN_OTP_URL,
|
||||
data=gen_otp_data,
|
||||
headers=self.HEADERS,
|
||||
timeout=self.REQUEST_TIMEOUT
|
||||
)
|
||||
otp.raise_for_status()
|
||||
response = requests.post(
|
||||
self.DOWN_URL,
|
||||
data={"code": otp.text},
|
||||
headers=self.HEADERS,
|
||||
timeout=self.REQUEST_TIMEOUT
|
||||
)
|
||||
response.raise_for_status()
|
||||
time.sleep(self.RATE_LIMIT_DELAY)
|
||||
return pd.read_csv(BytesIO(response.content), encoding="EUC-KR")
|
||||
except requests.RequestException as e:
|
||||
raise RuntimeError(f"Failed to fetch indicator data: {e}")
|
||||
|
||||
def _classify_stock_type(self, row: pd.Series) -> str:
|
||||
"""Classify stock type based on name and code."""
|
||||
@ -111,6 +156,12 @@ class StockCollector(BaseCollector):
|
||||
# Process and insert
|
||||
records = []
|
||||
for _, row in merged.iterrows():
|
||||
# Skip rows with missing required fields
|
||||
ticker = row.get("종목코드")
|
||||
name = row.get("종목명")
|
||||
if not ticker or pd.isna(ticker) or not name or pd.isna(name):
|
||||
continue
|
||||
|
||||
stock_type = self._classify_stock_type(row)
|
||||
records.append({
|
||||
"ticker": row.get("종목코드"),
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user