# nanobot-voice-interface/app.py
#
# 1039 lines
# 35 KiB
# Python

import asyncio
import contextlib
import html
import json
import os
import re
import shutil
import subprocess
import sys
from datetime import datetime, timezone
from pathlib import Path
from typing import Any
from urllib.parse import urlparse, urlunparse
import httpx
from fastapi import FastAPI, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import FileResponse, JSONResponse, Response
from fastapi.staticfiles import StaticFiles
from supertonic_gateway import SuperTonicGateway
from voice_rtc import WebRTCVoiceSession
# Directory containing this file; the built frontend is looked up under
# frontend/dist relative to it.
BASE_DIR = Path(__file__).resolve().parent
DIST_DIR = BASE_DIR / "frontend" / "dist"
# Nanobot locations. Each one can be overridden with the environment variable
# of the same name; the defaults live under ~/.nanobot.
NANOBOT_CONFIG_PATH = Path(
os.getenv("NANOBOT_CONFIG_PATH", str(Path.home() / ".nanobot" / "config.json"))
).expanduser()
NANOBOT_WORKSPACE = Path(
os.getenv("NANOBOT_WORKSPACE", str(Path.home() / ".nanobot"))
).expanduser()
# Root that _resolve_workspace_script confines runnable scripts to.
NANOBOT_SCRIPT_WORKSPACE = Path(
os.getenv("NANOBOT_SCRIPT_WORKSPACE", str(NANOBOT_WORKSPACE / "workspace"))
).expanduser()
# File-backed card storage layout, all under <workspace>/cards.
CARDS_ROOT = NANOBOT_WORKSPACE / "cards"
CARD_INSTANCES_DIR = CARDS_ROOT / "instances"
CARD_TEMPLATES_DIR = CARDS_ROOT / "templates"
CARD_SOURCES_DIR = CARDS_ROOT / "sources"
CARD_SOURCE_STATE_DIR = CARDS_ROOT / "source-state"
# Markdown summary of the templates, written by _sync_templates_context_file.
TEMPLATES_CONTEXT_PATH = NANOBOT_WORKSPACE / "CARD_TEMPLATES.md"
# Caps applied when rendering that summary: number of templates listed and
# number of HTML characters kept per template.
MAX_TEMPLATES_IN_PROMPT = 12
MAX_TEMPLATE_HTML_CHARS = 4000
# Template keys are reduced to lowercase letters, digits, "_" and "-".
_INVALID_TEMPLATE_KEY_CHARS = re.compile(r"[^a-z0-9_-]+")
# Card ids (also reused for card-source ids) must match this pattern in full.
_CARD_ID_PATTERN = re.compile(r"^[a-zA-Z0-9_-]{1,128}$")
# Allowed lane/state values mapped to their rank in _sort_cards.
_CARD_LANE_ORDER = {"attention": 0, "work": 1, "context": 2, "history": 3}
_CARD_STATE_ORDER = {"active": 0, "stale": 1, "resolved": 2, "superseded": 3, "archived": 4}
# Limits for script execution: argument count and stderr characters echoed back.
_MAX_SCRIPT_PROXY_ARGS = 16
_MAX_SCRIPT_PROXY_STDERR_CHARS = 2000
# Import-time side effect: make sure the card storage directories exist.
CARD_INSTANCES_DIR.mkdir(parents=True, exist_ok=True)
CARD_TEMPLATES_DIR.mkdir(parents=True, exist_ok=True)
CARD_SOURCES_DIR.mkdir(parents=True, exist_ok=True)
CARD_SOURCE_STATE_DIR.mkdir(parents=True, exist_ok=True)
app = FastAPI(title="Nanobot SuperTonic Wisper Web")
# NOTE(review): CORS accepts any origin, method and header. This app also
# exposes the script and Home Assistant proxy routes — confirm that such an
# open policy is intended for the deployment.
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_methods=["*"],
allow_headers=["*"],
)
gateway = SuperTonicGateway()
# State of the single active WebRTC session. rtc_offer tears these down and
# replaces them on every new offer; on_shutdown releases them.
_active_session: WebRTCVoiceSession | None = None
_active_queue: asyncio.Queue | None = None
_sender_task: asyncio.Task | None = None
# ---------------------------------------------------------------------------
# Cards (file-backed)
# ---------------------------------------------------------------------------
def _normalize_template_key(raw: str) -> str:
    """Return *raw* as a template key slug of at most 64 characters.

    The input is trimmed and lowercased, every run of characters outside
    ``a-z0-9_-`` becomes a single ``-``, and leading/trailing dashes are
    removed before the result is truncated.
    """
    lowered = raw.strip().lower()
    slug = _INVALID_TEMPLATE_KEY_CHARS.sub("-", lowered)
    return slug.strip("-")[:64]
def _normalize_card_id(raw: str) -> str:
    """Return the trimmed id when it fully matches the card id pattern, else ``""``."""
    candidate = raw.strip()
    if _CARD_ID_PATTERN.fullmatch(candidate) is None:
        return ""
    return candidate
def _card_instance_dir(card_id: str) -> Path | None:
    """Return the instance directory for *card_id*, or ``None`` for an invalid id."""
    safe_id = _normalize_card_id(card_id)
    return CARD_INSTANCES_DIR / safe_id if safe_id else None
def _card_meta_path(card_id: str) -> Path | None:
    """Return the path of the card's ``card.json``, or ``None`` for an invalid id."""
    directory = _card_instance_dir(card_id)
    return None if directory is None else directory / "card.json"
def _card_state_path(card_id: str) -> Path | None:
    """Return the path of the card's ``state.json``, or ``None`` for an invalid id."""
    directory = _card_instance_dir(card_id)
    return None if directory is None else directory / "state.json"
