Get Started
Before we begin, make sure you have an AssemblyAI account and an API key. You can sign up for a free account and get your API key from your AssemblyAI dashboard.

Side-By-Side Code Comparison
Below is a side-by-side comparison of basic snippets that transcribe live microphone audio with Gladia and with AssemblyAI:
- Gladia
- AssemblyAI
import asyncio
import base64
import json
import signal
from datetime import time
import pyaudio
import requests
from websockets.asyncio.client import connect
from websockets.exceptions import ConnectionClosedOK
# --- Gladia configuration ---
GLADIA_API_URL = "https://api.gladia.io"
GLADIA_API_KEY = "<YOUR_GLADIA_API_KEY>"

# --- Microphone capture settings ---
SAMPLE_RATE = 16_000  # Hz
CHANNELS = 1
FORMAT = pyaudio.paInt16
FRAMES_PER_BUFFER = 3200  # frames per read: 200 ms of audio at 16 kHz
async def main():
    """Start a Gladia live session and stream microphone audio to it.

    Posts the audio configuration to ``/v2/live``, connects to the WebSocket
    URL found in the response, then runs the audio sender and the transcript
    receiver until both finish. If the session request is rejected, the
    status is printed and the process exits with the HTTP status code.
    """
    # Describe the audio that will be sent so it matches the capture settings.
    session_config = {
        "encoding": "wav/pcm",
        "sample_rate": SAMPLE_RATE,
        "bit_depth": 16,
        "channels": CHANNELS,
        "language_config": {
            "languages": ["en"],
        },
    }

    # Request a live session.
    init_response = requests.post(
        f"{GLADIA_API_URL}/v2/live",
        headers={"X-Gladia-Key": GLADIA_API_KEY},
        json=session_config,
        timeout=3,
    )
    if not init_response.ok:
        print(f"{init_response.status_code}: {init_response.text or init_response.reason}")
        exit(init_response.status_code)

    session_data = init_response.json()

    # Connect to the WebSocket returned for this session.
    async with connect(session_data["url"]) as websocket:
        print("\n################ Begin session ################\n")

        loop = asyncio.get_running_loop()

        def request_stop():
            # Ctrl+C schedules the stop message on the running loop.
            loop.create_task(stop_recording(websocket))

        loop.add_signal_handler(signal.SIGINT, request_stop)

        # Send audio and receive transcripts concurrently.
        workers = [
            asyncio.create_task(send_audio(websocket)),
            asyncio.create_task(receive_transcript(websocket)),
        ]
        await asyncio.wait(workers)
async def stop_recording(websocket):
    """Send the ``stop_recording`` message over the given WebSocket."""
    print(">>>>> Ending the recording…")
    stop_message = json.dumps({"type": "stop_recording"})
    await websocket.send(stop_message)
    # Yield to the event loop once so other pending tasks can run.
    await asyncio.sleep(0)
async def send_audio(websocket):
    """Capture microphone audio and send it as base64-encoded JSON chunks.

    Opens a PyAudio input stream with the module-level audio settings and
    sends one ``audio_chunk`` message per buffer read. Returns when the
    connection is closed normally (``ConnectionClosedOK``). The audio stream
    and the PyAudio instance are released on every exit path.

    Args:
        websocket: Open WebSocket connection to send the chunks on.
    """
    # Initialize PyAudio
    p = pyaudio.PyAudio()
    try:
        # Open audio stream
        stream = p.open(
            format=FORMAT,
            channels=CHANNELS,
            rate=SAMPLE_RATE,
            input=True,
            frames_per_buffer=FRAMES_PER_BUFFER,
        )
        try:
            # Send audio chunks
            while True:
                data = stream.read(FRAMES_PER_BUFFER)
                chunk = base64.b64encode(data).decode("utf-8")
                json_data = json.dumps({"type": "audio_chunk", "data": {"chunk": chunk}})
                try:
                    await websocket.send(json_data)
                    # Pause 100 ms between sends. stream.read() above is a
                    # synchronous call, so this await is also where other
                    # tasks on the event loop get to run.
                    await asyncio.sleep(0.1)
                except ConnectionClosedOK:
                    return
        finally:
            # Release the input stream even if reading or sending fails.
            stream.stop_stream()
            stream.close()
    finally:
        p.terminate()
