41 lines
1.3 KiB
Python
41 lines
1.3 KiB
Python
"""Gitea 이슈/PR 코멘트 작성 도구."""
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import logging
|
|
import os
|
|
from typing import Any
|
|
|
|
from agent.utils.gitea_client import get_gitea_client
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
def _get_repo_info() -> tuple[str, str]:
|
|
owner = os.environ.get("DEFAULT_REPO_OWNER", "quant")
|
|
repo = os.environ.get("DEFAULT_REPO_NAME", "galaxis-po")
|
|
return owner, repo
|
|
|
|
|
|
def gitea_comment(message: str, issue_number: int) -> dict[str, Any]:
|
|
if not issue_number or issue_number <= 0:
|
|
return {"success": False, "error": "유효한 issue_number가 필요합니다."}
|
|
|
|
if not message.strip():
|
|
return {"success": False, "error": "빈 메시지는 작성할 수 없습니다."}
|
|
|
|
owner, repo = _get_repo_info()
|
|
client = get_gitea_client()
|
|
|
|
try:
|
|
result = asyncio.run(
|
|
client.create_issue_comment(
|
|
owner=owner, repo=repo, issue_number=issue_number, body=message
|
|
)
|
|
)
|
|
logger.info("Posted comment on %s/%s#%d", owner, repo, issue_number)
|
|
return {"success": True, "comment_id": result.get("id")}
|
|
except Exception as e:
|
|
logger.exception("Failed to post comment on %s/%s#%d", owner, repo, issue_number)
|
|
return {"success": False, "error": str(e)}
|