"""Convert gateway events and JSON-RPC notifications into typed message dicts.

Also provides small helpers for SSE framing and for splitting text into
segments sized for text-to-speech.
"""

from __future__ import annotations

import json
import re
from datetime import datetime, timezone
from typing import Any

from card_store import coerce_card_record, persist_card
from workbench_store import persist_workbench_item, workbench_notification_target

_JSONRPC_VERSION = "2.0"

# Sentence / clause boundaries: whitespace that follows ASCII punctuation.
_TTS_SENTENCE_BREAK_RE = re.compile(r"(?<=[.!?])\s+")
_TTS_CLAUSE_BREAK_RE = re.compile(r"(?<=[,;:])\s+")
# Blank-line paragraph separator and generic whitespace run.
_TTS_PARAGRAPH_BREAK_RE = re.compile(r"\n{2,}")
_WHITESPACE_RUN_RE = re.compile(r"\s+")

# Segments are packed greedily up to the target size; a single sentence or
# clause is allowed to exceed the target but never the max.
_TTS_SEGMENT_TARGET_CHARS = 180
_TTS_SEGMENT_MAX_CHARS = 260

_MESSAGE_ROLES = frozenset(
    {"nanobot", "nanobot-progress", "nanobot-tool", "system", "user"}
)
_PROGRESS_ROLES = frozenset({"nanobot-progress", "nanobot-tool"})


def _as_text(value: object) -> str:
    """Return ``str(value)``, mapping ``None`` to the empty string.

    A plain ``str(mapping.get(key, ""))`` turns an explicit ``None`` (for
    example a JSON ``null``) into the literal text ``"None"``; this helper
    treats a ``None`` value the same as a missing key.
    """
    return "" if value is None else str(value)


def encode_sse_data(payload: dict[str, Any]) -> str:
    """Serialize *payload* as a single ``data:`` Server-Sent Events frame.

    ``json.dumps`` without ``indent`` emits one line (newlines inside strings
    are escaped), so the payload cannot terminate the frame early.

    Raises:
        TypeError: if *payload* contains values ``json.dumps`` cannot encode.
    """
    return f"data: {json.dumps(payload, ensure_ascii=False)}\n\n"


def context_label_from_message_metadata(metadata: dict[str, Any]) -> str:
    """Return the stripped ``context_label`` entry of *metadata*.

    Returns ``""`` when the key is missing or its value is ``None``.
    """
    return _as_text(metadata.get("context_label")).strip()


def _decode_object(raw: str) -> dict[str, Any] | None:
    """Parse *raw* as JSON and return it only if it decodes to an object.

    Returns ``None`` for invalid JSON, non-string input, or any JSON value
    that is not an object (list, number, string, ...).
    """
    try:
        payload = json.loads(raw)
    except (TypeError, ValueError):
        return None
    return payload if isinstance(payload, dict) else None


def _split_long_word(word: str, max_chars: int) -> list[str]:
    """Slice *word* into pieces of at most *max_chars* characters.

    Words that already fit are returned unchanged, as is any word when
    *max_chars* is not positive (there is no usable slice width then).
    """
    if max_chars < 1 or len(word) <= max_chars:
        return [word]
    return [word[i : i + max_chars] for i in range(0, len(word), max_chars)]


def _wrap_tts_words(text: str, max_chars: int) -> list[str]:
    """Greedily pack the words of *text* into chunks of at most *max_chars*.

    Words are joined with single spaces. A single token longer than
    *max_chars* (a long URL, or text in a script written without spaces) is
    hard-split so that no returned chunk exceeds the limit.

    Returns an empty list when *text* contains no words.
    """
    pieces = [
        piece
        for word in text.split()
        for piece in _split_long_word(word, max_chars)
    ]
    if not pieces:
        return []

    chunks: list[str] = []
    current = pieces[0]
    for piece in pieces[1:]:
        candidate = f"{current} {piece}"
        if len(candidate) <= max_chars:
            current = candidate
            continue
        chunks.append(current)
        current = piece
    chunks.append(current)
    return chunks


def chunk_tts_text(text: str) -> list[str]:
    """Split *text* into segments suitable for text-to-speech.

    The text is split into paragraphs on blank lines, and each paragraph is
    whitespace-collapsed and split into sentences. Consecutive sentences are
    packed together while the result stays within
    ``_TTS_SEGMENT_TARGET_CHARS``. A sentence longer than
    ``_TTS_SEGMENT_MAX_CHARS`` is broken at clause punctuation, and any clause
    still over the max is word-wrapped. Segments never span paragraphs.

    Returns an empty list for empty or whitespace-only input.
    """
    normalized = text.replace("\r\n", "\n").strip()
    if not normalized:
        return []

    chunks: list[str] = []
    paragraphs = [
        part.strip()
        for part in _TTS_PARAGRAPH_BREAK_RE.split(normalized)
        if part.strip()
    ]
    for paragraph in paragraphs:
        compact = _WHITESPACE_RUN_RE.sub(" ", paragraph).strip()
        if not compact:
            continue

        sentences = [
            sentence.strip()
            for sentence in _TTS_SENTENCE_BREAK_RE.split(compact)
            if sentence.strip()
        ]
        if not sentences:
            sentences = [compact]

        current = ""
        for sentence in sentences:
            parts = [sentence]
            if len(sentence) > _TTS_SEGMENT_MAX_CHARS:
                # Too long to speak in one go: fall back to clause boundaries.
                parts = [
                    clause.strip()
                    for clause in _TTS_CLAUSE_BREAK_RE.split(sentence)
                    if clause.strip()
                ] or [sentence]

            for part in parts:
                if len(part) > _TTS_SEGMENT_MAX_CHARS:
                    # Flush what has been packed so far so ordering is kept,
                    # then word-wrap the oversized part on its own.
                    if current:
                        chunks.append(current)
                        current = ""
                    chunks.extend(_wrap_tts_words(part, _TTS_SEGMENT_MAX_CHARS))
                    continue

                candidate = part if not current else f"{current} {part}"
                if len(candidate) <= _TTS_SEGMENT_TARGET_CHARS:
                    current = candidate
                    continue
                if current:
                    chunks.append(current)
                current = part

        # Flush per paragraph so segments never straddle a paragraph break.
        if current:
            chunks.append(current)

    # Defensive fallback: non-empty input should always have produced a chunk.
    return chunks or [_WHITESPACE_RUN_RE.sub(" ", normalized).strip()]


