Skip to main content
This guide walks through the process of migrating from Speechmatics to AssemblyAI for streaming Speech-to-text.

Get started

Before we begin, make sure you have an AssemblyAI account and an API key. You can sign up for a free account and get your API key from your dashboard.

Side-by-side code comparison

Below is a side-by-side comparison of basic Python code snippets that transcribe streaming audio with Speechmatics and with AssemblyAI.
import pyaudio
import websocket
import json
import threading
import time

# --- Configuration ---

YOUR_API_KEY = "YOUR-API-KEY" # Replace with your actual API key

# Session settings; on_open copies these into the "transcription_config"
# section of the StartRecognition message.
CONNECTION_PARAMS = {
    "language": "en",
    "enable_partials": True,
    "max_delay": 2.0
}
# WebSocket URL the client connects to.
API_ENDPOINT = "wss://eu2.rt.speechmatics.com/v2/en"

# Audio Configuration

FRAMES_PER_BUFFER = 1024 # Chunk size: frames read from the microphone per stream.read() call
SAMPLE_RATE = None # Filled in by run() from the input device's default sample rate
CHANNELS = 1
FORMAT = pyaudio.paFloat32 # Speechmatics uses float32 format (declared to the server as "pcm_f32le")

# Global variables for audio stream and websocket

audio = None  # PyAudio instance, created in run()
stream = None  # Open microphone input stream, created in run()
ws_app = None  # websocket.WebSocketApp, created in run()
audio_thread = None  # Thread that sends microphone chunks, started in on_message
stop_event = threading.Event() # To signal the audio thread to stop
audio_seq_no = 0 # Track number of audio chunks sent; reported as last_seq_no in EndOfStream

# --- WebSocket Event Handlers ---

def on_open(ws):
    """Begin the Speechmatics handshake once the WebSocket is connected.

    Sends a single StartRecognition message describing the raw audio that
    will follow and the transcription settings taken from CONNECTION_PARAMS.

    Args:
        ws: The connected WebSocket; the message is sent through it.
    """
    print("WebSocket connection opened.")
    print(f"Connected to: {API_ENDPOINT}")

    # Describe the audio frames the sender thread will stream.
    audio_format = {
        "type": "raw",
        "encoding": "pcm_f32le",
        "sample_rate": SAMPLE_RATE,
    }
    # Only these three settings are forwarded to the server.
    transcription_config = {
        key: CONNECTION_PARAMS[key]
        for key in ("language", "enable_partials", "max_delay")
    }
    handshake = {
        "message": "StartRecognition",
        "audio_format": audio_format,
        "transcription_config": transcription_config,
    }
    ws.send(json.dumps(handshake))

def on_message(ws, message):
    """Handle one JSON message received from the server.

    Dispatches on the message's "message" field:
      * RecognitionStarted   - start the background thread that streams audio.
      * AddPartialTranscript - overwrite the current console line with the text.
      * AddTranscript        - clear the line and print the final text.
      * EndOfTranscript      - report that the session has finished.
      * Error                - report the error and signal the sender to stop.
    Any other message type is ignored. Decoding and handling errors are
    printed rather than raised.

    Args:
        ws: The WebSocket the message arrived on; audio is sent back through it.
        message: The raw JSON text of the message.
    """
    global audio_seq_no, audio_thread

    def pump_microphone():
        """Send microphone chunks as binary frames until stop_event is set."""
        global audio_seq_no, stream
        print("Starting audio streaming...")
        while not stop_event.is_set():
            try:
                chunk = stream.read(FRAMES_PER_BUFFER, exception_on_overflow=False)
                # Audio travels as binary frames; control messages are JSON text
                ws.send(chunk, websocket.ABNF.OPCODE_BINARY)
                audio_seq_no += 1
            except Exception as e:
                print(f"Error streaming audio: {e}")
                # A failed read/send most likely means the stream or socket is
                # closed, so give up instead of retrying
                break
        print("Audio streaming stopped.")

    try:
        payload = json.loads(message)
        kind = payload.get('message')

        if kind == "RecognitionStarted":
            print(f"\nSession began: ID={payload.get('id')}")
            # Daemon thread: the main thread may exit while it is still running
            audio_thread = threading.Thread(target=pump_microphone, daemon=True)
            audio_thread.start()
            return

        if kind in ("AddPartialTranscript", "AddTranscript"):
            text = payload.get('metadata', {}).get('transcript', '')
            if not text:
                return
            if kind == "AddPartialTranscript":
                print(f"\r{text}", end='')
            else:
                # Clear previous line for final messages
                print('\r' + ' ' * 80 + '\r', end='')
                print(text)
            return

        if kind == "EndOfTranscript":
            print("\nSession Terminated: Transcription complete")
            return

        if kind == "Error":
            print(f"\nWebSocket Error: {payload.get('type')} - {payload.get('reason')}")
            stop_event.set()

    except json.JSONDecodeError as e:
        print(f"Error decoding message: {e}")
    except Exception as e:
        print(f"Error handling message: {e}")

