Get started
Before we begin, make sure you have an AssemblyAI account and an API key. You can sign up for a free account and get your API key from your dashboard.
Side-by-side code comparison
Below is a side-by-side comparison of basic Python code snippets that transcribe streaming audio with Speechmatics and with AssemblyAI.
- Speechmatics
- AssemblyAI
import pyaudio
import websocket
import json
import threading
import time
# --- Configuration ---
# Replace the placeholder below with your actual API key.
YOUR_API_KEY = "YOUR-API-KEY"
API_ENDPOINT = "wss://eu2.rt.speechmatics.com/v2/en"
# Transcription settings forwarded in the StartRecognition message.
CONNECTION_PARAMS = dict(language="en", enable_partials=True, max_delay=2.0)

# --- Audio configuration ---
CHANNELS = 1
FRAMES_PER_BUFFER = 1024  # Chunk size
FORMAT = pyaudio.paFloat32  # Speechmatics uses float32 format
SAMPLE_RATE = None  # Will be set based on device capabilities

# --- Shared state for the audio stream and websocket ---
audio = stream = ws_app = audio_thread = None
audio_seq_no = 0  # Track number of audio chunks sent
stop_event = threading.Event()  # To signal the audio thread to stop
# --- WebSocket Event Handlers ---
def on_open(ws):
    """Announce the connection and send the StartRecognition handshake on ``ws``."""
    print("WebSocket connection opened.")
    print(f"Connected to: {API_ENDPOINT}")

    # Describe the raw float32 PCM audio that will follow the handshake.
    audio_format = {
        "type": "raw",
        "encoding": "pcm_f32le",
        "sample_rate": SAMPLE_RATE,
    }
    # Forward exactly the transcription settings configured in CONNECTION_PARAMS.
    transcription_config = {
        key: CONNECTION_PARAMS[key]
        for key in ("language", "enable_partials", "max_delay")
    }
    handshake = {
        "message": "StartRecognition",
        "audio_format": audio_format,
        "transcription_config": transcription_config,
    }
    ws.send(json.dumps(handshake))
def on_message(ws, message):
    """Handle one JSON message received from the Speechmatics socket.

    ``RecognitionStarted`` launches the microphone streaming thread, partial
    and final transcripts are printed on one refreshing console line,
    ``EndOfTranscript`` is reported, and ``Error`` sets ``stop_event``.
    Decoding or handling failures are printed rather than raised.

    Args:
        ws: The WebSocket connection the message arrived on.
        message: The JSON-encoded message payload.
    """
    global audio_thread
    try:
        data = json.loads(message)
        msg_type = data.get('message')
        if msg_type == "RecognitionStarted":
            session_id = data.get('id')
            print(f"\nSession began: ID={session_id}")

            # Start sending audio data in a separate thread
            def stream_audio():
                """Send microphone chunks as binary frames until stopped."""
                global audio_seq_no
                print("Starting audio streaming...")
                while not stop_event.is_set():
                    try:
                        audio_data = stream.read(FRAMES_PER_BUFFER, exception_on_overflow=False)
                        # Send audio data as binary message
                        ws.send(audio_data, websocket.ABNF.OPCODE_BINARY)
                        # Counted so EndOfStream can report the last chunk number.
                        audio_seq_no += 1
                    except Exception as e:
                        print(f"Error streaming audio: {e}")
                        # If stream read fails, likely means it's closed, stop the loop
                        break
                print("Audio streaming stopped.")

            # Daemon: allow main thread to exit even if this thread is running
            audio_thread = threading.Thread(target=stream_audio, daemon=True)
            audio_thread.start()
        elif msg_type == "AddPartialTranscript":
            transcript = data.get('metadata', {}).get('transcript', '')
            if transcript:
                # No newline is written, so flush explicitly; otherwise a
                # line-buffered terminal holds the partial back.
                print(f"\r{transcript}", end='', flush=True)
        elif msg_type == "AddTranscript":
            transcript = data.get('metadata', {}).get('transcript', '')
            if transcript:
                # Clear previous line for final messages
                print('\r' + ' ' * 80 + '\r', end='')
                print(transcript)
        elif msg_type == "EndOfTranscript":
            print("\nSession Terminated: Transcription complete")
        elif msg_type == "Error":
            error_type = data.get('type')
            reason = data.get('reason')
            print(f"\nWebSocket Error: {error_type} - {reason}")
            stop_event.set()
    except json.JSONDecodeError as e:
        print(f"Error decoding message: {e}")
    except Exception as e:
        print(f"Error handling message: {e}")
def on_error(ws, error):
    """Report a WebSocket error and ask the audio thread to stop."""
    report = f"\nWebSocket Error: {error}"
    print(report)
    # Attempt to signal stop on error
    stop_event.set()
