Build a Real-Time Medical Scribe
This example implements a real-time medical scribe using Universal-3 Pro Streaming with LLM Gateway post-processing. It uses Medical Mode to improve accuracy for clinical terminology, streams audio from a microphone, applies LLM-powered clinical editing on each turn, and generates a SOAP note at the end of the session.
For post-visit documentation using pre-recorded audio, see the Post-Visit Medical Scribe guide instead.
import os
import json
import time
import threading
from datetime import datetime
from urllib.parse import urlencode

import pyaudio
import websocket
import requests
from dotenv import load_dotenv
from simple_term_menu import TerminalMenu

# Load environment variables from .env if present
try:
    load_dotenv()
except Exception:
    pass

"""
Medical Scribe – Streaming STT + LLM Gateway Enhancement (SOAP-ready)

What this does
--------------
1) Streams mic audio to AssemblyAI Streaming STT
2) On every utterance or end of turn, calls AssemblyAI LLM Gateway to
   apply *medical* edits (terminology, punctuation, proper nouns, etc.)
3) Logs encounter turns and generates a SOAP note at session end via the Gateway

Quick start
-----------
export ASSEMBLYAI_API_KEY=your_key
python medical_scribe_llm_gateway.py
"""

# === Config ===
ASSEMBLYAI_API_KEY = os.environ.get("ASSEMBLYAI_API_KEY", "your_api_key_here")

# WebSocket / STT parameters - CONSERVATIVE SETTINGS FOR MEDICAL
CONNECTION_PARAMS = {
    "sample_rate": 16000,
    "speech_model": "u3-rt-pro",
    "domain": "medical-v1",  # Enable Medical Mode for clinical terminology accuracy
    # MEDICAL SCRIBE CONFIGURATION - Conservative for clinical accuracy
    # Medical conversations have LONG pauses (provider thinking, examining
    # patient, reviewing charts).
    # u3-rt-pro defaults: min_turn_silence=100ms, max_turn_silence=1000ms
    "min_turn_silence": 800,   # Wait much longer (vs ~100ms for voice agents, 560ms for meetings)
    "max_turn_silence": 2000,  # Longer for clinical thinking pauses
}

API_ENDPOINT_BASE_URL = "wss://streaming.assemblyai.com/v3/ws"
API_ENDPOINT = f"{API_ENDPOINT_BASE_URL}?{urlencode(CONNECTION_PARAMS)}"

# Audio config
FRAMES_PER_BUFFER = 800  # 50ms @ 16kHz
SAMPLE_RATE = CONNECTION_PARAMS["sample_rate"]
CHANNELS = 1
FORMAT = pyaudio.paInt16

# Globals (single-session script; mutated by the WS callbacks below)
audio = None                  # pyaudio.PyAudio instance
stream = None                 # open microphone input stream
ws_app = None                 # websocket.WebSocketApp instance
audio_thread = None           # background thread pumping mic audio to the WS
stop_event = threading.Event()
encounter_buffer = []         # list of dicts with turn data
last_processed_turn = None    # last turn_order already sent to the LLM (dedup guard)

# === Model selection ===
AVAILABLE_MODELS = [
    {"id": "claude-haiku-4-5-20251001", "name": "Claude Haiku 4.5", "description": "Fastest Claude, good for simple tasks"},
    {"id": "claude-sonnet-4-20250514", "name": "Claude Sonnet 4", "description": "Balanced speed & intelligence"},
    {"id": "claude-sonnet-4-5-20250929", "name": "Claude Sonnet 4.5", "description": "Best for coding & agents"},
    {"id": "claude-sonnet-4-6", "name": "Claude Sonnet 4.6", "description": "Latest Sonnet, fast with strong reasoning"},
    {"id": "claude-opus-4-20250514", "name": "Claude Opus 4", "description": "Most powerful, deep reasoning"},
]


