Apply LLM Gateway to Streaming

Learn how to analyze streaming audio transcripts using LLM Gateway.

Overview

A Large Language Model (LLM) is a machine learning model that uses natural language processing (NLP) to generate text. LLM Gateway is a unified API that provides access to 20+ models from Claude, GPT, Gemini, and more through a single interface. You can use LLM Gateway to analyze streaming audio transcripts in real time, for example to summarize a live conversation or extract action items as they happen.

By the end of this tutorial, you’ll be able to use LLM Gateway to analyze a streaming audio transcript from your microphone.

Here’s the full sample code for what you’ll build in this tutorial:

import pyaudio
import websocket
import json
import threading
import time
import wave
from urllib.parse import urlencode
from datetime import datetime

# --- Configuration ---
YOUR_API_KEY = "<YOUR_API_KEY>"  # Replace with your actual API key

# LLM Gateway Configuration
PROMPT = "Provide a brief summary of the transcript.\n\nTranscript: {{turn}}"
LLM_GATEWAY_CONFIG = {
    "model": "claude-sonnet-4-20250514",
    "messages": [
        {"role": "user", "content": PROMPT}
    ],
    "max_tokens": 4000
}

CONNECTION_PARAMS = {
    "sample_rate": 16000,
    "format_turns": True,  # Request formatted final transcripts
    "speech_model": "u3-rt-pro",  # USM 3 Pro model
    "llm_gateway": json.dumps(LLM_GATEWAY_CONFIG)  # LLM Gateway configuration
}
API_ENDPOINT_BASE_URL = "wss://streaming.assemblyai.com/v3/ws"
API_ENDPOINT = f"{API_ENDPOINT_BASE_URL}?{urlencode(CONNECTION_PARAMS)}"

# Audio Configuration
FRAMES_PER_BUFFER = 800  # 50ms of audio (0.05s * 16000Hz)
SAMPLE_RATE = CONNECTION_PARAMS["sample_rate"]
CHANNELS = 1
FORMAT = pyaudio.paInt16

# Global variables for audio stream and websocket
audio = None
stream = None
ws_app = None
audio_thread = None
stop_event = threading.Event()  # To signal the audio thread to stop

# WAV recording variables
recorded_frames = []  # Store audio frames for WAV file
recording_lock = threading.Lock()  # Thread-safe access to recorded_frames
48
def save_wav_file():
    """Save recorded audio frames to a timestamped WAV file in the current directory."""
    # Snapshot the frames under the lock so the audio thread cannot append
    # while we check emptiness, compute the duration, or join the bytes.
    with recording_lock:
        frames = list(recorded_frames)

    if not frames:
        print("No audio data recorded.")
        return

    # Generate filename with timestamp
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    filename = f"recorded_audio_{timestamp}.wav"

    try:
        with wave.open(filename, 'wb') as wf:
            wf.setnchannels(CHANNELS)
            wf.setsampwidth(2)  # 16-bit = 2 bytes
            wf.setframerate(SAMPLE_RATE)
            # Write all recorded frames
            wf.writeframes(b''.join(frames))

        print(f"Audio saved to: {filename}")
        # Each frame holds FRAMES_PER_BUFFER samples, so duration = frames * samples / rate
        print(f"Duration: {len(frames) * FRAMES_PER_BUFFER / SAMPLE_RATE:.2f} seconds")

    except Exception as e:
        print(f"Error saving WAV file: {e}")
74
# --- WebSocket Event Handlers ---

def on_open(ws):
    """Called when the WebSocket connection is established; starts the audio sender thread."""
    print("WebSocket connection opened.")
    print(f"Connected to: {API_ENDPOINT_BASE_URL}")

    # Start sending audio data in a separate thread
    def stream_audio():
        global stream
        print("Starting audio streaming...")
        while not stop_event.is_set():
            try:
                audio_data = stream.read(FRAMES_PER_BUFFER, exception_on_overflow=False)

                # Store audio data for WAV recording
                with recording_lock:
                    recorded_frames.append(audio_data)

                # Send audio data as binary message
                ws.send(audio_data, websocket.ABNF.OPCODE_BINARY)
            except Exception as e:
                print(f"Error streaming audio: {e}")
                # If stream read fails, likely means it's closed, stop the loop
                break
        print("Audio streaming stopped.")

    global audio_thread
    audio_thread = threading.Thread(target=stream_audio)
    audio_thread.daemon = True  # Allow main thread to exit even if this thread is running
    audio_thread.start()
