diff --git a/backend/app/api/admin.py b/backend/app/api/admin.py index 40c47c7..301ac6b 100644 --- a/backend/app/api/admin.py +++ b/backend/app/api/admin.py @@ -1,6 +1,12 @@ """ Admin API for data collection management. + +All collection tasks run in background threads to avoid blocking the +async event loop (uvicorn single-worker). Each background thread +creates its own DB session so the request can return immediately. """ +import logging +import threading from typing import List, Optional from datetime import datetime @@ -8,7 +14,7 @@ from fastapi import APIRouter, Depends, HTTPException, Query from sqlalchemy.orm import Session from pydantic import BaseModel -from app.core.database import get_db +from app.core.database import get_db, SessionLocal from app.api.deps import CurrentUser from app.models.stock import JobLog from app.services.collectors import ( @@ -20,6 +26,8 @@ from app.services.collectors import ( ETFPriceCollector, ) +logger = logging.getLogger(__name__) + router = APIRouter(prefix="/api/admin", tags=["admin"]) @@ -38,116 +46,90 @@ class JobLogResponse(BaseModel): class CollectResponse(BaseModel): message: str - job_id: int + job_id: int | None = None + + +def _run_collector_background(collector_cls, collector_kwargs: dict): + """Run a collector in a background thread with its own DB session.""" + db = SessionLocal() + try: + collector = collector_cls(db, **collector_kwargs) + collector.run() + except Exception as e: + logger.error("Background collection %s failed: %s", collector_cls.__name__, e) + finally: + db.close() + + +def _start_background_collection(collector_cls, **kwargs): + """Start a collector in a daemon thread and return immediately.""" + thread = threading.Thread( + target=_run_collector_background, + args=(collector_cls, kwargs), + daemon=True, + ) + thread.start() @router.post("/collect/stocks", response_model=CollectResponse) async def collect_stocks( current_user: CurrentUser, - db: Session = Depends(get_db), biz_day: 
Optional[str] = Query(None, pattern=r"^\d{8}$", description="Business day in YYYYMMDD format"), ): - """Collect stock master data from KRX.""" - try: - collector = StockCollector(db, biz_day=biz_day) - job_log = collector.run() - return CollectResponse( - message=f"Stock collection completed: {job_log.records_count} records", - job_id=job_log.id, - ) - except Exception as e: - raise HTTPException(status_code=500, detail=str(e)) + """Collect stock master data from KRX (runs in background).""" + _start_background_collection(StockCollector, biz_day=biz_day) + return CollectResponse(message="Stock collection started") @router.post("/collect/sectors", response_model=CollectResponse) async def collect_sectors( current_user: CurrentUser, - db: Session = Depends(get_db), biz_day: Optional[str] = Query(None, pattern=r"^\d{8}$", description="Business day in YYYYMMDD format"), ): - """Collect sector classification data from WISEindex.""" - try: - collector = SectorCollector(db, biz_day=biz_day) - job_log = collector.run() - return CollectResponse( - message=f"Sector collection completed: {job_log.records_count} records", - job_id=job_log.id, - ) - except Exception as e: - raise HTTPException(status_code=500, detail=str(e)) + """Collect sector classification data from WISEindex (runs in background).""" + _start_background_collection(SectorCollector, biz_day=biz_day) + return CollectResponse(message="Sector collection started") @router.post("/collect/prices", response_model=CollectResponse) async def collect_prices( current_user: CurrentUser, - db: Session = Depends(get_db), start_date: Optional[str] = Query(None, pattern=r"^\d{8}$", description="Start date in YYYYMMDD format"), end_date: Optional[str] = Query(None, pattern=r"^\d{8}$", description="End date in YYYYMMDD format"), ): - """Collect price data using pykrx.""" - try: - collector = PriceCollector(db, start_date=start_date, end_date=end_date) - job_log = collector.run() - return CollectResponse( - message=f"Price 
collection completed: {job_log.records_count} records", - job_id=job_log.id, - ) - except Exception as e: - raise HTTPException(status_code=500, detail=str(e)) + """Collect price data using pykrx (runs in background).""" + _start_background_collection(PriceCollector, start_date=start_date, end_date=end_date) + return CollectResponse(message="Price collection started") @router.post("/collect/valuations", response_model=CollectResponse) async def collect_valuations( current_user: CurrentUser, - db: Session = Depends(get_db), biz_day: Optional[str] = Query(None, pattern=r"^\d{8}$", description="Business day in YYYYMMDD format"), ): - """Collect valuation data from KRX.""" - try: - collector = ValuationCollector(db, biz_day=biz_day) - job_log = collector.run() - return CollectResponse( - message=f"Valuation collection completed: {job_log.records_count} records", - job_id=job_log.id, - ) - except Exception as e: - raise HTTPException(status_code=500, detail=str(e)) + """Collect valuation data from KRX (runs in background).""" + _start_background_collection(ValuationCollector, biz_day=biz_day) + return CollectResponse(message="Valuation collection started") @router.post("/collect/etfs", response_model=CollectResponse) async def collect_etfs( current_user: CurrentUser, - db: Session = Depends(get_db), ): - """Collect ETF master data from KRX.""" - try: - collector = ETFCollector(db) - job_log = collector.run() - return CollectResponse( - message=f"ETF collection completed: {job_log.records_count} records", - job_id=job_log.id, - ) - except Exception as e: - raise HTTPException(status_code=500, detail=str(e)) + """Collect ETF master data from KRX (runs in background).""" + _start_background_collection(ETFCollector) + return CollectResponse(message="ETF collection started") @router.post("/collect/etf-prices", response_model=CollectResponse) async def collect_etf_prices( current_user: CurrentUser, - db: Session = Depends(get_db), start_date: Optional[str] = Query(None, 
pattern=r"^\d{8}$", description="Start date in YYYYMMDD format"), end_date: Optional[str] = Query(None, pattern=r"^\d{8}$", description="End date in YYYYMMDD format"), ): - """Collect ETF price data using pykrx.""" - try: - collector = ETFPriceCollector(db, start_date=start_date, end_date=end_date) - job_log = collector.run() - return CollectResponse( - message=f"ETF price collection completed: {job_log.records_count} records", - job_id=job_log.id, - ) - except Exception as e: - raise HTTPException(status_code=500, detail=str(e)) + """Collect ETF price data using pykrx (runs in background).""" + _start_background_collection(ETFPriceCollector, start_date=start_date, end_date=end_date) + return CollectResponse(message="ETF price collection started") @router.get("/collect/status", response_model=List[JobLogResponse]) diff --git a/frontend/src/app/admin/data/page.tsx b/frontend/src/app/admin/data/page.tsx index 2977cc7..3b87688 100644 --- a/frontend/src/app/admin/data/page.tsx +++ b/frontend/src/app/admin/data/page.tsx @@ -1,10 +1,11 @@ 'use client'; -import { useEffect, useState } from 'react'; +import { useEffect, useState, useRef, useCallback } from 'react'; import { useRouter } from 'next/navigation'; import { DashboardLayout } from '@/components/layout/dashboard-layout'; import { Card, CardContent, CardHeader, CardTitle } from '@/components/ui/card'; import { Button } from '@/components/ui/button'; +import { Skeleton } from '@/components/ui/skeleton'; import { api } from '@/lib/api'; interface JobLog { @@ -26,6 +27,8 @@ const collectors = [ { key: 'etf-prices', label: 'ETF 가격', description: 'pykrx로 ETF 가격 데이터 수집' }, ]; +const POLL_INTERVAL_MS = 3000; + export default function DataManagementPage() { const router = useRouter(); const [loading, setLoading] = useState(true); @@ -33,12 +36,47 @@ export default function DataManagementPage() { const [collecting, setCollecting] = useState(null); const [error, setError] = useState(null); const [refreshing, setRefreshing] = 
useState(false); + const pollTimerRef = useRef<ReturnType<typeof setInterval> | null>(null); + + const fetchJobs = useCallback(async () => { + try { + setError(null); + const data = await api.get('/api/admin/collect/status'); + setJobs(data); + return data; + } catch (err) { + const message = err instanceof Error ? err.message : 'Failed to fetch jobs'; + setError(message); + console.error('Failed to fetch jobs:', err); + return null; + } + }, []); + + const stopPolling = useCallback(() => { + if (pollTimerRef.current) { + clearInterval(pollTimerRef.current); + pollTimerRef.current = null; + } + }, []); + + const startPolling = useCallback(() => { + stopPolling(); + pollTimerRef.current = setInterval(async () => { + const data = await fetchJobs(); + if (data && !data.some((j) => j.status === 'running')) { + stopPolling(); + } + }, POLL_INTERVAL_MS); + }, [fetchJobs, stopPolling]); useEffect(() => { const init = async () => { try { await api.getCurrentUser(); - await fetchJobs(); + const data = await fetchJobs(); + if (data?.some((j) => j.status === 'running')) { + startPolling(); + } } catch { router.push('/login'); } finally { @@ -46,19 +84,8 @@ export default function DataManagementPage() { } }; init(); - }, [router]); - - const fetchJobs = async () => { - try { - setError(null); - const data = await api.get('/api/admin/collect/status'); - setJobs(data); - } catch (err) { - const message = err instanceof Error ? err.message : 'Failed to fetch jobs'; - setError(message); - console.error('Failed to fetch jobs:', err); - } - }; + return stopPolling; + }, [router, fetchJobs, startPolling, stopPolling]); const runCollector = async (key: string) => { setCollecting(key); @@ -66,6 +93,7 @@ export default function DataManagementPage() { try { await api.post(`/api/admin/collect/${key}`); await fetchJobs(); + startPolling(); } catch (err) { const message = err instanceof Error ? 
err.message : 'Collection failed'; setError(message); @@ -93,8 +121,16 @@ export default function DataManagementPage() { return colors[status] || 'bg-muted text-muted-foreground'; }; + const hasRunningJobs = jobs.some((j) => j.status === 'running'); + if (loading) { - return null; + return ( + + + + + + ); } return ( @@ -122,11 +158,11 @@ export default function DataManagementPage() {

{col.description}

))} @@ -136,7 +172,15 @@ export default function DataManagementPage() { - 최근 작업 이력 +
+ 최근 작업 이력 + {hasRunningJobs && ( + + + 실행 중 + + )} +