Transcribe System Audio in Real-Time (macOS)
This guide solves the challenge of transcribing system audio, which can be used for transcribing media content or online calls. By using virtual audio devices, you’ll learn how to easily pipe system audio to AssemblyAI’s transcription API. The same approach works on both Mac and Windows, although the step-by-step instructions in this guide cover macOS.
The key to success lies in creating a virtual input device that captures your speaker output and converts it into an input stream. This approach allows you to bypass the limitations of direct system audio access.
For Mac Users: We recommend using BlackHole, a free open-source tool available through Homebrew. BlackHole creates a virtual audio device that can route your system audio to AssemblyAI’s API seamlessly.
For Windows Users: Virtual Audio Cable (VAC) is a popular option. While we don’t provide specific Windows instructions in this guide, VAC offers similar functionality to BlackHole for the Windows environment.
Quickstart
"""Stream macOS system audio (captured through BlackHole) to AssemblyAI for live transcription."""

import json
import threading
import time
from datetime import datetime
from urllib.parse import urlencode

import pyaudio
import websocket

# --- Configuration ---
YOUR_API_KEY = "YOUR_API_KEY"  # Replace with your actual API key

CONNECTION_PARAMS = {
    "sample_rate": 16000,
    "format_turns": True,  # Request formatted final transcripts
}
API_ENDPOINT_BASE_URL = "wss://streaming.assemblyai.com/v3/ws"
API_ENDPOINT = f"{API_ENDPOINT_BASE_URL}?{urlencode(CONNECTION_PARAMS)}"

# Audio Configuration
FRAMES_PER_BUFFER = 800  # 50ms of audio (0.05s * 16000Hz)
SAMPLE_RATE = CONNECTION_PARAMS["sample_rate"]
CHANNELS = 1
FORMAT = pyaudio.paInt16

# Global variables for audio stream and websocket
audio = None
stream = None
ws_app = None
audio_thread = None
stop_event = threading.Event()  # To signal the audio thread to stop

# WAV recording variables
recorded_frames = []  # Store audio frames for WAV file
recording_lock = threading.Lock()  # Thread-safe access to recorded_frames


# --- BlackHole Device Detection ---

def get_blackhole_device_index():
    """Return the PyAudio index of the BlackHole input device, or None.

    Prints every device PyAudio reports. A device qualifies when its name
    starts with "BlackHole" and it exposes at least one input channel. If
    several devices qualify, the last one listed is returned.
    """
    p = pyaudio.PyAudio()
    blackhole_index = None

    # try/finally so the PyAudio instance is released even if a device
    # query raises part-way through the scan.
    try:
        print("Available audio devices:")

        for i in range(p.get_device_count()):
            dev_info = p.get_device_info_by_index(i)
            print(f" {i}: {dev_info['name']} (inputs: {dev_info['maxInputChannels']})")

            if str(dev_info['name']).startswith('BlackHole') and dev_info['maxInputChannels'] > 0:
                blackhole_index = i
                print(f" -> Found BlackHole device at index {i}")
    finally:
        p.terminate()

    return blackhole_index


# --- WebSocket Event Handlers ---

def on_open(ws):
    """Called when the WebSocket connection is established.

    Starts a daemon thread that reads audio from the global ``stream`` and
    sends each chunk over ``ws`` as a binary message until ``stop_event``
    is set or a read/send fails.
    """
    global audio_thread

    print("WebSocket connection opened.")
    print(f"Connected to: {API_ENDPOINT}")

    def stream_audio():
        print("Starting audio streaming...")
        while not stop_event.is_set():
            try:
                audio_data = stream.read(FRAMES_PER_BUFFER, exception_on_overflow=False)

                # Store audio data for WAV recording
                with recording_lock:
                    recorded_frames.append(audio_data)

                # Send audio data as binary message
                ws.send(audio_data, websocket.ABNF.OPCODE_BINARY)
            except Exception as e:
                print(f"Error streaming audio: {e}")
                # A failed read or send ends the loop
                break
        print("Audio streaming stopped.")

    # Daemon thread: the main thread may exit even if this one is running
    audio_thread = threading.Thread(target=stream_audio, daemon=True)
    audio_thread.start()


