"""FastAPI app serving file-backed cards, card templates and local proxies.

This part of the module defines configuration constants, the card and
template storage helpers, the Home Assistant and workspace-script proxy
helpers, and the first group of HTTP routes.
"""

import asyncio
import contextlib
import html
import json
import logging
import os
import re
import shutil
import sys
from datetime import datetime, timezone
from pathlib import Path
from typing import Any
from urllib.parse import urlparse, urlunparse

import httpx
from fastapi import FastAPI, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import FileResponse, JSONResponse, Response
from fastapi.staticfiles import StaticFiles

from supertonic_gateway import SuperTonicGateway
from voice_rtc import WebRTCVoiceSession

logger = logging.getLogger(__name__)

BASE_DIR = Path(__file__).resolve().parent
DIST_DIR = BASE_DIR / "frontend" / "dist"
NANOBOT_CONFIG_PATH = Path(
    os.getenv("NANOBOT_CONFIG_PATH", str(Path.home() / ".nanobot" / "config.json"))
).expanduser()
NANOBOT_WORKSPACE = Path(
    os.getenv("NANOBOT_WORKSPACE", str(Path.home() / ".nanobot"))
).expanduser()
NANOBOT_SCRIPT_WORKSPACE = Path(
    os.getenv("NANOBOT_SCRIPT_WORKSPACE", str(NANOBOT_WORKSPACE / "workspace"))
).expanduser()
CARDS_ROOT = NANOBOT_WORKSPACE / "cards"
CARD_INSTANCES_DIR = CARDS_ROOT / "instances"
CARD_TEMPLATES_DIR = CARDS_ROOT / "templates"
TEMPLATES_CONTEXT_PATH = NANOBOT_WORKSPACE / "CARD_TEMPLATES.md"
MAX_TEMPLATES_IN_PROMPT = 12
MAX_TEMPLATE_HTML_CHARS = 4000
_INVALID_TEMPLATE_KEY_CHARS = re.compile(r"[^a-z0-9_-]+")
_CARD_ID_PATTERN = re.compile(r"^[a-zA-Z0-9_-]{1,128}$")
_CARD_LANE_ORDER = {"attention": 0, "work": 1, "context": 2, "history": 3}
_CARD_STATE_ORDER = {"active": 0, "stale": 1, "resolved": 2, "superseded": 3, "archived": 4}
_MAX_SCRIPT_PROXY_ARGS = 16
_MAX_SCRIPT_PROXY_STDERR_CHARS = 2000
_SCRIPT_PROXY_TIMEOUT_SECONDS = 60.0

CARD_INSTANCES_DIR.mkdir(parents=True, exist_ok=True)
CARD_TEMPLATES_DIR.mkdir(parents=True, exist_ok=True)

app = FastAPI(title="Nanobot SuperTonic Wisper Web")
# NOTE(review): wildcard CORS means any web origin can call the Home Assistant
# and script proxies below; confirm this service is only reachable from
# trusted networks.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*"],
)

gateway = SuperTonicGateway()

# State of the single active WebRTC voice session (at most one at a time).
_active_session: WebRTCVoiceSession | None = None
_active_queue: asyncio.Queue | None = None
_sender_task: asyncio.Task | None = None


# ---------------------------------------------------------------------------
# Cards (file-backed)
# ---------------------------------------------------------------------------


def _normalize_template_key(raw: str) -> str:
    """Return *raw* lower-cased with invalid runs replaced by ``-``, max 64 chars."""
    key = _INVALID_TEMPLATE_KEY_CHARS.sub("-", raw.strip().lower()).strip("-")
    return key[:64]


def _normalize_card_id(raw: str) -> str:
    """Return the stripped card id, or ``""`` when it fails ``_CARD_ID_PATTERN``."""
    card_id = raw.strip()
    return card_id if _CARD_ID_PATTERN.fullmatch(card_id) else ""


def _card_instance_dir(card_id: str) -> Path | None:
    """Return the instance directory for *card_id*, or ``None`` if the id is invalid."""
    card_id_clean = _normalize_card_id(card_id)
    if not card_id_clean:
        return None
    return CARD_INSTANCES_DIR / card_id_clean


def _card_meta_path(card_id: str) -> Path | None:
    """Return the ``card.json`` path for *card_id*, or ``None`` if the id is invalid."""
    instance_dir = _card_instance_dir(card_id)
    return None if instance_dir is None else instance_dir / "card.json"


def _card_state_path(card_id: str) -> Path | None:
    """Return the ``state.json`` path for *card_id*, or ``None`` if the id is invalid."""
    instance_dir = _card_instance_dir(card_id)
    return None if instance_dir is None else instance_dir / "state.json"


