from __future__ import annotations import json import os import re from datetime import datetime, timezone from pathlib import Path from typing import Any, Callable from card_store import NANOBOT_WORKSPACE NANOBOT_RUNTIME_WORKSPACE = Path( os.getenv( "NANOBOT_RUNTIME_WORKSPACE", str( (NANOBOT_WORKSPACE / "workspace") if (NANOBOT_WORKSPACE / "workspace").exists() else NANOBOT_WORKSPACE ), ) ).expanduser() SESSIONS_DIR = NANOBOT_RUNTIME_WORKSPACE / "sessions" _SESSION_CHAT_ID_PATTERN = re.compile(r"^[a-zA-Z0-9_-]{1,128}$") def ensure_session_store_dirs() -> None: SESSIONS_DIR.mkdir(parents=True, exist_ok=True) def _normalize_session_chat_id(raw: str) -> str: chat_id = raw.strip() return chat_id if _SESSION_CHAT_ID_PATTERN.fullmatch(chat_id) else "" def _is_web_session_chat_id(chat_id: str) -> bool: return chat_id == "web" or chat_id.startswith("web-") def _session_key(chat_id: str) -> str: return f"api:{chat_id}" def _session_path(chat_id: str) -> Path: return SESSIONS_DIR / f"api_{chat_id}.jsonl" def _session_summary_from_messages( chat_id: str, *, created_at: str, updated_at: str, metadata: dict[str, Any], messages: list[dict[str, Any]], ) -> dict[str, Any]: title = str(metadata.get("title", "")).strip() preview = "" for message in messages: if message.get("role") == "user": text = str(message.get("content", "")).strip() if text: if not title: title = text break for message in reversed(messages): role = str(message.get("role", "")).strip() text = str(message.get("content", "")).strip() if role in {"user", "assistant", "system"} and text: preview = text break if not title: title = "New conversation" return { "chat_id": chat_id, "key": _session_key(chat_id), "title": title[:120], "preview": preview[:240], "created_at": created_at, "updated_at": updated_at, "message_count": len(messages), } def _load_session_file(chat_id: str) -> tuple[dict[str, Any], list[dict[str, Any]]] | None: ensure_session_store_dirs() path = _session_path(chat_id) if not path.exists(): return None metadata: dict[str, Any] = { "key": _session_key(chat_id), "created_at": datetime.now(timezone.utc).isoformat(), "updated_at": datetime.now(timezone.utc).isoformat(), "metadata": {}, } messages: list[dict[str, Any]] = [] try: with path.open(encoding="utf-8") as handle: for raw_line in handle: line = raw_line.strip() if not line: continue try: payload = json.loads(line) except json.JSONDecodeError: continue if not isinstance(payload, dict): continue if payload.get("_type") == "metadata": metadata = payload continue messages.append(payload) except OSError: return None return metadata, messages def _serialize_session_messages(messages: list[dict[str, Any]]) -> list[dict[str, str]]: rows: list[dict[str, str]] = [] for message in messages: role = str(message.get("role", "")).strip().lower() content = str(message.get("content", "")).strip() timestamp = str(message.get("timestamp", "")).strip() if role not in {"user", "assistant", "tool", "system"}: continue if not content: continue rows.append( { "role": "nanobot" if role == "assistant" else role, "text": content, "timestamp": timestamp or datetime.now(timezone.utc).isoformat(), } ) return rows def _load_session_summary(chat_id: str) -> dict[str, Any] | None: loaded = _load_session_file(chat_id) if loaded is None: return None metadata, messages = loaded meta_dict = metadata.get("metadata", {}) return _session_summary_from_messages( chat_id, created_at=str(metadata.get("created_at", "")), updated_at=str(metadata.get("updated_at", "")), metadata=meta_dict if isinstance(meta_dict, dict) else {}, messages=messages, ) def _empty_web_session_payload() -> dict[str, Any]: now = datetime.now(timezone.utc).isoformat() return { "session": { "chat_id": "web", "key": _session_key("web"), "title": "Current conversation", "preview": "", "created_at": now, "updated_at": now, "message_count": 0, }, "messages": [], } def _load_session_payload(chat_id: str) -> dict[str, Any] | None: loaded = _load_session_file(chat_id) if loaded is None: return _empty_web_session_payload() if chat_id == "web" else None metadata, messages = loaded meta_dict = metadata.get("metadata", {}) summary = _session_summary_from_messages( chat_id, created_at=str(metadata.get("created_at", "")), updated_at=str(metadata.get("updated_at", "")), metadata=meta_dict if isinstance(meta_dict, dict) else {}, messages=messages, ) return {"session": summary, "messages": _serialize_session_messages(messages)} def _list_web_sessions() -> list[dict[str, Any]]: ensure_session_store_dirs() sessions: list[dict[str, Any]] = [] seen_chat_ids: set[str] = set() for path in sorted(SESSIONS_DIR.glob("api_*.jsonl")): chat_id = path.stem[4:] if not _is_web_session_chat_id(chat_id): continue summary = _load_session_summary(chat_id) if summary is None: continue sessions.append(summary) seen_chat_ids.add(chat_id) if "web" not in seen_chat_ids: now = datetime.now(timezone.utc).isoformat() sessions.append( { "chat_id": "web", "key": _session_key("web"), "title": "Current conversation", "preview": "", "created_at": now, "updated_at": now, "message_count": 0, } ) sessions.sort(key=lambda item: str(item.get("updated_at", "")), reverse=True) return sessions def _create_session(chat_id: str, title: str = "") -> dict[str, Any]: ensure_session_store_dirs() now = datetime.now(timezone.utc).isoformat() payload = { "_type": "metadata", "key": _session_key(chat_id), "created_at": now, "updated_at": now, "metadata": {"title": title.strip()} if title.strip() else {}, "last_consolidated": 0, } _session_path(chat_id).write_text( json.dumps(payload, ensure_ascii=False) + "\n", encoding="utf-8" ) summary = _load_session_summary(chat_id) if summary is None: raise RuntimeError("failed to create session") return summary def _write_session_file( chat_id: str, metadata: dict[str, Any], messages: list[dict[str, Any]] ) -> dict[str, Any] | None: try: ensure_session_store_dirs() encoded_lines = [json.dumps(metadata, ensure_ascii=False)] encoded_lines.extend(json.dumps(message, ensure_ascii=False) for message in messages) _session_path(chat_id).write_text("\n".join(encoded_lines) + "\n", encoding="utf-8") except OSError: return None return _load_session_summary(chat_id) def _rename_session(chat_id: str, title: str) -> dict[str, Any] | None: loaded = _load_session_file(chat_id) if loaded is None: if chat_id != "web": return None return _create_session(chat_id, title=title) metadata, messages = loaded meta_dict = metadata.get("metadata", {}) if not isinstance(meta_dict, dict): meta_dict = {} if title.strip(): meta_dict["title"] = title.strip() else: meta_dict.pop("title", None) now = datetime.now(timezone.utc).isoformat() normalized_metadata = { **metadata, "_type": "metadata", "key": _session_key(chat_id), "created_at": str(metadata.get("created_at", "") or now), "updated_at": now, "metadata": meta_dict, "last_consolidated": metadata.get("last_consolidated", 0), } return _write_session_file(chat_id, normalized_metadata, messages) def _delete_session(chat_id: str, delete_cards_for_chat: Callable[[str], int]) -> bool: path = _session_path(chat_id) deleted_file = False if path.exists(): try: path.unlink() deleted_file = True except OSError: return False deleted_cards = delete_cards_for_chat(chat_id) return deleted_file or deleted_cards > 0 or chat_id == "web" normalize_session_chat_id = _normalize_session_chat_id is_web_session_chat_id = _is_web_session_chat_id list_web_sessions = _list_web_sessions load_session_payload = _load_session_payload create_session = _create_session rename_session = _rename_session delete_session = _delete_session __all__ = [ "SESSIONS_DIR", "create_session", "delete_session", "ensure_session_store_dirs", "is_web_session_chat_id", "list_web_sessions", "load_session_payload", "normalize_session_chat_id", "rename_session", ]