def select_model():
    """Show an interactive terminal menu and return the chosen model id.

    Exits the process (SystemExit) if the user cancels the menu.
    """
    menu_entries = [f"{m['name']} - {m['description']}" for m in AVAILABLE_MODELS]
    terminal_menu = TerminalMenu(
        menu_entries,
        title="Select a model (Use ↑↓ arrows, Enter to select):",
        menu_cursor="❯ ",
        menu_cursor_style=("fg_cyan", "bold"),
        menu_highlight_style=("bg_cyan", "fg_black"),
        cycle_cursor=True,
        clear_screen=False,
        show_search_hint=True,
    )
    idx = terminal_menu.show()
    if idx is None:
        print("Model selection cancelled. Exiting...")
        raise SystemExit(0)
    return AVAILABLE_MODELS[idx]["id"]


selected_model = None  # set once in run() before any Gateway call

# === Gateway helpers ===

def _gateway_chat(messages, max_tokens=800, temperature=0.2, retries=3, backoff=0.75):
    """Call AssemblyAI LLM Gateway with debug logging and retry.

    Retries on transport errors and on retryable HTTP statuses
    (429/5xx) with linear backoff; raises RuntimeError on permanent
    failure or after exhausting retries.
    """
    url = "https://llm-gateway.assemblyai.com/v1/chat/completions"
    headers = {
        # Gateway expects the raw AssemblyAI API key (no "Bearer " prefix)
        "Authorization": ASSEMBLYAI_API_KEY,
        "Content-Type": "application/json",
    }
    payload = {
        "model": selected_model,
        "messages": messages,
        "max_tokens": max_tokens,
        "temperature": temperature,
    }

    last = None  # last HTTP response seen, for the final error message
    for attempt in range(retries):
        try:
            print(f"[LLM] POST {url} (model={selected_model}, attempt {attempt+1}/{retries})")
            resp = requests.post(url, headers=headers, json=payload, timeout=60)
            print(f"[LLM] ← status {resp.status_code}, bytes {len(resp.content)}")
            last = resp
        except Exception as e:
            if attempt == retries - 1:
                # Chain the original exception so the root cause is visible
                raise RuntimeError(f"Gateway request error: {e}") from e
            time.sleep(backoff * (attempt + 1))
            continue

        if resp.status_code == 200:
            data = resp.json()
            if not data.get("choices") or not data["choices"][0].get("message"):
                raise RuntimeError(f"Gateway OK but empty body: {str(data)[:200]}")
            return data
        if resp.status_code in (429, 500, 502, 503, 504):
            # Transient server-side condition: back off and retry
            print(f"[LLM RETRY] {resp.status_code}: {resp.text[:180]}")
            time.sleep(backoff * (attempt + 1))
            continue
        # Any other status is a permanent client/config error
        raise RuntimeError(f"Gateway error {resp.status_code}: {resp.text[:300]}")

    raise RuntimeError(
        f"Gateway failed after retries. Last={getattr(last,'status_code','n/a')} {getattr(last,'text','')[:180]}"
    )


def post_process_with_llm(text: str) -> str:
    """Medical editing & normalization using LLM Gateway.

    Falls back to the original text on any Gateway failure so the
    transcript stream is never interrupted by LLM errors.
    """
    system = {
        "role": "system",
        "content": (
            "You are a clinical transcription editor. Keep the speaker's words, "
            "fix medical terminology (drug names, dosages, anatomy), proper nouns, "
            "and punctuation for readability. Preserve meaning and avoid inventing "
            "details. Prefer U.S. clinical style. If a medication or condition is "
            "phonetically close, correct to the most likely clinical term."
        ),
    }

    user = {
        "role": "user",
        "content": (
            "Edit this short transcript for medical accuracy and readability.\n\n"
            f"Transcript:\n{text}"
        ),
    }

    try:
        res = _gateway_chat([system, user], max_tokens=600)
        return res["choices"][0]["message"]["content"].strip()
    except Exception as e:
        print(f"[LLM EDIT ERROR] {e}. Falling back to original.")
        return text