def on_message(ws, message):
    """Parse one JSON message from the server and print its content.

    "Begin" prints the session id and expiry, "Turn" prints the transcript
    (partials overwrite the current line, final turns end with a newline),
    and "Termination" prints the reported durations. Other types are
    ignored. Parsing and handling errors are printed, never raised.
    """
    try:
        data = json.loads(message)
        msg_type = data.get('type')

        if msg_type == "Begin":
            session_id = data.get('id')
            expires_at = data.get('expires_at')
            # fromtimestamp(None) raises, so tolerate a missing expiry
            expires_text = (
                datetime.fromtimestamp(expires_at) if expires_at is not None else "unknown"
            )
            print(f"\nSession began: ID={session_id}, ExpiresAt={expires_text}")
        elif msg_type == "Turn":
            transcript = data.get('transcript', '')
            if data.get('end_of_turn'):
                # Blank out the partial line before printing the final text
                print('\r' + ' ' * 80 + '\r', end='')
                print(transcript, flush=True)
            else:
                # No trailing newline, so flush explicitly; otherwise a
                # line-buffered terminal holds the partial back.
                print(f"\r{transcript}", end='', flush=True)
        elif msg_type == "Termination":
            audio_duration = data.get('audio_duration_seconds', 0)
            session_duration = data.get('session_duration_seconds', 0)
            print(f"\nSession Terminated: Audio Duration={audio_duration}s, Session Duration={session_duration}s")
    except json.JSONDecodeError as e:
        print(f"Error decoding message: {e}")
    except Exception as e:
        print(f"Error handling message: {e}")


def on_error(ws, error):
    """Called when a WebSocket error occurs; prints it and signals a stop."""
    print(f"\nWebSocket Error: {error}")
    # Attempt to signal stop on error
    stop_event.set()


def on_close(ws, close_status_code, close_msg):
    """Called when the WebSocket connection is closed; releases audio resources."""
    global stream, audio

    print(f"\nWebSocket Disconnected: Status={close_status_code}, Msg={close_msg}")

    stop_event.set()  # Signal audio thread just in case it's still running

    # Join the audio thread BEFORE closing the stream, so it is not inside
    # stream.read() (or about to dereference a None stream) during teardown.
    if audio_thread and audio_thread.is_alive():
        audio_thread.join(timeout=1.0)

    if stream:
        if stream.is_active():
            stream.stop_stream()
        stream.close()
        stream = None
    if audio:
        audio.terminate()
        audio = None


# --- Main Execution ---

def run():
    """Capture BlackHole audio and stream it for transcription until Ctrl+C.

    Finds the BlackHole device, opens a PyAudio input stream on it, runs the
    WebSocket client in a daemon thread, and waits. On Ctrl+C it sends a
    Terminate message, closes the connection, and releases audio resources.
    Returns early (after printing a message) if BlackHole is missing or the
    stream cannot be opened.
    """
    global audio, stream, ws_app

    # Find BlackHole device
    blackhole_index = get_blackhole_device_index()

    if blackhole_index is None:
        print("Error: BlackHole audio device not found!")
        print("Please install BlackHole from https://existential.audio/blackhole/")
        return

    # Initialize PyAudio
    audio = pyaudio.PyAudio()

    # Open Blackhole audio stream
    try:
        stream = audio.open(
            input=True,
            input_device_index=blackhole_index,  # Use BlackHole device
            frames_per_buffer=FRAMES_PER_BUFFER,
            channels=CHANNELS,
            format=FORMAT,
            rate=SAMPLE_RATE,
        )

        print(f"BlackHole audio stream opened successfully (device index: {blackhole_index}).")
        print("Now capturing system audio through BlackHole. Press Ctrl+C to stop.")
        print("Make sure audio is routed through BlackHole for transcription.")

    except Exception as e:
        print(f"Error opening Blackhole audio stream: {e}")
        if audio:
            audio.terminate()
            audio = None  # Do not keep a reference to a terminated instance
        return  # Exit if blackhole cannot be opened

    # Create WebSocketApp
    ws_app = websocket.WebSocketApp(
        API_ENDPOINT,
        header={"Authorization": YOUR_API_KEY},
        on_open=on_open,
        on_message=on_message,
        on_error=on_error,
        on_close=on_close,
    )

    # Run WebSocketApp in a separate thread to allow main thread to catch KeyboardInterrupt
    ws_thread = threading.Thread(target=ws_app.run_forever, daemon=True)
    ws_thread.start()

    try:
        # Keep main thread alive until interrupted
        while ws_thread.is_alive():
            time.sleep(0.1)
    except KeyboardInterrupt:
        print("\nCtrl+C received. Stopping...")
        stop_event.set()  # Signal audio thread to stop

        # Send termination message to the server
        if ws_app and ws_app.sock and ws_app.sock.connected:
            try:
                terminate_message = {"type": "Terminate"}
                print(f"Sending termination message: {json.dumps(terminate_message)}")
                ws_app.send(json.dumps(terminate_message))
                # Give the final messages up to 5s to arrive. Joining, not
                # sleeping, returns as soon as the WebSocket thread ends.
                ws_thread.join(timeout=5.0)
            except Exception as e:
                print(f"Error sending termination message: {e}")

        # Close the WebSocket connection (will trigger on_close)
        if ws_app:
            ws_app.close()

        # Wait for WebSocket thread to finish
        ws_thread.join(timeout=2.0)

    except Exception as e:
        print(f"\nAn unexpected error occurred: {e}")
        stop_event.set()
        if ws_app:
            ws_app.close()
        ws_thread.join(timeout=2.0)

    finally:
        # Fallback cleanup; on_close normally did this and reset the globals.
        if stream:
            try:
                if stream.is_active():
                    stream.stop_stream()
                stream.close()
            except Exception as e:
                # on_close may have closed the stream concurrently
                print(f"Error closing audio stream: {e}")
            stream = None
        if audio:
            audio.terminate()
            audio = None
        print("Cleanup complete. Exiting.")


