Use LLM Gateway with Streaming Speech-to-Text (STT)
This script uses a global variable, conversation_data, which accumulates the transcribed text inside the on_message handler. Once the transcription session is closed, conversation_data is sent to the LLM Gateway for analysis.
Quickstart
import pyaudio
import websocket
import json
import threading
import time
import wave
import requests
from urllib.parse import urlencode
from datetime import datetime

# --- Configuration ---
YOUR_API_KEY = "YOUR_API_KEY"  # Replace with your actual API key

CONNECTION_PARAMS = {
    "sample_rate": 16000,
    "format_turns": True,  # Request formatted final transcripts
}
API_ENDPOINT_BASE_URL = "wss://streaming.assemblyai.com/v3/ws"
API_ENDPOINT = f"{API_ENDPOINT_BASE_URL}?{urlencode(CONNECTION_PARAMS)}"

# Audio Configuration
FRAMES_PER_BUFFER = 800  # 50ms of audio (0.05s * 16000Hz)
SAMPLE_RATE = CONNECTION_PARAMS["sample_rate"]
CHANNELS = 1
FORMAT = pyaudio.paInt16

# Global variables for audio stream and websocket
audio = None
stream = None
ws_app = None
audio_thread = None
stop_event = threading.Event()  # To signal the audio thread to stop
conversation_data = ""  # Accumulates formatted final transcripts

# WAV recording variables
recorded_frames = []  # Store audio frames for WAV file
recording_lock = threading.Lock()  # Thread-safe access to recorded_frames

# --- Function to Analyze Text with LLM Gateway ---

def analyze_with_llm_gateway(text):
    """Send the accumulated transcript text to LLM Gateway for analysis.

    Called after the transcription session ends. Returns the model's
    analysis as a string. Raises requests.HTTPError on a non-2xx response
    instead of failing later with an opaque KeyError on the JSON body.
    """
    headers = {
        "authorization": YOUR_API_KEY,
        "content-type": "application/json"
    }

    prompt = "You are a helpful coach. Provide an analysis of the transcript and offer areas to improve with exact quotes. Include no preamble. Start with an overall summary then get into the examples with feedback."

    llm_gateway_data = {
        "model": "claude-sonnet-4-20250514",
        "messages": [
            {"role": "user", "content": f"{prompt}\n\nTranscript: {text}"}
        ],
        "max_tokens": 4000
    }

    result = requests.post(
        "https://llm-gateway.assemblyai.com/v1/chat/completions",
        headers=headers,
        json=llm_gateway_data,
        timeout=60,  # Fail instead of hanging forever on a stalled connection
    )
    result.raise_for_status()
    return result.json()["choices"][0]["message"]["content"]

# --- WAV file output ---

def save_wav_file():
    """Write the audio captured during the session to a timestamped WAV file."""
    with recording_lock:
        frames = list(recorded_frames)
    if not frames:
        print("No audio was recorded; skipping WAV file.")
        return
    filename = f"recording_{datetime.now().strftime('%Y%m%d_%H%M%S')}.wav"
    with wave.open(filename, "wb") as wf:
        wf.setnchannels(CHANNELS)
        wf.setsampwidth(pyaudio.get_sample_size(FORMAT))
        wf.setframerate(SAMPLE_RATE)
        wf.writeframes(b"".join(frames))
    print(f"Audio saved to {filename}")

# --- WebSocket Event Handlers ---

def on_open(ws):
    """Called when the WebSocket connection is established."""
    print("WebSocket connection opened.")
    print(f"Connected to: {API_ENDPOINT}")

    # Start sending audio data in a separate thread
    def stream_audio():
        global stream
        print("Starting audio streaming...")
        while not stop_event.is_set():
            try:
                audio_data = stream.read(FRAMES_PER_BUFFER, exception_on_overflow=False)

                # Store audio data for WAV recording
                with recording_lock:
                    recorded_frames.append(audio_data)

                # Send audio data as binary message
                ws.send(audio_data, websocket.ABNF.OPCODE_BINARY)
            except Exception as e:
                print(f"Error streaming audio: {e}")
                # If stream read fails, likely means it's closed, stop the loop
                break
        print("Audio streaming stopped.")

    global audio_thread
    audio_thread = threading.Thread(target=stream_audio)
    audio_thread.daemon = (
        True  # Allow main thread to exit even if this thread is running
    )
    audio_thread.start()

