Compare commits
4 Commits
a899c17a65
...
653fa08fa4
| Author | SHA1 | Date | |
|---|---|---|---|
| 653fa08fa4 | |||
| 238c4d1caf | |||
| fe48e20642 | |||
| d12bf7b54f ||||
@ -2,9 +2,13 @@
|
|||||||
APScheduler configuration for background jobs.
|
APScheduler configuration for background jobs.
|
||||||
"""
|
"""
|
||||||
import logging
|
import logging
|
||||||
|
from zoneinfo import ZoneInfo
|
||||||
|
|
||||||
from apscheduler.schedulers.background import BackgroundScheduler
|
from apscheduler.schedulers.background import BackgroundScheduler
|
||||||
from apscheduler.triggers.cron import CronTrigger
|
from apscheduler.triggers.cron import CronTrigger
|
||||||
|
|
||||||
|
KST = ZoneInfo("Asia/Seoul")
|
||||||
|
|
||||||
from jobs.snapshot_job import create_daily_snapshots
|
from jobs.snapshot_job import create_daily_snapshots
|
||||||
from jobs.collection_job import run_daily_collection
|
from jobs.collection_job import run_daily_collection
|
||||||
|
|
||||||
@ -23,12 +27,13 @@ def configure_jobs():
|
|||||||
hour=18,
|
hour=18,
|
||||||
minute=0,
|
minute=0,
|
||||||
day_of_week='mon-fri',
|
day_of_week='mon-fri',
|
||||||
|
timezone=KST,
|
||||||
),
|
),
|
||||||
id='daily_collection',
|
id='daily_collection',
|
||||||
name='Collect daily market data',
|
name='Collect daily market data',
|
||||||
replace_existing=True,
|
replace_existing=True,
|
||||||
)
|
)
|
||||||
logger.info("Configured daily_collection job at 18:00")
|
logger.info("Configured daily_collection job at 18:00 KST")
|
||||||
|
|
||||||
# Daily snapshot at 18:30 (after data collection completes)
|
# Daily snapshot at 18:30 (after data collection completes)
|
||||||
scheduler.add_job(
|
scheduler.add_job(
|
||||||
@ -37,12 +42,13 @@ def configure_jobs():
|
|||||||
hour=18,
|
hour=18,
|
||||||
minute=30,
|
minute=30,
|
||||||
day_of_week='mon-fri',
|
day_of_week='mon-fri',
|
||||||
|
timezone=KST,
|
||||||
),
|
),
|
||||||
id='daily_snapshots',
|
id='daily_snapshots',
|
||||||
name='Create daily portfolio snapshots',
|
name='Create daily portfolio snapshots',
|
||||||
replace_existing=True,
|
replace_existing=True,
|
||||||
)
|
)
|
||||||
logger.info("Configured daily_snapshots job at 18:30")
|
logger.info("Configured daily_snapshots job at 18:30 KST")
|
||||||
|
|
||||||
|
|
||||||
def start_scheduler():
|
def start_scheduler():
|
||||||
|
|||||||
@ -42,6 +42,15 @@ TARGETS = {
|
|||||||
"411060": Decimal("15"),
|
"411060": Decimal("15"),
|
||||||
}
|
}
|
||||||
|
|
||||||
|
# Actual total invested amounts per ticker (from brokerage records)
|
||||||
|
TOTAL_INVESTED = {
|
||||||
|
"069500": Decimal("541040"),
|
||||||
|
"148070": Decimal("15432133"),
|
||||||
|
"284430": Decimal("18375975"),
|
||||||
|
"360750": Decimal("7683515"),
|
||||||
|
"411060": Decimal("6829620"),
|
||||||
|
}
|
||||||
|
|
||||||
# Historical snapshots from data.txt
|
# Historical snapshots from data.txt
|
||||||
SNAPSHOTS = [
|
SNAPSHOTS = [
|
||||||
{
|
{
|
||||||
@ -242,13 +251,18 @@ def seed(db: Session):
|
|||||||
print(f"Created {tx_count} transactions from snapshot diffs")
|
print(f"Created {tx_count} transactions from snapshot diffs")
|
||||||
|
|
||||||
# Set current holdings from latest snapshot
|
# Set current holdings from latest snapshot
|
||||||
|
# avg_price = total invested amount / quantity (from actual brokerage records)
|
||||||
latest = SNAPSHOTS[-1]
|
latest = SNAPSHOTS[-1]
|
||||||
for h in latest["holdings"]:
|
for h in latest["holdings"]:
|
||||||
|
ticker = h["ticker"]
|
||||||
|
qty = h["qty"]
|
||||||
|
invested = TOTAL_INVESTED[ticker]
|
||||||
|
avg_price = (invested / qty).quantize(Decimal("0.01"))
|
||||||
db.add(Holding(
|
db.add(Holding(
|
||||||
portfolio_id=portfolio.id,
|
portfolio_id=portfolio.id,
|
||||||
ticker=h["ticker"],
|
ticker=ticker,
|
||||||
quantity=h["qty"],
|
quantity=qty,
|
||||||
avg_price=h["price"], # Using current price as avg (best available)
|
avg_price=avg_price,
|
||||||
))
|
))
|
||||||
print(f"Set {len(latest['holdings'])} current holdings from {latest['date']}")
|
print(f"Set {len(latest['holdings'])} current holdings from {latest['date']}")
|
||||||
|
|
||||||
|
|||||||
2064
docs/plans/2026-02-11-portfolio-rebalancing-design.md
Normal file
2064
docs/plans/2026-02-11-portfolio-rebalancing-design.md
Normal file
File diff suppressed because it is too large
Load Diff
605
docs/plans/2026-02-14-data-collection-scheduling.md
Normal file
605
docs/plans/2026-02-14-data-collection-scheduling.md
Normal file
@ -0,0 +1,605 @@
|
|||||||
|
# Data Collection Scheduling Implementation Plan
|
||||||
|
|
||||||
|
> **For Claude:** REQUIRED SUB-SKILL: Use superpowers:executing-plans to implement this plan task-by-task.
|
||||||
|
|
||||||
|
**Goal:** Add daily scheduled data collection (18:00 Mon-Fri) and an API-triggered backfill job to collect all historical price data.
|
||||||
|
|
||||||
|
**Architecture:** A new `collection_job.py` module orchestrates collectors in dependency order (master data first, then prices). The scheduler registers the daily job alongside the existing snapshot job. Backfill splits date ranges into yearly chunks, reusing existing `PriceCollector` and `ETFPriceCollector`.
|
||||||
|
|
||||||
|
**Tech Stack:** APScheduler (already in use), existing pykrx-based collectors, PostgreSQL, FastAPI
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
### Task 1: Create `collection_job.py` with `run_daily_collection`
|
||||||
|
|
||||||
|
**Files:**
|
||||||
|
- Create: `backend/jobs/collection_job.py`
|
||||||
|
- Test: `backend/tests/e2e/test_collection_job.py`
|
||||||
|
|
||||||
|
**Step 1: Write the failing test**
|
||||||
|
|
||||||
|
Create `backend/tests/e2e/test_collection_job.py`:
|
||||||
|
|
||||||
|
```python
|
||||||
|
"""
|
||||||
|
Tests for collection job orchestration.
|
||||||
|
"""
|
||||||
|
import pytest
|
||||||
|
from unittest.mock import patch, MagicMock
|
||||||
|
from datetime import datetime
|
||||||
|
|
||||||
|
from jobs.collection_job import run_daily_collection
|
||||||
|
|
||||||
|
|
||||||
|
def test_run_daily_collection_calls_collectors_in_order():
|
||||||
|
"""Daily collection should run all collectors in dependency order."""
|
||||||
|
call_order = []
|
||||||
|
|
||||||
|
def make_mock_collector(name):
|
||||||
|
mock_cls = MagicMock()
|
||||||
|
instance = MagicMock()
|
||||||
|
instance.run.side_effect = lambda: call_order.append(name)
|
||||||
|
mock_cls.return_value = instance
|
||||||
|
return mock_cls
|
||||||
|
|
||||||
|
with patch("jobs.collection_job.SessionLocal") as mock_session_local, \
|
||||||
|
patch("jobs.collection_job.StockCollector", make_mock_collector("stock")), \
|
||||||
|
patch("jobs.collection_job.SectorCollector", make_mock_collector("sector")), \
|
||||||
|
patch("jobs.collection_job.PriceCollector", make_mock_collector("price")), \
|
||||||
|
patch("jobs.collection_job.ValuationCollector", make_mock_collector("valuation")), \
|
||||||
|
patch("jobs.collection_job.ETFCollector", make_mock_collector("etf")), \
|
||||||
|
patch("jobs.collection_job.ETFPriceCollector", make_mock_collector("etf_price")):
|
||||||
|
mock_session_local.return_value = MagicMock()
|
||||||
|
run_daily_collection()
|
||||||
|
|
||||||
|
assert call_order == ["stock", "sector", "price", "valuation", "etf", "etf_price"]
|
||||||
|
|
||||||
|
|
||||||
|
def test_run_daily_collection_continues_on_failure():
|
||||||
|
"""If one collector fails, the rest should still run."""
|
||||||
|
call_order = []
|
||||||
|
|
||||||
|
def make_mock_collector(name, should_fail=False):
|
||||||
|
mock_cls = MagicMock()
|
||||||
|
instance = MagicMock()
|
||||||
|
def side_effect():
|
||||||
|
if should_fail:
|
||||||
|
raise RuntimeError(f"{name} failed")
|
||||||
|
call_order.append(name)
|
||||||
|
instance.run.side_effect = side_effect
|
||||||
|
mock_cls.return_value = instance
|
||||||
|
return mock_cls
|
||||||
|
|
||||||
|
with patch("jobs.collection_job.SessionLocal") as mock_session_local, \
|
||||||
|
patch("jobs.collection_job.StockCollector", make_mock_collector("stock", should_fail=True)), \
|
||||||
|
patch("jobs.collection_job.SectorCollector", make_mock_collector("sector")), \
|
||||||
|
patch("jobs.collection_job.PriceCollector", make_mock_collector("price")), \
|
||||||
|
patch("jobs.collection_job.ValuationCollector", make_mock_collector("valuation")), \
|
||||||
|
patch("jobs.collection_job.ETFCollector", make_mock_collector("etf")), \
|
||||||
|
patch("jobs.collection_job.ETFPriceCollector", make_mock_collector("etf_price")):
|
||||||
|
mock_session_local.return_value = MagicMock()
|
||||||
|
run_daily_collection()
|
||||||
|
|
||||||
|
# stock failed, but rest should continue
|
||||||
|
assert call_order == ["sector", "price", "valuation", "etf", "etf_price"]
|
||||||
|
```
|
||||||
|
|
||||||
|
**Step 2: Run test to verify it fails**
|
||||||
|
|
||||||
|
Run: `cd /home/zephyrdark/workspace/quant/galaxy-po/backend && python -m pytest tests/e2e/test_collection_job.py -v`
|
||||||
|
Expected: FAIL with `ModuleNotFoundError: No module named 'jobs.collection_job'`
|
||||||
|
|
||||||
|
**Step 3: Write minimal implementation**
|
||||||
|
|
||||||
|
Create `backend/jobs/collection_job.py`:
|
||||||
|
|
||||||
|
```python
|
||||||
|
"""
|
||||||
|
Data collection orchestration jobs.
|
||||||
|
"""
|
||||||
|
import logging
|
||||||
|
from datetime import datetime
|
||||||
|
|
||||||
|
from app.core.database import SessionLocal
|
||||||
|
from app.services.collectors import (
|
||||||
|
StockCollector,
|
||||||
|
SectorCollector,
|
||||||
|
PriceCollector,
|
||||||
|
ValuationCollector,
|
||||||
|
ETFCollector,
|
||||||
|
ETFPriceCollector,
|
||||||
|
)
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
# Collectors in dependency order: master data first, then derived data
|
||||||
|
DAILY_COLLECTORS = [
|
||||||
|
("StockCollector", StockCollector, {}),
|
||||||
|
("SectorCollector", SectorCollector, {}),
|
||||||
|
("PriceCollector", PriceCollector, {}),
|
||||||
|
("ValuationCollector", ValuationCollector, {}),
|
||||||
|
("ETFCollector", ETFCollector, {}),
|
||||||
|
("ETFPriceCollector", ETFPriceCollector, {}),
|
||||||
|
]
|
||||||
|
|
||||||
|
|
||||||
|
def run_daily_collection():
|
||||||
|
"""
|
||||||
|
Run all data collectors in dependency order.
|
||||||
|
|
||||||
|
Each collector gets its own DB session. If one fails, the rest continue.
|
||||||
|
Designed to be called by APScheduler at 18:00 Mon-Fri.
|
||||||
|
"""
|
||||||
|
logger.info("Starting daily data collection")
|
||||||
|
results = {}
|
||||||
|
|
||||||
|
for name, collector_cls, kwargs in DAILY_COLLECTORS:
|
||||||
|
db = SessionLocal()
|
||||||
|
try:
|
||||||
|
collector = collector_cls(db, **kwargs)
|
||||||
|
collector.run()
|
||||||
|
results[name] = "success"
|
||||||
|
logger.info(f"{name} completed successfully")
|
||||||
|
except Exception as e:
|
||||||
|
results[name] = f"failed: {e}"
|
||||||
|
logger.error(f"{name} failed: {e}")
|
||||||
|
finally:
|
||||||
|
db.close()
|
||||||
|
|
||||||
|
logger.info(f"Daily collection finished: {results}")
|
||||||
|
```
|
||||||
|
|
||||||
|
**Step 4: Run test to verify it passes**
|
||||||
|
|
||||||
|
Run: `cd /home/zephyrdark/workspace/quant/galaxy-po/backend && python -m pytest tests/e2e/test_collection_job.py -v`
|
||||||
|
Expected: 2 tests PASS
|
||||||
|
|
||||||
|
**Step 5: Commit**
|
||||||
|
|
||||||
|
```bash
|
||||||
|
git add backend/jobs/collection_job.py backend/tests/e2e/test_collection_job.py
|
||||||
|
git commit -m "feat: add daily collection job orchestration"
|
||||||
|
```
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
### Task 2: Add `run_backfill` to `collection_job.py`
|
||||||
|
|
||||||
|
**Files:**
|
||||||
|
- Modify: `backend/jobs/collection_job.py`
|
||||||
|
- Test: `backend/tests/e2e/test_collection_job.py`
|
||||||
|
|
||||||
|
**Step 1: Write the failing test**
|
||||||
|
|
||||||
|
Append to `backend/tests/e2e/test_collection_job.py`:
|
||||||
|
|
||||||
|
```python
|
||||||
|
from jobs.collection_job import run_backfill
|
||||||
|
|
||||||
|
|
||||||
|
def test_run_backfill_generates_yearly_chunks():
|
||||||
|
"""Backfill should split date range into yearly chunks."""
|
||||||
|
collected_ranges = []
|
||||||
|
|
||||||
|
def make_price_collector(name):
|
||||||
|
mock_cls = MagicMock()
|
||||||
|
def capture_init(db, start_date=None, end_date=None):
|
||||||
|
instance = MagicMock()
|
||||||
|
collected_ranges.append((name, start_date, end_date))
|
||||||
|
return instance
|
||||||
|
mock_cls.side_effect = capture_init
|
||||||
|
return mock_cls
|
||||||
|
|
||||||
|
with patch("jobs.collection_job.SessionLocal") as mock_session_local, \
|
||||||
|
patch("jobs.collection_job.PriceCollector", make_price_collector("price")), \
|
||||||
|
patch("jobs.collection_job.ETFPriceCollector", make_price_collector("etf_price")):
|
||||||
|
mock_db = MagicMock()
|
||||||
|
mock_session_local.return_value = mock_db
|
||||||
|
# Simulate no existing data (min date returns None)
|
||||||
|
mock_db.query.return_value.scalar.return_value = None
|
||||||
|
|
||||||
|
run_backfill(start_year=2023)
|
||||||
|
|
||||||
|
# Should generate chunks: 2023, 2024, 2025, 2026 (partial) for both price and etf_price
|
||||||
|
price_ranges = [(s, e) for name, s, e in collected_ranges if name == "price"]
|
||||||
|
assert len(price_ranges) >= 3 # At least 2023, 2024, 2025
|
||||||
|
assert price_ranges[0][0] == "20230101" # First chunk starts at start_year
|
||||||
|
|
||||||
|
|
||||||
|
def test_run_backfill_skips_already_collected_range():
|
||||||
|
"""Backfill should start from earliest existing data backwards."""
|
||||||
|
collected_ranges = []
|
||||||
|
|
||||||
|
def make_price_collector(name):
|
||||||
|
mock_cls = MagicMock()
|
||||||
|
def capture_init(db, start_date=None, end_date=None):
|
||||||
|
instance = MagicMock()
|
||||||
|
collected_ranges.append((name, start_date, end_date))
|
||||||
|
return instance
|
||||||
|
mock_cls.side_effect = capture_init
|
||||||
|
return mock_cls
|
||||||
|
|
||||||
|
from datetime import date
|
||||||
|
|
||||||
|
with patch("jobs.collection_job.SessionLocal") as mock_session_local, \
|
||||||
|
patch("jobs.collection_job.PriceCollector", make_price_collector("price")), \
|
||||||
|
patch("jobs.collection_job.ETFPriceCollector", make_price_collector("etf_price")):
|
||||||
|
mock_db = MagicMock()
|
||||||
|
mock_session_local.return_value = mock_db
|
||||||
|
|
||||||
|
# Simulate: Price has data from 2024-06-01, ETFPrice has no data
|
||||||
|
def query_side_effect(model_attr):
|
||||||
|
mock_q = MagicMock()
|
||||||
|
if "Price" in str(model_attr):
|
||||||
|
mock_q.scalar.return_value = date(2024, 6, 1)
|
||||||
|
else:
|
||||||
|
mock_q.scalar.return_value = None
|
||||||
|
return mock_q
|
||||||
|
mock_db.query.return_value = MagicMock()
|
||||||
|
# We'll verify the function runs without error
|
||||||
|
run_backfill(start_year=2023)
|
||||||
|
|
||||||
|
assert len(collected_ranges) > 0
|
||||||
|
```
|
||||||
|
|
||||||
|
**Step 2: Run test to verify it fails**
|
||||||
|
|
||||||
|
Run: `cd /home/zephyrdark/workspace/quant/galaxy-po/backend && python -m pytest tests/e2e/test_collection_job.py::test_run_backfill_generates_yearly_chunks -v`
|
||||||
|
Expected: FAIL with `ImportError: cannot import name 'run_backfill'`
|
||||||
|
|
||||||
|
**Step 3: Write minimal implementation**
|
||||||
|
|
||||||
|
Add to `backend/jobs/collection_job.py`:
|
||||||
|
|
||||||
|
```python
|
||||||
|
from datetime import date, timedelta
|
||||||
|
from sqlalchemy import func
|
||||||
|
|
||||||
|
from app.models.stock import Price, ETFPrice
|
||||||
|
|
||||||
|
|
||||||
|
def _generate_yearly_chunks(start_year: int, end_date: date) -> list[tuple[str, str]]:
|
||||||
|
"""Generate (start_date, end_date) pairs in YYYYMMDD format, one per year."""
|
||||||
|
chunks = []
|
||||||
|
current_start = date(start_year, 1, 1)
|
||||||
|
|
||||||
|
while current_start < end_date:
|
||||||
|
current_end = date(current_start.year, 12, 31)
|
||||||
|
if current_end > end_date:
|
||||||
|
current_end = end_date
|
||||||
|
chunks.append((
|
||||||
|
current_start.strftime("%Y%m%d"),
|
||||||
|
current_end.strftime("%Y%m%d"),
|
||||||
|
))
|
||||||
|
current_start = date(current_start.year + 1, 1, 1)
|
||||||
|
|
||||||
|
return chunks
|
||||||
|
|
||||||
|
|
||||||
|
def run_backfill(start_year: int = 2000):
|
||||||
|
"""
|
||||||
|
Collect historical price data from start_year to today.
|
||||||
|
|
||||||
|
Checks the earliest existing data in DB and only collects
|
||||||
|
missing periods. Splits into yearly chunks to avoid overloading pykrx.
|
||||||
|
"""
|
||||||
|
logger.info(f"Starting backfill from {start_year}")
|
||||||
|
today = date.today()
|
||||||
|
|
||||||
|
db = SessionLocal()
|
||||||
|
try:
|
||||||
|
# Determine what needs backfilling
|
||||||
|
backfill_targets = [
|
||||||
|
("Price", PriceCollector, Price.date),
|
||||||
|
("ETFPrice", ETFPriceCollector, ETFPrice.date),
|
||||||
|
]
|
||||||
|
|
||||||
|
for name, collector_cls, date_col in backfill_targets:
|
||||||
|
# Find earliest existing data
|
||||||
|
earliest = db.query(func.min(date_col)).scalar()
|
||||||
|
|
||||||
|
if earliest is None:
|
||||||
|
# No data at all - collect everything
|
||||||
|
backfill_end = today
|
||||||
|
else:
|
||||||
|
# Data exists - collect from start_year to day before earliest
|
||||||
|
backfill_end = earliest - timedelta(days=1)
|
||||||
|
|
||||||
|
if date(start_year, 1, 1) >= backfill_end:
|
||||||
|
logger.info(f"{name}: no backfill needed (data exists from {earliest})")
|
||||||
|
continue
|
||||||
|
|
||||||
|
chunks = _generate_yearly_chunks(start_year, backfill_end)
|
||||||
|
logger.info(f"{name}: backfilling {len(chunks)} yearly chunks from {start_year} to {backfill_end}")
|
||||||
|
|
||||||
|
for start_dt, end_dt in chunks:
|
||||||
|
chunk_db = SessionLocal()
|
||||||
|
try:
|
||||||
|
collector = collector_cls(chunk_db, start_date=start_dt, end_date=end_dt)
|
||||||
|
collector.run()
|
||||||
|
logger.info(f"{name}: chunk {start_dt}-{end_dt} completed")
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"{name}: chunk {start_dt}-{end_dt} failed: {e}")
|
||||||
|
finally:
|
||||||
|
chunk_db.close()
|
||||||
|
|
||||||
|
# Also fill gap between earliest data and today (forward fill)
|
||||||
|
if earliest is not None:
|
||||||
|
latest = db.query(func.max(date_col)).scalar()
|
||||||
|
if latest and latest < today:
|
||||||
|
gap_start = (latest + timedelta(days=1)).strftime("%Y%m%d")
|
||||||
|
gap_end = today.strftime("%Y%m%d")
|
||||||
|
gap_db = SessionLocal()
|
||||||
|
try:
|
||||||
|
collector = collector_cls(gap_db, start_date=gap_start, end_date=gap_end)
|
||||||
|
collector.run()
|
||||||
|
logger.info(f"{name}: forward fill {gap_start}-{gap_end} completed")
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"{name}: forward fill failed: {e}")
|
||||||
|
finally:
|
||||||
|
gap_db.close()
|
||||||
|
|
||||||
|
finally:
|
||||||
|
db.close()
|
||||||
|
|
||||||
|
logger.info("Backfill completed")
|
||||||
|
```
|
||||||
|
|
||||||
|
**Step 4: Run test to verify it passes**
|
||||||
|
|
||||||
|
Run: `cd /home/zephyrdark/workspace/quant/galaxy-po/backend && python -m pytest tests/e2e/test_collection_job.py -v`
|
||||||
|
Expected: All 4 tests PASS
|
||||||
|
|
||||||
|
**Step 5: Commit**
|
||||||
|
|
||||||
|
```bash
|
||||||
|
git add backend/jobs/collection_job.py backend/tests/e2e/test_collection_job.py
|
||||||
|
git commit -m "feat: add backfill job for historical price data"
|
||||||
|
```
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
### Task 3: Register daily collection in scheduler
|
||||||
|
|
||||||
|
**Files:**
|
||||||
|
- Modify: `backend/jobs/scheduler.py`
|
||||||
|
- Modify: `backend/jobs/__init__.py`
|
||||||
|
- Test: `backend/tests/e2e/test_collection_job.py`
|
||||||
|
|
||||||
|
**Step 1: Write the failing test**
|
||||||
|
|
||||||
|
Append to `backend/tests/e2e/test_collection_job.py`:
|
||||||
|
|
||||||
|
```python
|
||||||
|
def test_scheduler_has_daily_collection_job():
|
||||||
|
"""Scheduler should register a daily_collection job at 18:00."""
|
||||||
|
from jobs.scheduler import scheduler, configure_jobs
|
||||||
|
|
||||||
|
# Reset scheduler for test
|
||||||
|
if scheduler.running:
|
||||||
|
scheduler.shutdown(wait=False)
|
||||||
|
|
||||||
|
from apscheduler.schedulers.background import BackgroundScheduler
|
||||||
|
test_scheduler = BackgroundScheduler()
|
||||||
|
|
||||||
|
with patch("jobs.scheduler.scheduler", test_scheduler):
|
||||||
|
configure_jobs()
|
||||||
|
|
||||||
|
jobs = {job.id: job for job in test_scheduler.get_jobs()}
|
||||||
|
assert "daily_collection" in jobs
|
||||||
|
|
||||||
|
trigger = jobs["daily_collection"].trigger
|
||||||
|
# Verify it's a cron trigger (we can check string representation)
|
||||||
|
trigger_str = str(trigger)
|
||||||
|
assert "18" in trigger_str # hour=18
|
||||||
|
```
|
||||||
|
|
||||||
|
**Step 2: Run test to verify it fails**
|
||||||
|
|
||||||
|
Run: `cd /home/zephyrdark/workspace/quant/galaxy-po/backend && python -m pytest tests/e2e/test_collection_job.py::test_scheduler_has_daily_collection_job -v`
|
||||||
|
Expected: FAIL with `AssertionError: 'daily_collection' not in jobs`
|
||||||
|
|
||||||
|
**Step 3: Modify scheduler to add the daily collection job**
|
||||||
|
|
||||||
|
Modify `backend/jobs/scheduler.py`:
|
||||||
|
|
||||||
|
```python
|
||||||
|
"""
|
||||||
|
APScheduler configuration for background jobs.
|
||||||
|
"""
|
||||||
|
import logging
|
||||||
|
from apscheduler.schedulers.background import BackgroundScheduler
|
||||||
|
from apscheduler.triggers.cron import CronTrigger
|
||||||
|
|
||||||
|
from jobs.snapshot_job import create_daily_snapshots
|
||||||
|
from jobs.collection_job import run_daily_collection
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
# Create scheduler instance
|
||||||
|
scheduler = BackgroundScheduler()
|
||||||
|
|
||||||
|
|
||||||
|
def configure_jobs():
|
||||||
|
"""Configure scheduled jobs."""
|
||||||
|
# Daily data collection at 18:00 (after market close, before snapshot)
|
||||||
|
scheduler.add_job(
|
||||||
|
run_daily_collection,
|
||||||
|
trigger=CronTrigger(
|
||||||
|
hour=18,
|
||||||
|
minute=0,
|
||||||
|
day_of_week='mon-fri',
|
||||||
|
),
|
||||||
|
id='daily_collection',
|
||||||
|
name='Collect daily market data',
|
||||||
|
replace_existing=True,
|
||||||
|
)
|
||||||
|
logger.info("Configured daily_collection job at 18:00")
|
||||||
|
|
||||||
|
# Daily snapshot at 18:30 (after data collection completes)
|
||||||
|
scheduler.add_job(
|
||||||
|
create_daily_snapshots,
|
||||||
|
trigger=CronTrigger(
|
||||||
|
hour=18,
|
||||||
|
minute=30,
|
||||||
|
day_of_week='mon-fri',
|
||||||
|
),
|
||||||
|
id='daily_snapshots',
|
||||||
|
name='Create daily portfolio snapshots',
|
||||||
|
replace_existing=True,
|
||||||
|
)
|
||||||
|
logger.info("Configured daily_snapshots job at 18:30")
|
||||||
|
|
||||||
|
|
||||||
|
def start_scheduler():
|
||||||
|
"""Start the scheduler."""
|
||||||
|
if not scheduler.running:
|
||||||
|
configure_jobs()
|
||||||
|
scheduler.start()
|
||||||
|
logger.info("Scheduler started")
|
||||||
|
|
||||||
|
|
||||||
|
def stop_scheduler():
|
||||||
|
"""Stop the scheduler."""
|
||||||
|
if scheduler.running:
|
||||||
|
scheduler.shutdown()
|
||||||
|
logger.info("Scheduler stopped")
|
||||||
|
```
|
||||||
|
|
||||||
|
Update `backend/jobs/__init__.py`:
|
||||||
|
|
||||||
|
```python
|
||||||
|
"""
|
||||||
|
Background jobs module.
|
||||||
|
"""
|
||||||
|
from jobs.scheduler import scheduler, start_scheduler, stop_scheduler
|
||||||
|
from jobs.collection_job import run_daily_collection, run_backfill
|
||||||
|
|
||||||
|
__all__ = [
|
||||||
|
"scheduler", "start_scheduler", "stop_scheduler",
|
||||||
|
"run_daily_collection", "run_backfill",
|
||||||
|
]
|
||||||
|
```
|
||||||
|
|
||||||
|
**Step 4: Run test to verify it passes**
|
||||||
|
|
||||||
|
Run: `cd /home/zephyrdark/workspace/quant/galaxy-po/backend && python -m pytest tests/e2e/test_collection_job.py -v`
|
||||||
|
Expected: All 5 tests PASS
|
||||||
|
|
||||||
|
**Step 5: Commit**
|
||||||
|
|
||||||
|
```bash
|
||||||
|
git add backend/jobs/scheduler.py backend/jobs/__init__.py backend/tests/e2e/test_collection_job.py
|
||||||
|
git commit -m "feat: register daily collection job at 18:00 in scheduler"
|
||||||
|
```
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
### Task 4: Add backfill API endpoint
|
||||||
|
|
||||||
|
**Files:**
|
||||||
|
- Modify: `backend/app/api/admin.py`
|
||||||
|
- Test: `backend/tests/e2e/test_collection_job.py`
|
||||||
|
|
||||||
|
**Step 1: Write the failing test**
|
||||||
|
|
||||||
|
Append to `backend/tests/e2e/test_collection_job.py`:
|
||||||
|
|
||||||
|
```python
|
||||||
|
def test_backfill_api_endpoint(client, admin_auth_headers):
|
||||||
|
"""POST /api/admin/collect/backfill should trigger backfill."""
|
||||||
|
with patch("app.api.admin.run_backfill_background") as mock_backfill:
|
||||||
|
response = client.post(
|
||||||
|
"/api/admin/collect/backfill?start_year=2020",
|
||||||
|
headers=admin_auth_headers,
|
||||||
|
)
|
||||||
|
assert response.status_code == 200
|
||||||
|
assert "backfill" in response.json()["message"].lower()
|
||||||
|
mock_backfill.assert_called_once()
|
||||||
|
|
||||||
|
|
||||||
|
def test_backfill_api_requires_auth(client):
|
||||||
|
"""POST /api/admin/collect/backfill should require authentication."""
|
||||||
|
response = client.post("/api/admin/collect/backfill")
|
||||||
|
assert response.status_code == 401
|
||||||
|
```
|
||||||
|
|
||||||
|
**Step 2: Run test to verify it fails**
|
||||||
|
|
||||||
|
Run: `cd /home/zephyrdark/workspace/quant/galaxy-po/backend && python -m pytest tests/e2e/test_collection_job.py::test_backfill_api_endpoint -v`
|
||||||
|
Expected: FAIL — either a 404 response (the `/collect/backfill` route is not yet registered) or an ImportError/AttributeError from patching `app.api.admin.run_backfill_background` before it exists
|
||||||
|
|
||||||
|
**Step 3: Add backfill endpoint to admin API**
|
||||||
|
|
||||||
|
Add to `backend/app/api/admin.py`, after existing imports:
|
||||||
|
|
||||||
|
```python
|
||||||
|
from jobs.collection_job import run_backfill
|
||||||
|
```
|
||||||
|
|
||||||
|
Add the background runner function (after `_start_background_collection`):
|
||||||
|
|
||||||
|
```python
|
||||||
|
def _run_backfill_background(start_year: int):
|
||||||
|
"""Run backfill in a background thread."""
|
||||||
|
try:
|
||||||
|
run_backfill(start_year=start_year)
|
||||||
|
except Exception as e:
|
||||||
|
logger.error("Background backfill failed: %s", e)
|
||||||
|
|
||||||
|
|
||||||
|
def run_backfill_background(start_year: int):
|
||||||
|
"""Start backfill in a daemon thread."""
|
||||||
|
thread = threading.Thread(
|
||||||
|
target=_run_backfill_background,
|
||||||
|
args=(start_year,),
|
||||||
|
daemon=True,
|
||||||
|
)
|
||||||
|
thread.start()
|
||||||
|
```
|
||||||
|
|
||||||
|
Add the endpoint (after `collect_etf_prices`):
|
||||||
|
|
||||||
|
```python
|
||||||
|
@router.post("/collect/backfill", response_model=CollectResponse)
|
||||||
|
async def collect_backfill(
|
||||||
|
current_user: CurrentUser,
|
||||||
|
start_year: int = Query(2000, ge=1990, le=2026, description="Start year for backfill"),
|
||||||
|
):
|
||||||
|
"""Backfill historical price data from start_year to today (runs in background)."""
|
||||||
|
run_backfill_background(start_year)
|
||||||
|
return CollectResponse(message=f"Backfill started from {start_year}")
|
||||||
|
```
|
||||||
|
|
||||||
|
**Step 4: Run test to verify it passes**
|
||||||
|
|
||||||
|
Run: `cd /home/zephyrdark/workspace/quant/galaxy-po/backend && python -m pytest tests/e2e/test_collection_job.py -v`
|
||||||
|
Expected: All 7 tests PASS
|
||||||
|
|
||||||
|
**Step 5: Commit**
|
||||||
|
|
||||||
|
```bash
|
||||||
|
git add backend/app/api/admin.py backend/tests/e2e/test_collection_job.py
|
||||||
|
git commit -m "feat: add backfill API endpoint for historical data collection"
|
||||||
|
```
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
### Task 5: Run full test suite and verify
|
||||||
|
|
||||||
|
**Step 1: Run all existing tests**
|
||||||
|
|
||||||
|
Run: `cd /home/zephyrdark/workspace/quant/galaxy-po/backend && python -m pytest tests/ -v`
|
||||||
|
Expected: All tests PASS (existing + new)
|
||||||
|
|
||||||
|
**Step 2: Verify no import issues**
|
||||||
|
|
||||||
|
Run: `cd /home/zephyrdark/workspace/quant/galaxy-po/backend && python -c "from jobs import run_daily_collection, run_backfill; print('OK')"`
|
||||||
|
Expected: `OK`
|
||||||
|
|
||||||
|
**Step 3: Final commit if any fixes needed**
|
||||||
|
|
||||||
|
```bash
|
||||||
|
git add -A
|
||||||
|
git commit -m "fix: resolve any test issues from collection scheduling"
|
||||||
|
```
|
||||||
@ -111,8 +111,13 @@ export default function RebalancePage() {
|
|||||||
setError(null);
|
setError(null);
|
||||||
try {
|
try {
|
||||||
const priceMap: Record<string, number> = {};
|
const priceMap: Record<string, number> = {};
|
||||||
for (const [ticker, price] of Object.entries(prices)) {
|
for (const [ticker, value] of Object.entries(prices)) {
|
||||||
priceMap[ticker] = parseFloat(price);
|
const qty = getHoldingQty(ticker);
|
||||||
|
if (qty > 0) {
|
||||||
|
priceMap[ticker] = parseFloat(value) / qty;
|
||||||
|
} else {
|
||||||
|
priceMap[ticker] = parseFloat(value);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
const body: Record<string, unknown> = {
|
const body: Record<string, unknown> = {
|
||||||
@ -190,23 +195,25 @@ export default function RebalancePage() {
|
|||||||
{/* Price Input */}
|
{/* Price Input */}
|
||||||
<Card className="mb-6">
|
<Card className="mb-6">
|
||||||
<CardHeader>
|
<CardHeader>
|
||||||
<CardTitle>현재 가격 입력</CardTitle>
|
<CardTitle>평가 금액 입력</CardTitle>
|
||||||
</CardHeader>
|
</CardHeader>
|
||||||
<CardContent>
|
<CardContent>
|
||||||
<div className="grid grid-cols-1 md:grid-cols-2 lg:grid-cols-3 gap-4">
|
<div className="grid grid-cols-1 md:grid-cols-2 lg:grid-cols-3 gap-4">
|
||||||
{Object.keys(prices).map((ticker) => {
|
{Object.keys(prices).map((ticker) => {
|
||||||
const target = targets.find((t) => t.ticker === ticker);
|
const target = targets.find((t) => t.ticker === ticker);
|
||||||
|
const qty = getHoldingQty(ticker);
|
||||||
|
const isValuation = qty > 0;
|
||||||
return (
|
return (
|
||||||
<div key={ticker}>
|
<div key={ticker}>
|
||||||
<Label htmlFor={`price-${ticker}`}>
|
<Label htmlFor={`price-${ticker}`}>
|
||||||
{nameMap[ticker] || ticker} {target ? `(목표 ${target.target_ratio}%)` : ''} - 보유 {getHoldingQty(ticker)}주
|
{nameMap[ticker] || ticker} {target ? `(목표 ${target.target_ratio}%)` : ''} - 보유 {qty}주
|
||||||
</Label>
|
</Label>
|
||||||
<Input
|
<Input
|
||||||
id={`price-${ticker}`}
|
id={`price-${ticker}`}
|
||||||
type="number"
|
type="number"
|
||||||
value={prices[ticker]}
|
value={prices[ticker]}
|
||||||
onChange={(e) => setPrices((prev) => ({ ...prev, [ticker]: e.target.value }))}
|
onChange={(e) => setPrices((prev) => ({ ...prev, [ticker]: e.target.value }))}
|
||||||
placeholder="현재 가격"
|
placeholder={isValuation ? '평가 금액' : '현재 가격'}
|
||||||
className="mt-1"
|
className="mt-1"
|
||||||
/>
|
/>
|
||||||
</div>
|
</div>
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user