Use LLM Gateway with Streaming Speech-to-Text (STT)

This script is modified to contain a global variable conversation_data that accumulates the transcribed text in the on_message function. Once the transcription session is closed, the conversation_data is sent to LLM Gateway for analysis.

Quickstart

import pyaudio
import websocket
import json
import threading
import time
import requests
from urllib.parse import urlencode
from datetime import datetime

# --- Configuration ---
YOUR_API_KEY = "YOUR_API_KEY"  # Replace with your actual API key

CONNECTION_PARAMS = {
    "sample_rate": 16000,
    "format_turns": True,  # Request formatted final transcripts
}
API_ENDPOINT_BASE_URL = "wss://streaming.assemblyai.com/v3/ws"
API_ENDPOINT = f"{API_ENDPOINT_BASE_URL}?{urlencode(CONNECTION_PARAMS)}"

# Audio Configuration
FRAMES_PER_BUFFER = 800  # 50 ms of audio (0.05 s * 16000 Hz)
SAMPLE_RATE = CONNECTION_PARAMS["sample_rate"]
CHANNELS = 1
FORMAT = pyaudio.paInt16

# Global state shared between the main thread, the audio thread,
# and the WebSocket callbacks.
audio = None
stream = None
ws_app = None
audio_thread = None
stop_event = threading.Event()  # Signals the audio thread to stop
conversation_data = ""  # Accumulates final transcripts for LLM analysis

# WAV recording variables
recorded_frames = []  # Audio frames captured for the WAV file
recording_lock = threading.Lock()  # Thread-safe access to recorded_frames
37
38# --- Function to Analyze Text with LLM Gateway ---
39
def analyze_with_llm_gateway(text):
    """Send the accumulated transcript to LLM Gateway and return the analysis.

    Args:
        text: The full transcript accumulated during the streaming session.

    Returns:
        The assistant's analysis text from the first completion choice.

    Raises:
        requests.HTTPError: If LLM Gateway responds with a non-2xx status.
        requests.RequestException: On network failure or timeout.
    """
    headers = {
        "authorization": YOUR_API_KEY,
        "content-type": "application/json",
    }

    prompt = "You are a helpful coach. Provide an analysis of the transcript and offer areas to improve with exact quotes. Include no preamble. Start with an overall summary then get into the examples with feedback."

    llm_gateway_data = {
        "model": "claude-sonnet-4-20250514",
        "messages": [
            {"role": "user", "content": f"{prompt}\n\nTranscript: {text}"}
        ],
        "max_tokens": 4000,
    }

    result = requests.post(
        "https://llm-gateway.assemblyai.com/v1/chat/completions",
        headers=headers,
        json=llm_gateway_data,
        timeout=60,  # requests has no default timeout; avoid hanging forever
    )
    # Surface HTTP errors directly instead of raising an opaque KeyError
    # when indexing into an error-response body below.
    result.raise_for_status()
    return result.json()["choices"][0]["message"]["content"]
63
64# --- WebSocket Event Handlers ---
65
def on_open(ws):
    """Begin streaming microphone audio once the WebSocket is connected."""
    print("WebSocket connection opened.")
    print(f"Connected to: {API_ENDPOINT}")

    def stream_audio():
        # Background worker: read microphone chunks and forward them to the
        # server until stop_event is set or the stream becomes unreadable.
        global stream
        print("Starting audio streaming...")
        while not stop_event.is_set():
            try:
                audio_data = stream.read(FRAMES_PER_BUFFER, exception_on_overflow=False)

                # Keep a copy of the chunk for the WAV recording
                with recording_lock:
                    recorded_frames.append(audio_data)

                # Forward the raw PCM chunk as a binary WebSocket frame
                ws.send(audio_data, websocket.ABNF.OPCODE_BINARY)
            except Exception as e:
                print(f"Error streaming audio: {e}")
                # A failed read usually means the stream was closed; bail out
                break
        print("Audio streaming stopped.")

    global audio_thread
    # Daemon thread so the main thread can exit even if streaming is active.
    audio_thread = threading.Thread(target=stream_audio, daemon=True)
    audio_thread.start()
