from __future__ import annotations

import asyncio
import uuid
from collections.abc import Iterable
from typing import Any


class UiEventBus:
    """Fan out UI event payloads to subscribers, keyed by chat id.

    Each subscription owns its own ``asyncio.Queue``. Publishing never blocks
    on a slow consumer: when a subscriber's queue is full, its oldest pending
    payloads are discarded to make room for the new one.
    """

    def __init__(self, *, max_queue_size: int = 64) -> None:
        """Create an empty bus.

        Args:
            max_queue_size: Capacity of each subscriber queue created by
                :meth:`subscribe`. It is passed straight to
                ``asyncio.Queue(maxsize=...)``, so a value ``<= 0`` yields an
                unbounded queue and nothing is ever dropped.
        """
        self._max_queue_size = max_queue_size
        # Serializes access to the subscriber registry between coroutines.
        self._lock = asyncio.Lock()
        # subscription id -> (chat id, queue owned by that subscription)
        self._subscribers: dict[
            str, tuple[str, asyncio.Queue[dict[str, Any]]]
        ] = {}

    async def subscribe(
        self, chat_id: str
    ) -> tuple[str, asyncio.Queue[dict[str, Any]]]:
        """Register a new subscriber for ``chat_id``.

        Returns:
            A ``(subscription_id, queue)`` pair. ``subscription_id`` is a
            random hex token to hand back to :meth:`unsubscribe`; ``queue``
            receives a copy of every payload published to ``chat_id`` or
            broadcast.
        """
        queue: asyncio.Queue[dict[str, Any]] = asyncio.Queue(
            maxsize=self._max_queue_size
        )
        subscription_id = uuid.uuid4().hex
        async with self._lock:
            self._subscribers[subscription_id] = (chat_id, queue)
        return subscription_id, queue

    async def unsubscribe(self, subscription_id: str) -> None:
        """Remove a subscription; unknown ids are silently ignored."""
        async with self._lock:
            self._subscribers.pop(subscription_id, None)

    async def publish(
        self,
        payload: dict[str, Any],
        *,
        chat_id: str | None = None,
        broadcast: bool = False,
    ) -> None:
        """Deliver ``payload`` to the matching subscribers.

        Args:
            payload: Event data. Every subscriber receives its own shallow
                copy (``dict(payload)``).
            chat_id: Deliver only to subscriptions registered with this chat
                id. Ignored when ``broadcast`` is true.
            broadcast: When true, deliver to every subscription.
        """
        # Snapshot the targets under the lock so the registry can change
        # while the payload is being delivered.
        async with self._lock:
            targets = list(
                self._iter_target_queues(chat_id=chat_id, broadcast=broadcast)
            )
        for queue in targets:
            self._offer(queue, payload)

    @staticmethod
    def _offer(
        queue: asyncio.Queue[dict[str, Any]], payload: dict[str, Any]
    ) -> None:
        """Enqueue a copy of ``payload``, evicting the oldest items if full."""
        # Only non-blocking queue calls are used here, so there is no await
        # between the capacity check and the put.
        while queue.full():
            try:
                queue.get_nowait()
            except asyncio.QueueEmpty:
                break
        queue.put_nowait(dict(payload))

    def _iter_target_queues(
        self, *, chat_id: str | None, broadcast: bool
    ) -> Iterable[asyncio.Queue[dict[str, Any]]]:
        """Yield the queues of subscriptions selected by the arguments.

        Reads the registry directly without taking the lock; ``publish``
        calls it while holding the lock.
        """
        for subscriber_chat_id, queue in self._subscribers.values():
            if broadcast or subscriber_chat_id == chat_id:
                yield queue