def on_close(ws, close_status_code, close_msg):
    """Release the microphone once the WebSocket connection has closed.

    Args:
        ws: The WebSocket connection that closed (unused).
        close_status_code: Close status reported for the connection.
        close_msg: Close reason reported for the connection.
    """
    global stream, audio
    print(f"\nWebSocket Disconnected: Status={close_status_code}, Msg={close_msg}")

    # Signal the audio thread and give it a moment to leave its read/send
    # loop BEFORE the stream is closed underneath it.
    stop_event.set()
    if audio_thread and audio_thread.is_alive():
        audio_thread.join(timeout=1.0)

    # Ensure audio resources are released
    if stream:
        if stream.is_active():
            stream.stop_stream()
        stream.close()
        stream = None
    if audio:
        audio.terminate()
        audio = None
# --- Main Execution ---
def run():
    """Stream the default microphone to Speechmatics until Ctrl+C or disconnect.

    Opens the default input device at its default sample rate, runs the
    WebSocket client on a daemon thread and, on Ctrl+C, sends ``EndOfStream``
    before closing. Audio resources are released on every exit path.
    """
    global audio, stream, ws_app, SAMPLE_RATE
    # Initialize PyAudio
    audio = pyaudio.PyAudio()
    try:
        # The device lookup lives inside the try so that a missing input
        # device is reported and PyAudio is terminated instead of crashing.
        # Get default input device (can alter to specify specific device)
        default_device = audio.get_default_input_device_info()
        device_index = default_device['index']
        SAMPLE_RATE = int(audio.get_device_info_by_index(device_index)['defaultSampleRate'])
        print(f"Using microphone: {default_device['name']}")
        # Open microphone stream
        stream = audio.open(
            input=True,
            frames_per_buffer=FRAMES_PER_BUFFER,
            channels=CHANNELS,
            format=FORMAT,
            rate=SAMPLE_RATE,
            input_device_index=device_index,
        )
        print("Microphone stream opened successfully.")
        print("Speak into your microphone. Press Ctrl+C to stop.")
    except Exception as e:
        print(f"Error opening microphone stream: {e}")
        audio.terminate()
        audio = None
        return  # Exit if microphone cannot be opened

    # Create WebSocketApp
    ws_app = websocket.WebSocketApp(
        API_ENDPOINT,
        header={"Authorization": f"Bearer {YOUR_API_KEY}"},  # Speechmatics uses Bearer token
        on_open=on_open,
        on_message=on_message,
        on_error=on_error,
        on_close=on_close,
    )
    # Run WebSocketApp in a separate thread to allow main thread to catch KeyboardInterrupt
    ws_thread = threading.Thread(
        target=lambda: ws_app.run_forever(ping_interval=30, ping_timeout=10),
        daemon=True,
    )
    ws_thread.start()
    try:
        # Keep main thread alive until interrupted
        while ws_thread.is_alive():
            time.sleep(0.1)
    except KeyboardInterrupt:
        print("\nCtrl+C received. Stopping...")
        stop_event.set()  # Signal audio thread to stop
        # Send EndOfStream message to the server
        if ws_app and ws_app.sock and ws_app.sock.connected:
            try:
                end_message = {
                    "message": "EndOfStream",
                    "last_seq_no": audio_seq_no,
                }
                print(f"Sending termination message: {json.dumps(end_message)}")
                ws_app.send(json.dumps(end_message))
                # Give pending messages up to a second to arrive, but stop
                # waiting as soon as the socket thread has finished.
                ws_thread.join(timeout=1.0)
            except Exception as e:
                print(f"Error sending termination message: {e}")
        # Close the WebSocket connection (will trigger on_close)
        if ws_app:
            ws_app.close()
        # Wait for WebSocket thread to finish
        ws_thread.join(timeout=2.0)
    except Exception as e:
        print(f"\nAn unexpected error occurred: {e}")
        stop_event.set()
        if ws_app:
            ws_app.close()
        ws_thread.join(timeout=2.0)
    finally:
        # Final cleanup (already handled in on_close, but good as a fallback).
        # Clear the globals first so a late on_close cannot release the same
        # objects a second time.
        mic_stream, pa, stream, audio = stream, audio, None, None
        if mic_stream and mic_stream.is_active():
            mic_stream.stop_stream()
        if mic_stream:
            mic_stream.close()
        if pa:
            pa.terminate()
        print("Cleanup complete. Exiting.")


if __name__ == "__main__":
    run()