108
def on_message(ws, message):
    """Parse a JSON message from the streaming API and print the relevant fields.

    Handles Begin, Turn, LLMGatewayResponse, and Termination message types;
    any other type is silently ignored. Malformed JSON is reported, not raised.
    """
    try:
        data = json.loads(message)
        msg_type = data.get('type')

        if msg_type == "Begin":
            session_id = data.get('id')
            expires_at = data.get('expires_at')  # available but unused here
            print(f"Session started: {session_id}")
        elif msg_type == "Turn":
            end_of_turn = data.get('end_of_turn', False)

            # Only print finalized turns, not partial transcripts
            if end_of_turn:
                transcript = data.get('transcript', '')
                print(f"\nTranscript:\n{transcript}\n")
        elif msg_type == "LLMGatewayResponse":
            # Extract the LLM response content (chat-completions response shape)
            llm_data = data.get('data', {})
            llm_content = llm_data.get("choices", [{}])[0].get("message", {}).get("content", "")
            print(f"LLM Response:\n{llm_content}\n")
        elif msg_type == "Termination":
            audio_duration = data.get('audio_duration_seconds', 0)
            session_duration = data.get('session_duration_seconds', 0)  # available but unused here
            print(f"Session terminated: {audio_duration} seconds of audio processed")
    except json.JSONDecodeError as e:
        print(f"Error decoding message: {e}")
    except Exception as e:
        print(f"Error handling message: {e}")
137
def on_error(ws, error):
    """Called when a WebSocket error occurs."""
    print(f"\nWebSocket Error: {error}")
    # Attempt to signal stop on error
    stop_event.set()
143
144
def on_close(ws, close_status_code, close_msg):
    """Called when the WebSocket connection is closed; saves audio and releases resources."""
    print(f"\nWebSocket Disconnected: Status={close_status_code}, Msg={close_msg}")

    # Save recorded audio to WAV file
    save_wav_file()

    # Ensure audio resources are released
    global stream, audio
    stop_event.set()  # Signal audio thread just in case it's still running

    if stream:
        if stream.is_active():
            stream.stop_stream()
        stream.close()
        stream = None
    if audio:
        audio.terminate()
        audio = None
    # Try to join the audio thread to ensure clean exit
    if audio_thread and audio_thread.is_alive():
        audio_thread.join(timeout=1.0)
167
# --- Main Execution ---
def run():
    """Open the microphone, connect to the streaming API, and run until interrupted."""
    global audio, stream, ws_app

    # Initialize PyAudio
    audio = pyaudio.PyAudio()

    # Open microphone stream
    try:
        stream = audio.open(
            input=True,
            frames_per_buffer=FRAMES_PER_BUFFER,
            channels=CHANNELS,
            format=FORMAT,
            rate=SAMPLE_RATE,
        )
        print("Microphone stream opened successfully.")
        print("Speak into your microphone. Press Ctrl+C to stop.")
        print("Audio will be saved to a WAV file when the session ends.")
    except Exception as e:
        print(f"Error opening microphone stream: {e}")
        if audio:
            audio.terminate()
        return  # Exit if microphone cannot be opened

    # Create WebSocketApp
    ws_app = websocket.WebSocketApp(
        API_ENDPOINT,
        header={"Authorization": YOUR_API_KEY},
        on_open=on_open,
        on_message=on_message,
        on_error=on_error,
        on_close=on_close,
    )

    # Run WebSocketApp in a separate thread to allow main thread to catch KeyboardInterrupt
    ws_thread = threading.Thread(target=ws_app.run_forever)
    ws_thread.daemon = True
    ws_thread.start()

    try:
        # Keep main thread alive until interrupted
        while ws_thread.is_alive():
            time.sleep(0.1)
    except KeyboardInterrupt:
        print("\nCtrl+C received. Stopping...")
        stop_event.set()  # Signal audio thread to stop

        # Send termination message to the server
        if ws_app and ws_app.sock and ws_app.sock.connected:
            try:
                terminate_message = {"type": "Terminate"}
                print(f"Sending termination message: {json.dumps(terminate_message)}")
                ws_app.send(json.dumps(terminate_message))
                # Give a moment for messages to process before forceful close
                time.sleep(5)
            except Exception as e:
                print(f"Error sending termination message: {e}")

        # Close the WebSocket connection (will trigger on_close)
        if ws_app:
            ws_app.close()

        # Wait for WebSocket thread to finish
        ws_thread.join(timeout=2.0)

    except Exception as e:
        print(f"\nAn unexpected error occurred: {e}")
        stop_event.set()
        if ws_app:
            ws_app.close()
        ws_thread.join(timeout=2.0)

    finally:
        # Final cleanup (already handled in on_close, but good as a fallback)
        if stream and stream.is_active():
            stream.stop_stream()
        if stream:
            stream.close()
        if audio:
            audio.terminate()
        print("Cleanup complete. Exiting.")


