Keyterms prompting for Universal-Streaming

The keyterms prompting feature helps improve recognition accuracy for specific words and phrases that are important to your use case.

Keyterms Prompting costs an additional $0.04/hour.

Quickstart

First, install the required dependencies.

$pip install websocket-client pyaudio
1import pyaudio
2import websocket
3import json
4import threading
5import time
6import wave
7from urllib.parse import urlencode
8from datetime import datetime
9
# --- Configuration ---
YOUR_API_KEY = "YOUR-API-KEY"  # Replace with your actual API key

CONNECTION_PARAMS = {
    "sample_rate": 16000,
    "format_turns": True,  # Request formatted final transcripts (also enables turn-level keyterm boosting)
    # Keyterms are passed as a JSON-encoded list in the query string;
    # max 100 terms per session, each term 50 characters or less.
    "keyterms_prompt": json.dumps(["Keanu Reeves", "AssemblyAI", "Universal-2"])
}
API_ENDPOINT_BASE_URL = "wss://streaming.assemblyai.com/v3/ws"
API_ENDPOINT = f"{API_ENDPOINT_BASE_URL}?{urlencode(CONNECTION_PARAMS)}"

# Audio Configuration
FRAMES_PER_BUFFER = 800  # 50ms of audio (0.05s * 16000Hz)
SAMPLE_RATE = CONNECTION_PARAMS["sample_rate"]
CHANNELS = 1
FORMAT = pyaudio.paInt16  # 16-bit signed PCM

# Global variables for audio stream and websocket
audio = None        # pyaudio.PyAudio instance (created in run())
stream = None       # microphone input stream
ws_app = None       # websocket.WebSocketApp instance
audio_thread = None
stop_event = threading.Event()  # To signal the audio thread to stop

# WAV recording variables
recorded_frames = []  # Store audio frames for WAV file
recording_lock = threading.Lock()  # Thread-safe access to recorded_frames
37
38# --- WebSocket Event Handlers ---
39
40
def on_open(ws):
    """Called when the WebSocket connection is established.

    Spawns a daemon thread that continuously reads microphone chunks and
    forwards them to the server as binary WebSocket frames.
    """
    print("WebSocket connection opened.")
    print(f"Connected to: {API_ENDPOINT}")

    # Start sending audio data in a separate thread
    def stream_audio():
        # Loops until stop_event is set or the stream read raises
        # (e.g. because on_close already closed the stream).
        global stream
        print("Starting audio streaming...")
        while not stop_event.is_set():
            try:
                audio_data = stream.read(FRAMES_PER_BUFFER, exception_on_overflow=False)

                # Store audio data for WAV recording
                with recording_lock:
                    recorded_frames.append(audio_data)

                # Send audio data as binary message
                ws.send(audio_data, websocket.ABNF.OPCODE_BINARY)
            except Exception as e:
                print(f"Error streaming audio: {e}")
                # If stream read fails, likely means it's closed, stop the loop
                break
        print("Audio streaming stopped.")

    global audio_thread
    audio_thread = threading.Thread(target=stream_audio)
    audio_thread.daemon = (
        True  # Allow main thread to exit even if this thread is running
    )
    audio_thread.start()
72
def on_message(ws, message):
    """Print session lifecycle events and transcripts sent by the server."""
    try:
        payload = json.loads(message)
        kind = payload.get('type')

        if kind == "Begin":
            sid = payload.get('id')
            expiry = payload.get('expires_at')
            print(f"\nSession began: ID={sid}, ExpiresAt={datetime.fromtimestamp(expiry)}")
        elif kind == "Turn":
            text = payload.get('transcript', '')
            if payload.get('turn_is_formatted', False):
                # Blank out the in-progress partial line, then print the
                # finalized, formatted transcript on its own line.
                print('\r' + ' ' * 80 + '\r', end='')
                print(text)
            else:
                # Partial transcript: overwrite the current console line.
                print(f"\r{text}", end='')
        elif kind == "Termination":
            audio_secs = payload.get('audio_duration_seconds', 0)
            session_secs = payload.get('session_duration_seconds', 0)
            print(f"\nSession Terminated: Audio Duration={audio_secs}s, Session Duration={session_secs}s")
    except json.JSONDecodeError as e:
        print(f"Error decoding message: {e}")
    except Exception as e:
        print(f"Error handling message: {e}")