import pyaudio
import websocket
import json
import threading
import time
from urllib.parse import urlencode
from datetime import datetime
# --- Configuration ---
YOUR_API_KEY = "YOUR-API-KEY"  # Replace with your actual API key
# Sent to the service as URL query parameters.
CONNECTION_PARAMS = {
    "sample_rate": 16000,
    "speech_model": "u3-rt-pro",
}
API_ENDPOINT_BASE_URL = "wss://streaming.assemblyai.com/v3/ws"
API_ENDPOINT = f"{API_ENDPOINT_BASE_URL}?{urlencode(CONNECTION_PARAMS)}"
# Audio Configuration
SAMPLE_RATE = CONNECTION_PARAMS["sample_rate"]
CHUNK_DURATION_MS = 50  # Audio sent per WebSocket message
# Derived from the sample rate so a chunk stays 50 ms if the rate is changed
# (800 frames at 16000 Hz). Integer math avoids float rounding.
FRAMES_PER_BUFFER = SAMPLE_RATE * CHUNK_DURATION_MS // 1000
CHANNELS = 1
FORMAT = pyaudio.paInt16
# Global variables for audio stream and websocket
audio = None
stream = None
ws_app = None
audio_thread = None
stop_event = threading.Event()  # To signal the audio thread to stop
# --- WebSocket Event Handlers ---
def on_open(ws):
    """Announce the connection and launch the microphone streaming thread."""
    global audio_thread
    print("WebSocket connection opened.")
    print(f"Connected to: {API_ENDPOINT}")

    def stream_audio():
        """Forward microphone chunks to ``ws`` as binary frames until stopped."""
        print("Starting audio streaming...")
        while True:
            if stop_event.is_set():
                break
            try:
                chunk = stream.read(FRAMES_PER_BUFFER, exception_on_overflow=False)
                ws.send(chunk, websocket.ABNF.OPCODE_BINARY)
            except Exception as exc:
                # A failed read likely means the stream is closed: stop looping.
                print(f"Error streaming audio: {exc}")
                break
        print("Audio streaming stopped.")

    # Daemon thread: the main thread may exit even while this one is running.
    audio_thread = threading.Thread(target=stream_audio, daemon=True)
    audio_thread.start()
def on_message(ws, message):
    """Handle one JSON message received from the AssemblyAI streaming socket.

    ``Begin`` announces the session, ``Turn`` prints the transcript (partials
    refresh the current console line; end-of-turn transcripts get their own
    line) and ``Termination`` reports the session durations. Decoding or
    handling failures are printed rather than raised.

    Args:
        ws: The WebSocket connection the message arrived on (unused).
        message: The JSON-encoded message payload.
    """
    try:
        data = json.loads(message)
        msg_type = data.get('type')
        if msg_type == "Begin":
            session_id = data.get('id')
            expires_at = data.get('expires_at')
            # A missing expiry must not suppress the session announcement.
            if expires_at is None:
                expires_text = "unknown"
            else:
                expires_text = datetime.fromtimestamp(expires_at)
            print(f"\nSession began: ID={session_id}, ExpiresAt={expires_text}")
        elif msg_type == "Turn":
            transcript = data.get('transcript', '')
            end_of_turn = data.get('end_of_turn', False)
            # Print final end-of-turn transcript
            if end_of_turn:
                print('\r' + ' ' * 80 + '\r', end='')
                print(transcript)
            else:
                # No newline is written, so flush explicitly; otherwise a
                # line-buffered terminal holds the partial back.
                print(f"\r{transcript}", end='', flush=True)
        elif msg_type == "Termination":
            audio_duration = data.get('audio_duration_seconds', 0)
            session_duration = data.get('session_duration_seconds', 0)
            print(f"\nSession Terminated: Audio Duration={audio_duration}s, Session Duration={session_duration}s")
    except json.JSONDecodeError as e:
        print(f"Error decoding message: {e}")
    except Exception as e:
        print(f"Error handling message: {e}")
def on_error(ws, error):
    """Report a WebSocket error and ask the audio thread to stop."""
    report = f"\nWebSocket Error: {error}"
    print(report)
    # Attempt to signal stop on error
    stop_event.set()
def on_close(ws, close_status_code, close_msg):
    """Release the microphone once the WebSocket connection has closed.

    Args:
        ws: The WebSocket connection that closed (unused).
        close_status_code: Close status reported for the connection.
        close_msg: Close reason reported for the connection.
    """
    global stream, audio
    print(f"\nWebSocket Disconnected: Status={close_status_code}, Msg={close_msg}")

    # Signal the audio thread and give it a moment to leave its read/send
    # loop BEFORE the stream is closed underneath it.
    stop_event.set()
    if audio_thread and audio_thread.is_alive():
        audio_thread.join(timeout=1.0)

    # Ensure audio resources are released
    if stream:
        if stream.is_active():
            stream.stop_stream()
        stream.close()
        stream = None
    if audio:
        audio.terminate()
        audio = None