def on_error(ws, error):
    """Report a WebSocket error and tell the audio sender to stop.

    Args:
        ws: The WebSocket that raised the error (unused).
        error: The error to report.
    """
    report = f"\nWebSocket Error: {error}"
    print(report)
    # Attempt to signal stop on error
    stop_event.set()

def on_close(ws, close_status_code, close_msg):
    """Release audio resources after the WebSocket connection has closed.

    Signals the audio thread to stop, waits briefly for it to finish, and only
    then stops and closes the microphone stream and terminates PyAudio. The
    globals are reset to None so a later fallback cleanup does not close them
    a second time.

    Args:
        ws: The WebSocket that was closed (unused).
        close_status_code: Close status code, printed for diagnostics.
        close_msg: Close message, printed for diagnostics.
    """
    global stream, audio
    print(f"\nWebSocket Disconnected: Status={close_status_code}, Msg={close_msg}")
    stop_event.set()  # Signal audio thread just in case it's still running

    # Join the sender thread BEFORE touching the stream, so the stream is not
    # closed underneath an in-flight stream.read(). The timeout keeps shutdown
    # bounded if the thread is stuck.
    if audio_thread and audio_thread.is_alive():
        audio_thread.join(timeout=1.0)

    if stream:
        if stream.is_active():
            stream.stop_stream()
        stream.close()
        stream = None
    if audio:
        audio.terminate()
        audio = None

# --- Main Execution ---

