# On-disk card store: path layout, identifier normalisation, and record
# coercion helpers for card instances and card templates.
#
# NOTE(review): this section was recovered from a copy of the file whose
# line breaks/indentation had been collapsed and whose HTML-looking string
# fragments had been stripped. Formatting is restored here; the one literal
# that had to be reconstructed is called out where it occurs.
from __future__ import annotations

import html
import json
import os
import re
import shutil
from datetime import datetime, timezone
from pathlib import Path
from typing import Any

# Workspace root; overridable through the NANOBOT_WORKSPACE environment variable.
NANOBOT_WORKSPACE = Path(os.getenv("NANOBOT_WORKSPACE", str(Path.home() / ".nanobot"))).expanduser()
CARDS_ROOT = NANOBOT_WORKSPACE / "cards"
CARD_INSTANCES_DIR = CARDS_ROOT / "instances"
CARD_TEMPLATES_DIR = CARDS_ROOT / "templates"
TEMPLATES_CONTEXT_PATH = NANOBOT_WORKSPACE / "CARD_TEMPLATES.md"

MAX_TEMPLATES_IN_PROMPT = 12
MAX_TEMPLATE_HTML_CHARS = 4000

_INVALID_TEMPLATE_KEY_CHARS = re.compile(r"[^a-z0-9_-]+")
_CARD_ID_PATTERN = re.compile(r"^[a-zA-Z0-9_-]{1,128}$")

# Rank tables: lower rank sorts first; they double as the sets of valid values.
_CARD_LANE_ORDER = {"attention": 0, "work": 1, "context": 2, "history": 3}
_CARD_STATE_ORDER = {"active": 0, "stale": 1, "resolved": 2, "superseded": 3, "archived": 4}


def ensure_card_store_dirs() -> None:
    """Create the card instance and template directories if they are missing."""
    CARD_INSTANCES_DIR.mkdir(parents=True, exist_ok=True)
    CARD_TEMPLATES_DIR.mkdir(parents=True, exist_ok=True)


def _normalize_template_key(raw: str) -> str:
    """Return *raw* as a lowercase ``[a-z0-9_-]`` key of at most 64 characters.

    Runs of any other character collapse to a single ``-`` and leading or
    trailing dashes are removed. An empty string means no usable key.
    """
    key = _INVALID_TEMPLATE_KEY_CHARS.sub("-", raw.strip().lower()).strip("-")
    # Truncation can expose a dash at the cut; strip it so that normalising
    # an already-normalised key is a no-op (the key doubles as a directory name).
    return key[:64].rstrip("-")


def _normalize_card_id(raw: str) -> str:
    """Return the stripped card id, or ``""`` unless it is 1-128 chars of ``[a-zA-Z0-9_-]``."""
    card_id = raw.strip()
    return card_id if _CARD_ID_PATTERN.fullmatch(card_id) else ""


def _card_instance_dir(card_id: str) -> Path | None:
    """Return the directory for *card_id*, or ``None`` when the id is invalid."""
    card_id_clean = _normalize_card_id(card_id)
    if not card_id_clean:
        return None
    return CARD_INSTANCES_DIR / card_id_clean


def _card_meta_path(card_id: str) -> Path | None:
    """Return the ``card.json`` path for *card_id*, or ``None`` when the id is invalid."""
    instance_dir = _card_instance_dir(card_id)
    if instance_dir is None:
        return None
    return instance_dir / "card.json"


def _card_state_path(card_id: str) -> Path | None:
    """Return the ``state.json`` path for *card_id*, or ``None`` when the id is invalid."""
    instance_dir = _card_instance_dir(card_id)
    if instance_dir is None:
        return None
    return instance_dir / "state.json"


def _parse_iso_datetime(raw: str) -> datetime | None:
    """Parse an ISO-8601 timestamp into an aware UTC ``datetime``.

    Returns ``None`` for blank or unparseable input. Naive timestamps are
    taken to be UTC.
    """
    value = raw.strip()
    if not value:
        return None
    try:
        # Spell "Z" as an explicit offset so the value parses on interpreters
        # whose fromisoformat() does not accept the "Z" suffix.
        parsed = datetime.fromisoformat(value.replace("Z", "+00:00"))
    except ValueError:
        return None
    if parsed.tzinfo is None:
        parsed = parsed.replace(tzinfo=timezone.utc)
    return parsed.astimezone(timezone.utc)


