from __future__ import annotations import asyncio import unittest from ui_event_bus import UiEventBus class UiEventBusTests(unittest.IsolatedAsyncioTestCase): async def test_publish_targets_chat_or_broadcasts(self) -> None: bus = UiEventBus() first_subscription, first_queue = await bus.subscribe("chat-a") second_subscription, second_queue = await bus.subscribe("chat-b") self.addAsyncCleanup(bus.unsubscribe, first_subscription) self.addAsyncCleanup(bus.unsubscribe, second_subscription) await bus.publish({"type": "cards.changed", "chat_id": "chat-a"}, chat_id="chat-a") self.assertEqual((await asyncio.wait_for(first_queue.get(), timeout=0.1))["type"], "cards.changed") self.assertTrue(second_queue.empty()) await bus.publish({"type": "sessions.changed"}, broadcast=True) self.assertEqual( (await asyncio.wait_for(first_queue.get(), timeout=0.1))["type"], "sessions.changed", ) self.assertEqual( (await asyncio.wait_for(second_queue.get(), timeout=0.1))["type"], "sessions.changed", ) async def test_publish_drops_oldest_entries_when_queue_is_full(self) -> None: bus = UiEventBus() subscription_id, queue = await bus.subscribe("chat-a") self.addAsyncCleanup(bus.unsubscribe, subscription_id) for seq in range(70): await bus.publish({"seq": seq}, chat_id="chat-a") self.assertEqual(queue.qsize(), 64) self.assertEqual((await queue.get())["seq"], 6)