def run():
    """Stream the default microphone to the real-time API until interrupted.

    Opens the default input device at its default sample rate, connects the
    WebSocket on a background thread, and keeps the main thread alive so that
    Ctrl+C can be caught. On Ctrl+C the audio thread is stopped, an EndOfStream
    message is sent, and the connection is closed. Audio resources are released
    in every exit path.
    """
    global audio, stream, ws_app, SAMPLE_RATE

    # Initialize PyAudio
    audio = pyaudio.PyAudio()

    # Query the device and open the stream inside one try block so that a
    # failure at either step (e.g. no input device) still terminates PyAudio.
    try:
        # Get default input device (can alter to specify specific device)
        default_device = audio.get_default_input_device_info()
        device_index = default_device['index']
        SAMPLE_RATE = int(audio.get_device_info_by_index(device_index)['defaultSampleRate'])

        print(f"Using microphone: {default_device['name']}")

        # Open microphone stream
        stream = audio.open(
            input=True,
            frames_per_buffer=FRAMES_PER_BUFFER,
            channels=CHANNELS,
            format=FORMAT,
            rate=SAMPLE_RATE,
            input_device_index=device_index
        )
        print("Microphone stream opened successfully.")
        print("Speak into your microphone. Press Ctrl+C to stop.")
    except Exception as e:
        print(f"Error opening microphone stream: {e}")
        if audio:
            audio.terminate()
        return  # Exit if microphone cannot be opened

    # Create WebSocketApp
    ws_app = websocket.WebSocketApp(
        API_ENDPOINT,
        header={"Authorization": f"Bearer {YOUR_API_KEY}"},  # Speechmatics uses Bearer token
        on_open=on_open,
        on_message=on_message,
        on_error=on_error,
        on_close=on_close,
    )

    # Run WebSocketApp in a separate thread to allow main thread to catch KeyboardInterrupt
    ws_thread = threading.Thread(target=lambda: ws_app.run_forever(ping_interval=30, ping_timeout=10))
    ws_thread.daemon = True
    ws_thread.start()

    try:
        # Keep main thread alive until interrupted
        while ws_thread.is_alive():
            time.sleep(0.1)
    except KeyboardInterrupt:
        print("\nCtrl+C received. Stopping...")
        stop_event.set()  # Signal audio thread to stop

        # Wait for the audio thread to finish its in-flight chunk first, so
        # that audio_seq_no is final and no audio is sent after EndOfStream.
        if audio_thread and audio_thread.is_alive():
            audio_thread.join(timeout=1.0)

        # Send EndOfStream message to the server
        if ws_app and ws_app.sock and ws_app.sock.connected:
            try:
                end_message = {
                    "message": "EndOfStream",
                    "last_seq_no": audio_seq_no
                }
                print(f"Sending termination message: {json.dumps(end_message)}")
                ws_app.send(json.dumps(end_message))
                # Give a moment for messages to process before forceful close
                time.sleep(1)
            except Exception as e:
                print(f"Error sending termination message: {e}")

        # Close the WebSocket connection (will trigger on_close)
        if ws_app:
            ws_app.close()

        # Wait for WebSocket thread to finish
        ws_thread.join(timeout=2.0)

    except Exception as e:
        print(f"\nAn unexpected error occurred: {e}")
        stop_event.set()
        if ws_app:
            ws_app.close()
        ws_thread.join(timeout=2.0)

    finally:
        # Final cleanup (already handled in on_close, but good as a fallback)
        if stream and stream.is_active():
            stream.stop_stream()
        if stream:
            stream.close()
        if audio:
            audio.terminate()
        print("Cleanup complete. Exiting.")

# Start the demo only when this file is executed directly, not when it is imported.
if __name__ == "__main__":
    run()

Step 1: Install dependencies

1
Install the required Python packages.
pip install websocket-client pyaudio

Step 2: Configure the API key

In this step, you’ll configure your API key to authenticate your requests.
1
Navigate to API Keys in your account settings and copy your API key.
2
Store your API key in a variable. Replace YOUR-API-KEY with your copied API key.
import pyaudio
import websocket
import json
import threading
import time

YOUR_API_KEY = "YOUR-API-KEY"  # Replace with your actual API key
3

Authenticate With A Temporary Token

import requests

def generate_temp_token(api_key, ttl=60, timeout=10):
    """Generate a temporary authentication token for the real-time API.

    Args:
        api_key: Long-lived API key used to authorize the request.
        ttl: Value sent as the "ttl" field of the request body (the original
            docstring describes it as the time after which the token expires).
        timeout: Seconds to wait for the HTTP request before giving up.

    Returns:
        The "key_value" field of the JSON response, or None if it is absent.

    Raises:
        requests.HTTPError: If the server answers with an error status.
        requests.RequestException: On connection failures or timeouts.
    """
    url = "https://mp.speechmatics.com/v1/api_keys?type=rt"
    headers = {
        "Content-Type": "application/json",
        "Authorization": f"Bearer {api_key}"
    }
    payload = {
        "ttl": ttl
    }

    # Without a timeout, requests can block forever on an unresponsive server.
    response = requests.post(url, json=payload, headers=headers, timeout=timeout)
    # Fail loudly on 4xx/5xx instead of returning None, which would otherwise
    # end up in the WebSocket URL as the literal text "None".
    response.raise_for_status()
    data = response.json()
    return data.get("key_value")
Token usage: Instead of authorizing your request with YOUR_API_KEY (via a request header), use the temporary token generated by this function when establishing the WebSocket connection.
  # Pass the temporary token as the `jwt` query parameter; no Authorization
  # header is sent on this connection. YOUR_API_KEY is the variable defined
  # in Step 2.
  API_ENDPOINT = f"wss://eu2.rt.speechmatics.com/v2?jwt={generate_temp_token(YOUR_API_KEY)}"
  ws_app = websocket.WebSocketApp(
      API_ENDPOINT,
      on_open=on_open,
      on_message=on_message,
      on_error=on_error,
      on_close=on_close,
  )

