Documentation Index
Fetch the complete documentation index at: https://assemblyai.com/docs/llms.txt
Use this file to discover all available pages before exploring further.
Overview
A Large Language Model (LLM) is a machine learning model that uses natural language processing (NLP) to generate text. LLM Gateway is a unified API that provides access to 25+ models from Claude, GPT, Gemini, and more through a single interface. You can use LLM Gateway to analyze streaming audio transcripts in real time, for example to summarize a live conversation or extract action items as they happen. By the end of this tutorial, you’ll be able to use LLM Gateway to analyze a streaming audio transcript from your microphone. Here’s the full sample code for what you’ll build in this tutorial:- Python
- Python SDK
- JavaScript
- JavaScript SDK
import pyaudio
import websocket
import json
import threading
import time
import wave
from urllib.parse import urlencode
from datetime import datetime
# --- Configuration ---
YOUR_API_KEY = "<YOUR_API_KEY>" # Replace with your actual API key
# LLM Gateway Configuration
PROMPT = "Provide a brief summary of the transcript.\n\nTranscript: {{turn}}"
LLM_GATEWAY_CONFIG = {
"model": "claude-sonnet-4-6",
"messages": [
{"role": "user", "content": PROMPT}
],
"max_tokens": 4000
}
CONNECTION_PARAMS = {
"sample_rate": 16000,
"format_turns": True, # Request formatted final transcripts
"speech_model": "u3-rt-pro", # USM 3 Pro model
"llm_gateway": json.dumps(LLM_GATEWAY_CONFIG) # LLM Gateway configuration
}
API_ENDPOINT_BASE_URL = "wss://streaming.assemblyai.com/v3/ws"
API_ENDPOINT = f"{API_ENDPOINT_BASE_URL}?{urlencode(CONNECTION_PARAMS)}"
# Audio Configuration
FRAMES_PER_BUFFER = 800 # 50ms of audio (0.05s * 16000Hz)
SAMPLE_RATE = CONNECTION_PARAMS["sample_rate"]
CHANNELS = 1
FORMAT = pyaudio.paInt16
# Global variables for audio stream and websocket
audio = None
stream = None
ws_app = None
audio_thread = None
stop_event = threading.Event() # To signal the audio thread to stop
# WAV recording variables
recorded_frames = [] # Store audio frames for WAV file
recording_lock = threading.Lock() # Thread-safe access to recorded_frames
def save_wav_file():
"""Save recorded audio frames to a WAV file."""
if not recorded_frames:
print("No audio data recorded.")
return
# Generate filename with timestamp
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
filename = f"recorded_audio_{timestamp}.wav"
try:
with wave.open(filename, 'wb') as wf:
wf.setnchannels(CHANNELS)
wf.setsampwidth(2) # 16-bit = 2 bytes
wf.setframerate(SAMPLE_RATE)
# Write all recorded frames
with recording_lock:
wf.writeframes(b''.join(recorded_frames))
print(f"Audio saved to: {filename}")
print(f"Duration: {len(recorded_frames) * FRAMES_PER_BUFFER / SAMPLE_RATE:.2f} seconds")
except Exception as e:
print(f"Error saving WAV file: {e}")
# --- WebSocket Event Handlers ---
def on_open(ws):
"""Called when the WebSocket connection is established."""
print("WebSocket connection opened.")
print(f"Connected to: {API_ENDPOINT_BASE_URL}")
# Start sending audio data in a separate thread
def stream_audio():
global stream
print("Starting audio streaming...")
while not stop_event.is_set():
try:
audio_data = stream.read(FRAMES_PER_BUFFER, exception_on_overflow=False)
# Store audio data for WAV recording
with recording_lock:
recorded_frames.append(audio_data)
# Send audio data as binary message
ws.send(audio_data, websocket.ABNF.OPCODE_BINARY)
except Exception as e:
print(f"Error streaming audio: {e}")
# If stream read fails, likely means it's closed, stop the loop
break
print("Audio streaming stopped.")
global audio_thread
audio_thread = threading.Thread(target=stream_audio)
audio_thread.daemon = (
True # Allow main thread to exit even if this thread is running
)
audio_thread.start()
def on_message(ws, message):
try:
data = json.loads(message)
msg_type = data.get('type')
if msg_type == "Begin":
session_id = data.get('id')
expires_at = data.get('expires_at')
print(f"Session started: {session_id}")
elif msg_type == "Turn":
end_of_turn = data.get('end_of_turn', False)
if end_of_turn:
transcript = data.get('transcript', '')
print(f"\nTranscript:\n{transcript}\n")
elif msg_type == "LLMGatewayResponse":
# Extract the LLM response content
llm_data = data.get('data', {})
llm_content = llm_data.get("choices", [{}])[0].get("message", {}).get("content", "")
print(f"LLM Response:\n{llm_content}\n")
elif msg_type == "Termination":
audio_duration = data.get('audio_duration_seconds', 0)
session_duration = data.get('session_duration_seconds', 0)
print(f"Session terminated: {audio_duration} seconds of audio processed")
except json.JSONDecodeError as e:
print(f"Error decoding message: {e}")
except Exception as e:
print(f"Error handling message: {e}")
def on_error(ws, error):
"""Called when a WebSocket error occurs."""
print(f"\nWebSocket Error: {error}")
# Attempt to signal stop on error
stop_event.set()
def on_close(ws, close_status_code, close_msg):
"""Called when the WebSocket connection is closed."""
print(f"\nWebSocket Disconnected: Status={close_status_code}, Msg={close_msg}")
# Save recorded audio to WAV file
save_wav_file()
# Ensure audio resources are released
global stream, audio
stop_event.set() # Signal audio thread just in case it's still running
if stream:
if stream.is_active():
stream.stop_stream()
stream.close()
stream = None
if audio:
audio.terminate()
audio = None
# Try to join the audio thread to ensure clean exit
if audio_thread and audio_thread.is_alive():
audio_thread.join(timeout=1.0)
# --- Main Execution ---
def run():
global audio, stream, ws_app
# Initialize PyAudio
audio = pyaudio.PyAudio()
# Open microphone stream
try:
stream = audio.open(
input=True,
frames_per_buffer=FRAMES_PER_BUFFER,
channels=CHANNELS,
format=FORMAT,
rate=SAMPLE_RATE,
)
print("Microphone stream opened successfully.")
print("Speak into your microphone. Press Ctrl+C to stop.")
print("Audio will be saved to a WAV file when the session ends.")
except Exception as e:
print(f"Error opening microphone stream: {e}")
if audio:
audio.terminate()
return # Exit if microphone cannot be opened
# Create WebSocketApp
ws_app = websocket.WebSocketApp(
API_ENDPOINT,
header={"Authorization": YOUR_API_KEY},
on_open=on_open,
on_message=on_message,
on_error=on_error,
on_close=on_close,
)
# Run WebSocketApp in a separate thread to allow main thread to catch KeyboardInterrupt
ws_thread = threading.Thread(target=ws_app.run_forever)
ws_thread.daemon = True
ws_thread.start()
try:
# Keep main thread alive until interrupted
while ws_thread.is_alive():
time.sleep(0.1)
except KeyboardInterrupt:
print("\nCtrl+C received. Stopping...")
stop_event.set() # Signal audio thread to stop
# Send termination message to the server
if ws_app and ws_app.sock and ws_app.sock.connected:
try:
terminate_message = {"type": "Terminate"}
print(f"Sending termination message: {json.dumps(terminate_message)}")
ws_app.send(json.dumps(terminate_message))
# Give a moment for messages to process before forceful close
time.sleep(5)
except Exception as e:
print(f"Error sending termination message: {e}")
# Close the WebSocket connection (will trigger on_close)
if ws_app:
ws_app.close()
# Wait for WebSocket thread to finish
ws_thread.join(timeout=2.0)
except Exception as e:
print(f"\nAn unexpected error occurred: {e}")
stop_event.set()
if ws_app:
ws_app.close()
ws_thread.join(timeout=2.0)
finally:
# Final cleanup (already handled in on_close, but good as a fallback)
if stream and stream.is_active():
stream.stop_stream()
if stream:
stream.close()
if audio:
audio.terminate()
print("Cleanup complete. Exiting.")
if __name__ == "__main__":
run()
import logging
from typing import Type
import assemblyai as aai
from assemblyai.streaming.v3 import (
BeginEvent,
LLMGatewayResponseEvent,
StreamingClient,
StreamingClientOptions,
StreamingError,
StreamingEvents,
StreamingParameters,
TurnEvent,
TerminationEvent,
)
from assemblyai.streaming.v3.models import LLMGatewayConfig, LLMGatewayMessage
api_key = "<YOUR_API_KEY>"
prompt = "Provide a brief summary of the transcript.\n\nTranscript: {{turn}}"
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def on_begin(self: Type[StreamingClient], event: BeginEvent):
print(f"Session started: {event.id}")
def on_turn(self: Type[StreamingClient], event: TurnEvent):
if event.end_of_turn:
print(f"\nTranscript:\n{event.transcript}\n")
def on_llm_response(self: Type[StreamingClient], event: LLMGatewayResponseEvent):
# Extract the actual LLM response content from the data
llm_content = event.data.get("choices", [{}])[0].get("message", {}).get("content", "")
print(f"LLM Response:\n{llm_content}\n")
def on_terminated(self: Type[StreamingClient], event: TerminationEvent):
print(
f"Session terminated: {event.audio_duration_seconds} seconds of audio processed"
)
def on_error(self: Type[StreamingClient], error: StreamingError):
print(f"Error occurred: {error}")
def main():
client = StreamingClient(
StreamingClientOptions(
api_key=api_key,
api_host="streaming.assemblyai.com",
)
)
client.on(StreamingEvents.Begin, on_begin)
client.on(StreamingEvents.Turn, on_turn)
client.on(StreamingEvents.LLMGatewayResponse, on_llm_response)
client.on(StreamingEvents.Termination, on_terminated)
client.on(StreamingEvents.Error, on_error)
client.connect(
StreamingParameters(
sample_rate=16000,
speech_model="u3-rt-pro",
format_turns=True,
llm_gateway=LLMGatewayConfig(
model="claude-sonnet-4-6",
messages=[
LLMGatewayMessage(role="user", content=prompt)
],
max_tokens=4000
)
)
)
try:
client.stream(
aai.extras.MicrophoneStream(sample_rate=16000)
)
finally:
client.disconnect(terminate=True)
if __name__ == "__main__":
main()
const WebSocket = require("ws");
const mic = require("mic");
const querystring = require("querystring");
const fs = require("fs");
// --- Configuration ---
const YOUR_API_KEY = "<YOUR_API_KEY>"; // Replace with your actual API key
// LLM Gateway Configuration
const PROMPT =
"Provide a brief summary of the transcript.\n\nTranscript: {{turn}}";
const LLM_GATEWAY_CONFIG = {
model: "claude-sonnet-4-6",
messages: [{ role: "user", content: PROMPT }],
max_tokens: 4000,
};
const CONNECTION_PARAMS = {
sample_rate: 16000,
format_turns: true, // Request formatted final transcripts
speech_model: "u3-rt-pro", // USM 3 Pro model
llm_gateway: JSON.stringify(LLM_GATEWAY_CONFIG), // LLM Gateway configuration
};
const API_ENDPOINT_BASE_URL = "wss://streaming.assemblyai.com/v3/ws";
const API_ENDPOINT = `${API_ENDPOINT_BASE_URL}?${querystring.stringify(CONNECTION_PARAMS)}`;
// Audio Configuration
const SAMPLE_RATE = CONNECTION_PARAMS.sample_rate;
const CHANNELS = 1;
// Global variables
let micInstance = null;
let micInputStream = null;
let ws = null;
let stopRequested = false;
// WAV recording variables
let recordedFrames = []; // Store audio frames for WAV file
// --- Helper functions ---
function formatTimestamp(timestamp) {
return new Date(timestamp * 1000).toISOString();
}
function createWavHeader(sampleRate, channels, dataLength) {
const buffer = Buffer.alloc(44);
// RIFF header
buffer.write("RIFF", 0);
buffer.writeUInt32LE(36 + dataLength, 4);
buffer.write("WAVE", 8);
// fmt chunk
buffer.write("fmt ", 12);
buffer.writeUInt32LE(16, 16); // fmt chunk size
buffer.writeUInt16LE(1, 20); // PCM format
buffer.writeUInt16LE(channels, 22);
buffer.writeUInt32LE(sampleRate, 24);
buffer.writeUInt32LE(sampleRate * channels * 2, 28); // byte rate
buffer.writeUInt16LE(channels * 2, 32); // block align
buffer.writeUInt16LE(16, 34); // bits per sample
// data chunk
buffer.write("data", 36);
buffer.writeUInt32LE(dataLength, 40);
return buffer;
}
function saveWavFile() {
if (recordedFrames.length === 0) {
console.log("No audio data recorded.");
return;
}
// Generate filename with timestamp
const timestamp = new Date().toISOString().replace(/[:.]/g, "-").slice(0, 19);
const filename = `recorded_audio_${timestamp}.wav`;
try {
// Combine all recorded frames
const audioData = Buffer.concat(recordedFrames);
const dataLength = audioData.length;
// Create WAV header
const wavHeader = createWavHeader(SAMPLE_RATE, CHANNELS, dataLength);
// Write WAV file
const wavFile = Buffer.concat([wavHeader, audioData]);
fs.writeFileSync(filename, wavFile);
console.log(`Audio saved to: ${filename}`);
console.log(
`Duration: ${(dataLength / (SAMPLE_RATE * CHANNELS * 2)).toFixed(2)} seconds`
);
} catch (error) {
console.error(`Error saving WAV file: ${error}`);
}
}
// --- Main function ---
async function run() {
console.log(
"Starting AssemblyAI streaming transcription with LLM Gateway..."
);
console.log("Audio will be saved to a WAV file when the session ends.");
// Initialize WebSocket connection
ws = new WebSocket(API_ENDPOINT, {
headers: {
Authorization: YOUR_API_KEY,
},
});
// Setup WebSocket event handlers
ws.on("open", () => {
console.log("WebSocket connection opened.");
console.log(`Connected to: ${API_ENDPOINT_BASE_URL}`);
// Start the microphone
startMicrophone();
});
ws.on("message", (message) => {
try {
const data = JSON.parse(message);
const msgType = data.type;
if (msgType === "Begin") {
const sessionId = data.id;
console.log(`Session started: ${sessionId}`);
} else if (msgType === "Turn") {
const endOfTurn = data.end_of_turn;
if (endOfTurn) {
const transcript = data.transcript || "";
console.log(`\nTranscript:\n${transcript}\n`);
}
} else if (msgType === "LLMGatewayResponse") {
// Extract the LLM response content
const llmData = data.data || {};
const llmContent = llmData.choices?.[0]?.message?.content || "";
console.log(`LLM Response:\n${llmContent}\n`);
} else if (msgType === "Termination") {
const audioDuration = data.audio_duration_seconds;
console.log(
`Session terminated: ${audioDuration} seconds of audio processed`
);
}
} catch (error) {
console.error(`\nError handling message: ${error}`);
console.error(`Message data: ${message}`);
}
});
ws.on("error", (error) => {
console.error(`\nWebSocket Error: ${error}`);
cleanup();
});
ws.on("close", (code, reason) => {
console.log(`\nWebSocket Disconnected: Status=${code}, Msg=${reason}`);
cleanup();
});
// Handle process termination
setupTerminationHandlers();
}
function startMicrophone() {
try {
micInstance = mic({
rate: SAMPLE_RATE.toString(),
channels: CHANNELS.toString(),
debug: false,
exitOnSilence: 6, // This won't actually exit, just a parameter for mic
});
micInputStream = micInstance.getAudioStream();
micInputStream.on("data", (data) => {
if (ws && ws.readyState === WebSocket.OPEN && !stopRequested) {
// Store audio data for WAV recording
recordedFrames.push(Buffer.from(data));
// Send audio data to WebSocket
ws.send(data);
}
});
micInputStream.on("error", (err) => {
console.error(`Microphone Error: ${err}`);
cleanup();
});
micInstance.start();
console.log("Microphone stream opened successfully.");
console.log("Speak into your microphone. Press Ctrl+C to stop.");
} catch (error) {
console.error(`Error opening microphone stream: ${error}`);
cleanup();
}
}
function cleanup() {
stopRequested = true;
// Save recorded audio to WAV file
saveWavFile();
// Stop microphone if it's running
if (micInstance) {
try {
micInstance.stop();
} catch (error) {
console.error(`Error stopping microphone: ${error}`);
}
micInstance = null;
}
// Close WebSocket connection if it's open
if (ws && [WebSocket.OPEN, WebSocket.CONNECTING].includes(ws.readyState)) {
try {
// Send termination message if possible
if (ws.readyState === WebSocket.OPEN) {
const terminateMessage = { type: "Terminate" };
console.log(
`Sending termination message: ${JSON.stringify(terminateMessage)}`
);
ws.send(JSON.stringify(terminateMessage));
}
ws.close();
} catch (error) {
console.error(`Error closing WebSocket: ${error}`);
}
ws = null;
}
console.log("Cleanup complete.");
}
function setupTerminationHandlers() {
// Handle Ctrl+C and other termination signals
process.on("SIGINT", () => {
console.log("\nCtrl+C received. Stopping...");
cleanup();
// Give time for cleanup before exiting
setTimeout(() => process.exit(0), 1000);
});
process.on("SIGTERM", () => {
console.log("\nTermination signal received. Stopping...");
cleanup();
// Give time for cleanup before exiting
setTimeout(() => process.exit(0), 1000);
});
// Handle uncaught exceptions
process.on("uncaughtException", (error) => {
console.error(`\nUncaught exception: ${error}`);
cleanup();
// Give time for cleanup before exiting
setTimeout(() => process.exit(1), 1000);
});
}
// Start the application
run();
import { AssemblyAI } from "assemblyai";
import recorder from "node-record-lpcm16";
import { Readable } from "stream";
// --- Configuration ---
const API_KEY = "<YOUR_API_KEY>";
// LLM Gateway Configuration
const PROMPT =
"Provide a brief summary of the transcript.\n\nTranscript: {{turn}}";
const LLM_GATEWAY_CONFIG = {
model: "claude-sonnet-4-6",
messages: [{ role: "user", content: PROMPT }],
max_tokens: 4000,
};
const run = async () => {
const client = new AssemblyAI({
apiKey: API_KEY,
});
const transcriber = client.streaming.transcriber({
sampleRate: 16000,
formatTurns: true,
speechModel: "u3-rt-pro",
llmGateway: LLM_GATEWAY_CONFIG,
});
// Session started event
transcriber.on("open", ({ id }) => {
console.log(`Session started: ${id}`);
});
// Error event
transcriber.on("error", (error) => {
console.error(`\nError: ${error}`);
});
// Close event
transcriber.on("close", (code, reason) => {
console.log("Session closed:", code, reason || "");
});
// Turn event - displays transcript when turn ends
transcriber.on("turn", (turn) => {
if (turn.end_of_turn) {
console.log(`\nTranscript:\n${turn.transcript}\n`);
}
});
// LLM Gateway Response event - displays LLM summary
transcriber.on("llmGatewayResponse", (response) => {
const llmContent = response.data?.choices?.[0]?.message?.content || "";
console.log(`\nLLM Response:\n${llmContent}\n`);
});
// Termination event
transcriber.on("termination", (event) => {
console.log(
`Session terminated: ${event.audio_duration_seconds} seconds of audio processed`
);
});
try {
await transcriber.connect();
console.log("Connecting to streaming transcript service");
console.log("Starting recording");
const recording = recorder.record({
channels: 1,
sampleRate: 16000,
audioType: "wav", // Linear PCM
});
Readable.toWeb(recording.stream()).pipeTo(transcriber.stream());
// Stop recording and close connection using Ctrl-C
process.on("SIGINT", async function () {
console.log();
console.log("Stopping recording");
recording.stop();
console.log("Closing streaming transcript connection");
await transcriber.close();
process.exit();
});
} catch (error) {
console.error(`Error: ${error}`);
}
};
run();
Before you begin
To complete this tutorial, you need:- Python or Node installed.
- An AssemblyAI account with a credit card set up.
- A microphone connected to your computer.
- Basic understanding of how to Transcribe streaming audio.
Step 1: Install prerequisites
- Python
- Python SDK
- JavaScript
- JavaScript SDK
Install the required packages via pip:
pip install pyaudio websocket-client
Install the AssemblyAI Python SDK via pip:
pip install "assemblyai[extras]"
Install the required packages via NPM:
npm install ws mic
Install the AssemblyAI JavaScript SDK and a recording package via NPM:
npm install assemblyai node-record-lpcm16
Step 2: Connect to Universal Streaming
In this step, you’ll set up a connection to the Universal Streaming API with thellm_gateway parameter. This parameter configures LLM Gateway to process your streaming transcripts.
For more information about streaming transcription, see Transcribe streaming audio.
- Python
- Python SDK
- JavaScript
- JavaScript SDK
import pyaudio
import websocket
import json
import threading
import time
import wave
from urllib.parse import urlencode
from datetime import datetime
# --- Configuration ---
YOUR_API_KEY = "<YOUR_API_KEY>" # Replace with your actual API key
# LLM Gateway Configuration
PROMPT = "Provide a brief summary of the transcript.\n\nTranscript: {{turn}}"
LLM_GATEWAY_CONFIG = {
"model": "claude-sonnet-4-6",
"messages": [
{"role": "user", "content": PROMPT}
],
"max_tokens": 4000
}
CONNECTION_PARAMS = {
"sample_rate": 16000,
"format_turns": True, # Request formatted final transcripts
"speech_model": "u3-rt-pro", # USM 3 Pro model
"llm_gateway": json.dumps(LLM_GATEWAY_CONFIG) # LLM Gateway configuration
}
API_ENDPOINT_BASE_URL = "wss://streaming.assemblyai.com/v3/ws"
API_ENDPOINT = f"{API_ENDPOINT_BASE_URL}?{urlencode(CONNECTION_PARAMS)}"
# Audio Configuration
FRAMES_PER_BUFFER = 800 # 50ms of audio (0.05s * 16000Hz)
SAMPLE_RATE = CONNECTION_PARAMS["sample_rate"]
CHANNELS = 1
FORMAT = pyaudio.paInt16
# Global variables for audio stream and websocket
audio = None
stream = None
ws_app = None
audio_thread = None
stop_event = threading.Event() # To signal the audio thread to stop
# WAV recording variables
recorded_frames = [] # Store audio frames for WAV file
recording_lock = threading.Lock() # Thread-safe access to recorded_frames
import logging
from typing import Type
import assemblyai as aai
from assemblyai.streaming.v3 import (
BeginEvent,
LLMGatewayResponseEvent,
StreamingClient,
StreamingClientOptions,
StreamingError,
StreamingEvents,
StreamingParameters,
TurnEvent,
TerminationEvent,
)
from assemblyai.streaming.v3.models import LLMGatewayConfig, LLMGatewayMessage
api_key = "<YOUR_API_KEY>"
prompt = "Provide a brief summary of the transcript.\n\nTranscript: {{turn}}"
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
const WebSocket = require("ws");
const mic = require("mic");
const querystring = require("querystring");
const fs = require("fs");
// --- Configuration ---
const YOUR_API_KEY = "<YOUR_API_KEY>"; // Replace with your actual API key
// LLM Gateway Configuration
const PROMPT =
"Provide a brief summary of the transcript.\n\nTranscript: {{turn}}";
const LLM_GATEWAY_CONFIG = {
model: "claude-sonnet-4-6",
messages: [{ role: "user", content: PROMPT }],
max_tokens: 4000,
};
const CONNECTION_PARAMS = {
sample_rate: 16000,
format_turns: true, // Request formatted final transcripts
speech_model: "u3-rt-pro", // USM 3 Pro model
llm_gateway: JSON.stringify(LLM_GATEWAY_CONFIG), // LLM Gateway configuration
};
const API_ENDPOINT_BASE_URL = "wss://streaming.assemblyai.com/v3/ws";
const API_ENDPOINT = `${API_ENDPOINT_BASE_URL}?${querystring.stringify(CONNECTION_PARAMS)}`;
// Audio Configuration
const SAMPLE_RATE = CONNECTION_PARAMS.sample_rate;
const CHANNELS = 1;
// Global variables
let micInstance = null;
let micInputStream = null;
let ws = null;
let stopRequested = false;
// WAV recording variables
let recordedFrames = []; // Store audio frames for WAV file
import { AssemblyAI } from "assemblyai";
import recorder from "node-record-lpcm16";
import { Readable } from "stream";
// --- Configuration ---
const API_KEY = "<YOUR_API_KEY>";
// LLM Gateway Configuration
const PROMPT =
"Provide a brief summary of the transcript.\n\nTranscript: {{turn}}";
const LLM_GATEWAY_CONFIG = {
model: "claude-sonnet-4-6",
messages: [{ role: "user", content: PROMPT }],
max_tokens: 4000,
};
llm_gateway parameterisa JSON-stringified object that follows the same interface as the LLM Gateway chat completions API. It accepts the following fields:
| Key | Type | Description |
|---|---|---|
model | string | The model to use. See Available models. |
messages | array | An array of message objects. The content field contains your prompt. |
max_tokens | number | The maximum number of tokens to generate. |
Step 3: Stream audio and analyze with LLM Gateway
In this step, you’ll stream audio from your microphone, collect the transcribed text from completed turns, and then send the accumulated transcript to LLM Gateway for analysis when the session ends.Set up the event handlers to stream audio and collect transcripts from completed turns.
- Python
- Python SDK
- JavaScript
- JavaScript SDK
def save_wav_file():
"""Save recorded audio frames to a WAV file."""
if not recorded_frames:
print("No audio data recorded.")
return
# Generate filename with timestamp
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
filename = f"recorded_audio_{timestamp}.wav"
try:
with wave.open(filename, 'wb') as wf:
wf.setnchannels(CHANNELS)
wf.setsampwidth(2) # 16-bit = 2 bytes
wf.setframerate(SAMPLE_RATE)
# Write all recorded frames
with recording_lock:
wf.writeframes(b''.join(recorded_frames))
print(f"Audio saved to: {filename}")
print(f"Duration: {len(recorded_frames) * FRAMES_PER_BUFFER / SAMPLE_RATE:.2f} seconds")
except Exception as e:
print(f"Error saving WAV file: {e}")
# --- WebSocket Event Handlers ---
def on_open(ws):
"""Called when the WebSocket connection is established."""
print("WebSocket connection opened.")
print(f"Connected to: {API_ENDPOINT_BASE_URL}")
# Start sending audio data in a separate thread
def stream_audio():
global stream
print("Starting audio streaming...")
while not stop_event.is_set():
try:
audio_data = stream.read(FRAMES_PER_BUFFER, exception_on_overflow=False)
# Store audio data for WAV recording
with recording_lock:
recorded_frames.append(audio_data)
# Send audio data as binary message
ws.send(audio_data, websocket.ABNF.OPCODE_BINARY)
except Exception as e:
print(f"Error streaming audio: {e}")
# If stream read fails, likely means it's closed, stop the loop
break
print("Audio streaming stopped.")
global audio_thread
audio_thread = threading.Thread(target=stream_audio)
audio_thread.daemon = (
True # Allow main thread to exit even if this thread is running
)
audio_thread.start()
def on_message(ws, message):
try:
data = json.loads(message)
msg_type = data.get('type')
if msg_type == "Begin":
session_id = data.get('id')
expires_at = data.get('expires_at')
print(f"Session started: {session_id}")
elif msg_type == "Turn":
end_of_turn = data.get('end_of_turn', False)
if end_of_turn:
transcript = data.get('transcript', '')
print(f"\nTranscript:\n{transcript}\n")
elif msg_type == "LLMGatewayResponse":
# Extract the LLM response content
llm_data = data.get('data', {})
llm_content = llm_data.get("choices", [{}])[0].get("message", {}).get("content", "")
print(f"LLM Response:\n{llm_content}\n")
elif msg_type == "Termination":
audio_duration = data.get('audio_duration_seconds', 0)
session_duration = data.get('session_duration_seconds', 0)
print(f"Session terminated: {audio_duration} seconds of audio processed")
except json.JSONDecodeError as e:
print(f"Error decoding message: {e}")
except Exception as e:
print(f"Error handling message: {e}")
def on_error(ws, error):
"""Called when a WebSocket error occurs."""
print(f"\nWebSocket Error: {error}")
# Attempt to signal stop on error
stop_event.set()
def on_close(ws, close_status_code, close_msg):
"""Called when the WebSocket connection is closed."""
print(f"\nWebSocket Disconnected: Status={close_status_code}, Msg={close_msg}")
# Save recorded audio to WAV file
save_wav_file()
# Ensure audio resources are released
global stream, audio
stop_event.set() # Signal audio thread just in case it's still running
if stream:
if stream.is_active():
stream.stop_stream()
stream.close()
stream = None
if audio:
audio.terminate()
audio = None
# Try to join the audio thread to ensure clean exit
if audio_thread and audio_thread.is_alive():
audio_thread.join(timeout=1.0)
def on_begin(self: Type[StreamingClient], event: BeginEvent):
print(f"Session started: {event.id}")
def on_turn(self: Type[StreamingClient], event: TurnEvent):
if event.end_of_turn:
print(f"\nTranscript:\n{event.transcript}\n")
def on_llm_response(self: Type[StreamingClient], event: LLMGatewayResponseEvent):
# Extract the actual LLM response content from the data
llm_content = event.data.get("choices", [{}])[0].get("message", {}).get("content", "")
print(f"LLM Response:\n{llm_content}\n")
def on_terminated(self: Type[StreamingClient], event: TerminationEvent):
print(
f"Session terminated: {event.audio_duration_seconds} seconds of audio processed"
)
def on_error(self: Type[StreamingClient], error: StreamingError):
print(f"Error occurred: {error}")
// --- Helper functions ---
function formatTimestamp(timestamp) {
return new Date(timestamp * 1000).toISOString();
}
function createWavHeader(sampleRate, channels, dataLength) {
const buffer = Buffer.alloc(44);
// RIFF header
buffer.write("RIFF", 0);
buffer.writeUInt32LE(36 + dataLength, 4);
buffer.write("WAVE", 8);
// fmt chunk
buffer.write("fmt ", 12);
buffer.writeUInt32LE(16, 16); // fmt chunk size
buffer.writeUInt16LE(1, 20); // PCM format
buffer.writeUInt16LE(channels, 22);
buffer.writeUInt32LE(sampleRate, 24);
buffer.writeUInt32LE(sampleRate * channels * 2, 28); // byte rate
buffer.writeUInt16LE(channels * 2, 32); // block align
buffer.writeUInt16LE(16, 34); // bits per sample
// data chunk
buffer.write("data", 36);
buffer.writeUInt32LE(dataLength, 40);
return buffer;
}
function saveWavFile() {
if (recordedFrames.length === 0) {
console.log("No audio data recorded.");
return;
}
// Generate filename with timestamp
const timestamp = new Date().toISOString().replace(/[:.]/g, "-").slice(0, 19);
const filename = `recorded_audio_${timestamp}.wav`;
try {
// Combine all recorded frames
const audioData = Buffer.concat(recordedFrames);
const dataLength = audioData.length;
// Create WAV header
const wavHeader = createWavHeader(SAMPLE_RATE, CHANNELS, dataLength);
// Write WAV file
const wavFile = Buffer.concat([wavHeader, audioData]);
fs.writeFileSync(filename, wavFile);
console.log(`Audio saved to: ${filename}`);
console.log(
`Duration: ${(dataLength / (SAMPLE_RATE * CHANNELS * 2)).toFixed(2)} seconds`
);
} catch (error) {
console.error(`Error saving WAV file: ${error}`);
}
}
function startMicrophone() {
try {
micInstance = mic({
rate: SAMPLE_RATE.toString(),
channels: CHANNELS.toString(),
debug: false,
exitOnSilence: 6, // This won't actually exit, just a parameter for mic
});
micInputStream = micInstance.getAudioStream();
micInputStream.on("data", (data) => {
if (ws && ws.readyState === WebSocket.OPEN && !stopRequested) {
// Store audio data for WAV recording
recordedFrames.push(Buffer.from(data));
// Send audio data to WebSocket
ws.send(data);
}
});
micInputStream.on("error", (err) => {
console.error(`Microphone Error: ${err}`);
cleanup();
});
micInstance.start();
console.log("Microphone stream opened successfully.");
console.log("Speak into your microphone. Press Ctrl+C to stop.");
} catch (error) {
console.error(`Error opening microphone stream: ${error}`);
cleanup();
}
}
// Session started event
transcriber.on("open", ({ id }) => {
console.log(`Session started: ${id}`);
});
// Error event
transcriber.on("error", (error) => {
console.error(`\nError: ${error}`);
});
// Close event
transcriber.on("close", (code, reason) => {
console.log("Session closed:", code, reason || "");
});
// Turn event - displays transcript when turn ends
transcriber.on("turn", (turn) => {
if (turn.end_of_turn) {
console.log(`\nTranscript:\n${turn.transcript}\n`);
}
});
// LLM Gateway Response event - displays LLM summary
transcriber.on("llmGatewayResponse", (response) => {
const llmContent = response.data?.choices?.[0]?.message?.content || "";
console.log(`\nLLM Response:\n${llmContent}\n`);
});
// Termination event
transcriber.on("termination", (event) => {
console.log(
`Session terminated: ${event.audio_duration_seconds} seconds of audio processed`
);
});
Define a function to send the accumulated transcript to LLM Gateway for analysis. This function uses the LLM Gateway chat completions API to process the transcript with your prompt.
- Python
- Python SDK
- JavaScript
- JavaScript SDK
When using the raw WebSocket approach with
llm_gateway in the connection parameters, LLM Gateway responses are received as LLMGatewayResponse messages through the WebSocket, handled by the on_message callback registered in the previous step. No separate API call is needed.When using the Python SDK with
LLMGatewayConfig, analysis responses are received automatically through the LLMGatewayResponseEvent event handler registered in the previous step. No separate API call is needed.When using the raw WebSocket approach with
llm_gateway in the connection parameters, LLM Gateway responses are received as LLMGatewayResponse messages through the WebSocket, handled by the on_message callback in the run() function. No separate API call is needed.When using the JavaScript SDK with
llmGateway in the transcriber options, LLM Gateway responses are received automatically through the llmGatewayResponse event handler registered in the previous step. No separate API call is needed.Run the streaming session and analyze the transcript with LLM Gateway when the session ends.The output will look something like this:
- Python
- Python SDK
- JavaScript
- JavaScript SDK
# --- Main Execution ---
def run():
global audio, stream, ws_app
# Initialize PyAudio
audio = pyaudio.PyAudio()
# Open microphone stream
try:
stream = audio.open(
input=True,
frames_per_buffer=FRAMES_PER_BUFFER,
channels=CHANNELS,
format=FORMAT,
rate=SAMPLE_RATE,
)
print("Microphone stream opened successfully.")
print("Speak into your microphone. Press Ctrl+C to stop.")
print("Audio will be saved to a WAV file when the session ends.")
except Exception as e:
print(f"Error opening microphone stream: {e}")
if audio:
audio.terminate()
return # Exit if microphone cannot be opened
# Create WebSocketApp
ws_app = websocket.WebSocketApp(
API_ENDPOINT,
header={"Authorization": YOUR_API_KEY},
on_open=on_open,
on_message=on_message,
on_error=on_error,
on_close=on_close,
)
# Run WebSocketApp in a separate thread to allow main thread to catch KeyboardInterrupt
ws_thread = threading.Thread(target=ws_app.run_forever)
ws_thread.daemon = True
ws_thread.start()
try:
# Keep main thread alive until interrupted
while ws_thread.is_alive():
time.sleep(0.1)
except KeyboardInterrupt:
print("\nCtrl+C received. Stopping...")
stop_event.set() # Signal audio thread to stop
# Send termination message to the server
if ws_app and ws_app.sock and ws_app.sock.connected:
try:
terminate_message = {"type": "Terminate"}
print(f"Sending termination message: {json.dumps(terminate_message)}")
ws_app.send(json.dumps(terminate_message))
# Give a moment for messages to process before forceful close
time.sleep(5)
except Exception as e:
print(f"Error sending termination message: {e}")
# Close the WebSocket connection (will trigger on_close)
if ws_app:
ws_app.close()
# Wait for WebSocket thread to finish
ws_thread.join(timeout=2.0)
except Exception as e:
print(f"\nAn unexpected error occurred: {e}")
stop_event.set()
if ws_app:
ws_app.close()
ws_thread.join(timeout=2.0)
finally:
# Final cleanup (already handled in on_close, but good as a fallback)
if stream and stream.is_active():
stream.stop_stream()
if stream:
stream.close()
if audio:
audio.terminate()
print("Cleanup complete. Exiting.")
if __name__ == "__main__":
run()
def main():
client = StreamingClient(
StreamingClientOptions(
api_key=api_key,
api_host="streaming.assemblyai.com",
)
)
client.on(StreamingEvents.Begin, on_begin)
client.on(StreamingEvents.Turn, on_turn)
client.on(StreamingEvents.LLMGatewayResponse, on_llm_response)
client.on(StreamingEvents.Termination, on_terminated)
client.on(StreamingEvents.Error, on_error)
client.connect(
StreamingParameters(
sample_rate=16000,
speech_model="u3-rt-pro",
format_turns=True,
llm_gateway=LLMGatewayConfig(
model="claude-sonnet-4-6",
messages=[
LLMGatewayMessage(role="user", content=prompt)
],
max_tokens=4000
)
)
)
try:
client.stream(
aai.extras.MicrophoneStream(sample_rate=16000)
)
finally:
client.disconnect(terminate=True)
if __name__ == "__main__":
main()
// --- Main function ---
async function run() {
console.log(
"Starting AssemblyAI streaming transcription with LLM Gateway..."
);
console.log("Audio will be saved to a WAV file when the session ends.");
// Initialize WebSocket connection
ws = new WebSocket(API_ENDPOINT, {
headers: {
Authorization: YOUR_API_KEY,
},
});
// Setup WebSocket event handlers
ws.on("open", () => {
console.log("WebSocket connection opened.");
console.log(`Connected to: ${API_ENDPOINT_BASE_URL}`);
// Start the microphone
startMicrophone();
});
ws.on("message", (message) => {
try {
const data = JSON.parse(message);
const msgType = data.type;
if (msgType === "Begin") {
const sessionId = data.id;
console.log(`Session started: ${sessionId}`);
} else if (msgType === "Turn") {
const endOfTurn = data.end_of_turn;
if (endOfTurn) {
const transcript = data.transcript || "";
console.log(`\nTranscript:\n${transcript}\n`);
}
} else if (msgType === "LLMGatewayResponse") {
// Extract the LLM response content
const llmData = data.data || {};
const llmContent = llmData.choices?.[0]?.message?.content || "";
console.log(`LLM Response:\n${llmContent}\n`);
} else if (msgType === "Termination") {
const audioDuration = data.audio_duration_seconds;
console.log(
`Session terminated: ${audioDuration} seconds of audio processed`
);
}
} catch (error) {
console.error(`\nError handling message: ${error}`);
console.error(`Message data: ${message}`);
}
});
ws.on("error", (error) => {
console.error(`\nWebSocket Error: ${error}`);
cleanup();
});
ws.on("close", (code, reason) => {
console.log(`\nWebSocket Disconnected: Status=${code}, Msg=${reason}`);
cleanup();
});
// Handle process termination
setupTerminationHandlers();
}
function cleanup() {
stopRequested = true;
// Save recorded audio to WAV file
saveWavFile();
// Stop microphone if it's running
if (micInstance) {
try {
micInstance.stop();
} catch (error) {
console.error(`Error stopping microphone: ${error}`);
}
micInstance = null;
}
// Close WebSocket connection if it's open
if (ws && [WebSocket.OPEN, WebSocket.CONNECTING].includes(ws.readyState)) {
try {
// Send termination message if possible
if (ws.readyState === WebSocket.OPEN) {
const terminateMessage = { type: "Terminate" };
console.log(
`Sending termination message: ${JSON.stringify(terminateMessage)}`
);
ws.send(JSON.stringify(terminateMessage));
}
ws.close();
} catch (error) {
console.error(`Error closing WebSocket: ${error}`);
}
ws = null;
}
console.log("Cleanup complete.");
}
function setupTerminationHandlers() {
// Handle Ctrl+C and other termination signals
process.on("SIGINT", () => {
console.log("\nCtrl+C received. Stopping...");
cleanup();
// Give time for cleanup before exiting
setTimeout(() => process.exit(0), 1000);
});
process.on("SIGTERM", () => {
console.log("\nTermination signal received. Stopping...");
cleanup();
// Give time for cleanup before exiting
setTimeout(() => process.exit(0), 1000);
});
// Handle uncaught exceptions
process.on("uncaughtException", (error) => {
console.error(`\nUncaught exception: ${error}`);
cleanup();
// Give time for cleanup before exiting
setTimeout(() => process.exit(1), 1000);
});
}
// Start the application
run();
const run = async () => {
const client = new AssemblyAI({
apiKey: API_KEY,
});
const transcriber = client.streaming.transcriber({
sampleRate: 16000,
formatTurns: true,
speechModel: "u3-rt-pro",
llmGateway: LLM_GATEWAY_CONFIG,
});
// ... (event handlers from Step 1)
try {
await transcriber.connect();
console.log("Connecting to streaming transcript service");
console.log("Starting recording");
const recording = recorder.record({
channels: 1,
sampleRate: 16000,
audioType: "wav", // Linear PCM
});
Readable.toWeb(recording.stream()).pipeTo(transcriber.stream());
// Stop recording and close connection using Ctrl-C
process.on("SIGINT", async function () {
console.log();
console.log("Stopping recording");
recording.stop();
console.log("Closing streaming transcript connection");
await transcriber.close();
process.exit();
});
} catch (error) {
console.error(`Error: ${error}`);
}
};
run();
Session started: de5d9927-73a6-4be8-b52d-b4c07be37e6b
Transcript: Hi, my name is Sonny.
Transcript: I am a voice agent.
Stopping...
Session terminated: 12s of audio processed
Analyzing conversation with LLM Gateway...
The speaker introduces themselves as Sonny and identifies as a voice agent.
Want to make your LLM requests more resilient? Use fallback models to automatically switch to a backup model if your primary model is unavailable.
Next steps
In this tutorial, you’ve learned how to analyze streaming audio transcripts using LLM Gateway. The type of output depends on your prompt, so try exploring different prompts to see how they affect the output. Here are a few more prompts to try:- “Provide an analysis of the transcript and offer areas to improve with exact quotes.”
- “What’s the main take-away from the transcript?”
- “Generate a set of action items from this transcript.”