voice-call: add OpenAI Realtime API voice mode

This commit is contained in:
Forrest Blount 2026-03-10 17:10:20 +00:00
parent 261a40dae1
commit 343ab464d5
9 changed files with 1381 additions and 3 deletions

View File

@@ -88,7 +88,7 @@ RUN pnpm canvas:a2ui:bundle || \
echo "/* A2UI bundle unavailable in this build */" > src/canvas-host/a2ui/a2ui.bundle.js && \
echo "stub" > src/canvas-host/a2ui/.bundle.hash && \
rm -rf vendor/a2ui apps/shared/OpenClawKit/Tools/CanvasA2UI)
RUN pnpm build:docker
RUN NODE_OPTIONS=--max-old-space-size=2048 pnpm build:docker
# Force pnpm for UI build (Bun may fail on ARM/Synology architectures)
ENV OPENCLAW_PREFER_PNPM=1
RUN pnpm ui:build

View File

@@ -24,6 +24,7 @@ services:
ports:
- "${OPENCLAW_GATEWAY_PORT:-18789}:18789"
- "${OPENCLAW_BRIDGE_PORT:-18790}:18790"
- "${OPENCLAW_VOICE_WEBHOOK_PORT:-3334}:3334"
init: true
restart: unless-stopped
command:

View File

@@ -134,6 +134,36 @@
"label": "ElevenLabs Base URL",
"advanced": true
},
"realtime.enabled": {
"label": "Enable Realtime Voice (OpenAI)",
"help": "Full voice-to-voice mode via OpenAI Realtime API. Requires inboundPolicy open or allowlist."
},
"realtime.model": {
"label": "Realtime Model",
"advanced": true
},
"realtime.voice": {
"label": "Realtime Voice"
},
"realtime.instructions": {
"label": "Realtime Instructions"
},
"realtime.temperature": {
"label": "Realtime Temperature",
"advanced": true
},
"realtime.vadThreshold": {
"label": "VAD Sensitivity",
"advanced": true
},
"realtime.silenceDurationMs": {
"label": "Silence Timeout (ms)",
"advanced": true
},
"realtime.prefixPaddingMs": {
"label": "Prefix Padding (ms)",
"advanced": true
},
"publicUrl": {
"label": "Public Webhook URL",
"advanced": true
@@ -385,6 +415,49 @@
}
}
},
"realtime": {
"type": "object",
"additionalProperties": false,
"properties": {
"enabled": {
"type": "boolean"
},
"model": {
"type": "string"
},
"voice": {
"type": "string",
"enum": ["alloy", "ash", "ballad", "cedar", "coral", "echo", "marin", "sage", "shimmer", "verse"]
},
"instructions": {
"type": "string"
},
"temperature": {
"type": "number",
"minimum": 0,
"maximum": 2
},
"vadThreshold": {
"type": "number",
"minimum": 0,
"maximum": 1
},
"silenceDurationMs": {
"type": "integer",
"minimum": 1
},
"prefixPaddingMs": {
"type": "integer",
"minimum": 0
},
"tools": {
"type": "array",
"items": {
"type": "object"
}
}
}
},
"publicUrl": {
"type": "string"
},

View File

