Transcribe System Audio in Real-Time (macOS)

This guide shows how to transcribe system audio, which is useful for transcribing media content or online calls. By using a virtual audio device, you’ll learn how to pipe system audio to AssemblyAI’s streaming transcription API. The code in this guide targets macOS; the same approach works on Windows with a different virtual audio device (see below).

The key to success lies in creating a virtual input device that captures your speaker output and converts it into an input stream. This approach allows you to bypass the limitations of direct system audio access.

For Mac Users: We recommend using BlackHole, a free open-source tool available through Homebrew. BlackHole creates a virtual audio device that can route your system audio to AssemblyAI’s API seamlessly.

For Windows Users: Virtual Audio Cable (VAC) is a popular option. While we don’t provide specific Windows instructions in this guide, VAC offers similar functionality to BlackHole for the Windows environment.

Quickstart

1import pyaudio
2import websocket
3import json
4import threading
5import time
6from urllib.parse import urlencode
7from datetime import datetime
8
# --- Configuration ---
YOUR_API_KEY = "YOUR_API_KEY"  # Replace with your actual API key

CONNECTION_PARAMS = {
    "sample_rate": 16000,
    "format_turns": True,  # Request formatted final transcripts
}
API_ENDPOINT_BASE_URL = "wss://streaming.assemblyai.com/v3/ws"
API_ENDPOINT = f"{API_ENDPOINT_BASE_URL}?{urlencode(CONNECTION_PARAMS)}"

# Audio configuration
SAMPLE_RATE = CONNECTION_PARAMS["sample_rate"]
CHUNK_DURATION_MS = 50  # Length of each audio chunk read from the device
# Derived from the sample rate so the chunk stays 50 ms if the rate changes
# (800 frames at 16000 Hz).
FRAMES_PER_BUFFER = SAMPLE_RATE * CHUNK_DURATION_MS // 1000
CHANNELS = 1
FORMAT = pyaudio.paInt16

# Global state shared between the main thread, the WebSocket thread and the
# audio thread.
audio = None  # pyaudio.PyAudio instance, created in run()
stream = None  # Open input stream on the BlackHole device
ws_app = None  # websocket.WebSocketApp instance
audio_thread = None  # Thread that reads audio and sends it to the server
stop_event = threading.Event()  # To signal the audio thread to stop

# Optional buffer of captured audio chunks. Nothing in this script writes
# these to disk; guard access with recording_lock if you use it.
recorded_frames = []
recording_lock = threading.Lock()  # Thread-safe access to recorded_frames
# --- BlackHole Device Detection ---

def get_blackhole_device_index():
    """Find the BlackHole audio device index.

    Prints every audio device PyAudio reports, then returns the index of the
    device whose name starts with "BlackHole" and that exposes at least one
    input channel. If several devices match, the last one listed is returned.

    Returns:
        The device index, or None when no BlackHole input device is present.
    """
    p = pyaudio.PyAudio()
    blackhole_index = None

    try:
        print("Available audio devices:")

        for i in range(p.get_device_count()):
            dev_info = p.get_device_info_by_index(i)
            print(f" {i}: {dev_info['name']} (inputs: {dev_info['maxInputChannels']})")

            if str(dev_info['name']).startswith('BlackHole') and dev_info['maxInputChannels'] > 0:
                blackhole_index = i
                print(f" -> Found BlackHole device at index {i}")
    finally:
        # Release PortAudio even if enumerating devices raises.
        p.terminate()

    return blackhole_index
# --- WebSocket Event Handlers ---