def on_message(ws, message):
    """Dispatch incoming server messages by their 'type' field.

    "Begin" announces the session, "Turn" carries transcripts (formatted
    final ones are appended to the global conversation_data), and
    "Termination" reports the final session statistics.
    """
    try:
        data = json.loads(message)
        msg_type = data.get('type')

        if msg_type == "Begin":
            session_id = data.get('id')
            expires_at = data.get('expires_at')
            print(f"\nSession began: ID={session_id}, ExpiresAt={datetime.fromtimestamp(expires_at)}")
        elif msg_type == "Turn":
            transcript = data.get('transcript', '')
            formatted = data.get('turn_is_formatted', False)

            if formatted:
                global conversation_data

                # Erase the in-progress console line before printing the final text.
                print('\r' + ' ' * 80 + '\r', end='')
                print(transcript)
                conversation_data += f"{transcript}\n"

        elif msg_type == "Termination":
            audio_duration = data.get('audio_duration_seconds', 0)
            session_duration = data.get('session_duration_seconds', 0)
            print(f"\nSession Terminated: Audio Duration={audio_duration}s, Session Duration={session_duration}s")
    except json.JSONDecodeError as e:
        print(f"Error decoding message: {e}")
    except Exception as e:
        print(f"Error handling message: {e}")

def on_error(ws, error):
    """Called when a WebSocket error occurs."""
    print(f"\nWebSocket Error: {error}")
    # Attempt to signal stop on error
    stop_event.set()

def on_close(ws, close_status_code, close_msg):
    """Called when the WebSocket connection is closed."""
    print(f"\nWebSocket Disconnected: Status={close_status_code}, Msg={close_msg}")

    # Ensure audio resources are released
    global stream, audio
    stop_event.set()  # Signal audio thread just in case it's still running

    if stream:
        if stream.is_active():
            stream.stop_stream()
        stream.close()
        stream = None
    if audio:
        audio.terminate()
        audio = None
    # Try to join the audio thread to ensure clean exit
    if audio_thread and audio_thread.is_alive():
        audio_thread.join(timeout=1.0)

# --- Main Execution ---

