"""SuperTonic Gateway - nanobot integration for the web UI. Connects to the already-running nanobot process via a Unix domain socket. nanobot must be started separately (e.g. ``nanobot gateway``) with the API channel enabled in its config. Wire protocol (newline-delimited JSON-RPC 2.0) ----------------------------------------------- Client -> nanobot notifications:: {"jsonrpc": "2.0", "method": "message.send", "params": {"content": "hello", "chat_id": "web", "metadata": {}}} {"jsonrpc": "2.0", "method": "card.respond", "params": {"card_id": "card_123", "value": "Option A"}} {"jsonrpc": "2.0", "method": "command.execute", "params": {"command": "reset", "chat_id": "web"}} nanobot -> client notifications:: {"jsonrpc": "2.0", "method": "message", "params": {"content": "Hi!", "chat_id": "web", "is_progress": false}} {"jsonrpc": "2.0", "method": "agent_state", "params": {"state": "thinking", "chat_id": "web"}} {"jsonrpc": "2.0", "method": "card", "params": {"id": "card_123", "kind": "text", "title": "Weather", "lane": "context"}} """ from __future__ import annotations import asyncio import json import os from pathlib import Path from typing import Any from wisper import WisperBus, WisperEvent DEFAULT_SOCKET_PATH = Path.home() / ".nanobot" / "api.sock" _JSONRPC_VERSION = "2.0" def _encode(obj: dict[str, Any]) -> bytes: return (json.dumps(obj, ensure_ascii=False) + "\n").encode("utf-8") def _jsonrpc_notification(method: str, params: dict[str, Any] | None = None) -> dict[str, Any]: payload: dict[str, Any] = { "jsonrpc": _JSONRPC_VERSION, "method": method, } if params is not None: payload["params"] = params return payload class NanobotApiProcess: """Connects to the running nanobot process via its Unix domain socket.""" def __init__(self, bus: WisperBus, socket_path: Path) -> None: self._bus = bus self._socket_path = socket_path self._reader: asyncio.StreamReader | None = None self._writer: asyncio.StreamWriter | None = None self._read_task: asyncio.Task | None = None self._socket_inode: int | None = None @property def running(self) -> bool: return ( self._writer is not None and not self._writer.is_closing() and self._read_task is not None and not self._read_task.done() ) def matches_current_socket(self) -> bool: if self._socket_inode is None: return False try: return self._socket_path.stat().st_ino == self._socket_inode except FileNotFoundError: return False except OSError: return False async def start(self) -> None: if self.running: await self._bus.publish(WisperEvent(role="system", text="Already connected to nanobot.")) return if not self._socket_path.exists(): await self._bus.publish( WisperEvent( role="system", text=( f"Nanobot API socket not found at {self._socket_path}. " "Make sure nanobot is running with the API channel enabled " "(set channels.api.enabled = true in ~/.nanobot/config.json, " "then run: nanobot gateway)." ), ) ) return try: self._reader, self._writer = await asyncio.open_unix_connection( path=str(self._socket_path) ) self._socket_inode = self._socket_path.stat().st_ino except OSError as exc: await self._bus.publish( WisperEvent(role="system", text=f"Could not connect to nanobot API socket: {exc}") ) return self._read_task = asyncio.create_task(self._read_loop(), name="nanobot-api-reader") await self._bus.publish(WisperEvent(role="system", text="Connected to nanobot.")) async def send(self, text: str, metadata: dict[str, Any] | None = None) -> None: if not self.running or self._writer is None: await self._bus.publish(WisperEvent(role="system", text="Not connected to nanobot.")) raise RuntimeError("Not connected to nanobot.") try: await self._send_notification( "message.send", { "content": text, "chat_id": "web", "metadata": dict(metadata or {}), }, ) except OSError as exc: await self._bus.publish(WisperEvent(role="system", text=f"Send failed: {exc}")) await self._cleanup() raise RuntimeError(f"Send failed: {exc}") from exc async def send_card_response(self, card_id: str, value: str) -> None: if not self.running or self._writer is None: raise RuntimeError("Not connected to nanobot.") try: await self._send_notification( "card.respond", { "card_id": card_id, "value": value, }, ) except OSError as exc: await self._bus.publish(WisperEvent(role="system", text=f"Send failed: {exc}")) await self._cleanup() raise RuntimeError(f"Send failed: {exc}") from exc async def send_command(self, command: str) -> None: if not self.running or self._writer is None: await self._bus.publish(WisperEvent(role="system", text="Not connected to nanobot.")) raise RuntimeError("Not connected to nanobot.") try: await self._send_notification( "command.execute", { "command": command, "chat_id": "web", }, ) except OSError as exc: await self._bus.publish(WisperEvent(role="system", text=f"Send failed: {exc}")) await self._cleanup() raise RuntimeError(f"Send failed: {exc}") from exc async def stop(self) -> None: await self._cleanup() await self._bus.publish(WisperEvent(role="system", text="Disconnected from nanobot.")) async def _cleanup(self) -> None: if self._read_task and not self._read_task.done(): self._read_task.cancel() try: await self._read_task except asyncio.CancelledError: pass self._read_task = None if self._writer: try: self._writer.close() await self._writer.wait_closed() except OSError: pass self._writer = None self._reader = None self._socket_inode = None async def _send_notification(self, method: str, params: dict[str, Any]) -> None: assert self._writer is not None self._writer.write(_encode(_jsonrpc_notification(method, params))) await self._writer.drain() async def _read_loop(self) -> None: assert self._reader is not None try: while True: try: line = await self._reader.readline() except OSError: break if not line: break await self._handle_line(line) finally: await self._bus.publish(WisperEvent(role="system", text="Nanobot closed the connection.")) self._writer = None self._reader = None async def _handle_line(self, line: bytes) -> None: raw = line.decode(errors="replace").strip() if not raw: return try: obj = json.loads(raw) except json.JSONDecodeError: await self._bus.publish( WisperEvent(role="system", text=f"Malformed response from nanobot: {raw[:200]}") ) return if not isinstance(obj, dict) or obj.get("jsonrpc") != _JSONRPC_VERSION: await self._bus.publish( WisperEvent(role="system", text=f"Malformed response from nanobot: {raw[:200]}") ) return if "method" not in obj: error = obj.get("error") if isinstance(error, dict): message = str(error.get("message", "Unknown JSON-RPC error")) await self._bus.publish(WisperEvent(role="system", text=f"Nanobot error: {message}")) return method = str(obj.get("method", "")).strip() params = obj.get("params", {}) if params is None or not isinstance(params, dict): params = {} if method == "message": content = str(params.get("content", "")) is_progress = bool(params.get("is_progress", False)) is_tool_hint = bool(params.get("is_tool_hint", False)) if is_progress: role = "nanobot-tool" if is_tool_hint else "nanobot-progress" await self._bus.publish(WisperEvent(role=role, text=content)) else: await self._bus.publish(WisperEvent(role="nanobot", text=content)) await self._bus.publish(WisperEvent(role="nanobot-tts", text=content)) elif method == "agent_state": state = str(params.get("state", "")) await self._bus.publish(WisperEvent(role="agent-state", text=state)) elif method == "card": await self._bus.publish(WisperEvent(role="card", text=json.dumps(params))) class SuperTonicGateway: def __init__(self) -> None: self.bus = WisperBus() self._lock = asyncio.Lock() self._process: NanobotApiProcess | None = None socket_path = Path(os.getenv("NANOBOT_API_SOCKET", str(DEFAULT_SOCKET_PATH))).expanduser() self._socket_path = socket_path async def subscribe(self) -> asyncio.Queue[WisperEvent]: return await self.bus.subscribe() async def unsubscribe(self, queue: asyncio.Queue[WisperEvent]) -> None: await self.bus.unsubscribe(queue) async def connect_nanobot(self) -> None: async with self._lock: if self._process and self._process.running: await self.bus.publish(WisperEvent(role="system", text="Already connected to nanobot.")) return self._process = NanobotApiProcess(bus=self.bus, socket_path=self._socket_path) await self._process.start() async def _ensure_connected_process(self) -> NanobotApiProcess: if self._process and self._process.running and self._process.matches_current_socket(): return self._process if self._process: await self._process.stop() self._process = NanobotApiProcess(bus=self.bus, socket_path=self._socket_path) await self._process.start() if not self._process.running or not self._process.matches_current_socket(): raise RuntimeError("Not connected to nanobot.") return self._process async def send_user_message(self, text: str, metadata: dict[str, Any] | None = None) -> None: message = text.strip() if not message: return await self.bus.publish(WisperEvent(role="user", text=message)) async with self._lock: process = await self._ensure_connected_process() await process.send(message, metadata=metadata) async def send_card_response(self, card_id: str, value: str) -> None: async with self._lock: process = await self._ensure_connected_process() await process.send_card_response(card_id, value) async def send_command(self, command: str) -> None: async with self._lock: process = await self._ensure_connected_process() await process.send_command(command) async def disconnect_nanobot(self) -> None: async with self._lock: if self._process: await self._process.stop() self._process = None async def shutdown(self) -> None: await self.disconnect_nanobot()