97
def on_message(ws, message):
    """Handle incoming transcription messages from the streaming API.

    Final (end-of-turn) transcripts are printed and appended to the
    module-level conversation_data string for post-session LLM analysis.
    """
    global conversation_data

    try:
        data = json.loads(message)
        msg_type = data.get('type')

        if msg_type == "Begin":
            session_id = data.get('id')
            expires_at = data.get('expires_at')
            # Guard: expires_at may be absent; fromtimestamp(None) raises
            # TypeError, which the broad except below would silently swallow.
            expires_str = (
                datetime.fromtimestamp(expires_at)
                if expires_at is not None
                else "unknown"
            )
            print(f"\nSession began: ID={session_id}, ExpiresAt={expires_str}")
        elif msg_type == "Turn":
            transcript = data.get('transcript', '')
            if data.get('end_of_turn'):
                # Clear the current console line, then print the final turn.
                print('\r' + ' ' * 80 + '\r', end='')
                print(transcript)
                conversation_data += f"{transcript}\n"

        elif msg_type == "Termination":
            audio_duration = data.get('audio_duration_seconds', 0)
            session_duration = data.get('session_duration_seconds', 0)
            print(f"\nSession Terminated: Audio Duration={audio_duration}s, Session Duration={session_duration}s")
    except json.JSONDecodeError as e:
        print(f"Error decoding message: {e}")
    except Exception as e:
        print(f"Error handling message: {e}")
125
def on_error(ws, error):
    """Log a WebSocket error and request shutdown of the audio thread."""
    print(f"\nWebSocket Error: {error}")
    # The connection is unusable now; tell the streaming thread to stop.
    stop_event.set()
131
def on_close(ws, close_status_code, close_msg):
    """Release microphone and PyAudio resources after the socket closes."""
    print(f"\nWebSocket Disconnected: Status={close_status_code}, Msg={close_msg}")

    global stream, audio
    # Signal the audio thread in case it is still running.
    stop_event.set()

    if stream:
        if stream.is_active():
            stream.stop_stream()
        stream.close()
        stream = None
    if audio:
        audio.terminate()
        audio = None
    # Give the audio thread a moment to exit cleanly.
    if audio_thread and audio_thread.is_alive():
        audio_thread.join(timeout=1.0)
151
152# --- Main Execution ---
153
def run():
    """Capture microphone audio, stream it for transcription, then analyze."""
    global audio, stream, ws_app

    audio = pyaudio.PyAudio()

    # Open the microphone; bail out early if no input device is available.
    try:
        stream = audio.open(
            input=True,
            frames_per_buffer=FRAMES_PER_BUFFER,
            channels=CHANNELS,
            format=FORMAT,
            rate=SAMPLE_RATE,
        )
        print("Microphone stream opened successfully.")
        print("Speak into your microphone. Press Ctrl+C to stop.")
        print("Audio will be saved to a WAV file when the session ends.")
    except Exception as e:
        print(f"Error opening microphone stream: {e}")
        if audio:
            audio.terminate()
        return  # Nothing to stream without a microphone

    ws_app = websocket.WebSocketApp(
        API_ENDPOINT,
        header={"Authorization": YOUR_API_KEY},
        on_open=on_open,
        on_message=on_message,
        on_error=on_error,
        on_close=on_close,
    )

    # Run the socket on a daemon thread so the main thread stays free
    # to catch KeyboardInterrupt.
    socket_thread = threading.Thread(target=ws_app.run_forever, daemon=True)
    socket_thread.start()

    try:
        # Idle until the socket thread ends or the user interrupts.
        while socket_thread.is_alive():
            time.sleep(0.1)
    except KeyboardInterrupt:
        print("\nCtrl+C received. Stopping...")
        stop_event.set()  # Stop the audio thread first

        # Ask the server to end the session gracefully before closing.
        if ws_app and ws_app.sock and ws_app.sock.connected:
            try:
                terminate_message = {"type": "Terminate"}
                print(f"Sending termination message: {json.dumps(terminate_message)}")
                ws_app.send(json.dumps(terminate_message))
                # Allow in-flight messages to arrive before a forceful close.
                time.sleep(5)
            except Exception as e:
                print(f"Error sending termination message: {e}")

        # Closing triggers on_close, which releases the audio resources.
        if ws_app:
            ws_app.close()
        socket_thread.join(timeout=2.0)

        # With the session over, hand the transcript to LLM Gateway.
        if conversation_data.strip():
            print("Analyzing conversation with LLM Gateway...")
            print(analyze_with_llm_gateway(conversation_data))
        else:
            print("No conversation data to analyze.")

    except Exception as e:
        print(f"\nAn unexpected error occurred: {e}")
        stop_event.set()
        if ws_app:
            ws_app.close()
        socket_thread.join(timeout=2.0)

    finally:
        # Fallback cleanup; on_close normally releases these already.
        if stream and stream.is_active():
            stream.stop_stream()
        if stream:
            stream.close()
        if audio:
            audio.terminate()
        print("Cleanup complete. Exiting.")


if __name__ == "__main__":
    run()

Step-by-Step Instructions

Before we begin, make sure you have an AssemblyAI account and an API key. You can sign up and get your API key from your dashboard.

Import Packages & Set API Key

