import pyaudio
import websocket
import json
import threading
import time
from urllib.parse import urlencode
# Credential passed in the WebSocket "Authorization" header; replace the
# placeholder with a real key before running.
YOUR_API_KEY = "<YOUR_API_KEY>"

# Session options; they are URL-encoded into the endpoint query string below,
# in the order listed here.
CONNECTION_PARAMS = {
    "sample_rate": 16000,
    "speech_model": "u3-rt-pro",  # or "universal-streaming-english", "universal-streaming-multilingual"
    "min_turn_silence": 100,
    "max_turn_silence": 1000,
    # "format_turns": True,  # Whether to return formatted final transcripts (not applicable to u3-rt-pro)
}

# Full streaming URL: base endpoint plus the encoded session options.
API_ENDPOINT_BASE_URL = "wss://streaming.assemblyai.com/v3/ws"
API_ENDPOINT = f"{API_ENDPOINT_BASE_URL}?{urlencode(CONNECTION_PARAMS)}"
# Microphone capture settings: mono, PyAudio's paInt16 sample format, recorded
# at the same rate that is advertised to the server in CONNECTION_PARAMS.
SAMPLE_RATE = CONNECTION_PARAMS["sample_rate"]
CHANNELS = 1
FORMAT = pyaudio.paInt16
# Frames requested per read; 800 frames at the configured 16000 Hz is 50 ms.
FRAMES_PER_BUFFER = 800
# Shared runtime state. The four handles start out empty: run() assigns audio,
# stream and ws_app, and on_open() assigns audio_thread.
audio = stream = ws_app = audio_thread = None

# Set by on_error(), on_close() and the Ctrl+C handler in run(); polled by the
# audio streaming loop to know when to stop.
stop_event = threading.Event()
def on_open(ws):
    """Start streaming microphone audio once the WebSocket is open.

    Launches a daemon thread (stored in the module-level ``audio_thread``)
    that repeatedly reads ``FRAMES_PER_BUFFER`` frames from the module-level
    ``stream`` and sends each chunk over ``ws`` as a binary frame. The thread
    ends when ``stop_event`` is set or when a read/send raises.

    Args:
        ws: The WebSocket connection handed to the callback; used to send
            the audio chunks.
    """
    global audio_thread

    print("WebSocket connection opened.")

    def stream_audio():
        # Pump microphone chunks to the server until told to stop.
        while True:
            if stop_event.is_set():
                return
            try:
                chunk = stream.read(FRAMES_PER_BUFFER, exception_on_overflow=False)
                ws.send(chunk, websocket.ABNF.OPCODE_BINARY)
            except Exception as exc:
                # Any read/send failure ends the loop; it is reported, not raised.
                print(f"Error streaming audio: {exc}")
                return

    audio_thread = threading.Thread(target=stream_audio, daemon=True)
    audio_thread.start()
def on_message(ws, message):
    """Decode one server message and print the matching console output.

    Handled ``type`` values:
      * ``"Begin"``: prints the session id.
      * ``"Turn"``: prints the transcript. A finished turn (``end_of_turn``)
        blanks the current line and prints the text followed by a newline; an
        unfinished one rewrites the current line in place.
      * ``"Termination"``: prints the reported audio duration.

    Any other type is ignored. Every exception (bad JSON, unexpected payload
    shape, ...) is caught and reported on stdout rather than raised.

    Args:
        ws: The WebSocket connection (unused).
        message: JSON text received from the server.
    """
    try:
        payload = json.loads(message)
        kind = payload.get("type")

        if kind == "Begin":
            print(f"Session began: ID={payload.get('id')}")
            return

        if kind == "Termination":
            print(f"\nSession terminated: {payload.get('audio_duration_seconds', 0)}s of audio")
            return

        if kind != "Turn":
            return

        transcript = payload.get("transcript", "")
        if payload.get("end_of_turn", False):
            # Wipe whatever partial text is on the line, then print the final turn.
            print(f"\r{' ' * 80}\r{transcript}")
        else:
            # Carriage return without newline: overwrite the line in place.
            print(f"\r{transcript}", end="")
    except Exception as exc:
        print(f"Error handling message: {exc}")