def _decode_object(raw: str) -> dict[str, Any] | None:
    """Parse *raw* as JSON and return it only when the top level is an object."""
    try:
        payload = json.loads(raw)
    except (TypeError, ValueError):
        return None
    return payload if isinstance(payload, dict) else None


def _coerce_card_record(raw: dict[str, Any]) -> dict[str, Any] | None:
    """Normalize an arbitrary mapping into a card record.

    Unknown ``kind``/``lane``/``state`` values fall back to ``text``/
    ``context``/``active``; ``priority`` is clamped to 0..100 (default 50).
    ``content`` is only kept for ``question`` cards.

    Returns ``None`` when the record has no valid ``id``.
    """
    card_id = _normalize_card_id(str(raw.get("id", "")))
    if not card_id:
        return None

    kind = str(raw.get("kind", "text") or "text").strip().lower()
    if kind not in {"text", "question"}:
        kind = "text"

    lane = str(raw.get("lane", "context") or "context").strip().lower()
    if lane not in _CARD_LANE_ORDER:
        lane = "context"

    state = str(raw.get("state", "active") or "active").strip().lower()
    if state not in _CARD_STATE_ORDER:
        state = "active"

    try:
        priority = int(raw.get("priority", 50))
    except (TypeError, ValueError, OverflowError):
        # OverflowError: JSON "Infinity" decodes to float("inf").
        priority = 50
    priority = max(0, min(priority, 100))

    raw_choices = raw.get("choices", [])
    choices = [str(choice) for choice in raw_choices] if isinstance(raw_choices, list) else []

    raw_template_state = raw.get("template_state", {})
    template_state = raw_template_state if isinstance(raw_template_state, dict) else {}

    return {
        "id": card_id,
        "kind": kind,
        "title": str(raw.get("title", "")),
        "content": str(raw.get("content", "")) if kind == "question" else "",
        "question": str(raw.get("question", "")),
        "choices": choices,
        "response_value": str(raw.get("response_value", "")),
        "slot": str(raw.get("slot", "")),
        "lane": lane,
        "priority": priority,
        "state": state,
        "template_key": str(raw.get("template_key", "")),
        "template_state": template_state,
        "context_summary": str(raw.get("context_summary", "")),
        "chat_id": str(raw.get("chat_id", "web") or "web"),
        "created_at": str(raw.get("created_at", "")),
        "updated_at": str(raw.get("updated_at", "")),
    }


def _json_script_text(payload: dict[str, Any]) -> str:
    """Serialize *payload* as JSON that is safe to embed in a ``<script>`` element."""
    # NOTE(review): the arguments of this replace() call were damaged in the
    # reviewed source; escaping "</" as "<\/" is the conventional form -- confirm.
    return json.dumps(payload, ensure_ascii=False).replace("</", "<\\/")


def _materialize_card_content(card: dict[str, Any]) -> str:
    """Return the HTML content to send for *card*.

    Non-``text`` cards return their stored ``content``. ``text`` cards are
    rendered from their template: the template HTML is wrapped together with
    the card's ``template_state``. A missing, unreadable or out-of-tree
    template yields a short "Missing template" placeholder.
    """
    if card.get("kind") != "text":
        return str(card.get("content", ""))

    template_key = str(card.get("template_key", "")).strip()
    if not template_key:
        return ""

    html_path = _template_html_path(template_key)
    try:
        # The key comes from card payloads and is not normalized, so make sure
        # it cannot point outside the templates directory ("..", absolute paths).
        html_path.resolve().relative_to(CARD_TEMPLATES_DIR.resolve())
        template_html = html_path.read_text(encoding="utf-8")
    except (OSError, ValueError, RuntimeError):
        # NOTE(review): the wrapper markup of this placeholder was lost in the
        # reviewed source; a bare <div> is a stand-in -- restore the original.
        return (
            "<div>"
            f"Missing template: {html.escape(template_key)}"
            "</div>"
        )

    state_payload = card.get("template_state", {})
    if not isinstance(state_payload, dict):
        state_payload = {}
    card_id = html.escape(str(card.get("id", "")))
    safe_template_key = html.escape(template_key)
    # NOTE(review): the element and attribute names below were stripped from
    # the reviewed source and are reconstructed from the surviving variables;
    # they must be checked against what the frontend expects.
    return (
        f'<div data-card-id="{card_id}" data-template-key="{safe_template_key}">'
        f'<script type="application/json" data-card-state>{_json_script_text(state_payload)}</script>'
        f"{template_html}"
        "</div>"
    )


