Terminate Streaming Session After Inactivity
An often-overlooked aspect of implementing AssemblyAI’s Streaming Speech-to-Text (STT) service is efficiently terminating transcription sessions. In this cookbook, you will learn how to terminate a Streaming session after any fixed duration of silence.
For the full code, refer to this GitHub gist.
Quickstart
import pyaudio
import websocket
import json
import os
import threading
import time
from urllib.parse import urlencode
from datetime import datetime

# --- Configuration ---
ASSEMBLYAI_API_KEY = os.environ["ASSEMBLYAI_API_KEY"]

CONNECTION_PARAMS = {
    "speech_model": "u3-rt-pro",
    "sample_rate": 16000,
}
API_ENDPOINT_BASE_URL = "wss://streaming.assemblyai.com/v3/ws"
API_ENDPOINT = f"{API_ENDPOINT_BASE_URL}?{urlencode(CONNECTION_PARAMS)}"

# Audio Configuration
FRAMES_PER_BUFFER = 800  # 50ms of audio (0.05s * 16000Hz)
SAMPLE_RATE = CONNECTION_PARAMS["sample_rate"]
CHANNELS = 1
FORMAT = pyaudio.paInt16

# Global variables for audio stream and websocket
audio = None
stream = None
ws_app = None
audio_thread = None
stop_event = threading.Event()

# Silence tracking: timestamp of the last non-empty transcript, and a flag
# so we stop processing messages once we have asked the server to terminate.
last_transcript_received = datetime.now()
terminated = False

# --- WebSocket Event Handlers ---


def on_open(ws):
    """Called when the WebSocket connection is established."""
    print("WebSocket connection opened.")
    print(f"Connected to: {API_ENDPOINT}")

    def stream_audio():
        # Pump raw microphone frames to the server until we are told to stop.
        global stream
        print("Starting audio streaming...")
        while not stop_event.is_set():
            try:
                audio_data = stream.read(FRAMES_PER_BUFFER, exception_on_overflow=False)
                ws.send(audio_data, websocket.ABNF.OPCODE_BINARY)
            except Exception as e:
                print(f"Error streaming audio: {e}")
                break
        print("Audio streaming stopped.")

    global audio_thread
    audio_thread = threading.Thread(target=stream_audio)
    audio_thread.daemon = True
    audio_thread.start()


def on_message(ws, message):
    """Handle server messages and terminate the session after 5s of silence."""
    global last_transcript_received, terminated

    try:
        data = json.loads(message)
        msg_type = data.get('type')

        # After requesting termination, ignore everything except the ack.
        if terminated and msg_type != "Termination":
            return

        if msg_type == "Begin":
            session_id = data.get('id')
            expires_at = data.get('expires_at')
            print(f"\nSession began: ID={session_id}, ExpiresAt={datetime.fromtimestamp(expires_at)}")
        elif msg_type == "Turn":
            transcript = data.get('transcript', '')
            if data.get('end_of_turn'):
                # Clear the partial line, then print the finalized turn.
                print('\r' + ' ' * 80 + '\r', end='')
                print(transcript)
            else:
                print(f"\r{transcript}", end='')

            # Update timestamp if meaningful speech received
            if transcript.strip():
                last_transcript_received = datetime.now()

            # Check for silence timeout
            silence_duration = (datetime.now() - last_transcript_received).total_seconds()
            if silence_duration > 5:
                print("No transcription received in 5 seconds. Terminating session...")
                try:
                    ws.send(json.dumps({"type": "Terminate"}))
                except Exception:
                    pass
                terminated = True
                stop_event.set()
                return

        elif msg_type == "Termination":
            audio_duration = data.get('audio_duration_seconds', 0)
            session_duration = data.get('session_duration_seconds', 0)
            print(f"\nSession Terminated: Audio Duration={audio_duration}s, Session Duration={session_duration}s")
    except json.JSONDecodeError as e:
        print(f"Error decoding message: {e}")
    except Exception as e:
        print(f"Error handling message: {e}")


def on_error(ws, error):
    """Called when a WebSocket error occurs."""
    print(f"\nWebSocket Error: {error}")
    stop_event.set()


def on_close(ws, close_status_code, close_msg):
    """Called when the WebSocket connection is closed."""
    print(f"\nWebSocket Disconnected: Status={close_status_code}, Msg={close_msg}")

    global stream, audio
    stop_event.set()

    # Release audio resources and wait briefly for the streaming thread.
    if stream:
        if stream.is_active():
            stream.stop_stream()
        stream.close()
        stream = None
    if audio:
        audio.terminate()
        audio = None
    if audio_thread and audio_thread.is_alive():
        audio_thread.join(timeout=1.0)


