feat: add base collector infrastructure for data collection jobs

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
zephyrdark 2026-02-02 23:34:41 +09:00
parent 61e2748e9d
commit 3abbdfa5b6
3 changed files with 71 additions and 0 deletions

View File

@ -0,0 +1,3 @@
from app.services.collectors import BaseCollector
__all__ = ["BaseCollector"]

View File

@ -0,0 +1,3 @@
from app.services.collectors.base import BaseCollector
__all__ = ["BaseCollector"]

View File

@ -0,0 +1,65 @@
"""
Base collector class for data collection jobs.
"""
from abc import ABC, abstractmethod
from datetime import datetime
from typing import Optional
from sqlalchemy.orm import Session
from app.models.stock import JobLog
class BaseCollector(ABC):
"""Base class for all data collectors."""
def __init__(self, db: Session):
self.db = db
self.job_name = self.__class__.__name__
self.job_log: Optional[JobLog] = None
def start_job(self) -> JobLog:
"""Create a job log entry when starting."""
self.job_log = JobLog(
job_name=self.job_name,
status="running",
started_at=datetime.utcnow(),
)
self.db.add(self.job_log)
self.db.commit()
return self.job_log
def complete_job(self, records_count: int):
"""Mark job as completed."""
if self.job_log:
self.job_log.status = "success"
self.job_log.finished_at = datetime.utcnow()
self.job_log.records_count = records_count
self.db.commit()
def fail_job(self, error_msg: str):
"""Mark job as failed."""
if self.job_log:
self.job_log.status = "failed"
self.job_log.finished_at = datetime.utcnow()
self.job_log.error_msg = error_msg
self.db.commit()
@abstractmethod
def collect(self) -> int:
"""
Perform the data collection.
Returns the number of records collected.
"""
pass
def run(self) -> JobLog:
"""Execute the collection job with logging."""
self.start_job()
try:
records = self.collect()
self.complete_job(records)
except Exception as e:
self.fail_job(str(e))
raise
return self.job_log