def _load_card(card_id: str) -> dict[str, Any] | None:
    """Load one card from disk, merging ``state.json`` and rendering its content.

    Returns ``None`` when the id is invalid or ``card.json`` is missing,
    unreadable, or not a JSON object.
    """
    meta_path = _card_meta_path(card_id)
    if meta_path is None or not meta_path.exists():
        return None
    try:
        raw = json.loads(meta_path.read_text(encoding="utf-8"))
    except (OSError, ValueError):
        return None
    if not isinstance(raw, dict):
        return None

    state_path = _card_state_path(card_id)
    if state_path is not None and state_path.exists():
        try:
            raw_state = json.loads(state_path.read_text(encoding="utf-8"))
        except (OSError, ValueError):
            # An unreadable state file resets the template state to empty.
            raw_state = {}
        if isinstance(raw_state, dict):
            raw["template_state"] = raw_state

    card = _coerce_card_record(raw)
    if card is None:
        return None
    card["content"] = _materialize_card_content(card)
    return card


def _write_card(card: dict[str, Any]) -> dict[str, Any] | None:
    """Persist *card* to its instance directory and return the reloaded record.

    ``created_at`` is taken from the card already on disk when there is one;
    ``updated_at`` defaults to the current UTC time. ``text`` cards store
    their template state in ``state.json``; other kinds have it removed.

    Returns ``None`` when the card cannot be normalized.
    """
    normalized = _coerce_card_record(card)
    if normalized is None:
        return None

    card_id = normalized["id"]
    instance_dir = _card_instance_dir(card_id)
    meta_path = _card_meta_path(card_id)
    state_path = _card_state_path(card_id)
    if instance_dir is None or meta_path is None or state_path is None:
        return None

    now = datetime.now(timezone.utc).isoformat()
    existing = _load_card(card_id) or {}
    normalized["created_at"] = (
        existing.get("created_at") or normalized.get("created_at") or now
    )
    normalized["updated_at"] = normalized.get("updated_at") or now

    instance_dir.mkdir(parents=True, exist_ok=True)
    # The template state lives in its own file, not in card.json.
    template_state = normalized.pop("template_state", {})
    meta_path.write_text(
        json.dumps(normalized, indent=2, ensure_ascii=False) + "\n",
        encoding="utf-8",
    )
    if normalized["kind"] == "text":
        state_path.write_text(
            json.dumps(
                template_state if isinstance(template_state, dict) else {},
                indent=2,
                ensure_ascii=False,
            )
            + "\n",
            encoding="utf-8",
        )
    else:
        state_path.unlink(missing_ok=True)
    return _load_card(card_id)


def _sort_cards(cards: list[dict[str, Any]]) -> list[dict[str, Any]]:
    """Sort by lane, state, descending priority, then updated/created timestamps."""

    def sort_key(item: dict[str, Any]) -> tuple[int, int, int, str, str]:
        return (
            _CARD_LANE_ORDER.get(str(item.get("lane", "context")), 99),
            _CARD_STATE_ORDER.get(str(item.get("state", "active")), 99),
            -int(item.get("priority", 0) or 0),
            str(item.get("updated_at", "")),
            str(item.get("created_at", "")),
        )

    return sorted(cards, key=sort_key)


def _load_cards() -> list[dict[str, Any]]:
    """Load every non-archived card from disk in display order."""
    CARD_INSTANCES_DIR.mkdir(parents=True, exist_ok=True)
    cards: list[dict[str, Any]] = []
    for instance_dir in CARD_INSTANCES_DIR.iterdir():
        if not instance_dir.is_dir():
            continue
        card = _load_card(instance_dir.name)
        if card is None or card.get("state") == "archived":
            continue
        cards.append(card)
    return _sort_cards(cards)


def _find_card_by_slot(slot: str, *, chat_id: str) -> dict[str, Any] | None:
    """Return the first non-archived card (in display order) in *slot* for *chat_id*."""
    target_slot = slot.strip()
    if not target_slot:
        return None
    for card in _load_cards():
        if str(card.get("chat_id", "")).strip() != chat_id:
            continue
        if str(card.get("slot", "")).strip() == target_slot:
            return card
    return None


def _persist_card(card: dict[str, Any]) -> dict[str, Any] | None:
    """Write *card*, superseding other cards that occupy the same slot.

    Every other non-archived card with the same ``slot`` and ``chat_id`` that
    is not already ``superseded`` is marked ``superseded`` first. Looking only
    at the first slot match is not enough: cards sort by lane before state, so
    the first match can be an old superseded card while the live one is left
    active.

    Returns the stored card, or ``None`` when *card* cannot be normalized.
    """
    normalized = _coerce_card_record(card)
    if normalized is None:
        return None

    slot = str(normalized.get("slot", "")).strip()
    if slot:
        chat_id = str(normalized.get("chat_id", "web") or "web")
        stamp = datetime.now(timezone.utc).isoformat()
        for other in _load_cards():
            if other["id"] == normalized["id"] or other.get("state") == "superseded":
                continue
            if str(other.get("chat_id", "")).strip() != chat_id:
                continue
            if str(other.get("slot", "")).strip() != slot:
                continue
            _write_card({**other, "state": "superseded", "updated_at": stamp})
    return _write_card(normalized)