async def receive_transcript(websocket):
    """Print final transcripts and the end-of-session payload as they arrive.

    Iterates over every message on the WebSocket. Final ``transcript``
    messages are printed as a single line; the ``post_final_transcript``
    message is pretty-printed in full.
    """
    async for raw_message in websocket:
        event = json.loads(raw_message)

        # Only final transcripts are printed; other transcript messages are ignored.
        if event["type"] == "transcript" and event["data"]["is_final"]:
            utterance_text = event["data"]["utterance"]["text"]
            print(f"Final: {utterance_text.strip()}")

        # Dump the whole end-of-session message.
        if event["type"] == "post_final_transcript":
            print("\n################ End of session ################\n")
            print(json.dumps(event, indent=2, ensure_ascii=False))
if __name__ == "__main__":
    # Script entry point: run the session until main() returns.
    asyncio.run(main())
import pyaudio
import websocket
import json
import threading
import time
from urllib.parse import urlencode
from datetime import datetime
# --- Configuration ---
YOUR_API_KEY = "YOUR-API-KEY"  # Replace with your actual API key

API_ENDPOINT_BASE_URL = "wss://streaming.assemblyai.com/v3/ws"
CONNECTION_PARAMS = {
    "sample_rate": 16000,
    "speech_model": "u3-rt-pro",
}
# The connection parameters are passed as a query string on the WebSocket URL.
API_ENDPOINT = f"{API_ENDPOINT_BASE_URL}?{urlencode(CONNECTION_PARAMS)}"

# --- Audio configuration ---
SAMPLE_RATE = CONNECTION_PARAMS["sample_rate"]  # capture at the rate declared to the API
CHANNELS = 1
FORMAT = pyaudio.paInt16
FRAMES_PER_BUFFER = 800  # 50ms of audio (0.05s * 16000Hz)

# --- Shared state for the audio stream and WebSocket ---
audio = None
stream = None
ws_app = None
audio_thread = None
stop_event = threading.Event()  # Set to tell the audio thread to stop
# --- WebSocket Event Handlers ---
def on_open(ws):
    """Start the microphone streaming thread once the WebSocket is open.

    Args:
        ws: The connected WebSocket object used to send audio frames.
    """
    global audio_thread

    print("WebSocket connection opened.")
    print(f"Connected to: {API_ENDPOINT}")

    def stream_audio():
        """Forward microphone buffers as binary frames until asked to stop."""
        print("Starting audio streaming...")
        while not stop_event.is_set():
            try:
                chunk = stream.read(FRAMES_PER_BUFFER, exception_on_overflow=False)
                # Audio is sent as a raw binary message, not JSON.
                ws.send(chunk, websocket.ABNF.OPCODE_BINARY)
            except Exception as e:
                print(f"Error streaming audio: {e}")
                # A failed read or send ends the streaming loop.
                break
        print("Audio streaming stopped.")

    # Daemon thread: the main thread may exit even if this one is still running.
    audio_thread = threading.Thread(target=stream_audio, daemon=True)
    audio_thread.start()
