This commit is contained in:
kacper 2026-03-04 08:20:42 -05:00
parent 133b557512
commit ed629ff60e
7 changed files with 948 additions and 525 deletions

10
app.py
View file

@ -6,17 +6,21 @@ from typing import Any, Awaitable, Callable
from fastapi import FastAPI, WebSocket, WebSocketDisconnect from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from fastapi.responses import FileResponse, JSONResponse from fastapi.responses import FileResponse, JSONResponse
from fastapi.staticfiles import StaticFiles
from supertonic_gateway import SuperTonicGateway from supertonic_gateway import SuperTonicGateway
from voice_rtc import WebRTCVoiceSession from voice_rtc import WebRTCVoiceSession
BASE_DIR = Path(__file__).resolve().parent BASE_DIR = Path(__file__).resolve().parent
INDEX_PATH = BASE_DIR / "static" / "index.html" STATIC_DIR = BASE_DIR / "static"
INDEX_PATH = STATIC_DIR / "index.html"
app = FastAPI(title="Nanobot SuperTonic Wisper Web") app = FastAPI(title="Nanobot SuperTonic Wisper Web")
gateway = SuperTonicGateway() gateway = SuperTonicGateway()
app.mount("/static", StaticFiles(directory=str(STATIC_DIR)), name="static")
@app.get("/health") @app.get("/health")
async def health() -> JSONResponse: async def health() -> JSONResponse:
@ -64,13 +68,15 @@ async def websocket_chat(websocket: WebSocket) -> None:
voice_session.set_push_to_talk_pressed( voice_session.set_push_to_talk_pressed(
bool(message.get("pressed", False)) bool(message.get("pressed", False))
) )
elif msg_type == "user-message":
await gateway.send_user_message(str(message.get("text", "")))
else: else:
await safe_send_json( await safe_send_json(
{ {
"role": "system", "role": "system",
"text": ( "text": (
"Unknown message type. Use spawn, stop, rtc-offer, " "Unknown message type. Use spawn, stop, rtc-offer, "
"rtc-ice-candidate, or voice-ptt." "rtc-ice-candidate, voice-ptt, or user-message."
), ),
"timestamp": "", "timestamp": "",
} }

View file

@ -36,7 +36,7 @@ export NANOBOT_SUPPRESS_NOISY_UI NANOBOT_OUTPUT_DEDUP_WINDOW_S
# Host voice pipeline env vars (safe defaults). # Host voice pipeline env vars (safe defaults).
: "${HOST_STT_PROVIDER:=faster-whisper}" : "${HOST_STT_PROVIDER:=faster-whisper}"
: "${HOST_STT_COMMAND:=}" : "${HOST_STT_COMMAND:=}"
: "${HOST_STT_MODEL:=base.en}" : "${HOST_STT_MODEL:=distil-large-v3}"
: "${HOST_STT_DEVICE:=auto}" : "${HOST_STT_DEVICE:=auto}"
: "${HOST_STT_COMPUTE_TYPE:=int8}" : "${HOST_STT_COMPUTE_TYPE:=int8}"
: "${HOST_STT_LANGUAGE:=en}" : "${HOST_STT_LANGUAGE:=en}"
@ -48,45 +48,39 @@ export NANOBOT_SUPPRESS_NOISY_UI NANOBOT_OUTPUT_DEDUP_WINDOW_S
: "${HOST_STT_NO_SPEECH_THRESHOLD:=0.6}" : "${HOST_STT_NO_SPEECH_THRESHOLD:=0.6}"
: "${HOST_STT_COMPRESSION_RATIO_THRESHOLD:=2.4}" : "${HOST_STT_COMPRESSION_RATIO_THRESHOLD:=2.4}"
: "${HOST_STT_INITIAL_PROMPT:=Transcribe brief spoken English precisely. Prefer common words over sound effects.}" : "${HOST_STT_INITIAL_PROMPT:=Transcribe brief spoken English precisely. Prefer common words over sound effects.}"
: "${HOST_STT_REPETITION_PENALTY:=1.0}"
: "${HOST_STT_HALLUCINATION_SILENCE_THRESHOLD:=}"
: "${HOST_TTS_PROVIDER:=supertonic}" : "${HOST_TTS_PROVIDER:=supertonic}"
: "${HOST_TTS_COMMAND:=}" : "${HOST_TTS_COMMAND:=}"
: "${SUPERTONIC_MODEL:=supertonic-2}" : "${SUPERTONIC_MODEL:=supertonic-2}"
: "${SUPERTONIC_VOICE_STYLE:=M1}" : "${SUPERTONIC_VOICE_STYLE:=F1}"
: "${SUPERTONIC_LANG:=en}" : "${SUPERTONIC_LANG:=en}"
: "${SUPERTONIC_TOTAL_STEPS:=5}" : "${SUPERTONIC_TOTAL_STEPS:=8}"
: "${SUPERTONIC_SPEED:=1.05}" : "${SUPERTONIC_SPEED:=1.5}"
: "${SUPERTONIC_INTRA_OP_THREADS:=1}" : "${SUPERTONIC_INTRA_OP_THREADS:=1}"
: "${SUPERTONIC_INTER_OP_THREADS:=1}" : "${SUPERTONIC_INTER_OP_THREADS:=1}"
: "${SUPERTONIC_AUTO_DOWNLOAD:=1}" : "${SUPERTONIC_AUTO_DOWNLOAD:=1}"
: "${HOST_STT_MIN_PTT_MS:=220}" : "${HOST_STT_MIN_PTT_MS:=220}"
: "${HOST_STT_MAX_PTT_MS:=12000}"
: "${HOST_STT_SEGMENT_QUEUE_SIZE:=2}" : "${HOST_STT_SEGMENT_QUEUE_SIZE:=2}"
: "${HOST_STT_BACKLOG_NOTICE_INTERVAL_S:=6.0}" : "${HOST_STT_BACKLOG_NOTICE_INTERVAL_S:=6.0}"
: "${HOST_STT_SUPPRESS_DURING_TTS:=1}" : "${HOST_STT_SUPPRESS_DURING_TTS:=1}"
: "${HOST_STT_SUPPRESS_MS_AFTER_TTS:=300}" : "${HOST_STT_SUPPRESS_MS_AFTER_TTS:=300}"
: "${HOST_RTC_OUTBOUND_LEAD_IN_MS:=120}" : "${HOST_RTC_OUTBOUND_LEAD_IN_MS:=120}"
: "${HOST_RTC_OUTBOUND_IDLE_S:=0.6}" : "${HOST_RTC_OUTBOUND_IDLE_S:=0.6}"
: "${HOST_TTS_FLUSH_DELAY_S:=0.45}"
: "${HOST_TTS_SENTENCE_FLUSH_DELAY_S:=0.15}"
: "${HOST_TTS_MIN_CHARS:=10}"
: "${HOST_TTS_MAX_WAIT_MS:=1800}"
: "${HOST_TTS_MAX_CHUNK_CHARS:=140}"
export HOST_STT_PROVIDER HOST_STT_COMMAND HOST_STT_MODEL HOST_STT_DEVICE export HOST_STT_PROVIDER HOST_STT_COMMAND HOST_STT_MODEL HOST_STT_DEVICE
export HOST_STT_COMPUTE_TYPE HOST_STT_LANGUAGE HOST_STT_BEAM_SIZE HOST_STT_BEST_OF HOST_STT_VAD_FILTER export HOST_STT_COMPUTE_TYPE HOST_STT_LANGUAGE HOST_STT_BEAM_SIZE HOST_STT_BEST_OF HOST_STT_VAD_FILTER
export HOST_STT_TEMPERATURE HOST_STT_LOG_PROB_THRESHOLD HOST_STT_NO_SPEECH_THRESHOLD export HOST_STT_TEMPERATURE HOST_STT_LOG_PROB_THRESHOLD HOST_STT_NO_SPEECH_THRESHOLD
export HOST_STT_COMPRESSION_RATIO_THRESHOLD export HOST_STT_COMPRESSION_RATIO_THRESHOLD
export HOST_STT_INITIAL_PROMPT export HOST_STT_INITIAL_PROMPT HOST_STT_REPETITION_PENALTY HOST_STT_HALLUCINATION_SILENCE_THRESHOLD
export HOST_TTS_PROVIDER HOST_TTS_COMMAND export HOST_TTS_PROVIDER HOST_TTS_COMMAND
export SUPERTONIC_MODEL SUPERTONIC_VOICE_STYLE SUPERTONIC_LANG export SUPERTONIC_MODEL SUPERTONIC_VOICE_STYLE SUPERTONIC_LANG
export SUPERTONIC_TOTAL_STEPS SUPERTONIC_SPEED export SUPERTONIC_TOTAL_STEPS SUPERTONIC_SPEED
export SUPERTONIC_INTRA_OP_THREADS SUPERTONIC_INTER_OP_THREADS SUPERTONIC_AUTO_DOWNLOAD export SUPERTONIC_INTRA_OP_THREADS SUPERTONIC_INTER_OP_THREADS SUPERTONIC_AUTO_DOWNLOAD
export HOST_STT_MIN_PTT_MS HOST_STT_MAX_PTT_MS HOST_STT_SEGMENT_QUEUE_SIZE export HOST_STT_MIN_PTT_MS HOST_STT_SEGMENT_QUEUE_SIZE
export HOST_STT_BACKLOG_NOTICE_INTERVAL_S export HOST_STT_BACKLOG_NOTICE_INTERVAL_S
export HOST_STT_SUPPRESS_DURING_TTS HOST_STT_SUPPRESS_MS_AFTER_TTS export HOST_STT_SUPPRESS_DURING_TTS HOST_STT_SUPPRESS_MS_AFTER_TTS
export HOST_RTC_OUTBOUND_LEAD_IN_MS HOST_RTC_OUTBOUND_IDLE_S export HOST_RTC_OUTBOUND_LEAD_IN_MS HOST_RTC_OUTBOUND_IDLE_S
export HOST_TTS_FLUSH_DELAY_S HOST_TTS_SENTENCE_FLUSH_DELAY_S
export HOST_TTS_MIN_CHARS HOST_TTS_MAX_WAIT_MS HOST_TTS_MAX_CHUNK_CHARS
: "${UVICORN_HOST:=0.0.0.0}" : "${UVICORN_HOST:=0.0.0.0}"
: "${UVICORN_PORT:=8000}" : "${UVICORN_PORT:=8000}"

File diff suppressed because it is too large Load diff

7
static/three.min.js vendored Normal file

File diff suppressed because one or more lines are too long

View file

@ -18,26 +18,35 @@ CONTROL_CHAR_RE = re.compile(r"[\x00-\x08\x0b-\x1f\x7f]")
BRAILLE_SPINNER_RE = re.compile(r"[\u2800-\u28ff]") BRAILLE_SPINNER_RE = re.compile(r"[\u2800-\u28ff]")
SPINNER_ONLY_RE = re.compile(r"^[\s|/\\\-]+$") SPINNER_ONLY_RE = re.compile(r"^[\s|/\\\-]+$")
BOX_DRAWING_ONLY_RE = re.compile(r"^[\s\u2500-\u257f]+$") BOX_DRAWING_ONLY_RE = re.compile(r"^[\s\u2500-\u257f]+$")
THINKING_LINE_RE = re.compile(r"^(?:agent|nanobot)\s+is\s+thinking\b", re.IGNORECASE) THINKING_LINE_RE = re.compile(
r"\b(?:agent|nanobot|napbot)\b(?:\s+is)?\s+thinking\b",
re.IGNORECASE,
)
USER_ECHO_LINE_RE = re.compile(r"^(?:you|user)\s*:", re.IGNORECASE)
TOOL_STREAM_LINE_RE = re.compile( TOOL_STREAM_LINE_RE = re.compile(
r"^(?:tool(?:\s+call|\s+output)?|calling\s+tool|running\s+tool|executing\s+tool)\b", r"^(?:tool(?:\s+call|\s+output)?|calling\s+tool|running\s+tool|executing\s+tool)\b",
re.IGNORECASE, re.IGNORECASE,
) )
LEADING_NON_WORD_RE = re.compile(r"^[^\w]+")
WHITESPACE_RE = re.compile(r"\s+")
AGENT_OUTPUT_PREFIX_RE = re.compile(
r"^(?:nanobot|napbot)\b\s*[:>\-]?\s*", re.IGNORECASE
)
EMOJI_RE = re.compile( EMOJI_RE = re.compile(
"[" # Common emoji and pictograph blocks. "[" # Common emoji and pictograph blocks.
"\U0001F1E6-\U0001F1FF" "\U0001f1e6-\U0001f1ff"
"\U0001F300-\U0001F5FF" "\U0001f300-\U0001f5ff"
"\U0001F600-\U0001F64F" "\U0001f600-\U0001f64f"
"\U0001F680-\U0001F6FF" "\U0001f680-\U0001f6ff"
"\U0001F700-\U0001F77F" "\U0001f700-\U0001f77f"
"\U0001F780-\U0001F7FF" "\U0001f780-\U0001f7ff"
"\U0001F800-\U0001F8FF" "\U0001f800-\U0001f8ff"
"\U0001F900-\U0001F9FF" "\U0001f900-\U0001f9ff"
"\U0001FA00-\U0001FAFF" "\U0001fa00-\U0001faff"
"\u2600-\u26FF" "\u2600-\u26ff"
"\u2700-\u27BF" "\u2700-\u27bf"
"\uFE0F" "\ufe0f"
"\u200D" "\u200d"
"]" "]"
) )
@ -70,7 +79,10 @@ def _resolve_nanobot_command_and_workdir() -> tuple[str, Path]:
if nanobot_venv_python.exists(): if nanobot_venv_python.exists():
if not workdir_override: if not workdir_override:
default_workdir = nanobot_dir default_workdir = nanobot_dir
return f"{nanobot_venv_python} -m nanobot agent --no-markdown", default_workdir return (
f"{nanobot_venv_python} -m nanobot agent --no-markdown",
default_workdir,
)
return "nanobot agent --no-markdown", default_workdir return "nanobot agent --no-markdown", default_workdir
@ -80,7 +92,11 @@ def _infer_venv_root(command_parts: list[str], workdir: Path) -> Path | None:
return None return None
binary = Path(command_parts[0]).expanduser() binary = Path(command_parts[0]).expanduser()
if binary.is_absolute() and binary.name.startswith("python") and binary.parent.name == "bin": if (
binary.is_absolute()
and binary.name.startswith("python")
and binary.parent.name == "bin"
):
return binary.parent.parent return binary.parent.parent
for candidate in (workdir / ".venv", workdir / "venv"): for candidate in (workdir / ".venv", workdir / "venv"):
@ -89,7 +105,9 @@ def _infer_venv_root(command_parts: list[str], workdir: Path) -> Path | None:
return None return None
def _build_process_env(command_parts: list[str], workdir: Path) -> tuple[dict[str, str], Path | None]: def _build_process_env(
command_parts: list[str], workdir: Path
) -> tuple[dict[str, str], Path | None]:
env = os.environ.copy() env = os.environ.copy()
env.pop("PYTHONHOME", None) env.pop("PYTHONHOME", None)
@ -115,14 +133,18 @@ class NanobotTUIProcess:
self._master_fd: int | None = None self._master_fd: int | None = None
self._read_task: asyncio.Task[None] | None = None self._read_task: asyncio.Task[None] | None = None
self._pending_output = "" self._pending_output = ""
self._suppress_noisy_ui = os.getenv("NANOBOT_SUPPRESS_NOISY_UI", "1").strip() not in { self._suppress_noisy_ui = os.getenv(
"NANOBOT_SUPPRESS_NOISY_UI", "1"
).strip() not in {
"0", "0",
"false", "false",
"False", "False",
"no", "no",
"off", "off",
} }
self._dedup_window_s = max(0.2, float(os.getenv("NANOBOT_OUTPUT_DEDUP_WINDOW_S", "1.5"))) self._dedup_window_s = max(
0.2, float(os.getenv("NANOBOT_OUTPUT_DEDUP_WINDOW_S", "1.5"))
)
self._recent_lines: deque[tuple[str, float]] = deque() self._recent_lines: deque[tuple[str, float]] = deque()
self._last_tts_line = "" self._last_tts_line = ""
@ -132,14 +154,19 @@ class NanobotTUIProcess:
async def start(self) -> None: async def start(self) -> None:
if self.running: if self.running:
await self._bus.publish(WisperEvent(role="system", text="Nanobot TUI is already running.")) await self._bus.publish(
WisperEvent(role="system", text="Nanobot TUI is already running.")
)
return return
command_parts = [ command_parts = [
os.path.expandvars(os.path.expanduser(part)) for part in shlex.split(self._command) os.path.expandvars(os.path.expanduser(part))
for part in shlex.split(self._command)
] ]
if not command_parts: if not command_parts:
await self._bus.publish(WisperEvent(role="system", text="NANOBOT_COMMAND is empty.")) await self._bus.publish(
WisperEvent(role="system", text="NANOBOT_COMMAND is empty.")
)
return return
if not self._workdir.exists(): if not self._workdir.exists():
@ -152,7 +179,9 @@ class NanobotTUIProcess:
return return
master_fd, slave_fd = pty.openpty() master_fd, slave_fd = pty.openpty()
child_env, child_venv_root = _build_process_env(command_parts=command_parts, workdir=self._workdir) child_env, child_venv_root = _build_process_env(
command_parts=command_parts, workdir=self._workdir
)
try: try:
self._process = subprocess.Popen( self._process = subprocess.Popen(
command_parts, command_parts,
@ -188,7 +217,9 @@ class NanobotTUIProcess:
os.close(slave_fd) os.close(slave_fd)
os.set_blocking(master_fd, False) os.set_blocking(master_fd, False)
self._master_fd = master_fd self._master_fd = master_fd
self._read_task = asyncio.create_task(self._read_output(), name="nanobot-tui-reader") self._read_task = asyncio.create_task(
self._read_output(), name="nanobot-tui-reader"
)
await self._bus.publish( await self._bus.publish(
WisperEvent( WisperEvent(
role="system", role="system",
@ -206,14 +237,18 @@ class NanobotTUIProcess:
async def send(self, text: str) -> None: async def send(self, text: str) -> None:
if not self.running or self._master_fd is None: if not self.running or self._master_fd is None:
await self._bus.publish( await self._bus.publish(
WisperEvent(role="system", text="Nanobot TUI is not running. Click spawn first.") WisperEvent(
role="system", text="Nanobot TUI is not running. Click spawn first."
)
) )
return return
message = text.rstrip("\n") + "\n" message = text.rstrip("\n") + "\n"
try: try:
os.write(self._master_fd, message.encode()) os.write(self._master_fd, message.encode())
except OSError as exc: except OSError as exc:
await self._bus.publish(WisperEvent(role="system", text=f"Failed to write to TUI: {exc}")) await self._bus.publish(
WisperEvent(role="system", text=f"Failed to write to TUI: {exc}")
)
async def stop(self) -> None: async def stop(self) -> None:
if self._read_task: if self._read_task:
@ -251,29 +286,40 @@ class NanobotTUIProcess:
if self._master_fd is None: if self._master_fd is None:
return return
while self.running: while self.running:
if not await self._wait_for_fd_readable():
break
try: try:
chunk = os.read(self._master_fd, 4096) chunk = os.read(self._master_fd, 4096)
except BlockingIOError: except BlockingIOError:
await asyncio.sleep(0.05)
continue continue
except OSError: except OSError:
break break
if not chunk: if not chunk:
await asyncio.sleep(0.05) if not self.running:
break
await asyncio.sleep(0.01)
continue continue
text = _clean_output(chunk.decode(errors="ignore")) text = _clean_output(chunk.decode(errors="ignore"))
if not text.strip(): if not text.strip():
continue continue
displayable, tts_publishable = self._consume_output_chunk(text) displayable, tts_publishable, saw_thinking = self._consume_output_chunk(
text
)
if saw_thinking:
await self._bus.publish(
WisperEvent(role="agent-state", text="thinking")
)
if displayable: if displayable:
await self._bus.publish(WisperEvent(role="nanobot", text=displayable)) await self._bus.publish(WisperEvent(role="nanobot", text=displayable))
if tts_publishable: if tts_publishable:
await self._bus.publish(WisperEvent(role="nanobot-tts", text=tts_publishable)) await self._bus.publish(
WisperEvent(role="nanobot-tts", text=tts_publishable)
)
trailing_display, trailing_tts = self._consume_output_chunk("\n") trailing_display, trailing_tts, _ = self._consume_output_chunk("\n")
if trailing_display: if trailing_display:
await self._bus.publish(WisperEvent(role="nanobot", text=trailing_display)) await self._bus.publish(WisperEvent(role="nanobot", text=trailing_display))
if trailing_tts: if trailing_tts:
@ -282,10 +328,13 @@ class NanobotTUIProcess:
if self._process is not None: if self._process is not None:
exit_code = self._process.poll() exit_code = self._process.poll()
await self._bus.publish( await self._bus.publish(
WisperEvent(role="system", text=f"Nanobot TUI exited (code={exit_code}).") WisperEvent(
role="system", text=f"Nanobot TUI exited (code={exit_code})."
)
) )
def _consume_output_chunk(self, text: str) -> tuple[str, str]: def _consume_output_chunk(self, text: str) -> tuple[str, str, bool]:
"""Return (displayable, tts_publishable, saw_thinking)."""
self._pending_output += text self._pending_output += text
lines = self._pending_output.split("\n") lines = self._pending_output.split("\n")
@ -297,11 +346,16 @@ class NanobotTUIProcess:
kept_lines: list[str] = [] kept_lines: list[str] = []
tts_lines: list[str] = [] tts_lines: list[str] = []
saw_thinking = False
for line in lines: for line in lines:
normalized = self._normalize_line(line) normalized = self._normalize_line(line)
if not normalized: if not normalized:
continue continue
if self._suppress_noisy_ui and self._is_noisy_ui_line(normalized): if self._suppress_noisy_ui and self._is_noisy_ui_line(normalized):
# Detect thinking lines even though they are filtered from display.
candidate = LEADING_NON_WORD_RE.sub("", normalized)
if THINKING_LINE_RE.search(candidate):
saw_thinking = True
continue continue
if normalized != self._last_tts_line: if normalized != self._last_tts_line:
tts_lines.append(normalized) tts_lines.append(normalized)
@ -310,11 +364,16 @@ class NanobotTUIProcess:
continue continue
kept_lines.append(normalized) kept_lines.append(normalized)
return "\n".join(kept_lines).strip(), "\n".join(tts_lines).strip() return "\n".join(kept_lines).strip(), "\n".join(tts_lines).strip(), saw_thinking
def _normalize_line(self, line: str) -> str: def _normalize_line(self, line: str) -> str:
without_emoji = EMOJI_RE.sub(" ", line) without_emoji = EMOJI_RE.sub(" ", line)
return re.sub(r"\s+", " ", without_emoji).strip() normalized = WHITESPACE_RE.sub(" ", without_emoji).strip()
# Strip leading "nanobot:" prefix that the TUI echoes in its own output,
# since the frontend already labels lines with the role name and TTS
# should not read the agent's own name aloud.
normalized = AGENT_OUTPUT_PREFIX_RE.sub("", normalized)
return normalized
def _is_noisy_ui_line(self, line: str) -> bool: def _is_noisy_ui_line(self, line: str) -> bool:
if SPINNER_ONLY_RE.fullmatch(line): if SPINNER_ONLY_RE.fullmatch(line):
@ -322,18 +381,47 @@ class NanobotTUIProcess:
if BOX_DRAWING_ONLY_RE.fullmatch(line): if BOX_DRAWING_ONLY_RE.fullmatch(line):
return True return True
candidate = re.sub(r"^[^\w]+", "", line) candidate = LEADING_NON_WORD_RE.sub("", line)
if THINKING_LINE_RE.match(candidate): if THINKING_LINE_RE.search(candidate):
return True return True
if TOOL_STREAM_LINE_RE.match(candidate): if TOOL_STREAM_LINE_RE.match(candidate):
return True return True
if USER_ECHO_LINE_RE.match(candidate):
return True
return False return False
async def _wait_for_fd_readable(self) -> bool:
if self._master_fd is None:
return False
loop = asyncio.get_running_loop()
ready: asyncio.Future[None] = loop.create_future()
def _mark_ready() -> None:
if not ready.done():
ready.set_result(None)
try:
loop.add_reader(self._master_fd, _mark_ready)
except (AttributeError, NotImplementedError, OSError, ValueError):
await asyncio.sleep(0.01)
return True
try:
await ready
return True
finally:
with contextlib.suppress(Exception):
loop.remove_reader(self._master_fd)
def _is_recent_duplicate(self, line: str) -> bool: def _is_recent_duplicate(self, line: str) -> bool:
now = time.monotonic() now = time.monotonic()
normalized = line.lower() normalized = line.lower()
while self._recent_lines and (now - self._recent_lines[0][1]) > self._dedup_window_s: while (
self._recent_lines
and (now - self._recent_lines[0][1]) > self._dedup_window_s
):
self._recent_lines.popleft() self._recent_lines.popleft()
for previous, _timestamp in self._recent_lines: for previous, _timestamp in self._recent_lines:
@ -359,11 +447,15 @@ class SuperTonicGateway:
async def spawn_tui(self) -> None: async def spawn_tui(self) -> None:
async with self._lock: async with self._lock:
if self._tui and self._tui.running: if self._tui and self._tui.running:
await self.bus.publish(WisperEvent(role="system", text="Nanobot TUI is already running.")) await self.bus.publish(
WisperEvent(role="system", text="Nanobot TUI is already running.")
)
return return
command, workdir = _resolve_nanobot_command_and_workdir() command, workdir = _resolve_nanobot_command_and_workdir()
self._tui = NanobotTUIProcess(bus=self.bus, command=command, workdir=workdir) self._tui = NanobotTUIProcess(
bus=self.bus, command=command, workdir=workdir
)
await self._tui.start() await self._tui.start()
async def send_user_message(self, text: str) -> None: async def send_user_message(self, text: str) -> None:
@ -374,7 +466,10 @@ class SuperTonicGateway:
async with self._lock: async with self._lock:
if not self._tui: if not self._tui:
await self.bus.publish( await self.bus.publish(
WisperEvent(role="system", text="Nanobot TUI is not running. Click spawn first.") WisperEvent(
role="system",
text="Nanobot TUI is not running. Click spawn first.",
)
) )
return return
await self._tui.send(message) await self._tui.send(message)

View file

@ -68,8 +68,12 @@ SPEECH_FILTER_RE = re.compile(
r"^(spawned nanobot tui|stopped nanobot tui|nanobot tui exited|websocket)", r"^(spawned nanobot tui|stopped nanobot tui|nanobot tui exited|websocket)",
re.IGNORECASE, re.IGNORECASE,
) )
THINKING_STATUS_RE = re.compile(r"\bnanobot is thinking\b", re.IGNORECASE) THINKING_STATUS_RE = re.compile(
r"\b(?:agent|nanobot|napbot)\b(?:\s+is)?\s+thinking\b",
re.IGNORECASE,
)
USER_PREFIX_RE = re.compile(r"^(?:you|user)\s*:\s*", re.IGNORECASE) USER_PREFIX_RE = re.compile(r"^(?:you|user)\s*:\s*", re.IGNORECASE)
AGENT_PREFIX_RE = re.compile(r"^(?:nanobot|napbot)\b\s*[:>\-]?\s*", re.IGNORECASE)
VOICE_TRANSCRIPT_RE = re.compile( VOICE_TRANSCRIPT_RE = re.compile(
r"^(?:wisper\s*:\s*)?voice\s+transcript\s*:\s*", r"^(?:wisper\s*:\s*)?voice\s+transcript\s*:\s*",
re.IGNORECASE, re.IGNORECASE,
@ -77,7 +81,6 @@ VOICE_TRANSCRIPT_RE = re.compile(
ANSI_ESCAPE_RE = re.compile(r"\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])") ANSI_ESCAPE_RE = re.compile(r"\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])")
CONTROL_CHAR_RE = re.compile(r"[\x00-\x1f\x7f]") CONTROL_CHAR_RE = re.compile(r"[\x00-\x1f\x7f]")
BRAILLE_SPINNER_RE = re.compile(r"[\u2800-\u28ff]") BRAILLE_SPINNER_RE = re.compile(r"[\u2800-\u28ff]")
SENTENCE_END_RE = re.compile(r"[.!?]\s*$")
TTS_ALLOWED_ASCII = set( TTS_ALLOWED_ASCII = set(
"ABCDEFGHIJKLMNOPQRSTUVWXYZ" "ABCDEFGHIJKLMNOPQRSTUVWXYZ"
"abcdefghijklmnopqrstuvwxyz" "abcdefghijklmnopqrstuvwxyz"
@ -141,6 +144,15 @@ if AIORTC_AVAILABLE:
self._closed = False self._closed = False
self._frame_duration_s = frame_ms / 1000.0 self._frame_duration_s = frame_ms / 1000.0
self._last_recv_at = 0.0 self._last_recv_at = 0.0
self._playing = False
self._idle_frames = 0
# Number of consecutive silent frames before signalling idle.
# At 20ms per frame, 15 frames = 300ms grace period to avoid
# flickering between TTS synthesis chunks.
self._idle_grace_frames = max(
1, int(os.getenv("HOST_RTC_IDLE_GRACE_MS", "300")) // max(1, frame_ms)
)
self._on_playing_changed: Callable[[bool], None] | None = None
async def enqueue_pcm( async def enqueue_pcm(
self, pcm: bytes, sample_rate: int, channels: int = 1 self, pcm: bytes, sample_rate: int, channels: int = 1
@ -211,8 +223,24 @@ if AIORTC_AVAILABLE:
try: try:
payload = self._queue.get_nowait() payload = self._queue.get_nowait()
has_audio = True
except asyncio.QueueEmpty: except asyncio.QueueEmpty:
payload = b"\x00" * self._bytes_per_frame payload = b"\x00" * self._bytes_per_frame
has_audio = False
# Notify when playback state changes.
if has_audio:
self._idle_frames = 0
if not self._playing:
self._playing = True
if self._on_playing_changed:
self._on_playing_changed(True)
elif self._playing:
self._idle_frames += 1
if self._idle_frames >= self._idle_grace_frames:
self._playing = False
if self._on_playing_changed:
self._on_playing_changed(False)
self._last_recv_at = loop.time() self._last_recv_at = loop.time()
@ -233,6 +261,8 @@ if AIORTC_AVAILABLE:
else: else:
class QueueAudioTrack: # pragma: no cover - used only when aiortc is unavailable class QueueAudioTrack: # pragma: no cover - used only when aiortc is unavailable
_on_playing_changed: Callable[[bool], None] | None = None
async def enqueue_pcm( async def enqueue_pcm(
self, pcm: bytes, sample_rate: int, channels: int = 1 self, pcm: bytes, sample_rate: int, channels: int = 1
) -> None: ) -> None:
@ -343,6 +373,15 @@ class FasterWhisperSpeechToText:
).strip() ).strip()
or None or None
) )
self._repetition_penalty = float(
os.getenv("HOST_STT_REPETITION_PENALTY", "1.0")
)
raw_hallucination_threshold = os.getenv(
"HOST_STT_HALLUCINATION_SILENCE_THRESHOLD", ""
).strip()
self._hallucination_silence_threshold: float | None = (
float(raw_hallucination_threshold) if raw_hallucination_threshold else None
)
self._model: Any = None self._model: Any = None
self._init_error: str | None = None self._init_error: str | None = None
@ -429,6 +468,8 @@ class FasterWhisperSpeechToText:
log_prob_threshold=self._log_prob_threshold, log_prob_threshold=self._log_prob_threshold,
no_speech_threshold=self._no_speech_threshold, no_speech_threshold=self._no_speech_threshold,
compression_ratio_threshold=self._compression_ratio_threshold, compression_ratio_threshold=self._compression_ratio_threshold,
repetition_penalty=self._repetition_penalty,
hallucination_silence_threshold=self._hallucination_silence_threshold,
) )
transcript_parts: list[str] = [] transcript_parts: list[str] = []
for segment in segments: for segment in segments:
@ -456,6 +497,8 @@ class FasterWhisperSpeechToText:
log_prob_threshold=self._log_prob_threshold, log_prob_threshold=self._log_prob_threshold,
no_speech_threshold=self._no_speech_threshold, no_speech_threshold=self._no_speech_threshold,
compression_ratio_threshold=self._compression_ratio_threshold, compression_ratio_threshold=self._compression_ratio_threshold,
repetition_penalty=self._repetition_penalty,
hallucination_silence_threshold=self._hallucination_silence_threshold,
) )
transcript_parts: list[str] = [] transcript_parts: list[str] = []
for segment in segments: for segment in segments:
@ -541,11 +584,11 @@ class SupertonicTextToSpeech:
os.getenv("SUPERTONIC_MODEL", "supertonic-2").strip() or "supertonic-2" os.getenv("SUPERTONIC_MODEL", "supertonic-2").strip() or "supertonic-2"
) )
self._voice_style_name = ( self._voice_style_name = (
os.getenv("SUPERTONIC_VOICE_STYLE", "M1").strip() or "M1" os.getenv("SUPERTONIC_VOICE_STYLE", "F1").strip() or "F1"
) )
self._lang = os.getenv("SUPERTONIC_LANG", "en").strip() or "en" self._lang = os.getenv("SUPERTONIC_LANG", "en").strip() or "en"
self._total_steps = int(os.getenv("SUPERTONIC_TOTAL_STEPS", "5")) self._total_steps = int(os.getenv("SUPERTONIC_TOTAL_STEPS", "8"))
self._speed = float(os.getenv("SUPERTONIC_SPEED", "1.05")) self._speed = float(os.getenv("SUPERTONIC_SPEED", "1.5"))
self._intra_op_num_threads = _optional_int_env("SUPERTONIC_INTRA_OP_THREADS") self._intra_op_num_threads = _optional_int_env("SUPERTONIC_INTRA_OP_THREADS")
self._inter_op_num_threads = _optional_int_env("SUPERTONIC_INTER_OP_THREADS") self._inter_op_num_threads = _optional_int_env("SUPERTONIC_INTER_OP_THREADS")
self._auto_download = os.getenv( self._auto_download = os.getenv(
@ -605,7 +648,7 @@ class SupertonicTextToSpeech:
message = str(exc) message = str(exc)
if "unsupported character" not in message.lower(): if "unsupported character" not in message.lower():
raise raise
fallback_text = self._sanitize_text_for_supertonic(text) fallback_text = _sanitize_tts_text(text)
if not fallback_text or fallback_text == text: if not fallback_text or fallback_text == text:
raise raise
wav, _duration = self._engine.synthesize( wav, _duration = self._engine.synthesize(
@ -655,9 +698,6 @@ class SupertonicTextToSpeech:
channels=max(1, channels), channels=max(1, channels),
) )
def _sanitize_text_for_supertonic(self, text: str) -> str:
return _sanitize_tts_text(text)
def _initialize_blocking(self) -> None: def _initialize_blocking(self) -> None:
if self._engine is not None and self._voice_style is not None: if self._engine is not None and self._voice_style is not None:
return return
@ -853,23 +893,15 @@ class WebRTCVoiceSession:
maxsize=self._stt_segment_queue_size maxsize=self._stt_segment_queue_size
) )
self._tts_buffer = "" self._tts_chunks: list[str] = []
self._tts_flush_handle: asyncio.TimerHandle | None = None self._tts_flush_handle: asyncio.TimerHandle | None = None
self._tts_flush_lock = asyncio.Lock() self._tts_flush_lock = asyncio.Lock()
self._tts_buffer_lock = asyncio.Lock() self._tts_buffer_lock = asyncio.Lock()
self._tts_flush_delay_s = max( # How long to wait after the last incoming chunk before flushing the
0.08, float(os.getenv("HOST_TTS_FLUSH_DELAY_S", "0.45")) # entire accumulated response to TTS in one go.
self._tts_response_end_delay_s = max(
0.1, float(os.getenv("HOST_TTS_RESPONSE_END_DELAY_S", "1.5"))
) )
self._tts_sentence_flush_delay_s = max(
0.06,
float(os.getenv("HOST_TTS_SENTENCE_FLUSH_DELAY_S", "0.15")),
)
self._tts_min_chars = max(1, int(os.getenv("HOST_TTS_MIN_CHARS", "10")))
self._tts_max_wait_ms = max(300, int(os.getenv("HOST_TTS_MAX_WAIT_MS", "1800")))
self._tts_max_chunk_chars = max(
60, int(os.getenv("HOST_TTS_MAX_CHUNK_CHARS", "140"))
)
self._tts_buffer_started_at = 0.0
self._closed = False self._closed = False
self._stt_unavailable_notice_sent = False self._stt_unavailable_notice_sent = False
@ -887,14 +919,7 @@ class WebRTCVoiceSession:
) )
), ),
) )
self._stt_max_ptt_ms = max(
self._stt_min_ptt_ms,
int(
os.getenv(
"HOST_STT_MAX_PTT_MS", os.getenv("HOST_STT_MAX_SEGMENT_MS", "12000")
)
),
)
self._stt_suppress_during_tts = os.getenv( self._stt_suppress_during_tts = os.getenv(
"HOST_STT_SUPPRESS_DURING_TTS", "1" "HOST_STT_SUPPRESS_DURING_TTS", "1"
).strip() not in { ).strip() not in {
@ -924,22 +949,16 @@ class WebRTCVoiceSession:
normalized_chunk = chunk.strip() normalized_chunk = chunk.strip()
if not normalized_chunk: if not normalized_chunk:
return return
flush_delay = (
self._tts_sentence_flush_delay_s
if SENTENCE_END_RE.search(normalized_chunk)
else self._tts_flush_delay_s
)
loop = asyncio.get_running_loop()
async with self._tts_buffer_lock: async with self._tts_buffer_lock:
if not self._pc or not self._outbound_track: if not self._pc or not self._outbound_track:
return return
if not self._tts_buffer.strip(): # Keep line boundaries between streamed chunks so line-based filters
self._tts_buffer_started_at = loop.time() # stay accurate while avoiding repeated full-string copies.
self._tts_buffer = normalized_chunk self._tts_chunks.append(normalized_chunk)
else: # Reset the flush timer on every incoming chunk so the entire
# Keep line boundaries between streamed chunks so line-based filters stay accurate. # response is accumulated before synthesis begins. The timer
self._tts_buffer = f"{self._tts_buffer}\n{normalized_chunk}" # fires once no new chunks arrive for the configured delay.
self._schedule_tts_flush_after(flush_delay) self._schedule_tts_flush_after(self._tts_response_end_delay_s)
async def handle_offer(self, payload: dict[str, Any]) -> None: async def handle_offer(self, payload: dict[str, Any]) -> None:
if not AIORTC_AVAILABLE or not RTCPeerConnection or not RTCSessionDescription: if not AIORTC_AVAILABLE or not RTCPeerConnection or not RTCSessionDescription:
@ -965,6 +984,7 @@ class WebRTCVoiceSession:
peer_connection = RTCPeerConnection() peer_connection = RTCPeerConnection()
self._pc = peer_connection self._pc = peer_connection
self._outbound_track = QueueAudioTrack() self._outbound_track = QueueAudioTrack()
self._outbound_track._on_playing_changed = self._on_track_playing_changed
peer_connection.addTrack(self._outbound_track) peer_connection.addTrack(self._outbound_track)
@peer_connection.on("connectionstatechange") @peer_connection.on("connectionstatechange")
@ -1096,8 +1116,7 @@ class WebRTCVoiceSession:
if self._tts_flush_handle: if self._tts_flush_handle:
self._tts_flush_handle.cancel() self._tts_flush_handle.cancel()
self._tts_flush_handle = None self._tts_flush_handle = None
self._tts_buffer = "" self._tts_chunks.clear()
self._tts_buffer_started_at = 0.0
if self._incoming_audio_task: if self._incoming_audio_task:
self._incoming_audio_task.cancel() self._incoming_audio_task.cancel()
@ -1136,61 +1155,45 @@ class WebRTCVoiceSession:
async with self._tts_flush_lock: async with self._tts_flush_lock:
async with self._tts_buffer_lock: async with self._tts_buffer_lock:
self._tts_flush_handle = None self._tts_flush_handle = None
raw_text = self._tts_buffer raw_text = "\n".join(self._tts_chunks)
buffer_started_at = self._tts_buffer_started_at self._tts_chunks.clear()
self._tts_buffer = ""
self._tts_buffer_started_at = 0.0
clean_text = self._clean_tts_text(raw_text) clean_text = self._clean_tts_text(raw_text)
if not clean_text: if not clean_text:
return return
loop = asyncio.get_running_loop()
now = loop.time()
buffer_age_ms = int(max(0.0, now - buffer_started_at) * 1000)
should_wait_for_more = (
len(clean_text) < self._tts_min_chars
and not SENTENCE_END_RE.search(clean_text)
and buffer_age_ms < self._tts_max_wait_ms
)
if should_wait_for_more:
async with self._tts_buffer_lock:
if self._tts_buffer.strip():
self._tts_buffer = f"{clean_text}\n{self._tts_buffer}".strip()
else:
self._tts_buffer = clean_text
if self._tts_buffer_started_at <= 0.0:
self._tts_buffer_started_at = (
buffer_started_at if buffer_started_at > 0.0 else now
)
self._schedule_tts_flush_after(self._tts_flush_delay_s)
return
if not self._outbound_track: if not self._outbound_track:
return return
for part in self._chunk_tts_text(clean_text): try:
try: audio = await self._tts.synthesize(clean_text)
audio = await self._tts.synthesize(part) except asyncio.CancelledError:
except Exception as exc: raise
await self._publish_system(f"Host TTS failed: {exc}") except Exception as exc:
return import traceback # noqa: local import in exception handler
if not audio: traceback.print_exc()
if not self._tts_unavailable_notice_sent: # Restore the lost text so a future flush can retry it.
self._tts_unavailable_notice_sent = True async with self._tts_buffer_lock:
await self._publish_system( self._tts_chunks.insert(0, clean_text)
f"Host TTS backend is unavailable. {self._tts.unavailable_reason()}" await self._publish_system(f"TTS synthesis error: {exc}")
) return
return
if not self._outbound_track: if not audio:
return if not self._tts_unavailable_notice_sent:
self._extend_stt_suppression(audio) self._tts_unavailable_notice_sent = True
await self._outbound_track.enqueue_pcm( await self._publish_system(
pcm=audio.pcm, f"Host TTS backend is unavailable. {self._tts.unavailable_reason()}"
sample_rate=audio.sample_rate, )
channels=audio.channels, return
)
if not self._outbound_track:
return
self._extend_stt_suppression(audio)
await self._outbound_track.enqueue_pcm(
pcm=audio.pcm,
sample_rate=audio.sample_rate,
channels=audio.channels,
)
def _extend_stt_suppression(self, audio: PCMChunk) -> None: def _extend_stt_suppression(self, audio: PCMChunk) -> None:
if not self._stt_suppress_during_tts: if not self._stt_suppress_during_tts:
@ -1221,7 +1224,6 @@ class WebRTCVoiceSession:
resample_state = None resample_state = None
recording = False recording = False
recording_started_at = 0.0 recording_started_at = 0.0
recording_truncated = False
segment_ms = 0.0 segment_ms = 0.0
segment_buffer = bytearray() segment_buffer = bytearray()
@ -1253,7 +1255,6 @@ class WebRTCVoiceSession:
): ):
recording = False recording = False
recording_started_at = 0.0 recording_started_at = 0.0
recording_truncated = False
segment_ms = 0.0 segment_ms = 0.0
segment_buffer = bytearray() segment_buffer = bytearray()
continue continue
@ -1262,20 +1263,11 @@ class WebRTCVoiceSession:
if not recording: if not recording:
recording = True recording = True
recording_started_at = asyncio.get_running_loop().time() recording_started_at = asyncio.get_running_loop().time()
recording_truncated = False
segment_ms = 0.0 segment_ms = 0.0
segment_buffer = bytearray() segment_buffer = bytearray()
if not recording_truncated: segment_buffer.extend(pcm16)
next_segment_ms = segment_ms + frame_ms segment_ms += frame_ms
if next_segment_ms <= self._stt_max_ptt_ms:
segment_buffer.extend(pcm16)
segment_ms = next_segment_ms
else:
recording_truncated = True
await self._publish_system(
"PTT max length reached; extra audio will be ignored until release."
)
continue continue
if recording: if recording:
@ -1291,7 +1283,6 @@ class WebRTCVoiceSession:
) )
recording = False recording = False
recording_started_at = 0.0 recording_started_at = 0.0
recording_truncated = False
segment_ms = 0.0 segment_ms = 0.0
segment_buffer = bytearray() segment_buffer = bytearray()
except asyncio.CancelledError: except asyncio.CancelledError:
@ -1456,10 +1447,21 @@ class WebRTCVoiceSession:
async def _publish_system(self, text: str) -> None: async def _publish_system(self, text: str) -> None:
await self._gateway.bus.publish(WisperEvent(role="system", text=text)) await self._gateway.bus.publish(WisperEvent(role="system", text=text))
async def _publish_agent_state(self, state: str) -> None:
await self._gateway.bus.publish(WisperEvent(role="agent-state", text=state))
def _on_track_playing_changed(self, playing: bool) -> None:
"""Called from QueueAudioTrack.recv() when audio playback starts or stops."""
try:
loop = asyncio.get_running_loop()
except RuntimeError:
return
loop.create_task(self._publish_agent_state("speaking" if playing else "idle"))
def _clean_tts_text(self, raw_text: str) -> str: def _clean_tts_text(self, raw_text: str) -> str:
lines = [line.strip() for line in raw_text.splitlines() if line.strip()] lines = [line.strip() for line in raw_text.splitlines() if line.strip()]
useful_lines = [ useful_lines = [
line AGENT_PREFIX_RE.sub("", line)
for line in lines for line in lines
if not SPEECH_FILTER_RE.match(line) if not SPEECH_FILTER_RE.match(line)
and not THINKING_STATUS_RE.search(line) and not THINKING_STATUS_RE.search(line)
@ -1468,71 +1470,6 @@ class WebRTCVoiceSession:
] ]
return _sanitize_tts_text(" ".join(useful_lines)) return _sanitize_tts_text(" ".join(useful_lines))
def _chunk_tts_text(self, text: str) -> list[str]:
clean_text = " ".join(text.split())
if not clean_text:
return []
max_chars = max(60, int(self._tts_max_chunk_chars))
if len(clean_text) <= max_chars:
return [clean_text]
sentence_parts = [
part.strip()
for part in re.split(r"(?<=[.!?])\s+", clean_text)
if part.strip()
]
if not sentence_parts:
sentence_parts = [clean_text]
chunks: list[str] = []
for part in sentence_parts:
if len(part) <= max_chars:
chunks.append(part)
else:
chunks.extend(self._chunk_tts_words(part, max_chars))
return chunks
def _chunk_tts_words(self, text: str, max_chars: int) -> list[str]:
words = [word for word in text.split() if word]
if not words:
return []
chunks: list[str] = []
current_words: list[str] = []
current_len = 0
for word in words:
if len(word) > max_chars:
if current_words:
chunks.append(" ".join(current_words))
current_words = []
current_len = 0
start = 0
while start < len(word):
end = min(start + max_chars, len(word))
piece = word[start:end]
if len(piece) == max_chars:
chunks.append(piece)
else:
current_words = [piece]
current_len = len(piece)
start = end
continue
extra = len(word) if not current_words else (1 + len(word))
if current_words and (current_len + extra) > max_chars:
chunks.append(" ".join(current_words))
current_words = [word]
current_len = len(word)
else:
current_words.append(word)
current_len += extra
if current_words:
chunks.append(" ".join(current_words))
return chunks
def _frame_to_pcm16k_mono( def _frame_to_pcm16k_mono(
self, frame: AudioFrame, resample_state: tuple[Any, ...] | None self, frame: AudioFrame, resample_state: tuple[Any, ...] | None
) -> tuple[bytes, float, tuple[Any, ...] | None]: ) -> tuple[bytes, float, tuple[Any, ...] | None]:

View file

@ -1,8 +1,20 @@
import asyncio import asyncio
import contextlib
import os
from dataclasses import dataclass, field from dataclasses import dataclass, field
from datetime import datetime, timezone from datetime import datetime, timezone
def _positive_int_env(name: str, default: int) -> int:
raw_value = os.getenv(name, "").strip()
if not raw_value:
return default
try:
return max(1, int(raw_value))
except ValueError:
return default
@dataclass(slots=True) @dataclass(slots=True)
class WisperEvent: class WisperEvent:
role: str role: str
@ -19,19 +31,29 @@ class WisperBus:
def __init__(self) -> None: def __init__(self) -> None:
self._subscribers: set[asyncio.Queue[WisperEvent]] = set() self._subscribers: set[asyncio.Queue[WisperEvent]] = set()
self._lock = asyncio.Lock() self._lock = asyncio.Lock()
self._subscriber_snapshot: tuple[asyncio.Queue[WisperEvent], ...] = ()
self._subscriber_queue_size = _positive_int_env(
"WISPER_SUBSCRIBER_QUEUE_SIZE", 512
)
async def subscribe(self) -> asyncio.Queue[WisperEvent]: async def subscribe(self) -> asyncio.Queue[WisperEvent]:
queue: asyncio.Queue[WisperEvent] = asyncio.Queue() queue: asyncio.Queue[WisperEvent] = asyncio.Queue(
maxsize=self._subscriber_queue_size
)
async with self._lock: async with self._lock:
self._subscribers.add(queue) self._subscribers.add(queue)
self._subscriber_snapshot = tuple(self._subscribers)
return queue return queue
async def unsubscribe(self, queue: asyncio.Queue[WisperEvent]) -> None: async def unsubscribe(self, queue: asyncio.Queue[WisperEvent]) -> None:
async with self._lock: async with self._lock:
self._subscribers.discard(queue) self._subscribers.discard(queue)
self._subscriber_snapshot = tuple(self._subscribers)
async def publish(self, event: WisperEvent) -> None: async def publish(self, event: WisperEvent) -> None:
async with self._lock: for queue in self._subscriber_snapshot:
subscribers = list(self._subscribers) if queue.full():
for queue in subscribers: with contextlib.suppress(asyncio.QueueEmpty):
queue.put_nowait(event) queue.get_nowait()
with contextlib.suppress(asyncio.QueueFull):
queue.put_nowait(event)