def _decode_object(raw: str) -> dict[str, Any] | None:
try:
payload = json.loads(raw)
except (TypeError, ValueError):
return None
return payload if isinstance(payload, dict) else None
def _coerce_card_record(raw: dict[str, Any]) -> dict[str, Any] | None:
    """Normalise an untrusted card mapping into the canonical card record.

    Returns ``None`` when the record has no valid ``id``. Unknown ``kind``,
    ``lane`` and ``state`` values fall back to ``"text"``, ``"context"`` and
    ``"active"``; ``priority`` is clamped to 0..100 and defaults to 50 when it
    cannot be converted. ``content`` is kept only for question cards; text
    cards get their content from ``_materialize_card_content``.
    """
    card_id = _normalize_card_id(str(raw.get("id", "")))
    if not card_id:
        return None

    kind = str(raw.get("kind", "text") or "text").strip().lower()
    if kind not in {"text", "question"}:
        kind = "text"

    lane = str(raw.get("lane", "context") or "context").strip().lower()
    if lane not in _CARD_LANE_ORDER:
        lane = "context"

    state = str(raw.get("state", "active") or "active").strip().lower()
    if state not in _CARD_STATE_ORDER:
        state = "active"

    try:
        priority = int(raw.get("priority", 50))
    except (TypeError, ValueError, OverflowError):
        # OverflowError: int() of a non-finite float. json.loads accepts
        # "Infinity" and overflowing literals such as 1e999, so one bad
        # record must not raise out of every card listing.
        priority = 50
    priority = max(0, min(priority, 100))

    raw_choices = raw.get("choices", [])
    choices = [str(choice) for choice in raw_choices] if isinstance(raw_choices, list) else []

    raw_template_state = raw.get("template_state", {})
    template_state = raw_template_state if isinstance(raw_template_state, dict) else {}

    return {
        "id": card_id,
        "kind": kind,
        "title": str(raw.get("title", "")),
        "content": str(raw.get("content", "")) if kind == "question" else "",
        "question": str(raw.get("question", "")),
        "choices": choices,
        "response_value": str(raw.get("response_value", "")),
        "slot": str(raw.get("slot", "")),
        "lane": lane,
        "priority": priority,
        "state": state,
        "template_key": str(raw.get("template_key", "")),
        "template_state": template_state,
        "context_summary": str(raw.get("context_summary", "")),
        "chat_id": str(raw.get("chat_id", "web") or "web"),
        "created_at": str(raw.get("created_at", "")),
        "updated_at": str(raw.get("updated_at", "")),
    }
def _json_script_text(payload: dict[str, Any]) -> str:
return json.dumps(payload, ensure_ascii=False).replace("</", "<\\/")
def _materialize_card_content(card: dict[str, Any]) -> str:
    """Return the HTML content to display for *card*.

    Non-text cards return their stored ``content`` unchanged. Text cards are
    rendered from their template: the template HTML is wrapped in a root
    ``<div>`` carrying the card id and template key, preceded by a JSON
    ``<script>`` holding ``template_state``. A text card without a template
    key yields ``""``; a template that cannot be read yields a
    "Missing template" placeholder.
    """
    if card.get("kind") != "text":
        return str(card.get("content", ""))
    template_key = str(card.get("template_key", "")).strip()
    if not template_key:
        return ""
    try:
        # The key comes from card data and is joined into a filesystem path.
        # Only a single path component is accepted, so "../" or absolute
        # keys cannot read a template.html outside the templates directory.
        if template_key in {".", ".."} or Path(template_key).name != template_key:
            raise ValueError("template key must be a single path component")
        template_html = _template_html_path(template_key).read_text(encoding="utf-8")
    except Exception:
        return (
            "<div style=\"padding:16px;border:1px solid #fecaca;border-radius:12px;"
            "background:#fef2f2;color:#991b1b;font:600 14px/1.4 system-ui,sans-serif;\">"
            f"Missing template: {html.escape(template_key)}"
            "</div>"
        )
    state_payload = card.get("template_state", {})
    if not isinstance(state_payload, dict):
        state_payload = {}
    card_id = html.escape(str(card.get("id", "")))
    safe_template_key = html.escape(template_key)
    return (
        f'<div data-nanobot-card-root data-card-id="{card_id}" data-template-key="{safe_template_key}">'
        f'<script type="application/json" data-card-state>{_json_script_text(state_payload)}</script>'
        f"{template_html}"
        "</div>"
    )
def _load_card(card_id: str) -> dict[str, Any] | None:
    """Load one card from disk and return its normalised record.

    Reads ``card.json``; when ``state.json`` exists its object replaces
    ``template_state`` (an unreadable state file counts as ``{}``). Returns
    ``None`` for an invalid id, a missing or unreadable metadata file, a
    non-object payload, or a record that fails normalisation. ``content`` of
    the returned card is the materialised HTML.
    """
    meta_path = _card_meta_path(card_id)
    if meta_path is None or not meta_path.exists():
        return None
    try:
        record = json.loads(meta_path.read_text(encoding="utf-8"))
    except Exception:
        return None
    if not isinstance(record, dict):
        return None

    state_path = _card_state_path(card_id)
    if state_path is not None and state_path.exists():
        try:
            stored_state = json.loads(state_path.read_text(encoding="utf-8"))
        except Exception:
            stored_state = {}
        if isinstance(stored_state, dict):
            record["template_state"] = stored_state

    card = _coerce_card_record(record)
    if card is not None:
        card["content"] = _materialize_card_content(card)
    return card
def _write_card(card: dict[str, Any]) -> dict[str, Any] | None:
    """Normalise *card*, write it to its instance directory and reload it.

    ``created_at`` prefers the value already stored on disk, then the
    incoming value, then the current UTC time; ``updated_at`` keeps the
    incoming value or falls back to the current time. Metadata goes to
    ``card.json``. For text cards ``template_state`` goes to ``state.json``;
    for other kinds any existing ``state.json`` is removed. Returns ``None``
    when the record cannot be normalised.
    """
    record = _coerce_card_record(card)
    if record is None:
        return None

    card_id = record["id"]
    timestamp = datetime.now(timezone.utc).isoformat()
    previous = _load_card(card_id)
    created_at = record.get("created_at") or timestamp
    if previous is not None:
        created_at = previous.get("created_at") or created_at
    record["created_at"] = created_at
    record["updated_at"] = record.get("updated_at") or timestamp

    instance_dir = _card_instance_dir(card_id)
    meta_path = _card_meta_path(card_id)
    state_path = _card_state_path(card_id)
    if instance_dir is None or meta_path is None or state_path is None:
        return None
    instance_dir.mkdir(parents=True, exist_ok=True)

    # The template state lives in its own file, not in card.json.
    template_state = record.pop("template_state", {})
    meta_text = json.dumps(record, indent=2, ensure_ascii=False) + "\n"
    meta_path.write_text(meta_text, encoding="utf-8")

    if record["kind"] == "text":
        if not isinstance(template_state, dict):
            template_state = {}
        state_text = json.dumps(template_state, indent=2, ensure_ascii=False) + "\n"
        state_path.write_text(state_text, encoding="utf-8")
    else:
        state_path.unlink(missing_ok=True)
    return _load_card(card_id)