def on_error(ws, error):
    """Report a WebSocket error and signal the audio streaming loop to stop.

    Args:
        ws: The WebSocket connection (unused).
        error: The error object or message supplied to the callback.
    """
    report = f"\nWebSocket Error: {error}"
    print(report)
    stop_event.set()
def on_close(ws, close_status_code, close_msg):
    """Release the microphone resources after the WebSocket has closed.

    Prints the close status, sets ``stop_event`` so the audio loop exits,
    waits briefly for the audio thread to finish, then stops/closes the
    PyAudio stream and terminates the PyAudio instance. The module-level
    ``stream`` and ``audio`` handles are reset to ``None`` afterwards so that
    calling this again is a harmless no-op instead of a double close.

    Args:
        ws: The WebSocket connection (unused).
        close_status_code: Status code reported for the close; printed.
        close_msg: Close reason supplied to the callback (unused).
    """
    global stream, audio

    print(f"\nWebSocket Disconnected: Status={close_status_code}")
    stop_event.set()

    # The reader thread is usually inside stream.read() at this point. Give it
    # a bounded chance to observe stop_event and leave before the stream is
    # torn down underneath it. Never join the current thread (would raise).
    reader = audio_thread
    if (
        reader is not None
        and reader is not threading.current_thread()
        and reader.is_alive()
    ):
        reader.join(timeout=1.0)

    if stream:
        if stream.is_active():
            stream.stop_stream()
        stream.close()
        stream = None  # drop the closed handle so it cannot be reused
    if audio:
        audio.terminate()
        audio = None
def run():
    """Capture microphone audio and stream it for transcription until stopped.

    Opens the microphone, connects a ``WebSocketApp`` to ``API_ENDPOINT`` on a
    daemon thread, and then idles until that thread ends. On Ctrl+C it sets
    ``stop_event``, sends a ``{"type": "Terminate"}`` message if the socket is
    still connected, pauses two seconds (presumably to let the server's final
    messages arrive), then closes the connection and joins the WebSocket
    thread for up to two seconds.

    Raises:
        Exception: Whatever ``audio.open`` raises if the input stream cannot
            be opened; the PyAudio instance is terminated first.
    """
    global audio, stream, ws_app

    audio = pyaudio.PyAudio()
    try:
        stream = audio.open(
            input=True,
            frames_per_buffer=FRAMES_PER_BUFFER,
            channels=CHANNELS,
            format=FORMAT,
            rate=SAMPLE_RATE,
        )
    except Exception:
        # Opening the input failed: release the PyAudio instance before
        # propagating, since no on_close callback will ever run to do it.
        audio.terminate()
        audio = None
        raise

    print("Speak into your microphone. Press Ctrl+C to stop.")

    ws_app = websocket.WebSocketApp(
        API_ENDPOINT,
        header={"Authorization": YOUR_API_KEY},
        on_open=on_open,
        on_message=on_message,
        on_error=on_error,
        on_close=on_close,
    )

    ws_thread = threading.Thread(target=ws_app.run_forever, daemon=True)
    ws_thread.start()

    try:
        # Poll instead of a bare join() so Ctrl+C is noticed promptly.
        while ws_thread.is_alive():
            time.sleep(0.1)
    except KeyboardInterrupt:
        print("\nStopping...")
        stop_event.set()
        try:
            if ws_app and ws_app.sock and ws_app.sock.connected:
                ws_app.send(json.dumps({"type": "Terminate"}))
                time.sleep(2)
        finally:
            # Always close and join, even if the send raised (socket dropped
            # between the check and the send) or a second Ctrl+C interrupted
            # the wait above.
            if ws_app:
                ws_app.close()
            ws_thread.join(timeout=2.0)
# Start the streaming session only when this file is executed as a script.
if __name__ == "__main__":
    run()