galaxis-agent/agent/webapp.py
2026-03-20 14:38:07 +09:00

1494 lines
53 KiB
Python

"""Custom FastAPI routes for LangGraph server."""
import hashlib
import hmac
import json
import logging
import os
import re
import uuid
from typing import Any
import httpx
from fastapi import BackgroundTasks, FastAPI, HTTPException, Request
from langchain_core.messages.content import create_text_block
from langgraph_sdk import get_client
from langgraph_sdk.client import LangGraphClient
from .utils.auth import (
is_bot_token_only_mode,
persist_encrypted_github_token,
resolve_github_token_from_email,
)
from .utils.comments import get_recent_comments
from .utils.github_app import get_github_app_installation_token
from .utils.github_comments import (
OPEN_SWE_TAGS,
build_pr_prompt,
extract_pr_context,
fetch_issue_comments,
fetch_pr_comments_since_last_tag,
format_github_comment_body_for_prompt,
get_thread_id_from_branch,
react_to_github_comment,
sanitize_github_comment_body,
verify_github_signature,
)
from .utils.github_token import get_github_token_from_thread
from .utils.github_user_email_map import GITHUB_USER_EMAIL_MAP
from .utils.linear import post_linear_trace_comment
from .utils.linear_team_repo_map import LINEAR_TEAM_TO_REPO
from .utils.multimodal import dedupe_urls, extract_image_urls, fetch_image_block
from .utils.slack import (
add_slack_reaction,
fetch_slack_thread_messages,
format_slack_messages_for_prompt,
get_slack_user_info,
get_slack_user_names,
post_slack_thread_reply,
post_slack_trace_reply,
select_slack_context_messages,
strip_bot_mention,
verify_slack_signature,
)
logger = logging.getLogger(__name__)
app = FastAPI()
LINEAR_WEBHOOK_SECRET = os.environ.get("LINEAR_WEBHOOK_SECRET", "")
GITHUB_WEBHOOK_SECRET = os.environ.get("GITHUB_WEBHOOK_SECRET", "")
SLACK_SIGNING_SECRET = os.environ.get("SLACK_SIGNING_SECRET", "")
SLACK_BOT_USER_ID = os.environ.get("SLACK_BOT_USER_ID", "")
SLACK_BOT_USERNAME = os.environ.get("SLACK_BOT_USERNAME", "")
SLACK_REPO_OWNER = os.environ.get("SLACK_REPO_OWNER", "langchain-ai")
SLACK_REPO_NAME = os.environ.get("SLACK_REPO_NAME", "open-swe")
LANGGRAPH_URL = os.environ.get("LANGGRAPH_URL") or os.environ.get(
"LANGGRAPH_URL_PROD", "http://localhost:2024"
)
_AGENT_VERSION_METADATA: dict[str, str] = (
{"LANGSMITH_AGENT_VERSION": os.environ["LANGCHAIN_REVISION_ID"]}
if os.environ.get("LANGCHAIN_REVISION_ID")
else {}
)
ALLOWED_GITHUB_ORGS: frozenset[str] = frozenset(
org.strip().lower()
for org in os.environ.get("ALLOWED_GITHUB_ORGS", "").split(",")
if org.strip()
)
LINEAR_API_KEY = os.environ.get("LINEAR_API_KEY", "")
_GITHUB_BOT_MESSAGE_PREFIXES = (
"🔐 **GitHub Authentication Required**",
"✅ **Pull Request Created**",
"✅ **Pull Request Updated**",
"**Pull Request Created**",
"**Pull Request Updated**",
"🤖 **Agent Response**",
"❌ **Agent Error**",
)
def get_repo_config_from_team_mapping(
team_identifier: str, project_name: str = ""
) -> dict[str, str]:
"""
Look up repository configuration from LINEAR_TEAM_TO_REPO mapping.
Supports both legacy flat mapping (team -> repo) and new nested mapping (team -> project -> repo).
Args:
team_identifier: Team name or ID to look up (e.g., "LangChain OSS")
project_name: Name of the project (e.g., "deepagents")
Returns:
Repository config dict with 'owner' and 'name' keys. Defaults to langchainplus if not found.
"""
if not team_identifier or team_identifier not in LINEAR_TEAM_TO_REPO:
return {"owner": "langchain-ai", "name": "langchainplus"}
config = LINEAR_TEAM_TO_REPO[team_identifier]
if "owner" in config and "name" in config:
return config
if "projects" in config and project_name:
project_config = config["projects"].get(project_name)
if project_config:
return project_config
if "default" in config:
return config["default"]
return {"owner": "langchain-ai", "name": "langchainplus"}
async def react_to_linear_comment(comment_id: str, emoji: str = "👀") -> bool:
"""Add an emoji reaction to a Linear comment.
Args:
comment_id: The Linear comment ID
emoji: The emoji to react with (default: eyes 👀)
Returns:
True if successful, False otherwise
"""
if not LINEAR_API_KEY:
return False
url = "https://api.linear.app/graphql"
mutation = """
mutation ReactionCreate($commentId: String!, $emoji: String!) {
reactionCreate(input: { commentId: $commentId, emoji: $emoji }) {
success
}
}
"""
async with httpx.AsyncClient() as client:
try:
response = await client.post(
url,
headers={
"Authorization": LINEAR_API_KEY,
"Content-Type": "application/json",
},
json={
"query": mutation,
"variables": {"commentId": comment_id, "emoji": emoji},
},
)
response.raise_for_status()
result = response.json()
return bool(result.get("data", {}).get("reactionCreate", {}).get("success"))
except Exception: # noqa: BLE001
return False
async def fetch_linear_issue_details(issue_id: str) -> dict[str, Any] | None:
"""Fetch full issue details from Linear API including description and comments.
Args:
issue_id: The Linear issue ID
Returns:
Full issue data dict, or None if fetch failed
"""
if not LINEAR_API_KEY:
return None
url = "https://api.linear.app/graphql"
query = """
query GetIssue($issueId: String!) {
issue(id: $issueId) {
id
identifier
title
description
url
project {
id
name
}
team {
id
name
key
}
comments {
nodes {
id
body
createdAt
user {
id
name
email
}
}
}
}
}
"""
async with httpx.AsyncClient() as client:
try:
response = await client.post(
url,
headers={
"Authorization": LINEAR_API_KEY,
"Content-Type": "application/json",
},
json={
"query": query,
"variables": {"issueId": issue_id},
},
)
response.raise_for_status()
result = response.json()
return result.get("data", {}).get("issue")
except httpx.HTTPError:
return None
def generate_thread_id_from_issue(issue_id: str) -> str:
"""Generate a deterministic thread ID from a Linear issue ID.
Args:
issue_id: The Linear issue ID
Returns:
A UUID-formatted thread ID derived from the issue ID
"""
hash_bytes = hashlib.sha256(f"linear-issue:{issue_id}".encode()).hexdigest()
return (
f"{hash_bytes[:8]}-{hash_bytes[8:12]}-{hash_bytes[12:16]}-"
f"{hash_bytes[16:20]}-{hash_bytes[20:32]}"
)
def generate_thread_id_from_github_issue(issue_id: str) -> str:
"""Generate a deterministic thread ID from a GitHub issue ID."""
hash_bytes = hashlib.sha256(f"github-issue:{issue_id}".encode()).hexdigest()
return (
f"{hash_bytes[:8]}-{hash_bytes[8:12]}-{hash_bytes[12:16]}-"
f"{hash_bytes[16:20]}-{hash_bytes[20:32]}"
)
def generate_thread_id_from_slack_thread(channel_id: str, thread_id: str) -> str:
"""Generate a deterministic thread ID from a Slack thread identifier."""
composite = f"{channel_id}:{thread_id}"
md5_hex = hashlib.md5(composite.encode("utf-8")).hexdigest()
return str(uuid.UUID(hex=md5_hex))
def _extract_repo_config_from_thread(thread: dict[str, Any]) -> dict[str, str] | None:
"""Extract repo config from persisted thread data."""
metadata = thread.get("metadata")
if not isinstance(metadata, dict):
return None
repo = metadata.get("repo")
if isinstance(repo, dict):
owner = repo.get("owner")
name = repo.get("name")
if isinstance(owner, str) and owner and isinstance(name, str) and name:
return {"owner": owner, "name": name}
owner = metadata.get("repo_owner")
name = metadata.get("repo_name")
if isinstance(owner, str) and owner and isinstance(name, str) and name:
return {"owner": owner, "name": name}
return None
def _is_not_found_error(exc: Exception) -> bool:
"""Best-effort check for LangGraph 404 errors."""
return getattr(exc, "status_code", None) == 404
def _is_repo_org_allowed(repo_config: dict[str, str]) -> bool:
"""Check if the repo owner/org is in the allowlist.
Returns True if no allowlist is configured (empty ALLOWED_GITHUB_ORGS),
or if the repo owner is in the allowlist.
"""
if not ALLOWED_GITHUB_ORGS:
return True
owner = repo_config.get("owner", "").lower()
return owner in ALLOWED_GITHUB_ORGS
async def _upsert_slack_thread_repo_metadata(
thread_id: str, repo_config: dict[str, str], langgraph_client: LangGraphClient
) -> None:
"""Persist the selected repo config on the thread metadata."""
try:
await langgraph_client.threads.update(thread_id=thread_id, metadata={"repo": repo_config})
except Exception as exc: # noqa: BLE001
if _is_not_found_error(exc):
try:
await langgraph_client.threads.create(
thread_id=thread_id,
if_exists="do_nothing",
metadata={"repo": repo_config},
)
except Exception: # noqa: BLE001
logger.exception(
"Failed to create Slack thread %s while persisting repo metadata",
thread_id,
)
return
logger.exception(
"Failed to persist Slack thread repo metadata for thread %s",
thread_id,
)
async def check_if_using_repo_msg_sent(
channel_id: str, thread_ts: str, using_repo_str: str
) -> bool:
thread_messages = await fetch_slack_thread_messages(channel_id, thread_ts)
for message in thread_messages:
if using_repo_str in message.get("text", ""):
return True
return False
async def get_slack_repo_config(message: str, channel_id: str, thread_ts: str) -> dict[str, str]:
"""Resolve repository configuration for Slack-triggered runs."""
default_owner = SLACK_REPO_OWNER.strip() or "langchain-ai"
default_name = SLACK_REPO_NAME.strip() or "langchainplus"
thread_id = generate_thread_id_from_slack_thread(channel_id, thread_ts)
langgraph_client = get_client(url=LANGGRAPH_URL)
owner: str | None = None
name: str | None = None
if "repo:" in message or "repo " in message:
match = re.search(r"repo[: ]([a-zA-Z0-9_.-]+/[a-zA-Z0-9_.-]+)", message)
if match:
owner, name = match.group(1).split("/", 1)
if not owner or not name:
github_match = re.search(r"github\.com/([a-zA-Z0-9_.-]+/[a-zA-Z0-9_.-]+)", message)
if github_match:
owner, name = github_match.group(1).split("/", 1)
if not owner or not name:
try:
thread = await langgraph_client.threads.get(thread_id)
thread_repo_config = _extract_repo_config_from_thread(thread)
if thread_repo_config:
owner = thread_repo_config["owner"]
name = thread_repo_config["name"]
except Exception as exc: # noqa: BLE001
if not _is_not_found_error(exc):
logger.exception(
"Failed to fetch Slack thread %s for repo resolution",
thread_id,
)
if not owner or not name:
owner = default_owner
name = default_name
using_repo_str = f"Using repository: `{owner}/{name}`"
if not await check_if_using_repo_msg_sent(channel_id, thread_ts, using_repo_str):
await post_slack_thread_reply(channel_id, thread_ts, using_repo_str)
return {"owner": owner, "name": name}
async def is_thread_active(thread_id: str) -> bool:
"""Check if a thread is currently active (has a running run).
Args:
thread_id: The LangGraph thread ID
Returns:
True if the thread status is "busy", False otherwise
"""
langgraph_client = get_client(url=LANGGRAPH_URL)
try:
logger.debug("Fetching thread status for %s from %s", thread_id, LANGGRAPH_URL)
thread = await langgraph_client.threads.get(thread_id)
status = thread.get("status", "idle")
logger.info(
"Thread %s status check: status=%s, is_busy=%s",
thread_id,
status,
status == "busy",
)
except Exception as e: # noqa: BLE001
logger.warning(
"Failed to get thread status for %s: %s (type: %s) - assuming not active",
thread_id,
e,
type(e).__name__,
)
status = "idle"
return status == "busy"
async def _thread_exists(thread_id: str) -> bool:
"""Return whether a LangGraph thread already exists."""
langgraph_client = get_client(url=LANGGRAPH_URL)
try:
await langgraph_client.threads.get(thread_id)
return True
except Exception as exc: # noqa: BLE001
if _is_not_found_error(exc):
return False
logger.warning("Failed to fetch thread %s, assuming it exists", thread_id)
return True
async def queue_message_for_thread(
thread_id: str, message_content: str | list[dict[str, Any]] | dict[str, Any]
) -> bool:
"""Queue a message for a thread that is currently active.
Stores the message in the langgraph store, namespaced to the thread.
Supports multiple queued messages by storing them as a list (FIFO order).
The before_model middleware will pick them up and inject them into state.
Args:
thread_id: The LangGraph thread ID
message_content: The message content to queue (text or content blocks)
Returns:
True if successfully queued, False otherwise
"""
langgraph_client = get_client(url=LANGGRAPH_URL)
try:
namespace = ("queue", thread_id)
key = "pending_messages"
new_message = {"content": message_content}
existing_messages: list[dict[str, Any]] = []
try:
existing_item = await langgraph_client.store.get_item(namespace, key)
if existing_item and existing_item.get("value"):
existing_messages = existing_item["value"].get("messages", [])
except Exception: # noqa: BLE001
logger.debug("No existing queued messages for thread %s", thread_id)
existing_messages.append(new_message)
value = {"messages": existing_messages}
logger.info(
"Attempting to queue message for thread %s (total queued: %d)",
thread_id,
len(existing_messages),
)
await langgraph_client.store.put_item(namespace, key, value)
logger.info("Successfully queued message for thread %s", thread_id)
return True # noqa: TRY300
except Exception:
logger.exception("Failed to queue message for thread %s", thread_id)
return False
async def process_linear_issue( # noqa: PLR0912, PLR0915
issue_data: dict[str, Any], repo_config: dict[str, str]
) -> None:
"""Process a Linear issue by creating a new LangGraph thread and run.
Args:
issue_data: The Linear issue data from webhook (basic info only).
repo_config: The repo configuration with owner and name.
"""
issue_id = issue_data.get("id", "")
logger.info(
"Processing Linear issue %s for repo %s/%s",
issue_id,
repo_config.get("owner"),
repo_config.get("name"),
)
triggering_comment_id = issue_data.get("triggering_comment_id", "")
if triggering_comment_id:
await react_to_linear_comment(triggering_comment_id, "👀")
thread_id = generate_thread_id_from_issue(issue_id)
full_issue = await fetch_linear_issue_details(issue_id)
if not full_issue:
full_issue = issue_data
user_email = None
user_name = None
comment_author = issue_data.get("comment_author", {})
if comment_author:
user_email = comment_author.get("email")
user_name = comment_author.get("name")
if not user_email:
creator = full_issue.get("creator", {})
if creator:
user_email = creator.get("email")
user_name = user_name or creator.get("name")
if not user_email:
assignee = full_issue.get("assignee", {})
if assignee:
user_email = assignee.get("email")
user_name = user_name or assignee.get("name")
logger.info("User email for issue %s: %s", issue_id, user_email)
title = full_issue.get("title", "No title")
description = full_issue.get("description") or "No description"
image_urls: list[str] = []
description_image_urls = extract_image_urls(description)
if description_image_urls:
image_urls.extend(description_image_urls)
logger.debug(
"Found %d image URL(s) in issue description",
len(description_image_urls),
)
comments = full_issue.get("comments", {}).get("nodes", [])
comments_text = ""
triggering_comment = issue_data.get("triggering_comment", "")
triggering_comment_id = issue_data.get("triggering_comment_id", "")
bot_message_prefixes = (
"🔐 **GitHub Authentication Required**",
"✅ **Pull Request Created**",
"✅ **Pull Request Updated**",
"**Pull Request Created**",
"**Pull Request Updated**",
"🤖 **Agent Response**",
"❌ **Agent Error**",
)
comment_ids: set[str] = set()
comment_id_to_index: dict[str, int] = {}
if comments:
for i, comment in enumerate(comments):
comment_id = comment.get("id", "")
if comment_id:
comment_ids.add(comment_id)
comment_id_to_index[comment_id] = i
relevant_comments = []
trigger_index = None
if triggering_comment_id:
trigger_index = comment_id_to_index.get(triggering_comment_id)
if trigger_index is not None:
relevant_comments = comments[trigger_index:]
logger.debug(
"Using triggering comment index %d to build relevant comments",
trigger_index,
)
else:
relevant_comments = get_recent_comments(comments, bot_message_prefixes)
if relevant_comments:
comments_text = "\n\n## Comments:\n"
for comment in relevant_comments:
user = comment.get("user") or {}
author = user.get("name", "User")
body = comment.get("body", "")
body_image_urls = extract_image_urls(body)
if body_image_urls:
image_urls.extend(body_image_urls)
logger.debug(
"Found %d image URL(s) in comment by %s",
len(body_image_urls),
author,
)
if any(body.startswith(prefix) for prefix in bot_message_prefixes):
continue
comments_text += f"\n**{author}:** {body}\n"
if triggering_comment and triggering_comment_id not in comment_ids:
if not comments_text:
comments_text = "\n\n## Comments:\n"
trigger_author = comment_author.get("name", "Unknown")
trigger_body = triggering_comment
trigger_image_urls = extract_image_urls(trigger_body)
if trigger_image_urls:
image_urls.extend(trigger_image_urls)
logger.debug(
"Found %d image URL(s) in triggering comment by %s",
len(trigger_image_urls),
trigger_author,
)
comments_text += f"\n**{trigger_author}:** {trigger_body}\n"
logger.debug(
"Appended triggering comment %s not present in issue comments list",
triggering_comment_id or "<missing-id>",
)
identifier = full_issue.get("identifier", "") or issue_data.get("identifier", "")
triggered_by_line = f"## Triggered by: {user_name}\n\n" if user_name else ""
tag_instruction = (
f"When calling linear_comment, tag @{user_name} if you are asking them a question, need their input, or are notifying them of something important (e.g. a completed PR). For simple answers, tagging is not required."
if user_name
else ""
)
prompt = (
f"Please work on the following issue:\n\n"
f"## Title: {title}\n\n"
f"{triggered_by_line}"
f"## Linear Ticket: {identifier} - Ticket ID: {issue_id}\n\n"
f"## Description:\n{description}\n"
f"{comments_text}\n\n"
f"Please analyze this issue and implement the necessary changes. "
f"When you're done, commit and push your changes. {tag_instruction}"
)
content_blocks: list[dict[str, Any]] = [create_text_block(prompt)]
if image_urls:
image_urls = dedupe_urls(image_urls)
logger.info("Preparing %d image(s) for multimodal content", len(image_urls))
logger.debug("Image URLs: %s", image_urls)
async with httpx.AsyncClient() as client:
for image_url in image_urls:
image_block = await fetch_image_block(image_url, client)
if image_block:
content_blocks.append(image_block)
logger.info("Built %d content block(s) for prompt", len(content_blocks))
linear_project_id = ""
linear_issue_number = ""
if identifier and "-" in identifier:
parts = identifier.split("-", 1)
linear_project_id = parts[0]
linear_issue_number = parts[1]
configurable: dict[str, Any] = {
"repo": repo_config,
"linear_issue": {
"id": issue_id,
"title": title,
"url": full_issue.get("url", "") or issue_data.get("url", ""),
"identifier": identifier,
"linear_project_id": linear_project_id,
"linear_issue_number": linear_issue_number,
"triggering_user_name": user_name or "",
},
"user_email": user_email,
"source": "linear",
}
logger.info("Checking if thread %s is active before creating run", thread_id)
thread_active = await is_thread_active(thread_id)
logger.info("Thread %s active status: %s", thread_id, thread_active)
if thread_active:
logger.info(
"Thread %s is active (busy), will queue message instead of creating run",
thread_id,
)
queued_payload = {"text": prompt, "image_urls": image_urls}
queued = await queue_message_for_thread(
thread_id=thread_id,
message_content=queued_payload,
)
if queued:
logger.info("Message queued for thread %s, will be processed by middleware", thread_id)
langgraph_client = get_client(url=LANGGRAPH_URL)
runs = await langgraph_client.runs.list(thread_id, limit=1)
if runs:
await post_linear_trace_comment(issue_id, runs[0]["run_id"], triggering_comment_id)
else:
logger.error("Failed to queue message for thread %s", thread_id)
else:
logger.info("Creating LangGraph run for thread %s", thread_id)
langgraph_client = get_client(url=LANGGRAPH_URL)
run = await langgraph_client.runs.create(
thread_id,
"agent",
input={"messages": [{"role": "user", "content": content_blocks}]},
config={"configurable": configurable, "metadata": _AGENT_VERSION_METADATA},
if_not_exists="create",
)
logger.info("LangGraph run created successfully for thread %s", thread_id)
await post_linear_trace_comment(issue_id, run["run_id"], triggering_comment_id)
async def process_slack_mention(event_data: dict[str, Any], repo_config: dict[str, str]) -> None:
"""Process a Slack app mention by creating or interrupting a thread run."""
channel_id = event_data.get("channel_id", "")
thread_ts = event_data.get("thread_ts", "")
event_ts = event_data.get("event_ts", "")
user_id = event_data.get("user_id", "")
text = event_data.get("text", "")
bot_user_id = event_data.get("bot_user_id", "")
if not channel_id or not thread_ts or not event_ts:
logger.warning(
"Missing Slack event fields (channel_id=%s, thread_ts=%s, event_ts=%s)",
channel_id,
thread_ts,
event_ts,
)
return
reacted = await add_slack_reaction(channel_id, event_ts, "eyes")
if not reacted:
logger.debug(
"Unable to add eyes reaction for Slack message ts=%s in channel=%s",
event_ts,
channel_id,
)
thread_id = generate_thread_id_from_slack_thread(channel_id, thread_ts)
user_email = None
user_name = ""
if user_id:
slack_user = await get_slack_user_info(user_id)
if slack_user:
profile = slack_user.get("profile", {})
if isinstance(profile, dict):
user_email = profile.get("email")
user_name = (
profile.get("display_name")
or profile.get("real_name")
or slack_user.get("real_name")
or slack_user.get("name")
or ""
)
thread_messages = await fetch_slack_thread_messages(channel_id, thread_ts)
if not any(str(message.get("ts")) == str(event_ts) for message in thread_messages):
thread_messages.append({"ts": event_ts, "text": text, "user": user_id})
context_messages, context_mode = select_slack_context_messages(
thread_messages, event_ts, bot_user_id, SLACK_BOT_USERNAME
)
context_user_ids = [
value
for value in (message.get("user") for message in context_messages)
if isinstance(value, str) and value
]
user_names_by_id = await get_slack_user_names(context_user_ids)
if user_id and user_name and user_id not in user_names_by_id:
user_names_by_id[user_id] = user_name
context_text = format_slack_messages_for_prompt(
context_messages,
user_names_by_id,
bot_user_id=bot_user_id,
bot_username=SLACK_BOT_USERNAME,
)
context_source = (
"the previous message where I was tagged"
if context_mode == "last_mention"
else "the beginning of the thread"
)
clean_text = (
strip_bot_mention(text, bot_user_id, bot_username=SLACK_BOT_USERNAME)
or "(no text in mention)"
)
trigger_user = user_name or (f"<@{user_id}>" if user_id else "Unknown user")
prompt = (
"You were mentioned in Slack.\n\n"
f"## Repository\n{repo_config.get('owner')}/{repo_config.get('name')}\n\n"
f"## Triggered by\n{trigger_user}\n\n"
f"## Slack Thread\n- Channel: {channel_id}\n- Thread TS: {thread_ts}\n"
f"- Context starts at: {context_source}\n\n"
f"## Conversation Context\n{context_text}\n\n"
f"## Latest Mention Request\n{clean_text}\n\n"
"Use `slack_thread_reply` to communicate in this Slack thread for clarifications, "
"status updates, and final summaries."
)
content_blocks: list[dict[str, Any]] = [create_text_block(prompt)]
configurable: dict[str, Any] = {
"repo": repo_config,
"slack_thread": {
"channel_id": channel_id,
"thread_ts": thread_ts,
"triggering_user_id": user_id,
"triggering_user_name": user_name,
"triggering_user_email": user_email,
"triggering_event_ts": event_ts,
},
"user_email": user_email,
"source": "slack",
}
langgraph_client = get_client(url=LANGGRAPH_URL)
await _upsert_slack_thread_repo_metadata(thread_id, repo_config, langgraph_client)
thread_active = await is_thread_active(thread_id)
if thread_active:
logger.info(
"Thread %s is active, queuing Slack message for middleware pickup",
thread_id,
)
queued_payload = {"text": prompt, "image_urls": []}
queued = await queue_message_for_thread(
thread_id=thread_id,
message_content=queued_payload,
)
if queued:
logger.info("Slack message queued for thread %s", thread_id)
else:
logger.error("Failed to queue Slack message for thread %s", thread_id)
return
run = await langgraph_client.runs.create(
thread_id,
"agent",
input={"messages": [{"role": "user", "content": content_blocks}]},
config={"configurable": configurable, "metadata": _AGENT_VERSION_METADATA},
if_not_exists="create",
multitask_strategy="interrupt",
)
await post_slack_trace_reply(channel_id, thread_ts, run["run_id"])
def verify_linear_signature(body: bytes, signature: str, secret: str) -> bool:
"""Verify the Linear webhook signature.
Args:
body: Raw request body bytes
signature: The Linear-Signature header value
secret: The webhook signing secret
Returns:
True if signature is valid, False otherwise
"""
if not secret:
logger.warning("LINEAR_WEBHOOK_SECRET is not configured — rejecting webhook request")
return False
expected = hmac.new(secret.encode("utf-8"), body, hashlib.sha256).hexdigest()
return hmac.compare_digest(expected, signature)
@app.post("/webhooks/linear")
async def linear_webhook( # noqa: PLR0911, PLR0912, PLR0915
request: Request, background_tasks: BackgroundTasks
) -> dict[str, str]:
"""Handle Linear webhooks.
Triggers a new LangGraph run when an issue gets the 'open-swe' label added.
"""
logger.info("Received Linear webhook")
body = await request.body()
signature = request.headers.get("Linear-Signature", "")
if not verify_linear_signature(body, signature, LINEAR_WEBHOOK_SECRET):
logger.warning("Invalid webhook signature")
raise HTTPException(status_code=401, detail="Invalid signature")
try:
payload = json.loads(body)
except json.JSONDecodeError:
logger.exception("Failed to parse webhook JSON")
return {"status": "error", "message": "Invalid JSON"}
if payload.get("type") != "Comment":
logger.debug("Ignoring webhook: not a Comment event")
return {"status": "ignored", "reason": "Not a Comment event"}
action = payload.get("action")
if action != "create":
logger.debug("Ignoring webhook: action is %s, not create", action)
return {
"status": "ignored",
"reason": f"Comment action is '{action}', only processing 'create'",
}
data = payload.get("data", {})
if data.get("botActor"):
logger.debug("Ignoring webhook: comment is from a bot")
return {"status": "ignored", "reason": "Comment is from a bot"}
comment_body = data.get("body", "")
bot_message_prefixes = [
"🔐 **GitHub Authentication Required**",
"✅ **Pull Request Created**",
"✅ **Pull Request Updated**",
"**Pull Request Created**",
"**Pull Request Updated**",
"🤖 **Agent Response**",
"❌ **Agent Error**",
]
for prefix in bot_message_prefixes:
if comment_body.startswith(prefix):
logger.debug("Ignoring webhook: comment is our own bot message")
return {"status": "ignored", "reason": "Comment is our own bot message"}
if "@openswe" not in comment_body.lower():
logger.debug("Ignoring webhook: comment doesn't mention @openswe")
return {"status": "ignored", "reason": "Comment doesn't mention @openswe"}
issue = data.get("issue", {})
if not issue:
logger.debug("Ignoring webhook: no issue data in comment")
return {"status": "ignored", "reason": "No issue data in comment"}
# Fetch full issue details to get project info (webhook doesn't include it)
issue_id = issue.get("id", "")
full_issue = await fetch_linear_issue_details(issue_id)
if not full_issue:
logger.warning("Failed to fetch full issue details, using webhook data")
full_issue = issue
team = full_issue.get("team", {})
team_name = team.get("name", "") if team else ""
project = full_issue.get("project")
project_name = project.get("name", "") if project else ""
team_identifier = team_name.strip() if team_name else ""
project_key = project_name.strip() if project_name else ""
repo_config = get_repo_config_from_team_mapping(team_identifier, project_key)
logger.debug(
"Team/project lookup result",
extra={
"team_name": team_identifier,
"project_name": project_key,
"repo_config": repo_config,
},
)
if not _is_repo_org_allowed(repo_config):
logger.warning(
"Rejecting Linear webhook: org '%s' not in ALLOWED_GITHUB_ORGS",
repo_config.get("owner"),
)
return {"status": "ignored", "reason": "Repository org not in allowlist"}
repo_owner = repo_config["owner"]
repo_name = repo_config["name"]
issue["triggering_comment"] = comment_body
issue["triggering_comment_id"] = data.get("id", "")
comment_user = data.get("user", {})
if comment_user:
issue["comment_author"] = comment_user
logger.info(
"Accepted webhook for issue '%s' (%s), scheduling background task",
issue.get("title"),
issue.get("id"),
)
background_tasks.add_task(process_linear_issue, issue, repo_config)
return {
"status": "accepted",
"message": f"Processing issue '{issue.get('title')}' for repo {repo_owner}/{repo_name}",
}
@app.get("/webhooks/linear")
async def linear_webhook_verify() -> dict[str, str]:
"""Verify endpoint for Linear webhook setup."""
return {"status": "ok", "message": "Linear webhook endpoint is active"}
@app.post("/webhooks/slack")
async def slack_webhook(request: Request, background_tasks: BackgroundTasks) -> dict[str, str]:
"""Handle Slack Event API webhooks for app mentions."""
body = await request.body()
signature = request.headers.get("X-Slack-Signature", "")
timestamp = request.headers.get("X-Slack-Request-Timestamp", "")
if not verify_slack_signature(
body=body,
timestamp=timestamp,
signature=signature,
secret=SLACK_SIGNING_SECRET,
):
logger.warning("Invalid Slack signature")
raise HTTPException(status_code=401, detail="Invalid signature")
try:
payload = json.loads(body)
except json.JSONDecodeError:
logger.exception("Failed to parse Slack webhook JSON")
return {"status": "error", "message": "Invalid JSON"}
if payload.get("type") == "url_verification":
challenge = payload.get("challenge", "")
return {"challenge": challenge}
if payload.get("type") != "event_callback":
return {"status": "ignored", "reason": "Not an event callback"}
event = payload.get("event", {})
if event.get("type") != "app_mention":
message_text = event.get("text", "")
has_username_mention = bool(
event.get("type") == "message"
and SLACK_BOT_USERNAME
and f"@{SLACK_BOT_USERNAME}" in message_text
)
has_id_mention = bool(
event.get("type") == "message"
and SLACK_BOT_USER_ID
and f"<@{SLACK_BOT_USER_ID}>" in message_text
)
if not (has_username_mention or has_id_mention):
return {"status": "ignored", "reason": "Not an app_mention event"}
if event.get("subtype") == "bot_message" or event.get("bot_id"):
return {"status": "ignored", "reason": "Event from a bot"}
channel_id = event.get("channel", "")
event_ts = event.get("ts", "")
thread_ts = event.get("thread_ts") or event_ts
user_id = event.get("user", "")
text = event.get("text", "")
if not channel_id or not event_ts or not thread_ts:
return {"status": "ignored", "reason": "Missing channel/thread timestamp"}
bot_user_id = SLACK_BOT_USER_ID
if not bot_user_id:
authorizations = payload.get("authorizations", [])
if isinstance(authorizations, list) and authorizations:
auth_user_id = authorizations[0].get("user_id")
if isinstance(auth_user_id, str):
bot_user_id = auth_user_id
if not bot_user_id:
authed_users = payload.get("authed_users", [])
if isinstance(authed_users, list) and authed_users:
first_user = authed_users[0]
if isinstance(first_user, str):
bot_user_id = first_user
if bot_user_id and user_id == bot_user_id:
return {"status": "ignored", "reason": "Event from this bot user"}
event_data = {
"channel_id": channel_id,
"thread_ts": thread_ts,
"event_ts": event_ts,
"user_id": user_id,
"text": text,
"bot_user_id": bot_user_id,
}
repo_config = await get_slack_repo_config(text, channel_id, thread_ts)
if not _is_repo_org_allowed(repo_config):
logger.warning(
"Rejecting Slack webhook: org '%s' not in ALLOWED_GITHUB_ORGS",
repo_config.get("owner"),
)
return {"status": "ignored", "reason": "Repository org not in allowlist"}
background_tasks.add_task(process_slack_mention, event_data, repo_config)
return {"status": "accepted", "message": "Slack mention queued"}
@app.get("/webhooks/slack")
async def slack_webhook_verify() -> dict[str, str]:
"""Verify endpoint for Slack webhook setup."""
return {"status": "ok", "message": "Slack webhook endpoint is active"}
@app.get("/health")
async def health_check() -> dict[str, str]:
"""Health check endpoint."""
return {"status": "healthy"}
_SUPPORTED_GH_EVENTS = frozenset(
["issue_comment", "issues", "pull_request_review_comment", "pull_request_review"]
)
_SUPPORTED_GH_ISSUE_ACTIONS = frozenset(["edited", "opened", "reopened"])
def _build_github_issue_comments_text(comments: list[dict[str, Any]]) -> str:
lines: list[str] = []
for comment in comments:
body = comment.get("body", "")
if not body or any(body.startswith(prefix) for prefix in _GITHUB_BOT_MESSAGE_PREFIXES):
continue
author = comment.get("author", "unknown")
formatted_body = format_github_comment_body_for_prompt(author, body)
lines.append(f"\n**{author}:**\n{formatted_body}\n")
if not lines:
return ""
return "\n\n## Comments:\n" + "".join(lines)
def build_github_issue_prompt(
repo_config: dict[str, str],
issue_number: int,
issue_id: str,
title: str,
body: str,
comments: list[dict[str, Any]],
*,
github_login: str,
issue_author: str = "",
) -> str:
"""Build the user prompt for a GitHub issue-triggered run."""
triggered_by_line = f"## Triggered by: {github_login}\n\n" if github_login else ""
comments_text = _build_github_issue_comments_text(comments)
sanitized_title = sanitize_github_comment_body(title)
formatted_body = format_github_comment_body_for_prompt(issue_author or github_login, body)
return (
"Please work on the following GitHub issue:\n\n"
f"## Repository: {repo_config.get('owner')}/{repo_config.get('name')}\n\n"
f"{triggered_by_line}"
f"## GitHub Issue: #{issue_number} - Issue ID: {issue_id}\n\n"
f"## Title: {sanitized_title}\n\n"
f"## Description:\n{formatted_body}\n"
f"{comments_text}\n\n"
"Please analyze this issue and implement the necessary changes. "
"When you need to communicate on GitHub, use `github_comment` with the issue number."
)
def build_github_issue_followup_prompt(github_login: str, comment_body: str) -> str:
"""Build the prompt for a follow-up GitHub issue comment."""
return (
f"**{github_login}:**\n{format_github_comment_body_for_prompt(github_login, comment_body)}"
)
def build_github_issue_update_prompt(github_login: str, title: str, body: str) -> str:
"""Build the prompt for a follow-up GitHub issue title/body update."""
sanitized_title = sanitize_github_comment_body(title)
formatted_body = format_github_comment_body_for_prompt(github_login, body)
return (
f"**{github_login}:** updated the GitHub issue title/body.\n\n"
f"Title: {sanitized_title}\n\n"
f"Description:\n{formatted_body}"
)
async def _trigger_or_queue_run(
thread_id: str,
prompt: str,
*,
github_login: str,
repo_config: dict[str, str],
pr_number: int,
) -> None:
"""Create a new agent run or queue the message if the thread is busy."""
thread_active = await is_thread_active(thread_id)
if thread_active:
logger.info("Thread %s is busy, queuing GitHub PR comment message", thread_id)
await queue_message_for_thread(thread_id, prompt)
return
logger.info("Creating LangGraph run for thread %s from GitHub PR comment", thread_id)
langgraph_client = get_client(url=LANGGRAPH_URL)
await langgraph_client.runs.create(
thread_id,
"agent",
input={"messages": [{"role": "user", "content": prompt}]},
config={
"configurable": {
"source": "github",
"github_login": github_login,
"repo": repo_config,
"pr_number": pr_number,
},
"metadata": _AGENT_VERSION_METADATA,
},
if_not_exists="create",
)
logger.info("LangGraph run created for thread %s from GitHub PR comment", thread_id)
async def _get_or_resolve_thread_github_token(thread_id: str, email: str) -> str | None:
"""Resolve and persist a GitHub token for a thread when available.
In bot-token-only mode, returns a fresh GitHub App installation token
instead of resolving per-user OAuth tokens.
"""
if is_bot_token_only_mode():
bot_token = await get_github_app_installation_token()
if bot_token:
try:
await persist_encrypted_github_token(thread_id, bot_token)
except Exception:
logger.warning("Could not persist bot token for thread %s", thread_id)
return bot_token
logger.warning("Bot-token-only mode but GitHub App token unavailable")
return None
github_token, _encrypted_token = await get_github_token_from_thread(thread_id)
if github_token:
return github_token
auth_result = await resolve_github_token_from_email(email)
github_token = auth_result.get("token")
if not github_token:
return None
try:
await persist_encrypted_github_token(thread_id, github_token)
except Exception:
logger.warning("Could not persist GitHub token for thread %s", thread_id)
return github_token
async def process_github_pr_comment(payload: dict[str, Any], event_type: str) -> None:
"""Process a GitHub PR comment that tagged @open-swe.
Retrieves the existing thread token, reacts with 👀, fetches all comments
since the last @open-swe tag, then creates or queues a new run.
Args:
payload: The parsed GitHub webhook payload.
event_type: One of 'issue_comment', 'pull_request_review_comment',
'pull_request_review'.
"""
(
repo_config,
pr_number,
branch_name,
github_login,
pr_url,
comment_id,
node_id,
) = await extract_pr_context(payload, event_type)
logger.info(
"Processing GitHub PR comment: event=%s, pr=%s, branch=%s",
event_type,
pr_number,
branch_name,
)
thread_id = get_thread_id_from_branch(branch_name) if branch_name else None
if not thread_id:
logger.warning("Could not extract thread_id from branch '%s', skipping", branch_name)
return
email = GITHUB_USER_EMAIL_MAP.get(github_login, "")
if not email:
logger.warning("No email mapping for GitHub user '%s', skipping", github_login)
return
github_token = await _get_or_resolve_thread_github_token(thread_id, email)
if not github_token:
logger.warning("No GitHub token for thread %s, skipping", thread_id)
return
if comment_id:
await react_to_github_comment(
repo_config,
comment_id,
event_type=event_type,
token=github_token,
pull_number=pr_number,
node_id=node_id,
)
if not pr_number:
logger.warning("No PR number found in payload, skipping")
return
comments = await fetch_pr_comments_since_last_tag(repo_config, pr_number, token=github_token)
if not comments:
logger.info("No comments found since last @open-swe tag for PR %s", pr_number)
return
prompt = build_pr_prompt(comments, pr_url)
await _trigger_or_queue_run(
thread_id,
prompt,
github_login=github_login,
repo_config=repo_config,
pr_number=pr_number,
)
async def process_github_issue(payload: dict[str, Any], event_type: str) -> None:
"""Process a GitHub issue or issue comment that tagged @open-swe."""
issue = payload.get("issue", {})
repo = payload.get("repository", {})
repo_config = {
"owner": repo.get("owner", {}).get("login", ""),
"name": repo.get("name", ""),
}
issue_id = str(issue.get("id", ""))
issue_number = issue.get("number")
github_login = payload.get("sender", {}).get("login", "")
issue_url = issue.get("html_url", "") or issue.get("url", "")
title = issue.get("title", "No title")
description = issue.get("body") or "No description"
issue_author = issue.get("user", {}).get("login", "")
logger.info(
"Processing GitHub issue: event=%s, issue=%s, repo=%s/%s",
event_type,
issue_number,
repo_config.get("owner"),
repo_config.get("name"),
)
if not issue_id or not issue_number:
logger.warning("Missing GitHub issue id/number, skipping")
return
email = GITHUB_USER_EMAIL_MAP.get(github_login, "")
if not email:
logger.warning("No email mapping for GitHub user '%s', skipping", github_login)
return
thread_id = generate_thread_id_from_github_issue(issue_id)
existing_thread = await _thread_exists(thread_id)
github_token = await _get_or_resolve_thread_github_token(thread_id, email)
app_token = await get_github_app_installation_token()
reaction_token = github_token or app_token
comment = payload.get("comment", {})
comment_id = comment.get("id")
if event_type == "issue_comment" and comment_id:
if not reaction_token:
logger.warning("No GitHub token available to react to issue comment %s", comment_id)
else:
reacted = await react_to_github_comment(
repo_config,
comment_id,
event_type="issue_comment",
token=reaction_token,
)
if not reacted:
logger.warning("Failed to react to GitHub issue comment %s", comment_id)
if existing_thread:
if event_type == "issue_comment":
prompt = build_github_issue_followup_prompt(
comment.get("user", {}).get("login", github_login) or github_login,
comment.get("body", ""),
)
else:
prompt = build_github_issue_update_prompt(github_login, title, description)
else:
comments = await fetch_issue_comments(
repo_config, issue_number, token=github_token or app_token
)
if comment_id and not any(item.get("comment_id") == comment_id for item in comments):
comments.append(
{
"body": comment.get("body", ""),
"author": comment.get("user", {}).get("login", "unknown"),
"created_at": comment.get("created_at", ""),
"comment_id": comment_id,
}
)
comments.sort(key=lambda item: item.get("created_at", ""))
prompt = build_github_issue_prompt(
repo_config,
issue_number,
issue_id,
title,
description,
comments,
github_login=github_login,
issue_author=issue_author,
)
configurable: dict[str, Any] = {
"source": "github",
"github_login": github_login,
"repo": repo_config,
"github_issue": {
"id": issue_id,
"number": issue_number,
"title": title,
"url": issue_url,
},
}
thread_active = await is_thread_active(thread_id)
if thread_active:
logger.info("Thread %s is busy, queuing GitHub issue message", thread_id)
await queue_message_for_thread(thread_id, prompt)
return
logger.info("Creating LangGraph run for thread %s from GitHub issue", thread_id)
langgraph_client = get_client(url=LANGGRAPH_URL)
await langgraph_client.runs.create(
thread_id,
"agent",
input={"messages": [{"role": "user", "content": prompt}]},
config={"configurable": configurable, "metadata": _AGENT_VERSION_METADATA},
if_not_exists="create",
)
logger.info("LangGraph run created for thread %s from GitHub issue", thread_id)
@app.post("/webhooks/github")
async def github_webhook(request: Request, background_tasks: BackgroundTasks) -> dict[str, str]:
"""Handle GitHub webhooks for issue and PR events that tag @open-swe."""
body = await request.body()
signature = request.headers.get("X-Hub-Signature-256", "")
if not verify_github_signature(body, signature, secret=GITHUB_WEBHOOK_SECRET):
logger.warning("Invalid GitHub webhook signature")
raise HTTPException(status_code=401, detail="Invalid signature")
event_type = request.headers.get("X-GitHub-Event", "")
if event_type not in _SUPPORTED_GH_EVENTS:
logger.info("Ignoring unsupported GitHub event type: %s", event_type)
return {"status": "ignored", "reason": f"Unsupported event type: {event_type}"}
try:
payload = json.loads(body)
except json.JSONDecodeError:
logger.exception("Failed to parse GitHub webhook JSON")
return {"status": "error", "message": "Invalid JSON"}
# Check org allowlist
webhook_repo = payload.get("repository", {})
webhook_repo_config = {
"owner": webhook_repo.get("owner", {}).get("login", ""),
"name": webhook_repo.get("name", ""),
}
if not _is_repo_org_allowed(webhook_repo_config):
logger.warning(
"Rejecting GitHub webhook: org '%s' not in ALLOWED_GITHUB_ORGS",
webhook_repo_config.get("owner"),
)
return {"status": "ignored", "reason": "Repository org not in allowlist"}
issue = payload.get("issue", {})
is_pull_request_comment = bool(event_type == "issue_comment" and issue.get("pull_request"))
is_issue_comment = bool(event_type == "issue_comment" and not issue.get("pull_request"))
is_issue_event = event_type == "issues"
if is_issue_event:
action = payload.get("action", "")
if action not in _SUPPORTED_GH_ISSUE_ACTIONS:
logger.info("Ignoring unsupported GitHub issue action: %s", action)
return {"status": "ignored", "reason": f"Unsupported GitHub issue action: {action}"}
if action == "edited":
changes = payload.get("changes", {})
if not any(field in changes for field in ("body", "title")):
logger.info("Ignoring GitHub issue edit without title/body changes")
return {"status": "ignored", "reason": "Issue edit did not change title or body"}
issue_text = f"{issue.get('title', '')}\n\n{issue.get('body', '')}".lower()
if not any(tag in issue_text for tag in OPEN_SWE_TAGS):
logger.info("Ignoring issue that does not mention @openswe or @open-swe")
return {"status": "ignored", "reason": "Issue does not mention @openswe or @open-swe"}
logger.info("Accepted GitHub issue webhook, scheduling background task")
background_tasks.add_task(process_github_issue, payload, event_type)
return {"status": "accepted", "message": "Processing GitHub issue event"}
comment = payload.get("comment") or payload.get("review", {})
comment_body = (comment.get("body") or "") if comment else ""
if not any(tag in comment_body.lower() for tag in OPEN_SWE_TAGS):
logger.info("Ignoring comment that does not mention @openswe or @open-swe")
return {"status": "ignored", "reason": "Comment does not mention @openswe or @open-swe"}
logger.info("Accepted GitHub webhook: event=%s, scheduling background task", event_type)
if is_pull_request_comment or event_type in {
"pull_request_review_comment",
"pull_request_review",
}:
background_tasks.add_task(process_github_pr_comment, payload, event_type)
return {"status": "accepted", "message": f"Processing {event_type} event"}
if is_issue_comment:
background_tasks.add_task(process_github_issue, payload, event_type)
return {"status": "accepted", "message": "Processing GitHub issue comment event"}
logger.info("Ignoring unsupported GitHub payload shape for event=%s", event_type)
return {"status": "ignored", "reason": f"Unsupported payload for event type: {event_type}"}