penti/backend/app/tasks/data_collection.py

111 lines
3.5 KiB
Python
Raw Permalink Normal View History

2026-01-31 23:30:51 +09:00
"""Data collection Celery tasks."""
from celery import Task
from sqlalchemy.orm import Session
from app.celery_worker import celery_app
from app.database import SessionLocal
from app.tasks.crawlers.krx import process_ticker_data
from app.tasks.crawlers.sectors import process_wics_data
from app.tasks.crawlers.prices import process_price_data, update_recent_prices
from app.tasks.crawlers.financial import process_financial_data
class DatabaseTask(Task):
"""Base task with database session."""
_db: Session = None
@property
def db(self) -> Session:
if self._db is None:
self._db = SessionLocal()
return self._db
def after_return(self, *args, **kwargs):
if self._db is not None:
self._db.close()
self._db = None
@celery_app.task(base=DatabaseTask, bind=True, max_retries=3)
def collect_ticker_data(self):
"""KRX 종목 데이터 수집."""
try:
print("종목 데이터 수집 시작...")
ticker_df = process_ticker_data(db_session=self.db)
print(f"종목 데이터 수집 완료: {len(ticker_df)}")
return {'success': len(ticker_df)}
except Exception as e:
print(f"종목 데이터 수집 오류: {e}")
raise self.retry(countdown=300, exc=e)
@celery_app.task(base=DatabaseTask, bind=True, max_retries=3)
def collect_price_data(self):
"""주가 데이터 수집 (최근 30일)."""
try:
print("주가 데이터 수집 시작...")
result = update_recent_prices(db_session=self.db, days=30, sleep_time=0.5)
print(f"주가 데이터 수집 완료: 성공 {result['success']}")
return result
except Exception as e:
print(f"주가 데이터 수집 오류: {e}")
raise self.retry(countdown=300, exc=e)
@celery_app.task(base=DatabaseTask, bind=True, max_retries=3, time_limit=7200)
def collect_financial_data(self):
"""재무제표 데이터 수집 (시간 소요 큼)."""
try:
print("재무제표 데이터 수집 시작...")
result = process_financial_data(db_session=self.db, sleep_time=2.0)
print(f"재무제표 수집 완료: 성공 {result['success']}")
return result
except Exception as e:
print(f"재무제표 데이터 수집 오류: {e}")
raise self.retry(countdown=300, exc=e)
@celery_app.task(base=DatabaseTask, bind=True, max_retries=3)
def collect_sector_data(self):
"""섹터 분류 데이터 수집."""
try:
print("섹터 데이터 수집 시작...")
sector_df = process_wics_data(db_session=self.db)
print(f"섹터 데이터 수집 완료: {len(sector_df)}")
return {'success': len(sector_df)}
except Exception as e:
print(f"섹터 데이터 수집 오류: {e}")
raise self.retry(countdown=300, exc=e)
@celery_app.task(base=DatabaseTask, bind=True)
def collect_all_data(self):
"""
전체 데이터 수집 (통합).
순서:
1. 종목 데이터
2. 주가 데이터
3. 재무제표 데이터
4. 섹터 데이터
"""
try:
print("전체 데이터 수집 시작...")
# 종목 데이터
collect_ticker_data.apply()
# 주가 데이터
collect_price_data.apply()
# 재무제표 데이터
collect_financial_data.apply()
# 섹터 데이터
collect_sector_data.apply()
print("전체 데이터 수집 완료")
except Exception as e:
print(f"전체 데이터 수집 오류: {e}")
raise