import os
import json
import time
import uuid
import requests
import websocket
from urllib.parse import quote
from datetime import datetime, timezone
from dotenv import load_dotenv
# Let python-dotenv populate the environment before the settings are read.
load_dotenv()

# Connection settings read from the environment; each one is None when the
# corresponding variable is not set.
CLIENT_ID = os.environ.get("CLIENT_ID")
CLIENT_SECRET = os.environ.get("CLIENT_SECRET")
TENANT_NAME = os.environ.get("TENANT_NAME")
ENVIRONMENT = os.environ.get("ENVIRONMENT")
def get_access_token(timeout=30):
    """Request an access token with the OAuth2 client-credentials grant.

    The token endpoint URL is built from ``ENVIRONMENT`` and ``TENANT_NAME``;
    ``CLIENT_ID`` and ``CLIENT_SECRET`` are sent as a form-encoded body.

    Args:
        timeout: Seconds passed to ``requests.post`` so a stalled connection
            raises instead of blocking indefinitely.

    Returns:
        The ``access_token`` field of the JSON response.

    Raises:
        RuntimeError: If a required setting is unset, or if the server
            answers with a non-success status code.
    """
    # Fail early with a clear message; otherwise an unset variable would be
    # interpolated into the URL as the literal text "None".
    required = {
        "CLIENT_ID": CLIENT_ID,
        "CLIENT_SECRET": CLIENT_SECRET,
        "TENANT_NAME": TENANT_NAME,
        "ENVIRONMENT": ENVIRONMENT,
    }
    missing = [name for name, value in required.items() if not value]
    if missing:
        raise RuntimeError(
            f"Missing required environment variables: {', '.join(missing)}"
        )

    print("Step 1 — Authenticating via Keycloak...")
    url = (
        f"https://auth.{ENVIRONMENT}.corti.app/realms/{TENANT_NAME}"
        "/protocol/openid-connect/token"
    )
    payload = {
        "grant_type": "client_credentials",
        "client_id": CLIENT_ID,
        "client_secret": CLIENT_SECRET,
        "scope": "openid",
    }
    headers = {"Content-Type": "application/x-www-form-urlencoded"}
    response = requests.post(url, data=payload, headers=headers, timeout=timeout)
    if response.ok:
        print("✅ Authenticated")
        return response.json()["access_token"]
    raise RuntimeError(
        f"Authentication failed: {response.status_code} - {response.text}"
    )
def create_interaction(token, timeout=30):
    """Create an interaction session and return the parsed JSON response.

    Posts a JSON payload describing a planned "first_consultation" encounter
    to the ``/v2/interactions`` endpoint. The user id and encounter
    identifier are freshly generated UUIDs, and the encounter period starts
    and ends at the current UTC time.

    Args:
        token: Bearer token sent in the ``Authorization`` header.
        timeout: Seconds passed to ``requests.post`` so a stalled connection
            raises instead of blocking indefinitely.

    Returns:
        The decoded JSON body of the response.

    Raises:
        RuntimeError: If the server answers with a non-success status code.
    """
    print("Step 2 — Creating interaction session...")
    url = f"https://api.{ENVIRONMENT}.corti.app/v2/interactions"
    headers = {
        "Authorization": f"Bearer {token}",
        "Content-Type": "application/json",
        "Tenant-Name": TENANT_NAME,
    }
    # One timestamp is reused for both ends of the period.
    now = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
    payload = {
        "assignedUserId": str(uuid.uuid4()),
        "encounter": {
            "identifier": str(uuid.uuid4()),
            "status": "planned",
            "type": "first_consultation",
            "period": {
                "startedAt": now,
                "startedAtTzoffset": "+00:00",
                "endedAt": now,
                "endedAtTzoffset": "+00:00",
            },
            "title": "Consultation",
        },
    }
    response = requests.post(url, json=payload, headers=headers, timeout=timeout)
    if response.ok:
        print("✅ Interaction session created")
        return response.json()
    raise RuntimeError(
        f"Interaction creation failed: {response.status_code} - {response.text}"
    )
def stream_audio(ws, path, chunk_size=32000, delay=0.5):
    """Send a file over the WebSocket in binary chunks, then signal the end.

    The file is read ``chunk_size`` bytes at a time; every chunk is sent as a
    binary frame followed by a pause of ``delay`` seconds. After the last
    chunk a JSON ``{"type": "end"}`` text message is sent.

    Args:
        ws: Object exposing ``send(data, opcode=...)``, as the WebSocket
            passed to the callbacks does.
        path: Path of the audio file to stream.
        chunk_size: Number of bytes per binary frame. Must be positive.
        delay: Seconds to sleep after each frame.
            NOTE(review): presumably paces the upload towards real time;
            confirm the intended rate against the audio format in use.

    Raises:
        ValueError: If ``chunk_size`` is not positive.
    """
    if chunk_size <= 0:
        # read(0) would end the loop at once; a negative size would read the
        # whole file into a single frame.
        raise ValueError(f"chunk_size must be positive, got {chunk_size}")

    print("🎧 Step 4 — Streaming audio...")
    with open(path, "rb") as audio:
        while chunk := audio.read(chunk_size):
            ws.send(chunk, opcode=websocket.ABNF.OPCODE_BINARY)
            time.sleep(delay)
    ws.send(json.dumps({"type": "end"}))
    print("✅ Audio stream completed")
