refactor: modularize slack/config/cron/daemon internals
This commit is contained in:
parent
5d3032b293
commit
f9cbcfca0d
32
src/config/redact-snapshot.raw.ts
Normal file
32
src/config/redact-snapshot.raw.ts
Normal file
@ -0,0 +1,32 @@
|
|||||||
|
import { isDeepStrictEqual } from "node:util";
|
||||||
|
import JSON5 from "json5";
|
||||||
|
|
||||||
|
export function replaceSensitiveValuesInRaw(params: {
|
||||||
|
raw: string;
|
||||||
|
sensitiveValues: string[];
|
||||||
|
redactedSentinel: string;
|
||||||
|
}): string {
|
||||||
|
const values = [...params.sensitiveValues].toSorted((a, b) => b.length - a.length);
|
||||||
|
let result = params.raw;
|
||||||
|
for (const value of values) {
|
||||||
|
result = result.replaceAll(value, params.redactedSentinel);
|
||||||
|
}
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
export function shouldFallbackToStructuredRawRedaction(params: {
|
||||||
|
redactedRaw: string;
|
||||||
|
originalConfig: unknown;
|
||||||
|
restoreParsed: (parsed: unknown) => { ok: boolean; result?: unknown };
|
||||||
|
}): boolean {
|
||||||
|
try {
|
||||||
|
const parsed = JSON5.parse(params.redactedRaw);
|
||||||
|
const restored = params.restoreParsed(parsed);
|
||||||
|
if (!restored.ok) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
return !isDeepStrictEqual(restored.result, params.originalConfig);
|
||||||
|
} catch {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
20
src/config/redact-snapshot.secret-ref.ts
Normal file
20
src/config/redact-snapshot.secret-ref.ts
Normal file
@ -0,0 +1,20 @@
|
|||||||
|
export function isSecretRefShape(
|
||||||
|
value: Record<string, unknown>,
|
||||||
|
): value is Record<string, unknown> & { source: string; id: string } {
|
||||||
|
return typeof value.source === "string" && typeof value.id === "string";
|
||||||
|
}
|
||||||
|
|
||||||
|
export function redactSecretRefId(params: {
|
||||||
|
value: Record<string, unknown> & { source: string; id: string };
|
||||||
|
values: string[];
|
||||||
|
redactedSentinel: string;
|
||||||
|
isEnvVarPlaceholder: (value: string) => boolean;
|
||||||
|
}): Record<string, unknown> {
|
||||||
|
const { value, values, redactedSentinel, isEnvVarPlaceholder } = params;
|
||||||
|
const redacted: Record<string, unknown> = { ...value };
|
||||||
|
if (!isEnvVarPlaceholder(value.id)) {
|
||||||
|
values.push(value.id);
|
||||||
|
redacted.id = redactedSentinel;
|
||||||
|
}
|
||||||
|
return redacted;
|
||||||
|
}
|
||||||
@ -1,6 +1,10 @@
|
|||||||
import { isDeepStrictEqual } from "node:util";
|
|
||||||
import JSON5 from "json5";
|
import JSON5 from "json5";
|
||||||
import { createSubsystemLogger } from "../logging/subsystem.js";
|
import { createSubsystemLogger } from "../logging/subsystem.js";
|
||||||
|
import {
|
||||||
|
replaceSensitiveValuesInRaw,
|
||||||
|
shouldFallbackToStructuredRawRedaction,
|
||||||
|
} from "./redact-snapshot.raw.js";
|
||||||
|
import { isSecretRefShape, redactSecretRefId } from "./redact-snapshot.secret-ref.js";
|
||||||
import { isSensitiveConfigPath, type ConfigUiHints } from "./schema.hints.js";
|
import { isSensitiveConfigPath, type ConfigUiHints } from "./schema.hints.js";
|
||||||
import type { ConfigFileSnapshot } from "./types.openclaw.js";
|
import type { ConfigFileSnapshot } from "./types.openclaw.js";
|
||||||
|
|
||||||
@ -24,24 +28,6 @@ function isWholeObjectSensitivePath(path: string): boolean {
|
|||||||
return lowered.endsWith("serviceaccount") || lowered.endsWith("serviceaccountref");
|
return lowered.endsWith("serviceaccount") || lowered.endsWith("serviceaccountref");
|
||||||
}
|
}
|
||||||
|
|
||||||
function isSecretRefShape(
|
|
||||||
value: Record<string, unknown>,
|
|
||||||
): value is Record<string, unknown> & { source: string; id: string } {
|
|
||||||
return typeof value.source === "string" && typeof value.id === "string";
|
|
||||||
}
|
|
||||||
|
|
||||||
function redactSecretRef(
|
|
||||||
value: Record<string, unknown> & { source: string; id: string },
|
|
||||||
values: string[],
|
|
||||||
): Record<string, unknown> {
|
|
||||||
const redacted: Record<string, unknown> = { ...value };
|
|
||||||
if (!isEnvVarPlaceholder(value.id)) {
|
|
||||||
values.push(value.id);
|
|
||||||
redacted.id = REDACTED_SENTINEL;
|
|
||||||
}
|
|
||||||
return redacted;
|
|
||||||
}
|
|
||||||
|
|
||||||
function collectSensitiveStrings(value: unknown, values: string[]): void {
|
function collectSensitiveStrings(value: unknown, values: string[]): void {
|
||||||
if (typeof value === "string") {
|
if (typeof value === "string") {
|
||||||
if (!isEnvVarPlaceholder(value)) {
|
if (!isEnvVarPlaceholder(value)) {
|
||||||
@ -206,7 +192,12 @@ function redactObjectWithLookup(
|
|||||||
if (hints[candidate]?.sensitive === true && !Array.isArray(value)) {
|
if (hints[candidate]?.sensitive === true && !Array.isArray(value)) {
|
||||||
const objectValue = value as Record<string, unknown>;
|
const objectValue = value as Record<string, unknown>;
|
||||||
if (isSecretRefShape(objectValue)) {
|
if (isSecretRefShape(objectValue)) {
|
||||||
result[key] = redactSecretRef(objectValue, values);
|
result[key] = redactSecretRefId({
|
||||||
|
value: objectValue,
|
||||||
|
values,
|
||||||
|
redactedSentinel: REDACTED_SENTINEL,
|
||||||
|
isEnvVarPlaceholder,
|
||||||
|
});
|
||||||
} else {
|
} else {
|
||||||
collectSensitiveStrings(objectValue, values);
|
collectSensitiveStrings(objectValue, values);
|
||||||
result[key] = REDACTED_SENTINEL;
|
result[key] = REDACTED_SENTINEL;
|
||||||
@ -320,12 +311,11 @@ function redactObjectGuessing(
|
|||||||
*/
|
*/
|
||||||
function redactRawText(raw: string, config: unknown, hints?: ConfigUiHints): string {
|
function redactRawText(raw: string, config: unknown, hints?: ConfigUiHints): string {
|
||||||
const sensitiveValues = collectSensitiveValues(config, hints);
|
const sensitiveValues = collectSensitiveValues(config, hints);
|
||||||
sensitiveValues.sort((a, b) => b.length - a.length);
|
return replaceSensitiveValuesInRaw({
|
||||||
let result = raw;
|
raw,
|
||||||
for (const value of sensitiveValues) {
|
sensitiveValues,
|
||||||
result = result.replaceAll(value, REDACTED_SENTINEL);
|
redactedSentinel: REDACTED_SENTINEL,
|
||||||
}
|
});
|
||||||
return result;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
let suppressRestoreWarnings = false;
|
let suppressRestoreWarnings = false;
|
||||||
@ -340,25 +330,6 @@ function withRestoreWarningsSuppressed<T>(fn: () => T): T {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
function shouldFallbackToStructuredRawRedaction(params: {
|
|
||||||
redactedRaw: string;
|
|
||||||
originalConfig: unknown;
|
|
||||||
hints?: ConfigUiHints;
|
|
||||||
}): boolean {
|
|
||||||
try {
|
|
||||||
const parsed = JSON5.parse(params.redactedRaw);
|
|
||||||
const restored = withRestoreWarningsSuppressed(() =>
|
|
||||||
restoreRedactedValues(parsed, params.originalConfig, params.hints),
|
|
||||||
);
|
|
||||||
if (!restored.ok) {
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
return !isDeepStrictEqual(restored.result, params.originalConfig);
|
|
||||||
} catch {
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns a copy of the config snapshot with all sensitive fields
|
* Returns a copy of the config snapshot with all sensitive fields
|
||||||
* replaced by {@link REDACTED_SENTINEL}. The `hash` is preserved
|
* replaced by {@link REDACTED_SENTINEL}. The `hash` is preserved
|
||||||
@ -410,7 +381,10 @@ export function redactConfigSnapshot(
|
|||||||
shouldFallbackToStructuredRawRedaction({
|
shouldFallbackToStructuredRawRedaction({
|
||||||
redactedRaw,
|
redactedRaw,
|
||||||
originalConfig: snapshot.config,
|
originalConfig: snapshot.config,
|
||||||
hints: uiHints,
|
restoreParsed: (parsed) =>
|
||||||
|
withRestoreWarningsSuppressed(() =>
|
||||||
|
restoreRedactedValues(parsed, snapshot.config, uiHints),
|
||||||
|
),
|
||||||
})
|
})
|
||||||
) {
|
) {
|
||||||
redactedRaw = JSON5.stringify(redactedParsed ?? redactedConfig, null, 2);
|
redactedRaw = JSON5.stringify(redactedParsed ?? redactedConfig, null, 2);
|
||||||
|
|||||||
59
src/cron/heartbeat-policy.test.ts
Normal file
59
src/cron/heartbeat-policy.test.ts
Normal file
@ -0,0 +1,59 @@
|
|||||||
|
import { describe, expect, it } from "vitest";
|
||||||
|
import {
|
||||||
|
shouldEnqueueCronMainSummary,
|
||||||
|
shouldSkipHeartbeatOnlyDelivery,
|
||||||
|
} from "./heartbeat-policy.js";
|
||||||
|
|
||||||
|
describe("shouldSkipHeartbeatOnlyDelivery", () => {
|
||||||
|
it("suppresses empty payloads", () => {
|
||||||
|
expect(shouldSkipHeartbeatOnlyDelivery([], 300)).toBe(true);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("suppresses when any payload is a heartbeat ack and no media is present", () => {
|
||||||
|
expect(
|
||||||
|
shouldSkipHeartbeatOnlyDelivery(
|
||||||
|
[{ text: "Checked inbox and calendar." }, { text: "HEARTBEAT_OK" }],
|
||||||
|
300,
|
||||||
|
),
|
||||||
|
).toBe(true);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("does not suppress when media is present", () => {
|
||||||
|
expect(
|
||||||
|
shouldSkipHeartbeatOnlyDelivery(
|
||||||
|
[{ text: "HEARTBEAT_OK", mediaUrl: "https://example.com/image.png" }],
|
||||||
|
300,
|
||||||
|
),
|
||||||
|
).toBe(false);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
describe("shouldEnqueueCronMainSummary", () => {
|
||||||
|
const isSystemEvent = (text: string) => text.includes("HEARTBEAT_OK");
|
||||||
|
|
||||||
|
it("enqueues only when delivery was requested but did not run", () => {
|
||||||
|
expect(
|
||||||
|
shouldEnqueueCronMainSummary({
|
||||||
|
summaryText: "HEARTBEAT_OK",
|
||||||
|
deliveryRequested: true,
|
||||||
|
delivered: false,
|
||||||
|
deliveryAttempted: false,
|
||||||
|
suppressMainSummary: false,
|
||||||
|
isCronSystemEvent: isSystemEvent,
|
||||||
|
}),
|
||||||
|
).toBe(true);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("does not enqueue after attempted outbound delivery", () => {
|
||||||
|
expect(
|
||||||
|
shouldEnqueueCronMainSummary({
|
||||||
|
summaryText: "HEARTBEAT_OK",
|
||||||
|
deliveryRequested: true,
|
||||||
|
delivered: false,
|
||||||
|
deliveryAttempted: true,
|
||||||
|
suppressMainSummary: false,
|
||||||
|
isCronSystemEvent: isSystemEvent,
|
||||||
|
}),
|
||||||
|
).toBe(false);
|
||||||
|
});
|
||||||
|
});
|
||||||
48
src/cron/heartbeat-policy.ts
Normal file
48
src/cron/heartbeat-policy.ts
Normal file
@ -0,0 +1,48 @@
|
|||||||
|
import { stripHeartbeatToken } from "../auto-reply/heartbeat.js";
|
||||||
|
|
||||||
|
export type HeartbeatDeliveryPayload = {
|
||||||
|
text?: string;
|
||||||
|
mediaUrl?: string;
|
||||||
|
mediaUrls?: string[];
|
||||||
|
};
|
||||||
|
|
||||||
|
export function shouldSkipHeartbeatOnlyDelivery(
|
||||||
|
payloads: HeartbeatDeliveryPayload[],
|
||||||
|
ackMaxChars: number,
|
||||||
|
): boolean {
|
||||||
|
if (payloads.length === 0) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
const hasAnyMedia = payloads.some(
|
||||||
|
(payload) => (payload.mediaUrls?.length ?? 0) > 0 || Boolean(payload.mediaUrl),
|
||||||
|
);
|
||||||
|
if (hasAnyMedia) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
return payloads.some((payload) => {
|
||||||
|
const result = stripHeartbeatToken(payload.text, {
|
||||||
|
mode: "heartbeat",
|
||||||
|
maxAckChars: ackMaxChars,
|
||||||
|
});
|
||||||
|
return result.shouldSkip;
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
export function shouldEnqueueCronMainSummary(params: {
|
||||||
|
summaryText: string | undefined;
|
||||||
|
deliveryRequested: boolean;
|
||||||
|
delivered: boolean | undefined;
|
||||||
|
deliveryAttempted: boolean | undefined;
|
||||||
|
suppressMainSummary: boolean;
|
||||||
|
isCronSystemEvent: (text: string) => boolean;
|
||||||
|
}): boolean {
|
||||||
|
const summaryText = params.summaryText?.trim();
|
||||||
|
return Boolean(
|
||||||
|
summaryText &&
|
||||||
|
params.isCronSystemEvent(summaryText) &&
|
||||||
|
params.deliveryRequested &&
|
||||||
|
!params.delivered &&
|
||||||
|
params.deliveryAttempted !== true &&
|
||||||
|
!params.suppressMainSummary,
|
||||||
|
);
|
||||||
|
}
|
||||||
@ -1,8 +1,6 @@
|
|||||||
import {
|
import { DEFAULT_HEARTBEAT_ACK_MAX_CHARS } from "../../auto-reply/heartbeat.js";
|
||||||
DEFAULT_HEARTBEAT_ACK_MAX_CHARS,
|
|
||||||
stripHeartbeatToken,
|
|
||||||
} from "../../auto-reply/heartbeat.js";
|
|
||||||
import { truncateUtf16Safe } from "../../utils.js";
|
import { truncateUtf16Safe } from "../../utils.js";
|
||||||
|
import { shouldSkipHeartbeatOnlyDelivery } from "../heartbeat-policy.js";
|
||||||
|
|
||||||
type DeliveryPayload = {
|
type DeliveryPayload = {
|
||||||
text?: string;
|
text?: string;
|
||||||
@ -91,27 +89,7 @@ export function pickLastDeliverablePayload(payloads: DeliveryPayload[]) {
|
|||||||
* Returns true when any payload is a heartbeat ack token and no payload contains media.
|
* Returns true when any payload is a heartbeat ack token and no payload contains media.
|
||||||
*/
|
*/
|
||||||
export function isHeartbeatOnlyResponse(payloads: DeliveryPayload[], ackMaxChars: number) {
|
export function isHeartbeatOnlyResponse(payloads: DeliveryPayload[], ackMaxChars: number) {
|
||||||
if (payloads.length === 0) {
|
return shouldSkipHeartbeatOnlyDelivery(payloads, ackMaxChars);
|
||||||
return true;
|
|
||||||
}
|
|
||||||
// If any payload has media, deliver regardless — there's real content.
|
|
||||||
const hasAnyMedia = payloads.some(
|
|
||||||
(payload) => (payload.mediaUrls?.length ?? 0) > 0 || Boolean(payload.mediaUrl),
|
|
||||||
);
|
|
||||||
if (hasAnyMedia) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
// An agent may emit multiple text payloads (narration, tool summaries)
|
|
||||||
// before a final HEARTBEAT_OK. If *any* payload is a heartbeat ack token,
|
|
||||||
// the agent is signaling "nothing needs attention" — the preceding text
|
|
||||||
// payloads are just internal narration and should not be delivered.
|
|
||||||
return payloads.some((payload) => {
|
|
||||||
const result = stripHeartbeatToken(payload.text, {
|
|
||||||
mode: "heartbeat",
|
|
||||||
maxAckChars: ackMaxChars,
|
|
||||||
});
|
|
||||||
return result.shouldSkip;
|
|
||||||
});
|
|
||||||
}
|
}
|
||||||
|
|
||||||
export function resolveHeartbeatAckMaxChars(agentCfg?: { heartbeat?: { ackMaxChars?: number } }) {
|
export function resolveHeartbeatAckMaxChars(agentCfg?: { heartbeat?: { ackMaxChars?: number } }) {
|
||||||
|
|||||||
@ -3,6 +3,7 @@ import { isCronSystemEvent } from "../../infra/heartbeat-events-filter.js";
|
|||||||
import type { HeartbeatRunResult } from "../../infra/heartbeat-wake.js";
|
import type { HeartbeatRunResult } from "../../infra/heartbeat-wake.js";
|
||||||
import { DEFAULT_AGENT_ID } from "../../routing/session-key.js";
|
import { DEFAULT_AGENT_ID } from "../../routing/session-key.js";
|
||||||
import { resolveCronDeliveryPlan } from "../delivery.js";
|
import { resolveCronDeliveryPlan } from "../delivery.js";
|
||||||
|
import { shouldEnqueueCronMainSummary } from "../heartbeat-policy.js";
|
||||||
import { sweepCronRunSessions } from "../session-reaper.js";
|
import { sweepCronRunSessions } from "../session-reaper.js";
|
||||||
import type {
|
import type {
|
||||||
CronDeliveryStatus,
|
CronDeliveryStatus,
|
||||||
@ -995,12 +996,14 @@ export async function executeJobCore(
|
|||||||
const suppressMainSummary =
|
const suppressMainSummary =
|
||||||
res.status === "error" && res.errorKind === "delivery-target" && deliveryPlan.requested;
|
res.status === "error" && res.errorKind === "delivery-target" && deliveryPlan.requested;
|
||||||
if (
|
if (
|
||||||
summaryText &&
|
shouldEnqueueCronMainSummary({
|
||||||
isCronSystemEvent(summaryText) &&
|
summaryText,
|
||||||
deliveryPlan.requested &&
|
deliveryRequested: deliveryPlan.requested,
|
||||||
!res.delivered &&
|
delivered: res.delivered,
|
||||||
res.deliveryAttempted !== true &&
|
deliveryAttempted: res.deliveryAttempted,
|
||||||
!suppressMainSummary
|
suppressMainSummary,
|
||||||
|
isCronSystemEvent,
|
||||||
|
})
|
||||||
) {
|
) {
|
||||||
const prefix = "Cron";
|
const prefix = "Cron";
|
||||||
const label =
|
const label =
|
||||||
|
|||||||
@ -329,58 +329,6 @@ describe("buildServiceEnvironment", () => {
|
|||||||
expect(env.http_proxy).toBe("http://proxy.local:7890");
|
expect(env.http_proxy).toBe("http://proxy.local:7890");
|
||||||
expect(env.all_proxy).toBe("socks5://proxy.local:1080");
|
expect(env.all_proxy).toBe("socks5://proxy.local:1080");
|
||||||
});
|
});
|
||||||
it("defaults NODE_EXTRA_CA_CERTS to system cert bundle on macOS", () => {
|
|
||||||
const env = buildServiceEnvironment({
|
|
||||||
env: { HOME: "/home/user" },
|
|
||||||
port: 18789,
|
|
||||||
platform: "darwin",
|
|
||||||
});
|
|
||||||
expect(env.NODE_EXTRA_CA_CERTS).toBe("/etc/ssl/cert.pem");
|
|
||||||
});
|
|
||||||
|
|
||||||
it("does not default NODE_EXTRA_CA_CERTS on non-macOS", () => {
|
|
||||||
const env = buildServiceEnvironment({
|
|
||||||
env: { HOME: "/home/user" },
|
|
||||||
port: 18789,
|
|
||||||
platform: "linux",
|
|
||||||
});
|
|
||||||
expect(env.NODE_EXTRA_CA_CERTS).toBeUndefined();
|
|
||||||
});
|
|
||||||
|
|
||||||
it("respects user-provided NODE_EXTRA_CA_CERTS over the default", () => {
|
|
||||||
const env = buildServiceEnvironment({
|
|
||||||
env: { HOME: "/home/user", NODE_EXTRA_CA_CERTS: "/custom/certs/ca.pem" },
|
|
||||||
port: 18789,
|
|
||||||
});
|
|
||||||
expect(env.NODE_EXTRA_CA_CERTS).toBe("/custom/certs/ca.pem");
|
|
||||||
});
|
|
||||||
|
|
||||||
it("defaults NODE_USE_SYSTEM_CA=1 on macOS", () => {
|
|
||||||
const env = buildServiceEnvironment({
|
|
||||||
env: { HOME: "/home/user" },
|
|
||||||
port: 18789,
|
|
||||||
platform: "darwin",
|
|
||||||
});
|
|
||||||
expect(env.NODE_USE_SYSTEM_CA).toBe("1");
|
|
||||||
});
|
|
||||||
|
|
||||||
it("does not default NODE_USE_SYSTEM_CA on non-macOS", () => {
|
|
||||||
const env = buildServiceEnvironment({
|
|
||||||
env: { HOME: "/home/user" },
|
|
||||||
port: 18789,
|
|
||||||
platform: "linux",
|
|
||||||
});
|
|
||||||
expect(env.NODE_USE_SYSTEM_CA).toBeUndefined();
|
|
||||||
});
|
|
||||||
|
|
||||||
it("respects user-provided NODE_USE_SYSTEM_CA over the default", () => {
|
|
||||||
const env = buildServiceEnvironment({
|
|
||||||
env: { HOME: "/home/user", NODE_USE_SYSTEM_CA: "0" },
|
|
||||||
port: 18789,
|
|
||||||
platform: "darwin",
|
|
||||||
});
|
|
||||||
expect(env.NODE_USE_SYSTEM_CA).toBe("0");
|
|
||||||
});
|
|
||||||
});
|
});
|
||||||
|
|
||||||
describe("buildNodeServiceEnvironment", () => {
|
describe("buildNodeServiceEnvironment", () => {
|
||||||
@ -453,51 +401,49 @@ describe("buildNodeServiceEnvironment", () => {
|
|||||||
});
|
});
|
||||||
expect(env.TMPDIR).toBe(os.tmpdir());
|
expect(env.TMPDIR).toBe(os.tmpdir());
|
||||||
});
|
});
|
||||||
|
});
|
||||||
|
|
||||||
it("defaults NODE_EXTRA_CA_CERTS to system cert bundle on macOS for node services", () => {
|
describe("shared Node TLS env defaults", () => {
|
||||||
const env = buildNodeServiceEnvironment({
|
const builders = [
|
||||||
env: { HOME: "/home/user" },
|
{
|
||||||
platform: "darwin",
|
name: "gateway service env",
|
||||||
});
|
build: (env: Record<string, string | undefined>, platform?: NodeJS.Platform) =>
|
||||||
|
buildServiceEnvironment({ env, port: 18789, platform }),
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "node service env",
|
||||||
|
build: (env: Record<string, string | undefined>, platform?: NodeJS.Platform) =>
|
||||||
|
buildNodeServiceEnvironment({ env, platform }),
|
||||||
|
},
|
||||||
|
] as const;
|
||||||
|
|
||||||
|
it.each(builders)("$name defaults NODE_EXTRA_CA_CERTS on macOS", ({ build }) => {
|
||||||
|
const env = build({ HOME: "/home/user" }, "darwin");
|
||||||
expect(env.NODE_EXTRA_CA_CERTS).toBe("/etc/ssl/cert.pem");
|
expect(env.NODE_EXTRA_CA_CERTS).toBe("/etc/ssl/cert.pem");
|
||||||
});
|
});
|
||||||
|
|
||||||
it("does not default NODE_EXTRA_CA_CERTS on non-macOS for node services", () => {
|
it.each(builders)("$name does not default NODE_EXTRA_CA_CERTS on non-macOS", ({ build }) => {
|
||||||
const env = buildNodeServiceEnvironment({
|
const env = build({ HOME: "/home/user" }, "linux");
|
||||||
env: { HOME: "/home/user" },
|
|
||||||
platform: "linux",
|
|
||||||
});
|
|
||||||
expect(env.NODE_EXTRA_CA_CERTS).toBeUndefined();
|
expect(env.NODE_EXTRA_CA_CERTS).toBeUndefined();
|
||||||
});
|
});
|
||||||
|
|
||||||
it("respects user-provided NODE_EXTRA_CA_CERTS for node services", () => {
|
it.each(builders)("$name respects user-provided NODE_EXTRA_CA_CERTS", ({ build }) => {
|
||||||
const env = buildNodeServiceEnvironment({
|
const env = build({ HOME: "/home/user", NODE_EXTRA_CA_CERTS: "/custom/certs/ca.pem" });
|
||||||
env: { HOME: "/home/user", NODE_EXTRA_CA_CERTS: "/custom/certs/ca.pem" },
|
|
||||||
});
|
|
||||||
expect(env.NODE_EXTRA_CA_CERTS).toBe("/custom/certs/ca.pem");
|
expect(env.NODE_EXTRA_CA_CERTS).toBe("/custom/certs/ca.pem");
|
||||||
});
|
});
|
||||||
|
|
||||||
it("defaults NODE_USE_SYSTEM_CA=1 on macOS for node services", () => {
|
it.each(builders)("$name defaults NODE_USE_SYSTEM_CA=1 on macOS", ({ build }) => {
|
||||||
const env = buildNodeServiceEnvironment({
|
const env = build({ HOME: "/home/user" }, "darwin");
|
||||||
env: { HOME: "/home/user" },
|
|
||||||
platform: "darwin",
|
|
||||||
});
|
|
||||||
expect(env.NODE_USE_SYSTEM_CA).toBe("1");
|
expect(env.NODE_USE_SYSTEM_CA).toBe("1");
|
||||||
});
|
});
|
||||||
|
|
||||||
it("does not default NODE_USE_SYSTEM_CA on non-macOS for node services", () => {
|
it.each(builders)("$name does not default NODE_USE_SYSTEM_CA on non-macOS", ({ build }) => {
|
||||||
const env = buildNodeServiceEnvironment({
|
const env = build({ HOME: "/home/user" }, "linux");
|
||||||
env: { HOME: "/home/user" },
|
|
||||||
platform: "linux",
|
|
||||||
});
|
|
||||||
expect(env.NODE_USE_SYSTEM_CA).toBeUndefined();
|
expect(env.NODE_USE_SYSTEM_CA).toBeUndefined();
|
||||||
});
|
});
|
||||||
|
|
||||||
it("respects user-provided NODE_USE_SYSTEM_CA for node services", () => {
|
it.each(builders)("$name respects user-provided NODE_USE_SYSTEM_CA", ({ build }) => {
|
||||||
const env = buildNodeServiceEnvironment({
|
const env = build({ HOME: "/home/user", NODE_USE_SYSTEM_CA: "0" }, "darwin");
|
||||||
env: { HOME: "/home/user", NODE_USE_SYSTEM_CA: "0" },
|
|
||||||
platform: "darwin",
|
|
||||||
});
|
|
||||||
expect(env.NODE_USE_SYSTEM_CA).toBe("0");
|
expect(env.NODE_USE_SYSTEM_CA).toBe("0");
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
|||||||
@ -25,6 +25,16 @@ type BuildServicePathOptions = MinimalServicePathOptions & {
|
|||||||
env?: Record<string, string | undefined>;
|
env?: Record<string, string | undefined>;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
type SharedServiceEnvironmentFields = {
|
||||||
|
stateDir: string | undefined;
|
||||||
|
configPath: string | undefined;
|
||||||
|
tmpDir: string;
|
||||||
|
minimalPath: string;
|
||||||
|
proxyEnv: Record<string, string | undefined>;
|
||||||
|
nodeCaCerts: string | undefined;
|
||||||
|
nodeUseSystemCa: string | undefined;
|
||||||
|
};
|
||||||
|
|
||||||
const SERVICE_PROXY_ENV_KEYS = [
|
const SERVICE_PROXY_ENV_KEYS = [
|
||||||
"HTTP_PROXY",
|
"HTTP_PROXY",
|
||||||
"HTTPS_PROXY",
|
"HTTPS_PROXY",
|
||||||
@ -246,15 +256,8 @@ export function buildServiceEnvironment(params: {
|
|||||||
launchdLabel || (platform === "darwin" ? resolveGatewayLaunchAgentLabel(profile) : undefined);
|
launchdLabel || (platform === "darwin" ? resolveGatewayLaunchAgentLabel(profile) : undefined);
|
||||||
const systemdUnit = `${resolveGatewaySystemdServiceName(profile)}.service`;
|
const systemdUnit = `${resolveGatewaySystemdServiceName(profile)}.service`;
|
||||||
return {
|
return {
|
||||||
HOME: env.HOME,
|
...buildCommonServiceEnvironment(env, sharedEnv),
|
||||||
TMPDIR: sharedEnv.tmpDir,
|
|
||||||
PATH: sharedEnv.minimalPath,
|
|
||||||
...sharedEnv.proxyEnv,
|
|
||||||
NODE_EXTRA_CA_CERTS: sharedEnv.nodeCaCerts,
|
|
||||||
NODE_USE_SYSTEM_CA: sharedEnv.nodeUseSystemCa,
|
|
||||||
OPENCLAW_PROFILE: profile,
|
OPENCLAW_PROFILE: profile,
|
||||||
OPENCLAW_STATE_DIR: sharedEnv.stateDir,
|
|
||||||
OPENCLAW_CONFIG_PATH: sharedEnv.configPath,
|
|
||||||
OPENCLAW_GATEWAY_PORT: String(port),
|
OPENCLAW_GATEWAY_PORT: String(port),
|
||||||
OPENCLAW_GATEWAY_TOKEN: token,
|
OPENCLAW_GATEWAY_TOKEN: token,
|
||||||
OPENCLAW_LAUNCHD_LABEL: resolvedLaunchdLabel,
|
OPENCLAW_LAUNCHD_LABEL: resolvedLaunchdLabel,
|
||||||
@ -275,14 +278,7 @@ export function buildNodeServiceEnvironment(params: {
|
|||||||
const gatewayToken =
|
const gatewayToken =
|
||||||
env.OPENCLAW_GATEWAY_TOKEN?.trim() || env.CLAWDBOT_GATEWAY_TOKEN?.trim() || undefined;
|
env.OPENCLAW_GATEWAY_TOKEN?.trim() || env.CLAWDBOT_GATEWAY_TOKEN?.trim() || undefined;
|
||||||
return {
|
return {
|
||||||
HOME: env.HOME,
|
...buildCommonServiceEnvironment(env, sharedEnv),
|
||||||
TMPDIR: sharedEnv.tmpDir,
|
|
||||||
PATH: sharedEnv.minimalPath,
|
|
||||||
...sharedEnv.proxyEnv,
|
|
||||||
NODE_EXTRA_CA_CERTS: sharedEnv.nodeCaCerts,
|
|
||||||
NODE_USE_SYSTEM_CA: sharedEnv.nodeUseSystemCa,
|
|
||||||
OPENCLAW_STATE_DIR: sharedEnv.stateDir,
|
|
||||||
OPENCLAW_CONFIG_PATH: sharedEnv.configPath,
|
|
||||||
OPENCLAW_GATEWAY_TOKEN: gatewayToken,
|
OPENCLAW_GATEWAY_TOKEN: gatewayToken,
|
||||||
OPENCLAW_LAUNCHD_LABEL: resolveNodeLaunchAgentLabel(),
|
OPENCLAW_LAUNCHD_LABEL: resolveNodeLaunchAgentLabel(),
|
||||||
OPENCLAW_SYSTEMD_UNIT: resolveNodeSystemdServiceName(),
|
OPENCLAW_SYSTEMD_UNIT: resolveNodeSystemdServiceName(),
|
||||||
@ -295,18 +291,26 @@ export function buildNodeServiceEnvironment(params: {
|
|||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
function buildCommonServiceEnvironment(
|
||||||
|
env: Record<string, string | undefined>,
|
||||||
|
sharedEnv: SharedServiceEnvironmentFields,
|
||||||
|
): Record<string, string | undefined> {
|
||||||
|
return {
|
||||||
|
HOME: env.HOME,
|
||||||
|
TMPDIR: sharedEnv.tmpDir,
|
||||||
|
PATH: sharedEnv.minimalPath,
|
||||||
|
...sharedEnv.proxyEnv,
|
||||||
|
NODE_EXTRA_CA_CERTS: sharedEnv.nodeCaCerts,
|
||||||
|
NODE_USE_SYSTEM_CA: sharedEnv.nodeUseSystemCa,
|
||||||
|
OPENCLAW_STATE_DIR: sharedEnv.stateDir,
|
||||||
|
OPENCLAW_CONFIG_PATH: sharedEnv.configPath,
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
function resolveSharedServiceEnvironmentFields(
|
function resolveSharedServiceEnvironmentFields(
|
||||||
env: Record<string, string | undefined>,
|
env: Record<string, string | undefined>,
|
||||||
platform: NodeJS.Platform,
|
platform: NodeJS.Platform,
|
||||||
): {
|
): SharedServiceEnvironmentFields {
|
||||||
stateDir: string | undefined;
|
|
||||||
configPath: string | undefined;
|
|
||||||
tmpDir: string;
|
|
||||||
minimalPath: string;
|
|
||||||
proxyEnv: Record<string, string | undefined>;
|
|
||||||
nodeCaCerts: string | undefined;
|
|
||||||
nodeUseSystemCa: string | undefined;
|
|
||||||
} {
|
|
||||||
const stateDir = env.OPENCLAW_STATE_DIR;
|
const stateDir = env.OPENCLAW_STATE_DIR;
|
||||||
const configPath = env.OPENCLAW_CONFIG_PATH;
|
const configPath = env.OPENCLAW_CONFIG_PATH;
|
||||||
// Keep a usable temp directory for supervised services even when the host env omits TMPDIR.
|
// Keep a usable temp directory for supervised services even when the host env omits TMPDIR.
|
||||||
|
|||||||
106
src/slack/monitor/message-handler/prepare-content.ts
Normal file
106
src/slack/monitor/message-handler/prepare-content.ts
Normal file
@ -0,0 +1,106 @@
|
|||||||
|
import { logVerbose } from "../../../globals.js";
|
||||||
|
import type { SlackFile, SlackMessageEvent } from "../../types.js";
|
||||||
|
import {
|
||||||
|
MAX_SLACK_MEDIA_FILES,
|
||||||
|
resolveSlackAttachmentContent,
|
||||||
|
resolveSlackMedia,
|
||||||
|
type SlackMediaResult,
|
||||||
|
type SlackThreadStarter,
|
||||||
|
} from "../media.js";
|
||||||
|
|
||||||
|
export type SlackResolvedMessageContent = {
|
||||||
|
rawBody: string;
|
||||||
|
effectiveDirectMedia: SlackMediaResult[] | null;
|
||||||
|
};
|
||||||
|
|
||||||
|
function filterInheritedParentFiles(params: {
|
||||||
|
files: SlackFile[] | undefined;
|
||||||
|
isThreadReply: boolean;
|
||||||
|
threadStarter: SlackThreadStarter | null;
|
||||||
|
}): SlackFile[] | undefined {
|
||||||
|
const { files, isThreadReply, threadStarter } = params;
|
||||||
|
if (!isThreadReply || !files?.length) {
|
||||||
|
return files;
|
||||||
|
}
|
||||||
|
if (!threadStarter?.files?.length) {
|
||||||
|
return files;
|
||||||
|
}
|
||||||
|
const starterFileIds = new Set(threadStarter.files.map((file) => file.id));
|
||||||
|
const filtered = files.filter((file) => !file.id || !starterFileIds.has(file.id));
|
||||||
|
if (filtered.length < files.length) {
|
||||||
|
logVerbose(
|
||||||
|
`slack: filtered ${files.length - filtered.length} inherited parent file(s) from thread reply`,
|
||||||
|
);
|
||||||
|
}
|
||||||
|
return filtered.length > 0 ? filtered : undefined;
|
||||||
|
}
|
||||||
|
|
||||||
|
export async function resolveSlackMessageContent(params: {
|
||||||
|
message: SlackMessageEvent;
|
||||||
|
isThreadReply: boolean;
|
||||||
|
threadStarter: SlackThreadStarter | null;
|
||||||
|
isBotMessage: boolean;
|
||||||
|
botToken: string;
|
||||||
|
mediaMaxBytes: number;
|
||||||
|
}): Promise<SlackResolvedMessageContent | null> {
|
||||||
|
const ownFiles = filterInheritedParentFiles({
|
||||||
|
files: params.message.files,
|
||||||
|
isThreadReply: params.isThreadReply,
|
||||||
|
threadStarter: params.threadStarter,
|
||||||
|
});
|
||||||
|
|
||||||
|
const media = await resolveSlackMedia({
|
||||||
|
files: ownFiles,
|
||||||
|
token: params.botToken,
|
||||||
|
maxBytes: params.mediaMaxBytes,
|
||||||
|
});
|
||||||
|
|
||||||
|
const attachmentContent = await resolveSlackAttachmentContent({
|
||||||
|
attachments: params.message.attachments,
|
||||||
|
token: params.botToken,
|
||||||
|
maxBytes: params.mediaMaxBytes,
|
||||||
|
});
|
||||||
|
|
||||||
|
const mergedMedia = [...(media ?? []), ...(attachmentContent?.media ?? [])];
|
||||||
|
const effectiveDirectMedia = mergedMedia.length > 0 ? mergedMedia : null;
|
||||||
|
const mediaPlaceholder = effectiveDirectMedia
|
||||||
|
? effectiveDirectMedia.map((item) => item.placeholder).join(" ")
|
||||||
|
: undefined;
|
||||||
|
|
||||||
|
const fallbackFiles = ownFiles ?? [];
|
||||||
|
const fileOnlyFallback =
|
||||||
|
!mediaPlaceholder && fallbackFiles.length > 0
|
||||||
|
? fallbackFiles
|
||||||
|
.slice(0, MAX_SLACK_MEDIA_FILES)
|
||||||
|
.map((file) => file.name?.trim() || "file")
|
||||||
|
.join(", ")
|
||||||
|
: undefined;
|
||||||
|
const fileOnlyPlaceholder = fileOnlyFallback ? `[Slack file: ${fileOnlyFallback}]` : undefined;
|
||||||
|
|
||||||
|
const botAttachmentText =
|
||||||
|
params.isBotMessage && !attachmentContent?.text
|
||||||
|
? (params.message.attachments ?? [])
|
||||||
|
.map((attachment) => attachment.text?.trim() || attachment.fallback?.trim())
|
||||||
|
.filter(Boolean)
|
||||||
|
.join("\n")
|
||||||
|
: undefined;
|
||||||
|
|
||||||
|
const rawBody =
|
||||||
|
[
|
||||||
|
(params.message.text ?? "").trim(),
|
||||||
|
attachmentContent?.text,
|
||||||
|
botAttachmentText,
|
||||||
|
mediaPlaceholder,
|
||||||
|
fileOnlyPlaceholder,
|
||||||
|
]
|
||||||
|
.filter(Boolean)
|
||||||
|
.join("\n") || "";
|
||||||
|
if (!rawBody) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
return {
|
||||||
|
rawBody,
|
||||||
|
effectiveDirectMedia,
|
||||||
|
};
|
||||||
|
}
|
||||||
137
src/slack/monitor/message-handler/prepare-thread-context.ts
Normal file
137
src/slack/monitor/message-handler/prepare-thread-context.ts
Normal file
@ -0,0 +1,137 @@
|
|||||||
|
import { formatInboundEnvelope } from "../../../auto-reply/envelope.js";
|
||||||
|
import { readSessionUpdatedAt } from "../../../config/sessions.js";
|
||||||
|
import { logVerbose } from "../../../globals.js";
|
||||||
|
import type { ResolvedSlackAccount } from "../../accounts.js";
|
||||||
|
import type { SlackMessageEvent } from "../../types.js";
|
||||||
|
import type { SlackMonitorContext } from "../context.js";
|
||||||
|
import {
|
||||||
|
resolveSlackMedia,
|
||||||
|
resolveSlackThreadHistory,
|
||||||
|
type SlackMediaResult,
|
||||||
|
type SlackThreadStarter,
|
||||||
|
} from "../media.js";
|
||||||
|
|
||||||
|
export type SlackThreadContextData = {
|
||||||
|
threadStarterBody: string | undefined;
|
||||||
|
threadHistoryBody: string | undefined;
|
||||||
|
threadSessionPreviousTimestamp: number | undefined;
|
||||||
|
threadLabel: string | undefined;
|
||||||
|
threadStarterMedia: SlackMediaResult[] | null;
|
||||||
|
};
|
||||||
|
|
||||||
|
export async function resolveSlackThreadContextData(params: {
|
||||||
|
ctx: SlackMonitorContext;
|
||||||
|
account: ResolvedSlackAccount;
|
||||||
|
message: SlackMessageEvent;
|
||||||
|
isThreadReply: boolean;
|
||||||
|
threadTs: string | undefined;
|
||||||
|
threadStarter: SlackThreadStarter | null;
|
||||||
|
roomLabel: string;
|
||||||
|
storePath: string;
|
||||||
|
sessionKey: string;
|
||||||
|
envelopeOptions: ReturnType<
|
||||||
|
typeof import("../../../auto-reply/envelope.js").resolveEnvelopeFormatOptions
|
||||||
|
>;
|
||||||
|
effectiveDirectMedia: SlackMediaResult[] | null;
|
||||||
|
}): Promise<SlackThreadContextData> {
|
||||||
|
let threadStarterBody: string | undefined;
|
||||||
|
let threadHistoryBody: string | undefined;
|
||||||
|
let threadSessionPreviousTimestamp: number | undefined;
|
||||||
|
let threadLabel: string | undefined;
|
||||||
|
let threadStarterMedia: SlackMediaResult[] | null = null;
|
||||||
|
|
||||||
|
if (!params.isThreadReply || !params.threadTs) {
|
||||||
|
return {
|
||||||
|
threadStarterBody,
|
||||||
|
threadHistoryBody,
|
||||||
|
threadSessionPreviousTimestamp,
|
||||||
|
threadLabel,
|
||||||
|
threadStarterMedia,
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
const starter = params.threadStarter;
|
||||||
|
if (starter?.text) {
|
||||||
|
threadStarterBody = starter.text;
|
||||||
|
const snippet = starter.text.replace(/\s+/g, " ").slice(0, 80);
|
||||||
|
threadLabel = `Slack thread ${params.roomLabel}${snippet ? `: ${snippet}` : ""}`;
|
||||||
|
if (!params.effectiveDirectMedia && starter.files && starter.files.length > 0) {
|
||||||
|
threadStarterMedia = await resolveSlackMedia({
|
||||||
|
files: starter.files,
|
||||||
|
token: params.ctx.botToken,
|
||||||
|
maxBytes: params.ctx.mediaMaxBytes,
|
||||||
|
});
|
||||||
|
if (threadStarterMedia) {
|
||||||
|
const starterPlaceholders = threadStarterMedia.map((item) => item.placeholder).join(", ");
|
||||||
|
logVerbose(`slack: hydrated thread starter file ${starterPlaceholders} from root message`);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
threadLabel = `Slack thread ${params.roomLabel}`;
|
||||||
|
}
|
||||||
|
|
||||||
|
const threadInitialHistoryLimit = params.account.config?.thread?.initialHistoryLimit ?? 20;
|
||||||
|
threadSessionPreviousTimestamp = readSessionUpdatedAt({
|
||||||
|
storePath: params.storePath,
|
||||||
|
sessionKey: params.sessionKey,
|
||||||
|
});
|
||||||
|
|
||||||
|
if (threadInitialHistoryLimit > 0 && !threadSessionPreviousTimestamp) {
|
||||||
|
const threadHistory = await resolveSlackThreadHistory({
|
||||||
|
channelId: params.message.channel,
|
||||||
|
threadTs: params.threadTs,
|
||||||
|
client: params.ctx.app.client,
|
||||||
|
currentMessageTs: params.message.ts,
|
||||||
|
limit: threadInitialHistoryLimit,
|
||||||
|
});
|
||||||
|
|
||||||
|
if (threadHistory.length > 0) {
|
||||||
|
const uniqueUserIds = [
|
||||||
|
...new Set(
|
||||||
|
threadHistory.map((item) => item.userId).filter((id): id is string => Boolean(id)),
|
||||||
|
),
|
||||||
|
];
|
||||||
|
const userMap = new Map<string, { name?: string }>();
|
||||||
|
await Promise.all(
|
||||||
|
uniqueUserIds.map(async (id) => {
|
||||||
|
const user = await params.ctx.resolveUserName(id);
|
||||||
|
if (user) {
|
||||||
|
userMap.set(id, user);
|
||||||
|
}
|
||||||
|
}),
|
||||||
|
);
|
||||||
|
|
||||||
|
const historyParts: string[] = [];
|
||||||
|
for (const historyMsg of threadHistory) {
|
||||||
|
const msgUser = historyMsg.userId ? userMap.get(historyMsg.userId) : null;
|
||||||
|
const msgSenderName =
|
||||||
|
msgUser?.name ?? (historyMsg.botId ? `Bot (${historyMsg.botId})` : "Unknown");
|
||||||
|
const isBot = Boolean(historyMsg.botId);
|
||||||
|
const role = isBot ? "assistant" : "user";
|
||||||
|
const msgWithId = `${historyMsg.text}\n[slack message id: ${historyMsg.ts ?? "unknown"} channel: ${params.message.channel}]`;
|
||||||
|
historyParts.push(
|
||||||
|
formatInboundEnvelope({
|
||||||
|
channel: "Slack",
|
||||||
|
from: `${msgSenderName} (${role})`,
|
||||||
|
timestamp: historyMsg.ts ? Math.round(Number(historyMsg.ts) * 1000) : undefined,
|
||||||
|
body: msgWithId,
|
||||||
|
chatType: "channel",
|
||||||
|
envelope: params.envelopeOptions,
|
||||||
|
}),
|
||||||
|
);
|
||||||
|
}
|
||||||
|
threadHistoryBody = historyParts.join("\n\n");
|
||||||
|
logVerbose(
|
||||||
|
`slack: populated thread history with ${threadHistory.length} messages for new session`,
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return {
|
||||||
|
threadStarterBody,
|
||||||
|
threadHistoryBody,
|
||||||
|
threadSessionPreviousTimestamp,
|
||||||
|
threadLabel,
|
||||||
|
threadStarterMedia,
|
||||||
|
};
|
||||||
|
}
|
||||||
@ -46,14 +46,10 @@ import { resolveSlackChannelConfig } from "../channel-config.js";
|
|||||||
import { stripSlackMentionsForCommandDetection } from "../commands.js";
|
import { stripSlackMentionsForCommandDetection } from "../commands.js";
|
||||||
import { normalizeSlackChannelType, type SlackMonitorContext } from "../context.js";
|
import { normalizeSlackChannelType, type SlackMonitorContext } from "../context.js";
|
||||||
import { authorizeSlackDirectMessage } from "../dm-auth.js";
|
import { authorizeSlackDirectMessage } from "../dm-auth.js";
|
||||||
import {
|
import { resolveSlackThreadStarter } from "../media.js";
|
||||||
resolveSlackAttachmentContent,
|
|
||||||
MAX_SLACK_MEDIA_FILES,
|
|
||||||
resolveSlackMedia,
|
|
||||||
resolveSlackThreadHistory,
|
|
||||||
resolveSlackThreadStarter,
|
|
||||||
} from "../media.js";
|
|
||||||
import { resolveSlackRoomContextHints } from "../room-context.js";
|
import { resolveSlackRoomContextHints } from "../room-context.js";
|
||||||
|
import { resolveSlackMessageContent } from "./prepare-content.js";
|
||||||
|
import { resolveSlackThreadContextData } from "./prepare-thread-context.js";
|
||||||
import type { PreparedSlackMessage } from "./types.js";
|
import type { PreparedSlackMessage } from "./types.js";
|
||||||
|
|
||||||
const mentionRegexCache = new WeakMap<SlackMonitorContext, Map<string, RegExp[]>>();
|
const mentionRegexCache = new WeakMap<SlackMonitorContext, Map<string, RegExp[]>>();
|
||||||
@ -515,87 +511,26 @@ export async function prepareSlackMessage(params: {
|
|||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
// When processing a thread reply, filter out files that belong to the thread
|
const threadStarter =
|
||||||
// starter (parent message). Slack's Events API includes the parent's `files`
|
isThreadReply && threadTs
|
||||||
// array in every thread reply payload, which causes ghost media attachments
|
? await resolveSlackThreadStarter({
|
||||||
// on text-only replies. We eagerly resolve the thread starter here (the result
|
channelId: message.channel,
|
||||||
// is cached) and exclude any file IDs that match the parent. (#32203)
|
threadTs,
|
||||||
let ownFiles = message.files;
|
client: ctx.app.client,
|
||||||
if (isThreadReply && threadTs && message.files?.length) {
|
})
|
||||||
const starter = await resolveSlackThreadStarter({
|
: null;
|
||||||
channelId: message.channel,
|
const resolvedMessageContent = await resolveSlackMessageContent({
|
||||||
threadTs,
|
message,
|
||||||
client: ctx.app.client,
|
isThreadReply,
|
||||||
});
|
threadStarter,
|
||||||
if (starter?.files?.length) {
|
isBotMessage,
|
||||||
const starterFileIds = new Set(starter.files.map((f) => f.id));
|
botToken: ctx.botToken,
|
||||||
const filtered = message.files.filter((f) => !f.id || !starterFileIds.has(f.id));
|
mediaMaxBytes: ctx.mediaMaxBytes,
|
||||||
if (filtered.length < message.files.length) {
|
|
||||||
logVerbose(
|
|
||||||
`slack: filtered ${message.files.length - filtered.length} inherited parent file(s) from thread reply`,
|
|
||||||
);
|
|
||||||
}
|
|
||||||
ownFiles = filtered.length > 0 ? filtered : undefined;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
const media = await resolveSlackMedia({
|
|
||||||
files: ownFiles,
|
|
||||||
token: ctx.botToken,
|
|
||||||
maxBytes: ctx.mediaMaxBytes,
|
|
||||||
});
|
});
|
||||||
|
if (!resolvedMessageContent) {
|
||||||
// Resolve forwarded message content (text + media) from Slack attachments
|
|
||||||
const attachmentContent = await resolveSlackAttachmentContent({
|
|
||||||
attachments: message.attachments,
|
|
||||||
token: ctx.botToken,
|
|
||||||
maxBytes: ctx.mediaMaxBytes,
|
|
||||||
});
|
|
||||||
|
|
||||||
// Merge forwarded media into the message's media array
|
|
||||||
const mergedMedia = [...(media ?? []), ...(attachmentContent?.media ?? [])];
|
|
||||||
const effectiveDirectMedia = mergedMedia.length > 0 ? mergedMedia : null;
|
|
||||||
|
|
||||||
const mediaPlaceholder = effectiveDirectMedia
|
|
||||||
? effectiveDirectMedia.map((m) => m.placeholder).join(" ")
|
|
||||||
: undefined;
|
|
||||||
|
|
||||||
// When files were attached but all downloads failed, create a fallback
|
|
||||||
// placeholder so the message is still delivered to the agent instead of
|
|
||||||
// being silently dropped (#25064).
|
|
||||||
const fileOnlyFallback =
|
|
||||||
!mediaPlaceholder && (message.files?.length ?? 0) > 0
|
|
||||||
? message
|
|
||||||
.files!.slice(0, MAX_SLACK_MEDIA_FILES)
|
|
||||||
.map((f) => f.name?.trim() || "file")
|
|
||||||
.join(", ")
|
|
||||||
: undefined;
|
|
||||||
const fileOnlyPlaceholder = fileOnlyFallback ? `[Slack file: ${fileOnlyFallback}]` : undefined;
|
|
||||||
|
|
||||||
// Bot messages (e.g. Prometheus, Gatus webhooks) often carry content only in
|
|
||||||
// non-forwarded attachments (is_share !== true). Extract their text/fallback
|
|
||||||
// so the message isn't silently dropped when `allowBots: true` (#27616).
|
|
||||||
const botAttachmentText =
|
|
||||||
isBotMessage && !attachmentContent?.text
|
|
||||||
? (message.attachments ?? [])
|
|
||||||
.map((a) => a.text?.trim() || a.fallback?.trim())
|
|
||||||
.filter(Boolean)
|
|
||||||
.join("\n")
|
|
||||||
: undefined;
|
|
||||||
|
|
||||||
const rawBody =
|
|
||||||
[
|
|
||||||
(message.text ?? "").trim(),
|
|
||||||
attachmentContent?.text,
|
|
||||||
botAttachmentText,
|
|
||||||
mediaPlaceholder,
|
|
||||||
fileOnlyPlaceholder,
|
|
||||||
]
|
|
||||||
.filter(Boolean)
|
|
||||||
.join("\n") || "";
|
|
||||||
if (!rawBody) {
|
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
const { rawBody, effectiveDirectMedia } = resolvedMessageContent;
|
||||||
|
|
||||||
const ackReaction = resolveAckReaction(cfg, route.agentId, {
|
const ackReaction = resolveAckReaction(cfg, route.agentId, {
|
||||||
channel: "slack",
|
channel: "slack",
|
||||||
@ -711,99 +646,25 @@ export async function prepareSlackMessage(params: {
|
|||||||
channelConfig,
|
channelConfig,
|
||||||
});
|
});
|
||||||
|
|
||||||
let threadStarterBody: string | undefined;
|
const {
|
||||||
let threadHistoryBody: string | undefined;
|
threadStarterBody,
|
||||||
let threadSessionPreviousTimestamp: number | undefined;
|
threadHistoryBody,
|
||||||
let threadLabel: string | undefined;
|
threadSessionPreviousTimestamp,
|
||||||
let threadStarterMedia: Awaited<ReturnType<typeof resolveSlackMedia>> = null;
|
threadLabel,
|
||||||
if (isThreadReply && threadTs) {
|
threadStarterMedia,
|
||||||
const starter = await resolveSlackThreadStarter({
|
} = await resolveSlackThreadContextData({
|
||||||
channelId: message.channel,
|
ctx,
|
||||||
threadTs,
|
account,
|
||||||
client: ctx.app.client,
|
message,
|
||||||
});
|
isThreadReply,
|
||||||
if (starter?.text) {
|
threadTs,
|
||||||
// Keep thread starter as raw text; metadata is provided out-of-band in the system prompt.
|
threadStarter,
|
||||||
threadStarterBody = starter.text;
|
roomLabel,
|
||||||
const snippet = starter.text.replace(/\s+/g, " ").slice(0, 80);
|
storePath,
|
||||||
threadLabel = `Slack thread ${roomLabel}${snippet ? `: ${snippet}` : ""}`;
|
sessionKey,
|
||||||
// If current message has no files but thread starter does, fetch starter's files
|
envelopeOptions,
|
||||||
if (!effectiveDirectMedia && starter.files && starter.files.length > 0) {
|
effectiveDirectMedia,
|
||||||
threadStarterMedia = await resolveSlackMedia({
|
});
|
||||||
files: starter.files,
|
|
||||||
token: ctx.botToken,
|
|
||||||
maxBytes: ctx.mediaMaxBytes,
|
|
||||||
});
|
|
||||||
if (threadStarterMedia) {
|
|
||||||
const starterPlaceholders = threadStarterMedia.map((m) => m.placeholder).join(", ");
|
|
||||||
logVerbose(
|
|
||||||
`slack: hydrated thread starter file ${starterPlaceholders} from root message`,
|
|
||||||
);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
threadLabel = `Slack thread ${roomLabel}`;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Fetch full thread history for new thread sessions
|
|
||||||
// This provides context of previous messages (including bot replies) in the thread
|
|
||||||
// Use the thread session key (not base session key) to determine if this is a new session
|
|
||||||
const threadInitialHistoryLimit = account.config?.thread?.initialHistoryLimit ?? 20;
|
|
||||||
threadSessionPreviousTimestamp = readSessionUpdatedAt({
|
|
||||||
storePath,
|
|
||||||
sessionKey, // Thread-specific session key
|
|
||||||
});
|
|
||||||
// Only fetch thread history for NEW sessions (existing sessions already have this context in their transcript)
|
|
||||||
if (threadInitialHistoryLimit > 0 && !threadSessionPreviousTimestamp) {
|
|
||||||
const threadHistory = await resolveSlackThreadHistory({
|
|
||||||
channelId: message.channel,
|
|
||||||
threadTs,
|
|
||||||
client: ctx.app.client,
|
|
||||||
currentMessageTs: message.ts,
|
|
||||||
limit: threadInitialHistoryLimit,
|
|
||||||
});
|
|
||||||
|
|
||||||
if (threadHistory.length > 0) {
|
|
||||||
// Batch resolve user names to avoid N sequential API calls
|
|
||||||
const uniqueUserIds = [
|
|
||||||
...new Set(threadHistory.map((m) => m.userId).filter((id): id is string => Boolean(id))),
|
|
||||||
];
|
|
||||||
const userMap = new Map<string, { name?: string }>();
|
|
||||||
await Promise.all(
|
|
||||||
uniqueUserIds.map(async (id) => {
|
|
||||||
const user = await ctx.resolveUserName(id);
|
|
||||||
if (user) {
|
|
||||||
userMap.set(id, user);
|
|
||||||
}
|
|
||||||
}),
|
|
||||||
);
|
|
||||||
|
|
||||||
const historyParts: string[] = [];
|
|
||||||
for (const historyMsg of threadHistory) {
|
|
||||||
const msgUser = historyMsg.userId ? userMap.get(historyMsg.userId) : null;
|
|
||||||
const msgSenderName =
|
|
||||||
msgUser?.name ?? (historyMsg.botId ? `Bot (${historyMsg.botId})` : "Unknown");
|
|
||||||
const isBot = Boolean(historyMsg.botId);
|
|
||||||
const role = isBot ? "assistant" : "user";
|
|
||||||
const msgWithId = `${historyMsg.text}\n[slack message id: ${historyMsg.ts ?? "unknown"} channel: ${message.channel}]`;
|
|
||||||
historyParts.push(
|
|
||||||
formatInboundEnvelope({
|
|
||||||
channel: "Slack",
|
|
||||||
from: `${msgSenderName} (${role})`,
|
|
||||||
timestamp: historyMsg.ts ? Math.round(Number(historyMsg.ts) * 1000) : undefined,
|
|
||||||
body: msgWithId,
|
|
||||||
chatType: "channel",
|
|
||||||
envelope: envelopeOptions,
|
|
||||||
}),
|
|
||||||
);
|
|
||||||
}
|
|
||||||
threadHistoryBody = historyParts.join("\n\n");
|
|
||||||
logVerbose(
|
|
||||||
`slack: populated thread history with ${threadHistory.length} messages for new session`,
|
|
||||||
);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Use direct media (including forwarded attachment media) if available, else thread starter media
|
// Use direct media (including forwarded attachment media) if available, else thread starter media
|
||||||
const effectiveMedia = effectiveDirectMedia ?? threadStarterMedia;
|
const effectiveMedia = effectiveDirectMedia ?? threadStarterMedia;
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user