Below is a side-by-side comparison of a basic snippet for transcribing live microphone audio with Deepgram and with AssemblyAI:
Deepgram
AssemblyAI
import json
import signal
import threading
import time
from urllib.parse import urlencode

import pyaudio
import websocket

DG_KEY = "YOUR_DG_API_KEY"
PARAMS = {
    "model": "nova-3",
    "encoding": "linear16",
    "sample_rate": "16000",
    "channels": "1",
    "punctuate": "true",
    "interim_results": "true",
}
EP = f"wss://api.deepgram.com/v1/listen?{urlencode(PARAMS)}"

FRAMES = 800  # frames per microphone read (50 ms at 16000 Hz)
CHANNELS = 1
FORMAT = pyaudio.paInt16
SAMPLE_RATE = int(PARAMS["sample_rate"])

audio = stream = ws_app = None
audio_thread = ws_thread = None
# Must exist before on_close runs: the socket can close (e.g. on an auth
# error) before any "Metadata" message has been received.
final_metadata = None
stop_event = threading.Event()


# ── WebSocket callbacks ───────────────────────────────────────────────
def on_open(ws):
    """Start a daemon thread that streams microphone audio to the socket."""

    def mic_loop():
        # Forward raw PCM chunks as binary frames until asked to stop.
        while not stop_event.is_set():
            ws.send(stream.read(FRAMES, exception_on_overflow=False), websocket.ABNF.OPCODE_BINARY)

    global audio_thread
    audio_thread = threading.Thread(target=mic_loop, daemon=True)
    audio_thread.start()


def on_message(ws, msg):
    """Print transcripts from "Results" messages and keep the last "Metadata"."""
    global final_metadata
    d = json.loads(msg)
    if d.get("type") == "Results":
        txt = d["channel"]["alternatives"][0]["transcript"]
        if d["is_final"]:
            # Blank the line that held the interim text, then print the final one.
            print(" " * 80, end="\r")
            print(txt)
        else:
            # Interim result: overwrite the current line in place.
            print(txt, end="\r")
    elif d.get("type") == "Metadata":
        final_metadata = d  # save the metadata for later


def on_error(ws, error):
    """Report a WebSocket error and signal the audio thread to stop."""
    print(f"\nWebSocket error: {error}")
    stop_event.set()


def on_close(ws, *args):
    """Stop audio streaming and print the metadata captured during the session."""
    stop_event.set()
    if final_metadata:
        print("\nFinal Metadata from Deepgram:")
        print(json.dumps(final_metadata, indent=2))
    else:
        print("\nNo metadata received before close.")


def graceful_shutdown(signum, frame):
    """SIGINT handler: send CloseStream, close the socket, wait for the WS thread."""
    print("\nCtrl+C received → shutting down …")
    stop_event.set()
    # Send CloseStream before closing WebSocket
    if ws_app and ws_app.sock and ws_app.sock.connected:
        try:
            ws_app.send(json.dumps({"type": "CloseStream"}))
            time.sleep(0.5)  # Allow time for final metadata to come through
        except Exception as e:
            print("Error sending CloseStream:", e)
        ws_app.close()
    if ws_thread and ws_thread.is_alive():
        ws_thread.join(timeout=2.0)


signal.signal(signal.SIGINT, graceful_shutdown)


def run():
    """Open the microphone, stream it to Deepgram, and clean up when the socket ends."""
    global audio, stream, ws_app, ws_thread

    # 1. open microphone
    audio = pyaudio.PyAudio()
    try:
        stream = audio.open(
            format=FORMAT,
            channels=CHANNELS,
            rate=SAMPLE_RATE,
            input=True,
            frames_per_buffer=FRAMES,
        )
        print("Microphone stream opened. Press Ctrl+C to stop.")
    except Exception as e:
        print(f"Error opening mic: {e}")
        audio.terminate()
        return

    # 2. create WebSocket
    ws_app = websocket.WebSocketApp(
        EP,
        header={"Authorization": f"Token {DG_KEY}"},
        on_open=on_open,
        on_message=on_message,
        on_error=on_error,
        on_close=on_close,
    )

    # 3. start WS thread (only once!)
    ws_thread = threading.Thread(target=ws_app.run_forever, daemon=True)
    ws_thread.start()

    # 4. block main thread until WS thread ends
    ws_thread.join()

    # 5. cleanup
    if stream and stream.is_active():
        stream.stop_stream()
    if stream:
        stream.close()
    if audio:
        audio.terminate()
    print("Cleanup complete. Exiting.")