@@ -200,6 +200,55 @@ export const OutboundConfigSchema = z
.default({ defaultMode: "notify", notifyHangupDelaySec: 3 });
export type OutboundConfig = z.infer<typeof OutboundConfigSchema>;
// -----------------------------------------------------------------------------
// Realtime Voice Configuration (OpenAI Realtime API — voice-to-voice)
// -----------------------------------------------------------------------------
/**
* Zod schema for a single OpenAI Realtime tool (mirrors the RealtimeTool interface
* in providers/openai-realtime-voice.ts, expressed as a schema for config parsing).
*/
export const RealtimeToolSchema = z
.object({
type: z.literal("function"),
name: z.string(),
description: z.string(),
parameters: z.object({
type: z.literal("object"),
properties: z.record(z.unknown()),
required: z.array(z.string()).optional(),
}),
})
.strict();
export type RealtimeToolConfig = z.infer<typeof RealtimeToolSchema>;
export const VoiceCallRealtimeConfigSchema = z
.object({
/** Enable realtime voice-to-voice mode (OpenAI Realtime API). Default: false. */
enabled: z.boolean().default(false),
/** Realtime model (env: REALTIME_VOICE_MODEL, default: "gpt-4o-mini-realtime-preview") */
model: z.string().optional(),
/** Voice for AI speech output (env: REALTIME_VOICE_VOICE) */
voice: z
.enum(["alloy", "ash", "ballad", "cedar", "coral", "echo", "marin", "sage", "shimmer", "verse"])
.optional(),
/** System instructions / persona (env: REALTIME_VOICE_INSTRUCTIONS) */
instructions: z.string().optional(),
/** Temperature 0–2 (env: REALTIME_VOICE_TEMPERATURE) */
temperature: z.number().min(0).max(2).optional(),
/** VAD threshold 0–1 (env: VAD_THRESHOLD) */
vadThreshold: z.number().min(0).max(1).optional(),
/** Silence duration in ms before turn ends (env: SILENCE_DURATION_MS) */
silenceDurationMs: z.number().int().positive().optional(),
/** Audio padding before speech in ms */
prefixPaddingMs: z.number().int().nonnegative().optional(),
/** Tool definitions (OpenAI function-call schema); execution wired via registerToolHandler */
tools: z.array(RealtimeToolSchema).default([]),
})
.strict()
.default({ enabled: false, tools: [] });
export type VoiceCallRealtimeConfig = z.infer<typeof VoiceCallRealtimeConfigSchema>;
// -----------------------------------------------------------------------------
// Streaming Configuration (OpenAI Realtime STT)
// -----------------------------------------------------------------------------
@@ -324,6 +373,9 @@ export const VoiceCallConfigSchema = z
/** Real-time audio streaming configuration */
streaming: VoiceCallStreamingConfigSchema,
/** Realtime voice-to-voice configuration (OpenAI Realtime API) */
realtime: VoiceCallRealtimeConfigSchema,
/** Public webhook URL override (if set, bypasses tunnel auto-detection) */
publicUrl: z.string().url().optional(),
@@ -398,6 +450,11 @@ export function normalizeVoiceCallConfig(config: VoiceCallConfigInput): VoiceCal
config.webhookSecurity?.trustedProxyIPs ?? defaults.webhookSecurity.trustedProxyIPs,
},
streaming: { ...defaults.streaming, ...config.streaming },
realtime: {
...defaults.realtime,
...config.realtime,
tools: config.realtime?.tools ?? defaults.realtime.tools,
},
stt: { ...defaults.stt, ...config.stt },
tts: normalizeVoiceCallTtsConfig(defaults.tts, config.tts),
};
@@ -453,6 +510,28 @@ export function resolveVoiceCallConfig(config: VoiceCallConfigInput): VoiceCallC
resolved.webhookSecurity.trustForwardingHeaders ?? false;
resolved.webhookSecurity.trustedProxyIPs = resolved.webhookSecurity.trustedProxyIPs ?? [];
// Realtime voice — resolve env var fallbacks
resolved.realtime = { ...resolved.realtime };
// REALTIME_VOICE_ENABLED=true auto-enables realtime mode (backward compat)
if (!resolved.realtime.enabled && process.env.REALTIME_VOICE_ENABLED === "true") {
resolved.realtime.enabled = true;
}
resolved.realtime.model = resolved.realtime.model ?? process.env.REALTIME_VOICE_MODEL;
resolved.realtime.voice =
(resolved.realtime.voice ??
(process.env.REALTIME_VOICE_VOICE as VoiceCallRealtimeConfig["voice"])) || undefined;
resolved.realtime.instructions =
resolved.realtime.instructions ?? process.env.REALTIME_VOICE_INSTRUCTIONS;
if (resolved.realtime.temperature == null && process.env.REALTIME_VOICE_TEMPERATURE) {
resolved.realtime.temperature = parseFloat(process.env.REALTIME_VOICE_TEMPERATURE);
}
if (resolved.realtime.vadThreshold == null && process.env.VAD_THRESHOLD) {
resolved.realtime.vadThreshold = parseFloat(process.env.VAD_THRESHOLD);
}
if (resolved.realtime.silenceDurationMs == null && process.env.SILENCE_DURATION_MS) {
resolved.realtime.silenceDurationMs = parseInt(process.env.SILENCE_DURATION_MS, 10);
}
return normalizeVoiceCallConfig(resolved);
}
@@ -521,5 +600,24 @@ export function validateProviderConfig(config: VoiceCallConfig): {
}
}
// Realtime mode requires inbound calls to be accepted — policy "disabled"
// means the manager will reject every call before it can be tracked.
// "open" or "allowlist" are the correct choices when realtime.enabled = true.
if (config.realtime?.enabled && config.inboundPolicy === "disabled") {
errors.push(
"plugins.entries.voice-call.config.inboundPolicy must not be \"disabled\" when realtime.enabled is true " +
"(use \"open\" or \"allowlist\" — realtime calls are answered before policy can reject them)",
);
}
// Both streaming and realtime cannot be enabled simultaneously — they use
// incompatible WebSocket paths and audio routing.
if (config.realtime?.enabled && config.streaming?.enabled) {
errors.push(
"plugins.entries.voice-call.config: realtime.enabled and streaming.enabled cannot both be true " +
"(they use incompatible audio paths — choose one mode)",
);
}
return { valid: errors.length === 0, errors };
}

View File