# --- Main Execution ---
def run():
    """Stream the microphone to AssemblyAI until Ctrl+C or disconnect.

    Opens the microphone, runs the WebSocket client on a daemon thread and,
    on Ctrl+C, sends a ``Terminate`` message before closing. Audio resources
    are released on every exit path.
    """
    global audio, stream, ws_app
    # Initialize PyAudio
    audio = pyaudio.PyAudio()
    # Open microphone stream
    try:
        stream = audio.open(
            input=True,
            frames_per_buffer=FRAMES_PER_BUFFER,
            channels=CHANNELS,
            format=FORMAT,
            rate=SAMPLE_RATE,
        )
        print("Microphone stream opened successfully.")
        print("Speak into your microphone. Press Ctrl+C to stop.")
    except Exception as e:
        print(f"Error opening microphone stream: {e}")
        audio.terminate()
        audio = None
        return  # Exit if microphone cannot be opened

    # Create WebSocketApp
    ws_app = websocket.WebSocketApp(
        API_ENDPOINT,
        header={"Authorization": YOUR_API_KEY},
        on_open=on_open,
        on_message=on_message,
        on_error=on_error,
        on_close=on_close,
    )
    # Run WebSocketApp in a separate thread to allow main thread to catch KeyboardInterrupt
    ws_thread = threading.Thread(target=ws_app.run_forever, daemon=True)
    ws_thread.start()
    try:
        # Keep main thread alive until interrupted
        while ws_thread.is_alive():
            time.sleep(0.1)
    except KeyboardInterrupt:
        print("\nCtrl+C received. Stopping...")
        stop_event.set()  # Signal audio thread to stop
        # Send termination message to the server
        if ws_app and ws_app.sock and ws_app.sock.connected:
            try:
                terminate_message = {"type": "Terminate"}
                print(f"Sending termination message: {json.dumps(terminate_message)}")
                ws_app.send(json.dumps(terminate_message))
                # Give pending messages up to five seconds to arrive, but stop
                # waiting as soon as the socket thread has finished.
                ws_thread.join(timeout=5.0)
            except Exception as e:
                print(f"Error sending termination message: {e}")
        # Close the WebSocket connection (will trigger on_close)
        if ws_app:
            ws_app.close()
        # Wait for WebSocket thread to finish
        ws_thread.join(timeout=2.0)
    except Exception as e:
        print(f"\nAn unexpected error occurred: {e}")
        stop_event.set()
        if ws_app:
            ws_app.close()
        ws_thread.join(timeout=2.0)
    finally:
        # Final cleanup (already handled in on_close, but good as a fallback).
        # Clear the globals first so a late on_close cannot release the same
        # objects a second time.
        mic_stream, pa, stream, audio = stream, audio, None, None
        if mic_stream and mic_stream.is_active():
            mic_stream.stop_stream()
        if mic_stream:
            mic_stream.close()
        if pa:
            pa.terminate()
        print("Cleanup complete. Exiting.")


if __name__ == "__main__":
    run()
Step 1: Install dependencies
Step 2: Configure the API key
In this step, you’ll configure your API key to authenticate your requests.
- Speechmatics
- AssemblyAI
Store your API key in a variable. Replace YOUR-API-KEY with your copied API key.
import pyaudio
import websocket
import json
import threading
import time
YOUR_API_KEY = "YOUR-API-KEY"  # Replace with your actual API key
Store your API key in a variable. Replace YOUR-API-KEY with your copied API key.
import pyaudio
import websocket
import json
import threading
import time
from urllib.parse import urlencode
from datetime import datetime
YOUR_API_KEY = "YOUR-API-KEY"  # Replace with your actual API key
Authenticate With A Temporary Token
- Speechmatics
- AssemblyAI
import requests
def generate_temp_token(api_key, ttl=60):
    """Generate a temporary Speechmatics real-time authentication token.

    Args:
        api_key: Long-lived API key, sent as a Bearer token.
        ttl: Value sent as the ``ttl`` field of the request body.

    Returns:
        The ``key_value`` field of the JSON response, or ``None`` if the
        response does not contain one.

    Raises:
        requests.HTTPError: If the service answers with an error status.
        requests.RequestException: On connection failure or timeout.
    """
    url = "https://mp.speechmatics.com/v1/api_keys?type=rt"
    headers = {
        "Content-Type": "application/json",
        "Authorization": f"Bearer {api_key}",
    }
    payload = {"ttl": ttl}
    # Bound the wait so a stalled request cannot hang the caller forever.
    response = requests.post(url, json=payload, headers=headers, timeout=10)
    # Fail loudly on 4xx/5xx instead of handing back a None token that would
    # only surface later as a rejected WebSocket connection.
    response.raise_for_status()
    return response.json().get("key_value")
Token usage
Instead of authorizing your request with YOUR_API_KEY (via request header), use the temporary token generated by this function when establishing the WebSocket connection. In the snippet below, api_key is your API key (for example, YOUR_API_KEY).
API_ENDPOINT= f"wss://eu2.rt.speechmatics.com/v2?jwt={generate_temp_token(api_key)}"
# No Authorization header is passed here: the endpoint URL is the only
# connection argument besides the event handlers.
ws_app = websocket.WebSocketApp(
    API_ENDPOINT,
    on_open=on_open,
    on_message=on_message,
    on_error=on_error,
    on_close=on_close,
)
import requests
from urllib.parse import urlencode
def generate_temp_token(api_key, expires_in_seconds=60):
    """Generate a temporary AssemblyAI streaming authentication token.

    Args:
        api_key: Long-lived API key, sent in the ``Authorization`` header.
        expires_in_seconds: Value sent as the ``expires_in_seconds`` query
            parameter.

    Returns:
        The ``token`` field of the JSON response, or ``None`` if the
        response does not contain one.

    Raises:
        requests.HTTPError: If the service answers with an error status.
        requests.RequestException: On connection failure or timeout.
    """
    url = "https://streaming.assemblyai.com/v3/token"
    response = requests.get(
        url,
        # Let requests build and encode the query string.
        params={"expires_in_seconds": expires_in_seconds},
        headers={"Authorization": api_key},
        # Bound the wait so a stalled request cannot hang the caller forever.
        timeout=10,
    )
    # Fail loudly on 4xx/5xx instead of handing back a None token that would
    # only surface later as a rejected WebSocket connection.
    response.raise_for_status()
    return response.json().get("token")
