Terminate Streaming Session After Inactivity

An often-overlooked aspect of implementing AssemblyAI’s Streaming Speech-to-Text (STT) service is efficiently terminating transcription sessions. In this cookbook, you will learn how to terminate a Streaming session after any fixed duration of silence.

For the full code, refer to this GitHub gist.

Quickstart

1import pyaudio
2import websocket
3import json
4import os
5import threading
6import time
7from urllib.parse import urlencode
8from datetime import datetime
9
# --- Configuration ---
# The API key is read from the environment so it never lives in source control.
ASSEMBLYAI_API_KEY = os.environ["ASSEMBLYAI_API_KEY"]

CONNECTION_PARAMS = {
    "speech_model": "u3-rt-pro",
    "sample_rate": 16000,
}
API_ENDPOINT_BASE_URL = "wss://streaming.assemblyai.com/v3/ws"
API_ENDPOINT = f"{API_ENDPOINT_BASE_URL}?{urlencode(CONNECTION_PARAMS)}"

# Audio Configuration
FRAMES_PER_BUFFER = 800  # 50 ms of audio per chunk (0.05 s * 16000 Hz)
SAMPLE_RATE = CONNECTION_PARAMS["sample_rate"]
CHANNELS = 1
FORMAT = pyaudio.paInt16

# Global handles shared between the WebSocket callbacks and the audio thread.
audio = None          # pyaudio.PyAudio instance
stream = None         # microphone input stream
ws_app = None         # websocket.WebSocketApp instance
audio_thread = None   # background thread pumping mic audio to the API
stop_event = threading.Event()  # signals the audio thread to stop

# Silence tracking: when the last non-empty transcript arrived, and whether
# we have already asked the server to terminate the session.
last_transcript_received = datetime.now()
terminated = False
36
37# --- WebSocket Event Handlers ---
38
def on_open(ws):
    """Called when the WebSocket connection is established."""
    print("WebSocket connection opened.")
    print(f"Connected to: {API_ENDPOINT}")

    def pump_microphone():
        # Forward raw PCM chunks from the mic to the API until told to stop.
        global stream
        print("Starting audio streaming...")
        while not stop_event.is_set():
            try:
                chunk = stream.read(FRAMES_PER_BUFFER, exception_on_overflow=False)
                ws.send(chunk, websocket.ABNF.OPCODE_BINARY)
            except Exception as e:
                print(f"Error streaming audio: {e}")
                break
        print("Audio streaming stopped.")

    global audio_thread
    # Daemon thread so the process can exit even if streaming is mid-read.
    audio_thread = threading.Thread(target=pump_microphone, daemon=True)
    audio_thread.start()
60
def on_message(ws, message, silence_timeout_seconds=5):
    """Handle server messages and terminate the session after silence.

    Tracks the timestamp of the last non-empty transcript; when a Turn event
    arrives and more than ``silence_timeout_seconds`` have elapsed since then,
    a Terminate message is sent to the server.

    Args:
        ws: The WebSocketApp connection.
        message: Raw JSON payload received from the server.
        silence_timeout_seconds: Silence duration that triggers termination.
            Defaults to 5, preserving the original behavior for the
            websocket-client callback, which passes only (ws, message).
    """
    global last_transcript_received, terminated

    try:
        data = json.loads(message)
        msg_type = data.get('type')

        # After requesting termination, ignore everything except the ack.
        if terminated and msg_type != "Termination":
            return

        if msg_type == "Begin":
            session_id = data.get('id')
            expires_at = data.get('expires_at')
            # Guard: datetime.fromtimestamp(None) raises TypeError if the
            # server omits expires_at.
            expiry = (
                datetime.fromtimestamp(expires_at)
                if expires_at is not None
                else "unknown"
            )
            print(f"\nSession began: ID={session_id}, ExpiresAt={expiry}")
        elif msg_type == "Turn":
            transcript = data.get('transcript', '')
            if data.get('end_of_turn'):
                # Clear the partial-transcript line, then print the final turn.
                print('\r' + ' ' * 80 + '\r', end='')
                print(transcript)
            else:
                print(f"\r{transcript}", end='')

            # Refresh the silence clock only for meaningful (non-empty) speech.
            if transcript.strip():
                last_transcript_received = datetime.now()

            # Check for silence timeout.
            silence_duration = (datetime.now() - last_transcript_received).total_seconds()
            if silence_duration > silence_timeout_seconds:
                print(f"No transcription received in {silence_timeout_seconds} seconds. Terminating session...")
                try:
                    ws.send(json.dumps({"type": "Terminate"}))
                except Exception:
                    # Best-effort: the socket may already be closing.
                    pass
                terminated = True
                stop_event.set()
                return

        elif msg_type == "Termination":
            audio_duration = data.get('audio_duration_seconds', 0)
            session_duration = data.get('session_duration_seconds', 0)
            print(f"\nSession Terminated: Audio Duration={audio_duration}s, Session Duration={session_duration}s")
    except json.JSONDecodeError as e:
        print(f"Error decoding message: {e}")
    except Exception as e:
        print(f"Error handling message: {e}")