import pyaudio
import websocket
import json
import threading
import time
import requests
from urllib.parse import urlencode
from datetime import datetime

YOUR_API_KEY = "YOUR_API_KEY"  # Replace with your actual API key

Audio Configuration & Global Variables

Set all of your audio configurations and global variables. Make sure that you have the parameter format_turns set to True and a global variable conversation_data set to an empty string "".

CONNECTION_PARAMS = {
    "sample_rate": 16000,
    "format_turns": True,  # Request formatted final transcripts
}
API_ENDPOINT_BASE_URL = "wss://streaming.assemblyai.com/v3/ws"
API_ENDPOINT = f"{API_ENDPOINT_BASE_URL}?{urlencode(CONNECTION_PARAMS)}"

# Audio Configuration
FRAMES_PER_BUFFER = 800  # 50 ms of audio (0.05 s * 16000 Hz)
SAMPLE_RATE = CONNECTION_PARAMS["sample_rate"]
CHANNELS = 1
FORMAT = pyaudio.paInt16

# Global state shared between the main thread, the audio thread,
# and the WebSocket callbacks.
audio = None
stream = None
ws_app = None
audio_thread = None
stop_event = threading.Event()  # Signals the audio thread to stop
conversation_data = ""  # Accumulates final transcripts for LLM analysis

# WAV recording variables
recorded_frames = []  # Audio frames captured for the WAV file
recording_lock = threading.Lock()  # Thread-safe access to recorded_frames

Define Analyze With LLM Gateway Function

Define a function called analyze_with_llm_gateway, which uses LLM Gateway to analyze the complete final transcript text. The prompt can be modified to suit your individual requirements.

def analyze_with_llm_gateway(text):
    """Send the accumulated transcript to LLM Gateway and return the analysis.

    Args:
        text: The full transcript accumulated during the streaming session.

    Returns:
        The assistant's analysis text from the first completion choice.

    Raises:
        requests.HTTPError: If LLM Gateway responds with a non-2xx status.
        requests.RequestException: On network failure or timeout.
    """
    headers = {
        "authorization": YOUR_API_KEY,
        "content-type": "application/json",
    }

    prompt = "You are a helpful coach. Provide an analysis of the transcript and offer areas to improve with exact quotes. Include no preamble. Start with an overall summary then get into the examples with feedback."

    llm_gateway_data = {
        "model": "claude-sonnet-4-20250514",
        "messages": [
            {"role": "user", "content": f"{prompt}\n\nTranscript: {text}"}
        ],
        "max_tokens": 4000,
    }

    result = requests.post(
        "https://llm-gateway.assemblyai.com/v1/chat/completions",
        headers=headers,
        json=llm_gateway_data,
        timeout=60,  # requests has no default timeout; avoid hanging forever
    )
    # Surface HTTP errors directly instead of raising an opaque KeyError
    # when indexing into an error-response body below.
    result.raise_for_status()
    return result.json()["choices"][0]["message"]["content"]

Websocket Event Handlers

Open Websocket

def on_open(ws):
    """Begin streaming microphone audio once the WebSocket is connected."""
    print("WebSocket connection opened.")
    print(f"Connected to: {API_ENDPOINT}")

    def stream_audio():
        # Background worker: read microphone chunks and forward them to the
        # server until stop_event is set or the stream becomes unreadable.
        global stream
        print("Starting audio streaming...")
        while not stop_event.is_set():
            try:
                audio_data = stream.read(FRAMES_PER_BUFFER, exception_on_overflow=False)

                # Keep a copy of the chunk for the WAV recording
                with recording_lock:
                    recorded_frames.append(audio_data)

                # Forward the raw PCM chunk as a binary WebSocket frame
                ws.send(audio_data, websocket.ABNF.OPCODE_BINARY)
            except Exception as e:
                print(f"Error streaming audio: {e}")
                # A failed read usually means the stream was closed; bail out
                break
        print("Audio streaming stopped.")

    global audio_thread
    # Daemon thread so the main thread can exit even if streaming is active.
    audio_thread = threading.Thread(target=stream_audio, daemon=True)
    audio_thread.start()

Handle Websocket Messages

In this function, use the previously defined conversation_data to store all final transcripts together for later analysis.

