openclaw/extensions/feishu/src/streaming-card.ts
songlei 8a2273e210
feat(feishu): support optional header in streaming cards (openclaw#22826)
Add an optional `header` parameter to `FeishuStreamingSession.start()`
so that streaming cards can display a colored title bar, matching the
appearance of non-streaming interactive cards.

The Card Kit API already supports `header` alongside `streaming_mode`,
but the current implementation omits it, producing headerless cards.

This change is fully backward-compatible: when `header` is not provided,
behavior is identical to before.

Closes #13267 (partial)

Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-27 23:21:22 -06:00

307 lines
9.7 KiB
TypeScript

/**
* Feishu Streaming Card - Card Kit streaming API for real-time text output
*/
import type { Client } from "@larksuiteoapi/node-sdk";
import { fetchWithSsrFGuard } from "openclaw/plugin-sdk";
import type { FeishuDomain } from "./types.js";
type Credentials = { appId: string; appSecret: string; domain?: FeishuDomain };
type CardState = { cardId: string; messageId: string; sequence: number; currentText: string };
/** Optional header for streaming cards (title bar with color template) */
export type StreamingCardHeader = {
title: string;
/** Color template: blue, green, red, orange, purple, indigo, wathet, turquoise, yellow, grey, carmine, violet, lime */
template?: string;
};
// Token cache (keyed by domain + appId)
const tokenCache = new Map<string, { token: string; expiresAt: number }>();
function resolveApiBase(domain?: FeishuDomain): string {
if (domain === "lark") {
return "https://open.larksuite.com/open-apis";
}
if (domain && domain !== "feishu" && domain.startsWith("http")) {
return `${domain.replace(/\/+$/, "")}/open-apis`;
}
return "https://open.feishu.cn/open-apis";
}
function resolveAllowedHostnames(domain?: FeishuDomain): string[] {
if (domain === "lark") {
return ["open.larksuite.com"];
}
if (domain && domain !== "feishu" && domain.startsWith("http")) {
try {
return [new URL(domain).hostname];
} catch {
return [];
}
}
return ["open.feishu.cn"];
}
async function getToken(creds: Credentials): Promise<string> {
const key = `${creds.domain ?? "feishu"}|${creds.appId}`;
const cached = tokenCache.get(key);
if (cached && cached.expiresAt > Date.now() + 60000) {
return cached.token;
}
const { response, release } = await fetchWithSsrFGuard({
url: `${resolveApiBase(creds.domain)}/auth/v3/tenant_access_token/internal`,
init: {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({ app_id: creds.appId, app_secret: creds.appSecret }),
},
policy: { allowedHostnames: resolveAllowedHostnames(creds.domain) },
auditContext: "feishu.streaming-card.token",
});
const data = (await response.json()) as {
code: number;
msg: string;
tenant_access_token?: string;
expire?: number;
};
await release();
if (data.code !== 0 || !data.tenant_access_token) {
throw new Error(`Token error: ${data.msg}`);
}
tokenCache.set(key, {
token: data.tenant_access_token,
expiresAt: Date.now() + (data.expire ?? 7200) * 1000,
});
return data.tenant_access_token;
}
function truncateSummary(text: string, max = 50): string {
if (!text) {
return "";
}
const clean = text.replace(/\n/g, " ").trim();
return clean.length <= max ? clean : clean.slice(0, max - 3) + "...";
}
/** Streaming card session manager */
export class FeishuStreamingSession {
private client: Client;
private creds: Credentials;
private state: CardState | null = null;
private queue: Promise<void> = Promise.resolve();
private closed = false;
private log?: (msg: string) => void;
private lastUpdateTime = 0;
private pendingText: string | null = null;
private updateThrottleMs = 100; // Throttle updates to max 10/sec
constructor(client: Client, creds: Credentials, log?: (msg: string) => void) {
this.client = client;
this.creds = creds;
this.log = log;
}
async start(
receiveId: string,
receiveIdType: "open_id" | "user_id" | "union_id" | "email" | "chat_id" = "chat_id",
options?: {
replyToMessageId?: string;
replyInThread?: boolean;
rootId?: string;
header?: StreamingCardHeader;
},
): Promise<void> {
if (this.state) {
return;
}
const apiBase = resolveApiBase(this.creds.domain);
const cardJson: Record<string, unknown> = {
schema: "2.0",
config: {
streaming_mode: true,
summary: { content: "[Generating...]" },
streaming_config: { print_frequency_ms: { default: 50 }, print_step: { default: 2 } },
},
body: {
elements: [{ tag: "markdown", content: "⏳ Thinking...", element_id: "content" }],
},
};
if (options?.header) {
cardJson.header = {
title: { tag: "plain_text", content: options.header.title },
template: options.header.template ?? "blue",
};
}
// Create card entity
const { response: createRes, release: releaseCreate } = await fetchWithSsrFGuard({
url: `${apiBase}/cardkit/v1/cards`,
init: {
method: "POST",
headers: {
Authorization: `Bearer ${await getToken(this.creds)}`,
"Content-Type": "application/json",
},
body: JSON.stringify({ type: "card_json", data: JSON.stringify(cardJson) }),
},
policy: { allowedHostnames: resolveAllowedHostnames(this.creds.domain) },
auditContext: "feishu.streaming-card.create",
});
const createData = (await createRes.json()) as {
code: number;
msg: string;
data?: { card_id: string };
};
await releaseCreate();
if (createData.code !== 0 || !createData.data?.card_id) {
throw new Error(`Create card failed: ${createData.msg}`);
}
const cardId = createData.data.card_id;
const cardContent = JSON.stringify({ type: "card", data: { card_id: cardId } });
// Topic-group replies require root_id routing. Prefer create+root_id when available.
let sendRes;
if (options?.rootId) {
const createData = {
receive_id: receiveId,
msg_type: "interactive",
content: cardContent,
root_id: options.rootId,
};
sendRes = await this.client.im.message.create({
params: { receive_id_type: receiveIdType },
data: createData,
});
} else if (options?.replyToMessageId) {
sendRes = await this.client.im.message.reply({
path: { message_id: options.replyToMessageId },
data: {
msg_type: "interactive",
content: cardContent,
...(options.replyInThread ? { reply_in_thread: true } : {}),
},
});
} else {
sendRes = await this.client.im.message.create({
params: { receive_id_type: receiveIdType },
data: {
receive_id: receiveId,
msg_type: "interactive",
content: cardContent,
},
});
}
if (sendRes.code !== 0 || !sendRes.data?.message_id) {
throw new Error(`Send card failed: ${sendRes.msg}`);
}
this.state = { cardId, messageId: sendRes.data.message_id, sequence: 1, currentText: "" };
this.log?.(`Started streaming: cardId=${cardId}, messageId=${sendRes.data.message_id}`);
}
private async updateCardContent(text: string, onError?: (error: unknown) => void): Promise<void> {
if (!this.state) {
return;
}
const apiBase = resolveApiBase(this.creds.domain);
this.state.sequence += 1;
await fetchWithSsrFGuard({
url: `${apiBase}/cardkit/v1/cards/${this.state.cardId}/elements/content/content`,
init: {
method: "PUT",
headers: {
Authorization: `Bearer ${await getToken(this.creds)}`,
"Content-Type": "application/json",
},
body: JSON.stringify({
content: text,
sequence: this.state.sequence,
uuid: `s_${this.state.cardId}_${this.state.sequence}`,
}),
},
policy: { allowedHostnames: resolveAllowedHostnames(this.creds.domain) },
auditContext: "feishu.streaming-card.update",
})
.then(async ({ release }) => {
await release();
})
.catch((error) => onError?.(error));
}
async update(text: string): Promise<void> {
if (!this.state || this.closed) {
return;
}
// Throttle: skip if updated recently, but remember pending text
const now = Date.now();
if (now - this.lastUpdateTime < this.updateThrottleMs) {
this.pendingText = text;
return;
}
this.pendingText = null;
this.lastUpdateTime = now;
this.queue = this.queue.then(async () => {
if (!this.state || this.closed) {
return;
}
this.state.currentText = text;
await this.updateCardContent(text, (e) => this.log?.(`Update failed: ${String(e)}`));
});
await this.queue;
}
async close(finalText?: string): Promise<void> {
if (!this.state || this.closed) {
return;
}
this.closed = true;
await this.queue;
// Use finalText, or pending throttled text, or current text
const text = finalText ?? this.pendingText ?? this.state.currentText;
const apiBase = resolveApiBase(this.creds.domain);
// Only send final update if content differs from what's already displayed
if (text && text !== this.state.currentText) {
await this.updateCardContent(text);
this.state.currentText = text;
}
// Close streaming mode
this.state.sequence += 1;
await fetchWithSsrFGuard({
url: `${apiBase}/cardkit/v1/cards/${this.state.cardId}/settings`,
init: {
method: "PATCH",
headers: {
Authorization: `Bearer ${await getToken(this.creds)}`,
"Content-Type": "application/json; charset=utf-8",
},
body: JSON.stringify({
settings: JSON.stringify({
config: { streaming_mode: false, summary: { content: truncateSummary(text) } },
}),
sequence: this.state.sequence,
uuid: `c_${this.state.cardId}_${this.state.sequence}`,
}),
},
policy: { allowedHostnames: resolveAllowedHostnames(this.creds.domain) },
auditContext: "feishu.streaming-card.close",
})
.then(async ({ release }) => {
await release();
})
.catch((e) => this.log?.(`Close failed: ${String(e)}`));
this.log?.(`Closed streaming: cardId=${this.state.cardId}`);
}
isActive(): boolean {
return this.state !== null && !this.closed;
}
}