"""galaxis-agent webhook server.""" import hashlib import hmac import logging from fastapi import FastAPI, Request, HTTPException logger = logging.getLogger(__name__) app = FastAPI(title="galaxis-agent") def verify_gitea_signature(payload: bytes, signature: str, secret: str) -> bool: """Verify Gitea webhook HMAC-SHA256 signature.""" expected = hmac.new(secret.encode(), payload, hashlib.sha256).hexdigest() return hmac.compare_digest(expected, signature) def generate_thread_id(repo: str, issue_id: int) -> str: """Generate deterministic thread ID from issue.""" raw = hashlib.sha256(f"gitea-issue:{repo}:{issue_id}".encode()).hexdigest() return f"{raw[:8]}-{raw[8:12]}-{raw[12:16]}-{raw[16:20]}-{raw[20:32]}" @app.get("/health") async def health(): return {"status": "ok"} @app.post("/webhooks/gitea") async def gitea_webhook(request: Request): """Gitea webhook endpoint. Full implementation in Phase 3.""" import os body = await request.body() signature = request.headers.get("X-Gitea-Signature", "") secret = os.environ.get("GITEA_WEBHOOK_SECRET", "") if not verify_gitea_signature(body, signature, secret): raise HTTPException(status_code=401, detail="Invalid signature") logger.info("Gitea webhook received (not yet implemented)") return {"status": "received"}