Skip to main content

Documentation Index

Fetch the complete documentation index at: https://assemblyai.com/docs/llms.txt

Use this file to discover all available pages before exploring further.

In this guide, you’ll learn how to use LLM Gateway with AssemblyAI’s Streaming API. The script accumulates each finalized transcript in a global variable, conversation_data (Python) / conversationData (JavaScript), from inside the on_message handler. Once the transcription session is closed, the accumulated transcript is sent to LLM Gateway for analysis.

Quickstart

import pyaudio
import websocket
import json
import threading
import time
import requests
from urllib.parse import urlencode

# Credential sent in the request headers of both the streaming socket and LLM Gateway.
YOUR_API_KEY = "YOUR_API_KEY"  # Replace with your actual API key

# --- Streaming endpoint ------------------------------------------------------
API_ENDPOINT_BASE_URL = "wss://streaming.assemblyai.com/v3/ws"
CONNECTION_PARAMS = dict(sample_rate=16000, speech_model="u3-rt-pro")
# The connection parameters travel as the URL query string.
API_ENDPOINT = API_ENDPOINT_BASE_URL + "?" + urlencode(CONNECTION_PARAMS)

# --- Microphone capture ------------------------------------------------------
FORMAT = pyaudio.paInt16
CHANNELS = 1
# Capture at the same rate that is declared to the API above.
SAMPLE_RATE = CONNECTION_PARAMS["sample_rate"]
FRAMES_PER_BUFFER = 800  # frames per read: 800 / 16000 Hz = 50 ms of audio

# --- Shared runtime state used by run() and the WebSocket callbacks ----------
audio = stream = ws_app = audio_thread = None
stop_event = threading.Event()  # set to tell the audio loop to stop
conversation_data = ""  # finalized transcripts, one per line

def analyze_with_llm_gateway(text):
    """Send the accumulated transcript to LLM Gateway and return its analysis.

    Args:
        text: The transcript to analyze. It is appended to the coaching
            prompt inside a single user message.

    Returns:
        The ``content`` string of the first choice in the chat-completions
        response.

    Raises:
        requests.HTTPError: If LLM Gateway answers with a 4xx/5xx status.
        requests.RequestException: If the request fails or times out.
    """
    headers = {
        "authorization": YOUR_API_KEY,
        "content-type": "application/json"
    }

    prompt = "You are a helpful coach. Provide an analysis of the transcript and offer areas to improve with exact quotes. Include no preamble. Start with an overall summary then get into the examples with feedback."

    llm_gateway_data = {
        "model": "claude-sonnet-4-20250514",
        "messages": [
            {"role": "user", "content": f"{prompt}\n\nTranscript: {text}"}
        ],
        "max_tokens": 4000
    }

    result = requests.post(
        # The host is llm-gateway.assemblyai.com; the previous URL had a stray
        # "docs/" segment in the host position and could never resolve.
        "https://llm-gateway.assemblyai.com/v1/chat/completions",
        headers=headers,
        json=llm_gateway_data,
        # Without a timeout a stalled connection would hang the script forever.
        timeout=120,
    )
    # Surface HTTP errors (bad key, unknown model, ...) with their status code
    # instead of an opaque KeyError on the missing "choices" field.
    result.raise_for_status()
    return result.json()["choices"][0]["message"]["content"]

def on_open(ws):
    """Start forwarding microphone audio once the WebSocket is open.

    Spawns a daemon thread, stored in the global ``audio_thread``, that reads
    ``FRAMES_PER_BUFFER`` frames at a time from the global ``stream`` and
    sends each chunk over ``ws`` as a binary frame. The thread ends when
    ``stop_event`` is set or when reading/sending raises.

    Args:
        ws: The open WebSocket connection the audio is sent on.
    """
    global audio_thread

    print("WebSocket connection opened.")

    def stream_audio():
        while True:
            if stop_event.is_set():
                return
            try:
                chunk = stream.read(FRAMES_PER_BUFFER, exception_on_overflow=False)
                ws.send(chunk, websocket.ABNF.OPCODE_BINARY)
            except Exception as exc:
                # Any read/send failure ends the loop; the error is only reported.
                print(f"Error streaming audio: {exc}")
                return

    audio_thread = threading.Thread(target=stream_audio, daemon=True)
    audio_thread.start()

