Transcribe System Audio in Real-Time (macOS)
This guide solves the challenge of transcribing system audio, which can be used for transcribing media content or online calls. By using virtual audio devices, you’ll learn how to easily pipe system audio to AssemblyAI’s transcription API. The walkthrough targets macOS; the same approach works on Windows with an equivalent virtual audio device.
The key to success lies in creating a virtual input device that captures your speaker output and converts it into an input stream. This approach allows you to bypass the limitations of direct system audio access.
For Mac Users: We recommend using BlackHole, a free open-source tool available through Homebrew. BlackHole creates a virtual audio device that can route your system audio to AssemblyAI’s API seamlessly.
For Windows Users: Virtual Audio Cable (VAC) is a popular option. While we don’t provide specific Windows instructions in this guide, VAC offers similar functionality to BlackHole for the Windows environment.
Quickstart
import pyaudio
import websocket
import json
import threading
import time
from urllib.parse import urlencode
from datetime import datetime

# --- Configuration ---
YOUR_API_KEY = "YOUR_API_KEY"  # Replace with your actual API key

CONNECTION_PARAMS = {
    "sample_rate": 16000,
    "format_turns": True,  # Request formatted final transcripts
}
API_ENDPOINT_BASE_URL = "wss://streaming.assemblyai.com/v3/ws"
API_ENDPOINT = f"{API_ENDPOINT_BASE_URL}?{urlencode(CONNECTION_PARAMS)}"

# Audio Configuration
SAMPLE_RATE = CONNECTION_PARAMS["sample_rate"]
CHUNK_DURATION_MS = 50  # Length of each audio chunk sent to the API
# Derived from the sample rate so the chunk stays 50 ms if the rate changes
# (800 frames at 16000 Hz).
FRAMES_PER_BUFFER = SAMPLE_RATE * CHUNK_DURATION_MS // 1000
CHANNELS = 1
FORMAT = pyaudio.paInt16

# Global variables for audio stream and websocket
audio = None
stream = None
ws_app = None
audio_thread = None
stop_event = threading.Event()  # To signal the audio thread to stop

# WAV recording variables
recorded_frames = []  # Store audio frames for WAV file
recording_lock = threading.Lock()  # Thread-safe access to recorded_frames


# --- BlackHole Device Detection ---

def get_blackhole_device_index(name_prefix="BlackHole"):
    """Find the PyAudio index of the BlackHole virtual input device.

    Prints every device PyAudio reports, then returns the index of the last
    device whose name starts with ``name_prefix`` and that exposes at least
    one input channel.

    Args:
        name_prefix: Device-name prefix to match. Defaults to "BlackHole".

    Returns:
        The matching device index, or None when no such input device exists.
    """
    p = pyaudio.PyAudio()
    blackhole_index = None

    try:
        print("Available audio devices:")

        for i in range(p.get_device_count()):
            dev_info = p.get_device_info_by_index(i)
            print(f" {i}: {dev_info['name']} (inputs: {dev_info['maxInputChannels']})")

            if str(dev_info['name']).startswith(name_prefix) and dev_info['maxInputChannels'] > 0:
                blackhole_index = i
                print(f" -> Found {name_prefix} device at index {i}")
    finally:
        # Release the PyAudio instance even if a device query raises.
        p.terminate()

    return blackhole_index


# --- WebSocket Event Handlers ---

def on_open(ws):
    """Called when the WebSocket connection is established.

    Starts a daemon thread that reads chunks from the global ``stream`` and
    sends each one to ``ws`` as a binary frame until ``stop_event`` is set or
    a read/send fails.
    """
    global audio_thread

    print("WebSocket connection opened.")
    print(f"Connected to: {API_ENDPOINT}")

    def stream_audio():
        print("Starting audio streaming...")
        while not stop_event.is_set():
            try:
                audio_data = stream.read(FRAMES_PER_BUFFER, exception_on_overflow=False)

                # Store audio data for WAV recording.
                # NOTE(review): nothing visible in this script consumes
                # recorded_frames, so the list grows for the whole session.
                with recording_lock:
                    recorded_frames.append(audio_data)

                # Send audio data as binary message
                ws.send(audio_data, websocket.ABNF.OPCODE_BINARY)
            except Exception as e:
                # A failure after stop was requested is part of normal
                # shutdown (stream or socket already closed); stay quiet.
                if not stop_event.is_set():
                    print(f"Error streaming audio: {e}")
                break
        print("Audio streaming stopped.")

    # Daemon thread: allows the main thread to exit even if this one is running.
    audio_thread = threading.Thread(target=stream_audio, daemon=True)
    audio_thread.start()