def run():
    """Open the microphone, stream audio to the STT WebSocket, and on exit
    save the recording to a WAV file and send the transcript to LLM Gateway.
    """
    global audio, stream, ws_app

    # Initialize PyAudio
    audio = pyaudio.PyAudio()

    # Open microphone stream
    try:
        stream = audio.open(
            input=True,
            frames_per_buffer=FRAMES_PER_BUFFER,
            channels=CHANNELS,
            format=FORMAT,
            rate=SAMPLE_RATE,
        )
        print("Microphone stream opened successfully.")
        print("Speak into your microphone. Press Ctrl+C to stop.")
        print("Audio will be saved to a WAV file when the session ends.")
    except Exception as e:
        print(f"Error opening microphone stream: {e}")
        if audio:
            audio.terminate()
        return  # Exit if microphone cannot be opened

    # Create WebSocketApp
    ws_app = websocket.WebSocketApp(
        API_ENDPOINT,
        header={"Authorization": YOUR_API_KEY},
        on_open=on_open,
        on_message=on_message,
        on_error=on_error,
        on_close=on_close,
    )

    # Run WebSocketApp in a separate thread to allow main thread to catch KeyboardInterrupt
    ws_thread = threading.Thread(target=ws_app.run_forever)
    ws_thread.daemon = True
    ws_thread.start()

    try:
        # Keep main thread alive until interrupted
        while ws_thread.is_alive():
            time.sleep(0.1)
    except KeyboardInterrupt:
        print("\nCtrl+C received. Stopping...")
        stop_event.set()  # Signal audio thread to stop

        # Send termination message to the server
        if ws_app and ws_app.sock and ws_app.sock.connected:
            try:
                terminate_message = {"type": "Terminate"}
                print(f"Sending termination message: {json.dumps(terminate_message)}")
                ws_app.send(json.dumps(terminate_message))
                # Give the server time to flush any final messages before closing
                time.sleep(5)
            except Exception as e:
                print(f"Error sending termination message: {e}")

        # Close the WebSocket connection (will trigger on_close)
        if ws_app:
            ws_app.close()

        # Wait for WebSocket thread to finish
        ws_thread.join(timeout=2.0)

        # Analyze transcript with LLM Gateway
        if conversation_data.strip():
            print("Analyzing conversation with LLM Gateway...")
            print(analyze_with_llm_gateway(conversation_data))
        else:
            print("No conversation data to analyze.")

    except Exception as e:
        print(f"\nAn unexpected error occurred: {e}")
        stop_event.set()
        if ws_app:
            ws_app.close()
        ws_thread.join(timeout=2.0)

    finally:
        # Persist the captured audio, fulfilling the startup message's promise.
        save_wav_file()
        # Final cleanup (already handled in on_close, but good as a fallback)
        if stream and stream.is_active():
            stream.stop_stream()
        if stream:
            stream.close()
        if audio:
            audio.terminate()
        print("Cleanup complete. Exiting.")


if __name__ == "__main__":
    run()
Step-by-Step Instructions
Before we begin, make sure you have an AssemblyAI account and an API key. You can sign up and get your API key from your dashboard.
Import Packages & Set API Key
import pyaudio      # Microphone capture
import websocket    # websocket-client package: raw WebSocket transport
import json
import threading
import time
import requests
from urllib.parse import urlencode
from datetime import datetime

YOUR_API_KEY = "YOUR_API_KEY"  # Replace with your actual API key
Audio Configuration & Global Variables
Set all of your audio configurations and global variables. Make sure that you have the parameter format_turns set to True and a global variable conversation_data set to an empty string "".
# Connection parameters, sent to the server as the WebSocket query string.
CONNECTION_PARAMS = {
    "sample_rate": 16000,
    "format_turns": True,  # Request formatted final transcripts
}
API_ENDPOINT_BASE_URL = "wss://streaming.assemblyai.com/v3/ws"
API_ENDPOINT = f"{API_ENDPOINT_BASE_URL}?{urlencode(CONNECTION_PARAMS)}"

# Audio Configuration
FRAMES_PER_BUFFER = 800  # 50ms of audio (0.05s * 16000Hz)
SAMPLE_RATE = CONNECTION_PARAMS["sample_rate"]
CHANNELS = 1
FORMAT = pyaudio.paInt16

# Global variables for audio stream and websocket
audio = None
stream = None
ws_app = None
audio_thread = None
stop_event = threading.Event()  # To signal the audio thread to stop
conversation_data = ""  # Accumulates formatted final transcripts for LLM analysis

# WAV recording variables
recorded_frames = []  # Store audio frames for WAV file
recording_lock = threading.Lock()  # Thread-safe access to recorded_frames
Define Analyze With LLM Gateway Function
Define a function called analyze_with_llm_gateway, which uses LLM Gateway to analyze the complete final transcript text. The prompt can be modified to suit your individual requirements.
def analyze_with_llm_gateway(text):
    """Send the accumulated transcript text to LLM Gateway for analysis.

    Called when the WebSocket connection is closing. Returns the model's
    analysis as a string. A request timeout prevents the call from hanging
    forever, and raise_for_status() surfaces HTTP errors clearly instead of
    failing later with an opaque KeyError on the response body.
    """
    headers = {
        "authorization": YOUR_API_KEY,
        "content-type": "application/json"
    }

    prompt = "You are a helpful coach. Provide an analysis of the transcript and offer areas to improve with exact quotes. Include no preamble. Start with an overall summary then get into the examples with feedback."

    llm_gateway_data = {
        "model": "claude-sonnet-4-20250514",
        "messages": [
            {"role": "user", "content": f"{prompt}\n\nTranscript: {text}"}
        ],
        "max_tokens": 4000
    }

    result = requests.post(
        "https://llm-gateway.assemblyai.com/v1/chat/completions",
        headers=headers,
        json=llm_gateway_data,
        timeout=60,  # Fail instead of blocking indefinitely on a stalled connection
    )
    result.raise_for_status()
    return result.json()["choices"][0]["message"]["content"]