@@ -0,0 +1,863 @@
/**
* OpenAI Realtime Voice Bridge
*
* Implements a bidirectional voice-to-voice bridge between Twilio Media Streams
* and the OpenAI Realtime API. Replaces the STT → LLM → TTS pipeline with a
* single WebSocket session that handles everything natively.
*
* Key benefits over the STT-only approach:
* - Latency: ~200–400 ms TTFB vs ~1–3.5 s in the pipeline mode
* - Audio format: g711_ulaw (mulaw) is natively supported; no conversion needed
* - Barge-in: server VAD handles interruptions automatically
* - No separate LLM or TTS call required
*
* Usage:
* const bridge = new OpenAIRealtimeVoiceBridge({
* apiKey: process.env.OPENAI_API_KEY!,
* instructions: "You are Gracie, a helpful AI assistant...",
* voice: "nova",
* onAudio: (muLaw) => mediaStreamHandler.sendAudio(streamSid, muLaw),
* onClearAudio: () => mediaStreamHandler.clearAudio(streamSid),
* onTranscript: (role, text) => console.log(`[${role}]: ${text}`),
* });
* await bridge.connect();
* bridge.sendAudio(twilioPayloadBuffer);
* bridge.close();
*
* Integration with media-stream.ts:
* Replace `sttSession` with this bridge in `MediaStreamHandler.handleStart()`.
* Wire audio in/out through the existing sendAudio / clearAudio methods.
*
* @see https://platform.openai.com/docs/guides/realtime
* @see https://www.twilio.com/docs/voice/media-streams
*/
import WebSocket from "ws";
// ---------------------------------------------------------------------------
// Types
// ---------------------------------------------------------------------------
/** OpenAI Realtime API voice options (kept in sync with the config schema enum) */
export type RealtimeVoice =
| "alloy"
| "ash"
| "ballad"
| "cedar"
| "coral"
| "echo"
| "marin"
| "sage"
| "shimmer"
| "verse";
/** Realtime tool definition (mirrors OpenAI function calling schema) */
export interface RealtimeTool {
type: "function";
name: string;
description: string;
parameters: {
type: "object";
properties: Record<string, unknown>;
required?: string[];
};
}
/** Tool call event emitted when OpenAI invokes a function */
export interface ToolCallEvent {
/** Conversation item ID for submitting the result */
itemId: string;
/** Call ID for matching request/response */
callId: string;
/** Function name */
name: string;
/** Parsed JSON arguments */
args: unknown;
}
/**
* Configuration for the Realtime Voice Bridge.
*/
export interface RealtimeVoiceConfig {
// ---- Required ----
/** OpenAI API key */
apiKey: string;
// ---- Voice/personality ----
/** System instructions (persona / behaviour) */
instructions?: string;
/** Voice to use for AI speech output (default: "alloy") */
voice?: RealtimeVoice;
/** Response temperature 0–2 (default: 0.8) */
temperature?: number;
// ---- Model ----
/** Realtime model (default: "gpt-4o-mini-realtime-preview") */
model?: string;
// ---- VAD ----
/** VAD speech detection threshold 0–1 (default: 0.5) */
vadThreshold?: number;
/** Silence duration in ms before turn ends (default: 500) */
silenceDurationMs?: number;
/** Padding before speech in ms (default: 300) */
prefixPaddingMs?: number;
// ---- Tools ----
/** Optional function tools the model can call */
tools?: RealtimeTool[];
// ---- Audio callbacks ----
/**
* Called for each audio delta chunk from OpenAI.
* @param muLaw - Raw mulaw Buffer ready to send to Twilio
*/
onAudio: (muLaw: Buffer) => void;
/**
* Called on barge-in (user speech detected mid-response).
* Clear Twilio audio buffer here.
*/
onClearAudio: () => void;
// ---- Event callbacks (optional) ----
/**
* Transcript event (partial or final). Role is "user" or "assistant".
*/
onTranscript?: (role: "user" | "assistant", text: string, isFinal: boolean) => void;
/**
* Called when the model invokes a tool/function.
* Your handler should call `bridge.submitToolResult(event.callId, result)`.
*/
onToolCall?: (event: ToolCallEvent) => void;
/**
* Called when the session is fully connected and configured.
*/
onReady?: () => void;
/**
* Called on irrecoverable error or max reconnects exceeded.
*/
onError?: (error: Error) => void;
/**
* Called when the bridge closes (intentionally or after max retries).
*/
onClose?: () => void;
}
// ---------------------------------------------------------------------------
// Internal helpers
// ---------------------------------------------------------------------------
function base64ToBuffer(b64: string): Buffer {
return Buffer.from(b64, "base64");
}
// ---------------------------------------------------------------------------
// Main class
// ---------------------------------------------------------------------------
/**
* Bidirectional voice bridge between Twilio Media Streams and the OpenAI Realtime API.
*
* Lifecycle:
* new OpenAIRealtimeVoiceBridge(config)
* → connect(): opens WebSocket, configures session
* → sendAudio(): called for each Twilio media chunk
* → [callbacks fire as OpenAI responds]
* → close(): graceful shutdown
*/
export class OpenAIRealtimeVoiceBridge {
private static readonly DEFAULT_MODEL = "gpt-4o-mini-realtime-preview";
private static readonly MAX_RECONNECT_ATTEMPTS = 5;
private static readonly BASE_RECONNECT_DELAY_MS = 1000;
private static readonly CONNECT_TIMEOUT_MS = 10_000;
private readonly config: RealtimeVoiceConfig;
private readonly model: string;
private ws: WebSocket | null = null;
private connected = false;
private intentionallyClosed = false;
private reconnectAttempts = 0;
/** Pending audio buffers queued while reconnecting */
private pendingAudio: Buffer[] = [];
/** Track mark queue for barge-in timing (mirrors reference impl) */
private markQueue: string[] = [];
private responseStartTimestamp: number | null = null;
private latestMediaTimestamp = 0;
private lastAssistantItemId: string | null = null;
/** Accumulate tool call arguments (streamed as deltas) */
private toolCallBuffers = new Map<string, { name: string; callId: string; args: string }>();
constructor(config: RealtimeVoiceConfig) {
if (!config.apiKey) {
throw new Error("[RealtimeVoice] OpenAI API key is required");
}
this.config = config;
this.model = config.model ?? OpenAIRealtimeVoiceBridge.DEFAULT_MODEL;
}
// -------------------------------------------------------------------------
// Public API
// -------------------------------------------------------------------------
/**
* Connect to the OpenAI Realtime API.
* Resolves when the WebSocket is open and session.update has been sent.
* Throws if connection times out.
*/
async connect(): Promise<void> {
this.intentionallyClosed = false;
this.reconnectAttempts = 0;
return this.doConnect();
}
/**
* Send a mulaw audio chunk from Twilio to OpenAI.
* Buffers chunks if not yet connected; drains on reconnect.
*
* @param audio - Raw mulaw Buffer (Twilio media.payload decoded from base64)
*/
sendAudio(audio: Buffer): void {
if (!this.connected || this.ws?.readyState !== WebSocket.OPEN) {
// Buffer up to ~6.4 s of audio (320 chunks × 20 ms) while reconnecting
if (this.pendingAudio.length < 320) {
this.pendingAudio.push(audio);
}
return;
}
this.sendEvent({
type: "input_audio_buffer.append",
audio: audio.toString("base64"),
});
}
/**
* Update the media timestamp (used for barge-in truncation calculations).
* Call this with data.media.timestamp from each Twilio media event.
*/
setMediaTimestamp(ts: number): void {
this.latestMediaTimestamp = ts;
}
/**
* Inject a user text message into the conversation (optional).
* Useful for seeding context or simulating a greeting trigger.
*/
sendUserMessage(text: string): void {
this.sendEvent({
type: "conversation.item.create",
item: {
type: "message",
role: "user",
content: [{ type: "input_text", text }],
},
});
this.sendEvent({ type: "response.create" });
}
/**
* Submit a tool/function result back to the model.
* Must be called in response to an `onToolCall` event.
*
* @param callId - The call_id from the ToolCallEvent
* @param result - JSON-serializable result value
*/
submitToolResult(callId: string, result: unknown): void {
this.sendEvent({
type: "conversation.item.create",
item: {
type: "function_call_output",
call_id: callId,
output: JSON.stringify(result),
},
});
// Trigger AI to respond now that the tool result is available
this.sendEvent({ type: "response.create" });
}
/**
* Gracefully close the bridge.
*/
close(): void {
this.intentionallyClosed = true;
this.connected = false;
if (this.ws) {
this.ws.close(1000, "Bridge closed");
this.ws = null;
}
this.config.onClose?.();
}
/** True if the WebSocket is open and the session is configured. */
isConnected(): boolean {
return this.connected;
}
// -------------------------------------------------------------------------
// Connection management
// -------------------------------------------------------------------------
private async doConnect(): Promise<void> {
return new Promise((resolve, reject) => {
const url = `wss://api.openai.com/v1/realtime?model=${encodeURIComponent(this.model)}`;
console.log(`[RealtimeVoice] Connecting to ${url}`);
this.ws = new WebSocket(url, {
headers: {
Authorization: `Bearer ${this.config.apiKey}`,
"OpenAI-Beta": "realtime=v1",
},
});
const connectTimeout = setTimeout(() => {
if (!this.connected) {
this.ws?.terminate();
reject(new Error("[RealtimeVoice] Connection timeout"));
}
}, OpenAIRealtimeVoiceBridge.CONNECT_TIMEOUT_MS);
this.ws.on("open", () => {
clearTimeout(connectTimeout);
console.log("[RealtimeVoice] WebSocket connected");
this.connected = true;
this.reconnectAttempts = 0;
// Small delay to ensure the server is ready before sending session.update
// (mirrors the reference implementation's setTimeout(initializeSession, 100))
setTimeout(() => {
this.sendSessionUpdate();
this.drainPendingAudio();
this.config.onReady?.();
resolve();
}, 100);
});
this.ws.on("message", (data: Buffer) => {
try {
const event = JSON.parse(data.toString()) as RealtimeEvent;
this.handleEvent(event);
} catch (err) {
console.error("[RealtimeVoice] Failed to parse event:", err);
}
});
this.ws.on("error", (err) => {
console.error("[RealtimeVoice] WebSocket error:", err);
if (!this.connected) {
clearTimeout(connectTimeout);
reject(err);
} else {
this.config.onError?.(err instanceof Error ? err : new Error(String(err)));
}
});
this.ws.on("close", (code, reason) => {
console.log(
`[RealtimeVoice] WebSocket closed (code: ${code}, reason: ${reason?.toString() || "none"})`,
);
this.connected = false;
this.ws = null;
if (!this.intentionallyClosed) {
void this.attemptReconnect();
} else {
this.config.onClose?.();
}
});
});
}
/**
* Trigger a greeting response from the AI.
* Call once the session is ready (e.g. from onReady) so the assistant speaks first on inbound calls.
*/
public triggerGreeting(): void {
if (!this.connected || !this.ws) {
console.warn("[RealtimeVoice] Cannot trigger greeting: not connected");
return;
}
const greetingEvent = {
type: "response.create",
response: {
instructions: this.config.instructions,
},
};
this.sendEvent(greetingEvent);
console.log("[RealtimeVoice] Greeting triggered");
}
private sendSessionUpdate(): void {
const cfg = this.config;
const sessionUpdate: RealtimeSessionUpdate = {
type: "session.update",
session: {
modalities: ["text", "audio"],
instructions: cfg.instructions,
voice: cfg.voice ?? "nova",
input_audio_format: "g711_ulaw",
output_audio_format: "g711_ulaw",
input_audio_transcription: {
model: "whisper-1",
},
turn_detection: {
type: "server_vad",
threshold: cfg.vadThreshold ?? 0.5,
prefix_padding_ms: cfg.prefixPaddingMs ?? 300,
silence_duration_ms: cfg.silenceDurationMs ?? 500,
create_response: true,
},
temperature: cfg.temperature ?? 0.8,
...(cfg.tools && cfg.tools.length > 0
? {
tools: cfg.tools,
tool_choice: "auto",
}
: {}),
},
};
console.log("[RealtimeVoice] Sending session.update");
this.sendEvent(sessionUpdate);
}
private drainPendingAudio(): void {
if (this.pendingAudio.length === 0) return;
console.log(`[RealtimeVoice] Draining ${this.pendingAudio.length} buffered audio chunks`);
for (const buf of this.pendingAudio) {
this.sendEvent({
type: "input_audio_buffer.append",
audio: buf.toString("base64"),
});
}
this.pendingAudio = [];
}
private async attemptReconnect(): Promise<void> {
if (this.intentionallyClosed) return;
if (
this.reconnectAttempts >= OpenAIRealtimeVoiceBridge.MAX_RECONNECT_ATTEMPTS
) {
const err = new Error(
`[RealtimeVoice] Max reconnect attempts (${OpenAIRealtimeVoiceBridge.MAX_RECONNECT_ATTEMPTS}) exceeded`,
);
console.error(err.message);
this.config.onError?.(err);
this.config.onClose?.();
return;
}
this.reconnectAttempts++;
const delay =
OpenAIRealtimeVoiceBridge.BASE_RECONNECT_DELAY_MS *
2 ** (this.reconnectAttempts - 1);
console.log(
`[RealtimeVoice] Reconnecting (${this.reconnectAttempts}/${OpenAIRealtimeVoiceBridge.MAX_RECONNECT_ATTEMPTS}) in ${delay}ms...`,
);
await new Promise<void>((resolve) => setTimeout(resolve, delay));
if (this.intentionallyClosed) return;
try {
await this.doConnect();
console.log("[RealtimeVoice] Reconnected successfully");
} catch (err) {
console.error("[RealtimeVoice] Reconnect failed:", err);
// doConnect's close handler will call attemptReconnect again
}
}
// -------------------------------------------------------------------------
// Event handling
// -------------------------------------------------------------------------
private handleEvent(event: RealtimeEvent): void {
switch (event.type) {
// ---- Session lifecycle ----
case "session.created":
console.log("[RealtimeVoice] Session created");
break;
case "session.updated":
console.log("[RealtimeVoice] Session updated");
break;
// ---- Audio output: stream audio back to Twilio ----
case "response.audio.delta": {
if (!event.delta) break;
const audioBuffer = base64ToBuffer(event.delta);
this.config.onAudio(audioBuffer);
// Track response start timestamp for barge-in truncation
if (this.responseStartTimestamp === null) {
this.responseStartTimestamp = this.latestMediaTimestamp;
}
// Track the most recent assistant item ID
if (event.item_id) {
this.lastAssistantItemId = event.item_id;
}
// Send mark to track playback position
this.sendMark();
break;
}
case "response.audio.done":
console.log("[RealtimeVoice] Audio response complete");
break;
// ---- Barge-in: user started speaking, interrupt AI response ----
case "input_audio_buffer.speech_started":
console.log("[RealtimeVoice] Barge-in detected — clearing audio");
this.handleBargein();
break;
case "input_audio_buffer.speech_stopped":
console.log("[RealtimeVoice] Speech stopped");
break;
case "input_audio_buffer.committed":
console.log("[RealtimeVoice] Audio buffer committed");
break;
// ---- Assistant transcript streaming ----
case "response.audio_transcript.delta":
// AI speech transcript streaming (not an event we send to Twilio)
if (event.delta) {
this.config.onTranscript?.("assistant", event.delta, false);
}
break;
case "response.audio_transcript.done":
if (event.transcript) {
console.log(`[RealtimeVoice] Assistant: ${event.transcript}`);
this.config.onTranscript?.("assistant", event.transcript, true);
}
break;
// ---- User speech transcription (via input_audio_transcription) ----
case "conversation.item.input_audio_transcription.completed":
if (event.transcript) {
console.log(`[RealtimeVoice] User: ${event.transcript}`);
this.config.onTranscript?.("user", event.transcript, true);
}
break;
case "conversation.item.input_audio_transcription.delta":
if (event.delta) {
this.config.onTranscript?.("user", event.delta, false);
}
break;
// ---- Tool calling ----
case "response.function_call_arguments.delta": {
const key = event.item_id ?? "unknown";
const existing = this.toolCallBuffers.get(key);
if (existing && event.delta) {
existing.args += event.delta;
} else if (event.item_id) {
this.toolCallBuffers.set(event.item_id, {
name: event.name ?? "",
callId: event.call_id ?? "",
args: event.delta ?? "",
});
}
break;
}
case "response.function_call_arguments.done": {
const key = event.item_id ?? "unknown";
const buf = this.toolCallBuffers.get(key);
if (buf && this.config.onToolCall) {
let args: unknown;
try {
args = JSON.parse(buf.args || "{}");
} catch {
args = {};
}
this.config.onToolCall({
itemId: key,
callId: buf.callId || event.call_id || "",
name: buf.name || event.name || "",
args,
});
}
this.toolCallBuffers.delete(key);
break;
}
// ---- Response lifecycle ----
case "response.created":
console.log("[RealtimeVoice] Response started");
break;
case "response.done":
console.log("[RealtimeVoice] Response done");
// Reset mark queue and timestamps for next turn
this.markQueue = [];
this.responseStartTimestamp = null;
this.lastAssistantItemId = null;
break;
case "response.content.done":
// Individual content part done
break;
case "rate_limits.updated":
// Log rate limit info if needed
break;
// ---- Errors ----
case "error": {
const errMsg = event.error
? `${(event.error as { message?: string }).message ?? JSON.stringify(event.error)}`
: "Unknown error";
console.error(`[RealtimeVoice] Error event: ${errMsg}`);
this.config.onError?.(new Error(errMsg));
break;
}
default:
// Uncomment for debugging:
// console.log(`[RealtimeVoice] Unhandled event: ${event.type}`);
break;
}
}
/**
* Handle barge-in: truncate the current assistant response at the
* elapsed audio point, clear the Twilio buffer, and reset state.
* Mirrors the reference implementation's handleSpeechStartedEvent().
*/
private handleBargein(): void {
if (this.markQueue.length > 0 && this.responseStartTimestamp !== null) {
const elapsedMs = this.latestMediaTimestamp - this.responseStartTimestamp;
if (this.lastAssistantItemId) {
// Tell OpenAI to truncate the response at the point where the user
// interrupted — this ensures the AI's context matches what was heard
const truncateEvent = {
type: "conversation.item.truncate",
item_id: this.lastAssistantItemId,
content_index: 0,
audio_end_ms: Math.max(0, elapsedMs),
};
console.log(`[RealtimeVoice] Truncating at ${elapsedMs}ms`);
this.sendEvent(truncateEvent);
}
// Clear the audio already queued in Twilio's buffer
this.config.onClearAudio();
// Reset state
this.markQueue = [];
this.lastAssistantItemId = null;
this.responseStartTimestamp = null;
} else {
// Even if we have no mark queue, still clear audio to be safe
this.config.onClearAudio();
}
}
/**
* Record a mark to track audio playback position; the caller sends the actual
* Twilio mark event. The mark queue is used to coordinate barge-in truncation.
*/
private sendMark(): void {
// We don't send marks directly — the caller does via onAudio
// We track mark queue internally for truncation calculations
this.markQueue.push(`audio-${Date.now()}`);
}
/**
* Handle Twilio mark acknowledgment (when a mark event comes back from Twilio).
* Call this method when you receive a "mark" event from the Twilio WebSocket.
*/
acknowledgeMark(): void {
if (this.markQueue.length > 0) {
this.markQueue.shift();
}
}
// -------------------------------------------------------------------------
// Utilities
// -------------------------------------------------------------------------
private sendEvent(event: unknown): void {
if (this.ws?.readyState === WebSocket.OPEN) {
this.ws.send(JSON.stringify(event));
} else {
console.warn("[RealtimeVoice] Attempted to send event while disconnected");
}
}
}
// ---------------------------------------------------------------------------
// Provider factory (matches pattern of OpenAIRealtimeSTTProvider)
// ---------------------------------------------------------------------------
/**
* Configuration for the provider factory.
* Holds shared/default settings; per-call config is passed to createSession().
*/
export interface RealtimeVoiceProviderConfig {
/** OpenAI API key */
apiKey: string;
/** Default model (default: "gpt-4o-mini-realtime-preview") */
model?: string;
/** Default voice (default: "alloy") */
voice?: RealtimeVoice;
/** Default system instructions */
instructions?: string;
/** Default temperature (default: 0.8) */
temperature?: number;
/** Default VAD threshold (default: 0.5) */
vadThreshold?: number;
/** Default silence duration ms (default: 500) */
silenceDurationMs?: number;
/** Default tools */
tools?: RealtimeTool[];
}
/**
* Factory for creating RealtimeVoiceBridge instances.
* Follows the same pattern as OpenAIRealtimeSTTProvider for easy swapping.
*/
export class OpenAIRealtimeVoiceProvider {
readonly name = "openai-realtime-voice" as const;
constructor(private readonly defaults: RealtimeVoiceProviderConfig) {
if (!defaults.apiKey) {
throw new Error("[RealtimeVoiceProvider] OpenAI API key is required");
}
}
/**
* Create a new voice bridge for a single call session.
* Merges provided config with provider defaults.
*/
createBridge(
callConfig: Omit<RealtimeVoiceConfig, "apiKey"> & Partial<Pick<RealtimeVoiceConfig, "apiKey">>,
): OpenAIRealtimeVoiceBridge {
const merged: RealtimeVoiceConfig = {
apiKey: callConfig.apiKey ?? this.defaults.apiKey,
model: callConfig.model ?? this.defaults.model,
voice: callConfig.voice ?? this.defaults.voice,
instructions: callConfig.instructions ?? this.defaults.instructions,
temperature: callConfig.temperature ?? this.defaults.temperature,
vadThreshold: callConfig.vadThreshold ?? this.defaults.vadThreshold,
silenceDurationMs: callConfig.silenceDurationMs ?? this.defaults.silenceDurationMs,
prefixPaddingMs: callConfig.prefixPaddingMs,
tools: callConfig.tools ?? this.defaults.tools,
onAudio: callConfig.onAudio,
onClearAudio: callConfig.onClearAudio,
onTranscript: callConfig.onTranscript,
onToolCall: callConfig.onToolCall,
onReady: callConfig.onReady,
onError: callConfig.onError,
onClose: callConfig.onClose,
};
return new OpenAIRealtimeVoiceBridge(merged);
}
}
// ---------------------------------------------------------------------------
// MediaStreamHandler integration helper
// ---------------------------------------------------------------------------
/**
* Minimal interface that the bridge integration needs from MediaStreamHandler.
* This matches the actual MediaStreamHandler's method signatures.
*/
export interface MediaStreamHandlerLike {
sendAudio(streamSid: string, muLaw: Buffer): void;
clearAudio(streamSid: string): void;
sendMark(streamSid: string, name: string): void;
}
/**
* Create a RealtimeVoiceBridge wired to an existing MediaStreamHandler session.
*
* Drop-in helper for use inside media-stream.ts handleStart():
*
* ```typescript
* // In handleStart(), instead of creating an STT session:
* const bridge = createBridgeForStream({
* streamSid,
* handler: this, // MediaStreamHandler instance
* config: {
* apiKey: "...",
* instructions: "You are Gracie...",
* voice: "nova",
* onTranscript: (role, text, final) => {
* if (final && role === "user") config.onTranscript?.(callId, text);
* },
* },
* });
* await bridge.connect();
* ```
*/
export function createBridgeForStream(opts: {
streamSid: string;
handler: MediaStreamHandlerLike;
config: Omit<RealtimeVoiceConfig, "onAudio" | "onClearAudio">;
}): OpenAIRealtimeVoiceBridge {
return new OpenAIRealtimeVoiceBridge({
...opts.config,
onAudio: (muLaw) => {
opts.handler.sendAudio(opts.streamSid, muLaw);
},
onClearAudio: () => {
opts.handler.clearAudio(opts.streamSid);
},
});
}
// ---------------------------------------------------------------------------
// Internal event shape types (partial — only fields we use)
// ---------------------------------------------------------------------------
interface RealtimeEvent {
type: string;
delta?: string;
transcript?: string;
item_id?: string;
call_id?: string;
name?: string;
error?: unknown;
}
interface RealtimeSessionUpdate {
type: "session.update";
session: {
modalities: string[];
instructions?: string;
voice: RealtimeVoice;
input_audio_format: string;
output_audio_format: string;
turn_detection: {
type: "server_vad";
threshold: number;
prefix_padding_ms: number;
silence_duration_ms: number;
create_response: boolean;
};
temperature: number;
tools?: RealtimeTool[];
tool_choice?: string;
};
}