def on_message(ws, message):
    """Handle one JSON message received from the streaming socket.

    ``Begin`` and ``Termination`` messages are reported on the console.
    For ``Turn`` messages, a partial transcript overwrites the current
    console line, while a finished turn (``end_of_turn`` truthy) is printed
    on its own line and appended to the global ``conversation_data``.
    Any exception raised while handling the message is printed, not raised.

    Args:
        ws: The WebSocket connection (unused).
        message: The raw JSON text of the message.
    """
    global conversation_data

    try:
        payload = json.loads(message)
        kind = payload.get("type")

        if kind == "Begin":
            print(f"Session began: ID={payload.get('id')}")
            return

        if kind == "Termination":
            print(f"\nSession terminated: {payload.get('audio_duration_seconds', 0)}s of audio")
            return

        if kind != "Turn":
            return

        text = payload.get("transcript", "")
        if not payload.get("end_of_turn"):
            # Partial result: redraw the same console line in place.
            print(f"\r{text}", end="")
            return

        # Final result: blank out the partial line, print the turn, keep it.
        print(f"\r{' ' * 80}\r{text}")
        conversation_data += f"{text}\n"
    except Exception as exc:
        print(f"Error handling message: {exc}")

def on_error(ws, error):
    """Report a WebSocket error and signal the audio loop to stop.

    Args:
        ws: The WebSocket connection (unused).
        error: The error to report.
    """
    report = f"\nWebSocket Error: {error}"
    print(report)
    stop_event.set()

def on_close(ws, close_status_code, close_msg):
    """Release the audio resources when the WebSocket connection closes.

    Sets ``stop_event``, stops and closes the global input ``stream`` if one
    exists, then terminates the global ``audio`` instance if one exists.

    Args:
        ws: The WebSocket connection (unused).
        close_status_code: Close status, printed to the console.
        close_msg: Close message (unused).
    """
    print(f"\nWebSocket Disconnected: Status={close_status_code}")
    stop_event.set()

    if stream:
        still_running = stream.is_active()
        if still_running:
            stream.stop_stream()
        stream.close()

    if audio:
        audio.terminate()

def run():
    """Stream microphone audio for transcription, then analyze the transcript.

    Opens the microphone, connects to the streaming endpoint on a background
    thread, and blocks until that thread exits or the user presses Ctrl+C.
    On Ctrl+C the session is asked to terminate and the socket is closed.
    Whichever way the session ends, the transcript accumulated in
    ``conversation_data`` is then sent to LLM Gateway and the result printed.
    """
    global audio, stream, ws_app

    audio = pyaudio.PyAudio()
    stream = audio.open(
        input=True,
        frames_per_buffer=FRAMES_PER_BUFFER,
        channels=CHANNELS,
        format=FORMAT,
        rate=SAMPLE_RATE,
    )
    print("Speak into your microphone. Press Ctrl+C to stop.")

    ws_app = websocket.WebSocketApp(
        API_ENDPOINT,
        header={"Authorization": YOUR_API_KEY},
        on_open=on_open,
        on_message=on_message,
        on_error=on_error,
        on_close=on_close,
    )

    ws_thread = threading.Thread(target=ws_app.run_forever, daemon=True)
    ws_thread.start()

    try:
        # Poll instead of join() so Ctrl+C is noticed promptly.
        while ws_thread.is_alive():
            time.sleep(0.1)
    except KeyboardInterrupt:
        print("\nStopping...")
        stop_event.set()
        if ws_app.sock and ws_app.sock.connected:
            try:
                ws_app.send(json.dumps({"type": "Terminate"}))
            except (websocket.WebSocketException, OSError) as exc:
                # The socket can drop between the check and the send; the
                # transcript collected so far must still be analyzed.
                print(f"Could not send Terminate message: {exc}")
            else:
                # Give the server up to 2 s to deliver the last messages and
                # close; returns early once the socket thread has finished.
                ws_thread.join(timeout=2.0)
        ws_app.close()
        ws_thread.join(timeout=2.0)

    # Runs after every kind of shutdown, not only Ctrl+C, so a session closed
    # by the server or by an error does not silently drop the transcript.
    if conversation_data.strip():
        print("Analyzing conversation with LLM Gateway...")
        print(analyze_with_llm_gateway(conversation_data))
    else:
        print("No conversation data to analyze.")

# Start the streaming session only when this file is executed as a script.
if __name__ == "__main__":
    run()

Step-by-Step Instructions

Before we begin, make sure you have an AssemblyAI account and an API key. You can sign up and get your API key from your dashboard.

Install Dependencies

pip install websocket-client pyaudio requests

Import Packages & Set API Key

import pyaudio
import websocket
import json
import threading
import time
import requests
from urllib.parse import urlencode