Token usage
Instead of authorizing your request with YOUR_API_KEY (via request header), use the temporary token generated by this function when establishing the WebSocket connection. In the snippet below, api_key is your API key (for example, YOUR_API_KEY).
API_ENDPOINT = f"{API_ENDPOINT_BASE_URL}?{urlencode(CONNECTION_PARAMS)}&token={generate_temp_token(api_key)}"
# No Authorization header is passed here: the endpoint URL is the only
# connection argument besides the event handlers.
ws_app = websocket.WebSocketApp(
    API_ENDPOINT,
    on_open=on_open,
    on_message=on_message,
    on_error=on_error,
    on_close=on_close,
)
Step 3: Set up audio configuration
Configure the audio settings for your microphone stream.
- Speechmatics
- AssemblyAI
import pyaudio
# --- Audio configuration ---
CHANNELS = 1
FRAMES_PER_BUFFER = 1024  # Chunk size
FORMAT = pyaudio.paFloat32  # Speechmatics uses float32 format
SAMPLE_RATE = None  # Will be set based on device capabilities

# --- Shared state for the audio stream and websocket ---
audio = stream = ws_app = audio_thread = None
audio_seq_no = 0  # Track number of audio chunks sent
stop_event = threading.Event()  # To signal the audio thread to stop
def run():
    """Open the default microphone at the device's default sample rate.

    On failure the error is printed, PyAudio is terminated and the function
    returns without a stream.
    """
    global audio, stream, ws_app, SAMPLE_RATE
    # Initialize PyAudio
    audio = pyaudio.PyAudio()
    try:
        # The device lookup lives inside the try so that a missing input
        # device is reported and PyAudio is terminated instead of crashing.
        # Get default input device (can alter to specify specific device)
        default_device = audio.get_default_input_device_info()
        device_index = default_device['index']
        SAMPLE_RATE = int(audio.get_device_info_by_index(device_index)['defaultSampleRate'])
        print(f"Using microphone: {default_device['name']}")
        # Open microphone stream
        stream = audio.open(
            input=True,
            frames_per_buffer=FRAMES_PER_BUFFER,
            channels=CHANNELS,
            format=FORMAT,
            rate=SAMPLE_RATE,
            input_device_index=device_index,
        )
        print("Microphone stream opened successfully.")
        print("Speak into your microphone. Press Ctrl+C to stop.")
    except Exception as e:
        print(f"Error opening microphone stream: {e}")
        audio.terminate()
        audio = None
        return  # Exit if microphone cannot be opened
Sample rate
Speechmatics recommends using a 16 kHz sample rate for speech audio. Anything higher will be downsampled server-side.
import pyaudio
# Sent to the service as URL query parameters.
CONNECTION_PARAMS = {
    "sample_rate": 16000,
    "speech_model": "u3-rt-pro",
}
# Audio Configuration
SAMPLE_RATE = CONNECTION_PARAMS["sample_rate"]
CHUNK_DURATION_MS = 50  # Audio sent per WebSocket message
# Derived from the sample rate so a chunk stays 50 ms if the rate is changed
# (800 frames at 16000 Hz). Integer math avoids float rounding.
FRAMES_PER_BUFFER = SAMPLE_RATE * CHUNK_DURATION_MS // 1000
CHANNELS = 1
FORMAT = pyaudio.paInt16
# Global variables for audio stream and websocket
audio = None
stream = None
ws_app = None
audio_thread = None
stop_event = threading.Event()  # To signal the audio thread to stop
def run():
    """Create the PyAudio instance and open the microphone input stream."""
    global audio, stream, ws_app
    audio = pyaudio.PyAudio()
    # Gather the stream settings from the module-level audio configuration.
    stream_options = dict(
        input=True,
        frames_per_buffer=FRAMES_PER_BUFFER,
        channels=CHANNELS,
        format=FORMAT,
        rate=SAMPLE_RATE,
    )
    try:
        stream = audio.open(**stream_options)
        print("Microphone stream opened successfully.")
        print("Speak into your microphone. Press Ctrl+C to stop.")
    except Exception as exc:
        print(f"Error opening microphone stream: {exc}")
        if audio:
            audio.terminate()
        # Nothing to stream without a microphone.
        return
Sample rate
Using a sample rate of 16 kHz and an encoding of pcm_s16le is recommended, as our STT model operates at a 16 kHz sample rate.
If the incoming audio uses a different rate, we perform additional sampling rate conversion under the hood, which might marginally increase latency.
Audio data format
If you want to stream data from elsewhere, make sure that your audio data is in the following format:
- Single-channel
- PCM16 (default) or Mu-law encoding
- A sample rate that matches the value of the sample_rate parameter (16 kHz is recommended)
- 50 milliseconds of audio per message (larger chunk sizes are workable, but may result in latency fluctuations)
Step 4: Create event handlers
In this step, you’ll set up callback functions that handle the different events.
Create functions to handle the events from the real-time service.
- Speechmatics
- AssemblyAI
import json
def on_open(ws):
    """Announce the connection and send the StartRecognition handshake on ``ws``."""
    print("WebSocket connection opened.")
    print(f"Connected to: {API_ENDPOINT}")

    # Describe the raw float32 PCM audio that will follow the handshake.
    audio_format = {
        "type": "raw",
        "encoding": "pcm_f32le",
        "sample_rate": SAMPLE_RATE,
    }
    # Forward exactly the transcription settings configured in CONNECTION_PARAMS.
    transcription_config = {
        key: CONNECTION_PARAMS[key]
        for key in ("language", "enable_partials", "max_delay")
    }
    handshake = {
        "message": "StartRecognition",
        "audio_format": audio_format,
        "transcription_config": transcription_config,
    }
    ws.send(json.dumps(handshake))