107
def on_error(ws, error):
    """Log a WebSocket error and signal the audio thread to stop."""
    print(f"\nWebSocket Error: {error}")
    # Stop pushing audio; on_close will take care of resource cleanup.
    stop_event.set()
112
113
def on_close(ws, close_status_code, close_msg):
    """Release the microphone and PyAudio once the socket has closed."""
    print(f"\nWebSocket Disconnected: Status={close_status_code}, Msg={close_msg}")

    global stream, audio
    stop_event.set()

    # Tear down audio resources in reverse order of acquisition.
    if stream is not None:
        if stream.is_active():
            stream.stop_stream()
        stream.close()
        stream = None
    if audio is not None:
        audio.terminate()
        audio = None
    # Give the streaming thread a moment to exit cleanly.
    if audio_thread and audio_thread.is_alive():
        audio_thread.join(timeout=1.0)
131
132# --- Main Execution ---
# --- Main Execution ---
def run():
    """Open the microphone, connect to the streaming API, and block until done."""
    global audio, stream, ws_app

    # Initialize PyAudio and acquire the default input device.
    audio = pyaudio.PyAudio()
    try:
        stream = audio.open(
            format=FORMAT,
            channels=CHANNELS,
            rate=SAMPLE_RATE,
            frames_per_buffer=FRAMES_PER_BUFFER,
            input=True,
        )
    except Exception as exc:
        print(f"Error opening microphone stream: {exc}")
        if audio:
            audio.terminate()
        return
    print("Microphone stream opened successfully.")
    print("Speak into your microphone. Press Ctrl+C to stop.")

    # Create the WebSocketApp with our lifecycle callbacks.
    ws_app = websocket.WebSocketApp(
        API_ENDPOINT,
        header={"Authorization": ASSEMBLYAI_API_KEY},
        on_open=on_open,
        on_message=on_message,
        on_error=on_error,
        on_close=on_close,
    )

    # Run the socket on a daemon thread so the main thread can catch Ctrl+C.
    socket_thread = threading.Thread(target=ws_app.run_forever, daemon=True)
    socket_thread.start()

    try:
        while socket_thread.is_alive():
            time.sleep(0.1)
    except KeyboardInterrupt:
        print("\nCtrl+C received. Stopping...")
        stop_event.set()

        # Politely ask the server to end the session before closing the socket.
        if ws_app and ws_app.sock and ws_app.sock.connected:
            try:
                shutdown_msg = {"type": "Terminate"}
                print(f"Sending termination message: {json.dumps(shutdown_msg)}")
                ws_app.send(json.dumps(shutdown_msg))
                # Give the server a moment to reply with a Termination event.
                time.sleep(5)
            except Exception as exc:
                print(f"Error sending termination message: {exc}")

        if ws_app:
            ws_app.close()
        socket_thread.join(timeout=2.0)
    except Exception as exc:
        print(f"\nAn unexpected error occurred: {exc}")
        stop_event.set()
        if ws_app:
            ws_app.close()
        socket_thread.join(timeout=2.0)
    finally:
        # Last-resort cleanup in case on_close never ran.
        if stream and stream.is_active():
            stream.stop_stream()
        if stream:
            stream.close()
        if audio:
            audio.terminate()
        print("Cleanup complete. Exiting.")


if __name__ == "__main__":
    run()

Get Started

Before we begin, make sure you have an AssemblyAI account and an API key. You can sign up for an AssemblyAI account and get your API key from your dashboard.

Step-by-step instructions

First, install the required packages.

$pip install websocket-client pyaudio

Import packages and set your API key.