def _sort_cards(cards: list[dict[str, Any]]) -> list[dict[str, Any]]:
    """Return *cards* sorted for display.

    Ordering is by lane rank, then state rank (unknown values rank last),
    then descending priority, then ascending ``updated_at`` and
    ``created_at`` strings.
    """

    def sort_key(card: dict[str, Any]) -> tuple[int, int, int, str, str]:
        lane_rank = _CARD_LANE_ORDER.get(str(card.get("lane", "context")), 99)
        state_rank = _CARD_STATE_ORDER.get(str(card.get("state", "active")), 99)
        priority = int(card.get("priority", 0) or 0)
        updated_at = str(card.get("updated_at", ""))
        created_at = str(card.get("created_at", ""))
        return lane_rank, state_rank, -priority, updated_at, created_at

    return sorted(cards, key=sort_key)
def _load_cards() -> list[dict[str, Any]]:
    """Load every non-archived card from the instances directory, sorted."""
    CARD_INSTANCES_DIR.mkdir(parents=True, exist_ok=True)
    loaded = (
        _load_card(entry.name)
        for entry in CARD_INSTANCES_DIR.iterdir()
        if entry.is_dir()
    )
    visible = [
        card
        for card in loaded
        if card is not None and card.get("state") != "archived"
    ]
    return _sort_cards(visible)
def _find_card_by_slot(slot: str, *, chat_id: str) -> dict[str, Any] | None:
    """Return the first loaded card in *chat_id* occupying *slot*, if any.

    A blank slot never matches. Cards are searched in ``_load_cards`` order.
    """
    wanted = slot.strip()
    if not wanted:
        return None
    matches = (
        card
        for card in _load_cards()
        if str(card.get("chat_id", "")).strip() == chat_id
        and str(card.get("slot", "")).strip() == wanted
    )
    return next(matches, None)
def _persist_card(card: dict[str, Any]) -> dict[str, Any] | None:
    """Store *card*, superseding another card that holds the same slot.

    When the card names a slot and a different card in the same chat already
    occupies it, that card is rewritten with state ``"superseded"`` and a
    fresh ``updated_at`` before the new card is written. Returns the stored
    card, or ``None`` when the record cannot be normalised.
    """
    record = _coerce_card_record(card)
    if record is None:
        return None

    slot = str(record.get("slot", "")).strip()
    if slot:
        chat_id = str(record.get("chat_id", "web") or "web")
        occupant = _find_card_by_slot(slot, chat_id=chat_id)
        if occupant is not None and occupant["id"] != record["id"]:
            retired = {
                **occupant,
                "state": "superseded",
                "updated_at": datetime.now(timezone.utc).isoformat(),
            }
            _write_card(retired)
    return _write_card(record)
def _delete_card(card_id: str) -> bool:
    """Remove the card's instance directory; return whether it existed."""
    target = _card_instance_dir(card_id)
    if target is not None and target.exists():
        shutil.rmtree(target, ignore_errors=True)
        return True
    return False
def _template_dir(template_key: str) -> Path:
    """Return the directory that holds the files of template *template_key*."""
    return CARD_TEMPLATES_DIR.joinpath(template_key)
def _template_html_path(template_key: str) -> Path:
    """Return the path of the template's ``template.html`` file."""
    return _template_dir(template_key).joinpath("template.html")
def _template_meta_path(template_key: str) -> Path:
    """Return the path of the template's ``manifest.json`` file."""
    return _template_dir(template_key).joinpath("manifest.json")
def _read_template_meta(template_key: str) -> dict[str, Any]:
    """Return the template's manifest as a dict, or ``{}`` when unavailable.

    An unreadable file, invalid JSON and a non-object payload all yield ``{}``.
    """
    manifest_path = _template_meta_path(template_key)
    try:
        manifest = json.loads(manifest_path.read_text(encoding="utf-8"))
    except Exception:
        return {}
    if isinstance(manifest, dict):
        return manifest
    return {}
def _list_templates(limit: int | None = None) -> list[dict[str, Any]]:
    """List the usable card templates, most recently updated first.

    A template is a sub-directory of the templates directory whose normalised
    name is non-empty and which contains a readable ``template.html``.
    Templates whose manifest marks them ``deprecated`` are skipped. Timestamps
    missing from the manifest fall back to the HTML file's ctime/mtime in UTC.

    Args:
        limit: Maximum number of rows to return; ``None`` returns all.
            Negative values are treated as 0.
    """
    CARD_TEMPLATES_DIR.mkdir(parents=True, exist_ok=True)
    found: list[dict[str, Any]] = []
    for entry in CARD_TEMPLATES_DIR.iterdir():
        key = _normalize_template_key(entry.name) if entry.is_dir() else ""
        if not key:
            continue
        html_path = _template_html_path(key)
        if not html_path.exists():
            continue
        try:
            content = html_path.read_text(encoding="utf-8")
        except Exception:
            continue
        file_stat = html_path.stat()
        meta = _read_template_meta(key)
        if bool(meta.get("deprecated")):
            continue
        created_at = str(
            meta.get("created_at")
            or datetime.fromtimestamp(file_stat.st_ctime, timezone.utc).isoformat()
        )
        updated_at = str(
            meta.get("updated_at")
            or datetime.fromtimestamp(file_stat.st_mtime, timezone.utc).isoformat()
        )
        row = {
            "id": key,
            "key": key,
            "title": str(meta.get("title", "")),
            "content": content,
            "notes": str(meta.get("notes", "")),
            "example_state": meta.get("example_state", {}),
            "created_at": created_at,
            "updated_at": updated_at,
            "file_url": f"/card-templates/{key}/template.html",
        }
        found.append(row)

    found.sort(key=lambda row: row["updated_at"], reverse=True)
    if limit is None:
        return found
    return found[: max(0, limit)]