100
def on_error(ws, error):
    """Log a WebSocket error and request shutdown of the audio thread."""
    print(f"\nWebSocket Error: {error}")
    # Best-effort: ask the streaming thread to stop sending audio.
    stop_event.set()
106
107
def on_close(ws, close_status_code, close_msg):
    """Called when the WebSocket connection is closed.

    Order matters here: the WAV file is written first (it reads
    recorded_frames), then the PyAudio stream and instance are released.
    """
    print(f"\nWebSocket Disconnected: Status={close_status_code}, Msg={close_msg}")

    # Save recorded audio to WAV file
    save_wav_file()

    # Ensure audio resources are released
    global stream, audio
    stop_event.set()  # Signal audio thread just in case it's still running

    if stream:
        if stream.is_active():
            stream.stop_stream()
        stream.close()
        stream = None
    if audio:
        audio.terminate()
        audio = None
    # Try to join the audio thread to ensure clean exit
    if audio_thread and audio_thread.is_alive():
        audio_thread.join(timeout=1.0)
130
131
def save_wav_file():
    """Save recorded audio frames to a timestamped WAV file.

    Reads the module-level ``recorded_frames`` buffer under
    ``recording_lock`` and writes it as 16-bit mono PCM using the module's
    audio configuration constants. No-op when nothing was recorded.
    """
    if not recorded_frames:
        print("No audio data recorded.")
        return

    # Generate filename with timestamp so repeated runs don't overwrite
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    filename = f"recorded_audio_{timestamp}.wav"

    try:
        with wave.open(filename, 'wb') as wf:
            wf.setnchannels(CHANNELS)
            wf.setsampwidth(2)  # 16-bit = 2 bytes
            wf.setframerate(SAMPLE_RATE)

            # Write all recorded frames under the lock so the audio thread
            # cannot append while the buffer is being serialized.
            with recording_lock:
                wf.writeframes(b''.join(recorded_frames))

        # Bug fix: report the actual output path (was a hard-coded
        # "(unknown)" placeholder that ignored the computed filename).
        print(f"Audio saved to: {filename}")
        print(f"Duration: {len(recorded_frames) * FRAMES_PER_BUFFER / SAMPLE_RATE:.2f} seconds")

    except Exception as e:
        print(f"Error saving WAV file: {e}")
157
158
159# --- Main Execution ---
160def run():
161 global audio, stream, ws_app
162
163 # Initialize PyAudio
164 audio = pyaudio.PyAudio()
165
166 # Open microphone stream
167 try:
168 stream = audio.open(
169 input=True,
170 frames_per_buffer=FRAMES_PER_BUFFER,
171 channels=CHANNELS,
172 format=FORMAT,
173 rate=SAMPLE_RATE,
174 )
175 print("Microphone stream opened successfully.")
176 print("Speak into your microphone. Press Ctrl+C to stop.")
177 print("Audio will be saved to a WAV file when the session ends.")
178 except Exception as e:
179 print(f"Error opening microphone stream: {e}")
180 if audio:
181 audio.terminate()
182 return # Exit if microphone cannot be opened
183
184 # Create WebSocketApp
185 ws_app = websocket.WebSocketApp(
186 API_ENDPOINT,
187 header={"Authorization": YOUR_API_KEY},
188 on_open=on_open,
189 on_message=on_message,
190 on_error=on_error,
191 on_close=on_close,
192 )
193
194 # Run WebSocketApp in a separate thread to allow main thread to catch KeyboardInterrupt
195 ws_thread = threading.Thread(target=ws_app.run_forever)
196 ws_thread.daemon = True
197 ws_thread.start()
198
199 try:
200 # Keep main thread alive until interrupted
201 while ws_thread.is_alive():
202 time.sleep(0.1)
203 except KeyboardInterrupt:
204 print("\nCtrl+C received. Stopping...")
205 stop_event.set() # Signal audio thread to stop
206
207 # Send termination message to the server
208 if ws_app and ws_app.sock and ws_app.sock.connected:
209 try:
210 terminate_message = {"type": "Terminate"}
211 print(f"Sending termination message: {json.dumps(terminate_message)}")
212 ws_app.send(json.dumps(terminate_message))
213 # Give a moment for messages to process before forceful close
214 time.sleep(5)
215 except Exception as e:
216 print(f"Error sending termination message: {e}")
217
218 # Close the WebSocket connection (will trigger on_close)
219 if ws_app:
220 ws_app.close()
221
222 # Wait for WebSocket thread to finish
223 ws_thread.join(timeout=2.0)
224
225 except Exception as e:
226 print(f"\nAn unexpected error occurred: {e}")
227 stop_event.set()
228 if ws_app:
229 ws_app.close()
230 ws_thread.join(timeout=2.0)
231
232 finally:
233 # Final cleanup (already handled in on_close, but good as a fallback)
234 if stream and stream.is_active():
235 stream.stop_stream()
236 if stream:
237 stream.close()
238 if audio:
239 audio.terminate()
240 print("Cleanup complete. Exiting.")
241
242
# Script entry point: only start streaming when run directly, not on import.
if __name__ == "__main__":
    run()

