251 lines
8.3 KiB
Python
Raw Permalink Normal View History

2026-01-31 23:30:51 +09:00
"""KRX data crawler (종목 정보 수집)."""
import re
import time
from io import BytesIO
from datetime import datetime
from typing import Optional
import numpy as np
import pandas as pd
import requests as rq
from bs4 import BeautifulSoup
from sqlalchemy.orm import Session
from app.models.asset import Asset
# KRX 다운로드 URL
GEN_OTP_URL = 'http://data.krx.co.kr/comm/fileDn/GenerateOTP/generate.cmd'
DOWN_URL = 'http://data.krx.co.kr/comm/fileDn/download_csv/download.cmd'
def get_latest_biz_day() -> str:
"""
최근 영업일 조회 (Naver 증거금).
Returns:
영업일 (YYYYMMDD 형식)
"""
try:
url = 'https://finance.naver.com/sise/sise_deposit.nhn'
data = rq.post(url, timeout=30)
data_html = BeautifulSoup(data.content, 'lxml')
parse_day = data_html.select_one('div.subtop_sise_graph2 > ul.subtop_chart_note > li > span.tah').text
biz_day = re.findall('[0-9]+', parse_day)
biz_day = ''.join(biz_day)
return biz_day
except Exception as e:
print(f"최근 영업일 조회 오류 (방법1): {e}")
return get_latest_biz_day2()
def get_latest_biz_day2() -> str:
"""
최근 영업일 조회 (Naver KOSPI, 대체 방법).
Returns:
영업일 (YYYYMMDD 형식)
"""
try:
url = 'https://finance.naver.com/sise/sise_index.naver?code=KOSPI'
data = rq.post(url, timeout=30)
data_html = BeautifulSoup(data.content, 'lxml')
parse_day = data_html.select_one('div.group_heading > div.ly_realtime > span#time').text
biz_day = re.findall('[0-9]+', parse_day)
biz_day = ''.join(biz_day)
return biz_day
except Exception as e:
print(f"최근 영업일 조회 오류 (방법2): {e}")
raise
def get_stock_data(biz_day: str, mkt_id: str) -> pd.DataFrame:
"""
KRX 업종 분류 현황 조회.
Args:
biz_day: 영업일 (YYYYMMDD)
mkt_id: 시장 구분 (STK: 코스피, KSQ: 코스닥)
Returns:
업종 분류 DataFrame
"""
gen_otp_data = {
'locale': 'ko_KR',
'mktId': mkt_id,
'trdDd': biz_day,
'money': '1',
'csvxls_isNo': 'false',
'name': 'fileDown',
'url': 'dbms/MDC/STAT/standard/MDCSTAT03901'
}
headers = {
'Referer': 'http://data.krx.co.kr/contents/MDC/MDI/mdiLoader/index.cmd?menuId=MDC0201050201',
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36'
}
otp = rq.post(url=GEN_OTP_URL, data=gen_otp_data, headers=headers, verify=False, timeout=30)
down_sector = rq.post(url=DOWN_URL, data={'code': otp.text}, headers=headers, timeout=30)
return pd.read_csv(BytesIO(down_sector.content), encoding='EUC-KR')
def get_ind_stock_data(biz_day: str) -> pd.DataFrame:
"""
KRX 개별 지표 조회.
Args:
biz_day: 영업일 (YYYYMMDD)
Returns:
개별 지표 DataFrame
"""
gen_otp_data = {
'locale': 'ko_KR',
'searchType': '1',
'mktId': 'ALL',
'trdDd': biz_day,
'csvxls_isNo': 'false',
'name': 'fileDown',
'url': 'dbms/MDC/STAT/standard/MDCSTAT03501'
}
headers = {
'Referer': 'http://data.krx.co.kr/contents/MDC/MDI/mdiLoader/index.cmd?menuId=MDC0201050201',
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36'
}
otp = rq.post(url=GEN_OTP_URL, data=gen_otp_data, headers=headers, verify=False, timeout=30)
down_ind_sector = rq.post(url=DOWN_URL, data={'code': otp.text}, headers=headers, timeout=30)
return pd.read_csv(BytesIO(down_ind_sector.content), encoding='EUC-KR')
def process_ticker_data(biz_day: Optional[str] = None, db_session: Session = None) -> pd.DataFrame:
"""
종목 데이터 수집 처리.
Args:
biz_day: 영업일 (YYYYMMDD, None이면 최근 영업일 자동 조회)
db_session: 데이터베이스 세션
Returns:
처리된 종목 DataFrame
"""
if biz_day is None:
biz_day = get_latest_biz_day2()
print(f"최근 영업일: {biz_day}")
# 1. 업종 분류 현황 (코스피, 코스닥)
print("코스피 데이터 수집 중...")
sector_stk = get_stock_data(biz_day, 'STK')
time.sleep(1)
print("코스닥 데이터 수집 중...")
sector_ksq = get_stock_data(biz_day, 'KSQ')
time.sleep(1)
# 합치기
krx_sector = pd.concat([sector_stk, sector_ksq]).reset_index(drop=True)
krx_sector['종목명'] = krx_sector['종목명'].str.strip()
krx_sector['기준일'] = biz_day
# 2. 개별 지표 조회
print("개별 지표 수집 중...")
krx_ind = get_ind_stock_data(biz_day)
krx_ind['종목명'] = krx_ind['종목명'].str.strip()
krx_ind['기준일'] = biz_day
# 3. 데이터 병합
# 종목, 개별 중 한군데만 있는 데이터 삭제 (선박펀드, 광물펀드, 해외종목 등)
diff = list(set(krx_sector['종목명']).symmetric_difference(set(krx_ind['종목명'])))
kor_ticker = pd.merge(
krx_sector,
krx_ind,
on=krx_sector.columns.intersection(krx_ind.columns).tolist(),
how='outer'
)
# 4. 종목 구분 (보통주, 우선주, 스팩, 리츠, 기타)
kor_ticker['종목구분'] = np.where(
kor_ticker['종목명'].str.contains('스팩|제[0-9]+호'),
'스팩',
np.where(
kor_ticker['종목코드'].str[-1:] != '0',
'우선주',
np.where(
kor_ticker['종목명'].str.endswith('리츠'),
'리츠',
np.where(
kor_ticker['종목명'].isin(diff),
'기타',
'보통주'
)
)
)
)
# 5. 데이터 정리
kor_ticker = kor_ticker.reset_index(drop=True)
kor_ticker.columns = kor_ticker.columns.str.replace(' ', '')
kor_ticker = kor_ticker[[
'종목코드', '종목명', '시장구분', '종가',
'시가총액', '기준일', 'EPS', '선행EPS', 'BPS', '주당배당금', '종목구분'
]]
kor_ticker = kor_ticker.replace({np.nan: None})
kor_ticker['기준일'] = pd.to_datetime(kor_ticker['기준일'])
# 6. 데이터베이스 저장
if db_session:
save_ticker_to_db(kor_ticker, db_session)
return kor_ticker
def save_ticker_to_db(ticker_df: pd.DataFrame, db_session: Session):
"""
종목 데이터를 PostgreSQL에 저장 (UPSERT).
Args:
ticker_df: 종목 DataFrame
db_session: 데이터베이스 세션
"""
print(f"데이터베이스에 {len(ticker_df)}개 종목 저장 중...")
for _, row in ticker_df.iterrows():
# 기존 레코드 조회
existing = db_session.query(Asset).filter(
Asset.ticker == row['종목코드']
).first()
if existing:
# 업데이트
existing.name = row['종목명']
existing.market = row['시장구분']
existing.last_price = row['종가'] if row['종가'] else None
existing.market_cap = row['시가총액'] if row['시가총액'] else None
existing.eps = row['EPS'] if row['EPS'] else None
existing.bps = row['BPS'] if row['BPS'] else None
existing.dividend_per_share = row['주당배당금'] if row['주당배당금'] else None
existing.stock_type = row['종목구분']
existing.base_date = row['기준일']
existing.is_active = True
else:
# 신규 삽입
asset = Asset(
ticker=row['종목코드'],
name=row['종목명'],
market=row['시장구분'],
last_price=row['종가'] if row['종가'] else None,
market_cap=row['시가총액'] if row['시가총액'] else None,
eps=row['EPS'] if row['EPS'] else None,
bps=row['BPS'] if row['BPS'] else None,
dividend_per_share=row['주당배당금'] if row['주당배당금'] else None,
stock_type=row['종목구분'],
base_date=row['기준일'],
is_active=True
)
db_session.add(asset)
db_session.commit()
print("종목 데이터 저장 완료")