if __name__ == "__main__":
    run()

Before you begin

To complete this tutorial, you need an AssemblyAI account and an API key.

Step 1: Install prerequisites

Install the required packages via pip:

$pip install pyaudio websocket-client

Step 2: Connect to Universal Streaming

In this step, you’ll set up a connection to the Universal Streaming API with the llm_gateway parameter. This parameter configures LLM Gateway to process your streaming transcripts.

For more information about streaming transcription, see Transcribe streaming audio.

import pyaudio
import websocket
import json
import threading
import time
import wave
from urllib.parse import urlencode
from datetime import datetime

# --- Configuration ---
YOUR_API_KEY = "<YOUR_API_KEY>"  # Replace with your actual API key

# LLM Gateway Configuration
PROMPT = "Provide a brief summary of the transcript.\n\nTranscript: {{turn}}"
LLM_GATEWAY_CONFIG = {
    "model": "claude-sonnet-4-20250514",
    "messages": [
        {"role": "user", "content": PROMPT}
    ],
    "max_tokens": 4000
}

CONNECTION_PARAMS = {
    "sample_rate": 16000,
    "format_turns": True,  # Request formatted final transcripts
    "speech_model": "u3-rt-pro",  # USM 3 Pro model
    "llm_gateway": json.dumps(LLM_GATEWAY_CONFIG)  # LLM Gateway configuration
}
API_ENDPOINT_BASE_URL = "wss://streaming.assemblyai.com/v3/ws"
API_ENDPOINT = f"{API_ENDPOINT_BASE_URL}?{urlencode(CONNECTION_PARAMS)}"

# Audio Configuration
FRAMES_PER_BUFFER = 800  # 50ms of audio (0.05s * 16000Hz)
SAMPLE_RATE = CONNECTION_PARAMS["sample_rate"]
CHANNELS = 1
FORMAT = pyaudio.paInt16

# Global variables for audio stream and websocket
audio = None
stream = None
ws_app = None
audio_thread = None
stop_event = threading.Event()  # To signal the audio thread to stop

# WAV recording variables
recorded_frames = []  # Store audio frames for WAV file
recording_lock = threading.Lock()  # Thread-safe access to recorded_frames

The llm_gateway parameter is a JSON-stringified object that follows the same interface as the LLM Gateway chat completions API. It accepts the following fields:

KeyTypeDescription
modelstringThe model to use. See Available models.
messagesarrayAn array of message objects. The content field contains your prompt.
max_tokensnumberThe maximum number of tokens to generate.

Step 3: Stream audio and analyze with LLM Gateway

In this step, you’ll stream audio from your microphone and handle the finalized transcripts and LLM Gateway analyses that the API sends back over the WebSocket connection in real time.

1

Set up the event handlers to stream audio and collect transcripts from completed turns.