def _render_templates_markdown(rows: list[dict[str, Any]]) -> str:
    """Render template rows as the Markdown document given to the agent.

    Each row becomes a ``## <key>`` section with its title, usage notes,
    example state as indented JSON, and HTML. HTML longer than
    ``MAX_TEMPLATE_HTML_CHARS`` is cut and marked as truncated. The result
    ends with exactly one newline.
    """
    document = [
        "# Card Templates",
        "",
        "These are user-approved template layouts for `mcp_display_render_card` cards.",
        "Each card instance should provide a `template_key` and a `template_state` JSON object.",
        "Use a matching template when the request intent fits.",
        "Do not rewrite the HTML layout when an existing template already fits; fill the template_state instead.",
        "",
    ]
    for row in rows:
        key = str(row.get("key", "")).strip() or "unnamed"
        title = str(row.get("title", "")).strip() or "(untitled)"
        notes = str(row.get("notes", "")).strip() or "(no usage notes)"

        html_source = str(row.get("content", "")).strip()
        if len(html_source) > MAX_TEMPLATE_HTML_CHARS:
            html_source = html_source[:MAX_TEMPLATE_HTML_CHARS] + "\n<!-- truncated -->"
        if html_source:
            html_block = [f" {line}" for line in html_source.splitlines()]
        else:
            html_block = [" "]

        example_state = row.get("example_state", {})
        if isinstance(example_state, dict):
            state_json = json.dumps(example_state, indent=2, ensure_ascii=False)
        else:
            state_json = "{}"
        state_block = [f" {line}" for line in state_json.splitlines()]

        document += [
            f"## {key}",
            f"- Title: {title}",
            f"- Usage: {notes}",
            "- Example State:",
            *state_block,
            "- HTML:",
            *html_block,
            "",
        ]
    return "\n".join(document).rstrip() + "\n"
def _sync_templates_context_file() -> None:
    """Rewrite the templates context file from the templates on disk.

    The file is removed when no templates exist. This is best-effort: any
    failure is swallowed so callers are never interrupted.
    """
    try:
        rows = _list_templates(limit=MAX_TEMPLATES_IN_PROMPT)
        if rows:
            TEMPLATES_CONTEXT_PATH.parent.mkdir(parents=True, exist_ok=True)
            markdown = _render_templates_markdown(rows)
            TEMPLATES_CONTEXT_PATH.write_text(markdown, encoding="utf-8")
        else:
            TEMPLATES_CONTEXT_PATH.unlink(missing_ok=True)
    except Exception:
        return
def _to_typed_message(event_dict: dict[str, Any]) -> dict[str, Any] | None:
    """Convert a gateway event dict into a typed message for the client.

    ``agent-state`` events become ``agent_state`` messages, chat-like roles
    become ``message`` payloads, and ``card`` events are decoded from their
    JSON text and normalised. Returns ``None`` for unknown roles and for
    card events that cannot be decoded or normalised.
    """
    role = str(event_dict.get("role", "")).strip()
    text = str(event_dict.get("text", ""))

    if role == "agent-state":
        return {"type": "agent_state", "state": text}

    if role == "card":
        decoded = _decode_object(text)
        card = _coerce_card_record(decoded) if decoded is not None else None
        if card is None:
            return None
        card["type"] = "card"
        return card

    if role not in {"nanobot", "nanobot-progress", "nanobot-tool", "system", "user"}:
        return None
    return {
        "type": "message",
        "role": role,
        "content": text,
        "is_progress": role in {"nanobot-progress", "nanobot-tool"},
        "is_tool_hint": role == "nanobot-tool",
        "timestamp": str(event_dict.get("timestamp", "")),
    }
# ---------------------------------------------------------------------------
# Nanobot config / HA proxy
# ---------------------------------------------------------------------------
def _load_nanobot_config() -> dict[str, Any]:
    """Return the parsed nanobot config file, or ``{}`` when it cannot be read or parsed."""
    try:
        config_text = NANOBOT_CONFIG_PATH.read_text(encoding="utf-8")
        return json.loads(config_text)
    except Exception:
        return {}
def _get_home_assistant_mcp_config() -> tuple[str, dict[str, str]]:
    """Return the Home Assistant MCP server URL and its configured headers.

    The server entry is read from ``tools.mcpServers`` in the nanobot config
    under the key ``"home assistant"`` or ``"home_assistant"``.

    Raises:
        RuntimeError: If a required section is missing, the server is not
            configured, or its URL is empty.
    """
    config = _load_nanobot_config()
    tools = config.get("tools") if isinstance(config, dict) else {}
    if not isinstance(tools, dict):
        raise RuntimeError("nanobot config missing tools section")

    servers = tools.get("mcpServers")
    if not isinstance(servers, dict):
        raise RuntimeError("nanobot config missing tools.mcpServers section")

    server = servers.get("home assistant") or servers.get("home_assistant")
    if not isinstance(server, dict):
        raise RuntimeError("home assistant MCP server is not configured")

    url = str(server.get("url", "")).strip()
    if not url:
        raise RuntimeError("home assistant MCP server URL is empty")

    configured = server.get("headers", {})
    if not isinstance(configured, dict):
        return url, {}
    return url, {str(name): str(value) for name, value in configured.items()}