def _delete_card(card_id: str) -> bool:
    """Remove the card's instance directory; return ``False`` if it did not exist."""
    instance_dir = _card_instance_dir(card_id)
    if instance_dir is None or not instance_dir.exists():
        return False
    shutil.rmtree(instance_dir, ignore_errors=True)
    return True


def _template_dir(template_key: str) -> Path:
    """Return the directory of *template_key* (the key is not validated here)."""
    return CARD_TEMPLATES_DIR / template_key


def _template_html_path(template_key: str) -> Path:
    """Return the ``template.html`` path of *template_key*."""
    return _template_dir(template_key) / "template.html"


def _template_meta_path(template_key: str) -> Path:
    """Return the ``manifest.json`` path of *template_key*."""
    return _template_dir(template_key) / "manifest.json"


def _read_template_meta(template_key: str) -> dict[str, Any]:
    """Return the template manifest, or ``{}`` when missing or not a JSON object."""
    try:
        meta = json.loads(_template_meta_path(template_key).read_text(encoding="utf-8"))
    except (OSError, ValueError):
        return {}
    return meta if isinstance(meta, dict) else {}


def _list_templates(limit: int | None = None) -> list[dict[str, Any]]:
    """List non-deprecated templates, most recently updated first.

    Args:
        limit: Maximum number of entries to return; ``None`` returns all.
    """
    CARD_TEMPLATES_DIR.mkdir(parents=True, exist_ok=True)
    templates: list[dict[str, Any]] = []
    for template_dir in CARD_TEMPLATES_DIR.iterdir():
        if not template_dir.is_dir():
            continue
        key = _normalize_template_key(template_dir.name)
        if not key:
            continue
        html_path = _template_html_path(key)
        try:
            content = html_path.read_text(encoding="utf-8")
            stat = html_path.stat()
        except (OSError, ValueError):
            # Missing or unreadable template.html: skip the directory.
            continue
        meta = _read_template_meta(key)
        if bool(meta.get("deprecated")):
            continue
        # Fall back to file timestamps when the manifest has none.
        created_at = str(
            meta.get("created_at")
            or datetime.fromtimestamp(stat.st_ctime, timezone.utc).isoformat()
        )
        updated_at = str(
            meta.get("updated_at")
            or datetime.fromtimestamp(stat.st_mtime, timezone.utc).isoformat()
        )
        templates.append(
            {
                "id": key,
                "key": key,
                "title": str(meta.get("title", "")),
                "content": content,
                "notes": str(meta.get("notes", "")),
                "example_state": meta.get("example_state", {}),
                "created_at": created_at,
                "updated_at": updated_at,
                "file_url": f"/card-templates/{key}/template.html",
            }
        )
    templates.sort(key=lambda item: item["updated_at"], reverse=True)
    if limit is not None:
        return templates[: max(0, limit)]
    return templates


def _render_templates_markdown(rows: list[dict[str, Any]]) -> str:
    """Render template rows as the Markdown written to ``TEMPLATES_CONTEXT_PATH``."""
    lines = [
        "# Card Templates",
        "",
        "These are user-approved template layouts for `mcp_display_render_card` cards.",
        "Each card instance should provide a `template_key` and a `template_state` JSON object.",
        "Use a matching template when the request intent fits.",
        "Do not rewrite the HTML layout when an existing template already fits; fill the template_state instead.",
        "",
    ]
    for row in rows:
        key = str(row.get("key", "")).strip() or "unnamed"
        title = str(row.get("title", "")).strip() or "(untitled)"
        notes = str(row.get("notes", "")).strip() or "(no usage notes)"
        content = str(row.get("content", "")).strip()
        example_state = row.get("example_state", {})
        if len(content) > MAX_TEMPLATE_HTML_CHARS:
            content = content[:MAX_TEMPLATE_HTML_CHARS] + "\n"
        html_lines = [f" {line}" for line in content.splitlines()] if content else [" "]
        state_text = (
            json.dumps(example_state, indent=2, ensure_ascii=False)
            if isinstance(example_state, dict)
            else "{}"
        )
        state_lines = [f" {line}" for line in state_text.splitlines()]
        lines.extend(
            [
                f"## {key}",
                f"- Title: {title}",
                f"- Usage: {notes}",
                "- Example State:",
                *state_lines,
                "- HTML:",
                *html_lines,
                "",
            ]
        )
    return "\n".join(lines).rstrip() + "\n"


