Compare commits
5 Commits
c8bb675ba4
...
98d8c1115e
| Author | SHA1 | Date | |
|---|---|---|---|
| 98d8c1115e | |||
| 9b4d678995 | |||
| f13be37470 | |||
| 20240fdb4d | |||
| cc7ab311ed |
@ -25,6 +25,7 @@ from app.services.collectors import (
|
||||
ETFCollector,
|
||||
ETFPriceCollector,
|
||||
)
|
||||
from jobs.collection_job import run_backfill
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@ -71,6 +72,24 @@ def _start_background_collection(collector_cls, **kwargs):
|
||||
thread.start()
|
||||
|
||||
|
||||
def _run_backfill_background(start_year: int):
    """Run backfill synchronously, logging any failure instead of raising.

    Intended as a daemon-thread target, where an uncaught exception would
    otherwise disappear silently.

    Args:
        start_year: First year of historical data to collect.
    """
    try:
        run_backfill(start_year=start_year)
    except Exception:
        # logger.exception records the full traceback (logger.error does not),
        # which is essential for diagnosing failures in a background thread.
        logger.exception("Background backfill failed")
|
||||
|
||||
|
||||
def run_backfill_background(start_year: int):
    """Start backfill in a named daemon thread.

    Args:
        start_year: First year of historical data to collect.

    Returns:
        The started ``threading.Thread`` so callers (e.g. tests) may
        ``join()`` it. Existing callers that ignore the return value
        are unaffected.
    """
    thread = threading.Thread(
        target=_run_backfill_background,
        args=(start_year,),
        daemon=True,  # must not block interpreter shutdown
        name=f"backfill-{start_year}",  # identifiable in thread dumps
    )
    thread.start()
    return thread