def on_message(ws, message):
    """Parse one JSON message from the server and print what it reports.

    Handles the "Begin", "Turn" and "Termination" message types; other types
    are ignored. Parsing and handling errors are printed, never raised.
    """
    try:
        data = json.loads(message)
        msg_type = data.get('type')

        if msg_type == "Begin":
            session_id = data.get('id')
            expires_at = data.get('expires_at')
            # fromtimestamp(None) raises TypeError; keep the session line
            # printable when the field is absent.
            expires_text = (
                datetime.fromtimestamp(expires_at) if expires_at is not None else None
            )
            print(f"\nSession began: ID={session_id}, ExpiresAt={expires_text}")
        elif msg_type == "Turn":
            transcript = data.get('transcript', '')
            formatted = data.get('turn_is_formatted', False)

            if formatted:
                # Clear the in-progress partial line before the final text.
                print('\r' + ' ' * 80 + '\r', end='')
                print(transcript)
            else:
                # No trailing newline, so flush explicitly or the partial may
                # sit in the stdout buffer instead of appearing live.
                print(f"\r{transcript}", end='', flush=True)
        elif msg_type == "Termination":
            audio_duration = data.get('audio_duration_seconds', 0)
            session_duration = data.get('session_duration_seconds', 0)
            print(f"\nSession Terminated: Audio Duration={audio_duration}s, Session Duration={session_duration}s")
    except json.JSONDecodeError as e:
        print(f"Error decoding message: {e}")
    except Exception as e:
        print(f"Error handling message: {e}")


def on_error(ws, error):
    """Called when a WebSocket error occurs."""
    print(f"\nWebSocket Error: {error}")
    # Attempt to signal stop on error
    stop_event.set()


def on_close(ws, close_status_code, close_msg):
    """Called when the WebSocket connection is closed.

    Signals the audio thread to stop, waits briefly for it to finish, then
    releases the global audio stream and PyAudio instance.
    """
    global stream, audio

    print(f"\nWebSocket Disconnected: Status={close_status_code}, Msg={close_msg}")

    stop_event.set()  # Signal audio thread just in case it's still running

    # Join first so the thread is not inside stream.read() while the stream
    # is being stopped and closed underneath it.
    if audio_thread and audio_thread.is_alive():
        audio_thread.join(timeout=1.0)

    if stream:
        if stream.is_active():
            stream.stop_stream()
        stream.close()
        stream = None
    if audio:
        audio.terminate()
        audio = None


# --- Main Execution ---
def run():
    """Capture BlackHole audio and stream it to the transcription WebSocket.

    Finds the BlackHole input device, opens a PyAudio input stream on it, runs
    the WebSocketApp on a background thread, and keeps the main thread alive
    until the connection ends or Ctrl+C is pressed. On Ctrl+C a Terminate
    message is sent before the socket is closed. Audio resources are released
    on every exit path.
    """
    global audio, stream, ws_app

    # Find BlackHole device
    blackhole_index = get_blackhole_device_index()

    if blackhole_index is None:
        print("Error: BlackHole audio device not found!")
        print("Please install BlackHole from https://existential.audio/blackhole/")
        return

    # Initialize PyAudio
    audio = pyaudio.PyAudio()

    # Open Blackhole audio stream
    try:
        stream = audio.open(
            input=True,
            input_device_index=blackhole_index,  # Use BlackHole device
            frames_per_buffer=FRAMES_PER_BUFFER,
            channels=CHANNELS,
            format=FORMAT,
            rate=SAMPLE_RATE,
        )

        print(f"BlackHole audio stream opened successfully (device index: {blackhole_index}).")
        print("Now capturing system audio through BlackHole. Press Ctrl+C to stop.")
        print("Make sure audio is routed through BlackHole for transcription.")

    except Exception as e:
        print(f"Error opening Blackhole audio stream: {e}")
        if audio:
            audio.terminate()
            audio = None
        return  # Exit if blackhole cannot be opened

    # Create WebSocketApp
    ws_app = websocket.WebSocketApp(
        API_ENDPOINT,
        header={"Authorization": YOUR_API_KEY},
        on_open=on_open,
        on_message=on_message,
        on_error=on_error,
        on_close=on_close,
    )

    # Run WebSocketApp in a separate thread to allow main thread to catch KeyboardInterrupt
    ws_thread = threading.Thread(target=ws_app.run_forever, daemon=True)
    ws_thread.start()

    try:
        # Keep main thread alive until interrupted
        while ws_thread.is_alive():
            time.sleep(0.1)
    except KeyboardInterrupt:
        print("\nCtrl+C received. Stopping...")
        stop_event.set()  # Signal audio thread to stop

        # Send termination message to the server
        if ws_app and ws_app.sock and ws_app.sock.connected:
            try:
                terminate_payload = json.dumps({"type": "Terminate"})
                print(f"Sending termination message: {terminate_payload}")
                ws_app.send(terminate_payload)
                # Allow up to 5 s for remaining messages to be processed;
                # returns early if the WebSocket thread exits sooner.
                ws_thread.join(timeout=5.0)
            except Exception as e:
                print(f"Error sending termination message: {e}")

        # Close the WebSocket connection (will trigger on_close)
        if ws_app:
            ws_app.close()

        # Wait for WebSocket thread to finish
        ws_thread.join(timeout=2.0)

    except Exception as e:
        print(f"\nAn unexpected error occurred: {e}")
        stop_event.set()
        if ws_app:
            ws_app.close()
        ws_thread.join(timeout=2.0)

    finally:
        # Final cleanup (already handled in on_close, but good as a fallback)
        if stream and stream.is_active():
            stream.stop_stream()
        if stream:
            stream.close()
            stream = None
        if audio:
            audio.terminate()
            audio = None
        print("Cleanup complete. Exiting.")