Websocket Event Handlers
Open Websocket
def on_open(ws):
    """Called once the WebSocket handshake completes; starts audio capture."""
    print("WebSocket connection opened.")
    print(f"Connected to: {API_ENDPOINT}")

    def pump_microphone():
        # Runs on its own thread: read microphone chunks until told to stop.
        global stream
        print("Starting audio streaming...")
        while not stop_event.is_set():
            try:
                chunk = stream.read(FRAMES_PER_BUFFER, exception_on_overflow=False)

                # Keep a copy of every chunk for the WAV recording.
                with recording_lock:
                    recorded_frames.append(chunk)

                # Relay the raw PCM to the server as a binary frame.
                ws.send(chunk, websocket.ABNF.OPCODE_BINARY)
            except Exception as e:
                print(f"Error streaming audio: {e}")
                # A failed read usually means the stream was closed; bail out.
                break
        print("Audio streaming stopped.")

    global audio_thread
    # Daemon thread so the main thread can exit even if capture is still running.
    audio_thread = threading.Thread(target=pump_microphone, daemon=True)
    audio_thread.start()
Handle Websocket Messages
In this function, use the previously defined conversation_data to store all final transcripts together for later analysis.
def on_message(ws, message):
    """Handle a message from the streaming API, dispatching on its 'type'.

    "Begin" announces the session, "Turn" carries transcripts (formatted
    final ones are appended to the global conversation_data for later
    analysis), and "Termination" reports the final session statistics.
    """
    global conversation_data
    try:
        payload = json.loads(message)
        kind = payload.get('type')

        if kind == "Begin":
            sid = payload.get('id')
            expiry = payload.get('expires_at')
            print(f"\nSession began: ID={sid}, ExpiresAt={datetime.fromtimestamp(expiry)}")
        elif kind == "Turn":
            text = payload.get('transcript', '')
            if payload.get('turn_is_formatted', False):
                # Erase the in-progress console line, then print the final text.
                print('\r' + ' ' * 80 + '\r', end='')
                print(text)
                conversation_data += f"{text}\n"
        elif kind == "Termination":
            audio_secs = payload.get('audio_duration_seconds', 0)
            session_secs = payload.get('session_duration_seconds', 0)
            print(f"\nSession Terminated: Audio Duration={audio_secs}s, Session Duration={session_secs}s")
    except json.JSONDecodeError as e:
        print(f"Error decoding message: {e}")
    except Exception as e:
        print(f"Error handling message: {e}")
Close Websocket
def on_close(ws, close_status_code, close_msg):
    """Called when the WebSocket connection closes; releases audio resources."""
    print(f"\nWebSocket Disconnected: Status={close_status_code}, Msg={close_msg}")

    global stream, audio
    # Signal the audio thread in case it is still running.
    stop_event.set()

    if stream is not None:
        if stream.is_active():
            stream.stop_stream()
        stream.close()
        stream = None
    if audio is not None:
        audio.terminate()
        audio = None

    # Give the streaming thread a moment to finish for a clean exit.
    if audio_thread and audio_thread.is_alive():
        audio_thread.join(timeout=1.0)
Websocket Error Handling
def on_error(ws, error):
    """Called when a WebSocket error occurs; logs it and halts audio capture."""
    print(f"\nWebSocket Error: {error}")
    stop_event.set()  # Best-effort: stop the streaming thread on error
