Translate Streaming STT Transcripts with LeMUR
In this guide, you’ll learn how to implement real-time translation of final transcripts using AssemblyAI’s Streaming Speech-to-Text model and the LeMUR framework.
Quickstart
```python
import pyaudio
import websocket
import json
import threading
import time
import wave
import requests
from urllib.parse import urlencode
from datetime import datetime

# --- Configuration ---
YOUR_API_KEY = "YOUR_API_KEY"  # Replace with your actual API key

CONNECTION_PARAMS = {
    "sample_rate": 16000,
    "format_turns": True,  # Request formatted final transcripts
}
API_ENDPOINT_BASE_URL = "wss://streaming.assemblyai.com/v3/ws"
API_ENDPOINT = f"{API_ENDPOINT_BASE_URL}?{urlencode(CONNECTION_PARAMS)}"

# Audio Configuration
FRAMES_PER_BUFFER = 800  # 50ms of audio (0.05s * 16000Hz)
SAMPLE_RATE = CONNECTION_PARAMS["sample_rate"]
CHANNELS = 1
FORMAT = pyaudio.paInt16

# Global variables for audio stream and websocket
audio = None
stream = None
ws_app = None
audio_thread = None
stop_event = threading.Event()  # To signal the audio thread to stop

# WAV recording variables
recorded_frames = []  # Store audio frames for WAV file
recording_lock = threading.Lock()  # Thread-safe access to recorded_frames

# --- Function to Translate Text with LeMUR ---

def translate_text(text):
    """Called when translating final transcripts."""
    headers = {
        "authorization": YOUR_API_KEY
    }

    prompt = "Translate the following text into Spanish. Do not write a preamble. Just return the translated text."

    lemur_data = {
        "prompt": prompt,
        "input_text": text,
        "final_model": "anthropic/claude-3-7-sonnet-20250219",
    }
    result = requests.post("https://api.assemblyai.com/lemur/v3/generate/task", headers=headers, json=lemur_data)
    return result.json()["response"]

# --- WebSocket Event Handlers ---

def on_open(ws):
    """Called when the WebSocket connection is established."""
    print("WebSocket connection opened.")
    print(f"Connected to: {API_ENDPOINT}")

    # Start sending audio data in a separate thread
    def stream_audio():
        global stream
        print("Starting audio streaming...")
        while not stop_event.is_set():
            try:
                audio_data = stream.read(FRAMES_PER_BUFFER, exception_on_overflow=False)

                # Store audio data for WAV recording
                with recording_lock:
                    recorded_frames.append(audio_data)

                # Send audio data as binary message
                ws.send(audio_data, websocket.ABNF.OPCODE_BINARY)
            except Exception as e:
                print(f"Error streaming audio: {e}")
                # If stream read fails, likely means it's closed, stop the loop
                break
        print("Audio streaming stopped.")

    global audio_thread
    audio_thread = threading.Thread(target=stream_audio)
    audio_thread.daemon = (
        True  # Allow main thread to exit even if this thread is running
    )
    audio_thread.start()

def on_message(ws, message):
    try:
        data = json.loads(message)
        msg_type = data.get('type')

        if msg_type == "Begin":
            session_id = data.get('id')
            expires_at = data.get('expires_at')
            print(f"\nSession began: ID={session_id}, ExpiresAt={datetime.fromtimestamp(expires_at)}")
        elif msg_type == "Turn":
            transcript = data.get('transcript', '')
            formatted = data.get('turn_is_formatted', False)

            if formatted:
                print('\r' + ' ' * 80 + '\r', end='')
                print(translate_text(transcript))

        elif msg_type == "Termination":
            audio_duration = data.get('audio_duration_seconds', 0)
            session_duration = data.get('session_duration_seconds', 0)
            print(f"\nSession Terminated: Audio Duration={audio_duration}s, Session Duration={session_duration}s")

    except json.JSONDecodeError as e:
        print(f"Error decoding message: {e}")
    except Exception as e:
        print(f"Error handling message: {e}")

def on_error(ws, error):
    """Called when a WebSocket error occurs."""
    print(f"\nWebSocket Error: {error}")
    # Attempt to signal stop on error
    stop_event.set()

def on_close(ws, close_status_code, close_msg):
    """Called when the WebSocket connection is closed."""
    print(f"\nWebSocket Disconnected: Status={close_status_code}, Msg={close_msg}")

    # Ensure audio resources are released
    global stream, audio
    stop_event.set()  # Signal audio thread just in case it's still running

    if stream:
        if stream.is_active():
            stream.stop_stream()
        stream.close()
        stream = None
    if audio:
        audio.terminate()
        audio = None
    # Try to join the audio thread to ensure clean exit
    if audio_thread and audio_thread.is_alive():
        audio_thread.join(timeout=1.0)

# --- Main Execution ---

def run():
    global audio, stream, ws_app

    # Initialize PyAudio
    audio = pyaudio.PyAudio()

    # Open microphone stream
    try:
        stream = audio.open(
            input=True,
            frames_per_buffer=FRAMES_PER_BUFFER,
            channels=CHANNELS,
            format=FORMAT,
            rate=SAMPLE_RATE,
        )
        print("Microphone stream opened successfully.")
        print("Speak into your microphone. Press Ctrl+C to stop.")
        print("Audio will be saved to a WAV file when the session ends.")
    except Exception as e:
        print(f"Error opening microphone stream: {e}")
        if audio:
            audio.terminate()
        return  # Exit if microphone cannot be opened

    # Create WebSocketApp
    ws_app = websocket.WebSocketApp(
        API_ENDPOINT,
        header={"Authorization": YOUR_API_KEY},
        on_open=on_open,
        on_message=on_message,
        on_error=on_error,
        on_close=on_close,
    )

    # Run WebSocketApp in a separate thread to allow main thread to catch KeyboardInterrupt
    ws_thread = threading.Thread(target=ws_app.run_forever)
    ws_thread.daemon = True
    ws_thread.start()

    try:
        # Keep main thread alive until interrupted
        while ws_thread.is_alive():
            time.sleep(0.1)
    except KeyboardInterrupt:
        print("\nCtrl+C received. Stopping...")
        stop_event.set()  # Signal audio thread to stop

        # Send termination message to the server
        if ws_app and ws_app.sock and ws_app.sock.connected:
            try:
                terminate_message = {"type": "Terminate"}
                print(f"Sending termination message: {json.dumps(terminate_message)}")
                ws_app.send(json.dumps(terminate_message))
                # Give a moment for messages to process before forceful close
                time.sleep(5)
            except Exception as e:
                print(f"Error sending termination message: {e}")

        # Close the WebSocket connection (will trigger on_close)
        if ws_app:
            ws_app.close()

        # Wait for WebSocket thread to finish
        ws_thread.join(timeout=2.0)

    except Exception as e:
        print(f"\nAn unexpected error occurred: {e}")
        stop_event.set()
        if ws_app:
            ws_app.close()
        ws_thread.join(timeout=2.0)

    finally:
        # Final cleanup (already handled in on_close, but good as a fallback)
        if stream and stream.is_active():
            stream.stop_stream()
        if stream:
            stream.close()
        if audio:
            audio.terminate()
        print("Cleanup complete. Exiting.")

if __name__ == "__main__":
    run()
```
Step-by-Step Instructions
Before we begin, make sure you have an AssemblyAI account and an API key. You can sign up and get your API key from your dashboard.
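The code in this guide uses the `pyaudio`, `websocket-client`, and `requests` packages, so install those before running it. If you prefer not to hardcode your API key in the script, you can read it from an environment variable instead. Here is a minimal sketch, assuming the variable is named `ASSEMBLYAI_API_KEY` (the name is an assumption, not something the API requires):

```python
import os

# Load the API key from an environment variable instead of hardcoding it.
# The variable name ASSEMBLYAI_API_KEY is an assumption; use whatever name
# matches your environment.
YOUR_API_KEY = os.environ.get("ASSEMBLYAI_API_KEY")
if not YOUR_API_KEY:
    raise RuntimeError("Set the ASSEMBLYAI_API_KEY environment variable before running.")
```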
Import Packages & Set API Key
```python
import pyaudio
import websocket
import json
import threading
import time
import requests
from urllib.parse import urlencode
from datetime import datetime

YOUR_API_KEY = "YOUR_API_KEY"  # Replace with your actual API key
```
Audio Configuration & Global Variables
Set all of your audio configurations and global variables. Make sure that you have the parameter `format_turns` set to `True`.
```python
CONNECTION_PARAMS = {
    "sample_rate": 16000,
    "format_turns": True,  # Request formatted final transcripts
}
API_ENDPOINT_BASE_URL = "wss://streaming.assemblyai.com/v3/ws"
API_ENDPOINT = f"{API_ENDPOINT_BASE_URL}?{urlencode(CONNECTION_PARAMS)}"

# Audio Configuration
FRAMES_PER_BUFFER = 800  # 50ms of audio (0.05s * 16000Hz)
SAMPLE_RATE = CONNECTION_PARAMS["sample_rate"]
CHANNELS = 1
FORMAT = pyaudio.paInt16

# Global variables for audio stream and websocket
audio = None
stream = None
ws_app = None
audio_thread = None
stop_event = threading.Event()  # To signal the audio thread to stop

# WAV recording variables
recorded_frames = []  # Store audio frames for WAV file
recording_lock = threading.Lock()  # Thread-safe access to recorded_frames
```
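The 800-frame buffer corresponds to 50 ms of audio at a 16 kHz sample rate (16,000 samples/s × 0.05 s = 800). If you want to experiment with a different chunk duration, you can derive the buffer size from the sample rate; a small sketch, where `CHUNK_DURATION_MS` is a name introduced here purely for illustration:

```python
SAMPLE_RATE = 16000     # must match CONNECTION_PARAMS["sample_rate"]
CHUNK_DURATION_MS = 50  # illustrative name: desired chunk length in milliseconds

# 16000 samples/s * 0.05 s = 800 frames per buffer
FRAMES_PER_BUFFER = int(SAMPLE_RATE * CHUNK_DURATION_MS / 1000)
print(FRAMES_PER_BUFFER)  # 800
```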
Define Translate Text Function
Define a function called `translate_text`, which uses LeMUR to translate the English final transcripts into another language. This example translates the text into Spanish. To target a different language, replace “Spanish” in the prompt with your language of choice.
```python
def translate_text(text):
    """Called when translating final transcripts."""
    headers = {
        "authorization": YOUR_API_KEY
    }

    prompt = "Translate the following text into Spanish. Do not write a preamble. Just return the translated text."

    lemur_data = {
        "prompt": prompt,
        "input_text": text,
        "final_model": "anthropic/claude-3-7-sonnet-20250219",
    }
    result = requests.post("https://api.assemblyai.com/lemur/v3/generate/task", headers=headers, json=lemur_data)
    return result.json()["response"]
```
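If you want to switch target languages without editing the prompt string, you could parameterize the function and add a basic status check on the LeMUR response. The sketch below is not part of the original code; the `target_language` parameter and the fallback to the untranslated transcript are assumptions:

```python
def translate_text(text, target_language="Spanish"):
    """Translate a final transcript into target_language with LeMUR.

    Hypothetical variant of the guide's translate_text; the target_language
    parameter and the fallback-to-original-text behavior are assumptions.
    """
    headers = {"authorization": YOUR_API_KEY}

    prompt = (
        f"Translate the following text into {target_language}. "
        "Do not write a preamble. Just return the translated text."
    )

    lemur_data = {
        "prompt": prompt,
        "input_text": text,
        "final_model": "anthropic/claude-3-7-sonnet-20250219",
    }
    result = requests.post(
        "https://api.assemblyai.com/lemur/v3/generate/task",
        headers=headers,
        json=lemur_data,
    )
    if result.status_code != 200:
        # If the request fails, fall back to the untranslated transcript
        print(f"LeMUR request failed: {result.status_code} {result.text}")
        return text
    return result.json()["response"]
```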
WebSocket Event Handlers
Open WebSocket
```python
def on_open(ws):
    """Called when the WebSocket connection is established."""
    print("WebSocket connection opened.")
    print(f"Connected to: {API_ENDPOINT}")

    # Start sending audio data in a separate thread
    def stream_audio():
        global stream
        print("Starting audio streaming...")
        while not stop_event.is_set():
            try:
                audio_data = stream.read(FRAMES_PER_BUFFER, exception_on_overflow=False)

                # Store audio data for WAV recording
                with recording_lock:
                    recorded_frames.append(audio_data)

                # Send audio data as binary message
                ws.send(audio_data, websocket.ABNF.OPCODE_BINARY)
            except Exception as e:
                print(f"Error streaming audio: {e}")
                # If stream read fails, likely means it's closed, stop the loop
                break
        print("Audio streaming stopped.")

    global audio_thread
    audio_thread = threading.Thread(target=stream_audio)
    audio_thread.daemon = (
        True  # Allow main thread to exit even if this thread is running
    )
    audio_thread.start()
```
Handle WebSocket Messages
In this function, use the previously defined `translate_text` function to translate all final transcripts.
```python
def on_message(ws, message):
    try:
        data = json.loads(message)
        msg_type = data.get('type')

        if msg_type == "Begin":
            session_id = data.get('id')
            expires_at = data.get('expires_at')
            print(f"\nSession began: ID={session_id}, ExpiresAt={datetime.fromtimestamp(expires_at)}")
        elif msg_type == "Turn":
            transcript = data.get('transcript', '')
            formatted = data.get('turn_is_formatted', False)

            if formatted:
                print('\r' + ' ' * 80 + '\r', end='')
                print(translate_text(transcript))

        elif msg_type == "Termination":
            audio_duration = data.get('audio_duration_seconds', 0)
            session_duration = data.get('session_duration_seconds', 0)
            print(f"\nSession Terminated: Audio Duration={audio_duration}s, Session Duration={session_duration}s")

    except json.JSONDecodeError as e:
        print(f"Error decoding message: {e}")
    except Exception as e:
        print(f"Error handling message: {e}")
```
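To make the handler’s behavior concrete, here is a simplified `Turn` payload containing only the fields the handler reads. The values are made up, and real messages carry additional fields, so treat this purely as an illustration:

```python
import json

# Illustrative "Turn" payload; values are made up, and real messages include
# additional fields not shown here.
example_turn_message = json.dumps({
    "type": "Turn",
    "transcript": "Hello, how are you today?",
    "turn_is_formatted": True,
})

# on_message ignores the ws argument, so passing None works for a quick test.
# This calls translate_text() on the transcript and prints the translation.
on_message(None, example_turn_message)
```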
Close WebSocket
```python
def on_close(ws, close_status_code, close_msg):
    """Called when the WebSocket connection is closed."""
    print(f"\nWebSocket Disconnected: Status={close_status_code}, Msg={close_msg}")

    # Ensure audio resources are released
    global stream, audio
    stop_event.set()  # Signal audio thread just in case it's still running

    if stream:
        if stream.is_active():
            stream.stop_stream()
        stream.close()
        stream = None
    if audio:
        audio.terminate()
        audio = None
    # Try to join the audio thread to ensure clean exit
    if audio_thread and audio_thread.is_alive():
        audio_thread.join(timeout=1.0)
```
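The global `recorded_frames` list collects every audio chunk sent to the API, but this guide does not include the step that writes it to disk. If you would like to keep a recording, here is a minimal sketch of a helper you could call after the session ends; the `save_wav_file` name and where you call it are assumptions, and it relies on the `CHANNELS`, `SAMPLE_RATE`, `recorded_frames`, and `recording_lock` globals defined earlier:

```python
import wave
from datetime import datetime

def save_wav_file():
    """Write the collected microphone audio to a timestamped WAV file.

    Hypothetical helper, not part of the original guide; call it after the
    session ends (for example, at the end of run()).
    """
    if not recorded_frames:
        print("No audio recorded.")
        return

    filename = f"recorded_audio_{datetime.now().strftime('%Y%m%d_%H%M%S')}.wav"
    with wave.open(filename, "wb") as wf:
        wf.setnchannels(CHANNELS)
        wf.setsampwidth(2)  # 16-bit PCM (pyaudio.paInt16) = 2 bytes per sample
        wf.setframerate(SAMPLE_RATE)
        with recording_lock:
            wf.writeframes(b"".join(recorded_frames))
    print(f"Audio saved to {filename}")
```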
WebSocket Error Handling
```python
def on_error(ws, error):
    """Called when a WebSocket error occurs."""
    print(f"\nWebSocket Error: {error}")
    # Attempt to signal stop on error
    stop_event.set()
```
Begin Streaming STT Transcription
```python
def run():
    global audio, stream, ws_app

    # Initialize PyAudio
    audio = pyaudio.PyAudio()

    # Open microphone stream
    try:
        stream = audio.open(
            input=True,
            frames_per_buffer=FRAMES_PER_BUFFER,
            channels=CHANNELS,
            format=FORMAT,
            rate=SAMPLE_RATE,
        )
        print("Microphone stream opened successfully.")
        print("Speak into your microphone. Press Ctrl+C to stop.")
        print("Audio will be saved to a WAV file when the session ends.")
    except Exception as e:
        print(f"Error opening microphone stream: {e}")
        if audio:
            audio.terminate()
        return  # Exit if microphone cannot be opened

    # Create WebSocketApp
    ws_app = websocket.WebSocketApp(
        API_ENDPOINT,
        header={"Authorization": YOUR_API_KEY},
        on_open=on_open,
        on_message=on_message,
        on_error=on_error,
        on_close=on_close,
    )

    # Run WebSocketApp in a separate thread to allow main thread to catch KeyboardInterrupt
    ws_thread = threading.Thread(target=ws_app.run_forever)
    ws_thread.daemon = True
    ws_thread.start()

    try:
        # Keep main thread alive until interrupted
        while ws_thread.is_alive():
            time.sleep(0.1)
    except KeyboardInterrupt:
        print("\nCtrl+C received. Stopping...")
        stop_event.set()  # Signal audio thread to stop

        # Send termination message to the server
        if ws_app and ws_app.sock and ws_app.sock.connected:
            try:
                terminate_message = {"type": "Terminate"}
                print(f"Sending termination message: {json.dumps(terminate_message)}")
                ws_app.send(json.dumps(terminate_message))
                # Give a moment for messages to process before forceful close
                time.sleep(5)
            except Exception as e:
                print(f"Error sending termination message: {e}")

        # Close the WebSocket connection (will trigger on_close)
        if ws_app:
            ws_app.close()

        # Wait for WebSocket thread to finish
        ws_thread.join(timeout=2.0)

    except Exception as e:
        print(f"\nAn unexpected error occurred: {e}")
        stop_event.set()
        if ws_app:
            ws_app.close()
        ws_thread.join(timeout=2.0)

    finally:
        # Final cleanup (already handled in on_close, but good as a fallback)
        if stream and stream.is_active():
            stream.stop_stream()
        if stream:
            stream.close()
        if audio:
            audio.terminate()
        print("Cleanup complete. Exiting.")

if __name__ == "__main__":
    run()
```