voice-call: fix three realtime review bugs
- Remove direct onClose call from bridge.close() to prevent double-fire (the ws close event handler is the single source of truth for onClose) - Gate realtime TwiML intercept on CallStatus=ringing + Direction=inbound so outbound status callbacks and completed events fall through to normal webhook pipeline - Thread caller From/To from HTTP POST body through stream token nonce map to registerCallInManager so inboundPolicy allowlist checks have caller identity on synthetic call.initiated/call.answered events Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
parent
d457ac8fe4
commit
1ea1695edc
@ -299,7 +299,8 @@ export class OpenAIRealtimeVoiceBridge {
|
||||
this.ws.close(1000, "Bridge closed");
|
||||
this.ws = null;
|
||||
}
|
||||
this.config.onClose?.();
|
||||
// onClose fires from the ws "close" event handler (intentionallyClosed branch)
|
||||
// to avoid double-firing on explicit close().
|
||||
}
|
||||
|
||||
/** True if the WebSocket is open and the session is configured. */
|
||||
|
||||
@ -421,12 +421,16 @@ export class VoiceCallWebhookServer {
|
||||
return { statusCode: 401, body: "Unauthorized" };
|
||||
}
|
||||
|
||||
// Realtime mode: return TwiML <Connect><Stream> after verification so
|
||||
// the request is still authenticated against the provider's signature.
|
||||
// The WebSocket that Twilio opens in response is routed via the upgrade
|
||||
// handler's isRealtimeWebSocketUpgrade() check.
|
||||
// Realtime mode: return TwiML <Connect><Stream> for inbound ringing calls only.
|
||||
// Status callbacks (CallStatus=completed, outbound calls, etc.) must fall
|
||||
// through to the normal webhook pipeline so call state is updated correctly.
|
||||
if (this.realtimeHandler) {
|
||||
return this.realtimeHandler.buildTwiMLPayload(req);
|
||||
const params = new URLSearchParams(ctx.rawBody);
|
||||
const callStatus = params.get("CallStatus");
|
||||
const direction = params.get("Direction");
|
||||
if (callStatus === "ringing" && (!direction || direction === "inbound")) {
|
||||
return this.realtimeHandler.buildTwiMLPayload(req, params);
|
||||
}
|
||||
}
|
||||
|
||||
const parsed = this.provider.parseWebhookEvent(ctx, {
|
||||
|
||||
@ -38,8 +38,9 @@ const STREAM_TOKEN_TTL_MS = 30_000;
|
||||
|
||||
export class RealtimeCallHandler {
|
||||
private toolHandlers = new Map<string, ToolHandlerFn>();
|
||||
/** One-time tokens issued per TwiML response; consumed on WS upgrade. */
|
||||
private pendingStreamTokens = new Map<string, number>();
|
||||
/** One-time tokens issued per TwiML response; consumed on WS upgrade.
|
||||
* Stores expiry + caller metadata so registerCallInManager can include From/To. */
|
||||
private pendingStreamTokens = new Map<string, { expiry: number; from?: string; to?: string }>();
|
||||
|
||||
constructor(
|
||||
private config: VoiceCallRealtimeConfig,
|
||||
@ -61,7 +62,8 @@ export class RealtimeCallHandler {
|
||||
handleWebSocketUpgrade(request: http.IncomingMessage, socket: Duplex, head: Buffer): void {
|
||||
const url = new URL(request.url ?? "/", "wss://localhost");
|
||||
const token = url.searchParams.get("token");
|
||||
if (!token || !this.consumeStreamToken(token)) {
|
||||
const callerMeta = token ? this.consumeStreamToken(token) : null;
|
||||
if (!callerMeta) {
|
||||
console.warn("[voice-call] Rejecting WS upgrade: missing or invalid stream token");
|
||||
socket.write("HTTP/1.1 401 Unauthorized\r\n\r\n");
|
||||
socket.destroy();
|
||||
@ -81,7 +83,7 @@ export class RealtimeCallHandler {
|
||||
const startData = msg.start as Record<string, string> | undefined;
|
||||
const streamSid = startData?.streamSid || "unknown";
|
||||
const callSid = startData?.callSid || "unknown";
|
||||
bridge = this.handleCall(streamSid, callSid, ws);
|
||||
bridge = this.handleCall(streamSid, callSid, ws, callerMeta);
|
||||
} else if (bridge) {
|
||||
const mediaData = msg.media as Record<string, unknown> | undefined;
|
||||
if (msg.event === "media" && mediaData?.payload) {
|
||||
@ -111,10 +113,16 @@ export class RealtimeCallHandler {
|
||||
* The WebSocket URL is derived from the incoming request host so no hostname
|
||||
* is hardcoded. A one-time stream token is embedded in the URL and validated
|
||||
* by handleWebSocketUpgrade to prevent unauthenticated WS connections.
|
||||
*
|
||||
* @param params - Parsed Twilio webhook body params (From/To stored with nonce
|
||||
* so registerCallInManager can populate caller fields).
|
||||
*/
|
||||
buildTwiMLPayload(req: http.IncomingMessage): WebhookResponsePayload {
|
||||
buildTwiMLPayload(req: http.IncomingMessage, params?: URLSearchParams): WebhookResponsePayload {
|
||||
const host = req.headers.host || "localhost:8443";
|
||||
const token = this.issueStreamToken();
|
||||
const token = this.issueStreamToken({
|
||||
from: params?.get("From") ?? undefined,
|
||||
to: params?.get("To") ?? undefined,
|
||||
});
|
||||
const wsUrl = `wss://${host}/voice/stream/realtime?token=${token}`;
|
||||
console.log(`[voice-call] Returning realtime TwiML with WebSocket: wss://${host}/voice/stream/realtime`);
|
||||
const twiml = `<?xml version="1.0" encoding="UTF-8"?>
|
||||
@ -147,22 +155,22 @@ export class RealtimeCallHandler {
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
/** Generate a single-use stream token valid for STREAM_TOKEN_TTL_MS. */
|
||||
private issueStreamToken(): string {
|
||||
private issueStreamToken(meta: { from?: string; to?: string } = {}): string {
|
||||
const token = randomUUID();
|
||||
this.pendingStreamTokens.set(token, Date.now() + STREAM_TOKEN_TTL_MS);
|
||||
this.pendingStreamTokens.set(token, { expiry: Date.now() + STREAM_TOKEN_TTL_MS, ...meta });
|
||||
// Evict expired tokens to prevent unbounded growth if calls are abandoned
|
||||
for (const [t, expiry] of this.pendingStreamTokens) {
|
||||
if (Date.now() > expiry) this.pendingStreamTokens.delete(t);
|
||||
for (const [t, entry] of this.pendingStreamTokens) {
|
||||
if (Date.now() > entry.expiry) this.pendingStreamTokens.delete(t);
|
||||
}
|
||||
return token;
|
||||
}
|
||||
|
||||
/** Consume a stream token. Returns true if valid and not yet used. */
|
||||
private consumeStreamToken(token: string): boolean {
|
||||
const expiry = this.pendingStreamTokens.get(token);
|
||||
if (expiry === undefined) return false;
|
||||
/** Consume a stream token. Returns caller metadata if valid, null if not. */
|
||||
private consumeStreamToken(token: string): { from?: string; to?: string } | null {
|
||||
const entry = this.pendingStreamTokens.get(token);
|
||||
if (!entry) return null;
|
||||
this.pendingStreamTokens.delete(token);
|
||||
return Date.now() <= expiry;
|
||||
return Date.now() <= entry.expiry ? { from: entry.from, to: entry.to } : null;
|
||||
}
|
||||
|
||||
/**
|
||||
@ -174,6 +182,7 @@ export class RealtimeCallHandler {
|
||||
streamSid: string,
|
||||
callSid: string,
|
||||
ws: WebSocket,
|
||||
callerMeta: { from?: string; to?: string },
|
||||
): OpenAIRealtimeVoiceBridge | null {
|
||||
const apiKey = this.openaiApiKey ?? process.env.OPENAI_API_KEY;
|
||||
if (!apiKey) {
|
||||
@ -182,7 +191,7 @@ export class RealtimeCallHandler {
|
||||
return null;
|
||||
}
|
||||
|
||||
const callId = this.registerCallInManager(callSid);
|
||||
const callId = this.registerCallInManager(callSid, callerMeta);
|
||||
console.log(`[voice-call] Realtime call: streamSid=${streamSid}, callSid=${callSid}, callId=${callId}`);
|
||||
|
||||
// Declare as null first so closures can capture the reference before bridge is created.
|
||||
@ -275,12 +284,17 @@ export class RealtimeCallHandler {
|
||||
* Tested directly via `as unknown as` cast — the logic is non-trivial
|
||||
* enough to warrant unit testing without promoting to a public method.
|
||||
*/
|
||||
private registerCallInManager(callSid: string): string {
|
||||
private registerCallInManager(
|
||||
callSid: string,
|
||||
callerMeta: { from?: string; to?: string } = {},
|
||||
): string {
|
||||
const now = Date.now();
|
||||
const baseFields = {
|
||||
providerCallId: callSid,
|
||||
timestamp: now,
|
||||
direction: "inbound" as const,
|
||||
...(callerMeta.from ? { from: callerMeta.from } : {}),
|
||||
...(callerMeta.to ? { to: callerMeta.to } : {}),
|
||||
};
|
||||
|
||||
// call.initiated causes the manager to auto-create the call record
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user