def on_open(ws):
    """Called when the WebSocket connection is established.

    Starts a daemon thread that reads audio chunks from the global ``stream``
    and sends each one to the server as a binary message. The thread runs
    until ``stop_event`` is set or a read/send fails.

    Args:
        ws: The connected WebSocket used to send the audio.
    """
    global audio_thread

    print("WebSocket connection opened.")
    print(f"Connected to: {API_ENDPOINT}")

    def stream_audio():
        print("Starting audio streaming...")
        while not stop_event.is_set():
            try:
                audio_data = stream.read(FRAMES_PER_BUFFER, exception_on_overflow=False)

                # Chunks are forwarded immediately and not retained: keeping
                # every chunk in memory would grow without bound over a long
                # session.
                ws.send(audio_data, websocket.ABNF.OPCODE_BINARY)
            except Exception as e:
                # A failed read/send after shutdown was requested is expected
                # (the stream or socket is being closed), so only report
                # errors that happen while we are still meant to be streaming.
                if not stop_event.is_set():
                    print(f"Error streaming audio: {e}")
                break
        print("Audio streaming stopped.")

    # Daemon thread: allows the main thread to exit even if this is running.
    audio_thread = threading.Thread(target=stream_audio, daemon=True)
    audio_thread.start()
def on_message(ws, message):
    """Handle one JSON message received from the streaming API.

    Dispatches on the message's ``type`` field:

    * ``"Begin"``: prints the session id and expiry time.
    * ``"Turn"``: prints the transcript. Partial results overwrite the current
      terminal line; a turn with ``end_of_turn`` set is printed on its own line.
    * ``"Termination"``: prints the reported audio and session durations.

    Other message types are ignored. Errors are printed rather than raised so
    a single bad message does not stop the WebSocket loop.

    Args:
        ws: The WebSocket the message arrived on (unused).
        message: The raw JSON text of the message.
    """
    try:
        data = json.loads(message)
        msg_type = data.get('type')

        if msg_type == "Begin":
            session_id = data.get('id')
            expires_at = data.get('expires_at')
            # fromtimestamp() rejects None, so fall back to a placeholder
            # instead of reporting the whole message as an error.
            expires = datetime.fromtimestamp(expires_at) if expires_at is not None else "unknown"
            print(f"\nSession began: ID={session_id}, ExpiresAt={expires}")
        elif msg_type == "Turn":
            transcript = data.get('transcript', '')
            if data.get('end_of_turn'):
                # Blank out the partial transcript before printing the final one.
                print('\r' + ' ' * 80 + '\r', end='')
                print(transcript)
            else:
                # No newline is written, so flush explicitly; otherwise a
                # line-buffered terminal would not show partials in real time.
                print(f"\r{transcript}", end='', flush=True)
        elif msg_type == "Termination":
            audio_duration = data.get('audio_duration_seconds', 0)
            session_duration = data.get('session_duration_seconds', 0)
            print(f"\nSession Terminated: Audio Duration={audio_duration}s, Session Duration={session_duration}s")
    except json.JSONDecodeError as e:
        print(f"Error decoding message: {e}")
    except Exception as e:
        print(f"Error handling message: {e}")
def on_error(ws, error):
    """Report a WebSocket error and ask the audio thread to stop.

    Args:
        ws: The WebSocket that raised the error (unused).
        error: The error reported by the WebSocket client.
    """
    print(f"\nWebSocket Error: {error}")
    # Signal the streaming loop to stop after an error.
    stop_event.set()
def on_close(ws, close_status_code, close_msg):
    """Called when the WebSocket connection is closed.

    Stops the audio thread, then releases the global audio stream and the
    PyAudio instance and resets both globals to None.

    Args:
        ws: The WebSocket that was closed (unused).
        close_status_code: Close status code reported by the WebSocket client.
        close_msg: Close message reported by the WebSocket client.
    """
    global stream, audio

    print(f"\nWebSocket Disconnected: Status={close_status_code}, Msg={close_msg}")

    stop_event.set()  # Signal audio thread just in case it's still running

    # Wait for the audio thread BEFORE touching the stream, so it is not
    # inside stream.read() while the stream is being closed.
    if audio_thread and audio_thread.is_alive():
        audio_thread.join(timeout=1.0)

    # Ensure audio resources are released
    if stream:
        if stream.is_active():
            stream.stop_stream()
        stream.close()
        stream = None
    if audio:
        audio.terminate()
        audio = None