def _sync_templates_context_file() -> None:
    """Rewrite the templates context file; remove it when no templates exist.

    Best-effort: failures are logged and never propagate to the caller.
    """
    try:
        rows = _list_templates(limit=MAX_TEMPLATES_IN_PROMPT)
        if not rows:
            TEMPLATES_CONTEXT_PATH.unlink(missing_ok=True)
            return
        TEMPLATES_CONTEXT_PATH.parent.mkdir(parents=True, exist_ok=True)
        TEMPLATES_CONTEXT_PATH.write_text(_render_templates_markdown(rows), encoding="utf-8")
    except Exception:
        logger.warning("failed to sync %s", TEMPLATES_CONTEXT_PATH, exc_info=True)


def _to_typed_message(event_dict: dict[str, Any]) -> dict[str, Any] | None:
    """Convert a gateway event dict into a typed client message.

    Returns ``None`` for roles that are not forwarded and for ``card`` events
    whose text is not a valid card object.
    """
    role = str(event_dict.get("role", "")).strip()
    text = str(event_dict.get("text", ""))
    timestamp = str(event_dict.get("timestamp", ""))

    if role == "agent-state":
        return {"type": "agent_state", "state": text}

    if role in {"nanobot", "nanobot-progress", "nanobot-tool", "system", "user"}:
        return {
            "type": "message",
            "role": role,
            "content": text,
            "is_progress": role in {"nanobot-progress", "nanobot-tool"},
            "is_tool_hint": role == "nanobot-tool",
            "timestamp": timestamp,
        }

    if role == "card":
        payload = _decode_object(text)
        if payload is None:
            return None
        card = _coerce_card_record(payload)
        if card is None:
            return None
        card["type"] = "card"
        return card

    return None


# ---------------------------------------------------------------------------
# Nanobot config / HA proxy
# ---------------------------------------------------------------------------


def _load_nanobot_config() -> dict[str, Any]:
    """Return the nanobot config, or ``{}`` when missing, invalid or not an object."""
    try:
        cfg = json.loads(NANOBOT_CONFIG_PATH.read_text(encoding="utf-8"))
    except (OSError, ValueError):
        return {}
    return cfg if isinstance(cfg, dict) else {}


def _get_home_assistant_mcp_config() -> tuple[str, dict[str, str]]:
    """Return ``(url, headers)`` of the Home Assistant MCP server entry.

    Raises:
        RuntimeError: When the config lacks the section, the server entry, or
            the URL.
    """
    tools = _load_nanobot_config().get("tools")
    if not isinstance(tools, dict):
        raise RuntimeError("nanobot config missing tools section")

    mcp_servers = tools.get("mcpServers")
    if not isinstance(mcp_servers, dict):
        raise RuntimeError("nanobot config missing tools.mcpServers section")

    raw_server = mcp_servers.get("home assistant") or mcp_servers.get("home_assistant")
    if not isinstance(raw_server, dict):
        raise RuntimeError("home assistant MCP server is not configured")

    url = str(raw_server.get("url", "")).strip()
    if not url:
        raise RuntimeError("home assistant MCP server URL is empty")

    raw_headers = raw_server.get("headers", {})
    headers: dict[str, str] = {}
    if isinstance(raw_headers, dict):
        headers = {str(k): str(v) for k, v in raw_headers.items()}
    return url, headers


def _home_assistant_origin(mcp_url: str) -> str:
    """Strip path, params, query and fragment from *mcp_url*, keeping scheme and host."""
    parsed = urlparse(mcp_url.strip())
    return urlunparse(parsed._replace(path="", params="", query="", fragment="")).rstrip("/")


def _normalize_home_assistant_proxy_path(target_path: str) -> str:
    """Return *target_path* rooted under ``/api``.

    Raises:
        ValueError: When the path is empty or contains ``.``/``..`` segments,
            which could otherwise step out of the ``/api`` prefix.
    """
    normalized = "/" + target_path.lstrip("/")
    if normalized == "/":
        raise ValueError("target path is required")
    if any(segment in {".", ".."} for segment in normalized.split("/")):
        raise ValueError("target path must not contain dot segments")
    if normalized == "/api" or normalized.startswith("/api/"):
        return normalized
    return f"/api{normalized}"


def _resolve_workspace_script(script_path: str) -> Path:
    """Resolve *script_path* to an existing ``.py`` file inside the script workspace.

    Raises:
        ValueError: When the path is empty, escapes the workspace, does not
            exist, or is not a Python file.
    """
    normalized = script_path.strip().lstrip("/")
    if not normalized:
        raise ValueError("script path is required")

    root = NANOBOT_SCRIPT_WORKSPACE.resolve()
    candidate = (root / normalized).resolve()
    try:
        candidate.relative_to(root)
    except ValueError as exc:
        raise ValueError("script path escapes workspace") from exc

    if not candidate.is_file():
        raise ValueError(f"script not found: {normalized}")
    if candidate.suffix.lower() != ".py":
        raise ValueError("only Python scripts are supported")
    return candidate


