In this guide, you’ll learn how to use LLM Gateway with AssemblyAI’s Streaming API. This script accumulates transcribed text in theDocumentation Index
Fetch the complete documentation index at: https://assemblyai.com/docs/llms.txt
Use this file to discover all available pages before exploring further.
on_message function using a global conversation_data (Python) / conversationData (JavaScript) variable. Once the transcription session is closed, the accumulated transcript is sent to LLM Gateway for analysis.
Quickstart
- Python
- JavaScript
import pyaudio
import websocket
import json
import threading
import time
import requests
from urllib.parse import urlencode
YOUR_API_KEY = "YOUR_API_KEY" # Replace with your actual API key
CONNECTION_PARAMS = {
"sample_rate": 16000,
"speech_model": "u3-rt-pro",
}
API_ENDPOINT_BASE_URL = "wss://streaming.assemblyai.com/v3/ws"
API_ENDPOINT = f"{API_ENDPOINT_BASE_URL}?{urlencode(CONNECTION_PARAMS)}"
FRAMES_PER_BUFFER = 800
SAMPLE_RATE = CONNECTION_PARAMS["sample_rate"]
CHANNELS = 1
FORMAT = pyaudio.paInt16
audio = None
stream = None
ws_app = None
audio_thread = None
stop_event = threading.Event()
conversation_data = ""
def analyze_with_llm_gateway(text):
"""Called when the WebSocket connection is closing and the transcript text is sent to LLM Gateway to be analyzed."""
headers = {
"authorization": YOUR_API_KEY,
"content-type": "application/json"
}
prompt = "You are a helpful coach. Provide an analysis of the transcript and offer areas to improve with exact quotes. Include no preamble. Start with an overall summary then get into the examples with feedback."
llm_gateway_data = {
"model": "claude-sonnet-4-20250514",
"messages": [
{"role": "user", "content": f"{prompt}\n\nTranscript: {text}"}
],
"max_tokens": 4000
}
result = requests.post(
"https://docs/llm-gateway.assemblyai.com/v1/chat/completions",
headers=headers,
json=llm_gateway_data
)
return result.json()["choices"][0]["message"]["content"]
def on_open(ws):
print("WebSocket connection opened.")
def stream_audio():
global stream
while not stop_event.is_set():
try:
audio_data = stream.read(FRAMES_PER_BUFFER, exception_on_overflow=False)
ws.send(audio_data, websocket.ABNF.OPCODE_BINARY)
except Exception as e:
print(f"Error streaming audio: {e}")
break
global audio_thread
audio_thread = threading.Thread(target=stream_audio)
audio_thread.daemon = True
audio_thread.start()
def on_message(ws, message):
try:
data = json.loads(message)
msg_type = data.get("type")
if msg_type == "Begin":
print(f"Session began: ID={data.get('id')}")
elif msg_type == "Turn":
transcript = data.get("transcript", "")
if data.get("end_of_turn"):
global conversation_data
print(f"\r{' ' * 80}\r{transcript}")
conversation_data += f"{transcript}\n"
else:
print(f"\r{transcript}", end="")
elif msg_type == "Termination":
print(f"\nSession terminated: {data.get('audio_duration_seconds', 0)}s of audio")
except Exception as e:
print(f"Error handling message: {e}")
def on_error(ws, error):
print(f"\nWebSocket Error: {error}")
stop_event.set()
def on_close(ws, close_status_code, close_msg):
print(f"\nWebSocket Disconnected: Status={close_status_code}")
global stream, audio
stop_event.set()
if stream:
if stream.is_active():
stream.stop_stream()
stream.close()
if audio:
audio.terminate()
def run():
global audio, stream, ws_app
audio = pyaudio.PyAudio()
stream = audio.open(
input=True,
frames_per_buffer=FRAMES_PER_BUFFER,
channels=CHANNELS,
format=FORMAT,
rate=SAMPLE_RATE,
)
print("Speak into your microphone. Press Ctrl+C to stop.")
ws_app = websocket.WebSocketApp(
API_ENDPOINT,
header={"Authorization": YOUR_API_KEY},
on_open=on_open,
on_message=on_message,
on_error=on_error,
on_close=on_close,
)
ws_thread = threading.Thread(target=ws_app.run_forever)
ws_thread.daemon = True
ws_thread.start()
try:
while ws_thread.is_alive():
time.sleep(0.1)
except KeyboardInterrupt:
print("\nStopping...")
stop_event.set()
if ws_app and ws_app.sock and ws_app.sock.connected:
ws_app.send(json.dumps({"type": "Terminate"}))
time.sleep(2)
if ws_app:
ws_app.close()
ws_thread.join(timeout=2.0)
if conversation_data.strip():
print("Analyzing conversation with LLM Gateway...")
print(analyze_with_llm_gateway(conversation_data))
else:
print("No conversation data to analyze.")
if __name__ == "__main__":
run()
import WebSocket from "ws";
import mic from "mic";
const YOUR_API_KEY = "YOUR_API_KEY";
const CONNECTION_PARAMS = {
sample_rate: 16000,
speech_model: "u3-rt-pro",
};
const API_ENDPOINT_BASE_URL = "wss://streaming.assemblyai.com/v3/ws";
const API_ENDPOINT = `${API_ENDPOINT_BASE_URL}?${new URLSearchParams(CONNECTION_PARAMS).toString()}`;
const SAMPLE_RATE = CONNECTION_PARAMS.sample_rate;
let micInstance = null;
let ws = null;
let conversationData = "";
async function analyzeWithLlmGateway(text) {
const prompt =
"You are a helpful coach. Provide an analysis of the transcript and offer areas to improve with exact quotes. Include no preamble. Start with an overall summary then get into the examples with feedback.";
const response = await fetch(
"https://docs/llm-gateway.assemblyai.com/v1/chat/completions",
{
method: "POST",
headers: {
Authorization: YOUR_API_KEY,
"Content-Type": "application/json",
},
body: JSON.stringify({
model: "claude-sonnet-4-20250514",
messages: [
{ role: "user", content: `${prompt}\n\nTranscript: ${text}` },
],
max_tokens: 4000,
}),
}
);
const data = await response.json();
return data.choices[0].message.content;
}
function run() {
ws = new WebSocket(API_ENDPOINT, {
headers: { Authorization: YOUR_API_KEY },
});
ws.on("open", () => {
console.log("WebSocket connection opened.");
micInstance = mic({
rate: String(SAMPLE_RATE),
channels: "1",
bitwidth: "16",
encoding: "signed-integer",
endian: "little",
});
const micInputStream = micInstance.getAudioStream();
micInputStream.on("data", (data) => {
if (ws.readyState === WebSocket.OPEN) {
ws.send(data);
}
});
micInstance.start();
console.log("Speak into your microphone. Press Ctrl+C to stop.");
});
ws.on("message", (data) => {
try {
const msg = JSON.parse(data);
if (msg.type === "Begin") {
console.log(`Session began: ID=${msg.id}`);
} else if (msg.type === "Turn") {
const transcript = msg.transcript || "";
if (msg.end_of_turn) {
process.stdout.write("\r" + " ".repeat(80) + "\r");
console.log(transcript);
conversationData += `${transcript}\n`;
} else {
process.stdout.write(`\r${transcript}`);
}
} else if (msg.type === "Termination") {
console.log(
`\nSession terminated: ${msg.audio_duration_seconds}s of audio`
);
}
} catch (e) {
console.error("Error handling message:", e);
}
});
ws.on("error", (error) => {
console.error("WebSocket error:", error);
});
ws.on("close", (code) => {
console.log(`WebSocket closed: ${code}`);
if (micInstance) micInstance.stop();
});
process.on("SIGINT", async () => {
console.log("\nStopping...");
if (micInstance) micInstance.stop();
if (ws && ws.readyState === WebSocket.OPEN) {
ws.send(JSON.stringify({ type: "Terminate" }));
await new Promise((resolve) => setTimeout(resolve, 2000));
ws.close();
}
if (conversationData.trim()) {
console.log("Analyzing conversation with LLM Gateway...");
const analysis = await analyzeWithLlmGateway(conversationData);
console.log(analysis);
} else {
console.log("No conversation data to analyze.");
}
process.exit(0);
});
}
run();
Step-by-Step Instructions
Before we begin, make sure you have an AssemblyAI account and an API key. You can sign up and get your API key from your dashboard.Install Dependencies
- Python
- JavaScript
pip install websocket-client pyaudio requests
npm install ws mic
Import Packages & Set API Key
- Python
- JavaScript
import pyaudio
import websocket
import json
import threading
import time
import requests
from urllib.parse import urlencode
YOUR_API_KEY = "YOUR_API_KEY" # Replace with your actual API key
import WebSocket from "ws";
import mic from "mic";
const YOUR_API_KEY = "YOUR_API_KEY";
Audio Configuration & Global Variables
Set all of your audio configurations and global variables. Initialize theconversation_data / conversationData variable as an empty string to accumulate final transcripts.
- Python
- JavaScript
CONNECTION_PARAMS = {
"sample_rate": 16000,
"speech_model": "u3-rt-pro",
}
API_ENDPOINT_BASE_URL = "wss://streaming.assemblyai.com/v3/ws"
API_ENDPOINT = f"{API_ENDPOINT_BASE_URL}?{urlencode(CONNECTION_PARAMS)}"
FRAMES_PER_BUFFER = 800
SAMPLE_RATE = CONNECTION_PARAMS["sample_rate"]
CHANNELS = 1
FORMAT = pyaudio.paInt16
audio = None
stream = None
ws_app = None
audio_thread = None
stop_event = threading.Event()
conversation_data = ""
const CONNECTION_PARAMS = {
sample_rate: 16000,
speech_model: "u3-rt-pro",
};
const API_ENDPOINT_BASE_URL = "wss://streaming.assemblyai.com/v3/ws";
const API_ENDPOINT = `${API_ENDPOINT_BASE_URL}?${new URLSearchParams(CONNECTION_PARAMS).toString()}`;
const SAMPLE_RATE = CONNECTION_PARAMS.sample_rate;
let micInstance = null;
let ws = null;
let conversationData = "";
Define Analyze With LLM Gateway Function
Define a function calledanalyze_with_llm_gateway (Python) or analyzeWithLlmGateway (JavaScript), which uses LLM Gateway to analyze the complete final transcript text. The prompt can be modified to suit your individual requirements.
- Python
- JavaScript
def analyze_with_llm_gateway(text):
"""Called when the WebSocket connection is closing and the transcript text is sent to LLM Gateway to be analyzed."""
headers = {
"authorization": YOUR_API_KEY,
"content-type": "application/json"
}
prompt = "You are a helpful coach. Provide an analysis of the transcript and offer areas to improve with exact quotes. Include no preamble. Start with an overall summary then get into the examples with feedback."
llm_gateway_data = {
"model": "claude-sonnet-4-20250514",
"messages": [
{"role": "user", "content": f"{prompt}\n\nTranscript: {text}"}
],
"max_tokens": 4000
}
result = requests.post(
"https://docs/llm-gateway.assemblyai.com/v1/chat/completions",
headers=headers,
json=llm_gateway_data
)
return result.json()["choices"][0]["message"]["content"]
async function analyzeWithLlmGateway(text) {
const prompt =
"You are a helpful coach. Provide an analysis of the transcript and offer areas to improve with exact quotes. Include no preamble. Start with an overall summary then get into the examples with feedback.";
const response = await fetch(
"https://docs/llm-gateway.assemblyai.com/v1/chat/completions",
{
method: "POST",
headers: {
Authorization: YOUR_API_KEY,
"Content-Type": "application/json",
},
body: JSON.stringify({
model: "claude-sonnet-4-20250514",
messages: [
{ role: "user", content: `${prompt}\n\nTranscript: ${text}` },
],
max_tokens: 4000,
}),
}
);
const data = await response.json();
return data.choices[0].message.content;
}
Websocket Event Handlers
Open Websocket
- Python
- JavaScript
def on_open(ws):
print("WebSocket connection opened.")
def stream_audio():
global stream
while not stop_event.is_set():
try:
audio_data = stream.read(FRAMES_PER_BUFFER, exception_on_overflow=False)
ws.send(audio_data, websocket.ABNF.OPCODE_BINARY)
except Exception as e:
print(f"Error streaming audio: {e}")
break
global audio_thread
audio_thread = threading.Thread(target=stream_audio)
audio_thread.daemon = True
audio_thread.start()
ws.on("open", () => {
console.log("WebSocket connection opened.");
micInstance = mic({
rate: String(SAMPLE_RATE),
channels: "1",
bitwidth: "16",
encoding: "signed-integer",
endian: "little",
});
const micInputStream = micInstance.getAudioStream();
micInputStream.on("data", (data) => {
if (ws.readyState === WebSocket.OPEN) {
ws.send(data);
}
});
micInstance.start();
console.log("Speak into your microphone. Press Ctrl+C to stop.");
});
Handle Websocket Messages
In this function, use the previously definedconversation_data / conversationData to store all final transcripts together for later analysis.
- Python
- JavaScript
def on_message(ws, message):
try:
data = json.loads(message)
msg_type = data.get("type")
if msg_type == "Begin":
print(f"Session began: ID={data.get('id')}")
elif msg_type == "Turn":
transcript = data.get("transcript", "")
if data.get("end_of_turn"):
global conversation_data
print(f"\r{' ' * 80}\r{transcript}")
conversation_data += f"{transcript}\n"
else:
print(f"\r{transcript}", end="")
elif msg_type == "Termination":
print(f"\nSession terminated: {data.get('audio_duration_seconds', 0)}s of audio")
except Exception as e:
print(f"Error handling message: {e}")
ws.on("message", (data) => {
try {
const msg = JSON.parse(data);
if (msg.type === "Begin") {
console.log(`Session began: ID=${msg.id}`);
} else if (msg.type === "Turn") {
const transcript = msg.transcript || "";
if (msg.end_of_turn) {
process.stdout.write("\r" + " ".repeat(80) + "\r");
console.log(transcript);
conversationData += `${transcript}\n`;
} else {
process.stdout.write(`\r${transcript}`);
}
} else if (msg.type === "Termination") {
console.log(
`\nSession terminated: ${msg.audio_duration_seconds}s of audio`
);
}
} catch (e) {
console.error("Error handling message:", e);
}
});
Close Websocket
- Python
- JavaScript
def on_close(ws, close_status_code, close_msg):
print(f"\nWebSocket Disconnected: Status={close_status_code}")
global stream, audio
stop_event.set()
if stream:
if stream.is_active():
stream.stop_stream()
stream.close()
if audio:
audio.terminate()
ws.on("close", (code) => {
console.log(`WebSocket closed: ${code}`);
if (micInstance) micInstance.stop();
});
Websocket Error Handling
- Python
- JavaScript
def on_error(ws, error):
print(f"\nWebSocket Error: {error}")
stop_event.set()
ws.on("error", (error) => {
console.error("WebSocket error:", error);
});
Begin Streaming STT Transcription
After the socket is closed,conversation_data / conversationData is sent to the analyze_with_llm_gateway / analyzeWithLlmGateway function and the LLM Gateway results are printed out.
- Python
- JavaScript
def run():
global audio, stream, ws_app
audio = pyaudio.PyAudio()
stream = audio.open(
input=True,
frames_per_buffer=FRAMES_PER_BUFFER,
channels=CHANNELS,
format=FORMAT,
rate=SAMPLE_RATE,
)
print("Speak into your microphone. Press Ctrl+C to stop.")
ws_app = websocket.WebSocketApp(
API_ENDPOINT,
header={"Authorization": YOUR_API_KEY},
on_open=on_open,
on_message=on_message,
on_error=on_error,
on_close=on_close,
)
ws_thread = threading.Thread(target=ws_app.run_forever)
ws_thread.daemon = True
ws_thread.start()
try:
while ws_thread.is_alive():
time.sleep(0.1)
except KeyboardInterrupt:
print("\nStopping...")
stop_event.set()
if ws_app and ws_app.sock and ws_app.sock.connected:
ws_app.send(json.dumps({"type": "Terminate"}))
time.sleep(2)
if ws_app:
ws_app.close()
ws_thread.join(timeout=2.0)
if conversation_data.strip():
print("Analyzing conversation with LLM Gateway...")
print(analyze_with_llm_gateway(conversation_data))
else:
print("No conversation data to analyze.")
if __name__ == "__main__":
run()
process.on("SIGINT", async () => {
console.log("\nStopping...");
if (micInstance) micInstance.stop();
if (ws && ws.readyState === WebSocket.OPEN) {
ws.send(JSON.stringify({ type: "Terminate" }));
await new Promise((resolve) => setTimeout(resolve, 2000));
ws.close();
}
if (conversationData.trim()) {
console.log("Analyzing conversation with LLM Gateway...");
const analysis = await analyzeWithLlmGateway(conversationData);
console.log(analysis);
} else {
console.log("No conversation data to analyze.");
}
process.exit(0);
});
run();