def _home_assistant_origin(mcp_url: str) -> str:
parsed = urlparse(mcp_url.strip())
return urlunparse(parsed._replace(path="", params="", query="", fragment="")).rstrip("/")
def _normalize_home_assistant_proxy_path(target_path: str) -> str:
normalized = "/" + target_path.lstrip("/")
if normalized == "/":
raise ValueError("target path is required")
if normalized == "/api" or normalized.startswith("/api/"):
return normalized
return f"/api{normalized}"
def _run_workspace_script(script_file: Path, args: list[str], *, timeout_seconds: float) -> tuple[int, str, str]:
process = subprocess.run(
[sys.executable, str(script_file), *args],
cwd=str(script_file.parent),
capture_output=True,
text=True,
timeout=timeout_seconds,
)
return process.returncode, process.stdout.strip(), process.stderr.strip()
def _card_source_state_path(source_id: str) -> Path:
    """Return the path of the JSON file holding the run state of *source_id*."""
    return CARD_SOURCE_STATE_DIR.joinpath(f"{source_id}.json")
def _load_card_source_configs() -> list[dict[str, Any]]:
    """Load the enabled card source definitions from the sources directory.

    Each ``*.json`` file (in sorted order) describes one source. Files that
    are unreadable, not JSON objects, explicitly disabled, without a script,
    or without a valid id are skipped. The id falls back to the file stem.
    ``min_interval_ms`` is floored at 0 (default 10000),
    ``timeout_seconds`` is clamped to 1..300 (default 60), and ``args`` is
    stringified and capped at ``_MAX_SCRIPT_PROXY_ARGS`` entries.
    """
    configs: list[dict[str, Any]] = []
    for config_path in sorted(CARD_SOURCES_DIR.glob('*.json')):
        try:
            spec = json.loads(config_path.read_text(encoding='utf-8'))
        except Exception:
            continue
        if not isinstance(spec, dict):
            continue

        source_id = _normalize_card_id(str(spec.get('id') or config_path.stem))
        script = str(spec.get('script', '')).strip()
        if not source_id or spec.get('enabled', True) is False or not script:
            continue

        script_args = spec.get('args', [])
        if not isinstance(script_args, list):
            script_args = []

        try:
            min_interval_ms = max(0, int(spec.get('min_interval_ms', 10000)))
        except (TypeError, ValueError):
            min_interval_ms = 10000
        try:
            timeout_seconds = max(1, min(300, int(spec.get('timeout_seconds', 60))))
        except (TypeError, ValueError):
            timeout_seconds = 60

        configs.append(
            {
                'id': source_id,
                'script': script,
                'args': [str(arg) for arg in script_args[:_MAX_SCRIPT_PROXY_ARGS]],
                'min_interval_ms': min_interval_ms,
                'timeout_seconds': timeout_seconds,
            }
        )
    return configs
def _load_card_source_state(source_id: str) -> dict[str, Any]:
    """Return the stored run state of *source_id*, or ``{}`` when unavailable."""
    state_path = _card_source_state_path(source_id)
    try:
        stored = json.loads(state_path.read_text(encoding='utf-8'))
    except Exception:
        return {}
    if isinstance(stored, dict):
        return stored
    return {}
def _save_card_source_state(source_id: str, payload: dict[str, Any]) -> None:
    """Write *payload* as indented JSON to the state file of *source_id*."""
    serialized = json.dumps(payload, indent=2, ensure_ascii=False) + '\n'
    state_path = _card_source_state_path(source_id)
    state_path.write_text(serialized, encoding='utf-8')
def _card_source_is_due(config: dict[str, Any], now: datetime) -> bool:
    """Return whether the source's minimum interval has elapsed since its last run."""
    state = _load_card_source_state(str(config.get('id', '')))
    last_completed_raw = str(state.get('last_completed_at', '') or '')
    if not last_completed_raw:
        return True
    try:
        last_completed = datetime.fromisoformat(last_completed_raw)
        elapsed_ms = (now - last_completed).total_seconds() * 1000
        return elapsed_ms >= int(config.get('min_interval_ms', 10000))
    except (TypeError, ValueError):
        # ValueError: unparsable timestamp. TypeError: a timezone-naive
        # timestamp cannot be subtracted from the aware `now`. Either way
        # the stored state is unusable, so the source is run again.
        return True


def _card_source_runtime(config: dict[str, Any], started_at: datetime, return_code: int) -> dict[str, Any]:
    """Build the state record shared by every run outcome of a card source."""
    return {
        'id': str(config.get('id', '')),
        'last_started_at': started_at.isoformat(),
        'last_completed_at': datetime.now(timezone.utc).isoformat(),
        'last_return_code': return_code,
        'script': str(config.get('script', '')),
        'args': list(config.get('args', [])),
    }


def _sync_card_sources(*, force: bool = False, source_id: str | None = None) -> list[dict[str, Any]]:
    """Run the configured card source scripts and record their outcome.

    This call blocks: each script runs as a subprocess with its configured
    timeout.

    Args:
        force: Run every selected source even if its minimum interval has
            not elapsed.
        source_id: When given, only the source with this id is considered.

    Returns:
        One entry per considered source with ``id`` and ``status``
        (``"skipped"``, ``"synced"`` or ``"error"``), plus ``result`` or
        ``error`` where applicable.
    """
    results: list[dict[str, Any]] = []
    for config in _load_card_source_configs():
        current_id = str(config.get('id', ''))
        if source_id and current_id != source_id:
            continue

        # Taken per source so that `last_started_at` reflects when this
        # script started, not when the whole sync loop began.
        started_at = datetime.now(timezone.utc)
        if not force and not _card_source_is_due(config, started_at):
            results.append({'id': current_id, 'status': 'skipped'})
            continue

        try:
            script_file = _resolve_workspace_script(str(config.get('script', '')))
            returncode, stdout_text, stderr_text = _run_workspace_script(
                script_file,
                list(config.get('args', [])),
                timeout_seconds=float(config.get('timeout_seconds', 60)),
            )
            runtime = _card_source_runtime(config, started_at, returncode)
            if returncode != 0:
                runtime['last_error'] = stderr_text[:_MAX_SCRIPT_PROXY_STDERR_CHARS]
                _save_card_source_state(current_id, runtime)
                results.append({'id': current_id, 'status': 'error', 'error': runtime['last_error']})
                continue

            # stdout is stored as parsed JSON when possible, else as raw text.
            parsed_stdout: Any = None
            if stdout_text:
                try:
                    parsed_stdout = json.loads(stdout_text)
                except json.JSONDecodeError:
                    parsed_stdout = stdout_text
            runtime['last_result'] = parsed_stdout
            _save_card_source_state(current_id, runtime)
            results.append({'id': current_id, 'status': 'synced', 'result': parsed_stdout})
        except Exception as exc:
            # Any failure (bad script path, timeout, I/O error) is recorded
            # against this source so the remaining sources still run.
            if isinstance(exc, subprocess.TimeoutExpired):
                error_text = 'card source timed out'
            else:
                error_text = str(exc)
            runtime = _card_source_runtime(config, started_at, -1)
            runtime['last_error'] = error_text
            _save_card_source_state(current_id, runtime)
            results.append({'id': current_id, 'status': 'error', 'error': error_text})
    return results