def on_message(ws, message):
    """Handle one JSON message from the streaming API and print a summary.

    Recognized message types:
      * ``"Begin"``: prints the session id and expiry time.
      * ``"Turn"``: prints the transcript; partial transcripts overwrite the
        current console line, end-of-turn transcripts get their own line.
      * ``"Termination"``: prints audio and session durations.

    Other message types are ignored. Errors are printed, never raised.

    Args:
        ws: The WebSocket object the message arrived on (unused).
        message: Raw JSON text of the message.
    """
    try:
        data = json.loads(message)
        msg_type = data.get('type')
        if msg_type == "Begin":
            session_id = data.get('id')
            expires_at = data.get('expires_at')
            # fromtimestamp(None) raises TypeError, which would turn a valid
            # session start into an error report; show a placeholder instead.
            if expires_at is not None:
                expires_display = datetime.fromtimestamp(expires_at)
            else:
                expires_display = "unknown"
            print(f"\nSession began: ID={session_id}, ExpiresAt={expires_display}")
        elif msg_type == "Turn":
            transcript = data.get('transcript', '')
            end_of_turn = data.get('end_of_turn', False)
            if end_of_turn:
                # Blank out the partial line, then print the final transcript.
                print('\r' + ' ' * 80 + '\r', end='')
                print(transcript)
            else:
                # Partial transcript: redraw in place on the same line.
                print(f"\r{transcript}", end='')
        elif msg_type == "Termination":
            audio_duration = data.get('audio_duration_seconds', 0)
            session_duration = data.get('session_duration_seconds', 0)
            print(f"\nSession Terminated: Audio Duration={audio_duration}s, Session Duration={session_duration}s")
    except json.JSONDecodeError as e:
        print(f"Error decoding message: {e}")
    except Exception as e:
        print(f"Error handling message: {e}")
def on_error(ws, error):
    """Print a WebSocket error and signal the audio streaming loop to stop.

    Args:
        ws: The WebSocket object that reported the error (unused).
        error: The error reported for the connection.
    """
    print(f"\nWebSocket Error: {error}")
    # The audio loop checks this event on every iteration.
    stop_event.set()
def on_close(ws, close_status_code, close_msg):
    """Release audio resources when the WebSocket connection is closed.

    Signals the audio thread to stop and waits briefly for it to finish, then
    closes the microphone stream and terminates PyAudio. The globals are reset
    to ``None`` so later cleanup code does not release them twice.

    Args:
        ws: The WebSocket object that was closed (unused).
        close_status_code: Close status code reported for the connection.
        close_msg: Close reason reported for the connection.
    """
    global stream, audio
    print(f"\nWebSocket Disconnected: Status={close_status_code}, Msg={close_msg}")

    stop_event.set()  # Signal audio thread just in case it's still running

    # Wait for the audio thread BEFORE closing the stream, so it is not left
    # reading from a stream that is closed underneath it. A thread cannot
    # join itself, hence the current_thread() guard.
    if (
        audio_thread
        and audio_thread is not threading.current_thread()
        and audio_thread.is_alive()
    ):
        audio_thread.join(timeout=1.0)

    if stream:
        if stream.is_active():
            stream.stop_stream()
        stream.close()
        stream = None
    if audio:
        audio.terminate()
        audio = None