def _coerce_card_record(raw: dict[str, Any]) -> dict[str, Any] | None:
    """Return a fully populated card record built from *raw*.

    Unknown ``kind``/``lane``/``state`` values fall back to ``"text"``,
    ``"context"`` and ``"active"``; ``priority`` is clamped to 0..100 and
    defaults to 50 when it cannot be converted. Returns ``None`` when
    ``raw["id"]`` is not a valid card id.
    """
    card_id = _normalize_card_id(str(raw.get("id", "")))
    if not card_id:
        return None

    kind = str(raw.get("kind", "text") or "text").strip().lower()
    if kind not in {"text", "question"}:
        kind = "text"

    lane = str(raw.get("lane", "context") or "context").strip().lower()
    if lane not in _CARD_LANE_ORDER:
        lane = "context"

    state = str(raw.get("state", "active") or "active").strip().lower()
    if state not in _CARD_STATE_ORDER:
        state = "active"

    try:
        priority = int(raw.get("priority", 50))
    except (TypeError, ValueError, OverflowError):
        # OverflowError: int(float("inf")); JSON may carry Infinity.
        priority = 50
    priority = max(0, min(priority, 100))

    raw_choices = raw.get("choices", [])
    choices = [str(choice) for choice in raw_choices] if isinstance(raw_choices, list) else []

    raw_template_state = raw.get("template_state", {})
    template_state = raw_template_state if isinstance(raw_template_state, dict) else {}

    return {
        "id": card_id,
        "kind": kind,
        "title": str(raw.get("title", "")),
        "content": str(raw.get("content", "")),
        "question": str(raw.get("question", "")),
        "choices": choices,
        "response_value": str(raw.get("response_value", "")),
        "slot": str(raw.get("slot", "")),
        "lane": lane,
        "priority": priority,
        "state": state,
        "template_key": str(raw.get("template_key", "")),
        "template_state": template_state,
        "context_summary": str(raw.get("context_summary", "")),
        "chat_id": str(raw.get("chat_id", "web") or "web"),
        "snooze_until": str(raw.get("snooze_until", "") or ""),
        "created_at": str(raw.get("created_at", "")),
        "updated_at": str(raw.get("updated_at", "")),
    }


def _json_script_text(payload: dict[str, Any]) -> str:
    """Serialise *payload* as JSON that is safe to embed inside a ``<script>`` element."""
    # NOTE(review): the replacement literals were lost in the recovered text;
    # this is the conventional escape. "<\/" is still valid JSON for "</", but
    # it keeps a "</script>" inside a string value from closing the element.
    return json.dumps(payload, ensure_ascii=False).replace("</", "<\\/")


def _template_dir(template_key: str) -> Path:
    """Return the directory that holds the template named *template_key*."""
    return CARD_TEMPLATES_DIR / template_key


def _template_html_path(template_key: str) -> Path:
    """Return the path of the template's ``template.html``."""
    return _template_dir(template_key) / "template.html"


def _template_meta_path(template_key: str) -> Path:
    """Return the path of the template's ``manifest.json``."""
    return _template_dir(template_key) / "manifest.json"
def _read_template_meta(template_key: str) -> dict[str, Any]:
    """Return the parsed ``manifest.json`` of *template_key*.

    Best-effort: a missing, unreadable, malformed, or non-object manifest
    yields an empty dict instead of raising.
    """
    try:
        meta = json.loads(_template_meta_path(template_key).read_text(encoding="utf-8"))
    except (OSError, ValueError, RecursionError):
        # ValueError covers both bad JSON and bad UTF-8.
        return {}
    return meta if isinstance(meta, dict) else {}