# --- Main Execution ---
def run():
    """Capture system audio from BlackHole and stream it for transcription.

    Locates the BlackHole input device, opens a PyAudio input stream on it,
    connects to the streaming endpoint and blocks until the WebSocket thread
    ends or Ctrl+C is pressed. On Ctrl+C a Terminate message is sent before
    the socket is closed. Audio resources are always released before
    returning.
    """
    global audio, stream, ws_app

    # Find BlackHole device
    blackhole_index = get_blackhole_device_index()

    if blackhole_index is None:
        print("Error: BlackHole audio device not found!")
        print("Please install BlackHole from https://existential.audio/blackhole/")
        return

    # Initialize PyAudio
    audio = pyaudio.PyAudio()

    # Open BlackHole audio stream
    try:
        stream = audio.open(
            input=True,
            input_device_index=blackhole_index,  # Use BlackHole device
            frames_per_buffer=FRAMES_PER_BUFFER,
            channels=CHANNELS,
            format=FORMAT,
            rate=SAMPLE_RATE,
        )

        print(f"BlackHole audio stream opened successfully (device index: {blackhole_index}).")
        print("Now capturing system audio through BlackHole. Press Ctrl+C to stop.")
        print("Make sure audio is routed through BlackHole for transcription.")

    except Exception as e:
        print(f"Error opening Blackhole audio stream: {e}")
        if audio:
            audio.terminate()
            audio = None
        return  # Exit if blackhole cannot be opened

    # Create WebSocketApp
    ws_app = websocket.WebSocketApp(
        API_ENDPOINT,
        header={"Authorization": YOUR_API_KEY},
        on_open=on_open,
        on_message=on_message,
        on_error=on_error,
        on_close=on_close,
    )

    # Run WebSocketApp in a separate thread to allow main thread to catch KeyboardInterrupt
    ws_thread = threading.Thread(target=ws_app.run_forever, daemon=True)
    ws_thread.start()

    try:
        # Keep main thread alive until interrupted
        while ws_thread.is_alive():
            time.sleep(0.1)
    except KeyboardInterrupt:
        print("\nCtrl+C received. Stopping...")
        stop_event.set()  # Signal audio thread to stop

        # Send termination message to the server
        if ws_app and ws_app.sock and ws_app.sock.connected:
            try:
                terminate_message = {"type": "Terminate"}
                print(f"Sending termination message: {json.dumps(terminate_message)}")
                ws_app.send(json.dumps(terminate_message))
                # Give remaining messages up to 5 seconds to arrive, but
                # return as soon as the WebSocket thread finishes instead of
                # always sleeping for the full period.
                ws_thread.join(timeout=5.0)
            except Exception as e:
                print(f"Error sending termination message: {e}")

        # Close the WebSocket connection (will trigger on_close)
        if ws_app:
            ws_app.close()

        # Wait for WebSocket thread to finish
        ws_thread.join(timeout=2.0)

    except Exception as e:
        print(f"\nAn unexpected error occurred: {e}")
        stop_event.set()
        if ws_app:
            ws_app.close()
        ws_thread.join(timeout=2.0)

    finally:
        # Final cleanup (normally already handled in on_close; this is the
        # fallback). Stop and join the audio thread first so the stream is not
        # closed while it is still being read.
        stop_event.set()
        if audio_thread and audio_thread.is_alive():
            audio_thread.join(timeout=1.0)
        if stream:
            if stream.is_active():
                stream.stop_stream()
            stream.close()
            stream = None
        if audio:
            audio.terminate()
            audio = None
        print("Cleanup complete. Exiting.")


if __name__ == "__main__":
    run()

Step-By-Step Guide

Before we begin, make sure you have an AssemblyAI account and an API key. You can sign up and get your API key from your dashboard.

Install/import Packages & Set API Key

Install the packages pyaudio and websocket-client (the latter provides the websocket module imported below).

