# File: nanobot-voice-interface/ui_event_bus.py
# (Repository web-viewer header -- "48 lines, 1.6 KiB, Python" -- was pasted in with the code.)

from __future__ import annotations
import asyncio
import uuid
from collections.abc import Iterable
from typing import Any
class UiEventBus:
    """Fan out UI event payloads to per-subscriber ``asyncio`` queues.

    Every subscription owns one bounded queue tagged with a chat id.
    ``publish`` places a shallow copy of the payload on the queue of each
    matching subscriber. When a queue is already full, its oldest entries are
    discarded to make room, so publishing never waits on a slow consumer.
    """

    # Default bound for each subscriber queue.
    DEFAULT_MAX_QUEUE_SIZE = 64

    def __init__(self, max_queue_size: int = DEFAULT_MAX_QUEUE_SIZE) -> None:
        """Create an empty bus.

        Args:
            max_queue_size: Bound passed to ``asyncio.Queue(maxsize=...)`` for
                every queue created by ``subscribe``. Values <= 0 make the
                queues unbounded, following ``asyncio.Queue`` semantics.
        """
        self._max_queue_size = max_queue_size
        # Guards every read and write of the subscriber registry.
        self._lock = asyncio.Lock()
        # subscription id -> (chat id, queue delivering to that subscriber)
        self._subscribers: dict[str, tuple[str, asyncio.Queue[dict[str, Any]]]] = {}

    async def subscribe(self, chat_id: str) -> tuple[str, asyncio.Queue[dict[str, Any]]]:
        """Register a new subscriber for ``chat_id``.

        Returns:
            A ``(subscription_id, queue)`` pair. The id is a random hex string
            to hand back to ``unsubscribe``; the queue receives the payloads
            published for this chat id or broadcast to everyone.
        """
        queue: asyncio.Queue[dict[str, Any]] = asyncio.Queue(
            maxsize=self._max_queue_size
        )
        subscription_id = uuid.uuid4().hex
        async with self._lock:
            self._subscribers[subscription_id] = (chat_id, queue)
        return subscription_id, queue

    async def unsubscribe(self, subscription_id: str) -> None:
        """Remove a subscription; unknown ids are silently ignored."""
        async with self._lock:
            self._subscribers.pop(subscription_id, None)

    async def publish(
        self,
        payload: dict[str, Any],
        *,
        chat_id: str | None = None,
        broadcast: bool = False,
    ) -> None:
        """Deliver ``payload`` to the matching subscribers without blocking.

        Args:
            payload: Event data. Each subscriber receives its own shallow copy.
            chat_id: Deliver only to subscribers registered with this chat id.
                With ``None`` and ``broadcast=False`` nobody matches.
            broadcast: When true, deliver to every subscriber regardless of
                ``chat_id``.
        """
        # Snapshot the targets under the lock; the queue operations below are
        # non-blocking and operate on the snapshot only.
        async with self._lock:
            targets = list(
                self._iter_target_queues(chat_id=chat_id, broadcast=broadcast)
            )
        for queue in targets:
            # Drop-oldest policy: evict stale events until there is room.
            while queue.full():
                try:
                    queue.get_nowait()
                except asyncio.QueueEmpty:
                    break
            queue.put_nowait(dict(payload))

    def _iter_target_queues(
        self, *, chat_id: str | None, broadcast: bool
    ) -> Iterable[asyncio.Queue[dict[str, Any]]]:
        """Yield the queue of every subscriber selected by the arguments."""
        for subscriber_chat_id, queue in self._subscribers.values():
            if broadcast or subscriber_chat_id == chat_id:
                yield queue