Streaming Diarization and Multichannel

Identify and label individual speakers in real time, or transcribe multichannel audio using the Streaming API.

Streaming Diarization

Streaming Diarization lets you identify and label individual speakers in real time directly from the Streaming API. Each Turn event includes a speaker_label field (e.g. A, B) indicating which speaker produced that transcript. Speaker accuracy improves over the course of a session as the model accumulates embedding context — so the longer the conversation, the better the labels.

Diarization is supported on all streaming models: u3-rt-pro, universal-streaming-english, and universal-streaming-multilingual.

Already using AssemblyAI streaming?

You can enable Streaming Diarization by adding speaker_labels: true to your connection parameters. No other changes are required — the speaker_label field will appear on every Turn event automatically.

Quickstart

Get started with Streaming Diarization using the code below. This example streams audio from your microphone and prints each turn with its speaker label.

1

Install the required libraries

$pip install websocket-client pyaudio
2

Create a new file main.py and paste the code below. Replace <YOUR_API_KEY> with your API key.

3

Run with python main.py and speak into your microphone.

import pyaudio
import websocket
import json
import threading
import time
from urllib.parse import urlencode

# AssemblyAI Streaming API credentials and connection parameters.
YOUR_API_KEY = "<YOUR_API_KEY>"
CONNECTION_PARAMS = {
    "sample_rate": 16000,
    "speech_model": "u3-rt-pro",
    # Sent as a query-string value, so the boolean is spelled as text.
    "speaker_labels": "true",
}
API_ENDPOINT_BASE_URL = "wss://streaming.assemblyai.com/v3/ws"
API_ENDPOINT = f"{API_ENDPOINT_BASE_URL}?{urlencode(CONNECTION_PARAMS)}"

# Microphone capture settings: 800 frames at 16 kHz = 50 ms per chunk.
FRAMES_PER_BUFFER = 800
SAMPLE_RATE = CONNECTION_PARAMS["sample_rate"]
CHANNELS = 1
FORMAT = pyaudio.paInt16

# Shared state used by the WebSocket callbacks below.
audio = None          # pyaudio.PyAudio instance, created in run()
stream = None         # microphone input stream, created in run()
ws_app = None         # websocket.WebSocketApp instance, created in run()
audio_thread = None   # background thread that pushes audio to the socket
stop_event = threading.Event()  # signals the audio thread to stop
27
def on_open(ws):
    """Begin streaming microphone audio once the WebSocket is connected."""
    print("WebSocket connection opened.")

    def pump_microphone():
        # Forward fixed-size PCM chunks as binary frames until the session
        # is told to stop or reading/sending fails.
        global stream
        while not stop_event.is_set():
            try:
                chunk = stream.read(FRAMES_PER_BUFFER, exception_on_overflow=False)
                ws.send(chunk, websocket.ABNF.OPCODE_BINARY)
            except Exception as exc:
                print(f"Error streaming audio: {exc}")
                break

    global audio_thread
    audio_thread = threading.Thread(target=pump_microphone, daemon=True)
    audio_thread.start()
45
def on_message(ws, message):
    """Decode one JSON event from the session and print it to the console."""
    try:
        event = json.loads(message)
        kind = event.get("type")
        if kind == "Begin":
            print(f"Session began: ID={event.get('id')}")
        elif kind == "Turn":
            text = event.get("transcript", "")
            label = event.get("speaker_label") or "UNKNOWN"
            if event.get("end_of_turn", False):
                # Blank out the in-progress line, then print the final turn.
                print(f"\r{' ' * 80}\r[{label}] {text}")
            else:
                # Partial turn: rewrite the current console line in place.
                print(f"\r[{label}] {text}", end="")
        elif kind == "Termination":
            print(f"\nSession terminated: {event.get('audio_duration_seconds', 0)}s of audio")
    except Exception as exc:
        print(f"Error handling message: {exc}")
64
def on_error(ws, error):
    """Report a WebSocket error and signal the audio thread to stop."""
    print(f"\nWebSocket Error: {error}")
    stop_event.set()
68
def on_close(ws, close_status_code, close_msg):
    """Stop streaming and release microphone resources when the socket closes."""
    print(f"\nWebSocket Disconnected: Status={close_status_code}")
    global stream, audio
    # Order matters: stop the sender thread first, then tear down PyAudio.
    stop_event.set()
    if stream:
        if stream.is_active():
            stream.stop_stream()
        stream.close()
    if audio:
        audio.terminate()