$ pip install pyaudio websocket-client

Import packages and set your API key.

1import pyaudio
2import websocket
3import json
4import threading
5import time
6from urllib.parse import urlencode
7from datetime import datetime
8
YOUR_API_KEY = "YOUR_API_KEY"  # Replace with your actual API key

Audio Configuration & Global Variables

Set all of your audio configurations and global variables. Make sure that you have the parameter format_turns set to True.

CONNECTION_PARAMS = {
    "sample_rate": 16000,
    "format_turns": True,  # Request formatted final transcripts
}
API_ENDPOINT_BASE_URL = "wss://streaming.assemblyai.com/v3/ws"
API_ENDPOINT = f"{API_ENDPOINT_BASE_URL}?{urlencode(CONNECTION_PARAMS)}"

# Audio configuration
SAMPLE_RATE = CONNECTION_PARAMS["sample_rate"]
CHUNK_DURATION_MS = 50  # Length of each audio chunk read from the device
# Derived from the sample rate so the chunk stays 50 ms if the rate changes
# (800 frames at 16000 Hz).
FRAMES_PER_BUFFER = SAMPLE_RATE * CHUNK_DURATION_MS // 1000
CHANNELS = 1
FORMAT = pyaudio.paInt16

# Global state shared between the main thread, the WebSocket thread and the
# audio thread.
audio = None  # pyaudio.PyAudio instance, created in run()
stream = None  # Open input stream on the BlackHole device
ws_app = None  # websocket.WebSocketApp instance
audio_thread = None  # Thread that reads audio and sends it to the server
stop_event = threading.Event()  # To signal the audio thread to stop

# Optional buffer of captured audio chunks. Nothing in this script writes
# these to disk; guard access with recording_lock if you use it.
recorded_frames = []
recording_lock = threading.Lock()  # Thread-safe access to recorded_frames

Define Function to Find Blackhole Audio Device Index

Define a function called get_blackhole_device_index, which retrieves the device index for your BlackHole virtual input device.

def get_blackhole_device_index():
    """Find the BlackHole audio device index.

    Prints every audio device PyAudio reports, then returns the index of the
    device whose name starts with "BlackHole" and that exposes at least one
    input channel. If several devices match, the last one listed is returned.

    Returns:
        The device index, or None when no BlackHole input device is present.
    """
    p = pyaudio.PyAudio()
    blackhole_index = None

    try:
        print("Available audio devices:")

        for i in range(p.get_device_count()):
            dev_info = p.get_device_info_by_index(i)
            print(f" {i}: {dev_info['name']} (inputs: {dev_info['maxInputChannels']})")

            if str(dev_info['name']).startswith('BlackHole') and dev_info['maxInputChannels'] > 0:
                blackhole_index = i
                print(f" -> Found BlackHole device at index {i}")
    finally:
        # Release PortAudio even if enumerating devices raises.
        p.terminate()

    return blackhole_index

Websocket Event Handlers

Open Websocket

def on_open(ws):
    """Called when the WebSocket connection is established.

    Starts a daemon thread that reads audio chunks from the global ``stream``
    and sends each one to the server as a binary message. The thread runs
    until ``stop_event`` is set or a read/send fails.

    Args:
        ws: The connected WebSocket used to send the audio.
    """
    global audio_thread

    print("WebSocket connection opened.")
    print(f"Connected to: {API_ENDPOINT}")

    def stream_audio():
        print("Starting audio streaming...")
        while not stop_event.is_set():
            try:
                audio_data = stream.read(FRAMES_PER_BUFFER, exception_on_overflow=False)

                # Chunks are forwarded immediately and not retained: keeping
                # every chunk in memory would grow without bound over a long
                # session.
                ws.send(audio_data, websocket.ABNF.OPCODE_BINARY)
            except Exception as e:
                # A failed read/send after shutdown was requested is expected
                # (the stream or socket is being closed), so only report
                # errors that happen while we are still meant to be streaming.
                if not stop_event.is_set():
                    print(f"Error streaming audio: {e}")
                break
        print("Audio streaming stopped.")

    # Daemon thread: allows the main thread to exit even if this is running.
    audio_thread = threading.Thread(target=stream_audio, daemon=True)
    audio_thread.start()

