#!/usr/bin/env python3
# Persistent MeloTTS sidecar for Nanobot voice.
#
# The process loads a MeloTTS model once, then serves newline-delimited JSON
# requests over a Unix domain socket. Supported actions are "ping",
# "synthesize_pcm" (returns base64 PCM in the reply) and "synthesize"
# (writes a WAV file to the path given in "output_wav").
from __future__ import annotations

import argparse
import base64
import contextlib
import json
import os
import signal
import socket
import sys
from pathlib import Path
from typing import Any

try:
    import numpy as np
except Exception as exc:  # pragma: no cover - runtime fallback when dependency is missing
    np = None  # type: ignore[assignment]
    NUMPY_IMPORT_ERROR = exc
else:
    NUMPY_IMPORT_ERROR = None

ROOT_DIR = Path(__file__).resolve().parents[1]
WORKSPACE_DIR = Path(os.getenv("NANOBOT_WORKSPACE", str(Path.home() / ".nanobot"))).expanduser()
SOCKET_PATH = Path(os.getenv("MELO_TTS_SOCKET", str(WORKSPACE_DIR / "melotts.sock"))).expanduser()

# How often the accept loop wakes up to check whether a stop was requested.
_ACCEPT_POLL_SECONDS = 1.0
# Upper bound on a single recv/send with a client, so that one stalled client
# cannot block the single-threaded server (and its shutdown) forever.
_CLIENT_TIMEOUT_SECONDS = 30.0

try:
    from melo.api import TTS

    MELO_TTS_AVAILABLE = True
except Exception as exc:  # pragma: no cover - runtime fallback when dependency is missing
    TTS = None  # type: ignore[assignment]
    MELO_TTS_AVAILABLE = False
    IMPORT_ERROR = exc
else:
    IMPORT_ERROR = None