if __name__ == "__main__":
    run()
import pyaudio
import websocket
import json
import threading
import time
from urllib.parse import urlencode
from datetime import datetime

# --- Configuration ---
YOUR_API_KEY = "YOUR-API-KEY"  # Replace with your actual API key

CONNECTION_PARAMS = {
    "sample_rate": 16000,
    "speech_model": "u3-rt-pro",
}
API_ENDPOINT_BASE_URL = "wss://streaming.assemblyai.com/v3/ws"
API_ENDPOINT = f"{API_ENDPOINT_BASE_URL}?{urlencode(CONNECTION_PARAMS)}"

# Audio Configuration
FRAMES_PER_BUFFER = 800  # 50ms of audio (0.05s * 16000Hz)
SAMPLE_RATE = CONNECTION_PARAMS["sample_rate"]
CHANNELS = 1
FORMAT = pyaudio.paInt16

# Global variables for audio stream and websocket
audio = None
stream = None
ws_app = None
audio_thread = None
stop_event = threading.Event()  # To signal the audio thread to stop


# --- WebSocket Event Handlers ---
def on_open(ws):
    """Called when the WebSocket connection is established."""
    print("WebSocket connection opened.")
    print(f"Connected to: {API_ENDPOINT}")

    # Start sending audio data in a separate thread
    def stream_audio():
        global stream
        print("Starting audio streaming...")
        while not stop_event.is_set():
            try:
                audio_data = stream.read(FRAMES_PER_BUFFER, exception_on_overflow=False)
                # Send audio data as binary message
                ws.send(audio_data, websocket.ABNF.OPCODE_BINARY)
            except Exception as e:
                print(f"Error streaming audio: {e}")
                # If stream read fails, likely means it's closed, stop the loop
                break
        print("Audio streaming stopped.")

    global audio_thread
    audio_thread = threading.Thread(target=stream_audio)
    audio_thread.daemon = True  # Allow main thread to exit even if this thread is running
    audio_thread.start()


def on_message(ws, message):
    """Dispatch one server message by its "type": Begin, Turn, or Termination."""
    try:
        data = json.loads(message)
        msg_type = data.get('type')

        if msg_type == "Begin":
            session_id = data.get('id')
            expires_at = data.get('expires_at')
            print(f"\nSession began: ID={session_id}, ExpiresAt={datetime.fromtimestamp(expires_at)}")
        elif msg_type == "Turn":
            transcript = data.get('transcript', '')
            end_of_turn = data.get('end_of_turn', False)

            # Print final end-of-turn transcript
            if end_of_turn:
                # Blank the partial currently on the line before printing.
                print('\r' + ' ' * 80 + '\r', end='')
                print(transcript)
            else:
                # Partial transcript: overwrite the current line in place.
                print(f"\r{transcript}", end='')
        elif msg_type == "Termination":
            audio_duration = data.get('audio_duration_seconds', 0)
            session_duration = data.get('session_duration_seconds', 0)
            print(f"\nSession Terminated: Audio Duration={audio_duration}s, Session Duration={session_duration}s")
    except json.JSONDecodeError as e:
        print(f"Error decoding message: {e}")
    except Exception as e:
        print(f"Error handling message: {e}")


def on_error(ws, error):
    """Called when a WebSocket error occurs."""
    print(f"\nWebSocket Error: {error}")
    # Attempt to signal stop on error
    stop_event.set()


def on_close(ws, close_status_code, close_msg):
    """Called when the WebSocket connection is closed."""
    print(f"\nWebSocket Disconnected: Status={close_status_code}, Msg={close_msg}")

    global stream, audio
    stop_event.set()  # Signal audio thread just in case it's still running

    # Let the audio thread leave stream.read() before the stream is torn
    # down, so the stream is not closed underneath an in-flight read.
    if audio_thread and audio_thread.is_alive():
        audio_thread.join(timeout=1.0)

    # Ensure audio resources are released
    if stream:
        if stream.is_active():
            stream.stop_stream()
        stream.close()
        stream = None
    if audio:
        audio.terminate()
        audio = None