if __name__ == "__main__":
    run()
Step-By-Step Guide
Before we begin, make sure you have an AssemblyAI account and an API key. You can sign up and get your API key from your dashboard.
Install/import Packages & Set API Key
Install the packages pyaudio and websocket-client (the script imports the latter as websocket).
$ pip install pyaudio websocket-client
Import packages and set your API key.
# Standard library
import json
import threading
import time
from datetime import datetime
from urllib.parse import urlencode

# Third-party
import pyaudio
import websocket

YOUR_API_KEY = "YOUR-API-KEY"  # Replace with your actual API key
Audio Configuration & Global Variables
Set all of your audio configurations and global variables. Make sure that you have the parameter format_turns set to True.
# Query parameters sent with the WebSocket connection request.
CONNECTION_PARAMS = dict(
    sample_rate=16000,
    format_turns=True,  # Request formatted final transcripts
)
API_ENDPOINT_BASE_URL = "wss://streaming.assemblyai.com/v3/ws"
API_ENDPOINT = API_ENDPOINT_BASE_URL + "?" + urlencode(CONNECTION_PARAMS)

# Audio capture settings
SAMPLE_RATE = CONNECTION_PARAMS["sample_rate"]
FRAMES_PER_BUFFER = 800  # 50ms of audio (0.05s * 16000Hz)
CHANNELS = 1
FORMAT = pyaudio.paInt16

# Shared state, filled in at runtime by run() and the WebSocket handlers
audio = None
stream = None
ws_app = None
audio_thread = None
stop_event = threading.Event()  # Set to tell the audio thread to stop

# Captured audio, kept for writing a WAV file
recorded_frames = []  # Raw audio chunks in capture order
recording_lock = threading.Lock()  # Guards recorded_frames across threads
Define Function to Find Blackhole Audio Device Index
Define a function called get_blackhole_device_index, which retrieves the device index for your BlackHole virtual input device.
def get_blackhole_device_index():
    """Return the PyAudio index of the BlackHole input device, or None.

    Prints every device PyAudio reports. A device qualifies when its name
    starts with "BlackHole" and it exposes at least one input channel. If
    several devices qualify, the last one listed is returned.
    """
    p = pyaudio.PyAudio()
    blackhole_index = None

    # try/finally so the PyAudio instance is released even if a device
    # query raises part-way through the scan.
    try:
        print("Available audio devices:")

        for i in range(p.get_device_count()):
            dev_info = p.get_device_info_by_index(i)
            print(f" {i}: {dev_info['name']} (inputs: {dev_info['maxInputChannels']})")

            if str(dev_info['name']).startswith('BlackHole') and dev_info['maxInputChannels'] > 0:
                blackhole_index = i
                print(f" -> Found BlackHole device at index {i}")
    finally:
        p.terminate()

    return blackhole_index