# --- Main Execution ---
def run():
    """Open the microphone, connect to the streaming endpoint, and run until stopped.

    The WebSocketApp runs in a daemon thread so the main thread stays free to
    receive ``KeyboardInterrupt``. On Ctrl+C a ``Terminate`` message is sent,
    the connection is closed, and audio resources are released. Returns early
    if the microphone cannot be opened.
    """
    global audio, stream, ws_app

    # Initialize PyAudio
    audio = pyaudio.PyAudio()

    # Open microphone stream
    try:
        stream = audio.open(
            input=True,
            frames_per_buffer=FRAMES_PER_BUFFER,
            channels=CHANNELS,
            format=FORMAT,
            rate=SAMPLE_RATE,
        )
        print("Microphone stream opened successfully.")
        print("Speak into your microphone. Press Ctrl+C to stop.")
    except Exception as e:
        print(f"Error opening microphone stream: {e}")
        if audio:
            audio.terminate()
        return  # Exit if microphone cannot be opened

    # Create WebSocketApp
    ws_app = websocket.WebSocketApp(
        API_ENDPOINT,
        header={"Authorization": YOUR_API_KEY},
        on_open=on_open,
        on_message=on_message,
        on_error=on_error,
        on_close=on_close,
    )

    # Run WebSocketApp in a separate thread to allow main thread to catch KeyboardInterrupt
    ws_thread = threading.Thread(target=ws_app.run_forever)
    ws_thread.daemon = True
    ws_thread.start()

    try:
        # Keep main thread alive until interrupted
        while ws_thread.is_alive():
            time.sleep(0.1)
    except KeyboardInterrupt:
        print("\nCtrl+C received. Stopping...")
        stop_event.set()  # Signal audio thread to stop

        # Send termination message to the server
        if ws_app and ws_app.sock and ws_app.sock.connected:
            try:
                terminate_message = {"type": "Terminate"}
                print(f"Sending termination message: {json.dumps(terminate_message)}")
                ws_app.send(json.dumps(terminate_message))
                # Give remaining messages up to 5 seconds to be processed, but
                # stop waiting as soon as the WebSocket thread exits instead
                # of always sleeping for the full 5 seconds.
                ws_thread.join(timeout=5.0)
            except Exception as e:
                print(f"Error sending termination message: {e}")

        # Close the WebSocket connection (will trigger on_close)
        if ws_app:
            ws_app.close()

        # Wait for WebSocket thread to finish
        ws_thread.join(timeout=2.0)
    except Exception as e:
        print(f"\nAn unexpected error occurred: {e}")
        stop_event.set()
        if ws_app:
            ws_app.close()
        ws_thread.join(timeout=2.0)
    finally:
        # Final cleanup (already handled in on_close, but good as a fallback);
        # on_close resets the globals to None, so nothing is released twice.
        if stream and stream.is_active():
            stream.stop_stream()
        if stream:
            stream.close()
        if audio:
            audio.terminate()
        print("Cleanup complete. Exiting.")
if __name__ == "__main__":
    # Script entry point.
    run()
Authentication
- Gladia
- AssemblyAI
import asyncio
import base64
import json
import signal
from datetime import time
import pyaudio
import requests
from websockets.asyncio.client import connect
from websockets.exceptions import ConnectionClosedOK
GLADIA_API_KEY = "<YOUR_GLADIA_API_KEY>"
import pyaudio
import websocket
import json
import threading
import time
from urllib.parse import urlencode
from datetime import datetime
YOUR_API_KEY = "YOUR-API-KEY"
Protect Your API Key
For improved security, store your API key as an environment variable.
Connection Parameters & Microphone Setup
- Gladia
- AssemblyAI
GLADIA_API_URL = "https://api.gladia.io"

# Microphone capture settings
SAMPLE_RATE = 16_000
CHANNELS = 1
FORMAT = pyaudio.paInt16
FRAMES_PER_BUFFER = 3200

# Session configuration; the audio fields mirror the capture settings above.
config = {
    "encoding": "wav/pcm",
    "sample_rate": SAMPLE_RATE,
    "bit_depth": 16,
    "channels": CHANNELS,
    "language_config": {"languages": ["en"]},
}
API_ENDPOINT_BASE_URL = "wss://streaming.assemblyai.com/v3/ws"
CONNECTION_PARAMS = {
    "sample_rate": 16000,
    "speech_model": "u3-rt-pro",
}
# The connection parameters are passed as a query string on the WebSocket URL.
API_ENDPOINT = f"{API_ENDPOINT_BASE_URL}?{urlencode(CONNECTION_PARAMS)}"

# --- Audio configuration ---
SAMPLE_RATE = CONNECTION_PARAMS["sample_rate"]  # capture at the rate declared to the API
CHANNELS = 1
FORMAT = pyaudio.paInt16
FRAMES_PER_BUFFER = 800  # 50ms of audio (0.05s * 16000Hz)

