210 lines
6.2 KiB
Python
210 lines
6.2 KiB
Python
|
|
"""Financial statement data crawler (재무제표 수집)."""
|
||
|
|
import re
|
||
|
|
import time
|
||
|
|
from typing import List, Optional
|
||
|
|
|
||
|
|
import pandas as pd
|
||
|
|
import requests as rq
|
||
|
|
from bs4 import BeautifulSoup
|
||
|
|
from tqdm import tqdm
|
||
|
|
from sqlalchemy.orm import Session
|
||
|
|
|
||
|
|
from app.models.asset import Asset
|
||
|
|
from app.models.financial import FinancialStatement
|
||
|
|
|
||
|
|
|
||
|
|
def clean_fs(df: pd.DataFrame, ticker: str, frequency: str) -> pd.DataFrame:
|
||
|
|
"""
|
||
|
|
재무제표 데이터 클렌징.
|
||
|
|
|
||
|
|
Args:
|
||
|
|
df: 재무제표 DataFrame
|
||
|
|
ticker: 종목코드
|
||
|
|
frequency: 공시구분 ('Y': 연간, 'Q': 분기)
|
||
|
|
|
||
|
|
Returns:
|
||
|
|
클렌징된 DataFrame
|
||
|
|
"""
|
||
|
|
# 빈 행 제거
|
||
|
|
df = df[~df.loc[:, ~df.columns.isin(['계정'])].isna().all(axis=1)]
|
||
|
|
|
||
|
|
# 중복 계정 제거
|
||
|
|
df = df.drop_duplicates(['계정'], keep='first')
|
||
|
|
|
||
|
|
# Long 형태로 변환
|
||
|
|
df = pd.melt(df, id_vars='계정', var_name='기준일', value_name='값')
|
||
|
|
|
||
|
|
# 결측치 제거
|
||
|
|
df = df[~pd.isnull(df['값'])]
|
||
|
|
|
||
|
|
# 계정명 정리
|
||
|
|
df['계정'] = df['계정'].replace({'계산에 참여한 계정 펼치기': ''}, regex=True)
|
||
|
|
|
||
|
|
# 기준일 변환 (월말)
|
||
|
|
df['기준일'] = pd.to_datetime(df['기준일'], format='%Y/%m') + pd.tseries.offsets.MonthEnd()
|
||
|
|
|
||
|
|
df['종목코드'] = ticker
|
||
|
|
df['공시구분'] = frequency
|
||
|
|
|
||
|
|
return df
|
||
|
|
|
||
|
|
|
||
|
|
def get_financial_data_from_fnguide(ticker: str) -> Optional[pd.DataFrame]:
|
||
|
|
"""
|
||
|
|
FnGuide에서 재무제표 데이터 다운로드.
|
||
|
|
|
||
|
|
Args:
|
||
|
|
ticker: 종목코드
|
||
|
|
|
||
|
|
Returns:
|
||
|
|
재무제표 DataFrame (실패 시 None)
|
||
|
|
"""
|
||
|
|
try:
|
||
|
|
# URL 생성
|
||
|
|
url = f'https://comp.fnguide.com/SVO2/ASP/SVD_Finance.asp?pGB=1&gicode=A{ticker}'
|
||
|
|
|
||
|
|
# 데이터 받아오기
|
||
|
|
data = pd.read_html(url, displayed_only=False)
|
||
|
|
|
||
|
|
# 연간 데이터
|
||
|
|
data_fs_y = pd.concat([
|
||
|
|
data[0].iloc[:, ~data[0].columns.str.contains('전년동기')],
|
||
|
|
data[2],
|
||
|
|
data[4]
|
||
|
|
])
|
||
|
|
data_fs_y = data_fs_y.rename(columns={data_fs_y.columns[0]: "계정"})
|
||
|
|
|
||
|
|
# 결산년 찾기
|
||
|
|
page_data = rq.get(url, timeout=30)
|
||
|
|
page_data_html = BeautifulSoup(page_data.content, 'html.parser')
|
||
|
|
|
||
|
|
fiscal_data = page_data_html.select('div.corp_group1 > h2')
|
||
|
|
if len(fiscal_data) < 2:
|
||
|
|
print(f"종목 {ticker}: 결산년 정보 없음")
|
||
|
|
return None
|
||
|
|
|
||
|
|
fiscal_data_text = fiscal_data[1].text
|
||
|
|
fiscal_data_text = re.findall('[0-9]+', fiscal_data_text)
|
||
|
|
|
||
|
|
# 결산년에 해당하는 계정만 남기기
|
||
|
|
data_fs_y = data_fs_y.loc[:, (data_fs_y.columns == '계정') | (
|
||
|
|
data_fs_y.columns.str[-2:].isin(fiscal_data_text))]
|
||
|
|
|
||
|
|
# 클렌징
|
||
|
|
data_fs_y_clean = clean_fs(data_fs_y, ticker, 'Y')
|
||
|
|
|
||
|
|
# 분기 데이터
|
||
|
|
data_fs_q = pd.concat([
|
||
|
|
data[1].iloc[:, ~data[1].columns.str.contains('전년동기')],
|
||
|
|
data[3],
|
||
|
|
data[5]
|
||
|
|
])
|
||
|
|
data_fs_q = data_fs_q.rename(columns={data_fs_q.columns[0]: "계정"})
|
||
|
|
|
||
|
|
data_fs_q_clean = clean_fs(data_fs_q, ticker, 'Q')
|
||
|
|
|
||
|
|
# 두개 합치기
|
||
|
|
data_fs_bind = pd.concat([data_fs_y_clean, data_fs_q_clean])
|
||
|
|
|
||
|
|
return data_fs_bind
|
||
|
|
|
||
|
|
except Exception as e:
|
||
|
|
print(f"종목 {ticker} 재무제표 다운로드 오류: {e}")
|
||
|
|
return None
|
||
|
|
|
||
|
|
|
||
|
|
def process_financial_data(
|
||
|
|
db_session: Session,
|
||
|
|
tickers: Optional[List[str]] = None,
|
||
|
|
sleep_time: float = 2.0
|
||
|
|
) -> dict:
|
||
|
|
"""
|
||
|
|
재무제표 데이터 수집 및 저장.
|
||
|
|
|
||
|
|
Args:
|
||
|
|
db_session: 데이터베이스 세션
|
||
|
|
tickers: 종목코드 리스트 (None이면 전체 종목)
|
||
|
|
sleep_time: 요청 간격 (초)
|
||
|
|
|
||
|
|
Returns:
|
||
|
|
{'success': 성공 종목 수, 'failed': 실패 종목 리스트}
|
||
|
|
"""
|
||
|
|
# 종목 리스트 조회
|
||
|
|
if tickers is None:
|
||
|
|
assets = db_session.query(Asset).filter(
|
||
|
|
Asset.is_active == True,
|
||
|
|
Asset.stock_type == '보통주' # 보통주만 조회
|
||
|
|
).all()
|
||
|
|
tickers = [asset.ticker for asset in assets]
|
||
|
|
print(f"전체 {len(tickers)}개 종목 재무제표 수집 시작")
|
||
|
|
else:
|
||
|
|
print(f"{len(tickers)}개 종목 재무제표 수집 시작")
|
||
|
|
|
||
|
|
# 결과 추적
|
||
|
|
success_count = 0
|
||
|
|
error_list = []
|
||
|
|
|
||
|
|
# 전종목 재무제표 다운로드 및 저장
|
||
|
|
for ticker in tqdm(tickers):
|
||
|
|
try:
|
||
|
|
# FnGuide에서 데이터 다운로드
|
||
|
|
fs_df = get_financial_data_from_fnguide(ticker)
|
||
|
|
|
||
|
|
if fs_df is None or fs_df.empty:
|
||
|
|
error_list.append(ticker)
|
||
|
|
continue
|
||
|
|
|
||
|
|
# 데이터베이스 저장
|
||
|
|
save_financial_to_db(fs_df, db_session)
|
||
|
|
success_count += 1
|
||
|
|
|
||
|
|
except Exception as e:
|
||
|
|
print(f"종목 {ticker} 처리 오류: {e}")
|
||
|
|
error_list.append(ticker)
|
||
|
|
|
||
|
|
# 요청 간격
|
||
|
|
time.sleep(sleep_time)
|
||
|
|
|
||
|
|
print(f"\n재무제표 수집 완료: 성공 {success_count}개, 실패 {len(error_list)}개")
|
||
|
|
if error_list:
|
||
|
|
print(f"실패 종목: {error_list[:10]}...") # 처음 10개만 출력
|
||
|
|
|
||
|
|
return {
|
||
|
|
'success': success_count,
|
||
|
|
'failed': error_list
|
||
|
|
}
|
||
|
|
|
||
|
|
|
||
|
|
def save_financial_to_db(fs_df: pd.DataFrame, db_session: Session):
|
||
|
|
"""
|
||
|
|
재무제표 데이터를 PostgreSQL에 저장 (UPSERT).
|
||
|
|
|
||
|
|
Args:
|
||
|
|
fs_df: 재무제표 DataFrame
|
||
|
|
db_session: 데이터베이스 세션
|
||
|
|
"""
|
||
|
|
for _, row in fs_df.iterrows():
|
||
|
|
# 기존 레코드 조회
|
||
|
|
existing = db_session.query(FinancialStatement).filter(
|
||
|
|
FinancialStatement.ticker == row['종목코드'],
|
||
|
|
FinancialStatement.account == row['계정'],
|
||
|
|
FinancialStatement.base_date == row['기준일'],
|
||
|
|
FinancialStatement.disclosure_type == row['공시구분']
|
||
|
|
).first()
|
||
|
|
|
||
|
|
if existing:
|
||
|
|
# 업데이트
|
||
|
|
existing.value = row['값']
|
||
|
|
else:
|
||
|
|
# 신규 삽입
|
||
|
|
fs = FinancialStatement(
|
||
|
|
ticker=row['종목코드'],
|
||
|
|
account=row['계정'],
|
||
|
|
base_date=row['기준일'],
|
||
|
|
value=row['값'],
|
||
|
|
disclosure_type=row['공시구분']
|
||
|
|
)
|
||
|
|
db_session.add(fs)
|
||
|
|
|
||
|
|
db_session.commit()
|