class MeloTTSServer:
    """Wrap one loaded MeloTTS model and expose the sidecar's actions.

    Configuration is read from the ``MELO_TTS_LANGUAGE``, ``MELO_TTS_DEVICE``,
    ``MELO_TTS_SPEED``, ``MELO_TTS_SPEAKER`` and ``MELO_TTS_WARMUP_TEXT``
    environment variables when the instance is constructed.
    """

    def __init__(self) -> None:
        """Load the model, resolve the speaker id and optionally warm up.

        Raises:
            RuntimeError: if MeloTTS or numpy could not be imported, or the
                configured speaker is not offered by the loaded model.
            ValueError: if ``MELO_TTS_SPEED`` is not a valid float.
        """
        if not MELO_TTS_AVAILABLE or TTS is None:
            raise RuntimeError(f"MeloTTS import failed: {IMPORT_ERROR}")
        if np is None:
            raise RuntimeError(f"numpy import failed: {NUMPY_IMPORT_ERROR}")

        self._language = os.getenv("MELO_TTS_LANGUAGE", "EN").strip() or "EN"
        self._device = os.getenv("MELO_TTS_DEVICE", "cpu").strip() or "cpu"
        # An empty/blank value falls back to the default, like the other settings.
        raw_speed = os.getenv("MELO_TTS_SPEED", "1.0").strip() or "1.0"
        try:
            self._speed = float(raw_speed)
        except ValueError as exc:
            raise ValueError(f"invalid MELO_TTS_SPEED: {raw_speed!r}") from exc
        self._speaker_name = os.getenv("MELO_TTS_SPEAKER", "EN-US").strip() or "EN-US"
        self._warmup_text = os.getenv("MELO_TTS_WARMUP_TEXT", "Nanobot is ready.").strip()

        self._model = TTS(language=self._language, device=self._device)
        self._speaker_ids = dict(getattr(self._model.hps.data, "spk2id", {}))
        if self._speaker_name not in self._speaker_ids:
            available = ", ".join(sorted(self._speaker_ids))
            raise RuntimeError(
                f"speaker '{self._speaker_name}' is not available for language {self._language}. "
                f"Available speakers: {available}"
            )
        self._speaker_id = self._speaker_ids[self._speaker_name]

        # An empty warmup text disables the warmup synthesis.
        if self._warmup_text:
            self._warmup()

    def ping(self) -> dict[str, Any]:
        """Return a status payload describing the loaded configuration."""
        return {
            "ok": True,
            "language": self._language,
            "device": self._device,
            "speaker": self._speaker_name,
            "speakers": sorted(self._speaker_ids),
        }

    def synthesize_pcm(self, text: str) -> dict[str, Any]:
        """Synthesize *text* and return it as base64-encoded PCM.

        Raises:
            RuntimeError: if *text* is empty after whitespace normalisation,
                or the model produced no audio.
        """
        clean_text = self._clean_text(text)
        pcm, sample_rate, channels = self._synthesize_pcm(clean_text)
        return {
            "ok": True,
            "encoding": "pcm_s16le_base64",
            "pcm": base64.b64encode(pcm).decode("ascii"),
            "sample_rate": sample_rate,
            "channels": channels,
            "language": self._language,
            "speaker": self._speaker_name,
        }

    def synthesize_to_file(self, text: str, output_wav: str) -> dict[str, Any]:
        """Synthesize *text* into the file *output_wav*.

        Missing parent directories of *output_wav* are created.

        Raises:
            RuntimeError: if *text* is empty after whitespace normalisation.
        """
        clean_text = self._clean_text(text)
        output_path = Path(output_wav)
        output_path.parent.mkdir(parents=True, exist_ok=True)
        self._model.tts_to_file(
            clean_text,
            self._speaker_id,
            str(output_path),
            speed=self._speed,
            quiet=True,
        )
        return {
            "ok": True,
            "output_wav": str(output_path),
            "language": self._language,
            "speaker": self._speaker_name,
        }

    @staticmethod
    def _clean_text(text: str) -> str:
        """Collapse whitespace runs to single spaces; reject empty text."""
        clean_text = " ".join(text.split())
        if not clean_text:
            raise RuntimeError("text is empty")
        return clean_text

    def _warmup(self) -> None:
        """Run one synthesis of the warmup text and discard the result."""
        self._synthesize_pcm(self._warmup_text)

    def _synthesize_pcm(self, text: str) -> tuple[bytes, int, int]:
        """Synthesize *text* and return ``(pcm_bytes, sample_rate, channels)``.

        The model output is converted to int16 samples. For 2-D output the
        axis of length <= 2 is taken to be the channel axis and the data is
        laid out frame-major; any other shape is flattened to mono.
        """
        wav = self._model.tts_to_file(
            text,
            self._speaker_id,
            None,  # no output path: the audio is taken from the return value
            speed=self._speed,
            quiet=True,
        )
        if np is None:
            raise RuntimeError("numpy is unavailable")

        samples = np.asarray(wav)
        if samples.size == 0:
            raise RuntimeError("MeloTTS produced empty audio")

        channels = 1
        if samples.ndim == 0:
            samples = samples.reshape(1)
        elif samples.ndim == 2:
            dim0, dim1 = int(samples.shape[0]), int(samples.shape[1])
            if dim0 <= 2 and dim1 > dim0:
                # (channels, frames) -> (frames, channels) so tobytes() interleaves.
                channels = dim0
                samples = samples.T
            elif dim1 <= 2 and dim0 > dim1:
                channels = dim1
            else:
                samples = samples.reshape(-1)
        elif samples.ndim > 2:
            samples = samples.reshape(-1)

        if np.issubdtype(samples.dtype, np.floating):
            # Float audio is scaled from [-1.0, 1.0] to the int16 range.
            samples = np.clip(samples, -1.0, 1.0)
            samples = (samples * 32767.0).astype(np.int16)
        elif samples.dtype != np.int16:
            samples = samples.astype(np.int16)

        sample_rate = int(getattr(self._model.hps.data, "sampling_rate", 44100))
        return samples.tobytes(), sample_rate, max(1, channels)


def _build_parser() -> argparse.ArgumentParser:
    """Build the command-line parser (only ``--socket-path``)."""
    parser = argparse.ArgumentParser(description="Persistent MeloTTS sidecar for Nanobot voice.")
    parser.add_argument("--socket-path", default=str(SOCKET_PATH))
    return parser