def on_error(ws, error):
    """Report a WebSocket error and ask the audio thread to stop."""
    report = f"\nWebSocket Error: {error}"
    print(report)
    # Attempt to signal stop on error
    stop_event.set()
def on_close(ws, close_status_code, close_msg):
    """Release the microphone once the WebSocket connection has closed.

    Args:
        ws: The WebSocket connection that closed (unused).
        close_status_code: Close status reported for the connection.
        close_msg: Close reason reported for the connection.
    """
    global stream, audio
    print(f"\nWebSocket Disconnected: Status={close_status_code}, Msg={close_msg}")

    # Signal the audio thread and give it a moment to leave its read/send
    # loop BEFORE the stream is closed underneath it.
    stop_event.set()
    if audio_thread and audio_thread.is_alive():
        audio_thread.join(timeout=1.0)

    # Ensure audio resources are released
    if stream:
        if stream.is_active():
            stream.stop_stream()
        stream.close()
        stream = None
    if audio:
        audio.terminate()
        audio = None
import threading
def on_open(ws):
    """Announce the connection and launch the microphone streaming thread."""
    global audio_thread
    print("WebSocket connection opened.")
    print(f"Connected to: {API_ENDPOINT}")

    def stream_audio():
        """Forward microphone chunks to ``ws`` as binary frames until stopped."""
        print("Starting audio streaming...")
        while True:
            if stop_event.is_set():
                break
            try:
                chunk = stream.read(FRAMES_PER_BUFFER, exception_on_overflow=False)
                ws.send(chunk, websocket.ABNF.OPCODE_BINARY)
            except Exception as exc:
                # A failed read likely means the stream is closed: stop looping.
                print(f"Error streaming audio: {exc}")
                break
        print("Audio streaming stopped.")

    # Daemon thread: the main thread may exit even while this one is running.
    audio_thread = threading.Thread(target=stream_audio, daemon=True)
    audio_thread.start()
def on_error(ws, error):
    """Report a WebSocket error and ask the audio thread to stop."""
    report = f"\nWebSocket Error: {error}"
    print(report)
    # Attempt to signal stop on error
    stop_event.set()
def on_close(ws, close_status_code, close_msg):
    """Release the microphone once the WebSocket connection has closed.

    Args:
        ws: The WebSocket connection that closed (unused).
        close_status_code: Close status reported for the connection.
        close_msg: Close reason reported for the connection.
    """
    global stream, audio
    print(f"\nWebSocket Disconnected: Status={close_status_code}, Msg={close_msg}")

    # Signal the audio thread and give it a moment to leave its read/send
    # loop BEFORE the stream is closed underneath it.
    stop_event.set()
    if audio_thread and audio_thread.is_alive():
        audio_thread.join(timeout=1.0)

    # Ensure audio resources are released
    if stream:
        if stream.is_active():
            stream.stop_stream()
        stream.close()
        stream = None
    if audio:
        audio.terminate()
        audio = None