Begin Streaming STT Transcription
After the socket is closed, conversation_data is sent to the analyze_with_llm_gateway function and the LLM Gateway results are printed out.
def _save_wav_file():
    """Write the audio captured during the session to a timestamped WAV file.

    The script collects chunks into recorded_frames and promises at startup
    that the audio will be saved; this helper fulfils that promise.
    """
    import wave  # Stdlib; imported locally so the tutorial's import list is unchanged

    with recording_lock:
        frames = list(recorded_frames)
    if not frames:
        print("No audio was recorded; skipping WAV file.")
        return
    filename = f"recording_{datetime.now().strftime('%Y%m%d_%H%M%S')}.wav"
    with wave.open(filename, "wb") as wf:
        wf.setnchannels(CHANNELS)
        wf.setsampwidth(pyaudio.get_sample_size(FORMAT))
        wf.setframerate(SAMPLE_RATE)
        wf.writeframes(b"".join(frames))
    print(f"Audio saved to {filename}")


def run():
    """Open the microphone, stream audio to the STT WebSocket, and on exit
    save the recording to a WAV file and send the transcript to LLM Gateway.
    """
    global audio, stream, ws_app

    # Initialize PyAudio
    audio = pyaudio.PyAudio()

    # Open microphone stream
    try:
        stream = audio.open(
            input=True,
            frames_per_buffer=FRAMES_PER_BUFFER,
            channels=CHANNELS,
            format=FORMAT,
            rate=SAMPLE_RATE,
        )
        print("Microphone stream opened successfully.")
        print("Speak into your microphone. Press Ctrl+C to stop.")
        print("Audio will be saved to a WAV file when the session ends.")
    except Exception as e:
        print(f"Error opening microphone stream: {e}")
        if audio:
            audio.terminate()
        return  # Exit if microphone cannot be opened

    # Create WebSocketApp
    ws_app = websocket.WebSocketApp(
        API_ENDPOINT,
        header={"Authorization": YOUR_API_KEY},
        on_open=on_open,
        on_message=on_message,
        on_error=on_error,
        on_close=on_close,
    )

    # Run WebSocketApp in a separate thread to allow main thread to catch KeyboardInterrupt
    ws_thread = threading.Thread(target=ws_app.run_forever)
    ws_thread.daemon = True
    ws_thread.start()

    try:
        # Keep main thread alive until interrupted
        while ws_thread.is_alive():
            time.sleep(0.1)
    except KeyboardInterrupt:
        print("\nCtrl+C received. Stopping...")
        stop_event.set()  # Signal audio thread to stop

        # Send termination message to the server
        if ws_app and ws_app.sock and ws_app.sock.connected:
            try:
                terminate_message = {"type": "Terminate"}
                print(f"Sending termination message: {json.dumps(terminate_message)}")
                ws_app.send(json.dumps(terminate_message))
                # Give the server time to flush any final messages before closing
                time.sleep(5)
            except Exception as e:
                print(f"Error sending termination message: {e}")

        # Close the WebSocket connection (will trigger on_close)
        if ws_app:
            ws_app.close()

        # Wait for WebSocket thread to finish
        ws_thread.join(timeout=2.0)

        # Analyze transcript with LLM Gateway
        if conversation_data.strip():
            print("Analyzing conversation with LLM Gateway...")
            print(analyze_with_llm_gateway(conversation_data))
        else:
            print("No conversation data to analyze.")

    except Exception as e:
        print(f"\nAn unexpected error occurred: {e}")
        stop_event.set()
        if ws_app:
            ws_app.close()
        ws_thread.join(timeout=2.0)

    finally:
        # Persist the captured audio before tearing the stream down.
        _save_wav_file()
        # Final cleanup (already handled in on_close, but good as a fallback)
        if stream and stream.is_active():
            stream.stop_stream()
        if stream:
            stream.close()
        if audio:
            audio.terminate()
        print("Cleanup complete. Exiting.")


if __name__ == "__main__":
    run()