def _script_proxy_args(request: Request) -> list[str]:
    """Collect the repeated ``arg`` query parameters as script arguments.

    Raises:
        ValueError: On any other query parameter or more than
            ``_MAX_SCRIPT_PROXY_ARGS`` arguments.
    """
    unknown_keys = sorted({key for key in request.query_params.keys() if key != "arg"})
    if unknown_keys:
        raise ValueError(
            "unsupported script query parameters: " + ", ".join(unknown_keys)
        )

    args = [str(value) for value in request.query_params.getlist("arg")]
    if len(args) > _MAX_SCRIPT_PROXY_ARGS:
        raise ValueError(
            f"too many script arguments ({len(args)} > {_MAX_SCRIPT_PROXY_ARGS})"
        )
    return args


# ---------------------------------------------------------------------------
# API routes
# ---------------------------------------------------------------------------


@app.get("/health")
async def health() -> JSONResponse:
    """Liveness probe."""
    return JSONResponse({"status": "ok"})


@app.api_route(
    "/ha/proxy/{target_path:path}",
    methods=["GET", "POST", "PUT", "PATCH", "DELETE"],
)
async def home_assistant_proxy(target_path: str, request: Request) -> Response:
    """Forward the request to the configured Home Assistant origin under ``/api``.

    The headers configured for the MCP server are attached; the caller's
    ``Content-Type`` and ``Accept`` are passed through. Returns 400 for a bad
    target path and 502 when Home Assistant is not configured or unreachable.
    """
    raw_target = target_path.strip()
    if not raw_target:
        return JSONResponse({"error": "target path is required"}, status_code=400)

    try:
        mcp_url, auth_headers = _get_home_assistant_mcp_config()
        origin = _home_assistant_origin(mcp_url)
        api_path = _normalize_home_assistant_proxy_path(raw_target)
    except ValueError as exc:
        return JSONResponse({"error": str(exc)}, status_code=400)
    except RuntimeError as exc:
        return JSONResponse({"error": str(exc)}, status_code=502)

    target_url = f"{origin}{api_path}"
    if request.url.query:
        target_url = f"{target_url}?{request.url.query}"

    outbound_headers = dict(auth_headers)
    incoming_content_type = request.headers.get("content-type")
    if incoming_content_type:
        outbound_headers["Content-Type"] = incoming_content_type
    incoming_accept = request.headers.get("accept")
    if incoming_accept:
        outbound_headers["Accept"] = incoming_accept

    outbound_body = None if request.method in {"GET", "HEAD"} else await request.body()

    try:
        async with httpx.AsyncClient(timeout=20.0, follow_redirects=True) as client:
            upstream = await client.request(
                request.method,
                target_url,
                headers=outbound_headers,
                content=outbound_body,
            )
    except httpx.RequestError as exc:
        return JSONResponse(
            {"error": f"Home Assistant connection failed: {exc}"},
            status_code=502,
        )

    media_type = upstream.headers.get("content-type") or "application/json"
    return Response(
        content=upstream.content,
        status_code=upstream.status_code,
        media_type=media_type,
    )