# Credential sent in the request headers of both the streaming socket and LLM Gateway.
YOUR_API_KEY = "YOUR_API_KEY"  # Replace with your actual API key

Audio Configuration & Global Variables

Set your audio configuration and global variables. Initialize the conversation_data / conversationData variable as an empty string to accumulate final transcripts.
# --- Streaming endpoint ------------------------------------------------------
API_ENDPOINT_BASE_URL = "wss://streaming.assemblyai.com/v3/ws"
CONNECTION_PARAMS = dict(sample_rate=16000, speech_model="u3-rt-pro")
# The connection parameters travel as the URL query string.
API_ENDPOINT = API_ENDPOINT_BASE_URL + "?" + urlencode(CONNECTION_PARAMS)

# --- Microphone capture ------------------------------------------------------
FORMAT = pyaudio.paInt16
CHANNELS = 1
# Capture at the same rate that is declared to the API above.
SAMPLE_RATE = CONNECTION_PARAMS["sample_rate"]
FRAMES_PER_BUFFER = 800  # frames per read: 800 / 16000 Hz = 50 ms of audio

# --- Shared runtime state used by run() and the WebSocket callbacks ----------
audio = stream = ws_app = audio_thread = None
stop_event = threading.Event()  # set to tell the audio loop to stop
conversation_data = ""  # finalized transcripts, one per line

Define Analyze With LLM Gateway Function

Define a function called analyze_with_llm_gateway (Python) or analyzeWithLlmGateway (JavaScript), which uses LLM Gateway to analyze the complete final transcript text. The prompt can be modified to suit your individual requirements.
def analyze_with_llm_gateway(text):
    """Send the accumulated transcript to LLM Gateway and return its analysis.

    Args:
        text: The transcript to analyze. It is appended to the coaching
            prompt inside a single user message.

    Returns:
        The ``content`` string of the first choice in the chat-completions
        response.

    Raises:
        requests.HTTPError: If LLM Gateway answers with a 4xx/5xx status.
        requests.RequestException: If the request fails or times out.
    """
    headers = {
        "authorization": YOUR_API_KEY,
        "content-type": "application/json"
    }

    prompt = "You are a helpful coach. Provide an analysis of the transcript and offer areas to improve with exact quotes. Include no preamble. Start with an overall summary then get into the examples with feedback."

    llm_gateway_data = {
        "model": "claude-sonnet-4-20250514",
        "messages": [
            {"role": "user", "content": f"{prompt}\n\nTranscript: {text}"}
        ],
        "max_tokens": 4000
    }

    result = requests.post(
        # The host is llm-gateway.assemblyai.com; the previous URL had a stray
        # "docs/" segment in the host position and could never resolve.
        "https://llm-gateway.assemblyai.com/v1/chat/completions",
        headers=headers,
        json=llm_gateway_data,
        # Without a timeout a stalled connection would hang the script forever.
        timeout=120,
    )
    # Surface HTTP errors (bad key, unknown model, ...) with their status code
    # instead of an opaque KeyError on the missing "choices" field.
    result.raise_for_status()
    return result.json()["choices"][0]["message"]["content"]

Websocket Event Handlers

Open Websocket

def on_open(ws):
    """Start forwarding microphone audio once the WebSocket is open.

    Spawns a daemon thread, stored in the global ``audio_thread``, that reads
    ``FRAMES_PER_BUFFER`` frames at a time from the global ``stream`` and
    sends each chunk over ``ws`` as a binary frame. The thread ends when
    ``stop_event`` is set or when reading/sending raises.

    Args:
        ws: The open WebSocket connection the audio is sent on.
    """
    global audio_thread

    print("WebSocket connection opened.")

    def stream_audio():
        while True:
            if stop_event.is_set():
                return
            try:
                chunk = stream.read(FRAMES_PER_BUFFER, exception_on_overflow=False)
                ws.send(chunk, websocket.ABNF.OPCODE_BINARY)
            except Exception as exc:
                # Any read/send failure ends the loop; the error is only reported.
                print(f"Error streaming audio: {exc}")
                return

    audio_thread = threading.Thread(target=stream_audio, daemon=True)
    audio_thread.start()

Handle Websocket Messages

