"""galaxis-agent webhook server.""" import asyncio import hashlib import hmac import json import logging import os import re from contextlib import asynccontextmanager from fastapi import FastAPI, Request, HTTPException from slowapi import Limiter from slowapi.util import get_remote_address from slowapi.errors import RateLimitExceeded from slowapi.middleware import SlowAPIMiddleware logger = logging.getLogger(__name__) @asynccontextmanager async def lifespan(app: FastAPI): """애플리케이션 시작/종료 시 리소스를 관리한다.""" from agent.task_queue import get_task_queue from agent.message_store import get_message_store from agent.dispatcher import Dispatcher from agent.integrations.discord_handler import DiscordHandler from agent.json_logging import setup_logging from agent.recovery import recover_on_startup, ContainerCleaner from agent.cost_guard import get_cost_guard from agent.task_history import get_task_history # 구조화 로깅 설정 setup_logging(log_format=os.environ.get("LOG_FORMAT", "json")) task_queue = await get_task_queue() message_store = await get_message_store() # 서버 시작 시 복구 await recover_on_startup(task_queue) # CostGuard + TaskHistory 초기화 cost_guard = await get_cost_guard() task_history = await get_task_history() # Dispatcher에 CostGuard + TaskHistory 주입 dispatcher = Dispatcher(task_queue=task_queue, cost_guard=cost_guard, task_history=task_history) await dispatcher.start() app.state.dispatcher = dispatcher # ContainerCleaner 시작 container_cleaner = None try: import docker docker_client = docker.from_env() sandbox_timeout = int(os.environ.get("SANDBOX_TIMEOUT", "600")) container_cleaner = ContainerCleaner( docker_client=docker_client, max_age_seconds=sandbox_timeout * 2, ) await container_cleaner.start() except Exception: logger.debug("Docker not available, container cleanup disabled") discord_token = os.environ.get("DISCORD_TOKEN", "") discord_handler = None if discord_token: discord_handler = DiscordHandler() discord_task = asyncio.create_task(discord_handler.start(discord_token)) app.state.discord_handler = discord_handler logger.info("Discord bot starting...") yield await dispatcher.stop() if container_cleaner: await container_cleaner.stop() if discord_handler: await discord_handler.close() await cost_guard.close() await task_history.close() await task_queue.close() await message_store.close() logger.info("Application shutdown complete") app = FastAPI(title="galaxis-agent", lifespan=lifespan) limiter = Limiter(key_func=get_remote_address) app.state.limiter = limiter app.add_middleware(SlowAPIMiddleware) def verify_gitea_signature(payload: bytes, signature: str, secret: str) -> bool: """Verify Gitea webhook HMAC-SHA256 signature.""" expected = hmac.new(secret.encode(), payload, hashlib.sha256).hexdigest() return hmac.compare_digest(expected, signature) def generate_thread_id(repo: str, issue_id: int) -> str: """Generate deterministic thread ID from issue.""" raw = hashlib.sha256(f"gitea-issue:{repo}:{issue_id}".encode()).hexdigest() return f"{raw[:8]}-{raw[8:12]}-{raw[12:16]}-{raw[16:20]}-{raw[20:32]}" def parse_gitea_event(event_type: str, payload: dict) -> dict: """Gitea webhook 페이로드를 파싱하여 처리 대상인지 판단한다.""" repo = payload.get("repository", {}) repo_name = repo.get("name", "") full_name = repo.get("full_name", "") repo_owner = full_name.split("/")[0] if "/" in full_name else "" base = { "should_process": False, "issue_number": 0, "repo_name": repo_name, "repo_owner": repo_owner, "message": "", "event_type": event_type, "title": "", } if event_type == "issue_comment": action = payload.get("action", "") if action != "created": return base comment_body = payload.get("comment", {}).get("body", "") issue = payload.get("issue", {}) if "@agent" not in comment_body.lower(): return base message = re.sub(r"@agent\b", "", comment_body, flags=re.IGNORECASE).strip() base.update({ "should_process": True, "issue_number": issue.get("number", 0), "message": message, "title": issue.get("title", ""), }) return base if event_type == "issues": label = payload.get("label", {}) if label.get("name") == "agent-fix": issue = payload.get("issue", {}) base.update({ "should_process": True, "issue_number": issue.get("number", 0), "message": issue.get("body", ""), "title": issue.get("title", ""), }) return base if event_type == "pull_request": action = payload.get("action", "") if action == "review_requested": pr = payload.get("pull_request", {}) base.update({ "should_process": True, "issue_number": pr.get("number", 0), "message": pr.get("body", ""), "title": pr.get("title", ""), }) return base return base @app.get("/health") async def health(): return {"status": "ok"} @app.get("/health/gitea") async def health_gitea(): """Gitea 연결 상태를 확인한다.""" try: from agent.utils.gitea_client import get_gitea_client client = get_gitea_client() resp = await client._client.get("/settings/api") return {"status": "ok", "gitea_status_code": resp.status_code} except Exception as e: return {"status": "error", "error": str(e)} @app.get("/health/discord") async def health_discord(request: Request): """Discord 봇 연결 상태를 확인한다.""" handler = getattr(request.app.state, "discord_handler", None) if not handler: return {"status": "not_configured"} bot = handler.bot if bot.is_ready(): return {"status": "ok", "user": str(bot.user)} return {"status": "connecting"} @app.get("/health/queue") async def health_queue(): """작업 큐 상태를 반환한다.""" from agent.task_queue import get_task_queue task_queue = await get_task_queue() pending = await task_queue.get_pending() return { "status": "ok", "pending_tasks": len(pending), } @app.get("/health/costs") async def health_costs(): """API 비용 현황을 반환한다.""" from agent.cost_guard import get_cost_guard guard = await get_cost_guard() return await guard.get_daily_summary() @app.post("/webhooks/gitea") @limiter.limit("10/minute") async def gitea_webhook(request: Request): """Gitea webhook endpoint with event parsing and task dispatch.""" payload_bytes = await request.body() signature = request.headers.get("X-Gitea-Signature", "") secret = os.environ.get("GITEA_WEBHOOK_SECRET", "") if not verify_gitea_signature(payload_bytes, signature, secret): raise HTTPException(status_code=401, detail="Invalid signature") payload = json.loads(payload_bytes) event_type = request.headers.get("X-Gitea-Event", "") event = parse_gitea_event(event_type, payload) if not event["should_process"]: return {"status": "ignored"} thread_id = generate_thread_id(event["repo_name"], event["issue_number"]) from agent.message_store import get_message_store from agent.task_queue import get_task_queue task_queue = await get_task_queue() if await task_queue.has_running_task(thread_id): store = await get_message_store() await store.push_message(thread_id, { "role": "human", "content": event["message"], }) return {"status": "queued_message", "thread_id": thread_id} task_id = await task_queue.enqueue( thread_id=thread_id, source="gitea", payload={ "issue_number": event["issue_number"], "repo_owner": event["repo_owner"], "repo_name": event["repo_name"], "message": event["message"], "title": event["title"], "event_type": event["event_type"], }, ) return {"status": "enqueued", "task_id": task_id, "thread_id": thread_id}