feat: add daily collection job orchestration
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
parent
c8bb675ba4
commit
cc7ab311ed
59
backend/jobs/collection_job.py
Normal file
59
backend/jobs/collection_job.py
Normal file
@ -0,0 +1,59 @@
|
||||
"""
|
||||
Data collection orchestration jobs.
|
||||
"""
|
||||
import logging
|
||||
|
||||
from app.core.database import SessionLocal
|
||||
from app.services.collectors import (
|
||||
StockCollector,
|
||||
SectorCollector,
|
||||
PriceCollector,
|
||||
ValuationCollector,
|
||||
ETFCollector,
|
||||
ETFPriceCollector,
|
||||
)
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def _get_daily_collectors():
|
||||
"""
|
||||
Return collectors in dependency order: master data first, then derived data.
|
||||
|
||||
Built at call time so that module-level names can be patched in tests.
|
||||
"""
|
||||
return [
|
||||
("StockCollector", StockCollector, {}),
|
||||
("SectorCollector", SectorCollector, {}),
|
||||
("PriceCollector", PriceCollector, {}),
|
||||
("ValuationCollector", ValuationCollector, {}),
|
||||
("ETFCollector", ETFCollector, {}),
|
||||
("ETFPriceCollector", ETFPriceCollector, {}),
|
||||
]
|
||||
|
||||
|
||||
def run_daily_collection():
|
||||
"""
|
||||
Run all data collectors in dependency order.
|
||||
|
||||
Each collector gets its own DB session. If one fails, the rest continue.
|
||||
Designed to be called by APScheduler at 18:00 Mon-Fri.
|
||||
"""
|
||||
logger.info("Starting daily data collection")
|
||||
results = {}
|
||||
|
||||
for name, collector_cls, kwargs in _get_daily_collectors():
|
||||
db = SessionLocal()
|
||||
try:
|
||||
collector = collector_cls(db, **kwargs)
|
||||
collector.run()
|
||||
results[name] = "success"
|
||||
logger.info(f"{name} completed successfully")
|
||||
except Exception as e:
|
||||
results[name] = f"failed: {e}"
|
||||
logger.error(f"{name} failed: {e}")
|
||||
finally:
|
||||
db.close()
|
||||
|
||||
logger.info(f"Daily collection finished: {results}")
|
||||
return results
|
||||
59
backend/tests/e2e/test_collection_job.py
Normal file
59
backend/tests/e2e/test_collection_job.py
Normal file
@ -0,0 +1,59 @@
|
||||
"""
|
||||
Tests for collection job orchestration.
|
||||
"""
|
||||
from unittest.mock import patch, MagicMock
|
||||
|
||||
from jobs.collection_job import run_daily_collection
|
||||
|
||||
|
||||
def test_run_daily_collection_calls_collectors_in_order():
|
||||
"""Daily collection should run all collectors in dependency order."""
|
||||
call_order = []
|
||||
|
||||
def make_mock_collector(name):
|
||||
mock_cls = MagicMock()
|
||||
instance = MagicMock()
|
||||
instance.run.side_effect = lambda: call_order.append(name)
|
||||
mock_cls.return_value = instance
|
||||
return mock_cls
|
||||
|
||||
with patch("jobs.collection_job.SessionLocal") as mock_session_local, \
|
||||
patch("jobs.collection_job.StockCollector", make_mock_collector("stock")), \
|
||||
patch("jobs.collection_job.SectorCollector", make_mock_collector("sector")), \
|
||||
patch("jobs.collection_job.PriceCollector", make_mock_collector("price")), \
|
||||
patch("jobs.collection_job.ValuationCollector", make_mock_collector("valuation")), \
|
||||
patch("jobs.collection_job.ETFCollector", make_mock_collector("etf")), \
|
||||
patch("jobs.collection_job.ETFPriceCollector", make_mock_collector("etf_price")):
|
||||
mock_session_local.return_value = MagicMock()
|
||||
run_daily_collection()
|
||||
|
||||
assert call_order == ["stock", "sector", "price", "valuation", "etf", "etf_price"]
|
||||
|
||||
|
||||
def test_run_daily_collection_continues_on_failure():
|
||||
"""If one collector fails, the rest should still run."""
|
||||
call_order = []
|
||||
|
||||
def make_mock_collector(name, should_fail=False):
|
||||
mock_cls = MagicMock()
|
||||
instance = MagicMock()
|
||||
def side_effect():
|
||||
if should_fail:
|
||||
raise RuntimeError(f"{name} failed")
|
||||
call_order.append(name)
|
||||
instance.run.side_effect = side_effect
|
||||
mock_cls.return_value = instance
|
||||
return mock_cls
|
||||
|
||||
with patch("jobs.collection_job.SessionLocal") as mock_session_local, \
|
||||
patch("jobs.collection_job.StockCollector", make_mock_collector("stock", should_fail=True)), \
|
||||
patch("jobs.collection_job.SectorCollector", make_mock_collector("sector")), \
|
||||
patch("jobs.collection_job.PriceCollector", make_mock_collector("price")), \
|
||||
patch("jobs.collection_job.ValuationCollector", make_mock_collector("valuation")), \
|
||||
patch("jobs.collection_job.ETFCollector", make_mock_collector("etf")), \
|
||||
patch("jobs.collection_job.ETFPriceCollector", make_mock_collector("etf_price")):
|
||||
mock_session_local.return_value = MagicMock()
|
||||
run_daily_collection()
|
||||
|
||||
# stock failed, but rest should continue
|
||||
assert call_order == ["sector", "price", "valuation", "etf", "etf_price"]
|
||||
Loading…
x
Reference in New Issue
Block a user