api channel and tools

This commit is contained in:
kacper 2026-03-05 15:10:14 -05:00
parent 9222c59f03
commit 3816a9627e
4 changed files with 684 additions and 582 deletions

View file

@ -1,442 +1,270 @@
"""SuperTonic Gateway — nanobot integration for the web UI.
Connects to the already-running nanobot process via a Unix domain socket.
nanobot must be started separately (e.g. ``nanobot gateway``) with the API
channel enabled in its config.
Wire protocol (newline-delimited JSON)
---------------------------------------
Client nanobot::
{"type": "message", "content": "hello", "chat_id": "web"}
{"type": "ping"}
{"type": "ui-response", "request_id": "<uuid>", "value": "Option A", "chat_id": "web"}
{"type": "command", "command": "reset", "chat_id": "web"}
nanobot client::
{"type": "message", "content": "Hi!", "chat_id": "web", "is_progress": false}
{"type": "agent_state", "state": "thinking", "chat_id": "web"}
{"type": "toast", "kind": "text"|"image", "content": "...", "title": "...", "duration_ms": 5000}
{"type": "choice", "request_id": "<uuid>", "question": "...", "choices": ["A", "B"],
"title": "...", "chat_id": "web"}
{"type": "pong"}
{"type": "error", "error": "..."}
The public ``SuperTonicGateway`` interface (``spawn_tui``, ``send_user_message``,
``stop_tui``, ``shutdown``) is unchanged so ``app.py`` and ``voice_rtc.py``
require no modification.
"""
from __future__ import annotations
import asyncio
import contextlib
import json
import os
import pty
import re
import shlex
import signal
import subprocess
import time
from collections import deque
from pathlib import Path
from wisper import WisperBus, WisperEvent
ANSI_ESCAPE_RE = re.compile(r"\x1B\[[0-?]*[ -/]*[@-~]")
CONTROL_CHAR_RE = re.compile(r"[\x00-\x08\x0b-\x1f\x7f]")
BRAILLE_SPINNER_RE = re.compile(r"[\u2800-\u28ff]")
SPINNER_ONLY_RE = re.compile(r"^[\s|/\\\-]+$")
BOX_DRAWING_ONLY_RE = re.compile(r"^[\s\u2500-\u257f]+$")
THINKING_LINE_RE = re.compile(
r"\b(?:agent|nanobot|napbot)\b(?:\s+is)?\s+thinking\b",
re.IGNORECASE,
)
USER_ECHO_LINE_RE = re.compile(r"^(?:you|user)\s*:", re.IGNORECASE)
TOOL_STREAM_LINE_RE = re.compile(
r"^(?:tool(?:\s+call|\s+output)?|calling\s+tool|running\s+tool|executing\s+tool)\b",
re.IGNORECASE,
)
LEADING_NON_WORD_RE = re.compile(r"^[^\w]+")
WHITESPACE_RE = re.compile(r"\s+")
AGENT_OUTPUT_PREFIX_RE = re.compile(
r"^(?:nanobot|napbot)\b\s*[:>\-]?\s*", re.IGNORECASE
)
EMOJI_RE = re.compile(
"[" # Common emoji and pictograph blocks.
"\U0001f1e6-\U0001f1ff"
"\U0001f300-\U0001f5ff"
"\U0001f600-\U0001f64f"
"\U0001f680-\U0001f6ff"
"\U0001f700-\U0001f77f"
"\U0001f780-\U0001f7ff"
"\U0001f800-\U0001f8ff"
"\U0001f900-\U0001f9ff"
"\U0001fa00-\U0001faff"
"\u2600-\u26ff"
"\u2700-\u27bf"
"\ufe0f"
"\u200d"
"]"
)
# Default path — must match nanobot's channels.api.socket_path config value.
DEFAULT_SOCKET_PATH = Path.home() / ".nanobot" / "api.sock"
def _clean_output(text: str) -> str:
cleaned = ANSI_ESCAPE_RE.sub("", text)
cleaned = BRAILLE_SPINNER_RE.sub(" ", cleaned)
cleaned = CONTROL_CHAR_RE.sub("", cleaned)
return cleaned.replace("\r", "\n")
# ---------------------------------------------------------------------------
# NanobotApiProcess — connects to the running nanobot via its Unix socket
# ---------------------------------------------------------------------------
def _resolve_nanobot_command_and_workdir() -> tuple[str, Path]:
command_override = os.getenv("NANOBOT_COMMAND")
workdir_override = os.getenv("NANOBOT_WORKDIR")
class NanobotApiProcess:
"""Connects to the running nanobot process via its Unix domain socket.
if workdir_override:
default_workdir = Path(workdir_override).expanduser()
else:
default_workdir = Path.home()
Lifecycle
---------
``start()`` opens a connection to nanobot's API socket.
``send()`` writes a user message over the socket.
``stop()`` closes the connection.
"""
if command_override:
return command_override, default_workdir
nanobot_dir = Path.home() / "nanobot"
nanobot_python_candidates = [
nanobot_dir / ".venv" / "bin" / "python",
nanobot_dir / "venv" / "bin" / "python",
]
for nanobot_venv_python in nanobot_python_candidates:
if nanobot_venv_python.exists():
if not workdir_override:
default_workdir = nanobot_dir
return (
f"{nanobot_venv_python} -m nanobot agent --no-markdown",
default_workdir,
)
return "nanobot agent --no-markdown", default_workdir
def _infer_venv_root(command_parts: list[str], workdir: Path) -> Path | None:
if not command_parts:
return None
binary = Path(command_parts[0]).expanduser()
if (
binary.is_absolute()
and binary.name.startswith("python")
and binary.parent.name == "bin"
):
return binary.parent.parent
for candidate in (workdir / ".venv", workdir / "venv"):
if (candidate / "bin" / "python").exists():
return candidate
return None
def _build_process_env(
command_parts: list[str], workdir: Path
) -> tuple[dict[str, str], Path | None]:
env = os.environ.copy()
env.pop("PYTHONHOME", None)
venv_root = _infer_venv_root(command_parts, workdir)
if not venv_root:
return env, None
venv_bin = str(venv_root / "bin")
path_entries = [entry for entry in env.get("PATH", "").split(os.pathsep) if entry]
path_entries = [entry for entry in path_entries if entry != venv_bin]
path_entries.insert(0, venv_bin)
env["PATH"] = os.pathsep.join(path_entries)
env["VIRTUAL_ENV"] = str(venv_root)
return env, venv_root
class NanobotTUIProcess:
def __init__(self, bus: WisperBus, command: str, workdir: Path) -> None:
def __init__(self, bus: WisperBus, socket_path: Path) -> None:
self._bus = bus
self._command = command
self._workdir = workdir
self._process: subprocess.Popen[bytes] | None = None
self._master_fd: int | None = None
self._read_task: asyncio.Task[None] | None = None
self._pending_output = ""
self._suppress_noisy_ui = os.getenv(
"NANOBOT_SUPPRESS_NOISY_UI", "1"
).strip() not in {
"0",
"false",
"False",
"no",
"off",
}
self._dedup_window_s = max(
0.2, float(os.getenv("NANOBOT_OUTPUT_DEDUP_WINDOW_S", "1.5"))
)
self._recent_lines: deque[tuple[str, float]] = deque()
self._last_tts_line = ""
self._socket_path = socket_path
self._reader: asyncio.StreamReader | None = None
self._writer: asyncio.StreamWriter | None = None
self._read_task: asyncio.Task | None = None
@property
def running(self) -> bool:
return self._process is not None and self._process.poll() is None
return (
self._writer is not None
and not self._writer.is_closing()
and self._read_task is not None
and not self._read_task.done()
)
async def start(self) -> None:
if self.running:
await self._bus.publish(
WisperEvent(role="system", text="Nanobot TUI is already running.")
WisperEvent(role="system", text="Already connected to nanobot.")
)
return
command_parts = [
os.path.expandvars(os.path.expanduser(part))
for part in shlex.split(self._command)
]
if not command_parts:
await self._bus.publish(
WisperEvent(role="system", text="NANOBOT_COMMAND is empty.")
)
return
if not self._workdir.exists():
await self._bus.publish(
WisperEvent(
role="system",
text=f"NANOBOT_WORKDIR does not exist: {self._workdir}",
)
)
return
master_fd, slave_fd = pty.openpty()
child_env, child_venv_root = _build_process_env(
command_parts=command_parts, workdir=self._workdir
)
try:
self._process = subprocess.Popen(
command_parts,
stdin=slave_fd,
stdout=slave_fd,
stderr=slave_fd,
cwd=str(self._workdir),
start_new_session=True,
env=child_env,
)
except FileNotFoundError as exc:
os.close(master_fd)
os.close(slave_fd)
if not self._socket_path.exists():
await self._bus.publish(
WisperEvent(
role="system",
text=(
"Could not start Nanobot process "
f"(command='{command_parts[0]}', workdir='{self._workdir}'): {exc}. "
"Check NANOBOT_COMMAND and NANOBOT_WORKDIR."
f"Nanobot API socket not found at {self._socket_path}. "
"Make sure nanobot is running with the API channel enabled "
"(set channels.api.enabled = true in ~/.nanobot/config.json, "
"then run: nanobot gateway)."
),
)
)
return
except Exception as exc:
os.close(master_fd)
os.close(slave_fd)
await self._bus.publish(
WisperEvent(role="system", text=f"Failed to spawn TUI process: {exc}")
)
return
os.close(slave_fd)
os.set_blocking(master_fd, False)
self._master_fd = master_fd
self._read_task = asyncio.create_task(
self._read_output(), name="nanobot-tui-reader"
)
await self._bus.publish(
WisperEvent(
role="system",
text=f"Spawned Nanobot TUI with command: {' '.join(command_parts)}",
try:
self._reader, self._writer = await asyncio.open_unix_connection(
path=str(self._socket_path)
)
)
if child_venv_root:
except OSError as exc:
await self._bus.publish(
WisperEvent(
role="system",
text=f"Nanobot runtime venv: {child_venv_root}",
text=f"Could not connect to nanobot API socket: {exc}",
)
)
return
self._read_task = asyncio.create_task(self._read_loop(), name="nanobot-api-reader")
await self._bus.publish(WisperEvent(role="system", text="Connected to nanobot."))
async def send(self, text: str) -> None:
if not self.running or self._master_fd is None:
if not self.running or self._writer is None:
await self._bus.publish(
WisperEvent(
role="system", text="Nanobot TUI is not running. Click spawn first."
role="system",
text="Not connected to nanobot. Click spawn first.",
)
)
return
message = text.rstrip("\n") + "\n"
payload = json.dumps({"type": "message", "content": text, "chat_id": "web"}) + "\n"
try:
os.write(self._master_fd, message.encode())
self._writer.write(payload.encode())
await self._writer.drain()
except OSError as exc:
await self._bus.publish(
WisperEvent(role="system", text=f"Failed to write to TUI: {exc}")
await self._bus.publish(WisperEvent(role="system", text=f"Send failed: {exc}"))
await self._cleanup()
async def send_ui_response(self, request_id: str, value: str) -> None:
"""Forward a ui-response (choice selection) back to nanobot."""
if not self.running or self._writer is None:
return
payload = (
json.dumps(
{"type": "ui-response", "request_id": request_id, "value": value, "chat_id": "web"}
)
+ "\n"
)
try:
self._writer.write(payload.encode())
await self._writer.drain()
except OSError as exc:
await self._bus.publish(WisperEvent(role="system", text=f"Send failed: {exc}"))
await self._cleanup()
async def send_command(self, command: str) -> None:
"""Send a command (e.g. 'reset') to nanobot."""
if not self.running or self._writer is None:
await self._bus.publish(
WisperEvent(
role="system",
text="Not connected to nanobot. Click spawn first.",
)
)
return
payload = json.dumps({"type": "command", "command": command, "chat_id": "web"}) + "\n"
try:
self._writer.write(payload.encode())
await self._writer.drain()
except OSError as exc:
await self._bus.publish(WisperEvent(role="system", text=f"Send failed: {exc}"))
await self._cleanup()
async def stop(self) -> None:
if self._read_task:
await self._cleanup()
await self._bus.publish(WisperEvent(role="system", text="Disconnected from nanobot."))
# ------------------------------------------------------------------
# Internal
# ------------------------------------------------------------------
async def _cleanup(self) -> None:
if self._read_task and not self._read_task.done():
self._read_task.cancel()
with contextlib.suppress(asyncio.CancelledError):
try:
await self._read_task
self._read_task = None
if self.running and self._process:
try:
os.killpg(self._process.pid, signal.SIGTERM)
except ProcessLookupError:
except asyncio.CancelledError:
pass
except Exception:
self._process.terminate()
try:
self._process.wait(timeout=3)
except Exception:
self._process.kill()
self._process.wait(timeout=1)
self._read_task = None
if self._master_fd is not None:
if self._writer:
try:
os.close(self._master_fd)
self._writer.close()
await self._writer.wait_closed()
except OSError:
pass
self._master_fd = None
self._process = None
self._pending_output = ""
self._recent_lines.clear()
self._last_tts_line = ""
await self._bus.publish(WisperEvent(role="system", text="Stopped Nanobot TUI."))
self._writer = None
self._reader = None
async def _read_output(self) -> None:
if self._master_fd is None:
return
while self.running:
if not await self._wait_for_fd_readable():
break
try:
chunk = os.read(self._master_fd, 4096)
except BlockingIOError:
continue
except OSError:
break
if not chunk:
if not self.running:
async def _read_loop(self) -> None:
"""Read newline-delimited JSON from nanobot and publish WisperEvents."""
assert self._reader is not None
try:
while True:
try:
line = await self._reader.readline()
except OSError:
break
await asyncio.sleep(0.01)
continue
text = _clean_output(chunk.decode(errors="ignore"))
if not text.strip():
continue
displayable, tts_publishable, saw_thinking = self._consume_output_chunk(
text
)
if saw_thinking:
await self._bus.publish(
WisperEvent(role="agent-state", text="thinking")
)
if displayable:
await self._bus.publish(WisperEvent(role="nanobot", text=displayable))
if tts_publishable:
await self._bus.publish(
WisperEvent(role="nanobot-tts", text=tts_publishable)
)
trailing_display, trailing_tts, _ = self._consume_output_chunk("\n")
if trailing_display:
await self._bus.publish(WisperEvent(role="nanobot", text=trailing_display))
if trailing_tts:
await self._bus.publish(WisperEvent(role="nanobot-tts", text=trailing_tts))
if self._process is not None:
exit_code = self._process.poll()
await self._bus.publish(
WisperEvent(
role="system", text=f"Nanobot TUI exited (code={exit_code})."
)
)
def _consume_output_chunk(self, text: str) -> tuple[str, str, bool]:
"""Return (displayable, tts_publishable, saw_thinking)."""
self._pending_output += text
lines = self._pending_output.split("\n")
self._pending_output = lines.pop()
if len(self._pending_output) > 1024:
lines.append(self._pending_output)
self._pending_output = ""
kept_lines: list[str] = []
tts_lines: list[str] = []
saw_thinking = False
for line in lines:
normalized = self._normalize_line(line)
if not normalized:
continue
if self._suppress_noisy_ui and self._is_noisy_ui_line(normalized):
# Detect thinking lines even though they are filtered from display.
candidate = LEADING_NON_WORD_RE.sub("", normalized)
if THINKING_LINE_RE.search(candidate):
saw_thinking = True
continue
if normalized != self._last_tts_line:
tts_lines.append(normalized)
self._last_tts_line = normalized
if self._is_recent_duplicate(normalized):
continue
kept_lines.append(normalized)
return "\n".join(kept_lines).strip(), "\n".join(tts_lines).strip(), saw_thinking
def _normalize_line(self, line: str) -> str:
without_emoji = EMOJI_RE.sub(" ", line)
normalized = WHITESPACE_RE.sub(" ", without_emoji).strip()
# Strip leading "nanobot:" prefix that the TUI echoes in its own output,
# since the frontend already labels lines with the role name and TTS
# should not read the agent's own name aloud.
normalized = AGENT_OUTPUT_PREFIX_RE.sub("", normalized)
return normalized
def _is_noisy_ui_line(self, line: str) -> bool:
if SPINNER_ONLY_RE.fullmatch(line):
return True
if BOX_DRAWING_ONLY_RE.fullmatch(line):
return True
candidate = LEADING_NON_WORD_RE.sub("", line)
if THINKING_LINE_RE.search(candidate):
return True
if TOOL_STREAM_LINE_RE.match(candidate):
return True
if USER_ECHO_LINE_RE.match(candidate):
return True
return False
async def _wait_for_fd_readable(self) -> bool:
if self._master_fd is None:
return False
loop = asyncio.get_running_loop()
ready: asyncio.Future[None] = loop.create_future()
def _mark_ready() -> None:
if not ready.done():
ready.set_result(None)
try:
loop.add_reader(self._master_fd, _mark_ready)
except (AttributeError, NotImplementedError, OSError, ValueError):
await asyncio.sleep(0.01)
return True
try:
await ready
return True
if not line:
break # EOF — nanobot closed the connection
await self._handle_line(line)
finally:
with contextlib.suppress(Exception):
loop.remove_reader(self._master_fd)
await self._bus.publish(
WisperEvent(role="system", text="Nanobot closed the connection.")
)
# Clear writer so running → False
self._writer = None
self._reader = None
def _is_recent_duplicate(self, line: str) -> bool:
now = time.monotonic()
normalized = line.lower()
async def _handle_line(self, line: bytes) -> None:
raw = line.decode(errors="replace").strip()
if not raw:
return
try:
obj = json.loads(raw)
except json.JSONDecodeError:
await self._bus.publish(
WisperEvent(role="system", text=f"Malformed response from nanobot: {raw[:200]}")
)
return
while (
self._recent_lines
and (now - self._recent_lines[0][1]) > self._dedup_window_s
):
self._recent_lines.popleft()
msg_type = str(obj.get("type", ""))
for previous, _timestamp in self._recent_lines:
if previous == normalized:
return True
if msg_type == "message":
content = str(obj.get("content", ""))
is_progress = bool(obj.get("is_progress", False))
if is_progress:
# Intermediate tool-call hint — show in UI, skip TTS
await self._bus.publish(WisperEvent(role="nanobot-progress", text=content))
else:
# Final answer — display + TTS
await self._bus.publish(WisperEvent(role="nanobot", text=content))
await self._bus.publish(WisperEvent(role="nanobot-tts", text=content))
self._recent_lines.append((normalized, now))
return False
elif msg_type == "agent_state":
state = str(obj.get("state", ""))
await self._bus.publish(WisperEvent(role="agent-state", text=state))
elif msg_type == "toast":
# Forward the full toast payload as JSON so the frontend can render it.
await self._bus.publish(WisperEvent(role="toast", text=json.dumps(obj)))
elif msg_type == "choice":
# Forward the full choice payload as JSON so the frontend can render it.
await self._bus.publish(WisperEvent(role="choice", text=json.dumps(obj)))
elif msg_type == "pong":
pass # keepalive, ignore
elif msg_type == "error":
await self._bus.publish(
WisperEvent(role="system", text=f"Nanobot error: {obj.get('error', '')}")
)
# ---------------------------------------------------------------------------
# SuperTonicGateway — public interface (unchanged from original)
# ---------------------------------------------------------------------------
class SuperTonicGateway:
def __init__(self) -> None:
self.bus = WisperBus()
self._lock = asyncio.Lock()
self._tui: NanobotTUIProcess | None = None
self._process: NanobotApiProcess | None = None
socket_path = Path(os.getenv("NANOBOT_API_SOCKET", str(DEFAULT_SOCKET_PATH))).expanduser()
self._socket_path = socket_path
async def subscribe(self) -> asyncio.Queue[WisperEvent]:
return await self.bus.subscribe()
@ -445,18 +273,15 @@ class SuperTonicGateway:
await self.bus.unsubscribe(queue)
async def spawn_tui(self) -> None:
"""Connect to nanobot (name kept for API compatibility with app.py)."""
async with self._lock:
if self._tui and self._tui.running:
if self._process and self._process.running:
await self.bus.publish(
WisperEvent(role="system", text="Nanobot TUI is already running.")
WisperEvent(role="system", text="Already connected to nanobot.")
)
return
command, workdir = _resolve_nanobot_command_and_workdir()
self._tui = NanobotTUIProcess(
bus=self.bus, command=command, workdir=workdir
)
await self._tui.start()
self._process = NanobotApiProcess(bus=self.bus, socket_path=self._socket_path)
await self._process.start()
async def send_user_message(self, text: str) -> None:
message = text.strip()
@ -464,20 +289,34 @@ class SuperTonicGateway:
return
await self.bus.publish(WisperEvent(role="user", text=message))
async with self._lock:
if not self._tui:
if not self._process:
await self.bus.publish(
WisperEvent(
role="system",
text="Nanobot TUI is not running. Click spawn first.",
text="Not connected to nanobot. Click spawn first.",
)
)
return
await self._tui.send(message)
await self._process.send(message)
async def send_ui_response(self, request_id: str, value: str) -> None:
"""Forward a choice selection back to nanobot."""
async with self._lock:
if self._process:
await self._process.send_ui_response(request_id, value)
async def send_command(self, command: str) -> None:
"""Send a command (e.g. 'reset') to nanobot."""
async with self._lock:
if self._process:
await self._process.send_command(command)
async def stop_tui(self) -> None:
"""Disconnect from nanobot (name kept for API compatibility with app.py)."""
async with self._lock:
if self._tui:
await self._tui.stop()
if self._process:
await self._process.stop()
self._process = None
async def shutdown(self) -> None:
await self.stop_tui()