Step 3: Set up audio configuration

1
Configure the audio settings for your microphone stream.
import pyaudio

# Audio Configuration
FRAMES_PER_BUFFER = 1024  # Chunk size: frames read from the microphone per read
SAMPLE_RATE = None  # Will be set in run() from the input device's default sample rate
CHANNELS = 1
FORMAT = pyaudio.paFloat32  # Speechmatics uses float32 format

# Global variables for audio stream and websocket
audio = None  # PyAudio instance, created in run()
stream = None  # Open microphone input stream, created in run()
ws_app = None  # WebSocket application object
audio_thread = None  # Thread that sends microphone chunks
stop_event = threading.Event()  # To signal the audio thread to stop
audio_seq_no = 0  # Track number of audio chunks sent

def run():
    """Open the default microphone at its default sample rate.

    Stores the PyAudio instance, the open input stream and the detected sample
    rate in the module globals. If the device cannot be queried or opened, the
    error is printed, PyAudio is terminated and the function returns early.
    """
    global audio, stream, ws_app, SAMPLE_RATE

    # Initialize PyAudio
    audio = pyaudio.PyAudio()

    # Query the device and open the stream inside one try block so that a
    # failure at either step (e.g. no input device) still terminates PyAudio.
    try:
        # Get default input device (can alter to specify specific device)
        default_device = audio.get_default_input_device_info()
        device_index = default_device['index']
        SAMPLE_RATE = int(audio.get_device_info_by_index(device_index)['defaultSampleRate'])

        print(f"Using microphone: {default_device['name']}")

        # Open microphone stream
        stream = audio.open(
            input=True,
            frames_per_buffer=FRAMES_PER_BUFFER,
            channels=CHANNELS,
            format=FORMAT,
            rate=SAMPLE_RATE,
            input_device_index=device_index
        )
        print("Microphone stream opened successfully.")
        print("Speak into your microphone. Press Ctrl+C to stop.")
    except Exception as e:
        print(f"Error opening microphone stream: {e}")
        if audio:
            audio.terminate()
        return  # Exit if microphone cannot be opened
Sample rate: Speechmatics recommends using a 16 kHz sample rate for speech audio. Anything higher will be downsampled server-side.
Audio data format: If you want to stream data from elsewhere, make sure that your audio data is in the following format:
  • Single-channel
  • PCM16 (default) or Mu-law encoding
  • A sample rate that matches the value of the sample_rate parameter (16 kHz is recommended)
  • 50 milliseconds of audio per message (larger chunk sizes are workable, but may result in latency fluctuations)

Step 4: Create event handlers

In this step, you’ll set up callback functions that handle the different events.
1
Create functions to handle the events from the real-time service.
import json

def on_open(ws):
    """Begin the Speechmatics handshake once the WebSocket is connected.

    Sends a single StartRecognition message describing the raw audio that
    will follow and the transcription settings taken from CONNECTION_PARAMS.

    Args:
        ws: The connected WebSocket; the message is sent through it.
    """
    print("WebSocket connection opened.")
    print(f"Connected to: {API_ENDPOINT}")

    # Describe the audio frames the sender thread will stream.
    audio_format = {
        "type": "raw",
        "encoding": "pcm_f32le",
        "sample_rate": SAMPLE_RATE,
    }
    # Only these three settings are forwarded to the server.
    transcription_config = {
        key: CONNECTION_PARAMS[key]
        for key in ("language", "enable_partials", "max_delay")
    }
    handshake = {
        "message": "StartRecognition",
        "audio_format": audio_format,
        "transcription_config": transcription_config,
    }
    ws.send(json.dumps(handshake))

def on_error(ws, error):
    """Report a WebSocket error and tell the audio sender to stop.

    Args:
        ws: The WebSocket that raised the error (unused).
        error: The error to report.
    """
    report = f"\nWebSocket Error: {error}"
    print(report)
    # Setting the event asks the audio loop to stop
    stop_event.set()