# --- Main Execution ---
def run():
    """Open the microphone, connect the WebSocket, and stream until stopped."""
    global audio, stream, ws_app

    # Initialize PyAudio
    audio = pyaudio.PyAudio()

    # Open microphone stream
    try:
        stream = audio.open(
            input=True,
            frames_per_buffer=FRAMES_PER_BUFFER,
            channels=CHANNELS,
            format=FORMAT,
            rate=SAMPLE_RATE,
        )
        print("Microphone stream opened successfully.")
        print("Speak into your microphone. Press Ctrl+C to stop.")
    except Exception as e:
        print(f"Error opening microphone stream: {e}")
        if audio:
            audio.terminate()
        return

    # Create WebSocketApp
    ws_app = websocket.WebSocketApp(
        API_ENDPOINT,
        header={"Authorization": ASSEMBLYAI_API_KEY},
        on_open=on_open,
        on_message=on_message,
        on_error=on_error,
        on_close=on_close,
    )

    # Run WebSocketApp in a separate thread to allow main thread to catch KeyboardInterrupt
    ws_thread = threading.Thread(target=ws_app.run_forever)
    ws_thread.daemon = True
    ws_thread.start()

    try:
        while ws_thread.is_alive():
            time.sleep(0.1)
    except KeyboardInterrupt:
        print("\nCtrl+C received. Stopping...")
        stop_event.set()

        # Politely ask the server to end the session before closing the socket.
        if ws_app and ws_app.sock and ws_app.sock.connected:
            try:
                terminate_message = {"type": "Terminate"}
                print(f"Sending termination message: {json.dumps(terminate_message)}")
                ws_app.send(json.dumps(terminate_message))
                time.sleep(5)
            except Exception as e:
                print(f"Error sending termination message: {e}")

        if ws_app:
            ws_app.close()

        ws_thread.join(timeout=2.0)

    except Exception as e:
        print(f"\nAn unexpected error occurred: {e}")
        stop_event.set()
        if ws_app:
            ws_app.close()
        ws_thread.join(timeout=2.0)

    finally:
        if stream and stream.is_active():
            stream.stop_stream()
        if stream:
            stream.close()
        if audio:
            audio.terminate()
        print("Cleanup complete. Exiting.")


if __name__ == "__main__":
    run()
Get Started
Before we begin, make sure you have an AssemblyAI account and an API key. You can sign up for an AssemblyAI account and get your API key from your dashboard.
Step-by-step instructions
First, install the required packages.
$ pip install websocket-client pyaudio
Import packages and set your API key.
import pyaudio
import websocket
import json
import os
import threading
import time
from urllib.parse import urlencode
from datetime import datetime

# Read the API key from the environment; raises KeyError if it is not set.
ASSEMBLYAI_API_KEY = os.environ["ASSEMBLYAI_API_KEY"]
Audio configuration and global variables
# Streaming endpoint: model and sample rate are passed as query parameters.
CONNECTION_PARAMS = {
    "speech_model": "u3-rt-pro",
    "sample_rate": 16000,
}
API_ENDPOINT_BASE_URL = "wss://streaming.assemblyai.com/v3/ws"
API_ENDPOINT = f"{API_ENDPOINT_BASE_URL}?{urlencode(CONNECTION_PARAMS)}"

# Audio Configuration
FRAMES_PER_BUFFER = 800  # 50ms of audio (0.05s * 16000Hz)
SAMPLE_RATE = CONNECTION_PARAMS["sample_rate"]
CHANNELS = 1
FORMAT = pyaudio.paInt16

# Global variables for audio stream and websocket
audio = None
stream = None
ws_app = None
audio_thread = None
stop_event = threading.Event()
Implementing Speech Activity Checks
Our Streaming API emits a Turn Event each time speech is processed. You can use this behavior to detect inactivity and automatically terminate the session.
We track the timestamp of the most recent non-empty transcript. On every Turn Event, we:
- Update the timestamp if meaningful speech is received
- Check how many seconds have passed since the last valid transcript
- If that exceeds your timeout (e.g. 5 seconds), terminate the session
Key Variables
# Timestamp of the most recent non-empty transcript, seeded at startup so the
# silence timer starts counting as soon as the session begins.
last_transcript_received = datetime.now()
# Set once we have asked the server to terminate, so late messages are ignored.
terminated = False
These are updated on every Turn event.
WebSocket event handlers
Open WebSocket
def on_open(ws):
    """Called when the WebSocket connection is established."""
    print("WebSocket connection opened.")
    print(f"Connected to: {API_ENDPOINT}")

    def stream_audio():
        # Pump raw microphone frames to the server until stop_event is set.
        global stream
        print("Starting audio streaming...")
        while not stop_event.is_set():
            try:
                audio_data = stream.read(FRAMES_PER_BUFFER, exception_on_overflow=False)
                ws.send(audio_data, websocket.ABNF.OPCODE_BINARY)
            except Exception as e:
                print(f"Error streaming audio: {e}")
                break
        print("Audio streaming stopped.")

    # Run the microphone loop on a daemon thread so it never blocks shutdown.
    global audio_thread
    audio_thread = threading.Thread(target=stream_audio)
    audio_thread.daemon = True
    audio_thread.start()
