fix: run data collectors in background threads to prevent server blocking
The collect endpoints were defined as async def but called synchronous collector.run() directly, blocking the single uvicorn event loop for 15 minutes or more during price collection. This caused all other requests (including auth/login) to hang, making the app unusable. Backend: Run each collector in a daemon thread with its own DB session, returning HTTP 200 immediately. The collector logs status to JobLog as before, which the frontend can poll. Frontend: Auto-poll job status every 3s while any job is "running", with a visual indicator. Disable collect buttons during active jobs. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
parent
3d5e695559
commit
51fb812d57
@ -1,6 +1,12 @@
|
|||||||
"""
|
"""
|
||||||
Admin API for data collection management.
|
Admin API for data collection management.
|
||||||
|
|
||||||
|
All collection tasks run in background threads to avoid blocking the
|
||||||
|
async event loop (uvicorn single-worker). Each background thread
|
||||||
|
creates its own DB session so the request can return immediately.
|
||||||
"""
|
"""
|
||||||
|
import logging
|
||||||
|
import threading
|
||||||
from typing import List, Optional
|
from typing import List, Optional
|
||||||
from datetime import datetime
|
from datetime import datetime
|
||||||
|
|
||||||
@ -8,7 +14,7 @@ from fastapi import APIRouter, Depends, HTTPException, Query
|
|||||||
from sqlalchemy.orm import Session
|
from sqlalchemy.orm import Session
|
||||||
from pydantic import BaseModel
|
from pydantic import BaseModel
|
||||||
|
|
||||||
from app.core.database import get_db
|
from app.core.database import get_db, SessionLocal
|
||||||
from app.api.deps import CurrentUser
|
from app.api.deps import CurrentUser
|
||||||
from app.models.stock import JobLog
|
from app.models.stock import JobLog
|
||||||
from app.services.collectors import (
|
from app.services.collectors import (
|
||||||
@ -20,6 +26,8 @@ from app.services.collectors import (
|
|||||||
ETFPriceCollector,
|
ETFPriceCollector,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
router = APIRouter(prefix="/api/admin", tags=["admin"])
|
router = APIRouter(prefix="/api/admin", tags=["admin"])
|
||||||
|
|
||||||
|
|
||||||
@ -38,116 +46,90 @@ class JobLogResponse(BaseModel):
|
|||||||
|
|
||||||
class CollectResponse(BaseModel):
|
class CollectResponse(BaseModel):
|
||||||
message: str
|
message: str
|
||||||
job_id: int
|
job_id: int | None = None
|
||||||
|
|
||||||
|
|
||||||
|
def _run_collector_background(collector_cls, collector_kwargs: dict):
    """Run a collector to completion using a DB session owned by this call.

    Intended as the target of a worker thread: a fresh ``SessionLocal()``
    session is opened here rather than borrowed from a request, and it is
    always closed when the collector finishes or fails.

    Args:
        collector_cls: Collector class; instantiated as
            ``collector_cls(db, **collector_kwargs)``.
        collector_kwargs: Keyword arguments forwarded to the collector
            constructor after the session.

    Returns:
        None. Any exception raised while constructing or running the
        collector is logged and suppressed.
    """
    db = SessionLocal()
    try:
        collector = collector_cls(db, **collector_kwargs)
        collector.run()
    except Exception as e:
        # This is the outermost frame of the worker: nothing upstream can
        # handle the error, so record it with the full traceback instead of
        # only its string form.
        logger.exception(
            "Background collection %s failed: %s", collector_cls.__name__, e
        )
    finally:
        db.close()
|
||||||
|
|
||||||
|
|
||||||
|
def _start_background_collection(collector_cls, **kwargs):
    """Launch ``collector_cls`` on a daemon thread without waiting for it.

    The keyword arguments are gathered into a dict and handed, together with
    the collector class, to ``_run_collector_background`` as the thread
    target. The thread is started but never joined, and nothing is returned.
    """
    worker = threading.Thread(
        target=_run_collector_background,
        args=(collector_cls, kwargs),
    )
    # Daemon flag must be set before start(); it keeps the worker from
    # holding the interpreter open at shutdown.
    worker.daemon = True
    worker.start()
|
||||||
|
|
||||||
|
|
||||||
@router.post("/collect/stocks", response_model=CollectResponse)
|
@router.post("/collect/stocks", response_model=CollectResponse)
|
||||||
async def collect_stocks(
|
async def collect_stocks(
|
||||||
current_user: CurrentUser,
|
current_user: CurrentUser,
|
||||||
db: Session = Depends(get_db),
|
|
||||||
biz_day: Optional[str] = Query(None, pattern=r"^\d{8}$", description="Business day in YYYYMMDD format"),
|
biz_day: Optional[str] = Query(None, pattern=r"^\d{8}$", description="Business day in YYYYMMDD format"),
|
||||||
):
|
):
|
||||||
"""Collect stock master data from KRX."""
|
"""Collect stock master data from KRX (runs in background)."""
|
||||||
try:
|
_start_background_collection(StockCollector, biz_day=biz_day)
|
||||||
collector = StockCollector(db, biz_day=biz_day)
|
return CollectResponse(message="Stock collection started")
|
||||||
job_log = collector.run()
|
|
||||||
return CollectResponse(
|
|
||||||
message=f"Stock collection completed: {job_log.records_count} records",
|
|
||||||
job_id=job_log.id,
|
|
||||||
)
|
|
||||||
except Exception as e:
|
|
||||||
raise HTTPException(status_code=500, detail=str(e))
|
|
||||||
|
|
||||||
|
|
||||||
@router.post("/collect/sectors", response_model=CollectResponse)
|
@router.post("/collect/sectors", response_model=CollectResponse)
|
||||||
async def collect_sectors(
|
async def collect_sectors(
|
||||||
current_user: CurrentUser,
|
current_user: CurrentUser,
|
||||||
db: Session = Depends(get_db),
|
|
||||||
biz_day: Optional[str] = Query(None, pattern=r"^\d{8}$", description="Business day in YYYYMMDD format"),
|
biz_day: Optional[str] = Query(None, pattern=r"^\d{8}$", description="Business day in YYYYMMDD format"),
|
||||||
):
|
):
|
||||||
"""Collect sector classification data from WISEindex."""
|
"""Collect sector classification data from WISEindex (runs in background)."""
|
||||||
try:
|
_start_background_collection(SectorCollector, biz_day=biz_day)
|
||||||
collector = SectorCollector(db, biz_day=biz_day)
|
return CollectResponse(message="Sector collection started")
|
||||||
job_log = collector.run()
|
|
||||||
return CollectResponse(
|
|
||||||
message=f"Sector collection completed: {job_log.records_count} records",
|
|
||||||
job_id=job_log.id,
|
|
||||||
)
|
|
||||||
except Exception as e:
|
|
||||||
raise HTTPException(status_code=500, detail=str(e))
|
|
||||||
|
|
||||||
|
|
||||||
@router.post("/collect/prices", response_model=CollectResponse)
|
@router.post("/collect/prices", response_model=CollectResponse)
|
||||||
async def collect_prices(
|
async def collect_prices(
|
||||||
current_user: CurrentUser,
|
current_user: CurrentUser,
|
||||||
db: Session = Depends(get_db),
|
|
||||||
start_date: Optional[str] = Query(None, pattern=r"^\d{8}$", description="Start date in YYYYMMDD format"),
|
start_date: Optional[str] = Query(None, pattern=r"^\d{8}$", description="Start date in YYYYMMDD format"),
|
||||||
end_date: Optional[str] = Query(None, pattern=r"^\d{8}$", description="End date in YYYYMMDD format"),
|
end_date: Optional[str] = Query(None, pattern=r"^\d{8}$", description="End date in YYYYMMDD format"),
|
||||||
):
|
):
|
||||||
"""Collect price data using pykrx."""
|
"""Collect price data using pykrx (runs in background)."""
|
||||||
try:
|
_start_background_collection(PriceCollector, start_date=start_date, end_date=end_date)
|
||||||
collector = PriceCollector(db, start_date=start_date, end_date=end_date)
|
return CollectResponse(message="Price collection started")
|
||||||
job_log = collector.run()
|
|
||||||
return CollectResponse(
|
|
||||||
message=f"Price collection completed: {job_log.records_count} records",
|
|
||||||
job_id=job_log.id,
|
|
||||||
)
|
|
||||||
except Exception as e:
|
|
||||||
raise HTTPException(status_code=500, detail=str(e))
|
|
||||||
|
|
||||||
|
|
||||||
@router.post("/collect/valuations", response_model=CollectResponse)
|
@router.post("/collect/valuations", response_model=CollectResponse)
|
||||||
async def collect_valuations(
|
async def collect_valuations(
|
||||||
current_user: CurrentUser,
|
current_user: CurrentUser,
|
||||||
db: Session = Depends(get_db),
|
|
||||||
biz_day: Optional[str] = Query(None, pattern=r"^\d{8}$", description="Business day in YYYYMMDD format"),
|
biz_day: Optional[str] = Query(None, pattern=r"^\d{8}$", description="Business day in YYYYMMDD format"),
|
||||||
):
|
):
|
||||||
"""Collect valuation data from KRX."""
|
"""Collect valuation data from KRX (runs in background)."""
|
||||||
try:
|
_start_background_collection(ValuationCollector, biz_day=biz_day)
|
||||||
collector = ValuationCollector(db, biz_day=biz_day)
|
return CollectResponse(message="Valuation collection started")
|
||||||
job_log = collector.run()
|
|
||||||
return CollectResponse(
|
|
||||||
message=f"Valuation collection completed: {job_log.records_count} records",
|
|
||||||
job_id=job_log.id,
|
|
||||||
)
|
|
||||||
except Exception as e:
|
|
||||||
raise HTTPException(status_code=500, detail=str(e))
|
|
||||||
|
|
||||||
|
|
||||||
@router.post("/collect/etfs", response_model=CollectResponse)
|
@router.post("/collect/etfs", response_model=CollectResponse)
|
||||||
async def collect_etfs(
|
async def collect_etfs(
|
||||||
current_user: CurrentUser,
|
current_user: CurrentUser,
|
||||||
db: Session = Depends(get_db),
|
|
||||||
):
|
):
|
||||||
"""Collect ETF master data from KRX."""
|
"""Collect ETF master data from KRX (runs in background)."""
|
||||||
try:
|
_start_background_collection(ETFCollector)
|
||||||
collector = ETFCollector(db)
|
return CollectResponse(message="ETF collection started")
|
||||||
job_log = collector.run()
|
|
||||||
return CollectResponse(
|
|
||||||
message=f"ETF collection completed: {job_log.records_count} records",
|
|
||||||
job_id=job_log.id,
|
|
||||||
)
|
|
||||||
except Exception as e:
|
|
||||||
raise HTTPException(status_code=500, detail=str(e))
|
|
||||||
|
|
||||||
|
|
||||||
@router.post("/collect/etf-prices", response_model=CollectResponse)
|
@router.post("/collect/etf-prices", response_model=CollectResponse)
|
||||||
async def collect_etf_prices(
|
async def collect_etf_prices(
|
||||||
current_user: CurrentUser,
|
current_user: CurrentUser,
|
||||||
db: Session = Depends(get_db),
|
|
||||||
start_date: Optional[str] = Query(None, pattern=r"^\d{8}$", description="Start date in YYYYMMDD format"),
|
start_date: Optional[str] = Query(None, pattern=r"^\d{8}$", description="Start date in YYYYMMDD format"),
|
||||||
end_date: Optional[str] = Query(None, pattern=r"^\d{8}$", description="End date in YYYYMMDD format"),
|
end_date: Optional[str] = Query(None, pattern=r"^\d{8}$", description="End date in YYYYMMDD format"),
|
||||||
):
|
):
|
||||||
"""Collect ETF price data using pykrx."""
|
"""Collect ETF price data using pykrx (runs in background)."""
|
||||||
try:
|
_start_background_collection(ETFPriceCollector, start_date=start_date, end_date=end_date)
|
||||||
collector = ETFPriceCollector(db, start_date=start_date, end_date=end_date)
|
return CollectResponse(message="ETF price collection started")
|
||||||
job_log = collector.run()
|
|
||||||
return CollectResponse(
|
|
||||||
message=f"ETF price collection completed: {job_log.records_count} records",
|
|
||||||
job_id=job_log.id,
|
|
||||||
)
|
|
||||||
except Exception as e:
|
|
||||||
raise HTTPException(status_code=500, detail=str(e))
|
|
||||||
|
|
||||||
|
|
||||||
@router.get("/collect/status", response_model=List[JobLogResponse])
|
@router.get("/collect/status", response_model=List[JobLogResponse])
|
||||||
|
|||||||
@ -1,10 +1,11 @@
|
|||||||
'use client';
|
'use client';
|
||||||
|
|
||||||
import { useEffect, useState } from 'react';
|
import { useEffect, useState, useRef, useCallback } from 'react';
|
||||||
import { useRouter } from 'next/navigation';
|
import { useRouter } from 'next/navigation';
|
||||||
import { DashboardLayout } from '@/components/layout/dashboard-layout';
|
import { DashboardLayout } from '@/components/layout/dashboard-layout';
|
||||||
import { Card, CardContent, CardHeader, CardTitle } from '@/components/ui/card';
|
import { Card, CardContent, CardHeader, CardTitle } from '@/components/ui/card';
|
||||||
import { Button } from '@/components/ui/button';
|
import { Button } from '@/components/ui/button';
|
||||||
|
import { Skeleton } from '@/components/ui/skeleton';
|
||||||
import { api } from '@/lib/api';
|
import { api } from '@/lib/api';
|
||||||
|
|
||||||
interface JobLog {
|
interface JobLog {
|
||||||
@ -26,6 +27,8 @@ const collectors = [
|
|||||||
{ key: 'etf-prices', label: 'ETF 가격', description: 'pykrx로 ETF 가격 데이터 수집' },
|
{ key: 'etf-prices', label: 'ETF 가격', description: 'pykrx로 ETF 가격 데이터 수집' },
|
||||||
];
|
];
|
||||||
|
|
||||||
|
const POLL_INTERVAL_MS = 3000;
|
||||||
|
|
||||||
export default function DataManagementPage() {
|
export default function DataManagementPage() {
|
||||||
const router = useRouter();
|
const router = useRouter();
|
||||||
const [loading, setLoading] = useState(true);
|
const [loading, setLoading] = useState(true);
|
||||||
@ -33,12 +36,47 @@ export default function DataManagementPage() {
|
|||||||
const [collecting, setCollecting] = useState<string | null>(null);
|
const [collecting, setCollecting] = useState<string | null>(null);
|
||||||
const [error, setError] = useState<string | null>(null);
|
const [error, setError] = useState<string | null>(null);
|
||||||
const [refreshing, setRefreshing] = useState(false);
|
const [refreshing, setRefreshing] = useState(false);
|
||||||
|
const pollTimerRef = useRef<ReturnType<typeof setInterval> | null>(null);
|
||||||
|
|
||||||
|
const fetchJobs = useCallback(async () => {
|
||||||
|
try {
|
||||||
|
setError(null);
|
||||||
|
const data = await api.get<JobLog[]>('/api/admin/collect/status');
|
||||||
|
setJobs(data);
|
||||||
|
return data;
|
||||||
|
} catch (err) {
|
||||||
|
const message = err instanceof Error ? err.message : 'Failed to fetch jobs';
|
||||||
|
setError(message);
|
||||||
|
console.error('Failed to fetch jobs:', err);
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
}, []);
|
||||||
|
|
||||||
|
const stopPolling = useCallback(() => {
|
||||||
|
if (pollTimerRef.current) {
|
||||||
|
clearInterval(pollTimerRef.current);
|
||||||
|
pollTimerRef.current = null;
|
||||||
|
}
|
||||||
|
}, []);
|
||||||
|
|
||||||
|
const startPolling = useCallback(() => {
|
||||||
|
stopPolling();
|
||||||
|
pollTimerRef.current = setInterval(async () => {
|
||||||
|
const data = await fetchJobs();
|
||||||
|
if (data && !data.some((j) => j.status === 'running')) {
|
||||||
|
stopPolling();
|
||||||
|
}
|
||||||
|
}, POLL_INTERVAL_MS);
|
||||||
|
}, [fetchJobs, stopPolling]);
|
||||||
|
|
||||||
useEffect(() => {
|
useEffect(() => {
|
||||||
const init = async () => {
|
const init = async () => {
|
||||||
try {
|
try {
|
||||||
await api.getCurrentUser();
|
await api.getCurrentUser();
|
||||||
await fetchJobs();
|
const data = await fetchJobs();
|
||||||
|
if (data?.some((j) => j.status === 'running')) {
|
||||||
|
startPolling();
|
||||||
|
}
|
||||||
} catch {
|
} catch {
|
||||||
router.push('/login');
|
router.push('/login');
|
||||||
} finally {
|
} finally {
|
||||||
@ -46,19 +84,8 @@ export default function DataManagementPage() {
|
|||||||
}
|
}
|
||||||
};
|
};
|
||||||
init();
|
init();
|
||||||
}, [router]);
|
return stopPolling;
|
||||||
|
}, [router, fetchJobs, startPolling, stopPolling]);
|
||||||
const fetchJobs = async () => {
|
|
||||||
try {
|
|
||||||
setError(null);
|
|
||||||
const data = await api.get<JobLog[]>('/api/admin/collect/status');
|
|
||||||
setJobs(data);
|
|
||||||
} catch (err) {
|
|
||||||
const message = err instanceof Error ? err.message : 'Failed to fetch jobs';
|
|
||||||
setError(message);
|
|
||||||
console.error('Failed to fetch jobs:', err);
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
const runCollector = async (key: string) => {
|
const runCollector = async (key: string) => {
|
||||||
setCollecting(key);
|
setCollecting(key);
|
||||||
@ -66,6 +93,7 @@ export default function DataManagementPage() {
|
|||||||
try {
|
try {
|
||||||
await api.post(`/api/admin/collect/${key}`);
|
await api.post(`/api/admin/collect/${key}`);
|
||||||
await fetchJobs();
|
await fetchJobs();
|
||||||
|
startPolling();
|
||||||
} catch (err) {
|
} catch (err) {
|
||||||
const message = err instanceof Error ? err.message : 'Collection failed';
|
const message = err instanceof Error ? err.message : 'Collection failed';
|
||||||
setError(message);
|
setError(message);
|
||||||
@ -93,8 +121,16 @@ export default function DataManagementPage() {
|
|||||||
return colors[status] || 'bg-muted text-muted-foreground';
|
return colors[status] || 'bg-muted text-muted-foreground';
|
||||||
};
|
};
|
||||||
|
|
||||||
|
const hasRunningJobs = jobs.some((j) => j.status === 'running');
|
||||||
|
|
||||||
if (loading) {
|
if (loading) {
|
||||||
return null;
|
return (
|
||||||
|
<DashboardLayout>
|
||||||
|
<Skeleton className="h-8 w-48 mb-6" />
|
||||||
|
<Skeleton className="h-48 rounded-xl mb-6" />
|
||||||
|
<Skeleton className="h-64 rounded-xl" />
|
||||||
|
</DashboardLayout>
|
||||||
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
return (
|
return (
|
||||||
@ -122,11 +158,11 @@ export default function DataManagementPage() {
|
|||||||
<p className="text-sm text-muted-foreground mb-4">{col.description}</p>
|
<p className="text-sm text-muted-foreground mb-4">{col.description}</p>
|
||||||
<Button
|
<Button
|
||||||
onClick={() => runCollector(col.key)}
|
onClick={() => runCollector(col.key)}
|
||||||
disabled={collecting !== null}
|
disabled={collecting !== null || hasRunningJobs}
|
||||||
aria-label={`${col.label} 수집 실행`}
|
aria-label={`${col.label} 수집 실행`}
|
||||||
className="mt-auto"
|
className="mt-auto"
|
||||||
>
|
>
|
||||||
{collecting === col.key ? '수집 중...' : '실행'}
|
{collecting === col.key ? '요청 중...' : '실행'}
|
||||||
</Button>
|
</Button>
|
||||||
</div>
|
</div>
|
||||||
))}
|
))}
|
||||||
@ -136,7 +172,15 @@ export default function DataManagementPage() {
|
|||||||
|
|
||||||
<Card>
|
<Card>
|
||||||
<CardHeader className="flex flex-row items-center justify-between">
|
<CardHeader className="flex flex-row items-center justify-between">
|
||||||
<CardTitle>최근 작업 이력</CardTitle>
|
<div className="flex items-center gap-2">
|
||||||
|
<CardTitle>최근 작업 이력</CardTitle>
|
||||||
|
{hasRunningJobs && (
|
||||||
|
<span className="inline-flex items-center gap-1 px-2 py-0.5 rounded text-xs bg-yellow-100 text-yellow-800 dark:bg-yellow-900 dark:text-yellow-200">
|
||||||
|
<span className="inline-block w-2 h-2 rounded-full bg-yellow-500 animate-pulse" />
|
||||||
|
실행 중
|
||||||
|
</span>
|
||||||
|
)}
|
||||||
|
</div>
|
||||||
<button
|
<button
|
||||||
onClick={handleRefresh}
|
onClick={handleRefresh}
|
||||||
disabled={refreshing}
|
disabled={refreshing}
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user