galaxis-agent/agent/webapp.py
머니페니 7e95aeb8ce feat: integrate Discord bot and Dispatcher in FastAPI lifespan
Adds asynccontextmanager lifespan to webapp.py that:
- Initializes task_queue and message_store on startup
- Starts Dispatcher as background task
- Starts Discord bot gateway if DISCORD_TOKEN is set
- Properly shuts down all resources on application exit

This completes Phase 3 Task 6, enabling the webapp to run Discord bot
and dispatcher concurrently with the webhook server.
2026-03-20 18:21:59 +09:00

182 lines
5.9 KiB
Python

"""galaxis-agent webhook server."""
import asyncio
import hashlib
import hmac
import json
import logging
import os
import re
from contextlib import asynccontextmanager
from fastapi import FastAPI, Request, HTTPException
from slowapi import Limiter
from slowapi.util import get_remote_address
from slowapi.errors import RateLimitExceeded
from slowapi.middleware import SlowAPIMiddleware
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
# Seconds to let the Discord task finish on its own after the handler is closed
# before it is cancelled.
_DISCORD_SHUTDOWN_GRACE_SECONDS = 5.0


def _log_discord_task_result(task):
    """Log the exception, if any, that ended the Discord background task."""
    if task.cancelled():
        return
    exc = task.exception()
    if exc is not None:
        logger.error("Discord bot task exited with an error", exc_info=exc)


async def _stop_discord(handler, task):
    """Close the Discord handler and make sure its background task has ended."""
    try:
        await handler.close()
    finally:
        if not task.done():
            # Give the task a moment to return by itself before cancelling it.
            await asyncio.wait({task}, timeout=_DISCORD_SHUTDOWN_GRACE_SECONDS)
        if not task.done():
            task.cancel()
        # Collect the outcome so no exception is left unretrieved.
        await asyncio.gather(task, return_exceptions=True)


async def _run_shutdown_steps(steps):
    """Await each ``(label, coroutine function)`` in order, logging failures."""
    for label, step in steps:
        try:
            await step()
        except Exception:
            # Keep going: one failing step must not leave later resources open.
            logger.exception("Shutdown step failed: %s", label)


@asynccontextmanager
async def lifespan(app: FastAPI):
    """Manage resources at application startup and shutdown.

    Startup obtains the task queue and message store, starts the Dispatcher
    (stored on ``app.state.dispatcher``) and, when the ``DISCORD_TOKEN``
    environment variable is non-empty, starts the Discord handler in a
    background task (handler stored on ``app.state.discord_handler``).

    Shutdown runs in this order: dispatcher stop, Discord handler close,
    task queue close, message store close. Shutdown also runs when startup
    fails part-way through, for whatever was already acquired. A failing
    shutdown step is logged and the remaining steps still run.
    """
    from agent.task_queue import get_task_queue
    from agent.message_store import get_message_store
    from agent.dispatcher import Dispatcher
    from agent.integrations.discord_handler import DiscordHandler

    task_queue = None
    message_store = None
    dispatcher = None
    dispatcher_started = False
    discord_handler = None
    discord_task = None

    try:
        task_queue = await get_task_queue()
        message_store = await get_message_store()

        dispatcher = Dispatcher(task_queue=task_queue)
        await dispatcher.start()
        dispatcher_started = True
        app.state.dispatcher = dispatcher

        discord_token = os.environ.get("DISCORD_TOKEN", "")
        if discord_token:
            discord_handler = DiscordHandler()
            discord_task = asyncio.create_task(discord_handler.start(discord_token))
            # Surface a crash of the bot instead of losing it silently.
            discord_task.add_done_callback(_log_discord_task_result)
            app.state.discord_handler = discord_handler
            logger.info("Discord bot starting...")

        yield
    finally:
        steps = []
        if dispatcher_started:
            steps.append(("dispatcher", dispatcher.stop))
        if discord_handler is not None and discord_task is not None:
            steps.append(
                ("discord", lambda: _stop_discord(discord_handler, discord_task))
            )
        if task_queue is not None:
            steps.append(("task_queue", task_queue.close))
        if message_store is not None:
            steps.append(("message_store", message_store.close))
        await _run_shutdown_steps(steps)
        logger.info("Application shutdown complete")
