From cc7ab311edb6cb8d2a0d7c58bab981f2471d53a0 Mon Sep 17 00:00:00 2001 From: zephyrdark Date: Sat, 14 Feb 2026 00:23:12 +0900 Subject: [PATCH] feat: add daily collection job orchestration Co-Authored-By: Claude Opus 4.6 --- backend/jobs/collection_job.py | 59 ++++++++++++++++++++++++ backend/tests/e2e/test_collection_job.py | 59 ++++++++++++++++++++++++ 2 files changed, 118 insertions(+) create mode 100644 backend/jobs/collection_job.py create mode 100644 backend/tests/e2e/test_collection_job.py diff --git a/backend/jobs/collection_job.py b/backend/jobs/collection_job.py new file mode 100644 index 0000000..476f03d --- /dev/null +++ b/backend/jobs/collection_job.py @@ -0,0 +1,59 @@ +""" +Data collection orchestration jobs. +""" +import logging + +from app.core.database import SessionLocal +from app.services.collectors import ( + StockCollector, + SectorCollector, + PriceCollector, + ValuationCollector, + ETFCollector, + ETFPriceCollector, +) + +logger = logging.getLogger(__name__) + + +def _get_daily_collectors(): + """ + Return collectors in dependency order: master data first, then derived data. + + Built at call time so that module-level names can be patched in tests. + """ + return [ + ("StockCollector", StockCollector, {}), + ("SectorCollector", SectorCollector, {}), + ("PriceCollector", PriceCollector, {}), + ("ValuationCollector", ValuationCollector, {}), + ("ETFCollector", ETFCollector, {}), + ("ETFPriceCollector", ETFPriceCollector, {}), + ] + + +def run_daily_collection(): + """ + Run all data collectors in dependency order. + + Each collector gets its own DB session. If one fails, the rest continue. + Designed to be called by APScheduler at 18:00 Mon-Fri. + """ + logger.info("Starting daily data collection") + results = {} + + for name, collector_cls, kwargs in _get_daily_collectors(): + db = SessionLocal() + try: + collector = collector_cls(db, **kwargs) + collector.run() + results[name] = "success" + logger.info(f"{name} completed successfully") + except Exception as e: + results[name] = f"failed: {e}" + logger.error(f"{name} failed: {e}") + finally: + db.close() + + logger.info(f"Daily collection finished: {results}") + return results diff --git a/backend/tests/e2e/test_collection_job.py b/backend/tests/e2e/test_collection_job.py new file mode 100644 index 0000000..a70fdf1 --- /dev/null +++ b/backend/tests/e2e/test_collection_job.py @@ -0,0 +1,59 @@ +""" +Tests for collection job orchestration. +""" +from unittest.mock import patch, MagicMock + +from jobs.collection_job import run_daily_collection + + +def test_run_daily_collection_calls_collectors_in_order(): + """Daily collection should run all collectors in dependency order.""" + call_order = [] + + def make_mock_collector(name): + mock_cls = MagicMock() + instance = MagicMock() + instance.run.side_effect = lambda: call_order.append(name) + mock_cls.return_value = instance + return mock_cls + + with patch("jobs.collection_job.SessionLocal") as mock_session_local, \ + patch("jobs.collection_job.StockCollector", make_mock_collector("stock")), \ + patch("jobs.collection_job.SectorCollector", make_mock_collector("sector")), \ + patch("jobs.collection_job.PriceCollector", make_mock_collector("price")), \ + patch("jobs.collection_job.ValuationCollector", make_mock_collector("valuation")), \ + patch("jobs.collection_job.ETFCollector", make_mock_collector("etf")), \ + patch("jobs.collection_job.ETFPriceCollector", make_mock_collector("etf_price")): + mock_session_local.return_value = MagicMock() + run_daily_collection() + + assert call_order == ["stock", "sector", "price", "valuation", "etf", "etf_price"] + + +def test_run_daily_collection_continues_on_failure(): + """If one collector fails, the rest should still run.""" + call_order = [] + + def make_mock_collector(name, should_fail=False): + mock_cls = MagicMock() + instance = MagicMock() + def side_effect(): + if should_fail: + raise RuntimeError(f"{name} failed") + call_order.append(name) + instance.run.side_effect = side_effect + mock_cls.return_value = instance + return mock_cls + + with patch("jobs.collection_job.SessionLocal") as mock_session_local, \ + patch("jobs.collection_job.StockCollector", make_mock_collector("stock", should_fail=True)), \ + patch("jobs.collection_job.SectorCollector", make_mock_collector("sector")), \ + patch("jobs.collection_job.PriceCollector", make_mock_collector("price")), \ + patch("jobs.collection_job.ValuationCollector", make_mock_collector("valuation")), \ + patch("jobs.collection_job.ETFCollector", make_mock_collector("etf")), \ + patch("jobs.collection_job.ETFPriceCollector", make_mock_collector("etf_price")): + mock_session_local.return_value = MagicMock() + run_daily_collection() + + # stock failed, but rest should continue + assert call_order == ["sector", "price", "valuation", "etf", "etf_price"]