"""Data collection API endpoints.""" from fastapi import APIRouter, BackgroundTasks, status from typing import Optional from app.tasks.data_collection import ( collect_ticker_data, collect_price_data, collect_financial_data, collect_sector_data, collect_all_data ) router = APIRouter() @router.post("/collect/ticker", status_code=status.HTTP_202_ACCEPTED) async def trigger_ticker_collection(background_tasks: BackgroundTasks): """ 종목 데이터 수집 트리거. Returns: 태스크 실행 메시지 """ task = collect_ticker_data.delay() return { "message": "종목 데이터 수집이 시작되었습니다", "task_id": task.id } @router.post("/collect/price", status_code=status.HTTP_202_ACCEPTED) async def trigger_price_collection(background_tasks: BackgroundTasks): """ 주가 데이터 수집 트리거 (최근 30일). Returns: 태스크 실행 메시지 """ task = collect_price_data.delay() return { "message": "주가 데이터 수집이 시작되었습니다 (최근 30일)", "task_id": task.id } @router.post("/collect/financial", status_code=status.HTTP_202_ACCEPTED) async def trigger_financial_collection(background_tasks: BackgroundTasks): """ 재무제표 데이터 수집 트리거. Warning: 재무제표 수집은 시간이 오래 걸립니다 (수 시간). Returns: 태스크 실행 메시지 """ task = collect_financial_data.delay() return { "message": "재무제표 데이터 수집이 시작되었습니다 (시간 소요 예상)", "task_id": task.id, "warning": "이 작업은 수 시간이 걸릴 수 있습니다" } @router.post("/collect/sector", status_code=status.HTTP_202_ACCEPTED) async def trigger_sector_collection(background_tasks: BackgroundTasks): """ 섹터 데이터 수집 트리거. Returns: 태스크 실행 메시지 """ task = collect_sector_data.delay() return { "message": "섹터 데이터 수집이 시작되었습니다", "task_id": task.id } @router.post("/collect/all", status_code=status.HTTP_202_ACCEPTED) async def trigger_all_data_collection(background_tasks: BackgroundTasks): """ 전체 데이터 수집 트리거. 순서: 1. 종목 데이터 2. 주가 데이터 3. 재무제표 데이터 4. 섹터 데이터 Warning: 이 작업은 매우 오래 걸립니다 (수 시간). Returns: 태스크 실행 메시지 """ task = collect_all_data.delay() return { "message": "전체 데이터 수집이 시작되었습니다", "task_id": task.id, "warning": "이 작업은 매우 오래 걸릴 수 있습니다 (수 시간)" } @router.get("/task/{task_id}") async def get_task_status(task_id: str): """ Celery 태스크 상태 조회. Args: task_id: Celery 태스크 ID Returns: 태스크 상태 정보 """ from celery.result import AsyncResult from app.celery_worker import celery_app task_result = AsyncResult(task_id, app=celery_app) return { "task_id": task_id, "status": task_result.status, "result": task_result.result if task_result.ready() else None, "traceback": str(task_result.traceback) if task_result.failed() else None } @router.get("/stats") async def get_data_stats(): """ 데이터베이스 통계 조회. Returns: 데이터 통계 """ from app.database import SessionLocal from app.models import Asset, PriceData, FinancialStatement db = SessionLocal() try: # 종목 수 total_assets = db.query(Asset).count() active_assets = db.query(Asset).filter(Asset.is_active == True).count() # 주가 데이터 수 total_prices = db.query(PriceData).count() # 재무제표 데이터 수 total_financials = db.query(FinancialStatement).count() return { "assets": { "total": total_assets, "active": active_assets }, "price_data": { "total_records": total_prices }, "financial_statements": { "total_records": total_financials } } finally: db.close()