def _resolve_workspace_script(script_path: str) -> Path:
    """Resolve *script_path* to an existing Python file inside the script workspace.

    Leading slashes are ignored, so the path is always taken relative to the
    workspace root.

    Raises:
        ValueError: If the path is empty, resolves outside the workspace,
            is not an existing file, or does not end in ``.py``.
    """
    relative = script_path.strip().lstrip("/")
    if not relative:
        raise ValueError("script path is required")

    workspace_root = NANOBOT_SCRIPT_WORKSPACE.resolve()
    resolved = workspace_root.joinpath(relative).resolve()
    try:
        # Containment is checked after resolving, so ".." cannot escape.
        resolved.relative_to(workspace_root)
    except ValueError as exc:
        raise ValueError("script path escapes workspace") from exc

    if not resolved.is_file():
        raise ValueError(f"script not found: {relative}")
    if resolved.suffix.lower() != ".py":
        raise ValueError("only Python scripts are supported")
    return resolved
def _script_proxy_args(request: Request) -> list[str]:
    """Collect the script arguments from the repeated ``arg`` query parameter.

    Raises:
        ValueError: If any other query parameter is present, or more than
            ``_MAX_SCRIPT_PROXY_ARGS`` arguments are supplied.
    """
    params = request.query_params
    unexpected = sorted({name for name in params.keys() if name != "arg"})
    if unexpected:
        raise ValueError(
            "unsupported script query parameters: " + ", ".join(unexpected)
        )

    values = [str(item) for item in params.getlist("arg")]
    if len(values) > _MAX_SCRIPT_PROXY_ARGS:
        raise ValueError(
            f"too many script arguments ({len(values)} > {_MAX_SCRIPT_PROXY_ARGS})"
        )
    return values
# ---------------------------------------------------------------------------
# API routes
# ---------------------------------------------------------------------------
@app.get("/health")
async def health() -> JSONResponse:
    """Liveness probe: always answers with ``{"status": "ok"}``."""
    body = {"status": "ok"}
    return JSONResponse(body)
@app.api_route(
    "/ha/proxy/{target_path:path}",
    methods=["GET", "POST", "PUT", "PATCH", "DELETE"],
)
async def home_assistant_proxy(target_path: str, request: Request) -> Response:
    """Forward a request to the Home Assistant API using the configured headers.

    The upstream origin and headers come from the Home Assistant MCP server
    entry in the nanobot config. The incoming query string, ``Content-Type``,
    ``Accept`` and (for non-GET/HEAD methods) body are passed through; the
    upstream status, body and content type are returned. Responds 400 for an
    invalid path and 502 for a missing configuration or a failed connection.
    """
    requested_path = target_path.strip()
    if not requested_path:
        return JSONResponse({"error": "target path is required"}, status_code=400)

    try:
        mcp_url, auth_headers = _get_home_assistant_mcp_config()
        upstream_origin = _home_assistant_origin(mcp_url)
        upstream_path = _normalize_home_assistant_proxy_path(requested_path)
    except ValueError as exc:
        return JSONResponse({"error": str(exc)}, status_code=400)
    except RuntimeError as exc:
        return JSONResponse({"error": str(exc)}, status_code=502)

    upstream_url = f"{upstream_origin}{upstream_path}"
    query_string = request.url.query
    if query_string:
        upstream_url = f"{upstream_url}?{query_string}"

    # Start from the configured headers, then pass through the caller's
    # content negotiation headers.
    outbound_headers = dict(auth_headers)
    for incoming_name, outbound_name in (("content-type", "Content-Type"), ("accept", "Accept")):
        header_value = request.headers.get(incoming_name)
        if header_value:
            outbound_headers[outbound_name] = header_value

    if request.method in {"GET", "HEAD"}:
        outbound_body = None
    else:
        outbound_body = await request.body()

    try:
        async with httpx.AsyncClient(timeout=20.0, follow_redirects=True) as client:
            upstream = await client.request(
                request.method,
                upstream_url,
                headers=outbound_headers,
                content=outbound_body,
            )
    except httpx.RequestError as exc:
        return JSONResponse(
            {"error": f"Home Assistant connection failed: {exc}"},
            status_code=502,
        )

    return Response(
        content=upstream.content,
        status_code=upstream.status_code,
        media_type=upstream.headers.get("content-type") or "application/json",
    )
@app.get("/script/proxy/{script_path:path}")
async def workspace_script_proxy(script_path: str, request: Request) -> JSONResponse:
    """Run a workspace Python script and return its JSON stdout.

    Arguments come from repeated ``arg`` query parameters. The script runs
    in a worker thread with a 60 second timeout. Responds 400 for an invalid
    path or arguments, 504 on timeout, and 502 when the script cannot start,
    exits non-zero, or prints something that is not valid JSON.
    """
    try:
        script_file = _resolve_workspace_script(script_path)
        script_args = _script_proxy_args(request)
    except ValueError as exc:
        return JSONResponse({"error": str(exc)}, status_code=400)

    try:
        exit_code, stdout_text, stderr_text = await asyncio.to_thread(
            _run_workspace_script,
            script_file,
            script_args,
            timeout_seconds=60.0,
        )
    except subprocess.TimeoutExpired:
        return JSONResponse({"error": "script execution timed out"}, status_code=504)
    except OSError as exc:
        return JSONResponse({"error": f"failed to start script: {exc}"}, status_code=502)

    stderr_excerpt = stderr_text[:_MAX_SCRIPT_PROXY_STDERR_CHARS]
    if returncode_failed := exit_code != 0:
        failure = {
            "error": f"script exited with code {exit_code}",
            "stderr": stderr_excerpt,
        }
        return JSONResponse(failure, status_code=502)

    try:
        result = json.loads(stdout_text)
    except json.JSONDecodeError as exc:
        failure = {
            "error": f"script did not return valid JSON: {exc}",
            "stderr": stderr_excerpt,
        }
        return JSONResponse(failure, status_code=502)
    return JSONResponse(result)