Handle Websocket Messages

def on_message(ws, message):
    """Handle one JSON message received from the streaming API.

    Dispatches on the message's ``type`` field:

    * ``"Begin"``: prints the session id and expiry time.
    * ``"Turn"``: prints the transcript. Partial results overwrite the current
      terminal line; a turn with ``end_of_turn`` set is printed on its own line.
    * ``"Termination"``: prints the reported audio and session durations.

    Other message types are ignored. Errors are printed rather than raised so
    a single bad message does not stop the WebSocket loop.

    Args:
        ws: The WebSocket the message arrived on (unused).
        message: The raw JSON text of the message.
    """
    try:
        data = json.loads(message)
        msg_type = data.get('type')

        if msg_type == "Begin":
            session_id = data.get('id')
            expires_at = data.get('expires_at')
            # fromtimestamp() rejects None, so fall back to a placeholder
            # instead of reporting the whole message as an error.
            expires = datetime.fromtimestamp(expires_at) if expires_at is not None else "unknown"
            print(f"\nSession began: ID={session_id}, ExpiresAt={expires}")
        elif msg_type == "Turn":
            transcript = data.get('transcript', '')
            if data.get('end_of_turn'):
                # Blank out the partial transcript before printing the final one.
                print('\r' + ' ' * 80 + '\r', end='')
                print(transcript)
            else:
                # No newline is written, so flush explicitly; otherwise a
                # line-buffered terminal would not show partials in real time.
                print(f"\r{transcript}", end='', flush=True)
        elif msg_type == "Termination":
            audio_duration = data.get('audio_duration_seconds', 0)
            session_duration = data.get('session_duration_seconds', 0)
            print(f"\nSession Terminated: Audio Duration={audio_duration}s, Session Duration={session_duration}s")
    except json.JSONDecodeError as e:
        print(f"Error decoding message: {e}")
    except Exception as e:
        print(f"Error handling message: {e}")

Close Websocket

def on_close(ws, close_status_code, close_msg):
    """Called when the WebSocket connection is closed.

    Stops the audio thread, then releases the global audio stream and the
    PyAudio instance and resets both globals to None.

    Args:
        ws: The WebSocket that was closed (unused).
        close_status_code: Close status code reported by the WebSocket client.
        close_msg: Close message reported by the WebSocket client.
    """
    global stream, audio

    print(f"\nWebSocket Disconnected: Status={close_status_code}, Msg={close_msg}")

    stop_event.set()  # Signal audio thread just in case it's still running

    # Wait for the audio thread BEFORE touching the stream, so it is not
    # inside stream.read() while the stream is being closed.
    if audio_thread and audio_thread.is_alive():
        audio_thread.join(timeout=1.0)

    # Ensure audio resources are released
    if stream:
        if stream.is_active():
            stream.stop_stream()
        stream.close()
        stream = None
    if audio:
        audio.terminate()
        audio = None

Websocket Error Handling

def on_error(ws, error):
    """Report a WebSocket error and ask the audio thread to stop.

    Args:
        ws: The WebSocket that raised the error (unused).
        error: The error reported by the WebSocket client.
    """
    print(f"\nWebSocket Error: {error}")
    # Signal the streaming loop to stop after an error.
    stop_event.set()

Begin Streaming STT Transcription

Find the BlackHole device index and pass it as input_device_index when opening the audio stream.

