nanobot-voice-interface/app.py

117 lines
3.9 KiB
Python
Raw Normal View History

2026-02-28 22:12:04 -05:00
import asyncio
import contextlib
import json
from pathlib import Path
from typing import Any, Awaitable, Callable
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
2026-03-06 00:23:16 -05:00
from fastapi.responses import HTMLResponse, JSONResponse
2026-03-04 08:20:42 -05:00
from fastapi.staticfiles import StaticFiles
2026-02-28 22:12:04 -05:00
from supertonic_gateway import SuperTonicGateway
from voice_rtc import WebRTCVoiceSession
# Filesystem layout: static assets live in a "static" directory next to this module.
BASE_DIR = Path(__file__).resolve().parent
STATIC_DIR = BASE_DIR.joinpath("static")
INDEX_PATH = STATIC_DIR.joinpath("index.html")

# Application object and the single gateway instance shared by every route below.
app = FastAPI(title="Nanobot SuperTonic Wisper Web")
gateway = SuperTonicGateway()

# Expose the static directory under the /static URL prefix.
app.mount(
    "/static",
    StaticFiles(directory=str(STATIC_DIR)),
    name="static",
)
2026-02-28 22:12:04 -05:00
@app.get("/health")
async def health() -> JSONResponse:
    """Return a fixed ``{"status": "ok"}`` JSON body."""
    body = {"status": "ok"}
    return JSONResponse(body)
@app.get("/")
async def index() -> HTMLResponse:
    """Return the contents of ``INDEX_PATH`` as an HTML response.

    The file is re-read on every request. The read is a blocking disk
    operation, so it is run in a worker thread instead of directly on the
    event loop.
    """
    html = await asyncio.to_thread(INDEX_PATH.read_text, encoding="utf-8")
    return HTMLResponse(content=html)
2026-02-28 22:12:04 -05:00
@app.websocket("/ws/chat")
async def websocket_chat(websocket: WebSocket) -> None:
    """Serve one chat client over a WebSocket until it disconnects.

    Each incoming text frame is parsed as a JSON object and dispatched on its
    ``type`` field to the gateway or to this connection's WebRTC voice
    session. Events from the gateway subscription are forwarded to the client
    by a background ``_sender_loop`` task.

    Frames that are not valid JSON, are not JSON objects, or carry an unknown
    ``type`` are answered with a ``system`` notice and otherwise ignored.
    """
    await websocket.accept()

    # This handler, the sender task and the voice session all write through
    # the same socket, so sends are serialised with a lock.
    send_lock = asyncio.Lock()

    async def safe_send_json(payload: dict[str, Any]) -> None:
        async with send_lock:
            await websocket.send_json(payload)

    async def send_system_notice(text: str) -> None:
        await safe_send_json({"role": "system", "text": text, "timestamp": ""})

    queue = await gateway.subscribe()
    voice_session = WebRTCVoiceSession(gateway=gateway, send_json=safe_send_json)
    sender = asyncio.create_task(_sender_loop(safe_send_json, queue, voice_session))
    try:
        while True:
            raw_message = await websocket.receive_text()
            try:
                message = json.loads(raw_message)
            except json.JSONDecodeError:
                await send_system_notice("Invalid JSON message.")
                continue
            # Valid JSON need not be an object (e.g. ``[]`` or ``42``); such
            # values have no ``.get`` and would otherwise crash the handler.
            if not isinstance(message, dict):
                await send_system_notice("Invalid message: expected a JSON object.")
                continue

            msg_type = str(message.get("type", "")).strip()
            if msg_type == "spawn":
                await gateway.spawn_tui()
            elif msg_type == "stop":
                await gateway.stop_tui()
            elif msg_type == "rtc-offer":
                await voice_session.handle_offer(message)
            elif msg_type == "rtc-ice-candidate":
                await voice_session.handle_ice_candidate(message)
            elif msg_type == "voice-ptt":
                voice_session.set_push_to_talk_pressed(bool(message.get("pressed", False)))
            elif msg_type == "user-message":
                await gateway.send_user_message(str(message.get("text", "")))
            elif msg_type == "ui-response":
                await gateway.send_ui_response(
                    str(message.get("request_id", "")),
                    str(message.get("value", "")),
                )
            elif msg_type == "command":
                await gateway.send_command(str(message.get("command", "")))
            else:
                await send_system_notice(
                    "Unknown message type. Use spawn, stop, rtc-offer, "
                    "rtc-ice-candidate, voice-ptt, user-message, "
                    "ui-response, or command."
                )
    except WebSocketDisconnect:
        # The client closing the connection is the normal way out of the loop.
        pass
    finally:
        sender.cancel()
        # The sender may already have failed with its own exception, which
        # ``await sender`` re-raises. Chain the cleanup steps so the voice
        # session is still closed and the subscription still released.
        try:
            with contextlib.suppress(asyncio.CancelledError):
                await sender
        finally:
            try:
                await voice_session.close()
            finally:
                await gateway.unsubscribe(queue)
@app.on_event("shutdown")
async def on_shutdown() -> None:
    """Shut the shared gateway down when the application stops."""
    await gateway.shutdown()
async def _sender_loop(
    send_json: Callable[[dict[str, Any]], Awaitable[None]],
    queue: asyncio.Queue,
    voice_session: WebRTCVoiceSession,
) -> None:
    """Pull events off *queue* and route each one, forever.

    Events whose ``role`` is ``"nanobot-tts"`` have their ``text`` handed to
    ``voice_session.queue_output_text``. Every other event is converted with
    ``to_dict()`` and passed to *send_json*. The loop has no exit condition of
    its own; it ends only when the task is cancelled or a call raises.
    """
    tts_role = "nanobot-tts"
    while True:
        item = await queue.get()
        if item.role != tts_role:
            await send_json(item.to_dict())
        else:
            await voice_session.queue_output_text(item.text)