def connect_and_stream_audio(ws_url, audio_path):
    """Open the WebSocket, send the configuration, and stream the audio file.

    A configuration message is sent as soon as the socket opens. Incoming
    messages are dispatched on their ``type`` field (case-insensitively):

    * ``CONFIG_ACCEPTED`` starts streaming ``audio_path``.
    * ``TRANSCRIPT`` text is printed and collected.
    * ``FACTS`` messages are pretty-printed as JSON.
    * ``ENDED`` prints the collected transcript and closes the socket.

    The call blocks in ``run_forever()`` until the connection is closed.

    Args:
        ws_url: Full WebSocket URL to connect to.
        audio_path: Path of the audio file handed to ``stream_audio``.
    """
    print("Step 3 — Connecting to WebSocket and sending config...")
    transcript_lines = []

    def on_open(ws):
        config = {
            "type": "config",
            "configuration": {
                "transcription": {
                    "primaryLanguage": "en",
                    "isDiarization": False,
                    "isMultichannel": False,
                    "participants": [{"channel": 0, "role": "multiple"}],
                },
                "mode": {"type": "facts", "outputLocale": "en"},
            },
        }
        ws.send(json.dumps(config))
        print("✅ Configuration sent")

    def on_message(ws, message):
        msg = json.loads(message)
        if not isinstance(msg, dict):
            # Only JSON objects carry a "type" to dispatch on.
            return
        # A missing or null "type" must not crash the handler.
        msg_type = str(msg.get("type") or "").upper()

        if msg_type == "CONFIG_ACCEPTED":
            print("🟢 Config accepted by server")
            # NOTE(review): streaming runs synchronously inside this callback,
            # so presumably no further messages are handled until it returns;
            # confirm that is acceptable for long recordings.
            stream_audio(ws, audio_path)
        elif msg_type == "TRANSCRIPT":
            print("Step 5 -Transcript received:")
            # Tolerate messages without "data" and segments without text.
            segments = msg.get("data") or []
            pieces = [seg.get("transcript") for seg in segments]
            content = " ".join(piece for piece in pieces if piece)
            if content:
                print(content)
                transcript_lines.append(content)
        elif msg_type == "FACTS":
            print("Step 5 - Facts received:")
            print(json.dumps(msg, indent=2))
        elif msg_type == "ENDED":
            print("Step 6 — Session ended by server")
            if transcript_lines:
                print("\nFinal Transcript:")
                for line in transcript_lines:
                    print(line)
            else:
                print("⚠️ No transcript received.")
            ws.close()

    def on_error(ws, err):
        print(f"[ERROR] {err}")

    def on_close(ws, code, reason):
        print("✅ WebSocket closed")

    websocket.enableTrace(False)
    ws = websocket.WebSocketApp(
        ws_url,
        on_open=on_open,
        on_message=on_message,
        on_error=on_error,
        on_close=on_close,
    )
    ws.run_forever()
def main(audio_path):
    """Run the whole flow: authenticate, create an interaction, stream audio.

    Args:
        audio_path: Path of the audio file to stream.

    Raises:
        FileNotFoundError: If ``audio_path`` is not an existing file. This is
            checked before any network request is made.
        ValueError: If the interaction response has no ``websocketUrl``.
    """
    if not os.path.isfile(audio_path):
        raise FileNotFoundError(f"Missing file: {audio_path}")

    token = get_access_token()
    interaction = create_interaction(token)
    print("\n📦 Final interaction response:")
    print(json.dumps(interaction, indent=2))

    ws_url = interaction.get("websocketUrl")
    if not ws_url:
        raise ValueError("Missing WebSocket URL in response")

    # Append the token as a query parameter, starting the query string if the
    # returned URL does not already have one.
    separator = "&" if "?" in ws_url else "?"
    ws_url = f"{ws_url}{separator}token={quote(f'Bearer {token}')}"
    connect_and_stream_audio(ws_url, audio_path)
if __name__ == "__main__":
    import sys

    # Take the audio path from the first command-line argument when given;
    # the placeholder remains the fallback, so running the script without
    # arguments behaves as before.
    audio_file_path = sys.argv[1] if len(sys.argv) > 1 else "PATH_TO_FILE.wav"
    main(audio_file_path)