def generate_clinical_note():
    """Create a SOAP note from the encounter buffer via Gateway.

    Writes the note to a timestamped .txt file; no-op if the buffer
    is empty.
    """
    if not encounter_buffer:
        print("No encounter data to summarize.")
        return

    print("\n=== GENERATING CLINICAL DOCUMENTATION (SOAP) ===")
    # Build a compact transcript string for the LLM
    lines = []
    for e in encounter_buffer:
        if e.get("type") == "utterance":
            lines.append(f"[{e['timestamp']}] {e.get('speaker', 'Speaker')}: {e['text']}")
        elif e.get("type") == "final":
            lines.append(f"[{e['timestamp']}] FINAL: {e['text']}")
    combined = "\n".join(lines)

    system = {
        "role": "system",
        "content": (
            "You are a clinician generating concise, structured notes. "
            "Produce a SOAP note (Subjective, Objective, Assessment, Plan). "
            "Use bullet points, keep it factual, infer reasonable clinical semantics "
            "from the transcript but do NOT invent data. Include medications with dosage "
            "and frequency if mentioned."
        ),
    }
    user = {
        "role": "user",
        "content": (
            "Create a SOAP note from this clinical encounter transcript.\n\n"
            f"Transcript:\n{combined}\n\n"
            "Format strictly as:\n"
            "Subjective:\n- ...\n\nObjective:\n- ...\n\nAssessment:\n- ...\n\nPlan:\n- ...\n"
        ),
    }

    try:
        res = _gateway_chat([system, user], max_tokens=1200)
        soap = res["choices"][0]["message"]["content"].strip()
        fname = f"clinical_note_soap_{datetime.now().strftime('%Y%m%d_%H%M%S')}.txt"
        with open(fname, "w", encoding="utf-8") as f:
            f.write(soap)
        print(f"SOAP note saved: {fname}")
    except Exception as e:
        print(f"[SOAP ERROR] {e}")


# === WebSocket callbacks ===

def on_open(ws):
    """WS connected: print a banner and start the mic-streaming thread."""
    print("=" * 80)
    print(f"[{datetime.now().strftime('%H:%M:%S')}] Medical transcription started")
    print(f"Connected to: {API_ENDPOINT_BASE_URL}")
    print(f"Gateway model: {selected_model}")
    print("=" * 80)
    print("\nSpeak to begin. Press Ctrl+C to stop.\n")

    def stream_audio():
        # Pump raw PCM frames to the WS until stop_event is set
        global stream
        while not stop_event.is_set():
            try:
                audio_data = stream.read(FRAMES_PER_BUFFER, exception_on_overflow=False)
                ws.send(audio_data, websocket.ABNF.OPCODE_BINARY)
            except Exception as e:
                # Only report errors from a live session; shutdown races are expected
                if not stop_event.is_set():
                    print(f"Error streaming audio: {e}")
                break

    global audio_thread
    audio_thread = threading.Thread(target=stream_audio, daemon=True)
    audio_thread.start()