def on_close(ws, close_status_code, close_msg):
    """Release audio resources after the WebSocket connection has closed.

    Signals the audio thread to stop, waits briefly for it to finish, and only
    then stops and closes the microphone stream and terminates PyAudio. The
    globals are reset to None so a later fallback cleanup does not close them
    a second time.

    Args:
        ws: The WebSocket that was closed (unused).
        close_status_code: Close status code, printed for diagnostics.
        close_msg: Close message, printed for diagnostics.
    """
    global stream, audio
    print(f"\nWebSocket Disconnected: Status={close_status_code}, Msg={close_msg}")
    stop_event.set()  # Signal audio thread just in case it's still running

    # Join the sender thread BEFORE touching the stream, so the stream is not
    # closed underneath an in-flight stream.read(). The timeout keeps shutdown
    # bounded if the thread is stuck.
    if audio_thread and audio_thread.is_alive():
        audio_thread.join(timeout=1.0)

    if stream:
        if stream.is_active():
            stream.stop_stream()
        stream.close()
        stream = None
    if audio:
        audio.terminate()
        audio = None
Connection configuration: Speechmatics requires a handshake in which the connection configuration is specified before any audio is streamed. AssemblyAI lets you configure the connection via query parameters in the URL and start streaming audio immediately. The Speechmatics handshake begins when on_open sends a StartRecognition message to configure the session. Audio streaming starts only after the on_message callback receives a message of type RecognitionStarted.
2
Create another function to handle transcripts. Speechmatics sends partial (AddPartialTranscript) and final (AddTranscript) transcripts as separate message types, and its session-ending message is EndOfTranscript. AssemblyAI instead uses a Turn object with an end_of_turn boolean flag to indicate finality, and its session-ending message is Termination. For more on the Turn object, see the Streaming Core concepts section.
def on_message(ws, message):
    """Handle one JSON message received from the server.

    Dispatches on the message's "message" field:
      * RecognitionStarted   - start the background thread that streams audio.
      * AddPartialTranscript - overwrite the current console line with the text.
      * AddTranscript        - clear the line and print the final text.
      * EndOfTranscript      - report that the session has finished.
      * Error                - report the error and signal the sender to stop.
    Any other message type is ignored. Decoding and handling errors are
    printed rather than raised.

    Args:
        ws: The WebSocket the message arrived on; audio is sent back through it.
        message: The raw JSON text of the message.
    """
    global audio_seq_no, audio_thread

    def pump_microphone():
        """Send microphone chunks as binary frames until stop_event is set."""
        global audio_seq_no, stream
        print("Starting audio streaming...")
        while not stop_event.is_set():
            try:
                chunk = stream.read(FRAMES_PER_BUFFER, exception_on_overflow=False)
                # Audio travels as binary frames; control messages are JSON text
                ws.send(chunk, websocket.ABNF.OPCODE_BINARY)
                audio_seq_no += 1
            except Exception as e:
                print(f"Error streaming audio: {e}")
                # A failed read/send most likely means the stream or socket is
                # closed, so give up instead of retrying
                break
        print("Audio streaming stopped.")

    try:
        payload = json.loads(message)
        kind = payload.get('message')

        if kind == "RecognitionStarted":
            print(f"\nSession began: ID={payload.get('id')}")
            # Daemon thread: the main thread may exit while it is still running
            audio_thread = threading.Thread(target=pump_microphone, daemon=True)
            audio_thread.start()
            return

        if kind in ("AddPartialTranscript", "AddTranscript"):
            text = payload.get('metadata', {}).get('transcript', '')
            if not text:
                return
            if kind == "AddPartialTranscript":
                print(f"\r{text}", end='')
            else:
                # Clear previous line for final messages
                print('\r' + ' ' * 80 + '\r', end='')
                print(text)
            return

        if kind == "EndOfTranscript":
            print("\nSession Terminated: Transcription complete")
            return

        if kind == "Error":
            print(f"\nWebSocket Error: {payload.get('type')} - {payload.get('reason')}")
            stop_event.set()

    except json.JSONDecodeError as e:
        print(f"Error decoding message: {e}")
    except Exception as e:
        print(f"Error handling message: {e}")