# --- Shared state for the audio stream and WebSocket ---
audio = None
stream = None
ws_app = None
audio_thread = None
stop_event = threading.Event()  # Set to tell the audio thread to stop
- Universal-3 Pro Model — Connect to `wss://streaming.assemblyai.com/v3/ws` with `speech_model=u3-rt-pro` to use our latest, highest-accuracy streaming model — Universal-3 Pro.
- Built-in Formatting — Universal-3 Pro always returns formatted transcripts with smart punctuation & casing. No extra parameter is needed.
- Partial Transcripts — AssemblyAI streams interim results automatically. Universal-3 Pro emits partials during periods of silence, with at most one partial per silence period.
Open Microphone Stream & Create WebSocket
- Gladia
- AssemblyAI
# Start the live session
response = requests.post(
f"{GLADIA_API_URL}/v2/live",
headers={"X-Gladia-Key": GLADIA_API_KEY},
json=config,
timeout=3,
)
if not response.ok:
print(f"{response.status_code}: {response.text or response.reason}")
exit(response.status_code)
session_data = response.json()
async def send_audio(websocket):
    """Capture microphone audio and send it as base64-encoded JSON chunks.

    Opens a PyAudio input stream with the module-level audio settings and
    sends one ``audio_chunk`` message per buffer read. Returns when the
    connection is closed normally (``ConnectionClosedOK``). The audio stream
    and the PyAudio instance are released on every exit path.

    Args:
        websocket: Open WebSocket connection to send the chunks on.
    """
    # Initialize PyAudio
    p = pyaudio.PyAudio()
    try:
        # Open audio stream
        stream = p.open(
            format=FORMAT,
            channels=CHANNELS,
            rate=SAMPLE_RATE,
            input=True,
            frames_per_buffer=FRAMES_PER_BUFFER,
        )
        try:
            # Send audio chunks
            while True:
                data = stream.read(FRAMES_PER_BUFFER)
                chunk = base64.b64encode(data).decode("utf-8")
                json_data = json.dumps({"type": "audio_chunk", "data": {"chunk": chunk}})
                try:
                    await websocket.send(json_data)
                    # Pause 100 ms between sends. stream.read() above is a
                    # synchronous call, so this await is also where other
                    # tasks on the event loop get to run.
                    await asyncio.sleep(0.1)
                except ConnectionClosedOK:
                    return
        finally:
            # Release the input stream even if reading or sending fails.
            stream.stop_stream()
            stream.close()
    finally:
        p.terminate()
global audio, stream, ws_app
# Initialize PyAudio
audio = pyaudio.PyAudio()
# Open microphone stream
try:
stream = audio.open(
input=True,
frames_per_buffer=FRAMES_PER_BUFFER,
channels=CHANNELS,
format=FORMAT,
rate=SAMPLE_RATE,
)
print("Microphone stream opened successfully.")
print("Speak into your microphone. Press Ctrl+C to stop.")
except Exception as e:
print(f"Error opening microphone stream: {e}")
if audio:
audio.terminate()
return # Exit if microphone cannot be opened
# Create WebSocketApp
ws_app = websocket.WebSocketApp(
API_ENDPOINT,
header={"Authorization": YOUR_API_KEY},
on_open=on_open,
on_message=on_message,
on_error=on_error,
on_close=on_close,
)
# Run WebSocketApp in a separate thread to allow main thread to catch KeyboardInterrupt
ws_thread = threading.Thread(target=ws_app.run_forever)
ws_thread.daemon = True
ws_thread.start()
WebSocket Error Handling
Capture and log any errors emitted by the WebSocket connection to streamline troubleshooting and maintain smooth operation.
def on_error(ws, error):
    """Print a WebSocket error and signal the audio streaming loop to stop.

    Args:
        ws: The WebSocket object that reported the error (unused).
        error: The error reported for the connection.
    """
    print(f"\nWebSocket Error: {error}")
    # The audio loop checks this event on every iteration.
    stop_event.set()