# Card source syncs run scripts as blocking subprocesses, so both endpoints
# below hand them to a worker thread instead of stalling the event loop. The
# lock keeps syncs strictly one at a time, as they were when the call blocked
# the loop. Creating the lock at import time relies on Python 3.10+, which
# this module already needs for its module-level `X | None` annotations.
_card_sync_lock = asyncio.Lock()


@app.get("/cards")
async def get_cards() -> JSONResponse:
    """Refresh card sources that are due, then return all visible cards."""
    async with _card_sync_lock:
        await asyncio.to_thread(_sync_card_sources)
    return JSONResponse(_load_cards())


@app.post("/cards/sync")
async def sync_cards_endpoint(request: Request) -> JSONResponse:
    """Force a card source sync, optionally limited to one ``source_id``.

    A missing, unparsable or non-object body means "sync every source".
    Responds 400 when ``source_id`` is present but not a valid id.
    """
    try:
        payload = await request.json()
    except Exception:
        payload = {}
    if not isinstance(payload, dict):
        payload = {}
    raw_source_id = str(payload.get('source_id', '')).strip()
    source_id = _normalize_card_id(raw_source_id) if raw_source_id else ''
    if raw_source_id and not source_id:
        return JSONResponse({'error': 'invalid source id'}, status_code=400)
    async with _card_sync_lock:
        results = await asyncio.to_thread(
            _sync_card_sources,
            force=True,
            source_id=source_id or None,
        )
    return JSONResponse({'status': 'ok', 'results': results})
@app.delete("/cards/{card_id}")
async def delete_card(card_id: str) -> JSONResponse:
    """Delete a card's instance directory; 400 for an invalid id.

    Deleting a card that does not exist still answers ``{"status": "ok"}``.
    """
    if _normalize_card_id(card_id):
        _delete_card(card_id)
        return JSONResponse({"status": "ok"})
    return JSONResponse({"error": "invalid card id"}, status_code=400)
@app.get("/templates")
async def get_templates() -> JSONResponse:
    """Return every usable card template, most recently updated first."""
    templates = _list_templates()
    return JSONResponse(templates)
@app.post("/templates")
async def save_template(request: Request) -> JSONResponse:
    """Create or update a card template from a JSON object body.

    Body fields: ``key`` and ``content`` (both required), ``title``,
    ``notes`` and ``example_state`` (optional; a non-object
    ``example_state`` is replaced by ``{}``). The key is normalised before
    use. ``created_at`` is preserved from an existing manifest. Responds 400
    when the body is not a JSON object or a required field is empty.
    """
    try:
        payload = await request.json()
    except ValueError:
        # Malformed JSON and undecodable bytes both surface as ValueError;
        # that is a client error, not a server fault.
        return JSONResponse({"error": "request body must be valid JSON"}, status_code=400)
    if not isinstance(payload, dict):
        return JSONResponse({"error": "request body must be a JSON object"}, status_code=400)

    key = _normalize_template_key(str(payload.get("key", "")))
    title = str(payload.get("title", "")).strip()
    content = str(payload.get("content", "")).strip()
    notes = str(payload.get("notes", "")).strip()
    example_state = payload.get("example_state", {})
    if not isinstance(example_state, dict):
        example_state = {}
    if not key:
        return JSONResponse({"error": "template key is required"}, status_code=400)
    if not content:
        return JSONResponse({"error": "template content is required"}, status_code=400)

    template_dir = _template_dir(key)
    template_dir.mkdir(parents=True, exist_ok=True)
    now = datetime.now(timezone.utc).isoformat()
    existing_meta = _read_template_meta(key)
    created_at = str(existing_meta.get("created_at") or now)
    _template_html_path(key).write_text(content, encoding="utf-8")
    _template_meta_path(key).write_text(
        json.dumps(
            {
                "key": key,
                "title": title,
                "notes": notes,
                "example_state": example_state,
                "created_at": created_at,
                "updated_at": now,
            },
            indent=2,
            ensure_ascii=False,
        )
        + "\n",
        encoding="utf-8",
    )
    # Keep the agent-facing templates summary in step with the change.
    _sync_templates_context_file()
    return JSONResponse(
        {
            "status": "ok",
            "id": key,
            "key": key,
            "example_state": example_state,
            "file_url": f"/card-templates/{key}/template.html",
        }
    )
@app.delete("/templates/{template_key}")
async def delete_template(template_key: str) -> JSONResponse:
    """Remove a template directory and refresh the templates context file.

    Responds 400 when the key normalises to an empty string. Removing a
    template that does not exist still answers with status ``"ok"``.
    """
    normalized_key = _normalize_template_key(template_key)
    if normalized_key:
        shutil.rmtree(_template_dir(normalized_key), ignore_errors=True)
        _sync_templates_context_file()
        return JSONResponse({"status": "ok", "key": normalized_key})
    return JSONResponse({"error": "invalid template key"}, status_code=400)
