galaxis-agent/agent/webapp.py

43 lines
1.3 KiB
Python

"""galaxis-agent webhook server."""
import hashlib
import hmac
import logging
from fastapi import FastAPI, Request, HTTPException
logger = logging.getLogger(__name__)
app = FastAPI(title="galaxis-agent")
def verify_gitea_signature(payload: bytes, signature: str, secret: str) -> bool:
"""Verify Gitea webhook HMAC-SHA256 signature."""
expected = hmac.new(secret.encode(), payload, hashlib.sha256).hexdigest()
return hmac.compare_digest(expected, signature)
def generate_thread_id(repo: str, issue_id: int) -> str:
"""Generate deterministic thread ID from issue."""
raw = hashlib.sha256(f"gitea-issue:{repo}:{issue_id}".encode()).hexdigest()
return f"{raw[:8]}-{raw[8:12]}-{raw[12:16]}-{raw[16:20]}-{raw[20:32]}"
@app.get("/health")
async def health():
return {"status": "ok"}
@app.post("/webhooks/gitea")
async def gitea_webhook(request: Request):
"""Gitea webhook endpoint. Full implementation in Phase 3."""
import os
body = await request.body()
signature = request.headers.get("X-Gitea-Signature", "")
secret = os.environ.get("GITEA_WEBHOOK_SECRET", "")
if not verify_gitea_signature(body, signature, secret):
raise HTTPException(status_code=401, detail="Invalid signature")
logger.info("Gitea webhook received (not yet implemented)")
return {"status": "received"}