Websocket Event Handlers
Open Websocket
def on_open(ws):
    """Announce the connection and start forwarding captured audio.

    Launches a daemon thread that reads chunks from the global ``stream``
    and sends each one over ``ws`` as a binary message. The thread ends when
    ``stop_event`` is set or when a read or send raises.
    """
    global audio_thread

    print("WebSocket connection opened.")
    print(f"Connected to: {API_ENDPOINT}")

    def stream_audio():
        print("Starting audio streaming...")
        while True:
            if stop_event.is_set():
                break
            try:
                chunk = stream.read(FRAMES_PER_BUFFER, exception_on_overflow=False)

                # Keep a copy of the chunk for WAV recording.
                # NOTE(review): this list grows for the whole session and
                # nothing visible in this guide reads it back - confirm
                # whether a WAV writer exists elsewhere.
                with recording_lock:
                    recorded_frames.append(chunk)

                # Raw audio goes out as a binary frame
                ws.send(chunk, websocket.ABNF.OPCODE_BINARY)
            except Exception as exc:
                print(f"Error streaming audio: {exc}")
                # A failed read or send ends the loop
                break
        print("Audio streaming stopped.")

    # Daemon thread: the main thread may exit even if this one is running
    audio_thread = threading.Thread(target=stream_audio, daemon=True)
    audio_thread.start()
Handle Websocket Messages
def on_message(ws, message):
    """Parse one JSON message from the server and print its content.

    "Begin" prints the session id and expiry, "Turn" prints the transcript
    (partials overwrite the current line, final turns end with a newline),
    and "Termination" prints the reported durations. Other types are
    ignored. Parsing and handling errors are printed, never raised.

    Args:
        ws: The WebSocket app that received the message (unused).
        message: The raw JSON text of the message.
    """
    try:
        data = json.loads(message)
        msg_type = data.get('type')

        if msg_type == "Begin":
            session_id = data.get('id')
            expires_at = data.get('expires_at')
            # fromtimestamp(None) raises, so tolerate a missing expiry
            expires_text = (
                datetime.fromtimestamp(expires_at) if expires_at is not None else "unknown"
            )
            print(f"\nSession began: ID={session_id}, ExpiresAt={expires_text}")
        elif msg_type == "Turn":
            transcript = data.get('transcript', '')
            if data.get('end_of_turn'):
                # Blank out the partial line before printing the final text
                print('\r' + ' ' * 80 + '\r', end='')
                print(transcript, flush=True)
            else:
                # No trailing newline, so flush explicitly; otherwise a
                # line-buffered terminal holds the partial back.
                print(f"\r{transcript}", end='', flush=True)
        elif msg_type == "Termination":
            audio_duration = data.get('audio_duration_seconds', 0)
            session_duration = data.get('session_duration_seconds', 0)
            print(f"\nSession Terminated: Audio Duration={audio_duration}s, Session Duration={session_duration}s")
    except json.JSONDecodeError as e:
        print(f"Error decoding message: {e}")
    except Exception as e:
        print(f"Error handling message: {e}")
Close Websocket
def on_close(ws, close_status_code, close_msg):
    """Called when the WebSocket connection is closed; releases audio resources.

    Signals the audio thread to stop, waits briefly for it, then stops and
    closes the global ``stream`` and terminates the global ``audio``
    instance, resetting both to None.

    Args:
        ws: The WebSocket app whose connection closed (unused).
        close_status_code: Close status reported for the connection.
        close_msg: Close message reported for the connection.
    """
    global stream, audio

    print(f"\nWebSocket Disconnected: Status={close_status_code}, Msg={close_msg}")

    stop_event.set()  # Signal audio thread just in case it's still running

    # Join the audio thread BEFORE closing the stream, so it is not inside
    # stream.read() (or about to dereference a None stream) during teardown.
    if audio_thread and audio_thread.is_alive():
        audio_thread.join(timeout=1.0)

    if stream:
        if stream.is_active():
            stream.stop_stream()
        stream.close()
        stream = None
    if audio:
        audio.terminate()
        audio = None
Websocket Error Handling
def on_error(ws, error):
    """Report a WebSocket error and signal the stop event.

    Args:
        ws: The WebSocket app that raised the error (unused).
        error: The error reported for the connection.
    """
    report = f"\nWebSocket Error: {error}"
    print(report)
    # Best-effort shutdown signal after an error
    stop_event.set()
