diff --git a/CHANGELOG.md b/CHANGELOG.md index b6a0ae6d0cf..72931764f67 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -16,6 +16,7 @@ Docs: https://docs.openclaw.ai ### Fixes - Agents/Compaction: centralize exec default resolution in the shared tool factory so per-agent `tools.exec` overrides (host/security/ask/node and related defaults) persist across compaction retries. (#15833) Thanks @napetrov. +- Voice Call: route webhook runtime event handling through shared manager event logic so rejected inbound hangups are idempotent in production, with regression tests for duplicate reject events and provider-call-ID remapping parity. (#15892) Thanks @dcantu96. - CLI/Completion: route plugin-load logs to stderr and write generated completion scripts directly to stdout to avoid `source <(openclaw completion ...)` corruption. (#15481) Thanks @arosstale. - Gateway/Agents: stop injecting a phantom `main` agent into gateway agent listings when `agents.list` explicitly excludes it. (#11450) Thanks @arosstale. - Agents/Heartbeat: stop auto-creating `HEARTBEAT.md` during workspace bootstrap so missing files continue to run heartbeat as documented. (#11766) Thanks @shadril238. diff --git a/extensions/voice-call/src/manager.test.ts b/extensions/voice-call/src/manager.test.ts index e0285a4444a..3ffe9b040a4 100644 --- a/extensions/voice-call/src/manager.test.ts +++ b/extensions/voice-call/src/manager.test.ts @@ -195,6 +195,46 @@ describe("CallManager", () => { expect(provider.hangupCalls[0]?.providerCallId).toBe("provider-suffix"); }); + it("rejects duplicate inbound events with a single hangup call", () => { + const config = VoiceCallConfigSchema.parse({ + enabled: true, + provider: "plivo", + fromNumber: "+15550000000", + inboundPolicy: "disabled", + }); + + const storePath = path.join(os.tmpdir(), `openclaw-voice-call-test-${Date.now()}`); + const provider = new FakeProvider(); + const manager = new CallManager(config, storePath); + manager.initialize(provider, "https://example.com/voice/webhook"); + + manager.processEvent({ + id: "evt-reject-init", + type: "call.initiated", + callId: "provider-dup", + providerCallId: "provider-dup", + timestamp: Date.now(), + direction: "inbound", + from: "+15552222222", + to: "+15550000000", + }); + + manager.processEvent({ + id: "evt-reject-ring", + type: "call.ringing", + callId: "provider-dup", + providerCallId: "provider-dup", + timestamp: Date.now(), + direction: "inbound", + from: "+15552222222", + to: "+15550000000", + }); + + expect(manager.getCallByProviderCallId("provider-dup")).toBeUndefined(); + expect(provider.hangupCalls).toHaveLength(1); + expect(provider.hangupCalls[0]?.providerCallId).toBe("provider-dup"); + }); + it("accepts inbound calls that exactly match the allowlist", () => { const config = VoiceCallConfigSchema.parse({ enabled: true, diff --git a/extensions/voice-call/src/manager.ts b/extensions/voice-call/src/manager.ts index 0cfc9158efa..480e21d70ef 100644 --- a/extensions/voice-call/src/manager.ts +++ b/extensions/voice-call/src/manager.ts @@ -4,13 +4,13 @@ import fsp from "node:fs/promises"; import os from "node:os"; import path from "node:path"; import type { CallMode, VoiceCallConfig } from "./config.js"; +import type { CallManagerContext } from "./manager/context.js"; import type { VoiceCallProvider } from "./providers/base.js"; -import { isAllowlistedCaller, normalizePhoneNumber } from "./allowlist.js"; +import { processEvent as processManagerEvent } from "./manager/events.js"; import { type CallId, type CallRecord, CallRecordSchema, - type CallState, type NormalizedEvent, type OutboundCallOptions, TerminalStates, @@ -44,6 +44,7 @@ export class CallManager { private activeCalls = new Map(); private providerCallIdMap = new Map(); // providerCallId -> internal callId private processedEventIds = new Set(); + private rejectedProviderCallIds = new Set(); private provider: VoiceCallProvider | null = null; private config: VoiceCallConfig; private storePath: string; @@ -282,35 +283,6 @@ export class CallManager { } } - /** - * Start max duration timer for a call. - * Auto-hangup when maxDurationSeconds is reached. - */ - private startMaxDurationTimer(callId: CallId): void { - // Clear any existing timer - this.clearMaxDurationTimer(callId); - - const maxDurationMs = this.config.maxDurationSeconds * 1000; - console.log( - `[voice-call] Starting max duration timer (${this.config.maxDurationSeconds}s) for call ${callId}`, - ); - - const timer = setTimeout(async () => { - this.maxDurationTimers.delete(callId); - const call = this.getCall(callId); - if (call && !TerminalStates.has(call.state)) { - console.log( - `[voice-call] Max duration reached (${this.config.maxDurationSeconds}s), ending call ${callId}`, - ); - call.endReason = "timeout"; - this.persistCallRecord(call); - await this.endCall(callId); - } - }, maxDurationMs); - - this.maxDurationTimers.set(callId, timer); - } - /** * Clear max duration timer for a call. */ @@ -340,15 +312,6 @@ export class CallManager { waiter.reject(new Error(reason)); } - private resolveTranscriptWaiter(callId: CallId, transcript: string): void { - const waiter = this.transcriptWaiters.get(callId); - if (!waiter) { - return; - } - this.clearTranscriptWaiter(callId); - waiter.resolve(transcript); - } - private waitForFinalTranscript(callId: CallId): Promise { // Only allow one in-flight waiter per call. this.rejectTranscriptWaiter(callId, "Transcript waiter replaced"); @@ -458,220 +421,29 @@ export class CallManager { } } - /** - * Check if an inbound call should be accepted based on policy. - */ - private shouldAcceptInbound(from: string | undefined): boolean { - const { inboundPolicy: policy, allowFrom } = this.config; - - switch (policy) { - case "disabled": - console.log("[voice-call] Inbound call rejected: policy is disabled"); - return false; - - case "open": - console.log("[voice-call] Inbound call accepted: policy is open"); - return true; - - case "allowlist": - case "pairing": { - const normalized = normalizePhoneNumber(from); - if (!normalized) { - console.log("[voice-call] Inbound call rejected: missing caller ID"); - return false; - } - const allowed = isAllowlistedCaller(normalized, allowFrom); - const status = allowed ? "accepted" : "rejected"; - console.log( - `[voice-call] Inbound call ${status}: ${from} ${allowed ? "is in" : "not in"} allowlist`, - ); - return allowed; - } - - default: - return false; - } - } - - /** - * Create a call record for an inbound call. - */ - private createInboundCall(providerCallId: string, from: string, to: string): CallRecord { - const callId = crypto.randomUUID(); - - const callRecord: CallRecord = { - callId, - providerCallId, - provider: this.provider?.name || "twilio", - direction: "inbound", - state: "ringing", - from, - to, - startedAt: Date.now(), - transcript: [], - processedEventIds: [], - metadata: { - initialMessage: this.config.inboundGreeting || "Hello! How can I help you today?", + private getContext(): CallManagerContext { + return { + activeCalls: this.activeCalls, + providerCallIdMap: this.providerCallIdMap, + processedEventIds: this.processedEventIds, + rejectedProviderCallIds: this.rejectedProviderCallIds, + onCallAnswered: (call) => { + this.maybeSpeakInitialMessageOnAnswered(call); }, + provider: this.provider, + config: this.config, + storePath: this.storePath, + webhookUrl: this.webhookUrl, + transcriptWaiters: this.transcriptWaiters, + maxDurationTimers: this.maxDurationTimers, }; - - this.activeCalls.set(callId, callRecord); - this.providerCallIdMap.set(providerCallId, callId); // Map providerCallId to internal callId - this.persistCallRecord(callRecord); - - console.log(`[voice-call] Created inbound call record: ${callId} from ${from}`); - return callRecord; - } - - /** - * Look up a call by either internal callId or providerCallId. - */ - private findCall(callIdOrProviderCallId: string): CallRecord | undefined { - // Try direct lookup by internal callId - const directCall = this.activeCalls.get(callIdOrProviderCallId); - if (directCall) { - return directCall; - } - - // Try lookup by providerCallId - return this.getCallByProviderCallId(callIdOrProviderCallId); } /** * Process a webhook event. */ processEvent(event: NormalizedEvent): void { - // Idempotency check - if (this.processedEventIds.has(event.id)) { - return; - } - this.processedEventIds.add(event.id); - - let call = this.findCall(event.callId); - - // Handle inbound calls - create record if it doesn't exist - if (!call && event.direction === "inbound" && event.providerCallId) { - // Check if we should accept this inbound call - if (!this.shouldAcceptInbound(event.from)) { - void this.rejectInboundCall(event); - return; - } - - // Create a new call record for this inbound call - call = this.createInboundCall( - event.providerCallId, - event.from || "unknown", - event.to || this.config.fromNumber || "unknown", - ); - - // Update the event's callId to use our internal ID - event.callId = call.callId; - } - - if (!call) { - // Still no call record - ignore event - return; - } - - // Update provider call ID if we got it - if (event.providerCallId && event.providerCallId !== call.providerCallId) { - const previousProviderCallId = call.providerCallId; - call.providerCallId = event.providerCallId; - this.providerCallIdMap.set(event.providerCallId, call.callId); - if (previousProviderCallId) { - const mapped = this.providerCallIdMap.get(previousProviderCallId); - if (mapped === call.callId) { - this.providerCallIdMap.delete(previousProviderCallId); - } - } - } - - // Track processed event - call.processedEventIds.push(event.id); - - // Process event based on type - switch (event.type) { - case "call.initiated": - this.transitionState(call, "initiated"); - break; - - case "call.ringing": - this.transitionState(call, "ringing"); - break; - - case "call.answered": - call.answeredAt = event.timestamp; - this.transitionState(call, "answered"); - // Start max duration timer when call is answered - this.startMaxDurationTimer(call.callId); - // Best-effort: speak initial message (for inbound greetings and outbound - // conversation mode) once the call is answered. - this.maybeSpeakInitialMessageOnAnswered(call); - break; - - case "call.active": - this.transitionState(call, "active"); - break; - - case "call.speaking": - this.transitionState(call, "speaking"); - break; - - case "call.speech": - if (event.isFinal) { - this.addTranscriptEntry(call, "user", event.transcript); - this.resolveTranscriptWaiter(call.callId, event.transcript); - } - this.transitionState(call, "listening"); - break; - - case "call.ended": - call.endedAt = event.timestamp; - call.endReason = event.reason; - this.transitionState(call, event.reason as CallState); - this.clearMaxDurationTimer(call.callId); - this.rejectTranscriptWaiter(call.callId, `Call ended: ${event.reason}`); - this.activeCalls.delete(call.callId); - if (call.providerCallId) { - this.providerCallIdMap.delete(call.providerCallId); - } - break; - - case "call.error": - if (!event.retryable) { - call.endedAt = event.timestamp; - call.endReason = "error"; - this.transitionState(call, "error"); - this.clearMaxDurationTimer(call.callId); - this.rejectTranscriptWaiter(call.callId, `Call error: ${event.error}`); - this.activeCalls.delete(call.callId); - if (call.providerCallId) { - this.providerCallIdMap.delete(call.providerCallId); - } - } - break; - } - - this.persistCallRecord(call); - } - - private async rejectInboundCall(event: NormalizedEvent): Promise { - if (!this.provider || !event.providerCallId) { - return; - } - const callId = event.callId || event.providerCallId; - try { - await this.provider.hangupCall({ - callId, - providerCallId: event.providerCallId, - reason: "hangup-bot", - }); - } catch (err) { - console.warn( - `[voice-call] Failed to reject inbound call ${event.providerCallId}:`, - err instanceof Error ? err.message : err, - ); - } + processManagerEvent(this.getContext(), event); } private maybeSpeakInitialMessageOnAnswered(call: CallRecord): void { @@ -758,52 +530,6 @@ export class CallManager { return calls; } - // States that can cycle during multi-turn conversations - private static readonly ConversationStates = new Set(["speaking", "listening"]); - - // Non-terminal state order for monotonic transitions - private static readonly StateOrder: readonly CallState[] = [ - "initiated", - "ringing", - "answered", - "active", - "speaking", - "listening", - ]; - - /** - * Transition call state with monotonic enforcement. - */ - private transitionState(call: CallRecord, newState: CallState): void { - // No-op for same state or already terminal - if (call.state === newState || TerminalStates.has(call.state)) { - return; - } - - // Terminal states can always be reached from non-terminal - if (TerminalStates.has(newState)) { - call.state = newState; - return; - } - - // Allow cycling between speaking and listening (multi-turn conversations) - if ( - CallManager.ConversationStates.has(call.state) && - CallManager.ConversationStates.has(newState) - ) { - call.state = newState; - return; - } - - // Only allow forward transitions in state order - const currentIndex = CallManager.StateOrder.indexOf(call.state); - const newIndex = CallManager.StateOrder.indexOf(newState); - - if (newIndex > currentIndex) { - call.state = newState; - } - } - /** * Add an entry to the call transcript. */ diff --git a/extensions/voice-call/src/manager/context.ts b/extensions/voice-call/src/manager/context.ts index 334570ab8c5..6cac6c93225 100644 --- a/extensions/voice-call/src/manager/context.ts +++ b/extensions/voice-call/src/manager/context.ts @@ -12,6 +12,10 @@ export type CallManagerContext = { activeCalls: Map; providerCallIdMap: Map; processedEventIds: Set; + /** Provider call IDs we already sent a reject hangup for; avoids duplicate hangup calls. */ + rejectedProviderCallIds: Set; + /** Optional runtime hook invoked after an event transitions a call into answered state. */ + onCallAnswered?: (call: CallRecord) => void; provider: VoiceCallProvider | null; config: VoiceCallConfig; storePath: string; diff --git a/extensions/voice-call/src/manager/events.test.ts b/extensions/voice-call/src/manager/events.test.ts new file mode 100644 index 00000000000..93707609cf0 --- /dev/null +++ b/extensions/voice-call/src/manager/events.test.ts @@ -0,0 +1,240 @@ +import fs from "node:fs"; +import os from "node:os"; +import path from "node:path"; +import { describe, expect, it } from "vitest"; +import type { HangupCallInput, NormalizedEvent } from "../types.js"; +import type { CallManagerContext } from "./context.js"; +import { VoiceCallConfigSchema } from "../config.js"; +import { processEvent } from "./events.js"; + +function createContext(overrides: Partial = {}): CallManagerContext { + const storePath = path.join(os.tmpdir(), `openclaw-voice-call-events-test-${Date.now()}`); + fs.mkdirSync(storePath, { recursive: true }); + return { + activeCalls: new Map(), + providerCallIdMap: new Map(), + processedEventIds: new Set(), + rejectedProviderCallIds: new Set(), + provider: null, + config: VoiceCallConfigSchema.parse({ + enabled: true, + provider: "plivo", + fromNumber: "+15550000000", + }), + storePath, + webhookUrl: null, + transcriptWaiters: new Map(), + maxDurationTimers: new Map(), + ...overrides, + }; +} + +describe("processEvent (functional)", () => { + it("calls provider hangup when rejecting inbound call", () => { + const hangupCalls: HangupCallInput[] = []; + const provider = { + name: "plivo" as const, + async hangupCall(input: HangupCallInput): Promise { + hangupCalls.push(input); + }, + }; + + const ctx = createContext({ + config: VoiceCallConfigSchema.parse({ + enabled: true, + provider: "plivo", + fromNumber: "+15550000000", + inboundPolicy: "disabled", + }), + provider, + }); + const event: NormalizedEvent = { + id: "evt-1", + type: "call.initiated", + callId: "prov-1", + providerCallId: "prov-1", + timestamp: Date.now(), + direction: "inbound", + from: "+15559999999", + to: "+15550000000", + }; + + processEvent(ctx, event); + + expect(ctx.activeCalls.size).toBe(0); + expect(hangupCalls).toHaveLength(1); + expect(hangupCalls[0]).toEqual({ + callId: "prov-1", + providerCallId: "prov-1", + reason: "hangup-bot", + }); + }); + + it("does not call hangup when provider is null", () => { + const ctx = createContext({ + config: VoiceCallConfigSchema.parse({ + enabled: true, + provider: "plivo", + fromNumber: "+15550000000", + inboundPolicy: "disabled", + }), + provider: null, + }); + const event: NormalizedEvent = { + id: "evt-2", + type: "call.initiated", + callId: "prov-2", + providerCallId: "prov-2", + timestamp: Date.now(), + direction: "inbound", + from: "+15551111111", + to: "+15550000000", + }; + + processEvent(ctx, event); + + expect(ctx.activeCalls.size).toBe(0); + }); + + it("calls hangup only once for duplicate events for same rejected call", () => { + const hangupCalls: HangupCallInput[] = []; + const provider = { + name: "plivo" as const, + async hangupCall(input: HangupCallInput): Promise { + hangupCalls.push(input); + }, + }; + const ctx = createContext({ + config: VoiceCallConfigSchema.parse({ + enabled: true, + provider: "plivo", + fromNumber: "+15550000000", + inboundPolicy: "disabled", + }), + provider, + }); + const event1: NormalizedEvent = { + id: "evt-init", + type: "call.initiated", + callId: "prov-dup", + providerCallId: "prov-dup", + timestamp: Date.now(), + direction: "inbound", + from: "+15552222222", + to: "+15550000000", + }; + const event2: NormalizedEvent = { + id: "evt-ring", + type: "call.ringing", + callId: "prov-dup", + providerCallId: "prov-dup", + timestamp: Date.now(), + direction: "inbound", + from: "+15552222222", + to: "+15550000000", + }; + + processEvent(ctx, event1); + processEvent(ctx, event2); + + expect(ctx.activeCalls.size).toBe(0); + expect(hangupCalls).toHaveLength(1); + expect(hangupCalls[0]?.providerCallId).toBe("prov-dup"); + }); + + it("updates providerCallId map when provider ID changes", () => { + const now = Date.now(); + const ctx = createContext(); + ctx.activeCalls.set("call-1", { + callId: "call-1", + providerCallId: "request-uuid", + provider: "plivo", + direction: "outbound", + state: "initiated", + from: "+15550000000", + to: "+15550000001", + startedAt: now, + transcript: [], + processedEventIds: [], + metadata: {}, + }); + ctx.providerCallIdMap.set("request-uuid", "call-1"); + + processEvent(ctx, { + id: "evt-provider-id-change", + type: "call.answered", + callId: "call-1", + providerCallId: "call-uuid", + timestamp: now + 1, + }); + + expect(ctx.activeCalls.get("call-1")?.providerCallId).toBe("call-uuid"); + expect(ctx.providerCallIdMap.get("call-uuid")).toBe("call-1"); + expect(ctx.providerCallIdMap.has("request-uuid")).toBe(false); + }); + + it("invokes onCallAnswered hook for answered events", () => { + const now = Date.now(); + let answeredCallId: string | null = null; + const ctx = createContext({ + onCallAnswered: (call) => { + answeredCallId = call.callId; + }, + }); + ctx.activeCalls.set("call-2", { + callId: "call-2", + providerCallId: "call-2-provider", + provider: "plivo", + direction: "inbound", + state: "ringing", + from: "+15550000002", + to: "+15550000000", + startedAt: now, + transcript: [], + processedEventIds: [], + metadata: {}, + }); + ctx.providerCallIdMap.set("call-2-provider", "call-2"); + + processEvent(ctx, { + id: "evt-answered-hook", + type: "call.answered", + callId: "call-2", + providerCallId: "call-2-provider", + timestamp: now + 1, + }); + + expect(answeredCallId).toBe("call-2"); + }); + + it("when hangup throws, logs and does not throw", () => { + const provider = { + name: "plivo" as const, + async hangupCall(): Promise { + throw new Error("provider down"); + }, + }; + const ctx = createContext({ + config: VoiceCallConfigSchema.parse({ + enabled: true, + provider: "plivo", + fromNumber: "+15550000000", + inboundPolicy: "disabled", + }), + provider, + }); + const event: NormalizedEvent = { + id: "evt-fail", + type: "call.initiated", + callId: "prov-fail", + providerCallId: "prov-fail", + timestamp: Date.now(), + direction: "inbound", + from: "+15553333333", + to: "+15550000000", + }; + + expect(() => processEvent(ctx, event)).not.toThrow(); + expect(ctx.activeCalls.size).toBe(0); + }); +}); diff --git a/extensions/voice-call/src/manager/events.ts b/extensions/voice-call/src/manager/events.ts index 3ebc8423eff..2fb3639250e 100644 --- a/extensions/voice-call/src/manager/events.ts +++ b/extensions/voice-call/src/manager/events.ts @@ -94,7 +94,29 @@ export function processEvent(ctx: CallManagerContext, event: NormalizedEvent): v if (!call && event.direction === "inbound" && event.providerCallId) { if (!shouldAcceptInbound(ctx.config, event.from)) { - // TODO: Could hang up the call here. + const pid = event.providerCallId; + if (!ctx.provider) { + console.warn( + `[voice-call] Inbound call rejected by policy but no provider to hang up (providerCallId: ${pid}, from: ${event.from}); call will time out on provider side.`, + ); + return; + } + if (ctx.rejectedProviderCallIds.has(pid)) { + return; + } + ctx.rejectedProviderCallIds.add(pid); + const callId = event.callId ?? pid; + console.log(`[voice-call] Rejecting inbound call by policy: ${pid}`); + void ctx.provider + .hangupCall({ + callId, + providerCallId: pid, + reason: "hangup-bot", + }) + .catch((err) => { + const message = err instanceof Error ? err.message : String(err); + console.warn(`[voice-call] Failed to reject inbound call ${pid}:`, message); + }); return; } @@ -113,9 +135,16 @@ export function processEvent(ctx: CallManagerContext, event: NormalizedEvent): v return; } - if (event.providerCallId && !call.providerCallId) { + if (event.providerCallId && event.providerCallId !== call.providerCallId) { + const previousProviderCallId = call.providerCallId; call.providerCallId = event.providerCallId; ctx.providerCallIdMap.set(event.providerCallId, call.callId); + if (previousProviderCallId) { + const mapped = ctx.providerCallIdMap.get(previousProviderCallId); + if (mapped === call.callId) { + ctx.providerCallIdMap.delete(previousProviderCallId); + } + } } call.processedEventIds.push(event.id); @@ -139,6 +168,7 @@ export function processEvent(ctx: CallManagerContext, event: NormalizedEvent): v await endCall(ctx, callId); }, }); + ctx.onCallAnswered?.(call); break; case "call.active": diff --git a/extensions/voice-call/src/manager/store.ts b/extensions/voice-call/src/manager/store.ts index 888381c3342..a15edaa8277 100644 --- a/extensions/voice-call/src/manager/store.ts +++ b/extensions/voice-call/src/manager/store.ts @@ -16,6 +16,7 @@ export function loadActiveCallsFromStore(storePath: string): { activeCalls: Map; providerCallIdMap: Map; processedEventIds: Set; + rejectedProviderCallIds: Set; } { const logPath = path.join(storePath, "calls.jsonl"); if (!fs.existsSync(logPath)) { @@ -23,6 +24,7 @@ export function loadActiveCallsFromStore(storePath: string): { activeCalls: new Map(), providerCallIdMap: new Map(), processedEventIds: new Set(), + rejectedProviderCallIds: new Set(), }; } @@ -45,6 +47,7 @@ export function loadActiveCallsFromStore(storePath: string): { const activeCalls = new Map(); const providerCallIdMap = new Map(); const processedEventIds = new Set(); + const rejectedProviderCallIds = new Set(); for (const [callId, call] of callMap) { if (TerminalStates.has(call.state)) { @@ -59,7 +62,7 @@ export function loadActiveCallsFromStore(storePath: string): { } } - return { activeCalls, providerCallIdMap, processedEventIds }; + return { activeCalls, providerCallIdMap, processedEventIds, rejectedProviderCallIds }; } export async function getCallHistoryFromStore(