def on_message(ws, message):
    """Handle incoming transcription messages from the streaming API.

    Final (end-of-turn) transcripts are printed and appended to the
    module-level conversation_data string for post-session LLM analysis.
    """
    global conversation_data

    try:
        data = json.loads(message)
        msg_type = data.get('type')

        if msg_type == "Begin":
            session_id = data.get('id')
            expires_at = data.get('expires_at')
            # Guard: expires_at may be absent; fromtimestamp(None) raises
            # TypeError, which the broad except below would silently swallow.
            expires_str = (
                datetime.fromtimestamp(expires_at)
                if expires_at is not None
                else "unknown"
            )
            print(f"\nSession began: ID={session_id}, ExpiresAt={expires_str}")
        elif msg_type == "Turn":
            transcript = data.get('transcript', '')
            if data.get('end_of_turn'):
                # Clear the current console line, then print the final turn.
                print('\r' + ' ' * 80 + '\r', end='')
                print(transcript)
                conversation_data += f"{transcript}\n"

        elif msg_type == "Termination":
            audio_duration = data.get('audio_duration_seconds', 0)
            session_duration = data.get('session_duration_seconds', 0)
            print(f"\nSession Terminated: Audio Duration={audio_duration}s, Session Duration={session_duration}s")

    except json.JSONDecodeError as e:
        print(f"Error decoding message: {e}")
    except Exception as e:
        print(f"Error handling message: {e}")

Close Websocket

def on_close(ws, close_status_code, close_msg):
    """Release microphone and PyAudio resources after the socket closes."""
    print(f"\nWebSocket Disconnected: Status={close_status_code}, Msg={close_msg}")

    global stream, audio
    # Signal the audio thread in case it is still running.
    stop_event.set()

    if stream:
        if stream.is_active():
            stream.stop_stream()
        stream.close()
        stream = None
    if audio:
        audio.terminate()
        audio = None
    # Give the audio thread a moment to exit cleanly.
    if audio_thread and audio_thread.is_alive():
        audio_thread.join(timeout=1.0)

Websocket Error Handling

def on_error(ws, error):
    """Log a WebSocket error and request shutdown of the audio thread."""
    print(f"\nWebSocket Error: {error}")
    # The connection is unusable now; tell the streaming thread to stop.
    stop_event.set()

Begin Streaming STT Transcription

After the socket is closed, conversation_data is sent to the analyze_with_llm_gateway function and the LLM Gateway results are printed out.

def run():
    """Capture microphone audio, stream it for transcription, then analyze."""
    global audio, stream, ws_app

    audio = pyaudio.PyAudio()

    # Open the microphone; bail out early if no input device is available.
    try:
        stream = audio.open(
            input=True,
            frames_per_buffer=FRAMES_PER_BUFFER,
            channels=CHANNELS,
            format=FORMAT,
            rate=SAMPLE_RATE,
        )
        print("Microphone stream opened successfully.")
        print("Speak into your microphone. Press Ctrl+C to stop.")
        print("Audio will be saved to a WAV file when the session ends.")
    except Exception as e:
        print(f"Error opening microphone stream: {e}")
        if audio:
            audio.terminate()
        return  # Nothing to stream without a microphone

    ws_app = websocket.WebSocketApp(
        API_ENDPOINT,
        header={"Authorization": YOUR_API_KEY},
        on_open=on_open,
        on_message=on_message,
        on_error=on_error,
        on_close=on_close,
    )

    # Run the socket on a daemon thread so the main thread stays free
    # to catch KeyboardInterrupt.
    socket_thread = threading.Thread(target=ws_app.run_forever, daemon=True)
    socket_thread.start()

    try:
        # Idle until the socket thread ends or the user interrupts.
        while socket_thread.is_alive():
            time.sleep(0.1)
    except KeyboardInterrupt:
        print("\nCtrl+C received. Stopping...")
        stop_event.set()  # Stop the audio thread first

        # Ask the server to end the session gracefully before closing.
        if ws_app and ws_app.sock and ws_app.sock.connected:
            try:
                terminate_message = {"type": "Terminate"}
                print(f"Sending termination message: {json.dumps(terminate_message)}")
                ws_app.send(json.dumps(terminate_message))
                # Allow in-flight messages to arrive before a forceful close.
                time.sleep(5)
            except Exception as e:
                print(f"Error sending termination message: {e}")

        # Closing triggers on_close, which releases the audio resources.
        if ws_app:
            ws_app.close()
        socket_thread.join(timeout=2.0)

        # With the session over, hand the transcript to LLM Gateway.
        if conversation_data.strip():
            print("Analyzing conversation with LLM Gateway...")
            print(analyze_with_llm_gateway(conversation_data))
        else:
            print("No conversation data to analyze.")

    except Exception as e:
        print(f"\nAn unexpected error occurred: {e}")
        stop_event.set()
        if ws_app:
            ws_app.close()
        socket_thread.join(timeout=2.0)

    finally:
        # Fallback cleanup; on_close normally releases these already.
        if stream and stream.is_active():
            stream.stop_stream()
        if stream:
            stream.close()
        if audio:
            audio.terminate()
        print("Cleanup complete. Exiting.")


if __name__ == "__main__":
    run()