79
def run():
    """Open the microphone, connect to the Streaming API, and run until Ctrl+C.

    Audio is pumped to the server by the thread started in on_open(); resource
    cleanup on disconnect happens in on_close().
    """
    global audio, stream, ws_app
    audio = pyaudio.PyAudio()
    try:
        stream = audio.open(
            input=True,
            frames_per_buffer=FRAMES_PER_BUFFER,
            channels=CHANNELS,
            format=FORMAT,
            rate=SAMPLE_RATE,
        )
    except Exception:
        # Release the PyAudio instance if the microphone cannot be opened
        # (on_close never fires in this case), then surface the original error.
        audio.terminate()
        raise
    print("Speak into your microphone. Press Ctrl+C to stop.")
    ws_app = websocket.WebSocketApp(
        API_ENDPOINT,
        header={"Authorization": YOUR_API_KEY},
        on_open=on_open,
        on_message=on_message,
        on_error=on_error,
        on_close=on_close,
    )
    ws_thread = threading.Thread(target=ws_app.run_forever, daemon=True)
    ws_thread.start()
    try:
        # Keep the main thread alive while the WebSocket thread runs.
        while ws_thread.is_alive():
            time.sleep(0.1)
    except KeyboardInterrupt:
        print("\nStopping...")
        stop_event.set()
        if ws_app and ws_app.sock and ws_app.sock.connected:
            # Ask the server to end the session gracefully, then give it a
            # moment to flush any final Turn/Termination events.
            ws_app.send(json.dumps({"type": "Terminate"}))
            time.sleep(2)
        if ws_app:
            ws_app.close()
        ws_thread.join(timeout=2.0)
114
# Entry point: start streaming when run as a script.
if __name__ == "__main__":
    run()

Configuration

Enable Streaming Diarization by adding speaker_labels: true to your connection parameters. You can optionally cap the number of speakers with max_speakers.

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| `speaker_labels` | boolean | `false` | Set to `true` to enable real-time speaker diarization. |
| `max_speakers` | integer | — | Optional. Hint the maximum number of speakers expected (1–10). Setting this accurately can improve assignment accuracy when you know the speaker count in advance. |
1{
2 "speech_model": "u3-rt-pro",
3 "speaker_labels": true,
4 "max_speakers": 2
5}

Diarization is supported on u3-rt-pro, universal-streaming-english, and universal-streaming-multilingual. You do not need to change your speech model to use it — just add speaker_labels: true.

Reading speaker labels

When diarization is enabled, every Turn event includes a speaker_label field with a label such as A, B, and so on.

1{
2 "type": "Turn",
3 "transcript": "Good morning, thanks for joining the call.",
4 "speaker_label": "A",
5 "end_of_turn": true,
6 "turn_is_formatted": true
7}

If a turn contains less than approximately 1 second of audio, the speaker_label will be set to "UNKNOWN". This is because the model needs at least ~1 second of audio to generate a reliable diarization embedding — without enough audio, embeddings may be inaccurate and could lead to a single speaker being labeled as multiple speakers. Labeling short turns as "UNKNOWN" ensures that speaker labels remain as accurate as possible.

1{
2 "type": "Turn",
3 "transcript": "Hello?",
4 "speaker_label": "UNKNOWN",
5 "end_of_turn": true,
6 "turn_is_formatted": true
7}

Your application should handle this case gracefully.

A typical multi-speaker exchange looks like this:

[A] Good morning, thanks for joining the call.
[B] Good morning. Happy to be here.
[A] So let's start with a quick overview of the project timeline.
[B] Sure. We're currently on track for the March deadline.
[A] Great. And how's the team handling the workload?
[C] It's been busy, but manageable. We brought on two new engineers last week.

How speaker accuracy improves over time

Streaming Diarization builds a speaker profile incrementally as audio flows in. In practice this means:

  • Early in a session, speaker assignments may be less stable, especially if the first few turns are short.
  • As the session progresses, the model accumulates richer speaker embeddings and assignments become more consistent.

For long-form use cases (call center, clinical scribe, meeting transcription), the model will settle into accurate, stable labels well before the end of the conversation.

Known limitations

Real-time diarization is an inherently harder problem than diarization for async transcription on pre-recorded audio. The following limitations apply to the current beta:

  • Short utterances — Turns with less than ~1 second of audio are labeled as "UNKNOWN" because there is insufficient audio to generate a reliable speaker embedding. This prevents inaccurate embeddings from causing a single speaker to be split across multiple labels.
  • Overlapping speech — When two speakers talk simultaneously, the model cannot split the audio and will assign the turn to a single speaker. Performance degrades with frequent cross-talk.
  • Session start accuracy — The first 1–2 turns of a session may be misassigned because the model has not yet built up speaker profiles. This self-corrects quickly in practice.
  • Noisy environments — Background noise and microphone bleed between speakers can reduce embedding quality and lead to more frequent misassignments.

