from __future__ import annotations

from datetime import datetime, timezone

from fastapi import APIRouter, Depends, Request
from fastapi.responses import JSONResponse, StreamingResponse

from app_dependencies import get_runtime
from message_pipeline import (
    context_label_from_message_metadata,
    encode_sse_data,
    typed_message_from_api_notification,
)
from nanobot_api_client import stream_nanobot_agent_turn
from route_helpers import read_json_request
from session_store import normalize_session_chat_id
from web_runtime import WebAppRuntime

router = APIRouter()


def _bad_request(message: str) -> JSONResponse:
    """Build the 400 response body shared by both message routes."""
    return JSONResponse({"error": message}, status_code=400)


async def _read_message_request(
    request: Request,
) -> tuple[str, str, dict] | JSONResponse:
    """Read and validate the JSON body shared by both message routes.

    Returns:
        ``(text, chat_id, metadata)`` on success, or a ready-to-return
        400 ``JSONResponse`` when ``read_json_request`` raises
        ``ValueError``, the stripped text is empty, or the normalized
        chat id is falsy.
    """
    try:
        payload = await read_json_request(request)
    except ValueError as exc:
        return _bad_request(str(exc))

    # NOTE(review): assumes read_json_request returns a dict-like payload.
    text = str(payload.get("text", "")).strip()
    metadata = payload.get("metadata", {})
    # A missing or falsy chat_id falls back to "web" before normalization.
    chat_id = normalize_session_chat_id(str(payload.get("chat_id", "web") or "web"))

    if not text:
        return _bad_request("empty message")
    if not chat_id:
        return _bad_request("invalid chat id")
    if not isinstance(metadata, dict):
        # Metadata of the wrong type is replaced with an empty dict
        # rather than rejected.
        metadata = {}
    return text, chat_id, metadata


@router.post("/message")
async def post_message(
    request: Request,
    runtime: WebAppRuntime = Depends(get_runtime),
) -> JSONResponse:
    """Forward a user message to the runtime gateway.

    Responds with ``{"status": "ok"}`` on success, 400 when the request
    body fails validation, and 503 when the gateway raises
    ``RuntimeError``.
    """
    parsed = await _read_message_request(request)
    if isinstance(parsed, JSONResponse):
        return parsed
    text, chat_id, metadata = parsed

    try:
        await runtime.gateway.send_user_message(text, metadata=metadata, chat_id=chat_id)
    except RuntimeError as exc:
        return JSONResponse({"error": str(exc)}, status_code=503)
    return JSONResponse({"status": "ok"})


# No return annotation on this route: FastAPI may interpret a return
# annotation as a response model, and this handler returns either a
# JSONResponse or a StreamingResponse.
@router.post("/message/stream")
async def stream_message(
    request: Request,
    runtime: WebAppRuntime = Depends(get_runtime),
):
    """Run one agent turn and stream its events as ``text/event-stream``.

    Validation failures are answered with a plain 400 ``JSONResponse``
    before any streaming starts. Otherwise the response first echoes the
    user's message as an event and then relays every event produced by
    ``stream_nanobot_agent_turn``.
    """
    parsed = await _read_message_request(request)
    if isinstance(parsed, JSONResponse):
        return parsed
    text, chat_id, metadata = parsed

    context_label = context_label_from_message_metadata(metadata)

    async def stream_turn():
        """Yield the encoded SSE chunks for this turn."""
        try:
            # A line starting with ":" is an SSE comment; it is sent
            # before any data event.
            yield ": stream-open\n\n"
            yield encode_sse_data(
                {
                    "type": "message",
                    "role": "user",
                    "content": text,
                    "is_progress": False,
                    "is_tool_hint": False,
                    "timestamp": datetime.now(timezone.utc).isoformat(),
                    "context_label": context_label or None,
                }
            )
            async for typed_event in stream_nanobot_agent_turn(
                text,
                chat_id=chat_id,
                metadata=metadata,
                timeout_seconds=60.0,
                notification_to_event=lambda obj: typed_message_from_api_notification(
                    obj,
                    default_chat_id=chat_id,
                ),
            ):
                # The previous `isinstance(typed_event, dict)` fallback was
                # unreachable: `.get("type")` had already been called on
                # the event, so the event's own chat_id is used directly.
                event_type = typed_event.get("type")
                if event_type == "card":
                    await runtime.publish_cards_changed(typed_event.get("chat_id"))
                elif event_type == "workbench":
                    await runtime.publish_workbench_changed(typed_event.get("chat_id"))
                yield encode_sse_data(typed_event)
            # NOTE(review): placed after the loop (once per turn); the
            # original file's indentation was lost -- confirm placement.
            await runtime.publish_sessions_changed()
        except RuntimeError as exc:
            # Report the failure in-band as a final event.
            yield encode_sse_data({"type": "error", "error": str(exc)})

    return StreamingResponse(
        stream_turn(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no",
        },
    )