Configuration

To utilize keyterms prompting, you need to include your desired keyterms as query parameters in the WebSocket URL.

  • You can include a maximum of 100 keyterms per session.
  • Each individual keyterm string must be 50 characters or less in length.

How it works

Streaming Keyterms Prompting has two components to improve accuracy for your terms.

Word-level boosting

The streaming model itself is biased during inference to be more accurate at identifying words from your keyterms list. This happens in real-time as words are emitted during the streaming process, providing immediate improvements to recognition accuracy. This component is enabled by default.

Turn-level boosting

After each turn is completed, an additional boosting pass analyzes the full transcript using your keyterms list. This post-processing step, similar to formatting, provides a second layer of accuracy improvement by examining the complete context of the turn. To enable this component, set format_turns to True.

Both stages work together to maximize recognition accuracy for your keyterms throughout the streaming process.

Dynamic keyterms prompting

Dynamic keyterms prompting allows you to update keyterms during an active streaming session using the UpdateConfiguration message. This enables you to adapt the recognition context in real-time based on conversation flow or changing requirements.

Updating keyterms during a session

To update keyterms while streaming, send an UpdateConfiguration message with a new keyterms_prompt array:

# NOTE(review): `websocket` here appears to denote an open connection object
# (e.g. the `ws`/`ws_app` from the quickstart), not the websocket-client
# module — confirm against the quickstart code, which imports `websocket`
# as the module.

# Replace or establish new set of keyterms
websocket.send('{"type": "UpdateConfiguration", "keyterms_prompt": ["Universal-3"]}')

# Remove keyterms and reset context biasing
websocket.send('{"type": "UpdateConfiguration", "keyterms_prompt": []}')

How dynamic keyterms work

When you send an UpdateConfiguration message:

  • Replacing keyterms: Providing a new array of keyterms completely replaces the existing set. The new keyterms take effect immediately for subsequent audio processing.
  • Clearing keyterms: Sending an empty array [] removes all keyterms and resets context biasing to the default state.
  • Both boosting stages: Dynamic keyterms work with both word-level boosting (native context biasing) and turn-level boosting (metaphone-based), just like initial keyterms.

Use cases for dynamic keyterms

Dynamic keyterms are particularly useful for:

  • Context-aware voice agents: Update keyterms based on conversation stage (e.g., switching from menu items to payment terms)
  • Multi-topic conversations: Adapt vocabulary as the conversation topic changes
  • Progressive disclosure: Add relevant keyterms as new information becomes available
  • Cleanup: Remove keyterms that are no longer relevant to reduce processing overhead

Important notes

  • Keyterms longer than 50 characters are ignored.
  • Requests containing more than 100 keyterms will result in an error.

Best practices

To maximize the effectiveness of keyterms prompting:

  • Specify Unique Terminology: Include proper names, company names, technical terms, or vocabulary specific to your domain that might not be commonly recognized.
  • Exact Spelling and Capitalization: Provide keyterms with the precise spelling and capitalization you expect to see in the output transcript. This helps the system accurately identify the terms.
  • Avoid Common Words: Do not include single, common English words (e.g., “information”) as keyterms. The system is generally proficient with such words, and adding them as keyterms can be redundant.