if __name__ == "__main__":
    run()
Step-By-Step Guide
Before we begin, make sure you have an AssemblyAI account and an API key. You can sign up and get your API key from your dashboard.
Install/import Packages & Set API Key
Install the pyaudio and websocket-client packages (the latter provides the websocket module imported below).
$ pip install pyaudio websocket-client
Import packages and set your API key.
# pyaudio captures audio from the input device; websocket provides the
# WebSocketApp client used to talk to the streaming endpoint.
import pyaudio
import websocket
import json
import threading
import time
from urllib.parse import urlencode
from datetime import datetime

# Sent as the Authorization header when the WebSocket connection is opened.
YOUR_API_KEY = "YOUR-API-KEY" # Replace with your actual API key
Audio Configuration & Global Variables
Set all of your audio configurations and global variables. Make sure that you have the parameter `format_turns` set to `True`.
# Query parameters appended to the streaming endpoint URL.
CONNECTION_PARAMS = {
    "sample_rate": 16000,
    "format_turns": True,  # Request formatted final transcripts
}
API_ENDPOINT_BASE_URL = "wss://streaming.assemblyai.com/v3/ws"
API_ENDPOINT = f"{API_ENDPOINT_BASE_URL}?{urlencode(CONNECTION_PARAMS)}"

# Audio Configuration
SAMPLE_RATE = CONNECTION_PARAMS["sample_rate"]
CHUNK_DURATION_MS = 50  # Length of each audio chunk sent to the API
# Derived from the sample rate so the chunk stays 50 ms if the rate changes
# (800 frames at 16000 Hz).
FRAMES_PER_BUFFER = SAMPLE_RATE * CHUNK_DURATION_MS // 1000
CHANNELS = 1
FORMAT = pyaudio.paInt16

# Global variables for audio stream and websocket
audio = None
stream = None
ws_app = None
audio_thread = None
stop_event = threading.Event()  # To signal the audio thread to stop

# WAV recording variables
recorded_frames = []  # Store audio frames for WAV file
recording_lock = threading.Lock()  # Thread-safe access to recorded_frames
Define Function to Find Blackhole Audio Device Index
Define a function called `get_blackhole_device_index`, which retrieves the device index for your BlackHole virtual input device.
def get_blackhole_device_index(name_prefix="BlackHole"):
    """Find the PyAudio index of the BlackHole virtual input device.

    Prints every device PyAudio reports, then returns the index of the last
    device whose name starts with ``name_prefix`` and that exposes at least
    one input channel.

    Args:
        name_prefix: Device-name prefix to match. Defaults to "BlackHole";
            pass a different prefix to select another virtual input device.

    Returns:
        The matching device index, or None when no such input device exists.
    """
    p = pyaudio.PyAudio()
    blackhole_index = None

    try:
        print("Available audio devices:")

        for i in range(p.get_device_count()):
            dev_info = p.get_device_info_by_index(i)
            print(f" {i}: {dev_info['name']} (inputs: {dev_info['maxInputChannels']})")

            # Require input channels: the device is opened with input=True.
            if str(dev_info['name']).startswith(name_prefix) and dev_info['maxInputChannels'] > 0:
                blackhole_index = i
                print(f" -> Found {name_prefix} device at index {i}")
    finally:
        # Release the PyAudio instance even if a device query raises.
        p.terminate()

    return blackhole_index
