galaxy-po/docs/plans/2026-02-14-data-collection-scheduling.md
2026-02-18 21:44:45 +09:00


Data Collection Scheduling Implementation Plan

For Claude: REQUIRED SUB-SKILL: Use superpowers:executing-plans to implement this plan task-by-task.

Goal: Add daily scheduled data collection (18:00 Mon-Fri) and an API-triggered backfill job to collect all historical price data.

Architecture: A new collection_job.py module orchestrates collectors in dependency order (master data first, then prices). The scheduler registers the daily job alongside the existing snapshot job. Backfill splits date ranges into yearly chunks, reusing existing PriceCollector and ETFPriceCollector.

Tech Stack: APScheduler (already in use), existing pykrx-based collectors, PostgreSQL, FastAPI


Task 1: Create collection_job.py with run_daily_collection

Files:

  • Create: backend/jobs/collection_job.py
  • Test: backend/tests/e2e/test_collection_job.py

Step 1: Write the failing test

Create backend/tests/e2e/test_collection_job.py:

"""
Tests for collection job orchestration.
"""
import pytest
from unittest.mock import patch, MagicMock
from datetime import datetime

from jobs.collection_job import run_daily_collection


def test_run_daily_collection_calls_collectors_in_order():
    """Daily collection should run all collectors in dependency order."""
    call_order = []

    def make_mock_collector(name):
        mock_cls = MagicMock()
        instance = MagicMock()
        instance.run.side_effect = lambda: call_order.append(name)
        mock_cls.return_value = instance
        return mock_cls

    with patch("jobs.collection_job.SessionLocal") as mock_session_local, \
         patch("jobs.collection_job.StockCollector", make_mock_collector("stock")), \
         patch("jobs.collection_job.SectorCollector", make_mock_collector("sector")), \
         patch("jobs.collection_job.PriceCollector", make_mock_collector("price")), \
         patch("jobs.collection_job.ValuationCollector", make_mock_collector("valuation")), \
         patch("jobs.collection_job.ETFCollector", make_mock_collector("etf")), \
         patch("jobs.collection_job.ETFPriceCollector", make_mock_collector("etf_price")):
        mock_session_local.return_value = MagicMock()
        run_daily_collection()

    assert call_order == ["stock", "sector", "price", "valuation", "etf", "etf_price"]


def test_run_daily_collection_continues_on_failure():
    """If one collector fails, the rest should still run."""
    call_order = []

    def make_mock_collector(name, should_fail=False):
        mock_cls = MagicMock()
        instance = MagicMock()
        def side_effect():
            if should_fail:
                raise RuntimeError(f"{name} failed")
            call_order.append(name)
        instance.run.side_effect = side_effect
        mock_cls.return_value = instance
        return mock_cls

    with patch("jobs.collection_job.SessionLocal") as mock_session_local, \
         patch("jobs.collection_job.StockCollector", make_mock_collector("stock", should_fail=True)), \
         patch("jobs.collection_job.SectorCollector", make_mock_collector("sector")), \
         patch("jobs.collection_job.PriceCollector", make_mock_collector("price")), \
         patch("jobs.collection_job.ValuationCollector", make_mock_collector("valuation")), \
         patch("jobs.collection_job.ETFCollector", make_mock_collector("etf")), \
         patch("jobs.collection_job.ETFPriceCollector", make_mock_collector("etf_price")):
        mock_session_local.return_value = MagicMock()
        run_daily_collection()

    # stock failed, but rest should continue
    assert call_order == ["sector", "price", "valuation", "etf", "etf_price"]

Step 2: Run test to verify it fails

Run: cd /home/zephyrdark/workspace/quant/galaxy-po/backend && python -m pytest tests/e2e/test_collection_job.py -v
Expected: FAIL with ModuleNotFoundError: No module named 'jobs.collection_job'

Step 3: Write minimal implementation

Create backend/jobs/collection_job.py:

"""
Data collection orchestration jobs.
"""
import logging
from datetime import datetime

from app.core.database import SessionLocal
from app.services.collectors import (
    StockCollector,
    SectorCollector,
    PriceCollector,
    ValuationCollector,
    ETFCollector,
    ETFPriceCollector,
)

logger = logging.getLogger(__name__)

# Collectors in dependency order: master data first, then derived data
DAILY_COLLECTORS = [
    ("StockCollector", StockCollector, {}),
    ("SectorCollector", SectorCollector, {}),
    ("PriceCollector", PriceCollector, {}),
    ("ValuationCollector", ValuationCollector, {}),
    ("ETFCollector", ETFCollector, {}),
    ("ETFPriceCollector", ETFPriceCollector, {}),
]


def run_daily_collection():
    """
    Run all data collectors in dependency order.

    Each collector gets its own DB session. If one fails, the rest continue.
    Designed to be called by APScheduler at 18:00 Mon-Fri.
    """
    logger.info("Starting daily data collection")
    results = {}

    for name, collector_cls, kwargs in DAILY_COLLECTORS:
        db = SessionLocal()
        try:
            collector = collector_cls(db, **kwargs)
            collector.run()
            results[name] = "success"
            logger.info(f"{name} completed successfully")
        except Exception as e:
            results[name] = f"failed: {e}"
            logger.error(f"{name} failed: {e}")
        finally:
            db.close()

    logger.info(f"Daily collection finished: {results}")

Step 4: Run test to verify it passes

Run: cd /home/zephyrdark/workspace/quant/galaxy-po/backend && python -m pytest tests/e2e/test_collection_job.py -v
Expected: 2 tests PASS

Step 5: Commit

git add backend/jobs/collection_job.py backend/tests/e2e/test_collection_job.py
git commit -m "feat: add daily collection job orchestration"

Task 2: Add run_backfill to collection_job.py

Files:

  • Modify: backend/jobs/collection_job.py
  • Test: backend/tests/e2e/test_collection_job.py

Step 1: Write the failing test

Append to backend/tests/e2e/test_collection_job.py:

from jobs.collection_job import run_backfill


def test_run_backfill_generates_yearly_chunks():
    """Backfill should split date range into yearly chunks."""
    collected_ranges = []

    def make_price_collector(name):
        mock_cls = MagicMock()
        def capture_init(db, start_date=None, end_date=None):
            instance = MagicMock()
            collected_ranges.append((name, start_date, end_date))
            return instance
        mock_cls.side_effect = capture_init
        return mock_cls

    with patch("jobs.collection_job.SessionLocal") as mock_session_local, \
         patch("jobs.collection_job.PriceCollector", make_price_collector("price")), \
         patch("jobs.collection_job.ETFPriceCollector", make_price_collector("etf_price")):
        mock_db = MagicMock()
        mock_session_local.return_value = mock_db
        # Simulate no existing data (min date returns None)
        mock_db.query.return_value.scalar.return_value = None

        run_backfill(start_year=2023)

    # Should generate chunks: 2023, 2024, 2025, 2026 (partial) for both price and etf_price
    price_ranges = [(s, e) for name, s, e in collected_ranges if name == "price"]
    assert len(price_ranges) >= 3  # At least 2023, 2024, 2025
    assert price_ranges[0][0] == "20230101"  # First chunk starts at start_year


def test_run_backfill_skips_already_collected_range():
    """Backfill chunks should stop the day before the earliest existing data."""
    collected_ranges = []

    def make_price_collector(name):
        mock_cls = MagicMock()
        def capture_init(db, start_date=None, end_date=None):
            instance = MagicMock()
            collected_ranges.append((name, start_date, end_date))
            return instance
        mock_cls.side_effect = capture_init
        return mock_cls

    from datetime import date

    with patch("jobs.collection_job.SessionLocal") as mock_session_local, \
         patch("jobs.collection_job.PriceCollector", make_price_collector("price")), \
         patch("jobs.collection_job.ETFPriceCollector", make_price_collector("etf_price")):
        mock_db = MagicMock()
        mock_session_local.return_value = mock_db

        # scalar() is called exactly three times, in implementation order:
        #   Price min -> 2024-06-01 (existing data), Price max -> 2025-12-31,
        #   ETFPrice min -> None (no data at all, so no max query)
        mock_db.query.return_value.scalar.side_effect = [
            date(2024, 6, 1),
            date(2025, 12, 31),
            None,
        ]

        run_backfill(start_year=2023)

    price_ranges = [(s, e) for name, s, e in collected_ranges if name == "price"]
    # Chunks cover start_year up to the day before the earliest existing row
    assert price_ranges[0] == ("20230101", "20231231")
    assert price_ranges[1] == ("20240101", "20240531")
    # Forward fill resumes the day after the latest existing row
    assert price_ranges[2][0] == "20260101"

Step 2: Run test to verify it fails

Run: cd /home/zephyrdark/workspace/quant/galaxy-po/backend && python -m pytest tests/e2e/test_collection_job.py::test_run_backfill_generates_yearly_chunks -v
Expected: FAIL with ImportError: cannot import name 'run_backfill'

Step 3: Write minimal implementation

Add to backend/jobs/collection_job.py:

from datetime import date, timedelta
from sqlalchemy import func

from app.models.stock import Price, ETFPrice


def _generate_yearly_chunks(start_year: int, end_date: date) -> list[tuple[str, str]]:
    """Generate (start_date, end_date) pairs in YYYYMMDD format, one per year."""
    chunks = []
    current_start = date(start_year, 1, 1)

    while current_start <= end_date:  # inclusive, so an end date of Jan 1 still gets a chunk
        current_end = date(current_start.year, 12, 31)
        if current_end > end_date:
            current_end = end_date
        chunks.append((
            current_start.strftime("%Y%m%d"),
            current_end.strftime("%Y%m%d"),
        ))
        current_start = date(current_start.year + 1, 1, 1)

    return chunks


def run_backfill(start_year: int = 2000):
    """
    Collect historical price data from start_year to today.

    Checks the earliest existing data in DB and only collects
    missing periods. Splits into yearly chunks to avoid overloading pykrx.
    """
    logger.info(f"Starting backfill from {start_year}")
    today = date.today()

    db = SessionLocal()
    try:
        # Determine what needs backfilling
        backfill_targets = [
            ("Price", PriceCollector, Price.date),
            ("ETFPrice", ETFPriceCollector, ETFPrice.date),
        ]

        for name, collector_cls, date_col in backfill_targets:
            # Find earliest existing data
            earliest = db.query(func.min(date_col)).scalar()

            if earliest is None:
                # No data at all - collect everything
                backfill_end = today
            else:
                # Data exists - collect from start_year to day before earliest
                backfill_end = earliest - timedelta(days=1)

            if date(start_year, 1, 1) >= backfill_end:
                logger.info(f"{name}: no chunk backfill needed (data exists from {earliest})")
            else:
                chunks = _generate_yearly_chunks(start_year, backfill_end)
                logger.info(f"{name}: backfilling {len(chunks)} yearly chunks from {start_year} to {backfill_end}")

                for start_dt, end_dt in chunks:
                    chunk_db = SessionLocal()
                    try:
                        collector = collector_cls(chunk_db, start_date=start_dt, end_date=end_dt)
                        collector.run()
                        logger.info(f"{name}: chunk {start_dt}-{end_dt} completed")
                    except Exception as e:
                        logger.error(f"{name}: chunk {start_dt}-{end_dt} failed: {e}")
                    finally:
                        chunk_db.close()

            # Also fill gap between earliest data and today (forward fill)
            if earliest is not None:
                latest = db.query(func.max(date_col)).scalar()
                if latest and latest < today:
                    gap_start = (latest + timedelta(days=1)).strftime("%Y%m%d")
                    gap_end = today.strftime("%Y%m%d")
                    gap_db = SessionLocal()
                    try:
                        collector = collector_cls(gap_db, start_date=gap_start, end_date=gap_end)
                        collector.run()
                        logger.info(f"{name}: forward fill {gap_start}-{gap_end} completed")
                    except Exception as e:
                        logger.error(f"{name}: forward fill failed: {e}")
                    finally:
                        gap_db.close()

    finally:
        db.close()

    logger.info("Backfill completed")
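The chunk boundaries are easy to sanity-check. This standalone restatement of the helper's logic (stdlib only, with an inclusive end date assumed) shows the expected output:

```python
from datetime import date


def generate_yearly_chunks(start_year, end_date):
    """Mirror of _generate_yearly_chunks: one (YYYYMMDD, YYYYMMDD) pair per calendar year."""
    chunks = []
    current_start = date(start_year, 1, 1)
    while current_start <= end_date:
        # Each chunk ends at Dec 31 of its year, clamped to the overall end date
        current_end = min(date(current_start.year, 12, 31), end_date)
        chunks.append((current_start.strftime("%Y%m%d"), current_end.strftime("%Y%m%d")))
        current_start = date(current_start.year + 1, 1, 1)
    return chunks


print(generate_yearly_chunks(2024, date(2026, 2, 14)))
# [('20240101', '20241231'), ('20250101', '20251231'), ('20260101', '20260214')]
```

Full years get a complete Jan 1 to Dec 31 range; the final partial year is clamped to the requested end date, which keeps each pykrx request bounded to at most one year of data.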

Step 4: Run test to verify it passes

Run: cd /home/zephyrdark/workspace/quant/galaxy-po/backend && python -m pytest tests/e2e/test_collection_job.py -v
Expected: All 4 tests PASS

Step 5: Commit

git add backend/jobs/collection_job.py backend/tests/e2e/test_collection_job.py
git commit -m "feat: add backfill job for historical price data"

Task 3: Register daily collection in scheduler

Files:

  • Modify: backend/jobs/scheduler.py
  • Modify: backend/jobs/__init__.py
  • Test: backend/tests/e2e/test_collection_job.py

Step 1: Write the failing test

Append to backend/tests/e2e/test_collection_job.py:

def test_scheduler_has_daily_collection_job():
    """Scheduler should register a daily_collection job at 18:00."""
    from jobs.scheduler import scheduler, configure_jobs

    # Reset scheduler for test
    if scheduler.running:
        scheduler.shutdown(wait=False)

    from apscheduler.schedulers.background import BackgroundScheduler
    test_scheduler = BackgroundScheduler()

    with patch("jobs.scheduler.scheduler", test_scheduler):
        configure_jobs()

    jobs = {job.id: job for job in test_scheduler.get_jobs()}
    assert "daily_collection" in jobs

    trigger_str = str(jobs["daily_collection"].trigger)
    # CronTrigger's string form includes its fields, e.g. cron[day_of_week='mon-fri', hour='18', minute='0']
    assert "hour='18'" in trigger_str
    assert "day_of_week='mon-fri'" in trigger_str

Step 2: Run test to verify it fails

Run: cd /home/zephyrdark/workspace/quant/galaxy-po/backend && python -m pytest tests/e2e/test_collection_job.py::test_scheduler_has_daily_collection_job -v
Expected: FAIL with AssertionError: 'daily_collection' not in jobs

Step 3: Modify scheduler to add the daily collection job

Modify backend/jobs/scheduler.py:

"""
APScheduler configuration for background jobs.
"""
import logging
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.cron import CronTrigger

from jobs.snapshot_job import create_daily_snapshots
from jobs.collection_job import run_daily_collection

logger = logging.getLogger(__name__)

# Create scheduler instance
scheduler = BackgroundScheduler()


def configure_jobs():
    """Configure scheduled jobs."""
    # Daily data collection at 18:00 (after market close, before snapshot)
    scheduler.add_job(
        run_daily_collection,
        trigger=CronTrigger(
            hour=18,
            minute=0,
            day_of_week='mon-fri',
        ),
        id='daily_collection',
        name='Collect daily market data',
        replace_existing=True,
    )
    logger.info("Configured daily_collection job at 18:00")

    # Daily snapshot at 18:30 (after data collection completes)
    scheduler.add_job(
        create_daily_snapshots,
        trigger=CronTrigger(
            hour=18,
            minute=30,
            day_of_week='mon-fri',
        ),
        id='daily_snapshots',
        name='Create daily portfolio snapshots',
        replace_existing=True,
    )
    logger.info("Configured daily_snapshots job at 18:30")


def start_scheduler():
    """Start the scheduler."""
    if not scheduler.running:
        configure_jobs()
        scheduler.start()
        logger.info("Scheduler started")


def stop_scheduler():
    """Stop the scheduler."""
    if scheduler.running:
        scheduler.shutdown()
        logger.info("Scheduler stopped")

Update backend/jobs/__init__.py:

"""
Background jobs module.
"""
from jobs.scheduler import scheduler, start_scheduler, stop_scheduler
from jobs.collection_job import run_daily_collection, run_backfill

__all__ = [
    "scheduler", "start_scheduler", "stop_scheduler",
    "run_daily_collection", "run_backfill",
]

Step 4: Run test to verify it passes

Run: cd /home/zephyrdark/workspace/quant/galaxy-po/backend && python -m pytest tests/e2e/test_collection_job.py -v
Expected: All 5 tests PASS

Step 5: Commit

git add backend/jobs/scheduler.py backend/jobs/__init__.py backend/tests/e2e/test_collection_job.py
git commit -m "feat: register daily collection job at 18:00 in scheduler"

Task 4: Add backfill API endpoint

Files:

  • Modify: backend/app/api/admin.py
  • Test: backend/tests/e2e/test_collection_job.py

Step 1: Write the failing test

Append to backend/tests/e2e/test_collection_job.py:

def test_backfill_api_endpoint(client, admin_auth_headers):
    """POST /api/admin/collect/backfill should trigger backfill."""
    with patch("app.api.admin.run_backfill_background") as mock_backfill:
        response = client.post(
            "/api/admin/collect/backfill?start_year=2020",
            headers=admin_auth_headers,
        )
    assert response.status_code == 200
    assert "backfill" in response.json()["message"].lower()
    mock_backfill.assert_called_once()


def test_backfill_api_requires_auth(client):
    """POST /api/admin/collect/backfill should require authentication."""
    response = client.post("/api/admin/collect/backfill")
    assert response.status_code == 401

Step 2: Run test to verify it fails

Run: cd /home/zephyrdark/workspace/quant/galaxy-po/backend && python -m pytest tests/e2e/test_collection_job.py::test_backfill_api_endpoint -v
Expected: FAIL with 404 or import error

Step 3: Add backfill endpoint to admin API

Add to backend/app/api/admin.py, after the existing imports (this assumes admin.py already imports threading, a module-level logger, and FastAPI's Query for the existing collection endpoints; add any of those imports if missing):

from jobs.collection_job import run_backfill

Add the background runner function (after _start_background_collection):

def _run_backfill_background(start_year: int):
    """Run backfill in a background thread."""
    try:
        run_backfill(start_year=start_year)
    except Exception as e:
        logger.error("Background backfill failed: %s", e)


def run_backfill_background(start_year: int):
    """Start backfill in a daemon thread."""
    thread = threading.Thread(
        target=_run_backfill_background,
        args=(start_year,),
        daemon=True,
    )
    thread.start()

Add the endpoint (after collect_etf_prices):

@router.post("/collect/backfill", response_model=CollectResponse)
async def collect_backfill(
    current_user: CurrentUser,
    start_year: int = Query(2000, ge=1990, le=2026, description="Start year for backfill"),
):
    """Backfill historical price data from start_year to today (runs in background)."""
    run_backfill_background(start_year)
    return CollectResponse(message=f"Backfill started from {start_year}")
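The endpoint is plain fire-and-forget: the HTTP handler returns immediately while the daemon thread keeps working. A minimal stdlib sketch of that shape (slow_job stands in for run_backfill and is not part of the codebase):

```python
import threading
import time

done = threading.Event()


def slow_job(start_year):
    # Stand-in for run_backfill: simulate slow work, then signal completion
    time.sleep(0.05)
    done.set()


def start_in_background(start_year):
    """Mirror run_backfill_background: spawn a daemon thread and return at once."""
    t = threading.Thread(target=slow_job, args=(start_year,), daemon=True)
    t.start()
    return t


t = start_in_background(2020)
was_done_immediately = done.is_set()  # False: the caller got control back right away
t.join()
print(done.is_set())  # True once the background work finishes
```

In the API the thread simply outlives the HTTP response. Note that daemon threads die with the process, so an in-flight backfill is abandoned on server restart; the min/max date checks in run_backfill make a rerun pick up the remaining gaps.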

Step 4: Run test to verify it passes

Run: cd /home/zephyrdark/workspace/quant/galaxy-po/backend && python -m pytest tests/e2e/test_collection_job.py -v
Expected: All 7 tests PASS

Step 5: Commit

git add backend/app/api/admin.py backend/tests/e2e/test_collection_job.py
git commit -m "feat: add backfill API endpoint for historical data collection"

Task 5: Run full test suite and verify

Step 1: Run all existing tests

Run: cd /home/zephyrdark/workspace/quant/galaxy-po/backend && python -m pytest tests/ -v
Expected: All tests PASS (existing + new)

Step 2: Verify no import issues

Run: cd /home/zephyrdark/workspace/quant/galaxy-po/backend && python -c "from jobs import run_daily_collection, run_backfill; print('OK')"
Expected: OK

Step 3: Final commit if any fixes needed

git add -A
git commit -m "fix: resolve any test issues from collection scheduling"