chore: clean up web ui repo hygiene

This commit is contained in:
kacper 2026-03-14 20:21:44 -04:00
parent 94e62c9456
commit 2fcc9db903
4786 changed files with 1271 additions and 1275231 deletions

657
app.py
View file

@ -5,39 +5,28 @@ import json
import os
import re
import shutil
import subprocess
import sys
import uuid
from datetime import datetime, timezone
from pathlib import Path
from typing import Any
from urllib.parse import urlparse, urlunparse
import httpx
from fastapi import FastAPI, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import FileResponse, JSONResponse, Response
from fastapi.responses import FileResponse, JSONResponse
from fastapi.staticfiles import StaticFiles
from supertonic_gateway import SuperTonicGateway
from voice_rtc import WebRTCVoiceSession
BASE_DIR = Path(__file__).resolve().parent
DIST_DIR = BASE_DIR / "frontend" / "dist"
NANOBOT_CONFIG_PATH = Path(
os.getenv("NANOBOT_CONFIG_PATH", str(Path.home() / ".nanobot" / "config.json"))
).expanduser()
NANOBOT_WORKSPACE = Path(
os.getenv("NANOBOT_WORKSPACE", str(Path.home() / ".nanobot"))
).expanduser()
NANOBOT_SCRIPT_WORKSPACE = Path(
os.getenv("NANOBOT_SCRIPT_WORKSPACE", str(NANOBOT_WORKSPACE / "workspace"))
NANOBOT_WORKSPACE = Path(os.getenv("NANOBOT_WORKSPACE", str(Path.home() / ".nanobot"))).expanduser()
NANOBOT_API_SOCKET = Path(
os.getenv("NANOBOT_API_SOCKET", str(Path.home() / ".nanobot" / "api.sock"))
).expanduser()
CARDS_ROOT = NANOBOT_WORKSPACE / "cards"
CARD_INSTANCES_DIR = CARDS_ROOT / "instances"
CARD_TEMPLATES_DIR = CARDS_ROOT / "templates"
CARD_SOURCES_DIR = CARDS_ROOT / "sources"
CARD_SOURCE_STATE_DIR = CARDS_ROOT / "source-state"
TEMPLATES_CONTEXT_PATH = NANOBOT_WORKSPACE / "CARD_TEMPLATES.md"
MAX_TEMPLATES_IN_PROMPT = 12
MAX_TEMPLATE_HTML_CHARS = 4000
@ -45,12 +34,12 @@ _INVALID_TEMPLATE_KEY_CHARS = re.compile(r"[^a-z0-9_-]+")
_CARD_ID_PATTERN = re.compile(r"^[a-zA-Z0-9_-]{1,128}$")
_CARD_LANE_ORDER = {"attention": 0, "work": 1, "context": 2, "history": 3}
_CARD_STATE_ORDER = {"active": 0, "stale": 1, "resolved": 2, "superseded": 3, "archived": 4}
_MAX_SCRIPT_PROXY_ARGS = 16
_MAX_SCRIPT_PROXY_STDERR_CHARS = 2000
_JSONRPC_VERSION = "2.0"
_TOOL_JOB_TIMEOUT_SECONDS = 300.0
_TOOL_JOB_RETENTION_SECONDS = 15 * 60
_NANOBOT_API_STREAM_LIMIT = 2 * 1024 * 1024
CARD_INSTANCES_DIR.mkdir(parents=True, exist_ok=True)
CARD_TEMPLATES_DIR.mkdir(parents=True, exist_ok=True)
CARD_SOURCES_DIR.mkdir(parents=True, exist_ok=True)
CARD_SOURCE_STATE_DIR.mkdir(parents=True, exist_ok=True)
app = FastAPI(title="Nanobot SuperTonic Wisper Web")
@ -66,6 +55,9 @@ gateway = SuperTonicGateway()
_active_session: WebRTCVoiceSession | None = None
_active_queue: asyncio.Queue | None = None
_sender_task: asyncio.Task | None = None
_tool_jobs: dict[str, dict[str, Any]] = {}
_tool_job_tasks: dict[str, asyncio.Task[None]] = {}
_tool_job_lock = asyncio.Lock()
# ---------------------------------------------------------------------------
@ -112,6 +104,16 @@ def _decode_object(raw: str) -> dict[str, Any] | None:
return payload if isinstance(payload, dict) else None
async def _read_json_request(request: Request) -> dict[str, Any]:
try:
payload = await request.json()
except (json.JSONDecodeError, UnicodeDecodeError) as exc:
raise ValueError("request body must be valid JSON") from exc
if not isinstance(payload, dict):
raise ValueError("request body must be a JSON object")
return payload
def _coerce_card_record(raw: dict[str, Any]) -> dict[str, Any] | None:
card_id = _normalize_card_id(str(raw.get("id", "")))
if not card_id:
@ -178,8 +180,8 @@ def _materialize_card_content(card: dict[str, Any]) -> str:
template_html = html_path.read_text(encoding="utf-8")
except Exception:
return (
"<div style=\"padding:16px;border:1px solid #fecaca;border-radius:12px;"
"background:#fef2f2;color:#991b1b;font:600 14px/1.4 system-ui,sans-serif;\">"
'<div style="padding:16px;border:1px solid #fecaca;border-radius:12px;'
'background:#fef2f2;color:#991b1b;font:600 14px/1.4 system-ui,sans-serif;">'
f"Missing template: {html.escape(template_key)}"
"</div>"
)
@ -254,7 +256,12 @@ def _write_card(card: dict[str, Any]) -> dict[str, Any] | None:
)
if normalized["kind"] == "text":
state_path.write_text(
json.dumps(template_state if isinstance(template_state, dict) else {}, indent=2, ensure_ascii=False) + "\n",
json.dumps(
template_state if isinstance(template_state, dict) else {},
indent=2,
ensure_ascii=False,
)
+ "\n",
encoding="utf-8",
)
else:
@ -376,8 +383,14 @@ def _list_templates(limit: int | None = None) -> list[dict[str, Any]]:
meta = _read_template_meta(key)
if bool(meta.get("deprecated")):
continue
created_at = str(meta.get("created_at") or datetime.fromtimestamp(stat.st_ctime, timezone.utc).isoformat())
updated_at = str(meta.get("updated_at") or datetime.fromtimestamp(stat.st_mtime, timezone.utc).isoformat())
created_at = str(
meta.get("created_at")
or datetime.fromtimestamp(stat.st_ctime, timezone.utc).isoformat()
)
updated_at = str(
meta.get("updated_at")
or datetime.fromtimestamp(stat.st_mtime, timezone.utc).isoformat()
)
templates.append(
{
"id": key,
@ -417,7 +430,11 @@ def _render_templates_markdown(rows: list[dict[str, Any]]) -> str:
if len(content) > MAX_TEMPLATE_HTML_CHARS:
content = content[:MAX_TEMPLATE_HTML_CHARS] + "\n<!-- truncated -->"
html_lines = [f" {line}" for line in content.splitlines()] if content else [" "]
state_text = json.dumps(example_state, indent=2, ensure_ascii=False) if isinstance(example_state, dict) else "{}"
state_text = (
json.dumps(example_state, indent=2, ensure_ascii=False)
if isinstance(example_state, dict)
else "{}"
)
state_lines = [f" {line}" for line in state_text.splitlines()]
lines.extend(
[
@ -478,239 +495,231 @@ def _to_typed_message(event_dict: dict[str, Any]) -> dict[str, Any] | None:
# ---------------------------------------------------------------------------
# Nanobot config / HA proxy
# Nanobot API helpers
# ---------------------------------------------------------------------------
def _load_nanobot_config() -> dict[str, Any]:
class _NanobotApiError(RuntimeError):
def __init__(self, code: int, message: str) -> None:
super().__init__(message)
self.code = code
def _jsonrpc_request(request_id: str, method: str, params: dict[str, Any]) -> dict[str, Any]:
return {
"jsonrpc": _JSONRPC_VERSION,
"id": request_id,
"method": method,
"params": params,
}
async def _open_nanobot_api_socket() -> tuple[asyncio.StreamReader, asyncio.StreamWriter]:
if not NANOBOT_API_SOCKET.exists():
raise RuntimeError(
f"Nanobot API socket not found at {NANOBOT_API_SOCKET}. "
"Enable channels.api and start `nanobot gateway`."
)
try:
return json.loads(NANOBOT_CONFIG_PATH.read_text(encoding="utf-8"))
except Exception:
return {}
return await asyncio.open_unix_connection(
path=str(NANOBOT_API_SOCKET),
limit=_NANOBOT_API_STREAM_LIMIT,
)
except OSError as exc:
raise RuntimeError(f"failed to connect to Nanobot API socket: {exc}") from exc
def _get_home_assistant_mcp_config() -> tuple[str, dict[str, str]]:
cfg = _load_nanobot_config()
tools = cfg.get("tools") if isinstance(cfg, dict) else {}
if not isinstance(tools, dict):
raise RuntimeError("nanobot config missing tools section")
mcp_servers = tools.get("mcpServers")
if not isinstance(mcp_servers, dict):
raise RuntimeError("nanobot config missing tools.mcpServers section")
raw_server = mcp_servers.get("home assistant") or mcp_servers.get("home_assistant")
if not isinstance(raw_server, dict):
raise RuntimeError("home assistant MCP server is not configured")
url = str(raw_server.get("url", "")).strip()
if not url:
raise RuntimeError("home assistant MCP server URL is empty")
raw_headers = raw_server.get("headers", {})
headers: dict[str, str] = {}
if isinstance(raw_headers, dict):
for k, v in raw_headers.items():
headers[str(k)] = str(v)
return url, headers
def _home_assistant_origin(mcp_url: str) -> str:
parsed = urlparse(mcp_url.strip())
return urlunparse(parsed._replace(path="", params="", query="", fragment="")).rstrip("/")
def _normalize_home_assistant_proxy_path(target_path: str) -> str:
normalized = "/" + target_path.lstrip("/")
if normalized == "/":
raise ValueError("target path is required")
if normalized == "/api" or normalized.startswith("/api/"):
return normalized
return f"/api{normalized}"
def _run_workspace_script(script_file: Path, args: list[str], *, timeout_seconds: float) -> tuple[int, str, str]:
process = subprocess.run(
[sys.executable, str(script_file), *args],
cwd=str(script_file.parent),
capture_output=True,
text=True,
timeout=timeout_seconds,
)
return process.returncode, process.stdout.strip(), process.stderr.strip()
def _card_source_state_path(source_id: str) -> Path:
return CARD_SOURCE_STATE_DIR / f"{source_id}.json"
def _load_card_source_configs() -> list[dict[str, Any]]:
rows: list[dict[str, Any]] = []
for path in sorted(CARD_SOURCES_DIR.glob('*.json')):
try:
raw = json.loads(path.read_text(encoding='utf-8'))
except Exception:
continue
if not isinstance(raw, dict):
continue
source_id = _normalize_card_id(str(raw.get('id') or path.stem))
if not source_id or raw.get('enabled', True) is False:
continue
script = str(raw.get('script', '')).strip()
if not script:
continue
raw_args = raw.get('args', [])
if not isinstance(raw_args, list):
raw_args = []
try:
min_interval_ms = max(0, int(raw.get('min_interval_ms', 10000)))
except (TypeError, ValueError):
min_interval_ms = 10000
try:
timeout_seconds = max(1, min(300, int(raw.get('timeout_seconds', 60))))
except (TypeError, ValueError):
timeout_seconds = 60
rows.append({
'id': source_id,
'script': script,
'args': [str(arg) for arg in raw_args][: _MAX_SCRIPT_PROXY_ARGS],
'min_interval_ms': min_interval_ms,
'timeout_seconds': timeout_seconds,
})
return rows
def _load_card_source_state(source_id: str) -> dict[str, Any]:
path = _card_source_state_path(source_id)
async def _send_nanobot_api_request(
method: str,
params: dict[str, Any],
*,
timeout_seconds: float,
) -> Any:
request_id = str(uuid.uuid4())
reader, writer = await _open_nanobot_api_socket()
try:
payload = json.loads(path.read_text(encoding='utf-8'))
return payload if isinstance(payload, dict) else {}
except Exception:
return {}
writer.write(
(
json.dumps(_jsonrpc_request(request_id, method, params), ensure_ascii=False) + "\n"
).encode("utf-8")
)
await writer.drain()
loop = asyncio.get_running_loop()
deadline = loop.time() + timeout_seconds
def _save_card_source_state(source_id: str, payload: dict[str, Any]) -> None:
_card_source_state_path(source_id).write_text(
json.dumps(payload, indent=2, ensure_ascii=False) + '\n',
encoding='utf-8',
)
while True:
remaining = deadline - loop.time()
if remaining <= 0:
raise RuntimeError(f"timed out waiting for Nanobot API response to {method}")
try:
line = await asyncio.wait_for(reader.readline(), timeout=remaining)
except ValueError as exc:
raise RuntimeError(
"Nanobot API response exceeded the configured stream limit"
) from exc
if not line:
raise RuntimeError("Nanobot API socket closed before responding")
def _sync_card_sources(*, force: bool = False, source_id: str | None = None) -> list[dict[str, Any]]:
now = datetime.now(timezone.utc)
results: list[dict[str, Any]] = []
for config in _load_card_source_configs():
current_id = str(config.get('id', ''))
if source_id and current_id != source_id:
continue
state = _load_card_source_state(current_id)
last_completed_raw = str(state.get('last_completed_at', '') or '')
should_run = force
if not should_run:
if not last_completed_raw:
should_run = True
else:
try:
last_completed = datetime.fromisoformat(last_completed_raw)
elapsed_ms = (now - last_completed).total_seconds() * 1000
should_run = elapsed_ms >= int(config.get('min_interval_ms', 10000))
except ValueError:
should_run = True
if not should_run:
results.append({'id': current_id, 'status': 'skipped'})
continue
try:
script_file = _resolve_workspace_script(str(config.get('script', '')))
returncode, stdout_text, stderr_text = _run_workspace_script(
script_file,
list(config.get('args', [])),
timeout_seconds=float(config.get('timeout_seconds', 60)),
)
runtime = {
'id': current_id,
'last_started_at': now.isoformat(),
'last_completed_at': datetime.now(timezone.utc).isoformat(),
'last_return_code': returncode,
'script': str(config.get('script', '')),
'args': list(config.get('args', [])),
}
if returncode != 0:
runtime['last_error'] = stderr_text[:_MAX_SCRIPT_PROXY_STDERR_CHARS]
_save_card_source_state(current_id, runtime)
results.append({'id': current_id, 'status': 'error', 'error': runtime['last_error']})
try:
message = json.loads(line.decode("utf-8", errors="replace"))
except json.JSONDecodeError:
continue
parsed_stdout: Any = None
if stdout_text:
try:
parsed_stdout = json.loads(stdout_text)
except json.JSONDecodeError:
parsed_stdout = stdout_text
runtime['last_result'] = parsed_stdout
runtime.pop('last_error', None)
_save_card_source_state(current_id, runtime)
results.append({'id': current_id, 'status': 'synced', 'result': parsed_stdout})
except subprocess.TimeoutExpired:
runtime = {
'id': current_id,
'last_started_at': now.isoformat(),
'last_completed_at': datetime.now(timezone.utc).isoformat(),
'last_return_code': -1,
'last_error': 'card source timed out',
'script': str(config.get('script', '')),
'args': list(config.get('args', [])),
}
_save_card_source_state(current_id, runtime)
results.append({'id': current_id, 'status': 'error', 'error': 'card source timed out'})
except Exception as exc:
runtime = {
'id': current_id,
'last_started_at': now.isoformat(),
'last_completed_at': datetime.now(timezone.utc).isoformat(),
'last_return_code': -1,
'last_error': str(exc),
'script': str(config.get('script', '')),
'args': list(config.get('args', [])),
}
_save_card_source_state(current_id, runtime)
results.append({'id': current_id, 'status': 'error', 'error': str(exc)})
return results
if not isinstance(message, dict):
continue
if message.get("jsonrpc") != _JSONRPC_VERSION:
continue
if "method" in message:
continue
if str(message.get("id", "")).strip() != request_id:
continue
if "error" in message:
error = message.get("error", {})
if isinstance(error, dict):
raise _NanobotApiError(
int(error.get("code", -32000)),
str(error.get("message", "unknown Nanobot API error")),
)
raise _NanobotApiError(-32000, str(error))
return message.get("result")
finally:
writer.close()
await writer.wait_closed()
def _resolve_workspace_script(script_path: str) -> Path:
normalized = script_path.strip().lstrip("/")
if not normalized:
raise ValueError("script path is required")
def _utc_now_iso() -> str:
return datetime.now(timezone.utc).isoformat()
async def _prune_tool_jobs_locked() -> None:
cutoff = datetime.now(timezone.utc).timestamp() - _TOOL_JOB_RETENTION_SECONDS
expired_job_ids: list[str] = []
for job_id, payload in _tool_jobs.items():
finished_at = str(payload.get("finished_at", "") or "")
if not finished_at:
continue
try:
finished_ts = datetime.fromisoformat(finished_at).timestamp()
except ValueError:
finished_ts = 0.0
if finished_ts <= cutoff:
expired_job_ids.append(job_id)
for job_id in expired_job_ids:
task = _tool_job_tasks.get(job_id)
if task is not None and not task.done():
continue
_tool_jobs.pop(job_id, None)
_tool_job_tasks.pop(job_id, None)
def _serialize_tool_job(payload: dict[str, Any]) -> dict[str, Any]:
result = payload.get("result")
if not isinstance(result, dict):
result = None
return {
"job_id": str(payload.get("job_id", "")),
"tool_name": str(payload.get("tool_name", "")),
"status": str(payload.get("status", "queued") or "queued"),
"created_at": str(payload.get("created_at", "")),
"started_at": payload.get("started_at"),
"finished_at": payload.get("finished_at"),
"result": result,
"error": payload.get("error"),
"error_code": payload.get("error_code"),
}
async def _start_tool_job(tool_name: str, arguments: dict[str, Any]) -> dict[str, Any]:
job_id = uuid.uuid4().hex
payload = {
"job_id": job_id,
"tool_name": tool_name,
"status": "queued",
"created_at": _utc_now_iso(),
"started_at": None,
"finished_at": None,
"result": None,
"error": None,
"error_code": None,
}
async with _tool_job_lock:
await _prune_tool_jobs_locked()
_tool_jobs[job_id] = payload
_tool_job_tasks[job_id] = asyncio.create_task(
_run_tool_job(job_id, tool_name, dict(arguments)),
name=f"manual-tool-{job_id}",
)
return _serialize_tool_job(payload)
async def _run_tool_job(job_id: str, tool_name: str, arguments: dict[str, Any]) -> None:
async with _tool_job_lock:
payload = _tool_jobs.get(job_id)
if payload is None:
return
payload["status"] = "running"
payload["started_at"] = _utc_now_iso()
root = NANOBOT_SCRIPT_WORKSPACE.resolve()
candidate = (root / normalized).resolve()
try:
candidate.relative_to(root)
except ValueError as exc:
raise ValueError("script path escapes workspace") from exc
if not candidate.is_file():
raise ValueError(f"script not found: {normalized}")
if candidate.suffix.lower() != ".py":
raise ValueError("only Python scripts are supported")
return candidate
def _script_proxy_args(request: Request) -> list[str]:
unknown_keys = sorted({key for key in request.query_params.keys() if key != "arg"})
if unknown_keys:
raise ValueError(
"unsupported script query parameters: " + ", ".join(unknown_keys)
result = await _send_nanobot_api_request(
"tool.call",
{"name": tool_name, "arguments": arguments},
timeout_seconds=_TOOL_JOB_TIMEOUT_SECONDS,
)
if not isinstance(result, dict):
raise RuntimeError("Nanobot API returned an invalid tool response")
async with _tool_job_lock:
payload = _tool_jobs.get(job_id)
if payload is None:
return
payload["status"] = "completed"
payload["result"] = result
payload["finished_at"] = _utc_now_iso()
except asyncio.CancelledError:
async with _tool_job_lock:
payload = _tool_jobs.get(job_id)
if payload is not None:
payload["status"] = "failed"
payload["error"] = "tool job cancelled"
payload["finished_at"] = _utc_now_iso()
raise
except _NanobotApiError as exc:
async with _tool_job_lock:
payload = _tool_jobs.get(job_id)
if payload is not None:
payload["status"] = "failed"
payload["error"] = str(exc)
payload["error_code"] = exc.code
payload["finished_at"] = _utc_now_iso()
except RuntimeError as exc:
async with _tool_job_lock:
payload = _tool_jobs.get(job_id)
if payload is not None:
payload["status"] = "failed"
payload["error"] = str(exc)
payload["finished_at"] = _utc_now_iso()
except Exception as exc:
async with _tool_job_lock:
payload = _tool_jobs.get(job_id)
if payload is not None:
payload["status"] = "failed"
payload["error"] = f"unexpected tool job error: {exc}"
payload["finished_at"] = _utc_now_iso()
finally:
async with _tool_job_lock:
_tool_job_tasks.pop(job_id, None)
await _prune_tool_jobs_locked()
args = [str(value) for value in request.query_params.getlist("arg")]
if len(args) > _MAX_SCRIPT_PROXY_ARGS:
raise ValueError(
f"too many script arguments ({len(args)} > {_MAX_SCRIPT_PROXY_ARGS})"
)
return args
async def _get_tool_job(job_id: str) -> dict[str, Any] | None:
async with _tool_job_lock:
await _prune_tool_jobs_locked()
payload = _tool_jobs.get(job_id)
return _serialize_tool_job(payload) if payload is not None else None
# ---------------------------------------------------------------------------
@ -723,124 +732,83 @@ async def health() -> JSONResponse:
return JSONResponse({"status": "ok"})
@app.api_route(
"/ha/proxy/{target_path:path}",
methods=["GET", "POST", "PUT", "PATCH", "DELETE"],
)
async def home_assistant_proxy(target_path: str, request: Request) -> Response:
raw_target = target_path.strip()
if not raw_target:
return JSONResponse({"error": "target path is required"}, status_code=400)
@app.get("/tools")
async def list_tools() -> JSONResponse:
try:
mcp_url, auth_headers = _get_home_assistant_mcp_config()
origin = _home_assistant_origin(mcp_url)
api_path = _normalize_home_assistant_proxy_path(raw_target)
except ValueError as exc:
return JSONResponse({"error": str(exc)}, status_code=400)
result = await _send_nanobot_api_request("tool.list", {}, timeout_seconds=20.0)
except _NanobotApiError as exc:
status_code = 503 if exc.code == -32000 else 502
return JSONResponse({"error": str(exc)}, status_code=status_code)
except RuntimeError as exc:
return JSONResponse({"error": str(exc)}, status_code=502)
return JSONResponse({"error": str(exc)}, status_code=503)
target_url = f"{origin}{api_path}"
if request.url.query:
target_url = f"{target_url}?{request.url.query}"
if not isinstance(result, dict):
return JSONResponse({"error": "Nanobot API returned an invalid tool list"}, status_code=502)
outbound_headers = dict(auth_headers)
incoming_content_type = request.headers.get("content-type")
if incoming_content_type:
outbound_headers["Content-Type"] = incoming_content_type
incoming_accept = request.headers.get("accept")
if incoming_accept:
outbound_headers["Accept"] = incoming_accept
tools = result.get("tools", [])
if not isinstance(tools, list):
return JSONResponse({"error": "Nanobot API returned an invalid tool list"}, status_code=502)
return JSONResponse({"tools": tools})
outbound_body = None if request.method in {"GET", "HEAD"} else await request.body()
@app.post("/tools/call")
async def call_tool(request: Request) -> JSONResponse:
try:
async with httpx.AsyncClient(timeout=20.0, follow_redirects=True) as client:
upstream = await client.request(
request.method,
target_url,
headers=outbound_headers,
content=outbound_body,
)
except httpx.RequestError as exc:
return JSONResponse(
{"error": f"Home Assistant connection failed: {exc}"},
status_code=502,
)
media_type = upstream.headers.get("content-type") or "application/json"
return Response(
content=upstream.content,
status_code=upstream.status_code,
media_type=media_type,
)
@app.get("/script/proxy/{script_path:path}")
async def workspace_script_proxy(script_path: str, request: Request) -> JSONResponse:
try:
script_file = _resolve_workspace_script(script_path)
args = _script_proxy_args(request)
payload = await _read_json_request(request)
except ValueError as exc:
return JSONResponse({"error": str(exc)}, status_code=400)
tool_name = str(payload.get("tool_name", payload.get("name", ""))).strip()
if not tool_name:
return JSONResponse({"error": "tool_name is required"}, status_code=400)
arguments = payload.get("arguments", payload.get("params", {}))
if arguments is None:
arguments = {}
if not isinstance(arguments, dict):
return JSONResponse({"error": "arguments must be a JSON object"}, status_code=400)
async_requested = payload.get("async") is True
if async_requested:
job_payload = await _start_tool_job(tool_name, arguments)
return JSONResponse(job_payload, status_code=202)
try:
returncode, stdout_text, stderr_text = await asyncio.to_thread(
_run_workspace_script,
script_file,
args,
result = await _send_nanobot_api_request(
"tool.call",
{"name": tool_name, "arguments": arguments},
timeout_seconds=60.0,
)
except subprocess.TimeoutExpired:
return JSONResponse({"error": "script execution timed out"}, status_code=504)
except OSError as exc:
return JSONResponse({"error": f"failed to start script: {exc}"}, status_code=502)
except _NanobotApiError as exc:
status_code = 400 if exc.code == -32602 else 503 if exc.code == -32000 else 502
return JSONResponse({"error": str(exc)}, status_code=status_code)
except RuntimeError as exc:
return JSONResponse({"error": str(exc)}, status_code=503)
if returncode != 0:
if not isinstance(result, dict):
return JSONResponse(
{
"error": f"script exited with code {returncode}",
"stderr": stderr_text[:_MAX_SCRIPT_PROXY_STDERR_CHARS],
},
status_code=502,
{"error": "Nanobot API returned an invalid tool response"}, status_code=502
)
return JSONResponse(result)
try:
payload = json.loads(stdout_text)
except json.JSONDecodeError as exc:
return JSONResponse(
{
"error": f"script did not return valid JSON: {exc}",
"stderr": stderr_text[:_MAX_SCRIPT_PROXY_STDERR_CHARS],
},
status_code=502,
)
@app.get("/tools/jobs/{job_id}")
async def get_tool_job(job_id: str) -> JSONResponse:
safe_job_id = job_id.strip()
if not safe_job_id:
return JSONResponse({"error": "job id is required"}, status_code=400)
payload = await _get_tool_job(safe_job_id)
if payload is None:
return JSONResponse({"error": "tool job not found"}, status_code=404)
return JSONResponse(payload)
@app.get("/cards")
async def get_cards() -> JSONResponse:
_sync_card_sources()
return JSONResponse(_load_cards())
@app.post("/cards/sync")
async def sync_cards_endpoint(request: Request) -> JSONResponse:
try:
payload = await request.json()
except Exception:
payload = {}
if not isinstance(payload, dict):
payload = {}
raw_source_id = str(payload.get('source_id', '')).strip()
source_id = _normalize_card_id(raw_source_id) if raw_source_id else ''
if raw_source_id and not source_id:
return JSONResponse({'error': 'invalid source id'}, status_code=400)
results = _sync_card_sources(force=True, source_id=source_id or None)
return JSONResponse({'status': 'ok', 'results': results})
@app.delete("/cards/{card_id}")
async def delete_card(card_id: str) -> JSONResponse:
if not _normalize_card_id(card_id):
@ -856,7 +824,10 @@ async def get_templates() -> JSONResponse:
@app.post("/templates")
async def save_template(request: Request) -> JSONResponse:
payload = await request.json()
try:
payload = await _read_json_request(request)
except ValueError as exc:
return JSONResponse({"error": str(exc)}, status_code=400)
key = _normalize_template_key(str(payload.get("key", "")))
title = str(payload.get("title", "")).strip()
content = str(payload.get("content", "")).strip()
@ -918,7 +889,10 @@ async def delete_template(template_key: str) -> JSONResponse:
@app.post("/message")
async def post_message(request: Request) -> JSONResponse:
payload = await request.json()
try:
payload = await _read_json_request(request)
except ValueError as exc:
return JSONResponse({"error": str(exc)}, status_code=400)
text = str(payload.get("text", "")).strip()
metadata = payload.get("metadata", {})
if not text:
@ -936,7 +910,10 @@ async def post_message(request: Request) -> JSONResponse:
async def rtc_offer(request: Request) -> JSONResponse:
global _active_session, _active_queue, _sender_task
payload = await request.json()
try:
payload = await _read_json_request(request)
except ValueError as exc:
return JSONResponse({"error": str(exc)}, status_code=400)
if _active_session is not None:
await _active_session.close()
@ -975,6 +952,12 @@ async def rtc_offer(request: Request) -> JSONResponse:
@app.on_event("shutdown")
async def on_shutdown() -> None:
global _active_session, _active_queue, _sender_task
tool_tasks = list(_tool_job_tasks.values())
for task in tool_tasks:
task.cancel()
if tool_tasks:
with contextlib.suppress(asyncio.CancelledError):
await asyncio.gather(*tool_tasks, return_exceptions=True)
if _sender_task is not None:
_sender_task.cancel()
with contextlib.suppress(asyncio.CancelledError):