In this function, use the previously defined conversation_data / conversationData to store all final transcripts together for later analysis.
def on_message(ws, message):
    """Handle one JSON message received from the streaming socket.

    ``Begin`` and ``Termination`` messages are reported on the console.
    For ``Turn`` messages, a partial transcript overwrites the current
    console line, while a finished turn (``end_of_turn`` truthy) is printed
    on its own line and appended to the global ``conversation_data``.
    Any exception raised while handling the message is printed, not raised.

    Args:
        ws: The WebSocket connection (unused).
        message: The raw JSON text of the message.
    """
    global conversation_data

    try:
        payload = json.loads(message)
        kind = payload.get("type")

        if kind == "Begin":
            print(f"Session began: ID={payload.get('id')}")
            return

        if kind == "Termination":
            print(f"\nSession terminated: {payload.get('audio_duration_seconds', 0)}s of audio")
            return

        if kind != "Turn":
            return

        text = payload.get("transcript", "")
        if not payload.get("end_of_turn"):
            # Partial result: redraw the same console line in place.
            print(f"\r{text}", end="")
            return

        # Final result: blank out the partial line, print the turn, keep it.
        print(f"\r{' ' * 80}\r{text}")
        conversation_data += f"{text}\n"
    except Exception as exc:
        print(f"Error handling message: {exc}")

Close Websocket

def on_close(ws, close_status_code, close_msg):
    """Release the audio resources when the WebSocket connection closes.

    Sets ``stop_event``, stops and closes the global input ``stream`` if one
    exists, then terminates the global ``audio`` instance if one exists.

    Args:
        ws: The WebSocket connection (unused).
        close_status_code: Close status, printed to the console.
        close_msg: Close message (unused).
    """
    print(f"\nWebSocket Disconnected: Status={close_status_code}")
    stop_event.set()

    if stream:
        still_running = stream.is_active()
        if still_running:
            stream.stop_stream()
        stream.close()

    if audio:
        audio.terminate()

Websocket Error Handling

def on_error(ws, error):
    """Report a WebSocket error and signal the audio loop to stop.

    Args:
        ws: The WebSocket connection (unused).
        error: The error to report.
    """
    report = f"\nWebSocket Error: {error}"
    print(report)
    stop_event.set()

Begin Streaming STT Transcription

Open the microphone, connect to the Streaming API, and keep streaming until the session ends. After the socket is closed, conversation_data / conversationData is passed to the analyze_with_llm_gateway / analyzeWithLlmGateway function and the LLM Gateway results are printed.
def run():
    """Stream microphone audio for transcription, then analyze the transcript.

    Opens the microphone, connects to the streaming endpoint on a background
    thread, and blocks until that thread exits or the user presses Ctrl+C.
    On Ctrl+C the session is asked to terminate and the socket is closed.
    Whichever way the session ends, the transcript accumulated in
    ``conversation_data`` is then sent to LLM Gateway and the result printed.
    """
    global audio, stream, ws_app

    audio = pyaudio.PyAudio()
    stream = audio.open(
        input=True,
        frames_per_buffer=FRAMES_PER_BUFFER,
        channels=CHANNELS,
        format=FORMAT,
        rate=SAMPLE_RATE,
    )
    print("Speak into your microphone. Press Ctrl+C to stop.")

    ws_app = websocket.WebSocketApp(
        API_ENDPOINT,
        header={"Authorization": YOUR_API_KEY},
        on_open=on_open,
        on_message=on_message,
        on_error=on_error,
        on_close=on_close,
    )

    ws_thread = threading.Thread(target=ws_app.run_forever, daemon=True)
    ws_thread.start()

    try:
        # Poll instead of join() so Ctrl+C is noticed promptly.
        while ws_thread.is_alive():
            time.sleep(0.1)
    except KeyboardInterrupt:
        print("\nStopping...")
        stop_event.set()
        if ws_app.sock and ws_app.sock.connected:
            try:
                ws_app.send(json.dumps({"type": "Terminate"}))
            except (websocket.WebSocketException, OSError) as exc:
                # The socket can drop between the check and the send; the
                # transcript collected so far must still be analyzed.
                print(f"Could not send Terminate message: {exc}")
            else:
                # Give the server up to 2 s to deliver the last messages and
                # close; returns early once the socket thread has finished.
                ws_thread.join(timeout=2.0)
        ws_app.close()
        ws_thread.join(timeout=2.0)

    # Runs after every kind of shutdown, not only Ctrl+C, so a session closed
    # by the server or by an error does not silently drop the transcript.
    if conversation_data.strip():
        print("Analyzing conversation with LLM Gateway...")
        print(analyze_with_llm_gateway(conversation_data))
    else:
        print("No conversation data to analyze.")

# Start the streaming session only when this file is executed as a script.
if __name__ == "__main__":
    run()