fix: replace deprecated datetime.utcnow() and SQLAlchemy Query.get()

This commit is contained in:
머니페니 2026-03-18 18:53:29 +09:00
parent eb06dfc48b
commit 213f03a8e5
4 changed files with 14 additions and 14 deletions

View File

@ -373,7 +373,7 @@ async def apply_rebalance(
db: Session = Depends(get_db),
):
"""리밸런싱 결과를 적용하여 거래를 일괄 생성한다."""
from datetime import datetime
from datetime import datetime, timezone
portfolio = _get_portfolio(db, portfolio_id, current_user.id)
transactions = []
@ -387,7 +387,7 @@ async def apply_rebalance(
tx_type=tx_type,
quantity=item.quantity,
price=item.price,
executed_at=datetime.utcnow(),
executed_at=datetime.now(timezone.utc),
memo="리밸런싱 적용",
)
db.add(transaction)

View File

@ -1,7 +1,7 @@
"""
KJB Signal API endpoints.
"""
from datetime import date, datetime
from datetime import date, datetime, timezone
from typing import List, Optional
from fastapi import APIRouter, Depends, HTTPException, Query
@ -91,7 +91,7 @@ async def execute_signal(
tx_type=tx_type,
quantity=data.quantity,
price=data.price,
executed_at=datetime.utcnow(),
executed_at=datetime.now(timezone.utc),
memo=f"KJB signal #{signal.id}: {signal.signal_type.value}",
)
db.add(transaction)
@ -130,7 +130,7 @@ async def execute_signal(
signal.status = SignalStatus.EXECUTED
signal.executed_price = data.price
signal.executed_quantity = data.quantity
signal.executed_at = datetime.utcnow()
signal.executed_at = datetime.now(timezone.utc)
db.commit()
db.refresh(transaction)

View File

@ -1,7 +1,7 @@
"""
Background worker for backtest execution.
"""
from datetime import datetime
from datetime import datetime, timezone
from concurrent.futures import ThreadPoolExecutor
import logging
@ -35,7 +35,7 @@ def _run_backtest_job(backtest_id: int) -> None:
try:
# Update status to running
backtest = db.query(Backtest).get(backtest_id)
backtest = db.get(Backtest, backtest_id)
if not backtest:
logger.error(f"Backtest {backtest_id} not found")
return
@ -54,7 +54,7 @@ def _run_backtest_job(backtest_id: int) -> None:
# Update status to completed
backtest.status = BacktestStatus.COMPLETED
backtest.completed_at = datetime.utcnow()
backtest.completed_at = datetime.now(timezone.utc)
db.commit()
logger.info(f"Backtest {backtest_id} completed successfully")
@ -63,11 +63,11 @@ def _run_backtest_job(backtest_id: int) -> None:
# Update status to failed
try:
backtest = db.query(Backtest).get(backtest_id)
backtest = db.get(Backtest, backtest_id)
if backtest:
backtest.status = BacktestStatus.FAILED
backtest.error_message = str(e)[:1000] # Limit error message length
backtest.completed_at = datetime.utcnow()
backtest.completed_at = datetime.now(timezone.utc)
db.commit()
except Exception as commit_error:
logger.exception(f"Failed to update backtest status: {commit_error}")

View File

@ -4,7 +4,7 @@ Base collector class for data collection jobs.
import logging
import re
from abc import ABC, abstractmethod
from datetime import datetime
from datetime import datetime, timezone
from typing import Optional
import requests
@ -44,7 +44,7 @@ class BaseCollector(ABC):
self.job_log = JobLog(
job_name=self.job_name,
status="running",
started_at=datetime.utcnow(),
started_at=datetime.now(timezone.utc),
)
self.db.add(self.job_log)
self.db.commit()
@ -55,7 +55,7 @@ class BaseCollector(ABC):
if self.job_log:
try:
self.job_log.status = "success"
self.job_log.finished_at = datetime.utcnow()
self.job_log.finished_at = datetime.now(timezone.utc)
self.job_log.records_count = records_count
self.db.commit()
except Exception:
@ -67,7 +67,7 @@ class BaseCollector(ABC):
if self.job_log:
try:
self.job_log.status = "failed"
self.job_log.finished_at = datetime.utcnow()
self.job_log.finished_at = datetime.now(timezone.utc)
self.job_log.error_msg = error_msg
self.db.commit()
except Exception: