From 24e394534e76477947c59d0f2e7abbcb41bbbc73 Mon Sep 17 00:00:00 2001 From: OpenClaw Bot Date: Wed, 18 Mar 2026 05:54:54 +0000 Subject: [PATCH] Signal: add container REST API support with unified adapter Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- .../fragments/pr-signal-container-mode.md | 1 + docs/channels/signal.md | 60 +- extensions/signal/src/client-adapter.test.ts | 433 ++++++++++++ extensions/signal/src/client-adapter.ts | 187 +++++ .../signal/src/client-container.test.ts | 638 ++++++++++++++++++ extensions/signal/src/client-container.ts | 544 +++++++++++++++ .../src/monitor.tool-result.test-harness.ts | 10 +- extensions/signal/src/monitor.ts | 9 +- extensions/signal/src/probe.test.ts | 8 +- extensions/signal/src/probe.ts | 2 +- extensions/signal/src/send-reactions.test.ts | 2 +- extensions/signal/src/send-reactions.ts | 2 +- extensions/signal/src/send.ts | 2 +- extensions/signal/src/sse-reconnect.ts | 16 +- src/config/types.signal.ts | 9 + 15 files changed, 1902 insertions(+), 21 deletions(-) create mode 100644 changelog/fragments/pr-signal-container-mode.md create mode 100644 extensions/signal/src/client-adapter.test.ts create mode 100644 extensions/signal/src/client-adapter.ts create mode 100644 extensions/signal/src/client-container.test.ts create mode 100644 extensions/signal/src/client-container.ts diff --git a/changelog/fragments/pr-signal-container-mode.md b/changelog/fragments/pr-signal-container-mode.md new file mode 100644 index 00000000000..9f0231681de --- /dev/null +++ b/changelog/fragments/pr-signal-container-mode.md @@ -0,0 +1 @@ +- Signal/container mode: add REST API support for bbernhard/signal-cli-rest-api containerized deployments via a unified adapter layer, with automatic mode detection and `channels.signal.apiMode` config. (#10240) Thanks @Hua688. diff --git a/docs/channels/signal.md b/docs/channels/signal.md index fb5747dc417..d3fa29a3bdb 100644 --- a/docs/channels/signal.md +++ b/docs/channels/signal.md @@ -1,5 +1,5 @@ --- -summary: "Signal support via signal-cli (JSON-RPC + SSE), setup paths, and number model" +summary: "Signal support via signal-cli (native daemon or bbernhard container), setup paths, and number model" read_when: - Setting up Signal support - Debugging Signal send/receive @@ -8,12 +8,14 @@ title: "Signal" # Signal (signal-cli) -Status: external CLI integration. Gateway talks to `signal-cli` over HTTP JSON-RPC + SSE. +Status: external CLI integration. Gateway talks to `signal-cli` over HTTP — either native daemon (JSON-RPC + SSE) or bbernhard/signal-cli-rest-api container (REST + WebSocket). ## Prerequisites - OpenClaw installed on your server (Linux flow below tested on Ubuntu 24). -- `signal-cli` available on the host where the gateway runs. +- One of: + - `signal-cli` available on the host (native mode), **or** + - `bbernhard/signal-cli-rest-api` Docker container (container mode). - A phone number that can receive one verification SMS (for SMS registration path). - Browser access for Signal captcha (`signalcaptchas.org`) during registration. @@ -179,6 +181,54 @@ If you want to manage `signal-cli` yourself (slow JVM cold starts, container ini This skips auto-spawn and the startup wait inside OpenClaw. For slow starts when auto-spawning, set `channels.signal.startupTimeoutMs`. +## Container mode (bbernhard/signal-cli-rest-api) + +Instead of running `signal-cli` natively, you can use the [bbernhard/signal-cli-rest-api](https://github.com/bbernhard/signal-cli-rest-api) Docker container. This wraps `signal-cli` behind a REST API and WebSocket interface. + +Requirements: + +- The container **must** run with `MODE=json-rpc` for real-time message receiving. +- Register or link your Signal account inside the container before connecting OpenClaw. + +Example `docker-compose.yml` service: + +```yaml +signal-cli: + image: bbernhard/signal-cli-rest-api:latest + environment: + MODE: json-rpc + ports: + - "8080:8080" + volumes: + - signal-cli-data:/home/.local/share/signal-cli +``` + +OpenClaw config: + +```json5 +{ + channels: { + signal: { + enabled: true, + account: "+15551234567", + httpUrl: "http://signal-cli:8080", + autoStart: false, + apiMode: "container", // or "auto" to detect automatically + }, + }, +} +``` + +The `apiMode` field controls which protocol OpenClaw uses: + +| Value | Behavior | +| ------------- | ------------------------------------------------------------------------------------ | +| `"auto"` | (Default) Probes both endpoints and picks whichever responds first | +| `"native"` | Force native signal-cli (JSON-RPC at `/api/v1/rpc`, SSE at `/api/v1/events`) | +| `"container"` | Force bbernhard container (REST at `/v2/send`, WebSocket at `/v1/receive/{account}`) | + +When `apiMode` is `"auto"`, OpenClaw caches the detected mode for 30 seconds to avoid repeated probes. + ## Access control (DMs + groups) DMs: @@ -201,7 +251,8 @@ Groups: ## How it works (behavior) -- `signal-cli` runs as a daemon; the gateway reads events via SSE. +- Native mode: `signal-cli` runs as a daemon; the gateway reads events via SSE. +- Container mode: the gateway sends via REST API and receives via WebSocket. - Inbound messages are normalized into the shared channel envelope. - Replies always route back to the same number or group. @@ -300,6 +351,7 @@ Full configuration: [Configuration](/gateway/configuration) Provider options: - `channels.signal.enabled`: enable/disable channel startup. +- `channels.signal.apiMode`: `auto | native | container` (default: auto). See [Container mode](#container-mode-bbernhardsignal-cli-rest-api). - `channels.signal.account`: E.164 for the bot account. - `channels.signal.cliPath`: path to `signal-cli`. - `channels.signal.httpUrl`: full daemon URL (overrides host/port). diff --git a/extensions/signal/src/client-adapter.test.ts b/extensions/signal/src/client-adapter.test.ts new file mode 100644 index 00000000000..c02b0f2f486 --- /dev/null +++ b/extensions/signal/src/client-adapter.test.ts @@ -0,0 +1,433 @@ +import * as configModule from "openclaw/plugin-sdk/config-runtime"; +import { describe, expect, it, vi, beforeEach } from "vitest"; +import { + signalRpcRequest, + detectSignalApiMode, + signalCheck, + streamSignalEvents, + fetchAttachment, +} from "./client-adapter.js"; +import * as containerClientModule from "./client-container.js"; +import * as nativeClientModule from "./client.js"; + +const mockNativeCheck = vi.fn(); +const mockNativeRpcRequest = vi.fn(); +const mockNativeStreamEvents = vi.fn(); +const mockContainerCheck = vi.fn(); +const mockContainerRpcRequest = vi.fn(); +const mockContainerFetchAttachment = vi.fn(); +const mockStreamContainerEvents = vi.fn(); +const mockLoadConfig = vi.fn(() => ({})); + +beforeEach(() => { + vi.spyOn(nativeClientModule, "signalCheck").mockImplementation(mockNativeCheck as any); + vi.spyOn(nativeClientModule, "signalRpcRequest").mockImplementation(mockNativeRpcRequest as any); + vi.spyOn(nativeClientModule, "streamSignalEvents").mockImplementation( + mockNativeStreamEvents as any, + ); + vi.spyOn(containerClientModule, "containerCheck").mockImplementation(mockContainerCheck as any); + vi.spyOn(containerClientModule, "containerRpcRequest").mockImplementation( + mockContainerRpcRequest as any, + ); + vi.spyOn(containerClientModule, "containerFetchAttachment").mockImplementation( + mockContainerFetchAttachment as any, + ); + vi.spyOn(containerClientModule, "streamContainerEvents").mockImplementation( + mockStreamContainerEvents as any, + ); + vi.spyOn(configModule, "loadConfig").mockImplementation(mockLoadConfig as any); +}); + +function setApiMode(mode: "native" | "container" | "auto") { + mockLoadConfig.mockReturnValue({ + channels: { + signal: { + apiMode: mode, + }, + }, + }); +} + +describe("detectSignalApiMode", () => { + beforeEach(() => { + vi.clearAllMocks(); + setApiMode("native"); + }); + + it("returns native when native endpoint responds", async () => { + mockNativeCheck.mockResolvedValue({ ok: true, status: 200 }); + mockContainerCheck.mockResolvedValue({ ok: false, status: 404 }); + + const result = await detectSignalApiMode("http://localhost:8080"); + expect(result).toBe("native"); + }); + + it("returns container when only container endpoint responds", async () => { + mockNativeCheck.mockResolvedValue({ ok: false, status: 404 }); + mockContainerCheck.mockResolvedValue({ ok: true, status: 200 }); + + const result = await detectSignalApiMode("http://localhost:8080"); + expect(result).toBe("container"); + }); + + it("prefers native when both endpoints respond", async () => { + mockNativeCheck.mockResolvedValue({ ok: true, status: 200 }); + mockContainerCheck.mockResolvedValue({ ok: true, status: 200 }); + + const result = await detectSignalApiMode("http://localhost:8080"); + expect(result).toBe("native"); + }); + + it("throws error when neither endpoint responds", async () => { + mockNativeCheck.mockResolvedValue({ ok: false, status: null, error: "Connection refused" }); + mockContainerCheck.mockResolvedValue({ ok: false, status: null, error: "Connection refused" }); + + await expect(detectSignalApiMode("http://localhost:8080")).rejects.toThrow( + "Signal API not reachable at http://localhost:8080", + ); + }); + + it("handles exceptions from check functions", async () => { + mockNativeCheck.mockRejectedValue(new Error("Network error")); + mockContainerCheck.mockRejectedValue(new Error("Network error")); + + await expect(detectSignalApiMode("http://localhost:8080")).rejects.toThrow( + "Signal API not reachable", + ); + }); + + it("respects timeout parameter", async () => { + mockNativeCheck.mockResolvedValue({ ok: true, status: 200 }); + mockContainerCheck.mockResolvedValue({ ok: false }); + + await detectSignalApiMode("http://localhost:8080", 5000); + expect(mockNativeCheck).toHaveBeenCalledWith("http://localhost:8080", 5000); + expect(mockContainerCheck).toHaveBeenCalledWith("http://localhost:8080", 5000); + }); +}); + +describe("signalRpcRequest", () => { + beforeEach(() => { + vi.clearAllMocks(); + setApiMode("native"); + }); + + it("routes to native JSON-RPC for native mode", async () => { + mockNativeRpcRequest.mockResolvedValue({ timestamp: 1700000000000 }); + + const result = await signalRpcRequest( + "send", + { message: "Hello", account: "+14259798283", recipient: ["+15550001111"] }, + { baseUrl: "http://localhost:8080" }, + ); + + expect(result).toEqual({ timestamp: 1700000000000 }); + expect(mockNativeRpcRequest).toHaveBeenCalledWith( + "send", + expect.objectContaining({ message: "Hello" }), + expect.objectContaining({ baseUrl: "http://localhost:8080" }), + ); + expect(mockContainerRpcRequest).not.toHaveBeenCalled(); + }); + + it("routes to container RPC for container mode", async () => { + setApiMode("container"); + mockContainerRpcRequest.mockResolvedValue({ timestamp: 1700000000000 }); + + const result = await signalRpcRequest( + "send", + { message: "Hello", account: "+14259798283", recipient: ["+15550001111"] }, + { baseUrl: "http://localhost:8080" }, + ); + + expect(result).toEqual({ timestamp: 1700000000000 }); + expect(mockContainerRpcRequest).toHaveBeenCalledWith( + "send", + expect.objectContaining({ message: "Hello" }), + expect.objectContaining({ baseUrl: "http://localhost:8080" }), + ); + expect(mockNativeRpcRequest).not.toHaveBeenCalled(); + }); + + it("passes all RPC methods through to native", async () => { + mockNativeRpcRequest.mockResolvedValue({}); + + await signalRpcRequest( + "sendTyping", + { account: "+1", recipient: ["+2"] }, + { baseUrl: "http://localhost:8080" }, + ); + expect(mockNativeRpcRequest).toHaveBeenCalledWith( + "sendTyping", + expect.anything(), + expect.anything(), + ); + }); + + it("passes all RPC methods through to container", async () => { + setApiMode("container"); + mockContainerRpcRequest.mockResolvedValue({}); + + await signalRpcRequest( + "sendReceipt", + { account: "+1", recipient: ["+2"] }, + { baseUrl: "http://localhost:8080" }, + ); + expect(mockContainerRpcRequest).toHaveBeenCalledWith( + "sendReceipt", + expect.anything(), + expect.anything(), + ); + }); +}); + +describe("signalCheck", () => { + beforeEach(() => { + vi.clearAllMocks(); + setApiMode("native"); + }); + + it("uses native check for native mode", async () => { + mockNativeCheck.mockResolvedValue({ ok: true, status: 200 }); + + const result = await signalCheck("http://localhost:8080"); + + expect(result).toEqual({ ok: true, status: 200 }); + expect(mockNativeCheck).toHaveBeenCalledWith("http://localhost:8080", 10000); + expect(mockContainerCheck).not.toHaveBeenCalled(); + }); + + it("uses container check for container mode", async () => { + setApiMode("container"); + mockContainerCheck.mockResolvedValue({ ok: true, status: 200 }); + + const result = await signalCheck("http://localhost:8080"); + + expect(result).toEqual({ ok: true, status: 200 }); + expect(mockContainerCheck).toHaveBeenCalledWith("http://localhost:8080", 10000); + expect(mockNativeCheck).not.toHaveBeenCalled(); + }); + + it("respects timeout parameter", async () => { + mockNativeCheck.mockResolvedValue({ ok: true }); + + await signalCheck("http://localhost:8080", 5000); + + expect(mockNativeCheck).toHaveBeenCalledWith("http://localhost:8080", 5000); + }); +}); + +describe("streamSignalEvents", () => { + beforeEach(() => { + vi.clearAllMocks(); + setApiMode("native"); + }); + + it("uses native SSE for native mode", async () => { + mockNativeStreamEvents.mockResolvedValue(); + + const onEvent = vi.fn(); + await streamSignalEvents({ + baseUrl: "http://localhost:8080", + account: "+14259798283", + onEvent, + }); + + expect(mockNativeStreamEvents).toHaveBeenCalledWith( + expect.objectContaining({ + baseUrl: "http://localhost:8080", + account: "+14259798283", + }), + ); + expect(mockStreamContainerEvents).not.toHaveBeenCalled(); + }); + + it("uses container WebSocket for container mode", async () => { + setApiMode("container"); + mockStreamContainerEvents.mockResolvedValue(); + + const onEvent = vi.fn(); + await streamSignalEvents({ + baseUrl: "http://localhost:8080", + account: "+14259798283", + onEvent, + }); + + expect(mockStreamContainerEvents).toHaveBeenCalledWith( + expect.objectContaining({ + baseUrl: "http://localhost:8080", + account: "+14259798283", + }), + ); + expect(mockNativeStreamEvents).not.toHaveBeenCalled(); + }); + + it("passes native SSE events through unchanged", async () => { + const payload = { envelope: { sourceNumber: "+1555000111" } }; + mockNativeStreamEvents.mockImplementation(async (params) => { + params.onEvent({ event: "receive", data: JSON.stringify(payload) }); + }); + + const events: unknown[] = []; + await streamSignalEvents({ + baseUrl: "http://localhost:8080", + onEvent: (evt) => events.push(evt), + }); + + expect(events).toHaveLength(1); + expect(events[0]).toEqual({ event: "receive", data: JSON.stringify(payload) }); + }); + + it("converts container events to SSE-like receive events", async () => { + setApiMode("container"); + mockStreamContainerEvents.mockImplementation(async (params) => { + params.onEvent({ envelope: { sourceNumber: "+1555000111" } }); + }); + + const events: unknown[] = []; + await streamSignalEvents({ + baseUrl: "http://localhost:8080", + onEvent: (evt) => events.push(evt), + }); + + expect(events).toHaveLength(1); + expect(events[0]).toEqual({ + event: "receive", + data: JSON.stringify({ envelope: { sourceNumber: "+1555000111" } }), + }); + }); + + it("passes abort signal to underlying stream", async () => { + mockNativeStreamEvents.mockResolvedValue(); + + const abortController = new AbortController(); + await streamSignalEvents({ + baseUrl: "http://localhost:8080", + abortSignal: abortController.signal, + onEvent: vi.fn(), + }); + + expect(mockNativeStreamEvents).toHaveBeenCalledWith( + expect.objectContaining({ + abortSignal: abortController.signal, + }), + ); + }); +}); + +describe("fetchAttachment", () => { + beforeEach(() => { + vi.clearAllMocks(); + setApiMode("native"); + }); + + it("uses native JSON-RPC for native mode with sender", async () => { + mockNativeRpcRequest.mockResolvedValue({ data: "base64data" }); + + const result = await fetchAttachment({ + baseUrl: "http://localhost:8080", + account: "+14259798283", + attachmentId: "attachment-123", + sender: "+15550001111", + }); + + expect(result).toBeInstanceOf(Buffer); + expect(mockNativeRpcRequest).toHaveBeenCalledWith( + "getAttachment", + expect.objectContaining({ + id: "attachment-123", + account: "+14259798283", + recipient: "+15550001111", + }), + expect.anything(), + ); + }); + + it("uses container REST for container mode", async () => { + setApiMode("container"); + const mockBuffer = Buffer.from([0x89, 0x50, 0x4e, 0x47]); + mockContainerFetchAttachment.mockResolvedValue(mockBuffer); + + const result = await fetchAttachment({ + baseUrl: "http://localhost:8080", + attachmentId: "attachment-123", + }); + + expect(result).toBe(mockBuffer); + expect(mockContainerFetchAttachment).toHaveBeenCalledWith( + "attachment-123", + expect.objectContaining({ baseUrl: "http://localhost:8080" }), + ); + }); + + it("returns null for native mode without sender or groupId", async () => { + const result = await fetchAttachment({ + baseUrl: "http://localhost:8080", + attachmentId: "attachment-123", + }); + + expect(result).toBeNull(); + expect(mockNativeRpcRequest).not.toHaveBeenCalled(); + }); + + it("uses groupId when provided for native mode", async () => { + mockNativeRpcRequest.mockResolvedValue({ data: "base64data" }); + + await fetchAttachment({ + baseUrl: "http://localhost:8080", + attachmentId: "attachment-123", + groupId: "group-123", + }); + + expect(mockNativeRpcRequest).toHaveBeenCalledWith( + "getAttachment", + expect.objectContaining({ groupId: "group-123" }), + expect.anything(), + ); + }); + + it("returns null when native RPC returns no data", async () => { + mockNativeRpcRequest.mockResolvedValue({}); + + const result = await fetchAttachment({ + baseUrl: "http://localhost:8080", + attachmentId: "attachment-123", + sender: "+15550001111", + }); + + expect(result).toBeNull(); + }); + + it("prefers groupId over sender when both provided", async () => { + mockNativeRpcRequest.mockResolvedValue({ data: "base64data" }); + + await fetchAttachment({ + baseUrl: "http://localhost:8080", + attachmentId: "attachment-123", + sender: "+15550001111", + groupId: "group-123", + }); + + const callParams = mockNativeRpcRequest.mock.calls[0][1]; + expect(callParams).toHaveProperty("groupId", "group-123"); + expect(callParams).not.toHaveProperty("recipient"); + }); + + it("passes timeout to container fetch", async () => { + setApiMode("container"); + mockContainerFetchAttachment.mockResolvedValue(Buffer.from([])); + + await fetchAttachment({ + baseUrl: "http://localhost:8080", + attachmentId: "attachment-123", + timeoutMs: 60000, + }); + + expect(mockContainerFetchAttachment).toHaveBeenCalledWith( + "attachment-123", + expect.objectContaining({ + timeoutMs: 60000, + }), + ); + }); +}); diff --git a/extensions/signal/src/client-adapter.ts b/extensions/signal/src/client-adapter.ts new file mode 100644 index 00000000000..8af898ed913 --- /dev/null +++ b/extensions/signal/src/client-adapter.ts @@ -0,0 +1,187 @@ +/** + * Signal client adapter - unified interface for both native signal-cli and bbernhard container. + * + * This adapter provides a single API that routes to the appropriate implementation + * based on the configured API mode. Exports mirror client.ts names so consumers + * only need to change their import path. + */ + +import { loadConfig } from "openclaw/plugin-sdk/config-runtime"; +import { + containerCheck, + containerRpcRequest, + streamContainerEvents, + containerFetchAttachment, +} from "./client-container.js"; +import type { SignalRpcOptions } from "./client.js"; +import { + signalCheck as nativeCheck, + signalRpcRequest as nativeRpcRequest, + streamSignalEvents as nativeStreamEvents, +} from "./client.js"; + +const DEFAULT_TIMEOUT_MS = 10_000; +const MODE_CACHE_TTL_MS = 30_000; + +export type SignalAdapterEvent = { + event?: string; + data?: string; +}; + +// Re-export the options type so consumers can import it from the adapter. +export type { SignalRpcOptions } from "./client.js"; + +// Cache auto-detected modes per baseUrl to avoid repeated network probes. +const detectedModeCache = new Map(); + +/** + * Resolve the effective API mode for a given baseUrl + accountId. + * Reads config internally; callers never need to pass apiMode. + */ +async function resolveApiMode( + baseUrl: string, + _accountId?: string, +): Promise<"native" | "container"> { + const cfg = loadConfig(); + const configured = cfg.channels?.signal?.apiMode ?? "auto"; + + if (configured === "native" || configured === "container") { + return configured; + } + + // "auto" — check cache first, then probe + const cached = detectedModeCache.get(baseUrl); + if (cached && cached.expiresAt > Date.now()) { + return cached.mode; + } + const detected = await detectSignalApiMode(baseUrl); + detectedModeCache.set(baseUrl, { mode: detected, expiresAt: Date.now() + MODE_CACHE_TTL_MS }); + return detected; +} + +/** + * Detect which Signal API mode is available by probing endpoints. + * First endpoint to respond OK wins. + */ +export async function detectSignalApiMode( + baseUrl: string, + timeoutMs = DEFAULT_TIMEOUT_MS, +): Promise<"native" | "container"> { + const nativePromise = nativeCheck(baseUrl, timeoutMs).then((r) => + r.ok ? ("native" as const) : Promise.reject(new Error("native not ok")), + ); + const containerPromise = containerCheck(baseUrl, timeoutMs).then((r) => + r.ok ? ("container" as const) : Promise.reject(new Error("container not ok")), + ); + + try { + return await Promise.any([nativePromise, containerPromise]); + } catch { + throw new Error(`Signal API not reachable at ${baseUrl}`); + } +} + +/** + * Drop-in replacement for native signalRpcRequest. + * Routes to native JSON-RPC or container REST based on config. + */ +export async function signalRpcRequest( + method: string, + params: Record | undefined, + opts: SignalRpcOptions & { accountId?: string }, +): Promise { + const mode = await resolveApiMode(opts.baseUrl, opts.accountId); + if (mode === "native") { + return nativeRpcRequest(method, params, opts); + } + return containerRpcRequest(method, params, opts); +} + +/** + * Drop-in replacement for native signalCheck. + */ +export async function signalCheck( + baseUrl: string, + timeoutMs = DEFAULT_TIMEOUT_MS, +): Promise<{ ok: boolean; status?: number | null; error?: string | null }> { + const mode = await resolveApiMode(baseUrl); + if (mode === "container") { + return containerCheck(baseUrl, timeoutMs); + } + return nativeCheck(baseUrl, timeoutMs); +} + +/** + * Drop-in replacement for native streamSignalEvents. + * Container mode uses WebSocket; native uses SSE. + */ +export async function streamSignalEvents(params: { + baseUrl: string; + account?: string; + accountId?: string; + abortSignal?: AbortSignal; + onEvent: (event: SignalAdapterEvent) => void; + logger?: { log?: (msg: string) => void; error?: (msg: string) => void }; +}): Promise { + const mode = await resolveApiMode(params.baseUrl, params.accountId); + + if (mode === "container") { + return streamContainerEvents({ + baseUrl: params.baseUrl, + account: params.account, + abortSignal: params.abortSignal, + onEvent: (event) => params.onEvent({ event: "receive", data: JSON.stringify(event) }), + logger: params.logger, + }); + } + + return nativeStreamEvents({ + baseUrl: params.baseUrl, + account: params.account, + abortSignal: params.abortSignal, + onEvent: (event) => params.onEvent(event), + }); +} + +/** + * Fetch attachment, routing to native or container implementation. + */ +export async function fetchAttachment(params: { + baseUrl: string; + account?: string; + accountId?: string; + attachmentId: string; + sender?: string; + groupId?: string; + timeoutMs?: number; +}): Promise { + const mode = await resolveApiMode(params.baseUrl, params.accountId); + if (mode === "container") { + return containerFetchAttachment(params.attachmentId, { + baseUrl: params.baseUrl, + timeoutMs: params.timeoutMs, + }); + } + + const rpcParams: Record = { + id: params.attachmentId, + }; + if (params.account) { + rpcParams.account = params.account; + } + if (params.groupId) { + rpcParams.groupId = params.groupId; + } else if (params.sender) { + rpcParams.recipient = params.sender; + } else { + return null; + } + const result = await nativeRpcRequest<{ data?: string }>("getAttachment", rpcParams, { + baseUrl: params.baseUrl, + timeoutMs: params.timeoutMs, + }); + if (!result?.data) { + return null; + } + return Buffer.from(result.data, "base64"); +} diff --git a/extensions/signal/src/client-container.test.ts b/extensions/signal/src/client-container.test.ts new file mode 100644 index 00000000000..b963ef98cdc --- /dev/null +++ b/extensions/signal/src/client-container.test.ts @@ -0,0 +1,638 @@ +import * as fetchModule from "openclaw/plugin-sdk/infra-runtime"; +import { describe, expect, it, vi, beforeEach } from "vitest"; +import { + containerCheck, + containerRestRequest, + containerSendMessage, + containerSendTyping, + containerSendReceipt, + containerFetchAttachment, + containerSendReaction, + containerRemoveReaction, +} from "./client-container.js"; + +// spyOn approach works with vitest forks pool for cross-directory imports +const mockFetch = vi.fn(); + +beforeEach(() => { + vi.spyOn(fetchModule, "resolveFetch").mockReturnValue(mockFetch as unknown as typeof fetch); +}); + +// Mock WebSocket (not testing streamContainerEvents in unit tests due to complexity) +vi.mock("ws", () => ({ + default: class MockWebSocket { + on() {} + close() {} + }, +})); + +describe("containerCheck", () => { + beforeEach(() => { + vi.clearAllMocks(); + }); + + it("returns ok:true when /v1/about returns 200", async () => { + mockFetch.mockResolvedValue({ + ok: true, + status: 200, + }); + + const result = await containerCheck("http://localhost:8080"); + expect(result).toEqual({ ok: true, status: 200, error: null }); + expect(mockFetch).toHaveBeenCalledWith( + "http://localhost:8080/v1/about", + expect.objectContaining({ method: "GET" }), + ); + }); + + it("returns ok:false when /v1/about returns 404", async () => { + mockFetch.mockResolvedValue({ + ok: false, + status: 404, + }); + + const result = await containerCheck("http://localhost:8080"); + expect(result).toEqual({ ok: false, status: 404, error: "HTTP 404" }); + }); + + it("returns ok:false with error message on fetch failure", async () => { + mockFetch.mockRejectedValue(new Error("Network error")); + + const result = await containerCheck("http://localhost:8080"); + expect(result).toEqual({ ok: false, status: null, error: "Network error" }); + }); + + it("normalizes base URL by removing trailing slash", async () => { + mockFetch.mockResolvedValue({ ok: true, status: 200 }); + + await containerCheck("http://localhost:8080/"); + expect(mockFetch).toHaveBeenCalledWith("http://localhost:8080/v1/about", expect.anything()); + }); + + it("adds http:// prefix when missing", async () => { + mockFetch.mockResolvedValue({ ok: true, status: 200 }); + + await containerCheck("localhost:8080"); + expect(mockFetch).toHaveBeenCalledWith("http://localhost:8080/v1/about", expect.anything()); + }); +}); + +describe("containerRestRequest", () => { + beforeEach(() => { + vi.clearAllMocks(); + }); + + it("makes GET request with correct endpoint", async () => { + mockFetch.mockResolvedValue({ + ok: true, + status: 200, + text: async () => JSON.stringify({ version: "1.0" }), + }); + + const result = await containerRestRequest("/v1/about", { baseUrl: "http://localhost:8080" }); + expect(result).toEqual({ version: "1.0" }); + expect(mockFetch).toHaveBeenCalledWith( + "http://localhost:8080/v1/about", + expect.objectContaining({ + method: "GET", + headers: { "Content-Type": "application/json" }, + }), + ); + }); + + it("makes POST request with body", async () => { + mockFetch.mockResolvedValue({ + ok: true, + status: 201, + }); + + await containerRestRequest("/v2/send", { baseUrl: "http://localhost:8080" }, "POST", { + message: "test", + number: "+1234567890", + recipients: ["+1234567890"], + }); + + expect(mockFetch).toHaveBeenCalledWith( + "http://localhost:8080/v2/send", + expect.objectContaining({ + method: "POST", + body: JSON.stringify({ + message: "test", + number: "+1234567890", + recipients: ["+1234567890"], + }), + }), + ); + }); + + it("returns undefined for 201 status", async () => { + mockFetch.mockResolvedValue({ + ok: true, + status: 201, + }); + + const result = await containerRestRequest( + "/v2/send", + { baseUrl: "http://localhost:8080" }, + "POST", + ); + expect(result).toBeUndefined(); + }); + + it("returns undefined for 204 status", async () => { + mockFetch.mockResolvedValue({ + ok: true, + status: 204, + }); + + const result = await containerRestRequest( + "/v1/typing-indicator/+1234567890", + { baseUrl: "http://localhost:8080" }, + "PUT", + ); + expect(result).toBeUndefined(); + }); + + it("throws error on non-ok response", async () => { + mockFetch.mockResolvedValue({ + ok: false, + status: 500, + statusText: "Internal Server Error", + text: async () => "Server error details", + }); + + await expect( + containerRestRequest("/v2/send", { baseUrl: "http://localhost:8080" }, "POST"), + ).rejects.toThrow("Signal REST 500: Server error details"); + }); + + it("handles empty response body", async () => { + mockFetch.mockResolvedValue({ + ok: true, + status: 200, + text: async () => "", + }); + + const result = await containerRestRequest("/v1/about", { baseUrl: "http://localhost:8080" }); + expect(result).toBeUndefined(); + }); + + it("respects custom timeout by using abort signal", async () => { + mockFetch.mockResolvedValue({ + ok: true, + status: 200, + text: async () => "{}", + }); + + await containerRestRequest("/v1/about", { baseUrl: "http://localhost:8080", timeoutMs: 5000 }); + + // The timeout is enforced via AbortController, so we verify the call was made with a signal + expect(mockFetch).toHaveBeenCalled(); + const callArgs = mockFetch.mock.calls[0]; + expect(callArgs[1].signal).toBeDefined(); + }); +}); + +describe("containerSendMessage", () => { + beforeEach(() => { + vi.clearAllMocks(); + }); + + it("sends message to recipients", async () => { + mockFetch.mockResolvedValue({ + ok: true, + status: 200, + text: async () => JSON.stringify({ timestamp: 1700000000000 }), + }); + + const result = await containerSendMessage({ + baseUrl: "http://localhost:8080", + account: "+14259798283", + recipients: ["+15550001111"], + message: "Hello world", + }); + + expect(result).toEqual({ timestamp: 1700000000000 }); + expect(mockFetch).toHaveBeenCalledWith( + "http://localhost:8080/v2/send", + expect.objectContaining({ + method: "POST", + body: JSON.stringify({ + message: "Hello world", + number: "+14259798283", + recipients: ["+15550001111"], + }), + }), + ); + }); + + it("includes text styles when provided", async () => { + mockFetch.mockResolvedValue({ + ok: true, + status: 200, + text: async () => JSON.stringify({}), + }); + + await containerSendMessage({ + baseUrl: "http://localhost:8080", + account: "+14259798283", + recipients: ["+15550001111"], + message: "Bold text", + textStyles: [{ start: 0, length: 4, style: "BOLD" }], + }); + + const callArgs = mockFetch.mock.calls[0]; + const body = JSON.parse(callArgs[1].body); + expect(body["text_style"]).toEqual(["0:4:BOLD"]); + }); + + it("includes attachments as base64 data URIs", async () => { + const fs = await import("node:fs/promises"); + const os = await import("node:os"); + const path = await import("node:path"); + + // Create a temp file with known content + const tmpDir = await fs.mkdtemp(path.join(os.tmpdir(), "signal-test-")); + const tmpFile = path.join(tmpDir, "test-image.jpg"); + const content = Buffer.from([0xff, 0xd8, 0xff, 0xe0]); // JPEG magic bytes + await fs.writeFile(tmpFile, content); + + mockFetch.mockResolvedValue({ + ok: true, + status: 200, + text: async () => JSON.stringify({}), + }); + + await containerSendMessage({ + baseUrl: "http://localhost:8080", + account: "+14259798283", + recipients: ["+15550001111"], + message: "Photo", + attachments: [tmpFile], + }); + + const callArgs = mockFetch.mock.calls[0]; + const body = JSON.parse(callArgs[1].body); + expect(body.attachments).toBeUndefined(); + expect(body.base64_attachments).toBeDefined(); + expect(body.base64_attachments).toHaveLength(1); + expect(body.base64_attachments[0]).toMatch( + /^data:image\/jpeg;filename=test-image\.jpg;base64,/, + ); + + // Cleanup + await fs.rm(tmpDir, { recursive: true }); + }); +}); + +describe("containerSendTyping", () => { + beforeEach(() => { + vi.clearAllMocks(); + }); + + it("sends typing indicator with PUT", async () => { + mockFetch.mockResolvedValue({ + ok: true, + status: 204, + }); + + const result = await containerSendTyping({ + baseUrl: "http://localhost:8080", + account: "+14259798283", + recipient: "+15550001111", + }); + + expect(result).toBe(true); + expect(mockFetch).toHaveBeenCalledWith( + "http://localhost:8080/v1/typing-indicator/%2B14259798283", + expect.objectContaining({ + method: "PUT", + body: JSON.stringify({ recipient: "+15550001111" }), + }), + ); + }); + + it("stops typing indicator with DELETE", async () => { + mockFetch.mockResolvedValue({ + ok: true, + status: 204, + }); + + await containerSendTyping({ + baseUrl: "http://localhost:8080", + account: "+14259798283", + recipient: "+15550001111", + stop: true, + }); + + expect(mockFetch).toHaveBeenCalledWith( + expect.anything(), + expect.objectContaining({ method: "DELETE" }), + ); + }); +}); + +describe("containerSendReceipt", () => { + beforeEach(() => { + vi.clearAllMocks(); + }); + + it("sends read receipt", async () => { + mockFetch.mockResolvedValue({ + ok: true, + status: 204, + }); + + const result = await containerSendReceipt({ + baseUrl: "http://localhost:8080", + account: "+14259798283", + recipient: "+15550001111", + timestamp: 1700000000000, + }); + + expect(result).toBe(true); + expect(mockFetch).toHaveBeenCalledWith( + "http://localhost:8080/v1/receipts/%2B14259798283", + expect.objectContaining({ + method: "POST", + body: JSON.stringify({ + recipient: "+15550001111", + timestamp: 1700000000000, + receipt_type: "read", + }), + }), + ); + }); + + it("sends viewed receipt when type specified", async () => { + mockFetch.mockResolvedValue({ + ok: true, + status: 204, + }); + + await containerSendReceipt({ + baseUrl: "http://localhost:8080", + account: "+14259798283", + recipient: "+15550001111", + timestamp: 1700000000000, + type: "viewed", + }); + + const callArgs = mockFetch.mock.calls[0]; + const body = JSON.parse(callArgs[1].body); + expect(body.receipt_type).toBe("viewed"); + }); +}); + +describe("containerFetchAttachment", () => { + beforeEach(() => { + vi.clearAllMocks(); + }); + + it("fetches attachment binary", async () => { + const binaryData = new Uint8Array([0x89, 0x50, 0x4e, 0x47]); // PNG header + mockFetch.mockResolvedValue({ + ok: true, + status: 200, + arrayBuffer: async () => binaryData.buffer, + }); + + const result = await containerFetchAttachment("attachment-123", { + baseUrl: "http://localhost:8080", + }); + + expect(result).toBeInstanceOf(Buffer); + expect(mockFetch).toHaveBeenCalledWith( + "http://localhost:8080/v1/attachments/attachment-123", + expect.objectContaining({ method: "GET" }), + ); + }); + + it("returns null on non-ok response", async () => { + mockFetch.mockResolvedValue({ + ok: false, + status: 404, + }); + + const result = await containerFetchAttachment("attachment-123", { + baseUrl: "http://localhost:8080", + }); + + expect(result).toBeNull(); + }); + + it("encodes attachment ID in URL", async () => { + mockFetch.mockResolvedValue({ + ok: true, + status: 200, + arrayBuffer: async () => new ArrayBuffer(0), + }); + + await containerFetchAttachment("path/with/slashes", { + baseUrl: "http://localhost:8080", + }); + + expect(mockFetch).toHaveBeenCalledWith( + "http://localhost:8080/v1/attachments/path%2Fwith%2Fslashes", + expect.anything(), + ); + }); +}); + +describe("normalizeBaseUrl edge cases", () => { + beforeEach(() => { + vi.clearAllMocks(); + }); + + it("throws error for empty base URL", async () => { + await expect(containerCheck("")).rejects.toThrow("Signal base URL is required"); + }); + + it("throws error for whitespace-only base URL", async () => { + await expect(containerCheck(" ")).rejects.toThrow("Signal base URL is required"); + }); + + it("handles https URLs", async () => { + mockFetch.mockResolvedValue({ ok: true, status: 200 }); + + await containerCheck("https://signal.example.com"); + expect(mockFetch).toHaveBeenCalledWith( + "https://signal.example.com/v1/about", + expect.anything(), + ); + }); + + it("handles URLs with ports", async () => { + mockFetch.mockResolvedValue({ ok: true, status: 200 }); + + await containerCheck("http://192.168.1.100:9922"); + expect(mockFetch).toHaveBeenCalledWith("http://192.168.1.100:9922/v1/about", expect.anything()); + }); +}); + +describe("containerRestRequest edge cases", () => { + beforeEach(() => { + vi.clearAllMocks(); + }); + + it("handles DELETE method", async () => { + mockFetch.mockResolvedValue({ + ok: true, + status: 204, + }); + + await containerRestRequest( + "/v1/some-resource/123", + { baseUrl: "http://localhost:8080" }, + "DELETE", + ); + + expect(mockFetch).toHaveBeenCalledWith( + "http://localhost:8080/v1/some-resource/123", + expect.objectContaining({ method: "DELETE" }), + ); + }); + + it("handles error response with empty body", async () => { + mockFetch.mockResolvedValue({ + ok: false, + status: 500, + statusText: "Internal Server Error", + text: async () => "", + }); + + await expect( + containerRestRequest("/v2/send", { baseUrl: "http://localhost:8080" }, "POST"), + ).rejects.toThrow("Signal REST 500: Internal Server Error"); + }); + + it("handles JSON parse errors gracefully", async () => { + mockFetch.mockResolvedValue({ + ok: true, + status: 200, + text: async () => "not-valid-json", + }); + + await expect( + containerRestRequest("/v1/about", { baseUrl: "http://localhost:8080" }), + ).rejects.toThrow(); + }); +}); + +describe("containerSendReaction", () => { + beforeEach(() => { + vi.clearAllMocks(); + }); + + it("sends reaction to recipient", async () => { + mockFetch.mockResolvedValue({ + ok: true, + status: 200, + text: async () => JSON.stringify({ timestamp: 1700000000000 }), + }); + + const result = await containerSendReaction({ + baseUrl: "http://localhost:8080", + account: "+14259798283", + recipient: "+15550001111", + emoji: "👍", + targetAuthor: "+15550001111", + targetTimestamp: 1699999999999, + }); + + expect(result).toEqual({ timestamp: 1700000000000 }); + expect(mockFetch).toHaveBeenCalledWith( + "http://localhost:8080/v1/reactions/%2B14259798283", + expect.objectContaining({ + method: "POST", + body: JSON.stringify({ + recipient: "+15550001111", + reaction: "👍", + target_author: "+15550001111", + timestamp: 1699999999999, + }), + }), + ); + }); + + it("includes group_id when provided", async () => { + mockFetch.mockResolvedValue({ + ok: true, + status: 200, + text: async () => JSON.stringify({}), + }); + + await containerSendReaction({ + baseUrl: "http://localhost:8080", + account: "+14259798283", + recipient: "+15550001111", + emoji: "❤️", + targetAuthor: "+15550001111", + targetTimestamp: 1699999999999, + groupId: "group-123", + }); + + const callArgs = mockFetch.mock.calls[0]; + const body = JSON.parse(callArgs[1].body); + expect(body.group_id).toBe("group-123"); + }); +}); + +describe("containerRemoveReaction", () => { + beforeEach(() => { + vi.clearAllMocks(); + }); + + it("removes reaction with DELETE", async () => { + mockFetch.mockResolvedValue({ + ok: true, + status: 200, + text: async () => JSON.stringify({ timestamp: 1700000000000 }), + }); + + const result = await containerRemoveReaction({ + baseUrl: "http://localhost:8080", + account: "+14259798283", + recipient: "+15550001111", + emoji: "👍", + targetAuthor: "+15550001111", + targetTimestamp: 1699999999999, + }); + + expect(result).toEqual({ timestamp: 1700000000000 }); + expect(mockFetch).toHaveBeenCalledWith( + "http://localhost:8080/v1/reactions/%2B14259798283", + expect.objectContaining({ + method: "DELETE", + body: JSON.stringify({ + recipient: "+15550001111", + reaction: "👍", + target_author: "+15550001111", + timestamp: 1699999999999, + }), + }), + ); + }); + + it("includes group_id when provided", async () => { + mockFetch.mockResolvedValue({ + ok: true, + status: 200, + text: async () => JSON.stringify({}), + }); + + await containerRemoveReaction({ + baseUrl: "http://localhost:8080", + account: "+14259798283", + recipient: "+15550001111", + emoji: "❤️", + targetAuthor: "+15550001111", + targetTimestamp: 1699999999999, + groupId: "group-123", + }); + + const callArgs = mockFetch.mock.calls[0]; + const body = JSON.parse(callArgs[1].body); + expect(body.group_id).toBe("group-123"); + }); +}); diff --git a/extensions/signal/src/client-container.ts b/extensions/signal/src/client-container.ts new file mode 100644 index 00000000000..8779a245299 --- /dev/null +++ b/extensions/signal/src/client-container.ts @@ -0,0 +1,544 @@ +/** + * Signal client for bbernhard/signal-cli-rest-api container. + * Uses WebSocket for receiving messages and REST API for sending. + * + * This is a separate implementation from client.ts (native signal-cli) + * to keep the two modes cleanly isolated. + */ + +import fs from "node:fs/promises"; +import nodePath from "node:path"; +import { resolveFetch } from "openclaw/plugin-sdk/infra-runtime"; +import { detectMime } from "openclaw/plugin-sdk/media-runtime"; +import WebSocket from "ws"; + +export type ContainerRpcOptions = { + baseUrl: string; + timeoutMs?: number; +}; + +export type ContainerWebSocketMessage = { + envelope?: { + syncMessage?: unknown; + dataMessage?: { + message?: string; + groupInfo?: { groupId?: string; groupName?: string }; + attachments?: Array<{ + id?: string; + contentType?: string; + filename?: string; + size?: number; + }>; + quote?: { text?: string }; + reaction?: unknown; + }; + editMessage?: { dataMessage?: unknown }; + reactionMessage?: unknown; + sourceNumber?: string; + sourceUuid?: string; + sourceName?: string; + timestamp?: number; + }; + exception?: { message?: string }; +}; + +const DEFAULT_TIMEOUT_MS = 10_000; + +function normalizeBaseUrl(url: string): string { + const trimmed = url.trim(); + if (!trimmed) { + throw new Error("Signal base URL is required"); + } + if (/^https?:\/\//i.test(trimmed)) { + return trimmed.replace(/\/+$/, ""); + } + return `http://${trimmed}`.replace(/\/+$/, ""); +} + +async function fetchWithTimeout(url: string, init: RequestInit, timeoutMs: number) { + const fetchImpl = resolveFetch(); + if (!fetchImpl) { + throw new Error("fetch is not available"); + } + const controller = new AbortController(); + const timer = setTimeout(() => controller.abort(), timeoutMs); + try { + return await fetchImpl(url, { ...init, signal: controller.signal }); + } finally { + clearTimeout(timer); + } +} + +/** + * Check if bbernhard container REST API is available. + */ +export async function containerCheck( + baseUrl: string, + timeoutMs = DEFAULT_TIMEOUT_MS, +): Promise<{ ok: boolean; status?: number | null; error?: string | null }> { + const normalized = normalizeBaseUrl(baseUrl); + try { + const res = await fetchWithTimeout(`${normalized}/v1/about`, { method: "GET" }, timeoutMs); + if (!res.ok) { + return { ok: false, status: res.status, error: `HTTP ${res.status}` }; + } + return { ok: true, status: res.status, error: null }; + } catch (err) { + return { + ok: false, + status: null, + error: err instanceof Error ? err.message : String(err), + }; + } +} + +/** + * Make a REST API request to bbernhard container. + */ +export async function containerRestRequest( + endpoint: string, + opts: ContainerRpcOptions, + method: "GET" | "POST" | "PUT" | "DELETE" = "GET", + body?: unknown, +): Promise { + const baseUrl = normalizeBaseUrl(opts.baseUrl); + const url = `${baseUrl}${endpoint}`; + + const init: RequestInit = { + method, + headers: { "Content-Type": "application/json" }, + }; + + if (body) { + init.body = JSON.stringify(body); + } + + const res = await fetchWithTimeout(url, init, opts.timeoutMs ?? DEFAULT_TIMEOUT_MS); + + if (res.status === 201 || res.status === 204) { + return undefined as T; + } + + if (!res.ok) { + const errorText = await res.text().catch(() => ""); + throw new Error(`Signal REST ${res.status}: ${errorText || res.statusText}`); + } + + const text = await res.text(); + if (!text) { + return undefined as T; + } + + return JSON.parse(text) as T; +} + +/** + * Fetch attachment binary from bbernhard container. + */ +export async function containerFetchAttachment( + attachmentId: string, + opts: ContainerRpcOptions, +): Promise { + const baseUrl = normalizeBaseUrl(opts.baseUrl); + const url = `${baseUrl}/v1/attachments/${encodeURIComponent(attachmentId)}`; + + const res = await fetchWithTimeout(url, { method: "GET" }, opts.timeoutMs ?? DEFAULT_TIMEOUT_MS); + + if (!res.ok) { + return null; + } + + const arrayBuffer = await res.arrayBuffer(); + return Buffer.from(arrayBuffer); +} + +/** + * Stream messages using WebSocket from bbernhard container. + * The Promise resolves when the connection closes (for any reason). + * The caller (runSignalLoopAdapter) is responsible for reconnection. + */ +export async function streamContainerEvents(params: { + baseUrl: string; + account?: string; + abortSignal?: AbortSignal; + onEvent: (event: ContainerWebSocketMessage) => void; + logger?: { log?: (msg: string) => void; error?: (msg: string) => void }; +}): Promise { + const normalized = normalizeBaseUrl(params.baseUrl); + const wsUrl = `${normalized.replace(/^http/, "ws")}/v1/receive/${encodeURIComponent(params.account ?? "")}`; + const log = params.logger?.log ?? (() => {}); + const logError = params.logger?.error ?? (() => {}); + + log(`[signal-ws] connecting to ${wsUrl}`); + + return new Promise((resolve, reject) => { + let ws: WebSocket; + let resolved = false; + + const cleanup = () => { + if (resolved) { + return; + } + resolved = true; + }; + + try { + ws = new WebSocket(wsUrl); + } catch (err) { + logError( + `[signal-ws] failed to create WebSocket: ${err instanceof Error ? err.message : String(err)}`, + ); + reject(err); + return; + } + + ws.on("open", () => { + log("[signal-ws] connected"); + }); + + ws.on("message", (data: Buffer) => { + try { + const text = data.toString(); + log(`[signal-ws] received: ${text.slice(0, 200)}${text.length > 200 ? "..." : ""}`); + const envelope = JSON.parse(text) as ContainerWebSocketMessage; + if (envelope) { + params.onEvent(envelope); + } + } catch (err) { + logError(`[signal-ws] parse error: ${err instanceof Error ? err.message : String(err)}`); + } + }); + + ws.on("error", (err) => { + logError(`[signal-ws] error: ${err instanceof Error ? err.message : String(err)}`); + // Don't resolve here - the close event will fire next + }); + + ws.on("close", (code, reason) => { + const reasonStr = reason?.toString() || "no reason"; + log(`[signal-ws] closed (code=${code}, reason=${reasonStr})`); + cleanup(); + resolve(); // Let the outer loop handle reconnection + }); + + ws.on("ping", () => { + log("[signal-ws] ping received"); + }); + + ws.on("pong", () => { + log("[signal-ws] pong received"); + }); + + params.abortSignal?.addEventListener( + "abort", + () => { + log("[signal-ws] aborted, closing connection"); + cleanup(); + ws.close(); + resolve(); + }, + { once: true }, + ); + }); +} + +/** + * Convert local file paths to base64 data URIs for the container REST API. + * The bbernhard container /v2/send only accepts `base64_attachments` (not file paths). + */ +async function filesToBase64DataUris(filePaths: string[]): Promise { + const results: string[] = []; + for (const filePath of filePaths) { + const buffer = await fs.readFile(filePath); + const mime = (await detectMime({ buffer, filePath })) ?? "application/octet-stream"; + const filename = nodePath.basename(filePath); + const b64 = buffer.toString("base64"); + results.push(`data:${mime};filename=${filename};base64,${b64}`); + } + return results; +} + +/** + * Send message via bbernhard container REST API. + */ +export async function containerSendMessage(params: { + baseUrl: string; + account: string; + recipients: string[]; + message: string; + textStyles?: Array<{ start: number; length: number; style: string }>; + attachments?: string[]; + timeoutMs?: number; +}): Promise<{ timestamp?: number }> { + const payload: Record = { + message: params.message, + number: params.account, + recipients: params.recipients, + }; + + if (params.textStyles && params.textStyles.length > 0) { + payload["text_style"] = params.textStyles.map( + (style) => `${style.start}:${style.length}:${style.style}`, + ); + } + + if (params.attachments && params.attachments.length > 0) { + // Container API only accepts base64-encoded attachments, not file paths. + payload.base64_attachments = await filesToBase64DataUris(params.attachments); + } + + const result = await containerRestRequest<{ timestamp?: number }>( + "/v2/send", + { baseUrl: params.baseUrl, timeoutMs: params.timeoutMs }, + "POST", + payload, + ); + + return result ?? {}; +} + +/** + * Send typing indicator via bbernhard container REST API. + */ +export async function containerSendTyping(params: { + baseUrl: string; + account: string; + recipient: string; + stop?: boolean; + timeoutMs?: number; +}): Promise { + const method = params.stop ? "DELETE" : "PUT"; + await containerRestRequest( + `/v1/typing-indicator/${encodeURIComponent(params.account)}`, + { baseUrl: params.baseUrl, timeoutMs: params.timeoutMs }, + method, + { recipient: params.recipient }, + ); + return true; +} + +/** + * Send read receipt via bbernhard container REST API. + */ +export async function containerSendReceipt(params: { + baseUrl: string; + account: string; + recipient: string; + timestamp: number; + type?: "read" | "viewed"; + timeoutMs?: number; +}): Promise { + await containerRestRequest( + `/v1/receipts/${encodeURIComponent(params.account)}`, + { baseUrl: params.baseUrl, timeoutMs: params.timeoutMs }, + "POST", + { + recipient: params.recipient, + timestamp: params.timestamp, + receipt_type: params.type ?? "read", + }, + ); + return true; +} + +/** + * Send a reaction to a message via bbernhard container REST API. + */ +export async function containerSendReaction(params: { + baseUrl: string; + account: string; + recipient: string; + emoji: string; + targetAuthor: string; + targetTimestamp: number; + groupId?: string; + timeoutMs?: number; +}): Promise<{ timestamp?: number }> { + const payload: Record = { + recipient: params.recipient, + reaction: params.emoji, + target_author: params.targetAuthor, + timestamp: params.targetTimestamp, + }; + + if (params.groupId) { + payload.group_id = params.groupId; + } + + const result = await containerRestRequest<{ timestamp?: number }>( + `/v1/reactions/${encodeURIComponent(params.account)}`, + { baseUrl: params.baseUrl, timeoutMs: params.timeoutMs }, + "POST", + payload, + ); + + return result ?? {}; +} + +/** + * Remove a reaction from a message via bbernhard container REST API. + */ +export async function containerRemoveReaction(params: { + baseUrl: string; + account: string; + recipient: string; + emoji: string; + targetAuthor: string; + targetTimestamp: number; + groupId?: string; + timeoutMs?: number; +}): Promise<{ timestamp?: number }> { + const payload: Record = { + recipient: params.recipient, + reaction: params.emoji, + target_author: params.targetAuthor, + timestamp: params.targetTimestamp, + }; + + if (params.groupId) { + payload.group_id = params.groupId; + } + + const result = await containerRestRequest<{ timestamp?: number }>( + `/v1/reactions/${encodeURIComponent(params.account)}`, + { baseUrl: params.baseUrl, timeoutMs: params.timeoutMs }, + "DELETE", + payload, + ); + + return result ?? {}; +} + +/** + * Strip the "uuid:" prefix that native signal-cli accepts but the container API rejects. + */ +function stripUuidPrefix(id: string): string { + return id.startsWith("uuid:") ? id.slice(5) : id; +} + +/** + * Convert a group internal_id to the container-expected format. + * The bbernhard container expects groups as "group.{base64(internal_id)}". + */ +function formatGroupIdForContainer(groupId: string): string { + if (groupId.startsWith("group.")) { + return groupId; + } + return `group.${Buffer.from(groupId).toString("base64")}`; +} + +/** + * Drop-in replacement for native signalRpcRequest that translates + * JSON-RPC method + params into the equivalent container REST API calls. + * This keeps all container protocol details (uuid: stripping, group ID + * formatting, base64 attachments, text-style conversion) isolated here. + */ +export async function containerRpcRequest( + method: string, + params: Record | undefined, + opts: ContainerRpcOptions, +): Promise { + const p = params ?? {}; + switch (method) { + case "send": { + const recipients = ((p.recipient as string[] | undefined) ?? []).map(stripUuidPrefix); + const usernames = ((p.username as string[] | undefined) ?? []).map(stripUuidPrefix); + const groupId = p.groupId as string | undefined; + const formattedGroupId = groupId ? formatGroupIdForContainer(groupId) : undefined; + const finalRecipients = + recipients.length > 0 + ? recipients + : usernames.length > 0 + ? usernames + : formattedGroupId + ? [formattedGroupId] + : []; + + const textStylesRaw = p["text-style"] as string[] | undefined; + const textStyles = textStylesRaw?.map((s) => { + const [start, length, style] = s.split(":"); + return { start: Number(start), length: Number(length), style }; + }); + + const result = await containerSendMessage({ + baseUrl: opts.baseUrl, + account: (p.account as string) ?? "", + recipients: finalRecipients, + message: (p.message as string) ?? "", + textStyles, + attachments: p.attachments as string[] | undefined, + timeoutMs: opts.timeoutMs, + }); + return result as T; + } + + case "sendTyping": { + const recipient = stripUuidPrefix( + (p.recipient as string[] | undefined)?.[0] ?? (p.groupId as string | undefined) ?? "", + ); + await containerSendTyping({ + baseUrl: opts.baseUrl, + account: (p.account as string) ?? "", + recipient, + stop: p.stop as boolean | undefined, + timeoutMs: opts.timeoutMs, + }); + return undefined as T; + } + + case "sendReceipt": { + const recipient = stripUuidPrefix((p.recipient as string[] | undefined)?.[0] ?? ""); + await containerSendReceipt({ + baseUrl: opts.baseUrl, + account: (p.account as string) ?? "", + recipient, + timestamp: p.targetTimestamp as number, + type: p.type as "read" | "viewed" | undefined, + timeoutMs: opts.timeoutMs, + }); + return undefined as T; + } + + case "sendReaction": { + const recipient = stripUuidPrefix((p.recipients as string[] | undefined)?.[0] ?? ""); + const groupId = (p.groupIds as string[] | undefined)?.[0] ?? undefined; + const formattedGroupId = groupId ? formatGroupIdForContainer(groupId) : undefined; + const reactionParams = { + baseUrl: opts.baseUrl, + account: (p.account as string) ?? "", + recipient, + emoji: (p.emoji as string) ?? "", + targetAuthor: stripUuidPrefix((p.targetAuthor as string) ?? recipient), + targetTimestamp: p.targetTimestamp as number, + groupId: formattedGroupId, + timeoutMs: opts.timeoutMs, + }; + const fn = p.remove ? containerRemoveReaction : containerSendReaction; + return (await fn(reactionParams)) as T; + } + + case "getAttachment": { + const attachmentId = p.id as string; + const buffer = await containerFetchAttachment(attachmentId, { + baseUrl: opts.baseUrl, + timeoutMs: opts.timeoutMs, + }); + // Convert to native format: { data: base64String } + if (!buffer) { + return { data: undefined } as T; + } + return { data: buffer.toString("base64") } as T; + } + + case "version": { + const result = await containerRestRequest<{ versions?: string[]; build?: number }>( + "/v1/about", + { baseUrl: opts.baseUrl, timeoutMs: opts.timeoutMs }, + ); + return result as T; + } + + default: + throw new Error(`Unsupported container RPC method: ${method}`); + } +} diff --git a/extensions/signal/src/monitor.tool-result.test-harness.ts b/extensions/signal/src/monitor.tool-result.test-harness.ts index 364b86c5bdf..5da0fb64d47 100644 --- a/extensions/signal/src/monitor.tool-result.test-harness.ts +++ b/extensions/signal/src/monitor.tool-result.test-harness.ts @@ -150,8 +150,14 @@ vi.mock("./client.js", () => ({ signalRpcRequest: (...args: unknown[]) => signalRpcRequestMock(...args), })); -vi.mock("./daemon.js", async () => { - const actual = await vi.importActual("./daemon.js"); +vi.mock("./client-adapter.js", () => ({ + streamSignalEvents: (...args: unknown[]) => streamMock(...args), + signalCheck: (...args: unknown[]) => signalCheckMock(...args), + signalRpcRequest: (...args: unknown[]) => signalRpcRequestMock(...args), +})); + +vi.mock("./daemon.js", async (importOriginal) => { + const actual = await importOriginal(); return { ...actual, spawnSignalDaemon: (...args: unknown[]) => spawnSignalDaemonMock(...args), diff --git a/extensions/signal/src/monitor.ts b/extensions/signal/src/monitor.ts index 9aa32731b1d..9251adb0163 100644 --- a/extensions/signal/src/monitor.ts +++ b/extensions/signal/src/monitor.ts @@ -24,7 +24,7 @@ import { createNonExitingRuntime, type RuntimeEnv } from "openclaw/plugin-sdk/ru import { normalizeStringEntries } from "openclaw/plugin-sdk/text-runtime"; import { normalizeE164 } from "openclaw/plugin-sdk/text-runtime"; import { resolveSignalAccount } from "./accounts.js"; -import { signalCheck, signalRpcRequest } from "./client.js"; +import { signalRpcRequest, signalCheck } from "./client-adapter.js"; import { formatSignalDaemonExit, spawnSignalDaemon, type SignalDaemonHandle } from "./daemon.js"; import { isSignalSenderAllowed, type resolveSignalSender } from "./identity.js"; import { createSignalEventHandler } from "./monitor/event-handler.js"; @@ -380,6 +380,7 @@ export async function monitorSignalProvider(opts: MonitorSignalOpts = {}): Promi const waitForTransportReadyFn = opts.waitForTransportReady ?? waitForTransportReady; const autoStart = opts.autoStart ?? accountInfo.config.autoStart ?? !accountInfo.config.httpUrl; + const configuredApiMode = cfg.channels?.signal?.apiMode ?? "auto"; const startupTimeoutMs = Math.min( 120_000, Math.max(1_000, opts.startupTimeoutMs ?? accountInfo.config.startupTimeoutMs ?? 30_000), @@ -388,6 +389,12 @@ export async function monitorSignalProvider(opts: MonitorSignalOpts = {}): Promi const daemonLifecycle = createSignalDaemonLifecycle({ abortSignal: opts.abortSignal }); let daemonHandle: SignalDaemonHandle | null = null; + if (autoStart && configuredApiMode === "container") { + throw new Error( + "channels.signal.autoStart=true is incompatible with channels.signal.apiMode=container", + ); + } + if (autoStart) { const cliPath = opts.cliPath ?? accountInfo.config.cliPath ?? "signal-cli"; const httpHost = opts.httpHost ?? accountInfo.config.httpHost ?? "127.0.0.1"; diff --git a/extensions/signal/src/probe.test.ts b/extensions/signal/src/probe.test.ts index 30816129107..4486c758ba3 100644 --- a/extensions/signal/src/probe.test.ts +++ b/extensions/signal/src/probe.test.ts @@ -1,5 +1,5 @@ import { beforeEach, describe, expect, it, vi } from "vitest"; -import * as clientModule from "./client.js"; +import * as clientAdapterModule from "./client-adapter.js"; import { classifySignalCliLogLine } from "./daemon.js"; import { probeSignal } from "./probe.js"; @@ -9,12 +9,12 @@ describe("probeSignal", () => { }); it("extracts version from {version} result", async () => { - vi.spyOn(clientModule, "signalCheck").mockResolvedValueOnce({ + vi.spyOn(clientAdapterModule, "signalCheck").mockResolvedValueOnce({ ok: true, status: 200, error: null, }); - vi.spyOn(clientModule, "signalRpcRequest").mockResolvedValueOnce({ version: "0.13.22" }); + vi.spyOn(clientAdapterModule, "signalRpcRequest").mockResolvedValueOnce({ version: "0.13.22" }); const res = await probeSignal("http://127.0.0.1:8080", 1000); @@ -24,7 +24,7 @@ describe("probeSignal", () => { }); it("returns ok=false when /check fails", async () => { - vi.spyOn(clientModule, "signalCheck").mockResolvedValueOnce({ + vi.spyOn(clientAdapterModule, "signalCheck").mockResolvedValueOnce({ ok: false, status: 503, error: "HTTP 503", diff --git a/extensions/signal/src/probe.ts b/extensions/signal/src/probe.ts index 4fd26f12355..36684961501 100644 --- a/extensions/signal/src/probe.ts +++ b/extensions/signal/src/probe.ts @@ -1,5 +1,5 @@ import type { BaseProbeResult } from "openclaw/plugin-sdk/channel-contract"; -import { signalCheck, signalRpcRequest } from "./client.js"; +import { signalCheck, signalRpcRequest } from "./client-adapter.js"; export type SignalProbe = BaseProbeResult & { status?: number | null; diff --git a/extensions/signal/src/send-reactions.test.ts b/extensions/signal/src/send-reactions.test.ts index 698d836df0e..074f7c4d19b 100644 --- a/extensions/signal/src/send-reactions.test.ts +++ b/extensions/signal/src/send-reactions.test.ts @@ -20,7 +20,7 @@ vi.mock("./accounts.js", () => ({ }), })); -vi.mock("./client.js", () => ({ +vi.mock("./client-adapter.js", () => ({ signalRpcRequest: (...args: unknown[]) => rpcMock(...args), })); diff --git a/extensions/signal/src/send-reactions.ts b/extensions/signal/src/send-reactions.ts index 6b8c3791b2d..031fe9003d4 100644 --- a/extensions/signal/src/send-reactions.ts +++ b/extensions/signal/src/send-reactions.ts @@ -5,7 +5,7 @@ import { loadConfig } from "openclaw/plugin-sdk/config-runtime"; import type { OpenClawConfig } from "openclaw/plugin-sdk/config-runtime"; import { resolveSignalAccount } from "./accounts.js"; -import { signalRpcRequest } from "./client.js"; +import { signalRpcRequest } from "./client-adapter.js"; import { resolveSignalRpcContext } from "./rpc-context.js"; export type SignalReactionOpts = { diff --git a/extensions/signal/src/send.ts b/extensions/signal/src/send.ts index c102624836e..08a368a6c43 100644 --- a/extensions/signal/src/send.ts +++ b/extensions/signal/src/send.ts @@ -3,7 +3,7 @@ import { resolveMarkdownTableMode } from "openclaw/plugin-sdk/config-runtime"; import { kindFromMime } from "openclaw/plugin-sdk/media-runtime"; import { resolveOutboundAttachmentFromUrl } from "openclaw/plugin-sdk/media-runtime"; import { resolveSignalAccount } from "./accounts.js"; -import { signalRpcRequest } from "./client.js"; +import { signalRpcRequest } from "./client-adapter.js"; import { markdownToSignalText, type SignalTextStyleRange } from "./format.js"; import { resolveSignalRpcContext } from "./rpc-context.js"; diff --git a/extensions/signal/src/sse-reconnect.ts b/extensions/signal/src/sse-reconnect.ts index f825a211afb..cc38909dda9 100644 --- a/extensions/signal/src/sse-reconnect.ts +++ b/extensions/signal/src/sse-reconnect.ts @@ -2,7 +2,7 @@ import type { BackoffPolicy } from "openclaw/plugin-sdk/infra-runtime"; import { computeBackoff, sleepWithAbort } from "openclaw/plugin-sdk/infra-runtime"; import { logVerbose, shouldLogVerbose } from "openclaw/plugin-sdk/runtime-env"; import type { RuntimeEnv } from "openclaw/plugin-sdk/runtime-env"; -import { type SignalSseEvent, streamSignalEvents } from "./client.js"; +import { type SignalAdapterEvent, streamSignalEvents } from "./client-adapter.js"; const DEFAULT_RECONNECT_POLICY: BackoffPolicy = { initialMs: 1_000, @@ -16,7 +16,7 @@ type RunSignalSseLoopParams = { account?: string; abortSignal?: AbortSignal; runtime: RuntimeEnv; - onEvent: (event: SignalSseEvent) => void; + onEvent: (event: SignalAdapterEvent) => void; policy?: Partial; }; @@ -47,26 +47,30 @@ export async function runSignalSseLoop({ baseUrl, account, abortSignal, - onEvent: (event) => { + onEvent: (event: SignalAdapterEvent) => { reconnectAttempts = 0; onEvent(event); }, + logger: { + log: runtime.log, + error: runtime.error, + }, }); if (abortSignal?.aborted) { return; } reconnectAttempts += 1; const delayMs = computeBackoff(reconnectPolicy, reconnectAttempts); - logReconnectVerbose(`Signal SSE stream ended, reconnecting in ${delayMs / 1000}s...`); + logReconnectVerbose(`Signal stream ended, reconnecting in ${delayMs / 1000}s...`); await sleepWithAbort(delayMs, abortSignal); } catch (err) { if (abortSignal?.aborted) { return; } - runtime.error?.(`Signal SSE stream error: ${String(err)}`); + runtime.error?.(`Signal stream error: ${String(err)}`); reconnectAttempts += 1; const delayMs = computeBackoff(reconnectPolicy, reconnectAttempts); - runtime.log?.(`Signal SSE connection lost, reconnecting in ${delayMs / 1000}s...`); + runtime.log?.(`Signal connection lost, reconnecting in ${delayMs / 1000}s...`); try { await sleepWithAbort(delayMs, abortSignal); } catch (sleepErr) { diff --git a/src/config/types.signal.ts b/src/config/types.signal.ts index bd33a64cf51..120debc02e2 100644 --- a/src/config/types.signal.ts +++ b/src/config/types.signal.ts @@ -3,6 +3,7 @@ import type { GroupToolPolicyBySenderConfig, GroupToolPolicyConfig } from "./typ export type SignalReactionNotificationMode = "off" | "own" | "all" | "allowlist"; export type SignalReactionLevel = "off" | "ack" | "minimal" | "extensive"; +export type SignalApiMode = "auto" | "native" | "container"; export type SignalGroupConfig = { requireMention?: boolean; @@ -55,6 +56,14 @@ export type SignalAccountConfig = CommonChannelMessagingConfig & { }; export type SignalConfig = { + /** + * Signal API mode (channel-global): + * - "auto" (default): Auto-detect based on available endpoints + * - "native": Use native signal-cli with JSON-RPC + SSE (/api/v1/rpc, /api/v1/events) + * - "container": Use bbernhard/signal-cli-rest-api with REST + WebSocket (/v2/send, /v1/receive/{account}). + * Requires the container to run with MODE=json-rpc for real-time message receiving. + */ + apiMode?: SignalApiMode; /** Optional per-account Signal configuration (multi-account). */ accounts?: Record; /** Optional default account id when multiple accounts are configured. */