def save_wav_file():
    """Save recorded audio frames to a timestamped WAV file in the current directory."""
    # Snapshot the frames under the lock so the audio thread cannot append
    # while we check emptiness, compute the duration, or join the bytes.
    with recording_lock:
        frames = list(recorded_frames)

    if not frames:
        print("No audio data recorded.")
        return

    # Generate filename with timestamp
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    filename = f"recorded_audio_{timestamp}.wav"

    try:
        with wave.open(filename, 'wb') as wf:
            wf.setnchannels(CHANNELS)
            wf.setsampwidth(2)  # 16-bit = 2 bytes
            wf.setframerate(SAMPLE_RATE)
            # Write all recorded frames
            wf.writeframes(b''.join(frames))

        print(f"Audio saved to: {filename}")
        # Each frame holds FRAMES_PER_BUFFER samples, so duration = frames * samples / rate
        print(f"Duration: {len(frames) * FRAMES_PER_BUFFER / SAMPLE_RATE:.2f} seconds")

    except Exception as e:
        print(f"Error saving WAV file: {e}")
# --- WebSocket Event Handlers ---

def on_open(ws):
    """Called when the WebSocket connection is established; starts the audio sender thread."""
    print("WebSocket connection opened.")
    print(f"Connected to: {API_ENDPOINT_BASE_URL}")

    # Start sending audio data in a separate thread
    def stream_audio():
        global stream
        print("Starting audio streaming...")
        while not stop_event.is_set():
            try:
                audio_data = stream.read(FRAMES_PER_BUFFER, exception_on_overflow=False)

                # Store audio data for WAV recording
                with recording_lock:
                    recorded_frames.append(audio_data)

                # Send audio data as binary message
                ws.send(audio_data, websocket.ABNF.OPCODE_BINARY)
            except Exception as e:
                print(f"Error streaming audio: {e}")
                # If stream read fails, likely means it's closed, stop the loop
                break
        print("Audio streaming stopped.")

    global audio_thread
    audio_thread = threading.Thread(target=stream_audio)
    audio_thread.daemon = True  # Allow main thread to exit even if this thread is running
    audio_thread.start()
def on_message(ws, message):
    """Parse a JSON message from the streaming API and print the relevant fields.

    Handles Begin, Turn, LLMGatewayResponse, and Termination message types;
    any other type is silently ignored. Malformed JSON is reported, not raised.
    """
    try:
        data = json.loads(message)
        msg_type = data.get('type')

        if msg_type == "Begin":
            session_id = data.get('id')
            expires_at = data.get('expires_at')  # available but unused here
            print(f"Session started: {session_id}")
        elif msg_type == "Turn":
            end_of_turn = data.get('end_of_turn', False)

            # Only print finalized turns, not partial transcripts
            if end_of_turn:
                transcript = data.get('transcript', '')
                print(f"\nTranscript:\n{transcript}\n")
        elif msg_type == "LLMGatewayResponse":
            # Extract the LLM response content (chat-completions response shape)
            llm_data = data.get('data', {})
            llm_content = llm_data.get("choices", [{}])[0].get("message", {}).get("content", "")
            print(f"LLM Response:\n{llm_content}\n")
        elif msg_type == "Termination":
            audio_duration = data.get('audio_duration_seconds', 0)
            session_duration = data.get('session_duration_seconds', 0)  # available but unused here
            print(f"Session terminated: {audio_duration} seconds of audio processed")
    except json.JSONDecodeError as e:
        print(f"Error decoding message: {e}")
    except Exception as e:
        print(f"Error handling message: {e}")
89
def on_error(ws, error):
    """Called when a WebSocket error occurs."""
    print(f"\nWebSocket Error: {error}")
    # Attempt to signal stop on error
    stop_event.set()
95
96
def on_close(ws, close_status_code, close_msg):
    """Called when the WebSocket connection is closed; saves audio and releases resources."""
    print(f"\nWebSocket Disconnected: Status={close_status_code}, Msg={close_msg}")

    # Save recorded audio to WAV file
    save_wav_file()

    # Ensure audio resources are released
    global stream, audio
    stop_event.set()  # Signal audio thread just in case it's still running

    if stream:
        if stream.is_active():
            stream.stop_stream()
        stream.close()
        stream = None
    if audio:
        audio.terminate()
        audio = None
    # Try to join the audio thread to ensure clean exit
    if audio_thread and audio_thread.is_alive():
        audio_thread.join(timeout=1.0)