# Rate limiter keyed on the value returned by `get_remote_address`.
limiter = Limiter(key_func=get_remote_address)

# The ASGI application; startup and shutdown are driven by `lifespan`.
app = FastAPI(title="galaxis-agent", lifespan=lifespan)
app.add_middleware(SlowAPIMiddleware)
# Expose the limiter on application state.
app.state.limiter = limiter
def verify_gitea_signature(payload: bytes, signature: str, secret: str) -> bool:
    """Verify a Gitea webhook HMAC-SHA256 signature.

    Args:
        payload: Raw request body exactly as received.
        signature: Hex digest supplied by the sender.
        secret: Shared webhook secret.

    Returns:
        True only when ``signature`` equals the hex HMAC-SHA256 of ``payload``
        keyed with ``secret``. An empty secret or empty signature always
        yields False.
    """
    # Fail closed: with an empty key anyone can compute a matching digest.
    if not secret or not signature:
        return False
    expected = hmac.new(secret.encode(), payload, hashlib.sha256).hexdigest()
    # Compare as bytes: compare_digest raises TypeError for non-ASCII str input.
    return hmac.compare_digest(expected.encode(), signature.encode())
def generate_thread_id(repo: str, issue_id: int) -> str:
    """Derive a deterministic, UUID-shaped thread ID for an issue.

    The ID is built from the first 32 hex characters of the SHA-256 digest of
    ``"gitea-issue:{repo}:{issue_id}"``, grouped as 8-4-4-4-12.
    """
    seed = f"gitea-issue:{repo}:{issue_id}".encode()
    digest = hashlib.sha256(seed).hexdigest()
    # Consecutive boundaries of the 8-4-4-4-12 groups.
    bounds = (0, 8, 12, 16, 20, 32)
    return "-".join(digest[lo:hi] for lo, hi in zip(bounds, bounds[1:]))
# An "@agent" mention as a whole word, matched case-insensitively. The same
# pattern is used to detect the mention and to strip it from the message.
_AGENT_MENTION_RE = re.compile(r"@agent\b", re.IGNORECASE)


def _dict_or_empty(value):
    """Return *value* when it is a dict, otherwise an empty dict."""
    return value if isinstance(value, dict) else {}


def parse_gitea_event(event_type: str, payload: dict) -> dict:
    """Parse a Gitea webhook payload and decide whether it should be processed.

    Handled events:
      * ``issue_comment`` with action ``created`` whose body mentions
        ``@agent``; the mention is removed from the resulting message.
      * ``issues`` whose ``label.name`` is ``agent-fix``.
      * ``pull_request`` with action ``review_requested``.

    Missing or ``null`` fields are treated as empty values.

    Args:
        event_type: Event name (the webhook passes the ``X-Gitea-Event`` header).
        payload: Decoded JSON body of the webhook.

    Returns:
        A dict with keys ``should_process``, ``issue_number``, ``repo_name``,
        ``repo_owner``, ``message``, ``event_type`` and ``title``.
        ``should_process`` is False for anything not listed above.
    """
    payload = _dict_or_empty(payload)
    repo = _dict_or_empty(payload.get("repository"))
    repo_name = repo.get("name") or ""
    full_name = repo.get("full_name") or ""
    repo_owner = full_name.split("/")[0] if "/" in full_name else ""

    base = {
        "should_process": False,
        "issue_number": 0,
        "repo_name": repo_name,
        "repo_owner": repo_owner,
        "message": "",
        "event_type": event_type,
        "title": "",
    }

    if event_type == "issue_comment":
        if payload.get("action", "") != "created":
            return base
        comment_body = _dict_or_empty(payload.get("comment")).get("body") or ""
        # Detect with the same word-boundary pattern used for stripping, so
        # "@agents" or "@agentsmith" no longer counts as a mention.
        if not _AGENT_MENTION_RE.search(comment_body):
            return base
        issue = _dict_or_empty(payload.get("issue"))
        base.update({
            "should_process": True,
            "issue_number": issue.get("number") or 0,
            "message": _AGENT_MENTION_RE.sub("", comment_body).strip(),
            "title": issue.get("title") or "",
        })
        return base

    if event_type == "issues":
        label = _dict_or_empty(payload.get("label"))
        if label.get("name") == "agent-fix":
            issue = _dict_or_empty(payload.get("issue"))
            base.update({
                "should_process": True,
                "issue_number": issue.get("number") or 0,
                "message": issue.get("body") or "",
                "title": issue.get("title") or "",
            })
        return base

    if event_type == "pull_request":
        if payload.get("action", "") == "review_requested":
            pr = _dict_or_empty(payload.get("pull_request"))
            base.update({
                "should_process": True,
                "issue_number": pr.get("number") or 0,
                "message": pr.get("body") or "",
                "title": pr.get("title") or "",
            })
        return base

    return base