View File

@@ -12,6 +12,7 @@ import { createTelephonyTtsProvider } from "./telephony-tts.js";
import { startTunnel, type TunnelResult } from "./tunnel.js";
import { VoiceCallWebhookServer } from "./webhook.js";
import { cleanupTailscaleExposure, setupTailscaleExposure } from "./webhook/tailscale.js";
import { RealtimeCallHandler } from "./webhook/realtime-handler.js";
export type VoiceCallRuntime = {
config: VoiceCallConfig;
@@ -20,6 +21,8 @@ export type VoiceCallRuntime = {
webhookServer: VoiceCallWebhookServer;
webhookUrl: string;
publicUrl: string | null;
/** Realtime voice handler — present when config.realtime.enabled is true */
realtimeHandler?: RealtimeCallHandler;
stop: () => Promise<void>;
};
@@ -168,6 +171,19 @@ export async function createVoiceCallRuntime(params: {
const webhookServer = new VoiceCallWebhookServer(config, manager, provider, coreConfig);
const lifecycle = createRuntimeResourceLifecycle({ config, webhookServer });
// Wire realtime handler before the server starts so it's ready for upgrades
let realtimeHandler: RealtimeCallHandler | undefined;
if (config.realtime.enabled) {
realtimeHandler = new RealtimeCallHandler(
config.realtime,
manager,
provider,
coreConfig,
);
webhookServer.setRealtimeHandler(realtimeHandler);
log.info("[voice-call] Realtime voice handler initialized");
}
const localUrl = await webhookServer.start();
// Wrap remaining initialization in try/catch so the webhook server is
@@ -252,6 +268,7 @@ export async function createVoiceCallRuntime(params: {
webhookServer,
webhookUrl,
publicUrl,
realtimeHandler,
stop,
};
} catch (err) {

View File

@@ -40,6 +40,7 @@ export function createVoiceCallBaseConfig(params?: {
maxPendingConnectionsPerIp: 4,
maxConnections: 128,
},
realtime: { enabled: false, tools: [] },
skipSignatureVerification: false,
stt: { provider: "openai", model: "whisper-1" },
tts: {

View File

@@ -15,6 +15,7 @@ import { OpenAIRealtimeSTTProvider } from "./providers/stt-openai-realtime.js";
import type { TwilioProvider } from "./providers/twilio.js";
import type { NormalizedEvent, WebhookContext } from "./types.js";
import { startStaleCallReaper } from "./webhook/stale-call-reaper.js";
import type { RealtimeCallHandler } from "./webhook/realtime-handler.js";
const MAX_WEBHOOK_BODY_BYTES = 1024 * 1024;
@@ -60,6 +61,9 @@ export class VoiceCallWebhookServer {
/** Media stream handler for bidirectional audio (when streaming enabled) */
private mediaStreamHandler: MediaStreamHandler | null = null;
/** Realtime voice handler — present when config.realtime.enabled is true */
private realtimeHandler: RealtimeCallHandler | null = null;
constructor(
config: VoiceCallConfig,
manager: CallManager,
@@ -84,6 +88,13 @@
return this.mediaStreamHandler;
}
/**
* Wire the realtime call handler (called from runtime.ts before server starts).
*/
setRealtimeHandler(handler: RealtimeCallHandler): void {
this.realtimeHandler = handler;
}
/**
* Initialize media streaming with OpenAI Realtime STT.
*/
@@ -229,9 +240,15 @@
});
});
// Handle WebSocket upgrades for media streams
if (this.mediaStreamHandler) {
// Handle WebSocket upgrades for realtime voice and media streams
if (this.realtimeHandler || this.mediaStreamHandler) {
this.server.on("upgrade", (request, socket, head) => {
// Realtime voice takes precedence when the path matches
if (this.realtimeHandler && this.isRealtimeMode(request)) {
console.log("[voice-call] WebSocket upgrade for realtime voice");
this.realtimeHandler.handleWebSocketUpgrade(request, socket, head);
return;
}
const path = this.getUpgradePathname(request);
if (path === streamPath) {
console.log("[voice-call] WebSocket upgrade for media stream");
@@ -338,10 +355,29 @@
this.writeWebhookResponse(res, payload);
}
/**
* Returns true for WebSocket upgrade paths that belong to the realtime handler.
* Used only for upgrade routing, not for the inbound HTTP webhook POST.
*/
private isRealtimeMode(req: http.IncomingMessage): boolean {
return (req.url ?? "/").includes("/realtime");
}
private async runWebhookPipeline(
req: http.IncomingMessage,
webhookPath: string,
): Promise<WebhookResponsePayload> {
// Realtime mode: whenever the realtime handler is active, ALL inbound calls
// use it. The handler returns TwiML <Connect><Stream> so Twilio opens a
// WebSocket to the /voice/stream/realtime path, which is routed back here
// via the upgrade handler's isRealtimeMode() check.
if (this.realtimeHandler && req.method === "POST") {
const url = buildRequestUrl(req.url, req.headers.host);
if (this.isWebhookPathMatch(url.pathname, webhookPath)) {
return this.realtimeHandler.buildTwiMLPayload(req);
}
}
const url = buildRequestUrl(req.url, req.headers.host);
if (url.pathname === "/voice/hold-music") {

View File

@@ -0,0 +1,289 @@
import http from "node:http";
import type { Duplex } from "node:stream";
import { type WebSocket, Server as WebSocketServer } from "ws";
import type { VoiceCallRealtimeConfig } from "../config.js";
import type { CoreConfig } from "../core-bridge.js";
import type { CallManager } from "../manager.js";
import {
OpenAIRealtimeVoiceBridge,
type RealtimeTool,
} from "../providers/openai-realtime-voice.js";
import type { VoiceCallProvider } from "../providers/base.js";
import type { NormalizedEvent } from "../types.js";
export type ToolHandlerFn = (args: unknown, callId: string) => Promise<unknown>;
type WebhookResponsePayload = {
statusCode: number;
body: string;
headers?: Record<string, string>;
};
/**
* Handles inbound voice calls bridged directly to the OpenAI Realtime API.
*
* Responsibilities:
* - Accept WebSocket upgrades from Twilio Media Streams at the /realtime path
* - Return TwiML <Connect><Stream> payload for the initial HTTP webhook
* - Register each call with CallManager (appears in voice status/history)
* - Route tool calls to registered handlers (Phase 5 tool framework)
*/
export class RealtimeCallHandler {
private toolHandlers = new Map<string, ToolHandlerFn>();
constructor(
private config: VoiceCallRealtimeConfig,
private manager: CallManager,
private provider: VoiceCallProvider,
private coreConfig: CoreConfig | null,
) {}
/**
* Handle a WebSocket upgrade request from Twilio for a realtime media stream.
* Called from VoiceCallWebhookServer's upgrade handler when isRealtimeMode() is true.
*/
handleWebSocketUpgrade(request: http.IncomingMessage, socket: Duplex, head: Buffer): void {
const wss = new WebSocketServer({ noServer: true });
wss.handleUpgrade(request, socket, head, (ws) => {
let bridge: OpenAIRealtimeVoiceBridge | null = null;
let initialized = false;
ws.on("message", (data: Buffer) => {
try {
const msg = JSON.parse(data.toString()) as Record<string, unknown>;
if (!initialized && msg.event === "start") {
initialized = true;
const startData = msg.start as Record<string, string> | undefined;
const streamSid = startData?.streamSid || "unknown";
const callSid = startData?.callSid || "unknown";
bridge = this.handleCall(streamSid, callSid, ws);
} else if (bridge) {
const mediaData = msg.media as Record<string, unknown> | undefined;
if (msg.event === "media" && mediaData?.payload) {
bridge.sendAudio(Buffer.from(mediaData.payload as string, "base64"));
if (mediaData.timestamp) {
bridge.setMediaTimestamp(Number(mediaData.timestamp));
}
} else if (msg.event === "mark") {
bridge.acknowledgeMark();
} else if (msg.event === "stop") {
bridge.close();
}
}
} catch (err) {
console.error("[voice-call] Error parsing WS message:", err);
}
});
ws.on("close", () => {
bridge?.close();
});
});
}
/**
* Build the TwiML <Connect><Stream> response payload for a realtime call.
* The WebSocket URL is derived from the incoming request host so no hostname
* is hardcoded.
*/
buildTwiMLPayload(req: http.IncomingMessage): WebhookResponsePayload {
const host = req.headers.host || "localhost:8443";
const wsUrl = `wss://${host}/voice/stream/realtime`;
console.log(`[voice-call] Returning realtime TwiML with WebSocket: ${wsUrl}`);
const twiml = `<?xml version="1.0" encoding="UTF-8"?>
<Response>
<Connect>
<Stream url="${wsUrl}" />
</Connect>
</Response>`;
return {
statusCode: 200,
headers: { "Content-Type": "text/xml" },
body: twiml,
};
}
/**
* Register a named tool handler to be called when the model invokes a function.
* Must be called before calls begin.
*
* @param name - Function name as declared in config.realtime.tools
* @param fn - Async handler receiving (parsedArgs, internalCallId); return value
* is submitted back to the model as the tool result.
*/
registerToolHandler(name: string, fn: ToolHandlerFn): void {
this.toolHandlers.set(name, fn);
}
// ---------------------------------------------------------------------------
// Private
// ---------------------------------------------------------------------------
/**
* Create and start the OpenAI Realtime bridge for a single call session.
* Registers the call with CallManager so it appears in status/history.
* Returns the bridge (or null on fatal config error).
*/
private handleCall(
streamSid: string,
callSid: string,
ws: WebSocket,
): OpenAIRealtimeVoiceBridge | null {
const apiKey = process.env.OPENAI_API_KEY;
if (!apiKey) {
console.error("[voice-call] No OPENAI_API_KEY for realtime call");
ws.close(1011, "No API key");
return null;
}
const callId = this.registerCallInManager(callSid);
console.log(`[voice-call] Realtime call: streamSid=${streamSid}, callSid=${callSid}, callId=${callId}`);
// Declare as null first so closures can capture the reference before bridge is created.
// By the time any callback fires, bridge will be fully assigned.
let bridge: OpenAIRealtimeVoiceBridge | null = null;
bridge = new OpenAIRealtimeVoiceBridge({
apiKey,
model: this.config.model,
voice: this.config.voice,
instructions: this.config.instructions,
temperature: this.config.temperature,
vadThreshold: this.config.vadThreshold,
silenceDurationMs: this.config.silenceDurationMs,
prefixPaddingMs: this.config.prefixPaddingMs,
tools: this.config.tools as RealtimeTool[],
onAudio: (muLaw) => {
ws.send(
JSON.stringify({
event: "media",
streamSid,
media: { payload: muLaw.toString("base64") },
}),
);
},
onClearAudio: () => {
ws.send(JSON.stringify({ event: "clear", streamSid }));
},
onTranscript: (role, text, isFinal) => {
if (isFinal) {
// Emit user speech through the manager for transcript persistence
if (role === "user") {
const event: NormalizedEvent = {
id: `realtime-speech-${callSid}-${Date.now()}`,
type: "call.speech",
callId,
providerCallId: callSid,
timestamp: Date.now(),
transcript: text,
isFinal: true,
};
this.manager.processEvent(event);
}
}
},
onToolCall: (toolEvent) => {
if (bridge) {
void this.executeToolCall(
bridge,
callId,
toolEvent.callId,
toolEvent.name,
toolEvent.args,
);
}
},
onReady: () => {
bridge?.triggerGreeting();
},
onError: (err) => {
console.error("[voice-call] Realtime error:", err.message);
},
onClose: () => {
this.endCallInManager(callSid, callId);
},
});
bridge.connect().catch((err: Error) => {
console.error("[voice-call] Failed to connect realtime bridge:", err);
ws.close(1011, "Failed to connect");
});
// Acknowledge the stream connection (mirrors Twilio Media Streams protocol)
ws.send(JSON.stringify({ event: "connected", protocol: "Call", version: "1.0.0" }));
return bridge;
}
/**
* Emit synthetic NormalizedEvents to register the call with CallManager.
* Returns the internal callId generated by the manager.
*/
private registerCallInManager(callSid: string): string {
const now = Date.now();
const baseFields = {
providerCallId: callSid,
timestamp: now,
direction: "inbound" as const,
};
// call.initiated causes the manager to auto-create the call record
// (see manager/events.ts createWebhookCall path)
this.manager.processEvent({
id: `realtime-initiated-${callSid}`,
callId: callSid,
type: "call.initiated",
...baseFields,
});
this.manager.processEvent({
id: `realtime-answered-${callSid}`,
callId: callSid,
type: "call.answered",
...baseFields,
});
// Resolve the manager-generated internal callId
const call = this.manager.getCallByProviderCallId(callSid);
return call?.callId ?? callSid;
}
private endCallInManager(callSid: string, callId: string): void {
this.manager.processEvent({
id: `realtime-ended-${callSid}-${Date.now()}`,
type: "call.ended",
callId,
providerCallId: callSid,
timestamp: Date.now(),
reason: "completed",
});
}
private async executeToolCall(
bridge: OpenAIRealtimeVoiceBridge,
callId: string,
bridgeCallId: string,
name: string,
args: unknown,
): Promise<void> {
const handler = this.toolHandlers.get(name);
let result: unknown;
if (handler) {
try {
result = await handler(args, callId);
} catch (err) {
result = { error: err instanceof Error ? err.message : String(err) };
}
} else {
result = { error: `Tool "${name}" not available` };
}
bridge.submitToolResult(bridgeCallId, result);
}
}