def run():
    """Capture system audio from BlackHole and stream it for transcription.

    Locates the BlackHole input device, opens a PyAudio input stream on it,
    connects to the streaming endpoint and blocks until the WebSocket thread
    ends or Ctrl+C is pressed. On Ctrl+C a Terminate message is sent before
    the socket is closed. Audio resources are always released before
    returning.
    """
    global audio, stream, ws_app

    # Find BlackHole device index
    blackhole_index = get_blackhole_device_index()

    if blackhole_index is None:
        print("Error: BlackHole audio device not found!")
        print("Please install BlackHole from https://existential.audio/blackhole/")
        return

    # Initialize PyAudio
    audio = pyaudio.PyAudio()

    # Open BlackHole audio stream
    try:
        stream = audio.open(
            input=True,
            input_device_index=blackhole_index,  # Use BlackHole device
            frames_per_buffer=FRAMES_PER_BUFFER,
            channels=CHANNELS,
            format=FORMAT,
            rate=SAMPLE_RATE,
        )

        print(f"BlackHole audio stream opened successfully (device index: {blackhole_index}).")
        print("Now capturing system audio through BlackHole. Press Ctrl+C to stop.")
        print("Make sure audio is routed through BlackHole for transcription.")

    except Exception as e:
        print(f"Error opening Blackhole audio stream: {e}")
        if audio:
            audio.terminate()
            audio = None
        return  # Exit if blackhole cannot be opened

    # Create WebSocketApp
    ws_app = websocket.WebSocketApp(
        API_ENDPOINT,
        header={"Authorization": YOUR_API_KEY},
        on_open=on_open,
        on_message=on_message,
        on_error=on_error,
        on_close=on_close,
    )

    # Run WebSocketApp in a separate thread to allow main thread to catch KeyboardInterrupt
    ws_thread = threading.Thread(target=ws_app.run_forever, daemon=True)
    ws_thread.start()

    try:
        # Keep main thread alive until interrupted
        while ws_thread.is_alive():
            time.sleep(0.1)
    except KeyboardInterrupt:
        print("\nCtrl+C received. Stopping...")
        stop_event.set()  # Signal audio thread to stop

        # Send termination message to the server
        if ws_app and ws_app.sock and ws_app.sock.connected:
            try:
                terminate_message = {"type": "Terminate"}
                print(f"Sending termination message: {json.dumps(terminate_message)}")
                ws_app.send(json.dumps(terminate_message))
                # Give remaining messages up to 5 seconds to arrive, but
                # return as soon as the WebSocket thread finishes instead of
                # always sleeping for the full period.
                ws_thread.join(timeout=5.0)
            except Exception as e:
                print(f"Error sending termination message: {e}")

        # Close the WebSocket connection (will trigger on_close)
        if ws_app:
            ws_app.close()

        # Wait for WebSocket thread to finish
        ws_thread.join(timeout=2.0)

    except Exception as e:
        print(f"\nAn unexpected error occurred: {e}")
        stop_event.set()
        if ws_app:
            ws_app.close()
        ws_thread.join(timeout=2.0)

    finally:
        # Final cleanup (normally already handled in on_close; this is the
        # fallback). Stop and join the audio thread first so the stream is not
        # closed while it is still being read.
        stop_event.set()
        if audio_thread and audio_thread.is_alive():
            audio_thread.join(timeout=1.0)
        if stream:
            if stream.is_active():
                stream.stop_stream()
            stream.close()
            stream = None
        if audio:
            audio.terminate()
            audio = None
        print("Cleanup complete. Exiting.")


if __name__ == "__main__":
    run()

You can press Ctrl+C to stop the transcription.

Troubleshooting

  • You need to select BlackHole as your system output device for the audio to be piped correctly

  • If you still need to hear the audio, you can create a multi-output device on Mac that sends audio to both BlackHole and your speakers/headphones. Here’s how to set it up: Open “Audio MIDI Setup” (you can find this by searching in Spotlight). Click the “+” button in the bottom left corner and choose “Create Multi-Output Device”. In the list on the right, check both your regular output (e.g., “MacBook Pro Speakers”) and “BlackHole 2ch”. Optionally, rename this new device to something like “BlackHole + Speakers”, then select it as your system output device. You may need to modify your script to search for this new device.