import asyncio import contextlib import os from dataclasses import dataclass, field from datetime import datetime, timezone def _positive_int_env(name: str, default: int) -> int: raw_value = os.getenv(name, "").strip() if not raw_value: return default try: return max(1, int(raw_value)) except ValueError: return default @dataclass(slots=True) class WisperEvent: role: str text: str timestamp: str = field( default_factory=lambda: datetime.now(timezone.utc).isoformat(timespec="seconds") ) def to_dict(self) -> dict[str, str]: return {"role": self.role, "text": self.text, "timestamp": self.timestamp} class WisperBus: def __init__(self) -> None: self._subscribers: set[asyncio.Queue[WisperEvent]] = set() self._lock = asyncio.Lock() self._subscriber_snapshot: tuple[asyncio.Queue[WisperEvent], ...] = () self._subscriber_queue_size = _positive_int_env( "WISPER_SUBSCRIBER_QUEUE_SIZE", 512 ) async def subscribe(self) -> asyncio.Queue[WisperEvent]: queue: asyncio.Queue[WisperEvent] = asyncio.Queue( maxsize=self._subscriber_queue_size ) async with self._lock: self._subscribers.add(queue) self._subscriber_snapshot = tuple(self._subscribers) return queue async def unsubscribe(self, queue: asyncio.Queue[WisperEvent]) -> None: async with self._lock: self._subscribers.discard(queue) self._subscriber_snapshot = tuple(self._subscribers) async def publish(self, event: WisperEvent) -> None: for queue in self._subscriber_snapshot: if queue.full(): with contextlib.suppress(asyncio.QueueEmpty): queue.get_nowait() with contextlib.suppress(asyncio.QueueFull): queue.put_nowait(event)