# --- Main Execution ---
def run():
    """Open the microphone, stream it to AssemblyAI, and shut down on Ctrl+C."""
    global audio, stream, ws_app

    # Initialize PyAudio
    audio = pyaudio.PyAudio()

    # Open microphone stream
    try:
        stream = audio.open(
            input=True,
            frames_per_buffer=FRAMES_PER_BUFFER,
            channels=CHANNELS,
            format=FORMAT,
            rate=SAMPLE_RATE,
        )
        print("Microphone stream opened successfully.")
        print("Speak into your microphone. Press Ctrl+C to stop.")
    except Exception as e:
        print(f"Error opening microphone stream: {e}")
        if audio:
            audio.terminate()
        return  # Exit if microphone cannot be opened

    # Create WebSocketApp
    ws_app = websocket.WebSocketApp(
        API_ENDPOINT,
        header={"Authorization": YOUR_API_KEY},
        on_open=on_open,
        on_message=on_message,
        on_error=on_error,
        on_close=on_close,
    )

    # Run WebSocketApp in a separate thread to allow main thread to catch KeyboardInterrupt
    ws_thread = threading.Thread(target=ws_app.run_forever)
    ws_thread.daemon = True
    ws_thread.start()

    try:
        # Keep main thread alive until interrupted
        while ws_thread.is_alive():
            time.sleep(0.1)
    except KeyboardInterrupt:
        print("\nCtrl+C received. Stopping...")
        stop_event.set()  # Signal audio thread to stop

        # Send termination message to the server
        if ws_app and ws_app.sock and ws_app.sock.connected:
            try:
                terminate_message = {"type": "Terminate"}
                print(f"Sending termination message: {json.dumps(terminate_message)}")
                ws_app.send(json.dumps(terminate_message))
                # Give the server up to 5 s to reply. Unlike a fixed sleep,
                # this returns as soon as the WebSocket thread has ended.
                ws_thread.join(timeout=5.0)
            except Exception as e:
                print(f"Error sending termination message: {e}")

        # Close the WebSocket connection (will trigger on_close)
        if ws_app:
            ws_app.close()

        # Wait for WebSocket thread to finish
        ws_thread.join(timeout=2.0)
    except Exception as e:
        print(f"\nAn unexpected error occurred: {e}")
        stop_event.set()
        if ws_app:
            ws_app.close()
        ws_thread.join(timeout=2.0)
    finally:
        # Final cleanup (already handled in on_close, but good as a fallback)
        if stream and stream.is_active():
            stream.stop_stream()
        if stream:
            stream.close()
        if audio:
            audio.terminate()
        print("Cleanup complete. Exiting.")


if __name__ == "__main__":
    run()
# Query-string parameters sent when opening the streaming WebSocket.
CONNECTION_PARAMS = {
    "sample_rate": 16000,
    "speech_model": "u3-rt-pro",
}
API_ENDPOINT_BASE_URL = "wss://streaming.assemblyai.com/v3/ws"
API_ENDPOINT = f"{API_ENDPOINT_BASE_URL}?{urlencode(CONNECTION_PARAMS)}"

# Audio Configuration
FRAMES_PER_BUFFER = 800  # 50ms of audio (0.05s * 16000Hz)
SAMPLE_RATE = CONNECTION_PARAMS["sample_rate"]
CHANNELS = 1
FORMAT = pyaudio.paInt16

# Global variables for audio stream and websocket
audio = None
stream = None
ws_app = None
audio_thread = None
stop_event = threading.Event()  # To signal the audio thread to stop
Here are helpful things to know about connecting to our streaming model:
Universal-3 Pro model — Connect to wss://streaming.assemblyai.com/v3/ws with speech_model=u3-rt-pro to use our latest, highest-accuracy streaming model. Unlike Deepgram’s model="nova-3", you use a single speech_model parameter.
Built-in formatting — Universal-3 Pro always returns formatted transcripts with smart punctuation & casing, similar to Deepgram’s punctuate=true. No extra parameter is needed.
Partials are always on — Like Deepgram’s interim_results=true, AssemblyAI streams interim results automatically; no extra parameter is needed. Universal-3 Pro emits partials during periods of silence, with at most one partial per silence period.
def on_open(ws):
    """Start a daemon thread that streams microphone audio to the socket."""

    def mic_loop():
        # Forward raw PCM chunks as binary frames until asked to stop.
        while not stop_event.is_set():
            ws.send(stream.read(FRAMES, exception_on_overflow=False), websocket.ABNF.OPCODE_BINARY)

    global audio_thread
    audio_thread = threading.Thread(target=mic_loop, daemon=True)
    audio_thread.start()