3
Transcript message structure: Note the difference in transcript message structure below:
# Speechmatics
{
  "message": "AddPartialTranscript",
  "metadata": {
    "transcript": "hello world"
  },
  # Other transcript data...
}

# AssemblyAI
{
  "type": "Turn",
  "transcript": "hello world",
  "end_of_turn": false,
  # Other transcript data...
}

Step 5: Connect and start transcription

1
To stream audio, establish a connection to the API via WebSockets.
Create a WebSocket connection to the Realtime service.
def run():
    """Connect to the real-time endpoint and service the socket in the background."""
    global audio, stream, ws_app, SAMPLE_RATE
    # Skipping audio/microphone setup code...

    # Speechmatics uses Bearer token
    auth_header = {"Authorization": f"Bearer {YOUR_API_KEY}"}

    # Create WebSocketApp
    ws_app = websocket.WebSocketApp(
        API_ENDPOINT,
        header=auth_header,
        on_open=on_open,
        on_message=on_message,
        on_error=on_error,
        on_close=on_close,
    )

    # The socket loop runs on its own daemon thread so the main thread stays
    # free to catch KeyboardInterrupt
    ws_thread = threading.Thread(
        target=ws_app.run_forever,
        kwargs={"ping_interval": 30, "ping_timeout": 10},
        daemon=True,
    )
    ws_thread.start()

**Authorization:** Note that while both services use an Authorization header to authenticate the WebSocket connection, Speechmatics uses a Bearer prefix, while AssemblyAI does not.

Step 6: Close the connection

1
Keep the main thread alive until interrupted, handle keyboard interrupts and thrown exceptions, and clean up upon closing of the WebSocket connection.
def run():
    """Keep the session alive until interrupted, then shut it down cleanly.

    The main thread idles while the WebSocket thread is alive. On Ctrl+C the
    audio thread is stopped, an EndOfStream message is sent, and the connection
    is closed. Audio resources are released in every exit path.
    """
    global audio, stream, ws_app, SAMPLE_RATE
    # Skipping audio/microphone setup and WebSocket connection code...

    try:
        # Keep main thread alive until interrupted
        while ws_thread.is_alive():
            time.sleep(0.1)
    except KeyboardInterrupt:
        print("\nCtrl+C received. Stopping...")
        stop_event.set()  # Signal audio thread to stop

        # Wait for the audio thread to finish its in-flight chunk first, so
        # that audio_seq_no is final and no audio is sent after EndOfStream.
        if audio_thread and audio_thread.is_alive():
            audio_thread.join(timeout=1.0)

        # Send EndOfStream message to the server
        if ws_app and ws_app.sock and ws_app.sock.connected:
            try:
                end_message = {
                    "message": "EndOfStream",
                    "last_seq_no": audio_seq_no
                }
                print(f"Sending termination message: {json.dumps(end_message)}")
                ws_app.send(json.dumps(end_message))
                # Give a moment for messages to process before forceful close
                time.sleep(1)
            except Exception as e:
                print(f"Error sending termination message: {e}")

        # Close the WebSocket connection (will trigger on_close)
        if ws_app:
            ws_app.close()

        # Wait for WebSocket thread to finish
        ws_thread.join(timeout=2.0)

    except Exception as e:
        print(f"\nAn unexpected error occurred: {e}")
        stop_event.set()
        if ws_app:
            ws_app.close()
        ws_thread.join(timeout=2.0)

    finally:
        # Final cleanup (already handled in on_close, but good as a fallback)
        if stream and stream.is_active():
            stream.stop_stream()
        if stream:
            stream.close()
        if audio:
            audio.terminate()
        print("Cleanup complete. Exiting.")
The connection will close automatically when you press Ctrl+C. In both cases, the on_close handler will clean up the audio resources.

Step 7: Execute the main function

Finally, run the main function to start the main execution.
# Start the demo only when this file is executed directly, not when it is imported.
if __name__ == "__main__":
    run()

Next steps

To learn more about both Streaming APIs, their key differences, and how to best migrate, see the following resources: AssemblyAI Speechmatics

Need some help?

If you get stuck or have any other questions, contact our support team at support@assemblyai.com or create a support ticket.