Multichannel streams
To transcribe multichannel streaming audio, we recommend creating a separate streaming session for each channel. This approach maintains clear speaker separation and produces accurate, diarized transcripts for conversations, phone calls, or interviews where each speaker is recorded on their own channel.
The following code example demonstrates how to transcribe a dual-channel audio file and produce diarized, speaker-separated transcripts. The same approach applies to any multichannel audio stream, including those with more than two channels.
Python SDK
Use this complete script to transcribe dual-channel audio with speaker separation:
import logging
from typing import Type
import threading
import time
import wave

import numpy as np
import pyaudio

import assemblyai as aai
from assemblyai.streaming.v3 import (
    BeginEvent,
    StreamingClient,
    StreamingClientOptions,
    StreamingError,
    StreamingEvents,
    StreamingParameters,
    TerminationEvent,
    TurnEvent,
)

# Configuration
API_KEY = "<YOUR_API_KEY>"
AUDIO_FILE_PATH = "<DUAL_CHANNEL_AUDIO_FILE_PATH>"

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class ChannelTranscriber:
    def __init__(self, channel_id, channel_name, sample_rate):
        self.channel_id = channel_id
        self.channel_name = channel_name
        self.sample_rate = sample_rate
        self.client = None
        self.audio_data = []
        self.current_turn_line = None
        self.line_count = 0
        self.streaming_done = threading.Event()

    def load_audio_channel(self):
        """Extract a single channel from a multichannel audio file."""
        with wave.open(AUDIO_FILE_PATH, 'rb') as wf:
            num_channels = wf.getnchannels()
            frames = wf.readframes(wf.getnframes())
            audio_array = np.frombuffer(frames, dtype=np.int16)

            if num_channels > 1:
                # Samples are interleaved; deinterleave and keep this channel
                audio_array = audio_array.reshape(-1, num_channels)
                channel_audio = audio_array[:, self.channel_id]
            else:
                channel_audio = audio_array

            # Split into 50 ms chunks for streaming
            frames_per_buffer = int(self.sample_rate * 0.05)
            for i in range(0, len(channel_audio), frames_per_buffer):
                chunk = channel_audio[i:i + frames_per_buffer]
                if len(chunk) < frames_per_buffer:
                    chunk = np.pad(chunk, (0, frames_per_buffer - len(chunk)), 'constant')
                self.audio_data.append(chunk.astype(np.int16).tobytes())

    def clear_current_line(self):
        if self.current_turn_line is not None:
            print("\r" + " " * 100 + "\r", end="", flush=True)

    def print_partial_transcript(self, words):
        self.clear_current_line()
        # Build the transcript from individual words
        word_texts = [word.text for word in words]
        transcript = ' '.join(word_texts)
        partial_text = f"{self.channel_name}: {transcript}"
        print(partial_text, end="", flush=True)
        self.current_turn_line = len(partial_text)

    def print_final_transcript(self, transcript):
        self.clear_current_line()
        final_text = f"{self.channel_name}: {transcript}"
        print(final_text, flush=True)
        self.current_turn_line = None
        self.line_count += 1

    def on_begin(self, client: Type[StreamingClient], event: BeginEvent):
        """Called when the streaming session begins."""
        pass  # Session started

    def on_turn(self, client: Type[StreamingClient], event: TurnEvent):
        """Called when a turn is received."""
        transcript = event.transcript.strip() if event.transcript else ''
        formatted = event.turn_is_formatted
        words = event.words if event.words else []

        if transcript or words:
            if formatted:
                self.print_final_transcript(transcript)
            else:
                self.print_partial_transcript(words)

    def on_terminated(self, client: Type[StreamingClient], event: TerminationEvent):
        """Called when the session is terminated."""
        self.clear_current_line()
        self.streaming_done.set()

    def on_error(self, client: Type[StreamingClient], error: StreamingError):
        """Called when an error occurs."""
        print(f"\n{self.channel_name}: Error: {error}")
        self.streaming_done.set()

    def start_transcription(self):
        """Start the transcription for this channel."""
        self.load_audio_channel()

        # Create streaming client
        self.client = StreamingClient(
            StreamingClientOptions(
                api_key=API_KEY,
                api_host="streaming.assemblyai.com",
            )
        )

        # Register event handlers
        self.client.on(StreamingEvents.Begin, self.on_begin)
        self.client.on(StreamingEvents.Turn, self.on_turn)
        self.client.on(StreamingEvents.Termination, self.on_terminated)
        self.client.on(StreamingEvents.Error, self.on_error)

        # Connect to the streaming service with turn detection configuration
        self.client.connect(
            StreamingParameters(
                sample_rate=self.sample_rate,
                format_turns=True,
                end_of_turn_confidence_threshold=0.4,
                min_end_of_turn_silence_when_confident=160,
                max_turn_silence=400,
            )
        )

        # Yield audio chunks at real-time pace
        def audio_generator():
            for chunk in self.audio_data:
                yield chunk
                time.sleep(0.05)  # 50 ms intervals

        try:
            # Stream audio
            self.client.stream(audio_generator())
        finally:
            # Disconnect
            self.client.disconnect(terminate=True)
            self.streaming_done.set()

    def start_transcription_thread(self):
        """Start transcription in a separate thread."""
        thread = threading.Thread(target=self.start_transcription, daemon=True)
        thread.start()
        return thread


def play_audio_file():
    try:
        with wave.open(AUDIO_FILE_PATH, 'rb') as wf:
            p = pyaudio.PyAudio()

            stream = p.open(
                format=p.get_format_from_width(wf.getsampwidth()),
                channels=wf.getnchannels(),
                rate=wf.getframerate(),
                output=True,
            )

            print(f"Playing audio: {AUDIO_FILE_PATH}")

            # Play audio in chunks
            chunk_size = 1024
            data = wf.readframes(chunk_size)

            while data:
                stream.write(data)
                data = wf.readframes(chunk_size)

            stream.stop_stream()
            stream.close()
            p.terminate()

            print("Audio playback finished")

    except Exception as e:
        print(f"Error playing audio: {e}")


def transcribe_multichannel():
    # Get the sample rate from the file
    with wave.open(AUDIO_FILE_PATH, 'rb') as wf:
        sample_rate = wf.getframerate()

    # Create a transcriber (one streaming session) per channel
    transcriber_1 = ChannelTranscriber(0, "Speaker 1", sample_rate)
    transcriber_2 = ChannelTranscriber(1, "Speaker 2", sample_rate)

    # Start audio playback
    audio_thread = threading.Thread(target=play_audio_file, daemon=True)
    audio_thread.start()

    # Start both transcriptions
    thread_1 = transcriber_1.start_transcription_thread()
    thread_2 = transcriber_2.start_transcription_thread()

    # Wait for completion
    thread_1.join()
    thread_2.join()
    audio_thread.join()


if __name__ == "__main__":
    transcribe_multichannel()
Configure turn detection for your use case
The examples above use turn detection settings tuned for short responses and rapid back-and-forth conversation. You can adjust the turn detection parameters to suit your specific audio scenario.
For configuration examples tailored to different use cases, refer to our Configuration examples.
Python SDK
Modify the StreamingParameters in the start_transcription method:
# Connect to the streaming service with turn detection configuration
self.client.connect(
    StreamingParameters(
        sample_rate=self.sample_rate,
        format_turns=True,
        end_of_turn_confidence_threshold=0.4,
        min_end_of_turn_silence_when_confident=160,
        max_turn_silence=400,
    )
)
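For instance, audio with longer uninterrupted turns (dictation, voicemail, presentations) generally benefits from waiting for more silence before ending a turn. The values below are an illustrative sketch to experiment with, not tuned recommendations:

# Illustrative settings for longer, monologue-style speech
# (example values only; tune against your own audio)
self.client.connect(
    StreamingParameters(
        sample_rate=self.sample_rate,
        format_turns=True,
        end_of_turn_confidence_threshold=0.7,  # require higher confidence before ending a turn
        min_end_of_turn_silence_when_confident=560,  # wait longer (ms) even when confident
        max_turn_silence=1280,  # allow longer pauses (ms) before forcing end of turn
    )
)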