def on_message(ws, message):
    """Dispatch on STT message type: Begin / Turn / Termination / Error.

    Finalized turns are LLM-edited and buffered; per-utterance chunks
    are buffered raw (no LLM) for the encounter timeline.
    """
    global last_processed_turn
    try:
        data = json.loads(message)
        msg_type = data.get("type")

        if msg_type == "Begin":
            print(f"[SESSION] Started - ID: {data.get('id','N/A')}\n")

        elif msg_type == "Turn":
            end_of_turn = data.get("end_of_turn", False)
            transcript = data.get("transcript", "")
            utterance = data.get("utterance", "")
            turn_order = data.get("turn_order", 0)

            # live partials
            if not end_of_turn and transcript:
                print(f"\r[PARTIAL] {transcript[:120]}...", end="", flush=True)

            # If AssemblyAI has finalized a turn, LLM-edit the transcript
            if end_of_turn and transcript:
                if last_processed_turn == turn_order:
                    return  # avoid duplicate processing
                last_processed_turn = turn_order

                ts = datetime.now().strftime('%H:%M:%S')
                print("\n[DEBUG] EOT received. Calling LLM…")
                edited = post_process_with_llm(transcript)

                changed = "(edited)" if edited.strip() != transcript.strip() else "(no change)"
                print(f"\n[{ts}] [FINAL {changed}]")
                print(f"  ├─ Original STT : {transcript}")
                print(f"  └─ Edited by LLM: {edited}")
                print(f"Turn: {turn_order}")

                encounter_buffer.append({
                    "timestamp": ts,
                    "text": edited,
                    "original_text": transcript,
                    "turn_order": turn_order,
                    "type": "final",
                })

            # If we also get per-utterance chunks, just log them raw (no LLM) for timeline
            elif utterance:
                ts = datetime.now().strftime('%H:%M:%S')

                # Lightweight keyword flags for the live console view
                low = utterance.lower()
                if any(t in low for t in ["medication", "prescribe", "dosage", "mg", "daily"]):
                    print("  💊 MEDICATION MENTIONED")
                if any(t in low for t in ["pain", "symptom", "complaint", "problem"]):
                    print("  🏥 SYMPTOM REPORTED")
                if any(t in low for t in ["diagnose", "assessment", "impression"]):
                    print("  📋 DIAGNOSIS DISCUSSED")

                encounter_buffer.append({
                    "timestamp": ts,
                    "text": utterance,
                    "original_text": utterance,
                    "turn_order": turn_order,
                    "type": "utterance",
                })
                print()

        elif msg_type == "Termination":
            dur = data.get("audio_duration_seconds", 0)
            print(f"\n[SESSION] Terminated – Duration: {dur}s")
            save_encounter_transcript()
            generate_clinical_note()

        elif msg_type == "Error":
            print(f"\n[ERROR] {data.get('error', 'Unknown error')}")

    except json.JSONDecodeError as e:
        print(f"Error decoding message: {e}")
    except Exception as e:
        print(f"Error handling message: {e}")


def on_error(ws, error):
    """WS transport error: report and signal the audio thread to stop."""
    print(f"\n[WEBSOCKET ERROR] {error}")
    stop_event.set()


def on_close(ws, close_status_code, close_msg):
    """WS closed: stop streaming and release audio resources."""
    print(f"\n[WEBSOCKET] Disconnected – Status: {close_status_code}")
    global stream, audio
    stop_event.set()

    if stream:
        try:
            # is_active()/stop_stream() raise OSError if the stream was
            # already closed elsewhere (e.g. run()'s cleanup) — ignore.
            if stream.is_active():
                stream.stop_stream()
            stream.close()
        except OSError:
            pass
        stream = None
    if audio:
        audio.terminate()
        audio = None
    if audio_thread and audio_thread.is_alive():
        audio_thread.join(timeout=1.0)


# === Persist artifacts ===

