diff --git a/Dockerfile b/Dockerfile
index 72c413ebe7b..dd703988b11 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -88,7 +88,7 @@ RUN pnpm canvas:a2ui:bundle || \
     (echo "/* A2UI bundle unavailable in this build */" > src/canvas-host/a2ui/a2ui.bundle.js && \
      echo "stub" > src/canvas-host/a2ui/.bundle.hash && \
      rm -rf vendor/a2ui apps/shared/OpenClawKit/Tools/CanvasA2UI)
-RUN pnpm build:docker
+RUN NODE_OPTIONS=--max-old-space-size=2048 pnpm build:docker
 # Force pnpm for UI build (Bun may fail on ARM/Synology architectures)
 ENV OPENCLAW_PREFER_PNPM=1
 RUN pnpm ui:build
diff --git a/docker-compose.yml b/docker-compose.yml
index c0bffc64458..53b4726b556 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -24,6 +24,7 @@ services:
     ports:
       - "${OPENCLAW_GATEWAY_PORT:-18789}:18789"
       - "${OPENCLAW_BRIDGE_PORT:-18790}:18790"
+      - "${OPENCLAW_VOICE_WEBHOOK_PORT:-3334}:3334"
     init: true
     restart: unless-stopped
     command:
diff --git a/extensions/voice-call/openclaw.plugin.json b/extensions/voice-call/openclaw.plugin.json
index fef3ccc6ad9..83f2613530d 100644
--- a/extensions/voice-call/openclaw.plugin.json
+++ b/extensions/voice-call/openclaw.plugin.json
@@ -134,6 +134,36 @@
       "label": "ElevenLabs Base URL",
       "advanced": true
     },
+    "realtime.enabled": {
+      "label": "Enable Realtime Voice (OpenAI)",
+      "help": "Full voice-to-voice mode via OpenAI Realtime API. Requires inboundPolicy open or allowlist."
+    },
+    "realtime.model": {
+      "label": "Realtime Model",
+      "advanced": true
+    },
+    "realtime.voice": {
+      "label": "Realtime Voice"
+    },
+    "realtime.instructions": {
+      "label": "Realtime Instructions"
+    },
+    "realtime.temperature": {
+      "label": "Realtime Temperature",
+      "advanced": true
+    },
+    "realtime.vadThreshold": {
+      "label": "VAD Sensitivity",
+      "advanced": true
+    },
+    "realtime.silenceDurationMs": {
+      "label": "Silence Timeout (ms)",
+      "advanced": true
+    },
+    "realtime.prefixPaddingMs": {
+      "label": "Prefix Padding (ms)",
+      "advanced": true
+    },
     "publicUrl": {
       "label": "Public Webhook URL",
       "advanced": true
@@ -385,6 +415,49 @@
       }
     }
   },
+  "realtime": {
+    "type": "object",
+    "additionalProperties": false,
+    "properties": {
+      "enabled": {
+        "type": "boolean"
+      },
+      "model": {
+        "type": "string"
+      },
+      "voice": {
+        "type": "string",
+        "enum": ["alloy", "ash", "ballad", "cedar", "coral", "echo", "marin", "sage", "shimmer", "verse"]
+      },
+      "instructions": {
+        "type": "string"
+      },
+      "temperature": {
+        "type": "number",
+        "minimum": 0,
+        "maximum": 2
+      },
+      "vadThreshold": {
+        "type": "number",
+        "minimum": 0,
+        "maximum": 1
+      },
+      "silenceDurationMs": {
+        "type": "integer",
+        "minimum": 1
+      },
+      "prefixPaddingMs": {
+        "type": "integer",
+        "minimum": 0
+      },
+      "tools": {
+        "type": "array",
+        "items": {
+          "type": "object"
+        }
+      }
+    }
+  },
   "publicUrl": {
     "type": "string"
   },
diff --git a/extensions/voice-call/src/config.ts b/extensions/voice-call/src/config.ts
index 2d1494c7876..7d75c23ac29 100644
--- a/extensions/voice-call/src/config.ts
+++ b/extensions/voice-call/src/config.ts
@@ -200,6 +200,55 @@ export const OutboundConfigSchema = z
   .default({ defaultMode: "notify", notifyHangupDelaySec: 3 });
 export type OutboundConfig = z.infer<typeof OutboundConfigSchema>;
 
+// -----------------------------------------------------------------------------
+// Realtime Voice Configuration (OpenAI Realtime API — voice-to-voice)
+// -----------------------------------------------------------------------------
+
+/**
+ * Zod schema for a single OpenAI Realtime tool (mirrors the RealtimeTool interface
+ * in providers/openai-realtime-voice.ts, expressed as a schema
+ * for config parsing).
+ */
+export const RealtimeToolSchema = z
+  .object({
+    type: z.literal("function"),
+    name: z.string(),
+    description: z.string(),
+    parameters: z.object({
+      type: z.literal("object"),
+      properties: z.record(z.unknown()),
+      required: z.array(z.string()).optional(),
+    }),
+  })
+  .strict();
+export type RealtimeToolConfig = z.infer<typeof RealtimeToolSchema>;
+
+export const VoiceCallRealtimeConfigSchema = z
+  .object({
+    /** Enable realtime voice-to-voice mode (OpenAI Realtime API). Default: false. */
+    enabled: z.boolean().default(false),
+    /** Realtime model (env: REALTIME_VOICE_MODEL, default: "gpt-4o-mini-realtime-preview") */
+    model: z.string().optional(),
+    /** Voice for AI speech output (env: REALTIME_VOICE_VOICE) */
+    voice: z
+      .enum(["alloy", "ash", "ballad", "cedar", "coral", "echo", "marin", "sage", "shimmer", "verse"])
+      .optional(),
+    /** System instructions / persona (env: REALTIME_VOICE_INSTRUCTIONS) */
+    instructions: z.string().optional(),
+    /** Temperature 0–2 (env: REALTIME_VOICE_TEMPERATURE) */
+    temperature: z.number().min(0).max(2).optional(),
+    /** VAD threshold 0–1 (env: VAD_THRESHOLD) */
+    vadThreshold: z.number().min(0).max(1).optional(),
+    /** Silence duration in ms before turn ends (env: SILENCE_DURATION_MS) */
+    silenceDurationMs: z.number().int().positive().optional(),
+    /** Audio padding before speech in ms */
+    prefixPaddingMs: z.number().int().nonnegative().optional(),
+    /** Tool definitions (OpenAI function-call schema); execution wired via registerToolHandler */
+    tools: z.array(RealtimeToolSchema).default([]),
+  })
+  .strict()
+  .default({ enabled: false, tools: [] });
+export type VoiceCallRealtimeConfig = z.infer<typeof VoiceCallRealtimeConfigSchema>;
+
 // -----------------------------------------------------------------------------
 // Streaming Configuration (OpenAI Realtime STT)
 // -----------------------------------------------------------------------------
@@ -324,6 +373,9 @@ export const VoiceCallConfigSchema = z
     /** Real-time audio streaming configuration */
     streaming: VoiceCallStreamingConfigSchema,
 
+    /** Realtime voice-to-voice configuration (OpenAI Realtime API) */
+    realtime: VoiceCallRealtimeConfigSchema,
+
     /** Public webhook URL override (if set, bypasses tunnel auto-detection) */
     publicUrl: z.string().url().optional(),
 
@@ -398,6 +450,11 @@ export function normalizeVoiceCallConfig(config: VoiceCallConfigInput): VoiceCal
         config.webhookSecurity?.trustedProxyIPs ?? defaults.webhookSecurity.trustedProxyIPs,
     },
     streaming: { ...defaults.streaming, ...config.streaming },
+    realtime: {
+      ...defaults.realtime,
+      ...config.realtime,
+      tools: config.realtime?.tools ?? defaults.realtime.tools,
+    },
     stt: { ...defaults.stt, ...config.stt },
     tts: normalizeVoiceCallTtsConfig(defaults.tts, config.tts),
   };
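
To make the defaulting behavior concrete, here is a minimal sketch of how the new schema composes (the literal values are illustrative, not part of this patch):

```typescript
import { VoiceCallRealtimeConfigSchema } from "./config.js";

// Omitted fields stay undefined, so resolveVoiceCallConfig() can still fall
// back to the REALTIME_VOICE_* env vars later; `tools` always defaults to [].
const realtime = VoiceCallRealtimeConfigSchema.parse({
  enabled: true,
  voice: "marin",
  silenceDurationMs: 700,
});
// → { enabled: true, voice: "marin", silenceDurationMs: 700, tools: [] }

// .strict() rejects unknown keys, matching "additionalProperties": false
// in openclaw.plugin.json:
VoiceCallRealtimeConfigSchema.parse({ enabled: true, foo: 1 }); // throws ZodError
```
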
@@ -453,6 +510,28 @@ export function resolveVoiceCallConfig(config: VoiceCallConfigInput): VoiceCallC
     resolved.webhookSecurity.trustForwardingHeaders ?? false;
   resolved.webhookSecurity.trustedProxyIPs = resolved.webhookSecurity.trustedProxyIPs ?? [];
 
+  // Realtime voice — resolve env var fallbacks
+  resolved.realtime = { ...resolved.realtime };
+  // REALTIME_VOICE_ENABLED=true auto-enables realtime mode (backward compat)
+  if (!resolved.realtime.enabled && process.env.REALTIME_VOICE_ENABLED === "true") {
+    resolved.realtime.enabled = true;
+  }
+  resolved.realtime.model = resolved.realtime.model ?? process.env.REALTIME_VOICE_MODEL;
+  resolved.realtime.voice =
+    (resolved.realtime.voice ??
+      (process.env.REALTIME_VOICE_VOICE as VoiceCallRealtimeConfig["voice"])) || undefined;
+  resolved.realtime.instructions =
+    resolved.realtime.instructions ?? process.env.REALTIME_VOICE_INSTRUCTIONS;
+  if (resolved.realtime.temperature == null && process.env.REALTIME_VOICE_TEMPERATURE) {
+    resolved.realtime.temperature = parseFloat(process.env.REALTIME_VOICE_TEMPERATURE);
+  }
+  if (resolved.realtime.vadThreshold == null && process.env.VAD_THRESHOLD) {
+    resolved.realtime.vadThreshold = parseFloat(process.env.VAD_THRESHOLD);
+  }
+  if (resolved.realtime.silenceDurationMs == null && process.env.SILENCE_DURATION_MS) {
+    resolved.realtime.silenceDurationMs = parseInt(process.env.SILENCE_DURATION_MS, 10);
+  }
+
   return normalizeVoiceCallConfig(resolved);
 }
 
@@ -521,5 +600,24 @@ export function validateProviderConfig(config: VoiceCallConfig): {
     }
   }
 
+  // Realtime mode requires inbound calls to be accepted — policy "disabled"
+  // means the manager will reject every call before it can be tracked.
+  // "open" or "allowlist" are the correct choices when realtime.enabled = true.
+  if (config.realtime?.enabled && config.inboundPolicy === "disabled") {
+    errors.push(
+      "plugins.entries.voice-call.config.inboundPolicy must not be \"disabled\" when realtime.enabled is true " +
+        "(use \"open\" or \"allowlist\" — realtime calls are answered before policy can reject them)",
+    );
+  }
+
+  // Both streaming and realtime cannot be enabled simultaneously — they use
+  // incompatible WebSocket paths and audio routing.
+  if (config.realtime?.enabled && config.streaming?.enabled) {
+    errors.push(
+      "plugins.entries.voice-call.config: realtime.enabled and streaming.enabled cannot both be true " +
+        "(they use incompatible audio paths — choose one mode)",
+    );
+  }
+
   return { valid: errors.length === 0, errors };
 }
diff --git a/extensions/voice-call/src/providers/openai-realtime-voice.ts b/extensions/voice-call/src/providers/openai-realtime-voice.ts
new file mode 100755
index 00000000000..ed5b10d6cd6
--- /dev/null
+++ b/extensions/voice-call/src/providers/openai-realtime-voice.ts
@@ -0,0 +1,863 @@
+/**
+ * OpenAI Realtime Voice Bridge
+ *
+ * Implements a bidirectional voice-to-voice bridge between Twilio Media Streams
+ * and the OpenAI Realtime API. Replaces the STT → LLM → TTS pipeline with a
+ * single WebSocket session that handles everything natively.
+ *
+ * Key benefits over the STT-only approach:
+ * - Latency: ~200–400 ms TTFB vs ~1–3.5 s in the pipeline mode
+ * - Audio format: g711_ulaw (mulaw) is natively supported — zero conversion
+ * - Barge-in: server VAD handles interruptions automatically
+ * - No separate LLM or TTS call required
+ *
+ * Usage:
+ *   const bridge = new OpenAIRealtimeVoiceBridge({
+ *     apiKey: process.env.OPENAI_API_KEY!,
+ *     instructions: "You are Gracie, a helpful AI assistant...",
+ *     voice: "alloy",
+ *     onAudio: (muLaw) => mediaStreamHandler.sendAudio(streamSid, muLaw),
+ *     onClearAudio: () => mediaStreamHandler.clearAudio(streamSid),
+ *     onTranscript: (role, text) => console.log(`[${role}]: ${text}`),
+ *   });
+ *   await bridge.connect();
+ *   bridge.sendAudio(twilioPayloadBuffer);
+ *   bridge.close();
+ *
+ * Integration with media-stream.ts:
+ *   Replace `sttSession` with this bridge in `MediaStreamHandler.handleStart()`.
+ *   Wire audio in/out through the existing sendAudio / clearAudio methods.
+ *
+ * @see https://platform.openai.com/docs/guides/realtime
+ * @see https://www.twilio.com/docs/voice/media-streams
+ */
+
+import WebSocket from "ws";
+
+// ---------------------------------------------------------------------------
+// Types
+// ---------------------------------------------------------------------------
+
+/** OpenAI Realtime API voice options */
+export type RealtimeVoice =
+  | "alloy"
+  | "ash"
+  | "ballad"
+  | "cedar"
+  | "coral"
+  | "echo"
+  | "marin"
+  | "sage"
+  | "shimmer"
+  | "verse";
+
+/** Realtime tool definition (mirrors OpenAI function calling schema) */
+export interface RealtimeTool {
+  type: "function";
+  name: string;
+  description: string;
+  parameters: {
+    type: "object";
+    properties: Record<string, unknown>;
+    required?: string[];
+  };
+}
+
+/** Tool call event emitted when OpenAI invokes a function */
+export interface ToolCallEvent {
+  /** Conversation item ID for submitting the result */
+  itemId: string;
+  /** Call ID for matching request/response */
+  callId: string;
+  /** Function name */
+  name: string;
+  /** Parsed JSON arguments */
+  args: unknown;
+}
+
+/**
+ * Configuration for the Realtime Voice Bridge.
+ */
+export interface RealtimeVoiceConfig {
+  // ---- Required ----
+  /** OpenAI API key */
+  apiKey: string;
+
+  // ---- Voice/personality ----
+  /** System instructions (persona / behaviour) */
+  instructions?: string;
+  /** Voice to use for AI speech output (default: "alloy") */
+  voice?: RealtimeVoice;
+  /** Response temperature 0–2 (default: 0.8) */
+  temperature?: number;
+
+  // ---- Model ----
+  /** Realtime model (default: "gpt-4o-mini-realtime-preview") */
+  model?: string;
+
+  // ---- VAD ----
+  /** VAD speech detection threshold 0–1 (default: 0.5) */
+  vadThreshold?: number;
+  /** Silence duration in ms before turn ends (default: 500) */
+  silenceDurationMs?: number;
+  /** Padding before speech in ms (default: 300) */
+  prefixPaddingMs?: number;
+
+  // ---- Tools ----
+  /** Optional function tools the model can call */
+  tools?: RealtimeTool[];
+
+  // ---- Audio callbacks ----
+  /**
+   * Called for each audio delta chunk from OpenAI.
+   * @param muLaw - Raw mulaw Buffer ready to send to Twilio
+   */
+  onAudio: (muLaw: Buffer) => void;
+
+  /**
+   * Called on barge-in (user speech detected mid-response).
+   * Clear Twilio audio buffer here.
+   */
+  onClearAudio: () => void;
+
+  // ---- Event callbacks (optional) ----
+  /**
+   * Transcript event (partial or final). Role is "user" or "assistant".
+   */
+  onTranscript?: (role: "user" | "assistant", text: string, isFinal: boolean) => void;
+
+  /**
+   * Called when the model invokes a tool/function.
+   * Your handler should call `bridge.submitToolResult(event.callId, result)`.
+   */
+  onToolCall?: (event: ToolCallEvent) => void;
+
+  /**
+   * Called when the session is fully connected and configured.
+   */
+  onReady?: () => void;
+
+  /**
+   * Called on irrecoverable error or max reconnects exceeded.
+   */
+  onError?: (error: Error) => void;
+
+  /**
+   * Called when the bridge closes (intentionally or after max retries).
+   */
+  onClose?: () => void;
+}
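
To make the config shape concrete, a minimal sketch of constructing a bridge with one function tool (the `get_weather` tool and its values are illustrative, not part of this patch):

```typescript
import { OpenAIRealtimeVoiceBridge, type RealtimeTool } from "./openai-realtime-voice.js";

const weatherTool: RealtimeTool = {
  type: "function",
  name: "get_weather",
  description: "Look up current weather for a city",
  parameters: {
    type: "object",
    properties: { city: { type: "string" } },
    required: ["city"],
  },
};

const bridge = new OpenAIRealtimeVoiceBridge({
  apiKey: process.env.OPENAI_API_KEY!,
  voice: "alloy",
  tools: [weatherTool],
  onAudio: (muLaw) => { /* forward the mulaw chunk to the Twilio stream */ },
  onClearAudio: () => { /* clear Twilio's buffered audio on barge-in */ },
  onTranscript: (role, text, isFinal) => {
    if (isFinal) console.log(`[${role}] ${text}`);
  },
});
await bridge.connect();
```
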
+ */ + onClose?: () => void; +} + +// --------------------------------------------------------------------------- +// Internal helpers +// --------------------------------------------------------------------------- + +function base64ToBuffer(b64: string): Buffer { + return Buffer.from(b64, "base64"); +} + +// --------------------------------------------------------------------------- +// Main class +// --------------------------------------------------------------------------- + +/** + * Bidirectional voice bridge between Twilio Media Streams and the OpenAI Realtime API. + * + * Lifecycle: + * new OpenAIRealtimeVoiceBridge(config) + * → connect() — opens WebSocket, configures session + * → sendAudio() — called for each Twilio media chunk + * → [callbacks fire as OpenAI responds] + * → close() — graceful shutdown + */ +export class OpenAIRealtimeVoiceBridge { + private static readonly DEFAULT_MODEL = "gpt-4o-mini-realtime-preview"; + private static readonly MAX_RECONNECT_ATTEMPTS = 5; + private static readonly BASE_RECONNECT_DELAY_MS = 1000; + private static readonly CONNECT_TIMEOUT_MS = 10_000; + + private readonly config: RealtimeVoiceConfig; + private readonly model: string; + + private ws: WebSocket | null = null; + private connected = false; + private intentionallyClosed = false; + private reconnectAttempts = 0; + + /** Pending audio buffers queued while reconnecting */ + private pendingAudio: Buffer[] = []; + + /** Track mark queue for barge-in timing (mirrors reference impl) */ + private markQueue: string[] = []; + private responseStartTimestamp: number | null = null; + private latestMediaTimestamp = 0; + private lastAssistantItemId: string | null = null; + + /** Accumulate tool call arguments (streamed as deltas) */ + private toolCallBuffers = new Map(); + + constructor(config: RealtimeVoiceConfig) { + if (!config.apiKey) { + throw new Error("[RealtimeVoice] OpenAI API key is required"); + } + this.config = config; + this.model = config.model ?? OpenAIRealtimeVoiceBridge.DEFAULT_MODEL; + } + + // ------------------------------------------------------------------------- + // Public API + // ------------------------------------------------------------------------- + + /** + * Connect to the OpenAI Realtime API. + * Resolves when the WebSocket is open and session.update has been sent. + * Throws if connection times out. + */ + async connect(): Promise { + this.intentionallyClosed = false; + this.reconnectAttempts = 0; + return this.doConnect(); + } + + /** + * Send a mulaw audio chunk from Twilio to OpenAI. + * Buffers chunks if not yet connected; drains on reconnect. + * + * @param audio - Raw mulaw Buffer (Twilio media.payload decoded from base64) + */ + sendAudio(audio: Buffer): void { + if (!this.connected || this.ws?.readyState !== WebSocket.OPEN) { + // Buffer up to 2 seconds of audio (~320 chunks × 20ms) while reconnecting + if (this.pendingAudio.length < 320) { + this.pendingAudio.push(audio); + } + return; + } + this.sendEvent({ + type: "input_audio_buffer.append", + audio: audio.toString("base64"), + }); + } + + /** + * Update the media timestamp (used for barge-in truncation calculations). + * Call this with data.media.timestamp from each Twilio media event. + */ + setMediaTimestamp(ts: number): void { + this.latestMediaTimestamp = ts; + } + + /** + * Inject a user text message into the conversation (optional). + * Useful for seeding context or simulating a greeting trigger. 
+ */ + sendUserMessage(text: string): void { + this.sendEvent({ + type: "conversation.item.create", + item: { + type: "message", + role: "user", + content: [{ type: "input_text", text }], + }, + }); + this.sendEvent({ type: "response.create" }); + } + + /** + * Submit a tool/function result back to the model. + * Must be called in response to an `onToolCall` event. + * + * @param callId - The call_id from the ToolCallEvent + * @param result - JSON-serializable result value + */ + submitToolResult(callId: string, result: unknown): void { + this.sendEvent({ + type: "conversation.item.create", + item: { + type: "function_call_output", + call_id: callId, + output: JSON.stringify(result), + }, + }); + // Trigger AI to respond now that the tool result is available + this.sendEvent({ type: "response.create" }); + } + + /** + * Gracefully close the bridge. + */ + close(): void { + this.intentionallyClosed = true; + this.connected = false; + if (this.ws) { + this.ws.close(1000, "Bridge closed"); + this.ws = null; + } + this.config.onClose?.(); + } + + /** True if the WebSocket is open and the session is configured. */ + isConnected(): boolean { + return this.connected; + } + + // ------------------------------------------------------------------------- + // Connection management + // ------------------------------------------------------------------------- + + private async doConnect(): Promise { + return new Promise((resolve, reject) => { + const url = `wss://api.openai.com/v1/realtime?model=${encodeURIComponent(this.model)}`; + + console.log(`[RealtimeVoice] Connecting to ${url}`); + + this.ws = new WebSocket(url, { + headers: { + Authorization: `Bearer ${this.config.apiKey}`, + "OpenAI-Beta": "realtime=v1", + }, + }); + + const connectTimeout = setTimeout(() => { + if (!this.connected) { + this.ws?.terminate(); + reject(new Error("[RealtimeVoice] Connection timeout")); + } + }, OpenAIRealtimeVoiceBridge.CONNECT_TIMEOUT_MS); + + this.ws.on("open", () => { + clearTimeout(connectTimeout); + console.log("[RealtimeVoice] WebSocket connected"); + this.connected = true; + this.reconnectAttempts = 0; + + // Small delay to ensure the server is ready before sending session.update + // (mirrors the reference implementation's setTimeout(initializeSession, 100)) + setTimeout(() => { + this.sendSessionUpdate(); + this.drainPendingAudio(); + this.config.onReady?.(); + resolve(); + }, 100); + }); + + this.ws.on("message", (data: Buffer) => { + try { + const event = JSON.parse(data.toString()) as RealtimeEvent; + this.handleEvent(event); + } catch (err) { + console.error("[RealtimeVoice] Failed to parse event:", err); + } + }); + + this.ws.on("error", (err) => { + console.error("[RealtimeVoice] WebSocket error:", err); + if (!this.connected) { + clearTimeout(connectTimeout); + reject(err); + } else { + this.config.onError?.(err instanceof Error ? err : new Error(String(err))); + } + }); + + this.ws.on("close", (code, reason) => { + console.log( + `[RealtimeVoice] WebSocket closed (code: ${code}, reason: ${reason?.toString() || "none"})`, + ); + this.connected = false; + this.ws = null; + + if (!this.intentionallyClosed) { + void this.attemptReconnect(); + } else { + this.config.onClose?.(); + } + }); + }); + } + + /** + * Trigger a greeting response from the AI. + * Useful for seeding context or simulating a greeting trigger. 
+ */ + public triggerGreeting(): void { + if (!this.connected || !this.ws) { + console.warn("[RealtimeVoice] Cannot trigger greeting: not connected"); + return; + } + const greetingEvent = { + type: "response.create", + response: { + instructions: this.config.instructions, + }, + }; + this.sendEvent(greetingEvent); + console.log("[RealtimeVoice] Greeting triggered"); + } + + private sendSessionUpdate(): void { + const cfg = this.config; + + const sessionUpdate: RealtimeSessionUpdate = { + type: "session.update", + session: { + modalities: ["text", "audio"], + instructions: cfg.instructions, + voice: cfg.voice ?? "nova", + input_audio_format: "g711_ulaw", + output_audio_format: "g711_ulaw", + input_audio_transcription: { + model: "whisper-1", + }, + turn_detection: { + type: "server_vad", + threshold: cfg.vadThreshold ?? 0.5, + prefix_padding_ms: cfg.prefixPaddingMs ?? 300, + silence_duration_ms: cfg.silenceDurationMs ?? 500, + create_response: true, + }, + temperature: cfg.temperature ?? 0.8, + ...(cfg.tools && cfg.tools.length > 0 + ? { + tools: cfg.tools, + tool_choice: "auto", + } + : {}), + }, + }; + + console.log("[RealtimeVoice] Sending session.update"); + this.sendEvent(sessionUpdate); + } + + private drainPendingAudio(): void { + if (this.pendingAudio.length === 0) return; + console.log(`[RealtimeVoice] Draining ${this.pendingAudio.length} buffered audio chunks`); + for (const buf of this.pendingAudio) { + this.sendEvent({ + type: "input_audio_buffer.append", + audio: buf.toString("base64"), + }); + } + this.pendingAudio = []; + } + + private async attemptReconnect(): Promise { + if (this.intentionallyClosed) return; + + if ( + this.reconnectAttempts >= OpenAIRealtimeVoiceBridge.MAX_RECONNECT_ATTEMPTS + ) { + const err = new Error( + `[RealtimeVoice] Max reconnect attempts (${OpenAIRealtimeVoiceBridge.MAX_RECONNECT_ATTEMPTS}) exceeded`, + ); + console.error(err.message); + this.config.onError?.(err); + this.config.onClose?.(); + return; + } + + this.reconnectAttempts++; + const delay = + OpenAIRealtimeVoiceBridge.BASE_RECONNECT_DELAY_MS * + 2 ** (this.reconnectAttempts - 1); + + console.log( + `[RealtimeVoice] Reconnecting (${this.reconnectAttempts}/${OpenAIRealtimeVoiceBridge.MAX_RECONNECT_ATTEMPTS}) in ${delay}ms...`, + ); + + await new Promise((resolve) => setTimeout(resolve, delay)); + + if (this.intentionallyClosed) return; + + try { + await this.doConnect(); + console.log("[RealtimeVoice] Reconnected successfully"); + } catch (err) { + console.error("[RealtimeVoice] Reconnect failed:", err); + // doConnect's close handler will call attemptReconnect again + } + } + + // ------------------------------------------------------------------------- + // Event handling + // ------------------------------------------------------------------------- + + private handleEvent(event: RealtimeEvent): void { + switch (event.type) { + // ---- Session lifecycle ---- + case "session.created": + console.log("[RealtimeVoice] Session created"); + break; + + case "session.updated": + console.log("[RealtimeVoice] Session updated"); + break; + + // ---- Audio output: stream audio back to Twilio ---- + case "response.audio.delta": { + if (!event.delta) break; + + const audioBuffer = base64ToBuffer(event.delta); + this.config.onAudio(audioBuffer); + + // Track response start timestamp for barge-in truncation + if (this.responseStartTimestamp === null) { + this.responseStartTimestamp = this.latestMediaTimestamp; + } + + // Track the most recent assistant item ID + if (event.item_id) { + 
+          this.lastAssistantItemId = event.item_id;
+        }
+
+        // Send mark to track playback position
+        this.sendMark();
+        break;
+      }
+
+      case "response.audio.done":
+        console.log("[RealtimeVoice] Audio response complete");
+        break;
+
+      // ---- Barge-in: user started speaking, interrupt AI response ----
+      case "input_audio_buffer.speech_started":
+        console.log("[RealtimeVoice] Barge-in detected — clearing audio");
+        this.handleBargein();
+        break;
+
+      case "input_audio_buffer.speech_stopped":
+        console.log("[RealtimeVoice] Speech stopped");
+        break;
+
+      case "input_audio_buffer.committed":
+        console.log("[RealtimeVoice] Audio buffer committed");
+        break;
+
+      // ---- Assistant speech transcript (streamed) ----
+      case "response.audio_transcript.delta":
+        // AI speech transcript streaming (not an event we send to Twilio)
+        if (event.delta) {
+          this.config.onTranscript?.("assistant", event.delta, false);
+        }
+        break;
+
+      case "response.audio_transcript.done":
+        if (event.transcript) {
+          console.log(`[RealtimeVoice] Assistant: ${event.transcript}`);
+          this.config.onTranscript?.("assistant", event.transcript, true);
+        }
+        break;
+
+      // ---- User speech transcription (if text modality enabled) ----
+      case "conversation.item.input_audio_transcription.completed":
+        if (event.transcript) {
+          console.log(`[RealtimeVoice] User: ${event.transcript}`);
+          this.config.onTranscript?.("user", event.transcript, true);
+        }
+        break;
+
+      case "conversation.item.input_audio_transcription.delta":
+        if (event.delta) {
+          this.config.onTranscript?.("user", event.delta, false);
+        }
+        break;
+
+      // ---- Tool calling ----
+      case "response.function_call_arguments.delta": {
+        const key = event.item_id ?? "unknown";
+        const existing = this.toolCallBuffers.get(key);
+        if (existing && event.delta) {
+          existing.args += event.delta;
+        } else if (event.item_id) {
+          this.toolCallBuffers.set(event.item_id, {
+            name: event.name ?? "",
+            callId: event.call_id ?? "",
+            args: event.delta ?? "",
+          });
+        }
+        break;
+      }
+
+      case "response.function_call_arguments.done": {
+        const key = event.item_id ?? "unknown";
+        const buf = this.toolCallBuffers.get(key);
+        if (buf && this.config.onToolCall) {
+          let args: unknown;
+          try {
+            args = JSON.parse(buf.args || "{}");
+          } catch {
+            args = {};
+          }
+          this.config.onToolCall({
+            itemId: key,
+            callId: buf.callId || event.call_id || "",
+            name: buf.name || event.name || "",
+            args,
+          });
+        }
+        this.toolCallBuffers.delete(key);
+        break;
+      }
+
+      // ---- Response lifecycle ----
+      case "response.created":
+        console.log("[RealtimeVoice] Response started");
+        break;
+
+      case "response.done":
+        console.log("[RealtimeVoice] Response done");
+        // Reset mark queue and timestamps for next turn
+        this.markQueue = [];
+        this.responseStartTimestamp = null;
+        this.lastAssistantItemId = null;
+        break;
+
+      case "response.content.done":
+        // Individual content part done
+        break;
+
+      case "rate_limits.updated":
+        // Log rate limit info if needed
+        break;
+
+      // ---- Errors ----
+      case "error": {
+        const errMsg = event.error
+          ? `${(event.error as { message?: string }).message ?? JSON.stringify(event.error)}`
+          : "Unknown error";
+        console.error(`[RealtimeVoice] Error event: ${errMsg}`);
+        this.config.onError?.(new Error(errMsg));
+        break;
+      }
+
+      default:
+        // Uncomment for debugging:
+        // console.log(`[RealtimeVoice] Unhandled event: ${event.type}`);
+        break;
+    }
+  }
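
The truncation math in handleBargein() below is worth pinning down with numbers (values are illustrative): Twilio media timestamps are milliseconds since stream start, so if the response began streaming at 12,000 ms and the caller interrupts at 13,450 ms, the model should forget everything after 1,450 ms of audio:

```typescript
// Illustrative values, matching the state fields used by handleBargein():
const responseStartTimestamp = 12_000; // set on the first response.audio.delta
const latestMediaTimestamp = 13_450;   // updated via setMediaTimestamp()

const audioEndMs = Math.max(0, latestMediaTimestamp - responseStartTimestamp); // 1450

// The event sent to OpenAI; the assistant's context is truncated to the
// audio the caller actually heard before interrupting:
const truncateEvent = {
  type: "conversation.item.truncate",
  item_id: "item_abc123", // lastAssistantItemId (hypothetical ID)
  content_index: 0,
  audio_end_ms: audioEndMs,
};
```
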
+
+  /**
+   * Handle barge-in: truncate the current assistant response at the
+   * elapsed audio point, clear the Twilio buffer, and reset state.
+   * Mirrors the reference implementation's handleSpeechStartedEvent().
+   */
+  private handleBargein(): void {
+    if (this.markQueue.length > 0 && this.responseStartTimestamp !== null) {
+      const elapsedMs = this.latestMediaTimestamp - this.responseStartTimestamp;
+
+      if (this.lastAssistantItemId) {
+        // Tell OpenAI to truncate the response at the point where the user
+        // interrupted — this ensures the AI's context matches what was heard
+        const truncateEvent = {
+          type: "conversation.item.truncate",
+          item_id: this.lastAssistantItemId,
+          content_index: 0,
+          audio_end_ms: Math.max(0, elapsedMs),
+        };
+        console.log(`[RealtimeVoice] Truncating at ${elapsedMs}ms`);
+        this.sendEvent(truncateEvent);
+      }
+
+      // Clear the audio already queued in Twilio's buffer
+      this.config.onClearAudio();
+
+      // Reset state
+      this.markQueue = [];
+      this.lastAssistantItemId = null;
+      this.responseStartTimestamp = null;
+    } else {
+      // Even if we have no mark queue, still clear audio to be safe
+      this.config.onClearAudio();
+    }
+  }
+
+  /**
+   * Send a mark event to Twilio to track audio playback position.
+   * The mark name is used to coordinate barge-in truncation.
+   */
+  private sendMark(): void {
+    // We don't send marks directly — the caller does via onAudio
+    // We track mark queue internally for truncation calculations
+    this.markQueue.push(`audio-${Date.now()}`);
+  }
+
+  /**
+   * Handle Twilio mark acknowledgment (when a mark event comes back from Twilio).
+   * Call this method when you receive a "mark" event from the Twilio WebSocket.
+   */
+  acknowledgeMark(): void {
+    if (this.markQueue.length > 0) {
+      this.markQueue.shift();
+    }
+  }
+
+  // -------------------------------------------------------------------------
+  // Utilities
+  // -------------------------------------------------------------------------
+
+  private sendEvent(event: unknown): void {
+    if (this.ws?.readyState === WebSocket.OPEN) {
+      this.ws.send(JSON.stringify(event));
+    } else {
+      console.warn("[RealtimeVoice] Attempted to send event while disconnected");
+    }
+  }
+}
+
+// ---------------------------------------------------------------------------
+// Provider factory (matches pattern of OpenAIRealtimeSTTProvider)
+// ---------------------------------------------------------------------------
+
+/**
+ * Configuration for the provider factory.
+ * Holds shared/default settings; per-call config is passed to createBridge().
+ */
+export interface RealtimeVoiceProviderConfig {
+  /** OpenAI API key */
+  apiKey: string;
+  /** Default model (default: "gpt-4o-mini-realtime-preview") */
+  model?: string;
+  /** Default voice (default: "alloy") */
+  voice?: RealtimeVoice;
+  /** Default system instructions */
+  instructions?: string;
+  /** Default temperature (default: 0.8) */
+  temperature?: number;
+  /** Default VAD threshold (default: 0.5) */
+  vadThreshold?: number;
+  /** Default silence duration ms (default: 500) */
+  silenceDurationMs?: number;
+  /** Default tools */
+  tools?: RealtimeTool[];
+}
+
+/**
+ * Factory for creating RealtimeVoiceBridge instances.
+ * Follows the same pattern as OpenAIRealtimeSTTProvider for easy swapping.
+ */
+export class OpenAIRealtimeVoiceProvider {
+  readonly name = "openai-realtime-voice" as const;
+
+  constructor(private readonly defaults: RealtimeVoiceProviderConfig) {
+    if (!defaults.apiKey) {
+      throw new Error("[RealtimeVoiceProvider] OpenAI API key is required");
+    }
+  }
+
+  /**
+   * Create a new voice bridge for a single call session.
+   * Merges provided config with provider defaults.
+ */ + createBridge( + callConfig: Omit & Partial>, + ): OpenAIRealtimeVoiceBridge { + const merged: RealtimeVoiceConfig = { + apiKey: this.defaults.apiKey, + model: callConfig.model ?? this.defaults.model, + voice: callConfig.voice ?? this.defaults.voice, + instructions: callConfig.instructions ?? this.defaults.instructions, + temperature: callConfig.temperature ?? this.defaults.temperature, + vadThreshold: callConfig.vadThreshold ?? this.defaults.vadThreshold, + silenceDurationMs: callConfig.silenceDurationMs ?? this.defaults.silenceDurationMs, + tools: callConfig.tools ?? this.defaults.tools, + onAudio: callConfig.onAudio, + onClearAudio: callConfig.onClearAudio, + onTranscript: callConfig.onTranscript, + onToolCall: callConfig.onToolCall, + onReady: callConfig.onReady, + onError: callConfig.onError, + onClose: callConfig.onClose, + }; + return new OpenAIRealtimeVoiceBridge(merged); + } +} + +// --------------------------------------------------------------------------- +// MediaStreamHandler integration helper +// --------------------------------------------------------------------------- + +/** + * Minimal interface that the bridge integration needs from MediaStreamHandler. + * This matches the actual MediaStreamHandler's method signatures. + */ +export interface MediaStreamHandlerLike { + sendAudio(streamSid: string, muLaw: Buffer): void; + clearAudio(streamSid: string): void; + sendMark(streamSid: string, name: string): void; +} + +/** + * Create a RealtimeVoiceBridge wired to an existing MediaStreamHandler session. + * + * Drop-in helper for use inside media-stream.ts handleStart(): + * + * ```typescript + * // In handleStart(), instead of creating an STT session: + * const bridge = createBridgeForStream({ + * streamSid, + * handler: this, // MediaStreamHandler instance + * config: { + * apiKey: "...", + * instructions: "You are Gracie...", + * voice: "nova", + * onTranscript: (role, text, final) => { + * if (final && role === "user") config.onTranscript?.(callId, text); + * }, + * }, + * }); + * await bridge.connect(); + * ``` + */ +export function createBridgeForStream(opts: { + streamSid: string; + handler: MediaStreamHandlerLike; + config: Omit; +}): OpenAIRealtimeVoiceBridge { + return new OpenAIRealtimeVoiceBridge({ + ...opts.config, + onAudio: (muLaw) => { + opts.handler.sendAudio(opts.streamSid, muLaw); + }, + onClearAudio: () => { + opts.handler.clearAudio(opts.streamSid); + }, + }); +} + +// --------------------------------------------------------------------------- +// Internal event shape types (partial — only fields we use) +// --------------------------------------------------------------------------- + +interface RealtimeEvent { + type: string; + delta?: string; + transcript?: string; + item_id?: string; + call_id?: string; + name?: string; + error?: unknown; +} + +interface RealtimeSessionUpdate { + type: "session.update"; + session: { + modalities: string[]; + instructions?: string; + voice: RealtimeVoice; + input_audio_format: string; + output_audio_format: string; + turn_detection: { + type: "server_vad"; + threshold: number; + prefix_padding_ms: number; + silence_duration_ms: number; + create_response: boolean; + }; + temperature: number; + tools?: RealtimeTool[]; + tool_choice?: string; + }; +} diff --git a/extensions/voice-call/src/runtime.ts b/extensions/voice-call/src/runtime.ts index d725e44bf06..4069fa3e493 100644 --- a/extensions/voice-call/src/runtime.ts +++ b/extensions/voice-call/src/runtime.ts @@ -12,6 +12,7 @@ import { createTelephonyTtsProvider } 
from "./telephony-tts.js"; import { startTunnel, type TunnelResult } from "./tunnel.js"; import { VoiceCallWebhookServer } from "./webhook.js"; import { cleanupTailscaleExposure, setupTailscaleExposure } from "./webhook/tailscale.js"; +import { RealtimeCallHandler } from "./webhook/realtime-handler.js"; export type VoiceCallRuntime = { config: VoiceCallConfig; @@ -20,6 +21,8 @@ export type VoiceCallRuntime = { webhookServer: VoiceCallWebhookServer; webhookUrl: string; publicUrl: string | null; + /** Realtime voice handler — present when config.realtime.enabled is true */ + realtimeHandler?: RealtimeCallHandler; stop: () => Promise; }; @@ -168,6 +171,19 @@ export async function createVoiceCallRuntime(params: { const webhookServer = new VoiceCallWebhookServer(config, manager, provider, coreConfig); const lifecycle = createRuntimeResourceLifecycle({ config, webhookServer }); + // Wire realtime handler before the server starts so it's ready for upgrades + let realtimeHandler: RealtimeCallHandler | undefined; + if (config.realtime.enabled) { + realtimeHandler = new RealtimeCallHandler( + config.realtime, + manager, + provider, + coreConfig, + ); + webhookServer.setRealtimeHandler(realtimeHandler); + log.info("[voice-call] Realtime voice handler initialized"); + } + const localUrl = await webhookServer.start(); // Wrap remaining initialization in try/catch so the webhook server is @@ -252,6 +268,7 @@ export async function createVoiceCallRuntime(params: { webhookServer, webhookUrl, publicUrl, + realtimeHandler, stop, }; } catch (err) { diff --git a/extensions/voice-call/src/test-fixtures.ts b/extensions/voice-call/src/test-fixtures.ts index 594aa064ba5..f38126f858c 100644 --- a/extensions/voice-call/src/test-fixtures.ts +++ b/extensions/voice-call/src/test-fixtures.ts @@ -40,6 +40,7 @@ export function createVoiceCallBaseConfig(params?: { maxPendingConnectionsPerIp: 4, maxConnections: 128, }, + realtime: { enabled: false, tools: [] }, skipSignatureVerification: false, stt: { provider: "openai", model: "whisper-1" }, tts: { diff --git a/extensions/voice-call/src/webhook.ts b/extensions/voice-call/src/webhook.ts index 1258229735e..44ab5df373c 100644 --- a/extensions/voice-call/src/webhook.ts +++ b/extensions/voice-call/src/webhook.ts @@ -15,6 +15,7 @@ import { OpenAIRealtimeSTTProvider } from "./providers/stt-openai-realtime.js"; import type { TwilioProvider } from "./providers/twilio.js"; import type { NormalizedEvent, WebhookContext } from "./types.js"; import { startStaleCallReaper } from "./webhook/stale-call-reaper.js"; +import type { RealtimeCallHandler } from "./webhook/realtime-handler.js"; const MAX_WEBHOOK_BODY_BYTES = 1024 * 1024; @@ -60,6 +61,9 @@ export class VoiceCallWebhookServer { /** Media stream handler for bidirectional audio (when streaming enabled) */ private mediaStreamHandler: MediaStreamHandler | null = null; + /** Realtime voice handler — present when config.realtime.enabled is true */ + private realtimeHandler: RealtimeCallHandler | null = null; + constructor( config: VoiceCallConfig, manager: CallManager, @@ -84,6 +88,13 @@ export class VoiceCallWebhookServer { return this.mediaStreamHandler; } + /** + * Wire the realtime call handler (called from runtime.ts before server starts). + */ + setRealtimeHandler(handler: RealtimeCallHandler): void { + this.realtimeHandler = handler; + } + /** * Initialize media streaming with OpenAI Realtime STT. 
   /**
    * Initialize media streaming with OpenAI Realtime STT.
    */
@@ -229,9 +240,15 @@
       });
     });
 
-    // Handle WebSocket upgrades for media streams
-    if (this.mediaStreamHandler) {
+    // Handle WebSocket upgrades for realtime voice and media streams
+    if (this.realtimeHandler || this.mediaStreamHandler) {
       this.server.on("upgrade", (request, socket, head) => {
+        // Realtime voice takes precedence when the path matches
+        if (this.realtimeHandler && this.isRealtimeMode(request)) {
+          console.log("[voice-call] WebSocket upgrade for realtime voice");
+          this.realtimeHandler.handleWebSocketUpgrade(request, socket, head);
+          return;
+        }
         const path = this.getUpgradePathname(request);
         if (path === streamPath) {
           console.log("[voice-call] WebSocket upgrade for media stream");
@@ -338,10 +355,29 @@
     this.writeWebhookResponse(res, payload);
   }
 
+  /**
+   * Returns true for WebSocket upgrade paths that belong to the realtime handler.
+   * Used only for upgrade routing — not for the inbound HTTP webhook POST.
+   */
+  private isRealtimeMode(req: http.IncomingMessage): boolean {
+    return (req.url ?? "/").includes("/realtime");
+  }
+
   private async runWebhookPipeline(
     req: http.IncomingMessage,
     webhookPath: string,
   ): Promise<WebhookResponsePayload> {
+    // Realtime mode: whenever the realtime handler is active, ALL inbound calls
+    // use it. The handler returns TwiML so Twilio opens a
+    // WebSocket to the /voice/stream/realtime path, which is routed back here
+    // via the upgrade handler's isRealtimeMode() check.
+    if (this.realtimeHandler && req.method === "POST") {
+      const url = buildRequestUrl(req.url, req.headers.host);
+      if (this.isWebhookPathMatch(url.pathname, webhookPath)) {
+        return this.realtimeHandler.buildTwiMLPayload(req);
+      }
+    }
+
     const url = buildRequestUrl(req.url, req.headers.host);
 
     if (url.pathname === "/voice/hold-music") {
diff --git a/extensions/voice-call/src/webhook/realtime-handler.ts b/extensions/voice-call/src/webhook/realtime-handler.ts
new file mode 100644
index 00000000000..2fa6079696a
--- /dev/null
+++ b/extensions/voice-call/src/webhook/realtime-handler.ts
@@ -0,0 +1,289 @@
+import http from "node:http";
+import type { Duplex } from "node:stream";
+import { type WebSocket, Server as WebSocketServer } from "ws";
+import type { VoiceCallRealtimeConfig } from "../config.js";
+import type { CoreConfig } from "../core-bridge.js";
+import type { CallManager } from "../manager.js";
+import {
+  OpenAIRealtimeVoiceBridge,
+  type RealtimeTool,
+} from "../providers/openai-realtime-voice.js";
+import type { VoiceCallProvider } from "../providers/base.js";
+import type { NormalizedEvent } from "../types.js";
+
+export type ToolHandlerFn = (args: unknown, callId: string) => Promise<unknown>;
+
+type WebhookResponsePayload = {
+  statusCode: number;
+  body: string;
+  headers?: Record<string, string>;
+};
+
+/**
+ * Handles inbound voice calls bridged directly to the OpenAI Realtime API.
+ *
+ * Responsibilities:
+ * - Accept WebSocket upgrades from Twilio Media Streams at the /realtime path
+ * - Return TwiML payload for the initial HTTP webhook
+ * - Register each call with CallManager (appears in voice status/history)
+ * - Route tool calls to registered handlers (Phase 5 tool framework)
+ */
+export class RealtimeCallHandler {
+  private toolHandlers = new Map<string, ToolHandlerFn>();
+
+  constructor(
+    private config: VoiceCallRealtimeConfig,
+    private manager: CallManager,
+    private provider: VoiceCallProvider,
+    private coreConfig: CoreConfig | null,
+  ) {}
+
+  /**
+   * Handle a WebSocket upgrade request from Twilio for a realtime media stream.
+   * Called from VoiceCallWebhookServer's upgrade handler when isRealtimeMode() is true.
+   */
+  handleWebSocketUpgrade(request: http.IncomingMessage, socket: Duplex, head: Buffer): void {
+    const wss = new WebSocketServer({ noServer: true });
+    wss.handleUpgrade(request, socket, head, (ws) => {
+      let bridge: OpenAIRealtimeVoiceBridge | null = null;
+      let initialized = false;
+
+      ws.on("message", (data: Buffer) => {
+        try {
+          const msg = JSON.parse(data.toString()) as Record<string, unknown>;
+          if (!initialized && msg.event === "start") {
+            initialized = true;
+            const startData = msg.start as Record<string, unknown> | undefined;
+            const streamSid = String(startData?.streamSid ?? "unknown");
+            const callSid = String(startData?.callSid ?? "unknown");
+            bridge = this.handleCall(streamSid, callSid, ws);
+          } else if (bridge) {
+            const mediaData = msg.media as Record<string, unknown> | undefined;
+            if (msg.event === "media" && mediaData?.payload) {
+              bridge.sendAudio(Buffer.from(mediaData.payload as string, "base64"));
+              if (mediaData.timestamp) {
+                bridge.setMediaTimestamp(Number(mediaData.timestamp));
+              }
+            } else if (msg.event === "mark") {
+              bridge.acknowledgeMark();
+            } else if (msg.event === "stop") {
+              bridge.close();
+            }
+          }
+        } catch (err) {
+          console.error("[voice-call] Error parsing WS message:", err);
+        }
+      });
+
+      ws.on("close", () => {
+        bridge?.close();
+      });
+    });
+  }
+
+  /**
+   * Build the TwiML response payload for a realtime call.
+   * The WebSocket URL is derived from the incoming request host so no hostname
+   * is hardcoded.
+   */
+  buildTwiMLPayload(req: http.IncomingMessage): WebhookResponsePayload {
+    const host = req.headers.host || "localhost:8443";
+    const wsUrl = `wss://${host}/voice/stream/realtime`;
+    console.log(`[voice-call] Returning realtime TwiML with WebSocket: ${wsUrl}`);
+    const twiml = `<?xml version="1.0" encoding="UTF-8"?>
+<Response>
+  <Connect>
+    <Stream url="${wsUrl}" />
+  </Connect>
+</Response>`;
+    return {
+      statusCode: 200,
+      headers: { "Content-Type": "text/xml" },
+      body: twiml,
+    };
+  }
+
+  /**
+   * Register a named tool handler to be called when the model invokes a function.
+   * Must be called before calls begin.
+   *
+   * @param name - Function name as declared in config.realtime.tools
+   * @param fn - Async handler receiving (parsedArgs, internalCallId); return value
+   *             is submitted back to the model as the tool result.
+   */
+  registerToolHandler(name: string, fn: ToolHandlerFn): void {
+    this.toolHandlers.set(name, fn);
+  }
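
For orientation, the Twilio Media Streams frames that the message handler above consumes look roughly like this (field subset only, with hypothetical SIDs; see the Twilio Media Streams docs for the full schema):

```typescript
// Abbreviated shapes of the Twilio frames routed by handleWebSocketUpgrade():
const startFrame = {
  event: "start",
  start: { streamSid: "MZxxxx", callSid: "CAxxxx" },
};
const mediaFrame = {
  event: "media",
  media: { payload: "...base64 mulaw...", timestamp: "1450" },
};
const markFrame = { event: "mark", mark: { name: "audio-1700000000000" } };
const stopFrame = { event: "stop" };
```
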
+
+  // ---------------------------------------------------------------------------
+  // Private
+  // ---------------------------------------------------------------------------
+
+  /**
+   * Create and start the OpenAI Realtime bridge for a single call session.
+   * Registers the call with CallManager so it appears in status/history.
+   * Returns the bridge (or null on fatal config error).
+   */
+  private handleCall(
+    streamSid: string,
+    callSid: string,
+    ws: WebSocket,
+  ): OpenAIRealtimeVoiceBridge | null {
+    const apiKey = process.env.OPENAI_API_KEY;
+    if (!apiKey) {
+      console.error("[voice-call] No OPENAI_API_KEY for realtime call");
+      ws.close(1011, "No API key");
+      return null;
+    }
+
+    const callId = this.registerCallInManager(callSid);
+    console.log(`[voice-call] Realtime call: streamSid=${streamSid}, callSid=${callSid}, callId=${callId}`);
+
+    // Declare as null first so closures can capture the reference before bridge is created.
+    // By the time any callback fires, bridge will be fully assigned.
+    let bridge: OpenAIRealtimeVoiceBridge | null = null;
+
+    bridge = new OpenAIRealtimeVoiceBridge({
+      apiKey,
+      model: this.config.model,
+      voice: this.config.voice,
+      instructions: this.config.instructions,
+      temperature: this.config.temperature,
+      vadThreshold: this.config.vadThreshold,
+      silenceDurationMs: this.config.silenceDurationMs,
+      prefixPaddingMs: this.config.prefixPaddingMs,
+      tools: this.config.tools as RealtimeTool[],
+
+      onAudio: (muLaw) => {
+        ws.send(
+          JSON.stringify({
+            event: "media",
+            streamSid,
+            media: { payload: muLaw.toString("base64") },
+          }),
+        );
+      },
+
+      onClearAudio: () => {
+        ws.send(JSON.stringify({ event: "clear", streamSid }));
+      },
+
+      onTranscript: (role, text, isFinal) => {
+        if (isFinal) {
+          // Emit user speech through the manager for transcript persistence
+          if (role === "user") {
+            const event: NormalizedEvent = {
+              id: `realtime-speech-${callSid}-${Date.now()}`,
+              type: "call.speech",
+              callId,
+              providerCallId: callSid,
+              timestamp: Date.now(),
+              transcript: text,
+              isFinal: true,
+            };
+            this.manager.processEvent(event);
+          }
+        }
+      },
+
+      onToolCall: (toolEvent) => {
+        if (bridge) {
+          void this.executeToolCall(
+            bridge,
+            callId,
+            toolEvent.callId,
+            toolEvent.name,
+            toolEvent.args,
+          );
+        }
+      },
+
+      onReady: () => {
+        bridge?.triggerGreeting();
+      },
+
+      onError: (err) => {
+        console.error("[voice-call] Realtime error:", err.message);
+      },
+
+      onClose: () => {
+        this.endCallInManager(callSid, callId);
+      },
+    });
+
+    bridge.connect().catch((err: Error) => {
+      console.error("[voice-call] Failed to connect realtime bridge:", err);
+      ws.close(1011, "Failed to connect");
+    });
+
+    // Acknowledge the stream connection (mirrors Twilio Media Streams protocol)
+    ws.send(JSON.stringify({ event: "connected", protocol: "Call", version: "1.0.0" }));
+
+    return bridge;
+  }
+
+  /**
+   * Emit synthetic NormalizedEvents to register the call with CallManager.
+   * Returns the internal callId generated by the manager.
+   */
+  private registerCallInManager(callSid: string): string {
+    const now = Date.now();
+    const baseFields = {
+      providerCallId: callSid,
+      timestamp: now,
+      direction: "inbound" as const,
+    };
+
+    // call.initiated causes the manager to auto-create the call record
+    // (see manager/events.ts createWebhookCall path)
+    this.manager.processEvent({
+      id: `realtime-initiated-${callSid}`,
+      callId: callSid,
+      type: "call.initiated",
+      ...baseFields,
+    });
+
+    this.manager.processEvent({
+      id: `realtime-answered-${callSid}`,
+      callId: callSid,
+      type: "call.answered",
+      ...baseFields,
+    });
+
+    // Resolve the manager-generated internal callId
+    const call = this.manager.getCallByProviderCallId(callSid);
+    return call?.callId ?? callSid;
+  }
+
+  private endCallInManager(callSid: string, callId: string): void {
+    this.manager.processEvent({
+      id: `realtime-ended-${callSid}-${Date.now()}`,
+      type: "call.ended",
+      callId,
+      providerCallId: callSid,
+      timestamp: Date.now(),
+      reason: "completed",
+    });
+  }
+
+  private async executeToolCall(
+    bridge: OpenAIRealtimeVoiceBridge,
+    callId: string,
+    bridgeCallId: string,
+    name: string,
+    args: unknown,
+  ): Promise<void> {
+    const handler = this.toolHandlers.get(name);
+    let result: unknown;
+    if (handler) {
+      try {
+        result = await handler(args, callId);
+      } catch (err) {
+        result = { error: err instanceof Error ? err.message : String(err) };
+      }
+    } else {
+      result = { error: `Tool "${name}" not available` };
+    }
+    bridge.submitToolResult(bridgeCallId, result);
+  }
+}
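
Finally, the config-level guardrails from config.ts in action. A hedged sketch (the config literal is illustrative, and the cast only keeps the sketch short; other required fields are elided):

```typescript
import { resolveVoiceCallConfig, validateProviderConfig } from "../config.js";

// Illustrative input: realtime and streaming enabled together is rejected,
// as is realtime with inboundPolicy "disabled".
const input = {
  inboundPolicy: "open",
  realtime: { enabled: true, tools: [] },
  streaming: { enabled: true },
} as Parameters<typeof resolveVoiceCallConfig>[0];

const { valid, errors } = validateProviderConfig(resolveVoiceCallConfig(input));
// valid === false; errors[0] explains that realtime.enabled and
// streaming.enabled cannot both be true.
```
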