Open WebSocket
- Gladia
- AssemblyAI
# Connect to Websocket
async with connect(session_data["url"]) as websocket:
print("\n################ Begin session ################\n")
# Create tasks for sending audio and receiving transcripts
send_audio_task = asyncio.create_task(send_audio(websocket))
receive_transcript_task = asyncio.create_task(receive_transcript(websocket))
await asyncio.wait([send_audio_task, receive_transcript_task])
def on_open(ws):
    """Start the microphone streaming thread once the WebSocket is open.

    Args:
        ws: The connected WebSocket object used to send audio frames.
    """
    global audio_thread

    print("WebSocket connection opened.")
    print(f"Connected to: {API_ENDPOINT}")

    def stream_audio():
        """Forward microphone buffers as binary frames until asked to stop."""
        print("Starting audio streaming...")
        while not stop_event.is_set():
            try:
                chunk = stream.read(FRAMES_PER_BUFFER, exception_on_overflow=False)
                # Audio is sent as a raw binary message, not JSON.
                ws.send(chunk, websocket.ABNF.OPCODE_BINARY)
            except Exception as e:
                print(f"Error streaming audio: {e}")
                # A failed read or send ends the streaming loop.
                break
        print("Audio streaming stopped.")

    # Daemon thread: the main thread may exit even if this one is still running.
    audio_thread = threading.Thread(target=stream_audio, daemon=True)
    audio_thread.start()
Receive Messages from WebSocket
- Gladia
- AssemblyAI
async def receive_transcript(websocket):
    """Print final transcripts and the end-of-session payload as they arrive.

    Iterates over every message on the WebSocket. Final ``transcript``
    messages are printed as a single line; the ``post_final_transcript``
    message is pretty-printed in full.
    """
    async for raw_message in websocket:
        event = json.loads(raw_message)

        # Only final transcripts are printed; other transcript messages are ignored.
        if event["type"] == "transcript" and event["data"]["is_final"]:
            utterance_text = event["data"]["utterance"]["text"]
            print(f"Final: {utterance_text.strip()}")

        # Dump the whole end-of-session message.
        if event["type"] == "post_final_transcript":
            print("\n################ End of session ################\n")
            print(json.dumps(event, indent=2, ensure_ascii=False))
def on_message(ws, message):
    """Handle one JSON message from the streaming API and print a summary.

    Recognized message types:
      * ``"Begin"``: prints the session id and expiry time.
      * ``"Turn"``: prints the transcript; partial transcripts overwrite the
        current console line, end-of-turn transcripts get their own line.
      * ``"Termination"``: prints audio and session durations.

    Other message types are ignored. Errors are printed, never raised.

    Args:
        ws: The WebSocket object the message arrived on (unused).
        message: Raw JSON text of the message.
    """
    try:
        data = json.loads(message)
        msg_type = data.get('type')
        if msg_type == "Begin":
            session_id = data.get('id')
            expires_at = data.get('expires_at')
            # fromtimestamp(None) raises TypeError, which would turn a valid
            # session start into an error report; show a placeholder instead.
            if expires_at is not None:
                expires_display = datetime.fromtimestamp(expires_at)
            else:
                expires_display = "unknown"
            print(f"\nSession began: ID={session_id}, ExpiresAt={expires_display}")
        elif msg_type == "Turn":
            transcript = data.get('transcript', '')
            end_of_turn = data.get('end_of_turn', False)
            if end_of_turn:
                # Blank out the partial line, then print the final transcript.
                print('\r' + ' ' * 80 + '\r', end='')
                print(transcript)
            else:
                # Partial transcript: redraw in place on the same line.
                print(f"\r{transcript}", end='')
        elif msg_type == "Termination":
            audio_duration = data.get('audio_duration_seconds', 0)
            session_duration = data.get('session_duration_seconds', 0)
            print(f"\nSession Terminated: Audio Duration={audio_duration}s, Session Duration={session_duration}s")
    except json.JSONDecodeError as e:
        print(f"Error decoding message: {e}")
    except Exception as e:
        print(f"Error handling message: {e}")