def save_encounter_transcript():
    """Write the buffered encounter timeline to a timestamped .txt file."""
    if not encounter_buffer:
        print("No encounter data to save.")
        return

    fname = f"encounter_transcript_{datetime.now().strftime('%Y%m%d_%H%M%S')}.txt"
    with open(fname, "w", encoding="utf-8") as f:
        f.write("Clinical Encounter Transcript\n")
        f.write(f"Date: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
        f.write("=" * 80 + "\n\n")
        for e in encounter_buffer:
            if e.get("speaker"):
                f.write(f"[{e['timestamp']}] {e['speaker']}: {e['text']}\n")
            else:
                f.write(f"[{e['timestamp']}] {e['text']}\n")
        f.write("\n")
    print(f"Encounter transcript saved: {fname}")


# === Main ===

def run():
    """Entry point: pick a model, open the mic, and run the WS session.

    Blocks until the WS thread exits or the user presses Ctrl+C, then
    performs best-effort termination and resource cleanup.
    """
    global audio, stream, ws_app, selected_model

    print("=" * 60)
    print("  🎙️  Medical Scribe - STT + LLM Gateway")
    print("=" * 60)
    selected_model = select_model()
    print(f"✓ Using model: {selected_model}")

    # Init mic
    audio = pyaudio.PyAudio()
    try:
        stream = audio.open(
            input=True,
            frames_per_buffer=FRAMES_PER_BUFFER,
            channels=CHANNELS,
            format=FORMAT,
            rate=SAMPLE_RATE,
        )
        print("Audio stream opened successfully.")
    except Exception as e:
        print(f"Error opening audio stream: {e}")
        if audio:
            audio.terminate()
        return

    # Connect WS
    ws_app = websocket.WebSocketApp(
        API_ENDPOINT,
        header={"Authorization": ASSEMBLYAI_API_KEY},
        on_open=on_open,
        on_message=on_message,
        on_error=on_error,
        on_close=on_close,
    )

    ws_thread = threading.Thread(target=ws_app.run_forever, daemon=True)
    ws_thread.start()

    try:
        while ws_thread.is_alive():
            time.sleep(0.1)
    except KeyboardInterrupt:
        print("\n\nCtrl+C received. Stopping...")
        stop_event.set()
        # best-effort terminate: ask the server to finalize so we receive
        # the Termination message (which triggers transcript + SOAP output)
        if ws_app and ws_app.sock and ws_app.sock.connected:
            try:
                ws_app.send(json.dumps({"type": "Terminate"}))
                time.sleep(2)
            except Exception as e:
                print(f"Error sending termination: {e}")
        if ws_app:
            ws_app.close()
        ws_thread.join(timeout=2.0)
    finally:
        # on_close may already have released these; tolerate double-close
        try:
            if stream and stream.is_active():
                stream.stop_stream()
            if stream:
                stream.close()
        except OSError:
            pass
        if audio:
            audio.terminate()
        print("Cleanup complete. Exiting.")


if __name__ == "__main__":
    run()
Using a different streaming model
If you switch to a different streaming model such as Universal-Streaming, formatting is not applied automatically. Three alternative models are available:
- universal-streaming-english — English only
- universal-streaming-multilingual — Supports 6 languages: English, Spanish, German, French, Italian, and Portuguese
- whisper-rt — 99+ languages with automatic language detection; best choice when broad language coverage is required
Add format_turns=True to your connection parameters to receive transcripts with punctuation, casing, and inverse text normalization (for example, dates, times, and phone numbers):
1 CONNECTION_PARAMS = { 2 "sample_rate": 16000, 3 "speech_model": "universal-streaming-english", # or "universal-streaming-multilingual" or "whisper-rt" 4 "format_turns": True, 5 ... 6 }
When format_turns is enabled, the model emits two Turn messages when a turn ends: one with turn_is_formatted: false (the raw unformatted transcript) and a second with turn_is_formatted: true (the formatted transcript). To avoid calling the LLM twice, check for both end_of_turn and turn_is_formatted:
1 elif msg_type == "Turn": 2 end_of_turn = data.get("end_of_turn", False) 3 turn_is_formatted = data.get("turn_is_formatted", False) 4 transcript = data.get("transcript", "") 5 6 if end_of_turn and turn_is_formatted and transcript: 7 # Formatted final transcript — safe to post-process with LLM 8 edited = post_process_with_llm(transcript) 9 ...
Note that turn_is_formatted should not be used on its own to detect end of turn — always use end_of_turn for that.
Universal-Streaming also uses a different turn detection system than U3 Pro. Instead of punctuation-based detection, it uses a confidence threshold controlled by end_of_turn_confidence_threshold (default 0.4). The min_turn_silence and max_turn_silence parameters in the main code above are U3 Pro–specific and should be replaced with end_of_turn_confidence_threshold when switching models:
1 CONNECTION_PARAMS = { 2 "sample_rate": 16000, 3 "speech_model": "universal-streaming-english", 4 "format_turns": True, 5 "end_of_turn_confidence_threshold": 0.6, # increase for fewer false turn endings 6 }
For Universal-3 Pro (u3-rt-pro), format_turns is not needed since formatting is built into the end-of-turn system and end_of_turn and turn_is_formatted always have the same value.
Next steps
- Build a Post-Visit Medical Scribe — Pre-recorded transcription for post-visit documentation
- Medical Mode for Streaming — Improve streaming medical terminology accuracy
- Universal-3 Pro Streaming — Full streaming model documentation