"""WICS sector data crawler (섹터 정보 수집).""" import time from typing import Optional from datetime import datetime import pandas as pd import requests as rq from tqdm import tqdm from sqlalchemy.orm import Session from app.models.asset import Asset def process_wics_data(biz_day: Optional[str] = None, db_session: Session = None) -> pd.DataFrame: """ WICS 기준 섹터 정보 수집. Args: biz_day: 영업일 (YYYYMMDD) db_session: 데이터베이스 세션 Returns: 섹터 정보 DataFrame """ if biz_day is None: from app.tasks.crawlers.krx import get_latest_biz_day2 biz_day = get_latest_biz_day2() print(f"최근 영업일: {biz_day}") # WICS 섹터 코드 sector_code = [ 'G25', # 경기소비재 'G35', # 산업재 'G50', # 유틸리티 'G40', # 금융 'G10', # 에너지 'G20', # 소재 'G55', # 커뮤니케이션서비스 'G30', # 임의소비재 'G15', # 헬스케어 'G45' # IT ] data_sector = [] print("WICS 섹터 데이터 수집 중...") for i in tqdm(sector_code): try: url = f'http://www.wiseindex.com/Index/GetIndexComponets?ceil_yn=0&dt={biz_day}&sec_cd={i}' data = rq.get(url, timeout=30).json() data_pd = pd.json_normalize(data['list']) data_sector.append(data_pd) time.sleep(2) # 요청 간격 조절 except Exception as e: print(f"섹터 {i} 수집 오류: {e}") continue if not data_sector: print("섹터 데이터 수집 실패") return pd.DataFrame() # 데이터 병합 kor_sector = pd.concat(data_sector, axis=0) kor_sector = kor_sector[['IDX_CD', 'CMP_CD', 'CMP_KOR', 'SEC_NM_KOR']] kor_sector['기준일'] = biz_day kor_sector['기준일'] = pd.to_datetime(kor_sector['기준일']) # 데이터베이스 저장 if db_session: save_sector_to_db(kor_sector, db_session) return kor_sector def save_sector_to_db(sector_df: pd.DataFrame, db_session: Session): """ 섹터 데이터를 PostgreSQL에 저장 (assets 테이블의 sector 필드 업데이트). Args: sector_df: 섹터 DataFrame db_session: 데이터베이스 세션 """ print(f"섹터 정보 업데이트 중... ({len(sector_df)}개)") updated_count = 0 for _, row in sector_df.iterrows(): # 종목코드로 Asset 조회 asset = db_session.query(Asset).filter( Asset.ticker == row['CMP_CD'] ).first() if asset: # 섹터 정보 업데이트 asset.sector = row['SEC_NM_KOR'] updated_count += 1 db_session.commit() print(f"섹터 정보 업데이트 완료 ({updated_count}개)")