For the best results, use a microphone setup that minimizes cross-talk and background noise, and ensure each speaker produces at least a few complete sentences before you rely on per-turn labels for downstream processing.

Supported models

| Model | `speech_model` value | Diarization supported |
| --- | --- | --- |
| Universal-3 Pro Streaming | `u3-rt-pro` | Yes |
| Universal Streaming (English) | `universal-streaming-english` | Yes |
| Universal Streaming (Multilingual) | `universal-streaming-multilingual` | Yes |

Multichannel streaming audio

To transcribe multichannel streaming audio, we recommend creating a separate session for each channel. This approach allows you to maintain clear speaker separation and get accurate diarized transcriptions for conversations, phone calls, or interviews where speakers are recorded on two different channels.

The following code example demonstrates how to transcribe a dual-channel audio file with diarized, speaker-separated transcripts. This same approach can be applied to any multi-channel audio stream, including those with more than two channels.

1

First, install the required dependencies.

$pip install websocket-client numpy pyaudio
2

Use this complete script to transcribe dual-channel audio with speaker separation:

import websocket
import json
import threading
import numpy as np
import wave
import time
import pyaudio
from urllib.parse import urlencode

# Configuration
YOUR_API_KEY = "<YOUR_API_KEY>"
AUDIO_FILE_PATH = "<DUAL_CHANNEL_AUDIO_FILE_PATH>"
API_BASE_URL = "wss://streaming.assemblyai.com/v3/ws"
API_PARAMS = {
    # NOTE(review): assumes the source file is 8 kHz audio — confirm your
    # file's sample rate matches, or adjust this value.
    "sample_rate": 8000,
    "format_turns": "true",
    # Turn-detection settings tuned for short, rapid back-and-forth speech.
    "end_of_turn_confidence_threshold": 0.4,
    "min_turn_silence": 160,
    "max_turn_silence": 400,
}
# Build API endpoint with URL encoding
API_ENDPOINT = f"{API_BASE_URL}?{urlencode(API_PARAMS)}"
23
class ChannelTranscriber:
    """Streams one channel of a multichannel audio file to the Streaming API
    and prints live transcripts labeled with this channel's speaker name."""

    def __init__(self, channel_id, channel_name):
        self.channel_id = channel_id      # index of the channel to extract
        self.channel_name = channel_name  # label printed before each transcript
        self.ws_app = None                # WebSocketApp, set in start_transcription()
        self.audio_data = []              # raw PCM chunks ready to stream
        self.current_turn_line = None     # length of the in-progress console line
        self.line_count = 0               # number of finalized turns printed

    def load_audio_channel(self):
        """Extract this transcriber's channel from the audio file and split it
        into 50 ms chunks for streaming.

        Handles any channel count: multichannel files are de-interleaved and
        the column at ``self.channel_id`` is taken; mono files are used as-is.
        (The original version only handled exactly 2 channels and raised
        NameError on mono input.)
        """
        with wave.open(AUDIO_FILE_PATH, 'rb') as wf:
            frames = wf.readframes(wf.getnframes())
            audio_array = np.frombuffer(frames, dtype=np.int16)
            num_channels = wf.getnchannels()

            if num_channels > 1:
                # Samples are interleaved: reshape to (frames, channels) and
                # select this transcriber's column.
                channel_audio = audio_array.reshape(-1, num_channels)[:, self.channel_id]
            else:
                # Mono file: the single channel is the only choice.
                channel_audio = audio_array

            # Split into chunks for streaming (400 frames @ 8 kHz = 50 ms).
            FRAMES_PER_BUFFER = 400
            for i in range(0, len(channel_audio), FRAMES_PER_BUFFER):
                chunk = channel_audio[i:i + FRAMES_PER_BUFFER]
                if len(chunk) < FRAMES_PER_BUFFER:
                    # Zero-pad the final chunk to a full buffer.
                    chunk = np.pad(chunk, (0, FRAMES_PER_BUFFER - len(chunk)), 'constant')
                self.audio_data.append(chunk.astype(np.int16).tobytes())

    def on_open(self, ws):
        """Stream audio data when connection opens."""
        def stream_audio():
            for chunk in self.audio_data:
                ws.send(chunk, websocket.ABNF.OPCODE_BINARY)
                time.sleep(0.05)  # pace chunks at real time (50 ms each)

            # Tell the server the stream is finished.
            terminate_message = {"type": "Terminate"}
            ws.send(json.dumps(terminate_message))

        threading.Thread(target=stream_audio, daemon=True).start()

    def clear_current_line(self):
        """Erase the in-progress partial-transcript line, if any."""
        if self.current_turn_line is not None:
            print("\r" + " " * 100 + "\r", end="", flush=True)

    def print_partial_transcript(self, words):
        """Render an in-progress turn on a single, continually-rewritten line."""
        self.clear_current_line()
        # Build transcript from individual words
        word_texts = [word.get('text', '') for word in words]
        transcript = ' '.join(word_texts)
        partial_text = f"{self.channel_name}: {transcript}"
        print(partial_text, end="", flush=True)
        self.current_turn_line = len(partial_text)

    def print_final_transcript(self, transcript):
        """Print a finalized, formatted turn on its own line."""
        self.clear_current_line()
        final_text = f"{self.channel_name}: {transcript}"
        print(final_text, flush=True)
        self.current_turn_line = None
        self.line_count += 1

    def on_message(self, ws, message):
        """Handle transcription results: partial turns rewrite the current
        line; formatted turns are printed permanently."""
        data = json.loads(message)
        msg_type = data.get('type')

        if msg_type == "Turn":
            transcript = data.get('transcript', '').strip()
            formatted = data.get('turn_is_formatted', False)
            words = data.get('words', [])

            if transcript or words:
                if formatted:
                    self.print_final_transcript(transcript)
                else:
                    self.print_partial_transcript(words)

    def start_transcription(self):
        """Load this channel's audio, open the WebSocket session in a daemon
        thread, and return that thread so the caller can join it."""
        self.load_audio_channel()

        self.ws_app = websocket.WebSocketApp(
            API_ENDPOINT,
            header={"Authorization": YOUR_API_KEY},
            on_open=self.on_open,
            on_message=self.on_message,
        )

        thread = threading.Thread(target=self.ws_app.run_forever, daemon=True)
        thread.start()
        return thread