1import pyaudio
2import websocket
3import json
4import os
5import threading
6import time
7from urllib.parse import urlencode
8from datetime import datetime
9
10ASSEMBLYAI_API_KEY = os.environ["ASSEMBLYAI_API_KEY"]

Audio configuration and global variables

CONNECTION_PARAMS = {
    "speech_model": "u3-rt-pro",
    "sample_rate": 16000,
}
API_ENDPOINT_BASE_URL = "wss://streaming.assemblyai.com/v3/ws"
API_ENDPOINT = f"{API_ENDPOINT_BASE_URL}?{urlencode(CONNECTION_PARAMS)}"

# Audio Configuration
FRAMES_PER_BUFFER = 800  # 50 ms of audio per chunk (0.05 s * 16000 Hz)
SAMPLE_RATE = CONNECTION_PARAMS["sample_rate"]
CHANNELS = 1
FORMAT = pyaudio.paInt16

# Global handles shared between the WebSocket callbacks and the audio thread.
audio = None          # pyaudio.PyAudio instance
stream = None         # microphone input stream
ws_app = None         # websocket.WebSocketApp instance
audio_thread = None   # background thread pumping mic audio to the API
stop_event = threading.Event()  # signals the audio thread to stop

Implementing Speech Activity Checks

Our Streaming API emits a Turn Event each time speech is processed. You can use this behavior to detect inactivity and automatically terminate the session.

We track the timestamp of the most recent non-empty transcript. On every Turn Event, we:

  • Update the timestamp if meaningful speech is received

  • Check how many seconds have passed since the last valid transcript

  • If that exceeds your timeout (e.g. 5 seconds), terminate the session

Key Variables

# Silence tracking: when the last non-empty transcript arrived, and whether
# we have already asked the server to terminate the session.
last_transcript_received = datetime.now()
terminated = False

`last_transcript_received` is refreshed whenever a Turn event contains a non-empty transcript; `terminated` is set once the silence timeout triggers.

WebSocket event handlers

Open WebSocket

def on_open(ws):
    """Called when the WebSocket connection is established."""
    print("WebSocket connection opened.")
    print(f"Connected to: {API_ENDPOINT}")

    def pump_microphone():
        # Forward raw PCM chunks from the mic to the API until told to stop.
        global stream
        print("Starting audio streaming...")
        while not stop_event.is_set():
            try:
                chunk = stream.read(FRAMES_PER_BUFFER, exception_on_overflow=False)
                ws.send(chunk, websocket.ABNF.OPCODE_BINARY)
            except Exception as e:
                print(f"Error streaming audio: {e}")
                break
        print("Audio streaming stopped.")

    global audio_thread
    # Daemon thread so the process can exit even if streaming is mid-read.
    audio_thread = threading.Thread(target=pump_microphone, daemon=True)
    audio_thread.start()

Handle WebSocket messages with silence detection

def on_message(ws, message, silence_timeout_seconds=5):
    """Handle server messages and terminate the session after silence.

    Tracks the timestamp of the last non-empty transcript; when a Turn event
    arrives and more than ``silence_timeout_seconds`` have elapsed since then,
    a Terminate message is sent to the server.

    Args:
        ws: The WebSocketApp connection.
        message: Raw JSON payload received from the server.
        silence_timeout_seconds: Silence duration that triggers termination.
            Defaults to 5, preserving the original behavior for the
            websocket-client callback, which passes only (ws, message).
    """
    global last_transcript_received, terminated

    try:
        data = json.loads(message)
        msg_type = data.get('type')

        # After requesting termination, ignore everything except the ack.
        if terminated and msg_type != "Termination":
            return

        if msg_type == "Begin":
            session_id = data.get('id')
            expires_at = data.get('expires_at')
            # Guard: datetime.fromtimestamp(None) raises TypeError if the
            # server omits expires_at.
            expiry = (
                datetime.fromtimestamp(expires_at)
                if expires_at is not None
                else "unknown"
            )
            print(f"\nSession began: ID={session_id}, ExpiresAt={expiry}")
        elif msg_type == "Turn":
            transcript = data.get('transcript', '')
            if data.get('end_of_turn'):
                # Clear the partial-transcript line, then print the final turn.
                print('\r' + ' ' * 80 + '\r', end='')
                print(transcript)
            else:
                print(f"\r{transcript}", end='')

            # Refresh the silence clock only for meaningful (non-empty) speech.
            if transcript.strip():
                last_transcript_received = datetime.now()

            # Check for silence timeout.
            silence_duration = (datetime.now() - last_transcript_received).total_seconds()
            if silence_duration > silence_timeout_seconds:
                print(f"No transcription received in {silence_timeout_seconds} seconds. Terminating session...")
                try:
                    ws.send(json.dumps({"type": "Terminate"}))
                except Exception:
                    # Best-effort: the socket may already be closing.
                    pass
                terminated = True
                stop_event.set()
                return

        elif msg_type == "Termination":
            audio_duration = data.get('audio_duration_seconds', 0)
            session_duration = data.get('session_duration_seconds', 0)
            print(f"\nSession Terminated: Audio Duration={audio_duration}s, Session Duration={session_duration}s")
    except json.JSONDecodeError as e:
        print(f"Error decoding message: {e}")
    except Exception as e:
        print(f"Error handling message: {e}")