Connection configuration
Speechmatics requires a handshake where the connection configuration is specified before audio is streamed. AssemblyAI allows you to configure the connection via query parameters in the URL and start streaming audio immediately.
The Speechmatics handshake begins when on_open sends a StartRecognition message to configure the session. Audio streaming only starts after the RecognitionStarted message type is parsed and confirmed in the on_message callback.
Create another function to handle transcripts.
Speechmatics has separate partial (AddPartialTranscript) and final (AddTranscript) transcripts. The terminate session message is EndOfTranscript.
AssemblyAI instead uses a Turn object with an end_of_turn boolean flag to indicate finality. The terminate session message is Termination.
For more on the Turn object, see the Streaming Core concepts section.
- Speechmatics
- AssemblyAI
def on_message(ws, message):
    """Handle one JSON message received from the Speechmatics socket.

    ``RecognitionStarted`` launches the microphone streaming thread, partial
    and final transcripts are printed on one refreshing console line,
    ``EndOfTranscript`` is reported, and ``Error`` sets ``stop_event``.
    Decoding or handling failures are printed rather than raised.

    Args:
        ws: The WebSocket connection the message arrived on.
        message: The JSON-encoded message payload.
    """
    global audio_thread
    try:
        data = json.loads(message)
        msg_type = data.get('message')
        if msg_type == "RecognitionStarted":
            session_id = data.get('id')
            print(f"\nSession began: ID={session_id}")

            # Start sending audio data in a separate thread
            def stream_audio():
                """Send microphone chunks as binary frames until stopped."""
                global audio_seq_no
                print("Starting audio streaming...")
                while not stop_event.is_set():
                    try:
                        audio_data = stream.read(FRAMES_PER_BUFFER, exception_on_overflow=False)
                        # Send audio data as binary message
                        ws.send(audio_data, websocket.ABNF.OPCODE_BINARY)
                        # Counted so EndOfStream can report the last chunk number.
                        audio_seq_no += 1
                    except Exception as e:
                        print(f"Error streaming audio: {e}")
                        # If stream read fails, likely means it's closed, stop the loop
                        break
                print("Audio streaming stopped.")

            # Daemon: allow main thread to exit even if this thread is running
            audio_thread = threading.Thread(target=stream_audio, daemon=True)
            audio_thread.start()
        elif msg_type == "AddPartialTranscript":
            transcript = data.get('metadata', {}).get('transcript', '')
            if transcript:
                # No newline is written, so flush explicitly; otherwise a
                # line-buffered terminal holds the partial back.
                print(f"\r{transcript}", end='', flush=True)
        elif msg_type == "AddTranscript":
            transcript = data.get('metadata', {}).get('transcript', '')
            if transcript:
                # Clear previous line for final messages
                print('\r' + ' ' * 80 + '\r', end='')
                print(transcript)
        elif msg_type == "EndOfTranscript":
            print("\nSession Terminated: Transcription complete")
        elif msg_type == "Error":
            error_type = data.get('type')
            reason = data.get('reason')
            print(f"\nWebSocket Error: {error_type} - {reason}")
            stop_event.set()
    except json.JSONDecodeError as e:
        print(f"Error decoding message: {e}")
    except Exception as e:
        print(f"Error handling message: {e}")
import json
from datetime import datetime
def on_message(ws, message):
    """Handle one JSON message received from the AssemblyAI streaming socket.

    ``Begin`` announces the session, ``Turn`` prints the transcript (partials
    refresh the current console line; end-of-turn transcripts get their own
    line) and ``Termination`` reports the session durations. Decoding or
    handling failures are printed rather than raised.

    Args:
        ws: The WebSocket connection the message arrived on (unused).
        message: The JSON-encoded message payload.
    """
    try:
        data = json.loads(message)
        msg_type = data.get('type')
        if msg_type == "Begin":
            session_id = data.get('id')
            expires_at = data.get('expires_at')
            # A missing expiry must not suppress the session announcement.
            if expires_at is None:
                expires_text = "unknown"
            else:
                expires_text = datetime.fromtimestamp(expires_at)
            print(f"\nSession began: ID={session_id}, ExpiresAt={expires_text}")
        elif msg_type == "Turn":
            transcript = data.get('transcript', '')
            end_of_turn = data.get('end_of_turn', False)
            # Print final end-of-turn transcript
            if end_of_turn:
                print('\r' + ' ' * 80 + '\r', end='')
                print(transcript)
            else:
                # No newline is written, so flush explicitly; otherwise a
                # line-buffered terminal holds the partial back.
                print(f"\r{transcript}", end='', flush=True)
        elif msg_type == "Termination":
            audio_duration = data.get('audio_duration_seconds', 0)
            session_duration = data.get('session_duration_seconds', 0)
            print(f"\nSession Terminated: Audio Duration={audio_duration}s, Session Duration={session_duration}s")
    except json.JSONDecodeError as e:
        print(f"Error decoding message: {e}")
    except Exception as e:
        print(f"Error handling message: {e}")
Transcript message structure
Please note the difference in transcript message structure below:
# Speechmatics
{
"message": "AddPartialTranscript",
"metadata": {
"transcript": "hello world"
},
# Other transcript data...
}
# AssemblyAI
{
"type": "Turn",
"transcript": "hello world",
"end_of_turn": false,
# Other transcript data...
}
Step 5: Connect and start transcription
To stream audio, establish a connection to the API via WebSockets.
- Speechmatics
- AssemblyAI
Create a WebSocket connection to the Realtime service.
def run():
    """Create the Speechmatics WebSocket client and start it on a daemon thread."""
    global audio, stream, ws_app, SAMPLE_RATE
    # Skipping audio/microphone setup code...

    # Speechmatics uses Bearer token
    auth_header = {"Authorization": f"Bearer {YOUR_API_KEY}"}
    ws_app = websocket.WebSocketApp(
        API_ENDPOINT,
        header=auth_header,
        on_open=on_open,
        on_message=on_message,
        on_error=on_error,
        on_close=on_close,
    )
    # The client runs off the main thread so the main thread stays free to
    # catch KeyboardInterrupt.
    ws_thread = threading.Thread(
        target=ws_app.run_forever,
        kwargs={"ping_interval": 30, "ping_timeout": 10},
        daemon=True,
    )
    ws_thread.start()
Create and run a WebSocket connection to the Realtime service.
import websocket
import threading
def run():
    """Create the AssemblyAI WebSocket client and start it on a daemon thread."""
    global audio, stream, ws_app
    # Skipping audio/microphone setup code...

    # The API key is sent as-is in the Authorization header.
    auth_header = {"Authorization": YOUR_API_KEY}
    ws_app = websocket.WebSocketApp(
        API_ENDPOINT,
        header=auth_header,
        on_open=on_open,
        on_message=on_message,
        on_error=on_error,
        on_close=on_close,
    )
    # The client runs off the main thread so the main thread stays free to
    # catch KeyboardInterrupt.
    ws_thread = threading.Thread(target=ws_app.run_forever, daemon=True)
    ws_thread.start()