def _materialize_card_content(card: dict[str, Any]) -> str:
    """Return the HTML to display for *card*.

    Cards that are not ``kind == "text"`` or that have no ``template_key``
    return their stored ``content`` unchanged. Otherwise the template HTML is
    wrapped together with the card's ``template_state`` serialised as JSON;
    a template that cannot be read yields a "Missing template" placeholder.

    NOTE(review): the HTML literals in this function were stripped from the
    recovered text. The element structure below follows what survived (the
    adjacent-literal layout and the variables that were computed), but the
    exact attribute names/styling are reconstructions - confirm them against
    the front-end that consumes this markup.
    """
    stored_content = str(card.get("content", ""))
    if card.get("kind") != "text":
        return stored_content
    template_key = str(card.get("template_key", "")).strip()
    if not template_key:
        return stored_content

    # Security: the key comes from card data and is joined into a filesystem
    # path. Normalising it first removes separators, "..", and absolute paths
    # so the lookup cannot leave the templates directory.
    lookup_key = _normalize_template_key(template_key)
    template_html: str | None = None
    if lookup_key:
        try:
            template_html = _template_html_path(lookup_key).read_text(encoding="utf-8")
        except (OSError, ValueError):
            template_html = None
    if template_html is None:
        return (
            "<div>"
            f"Missing template: {html.escape(template_key)}"
            "</div>"
        )

    state_payload = card.get("template_state", {})
    if not isinstance(state_payload, dict):
        state_payload = {}
    card_id = html.escape(str(card.get("id", "")))
    safe_template_key = html.escape(template_key)
    return (
        f'<div data-nanobot-card-root data-card-id="{card_id}" data-template-key="{safe_template_key}">'
        f'<script type="application/json" data-card-state>{_json_script_text(state_payload)}</script>'
        f"{template_html}"
        "</div>"
    )


def _load_card(card_id: str) -> dict[str, Any] | None:
    """Load one card from disk, or return ``None`` if it is absent or invalid.

    ``state.json``, when present, replaces the record's ``template_state``
    (an unreadable state file counts as ``{}``). The returned record's
    ``content`` is the materialised HTML, not the stored value.
    """
    meta_path = _card_meta_path(card_id)
    if meta_path is None:
        return None
    try:
        raw = json.loads(meta_path.read_text(encoding="utf-8"))
    except (OSError, ValueError, RecursionError):
        # Includes FileNotFoundError: an absent card is simply "not found".
        return None
    if not isinstance(raw, dict):
        return None

    state_path = _card_state_path(card_id)
    if state_path is not None and state_path.exists():
        try:
            raw_state = json.loads(state_path.read_text(encoding="utf-8"))
        except (OSError, ValueError, RecursionError):
            raw_state = {}
        if isinstance(raw_state, dict):
            raw["template_state"] = raw_state

    card = _coerce_card_record(raw)
    if card is None:
        return None
    card["content"] = _materialize_card_content(card)
    return card


def _write_card(card: dict[str, Any]) -> dict[str, Any] | None:
    """Persist *card* and return it as re-read from disk.

    ``card.json`` receives every field except ``template_state``, which goes
    to ``state.json`` for text cards; for other kinds any ``state.json`` is
    removed. ``created_at`` of an existing card is preserved. Returns
    ``None`` when the record has no valid id.
    """
    normalized = _coerce_card_record(card)
    if normalized is None:
        return None
    card_id = normalized["id"]
    instance_dir = _card_instance_dir(card_id)
    meta_path = _card_meta_path(card_id)
    state_path = _card_state_path(card_id)
    if instance_dir is None or meta_path is None or state_path is None:
        return None

    now = datetime.now(timezone.utc).isoformat()
    existing = _load_card(card_id)
    previous_created = existing.get("created_at") if existing is not None else None
    normalized["created_at"] = previous_created or normalized.get("created_at") or now
    normalized["updated_at"] = normalized.get("updated_at") or now

    instance_dir.mkdir(parents=True, exist_ok=True)
    template_state = normalized.pop("template_state", {})
    meta_path.write_text(
        json.dumps(normalized, indent=2, ensure_ascii=False) + "\n",
        encoding="utf-8",
    )
    if normalized["kind"] == "text":
        state_path.write_text(
            json.dumps(
                template_state if isinstance(template_state, dict) else {},
                indent=2,
                ensure_ascii=False,
            )
            + "\n",
            encoding="utf-8",
        )
    else:
        state_path.unlink(missing_ok=True)
    return _load_card(card_id)


def _sort_cards(cards: list[dict[str, Any]]) -> list[dict[str, Any]]:
    """Return *cards* ordered by lane, state, priority (high first), then timestamps."""

    def sort_key(item: dict[str, Any]) -> tuple[int, int, int, str, str]:
        return (
            _CARD_LANE_ORDER.get(str(item.get("lane", "context")), 99),
            _CARD_STATE_ORDER.get(str(item.get("state", "active")), 99),
            -int(item.get("priority", 0) or 0),
            str(item.get("updated_at", "")),
            str(item.get("created_at", "")),
        )

    return sorted(cards, key=sort_key)


