210 lines
6.2 KiB
Python

"""Financial statement data crawler (재무제표 수집)."""
import re
import time
from typing import List, Optional
import pandas as pd
import requests as rq
from bs4 import BeautifulSoup
from tqdm import tqdm
from sqlalchemy.orm import Session
from app.models.asset import Asset
from app.models.financial import FinancialStatement
def clean_fs(df: pd.DataFrame, ticker: str, frequency: str) -> pd.DataFrame:
"""
재무제표 데이터 클렌징.
Args:
df: 재무제표 DataFrame
ticker: 종목코드
frequency: 공시구분 ('Y': 연간, 'Q': 분기)
Returns:
클렌징된 DataFrame
"""
# 빈 행 제거
df = df[~df.loc[:, ~df.columns.isin(['계정'])].isna().all(axis=1)]
# 중복 계정 제거
df = df.drop_duplicates(['계정'], keep='first')
# Long 형태로 변환
df = pd.melt(df, id_vars='계정', var_name='기준일', value_name='')
# 결측치 제거
df = df[~pd.isnull(df[''])]
# 계정명 정리
df['계정'] = df['계정'].replace({'계산에 참여한 계정 펼치기': ''}, regex=True)
# 기준일 변환 (월말)
df['기준일'] = pd.to_datetime(df['기준일'], format='%Y/%m') + pd.tseries.offsets.MonthEnd()
df['종목코드'] = ticker
df['공시구분'] = frequency
return df
def get_financial_data_from_fnguide(ticker: str) -> Optional[pd.DataFrame]:
"""
FnGuide에서 재무제표 데이터 다운로드.
Args:
ticker: 종목코드
Returns:
재무제표 DataFrame (실패 시 None)
"""
try:
# URL 생성
url = f'https://comp.fnguide.com/SVO2/ASP/SVD_Finance.asp?pGB=1&gicode=A{ticker}'
# 데이터 받아오기
data = pd.read_html(url, displayed_only=False)
# 연간 데이터
data_fs_y = pd.concat([
data[0].iloc[:, ~data[0].columns.str.contains('전년동기')],
data[2],
data[4]
])
data_fs_y = data_fs_y.rename(columns={data_fs_y.columns[0]: "계정"})
# 결산년 찾기
page_data = rq.get(url, timeout=30)
page_data_html = BeautifulSoup(page_data.content, 'html.parser')
fiscal_data = page_data_html.select('div.corp_group1 > h2')
if len(fiscal_data) < 2:
print(f"종목 {ticker}: 결산년 정보 없음")
return None
fiscal_data_text = fiscal_data[1].text
fiscal_data_text = re.findall('[0-9]+', fiscal_data_text)
# 결산년에 해당하는 계정만 남기기
data_fs_y = data_fs_y.loc[:, (data_fs_y.columns == '계정') | (
data_fs_y.columns.str[-2:].isin(fiscal_data_text))]
# 클렌징
data_fs_y_clean = clean_fs(data_fs_y, ticker, 'Y')
# 분기 데이터
data_fs_q = pd.concat([
data[1].iloc[:, ~data[1].columns.str.contains('전년동기')],
data[3],
data[5]
])
data_fs_q = data_fs_q.rename(columns={data_fs_q.columns[0]: "계정"})
data_fs_q_clean = clean_fs(data_fs_q, ticker, 'Q')
# 두개 합치기
data_fs_bind = pd.concat([data_fs_y_clean, data_fs_q_clean])
return data_fs_bind
except Exception as e:
print(f"종목 {ticker} 재무제표 다운로드 오류: {e}")
return None
def process_financial_data(
db_session: Session,
tickers: Optional[List[str]] = None,
sleep_time: float = 2.0
) -> dict:
"""
재무제표 데이터 수집 및 저장.
Args:
db_session: 데이터베이스 세션
tickers: 종목코드 리스트 (None이면 전체 종목)
sleep_time: 요청 간격 (초)
Returns:
{'success': 성공 종목 수, 'failed': 실패 종목 리스트}
"""
# 종목 리스트 조회
if tickers is None:
assets = db_session.query(Asset).filter(
Asset.is_active == True,
Asset.stock_type == '보통주' # 보통주만 조회
).all()
tickers = [asset.ticker for asset in assets]
print(f"전체 {len(tickers)}개 종목 재무제표 수집 시작")
else:
print(f"{len(tickers)}개 종목 재무제표 수집 시작")
# 결과 추적
success_count = 0
error_list = []
# 전종목 재무제표 다운로드 및 저장
for ticker in tqdm(tickers):
try:
# FnGuide에서 데이터 다운로드
fs_df = get_financial_data_from_fnguide(ticker)
if fs_df is None or fs_df.empty:
error_list.append(ticker)
continue
# 데이터베이스 저장
save_financial_to_db(fs_df, db_session)
success_count += 1
except Exception as e:
print(f"종목 {ticker} 처리 오류: {e}")
error_list.append(ticker)
# 요청 간격
time.sleep(sleep_time)
print(f"\n재무제표 수집 완료: 성공 {success_count}개, 실패 {len(error_list)}")
if error_list:
print(f"실패 종목: {error_list[:10]}...") # 처음 10개만 출력
return {
'success': success_count,
'failed': error_list
}
def save_financial_to_db(fs_df: pd.DataFrame, db_session: Session):
"""
재무제표 데이터를 PostgreSQL에 저장 (UPSERT).
Args:
fs_df: 재무제표 DataFrame
db_session: 데이터베이스 세션
"""
for _, row in fs_df.iterrows():
# 기존 레코드 조회
existing = db_session.query(FinancialStatement).filter(
FinancialStatement.ticker == row['종목코드'],
FinancialStatement.account == row['계정'],
FinancialStatement.base_date == row['기준일'],
FinancialStatement.disclosure_type == row['공시구분']
).first()
if existing:
# 업데이트
existing.value = row['']
else:
# 신규 삽입
fs = FinancialStatement(
ticker=row['종목코드'],
account=row['계정'],
base_date=row['기준일'],
value=row[''],
disclosure_type=row['공시구분']
)
db_session.add(fs)
db_session.commit()