|
||||
|
||||
|
||||
@router.post("/collect/stocks", response_model=CollectResponse)
|
||||
async def collect_stocks(
|
||||
current_user: CurrentUser,
|
||||
@ -132,6 +151,16 @@ async def collect_etf_prices(
|
||||
return CollectResponse(message="ETF price collection started")
|
||||
|
||||
|
||||
@router.post("/collect/backfill", response_model=CollectResponse)
async def collect_backfill(
    # Auth dependency: endpoint is reachable only with valid credentials.
    current_user: CurrentUser,
    # NOTE(review): le=2026 is a hard-coded upper bound that will reject
    # valid years from 2027 on — consider deriving it from the current year.
    start_year: int = Query(2000, ge=1990, le=2026, description="Start year for backfill"),
):
    """Backfill historical price data from start_year to today (runs in background)."""
    # Fire-and-forget: the work runs in a daemon thread, so this request
    # returns immediately; progress is visible only in the logs.
    run_backfill_background(start_year)
    return CollectResponse(message=f"Backfill started from {start_year}")
|
||||
|
||||
|
||||
@router.get("/collect/status", response_model=List[JobLogResponse])
|
||||
async def get_collection_status(
|
||||
current_user: CurrentUser,
|
||||
|
||||
@ -2,5 +2,9 @@
|
||||
Background jobs module.
|
||||
"""
|
||||
from jobs.scheduler import scheduler, start_scheduler, stop_scheduler
|
||||
from jobs.collection_job import run_daily_collection, run_backfill
|
||||
|
||||
__all__ = ["scheduler", "start_scheduler", "stop_scheduler"]
|
||||
__all__ = [
|
||||
"scheduler", "start_scheduler", "stop_scheduler",
|
||||
"run_daily_collection", "run_backfill",
|
||||
]
|
||||
|
||||
150
backend/jobs/collection_job.py
Normal file
150
backend/jobs/collection_job.py
Normal file
@ -0,0 +1,150 @@
|
||||
"""
|
||||
Data collection orchestration jobs.
|
||||
"""
|
||||
import logging
|
||||
from datetime import date, timedelta
|
||||
|
||||
from sqlalchemy import func
|
||||
|
||||
from app.core.database import SessionLocal
|
||||
from app.models.stock import Price, ETFPrice
|
||||
from app.services.collectors import (
|
||||
StockCollector,
|
||||
SectorCollector,
|
||||
PriceCollector,
|
||||
ValuationCollector,
|
||||
ETFCollector,
|
||||
ETFPriceCollector,
|
||||
)
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def _get_daily_collectors():
    """
    Return collectors in dependency order: master data first, then derived data.

    Built at call time so that module-level names can be patched in tests.
    """
    # (label, class) pairs — labels are fixed literals so result keys stay
    # stable even when the collector classes are patched in tests.
    pipeline = (
        ("StockCollector", StockCollector),
        ("SectorCollector", SectorCollector),
        ("PriceCollector", PriceCollector),
        ("ValuationCollector", ValuationCollector),
        ("ETFCollector", ETFCollector),
        ("ETFPriceCollector", ETFPriceCollector),
    )
    # Third element is the constructor-kwargs dict, currently empty for all.
    return [(label, cls, {}) for label, cls in pipeline]
|
||||
|
||||
|
||||
def run_daily_collection():
    """
    Run all data collectors in dependency order.

    Each collector gets its own DB session. If one fails, the rest continue.
    Designed to be called by APScheduler at 18:00 Mon-Fri.

    Returns:
        dict mapping collector name to "success" or "failed: <error>".
    """
    logger.info("Starting daily data collection")
    results = {}

    for name, collector_cls, kwargs in _get_daily_collectors():
        # Fresh session per collector: one failure cannot poison the
        # session used by subsequent collectors.
        db = SessionLocal()
        try:
            collector = collector_cls(db, **kwargs)
            collector.run()
            results[name] = "success"
            # Lazy %-style args avoid formatting cost when the level is off.
            logger.info("%s completed successfully", name)
        except Exception as e:
            # Record and continue — a single collector failure must not
            # abort the whole daily pipeline. logger.exception keeps the
            # traceback for diagnosis.
            results[name] = f"failed: {e}"
            logger.exception("%s failed", name)
        finally:
            db.close()

    logger.info("Daily collection finished: %s", results)
    return results
|
||||
|
||||
|
||||
def _generate_yearly_chunks(start_year: int, end_date: date) -> list[tuple[str, str]]:
|
||||
"""Generate (start_date, end_date) pairs in YYYYMMDD format, one per year."""
|
||||
chunks = []
|
||||
current_start = date(start_year, 1, 1)
|
||||
|
||||
while current_start < end_date:
|
||||
current_end = date(current_start.year, 12, 31)
|
||||
if current_end > end_date:
|
||||
current_end = end_date
|
||||
chunks.append((
|
||||
current_start.strftime("%Y%m%d"),
|
||||
current_end.strftime("%Y%m%d"),
|
||||
))
|
||||
current_start = date(current_start.year + 1, 1, 1)
|
||||
|
||||
return chunks
|
||||
|
||||
|
||||
def run_backfill(start_year: int = 2000):
    """
    Collect historical price data from start_year to today.

    Checks the earliest existing data in DB and only collects
    missing periods. Splits into yearly chunks to avoid overloading pykrx.

    For each target table:
      1. Backward fill: yearly chunks from Jan 1 of ``start_year`` up to
         the day before the earliest stored row (or up to today when the
         table is empty).
      2. Forward fill: one chunk from the day after the latest stored row
         up to today — only when data already exists.

    Chunk failures are logged and skipped, so one bad year cannot abort
    the rest of the backfill.
    """
    logger.info(f"Starting backfill from {start_year}")
    today = date.today()

    db = SessionLocal()
    try:
        # Determine what needs backfilling
        # (label, collector class, ORM date column used for min/max probes)
        backfill_targets = [
            ("Price", PriceCollector, Price.date),
            ("ETFPrice", ETFPriceCollector, ETFPrice.date),
        ]

        for name, collector_cls, date_col in backfill_targets:
            # Find earliest existing data
            earliest = db.query(func.min(date_col)).scalar()

            if earliest is None:
                # No data at all - collect everything
                backfill_end = today
            else:
                # Data exists - collect from start_year to day before earliest
                backfill_end = earliest - timedelta(days=1)

            if date(start_year, 1, 1) >= backfill_end:
                # Everything from start_year onward is already present.
                logger.info(f"{name}: no backfill needed (data exists from {earliest})")
                continue

            chunks = _generate_yearly_chunks(start_year, backfill_end)
            logger.info(f"{name}: backfilling {len(chunks)} yearly chunks from {start_year} to {backfill_end}")

            for start_dt, end_dt in chunks:
                # Fresh session per chunk: a failed chunk cannot poison the
                # session used by later chunks.
                chunk_db = SessionLocal()
                try:
                    collector = collector_cls(chunk_db, start_date=start_dt, end_date=end_dt)
                    collector.run()
                    logger.info(f"{name}: chunk {start_dt}-{end_dt} completed")
                except Exception as e:
                    logger.error(f"{name}: chunk {start_dt}-{end_dt} failed: {e}")
                finally:
                    chunk_db.close()

            # Also fill gap between latest data and today (forward fill)
            if earliest is not None:
                latest = db.query(func.max(date_col)).scalar()
                if latest and latest < today:
                    gap_start = (latest + timedelta(days=1)).strftime("%Y%m%d")
                    gap_end = today.strftime("%Y%m%d")
                    gap_db = SessionLocal()
                    try:
                        collector = collector_cls(gap_db, start_date=gap_start, end_date=gap_end)
                        collector.run()
                        logger.info(f"{name}: forward fill {gap_start}-{gap_end} completed")
                    except Exception as e:
                        logger.error(f"{name}: forward fill failed: {e}")
                    finally:
                        gap_db.close()

    finally:
        db.close()

    logger.info("Backfill completed")
|
||||
@ -6,6 +6,7 @@ from apscheduler.schedulers.background import BackgroundScheduler
|
||||
from apscheduler.triggers.cron import CronTrigger
|
||||
|
||||
from jobs.snapshot_job import create_daily_snapshots
|
||||
from jobs.collection_job import run_daily_collection
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@ -15,7 +16,21 @@ scheduler = BackgroundScheduler()
|
||||
|
||||
def configure_jobs():
|
||||
"""Configure scheduled jobs."""
|
||||
# Daily snapshot at 18:30 (after market close)
|
||||
# Daily data collection at 18:00 (after market close, before snapshot)
|
||||
scheduler.add_job(
|
||||
run_daily_collection,
|
||||
trigger=CronTrigger(
|
||||
hour=18,
|
||||
minute=0,
|
||||
day_of_week='mon-fri',
|
||||
),
|
||||
id='daily_collection',
|
||||
name='Collect daily market data',
|
||||
replace_existing=True,
|
||||
)
|
||||
logger.info("Configured daily_collection job at 18:00")
|
||||
|
||||
# Daily snapshot at 18:30 (after data collection completes)
|
||||
scheduler.add_job(
|
||||
create_daily_snapshots,
|
||||
trigger=CronTrigger(
|
||||
@ -27,7 +42,7 @@ def configure_jobs():
|
||||
name='Create daily portfolio snapshots',
|
||||
replace_existing=True,
|
||||
)
|
||||
logger.info("Configured daily_snapshots job")
|
||||
logger.info("Configured daily_snapshots job at 18:30")
|
||||
|
||||
|
||||
def start_scheduler():
|
||||
|
||||
177
backend/tests/e2e/test_collection_job.py
Normal file
177
backend/tests/e2e/test_collection_job.py
Normal file
@ -0,0 +1,177 @@
|
||||
"""
|
||||
Tests for collection job orchestration.
|
||||
"""
|
||||
from unittest.mock import patch, MagicMock
|
||||
|
||||
from jobs.collection_job import run_daily_collection
|
||||
|
||||
|
||||
def test_run_daily_collection_calls_collectors_in_order():
    """Daily collection should run all collectors in dependency order."""
    call_order = []

    def tracking_collector(label):
        # Mock collector class whose instance's run() records the label.
        cls = MagicMock()
        cls.return_value.run.side_effect = lambda: call_order.append(label)
        return cls

    with patch("jobs.collection_job.SessionLocal"), \
            patch("jobs.collection_job.StockCollector", tracking_collector("stock")), \
            patch("jobs.collection_job.SectorCollector", tracking_collector("sector")), \
            patch("jobs.collection_job.PriceCollector", tracking_collector("price")), \
            patch("jobs.collection_job.ValuationCollector", tracking_collector("valuation")), \
            patch("jobs.collection_job.ETFCollector", tracking_collector("etf")), \
            patch("jobs.collection_job.ETFPriceCollector", tracking_collector("etf_price")):
        run_daily_collection()

    assert call_order == ["stock", "sector", "price", "valuation", "etf", "etf_price"]
|
||||
|
||||
|
||||
def test_run_daily_collection_continues_on_failure():
    """If one collector fails, the rest should still run."""
    call_order = []

    def tracking_collector(label, should_fail=False):
        # Mock collector class; its instance's run() either raises or
        # records the label, depending on should_fail.
        cls = MagicMock()

        def run_effect():
            if should_fail:
                raise RuntimeError(f"{label} failed")
            call_order.append(label)

        cls.return_value.run.side_effect = run_effect
        return cls

    with patch("jobs.collection_job.SessionLocal"), \
            patch("jobs.collection_job.StockCollector", tracking_collector("stock", should_fail=True)), \
            patch("jobs.collection_job.SectorCollector", tracking_collector("sector")), \
            patch("jobs.collection_job.PriceCollector", tracking_collector("price")), \
            patch("jobs.collection_job.ValuationCollector", tracking_collector("valuation")), \
            patch("jobs.collection_job.ETFCollector", tracking_collector("etf")), \
            patch("jobs.collection_job.ETFPriceCollector", tracking_collector("etf_price")):
        run_daily_collection()

    # stock failed, but rest should continue
    assert call_order == ["sector", "price", "valuation", "etf", "etf_price"]
|
||||
|
||||
|
||||
from jobs.collection_job import run_backfill
|
||||
|
||||
|
||||
def test_run_backfill_generates_yearly_chunks():
    """Backfill should split date range into yearly chunks."""
    collected_ranges = []

    def range_recorder(label):
        # Mock collector class that records the (label, start, end) of every
        # instantiation; the returned instance is an inert MagicMock.
        cls = MagicMock()

        def record(db, start_date=None, end_date=None):
            collected_ranges.append((label, start_date, end_date))
            return MagicMock()

        cls.side_effect = record
        return cls

    with patch("jobs.collection_job.SessionLocal") as session_factory, \
            patch("jobs.collection_job.PriceCollector", range_recorder("price")), \
            patch("jobs.collection_job.ETFPriceCollector", range_recorder("etf_price")):
        fake_db = MagicMock()
        session_factory.return_value = fake_db
        # Simulate no existing data (min date returns None)
        fake_db.query.return_value.scalar.return_value = None

        run_backfill(start_year=2023)

    # Should generate chunks: 2023, 2024, 2025, 2026 (partial) for both price and etf_price
    price_ranges = [(s, e) for label, s, e in collected_ranges if label == "price"]
    assert len(price_ranges) >= 3  # At least 2023, 2024, 2025
    assert price_ranges[0][0] == "20230101"  # First chunk starts at start_year
|
||||
|
||||
|
||||
def test_run_backfill_with_existing_data_only_fills_gaps():
    """Backfill should only collect before earliest and after latest existing data."""
    collected_ranges = []

    def make_price_collector(name):
        # Mock collector class; records every (name, start, end) it is
        # constructed with and returns an inert instance.
        mock_cls = MagicMock()
        def capture_init(db, start_date=None, end_date=None):
            instance = MagicMock()
            collected_ranges.append((name, start_date, end_date))
            return instance
        mock_cls.side_effect = capture_init
        return mock_cls

    from datetime import date

    with patch("jobs.collection_job.SessionLocal") as mock_session_local, \
            patch("jobs.collection_job.PriceCollector", make_price_collector("price")), \
            patch("jobs.collection_job.ETFPriceCollector", make_price_collector("etf_price")):
        mock_db = MagicMock()
        mock_session_local.return_value = mock_db

        # Simulate: data exists from 2024-06-01 to 2024-12-31
        # WARNING: this stub is coupled to the exact order in which
        # run_backfill issues its min/max queries — changing that order
        # in production code will silently invalidate this scenario.
        call_count = [0]
        def scalar_side_effect():
            call_count[0] += 1
            # func.min returns earliest date, func.max returns latest date
            # Calls alternate: min for Price, (then max for Price forward fill),
            # min for ETFPrice, (then max for ETFPrice forward fill)
            if call_count[0] == 1:  # min(Price.date)
                return date(2024, 6, 1)
            elif call_count[0] == 2:  # max(Price.date) for forward fill
                return date(2024, 12, 31)
            elif call_count[0] == 3:  # min(ETFPrice.date)
                return date(2024, 6, 1)
            elif call_count[0] == 4:  # max(ETFPrice.date) for forward fill
                return date(2024, 12, 31)
            return None

        mock_db.query.return_value.scalar.side_effect = scalar_side_effect
        run_backfill(start_year=2023)

    # Price backfill: should collect 2023-01-01 to 2024-05-31 (before earliest)
    price_ranges = [(s, e) for name, s, e in collected_ranges if name == "price"]
    assert len(price_ranges) >= 2  # At least backward chunks + forward fill
    assert price_ranges[0][0] == "20230101"
    # Last backward chunk should end at or before 2024-05-31
    # (string compare is safe here: YYYYMMDD sorts lexicographically)
    backward_chunks = [r for r in price_ranges if r[1] <= "20240531"]
    assert len(backward_chunks) >= 1
|
||||
|
||||
|
||||
def test_scheduler_has_daily_collection_job():
    """Scheduler should register a daily_collection job at 18:00."""
    from jobs.scheduler import configure_jobs
    from apscheduler.schedulers.background import BackgroundScheduler

    test_scheduler = BackgroundScheduler()

    # Swap the module-level scheduler for a throwaway one so configure_jobs
    # registers against it instead of the real process scheduler.
    with patch("jobs.scheduler.scheduler", test_scheduler):
        configure_jobs()

    registered = {job.id: job for job in test_scheduler.get_jobs()}
    assert "daily_collection" in registered

    # CronTrigger exposes no simple hour accessor; its string form is
    # sufficient to confirm the 18:00 schedule.
    assert "18" in str(registered["daily_collection"].trigger)  # hour=18
|
||||
|
||||
|
||||
def test_backfill_api_endpoint(client, admin_auth_headers):
    """POST /api/admin/collect/backfill should trigger backfill."""
    # Stub the background starter so no real thread/backfill is launched.
    with patch("app.api.admin.run_backfill_background") as backfill_spy:
        resp = client.post(
            "/api/admin/collect/backfill?start_year=2020",
            headers=admin_auth_headers,
        )

    assert resp.status_code == 200
    assert "backfill" in resp.json()["message"].lower()
    backfill_spy.assert_called_once()
|
||||
|
||||
|
||||
def test_backfill_api_requires_auth(client):
    """POST /api/admin/collect/backfill should require authentication."""
    resp = client.post("/api/admin/collect/backfill")
    # No Authorization header -> 401 from the auth dependency.
    assert resp.status_code == 401
|
||||
Loading…
x
Reference in New Issue
Block a user