@app.post("/message")
async def post_message(request: Request) -> JSONResponse:
    """Send a user text message to the gateway.

    Body fields: ``text`` (required, non-blank) and ``metadata`` (optional;
    a non-object value is replaced by ``{}``). Responds 400 when the body is
    not a JSON object or the text is empty, and 503 when the gateway raises
    ``RuntimeError``.
    """
    try:
        payload = await request.json()
    except ValueError:
        # Malformed JSON and undecodable bytes both surface as ValueError;
        # that is a client error, not a server fault.
        return JSONResponse({"error": "request body must be valid JSON"}, status_code=400)
    if not isinstance(payload, dict):
        return JSONResponse({"error": "request body must be a JSON object"}, status_code=400)

    text = str(payload.get("text", "")).strip()
    metadata = payload.get("metadata", {})
    if not text:
        return JSONResponse({"error": "empty message"}, status_code=400)
    if not isinstance(metadata, dict):
        metadata = {}
    try:
        await gateway.send_user_message(text, metadata=metadata)
    except RuntimeError as exc:
        return JSONResponse({"error": str(exc)}, status_code=503)
    return JSONResponse({"status": "ok"})
@app.post("/rtc/offer")
async def rtc_offer(request: Request) -> JSONResponse:
    """Replace the active WebRTC voice session with one built from this offer.

    Any existing session, gateway subscription and sender task are torn down
    first. A new subscription, session and sender task are then created, the
    offer is handed to the session, and the gateway connects to nanobot.
    Responds 400 for a body that is not valid JSON and 503 when the session
    produces no answer.
    """
    global _active_session, _active_queue, _sender_task
    try:
        payload = await request.json()
    except ValueError:
        # Rejected before the current session is touched.
        return JSONResponse({"error": "request body must be valid JSON"}, status_code=400)

    if _active_session is not None:
        await _active_session.close()
        _active_session = None
    if _active_queue is not None:
        await gateway.unsubscribe(_active_queue)
        _active_queue = None
    if _sender_task is not None:
        # Clear the global before awaiting. If the old task already died
        # with an exception, awaiting it re-raises that exception; without
        # this the stale task would stay in place and break every later offer.
        stale_task, _sender_task = _sender_task, None
        stale_task.cancel()
        with contextlib.suppress(asyncio.CancelledError, Exception):
            await stale_task

    queue = await gateway.subscribe()
    _active_queue = queue
    voice_session = WebRTCVoiceSession(gateway=gateway)
    _active_session = voice_session
    _sender_task = asyncio.create_task(
        _sender_loop(queue, voice_session),
        name="rtc-sender",
    )
    answer = await voice_session.handle_offer(payload)
    if answer is None:
        return JSONResponse(
            {"error": "WebRTC backend unavailable on host (aiortc is not installed)."},
            status_code=503,
        )
    await gateway.connect_nanobot()
    return JSONResponse(answer)
@app.on_event("shutdown")
async def on_shutdown() -> None:
    """Stop the sender task, close the active session and shut the gateway down."""
    if _sender_task is not None:
        _sender_task.cancel()
        # A sender task that already failed re-raises its exception when
        # awaited; that must not skip the remaining cleanup steps.
        with contextlib.suppress(asyncio.CancelledError, Exception):
            await _sender_task
    if _active_session is not None:
        await _active_session.close()
    if _active_queue is not None:
        await gateway.unsubscribe(_active_queue)
    await gateway.shutdown()
async def _sender_loop(
    queue: asyncio.Queue,
    voice_session: "WebRTCVoiceSession",
) -> None:
    """Forward gateway events from *queue* to the voice session until cancelled.

    ``nanobot-tts`` events are queued as speech output. Every other event is
    converted to a typed message and sent over the data channel; card
    messages are persisted first and the stored card is sent instead. Events
    that cannot be converted or persisted are dropped.
    """
    while True:
        event = await queue.get()
        if event.role == "nanobot-tts":
            await voice_session.queue_output_text(event.text)
            continue

        message = _to_typed_message(event.to_dict())
        if message is None:
            continue

        if message.get("type") != "card":
            voice_session.send_to_datachannel(message)
            continue

        stored_card = _persist_card(message)
        if stored_card is not None:
            voice_session.send_to_datachannel({**stored_card, "type": "card"})
@app.on_event("startup")
async def on_startup() -> None:
    """Ensure the card directories exist and refresh the templates context file."""
    for directory in (CARD_INSTANCES_DIR, CARD_TEMPLATES_DIR):
        directory.mkdir(parents=True, exist_ok=True)
    _sync_templates_context_file()
# Static mounts, registered at import time.
# NOTE(review): the original nesting of these statements cannot be recovered
# from this view — confirm which of them sit under the DIST_DIR.exists() guard.
if DIST_DIR.exists():
assets_dir = DIST_DIR / "assets"
# Built frontend assets are served under /assets when that directory exists.
if assets_dir.exists():
app.mount("/assets", StaticFiles(directory=str(assets_dir)), name="assets")
# Template files are served under /card-templates; the `file_url` values
# produced by _list_templates and save_template use this prefix.
if CARD_TEMPLATES_DIR.exists():
app.mount(
"/card-templates",
StaticFiles(directory=str(CARD_TEMPLATES_DIR)),
name="card-templates",
)
@app.get("/{full_path:path}")
async def spa_fallback(full_path: str) -> FileResponse:
    """Serve a file from the frontend build, or ``index.html`` for any other path.

    Only files that resolve to a location inside ``DIST_DIR`` are served.
    The ``index.html`` fallback is sent with headers that disable caching.
    """
    # `full_path` is attacker-controlled. Joining it unchecked would let
    # "//etc/passwd" (an absolute component replaces the base directory) or
    # ".." segments read arbitrary files, so leading slashes are stripped and
    # the resolved target must stay inside the build directory.
    candidate = None
    try:
        dist_root = DIST_DIR.resolve()
        resolved = (dist_root / full_path.lstrip("/")).resolve()
        if resolved.is_relative_to(dist_root) and resolved.is_file():
            candidate = resolved
    except (OSError, RuntimeError, ValueError):
        # Unresolvable paths (embedded NUL, symlink loops) get the SPA shell.
        candidate = None
    if candidate is not None:
        return FileResponse(str(candidate))

    response = FileResponse(str(DIST_DIR / "index.html"))
    response.headers["Cache-Control"] = "no-store, no-cache, must-revalidate"
    response.headers["Pragma"] = "no-cache"
    response.headers["Expires"] = "0"
    return response