2

Receive the LLM Gateway analysis of the transcript.

When using the raw WebSocket approach with llm_gateway in the connection parameters, LLM Gateway responses are received as LLMGatewayResponse messages through the WebSocket, handled by the on_message callback registered in the previous step. No separate API call is needed.

3

Run the streaming session and analyze the transcript with LLM Gateway when the session ends.

# --- Main Execution ---
def run():
    """Open the microphone, connect to the streaming API, and run until interrupted."""
    global audio, stream, ws_app

    # Initialize PyAudio
    audio = pyaudio.PyAudio()

    # Open microphone stream
    try:
        stream = audio.open(
            input=True,
            frames_per_buffer=FRAMES_PER_BUFFER,
            channels=CHANNELS,
            format=FORMAT,
            rate=SAMPLE_RATE,
        )
        print("Microphone stream opened successfully.")
        print("Speak into your microphone. Press Ctrl+C to stop.")
        print("Audio will be saved to a WAV file when the session ends.")
    except Exception as e:
        print(f"Error opening microphone stream: {e}")
        if audio:
            audio.terminate()
        return  # Exit if microphone cannot be opened

    # Create WebSocketApp
    ws_app = websocket.WebSocketApp(
        API_ENDPOINT,
        header={"Authorization": YOUR_API_KEY},
        on_open=on_open,
        on_message=on_message,
        on_error=on_error,
        on_close=on_close,
    )

    # Run WebSocketApp in a separate thread to allow main thread to catch KeyboardInterrupt
    ws_thread = threading.Thread(target=ws_app.run_forever)
    ws_thread.daemon = True
    ws_thread.start()

    try:
        # Keep main thread alive until interrupted
        while ws_thread.is_alive():
            time.sleep(0.1)
    except KeyboardInterrupt:
        print("\nCtrl+C received. Stopping...")
        stop_event.set()  # Signal audio thread to stop

        # Send termination message to the server
        if ws_app and ws_app.sock and ws_app.sock.connected:
            try:
                terminate_message = {"type": "Terminate"}
                print(f"Sending termination message: {json.dumps(terminate_message)}")
                ws_app.send(json.dumps(terminate_message))
                # Give a moment for messages to process before forceful close
                time.sleep(5)
            except Exception as e:
                print(f"Error sending termination message: {e}")

        # Close the WebSocket connection (will trigger on_close)
        if ws_app:
            ws_app.close()

        # Wait for WebSocket thread to finish
        ws_thread.join(timeout=2.0)

    except Exception as e:
        print(f"\nAn unexpected error occurred: {e}")
        stop_event.set()
        if ws_app:
            ws_app.close()
        ws_thread.join(timeout=2.0)

    finally:
        # Final cleanup (already handled in on_close, but good as a fallback)
        if stream and stream.is_active():
            stream.stop_stream()
        if stream:
            stream.close()
        if audio:
            audio.terminate()
        print("Cleanup complete. Exiting.")


if __name__ == "__main__":
    run()

The output will look something like this:

Session started: de5d9927-73a6-4be8-b52d-b4c07be37e6b
Transcript: Hi, my name is Sonny.
Transcript: I am a voice agent.
Ctrl+C received. Stopping...
Session terminated: 12 seconds of audio processed
LLM Response:
The speaker introduces themselves as Sonny and identifies as a voice agent.

Next steps

In this tutorial, you’ve learned how to analyze streaming audio transcripts using LLM Gateway. The type of output depends on your prompt, so try exploring different prompts to see how they affect the output. Here are a few more prompts to try:

  • “Provide an analysis of the transcript and offer areas to improve with exact quotes.”
  • “What’s the main take-away from the transcript?”
  • “Generate a set of action items from this transcript.”

To learn more about LLM Gateway and streaming, see the following resources:

Need some help?

If you get stuck, or have any other questions, we’d love to help you out. Contact our support team at support@assemblyai.com or create a support ticket.