Websocket Event Handlers
Open Websocket
def on_open(ws):
    """Called when the WebSocket connection is established.

    Starts a daemon thread that reads chunks from the global ``stream`` and
    sends each one to ``ws`` as a binary frame until ``stop_event`` is set or
    a read/send fails.

    Args:
        ws: The connected WebSocket object that audio frames are sent on.
    """
    global audio_thread

    print("WebSocket connection opened.")
    print(f"Connected to: {API_ENDPOINT}")

    def stream_audio():
        print("Starting audio streaming...")
        while not stop_event.is_set():
            try:
                audio_data = stream.read(FRAMES_PER_BUFFER, exception_on_overflow=False)

                # Store audio data for WAV recording.
                # NOTE(review): nothing visible in this guide consumes
                # recorded_frames, so the list grows for the whole session.
                with recording_lock:
                    recorded_frames.append(audio_data)

                # Send audio data as binary message
                ws.send(audio_data, websocket.ABNF.OPCODE_BINARY)
            except Exception as e:
                # A failure after stop was requested is part of normal
                # shutdown (stream or socket already closed); stay quiet.
                if not stop_event.is_set():
                    print(f"Error streaming audio: {e}")
                break
        print("Audio streaming stopped.")

    # Daemon thread: allows the main thread to exit even if this one is running.
    audio_thread = threading.Thread(target=stream_audio, daemon=True)
    audio_thread.start()
Handle Websocket Messages
def on_message(ws, message):
    """Parse one JSON message from the server and print what it reports.

    Handles three message types, selected by the ``type`` field:

    * "Begin": prints the session ID and its expiry time.
    * "Turn": prints the transcript. Unformatted (partial) text overwrites
      the current line; formatted text clears that line and ends it.
    * "Termination": prints the audio and session durations.

    Other types are ignored. Parsing and handling errors are printed, never
    raised.

    Args:
        ws: The WebSocket the message arrived on (unused).
        message: The raw JSON text received from the server.
    """
    try:
        data = json.loads(message)
        msg_type = data.get('type')

        if msg_type == "Begin":
            session_id = data.get('id')
            expires_at = data.get('expires_at')
            # fromtimestamp(None) raises TypeError; keep the session line
            # printable when the field is absent.
            expires_text = (
                datetime.fromtimestamp(expires_at) if expires_at is not None else None
            )
            print(f"\nSession began: ID={session_id}, ExpiresAt={expires_text}")
        elif msg_type == "Turn":
            transcript = data.get('transcript', '')
            formatted = data.get('turn_is_formatted', False)

            if formatted:
                # Clear the in-progress partial line before the final text.
                print('\r' + ' ' * 80 + '\r', end='')
                print(transcript)
            else:
                # No trailing newline, so flush explicitly or the partial may
                # sit in the stdout buffer instead of appearing live.
                print(f"\r{transcript}", end='', flush=True)
        elif msg_type == "Termination":
            audio_duration = data.get('audio_duration_seconds', 0)
            session_duration = data.get('session_duration_seconds', 0)
            print(f"\nSession Terminated: Audio Duration={audio_duration}s, Session Duration={session_duration}s")
    except json.JSONDecodeError as e:
        print(f"Error decoding message: {e}")
    except Exception as e:
        print(f"Error handling message: {e}")
Close Websocket
def on_close(ws, close_status_code, close_msg):
    """Called when the WebSocket connection is closed.

    Signals the audio thread to stop, waits briefly for it to finish, then
    releases the global audio stream and PyAudio instance.

    Args:
        ws: The WebSocket that was closed (unused).
        close_status_code: Status code reported for the close.
        close_msg: Message reported for the close.
    """
    global stream, audio

    print(f"\nWebSocket Disconnected: Status={close_status_code}, Msg={close_msg}")

    stop_event.set()  # Signal audio thread just in case it's still running

    # Join first so the thread is not inside stream.read() while the stream
    # is being stopped and closed underneath it.
    if audio_thread and audio_thread.is_alive():
        audio_thread.join(timeout=1.0)

    # Ensure audio resources are released; clearing the globals lets later
    # cleanup code skip handles that are already closed.
    if stream:
        if stream.is_active():
            stream.stop_stream()
        stream.close()
        stream = None
    if audio:
        audio.terminate()
        audio = None