This pattern ensures sessions are cleanly terminated after inactivity.

WebSocket error and close handlers

def on_error(ws, error):
    """Log a WebSocket error and signal the audio thread to stop."""
    print(f"\nWebSocket Error: {error}")
    # Stop pushing audio; on_close will take care of resource cleanup.
    stop_event.set()
5
6
def on_close(ws, close_status_code, close_msg):
    """Release the microphone and PyAudio once the socket has closed."""
    print(f"\nWebSocket Disconnected: Status={close_status_code}, Msg={close_msg}")

    global stream, audio
    stop_event.set()

    # Tear down audio resources in reverse order of acquisition.
    if stream is not None:
        if stream.is_active():
            stream.stop_stream()
        stream.close()
        stream = None
    if audio is not None:
        audio.terminate()
        audio = None
    # Give the streaming thread a moment to exit cleanly.
    if audio_thread and audio_thread.is_alive():
        audio_thread.join(timeout=1.0)

Begin streaming STT transcription

def run():
    """Open the microphone, connect to the streaming API, and block until done."""
    global audio, stream, ws_app

    # Initialize PyAudio and acquire the default input device.
    audio = pyaudio.PyAudio()
    try:
        stream = audio.open(
            format=FORMAT,
            channels=CHANNELS,
            rate=SAMPLE_RATE,
            frames_per_buffer=FRAMES_PER_BUFFER,
            input=True,
        )
    except Exception as exc:
        print(f"Error opening microphone stream: {exc}")
        if audio:
            audio.terminate()
        return
    print("Microphone stream opened successfully.")
    print("Speak into your microphone. Press Ctrl+C to stop.")

    # Create the WebSocketApp with our lifecycle callbacks.
    ws_app = websocket.WebSocketApp(
        API_ENDPOINT,
        header={"Authorization": ASSEMBLYAI_API_KEY},
        on_open=on_open,
        on_message=on_message,
        on_error=on_error,
        on_close=on_close,
    )

    # Run the socket on a daemon thread so the main thread can catch Ctrl+C.
    socket_thread = threading.Thread(target=ws_app.run_forever, daemon=True)
    socket_thread.start()

    try:
        while socket_thread.is_alive():
            time.sleep(0.1)
    except KeyboardInterrupt:
        print("\nCtrl+C received. Stopping...")
        stop_event.set()

        # Politely ask the server to end the session before closing the socket.
        if ws_app and ws_app.sock and ws_app.sock.connected:
            try:
                shutdown_msg = {"type": "Terminate"}
                print(f"Sending termination message: {json.dumps(shutdown_msg)}")
                ws_app.send(json.dumps(shutdown_msg))
                # Give the server a moment to reply with a Termination event.
                time.sleep(5)
            except Exception as exc:
                print(f"Error sending termination message: {exc}")

        if ws_app:
            ws_app.close()
        socket_thread.join(timeout=2.0)
    except Exception as exc:
        print(f"\nAn unexpected error occurred: {exc}")
        stop_event.set()
        if ws_app:
            ws_app.close()
        socket_thread.join(timeout=2.0)
    finally:
        # Last-resort cleanup in case on_close never ran.
        if stream and stream.is_active():
            stream.stop_stream()
        if stream:
            stream.close()
        if audio:
            audio.terminate()
        print("Cleanup complete. Exiting.")


if __name__ == "__main__":
    run()

What You’ll Observe

  • Live transcription continues as long as there’s speech

  • After 5 seconds of silence, the session ends automatically

You can change the timeout value to suit your needs by modifying the silence_duration > 5 check.