- Add parse_gitea_event() function to parse issue comments, labels, and PR review requests
- Detect @agent mentions in issue comments and strip them from the message
- Implement rate limiting with slowapi (10 requests/minute)
- Integrate with PersistentTaskQueue and MessageStore
- Queue messages if a task is already running for the same thread
- Add 6 comprehensive tests for event parsing and signature verification
95 lines
3.3 KiB
Python
95 lines
3.3 KiB
Python
"""Tests for Gitea webhook event parsing and signature verification."""
|
|
import hashlib
|
|
import hmac
|
|
|
|
|
|
def make_signature(payload: bytes, secret: str) -> str:
    """Generate a Gitea HMAC-SHA256 signature.

    Args:
        payload: Raw bytes to sign.
        secret: Shared secret; encoded with ``str.encode()`` to form the
            HMAC key.

    Returns:
        The hex-encoded HMAC-SHA256 digest of ``payload``.
    """
    key = secret.encode()
    mac = hmac.new(key, msg=payload, digestmod=hashlib.sha256)
    return mac.hexdigest()
|
|
|
|
|
|
def test_verify_signature():
    """Verify the Gitea webhook signature.

    A signature produced by ``make_signature`` for the same payload and
    secret must be accepted, and an arbitrary string must be rejected.
    """
    from agent.webapp import verify_gitea_signature

    shared_secret = "test-secret"
    body = b'{"action": "created"}'
    good_signature = make_signature(body, shared_secret)

    accepted = verify_gitea_signature(body, good_signature, shared_secret)
    assert accepted is True

    rejected = verify_gitea_signature(body, "wrong", shared_secret)
    assert rejected is False
|
|
|
|
|
|
def test_generate_thread_id():
    """Generate a deterministic thread ID.

    The same (repo, issue number) pair must always map to the same ID,
    while a different issue number must map to a different one.
    """
    from agent.webapp import generate_thread_id

    repo = "galaxis-po"
    first = generate_thread_id(repo, 42)
    repeat = generate_thread_id(repo, 42)
    other_issue = generate_thread_id(repo, 43)

    assert first == repeat
    assert first != other_issue
    # 36 characters with four hyphens is consistent with the textual
    # UUID layout (8-4-4-4-12).
    assert len(first) == 36
    assert first.count("-") == 4
|
|
|
|
|
|
def test_parse_issue_comment_with_mention():
    """Detect an @agent mention in an issue comment.

    The parsed event must be flagged for processing, carry the issue
    number and short repository name, and have the mention removed from
    the resulting message.
    """
    from agent.webapp import parse_gitea_event

    comment = {"body": "@agent factor_calculator에 듀얼 모멘텀 추가해줘"}
    issue = {"number": 42, "title": "Feature request", "body": "description"}
    repository = {"full_name": "quant/galaxis-po", "name": "galaxis-po"}
    event = dict(
        action="created",
        comment=comment,
        issue=issue,
        repository=repository,
    )

    parsed = parse_gitea_event("issue_comment", event)

    assert parsed is not None
    assert parsed["should_process"] is True
    assert parsed["issue_number"] == 42
    assert parsed["repo_name"] == "galaxis-po"
    assert "@agent" not in parsed["message"]
|
|
|
|
|
|
def test_parse_issue_comment_without_mention():
    """Ignore comments without an @agent mention.

    A plain issue comment must still yield a parsed result, but with
    ``should_process`` set to ``False``.
    """
    from agent.webapp import parse_gitea_event

    payload = {
        "action": "created",
        "comment": {"body": "일반 코멘트입니다"},
        "issue": {"number": 42, "title": "Bug", "body": "desc"},
        "repository": {"full_name": "quant/galaxis-po", "name": "galaxis-po"},
    }

    result = parse_gitea_event("issue_comment", payload)

    # Guard before subscripting, as the sibling parse tests do, so a None
    # return fails as a clear assertion instead of a TypeError.
    assert result is not None
    assert result["should_process"] is False
|
|
|
|
|
|
def test_parse_issue_label_agent_fix():
    """Trigger work when the agent-fix label is attached.

    An ``issues`` event with action ``label_updated`` and an
    ``agent-fix`` label must be flagged for processing.
    """
    from agent.webapp import parse_gitea_event

    issue = {"number": 10, "title": "Fix login", "body": "Login fails"}
    label = {"name": "agent-fix"}
    repository = {"full_name": "quant/galaxis-po", "name": "galaxis-po"}
    event = dict(
        action="label_updated",
        issue=issue,
        label=label,
        repository=repository,
    )

    parsed = parse_gitea_event("issues", event)

    assert parsed is not None
    assert parsed["should_process"] is True
|
|
|
|
|
|
def test_parse_pr_review_requested():
    """Detect a PR review request.

    A ``pull_request`` event with action ``review_requested`` must be
    flagged for processing and report the pull request number as
    ``issue_number``.
    """
    from agent.webapp import parse_gitea_event

    pull_request = {"number": 5, "title": "feat: add feature", "body": "desc"}
    repository = {"full_name": "quant/galaxis-po", "name": "galaxis-po"}
    event = dict(
        action="review_requested",
        pull_request=pull_request,
        repository=repository,
    )

    parsed = parse_gitea_event("pull_request", event)

    assert parsed is not None
    assert parsed["should_process"] is True
    assert parsed["issue_number"] == 5
|