def on_open(ws):
    """Called when the WebSocket connection is established."""
    print("WebSocket connection opened.")
    print(f"Connected to: {API_ENDPOINT}")

    # Start sending audio data in a separate thread
    def stream_audio():
        global stream
        print("Starting audio streaming...")
        while not stop_event.is_set():
            try:
                audio_data = stream.read(FRAMES_PER_BUFFER, exception_on_overflow=False)
                # Send audio data as binary message
                ws.send(audio_data, websocket.ABNF.OPCODE_BINARY)
            except Exception as e:
                print(f"Error streaming audio: {e}")
                # If stream read fails, likely means it's closed, stop the loop
                break
        print("Audio streaming stopped.")

    global audio_thread
    audio_thread = threading.Thread(target=stream_audio)
    audio_thread.daemon = True  # Allow main thread to exit even if this thread is running
    audio_thread.start()
Tip: Adding error handling and log lines (as in the AssemblyAI snippet) lets you see exactly when the socket opens, audio starts, or a read fails — catching issues early and saving time you would otherwise spend debugging silent failures.
def on_message(ws, msg):
    """Handle one JSON text message from the Deepgram socket.

    "Results" messages are printed: interim transcripts overwrite the
    current terminal line, final ones are printed on a line of their own.
    A "Metadata" message is stored in the module-level ``final_metadata``.

    Args:
        ws: The WebSocket the message arrived on (unused).
        msg: The raw JSON message text.
    """
    global final_metadata
    d = json.loads(msg)
    if d.get("type") == "Results":
        txt = d["channel"]["alternatives"][0]["transcript"]
        if d["is_final"]:
            # Blank the line that held the interim text, then print the final one.
            print(" " * 80, end="\r")
            print(txt)
        else:
            # Interim result: overwrite the current line in place.
            print(txt, end="\r")
    elif d.get("type") == "Metadata":
        final_metadata = d  # save the metadata for later
Helpful things to know about AssemblyAI’s message payloads:
Clear message types – Instead of checking is_final, you’ll receive explicit "Begin", "Turn", and "Termination" events, making your logic simpler and more readable.
Session metadata up-front – The first "Begin" message delivers a session_id and expiry timestamp. You can log or surface these for tracing or billing.
End-of-turn detection – Each "Turn" object includes an end_of_turn boolean. When end_of_turn is true, the transcript is a final, formatted result. When false, it is a partial transcript. Universal-3 Pro always returns formatted transcripts with smart punctuation & casing built in.
def on_error(ws, error):
    """Called when a WebSocket error occurs."""
    print(f'\nWebSocket Error: {error}')
    # Attempt to signal stop on error
    stop_event.set()
Capture and log any errors emitted by the WebSocket connection to streamline troubleshooting and maintain smooth operation.
def on_close(ws, *args):
    """Stop audio streaming and print the metadata captured during the session.

    NOTE: reads the module-level ``final_metadata``; that name must already
    be bound (e.g. to ``None``) when this runs, otherwise a close that
    happens before any "Metadata" message raises ``NameError``.
    """
    stop_event.set()
    if final_metadata:
        print("\nFinal Metadata from Deepgram:")
        print(json.dumps(final_metadata, indent=2))
    else:
        print("\nNo metadata received before close.")
def on_close(ws, close_status_code, close_msg):
    """Called when the WebSocket connection is closed."""
    print(f"\nWebSocket Disconnected: Status={close_status_code}, Msg={close_msg}")

    global stream, audio
    stop_event.set()  # Signal audio thread just in case it's still running

    # Let the audio thread leave stream.read() before the stream is torn
    # down, so the stream is not closed underneath an in-flight read.
    if audio_thread and audio_thread.is_alive():
        audio_thread.join(timeout=1.0)

    # Ensure audio resources are released
    if stream:
        if stream.is_active():
            stream.stop_stream()
        stream.close()
        stream = None
    if audio:
        audio.terminate()
        audio = None