def _receive_json(conn: socket.socket) -> dict[str, Any]:
    """Read one newline-terminated JSON object from *conn*.

    Reading stops at the first chunk containing a newline or when the peer
    closes the connection. Only the first line is parsed. An empty request
    yields ``{}``.

    Raises:
        ValueError: if the payload is not valid JSON (``json.JSONDecodeError``)
            or is valid JSON but not an object.
    """
    chunks: list[bytes] = []
    while True:
        data = conn.recv(8192)
        if not data:
            break
        chunks.append(data)
        if b"\n" in data:
            break

    # Ignore anything after the first line so trailing bytes cannot break parsing.
    first_line = b"".join(chunks).lstrip().partition(b"\n")[0]
    payload = first_line.decode("utf-8", errors="replace").strip()
    if not payload:
        return {}
    request = json.loads(payload)
    if not isinstance(request, dict):
        raise ValueError("request must be a JSON object")
    return request


def _send_json(conn: socket.socket, payload: dict[str, Any]) -> None:
    """Serialize *payload* as one JSON line and send it on *conn*."""
    conn.sendall((json.dumps(payload) + "\n").encode("utf-8"))


def _string_field(request: dict[str, Any], key: str) -> str:
    """Return ``request[key]`` as a string; missing or null becomes ``""``."""
    value = request.get(key)
    return "" if value is None else str(value)


def _dispatch(server: MeloTTSServer, request: dict[str, Any]) -> dict[str, Any]:
    """Run the action named in *request* on *server* and return its reply.

    Raises:
        RuntimeError: for an unknown/missing action or a missing
            ``output_wav``; synthesis errors from *server* propagate.
    """
    action = _string_field(request, "action").strip().lower()
    if action == "ping":
        return server.ping()
    if action == "synthesize_pcm":
        return server.synthesize_pcm(_string_field(request, "text"))
    if action == "synthesize":
        text = _string_field(request, "text")
        output_wav = _string_field(request, "output_wav")
        if not output_wav:
            raise RuntimeError("output_wav is required")
        return server.synthesize_to_file(text, output_wav)
    raise RuntimeError(f"unsupported action: {action or 'missing'}")


def _serve_connection(conn: socket.socket, server: MeloTTSServer) -> None:
    """Handle one request on *conn*; never raise because of the client.

    Any failure while reading, dispatching or replying is reported to the
    client as ``{"ok": False, "error": ...}``. If even that reply cannot be
    delivered (e.g. the client already disconnected), the failure is logged
    to stderr and swallowed so the server keeps running.
    """
    conn.settimeout(_CLIENT_TIMEOUT_SECONDS)
    try:
        _send_json(conn, _dispatch(server, _receive_json(conn)))
    except Exception as exc:
        try:
            _send_json(conn, {"ok": False, "error": str(exc)})
        except OSError as send_exc:
            print(
                f"melotts server: could not deliver error response: {send_exc}",
                file=sys.stderr,
                flush=True,
            )


def main() -> int:
    """Run the sidecar until SIGTERM/SIGINT; return the process exit code.

    Returns 1 if the model cannot be initialised, otherwise 0 after a clean
    shutdown. The socket file is removed on exit.
    """
    args = _build_parser().parse_args()
    socket_path = Path(args.socket_path).expanduser()
    socket_path.parent.mkdir(parents=True, exist_ok=True)
    # Remove a stale socket file left behind by a previous run.
    with contextlib.suppress(FileNotFoundError):
        socket_path.unlink()

    stop_requested = False

    def request_stop(_signum: int, _frame: object) -> None:
        nonlocal stop_requested
        stop_requested = True

    signal.signal(signal.SIGTERM, request_stop)
    signal.signal(signal.SIGINT, request_stop)

    try:
        server = MeloTTSServer()
    except Exception as exc:
        print(f"melotts server initialization failed: {exc}", file=sys.stderr, flush=True)
        return 1

    listener = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
    try:
        # bind/listen live inside the try so a failure still closes the socket.
        listener.bind(str(socket_path))
        listener.listen(8)
        listener.settimeout(_ACCEPT_POLL_SECONDS)
        while not stop_requested:
            try:
                conn, _addr = listener.accept()
            except socket.timeout:
                # socket.timeout is TimeoutError on 3.10+, but only an OSError
                # subclass before that; naming it works on every version.
                continue
            except OSError:
                if stop_requested:
                    break
                raise
            with conn:
                _serve_connection(conn, server)
    finally:
        listener.close()
        with contextlib.suppress(FileNotFoundError):
            socket_path.unlink()
    return 0


if __name__ == "__main__":
    raise SystemExit(main())