- Clear Message Types – Instead of checking `is_final`, you’ll receive explicit `"Begin"`, `"Turn"`, and `"Termination"` events, making your logic simpler and more readable.
- Session Metadata Up-Front – The first `"Begin"` message delivers a `session_id` and expiry timestamp so you can immediately log or surface these for tracing or billing.
- End-of-Turn Detection – Each `"Turn"` object includes an `end_of_turn` boolean. When `end_of_turn` is `true`, the transcript is a final, formatted result. When `false`, it is a partial transcript. Universal-3 Pro always returns formatted transcripts with smart punctuation & casing built in.
Close the WebSocket
- Gladia
- AssemblyAI
async def stop_recording(websocket):
    """Send the ``stop_recording`` message over the given WebSocket."""
    print(">>>>> Ending the recording…")
    stop_message = json.dumps({"type": "stop_recording"})
    await websocket.send(stop_message)
    # Yield to the event loop once so other pending tasks can run.
    await asyncio.sleep(0)
def on_close(ws, close_status_code, close_msg):
    """Release audio resources when the WebSocket connection is closed.

    Signals the audio thread to stop and waits briefly for it to finish, then
    closes the microphone stream and terminates PyAudio. The globals are reset
    to ``None`` so later cleanup code does not release them twice.

    Args:
        ws: The WebSocket object that was closed (unused).
        close_status_code: Close status code reported for the connection.
        close_msg: Close reason reported for the connection.
    """
    global stream, audio
    print(f"\nWebSocket Disconnected: Status={close_status_code}, Msg={close_msg}")

    stop_event.set()  # Signal audio thread just in case it's still running

    # Wait for the audio thread BEFORE closing the stream, so it is not left
    # reading from a stream that is closed underneath it. A thread cannot
    # join itself, hence the current_thread() guard.
    if (
        audio_thread
        and audio_thread is not threading.current_thread()
        and audio_thread.is_alive()
    ):
        audio_thread.join(timeout=1.0)

    if stream:
        if stream.is_active():
            stream.stop_stream()
        stream.close()
        stream = None
    if audio:
        audio.terminate()
        audio = None
- Connection Diagnostics - If the socket closes unexpectedly, AssemblyAI supplies both a status code and a reason message (close_status_code, close_msg), so you know immediately whether the server timed out, refused authentication, or encountered a different error.
Session Shutdown
- Gladia
- AssemblyAI
async with connect(session_data["url"]) as websocket:
...
# Handle Ctrl+C to stop recording
loop = asyncio.get_running_loop()
loop.add_signal_handler(
signal.SIGINT,
lambda: loop.create_task(stop_recording(websocket)),
)
try:
# Keep main thread alive until interrupted
while ws_thread.is_alive():
time.sleep(0.1)
except KeyboardInterrupt:
print("\nCtrl+C received. Stopping...")
stop_event.set() # Signal audio thread to stop
# Send termination message to the server
if ws_app and ws_app.sock and ws_app.sock.connected:
try:
terminate_message = {"type": "Terminate"}
print(f"Sending termination message: {json.dumps(terminate_message)}")
ws_app.send(json.dumps(terminate_message))
# Give a moment for messages to process before forceful close
time.sleep(5)
except Exception as e:
print(f"Error sending termination message: {e}")
# Close the WebSocket connection (will trigger on_close)
if ws_app:
ws_app.close()
# Wait for WebSocket thread to finish
ws_thread.join(timeout=2.0)
except Exception as e:
print(f"\nAn unexpected error occurred: {e}")
stop_event.set()
if ws_app:
ws_app.close()
ws_thread.join(timeout=2.0)
finally:
# Final cleanup (already handled in on_close, but good as a fallback)
if stream and stream.is_active():
stream.stop_stream()
if stream:
stream.close()
if audio:
audio.terminate()
print("Cleanup complete. Exiting.")
- JSON Payload Difference - When closing the stream with AssemblyAI, your JSON payload will be `{ "type": "Terminate" }` instead of `{ "type": "stop_recording" }`.
- No Metadata Race Condition - AssemblyAI provides session info at “Begin” and doesn’t append extra data at shutdown, making the exit faster and less error-prone.