Signal: add container REST API support with unified adapter

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
This commit is contained in:
OpenClaw Bot 2026-03-18 05:54:54 +00:00
parent 9fb78453e0
commit 24e394534e
15 changed files with 1902 additions and 21 deletions

View File

@ -0,0 +1 @@
- Signal/container mode: add REST API support for bbernhard/signal-cli-rest-api containerized deployments via a unified adapter layer, with automatic mode detection and `channels.signal.apiMode` config. (#10240) Thanks @Hua688.

View File

@ -1,5 +1,5 @@
---
summary: "Signal support via signal-cli (JSON-RPC + SSE), setup paths, and number model"
summary: "Signal support via signal-cli (native daemon or bbernhard container), setup paths, and number model"
read_when:
- Setting up Signal support
- Debugging Signal send/receive
@ -8,12 +8,14 @@ title: "Signal"
# Signal (signal-cli)
Status: external CLI integration. Gateway talks to `signal-cli` over HTTP JSON-RPC + SSE.
Status: external CLI integration. Gateway talks to `signal-cli` over HTTP — either native daemon (JSON-RPC + SSE) or bbernhard/signal-cli-rest-api container (REST + WebSocket).
## Prerequisites
- OpenClaw installed on your server (Linux flow below tested on Ubuntu 24).
- `signal-cli` available on the host where the gateway runs.
- One of:
- `signal-cli` available on the host (native mode), **or**
- `bbernhard/signal-cli-rest-api` Docker container (container mode).
- A phone number that can receive one verification SMS (for SMS registration path).
- Browser access for Signal captcha (`signalcaptchas.org`) during registration.
@ -179,6 +181,54 @@ If you want to manage `signal-cli` yourself (slow JVM cold starts, container ini
This skips auto-spawn and the startup wait inside OpenClaw. For slow starts when auto-spawning, set `channels.signal.startupTimeoutMs`.
## Container mode (bbernhard/signal-cli-rest-api)
Instead of running `signal-cli` natively, you can use the [bbernhard/signal-cli-rest-api](https://github.com/bbernhard/signal-cli-rest-api) Docker container. This wraps `signal-cli` behind a REST API and WebSocket interface.
Requirements:
- The container **must** run with `MODE=json-rpc` for real-time message receiving.
- Register or link your Signal account inside the container before connecting OpenClaw.
Example `docker-compose.yml` service:
```yaml
signal-cli:
image: bbernhard/signal-cli-rest-api:latest
environment:
MODE: json-rpc
ports:
- "8080:8080"
volumes:
- signal-cli-data:/home/.local/share/signal-cli
```
OpenClaw config:
```json5
{
channels: {
signal: {
enabled: true,
account: "+15551234567",
httpUrl: "http://signal-cli:8080",
autoStart: false,
apiMode: "container", // or "auto" to detect automatically
},
},
}
```
The `apiMode` field controls which protocol OpenClaw uses:
| Value | Behavior |
| ------------- | ------------------------------------------------------------------------------------ |
| `"auto"` | (Default) Probes both endpoints and picks whichever responds first |
| `"native"` | Force native signal-cli (JSON-RPC at `/api/v1/rpc`, SSE at `/api/v1/events`) |
| `"container"` | Force bbernhard container (REST at `/v2/send`, WebSocket at `/v1/receive/{account}`) |
When `apiMode` is `"auto"`, OpenClaw caches the detected mode for 30 seconds to avoid repeated probes.
## Access control (DMs + groups)
DMs:
@ -201,7 +251,8 @@ Groups:
## How it works (behavior)
- `signal-cli` runs as a daemon; the gateway reads events via SSE.
- Native mode: `signal-cli` runs as a daemon; the gateway reads events via SSE.
- Container mode: the gateway sends via REST API and receives via WebSocket.
- Inbound messages are normalized into the shared channel envelope.
- Replies always route back to the same number or group.
@ -300,6 +351,7 @@ Full configuration: [Configuration](/gateway/configuration)
Provider options:
- `channels.signal.enabled`: enable/disable channel startup.
- `channels.signal.apiMode`: `auto | native | container` (default: auto). See [Container mode](#container-mode-bbernhardsignal-cli-rest-api).
- `channels.signal.account`: E.164 for the bot account.
- `channels.signal.cliPath`: path to `signal-cli`.
- `channels.signal.httpUrl`: full daemon URL (overrides host/port).

View File

@ -0,0 +1,433 @@
import * as configModule from "openclaw/plugin-sdk/config-runtime";
import { describe, expect, it, vi, beforeEach } from "vitest";
import {
signalRpcRequest,
detectSignalApiMode,
signalCheck,
streamSignalEvents,
fetchAttachment,
} from "./client-adapter.js";
import * as containerClientModule from "./client-container.js";
import * as nativeClientModule from "./client.js";
const mockNativeCheck = vi.fn();
const mockNativeRpcRequest = vi.fn();
const mockNativeStreamEvents = vi.fn();
const mockContainerCheck = vi.fn();
const mockContainerRpcRequest = vi.fn();
const mockContainerFetchAttachment = vi.fn();
const mockStreamContainerEvents = vi.fn();
const mockLoadConfig = vi.fn(() => ({}));
beforeEach(() => {
vi.spyOn(nativeClientModule, "signalCheck").mockImplementation(mockNativeCheck as any);
vi.spyOn(nativeClientModule, "signalRpcRequest").mockImplementation(mockNativeRpcRequest as any);
vi.spyOn(nativeClientModule, "streamSignalEvents").mockImplementation(
mockNativeStreamEvents as any,
);
vi.spyOn(containerClientModule, "containerCheck").mockImplementation(mockContainerCheck as any);
vi.spyOn(containerClientModule, "containerRpcRequest").mockImplementation(
mockContainerRpcRequest as any,
);
vi.spyOn(containerClientModule, "containerFetchAttachment").mockImplementation(
mockContainerFetchAttachment as any,
);
vi.spyOn(containerClientModule, "streamContainerEvents").mockImplementation(
mockStreamContainerEvents as any,
);
vi.spyOn(configModule, "loadConfig").mockImplementation(mockLoadConfig as any);
});
function setApiMode(mode: "native" | "container" | "auto") {
mockLoadConfig.mockReturnValue({
channels: {
signal: {
apiMode: mode,
},
},
});
}
describe("detectSignalApiMode", () => {
beforeEach(() => {
vi.clearAllMocks();
setApiMode("native");
});
it("returns native when native endpoint responds", async () => {
mockNativeCheck.mockResolvedValue({ ok: true, status: 200 });
mockContainerCheck.mockResolvedValue({ ok: false, status: 404 });
const result = await detectSignalApiMode("http://localhost:8080");
expect(result).toBe("native");
});
it("returns container when only container endpoint responds", async () => {
mockNativeCheck.mockResolvedValue({ ok: false, status: 404 });
mockContainerCheck.mockResolvedValue({ ok: true, status: 200 });
const result = await detectSignalApiMode("http://localhost:8080");
expect(result).toBe("container");
});
it("prefers native when both endpoints respond", async () => {
mockNativeCheck.mockResolvedValue({ ok: true, status: 200 });
mockContainerCheck.mockResolvedValue({ ok: true, status: 200 });
const result = await detectSignalApiMode("http://localhost:8080");
expect(result).toBe("native");
});
it("throws error when neither endpoint responds", async () => {
mockNativeCheck.mockResolvedValue({ ok: false, status: null, error: "Connection refused" });
mockContainerCheck.mockResolvedValue({ ok: false, status: null, error: "Connection refused" });
await expect(detectSignalApiMode("http://localhost:8080")).rejects.toThrow(
"Signal API not reachable at http://localhost:8080",
);
});
it("handles exceptions from check functions", async () => {
mockNativeCheck.mockRejectedValue(new Error("Network error"));
mockContainerCheck.mockRejectedValue(new Error("Network error"));
await expect(detectSignalApiMode("http://localhost:8080")).rejects.toThrow(
"Signal API not reachable",
);
});
it("respects timeout parameter", async () => {
mockNativeCheck.mockResolvedValue({ ok: true, status: 200 });
mockContainerCheck.mockResolvedValue({ ok: false });
await detectSignalApiMode("http://localhost:8080", 5000);
expect(mockNativeCheck).toHaveBeenCalledWith("http://localhost:8080", 5000);
expect(mockContainerCheck).toHaveBeenCalledWith("http://localhost:8080", 5000);
});
});
describe("signalRpcRequest", () => {
beforeEach(() => {
vi.clearAllMocks();
setApiMode("native");
});
it("routes to native JSON-RPC for native mode", async () => {
mockNativeRpcRequest.mockResolvedValue({ timestamp: 1700000000000 });
const result = await signalRpcRequest(
"send",
{ message: "Hello", account: "+14259798283", recipient: ["+15550001111"] },
{ baseUrl: "http://localhost:8080" },
);
expect(result).toEqual({ timestamp: 1700000000000 });
expect(mockNativeRpcRequest).toHaveBeenCalledWith(
"send",
expect.objectContaining({ message: "Hello" }),
expect.objectContaining({ baseUrl: "http://localhost:8080" }),
);
expect(mockContainerRpcRequest).not.toHaveBeenCalled();
});
it("routes to container RPC for container mode", async () => {
setApiMode("container");
mockContainerRpcRequest.mockResolvedValue({ timestamp: 1700000000000 });
const result = await signalRpcRequest(
"send",
{ message: "Hello", account: "+14259798283", recipient: ["+15550001111"] },
{ baseUrl: "http://localhost:8080" },
);
expect(result).toEqual({ timestamp: 1700000000000 });
expect(mockContainerRpcRequest).toHaveBeenCalledWith(
"send",
expect.objectContaining({ message: "Hello" }),
expect.objectContaining({ baseUrl: "http://localhost:8080" }),
);
expect(mockNativeRpcRequest).not.toHaveBeenCalled();
});
it("passes all RPC methods through to native", async () => {
mockNativeRpcRequest.mockResolvedValue({});
await signalRpcRequest(
"sendTyping",
{ account: "+1", recipient: ["+2"] },
{ baseUrl: "http://localhost:8080" },
);
expect(mockNativeRpcRequest).toHaveBeenCalledWith(
"sendTyping",
expect.anything(),
expect.anything(),
);
});
it("passes all RPC methods through to container", async () => {
setApiMode("container");
mockContainerRpcRequest.mockResolvedValue({});
await signalRpcRequest(
"sendReceipt",
{ account: "+1", recipient: ["+2"] },
{ baseUrl: "http://localhost:8080" },
);
expect(mockContainerRpcRequest).toHaveBeenCalledWith(
"sendReceipt",
expect.anything(),
expect.anything(),
);
});
});
describe("signalCheck", () => {
beforeEach(() => {
vi.clearAllMocks();
setApiMode("native");
});
it("uses native check for native mode", async () => {
mockNativeCheck.mockResolvedValue({ ok: true, status: 200 });
const result = await signalCheck("http://localhost:8080");
expect(result).toEqual({ ok: true, status: 200 });
expect(mockNativeCheck).toHaveBeenCalledWith("http://localhost:8080", 10000);
expect(mockContainerCheck).not.toHaveBeenCalled();
});
it("uses container check for container mode", async () => {
setApiMode("container");
mockContainerCheck.mockResolvedValue({ ok: true, status: 200 });
const result = await signalCheck("http://localhost:8080");
expect(result).toEqual({ ok: true, status: 200 });
expect(mockContainerCheck).toHaveBeenCalledWith("http://localhost:8080", 10000);
expect(mockNativeCheck).not.toHaveBeenCalled();
});
it("respects timeout parameter", async () => {
mockNativeCheck.mockResolvedValue({ ok: true });
await signalCheck("http://localhost:8080", 5000);
expect(mockNativeCheck).toHaveBeenCalledWith("http://localhost:8080", 5000);
});
});
describe("streamSignalEvents", () => {
beforeEach(() => {
vi.clearAllMocks();
setApiMode("native");
});
it("uses native SSE for native mode", async () => {
mockNativeStreamEvents.mockResolvedValue();
const onEvent = vi.fn();
await streamSignalEvents({
baseUrl: "http://localhost:8080",
account: "+14259798283",
onEvent,
});
expect(mockNativeStreamEvents).toHaveBeenCalledWith(
expect.objectContaining({
baseUrl: "http://localhost:8080",
account: "+14259798283",
}),
);
expect(mockStreamContainerEvents).not.toHaveBeenCalled();
});
it("uses container WebSocket for container mode", async () => {
setApiMode("container");
mockStreamContainerEvents.mockResolvedValue();
const onEvent = vi.fn();
await streamSignalEvents({
baseUrl: "http://localhost:8080",
account: "+14259798283",
onEvent,
});
expect(mockStreamContainerEvents).toHaveBeenCalledWith(
expect.objectContaining({
baseUrl: "http://localhost:8080",
account: "+14259798283",
}),
);
expect(mockNativeStreamEvents).not.toHaveBeenCalled();
});
it("passes native SSE events through unchanged", async () => {
const payload = { envelope: { sourceNumber: "+1555000111" } };
mockNativeStreamEvents.mockImplementation(async (params) => {
params.onEvent({ event: "receive", data: JSON.stringify(payload) });
});
const events: unknown[] = [];
await streamSignalEvents({
baseUrl: "http://localhost:8080",
onEvent: (evt) => events.push(evt),
});
expect(events).toHaveLength(1);
expect(events[0]).toEqual({ event: "receive", data: JSON.stringify(payload) });
});
it("converts container events to SSE-like receive events", async () => {
setApiMode("container");
mockStreamContainerEvents.mockImplementation(async (params) => {
params.onEvent({ envelope: { sourceNumber: "+1555000111" } });
});
const events: unknown[] = [];
await streamSignalEvents({
baseUrl: "http://localhost:8080",
onEvent: (evt) => events.push(evt),
});
expect(events).toHaveLength(1);
expect(events[0]).toEqual({
event: "receive",
data: JSON.stringify({ envelope: { sourceNumber: "+1555000111" } }),
});
});
it("passes abort signal to underlying stream", async () => {
mockNativeStreamEvents.mockResolvedValue();
const abortController = new AbortController();
await streamSignalEvents({
baseUrl: "http://localhost:8080",
abortSignal: abortController.signal,
onEvent: vi.fn(),
});
expect(mockNativeStreamEvents).toHaveBeenCalledWith(
expect.objectContaining({
abortSignal: abortController.signal,
}),
);
});
});
describe("fetchAttachment", () => {
beforeEach(() => {
vi.clearAllMocks();
setApiMode("native");
});
it("uses native JSON-RPC for native mode with sender", async () => {
mockNativeRpcRequest.mockResolvedValue({ data: "base64data" });
const result = await fetchAttachment({
baseUrl: "http://localhost:8080",
account: "+14259798283",
attachmentId: "attachment-123",
sender: "+15550001111",
});
expect(result).toBeInstanceOf(Buffer);
expect(mockNativeRpcRequest).toHaveBeenCalledWith(
"getAttachment",
expect.objectContaining({
id: "attachment-123",
account: "+14259798283",
recipient: "+15550001111",
}),
expect.anything(),
);
});
it("uses container REST for container mode", async () => {
setApiMode("container");
const mockBuffer = Buffer.from([0x89, 0x50, 0x4e, 0x47]);
mockContainerFetchAttachment.mockResolvedValue(mockBuffer);
const result = await fetchAttachment({
baseUrl: "http://localhost:8080",
attachmentId: "attachment-123",
});
expect(result).toBe(mockBuffer);
expect(mockContainerFetchAttachment).toHaveBeenCalledWith(
"attachment-123",
expect.objectContaining({ baseUrl: "http://localhost:8080" }),
);
});
it("returns null for native mode without sender or groupId", async () => {
const result = await fetchAttachment({
baseUrl: "http://localhost:8080",
attachmentId: "attachment-123",
});
expect(result).toBeNull();
expect(mockNativeRpcRequest).not.toHaveBeenCalled();
});
it("uses groupId when provided for native mode", async () => {
mockNativeRpcRequest.mockResolvedValue({ data: "base64data" });
await fetchAttachment({
baseUrl: "http://localhost:8080",
attachmentId: "attachment-123",
groupId: "group-123",
});
expect(mockNativeRpcRequest).toHaveBeenCalledWith(
"getAttachment",
expect.objectContaining({ groupId: "group-123" }),
expect.anything(),
);
});
it("returns null when native RPC returns no data", async () => {
mockNativeRpcRequest.mockResolvedValue({});
const result = await fetchAttachment({
baseUrl: "http://localhost:8080",
attachmentId: "attachment-123",
sender: "+15550001111",
});
expect(result).toBeNull();
});
it("prefers groupId over sender when both provided", async () => {
mockNativeRpcRequest.mockResolvedValue({ data: "base64data" });
await fetchAttachment({
baseUrl: "http://localhost:8080",
attachmentId: "attachment-123",
sender: "+15550001111",
groupId: "group-123",
});
const callParams = mockNativeRpcRequest.mock.calls[0][1];
expect(callParams).toHaveProperty("groupId", "group-123");
expect(callParams).not.toHaveProperty("recipient");
});
it("passes timeout to container fetch", async () => {
setApiMode("container");
mockContainerFetchAttachment.mockResolvedValue(Buffer.from([]));
await fetchAttachment({
baseUrl: "http://localhost:8080",
attachmentId: "attachment-123",
timeoutMs: 60000,
});
expect(mockContainerFetchAttachment).toHaveBeenCalledWith(
"attachment-123",
expect.objectContaining({
timeoutMs: 60000,
}),
);
});
});

View File

@ -0,0 +1,187 @@
/**
* Signal client adapter - unified interface for both native signal-cli and bbernhard container.
*
* This adapter provides a single API that routes to the appropriate implementation
* based on the configured API mode. Exports mirror client.ts names so consumers
* only need to change their import path.
*/
import { loadConfig } from "openclaw/plugin-sdk/config-runtime";
import {
containerCheck,
containerRpcRequest,
streamContainerEvents,
containerFetchAttachment,
} from "./client-container.js";
import type { SignalRpcOptions } from "./client.js";
import {
signalCheck as nativeCheck,
signalRpcRequest as nativeRpcRequest,
streamSignalEvents as nativeStreamEvents,
} from "./client.js";
const DEFAULT_TIMEOUT_MS = 10_000;
const MODE_CACHE_TTL_MS = 30_000;
export type SignalAdapterEvent = {
event?: string;
data?: string;
};
// Re-export the options type so consumers can import it from the adapter.
export type { SignalRpcOptions } from "./client.js";
// Cache auto-detected modes per baseUrl to avoid repeated network probes.
const detectedModeCache = new Map<string, { mode: "native" | "container"; expiresAt: number }>();
/**
* Resolve the effective API mode for a given baseUrl + accountId.
* Reads config internally; callers never need to pass apiMode.
*/
async function resolveApiMode(
baseUrl: string,
_accountId?: string,
): Promise<"native" | "container"> {
const cfg = loadConfig();
const configured = cfg.channels?.signal?.apiMode ?? "auto";
if (configured === "native" || configured === "container") {
return configured;
}
// "auto" — check cache first, then probe
const cached = detectedModeCache.get(baseUrl);
if (cached && cached.expiresAt > Date.now()) {
return cached.mode;
}
const detected = await detectSignalApiMode(baseUrl);
detectedModeCache.set(baseUrl, { mode: detected, expiresAt: Date.now() + MODE_CACHE_TTL_MS });
return detected;
}
/**
* Detect which Signal API mode is available by probing endpoints.
* First endpoint to respond OK wins.
*/
export async function detectSignalApiMode(
baseUrl: string,
timeoutMs = DEFAULT_TIMEOUT_MS,
): Promise<"native" | "container"> {
const nativePromise = nativeCheck(baseUrl, timeoutMs).then((r) =>
r.ok ? ("native" as const) : Promise.reject(new Error("native not ok")),
);
const containerPromise = containerCheck(baseUrl, timeoutMs).then((r) =>
r.ok ? ("container" as const) : Promise.reject(new Error("container not ok")),
);
try {
return await Promise.any([nativePromise, containerPromise]);
} catch {
throw new Error(`Signal API not reachable at ${baseUrl}`);
}
}
/**
* Drop-in replacement for native signalRpcRequest.
* Routes to native JSON-RPC or container REST based on config.
*/
export async function signalRpcRequest<T = unknown>(
method: string,
params: Record<string, unknown> | undefined,
opts: SignalRpcOptions & { accountId?: string },
): Promise<T> {
const mode = await resolveApiMode(opts.baseUrl, opts.accountId);
if (mode === "native") {
return nativeRpcRequest<T>(method, params, opts);
}
return containerRpcRequest<T>(method, params, opts);
}
/**
* Drop-in replacement for native signalCheck.
*/
export async function signalCheck(
baseUrl: string,
timeoutMs = DEFAULT_TIMEOUT_MS,
): Promise<{ ok: boolean; status?: number | null; error?: string | null }> {
const mode = await resolveApiMode(baseUrl);
if (mode === "container") {
return containerCheck(baseUrl, timeoutMs);
}
return nativeCheck(baseUrl, timeoutMs);
}
/**
* Drop-in replacement for native streamSignalEvents.
* Container mode uses WebSocket; native uses SSE.
*/
export async function streamSignalEvents(params: {
baseUrl: string;
account?: string;
accountId?: string;
abortSignal?: AbortSignal;
onEvent: (event: SignalAdapterEvent) => void;
logger?: { log?: (msg: string) => void; error?: (msg: string) => void };
}): Promise<void> {
const mode = await resolveApiMode(params.baseUrl, params.accountId);
if (mode === "container") {
return streamContainerEvents({
baseUrl: params.baseUrl,
account: params.account,
abortSignal: params.abortSignal,
onEvent: (event) => params.onEvent({ event: "receive", data: JSON.stringify(event) }),
logger: params.logger,
});
}
return nativeStreamEvents({
baseUrl: params.baseUrl,
account: params.account,
abortSignal: params.abortSignal,
onEvent: (event) => params.onEvent(event),
});
}
/**
* Fetch attachment, routing to native or container implementation.
*/
export async function fetchAttachment(params: {
baseUrl: string;
account?: string;
accountId?: string;
attachmentId: string;
sender?: string;
groupId?: string;
timeoutMs?: number;
}): Promise<Buffer | null> {
const mode = await resolveApiMode(params.baseUrl, params.accountId);
if (mode === "container") {
return containerFetchAttachment(params.attachmentId, {
baseUrl: params.baseUrl,
timeoutMs: params.timeoutMs,
});
}
const rpcParams: Record<string, unknown> = {
id: params.attachmentId,
};
if (params.account) {
rpcParams.account = params.account;
}
if (params.groupId) {
rpcParams.groupId = params.groupId;
} else if (params.sender) {
rpcParams.recipient = params.sender;
} else {
return null;
}
const result = await nativeRpcRequest<{ data?: string }>("getAttachment", rpcParams, {
baseUrl: params.baseUrl,
timeoutMs: params.timeoutMs,
});
if (!result?.data) {
return null;
}
return Buffer.from(result.data, "base64");
}

View File

@ -0,0 +1,638 @@
import * as fetchModule from "openclaw/plugin-sdk/infra-runtime";
import { describe, expect, it, vi, beforeEach } from "vitest";
import {
containerCheck,
containerRestRequest,
containerSendMessage,
containerSendTyping,
containerSendReceipt,
containerFetchAttachment,
containerSendReaction,
containerRemoveReaction,
} from "./client-container.js";
// spyOn approach works with vitest forks pool for cross-directory imports
const mockFetch = vi.fn();
beforeEach(() => {
vi.spyOn(fetchModule, "resolveFetch").mockReturnValue(mockFetch as unknown as typeof fetch);
});
// Mock WebSocket (not testing streamContainerEvents in unit tests due to complexity)
vi.mock("ws", () => ({
default: class MockWebSocket {
on() {}
close() {}
},
}));
describe("containerCheck", () => {
beforeEach(() => {
vi.clearAllMocks();
});
it("returns ok:true when /v1/about returns 200", async () => {
mockFetch.mockResolvedValue({
ok: true,
status: 200,
});
const result = await containerCheck("http://localhost:8080");
expect(result).toEqual({ ok: true, status: 200, error: null });
expect(mockFetch).toHaveBeenCalledWith(
"http://localhost:8080/v1/about",
expect.objectContaining({ method: "GET" }),
);
});
it("returns ok:false when /v1/about returns 404", async () => {
mockFetch.mockResolvedValue({
ok: false,
status: 404,
});
const result = await containerCheck("http://localhost:8080");
expect(result).toEqual({ ok: false, status: 404, error: "HTTP 404" });
});
it("returns ok:false with error message on fetch failure", async () => {
mockFetch.mockRejectedValue(new Error("Network error"));
const result = await containerCheck("http://localhost:8080");
expect(result).toEqual({ ok: false, status: null, error: "Network error" });
});
it("normalizes base URL by removing trailing slash", async () => {
mockFetch.mockResolvedValue({ ok: true, status: 200 });
await containerCheck("http://localhost:8080/");
expect(mockFetch).toHaveBeenCalledWith("http://localhost:8080/v1/about", expect.anything());
});
it("adds http:// prefix when missing", async () => {
mockFetch.mockResolvedValue({ ok: true, status: 200 });
await containerCheck("localhost:8080");
expect(mockFetch).toHaveBeenCalledWith("http://localhost:8080/v1/about", expect.anything());
});
});
describe("containerRestRequest", () => {
beforeEach(() => {
vi.clearAllMocks();
});
it("makes GET request with correct endpoint", async () => {
mockFetch.mockResolvedValue({
ok: true,
status: 200,
text: async () => JSON.stringify({ version: "1.0" }),
});
const result = await containerRestRequest("/v1/about", { baseUrl: "http://localhost:8080" });
expect(result).toEqual({ version: "1.0" });
expect(mockFetch).toHaveBeenCalledWith(
"http://localhost:8080/v1/about",
expect.objectContaining({
method: "GET",
headers: { "Content-Type": "application/json" },
}),
);
});
it("makes POST request with body", async () => {
mockFetch.mockResolvedValue({
ok: true,
status: 201,
});
await containerRestRequest("/v2/send", { baseUrl: "http://localhost:8080" }, "POST", {
message: "test",
number: "+1234567890",
recipients: ["+1234567890"],
});
expect(mockFetch).toHaveBeenCalledWith(
"http://localhost:8080/v2/send",
expect.objectContaining({
method: "POST",
body: JSON.stringify({
message: "test",
number: "+1234567890",
recipients: ["+1234567890"],
}),
}),
);
});
it("returns undefined for 201 status", async () => {
mockFetch.mockResolvedValue({
ok: true,
status: 201,
});
const result = await containerRestRequest(
"/v2/send",
{ baseUrl: "http://localhost:8080" },
"POST",
);
expect(result).toBeUndefined();
});
it("returns undefined for 204 status", async () => {
mockFetch.mockResolvedValue({
ok: true,
status: 204,
});
const result = await containerRestRequest(
"/v1/typing-indicator/+1234567890",
{ baseUrl: "http://localhost:8080" },
"PUT",
);
expect(result).toBeUndefined();
});
it("throws error on non-ok response", async () => {
mockFetch.mockResolvedValue({
ok: false,
status: 500,
statusText: "Internal Server Error",
text: async () => "Server error details",
});
await expect(
containerRestRequest("/v2/send", { baseUrl: "http://localhost:8080" }, "POST"),
).rejects.toThrow("Signal REST 500: Server error details");
});
it("handles empty response body", async () => {
mockFetch.mockResolvedValue({
ok: true,
status: 200,
text: async () => "",
});
const result = await containerRestRequest("/v1/about", { baseUrl: "http://localhost:8080" });
expect(result).toBeUndefined();
});
it("respects custom timeout by using abort signal", async () => {
mockFetch.mockResolvedValue({
ok: true,
status: 200,
text: async () => "{}",
});
await containerRestRequest("/v1/about", { baseUrl: "http://localhost:8080", timeoutMs: 5000 });
// The timeout is enforced via AbortController, so we verify the call was made with a signal
expect(mockFetch).toHaveBeenCalled();
const callArgs = mockFetch.mock.calls[0];
expect(callArgs[1].signal).toBeDefined();
});
});
describe("containerSendMessage", () => {
beforeEach(() => {
vi.clearAllMocks();
});
it("sends message to recipients", async () => {
mockFetch.mockResolvedValue({
ok: true,
status: 200,
text: async () => JSON.stringify({ timestamp: 1700000000000 }),
});
const result = await containerSendMessage({
baseUrl: "http://localhost:8080",
account: "+14259798283",
recipients: ["+15550001111"],
message: "Hello world",
});
expect(result).toEqual({ timestamp: 1700000000000 });
expect(mockFetch).toHaveBeenCalledWith(
"http://localhost:8080/v2/send",
expect.objectContaining({
method: "POST",
body: JSON.stringify({
message: "Hello world",
number: "+14259798283",
recipients: ["+15550001111"],
}),
}),
);
});
it("includes text styles when provided", async () => {
mockFetch.mockResolvedValue({
ok: true,
status: 200,
text: async () => JSON.stringify({}),
});
await containerSendMessage({
baseUrl: "http://localhost:8080",
account: "+14259798283",
recipients: ["+15550001111"],
message: "Bold text",
textStyles: [{ start: 0, length: 4, style: "BOLD" }],
});
const callArgs = mockFetch.mock.calls[0];
const body = JSON.parse(callArgs[1].body);
expect(body["text_style"]).toEqual(["0:4:BOLD"]);
});
it("includes attachments as base64 data URIs", async () => {
const fs = await import("node:fs/promises");
const os = await import("node:os");
const path = await import("node:path");
// Create a temp file with known content
const tmpDir = await fs.mkdtemp(path.join(os.tmpdir(), "signal-test-"));
const tmpFile = path.join(tmpDir, "test-image.jpg");
const content = Buffer.from([0xff, 0xd8, 0xff, 0xe0]); // JPEG magic bytes
await fs.writeFile(tmpFile, content);
mockFetch.mockResolvedValue({
ok: true,
status: 200,
text: async () => JSON.stringify({}),
});
await containerSendMessage({
baseUrl: "http://localhost:8080",
account: "+14259798283",
recipients: ["+15550001111"],
message: "Photo",
attachments: [tmpFile],
});
const callArgs = mockFetch.mock.calls[0];
const body = JSON.parse(callArgs[1].body);
expect(body.attachments).toBeUndefined();
expect(body.base64_attachments).toBeDefined();
expect(body.base64_attachments).toHaveLength(1);
expect(body.base64_attachments[0]).toMatch(
/^data:image\/jpeg;filename=test-image\.jpg;base64,/,
);
// Cleanup
await fs.rm(tmpDir, { recursive: true });
});
});
describe("containerSendTyping", () => {
beforeEach(() => {
vi.clearAllMocks();
});
it("sends typing indicator with PUT", async () => {
mockFetch.mockResolvedValue({
ok: true,
status: 204,
});
const result = await containerSendTyping({
baseUrl: "http://localhost:8080",
account: "+14259798283",
recipient: "+15550001111",
});
expect(result).toBe(true);
expect(mockFetch).toHaveBeenCalledWith(
"http://localhost:8080/v1/typing-indicator/%2B14259798283",
expect.objectContaining({
method: "PUT",
body: JSON.stringify({ recipient: "+15550001111" }),
}),
);
});
it("stops typing indicator with DELETE", async () => {
mockFetch.mockResolvedValue({
ok: true,
status: 204,
});
await containerSendTyping({
baseUrl: "http://localhost:8080",
account: "+14259798283",
recipient: "+15550001111",
stop: true,
});
expect(mockFetch).toHaveBeenCalledWith(
expect.anything(),
expect.objectContaining({ method: "DELETE" }),
);
});
});
describe("containerSendReceipt", () => {
beforeEach(() => {
vi.clearAllMocks();
});
it("sends read receipt", async () => {
mockFetch.mockResolvedValue({
ok: true,
status: 204,
});
const result = await containerSendReceipt({
baseUrl: "http://localhost:8080",
account: "+14259798283",
recipient: "+15550001111",
timestamp: 1700000000000,
});
expect(result).toBe(true);
expect(mockFetch).toHaveBeenCalledWith(
"http://localhost:8080/v1/receipts/%2B14259798283",
expect.objectContaining({
method: "POST",
body: JSON.stringify({
recipient: "+15550001111",
timestamp: 1700000000000,
receipt_type: "read",
}),
}),
);
});
it("sends viewed receipt when type specified", async () => {
mockFetch.mockResolvedValue({
ok: true,
status: 204,
});
await containerSendReceipt({
baseUrl: "http://localhost:8080",
account: "+14259798283",
recipient: "+15550001111",
timestamp: 1700000000000,
type: "viewed",
});
const callArgs = mockFetch.mock.calls[0];
const body = JSON.parse(callArgs[1].body);
expect(body.receipt_type).toBe("viewed");
});
});
describe("containerFetchAttachment", () => {
beforeEach(() => {
vi.clearAllMocks();
});
it("fetches attachment binary", async () => {
const binaryData = new Uint8Array([0x89, 0x50, 0x4e, 0x47]); // PNG header
mockFetch.mockResolvedValue({
ok: true,
status: 200,
arrayBuffer: async () => binaryData.buffer,
});
const result = await containerFetchAttachment("attachment-123", {
baseUrl: "http://localhost:8080",
});
expect(result).toBeInstanceOf(Buffer);
expect(mockFetch).toHaveBeenCalledWith(
"http://localhost:8080/v1/attachments/attachment-123",
expect.objectContaining({ method: "GET" }),
);
});
it("returns null on non-ok response", async () => {
mockFetch.mockResolvedValue({
ok: false,
status: 404,
});
const result = await containerFetchAttachment("attachment-123", {
baseUrl: "http://localhost:8080",
});
expect(result).toBeNull();
});
it("encodes attachment ID in URL", async () => {
mockFetch.mockResolvedValue({
ok: true,
status: 200,
arrayBuffer: async () => new ArrayBuffer(0),
});
await containerFetchAttachment("path/with/slashes", {
baseUrl: "http://localhost:8080",
});
expect(mockFetch).toHaveBeenCalledWith(
"http://localhost:8080/v1/attachments/path%2Fwith%2Fslashes",
expect.anything(),
);
});
});
describe("normalizeBaseUrl edge cases", () => {
beforeEach(() => {
vi.clearAllMocks();
});
it("throws error for empty base URL", async () => {
await expect(containerCheck("")).rejects.toThrow("Signal base URL is required");
});
it("throws error for whitespace-only base URL", async () => {
await expect(containerCheck(" ")).rejects.toThrow("Signal base URL is required");
});
it("handles https URLs", async () => {
mockFetch.mockResolvedValue({ ok: true, status: 200 });
await containerCheck("https://signal.example.com");
expect(mockFetch).toHaveBeenCalledWith(
"https://signal.example.com/v1/about",
expect.anything(),
);
});
it("handles URLs with ports", async () => {
mockFetch.mockResolvedValue({ ok: true, status: 200 });
await containerCheck("http://192.168.1.100:9922");
expect(mockFetch).toHaveBeenCalledWith("http://192.168.1.100:9922/v1/about", expect.anything());
});
});
describe("containerRestRequest edge cases", () => {
beforeEach(() => {
vi.clearAllMocks();
});
it("handles DELETE method", async () => {
mockFetch.mockResolvedValue({
ok: true,
status: 204,
});
await containerRestRequest(
"/v1/some-resource/123",
{ baseUrl: "http://localhost:8080" },
"DELETE",
);
expect(mockFetch).toHaveBeenCalledWith(
"http://localhost:8080/v1/some-resource/123",
expect.objectContaining({ method: "DELETE" }),
);
});
it("handles error response with empty body", async () => {
mockFetch.mockResolvedValue({
ok: false,
status: 500,
statusText: "Internal Server Error",
text: async () => "",
});
await expect(
containerRestRequest("/v2/send", { baseUrl: "http://localhost:8080" }, "POST"),
).rejects.toThrow("Signal REST 500: Internal Server Error");
});
it("handles JSON parse errors gracefully", async () => {
mockFetch.mockResolvedValue({
ok: true,
status: 200,
text: async () => "not-valid-json",
});
await expect(
containerRestRequest("/v1/about", { baseUrl: "http://localhost:8080" }),
).rejects.toThrow();
});
});
describe("containerSendReaction", () => {
beforeEach(() => {
vi.clearAllMocks();
});
it("sends reaction to recipient", async () => {
mockFetch.mockResolvedValue({
ok: true,
status: 200,
text: async () => JSON.stringify({ timestamp: 1700000000000 }),
});
const result = await containerSendReaction({
baseUrl: "http://localhost:8080",
account: "+14259798283",
recipient: "+15550001111",
emoji: "👍",
targetAuthor: "+15550001111",
targetTimestamp: 1699999999999,
});
expect(result).toEqual({ timestamp: 1700000000000 });
expect(mockFetch).toHaveBeenCalledWith(
"http://localhost:8080/v1/reactions/%2B14259798283",
expect.objectContaining({
method: "POST",
body: JSON.stringify({
recipient: "+15550001111",
reaction: "👍",
target_author: "+15550001111",
timestamp: 1699999999999,
}),
}),
);
});
it("includes group_id when provided", async () => {
mockFetch.mockResolvedValue({
ok: true,
status: 200,
text: async () => JSON.stringify({}),
});
await containerSendReaction({
baseUrl: "http://localhost:8080",
account: "+14259798283",
recipient: "+15550001111",
emoji: "❤️",
targetAuthor: "+15550001111",
targetTimestamp: 1699999999999,
groupId: "group-123",
});
const callArgs = mockFetch.mock.calls[0];
const body = JSON.parse(callArgs[1].body);
expect(body.group_id).toBe("group-123");
});
});
describe("containerRemoveReaction", () => {
beforeEach(() => {
vi.clearAllMocks();
});
it("removes reaction with DELETE", async () => {
mockFetch.mockResolvedValue({
ok: true,
status: 200,
text: async () => JSON.stringify({ timestamp: 1700000000000 }),
});
const result = await containerRemoveReaction({
baseUrl: "http://localhost:8080",
account: "+14259798283",
recipient: "+15550001111",
emoji: "👍",
targetAuthor: "+15550001111",
targetTimestamp: 1699999999999,
});
expect(result).toEqual({ timestamp: 1700000000000 });
expect(mockFetch).toHaveBeenCalledWith(
"http://localhost:8080/v1/reactions/%2B14259798283",
expect.objectContaining({
method: "DELETE",
body: JSON.stringify({
recipient: "+15550001111",
reaction: "👍",
target_author: "+15550001111",
timestamp: 1699999999999,
}),
}),
);
});
it("includes group_id when provided", async () => {
mockFetch.mockResolvedValue({
ok: true,
status: 200,
text: async () => JSON.stringify({}),
});
await containerRemoveReaction({
baseUrl: "http://localhost:8080",
account: "+14259798283",
recipient: "+15550001111",
emoji: "❤️",
targetAuthor: "+15550001111",
targetTimestamp: 1699999999999,
groupId: "group-123",
});
const callArgs = mockFetch.mock.calls[0];
const body = JSON.parse(callArgs[1].body);
expect(body.group_id).toBe("group-123");
});
});

View File

@ -0,0 +1,544 @@
/**
* Signal client for bbernhard/signal-cli-rest-api container.
* Uses WebSocket for receiving messages and REST API for sending.
*
* This is a separate implementation from client.ts (native signal-cli)
* to keep the two modes cleanly isolated.
*/
import fs from "node:fs/promises";
import nodePath from "node:path";
import { resolveFetch } from "openclaw/plugin-sdk/infra-runtime";
import { detectMime } from "openclaw/plugin-sdk/media-runtime";
import WebSocket from "ws";
export type ContainerRpcOptions = {
baseUrl: string;
timeoutMs?: number;
};
export type ContainerWebSocketMessage = {
envelope?: {
syncMessage?: unknown;
dataMessage?: {
message?: string;
groupInfo?: { groupId?: string; groupName?: string };
attachments?: Array<{
id?: string;
contentType?: string;
filename?: string;
size?: number;
}>;
quote?: { text?: string };
reaction?: unknown;
};
editMessage?: { dataMessage?: unknown };
reactionMessage?: unknown;
sourceNumber?: string;
sourceUuid?: string;
sourceName?: string;
timestamp?: number;
};
exception?: { message?: string };
};
const DEFAULT_TIMEOUT_MS = 10_000;
function normalizeBaseUrl(url: string): string {
const trimmed = url.trim();
if (!trimmed) {
throw new Error("Signal base URL is required");
}
if (/^https?:\/\//i.test(trimmed)) {
return trimmed.replace(/\/+$/, "");
}
return `http://${trimmed}`.replace(/\/+$/, "");
}
async function fetchWithTimeout(url: string, init: RequestInit, timeoutMs: number) {
const fetchImpl = resolveFetch();
if (!fetchImpl) {
throw new Error("fetch is not available");
}
const controller = new AbortController();
const timer = setTimeout(() => controller.abort(), timeoutMs);
try {
return await fetchImpl(url, { ...init, signal: controller.signal });
} finally {
clearTimeout(timer);
}
}
/**
* Check if bbernhard container REST API is available.
*/
export async function containerCheck(
baseUrl: string,
timeoutMs = DEFAULT_TIMEOUT_MS,
): Promise<{ ok: boolean; status?: number | null; error?: string | null }> {
const normalized = normalizeBaseUrl(baseUrl);
try {
const res = await fetchWithTimeout(`${normalized}/v1/about`, { method: "GET" }, timeoutMs);
if (!res.ok) {
return { ok: false, status: res.status, error: `HTTP ${res.status}` };
}
return { ok: true, status: res.status, error: null };
} catch (err) {
return {
ok: false,
status: null,
error: err instanceof Error ? err.message : String(err),
};
}
}
/**
* Make a REST API request to bbernhard container.
*/
export async function containerRestRequest<T = unknown>(
endpoint: string,
opts: ContainerRpcOptions,
method: "GET" | "POST" | "PUT" | "DELETE" = "GET",
body?: unknown,
): Promise<T> {
const baseUrl = normalizeBaseUrl(opts.baseUrl);
const url = `${baseUrl}${endpoint}`;
const init: RequestInit = {
method,
headers: { "Content-Type": "application/json" },
};
if (body) {
init.body = JSON.stringify(body);
}
const res = await fetchWithTimeout(url, init, opts.timeoutMs ?? DEFAULT_TIMEOUT_MS);
if (res.status === 201 || res.status === 204) {
return undefined as T;
}
if (!res.ok) {
const errorText = await res.text().catch(() => "");
throw new Error(`Signal REST ${res.status}: ${errorText || res.statusText}`);
}
const text = await res.text();
if (!text) {
return undefined as T;
}
return JSON.parse(text) as T;
}
/**
* Fetch attachment binary from bbernhard container.
*/
export async function containerFetchAttachment(
attachmentId: string,
opts: ContainerRpcOptions,
): Promise<Buffer | null> {
const baseUrl = normalizeBaseUrl(opts.baseUrl);
const url = `${baseUrl}/v1/attachments/${encodeURIComponent(attachmentId)}`;
const res = await fetchWithTimeout(url, { method: "GET" }, opts.timeoutMs ?? DEFAULT_TIMEOUT_MS);
if (!res.ok) {
return null;
}
const arrayBuffer = await res.arrayBuffer();
return Buffer.from(arrayBuffer);
}
/**
* Stream messages using WebSocket from bbernhard container.
* The Promise resolves when the connection closes (for any reason).
* The caller (runSignalLoopAdapter) is responsible for reconnection.
*/
export async function streamContainerEvents(params: {
baseUrl: string;
account?: string;
abortSignal?: AbortSignal;
onEvent: (event: ContainerWebSocketMessage) => void;
logger?: { log?: (msg: string) => void; error?: (msg: string) => void };
}): Promise<void> {
const normalized = normalizeBaseUrl(params.baseUrl);
const wsUrl = `${normalized.replace(/^http/, "ws")}/v1/receive/${encodeURIComponent(params.account ?? "")}`;
const log = params.logger?.log ?? (() => {});
const logError = params.logger?.error ?? (() => {});
log(`[signal-ws] connecting to ${wsUrl}`);
return new Promise((resolve, reject) => {
let ws: WebSocket;
let resolved = false;
const cleanup = () => {
if (resolved) {
return;
}
resolved = true;
};
try {
ws = new WebSocket(wsUrl);
} catch (err) {
logError(
`[signal-ws] failed to create WebSocket: ${err instanceof Error ? err.message : String(err)}`,
);
reject(err);
return;
}
ws.on("open", () => {
log("[signal-ws] connected");
});
ws.on("message", (data: Buffer) => {
try {
const text = data.toString();
log(`[signal-ws] received: ${text.slice(0, 200)}${text.length > 200 ? "..." : ""}`);
const envelope = JSON.parse(text) as ContainerWebSocketMessage;
if (envelope) {
params.onEvent(envelope);
}
} catch (err) {
logError(`[signal-ws] parse error: ${err instanceof Error ? err.message : String(err)}`);
}
});
ws.on("error", (err) => {
logError(`[signal-ws] error: ${err instanceof Error ? err.message : String(err)}`);
// Don't resolve here - the close event will fire next
});
ws.on("close", (code, reason) => {
const reasonStr = reason?.toString() || "no reason";
log(`[signal-ws] closed (code=${code}, reason=${reasonStr})`);
cleanup();
resolve(); // Let the outer loop handle reconnection
});
ws.on("ping", () => {
log("[signal-ws] ping received");
});
ws.on("pong", () => {
log("[signal-ws] pong received");
});
params.abortSignal?.addEventListener(
"abort",
() => {
log("[signal-ws] aborted, closing connection");
cleanup();
ws.close();
resolve();
},
{ once: true },
);
});
}
/**
* Convert local file paths to base64 data URIs for the container REST API.
* The bbernhard container /v2/send only accepts `base64_attachments` (not file paths).
*/
async function filesToBase64DataUris(filePaths: string[]): Promise<string[]> {
const results: string[] = [];
for (const filePath of filePaths) {
const buffer = await fs.readFile(filePath);
const mime = (await detectMime({ buffer, filePath })) ?? "application/octet-stream";
const filename = nodePath.basename(filePath);
const b64 = buffer.toString("base64");
results.push(`data:${mime};filename=${filename};base64,${b64}`);
}
return results;
}
/**
* Send message via bbernhard container REST API.
*/
export async function containerSendMessage(params: {
baseUrl: string;
account: string;
recipients: string[];
message: string;
textStyles?: Array<{ start: number; length: number; style: string }>;
attachments?: string[];
timeoutMs?: number;
}): Promise<{ timestamp?: number }> {
const payload: Record<string, unknown> = {
message: params.message,
number: params.account,
recipients: params.recipients,
};
if (params.textStyles && params.textStyles.length > 0) {
payload["text_style"] = params.textStyles.map(
(style) => `${style.start}:${style.length}:${style.style}`,
);
}
if (params.attachments && params.attachments.length > 0) {
// Container API only accepts base64-encoded attachments, not file paths.
payload.base64_attachments = await filesToBase64DataUris(params.attachments);
}
const result = await containerRestRequest<{ timestamp?: number }>(
"/v2/send",
{ baseUrl: params.baseUrl, timeoutMs: params.timeoutMs },
"POST",
payload,
);
return result ?? {};
}
/**
* Send typing indicator via bbernhard container REST API.
*/
export async function containerSendTyping(params: {
baseUrl: string;
account: string;
recipient: string;
stop?: boolean;
timeoutMs?: number;
}): Promise<boolean> {
const method = params.stop ? "DELETE" : "PUT";
await containerRestRequest(
`/v1/typing-indicator/${encodeURIComponent(params.account)}`,
{ baseUrl: params.baseUrl, timeoutMs: params.timeoutMs },
method,
{ recipient: params.recipient },
);
return true;
}
/**
* Send read receipt via bbernhard container REST API.
*/
export async function containerSendReceipt(params: {
baseUrl: string;
account: string;
recipient: string;
timestamp: number;
type?: "read" | "viewed";
timeoutMs?: number;
}): Promise<boolean> {
await containerRestRequest(
`/v1/receipts/${encodeURIComponent(params.account)}`,
{ baseUrl: params.baseUrl, timeoutMs: params.timeoutMs },
"POST",
{
recipient: params.recipient,
timestamp: params.timestamp,
receipt_type: params.type ?? "read",
},
);
return true;
}
/**
* Send a reaction to a message via bbernhard container REST API.
*/
export async function containerSendReaction(params: {
baseUrl: string;
account: string;
recipient: string;
emoji: string;
targetAuthor: string;
targetTimestamp: number;
groupId?: string;
timeoutMs?: number;
}): Promise<{ timestamp?: number }> {
const payload: Record<string, unknown> = {
recipient: params.recipient,
reaction: params.emoji,
target_author: params.targetAuthor,
timestamp: params.targetTimestamp,
};
if (params.groupId) {
payload.group_id = params.groupId;
}
const result = await containerRestRequest<{ timestamp?: number }>(
`/v1/reactions/${encodeURIComponent(params.account)}`,
{ baseUrl: params.baseUrl, timeoutMs: params.timeoutMs },
"POST",
payload,
);
return result ?? {};
}
/**
* Remove a reaction from a message via bbernhard container REST API.
*/
export async function containerRemoveReaction(params: {
baseUrl: string;
account: string;
recipient: string;
emoji: string;
targetAuthor: string;
targetTimestamp: number;
groupId?: string;
timeoutMs?: number;
}): Promise<{ timestamp?: number }> {
const payload: Record<string, unknown> = {
recipient: params.recipient,
reaction: params.emoji,
target_author: params.targetAuthor,
timestamp: params.targetTimestamp,
};
if (params.groupId) {
payload.group_id = params.groupId;
}
const result = await containerRestRequest<{ timestamp?: number }>(
`/v1/reactions/${encodeURIComponent(params.account)}`,
{ baseUrl: params.baseUrl, timeoutMs: params.timeoutMs },
"DELETE",
payload,
);
return result ?? {};
}
/**
* Strip the "uuid:" prefix that native signal-cli accepts but the container API rejects.
*/
function stripUuidPrefix(id: string): string {
return id.startsWith("uuid:") ? id.slice(5) : id;
}
/**
* Convert a group internal_id to the container-expected format.
* The bbernhard container expects groups as "group.{base64(internal_id)}".
*/
function formatGroupIdForContainer(groupId: string): string {
if (groupId.startsWith("group.")) {
return groupId;
}
return `group.${Buffer.from(groupId).toString("base64")}`;
}
/**
* Drop-in replacement for native signalRpcRequest that translates
* JSON-RPC method + params into the equivalent container REST API calls.
* This keeps all container protocol details (uuid: stripping, group ID
* formatting, base64 attachments, text-style conversion) isolated here.
*/
export async function containerRpcRequest<T = unknown>(
method: string,
params: Record<string, unknown> | undefined,
opts: ContainerRpcOptions,
): Promise<T> {
const p = params ?? {};
switch (method) {
case "send": {
const recipients = ((p.recipient as string[] | undefined) ?? []).map(stripUuidPrefix);
const usernames = ((p.username as string[] | undefined) ?? []).map(stripUuidPrefix);
const groupId = p.groupId as string | undefined;
const formattedGroupId = groupId ? formatGroupIdForContainer(groupId) : undefined;
const finalRecipients =
recipients.length > 0
? recipients
: usernames.length > 0
? usernames
: formattedGroupId
? [formattedGroupId]
: [];
const textStylesRaw = p["text-style"] as string[] | undefined;
const textStyles = textStylesRaw?.map((s) => {
const [start, length, style] = s.split(":");
return { start: Number(start), length: Number(length), style };
});
const result = await containerSendMessage({
baseUrl: opts.baseUrl,
account: (p.account as string) ?? "",
recipients: finalRecipients,
message: (p.message as string) ?? "",
textStyles,
attachments: p.attachments as string[] | undefined,
timeoutMs: opts.timeoutMs,
});
return result as T;
}
case "sendTyping": {
const recipient = stripUuidPrefix(
(p.recipient as string[] | undefined)?.[0] ?? (p.groupId as string | undefined) ?? "",
);
await containerSendTyping({
baseUrl: opts.baseUrl,
account: (p.account as string) ?? "",
recipient,
stop: p.stop as boolean | undefined,
timeoutMs: opts.timeoutMs,
});
return undefined as T;
}
case "sendReceipt": {
const recipient = stripUuidPrefix((p.recipient as string[] | undefined)?.[0] ?? "");
await containerSendReceipt({
baseUrl: opts.baseUrl,
account: (p.account as string) ?? "",
recipient,
timestamp: p.targetTimestamp as number,
type: p.type as "read" | "viewed" | undefined,
timeoutMs: opts.timeoutMs,
});
return undefined as T;
}
case "sendReaction": {
const recipient = stripUuidPrefix((p.recipients as string[] | undefined)?.[0] ?? "");
const groupId = (p.groupIds as string[] | undefined)?.[0] ?? undefined;
const formattedGroupId = groupId ? formatGroupIdForContainer(groupId) : undefined;
const reactionParams = {
baseUrl: opts.baseUrl,
account: (p.account as string) ?? "",
recipient,
emoji: (p.emoji as string) ?? "",
targetAuthor: stripUuidPrefix((p.targetAuthor as string) ?? recipient),
targetTimestamp: p.targetTimestamp as number,
groupId: formattedGroupId,
timeoutMs: opts.timeoutMs,
};
const fn = p.remove ? containerRemoveReaction : containerSendReaction;
return (await fn(reactionParams)) as T;
}
case "getAttachment": {
const attachmentId = p.id as string;
const buffer = await containerFetchAttachment(attachmentId, {
baseUrl: opts.baseUrl,
timeoutMs: opts.timeoutMs,
});
// Convert to native format: { data: base64String }
if (!buffer) {
return { data: undefined } as T;
}
return { data: buffer.toString("base64") } as T;
}
case "version": {
const result = await containerRestRequest<{ versions?: string[]; build?: number }>(
"/v1/about",
{ baseUrl: opts.baseUrl, timeoutMs: opts.timeoutMs },
);
return result as T;
}
default:
throw new Error(`Unsupported container RPC method: ${method}`);
}
}

View File

@ -150,8 +150,14 @@ vi.mock("./client.js", () => ({
signalRpcRequest: (...args: unknown[]) => signalRpcRequestMock(...args),
}));
vi.mock("./daemon.js", async () => {
const actual = await vi.importActual<typeof import("./daemon.js")>("./daemon.js");
vi.mock("./client-adapter.js", () => ({
streamSignalEvents: (...args: unknown[]) => streamMock(...args),
signalCheck: (...args: unknown[]) => signalCheckMock(...args),
signalRpcRequest: (...args: unknown[]) => signalRpcRequestMock(...args),
}));
vi.mock("./daemon.js", async (importOriginal) => {
const actual = await importOriginal<typeof import("./daemon.js")>();
return {
...actual,
spawnSignalDaemon: (...args: unknown[]) => spawnSignalDaemonMock(...args),

View File

@ -24,7 +24,7 @@ import { createNonExitingRuntime, type RuntimeEnv } from "openclaw/plugin-sdk/ru
import { normalizeStringEntries } from "openclaw/plugin-sdk/text-runtime";
import { normalizeE164 } from "openclaw/plugin-sdk/text-runtime";
import { resolveSignalAccount } from "./accounts.js";
import { signalCheck, signalRpcRequest } from "./client.js";
import { signalRpcRequest, signalCheck } from "./client-adapter.js";
import { formatSignalDaemonExit, spawnSignalDaemon, type SignalDaemonHandle } from "./daemon.js";
import { isSignalSenderAllowed, type resolveSignalSender } from "./identity.js";
import { createSignalEventHandler } from "./monitor/event-handler.js";
@ -380,6 +380,7 @@ export async function monitorSignalProvider(opts: MonitorSignalOpts = {}): Promi
const waitForTransportReadyFn = opts.waitForTransportReady ?? waitForTransportReady;
const autoStart = opts.autoStart ?? accountInfo.config.autoStart ?? !accountInfo.config.httpUrl;
const configuredApiMode = cfg.channels?.signal?.apiMode ?? "auto";
const startupTimeoutMs = Math.min(
120_000,
Math.max(1_000, opts.startupTimeoutMs ?? accountInfo.config.startupTimeoutMs ?? 30_000),
@ -388,6 +389,12 @@ export async function monitorSignalProvider(opts: MonitorSignalOpts = {}): Promi
const daemonLifecycle = createSignalDaemonLifecycle({ abortSignal: opts.abortSignal });
let daemonHandle: SignalDaemonHandle | null = null;
if (autoStart && configuredApiMode === "container") {
throw new Error(
"channels.signal.autoStart=true is incompatible with channels.signal.apiMode=container",
);
}
if (autoStart) {
const cliPath = opts.cliPath ?? accountInfo.config.cliPath ?? "signal-cli";
const httpHost = opts.httpHost ?? accountInfo.config.httpHost ?? "127.0.0.1";

View File

@ -1,5 +1,5 @@
import { beforeEach, describe, expect, it, vi } from "vitest";
import * as clientModule from "./client.js";
import * as clientAdapterModule from "./client-adapter.js";
import { classifySignalCliLogLine } from "./daemon.js";
import { probeSignal } from "./probe.js";
@ -9,12 +9,12 @@ describe("probeSignal", () => {
});
it("extracts version from {version} result", async () => {
vi.spyOn(clientModule, "signalCheck").mockResolvedValueOnce({
vi.spyOn(clientAdapterModule, "signalCheck").mockResolvedValueOnce({
ok: true,
status: 200,
error: null,
});
vi.spyOn(clientModule, "signalRpcRequest").mockResolvedValueOnce({ version: "0.13.22" });
vi.spyOn(clientAdapterModule, "signalRpcRequest").mockResolvedValueOnce({ version: "0.13.22" });
const res = await probeSignal("http://127.0.0.1:8080", 1000);
@ -24,7 +24,7 @@ describe("probeSignal", () => {
});
it("returns ok=false when /check fails", async () => {
vi.spyOn(clientModule, "signalCheck").mockResolvedValueOnce({
vi.spyOn(clientAdapterModule, "signalCheck").mockResolvedValueOnce({
ok: false,
status: 503,
error: "HTTP 503",

View File

@ -1,5 +1,5 @@
import type { BaseProbeResult } from "openclaw/plugin-sdk/channel-contract";
import { signalCheck, signalRpcRequest } from "./client.js";
import { signalCheck, signalRpcRequest } from "./client-adapter.js";
export type SignalProbe = BaseProbeResult & {
status?: number | null;

View File

@ -20,7 +20,7 @@ vi.mock("./accounts.js", () => ({
}),
}));
vi.mock("./client.js", () => ({
vi.mock("./client-adapter.js", () => ({
signalRpcRequest: (...args: unknown[]) => rpcMock(...args),
}));

View File

@ -5,7 +5,7 @@
import { loadConfig } from "openclaw/plugin-sdk/config-runtime";
import type { OpenClawConfig } from "openclaw/plugin-sdk/config-runtime";
import { resolveSignalAccount } from "./accounts.js";
import { signalRpcRequest } from "./client.js";
import { signalRpcRequest } from "./client-adapter.js";
import { resolveSignalRpcContext } from "./rpc-context.js";
export type SignalReactionOpts = {

View File

@ -3,7 +3,7 @@ import { resolveMarkdownTableMode } from "openclaw/plugin-sdk/config-runtime";
import { kindFromMime } from "openclaw/plugin-sdk/media-runtime";
import { resolveOutboundAttachmentFromUrl } from "openclaw/plugin-sdk/media-runtime";
import { resolveSignalAccount } from "./accounts.js";
import { signalRpcRequest } from "./client.js";
import { signalRpcRequest } from "./client-adapter.js";
import { markdownToSignalText, type SignalTextStyleRange } from "./format.js";
import { resolveSignalRpcContext } from "./rpc-context.js";

View File

@ -2,7 +2,7 @@ import type { BackoffPolicy } from "openclaw/plugin-sdk/infra-runtime";
import { computeBackoff, sleepWithAbort } from "openclaw/plugin-sdk/infra-runtime";
import { logVerbose, shouldLogVerbose } from "openclaw/plugin-sdk/runtime-env";
import type { RuntimeEnv } from "openclaw/plugin-sdk/runtime-env";
import { type SignalSseEvent, streamSignalEvents } from "./client.js";
import { type SignalAdapterEvent, streamSignalEvents } from "./client-adapter.js";
const DEFAULT_RECONNECT_POLICY: BackoffPolicy = {
initialMs: 1_000,
@ -16,7 +16,7 @@ type RunSignalSseLoopParams = {
account?: string;
abortSignal?: AbortSignal;
runtime: RuntimeEnv;
onEvent: (event: SignalSseEvent) => void;
onEvent: (event: SignalAdapterEvent) => void;
policy?: Partial<BackoffPolicy>;
};
@ -47,26 +47,30 @@ export async function runSignalSseLoop({
baseUrl,
account,
abortSignal,
onEvent: (event) => {
onEvent: (event: SignalAdapterEvent) => {
reconnectAttempts = 0;
onEvent(event);
},
logger: {
log: runtime.log,
error: runtime.error,
},
});
if (abortSignal?.aborted) {
return;
}
reconnectAttempts += 1;
const delayMs = computeBackoff(reconnectPolicy, reconnectAttempts);
logReconnectVerbose(`Signal SSE stream ended, reconnecting in ${delayMs / 1000}s...`);
logReconnectVerbose(`Signal stream ended, reconnecting in ${delayMs / 1000}s...`);
await sleepWithAbort(delayMs, abortSignal);
} catch (err) {
if (abortSignal?.aborted) {
return;
}
runtime.error?.(`Signal SSE stream error: ${String(err)}`);
runtime.error?.(`Signal stream error: ${String(err)}`);
reconnectAttempts += 1;
const delayMs = computeBackoff(reconnectPolicy, reconnectAttempts);
runtime.log?.(`Signal SSE connection lost, reconnecting in ${delayMs / 1000}s...`);
runtime.log?.(`Signal connection lost, reconnecting in ${delayMs / 1000}s...`);
try {
await sleepWithAbort(delayMs, abortSignal);
} catch (sleepErr) {

View File

@ -3,6 +3,7 @@ import type { GroupToolPolicyBySenderConfig, GroupToolPolicyConfig } from "./typ
export type SignalReactionNotificationMode = "off" | "own" | "all" | "allowlist";
export type SignalReactionLevel = "off" | "ack" | "minimal" | "extensive";
export type SignalApiMode = "auto" | "native" | "container";
export type SignalGroupConfig = {
requireMention?: boolean;
@ -55,6 +56,14 @@ export type SignalAccountConfig = CommonChannelMessagingConfig & {
};
export type SignalConfig = {
/**
* Signal API mode (channel-global):
* - "auto" (default): Auto-detect based on available endpoints
* - "native": Use native signal-cli with JSON-RPC + SSE (/api/v1/rpc, /api/v1/events)
* - "container": Use bbernhard/signal-cli-rest-api with REST + WebSocket (/v2/send, /v1/receive/{account}).
* Requires the container to run with MODE=json-rpc for real-time message receiving.
*/
apiMode?: SignalApiMode;
/** Optional per-account Signal configuration (multi-account). */
accounts?: Record<string, SignalAccountConfig>;
/** Optional default account id when multiple accounts are configured. */