Websocket Error Handling
def on_error(ws, error):
    """Report a WebSocket error and request that audio streaming stop.

    Args:
        ws: The WebSocket that raised the error (unused).
        error: The error object or message to report.
    """
    report = f"\nWebSocket Error: {error}"
    print(report)
    # Setting the shared event asks the audio thread to stop.
    stop_event.set()
Begin Streaming STT Transcription
Make sure to find the BlackHole device index and pass it as the `input_device_index` when opening the audio stream.
def run():
    """Capture BlackHole audio and stream it to the transcription WebSocket.

    Finds the BlackHole input device, opens a PyAudio input stream on it, runs
    the WebSocketApp on a background thread, and keeps the main thread alive
    until the connection ends or Ctrl+C is pressed. On Ctrl+C a Terminate
    message is sent before the socket is closed. Audio resources are released
    on every exit path.
    """
    global audio, stream, ws_app

    # Find BlackHole device index
    blackhole_index = get_blackhole_device_index()

    if blackhole_index is None:
        print("Error: BlackHole audio device not found!")
        print("Please install BlackHole from https://existential.audio/blackhole/")
        return

    # Initialize PyAudio
    audio = pyaudio.PyAudio()

    # Open Blackhole audio stream
    try:
        stream = audio.open(
            input=True,
            input_device_index=blackhole_index,  # Use BlackHole device
            frames_per_buffer=FRAMES_PER_BUFFER,
            channels=CHANNELS,
            format=FORMAT,
            rate=SAMPLE_RATE,
        )

        print(f"BlackHole audio stream opened successfully (device index: {blackhole_index}).")
        print("Now capturing system audio through BlackHole. Press Ctrl+C to stop.")
        print("Make sure audio is routed through BlackHole for transcription.")

    except Exception as e:
        print(f"Error opening Blackhole audio stream: {e}")
        if audio:
            audio.terminate()
            audio = None  # Do not leave a terminated instance in the global
        return  # Exit if blackhole cannot be opened

    # Create WebSocketApp
    ws_app = websocket.WebSocketApp(
        API_ENDPOINT,
        header={"Authorization": YOUR_API_KEY},
        on_open=on_open,
        on_message=on_message,
        on_error=on_error,
        on_close=on_close,
    )

    # Run WebSocketApp in a separate thread to allow main thread to catch KeyboardInterrupt
    ws_thread = threading.Thread(target=ws_app.run_forever, daemon=True)
    ws_thread.start()

    try:
        # Keep main thread alive until interrupted
        while ws_thread.is_alive():
            time.sleep(0.1)
    except KeyboardInterrupt:
        print("\nCtrl+C received. Stopping...")
        stop_event.set()  # Signal audio thread to stop

        # Send termination message to the server
        if ws_app and ws_app.sock and ws_app.sock.connected:
            try:
                terminate_payload = json.dumps({"type": "Terminate"})
                print(f"Sending termination message: {terminate_payload}")
                ws_app.send(terminate_payload)
                # Allow up to 5 s for remaining messages to be processed;
                # returns early if the WebSocket thread exits sooner.
                ws_thread.join(timeout=5.0)
            except Exception as e:
                print(f"Error sending termination message: {e}")

        # Close the WebSocket connection (will trigger on_close)
        if ws_app:
            ws_app.close()

        # Wait for WebSocket thread to finish
        ws_thread.join(timeout=2.0)

    except Exception as e:
        print(f"\nAn unexpected error occurred: {e}")
        stop_event.set()
        if ws_app:
            ws_app.close()
        ws_thread.join(timeout=2.0)

    finally:
        # Final cleanup (already handled in on_close, but good as a fallback)
        if stream and stream.is_active():
            stream.stop_stream()
        if stream:
            stream.close()
            stream = None
        if audio:
            audio.terminate()
            audio = None
        print("Cleanup complete. Exiting.")


if __name__ == "__main__":
    run()
You can press Ctrl+C to stop the transcription.
Troubleshooting
- You need to select BlackHole as your system output device for the audio to be piped correctly.
- If you still need to hear the audio, you can create a multi-output device on Mac that sends audio to both BlackHole and your speakers/headphones. Here’s how to set it up: Open “Audio MIDI Setup” (you can find this by searching in Spotlight). Click the “+” button in the bottom left corner and choose “Create Multi-Output Device”. In the list on the right, check both your regular output (e.g., “MacBook Pro Speakers”) and “BlackHole 2ch”. Optionally, rename this new device to something like “BlackHole + Speakers”. Then select the new device as your system output. You may need to modify your script to search for this new device.