def _tagged(record: dict[str, Any] | None, type_name: str) -> dict[str, Any] | None:
    """Return a shallow copy of *record* with ``"type"`` set to *type_name*.

    ``None`` is passed through so callers can forward a failed persist/coerce
    result directly.
    """
    if record is None:
        return None
    # NOTE(review): copying avoids writing "type" into the dict handed back by
    # the store, in case the store keeps a reference to it -- confirm.
    tagged = dict(record)
    tagged["type"] = type_name
    return tagged


def _with_default_chat_id(
    params: dict[str, Any], default_chat_id: str | None
) -> dict[str, Any]:
    """Return *params* with ``chat_id`` filled from *default_chat_id* if blank.

    A missing, ``None``, empty, or whitespace-only ``chat_id`` counts as
    blank. *params* itself is never mutated; a new dict is returned when the
    default is applied.
    """
    if default_chat_id and not _as_text(params.get("chat_id")).strip():
        return {**params, "chat_id": default_chat_id}
    return params


def typed_message_from_gateway_event(event_dict: dict[str, Any]) -> dict[str, Any] | None:
    """Convert a gateway event into a typed message dict.

    The event's ``role`` selects the output:

    * ``"agent-state"`` -> ``{"type": "agent_state", "state": <text>}``
    * chat roles (``nanobot``, ``nanobot-progress``, ``nanobot-tool``,
      ``system``, ``user``) -> a ``"message"`` dict
    * ``"card"`` -> the JSON object in ``text``, persisted as a workbench item
      when ``workbench_notification_target`` is truthy for it, otherwise
      coerced with ``coerce_card_record``
    * ``"workbench"`` -> the JSON object in ``text``, persisted as a workbench
      item

    Returns ``None`` for unknown roles, for card/workbench text that is not a
    JSON object, or when the store/coerce call returns ``None``.
    """
    role = _as_text(event_dict.get("role")).strip()
    text = _as_text(event_dict.get("text"))
    timestamp = _as_text(event_dict.get("timestamp"))

    if role == "agent-state":
        return {"type": "agent_state", "state": text}

    if role in _MESSAGE_ROLES:
        return {
            "type": "message",
            "role": role,
            "content": text,
            "is_progress": role in _PROGRESS_ROLES,
            "is_tool_hint": role == "nanobot-tool",
            "timestamp": timestamp,
        }

    if role in {"card", "workbench"}:
        payload = _decode_object(text)
        if payload is None:
            return None
        # The routing predicate is only consulted for "card" events.
        if role == "workbench" or workbench_notification_target(payload):
            return _tagged(persist_workbench_item(payload), "workbench")
        return _tagged(coerce_card_record(payload), "card")

    return None


def typed_message_from_api_notification(
    obj: dict[str, Any], *, default_chat_id: str | None = None
) -> dict[str, Any] | None:
    """Convert a JSON-RPC 2.0 notification into a typed message dict.

    Args:
        obj: Decoded notification. It must carry ``"jsonrpc": "2.0"`` and a
            ``"method"`` key; ``"params"`` is treated as ``{}`` unless it is
            a dict.
        default_chat_id: Chat id applied to ``card`` / ``workbench`` params
            whose own ``chat_id`` is missing, ``None``, or blank.

    Returns:
        A dict tagged with ``"type"`` (``"message"``, ``"agent_state"``,
        ``"card"`` or ``"workbench"``), or ``None`` when *obj* is not a
        recognised notification or the store returns ``None``.
    """
    if obj.get("jsonrpc") != _JSONRPC_VERSION or "method" not in obj:
        return None

    method = _as_text(obj.get("method")).strip()
    params = obj.get("params", {})
    if not isinstance(params, dict):
        params = {}

    if method == "message":
        is_progress = bool(params.get("is_progress", False))
        is_tool_hint = bool(params.get("is_tool_hint", False))
        # A tool hint takes precedence over a plain progress update.
        if is_tool_hint:
            role = "nanobot-tool"
        elif is_progress:
            role = "nanobot-progress"
        else:
            role = "nanobot"
        return {
            "type": "message",
            "role": role,
            "content": _as_text(params.get("content")),
            "is_progress": is_progress,
            "is_tool_hint": is_tool_hint,
            # Stamped here with the current UTC time.
            "timestamp": datetime.now(timezone.utc).isoformat(),
        }

    if method == "agent_state":
        return {"type": "agent_state", "state": _as_text(params.get("state")).strip()}

    if method in {"card", "workbench"}:
        # Decide routing on the params as received, before any defaulting;
        # the predicate is only consulted for "card" notifications.
        to_workbench = method == "workbench" or bool(
            workbench_notification_target(params)
        )
        params = _with_default_chat_id(params, default_chat_id)
        if to_workbench:
            persisted = persist_workbench_item(params, default_chat_id=default_chat_id)
            return _tagged(persisted, "workbench")
        return _tagged(persist_card(params), "card")

    return None