Begin Streaming STT Transcription
Make sure to look up the BlackHole device index and pass it as the input_device_index when opening the audio stream.
def run():
    """Capture BlackHole audio and stream it for transcription until Ctrl+C.

    Finds the BlackHole device, opens a PyAudio input stream on it, runs the
    WebSocket client in a daemon thread, and waits. On Ctrl+C it sends a
    Terminate message, closes the connection, and releases audio resources.
    Returns early (after printing a message) if BlackHole is missing or the
    stream cannot be opened.
    """
    global audio, stream, ws_app

    # Find BlackHole device index
    blackhole_index = get_blackhole_device_index()

    if blackhole_index is None:
        print("Error: BlackHole audio device not found!")
        print("Please install BlackHole from https://existential.audio/blackhole/")
        return

    # Initialize PyAudio
    audio = pyaudio.PyAudio()

    # Open Blackhole audio stream
    try:
        stream = audio.open(
            input=True,
            input_device_index=blackhole_index,  # Use BlackHole device
            frames_per_buffer=FRAMES_PER_BUFFER,
            channels=CHANNELS,
            format=FORMAT,
            rate=SAMPLE_RATE,
        )

        print(f"BlackHole audio stream opened successfully (device index: {blackhole_index}).")
        print("Now capturing system audio through BlackHole. Press Ctrl+C to stop.")
        print("Make sure audio is routed through BlackHole for transcription.")

    except Exception as e:
        print(f"Error opening Blackhole audio stream: {e}")
        if audio:
            audio.terminate()
            audio = None  # Do not keep a reference to a terminated instance
        return  # Exit if blackhole cannot be opened

    # Create WebSocketApp
    ws_app = websocket.WebSocketApp(
        API_ENDPOINT,
        header={"Authorization": YOUR_API_KEY},
        on_open=on_open,
        on_message=on_message,
        on_error=on_error,
        on_close=on_close,
    )

    # Run WebSocketApp in a separate thread to allow main thread to catch KeyboardInterrupt
    ws_thread = threading.Thread(target=ws_app.run_forever, daemon=True)
    ws_thread.start()

    try:
        # Keep main thread alive until interrupted
        while ws_thread.is_alive():
            time.sleep(0.1)
    except KeyboardInterrupt:
        print("\nCtrl+C received. Stopping...")
        stop_event.set()  # Signal audio thread to stop

        # Send termination message to the server
        if ws_app and ws_app.sock and ws_app.sock.connected:
            try:
                terminate_message = {"type": "Terminate"}
                print(f"Sending termination message: {json.dumps(terminate_message)}")
                ws_app.send(json.dumps(terminate_message))
                # Give the final messages up to 5s to arrive. Joining, not
                # sleeping, returns as soon as the WebSocket thread ends.
                ws_thread.join(timeout=5.0)
            except Exception as e:
                print(f"Error sending termination message: {e}")

        # Close the WebSocket connection (will trigger on_close)
        if ws_app:
            ws_app.close()

        # Wait for WebSocket thread to finish
        ws_thread.join(timeout=2.0)

    except Exception as e:
        print(f"\nAn unexpected error occurred: {e}")
        stop_event.set()
        if ws_app:
            ws_app.close()
        ws_thread.join(timeout=2.0)

    finally:
        # Fallback cleanup; on_close normally did this and reset the globals.
        if stream:
            try:
                if stream.is_active():
                    stream.stop_stream()
                stream.close()
            except Exception as e:
                # on_close may have closed the stream concurrently
                print(f"Error closing audio stream: {e}")
            stream = None
        if audio:
            audio.terminate()
            audio = None
        print("Cleanup complete. Exiting.")


if __name__ == "__main__":
    run()
You can press Ctrl+C to stop the transcription.
Troubleshooting
-
You need to select BlackHole as your system output device for the audio to be piped correctly
-
If you still need to hear the audio, you can create a multi-output device on Mac that sends audio to both BlackHole and your speakers/headphones. Here’s how to set it up: Open “Audio MIDI Setup” (you can find this by searching in Spotlight). Click the “+” button in the bottom left corner and choose “Create Multi-Output Device”. In the list on the right, check both your regular output (e.g., “MacBook Pro Speakers”) and “BlackHole 2ch”. Optionally, rename this new device to something like “BlackHole + Speakers”. Then select the new multi-output device as your system output so that audio reaches both destinations. You may need to modify your script to search for this new device.