@app.get("/script/proxy/{script_path:path}")
async def workspace_script_proxy(script_path: str, request: Request) -> JSONResponse:
    """Run a workspace Python script and return the JSON it prints on stdout.

    Returns 400 for an invalid path or arguments, 504 on timeout, and 502
    when the script cannot start, exits non-zero, or prints invalid JSON.
    """
    # NOTE(review): this executes code from the script workspace on a plain
    # GET; together with the wildcard CORS policy, confirm who can reach it.
    try:
        script_file = _resolve_workspace_script(script_path)
        args = _script_proxy_args(request)
    except ValueError as exc:
        return JSONResponse({"error": str(exc)}, status_code=400)

    try:
        process = await asyncio.create_subprocess_exec(
            sys.executable,
            str(script_file),
            *args,
            cwd=str(script_file.parent),
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
        try:
            stdout, stderr = await asyncio.wait_for(
                process.communicate(), timeout=_SCRIPT_PROXY_TIMEOUT_SECONDS
            )
        except asyncio.TimeoutError:
            with contextlib.suppress(ProcessLookupError):
                process.kill()
            # Reap the child so it does not linger as a zombie with open pipes.
            await process.wait()
            return JSONResponse({"error": "script execution timed out"}, status_code=504)
    except OSError as exc:
        return JSONResponse({"error": f"failed to start script: {exc}"}, status_code=502)

    stderr_text = stderr.decode("utf-8", errors="replace").strip()
    if process.returncode != 0:
        return JSONResponse(
            {
                "error": f"script exited with code {process.returncode}",
                "stderr": stderr_text[:_MAX_SCRIPT_PROXY_STDERR_CHARS],
            },
            status_code=502,
        )

    stdout_text = stdout.decode("utf-8", errors="replace").strip()
    try:
        payload = json.loads(stdout_text)
    except json.JSONDecodeError as exc:
        return JSONResponse(
            {
                "error": f"script did not return valid JSON: {exc}",
                "stderr": stderr_text[:_MAX_SCRIPT_PROXY_STDERR_CHARS],
            },
            status_code=502,
        )
    return JSONResponse(payload)


@app.get("/cards")
async def get_cards() -> JSONResponse:
    """Return all non-archived cards in display order."""
    return JSONResponse(_load_cards())
async def _read_json_object(request: Request) -> dict[str, Any] | None:
    """Return the request body as a JSON object, or ``None`` if it is not one."""
    try:
        payload = await request.json()
    except ValueError:  # includes json.JSONDecodeError and UnicodeDecodeError
        return None
    return payload if isinstance(payload, dict) else None


@app.delete("/cards/{card_id}")
async def delete_card(card_id: str) -> JSONResponse:
    """Delete a card; deleting a card that does not exist still reports ``ok``."""
    if not _normalize_card_id(card_id):
        return JSONResponse({"error": "invalid card id"}, status_code=400)
    _delete_card(card_id)
    return JSONResponse({"status": "ok"})


@app.get("/templates")
async def get_templates() -> JSONResponse:
    """Return all non-deprecated card templates."""
    return JSONResponse(_list_templates())


@app.post("/templates")
async def save_template(request: Request) -> JSONResponse:
    """Create or overwrite a template and refresh the templates context file.

    Returns 400 when the body is not a JSON object or when the key or content
    is empty after normalization.
    """
    payload = await _read_json_object(request)
    if payload is None:
        return JSONResponse({"error": "request body must be a JSON object"}, status_code=400)

    key = _normalize_template_key(str(payload.get("key", "")))
    title = str(payload.get("title", "")).strip()
    content = str(payload.get("content", "")).strip()
    notes = str(payload.get("notes", "")).strip()
    example_state = payload.get("example_state", {})
    if not isinstance(example_state, dict):
        example_state = {}

    if not key:
        return JSONResponse({"error": "template key is required"}, status_code=400)
    if not content:
        return JSONResponse({"error": "template content is required"}, status_code=400)

    _template_dir(key).mkdir(parents=True, exist_ok=True)
    now = datetime.now(timezone.utc).isoformat()
    # Keep the original creation time when overwriting an existing template.
    created_at = str(_read_template_meta(key).get("created_at") or now)

    _template_html_path(key).write_text(content, encoding="utf-8")
    manifest = {
        "key": key,
        "title": title,
        "notes": notes,
        "example_state": example_state,
        "created_at": created_at,
        "updated_at": now,
    }
    _template_meta_path(key).write_text(
        json.dumps(manifest, indent=2, ensure_ascii=False) + "\n",
        encoding="utf-8",
    )
    _sync_templates_context_file()
    return JSONResponse(
        {
            "status": "ok",
            "id": key,
            "key": key,
            "example_state": example_state,
            "file_url": f"/card-templates/{key}/template.html",
        }
    )


@app.delete("/templates/{template_key}")
async def delete_template(template_key: str) -> JSONResponse:
    """Remove a template directory and refresh the templates context file."""
    key = _normalize_template_key(template_key)
    if not key:
        return JSONResponse({"error": "invalid template key"}, status_code=400)
    shutil.rmtree(_template_dir(key), ignore_errors=True)
    _sync_templates_context_file()
    return JSONResponse({"status": "ok", "key": key})


@app.post("/message")
async def post_message(request: Request) -> JSONResponse:
    """Forward a typed user message (with optional metadata) to the gateway."""
    payload = await _read_json_object(request)
    if payload is None:
        return JSONResponse({"error": "request body must be a JSON object"}, status_code=400)

    text = str(payload.get("text", "")).strip()
    if not text:
        return JSONResponse({"error": "empty message"}, status_code=400)
    metadata = payload.get("metadata", {})
    if not isinstance(metadata, dict):
        metadata = {}
    await gateway.send_user_message(text, metadata=metadata)
    return JSONResponse({"status": "ok"})


async def _teardown_active_session() -> None:
    """Stop the sender task, close the voice session and drop the subscription.

    The module-level references are cleared first so that a failure while
    closing cannot leave stale state behind for the next offer.
    """
    global _active_session, _active_queue, _sender_task
    session, queue, task = _active_session, _active_queue, _sender_task
    _active_session = _active_queue = _sender_task = None

    if task is not None:
        task.cancel()
        # A sender task that already died with an error must not re-raise
        # here, otherwise every later offer would fail on the same exception.
        with contextlib.suppress(asyncio.CancelledError, Exception):
            await task
    if session is not None:
        await session.close()
    if queue is not None:
        await gateway.unsubscribe(queue)


@app.post("/rtc/offer")
async def rtc_offer(request: Request) -> JSONResponse:
    """Replace the active voice session with one negotiated from this offer.

    Returns 400 for an invalid JSON body and 503 when the session cannot
    answer the offer.
    """
    global _active_session, _active_queue, _sender_task
    try:
        payload = await request.json()
    except ValueError:
        return JSONResponse({"error": "request body must be valid JSON"}, status_code=400)

    await _teardown_active_session()

    queue = await gateway.subscribe()
    voice_session = WebRTCVoiceSession(gateway=gateway)
    _active_queue = queue
    _active_session = voice_session
    _sender_task = asyncio.create_task(
        _sender_loop(queue, voice_session),
        name="rtc-sender",
    )

    answer = await voice_session.handle_offer(payload)
    if answer is None:
        # Do not keep a subscription and sender task for a session that never started.
        await _teardown_active_session()
        return JSONResponse(
            {"error": "WebRTC backend unavailable on host (aiortc is not installed)."},
            status_code=503,
        )

    await gateway.connect_nanobot()
    return JSONResponse(answer)


@app.on_event("shutdown")
async def on_shutdown() -> None:
    """Tear down the active voice session, then shut the gateway down."""
    await _teardown_active_session()
    await gateway.shutdown()


async def _sender_loop(
    queue: asyncio.Queue,
    voice_session: "WebRTCVoiceSession",
) -> None:
    """Pump gateway events from *queue* to *voice_session* until cancelled.

    ``nanobot-tts`` events are queued as speech output. Card events are
    persisted first and the stored card is what gets sent. Everything else is
    sent as a typed message over the data channel. A failure while handling
    one event is logged and does not stop the loop.
    """
    import logging  # local import: keeps this block independent of the file header

    log = logging.getLogger(__name__)
    while True:
        event = await queue.get()
        try:
            if event.role == "nanobot-tts":
                await voice_session.queue_output_text(event.text)
                continue
            typed_event = _to_typed_message(event.to_dict())
            if typed_event is None:
                continue
            if typed_event.get("type") == "card":
                persisted = _persist_card(typed_event)
                if persisted is None:
                    continue
                typed_event = {**persisted, "type": "card"}
            voice_session.send_to_datachannel(typed_event)
        except Exception:
            # asyncio.CancelledError is a BaseException and still ends the loop.
            log.exception("failed to forward gateway event")


@app.on_event("startup")
async def on_startup() -> None:
    """Ensure the card directories exist and refresh the templates context file."""
    CARD_INSTANCES_DIR.mkdir(parents=True, exist_ok=True)
    CARD_TEMPLATES_DIR.mkdir(parents=True, exist_ok=True)
    _sync_templates_context_file()


# Template ``file_url`` values point at /card-templates, so serve it whether or
# not the frontend bundle has been built.
if CARD_TEMPLATES_DIR.exists():
    app.mount(
        "/card-templates",
        StaticFiles(directory=str(CARD_TEMPLATES_DIR)),
        name="card-templates",
    )

if DIST_DIR.exists():
    assets_dir = DIST_DIR / "assets"
    if assets_dir.exists():
        app.mount("/assets", StaticFiles(directory=str(assets_dir)), name="assets")

    def _resolve_dist_file(full_path: str) -> Path | None:
        """Return the bundle file for *full_path*, or ``None`` if absent or outside it."""
        try:
            dist_root = DIST_DIR.resolve()
            # Joining an absolute or ".."-laden request path could leave the
            # bundle, so resolve it and require it to stay under the root.
            candidate = (dist_root / full_path).resolve()
            if candidate.is_relative_to(dist_root) and candidate.is_file():
                return candidate
        except (OSError, ValueError, RuntimeError):
            pass
        return None

    # Registered last: this catch-all must come after every other route and mount.
    @app.get("/{full_path:path}")
    async def spa_fallback(full_path: str) -> FileResponse:
        """Serve a file from the frontend bundle, else the uncached ``index.html``."""
        bundle_file = _resolve_dist_file(full_path)
        if bundle_file is not None:
            return FileResponse(str(bundle_file))
        response = FileResponse(str(DIST_DIR / "index.html"))
        response.headers["Cache-Control"] = "no-store, no-cache, must-revalidate"
        response.headers["Pragma"] = "no-cache"
        response.headers["Expires"] = "0"
        return response