#!/usr/bin/env python3
"""Nanobot MeloTTS command adapter.

Sends a ``synthesize`` request to a MeloTTS server over a Unix domain
socket, first launching the server script in the background when nothing
answers a ``ping`` on that socket.
"""
from __future__ import annotations

import argparse
import contextlib
import json
import os
import socket
import subprocess
import sys
import time
from pathlib import Path
from typing import Any

ROOT_DIR = Path(__file__).resolve().parents[1]
WORKSPACE_DIR = Path(os.getenv("NANOBOT_WORKSPACE", str(Path.home() / ".nanobot"))).expanduser()
LOG_DIR = WORKSPACE_DIR / "logs"
SOCKET_PATH = Path(os.getenv("MELO_TTS_SOCKET", str(WORKSPACE_DIR / "melotts.sock"))).expanduser()
SERVER_SCRIPT = ROOT_DIR / "scripts" / "melotts_server.py"
SERVER_LOG_PATH = LOG_DIR / "melotts-server.log"
DEFAULT_STARTUP_TIMEOUT_S = float(os.getenv("MELO_TTS_SERVER_STARTUP_TIMEOUT_S", "120"))


def _build_parser() -> argparse.ArgumentParser:
    """Return the command-line parser for the adapter."""
    parser = argparse.ArgumentParser(description="Nanobot MeloTTS command adapter.")
    parser.add_argument("--text", required=True)
    parser.add_argument("--output-wav", required=True)
    parser.add_argument("--socket-path", default=str(SOCKET_PATH))
    return parser


def _read_response(sock: socket.socket) -> Any:
    """Read from *sock* until the received bytes form one JSON document.

    A stream socket may deliver a message in several pieces, so data is
    accumulated and re-parsed after every chunk until it decodes or the peer
    closes the connection.

    Raises:
        RuntimeError: the peer closed without sending any non-blank data.
        json.JSONDecodeError: the peer closed after sending data that is not
            valid JSON.
        OSError: socket failure, including the socket timeout.
    """
    buffer = bytearray()
    while True:
        chunk = sock.recv(8192)
        if not chunk:
            # Peer closed the connection; whatever we have is final.
            break
        buffer += chunk
        text = buffer.decode("utf-8", errors="replace").strip()
        if not text:
            continue
        try:
            return json.loads(text)
        except json.JSONDecodeError:
            # Most likely an incomplete message; keep reading.
            continue

    text = buffer.decode("utf-8", errors="replace").strip()
    if not text:
        raise RuntimeError("empty response from MeloTTS server")
    # Let the decode error propagate for a complete-but-malformed reply.
    return json.loads(text)


def _rpc(socket_path: Path, payload: dict[str, Any], timeout_s: float = 10.0) -> dict[str, Any]:
    """Send one newline-terminated JSON request and return the decoded reply.

    Args:
        socket_path: Unix domain socket the server listens on.
        payload: JSON-serialisable request object.
        timeout_s: Timeout applied to each blocking socket operation.

    Returns:
        The decoded JSON reply.

    Raises:
        RuntimeError: the server sent an empty reply.
        json.JSONDecodeError: the reply was not valid JSON.
        OSError: connection or socket failure (including timeouts).
    """
    request = (json.dumps(payload) + "\n").encode("utf-8")
    with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as sock:
        sock.settimeout(timeout_s)
        sock.connect(str(socket_path))
        sock.sendall(request)
        return _read_response(sock)


def _ping(socket_path: Path) -> bool:
    """Return True when a server on *socket_path* answers ``ping`` with ``ok``."""
    try:
        response = _rpc(socket_path, {"action": "ping"}, timeout_s=2.0)
    except Exception:
        # Deliberately best-effort: any failure just means "not ready".
        return False
    # A non-object reply cannot carry an "ok" flag; treat it as not ready.
    return isinstance(response, dict) and bool(response.get("ok"))


def _ensure_server(socket_path: Path) -> None:
    """Start the MeloTTS server if nothing answers on *socket_path*.

    Returns once a ping succeeds.

    Raises:
        RuntimeError: the server process exited during startup, or it did not
            answer a ping within ``DEFAULT_STARTUP_TIMEOUT_S`` seconds.
    """
    if _ping(socket_path):
        return

    # Remove a stale socket file so the new server can bind the path.
    # NOTE(review): a live server that is merely too busy to answer the 2 s
    # ping would also lose its socket file here - confirm this is acceptable.
    with contextlib.suppress(FileNotFoundError):
        socket_path.unlink()

    LOG_DIR.mkdir(parents=True, exist_ok=True)
    with SERVER_LOG_PATH.open("a", encoding="utf-8") as log_handle:
        # start_new_session detaches the server from this process's session.
        proc = subprocess.Popen(
            [sys.executable, str(SERVER_SCRIPT), "--socket-path", str(socket_path)],
            cwd=str(ROOT_DIR),
            stdin=subprocess.DEVNULL,
            stdout=log_handle,
            stderr=subprocess.STDOUT,
            start_new_session=True,
        )

    # Monotonic clock: the deadline must not move if the wall clock is adjusted.
    deadline = time.monotonic() + DEFAULT_STARTUP_TIMEOUT_S
    while time.monotonic() < deadline:
        if _ping(socket_path):
            return
        exit_code = proc.poll()
        if exit_code is not None:
            raise RuntimeError(
                f"MeloTTS server exited during startup with code {exit_code}. "
                f"See {SERVER_LOG_PATH}"
            )
        time.sleep(0.5)
    raise RuntimeError(
        f"MeloTTS server did not become ready within {DEFAULT_STARTUP_TIMEOUT_S:.0f}s"
    )


def main() -> int:
    """Parse arguments, ensure the server is running and request synthesis.

    Returns:
        0 on success.

    Raises:
        RuntimeError: the server could not be started or reported a failure.
    """
    args = _build_parser().parse_args()
    socket_path = Path(args.socket_path).expanduser()
    _ensure_server(socket_path)

    # The server is launched with cwd=ROOT_DIR, so send an absolute path to
    # keep a relative --output-wav anchored to the caller's working directory.
    output_wav = os.path.abspath(args.output_wav)

    response = _rpc(
        socket_path,
        {
            "action": "synthesize",
            "text": args.text,
            "output_wav": output_wav,
        },
        timeout_s=max(30.0, DEFAULT_STARTUP_TIMEOUT_S),
    )
    if not response.get("ok"):
        raise RuntimeError(str(response.get("error", "MeloTTS synthesis failed")))
    return 0


if __name__ == "__main__":
    raise SystemExit(main())