Build a Real-Time Medical Scribe

This example implements a real-time medical scribe using Universal-3 Pro Streaming with LLM Gateway post-processing. It uses Medical Mode to improve accuracy for clinical terminology, streams audio from a microphone, applies LLM-powered clinical editing on each turn, and generates a SOAP note at the end of the session.

For post-visit documentation using pre-recorded audio, see the Post-Visit Medical Scribe guide instead.
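
Before wiring up the full pipeline, you may want to confirm that your API key and chosen model work against the LLM Gateway. The following is a minimal sanity-check sketch that reuses the same endpoint, headers, and payload shape as the full script below; the model ID is just one example from the model menu the script presents:

import os
import requests

# Minimal LLM Gateway sanity check (assumes ASSEMBLYAI_API_KEY is set in the environment)
resp = requests.post(
    "https://llm-gateway.assemblyai.com/v1/chat/completions",
    headers={
        "Authorization": os.environ["ASSEMBLYAI_API_KEY"],
        "Content-Type": "application/json",
    },
    json={
        "model": "claude-sonnet-4-5-20250929",  # any ID from AVAILABLE_MODELS in the script below
        "messages": [{"role": "user", "content": "Reply with the single word: ready"}],
        "max_tokens": 10,
    },
    timeout=30,
)
resp.raise_for_status()
print(resp.json()["choices"][0]["message"]["content"])

If this prints a short response, the key and model are valid and the full script should be able to reach the Gateway.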

import os
import json
import time
import threading
from datetime import datetime
from urllib.parse import urlencode

import pyaudio
import websocket
import requests
from dotenv import load_dotenv
from simple_term_menu import TerminalMenu

# Load environment variables from .env if present
try:
    load_dotenv()
except Exception:
    pass

"""
Medical Scribe – Streaming STT + LLM Gateway Enhancement (SOAP-ready)

What this does
--------------
1) Streams mic audio to AssemblyAI Streaming STT
2) On every utterance or end of turn, calls AssemblyAI LLM Gateway to
   apply *medical* edits (terminology, punctuation, proper nouns, etc.)
3) Logs encounter turns and generates a SOAP note at session end via the Gateway

Quick start
-----------
export ASSEMBLYAI_API_KEY=your_key
python medical_scribe_llm_gateway.py
"""

# === Config ===
ASSEMBLYAI_API_KEY = os.environ.get("ASSEMBLYAI_API_KEY", "your_api_key_here")

# WebSocket / STT parameters - CONSERVATIVE SETTINGS FOR MEDICAL
CONNECTION_PARAMS = {
    "sample_rate": 16000,
    "speech_model": "u3-rt-pro",
    "domain": "medical-v1",  # Enable Medical Mode for clinical terminology accuracy

    # MEDICAL SCRIBE CONFIGURATION - Conservative for clinical accuracy
    # Medical conversations have LONG pauses (provider thinking, examining patient, reviewing charts)
    # u3-rt-pro defaults: min_turn_silence=100ms, max_turn_silence=1000ms
    "min_turn_silence": 800,   # Wait much longer (vs ~100ms for voice agents, 560ms for meetings)
    "max_turn_silence": 2000,  # Longer for clinical thinking pauses
}

API_ENDPOINT_BASE_URL = "wss://streaming.assemblyai.com/v3/ws"
API_ENDPOINT = f"{API_ENDPOINT_BASE_URL}?{urlencode(CONNECTION_PARAMS)}"

# Audio config
FRAMES_PER_BUFFER = 800  # 50ms @ 16kHz
SAMPLE_RATE = CONNECTION_PARAMS["sample_rate"]
CHANNELS = 1
FORMAT = pyaudio.paInt16

# Globals
audio = None
stream = None
ws_app = None
audio_thread = None
stop_event = threading.Event()
encounter_buffer = []  # list of dicts with turn data
last_processed_turn = None

# === Model selection ===
AVAILABLE_MODELS = [
    {"id": "claude-haiku-4-5-20251001", "name": "Claude Haiku 4.5", "description": "Fastest Claude, good for simple tasks"},
    {"id": "claude-sonnet-4-20250514", "name": "Claude Sonnet 4", "description": "Balanced speed & intelligence"},
    {"id": "claude-sonnet-4-5-20250929", "name": "Claude Sonnet 4.5", "description": "Best for coding & agents"},
    {"id": "claude-sonnet-4-6", "name": "Claude Sonnet 4.6", "description": "Latest Sonnet, fast with strong reasoning"},
    {"id": "claude-opus-4-20250514", "name": "Claude Opus 4", "description": "Most powerful, deep reasoning"},
]

def select_model():
    menu_entries = [f"{m['name']} - {m['description']}" for m in AVAILABLE_MODELS]
    terminal_menu = TerminalMenu(
        menu_entries,
        title="Select a model (Use ↑↓ arrows, Enter to select):",
        menu_cursor="❯ ",
        menu_cursor_style=("fg_cyan", "bold"),
        menu_highlight_style=("bg_cyan", "fg_black"),
        cycle_cursor=True,
        clear_screen=False,
        show_search_hint=True,
    )
    idx = terminal_menu.show()
    if idx is None:
        print("Model selection cancelled. Exiting...")
        raise SystemExit(0)
    return AVAILABLE_MODELS[idx]["id"]

selected_model = None

# === Gateway helpers ===

def _gateway_chat(messages, max_tokens=800, temperature=0.2, retries=3, backoff=0.75):
    """Call AssemblyAI LLM Gateway with debug logging and retry."""
    url = "https://llm-gateway.assemblyai.com/v1/chat/completions"
    headers = {
        "Authorization": ASSEMBLYAI_API_KEY,
        "Content-Type": "application/json",
    }
    payload = {
        "model": selected_model,
        "messages": messages,
        "max_tokens": max_tokens,
        "temperature": temperature,
    }

    last = None
    for attempt in range(retries):
        try:
            print(f"[LLM] POST {url} (model={selected_model}, attempt {attempt+1}/{retries})")
            resp = requests.post(url, headers=headers, json=payload, timeout=60)
            print(f"[LLM] ← status {resp.status_code}, bytes {len(resp.content)}")
            last = resp
        except Exception as e:
            if attempt == retries - 1:
                raise RuntimeError(f"Gateway request error: {e}")
            time.sleep(backoff * (attempt + 1))
            continue

        if resp.status_code == 200:
            data = resp.json()
            if not data.get("choices") or not data["choices"][0].get("message"):
                raise RuntimeError(f"Gateway OK but empty body: {str(data)[:200]}")
            return data
        if resp.status_code in (429, 500, 502, 503, 504):
            print(f"[LLM RETRY] {resp.status_code}: {resp.text[:180]}")
            time.sleep(backoff * (attempt + 1))
            continue
        raise RuntimeError(f"Gateway error {resp.status_code}: {resp.text[:300]}")

    raise RuntimeError(
        f"Gateway failed after retries. Last={getattr(last,'status_code','n/a')} {getattr(last,'text','')[:180]}"
    )


def post_process_with_llm(text: str) -> str:
    """Medical editing & normalization using LLM Gateway."""
    system = {
        "role": "system",
        "content": (
            "You are a clinical transcription editor. Keep the speaker's words, "
            "fix medical terminology (drug names, dosages, anatomy), proper nouns, "
            "and punctuation for readability. Preserve meaning and avoid inventing "
            "details. Prefer U.S. clinical style. If a medication or condition is "
            "phonetically close, correct to the most likely clinical term."
        ),
    }

    user = {
        "role": "user",
        "content": (
            "Edit this short transcript for medical accuracy and readability.\n\n"
            f"Transcript:\n{text}"
        ),
    }

    try:
        res = _gateway_chat([system, user], max_tokens=600)
        return res["choices"][0]["message"]["content"].strip()
    except Exception as e:
        print(f"[LLM EDIT ERROR] {e}. Falling back to original.")
        return text


def generate_clinical_note():
    """Create a SOAP note from the encounter buffer via Gateway."""
    if not encounter_buffer:
        print("No encounter data to summarize.")
        return

    print("\n=== GENERATING CLINICAL DOCUMENTATION (SOAP) ===")
    # Build a compact transcript string for the LLM
    lines = []
    for e in encounter_buffer:
        if e.get("type") == "utterance":
            lines.append(f"[{e['timestamp']}] {e.get('speaker', 'Speaker')}: {e['text']}")
        elif e.get("type") == "final":
            lines.append(f"[{e['timestamp']}] FINAL: {e['text']}")
    combined = "\n".join(lines)

    system = {
        "role": "system",
        "content": (
            "You are a clinician generating concise, structured notes. "
            "Produce a SOAP note (Subjective, Objective, Assessment, Plan). "
            "Use bullet points, keep it factual, infer reasonable clinical semantics "
            "from the transcript but do NOT invent data. Include medications with dosage "
            "and frequency if mentioned."
        ),
    }
    user = {
        "role": "user",
        "content": (
            "Create a SOAP note from this clinical encounter transcript.\n\n"
            f"Transcript:\n{combined}\n\n"
            "Format strictly as:\n"
            "Subjective:\n- ...\n\nObjective:\n- ...\n\nAssessment:\n- ...\n\nPlan:\n- ...\n"
        ),
    }

    try:
        res = _gateway_chat([system, user], max_tokens=1200)
        soap = res["choices"][0]["message"]["content"].strip()
        fname = f"clinical_note_soap_{datetime.now().strftime('%Y%m%d_%H%M%S')}.txt"
        with open(fname, "w", encoding="utf-8") as f:
            f.write(soap)
        print(f"SOAP note saved: {fname}")
    except Exception as e:
        print(f"[SOAP ERROR] {e}")


# === WebSocket callbacks ===

def on_open(ws):
    print("=" * 80)
    print(f"[{datetime.now().strftime('%H:%M:%S')}] Medical transcription started")
    print(f"Connected to: {API_ENDPOINT_BASE_URL}")
    print(f"Gateway model: {selected_model}")
    print("=" * 80)
    print("\nSpeak to begin. Press Ctrl+C to stop.\n")

    def stream_audio():
        global stream
        while not stop_event.is_set():
            try:
                audio_data = stream.read(FRAMES_PER_BUFFER, exception_on_overflow=False)
                ws.send(audio_data, websocket.ABNF.OPCODE_BINARY)
            except Exception as e:
                if not stop_event.is_set():
                    print(f"Error streaming audio: {e}")
                break

    global audio_thread
    audio_thread = threading.Thread(target=stream_audio, daemon=True)
    audio_thread.start()


def on_message(ws, message):
    global last_processed_turn
    try:
        data = json.loads(message)
        msg_type = data.get("type")

        if msg_type == "Begin":
            print(f"[SESSION] Started - ID: {data.get('id','N/A')}\n")

        elif msg_type == "Turn":
            end_of_turn = data.get("end_of_turn", False)
            transcript = data.get("transcript", "")
            utterance = data.get("utterance", "")
            turn_order = data.get("turn_order", 0)

            # live partials
            if not end_of_turn and transcript:
                print(f"\r[PARTIAL] {transcript[:120]}...", end="", flush=True)

            # If AssemblyAI has finalized a turn, LLM-edit the transcript
            if end_of_turn and transcript:
                if last_processed_turn == turn_order:
                    return  # avoid duplicate processing
                last_processed_turn = turn_order

                ts = datetime.now().strftime('%H:%M:%S')
                print("\n[DEBUG] EOT received. Calling LLM…")
                edited = post_process_with_llm(transcript)

                changed = "(edited)" if edited.strip() != transcript.strip() else "(no change)"
                print(f"\n[{ts}] [FINAL {changed}]")
                print(f" ├─ Original STT : {transcript}")
                print(f" └─ Edited by LLM: {edited}")
                print(f"Turn: {turn_order}")

                encounter_buffer.append({
                    "timestamp": ts,
                    "text": edited,
                    "original_text": transcript,
                    "turn_order": turn_order,
                    "type": "final",
                })

            # If we also get per-utterance chunks, just log them raw (no LLM) for timeline
            elif utterance:
                ts = datetime.now().strftime('%H:%M:%S')

                low = utterance.lower()
                if any(t in low for t in ["medication", "prescribe", "dosage", "mg", "daily"]):
                    print(" 💊 MEDICATION MENTIONED")
                if any(t in low for t in ["pain", "symptom", "complaint", "problem"]):
                    print(" 🏥 SYMPTOM REPORTED")
                if any(t in low for t in ["diagnose", "assessment", "impression"]):
                    print(" 📋 DIAGNOSIS DISCUSSED")

                encounter_buffer.append({
                    "timestamp": ts,
                    "text": utterance,
                    "original_text": utterance,
                    "turn_order": turn_order,
                    "type": "utterance",
                })
                print()

        elif msg_type == "Termination":
            dur = data.get("audio_duration_seconds", 0)
            print(f"\n[SESSION] Terminated – Duration: {dur}s")
            save_encounter_transcript()
            generate_clinical_note()

        elif msg_type == "Error":
            print(f"\n[ERROR] {data.get('error', 'Unknown error')}")

    except json.JSONDecodeError as e:
        print(f"Error decoding message: {e}")
    except Exception as e:
        print(f"Error handling message: {e}")


def on_error(ws, error):
    print(f"\n[WEBSOCKET ERROR] {error}")
    stop_event.set()


def on_close(ws, close_status_code, close_msg):
    print(f"\n[WEBSOCKET] Disconnected – Status: {close_status_code}")
    global stream, audio
    stop_event.set()

    if stream:
        if stream.is_active():
            stream.stop_stream()
        stream.close()
        stream = None
    if audio:
        audio.terminate()
        audio = None
    if audio_thread and audio_thread.is_alive():
        audio_thread.join(timeout=1.0)


# === Persist artifacts ===

def save_encounter_transcript():
    if not encounter_buffer:
        print("No encounter data to save.")
        return

    fname = f"encounter_transcript_{datetime.now().strftime('%Y%m%d_%H%M%S')}.txt"
    with open(fname, "w", encoding="utf-8") as f:
        f.write("Clinical Encounter Transcript\n")
        f.write(f"Date: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
        f.write("=" * 80 + "\n\n")
        for e in encounter_buffer:
            if e.get("speaker"):
                f.write(f"[{e['timestamp']}] {e['speaker']}: {e['text']}\n")
            else:
                f.write(f"[{e['timestamp']}] {e['text']}\n")
            f.write("\n")
    print(f"Encounter transcript saved: {fname}")


# === Main ===

def run():
    global audio, stream, ws_app, selected_model

    print("=" * 60)
    print(" 🎙️ Medical Scribe - STT + LLM Gateway")
    print("=" * 60)
    selected_model = select_model()
    print(f"✓ Using model: {selected_model}")

    # Init mic
    audio = pyaudio.PyAudio()
    try:
        stream = audio.open(
            input=True,
            frames_per_buffer=FRAMES_PER_BUFFER,
            channels=CHANNELS,
            format=FORMAT,
            rate=SAMPLE_RATE,
        )
        print("Audio stream opened successfully.")
    except Exception as e:
        print(f"Error opening audio stream: {e}")
        if audio:
            audio.terminate()
        return

    # Connect WS
    ws_app = websocket.WebSocketApp(
        API_ENDPOINT,
        header={"Authorization": ASSEMBLYAI_API_KEY},
        on_open=on_open,
        on_message=on_message,
        on_error=on_error,
        on_close=on_close,
    )

    ws_thread = threading.Thread(target=ws_app.run_forever, daemon=True)
    ws_thread.start()

    try:
        while ws_thread.is_alive():
            time.sleep(0.1)
    except KeyboardInterrupt:
        print("\n\nCtrl+C received. Stopping...")
        stop_event.set()
        # best-effort terminate
        if ws_app and ws_app.sock and ws_app.sock.connected:
            try:
                ws_app.send(json.dumps({"type": "Terminate"}))
                time.sleep(2)
            except Exception as e:
                print(f"Error sending termination: {e}")
        if ws_app:
            ws_app.close()
        ws_thread.join(timeout=2.0)
    finally:
        if stream and stream.is_active():
            stream.stop_stream()
        if stream:
            stream.close()
        if audio:
            audio.terminate()
        print("Cleanup complete. Exiting.")


if __name__ == "__main__":
    run()

Using a different streaming model

If you switch to a different streaming model such as Universal-Streaming, formatting is not applied automatically. Three alternative models are available: universal-streaming-english, universal-streaming-multilingual, and whisper-rt.

Add format_turns=True to your connection parameters to receive transcripts with punctuation, casing, and inverse text normalization (for example, dates, times, and phone numbers):

CONNECTION_PARAMS = {
    "sample_rate": 16000,
    "speech_model": "universal-streaming-english",  # or "universal-streaming-multilingual" or "whisper-rt"
    "format_turns": True,
    ...
}

When format_turns is enabled, the model emits two Turn messages when a turn ends: one with turn_is_formatted: false (the raw unformatted transcript) and a second with turn_is_formatted: true (the formatted transcript). To avoid calling the LLM twice, check for both end_of_turn and turn_is_formatted:

elif msg_type == "Turn":
    end_of_turn = data.get("end_of_turn", False)
    turn_is_formatted = data.get("turn_is_formatted", False)
    transcript = data.get("transcript", "")

    if end_of_turn and turn_is_formatted and transcript:
        # Formatted final transcript — safe to post-process with LLM
        edited = post_process_with_llm(transcript)
        ...

Note that turn_is_formatted should not be used on its own to detect end of turn — always use end_of_turn for that.

Universal-Streaming also uses a different turn detection system than U3 Pro. Instead of punctuation-based detection, it uses a confidence threshold controlled by end_of_turn_confidence_threshold (default 0.4). The min_turn_silence and max_turn_silence parameters in the main code above are U3 Pro–specific and should be replaced with end_of_turn_confidence_threshold when switching models:

CONNECTION_PARAMS = {
    "sample_rate": 16000,
    "speech_model": "universal-streaming-english",
    "format_turns": True,
    "end_of_turn_confidence_threshold": 0.6,  # increase for fewer false turn endings
}

For Universal-3 Pro (u3-rt-pro), format_turns is not needed: formatting is built into the end-of-turn system, and end_of_turn and turn_is_formatted always carry the same value. A model-agnostic turn check is sketched below.
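
If you want a single end-of-turn check that works for both u3-rt-pro and Universal-Streaming, one approach is to require turn_is_formatted only when format_turns is enabled. This is a minimal sketch, assuming CONNECTION_PARAMS and post_process_with_llm come from the main script above:

# Process a final transcript exactly once per turn, whichever model is configured.
# With u3-rt-pro, end_of_turn and turn_is_formatted always match, so the extra
# check is a no-op; with Universal-Streaming + format_turns=True it skips the
# unformatted Turn message and only post-processes the formatted one.
FORMAT_TURNS = CONNECTION_PARAMS.get("format_turns", False)

def is_final_turn(data: dict) -> bool:
    if not data.get("end_of_turn", False):
        return False
    if FORMAT_TURNS and not data.get("turn_is_formatted", False):
        return False  # wait for the formatted Turn message
    return True

# In on_message:
# if msg_type == "Turn" and is_final_turn(data) and data.get("transcript"):
#     edited = post_process_with_llm(data["transcript"])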

Next steps