Hunter Miller f4682742d9
feat: update tlon channel/plugin to be more fully featured (#21208)
* feat(tlon): sync with openclaw-tlon master

- Add tlon CLI tool registration with binary lookup
- Add approval, media, settings, foreigns, story, upload modules
- Add http-api wrapper for Urbit connection patching
- Update types for defaultAuthorizedShips support
- Fix type compatibility with core plugin SDK
- Stub uploadFile (API not yet available in @tloncorp/api-beta)
- Remove incompatible test files (security, sse-client, upload)

* chore(tlon): remove dead code

Remove unused Urbit channel client files:
- channel-client.ts
- channel-ops.ts
- context.ts

These were not imported anywhere in the extension.

* feat(tlon): add image upload support via @tloncorp/api

- Import configureClient and uploadFile from @tloncorp/api
- Implement uploadImageFromUrl using uploadFile
- Configure API client before media uploads
- Update dependency to github:tloncorp/api-beta#main

* fix(tlon): restore SSRF protection with event ack tracking

- Restore context.ts and channel-ops.ts for SSRF support
- Restore sse-client.ts with urbitFetch for SSRF-protected requests
- Add event ack tracking from openclaw-tlon (acks every 20 events)
- Pass ssrfPolicy through authenticate() and UrbitSSEClient
- Fixes security regression from sync with openclaw-tlon

* fix(tlon): restore buildTlonAccountFields for allowPrivateNetwork

The inlined payload building was missing allowPrivateNetwork field,
which would prevent the setting from being persisted to config.

* fix(tlon): restore SSRF protection in probeAccount

- Restore channel-client.ts for UrbitChannelClient
- Use UrbitChannelClient with ssrfPolicy in probeAccount
- Ensures account probe respects allowPrivateNetwork setting

* feat(tlon): add ownerShip to setup flow

ownerShip should always be set as it controls who receives
approval requests and can approve/deny actions.

* chore(tlon): remove unused http-api.ts

After restoring SSRF protection, probeAccount uses UrbitChannelClient
instead of @urbit/http-api. The http-api.ts wrapper is no longer needed.

* refactor(tlon): simplify probeAccount to direct /~/name request

No channel needed - just authenticate and GET /~/name.
Removes UrbitChannelClient, keeping only UrbitSSEClient for monitor.

* chore(tlon): add logging for event acks

* chore(tlon): lower ack threshold to 5 for testing

* fix(tlon): address security review issues

- Fix SSRF in upload.ts: use urbitFetch with SSRF protection
- Fix SSRF in media.ts: use urbitFetch with SSRF protection
- Add command whitelist to tlon tool to prevent command injection
- Add getDefaultSsrFPolicy() helper for uploads/downloads

* fix(tlon): restore auth retry and add reauth on SSE reconnect

- Add authenticateWithRetry() helper with exponential backoff (restores lost logic from #39)
- Add onReconnect callback to re-authenticate when SSE stream reconnects
- Add UrbitSSEClient.updateCookie() method for proper cookie normalization on reauth

* fix(tlon): add infinite reconnect with reset after max attempts

Instead of giving up after maxReconnectAttempts, wait 10 seconds then
reset the counter and keep trying. This ensures the monitor never
permanently disconnects due to temporary network issues.

* test(tlon): restore security, sse-client, and upload tests

- security.test.ts: DM allowlist, group invite, bot mention detection, ship normalization
- sse-client.test.ts: subscription handling, cookie updates, reconnection params
- upload.test.ts: image upload with SSRF protection, error handling

* fix(tlon): restore DM partner ship extraction for proper routing

- Add extractDmPartnerShip() to extract partner from 'whom' field
- Use partner ship for routing (more reliable than essay.author)
- Explicitly ignore bot's own outbound DM events
- Log mismatch between author and partner for debugging

* chore(tlon): restore ack threshold to 20

* chore(tlon): sync slash commands support from upstream

- Add stripBotMention for proper CommandBody parsing
- Add command authorization logic for owner-only slash commands
- Add CommandAuthorized and CommandSource to context payload

* fix(tlon): resolve TypeScript errors in tests and monitor

- Store validated account url/code before closure to fix type narrowing
- Fix test type annotations for mode rules
- Add proper Response type cast in sse-client mock
- Use optional chaining for init properties

* docs(tlon): update docs for new config options and capabilities

- Document ownerShip for approval system
- Document autoAcceptDmInvites and autoAcceptGroupInvites
- Update status to reflect rich text and image support
- Add bundled skill section
- Update notes with formatting and image details
- Fix pnpm-lock.yaml conflict

* docs(tlon): fix dmAllowlist description and improve allowPrivateNetwork docs

- Correct dmAllowlist: empty means no DMs allowed (not allow all)
- Promote allowPrivateNetwork to its own section with examples
- Add warning about SSRF protection implications

* docs(tlon): clarify ownerShip is auto-authorized everywhere

- Add ownerShip to minimal config example (recommended)
- Document that owner is automatically allowed for DMs and channels
- No need to add owner to dmAllowlist or defaultAuthorizedShips

* docs(tlon): add capabilities table, troubleshooting, and config reference

Align with Matrix docs format:
- Capabilities table for quick feature reference
- Troubleshooting section with common failures
- Configuration reference with all options

* docs(tlon): fix reactions status and expand bundled skill section

- Reactions ARE supported via bundled skill (not missing)
- Add link to skill GitHub repo
- List skill capabilities: contacts, channels, groups, DMs, reactions, settings

* fix(tlon): use crypto.randomUUID instead of Math.random for channel ID

Fixes security test failure - Math.random is flagged as weak randomness.

* docs: fix markdown lint - add blank line before </Step>

* fix: address PR review issues for tlon plugin

- upload.ts: Use fetchWithSsrFGuard directly instead of urbitFetch to
  preserve full URL path when fetching external images; add release() call
- media.ts: Same fix - use fetchWithSsrFGuard for external media downloads;
  add release() call to clean up resources
- channel.ts: Use urbitFetch for poke API to maintain consistent SSRF
  protection (DNS pinning + redirect handling)
- upload.test.ts: Update mocks to use fetchWithSsrFGuard instead of urbitFetch

Addresses blocking issues from jalehman's review:
1. Fixed incorrect URL being fetched (validateUrbitBaseUrl was stripping path)
2. Fixed missing release() calls that could leak resources
3. Restored guarded fetch semantics for poke operations

* docs: add tlon changelog fragment

* style: format tlon monitor

* fix: align tlon lockfile and sse id generation

* docs: fix onboarding markdown list spacing

---------

Co-authored-by: Josh Lehman <josh@martian.engineering>
2026-03-02 16:23:42 -08:00

1926 lines
68 KiB
TypeScript

import type { RuntimeEnv, ReplyPayload, OpenClawConfig } from "openclaw/plugin-sdk";
import { createLoggerBackedRuntime, createReplyPrefixOptions } from "openclaw/plugin-sdk";
import { getTlonRuntime } from "../runtime.js";
import { createSettingsManager, type TlonSettingsStore } from "../settings.js";
import { normalizeShip, parseChannelNest } from "../targets.js";
import { resolveTlonAccount } from "../types.js";
import { authenticate } from "../urbit/auth.js";
import { ssrfPolicyFromAllowPrivateNetwork } from "../urbit/context.js";
import type { Foreigns, DmInvite } from "../urbit/foreigns.js";
import { sendDm, sendGroupMessage } from "../urbit/send.js";
import { UrbitSSEClient } from "../urbit/sse-client.js";
import {
type PendingApproval,
type AdminCommand,
createPendingApproval,
formatApprovalRequest,
formatApprovalConfirmation,
parseApprovalResponse,
isApprovalResponse,
findPendingApproval,
removePendingApproval,
parseAdminCommand,
isAdminCommand,
formatBlockedList,
formatPendingList,
} from "./approval.js";
import { fetchAllChannels, fetchInitData } from "./discovery.js";
import { cacheMessage, getChannelHistory, fetchThreadHistory } from "./history.js";
import { downloadMessageImages } from "./media.js";
import { createProcessedMessageTracker } from "./processed-messages.js";
import {
extractMessageText,
extractCites,
formatModelName,
isBotMentioned,
stripBotMention,
isDmAllowed,
isSummarizationRequest,
type ParsedCite,
} from "./utils.js";
export type MonitorTlonOpts = {
runtime?: RuntimeEnv;
abortSignal?: AbortSignal;
accountId?: string | null;
};
type ChannelAuthorization = {
mode?: "restricted" | "open";
allowedShips?: string[];
};
/**
* Resolve channel authorization by merging file config with settings store.
* Settings store takes precedence for fields it defines.
*/
function resolveChannelAuthorization(
cfg: OpenClawConfig,
channelNest: string,
settings?: TlonSettingsStore,
): { mode: "restricted" | "open"; allowedShips: string[] } {
const tlonConfig = cfg.channels?.tlon as
| {
authorization?: { channelRules?: Record<string, ChannelAuthorization> };
defaultAuthorizedShips?: string[];
}
| undefined;
// Merge channel rules: settings override file config
const fileRules = tlonConfig?.authorization?.channelRules ?? {};
const settingsRules = settings?.channelRules ?? {};
const rule = settingsRules[channelNest] ?? fileRules[channelNest];
// Merge default authorized ships: settings override file config
const defaultShips = settings?.defaultAuthorizedShips ?? tlonConfig?.defaultAuthorizedShips ?? [];
const allowedShips = rule?.allowedShips ?? defaultShips;
const mode = rule?.mode ?? "restricted";
return { mode, allowedShips };
}
export async function monitorTlonProvider(opts: MonitorTlonOpts = {}): Promise<void> {
const core = getTlonRuntime();
const cfg = core.config.loadConfig() as OpenClawConfig;
if (cfg.channels?.tlon?.enabled === false) {
return;
}
const logger = core.logging.getChildLogger({ module: "tlon-auto-reply" });
const runtime: RuntimeEnv =
opts.runtime ??
createLoggerBackedRuntime({
logger,
});
const account = resolveTlonAccount(cfg, opts.accountId ?? undefined);
if (!account.enabled) {
return;
}
if (!account.configured || !account.ship || !account.url || !account.code) {
throw new Error("Tlon account not configured (ship/url/code required)");
}
const botShipName = normalizeShip(account.ship);
runtime.log?.(`[tlon] Starting monitor for ${botShipName}`);
const ssrfPolicy = ssrfPolicyFromAllowPrivateNetwork(account.allowPrivateNetwork);
// Store validated values for use in closures (TypeScript narrowing doesn't propagate)
const accountUrl = account.url;
const accountCode = account.code;
// Helper to authenticate with retry logic
async function authenticateWithRetry(maxAttempts = 10): Promise<string> {
for (let attempt = 1; ; attempt++) {
if (opts.abortSignal?.aborted) {
throw new Error("Aborted while waiting to authenticate");
}
try {
runtime.log?.(`[tlon] Attempting authentication to ${accountUrl}...`);
return await authenticate(accountUrl, accountCode, { ssrfPolicy });
} catch (error: any) {
runtime.error?.(
`[tlon] Failed to authenticate (attempt ${attempt}): ${error?.message ?? String(error)}`,
);
if (attempt >= maxAttempts) {
throw error;
}
const delay = Math.min(30000, 1000 * Math.pow(2, attempt - 1));
runtime.log?.(`[tlon] Retrying authentication in ${delay}ms...`);
await new Promise<void>((resolve, reject) => {
const timer = setTimeout(resolve, delay);
if (opts.abortSignal) {
const onAbort = () => {
clearTimeout(timer);
reject(new Error("Aborted"));
};
opts.abortSignal.addEventListener("abort", onAbort, { once: true });
}
});
}
}
}
let api: UrbitSSEClient | null = null;
const cookie = await authenticateWithRetry();
api = new UrbitSSEClient(account.url, cookie, {
ship: botShipName,
ssrfPolicy,
logger: {
log: (message) => runtime.log?.(message),
error: (message) => runtime.error?.(message),
},
// Re-authenticate on reconnect in case the session expired
onReconnect: async (client) => {
runtime.log?.("[tlon] Re-authenticating on SSE reconnect...");
const newCookie = await authenticateWithRetry(5);
client.updateCookie(newCookie);
runtime.log?.("[tlon] Re-authentication successful");
},
});
const processedTracker = createProcessedMessageTracker(2000);
let groupChannels: string[] = [];
let botNickname: string | null = null;
// Settings store manager for hot-reloading config
const settingsManager = createSettingsManager(api, {
log: (msg) => runtime.log?.(msg),
error: (msg) => runtime.error?.(msg),
});
// Reactive state that can be updated via settings store
let effectiveDmAllowlist: string[] = account.dmAllowlist;
let effectiveShowModelSig: boolean = account.showModelSignature ?? false;
let effectiveAutoAcceptDmInvites: boolean = account.autoAcceptDmInvites ?? false;
let effectiveAutoAcceptGroupInvites: boolean = account.autoAcceptGroupInvites ?? false;
let effectiveGroupInviteAllowlist: string[] = account.groupInviteAllowlist;
let effectiveAutoDiscoverChannels: boolean = account.autoDiscoverChannels ?? false;
let effectiveOwnerShip: string | null = account.ownerShip
? normalizeShip(account.ownerShip)
: null;
let pendingApprovals: PendingApproval[] = [];
let currentSettings: TlonSettingsStore = {};
// Track threads we've participated in (by parentId) - respond without mention requirement
const participatedThreads = new Set<string>();
// Track DM senders per session to detect shared sessions (security warning)
const dmSendersBySession = new Map<string, Set<string>>();
let sharedSessionWarningSent = false;
// Fetch bot's nickname from contacts
try {
const selfProfile = await api.scry("/contacts/v1/self.json");
if (selfProfile && typeof selfProfile === "object") {
const profile = selfProfile as { nickname?: { value?: string } };
botNickname = profile.nickname?.value || null;
if (botNickname) {
runtime.log?.(`[tlon] Bot nickname: ${botNickname}`);
}
}
} catch (error: any) {
runtime.log?.(`[tlon] Could not fetch nickname: ${error?.message ?? String(error)}`);
}
// Store init foreigns for processing after settings are loaded
let initForeigns: Foreigns | null = null;
// Migrate file config to settings store (seed on first run)
async function migrateConfigToSettings() {
const migrations: Array<{ key: string; fileValue: unknown; settingsValue: unknown }> = [
{
key: "dmAllowlist",
fileValue: account.dmAllowlist,
settingsValue: currentSettings.dmAllowlist,
},
{
key: "groupInviteAllowlist",
fileValue: account.groupInviteAllowlist,
settingsValue: currentSettings.groupInviteAllowlist,
},
{
key: "groupChannels",
fileValue: account.groupChannels,
settingsValue: currentSettings.groupChannels,
},
{
key: "defaultAuthorizedShips",
fileValue: account.defaultAuthorizedShips,
settingsValue: currentSettings.defaultAuthorizedShips,
},
{
key: "autoDiscoverChannels",
fileValue: account.autoDiscoverChannels,
settingsValue: currentSettings.autoDiscoverChannels,
},
{
key: "autoAcceptDmInvites",
fileValue: account.autoAcceptDmInvites,
settingsValue: currentSettings.autoAcceptDmInvites,
},
{
key: "autoAcceptGroupInvites",
fileValue: account.autoAcceptGroupInvites,
settingsValue: currentSettings.autoAcceptGroupInvites,
},
{
key: "showModelSig",
fileValue: account.showModelSignature,
settingsValue: currentSettings.showModelSig,
},
];
for (const { key, fileValue, settingsValue } of migrations) {
// Only migrate if file has a value and settings store doesn't
const hasFileValue = Array.isArray(fileValue) ? fileValue.length > 0 : fileValue != null;
const hasSettingsValue = Array.isArray(settingsValue)
? settingsValue.length > 0
: settingsValue != null;
if (hasFileValue && !hasSettingsValue) {
try {
await api!.poke({
app: "settings",
mark: "settings-event",
json: {
"put-entry": {
"bucket-key": "tlon",
"entry-key": key,
value: fileValue,
desk: "moltbot",
},
},
});
runtime.log?.(`[tlon] Migrated ${key} from config to settings store`);
} catch (err) {
runtime.log?.(`[tlon] Failed to migrate ${key}: ${String(err)}`);
}
}
}
}
// Load settings from settings store (hot-reloadable config)
try {
currentSettings = await settingsManager.load();
// Migrate file config to settings store if not already present
await migrateConfigToSettings();
// Apply settings overrides
// Note: groupChannels from settings store are merged AFTER discovery runs (below)
if (currentSettings.defaultAuthorizedShips?.length) {
runtime.log?.(
`[tlon] Using defaultAuthorizedShips from settings store: ${currentSettings.defaultAuthorizedShips.join(", ")}`,
);
}
if (currentSettings.autoDiscoverChannels !== undefined) {
effectiveAutoDiscoverChannels = currentSettings.autoDiscoverChannels;
runtime.log?.(
`[tlon] Using autoDiscoverChannels from settings store: ${effectiveAutoDiscoverChannels}`,
);
}
if (currentSettings.dmAllowlist?.length) {
effectiveDmAllowlist = currentSettings.dmAllowlist;
runtime.log?.(
`[tlon] Using dmAllowlist from settings store: ${effectiveDmAllowlist.join(", ")}`,
);
}
if (currentSettings.showModelSig !== undefined) {
effectiveShowModelSig = currentSettings.showModelSig;
}
if (currentSettings.autoAcceptDmInvites !== undefined) {
effectiveAutoAcceptDmInvites = currentSettings.autoAcceptDmInvites;
runtime.log?.(
`[tlon] Using autoAcceptDmInvites from settings store: ${effectiveAutoAcceptDmInvites}`,
);
}
if (currentSettings.autoAcceptGroupInvites !== undefined) {
effectiveAutoAcceptGroupInvites = currentSettings.autoAcceptGroupInvites;
runtime.log?.(
`[tlon] Using autoAcceptGroupInvites from settings store: ${effectiveAutoAcceptGroupInvites}`,
);
}
if (currentSettings.groupInviteAllowlist?.length) {
effectiveGroupInviteAllowlist = currentSettings.groupInviteAllowlist;
runtime.log?.(
`[tlon] Using groupInviteAllowlist from settings store: ${effectiveGroupInviteAllowlist.join(", ")}`,
);
}
if (currentSettings.ownerShip) {
effectiveOwnerShip = normalizeShip(currentSettings.ownerShip);
runtime.log?.(`[tlon] Using ownerShip from settings store: ${effectiveOwnerShip}`);
}
if (currentSettings.pendingApprovals?.length) {
pendingApprovals = currentSettings.pendingApprovals;
runtime.log?.(`[tlon] Loaded ${pendingApprovals.length} pending approval(s) from settings`);
}
} catch (err) {
runtime.log?.(`[tlon] Settings store not available, using file config: ${String(err)}`);
}
// Run channel discovery AFTER settings are loaded (so settings store value is used)
if (effectiveAutoDiscoverChannels) {
try {
const initData = await fetchInitData(api, runtime);
if (initData.channels.length > 0) {
groupChannels = initData.channels;
}
initForeigns = initData.foreigns;
} catch (error: any) {
runtime.error?.(`[tlon] Auto-discovery failed: ${error?.message ?? String(error)}`);
}
}
// Merge manual config with auto-discovered channels
if (account.groupChannels.length > 0) {
for (const ch of account.groupChannels) {
if (!groupChannels.includes(ch)) {
groupChannels.push(ch);
}
}
runtime.log?.(
`[tlon] Added ${account.groupChannels.length} manual groupChannels to monitoring`,
);
}
// Also merge settings store groupChannels (may have been set via tlon settings command)
if (currentSettings.groupChannels?.length) {
for (const ch of currentSettings.groupChannels) {
if (!groupChannels.includes(ch)) {
groupChannels.push(ch);
}
}
}
if (groupChannels.length > 0) {
runtime.log?.(
`[tlon] Monitoring ${groupChannels.length} group channel(s): ${groupChannels.join(", ")}`,
);
} else {
runtime.log?.("[tlon] No group channels to monitor (DMs only)");
}
// Helper to resolve cited message content
async function resolveCiteContent(cite: ParsedCite): Promise<string | null> {
if (cite.type !== "chan" || !cite.nest || !cite.postId) {
return null;
}
try {
// Scry for the specific post: /v4/{nest}/posts/post/{postId}
const scryPath = `/channels/v4/${cite.nest}/posts/post/${cite.postId}.json`;
runtime.log?.(`[tlon] Fetching cited post: ${scryPath}`);
const data: any = await api!.scry(scryPath);
// Extract text from the post's essay content
if (data?.essay?.content) {
const text = extractMessageText(data.essay.content);
return text || null;
}
return null;
} catch (err) {
runtime.log?.(`[tlon] Failed to fetch cited post: ${String(err)}`);
return null;
}
}
// Resolve all cites in message content and return quoted text
async function resolveAllCites(content: unknown): Promise<string> {
const cites = extractCites(content);
if (cites.length === 0) {
return "";
}
const resolved: string[] = [];
for (const cite of cites) {
const text = await resolveCiteContent(cite);
if (text) {
const author = cite.author || "unknown";
resolved.push(`> ${author} wrote: ${text}`);
}
}
return resolved.length > 0 ? resolved.join("\n") + "\n\n" : "";
}
// Helper to save pending approvals to settings store
async function savePendingApprovals(): Promise<void> {
try {
await api!.poke({
app: "settings",
mark: "settings-event",
json: {
"put-entry": {
desk: "moltbot",
"bucket-key": "tlon",
"entry-key": "pendingApprovals",
value: JSON.stringify(pendingApprovals),
},
},
});
} catch (err) {
runtime.error?.(`[tlon] Failed to save pending approvals: ${String(err)}`);
}
}
// Helper to update dmAllowlist in settings store
async function addToDmAllowlist(ship: string): Promise<void> {
const normalizedShip = normalizeShip(ship);
if (!effectiveDmAllowlist.includes(normalizedShip)) {
effectiveDmAllowlist = [...effectiveDmAllowlist, normalizedShip];
}
try {
await api!.poke({
app: "settings",
mark: "settings-event",
json: {
"put-entry": {
desk: "moltbot",
"bucket-key": "tlon",
"entry-key": "dmAllowlist",
value: effectiveDmAllowlist,
},
},
});
runtime.log?.(`[tlon] Added ${normalizedShip} to dmAllowlist`);
} catch (err) {
runtime.error?.(`[tlon] Failed to update dmAllowlist: ${String(err)}`);
}
}
// Helper to update channelRules in settings store
async function addToChannelAllowlist(ship: string, channelNest: string): Promise<void> {
const normalizedShip = normalizeShip(ship);
const channelRules = currentSettings.channelRules ?? {};
const rule = channelRules[channelNest] ?? { mode: "restricted", allowedShips: [] };
const allowedShips = [...(rule.allowedShips ?? [])]; // Clone to avoid mutation
if (!allowedShips.includes(normalizedShip)) {
allowedShips.push(normalizedShip);
}
const updatedRules = {
...channelRules,
[channelNest]: { ...rule, allowedShips },
};
// Update local state immediately (don't wait for settings subscription)
currentSettings = { ...currentSettings, channelRules: updatedRules };
try {
await api!.poke({
app: "settings",
mark: "settings-event",
json: {
"put-entry": {
desk: "moltbot",
"bucket-key": "tlon",
"entry-key": "channelRules",
value: JSON.stringify(updatedRules),
},
},
});
runtime.log?.(`[tlon] Added ${normalizedShip} to ${channelNest} allowlist`);
} catch (err) {
runtime.error?.(`[tlon] Failed to update channelRules: ${String(err)}`);
}
}
// Helper to block a ship using Tlon's native blocking
async function blockShip(ship: string): Promise<void> {
const normalizedShip = normalizeShip(ship);
try {
await api!.poke({
app: "chat",
mark: "chat-block-ship",
json: { ship: normalizedShip },
});
runtime.log?.(`[tlon] Blocked ship ${normalizedShip}`);
} catch (err) {
runtime.error?.(`[tlon] Failed to block ship ${normalizedShip}: ${String(err)}`);
}
}
// Check if a ship is blocked using Tlon's native block list
async function isShipBlocked(ship: string): Promise<boolean> {
const normalizedShip = normalizeShip(ship);
try {
const blocked = (await api!.scry("/chat/blocked.json")) as string[] | undefined;
return Array.isArray(blocked) && blocked.some((s) => normalizeShip(s) === normalizedShip);
} catch (err) {
runtime.log?.(`[tlon] Failed to check blocked list: ${String(err)}`);
return false;
}
}
// Get all blocked ships
async function getBlockedShips(): Promise<string[]> {
try {
const blocked = (await api!.scry("/chat/blocked.json")) as string[] | undefined;
return Array.isArray(blocked) ? blocked : [];
} catch (err) {
runtime.log?.(`[tlon] Failed to get blocked list: ${String(err)}`);
return [];
}
}
// Helper to unblock a ship using Tlon's native blocking
async function unblockShip(ship: string): Promise<boolean> {
const normalizedShip = normalizeShip(ship);
try {
await api!.poke({
app: "chat",
mark: "chat-unblock-ship",
json: { ship: normalizedShip },
});
runtime.log?.(`[tlon] Unblocked ship ${normalizedShip}`);
return true;
} catch (err) {
runtime.error?.(`[tlon] Failed to unblock ship ${normalizedShip}: ${String(err)}`);
return false;
}
}
// Helper to send DM notification to owner
async function sendOwnerNotification(message: string): Promise<void> {
if (!effectiveOwnerShip) {
runtime.log?.("[tlon] No ownerShip configured, cannot send notification");
return;
}
try {
await sendDm({
api: api!,
fromShip: botShipName,
toShip: effectiveOwnerShip,
text: message,
});
runtime.log?.(`[tlon] Sent notification to owner ${effectiveOwnerShip}`);
} catch (err) {
runtime.error?.(`[tlon] Failed to send notification to owner: ${String(err)}`);
}
}
// Queue a new approval request and notify the owner
async function queueApprovalRequest(approval: PendingApproval): Promise<void> {
// Check if ship is blocked - silently ignore
if (await isShipBlocked(approval.requestingShip)) {
runtime.log?.(`[tlon] Ignoring request from blocked ship ${approval.requestingShip}`);
return;
}
// Check for duplicate - if found, update it with new content and re-notify
const existingIndex = pendingApprovals.findIndex(
(a) =>
a.type === approval.type &&
a.requestingShip === approval.requestingShip &&
(approval.type !== "channel" || a.channelNest === approval.channelNest) &&
(approval.type !== "group" || a.groupFlag === approval.groupFlag),
);
if (existingIndex !== -1) {
// Update existing approval with new content (preserves the original ID)
const existing = pendingApprovals[existingIndex];
if (approval.originalMessage) {
existing.originalMessage = approval.originalMessage;
existing.messagePreview = approval.messagePreview;
}
runtime.log?.(
`[tlon] Updated existing approval for ${approval.requestingShip} (${approval.type}) - re-sending notification`,
);
await savePendingApprovals();
const message = formatApprovalRequest(existing);
await sendOwnerNotification(message);
return;
}
pendingApprovals.push(approval);
await savePendingApprovals();
const message = formatApprovalRequest(approval);
await sendOwnerNotification(message);
runtime.log?.(
`[tlon] Queued approval request: ${approval.id} (${approval.type} from ${approval.requestingShip})`,
);
}
// Process the owner's approval response
async function handleApprovalResponse(text: string): Promise<boolean> {
const parsed = parseApprovalResponse(text);
if (!parsed) {
return false;
}
const approval = findPendingApproval(pendingApprovals, parsed.id);
if (!approval) {
await sendOwnerNotification(
"No pending approval found" + (parsed.id ? ` for ID: ${parsed.id}` : ""),
);
return true; // Still consumed the message
}
if (parsed.action === "approve") {
switch (approval.type) {
case "dm":
await addToDmAllowlist(approval.requestingShip);
// Process the original message if available
if (approval.originalMessage) {
runtime.log?.(
`[tlon] Processing original message from ${approval.requestingShip} after approval`,
);
await processMessage({
messageId: approval.originalMessage.messageId,
senderShip: approval.requestingShip,
messageText: approval.originalMessage.messageText,
messageContent: approval.originalMessage.messageContent,
isGroup: false,
timestamp: approval.originalMessage.timestamp,
});
}
break;
case "channel":
if (approval.channelNest) {
await addToChannelAllowlist(approval.requestingShip, approval.channelNest);
// Process the original message if available
if (approval.originalMessage) {
const parsed = parseChannelNest(approval.channelNest);
runtime.log?.(
`[tlon] Processing original message from ${approval.requestingShip} in ${approval.channelNest} after approval`,
);
await processMessage({
messageId: approval.originalMessage.messageId,
senderShip: approval.requestingShip,
messageText: approval.originalMessage.messageText,
messageContent: approval.originalMessage.messageContent,
isGroup: true,
channelNest: approval.channelNest,
hostShip: parsed?.hostShip,
channelName: parsed?.channelName,
timestamp: approval.originalMessage.timestamp,
parentId: approval.originalMessage.parentId,
isThreadReply: approval.originalMessage.isThreadReply,
});
}
}
break;
case "group":
// Accept the group invite (don't add to allowlist - each invite requires approval)
if (approval.groupFlag) {
try {
await api!.poke({
app: "groups",
mark: "group-join",
json: {
flag: approval.groupFlag,
"join-all": true,
},
});
runtime.log?.(`[tlon] Joined group ${approval.groupFlag} after approval`);
// Immediately discover channels from the newly joined group
// Small delay to allow the join to propagate
setTimeout(async () => {
try {
const discoveredChannels = await fetchAllChannels(api!, runtime);
let newCount = 0;
for (const channelNest of discoveredChannels) {
if (!watchedChannels.has(channelNest)) {
watchedChannels.add(channelNest);
newCount++;
}
}
if (newCount > 0) {
runtime.log?.(
`[tlon] Discovered ${newCount} new channel(s) after joining group`,
);
}
} catch (err) {
runtime.log?.(`[tlon] Channel discovery after group join failed: ${String(err)}`);
}
}, 2000);
} catch (err) {
runtime.error?.(`[tlon] Failed to join group ${approval.groupFlag}: ${String(err)}`);
}
}
break;
}
await sendOwnerNotification(formatApprovalConfirmation(approval, "approve"));
} else if (parsed.action === "block") {
// Block the ship using Tlon's native blocking
await blockShip(approval.requestingShip);
await sendOwnerNotification(formatApprovalConfirmation(approval, "block"));
} else {
// Denied - just remove from pending, no notification to requester
await sendOwnerNotification(formatApprovalConfirmation(approval, "deny"));
}
// Remove from pending
pendingApprovals = removePendingApproval(pendingApprovals, approval.id);
await savePendingApprovals();
return true;
}
// Handle admin commands from owner (unblock, blocked, pending)
async function handleAdminCommand(text: string): Promise<boolean> {
const command = parseAdminCommand(text);
if (!command) {
return false;
}
switch (command.type) {
case "blocked": {
const blockedShips = await getBlockedShips();
await sendOwnerNotification(formatBlockedList(blockedShips));
runtime.log?.(`[tlon] Owner requested blocked ships list (${blockedShips.length} ships)`);
return true;
}
case "pending": {
await sendOwnerNotification(formatPendingList(pendingApprovals));
runtime.log?.(
`[tlon] Owner requested pending approvals list (${pendingApprovals.length} pending)`,
);
return true;
}
case "unblock": {
const shipToUnblock = command.ship;
const isBlocked = await isShipBlocked(shipToUnblock);
if (!isBlocked) {
await sendOwnerNotification(`${shipToUnblock} is not blocked.`);
return true;
}
const success = await unblockShip(shipToUnblock);
if (success) {
await sendOwnerNotification(`Unblocked ${shipToUnblock}.`);
} else {
await sendOwnerNotification(`Failed to unblock ${shipToUnblock}.`);
}
return true;
}
}
}
// Check if a ship is the owner (always allowed to DM)
function isOwner(ship: string): boolean {
if (!effectiveOwnerShip) {
return false;
}
return normalizeShip(ship) === effectiveOwnerShip;
}
/**
* Extract the DM partner ship from the 'whom' field.
* This is the canonical source for DM routing (more reliable than essay.author).
* Returns empty string if whom doesn't contain a valid patp-like value.
*/
function extractDmPartnerShip(whom: unknown): string {
const raw =
typeof whom === "string"
? whom
: whom && typeof whom === "object" && "ship" in whom && typeof whom.ship === "string"
? whom.ship
: "";
const normalized = normalizeShip(raw);
// Keep DM routing strict: accept only patp-like values.
return /^~?[a-z-]+$/i.test(normalized) ? normalized : "";
}
const processMessage = async (params: {
messageId: string;
senderShip: string;
messageText: string;
messageContent?: unknown; // Raw Tlon content for media extraction
isGroup: boolean;
channelNest?: string;
hostShip?: string;
channelName?: string;
timestamp: number;
parentId?: string | null;
isThreadReply?: boolean;
}) => {
const {
messageId,
senderShip,
isGroup,
channelNest,
hostShip,
channelName,
timestamp,
parentId,
isThreadReply,
messageContent,
} = params;
const groupChannel = channelNest; // For compatibility
let messageText = params.messageText;
// Download any images from the message content
let attachments: Array<{ path: string; contentType: string }> = [];
if (messageContent) {
try {
attachments = await downloadMessageImages(messageContent);
if (attachments.length > 0) {
runtime.log?.(`[tlon] Downloaded ${attachments.length} image(s) from message`);
}
} catch (error: any) {
runtime.log?.(`[tlon] Failed to download images: ${error?.message ?? String(error)}`);
}
}
// Fetch thread context when entering a thread for the first time
if (isThreadReply && parentId && groupChannel) {
try {
const threadHistory = await fetchThreadHistory(api, groupChannel, parentId, 20, runtime);
if (threadHistory.length > 0) {
const threadContext = threadHistory
.slice(-10) // Last 10 messages for context
.map((msg) => `${msg.author}: ${msg.content}`)
.join("\n");
// Prepend thread context to the message
// Include note about ongoing conversation for agent judgment
const contextNote = `[Thread conversation - ${threadHistory.length} previous replies. You are participating in this thread. Only respond if relevant or helpful - you don't need to reply to every message.]`;
messageText = `${contextNote}\n\n[Previous messages]\n${threadContext}\n\n[Current message]\n${messageText}`;
runtime?.log?.(
`[tlon] Added thread context (${threadHistory.length} replies) to message`,
);
}
} catch (error: any) {
runtime?.log?.(`[tlon] Could not fetch thread context: ${error?.message ?? String(error)}`);
// Continue without thread context - not critical
}
}
if (isGroup && groupChannel && isSummarizationRequest(messageText)) {
try {
const history = await getChannelHistory(api, groupChannel, 50, runtime);
if (history.length === 0) {
const noHistoryMsg =
"I couldn't fetch any messages for this channel. It might be empty or there might be a permissions issue.";
if (isGroup) {
const parsed = parseChannelNest(groupChannel);
if (parsed) {
await sendGroupMessage({
api: api,
fromShip: botShipName,
hostShip: parsed.hostShip,
channelName: parsed.channelName,
text: noHistoryMsg,
});
}
} else {
await sendDm({
api: api,
fromShip: botShipName,
toShip: senderShip,
text: noHistoryMsg,
});
}
return;
}
const historyText = history
.map(
(msg) => `[${new Date(msg.timestamp).toLocaleString()}] ${msg.author}: ${msg.content}`,
)
.join("\n");
messageText =
`Please summarize this channel conversation (${history.length} recent messages):\n\n${historyText}\n\n` +
"Provide a concise summary highlighting:\n" +
"1. Main topics discussed\n" +
"2. Key decisions or conclusions\n" +
"3. Action items if any\n" +
"4. Notable participants";
} catch (error: any) {
const errorMsg = `Sorry, I encountered an error while fetching the channel history: ${error?.message ?? String(error)}`;
if (isGroup && groupChannel) {
const parsed = parseChannelNest(groupChannel);
if (parsed) {
await sendGroupMessage({
api: api,
fromShip: botShipName,
hostShip: parsed.hostShip,
channelName: parsed.channelName,
text: errorMsg,
});
}
} else {
await sendDm({ api: api, fromShip: botShipName, toShip: senderShip, text: errorMsg });
}
return;
}
}
const route = core.channel.routing.resolveAgentRoute({
cfg,
channel: "tlon",
accountId: opts.accountId ?? undefined,
peer: {
kind: isGroup ? "group" : "direct",
id: isGroup ? (groupChannel ?? senderShip) : senderShip,
},
});
// Warn if multiple users share a DM session (insecure dmScope configuration)
if (!isGroup) {
const sessionKey = route.sessionKey;
if (!dmSendersBySession.has(sessionKey)) {
dmSendersBySession.set(sessionKey, new Set());
}
const senders = dmSendersBySession.get(sessionKey)!;
if (senders.size > 0 && !senders.has(senderShip)) {
// Log warning
runtime.log?.(
`[tlon] ⚠️ SECURITY: Multiple users sharing DM session. ` +
`Configure "session.dmScope: per-channel-peer" in OpenClaw config.`,
);
// Notify owner via DM (once per monitor session)
if (!sharedSessionWarningSent && effectiveOwnerShip) {
sharedSessionWarningSent = true;
const warningMsg =
`⚠️ Security Warning: Multiple users are sharing a DM session with this bot. ` +
`This can leak conversation context between users.\n\n` +
`Fix: Add to your OpenClaw config:\n` +
`session:\n dmScope: "per-channel-peer"\n\n` +
`Docs: https://docs.openclaw.ai/concepts/session#secure-dm-mode`;
// Send async, don't block message processing
sendDm({
api,
fromShip: botShipName,
toShip: effectiveOwnerShip,
text: warningMsg,
}).catch((err) =>
runtime.error?.(`[tlon] Failed to send security warning to owner: ${err}`),
);
}
}
senders.add(senderShip);
}
const senderRole = isOwner(senderShip) ? "owner" : "user";
const fromLabel = isGroup
? `${senderShip} [${senderRole}] in ${channelNest}`
: `${senderShip} [${senderRole}]`;
// Compute command authorization for slash commands (owner-only)
const shouldComputeAuth = core.channel.commands.shouldComputeCommandAuthorized(
messageText,
cfg,
);
let commandAuthorized = false;
if (shouldComputeAuth) {
const useAccessGroups = cfg.commands?.useAccessGroups !== false;
const senderIsOwner = isOwner(senderShip);
commandAuthorized = core.channel.commands.resolveCommandAuthorizedFromAuthorizers({
useAccessGroups,
authorizers: [{ configured: Boolean(effectiveOwnerShip), allowed: senderIsOwner }],
});
// Log when non-owner attempts a slash command (will be silently ignored by Gateway)
if (!commandAuthorized) {
console.log(
`[tlon] Command attempt denied: ${senderShip} is not owner (owner=${effectiveOwnerShip ?? "not configured"})`,
);
}
}
// Prepend attachment annotations to message body (similar to Signal format)
let bodyWithAttachments = messageText;
if (attachments.length > 0) {
const mediaLines = attachments
.map((a) => `[media attached: ${a.path} (${a.contentType}) | ${a.path}]`)
.join("\n");
bodyWithAttachments = mediaLines + "\n" + messageText;
}
const body = core.channel.reply.formatAgentEnvelope({
channel: "Tlon",
from: fromLabel,
timestamp,
body: bodyWithAttachments,
});
// Strip bot ship mention for CommandBody so "/status" is recognized as command-only
const commandBody = isGroup ? stripBotMention(messageText, botShipName) : messageText;
const ctxPayload = core.channel.reply.finalizeInboundContext({
Body: body,
RawBody: messageText,
CommandBody: commandBody,
From: isGroup ? `tlon:group:${groupChannel}` : `tlon:${senderShip}`,
To: `tlon:${botShipName}`,
SessionKey: route.sessionKey,
AccountId: route.accountId,
ChatType: isGroup ? "group" : "direct",
ConversationLabel: fromLabel,
SenderName: senderShip,
SenderId: senderShip,
SenderRole: senderRole,
CommandAuthorized: commandAuthorized,
CommandSource: "text" as const,
Provider: "tlon",
Surface: "tlon",
MessageSid: messageId,
// Include downloaded media attachments
...(attachments.length > 0 && { Attachments: attachments }),
OriginatingChannel: "tlon",
OriginatingTo: `tlon:${isGroup ? groupChannel : botShipName}`,
// Include thread context for automatic reply routing
...(parentId && { ThreadId: String(parentId), ReplyToId: String(parentId) }),
});
const dispatchStartTime = Date.now();
const responsePrefix = core.channel.reply.resolveEffectiveMessagesConfig(
cfg,
route.agentId,
).responsePrefix;
const humanDelay = core.channel.reply.resolveHumanDelayConfig(cfg, route.agentId);
await core.channel.reply.dispatchReplyWithBufferedBlockDispatcher({
ctx: ctxPayload,
cfg,
dispatcherOptions: {
responsePrefix,
humanDelay,
deliver: async (payload: ReplyPayload) => {
let replyText = payload.text;
if (!replyText) {
return;
}
// Use settings store value if set, otherwise fall back to file config
const showSignature = effectiveShowModelSig;
if (showSignature) {
const extPayload = payload as ReplyPayload & {
metadata?: { model?: string };
model?: string;
};
const extRoute = route as typeof route & { model?: string };
const defaultModel = cfg.agents?.defaults?.model;
const modelInfo =
extPayload.metadata?.model ||
extPayload.model ||
extRoute.model ||
(typeof defaultModel === "string" ? defaultModel : defaultModel?.primary);
extPayload.metadata?.model ||
extPayload.model ||
extRoute.model ||
(typeof defaultModel === "string" ? defaultModel : defaultModel?.primary);
replyText = `${replyText}\n\n_[Generated by ${formatModelName(modelInfo)}]_`;
}
if (isGroup && groupChannel) {
const parsed = parseChannelNest(groupChannel);
if (!parsed) {
return;
}
await sendGroupMessage({
api: api,
fromShip: botShipName,
hostShip: parsed.hostShip,
channelName: parsed.channelName,
text: replyText,
replyToId: parentId ?? undefined,
});
// Track thread participation for future replies without mention
if (parentId) {
participatedThreads.add(String(parentId));
runtime.log?.(`[tlon] Now tracking thread for future replies: ${parentId}`);
}
} else {
await sendDm({ api: api, fromShip: botShipName, toShip: senderShip, text: replyText });
}
},
onError: (err, info) => {
const dispatchDuration = Date.now() - dispatchStartTime;
runtime.error?.(
`[tlon] ${info.kind} reply failed after ${dispatchDuration}ms: ${String(err)}`,
);
},
},
});
};
// Track which channels we're interested in for filtering firehose events
const watchedChannels = new Set<string>(groupChannels);
const _watchedDMs = new Set<string>();
// Firehose handler for all channel messages (/v2)
const handleChannelsFirehose = async (event: any) => {
try {
const nest = event?.nest;
if (!nest) {
return;
}
// Only process channels we're watching
if (!watchedChannels.has(nest)) {
return;
}
const response = event?.response;
if (!response) {
return;
}
// Handle post responses (new posts and replies)
const essay = response?.post?.["r-post"]?.set?.essay;
const memo = response?.post?.["r-post"]?.reply?.["r-reply"]?.set?.memo;
if (!essay && !memo) {
return;
}
const content = memo || essay;
const isThreadReply = Boolean(memo);
const messageId = isThreadReply ? response?.post?.["r-post"]?.reply?.id : response?.post?.id;
if (!processedTracker.mark(messageId)) {
return;
}
const senderShip = normalizeShip(content.author ?? "");
if (!senderShip || senderShip === botShipName) {
return;
}
// Resolve any cited/quoted messages first
const citedContent = await resolveAllCites(content.content);
const rawText = extractMessageText(content.content);
const messageText = citedContent + rawText;
if (!messageText.trim()) {
return;
}
cacheMessage(nest, {
author: senderShip,
content: messageText,
timestamp: content.sent || Date.now(),
id: messageId,
});
// Get thread info early for participation check
const seal = isThreadReply
? response?.post?.["r-post"]?.reply?.["r-reply"]?.set?.seal
: response?.post?.["r-post"]?.set?.seal;
const parentId = seal?.["parent-id"] || seal?.parent || null;
// Check if we should respond:
// 1. Direct mention always triggers response
// 2. Thread replies where we've participated - respond if relevant (let agent decide)
const mentioned = isBotMentioned(messageText, botShipName, botNickname ?? undefined);
const inParticipatedThread =
isThreadReply && parentId && participatedThreads.has(String(parentId));
if (!mentioned && !inParticipatedThread) {
return;
}
// Log why we're responding
if (inParticipatedThread && !mentioned) {
runtime.log?.(`[tlon] Responding to thread we participated in (no mention): ${parentId}`);
}
// Owner is always allowed
if (isOwner(senderShip)) {
runtime.log?.(`[tlon] Owner ${senderShip} is always allowed in channels`);
} else {
const { mode, allowedShips } = resolveChannelAuthorization(cfg, nest, currentSettings);
if (mode === "restricted") {
const normalizedAllowed = allowedShips.map(normalizeShip);
if (!normalizedAllowed.includes(senderShip)) {
// If owner is configured, queue approval request
if (effectiveOwnerShip) {
const approval = createPendingApproval({
type: "channel",
requestingShip: senderShip,
channelNest: nest,
messagePreview: messageText.substring(0, 100),
originalMessage: {
messageId: messageId ?? "",
messageText,
messageContent: content.content,
timestamp: content.sent || Date.now(),
parentId: parentId ?? undefined,
isThreadReply,
},
});
await queueApprovalRequest(approval);
} else {
runtime.log?.(
`[tlon] Access denied: ${senderShip} in ${nest} (allowed: ${allowedShips.join(", ")})`,
);
}
return;
}
}
}
const parsed = parseChannelNest(nest);
await processMessage({
messageId: messageId ?? "",
senderShip,
messageText,
messageContent: content.content, // Pass raw content for media extraction
isGroup: true,
channelNest: nest,
hostShip: parsed?.hostShip,
channelName: parsed?.channelName,
timestamp: content.sent || Date.now(),
parentId,
isThreadReply,
});
} catch (error: any) {
runtime.error?.(
`[tlon] Error handling channel firehose event: ${error?.message ?? String(error)}`,
);
}
};
// Firehose handler for all DM messages (/v3)
// Track which DM invites we've already processed to avoid duplicate accepts
const processedDmInvites = new Set<string>();
const handleChatFirehose = async (event: any) => {
try {
// Handle DM invite lists (arrays)
if (Array.isArray(event)) {
for (const invite of event as DmInvite[]) {
const ship = normalizeShip(invite.ship || "");
if (!ship || processedDmInvites.has(ship)) {
continue;
}
// Owner is always allowed
if (isOwner(ship)) {
try {
await api.poke({
app: "chat",
mark: "chat-dm-rsvp",
json: { ship, ok: true },
});
processedDmInvites.add(ship);
runtime.log?.(`[tlon] Auto-accepted DM invite from owner ${ship}`);
} catch (err) {
runtime.error?.(`[tlon] Failed to auto-accept DM from owner: ${String(err)}`);
}
continue;
}
// Auto-accept if on allowlist and auto-accept is enabled
if (effectiveAutoAcceptDmInvites && isDmAllowed(ship, effectiveDmAllowlist)) {
try {
await api.poke({
app: "chat",
mark: "chat-dm-rsvp",
json: { ship, ok: true },
});
processedDmInvites.add(ship);
runtime.log?.(`[tlon] Auto-accepted DM invite from ${ship}`);
} catch (err) {
runtime.error?.(`[tlon] Failed to auto-accept DM from ${ship}: ${String(err)}`);
}
continue;
}
// If owner is configured and ship is not on allowlist, queue approval
if (effectiveOwnerShip && !isDmAllowed(ship, effectiveDmAllowlist)) {
const approval = createPendingApproval({
type: "dm",
requestingShip: ship,
messagePreview: "(DM invite - no message yet)",
});
await queueApprovalRequest(approval);
processedDmInvites.add(ship); // Mark as processed to avoid duplicate notifications
}
}
return;
}
if (!("whom" in event) || !("response" in event)) {
return;
}
const whom = event.whom; // DM partner ship or club ID
const messageId = event.id;
const response = event.response;
// Handle add events (new messages)
const essay = response?.add?.essay;
if (!essay) {
return;
}
if (!processedTracker.mark(messageId)) {
return;
}
const authorShip = normalizeShip(essay.author ?? "");
const partnerShip = extractDmPartnerShip(whom);
const senderShip = partnerShip || authorShip;
// Ignore the bot's own outbound DM events.
if (authorShip === botShipName) {
return;
}
if (!senderShip || senderShip === botShipName) {
return;
}
// Log mismatch between author and partner for debugging
if (authorShip && partnerShip && authorShip !== partnerShip) {
runtime.log?.(
`[tlon] DM ship mismatch (author=${authorShip}, partner=${partnerShip}) - routing to partner`,
);
}
// Resolve any cited/quoted messages first
const citedContent = await resolveAllCites(essay.content);
const rawText = extractMessageText(essay.content);
const messageText = citedContent + rawText;
if (!messageText.trim()) {
return;
}
// Check if this is the owner sending an approval response
if (isOwner(senderShip) && isApprovalResponse(messageText)) {
const handled = await handleApprovalResponse(messageText);
if (handled) {
runtime.log?.(`[tlon] Processed approval response from owner: ${messageText}`);
return;
}
}
// Check if this is the owner sending an admin command
if (isOwner(senderShip) && isAdminCommand(messageText)) {
const handled = await handleAdminCommand(messageText);
if (handled) {
runtime.log?.(`[tlon] Processed admin command from owner: ${messageText}`);
return;
}
}
// Owner is always allowed to DM (bypass allowlist)
if (isOwner(senderShip)) {
runtime.log?.(`[tlon] Processing DM from owner ${senderShip}`);
await processMessage({
messageId: messageId ?? "",
senderShip,
messageText,
messageContent: essay.content,
isGroup: false,
timestamp: essay.sent || Date.now(),
});
return;
}
// For DMs from others, check allowlist
if (!isDmAllowed(senderShip, effectiveDmAllowlist)) {
// If owner is configured, queue approval request
if (effectiveOwnerShip) {
const approval = createPendingApproval({
type: "dm",
requestingShip: senderShip,
messagePreview: messageText.substring(0, 100),
originalMessage: {
messageId: messageId ?? "",
messageText,
messageContent: essay.content,
timestamp: essay.sent || Date.now(),
},
});
await queueApprovalRequest(approval);
} else {
runtime.log?.(`[tlon] Blocked DM from ${senderShip}: not in allowlist`);
}
return;
}
await processMessage({
messageId: messageId ?? "",
senderShip,
messageText,
messageContent: essay.content, // Pass raw content for media extraction
isGroup: false,
timestamp: essay.sent || Date.now(),
});
} catch (error: any) {
runtime.error?.(
`[tlon] Error handling chat firehose event: ${error?.message ?? String(error)}`,
);
}
};
try {
runtime.log?.("[tlon] Subscribing to firehose updates...");
// Subscribe to channels firehose (/v2)
await api.subscribe({
app: "channels",
path: "/v2",
event: handleChannelsFirehose,
err: (error) => {
runtime.error?.(`[tlon] Channels firehose error: ${String(error)}`);
},
quit: () => {
runtime.log?.("[tlon] Channels firehose subscription ended");
},
});
runtime.log?.("[tlon] Subscribed to channels firehose (/v2)");
// Subscribe to chat/DM firehose (/v3)
await api.subscribe({
app: "chat",
path: "/v3",
event: handleChatFirehose,
err: (error) => {
runtime.error?.(`[tlon] Chat firehose error: ${String(error)}`);
},
quit: () => {
runtime.log?.("[tlon] Chat firehose subscription ended");
},
});
runtime.log?.("[tlon] Subscribed to chat firehose (/v3)");
// Subscribe to contacts updates to track nickname changes
await api.subscribe({
app: "contacts",
path: "/v1/news",
event: (event: any) => {
try {
// Look for self profile updates
if (event?.self) {
const selfUpdate = event.self;
if (selfUpdate?.contact?.nickname?.value !== undefined) {
const newNickname = selfUpdate.contact.nickname.value || null;
if (newNickname !== botNickname) {
botNickname = newNickname;
runtime.log?.(`[tlon] Nickname updated: ${botNickname}`);
}
}
}
} catch (error: any) {
runtime.error?.(
`[tlon] Error handling contacts event: ${error?.message ?? String(error)}`,
);
}
},
err: (error) => {
runtime.error?.(`[tlon] Contacts subscription error: ${String(error)}`);
},
quit: () => {
runtime.log?.("[tlon] Contacts subscription ended");
},
});
runtime.log?.("[tlon] Subscribed to contacts updates (/v1/news)");
// Subscribe to settings store for hot-reloading config
settingsManager.onChange((newSettings) => {
currentSettings = newSettings;
// Update watched channels if settings changed
if (newSettings.groupChannels?.length) {
const newChannels = newSettings.groupChannels;
for (const ch of newChannels) {
if (!watchedChannels.has(ch)) {
watchedChannels.add(ch);
runtime.log?.(`[tlon] Settings: now watching channel ${ch}`);
}
}
// Note: we don't remove channels from watchedChannels to avoid missing messages
// during transitions. The authorization check handles access control.
}
// Update DM allowlist
if (newSettings.dmAllowlist !== undefined) {
effectiveDmAllowlist =
newSettings.dmAllowlist.length > 0 ? newSettings.dmAllowlist : account.dmAllowlist;
runtime.log?.(`[tlon] Settings: dmAllowlist updated to ${effectiveDmAllowlist.join(", ")}`);
}
// Update model signature setting
if (newSettings.showModelSig !== undefined) {
effectiveShowModelSig = newSettings.showModelSig;
runtime.log?.(`[tlon] Settings: showModelSig = ${effectiveShowModelSig}`);
}
// Update auto-accept DM invites setting
if (newSettings.autoAcceptDmInvites !== undefined) {
effectiveAutoAcceptDmInvites = newSettings.autoAcceptDmInvites;
runtime.log?.(`[tlon] Settings: autoAcceptDmInvites = ${effectiveAutoAcceptDmInvites}`);
}
// Update auto-accept group invites setting
if (newSettings.autoAcceptGroupInvites !== undefined) {
effectiveAutoAcceptGroupInvites = newSettings.autoAcceptGroupInvites;
runtime.log?.(
`[tlon] Settings: autoAcceptGroupInvites = ${effectiveAutoAcceptGroupInvites}`,
);
}
// Update group invite allowlist
if (newSettings.groupInviteAllowlist !== undefined) {
effectiveGroupInviteAllowlist =
newSettings.groupInviteAllowlist.length > 0
? newSettings.groupInviteAllowlist
: account.groupInviteAllowlist;
runtime.log?.(
`[tlon] Settings: groupInviteAllowlist updated to ${effectiveGroupInviteAllowlist.join(", ")}`,
);
}
if (newSettings.defaultAuthorizedShips !== undefined) {
runtime.log?.(
`[tlon] Settings: defaultAuthorizedShips updated to ${(newSettings.defaultAuthorizedShips || []).join(", ")}`,
);
}
// Update auto-discover channels
if (newSettings.autoDiscoverChannels !== undefined) {
effectiveAutoDiscoverChannels = newSettings.autoDiscoverChannels;
runtime.log?.(`[tlon] Settings: autoDiscoverChannels = ${effectiveAutoDiscoverChannels}`);
}
// Update owner ship
if (newSettings.ownerShip !== undefined) {
effectiveOwnerShip = newSettings.ownerShip
? normalizeShip(newSettings.ownerShip)
: account.ownerShip
? normalizeShip(account.ownerShip)
: null;
runtime.log?.(`[tlon] Settings: ownerShip = ${effectiveOwnerShip}`);
}
// Update pending approvals
if (newSettings.pendingApprovals !== undefined) {
pendingApprovals = newSettings.pendingApprovals;
runtime.log?.(
`[tlon] Settings: pendingApprovals updated (${pendingApprovals.length} items)`,
);
}
});
try {
await settingsManager.startSubscription();
} catch (err) {
// Settings subscription is optional - don't fail if it doesn't work
runtime.log?.(`[tlon] Settings subscription not available: ${String(err)}`);
}
// Subscribe to groups-ui for real-time channel additions (when invites are accepted)
try {
await api.subscribe({
app: "groups",
path: "/groups/ui",
event: async (event: any) => {
try {
// Handle group/channel join events
// Event structure: { group: { flag: "~host/group-name", ... }, channels: { ... } }
if (event && typeof event === "object") {
// Check for new channels being added to groups
if (event.channels && typeof event.channels === "object") {
const channels = event.channels as Record<string, any>;
for (const [channelNest, _channelData] of Object.entries(channels)) {
// Only monitor chat channels
if (!channelNest.startsWith("chat/")) {
continue;
}
// If this is a new channel we're not watching yet, add it
if (!watchedChannels.has(channelNest)) {
watchedChannels.add(channelNest);
runtime.log?.(
`[tlon] Auto-detected new channel (invite accepted): ${channelNest}`,
);
// Persist to settings store so it survives restarts
if (effectiveAutoAcceptGroupInvites) {
try {
const currentChannels = currentSettings.groupChannels || [];
if (!currentChannels.includes(channelNest)) {
const updatedChannels = [...currentChannels, channelNest];
// Poke settings store to persist
await api.poke({
app: "settings",
mark: "settings-event",
json: {
"put-entry": {
"bucket-key": "tlon",
"entry-key": "groupChannels",
value: updatedChannels,
desk: "moltbot",
},
},
});
runtime.log?.(`[tlon] Persisted ${channelNest} to settings store`);
}
} catch (err) {
runtime.error?.(
`[tlon] Failed to persist channel to settings: ${String(err)}`,
);
}
}
}
}
}
// Also check for the "join" event structure
if (event.join && typeof event.join === "object") {
const join = event.join as { group?: string; channels?: string[] };
if (join.channels) {
for (const channelNest of join.channels) {
if (!channelNest.startsWith("chat/")) {
continue;
}
if (!watchedChannels.has(channelNest)) {
watchedChannels.add(channelNest);
runtime.log?.(`[tlon] Auto-detected joined channel: ${channelNest}`);
// Persist to settings store
if (effectiveAutoAcceptGroupInvites) {
try {
const currentChannels = currentSettings.groupChannels || [];
if (!currentChannels.includes(channelNest)) {
const updatedChannels = [...currentChannels, channelNest];
await api.poke({
app: "settings",
mark: "settings-event",
json: {
"put-entry": {
"bucket-key": "tlon",
"entry-key": "groupChannels",
value: updatedChannels,
desk: "moltbot",
},
},
});
runtime.log?.(`[tlon] Persisted ${channelNest} to settings store`);
}
} catch (err) {
runtime.error?.(
`[tlon] Failed to persist channel to settings: ${String(err)}`,
);
}
}
}
}
}
}
}
} catch (error: any) {
runtime.error?.(
`[tlon] Error handling groups-ui event: ${error?.message ?? String(error)}`,
);
}
},
err: (error) => {
runtime.error?.(`[tlon] Groups-ui subscription error: ${String(error)}`);
},
quit: () => {
runtime.log?.("[tlon] Groups-ui subscription ended");
},
});
runtime.log?.("[tlon] Subscribed to groups-ui for real-time channel detection");
} catch (err) {
// Groups-ui subscription is optional - channel discovery will still work via polling
runtime.log?.(`[tlon] Groups-ui subscription failed (will rely on polling): ${String(err)}`);
}
// Subscribe to foreigns for auto-accepting group invites
// Always subscribe so we can hot-reload the setting via settings store
{
const processedGroupInvites = new Set<string>();
// Helper to process pending invites
const processPendingInvites = async (foreigns: Foreigns) => {
if (!foreigns || typeof foreigns !== "object") {
return;
}
for (const [groupFlag, foreign] of Object.entries(foreigns)) {
if (processedGroupInvites.has(groupFlag)) {
continue;
}
if (!foreign.invites || foreign.invites.length === 0) {
continue;
}
const validInvite = foreign.invites.find((inv) => inv.valid);
if (!validInvite) {
continue;
}
const inviterShip = validInvite.from;
const normalizedInviter = normalizeShip(inviterShip);
// Owner invites are always accepted
if (isOwner(inviterShip)) {
try {
await api.poke({
app: "groups",
mark: "group-join",
json: {
flag: groupFlag,
"join-all": true,
},
});
processedGroupInvites.add(groupFlag);
runtime.log?.(`[tlon] Auto-accepted group invite from owner: ${groupFlag}`);
} catch (err) {
runtime.error?.(`[tlon] Failed to accept group invite from owner: ${String(err)}`);
}
continue;
}
// Skip if auto-accept is disabled
if (!effectiveAutoAcceptGroupInvites) {
// If owner is configured, queue approval
if (effectiveOwnerShip) {
const approval = createPendingApproval({
type: "group",
requestingShip: inviterShip,
groupFlag,
});
await queueApprovalRequest(approval);
processedGroupInvites.add(groupFlag);
}
continue;
}
// Check if inviter is on allowlist
const isAllowed =
effectiveGroupInviteAllowlist.length > 0
? effectiveGroupInviteAllowlist
.map((s) => normalizeShip(s))
.some((s) => s === normalizedInviter)
: false; // Fail-safe: empty allowlist means deny
if (!isAllowed) {
// If owner is configured, queue approval
if (effectiveOwnerShip) {
const approval = createPendingApproval({
type: "group",
requestingShip: inviterShip,
groupFlag,
});
await queueApprovalRequest(approval);
processedGroupInvites.add(groupFlag);
} else {
runtime.log?.(
`[tlon] Rejected group invite from ${inviterShip} (not in groupInviteAllowlist): ${groupFlag}`,
);
processedGroupInvites.add(groupFlag);
}
continue;
}
// Inviter is on allowlist - accept the invite
try {
await api.poke({
app: "groups",
mark: "group-join",
json: {
flag: groupFlag,
"join-all": true,
},
});
processedGroupInvites.add(groupFlag);
runtime.log?.(
`[tlon] Auto-accepted group invite: ${groupFlag} (from ${validInvite.from})`,
);
} catch (err) {
runtime.error?.(`[tlon] Failed to auto-accept group ${groupFlag}: ${String(err)}`);
}
}
};
// Process existing pending invites from init data
if (initForeigns) {
await processPendingInvites(initForeigns);
}
try {
await api.subscribe({
app: "groups",
path: "/v1/foreigns",
event: (data: unknown) => {
void (async () => {
try {
await processPendingInvites(data as Foreigns);
} catch (error: any) {
runtime.error?.(
`[tlon] Error handling foreigns event: ${error?.message ?? String(error)}`,
);
}
})();
},
err: (error) => {
runtime.error?.(`[tlon] Foreigns subscription error: ${String(error)}`);
},
quit: () => {
runtime.log?.("[tlon] Foreigns subscription ended");
},
});
runtime.log?.(
"[tlon] Subscribed to foreigns (/v1/foreigns) for auto-accepting group invites",
);
} catch (err) {
runtime.log?.(`[tlon] Foreigns subscription failed: ${String(err)}`);
}
}
// Discover channels to watch
if (effectiveAutoDiscoverChannels) {
const discoveredChannels = await fetchAllChannels(api, runtime);
for (const channelNest of discoveredChannels) {
watchedChannels.add(channelNest);
}
runtime.log?.(`[tlon] Watching ${watchedChannels.size} channel(s)`);
}
// Log watched channels
for (const channelNest of watchedChannels) {
runtime.log?.(`[tlon] Watching channel: ${channelNest}`);
}
runtime.log?.("[tlon] All subscriptions registered, connecting to SSE stream...");
await api.connect();
runtime.log?.("[tlon] Connected! Firehose subscriptions active");
// Periodically refresh channel discovery
const pollInterval = setInterval(
async () => {
if (!opts.abortSignal?.aborted) {
try {
if (effectiveAutoDiscoverChannels) {
const discoveredChannels = await fetchAllChannels(api, runtime);
for (const channelNest of discoveredChannels) {
if (!watchedChannels.has(channelNest)) {
watchedChannels.add(channelNest);
runtime.log?.(`[tlon] Now watching new channel: ${channelNest}`);
}
}
}
} catch (error: any) {
runtime.error?.(`[tlon] Channel refresh error: ${error?.message ?? String(error)}`);
}
}
},
2 * 60 * 1000,
);
if (opts.abortSignal) {
const signal = opts.abortSignal;
await new Promise((resolve) => {
signal.addEventListener(
"abort",
() => {
clearInterval(pollInterval);
resolve(null);
},
{ once: true },
);
});
} else {
await new Promise(() => {});
}
} finally {
try {
await api?.close();
} catch (error: any) {
runtime.error?.(`[tlon] Cleanup error: ${error?.message ?? String(error)}`);
}
}
}