Generate Custom Speaker Labels with Pyannote

In this guide, we’ll show you how to generate speaker labels for an AssemblyAI transcript using Pyannote. This is useful for languages that AssemblyAI’s Speaker Labels model does not currently support.

Quickstart

import os
import assemblyai as aai
from pyannote.audio import Pipeline
import torch
import pandas as pd
import numpy as np

# Assign your API keys
HUGGING_FACE_TOKEN = os.getenv("HF_TOKEN")
ASSEMBLYAI_API_KEY = os.getenv("ASSEMBLYAI_API_KEY")

# Authenticate with AssemblyAI
aai.settings.api_key = ASSEMBLYAI_API_KEY

def transcribe_audio(audio_file, language="en"):
    """
    Transcribe an audio file using AssemblyAI.

    Args:
        audio_file (str): Path to the audio file.
        language (str, optional): Language code for transcription. Defaults to "en".

    Returns:
        aai.Transcript: The transcription result.
    """
    transcriber = aai.Transcriber(config=aai.TranscriptionConfig(speech_model='nano', language_code=language))
    transcript = transcriber.transcribe(audio_file)
    print(f"Transcript ID: {transcript.id}")
    return transcript

def get_speaker_labels(audio_file, transcript: aai.Transcript):
    """
    Perform speaker diarization on an audio file and combine results with the transcript.

    Args:
        audio_file (str): Path to the audio file.
        transcript (aai.Transcript): The transcription result from AssemblyAI.

    Returns:
        str: A formatted string containing the transcript with speaker labels and timestamps.
    """
    # Initialize the speaker diarization pipeline with GPU support when available
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    pipeline = Pipeline.from_pretrained(
        "pyannote/speaker-diarization",
        use_auth_token=HUGGING_FACE_TOKEN,
    )

    if pipeline is None:
        raise ValueError("Failed to initialize the pipeline. Please check your authentication token and internet connection.")
    else:
        pipeline = pipeline.to(device)

    # Apply the pipeline to the audio file
    diarization = pipeline(audio_file)

    # Create a dictionary to store speaker segments
    speaker_segments = {}

    # Process diarization results
    for turn, _, speaker in diarization.itertracks(yield_label=True):
        start, end = turn.start, turn.end
        if speaker not in speaker_segments:
            speaker_segments[speaker] = []
        speaker_segments[speaker].append((start, end))

    # Convert speaker_segments to a DataFrame
    diarize_df = pd.DataFrame([(speaker, start, end)
                               for speaker, segments in speaker_segments.items()
                               for start, end in segments],
                              columns=['speaker', 'start', 'end'])

    # Assign each word the speaker whose diarization segment overlaps it the most
    for word in transcript.words:
        word_start = float(word.start) / 1000
        word_end = float(word.end) / 1000

        overlaps = diarize_df[
            (diarize_df['start'] <= word_end) & (diarize_df['end'] >= word_start)
        ].copy()

        if not overlaps.empty:
            overlaps['overlap'] = np.minimum(overlaps['end'], word_end) - np.maximum(overlaps['start'], word_start)
            word.speaker = overlaps.loc[overlaps['overlap'].idxmax(), 'speaker']
        else:
            word.speaker = "Unknown"

    full_transcript = ''

    # Assign each sentence the speaker whose diarization segment overlaps it the most
    for segment in transcript.get_sentences():
        segment_start = float(segment.start) / 1000
        segment_end = float(segment.end) / 1000

        overlaps = diarize_df[
            (diarize_df['start'] <= segment_end) & (diarize_df['end'] >= segment_start)
        ].copy()

        if not overlaps.empty:
            overlaps['overlap'] = np.minimum(overlaps['end'], segment_end) - np.maximum(overlaps['start'], segment_start)
            segment.speaker = overlaps.loc[overlaps['overlap'].idxmax(), 'speaker']
            speaker_label = segment.speaker.replace('SPEAKER_', 'SPEAKER ')
            full_transcript += f'[{format_timestamp(segment_start)}] {speaker_label}: {segment.text}\n'
        else:
            segment.speaker = "Unknown"
            full_transcript += f'[{format_timestamp(segment_start)}] Unknown: {segment.text}\n'

    return full_transcript

def format_timestamp(seconds):
    """
    Convert seconds to a formatted timestamp string (HH:MM:SS).

    Args:
        seconds (float): Time in seconds.

    Returns:
        str: Formatted timestamp string.
    """
    hours, remainder = divmod(int(seconds), 3600)
    minutes, seconds = divmod(remainder, 60)
    return f"{hours:02d}:{minutes:02d}:{seconds:02d}"

audio_file = "audio.wav"  # your local file path
transcript: aai.Transcript = transcribe_audio(audio_file, language="hr")  # select a language code
transcript_with_speakers = get_speaker_labels(audio_file, transcript)
print(transcript_with_speakers)

Get Started

Before we begin, make sure you have an AssemblyAI account and an API key. You can sign up for a free account and get your API key from your dashboard.

You’ll also need a HuggingFace account and API key. You can sign up for a free account and get your API key here. Create a Read type API token to ensure the necessary permissions are enabled.

Browse to the speaker-diarization and segmentation model pages and accept the Gated Model Terms & Conditions by entering your Company/University, Website, and Use Case details to gain access to these models.
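The code in this guide reads both keys from environment variables named ASSEMBLYAI_API_KEY and HF_TOKEN. If you want to confirm they are visible to Python before you start, an optional check like the one below works (this snippet is just a convenience and not part of the guide itself):

import os

# Optional sanity check: make sure both keys are set before running the guide
for var in ("ASSEMBLYAI_API_KEY", "HF_TOKEN"):
    if not os.getenv(var):
        raise EnvironmentError(f"{var} is not set - export it in your shell first")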

Step-by-Step Instructions

Install the necessary dependencies.

$ pip install assemblyai pyannote.audio torch pandas numpy

Import the necessary dependencies, assign your API keys and authenticate with AssemblyAI.

import os
import assemblyai as aai
from pyannote.audio import Pipeline
import torch
import pandas as pd
import numpy as np

# Assign your API keys
HUGGING_FACE_TOKEN = os.getenv("HF_TOKEN")
ASSEMBLYAI_API_KEY = os.getenv("ASSEMBLYAI_API_KEY")

# Authenticate with AssemblyAI
aai.settings.api_key = ASSEMBLYAI_API_KEY

Create the transcribe_audio function. This will handle the transcription process with AssemblyAI.

def transcribe_audio(audio_file, language="en"):
    """
    Transcribe an audio file using AssemblyAI.

    Args:
        audio_file (str): Path to the audio file.
        language (str, optional): Language code for transcription. Defaults to "en".

    Returns:
        aai.Transcript: The transcription result.
    """
    transcriber = aai.Transcriber(config=aai.TranscriptionConfig(speech_model='nano', language_code=language))
    transcript = transcriber.transcribe(audio_file)
    print(f"Transcript ID: {transcript.id}")
    return transcript

Create the get_speaker_labels function. This handles the speaker diarization processing that generates the custom speaker labels for the transcript.

First, it initializes the diarization pipeline and applies it to the audio file.

Second, it processes the diarization results and converts the speaker segments into a DataFrame so we can compare the results with the transcript.

Lastly, the speaker segments are compared with, and assigned to, the words and sentences of the transcript to create the speaker-labelled transcript.
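To make that assignment step concrete, here is a small standalone sketch of the overlap calculation on made-up diarization segments and word timings. It only mirrors the logic used inside the function below and is not part of the guide’s pipeline:

import numpy as np
import pandas as pd

# Two hypothetical diarization segments (speaker, start, end) in seconds
diarize_df = pd.DataFrame(
    [("SPEAKER_00", 0.0, 4.2), ("SPEAKER_01", 4.0, 9.5)],
    columns=["speaker", "start", "end"],
)

# A word that straddles both segments (times in seconds)
word_start, word_end = 3.9, 4.6

# Keep only the segments that overlap the word at all
overlaps = diarize_df[
    (diarize_df["start"] <= word_end) & (diarize_df["end"] >= word_start)
].copy()

# Duration of the overlap between the word and each candidate segment
overlaps["overlap"] = np.minimum(overlaps["end"], word_end) - np.maximum(overlaps["start"], word_start)

# The word is assigned to the speaker with the largest overlap
print(overlaps.loc[overlaps["overlap"].idxmax(), "speaker"])  # SPEAKER_01 (0.6s vs 0.3s)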

def get_speaker_labels(audio_file, transcript: aai.Transcript):
    """
    Perform speaker diarization on an audio file and combine results with the transcript.

    Args:
        audio_file (str): Path to the audio file.
        transcript (aai.Transcript): The transcription result from AssemblyAI.

    Returns:
        str: A formatted string containing the transcript with speaker labels and timestamps.
    """
    # Initialize the speaker diarization pipeline with GPU support when available
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    pipeline = Pipeline.from_pretrained(
        "pyannote/speaker-diarization",
        use_auth_token=HUGGING_FACE_TOKEN,
    )

    if pipeline is None:
        raise ValueError("Failed to initialize the pipeline. Please check your authentication token and internet connection.")
    else:
        pipeline = pipeline.to(device)

    # Apply the pipeline to the audio file
    diarization = pipeline(audio_file)

    # Create a dictionary to store speaker segments
    speaker_segments = {}

    # Process diarization results
    for turn, _, speaker in diarization.itertracks(yield_label=True):
        start, end = turn.start, turn.end
        if speaker not in speaker_segments:
            speaker_segments[speaker] = []
        speaker_segments[speaker].append((start, end))

    # Convert speaker_segments to a DataFrame
    diarize_df = pd.DataFrame([(speaker, start, end)
                               for speaker, segments in speaker_segments.items()
                               for start, end in segments],
                              columns=['speaker', 'start', 'end'])

    # Assign each word the speaker whose diarization segment overlaps it the most
    for word in transcript.words:
        word_start = float(word.start) / 1000
        word_end = float(word.end) / 1000

        overlaps = diarize_df[
            (diarize_df['start'] <= word_end) & (diarize_df['end'] >= word_start)
        ].copy()

        if not overlaps.empty:
            overlaps['overlap'] = np.minimum(overlaps['end'], word_end) - np.maximum(overlaps['start'], word_start)
            word.speaker = overlaps.loc[overlaps['overlap'].idxmax(), 'speaker']
        else:
            word.speaker = "Unknown"

    full_transcript = ''

    # Assign each sentence the speaker whose diarization segment overlaps it the most
    for segment in transcript.get_sentences():
        segment_start = float(segment.start) / 1000
        segment_end = float(segment.end) / 1000

        overlaps = diarize_df[
            (diarize_df['start'] <= segment_end) & (diarize_df['end'] >= segment_start)
        ].copy()

        if not overlaps.empty:
            overlaps['overlap'] = np.minimum(overlaps['end'], segment_end) - np.maximum(overlaps['start'], segment_start)
            segment.speaker = overlaps.loc[overlaps['overlap'].idxmax(), 'speaker']
            speaker_label = segment.speaker.replace('SPEAKER_', 'SPEAKER ')
            full_transcript += f'[{format_timestamp(segment_start)}] {speaker_label}: {segment.text}\n'
        else:
            segment.speaker = "Unknown"
            full_transcript += f'[{format_timestamp(segment_start)}] Unknown: {segment.text}\n'

    return full_transcript

If you know the number of speakers in advance, you can use the num_speakers parameter to set the number of speakers:

# Apply the pipeline to the audio file
diarization = pipeline(audio_file, num_speakers=4)

You can also provide upper/lower bounds on the number of speakers using the min_speakers and max_speakers parameters:

# Apply the pipeline to the audio file
diarization = pipeline(audio_file, min_speakers=2, max_speakers=5)

Create the format_timestamp function. This converts timestamps into HH:MM:SS form to improve the readability of the final speaker-labelled transcript.

def format_timestamp(seconds):
    """
    Convert seconds to a formatted timestamp string (HH:MM:SS).

    Args:
        seconds (float): Time in seconds.

    Returns:
        str: Formatted timestamp string.
    """
    hours, remainder = divmod(int(seconds), 3600)
    minutes, seconds = divmod(remainder, 60)
    return f"{hours:02d}:{minutes:02d}:{seconds:02d}"
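For example, format_timestamp(3725.4) returns "01:02:05".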

Finally, select a local file and call the functions to generate and print your custom speaker-labelled transcript.

audio_file = "audio.wav"  # your local file path
transcript: aai.Transcript = transcribe_audio(audio_file, language="hr")  # select a language code
transcript_with_speakers = get_speaker_labels(audio_file, transcript)
print(transcript_with_speakers)

Here’s an example of speaker-labelled output from a Croatian audio file:

[00:00:05] SPEAKER 04: Nalazimo se u Centro Zagreba, u parku Zrinjevac, gdje je kao što vidite jako ljepo, vreme je prekrasno, a danas ćemo ljude pitati što im se sviđa u Zagrebu ili što im se možda ne sviđa u Zagrebu.
[00:00:42] SPEAKER 04: Dobar dan, može jednokratko pitanje samo.
[00:00:46] SPEAKER 04: Može?
[00:00:48] SPEAKER 04: Evo lako, što vam se najviše sviđa u Zagrebu?
[00:00:50] SPEAKER 07: Što mi se najviše sviđa u Zagrebu?
[00:00:53] SPEAKER 07: E sad, teško pitanje, ali trenutno mi se najviše sviđa što nije klasična jesen, nego više prođeče u zraku.
[00:01:06] SPEAKER 07: Dobre.
[00:01:09] SPEAKER 07: Može sigurnost još uvijek s osišam sigurno u Zagrebu.
[00:01:13] SPEAKER 04: I po noći?
[00:01:15] SPEAKER 07: Pa po noći ne šetam baš toliko po noći, ali centar grada mi je dosta siguran, osvijetljen i to mi je okej.