Handle WebSocket messages with silence detection
def on_message(ws, message):
    """Handle server messages and terminate the session after 5s of silence."""
    global last_transcript_received, terminated

    try:
        data = json.loads(message)
        msg_type = data.get('type')

        # After requesting termination, ignore everything except the ack.
        if terminated and msg_type != "Termination":
            return

        if msg_type == "Begin":
            session_id = data.get('id')
            expires_at = data.get('expires_at')
            print(f"\nSession began: ID={session_id}, ExpiresAt={datetime.fromtimestamp(expires_at)}")
        elif msg_type == "Turn":
            transcript = data.get('transcript', '')
            if data.get('end_of_turn'):
                # Clear the partial line, then print the finalized turn.
                print('\r' + ' ' * 80 + '\r', end='')
                print(transcript)
            else:
                print(f"\r{transcript}", end='')

            # Update timestamp if meaningful speech received
            if transcript.strip():
                last_transcript_received = datetime.now()

            # Check for silence timeout
            silence_duration = (datetime.now() - last_transcript_received).total_seconds()
            if silence_duration > 5:
                print("No transcription received in 5 seconds. Terminating session...")
                try:
                    ws.send(json.dumps({"type": "Terminate"}))
                except Exception:
                    pass
                terminated = True
                stop_event.set()
                return

        elif msg_type == "Termination":
            audio_duration = data.get('audio_duration_seconds', 0)
            session_duration = data.get('session_duration_seconds', 0)
            print(f"\nSession Terminated: Audio Duration={audio_duration}s, Session Duration={session_duration}s")
    except json.JSONDecodeError as e:
        print(f"Error decoding message: {e}")
    except Exception as e:
        print(f"Error handling message: {e}")
This pattern ensures sessions are cleanly terminated after inactivity.
WebSocket error and close handlers
def on_error(ws, error):
    """Called when a WebSocket error occurs."""
    print(f"\nWebSocket Error: {error}")
    stop_event.set()


def on_close(ws, close_status_code, close_msg):
    """Called when the WebSocket connection is closed."""
    print(f"\nWebSocket Disconnected: Status={close_status_code}, Msg={close_msg}")

    global stream, audio
    stop_event.set()

    # Release audio resources, then wait briefly for the streaming thread.
    if stream:
        if stream.is_active():
            stream.stop_stream()
        stream.close()
        stream = None
    if audio:
        audio.terminate()
        audio = None
    if audio_thread and audio_thread.is_alive():
        audio_thread.join(timeout=1.0)
Begin streaming STT transcription
def run():
    """Open the microphone, connect the WebSocket, and stream until stopped."""
    global audio, stream, ws_app

    audio = pyaudio.PyAudio()

    try:
        stream = audio.open(
            input=True,
            frames_per_buffer=FRAMES_PER_BUFFER,
            channels=CHANNELS,
            format=FORMAT,
            rate=SAMPLE_RATE,
        )
        print("Microphone stream opened successfully.")
        print("Speak into your microphone. Press Ctrl+C to stop.")
    except Exception as e:
        print(f"Error opening microphone stream: {e}")
        if audio:
            audio.terminate()
        return

    ws_app = websocket.WebSocketApp(
        API_ENDPOINT,
        header={"Authorization": ASSEMBLYAI_API_KEY},
        on_open=on_open,
        on_message=on_message,
        on_error=on_error,
        on_close=on_close,
    )

    # Run the socket on a daemon thread so the main thread can catch Ctrl+C.
    ws_thread = threading.Thread(target=ws_app.run_forever)
    ws_thread.daemon = True
    ws_thread.start()

    try:
        while ws_thread.is_alive():
            time.sleep(0.1)
    except KeyboardInterrupt:
        print("\nCtrl+C received. Stopping...")
        stop_event.set()

        # Politely ask the server to end the session before closing the socket.
        if ws_app and ws_app.sock and ws_app.sock.connected:
            try:
                terminate_message = {"type": "Terminate"}
                print(f"Sending termination message: {json.dumps(terminate_message)}")
                ws_app.send(json.dumps(terminate_message))
                time.sleep(5)
            except Exception as e:
                print(f"Error sending termination message: {e}")

        if ws_app:
            ws_app.close()

        ws_thread.join(timeout=2.0)

    except Exception as e:
        print(f"\nAn unexpected error occurred: {e}")
        stop_event.set()
        if ws_app:
            ws_app.close()
        ws_thread.join(timeout=2.0)

    finally:
        if stream and stream.is_active():
            stream.stop_stream()
        if stream:
            stream.close()
        if audio:
            audio.terminate()
        print("Cleanup complete. Exiting.")


if __name__ == "__main__":
    run()
What You’ll Observe
- Live transcription continues as long as there’s speech
- After 5 seconds of silence, the session ends automatically
You can change the timeout value to suit your needs by modifying the `silence_duration > 5` check in `on_message`.