113
def play_audio_file():
    """Play the source audio file through the speakers while it is being
    transcribed.

    Uses try/finally so the output stream and the PyAudio instance are
    released even if playback fails partway through (the original leaked
    both on a mid-playback exception); any error is reported, not raised.
    """
    try:
        with wave.open(AUDIO_FILE_PATH, 'rb') as wf:
            p = pyaudio.PyAudio()
            try:
                stream = p.open(
                    format=p.get_format_from_width(wf.getsampwidth()),
                    channels=wf.getnchannels(),
                    rate=wf.getframerate(),
                    output=True
                )
                try:
                    print(f"Playing audio: {AUDIO_FILE_PATH}")

                    # Play audio in chunks
                    chunk_size = 1024
                    data = wf.readframes(chunk_size)
                    while data:
                        stream.write(data)
                        data = wf.readframes(chunk_size)
                finally:
                    # Always stop and close the output stream.
                    stream.stop_stream()
                    stream.close()
            finally:
                # Always release the PyAudio instance.
                p.terminate()

            print("Audio playback finished")

    except Exception as e:
        print(f"Error playing audio: {e}")
144
145
def transcribe_multichannel():
    """Transcribe each channel of the audio file in its own streaming session
    while playing the file aloud, then wait for everything to finish."""
    # One transcriber — and therefore one API session — per channel.
    left = ChannelTranscriber(0, "Speaker 1")
    right = ChannelTranscriber(1, "Speaker 2")

    # Play the file locally while the channels stream.
    playback = threading.Thread(target=play_audio_file, daemon=True)
    playback.start()

    # Launch both sessions, then block until every thread completes.
    workers = [left.start_transcription(), right.start_transcription()]
    for worker in workers:
        worker.join()
    playback.join()
163
# Entry point: run the dual-channel transcription demo.
if __name__ == "__main__":
    transcribe_multichannel()
Configure turn detection for your use case

The examples above use turn detection settings optimized for short responses and rapid back-and-forth conversations. To optimize for your specific audio scenario, you can adjust the turn detection parameters.

For configuration examples tailored to different use cases, refer to our Configuration examples.

Modify the turn detection parameters in API_PARAMS:

1API_PARAMS = {
2 "sample_rate": 8000,
3 "format_turns": "true",
4 "end_of_turn_confidence_threshold": 0.4,
5 "min_turn_silence": 160,
6 "max_turn_silence": 400,
7}