def _load_cards(chat_id: str | None = None) -> list[dict[str, Any]]:
    """Return the visible cards, sorted for display.

    Archived cards and cards whose ``snooze_until`` lies in the future are
    left out. When *chat_id* is given, only that chat's cards are returned.
    """
    ensure_card_store_dirs()
    now = datetime.now(timezone.utc)
    cards: list[dict[str, Any]] = []
    for instance_dir in CARD_INSTANCES_DIR.iterdir():
        if not instance_dir.is_dir():
            continue
        card = _load_card(instance_dir.name)
        if card is None:
            continue
        if chat_id is not None and str(card.get("chat_id", "web") or "web") != chat_id:
            continue
        if card.get("state") == "archived":
            continue
        snooze_until = _parse_iso_datetime(str(card.get("snooze_until", "") or ""))
        if snooze_until is not None and snooze_until > now:
            continue
        cards.append(card)
    return _sort_cards(cards)


def _find_card_by_slot(slot: str, *, chat_id: str) -> dict[str, Any] | None:
    """Return the first visible card of *chat_id* that occupies *slot*, if any."""
    target_slot = slot.strip()
    if not target_slot:
        return None
    for card in _load_cards():
        if str(card.get("chat_id", "")).strip() != chat_id:
            continue
        if str(card.get("slot", "")).strip() == target_slot:
            return card
    return None


def _persist_card(card: dict[str, Any]) -> dict[str, Any] | None:
    """Write *card*, first marking any other card in the same slot as superseded.

    Returns the stored record, or ``None`` when *card* has no valid id.
    """
    normalized = _coerce_card_record(card)
    if normalized is None:
        return None

    slot = str(normalized.get("slot", "")).strip()
    chat_id = str(normalized.get("chat_id", "web") or "web")
    existing_same_slot = _find_card_by_slot(slot, chat_id=chat_id) if slot else None
    if existing_same_slot is not None and existing_same_slot["id"] != normalized["id"]:
        superseded = dict(existing_same_slot)
        superseded["state"] = "superseded"
        superseded["updated_at"] = datetime.now(timezone.utc).isoformat()
        _write_card(superseded)
    return _write_card(normalized)


def _delete_card(card_id: str) -> bool:
    """Remove the card's directory; return ``True`` only if it is actually gone."""
    instance_dir = _card_instance_dir(card_id)
    if instance_dir is None or not instance_dir.exists():
        return False
    shutil.rmtree(instance_dir, ignore_errors=True)
    # ignore_errors hides failures, so report success from the outcome.
    return not instance_dir.exists()


def _delete_cards_for_chat(chat_id: str) -> int:
    """Delete every card that belongs to *chat_id*; return how many were removed."""
    ensure_card_store_dirs()
    removed = 0
    for instance_dir in CARD_INSTANCES_DIR.iterdir():
        if not instance_dir.is_dir():
            continue
        card = _load_card(instance_dir.name)
        if card is None:
            continue
        if str(card.get("chat_id", "web") or "web") != chat_id:
            continue
        if _delete_card(instance_dir.name):
            removed += 1
    return removed


def _list_templates(limit: int | None = None) -> list[dict[str, Any]]:
    """Return non-deprecated templates, most recently updated first.

    Directories whose ``template.html`` is missing or unreadable are skipped.
    Timestamps come from the manifest when present, otherwise from the file's
    ``st_ctime``/``st_mtime``. *limit*, when given, caps the result length.
    """
    ensure_card_store_dirs()
    templates: list[dict[str, Any]] = []
    for entry in CARD_TEMPLATES_DIR.iterdir():
        if not entry.is_dir():
            continue
        key = _normalize_template_key(entry.name)
        if not key:
            continue
        html_path = _template_html_path(key)
        try:
            content = html_path.read_text(encoding="utf-8")
            # Kept inside the try: the file may vanish between the two calls.
            stat = html_path.stat()
        except (OSError, ValueError):
            continue
        meta = _read_template_meta(key)
        if bool(meta.get("deprecated")):
            continue
        created_at = str(
            meta.get("created_at")
            or datetime.fromtimestamp(stat.st_ctime, timezone.utc).isoformat()
        )
        updated_at = str(
            meta.get("updated_at")
            or datetime.fromtimestamp(stat.st_mtime, timezone.utc).isoformat()
        )
        templates.append(
            {
                "id": key,
                "key": key,
                "title": str(meta.get("title", "")),
                "content": content,
                "notes": str(meta.get("notes", "")),
                "example_state": meta.get("example_state", {}),
                "created_at": created_at,
                "updated_at": updated_at,
                "file_url": f"/card-templates/{key}/template.html",
            }
        )
    templates.sort(key=lambda item: item["updated_at"], reverse=True)
    if limit is not None:
        return templates[: max(0, limit)]
    return templates


