Translate Streaming STT Transcripts with LLM Gateway
In this guide, you’ll learn how to implement real-time translation of final transcripts using AssemblyAI’s Streaming model and LLM Gateway.
Quickstart
1 import pyaudio 2 import websocket 3 import json 4 import threading 5 import time 6 import requests 7 from urllib.parse import urlencode 8 from datetime import datetime 9 10 # --- Configuration --- 11 YOUR_API_KEY = "YOUR_API_KEY" # Replace with your actual API key 12 13 CONNECTION_PARAMS = { 14 "sample_rate": 16000, 15 "format_turns": True, # Request formatted final transcripts 16 } 17 API_ENDPOINT_BASE_URL = "wss://streaming.assemblyai.com/v3/ws" 18 API_ENDPOINT = f"{API_ENDPOINT_BASE_URL}?{urlencode(CONNECTION_PARAMS)}" 19 20 # Audio Configuration 21 FRAMES_PER_BUFFER = 800 # 50ms of audio (0.05s * 16000Hz) 22 SAMPLE_RATE = CONNECTION_PARAMS["sample_rate"] 23 CHANNELS = 1 24 FORMAT = pyaudio.paInt16 25 26 # Global variables for audio stream and websocket 27 audio = None 28 stream = None 29 ws_app = None 30 audio_thread = None 31 stop_event = threading.Event() # To signal the audio thread to stop 32 33 # WAV recording variables 34 recorded_frames = [] # Store audio frames for WAV file 35 recording_lock = threading.Lock() # Thread-safe access to recorded_frames 36 37 # --- Function to Translate Text with LLM Gateway --- 38 39 def translate_text(text): 40 """Called when translating final transcripts.""" 41 headers = { 42 "authorization": YOUR_API_KEY 43 } 44 45 llm_gateway_data = { 46 "model": "gemini-2.5-flash-lite", 47 "prompt": f"Translate the following text into Spanish. Do not write a preamble. Just return the translated text.\n\nText: {text}", 48 "max_tokens": 1000 49 } 50 51 result = requests.post( 52 "https://llm-gateway.assemblyai.com/v1/chat/completions", 53 headers=headers, 54 json=llm_gateway_data 55 ) 56 return result.json()["choices"][0]["message"]["content"] 57 58 # --- WebSocket Event Handlers --- 59 60 def on_open(ws): 61 """Called when the WebSocket connection is established.""" 62 print("WebSocket connection opened.") 63 print(f"Connected to: {API_ENDPOINT}") 64 65 # Start sending audio data in a separate thread 66 def stream_audio(): 67 global stream 68 print("Starting audio streaming...") 69 while not stop_event.is_set(): 70 try: 71 audio_data = stream.read(FRAMES_PER_BUFFER, exception_on_overflow=False) 72 73 # Store audio data for WAV recording 74 with recording_lock: 75 recorded_frames.append(audio_data) 76 77 # Send audio data as binary message 78 ws.send(audio_data, websocket.ABNF.OPCODE_BINARY) 79 except Exception as e: 80 print(f"Error streaming audio: {e}") 81 # If stream read fails, likely means it's closed, stop the loop 82 break 83 print("Audio streaming stopped.") 84 85 global audio_thread 86 audio_thread = threading.Thread(target=stream_audio) 87 audio_thread.daemon = ( 88 True # Allow main thread to exit even if this thread is running 89 ) 90 audio_thread.start() 91 92 def on_message(ws, message): 93 try: 94 data = json.loads(message) 95 msg_type = data.get('type') 96 97 if msg_type == "Begin": 98 session_id = data.get('id') 99 expires_at = data.get('expires_at') 100 print(f"\nSession began: ID={session_id}, ExpiresAt={datetime.fromtimestamp(expires_at)}") 101 elif msg_type == "Turn": 102 transcript = data.get('transcript', '') 103 if data.get('end_of_turn'): 104 print('\r' + ' ' * 80 + '\r', end='') 105 print(translate_text(transcript)) 106 107 elif msg_type == "Termination": 108 audio_duration = data.get('audio_duration_seconds', 0) 109 session_duration = data.get('session_duration_seconds', 0) 110 print(f"\nSession Terminated: Audio Duration={audio_duration}s, Session Duration={session_duration}s") 111 112 except json.JSONDecodeError as e: 113 print(f"Error decoding message: {e}") 114 except Exception as e: 115 print(f"Error handling message: {e}") 116 117 def on_error(ws, error): 118 """Called when a WebSocket error occurs.""" 119 print(f"\nWebSocket Error: {error}") 120 # Attempt to signal stop on error 121 stop_event.set() 122 123 def on_close(ws, close_status_code, close_msg): 124 """Called when the WebSocket connection is closed.""" 125 print(f"\nWebSocket Disconnected: Status={close_status_code}, Msg={close_msg}") 126 127 # Ensure audio resources are released 128 global stream, audio 129 stop_event.set() # Signal audio thread just in case it's still running 130 131 if stream: 132 if stream.is_active(): 133 stream.stop_stream() 134 stream.close() 135 stream = None 136 if audio: 137 audio.terminate() 138 audio = None 139 # Try to join the audio thread to ensure clean exit 140 if audio_thread and audio_thread.is_alive(): 141 audio_thread.join(timeout=1.0) 142 143 # --- Main Execution --- 144 145 def run(): 146 global audio, stream, ws_app 147 148 # Initialize PyAudio 149 audio = pyaudio.PyAudio() 150 151 # Open microphone stream 152 try: 153 stream = audio.open( 154 input=True, 155 frames_per_buffer=FRAMES_PER_BUFFER, 156 channels=CHANNELS, 157 format=FORMAT, 158 rate=SAMPLE_RATE, 159 ) 160 print("Microphone stream opened successfully.") 161 print("Speak into your microphone. Press Ctrl+C to stop.") 162 print("Audio will be saved to a WAV file when the session ends.") 163 except Exception as e: 164 print(f"Error opening microphone stream: {e}") 165 if audio: 166 audio.terminate() 167 return # Exit if microphone cannot be opened 168 169 # Create WebSocketApp 170 ws_app = websocket.WebSocketApp( 171 API_ENDPOINT, 172 header={"Authorization": YOUR_API_KEY}, 173 on_open=on_open, 174 on_message=on_message, 175 on_error=on_error, 176 on_close=on_close, 177 ) 178 179 # Run WebSocketApp in a separate thread to allow main thread to catch KeyboardInterrupt 180 ws_thread = threading.Thread(target=ws_app.run_forever) 181 ws_thread.daemon = True 182 ws_thread.start() 183 184 try: 185 # Keep main thread alive until interrupted 186 while ws_thread.is_alive(): 187 time.sleep(0.1) 188 except KeyboardInterrupt: 189 print("\nCtrl+C received. Stopping...") 190 stop_event.set() # Signal audio thread to stop 191 192 # Send termination message to the server 193 if ws_app and ws_app.sock and ws_app.sock.connected: 194 try: 195 terminate_message = {"type": "Terminate"} 196 print(f"Sending termination message: {json.dumps(terminate_message)}") 197 ws_app.send(json.dumps(terminate_message)) 198 # Give a moment for messages to process before forceful close 199 time.sleep(5) 200 except Exception as e: 201 print(f"Error sending termination message: {e}") 202 203 # Close the WebSocket connection (will trigger on_close) 204 if ws_app: 205 ws_app.close() 206 207 # Wait for WebSocket thread to finish 208 ws_thread.join(timeout=2.0) 209 210 except Exception as e: 211 print(f"\nAn unexpected error occurred: {e}") 212 stop_event.set() 213 if ws_app: 214 ws_app.close() 215 ws_thread.join(timeout=2.0) 216 217 finally: 218 # Final cleanup (already handled in on_close, but good as a fallback) 219 if stream and stream.is_active(): 220 stream.stop_stream() 221 if stream: 222 stream.close() 223 if audio: 224 audio.terminate() 225 print("Cleanup complete. Exiting.") 226 227 if __name__ == "__main__": 228 run()
Step-by-Step Instructions
Before we begin, make sure you have an AssemblyAI account and an API key. You can sign up and get your API key from your dashboard.
Import Packages & Set API Key
1 import pyaudio 2 import websocket 3 import json 4 import threading 5 import time 6 import requests 7 from urllib.parse import urlencode 8 from datetime import datetime 9 10 YOUR_API_KEY = "YOUR_API_KEY" # Replace with your actual API key
Audio Configuration & Global Variables
Set all of your audio configurations and global variables. Make sure that you have the parameter format_turns set to True.
1 CONNECTION_PARAMS = { 2 "sample_rate": 16000, 3 "format_turns": True, # Request formatted final transcripts 4 } 5 API_ENDPOINT_BASE_URL = "wss://streaming.assemblyai.com/v3/ws" 6 API_ENDPOINT = f"{API_ENDPOINT_BASE_URL}?{urlencode(CONNECTION_PARAMS)}" 7 8 # Audio Configuration 9 FRAMES_PER_BUFFER = 800 # 50ms of audio (0.05s * 16000Hz) 10 SAMPLE_RATE = CONNECTION_PARAMS["sample_rate"] 11 CHANNELS = 1 12 FORMAT = pyaudio.paInt16 13 14 # Global variables for audio stream and websocket 15 audio = None 16 stream = None 17 ws_app = None 18 audio_thread = None 19 stop_event = threading.Event() # To signal the audio thread to stop 20 21 # WAV recording variables 22 recorded_frames = [] # Store audio frames for WAV file 23 recording_lock = threading.Lock() # Thread-safe access to recorded_frames
Define Translate Text Function
Define a function called translate_text, which uses LLM Gateway to translate the English final transcripts into another language. This example is translating the text into Spanish. To set this to a different language, just replace “Spanish” in the prompt with your language of choice.
1 def translate_text(text): 2 """Called when translating final transcripts.""" 3 headers = { 4 "authorization": YOUR_API_KEY 5 } 6 7 llm_gateway_data = { 8 "model": "claude-sonnet-4-20250514", 9 "prompt": f"Translate the following text into Spanish. Do not write a preamble. Just return the translated text.\n\nText: {text}", 10 "max_tokens": 1000 11 } 12 13 result = requests.post( 14 "https://llm-gateway.assemblyai.com/v1/chat/completions", 15 headers=headers, 16 json=llm_gateway_data 17 ) 18 return result.json()["choices"][0]["message"]["content"]
Websocket Event Handlers
Open Websocket
1 def on_open(ws): 2 """Called when the WebSocket connection is established.""" 3 print("WebSocket connection opened.") 4 print(f"Connected to: {API_ENDPOINT}") 5 6 # Start sending audio data in a separate thread 7 def stream_audio(): 8 global stream 9 print("Starting audio streaming...") 10 while not stop_event.is_set(): 11 try: 12 audio_data = stream.read(FRAMES_PER_BUFFER, exception_on_overflow=False) 13 14 # Store audio data for WAV recording 15 with recording_lock: 16 recorded_frames.append(audio_data) 17 18 # Send audio data as binary message 19 ws.send(audio_data, websocket.ABNF.OPCODE_BINARY) 20 except Exception as e: 21 print(f"Error streaming audio: {e}") 22 # If stream read fails, likely means it's closed, stop the loop 23 break 24 print("Audio streaming stopped.") 25 26 global audio_thread 27 audio_thread = threading.Thread(target=stream_audio) 28 audio_thread.daemon = ( 29 True # Allow main thread to exit even if this thread is running 30 ) 31 audio_thread.start()
Handle Websocket Messages
In this function, use the previously defined translate_text to translate all final transcripts.
1 def on_message(ws, message): 2 try: 3 data = json.loads(message) 4 msg_type = data.get('type') 5 6 if msg_type == "Begin": 7 session_id = data.get('id') 8 expires_at = data.get('expires_at') 9 print(f"\nSession began: ID={session_id}, ExpiresAt={datetime.fromtimestamp(expires_at)}") 10 elif msg_type == "Turn": 11 transcript = data.get('transcript', '') 12 if data.get('end_of_turn'): 13 print('\r' + ' ' * 80 + '\r', end='') 14 print(translate_text(transcript)) 15 16 elif msg_type == "Termination": 17 audio_duration = data.get('audio_duration_seconds', 0) 18 session_duration = data.get('session_duration_seconds', 0) 19 print(f"\nSession Terminated: Audio Duration={audio_duration}s, Session Duration={session_duration}s") 20 21 except json.JSONDecodeError as e: 22 print(f"Error decoding message: {e}") 23 except Exception as e: 24 print(f"Error handling message: {e}")
Close Websocket
1 def on_close(ws, close_status_code, close_msg): 2 """Called when the WebSocket connection is closed.""" 3 print(f"\nWebSocket Disconnected: Status={close_status_code}, Msg={close_msg}") 4 5 # Ensure audio resources are released 6 global stream, audio 7 stop_event.set() # Signal audio thread just in case it's still running 8 9 if stream: 10 if stream.is_active(): 11 stream.stop_stream() 12 stream.close() 13 stream = None 14 if audio: 15 audio.terminate() 16 audio = None 17 # Try to join the audio thread to ensure clean exit 18 if audio_thread and audio_thread.is_alive(): 19 audio_thread.join(timeout=1.0)
Websocket Error Handling
1 def on_error(ws, error): 2 """Called when a WebSocket error occurs.""" 3 print(f"\nWebSocket Error: {error}") 4 # Attempt to signal stop on error 5 stop_event.set()
Begin Streaming STT Transcription
1 def run(): 2 global audio, stream, ws_app 3 4 # Initialize PyAudio 5 audio = pyaudio.PyAudio() 6 7 # Open microphone stream 8 try: 9 stream = audio.open( 10 input=True, 11 frames_per_buffer=FRAMES_PER_BUFFER, 12 channels=CHANNELS, 13 format=FORMAT, 14 rate=SAMPLE_RATE, 15 ) 16 print("Microphone stream opened successfully.") 17 print("Speak into your microphone. Press Ctrl+C to stop.") 18 print("Audio will be saved to a WAV file when the session ends.") 19 except Exception as e: 20 print(f"Error opening microphone stream: {e}") 21 if audio: 22 audio.terminate() 23 return # Exit if microphone cannot be opened 24 25 # Create WebSocketApp 26 ws_app = websocket.WebSocketApp( 27 API_ENDPOINT, 28 header={"Authorization": YOUR_API_KEY}, 29 on_open=on_open, 30 on_message=on_message, 31 on_error=on_error, 32 on_close=on_close, 33 ) 34 35 # Run WebSocketApp in a separate thread to allow main thread to catch KeyboardInterrupt 36 ws_thread = threading.Thread(target=ws_app.run_forever) 37 ws_thread.daemon = True 38 ws_thread.start() 39 40 try: 41 # Keep main thread alive until interrupted 42 while ws_thread.is_alive(): 43 time.sleep(0.1) 44 except KeyboardInterrupt: 45 print("\nCtrl+C received. Stopping...") 46 stop_event.set() # Signal audio thread to stop 47 48 # Send termination message to the server 49 if ws_app and ws_app.sock and ws_app.sock.connected: 50 try: 51 terminate_message = {"type": "Terminate"} 52 print(f"Sending termination message: {json.dumps(terminate_message)}") 53 ws_app.send(json.dumps(terminate_message)) 54 # Give a moment for messages to process before forceful close 55 time.sleep(5) 56 except Exception as e: 57 print(f"Error sending termination message: {e}") 58 59 # Close the WebSocket connection (will trigger on_close) 60 if ws_app: 61 ws_app.close() 62 63 # Wait for WebSocket thread to finish 64 ws_thread.join(timeout=2.0) 65 66 except Exception as e: 67 print(f"\nAn unexpected error occurred: {e}") 68 stop_event.set() 69 if ws_app: 70 ws_app.close() 71 ws_thread.join(timeout=2.0) 72 73 finally: 74 # Final cleanup (already handled in on_close, but good as a fallback) 75 if stream and stream.is_active(): 76 stream.stop_stream() 77 if stream: 78 stream.close() 79 if audio: 80 audio.terminate() 81 print("Cleanup complete. Exiting.") 82 83 if __name__ == "__main__": 84 run()