"""Data collection Celery tasks.""" from celery import Task from sqlalchemy.orm import Session from app.celery_worker import celery_app from app.database import SessionLocal from app.tasks.crawlers.krx import process_ticker_data from app.tasks.crawlers.sectors import process_wics_data from app.tasks.crawlers.prices import process_price_data, update_recent_prices from app.tasks.crawlers.financial import process_financial_data class DatabaseTask(Task): """Base task with database session.""" _db: Session = None @property def db(self) -> Session: if self._db is None: self._db = SessionLocal() return self._db def after_return(self, *args, **kwargs): if self._db is not None: self._db.close() self._db = None @celery_app.task(base=DatabaseTask, bind=True, max_retries=3) def collect_ticker_data(self): """KRX 종목 데이터 수집.""" try: print("종목 데이터 수집 시작...") ticker_df = process_ticker_data(db_session=self.db) print(f"종목 데이터 수집 완료: {len(ticker_df)}개") return {'success': len(ticker_df)} except Exception as e: print(f"종목 데이터 수집 오류: {e}") raise self.retry(countdown=300, exc=e) @celery_app.task(base=DatabaseTask, bind=True, max_retries=3) def collect_price_data(self): """주가 데이터 수집 (최근 30일).""" try: print("주가 데이터 수집 시작...") result = update_recent_prices(db_session=self.db, days=30, sleep_time=0.5) print(f"주가 데이터 수집 완료: 성공 {result['success']}개") return result except Exception as e: print(f"주가 데이터 수집 오류: {e}") raise self.retry(countdown=300, exc=e) @celery_app.task(base=DatabaseTask, bind=True, max_retries=3, time_limit=7200) def collect_financial_data(self): """재무제표 데이터 수집 (시간 소요 큼).""" try: print("재무제표 데이터 수집 시작...") result = process_financial_data(db_session=self.db, sleep_time=2.0) print(f"재무제표 수집 완료: 성공 {result['success']}개") return result except Exception as e: print(f"재무제표 데이터 수집 오류: {e}") raise self.retry(countdown=300, exc=e) @celery_app.task(base=DatabaseTask, bind=True, max_retries=3) def collect_sector_data(self): """섹터 분류 데이터 수집.""" try: print("섹터 데이터 수집 시작...") sector_df = process_wics_data(db_session=self.db) print(f"섹터 데이터 수집 완료: {len(sector_df)}개") return {'success': len(sector_df)} except Exception as e: print(f"섹터 데이터 수집 오류: {e}") raise self.retry(countdown=300, exc=e) @celery_app.task(base=DatabaseTask, bind=True) def collect_all_data(self): """ 전체 데이터 수집 (통합). 순서: 1. 종목 데이터 2. 주가 데이터 3. 재무제표 데이터 4. 섹터 데이터 """ try: print("전체 데이터 수집 시작...") # 종목 데이터 collect_ticker_data.apply() # 주가 데이터 collect_price_data.apply() # 재무제표 데이터 collect_financial_data.apply() # 섹터 데이터 collect_sector_data.apply() print("전체 데이터 수집 완료") except Exception as e: print(f"전체 데이터 수집 오류: {e}") raise