Migrating from Streaming v2 to Streaming v3 (Python)

This cookbook guides you through migrating from AssemblyAI’s legacy Streaming STT model (v2) to our latest Universal Streaming STT model (v3), which provides ultra-low latency for faster transcription, intelligent endpointing for more natural speech detection, and improved accuracy across various audio conditions.

Check out this blog post to learn more about this new model!

Overview of changes

The migration involves several key improvements:

  • API Version: Upgrade from v2 (/v2/realtime/ws) to v3 (/v3/ws)
  • Enhanced Error Handling: Robust cleanup and resource management
  • Improved Threading: Better control over audio streaming threads
  • Modern Message Format: Updated message types and structure
  • Configuration Options: More flexible connection parameters
  • Graceful Shutdown: Proper termination handling

You can follow the step-by-step guide below to update your existing code. For reference, here is what the finished code should look like:

import pyaudio
import websocket
import json
import threading
import time
from urllib.parse import urlencode
from datetime import datetime

# --- Configuration ---
YOUR_API_KEY = "YOUR-API-KEY"  # Replace with your actual API key

CONNECTION_PARAMS = {
    "sample_rate": 16000,
    "format_turns": True,  # Request formatted final transcripts
}
API_ENDPOINT_BASE_URL = "wss://streaming.assemblyai.com/v3/ws"
API_ENDPOINT = f"{API_ENDPOINT_BASE_URL}?{urlencode(CONNECTION_PARAMS)}"

# Audio Configuration
FRAMES_PER_BUFFER = 800  # 50ms of audio (0.05s * 16000Hz)
SAMPLE_RATE = CONNECTION_PARAMS["sample_rate"]
CHANNELS = 1
FORMAT = pyaudio.paInt16

# Global variables for audio stream and websocket
audio = None
stream = None
ws_app = None
audio_thread = None
stop_event = threading.Event()  # To signal the audio thread to stop

# --- WebSocket Event Handlers ---


def on_open(ws):
    """Called when the WebSocket connection is established."""
    print("WebSocket connection opened.")
    print(f"Connected to: {API_ENDPOINT}")

    # Start sending audio data in a separate thread
    def stream_audio():
        global stream
        print("Starting audio streaming...")
        while not stop_event.is_set():
            try:
                audio_data = stream.read(FRAMES_PER_BUFFER, exception_on_overflow=False)
                # Send audio data as binary message
                ws.send(audio_data, websocket.ABNF.OPCODE_BINARY)
            except Exception as e:
                print(f"Error streaming audio: {e}")
                # If stream read fails, likely means it's closed, stop the loop
                break
        print("Audio streaming stopped.")

    global audio_thread
    audio_thread = threading.Thread(target=stream_audio)
    audio_thread.daemon = (
        True  # Allow main thread to exit even if this thread is running
    )
    audio_thread.start()


def on_message(ws, message):
    try:
        data = json.loads(message)
        msg_type = data.get('type')

        if msg_type == "Begin":
            session_id = data.get('id')
            expires_at = data.get('expires_at')
            print(f"\nSession began: ID={session_id}, ExpiresAt={datetime.fromtimestamp(expires_at)}")
        elif msg_type == "Turn":
            transcript = data.get('transcript', '')
            if data.get('end_of_turn'):
                print('\r' + ' ' * 80 + '\r', end='')
                print(transcript)
            else:
                print(f"\r{transcript}", end='')
        elif msg_type == "Termination":
            audio_duration = data.get('audio_duration_seconds', 0)
            session_duration = data.get('session_duration_seconds', 0)
            print(f"\nSession Terminated: Audio Duration={audio_duration}s, Session Duration={session_duration}s")
    except json.JSONDecodeError as e:
        print(f"Error decoding message: {e}")
    except Exception as e:
        print(f"Error handling message: {e}")


def on_error(ws, error):
    """Called when a WebSocket error occurs."""
    print(f"\nWebSocket Error: {error}")
    # Attempt to signal stop on error
    stop_event.set()


def on_close(ws, close_status_code, close_msg):
    """Called when the WebSocket connection is closed."""
    print(f"\nWebSocket Disconnected: Status={close_status_code}, Msg={close_msg}")
    # Ensure audio resources are released
    global stream, audio
    stop_event.set()  # Signal audio thread just in case it's still running

    if stream:
        if stream.is_active():
            stream.stop_stream()
        stream.close()
        stream = None
    if audio:
        audio.terminate()
        audio = None
    # Try to join the audio thread to ensure clean exit
    if audio_thread and audio_thread.is_alive():
        audio_thread.join(timeout=1.0)


# --- Main Execution ---
def run():
    global audio, stream, ws_app

    # Initialize PyAudio
    audio = pyaudio.PyAudio()

    # Open microphone stream
    try:
        stream = audio.open(
            input=True,
            frames_per_buffer=FRAMES_PER_BUFFER,
            channels=CHANNELS,
            format=FORMAT,
            rate=SAMPLE_RATE,
        )
        print("Microphone stream opened successfully.")
        print("Speak into your microphone. Press Ctrl+C to stop.")
    except Exception as e:
        print(f"Error opening microphone stream: {e}")
        if audio:
            audio.terminate()
        return  # Exit if microphone cannot be opened

    # Create WebSocketApp
    ws_app = websocket.WebSocketApp(
        API_ENDPOINT,
        header={"Authorization": YOUR_API_KEY},
        on_open=on_open,
        on_message=on_message,
        on_error=on_error,
        on_close=on_close,
    )

    # Run WebSocketApp in a separate thread to allow main thread to catch KeyboardInterrupt
    ws_thread = threading.Thread(target=ws_app.run_forever)
    ws_thread.daemon = True
    ws_thread.start()

    try:
        # Keep main thread alive until interrupted
        while ws_thread.is_alive():
            time.sleep(0.1)
    except KeyboardInterrupt:
        print("\nCtrl+C received. Stopping...")
        stop_event.set()  # Signal audio thread to stop

        # Send termination message to the server
        if ws_app and ws_app.sock and ws_app.sock.connected:
            try:
                terminate_message = {"type": "Terminate"}
                print(f"Sending termination message: {json.dumps(terminate_message)}")
                ws_app.send(json.dumps(terminate_message))
                # Give a moment for messages to process before forceful close
                time.sleep(5)
            except Exception as e:
                print(f"Error sending termination message: {e}")

        # Close the WebSocket connection (will trigger on_close)
        if ws_app:
            ws_app.close()

        # Wait for WebSocket thread to finish
        ws_thread.join(timeout=2.0)

    except Exception as e:
        print(f"\nAn unexpected error occurred: {e}")
        stop_event.set()
        if ws_app:
            ws_app.close()
        ws_thread.join(timeout=2.0)

    finally:
        # Final cleanup (already handled in on_close, but good as a fallback)
        if stream and stream.is_active():
            stream.stop_stream()
        if stream:
            stream.close()
        if audio:
            audio.terminate()
        print("Cleanup complete. Exiting.")


if __name__ == "__main__":
    run()

For more information on our Universal Streaming feature, see this section of our official documentation.

Step-by-step migration guide

1. Update API endpoint and configuration

Before (v2):

ws = websocket.WebSocketApp(
    f'wss://api.assemblyai.com/v2/realtime/ws?sample_rate={SAMPLE_RATE}',
    header={'Authorization': YOUR_API_KEY},
    on_message=on_message,
    on_open=on_open,
    on_error=on_error,
    on_close=on_close
)

After (v3):

CONNECTION_PARAMS = {
    "sample_rate": 16000,
    "format_turns": True,  # Request formatted final transcripts
}
API_ENDPOINT_BASE_URL = "wss://streaming.assemblyai.com/v3/ws"
API_ENDPOINT = f"{API_ENDPOINT_BASE_URL}?{urlencode(CONNECTION_PARAMS)}"

ws_app = websocket.WebSocketApp(
    API_ENDPOINT,
    header={"Authorization": YOUR_API_KEY},
    # ...
)

Key Changes:

  • New base URL: streaming.assemblyai.com instead of api.assemblyai.com
  • Version upgrade: /v3/ws instead of /v2/realtime/ws
  • Configuration via URL parameters using urlencode()
  • Added format_turns option for better transcript formatting
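
To see what the encoded v3 connection URL looks like, here is a minimal sketch using only the standard library. The helper name build_endpoint is hypothetical and exists only for this illustration; the base URL and parameter names are the ones used above.

```python
from urllib.parse import urlencode

API_ENDPOINT_BASE_URL = "wss://streaming.assemblyai.com/v3/ws"


def build_endpoint(params: dict) -> str:
    """Append URL-encoded connection parameters to the v3 base URL.

    build_endpoint is a hypothetical helper, not part of any SDK.
    """
    return f"{API_ENDPOINT_BASE_URL}?{urlencode(params)}"


endpoint = build_endpoint({"sample_rate": 16000, "format_turns": True})
print(endpoint)
```

Note that urlencode() converts each value with str(), so the Python boolean True is sent as the text "True", exactly as in the complete example above.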

2. Improve audio configuration

Before (v2):

FRAMES_PER_BUFFER = 3200  # 200ms of audio
SAMPLE_RATE = 16000
CHANNELS = 1
FORMAT = pyaudio.paInt16

After (v3):

FRAMES_PER_BUFFER = 800  # 50ms of audio (0.05s * 16000Hz)
SAMPLE_RATE = CONNECTION_PARAMS["sample_rate"]
CHANNELS = 1
FORMAT = pyaudio.paInt16

Key Changes:

  • Reduced buffer size from 200ms to 50ms for lower latency
  • Sample rate now references the configuration parameter
  • Added detailed comments explaining the calculations
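
The buffer sizes above follow from simple arithmetic: frames per buffer = sample rate × chunk duration. The sketch below works this out for both versions; the helper names frames_for_duration and bytes_per_chunk are hypothetical, and the 2-byte sample width assumes 16-bit audio (pyaudio.paInt16) as configured above.

```python
def frames_for_duration(duration_ms: int, sample_rate: int) -> int:
    """Number of audio frames that span duration_ms at the given sample rate."""
    return sample_rate * duration_ms // 1000


def bytes_per_chunk(frames: int, channels: int = 1, sample_width: int = 2) -> int:
    """Size in bytes of one chunk; sample_width=2 assumes 16-bit samples."""
    return frames * channels * sample_width


v2_frames = frames_for_duration(200, 16000)  # legacy 200 ms buffer
v3_frames = frames_for_duration(50, 16000)   # new 50 ms buffer

print(v2_frames, v3_frames, bytes_per_chunk(v3_frames))
```

If you change sample_rate in CONNECTION_PARAMS, recomputing FRAMES_PER_BUFFER this way keeps each chunk at 50 ms.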

3. Enhance thread management

Before (v2):

def on_open(ws):
    def stream_audio():
        while True:
            try:
                audio_data = stream.read(FRAMES_PER_BUFFER, exception_on_overflow=False)
                ws.send(audio_data, websocket.ABNF.OPCODE_BINARY)
            except Exception as e:
                print(f'\nError streaming audio: {e}')
                break

    audio_thread = Thread(target=stream_audio, daemon=True)
    audio_thread.start()

After (v3):

# Global variables for better resource management
stop_event = threading.Event()
audio_thread = None

def on_open(ws):
    def stream_audio():
        while not stop_event.is_set():
            try:
                audio_data = stream.read(FRAMES_PER_BUFFER, exception_on_overflow=False)
                ws.send(audio_data, websocket.ABNF.OPCODE_BINARY)
            except Exception as e:
                print(f"Error streaming audio: {e}")
                break

    global audio_thread
    audio_thread = threading.Thread(target=stream_audio)
    audio_thread.daemon = True
    audio_thread.start()

Key Changes:

  • Added threading.Event() for controlled thread termination
  • Global audio_thread variable for better lifecycle management
  • Condition-based loop (while not stop_event.is_set()) instead of infinite loop
  • Improved error handling and logging
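
The stop-event pattern can be tried without a microphone or a network connection. The following is a minimal sketch in which fake_read is a hypothetical stand-in for stream.read() and the chunks are collected in a list instead of being sent over the WebSocket.

```python
import threading
import time

stop_event = threading.Event()
chunks_sent = []


def fake_read() -> bytes:
    """Hypothetical stand-in for stream.read(): waits briefly, returns silence."""
    time.sleep(0.01)
    return b"\x00" * 1600  # 800 frames of 16-bit mono audio


def stream_audio():
    # Same loop condition as the v3 code: exit once the event is set.
    while not stop_event.is_set():
        chunks_sent.append(fake_read())


audio_thread = threading.Thread(target=stream_audio, daemon=True)
audio_thread.start()

time.sleep(0.1)                 # let the worker run for a moment
stop_event.set()                # ask the worker to stop
audio_thread.join(timeout=1.0)  # wait for it, with a bounded timeout

print(f"thread alive: {audio_thread.is_alive()}, chunks: {len(chunks_sent)}")
```

Because the loop checks the event once per chunk, the thread stops within roughly one chunk duration of stop_event.set() being called.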

4. Update message handling

Before (v2):

def on_message(ws, message):
    try:
        msg = json.loads(message)
        msg_type = msg.get('message_type')

        if msg_type == 'SessionBegins':
            session_id = msg.get('session_id')
            print("Session ID:", session_id)
            return

        text = msg.get('text', '')
        if not text:
            return

        if msg_type == 'PartialTranscript':
            print(text, end='\r')
        elif msg_type == 'FinalTranscript':
            print(text, end='\r\n')
        elif msg_type == 'error':
            print(f'\nError: {msg.get("error", "Unknown error")}')
    except Exception as e:
        print(f'\nError handling message: {e}')

After (v3):

def on_message(ws, message):
    try:
        data = json.loads(message)
        msg_type = data.get('type')

        if msg_type == "Begin":
            session_id = data.get('id')
            expires_at = data.get('expires_at')
            print(f"\nSession began: ID={session_id}, ExpiresAt={datetime.fromtimestamp(expires_at)}")
        elif msg_type == "Turn":
            transcript = data.get('transcript', '')
            if data.get('end_of_turn'):
                print('\r' + ' ' * 80 + '\r', end='')
                print(transcript)
            else:
                print(f"\r{transcript}", end='')
        elif msg_type == "Termination":
            audio_duration = data.get('audio_duration_seconds', 0)
            session_duration = data.get('session_duration_seconds', 0)
            print(f"\nSession Terminated: Audio Duration={audio_duration}s, Session Duration={session_duration}s")
    except json.JSONDecodeError as e:
        print(f"Error decoding message: {e}")
    except Exception as e:
        print(f"Error handling message: {e}")

Key Changes:

  • Message types renamed: SessionBegins → Begin, PartialTranscript/FinalTranscript → Turn
  • Field names updated: message_type → type, session_id → id, text → transcript
  • Added session expiration timestamp handling
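  • Partial and final results now arrive in the same Turn message and are distinguished by the end_of_turn flag; with format_turns enabled, final transcripts are formatted (the Turn message also carries a turn_is_formatted flag, which this example does not check)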
  • Added Termination message handling with session statistics
  • Enhanced error handling with specific JSONDecodeError catch
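
To check the new dispatch logic without a live session, you can feed hand-written JSON through a small pure function. This is only a sketch: summarize_v3_message is a hypothetical helper, and the sample payloads are invented for illustration, containing just the fields read by the handler above (real messages may include more).

```python
import json


def summarize_v3_message(raw: str) -> str:
    """Classify a v3 message using the same fields as on_message above."""
    data = json.loads(raw)
    msg_type = data.get("type")
    if msg_type == "Begin":
        return f"begin:{data.get('id')}"
    if msg_type == "Turn":
        kind = "final" if data.get("end_of_turn") else "partial"
        return f"{kind}:{data.get('transcript', '')}"
    if msg_type == "Termination":
        return f"terminated:{data.get('audio_duration_seconds', 0)}"
    return f"unhandled:{msg_type}"


# Invented sample payloads, not captured from a real session.
samples = [
    '{"type": "Begin", "id": "abc123", "expires_at": 1700000000}',
    '{"type": "Turn", "transcript": "hello wor", "end_of_turn": false}',
    '{"type": "Turn", "transcript": "Hello world.", "end_of_turn": true}',
    '{"type": "Termination", "audio_duration_seconds": 12, "session_duration_seconds": 15}',
]
results = [summarize_v3_message(s) for s in samples]
for line in results:
    print(line)
```

Keeping the classification separate from printing makes it easy to confirm that no v2 names (message_type, text, SessionBegins) remain in your handler.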

5. Implement robust resource management

Before (v2):

def on_close(ws, status, msg):
    stream.stop_stream()
    stream.close()
    audio.terminate()
    print('\nDisconnected')

# Global audio resources (potential for memory leaks)
audio = pyaudio.PyAudio()
stream = audio.open(...)

After (v3):

def on_close(ws, close_status_code, close_msg):
    print(f"\nWebSocket Disconnected: Status={close_status_code}, Msg={close_msg}")
    global stream, audio
    stop_event.set()  # Signal audio thread to stop

    if stream:
        if stream.is_active():
            stream.stop_stream()
        stream.close()
        stream = None
    if audio:
        audio.terminate()
        audio = None
    # Ensure audio thread cleanup
    if audio_thread and audio_thread.is_alive():
        audio_thread.join(timeout=1.0)

def on_error(ws, error):
    print(f"\nWebSocket Error: {error}")
    stop_event.set()  # Signal threads to stop on error

Key Changes:

  • Added thread stop signaling via stop_event.set()
  • Conditional resource cleanup with null checks
  • Proper thread joining with timeout
  • Resource nullification to prevent reuse
  • Enhanced error handling in on_error
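
Because cleanup can be reached from more than one place (on_close and the finally block in the complete example), it helps if releasing a resource twice is harmless. The sketch below illustrates the null-check-then-nullify idea with a fake stream; FakeStream, resources, and release_stream are all hypothetical names, and the lock is an extra precaution not present in the code above.

```python
import threading

_cleanup_lock = threading.Lock()


class FakeStream:
    """Hypothetical stand-in exposing the PyAudio stream methods used above."""

    def __init__(self):
        self.active = True
        self.close_calls = 0

    def is_active(self) -> bool:
        return self.active

    def stop_stream(self) -> None:
        self.active = False

    def close(self) -> None:
        self.close_calls += 1


resources = {"stream": FakeStream()}


def release_stream() -> bool:
    """Stop and close the stream once; later calls are no-ops."""
    with _cleanup_lock:
        stream = resources.get("stream")
        if stream is None:
            return False  # already released elsewhere
        if stream.is_active():
            stream.stop_stream()
        stream.close()
        resources["stream"] = None  # nullify so it cannot be closed twice
        return True


fake = resources["stream"]
first = release_stream()   # e.g. called from on_close
second = release_stream()  # e.g. called again from a finally block
print(first, second, fake.close_calls)
```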

6. Add graceful shutdown handling

Before (v2):

try:
    ws.run_forever()
except Exception as e:
    print(f'\nError: {e}')

After (v3):

def run():
    # ... initialization code ...

    ws_thread = threading.Thread(target=ws_app.run_forever)
    ws_thread.daemon = True
    ws_thread.start()

    try:
        while ws_thread.is_alive():
            time.sleep(0.1)
    except KeyboardInterrupt:
        print("\nCtrl+C received. Stopping...")
        stop_event.set()

        # Send termination message
        if ws_app and ws_app.sock and ws_app.sock.connected:
            try:
                terminate_message = {"type": "Terminate"}
                ws_app.send(json.dumps(terminate_message))
                time.sleep(5)  # Allow message processing
            except Exception as e:
                print(f"Error sending termination message: {e}")

        ws_app.close()
        ws_thread.join(timeout=2.0)
    finally:
        # Final cleanup
        # ... resource cleanup code ...

Key Changes:

  • WebSocket runs in separate thread for better control
  • Proper KeyboardInterrupt handling
  • Graceful termination message sending
  • Thread joining with timeouts
  • Comprehensive cleanup in finally block
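
One possible variation on the fixed time.sleep(5) is to wait on an event that your on_message handler sets when the Termination message arrives, so shutdown finishes as soon as the server replies. This is a sketch of that idea rather than the approach used in this guide: the terminated event, the shutdown helper, and FakeApp (a stand-in exposing only send() and close()) are all hypothetical.

```python
import json
import threading

# In real code, on_message would call terminated.set() when type == "Termination".
terminated = threading.Event()


class FakeApp:
    """Hypothetical stand-in for the WebSocket app: only send() and close()."""

    def __init__(self):
        self.sent = []
        self.closed = False

    def send(self, data: str) -> None:
        self.sent.append(data)
        terminated.set()  # pretend the server answered with "Termination" at once

    def close(self) -> None:
        self.closed = True


def shutdown(app, wait_seconds: float = 5.0) -> bool:
    """Send Terminate, wait up to wait_seconds for the reply, always close."""
    try:
        app.send(json.dumps({"type": "Terminate"}))
        got_reply = terminated.wait(timeout=wait_seconds)
    finally:
        app.close()
    return got_reply


app = FakeApp()
acknowledged = shutdown(app)
print(f"acknowledged={acknowledged}, closed={app.closed}")
```

The upper bound of five seconds is kept, so the worst case matches the original behavior while the common case returns sooner.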

Note: Pricing is based on session duration, so it is important to close sessions properly to avoid unexpected usage and cost.

7. Improve error handling and logging

Before (v2):

  • Basic error printing
  • Limited context in error messages
  • No resource cleanup on errors

After (v3):

  • Detailed error context and timestamps
  • Proper exception type handling
  • Resource cleanup on all error paths
  • Connection status checking before operations
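
The complete example above reports errors with print(). If you want timestamps and the exception type in every error line, one option is Python's standard logging module. The sketch below is an assumption about how you might wire that up, not part of the guide's code: the logger name streaming_migration is hypothetical, and the in-memory buffer is used only so the output can be inspected here (a real program would typically log to stderr or a file).

```python
import io
import logging

log_buffer = io.StringIO()  # in-memory target, for demonstration only
handler = logging.StreamHandler(log_buffer)
handler.setFormatter(
    logging.Formatter("%(asctime)s %(levelname)s %(name)s: %(message)s")
)

logger = logging.getLogger("streaming_migration")  # hypothetical logger name
logger.setLevel(logging.INFO)
logger.addHandler(handler)
logger.propagate = False  # keep the demo output out of the root logger


def on_error(ws, error):
    """Log the exception type and message with a timestamp."""
    logger.error("WebSocket error (%s): %s", type(error).__name__, error)
    # The v3 handler above would also call stop_event.set() here.


on_error(None, ConnectionError("handshake failed"))
logged = log_buffer.getvalue()
print(logged, end="")
```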

Migration checklist

  • Update API endpoint from v2 to v3
  • Change base URL to streaming.assemblyai.com
  • Update message type handling (Begin, Turn, Termination)
  • Implement threading.Event() for thread control
  • Add proper resource cleanup in all code paths
  • Update field names in message parsing
  • Add graceful shutdown with termination messages
  • Implement timeout-based thread joining
  • Add detailed error logging with context
  • Test KeyboardInterrupt handling
  • Verify audio resource cleanup
  • Test connection failure scenarios

Testing your migration

  1. Basic Functionality: Verify transcription works with simple speech
  2. Error Handling: Test with invalid API keys or network issues
  3. Graceful Shutdown: Test Ctrl+C interruption
  4. Resource Cleanup: Monitor for memory leaks during extended use
  5. Thread Management: Verify proper thread termination
  6. Message Formatting: Test with format_turns enabled/disabled

Common migration issues

Issue: “WebSocket connection failed”

Solution: Verify you’re using the new v3 endpoint URL and proper authentication header format.
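
A quick pre-flight check can catch a leftover v2 URL before you try to connect. The sketch below compares a URL against the v3 host and path given in this guide; check_v3_endpoint is a hypothetical helper and only inspects the URL, not the authentication header.

```python
from urllib.parse import urlparse


def check_v3_endpoint(url: str) -> list:
    """Return a list of problems found in the endpoint URL (empty if none)."""
    problems = []
    parts = urlparse(url)
    if parts.scheme != "wss":
        problems.append(f"expected scheme 'wss', got '{parts.scheme}'")
    if parts.hostname != "streaming.assemblyai.com":
        problems.append(f"expected host 'streaming.assemblyai.com', got '{parts.hostname}'")
    if parts.path != "/v3/ws":
        problems.append(f"expected path '/v3/ws', got '{parts.path}'")
    return problems


new_url = "wss://streaming.assemblyai.com/v3/ws?sample_rate=16000"
old_url = "wss://api.assemblyai.com/v2/realtime/ws?sample_rate=16000"

print(check_v3_endpoint(new_url))
for problem in check_v3_endpoint(old_url):
    print(problem)
```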

Issue: “Message type not recognized”

Solution: Update message type handling from old names (SessionBegins, PartialTranscript) to new ones (Begin, Turn).

Issue: “Audio thread won’t stop”

Solution: Ensure you’re using threading.Event() and calling stop_event.set() in error handlers.

Issue: “Resource leak warnings”

Solution: Verify all audio resources are properly cleaned up in on_close and finally blocks.

Benefits of migration

  • Improved Reliability: Better error handling and recovery
  • Lower Latency: Reduced buffer sizes for faster response
  • Enhanced Features: Formatted transcripts and session statistics
  • Better Resource Management: Proper cleanup prevents memory leaks
  • Graceful Shutdown: Clean termination with proper cleanup
  • Modern Architecture: Improved threading and event handling

Conclusion

This migration provides a more robust, maintainable, and feature-rich streaming transcription implementation. The enhanced error handling, resource management, and modern API features make it suitable for production use cases where reliability and performance are critical.