Helpful things to know about AssemblyAI’s WebSocket Closure:
Connection diagnostics on tap - If the socket closes unexpectedly, AssemblyAI supplies both a status code and a reason message (close_status_code, close_msg), so you know immediately whether the server timed out, refused auth, or encountered another error.
Metadata arrives at session start, not at close - Deepgram sends its final metadata only when the socket closes. AssemblyAI delivers session information up front in the initial “Begin” message, so you can log IDs and expiry times right away.
global audio, stream, ws_app# Initialize PyAudioaudio = pyaudio.PyAudio()# Open microphone streamtry: stream = audio.open( input=True, frames_per_buffer=FRAMES_PER_BUFFER, channels=CHANNELS, format=FORMAT, rate=SAMPLE_RATE, ) print("Microphone stream opened successfully.") print("Speak into your microphone. Press Ctrl+C to stop.")except Exception as e: print(f"Error opening microphone stream: {e}") if audio: audio.terminate() return # Exit if microphone cannot be opened# Create WebSocketAppws_app = websocket.WebSocketApp( API_ENDPOINT, header={"Authorization": YOUR_API_KEY}, on_open=on_open, on_message=on_message, on_error=on_error, on_close=on_close,)# Run WebSocketApp in a separate thread to allow main thread to catch KeyboardInterruptws_thread = threading.Thread(target=ws_app.run_forever)ws_thread.daemon = Truews_thread.start()
def graceful_shutdown(signum, frame):
    """SIGINT handler: send CloseStream, close the socket, wait for the WS thread."""
    print("\nCtrl+C received → shutting down …")
    stop_event.set()
    # Send CloseStream before closing WebSocket
    if ws_app and ws_app.sock and ws_app.sock.connected:
        try:
            ws_app.send(json.dumps({"type": "CloseStream"}))
            time.sleep(0.5)  # Allow time for final metadata to come through
        except Exception as e:
            print("Error sending CloseStream:", e)
        ws_app.close()
    if ws_thread and ws_thread.is_alive():
        ws_thread.join(timeout=2.0)


signal.signal(signal.SIGINT, graceful_shutdown)


def run():
    """Abbreviated listing: only the cleanup tail of run() is shown."""
    # Open microphone and create WebSocket
    # cleanup
    if stream and stream.is_active():
        stream.stop_stream()
    if stream:
        stream.close()
    if audio:
        audio.terminate()
    print("Cleanup complete. Exiting.")
# Excerpt from the body of run(): wait for Ctrl+C, then shut down.
try:
    # Keep main thread alive until interrupted
    while ws_thread.is_alive():
        time.sleep(0.1)
except KeyboardInterrupt:
    print("\nCtrl+C received. Stopping...")
    stop_event.set()  # Signal audio thread to stop

    # Send termination message to the server
    if ws_app and ws_app.sock and ws_app.sock.connected:
        try:
            terminate_message = {"type": "Terminate"}
            print(f"Sending termination message: {json.dumps(terminate_message)}")
            ws_app.send(json.dumps(terminate_message))
            # Give the server up to 5 s to reply. Unlike a fixed sleep,
            # this returns as soon as the WebSocket thread has ended.
            ws_thread.join(timeout=5.0)
        except Exception as e:
            print(f"Error sending termination message: {e}")

    # Close the WebSocket connection (will trigger on_close)
    if ws_app:
        ws_app.close()

    # Wait for WebSocket thread to finish
    ws_thread.join(timeout=2.0)
except Exception as e:
    print(f"\nAn unexpected error occurred: {e}")
    stop_event.set()
    if ws_app:
        ws_app.close()
    ws_thread.join(timeout=2.0)
finally:
    # Final cleanup (already handled in on_close, but good as a fallback)
    if stream and stream.is_active():
        stream.stop_stream()
    if stream:
        stream.close()
    if audio:
        audio.terminate()
    print("Cleanup complete. Exiting.")
Helpful things to know about AssemblyAI’s shutdown:
JSON payload difference - When closing the stream with AssemblyAI, your JSON payload will be {"type": "Terminate"} instead of {"type": "CloseStream"}.
No metadata race condition - Because AssemblyAI already provided session info at “Begin” and doesn’t append extra data at shutdown, you don’t have to sleep (time.sleep(0.5)) to wait for “final metadata” before closing—making the exit faster and less error-prone.