def _render_templates_markdown(rows: list[dict[str, Any]]) -> str:
    """Render template *rows* as the Markdown written to the templates context file.

    NOTE(review): whitespace inside string literals was collapsed in the
    recovered text, so the one-space indents below may originally have been
    wider - confirm against a pristine copy.
    """
    lines = [
        "# Card Templates",
        "",
        "These are user-approved template layouts for `mcp_display_render_card` cards.",
        "Each card instance should provide a `template_key` and a `template_state` JSON object.",
        "Use a matching template when the request intent fits.",
        "Do not rewrite the HTML layout when an existing template already fits; fill the template_state instead.",
        "",
    ]
    for row in rows:
        key = str(row.get("key", "")).strip() or "unnamed"
        title = str(row.get("title", "")).strip() or "(untitled)"
        notes = str(row.get("notes", "")).strip() or "(no usage notes)"
        content = str(row.get("content", "")).strip()
        example_state = row.get("example_state", {})
        if len(content) > MAX_TEMPLATE_HTML_CHARS:
            # Mark the cut so a reader does not mistake the HTML for complete
            # (the recovered text appended only a newline, which is invisible
            # after splitlines()).
            content = content[:MAX_TEMPLATE_HTML_CHARS] + "\n<!-- truncated -->"
        html_lines = [f" {line}" for line in content.splitlines()] if content else [" "]
        state_text = (
            json.dumps(example_state, indent=2, ensure_ascii=False)
            if isinstance(example_state, dict)
            else "{}"
        )
        state_lines = [f" {line}" for line in state_text.splitlines()]
        lines.extend(
            [
                f"## {key}",
                f"- Title: {title}",
                f"- Usage: {notes}",
                "- Example State:",
                *state_lines,
                "- HTML:",
                *html_lines,
                "",
            ]
        )
    return "\n".join(lines).rstrip() + "\n"


def _sync_templates_context_file() -> None:
    """Rewrite the templates context file from the templates on disk.

    The file is removed when no templates exist. Best-effort: any failure is
    swallowed so that a sync problem never breaks the caller.
    """
    try:
        ensure_card_store_dirs()
        rows = _list_templates(limit=MAX_TEMPLATES_IN_PROMPT)
        if not rows:
            TEMPLATES_CONTEXT_PATH.unlink(missing_ok=True)
            return
        TEMPLATES_CONTEXT_PATH.parent.mkdir(parents=True, exist_ok=True)
        TEMPLATES_CONTEXT_PATH.write_text(_render_templates_markdown(rows), encoding="utf-8")
    except Exception:
        # Deliberately broad: this is the best-effort boundary.
        return


# Public aliases for the private helpers.
normalize_template_key = _normalize_template_key
normalize_card_id = _normalize_card_id
parse_iso_datetime = _parse_iso_datetime
coerce_card_record = _coerce_card_record
template_dir = _template_dir
template_html_path = _template_html_path
template_meta_path = _template_meta_path
read_template_meta = _read_template_meta
load_card = _load_card
load_cards = _load_cards
write_card = _write_card
persist_card = _persist_card
delete_card = _delete_card
delete_cards_for_chat = _delete_cards_for_chat
list_templates = _list_templates
sync_templates_context_file = _sync_templates_context_file
json_script_text = _json_script_text

__all__ = [
    "CARD_INSTANCES_DIR",
    "CARD_TEMPLATES_DIR",
    "NANOBOT_WORKSPACE",
    "TEMPLATES_CONTEXT_PATH",
    "coerce_card_record",
    "delete_card",
    "delete_cards_for_chat",
    "ensure_card_store_dirs",
    "json_script_text",
    "list_templates",
    "load_card",
    "load_cards",
    "normalize_card_id",
    "normalize_template_key",
    "parse_iso_datetime",
    "persist_card",
    "read_template_meta",
    "sync_templates_context_file",
    "template_dir",
    "template_html_path",
    "template_meta_path",
    "write_card",
]