Authorization
Note that while both services use an Authorization header to authenticate the WebSocket connection, Speechmatics uses a Bearer prefix, while AssemblyAI does not.
Step 6: Close the connection
Keep the main thread alive until interrupted, handle keyboard interrupts and thrown exceptions, and clean up upon closing of the WebSocket connection.
The connection will close automatically when you press
- Speechmatics
- AssemblyAI
def run():
    """Keep the session alive until Ctrl+C, then end the stream and clean up.

    On Ctrl+C an ``EndOfStream`` message is sent before the connection is
    closed. Audio resources are released on every exit path.
    """
    global audio, stream, ws_app, SAMPLE_RATE
    # Skipping audio/microphone setup and WebSocket connection code...
    try:
        # Keep main thread alive until interrupted
        while ws_thread.is_alive():
            time.sleep(0.1)
    except KeyboardInterrupt:
        print("\nCtrl+C received. Stopping...")
        stop_event.set()  # Signal audio thread to stop
        # Send EndOfStream message to the server
        if ws_app and ws_app.sock and ws_app.sock.connected:
            try:
                end_message = {
                    "message": "EndOfStream",
                    "last_seq_no": audio_seq_no,
                }
                print(f"Sending termination message: {json.dumps(end_message)}")
                ws_app.send(json.dumps(end_message))
                # Give pending messages up to a second to arrive, but stop
                # waiting as soon as the socket thread has finished.
                ws_thread.join(timeout=1.0)
            except Exception as e:
                print(f"Error sending termination message: {e}")
        # Close the WebSocket connection (will trigger on_close)
        if ws_app:
            ws_app.close()
        # Wait for WebSocket thread to finish
        ws_thread.join(timeout=2.0)
    except Exception as e:
        print(f"\nAn unexpected error occurred: {e}")
        stop_event.set()
        if ws_app:
            ws_app.close()
        ws_thread.join(timeout=2.0)
    finally:
        # Final cleanup (already handled in on_close, but good as a fallback).
        # Clear the globals first so a late on_close cannot release the same
        # objects a second time.
        mic_stream, pa, stream, audio = stream, audio, None, None
        if mic_stream and mic_stream.is_active():
            mic_stream.stop_stream()
        if mic_stream:
            mic_stream.close()
        if pa:
            pa.terminate()
        print("Cleanup complete. Exiting.")
def run():
    """Keep the session alive until Ctrl+C, then terminate it and clean up.

    On Ctrl+C a ``Terminate`` message is sent before the connection is
    closed. Audio resources are released on every exit path.
    """
    global audio, stream, ws_app
    # Skipping audio/microphone setup and WebSocket connection code...
    try:
        # Keep main thread alive until interrupted
        while ws_thread.is_alive():
            time.sleep(0.1)
    except KeyboardInterrupt:
        print("\nCtrl+C received. Stopping...")
        stop_event.set()  # Signal audio thread to stop
        # Send termination message to the server
        if ws_app and ws_app.sock and ws_app.sock.connected:
            try:
                terminate_message = {"type": "Terminate"}
                print(f"Sending termination message: {json.dumps(terminate_message)}")
                ws_app.send(json.dumps(terminate_message))
                # Give pending messages up to five seconds to arrive, but stop
                # waiting as soon as the socket thread has finished.
                ws_thread.join(timeout=5.0)
            except Exception as e:
                print(f"Error sending termination message: {e}")
        # Close the WebSocket connection (will trigger on_close)
        if ws_app:
            ws_app.close()
        # Wait for WebSocket thread to finish
        ws_thread.join(timeout=2.0)
    except Exception as e:
        print(f"\nAn unexpected error occurred: {e}")
        stop_event.set()
        if ws_app:
            ws_app.close()
        ws_thread.join(timeout=2.0)
    finally:
        # Final cleanup (already handled in on_close, but good as a fallback).
        # Clear the globals first so a late on_close cannot release the same
        # objects a second time.
        mic_stream, pa, stream, audio = stream, audio, None, None
        if mic_stream and mic_stream.is_active():
            mic_stream.stop_stream()
        if mic_stream:
            mic_stream.close()
        if pa:
            pa.terminate()
        print("Cleanup complete. Exiting.")
Ctrl+C. In both cases, the on_close handler will clean up the audio resources.
Step 7: Execute the main function
Finally, run the main function to start the main execution.
- Speechmatics
- AssemblyAI
# Start streaming only when this file is executed as a script.
if __name__ == "__main__":
    run()
# Start streaming only when this file is executed as a script.
if __name__ == "__main__":
    run()
Next steps
To learn more about both Streaming APIs, their key differences, and how to best migrate, see the following resources: AssemblyAI, Speechmatics
Need some help?
If you get stuck or have any other questions, contact our support team at support@assemblyai.com or create a support ticket.