@app.get("/health")
async def health():
    """Health-check endpoint; always reports an ``ok`` status."""
    body = {"status": "ok"}
    return body
@app.post("/webhooks/gitea")
@limiter.limit("10/minute")
async def gitea_webhook(request: Request):
    """Gitea webhook endpoint: verify, parse, then dispatch the event.

    Flow:
      1. Verify the ``X-Gitea-Signature`` header against the raw body using
         the ``GITEA_WEBHOOK_SECRET`` environment variable.
      2. Decode the JSON body and parse it with ``parse_gitea_event``.
      3. If a task is already running for the thread, push the message to the
         message store; otherwise enqueue a new task.

    Returns:
        ``{"status": "ignored"}``, or a dict with status ``queued_message`` /
        ``enqueued`` plus the ``thread_id`` (and ``task_id`` when enqueued).

    Raises:
        HTTPException: 401 when the secret is not configured or the signature
            is missing or invalid; 400 when the body is not a JSON object.
    """
    payload_bytes = await request.body()
    signature = request.headers.get("X-Gitea-Signature", "")
    secret = os.environ.get("GITEA_WEBHOOK_SECRET", "")

    if not secret:
        # Fail closed: an HMAC keyed with an empty secret can be forged by anyone.
        logger.error("GITEA_WEBHOOK_SECRET is not set; rejecting webhook request")
        raise HTTPException(status_code=401, detail="Invalid signature")
    # A hex digest is ASCII; rejecting other input up front also keeps the
    # constant-time comparison from raising on non-ASCII text.
    if not signature or not signature.isascii():
        raise HTTPException(status_code=401, detail="Invalid signature")
    if not verify_gitea_signature(payload_bytes, signature, secret):
        raise HTTPException(status_code=401, detail="Invalid signature")

    try:
        payload = json.loads(payload_bytes)
    except ValueError as exc:
        # Covers json.JSONDecodeError and UnicodeDecodeError (both ValueError).
        raise HTTPException(status_code=400, detail="Invalid JSON payload") from exc
    if not isinstance(payload, dict):
        raise HTTPException(status_code=400, detail="Payload must be a JSON object")

    event_type = request.headers.get("X-Gitea-Event", "")
    event = parse_gitea_event(event_type, payload)
    if not event["should_process"]:
        return {"status": "ignored"}

    # NOTE(review): the thread ID is derived from the short repository name
    # only, so same-named repositories under different owners share thread
    # IDs. Left unchanged because switching the key would alter existing IDs.
    thread_id = generate_thread_id(event["repo_name"], event["issue_number"])

    from agent.message_store import get_message_store
    from agent.task_queue import get_task_queue

    task_queue = await get_task_queue()

    if await task_queue.has_running_task(thread_id):
        # A task is already running for this thread: hand the new message to it.
        store = await get_message_store()
        await store.push_message(thread_id, {
            "role": "human",
            "content": event["message"],
        })
        return {"status": "queued_message", "thread_id": thread_id}

    task_id = await task_queue.enqueue(
        thread_id=thread_id,
        source="gitea",
        payload={
            "issue_number": event["issue_number"],
            "repo_owner": event["repo_owner"],
            "repo_name": event["repo_name"],
            "message": event["message"],
            "title": event["title"],
            "event_type": event["event_type"],
        },
    )
    return {"status": "enqueued", "task_id": task_id, "thread_id": thread_id}