import asyncio
import contextlib
import html
import json
import os
import re
import shutil
import uuid
from datetime import datetime, timezone
from pathlib import Path
from typing import Any
from fastapi import FastAPI, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import FileResponse, JSONResponse
from fastapi.staticfiles import StaticFiles
from supertonic_gateway import SuperTonicGateway
from voice_rtc import WebRTCVoiceSession
BASE_DIR = Path(__file__).resolve().parent
DIST_DIR = BASE_DIR / "frontend" / "dist"
NANOBOT_WORKSPACE = Path(os.getenv("NANOBOT_WORKSPACE", str(Path.home() / ".nanobot"))).expanduser()
NANOBOT_API_SOCKET = Path(
os.getenv("NANOBOT_API_SOCKET", str(Path.home() / ".nanobot" / "api.sock"))
).expanduser()
CARDS_ROOT = NANOBOT_WORKSPACE / "cards"
CARD_INSTANCES_DIR = CARDS_ROOT / "instances"
CARD_TEMPLATES_DIR = CARDS_ROOT / "templates"
TEMPLATES_CONTEXT_PATH = NANOBOT_WORKSPACE / "CARD_TEMPLATES.md"
MAX_TEMPLATES_IN_PROMPT = 12
MAX_TEMPLATE_HTML_CHARS = 4000
_INVALID_TEMPLATE_KEY_CHARS = re.compile(r"[^a-z0-9_-]+")
_CARD_ID_PATTERN = re.compile(r"^[a-zA-Z0-9_-]{1,128}$")
_CARD_LANE_ORDER = {"attention": 0, "work": 1, "context": 2, "history": 3}
_CARD_STATE_ORDER = {"active": 0, "stale": 1, "resolved": 2, "superseded": 3, "archived": 4}
_JSONRPC_VERSION = "2.0"
_TOOL_JOB_TIMEOUT_SECONDS = 300.0
_TOOL_JOB_RETENTION_SECONDS = 15 * 60
_NANOBOT_API_STREAM_LIMIT = 2 * 1024 * 1024
CARD_INSTANCES_DIR.mkdir(parents=True, exist_ok=True)
CARD_TEMPLATES_DIR.mkdir(parents=True, exist_ok=True)
app = FastAPI(title="Nanobot SuperTonic Wisper Web")
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_methods=["*"],
allow_headers=["*"],
)
gateway = SuperTonicGateway()
_active_session: WebRTCVoiceSession | None = None
_active_queue: asyncio.Queue | None = None
_sender_task: asyncio.Task | None = None
_tool_jobs: dict[str, dict[str, Any]] = {}
_tool_job_tasks: dict[str, asyncio.Task[None]] = {}
_tool_job_lock = asyncio.Lock()
# ---------------------------------------------------------------------------
# Cards (file-backed)
# ---------------------------------------------------------------------------
def _normalize_template_key(raw: str) -> str:
key = _INVALID_TEMPLATE_KEY_CHARS.sub("-", raw.strip().lower()).strip("-")
return key[:64]
def _normalize_card_id(raw: str) -> str:
card_id = raw.strip()
return card_id if _CARD_ID_PATTERN.fullmatch(card_id) else ""
def _card_instance_dir(card_id: str) -> Path | None:
card_id_clean = _normalize_card_id(card_id)
if not card_id_clean:
return None
return CARD_INSTANCES_DIR / card_id_clean
def _card_meta_path(card_id: str) -> Path | None:
instance_dir = _card_instance_dir(card_id)
if instance_dir is None:
return None
return instance_dir / "card.json"
def _card_state_path(card_id: str) -> Path | None:
instance_dir = _card_instance_dir(card_id)
if instance_dir is None:
return None
return instance_dir / "state.json"
def _decode_object(raw: str) -> dict[str, Any] | None:
try:
payload = json.loads(raw)
except (TypeError, ValueError):
return None
return payload if isinstance(payload, dict) else None
async def _read_json_request(request: Request) -> dict[str, Any]:
try:
payload = await request.json()
except (json.JSONDecodeError, UnicodeDecodeError) as exc:
raise ValueError("request body must be valid JSON") from exc
if not isinstance(payload, dict):
raise ValueError("request body must be a JSON object")
return payload
def _coerce_card_record(raw: dict[str, Any]) -> dict[str, Any] | None:
card_id = _normalize_card_id(str(raw.get("id", "")))
if not card_id:
return None
kind = str(raw.get("kind", "text") or "text").strip().lower()
if kind not in {"text", "question"}:
kind = "text"
lane = str(raw.get("lane", "context") or "context").strip().lower()
if lane not in _CARD_LANE_ORDER:
lane = "context"
state = str(raw.get("state", "active") or "active").strip().lower()
if state not in _CARD_STATE_ORDER:
state = "active"
try:
priority = int(raw.get("priority", 50))
except (TypeError, ValueError):
priority = 50
priority = max(0, min(priority, 100))
raw_choices = raw.get("choices", [])
choices = [str(choice) for choice in raw_choices] if isinstance(raw_choices, list) else []
raw_template_state = raw.get("template_state", {})
template_state = raw_template_state if isinstance(raw_template_state, dict) else {}
return {
"id": card_id,
"kind": kind,
"title": str(raw.get("title", "")),
"content": str(raw.get("content", "")) if kind == "question" else "",
"question": str(raw.get("question", "")),
"choices": choices,
"response_value": str(raw.get("response_value", "")),
"slot": str(raw.get("slot", "")),
"lane": lane,
"priority": priority,
"state": state,
"template_key": str(raw.get("template_key", "")),
"template_state": template_state,
"context_summary": str(raw.get("context_summary", "")),
"chat_id": str(raw.get("chat_id", "web") or "web"),
"created_at": str(raw.get("created_at", "")),
"updated_at": str(raw.get("updated_at", "")),
}
def _json_script_text(payload: dict[str, Any]) -> str:
return json.dumps(payload, ensure_ascii=False).replace("", "<\\/")
def _materialize_card_content(card: dict[str, Any]) -> str:
if card.get("kind") != "text":
return str(card.get("content", ""))
template_key = str(card.get("template_key", "")).strip()
if not template_key:
return ""
html_path = _template_html_path(template_key)
try:
template_html = html_path.read_text(encoding="utf-8")
except Exception:
return (
'
'
f"Missing template: {html.escape(template_key)}"
"
"
)
state_payload = card.get("template_state", {})
if not isinstance(state_payload, dict):
state_payload = {}
card_id = html.escape(str(card.get("id", "")))
safe_template_key = html.escape(template_key)
return (
f''
f''
f"{template_html}"
"
"
)
def _load_card(card_id: str) -> dict[str, Any] | None:
meta_path = _card_meta_path(card_id)
if meta_path is None or not meta_path.exists():
return None
try:
raw = json.loads(meta_path.read_text(encoding="utf-8"))
except Exception:
return None
if not isinstance(raw, dict):
return None
state_path = _card_state_path(card_id)
if state_path is not None and state_path.exists():
try:
raw_state = json.loads(state_path.read_text(encoding="utf-8"))
except Exception:
raw_state = {}
if isinstance(raw_state, dict):
raw["template_state"] = raw_state
card = _coerce_card_record(raw)
if card is None:
return None
card["content"] = _materialize_card_content(card)
return card
def _write_card(card: dict[str, Any]) -> dict[str, Any] | None:
normalized = _coerce_card_record(card)
if normalized is None:
return None
now = datetime.now(timezone.utc).isoformat()
existing = _load_card(normalized["id"])
if existing is not None:
normalized["created_at"] = existing.get("created_at") or normalized.get("created_at") or now
else:
normalized["created_at"] = normalized.get("created_at") or now
normalized["updated_at"] = normalized.get("updated_at") or now
instance_dir = _card_instance_dir(normalized["id"])
meta_path = _card_meta_path(normalized["id"])
state_path = _card_state_path(normalized["id"])
if instance_dir is None or meta_path is None or state_path is None:
return None
instance_dir.mkdir(parents=True, exist_ok=True)
template_state = normalized.pop("template_state", {})
meta_path.write_text(
json.dumps(normalized, indent=2, ensure_ascii=False) + "\n",
encoding="utf-8",
)
if normalized["kind"] == "text":
state_path.write_text(
json.dumps(
template_state if isinstance(template_state, dict) else {},
indent=2,
ensure_ascii=False,
)
+ "\n",
encoding="utf-8",
)
else:
state_path.unlink(missing_ok=True)
return _load_card(normalized["id"])
def _sort_cards(cards: list[dict[str, Any]]) -> list[dict[str, Any]]:
return sorted(
cards,
key=lambda item: (
_CARD_LANE_ORDER.get(str(item.get("lane", "context")), 99),
_CARD_STATE_ORDER.get(str(item.get("state", "active")), 99),
-int(item.get("priority", 0) or 0),
str(item.get("updated_at", "")),
str(item.get("created_at", "")),
),
reverse=False,
)
def _load_cards() -> list[dict[str, Any]]:
CARD_INSTANCES_DIR.mkdir(parents=True, exist_ok=True)
cards: list[dict[str, Any]] = []
for instance_dir in CARD_INSTANCES_DIR.iterdir():
if not instance_dir.is_dir():
continue
card = _load_card(instance_dir.name)
if card is None:
continue
if card.get("state") == "archived":
continue
cards.append(card)
return _sort_cards(cards)
def _find_card_by_slot(slot: str, *, chat_id: str) -> dict[str, Any] | None:
target_slot = slot.strip()
if not target_slot:
return None
for card in _load_cards():
if str(card.get("chat_id", "")).strip() != chat_id:
continue
if str(card.get("slot", "")).strip() == target_slot:
return card
return None
def _persist_card(card: dict[str, Any]) -> dict[str, Any] | None:
normalized = _coerce_card_record(card)
if normalized is None:
return None
existing_same_slot = None
slot = str(normalized.get("slot", "")).strip()
chat_id = str(normalized.get("chat_id", "web") or "web")
if slot:
existing_same_slot = _find_card_by_slot(slot, chat_id=chat_id)
if existing_same_slot is not None and existing_same_slot["id"] != normalized["id"]:
superseded = dict(existing_same_slot)
superseded["state"] = "superseded"
superseded["updated_at"] = datetime.now(timezone.utc).isoformat()
_write_card(superseded)
return _write_card(normalized)
def _delete_card(card_id: str) -> bool:
instance_dir = _card_instance_dir(card_id)
if instance_dir is None or not instance_dir.exists():
return False
shutil.rmtree(instance_dir, ignore_errors=True)
return True
def _template_dir(template_key: str) -> Path:
return CARD_TEMPLATES_DIR / template_key
def _template_html_path(template_key: str) -> Path:
return _template_dir(template_key) / "template.html"
def _template_meta_path(template_key: str) -> Path:
return _template_dir(template_key) / "manifest.json"
def _read_template_meta(template_key: str) -> dict[str, Any]:
meta_path = _template_meta_path(template_key)
try:
meta = json.loads(meta_path.read_text(encoding="utf-8"))
return meta if isinstance(meta, dict) else {}
except Exception:
return {}
def _list_templates(limit: int | None = None) -> list[dict[str, Any]]:
CARD_TEMPLATES_DIR.mkdir(parents=True, exist_ok=True)
templates: list[dict[str, Any]] = []
for template_dir in CARD_TEMPLATES_DIR.iterdir():
if not template_dir.is_dir():
continue
key = _normalize_template_key(template_dir.name)
if not key:
continue
html_path = _template_html_path(key)
if not html_path.exists():
continue
try:
content = html_path.read_text(encoding="utf-8")
except Exception:
continue
stat = html_path.stat()
meta = _read_template_meta(key)
if bool(meta.get("deprecated")):
continue
created_at = str(
meta.get("created_at")
or datetime.fromtimestamp(stat.st_ctime, timezone.utc).isoformat()
)
updated_at = str(
meta.get("updated_at")
or datetime.fromtimestamp(stat.st_mtime, timezone.utc).isoformat()
)
templates.append(
{
"id": key,
"key": key,
"title": str(meta.get("title", "")),
"content": content,
"notes": str(meta.get("notes", "")),
"example_state": meta.get("example_state", {}),
"created_at": created_at,
"updated_at": updated_at,
"file_url": f"/card-templates/{key}/template.html",
}
)
templates.sort(key=lambda item: item["updated_at"], reverse=True)
if limit is not None:
return templates[: max(0, limit)]
return templates
def _render_templates_markdown(rows: list[dict[str, Any]]) -> str:
lines = [
"# Card Templates",
"",
"These are user-approved template layouts for `mcp_display_render_card` cards.",
"Each card instance should provide a `template_key` and a `template_state` JSON object.",
"Use a matching template when the request intent fits.",
"Do not rewrite the HTML layout when an existing template already fits; fill the template_state instead.",
"",
]
for row in rows:
key = str(row.get("key", "")).strip() or "unnamed"
title = str(row.get("title", "")).strip() or "(untitled)"
notes = str(row.get("notes", "")).strip() or "(no usage notes)"
content = str(row.get("content", "")).strip()
example_state = row.get("example_state", {})
if len(content) > MAX_TEMPLATE_HTML_CHARS:
content = content[:MAX_TEMPLATE_HTML_CHARS] + "\n"
html_lines = [f" {line}" for line in content.splitlines()] if content else [" "]
state_text = (
json.dumps(example_state, indent=2, ensure_ascii=False)
if isinstance(example_state, dict)
else "{}"
)
state_lines = [f" {line}" for line in state_text.splitlines()]
lines.extend(
[
f"## {key}",
f"- Title: {title}",
f"- Usage: {notes}",
"- Example State:",
*state_lines,
"- HTML:",
*html_lines,
"",
]
)
return "\n".join(lines).rstrip() + "\n"
def _sync_templates_context_file() -> None:
try:
rows = _list_templates(limit=MAX_TEMPLATES_IN_PROMPT)
if not rows:
TEMPLATES_CONTEXT_PATH.unlink(missing_ok=True)
return
TEMPLATES_CONTEXT_PATH.parent.mkdir(parents=True, exist_ok=True)
TEMPLATES_CONTEXT_PATH.write_text(_render_templates_markdown(rows), encoding="utf-8")
except Exception:
return
def _to_typed_message(event_dict: dict[str, Any]) -> dict[str, Any] | None:
role = str(event_dict.get("role", "")).strip()
text = str(event_dict.get("text", ""))
timestamp = str(event_dict.get("timestamp", ""))
if role == "agent-state":
return {"type": "agent_state", "state": text}
if role in {"nanobot", "nanobot-progress", "nanobot-tool", "system", "user"}:
return {
"type": "message",
"role": role,
"content": text,
"is_progress": role in {"nanobot-progress", "nanobot-tool"},
"is_tool_hint": role == "nanobot-tool",
"timestamp": timestamp,
}
if role == "card":
payload = _decode_object(text)
if payload is None:
return None
card = _coerce_card_record(payload)
if card is None:
return None
card["type"] = "card"
return card
return None
# ---------------------------------------------------------------------------
# Nanobot API helpers
# ---------------------------------------------------------------------------
class _NanobotApiError(RuntimeError):
def __init__(self, code: int, message: str) -> None:
super().__init__(message)
self.code = code
def _jsonrpc_request(request_id: str, method: str, params: dict[str, Any]) -> dict[str, Any]:
return {
"jsonrpc": _JSONRPC_VERSION,
"id": request_id,
"method": method,
"params": params,
}
async def _open_nanobot_api_socket() -> tuple[asyncio.StreamReader, asyncio.StreamWriter]:
if not NANOBOT_API_SOCKET.exists():
raise RuntimeError(
f"Nanobot API socket not found at {NANOBOT_API_SOCKET}. "
"Enable channels.api and start `nanobot gateway`."
)
try:
return await asyncio.open_unix_connection(
path=str(NANOBOT_API_SOCKET),
limit=_NANOBOT_API_STREAM_LIMIT,
)
except OSError as exc:
raise RuntimeError(f"failed to connect to Nanobot API socket: {exc}") from exc
async def _send_nanobot_api_request(
method: str,
params: dict[str, Any],
*,
timeout_seconds: float,
) -> Any:
request_id = str(uuid.uuid4())
reader, writer = await _open_nanobot_api_socket()
try:
writer.write(
(
json.dumps(_jsonrpc_request(request_id, method, params), ensure_ascii=False) + "\n"
).encode("utf-8")
)
await writer.drain()
loop = asyncio.get_running_loop()
deadline = loop.time() + timeout_seconds
while True:
remaining = deadline - loop.time()
if remaining <= 0:
raise RuntimeError(f"timed out waiting for Nanobot API response to {method}")
try:
line = await asyncio.wait_for(reader.readline(), timeout=remaining)
except ValueError as exc:
raise RuntimeError(
"Nanobot API response exceeded the configured stream limit"
) from exc
if not line:
raise RuntimeError("Nanobot API socket closed before responding")
try:
message = json.loads(line.decode("utf-8", errors="replace"))
except json.JSONDecodeError:
continue
if not isinstance(message, dict):
continue
if message.get("jsonrpc") != _JSONRPC_VERSION:
continue
if "method" in message:
continue
if str(message.get("id", "")).strip() != request_id:
continue
if "error" in message:
error = message.get("error", {})
if isinstance(error, dict):
raise _NanobotApiError(
int(error.get("code", -32000)),
str(error.get("message", "unknown Nanobot API error")),
)
raise _NanobotApiError(-32000, str(error))
return message.get("result")
finally:
writer.close()
await writer.wait_closed()
def _utc_now_iso() -> str:
return datetime.now(timezone.utc).isoformat()
async def _prune_tool_jobs_locked() -> None:
cutoff = datetime.now(timezone.utc).timestamp() - _TOOL_JOB_RETENTION_SECONDS
expired_job_ids: list[str] = []
for job_id, payload in _tool_jobs.items():
finished_at = str(payload.get("finished_at", "") or "")
if not finished_at:
continue
try:
finished_ts = datetime.fromisoformat(finished_at).timestamp()
except ValueError:
finished_ts = 0.0
if finished_ts <= cutoff:
expired_job_ids.append(job_id)
for job_id in expired_job_ids:
task = _tool_job_tasks.get(job_id)
if task is not None and not task.done():
continue
_tool_jobs.pop(job_id, None)
_tool_job_tasks.pop(job_id, None)
def _serialize_tool_job(payload: dict[str, Any]) -> dict[str, Any]:
result = payload.get("result")
if not isinstance(result, dict):
result = None
return {
"job_id": str(payload.get("job_id", "")),
"tool_name": str(payload.get("tool_name", "")),
"status": str(payload.get("status", "queued") or "queued"),
"created_at": str(payload.get("created_at", "")),
"started_at": payload.get("started_at"),
"finished_at": payload.get("finished_at"),
"result": result,
"error": payload.get("error"),
"error_code": payload.get("error_code"),
}
async def _start_tool_job(tool_name: str, arguments: dict[str, Any]) -> dict[str, Any]:
job_id = uuid.uuid4().hex
payload = {
"job_id": job_id,
"tool_name": tool_name,
"status": "queued",
"created_at": _utc_now_iso(),
"started_at": None,
"finished_at": None,
"result": None,
"error": None,
"error_code": None,
}
async with _tool_job_lock:
await _prune_tool_jobs_locked()
_tool_jobs[job_id] = payload
_tool_job_tasks[job_id] = asyncio.create_task(
_run_tool_job(job_id, tool_name, dict(arguments)),
name=f"manual-tool-{job_id}",
)
return _serialize_tool_job(payload)
async def _run_tool_job(job_id: str, tool_name: str, arguments: dict[str, Any]) -> None:
async with _tool_job_lock:
payload = _tool_jobs.get(job_id)
if payload is None:
return
payload["status"] = "running"
payload["started_at"] = _utc_now_iso()
try:
result = await _send_nanobot_api_request(
"tool.call",
{"name": tool_name, "arguments": arguments},
timeout_seconds=_TOOL_JOB_TIMEOUT_SECONDS,
)
if not isinstance(result, dict):
raise RuntimeError("Nanobot API returned an invalid tool response")
async with _tool_job_lock:
payload = _tool_jobs.get(job_id)
if payload is None:
return
payload["status"] = "completed"
payload["result"] = result
payload["finished_at"] = _utc_now_iso()
except asyncio.CancelledError:
async with _tool_job_lock:
payload = _tool_jobs.get(job_id)
if payload is not None:
payload["status"] = "failed"
payload["error"] = "tool job cancelled"
payload["finished_at"] = _utc_now_iso()
raise
except _NanobotApiError as exc:
async with _tool_job_lock:
payload = _tool_jobs.get(job_id)
if payload is not None:
payload["status"] = "failed"
payload["error"] = str(exc)
payload["error_code"] = exc.code
payload["finished_at"] = _utc_now_iso()
except RuntimeError as exc:
async with _tool_job_lock:
payload = _tool_jobs.get(job_id)
if payload is not None:
payload["status"] = "failed"
payload["error"] = str(exc)
payload["finished_at"] = _utc_now_iso()
except Exception as exc:
async with _tool_job_lock:
payload = _tool_jobs.get(job_id)
if payload is not None:
payload["status"] = "failed"
payload["error"] = f"unexpected tool job error: {exc}"
payload["finished_at"] = _utc_now_iso()
finally:
async with _tool_job_lock:
_tool_job_tasks.pop(job_id, None)
await _prune_tool_jobs_locked()
async def _get_tool_job(job_id: str) -> dict[str, Any] | None:
async with _tool_job_lock:
await _prune_tool_jobs_locked()
payload = _tool_jobs.get(job_id)
return _serialize_tool_job(payload) if payload is not None else None
# ---------------------------------------------------------------------------
# API routes
# ---------------------------------------------------------------------------
@app.get("/health")
async def health() -> JSONResponse:
return JSONResponse({"status": "ok"})
@app.get("/tools")
async def list_tools() -> JSONResponse:
try:
result = await _send_nanobot_api_request("tool.list", {}, timeout_seconds=20.0)
except _NanobotApiError as exc:
status_code = 503 if exc.code == -32000 else 502
return JSONResponse({"error": str(exc)}, status_code=status_code)
except RuntimeError as exc:
return JSONResponse({"error": str(exc)}, status_code=503)
if not isinstance(result, dict):
return JSONResponse({"error": "Nanobot API returned an invalid tool list"}, status_code=502)
tools = result.get("tools", [])
if not isinstance(tools, list):
return JSONResponse({"error": "Nanobot API returned an invalid tool list"}, status_code=502)
return JSONResponse({"tools": tools})
@app.post("/tools/call")
async def call_tool(request: Request) -> JSONResponse:
try:
payload = await _read_json_request(request)
except ValueError as exc:
return JSONResponse({"error": str(exc)}, status_code=400)
tool_name = str(payload.get("tool_name", payload.get("name", ""))).strip()
if not tool_name:
return JSONResponse({"error": "tool_name is required"}, status_code=400)
arguments = payload.get("arguments", payload.get("params", {}))
if arguments is None:
arguments = {}
if not isinstance(arguments, dict):
return JSONResponse({"error": "arguments must be a JSON object"}, status_code=400)
async_requested = payload.get("async") is True
if async_requested:
job_payload = await _start_tool_job(tool_name, arguments)
return JSONResponse(job_payload, status_code=202)
try:
result = await _send_nanobot_api_request(
"tool.call",
{"name": tool_name, "arguments": arguments},
timeout_seconds=60.0,
)
except _NanobotApiError as exc:
status_code = 400 if exc.code == -32602 else 503 if exc.code == -32000 else 502
return JSONResponse({"error": str(exc)}, status_code=status_code)
except RuntimeError as exc:
return JSONResponse({"error": str(exc)}, status_code=503)
if not isinstance(result, dict):
return JSONResponse(
{"error": "Nanobot API returned an invalid tool response"}, status_code=502
)
return JSONResponse(result)
@app.get("/tools/jobs/{job_id}")
async def get_tool_job(job_id: str) -> JSONResponse:
safe_job_id = job_id.strip()
if not safe_job_id:
return JSONResponse({"error": "job id is required"}, status_code=400)
payload = await _get_tool_job(safe_job_id)
if payload is None:
return JSONResponse({"error": "tool job not found"}, status_code=404)
return JSONResponse(payload)
@app.get("/cards")
async def get_cards() -> JSONResponse:
return JSONResponse(_load_cards())
@app.delete("/cards/{card_id}")
async def delete_card(card_id: str) -> JSONResponse:
if not _normalize_card_id(card_id):
return JSONResponse({"error": "invalid card id"}, status_code=400)
_delete_card(card_id)
return JSONResponse({"status": "ok"})
@app.get("/templates")
async def get_templates() -> JSONResponse:
return JSONResponse(_list_templates())
@app.post("/templates")
async def save_template(request: Request) -> JSONResponse:
try:
payload = await _read_json_request(request)
except ValueError as exc:
return JSONResponse({"error": str(exc)}, status_code=400)
key = _normalize_template_key(str(payload.get("key", "")))
title = str(payload.get("title", "")).strip()
content = str(payload.get("content", "")).strip()
notes = str(payload.get("notes", "")).strip()
example_state = payload.get("example_state", {})
if not isinstance(example_state, dict):
example_state = {}
if not key:
return JSONResponse({"error": "template key is required"}, status_code=400)
if not content:
return JSONResponse({"error": "template content is required"}, status_code=400)
template_dir = _template_dir(key)
template_dir.mkdir(parents=True, exist_ok=True)
now = datetime.now(timezone.utc).isoformat()
existing_meta = _read_template_meta(key)
created_at = str(existing_meta.get("created_at") or now)
_template_html_path(key).write_text(content, encoding="utf-8")
_template_meta_path(key).write_text(
json.dumps(
{
"key": key,
"title": title,
"notes": notes,
"example_state": example_state,
"created_at": created_at,
"updated_at": now,
},
indent=2,
ensure_ascii=False,
)
+ "\n",
encoding="utf-8",
)
_sync_templates_context_file()
return JSONResponse(
{
"status": "ok",
"id": key,
"key": key,
"example_state": example_state,
"file_url": f"/card-templates/{key}/template.html",
}
)
@app.delete("/templates/{template_key}")
async def delete_template(template_key: str) -> JSONResponse:
key = _normalize_template_key(template_key)
if not key:
return JSONResponse({"error": "invalid template key"}, status_code=400)
shutil.rmtree(_template_dir(key), ignore_errors=True)
_sync_templates_context_file()
return JSONResponse({"status": "ok", "key": key})
@app.post("/message")
async def post_message(request: Request) -> JSONResponse:
try:
payload = await _read_json_request(request)
except ValueError as exc:
return JSONResponse({"error": str(exc)}, status_code=400)
text = str(payload.get("text", "")).strip()
metadata = payload.get("metadata", {})
if not text:
return JSONResponse({"error": "empty message"}, status_code=400)
if not isinstance(metadata, dict):
metadata = {}
try:
await gateway.send_user_message(text, metadata=metadata)
except RuntimeError as exc:
return JSONResponse({"error": str(exc)}, status_code=503)
return JSONResponse({"status": "ok"})
@app.post("/rtc/offer")
async def rtc_offer(request: Request) -> JSONResponse:
global _active_session, _active_queue, _sender_task
try:
payload = await _read_json_request(request)
except ValueError as exc:
return JSONResponse({"error": str(exc)}, status_code=400)
if _active_session is not None:
await _active_session.close()
_active_session = None
if _active_queue is not None:
await gateway.unsubscribe(_active_queue)
_active_queue = None
if _sender_task is not None:
_sender_task.cancel()
with contextlib.suppress(asyncio.CancelledError):
await _sender_task
_sender_task = None
queue = await gateway.subscribe()
_active_queue = queue
voice_session = WebRTCVoiceSession(gateway=gateway)
_active_session = voice_session
_sender_task = asyncio.create_task(
_sender_loop(queue, voice_session),
name="rtc-sender",
)
answer = await voice_session.handle_offer(payload)
if answer is None:
return JSONResponse(
{"error": "WebRTC backend unavailable on host (aiortc is not installed)."},
status_code=503,
)
await gateway.connect_nanobot()
return JSONResponse(answer)
@app.on_event("shutdown")
async def on_shutdown() -> None:
global _active_session, _active_queue, _sender_task
tool_tasks = list(_tool_job_tasks.values())
for task in tool_tasks:
task.cancel()
if tool_tasks:
with contextlib.suppress(asyncio.CancelledError):
await asyncio.gather(*tool_tasks, return_exceptions=True)
if _sender_task is not None:
_sender_task.cancel()
with contextlib.suppress(asyncio.CancelledError):
await _sender_task
if _active_session is not None:
await _active_session.close()
if _active_queue is not None:
await gateway.unsubscribe(_active_queue)
await gateway.shutdown()
async def _sender_loop(
queue: asyncio.Queue,
voice_session: "WebRTCVoiceSession",
) -> None:
while True:
event = await queue.get()
if event.role == "nanobot-tts":
await voice_session.queue_output_text(event.text)
continue
typed_event = _to_typed_message(event.to_dict())
if typed_event is None:
continue
if typed_event.get("type") == "card":
persisted = _persist_card(typed_event)
if persisted is None:
continue
payload = dict(persisted)
payload["type"] = "card"
voice_session.send_to_datachannel(payload)
continue
voice_session.send_to_datachannel(typed_event)
@app.on_event("startup")
async def on_startup() -> None:
CARD_INSTANCES_DIR.mkdir(parents=True, exist_ok=True)
CARD_TEMPLATES_DIR.mkdir(parents=True, exist_ok=True)
_sync_templates_context_file()
if DIST_DIR.exists():
assets_dir = DIST_DIR / "assets"
if assets_dir.exists():
app.mount("/assets", StaticFiles(directory=str(assets_dir)), name="assets")
if CARD_TEMPLATES_DIR.exists():
app.mount(
"/card-templates",
StaticFiles(directory=str(CARD_TEMPLATES_DIR)),
name="card-templates",
)
@app.get("/{full_path:path}")
async def spa_fallback(full_path: str) -> FileResponse:
candidate = DIST_DIR / full_path
if candidate.is_file():
return FileResponse(str(candidate))
response = FileResponse(str(DIST_DIR / "index.html"))
response.headers["Cache-Control"] = "no-store, no-cache, must-revalidate"
response.headers["Pragma"] = "no-cache"
response.headers["Expires"] = "0"
return response